From 48c48164397ee421ad3945299877e8a72874f497 Mon Sep 17 00:00:00 2001 From: Marsway Date: Wed, 25 Mar 2026 15:31:05 +0800 Subject: [PATCH] update --- extensions/sync_ehr_to_oa/api.py | 100 +++++++++++++++++++++++++++++++ extensions/sync_ehr_to_oa/job.py | 73 ++++++++++++---------- 2 files changed, 142 insertions(+), 31 deletions(-) diff --git a/extensions/sync_ehr_to_oa/api.py b/extensions/sync_ehr_to_oa/api.py index c305360..6a5f955 100644 --- a/extensions/sync_ehr_to_oa/api.py +++ b/extensions/sync_ehr_to_oa/api.py @@ -364,3 +364,103 @@ class SyncEhrToOaApi: is_with_deleted=is_with_deleted, max_pages=max_pages, ) + + @staticmethod + def _pick_company_from_contracts(contracts: list[dict[str, Any]]) -> str: + if not contracts: + return "" + + def _sort_key(item: dict[str, Any]) -> str: + return str(item.get("effectiveDate") or item.get("createdTime") or item.get("modifiedTime") or "") + + sorted_items = sorted([x for x in contracts if isinstance(x, dict)], key=_sort_key, reverse=True) + for c in sorted_items: + first_party = str(c.get("firstParty") or "").strip() + if first_party: + return first_party + return "" + + def get_contract_first_party_by_user_ids( + self, + *, + user_ids: list[int], + is_current_effective: bool = True, + status: int | None = 1, + contract_type: int | None = None, + is_with_deleted: bool = False, + columns: list[str] | None = None, + enable_translate: bool = False, + chunk_size: int = 300, + ) -> dict[int, str]: + """ + 调用合同接口按员工 UserID 集合获取所属公司(firstParty)。 + 接口:POST /TenantBaseExternal/api/v5/Contract/GetByUserIds + """ + if chunk_size <= 0 or chunk_size > 300: + raise ValueError("chunk_size must be in range [1, 300]") + if not user_ids: + return {} + + clean_ids: list[int] = [] + seen: set[int] = set() + for u in user_ids: + try: + uid = int(u) + except Exception: + continue + if uid <= 0: + continue + if uid in seen: + continue + seen.add(uid) + clean_ids.append(uid) + if not clean_ids: + return {} + + out: dict[int, str] = {} + for i in range(0, len(clean_ids), chunk_size): + chunk = clean_ids[i : i + chunk_size] + body: dict[str, Any] = { + "oIds": chunk, + "isCurrentEffective": is_current_effective, + "isWithDeleted": is_with_deleted, + "enableTranslate": enable_translate, + } + if status is not None: + body["status"] = status + if contract_type is not None: + body["contractType"] = contract_type + if columns is not None: + body["columns"] = columns + + resp = self._client.request( + "POST", + "/TenantBaseExternal/api/v5/Contract/GetByUserIds", + json=body, + headers={"Content-Type": "application/json"}, + ) + payload = resp.json() if resp.content else {} + code = str(payload.get("code", "") or "") + if code != "200": + raise RuntimeError(f"EHR Contract.GetByUserIds failed code={code!r} message={payload.get('message')!r}") + + data = payload.get("data") or {} + if not isinstance(data, dict): + raise RuntimeError("EHR Contract.GetByUserIds invalid response: data is not an object") + + for k, v in data.items(): + try: + uid = int(str(k)) + except Exception: + continue + contracts = v if isinstance(v, list) else [] + company = self._pick_company_from_contracts(contracts) + if company: + out[uid] = company + + logger.info( + "EHR 合同公司查询完成:input_user_ids=%s matched_first_party=%s", + len(clean_ids), + len(out), + ) + return out diff --git a/extensions/sync_ehr_to_oa/job.py b/extensions/sync_ehr_to_oa/job.py index 73417b4..4f58267 100644 --- a/extensions/sync_ehr_to_oa/job.py +++ b/extensions/sync_ehr_to_oa/job.py @@ -103,23 +103,6 @@ def _prefer_non_empty(new_val: Any, old_val: Any) -> str: return str(old_val or "").strip() -def _resolve_org_for_company(record_info: dict[str, Any], org_by_oid: dict[str, dict[str, Any]]) -> tuple[dict[str, Any], str, str]: - """ - 解析“所属公司”使用的组织对象。 - 经验上 oIdOrganization 可能是公司根(导致全员同值),优先尝试 oIdDepartment。 - 返回:(org_obj, source_field, source_oid) - """ - dept_oid = str(record_info.get("oIdDepartment") or "").strip() - org_oid = str(record_info.get("oIdOrganization") or "").strip() - for source, oid in (("oIdDepartment", dept_oid), ("oIdOrganization", org_oid)): - if not oid: - continue - org = org_by_oid.get(oid) or {} - if isinstance(org, dict) and str(org.get("name") or "").strip(): - return org, source, oid - return {}, "", "" - - def _extract_oa_row_id_and_fields(row: dict[str, Any]) -> tuple[int | None, dict[str, Any]]: """ 兼容不同 OA export 返回结构,提取: @@ -317,6 +300,30 @@ class SyncEhrToOaFormJob(BaseJob): len(ehr_by_job_no), len(ehr_by_job_no_norm), ) + # 3.1) 按员工 UserID 查询合同主体公司(firstParty),作为“所属公司”的优先来源 + contract_user_ids: list[int] = [] + for item in ehr_by_job_no.values(): + emp_info = item.get("employeeInfo") or {} + if not isinstance(emp_info, dict): + continue + try: + uid = int(emp_info.get("userID")) + except Exception: + continue + if uid > 0: + contract_user_ids.append(uid) + first_party_by_user_id = ehr.get_contract_first_party_by_user_ids( + user_ids=contract_user_ids, + is_current_effective=True, + status=1, + contract_type=None, + is_with_deleted=False, + ) + logger.info( + "所属公司来源(合同主体)准备完成:input_user_ids=%s matched=%s", + len(contract_user_ids), + len(first_party_by_user_id), + ) if verbose_trace: for job_no in list(ehr_by_job_no.keys()): logger.info("EHR 工号明细:raw=%s norm=%s", job_no, _normalize_job_no(job_no)) @@ -331,8 +338,14 @@ class SyncEhrToOaFormJob(BaseJob): if not isinstance(rec, dict): rec = {} - org, company_source, company_source_oid = _resolve_org_for_company(rec, org_by_oid) - company = str((org or {}).get("name") or "") + user_id = 0 + try: + user_id = int(emp.get("userID")) + except Exception: + user_id = 0 + org_oid = str(rec.get("oIdOrganization") or rec.get("oIdDepartment") or "").strip() + org = org_by_oid.get(org_oid, {}) + company = str(first_party_by_user_id.get(user_id) or str((org or {}).get("name") or "")) name = str(emp.get("name") or "") rd_attr = _custom_prop_value(rec.get("customProperties"), rd_attr_custom_key) or _custom_prop_value( emp.get("customProperties"), rd_attr_custom_key @@ -346,11 +359,9 @@ class SyncEhrToOaFormJob(BaseJob): is_leaving = "是" if _date_only(rec.get("lastWorkDate")) else "否" domain_account = _custom_prop_value(emp.get("customProperties"), domain_custom_key) or str(emp.get("_Name") or "") logger.info( - "EHR 字段预览:job_no=%s company=%s company_source=%s company_source_oid=%s name=%s rd_attr=%s place=%s entry_date=%s leave_date=%s id_number=%s hrbp=%s manager=%s is_leaving=%s domain_account=%s", + "EHR 字段预览:job_no=%s company=%s name=%s rd_attr=%s place=%s entry_date=%s leave_date=%s id_number=%s hrbp=%s manager=%s is_leaving=%s domain_account=%s", job_no, company, - company_source, - company_source_oid, name, rd_attr, place, @@ -550,10 +561,16 @@ class SyncEhrToOaFormJob(BaseJob): if not isinstance(rec, dict): rec = {} - org, company_source, company_source_oid = _resolve_org_for_company(rec, org_by_oid) + user_id = 0 + try: + user_id = int(emp.get("userID")) + except Exception: + user_id = 0 + org_oid = str(rec.get("oIdOrganization") or rec.get("oIdDepartment") or "").strip() + org = org_by_oid.get(org_oid, {}) existing_field_map = oa_fields_by_job_no_norm.get(_normalize_job_no(job_no), {}) - company = str((org or {}).get("name") or "") + company = str(first_party_by_user_id.get(user_id) or str((org or {}).get("name") or "")) name = str(emp.get("name") or "") rd_attr = _custom_prop_value(rec.get("customProperties"), rd_attr_custom_key) or _custom_prop_value( emp.get("customProperties"), rd_attr_custom_key @@ -594,13 +611,7 @@ class SyncEhrToOaFormJob(BaseJob): {"name": display_to_code["域账号"], "value": domain_account, "showValue": domain_account}, ] if verbose_trace: - logger.info( - "字段映射:job_no=%s row_id=%s company_source=%s company_source_oid=%s", - job_no, - oa_record_id, - company_source, - company_source_oid, - ) + logger.info("字段映射:job_no=%s row_id=%s", job_no, oa_record_id) for fld in fields_payload: logger.info("字段映射明细:job_no=%s field=%s value=%s", job_no, fld["name"], fld["value"])