Vastai-ConnectHub/extensions/sync_ehr_leaves_to_oa/job.py

414 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import logging
from datetime import date, datetime
from decimal import Decimal, InvalidOperation
from typing import Any
import httpx
from app.integrations.seeyon import SeeyonClient
from app.jobs.base import BaseJob
from extensions.sync_ehr_leaves_to_oa.api import SyncEhrLeavesToOaApi
logger = logging.getLogger("connecthub.extensions.sync_ehr_leaves_to_oa")

# OA SQL Server connection settings (same source as sync_ehr_to_oa).
# NOTE(review): the host and the database credentials are hard-coded in source;
# consider moving them into the job's secret configuration.
_OA_SQLSERVER_PARAMS: dict[str, Any] = {
    "host": "192.168.30.108",
    "port": 1433,
    "database": "seeyon",
    "username": "SHOADB91",
    "password": "E7nZ8x@12",
    "driver": "ODBC Driver 18 for SQL Server",
    "encrypt": "no",
    "trust_server_certificate": "yes",
    "connect_timeout_s": 10,
}

# Schema and table that are queried for already-synced OA rows.
_OA_SQLSERVER_SCHEMA = "dbo"
_OA_SQLSERVER_TABLE = "formmain_20250360"
def _to_date(v: Any) -> date:
    """Coerce *v* into a ``date``.

    ``datetime`` values are truncated to their date part and plain ``date``
    values are returned unchanged. Falsy input (``None``, ``""``, ...) falls
    back to today's date. Anything else is stringified and the part before
    the first space or ``T`` is parsed as ``%Y-%m-%d``.

    Raises:
        ValueError: if the remaining text does not match ``%Y-%m-%d``.
    """
    # datetime is a subclass of date, so it has to be tested first.
    if isinstance(v, datetime):
        return v.date()
    if isinstance(v, date):
        return v

    text = str(v or "").strip()
    if not text:
        return date.today()

    # Drop any time-of-day suffix ("2024-01-05T10:00" / "2024-01-05 10:00").
    day_part = text.replace("T", " ").split(" ", 1)[0]
    return datetime.strptime(day_part, "%Y-%m-%d").date()
def _date_only(v: Any) -> str:
    """Return the date portion of a date/time-like value as text.

    The value is stringified and stripped; everything from the first space
    or ``T`` onwards is discarded. Falsy input yields ``""``. No validation
    of the remaining text is performed.
    """
    text = str(v or "").strip()
    if not text:
        return ""
    # partition() returns the whole string as the head when no space exists.
    head, _sep, _rest = text.replace("T", " ").partition(" ")
    return head
def _to_decimal(v: Any) -> Decimal:
    """Parse *v* into a finite ``Decimal``, falling back to zero.

    ``None``, unparsable text and non-finite values (``NaN``, ``sNaN``,
    ``Infinity``) all yield ``Decimal("0")``.
    """
    if v is None:
        return Decimal("0")
    try:
        parsed = Decimal(str(v).strip())
    except (InvalidOperation, ValueError):
        return Decimal("0")
    # Decimal() happily parses "NaN"/"Infinity"; ordering comparisons on NaN
    # raise InvalidOperation, so non-finite values must not leak to callers.
    if not parsed.is_finite():
        return Decimal("0")
    return parsed
def _decimal_to_str(v: Decimal) -> str:
    """Format *v* as fixed-point text with exactly one decimal place.

    The OA "leave days" field is DECIMAL(?,1), so values are always emitted
    with one fractional digit (e.g. ``1.0``). Rounding follows the active
    decimal context. Values that cannot be quantized (non-finite, too large
    for the context precision, or not a ``Decimal`` at all) yield ``"0.0"``.
    """
    try:
        # quantize() lets a quiet NaN through unchanged, which would be
        # rendered as "NaN"; treat every non-finite value as zero instead.
        if not v.is_finite():
            return "0.0"
        q = v.quantize(Decimal("0.1"))
    except (ArithmeticError, AttributeError):
        # ArithmeticError covers the decimal signals (e.g. InvalidOperation);
        # AttributeError covers non-Decimal input.
        q = Decimal("0.0")
    return format(q, "f")
def _to_int_safe(v: Any) -> int:
    """Best-effort integer conversion that never raises.

    The value is stringified, stripped and passed to ``int()``; any failure
    (including ``None`` or text such as ``"1.5"``) results in ``0``.
    """
    try:
        text = str(v).strip()
        number = int(text)
    except Exception:
        # Deliberately broad: this helper is documented as never raising.
        return 0
    return number
