Compare commits

..

1 Commits
dev2 ... main

Author SHA1 Message Date
Marsway 6566549a05 publish: version 0.1 2026-01-13 17:28:37 +08:00
1303 changed files with 487 additions and 1118 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
*.db *.db
*.log *.log
pgdata/ pgdata/
__pycache__/
*.pyc

View File

@ -1,15 +1,23 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime
from urllib.parse import quote_plus
from fastapi import APIRouter, HTTPException, Request from fastapi import APIRouter, HTTPException, Request
from starlette.responses import RedirectResponse from starlette.responses import RedirectResponse
from app.db import crud from app.db import crud
from app.db.engine import get_session from app.db.engine import get_session
from app.db.models import JobStatus
from app.tasks.execute import execute_job from app.tasks.execute import execute_job
router = APIRouter() router = APIRouter()
def _redirect_with_error(referer: str, msg: str) -> RedirectResponse:
sep = "&" if "?" in referer else "?"
return RedirectResponse(f"{referer}{sep}error={quote_plus(msg)}", status_code=303)
@router.post("/admin/joblogs/{log_id}/retry") @router.post("/admin/joblogs/{log_id}/retry")
def retry_joblog(request: Request, log_id: int): def retry_joblog(request: Request, log_id: int):
@ -18,20 +26,68 @@ def retry_joblog(request: Request, log_id: int):
log = crud.get_job_log(session, log_id) log = crud.get_job_log(session, log_id)
if not log: if not log:
raise HTTPException(status_code=404, detail="JobLog not found") raise HTTPException(status_code=404, detail="JobLog not found")
# 关键:用 snapshot_params 重新触发任务(其中 secret_cfg 仍为密文) if log.status == JobStatus.RUNNING:
execute_job.delay(snapshot_params=log.snapshot_params) referer = request.headers.get("Referer") or str(request.url_for("admin:details", identity="joblog", pk=str(log_id)))
return _redirect_with_error(referer, "该任务日志正在运行中,请结束后再重试。")
# 创建新的 RUNNING JobLog并跳转到该条详情页
snapshot = dict(log.snapshot_params or {})
meta = dict(snapshot.get("meta") or {})
meta["trigger"] = "retry"
meta["started_at"] = datetime.utcnow().isoformat()
snapshot["meta"] = meta
new_log = crud.create_job_log(
session,
job_id=str(log.job_id),
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id="",
attempt=0,
started_at=datetime.utcnow(),
finished_at=None,
)
execute_job.delay(snapshot_params=snapshot, log_id=int(new_log.id))
url = request.url_for("admin:details", identity="job-log", pk=str(new_log.id))
return RedirectResponse(url, status_code=303)
finally: finally:
session.close() session.close()
referer = request.headers.get("Referer") or "/admin"
return RedirectResponse(referer, status_code=303)
@router.post("/admin/jobs/{job_id}/run") @router.post("/admin/jobs/{job_id}/run")
def run_job(request: Request, job_id: str): def run_job(request: Request, job_id: str):
# 触发一次立即执行 session = get_session()
execute_job.delay(job_id=job_id) try:
referer = request.headers.get("Referer") or "/admin" job = crud.get_job(session, job_id)
return RedirectResponse(referer, status_code=303) if not job:
raise HTTPException(status_code=404, detail="Job not found")
snapshot = {
"job_id": job.id,
"handler_path": job.handler_path,
"public_cfg": job.public_cfg or {},
"secret_cfg": job.secret_cfg or "",
"meta": {"trigger": "run_now", "started_at": datetime.utcnow().isoformat()},
}
new_log = crud.create_job_log(
session,
job_id=str(job.id),
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id="",
attempt=0,
started_at=datetime.utcnow(),
finished_at=None,
)
execute_job.delay(job_id=job.id, log_id=int(new_log.id))
url = request.url_for("admin:details", identity="job-log", pk=str(new_log.id))
return RedirectResponse(url, status_code=303)
finally:
session.close()

View File

@ -17,6 +17,11 @@
{% endfor %}: {{ get_object_identifier(model) }} {% endfor %}: {{ get_object_identifier(model) }}
</h3> </h3>
</div> </div>
{% if request.query_params.get('error') %}
<div class="alert alert-danger m-3" role="alert">
{{ request.query_params.get('error') }}
</div>
{% endif %}
<div class="card-body border-bottom py-3"> <div class="card-body border-bottom py-3">
<div class="table-responsive"> <div class="table-responsive">
<table class="table card-table table-vcenter text-nowrap table-hover table-bordered"> <table class="table card-table table-vcenter text-nowrap table-hover table-bordered">

