diff --git a/app/admin/views.py b/app/admin/views.py
index 9cc6158..3e0f929 100644
--- a/app/admin/views.py
+++ b/app/admin/views.py
@@ -3,14 +3,19 @@ from __future__ import annotations
import json
from datetime import datetime
from typing import Any
+from urllib.parse import quote_plus
from zoneinfo import ZoneInfo
from croniter import croniter
from markupsafe import Markup
from sqladmin import ModelView, action
+from sqladmin.filters import OperationColumnFilter
from sqladmin.models import Request
from starlette.responses import RedirectResponse
+from app.db import crud
+from app.db.engine import get_session
+from app.db.models import JobStatus
from app.db.models import Job, JobLog
from app.plugins.manager import load_job_class
from app.security.fernet import encrypt_json
@@ -48,6 +53,7 @@ class JobAdmin(ModelView, model=Job):
name = "任务"
name_plural = "任务"
icon = "fa fa-cogs"
+ can_delete = False
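+ # Built-in row deletion is disabled; the disable/clear/delete actions added below handle it explicitly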
column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at]
column_details_list = [
@@ -112,11 +118,129 @@ class JobAdmin(ModelView, model=Job):
)
async def run_now(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",")
- for pk in [p for p in pks if p]:
- execute_job.delay(job_id=pk)
+ ids = [p for p in pks if p]
+ if not ids:
+ return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
+
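+ # Pre-create a RUNNING JobLog for each selected job so execute_job updates that row instead of creating its own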
+ session = get_session()
+ created_log_id: int | None = None
+ try:
+ for pk in ids:
+ job = crud.get_job(session, pk)
+ if not job:
+ continue
+ snapshot = {
+ "job_id": job.id,
+ "handler_path": job.handler_path,
+ "public_cfg": job.public_cfg or {},
+ "secret_cfg": job.secret_cfg or "",
+ "meta": {"trigger": "admin_run_now"},
+ }
+ log = crud.create_job_log(
+ session,
+ job_id=job.id,
+ status=JobStatus.RUNNING,
+ snapshot_params=snapshot,
+ message="运行中",
+ traceback="",
+ run_log="",
+ celery_task_id="",
+ attempt=0,
+ started_at=datetime.utcnow(),
+ finished_at=None,
+ )
+ if created_log_id is None:
+ created_log_id = int(log.id)
+ execute_job.delay(job_id=job.id, log_id=int(log.id))
+ finally:
+ session.close()
+
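+ # Jump straight to the detail page of the first log that was created; otherwise fall back to the job list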
+ if created_log_id is not None:
+ url = request.url_for("admin:details", identity="joblog", pk=str(created_log_id))
+ return RedirectResponse(url, status_code=303)
+ return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
+
+ @action(
+ name="view_logs",
+ label="查看日志",
+ confirmation_message="查看该任务的日志?",
+ add_in_list=True,
+ add_in_detail=True,
+ )
+ async def view_logs(self, request: Request): # type: ignore[override]
+ pks = request.query_params.get("pks", "").split(",")
+ pk = next((p for p in pks if p), "")
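+ # JobLogAdmin lists are searchable by job_id (column_searchable_list below), so the id works as a search term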
+ base = str(request.url_for("admin:list", identity="joblog"))
+ if pk:
+ return RedirectResponse(f"{base}?search={quote_plus(pk)}", status_code=303)
+ return RedirectResponse(base, status_code=303)
+
+ @action(
+ name="disable_job",
+ label="停用任务(保留日志)",
+ confirmation_message="确认停用该任务(保留历史日志)?",
+ add_in_list=True,
+ add_in_detail=True,
+ )
+ async def disable_job(self, request: Request): # type: ignore[override]
+ pks = request.query_params.get("pks", "").split(",")
+ ids = [p for p in pks if p]
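+ # Soft disable: only the enabled flag is flipped; the job row and its logs are kept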
+ session = get_session()
+ try:
+ for pk in ids:
+ job = crud.get_job(session, pk)
+ if not job:
+ continue
+ job.enabled = False
+ session.add(job)
+ session.commit()
+ finally:
+ session.close()
referer = request.headers.get("Referer")
return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
+
+ @action(
+ name="clear_job_logs",
+ label="清理任务日志(保留任务)",
+ confirmation_message="确认清理该任务的所有日志(保留任务本身)?",
+ add_in_list=True,
+ add_in_detail=True,
+ )
+ async def clear_job_logs(self, request: Request): # type: ignore[override]
+ pks = request.query_params.get("pks", "").split(",")
+ ids = [p for p in pks if p]
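+ # Remove every JobLog row belonging to the selected jobs; the Job rows themselves are untouched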
+ session = get_session()
+ try:
+ for pk in ids:
+ crud.delete_job_logs_by_job_id(session, pk)
+ finally:
+ session.close()
+ referer = request.headers.get("Referer")
+ return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
+
+ @action(
+ name="delete_job_with_logs",
+ label="删除任务及日志(硬删除)",
+ confirmation_message="确认删除该任务及其所有日志?此操作不可恢复。",
+ add_in_list=True,
+ add_in_detail=True,
+ )
+ async def delete_job_with_logs(self, request: Request): # type: ignore[override]
+ pks = request.query_params.get("pks", "").split(",")
+ ids = [p for p in pks if p]
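+ # Hard delete: remove the job's logs first, then the job row itself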
+ session = get_session()
+ try:
+ for pk in ids:
+ job = crud.get_job(session, pk)
+ if not job:
+ continue
+ crud.delete_job_logs_by_job_id(session, job.id)
+ session.delete(job)
+ session.commit()
+ finally:
+ session.close()
+ return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
+
async def on_model_change(self, data: dict, model: Job, is_created: bool, request) -> None: # type: ignore[override]
# id 必填(避免插入时触发 NOT NULL)
raw_id = data.get("id") if is_created else (data.get("id") or getattr(model, "id", None))
@@ -191,6 +315,10 @@ class JobLogAdmin(ModelView, model=JobLog):
can_edit = False
can_delete = False
+ # Allow searching and filtering logs by job_id
+ column_searchable_list = [JobLog.job_id]
+ column_filters = [OperationColumnFilter(JobLog.job_id, title="任务ID")]
+
# 列表更适合扫读:保留关键字段 + message(截断)
column_list = [JobLog.id, JobLog.job_id, JobLog.status, JobLog.started_at, JobLog.finished_at, JobLog.message]
# 默认按 started_at 倒序(最新在前)
diff --git a/app/db/crud.py b/app/db/crud.py
index 0b36381..5892f4b 100644
--- a/app/db/crud.py
+++ b/app/db/crud.py
@@ -98,3 +98,11 @@ def update_job_log(
session.refresh(log)
return log
+
+def delete_job_logs_by_job_id(session: Session, job_id: str) -> int:
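+ """Delete all JobLog rows for the given job_id and return how many were removed."""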
+ logs = list(session.scalars(select(JobLog).where(JobLog.job_id == job_id)))
+ for log in logs:
+ session.delete(log)
+ session.commit()
+ return len(logs)
+
diff --git a/app/tasks/execute.py b/app/tasks/execute.py
index 17b12f4..f312a2a 100644
--- a/app/tasks/execute.py
+++ b/app/tasks/execute.py
@@ -56,7 +56,12 @@ def _compose_message(base_message: str, warning_lines: list[str]) -> str:
@celery_app.task(bind=True, name="connecthub.execute_job")
-def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any] | None = None) -> dict[str, Any]:
+def execute_job(
+ self,
+ job_id: str | None = None,
+ snapshot_params: dict[str, Any] | None = None,
+ log_id: int | None = None,
+) -> dict[str, Any]:
"""
通用执行入口:
- 传 job_id:从 DB 读取 Job 定义
@@ -78,7 +83,7 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
traceback = ""
result: dict[str, Any] = {}
run_log_text = ""
- log_id: int | None = None
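+ # Prefer a log id supplied by the caller (e.g. the admin "run now" action); if absent, a RUNNING row is created below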
+ job_log_id: int | None = log_id
celery_task_id = getattr(self.request, "id", "") or ""
attempt = int(getattr(self.request, "retries", 0) or 0)
snapshot: dict[str, Any] = {}
@@ -113,24 +118,25 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
},
}
- # 任务开始即落库一条 RUNNING 记录(若失败则降级为旧行为:结束时再 create)
- try:
- running = crud.create_job_log(
- session,
- job_id=str(job_id or ""),
- status=JobStatus.RUNNING,
- snapshot_params=snapshot,
- message="运行中",
- traceback="",
- run_log="",
- celery_task_id=celery_task_id,
- attempt=attempt,
- started_at=started_at,
- finished_at=None,
- )
- log_id = int(running.id)
- except Exception:
- log_id = None
+ # Write a RUNNING log row as soon as the task starts (if a log_id was passed in, only update that row; if creation fails, fall back to the old behavior of creating the log when the task finishes)
+ if job_log_id is None:
+ try:
+ running = crud.create_job_log(
+ session,
+ job_id=str(job_id or ""),
+ status=JobStatus.RUNNING,
+ snapshot_params=snapshot,
+ message="运行中",
+ traceback="",
+ run_log="",
+ celery_task_id=celery_task_id,
+ attempt=attempt,
+ started_at=started_at,
+ finished_at=None,
+ )
+ job_log_id = int(running.id)
+ except Exception:
+ job_log_id = None
secrets = decrypt_json(secret_token)
job_instance = instantiate(handler_path)
@@ -155,10 +161,10 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
warning_lines = _extract_warning_lines(run_log_text)
message = _compose_message(message, warning_lines)
# 结束时:优先更新 RUNNING 那条;若没有则创建最终记录(兼容降级)
- if log_id is not None:
+ if job_log_id is not None:
crud.update_job_log(
session,
- log_id,
+ job_log_id,
status=status,
message=message,
traceback=traceback,
@@ -176,23 +182,23 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
"secret_cfg": secret_token if "secret_token" in locals() else "",
"meta": {
"trigger": "celery",
- "celery_task_id": celery_task_id,
+ "celery_task_id": celery_task_id,
"started_at": started_at.isoformat(),
},
}
- crud.create_job_log(
- session,
- job_id=str(job_id or ""),
- status=status,
- snapshot_params=snapshot,
- message=message,
- traceback=traceback,
- run_log=run_log_text,
+ crud.create_job_log(
+ session,
+ job_id=str(job_id or ""),
+ status=status,
+ snapshot_params=snapshot,
+ message=message,
+ traceback=traceback,
+ run_log=run_log_text,
celery_task_id=celery_task_id,
attempt=attempt,
- started_at=started_at,
- finished_at=finished_at,
- )
+ started_at=started_at,
+ finished_at=finished_at,
+ )
session.close()
return {"status": status.value, "job_id": job_id, "result": result, "message": message}