This commit is contained in:
Marsway 2026-03-30 17:03:08 +08:00
parent c5f657d641
commit a8eeaba84c
3 changed files with 721 additions and 0 deletions

View File

@ -0,0 +1,6 @@
"""EHR 请假到 OA 月度汇总同步扩展。"""
from extensions.sync_ehr_leaves_to_oa.api import SyncEhrLeavesToOaApi
from extensions.sync_ehr_leaves_to_oa.job import SyncEhrLeavesToOaMonthJob
__all__ = ["SyncEhrLeavesToOaApi", "SyncEhrLeavesToOaMonthJob"]

View File

@ -0,0 +1,342 @@
from __future__ import annotations
import logging
from datetime import date, timedelta
from typing import Any
from urllib.parse import quote_plus
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from app.integrations.ehr import EhrClient
logger = logging.getLogger("connecthub.extensions.sync_ehr_leaves_to_oa")
def _to_int_safe(v: Any) -> int:
try:
return int(str(v).strip())
except Exception:
return 0
def _extract_staff_code(staff_profile: dict[str, Any]) -> str:
if not isinstance(staff_profile, dict):
return ""
for key in (
"staffCode",
"StaffCode",
"code",
"Code",
"jobNumber",
"JobNumber",
"employeeNo",
"EmployeeNo",
):
val = str(staff_profile.get(key) or "").strip()
if val:
return val
lower_map = {str(k).lower(): v for k, v in staff_profile.items()}
for key in ("staffcode", "code", "jobnumber", "employeeno"):
val = str(lower_map.get(key) or "").strip()
if val:
return val
return ""
def _extract_staff_name(staff_profile: dict[str, Any]) -> str:
if not isinstance(staff_profile, dict):
return ""
for key in ("name", "Name", "staffName", "StaffName", "employeeName", "EmployeeName"):
val = str(staff_profile.get(key) or "").strip()
if val:
return val
lower_map = {str(k).lower(): v for k, v in staff_profile.items()}
for key in ("name", "staffname", "employeename"):
val = str(lower_map.get(key) or "").strip()
if val:
return val
return ""
class SyncEhrLeavesToOaApi:
def __init__(
self,
*,
secret_params: dict[str, str],
sqlserver_params: dict[str, Any],
base_url: str = "https://openapi.italent.cn",
timeout_s: float = 10.0,
retries: int = 2,
retry_backoff_s: float = 0.5,
) -> None:
self._client = EhrClient(
base_url=base_url,
secret_params=secret_params,
timeout_s=timeout_s,
retries=retries,
retry_backoff_s=retry_backoff_s,
)
self._sql_engine = self._create_sqlserver_engine(sqlserver_params)
def close(self) -> None:
self._client.close()
self._sql_engine.dispose()
@staticmethod
def _create_sqlserver_engine(sqlserver_params: dict[str, Any]) -> Engine:
host = str(sqlserver_params.get("host") or "").strip()
username = str(sqlserver_params.get("username") or "").strip()
password = str(sqlserver_params.get("password") or "").strip()
database = str(sqlserver_params.get("database") or "").strip()
if not host or not username or not password or not database:
raise ValueError("sqlserver_params.host/username/password/database are required")
port = int(sqlserver_params.get("port") or 1433)
timeout_s = int(sqlserver_params.get("connect_timeout_s") or 10)
driver = str(sqlserver_params.get("driver") or "ODBC Driver 18 for SQL Server").strip()
trust_server_certificate = str(sqlserver_params.get("trust_server_certificate") or "yes").strip().lower()
encrypt = str(sqlserver_params.get("encrypt") or "yes").strip().lower()
username_q = quote_plus(username)
password_q = quote_plus(password)
driver_q = quote_plus(driver)
conn_url = (
f"mssql+pyodbc://{username_q}:{password_q}@{host}:{port}/{database}"
f"?driver={driver_q}&TrustServerCertificate={trust_server_certificate}&Encrypt={encrypt}"
)
return create_engine(conn_url, pool_pre_ping=True, pool_recycle=1800, connect_args={"timeout": timeout_s})
def ping_sqlserver(self) -> bool:
with self._sql_engine.connect() as conn:
conn.execute(text("SELECT 1"))
return True
def get_vacation_list_by_day(
self,
*,
day: date,
page_size: int = 100,
max_pages: int = 500,
) -> list[dict[str, Any]]:
if page_size <= 0 or page_size > 100:
raise ValueError("page_size must be in range [1, 100]")
if max_pages <= 0:
raise ValueError("max_pages must be > 0")
out: list[dict[str, Any]] = []
cursor: str | None = None
page = 0
while True:
page += 1
if page > max_pages:
raise RuntimeError(f"EHR Vacation.GetListByDate exceeds max_pages={max_pages} day={day.isoformat()}")
body: dict[str, Any] = {
"day": day.isoformat(),
"queryCursor": cursor,
"pageSize": page_size,
}
resp = self._client.request(
"POST",
"/AttendanceOpen/api/v1/Vacation/GetListByDate",
json=body,
headers={"Content-Type": "application/json"},
)
payload = resp.json() if resp.content else {}
code = str(payload.get("code") or "")
if code not in ("200", "206"):
raise RuntimeError(f"EHR Vacation.GetListByDate failed code={code!r} message={payload.get('message')!r}")
data = payload.get("data") or {}
if not isinstance(data, dict):
raise RuntimeError("EHR Vacation.GetListByDate invalid: data is not an object")
vacation_list = data.get("vacationList") or []
if not isinstance(vacation_list, list):
raise RuntimeError("EHR Vacation.GetListByDate invalid: data.vacationList is not a list")
out.extend([x for x in vacation_list if isinstance(x, dict)])
is_last_page = bool(data.get("isLastPage", False))
next_cursor = str(data.get("sortCursor") or "").strip() or None
logger.info(
"EHR Vacation.GetListByDate day=%s page=%s batch=%s is_last_page=%s",
day.isoformat(),
page,
len(vacation_list),
is_last_page,
)
if is_last_page:
break
cursor = next_cursor
if not cursor:
break
return out
def get_vacations_in_date_range(
self,
*,
start_date: date,
end_date: date,
page_size: int = 100,
max_pages_per_day: int = 500,
) -> list[dict[str, Any]]:
if end_date < start_date:
raise ValueError("end_date must be greater than or equal to start_date")
out: list[dict[str, Any]] = []
cur = start_date
while cur <= end_date:
out.extend(
self.get_vacation_list_by_day(
day=cur,
page_size=page_size,
max_pages=max_pages_per_day,
)
)
cur = cur + timedelta(days=1)
logger.info(
"EHR 请假拉取完成start_date=%s end_date=%s total_records=%s",
start_date.isoformat(),
end_date.isoformat(),
len(out),
)
return out
def get_staff_briefs_by_user_ids(self, *, user_ids: list[int], chunk_size: int = 100) -> dict[int, dict[str, str]]:
if chunk_size <= 0:
chunk_size = 100
clean_ids: list[int] = []
seen: set[int] = set()
for u in user_ids:
uid = _to_int_safe(u)
if uid <= 0 or uid in seen:
continue
seen.add(uid)
clean_ids.append(uid)
if not clean_ids:
return {}
out: dict[int, dict[str, str]] = {}
for i in range(0, len(clean_ids), chunk_size):
chunk = clean_ids[i : i + chunk_size]
for uid in chunk:
profile: dict[str, Any] | None = None
for params in ({"userId": str(uid)}, {"userid": str(uid)}):
try:
resp = self._client.request(
"GET",
"/UserFrameworkApiV3/api/v1/staffs/Get",
params=params,
)
except Exception:
continue
payload = resp.json() if resp.content else {}
profile = self._find_staff_profile_by_uid(payload, uid)
if profile is not None:
break
if profile is None:
continue
code = _extract_staff_code(profile)
name = _extract_staff_name(profile)
if code or name:
out[uid] = {"job_no": code, "name": name}
logger.info(
"EHR 员工信息反查完成input_user_ids=%s matched_staff_profiles=%s",
len(clean_ids),
len(out),
)
return out
def get_staff_codes_by_user_ids(self, *, user_ids: list[int], chunk_size: int = 100) -> dict[int, str]:
briefs = self.get_staff_briefs_by_user_ids(user_ids=user_ids, chunk_size=chunk_size)
out: dict[int, str] = {}
for uid, brief in briefs.items():
code = str((brief or {}).get("job_no") or "").strip()
if code:
out[uid] = code
return out
@staticmethod
def _find_staff_profile_by_uid(payload: Any, uid: int) -> dict[str, Any] | None:
def _iter_dicts(node: Any):
if isinstance(node, dict):
yield node
for v in node.values():
yield from _iter_dicts(v)
elif isinstance(node, list):
for it in node:
yield from _iter_dicts(it)
def _uid_from_dict(d: dict[str, Any]) -> int:
for k in ("userId", "UserId", "userid", "UserID", "id", "Id", "ID"):
if k in d:
return _to_int_safe(d.get(k))
return 0
best: dict[str, Any] | None = None
for d in _iter_dicts(payload):
if not isinstance(d, dict):
continue
if _uid_from_dict(d) != uid:
continue
if any(k in d for k in ("staffCode", "StaffCode", "code", "Code", "jobNumber", "JobNumber", "employeeNo", "EmployeeNo")):
return d
if best is None:
best = d
return best
def get_oa_row_id_map_by_job_and_date(
self,
*,
table_name: str,
schema: str,
job_no_column: str,
date_column: str,
start_date: date,
end_date: date,
) -> dict[tuple[str, str], int]:
t = str(table_name or "").strip()
jc = str(job_no_column or "").strip()
dc = str(date_column or "").strip()
s = str(schema or "").strip() or "dbo"
if not t or not jc or not dc:
raise ValueError("table_name/job_no_column/date_column are required")
sql = text(
f"SELECT [id] AS row_id, [{jc}] AS job_no, [{dc}] AS leave_date "
f"FROM [{s}].[{t}] WITH (NOLOCK) "
f"WHERE TRY_CONVERT(date, [{dc}]) >= :start_date "
f"AND TRY_CONVERT(date, [{dc}]) <= :end_date "
f"AND ISNULL(LTRIM(RTRIM([{jc}])), '') <> ''"
)
params = {
"start_date": start_date.isoformat(),
"end_date": end_date.isoformat(),
}
out: dict[tuple[str, str], int] = {}
duplicate_keys = 0
with self._sql_engine.connect() as conn:
rows = conn.execute(sql, params).fetchall()
for r in rows:
try:
row_id = int(r.row_id)
except Exception:
continue
job_no = str(r.job_no or "").strip()
leave_date = str(r.leave_date or "").strip()
leave_date = leave_date.split(" ", 1)[0]
if not job_no or not leave_date:
continue
key = (job_no, leave_date)
if key in out and out[key] != row_id:
duplicate_keys += 1
out[key] = row_id
logger.info(
"OA 现有记录索引完成table=%s month_range=%s~%s indexed=%s duplicate_keys=%s",
t,
start_date.isoformat(),
end_date.isoformat(),
len(out),
duplicate_keys,
)
return out

