725 lines
33 KiB
Python
725 lines
33 KiB
Python
from __future__ import annotations

import contextlib
import json
import logging
import os
from typing import Any

from app.integrations.seeyon import SeeyonClient
from app.jobs.base import BaseJob
from extensions.sync_ehr_to_oa.api import SyncEhrToOaApi
|
||
|
||
|
||
logger = logging.getLogger("connecthub.extensions.sync_ehr_to_oa")

# OA SQL Server connection (hard-coded per the original requirement).
# SECURITY NOTE: committing database credentials to source control is risky.
# host/port/database/username/password can be overridden through the
# CONNECTHUB_OA_SQLSERVER_* environment variables, so the password can be
# rotated (and eventually removed from the repository) without a code change.
# When a variable is unset, the original hard-coded value is used unchanged.
_OA_SQLSERVER_PARAMS: dict[str, Any] = {
    "host": os.environ.get("CONNECTHUB_OA_SQLSERVER_HOST", "192.168.30.108"),
    "port": int(os.environ.get("CONNECTHUB_OA_SQLSERVER_PORT", "1433")),
    "database": os.environ.get("CONNECTHUB_OA_SQLSERVER_DATABASE", "seeyon"),
    "username": os.environ.get("CONNECTHUB_OA_SQLSERVER_USERNAME", "SHOADB91"),
    "password": os.environ.get("CONNECTHUB_OA_SQLSERVER_PASSWORD", "E7nZ8x@12"),
    "driver": "ODBC Driver 18 for SQL Server",
    "encrypt": "no",
    "trust_server_certificate": "yes",
    "connect_timeout_s": 10,
}
# Schema and table queried for OA record ids. The job also uses this table as
# the batch-update master table name.
_OA_SQLSERVER_SCHEMA = "dbo"
_OA_SQLSERVER_TABLE = "formmain_20250359"
# Column holding the job number (工号) that is matched against EHR jobNumber.
_OA_SQLSERVER_JOB_NO_COLUMN = "field0001"
# Column whose values are used as OA record ids in batch-update.
_OA_SQLSERVER_ID_COLUMN = "id"
|
||
|
||
|
||
def _cell_value(cell: Any) -> str:
    """Return the trimmed text of an OA form cell.

    A dict cell yields its ``value``, or its ``showValue`` when ``value`` is
    missing or blank. Any other object is stringified directly. Falsy results
    become ``""``.
    """
    if not isinstance(cell, dict):
        return str(cell or "").strip()
    raw = cell.get("value")
    blank = raw is None or not str(raw).strip()
    chosen = cell.get("showValue") if blank else raw
    return str(chosen or "").strip()
|
||
|
||
|
||
def _date_only(s: Any) -> str:
|
||
v = str(s or "").strip()
|
||
if not v:
|
||
return ""
|
||
if "T" in v:
|
||
return v.split("T", 1)[0]
|
||
if " " in v:
|
||
return v.split(" ", 1)[0]
|
||
return v
|
||
|
||
|
||
def _custom_prop_value(custom_props: Any, key: str | None) -> str:
    """Read one entry from an EHR ``customProperties`` mapping as trimmed text.

    Returns ``""`` when ``key`` is empty or ``custom_props`` is not a dict.
    A dict entry yields its ``value``, or its ``showValue`` when ``value`` is
    missing or blank. A scalar entry is stringified.
    """
    if not key or not isinstance(custom_props, dict):
        return ""
    entry = custom_props.get(key)
    if not isinstance(entry, dict):
        return str(entry or "").strip()
    picked = entry.get("value")
    if picked is None or not str(picked).strip():
        picked = entry.get("showValue")
    return str(picked or "").strip()
|
||
|
||
|
||
def _to_bool_or_none(v: Any) -> bool | None:
    """Coerce a loosely-typed config flag to ``bool``.

    ``None`` is passed through (meaning "not configured") and real booleans are
    returned as-is. The textual forms 1/true/yes/y/on map to ``True``, and
    0/false/no/n/off and the empty string map to ``False`` (case-insensitive,
    surrounding whitespace ignored). Anything else falls back to Python
    truthiness of the original value.
    """
    if v is None or isinstance(v, bool):
        return v
    token = str(v).strip().lower()
    if token in {"1", "true", "yes", "y", "on"}:
        return True
    if token in {"0", "false", "no", "n", "off", ""}:
        return False
    return bool(v)
