This commit is contained in:
Marsway 2026-02-03 16:19:19 +08:00
commit 52468b4484
29 changed files with 991 additions and 0 deletions

1
.env Normal file
View File

@ -0,0 +1 @@
HUOBANYUN_API_KEY=emdYCszTIUrczBf2wOPGQ553J3OO9NCKKnLGJEK9

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
*.pyc
*.log
__pycache__/
logs/

1
app/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Application package."""

1
app/api/__init__.py Normal file
View File

@ -0,0 +1 @@
"""API routes."""

30
app/api/feishu_events.py Normal file
View File

@ -0,0 +1,30 @@
from __future__ import annotations
import logging
from fastapi import APIRouter, HTTPException, Request
from app.schemas.feishu_events import FeishuEventRequest
from app.services.approval_sync_service import ApprovalSyncService
from app.services.feishu_service import FeishuService
router = APIRouter()
feishu_service = FeishuService()
approval_service = ApprovalSyncService()
logger = logging.getLogger(__name__)
@router.post("/feishu/events")
async def feishu_events(request: Request):
body = await request.json()
event_req = FeishuEventRequest(**body)
if event_req.challenge:
return {"challenge": event_req.challenge}
if not feishu_service.verify_event(body):
raise HTTPException(status_code=403, detail="invalid token")
event_payload = body.get("event", body)
try:
await approval_service.handle_event(event_payload)
except Exception as exc:
logger.error("审批事件处理失败: %s", exc)
return {"ok": True}

View File

@ -0,0 +1,20 @@
from __future__ import annotations
from fastapi import APIRouter, Header, HTTPException
from app.schemas.feishu_external import FeishuExternalQueryRequest
from app.services.feishu_service import FeishuService
from app.services.huobanyun_service import HuobanyunService
router = APIRouter()
feishu_service = FeishuService()
huobanyun_service = HuobanyunService()
@router.post("/feishu/approval/external-data/query")
async def feishu_external_query(
req: FeishuExternalQueryRequest, x_feishu_token: str | None = Header(default=None)
):
if not feishu_service.verify_token(x_feishu_token):
raise HTTPException(status_code=403, detail="invalid token")
return await huobanyun_service.query(req)

35
app/api/logs.py Normal file
View File

@ -0,0 +1,35 @@
from __future__ import annotations
from fastapi import APIRouter, Header, HTTPException, Query
from app.config.settings import get_settings
from app.services.logs_service import LogsService
router = APIRouter()
settings = get_settings()
logs_service = LogsService(log_dir="logs")
@router.get("/logs/query")
async def query_logs(
keyword: str = Query(default=""),
start: str | None = Query(default=None),
end: str | None = Query(default=None),
page: int = Query(default=1, ge=1),
size: int = Query(default=50, ge=1, le=500),
x_log_token: str | None = Header(default=None),
token: str | None = Query(default=None),
):
expected = settings.log_query_token
provided = x_log_token or token
if expected and expected != provided:
raise HTTPException(status_code=403, detail="invalid log token")
total, lines = logs_service.query(keyword=keyword, start=start, end=end, page=page, size=size)
has_more = total > page * size
return {
"total": total,
"page": page,
"size": size,
"has_more": has_more,
"lines": lines,
}

22
app/api/projects.py Normal file
View File

@ -0,0 +1,22 @@
from __future__ import annotations
from fastapi import APIRouter, Query
from app.services.huobanyun_service import HuobanyunService
router = APIRouter()
huobanyun_service = HuobanyunService()
@router.get("/projects")
async def get_projects_list(
project_name: str | None = Query(default=None),
project_no: str | None = Query(default=None),
page: int = Query(default=1, ge=1),
):
return await huobanyun_service.get_projects_list(
project_name=project_name,
project_no=project_no,
page=page,
size=50,
)

1
app/clients/__init__.py Normal file
View File

@ -0,0 +1 @@
"""External clients."""

View File

@ -0,0 +1,46 @@
from __future__ import annotations
import logging
import time
from typing import Any, Dict
import httpx
from app.config.settings import get_settings
logger = logging.getLogger(__name__)
_token_cache: Dict[str, Any] = {"token": "", "expires_at": 0.0}
class FeishuClient:
def __init__(self) -> None:
self.settings = get_settings()
async def _get_tenant_token(self) -> str:
if _token_cache["token"] and time.time() < _token_cache["expires_at"]:
return _token_cache["token"]
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
payload = {
"app_id": self.settings.feishu_app_id,
"app_secret": self.settings.feishu_app_secret,
}
async with httpx.AsyncClient(timeout=self.settings.request_timeout) as client:
resp = await client.post(url, json=payload)
resp.raise_for_status()
data = resp.json()
token = data.get("tenant_access_token", "")
expire = int(data.get("expire", 0))
_token_cache["token"] = token
_token_cache["expires_at"] = time.time() + max(expire - 60, 0)
return token
async def get_approval_instance(self, instance_id: str) -> Dict[str, Any]:
token = await self._get_tenant_token()
url = f"https://open.feishu.cn/open-apis/approval/v4/instances/{instance_id}"
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient(timeout=self.settings.request_timeout) as client:
resp = await client.get(url, headers=headers)
resp.raise_for_status()
return resp.json()

View File

@ -0,0 +1,53 @@
from __future__ import annotations
import asyncio
import json
import logging
from typing import Awaitable, Callable, Optional
from app.config.settings import get_settings
logger = logging.getLogger(__name__)
class FeishuWsClient:
def __init__(self, handler: Callable[[dict], Awaitable[None]]) -> None:
self.settings = get_settings()
self._handler = handler
self._task: Optional[asyncio.Task] = None
self._stopped = asyncio.Event()
async def _run(self) -> None:
try:
import websockets # type: ignore
except Exception as exc:
logger.error("WebSocket 依赖缺失: %s", exc)
return
url = self.settings.feishu_ws_url
if not url:
logger.info("未配置飞书 WebSocket 地址,跳过连接")
return
while not self._stopped.is_set():
try:
async with websockets.connect(url, ping_interval=20, ping_timeout=20) as ws:
logger.info("飞书 WebSocket 已连接")
async for message in ws:
try:
payload = json.loads(message)
await self._handler(payload)
except Exception as exc:
logger.error("处理 WebSocket 消息失败: %s", exc)
except Exception as exc:
logger.error("WebSocket 连接异常: %s", exc)
await asyncio.sleep(3)
def start(self) -> None:
if self._task is None:
self._task = asyncio.create_task(self._run())
async def stop(self) -> None:
self._stopped.set()
if self._task:
await self._task

View File

@ -0,0 +1,62 @@
from __future__ import annotations
import asyncio
import logging
from typing import Any, Dict
import httpx
from app.config.settings import get_settings
logger = logging.getLogger(__name__)
class HuobanyunClient:
def __init__(self) -> None:
self.settings = get_settings()
def _headers(self) -> Dict[str, str]:
headers = {"Content-Type": "application/json"}
if self.settings.huobanyun_api_key:
headers["Open-Authorization"] = f"Bearer {self.settings.huobanyun_api_key}"
if self.settings.huobanyun_token:
headers["Authorization"] = f"Bearer {self.settings.huobanyun_token}"
if self.settings.huobanyun_app_id:
headers["X-App-Id"] = self.settings.huobanyun_app_id
if self.settings.huobanyun_app_secret:
headers["X-App-Secret"] = self.settings.huobanyun_app_secret
return headers
def _base_url(self) -> str:
base = self.settings.huobanyun_base_url.strip()
if base:
return base.rstrip("/")
return "https://api.huoban.com"
async def _request(self, method: str, url: str, json: Dict[str, Any]) -> Dict[str, Any]:
timeout = self.settings.request_timeout
retries = self.settings.retry_count
for attempt in range(retries + 1):
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.request(method, url, json=json, headers=self._headers())
resp.raise_for_status()
return resp.json()
except Exception as exc:
logger.error("伙伴云请求失败: %s", exc)
if attempt >= retries:
raise
await asyncio.sleep(1 + attempt)
return {}
async def query(self, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self._base_url()}/query"
return await self._request("POST", url, payload)
async def writeback(self, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self._base_url()}/writeback"
return await self._request("POST", url, payload)
async def list_items(self, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self._base_url()}/openapi/v1/item/list"
return await self._request("POST", url, payload)

1
app/config/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Configuration package."""

