Vastai-ConnectHub/extensions/sync_ehr_to_oa/job.py

483 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import logging
from typing import Any
from app.integrations.seeyon import SeeyonClient
from app.jobs.base import BaseJob
from extensions.sync_ehr_to_oa.api import SyncEhrToOaApi
# Module logger. It uses an explicit dotted name (rather than __name__) so its records
# propagate through the "connecthub" logger hierarchy.
logger = logging.getLogger("connecthub.extensions.sync_ehr_to_oa")
def _cell_value(cell: Any) -> str:
if isinstance(cell, dict):
v = cell.get("value")
if v is None or str(v).strip() == "":
v = cell.get("showValue")
return str(v or "").strip()
return str(cell or "").strip()
def _date_only(s: Any) -> str:
v = str(s or "").strip()
if not v:
return ""
if "T" in v:
return v.split("T", 1)[0]
if " " in v:
return v.split(" ", 1)[0]
return v
def _custom_prop_value(custom_props: Any, key: str | None) -> str:
if not key:
return ""
if not isinstance(custom_props, dict):
return ""
raw = custom_props.get(key)
if isinstance(raw, dict):
val = raw.get("value")
if val is None or str(val).strip() == "":
val = raw.get("showValue")
return str(val or "").strip()
return str(raw or "").strip()
def _to_bool_or_none(v: Any) -> bool | None:
if v is None:
return None
if isinstance(v, bool):
return v
s = str(v).strip().lower()
if s in ("1", "true", "yes", "y", "on"):
return True
if s in ("0", "false", "no", "n", "off", ""):
return False
return bool(v)
def _normalize_job_no(v: Any) -> str:
"""
工号标准化:
- 去首尾空白、去内部空格
- 数值型字符串如 123.0 -> 123常见于表单数字字段
- 统一大写,便于大小写不敏感匹配
"""
s = str(v or "").strip()
if not s:
return ""
s = s.replace(" ", "")
try:
if "." in s and s.endswith(".0"):
i = int(float(s))
s = str(i)
except Exception:
pass
return s.upper()
def _choose_better_record(current: dict[str, Any], candidate: dict[str, Any]) -> dict[str, Any]:
def _score(item: dict[str, Any]) -> str:
record = item.get("recordInfo") or {}
emp = item.get("employeeInfo") or {}
parts = [
str(record.get("businessModifiedTime") or ""),
str(record.get("modifiedTime") or ""),
str(emp.get("businessModifiedTime") or ""),
str(emp.get("modifiedTime") or ""),
str(record.get("createdTime") or ""),
str(emp.get("createdTime") or ""),
]
return "|".join(parts)
return candidate if _score(candidate) >= _score(current) else current
class SyncEhrToOaFormJob(BaseJob):
    """EHR -> OA (workflow-less CAP4 form) field sync.

    Matching key:
    - 工号 (job number): pairs EHR and OA rows; it is never written.

    Synced OA fields, by display name:
    - 所属公司, 姓名, 研发属性, 工作地点, 入职日期, 离职日期, 身份证号,
      HRBP, 汇报人, 在离职, 域账号
    """

    job_id = "sync_ehr_to_oa.sync_form"

    # OA display names the exported form must contain; "工号" is the match key.
    _NEEDED_DISPLAYS = (
        "工号",
        "所属公司",
        "姓名",
        "研发属性",
        "工作地点",
        "入职日期",
        "离职日期",
        "身份证号",
        "HRBP",
        "汇报人",
        "在离职",
        "域账号",
    )

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        """Pull EHR employees/orgs, match OA form rows by job number, batch-update OA.

        Args:
            params: public config.
                Required: ``oa_base_url``, ``oa_form_code``, ``oa_right_id``,
                ``oa_login_name``.
                Optional:
                - ``oa_template_code`` (defaults to ``oa_form_code``);
                - ``oa_master_table_name`` (inferred from the export when absent);
                - ``batch_size`` (default 100; non-positive values reset to 100);
                - ``stop_time``, ``capacity`` (default 300);
                - ``do_trigger``, ``senderLoginName``;
                - ``rd_attr_custom_key``, ``domain_account_custom_key``;
                - ``verbose_trace`` (default on);
                - ``leaving_status_value`` / ``active_status_value``: the text
                  written to 在离职; defaults "离职" / "在职".
            secrets: ``rest_user``, ``rest_password``, ``app_key``,
                ``app_secret`` (required) and ``loginName`` (optional).

        Returns:
            Summary counters plus at most 100 entries of OA ``failedData``.

        Raises:
            ValueError: a required config or secret value is missing.
            RuntimeError: malformed EHR/OA responses, no EHR job number
                matched OA, or a batch-update response that is not valid JSON
                or has a code other than 0.
        """
        oa_base_url = str(params.get("oa_base_url") or "").strip()
        oa_form_code = str(params.get("oa_form_code") or "").strip()
        oa_right_id = str(params.get("oa_right_id") or "").strip()
        oa_login_name = str(params.get("oa_login_name") or "").strip()
        if not oa_base_url:
            raise ValueError("public_cfg.oa_base_url is required")
        if not oa_form_code:
            raise ValueError("public_cfg.oa_form_code is required")
        if not oa_right_id:
            raise ValueError("public_cfg.oa_right_id is required")
        if not oa_login_name:
            raise ValueError("public_cfg.oa_login_name is required")
        oa_template_code = str(params.get("oa_template_code") or oa_form_code).strip()
        oa_master_table_name = str(params.get("oa_master_table_name") or "").strip()
        batch_size = int(params.get("batch_size") or 100)
        if batch_size <= 0:
            batch_size = 100
        stop_time = params.get("stop_time")
        capacity = int(params.get("capacity") or 300)
        do_trigger_bool = _to_bool_or_none(params.get("do_trigger"))
        sender_login_name = params.get("senderLoginName")
        sender_login_name = str(sender_login_name).strip() if sender_login_name else None

        rest_user = str(secrets.get("rest_user") or "").strip()
        rest_password = str(secrets.get("rest_password") or "").strip()
        login_name = secrets.get("loginName")
        login_name = str(login_name).strip() if login_name else None
        if not rest_user or not rest_password:
            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
        app_key = str(secrets.get("app_key") or "").strip()
        app_secret = str(secrets.get("app_secret") or "").strip()
        if not app_key or not app_secret:
            raise ValueError("secret_cfg.app_key and secret_cfg.app_secret are required")

        rd_attr_custom_key = str(params.get("rd_attr_custom_key") or "").strip() or None
        domain_custom_key = str(params.get("domain_account_custom_key") or "").strip() or None
        verbose_trace = _to_bool_or_none(params.get("verbose_trace"))
        if verbose_trace is None:
            verbose_trace = True
        # Text written to 在离职. Previously both branches produced "", so the field
        # was always blanked. NOTE(review): confirm these defaults match the OA
        # field's expected options, or override them via params.
        leaving_value = str(params.get("leaving_status_value") or "离职")
        active_value = str(params.get("active_status_value") or "在职")

        seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
        ehr = None
        try:
            # Created inside the try so a failing constructor still closes `seeyon`.
            ehr = SyncEhrToOaApi(secret_params={"app_key": app_key, "app_secret": app_secret})

            # 1) Pull EHR employees (with job records) and organizations.
            emp_res = ehr.get_all_employees_with_record_by_time_window(stop_time=stop_time, capacity=capacity)
            org_res = ehr.get_all_organizations_by_time_window(stop_time=stop_time, capacity=capacity)
            emp_rows = emp_res.get("data") or []
            org_rows = org_res.get("data") or []
            if not isinstance(emp_rows, list):
                raise RuntimeError("EHR employee result invalid: data is not list")
            if not isinstance(org_rows, list):
                raise RuntimeError("EHR organization result invalid: data is not list")

            # 2) Organization lookup by oId.
            org_by_oid = self._index_organizations(org_rows)

            # 3) Employees grouped by job number; the "newest" record wins on duplicates.
            ehr_by_job_no, ehr_by_job_no_norm = self._index_ehr_employees(emp_rows)
            logger.info(
                "EHR 数据准备完成employee_rows=%s organization_rows=%s distinct_job_numbers=%s distinct_job_numbers_norm=%s",
                len(emp_rows),
                len(org_rows),
                len(ehr_by_job_no),
                len(ehr_by_job_no_norm),
            )
            if verbose_trace:
                for job_no in ehr_by_job_no:
                    logger.info("EHR 工号明细raw=%s norm=%s", job_no, _normalize_job_no(job_no))

            # 4) Export the OA form: field code mapping plus job number -> record id.
            exp_resp = seeyon.export_cap4_form_soap(
                templateCode=oa_template_code,
                senderLoginName=sender_login_name,
                rightId=oa_right_id,
            )
            definition, display_to_code, rows = self._parse_oa_export(exp_resp, oa_template_code)
            if not oa_master_table_name:
                oa_master_table_name = self._infer_master_table_name(definition, rows)
            if not oa_master_table_name:
                raise RuntimeError("public_cfg.oa_master_table_name is required (cannot infer from OA export)")
            logger.info(
                "OA 表单解析完成template=%s master_table=%s form_rows=%s",
                oa_template_code,
                oa_master_table_name,
                len(rows),
            )
            job_field_code = display_to_code["工号"]
            oa_id_by_job_no, oa_id_by_job_no_norm = self._index_oa_rows(rows, job_field_code)
            logger.info(
                "OA 工号索引完成indexed_job_numbers=%s indexed_job_numbers_norm=%s",
                len(oa_id_by_job_no),
                len(oa_id_by_job_no_norm),
            )
            if verbose_trace:
                for job_no, row_id in oa_id_by_job_no.items():
                    logger.info("OA 工号索引明细raw=%s norm=%s row_id=%s", job_no, _normalize_job_no(job_no), row_id)

            # 5) Build batch-update payloads: exact job number first, then normalized.
            data_list: list[dict[str, Any]] = []
            not_found_in_oa = 0
            unmatched_samples: list[str] = []
            for job_no, item in ehr_by_job_no.items():
                job_no_norm = _normalize_job_no(job_no)
                oa_record_id = oa_id_by_job_no.get(job_no)
                matched_by = "raw"
                if oa_record_id is None:
                    oa_record_id = oa_id_by_job_no_norm.get(job_no_norm)
                    matched_by = "normalized"
                if oa_record_id is None:
                    not_found_in_oa += 1
                    if len(unmatched_samples) < 20:
                        unmatched_samples.append(job_no)
                    if verbose_trace:
                        logger.info("匹配失败job_no=%s norm=%s", job_no, job_no_norm)
                    continue
                if verbose_trace:
                    logger.info(
                        "匹配成功job_no=%s norm=%s row_id=%s matched_by=%s",
                        job_no,
                        job_no_norm,
                        oa_record_id,
                        matched_by,
                    )
                fields_payload = self._build_fields_payload(
                    item,
                    org_by_oid,
                    display_to_code,
                    rd_attr_custom_key=rd_attr_custom_key,
                    domain_custom_key=domain_custom_key,
                    leaving_value=leaving_value,
                    active_value=active_value,
                )
                if verbose_trace:
                    logger.info("字段映射job_no=%s row_id=%s", job_no, oa_record_id)
                    for fld in fields_payload:
                        logger.info("字段映射明细job_no=%s field=%s value=%s", job_no, fld["name"], fld["value"])
                data_list.append(
                    {
                        "masterTable": {
                            "name": oa_master_table_name,
                            "record": {
                                "id": oa_record_id,
                                "fields": fields_payload,
                            },
                            "changedFields": [fld["name"] for fld in fields_payload],
                        },
                        "subTables": [],
                    }
                )
            logger.info(
                "待更新数据准备完成prepared_updates=%s not_found_in_oa=%s",
                len(data_list),
                not_found_in_oa,
            )
            if not data_list:
                raise RuntimeError(
                    "No updates prepared for OA batch-update (check jobNumber matching between EHR and OA, and form field mapping). "
                    f"unmatched_sample={unmatched_samples}"
                )

            # 6) Send the updates in chunks.
            success_count, failed_count, failed_data = self._run_batch_updates(
                seeyon,
                data_list,
                batch_size=batch_size,
                oa_form_code=oa_form_code,
                oa_login_name=oa_login_name,
                oa_right_id=oa_right_id,
                job_field_code=job_field_code,
                do_trigger=do_trigger_bool,
                verbose_trace=verbose_trace,
            )
            return {
                "ehr_total_rows": len(emp_rows),
                "ehr_distinct_job_numbers": len(ehr_by_job_no),
                "oa_existing_job_numbers": len(oa_id_by_job_no),
                "prepared_updates": len(data_list),
                "not_found_in_oa": not_found_in_oa,
                "success_count": success_count,
                "failed_count": failed_count,
                "failed_data": dict(list(failed_data.items())[:100]),
            }
        finally:
            # Nested so that a failing ehr.close() still closes the OA client.
            try:
                if ehr is not None:
                    ehr.close()
            finally:
                seeyon.close()

    @staticmethod
    def _index_organizations(org_rows: list[Any]) -> dict[str, dict[str, Any]]:
        """Map EHR organization ``oId`` -> organization dict (non-dicts and blank ids skipped)."""
        org_by_oid: dict[str, dict[str, Any]] = {}
        for org in org_rows:
            if not isinstance(org, dict):
                continue
            oid = str(org.get("oId") or "").strip()
            if oid:
                org_by_oid[oid] = org
        return org_by_oid

    @staticmethod
    def _index_ehr_employees(
        emp_rows: list[Any],
    ) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
        """Group EHR items by raw and by normalized ``recordInfo.jobNumber``.

        Duplicates are resolved with ``_choose_better_record``. Items without a
        dict ``recordInfo`` or with a blank job number are skipped.
        """
        by_raw: dict[str, dict[str, Any]] = {}
        by_norm: dict[str, dict[str, Any]] = {}
        for item in emp_rows:
            if not isinstance(item, dict):
                continue
            record = item.get("recordInfo") or {}
            if not isinstance(record, dict):
                continue
            job_no = str(record.get("jobNumber") or "").strip()
            if not job_no:
                continue
            existing = by_raw.get(job_no)
            by_raw[job_no] = item if existing is None else _choose_better_record(existing, item)
            job_no_norm = _normalize_job_no(job_no)
            if job_no_norm:
                existing_norm = by_norm.get(job_no_norm)
                by_norm[job_no_norm] = item if existing_norm is None else _choose_better_record(existing_norm, item)
        return by_raw, by_norm

    @classmethod
    def _parse_oa_export(
        cls, exp_resp: Any, oa_template_code: str
    ) -> tuple[dict[str, Any], dict[str, str], list[Any]]:
        """Decode the CAP4 export response into ``(definition, display_to_code, rows)``.

        *display_to_code* maps each field's ``display`` name to its ``name``
        code. Raises RuntimeError when the body is not a JSON object, carries
        a non-success ``code``, lacks the ``data.data`` / ``definition``
        structure, or is missing any of ``_NEEDED_DISPLAYS``.
        """
        raw = exp_resp.text or ""
        logger.info(
            "OA export 返回status=%s content_length=%s template=%s",
            exp_resp.status_code,
            len(raw),
            oa_template_code,
        )
        if raw:
            logger.info("OA export 响应预览:%s", raw[:1000])
        try:
            payload = json.loads(raw) if raw else {}
        except ValueError as e:
            raise RuntimeError(f"OA export 响应不是有效 JSON: err={e!r} preview={raw[:500]!r}") from e
        if not isinstance(payload, dict):
            raise RuntimeError(f"OA export invalid: response is not a JSON object; preview={raw[:500]!r}")
        export_code = payload.get("code")
        export_message = payload.get("message")
        if export_code not in (None, 0, "0"):
            raise RuntimeError(f"OA export failed code={export_code!r} message={export_message!r}")
        outer = payload.get("data") or {}
        form = (outer.get("data") or {}) if isinstance(outer, dict) else None
        if not isinstance(form, dict):
            raise RuntimeError(
                f"OA export invalid: data.data is not an object; payload_keys={list(payload.keys())[:20]}"
            )
        definition = form.get("definition") or {}
        if not isinstance(definition, dict):
            raise RuntimeError("OA export invalid: definition is not an object")
        fields = definition.get("fields") or []
        if not isinstance(fields, list):
            raise RuntimeError("OA export invalid: definition.fields is not a list")
        display_to_code: dict[str, str] = {}
        for field in fields:
            if not isinstance(field, dict):
                continue
            display = str(field.get("display") or "").strip()
            name = str(field.get("name") or "").strip()
            if display and name:
                display_to_code[display] = name
        missing = [x for x in cls._NEEDED_DISPLAYS if x not in display_to_code]
        if missing:
            raise RuntimeError(f"OA export invalid: missing form fields by display names: {missing}")
        rows = form.get("data") or []
        if not isinstance(rows, list):
            raise RuntimeError("OA export invalid: data is not a list")
        return definition, display_to_code, rows

    @staticmethod
    def _infer_master_table_name(definition: dict[str, Any], rows: list[Any]) -> str:
        """Guess the master table name from the export; return "" when none is found.

        The definition keys ``masterTableName``, ``masterTable`` and
        ``masterTableCode`` are tried in that order, then
        ``rows[0].masterTable.name``. A dict value is read via its ``name`` key
        rather than being stringified into a bogus table name.
        """
        for key in ("masterTableName", "masterTable", "masterTableCode"):
            value = definition.get(key)
            if isinstance(value, dict):
                value = value.get("name")
            text = str(value or "").strip()
            if text:
                return text
        if rows and isinstance(rows[0], dict):
            master_tbl = rows[0].get("masterTable")
            if isinstance(master_tbl, dict):
                return str(master_tbl.get("name") or "").strip()
        return ""

    @staticmethod
    def _index_oa_rows(rows: list[Any], job_field_code: str) -> tuple[dict[str, int], dict[str, int]]:
        """Map OA job number (raw and normalized) -> integer record id.

        The id comes from ``row.id``, else ``row.masterDataId``, else
        ``masterData.id``. Rows without a job number or an integer-convertible
        id are skipped. If a job number repeats, the last row wins.
        """
        by_raw: dict[str, int] = {}
        by_norm: dict[str, int] = {}
        for row in rows:
            if not isinstance(row, dict):
                continue
            master = row.get("masterData") or {}
            if not isinstance(master, dict):
                continue
            job_no = _cell_value(master.get(job_field_code))
            if not job_no:
                continue
            row_id_raw = row.get("id")
            if row_id_raw is None:
                row_id_raw = row.get("masterDataId")
            if row_id_raw is None:
                row_id_raw = master.get("id")
            if row_id_raw is None:
                continue
            try:
                row_id = int(str(row_id_raw))
            except ValueError:
                continue
            by_raw[job_no] = row_id
            job_no_norm = _normalize_job_no(job_no)
            if job_no_norm:
                by_norm[job_no_norm] = row_id
        return by_raw, by_norm

    @staticmethod
    def _build_fields_payload(
        item: dict[str, Any],
        org_by_oid: dict[str, dict[str, Any]],
        display_to_code: dict[str, str],
        *,
        rd_attr_custom_key: str | None,
        domain_custom_key: str | None,
        leaving_value: str,
        active_value: str,
    ) -> list[dict[str, Any]]:
        """Map one EHR employee item to the OA ``fields`` list (name/value/showValue dicts).

        Without ``lastWorkDate``, 离职日期 is "2099-12-31" and 在离职 is
        *active_value*. With it, the date and *leaving_value* are written.
        """
        emp = item.get("employeeInfo")
        rec = item.get("recordInfo")
        if not isinstance(emp, dict):
            emp = {}
        if not isinstance(rec, dict):
            rec = {}
        org_oid = str(rec.get("oIdOrganization") or rec.get("oIdDepartment") or "").strip()
        org = org_by_oid.get(org_oid) or {}
        last_work_date = _date_only(rec.get("lastWorkDate"))
        # R&D attribute: the record-level custom property wins over the employee-level one.
        rd_attr = _custom_prop_value(rec.get("customProperties"), rd_attr_custom_key) or _custom_prop_value(
            emp.get("customProperties"), rd_attr_custom_key
        )
        domain_account = _custom_prop_value(emp.get("customProperties"), domain_custom_key) or str(
            emp.get("_Name") or ""
        )
        values = (
            ("所属公司", str(org.get("name") or "")),
            ("姓名", str(emp.get("name") or "")),
            ("研发属性", rd_attr),
            ("工作地点", str(rec.get("place") or "")),
            ("入职日期", _date_only(rec.get("entryDate"))),
            ("离职日期", last_work_date or "2099-12-31"),
            ("身份证号", str(emp.get("iDNumber") or "")),
            ("HRBP", str(org.get("hRBP") or "")),
            ("汇报人", str(rec.get("pOIdEmpAdmin") or "")),
            ("在离职", leaving_value if last_work_date else active_value),
            ("域账号", domain_account),
        )
        return [{"name": display_to_code[display], "value": value, "showValue": value} for display, value in values]

    @staticmethod
    def _run_batch_updates(
        seeyon: Any,
        data_list: list[dict[str, Any]],
        *,
        batch_size: int,
        oa_form_code: str,
        oa_login_name: str,
        oa_right_id: str,
        job_field_code: str,
        do_trigger: bool | None,
        verbose_trace: bool,
    ) -> tuple[int, int, dict[str, str]]:
        """Send *data_list* to OA in chunks of *batch_size*.

        Returns ``(success_count, failed_count, failed_data)``, summed and
        merged across chunks. Raises RuntimeError on the first chunk whose
        response is not a JSON object or whose ``code`` is not 0. Earlier
        chunks have already been submitted at that point.
        """
        success_count = 0
        failed_count = 0
        failed_data: dict[str, str] = {}
        for start in range(0, len(data_list), batch_size):
            chunk = data_list[start : start + batch_size]
            if verbose_trace:
                logger.info("批量更新尝试chunk_index=%s chunk_size=%s", start // batch_size + 1, len(chunk))
                # Per-row dumps include personal data (e.g. 身份证号), so like the
                # other detail logs they are emitted only in verbose mode.
                for row in chunk:
                    try:
                        record = ((row or {}).get("masterTable") or {}).get("record") or {}
                        row_id = record.get("id")
                        fields = record.get("fields") or []
                        logger.info("批量更新行row_id=%s fields_count=%s", row_id, len(fields))
                        for fld in fields:
                            if isinstance(fld, dict):
                                logger.info("批量更新字段row_id=%s field=%s value=%s", row_id, fld.get("name"), fld.get("value"))
                    except Exception:  # noqa: BLE001 - best-effort logging must not abort the sync
                        logger.info("批量更新行日志输出失败,已忽略")
            resp = seeyon.batch_update_cap4_form_soap(
                formCode=oa_form_code,
                loginName=oa_login_name,
                rightId=oa_right_id,
                dataList=chunk,
                # NOTE(review): "uniqueFiled" spelling kept; presumably mirrors the client/API parameter name.
                uniqueFiled=[job_field_code],
                doTrigger=do_trigger,
            )
            try:
                rj = resp.json() if resp.content else {}
            except ValueError as e:
                raise RuntimeError(
                    f"OA batch-update 响应不是有效 JSON: err={e!r} preview={(resp.text or '')[:500]!r}"
                ) from e
            if not isinstance(rj, dict):
                raise RuntimeError(f"OA batch-update invalid: response is not a JSON object ({type(rj).__name__})")
            code = rj.get("code", -1)
            try:
                ok = int(code) == 0
            except (TypeError, ValueError):
                ok = False
            if not ok:
                raise RuntimeError(f"OA batch-update failed code={code} message={rj.get('message')!r}")
            data = rj.get("data") or {}
            if not isinstance(data, dict):
                data = {}
            chunk_success = int(data.get("successCount", 0) or 0)
            chunk_failed = int(data.get("failedCount", 0) or 0)
            success_count += chunk_success
            failed_count += chunk_failed
            fd = data.get("failedData") or {}
            if isinstance(fd, dict):
                for k, v in fd.items():
                    failed_data[str(k)] = str(v)
            logger.info(
                "OA batch-update chunk done chunk_size=%s success=%s failed=%s",
                len(chunk),
                chunk_success,
                chunk_failed,
            )
        return success_count, failed_count, failed_data