Compare commits

..

No commits in common. "main" and "merge" have entirely different histories.
main ... merge

7 changed files with 6423 additions and 461 deletions

2
.env
View File

@ -3,7 +3,7 @@ DATA_DIR=/data
DB_URL=postgresql+psycopg://connecthub:connecthub_pwd_change_me@postgres:5432/connecthub DB_URL=postgresql+psycopg://connecthub:connecthub_pwd_change_me@postgres:5432/connecthub
REDIS_URL=redis://redis:6379/0 REDIS_URL=redis://redis:6379/0
FERNET_KEY_PATH=/data/fernet.key FERNET_KEY_PATH=/data/fernet.key
DEV_MODE=0 DEV_MODE=1
LOG_DIR=/data/logs LOG_DIR=/data/logs

1
.gitignore vendored
View File

@ -3,4 +3,3 @@
pgdata/ pgdata/
__pycache__/ __pycache__/
*.pyc *.pyc
logs/

View File

@ -53,17 +53,6 @@ class EhrClient(BaseClient):
self._token_type: str | None = None self._token_type: str | None = None
self._token_expires_at: float | None = None self._token_expires_at: float | None = None
@staticmethod
def _normalize_token_type(token_type: str | None) -> str:
# 北森文档示例返回 token_type=bearer小写但鉴权头要求 "Bearer <token>"。
# 这里统一规范为首字母大写,避免服务端大小写敏感导致 401。
raw = str(token_type or "").strip()
if not raw:
return "Bearer"
if raw.lower() == "bearer":
return "Bearer"
return raw
def authenticate(self) -> str: def authenticate(self) -> str:
body: dict[str, Any] = { body: dict[str, Any] = {
"grant_type": self.grant_type, "grant_type": self.grant_type,
@ -78,7 +67,7 @@ class EhrClient(BaseClient):
) )
data = resp.json() if resp.content else {} data = resp.json() if resp.content else {}
access_token = str(data.get("access_token", "") or "") access_token = str(data.get("access_token", "") or "")
token_type = self._normalize_token_type(data.get("token_type")) token_type = str(data.get("token_type", "") or "Bearer")
expires_in = int(data.get("expires_in", 0) or 0) expires_in = int(data.get("expires_in", 0) or 0)
if not access_token: if not access_token:
raise RuntimeError("EHR authenticate failed (access_token missing)") raise RuntimeError("EHR authenticate failed (access_token missing)")
@ -99,7 +88,7 @@ class EhrClient(BaseClient):
def request(self, method: str, path: str, **kwargs: Any) -> httpx.Response: # type: ignore[override] def request(self, method: str, path: str, **kwargs: Any) -> httpx.Response: # type: ignore[override]
token = self._get_access_token() token = self._get_access_token()
token_type = self._normalize_token_type(self._token_type) token_type = self._token_type or "Bearer"
headers = dict(kwargs.pop("headers", {}) or {}) headers = dict(kwargs.pop("headers", {}) or {})
headers["Authorization"] = f"{token_type} {token}" headers["Authorization"] = f"{token_type} {token}"
@ -117,7 +106,7 @@ class EhrClient(BaseClient):
self._token_expires_at = None self._token_expires_at = None
token2 = self._get_access_token() token2 = self._get_access_token()
token_type2 = self._normalize_token_type(self._token_type) token_type2 = self._token_type or "Bearer"
headers["Authorization"] = f"{token_type2} {token2}" headers["Authorization"] = f"{token_type2} {token2}"
return super().request(method, path, headers=headers, **kwargs) return super().request(method, path, headers=headers, **kwargs)

View File

