from __future__ import annotations import csv import sys from pathlib import Path from typing import Dict, List, Optional from ad_user_creator.cli import build_parser from ad_user_creator.config import load_config from ad_user_creator.exceptions import AppError, ConfigError, InputValidationError, LdapConnectionError from ad_user_creator.input_parser import parse_input_file from ad_user_creator.interactive import run_interactive_create from ad_user_creator.ldap_client import LdapClient from ad_user_creator.logging_setup import setup_logging from ad_user_creator.models import UserProcessResult from ad_user_creator.persistence import StateStore from ad_user_creator.user_service import UserService def _write_batch_results(path: str, rows: List[Dict[str, str]]) -> None: output = Path(path) output.parent.mkdir(parents=True, exist_ok=True) headers = ["姓名", "用户名", "邮箱", "部门 OU", "基础组", "项目组", "资源组", "状态", "原因", "用户DN", "uidNumber"] with output.open("w", encoding="utf-8-sig", newline="") as handle: writer = csv.DictWriter(handle, fieldnames=headers) writer.writeheader() for row in rows: writer.writerow(row) def _to_bool_text(value: str) -> bool: return value.lower() == "true" def _result_to_row(raw: Dict[str, str], result: UserProcessResult) -> Dict[str, str]: return { "姓名": raw.get("姓名", ""), "用户名": raw.get("用户名", ""), "邮箱": raw.get("邮箱", ""), "部门 OU": raw.get("部门 OU", ""), "基础组": raw.get("基础组", ""), "项目组": raw.get("项目组", ""), "资源组": raw.get("资源组", ""), "状态": result.status, "原因": result.reason, "用户DN": result.user_dn, "uidNumber": "" if result.uid_number is None else str(result.uid_number), } def execute_command( command: str, config_path: str = "config/config.yaml", dry_run: bool = False, input_path: Optional[str] = None, continue_on_error: bool = True, ) -> int: try: config = load_config(config_path=config_path, cli_dry_run=dry_run) except ConfigError as exc: print(f"[FATAL] 配置错误: {exc}") return 2 logger = setup_logging(config.paths.log_file) logger.info("配置加载完成") state = StateStore( uid_state_file=config.paths.uid_state_file, group_gid_map_file=config.paths.group_gid_map_file, initial_uid_number=config.defaults.initial_uid_number, ) if command == "init-state": if dry_run: print(f"[DRY-RUN] 将初始化: {config.paths.uid_state_file} 和 {config.paths.group_gid_map_file}") return 0 state.ensure_state_files() print("状态文件初始化完成。") return 0 state.ensure_state_files() ldap_client = LdapClient(config.ldap) if not config.behavior.dry_run: try: ldap_client.connect() logger.info("LDAP 连接成功") except LdapConnectionError as exc: print(f"[FATAL] LDAP 连接失败: {exc}") return 2 service = UserService(config=config, state_store=state, ldap_client=ldap_client) try: if command == "interactive": try: run_interactive_create( user_service=service, default_base_group=config.defaults.base_group, dry_run=config.behavior.dry_run, ) return 0 except (InputValidationError, AppError) as exc: print(f"[ERROR] 交互式执行失败: {exc}") return 1 if command == "batch": if not input_path: print("[ERROR] batch 模式需要提供输入文件路径") return 1 try: records = parse_input_file(input_path) except InputValidationError as exc: print(f"[ERROR] 输入文件校验失败: {exc}") return 1 created = 0 updated = 0 skipped = 0 failed = 0 result_rows: List[Dict[str, str]] = [] for idx, (record, raw) in enumerate(records, start=1): logger.info("处理第 %s 条: %s", idx, record.sam_account_name) try: result = service.process_user(record, dry_run=config.behavior.dry_run) except (InputValidationError, AppError) as exc: result = UserProcessResult(status="FAILED", reason=str(exc), raw=raw) if result.status == "CREATED": created += 1 elif result.status == "UPDATED": updated += 1 elif result.status in {"SKIPPED_EXISTS", "SKIPPED_NO_CHANGE"}: skipped += 1 else: failed += 1 result_rows.append(_result_to_row(raw, result)) print(f"[{idx}/{len(records)}] {raw.get('用户名', '')} -> {result.status} {result.reason}") if result.status == "FAILED" and not continue_on_error: break _write_batch_results(config.paths.batch_result_file, result_rows) total = len(result_rows) print( f"完成: total={total}, created={created}, updated={updated}, skipped={skipped}, failed={failed}, " f"result={config.paths.batch_result_file}" ) return 1 if failed > 0 else 0 finally: ldap_client.close() return 0 def run() -> int: parser = build_parser() args = parser.parse_args() dry_run = bool(getattr(args, "dry_run", False)) input_path = getattr(args, "input", None) continue_on_error = _to_bool_text(getattr(args, "continue_on_error", "true")) return execute_command( command=args.command, config_path=args.config, dry_run=dry_run, input_path=input_path, continue_on_error=continue_on_error, ) if __name__ == "__main__": sys.exit(run())