def _normalize_decimal_1(v: Any) -> str:
    """Return *v* as one-decimal text by chaining ``_to_decimal`` and ``_decimal_to_str``."""
    as_decimal = _to_decimal(v)
    return _decimal_to_str(as_decimal)
class SyncEhrLeavesToOaMonthJob(BaseJob):
    """
    Monthly EHR leave -> OA sync, aggregated per (job number, date).

    - EHR endpoint: POST /AttendanceOpen/api/v1/Vacation/GetListByDate
    - Window: from the 1st of the run date's month through the run date
    - Unique key: job number + date
    """

    job_id = "sync_ehr_leaves_to_oa.sync_monthly"

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        """Sync this month's EHR leave records into the OA form.

        Args:
            params: Public configuration. ``oa_base_url``, ``oa_form_code``,
                ``oa_right_id`` and ``oa_login_name`` are required.
                ``oa_template_code``, ``run_date``, the ``oa_display_*`` names,
                ``page_size``, ``batch_size``, ``max_pages_per_day``,
                ``approved_statuses``, ``senderLoginName`` and ``doTrigger``
                are optional.
            secrets: Secret configuration. ``rest_user``/``rest_password`` and
                ``app_key``/``app_secret`` are required; ``loginName`` is
                optional.

        Returns:
            A summary dict holding the date window, the record counters and at
            most 100 entries of per-record failure details.

        Raises:
            ValueError: if a required configuration value is missing.
            RuntimeError: if the OA form export or a batch update fails.
        """
        # ---- public configuration -------------------------------------
        oa_base_url = str(params.get("oa_base_url") or "").strip()
        oa_form_code = str(params.get("oa_form_code") or "").strip()
        oa_right_id = str(params.get("oa_right_id") or "").strip()
        oa_login_name = str(params.get("oa_login_name") or "").strip()
        if not oa_base_url:
            raise ValueError("public_cfg.oa_base_url is required")
        if not oa_form_code:
            raise ValueError("public_cfg.oa_form_code is required")
        if not oa_right_id:
            raise ValueError("public_cfg.oa_right_id is required")
        if not oa_login_name:
            raise ValueError("public_cfg.oa_login_name is required")
        # The template code defaults to the form code.
        oa_template_code = str(params.get("oa_template_code") or oa_form_code).strip()

        # Sync window: first day of the run date's month .. run date.
        run_date = _to_date(params.get("run_date"))
        month_start = run_date.replace(day=1)
        month_end = run_date

        # Display names used to look up the OA field codes in the form definition.
        display_job_no = str(params.get("oa_display_job_no") or "工号").strip()
        display_leave_date = str(params.get("oa_display_leave_date") or "请假日期").strip()
        display_leave_days = str(params.get("oa_display_leave_days") or "请假天数").strip()
        display_name = str(params.get("oa_display_name") or "姓名").strip()
        if not display_job_no or not display_leave_date or not display_leave_days:
            raise ValueError("oa_display_job_no/oa_display_leave_date/oa_display_leave_days cannot be empty")

        page_size = self._positive_int(params.get("page_size"), 100)
        batch_size = self._positive_int(params.get("batch_size"), 100)
        max_pages_per_day = self._positive_int(params.get("max_pages_per_day"), 500)

        # An explicitly empty list/string yields an empty set, which disables
        # the approval filter further down.
        approved_statuses_param = params.get("approved_statuses")
        if approved_statuses_param is None:
            approved_statuses = {"通过", "已通过"}
        elif isinstance(approved_statuses_param, list):
            approved_statuses = {str(x).strip() for x in approved_statuses_param if str(x).strip()}
        else:
            single_status = str(approved_statuses_param).strip()
            approved_statuses = {single_status} if single_status else set()

        sender_login_name = params.get("senderLoginName")
        sender_login_name = str(sender_login_name).strip() if sender_login_name else None

        # doTrigger stays None when not configured so the client can apply its default.
        do_trigger = params.get("doTrigger")
        do_trigger_bool: bool | None = None
        if do_trigger is not None:
            do_trigger_bool = str(do_trigger).strip().lower() in ("1", "true", "yes", "y", "on")

        # ---- secret configuration -------------------------------------
        rest_user = str(secrets.get("rest_user") or "").strip()
        rest_password = str(secrets.get("rest_password") or "").strip()
        login_name = secrets.get("loginName")
        login_name = str(login_name).strip() if login_name else None
        if not rest_user or not rest_password:
            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
        app_key = str(secrets.get("app_key") or "").strip()
        app_secret = str(secrets.get("app_secret") or "").strip()
        if not app_key or not app_secret:
            raise ValueError("secret_cfg.app_key and secret_cfg.app_secret are required")

        # ---- clients --------------------------------------------------
        # Nested try/finally: the Seeyon client is closed even when creating
        # or closing the EHR client raises.
        seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
        try:
            ehr = SyncEhrLeavesToOaApi(
                secret_params={"app_key": app_key, "app_secret": app_secret},
                sqlserver_params=_OA_SQLSERVER_PARAMS,
            )
            try:
                ehr.ping_sqlserver()
                logger.info(
                    "SQLServer 连通性检查通过host=%s db=%s table=%s",
                    _OA_SQLSERVER_PARAMS["host"],
                    _OA_SQLSERVER_PARAMS["database"],
                    _OA_SQLSERVER_TABLE,
                )

                # 1) Resolve OA field codes from the exported form definition.
                exp_resp = seeyon.export_cap4_form_soap(
                    templateCode=oa_template_code,
                    senderLoginName=sender_login_name,
                    rightId=oa_right_id,
                )
                display_to_code = self._parse_form_fields(exp_resp.text or "")
                needed = [display_job_no, display_leave_date, display_leave_days]
                missing = [x for x in needed if x not in display_to_code]
                if missing:
                    raise RuntimeError(f"OA export invalid: missing form fields by display names: {missing}")
                field_job_no = display_to_code[display_job_no]
                field_leave_date = display_to_code[display_leave_date]
                field_leave_days = display_to_code[display_leave_days]
                # The name field is optional; None disables name syncing.
                field_name = display_to_code.get(display_name)

                # 2) Fetch the leave records and the staff info needed to map
                #    staff ids to job numbers.
                vacations = ehr.get_vacations_in_date_range(
                    start_date=month_start,
                    end_date=month_end,
                    page_size=page_size,
                    max_pages_per_day=max_pages_per_day,
                )
                staff_ids: set[int] = set()
                for v in vacations:
                    sid = _to_int_safe(v.get("staffId"))
                    if sid > 0:
                        staff_ids.add(sid)
                staff_brief_map = ehr.get_staff_briefs_by_user_ids(user_ids=list(staff_ids))

                # 3) Aggregate leave days per (job number, date).
                agg, name_map, skipped = self._aggregate_vacations(
                    vacations,
                    staff_brief_map,
                    approved_statuses,
                    month_start,
                    month_end,
                )

                # 4) Diff against what is already stored in the OA table.
                existing_row_map = ehr.get_oa_rows_by_job_and_date(
                    table_name=_OA_SQLSERVER_TABLE,
                    schema=_OA_SQLSERVER_SCHEMA,
                    job_no_column=field_job_no,
                    date_column=field_leave_date,
                    leave_days_column=field_leave_days,
                    name_column=field_name,
                    start_date=month_start,
                    end_date=month_end,
                )
                data_list, to_update, to_insert, skipped_unchanged = self._build_data_list(
                    agg,
                    name_map,
                    existing_row_map,
                    field_job_no=field_job_no,
                    field_leave_date=field_leave_date,
                    field_leave_days=field_leave_days,
                    field_name=field_name,
                )

                # 5) Push the changed/new rows to OA in batches.
                success_count, failed_count, failed_data = self._push_batches(
                    seeyon,
                    data_list,
                    batch_size=batch_size,
                    form_code=oa_form_code,
                    login_name=oa_login_name,
                    right_id=oa_right_id,
                    unique_fields=[field_job_no, field_leave_date],
                    do_trigger=do_trigger_bool,
                )

                logger.info(
                    "EHR 请假月度同步完成month=%s~%s vacations=%s aggregated=%s to_update=%s to_insert=%s success=%s failed=%s",
                    month_start.isoformat(),
                    month_end.isoformat(),
                    len(vacations),
                    len(agg),
                    to_update,
                    to_insert,
                    success_count,
                    failed_count,
                )
                return {
                    "month_start": month_start.isoformat(),
                    "month_end": month_end.isoformat(),
                    "vacation_records": len(vacations),
                    "aggregated_records": len(agg),
                    "to_update": to_update,
                    "to_insert": to_insert,
                    "skipped_unchanged": skipped_unchanged,
                    "success_count": success_count,
                    "failed_count": failed_count,
                    "skipped_no_job_no": skipped["skipped_no_job_no"],
                    "skipped_bad_date": skipped["skipped_bad_date"],
                    "skipped_not_approved": skipped["skipped_not_approved"],
                    "skipped_non_positive": skipped["skipped_non_positive"],
                    # Cap the failure details so the result stays small.
                    "failed_data": dict(list(failed_data.items())[:100]),
                }
            finally:
                ehr.close()
        finally:
            seeyon.close()

    @staticmethod
    def _positive_int(value: Any, default: int) -> int:
        """Return ``int(value)``, or *default* when the value is falsy or not positive."""
        number = int(value or default)
        return number if number > 0 else default

    @staticmethod
    def _parse_form_fields(raw: str) -> dict[str, str]:
        """Parse the OA form export body into a ``display name -> field code`` map."""
        payload = json.loads(raw) if raw else {}
        if not isinstance(payload, dict):
            raise RuntimeError("OA export invalid: response is not a JSON object")
        code = payload.get("code")
        # A missing code is treated as success here.
        if code not in (None, 0, "0"):
            raise RuntimeError(f"OA export failed code={code!r} message={payload.get('message')!r}")
        outer = payload.get("data") or {}
        if not isinstance(outer, dict):
            raise RuntimeError("OA export invalid: data is not an object")
        form = outer.get("data") or {}
        if not isinstance(form, dict):
            raise RuntimeError("OA export invalid: data.data is not an object")
        definition = form.get("definition") or {}
        fields = definition.get("fields") or []
        if not isinstance(fields, list):
            raise RuntimeError("OA export invalid: definition.fields is not a list")

        display_to_code: dict[str, str] = {}
        for f in fields:
            if not isinstance(f, dict):
                continue
            display = str(f.get("display") or "").strip()
            name = str(f.get("name") or "").strip()
            if display and name:
                # Later entries win when two fields share a display name.
                display_to_code[display] = name
        return display_to_code

    @staticmethod
    def _aggregate_vacations(
        vacations: Any,
        staff_brief_map: Any,
        approved_statuses: set[str],
        month_start: date,
        month_end: date,
    ) -> tuple[dict[tuple[str, str], Decimal], dict[tuple[str, str], str], dict[str, int]]:
        """Sum leave days per (job number, ISO date) and count the skipped records."""
        agg: dict[tuple[str, str], Decimal] = {}
        name_map: dict[tuple[str, str], str] = {}
        skipped = {
            "skipped_no_job_no": 0,
            "skipped_bad_date": 0,
            "skipped_not_approved": 0,
            "skipped_non_positive": 0,
        }
        for item in vacations:
            approve_status = str(item.get("approveStatus") or "").strip()
            # An empty status set means "do not filter by approval status".
            if approved_statuses and approve_status not in approved_statuses:
                skipped["skipped_not_approved"] += 1
                continue

            sid = _to_int_safe(item.get("staffId"))
            staff_brief = staff_brief_map.get(sid) or {}
            job_no = str(staff_brief.get("job_no") or "").strip()
            staff_name = str(staff_brief.get("name") or "").strip()
            if not job_no:
                skipped["skipped_no_job_no"] += 1
                continue

            # The record is attributed to the day its leave starts.
            leave_date = _date_only(item.get("vacationStartDateTime"))
            if not leave_date:
                skipped["skipped_bad_date"] += 1
                continue
            try:
                leave_date_obj = datetime.strptime(leave_date, "%Y-%m-%d").date()
            except ValueError:
                skipped["skipped_bad_date"] += 1
                continue
            if leave_date_obj < month_start or leave_date_obj > month_end:
                skipped["skipped_bad_date"] += 1
                continue
            # strptime also accepts non-padded text such as "2024-1-5";
            # normalise so the key and the OA payload always use YYYY-MM-DD.
            leave_date = leave_date_obj.isoformat()

            day_value = _to_decimal(item.get("dayValueOfDuration"))
            # Ordering comparisons on NaN raise, so test finiteness first;
            # non-finite values are counted with the non-positive ones.
            if not day_value.is_finite() or day_value <= Decimal("0"):
                skipped["skipped_non_positive"] += 1
                continue

            key = (job_no, leave_date)
            agg[key] = agg[key] + day_value if key in agg else day_value
            # Keep the first non-empty name per key (used for display in OA).
            if staff_name and key not in name_map:
                name_map[key] = staff_name
        return agg, name_map, skipped

    @staticmethod
    def _build_data_list(
        agg: dict[tuple[str, str], Decimal],
        name_map: dict[tuple[str, str], str],
        existing_row_map: Any,
        *,
        field_job_no: str,
        field_leave_date: str,
        field_leave_days: str,
        field_name: str | None,
    ) -> tuple[list[dict[str, Any]], int, int, int]:
        """Build the OA batch payload, skipping rows whose stored values already match."""
        data_list: list[dict[str, Any]] = []
        to_update = 0
        to_insert = 0
        skipped_unchanged = 0
        # Sorted by (job number, date) for a deterministic batch order.
        for key, leave_days in sorted(agg.items(), key=lambda kv: kv[0]):
            job_no, leave_date = key
            leave_date_value = f"{leave_date} 00:00:00"
            leave_days_value = _decimal_to_str(leave_days)
            name_val = name_map.get(key, "")

            existing_row = existing_row_map.get(key)
            if existing_row is not None:
                old_days = _normalize_decimal_1(existing_row.get("leave_days"))
                old_name = str(existing_row.get("name") or "").strip()
                # The name only takes part in the comparison when the form has a name field.
                if old_days == leave_days_value and (not field_name or old_name == name_val):
                    skipped_unchanged += 1
                    continue

            fields_payload = [
                {"name": field_job_no, "value": job_no, "showValue": job_no},
                {"name": field_leave_date, "value": leave_date_value, "showValue": leave_date},
                {"name": field_leave_days, "value": leave_days_value, "showValue": leave_days_value},
            ]
            if field_name:
                fields_payload.append({"name": field_name, "value": name_val, "showValue": name_val})
            record: dict[str, Any] = {"fields": fields_payload}

            # A positive id marks an update of an existing row; otherwise it is an insert.
            existing_id = _to_int_safe((existing_row or {}).get("id"))
            if existing_id > 0:
                record["id"] = existing_id
                to_update += 1
            else:
                to_insert += 1

            data_list.append(
                {
                    "masterTable": {
                        "name": _OA_SQLSERVER_TABLE,
                        "record": record,
                        "changedFields": [f["name"] for f in fields_payload],
                    },
                    "subTables": [],
                }
            )
        return data_list, to_update, to_insert, skipped_unchanged

    @staticmethod
    def _push_batches(
        seeyon: Any,
        data_list: list[dict[str, Any]],
        *,
        batch_size: int,
        form_code: str,
        login_name: str,
        right_id: str,
        unique_fields: list[str],
        do_trigger: bool | None,
    ) -> tuple[int, int, dict[str, str]]:
        """Send *data_list* to OA in chunks and accumulate the success/failure counters."""
        success_count = 0
        failed_count = 0
        failed_data: dict[str, str] = {}
        for start in range(0, len(data_list), batch_size):
            chunk = data_list[start : start + batch_size]
            try:
                resp = seeyon.batch_update_cap4_form_soap(
                    formCode=form_code,
                    loginName=login_name,
                    rightId=right_id,
                    dataList=chunk,
                    # Keyword spelling kept exactly as in the existing call (sic).
                    uniqueFiled=list(unique_fields),
                    doTrigger=do_trigger,
                )
            except httpx.HTTPStatusError as e:
                # Best-effort extraction of a response preview for the error message.
                resp_text = ""
                try:
                    resp_text = str((e.response.text or "")[:2000])
                except Exception:
                    resp_text = ""
                raise RuntimeError(
                    "OA batch-update HTTP error "
                    f"status={getattr(e.response, 'status_code', None)!r} "
                    f"body_preview={resp_text!r} "
                    f"first_row={json.dumps(chunk[0], ensure_ascii=False, default=str)[:2000]}"
                ) from e

            rj = resp.json() if resp.content else {}
            # A missing code counts as failure (-1); a null or non-numeric
            # code is reported the same way instead of crashing in int().
            raw_code = rj.get("code", -1)
            try:
                code_local = int(raw_code)
            except (TypeError, ValueError):
                raise RuntimeError(
                    f"OA batch-update failed code={raw_code!r} message={rj.get('message')!r}"
                ) from None
            if code_local != 0:
                raise RuntimeError(f"OA batch-update failed code={code_local} message={rj.get('message')!r}")

            data_local = rj.get("data") or {}
            fd = data_local.get("failedData") or {}
            if not isinstance(fd, dict):
                fd = {}
            chunk_success = int(data_local.get("successCount", 0) or 0)
            chunk_failed = int(data_local.get("failCount", 0) or 0)
            if chunk_success == 0 and chunk_failed == 0:
                # No counters reported: derive them from the failure details.
                chunk_failed = len(fd)
                chunk_success = max(0, len(chunk) - chunk_failed)
            success_count += chunk_success
            failed_count += chunk_failed
            for k, v in fd.items():
                # Keep the first message seen for each failed key.
                failed_data.setdefault(str(k), str(v))
        return success_count, failed_count, failed_data