diff --git a/app/integrations/seeyon.py b/app/integrations/seeyon.py index 9e6b170..189bdb6 100644 --- a/app/integrations/seeyon.py +++ b/app/integrations/seeyon.py @@ -166,6 +166,54 @@ class SeeyonClient(BaseClient): headers={"Content-Type": "application/json"}, ) + def batch_add_cap4_form_soap( + self, + *, + formCode: str, + loginName: str, + rightId: str, + dataList: list[dict[str, Any]], + uniqueFiled: list[str] | None = None, + doTrigger: bool | None = None, + ) -> httpx.Response: + """ + 无流程批量添加: + POST /seeyon/rest/cap4/form/soap/batch-add + + 参数与 batch-update 保持一致。 + """ + form_code = str(formCode or "").strip() + login_name = str(loginName or "").strip() + right_id = str(rightId or "").strip() + if not form_code: + raise ValueError("formCode is required") + if not login_name: + raise ValueError("loginName is required") + if not right_id: + raise ValueError("rightId is required") + if not isinstance(dataList, list) or len(dataList) == 0: + raise ValueError("dataList is required and must be a non-empty list") + if uniqueFiled is not None and not isinstance(uniqueFiled, list): + raise ValueError("uniqueFiled must be a list if provided") + + body: dict[str, Any] = { + "formCode": form_code, + "loginName": login_name, + "rightId": right_id, + "dataList": dataList, + } + if uniqueFiled is not None: + body["uniqueFiled"] = uniqueFiled + if doTrigger is not None: + body["doTrigger"] = doTrigger + + return self.request( + "POST", + "/seeyon/rest/cap4/form/soap/batch-add", + json=body, + headers={"Content-Type": "application/json"}, + ) + def get_org_members_by_code(self, *, code: str, pageNo: int = 0, pageSize: int = 20) -> list[dict[str, Any]]: """ 按人员编码查询 OA 人员信息: diff --git a/extensions/sync_ehr_leaves_to_oa/job.py b/extensions/sync_ehr_leaves_to_oa/job.py index 5e5ba6e..defa42e 100644 --- a/extensions/sync_ehr_leaves_to_oa/job.py +++ b/extensions/sync_ehr_leaves_to_oa/job.py @@ -3,6 +3,7 @@ from __future__ import annotations import json import logging import re +import time from datetime import date, datetime from decimal import Decimal, InvalidOperation from typing import Any @@ -169,6 +170,12 @@ def _normalize_decimal_1(v: Any) -> str: return _decimal_to_str(_to_decimal(v)) +def _gen_temp_row_id(seed: int) -> int: + # 生成一个 18 位左右的正整数,满足 batch-add 的 record.id 约束 + base = int(time.time_ns() // 1000) + return base + int(seed) + + class SyncEhrLeavesToOaMonthJob(BaseJob): """ EHR 请假 -> OA 月度同步(按工号+日期汇总): @@ -417,10 +424,12 @@ class SyncEhrLeavesToOaMonthJob(BaseJob): existing_row_map = dict(existing_row_map_by_sql) existing_row_map.update(existing_row_map_by_export) - data_list: list[dict[str, Any]] = [] + update_data_list: list[dict[str, Any]] = [] + insert_data_list: list[dict[str, Any]] = [] to_update = 0 to_insert = 0 skipped_unchanged = 0 + insert_seed = 0 for (job_no, leave_date), leave_days in sorted(agg.items(), key=lambda x: (x[0][0], x[0][1])): leave_date_value = f"{leave_date} 00:00:00" leave_days_value = _decimal_to_str(leave_days) @@ -448,19 +457,30 @@ class SyncEhrLeavesToOaMonthJob(BaseJob): if existing_id > 0: record["id"] = existing_id to_update += 1 + update_data_list.append( + { + "masterTable": { + "name": master_table_name, + "record": record, + "changedFields": [f["name"] for f in fields_payload], + }, + "subTables": [], + } + ) else: + insert_seed += 1 + record["id"] = _gen_temp_row_id(insert_seed) to_insert += 1 - - data_list.append( - { - "masterTable": { - "name": master_table_name, - "record": record, - "changedFields": [f["name"] for f in fields_payload], - }, - "subTables": [], - } - ) + insert_data_list.append( + { + "masterTable": { + "name": master_table_name, + "record": record, + "changedFields": [f["name"] for f in fields_payload], + }, + "subTables": [], + } + ) success_count = 0 failed_count = 0 @@ -471,65 +491,83 @@ class SyncEhrLeavesToOaMonthJob(BaseJob): else: do_trigger_bool = str(do_trigger).strip().lower() in ("1", "true", "yes", "y", "on") - for i in range(0, len(data_list), batch_size): - chunk = data_list[i : i + batch_size] - if not chunk: - continue - try: - resp = seeyon.batch_update_cap4_form_soap( - formCode=oa_form_code, - loginName=oa_login_name, - rightId=oa_right_id, - dataList=chunk, - uniqueFiled=[field_job_no, field_leave_date], - doTrigger=do_trigger_bool, - ) - except httpx.HTTPStatusError as e: - resp_text = "" + def _run_chunks(*, mode: str, rows: list[dict[str, Any]]) -> None: + nonlocal success_count, failed_count, failed_data + for i in range(0, len(rows), batch_size): + chunk = rows[i : i + batch_size] + if not chunk: + continue try: - resp_text = str((e.response.text or "")[:2000]) - except Exception: + if mode == "update": + resp_local = seeyon.batch_update_cap4_form_soap( + formCode=oa_form_code, + loginName=oa_login_name, + rightId=oa_right_id, + dataList=chunk, + uniqueFiled=[field_job_no, field_leave_date], + doTrigger=do_trigger_bool, + ) + else: + resp_local = seeyon.batch_add_cap4_form_soap( + formCode=oa_form_code, + loginName=oa_login_name, + rightId=oa_right_id, + dataList=chunk, + uniqueFiled=[field_job_no, field_leave_date], + doTrigger=do_trigger_bool, + ) + except httpx.HTTPStatusError as e: resp_text = "" - first_row = chunk[0] if chunk else {} - raise RuntimeError( - "OA batch-update HTTP error " - f"status={getattr(e.response, 'status_code', None)!r} " - f"body_preview={resp_text!r} " - f"first_row={json.dumps(first_row, ensure_ascii=False, default=str)[:2000]}" - ) from e - rj = resp.json() if resp.content else {} - code_local = int(rj.get("code", -1)) - if code_local != 0: - raise RuntimeError(f"OA batch-update failed code={code_local} message={rj.get('message')!r}") - data_local = rj.get("data") or {} - chunk_success = int(data_local.get("successCount", 0) or 0) - chunk_failed = int(data_local.get("failCount", 0) or 0) - if chunk_success == 0 and chunk_failed == 0: - fd2 = data_local.get("failedData") or {} - fd_len = len(fd2) if isinstance(fd2, dict) else 0 - chunk_failed = fd_len - chunk_success = max(0, len(chunk) - chunk_failed) - success_count += chunk_success - failed_count += chunk_failed - fd = data_local.get("failedData") or {} - if isinstance(fd, dict): - for k, v in fd.items(): - if str(k) not in failed_data: - failed_data[str(k)] = str(v) - logger.info( - "OA batch-update chunk done: index=%s size=%s success=%s failed=%s message=%s", - i // batch_size + 1, - len(chunk), - chunk_success, - chunk_failed, - str(rj.get("message") or ""), - ) - if isinstance(fd, dict) and fd: - logger.warning( - "OA batch-update failedData sample: chunk=%s sample=%s", + try: + resp_text = str((e.response.text or "")[:2000]) + except Exception: + resp_text = "" + first_row = chunk[0] if chunk else {} + raise RuntimeError( + f"OA batch-{mode} HTTP error " + f"status={getattr(e.response, 'status_code', None)!r} " + f"body_preview={resp_text!r} " + f"first_row={json.dumps(first_row, ensure_ascii=False, default=str)[:2000]}" + ) from e + + rj = resp_local.json() if resp_local.content else {} + code_local = int(rj.get("code", -1)) + if code_local != 0: + raise RuntimeError(f"OA batch-{mode} failed code={code_local} message={rj.get('message')!r}") + data_local = rj.get("data") or {} + chunk_success = int(data_local.get("successCount", 0) or 0) + chunk_failed = int(data_local.get("failCount", 0) or 0) + if chunk_success == 0 and chunk_failed == 0: + fd2 = data_local.get("failedData") or {} + fd_len = len(fd2) if isinstance(fd2, dict) else 0 + chunk_failed = fd_len + chunk_success = max(0, len(chunk) - chunk_failed) + success_count += chunk_success + failed_count += chunk_failed + fd = data_local.get("failedData") or {} + if isinstance(fd, dict): + for k, v in fd.items(): + if str(k) not in failed_data: + failed_data[str(k)] = str(v) + logger.info( + "OA batch-%s chunk done: index=%s size=%s success=%s failed=%s message=%s", + mode, i // batch_size + 1, - list(fd.items())[:20], + len(chunk), + chunk_success, + chunk_failed, + str(rj.get("message") or ""), ) + if isinstance(fd, dict) and fd: + logger.warning( + "OA batch-%s failedData sample: chunk=%s sample=%s", + mode, + i // batch_size + 1, + list(fd.items())[:20], + ) + + _run_chunks(mode="update", rows=update_data_list) + _run_chunks(mode="add", rows=insert_data_list) # 写入后复核:重新 export,核对本次 key 实际存在数量 verify_resp = seeyon.export_cap4_form_soap(