Vastai-ConnectHub/app/tasks/dispatcher.py

71 lines
2.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import logging
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from croniter import croniter
from app.core.logging import setup_logging
from app.db import crud
from app.db.engine import get_session
from app.tasks.celery_app import celery_app
from app.tasks.execute import execute_job
# Module-level logger used by the dispatcher task below.
logger = logging.getLogger("connecthub.tasks.dispatcher")
def _floor_to_minute(dt: datetime) -> datetime:
    """Return *dt* truncated to the start of its minute.

    Seconds and microseconds are zeroed; every other field, including
    ``tzinfo``, is carried over unchanged.
    """
    zeroed_fields = {"second": 0, "microsecond": 0}
    return dt.replace(**zeroed_fields)
@celery_app.task(name="connecthub.dispatcher.tick")
def tick() -> dict[str, int]:
    """
    Dispatch every enabled job whose cron schedule fires in the current minute.

    Intended to be triggered by Celery beat once per minute. For each job
    returned by ``crud.list_enabled_jobs``:

    - skip it when ``last_run_at`` already falls in the current minute (or
      later), so the same minute is never dispatched twice;
    - evaluate ``cron_expr`` in the Asia/Shanghai timezone and, if it fires
      exactly at the current minute, enqueue ``execute_job`` and record the
      current minute as ``last_run_at``.

    A job whose cron expression cannot be evaluated is logged and skipped so
    that it does not block the remaining jobs. Any other failure aborts the
    tick, is logged, and is not re-raised.

    Returns:
        ``{"triggered": <number of jobs enqueued during this tick>}``.
    """
    setup_logging()
    tz = ZoneInfo("Asia/Shanghai")
    now_min = _floor_to_minute(datetime.now(tz))
    # Start one minute back so croniter's "next" occurrence can land exactly
    # on now_min; the base is the same for every job.
    base = now_min - timedelta(minutes=1)
    triggered = 0
    # Acquire the session last so nothing above can fail while it is open
    # and unprotected by the try/finally.
    session = get_session()
    try:
        for job in crud.list_enabled_jobs(session):
            last = job.last_run_at
            if last is not None:
                # SQLite usually stores naive datetimes; interpret those as
                # Asia/Shanghai wall-clock time.
                if last.tzinfo is None:
                    last_min = _floor_to_minute(last.replace(tzinfo=tz))
                else:
                    last_min = _floor_to_minute(last.astimezone(tz))
                if last_min >= now_min:
                    continue
            # croniter computes in the timezone of the datetime it is given,
            # which here is Asia/Shanghai.
            try:
                nxt = croniter(job.cron_expr, base).get_next(datetime)
            except Exception:  # noqa: BLE001
                # One malformed cron expression must not stop dispatch of
                # the jobs that follow it in the list.
                logger.exception(
                    "dispatcher.tick: cannot evaluate cron_expr for job_id=%s",
                    job.id,
                )
                continue
            if _floor_to_minute(nxt.replace(tzinfo=tz)) != now_min:
                continue
            execute_job.delay(job_id=job.id)
            # Persist as a naive datetime, matching how stored values are read
            # back above.
            crud.update_job_last_run_at(session, job.id, now_min.replace(tzinfo=None))
            triggered += 1
    except Exception:  # noqa: BLE001
        logger.exception("dispatcher.tick failed")
    finally:
        session.close()
    return {"triggered": triggered}