from __future__ import annotations import logging import time from typing import Any, Dict, Optional, Tuple from app.clients.feishu_client import FeishuClient from app.clients.huobanyun_client import HuobanyunClient logger = logging.getLogger(__name__) class ApprovalSyncService: def __init__(self) -> None: self.feishu_client = FeishuClient() self.huobanyun_client = HuobanyunClient() self._seen: Dict[Tuple[str, str], float] = {} def _cleanup_seen(self) -> None: now = time.time() expired = [key for key, ts in self._seen.items() if now - ts > 3600] for key in expired: self._seen.pop(key, None) def _is_done(self, status: Optional[str]) -> bool: if not status: return False return status.upper() in {"APPROVED", "REJECTED", "CANCELED", "CANCELLED", "DONE"} async def handle_event(self, event: Dict[str, Any]) -> None: instance_id = event.get("instance_id") or event.get("instanceId") or "" status = event.get("status") or event.get("instance_status") or "" if not instance_id or not self._is_done(status): logger.info("审批事件未结束或缺少实例ID,跳过处理") return dedupe_key = (instance_id, status) self._cleanup_seen() if dedupe_key in self._seen: logger.info("审批事件重复,跳过: %s", dedupe_key) return self._seen[dedupe_key] = time.time() detail = {} try: detail = await self.feishu_client.get_approval_instance(instance_id) except Exception as exc: logger.error("拉取审批详情失败: %s", exc) record_id = event.get("record_id") or instance_id fields = event.get("fields") or detail.get("data", {}).get("form", {}) payload = {"record_id": record_id, "fields": fields} try: await self.huobanyun_client.writeback(payload) logger.info("审批写回伙伴云成功: %s", record_id) except Exception as exc: logger.error("审批写回伙伴云失败: %s", exc)