View File

@ -0,0 +1,373 @@
from __future__ import annotations
import json
import logging
from datetime import date, datetime
from decimal import Decimal, InvalidOperation
from typing import Any
from app.integrations.seeyon import SeeyonClient
from app.jobs.base import BaseJob
from extensions.sync_ehr_leaves_to_oa.api import SyncEhrLeavesToOaApi
logger = logging.getLogger("connecthub.extensions.sync_ehr_leaves_to_oa")
# OA SQLServer与 sync_ehr_to_oa 同源)
_OA_SQLSERVER_PARAMS: dict[str, Any] = {
"host": "192.168.30.108",
"port": 1433,
"database": "seeyon",
"username": "SHOADB91",
"password": "E7nZ8x@12",
"driver": "ODBC Driver 18 for SQL Server",
"encrypt": "no",
"trust_server_certificate": "yes",
"connect_timeout_s": 10,
}
_OA_SQLSERVER_SCHEMA = "dbo"
_OA_SQLSERVER_TABLE = "formmain_20250360"
def _to_date(v: Any) -> date:
if isinstance(v, date) and not isinstance(v, datetime):
return v
if isinstance(v, datetime):
return v.date()
s = str(v or "").strip()
if not s:
return date.today()
s = s.replace("T", " ")
if " " in s:
s = s.split(" ", 1)[0]
return datetime.strptime(s, "%Y-%m-%d").date()
def _date_only(v: Any) -> str:
s = str(v or "").strip()
if not s:
return ""
s = s.replace("T", " ")
if " " in s:
s = s.split(" ", 1)[0]
return s
def _to_decimal(v: Any) -> Decimal:
if v is None:
return Decimal("0")
try:
return Decimal(str(v).strip())
except (InvalidOperation, ValueError):
return Decimal("0")
def _decimal_to_str(v: Decimal) -> str:
if v == v.to_integral():
return str(int(v))
s = format(v.normalize(), "f")
if "." in s:
s = s.rstrip("0").rstrip(".")
return s
def _to_int_safe(v: Any) -> int:
try:
return int(str(v).strip())
except Exception:
return 0
class SyncEhrLeavesToOaMonthJob(BaseJob):
"""
EHR 请假 -> OA 月度同步按工号+日期汇总
- 接口POST /AttendanceOpen/api/v1/Vacation/GetListByDate
- 统计范围执行日所在月份的 1 号到执行日
- 唯一键工号 + 日期
"""
job_id = "sync_ehr_leaves_to_oa.sync_monthly"
def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
oa_base_url = str(params.get("oa_base_url") or "").strip()
oa_form_code = str(params.get("oa_form_code") or "").strip()
oa_right_id = str(params.get("oa_right_id") or "").strip()
oa_login_name = str(params.get("oa_login_name") or "").strip()
if not oa_base_url:
raise ValueError("public_cfg.oa_base_url is required")
if not oa_form_code:
raise ValueError("public_cfg.oa_form_code is required")
if not oa_right_id:
raise ValueError("public_cfg.oa_right_id is required")
if not oa_login_name:
raise ValueError("public_cfg.oa_login_name is required")
oa_template_code = str(params.get("oa_template_code") or oa_form_code).strip()
run_date = _to_date(params.get("run_date"))
month_start = run_date.replace(day=1)
month_end = run_date
display_job_no = str(params.get("oa_display_job_no") or "工号").strip()
display_leave_date = str(params.get("oa_display_leave_date") or "请假日期").strip()
display_leave_days = str(params.get("oa_display_leave_days") or "请假天数").strip()
display_name = str(params.get("oa_display_name") or "姓名").strip()
if not display_job_no or not display_leave_date or not display_leave_days:
raise ValueError("oa_display_job_no/oa_display_leave_date/oa_display_leave_days cannot be empty")
page_size = int(params.get("page_size") or 100)
if page_size <= 0:
page_size = 100
batch_size = int(params.get("batch_size") or 100)
if batch_size <= 0:
batch_size = 100
max_pages_per_day = int(params.get("max_pages_per_day") or 500)
if max_pages_per_day <= 0:
max_pages_per_day = 500
approved_statuses_param = params.get("approved_statuses")
if approved_statuses_param is None:
approved_statuses = {"通过", "已通过"}
elif isinstance(approved_statuses_param, list):
approved_statuses = {str(x).strip() for x in approved_statuses_param if str(x).strip()}
else:
approved_statuses = {str(approved_statuses_param).strip()} if str(approved_statuses_param).strip() else set()
rest_user = str(secrets.get("rest_user") or "").strip()
rest_password = str(secrets.get("rest_password") or "").strip()
login_name = secrets.get("loginName")
login_name = str(login_name).strip() if login_name else None
if not rest_user or not rest_password:
raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
app_key = str(secrets.get("app_key") or "").strip()
app_secret = str(secrets.get("app_secret") or "").strip()
if not app_key or not app_secret:
raise ValueError("secret_cfg.app_key and secret_cfg.app_secret are required")
sender_login_name = params.get("senderLoginName")
sender_login_name = str(sender_login_name).strip() if sender_login_name else None
do_trigger = params.get("doTrigger")
seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
ehr = SyncEhrLeavesToOaApi(
secret_params={"app_key": app_key, "app_secret": app_secret},
sqlserver_params=_OA_SQLSERVER_PARAMS,
)
try:
ehr.ping_sqlserver()
logger.info(
"SQLServer 连通性检查通过host=%s db=%s table=%s",
_OA_SQLSERVER_PARAMS["host"],
_OA_SQLSERVER_PARAMS["database"],
_OA_SQLSERVER_TABLE,
)
exp_resp = seeyon.export_cap4_form_soap(
templateCode=oa_template_code,
senderLoginName=sender_login_name,
rightId=oa_right_id,
)
raw = exp_resp.text or ""
payload = json.loads(raw) if raw else {}
code = payload.get("code")
if code not in (None, 0, "0"):
raise RuntimeError(f"OA export failed code={code!r} message={payload.get('message')!r}")
outer = payload.get("data") or {}
form = outer.get("data") or {}
if not isinstance(form, dict):
raise RuntimeError("OA export invalid: data.data is not an object")
definition = form.get("definition") or {}
fields = definition.get("fields") or []
if not isinstance(fields, list):
raise RuntimeError("OA export invalid: definition.fields is not a list")
display_to_code: dict[str, str] = {}
for f in fields:
if not isinstance(f, dict):
continue
display = str(f.get("display") or "").strip()
name = str(f.get("name") or "").strip()
if display and name:
display_to_code[display] = name
needed = [display_job_no, display_leave_date, display_leave_days]
missing = [x for x in needed if x not in display_to_code]
if missing:
raise RuntimeError(f"OA export invalid: missing form fields by display names: {missing}")
field_job_no = display_to_code[display_job_no]
field_leave_date = display_to_code[display_leave_date]
field_leave_days = display_to_code[display_leave_days]
field_name = display_to_code.get(display_name)
master_table_name = _OA_SQLSERVER_TABLE
vacations = ehr.get_vacations_in_date_range(
start_date=month_start,
end_date=month_end,
page_size=page_size,
max_pages_per_day=max_pages_per_day,
)
staff_ids: set[int] = set()
for v in vacations:
sid = _to_int_safe(v.get("staffId"))
if sid > 0:
staff_ids.add(sid)
staff_brief_map = ehr.get_staff_briefs_by_user_ids(user_ids=list(staff_ids))
agg: dict[tuple[str, str], Decimal] = {}
name_map: dict[tuple[str, str], str] = {}
skipped_no_job_no = 0
skipped_bad_date = 0
skipped_not_approved = 0
skipped_non_positive = 0
for item in vacations:
approve_status = str(item.get("approveStatus") or "").strip()
if approved_statuses and approve_status not in approved_statuses:
skipped_not_approved += 1
continue
sid = _to_int_safe(item.get("staffId"))
staff_brief = staff_brief_map.get(sid) or {}
job_no = str(staff_brief.get("job_no") or "").strip()
staff_name = str(staff_brief.get("name") or "").strip()
if not job_no:
skipped_no_job_no += 1
continue
leave_date = _date_only(item.get("vacationStartDateTime"))
if not leave_date:
skipped_bad_date += 1
continue
try:
leave_date_obj = datetime.strptime(leave_date, "%Y-%m-%d").date()
except Exception:
skipped_bad_date += 1
continue
if leave_date_obj < month_start or leave_date_obj > month_end:
skipped_bad_date += 1
continue
day_value = _to_decimal(item.get("dayValueOfDuration"))
if day_value <= Decimal("0"):
skipped_non_positive += 1
continue
key = (job_no, leave_date)
exists = agg.get(key)
if exists is None:
agg[key] = day_value
else:
agg[key] = exists + day_value
# 姓名按工号+日期保留一个非空值(用于 OA 展示)
if staff_name and key not in name_map:
name_map[key] = staff_name
existing_id_map = ehr.get_oa_row_id_map_by_job_and_date(
table_name=_OA_SQLSERVER_TABLE,
schema=_OA_SQLSERVER_SCHEMA,
job_no_column=field_job_no,
date_column=field_leave_date,
start_date=month_start,
end_date=month_end,
)
data_list: list[dict[str, Any]] = []
to_update = 0
to_insert = 0
for (job_no, leave_date), leave_days in sorted(agg.items(), key=lambda x: (x[0][0], x[0][1])):
fields_payload = [
{"name": field_job_no, "value": job_no, "showValue": job_no},
{"name": field_leave_date, "value": leave_date, "showValue": leave_date},
{"name": field_leave_days, "value": _decimal_to_str(leave_days), "showValue": _decimal_to_str(leave_days)},
]
if field_name:
name_val = name_map.get((job_no, leave_date), "")
fields_payload.append({"name": field_name, "value": name_val, "showValue": name_val})
record: dict[str, Any] = {
"fields": fields_payload,
}
existing_id = existing_id_map.get((job_no, leave_date))
if existing_id is not None:
record["id"] = existing_id
to_update += 1
else:
to_insert += 1
data_list.append(
{
"masterTable": {
"name": master_table_name,
"record": record,
"changedFields": [f["name"] for f in fields_payload],
},
"subTables": [],
}
)
success_count = 0
failed_count = 0
failed_data: dict[str, str] = {}
do_trigger_bool: bool | None
if do_trigger is None:
do_trigger_bool = None
else:
do_trigger_bool = str(do_trigger).strip().lower() in ("1", "true", "yes", "y", "on")
for i in range(0, len(data_list), batch_size):
chunk = data_list[i : i + batch_size]
if not chunk:
continue
resp = seeyon.batch_update_cap4_form_soap(
formCode=oa_form_code,
loginName=oa_login_name,
rightId=oa_right_id,
dataList=chunk,
uniqueFiled=None,
doTrigger=do_trigger_bool,
)
rj = resp.json() if resp.content else {}
code_local = int(rj.get("code", -1))
if code_local != 0:
raise RuntimeError(f"OA batch-update failed code={code_local} message={rj.get('message')!r}")
data_local = rj.get("data") or {}
chunk_success = int(data_local.get("successCount", 0) or 0)
chunk_failed = int(data_local.get("failCount", 0) or 0)
success_count += chunk_success
failed_count += chunk_failed
fd = data_local.get("failedData") or {}
if isinstance(fd, dict):
for k, v in fd.items():
if str(k) not in failed_data:
failed_data[str(k)] = str(v)
logger.info(
"EHR 请假月度同步完成month=%s~%s vacations=%s aggregated=%s to_update=%s to_insert=%s success=%s failed=%s",
month_start.isoformat(),
month_end.isoformat(),
len(vacations),
len(agg),
to_update,
to_insert,
success_count,
failed_count,
)
return {
"month_start": month_start.isoformat(),
"month_end": month_end.isoformat(),
"vacation_records": len(vacations),
"aggregated_records": len(agg),
"to_update": to_update,
"to_insert": to_insert,
"success_count": success_count,
"failed_count": failed_count,
"skipped_no_job_no": skipped_no_job_no,
"skipped_bad_date": skipped_bad_date,
"skipped_not_approved": skipped_not_approved,
"skipped_non_positive": skipped_non_positive,
"failed_data": dict(list(failed_data.items())[:100]),
}
finally:
ehr.close()
seeyon.close()