# Source path: Vastai-ConnectHub/extensions/sync_ehr_to_ad/job.py
# (Repository viewer metadata: 653 lines, 26 KiB, Python.)
#
# Viewer notice: this file contains ambiguous Unicode characters, i.e.
# characters that might be confused with other characters. If that is
# intentional, the warning can be safely ignored.


from __future__ import annotations
import json
import logging
import re
from typing import Any
from app.jobs.base import BaseJob
from extensions.sync_ehr_to_ad.api import ActiveDirectoryClient, SyncEhrToAdApi
# Module-level logger shared by every helper and by the job class in this file.
logger = logging.getLogger("connecthub.extensions.sync_ehr_to_ad")
# Default EHR field keys. SyncEhrToAdUserJob.run falls back to these when the
# matching job parameter (domain_account_custom_key / work_location_text_key /
# street_address_key) is missing or blank.
# Field whose value is used as the employee's AD sAMAccountName.
_EHR_AD_ACCOUNT_KEY = "extADAccountName_606508_511687157"
# Field whose text is used to derive the AD country/state/city attributes.
_EHR_WORK_LOCATION_TEXT_KEY = "extgzddxx1_606508_892394263Text"
# Field whose value is written to the AD streetAddress attribute.
_EHR_STREET_ADDRESS_KEY = "extgzddxx_606508_618643707"
def _to_bool_or_none(v: Any) -> bool | None:
    """Interpret a loosely typed flag, keeping ``None`` as "not configured".

    ``None`` and real booleans are returned untouched. Anything else is
    stringified, trimmed and lower-cased, then matched against the usual
    true/false spellings. Unrecognised values fall back to ``bool(v)``.
    """
    if v is None or isinstance(v, bool):
        return v
    text = str(v).strip().lower()
    truthy = ("1", "true", "yes", "y", "on")
    falsy = ("0", "false", "no", "n", "off", "")
    if text in truthy:
        return True
    return False if text in falsy else bool(v)
def _to_int_safe(v: Any) -> int:
    """Parse *v* as a base-10 integer, returning ``0`` when that is impossible.

    The value is stringified and trimmed first, so ``" 42 "`` parses while
    ``"1.0"``, ``None`` or arbitrary objects yield ``0``.
    """
    try:
        parsed = int(str(v).strip())
    except Exception:
        # Deliberately broad: any failure to stringify or parse means "no id".
        parsed = 0
    return parsed
def _scalar_value(raw: Any) -> str:
    """Flatten an EHR field value into a trimmed string.

    * dict: use ``"value"``, falling back to ``"showValue"`` when the former
      is missing or blank.
    * list/tuple/set: join the non-blank, trimmed items with newlines.
    * anything else: ``str(raw)`` trimmed; falsy inputs become ``""``.
    """
    if isinstance(raw, dict):
        chosen = raw.get("value")
        if chosen is None or not str(chosen).strip():
            chosen = raw.get("showValue")
        return str(chosen or "").strip()
    if isinstance(raw, (list, tuple, set)):
        pieces = []
        for element in raw:
            if element is None:
                continue
            piece = str(element).strip()
            if piece:
                pieces.append(piece)
        return "\n".join(pieces)
    return str(raw or "").strip()
def _custom_prop_value(custom_props: Any, key: str | None) -> str:
    """Return the flattened value stored under *key* in a custom-properties dict.

    Yields ``""`` when *key* is empty or *custom_props* is not a dict.
    """
    if key and isinstance(custom_props, dict):
        return _scalar_value(custom_props.get(key))
    return ""
def _translate_value(node: dict[str, Any], key: str | None) -> str:
    """Look up the display text for *key* in ``node["translateProperties"]``.

    The names tried, in order, are *key*, ``key + "Text"`` and, when *key*
    itself ends in ``"Text"``, *key* without that suffix. The first
    non-blank value wins. Returns ``""`` if nothing matches or if *node* or
    its translate table is not a dict.
    """
    if not key or not isinstance(node, dict):
        return ""
    table = node.get("translateProperties")
    if not isinstance(table, dict):
        return ""
    names = [key, key + "Text"]
    if key.endswith("Text"):
        names.append(key[:-4])
    texts = (str(table.get(name) or "").strip() for name in names)
    return next((text for text in texts if text), "")
