330 lines
10 KiB
Python
330 lines
10 KiB
Python
from __future__ import annotations
|
||
|
||
import logging
|
||
from datetime import datetime
|
||
from typing import Any
|
||
|
||
from app.integrations.ehr import EhrClient
|
||
|
||
|
||
# Module-level logger used by the EHR -> OA sync API wrapper defined in this module.
logger = logging.getLogger("connecthub.extensions.sync_ehr_to_oa")
|
||
|
||
|
||
class SyncEhrToOaApi:
    """Wrapper around the Beisen EHR API calls used by the EHR -> OA sync.

    Wrapped APIs (all of them are time-window scroll queries):

    - employees together with a single job record
    - organization units
    - job posts

    Every query starts at the fixed time ``2001-01-01T00:00:00`` and keeps
    requesting pages, passing back the ``scrollId`` of the previous response,
    until a response reports ``isLastData``.
    """

    # Lower bound of every time window sent to the API.
    _FIXED_START_TIME = "2001-01-01T00:00:00"
    # Largest page size accepted by the argument validation.
    _MAX_CAPACITY = 300

    def __init__(
        self,
        *,
        secret_params: dict[str, str],
        base_url: str = "https://openapi.italent.cn",
        timeout_s: float = 10.0,
        retries: int = 2,
        retry_backoff_s: float = 0.5,
    ) -> None:
        """Create the underlying ``EhrClient``.

        All arguments are forwarded unchanged to ``EhrClient``.
        """
        self._client = EhrClient(
            base_url=base_url,
            secret_params=secret_params,
            timeout_s=timeout_s,
            retries=retries,
            retry_backoff_s=retry_backoff_s,
        )

    def close(self) -> None:
        """Close the underlying ``EhrClient``."""
        self._client.close()

    @staticmethod
    def _to_api_datetime(value: datetime | str) -> str:
        """Return *value* as the string sent in the request body.

        A ``datetime`` is formatted as ``YYYY-MM-DDTHH:MM:SS``; any other value
        is converted with ``str`` and stripped of surrounding whitespace.

        Raises:
            ValueError: If the resulting string is empty.
        """
        if isinstance(value, datetime):
            return value.strftime("%Y-%m-%dT%H:%M:%S")
        s = str(value).strip()
        if not s:
            raise ValueError("datetime string cannot be empty")
        return s

    def _scroll_by_time_window(
        self,
        *,
        path: str,
        api_label: str,
        stop_time: datetime | str | None,
        capacity: int,
        time_window_query_type: int,
        with_disabled: bool,
        is_with_deleted: bool,
        max_pages: int,
    ) -> dict[str, Any]:
        """Run one ``GetByTimeWindow`` scroll query to completion.

        Shared implementation of the three public ``get_all_*`` methods, which
        differ only in the endpoint *path* and in the *api_label* used in log
        and error messages.

        Returns:
            A dict with the keys ``startTime``, ``stopTime``, ``total`` (last
            integer ``total`` reported by the API, ``0`` if none), ``pages``,
            ``count``, ``data`` (all dict items of every page, in order) and
            ``lastScrollId``.

        Raises:
            ValueError: If *capacity* or *max_pages* is out of range, or
                *stop_time* is a blank string.
            RuntimeError: If more than *max_pages* pages would be requested,
                the API answers with a code other than ``"200"``, or the
                response is malformed.
        """
        if capacity <= 0 or capacity > self._MAX_CAPACITY:
            raise ValueError(f"capacity must be in range [1, {self._MAX_CAPACITY}]")
        if max_pages <= 0:
            raise ValueError("max_pages must be > 0")

        start_time = self._FIXED_START_TIME
        # The stop time is resolved once so that every page uses the same window.
        stop_time_s = self._to_api_datetime(stop_time or datetime.now())

        all_data: list[dict[str, Any]] = []
        scroll_id = ""
        total = 0
        page = 0

        while True:
            page += 1
            if page > max_pages:
                raise RuntimeError(f"scroll pages exceed max_pages={max_pages}")

            body: dict[str, Any] = {
                "startTime": start_time,
                "stopTime": stop_time_s,
                "timeWindowQueryType": time_window_query_type,
                "scrollId": scroll_id,
                "capacity": capacity,
                "withDisabled": with_disabled,
                "isWithDeleted": is_with_deleted,
            }
            resp = self._client.request(
                "POST",
                path,
                json=body,
                headers={"Content-Type": "application/json"},
            )
            payload = resp.json() if resp.content else {}
            # A JSON list/string/number body has no ``.get``; report it as a
            # malformed response instead of failing with AttributeError.
            if not isinstance(payload, dict):
                raise RuntimeError(
                    f"EHR {api_label} invalid response: payload is not an object"
                )

            code = str(payload.get("code", "") or "")
            if code != "200":
                message = payload.get("message")
                raise RuntimeError(
                    f"EHR {api_label} failed code={code!r} message={message!r}"
                )

            batch = payload.get("data") or []
            if not isinstance(batch, list):
                raise RuntimeError(
                    f"EHR {api_label} invalid response: data is not a list"
                )
            # Non-dict items are dropped, so ``count`` may be lower than the
            # sum of the logged batch sizes.
            all_data.extend(item for item in batch if isinstance(item, dict))

            total_val = payload.get("total")
            if total_val is not None:
                try:
                    total = int(total_val)
                except (TypeError, ValueError):
                    # Best effort: keep the last valid total, but leave a trace.
                    logger.warning(
                        "EHR %s returned non-integer total=%r; keeping total=%s",
                        api_label,
                        total_val,
                        total,
                    )

            is_last_data = bool(payload.get("isLastData", False))
            scroll_id = str(payload.get("scrollId", "") or "")

            logger.info(
                "EHR %s page=%s batch=%s total=%s isLastData=%s",
                api_label,
                page,
                len(batch),
                total,
                is_last_data,
            )

            if is_last_data:
                break
            # Without a scrollId the next request would be identical to the
            # very first one, so the loop would accumulate pages up to
            # max_pages without any continuation token.
            if not scroll_id:
                raise RuntimeError(
                    f"EHR {api_label} invalid response: "
                    "scrollId is empty but isLastData is false"
                )

        return {
            "startTime": start_time,
            "stopTime": stop_time_s,
            "total": total,
            "pages": page,
            "count": len(all_data),
            "data": all_data,
            "lastScrollId": scroll_id,
        }

    def get_all_employees_with_record_by_time_window(
        self,
        *,
        stop_time: datetime | str | None = None,
        capacity: int = 300,
        time_window_query_type: int = 1,
        with_disabled: bool = True,
        is_with_deleted: bool = True,
        max_pages: int = 100000,
    ) -> dict[str, Any]:
        """Scroll through the full "employee + single job record" result set.

        The start time is fixed to ``2001-01-01T00:00:00``.

        Args:
            stop_time: End of the time window; the current local time when
                omitted or falsy.
            capacity: Page size, must be in ``[1, 300]``.
            time_window_query_type: Sent unchanged as ``timeWindowQueryType``.
            with_disabled: Sent unchanged as ``withDisabled``.
            is_with_deleted: Sent unchanged as ``isWithDeleted``.
            max_pages: Maximum number of pages to request, must be ``> 0``.

        Returns:
            A dict with the keys ``startTime``, ``stopTime``, ``total``,
            ``pages``, ``count``, ``data`` and ``lastScrollId``.

        Raises:
            ValueError: If an argument is out of range.
            RuntimeError: If the API reports a failure, the response is
                malformed, or *max_pages* is exceeded.
        """
        return self._scroll_by_time_window(
            path="/TenantBaseExternal/api/v5/Employee/GetByTimeWindow",
            api_label="GetByTimeWindow",
            stop_time=stop_time,
            capacity=capacity,
            time_window_query_type=time_window_query_type,
            with_disabled=with_disabled,
            is_with_deleted=is_with_deleted,
            max_pages=max_pages,
        )

    def get_all_organizations_by_time_window(
        self,
        *,
        stop_time: datetime | str | None = None,
        capacity: int = 300,
        time_window_query_type: int = 1,
        with_disabled: bool = True,
        is_with_deleted: bool = True,
        max_pages: int = 100000,
    ) -> dict[str, Any]:
        """Scroll through the full "organization unit" result set.

        The start time is fixed to ``2001-01-01T00:00:00``.

        Args:
            stop_time: End of the time window; the current local time when
                omitted or falsy.
            capacity: Page size, must be in ``[1, 300]``.
            time_window_query_type: Sent unchanged as ``timeWindowQueryType``.
            with_disabled: Sent unchanged as ``withDisabled``.
            is_with_deleted: Sent unchanged as ``isWithDeleted``.
            max_pages: Maximum number of pages to request, must be ``> 0``.

        Returns:
            A dict with the keys ``startTime``, ``stopTime``, ``total``,
            ``pages``, ``count``, ``data`` and ``lastScrollId``.

        Raises:
            ValueError: If an argument is out of range.
            RuntimeError: If the API reports a failure, the response is
                malformed, or *max_pages* is exceeded.
        """
        return self._scroll_by_time_window(
            path="/TenantBaseExternal/api/v5/Organization/GetByTimeWindow",
            api_label="Organization.GetByTimeWindow",
            stop_time=stop_time,
            capacity=capacity,
            time_window_query_type=time_window_query_type,
            with_disabled=with_disabled,
            is_with_deleted=is_with_deleted,
            max_pages=max_pages,
        )

    def get_all_job_posts_by_time_window(
        self,
        *,
        stop_time: datetime | str | None = None,
        capacity: int = 300,
        time_window_query_type: int = 1,
        with_disabled: bool = True,
        is_with_deleted: bool = True,
        max_pages: int = 100000,
    ) -> dict[str, Any]:
        """Scroll through the full "job post" result set.

        The start time is fixed to ``2001-01-01T00:00:00``.

        Args:
            stop_time: End of the time window; the current local time when
                omitted or falsy.
            capacity: Page size, must be in ``[1, 300]``.
            time_window_query_type: Sent unchanged as ``timeWindowQueryType``.
            with_disabled: Sent unchanged as ``withDisabled``.
            is_with_deleted: Sent unchanged as ``isWithDeleted``.
            max_pages: Maximum number of pages to request, must be ``> 0``.

        Returns:
            A dict with the keys ``startTime``, ``stopTime``, ``total``,
            ``pages``, ``count``, ``data`` and ``lastScrollId``.

        Raises:
            ValueError: If an argument is out of range.
            RuntimeError: If the API reports a failure, the response is
                malformed, or *max_pages* is exceeded.
        """
        return self._scroll_by_time_window(
            path="/TenantBaseExternal/api/v5/JobPost/GetByTimeWindow",
            api_label="JobPost.GetByTimeWindow",
            stop_time=stop_time,
            capacity=capacity,
            time_window_query_type=time_window_query_type,
            with_disabled=with_disabled,
            is_with_deleted=is_with_deleted,
            max_pages=max_pages,
        )
|