71 lines
2.0 KiB
Python
71 lines
2.0 KiB
Python
from __future__ import annotations
|
||
|
||
import logging
|
||
from datetime import datetime, timedelta
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from croniter import croniter
|
||
|
||
from app.core.logging import setup_logging
|
||
from app.db import crud
|
||
from app.db.engine import get_session
|
||
from app.tasks.celery_app import celery_app
|
||
from app.tasks.execute import execute_job
|
||
|
||
|
||
logger = logging.getLogger("connecthub.tasks.dispatcher")
|
||
|
||
|
||
def _floor_to_minute(dt: datetime) -> datetime:
|
||
return dt.replace(second=0, microsecond=0)
|
||
|
||
|
||
@celery_app.task(name="connecthub.dispatcher.tick")
|
||
def tick() -> dict[str, int]:
|
||
"""
|
||
Beat 每分钟触发一次:
|
||
- 读取 enabled Jobs
|
||
- cron_expr 到点则触发 execute_job
|
||
- last_run_at 防止同一分钟重复触发
|
||
"""
|
||
setup_logging()
|
||
|
||
session = get_session()
|
||
tz = ZoneInfo("Asia/Shanghai")
|
||
now = datetime.now(tz)
|
||
now_min = _floor_to_minute(now)
|
||
triggered = 0
|
||
|
||
try:
|
||
for job in crud.list_enabled_jobs(session):
|
||
last = job.last_run_at
|
||
if last is not None:
|
||
# SQLite 通常存 naive datetime;按 Asia/Shanghai 解释
|
||
if last.tzinfo is None:
|
||
last_min = _floor_to_minute(last.replace(tzinfo=tz))
|
||
else:
|
||
last_min = _floor_to_minute(last.astimezone(tz))
|
||
if last_min >= now_min:
|
||
continue
|
||
|
||
# croniter 默认按传入 datetime 计算,这里用 Asia/Shanghai
|
||
base = now_min - timedelta(minutes=1)
|
||
itr = croniter(job.cron_expr, base)
|
||
nxt = itr.get_next(datetime)
|
||
if _floor_to_minute(nxt.replace(tzinfo=tz)) != now_min:
|
||
continue
|
||
|
||
execute_job.delay(job_id=job.id)
|
||
crud.update_job_last_run_at(session, job.id, now_min.replace(tzinfo=None))
|
||
triggered += 1
|
||
|
||
except Exception: # noqa: BLE001
|
||
logger.exception("dispatcher.tick failed")
|
||
finally:
|
||
session.close()
|
||
|
||
return {"triggered": triggered}
|
||
|
||
|
||
|