Vastai-ConnectHub/extensions/sync_ehr_to_oa/job.py

1020 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import logging
from typing import Any
from app.integrations.seeyon import SeeyonClient
from app.jobs.base import BaseJob
from extensions.sync_ehr_to_oa.api import SyncEhrToOaApi
logger = logging.getLogger("connecthub.extensions.sync_ehr_to_oa")
# OA SQL Server connection settings, hard-coded on request.
# NOTE(review): the username and password below are stored in plain text in
# the source file; consider moving them into the job's secret configuration.
_OA_SQLSERVER_PARAMS: dict[str, Any] = {
    "host": "192.168.30.108",
    "port": 1433,
    "database": "seeyon",
    "username": "SHOADB91",
    "password": "E7nZ8x@12",
    "driver": "ODBC Driver 18 for SQL Server",
    "encrypt": "no",
    "trust_server_certificate": "yes",
    "connect_timeout_s": 10,
}
# Schema, table and column names passed to the SQL Server lookup that maps
# job numbers to OA record ids.
_OA_SQLSERVER_SCHEMA = "dbo"
_OA_SQLSERVER_TABLE = "formmain_20250359"
_OA_SQLSERVER_JOB_NO_COLUMN = "field0001"
_OA_SQLSERVER_ID_COLUMN = "id"
# Keys of EHR custom properties read by the job: R&D attribute, HRBP user id,
# and (default key for) the domain account.
_EHR_RD_ATTR_KEY = "extyfsx_606508_585814777"
_EHR_HRBP_ID_KEY = "extdyhrbp_606508_1933587232"
_EHR_DOMAIN_ACCOUNT_KEY = "extADAccountName_606508_511687157"
def _cell_value(cell: Any) -> str:
    """Return the trimmed text held by a form cell.

    For a dict cell the ``value`` entry is used, falling back to
    ``showValue`` when ``value`` is ``None`` or blank. Any other cell is
    stringified directly. Falsy results collapse to an empty string.
    """
    if not isinstance(cell, dict):
        return str(cell or "").strip()
    primary = cell.get("value")
    has_primary = primary is not None and str(primary).strip() != ""
    chosen = primary if has_primary else cell.get("showValue")
    return str(chosen or "").strip()
def _cell_show_value(cell: Any) -> str:
    """Return the trimmed ``showValue`` of a dict cell, or "" for anything else."""
    shown = cell.get("showValue") if isinstance(cell, dict) else None
    return str(shown or "").strip()
def _date_only(s: Any) -> str:
    """Return the part of a date-time string before its time component.

    The input is stringified and trimmed; everything from the first ``"T"``
    (checked first) or the first space onward is dropped. Text containing
    neither separator is returned as is, and falsy input yields "".
    """
    text = str(s or "").strip()
    for separator in ("T", " "):
        head, found, _rest = text.partition(separator)
        if found:
            return head
    return text
def _custom_prop_value(custom_props: Any, key: str | None) -> str:
    """Look up ``key`` in a custom-properties dict and return it as trimmed text.

    Returns "" when ``key`` is empty or ``custom_props`` is not a dict. A dict
    entry is unwrapped to its ``value``, falling back to ``showValue`` when
    ``value`` is ``None`` or blank.
    """
    if not key or not isinstance(custom_props, dict):
        return ""
    entry = custom_props.get(key)
    if isinstance(entry, dict):
        inner = entry.get("value")
        if inner is None or not str(inner).strip():
            inner = entry.get("showValue")
        entry = inner
    return str(entry or "").strip()
def _to_bool_or_none(v: Any) -> bool | None:
    """Interpret a loosely typed flag.

    ``None`` and real booleans pass through unchanged. Otherwise the value is
    stringified, trimmed and lower-cased: "1/true/yes/y/on" give ``True``,
    "0/false/no/n/off" and the empty string give ``False``. Anything else
    falls back to the truthiness of the original value.
    """
    if v is None or isinstance(v, bool):
        return v
    truthy_tokens = ("1", "true", "yes", "y", "on")
    falsy_tokens = ("0", "false", "no", "n", "off", "")
    token = str(v).strip().lower()
    if token in truthy_tokens:
        return True
    if token in falsy_tokens:
        return False
    return bool(v)
def _normalize_job_no(v: Any) -> str:
    """
    Normalize a job number for matching.

    - Strips leading/trailing whitespace and removes inner spaces.
    - Collapses numeric-looking strings such as ``123.0`` to ``123``
      (common for numeric form fields).
    - Upper-cases the result so matching is case-insensitive.

    Pure-digit values ending in ``.0`` are handled as text rather than via
    ``float``: a float round trip silently changes digit strings longer than
    about 16 digits. Leading zeros of such values are dropped, exactly as the
    integer conversion did. Any other ``*.0`` value still goes through the
    float conversion and is left untouched when it cannot be parsed.
    """
    text = str(v or "").strip()
    if not text:
        return ""
    text = text.replace(" ", "")
    if text.endswith(".0"):
        whole = text[:-2]
        if whole.isascii() and whole.isdigit():
            # Exact for any length; "000" collapses to "0".
            text = whole.lstrip("0") or "0"
        else:
            try:
                text = str(int(float(text)))
            except (ValueError, OverflowError):
                # Not numeric after all (e.g. "AB.0"): keep the text as is.
                pass
    return text.upper()
def _prefer_non_empty(new_val: Any, old_val: Any) -> str:
    """Return the trimmed text of ``new_val`` if non-empty, else that of ``old_val``.

    Falsy values count as empty; "" is returned when both are empty.
    """
    for option in (new_val, old_val):
        text = str(option or "").strip()
        if text:
            return text
    return ""
def _rd_attr_to_text(raw: Any) -> str:
    """Translate the raw R&D attribute code into the label written to the form.

    "1" maps to "研发", "0" and "2" map to "非研发"; every other value
    (including falsy input) maps to "".
    """
    labels = {"1": "研发", "0": "非研发", "2": "非研发"}
    code = str(raw or "").strip()
    return labels.get(code, "")
def _to_int_safe(v: Any) -> int:
    """Parse ``v`` as an integer from its trimmed string form; return 0 on any failure."""
    try:
        parsed = int(str(v).strip())
    except Exception:
        parsed = 0
    return parsed
