from __future__ import annotations from dataclasses import dataclass from typing import Any from sqlalchemy import select from sqlalchemy.orm import Session from app.core.config import settings from app.db.engine import get_session from app.db.models import LdapConfig from app.security.fernet import decrypt_json, encrypt_json LDAP_CONFIG_NAME = "default" @dataclass(frozen=True) class RuntimeLdapConfig: enabled: bool uri: str bind_dn: str bind_password: str base_dn: str user_filter: str group_filter: str use_starttls: bool verify_tls: bool def encrypt_bind_password(password: str) -> str: if not password: return "" return encrypt_json({"password": password}) def decrypt_bind_password(token: str) -> str: if not token: return "" try: data = decrypt_json(token) except Exception: return "" value = data.get("password") return str(value) if value is not None else "" def get_default_ldap_config(session: Session) -> LdapConfig | None: return session.scalar(select(LdapConfig).where(LdapConfig.name == LDAP_CONFIG_NAME)) def ensure_default_ldap_config() -> None: db = get_session() try: if get_default_ldap_config(db): return config = LdapConfig( name=LDAP_CONFIG_NAME, enabled=bool(settings.ldap_base_dn), uri=settings.ldap_uri, bind_dn=settings.ldap_bind_dn, bind_password_encrypted=encrypt_bind_password(settings.ldap_bind_password), base_dn=settings.ldap_base_dn, user_filter=settings.ldap_user_filter, group_filter=settings.ldap_group_filter, use_starttls=settings.ldap_use_starttls, verify_tls=settings.ldap_verify_tls, last_test_result={}, ) db.add(config) db.commit() finally: db.close() def to_runtime_config(config: LdapConfig | None) -> RuntimeLdapConfig | None: if not config or not config.enabled: return None return RuntimeLdapConfig( enabled=bool(config.enabled), uri=config.uri, bind_dn=config.bind_dn, bind_password=decrypt_bind_password(config.bind_password_encrypted), base_dn=config.base_dn, user_filter=config.user_filter, group_filter=config.group_filter, use_starttls=bool(config.use_starttls), verify_tls=bool(config.verify_tls), ) def get_runtime_ldap_config() -> RuntimeLdapConfig | None: db = get_session() try: return to_runtime_config(get_default_ldap_config(db)) finally: db.close() def mask_sensitive_result(result: dict[str, Any]) -> dict[str, Any]: cleaned = dict(result) cleaned.pop("password", None) cleaned.pop("bind_password", None) return cleaned