from __future__ import annotations from dataclasses import asdict from typing import Dict, List, Tuple from ad_user_creator.exceptions import InputValidationError, LdapOperationError from ad_user_creator.ldap_client import LdapClient from ad_user_creator.models import AppConfig, ResolvedUserPlan, UserInputRecord, UserProcessResult from ad_user_creator.persistence import StateStore def build_department_dn_fragment(dept_ou: str) -> str: parts = [part.strip() for part in dept_ou.split("/") if part.strip()] if not parts: raise InputValidationError("部门 OU 不能为空") return ",".join(f"OU={part}" for part in reversed(parts)) def build_user_dn(display_name: str, dept_ou: str, people_base_dn: str) -> str: dept_fragment = build_department_dn_fragment(dept_ou) return f"CN={display_name},{dept_fragment},{people_base_dn}" class UserService: def __init__(self, config: AppConfig, state_store: StateStore, ldap_client: LdapClient) -> None: self.config = config self.state_store = state_store self.ldap_client = ldap_client self.group_gid_map = self.state_store.load_group_gid_map() def preview_plan(self, record: UserInputRecord) -> ResolvedUserPlan: gid_number = self._resolve_base_gid(record.base_group) next_uid_number = self.state_store.get_next_uid_number() return self._resolve_plan(record, next_uid_number, gid_number, optional_missing_groups=[]) def process_user(self, record: UserInputRecord, dry_run: bool = False) -> UserProcessResult: gid_number = self._resolve_base_gid(record.base_group) optional_missing_groups: List[str] = [] if not dry_run: base_ok, missing_optional = self._validate_groups(record) optional_missing_groups = missing_optional if not base_ok: return UserProcessResult( status="FAILED", reason=f"基础组不存在: {record.base_group}", raw=asdict(record), ) if missing_optional and not self.config.behavior.skip_missing_optional_groups: return UserProcessResult( status="FAILED", reason=f"可选组不存在且配置不允许跳过: {','.join(missing_optional)}", raw=asdict(record), ) if not dry_run: existing_user_dn = self.ldap_client.find_user_dn_by_sam(record.sam_account_name) if existing_user_dn: return self._process_existing_user( record=record, existing_user_dn=existing_user_dn, gid_number=gid_number, optional_missing_groups=optional_missing_groups, ) uid_number = self.state_store.get_next_uid_number() if dry_run else self.state_store.commit_next_uid_number() plan = self._resolve_plan(record, uid_number, gid_number, optional_missing_groups=optional_missing_groups) if dry_run: reason = "dry-run 未写入 LDAP(将执行: 创建用户->设置初始密码->启用用户->加组)" if optional_missing_groups: reason += f";可选组缺失: {','.join(optional_missing_groups)}" return UserProcessResult( status="CREATED", reason=reason, user_dn=plan.user_dn, uid_number=plan.uid_number, raw=asdict(record), ) attrs = self._build_ldap_attributes(plan) try: self.ldap_client.create_user(plan.user_dn, attrs) except LdapOperationError as exc: return UserProcessResult( status="FAILED", reason=f"create-user-failed: {exc}", user_dn=plan.user_dn, raw=asdict(record), ) try: self.ldap_client.set_user_password(plan.user_dn, self.config.defaults.initial_password) except LdapOperationError as exc: return UserProcessResult( status="FAILED", reason=f"password-set-failed: {exc}", user_dn=plan.user_dn, raw=asdict(record), ) try: self.ldap_client.set_user_enabled(plan.user_dn, enabled=True) except LdapOperationError as exc: return UserProcessResult( status="FAILED", reason=f"enable-user-failed: {exc}", user_dn=plan.user_dn, raw=asdict(record), ) try: added_groups = self._ensure_groups(plan.user_dn, plan.base_group, plan.project_groups, plan.resource_groups, plan.optional_missing_groups) except LdapOperationError as exc: return UserProcessResult( status="FAILED", reason=f"add-group-failed: {exc}", user_dn=plan.user_dn, raw=asdict(record), ) reason = "创建成功" if optional_missing_groups: reason += f";可选组已跳过: {','.join(optional_missing_groups)}" if added_groups: reason += f";新增组成员: {','.join(added_groups)}" return UserProcessResult( status="CREATED", reason=reason, user_dn=plan.user_dn, uid_number=plan.uid_number, raw=asdict(record), ) def _resolve_base_gid(self, base_group: str) -> int: if base_group not in self.group_gid_map: raise InputValidationError(f"基础组未配置 gidNumber: {base_group}") return int(self.group_gid_map[base_group]) def _resolve_plan( self, record: UserInputRecord, uid_number: int, gid_number: int, optional_missing_groups: List[str], ) -> ResolvedUserPlan: user_dn = build_user_dn(record.display_name, record.dept_ou, self.config.ldap.people_base_dn) return ResolvedUserPlan( user_dn=user_dn, display_name=record.display_name, sam_account_name=record.sam_account_name, email=record.email, uid=record.sam_account_name, uid_number=uid_number, gid_number=gid_number, unix_home_directory=f"/home/{record.sam_account_name}", base_group=record.base_group, project_groups=record.project_groups, resource_groups=record.resource_groups, optional_missing_groups=optional_missing_groups, ) def _validate_groups(self, record: UserInputRecord) -> Tuple[bool, List[str]]: if not self.ldap_client.group_exists(record.base_group): return False, [] optional_missing: List[str] = [] for group in record.project_groups + record.resource_groups: if not self.ldap_client.group_exists(group): optional_missing.append(group) return True, optional_missing def _build_ldap_attributes(self, plan: ResolvedUserPlan) -> Dict[str, object]: attrs: Dict[str, object] = { "cn": plan.display_name, "displayName": plan.display_name, "sAMAccountName": plan.sam_account_name, "mail": plan.email, "uid": plan.uid, "uidNumber": str(plan.uid_number), "gidNumber": str(plan.gid_number), "unixHomeDirectory": plan.unix_home_directory, "userAccountControl": "514", } if self.config.ldap.upn_suffix: attrs["userPrincipalName"] = f"{plan.sam_account_name}@{self.config.ldap.upn_suffix}" return attrs def _build_desired_update_attrs(self, record: UserInputRecord, gid_number: int) -> Dict[str, str]: desired = { "displayName": record.display_name, "mail": record.email, "uid": record.sam_account_name, "unixHomeDirectory": f"/home/{record.sam_account_name}", "gidNumber": str(gid_number), } if self.config.ldap.upn_suffix: desired["userPrincipalName"] = f"{record.sam_account_name}@{self.config.ldap.upn_suffix}" return desired def _calculate_attr_changes(self, current: Dict[str, str], desired: Dict[str, str]) -> Dict[str, str]: changes: Dict[str, str] = {} for key, desired_value in desired.items(): current_value = str(current.get(key, "") or "").strip() if current_value != str(desired_value).strip(): changes[key] = desired_value return changes def _ensure_groups( self, user_dn: str, base_group: str, project_groups: List[str], resource_groups: List[str], optional_missing_groups: List[str], ) -> List[str]: added_groups: List[str] = [] base_group_dn = self.ldap_client.get_group_dn(base_group) if self.ldap_client.add_user_to_group_if_missing(user_dn, base_group_dn): added_groups.append(base_group) optional_groups = list(project_groups) + list(resource_groups) skip_set = set(optional_missing_groups) for group in optional_groups: if group in skip_set: continue group_dn = self.ldap_client.get_group_dn(group) if self.ldap_client.add_user_to_group_if_missing(user_dn, group_dn): added_groups.append(group) return added_groups def _process_existing_user( self, record: UserInputRecord, existing_user_dn: str, gid_number: int, optional_missing_groups: List[str], ) -> UserProcessResult: attrs_to_read = ["displayName", "mail", "uid", "unixHomeDirectory", "gidNumber"] if self.config.ldap.upn_suffix: attrs_to_read.append("userPrincipalName") try: current_attrs = self.ldap_client.get_user_attributes(existing_user_dn, attrs_to_read) desired_attrs = self._build_desired_update_attrs(record, gid_number) changes = self._calculate_attr_changes(current_attrs, desired_attrs) if changes: self.ldap_client.modify_user_attributes(existing_user_dn, changes) added_groups = self._ensure_groups( user_dn=existing_user_dn, base_group=record.base_group, project_groups=record.project_groups, resource_groups=record.resource_groups, optional_missing_groups=optional_missing_groups, ) except LdapOperationError as exc: return UserProcessResult( status="FAILED", reason=f"update-user-failed: {exc}", user_dn=existing_user_dn, raw=asdict(record), ) if changes: reason = f"已更新字段: {','.join(changes.keys())}" if added_groups: reason += f";新增组成员: {','.join(added_groups)}" if optional_missing_groups: reason += f";可选组已跳过: {','.join(optional_missing_groups)}" return UserProcessResult(status="UPDATED", reason=reason, user_dn=existing_user_dn, raw=asdict(record)) reason = "用户已存在且字段无变化" if added_groups: reason += f";新增组成员: {','.join(added_groups)}" return UserProcessResult(status="UPDATED", reason=reason, user_dn=existing_user_dn, raw=asdict(record)) if optional_missing_groups: reason += f";可选组已跳过: {','.join(optional_missing_groups)}" return UserProcessResult(status="SKIPPED_NO_CHANGE", reason=reason, user_dn=existing_user_dn, raw=asdict(record))