|
||
|
||
|
||
def _normalize_job_no(v: Any) -> str:
    """Normalize a job number (工号) so EHR and OA values compare equal.

    - Remove *all* whitespace, leading, trailing and internal. This includes
      tabs and the full-width space U+3000 that often appears in Chinese form
      data.
    - Reduce numeric strings with a ``.0`` suffix (typical of numeric form
      fields, e.g. ``123.0``) to their integer form (``123``). The integer part
      is parsed with ``int`` directly rather than via ``float``, so long
      numeric ids (above 2**53) are not altered by floating-point rounding.
    - Upper-case the result for case-insensitive matching.

    Returns ``""`` for falsy input.
    """
    # str.split() with no argument splits on every Unicode whitespace char.
    s = "".join(str(v or "").split())
    if not s:
        return ""
    if s.endswith(".0"):
        try:
            s = str(int(s[:-2]))
        except ValueError:
            # Not "<integer>.0" (e.g. "1.5.0" or "A1.0"); keep the text as is.
            pass
    return s.upper()
|
||
|
||
|
||
def _prefer_non_empty(new_val: Any, old_val: Any) -> str:
    """Return ``new_val`` as trimmed text, or trimmed ``old_val`` if the new one is blank."""
    fresh = str(new_val or "").strip()
    return fresh if fresh else str(old_val or "").strip()
|
||
|
||
|
||
def _resolve_org_for_company(record_info: dict[str, Any], org_by_oid: dict[str, dict[str, Any]]) -> tuple[dict[str, Any], str, str]:
    """Pick the organization object used for the 所属公司 (company) field.

    In practice ``oIdOrganization`` can point at the company root, which would
    give every employee the same value, so ``oIdDepartment`` is tried first.
    An id is accepted only if it maps, via ``org_by_oid``, to a dict with a
    non-blank ``name``.

    Returns:
        ``(org_obj, source_field, source_oid)``, or ``({}, "", "")`` when
        neither id resolves.
    """
    candidates = [
        (field, str(record_info.get(field) or "").strip())
        for field in ("oIdDepartment", "oIdOrganization")
    ]
    for field, oid in candidates:
        if not oid:
            continue
        org = org_by_oid.get(oid) or {}
        has_name = isinstance(org, dict) and bool(str(org.get("name") or "").strip())
        if has_name:
            return org, field, oid
    return {}, "", ""
|
||
|
||
|
||
def _extract_oa_row_id_and_fields(row: dict[str, Any]) -> tuple[int | None, dict[str, Any]]:
    """Normalize the different OA export row layouts into ``(row_id, field_map)``.

    ``field_map`` maps a field code (e.g. ``field0001``) to its raw cell, which
    is either a cell object or a plain value. Layouts are applied in this
    order, and later ones overwrite same-named fields:

    A. ``masterData = {fieldXXXX: cell, ...}``. Keys starting with ``field``
       are collected. The row id is the first of ``row.id``,
       ``row.masterDataId`` and ``masterData.id`` that parses as an int.
    B. ``masterTable.record.fields = [{name, value, showValue}, ...]``. Here
       ``record.id`` is used only if no id was found yet.
    C. A row-level ``fields`` list with the same shape as B.
    """

    def _as_int(raw: Any) -> int | None:
        # Any value that int(str(...)) cannot parse is treated as "no id".
        if raw is None:
            return None
        try:
            return int(str(raw))
        except Exception:  # noqa: BLE001
            return None

    def _named_fields(items: Any) -> dict[str, Any]:
        # Index a [{name, ...}, ...] list by its non-blank, stripped "name".
        indexed: dict[str, Any] = {}
        if isinstance(items, list):
            for fld in items:
                if not isinstance(fld, dict):
                    continue
                key = str(fld.get("name") or "").strip()
                if key:
                    indexed[key] = fld
        return indexed

    fields_by_code: dict[str, Any] = {}
    record_id: int | None = None

    # Layout A
    master = row.get("masterData")
    if isinstance(master, dict):
        fields_by_code.update(
            (k, v) for k, v in master.items() if isinstance(k, str) and k.startswith("field")
        )
        for candidate in (row.get("id"), row.get("masterDataId"), master.get("id")):
            record_id = _as_int(candidate)
            if record_id is not None:
                break

    # Layout B
    master_table = row.get("masterTable")
    if isinstance(master_table, dict):
        record = master_table.get("record")
        if isinstance(record, dict):
            fields_by_code.update(_named_fields(record.get("fields")))
            if record_id is None:
                record_id = _as_int(record.get("id"))

    # Layout C
    fields_by_code.update(_named_fields(row.get("fields")))

    return record_id, fields_by_code
