from __future__ import annotations import logging from datetime import datetime, timedelta from zoneinfo import ZoneInfo from croniter import croniter from app.core.logging import setup_logging from app.db import crud from app.db.engine import get_session from app.tasks.celery_app import celery_app from app.tasks.execute import execute_job logger = logging.getLogger("connecthub.tasks.dispatcher") def _floor_to_minute(dt: datetime) -> datetime: return dt.replace(second=0, microsecond=0) @celery_app.task(name="connecthub.dispatcher.tick") def tick() -> dict[str, int]: """ Beat 每分钟触发一次: - 读取 enabled Jobs - cron_expr 到点则触发 execute_job - last_run_at 防止同一分钟重复触发 """ setup_logging() session = get_session() tz = ZoneInfo("Asia/Shanghai") now = datetime.now(tz) now_min = _floor_to_minute(now) triggered = 0 try: for job in crud.list_enabled_jobs(session): last = job.last_run_at if last is not None: # SQLite 通常存 naive datetime;按 Asia/Shanghai 解释 if last.tzinfo is None: last_min = _floor_to_minute(last.replace(tzinfo=tz)) else: last_min = _floor_to_minute(last.astimezone(tz)) # 防御:若 last_run_at 被错误写成 UTC 等导致“在未来”,则忽略该值避免任务永久不触发 if last_min > now_min: logger.warning( "job.last_run_at appears in the future (ignored) job_id=%s last_run_at=%s now=%s", job.id, last, now, ) else: if last_min >= now_min: continue # croniter 默认按传入 datetime 计算,这里用 Asia/Shanghai base = now_min - timedelta(minutes=1) itr = croniter(job.cron_expr, base) nxt = itr.get_next(datetime) if _floor_to_minute(nxt.replace(tzinfo=tz)) != now_min: continue execute_job.delay(job_id=job.id) # 写入时保留 tz 信息,避免在 PostgreSQL timestamptz 中被误当 UTC 导致“未来 last_run_at” crud.update_job_last_run_at(session, job.id, now_min) triggered += 1 except Exception: # noqa: BLE001 logger.exception("dispatcher.tick failed") finally: session.close() return {"triggered": triggered}