def _field_value(item: dict[str, Any], key: str) -> str:
    """Find the first non-empty value for *key* in an employee row.

    ``employeeInfo`` is searched before ``recordInfo``. Within each section
    the lookup order is: the raw field, its translated text, its custom
    property; then the same three probes on every dict-valued child of the
    section. Returns ``""`` when nothing is found.
    """

    def _probe(mapping: dict[str, Any]) -> str:
        # Raw value, then translated text, then custom property.
        return (
            _scalar_value(mapping.get(key))
            or _translate_value(mapping, key)
            or _custom_prop_value(mapping.get("customProperties"), key)
        )

    sections = (item.get("employeeInfo") or {}, item.get("recordInfo") or {})
    for section in sections:
        if not isinstance(section, dict):
            continue
        found = _probe(section)
        if found:
            return found
        for child in section.values():
            if isinstance(child, dict):
                found = _probe(child)
                if found:
                    return found
    return ""
def _field_translate_or_value(item: dict[str, Any], key: str) -> str:
    """Prefer the translated text for *key*, falling back to the raw value.

    The translate tables of ``employeeInfo`` and then ``recordInfo`` are
    consulted first. Only when neither has text is the general
    ``_field_value`` lookup used.
    """
    for section_name in ("employeeInfo", "recordInfo"):
        section = item.get(section_name) or {}
        if not isinstance(section, dict):
            continue
        translated = _translate_value(section, key)
        if translated:
            return translated
    return _field_value(item, key)
def _extract_mobile_phone(emp_info: dict[str, Any]) -> str:
    """Return the first phone-like value found on an employee-info dict.

    A fixed list of key spellings is tried on the dict itself first, then on
    its ``translateProperties`` table. Returns ``""`` if none has a value.
    """
    phone_keys = (
        "mobile",
        "mobilePhone",
        "MobilePhone",
        "phone",
        "phoneNumber",
        "PhoneNumber",
        "tel",
        "telephone",
    )
    direct_values = (_scalar_value(emp_info.get(name)) for name in phone_keys)
    direct_hit = next((value for value in direct_values if value), "")
    if direct_hit:
        return direct_hit
    translated = emp_info.get("translateProperties")
    if not isinstance(translated, dict):
        return ""
    for name in phone_keys:
        text = str(translated.get(name) or "").strip()
        if text:
            return text
    return ""
def _choose_better_record(current: dict[str, Any], candidate: dict[str, Any]) -> dict[str, Any]:
def _score(item: dict[str, Any]) -> str:
record = item.get("recordInfo") or {}
emp = item.get("employeeInfo") or {}
return "|".join(
[
str(record.get("businessModifiedTime") or ""),
str(record.get("modifiedTime") or ""),
str(emp.get("businessModifiedTime") or ""),
str(emp.get("modifiedTime") or ""),
str(record.get("createdTime") or ""),
str(emp.get("createdTime") or ""),
]
)
return candidate if _score(candidate) >= _score(current) else current
def _has_cjk(s: str) -> bool:
    """Tell whether *s* contains a character in the range U+4E00..U+9FFF."""
    text = str(s or "")
    return re.search(r"[\u4e00-\u9fff]", text) is not None
def _display_name(given_name: str, sn: str, name: str) -> str:
    """Build the AD displayName from the romanised name parts and the raw name.

    The romanised name is ``"<given_name> <sn>"`` with blank parts dropped.
    When the raw *name* contains CJK characters it is appended after the
    romanised name. Otherwise the romanised name is returned, or the raw
    name if there is no romanised name.
    """
    latin_parts = [part for part in (given_name.strip(), sn.strip()) if part]
    english = " ".join(latin_parts).strip()
    raw_name = str(name or "").strip()
    if not raw_name or not _has_cjk(raw_name):
        return english or raw_name
    return " ".join(piece for piece in (english, raw_name) if piece).strip()
def _proxy_addresses(email: str, sam: str, alias_domain: str | None) -> list[str]:
    """Compute the proxyAddresses values for a user.

    The primary entry is ``SMTP:<email>``. A secondary ``smtp:<sam>@<domain>``
    alias is added when both an account name and a domain are available.
    The domain is *alias_domain* or, failing that, the part of *email* after
    the first ``@``. Entries that differ only by case are collapsed, keeping
    the first. With no e-mail the result is empty.
    """
    primary = str(email or "").strip()
    if not primary:
        return []
    account = str(sam or "").strip()
    domain = str(alias_domain or "").strip()
    if not domain and "@" in primary:
        domain = primary.split("@", 1)[1]
    addresses = [f"SMTP:{primary}"]
    if account and domain:
        addresses.append(f"smtp:{account}@{domain}")
    # dicts keep insertion order, so setdefault gives first-wins deduplication.
    unique: dict[str, str] = {}
    for address in addresses:
        unique.setdefault(address.lower(), address)
    return list(unique.values())
