This commit is contained in:
Marsway 2026-03-30 17:41:47 +08:00
parent 9d620da015
commit 233713db59
2 changed files with 153 additions and 67 deletions

View File

@ -166,6 +166,54 @@ class SeeyonClient(BaseClient):
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
) )
def batch_add_cap4_form_soap(
    self,
    *,
    formCode: str,
    loginName: str,
    rightId: str,
    dataList: list[dict[str, Any]],
    uniqueFiled: list[str] | None = None,
    doTrigger: bool | None = None,
) -> httpx.Response:
    """
    Batch add form records without a workflow.

    POST /seeyon/rest/cap4/form/soap/batch-add
    Parameters are kept consistent with batch-update.

    Raises:
        ValueError: when a required argument is blank/missing, when
            ``dataList`` is not a non-empty list, or when ``uniqueFiled``
            is supplied but is not a list.
    """
    # Trim the three mandatory string arguments, then reject any blank one.
    required = {
        "formCode": str(formCode or "").strip(),
        "loginName": str(loginName or "").strip(),
        "rightId": str(rightId or "").strip(),
    }
    for arg_name, arg_value in required.items():
        if not arg_value:
            raise ValueError(f"{arg_name} is required")
    if not isinstance(dataList, list) or len(dataList) == 0:
        raise ValueError("dataList is required and must be a non-empty list")
    if uniqueFiled is not None and not isinstance(uniqueFiled, list):
        raise ValueError("uniqueFiled must be a list if provided")
    # Assemble the JSON body; optional keys are sent only when provided.
    # NOTE: "uniqueFiled" is the key spelling the remote API expects.
    payload: dict[str, Any] = dict(required)
    payload["dataList"] = dataList
    if uniqueFiled is not None:
        payload["uniqueFiled"] = uniqueFiled
    if doTrigger is not None:
        payload["doTrigger"] = doTrigger
    return self.request(
        "POST",
        "/seeyon/rest/cap4/form/soap/batch-add",
        json=payload,
        headers={"Content-Type": "application/json"},
    )
