From 90a54a8dd849c28da4819c6c4ee3f8ecb863cd2e Mon Sep 17 00:00:00 2001
From: Marsway
Date: Wed, 4 Mar 2026 11:10:06 +0800
Subject: [PATCH] fixing

---
Reviewer notes. This area, between the "---" separator above and the
diffstat below, is commentary only; the diff itself is unchanged.

Summary of what the diff does:
  * Imports timedelta alongside datetime.
  * Adds _to_datetime(): None becomes datetime.now(); datetime values are
    returned unchanged; an empty or whitespace-only string raises
    ValueError; strings containing "T" or a space go through
    datetime.fromisoformat(); anything else is parsed with "%Y-%m-%d".
  * _to_api_datetime() now accepts None and always formats the result of
    _to_datetime() as "%Y-%m-%dT%H:%M:%S". Before this patch a string
    input was stripped and returned otherwise verbatim.
  * Adds _iter_windows(), which cuts [start_dt, stop_dt] into consecutive
    windows of at most max_days days (default 90). When start_dt equals
    stop_dt it returns the single window (start_dt, stop_dt).
  * Adds _get_all_by_time_window(), one shared scroll loop that runs once
    per window. The three public get_all_*_by_time_window() methods drop
    their duplicated loops and delegate to it, each passing its own
    api_path and api_name.
  * The fixed query start moves from 2001-01-01T00:00:00 to
    2015-01-01T00:00:00, in code and in the three docstrings.

Changes that callers of the public methods can observe:
  * "total" in the returned dict is now len(all_data), the same value as
    "count". The server-reported "total" is only logged, per window.
  * "pages" is the number of requests summed over all windows, while
    max_pages is enforced separately inside each window.
  * "lastScrollId" is the scrollId from the last response of the last
    window.
  * The returned dict gains a "windowCount" key.
  * Error and log text for the Employee endpoint now says
    "Employee.GetByTimeWindow" instead of "GetByTimeWindow".