View File

@ -50,6 +50,11 @@
{% endif %} {% endif %}
</div> </div>
</div> </div>
{% if request.query_params.get('error') %}
<div class="alert alert-danger m-3" role="alert">
{{ request.query_params.get('error') }}
</div>
{% endif %}
<div class="card-body border-bottom py-3"> <div class="card-body border-bottom py-3">
<div class="d-flex justify-content-between"> <div class="d-flex justify-content-between">
<div class="dropdown col-4"> <div class="dropdown col-4">

View File

@ -3,14 +3,19 @@ from __future__ import annotations
import json import json
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any
from urllib.parse import quote_plus
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
from croniter import croniter from croniter import croniter
from markupsafe import Markup from markupsafe import Markup
from sqladmin import ModelView, action from sqladmin import ModelView, action
from sqladmin.filters import OperationColumnFilter
from sqladmin.models import Request from sqladmin.models import Request
from starlette.responses import RedirectResponse from starlette.responses import RedirectResponse
from app.db import crud
from app.db.engine import get_session
from app.db.models import JobStatus
from app.db.models import Job, JobLog from app.db.models import Job, JobLog
from app.plugins.manager import load_job_class from app.plugins.manager import load_job_class
from app.security.fernet import encrypt_json from app.security.fernet import encrypt_json
@ -48,6 +53,7 @@ class JobAdmin(ModelView, model=Job):
name = "任务" name = "任务"
name_plural = "任务" name_plural = "任务"
icon = "fa fa-cogs" icon = "fa fa-cogs"
can_delete = False
column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at] column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at]
column_details_list = [ column_details_list = [
@ -112,11 +118,128 @@ class JobAdmin(ModelView, model=Job):
) )
async def run_now(self, request: Request): # type: ignore[override] async def run_now(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",") pks = request.query_params.get("pks", "").split(",")
for pk in [p for p in pks if p]: ids = [p for p in pks if p]
execute_job.delay(job_id=pk) if not ids:
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
session = get_session()
created_log_id: int | None = None
try:
for pk in ids:
job = crud.get_job(session, pk)
if not job:
continue
snapshot = {
"job_id": job.id,
"handler_path": job.handler_path,
"public_cfg": job.public_cfg or {},
"secret_cfg": job.secret_cfg or "",
"meta": {"trigger": "admin_run_now"},
}
log = crud.create_job_log(
session,
job_id=job.id,
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id="",
attempt=0,
started_at=datetime.utcnow(),
finished_at=None,
)
if created_log_id is None:
created_log_id = int(log.id)
execute_job.delay(job_id=job.id, log_id=int(log.id))
finally:
session.close()
if created_log_id is not None:
url = request.url_for("admin:details", identity="job-log", pk=str(created_log_id))
return RedirectResponse(url, status_code=303)
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
@action(
name="view_logs",
label="查看日志",
add_in_list=True,
add_in_detail=True,
)
async def view_logs(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",")
pk = next((p for p in pks if p), "")
base = str(request.url_for("admin:list", identity="job-log"))
if pk:
return RedirectResponse(f"{base}?search={quote_plus(pk)}", status_code=303)
return RedirectResponse(base, status_code=303)
@action(
name="disable_job",
label="停用任务(保留日志)",
confirmation_message="确认停用该任务(保留历史日志)?",
add_in_list=True,
add_in_detail=False,
)
async def disable_job(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",")
ids = [p for p in pks if p]
session = get_session()
try:
for pk in ids:
job = crud.get_job(session, pk)
if not job:
continue
job.enabled = False
session.add(job)
session.commit()
finally:
session.close()
referer = request.headers.get("Referer") referer = request.headers.get("Referer")
return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303) return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
@action(
name="clear_job_logs",
label="清理日志(保留任务)",
confirmation_message="确认清理该任务的所有日志(保留任务本身)?",
add_in_list=True,
add_in_detail=False,
)
async def clear_job_logs(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",")
ids = [p for p in pks if p]
session = get_session()
try:
for pk in ids:
crud.delete_job_logs_by_job_id(session, pk)
finally:
session.close()
referer = request.headers.get("Referer")
return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
@action(
name="delete_job_with_logs",
label="删除任务(含日志)",
confirmation_message="确认删除该任务及其所有日志?此操作不可恢复。",
add_in_list=True,
add_in_detail=False,
)
async def delete_job_with_logs(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",")
ids = [p for p in pks if p]
session = get_session()
try:
for pk in ids:
job = crud.get_job(session, pk)
if not job:
continue
crud.delete_job_logs_by_job_id(session, job.id)
session.delete(job)
session.commit()
finally:
session.close()
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
async def on_model_change(self, data: dict, model: Job, is_created: bool, request) -> None: # type: ignore[override] async def on_model_change(self, data: dict, model: Job, is_created: bool, request) -> None: # type: ignore[override]
# id 必填(避免插入时触发 NOT NULL # id 必填(避免插入时触发 NOT NULL
raw_id = data.get("id") if is_created else (data.get("id") or getattr(model, "id", None)) raw_id = data.get("id") if is_created else (data.get("id") or getattr(model, "id", None))
@ -186,11 +309,15 @@ class JobLogAdmin(ModelView, model=JobLog):
name = "任务日志" name = "任务日志"
name_plural = "任务日志" name_plural = "任务日志"
icon = "fa fa-list" icon = "fa fa-list"
identity = "job-log"
can_create = False can_create = False
can_edit = False can_edit = False
can_delete = False can_delete = False
# 支持按 job_id 搜索(不启用筛选栏,避免页面溢出)
column_searchable_list = [JobLog.job_id]
# 列表更适合扫读:保留关键字段 + message截断 # 列表更适合扫读:保留关键字段 + message截断
column_list = [JobLog.id, JobLog.job_id, JobLog.status, JobLog.started_at, JobLog.finished_at, JobLog.message] column_list = [JobLog.id, JobLog.job_id, JobLog.status, JobLog.started_at, JobLog.finished_at, JobLog.message]
# 默认按 started_at 倒序(最新在前) # 默认按 started_at 倒序(最新在前)

