This commit is contained in:
Marsway 2026-03-04 11:10:06 +08:00
parent a988696408
commit 90a54a8dd8
1 changed files with 164 additions and 235 deletions

View File

@ -1,7 +1,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
from datetime import datetime from datetime import datetime, timedelta
from typing import Any from typing import Any
from app.integrations.ehr import EhrClient from app.integrations.ehr import EhrClient
@ -41,13 +41,140 @@ class SyncEhrToOaApi:
self._client.close() self._client.close()
@staticmethod @staticmethod
def _to_api_datetime(value: datetime | str) -> str: def _to_datetime(value: datetime | str | None) -> datetime:
if value is None:
return datetime.now()
if isinstance(value, datetime): if isinstance(value, datetime):
return value.strftime("%Y-%m-%dT%H:%M:%S") return value
s = str(value).strip() s = str(value).strip()
if not s: if not s:
raise ValueError("datetime string cannot be empty") raise ValueError("datetime string cannot be empty")
return s if "T" in s:
return datetime.fromisoformat(s)
if " " in s:
return datetime.fromisoformat(s.replace(" ", "T"))
return datetime.strptime(s, "%Y-%m-%d")
@staticmethod
def _to_api_datetime(value: datetime | str | None) -> str:
return SyncEhrToOaApi._to_datetime(value).strftime("%Y-%m-%dT%H:%M:%S")
@staticmethod
def _iter_windows(start_dt: datetime, stop_dt: datetime, max_days: int = 90) -> list[tuple[datetime, datetime]]:
if stop_dt < start_dt:
raise ValueError("stop_time must be greater than or equal to start_time")
windows: list[tuple[datetime, datetime]] = []
cur = start_dt
max_delta = timedelta(days=max_days)
while cur < stop_dt:
nxt = min(cur + max_delta, stop_dt)
windows.append((cur, nxt))
cur = nxt
if not windows:
windows.append((start_dt, stop_dt))
return windows
def _get_all_by_time_window(
self,
*,
api_path: str,
api_name: str,
stop_time: datetime | str | None,
capacity: int,
time_window_query_type: int,
with_disabled: bool,
is_with_deleted: bool,
max_pages: int,
) -> dict[str, Any]:
if capacity <= 0 or capacity > 300:
raise ValueError("capacity must be in range [1, 300]")
if max_pages <= 0:
raise ValueError("max_pages must be > 0")
start_dt = datetime(2015, 1, 1, 0, 0, 0)
stop_dt = self._to_datetime(stop_time)
windows = self._iter_windows(start_dt=start_dt, stop_dt=stop_dt, max_days=90)
all_data: list[dict[str, Any]] = []
total_pages = 0
last_scroll_id = ""
for idx, (w_start, w_stop) in enumerate(windows, start=1):
start_time = self._to_api_datetime(w_start)
stop_time_s = self._to_api_datetime(w_stop)
scroll_id = ""
page = 0
window_total = 0
while True:
page += 1
total_pages += 1
if page > max_pages:
raise RuntimeError(f"scroll pages exceed max_pages={max_pages} in window index={idx}")
body: dict[str, Any] = {
"startTime": start_time,
"stopTime": stop_time_s,
"timeWindowQueryType": time_window_query_type,
"scrollId": scroll_id,
"capacity": capacity,
"withDisabled": with_disabled,
"isWithDeleted": is_with_deleted,
}
resp = self._client.request(
"POST",
api_path,
json=body,
headers={"Content-Type": "application/json"},
)
payload = resp.json() if resp.content else {}
code = str(payload.get("code", "") or "")
if code != "200":
message = payload.get("message")
raise RuntimeError(f"EHR {api_name} failed code={code!r} message={message!r}")
batch = payload.get("data") or []
if not isinstance(batch, list):
raise RuntimeError(f"EHR {api_name} invalid response: data is not a list")
all_data.extend([x for x in batch if isinstance(x, dict)])
total_val = payload.get("total")
if total_val is not None:
try:
window_total = int(total_val)
except (TypeError, ValueError):
pass
is_last_data = bool(payload.get("isLastData", False))
scroll_id = str(payload.get("scrollId", "") or "")
last_scroll_id = scroll_id
logger.info(
"EHR %s window=%s/%s page=%s batch=%s window_total=%s isLastData=%s",
api_name,
idx,
len(windows),
page,
len(batch),
window_total,
is_last_data,
)
if is_last_data:
break
return {
"startTime": self._to_api_datetime(start_dt),
"stopTime": self._to_api_datetime(stop_dt),
"total": len(all_data),
"pages": total_pages,
"count": len(all_data),
"data": all_data,
"lastScrollId": last_scroll_id,
"windowCount": len(windows),
}
def get_all_employees_with_record_by_time_window( def get_all_employees_with_record_by_time_window(
self, self,
@ -63,84 +190,18 @@ class SyncEhrToOaApi:
滚动查询员工 + 单条任职全量结果 滚动查询员工 + 单条任职全量结果
固定起始时间 固定起始时间
- 2001-01-01T00:00:00 - 2015-01-01T00:00:00
""" """
if capacity <= 0 or capacity > 300: return self._get_all_by_time_window(
raise ValueError("capacity must be in range [1, 300]") api_path="/TenantBaseExternal/api/v5/Employee/GetByTimeWindow",
if max_pages <= 0: api_name="Employee.GetByTimeWindow",
raise ValueError("max_pages must be > 0") stop_time=stop_time,
capacity=capacity,
start_time = "2001-01-01T00:00:00" time_window_query_type=time_window_query_type,
stop_time_s = self._to_api_datetime(stop_time or datetime.now()) with_disabled=with_disabled,
is_with_deleted=is_with_deleted,
all_data: list[dict[str, Any]] = [] max_pages=max_pages,
scroll_id = ""
total = 0
page = 0
while True:
page += 1
if page > max_pages:
raise RuntimeError(f"scroll pages exceed max_pages={max_pages}")
body: dict[str, Any] = {
"startTime": start_time,
"stopTime": stop_time_s,
"timeWindowQueryType": time_window_query_type,
"scrollId": scroll_id,
"capacity": capacity,
"withDisabled": with_disabled,
"isWithDeleted": is_with_deleted,
}
resp = self._client.request(
"POST",
"/TenantBaseExternal/api/v5/Employee/GetByTimeWindow",
json=body,
headers={"Content-Type": "application/json"},
) )
payload = resp.json() if resp.content else {}
code = str(payload.get("code", "") or "")
if code != "200":
message = payload.get("message")
raise RuntimeError(f"EHR GetByTimeWindow failed code={code!r} message={message!r}")
batch = payload.get("data") or []
if not isinstance(batch, list):
raise RuntimeError("EHR GetByTimeWindow invalid response: data is not a list")
all_data.extend([x for x in batch if isinstance(x, dict)])
total_val = payload.get("total")
if total_val is not None:
try:
total = int(total_val)
except (TypeError, ValueError):
total = total
is_last_data = bool(payload.get("isLastData", False))
scroll_id = str(payload.get("scrollId", "") or "")
logger.info(
"EHR GetByTimeWindow page=%s batch=%s total=%s isLastData=%s",
page,
len(batch),
total,
is_last_data,
)
if is_last_data:
break
return {
"startTime": start_time,
"stopTime": stop_time_s,
"total": total,
"pages": page,
"count": len(all_data),
"data": all_data,
"lastScrollId": scroll_id,
}
def get_all_organizations_by_time_window( def get_all_organizations_by_time_window(
self, self,
@ -156,84 +217,18 @@ class SyncEhrToOaApi:
滚动查询组织单元全量结果 滚动查询组织单元全量结果
固定起始时间 固定起始时间
- 2001-01-01T00:00:00 - 2015-01-01T00:00:00
""" """
if capacity <= 0 or capacity > 300: return self._get_all_by_time_window(
raise ValueError("capacity must be in range [1, 300]") api_path="/TenantBaseExternal/api/v5/Organization/GetByTimeWindow",
if max_pages <= 0: api_name="Organization.GetByTimeWindow",
raise ValueError("max_pages must be > 0") stop_time=stop_time,
capacity=capacity,
start_time = "2001-01-01T00:00:00" time_window_query_type=time_window_query_type,
stop_time_s = self._to_api_datetime(stop_time or datetime.now()) with_disabled=with_disabled,
is_with_deleted=is_with_deleted,
all_data: list[dict[str, Any]] = [] max_pages=max_pages,
scroll_id = ""
total = 0
page = 0
while True:
page += 1
if page > max_pages:
raise RuntimeError(f"scroll pages exceed max_pages={max_pages}")
body: dict[str, Any] = {
"startTime": start_time,
"stopTime": stop_time_s,
"timeWindowQueryType": time_window_query_type,
"scrollId": scroll_id,
"capacity": capacity,
"withDisabled": with_disabled,
"isWithDeleted": is_with_deleted,
}
resp = self._client.request(
"POST",
"/TenantBaseExternal/api/v5/Organization/GetByTimeWindow",
json=body,
headers={"Content-Type": "application/json"},
) )
payload = resp.json() if resp.content else {}
code = str(payload.get("code", "") or "")
if code != "200":
message = payload.get("message")
raise RuntimeError(f"EHR Organization.GetByTimeWindow failed code={code!r} message={message!r}")
batch = payload.get("data") or []
if not isinstance(batch, list):
raise RuntimeError("EHR Organization.GetByTimeWindow invalid response: data is not a list")
all_data.extend([x for x in batch if isinstance(x, dict)])
total_val = payload.get("total")
if total_val is not None:
try:
total = int(total_val)
except (TypeError, ValueError):
total = total
is_last_data = bool(payload.get("isLastData", False))
scroll_id = str(payload.get("scrollId", "") or "")
logger.info(
"EHR Organization.GetByTimeWindow page=%s batch=%s total=%s isLastData=%s",
page,
len(batch),
total,
is_last_data,
)
if is_last_data:
break
return {
"startTime": start_time,
"stopTime": stop_time_s,
"total": total,
"pages": page,
"count": len(all_data),
"data": all_data,
"lastScrollId": scroll_id,
}
def get_all_job_posts_by_time_window( def get_all_job_posts_by_time_window(
self, self,
@ -249,81 +244,15 @@ class SyncEhrToOaApi:
滚动查询职务全量结果 滚动查询职务全量结果
固定起始时间 固定起始时间
- 2001-01-01T00:00:00 - 2015-01-01T00:00:00
""" """
if capacity <= 0 or capacity > 300: return self._get_all_by_time_window(
raise ValueError("capacity must be in range [1, 300]") api_path="/TenantBaseExternal/api/v5/JobPost/GetByTimeWindow",
if max_pages <= 0: api_name="JobPost.GetByTimeWindow",
raise ValueError("max_pages must be > 0") stop_time=stop_time,
capacity=capacity,
start_time = "2001-01-01T00:00:00" time_window_query_type=time_window_query_type,
stop_time_s = self._to_api_datetime(stop_time or datetime.now()) with_disabled=with_disabled,
is_with_deleted=is_with_deleted,
all_data: list[dict[str, Any]] = [] max_pages=max_pages,
scroll_id = ""
total = 0
page = 0
while True:
page += 1
if page > max_pages:
raise RuntimeError(f"scroll pages exceed max_pages={max_pages}")
body: dict[str, Any] = {
"startTime": start_time,
"stopTime": stop_time_s,
"timeWindowQueryType": time_window_query_type,
"scrollId": scroll_id,
"capacity": capacity,
"withDisabled": with_disabled,
"isWithDeleted": is_with_deleted,
}
resp = self._client.request(
"POST",
"/TenantBaseExternal/api/v5/JobPost/GetByTimeWindow",
json=body,
headers={"Content-Type": "application/json"},
) )
payload = resp.json() if resp.content else {}
code = str(payload.get("code", "") or "")
if code != "200":
message = payload.get("message")
raise RuntimeError(f"EHR JobPost.GetByTimeWindow failed code={code!r} message={message!r}")
batch = payload.get("data") or []
if not isinstance(batch, list):
raise RuntimeError("EHR JobPost.GetByTimeWindow invalid response: data is not a list")
all_data.extend([x for x in batch if isinstance(x, dict)])
total_val = payload.get("total")
if total_val is not None:
try:
total = int(total_val)
except (TypeError, ValueError):
total = total
is_last_data = bool(payload.get("isLastData", False))
scroll_id = str(payload.get("scrollId", "") or "")
logger.info(
"EHR JobPost.GetByTimeWindow page=%s batch=%s total=%s isLastData=%s",
page,
len(batch),
total,
is_last_data,
)
if is_last_data:
break
return {
"startTime": start_time,
"stopTime": stop_time_s,
"total": total,
"pages": page,
"count": len(all_data),
"data": all_data,
"lastScrollId": scroll_id,
}