From 96b3439c5b4468c3e1b797320b7911da447efe26 Mon Sep 17 00:00:00 2001 From: Marsway Date: Mon, 30 Mar 2026 17:14:20 +0800 Subject: [PATCH] update --- extensions/sync_ehr_leaves_to_oa/api.py | 61 +++++++++++++++++++++++-- extensions/sync_ehr_leaves_to_oa/job.py | 33 +++++++++++-- 2 files changed, 85 insertions(+), 9 deletions(-) diff --git a/extensions/sync_ehr_leaves_to_oa/api.py b/extensions/sync_ehr_leaves_to_oa/api.py index 403fdd0..ec77e8b 100644 --- a/extensions/sync_ehr_leaves_to_oa/api.py +++ b/extensions/sync_ehr_leaves_to_oa/api.py @@ -295,15 +295,56 @@ class SyncEhrLeavesToOaApi: start_date: date, end_date: date, ) -> dict[tuple[str, str], int]: + rows = self.get_oa_rows_by_job_and_date( + table_name=table_name, + schema=schema, + job_no_column=job_no_column, + date_column=date_column, + leave_days_column=None, + name_column=None, + start_date=start_date, + end_date=end_date, + ) + out: dict[tuple[str, str], int] = {} + for k, v in rows.items(): + rid = _to_int_safe((v or {}).get("id")) + if rid > 0: + out[k] = rid + return out + + def get_oa_rows_by_job_and_date( + self, + *, + table_name: str, + schema: str, + job_no_column: str, + date_column: str, + leave_days_column: str | None, + name_column: str | None, + start_date: date, + end_date: date, + ) -> dict[tuple[str, str], dict[str, str]]: t = str(table_name or "").strip() jc = str(job_no_column or "").strip() dc = str(date_column or "").strip() + lc = str(leave_days_column or "").strip() + nc = str(name_column or "").strip() s = str(schema or "").strip() or "dbo" if not t or not jc or not dc: raise ValueError("table_name/job_no_column/date_column are required") + select_parts = [ + "[id] AS row_id", + f"[{jc}] AS job_no", + f"[{dc}] AS leave_date", + ] + if lc: + select_parts.append(f"[{lc}] AS leave_days") + if nc: + select_parts.append(f"[{nc}] AS staff_name") + sql = text( - f"SELECT [id] AS row_id, [{jc}] AS job_no, [{dc}] AS leave_date " + f"SELECT {', '.join(select_parts)} " f"FROM [{s}].[{t}] WITH (NOLOCK) " f"WHERE TRY_CONVERT(date, [{dc}]) >= :start_date " f"AND TRY_CONVERT(date, [{dc}]) <= :end_date " @@ -313,7 +354,7 @@ class SyncEhrLeavesToOaApi: "start_date": start_date.isoformat(), "end_date": end_date.isoformat(), } - out: dict[tuple[str, str], int] = {} + out: dict[tuple[str, str], dict[str, str]] = {} duplicate_keys = 0 with self._sql_engine.connect() as conn: rows = conn.execute(sql, params).fetchall() @@ -328,9 +369,21 @@ class SyncEhrLeavesToOaApi: if not job_no or not leave_date: continue key = (job_no, leave_date) - if key in out and out[key] != row_id: + leave_days = "" + if lc: + leave_days = str(getattr(r, "leave_days", "") or "").strip() + staff_name = "" + if nc: + staff_name = str(getattr(r, "staff_name", "") or "").strip() + if key in out and _to_int_safe((out[key] or {}).get("id")) != row_id: duplicate_keys += 1 - out[key] = row_id + out[key] = { + "id": str(row_id), + "job_no": job_no, + "leave_date": leave_date, + "leave_days": leave_days, + "name": staff_name, + } logger.info( "OA 现有记录索引完成:table=%s month_range=%s~%s indexed=%s duplicate_keys=%s", t, diff --git a/extensions/sync_ehr_leaves_to_oa/job.py b/extensions/sync_ehr_leaves_to_oa/job.py index aad9cfd..33d53f9 100644 --- a/extensions/sync_ehr_leaves_to_oa/job.py +++ b/extensions/sync_ehr_leaves_to_oa/job.py @@ -80,6 +80,10 @@ def _to_int_safe(v: Any) -> int: return 0 +def _normalize_decimal_1(v: Any) -> str: + return _decimal_to_str(_to_decimal(v)) + + class SyncEhrLeavesToOaMonthJob(BaseJob): """ EHR 请假 -> OA 月度同步(按工号+日期汇总): @@ -266,11 +270,13 @@ class SyncEhrLeavesToOaMonthJob(BaseJob): if staff_name and key not in name_map: name_map[key] = staff_name - existing_id_map = ehr.get_oa_row_id_map_by_job_and_date( + existing_row_map = ehr.get_oa_rows_by_job_and_date( table_name=_OA_SQLSERVER_TABLE, schema=_OA_SQLSERVER_SCHEMA, job_no_column=field_job_no, date_column=field_leave_date, + leave_days_column=field_leave_days, + name_column=field_name, start_date=month_start, end_date=month_end, ) @@ -278,21 +284,32 @@ class SyncEhrLeavesToOaMonthJob(BaseJob): data_list: list[dict[str, Any]] = [] to_update = 0 to_insert = 0 + skipped_unchanged = 0 for (job_no, leave_date), leave_days in sorted(agg.items(), key=lambda x: (x[0][0], x[0][1])): leave_date_value = f"{leave_date} 00:00:00" + leave_days_value = _decimal_to_str(leave_days) + name_val = name_map.get((job_no, leave_date), "") + + existing_row = existing_row_map.get((job_no, leave_date)) + if existing_row is not None: + old_days = _normalize_decimal_1(existing_row.get("leave_days")) + old_name = str(existing_row.get("name") or "").strip() + if old_days == leave_days_value and (not field_name or old_name == name_val): + skipped_unchanged += 1 + continue + fields_payload = [ {"name": field_job_no, "value": job_no, "showValue": job_no}, {"name": field_leave_date, "value": leave_date_value, "showValue": leave_date}, - {"name": field_leave_days, "value": _decimal_to_str(leave_days), "showValue": _decimal_to_str(leave_days)}, + {"name": field_leave_days, "value": leave_days_value, "showValue": leave_days_value}, ] if field_name: - name_val = name_map.get((job_no, leave_date), "") fields_payload.append({"name": field_name, "value": name_val, "showValue": name_val}) record: dict[str, Any] = { "fields": fields_payload, } - existing_id = existing_id_map.get((job_no, leave_date)) - if existing_id is not None: + existing_id = _to_int_safe((existing_row or {}).get("id")) + if existing_id > 0: record["id"] = existing_id to_update += 1 else: @@ -351,6 +368,11 @@ class SyncEhrLeavesToOaMonthJob(BaseJob): data_local = rj.get("data") or {} chunk_success = int(data_local.get("successCount", 0) or 0) chunk_failed = int(data_local.get("failCount", 0) or 0) + if chunk_success == 0 and chunk_failed == 0: + fd2 = data_local.get("failedData") or {} + fd_len = len(fd2) if isinstance(fd2, dict) else 0 + chunk_failed = fd_len + chunk_success = max(0, len(chunk) - chunk_failed) success_count += chunk_success failed_count += chunk_failed fd = data_local.get("failedData") or {} @@ -377,6 +399,7 @@ class SyncEhrLeavesToOaMonthJob(BaseJob): "aggregated_records": len(agg), "to_update": to_update, "to_insert": to_insert, + "skipped_unchanged": skipped_unchanged, "success_count": success_count, "failed_count": failed_count, "skipped_no_job_no": skipped_no_job_no,