from __future__ import annotations import json import logging import re import time from calendar import monthrange from datetime import date, datetime from decimal import Decimal, InvalidOperation from typing import Any import httpx from app.integrations.seeyon import SeeyonClient from app.jobs.base import BaseJob from extensions.sync_ehr_leaves_to_oa.api import SyncEhrLeavesToOaApi logger = logging.getLogger("connecthub.extensions.sync_ehr_leaves_to_oa") # OA SQLServer(与 sync_ehr_to_oa 同源) _OA_SQLSERVER_PARAMS: dict[str, Any] = { "host": "192.168.30.108", "port": 1433, "database": "seeyon", "username": "SHOADB91", "password": "E7nZ8x@12", "driver": "ODBC Driver 18 for SQL Server", "encrypt": "no", "trust_server_certificate": "yes", "connect_timeout_s": 10, } _OA_SQLSERVER_SCHEMA = "dbo" _OA_SQLSERVER_TABLE = "formmain_20250360" def _to_date(v: Any) -> date: if isinstance(v, date) and not isinstance(v, datetime): return v if isinstance(v, datetime): return v.date() s = str(v or "").strip() if not s: return date.today() s = s.replace("T", " ") if " " in s: s = s.split(" ", 1)[0] return datetime.strptime(s, "%Y-%m-%d").date() def _date_only(v: Any) -> str: s = str(v or "").strip() if not s: return "" s2 = s.replace("T", " ") if re.match(r"^\d{4}-\d{2}-\d{2}$", s2): return s2 if re.match(r"^\d{4}-\d{2}-\d{2}\s", s2): return s2.split(" ", 1)[0] m = re.match(r"^[A-Za-z]{3}\s+([A-Za-z]{3})\s+(\d{1,2})\s+\d{2}:\d{2}:\d{2}\s+[A-Za-z]{3,5}\s+(\d{4})$", s) if m: month_map = { "Jan": "01", "Feb": "02", "Mar": "03", "Apr": "04", "May": "05", "Jun": "06", "Jul": "07", "Aug": "08", "Sep": "09", "Oct": "10", "Nov": "11", "Dec": "12", } mon = month_map.get(m.group(1).title()) day = int(m.group(2)) year = m.group(3) if mon: return f"{year}-{mon}-{day:02d}" return s def _to_decimal(v: Any) -> Decimal: if v is None: return Decimal("0") try: return Decimal(str(v).strip()) except (InvalidOperation, ValueError): return Decimal("0") def _decimal_to_str(v: Decimal) -> str: # OA 字段请假天数为 DECIMAL(?,1),统一保留 1 位小数(如 1.0) try: q = v.quantize(Decimal("0.1")) except Exception: q = Decimal("0.0") return format(q, "f") def _to_int_safe(v: Any) -> int: try: return int(str(v).strip()) except Exception: return 0 def _cell_value(cell: Any) -> str: if isinstance(cell, dict): v = cell.get("value") if v is None or str(v).strip() == "": v = cell.get("showValue") return str(v or "").strip() return str(cell or "").strip() def _cell_show_value(cell: Any) -> str: if isinstance(cell, dict): return str(cell.get("showValue") or "").strip() return "" def _extract_oa_row_id_and_fields(row: dict[str, Any]) -> tuple[int | None, dict[str, Any]]: field_map: dict[str, Any] = {} row_id: int | None = None master = row.get("masterData") if isinstance(master, dict): for k, v in master.items(): if isinstance(k, str) and k.startswith("field"): field_map[k] = v for candidate in (row.get("id"), row.get("masterDataId"), master.get("id")): if candidate is None: continue rid = _to_int_safe(candidate) if rid > 0: row_id = rid break master_table = row.get("masterTable") if isinstance(master_table, dict): record = master_table.get("record") if isinstance(record, dict): fields = record.get("fields") if isinstance(fields, list): for fld in fields: if not isinstance(fld, dict): continue name = str(fld.get("name") or "").strip() if name: field_map[name] = fld if row_id is None: rid = _to_int_safe(record.get("id")) if rid > 0: row_id = rid row_fields = row.get("fields") if isinstance(row_fields, list): for fld in row_fields: if not isinstance(fld, dict): continue name = str(fld.get("name") or "").strip() if name: field_map[name] = fld return row_id, field_map def _normalize_decimal_1(v: Any) -> str: return _decimal_to_str(_to_decimal(v)) def _gen_temp_row_id(seed: int) -> int: # 生成一个 18 位左右的正整数,满足 batch-add 的 record.id 约束 base = int(time.time_ns() // 1000) return base + int(seed) class SyncEhrLeavesToOaMonthJob(BaseJob): """ EHR 请假 -> OA 月度同步(按工号+日期汇总): - 接口:POST /AttendanceOpen/api/v1/Vacation/GetListByDate - 统计范围:执行日所在月份的 1 号到执行日 - 唯一键:工号 + 日期 """ job_id = "sync_ehr_leaves_to_oa.sync_monthly" def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]: oa_base_url = str(params.get("oa_base_url") or "").strip() oa_form_code = str(params.get("oa_form_code") or "").strip() oa_right_id = str(params.get("oa_right_id") or "").strip() oa_login_name = str(params.get("oa_login_name") or "").strip() if not oa_base_url: raise ValueError("public_cfg.oa_base_url is required") if not oa_form_code: raise ValueError("public_cfg.oa_form_code is required") if not oa_right_id: raise ValueError("public_cfg.oa_right_id is required") if not oa_login_name: raise ValueError("public_cfg.oa_login_name is required") oa_template_code = str(params.get("oa_template_code") or oa_form_code).strip() sync_month = str(params.get("sync_month") or "").strip() if sync_month: if not re.match(r"^\d{4}-\d{2}$", sync_month): raise ValueError("public_cfg.sync_month must be in format YYYY-MM, e.g. 2022-02") year = int(sync_month[:4]) month = int(sync_month[5:7]) if month < 1 or month > 12: raise ValueError("public_cfg.sync_month month must be in range 01-12") month_start = date(year, month, 1) month_end = date(year, month, monthrange(year, month)[1]) else: run_date = _to_date(params.get("run_date")) month_start = run_date.replace(day=1) month_end = run_date display_job_no = str(params.get("oa_display_job_no") or "工号").strip() display_leave_date = str(params.get("oa_display_leave_date") or "请假日期").strip() display_leave_days = str(params.get("oa_display_leave_days") or "请假天数").strip() display_name = str(params.get("oa_display_name") or "姓名").strip() if not display_job_no or not display_leave_date or not display_leave_days: raise ValueError("oa_display_job_no/oa_display_leave_date/oa_display_leave_days cannot be empty") page_size = int(params.get("page_size") or 100) if page_size <= 0: page_size = 100 batch_size = int(params.get("batch_size") or 100) if batch_size <= 0: batch_size = 100 max_pages_per_day = int(params.get("max_pages_per_day") or 500) if max_pages_per_day <= 0: max_pages_per_day = 500 debug_trace = str(params.get("debug_trace") or "").strip().lower() in ("1", "true", "yes", "y", "on") approved_statuses_param = params.get("approved_statuses") if approved_statuses_param is None: approved_statuses = {"通过", "已通过"} elif isinstance(approved_statuses_param, list): approved_statuses = {str(x).strip() for x in approved_statuses_param if str(x).strip()} else: approved_statuses = {str(approved_statuses_param).strip()} if str(approved_statuses_param).strip() else set() rest_user = str(secrets.get("rest_user") or "").strip() rest_password = str(secrets.get("rest_password") or "").strip() login_name = secrets.get("loginName") login_name = str(login_name).strip() if login_name else None if not rest_user or not rest_password: raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required") app_key = str(secrets.get("app_key") or "").strip() app_secret = str(secrets.get("app_secret") or "").strip() if not app_key or not app_secret: raise ValueError("secret_cfg.app_key and secret_cfg.app_secret are required") sender_login_name = params.get("senderLoginName") sender_login_name = str(sender_login_name).strip() if sender_login_name else None do_trigger = params.get("doTrigger") seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name) ehr = SyncEhrLeavesToOaApi( secret_params={"app_key": app_key, "app_secret": app_secret}, sqlserver_params=_OA_SQLSERVER_PARAMS, ) try: ehr.ping_sqlserver() logger.info( "SQLServer 连通性检查通过:host=%s db=%s table=%s", _OA_SQLSERVER_PARAMS["host"], _OA_SQLSERVER_PARAMS["database"], _OA_SQLSERVER_TABLE, ) exp_resp = seeyon.export_cap4_form_soap( templateCode=oa_template_code, senderLoginName=sender_login_name, rightId=oa_right_id, ) raw = exp_resp.text or "" payload = json.loads(raw) if raw else {} code = payload.get("code") if code not in (None, 0, "0"): raise RuntimeError(f"OA export failed code={code!r} message={payload.get('message')!r}") outer = payload.get("data") or {} form = outer.get("data") or {} if not isinstance(form, dict): raise RuntimeError("OA export invalid: data.data is not an object") definition = form.get("definition") or {} fields = definition.get("fields") or [] if not isinstance(fields, list): raise RuntimeError("OA export invalid: definition.fields is not a list") display_to_code: dict[str, str] = {} for f in fields: if not isinstance(f, dict): continue display = str(f.get("display") or "").strip() name = str(f.get("name") or "").strip() if display and name: display_to_code[display] = name needed = [display_job_no, display_leave_date, display_leave_days] missing = [x for x in needed if x not in display_to_code] if missing: raise RuntimeError(f"OA export invalid: missing form fields by display names: {missing}") field_job_no = display_to_code[display_job_no] field_leave_date = display_to_code[display_leave_date] field_leave_days = display_to_code[display_leave_days] field_name = display_to_code.get(display_name) master_table_name = _OA_SQLSERVER_TABLE rows = form.get("data") or [] if not isinstance(rows, list): rows = [] vacations = ehr.get_vacations_in_date_range( start_date=month_start, end_date=month_end, page_size=page_size, max_pages_per_day=max_pages_per_day, ) staff_ids: set[int] = set() for v in vacations: sid = _to_int_safe(v.get("staffId")) if sid > 0: staff_ids.add(sid) staff_brief_map = ehr.get_staff_briefs_by_user_ids(user_ids=list(staff_ids)) agg: dict[tuple[str, str], Decimal] = {} name_map: dict[tuple[str, str], str] = {} skipped_no_job_no = 0 skipped_bad_date = 0 skipped_not_approved = 0 skipped_non_positive = 0 for item in vacations: approve_status = str(item.get("approveStatus") or "").strip() if approved_statuses and approve_status not in approved_statuses: skipped_not_approved += 1 continue sid = _to_int_safe(item.get("staffId")) staff_brief = staff_brief_map.get(sid) or {} job_no = str(staff_brief.get("job_no") or "").strip() staff_name = str(staff_brief.get("name") or "").strip() if not job_no: skipped_no_job_no += 1 continue leave_date = _date_only(item.get("vacationStartDateTime")) if not leave_date: skipped_bad_date += 1 continue try: leave_date_obj = datetime.strptime(leave_date, "%Y-%m-%d").date() except Exception: skipped_bad_date += 1 continue if leave_date_obj < month_start or leave_date_obj > month_end: skipped_bad_date += 1 continue day_value = _to_decimal(item.get("dayValueOfDuration")) if day_value <= Decimal("0"): skipped_non_positive += 1 continue key = (job_no, leave_date) exists = agg.get(key) if exists is None: agg[key] = day_value else: agg[key] = exists + day_value # 姓名按工号+日期保留一个非空值(用于 OA 展示) if staff_name and key not in name_map: name_map[key] = staff_name if debug_trace and len(agg) <= 20: logger.info( "EHR 请假汇总样本:staff_id=%s job_no=%s date=%s day_value=%s approve_status=%s", sid, job_no, leave_date, _decimal_to_str(day_value), approve_status, ) existing_row_map_by_export: dict[tuple[str, str], dict[str, str]] = {} for row in rows: if not isinstance(row, dict): continue rid, field_map = _extract_oa_row_id_and_fields(row) job_no = _cell_value(field_map.get(field_job_no)) leave_date = _cell_show_value(field_map.get(field_leave_date)) or _date_only(_cell_value(field_map.get(field_leave_date))) if not job_no or not leave_date: continue leave_days_val = _cell_value(field_map.get(field_leave_days)) name_val = _cell_value(field_map.get(field_name)) if field_name else "" existing_row_map_by_export[(job_no, leave_date)] = { "id": str(rid or ""), "job_no": job_no, "leave_date": leave_date, "leave_days": leave_days_val, "name": name_val, } logger.info( "OA export 现有记录索引完成:month=%s~%s indexed=%s", month_start.isoformat(), month_end.isoformat(), len(existing_row_map_by_export), ) if debug_trace: for (k_job, k_date), rv in list(existing_row_map_by_export.items())[:20]: logger.info( "OA export 索引样本:job_no=%s date=%s row_id=%s leave_days=%s name=%s", k_job, k_date, rv.get("id"), rv.get("leave_days"), rv.get("name"), ) existing_row_map_by_sql = ehr.get_oa_rows_by_job_and_date( table_name=_OA_SQLSERVER_TABLE, schema=_OA_SQLSERVER_SCHEMA, job_no_column=field_job_no, date_column=field_leave_date, leave_days_column=field_leave_days, name_column=field_name, start_date=month_start, end_date=month_end, ) # 以 export 为主(日期 showValue 更稳定),SQL 兜底补齐 existing_row_map = dict(existing_row_map_by_sql) existing_row_map.update(existing_row_map_by_export) update_data_list: list[dict[str, Any]] = [] insert_data_list: list[dict[str, Any]] = [] to_update = 0 to_insert = 0 skipped_unchanged = 0 insert_seed = 0 for (job_no, leave_date), leave_days in sorted(agg.items(), key=lambda x: (x[0][0], x[0][1])): leave_date_value = f"{leave_date} 00:00:00" leave_days_value = _decimal_to_str(leave_days) name_val = name_map.get((job_no, leave_date), "") existing_row = existing_row_map.get((job_no, leave_date)) if existing_row is not None: old_days = _normalize_decimal_1(existing_row.get("leave_days")) old_name = str(existing_row.get("name") or "").strip() if old_days == leave_days_value and (not field_name or old_name == name_val): skipped_unchanged += 1 continue fields_payload = [ {"name": field_job_no, "value": job_no, "showValue": job_no}, {"name": field_leave_date, "value": leave_date_value, "showValue": leave_date}, {"name": field_leave_days, "value": leave_days_value, "showValue": leave_days_value}, ] if field_name: fields_payload.append({"name": field_name, "value": name_val, "showValue": name_val}) record: dict[str, Any] = { "fields": fields_payload, } existing_id = _to_int_safe((existing_row or {}).get("id")) if existing_id > 0: record["id"] = existing_id to_update += 1 update_data_list.append( { "masterTable": { "name": master_table_name, "record": record, "changedFields": [f["name"] for f in fields_payload], }, "subTables": [], } ) else: insert_seed += 1 record["id"] = _gen_temp_row_id(insert_seed) to_insert += 1 insert_data_list.append( { "masterTable": { "name": master_table_name, "record": record, "changedFields": [f["name"] for f in fields_payload], }, "subTables": [], } ) success_count = 0 failed_count = 0 failed_data: dict[str, str] = {} do_trigger_bool: bool | None if do_trigger is None: do_trigger_bool = None else: do_trigger_bool = str(do_trigger).strip().lower() in ("1", "true", "yes", "y", "on") def _run_chunks(*, mode: str, rows: list[dict[str, Any]]) -> None: nonlocal success_count, failed_count, failed_data for i in range(0, len(rows), batch_size): chunk = rows[i : i + batch_size] if not chunk: continue try: if mode == "update": resp_local = seeyon.batch_update_cap4_form_soap( formCode=oa_form_code, loginName=oa_login_name, rightId=oa_right_id, dataList=chunk, uniqueFiled=[field_job_no, field_leave_date], doTrigger=do_trigger_bool, ) else: resp_local = seeyon.batch_add_cap4_form_soap( formCode=oa_form_code, loginName=oa_login_name, rightId=oa_right_id, dataList=chunk, uniqueFiled=[field_job_no, field_leave_date], doTrigger=do_trigger_bool, ) except httpx.HTTPStatusError as e: resp_text = "" try: resp_text = str((e.response.text or "")[:2000]) except Exception: resp_text = "" first_row = chunk[0] if chunk else {} raise RuntimeError( f"OA batch-{mode} HTTP error " f"status={getattr(e.response, 'status_code', None)!r} " f"body_preview={resp_text!r} " f"first_row={json.dumps(first_row, ensure_ascii=False, default=str)[:2000]}" ) from e rj = resp_local.json() if resp_local.content else {} code_local = int(rj.get("code", -1)) if code_local != 0: raise RuntimeError(f"OA batch-{mode} failed code={code_local} message={rj.get('message')!r}") data_local = rj.get("data") or {} chunk_success = int(data_local.get("successCount", 0) or 0) chunk_failed = int(data_local.get("failCount", 0) or 0) if chunk_success == 0 and chunk_failed == 0: fd2 = data_local.get("failedData") or {} fd_len = len(fd2) if isinstance(fd2, dict) else 0 chunk_failed = fd_len chunk_success = max(0, len(chunk) - chunk_failed) success_count += chunk_success failed_count += chunk_failed fd = data_local.get("failedData") or {} if isinstance(fd, dict): for k, v in fd.items(): if str(k) not in failed_data: failed_data[str(k)] = str(v) logger.info( "OA batch-%s chunk done: index=%s size=%s success=%s failed=%s message=%s", mode, i // batch_size + 1, len(chunk), chunk_success, chunk_failed, str(rj.get("message") or ""), ) if isinstance(fd, dict) and fd: logger.warning( "OA batch-%s failedData sample: chunk=%s sample=%s", mode, i // batch_size + 1, list(fd.items())[:20], ) _run_chunks(mode="update", rows=update_data_list) _run_chunks(mode="add", rows=insert_data_list) # 写入后复核:重新 export,核对本次 key 实际存在数量 verify_resp = seeyon.export_cap4_form_soap( templateCode=oa_template_code, senderLoginName=sender_login_name, rightId=oa_right_id, ) verify_raw = verify_resp.text or "" verify_payload = json.loads(verify_raw) if verify_raw else {} verify_form = ((verify_payload.get("data") or {}).get("data") or {}) if isinstance(verify_payload, dict) else {} verify_rows = verify_form.get("data") or [] if not isinstance(verify_rows, list): verify_rows = [] verify_key_set: set[tuple[str, str]] = set() for row in verify_rows: if not isinstance(row, dict): continue _, field_map = _extract_oa_row_id_and_fields(row) job_no = _cell_value(field_map.get(field_job_no)) leave_date = _cell_show_value(field_map.get(field_leave_date)) or _date_only(_cell_value(field_map.get(field_leave_date))) if job_no and leave_date: verify_key_set.add((job_no, leave_date)) aggregated_keys = set(agg.keys()) matched_after_write = len(aggregated_keys & verify_key_set) missing_after_write = sorted(list(aggregated_keys - verify_key_set))[:50] logger.info( "OA 写入后复核:verify_indexed=%s aggregated_keys=%s matched_after_write=%s missing_after_write=%s", len(verify_key_set), len(aggregated_keys), matched_after_write, len(aggregated_keys - verify_key_set), ) if debug_trace and missing_after_write: logger.warning("OA 写入后缺失样本(job_no,date)=%s", missing_after_write[:20]) logger.info( "EHR 请假月度同步完成:month=%s~%s vacations=%s aggregated=%s to_update=%s to_insert=%s skipped_unchanged=%s success=%s failed=%s", month_start.isoformat(), month_end.isoformat(), len(vacations), len(agg), to_update, to_insert, skipped_unchanged, success_count, failed_count, ) return { "month_start": month_start.isoformat(), "month_end": month_end.isoformat(), "vacation_records": len(vacations), "aggregated_records": len(agg), "to_update": to_update, "to_insert": to_insert, "skipped_unchanged": skipped_unchanged, "success_count": success_count, "failed_count": failed_count, "skipped_no_job_no": skipped_no_job_no, "skipped_bad_date": skipped_bad_date, "skipped_not_approved": skipped_not_approved, "skipped_non_positive": skipped_non_positive, "failed_data": dict(list(failed_data.items())[:100]), "matched_after_write": matched_after_write, "missing_after_write": len(aggregated_keys - verify_key_set), "missing_after_write_sample": missing_after_write, } finally: ehr.close() seeyon.close()