def _extract_reporting_user_id(staff_profile: dict[str, Any]) -> int:
    """
    Extract the reporting manager's user id from a staff profile.

    The reporting data is read from ``Reportings`` or ``reportings`` and may
    be a list (only its first entry is inspected) or a single dict. Several
    spellings of the user-id key are tried in order.

    Unlike a nested ``dict.get(a, dict.get(b))``, the alternative key is also
    consulted when the first key is present but empty or not a positive
    integer.

    Returns:
        The first positive user id found, otherwise 0.
    """
    if not isinstance(staff_profile, dict):
        return 0
    reportings = staff_profile.get("Reportings") or staff_profile.get("reportings")
    if isinstance(reportings, list):
        entry = reportings[0] if reportings else None
    else:
        entry = reportings
    if not isinstance(entry, dict):
        return 0
    for key in ("userid", "userId", "userID", "UserId", "UserID"):
        user_id = _to_int_safe(entry.get(key))
        if user_id > 0:
            return user_id
    return 0
def _extract_staff_code(staff_profile: dict[str, Any]) -> str:
    """Return the first non-empty staff code found in a staff profile.

    Known key spellings are tried in a fixed order first; if none yields a
    value, the lookup is repeated case-insensitively on lower-cased keys.
    Returns "" when nothing is found or the profile is not a dict.
    """
    if not isinstance(staff_profile, dict):
        return ""

    def _text(value: Any) -> str:
        return str(value or "").strip()

    exact_keys = (
        "staffCode",
        "StaffCode",
        "code",
        "Code",
        "jobNumber",
        "JobNumber",
        "employeeNo",
        "EmployeeNo",
    )
    exact_hit = next(
        (t for t in (_text(staff_profile.get(k)) for k in exact_keys) if t),
        "",
    )
    if exact_hit:
        return exact_hit

    # Fallback: ignore key casing entirely.
    folded = {str(k).lower(): val for k, val in staff_profile.items()}
    folded_keys = ("staffcode", "code", "jobnumber", "employeeno")
    return next((t for t in (_text(folded.get(k)) for k in folded_keys) if t), "")
def _extract_mobile_phone(emp_info: dict[str, Any]) -> str:
    """Return the first non-empty phone number found in an employee-info dict.

    A fixed list of key spellings is tried on ``emp_info`` itself; dict values
    are unwrapped to ``value`` (or ``showValue`` when ``value`` is ``None`` or
    blank). If nothing is found, the same keys are tried on
    ``emp_info["translateProperties"]`` without unwrapping. Returns "" when
    no value is found or ``emp_info`` is not a dict.
    """
    if not isinstance(emp_info, dict):
        return ""

    phone_keys = (
        "mobile",
        "mobilePhone",
        "phone",
        "phoneNumber",
        "tel",
        "telephone",
        "cellphone",
        "cellPhone",
        "Mobile",
        "MobilePhone",
        "Phone",
        "PhoneNumber",
        "Tel",
        "Telephone",
        "Cellphone",
        "CellPhone",
    )

    def _unwrap(entry: Any) -> Any:
        # Cell-style dicts carry the text under "value" / "showValue".
        if not isinstance(entry, dict):
            return entry
        inner = entry.get("value")
        if inner is None or str(inner).strip() == "":
            inner = entry.get("showValue")
        return inner

    for phone_key in phone_keys:
        text = str(_unwrap(emp_info.get(phone_key)) or "").strip()
        if text:
            return text

    translated = emp_info.get("translateProperties")
    if not isinstance(translated, dict):
        return ""
    for phone_key in phone_keys:
        text = str(translated.get(phone_key) or "").strip()
        if text:
            return text
    return ""