View File

@ -1,10 +1,28 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
import os
from contextlib import contextmanager from contextlib import contextmanager
from typing import Callable, Iterator from typing import Callable, Iterator
class JobLogIdFilter(logging.Filter):
"""
仅允许写入属于指定 job_log_id LogRecord
依赖 setup_logging() 安装的 LogRecordFactory 注入字段connecthub_job_log_id
"""
def __init__(self, *, job_log_id: int) -> None:
super().__init__()
self.job_log_id = int(job_log_id)
def filter(self, record: logging.LogRecord) -> bool: # noqa: A003
try:
return getattr(record, "connecthub_job_log_id", None) == self.job_log_id
except Exception:
return False
class SafeBufferingHandler(logging.Handler): class SafeBufferingHandler(logging.Handler):
""" """
只用于尽力捕获运行日志 只用于尽力捕获运行日志
@ -52,7 +70,12 @@ class SafeBufferingHandler(logging.Handler):
@contextmanager @contextmanager
def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]: def capture_logs(
*,
max_bytes: int = 200_000,
job_log_id: int | None = None,
file_path: str | None = None,
) -> Iterator[Callable[[], str]]:
""" """
捕获当前进程root logger输出的日志文本 捕获当前进程root logger输出的日志文本
任何问题都不应影响业务执行 任何问题都不应影响业务执行
@ -64,6 +87,31 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S") logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
) )
file_handler: logging.Handler | None = None
flt: JobLogIdFilter | None = None
if job_log_id is not None:
try:
flt = JobLogIdFilter(job_log_id=int(job_log_id))
handler.addFilter(flt)
except Exception:
flt = None
if file_path:
try:
parent = os.path.dirname(file_path)
if parent:
os.makedirs(parent, exist_ok=True)
fh = logging.FileHandler(file_path, encoding="utf-8")
fh.setLevel(logging.INFO)
fh.setFormatter(
logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
)
if flt is not None:
fh.addFilter(flt)
file_handler = fh
except Exception:
file_handler = None
try: try:
root.addHandler(handler) root.addHandler(handler)
except Exception: except Exception:
@ -71,6 +119,16 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
yield lambda: "" yield lambda: ""
return return
if file_handler is not None:
try:
root.addHandler(file_handler)
except Exception:
try:
file_handler.close()
except Exception:
pass
file_handler = None
try: try:
yield handler.get_text yield handler.get_text
finally: finally:
@ -78,5 +136,14 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
root.removeHandler(handler) root.removeHandler(handler)
except Exception: except Exception:
pass pass
if file_handler is not None:
try:
root.removeHandler(file_handler)
except Exception:
pass
try:
file_handler.close()
except Exception:
pass

