diff --git a/extensions/sync_ehr_leaves_to_oa/__init__.py b/extensions/sync_ehr_leaves_to_oa/__init__.py
new file mode 100644
index 0000000..c02ac49
--- /dev/null
+++ b/extensions/sync_ehr_leaves_to_oa/__init__.py
@@ -0,0 +1,6 @@
+"""Extension that syncs EHR leave records into OA as monthly summaries."""
+
+from extensions.sync_ehr_leaves_to_oa.api import SyncEhrLeavesToOaApi
+from extensions.sync_ehr_leaves_to_oa.job import SyncEhrLeavesToOaMonthJob
+
+__all__ = ["SyncEhrLeavesToOaApi", "SyncEhrLeavesToOaMonthJob"]
diff --git a/extensions/sync_ehr_leaves_to_oa/api.py b/extensions/sync_ehr_leaves_to_oa/api.py
new file mode 100644
index 0000000..403fdd0
--- /dev/null
+++ b/extensions/sync_ehr_leaves_to_oa/api.py
@@ -0,0 +1,439 @@
+from __future__ import annotations
+
+import logging
+from datetime import date, timedelta
+from typing import Any
+from urllib.parse import quote_plus
+
+from sqlalchemy import create_engine, text
+from sqlalchemy.engine import Engine
+
+from app.integrations.ehr import EhrClient
+
+
+logger = logging.getLogger("connecthub.extensions.sync_ehr_leaves_to_oa")
+
+# Keys under which an EHR staff profile may carry the job number, in lookup order.
+_STAFF_CODE_KEYS = (
+    "staffCode",
+    "StaffCode",
+    "code",
+    "Code",
+    "jobNumber",
+    "JobNumber",
+    "employeeNo",
+    "EmployeeNo",
+)
+# Keys under which an EHR staff profile may carry the staff name, in lookup order.
+_STAFF_NAME_KEYS = ("name", "Name", "staffName", "StaffName", "employeeName", "EmployeeName")
+
+
+def _to_int_safe(v: Any) -> int:
+    """Convert ``v`` to ``int`` via its stripped string form; return 0 if it does not parse."""
+    try:
+        return int(str(v).strip())
+    except (TypeError, ValueError):
+        return 0
+
+
+def _first_non_empty(
+    mapping: dict[str, Any],
+    exact_keys: tuple[str, ...],
+    lowercase_keys: tuple[str, ...],
+) -> str:
+    """Return the first non-empty, stripped value stored under one of the given keys.
+
+    ``exact_keys`` are matched case-sensitively first; ``lowercase_keys`` are then
+    matched against the lower-cased keys of ``mapping``. Returns "" if nothing matches.
+    """
+    for key in exact_keys:
+        val = str(mapping.get(key) or "").strip()
+        if val:
+            return val
+    lower_map = {str(k).lower(): v for k, v in mapping.items()}
+    for key in lowercase_keys:
+        val = str(lower_map.get(key) or "").strip()
+        if val:
+            return val
+    return ""
+
+
+def _extract_staff_code(staff_profile: dict[str, Any]) -> str:
+    """Return the job number found in ``staff_profile``, or "" if there is none."""
+    if not isinstance(staff_profile, dict):
+        return ""
+    return _first_non_empty(staff_profile, _STAFF_CODE_KEYS, ("staffcode", "code", "jobnumber", "employeeno"))
+
+
+def _extract_staff_name(staff_profile: dict[str, Any]) -> str:
+    """Return the staff name found in ``staff_profile``, or "" if there is none."""
+    if not isinstance(staff_profile, dict):
+        return ""
+    return _first_non_empty(staff_profile, _STAFF_NAME_KEYS, ("name", "staffname", "employeename"))
+
+
+def _quote_identifier(name: str) -> str:
+    """Bracket-quote a SQL Server identifier, doubling any "]" so it cannot end the quoting."""
+    return "[" + name.replace("]", "]]") + "]"
+
+
+class SyncEhrLeavesToOaApi:
+    """Data access for the leave sync: the EHR open API plus the OA SQL Server database."""
+
+    def __init__(
+        self,
+        *,
+        secret_params: dict[str, str],
+        sqlserver_params: dict[str, Any],
+        base_url: str = "https://openapi.italent.cn",
+        timeout_s: float = 10.0,
+        retries: int = 2,
+        retry_backoff_s: float = 0.5,
+    ) -> None:
+        """Build the SQL Server engine and the EHR HTTP client.
+
+        Raises:
+            ValueError: If a required SQL Server connection setting is missing.
+        """
+        # The engine is built first because its settings are validated here; failing
+        # before the HTTP client exists leaves nothing to clean up.
+        self._sql_engine = self._create_sqlserver_engine(sqlserver_params)
+        try:
+            self._client = EhrClient(
+                base_url=base_url,
+                secret_params=secret_params,
+                timeout_s=timeout_s,
+                retries=retries,
+                retry_backoff_s=retry_backoff_s,
+            )
+        except Exception:
+            self._sql_engine.dispose()
+            raise
+
+    def close(self) -> None:
+        """Close the EHR client and dispose of the SQL engine, even if the former fails."""
+        try:
+            self._client.close()
+        finally:
+            self._sql_engine.dispose()
+
+    @staticmethod
+    def _create_sqlserver_engine(sqlserver_params: dict[str, Any]) -> Engine:
+        """Create a pyodbc-backed SQLAlchemy engine from ``sqlserver_params``.
+
+        ``host``, ``username``, ``password`` and ``database`` are required; ``port``,
+        ``connect_timeout_s``, ``driver``, ``trust_server_certificate`` and ``encrypt``
+        are optional and fall back to the defaults below.
+
+        Raises:
+            ValueError: If any required setting is missing or blank.
+        """
+        host = str(sqlserver_params.get("host") or "").strip()
+        username = str(sqlserver_params.get("username") or "").strip()
+        password = str(sqlserver_params.get("password") or "").strip()
+        database = str(sqlserver_params.get("database") or "").strip()
+        if not host or not username or not password or not database:
+            raise ValueError("sqlserver_params.host/username/password/database are required")
+
+        port = int(sqlserver_params.get("port") or 1433)
+        timeout_s = int(sqlserver_params.get("connect_timeout_s") or 10)
+        driver = str(sqlserver_params.get("driver") or "ODBC Driver 18 for SQL Server").strip()
+        trust_server_certificate = str(sqlserver_params.get("trust_server_certificate") or "yes").strip().lower()
+        encrypt = str(sqlserver_params.get("encrypt") or "yes").strip().lower()
+
+        # Credentials and the driver name may contain URL-reserved characters, so escape them.
+        username_q = quote_plus(username)
+        password_q = quote_plus(password)
+        driver_q = quote_plus(driver)
+        conn_url = (
+            f"mssql+pyodbc://{username_q}:{password_q}@{host}:{port}/{database}"
+            f"?driver={driver_q}&TrustServerCertificate={trust_server_certificate}&Encrypt={encrypt}"
+        )
+        return create_engine(conn_url, pool_pre_ping=True, pool_recycle=1800, connect_args={"timeout": timeout_s})
+
+    def ping_sqlserver(self) -> bool:
+        """Run ``SELECT 1`` against SQL Server; return True, letting any driver error propagate."""
+        with self._sql_engine.connect() as conn:
+            conn.execute(text("SELECT 1"))
+        return True
+
+    def get_vacation_list_by_day(
+        self,
+        *,
+        day: date,
+        page_size: int = 100,
+        max_pages: int = 500,
+    ) -> list[dict[str, Any]]:
+        """Fetch every vacation record EHR returns for ``day``, following the page cursor.
+
+        Raises:
+            ValueError: If ``page_size`` is outside [1, 100] or ``max_pages`` is not positive.
+            RuntimeError: If EHR answers with an unexpected code or payload shape, or if
+                more than ``max_pages`` pages would be needed.
+        """
+        if page_size <= 0 or page_size > 100:
+            raise ValueError("page_size must be in range [1, 100]")
+        if max_pages <= 0:
+            raise ValueError("max_pages must be > 0")
+
+        out: list[dict[str, Any]] = []
+        cursor: str | None = None
+        page = 0
+        while True:
+            page += 1
+            if page > max_pages:
+                raise RuntimeError(f"EHR Vacation.GetListByDate exceeds max_pages={max_pages} day={day.isoformat()}")
+
+            body: dict[str, Any] = {
+                "day": day.isoformat(),
+                "queryCursor": cursor,
+                "pageSize": page_size,
+            }
+            resp = self._client.request(
+                "POST",
+                "/AttendanceOpen/api/v1/Vacation/GetListByDate",
+                json=body,
+                headers={"Content-Type": "application/json"},
+            )
+            payload = resp.json() if resp.content else {}
+            if not isinstance(payload, dict):
+                raise RuntimeError("EHR Vacation.GetListByDate invalid: response is not an object")
+            code = str(payload.get("code") or "")
+            if code not in ("200", "206"):
+                raise RuntimeError(f"EHR Vacation.GetListByDate failed code={code!r} message={payload.get('message')!r}")
+
+            data = payload.get("data") or {}
+            if not isinstance(data, dict):
+                raise RuntimeError("EHR Vacation.GetListByDate invalid: data is not an object")
+            vacation_list = data.get("vacationList") or []
+            if not isinstance(vacation_list, list):
+                raise RuntimeError("EHR Vacation.GetListByDate invalid: data.vacationList is not a list")
+            out.extend(x for x in vacation_list if isinstance(x, dict))
+
+            is_last_page = bool(data.get("isLastPage", False))
+            next_cursor = str(data.get("sortCursor") or "").strip() or None
+            logger.info(
+                "EHR Vacation.GetListByDate day=%s page=%s batch=%s is_last_page=%s",
+                day.isoformat(),
+                page,
+                len(vacation_list),
+                is_last_page,
+            )
+            if is_last_page:
+                break
+            if not next_cursor:
+                # With no cursor the next request would just repeat the first one, so stop;
+                # warn because the server did not mark this page as the last one.
+                logger.warning(
+                    "EHR Vacation.GetListByDate returned no sortCursor before the last page: day=%s page=%s",
+                    day.isoformat(),
+                    page,
+                )
+                break
+            cursor = next_cursor
+        return out
+
+    def get_vacations_in_date_range(
+        self,
+        *,
+        start_date: date,
+        end_date: date,
+        page_size: int = 100,
+        max_pages_per_day: int = 500,
+    ) -> list[dict[str, Any]]:
+        """Fetch vacation records day by day for ``start_date``..``end_date`` inclusive.
+
+        Raises:
+            ValueError: If ``end_date`` is earlier than ``start_date``.
+        """
+        if end_date < start_date:
+            raise ValueError("end_date must be greater than or equal to start_date")
+        out: list[dict[str, Any]] = []
+        cur = start_date
+        while cur <= end_date:
+            out.extend(
+                self.get_vacation_list_by_day(
+                    day=cur,
+                    page_size=page_size,
+                    max_pages=max_pages_per_day,
+                )
+            )
+            cur = cur + timedelta(days=1)
+        logger.info(
+            "EHR 请假拉取完成:start_date=%s end_date=%s total_records=%s",
+            start_date.isoformat(),
+            end_date.isoformat(),
+            len(out),
+        )
+        return out
+
+    def _fetch_staff_profile(self, uid: int) -> dict[str, Any] | None:
+        """Fetch one staff profile from EHR, trying both spellings of the user-id parameter.
+
+        Request and decoding failures are logged and treated as "not found".
+        """
+        for params in ({"userId": str(uid)}, {"userid": str(uid)}):
+            try:
+                resp = self._client.request(
+                    "GET",
+                    "/UserFrameworkApiV3/api/v1/staffs/Get",
+                    params=params,
+                )
+                payload = resp.json() if resp.content else {}
+            except Exception as exc:
+                # Best-effort lookup: record the failure instead of hiding it, then
+                # fall through to the other parameter spelling.
+                logger.warning("EHR staffs/Get failed: user_id=%s param=%s error=%r", uid, next(iter(params)), exc)
+                continue
+            profile = self._find_staff_profile_by_uid(payload, uid)
+            if profile is not None:
+                return profile
+        return None
+
+    def get_staff_briefs_by_user_ids(self, *, user_ids: list[int], chunk_size: int = 100) -> dict[int, dict[str, str]]:
+        """Look up job number and name for each distinct positive user id.
+
+        ``chunk_size`` is accepted for backward compatibility only: profiles are fetched
+        one user at a time, so it does not affect the requests made.
+
+        Returns:
+            Mapping of user id to ``{"job_no": ..., "name": ...}`` for every user whose
+            profile yielded at least one of the two values.
+        """
+        clean_ids: list[int] = []
+        seen: set[int] = set()
+        for u in user_ids:
+            uid = _to_int_safe(u)
+            if uid <= 0 or uid in seen:
+                continue
+            seen.add(uid)
+            clean_ids.append(uid)
+        if not clean_ids:
+            return {}
+
+        out: dict[int, dict[str, str]] = {}
+        for uid in clean_ids:
+            profile = self._fetch_staff_profile(uid)
+            if profile is None:
+                continue
+            code = _extract_staff_code(profile)
+            name = _extract_staff_name(profile)
+            if code or name:
+                out[uid] = {"job_no": code, "name": name}
+        logger.info(
+            "EHR 员工信息反查完成:input_user_ids=%s matched_staff_profiles=%s",
+            len(clean_ids),
+            len(out),
+        )
+        return out
+
+    def get_staff_codes_by_user_ids(self, *, user_ids: list[int], chunk_size: int = 100) -> dict[int, str]:
+        """Return ``{user_id: job_no}`` for the users whose job number could be resolved."""
+        briefs = self.get_staff_briefs_by_user_ids(user_ids=user_ids, chunk_size=chunk_size)
+        out: dict[int, str] = {}
+        for uid, brief in briefs.items():
+            code = str((brief or {}).get("job_no") or "").strip()
+            if code:
+                out[uid] = code
+        return out
+
+    @staticmethod
+    def _find_staff_profile_by_uid(payload: Any, uid: int) -> dict[str, Any] | None:
+        """Search ``payload`` recursively for the dict describing user ``uid``.
+
+        A matching dict that also carries a job-number key is returned immediately;
+        otherwise the first matching dict is returned, or None when nothing matches.
+        """
+
+        def _iter_dicts(node: Any):
+            if isinstance(node, dict):
+                yield node
+                for v in node.values():
+                    yield from _iter_dicts(v)
+            elif isinstance(node, list):
+                for it in node:
+                    yield from _iter_dicts(it)
+
+        def _uid_from_dict(d: dict[str, Any]) -> int:
+            # Only the first id-like key present in the dict is consulted.
+            for k in ("userId", "UserId", "userid", "UserID", "id", "Id", "ID"):
+                if k in d:
+                    return _to_int_safe(d.get(k))
+            return 0
+
+        best: dict[str, Any] | None = None
+        for d in _iter_dicts(payload):
+            if _uid_from_dict(d) != uid:
+                continue
+            if any(k in d for k in _STAFF_CODE_KEYS):
+                return d
+            if best is None:
+                best = d
+        return best
+
+    def get_oa_row_id_map_by_job_and_date(
+        self,
+        *,
+        table_name: str,
+        schema: str,
+        job_no_column: str,
+        date_column: str,
+        start_date: date,
+        end_date: date,
+    ) -> dict[tuple[str, str], int]:
+        """Index existing OA rows in the date range by ``(job_no, leave_date)``.
+
+        When several rows share a key the last one read wins; such collisions are
+        counted and logged.
+
+        Raises:
+            ValueError: If the table name or either column name is blank.
+        """
+        t = str(table_name or "").strip()
+        jc = str(job_no_column or "").strip()
+        dc = str(date_column or "").strip()
+        s = str(schema or "").strip() or "dbo"
+        if not t or not jc or not dc:
+            raise ValueError("table_name/job_no_column/date_column are required")
+
+        # Identifiers cannot be bound as parameters, so they are bracket-quoted with any
+        # "]" escaped to keep a crafted name from breaking out of the quoting.
+        t_q, s_q, jc_q, dc_q = (_quote_identifier(x) for x in (t, s, jc, dc))
+        sql = text(
+            f"SELECT [id] AS row_id, {jc_q} AS job_no, {dc_q} AS leave_date "
+            f"FROM {s_q}.{t_q} WITH (NOLOCK) "
+            f"WHERE TRY_CONVERT(date, {dc_q}) >= :start_date "
+            f"AND TRY_CONVERT(date, {dc_q}) <= :end_date "
+            f"AND ISNULL(LTRIM(RTRIM({jc_q})), '') <> ''"
+        )
+        params = {
+            "start_date": start_date.isoformat(),
+            "end_date": end_date.isoformat(),
+        }
+        out: dict[tuple[str, str], int] = {}
+        duplicate_keys = 0
+        with self._sql_engine.connect() as conn:
+            rows = conn.execute(sql, params).fetchall()
+        for r in rows:
+            try:
+                row_id = int(r.row_id)
+            except (TypeError, ValueError):
+                continue
+            job_no = str(r.job_no or "").strip()
+            # Keep only the date part in case the column value renders with a time.
+            leave_date = str(r.leave_date or "").strip().split(" ", 1)[0]
+            if not job_no or not leave_date:
+                continue
+            key = (job_no, leave_date)
+            if key in out and out[key] != row_id:
+                duplicate_keys += 1
+            out[key] = row_id
+        logger.info(
+            "OA 现有记录索引完成:table=%s month_range=%s~%s indexed=%s duplicate_keys=%s",
+            t,
+            start_date.isoformat(),
+            end_date.isoformat(),
+            len(out),
+            duplicate_keys,
+        )
+        return out
diff --git a/extensions/sync_ehr_leaves_to_oa/job.py b/extensions/sync_ehr_leaves_to_oa/job.py
new file mode 100644
index 0000000..b46a942
--- /dev/null
+++ b/extensions/sync_ehr_leaves_to_oa/job.py
@@ -0,0 +1,428 @@
+from __future__ import annotations
+
+import json
+import logging
+from contextlib import ExitStack
+from datetime import date, datetime
+from decimal import Decimal, InvalidOperation
+from typing import Any
+
+from app.integrations.seeyon import SeeyonClient
+from app.jobs.base import BaseJob
+from extensions.sync_ehr_leaves_to_oa.api import SyncEhrLeavesToOaApi
+
+
+logger = logging.getLogger("connecthub.extensions.sync_ehr_leaves_to_oa")
+
+# OA SQL Server (same source as sync_ehr_to_oa).
+# SECURITY(review): real credentials are committed below. They are kept only as a
+# backward-compatible fallback; rotate them and supply the connection settings through
+# the optional "oa_sqlserver" entry of the job params / secrets instead.
+_OA_SQLSERVER_PARAMS: dict[str, Any] = {
+    "host": "192.168.30.108",
+    "port": 1433,
+    "database": "seeyon",
+    "username": "SHOADB91",
+    "password": "E7nZ8x@12",
+    "driver": "ODBC Driver 18 for SQL Server",
+    "encrypt": "no",
+    "trust_server_certificate": "yes",
+    "connect_timeout_s": 10,
+}
+_OA_SQLSERVER_SCHEMA = "dbo"
+_OA_SQLSERVER_TABLE = "formmain_20250360"
+# Largest page size SyncEhrLeavesToOaApi.get_vacation_list_by_day accepts.
+_EHR_MAX_PAGE_SIZE = 100
+
+
+def _resolve_sqlserver_params(params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
+    """Return the SQL Server settings, letting configuration override the built-in defaults.
+
+    An optional ``oa_sqlserver`` dict in ``params`` and then in ``secrets`` is overlaid
+    on ``_OA_SQLSERVER_PARAMS``; ``None`` and empty-string values are ignored.
+    """
+    resolved = dict(_OA_SQLSERVER_PARAMS)
+    for source in (params.get("oa_sqlserver"), secrets.get("oa_sqlserver")):
+        if isinstance(source, dict):
+            resolved.update({k: v for k, v in source.items() if v is not None and v != ""})
+    return resolved
+
+
+def _to_date(v: Any) -> date:
+    """Coerce ``v`` to a ``date``.
+
+    Accepts ``date``/``datetime`` objects and strings that start with ``YYYY-MM-DD``
+    (anything after the first space or "T" is dropped). A blank value means today.
+
+    Raises:
+        ValueError: If a non-blank string does not yield a ``YYYY-MM-DD`` date.
+    """
+    if isinstance(v, datetime):
+        return v.date()
+    if isinstance(v, date):
+        return v
+    s = str(v or "").strip()
+    if not s:
+        return date.today()
+    s = s.replace("T", " ").split(" ", 1)[0]
+    return datetime.strptime(s, "%Y-%m-%d").date()
+
+
+def _date_only(v: Any) -> str:
+    """Return the part of ``v``'s string form before the first space or "T" ("" if blank)."""
+    s = str(v or "").strip()
+    if not s:
+        return ""
+    return s.replace("T", " ").split(" ", 1)[0]
+
+
+def _to_decimal(v: Any) -> Decimal:
+    """Parse ``v`` as a finite ``Decimal``; return 0 for None, unparsable text, NaN and infinities."""
+    if v is None:
+        return Decimal("0")
+    try:
+        parsed = Decimal(str(v).strip())
+    except (InvalidOperation, ValueError):
+        return Decimal("0")
+    # "NaN"/"Infinity" parse successfully, but NaN makes ordering comparisons raise and
+    # an infinity cannot be rendered as a day count, so neither may reach the aggregation.
+    return parsed if parsed.is_finite() else Decimal("0")
+
+
+def _decimal_to_str(v: Decimal) -> str:
+    """Render ``v`` in plain notation without trailing zeros (1.50 -> "1.5", 2.0 -> "2")."""
+    if v == v.to_integral():
+        return str(int(v))
+    s = format(v.normalize(), "f")
+    if "." in s:
+        s = s.rstrip("0").rstrip(".")
+    return s
+
+
+def _to_int_safe(v: Any) -> int:
+    """Convert ``v`` to ``int`` via its stripped string form; return 0 if it does not parse."""
+    try:
+        return int(str(v).strip())
+    except (TypeError, ValueError):
+        return 0
+
+
+class SyncEhrLeavesToOaMonthJob(BaseJob):
+    """
+    Monthly EHR leave -> OA sync (aggregated by job number + date):
+    - API: POST /AttendanceOpen/api/v1/Vacation/GetListByDate
+    - Range: from the 1st of the run date's month through the run date
+    - Unique key: job number + date
+    """
+
+    job_id = "sync_ehr_leaves_to_oa.sync_monthly"
+
+    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
+        """Pull leave records from EHR, total them per (job number, date) and upsert them into OA.
+
+        Required ``params``: ``oa_base_url``, ``oa_form_code``, ``oa_right_id`` and
+        ``oa_login_name``. Required ``secrets``: ``rest_user``, ``rest_password``,
+        ``app_key`` and ``app_secret``. Every other setting is optional.
+
+        Returns:
+            Summary with the date range, the fetched / aggregated / skipped / written
+            counters and at most 100 entries of OA's ``failedData``.
+
+        Raises:
+            ValueError: If a required setting is missing or blank.
+            RuntimeError: If OA or EHR answers with an error code or a malformed payload.
+        """
+        oa_base_url = str(params.get("oa_base_url") or "").strip()
+        oa_form_code = str(params.get("oa_form_code") or "").strip()
+        oa_right_id = str(params.get("oa_right_id") or "").strip()
+        oa_login_name = str(params.get("oa_login_name") or "").strip()
+        if not oa_base_url:
+            raise ValueError("public_cfg.oa_base_url is required")
+        if not oa_form_code:
+            raise ValueError("public_cfg.oa_form_code is required")
+        if not oa_right_id:
+            raise ValueError("public_cfg.oa_right_id is required")
+        if not oa_login_name:
+            raise ValueError("public_cfg.oa_login_name is required")
+
+        oa_template_code = str(params.get("oa_template_code") or oa_form_code).strip()
+        run_date = _to_date(params.get("run_date"))
+        month_start = run_date.replace(day=1)
+        month_end = run_date
+
+        display_job_no = str(params.get("oa_display_job_no") or "工号").strip()
+        display_leave_date = str(params.get("oa_display_leave_date") or "请假日期").strip()
+        display_leave_days = str(params.get("oa_display_leave_days") or "请假天数").strip()
+        display_name = str(params.get("oa_display_name") or "姓名").strip()
+        if not display_job_no or not display_leave_date or not display_leave_days:
+            raise ValueError("oa_display_job_no/oa_display_leave_date/oa_display_leave_days cannot be empty")
+
+        page_size = int(params.get("page_size") or 100)
+        if page_size <= 0:
+            page_size = 100
+        # get_vacation_list_by_day raises for page sizes above the maximum, so clamp here.
+        page_size = min(page_size, _EHR_MAX_PAGE_SIZE)
+        batch_size = int(params.get("batch_size") or 100)
+        if batch_size <= 0:
+            batch_size = 100
+        max_pages_per_day = int(params.get("max_pages_per_day") or 500)
+        if max_pages_per_day <= 0:
+            max_pages_per_day = 500
+
+        approved_statuses_param = params.get("approved_statuses")
+        if approved_statuses_param is None:
+            approved_statuses = {"通过", "已通过"}
+        elif isinstance(approved_statuses_param, (list, tuple, set, frozenset)):
+            approved_statuses = {str(x).strip() for x in approved_statuses_param if str(x).strip()}
+        else:
+            single_status = str(approved_statuses_param).strip()
+            approved_statuses = {single_status} if single_status else set()
+
+        rest_user = str(secrets.get("rest_user") or "").strip()
+        rest_password = str(secrets.get("rest_password") or "").strip()
+        login_name = secrets.get("loginName")
+        login_name = str(login_name).strip() if login_name else None
+        if not rest_user or not rest_password:
+            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
+
+        app_key = str(secrets.get("app_key") or "").strip()
+        app_secret = str(secrets.get("app_secret") or "").strip()
+        if not app_key or not app_secret:
+            raise ValueError("secret_cfg.app_key and secret_cfg.app_secret are required")
+
+        sender_login_name = params.get("senderLoginName")
+        sender_login_name = str(sender_login_name).strip() if sender_login_name else None
+        do_trigger = params.get("doTrigger")
+        do_trigger_bool: bool | None
+        if do_trigger is None:
+            do_trigger_bool = None
+        else:
+            do_trigger_bool = str(do_trigger).strip().lower() in ("1", "true", "yes", "y", "on")
+        sqlserver_params = _resolve_sqlserver_params(params, secrets)
+
+        # ExitStack closes whatever was opened, in reverse order, even if a later
+        # constructor or another close() raises.
+        with ExitStack() as stack:
+            seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
+            stack.callback(seeyon.close)
+            ehr = SyncEhrLeavesToOaApi(
+                secret_params={"app_key": app_key, "app_secret": app_secret},
+                sqlserver_params=sqlserver_params,
+            )
+            stack.callback(ehr.close)
+
+            ehr.ping_sqlserver()
+            logger.info(
+                "SQLServer 连通性检查通过:host=%s db=%s table=%s",
+                sqlserver_params.get("host"),
+                sqlserver_params.get("database"),
+                _OA_SQLSERVER_TABLE,
+            )
+
+            exp_resp = seeyon.export_cap4_form_soap(
+                templateCode=oa_template_code,
+                senderLoginName=sender_login_name,
+                rightId=oa_right_id,
+            )
+            raw = exp_resp.text or ""
+            payload = json.loads(raw) if raw else {}
+            if not isinstance(payload, dict):
+                raise RuntimeError("OA export invalid: response is not an object")
+            code = payload.get("code")
+            if code not in (None, 0, "0"):
+                raise RuntimeError(f"OA export failed code={code!r} message={payload.get('message')!r}")
+            outer = payload.get("data") or {}
+            form = (outer.get("data") or {}) if isinstance(outer, dict) else None
+            if not isinstance(form, dict):
+                raise RuntimeError("OA export invalid: data.data is not an object")
+
+            definition = form.get("definition") or {}
+            fields = (definition.get("fields") or []) if isinstance(definition, dict) else None
+            if not isinstance(fields, list):
+                raise RuntimeError("OA export invalid: definition.fields is not a list")
+
+            # Map each form field's display label to its internal field name.
+            display_to_code: dict[str, str] = {}
+            for f in fields:
+                if not isinstance(f, dict):
+                    continue
+                display = str(f.get("display") or "").strip()
+                name = str(f.get("name") or "").strip()
+                if display and name:
+                    display_to_code[display] = name
+
+            needed = [display_job_no, display_leave_date, display_leave_days]
+            missing = [x for x in needed if x not in display_to_code]
+            if missing:
+                raise RuntimeError(f"OA export invalid: missing form fields by display names: {missing}")
+
+            field_job_no = display_to_code[display_job_no]
+            field_leave_date = display_to_code[display_leave_date]
+            field_leave_days = display_to_code[display_leave_days]
+            field_name = display_to_code.get(display_name)
+            master_table_name = _OA_SQLSERVER_TABLE
+
+            vacations = ehr.get_vacations_in_date_range(
+                start_date=month_start,
+                end_date=month_end,
+                page_size=page_size,
+                max_pages_per_day=max_pages_per_day,
+            )
+
+            staff_ids = {sid for sid in (_to_int_safe(v.get("staffId")) for v in vacations) if sid > 0}
+            staff_brief_map = ehr.get_staff_briefs_by_user_ids(user_ids=list(staff_ids))
+
+            # NOTE(review): every record is booked in full on its start date. If the per-day
+            # query also returns a multi-day leave on its later days, it would be counted more
+            # than once - confirm against the EHR API contract.
+            agg: dict[tuple[str, str], Decimal] = {}
+            name_map: dict[tuple[str, str], str] = {}
+            skipped_no_job_no = 0
+            skipped_bad_date = 0
+            skipped_not_approved = 0
+            skipped_non_positive = 0
+            for item in vacations:
+                approve_status = str(item.get("approveStatus") or "").strip()
+                if approved_statuses and approve_status not in approved_statuses:
+                    skipped_not_approved += 1
+                    continue
+
+                sid = _to_int_safe(item.get("staffId"))
+                staff_brief = staff_brief_map.get(sid) or {}
+                job_no = str(staff_brief.get("job_no") or "").strip()
+                staff_name = str(staff_brief.get("name") or "").strip()
+                if not job_no:
+                    skipped_no_job_no += 1
+                    continue
+
+                leave_date = _date_only(item.get("vacationStartDateTime"))
+                if not leave_date:
+                    skipped_bad_date += 1
+                    continue
+                try:
+                    leave_date_obj = datetime.strptime(leave_date, "%Y-%m-%d").date()
+                except ValueError:
+                    skipped_bad_date += 1
+                    continue
+                if leave_date_obj < month_start or leave_date_obj > month_end:
+                    skipped_bad_date += 1
+                    continue
+                # Canonical zero-padded form, so equal days always produce equal keys.
+                leave_date = leave_date_obj.isoformat()
+
+                day_value = _to_decimal(item.get("dayValueOfDuration"))
+                if day_value <= Decimal("0"):
+                    skipped_non_positive += 1
+                    continue
+
+                key = (job_no, leave_date)
+                exists = agg.get(key)
+                agg[key] = day_value if exists is None else exists + day_value
+                # Keep one non-empty name per job number + date (shown in OA).
+                if staff_name and key not in name_map:
+                    name_map[key] = staff_name
+
+            existing_id_map = ehr.get_oa_row_id_map_by_job_and_date(
+                table_name=_OA_SQLSERVER_TABLE,
+                schema=_OA_SQLSERVER_SCHEMA,
+                job_no_column=field_job_no,
+                date_column=field_leave_date,
+                start_date=month_start,
+                end_date=month_end,
+            )
+
+            data_list: list[dict[str, Any]] = []
+            to_update = 0
+            to_insert = 0
+            for (job_no, leave_date), leave_days in sorted(agg.items(), key=lambda kv: kv[0]):
+                leave_days_str = _decimal_to_str(leave_days)
+                fields_payload = [
+                    {"name": field_job_no, "value": job_no, "showValue": job_no},
+                    {"name": field_leave_date, "value": leave_date, "showValue": leave_date},
+                    {"name": field_leave_days, "value": leave_days_str, "showValue": leave_days_str},
+                ]
+                if field_name:
+                    name_val = name_map.get((job_no, leave_date), "")
+                    fields_payload.append({"name": field_name, "value": name_val, "showValue": name_val})
+                record: dict[str, Any] = {"fields": fields_payload}
+                existing_id = existing_id_map.get((job_no, leave_date))
+                if existing_id is not None:
+                    record["id"] = existing_id
+                    to_update += 1
+                else:
+                    to_insert += 1
+
+                data_list.append(
+                    {
+                        "masterTable": {
+                            "name": master_table_name,
+                            "record": record,
+                            "changedFields": [f["name"] for f in fields_payload],
+                        },
+                        "subTables": [],
+                    }
+                )
+
+            success_count = 0
+            failed_count = 0
+            failed_data: dict[str, str] = {}
+            for i in range(0, len(data_list), batch_size):
+                chunk = data_list[i : i + batch_size]
+                resp = seeyon.batch_update_cap4_form_soap(
+                    formCode=oa_form_code,
+                    loginName=oa_login_name,
+                    rightId=oa_right_id,
+                    dataList=chunk,
+                    uniqueFiled=None,
+                    doTrigger=do_trigger_bool,
+                )
+                rj = resp.json() if resp.content else {}
+                if not isinstance(rj, dict):
+                    raise RuntimeError("OA batch-update invalid: response is not an object")
+                raw_code = rj.get("code", -1)
+                try:
+                    code_local = int(raw_code)
+                except (TypeError, ValueError):
+                    # A null or non-numeric code is a failure too; report it as received.
+                    raise RuntimeError(
+                        f"OA batch-update failed code={raw_code!r} message={rj.get('message')!r}"
+                    ) from None
+                if code_local != 0:
+                    raise RuntimeError(f"OA batch-update failed code={code_local} message={rj.get('message')!r}")
+                data_local = rj.get("data") or {}
+                if not isinstance(data_local, dict):
+                    data_local = {}
+                success_count += int(data_local.get("successCount", 0) or 0)
+                failed_count += int(data_local.get("failCount", 0) or 0)
+                fd = data_local.get("failedData") or {}
+                if isinstance(fd, dict):
+                    for k, v in fd.items():
+                        # The first message recorded for a key is kept across batches.
+                        failed_data.setdefault(str(k), str(v))
+
+            logger.info(
+                "EHR 请假月度同步完成:month=%s~%s vacations=%s aggregated=%s to_update=%s to_insert=%s success=%s failed=%s",
+                month_start.isoformat(),
+                month_end.isoformat(),
+                len(vacations),
+                len(agg),
+                to_update,
+                to_insert,
+                success_count,
+                failed_count,
+            )
+            return {
+                "month_start": month_start.isoformat(),
+                "month_end": month_end.isoformat(),
+                "vacation_records": len(vacations),
+                "aggregated_records": len(agg),
+                "to_update": to_update,
+                "to_insert": to_insert,
+                "success_count": success_count,
+                "failed_count": failed_count,
+                "skipped_no_job_no": skipped_no_job_no,
+                "skipped_bad_date": skipped_bad_date,
+                "skipped_not_approved": skipped_not_approved,
+                "skipped_non_positive": skipped_non_positive,
+                "failed_data": dict(list(failed_data.items())[:100]),
+            }