def _target_values(value: Any) -> list[str]:
    """Normalise a desired attribute value into a list of trimmed strings.

    Collections are flattened item by item and scalars become a one-item
    list. ``None`` and blank entries are dropped, so the result may be empty.
    """
    if isinstance(value, (list, tuple, set)):
        stripped = (str(entry).strip() for entry in value if entry is not None)
        return [text for text in stripped if text]
    text = "" if value is None else str(value).strip()
    return [text] if text else []
def _ad_values(attrs: dict[str, Any], attr_name: str) -> list[str]:
    """Read an attribute from an AD attribute dict as a list of trimmed strings.

    The exact name is tried first. If that yields ``None``, the first key
    matching case-insensitively is used instead. ``None`` and blank values
    are dropped.
    """
    raw = attrs.get(attr_name)
    if raw is None:
        wanted = attr_name.lower()
        raw = next(
            (value for name, value in attrs.items() if str(name).lower() == wanted),
            None,
        )
    if isinstance(raw, (list, tuple, set)):
        stripped = (str(entry).strip() for entry in raw if entry is not None)
        return [text for text in stripped if text]
    text = "" if raw is None else str(raw).strip()
    return [text] if text else []
def _canonical_values(attr_name: str, values: list[str]) -> list[str]:
    """Return *values* in a form suitable for order-insensitive comparison.

    All attributes are sorted. ``manager``, ``mail`` and ``sAMAccountName``
    are additionally lower-cased so they compare case-insensitively. Every
    other attribute, ``proxyAddresses`` included, stays case-sensitive.
    """
    if attr_name in ("manager", "mail", "sAMAccountName"):
        return sorted(value.lower() for value in values)
    return sorted(values)
def _diff_ad_attributes(ad_attrs: dict[str, Any], target_attrs: dict[str, Any]) -> dict[str, Any]:
    """Select the target attributes whose value differs from what AD holds.

    Targets that normalise to nothing (``None``, blank, empty list) are
    skipped, so an empty target never clears an AD attribute. The returned
    dict maps attribute name to the original, un-normalised target value.
    """
    pending: dict[str, Any] = {}
    for attr_name, target_value in target_attrs.items():
        wanted = _target_values(target_value)
        if wanted:
            current = _ad_values(ad_attrs, attr_name)
            if _canonical_values(attr_name, current) != _canonical_values(attr_name, wanted):
                pending[attr_name] = target_value
    return pending
def _location_from_workplace(workplace: str, mappings: dict[str, Any] | None = None) -> dict[str, Any]:
    """Derive AD location attributes from a free-text workplace description.

    The text is matched case-insensitively by substring against location
    "needles". Caller-supplied *mappings* are checked first, in their given
    order, so a specific custom needle such as ``"上海张江"`` is not shadowed
    by the shorter built-in ``"上海"``. The built-in city table is checked
    next. If no needle matches but the text mentions China, only the country
    attributes are returned.

    Args:
        workplace: Workplace text; ``None`` or blank yields ``{}``.
        mappings: Optional ``{needle: {ad_attr: value}}`` overrides. Entries
            whose value is not a dict are ignored. Keys are trimmed and
            lower-cased before matching.

    Returns:
        A fresh dict of AD attributes (``co``, ``c``, ``countryCode``,
        ``st``, ``l`` for the built-in entries), or ``{}`` when nothing
        matches.
    """
    text = str(workplace or "").strip()
    lower_text = text.lower()
    defaults: dict[str, dict[str, Any]] = {
        "上海": {"co": "China", "c": "CN", "countryCode": 156, "st": "Shanghai", "l": "Shanghai"},
        "shanghai": {"co": "China", "c": "CN", "countryCode": 156, "st": "Shanghai", "l": "Shanghai"},
        "北京": {"co": "China", "c": "CN", "countryCode": 156, "st": "Beijing", "l": "Beijing"},
        "beijing": {"co": "China", "c": "CN", "countryCode": 156, "st": "Beijing", "l": "Beijing"},
        "深圳": {"co": "China", "c": "CN", "countryCode": 156, "st": "Guangdong", "l": "Shenzhen"},
        "shenzhen": {"co": "China", "c": "CN", "countryCode": 156, "st": "Guangdong", "l": "Shenzhen"},
    }
    custom: dict[str, dict[str, Any]] = {}
    if isinstance(mappings, dict):
        for raw_key, attrs in mappings.items():
            if isinstance(attrs, dict):
                custom[str(raw_key).strip().lower()] = attrs
    # Custom mappings take precedence over the built-in table.
    for table in (custom, defaults):
        for needle, attrs in table.items():
            if needle and needle in lower_text:
                # Copy so callers cannot mutate the mapping tables.
                return dict(attrs)
    if "中国" in text or "china" in lower_text:
        return {"co": "China", "c": "CN", "countryCode": 156}
    return {}
