384 lines
14 KiB
Python
384 lines
14 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
from datetime import datetime
|
||
from typing import Any
|
||
from urllib.parse import quote_plus
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from croniter import croniter
|
||
from markupsafe import Markup
|
||
from sqladmin import ModelView, action
|
||
from sqladmin.filters import OperationColumnFilter
|
||
from sqladmin.models import Request
|
||
from starlette.responses import RedirectResponse
|
||
|
||
from app.db import crud
|
||
from app.db.engine import get_session
|
||
from app.db.models import JobStatus
|
||
from app.db.models import Job, JobLog
|
||
from app.plugins.manager import load_job_class
|
||
from app.security.fernet import encrypt_json
|
||
from app.tasks.execute import execute_job
|
||
|
||
|
||
def _maybe_json(value: Any) -> Any:
|
||
if isinstance(value, str):
|
||
s = value.strip()
|
||
if not s:
|
||
return value
|
||
try:
|
||
return json.loads(s)
|
||
except json.JSONDecodeError:
|
||
return value
|
||
return value
|
||
|
||
|
||
def _fmt_dt_seconds(dt: datetime | None) -> str:
|
||
if not dt:
|
||
return ""
|
||
# DB 中保存的时间多为 naive;按 UTC 解释后转换为 Asia/Shanghai 展示
|
||
tz = ZoneInfo("Asia/Shanghai")
|
||
if dt.tzinfo is None:
|
||
dt = dt.replace(tzinfo=ZoneInfo("UTC"))
|
||
return dt.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
def _truncate(s: str, n: int = 120) -> str:
|
||
s = s or ""
|
||
return (s[: n - 3] + "...") if len(s) > n else s
|
||
|
||
|
||
def _selected_pks(request: Request) -> list[str]:
    """Return the non-empty primary keys from the ``pks`` query parameter."""
    return [pk for pk in request.query_params.get("pks", "").split(",") if pk]


def _require_json_object(value: Any, field: str) -> dict:
    """Return ``value`` as a dict, decoding it from JSON text first if needed.

    Args:
        value: A dict, or a string holding a JSON document.
        field: Field name used in the error message.

    Raises:
        ValueError: if ``value`` is a string that is not valid JSON, or the
            (decoded) value is not a dict.
    """
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except json.JSONDecodeError as e:
            raise ValueError(f"{field} must be a JSON object") from e
    if not isinstance(value, dict):
        raise ValueError(f"{field} must be a JSON object")
    return value


