This commit is contained in:
Marsway 2026-03-25 15:51:06 +08:00
parent 48c4816439
commit 92a3a1bed7
2 changed files with 141 additions and 15 deletions

View File

@ -464,3 +464,55 @@ class SyncEhrToOaApi:
len(out),
)
return out
def get_staff_profiles_by_user_ids(self, *, user_ids: list[int], chunk_size: int = 100) -> dict[int, dict[str, Any]]:
"""
调用 UserFrameworkApiV3 获取员工信息 userId
接口GET /UserFrameworkApiV3/api/v1/staffs/Get
返回{userId: staff_profile}
"""
if chunk_size <= 0:
chunk_size = 100
clean_ids: list[int] = []
seen: set[int] = set()
for u in user_ids:
try:
uid = int(u)
except Exception:
continue
if uid <= 0 or uid in seen:
continue
seen.add(uid)
clean_ids.append(uid)
if not clean_ids:
return {}
out: dict[int, dict[str, Any]] = {}
for i in range(0, len(clean_ids), chunk_size):
chunk = clean_ids[i : i + chunk_size]
for uid in chunk:
resp = self._client.request(
"GET",
"/UserFrameworkApiV3/api/v1/staffs/Get",
params={"userId": uid},
)
payload = resp.json() if resp.content else {}
# 兼容多种返回结构
data = payload.get("data", payload)
if isinstance(data, list):
items = [x for x in data if isinstance(x, dict)]
if items:
out[uid] = items[0]
continue
if isinstance(data, dict):
# 可能是单条,也可能是 map
if "userId" in data or "UserId" in data:
out[uid] = data
else:
# map 场景key=uid
d2 = data.get(str(uid))
if isinstance(d2, dict):
out[uid] = d2
# 非 200 业务码场景不硬失败,避免单个用户影响全量
logger.info("EHR 员工详情查询完成input_user_ids=%s matched_profiles=%s", len(clean_ids), len(out))
return out

View File