72
app/config/logging.py Normal file
View File

@ -0,0 +1,72 @@
from __future__ import annotations
import logging
import os
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from typing import Tuple
from app.config.settings import get_settings
def _build_file_handler(
log_path: str, rotation: str, max_bytes: int, backup_count: int
) -> logging.Handler:
if rotation.lower() == "time":
handler: logging.Handler = TimedRotatingFileHandler(
log_path, when="D", backupCount=backup_count, encoding="utf-8"
)
else:
handler = RotatingFileHandler(
log_path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
)
return handler
def _build_formatters() -> Tuple[logging.Formatter, logging.Formatter]:
base_format = "%(asctime)s | %(levelname)s | %(name)s | %(message)s"
access_format = (
"%(asctime)s | %(levelname)s | access | %(message)s"
)
return (
logging.Formatter(base_format, datefmt="%Y-%m-%d %H:%M:%S"),
logging.Formatter(access_format, datefmt="%Y-%m-%d %H:%M:%S"),
)
def setup_logging(log_dir: str) -> None:
settings = get_settings()
os.makedirs(log_dir, exist_ok=True)
app_log = os.path.join(log_dir, "app.log")
error_log = os.path.join(log_dir, "error.log")
access_log = os.path.join(log_dir, "access.log")
formatter, access_formatter = _build_formatters()
app_handler = _build_file_handler(
app_log, settings.log_rotation, settings.log_max_bytes, settings.log_backup_count
)
app_handler.setFormatter(formatter)
error_handler = _build_file_handler(
error_log, settings.log_rotation, settings.log_max_bytes, settings.log_backup_count
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
access_handler = _build_file_handler(
access_log, settings.log_rotation, settings.log_max_bytes, settings.log_backup_count
)
access_handler.setFormatter(access_formatter)
root = logging.getLogger()
root.setLevel(settings.log_level.upper() or "INFO")
root.handlers = []
root.addHandler(app_handler)
root.addHandler(error_handler)
access_logger = logging.getLogger("access")
access_logger.setLevel(settings.log_level.upper() or "INFO")
access_logger.handlers = []
access_logger.propagate = False
access_logger.addHandler(access_handler)

59
app/config/settings.py Normal file
View File

@ -0,0 +1,59 @@
from __future__ import annotations
import os
from dotenv import load_dotenv
from dataclasses import dataclass
from functools import lru_cache
@dataclass(frozen=True)
class Settings:
feishu_app_id: str
feishu_app_secret: str
feishu_verify_token: str
feishu_encrypt_key: str
feishu_ws_url: str
huobanyun_app_id: str
huobanyun_app_secret: str
huobanyun_token: str
huobanyun_api_key: str
huobanyun_base_url: str
log_level: str
log_rotation: str
log_max_bytes: int
log_backup_count: int
log_query_token: str
request_timeout: int
retry_count: int
def _env(name: str, default: str = "") -> str:
return os.getenv(name, default).strip()
@lru_cache(maxsize=1)
def get_settings() -> Settings:
load_dotenv()
return Settings(
feishu_app_id=_env("FEISHU_APP_ID"),
feishu_app_secret=_env("FEISHU_APP_SECRET"),
feishu_verify_token=_env("FEISHU_VERIFY_TOKEN"),
feishu_encrypt_key=_env("FEISHU_ENCRYPT_KEY"),
feishu_ws_url=_env("FEISHU_WS_URL"),
huobanyun_app_id=_env("HUOBANYUN_APP_ID"),
huobanyun_app_secret=_env("HUOBANYUN_APP_SECRET"),
huobanyun_token=_env("HUOBANYUN_TOKEN"),
huobanyun_api_key=_env("HUOBANYUN_API_KEY"),
huobanyun_base_url=_env("HUOBANYUN_BASE_URL"),
log_level=_env("LOG_LEVEL", "INFO"),
log_rotation=_env("LOG_ROTATION", "size"),
log_max_bytes=int(_env("LOG_MAX_BYTES", "10485760")),
log_backup_count=int(_env("LOG_BACKUP_COUNT", "10")),
log_query_token=_env("LOG_QUERY_TOKEN"),
request_timeout=int(_env("REQUEST_TIMEOUT", "10")),
retry_count=int(_env("RETRY_COUNT", "2")),
)

96
app/main.py Normal file
View File

@ -0,0 +1,96 @@
from __future__ import annotations
import logging
import os
import time
import uuid
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from app.api.feishu_events import router as feishu_events_router
from app.api.feishu_external import router as feishu_external_router
from app.api.logs import router as logs_router
from app.api.projects import router as projects_router
from app.clients.feishu_ws_client import FeishuWsClient
from app.config.logging import setup_logging
from app.services.approval_sync_service import ApprovalSyncService
LOG_DIR = "logs"
app = FastAPI()
logger = logging.getLogger(__name__)
access_logger = logging.getLogger("access")
@app.on_event("startup")
async def on_startup() -> None:
os.makedirs(LOG_DIR, exist_ok=True)
setup_logging(LOG_DIR)
approval_service = ApprovalSyncService()
async def handler(payload: dict) -> None:
event_payload = payload.get("event", payload)
await approval_service.handle_event(event_payload)
ws_client = FeishuWsClient(handler)
ws_client.start()
app.state.ws_client = ws_client
@app.on_event("shutdown")
async def on_shutdown() -> None:
ws_client = getattr(app.state, "ws_client", None)
if ws_client:
await ws_client.stop()
@app.middleware("http")
async def request_logging(request: Request, call_next):
request_id = str(uuid.uuid4())
start = time.time()
try:
raw_body = await request.body()
body_text = raw_body.decode("utf-8") if raw_body else ""
except Exception:
body_text = ""
logger.info(
"request_id=%s client=%s method=%s path=%s query=%s body=%s",
request_id,
request.client.host if request.client else "-",
request.method,
request.url.path,
request.url.query,
body_text,
)
try:
response = await call_next(request)
except Exception as exc:
logger.exception("请求异常: %s", exc)
response = JSONResponse(status_code=500, content={"detail": "internal error"})
duration = int((time.time() - start) * 1000)
access_logger.info(
"request_id=%s method=%s path=%s status=%s cost_ms=%s",
request_id,
request.method,
request.url.path,
response.status_code,
duration,
)
return response
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.exception("未处理异常: %s", exc)
return JSONResponse(status_code=500, content={"detail": "internal error"})
app.include_router(feishu_external_router)
app.include_router(feishu_events_router)
app.include_router(logs_router)
app.include_router(projects_router)
app.mount("/logs", StaticFiles(directory="app/static/logs", html=True), name="logs")

1
app/schemas/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Pydantic schemas."""

View File

@ -0,0 +1,27 @@
from __future__ import annotations
from typing import Any, Dict, Optional
from pydantic import BaseModel
class FeishuEventHeader(BaseModel):
event_id: Optional[str] = None
event_type: Optional[str] = None
create_time: Optional[str] = None
class FeishuApprovalEvent(BaseModel):
header: FeishuEventHeader = FeishuEventHeader()
event: Dict[str, Any] = {}
class FeishuEventRequest(BaseModel):
challenge: Optional[str] = None
token: Optional[str] = None
type: Optional[str] = None
event: Dict[str, Any] = {}
header: FeishuEventHeader = FeishuEventHeader()
class Config:
extra = "allow"

View File

@ -0,0 +1,35 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class FeishuExternalQueryFilter(BaseModel):
key: str
value: Any
class FeishuExternalQueryRequest(BaseModel):
keyword: Optional[str] = ""
page: int = 1
page_size: int = Field(default=20, alias="pageSize")
filters: List[FeishuExternalQueryFilter] = []
raw: Dict[str, Any] = {}
class Config:
extra = "allow"
class FeishuExternalItem(BaseModel):
id: str
label: str
value: str
extra: Dict[str, Any] = {}
class FeishuExternalQueryResponse(BaseModel):
code: int = 0
msg: str = "ok"
total: int = 0
data: List[FeishuExternalItem] = []

34
app/schemas/huobanyun.py Normal file
View File

@ -0,0 +1,34 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
class HuobanyunQueryRequest(BaseModel):
keyword: Optional[str] = ""
page: int = 1
page_size: int = 20
filters: Dict[str, Any] = {}
class HuobanyunItem(BaseModel):
id: str
name: str
value: str
extra: Dict[str, Any] = {}
class HuobanyunQueryResponse(BaseModel):
total: int = 0
items: List[HuobanyunItem] = []
class HuobanyunWritebackRequest(BaseModel):
record_id: str
fields: Dict[str, Any]
class HuobanyunWritebackResponse(BaseModel):
success: bool
message: str = ""

1
app/services/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Service layer."""

View File

@ -0,0 +1,58 @@
from __future__ import annotations
import logging
import time
from typing import Any, Dict, Optional, Tuple
from app.clients.feishu_client import FeishuClient
from app.clients.huobanyun_client import HuobanyunClient
logger = logging.getLogger(__name__)
class ApprovalSyncService:
def __init__(self) -> None:
self.feishu_client = FeishuClient()
self.huobanyun_client = HuobanyunClient()
self._seen: Dict[Tuple[str, str], float] = {}
def _cleanup_seen(self) -> None:
now = time.time()
expired = [key for key, ts in self._seen.items() if now - ts > 3600]
for key in expired:
self._seen.pop(key, None)
def _is_done(self, status: Optional[str]) -> bool:
if not status:
return False
return status.upper() in {"APPROVED", "REJECTED", "CANCELED", "CANCELLED", "DONE"}
async def handle_event(self, event: Dict[str, Any]) -> None:
instance_id = event.get("instance_id") or event.get("instanceId") or ""
status = event.get("status") or event.get("instance_status") or ""
if not instance_id or not self._is_done(status):
logger.info("审批事件未结束或缺少实例ID跳过处理")
return
dedupe_key = (instance_id, status)
self._cleanup_seen()
if dedupe_key in self._seen:
logger.info("审批事件重复,跳过: %s", dedupe_key)
return
self._seen[dedupe_key] = time.time()
detail = {}
try:
detail = await self.feishu_client.get_approval_instance(instance_id)
except Exception as exc:
logger.error("拉取审批详情失败: %s", exc)
record_id = event.get("record_id") or instance_id
fields = event.get("fields") or detail.get("data", {}).get("form", {})
payload = {"record_id": record_id, "fields": fields}
try:
await self.huobanyun_client.writeback(payload)
logger.info("审批写回伙伴云成功: %s", record_id)
except Exception as exc:
logger.error("审批写回伙伴云失败: %s", exc)

View File

@ -0,0 +1,22 @@
from __future__ import annotations
import logging
from typing import Any, Dict, Optional
from app.config.settings import get_settings
logger = logging.getLogger(__name__)
class FeishuService:
def __init__(self) -> None:
self.settings = get_settings()
def verify_token(self, token: Optional[str]) -> bool:
if not self.settings.feishu_verify_token:
return True
return token == self.settings.feishu_verify_token
def verify_event(self, body: Dict[str, Any]) -> bool:
token = body.get("token")
return self.verify_token(token)

View File

@ -0,0 +1,184 @@
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from app.clients.huobanyun_client import HuobanyunClient
from app.schemas.feishu_external import (
FeishuExternalItem,
FeishuExternalQueryRequest,
FeishuExternalQueryResponse,
)
from app.schemas.huobanyun import HuobanyunQueryResponse
logger = logging.getLogger(__name__)
class HuobanyunService:
def __init__(self) -> None:
self.client = HuobanyunClient()
def _extract_value(self, value: Any) -> Any:
if isinstance(value, dict):
for key in ("value", "text", "name", "title", "label", "id"):
if key in value:
return value.get(key)
return value
if isinstance(value, list):
if not value:
return value
if len(value) == 1:
return self._extract_value(value[0])
return value
def _pick_field(self, fields: Dict[str, Any], keys: List[str]) -> Any:
for key in keys:
if key in fields:
return self._extract_value(fields.get(key))
return ""
def _to_bool(self, value: Any) -> Optional[bool]:
if isinstance(value, bool):
return value
if isinstance(value, (int, float)):
return value != 0
if isinstance(value, str):
val = value.strip().lower()
if val in {"", "true", "1", "yes", "y"}:
return True
if val in {"", "false", "0", "no", "n"}:
return False
if isinstance(value, list):
return len(value) > 0
return None
async def get_projects_list(
self,
project_name: str | None = None,
project_no: str | None = None,
page: int = 1,
size: int = 50,
) -> Dict[str, Any]:
table_id = "2100000015544940"
limit = 50 if size <= 0 else min(size, 100)
page = 1 if page <= 0 else page
offset = (page - 1) * limit
filter_items: List[Dict[str, Any]] = []
if project_name:
filter_items.append(
{"field": "2200000150711223", "query": {"eqm": [project_name]}}
)
if project_no:
filter_items.append(
{"field": "proj_id", "query": {"eqm": [project_no]}}
)
payload: Dict[str, Any] = {"table_id": table_id, "limit": limit, "offset": offset}
if filter_items:
payload["filter"] = {"and": filter_items}
payload["order"] = {"field_id": "created_on", "type": "desc"}
payload["with_field_config"] = 0
data = await self.client.list_items(payload)
if isinstance(data, dict):
logger.info(
"伙伴云列表响应: keys=%s code=%s message=%s",
list(data.keys()),
data.get("code"),
data.get("message"),
)
if isinstance(data, dict):
data_block = data.get("data", data)
if isinstance(data_block, list):
logger.warning("伙伴云 data 字段为列表,按 items 处理")
items = data_block
total = len(items)
elif isinstance(data_block, dict):
items = data_block.get("items", [])
if not items and isinstance(data.get("items"), list):
items = data.get("items", [])
total = data_block.get("filtered", data_block.get("total", len(items)))
else:
logger.error("伙伴云 data 字段结构异常: %s", type(data_block))
items = []
total = 0
elif isinstance(data, list):
logger.warning("伙伴云返回为列表,按 items 处理")
items = data
total = len(items)
else:
logger.error("伙伴云返回结构异常: %s", type(data))
items = []
total = 0
if items:
sample_fields = items[0].get("fields", {})
logger.info("项目字段样例 keys=%s", list(sample_fields.keys()))
mapped_items: List[Dict[str, Any]] = []
for item in items:
fields = item.get("fields", {})
project_no = self._pick_field(fields, ["proj_id", "2200000149785345"])
project_name = self._pick_field(fields, ["2200000150711223"])
if not project_name:
project_name = item.get("title", "")
order_status = self._pick_field(fields, ["2200000150497330"])
order_amount = self._pick_field(fields, ["2200000149785349"])
linked_public = self._pick_field(
fields,
[
"2200000589775224",
],
)
linked_private = self._pick_field(
fields,
[
"2200000589775228",
],
)
mapped_items.append(
{
"project_no": project_no,
"project_name": project_name,
"order_status": order_status,
"order_amount": order_amount,
"linked_public_payment": self._to_bool(linked_public),
"linked_private_payment": self._to_bool(linked_private),
}
)
return {"total": total, "items": mapped_items}
async def query(self, req: FeishuExternalQueryRequest) -> FeishuExternalQueryResponse:
payload = {
"keyword": req.keyword or "",
"page": req.page,
"page_size": req.page_size,
"filters": {f.key: f.value for f in req.filters},
"raw": req.raw or {},
}
data = await self.client.query(payload)
parsed = HuobanyunQueryResponse(
total=data.get("total", 0),
items=[
{
"id": item.get("id", ""),
"name": item.get("name", ""),
"value": item.get("value", ""),
"extra": item,
}
for item in data.get("items", [])
],
)
return FeishuExternalQueryResponse(
total=parsed.total,
data=[
FeishuExternalItem(
id=item.id,
label=item.name,
value=item.value,
extra=item.extra,
)
for item in parsed.items
],
)

View File

@ -0,0 +1,67 @@
from __future__ import annotations
import os
from datetime import datetime
from typing import List, Optional, Tuple
class LogsService:
def __init__(self, log_dir: str) -> None:
self.log_dir = log_dir
def _parse_time(self, line: str) -> Optional[datetime]:
try:
prefix = line.split("|", 1)[0].strip()
return datetime.strptime(prefix, "%Y-%m-%d %H:%M:%S")
except Exception:
return None
def _list_log_files(self) -> List[str]:
if not os.path.isdir(self.log_dir):
return []
files = [
os.path.join(self.log_dir, name)
for name in os.listdir(self.log_dir)
if name.startswith(("app.log", "access.log", "error.log"))
]
return sorted(files)
def query(
self,
keyword: str = "",
start: Optional[str] = None,
end: Optional[str] = None,
page: int = 1,
size: int = 50,
) -> Tuple[int, List[str]]:
start_dt = datetime.fromisoformat(start) if start else None
end_dt = datetime.fromisoformat(end) if end else None
matched: List[Tuple[datetime, str]] = []
for path in self._list_log_files():
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.rstrip("\n")
if "path=/logs/query" in line or "path=/logs/" in line:
continue
if keyword and keyword not in line:
continue
ts = self._parse_time(line)
if start_dt and ts and ts < start_dt:
continue
if end_dt and ts and ts > end_dt:
continue
matched.append((ts or datetime.min, line))
except Exception:
continue
matched.sort(key=lambda item: item[0], reverse=True)
total = len(matched)
if size <= 0:
size = 50
if page <= 0:
page = 1
start_idx = (page - 1) * size
end_idx = start_idx + size
return total, [line for _, line in matched[start_idx:end_idx]]

View File

@ -0,0 +1,35 @@
[Unit]
Description=Feishu Approval External Data Service
After=network.target
[Service]
Type=simple
WorkingDirectory=/Users/marsway/Workspace/杠上开花-1
Environment=APP_MODULE=app.main:app
Environment=HOST=0.0.0.0
Environment=PORT=8000
Environment=LOG_LEVEL=info
ExecStart=/Users/marsway/Workspace/杠上开花-1/scripts/start.sh
Restart=always
RestartSec=3
[Install]
WantedBy=multi-user.target
[Unit]
Description=Feishu Approval External Data Service
After=network.target
[Service]
Type=simple
User=www-data
WorkingDirectory=/opt/feishu-approval
Environment=APP_MODULE=app.main:app
Environment=HOST=0.0.0.0
Environment=PORT=8000
Environment=LOG_LEVEL=info
ExecStart=/opt/feishu-approval/scripts/start.sh
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
fastapi
uvicorn
httpx
websockets
python-dotenv

9
scripts/start.sh Executable file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
set -euo pipefail
APP_MODULE=${APP_MODULE:-"app.main:app"}
HOST=${HOST:-"0.0.0.0"}
PORT=${PORT:-"8000"}
LOG_LEVEL=${LOG_LEVEL:-"info"}
exec uvicorn "$APP_MODULE" --host "$HOST" --port "$PORT" --log-level "$LOG_LEVEL"

9
scripts/start_dev.sh Executable file
View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
set -euo pipefail
APP_MODULE=${APP_MODULE:-"app.main:app"}
HOST=${HOST:-"0.0.0.0"}
PORT=${PORT:-"8000"}
LOG_LEVEL=${LOG_LEVEL:-"debug"}
exec uvicorn "$APP_MODULE" --host "$HOST" --port "$PORT" --log-level "$LOG_LEVEL" --reload