This commit is contained in:
Marsway 2026-01-13 10:23:48 +08:00
parent e3644d85ec
commit 4e47959f8c
6 changed files with 254 additions and 46 deletions

View File

@ -1,15 +1,23 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime
from urllib.parse import quote_plus
from fastapi import APIRouter, HTTPException, Request from fastapi import APIRouter, HTTPException, Request
from starlette.responses import RedirectResponse from starlette.responses import RedirectResponse
from app.db import crud from app.db import crud
from app.db.engine import get_session from app.db.engine import get_session
from app.db.models import JobStatus
from app.tasks.execute import execute_job from app.tasks.execute import execute_job
router = APIRouter() router = APIRouter()
def _redirect_with_error(referer: str, msg: str) -> RedirectResponse:
sep = "&" if "?" in referer else "?"
return RedirectResponse(f"{referer}{sep}error={quote_plus(msg)}", status_code=303)
@router.post("/admin/joblogs/{log_id}/retry") @router.post("/admin/joblogs/{log_id}/retry")
def retry_joblog(request: Request, log_id: int): def retry_joblog(request: Request, log_id: int):
@ -18,20 +26,68 @@ def retry_joblog(request: Request, log_id: int):
log = crud.get_job_log(session, log_id) log = crud.get_job_log(session, log_id)
if not log: if not log:
raise HTTPException(status_code=404, detail="JobLog not found") raise HTTPException(status_code=404, detail="JobLog not found")
# 关键:用 snapshot_params 重新触发任务(其中 secret_cfg 仍为密文) if log.status == JobStatus.RUNNING:
execute_job.delay(snapshot_params=log.snapshot_params) referer = request.headers.get("Referer") or str(request.url_for("admin:details", identity="joblog", pk=str(log_id)))
return _redirect_with_error(referer, "该任务日志正在运行中,请结束后再重试。")
# 创建新的 RUNNING JobLog并跳转到该条详情页
snapshot = dict(log.snapshot_params or {})
meta = dict(snapshot.get("meta") or {})
meta["trigger"] = "retry"
meta["started_at"] = datetime.utcnow().isoformat()
snapshot["meta"] = meta
new_log = crud.create_job_log(
session,
job_id=str(log.job_id),
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id="",
attempt=0,
started_at=datetime.utcnow(),
finished_at=None,
)
execute_job.delay(snapshot_params=snapshot, log_id=int(new_log.id))
url = request.url_for("admin:details", identity="joblog", pk=str(new_log.id))
return RedirectResponse(url, status_code=303)
finally: finally:
session.close() session.close()
referer = request.headers.get("Referer") or "/admin"
return RedirectResponse(referer, status_code=303)
@router.post("/admin/jobs/{job_id}/run") @router.post("/admin/jobs/{job_id}/run")
def run_job(request: Request, job_id: str): def run_job(request: Request, job_id: str):
# 触发一次立即执行 session = get_session()
execute_job.delay(job_id=job_id) try:
referer = request.headers.get("Referer") or "/admin" job = crud.get_job(session, job_id)
return RedirectResponse(referer, status_code=303) if not job:
raise HTTPException(status_code=404, detail="Job not found")
snapshot = {
"job_id": job.id,
"handler_path": job.handler_path,
"public_cfg": job.public_cfg or {},
"secret_cfg": job.secret_cfg or "",
"meta": {"trigger": "run_now", "started_at": datetime.utcnow().isoformat()},
}
new_log = crud.create_job_log(
session,
job_id=str(job.id),
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id="",
attempt=0,
started_at=datetime.utcnow(),
finished_at=None,
)
execute_job.delay(job_id=job.id, log_id=int(new_log.id))
url = request.url_for("admin:details", identity="joblog", pk=str(new_log.id))
return RedirectResponse(url, status_code=303)
finally:
session.close()

View File

@ -17,6 +17,11 @@
{% endfor %}: {{ get_object_identifier(model) }} {% endfor %}: {{ get_object_identifier(model) }}
</h3> </h3>
</div> </div>
{% if request.query_params.get('error') %}
<div class="alert alert-danger m-3" role="alert">
{{ request.query_params.get('error') }}
</div>
{% endif %}
<div class="card-body border-bottom py-3"> <div class="card-body border-bottom py-3">
<div class="table-responsive"> <div class="table-responsive">
<table class="table card-table table-vcenter text-nowrap table-hover table-bordered"> <table class="table card-table table-vcenter text-nowrap table-hover table-bordered">

View File