@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta
from uuid import uuid4 from uuid import uuid4
from fastapi import Request, Response from fastapi import Request, Response
@ -8,21 +8,14 @@ from sqlalchemy import delete, select
from app.core.config import settings from app.core.config import settings
from app.db.engine import get_session from app.db.engine import get_session
from app.db.models import Session, User from app.db.models import Session
SESSION_COOKIE_NAME = "session_id" SESSION_COOKIE_NAME = "session_id"
def _now_utc() -> datetime: def _now_utc() -> datetime:
return datetime.now(timezone.utc) return datetime.utcnow()
def _as_utc(dt: datetime) -> datetime:
# 兼容历史脏数据:若为 naive按 UTC 解释;若为 aware统一转换到 UTC。
if dt.tzinfo is None:
return dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
def create_session(user_id: int, request: Request) -> str: def create_session(user_id: int, request: Request) -> str:
@ -82,7 +75,7 @@ def get_current_user(request: Request) -> User | None:
if not record: if not record:
request.state.user = None request.state.user = None
return None return None
if _as_utc(record.expires_at) <= _now_utc(): if record.expires_at <= _now_utc():
db.execute(delete(Session).where(Session.id == session_id)) db.execute(delete(Session).where(Session.id == session_id))
db.commit() db.commit()
request.state.user = None request.state.user = None

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from datetime import datetime, timedelta from datetime import datetime
from typing import Any from typing import Any
from app.integrations.ehr import EhrClient from app.integrations.ehr import EhrClient
@ -41,140 +41,13 @@ class SyncEhrToOaApi:
self._client.close() self._client.close()
@staticmethod @staticmethod
def _to_datetime(value: datetime | str | None) -> datetime: def _to_api_datetime(value: datetime | str) -> str:
if value is None:
return datetime.now()
if isinstance(value, datetime): if isinstance(value, datetime):
return value return value.strftime("%Y-%m-%dT%H:%M:%S")
s = str(value).strip() s = str(value).strip()
if not s: if not s:
raise ValueError("datetime string cannot be empty") raise ValueError("datetime string cannot be empty")
if "T" in s: return s
return datetime.fromisoformat(s)
if " " in s:
return datetime.fromisoformat(s.replace(" ", "T"))
return datetime.strptime(s, "%Y-%m-%d")
@staticmethod
def _to_api_datetime(value: datetime | str | None) -> str:
return SyncEhrToOaApi._to_datetime(value).strftime("%Y-%m-%dT%H:%M:%S")
@staticmethod
def _iter_windows(start_dt: datetime, stop_dt: datetime, max_days: int = 90) -> list[tuple[datetime, datetime]]:
if stop_dt < start_dt:
raise ValueError("stop_time must be greater than or equal to start_time")
windows: list[tuple[datetime, datetime]] = []
cur = start_dt
max_delta = timedelta(days=max_days)
while cur < stop_dt:
nxt = min(cur + max_delta, stop_dt)
windows.append((cur, nxt))
cur = nxt
if not windows:
windows.append((start_dt, stop_dt))
return windows
def _get_all_by_time_window(
self,
*,
api_path: str,
api_name: str,
stop_time: datetime | str | None,
capacity: int,
time_window_query_type: int,
with_disabled: bool,
is_with_deleted: bool,
max_pages: int,
) -> dict[str, Any]:
if capacity <= 0 or capacity > 300:
raise ValueError("capacity must be in range [1, 300]")
if max_pages <= 0:
raise ValueError("max_pages must be > 0")
start_dt = datetime(2015, 1, 1, 0, 0, 0)
stop_dt = self._to_datetime(stop_time)
windows = self._iter_windows(start_dt=start_dt, stop_dt=stop_dt, max_days=90)
all_data: list[dict[str, Any]] = []
total_pages = 0
last_scroll_id = ""
for idx, (w_start, w_stop) in enumerate(windows, start=1):
start_time = self._to_api_datetime(w_start)
stop_time_s = self._to_api_datetime(w_stop)
scroll_id = ""
page = 0
window_total = 0
while True:
page += 1
total_pages += 1
if page > max_pages:
raise RuntimeError(f"scroll pages exceed max_pages={max_pages} in window index={idx}")
body: dict[str, Any] = {
"startTime": start_time,
"stopTime": stop_time_s,
"timeWindowQueryType": time_window_query_type,
"scrollId": scroll_id,
"capacity": capacity,
"withDisabled": with_disabled,
"isWithDeleted": is_with_deleted,
}
resp = self._client.request(
"POST",
api_path,
json=body,
headers={"Content-Type": "application/json"},
)
payload = resp.json() if resp.content else {}
code = str(payload.get("code", "") or "")
if code != "200":
message = payload.get("message")
raise RuntimeError(f"EHR {api_name} failed code={code!r} message={message!r}")
batch = payload.get("data") or []
if not isinstance(batch, list):
raise RuntimeError(f"EHR {api_name} invalid response: data is not a list")
all_data.extend([x for x in batch if isinstance(x, dict)])
total_val = payload.get("total")
if total_val is not None:
try:
window_total = int(total_val)
except (TypeError, ValueError):
pass
is_last_data = bool(payload.get("isLastData", False))
scroll_id = str(payload.get("scrollId", "") or "")
last_scroll_id = scroll_id
logger.info(
"EHR %s window=%s/%s page=%s batch=%s window_total=%s isLastData=%s",
api_name,
idx,
len(windows),
page,
len(batch),
window_total,
is_last_data,
)
if is_last_data:
break
return {
"startTime": self._to_api_datetime(start_dt),
"stopTime": self._to_api_datetime(stop_dt),
"total": len(all_data),
"pages": total_pages,
"count": len(all_data),
"data": all_data,
"lastScrollId": last_scroll_id,
"windowCount": len(windows),
}
def get_all_employees_with_record_by_time_window( def get_all_employees_with_record_by_time_window(
self, self,
@ -190,18 +63,84 @@ class SyncEhrToOaApi:
滚动查询员工 + 单条任职全量结果 滚动查询员工 + 单条任职全量结果
固定起始时间 固定起始时间
- 2015-01-01T00:00:00 - 2001-01-01T00:00:00
""" """
return self._get_all_by_time_window( if capacity <= 0 or capacity > 300:
api_path="/TenantBaseExternal/api/v5/Employee/GetByTimeWindow", raise ValueError("capacity must be in range [1, 300]")
api_name="Employee.GetByTimeWindow", if max_pages <= 0:
stop_time=stop_time, raise ValueError("max_pages must be > 0")
capacity=capacity,
time_window_query_type=time_window_query_type, start_time = "2001-01-01T00:00:00"
with_disabled=with_disabled, stop_time_s = self._to_api_datetime(stop_time or datetime.now())
is_with_deleted=is_with_deleted,
max_pages=max_pages, all_data: list[dict[str, Any]] = []
) scroll_id = ""
total = 0
page = 0
while True:
page += 1
if page > max_pages:
raise RuntimeError(f"scroll pages exceed max_pages={max_pages}")
body: dict[str, Any] = {
"startTime": start_time,
"stopTime": stop_time_s,
"timeWindowQueryType": time_window_query_type,
"scrollId": scroll_id,
"capacity": capacity,
"withDisabled": with_disabled,
"isWithDeleted": is_with_deleted,
}
resp = self._client.request(
"POST",
"/TenantBaseExternal/api/v5/Employee/GetByTimeWindow",
json=body,
headers={"Content-Type": "application/json"},
)
payload = resp.json() if resp.content else {}
code = str(payload.get("code", "") or "")
if code != "200":
message = payload.get("message")
raise RuntimeError(f"EHR GetByTimeWindow failed code={code!r} message={message!r}")
batch = payload.get("data") or []
if not isinstance(batch, list):
raise RuntimeError("EHR GetByTimeWindow invalid response: data is not a list")
all_data.extend([x for x in batch if isinstance(x, dict)])
total_val = payload.get("total")
if total_val is not None:
try:
total = int(total_val)
except (TypeError, ValueError):
total = total
is_last_data = bool(payload.get("isLastData", False))
scroll_id = str(payload.get("scrollId", "") or "")
logger.info(
"EHR GetByTimeWindow page=%s batch=%s total=%s isLastData=%s",
page,
len(batch),
total,
is_last_data,
)
if is_last_data:
break
return {
"startTime": start_time,
"stopTime": stop_time_s,
"total": total,
"pages": page,
"count": len(all_data),
"data": all_data,
"lastScrollId": scroll_id,
}
def get_all_organizations_by_time_window( def get_all_organizations_by_time_window(
self, self,
@ -217,18 +156,84 @@ class SyncEhrToOaApi:
滚动查询组织单元全量结果 滚动查询组织单元全量结果
固定起始时间 固定起始时间
- 2015-01-01T00:00:00 - 2001-01-01T00:00:00
""" """
return self._get_all_by_time_window( if capacity <= 0 or capacity > 300:
api_path="/TenantBaseExternal/api/v5/Organization/GetByTimeWindow", raise ValueError("capacity must be in range [1, 300]")
api_name="Organization.GetByTimeWindow", if max_pages <= 0:
stop_time=stop_time, raise ValueError("max_pages must be > 0")
capacity=capacity,
time_window_query_type=time_window_query_type, start_time = "2001-01-01T00:00:00"
with_disabled=with_disabled, stop_time_s = self._to_api_datetime(stop_time or datetime.now())
is_with_deleted=is_with_deleted,
max_pages=max_pages, all_data: list[dict[str, Any]] = []
) scroll_id = ""
total = 0
page = 0
while True:
page += 1
if page > max_pages:
raise RuntimeError(f"scroll pages exceed max_pages={max_pages}")
body: dict[str, Any] = {
"startTime": start_time,
"stopTime": stop_time_s,
"timeWindowQueryType": time_window_query_type,
"scrollId": scroll_id,
"capacity": capacity,
"withDisabled": with_disabled,
"isWithDeleted": is_with_deleted,
}
resp = self._client.request(
"POST",
"/TenantBaseExternal/api/v5/Organization/GetByTimeWindow",
json=body,
headers={"Content-Type": "application/json"},
)
payload = resp.json() if resp.content else {}
code = str(payload.get("code", "") or "")
if code != "200":
message = payload.get("message")
raise RuntimeError(f"EHR Organization.GetByTimeWindow failed code={code!r} message={message!r}")
batch = payload.get("data") or []
if not isinstance(batch, list):
raise RuntimeError("EHR Organization.GetByTimeWindow invalid response: data is not a list")
all_data.extend([x for x in batch if isinstance(x, dict)])
total_val = payload.get("total")
if total_val is not None:
try:
total = int(total_val)
except (TypeError, ValueError):
total = total
is_last_data = bool(payload.get("isLastData", False))
scroll_id = str(payload.get("scrollId", "") or "")
logger.info(
"EHR Organization.GetByTimeWindow page=%s batch=%s total=%s isLastData=%s",
page,
len(batch),
total,
is_last_data,
)
if is_last_data:
break
return {
"startTime": start_time,
"stopTime": stop_time_s,
"total": total,
"pages": page,
"count": len(all_data),
"data": all_data,
"lastScrollId": scroll_id,
}
def get_all_job_posts_by_time_window( def get_all_job_posts_by_time_window(
self, self,
@ -244,15 +249,81 @@ class SyncEhrToOaApi:
滚动查询职务全量结果 滚动查询职务全量结果
固定起始时间 固定起始时间
- 2015-01-01T00:00:00 - 2001-01-01T00:00:00
""" """
return self._get_all_by_time_window( if capacity <= 0 or capacity > 300:
api_path="/TenantBaseExternal/api/v5/JobPost/GetByTimeWindow", raise ValueError("capacity must be in range [1, 300]")
api_name="JobPost.GetByTimeWindow", if max_pages <= 0:
stop_time=stop_time, raise ValueError("max_pages must be > 0")
capacity=capacity,
time_window_query_type=time_window_query_type, start_time = "2001-01-01T00:00:00"
with_disabled=with_disabled, stop_time_s = self._to_api_datetime(stop_time or datetime.now())
is_with_deleted=is_with_deleted,
max_pages=max_pages, all_data: list[dict[str, Any]] = []
) scroll_id = ""
total = 0
page = 0
while True:
page += 1
if page > max_pages:
raise RuntimeError(f"scroll pages exceed max_pages={max_pages}")
body: dict[str, Any] = {
"startTime": start_time,
"stopTime": stop_time_s,
"timeWindowQueryType": time_window_query_type,
"scrollId": scroll_id,
"capacity": capacity,
"withDisabled": with_disabled,
"isWithDeleted": is_with_deleted,
}
resp = self._client.request(
"POST",
"/TenantBaseExternal/api/v5/JobPost/GetByTimeWindow",
json=body,
headers={"Content-Type": "application/json"},
)
payload = resp.json() if resp.content else {}
code = str(payload.get("code", "") or "")
if code != "200":
message = payload.get("message")
raise RuntimeError(f"EHR JobPost.GetByTimeWindow failed code={code!r} message={message!r}")
batch = payload.get("data") or []
if not isinstance(batch, list):
raise RuntimeError("EHR JobPost.GetByTimeWindow invalid response: data is not a list")
all_data.extend([x for x in batch if isinstance(x, dict)])
total_val = payload.get("total")
if total_val is not None:
try:
total = int(total_val)
except (TypeError, ValueError):
total = total
is_last_data = bool(payload.get("isLastData", False))
scroll_id = str(payload.get("scrollId", "") or "")
logger.info(
"EHR JobPost.GetByTimeWindow page=%s batch=%s total=%s isLastData=%s",
page,
len(batch),
total,
is_last_data,
)
if is_last_data:
break
return {
"startTime": start_time,
"stopTime": stop_time_s,
"total": total,
"pages": page,
"count": len(all_data),
"data": all_data,
"lastScrollId": scroll_id,
}

View File

@ -46,97 +46,6 @@ def _custom_prop_value(custom_props: Any, key: str | None) -> str:
return str(raw or "").strip() return str(raw or "").strip()
def _to_bool_or_none(v: Any) -> bool | None:
if v is None:
return None
if isinstance(v, bool):
return v
s = str(v).strip().lower()
if s in ("1", "true", "yes", "y", "on"):
return True
if s in ("0", "false", "no", "n", "off", ""):
return False
return bool(v)
def _normalize_job_no(v: Any) -> str:
"""
工号标准化
- 去首尾空白去内部空格
- 数值型字符串如 123.0 -> 123常见于表单数字字段
- 统一大写便于大小写不敏感匹配
"""
s = str(v or "").strip()
if not s:
return ""
s = s.replace(" ", "")
try:
if "." in s and s.endswith(".0"):
i = int(float(s))
s = str(i)
except Exception:
pass
return s.upper()
def _extract_oa_row_id_and_fields(row: dict[str, Any]) -> tuple[int | None, dict[str, Any]]:
"""
兼容不同 OA export 返回结构提取
- row_id
- 字段字典key=fieldCode, value=单元格对象或值
"""
field_map: dict[str, Any] = {}
row_id: int | None = None
# 结构 AmasterData 直接是 {field0001: {value,showValue}, ...}
master = row.get("masterData")
if isinstance(master, dict):
for k, v in master.items():
if isinstance(k, str) and k.startswith("field"):
field_map[k] = v
for candidate in (row.get("id"), row.get("masterDataId"), master.get("id")):
if candidate is None:
continue
try:
row_id = int(str(candidate))
break
except Exception:
continue
# 结构 BmasterTable.record.fields = [{name,value,showValue}, ...]
master_table = row.get("masterTable")
if isinstance(master_table, dict):
record = master_table.get("record")
if isinstance(record, dict):
fields = record.get("fields")
if isinstance(fields, list):
for fld in fields:
if not isinstance(fld, dict):
continue
name = str(fld.get("name") or "").strip()
if name:
field_map[name] = fld
if row_id is None:
rid = record.get("id")
if rid is not None:
try:
row_id = int(str(rid))
except Exception:
pass
# 结构 C行级 fields 列表
row_fields = row.get("fields")
if isinstance(row_fields, list):
for fld in row_fields:
if not isinstance(fld, dict):
continue
name = str(fld.get("name") or "").strip()
if name:
field_map[name] = fld
return row_id, field_map
def _choose_better_record(current: dict[str, Any], candidate: dict[str, Any]) -> dict[str, Any]: def _choose_better_record(current: dict[str, Any], candidate: dict[str, Any]) -> dict[str, Any]:
def _score(item: dict[str, Any]) -> str: def _score(item: dict[str, Any]) -> str:
record = item.get("recordInfo") or {} record = item.get("recordInfo") or {}
@ -205,15 +114,6 @@ class SyncEhrToOaFormJob(BaseJob):
rd_attr_custom_key = str(params.get("rd_attr_custom_key") or "").strip() or None rd_attr_custom_key = str(params.get("rd_attr_custom_key") or "").strip() or None
domain_custom_key = str(params.get("domain_account_custom_key") or "").strip() or None domain_custom_key = str(params.get("domain_account_custom_key") or "").strip() or None
verbose_trace = _to_bool_or_none(params.get("verbose_trace"))
if verbose_trace is None:
verbose_trace = True
preview_ehr_data = _to_bool_or_none(params.get("preview_ehr_data"))
if preview_ehr_data is None:
preview_ehr_data = True
preview_limit = int(params.get("preview_limit") or 20)
if preview_limit <= 0:
preview_limit = 20
seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name) seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
ehr = SyncEhrToOaApi(secret_params={"app_key": app_key, "app_secret": app_secret}) ehr = SyncEhrToOaApi(secret_params={"app_key": app_key, "app_secret": app_secret})
@ -239,7 +139,6 @@ class SyncEhrToOaFormJob(BaseJob):
# 3) 员工按工号归并(同工号保留“最新”记录) # 3) 员工按工号归并(同工号保留“最新”记录)
ehr_by_job_no: dict[str, dict[str, Any]] = {} ehr_by_job_no: dict[str, dict[str, Any]] = {}
ehr_by_job_no_norm: dict[str, dict[str, Any]] = {}
for item in emp_rows: for item in emp_rows:
if not isinstance(item, dict): if not isinstance(item, dict):
continue continue
@ -251,96 +150,15 @@ class SyncEhrToOaFormJob(BaseJob):
continue continue
existing = ehr_by_job_no.get(job_no) existing = ehr_by_job_no.get(job_no)
ehr_by_job_no[job_no] = item if existing is None else _choose_better_record(existing, item) ehr_by_job_no[job_no] = item if existing is None else _choose_better_record(existing, item)
job_no_norm = _normalize_job_no(job_no)
if job_no_norm:
ex2 = ehr_by_job_no_norm.get(job_no_norm)
ehr_by_job_no_norm[job_no_norm] = item if ex2 is None else _choose_better_record(ex2, item)
logger.info(
"EHR 数据准备完成employee_rows=%s organization_rows=%s distinct_job_numbers=%s distinct_job_numbers_norm=%s",
len(emp_rows),
len(org_rows),
len(ehr_by_job_no),
len(ehr_by_job_no_norm),
)
if verbose_trace:
for job_no in list(ehr_by_job_no.keys()):
logger.info("EHR 工号明细raw=%s norm=%s", job_no, _normalize_job_no(job_no))
if preview_ehr_data:
logger.info("EHR 字段预览开始limit=%s", preview_limit)
count = 0
for job_no, item in ehr_by_job_no.items():
emp = item.get("employeeInfo") or {}
rec = item.get("recordInfo") or {}
if not isinstance(emp, dict):
emp = {}
if not isinstance(rec, dict):
rec = {}
org_oid = str(rec.get("oIdOrganization") or rec.get("oIdDepartment") or "").strip()
org = org_by_oid.get(org_oid, {})
company = str((org or {}).get("name") or "")
name = str(emp.get("name") or "")
rd_attr = _custom_prop_value(rec.get("customProperties"), rd_attr_custom_key) or _custom_prop_value(
emp.get("customProperties"), rd_attr_custom_key
)
place = str(rec.get("place") or "")
entry_date = _date_only(rec.get("entryDate"))
leave_date = _date_only(rec.get("lastWorkDate")) or "2099-12-31"
id_number = str(emp.get("iDNumber") or "")
hrbp = str((org or {}).get("hRBP") or "")
manager = str(rec.get("pOIdEmpAdmin") or "")
is_leaving = "" if _date_only(rec.get("lastWorkDate")) else ""
domain_account = _custom_prop_value(emp.get("customProperties"), domain_custom_key) or str(emp.get("_Name") or "")
logger.info(
"EHR 字段预览job_no=%s company=%s name=%s rd_attr=%s place=%s entry_date=%s leave_date=%s id_number=%s hrbp=%s manager=%s is_leaving=%s domain_account=%s",
job_no,
company,
name,
rd_attr,
place,
entry_date,
leave_date,
id_number,
hrbp,
manager,
is_leaving,
domain_account,
)
count += 1
if count >= preview_limit:
break
logger.info("EHR 字段预览结束printed=%s", count)
# 4) 导出 OA 表单,建立字段映射 + 工号到记录ID映射 # 4) 导出 OA 表单,建立字段映射 + 工号到记录ID映射
exp_resp = seeyon.export_cap4_form_soap( exp_resp = seeyon.export_cap4_form_soap(templateCode=oa_template_code, senderLoginName=sender_login_name, rightId=oa_right_id)
templateCode=oa_template_code,
senderLoginName=sender_login_name,
rightId=oa_right_id,
)
raw = exp_resp.text or "" raw = exp_resp.text or ""
logger.info( payload = json.loads(raw) if raw else {}
"OA export 返回status=%s content_length=%s template=%s",
exp_resp.status_code,
len(raw),
oa_template_code,
)
if raw:
logger.info("OA export 响应预览:%s", raw[:1000])
try:
payload = json.loads(raw) if raw else {}
except Exception as e: # noqa: BLE001
raise RuntimeError(f"OA export 响应不是有效 JSON: err={e!r} preview={raw[:500]!r}") from e
export_code = payload.get("code")
export_message = payload.get("message")
if export_code not in (None, 0, "0"):
raise RuntimeError(f"OA export failed code={export_code!r} message={export_message!r}")
outer = payload.get("data") or {} outer = payload.get("data") or {}
form = outer.get("data") or {} form = outer.get("data") or {}
if not isinstance(form, dict): if not isinstance(form, dict):
raise RuntimeError( raise RuntimeError("OA export invalid: data.data is not an object")
f"OA export invalid: data.data is not an object; payload_keys={list(payload.keys())[:20]}"
)
definition = form.get("definition") or {} definition = form.get("definition") or {}
fields = definition.get("fields") or [] fields = definition.get("fields") or []
@ -391,81 +209,40 @@ class SyncEhrToOaFormJob(BaseJob):
oa_master_table_name = str(master_tbl.get("name") or "").strip() oa_master_table_name = str(master_tbl.get("name") or "").strip()
if not oa_master_table_name: if not oa_master_table_name:
raise RuntimeError("public_cfg.oa_master_table_name is required (cannot infer from OA export)") raise RuntimeError("public_cfg.oa_master_table_name is required (cannot infer from OA export)")
logger.info(
"OA 表单解析完成template=%s master_table=%s form_rows=%s",
oa_template_code,
oa_master_table_name,
len(rows),
)
job_field_code = display_to_code["工号"] job_field_code = display_to_code["工号"]
oa_id_by_job_no: dict[str, int] = {} oa_id_by_job_no: dict[str, int] = {}
oa_id_by_job_no_norm: dict[str, int] = {}
row_parse_miss = 0
for row in rows: for row in rows:
if not isinstance(row, dict): if not isinstance(row, dict):
continue continue
row_id, field_map = _extract_oa_row_id_and_fields(row) master = row.get("masterData") or {}
job_no = _cell_value(field_map.get(job_field_code)) if not isinstance(master, dict):
continue
job_no = _cell_value(master.get(job_field_code))
if not job_no: if not job_no:
row_parse_miss += 1
if verbose_trace and row_parse_miss <= 20:
logger.info(
"OA 行解析未取到工号job_field=%s row_keys=%s field_keys_sample=%s",
job_field_code,
list(row.keys())[:20],
list(field_map.keys())[:20],
)
continue continue
if row_id is None: row_id_raw = row.get("id")
row_parse_miss += 1 if row_id_raw is None:
if verbose_trace and row_parse_miss <= 20: row_id_raw = row.get("masterDataId")
logger.info( if row_id_raw is None:
"OA 行解析未取到记录IDjob_no=%s row_keys=%s", row_id_raw = master.get("id")
job_no, if row_id_raw is None:
list(row.keys())[:20], continue
) try:
row_id = int(str(row_id_raw))
except Exception:
continue continue
oa_id_by_job_no[job_no] = row_id oa_id_by_job_no[job_no] = row_id
job_no_norm = _normalize_job_no(job_no)
if job_no_norm:
oa_id_by_job_no_norm[job_no_norm] = row_id
logger.info(
"OA 工号索引完成indexed_job_numbers=%s indexed_job_numbers_norm=%s parse_miss=%s",
len(oa_id_by_job_no),
len(oa_id_by_job_no_norm),
row_parse_miss,
)
if verbose_trace:
for job_no, row_id in list(oa_id_by_job_no.items()):
logger.info("OA 工号索引明细raw=%s norm=%s row_id=%s", job_no, _normalize_job_no(job_no), row_id)
# 5) 组装批量更新数据 # 5) 组装批量更新数据
data_list: list[dict[str, Any]] = [] data_list: list[dict[str, Any]] = []
not_found_in_oa = 0 not_found_in_oa = 0
unmatched_samples: list[str] = []
for job_no, item in ehr_by_job_no.items(): for job_no, item in ehr_by_job_no.items():
oa_record_id = oa_id_by_job_no.get(job_no) oa_record_id = oa_id_by_job_no.get(job_no)
matched_by = "raw"
if oa_record_id is None:
oa_record_id = oa_id_by_job_no_norm.get(_normalize_job_no(job_no))
matched_by = "normalized"
if oa_record_id is None: if oa_record_id is None:
not_found_in_oa += 1 not_found_in_oa += 1
if len(unmatched_samples) < 20:
unmatched_samples.append(job_no)
if verbose_trace:
logger.info("匹配失败job_no=%s norm=%s", job_no, _normalize_job_no(job_no))
continue continue
if verbose_trace:
logger.info(
"匹配成功job_no=%s norm=%s row_id=%s matched_by=%s",
job_no,
_normalize_job_no(job_no),
oa_record_id,
matched_by,
)
emp = item.get("employeeInfo") or {} emp = item.get("employeeInfo") or {}
rec = item.get("recordInfo") or {} rec = item.get("recordInfo") or {}
@ -504,10 +281,6 @@ class SyncEhrToOaFormJob(BaseJob):
{"name": display_to_code["在离职"], "value": is_leaving, "showValue": is_leaving}, {"name": display_to_code["在离职"], "value": is_leaving, "showValue": is_leaving},
{"name": display_to_code["域账号"], "value": domain_account, "showValue": domain_account}, {"name": display_to_code["域账号"], "value": domain_account, "showValue": domain_account},
] ]
if verbose_trace:
logger.info("字段映射job_no=%s row_id=%s", job_no, oa_record_id)
for fld in fields_payload:
logger.info("字段映射明细job_no=%s field=%s value=%s", job_no, fld["name"], fld["value"])
data_list.append( data_list.append(
{ {
@ -522,44 +295,20 @@ class SyncEhrToOaFormJob(BaseJob):
"subTables": [], "subTables": [],
} }
) )
logger.info(
"待更新数据准备完成prepared_updates=%s not_found_in_oa=%s",
len(data_list),
not_found_in_oa,
)
if not data_list:
raise RuntimeError(
"No updates prepared for OA batch-update (check jobNumber matching between EHR and OA, and form field mapping). "
f"unmatched_sample={unmatched_samples}"
)
# 6) 分批执行 batch-update # 6) 分批执行 batch-update
success_count = 0 success_count = 0
failed_count = 0 failed_count = 0
failed_data: dict[str, str] = {} failed_data: dict[str, str] = {}
do_trigger_bool = _to_bool_or_none(do_trigger)
for i in range(0, len(data_list), batch_size): for i in range(0, len(data_list), batch_size):
chunk = data_list[i : i + batch_size] chunk = data_list[i : i + batch_size]
if verbose_trace:
logger.info("批量更新尝试chunk_index=%s chunk_size=%s", i // batch_size + 1, len(chunk))
for row in chunk:
try:
record = (((row or {}).get("masterTable") or {}).get("record") or {})
row_id = record.get("id")
fields = record.get("fields") or []
logger.info("批量更新行row_id=%s fields_count=%s", row_id, len(fields))
for fld in fields:
if isinstance(fld, dict):
logger.info("批量更新字段row_id=%s field=%s value=%s", row_id, fld.get("name"), fld.get("value"))
except Exception:
logger.info("批量更新行日志输出失败,已忽略")
resp = seeyon.batch_update_cap4_form_soap( resp = seeyon.batch_update_cap4_form_soap(
formCode=oa_form_code, formCode=oa_form_code,
loginName=oa_login_name, loginName=oa_login_name,
rightId=oa_right_id, rightId=oa_right_id,
dataList=chunk, dataList=chunk,
uniqueFiled=[job_field_code], uniqueFiled=[job_field_code],
doTrigger=do_trigger_bool, doTrigger=bool(do_trigger) if do_trigger is not None else None,
) )
rj = resp.json() if resp.content else {} rj = resp.json() if resp.content else {}
code = int(rj.get("code", -1)) code = int(rj.get("code", -1))