from __future__ import annotations import json import logging from datetime import date, datetime from decimal import Decimal, InvalidOperation from typing import Any import httpx from app.integrations.seeyon import SeeyonClient from app.jobs.base import BaseJob from extensions.sync_ehr_leaves_to_oa.api import SyncEhrLeavesToOaApi logger = logging.getLogger("connecthub.extensions.sync_ehr_leaves_to_oa") # OA SQLServer(与 sync_ehr_to_oa 同源) _OA_SQLSERVER_PARAMS: dict[str, Any] = { "host": "192.168.30.108", "port": 1433, "database": "seeyon", "username": "SHOADB91", "password": "E7nZ8x@12", "driver": "ODBC Driver 18 for SQL Server", "encrypt": "no", "trust_server_certificate": "yes", "connect_timeout_s": 10, } _OA_SQLSERVER_SCHEMA = "dbo" _OA_SQLSERVER_TABLE = "formmain_20250360" def _to_date(v: Any) -> date: if isinstance(v, date) and not isinstance(v, datetime): return v if isinstance(v, datetime): return v.date() s = str(v or "").strip() if not s: return date.today() s = s.replace("T", " ") if " " in s: s = s.split(" ", 1)[0] return datetime.strptime(s, "%Y-%m-%d").date() def _date_only(v: Any) -> str: s = str(v or "").strip() if not s: return "" s = s.replace("T", " ") if " " in s: s = s.split(" ", 1)[0] return s def _to_decimal(v: Any) -> Decimal: if v is None: return Decimal("0") try: return Decimal(str(v).strip()) except (InvalidOperation, ValueError): return Decimal("0") def _decimal_to_str(v: Decimal) -> str: # OA 字段请假天数为 DECIMAL(?,1),统一保留 1 位小数(如 1.0) try: q = v.quantize(Decimal("0.1")) except Exception: q = Decimal("0.0") return format(q, "f") def _to_int_safe(v: Any) -> int: try: return int(str(v).strip()) except Exception: return 0 class SyncEhrLeavesToOaMonthJob(BaseJob): """ EHR 请假 -> OA 月度同步(按工号+日期汇总): - 接口:POST /AttendanceOpen/api/v1/Vacation/GetListByDate - 统计范围:执行日所在月份的 1 号到执行日 - 唯一键:工号 + 日期 """ job_id = "sync_ehr_leaves_to_oa.sync_monthly" def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]: oa_base_url = str(params.get("oa_base_url") or "").strip() oa_form_code = str(params.get("oa_form_code") or "").strip() oa_right_id = str(params.get("oa_right_id") or "").strip() oa_login_name = str(params.get("oa_login_name") or "").strip() if not oa_base_url: raise ValueError("public_cfg.oa_base_url is required") if not oa_form_code: raise ValueError("public_cfg.oa_form_code is required") if not oa_right_id: raise ValueError("public_cfg.oa_right_id is required") if not oa_login_name: raise ValueError("public_cfg.oa_login_name is required") oa_template_code = str(params.get("oa_template_code") or oa_form_code).strip() run_date = _to_date(params.get("run_date")) month_start = run_date.replace(day=1) month_end = run_date display_job_no = str(params.get("oa_display_job_no") or "工号").strip() display_leave_date = str(params.get("oa_display_leave_date") or "请假日期").strip() display_leave_days = str(params.get("oa_display_leave_days") or "请假天数").strip() display_name = str(params.get("oa_display_name") or "姓名").strip() if not display_job_no or not display_leave_date or not display_leave_days: raise ValueError("oa_display_job_no/oa_display_leave_date/oa_display_leave_days cannot be empty") page_size = int(params.get("page_size") or 100) if page_size <= 0: page_size = 100 batch_size = int(params.get("batch_size") or 100) if batch_size <= 0: batch_size = 100 max_pages_per_day = int(params.get("max_pages_per_day") or 500) if max_pages_per_day <= 0: max_pages_per_day = 500 approved_statuses_param = params.get("approved_statuses") if approved_statuses_param is None: approved_statuses = {"通过", "已通过"} elif isinstance(approved_statuses_param, list): approved_statuses = {str(x).strip() for x in approved_statuses_param if str(x).strip()} else: approved_statuses = {str(approved_statuses_param).strip()} if str(approved_statuses_param).strip() else set() rest_user = str(secrets.get("rest_user") or "").strip() rest_password = str(secrets.get("rest_password") or "").strip() login_name = secrets.get("loginName") login_name = str(login_name).strip() if login_name else None if not rest_user or not rest_password: raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required") app_key = str(secrets.get("app_key") or "").strip() app_secret = str(secrets.get("app_secret") or "").strip() if not app_key or not app_secret: raise ValueError("secret_cfg.app_key and secret_cfg.app_secret are required") sender_login_name = params.get("senderLoginName") sender_login_name = str(sender_login_name).strip() if sender_login_name else None do_trigger = params.get("doTrigger") seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name) ehr = SyncEhrLeavesToOaApi( secret_params={"app_key": app_key, "app_secret": app_secret}, sqlserver_params=_OA_SQLSERVER_PARAMS, ) try: ehr.ping_sqlserver() logger.info( "SQLServer 连通性检查通过:host=%s db=%s table=%s", _OA_SQLSERVER_PARAMS["host"], _OA_SQLSERVER_PARAMS["database"], _OA_SQLSERVER_TABLE, ) exp_resp = seeyon.export_cap4_form_soap( templateCode=oa_template_code, senderLoginName=sender_login_name, rightId=oa_right_id, ) raw = exp_resp.text or "" payload = json.loads(raw) if raw else {} code = payload.get("code") if code not in (None, 0, "0"): raise RuntimeError(f"OA export failed code={code!r} message={payload.get('message')!r}") outer = payload.get("data") or {} form = outer.get("data") or {} if not isinstance(form, dict): raise RuntimeError("OA export invalid: data.data is not an object") definition = form.get("definition") or {} fields = definition.get("fields") or [] if not isinstance(fields, list): raise RuntimeError("OA export invalid: definition.fields is not a list") display_to_code: dict[str, str] = {} for f in fields: if not isinstance(f, dict): continue display = str(f.get("display") or "").strip() name = str(f.get("name") or "").strip() if display and name: display_to_code[display] = name needed = [display_job_no, display_leave_date, display_leave_days] missing = [x for x in needed if x not in display_to_code] if missing: raise RuntimeError(f"OA export invalid: missing form fields by display names: {missing}") field_job_no = display_to_code[display_job_no] field_leave_date = display_to_code[display_leave_date] field_leave_days = display_to_code[display_leave_days] field_name = display_to_code.get(display_name) master_table_name = _OA_SQLSERVER_TABLE vacations = ehr.get_vacations_in_date_range( start_date=month_start, end_date=month_end, page_size=page_size, max_pages_per_day=max_pages_per_day, ) staff_ids: set[int] = set() for v in vacations: sid = _to_int_safe(v.get("staffId")) if sid > 0: staff_ids.add(sid) staff_brief_map = ehr.get_staff_briefs_by_user_ids(user_ids=list(staff_ids)) agg: dict[tuple[str, str], Decimal] = {} name_map: dict[tuple[str, str], str] = {} skipped_no_job_no = 0 skipped_bad_date = 0 skipped_not_approved = 0 skipped_non_positive = 0 for item in vacations: approve_status = str(item.get("approveStatus") or "").strip() if approved_statuses and approve_status not in approved_statuses: skipped_not_approved += 1 continue sid = _to_int_safe(item.get("staffId")) staff_brief = staff_brief_map.get(sid) or {} job_no = str(staff_brief.get("job_no") or "").strip() staff_name = str(staff_brief.get("name") or "").strip() if not job_no: skipped_no_job_no += 1 continue leave_date = _date_only(item.get("vacationStartDateTime")) if not leave_date: skipped_bad_date += 1 continue try: leave_date_obj = datetime.strptime(leave_date, "%Y-%m-%d").date() except Exception: skipped_bad_date += 1 continue if leave_date_obj < month_start or leave_date_obj > month_end: skipped_bad_date += 1 continue day_value = _to_decimal(item.get("dayValueOfDuration")) if day_value <= Decimal("0"): skipped_non_positive += 1 continue key = (job_no, leave_date) exists = agg.get(key) if exists is None: agg[key] = day_value else: agg[key] = exists + day_value # 姓名按工号+日期保留一个非空值(用于 OA 展示) if staff_name and key not in name_map: name_map[key] = staff_name existing_id_map = ehr.get_oa_row_id_map_by_job_and_date( table_name=_OA_SQLSERVER_TABLE, schema=_OA_SQLSERVER_SCHEMA, job_no_column=field_job_no, date_column=field_leave_date, start_date=month_start, end_date=month_end, ) data_list: list[dict[str, Any]] = [] to_update = 0 to_insert = 0 for (job_no, leave_date), leave_days in sorted(agg.items(), key=lambda x: (x[0][0], x[0][1])): leave_date_value = f"{leave_date} 00:00:00" fields_payload = [ {"name": field_job_no, "value": job_no, "showValue": job_no}, {"name": field_leave_date, "value": leave_date_value, "showValue": leave_date}, {"name": field_leave_days, "value": _decimal_to_str(leave_days), "showValue": _decimal_to_str(leave_days)}, ] if field_name: name_val = name_map.get((job_no, leave_date), "") fields_payload.append({"name": field_name, "value": name_val, "showValue": name_val}) record: dict[str, Any] = { "fields": fields_payload, } existing_id = existing_id_map.get((job_no, leave_date)) if existing_id is not None: record["id"] = existing_id to_update += 1 else: to_insert += 1 data_list.append( { "masterTable": { "name": master_table_name, "record": record, "changedFields": [f["name"] for f in fields_payload], }, "subTables": [], } ) success_count = 0 failed_count = 0 failed_data: dict[str, str] = {} do_trigger_bool: bool | None if do_trigger is None: do_trigger_bool = None else: do_trigger_bool = str(do_trigger).strip().lower() in ("1", "true", "yes", "y", "on") for i in range(0, len(data_list), batch_size): chunk = data_list[i : i + batch_size] if not chunk: continue try: resp = seeyon.batch_update_cap4_form_soap( formCode=oa_form_code, loginName=oa_login_name, rightId=oa_right_id, dataList=chunk, uniqueFiled=[field_job_no, field_leave_date], doTrigger=do_trigger_bool, ) except httpx.HTTPStatusError as e: resp_text = "" try: resp_text = str((e.response.text or "")[:2000]) except Exception: resp_text = "" first_row = chunk[0] if chunk else {} raise RuntimeError( "OA batch-update HTTP error " f"status={getattr(e.response, 'status_code', None)!r} " f"body_preview={resp_text!r} " f"first_row={json.dumps(first_row, ensure_ascii=False, default=str)[:2000]}" ) from e rj = resp.json() if resp.content else {} code_local = int(rj.get("code", -1)) if code_local != 0: raise RuntimeError(f"OA batch-update failed code={code_local} message={rj.get('message')!r}") data_local = rj.get("data") or {} chunk_success = int(data_local.get("successCount", 0) or 0) chunk_failed = int(data_local.get("failCount", 0) or 0) success_count += chunk_success failed_count += chunk_failed fd = data_local.get("failedData") or {} if isinstance(fd, dict): for k, v in fd.items(): if str(k) not in failed_data: failed_data[str(k)] = str(v) logger.info( "EHR 请假月度同步完成:month=%s~%s vacations=%s aggregated=%s to_update=%s to_insert=%s success=%s failed=%s", month_start.isoformat(), month_end.isoformat(), len(vacations), len(agg), to_update, to_insert, success_count, failed_count, ) return { "month_start": month_start.isoformat(), "month_end": month_end.isoformat(), "vacation_records": len(vacations), "aggregated_records": len(agg), "to_update": to_update, "to_insert": to_insert, "success_count": success_count, "failed_count": failed_count, "skipped_no_job_no": skipped_no_job_no, "skipped_bad_date": skipped_bad_date, "skipped_not_approved": skipped_not_approved, "skipped_non_positive": skipped_non_positive, "failed_data": dict(list(failed_data.items())[:100]), } finally: ehr.close() seeyon.close()