@ -50,6 +50,11 @@
{% endif %} {% endif %}
</div> </div>
</div> </div>
{% if request.query_params.get('error') %}
<div class="alert alert-danger m-3" role="alert">
{{ request.query_params.get('error') }}
</div>
{% endif %}
<div class="card-body border-bottom py-3"> <div class="card-body border-bottom py-3">
<div class="d-flex justify-content-between"> <div class="d-flex justify-content-between">
<div class="dropdown col-4"> <div class="dropdown col-4">

View File

@ -3,14 +3,19 @@ from __future__ import annotations
import json import json
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any
from urllib.parse import quote_plus
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
from croniter import croniter from croniter import croniter
from markupsafe import Markup from markupsafe import Markup
from sqladmin import ModelView, action from sqladmin import ModelView, action
from sqladmin.filters import OperationColumnFilter
from sqladmin.models import Request from sqladmin.models import Request
from starlette.responses import RedirectResponse from starlette.responses import RedirectResponse
from app.db import crud
from app.db.engine import get_session
from app.db.models import JobStatus
from app.db.models import Job, JobLog from app.db.models import Job, JobLog
from app.plugins.manager import load_job_class from app.plugins.manager import load_job_class
from app.security.fernet import encrypt_json from app.security.fernet import encrypt_json
@ -48,6 +53,7 @@ class JobAdmin(ModelView, model=Job):
name = "任务" name = "任务"
name_plural = "任务" name_plural = "任务"
icon = "fa fa-cogs" icon = "fa fa-cogs"
can_delete = False
column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at] column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at]
column_details_list = [ column_details_list = [
@ -112,11 +118,129 @@ class JobAdmin(ModelView, model=Job):
) )
async def run_now(self, request: Request): # type: ignore[override] async def run_now(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",") pks = request.query_params.get("pks", "").split(",")
for pk in [p for p in pks if p]: ids = [p for p in pks if p]
execute_job.delay(job_id=pk) if not ids:
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
session = get_session()
created_log_id: int | None = None
try:
for pk in ids:
job = crud.get_job(session, pk)
if not job:
continue
snapshot = {
"job_id": job.id,
"handler_path": job.handler_path,
"public_cfg": job.public_cfg or {},
"secret_cfg": job.secret_cfg or "",
"meta": {"trigger": "admin_run_now"},
}
log = crud.create_job_log(
session,
job_id=job.id,
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id="",
attempt=0,
started_at=datetime.utcnow(),
finished_at=None,
)
if created_log_id is None:
created_log_id = int(log.id)
execute_job.delay(job_id=job.id, log_id=int(log.id))
finally:
session.close()
if created_log_id is not None:
url = request.url_for("admin:details", identity="joblog", pk=str(created_log_id))
return RedirectResponse(url, status_code=303)
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
@action(
name="view_logs",
label="查看日志",
confirmation_message="查看该任务的日志?",
add_in_list=True,
add_in_detail=True,
)
async def view_logs(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",")
pk = next((p for p in pks if p), "")
base = str(request.url_for("admin:list", identity="joblog"))
if pk:
return RedirectResponse(f"{base}?search={quote_plus(pk)}", status_code=303)
return RedirectResponse(base, status_code=303)
@action(
name="disable_job",
label="停用任务(保留日志)",
confirmation_message="确认停用该任务(保留历史日志)?",
add_in_list=True,
add_in_detail=True,
)
async def disable_job(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",")
ids = [p for p in pks if p]
session = get_session()
try:
for pk in ids:
job = crud.get_job(session, pk)
if not job:
continue
job.enabled = False
session.add(job)
session.commit()
finally:
session.close()
referer = request.headers.get("Referer") referer = request.headers.get("Referer")
return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303) return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
@action(
name="clear_job_logs",
label="清理任务日志(保留任务)",
confirmation_message="确认清理该任务的所有日志(保留任务本身)?",
add_in_list=True,
add_in_detail=True,
)
async def clear_job_logs(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",")
ids = [p for p in pks if p]
session = get_session()
try:
for pk in ids:
crud.delete_job_logs_by_job_id(session, pk)
finally:
session.close()
referer = request.headers.get("Referer")
return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
@action(
name="delete_job_with_logs",
label="删除任务及日志(硬删除)",
confirmation_message="确认删除该任务及其所有日志?此操作不可恢复。",
add_in_list=True,
add_in_detail=True,
)
async def delete_job_with_logs(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",")
ids = [p for p in pks if p]
session = get_session()
try:
for pk in ids:
job = crud.get_job(session, pk)
if not job:
continue
crud.delete_job_logs_by_job_id(session, job.id)
session.delete(job)
session.commit()
finally:
session.close()
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
async def on_model_change(self, data: dict, model: Job, is_created: bool, request) -> None: # type: ignore[override] async def on_model_change(self, data: dict, model: Job, is_created: bool, request) -> None: # type: ignore[override]
# id 必填(避免插入时触发 NOT NULL # id 必填(避免插入时触发 NOT NULL
raw_id = data.get("id") if is_created else (data.get("id") or getattr(model, "id", None)) raw_id = data.get("id") if is_created else (data.get("id") or getattr(model, "id", None))
@ -191,6 +315,10 @@ class JobLogAdmin(ModelView, model=JobLog):
can_edit = False can_edit = False
can_delete = False can_delete = False
# 支持按 job_id 搜索与筛选
column_searchable_list = [JobLog.job_id]
column_filters = [OperationColumnFilter(JobLog.job_id, title="任务ID")]
# 列表更适合扫读:保留关键字段 + message截断 # 列表更适合扫读:保留关键字段 + message截断
column_list = [JobLog.id, JobLog.job_id, JobLog.status, JobLog.started_at, JobLog.finished_at, JobLog.message] column_list = [JobLog.id, JobLog.job_id, JobLog.status, JobLog.started_at, JobLog.finished_at, JobLog.message]
# 默认按 started_at 倒序(最新在前) # 默认按 started_at 倒序(最新在前)

