diff --git a/.env b/.env index f0df2d4..59d8408 100644 --- a/.env +++ b/.env @@ -1 +1,10 @@ HUOBANYUN_API_KEY=emdYCszTIUrczBf2wOPGQ553J3OO9NCKKnLGJEK9 +FEISHU_APPROVAL_CODES=ECD8CE34-AA80-4A4F-B4C8-8510A7126490,BB944139-432F-4AC2-AD27-81C2F738E7C3,D7252659-47B6-4312-AC16-ECDE87FDB553,93F09E2D-B418-458D-A92D-10B56B53F45E +FEISHU_APPROVAL_CODE_PUBLIC=ECD8CE34-AA80-4A4F-B4C8-8510A7126490,BB944139-432F-4AC2-AD27-81C2F738E7C3 +FEISHU_APPROVAL_CODE_PRIVATE=D7252659-47B6-4312-AC16-ECDE87FDB553,93F09E2D-B418-458D-A92D-10B56B53F45E +FEISHU_APPROVAL_EVENT_KEY=approval_instance +FEISHU_PROJECT_NO_FIELD_CODE=proj_id +HUOBANYUN_ORDER_STATUS_DONE_ID=3 +HUOBANYUN_ORDER_STATUS_DONE_NAME=已完结 +FEISHU_APP_ID=cli_a90b035fd4799cb5 +FEISHU_APP_SECRET=O729hnbQARM2DHncWUd51eFHF6TDZAc3 \ No newline at end of file diff --git a/app/clients/feishu_client.py b/app/clients/feishu_client.py index 193e8ff..33a48be 100644 --- a/app/clients/feishu_client.py +++ b/app/clients/feishu_client.py @@ -44,3 +44,23 @@ class FeishuClient: resp = await client.get(url, headers=headers) resp.raise_for_status() return resp.json() + + async def subscribe_approval(self, approval_code: str) -> Dict[str, Any]: + token = await self._get_tenant_token() + url = ( + "https://open.feishu.cn/open-apis/approval/v4/approvals/" + f"{approval_code}/subscribe" + ) + headers = {"Authorization": f"Bearer {token}"} + async with httpx.AsyncClient(timeout=self.settings.request_timeout) as client: + resp = await client.post(url, headers=headers) + data = {} + try: + data = resp.json() + except Exception: + data = {} + code = data.get("code") + if resp.status_code != 200 and code != 1390007: + logger.error("订阅审批定义失败: status=%s body=%s", resp.status_code, resp.text) + resp.raise_for_status() + return data diff --git a/app/clients/feishu_ws_client.py b/app/clients/feishu_ws_client.py index 461aa60..df58e84 100644 --- a/app/clients/feishu_ws_client.py +++ b/app/clients/feishu_ws_client.py @@ -3,51 +3,75 @@ from __future__ import annotations import asyncio import json import logging +import threading from typing import Awaitable, Callable, Optional +import lark_oapi as lark + from app.config.settings import get_settings logger = logging.getLogger(__name__) class FeishuWsClient: - def __init__(self, handler: Callable[[dict], Awaitable[None]]) -> None: + def __init__( + self, + handler: Callable[[dict], Awaitable[None]], + main_loop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: self.settings = get_settings() self._handler = handler - self._task: Optional[asyncio.Task] = None - self._stopped = asyncio.Event() + self._main_loop = main_loop + self._thread: Optional[threading.Thread] = None - async def _run(self) -> None: + def _run(self) -> None: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) try: - import websockets # type: ignore + from lark_oapi.ws import client as ws_client # type: ignore + ws_client.loop = loop except Exception as exc: - logger.error("WebSocket 依赖缺失: %s", exc) + logger.error("设置飞书 WS 事件循环失败: %s", exc) + if not self.settings.feishu_app_id or not self.settings.feishu_app_secret: + logger.error("缺少飞书 APP_ID 或 APP_SECRET,无法启动长连接") + return + if not self.settings.feishu_approval_event_key: + logger.error("缺少 FEISHU_APPROVAL_EVENT_KEY,无法订阅审批事件") return - url = self.settings.feishu_ws_url - if not url: - logger.info("未配置飞书 WebSocket 地址,跳过连接") - return - - while not self._stopped.is_set(): + def do_customized_event(data: lark.CustomizedEvent) -> None: try: - async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws: - logger.info("飞书 WebSocket 已连接") - async for message in ws: - try: - payload = json.loads(message) - await self._handler(payload) - except Exception as exc: - logger.error("处理 WebSocket 消息失败: %s", exc) + payload = lark.JSON.marshal(data) + if isinstance(payload, str): + payload = json.loads(payload) + logger.info("收到飞书事件: %s", payload) + if self._main_loop and self._main_loop.is_running(): + asyncio.run_coroutine_threadsafe(self._handler(payload), self._main_loop) + logger.info("飞书事件已投递主循环") + else: + logger.error("主事件循环不可用,无法处理飞书回调") except Exception as exc: - logger.error("WebSocket 连接异常: %s", exc) - await asyncio.sleep(3) + logger.error("处理飞书事件失败: %s", exc) + + event_handler = ( + lark.EventDispatcherHandler.builder("", "") + .register_p1_customized_event( + self.settings.feishu_approval_event_key, do_customized_event + ) + .build() + ) + client = lark.ws.Client( + self.settings.feishu_app_id, + self.settings.feishu_app_secret, + event_handler=event_handler, + log_level=lark.LogLevel.INFO, + ) + client.start() def start(self) -> None: - if self._task is None: - self._task = asyncio.create_task(self._run()) + if self._thread is None: + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() async def stop(self) -> None: - self._stopped.set() - if self._task: - await self._task + return diff --git a/app/clients/huobanyun_client.py b/app/clients/huobanyun_client.py index ef9e76d..bca5a2a 100644 --- a/app/clients/huobanyun_client.py +++ b/app/clients/huobanyun_client.py @@ -60,3 +60,7 @@ class HuobanyunClient: async def list_items(self, payload: Dict[str, Any]) -> Dict[str, Any]: url = f"{self._base_url()}/openapi/v1/item/list" return await self._request("POST", url, payload) + + async def update_item(self, item_id: str, payload: Dict[str, Any]) -> Dict[str, Any]: + url = f"{self._base_url()}/openapi/v1/item/{item_id}" + return await self._request("PUT", url, payload) diff --git a/app/config/settings.py b/app/config/settings.py index f2c0e36..0eb2a4e 100644 --- a/app/config/settings.py +++ b/app/config/settings.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +from typing import List from dotenv import load_dotenv from dataclasses import dataclass @@ -14,12 +15,20 @@ class Settings: feishu_verify_token: str feishu_encrypt_key: str feishu_ws_url: str + feishu_approval_code: str + feishu_approval_codes: List[str] + feishu_approval_code_public: List[str] + feishu_approval_code_private: List[str] + feishu_approval_event_key: str + feishu_project_no_field_code: str huobanyun_app_id: str huobanyun_app_secret: str huobanyun_token: str huobanyun_api_key: str huobanyun_base_url: str + huobanyun_order_status_done_id: str + huobanyun_order_status_done_name: str log_level: str log_rotation: str @@ -35,6 +44,11 @@ def _env(name: str, default: str = "") -> str: return os.getenv(name, default).strip() +def _env_list(name: str) -> List[str]: + raw = os.getenv(name, "") + return [item.strip() for item in raw.split(",") if item.strip()] + + @lru_cache(maxsize=1) def get_settings() -> Settings: load_dotenv() @@ -44,11 +58,19 @@ def get_settings() -> Settings: feishu_verify_token=_env("FEISHU_VERIFY_TOKEN"), feishu_encrypt_key=_env("FEISHU_ENCRYPT_KEY"), feishu_ws_url=_env("FEISHU_WS_URL"), + feishu_approval_code=_env("FEISHU_APPROVAL_CODE"), + feishu_approval_codes=_env_list("FEISHU_APPROVAL_CODES"), + feishu_approval_code_public=_env_list("FEISHU_APPROVAL_CODE_PUBLIC"), + feishu_approval_code_private=_env_list("FEISHU_APPROVAL_CODE_PRIVATE"), + feishu_approval_event_key=_env("FEISHU_APPROVAL_EVENT_KEY"), + feishu_project_no_field_code=_env("FEISHU_PROJECT_NO_FIELD_CODE"), huobanyun_app_id=_env("HUOBANYUN_APP_ID"), huobanyun_app_secret=_env("HUOBANYUN_APP_SECRET"), huobanyun_token=_env("HUOBANYUN_TOKEN"), huobanyun_api_key=_env("HUOBANYUN_API_KEY"), huobanyun_base_url=_env("HUOBANYUN_BASE_URL"), + huobanyun_order_status_done_id=_env("HUOBANYUN_ORDER_STATUS_DONE_ID"), + huobanyun_order_status_done_name=_env("HUOBANYUN_ORDER_STATUS_DONE_NAME", "已完成"), log_level=_env("LOG_LEVEL", "INFO"), log_rotation=_env("LOG_ROTATION", "size"), log_max_bytes=int(_env("LOG_MAX_BYTES", "10485760")), diff --git a/app/main.py b/app/main.py index b3aa722..da0d91c 100644 --- a/app/main.py +++ b/app/main.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import logging import os import time @@ -30,12 +31,18 @@ async def on_startup() -> None: setup_logging(LOG_DIR) approval_service = ApprovalSyncService() + await approval_service.ensure_approval_subscription() async def handler(payload: dict) -> None: - event_payload = payload.get("event", payload) - await approval_service.handle_event(event_payload) + logger.info("开始处理飞书事件") + try: + event_payload = payload.get("event", payload) + await approval_service.handle_event(event_payload) + logger.info("完成处理飞书事件") + except Exception as exc: + logger.exception("处理飞书事件异常: %s", exc) - ws_client = FeishuWsClient(handler) + ws_client = FeishuWsClient(handler, asyncio.get_running_loop()) ws_client.start() app.state.ws_client = ws_client diff --git a/app/schemas/feishu_external.py b/app/schemas/feishu_external.py index a01eaf7..ac74a57 100644 --- a/app/schemas/feishu_external.py +++ b/app/schemas/feishu_external.py @@ -18,6 +18,7 @@ class FeishuExternalQueryRequest(BaseModel): user_id: Optional[str] = None employee_id: Optional[str] = None token: Optional[str] = None + approval_code: Optional[str] = None linkage_params: Dict[str, Any] = {} page_token: Optional[str] = None query: Optional[str] = None diff --git a/app/services/approval_sync_service.py b/app/services/approval_sync_service.py index 12dc282..d9e1e2f 100644 --- a/app/services/approval_sync_service.py +++ b/app/services/approval_sync_service.py @@ -5,15 +5,17 @@ import time from typing import Any, Dict, Optional, Tuple from app.clients.feishu_client import FeishuClient -from app.clients.huobanyun_client import HuobanyunClient +from app.config.settings import get_settings +from app.services.huobanyun_service import HuobanyunService logger = logging.getLogger(__name__) class ApprovalSyncService: def __init__(self) -> None: + self.settings = get_settings() self.feishu_client = FeishuClient() - self.huobanyun_client = HuobanyunClient() + self.huobanyun_service = HuobanyunService() self._seen: Dict[Tuple[str, str], float] = {} def _cleanup_seen(self) -> None: @@ -27,32 +29,127 @@ class ApprovalSyncService: return False return status.upper() in {"APPROVED", "REJECTED", "CANCELED", "CANCELLED", "DONE"} - async def handle_event(self, event: Dict[str, Any]) -> None: - instance_id = event.get("instance_id") or event.get("instanceId") or "" - status = event.get("status") or event.get("instance_status") or "" - if not instance_id or not self._is_done(status): + def _extract_project_no_from_detail(self, detail: Dict[str, Any]) -> str: + field_code = self.settings.feishu_project_no_field_code + if not field_code: + return "" + data = detail.get("data", {}) + form = data.get("form") + if isinstance(form, str): + try: + import json + form = json.loads(form) + except Exception: + form = None + if isinstance(form, list): + for item in form: + if not isinstance(item, dict): + continue + code = ( + item.get("custom_id") + or item.get("id") + or item.get("field_code") + or item.get("code") + ) + if code == field_code: + value = item.get("value") or item.get("text") or item.get("name") + if isinstance(value, list) and value: + return str(value[0]) + return "" if value is None else str(value) + if isinstance(form, dict): + value = form.get(field_code) + if isinstance(value, list) and value: + return str(value[0]) + return "" if value is None else str(value) + return "" + + async def ensure_approval_subscription(self) -> None: + codes = self.settings.feishu_approval_codes or ( + [self.settings.feishu_approval_code] if self.settings.feishu_approval_code else [] + ) + if not codes: + logger.error("未配置 FEISHU_APPROVAL_CODE,无法订阅审批定义") + return + for approval_code in codes: + try: + resp = await self.feishu_client.subscribe_approval(approval_code) + code = resp.get("code") + msg = resp.get("msg") + if code in (0, 1390007): + logger.info("审批定义订阅成功或已存在: %s", msg) + else: + logger.error("审批定义订阅失败: code=%s msg=%s", code, msg) + except Exception as exc: + logger.error("订阅审批定义异常: %s", exc) + + async def handle_approval_event(self, event: Dict[str, Any]) -> None: + payload = event.get("event", event) + logger.info("审批事件内容: %s", payload) + approval_code = payload.get("approval_code") or payload.get("approvalCode") + codes = self.settings.feishu_approval_codes or ( + [self.settings.feishu_approval_code] if self.settings.feishu_approval_code else [] + ) + if codes and approval_code not in codes: + logger.info("审批定义不匹配,跳过: %s", approval_code) + return + + instance_id = ( + payload.get("instance_id") + or payload.get("instanceId") + or payload.get("instance_code") + or payload.get("instanceCode") + or "" + ) + status = payload.get("status") or payload.get("instance_status") or payload.get("instanceStatus") + if not instance_id or not status: logger.info("审批事件未结束或缺少实例ID,跳过处理") return - dedupe_key = (instance_id, status) + logger.info("审批事件实例: id=%s status=%s", instance_id, status) + + dedupe_key = (instance_id, str(status)) self._cleanup_seen() if dedupe_key in self._seen: logger.info("审批事件重复,跳过: %s", dedupe_key) return self._seen[dedupe_key] = time.time() - detail = {} try: detail = await self.feishu_client.get_approval_instance(instance_id) + logger.info("审批实例详情: %s", detail) except Exception as exc: logger.error("拉取审批详情失败: %s", exc) + return - record_id = event.get("record_id") or instance_id - fields = event.get("fields") or detail.get("data", {}).get("form", {}) - payload = {"record_id": record_id, "fields": fields} + detail_status = ( + detail.get("data", {}).get("status") + or detail.get("data", {}).get("instance_status") + or status + or "" + ) + if str(detail_status).upper() != "APPROVED": + logger.info("审批实例未通过,跳过回写: %s", detail_status) + return + + project_no = self._extract_project_no_from_detail(detail) + if not project_no: + logger.error("未能从审批详情提取项目单号") + return + logger.info("提取项目单号: %s", project_no) + + item_id = await self.huobanyun_service.find_item_by_project_no(project_no) + if not item_id: + logger.error("未找到对应伙伴云项目: %s", project_no) + return + logger.info("找到伙伴云项目: %s", item_id) try: - await self.huobanyun_client.writeback(payload) - logger.info("审批写回伙伴云成功: %s", record_id) + await self.huobanyun_service.update_order_status(item_id) + if approval_code: + await self.huobanyun_service.update_linked_flags(item_id, approval_code) + logger.info("审批完成回写成功: %s", item_id) except Exception as exc: - logger.error("审批写回伙伴云失败: %s", exc) + logger.error("审批完成回写失败: %s", exc) + + async def handle_event(self, event: Dict[str, Any]) -> None: + await self.handle_approval_event(event) diff --git a/app/services/huobanyun_service.py b/app/services/huobanyun_service.py index a941468..fa05054 100644 --- a/app/services/huobanyun_service.py +++ b/app/services/huobanyun_service.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging from typing import Any, Dict, List, Optional +from app.config.settings import get_settings from app.clients.huobanyun_client import HuobanyunClient from app.schemas.feishu_external import ( FeishuExternalItem, @@ -16,6 +17,7 @@ logger = logging.getLogger(__name__) class HuobanyunService: def __init__(self) -> None: + self.settings = get_settings() self.client = HuobanyunClient() def _extract_value(self, value: Any) -> Any: @@ -50,6 +52,50 @@ class HuobanyunService: } return mapping.get(key, key) + async def find_item_by_project_no(self, project_no: str) -> str: + table_id = "2100000015544940" + project_no_field_id = self._resolve_field_key("项目单号") + payload = { + "table_id": table_id, + "limit": 1, + "offset": 0, + "filter": { + "and": [ + {"field": project_no_field_id, "query": {"eq": str(project_no)}} + ] + }, + } + data = await self.client.list_items(payload) + data_block = data.get("data", {}) if isinstance(data, dict) else {} + items = data_block.get("items", []) if isinstance(data_block, dict) else [] + if not items: + return "" + return str(items[0].get("item_id", "")) + + async def update_order_status(self, item_id: str) -> None: + status_field_id = self._resolve_field_key("订单状态") + status_id = self.settings.huobanyun_order_status_done_id + status_name = self.settings.huobanyun_order_status_done_name or "已完成" + status_value = status_id or status_name + if not status_value: + logger.error("未配置订单状态已完成选项值,跳过更新") + return + payload = {"fields": {status_field_id: [str(status_value)]}} + await self.client.update_item(item_id, payload) + + async def update_linked_flags(self, item_id: str, approval_code: str) -> None: + public_codes = set(self.settings.feishu_approval_code_public) + private_codes = set(self.settings.feishu_approval_code_private) + fields: Dict[str, Any] = {} + if approval_code in public_codes: + fields[self._resolve_field_key("是否已关联对公付款审批")] = True + if approval_code in private_codes: + fields[self._resolve_field_key("是否已关联对私付款审批")] = True + if not fields: + return + payload = {"fields": fields} + await self.client.update_item(item_id, payload) + def _resolve_field_label(self, key: str) -> str: reverse = { "2200000149785345": "项目单号", @@ -189,7 +235,24 @@ class HuobanyunService: else: offset = 0 - key = req.key or raw.get("key") or raw.get("field") or req.token or "" + approval_code = ( + req.approval_code + or raw.get("approval_code") + or raw.get("approvalCode") + or (req.linkage_params or {}).get("approval_code") + or (req.linkage_params or {}).get("approvalCode") + or "" + ) + token_key = req.token or "" + key = req.key or raw.get("key") or raw.get("field") or token_key or "" + approval_hint = "" + if isinstance(token_key, str): + if token_key.startswith("对公"): + approval_hint = "public" + key = token_key.replace("对公", "", 1) + elif token_key.startswith("对私"): + approval_hint = "private" + key = token_key.replace("对私", "", 1) key = self._resolve_field_key(str(key).strip()) if key else "" query_value = req.query or req.keyword or "" linkage_params = req.linkage_params or raw.get("linkage_params") or {} @@ -203,6 +266,21 @@ class HuobanyunService: {"field": project_no_field_id, "query": {"em": False}}, {"field": order_status_field, "query": {"ne": ["已完成"]}}, ] + if approval_code or approval_hint: + if approval_hint == "public" or approval_code in set(self.settings.feishu_approval_code_public): + base_filters.append( + { + "field": self._resolve_field_key("是否已关联对公付款审批"), + "query": {"eq": False}, + } + ) + if approval_hint == "private" or approval_code in set(self.settings.feishu_approval_code_private): + base_filters.append( + { + "field": self._resolve_field_key("是否已关联对私付款审批"), + "query": {"eq": False}, + } + ) if linkage_project_no: base_filters.append( { @@ -264,6 +342,11 @@ class HuobanyunService: project_key = key or self._resolve_field_key("项目单号") value = self._extract_value(fields.get(project_key, "")) value_str = "" if value is None else str(value) + if project_key == self._resolve_field_key("项目单号"): + if not value_str: + continue + if value_str.startswith("2300") and len(value_str) >= 10: + continue i18n_key = f"@i18n@{item_id}" if item_id else f"@i18n@{value_str}" options.append({"id": str(item_id), "value": i18n_key, "isDefault": False}) texts[i18n_key] = value_str diff --git a/requirements.txt b/requirements.txt index b3c0a1d..f604788 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ uvicorn httpx websockets python-dotenv +lark-oapi +python-socks