approval/app/clients/feishu_ws_client.py

54 lines
1.7 KiB
Python

from __future__ import annotations
import asyncio
import json
import logging
from typing import Awaitable, Callable, Optional
from app.config.settings import get_settings
logger = logging.getLogger(__name__)
class FeishuWsClient:
def __init__(self, handler: Callable[[dict], Awaitable[None]]) -> None:
self.settings = get_settings()
self._handler = handler
self._task: Optional[asyncio.Task] = None
self._stopped = asyncio.Event()
async def _run(self) -> None:
try:
import websockets # type: ignore
except Exception as exc:
logger.error("WebSocket 依赖缺失: %s", exc)
return
url = self.settings.feishu_ws_url
if not url:
logger.info("未配置飞书 WebSocket 地址,跳过连接")
return
while not self._stopped.is_set():
try:
async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
logger.info("飞书 WebSocket 已连接")
async for message in ws:
try:
payload = json.loads(message)
await self._handler(payload)
except Exception as exc:
logger.error("处理 WebSocket 消息失败: %s", exc)
except Exception as exc:
logger.error("WebSocket 连接异常: %s", exc)
await asyncio.sleep(3)
def start(self) -> None:
if self._task is None:
self._task = asyncio.create_task(self._run())
async def stop(self) -> None:
self._stopped.set()
if self._task:
await self._task