def _pick_best_member_by_code(members: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Choose one OA member from the candidates returned for a staff code.

    Preference order: the first member that is valid (``isValid``, default
    true) with ``state`` equal to 1 (default 1); then the first member whose
    ``enabled`` flag (default true) is truthy; finally the first member.
    Returns ``None`` for an empty or missing list.
    """
    if not members:
        return None

    # Preferred: usable and currently employed.
    active = next(
        (m for m in members if m.get("isValid", True) and int(m.get("state", 1) or 1) == 1),
        None,
    )
    if active is not None:
        return active

    # Second best: enabled.
    enabled = next((m for m in members if m.get("enabled", True)), None)
    return members[0] if enabled is None else enabled
def _extract_oa_row_id_and_fields(row: dict[str, Any]) -> tuple[int | None, dict[str, Any]]:
    """
    Pull the record id and the field cells out of one exported OA row.

    Three row layouts are understood and merged in this order (later layouts
    overwrite equally named fields):

    - A: ``masterData`` is a dict ``{field0001: cell, ...}``; the id comes
      from ``row["id"]``, ``row["masterDataId"]`` or ``masterData["id"]``.
    - B: ``masterTable.record.fields`` is a list of ``{name, ...}`` dicts;
      ``record["id"]`` is used when no id was found yet.
    - C: ``row["fields"]`` is a list of ``{name, ...}`` dicts.

    Returns:
        ``(row_id, field_map)`` where ``row_id`` is ``None`` if no candidate
        could be converted to ``int`` and ``field_map`` maps field code to the
        cell object or value.
    """

    def _parse_id(raw: Any) -> int | None:
        if raw is None:
            return None
        try:
            return int(str(raw))
        except Exception:
            return None

    def _index_by_name(entries: Any, target: dict[str, Any]) -> None:
        if not isinstance(entries, list):
            return
        for entry in entries:
            if isinstance(entry, dict):
                label = str(entry.get("name") or "").strip()
                if label:
                    target[label] = entry

    cells: dict[str, Any] = {}
    record_id: int | None = None

    # Layout A: masterData maps field codes straight to cells.
    master_data = row.get("masterData")
    if isinstance(master_data, dict):
        cells.update(
            {k: v for k, v in master_data.items() if isinstance(k, str) and k.startswith("field")}
        )
        for raw_id in (row.get("id"), row.get("masterDataId"), master_data.get("id")):
            record_id = _parse_id(raw_id)
            if record_id is not None:
                break

    # Layout B: masterTable.record.fields is a list of named cells.
    master_table = row.get("masterTable")
    record = master_table.get("record") if isinstance(master_table, dict) else None
    if isinstance(record, dict):
        _index_by_name(record.get("fields"), cells)
        if record_id is None:
            record_id = _parse_id(record.get("id"))

    # Layout C: row-level list of named cells.
    _index_by_name(row.get("fields"), cells)
    return record_id, cells
def _choose_better_record(current: dict[str, Any], candidate: dict[str, Any]) -> dict[str, Any]:
    """
    Return whichever of two EHR items looks more recent.

    Each item is scored by a tuple of timestamp strings taken, in priority
    order, from ``recordInfo`` and ``employeeInfo`` (business-modified,
    modified, then created time). Tuples are compared element by element, so
    a missing timestamp ("") always ranks below a present one. The previous
    implementation joined the parts with "|", which sorts above digits and
    letters and therefore let records with missing or shorter timestamps win.

    Timestamps are compared as plain strings; this assumes both items use the
    same sortable format (e.g. ISO-8601) -- TODO confirm against the EHR API.

    On a tie the candidate wins.
    """

    def _score(item: dict[str, Any]) -> tuple[str, ...]:
        record = item.get("recordInfo")
        emp = item.get("employeeInfo")
        # Tolerate malformed payloads instead of failing on ``.get``.
        if not isinstance(record, dict):
            record = {}
        if not isinstance(emp, dict):
            emp = {}
        return (
            str(record.get("businessModifiedTime") or ""),
            str(record.get("modifiedTime") or ""),
            str(emp.get("businessModifiedTime") or ""),
            str(emp.get("modifiedTime") or ""),
            str(record.get("createdTime") or ""),
            str(emp.get("createdTime") or ""),
        )

    return candidate if _score(candidate) >= _score(current) else current
class SyncEhrToOaFormJob(BaseJob):
    """
    Sync employee fields from EHR into an OA flow-less form.

    Synced fields:
    - Job number (used only as the matching key; never written)
    - Company, name, R&D attribute, work place, entry date, leave date,
      ID number, mobile phone, HRBP, reporting manager, leaving status,
      domain account
    """

    job_id = "sync_ehr_to_oa.sync_form"

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        """
        Run one EHR -> OA synchronisation pass.

        Steps: check SQL Server connectivity, pull employees and organizations
        from EHR, resolve company / HRBP / manager references, export the OA
        form to learn field codes and record ids, build the update payload and
        send it to OA in batches.

        Args:
            params: Public configuration. Required keys: ``oa_base_url``,
                ``oa_form_code``, ``oa_right_id``, ``oa_login_name``.
                Optional keys: ``oa_template_code``, ``oa_master_table_name``,
                ``batch_size``, ``stop_time``, ``capacity``, ``do_trigger``,
                ``senderLoginName``, ``domain_account_custom_key``,
                ``mobile_phone_custom_key``, ``verbose_trace``,
                ``preview_ehr_data``, ``preview_limit``, ``debug_only``.
            secrets: Secret configuration. Required keys: ``rest_user``,
                ``rest_password``, ``app_key``, ``app_secret``. Optional:
                ``loginName``.

        Returns:
            A summary dict with row counts. In ``debug_only`` mode no update
            is sent and the dict carries ``debug_only``/``debug_rows``;
            otherwise it carries ``success_count``, ``failed_count`` and up to
            100 ``failed_data`` entries.

        Raises:
            ValueError: A required configuration value is missing.
            RuntimeError: SQL Server is unreachable, an EHR/OA response has an
                unexpected shape, OA reports a failure code, or an entire
                batch fails.
        """
        oa_base_url = str(params.get("oa_base_url") or "").strip()
        oa_form_code = str(params.get("oa_form_code") or "").strip()
        oa_right_id = str(params.get("oa_right_id") or "").strip()
        oa_login_name = str(params.get("oa_login_name") or "").strip()
        if not oa_base_url:
            raise ValueError("public_cfg.oa_base_url is required")
        if not oa_form_code:
            raise ValueError("public_cfg.oa_form_code is required")
        if not oa_right_id:
            raise ValueError("public_cfg.oa_right_id is required")
        if not oa_login_name:
            raise ValueError("public_cfg.oa_login_name is required")
        oa_template_code = str(params.get("oa_template_code") or oa_form_code).strip()
        oa_master_table_name = str(params.get("oa_master_table_name") or "").strip()
        batch_size = int(params.get("batch_size") or 100)
        if batch_size <= 0:
            batch_size = 100
        stop_time = params.get("stop_time")
        capacity = int(params.get("capacity") or 300)
        do_trigger = params.get("do_trigger")
        sender_login_name = params.get("senderLoginName")
        sender_login_name = str(sender_login_name).strip() if sender_login_name else None

        rest_user = str(secrets.get("rest_user") or "").strip()
        rest_password = str(secrets.get("rest_password") or "").strip()
        login_name = secrets.get("loginName")
        login_name = str(login_name).strip() if login_name else None
        if not rest_user or not rest_password:
            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
        app_key = str(secrets.get("app_key") or "").strip()
        app_secret = str(secrets.get("app_secret") or "").strip()
        if not app_key or not app_secret:
            raise ValueError("secret_cfg.app_key and secret_cfg.app_secret are required")

        domain_custom_key = str(params.get("domain_account_custom_key") or "").strip() or _EHR_DOMAIN_ACCOUNT_KEY
        mobile_custom_key = str(params.get("mobile_phone_custom_key") or "").strip() or None
        # Tracing and preview default to ON when not configured.
        # NOTE(review): both log personal data (ID number, mobile phone) at INFO level.
        verbose_trace = _to_bool_or_none(params.get("verbose_trace"))
        if verbose_trace is None:
            verbose_trace = True
        preview_ehr_data = _to_bool_or_none(params.get("preview_ehr_data"))
        if preview_ehr_data is None:
            preview_ehr_data = True
        preview_limit = int(params.get("preview_limit") or 20)
        if preview_limit <= 0:
            preview_limit = 20
        debug_only = _to_bool_or_none(params.get("debug_only"))
        if debug_only is None:
            debug_only = False

        seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
        try:
            ehr = SyncEhrToOaApi(
                secret_params={"app_key": app_key, "app_secret": app_secret},
                sqlserver_params=_OA_SQLSERVER_PARAMS,
            )
        except Exception:
            # Do not leak the OA client when the EHR client cannot be built.
            seeyon.close()
            raise

        try:
            try:
                ehr.ping_sqlserver()
                logger.info(
                    "SQLServer 连通性检查通过host=%s db=%s table=%s",
                    _OA_SQLSERVER_PARAMS["host"],
                    _OA_SQLSERVER_PARAMS["database"],
                    _OA_SQLSERVER_TABLE,
                )
            except Exception as e:  # noqa: BLE001
                raise RuntimeError(f"SQLServer 连接失败: {e!r}") from e

            # 1) Pull employee job records and organizations from EHR.
            emp_res = ehr.get_all_employees_with_record_by_time_window(stop_time=stop_time, capacity=capacity)
            org_res = ehr.get_all_organizations_by_time_window(stop_time=stop_time, capacity=capacity)
            emp_rows = emp_res.get("data") or []
            org_rows = org_res.get("data") or []
            if not isinstance(emp_rows, list):
                raise RuntimeError("EHR employee result invalid: data is not list")
            if not isinstance(org_rows, list):
                raise RuntimeError("EHR organization result invalid: data is not list")

            # 2) Organization lookup by oId.
            org_by_oid: dict[str, dict[str, Any]] = {}
            for o in org_rows:
                if not isinstance(o, dict):
                    continue
                oid = str(o.get("oId") or "").strip()
                if oid:
                    org_by_oid[oid] = o

            # 3) Merge employees by job number (keep the "latest" record per job number).
            ehr_by_job_no: dict[str, dict[str, Any]] = {}
            ehr_by_job_no_norm: dict[str, dict[str, Any]] = {}
            ehr_user_id_to_job_no: dict[int, str] = {}
            for item in emp_rows:
                if not isinstance(item, dict):
                    continue
                record = item.get("recordInfo") or {}
                emp_info = item.get("employeeInfo") or {}
                if not isinstance(record, dict):
                    continue
                job_no = str(record.get("jobNumber") or "").strip()
                if not job_no:
                    continue
                existing = ehr_by_job_no.get(job_no)
                ehr_by_job_no[job_no] = item if existing is None else _choose_better_record(existing, item)
                job_no_norm = _normalize_job_no(job_no)
                if job_no_norm:
                    ex2 = ehr_by_job_no_norm.get(job_no_norm)
                    ehr_by_job_no_norm[job_no_norm] = item if ex2 is None else _choose_better_record(ex2, item)
                try:
                    uid = int((emp_info or {}).get("userID"))
                except Exception:
                    uid = 0
                # First job number seen for a user id wins.
                if uid > 0 and uid not in ehr_user_id_to_job_no:
                    ehr_user_id_to_job_no[uid] = job_no
            logger.info(
                "EHR 数据准备完成employee_rows=%s organization_rows=%s distinct_job_numbers=%s distinct_job_numbers_norm=%s userid_jobno_map=%s",
                len(emp_rows),
                len(org_rows),
                len(ehr_by_job_no),
                len(ehr_by_job_no_norm),
                len(ehr_user_id_to_job_no),
            )

            # 3.1) Look up the contract first party by employee UserID; it is the
            # preferred source for the "company" field.
            contract_user_ids: list[int] = []
            for item in ehr_by_job_no.values():
                emp_info = item.get("employeeInfo") or {}
                if not isinstance(emp_info, dict):
                    continue
                try:
                    uid = int(emp_info.get("userID"))
                except Exception:
                    continue
                if uid > 0:
                    contract_user_ids.append(uid)
            first_party_by_user_id = ehr.get_contract_first_party_by_user_ids(
                user_ids=contract_user_ids,
                is_current_effective=True,
                status=1,
                contract_type=None,
                is_with_deleted=False,
            )
            logger.info(
                "所属公司来源合同主体准备完成input_user_ids=%s matched=%s",
                len(contract_user_ids),
                len(first_party_by_user_id),
            )

            # 3.2) Fetch staff profiles by user id; used to resolve the reporting
            # manager and the user-id -> job-number mapping.
            staff_profile_by_user_id = ehr.get_staff_profiles_by_user_ids(user_ids=contract_user_ids)
            reporting_user_ids: list[int] = []
            for p in staff_profile_by_user_id.values():
                rid = _extract_reporting_user_id(p)
                if rid > 0:
                    reporting_user_ids.append(rid)

            # 3.2.1) Also collect manager user ids straight from the job records so
            # that relying on Reportings alone does not miss data.
            manager_user_ids: list[int] = []
            for item in ehr_by_job_no.values():
                rec2 = item.get("recordInfo") or {}
                if not isinstance(rec2, dict):
                    continue
                mid = _to_int_safe(rec2.get("pOIdEmpAdmin"))
                if mid > 0:
                    manager_user_ids.append(mid)

            # 3.3) Collect HRBP user ids (recordInfo custom field) and resolve all
            # referenced users to job numbers in one go.
            hrbp_user_ids: list[int] = []
            for item in ehr_by_job_no.values():
                rec = item.get("recordInfo") or {}
                if not isinstance(rec, dict):
                    continue
                hrbp_uid = _to_int_safe(rec.get(_EHR_HRBP_ID_KEY) or _custom_prop_value(rec.get("customProperties"), _EHR_HRBP_ID_KEY))
                if hrbp_uid > 0:
                    hrbp_user_ids.append(hrbp_uid)
            resolve_user_ids = list({*reporting_user_ids, *manager_user_ids, *hrbp_user_ids})
            resolve_profile_by_user_id = ehr.get_staff_profiles_by_user_ids(user_ids=resolve_user_ids)
            user_id_to_staff_code: dict[int, str] = {}
            for uid, profile in resolve_profile_by_user_id.items():
                code = _extract_staff_code(profile)
                if code:
                    user_id_to_staff_code[int(uid)] = code
            # Fallback: when the staff profile carries no job number, try the
            # employee data pulled in this run.
            for uid in resolve_user_ids:
                if int(uid) not in user_id_to_staff_code and int(uid) in ehr_user_id_to_job_no:
                    user_id_to_staff_code[int(uid)] = ehr_user_id_to_job_no[int(uid)]

            # 3.4) Convert HRBP / manager job numbers to OA member ids (required by
            # member-type fields).
            need_member_codes = list({c for c in user_id_to_staff_code.values() if str(c or "").strip()})
            code_to_member: dict[str, dict[str, str]] = {}
            for code in need_member_codes:
                try:
                    members = seeyon.get_org_members_by_code(code=code, pageNo=0, pageSize=20)
                    best = _pick_best_member_by_code(members)
                    if not best:
                        continue
                    member_id = str(best.get("id") or "").strip()
                    member_name = str(best.get("name") or best.get("loginName") or "").strip()
                    member_code = str(best.get("code") or code).strip()
                    if member_id:
                        code_to_member[code] = {"id": member_id, "name": member_name, "code": member_code}
                except Exception as e:  # noqa: BLE001
                    # Best effort: one failed lookup must not abort the whole run.
                    logger.warning("OA 人员查询失败code=%s err=%r", code, e)
            logger.info(
                "人员工号反查完成staff_profiles=%s reportings=%s manager_ids=%s hrbp_ids=%s resolved_staff_codes=%s resolved_member_ids=%s",
                len(staff_profile_by_user_id),
                len(reporting_user_ids),
                len(manager_user_ids),
                len(hrbp_user_ids),
                len(user_id_to_staff_code),
                len(code_to_member),
            )
            if verbose_trace:
                sample_codes = list(user_id_to_staff_code.items())[:30]
                logger.info("人员工号反查样本(uid->code)=%s", sample_codes)
                for job_no in list(ehr_by_job_no.keys()):
                    logger.info("EHR 工号明细raw=%s norm=%s", job_no, _normalize_job_no(job_no))

            if preview_ehr_data:
                # Log-only preview of the values derived from EHR for the first rows.
                logger.info("EHR 字段预览开始limit=%s", preview_limit)
                count = 0
                for job_no, item in ehr_by_job_no.items():
                    emp = item.get("employeeInfo") or {}
                    rec = item.get("recordInfo") or {}
                    if not isinstance(emp, dict):
                        emp = {}
                    if not isinstance(rec, dict):
                        rec = {}
                    user_id = 0
                    try:
                        user_id = int(emp.get("userID"))
                    except Exception:
                        user_id = 0
                    org_oid = str(rec.get("oIdOrganization") or rec.get("oIdDepartment") or "").strip()
                    org = org_by_oid.get(org_oid, {})
                    company = str(first_party_by_user_id.get(user_id) or str((org or {}).get("name") or ""))
                    name = str(emp.get("name") or "")
                    rd_attr = _rd_attr_to_text(_custom_prop_value(emp.get("customProperties"), _EHR_RD_ATTR_KEY))
                    rec_translate = rec.get("translateProperties") or {}
                    place = str((rec_translate or {}).get("PlaceText") or rec.get("place") or "")
                    entry_date = _date_only(rec.get("entryDate"))
                    leave_date = _date_only(rec.get("lastWorkDate"))
                    id_number = str(emp.get("iDNumber") or "")
                    mobile_phone = _extract_mobile_phone(emp)
                    if not mobile_phone and mobile_custom_key:
                        mobile_phone = _custom_prop_value(emp.get("customProperties"), mobile_custom_key)
                    hrbp_uid = _to_int_safe(rec.get(_EHR_HRBP_ID_KEY) or _custom_prop_value(rec.get("customProperties"), _EHR_HRBP_ID_KEY))
                    hrbp_code = str(user_id_to_staff_code.get(hrbp_uid) or "")
                    hrbp_member = code_to_member.get(hrbp_code, {})
                    hrbp = str(hrbp_member.get("id") or "")
                    hrbp_show = str(hrbp_member.get("name") or hrbp_code)
                    manager_uid = _extract_reporting_user_id(staff_profile_by_user_id.get(user_id, {}))
                    if manager_uid <= 0:
                        manager_uid = _to_int_safe(rec.get("pOIdEmpAdmin"))
                    manager_code = str(user_id_to_staff_code.get(manager_uid) or "")
                    manager_member = code_to_member.get(manager_code, {})
                    manager = str(manager_member.get("id") or "")
                    manager_show = str(manager_member.get("name") or manager_code)
                    # NOTE(review): both branches are empty strings, so this is always "".
                    # The intended labels may have been lost -- confirm.
                    is_leaving = "" if _date_only(rec.get("lastWorkDate")) else ""
                    domain_account = _custom_prop_value(emp.get("customProperties"), domain_custom_key)
                    logger.info(
                        "EHR 字段预览job_no=%s company=%s name=%s rd_attr=%s place=%s entry_date=%s leave_date=%s id_number=%s mobile_phone=%s hrbp_id=%s hrbp_show=%s manager_id=%s manager_show=%s is_leaving=%s domain_account=%s",
                        job_no,
                        company,
                        name,
                        rd_attr,
                        place,
                        entry_date,
                        leave_date,
                        id_number,
                        mobile_phone,
                        hrbp,
                        hrbp_show,
                        manager,
                        manager_show,
                        is_leaving,
                        domain_account,
                    )
                    count += 1
                    if count >= preview_limit:
                        break
                logger.info("EHR 字段预览结束printed=%s", count)

            # 4) Export the OA form; build the display-name -> field-code map and
            # the job-number -> record-id map.
            exp_resp = seeyon.export_cap4_form_soap(
                templateCode=oa_template_code,
                senderLoginName=sender_login_name,
                rightId=oa_right_id,
            )
            raw = exp_resp.text or ""
            logger.info(
                "OA export 返回status=%s content_length=%s template=%s",
                exp_resp.status_code,
                len(raw),
                oa_template_code,
            )
            if raw:
                logger.info("OA export 响应预览:%s", raw[:1000])
            try:
                payload = json.loads(raw) if raw else {}
            except Exception as e:  # noqa: BLE001
                raise RuntimeError(f"OA export 响应不是有效 JSON: err={e!r} preview={raw[:500]!r}") from e
            export_code = payload.get("code")
            export_message = payload.get("message")
            if export_code not in (None, 0, "0"):
                raise RuntimeError(f"OA export failed code={export_code!r} message={export_message!r}")
            outer = payload.get("data") or {}
            form = outer.get("data") or {}
            if not isinstance(form, dict):
                raise RuntimeError(
                    f"OA export invalid: data.data is not an object; payload_keys={list(payload.keys())[:20]}"
                )
            definition = form.get("definition") or {}
            fields = definition.get("fields") or []
            if not isinstance(fields, list):
                raise RuntimeError("OA export invalid: definition.fields is not a list")
            display_to_code: dict[str, str] = {}
            for f in fields:
                if not isinstance(f, dict):
                    continue
                display = str(f.get("display") or "").strip()
                name = str(f.get("name") or "").strip()
                if display and name:
                    display_to_code[display] = name
            needed_displays = [
                "工号",
                "所属公司",
                "姓名",
                "研发属性",
                "工作地点",
                "入职日期",
                "离职日期",
                "身份证号",
                "手机号",
                "HRBP",
                "汇报人",
                "在离职",
                "域账号",
            ]
            missing = [x for x in needed_displays if x not in display_to_code]
            if missing:
                raise RuntimeError(f"OA export invalid: missing form fields by display names: {missing}")
            rows = form.get("data") or []
            if not isinstance(rows, list):
                raise RuntimeError("OA export invalid: data is not a list")
            if not oa_master_table_name:
                for key in ("masterTableName", "masterTable", "masterTableCode"):
                    v = str((definition or {}).get(key) or "").strip()
                    if v:
                        oa_master_table_name = v
                        break
            if not oa_master_table_name and fields:
                first_field = fields[0] if isinstance(fields[0], dict) else {}
                oa_master_table_name = str(first_field.get("tableName") or "").strip()
            # Keep in line with the SQL Server lookup target (the hard-coded table wins).
            # NOTE(review): this assignment is unconditional, so the configured/inferred
            # name above is always overridden and the emptiness check below cannot fire
            # while the constant is non-empty.
            oa_master_table_name = _OA_SQLSERVER_TABLE
            if not oa_master_table_name:
                raise RuntimeError("public_cfg.oa_master_table_name is required (cannot infer from OA export)")
            logger.info(
                "OA 表单解析完成template=%s master_table=%s",
                oa_template_code,
                oa_master_table_name,
            )

            # Index the exported rows by job number; the existing field values are
            # used later as fallbacks so existing data is not overwritten with blanks.
            oa_fields_by_job_no_norm: dict[str, dict[str, Any]] = {}
            export_id_by_job_no: dict[str, int] = {}
            export_id_by_job_no_norm: dict[str, int] = {}
            for row in rows:
                if not isinstance(row, dict):
                    continue
                rid, field_map = _extract_oa_row_id_and_fields(row)
                job_cell = field_map.get(display_to_code["工号"])
                job_no = _cell_value(job_cell)
                norm = _normalize_job_no(job_no)
                if norm:
                    oa_fields_by_job_no_norm[norm] = field_map
                    if rid is not None:
                        export_id_by_job_no[job_no] = rid
                        export_id_by_job_no_norm[norm] = rid
            logger.info("OA export 字段值索引完成rows=%s indexed_by_job_no=%s", len(rows), len(oa_fields_by_job_no_norm))
            logger.info(
                "OA export 记录ID索引完成raw=%s norm=%s",
                len(export_id_by_job_no),
                len(export_id_by_job_no_norm),
            )

            # Record ids looked up directly in SQL Server, keyed by job number.
            oa_id_by_job_no: dict[str, int] = {}
            oa_id_by_job_no_norm: dict[str, int] = {}
            sql_map = ehr.get_oa_record_id_map_from_sqlserver(
                table_name=_OA_SQLSERVER_TABLE,
                schema=_OA_SQLSERVER_SCHEMA,
                job_numbers=list(ehr_by_job_no.keys()),
                job_no_column=_OA_SQLSERVER_JOB_NO_COLUMN,
                id_column=_OA_SQLSERVER_ID_COLUMN,
            )
            for job_no, row_id in sql_map.items():
                oa_id_by_job_no[job_no] = row_id
                job_no_norm = _normalize_job_no(job_no)
                if job_no_norm:
                    oa_id_by_job_no_norm[job_no_norm] = row_id
            logger.info(
                "OA 工号索引完成SQLServerindexed_job_numbers=%s indexed_job_numbers_norm=%s",
                len(oa_id_by_job_no),
                len(oa_id_by_job_no_norm),
            )

            # Key fix: prefer the record.id from the export (same source as
            # batch-update); the SQL result is only a fallback.
            merged_id_by_job_no = dict(oa_id_by_job_no)
            merged_id_by_job_no_norm = dict(oa_id_by_job_no_norm)
            merged_id_by_job_no.update(export_id_by_job_no)
            merged_id_by_job_no_norm.update(export_id_by_job_no_norm)
            logger.info(
                "OA 工号索引合并完成export 优先, SQL 兜底raw=%s norm=%s",
                len(merged_id_by_job_no),
                len(merged_id_by_job_no_norm),
            )

            # Record a sample of id conflicts to help verify that SQL and the export
            # come from the same data source.
            id_conflict_samples: list[tuple[str, int, int]] = []
            for k, export_id in export_id_by_job_no.items():
                sql_id = oa_id_by_job_no.get(k)
                if sql_id is not None and sql_id != export_id and len(id_conflict_samples) < 20:
                    id_conflict_samples.append((k, sql_id, export_id))
            if id_conflict_samples:
                logger.warning("OA 记录ID冲突样本job_no, sql_id, export_id=%s", id_conflict_samples)
            if verbose_trace:
                for job_no, row_id in list(merged_id_by_job_no.items()):
                    logger.info("OA 工号索引明细raw=%s norm=%s row_id=%s", job_no, _normalize_job_no(job_no), row_id)

            # 5) Build the batch-update payload.
            data_list: list[dict[str, Any]] = []
            not_found_in_oa = 0
            unmatched_samples: list[str] = []
            debug_rows = 0
            place_debug_count = 0
            for job_no, item in ehr_by_job_no.items():
                # Match on the raw job number first, then on the normalized form.
                oa_record_id = merged_id_by_job_no.get(job_no)
                matched_by = "raw"
                if oa_record_id is None:
                    oa_record_id = merged_id_by_job_no_norm.get(_normalize_job_no(job_no))
                    matched_by = "normalized"
                if oa_record_id is None:
                    not_found_in_oa += 1
                    if len(unmatched_samples) < 20:
                        unmatched_samples.append(job_no)
                    if verbose_trace:
                        logger.info("匹配失败job_no=%s norm=%s", job_no, _normalize_job_no(job_no))
                    continue
                if verbose_trace:
                    logger.info(
                        "匹配成功job_no=%s norm=%s row_id=%s matched_by=%s",
                        job_no,
                        _normalize_job_no(job_no),
                        oa_record_id,
                        matched_by,
                    )
                emp = item.get("employeeInfo") or {}
                rec = item.get("recordInfo") or {}
                if not isinstance(emp, dict):
                    emp = {}
                if not isinstance(rec, dict):
                    rec = {}
                user_id = 0
                try:
                    user_id = int(emp.get("userID"))
                except Exception:
                    user_id = 0
                org_oid = str(rec.get("oIdOrganization") or rec.get("oIdDepartment") or "").strip()
                org = org_by_oid.get(org_oid, {})
                existing_field_map = oa_fields_by_job_no_norm.get(_normalize_job_no(job_no), {})
                company = str(first_party_by_user_id.get(user_id) or str((org or {}).get("name") or ""))
                name = str(emp.get("name") or "")
                rd_attr = _rd_attr_to_text(_custom_prop_value(emp.get("customProperties"), _EHR_RD_ATTR_KEY))
                rec_translate = rec.get("translateProperties") or {}
                place_text = str((rec_translate or {}).get("PlaceText") or "").strip()
                place_code = str(rec.get("place") or "").strip()
                place = str(place_text or place_code)
                entry_date = _date_only(rec.get("entryDate"))
                leave_date = _date_only(rec.get("lastWorkDate"))
                id_number = str(emp.get("iDNumber") or "")
                mobile_phone = _extract_mobile_phone(emp)
                if not mobile_phone and mobile_custom_key:
                    mobile_phone = _custom_prop_value(emp.get("customProperties"), mobile_custom_key)
                hrbp_uid = _to_int_safe(rec.get(_EHR_HRBP_ID_KEY) or _custom_prop_value(rec.get("customProperties"), _EHR_HRBP_ID_KEY))
                hrbp_code = str(user_id_to_staff_code.get(hrbp_uid) or "")
                hrbp_member = code_to_member.get(hrbp_code, {})
                hrbp = str(hrbp_member.get("id") or "")
                hrbp_show = str(hrbp_member.get("name") or hrbp_code)
                manager_uid = _extract_reporting_user_id(staff_profile_by_user_id.get(user_id, {}))
                if manager_uid <= 0:
                    manager_uid = _to_int_safe(rec.get("pOIdEmpAdmin"))
                manager_code = str(user_id_to_staff_code.get(manager_uid) or "")
                manager_member = code_to_member.get(manager_code, {})
                manager = str(manager_member.get("id") or "")
                manager_show = str(manager_member.get("name") or manager_code)
                # NOTE(review): both branches are empty strings, so the existing OA value
                # is what gets written via _prefer_non_empty below. The intended labels
                # may have been lost -- confirm.
                is_leaving = "" if _date_only(rec.get("lastWorkDate")) else ""
                domain_account = _custom_prop_value(emp.get("customProperties"), domain_custom_key)
                if place_debug_count < 50:
                    logger.info(
                        "工作地点调试job_no=%s PlaceText=%r place_code=%r chosen=%r",
                        job_no,
                        place_text,
                        place_code,
                        place,
                    )
                    place_debug_count += 1
                # NOTE(review): logged for every matched employee regardless of
                # verbose_trace/debug_only, and includes the full employeeInfo and
                # recordInfo payloads (personal data).
                debug_payload = {
                    "job_no": job_no,
                    "ehr_user_id": user_id,
                    "oa_record_id": oa_record_id,
                    "matched_by": matched_by,
                    "ehr_hrbp_user_id": hrbp_uid,
                    "ehr_hrbp_staff_code": hrbp_code,
                    "oa_hrbp_member": {
                        "id": str(hrbp_member.get("id") or ""),
                        "code": str(hrbp_member.get("code") or ""),
                        "name": str(hrbp_member.get("name") or ""),
                    },
                    "ehr_manager_user_id": manager_uid,
                    "ehr_manager_staff_code": manager_code,
                    "oa_manager_member": {
                        "id": str(manager_member.get("id") or ""),
                        "code": str(manager_member.get("code") or ""),
                        "name": str(manager_member.get("name") or ""),
                    },
                    "employeeInfo": emp,
                    "recordInfo": rec,
                }
                logger.info("DEBUG_EMPLOYEE_MAPPING %s", json.dumps(debug_payload, ensure_ascii=False, default=str))
                debug_rows += 1

                # Fall back to the value already stored in OA whenever EHR has none.
                company = _prefer_non_empty(company, _cell_value(existing_field_map.get(display_to_code["所属公司"])))
                name = _prefer_non_empty(name, _cell_value(existing_field_map.get(display_to_code["姓名"])))
                rd_attr = _prefer_non_empty(rd_attr, _cell_value(existing_field_map.get(display_to_code["研发属性"])))
                place = _prefer_non_empty(place, _cell_value(existing_field_map.get(display_to_code["工作地点"])))
                entry_date = _prefer_non_empty(entry_date, _cell_value(existing_field_map.get(display_to_code["入职日期"])))
                # Employees who have not left get no leave date (kept empty); the old
                # OA value is deliberately not back-filled.
                leave_date = str(leave_date or "").strip()
                id_number = _prefer_non_empty(id_number, _cell_value(existing_field_map.get(display_to_code["身份证号"])))
                mobile_phone = _prefer_non_empty(mobile_phone, _cell_value(existing_field_map.get(display_to_code["手机号"])))
                hrbp = _prefer_non_empty(hrbp, _cell_value(existing_field_map.get(display_to_code["HRBP"])))
                manager = _prefer_non_empty(manager, _cell_value(existing_field_map.get(display_to_code["汇报人"])))
                hrbp_show = _prefer_non_empty(hrbp_show, _cell_show_value(existing_field_map.get(display_to_code["HRBP"])))
                manager_show = _prefer_non_empty(manager_show, _cell_show_value(existing_field_map.get(display_to_code["汇报人"])))
                # HRBP / manager are resolved via "EHR userId -> job number -> OA memberId".
                # When that mapping fails, the existing OA value is kept to avoid writing blanks.
                if not hrbp and verbose_trace:
                    logger.warning("HRBP 映射为空job_no=%s hrbp_uid=%s hrbp_code=%s", job_no, hrbp_uid, hrbp_code)
                if not manager and verbose_trace:
                    logger.warning("汇报人映射为空job_no=%s manager_uid=%s manager_code=%s", job_no, manager_uid, manager_code)
                if str(hrbp_member.get("id") or "").strip():
                    hrbp = str(hrbp_member.get("id") or "")
                    hrbp_show = str(hrbp_member.get("name") or hrbp_code)
                if str(manager_member.get("id") or "").strip():
                    manager = str(manager_member.get("id") or "")
                    manager_show = str(manager_member.get("name") or manager_code)
                is_leaving = _prefer_non_empty(is_leaving, _cell_value(existing_field_map.get(display_to_code["在离职"])))
                domain_account = _prefer_non_empty(domain_account, _cell_value(existing_field_map.get(display_to_code["域账号"])))
                fields_payload = [
                    {"name": display_to_code["所属公司"], "value": company, "showValue": company},
                    {"name": display_to_code["姓名"], "value": name, "showValue": name},
                    {"name": display_to_code["研发属性"], "value": rd_attr, "showValue": rd_attr},
                    {"name": display_to_code["工作地点"], "value": place, "showValue": place},
                    {"name": display_to_code["入职日期"], "value": entry_date, "showValue": entry_date},
                    {"name": display_to_code["离职日期"], "value": leave_date, "showValue": leave_date},
                    {"name": display_to_code["身份证号"], "value": id_number, "showValue": id_number},
                    {"name": display_to_code["手机号"], "value": mobile_phone, "showValue": mobile_phone},
                    {"name": display_to_code["HRBP"], "value": hrbp, "showValue": hrbp_show},
                    {"name": display_to_code["汇报人"], "value": manager, "showValue": manager_show},
                    {"name": display_to_code["在离职"], "value": is_leaving, "showValue": is_leaving},
                    {"name": display_to_code["域账号"], "value": domain_account, "showValue": domain_account},
                ]
                if verbose_trace:
                    logger.info("字段映射job_no=%s row_id=%s", job_no, oa_record_id)
                    for fld in fields_payload:
                        logger.info("字段映射明细job_no=%s field=%s value=%s", job_no, fld["name"], fld["value"])
                data_list.append(
                    {
                        "masterTable": {
                            "name": oa_master_table_name,
                            "record": {
                                "id": oa_record_id,
                                "fields": fields_payload,
                            },
                            "changedFields": [f["name"] for f in fields_payload],
                        },
                        "subTables": [],
                    }
                )
            logger.info(
                "待更新数据准备完成prepared_updates=%s not_found_in_oa=%s",
                len(data_list),
                not_found_in_oa,
            )
            if debug_only:
                # Debug mode stops here: nothing is written to OA.
                logger.warning("DEBUG模式已停止 batch-update仅输出映射日志debug_rows=%s", debug_rows)
                return {
                    "debug_only": True,
                    "ehr_total_rows": len(emp_rows),
                    "ehr_distinct_job_numbers": len(ehr_by_job_no),
                    "oa_existing_job_numbers": len(oa_id_by_job_no),
                    "prepared_updates": len(data_list),
                    "not_found_in_oa": not_found_in_oa,
                    "unmatched_sample": unmatched_samples[:50],
                    "debug_rows": debug_rows,
                }

            # 6) Run batch-update in chunks.
            success_count = 0
            failed_count = 0
            failed_data: dict[str, str] = {}
            do_trigger_bool = _to_bool_or_none(do_trigger)
            unique_filed_payload: list[str] | None = None

            def _call_batch_update(*, form_code: str, chunk_rows: list[dict[str, Any]]) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
                """Send one chunk to OA; return (response json, its ``data``, ``failedData``)."""
                resp_local = seeyon.batch_update_cap4_form_soap(
                    formCode=form_code,
                    loginName=oa_login_name,
                    rightId=oa_right_id,
                    dataList=chunk_rows,
                    uniqueFiled=unique_filed_payload,
                    doTrigger=do_trigger_bool,
                )
                rj_local = resp_local.json() if resp_local.content else {}
                code_local = int(rj_local.get("code", -1))
                if code_local != 0:
                    raise RuntimeError(f"OA batch-update failed code={code_local} message={rj_local.get('message')!r} formCode={form_code!r}")
                data_local = rj_local.get("data") or {}
                fd_local = data_local.get("failedData") or {}
                if not isinstance(fd_local, dict):
                    fd_local = {}
                return rj_local, data_local, fd_local

            for i in range(0, len(data_list), batch_size):
                chunk = data_list[i : i + batch_size]
                if verbose_trace:
                    logger.info("批量更新尝试chunk_index=%s chunk_size=%s", i // batch_size + 1, len(chunk))
                    for row in chunk:
                        try:
                            record = (((row or {}).get("masterTable") or {}).get("record") or {})
                            row_id = record.get("id")
                            fields = record.get("fields") or []
                            logger.info("批量更新行row_id=%s fields_count=%s", row_id, len(fields))
                            for fld in fields:
                                if isinstance(fld, dict):
                                    logger.info("批量更新字段row_id=%s field=%s value=%s", row_id, fld.get("name"), fld.get("value"))
                        except Exception:
                            # Trace output only; never let it break the update itself.
                            logger.info("批量更新行日志输出失败,已忽略")
                used_form_code = oa_form_code
                rj, data, fd = _call_batch_update(form_code=used_form_code, chunk_rows=chunk)
                message = str(rj.get("message") or "")
                chunk_success = int(data.get("successCount", 0) or 0)
                chunk_failed = int(data.get("failedCount", 0) or 0)
                success_count += chunk_success
                failed_count += chunk_failed
                for k, v in fd.items():
                    failed_data[str(k)] = str(v)
                # Log a sample of failure reasons so there is more than a bare count.
                if fd:
                    sample_items = list(fd.items())[:20]
                    logger.warning(
                        "OA batch-update failedData sample chunk=%s size=%s formCode=%s message=%s sample=%s",
                        i // batch_size + 1,
                        len(chunk),
                        used_form_code,
                        message,
                        sample_items,
                    )
                logger.info(
                    "OA batch-update chunk done chunk_size=%s success=%s failed=%s formCode=%s message=%s",
                    len(chunk),
                    chunk_success,
                    chunk_failed,
                    used_form_code,
                    message,
                )
                # If a whole chunk fails, raise immediately with a sample instead of
                # letting the job appear to continue normally.
                if chunk_success == 0 and chunk_failed == len(chunk):
                    raise RuntimeError(
                        "OA batch-update chunk all failed; "
                        f"formCode={used_form_code!r}; message={message!r}; failed_sample={list(fd.items())[:20] if isinstance(fd, dict) else fd!r}"
                    )
            return {
                "ehr_total_rows": len(emp_rows),
                "ehr_distinct_job_numbers": len(ehr_by_job_no),
                "oa_existing_job_numbers": len(oa_id_by_job_no),
                "prepared_updates": len(data_list),
                "not_found_in_oa": not_found_in_oa,
                "success_count": success_count,
                "failed_count": failed_count,
                "failed_data": dict(list(failed_data.items())[:100]),
            }
        finally:
            # Close both clients even if the first close() raises.
            try:
                ehr.close()
            finally:
                seeyon.close()