from __future__ import annotations import asyncio import json import logging from typing import Awaitable, Callable, Optional from app.config.settings import get_settings logger = logging.getLogger(__name__) class FeishuWsClient: def __init__(self, handler: Callable[[dict], Awaitable[None]]) -> None: self.settings = get_settings() self._handler = handler self._task: Optional[asyncio.Task] = None self._stopped = asyncio.Event() async def _run(self) -> None: try: import websockets # type: ignore except Exception as exc: logger.error("WebSocket 依赖缺失: %s", exc) return url = self.settings.feishu_ws_url if not url: logger.info("未配置飞书 WebSocket 地址,跳过连接") return while not self._stopped.is_set(): try: async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws: logger.info("飞书 WebSocket 已连接") async for message in ws: try: payload = json.loads(message) await self._handler(payload) except Exception as exc: logger.error("处理 WebSocket 消息失败: %s", exc) except Exception as exc: logger.error("WebSocket 连接异常: %s", exc) await asyncio.sleep(3) def start(self) -> None: if self._task is None: self._task = asyncio.create_task(self._run()) async def stop(self) -> None: self._stopped.set() if self._task: await self._task