from __future__ import annotations from typing import Dict, Optional from ldap3 import ALL, BASE, Connection, MODIFY_REPLACE, Server from ldap3.core.exceptions import LDAPException from ldap3.utils.conv import escape_filter_chars from ad_user_creator.exceptions import LdapConnectionError, LdapOperationError from ad_user_creator.models import LdapConfig class LdapClient: def __init__(self, config: LdapConfig) -> None: self.config = config self.server: Optional[Server] = None self.conn: Optional[Connection] = None def connect(self) -> None: try: self.server = Server(self.config.host, port=self.config.port, use_ssl=self.config.use_ssl, get_info=ALL) self.conn = Connection( self.server, user=self.config.bind_dn, password=self.config.bind_password, auto_bind=True, ) except LDAPException as exc: raise LdapConnectionError(f"LDAP 连接或绑定失败: {exc}") from exc def ensure_connected(self) -> None: if self.conn is None or not self.conn.bound: self.connect() def close(self) -> None: if self.conn is not None and self.conn.bound: self.conn.unbind() def user_exists(self, sam_account_name: str) -> bool: self.ensure_connected() assert self.conn is not None escaped = escape_filter_chars(sam_account_name) search_filter = f"(sAMAccountName={escaped})" ok = self.conn.search(self.config.base_dn, search_filter, attributes=["distinguishedName"]) if not ok: return False return len(self.conn.entries) > 0 def find_user_dn_by_sam(self, sam_account_name: str) -> Optional[str]: self.ensure_connected() assert self.conn is not None escaped = escape_filter_chars(sam_account_name) search_filter = f"(sAMAccountName={escaped})" ok = self.conn.search(self.config.base_dn, search_filter, attributes=["distinguishedName"]) if not ok or len(self.conn.entries) == 0: return None return str(self.conn.entries[0].distinguishedName.value) def group_exists(self, group_name: str) -> bool: self.ensure_connected() assert self.conn is not None escaped = escape_filter_chars(group_name) search_filter = f"(&(objectClass=group)(cn={escaped}))" ok = self.conn.search(self.config.groups_base_dn, search_filter, attributes=["distinguishedName"]) if not ok: return False return len(self.conn.entries) > 0 def get_group_dn(self, group_name: str) -> str: self.ensure_connected() assert self.conn is not None escaped = escape_filter_chars(group_name) search_filter = f"(&(objectClass=group)(cn={escaped}))" ok = self.conn.search(self.config.groups_base_dn, search_filter, attributes=["distinguishedName"]) if not ok or len(self.conn.entries) == 0: raise LdapOperationError(f"组不存在: {group_name}") return str(self.conn.entries[0].distinguishedName.value) def create_user(self, user_dn: str, attributes: Dict[str, object]) -> None: self.ensure_connected() assert self.conn is not None ok = self.conn.add(user_dn, object_class=self.config.user_object_classes, attributes=attributes) if not ok: raise LdapOperationError(f"创建用户失败: {self.conn.result}") def set_user_password(self, user_dn: str, new_password: str) -> None: self.ensure_connected() assert self.conn is not None ok = self.conn.extend.microsoft.modify_password(user_dn, new_password) if not ok: raise LdapOperationError(f"设置用户密码失败 user={user_dn} result={self.conn.result}") def set_user_enabled(self, user_dn: str, enabled: bool) -> None: self.ensure_connected() assert self.conn is not None value = "512" if enabled else "514" ok = self.conn.modify(user_dn, {"userAccountControl": [(MODIFY_REPLACE, [value])]}) if not ok: action = "启用" if enabled else "禁用" raise LdapOperationError(f"{action}用户失败 user={user_dn} result={self.conn.result}") def add_user_to_group(self, user_dn: str, group_dn: str) -> None: self.ensure_connected() assert self.conn is not None ok = self.conn.extend.microsoft.add_members_to_groups(user_dn, group_dn) if not ok: raise LdapOperationError( f"添加组成员失败 user={user_dn} group={group_dn} result={self.conn.result}" ) def get_user_attributes(self, user_dn: str, attrs: list[str]) -> Dict[str, str]: self.ensure_connected() assert self.conn is not None ok = self.conn.search(search_base=user_dn, search_filter="(objectClass=*)", search_scope=BASE, attributes=attrs) if not ok or len(self.conn.entries) == 0: raise LdapOperationError(f"读取用户属性失败 user={user_dn} result={self.conn.result}") entry = self.conn.entries[0] result: Dict[str, str] = {} for attr in attrs: if hasattr(entry, attr): value = getattr(entry, attr).value if value is None: result[attr] = "" elif isinstance(value, list): result[attr] = ",".join(str(v) for v in value) else: result[attr] = str(value) else: result[attr] = "" return result def modify_user_attributes(self, user_dn: str, changes: Dict[str, str]) -> None: self.ensure_connected() assert self.conn is not None operations = {key: [(MODIFY_REPLACE, [value])] for key, value in changes.items()} ok = self.conn.modify(user_dn, operations) if not ok: raise LdapOperationError(f"更新用户属性失败 user={user_dn} result={self.conn.result}") def add_user_to_group_if_missing(self, user_dn: str, group_dn: str) -> bool: self.ensure_connected() assert self.conn is not None ok = self.conn.search( search_base=group_dn, search_filter="(objectClass=group)", search_scope=BASE, attributes=["member"], ) if not ok or len(self.conn.entries) == 0: raise LdapOperationError(f"读取组成员失败 group={group_dn} result={self.conn.result}") members = self.conn.entries[0].member.values if hasattr(self.conn.entries[0], "member") else [] normalized_members = {str(item).lower() for item in members} if user_dn.lower() in normalized_members: return False self.add_user_to_group(user_dn, group_dn) return True