43
app/core/log_context.py Normal file
View File

@ -0,0 +1,43 @@
from __future__ import annotations
from contextvars import ContextVar, Token
_job_id_var: ContextVar[str | None] = ContextVar("connecthub_job_id", default=None)
_job_log_id_var: ContextVar[int | None] = ContextVar("connecthub_job_log_id", default=None)
JobContextTokens = tuple[Token[str | None], Token[int | None]]
def set_job_context(*, job_id: str | None, job_log_id: int | None) -> JobContextTokens:
"""
设置当前执行上下文用于日志隔离
返回 tokens便于在 finally reset 回原值
"""
t1 = _job_id_var.set(job_id)
t2 = _job_log_id_var.set(job_log_id)
return (t1, t2)
def clear_job_context(tokens: JobContextTokens | None = None) -> None:
"""
清理当前执行上下文
- 若提供 tokensreset 回进入上下文前的值推荐
- 否则直接置空
"""
if tokens is not None:
_job_id_var.reset(tokens[0])
_job_log_id_var.reset(tokens[1])
return
_job_id_var.set(None)
_job_log_id_var.set(None)
def get_job_id() -> str | None:
return _job_id_var.get()
def get_job_log_id() -> int | None:
return _job_log_id_var.get()

View File

@ -5,6 +5,7 @@ import os
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
from app.core.config import settings from app.core.config import settings
from app.core.log_context import get_job_id, get_job_log_id
def setup_logging() -> None: def setup_logging() -> None:
@ -12,6 +13,25 @@ def setup_logging() -> None:
if getattr(logger, "_connecthub_configured", False): if getattr(logger, "_connecthub_configured", False):
return return
# 为每条日志注入“当前任务上下文”,供 per-run 日志隔离过滤使用。
# 仅安装一次(跨所有 logger 生效)。
if not getattr(logging, "_connecthub_record_factory_installed", False):
old_factory = logging.getLogRecordFactory()
def _record_factory(*args, **kwargs): # type: ignore[no-untyped-def]
record = old_factory(*args, **kwargs)
try:
setattr(record, "connecthub_job_id", get_job_id())
setattr(record, "connecthub_job_log_id", get_job_log_id())
except Exception:
# best-effort任何问题都不能影响日志系统
setattr(record, "connecthub_job_id", None)
setattr(record, "connecthub_job_log_id", None)
return record
logging.setLogRecordFactory(_record_factory)
setattr(logging, "_connecthub_record_factory_installed", True)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
formatter = logging.Formatter( formatter = logging.Formatter(
fmt="%(asctime)s %(levelname)s %(name)s %(message)s", fmt="%(asctime)s %(levelname)s %(name)s %(message)s",

View File

@ -98,3 +98,11 @@ def update_job_log(
session.refresh(log) session.refresh(log)
return log return log
def delete_job_logs_by_job_id(session: Session, job_id: str) -> int:
logs = list(session.scalars(select(JobLog).where(JobLog.job_id == job_id)))
for log in logs:
session.delete(log)
session.commit()
return len(logs)

View File

@ -114,5 +114,3 @@ class SeeyonClient(BaseClient):
json=body, json=body,
headers={"Content-Type": "application/json"}, headers={"Content-Type": "application/json"},
) )

View File

@ -45,8 +45,17 @@ def tick() -> dict[str, int]:
last_min = _floor_to_minute(last.replace(tzinfo=tz)) last_min = _floor_to_minute(last.replace(tzinfo=tz))
else: else:
last_min = _floor_to_minute(last.astimezone(tz)) last_min = _floor_to_minute(last.astimezone(tz))
if last_min >= now_min: # 防御:若 last_run_at 被错误写成 UTC 等导致“在未来”,则忽略该值避免任务永久不触发
continue if last_min > now_min:
logger.warning(
"job.last_run_at appears in the future (ignored) job_id=%s last_run_at=%s now=%s",
job.id,
last,
now,
)
else:
if last_min >= now_min:
continue
# croniter 默认按传入 datetime 计算,这里用 Asia/Shanghai # croniter 默认按传入 datetime 计算,这里用 Asia/Shanghai
base = now_min - timedelta(minutes=1) base = now_min - timedelta(minutes=1)
@ -56,7 +65,8 @@ def tick() -> dict[str, int]:
continue continue
execute_job.delay(job_id=job.id) execute_job.delay(job_id=job.id)
crud.update_job_last_run_at(session, job.id, now_min.replace(tzinfo=None)) # 写入时保留 tz 信息,避免在 PostgreSQL timestamptz 中被误当 UTC 导致“未来 last_run_at”
crud.update_job_last_run_at(session, job.id, now_min)
triggered += 1 triggered += 1
except Exception: # noqa: BLE001 except Exception: # noqa: BLE001

View File

@ -1,12 +1,16 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
import os
import traceback as tb import traceback as tb
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any
from zoneinfo import ZoneInfo
from app.core.log_capture import capture_logs from app.core.log_capture import capture_logs
from app.core.logging import setup_logging from app.core.logging import setup_logging
from app.core.log_context import clear_job_context, set_job_context
from app.core.config import settings
from app.db import crud from app.db import crud
from app.db.engine import engine, get_session from app.db.engine import engine, get_session
from app.db.models import JobStatus from app.db.models import JobStatus
@ -55,8 +59,21 @@ def _compose_message(base_message: str, warning_lines: list[str]) -> str:
return msg return msg
def _safe_job_dir_name(job_id: str) -> str:
"""
job_id 映射为安全的目录名避免路径分隔符造成目录穿越/嵌套
"""
s = (job_id or "").strip() or "unknown"
return s.replace("/", "_").replace("\\", "_")
@celery_app.task(bind=True, name="connecthub.execute_job") @celery_app.task(bind=True, name="connecthub.execute_job")
def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any] | None = None) -> dict[str, Any]: def execute_job(
self,
job_id: str | None = None,
snapshot_params: dict[str, Any] | None = None,
log_id: int | None = None,
) -> dict[str, Any]:
""" """
通用执行入口 通用执行入口
- job_id DB 读取 Job 定义 - job_id DB 读取 Job 定义
@ -78,59 +95,78 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
traceback = "" traceback = ""
result: dict[str, Any] = {} result: dict[str, Any] = {}
run_log_text = "" run_log_text = ""
log_id: int | None = None job_log_id: int | None = log_id
celery_task_id = getattr(self.request, "id", "") or "" celery_task_id = getattr(self.request, "id", "") or ""
attempt = int(getattr(self.request, "retries", 0) or 0) attempt = int(getattr(self.request, "retries", 0) or 0)
snapshot: dict[str, Any] = {} snapshot: dict[str, Any] = {}
try: try:
with capture_logs(max_bytes=200_000) as get_run_log: if snapshot_params:
job_id = snapshot_params["job_id"]
handler_path = snapshot_params["handler_path"]
public_cfg = snapshot_params.get("public_cfg", {}) or {}
secret_token = snapshot_params.get("secret_cfg", "") or ""
else:
if not job_id:
raise ValueError("job_id or snapshot_params is required")
job = crud.get_job(session, job_id)
if not job:
raise ValueError(f"Job not found: {job_id}")
handler_path = job.handler_path
public_cfg = job.public_cfg or {}
secret_token = job.secret_cfg or ""
snapshot = snapshot_params or {
"job_id": job_id,
"handler_path": handler_path,
"public_cfg": public_cfg,
"secret_cfg": secret_token,
"meta": {
"trigger": "celery",
"celery_task_id": celery_task_id,
"started_at": started_at.isoformat(),
},
}
# 任务开始即落库一条 RUNNING 记录(若外部已传入 log_id则只更新该条若创建失败则降级为旧行为结束时再 create
if job_log_id is None:
try: try:
if snapshot_params: running = crud.create_job_log(
job_id = snapshot_params["job_id"] session,
handler_path = snapshot_params["handler_path"] job_id=str(job_id or ""),
public_cfg = snapshot_params.get("public_cfg", {}) or {} status=JobStatus.RUNNING,
secret_token = snapshot_params.get("secret_cfg", "") or "" snapshot_params=snapshot,
else: message="运行中",
if not job_id: traceback="",
raise ValueError("job_id or snapshot_params is required") run_log="",
job = crud.get_job(session, job_id) celery_task_id=celery_task_id,
if not job: attempt=attempt,
raise ValueError(f"Job not found: {job_id}") started_at=started_at,
handler_path = job.handler_path finished_at=None,
public_cfg = job.public_cfg or {} )
secret_token = job.secret_cfg or "" job_log_id = int(running.id)
except Exception:
job_log_id = None
snapshot = snapshot_params or { # per-run 全量日志落盘best-effort。若 job_log_id 缺失则无法保证唯一性,直接跳过。
"job_id": job_id, per_run_log_path: str | None = None
"handler_path": handler_path, if job_log_id is not None and job_id:
"public_cfg": public_cfg, try:
"secret_cfg": secret_token, log_root = settings.log_dir or os.path.join(settings.data_dir, "logs")
"meta": { job_dir = os.path.join(log_root, _safe_job_dir_name(str(job_id)))
"trigger": "celery", os.makedirs(job_dir, exist_ok=True)
"celery_task_id": celery_task_id, tz = ZoneInfo("Asia/Shanghai")
"started_at": started_at.isoformat(), ts = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")
}, per_run_log_path = os.path.join(job_dir, f"{ts}_log-{int(job_log_id)}.log")
} except Exception:
per_run_log_path = None
logger.warning("prepare per-run log file failed job_id=%s log_id=%s", job_id, job_log_id)
# 任务开始即落库一条 RUNNING 记录(若失败则降级为旧行为:结束时再 create ctx_tokens = None
try: with capture_logs(max_bytes=200_000, job_log_id=job_log_id, file_path=per_run_log_path) as get_run_log:
running = crud.create_job_log( try:
session, if job_log_id is not None and job_id:
job_id=str(job_id or ""), ctx_tokens = set_job_context(job_id=str(job_id), job_log_id=int(job_log_id))
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id=celery_task_id,
attempt=attempt,
started_at=started_at,
finished_at=None,
)
log_id = int(running.id)
except Exception:
log_id = None
secrets = decrypt_json(secret_token) secrets = decrypt_json(secret_token)
job_instance = instantiate(handler_path) job_instance = instantiate(handler_path)
@ -146,6 +182,11 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
traceback = tb.format_exc() traceback = tb.format_exc()
logger.exception("execute_job failed job_id=%s", job_id) logger.exception("execute_job failed job_id=%s", job_id)
finally: finally:
try:
clear_job_context(ctx_tokens)
except Exception:
# best-effort不能影响任务执行
pass
try: try:
run_log_text = get_run_log() or "" run_log_text = get_run_log() or ""
except Exception: except Exception:
@ -154,11 +195,11 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
finished_at = datetime.utcnow() finished_at = datetime.utcnow()
warning_lines = _extract_warning_lines(run_log_text) warning_lines = _extract_warning_lines(run_log_text)
message = _compose_message(message, warning_lines) message = _compose_message(message, warning_lines)
# 结束时:优先更新 RUNNING 那条;若没有则创建最终记录(兼容降级) # 结束时:优先更新 RUNNING 那条;若没有则创建最终记录
if log_id is not None: if job_log_id is not None:
crud.update_job_log( crud.update_job_log(
session, session,
log_id, job_log_id,
status=status, status=status,
message=message, message=message,
traceback=traceback, traceback=traceback,
@ -169,30 +210,30 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
) )
else: else:
if not snapshot: if not snapshot:
snapshot = snapshot_params or { snapshot = snapshot_params or {
"job_id": job_id, "job_id": job_id,
"handler_path": handler_path if "handler_path" in locals() else "", "handler_path": handler_path if "handler_path" in locals() else "",
"public_cfg": public_cfg if "public_cfg" in locals() else {}, "public_cfg": public_cfg if "public_cfg" in locals() else {},
"secret_cfg": secret_token if "secret_token" in locals() else "", "secret_cfg": secret_token if "secret_token" in locals() else "",
"meta": { "meta": {
"trigger": "celery", "trigger": "celery",
"celery_task_id": celery_task_id, "celery_task_id": celery_task_id,
"started_at": started_at.isoformat(), "started_at": started_at.isoformat(),
}, },
} }
crud.create_job_log( crud.create_job_log(
session, session,
job_id=str(job_id or ""), job_id=str(job_id or ""),
status=status, status=status,
snapshot_params=snapshot, snapshot_params=snapshot,
message=message, message=message,
traceback=traceback, traceback=traceback,
run_log=run_log_text, run_log=run_log_text,
celery_task_id=celery_task_id, celery_task_id=celery_task_id,
attempt=attempt, attempt=attempt,
started_at=started_at, started_at=started_at,
finished_at=finished_at, finished_at=finished_at,
) )
session.close() session.close()
return {"status": status.value, "job_id": job_id, "result": result, "message": message} return {"status": status.value, "job_id": job_id, "result": result, "message": message}

Binary file not shown.

View File

@ -1 +0,0 @@
16

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More