def _org_name(org: dict[str, Any]) -> str:
    """Return the organisation's name, trying several key spellings.

    Keys are tried in the order ``name``, ``Name``, ``shortName``,
    ``ShortName``. The first non-blank trimmed value wins, otherwise ``""``.
    """
    name_keys = ("name", "Name", "shortName", "ShortName")
    texts = (str(org.get(field) or "").strip() for field in name_keys)
    return next((text for text in texts if text), "")
def _org_code(org: dict[str, Any]) -> str:
    """Return the organisation's code, trying several key spellings.

    Keys are tried in the order ``code``, ``Code``, ``orgCode``, ``OrgCode``.
    The first non-blank trimmed value wins, otherwise ``""``.
    """
    code_keys = ("code", "Code", "orgCode", "OrgCode")
    texts = (str(org.get(field) or "").strip() for field in code_keys)
    return next((text for text in texts if text), "")
def _root_org_name(org: dict[str, Any], org_by_oid: dict[str, dict[str, Any]]) -> str:
    """Walk up the organisation tree and return the topmost named ancestor.

    Starting at *org*, parents are followed through *org_by_oid* until a
    node has no known parent or an already visited id shows up (cycle
    guard). The name of the last named node seen on the way is returned,
    or ``""`` if none had a name.
    """
    visited: set[str] = set()
    node = org
    best_name = _org_name(node)
    while isinstance(node, dict):
        node_id = str(node.get("oId") or node.get("oid") or node.get("id") or "").strip()
        if node_id:
            if node_id in visited:
                break
            visited.add(node_id)
        # Keep the previous name when this level has none.
        best_name = _org_name(node) or best_name
        parent_id = str(
            node.get("pOId")
            or node.get("parentOId")
            or node.get("oIdParent")
            or node.get("parentId")
            or node.get("ParentId")
            or ""
        ).strip()
        if not parent_id or parent_id not in org_by_oid:
            break
        node = org_by_oid[parent_id]
    return best_name
def _is_current_employee(item: dict[str, Any], current_status_values: set[str]) -> bool:
    """Decide whether an EHR row describes someone currently employed.

    A row is rejected when any of these hold:
    * ``recordInfo`` or ``employeeInfo`` is not a dict;
    * it has a non-blank ``lastWorkDate``;
    * *current_status_values* is non-empty and the row's non-blank
      ``EmployeeStatus`` is not in it;
    * one of the deleted/disabled flags evaluates to true.
    """
    rec = item.get("recordInfo") or {}
    emp = item.get("employeeInfo") or {}
    if not (isinstance(rec, dict) and isinstance(emp, dict)):
        return False
    if str(rec.get("lastWorkDate") or "").strip():
        return False
    status = _field_translate_or_value(item, "EmployeeStatus")
    if current_status_values and status and status not in current_status_values:
        return False
    removal_flags = ("isDeleted", "IsDeleted", "deleted", "disabled", "Disabled")
    # The employee-level flag is used when the key exists, the record-level one otherwise.
    return not any(
        _to_bool_or_none(emp.get(flag, rec.get(flag))) is True for flag in removal_flags
    )
def _job_post_name(job_post: dict[str, Any]) -> str:
    """Return the job post's name, trying several key spellings.

    Keys are tried in the order ``name``, ``Name``, ``jobPostName``,
    ``JobPostName``. The first non-blank trimmed value wins, otherwise ``""``.
    """
    title_keys = ("name", "Name", "jobPostName", "JobPostName")
    texts = (str(job_post.get(field) or "").strip() for field in title_keys)
    return next((text for text in texts if text), "")