class JobAdmin(ModelView, model=Job):
    """Admin view for ``Job`` rows.

    Adds list/detail actions to run a job immediately, jump to its logs,
    disable it, clear its logs, or delete it together with its logs, and
    validates and encrypts form input in ``on_model_change``.
    """

    name = "任务"
    name_plural = "任务"
    icon = "fa fa-cogs"
    can_delete = False

    column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at]
    column_details_list = [
        Job.id,
        Job.enabled,
        Job.cron_expr,
        Job.handler_path,
        Job.public_cfg,
        Job.secret_cfg,
        Job.last_run_at,
        Job.created_at,
        Job.updated_at,
    ]

    # Allow editing the primary key in the form (required when creating a Job).
    form_include_pk = True
    form_columns = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.public_cfg, Job.secret_cfg]

    # Custom template for the Job details page (used to adjust button spacing).
    details_template = "job_details.html"

    # Edit page: secret_cfg is write-only (the ciphertext is never echoed
    # back; leaving the field empty means "do not update").
    edit_template = "job_edit.html"

    # List page template: adds a per-row "Run Now".
    list_template = "job_list.html"

    # The edit form excludes secret_cfg so the ciphertext is not echoed back;
    # the custom template renders an extra empty input for it instead.
    # Note: SQLAdmin needs field-name strings here (not SQLAlchemy Column objects).
    form_edit_rules = ["id", "enabled", "cron_expr", "handler_path", "public_cfg"]

    column_labels = {
        "id": "任务ID",
        "enabled": "启用",
        "cron_expr": "Cron 表达式",
        "handler_path": "处理器",
        "public_cfg": "明文配置",
        "secret_cfg": "密文配置",
        "last_run_at": "上次运行时间",
        "created_at": "创建时间",
        "updated_at": "更新时间",
    }

    column_formatters = {
        Job.created_at: lambda m, a: _fmt_dt_seconds(m.created_at),
        Job.updated_at: lambda m, a: _fmt_dt_seconds(m.updated_at),
        Job.last_run_at: lambda m, a: _fmt_dt_seconds(m.last_run_at),
    }

    column_formatters_detail = {
        Job.created_at: lambda m, a: _fmt_dt_seconds(m.created_at),
        Job.updated_at: lambda m, a: _fmt_dt_seconds(m.updated_at),
        Job.last_run_at: lambda m, a: _fmt_dt_seconds(m.last_run_at),
    }

    @action(
        name="run_now",
        label="立即运行",
        confirmation_message="确认立即执行该任务?",
        add_in_list=True,
        add_in_detail=True,
    )
    async def run_now(self, request: Request):  # type: ignore[override]
        """Create a RUNNING log row for each selected job and enqueue it.

        Redirects to the details page of the first log created, or back to
        the job list when nothing was selected or no selected job exists.
        """
        ids = _selected_pks(request)
        if not ids:
            return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)

        session = get_session()
        created_log_id: int | None = None
        try:
            for pk in ids:
                job = crud.get_job(session, pk)
                if not job:
                    continue
                snapshot = {
                    "job_id": job.id,
                    "handler_path": job.handler_path,
                    "public_cfg": job.public_cfg or {},
                    "secret_cfg": job.secret_cfg or "",
                    "meta": {"trigger": "admin_run_now"},
                }
                log = crud.create_job_log(
                    session,
                    job_id=job.id,
                    status=JobStatus.RUNNING,
                    snapshot_params=snapshot,
                    message="运行中",
                    traceback="",
                    run_log="",
                    celery_task_id="",
                    attempt=0,
                    # Naive UTC, same value datetime.utcnow() produced;
                    # utcnow() is deprecated since Python 3.12.
                    started_at=datetime.now(ZoneInfo("UTC")).replace(tzinfo=None),
                    finished_at=None,
                )
                if created_log_id is None:
                    # Remember only the first log: it is the redirect target.
                    created_log_id = int(log.id)
                execute_job.delay(job_id=job.id, log_id=int(log.id))
        finally:
            session.close()

        if created_log_id is not None:
            url = request.url_for("admin:details", identity="job-log", pk=str(created_log_id))
            return RedirectResponse(url, status_code=303)
        return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)

    @action(
        name="view_logs",
        label="查看日志",
        add_in_list=True,
        add_in_detail=True,
    )
    async def view_logs(self, request: Request):  # type: ignore[override]
        """Redirect to the job-log list, searching for the first selected job id."""
        selected = _selected_pks(request)
        base = str(request.url_for("admin:list", identity="job-log"))
        if selected:
            return RedirectResponse(f"{base}?search={quote_plus(selected[0])}", status_code=303)
        return RedirectResponse(base, status_code=303)

    @action(
        name="disable_job",
        label="停用任务(保留日志)",
        confirmation_message="确认停用该任务(保留历史日志)?",
        add_in_list=True,
        add_in_detail=False,
    )
    async def disable_job(self, request: Request):  # type: ignore[override]
        """Set ``enabled = False`` on every selected job, then go back.

        Redirects to the ``Referer`` header when present, otherwise to the
        job list.
        """
        ids = _selected_pks(request)
        session = get_session()
        try:
            for pk in ids:
                job = crud.get_job(session, pk)
                if not job:
                    continue
                job.enabled = False
                session.add(job)
            session.commit()
        finally:
            session.close()
        referer = request.headers.get("Referer")
        return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)

    @action(
        name="clear_job_logs",
        label="清理日志(保留任务)",
        confirmation_message="确认清理该任务的所有日志(保留任务本身)?",
        add_in_list=True,
        add_in_detail=False,
    )
    async def clear_job_logs(self, request: Request):  # type: ignore[override]
        """Delete the logs of every selected job, keeping the jobs themselves.

        Redirects to the ``Referer`` header when present, otherwise to the
        job list.
        """
        ids = _selected_pks(request)
        session = get_session()
        try:
            for pk in ids:
                # NOTE(review): no commit is issued here; presumably
                # delete_job_logs_by_job_id commits on its own - confirm.
                crud.delete_job_logs_by_job_id(session, pk)
        finally:
            session.close()
        referer = request.headers.get("Referer")
        return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)

    @action(
        name="delete_job_with_logs",
        label="删除任务(含日志)",
        confirmation_message="确认删除该任务及其所有日志?此操作不可恢复。",
        add_in_list=True,
        add_in_detail=False,
    )
    async def delete_job_with_logs(self, request: Request):  # type: ignore[override]
        """Delete every selected job and its logs, then return to the job list."""
        ids = _selected_pks(request)
        session = get_session()
        try:
            for pk in ids:
                job = crud.get_job(session, pk)
                if not job:
                    continue
                # Logs first, then the job they belong to.
                crud.delete_job_logs_by_job_id(session, job.id)
                session.delete(job)
            session.commit()
        finally:
            session.close()
        return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)

    async def on_model_change(self, data: dict, model: Job, is_created: bool, request) -> None:  # type: ignore[override]
        """Validate form data and encrypt ``secret_cfg`` before it is saved.

        On create every value must come from ``data``; on edit a missing
        value falls back to the one already stored on ``model``.

        Raises:
            ValueError: if ``id``, ``handler_path`` or ``cron_expr`` is
                missing or blank, or if ``public_cfg`` / ``secret_cfg`` is
                not a JSON object.
            Exception: whatever ``load_job_class`` or ``croniter`` raise for
                an unusable handler path or cron expression propagates.
        """
        # id is required (avoids tripping NOT NULL on insert).
        raw_id = data.get("id") if is_created else (data.get("id") or getattr(model, "id", None))
        if raw_id is None or not str(raw_id).strip():
            raise ValueError("id is required")

        # handler_path: strict check by actually loading the class. Per the
        # original author's note it must be importable and inherit BaseJob;
        # that check lives in load_job_class.
        handler_path = data.get("handler_path") if is_created else (data.get("handler_path") or model.handler_path)
        if handler_path is None or not str(handler_path).strip():
            raise ValueError("handler_path is required")
        load_job_class(str(handler_path).strip())

        # cron_expr: must be a valid cron expression; computing the next
        # fire time forces croniter to evaluate it.
        cron_expr = data.get("cron_expr") if is_created else (data.get("cron_expr") or model.cron_expr)
        if cron_expr is None or not str(cron_expr).strip():
            raise ValueError("cron_expr is required")
        base = datetime.now(ZoneInfo("Asia/Shanghai"))
        croniter(str(cron_expr).strip(), base).get_next(datetime)

        # public_cfg: must be a JSON object (dict); otherwise fail and block
        # the write.
        data["public_cfg"] = _require_json_object(data.get("public_cfg"), "public_cfg")

        # secret_cfg:
        # - create: must be a JSON object (dict) and is encrypted before it
        #   is stored.
        # - edit: the ciphertext is never echoed back; an empty field keeps
        #   the stored ciphertext, a filled field is validated as JSON and
        #   encrypted over the old value.
        if is_created:
            data["secret_cfg"] = encrypt_json(_require_json_object(data.get("secret_cfg"), "secret_cfg"))
            return

        # The custom edit page posts secret_cfg from a textarea (it may be
        # absent or empty), so it is read from the raw form, not ``data``.
        try:
            form = await request.form()
            raw = form.get("secret_cfg")
        except Exception:
            # Best effort: an unreadable form is treated like an empty field,
            # i.e. the stored secret is left untouched.
            raw = None
        raw_s = str(raw).strip() if raw is not None else ""
        if not raw_s:
            # Left empty: do not update the encrypted field.
            data.pop("secret_cfg", None)
        else:
            data["secret_cfg"] = encrypt_json(_require_json_object(raw_s, "secret_cfg"))