Points to confirm before merging:
  * _iter_windows() never advances cur when max_days <= 0, so its while
    loop would not terminate. The only call in this patch passes 90.
  * Adjacent windows share their boundary instant (one window's stop is
    the next window's start). Whether the endpoint treats both bounds as
    inclusive cannot be told from this diff; if it does, a record stamped
    exactly on a boundary could be returned twice. TODO confirm.
  * A string stop_time carrying a UTC offset is parsed by fromisoformat()
    into an aware datetime; comparing it with the naive 2015-01-01 start
    in _iter_windows() raises TypeError.
  * Records before 2015-01-01 are no longer requested; presumably this is
    intentional. TODO confirm.

 extensions/sync_ehr_to_oa/api.py | 399 +++++++++++++------------------
 1 file changed, 164 insertions(+), 235 deletions(-)

diff --git a/extensions/sync_ehr_to_oa/api.py b/extensions/sync_ehr_to_oa/api.py
index 08f2c5c..0f5557c 100644
--- a/extensions/sync_ehr_to_oa/api.py
+++ b/extensions/sync_ehr_to_oa/api.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 import logging
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import Any
 
 from app.integrations.ehr import EhrClient
@@ -41,13 +41,140 @@ class SyncEhrToOaApi:
         self._client.close()
 
     @staticmethod
-    def _to_api_datetime(value: datetime | str) -> str:
+    def _to_datetime(value: datetime | str | None) -> datetime:
+        if value is None:
+            return datetime.now()
         if isinstance(value, datetime):
-            return value.strftime("%Y-%m-%dT%H:%M:%S")
+            return value
         s = str(value).strip()
         if not s:
             raise ValueError("datetime string cannot be empty")
-        return s
+        if "T" in s:
+            return datetime.fromisoformat(s)
+        if " " in s:
+            return datetime.fromisoformat(s.replace(" ", "T"))
+        return datetime.strptime(s, "%Y-%m-%d")
+
+    @staticmethod
+    def _to_api_datetime(value: datetime | str | None) -> str:
+        return SyncEhrToOaApi._to_datetime(value).strftime("%Y-%m-%dT%H:%M:%S")
+
+    @staticmethod
+    def _iter_windows(start_dt: datetime, stop_dt: datetime, max_days: int = 90) -> list[tuple[datetime, datetime]]:
+        if stop_dt < start_dt:
+            raise ValueError("stop_time must be greater than or equal to start_time")
+        windows: list[tuple[datetime, datetime]] = []
+        cur = start_dt
+        max_delta = timedelta(days=max_days)
+        while cur < stop_dt:
+            nxt = min(cur + max_delta, stop_dt)
+            windows.append((cur, nxt))
+            cur = nxt
+        if not windows:
+            windows.append((start_dt, stop_dt))
+        return windows
+
+    def _get_all_by_time_window(
+        self,
+        *,
+        api_path: str,
+        api_name: str,
+        stop_time: datetime | str | None,
+        capacity: int,
+        time_window_query_type: int,
+        with_disabled: bool,
+        is_with_deleted: bool,
+        max_pages: int,
+    ) -> dict[str, Any]:
+        if capacity <= 0 or capacity > 300:
+            raise ValueError("capacity must be in range [1, 300]")
+        if max_pages <= 0:
+            raise ValueError("max_pages must be > 0")
+
+        start_dt = datetime(2015, 1, 1, 0, 0, 0)
+        stop_dt = self._to_datetime(stop_time)
+        windows = self._iter_windows(start_dt=start_dt, stop_dt=stop_dt, max_days=90)
+
+        all_data: list[dict[str, Any]] = []
+        total_pages = 0
+        last_scroll_id = ""
+
+        for idx, (w_start, w_stop) in enumerate(windows, start=1):
+            start_time = self._to_api_datetime(w_start)
+            stop_time_s = self._to_api_datetime(w_stop)
+            scroll_id = ""
+            page = 0
+            window_total = 0
+
+            while True:
+                page += 1
+                total_pages += 1
+                if page > max_pages:
+                    raise RuntimeError(f"scroll pages exceed max_pages={max_pages} in window index={idx}")
+
+                body: dict[str, Any] = {
+                    "startTime": start_time,
+                    "stopTime": stop_time_s,
+                    "timeWindowQueryType": time_window_query_type,
+                    "scrollId": scroll_id,
+                    "capacity": capacity,
+                    "withDisabled": with_disabled,
+                    "isWithDeleted": is_with_deleted,
+                }
+
+                resp = self._client.request(
+                    "POST",
+                    api_path,
+                    json=body,
+                    headers={"Content-Type": "application/json"},
+                )
+                payload = resp.json() if resp.content else {}
+
+                code = str(payload.get("code", "") or "")
+                if code != "200":
+                    message = payload.get("message")
+                    raise RuntimeError(f"EHR {api_name} failed code={code!r} message={message!r}")
+
+                batch = payload.get("data") or []
+                if not isinstance(batch, list):
+                    raise RuntimeError(f"EHR {api_name} invalid response: data is not a list")
+                all_data.extend([x for x in batch if isinstance(x, dict)])
+
+                total_val = payload.get("total")
+                if total_val is not None:
+                    try:
+                        window_total = int(total_val)
+                    except (TypeError, ValueError):
+                        pass
+
+                is_last_data = bool(payload.get("isLastData", False))
+                scroll_id = str(payload.get("scrollId", "") or "")
+                last_scroll_id = scroll_id
+
+                logger.info(
+                    "EHR %s window=%s/%s page=%s batch=%s window_total=%s isLastData=%s",
+                    api_name,
+                    idx,
+                    len(windows),
+                    page,
+                    len(batch),
+                    window_total,
+                    is_last_data,
+                )
+
+                if is_last_data:
+                    break
+
+        return {
+            "startTime": self._to_api_datetime(start_dt),
+            "stopTime": self._to_api_datetime(stop_dt),
+            "total": len(all_data),
+            "pages": total_pages,
+            "count": len(all_data),
+            "data": all_data,
+            "lastScrollId": last_scroll_id,
+            "windowCount": len(windows),
+        }
 
     def get_all_employees_with_record_by_time_window(
         self,
@@ -63,84 +190,18 @@ class SyncEhrToOaApi:
         滚动查询“员工 + 单条任职”全量结果。
 
         固定起始时间:
-        - 2001-01-01T00:00:00
+        - 2015-01-01T00:00:00
         """
-        if capacity <= 0 or capacity > 300:
-            raise ValueError("capacity must be in range [1, 300]")
-        if max_pages <= 0:
-            raise ValueError("max_pages must be > 0")
-
-        start_time = "2001-01-01T00:00:00"
-        stop_time_s = self._to_api_datetime(stop_time or datetime.now())
-
-        all_data: list[dict[str, Any]] = []
-        scroll_id = ""
-        total = 0
-        page = 0
-
-        while True:
-            page += 1
-            if page > max_pages:
-                raise RuntimeError(f"scroll pages exceed max_pages={max_pages}")
-
-            body: dict[str, Any] = {
-                "startTime": start_time,
-                "stopTime": stop_time_s,
-                "timeWindowQueryType": time_window_query_type,
-                "scrollId": scroll_id,
-                "capacity": capacity,
-                "withDisabled": with_disabled,
-                "isWithDeleted": is_with_deleted,
-            }
-
-            resp = self._client.request(
-                "POST",
-                "/TenantBaseExternal/api/v5/Employee/GetByTimeWindow",
-                json=body,
-                headers={"Content-Type": "application/json"},
-            )
-            payload = resp.json() if resp.content else {}
-
-            code = str(payload.get("code", "") or "")
-            if code != "200":
-                message = payload.get("message")
-                raise RuntimeError(f"EHR GetByTimeWindow failed code={code!r} message={message!r}")
-
-            batch = payload.get("data") or []
-            if not isinstance(batch, list):
-                raise RuntimeError("EHR GetByTimeWindow invalid response: data is not a list")
-            all_data.extend([x for x in batch if isinstance(x, dict)])
-
-            total_val = payload.get("total")
-            if total_val is not None:
-                try:
-                    total = int(total_val)
-                except (TypeError, ValueError):
-                    total = total
-
-            is_last_data = bool(payload.get("isLastData", False))
-            scroll_id = str(payload.get("scrollId", "") or "")
-
-            logger.info(
-                "EHR GetByTimeWindow page=%s batch=%s total=%s isLastData=%s",
-                page,
-                len(batch),
-                total,
-                is_last_data,
-            )
-
-            if is_last_data:
-                break
-
-        return {
-            "startTime": start_time,
-            "stopTime": stop_time_s,
-            "total": total,
-            "pages": page,
-            "count": len(all_data),
-            "data": all_data,
-            "lastScrollId": scroll_id,
-        }
+        return self._get_all_by_time_window(
+            api_path="/TenantBaseExternal/api/v5/Employee/GetByTimeWindow",
+            api_name="Employee.GetByTimeWindow",
+            stop_time=stop_time,
+            capacity=capacity,
+            time_window_query_type=time_window_query_type,
+            with_disabled=with_disabled,
+            is_with_deleted=is_with_deleted,
+            max_pages=max_pages,
+        )
 
     def get_all_organizations_by_time_window(
         self,
@@ -156,84 +217,18 @@ class SyncEhrToOaApi:
         滚动查询“组织单元”全量结果。
 
         固定起始时间:
-        - 2001-01-01T00:00:00
+        - 2015-01-01T00:00:00
         """
-        if capacity <= 0 or capacity > 300:
-            raise ValueError("capacity must be in range [1, 300]")
-        if max_pages <= 0:
-            raise ValueError("max_pages must be > 0")
-
-        start_time = "2001-01-01T00:00:00"
-        stop_time_s = self._to_api_datetime(stop_time or datetime.now())
-
-        all_data: list[dict[str, Any]] = []
-        scroll_id = ""
-        total = 0
-        page = 0
-
-        while True:
-            page += 1
-            if page > max_pages:
-                raise RuntimeError(f"scroll pages exceed max_pages={max_pages}")
-
-            body: dict[str, Any] = {
-                "startTime": start_time,
-                "stopTime": stop_time_s,
-                "timeWindowQueryType": time_window_query_type,
-                "scrollId": scroll_id,
-                "capacity": capacity,
-                "withDisabled": with_disabled,
-                "isWithDeleted": is_with_deleted,
-            }
-
-            resp = self._client.request(
-                "POST",
-                "/TenantBaseExternal/api/v5/Organization/GetByTimeWindow",
-                json=body,
-                headers={"Content-Type": "application/json"},
-            )
-            payload = resp.json() if resp.content else {}
-
-            code = str(payload.get("code", "") or "")
-            if code != "200":
-                message = payload.get("message")
-                raise RuntimeError(f"EHR Organization.GetByTimeWindow failed code={code!r} message={message!r}")
-
-            batch = payload.get("data") or []
-            if not isinstance(batch, list):
-                raise RuntimeError("EHR Organization.GetByTimeWindow invalid response: data is not a list")
-            all_data.extend([x for x in batch if isinstance(x, dict)])
-
-            total_val = payload.get("total")
-            if total_val is not None:
-                try:
-                    total = int(total_val)
-                except (TypeError, ValueError):
-                    total = total
-
-            is_last_data = bool(payload.get("isLastData", False))
-            scroll_id = str(payload.get("scrollId", "") or "")
-
-            logger.info(
-                "EHR Organization.GetByTimeWindow page=%s batch=%s total=%s isLastData=%s",
-                page,
-                len(batch),
-                total,
-                is_last_data,
-            )
-
-            if is_last_data:
-                break
-
-        return {
-            "startTime": start_time,
-            "stopTime": stop_time_s,
-            "total": total,
-            "pages": page,
-            "count": len(all_data),
-            "data": all_data,
-            "lastScrollId": scroll_id,
-        }
+        return self._get_all_by_time_window(
+            api_path="/TenantBaseExternal/api/v5/Organization/GetByTimeWindow",
+            api_name="Organization.GetByTimeWindow",
+            stop_time=stop_time,
+            capacity=capacity,
+            time_window_query_type=time_window_query_type,
+            with_disabled=with_disabled,
+            is_with_deleted=is_with_deleted,
+            max_pages=max_pages,
+        )
 
     def get_all_job_posts_by_time_window(
         self,
@@ -249,81 +244,15 @@ class SyncEhrToOaApi:
         滚动查询“职务”全量结果。
 
         固定起始时间:
-        - 2001-01-01T00:00:00
+        - 2015-01-01T00:00:00
         """
-        if capacity <= 0 or capacity > 300:
-            raise ValueError("capacity must be in range [1, 300]")
-        if max_pages <= 0:
-            raise ValueError("max_pages must be > 0")
-
-        start_time = "2001-01-01T00:00:00"
-        stop_time_s = self._to_api_datetime(stop_time or datetime.now())
-
-        all_data: list[dict[str, Any]] = []
-        scroll_id = ""
-        total = 0
-        page = 0
-
-        while True:
-            page += 1
-            if page > max_pages:
-                raise RuntimeError(f"scroll pages exceed max_pages={max_pages}")
-
-            body: dict[str, Any] = {
-                "startTime": start_time,
-                "stopTime": stop_time_s,
-                "timeWindowQueryType": time_window_query_type,
-                "scrollId": scroll_id,
-                "capacity": capacity,
-                "withDisabled": with_disabled,
-                "isWithDeleted": is_with_deleted,
-            }
-
-            resp = self._client.request(
-                "POST",
-                "/TenantBaseExternal/api/v5/JobPost/GetByTimeWindow",
-                json=body,
-                headers={"Content-Type": "application/json"},
-            )
-            payload = resp.json() if resp.content else {}
-
-            code = str(payload.get("code", "") or "")
-            if code != "200":
-                message = payload.get("message")
-                raise RuntimeError(f"EHR JobPost.GetByTimeWindow failed code={code!r} message={message!r}")
-
-            batch = payload.get("data") or []
-            if not isinstance(batch, list):
-                raise RuntimeError("EHR JobPost.GetByTimeWindow invalid response: data is not a list")
-            all_data.extend([x for x in batch if isinstance(x, dict)])
-
-            total_val = payload.get("total")
-            if total_val is not None:
-                try:
-                    total = int(total_val)
-                except (TypeError, ValueError):
-                    total = total
-
-            is_last_data = bool(payload.get("isLastData", False))
-            scroll_id = str(payload.get("scrollId", "") or "")
-
-            logger.info(
-                "EHR JobPost.GetByTimeWindow page=%s batch=%s total=%s isLastData=%s",
-                page,
-                len(batch),
-                total,
-                is_last_data,
-            )
-
-            if is_last_data:
-                break
-
-        return {
-            "startTime": start_time,
-            "stopTime": stop_time_s,
-            "total": total,
-            "pages": page,
-            "count": len(all_data),
-            "data": all_data,
-            "lastScrollId": scroll_id,
-        }
+        return self._get_all_by_time_window(
+            api_path="/TenantBaseExternal/api/v5/JobPost/GetByTimeWindow",
+            api_name="JobPost.GetByTimeWindow",
+            stop_time=stop_time,
+            capacity=capacity,
+            time_window_query_type=time_window_query_type,
+            with_disabled=with_disabled,
+            is_with_deleted=is_with_deleted,
+            max_pages=max_pages,
+        )