def get_org_members_by_code(self, *, code: str, pageNo: int = 0, pageSize: int = 20) -> list[dict[str, Any]]: def get_org_members_by_code(self, *, code: str, pageNo: int = 0, pageSize: int = 20) -> list[dict[str, Any]]:
""" """
按人员编码查询 OA 人员信息 按人员编码查询 OA 人员信息

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import json import json
import logging import logging
import re import re
import time
from datetime import date, datetime from datetime import date, datetime
from decimal import Decimal, InvalidOperation from decimal import Decimal, InvalidOperation
from typing import Any from typing import Any
@ -169,6 +170,12 @@ def _normalize_decimal_1(v: Any) -> str:
return _decimal_to_str(_to_decimal(v)) return _decimal_to_str(_to_decimal(v))
def _gen_temp_row_id(seed: int) -> int:
# 生成一个 18 位左右的正整数,满足 batch-add 的 record.id 约束
base = int(time.time_ns() // 1000)
return base + int(seed)
class SyncEhrLeavesToOaMonthJob(BaseJob): class SyncEhrLeavesToOaMonthJob(BaseJob):
""" """
EHR 请假 -> OA 月度同步按工号+日期汇总 EHR 请假 -> OA 月度同步按工号+日期汇总
@ -417,10 +424,12 @@ class SyncEhrLeavesToOaMonthJob(BaseJob):
existing_row_map = dict(existing_row_map_by_sql) existing_row_map = dict(existing_row_map_by_sql)
existing_row_map.update(existing_row_map_by_export) existing_row_map.update(existing_row_map_by_export)
data_list: list[dict[str, Any]] = [] update_data_list: list[dict[str, Any]] = []
insert_data_list: list[dict[str, Any]] = []
to_update = 0 to_update = 0
to_insert = 0 to_insert = 0
skipped_unchanged = 0 skipped_unchanged = 0
insert_seed = 0
for (job_no, leave_date), leave_days in sorted(agg.items(), key=lambda x: (x[0][0], x[0][1])): for (job_no, leave_date), leave_days in sorted(agg.items(), key=lambda x: (x[0][0], x[0][1])):
leave_date_value = f"{leave_date} 00:00:00" leave_date_value = f"{leave_date} 00:00:00"
leave_days_value = _decimal_to_str(leave_days) leave_days_value = _decimal_to_str(leave_days)
@ -448,19 +457,30 @@ class SyncEhrLeavesToOaMonthJob(BaseJob):
if existing_id > 0: if existing_id > 0:
record["id"] = existing_id record["id"] = existing_id
to_update += 1 to_update += 1
update_data_list.append(
{
"masterTable": {
"name": master_table_name,
"record": record,
"changedFields": [f["name"] for f in fields_payload],
},
"subTables": [],
}
)
else: else:
insert_seed += 1
record["id"] = _gen_temp_row_id(insert_seed)
to_insert += 1 to_insert += 1
insert_data_list.append(
data_list.append( {
{ "masterTable": {
"masterTable": { "name": master_table_name,
"name": master_table_name, "record": record,
"record": record, "changedFields": [f["name"] for f in fields_payload],
"changedFields": [f["name"] for f in fields_payload], },
}, "subTables": [],
"subTables": [], }
} )
)
success_count = 0 success_count = 0
failed_count = 0 failed_count = 0
@ -471,65 +491,83 @@ class SyncEhrLeavesToOaMonthJob(BaseJob):
else: else:
do_trigger_bool = str(do_trigger).strip().lower() in ("1", "true", "yes", "y", "on") do_trigger_bool = str(do_trigger).strip().lower() in ("1", "true", "yes", "y", "on")
for i in range(0, len(data_list), batch_size): def _run_chunks(*, mode: str, rows: list[dict[str, Any]]) -> None:
chunk = data_list[i : i + batch_size] nonlocal success_count, failed_count, failed_data
if not chunk: for i in range(0, len(rows), batch_size):
continue chunk = rows[i : i + batch_size]
try: if not chunk:
resp = seeyon.batch_update_cap4_form_soap( continue
formCode=oa_form_code,
loginName=oa_login_name,
rightId=oa_right_id,
dataList=chunk,
uniqueFiled=[field_job_no, field_leave_date],
doTrigger=do_trigger_bool,
)
except httpx.HTTPStatusError as e:
resp_text = ""
try: try:
resp_text = str((e.response.text or "")[:2000]) if mode == "update":
except Exception: resp_local = seeyon.batch_update_cap4_form_soap(
formCode=oa_form_code,
loginName=oa_login_name,
rightId=oa_right_id,
dataList=chunk,
uniqueFiled=[field_job_no, field_leave_date],
doTrigger=do_trigger_bool,
)
else:
resp_local = seeyon.batch_add_cap4_form_soap(
formCode=oa_form_code,
loginName=oa_login_name,
rightId=oa_right_id,
dataList=chunk,
uniqueFiled=[field_job_no, field_leave_date],
doTrigger=do_trigger_bool,
)
except httpx.HTTPStatusError as e:
resp_text = "" resp_text = ""
first_row = chunk[0] if chunk else {} try:
raise RuntimeError( resp_text = str((e.response.text or "")[:2000])
"OA batch-update HTTP error " except Exception:
f"status={getattr(e.response, 'status_code', None)!r} " resp_text = ""
f"body_preview={resp_text!r} " first_row = chunk[0] if chunk else {}
f"first_row={json.dumps(first_row, ensure_ascii=False, default=str)[:2000]}" raise RuntimeError(
) from e f"OA batch-{mode} HTTP error "
rj = resp.json() if resp.content else {} f"status={getattr(e.response, 'status_code', None)!r} "
code_local = int(rj.get("code", -1)) f"body_preview={resp_text!r} "
if code_local != 0: f"first_row={json.dumps(first_row, ensure_ascii=False, default=str)[:2000]}"
raise RuntimeError(f"OA batch-update failed code={code_local} message={rj.get('message')!r}") ) from e
data_local = rj.get("data") or {}
chunk_success = int(data_local.get("successCount", 0) or 0) rj = resp_local.json() if resp_local.content else {}
chunk_failed = int(data_local.get("failCount", 0) or 0) code_local = int(rj.get("code", -1))
if chunk_success == 0 and chunk_failed == 0: if code_local != 0:
fd2 = data_local.get("failedData") or {} raise RuntimeError(f"OA batch-{mode} failed code={code_local} message={rj.get('message')!r}")
fd_len = len(fd2) if isinstance(fd2, dict) else 0 data_local = rj.get("data") or {}
chunk_failed = fd_len chunk_success = int(data_local.get("successCount", 0) or 0)
chunk_success = max(0, len(chunk) - chunk_failed) chunk_failed = int(data_local.get("failCount", 0) or 0)
success_count += chunk_success if chunk_success == 0 and chunk_failed == 0:
failed_count += chunk_failed fd2 = data_local.get("failedData") or {}
fd = data_local.get("failedData") or {} fd_len = len(fd2) if isinstance(fd2, dict) else 0
if isinstance(fd, dict): chunk_failed = fd_len
for k, v in fd.items(): chunk_success = max(0, len(chunk) - chunk_failed)
if str(k) not in failed_data: success_count += chunk_success
failed_data[str(k)] = str(v) failed_count += chunk_failed
logger.info( fd = data_local.get("failedData") or {}
"OA batch-update chunk done: index=%s size=%s success=%s failed=%s message=%s", if isinstance(fd, dict):
i // batch_size + 1, for k, v in fd.items():
len(chunk), if str(k) not in failed_data:
chunk_success, failed_data[str(k)] = str(v)
chunk_failed, logger.info(
str(rj.get("message") or ""), "OA batch-%s chunk done: index=%s size=%s success=%s failed=%s message=%s",
) mode,
if isinstance(fd, dict) and fd:
logger.warning(
"OA batch-update failedData sample: chunk=%s sample=%s",
i // batch_size + 1, i // batch_size + 1,
list(fd.items())[:20], len(chunk),
chunk_success,
chunk_failed,
str(rj.get("message") or ""),
) )
if isinstance(fd, dict) and fd:
logger.warning(
"OA batch-%s failedData sample: chunk=%s sample=%s",
mode,
i // batch_size + 1,
list(fd.items())[:20],
)
_run_chunks(mode="update", rows=update_data_list)
_run_chunks(mode="add", rows=insert_data_list)
# 写入后复核:重新 export核对本次 key 实际存在数量 # 写入后复核:重新 export核对本次 key 实际存在数量
verify_resp = seeyon.export_cap4_form_soap( verify_resp = seeyon.export_cap4_form_soap(