|
||
|
||
|
||
def _escaped_pre(text: str | None, style: str) -> Markup:
    """Wrap ``text`` in a styled ``<pre>`` element, HTML-escaping the text.

    Only the literal template is trusted markup: ``Markup.format`` escapes
    its arguments, so ``<``, ``>`` and ``&`` in log output are shown
    literally instead of being interpreted by the browser.
    """
    return Markup("<pre style='{}'>{}</pre>").format(style, text or "")


class JobLogAdmin(ModelView, model=JobLog):
    """Read-only admin view for ``JobLog`` rows (no create, edit or delete)."""

    name = "任务日志"
    name_plural = "任务日志"
    icon = "fa fa-list"
    identity = "job-log"

    can_create = False
    can_edit = False
    can_delete = False

    # Searchable by job_id (the filter sidebar is left off to avoid page
    # overflow).
    column_searchable_list = [JobLog.job_id]

    # The list is meant for scanning: key fields plus a truncated message.
    column_list = [JobLog.id, JobLog.job_id, JobLog.status, JobLog.started_at, JobLog.finished_at, JobLog.message]
    # Default sort: started_at descending (newest first).
    column_default_sort = [(JobLog.started_at, True)]
    column_details_list = [
        JobLog.id,
        JobLog.job_id,
        JobLog.status,
        JobLog.snapshot_params,
        JobLog.message,
        JobLog.traceback,
        JobLog.run_log,
        JobLog.celery_task_id,
        JobLog.attempt,
        JobLog.started_at,
        JobLog.finished_at,
    ]

    # List page template: adds a per-row Retry.
    list_template = "joblog_list.html"
    # Dedicated details template for JobLog (adds the Retry button).
    details_template = "joblog_details.html"

    column_labels = {
        "id": "日志ID",
        "job_id": "任务ID",
        "status": "状态",
        "snapshot_params": "快照参数",
        "message": "消息",
        "traceback": "异常堆栈",
        "run_log": "运行日志",
        "celery_task_id": "Celery任务ID",
        "attempt": "重试次数",
        "started_at": "开始时间",
        "finished_at": "结束时间",
    }

    column_formatters = {
        JobLog.started_at: lambda m, a: _fmt_dt_seconds(m.started_at),
        JobLog.finished_at: lambda m, a: _fmt_dt_seconds(m.finished_at),
        JobLog.message: lambda m, a: _truncate(m.message, 120),
    }

    # Detail formatters return Markup. The log text is escaped before it is
    # embedded, because messages, tracebacks (e.g. "in <module>") and job
    # output may contain HTML metacharacters.
    column_formatters_detail = {
        JobLog.started_at: lambda m, a: _fmt_dt_seconds(m.started_at),
        JobLog.finished_at: lambda m, a: _fmt_dt_seconds(m.finished_at),
        JobLog.message: lambda m, a: _escaped_pre(
            m.message, "max-height:240px;overflow:auto;white-space:pre-wrap"
        ),
        JobLog.traceback: lambda m, a: _escaped_pre(m.traceback, "white-space:pre-wrap"),
        JobLog.run_log: lambda m, a: _escaped_pre(
            m.run_log, "max-height:480px;overflow:auto;white-space:pre-wrap"
        ),
        JobLog.snapshot_params: lambda m, a: _escaped_pre(
            json.dumps(m.snapshot_params or {}, ensure_ascii=False, indent=2, sort_keys=True),
            "white-space:pre-wrap",
        ),
    }
|