Vastai-ConnectHub/app/admin/routes.py

94 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from datetime import datetime
from urllib.parse import quote_plus
from fastapi import APIRouter, HTTPException, Request
from starlette.responses import RedirectResponse
from app.db import crud
from app.db.engine import get_session
from app.db.models import JobStatus
from app.tasks.execute import execute_job
# Router that the POST endpoints defined below in this module register on.
router = APIRouter()
def _redirect_with_error(referer: str, msg: str) -> RedirectResponse:
    """Return a 303 redirect to ``referer`` with ``msg`` attached as ``error``.

    The message is URL-encoded with ``quote_plus`` and appended as an
    ``error=...`` query parameter. If ``referer`` already contains a ``?``
    the parameter is joined with ``&``; otherwise a new query string is
    started with ``?``.
    """
    if "?" in referer:
        joiner = "&"
    else:
        joiner = "?"
    encoded_msg = quote_plus(msg)
    target = f"{referer}{joiner}error={encoded_msg}"
    return RedirectResponse(target, status_code=303)
@router.post("/admin/joblogs/{log_id}/retry")
def retry_joblog(request: Request, log_id: int):
    """Retry the run recorded by an existing JobLog.

    Looks up the JobLog ``log_id``. If it is still RUNNING, the user is
    redirected back (to the ``Referer`` header, or to the log's admin detail
    page when no Referer is sent) with an ``error`` query parameter.
    Otherwise a new RUNNING JobLog is created from a copy of the old log's
    ``snapshot_params``, ``execute_job`` is queued with that snapshot, and
    the user is redirected (303) to the new log's admin detail page.

    Raises:
        HTTPException: 404 when no JobLog with ``log_id`` exists.
    """
    session = get_session()
    try:
        log = crud.get_job_log(session, log_id)
        if not log:
            raise HTTPException(status_code=404, detail="JobLog not found")

        if log.status == JobStatus.RUNNING:
            # Fall back to the log's own detail page when the browser sent no
            # Referer. The identity must be "job-log", the same value used for
            # the success redirect below, so the fallback URL resolves to the
            # same admin view.
            referer = request.headers.get("Referer") or str(
                request.url_for("admin:details", identity="job-log", pk=str(log_id))
            )
            return _redirect_with_error(referer, "该任务日志正在运行中,请结束后再重试。")

        # Create a new RUNNING JobLog and redirect to its detail page.
        # Copy the snapshot and its "meta" dict so the original log's stored
        # parameters are not mutated.
        snapshot = dict(log.snapshot_params or {})
        meta = dict(snapshot.get("meta") or {})
        # Read the clock once so the snapshot metadata and the JobLog column
        # record the same start time.
        started_at = datetime.utcnow()
        meta["trigger"] = "retry"
        meta["started_at"] = started_at.isoformat()
        snapshot["meta"] = meta

        new_log = crud.create_job_log(
            session,
            job_id=str(log.job_id),
            status=JobStatus.RUNNING,
            snapshot_params=snapshot,
            message="运行中",
            traceback="",
            run_log="",
            celery_task_id="",
            attempt=0,
            started_at=started_at,
            finished_at=None,
        )
        # Queue the run against the snapshot; the worker is told which JobLog
        # row belongs to this run via log_id.
        execute_job.delay(snapshot_params=snapshot, log_id=int(new_log.id))
        url = request.url_for("admin:details", identity="job-log", pk=str(new_log.id))
        return RedirectResponse(url, status_code=303)
    finally:
        session.close()
@router.post("/admin/jobs/{job_id}/run")
def run_job(request: Request, job_id: str):
    """Trigger an immediate run of the job ``job_id``.

    Builds a parameter snapshot from the job's current configuration, records
    a new RUNNING JobLog holding that snapshot, queues ``execute_job`` with
    the job id and the new log id, and redirects (303) to the new log's admin
    detail page.

    Raises:
        HTTPException: 404 when no job with ``job_id`` exists.
    """
    db = get_session()
    try:
        job = crud.get_job(db, job_id)
        if not job:
            raise HTTPException(status_code=404, detail="Job not found")

        # Snapshot of the job's configuration at trigger time.
        params = {
            "job_id": job.id,
            "handler_path": job.handler_path,
            "public_cfg": job.public_cfg or {},
            "secret_cfg": job.secret_cfg or "",
        }
        params["meta"] = {
            "trigger": "run_now",
            "started_at": datetime.utcnow().isoformat(),
        }

        # Fields of the new log row; it starts out RUNNING with empty output.
        log_fields = dict(
            job_id=str(job.id),
            status=JobStatus.RUNNING,
            snapshot_params=params,
            message="运行中",
            traceback="",
            run_log="",
            celery_task_id="",
            attempt=0,
            started_at=datetime.utcnow(),
            finished_at=None,
        )
        created = crud.create_job_log(db, **log_fields)

        execute_job.delay(job_id=job.id, log_id=int(created.id))

        detail_url = request.url_for(
            "admin:details", identity="job-log", pk=str(created.id)
        )
        return RedirectResponse(detail_url, status_code=303)
    finally:
        db.close()