|
||
|
||
|
||
def _choose_better_record(current: dict[str, Any], candidate: dict[str, Any]) -> dict[str, Any]:
    """Return whichever of two EHR employee items looks more recent.

    Items are ranked by the tuple (recordInfo.businessModifiedTime,
    recordInfo.modifiedTime, employeeInfo.businessModifiedTime,
    employeeInfo.modifiedTime, recordInfo.createdTime,
    employeeInfo.createdTime). The tuple is compared field by field as
    strings, and a missing timestamp ranks lowest. On a tie ``candidate`` is
    returned.

    Why a tuple rather than one "|"-joined string: "|" sorts after every digit.
    With a joined string, an item with no leading timestamp outranked one that
    had it, and "2024-05-01" outranked "2024-05-01 10:00".
    """
    # NOTE(review): timestamps are compared as plain strings, which assumes they
    # share one sortable format such as ISO-8601 — confirm against the EHR API.

    def _section(item: Any, key: str) -> dict[str, Any]:
        # Tolerate a missing or non-dict section instead of raising AttributeError.
        value = item.get(key) if isinstance(item, dict) else None
        return value if isinstance(value, dict) else {}

    def _score(item: Any) -> tuple[str, ...]:
        record = _section(item, "recordInfo")
        emp = _section(item, "employeeInfo")
        return (
            str(record.get("businessModifiedTime") or ""),
            str(record.get("modifiedTime") or ""),
            str(emp.get("businessModifiedTime") or ""),
            str(emp.get("modifiedTime") or ""),
            str(record.get("createdTime") or ""),
            str(emp.get("createdTime") or ""),
        )

    return candidate if _score(candidate) >= _score(current) else current
