approval/app/services/approval_sync_service.py

204 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import logging
import time
from typing import Any, Dict, Optional, Tuple
from app.clients.feishu_client import FeishuClient
from app.config.settings import get_settings
from app.services.huobanyun_service import HuobanyunService
logger = logging.getLogger(__name__)
class ApprovalSyncService:
def __init__(self) -> None:
self.settings = get_settings()
self.feishu_client = FeishuClient()
self.huobanyun_service = HuobanyunService()
self._seen: Dict[Tuple[str, str], float] = {}
def _cleanup_seen(self) -> None:
now = time.time()
expired = [key for key, ts in self._seen.items() if now - ts > 3600]
for key in expired:
self._seen.pop(key, None)
def _is_done(self, status: Optional[str]) -> bool:
if not status:
return False
return status.upper() in {"APPROVED", "REJECTED", "CANCELED", "CANCELLED", "DONE"}
def _extract_project_no_from_detail(self, detail: Dict[str, Any]) -> str:
field_code = self.settings.feishu_project_no_field_code
if not field_code:
return ""
data = detail.get("data", {})
form = data.get("form")
if isinstance(form, str):
try:
import json
form = json.loads(form)
except Exception:
form = None
if isinstance(form, list):
for item in form:
if not isinstance(item, dict):
continue
code = (
item.get("custom_id")
or item.get("id")
or item.get("field_code")
or item.get("code")
)
if code == field_code:
value = item.get("value") or item.get("text") or item.get("name")
if isinstance(value, list) and value:
return str(value[0])
return "" if value is None else str(value)
if isinstance(form, dict):
value = form.get(field_code)
if isinstance(value, list) and value:
return str(value[0])
return "" if value is None else str(value)
return ""
def _extract_payment_type_from_detail(self, detail: Dict[str, Any]) -> str:
data = detail.get("data", {})
form = data.get("form")
if isinstance(form, str):
try:
import json
form = json.loads(form)
except Exception:
form = None
if isinstance(form, list):
for item in form:
if not isinstance(item, dict):
continue
code = (
item.get("custom_id")
or item.get("id")
or item.get("field_code")
or item.get("code")
)
if code == "type":
value = item.get("value") or item.get("text") or item.get("name")
if isinstance(value, list) and value:
return str(value[0])
return "" if value is None else str(value)
if isinstance(form, dict):
value = form.get("type")
if isinstance(value, list) and value:
return str(value[0])
return "" if value is None else str(value)
return ""
async def ensure_approval_subscription(self) -> None:
codes = [self.settings.feishu_approval_code] if self.settings.feishu_approval_code else []
if not codes:
logger.error("未配置 FEISHU_APPROVAL_CODE无法订阅审批定义")
return
for approval_code in codes:
try:
resp = await self.feishu_client.subscribe_approval(approval_code)
code = resp.get("code")
msg = resp.get("msg")
if code in (0, 1390007):
logger.info("审批定义订阅成功或已存在: %s", msg)
else:
logger.error("审批定义订阅失败: code=%s msg=%s", code, msg)
except Exception as exc:
logger.error("订阅审批定义异常: %s", exc)
async def ensure_unsubscribe_once(self) -> None:
if not self.settings.feishu_unsubscribe_once:
return
codes = self.settings.feishu_unsubscribe_codes
if not codes:
logger.error("未配置 FEISHU_UNSUBSCRIBE_CODES无法取消订阅")
return
for approval_code in codes:
try:
resp = await self.feishu_client.unsubscribe_approval(approval_code)
code = resp.get("code")
msg = resp.get("msg")
if code in (0, 1390007):
logger.info("取消订阅成功或不存在: %s", msg)
else:
logger.error("取消订阅失败: code=%s msg=%s", code, msg)
except Exception as exc:
logger.error("取消订阅异常: %s", exc)
async def handle_approval_event(self, event: Dict[str, Any]) -> None:
payload = event.get("event", event)
logger.info("审批事件内容: %s", payload)
approval_code = payload.get("approval_code") or payload.get("approvalCode")
if self.settings.feishu_approval_code and approval_code != self.settings.feishu_approval_code:
logger.info("审批定义不匹配,跳过: %s", approval_code)
return
instance_id = (
payload.get("instance_id")
or payload.get("instanceId")
or payload.get("instance_code")
or payload.get("instanceCode")
or ""
)
status = payload.get("status") or payload.get("instance_status") or payload.get("instanceStatus")
if not instance_id or not status:
logger.info("审批事件未结束或缺少实例ID跳过处理")
return
logger.info("审批事件实例: id=%s status=%s", instance_id, status)
dedupe_key = (instance_id, str(status))
self._cleanup_seen()
if dedupe_key in self._seen:
logger.info("审批事件重复,跳过: %s", dedupe_key)
return
self._seen[dedupe_key] = time.time()
try:
detail = await self.feishu_client.get_approval_instance(instance_id)
logger.info("审批实例详情: %s", detail)
except Exception as exc:
logger.error("拉取审批详情失败: %s", exc)
return
detail_status = (
detail.get("data", {}).get("status")
or detail.get("data", {}).get("instance_status")
or status
or ""
)
if str(detail_status).upper() != "APPROVED":
logger.info("审批实例未通过,跳过回写: %s", detail_status)
return
project_no = self._extract_project_no_from_detail(detail)
if not project_no:
logger.error("未能从审批详情提取项目单号")
return
logger.info("提取项目单号: %s", project_no)
payment_type = self._extract_payment_type_from_detail(detail)
if not payment_type:
logger.error("未能从审批详情提取付款类型")
return
item_id = await self.huobanyun_service.find_item_by_project_no(project_no)
if not item_id:
logger.error("未找到对应伙伴云项目: %s", project_no)
return
logger.info("找到伙伴云项目: %s", item_id)
try:
await self.huobanyun_service.update_linked_flags(item_id, payment_type)
logger.info("审批完成回写成功: %s", item_id)
except Exception as exc:
logger.error("审批完成回写失败: %s", exc)
async def handle_event(self, event: Dict[str, Any]) -> None:
await self.handle_approval_event(event)