"""Admin action endpoints: run a job immediately or retry an existing job log.

Both endpoints create a new job-log row in the RUNNING state, hand the work to
``execute_job.delay``, write an audit event, and redirect (303) to the detail
page of the newly created log.
"""

from __future__ import annotations

from datetime import datetime
from typing import Any
from urllib.parse import quote_plus

from fastapi import APIRouter, HTTPException, Request
from starlette.responses import RedirectResponse

from app.db import crud
from app.db.engine import get_session
from app.db.models import JobStatus
from app.security.audit import log_event
from app.security.permissions import button_permission_code, request_has_permission
from app.tasks.execute import execute_job

router = APIRouter()

# Identity used in every admin URL that points at a job log. Keeping it in one
# place prevents the "joblog" / "job-log" mismatch the retry endpoint used to
# have in its "already running" fallback URL.
_JOB_LOG_IDENTITY = "job-log"
_JOB_IDENTITY = "job"

# User-facing message shown when the permission check fails (runtime text).
_PERMISSION_DENIED_MSG = "无权限执行该操作。"


def _redirect_with_error(referer: str, msg: str) -> RedirectResponse:
    """Redirect (303) to *referer* with *msg* appended as an ``error`` query parameter.

    Args:
        referer: Target URL; may already contain a query string.
        msg: Human-readable error text; URL-encoded with ``quote_plus``.
    """
    sep = "&" if "?" in referer else "?"
    return RedirectResponse(f"{referer}{sep}error={quote_plus(msg)}", status_code=303)


def _referer_or(request: Request, route_name: str, **path_params: Any) -> str:
    """Return the request's ``Referer`` header, or the named route's URL if absent.

    The fallback URL is only built when the header is missing or empty.
    """
    # NOTE(review): the Referer header is client-controlled, so this redirects
    # to a caller-supplied URL. Consider restricting it to same-origin targets.
    return request.headers.get("Referer") or str(request.url_for(route_name, **path_params))


def _create_running_log(session: Any, job_id: str, snapshot: dict, started_at: datetime) -> Any:
    """Insert a fresh job-log row in the RUNNING state and return it."""
    return crud.create_job_log(
        session,
        job_id=job_id,
        status=JobStatus.RUNNING,
        snapshot_params=snapshot,
        message="运行中",
        traceback="",
        run_log="",
        celery_task_id="",
        attempt=0,
        started_at=started_at,
        finished_at=None,
    )


def _audit_and_redirect(request: Request, session: Any, new_log_id: int, target: str) -> RedirectResponse:
    """Record the admin action and redirect (303) to the new log's detail page."""
    # Build the URL before auditing, matching the original statement order.
    url = request.url_for("admin:details", identity=_JOB_LOG_IDENTITY, pk=str(new_log_id))
    log_event(
        session,
        action="admin.action",
        target=target,
        detail={"new_log_id": new_log_id},
        request=request,
    )
    return RedirectResponse(url, status_code=303)


@router.post("/admin/joblogs/{log_id}/retry")
def retry_joblog(request: Request, log_id: int):
    """Re-dispatch the job captured by an existing job log.

    Copies the stored snapshot parameters of log *log_id*, marks the copy as a
    retry, creates a new RUNNING log for it and dispatches ``execute_job``.

    Returns:
        A 303 redirect to the new log's detail page, or back to the referring
        page with an ``error`` query parameter when the caller lacks permission
        or the source log is still running.

    Raises:
        HTTPException: 404 when no job log with *log_id* exists.
    """
    if not request_has_permission(request, button_permission_code("joblog:retry")):
        back_to = _referer_or(request, "admin:list", identity=_JOB_LOG_IDENTITY)
        return _redirect_with_error(back_to, _PERMISSION_DENIED_MSG)

    session = get_session()
    try:
        log = crud.get_job_log(session, log_id)
        if not log:
            raise HTTPException(status_code=404, detail="JobLog not found")

        if log.status == JobStatus.RUNNING:
            # Fixed: this fallback previously used identity="joblog", unlike
            # every other job-log URL in this module.
            back_to = _referer_or(
                request, "admin:details", identity=_JOB_LOG_IDENTITY, pk=str(log_id)
            )
            return _redirect_with_error(back_to, "该任务日志正在运行中,请结束后再重试。")

        # Create a new RUNNING JobLog and redirect to its detail page.
        # One timestamp feeds both the snapshot metadata and the log row so
        # the two values cannot drift apart.
        started_at = datetime.utcnow()

        # Shallow copies: the stored snapshot on the original log is untouched.
        snapshot = dict(log.snapshot_params or {})
        meta = dict(snapshot.get("meta") or {})
        meta["trigger"] = "retry"
        meta["started_at"] = started_at.isoformat()
        snapshot["meta"] = meta

        new_log = _create_running_log(session, str(log.job_id), snapshot, started_at)
        new_log_id = int(new_log.id)
        # NOTE(review): if dispatch raises, the row above stays RUNNING.
        execute_job.delay(snapshot_params=snapshot, log_id=new_log_id)
        return _audit_and_redirect(request, session, new_log_id, f"joblog:{log_id}:retry")
    finally:
        session.close()


@router.post("/admin/jobs/{job_id}/run")
def run_job(request: Request, job_id: str):
    """Start job *job_id* immediately.

    Builds a snapshot from the job's current handler path and configuration,
    creates a new RUNNING log for it and dispatches ``execute_job``.

    Returns:
        A 303 redirect to the new log's detail page, or back to the referring
        page with an ``error`` query parameter when the caller lacks permission.

    Raises:
        HTTPException: 404 when no job with *job_id* exists.
    """
    if not request_has_permission(request, button_permission_code("job:run")):
        back_to = _referer_or(request, "admin:list", identity=_JOB_IDENTITY)
        return _redirect_with_error(back_to, _PERMISSION_DENIED_MSG)

    session = get_session()
    try:
        job = crud.get_job(session, job_id)
        if not job:
            raise HTTPException(status_code=404, detail="Job not found")

        # One timestamp feeds both the snapshot metadata and the log row.
        started_at = datetime.utcnow()
        snapshot = {
            "job_id": job.id,
            "handler_path": job.handler_path,
            "public_cfg": job.public_cfg or {},
            "secret_cfg": job.secret_cfg or "",
            "meta": {"trigger": "run_now", "started_at": started_at.isoformat()},
        }

        new_log = _create_running_log(session, str(job.id), snapshot, started_at)
        new_log_id = int(new_log.id)
        # NOTE(review): unlike retry_joblog, this dispatches by job_id rather
        # than passing the snapshot; kept as-is — confirm it is intentional.
        execute_job.delay(job_id=job.id, log_id=new_log_id)
        return _audit_and_redirect(request, session, new_log_id, f"job:{job_id}:run")
    finally:
        session.close()