from __future__ import annotations import asyncio import json import logging import threading from typing import Awaitable, Callable, Optional import lark_oapi as lark from app.config.settings import get_settings logger = logging.getLogger(__name__) class FeishuWsClient: def __init__( self, handler: Callable[[dict], Awaitable[None]], main_loop: Optional[asyncio.AbstractEventLoop] = None, ) -> None: self.settings = get_settings() self._handler = handler self._main_loop = main_loop self._thread: Optional[threading.Thread] = None def _run(self) -> None: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: from lark_oapi.ws import client as ws_client # type: ignore ws_client.loop = loop except Exception as exc: logger.error("设置飞书 WS 事件循环失败: %s", exc) if not self.settings.feishu_app_id or not self.settings.feishu_app_secret: logger.error("缺少飞书 APP_ID 或 APP_SECRET,无法启动长连接") return if not self.settings.feishu_approval_event_key: logger.error("缺少 FEISHU_APPROVAL_EVENT_KEY,无法订阅审批事件") return def do_customized_event(data: lark.CustomizedEvent) -> None: try: payload = lark.JSON.marshal(data) if isinstance(payload, str): payload = json.loads(payload) logger.info("收到飞书事件: %s", payload) if self._main_loop and self._main_loop.is_running(): asyncio.run_coroutine_threadsafe(self._handler(payload), self._main_loop) logger.info("飞书事件已投递主循环") else: logger.error("主事件循环不可用,无法处理飞书回调") except Exception as exc: logger.error("处理飞书事件失败: %s", exc) event_handler = ( lark.EventDispatcherHandler.builder("", "") .register_p1_customized_event( self.settings.feishu_approval_event_key, do_customized_event ) .build() ) client = lark.ws.Client( self.settings.feishu_app_id, self.settings.feishu_app_secret, event_handler=event_handler, log_level=lark.LogLevel.INFO, ) client.start() def start(self) -> None: if self._thread is None: self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() async def stop(self) -> None: return