@ -27,6 +27,8 @@ _OA_SQLSERVER_SCHEMA = "dbo"
_OA_SQLSERVER_TABLE = "formmain_20250359"
_OA_SQLSERVER_JOB_NO_COLUMN = "field0001"
_OA_SQLSERVER_ID_COLUMN = "id"
_EHR_RD_ATTR_KEY = "extyfsx_606508_585814777"
_EHR_HRBP_ID_KEY = "extdyhrbp_606508_1933587232"
def _cell_value(cell: Any) -> str:
@ -103,6 +105,49 @@ def _prefer_non_empty(new_val: Any, old_val: Any) -> str:
return str(old_val or "").strip()
def _rd_attr_to_text(raw: Any) -> str:
s = str(raw or "").strip()
if s == "1":
return "研发"
if s == "0":
return "非研发"
return ""
def _to_int_safe(v: Any) -> int:
try:
return int(str(v).strip())
except Exception:
return 0
def _extract_reporting_user_id(staff_profile: dict[str, Any]) -> int:
"""
staff/Get 返回中解析汇报人 userId兼容大小写与 list/dict 结构
"""
if not isinstance(staff_profile, dict):
return 0
reportings = staff_profile.get("Reportings", staff_profile.get("reportings"))
if isinstance(reportings, list) and reportings:
first = reportings[0]
if isinstance(first, dict):
return _to_int_safe(first.get("userid", first.get("userId")))
if isinstance(reportings, dict):
return _to_int_safe(reportings.get("userid", reportings.get("userId")))
return 0
def _extract_staff_code(staff_profile: dict[str, Any]) -> str:
if not isinstance(staff_profile, dict):
return ""
return str(
staff_profile.get("staffCode")
or staff_profile.get("StaffCode")
or staff_profile.get("employeeNo")
or ""
).strip()
def _extract_oa_row_id_and_fields(row: dict[str, Any]) -> tuple[int | None, dict[str, Any]]:
"""
兼容不同 OA export 返回结构提取
@ -227,7 +272,6 @@ class SyncEhrToOaFormJob(BaseJob):
if not app_key or not app_secret:
raise ValueError("secret_cfg.app_key and secret_cfg.app_secret are required")
rd_attr_custom_key = str(params.get("rd_attr_custom_key") or "").strip() or None
domain_custom_key = str(params.get("domain_account_custom_key") or "").strip() or None
verbose_trace = _to_bool_or_none(params.get("verbose_trace"))
if verbose_trace is None:
@ -324,6 +368,36 @@ class SyncEhrToOaFormJob(BaseJob):
len(contract_user_ids),
len(first_party_by_user_id),
)
# 3.2) 根据员工 userId 查询 staff 信息,用于解析汇报人与工号映射
staff_profile_by_user_id = ehr.get_staff_profiles_by_user_ids(user_ids=contract_user_ids)
reporting_user_ids: list[int] = []
for p in staff_profile_by_user_id.values():
rid = _extract_reporting_user_id(p)
if rid > 0:
reporting_user_ids.append(rid)
# 3.3) 收集 HRBP userId来自 recordInfo 自定义字段),并统一反查工号
hrbp_user_ids: list[int] = []
for item in ehr_by_job_no.values():
rec = item.get("recordInfo") or {}
if not isinstance(rec, dict):
continue
hrbp_uid = _to_int_safe(rec.get(_EHR_HRBP_ID_KEY) or _custom_prop_value(rec.get("customProperties"), _EHR_HRBP_ID_KEY))
if hrbp_uid > 0:
hrbp_user_ids.append(hrbp_uid)
resolve_user_ids = list({*reporting_user_ids, *hrbp_user_ids})
resolve_profile_by_user_id = ehr.get_staff_profiles_by_user_ids(user_ids=resolve_user_ids)
user_id_to_staff_code: dict[int, str] = {}
for uid, profile in resolve_profile_by_user_id.items():
code = _extract_staff_code(profile)
if code:
user_id_to_staff_code[int(uid)] = code
logger.info(
"人员工号反查完成staff_profiles=%s reportings=%s hrbp_ids=%s resolved_staff_codes=%s",
len(staff_profile_by_user_id),
len(reporting_user_ids),
len(hrbp_user_ids),
len(user_id_to_staff_code),
)
if verbose_trace:
for job_no in list(ehr_by_job_no.keys()):
logger.info("EHR 工号明细raw=%s norm=%s", job_no, _normalize_job_no(job_no))
@ -347,15 +421,15 @@ class SyncEhrToOaFormJob(BaseJob):
org = org_by_oid.get(org_oid, {})
company = str(first_party_by_user_id.get(user_id) or str((org or {}).get("name") or ""))
name = str(emp.get("name") or "")
rd_attr = _custom_prop_value(rec.get("customProperties"), rd_attr_custom_key) or _custom_prop_value(
emp.get("customProperties"), rd_attr_custom_key
)
rd_attr = _rd_attr_to_text(_custom_prop_value(emp.get("customProperties"), _EHR_RD_ATTR_KEY))
place = str(rec.get("place") or "")
entry_date = _date_only(rec.get("entryDate"))
leave_date = _date_only(rec.get("lastWorkDate")) or "2099-12-31"
leave_date = _date_only(rec.get("lastWorkDate"))
id_number = str(emp.get("iDNumber") or "")
hrbp = str((org or {}).get("hRBP") or "")
manager = str(rec.get("pOIdEmpAdmin") or "")
hrbp_uid = _to_int_safe(rec.get(_EHR_HRBP_ID_KEY) or _custom_prop_value(rec.get("customProperties"), _EHR_HRBP_ID_KEY))
hrbp = str(user_id_to_staff_code.get(hrbp_uid) or "")
manager_uid = _extract_reporting_user_id(staff_profile_by_user_id.get(user_id, {}))
manager = str(user_id_to_staff_code.get(manager_uid) or "")
is_leaving = "" if _date_only(rec.get("lastWorkDate")) else ""
domain_account = _custom_prop_value(emp.get("customProperties"), domain_custom_key) or str(emp.get("_Name") or "")
logger.info(
@ -572,15 +646,15 @@ class SyncEhrToOaFormJob(BaseJob):
company = str(first_party_by_user_id.get(user_id) or str((org or {}).get("name") or ""))
name = str(emp.get("name") or "")
rd_attr = _custom_prop_value(rec.get("customProperties"), rd_attr_custom_key) or _custom_prop_value(
emp.get("customProperties"), rd_attr_custom_key
)
rd_attr = _rd_attr_to_text(_custom_prop_value(emp.get("customProperties"), _EHR_RD_ATTR_KEY))
place = str(rec.get("place") or "")
entry_date = _date_only(rec.get("entryDate"))
leave_date = _date_only(rec.get("lastWorkDate")) or "2099-12-31"
leave_date = _date_only(rec.get("lastWorkDate"))
id_number = str(emp.get("iDNumber") or "")
hrbp = str((org or {}).get("hRBP") or "")
manager = str(rec.get("pOIdEmpAdmin") or "")
hrbp_uid = _to_int_safe(rec.get(_EHR_HRBP_ID_KEY) or _custom_prop_value(rec.get("customProperties"), _EHR_HRBP_ID_KEY))
hrbp = str(user_id_to_staff_code.get(hrbp_uid) or "")
manager_uid = _extract_reporting_user_id(staff_profile_by_user_id.get(user_id, {}))
manager = str(user_id_to_staff_code.get(manager_uid) or "")
is_leaving = "" if _date_only(rec.get("lastWorkDate")) else ""
domain_account = _custom_prop_value(emp.get("customProperties"), domain_custom_key) or str(emp.get("_Name") or "")
@ -589,8 +663,8 @@ class SyncEhrToOaFormJob(BaseJob):
rd_attr = _prefer_non_empty(rd_attr, _cell_value(existing_field_map.get(display_to_code["研发属性"])))
place = _prefer_non_empty(place, _cell_value(existing_field_map.get(display_to_code["工作地点"])))
entry_date = _prefer_non_empty(entry_date, _cell_value(existing_field_map.get(display_to_code["入职日期"])))
# 离职日期按需求默认 2099-12-31仅当已有值且北森也空时可被已有值覆盖
leave_date = _prefer_non_empty(leave_date, _cell_value(existing_field_map.get(display_to_code["离职日期"])))
# 未离职不填离职日期(保持空),不再回填旧值。
leave_date = str(leave_date or "").strip()
id_number = _prefer_non_empty(id_number, _cell_value(existing_field_map.get(display_to_code["身份证号"])))
hrbp = _prefer_non_empty(hrbp, _cell_value(existing_field_map.get(display_to_code["HRBP"])))
manager = _prefer_non_empty(manager, _cell_value(existing_field_map.get(display_to_code["汇报人"])))