def _parse_target_sam_accounts(params: dict[str, Any]) -> set[str]:
    """Read the optional ``target_sam_accounts`` allow-list from job params.

    Returns:
        The trimmed, lower-cased account names with blanks removed. The set
        is empty when the parameter is absent.

    Raises:
        ValueError: If the parameter is present but is not a list.
    """
    raw = params.get("target_sam_accounts")
    if raw is None:
        return set()
    if isinstance(raw, list):
        names = (str(entry).strip() for entry in raw)
        return {name.lower() for name in names if name}
    raise ValueError('public_cfg.target_sam_accounts must be a JSON array, e.g. ["fchen", "jqian"]')
class SyncEhrToAdUserJob(BaseJob):
    """
    Sync attributes of current EHR employees onto on-premises AD users.

    - Only AD users that already exist (matched by sAMAccountName) are
      updated; users are never created.
    - AD connection settings are injected through the job params/secrets.
    """

    job_id = "sync_ehr_to_ad.sync_users"

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        """Run one EHR -> AD synchronisation pass.

        Args:
            params: Public job configuration. Keys read here: ``ldap_uri``,
                ``ldap_base_dn``, ``ldap_bind_dn``, ``ldap_bind_password``,
                ``ldap_user_filter``, ``ldap_use_starttls``,
                ``ldap_verify_tls``, ``ldap_connect_timeout_s``, ``dry_run``,
                ``verbose_trace``, ``verify_after_write``, ``stop_time``,
                ``capacity``, ``max_users``, ``domain_account_custom_key``,
                ``work_location_text_key``, ``street_address_key``,
                ``proxy_alias_domain``, ``department_code_ad_attribute``,
                ``postal_code``, ``default_company``, ``target_sam_accounts``,
                ``current_status_values`` and ``location_mappings``.
            secrets: Secret configuration. ``app_key`` and ``app_secret`` are
                mandatory; the four LDAP connection values may also be
                supplied here as a fallback for ``params``.

        Returns:
            A summary dict with the counters ``processed``, ``updated``,
            ``skipped_unchanged``, ``skipped_missing_sam``,
            ``skipped_not_found_ad`` and ``failed``, plus ``ok`` (true when
            nothing failed), ``dry_run`` and the input sizes.

        Raises:
            ValueError: If mandatory secrets or LDAP settings are missing, or
                ``target_sam_accounts`` is not a list.
            RuntimeError: If an EHR response does not carry a list in ``data``.
        """
        app_key = str(secrets.get("app_key") or "").strip()
        app_secret = str(secrets.get("app_secret") or "").strip()
        if not app_key or not app_secret:
            raise ValueError("secret_cfg.app_key and secret_cfg.app_secret are required")

        # LDAP connection settings: params win, secrets are the fallback.
        ldap_uri = str(params.get("ldap_uri") or secrets.get("ldap_uri") or "").strip()
        ldap_base_dn = str(params.get("ldap_base_dn") or secrets.get("ldap_base_dn") or "").strip()
        ldap_bind_dn = str(params.get("ldap_bind_dn") or secrets.get("ldap_bind_dn") or "").strip()
        # The password is intentionally not stripped.
        ldap_bind_password = str(params.get("ldap_bind_password") or secrets.get("ldap_bind_password") or "")
        if not ldap_uri or not ldap_base_dn or not ldap_bind_dn or not ldap_bind_password:
            raise ValueError("ldap_uri/ldap_base_dn/ldap_bind_dn/ldap_bind_password are required")
        ldap_user_filter = str(params.get("ldap_user_filter") or "(sAMAccountName={sAMAccountName})").strip()

        # Tri-state flags: None means "not configured" and is replaced by the default below.
        ldap_use_starttls = _to_bool_or_none(params.get("ldap_use_starttls"))
        ldap_verify_tls = _to_bool_or_none(params.get("ldap_verify_tls"))
        dry_run = _to_bool_or_none(params.get("dry_run"))
        verbose_trace = _to_bool_or_none(params.get("verbose_trace"))
        verify_after_write = _to_bool_or_none(params.get("verify_after_write"))
        if ldap_use_starttls is None:
            ldap_use_starttls = False
        if ldap_verify_tls is None:
            ldap_verify_tls = True
        if dry_run is None:
            dry_run = False
        if verbose_trace is None:
            verbose_trace = True
        if verify_after_write is None:
            verify_after_write = True

        stop_time = params.get("stop_time")
        # Page size for the EHR queries; values outside 1..300 are reset to 300.
        capacity = int(params.get("capacity") or 300)
        if capacity <= 0 or capacity > 300:
            capacity = 300
        # 0 (or unset) means "no limit" on the number of distinct accounts synced.
        max_users = int(params.get("max_users") or 0)
        connect_timeout_s = int(params.get("ldap_connect_timeout_s") or 10)

        domain_account_key = str(params.get("domain_account_custom_key") or "").strip() or _EHR_AD_ACCOUNT_KEY
        work_location_text_key = str(params.get("work_location_text_key") or "").strip() or _EHR_WORK_LOCATION_TEXT_KEY
        street_address_key = str(params.get("street_address_key") or "").strip() or _EHR_STREET_ADDRESS_KEY
        proxy_alias_domain = str(params.get("proxy_alias_domain") or "").strip() or None
        department_code_attr = str(params.get("department_code_ad_attribute") or "departmentNumber").strip()
        postal_code = str(params.get("postal_code") or "").strip()
        default_company = str(params.get("default_company") or "").strip()
        target_sam_accounts = _parse_target_sam_accounts(params)

        # Accepted as a list or as a comma-separated string.
        current_status_values_param = params.get("current_status_values")
        if isinstance(current_status_values_param, list):
            current_status_values = {str(x).strip() for x in current_status_values_param if str(x).strip()}
        elif str(current_status_values_param or "").strip():
            current_status_values = {x.strip() for x in str(current_status_values_param).split(",") if x.strip()}
        else:
            current_status_values = set()
        location_mappings = params.get("location_mappings")
        location_mappings = location_mappings if isinstance(location_mappings, dict) else None

        ehr = SyncEhrToAdApi(secret_params={"app_key": app_key, "app_secret": app_secret})
        ad = ActiveDirectoryClient(
            ldap_uri=ldap_uri,
            bind_dn=ldap_bind_dn,
            bind_password=ldap_bind_password,
            base_dn=ldap_base_dn,
            user_filter=ldap_user_filter,
            use_starttls=ldap_use_starttls,
            verify_tls=ldap_verify_tls,
            connect_timeout_s=connect_timeout_s,
        )
        try:
            # Check AD connectivity before pulling any EHR data.
            ad.ping()
            logger.info("AD 连接检查通过uri=%s base_dn=%s dry_run=%s", ldap_uri, ldap_base_dn, dry_run)

            emp_res = ehr.get_all_employees_with_record_by_time_window(
                stop_time=stop_time,
                capacity=capacity,
                with_disabled=False,
                is_with_deleted=False,
            )
            org_res = ehr.get_all_organizations_by_time_window(
                stop_time=stop_time,
                capacity=capacity,
                with_disabled=False,
                is_with_deleted=False,
            )
            job_post_res = ehr.get_all_job_posts_by_time_window(
                stop_time=stop_time,
                capacity=capacity,
                with_disabled=False,
                is_with_deleted=False,
            )
            emp_rows = emp_res.get("data") or []
            org_rows = org_res.get("data") or []
            job_post_rows = job_post_res.get("data") or []
            if not isinstance(emp_rows, list) or not isinstance(org_rows, list) or not isinstance(job_post_rows, list):
                raise RuntimeError("EHR result invalid: data is not list")

            # Index organisations and job posts by their object id.
            org_by_oid: dict[str, dict[str, Any]] = {}
            for org in org_rows:
                if not isinstance(org, dict):
                    continue
                oid = str(org.get("oId") or org.get("oid") or org.get("id") or "").strip()
                if oid:
                    org_by_oid[oid] = org
            job_post_by_oid: dict[str, dict[str, Any]] = {}
            for job_post in job_post_rows:
                if not isinstance(job_post, dict):
                    continue
                oid = str(job_post.get("oId") or job_post.get("oid") or job_post.get("id") or "").strip()
                if oid:
                    job_post_by_oid[oid] = job_post

            # Pick one row per AD account and build the userID -> account map
            # used later to resolve managers.
            users_by_sam: dict[str, dict[str, Any]] = {}
            user_id_to_sam: dict[int, str] = {}
            for item in emp_rows:
                if not isinstance(item, dict) or not _is_current_employee(item, current_status_values):
                    continue
                sam = _field_value(item, domain_account_key)
                if not sam:
                    continue
                sam_key = sam.lower()
                user_id = _to_int_safe((item.get("employeeInfo") or {}).get("userID"))
                if user_id > 0:
                    # Recorded before the target filter so that managers outside
                    # the target list can still be resolved.
                    user_id_to_sam[user_id] = sam
                if target_sam_accounts and sam_key not in target_sam_accounts:
                    continue
                existing = users_by_sam.get(sam_key)
                if existing is None:
                    if max_users > 0 and len(users_by_sam) >= max_users:
                        # Limit reached: admit no new accounts, but keep scanning
                        # so user_id_to_sam stays complete (breaking here left
                        # later managers unresolvable) and newer duplicate rows
                        # of already selected accounts are still considered.
                        continue
                    users_by_sam[sam_key] = item
                else:
                    users_by_sam[sam_key] = _choose_better_record(existing, item)
            logger.info(
                "EHR 当前用户准备完成employee_rows=%s current_with_ad_account=%s org_rows=%s job_post_rows=%s target_sam_accounts=%s",
                len(emp_rows),
                len(users_by_sam),
                len(org_rows),
                len(job_post_rows),
                len(target_sam_accounts),
            )

            processed = 0
            updated = 0
            skipped_unchanged = 0
            skipped_missing_sam = 0
            skipped_not_found_ad = 0
            failed = 0
            # manager sAMAccountName -> DN; "" records "looked up, not found".
            manager_dn_cache: dict[str, str] = {}
            # Attributes fetched from AD so they can be compared with the targets.
            ad_compare_attributes = [
                "sAMAccountName",
                "givenName",
                "sn",
                "title",
                "department",
                "manager",
                "proxyAddresses",
                "co",
                "c",
                "countryCode",
                "company",
                "displayName",
                "mail",
                "employeeID",
                "employeeType",
                "mobile",
                "physicalDeliveryOfficeName",
                "postalCode",
                "st",
                "l",
                "streetAddress",
            ]
            if department_code_attr:
                ad_compare_attributes.append(department_code_attr)

            for sam_key, item in users_by_sam.items():
                emp = item.get("employeeInfo") or {}
                rec = item.get("recordInfo") or {}
                if not isinstance(emp, dict):
                    emp = {}
                if not isinstance(rec, dict):
                    rec = {}
                sam = _field_value(item, domain_account_key)
                if not sam:
                    skipped_missing_sam += 1
                    continue
                processed += 1
                # One user's failure is logged and counted, never aborting the run.
                try:
                    ad_user = ad.find_user(sam, attributes=ad_compare_attributes)
                    if not ad_user:
                        skipped_not_found_ad += 1
                        logger.warning("AD 用户不存在跳过sAMAccountName=%s", sam)
                        continue

                    org_oid = str(rec.get("oIdDepartment") or rec.get("OIdDepartment") or "").strip()
                    org = org_by_oid.get(org_oid, {})
                    job_post_oid = str(rec.get("oIdJobPost") or rec.get("OIdJobPost") or "").strip()
                    job_post = job_post_by_oid.get(job_post_oid, {})

                    given_name = _field_value(item, "PhoneticOfMing")
                    surname = _field_value(item, "PhoneticOfXing")
                    name = str(emp.get("name") or emp.get("Name") or "").strip()
                    title = _field_translate_or_value(item, "OIdJobPost") or _job_post_name(job_post)
                    department = _field_translate_or_value(item, "OIdDepartment") or _org_name(org)
                    department_code = _org_code(org)
                    employee_status = _field_translate_or_value(item, "EmployeeStatus")
                    email = _field_value(item, "Email") or _field_value(item, "email")
                    job_number = str(rec.get("jobNumber") or rec.get("JobNumber") or "").strip()
                    mobile = _field_value(item, "MobilePhone") or _extract_mobile_phone(emp)
                    office = _field_translate_or_value(item, "Place")
                    workplace_text = _field_translate_or_value(item, work_location_text_key)
                    street_address = _field_value(item, street_address_key)
                    if verbose_trace:
                        logger.info(
                            "AD 地址字段解析sam=%s street_address_key=%s streetAddress=%r",
                            sam,
                            street_address_key,
                            street_address,
                        )
                    company = default_company or _root_org_name(org, org_by_oid)
                    location_attrs = _location_from_workplace(workplace_text or office, location_mappings)

                    # Resolve the manager DN: EHR manager userID -> account -> AD DN.
                    manager_dn = ""
                    manager_uid = _to_int_safe(rec.get("pOIdEmpAdmin") or rec.get("POIdEmpAdmin"))
                    manager_sam = user_id_to_sam.get(manager_uid, "")
                    if manager_sam:
                        if manager_sam in manager_dn_cache:
                            # Membership test, so cached misses ("") are honoured
                            # instead of triggering another AD search.
                            manager_dn = manager_dn_cache[manager_sam]
                        else:
                            manager_ad_user = ad.find_user(manager_sam)
                            if manager_ad_user:
                                manager_dn = str(manager_ad_user.get("dn") or "")
                            manager_dn_cache[manager_sam] = manager_dn

                    attributes: dict[str, Any] = {
                        "sAMAccountName": sam,
                        "givenName": given_name,
                        "sn": surname,
                        "title": title,
                        "department": department,
                        "manager": manager_dn,
                        "proxyAddresses": _proxy_addresses(email, sam, proxy_alias_domain),
                        "co": location_attrs.get("co"),
                        "c": location_attrs.get("c"),
                        "countryCode": location_attrs.get("countryCode"),
                        "company": company,
                        "displayName": _display_name(given_name, surname, name),
                        "mail": email,
                        "employeeID": job_number,
                        "employeeType": employee_status,
                        "mobile": mobile,
                        "physicalDeliveryOfficeName": office,
                        "postalCode": postal_code,
                        "st": location_attrs.get("st"),
                        "l": location_attrs.get("l"),
                        "streetAddress": street_address,
                    }
                    if department_code_attr and department_code:
                        attributes[department_code_attr] = department_code

                    # Empty targets are skipped by the diff, so nothing is cleared in AD.
                    diff_attributes = _diff_ad_attributes(ad_user.get("attributes") or {}, attributes)
                    if not diff_attributes:
                        skipped_unchanged += 1
                        if verbose_trace:
                            logger.info("AD 用户信息一致跳过更新sam=%s dn=%s", sam, ad_user["dn"])
                        continue

                    changed = ad.modify_user(str(ad_user["dn"]), diff_attributes, dry_run=dry_run)
                    if changed:
                        if verify_after_write and not dry_run:
                            # Read back the written attributes; a remaining
                            # difference counts as a failure for this user.
                            verify_attrs = list(diff_attributes.keys())
                            readback_user = ad.read_user_by_dn(str(ad_user["dn"]), attributes=verify_attrs)
                            readback_attrs = (readback_user or {}).get("attributes") or {}
                            remaining_diff = _diff_ad_attributes(readback_attrs, diff_attributes)
                            if remaining_diff:
                                raise RuntimeError(
                                    "AD modify returned success but readback still differs: "
                                    f"sam={sam!r} dn={ad_user['dn']!r} attrs={sorted(remaining_diff.keys())}"
                                )
                        updated += 1
                        if verbose_trace:
                            logger.info(
                                "AD 用户同步完成sam=%s dn=%s changed_attrs=%s",
                                sam,
                                ad_user["dn"],
                                json.dumps({k: v for k, v in diff_attributes.items() if v}, ensure_ascii=False, default=str),
                            )
                except Exception as e:  # noqa: BLE001
                    failed += 1
                    logger.exception("AD 用户同步失败sam_key=%s error=%r", sam_key, e)

            result = {
                "ok": failed == 0,
                "dry_run": dry_run,
                "filtered_by_target_sam": bool(target_sam_accounts),
                "target_sam_accounts": len(target_sam_accounts),
                "ehr_employee_rows": len(emp_rows),
                "ehr_current_users_with_ad_account": len(users_by_sam),
                "processed": processed,
                "updated": updated,
                "skipped_unchanged": skipped_unchanged,
                "skipped_missing_sam": skipped_missing_sam,
                "skipped_not_found_ad": skipped_not_found_ad,
                "failed": failed,
            }
            logger.info("EHR -> AD 同步结束:%s", result)
            return result
        finally:
            ehr.close()