View File

@ -98,3 +98,11 @@ def update_job_log(
session.refresh(log) session.refresh(log)
return log return log
def delete_job_logs_by_job_id(session: Session, job_id: str) -> int:
logs = list(session.scalars(select(JobLog).where(JobLog.job_id == job_id)))
for log in logs:
session.delete(log)
session.commit()
return len(logs)

View File

@ -56,7 +56,12 @@ def _compose_message(base_message: str, warning_lines: list[str]) -> str:
@celery_app.task(bind=True, name="connecthub.execute_job") @celery_app.task(bind=True, name="connecthub.execute_job")
def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any] | None = None) -> dict[str, Any]: def execute_job(
self,
job_id: str | None = None,
snapshot_params: dict[str, Any] | None = None,
log_id: int | None = None,
) -> dict[str, Any]:
""" """
通用执行入口 通用执行入口
- job_id DB 读取 Job 定义 - job_id DB 读取 Job 定义
@ -78,7 +83,7 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
traceback = "" traceback = ""
result: dict[str, Any] = {} result: dict[str, Any] = {}
run_log_text = "" run_log_text = ""
log_id: int | None = None job_log_id: int | None = log_id
celery_task_id = getattr(self.request, "id", "") or "" celery_task_id = getattr(self.request, "id", "") or ""
attempt = int(getattr(self.request, "retries", 0) or 0) attempt = int(getattr(self.request, "retries", 0) or 0)
snapshot: dict[str, Any] = {} snapshot: dict[str, Any] = {}
@ -113,7 +118,8 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
}, },
} }
# 任务开始即落库一条 RUNNING 记录(若失败则降级为旧行为:结束时再 create # 任务开始即落库一条 RUNNING 记录(若外部已传入 log_id则只更新该条若创建失败则降级为旧行为结束时再 create
if job_log_id is None:
try: try:
running = crud.create_job_log( running = crud.create_job_log(
session, session,
@ -128,9 +134,9 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
started_at=started_at, started_at=started_at,
finished_at=None, finished_at=None,
) )
log_id = int(running.id) job_log_id = int(running.id)
except Exception: except Exception:
log_id = None job_log_id = None
secrets = decrypt_json(secret_token) secrets = decrypt_json(secret_token)
job_instance = instantiate(handler_path) job_instance = instantiate(handler_path)
@ -155,10 +161,10 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
warning_lines = _extract_warning_lines(run_log_text) warning_lines = _extract_warning_lines(run_log_text)
message = _compose_message(message, warning_lines) message = _compose_message(message, warning_lines)
# 结束时:优先更新 RUNNING 那条;若没有则创建最终记录(兼容降级) # 结束时:优先更新 RUNNING 那条;若没有则创建最终记录(兼容降级)
if log_id is not None: if job_log_id is not None:
crud.update_job_log( crud.update_job_log(
session, session,
log_id, job_log_id,
status=status, status=status,
message=message, message=message,
traceback=traceback, traceback=traceback,