|
||
|
||
|
||
class SyncEhrToOaFormJob(BaseJob):
    """
    EHR -> OA sync of fields on a workflow-less (无流程) form.

    Synced fields:
    - 工号 (job number): the one-to-one key between EHR and OA records. It is
      matched on and never written.
    - 所属公司, 姓名, 研发属性, 工作地点, 入职日期, 离职日期, 身份证号, HRBP,
      汇报人, 在离职, 域账号.
    """

    job_id = "sync_ehr_to_oa.sync_form"

    # OA form field display names. "工号" is the match key. The synced fields
    # are written in the order listed here.
    _JOB_NO_DISPLAY = "工号"
    _SYNCED_DISPLAYS: tuple[str, ...] = (
        "所属公司",
        "姓名",
        "研发属性",
        "工作地点",
        "入职日期",
        "离职日期",
        "身份证号",
        "HRBP",
        "汇报人",
        "在离职",
        "域账号",
    )
    # Value written to 离职日期 when EHR has no last work date (required default).
    _DEFAULT_LEAVE_DATE = "2099-12-31"

    @staticmethod
    def _mask_id_number(value: Any) -> str:
        """Mask an ID number for logging: keep the first 3 and last 4 characters."""
        s = str(value or "")
        if len(s) <= 7:
            return "*" * len(s)
        return f"{s[:3]}{'*' * (len(s) - 7)}{s[-4:]}"

    @classmethod
    def _build_ehr_values(
        cls,
        item: dict[str, Any],
        org_by_oid: dict[str, dict[str, Any]],
        rd_attr_custom_key: str | None,
        domain_custom_key: str | None,
    ) -> tuple[dict[str, str], str, str]:
        """Derive the EHR-side values of the synced OA fields for one employee item.

        Returns ``(values, company_source, company_source_oid)``. ``values``
        maps each display name in ``_SYNCED_DISPLAYS`` (same order) to its
        text. The other two elements say which record field supplied the
        company organization.
        """
        emp = item.get("employeeInfo") or {}
        rec = item.get("recordInfo") or {}
        if not isinstance(emp, dict):
            emp = {}
        if not isinstance(rec, dict):
            rec = {}

        org, company_source, company_source_oid = _resolve_org_for_company(rec, org_by_oid)
        org = org or {}
        last_work_date = _date_only(rec.get("lastWorkDate"))
        values = {
            "所属公司": str(org.get("name") or ""),
            "姓名": str(emp.get("name") or ""),
            # 研发属性: job-record custom property first, then employee custom property.
            "研发属性": _custom_prop_value(rec.get("customProperties"), rd_attr_custom_key)
            or _custom_prop_value(emp.get("customProperties"), rd_attr_custom_key),
            "工作地点": str(rec.get("place") or ""),
            "入职日期": _date_only(rec.get("entryDate")),
            "离职日期": last_work_date or cls._DEFAULT_LEAVE_DATE,
            "身份证号": str(emp.get("iDNumber") or ""),
            "HRBP": str(org.get("hRBP") or ""),
            "汇报人": str(rec.get("pOIdEmpAdmin") or ""),
            # 在离职 ("是" = has left): derived solely from the presence of a last work date.
            "在离职": "是" if last_work_date else "否",
            "域账号": _custom_prop_value(emp.get("customProperties"), domain_custom_key)
            or str(emp.get("_Name") or ""),
        }
        return values, company_source, company_source_oid

    @classmethod
    def _parse_oa_export(cls, raw: str) -> tuple[dict[str, Any], list[Any], dict[str, str], list[Any]]:
        """Parse the CAP4 form export response body.

        Returns:
            ``(definition, definition_fields, display_to_code, data_rows)``.
            ``display_to_code`` maps a form field's display name to its field
            code.

        Raises:
            RuntimeError: the body is not JSON, reports a non-zero ``code``,
                does not have the expected object/list shape, or lacks a
                required field display name.
        """
        try:
            payload = json.loads(raw) if raw else {}
        except Exception as e:  # noqa: BLE001
            raise RuntimeError(f"OA export 响应不是有效 JSON: err={e!r} preview={raw[:500]!r}") from e
        if not isinstance(payload, dict):
            raise RuntimeError(f"OA export invalid: top-level JSON is {type(payload).__name__}, expected an object")

        export_code = payload.get("code")
        export_message = payload.get("message")
        if export_code not in (None, 0, "0"):
            raise RuntimeError(f"OA export failed code={export_code!r} message={export_message!r}")

        outer = payload.get("data") or {}
        form = (outer.get("data") or {}) if isinstance(outer, dict) else None
        if not isinstance(form, dict):
            raise RuntimeError(
                f"OA export invalid: data.data is not an object; payload_keys={list(payload.keys())[:20]}"
            )

        definition = form.get("definition") or {}
        if not isinstance(definition, dict):
            raise RuntimeError("OA export invalid: definition is not an object")
        fields = definition.get("fields") or []
        if not isinstance(fields, list):
            raise RuntimeError("OA export invalid: definition.fields is not a list")

        display_to_code: dict[str, str] = {}
        for f in fields:
            if not isinstance(f, dict):
                continue
            display = str(f.get("display") or "").strip()
            name = str(f.get("name") or "").strip()
            if display and name:
                display_to_code[display] = name

        needed_displays = [cls._JOB_NO_DISPLAY, *cls._SYNCED_DISPLAYS]
        missing = [x for x in needed_displays if x not in display_to_code]
        if missing:
            raise RuntimeError(f"OA export invalid: missing form fields by display names: {missing}")

        rows = form.get("data") or []
        if not isinstance(rows, list):
            raise RuntimeError("OA export invalid: data is not a list")
        return definition, fields, display_to_code, rows

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        """Run one EHR -> OA sync and return summary statistics.

        Flow:
        1. Validate the configuration and check SQL Server connectivity.
        2. Pull EHR employees and organizations.
        3. Export the OA form to get field codes, current values and record ids.
        4. Look up record ids in SQL Server as a fallback.
        5. Build one update per matched job number.
        6. Batch-update in chunks.

        Args:
            params: public config. oa_base_url, oa_form_code, oa_right_id and
                oa_login_name are required; the rest are optional.
            secrets: secret config. rest_user, rest_password, app_key and
                app_secret are required; loginName is optional.

        Returns:
            Counters for EHR rows, matched and unmatched job numbers, and
            batch-update successes and failures, plus up to 100 ``failedData``
            entries.

        Raises:
            ValueError: a required config entry is missing.
            RuntimeError: SQL Server is unreachable, an EHR/OA response is
                malformed, no job number could be matched, or an entire chunk
                failed to update.
        """
        # --- Configuration --------------------------------------------------
        oa_base_url = str(params.get("oa_base_url") or "").strip()
        oa_form_code = str(params.get("oa_form_code") or "").strip()
        oa_right_id = str(params.get("oa_right_id") or "").strip()
        oa_login_name = str(params.get("oa_login_name") or "").strip()
        if not oa_base_url:
            raise ValueError("public_cfg.oa_base_url is required")
        if not oa_form_code:
            raise ValueError("public_cfg.oa_form_code is required")
        if not oa_right_id:
            raise ValueError("public_cfg.oa_right_id is required")
        if not oa_login_name:
            raise ValueError("public_cfg.oa_login_name is required")

        oa_template_code = str(params.get("oa_template_code") or oa_form_code).strip()
        oa_master_table_name = str(params.get("oa_master_table_name") or "").strip()
        batch_size = int(params.get("batch_size") or 100)
        if batch_size <= 0:
            batch_size = 100

        stop_time = params.get("stop_time")
        capacity = int(params.get("capacity") or 300)
        do_trigger = params.get("do_trigger")
        sender_login_name = params.get("senderLoginName")
        sender_login_name = str(sender_login_name).strip() if sender_login_name else None

        rest_user = str(secrets.get("rest_user") or "").strip()
        rest_password = str(secrets.get("rest_password") or "").strip()
        login_name = secrets.get("loginName")
        login_name = str(login_name).strip() if login_name else None
        if not rest_user or not rest_password:
            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")

        app_key = str(secrets.get("app_key") or "").strip()
        app_secret = str(secrets.get("app_secret") or "").strip()
        if not app_key or not app_secret:
            raise ValueError("secret_cfg.app_key and secret_cfg.app_secret are required")

        rd_attr_custom_key = str(params.get("rd_attr_custom_key") or "").strip() or None
        domain_custom_key = str(params.get("domain_account_custom_key") or "").strip() or None
        # verbose_trace / preview_ehr_data default to True when not configured.
        verbose_trace = _to_bool_or_none(params.get("verbose_trace"))
        if verbose_trace is None:
            verbose_trace = True
        preview_ehr_data = _to_bool_or_none(params.get("preview_ehr_data"))
        if preview_ehr_data is None:
            preview_ehr_data = True
        preview_limit = int(params.get("preview_limit") or 20)
        if preview_limit <= 0:
            preview_limit = 20

        with contextlib.ExitStack() as cleanup:
            # Clients are closed on exit in reverse order (EHR, then OA). The OA
            # client is still closed if building the EHR client or closing it fails.
            seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
            cleanup.callback(seeyon.close)
            ehr = SyncEhrToOaApi(
                secret_params={"app_key": app_key, "app_secret": app_secret},
                sqlserver_params=_OA_SQLSERVER_PARAMS,
            )
            cleanup.callback(ehr.close)

            try:
                ehr.ping_sqlserver()
            except Exception as e:  # noqa: BLE001
                raise RuntimeError(f"SQLServer 连接失败: {e!r}") from e
            logger.info(
                "SQLServer 连通性检查通过:host=%s db=%s table=%s",
                _OA_SQLSERVER_PARAMS["host"],
                _OA_SQLSERVER_PARAMS["database"],
                _OA_SQLSERVER_TABLE,
            )

            # 1) Pull employees (with job records) and organizations from EHR.
            emp_res = ehr.get_all_employees_with_record_by_time_window(stop_time=stop_time, capacity=capacity)
            org_res = ehr.get_all_organizations_by_time_window(stop_time=stop_time, capacity=capacity)
            emp_rows = emp_res.get("data") or []
            org_rows = org_res.get("data") or []
            if not isinstance(emp_rows, list):
                raise RuntimeError("EHR employee result invalid: data is not list")
            if not isinstance(org_rows, list):
                raise RuntimeError("EHR organization result invalid: data is not list")

            # 2) Organization lookup by oId.
            org_by_oid: dict[str, dict[str, Any]] = {}
            for o in org_rows:
                if not isinstance(o, dict):
                    continue
                oid = str(o.get("oId") or "").strip()
                if oid:
                    org_by_oid[oid] = o

            # 3) Deduplicate employees by job number, keeping the "most recent" item.
            ehr_by_job_no: dict[str, dict[str, Any]] = {}
            ehr_by_job_no_norm: dict[str, dict[str, Any]] = {}
            for item in emp_rows:
                if not isinstance(item, dict):
                    continue
                record = item.get("recordInfo") or {}
                if not isinstance(record, dict):
                    continue
                job_no = str(record.get("jobNumber") or "").strip()
                if not job_no:
                    continue
                existing = ehr_by_job_no.get(job_no)
                ehr_by_job_no[job_no] = item if existing is None else _choose_better_record(existing, item)
                job_no_norm = _normalize_job_no(job_no)
                if job_no_norm:
                    ex2 = ehr_by_job_no_norm.get(job_no_norm)
                    ehr_by_job_no_norm[job_no_norm] = item if ex2 is None else _choose_better_record(ex2, item)
            logger.info(
                "EHR 数据准备完成:employee_rows=%s organization_rows=%s distinct_job_numbers=%s distinct_job_numbers_norm=%s",
                len(emp_rows),
                len(org_rows),
                len(ehr_by_job_no),
                len(ehr_by_job_no_norm),
            )
            if verbose_trace:
                for job_no in ehr_by_job_no:
                    logger.info("EHR 工号明细:raw=%s norm=%s", job_no, _normalize_job_no(job_no))

            if preview_ehr_data:
                logger.info("EHR 字段预览开始:limit=%s", preview_limit)
                count = 0
                for job_no, item in ehr_by_job_no.items():
                    values, company_source, company_source_oid = self._build_ehr_values(
                        item, org_by_oid, rd_attr_custom_key, domain_custom_key
                    )
                    logger.info(
                        "EHR 字段预览:job_no=%s company=%s company_source=%s company_source_oid=%s name=%s rd_attr=%s place=%s entry_date=%s leave_date=%s id_number=%s hrbp=%s manager=%s is_leaving=%s domain_account=%s",
                        job_no,
                        values["所属公司"],
                        company_source,
                        company_source_oid,
                        values["姓名"],
                        values["研发属性"],
                        values["工作地点"],
                        values["入职日期"],
                        values["离职日期"],
                        # Personal ID numbers are masked; they should not land in logs.
                        self._mask_id_number(values["身份证号"]),
                        values["HRBP"],
                        values["汇报人"],
                        values["在离职"],
                        values["域账号"],
                    )
                    count += 1
                    if count >= preview_limit:
                        break
                logger.info("EHR 字段预览结束:printed=%s", count)

            # 4) Export the OA form: field codes by display name, current values
            #    and record ids by job number.
            exp_resp = seeyon.export_cap4_form_soap(
                templateCode=oa_template_code,
                senderLoginName=sender_login_name,
                rightId=oa_right_id,
            )
            raw = exp_resp.text or ""
            logger.info(
                "OA export 返回:status=%s content_length=%s template=%s",
                exp_resp.status_code,
                len(raw),
                oa_template_code,
            )
            if raw:
                logger.info("OA export 响应预览:%s", raw[:1000])
            definition, fields, display_to_code, rows = self._parse_oa_export(raw)

            # Master table name: the configured value, else one inferred from the
            # export definition. It is only compared with the hard-coded
            # SQL Server table so that the update target stays aligned with the
            # SQL Server query. A mismatch is logged rather than silently dropped.
            inferred_master_table = oa_master_table_name
            if not inferred_master_table:
                for key in ("masterTableName", "masterTable", "masterTableCode"):
                    v = definition.get(key)
                    if isinstance(v, str) and v.strip():
                        inferred_master_table = v.strip()
                        break
            if not inferred_master_table and fields:
                first_field = fields[0] if isinstance(fields[0], dict) else {}
                inferred_master_table = str(first_field.get("tableName") or "").strip()
            if inferred_master_table and inferred_master_table != _OA_SQLSERVER_TABLE:
                logger.warning(
                    "OA 主表名与 SQLServer 硬编码表不一致,将使用硬编码表:configured_or_inferred=%s used=%s",
                    inferred_master_table,
                    _OA_SQLSERVER_TABLE,
                )
            oa_master_table_name = _OA_SQLSERVER_TABLE
            logger.info(
                "OA 表单解析完成:template=%s master_table=%s",
                oa_template_code,
                oa_master_table_name,
            )

            # Index the export by job number: field values (used as fallbacks so
            # existing OA values are not overwritten with blanks) and record ids.
            job_field_code = display_to_code[self._JOB_NO_DISPLAY]
            oa_fields_by_job_no_norm: dict[str, dict[str, Any]] = {}
            export_id_by_job_no: dict[str, int] = {}
            export_id_by_job_no_norm: dict[str, int] = {}
            for row in rows:
                if not isinstance(row, dict):
                    continue
                rid, field_map = _extract_oa_row_id_and_fields(row)
                job_no = _cell_value(field_map.get(job_field_code))
                norm = _normalize_job_no(job_no)
                if norm:
                    oa_fields_by_job_no_norm[norm] = field_map
                    if rid is not None:
                        export_id_by_job_no[job_no] = rid
                        export_id_by_job_no_norm[norm] = rid
            logger.info("OA export 字段值索引完成:rows=%s indexed_by_job_no=%s", len(rows), len(oa_fields_by_job_no_norm))
            logger.info(
                "OA export 记录ID索引完成:raw=%s norm=%s",
                len(export_id_by_job_no),
                len(export_id_by_job_no_norm),
            )

            # 5) Record ids from SQL Server, queried by the raw EHR job numbers.
            oa_id_by_job_no: dict[str, int] = {}
            oa_id_by_job_no_norm: dict[str, int] = {}
            sql_map = ehr.get_oa_record_id_map_from_sqlserver(
                table_name=_OA_SQLSERVER_TABLE,
                schema=_OA_SQLSERVER_SCHEMA,
                job_numbers=list(ehr_by_job_no.keys()),
                job_no_column=_OA_SQLSERVER_JOB_NO_COLUMN,
                id_column=_OA_SQLSERVER_ID_COLUMN,
            )
            for job_no, row_id in sql_map.items():
                oa_id_by_job_no[job_no] = row_id
                job_no_norm = _normalize_job_no(job_no)
                if job_no_norm:
                    oa_id_by_job_no_norm[job_no_norm] = row_id
            logger.info(
                "OA 工号索引完成(SQLServer):indexed_job_numbers=%s indexed_job_numbers_norm=%s",
                len(oa_id_by_job_no),
                len(oa_id_by_job_no_norm),
            )

            # Export record ids (the same source batch-update uses) take
            # precedence; SQL Server ids only fill the gaps.
            merged_id_by_job_no = dict(oa_id_by_job_no)
            merged_id_by_job_no_norm = dict(oa_id_by_job_no_norm)
            merged_id_by_job_no.update(export_id_by_job_no)
            merged_id_by_job_no_norm.update(export_id_by_job_no_norm)
            logger.info(
                "OA 工号索引合并完成(export 优先, SQL 兜底):raw=%s norm=%s",
                len(merged_id_by_job_no),
                len(merged_id_by_job_no_norm),
            )

            # Sample ids that differ between SQL Server and the export; conflicts
            # suggest the two are not reading the same data source.
            id_conflict_samples: list[tuple[str, int, int]] = []
            for k, export_id in export_id_by_job_no.items():
                sql_id = oa_id_by_job_no.get(k)
                if sql_id is not None and sql_id != export_id and len(id_conflict_samples) < 20:
                    id_conflict_samples.append((k, sql_id, export_id))
            if id_conflict_samples:
                logger.warning("OA 记录ID冲突样本(job_no, sql_id, export_id)=%s", id_conflict_samples)
            if verbose_trace:
                for job_no, row_id in merged_id_by_job_no.items():
                    logger.info("OA 工号索引明细:raw=%s norm=%s row_id=%s", job_no, _normalize_job_no(job_no), row_id)

            # 6) Build the batch-update payload, one entry per matched job number.
            id_field_code = display_to_code["身份证号"]

            def _for_log(field_code: Any, value: Any) -> Any:
                # Mask the ID-number field in log output only; the payload is untouched.
                return self._mask_id_number(value) if field_code == id_field_code else value

            data_list: list[dict[str, Any]] = []
            not_found_in_oa = 0
            unmatched_samples: list[str] = []
            for job_no, item in ehr_by_job_no.items():
                job_no_norm = _normalize_job_no(job_no)
                oa_record_id = merged_id_by_job_no.get(job_no)
                matched_by = "raw"
                if oa_record_id is None:
                    oa_record_id = merged_id_by_job_no_norm.get(job_no_norm)
                    matched_by = "normalized"
                if oa_record_id is None:
                    not_found_in_oa += 1
                    if len(unmatched_samples) < 20:
                        unmatched_samples.append(job_no)
                    if verbose_trace:
                        logger.info("匹配失败:job_no=%s norm=%s", job_no, job_no_norm)
                    continue
                if verbose_trace:
                    logger.info(
                        "匹配成功:job_no=%s norm=%s row_id=%s matched_by=%s",
                        job_no,
                        job_no_norm,
                        oa_record_id,
                        matched_by,
                    )

                values, company_source, company_source_oid = self._build_ehr_values(
                    item, org_by_oid, rd_attr_custom_key, domain_custom_key
                )
                existing_field_map = oa_fields_by_job_no_norm.get(job_no_norm, {})

                # Never blank out a value OA already holds: a blank EHR value
                # falls back to the current OA value. 离职日期 (defaulted to
                # 2099-12-31) and 在离职 are never blank, so EHR always wins there.
                fields_payload: list[dict[str, Any]] = []
                for display, ehr_value in values.items():
                    code = display_to_code[display]
                    value = _prefer_non_empty(ehr_value, _cell_value(existing_field_map.get(code)))
                    fields_payload.append({"name": code, "value": value, "showValue": value})

                if verbose_trace:
                    logger.info(
                        "字段映射:job_no=%s row_id=%s company_source=%s company_source_oid=%s",
                        job_no,
                        oa_record_id,
                        company_source,
                        company_source_oid,
                    )
                    for fld in fields_payload:
                        logger.info(
                            "字段映射明细:job_no=%s field=%s value=%s",
                            job_no,
                            fld["name"],
                            _for_log(fld["name"], fld["value"]),
                        )

                data_list.append(
                    {
                        "masterTable": {
                            "name": oa_master_table_name,
                            "record": {
                                "id": oa_record_id,
                                "fields": fields_payload,
                            },
                            "changedFields": [f["name"] for f in fields_payload],
                        },
                        "subTables": [],
                    }
                )

            logger.info(
                "待更新数据准备完成:prepared_updates=%s not_found_in_oa=%s",
                len(data_list),
                not_found_in_oa,
            )
            if not data_list:
                raise RuntimeError(
                    "No updates prepared for OA batch-update (check jobNumber matching between EHR and OA, and form field mapping). "
                    f"unmatched_sample={unmatched_samples}"
                )

            # 7) Send the updates in chunks of batch_size.
            success_count = 0
            failed_count = 0
            failed_data: dict[str, str] = {}
            do_trigger_bool = _to_bool_or_none(do_trigger)
            unique_filed_payload: list[str] | None = None
            used_form_code = oa_form_code

            def _call_batch_update(
                *, form_code: str, chunk_rows: list[dict[str, Any]]
            ) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any]]:
                """Send one chunk.

                Returns ``(response_json, data, failedData)``. Raises
                RuntimeError on a non-object response or a non-zero code.
                """
                resp_local = seeyon.batch_update_cap4_form_soap(
                    formCode=form_code,
                    loginName=oa_login_name,
                    rightId=oa_right_id,
                    dataList=chunk_rows,
                    uniqueFiled=unique_filed_payload,
                    doTrigger=do_trigger_bool,
                )
                rj_local = resp_local.json() if resp_local.content else {}
                if not isinstance(rj_local, dict):
                    raise RuntimeError(
                        f"OA batch-update failed: response is not a JSON object formCode={form_code!r} preview={str(rj_local)[:500]!r}"
                    )
                raw_code = rj_local.get("code", -1)
                try:
                    code_ok = int(raw_code) == 0
                except (TypeError, ValueError):
                    # A null or non-numeric code counts as failure. Without this,
                    # int() would raise an unrelated TypeError/ValueError.
                    code_ok = False
                if not code_ok:
                    raise RuntimeError(
                        f"OA batch-update failed code={raw_code} message={rj_local.get('message')!r} formCode={form_code!r}"
                    )
                data_local = rj_local.get("data") or {}
                if not isinstance(data_local, dict):
                    data_local = {}
                fd_local = data_local.get("failedData") or {}
                if not isinstance(fd_local, dict):
                    fd_local = {}
                return rj_local, data_local, fd_local

            for i in range(0, len(data_list), batch_size):
                chunk = data_list[i : i + batch_size]
                chunk_index = i // batch_size + 1
                if verbose_trace:
                    logger.info("批量更新尝试:chunk_index=%s chunk_size=%s", chunk_index, len(chunk))
                    for row in chunk:
                        try:
                            record = ((row or {}).get("masterTable") or {}).get("record") or {}
                            row_id = record.get("id")
                            row_fields = record.get("fields") or []
                            logger.info("批量更新行:row_id=%s fields_count=%s", row_id, len(row_fields))
                            for fld in row_fields:
                                if isinstance(fld, dict):
                                    logger.info(
                                        "批量更新字段:row_id=%s field=%s value=%s",
                                        row_id,
                                        fld.get("name"),
                                        _for_log(fld.get("name"), fld.get("value")),
                                    )
                        except Exception:  # noqa: BLE001 - diagnostics only, never fail the sync
                            logger.info("批量更新行日志输出失败,已忽略")

                rj, data, fd = _call_batch_update(form_code=used_form_code, chunk_rows=chunk)
                message = str(rj.get("message") or "")

                chunk_success = int(data.get("successCount", 0) or 0)
                chunk_failed = int(data.get("failedCount", 0) or 0)
                success_count += chunk_success
                failed_count += chunk_failed
                for k, v in fd.items():
                    failed_data[str(k)] = str(v)

                # Log a sample of failure reasons, not just the counts.
                if fd:
                    logger.warning(
                        "OA batch-update failedData sample chunk=%s size=%s formCode=%s message=%s sample=%s",
                        chunk_index,
                        len(chunk),
                        used_form_code,
                        message,
                        list(fd.items())[:20],
                    )

                logger.info(
                    "OA batch-update chunk done chunk_size=%s success=%s failed=%s formCode=%s message=%s",
                    len(chunk),
                    chunk_success,
                    chunk_failed,
                    used_form_code,
                    message,
                )
                # A chunk that failed 100% aborts the run, so the job does not
                # appear to keep going successfully.
                if chunk_success == 0 and chunk_failed == len(chunk):
                    raise RuntimeError(
                        "OA batch-update chunk all failed; "
                        f"formCode={used_form_code!r}; message={message!r}; failed_sample={list(fd.items())[:20]!r}"
                    )

            return {
                "ehr_total_rows": len(emp_rows),
                "ehr_distinct_job_numbers": len(ehr_by_job_no),
                "oa_existing_job_numbers": len(oa_id_by_job_no),
                "prepared_updates": len(data_list),
                "not_found_in_oa": not_found_in_oa,
                "success_count": success_count,
                "failed_count": failed_count,
                "failed_data": dict(list(failed_data.items())[:100]),
            }
|
||
|