from __future__ import annotations import json from datetime import datetime from typing import Any from urllib.parse import quote_plus from zoneinfo import ZoneInfo from croniter import croniter from markupsafe import Markup from sqladmin import ModelView, action from sqladmin.filters import OperationColumnFilter from sqladmin.models import Request from starlette.responses import RedirectResponse from app.db import crud from app.db.engine import get_session from app.db.models import JobStatus from app.db.models import Job, JobLog from app.plugins.manager import load_job_class from app.security.fernet import encrypt_json from app.tasks.execute import execute_job def _maybe_json(value: Any) -> Any: if isinstance(value, str): s = value.strip() if not s: return value try: return json.loads(s) except json.JSONDecodeError: return value return value def _fmt_dt_seconds(dt: datetime | None) -> str: if not dt: return "" # DB 中保存的时间多为 naive;按 UTC 解释后转换为 Asia/Shanghai 展示 tz = ZoneInfo("Asia/Shanghai") if dt.tzinfo is None: dt = dt.replace(tzinfo=ZoneInfo("UTC")) return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S") def _truncate(s: str, n: int = 120) -> str: s = s or "" return (s[: n - 3] + "...") if len(s) > n else s class JobAdmin(ModelView, model=Job): name = "任务" name_plural = "任务" icon = "fa fa-cogs" can_delete = False column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at] column_details_list = [ Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.public_cfg, Job.secret_cfg, Job.last_run_at, Job.created_at, Job.updated_at, ] # 允许在表单中编辑主键(创建 Job 必填) form_include_pk = True form_columns = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.public_cfg, Job.secret_cfg] # 为 Job 详情页指定模板(用于调整按钮间距) details_template = "job_details.html" # 编辑页:secret_cfg 只写不读(不回显密文;留空表示不更新) edit_template = "job_edit.html" # 列表页模板:加入每行 Run Now list_template = "job_list.html" # 编辑页排除 secret_cfg,避免回显密文;由自定义模板额外渲染一个空输入框 # 注意:SQLAdmin 这里需要字段名字符串(不是 SQLAlchemy Column 对象) form_edit_rules = ["id", "enabled", "cron_expr", "handler_path", "public_cfg"] column_labels = { "id": "任务ID", "enabled": "启用", "cron_expr": "Cron 表达式", "handler_path": "处理器", "public_cfg": "明文配置", "secret_cfg": "密文配置", "last_run_at": "上次运行时间", "created_at": "创建时间", "updated_at": "更新时间", } column_formatters = { Job.created_at: lambda m, a: _fmt_dt_seconds(m.created_at), Job.updated_at: lambda m, a: _fmt_dt_seconds(m.updated_at), Job.last_run_at: lambda m, a: _fmt_dt_seconds(m.last_run_at), } column_formatters_detail = { Job.created_at: lambda m, a: _fmt_dt_seconds(m.created_at), Job.updated_at: lambda m, a: _fmt_dt_seconds(m.updated_at), Job.last_run_at: lambda m, a: _fmt_dt_seconds(m.last_run_at), } @action( name="run_now", label="立即运行", confirmation_message="确认立即执行该任务?", add_in_list=True, add_in_detail=True, ) async def run_now(self, request: Request): # type: ignore[override] pks = request.query_params.get("pks", "").split(",") ids = [p for p in pks if p] if not ids: return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303) session = get_session() created_log_id: int | None = None try: for pk in ids: job = crud.get_job(session, pk) if not job: continue snapshot = { "job_id": job.id, "handler_path": job.handler_path, "public_cfg": job.public_cfg or {}, "secret_cfg": job.secret_cfg or "", "meta": {"trigger": "admin_run_now"}, } log = crud.create_job_log( session, job_id=job.id, status=JobStatus.RUNNING, snapshot_params=snapshot, message="运行中", traceback="", run_log="", celery_task_id="", attempt=0, started_at=datetime.utcnow(), finished_at=None, ) if created_log_id is None: created_log_id = int(log.id) execute_job.delay(job_id=job.id, log_id=int(log.id)) finally: session.close() if created_log_id is not None: url = request.url_for("admin:details", identity="job-log", pk=str(created_log_id)) return RedirectResponse(url, status_code=303) return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303) @action( name="view_logs", label="查看日志", add_in_list=True, add_in_detail=True, ) async def view_logs(self, request: Request): # type: ignore[override] pks = request.query_params.get("pks", "").split(",") pk = next((p for p in pks if p), "") base = str(request.url_for("admin:list", identity="job-log")) if pk: return RedirectResponse(f"{base}?search={quote_plus(pk)}", status_code=303) return RedirectResponse(base, status_code=303) @action( name="disable_job", label="停用任务(保留日志)", confirmation_message="确认停用该任务(保留历史日志)?", add_in_list=True, add_in_detail=False, ) async def disable_job(self, request: Request): # type: ignore[override] pks = request.query_params.get("pks", "").split(",") ids = [p for p in pks if p] session = get_session() try: for pk in ids: job = crud.get_job(session, pk) if not job: continue job.enabled = False session.add(job) session.commit() finally: session.close() referer = request.headers.get("Referer") return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303) @action( name="clear_job_logs", label="清理日志(保留任务)", confirmation_message="确认清理该任务的所有日志(保留任务本身)?", add_in_list=True, add_in_detail=False, ) async def clear_job_logs(self, request: Request): # type: ignore[override] pks = request.query_params.get("pks", "").split(",") ids = [p for p in pks if p] session = get_session() try: for pk in ids: crud.delete_job_logs_by_job_id(session, pk) finally: session.close() referer = request.headers.get("Referer") return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303) @action( name="delete_job_with_logs", label="删除任务(含日志)", confirmation_message="确认删除该任务及其所有日志?此操作不可恢复。", add_in_list=True, add_in_detail=False, ) async def delete_job_with_logs(self, request: Request): # type: ignore[override] pks = request.query_params.get("pks", "").split(",") ids = [p for p in pks if p] session = get_session() try: for pk in ids: job = crud.get_job(session, pk) if not job: continue crud.delete_job_logs_by_job_id(session, job.id) session.delete(job) session.commit() finally: session.close() return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303) async def on_model_change(self, data: dict, model: Job, is_created: bool, request) -> None: # type: ignore[override] # id 必填(避免插入时触发 NOT NULL) raw_id = data.get("id") if is_created else (data.get("id") or getattr(model, "id", None)) if raw_id is None or not str(raw_id).strip(): raise ValueError("id is required") # handler_path 强校验:必须可 import 且继承 BaseJob handler_path = data.get("handler_path") if is_created else (data.get("handler_path") or model.handler_path) if handler_path is None or not str(handler_path).strip(): raise ValueError("handler_path is required") load_job_class(str(handler_path).strip()) # cron_expr 校验:必须是合法 cron 表达式 cron_expr = data.get("cron_expr") if is_created else (data.get("cron_expr") or model.cron_expr) if cron_expr is None or not str(cron_expr).strip(): raise ValueError("cron_expr is required") base = datetime.now(ZoneInfo("Asia/Shanghai")) itr = croniter(str(cron_expr).strip(), base) _ = itr.get_next(datetime) # public_cfg:必须是合法 JSON 对象(dict),否则直接报错阻止落库 pcfg = data.get("public_cfg") if isinstance(pcfg, str): try: pcfg = json.loads(pcfg) except json.JSONDecodeError as e: raise ValueError("public_cfg must be a JSON object") from e if not isinstance(pcfg, dict): raise ValueError("public_cfg must be a JSON object") data["public_cfg"] = pcfg # secret_cfg: # - 创建:必须是合法 JSON 对象(dict),并且保存时必须加密落库 # - 编辑:出于安全考虑不回显密文;若留空则保留原密文不更新;若填写则按 JSON 校验并加密覆盖 if is_created: scfg = data.get("secret_cfg") if isinstance(scfg, str): try: scfg = json.loads(scfg) except json.JSONDecodeError as e: raise ValueError("secret_cfg must be a JSON object") from e if not isinstance(scfg, dict): raise ValueError("secret_cfg must be a JSON object") data["secret_cfg"] = encrypt_json(scfg) else: # 自定义编辑页会以 textarea 传回 secret_cfg(可能不存在或为空) try: form = await request.form() raw = form.get("secret_cfg") except Exception: raw = None raw_s = str(raw).strip() if raw is not None else "" if not raw_s: # 留空:不更新密文字段 data.pop("secret_cfg", None) else: try: scfg2 = json.loads(raw_s) except json.JSONDecodeError as e: raise ValueError("secret_cfg must be a JSON object") from e if not isinstance(scfg2, dict): raise ValueError("secret_cfg must be a JSON object") data["secret_cfg"] = encrypt_json(scfg2) class JobLogAdmin(ModelView, model=JobLog): name = "任务日志" name_plural = "任务日志" icon = "fa fa-list" identity = "job-log" can_create = False can_edit = False can_delete = False # 支持按 job_id 搜索(不启用筛选栏,避免页面溢出) column_searchable_list = [JobLog.job_id] # 列表更适合扫读:保留关键字段 + message(截断) column_list = [JobLog.id, JobLog.job_id, JobLog.status, JobLog.started_at, JobLog.finished_at, JobLog.message] # 默认按 started_at 倒序(最新在前) column_default_sort = [(JobLog.started_at, True)] column_details_list = [ JobLog.id, JobLog.job_id, JobLog.status, JobLog.snapshot_params, JobLog.message, JobLog.traceback, JobLog.run_log, JobLog.celery_task_id, JobLog.attempt, JobLog.started_at, JobLog.finished_at, ] # 列表页模板:加入每行 Retry list_template = "joblog_list.html" # 为 JobLog 详情页单独指定模板(用于加入 Retry 按钮) details_template = "joblog_details.html" column_labels = { "id": "日志ID", "job_id": "任务ID", "status": "状态", "snapshot_params": "快照参数", "message": "消息", "traceback": "异常堆栈", "run_log": "运行日志", "celery_task_id": "Celery任务ID", "attempt": "重试次数", "started_at": "开始时间", "finished_at": "结束时间", } column_formatters = { JobLog.started_at: lambda m, a: _fmt_dt_seconds(m.started_at), JobLog.finished_at: lambda m, a: _fmt_dt_seconds(m.finished_at), JobLog.message: lambda m, a: _truncate(m.message, 120), } column_formatters_detail = { JobLog.started_at: lambda m, a: _fmt_dt_seconds(m.started_at), JobLog.finished_at: lambda m, a: _fmt_dt_seconds(m.finished_at), JobLog.message: lambda m, a: Markup( "
"
+ (m.message or "")
+ ""
),
JobLog.traceback: lambda m, a: Markup(f"{m.traceback or ''}"),
JobLog.run_log: lambda m, a: Markup(
""
+ (m.run_log or "")
+ ""
),
JobLog.snapshot_params: lambda m, a: Markup(
""
+ json.dumps(m.snapshot_params or {}, ensure_ascii=False, indent=2, sort_keys=True)
+ ""
),
}