Compare commits
1 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
6566549a05 |
|
|
@ -1,3 +1,5 @@
|
||||||
*.db
|
*.db
|
||||||
*.log
|
*.log
|
||||||
pgdata/
|
pgdata/
|
||||||
|
__pycache__/
|
||||||
|
*.pyc
|
||||||
Binary file not shown.
|
|
@ -1,15 +1,23 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
from urllib.parse import quote_plus
|
||||||
|
|
||||||
from fastapi import APIRouter, HTTPException, Request
|
from fastapi import APIRouter, HTTPException, Request
|
||||||
from starlette.responses import RedirectResponse
|
from starlette.responses import RedirectResponse
|
||||||
|
|
||||||
from app.db import crud
|
from app.db import crud
|
||||||
from app.db.engine import get_session
|
from app.db.engine import get_session
|
||||||
|
from app.db.models import JobStatus
|
||||||
from app.tasks.execute import execute_job
|
from app.tasks.execute import execute_job
|
||||||
|
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
def _redirect_with_error(referer: str, msg: str) -> RedirectResponse:
|
||||||
|
sep = "&" if "?" in referer else "?"
|
||||||
|
return RedirectResponse(f"{referer}{sep}error={quote_plus(msg)}", status_code=303)
|
||||||
|
|
||||||
|
|
||||||
@router.post("/admin/joblogs/{log_id}/retry")
|
@router.post("/admin/joblogs/{log_id}/retry")
|
||||||
def retry_joblog(request: Request, log_id: int):
|
def retry_joblog(request: Request, log_id: int):
|
||||||
|
|
@ -18,20 +26,68 @@ def retry_joblog(request: Request, log_id: int):
|
||||||
log = crud.get_job_log(session, log_id)
|
log = crud.get_job_log(session, log_id)
|
||||||
if not log:
|
if not log:
|
||||||
raise HTTPException(status_code=404, detail="JobLog not found")
|
raise HTTPException(status_code=404, detail="JobLog not found")
|
||||||
# 关键:用 snapshot_params 重新触发任务(其中 secret_cfg 仍为密文)
|
if log.status == JobStatus.RUNNING:
|
||||||
execute_job.delay(snapshot_params=log.snapshot_params)
|
referer = request.headers.get("Referer") or str(request.url_for("admin:details", identity="joblog", pk=str(log_id)))
|
||||||
|
return _redirect_with_error(referer, "该任务日志正在运行中,请结束后再重试。")
|
||||||
|
|
||||||
|
# 创建新的 RUNNING JobLog,并跳转到该条详情页
|
||||||
|
snapshot = dict(log.snapshot_params or {})
|
||||||
|
meta = dict(snapshot.get("meta") or {})
|
||||||
|
meta["trigger"] = "retry"
|
||||||
|
meta["started_at"] = datetime.utcnow().isoformat()
|
||||||
|
snapshot["meta"] = meta
|
||||||
|
|
||||||
|
new_log = crud.create_job_log(
|
||||||
|
session,
|
||||||
|
job_id=str(log.job_id),
|
||||||
|
status=JobStatus.RUNNING,
|
||||||
|
snapshot_params=snapshot,
|
||||||
|
message="运行中",
|
||||||
|
traceback="",
|
||||||
|
run_log="",
|
||||||
|
celery_task_id="",
|
||||||
|
attempt=0,
|
||||||
|
started_at=datetime.utcnow(),
|
||||||
|
finished_at=None,
|
||||||
|
)
|
||||||
|
execute_job.delay(snapshot_params=snapshot, log_id=int(new_log.id))
|
||||||
|
url = request.url_for("admin:details", identity="job-log", pk=str(new_log.id))
|
||||||
|
return RedirectResponse(url, status_code=303)
|
||||||
finally:
|
finally:
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
referer = request.headers.get("Referer") or "/admin"
|
|
||||||
return RedirectResponse(referer, status_code=303)
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/admin/jobs/{job_id}/run")
|
@router.post("/admin/jobs/{job_id}/run")
|
||||||
def run_job(request: Request, job_id: str):
|
def run_job(request: Request, job_id: str):
|
||||||
# 触发一次立即执行
|
session = get_session()
|
||||||
execute_job.delay(job_id=job_id)
|
try:
|
||||||
referer = request.headers.get("Referer") or "/admin"
|
job = crud.get_job(session, job_id)
|
||||||
return RedirectResponse(referer, status_code=303)
|
if not job:
|
||||||
|
raise HTTPException(status_code=404, detail="Job not found")
|
||||||
|
|
||||||
|
snapshot = {
|
||||||
|
"job_id": job.id,
|
||||||
|
"handler_path": job.handler_path,
|
||||||
|
"public_cfg": job.public_cfg or {},
|
||||||
|
"secret_cfg": job.secret_cfg or "",
|
||||||
|
"meta": {"trigger": "run_now", "started_at": datetime.utcnow().isoformat()},
|
||||||
|
}
|
||||||
|
new_log = crud.create_job_log(
|
||||||
|
session,
|
||||||
|
job_id=str(job.id),
|
||||||
|
status=JobStatus.RUNNING,
|
||||||
|
snapshot_params=snapshot,
|
||||||
|
message="运行中",
|
||||||
|
traceback="",
|
||||||
|
run_log="",
|
||||||
|
celery_task_id="",
|
||||||
|
attempt=0,
|
||||||
|
started_at=datetime.utcnow(),
|
||||||
|
finished_at=None,
|
||||||
|
)
|
||||||
|
execute_job.delay(job_id=job.id, log_id=int(new_log.id))
|
||||||
|
url = request.url_for("admin:details", identity="job-log", pk=str(new_log.id))
|
||||||
|
return RedirectResponse(url, status_code=303)
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,11 @@
|
||||||
{% endfor %}: {{ get_object_identifier(model) }}
|
{% endfor %}: {{ get_object_identifier(model) }}
|
||||||
</h3>
|
</h3>
|
||||||
</div>
|
</div>
|
||||||
|
{% if request.query_params.get('error') %}
|
||||||
|
<div class="alert alert-danger m-3" role="alert">
|
||||||
|
{{ request.query_params.get('error') }}
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
<div class="card-body border-bottom py-3">
|
<div class="card-body border-bottom py-3">
|
||||||
<div class="table-responsive">
|
<div class="table-responsive">
|
||||||
<table class="table card-table table-vcenter text-nowrap table-hover table-bordered">
|
<table class="table card-table table-vcenter text-nowrap table-hover table-bordered">
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,11 @@
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
{% if request.query_params.get('error') %}
|
||||||
|
<div class="alert alert-danger m-3" role="alert">
|
||||||
|
{{ request.query_params.get('error') }}
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
<div class="card-body border-bottom py-3">
|
<div class="card-body border-bottom py-3">
|
||||||
<div class="d-flex justify-content-between">
|
<div class="d-flex justify-content-between">
|
||||||
<div class="dropdown col-4">
|
<div class="dropdown col-4">
|
||||||
|
|
|
||||||
|
|
@ -3,14 +3,19 @@ from __future__ import annotations
|
||||||
import json
|
import json
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
from urllib.parse import quote_plus
|
||||||
from zoneinfo import ZoneInfo
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
from croniter import croniter
|
from croniter import croniter
|
||||||
from markupsafe import Markup
|
from markupsafe import Markup
|
||||||
from sqladmin import ModelView, action
|
from sqladmin import ModelView, action
|
||||||
|
from sqladmin.filters import OperationColumnFilter
|
||||||
from sqladmin.models import Request
|
from sqladmin.models import Request
|
||||||
from starlette.responses import RedirectResponse
|
from starlette.responses import RedirectResponse
|
||||||
|
|
||||||
|
from app.db import crud
|
||||||
|
from app.db.engine import get_session
|
||||||
|
from app.db.models import JobStatus
|
||||||
from app.db.models import Job, JobLog
|
from app.db.models import Job, JobLog
|
||||||
from app.plugins.manager import load_job_class
|
from app.plugins.manager import load_job_class
|
||||||
from app.security.fernet import encrypt_json
|
from app.security.fernet import encrypt_json
|
||||||
|
|
@ -48,6 +53,7 @@ class JobAdmin(ModelView, model=Job):
|
||||||
name = "任务"
|
name = "任务"
|
||||||
name_plural = "任务"
|
name_plural = "任务"
|
||||||
icon = "fa fa-cogs"
|
icon = "fa fa-cogs"
|
||||||
|
can_delete = False
|
||||||
|
|
||||||
column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at]
|
column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at]
|
||||||
column_details_list = [
|
column_details_list = [
|
||||||
|
|
@ -112,11 +118,128 @@ class JobAdmin(ModelView, model=Job):
|
||||||
)
|
)
|
||||||
async def run_now(self, request: Request): # type: ignore[override]
|
async def run_now(self, request: Request): # type: ignore[override]
|
||||||
pks = request.query_params.get("pks", "").split(",")
|
pks = request.query_params.get("pks", "").split(",")
|
||||||
for pk in [p for p in pks if p]:
|
ids = [p for p in pks if p]
|
||||||
execute_job.delay(job_id=pk)
|
if not ids:
|
||||||
|
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
|
||||||
|
|
||||||
|
session = get_session()
|
||||||
|
created_log_id: int | None = None
|
||||||
|
try:
|
||||||
|
for pk in ids:
|
||||||
|
job = crud.get_job(session, pk)
|
||||||
|
if not job:
|
||||||
|
continue
|
||||||
|
snapshot = {
|
||||||
|
"job_id": job.id,
|
||||||
|
"handler_path": job.handler_path,
|
||||||
|
"public_cfg": job.public_cfg or {},
|
||||||
|
"secret_cfg": job.secret_cfg or "",
|
||||||
|
"meta": {"trigger": "admin_run_now"},
|
||||||
|
}
|
||||||
|
log = crud.create_job_log(
|
||||||
|
session,
|
||||||
|
job_id=job.id,
|
||||||
|
status=JobStatus.RUNNING,
|
||||||
|
snapshot_params=snapshot,
|
||||||
|
message="运行中",
|
||||||
|
traceback="",
|
||||||
|
run_log="",
|
||||||
|
celery_task_id="",
|
||||||
|
attempt=0,
|
||||||
|
started_at=datetime.utcnow(),
|
||||||
|
finished_at=None,
|
||||||
|
)
|
||||||
|
if created_log_id is None:
|
||||||
|
created_log_id = int(log.id)
|
||||||
|
execute_job.delay(job_id=job.id, log_id=int(log.id))
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
if created_log_id is not None:
|
||||||
|
url = request.url_for("admin:details", identity="job-log", pk=str(created_log_id))
|
||||||
|
return RedirectResponse(url, status_code=303)
|
||||||
|
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
|
||||||
|
|
||||||
|
@action(
|
||||||
|
name="view_logs",
|
||||||
|
label="查看日志",
|
||||||
|
add_in_list=True,
|
||||||
|
add_in_detail=True,
|
||||||
|
)
|
||||||
|
async def view_logs(self, request: Request): # type: ignore[override]
|
||||||
|
pks = request.query_params.get("pks", "").split(",")
|
||||||
|
pk = next((p for p in pks if p), "")
|
||||||
|
base = str(request.url_for("admin:list", identity="job-log"))
|
||||||
|
if pk:
|
||||||
|
return RedirectResponse(f"{base}?search={quote_plus(pk)}", status_code=303)
|
||||||
|
return RedirectResponse(base, status_code=303)
|
||||||
|
|
||||||
|
@action(
|
||||||
|
name="disable_job",
|
||||||
|
label="停用任务(保留日志)",
|
||||||
|
confirmation_message="确认停用该任务(保留历史日志)?",
|
||||||
|
add_in_list=True,
|
||||||
|
add_in_detail=False,
|
||||||
|
)
|
||||||
|
async def disable_job(self, request: Request): # type: ignore[override]
|
||||||
|
pks = request.query_params.get("pks", "").split(",")
|
||||||
|
ids = [p for p in pks if p]
|
||||||
|
session = get_session()
|
||||||
|
try:
|
||||||
|
for pk in ids:
|
||||||
|
job = crud.get_job(session, pk)
|
||||||
|
if not job:
|
||||||
|
continue
|
||||||
|
job.enabled = False
|
||||||
|
session.add(job)
|
||||||
|
session.commit()
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
referer = request.headers.get("Referer")
|
referer = request.headers.get("Referer")
|
||||||
return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
|
return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
|
||||||
|
|
||||||
|
@action(
|
||||||
|
name="clear_job_logs",
|
||||||
|
label="清理日志(保留任务)",
|
||||||
|
confirmation_message="确认清理该任务的所有日志(保留任务本身)?",
|
||||||
|
add_in_list=True,
|
||||||
|
add_in_detail=False,
|
||||||
|
)
|
||||||
|
async def clear_job_logs(self, request: Request): # type: ignore[override]
|
||||||
|
pks = request.query_params.get("pks", "").split(",")
|
||||||
|
ids = [p for p in pks if p]
|
||||||
|
session = get_session()
|
||||||
|
try:
|
||||||
|
for pk in ids:
|
||||||
|
crud.delete_job_logs_by_job_id(session, pk)
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
referer = request.headers.get("Referer")
|
||||||
|
return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
|
||||||
|
|
||||||
|
@action(
|
||||||
|
name="delete_job_with_logs",
|
||||||
|
label="删除任务(含日志)",
|
||||||
|
confirmation_message="确认删除该任务及其所有日志?此操作不可恢复。",
|
||||||
|
add_in_list=True,
|
||||||
|
add_in_detail=False,
|
||||||
|
)
|
||||||
|
async def delete_job_with_logs(self, request: Request): # type: ignore[override]
|
||||||
|
pks = request.query_params.get("pks", "").split(",")
|
||||||
|
ids = [p for p in pks if p]
|
||||||
|
session = get_session()
|
||||||
|
try:
|
||||||
|
for pk in ids:
|
||||||
|
job = crud.get_job(session, pk)
|
||||||
|
if not job:
|
||||||
|
continue
|
||||||
|
crud.delete_job_logs_by_job_id(session, job.id)
|
||||||
|
session.delete(job)
|
||||||
|
session.commit()
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
|
||||||
|
|
||||||
async def on_model_change(self, data: dict, model: Job, is_created: bool, request) -> None: # type: ignore[override]
|
async def on_model_change(self, data: dict, model: Job, is_created: bool, request) -> None: # type: ignore[override]
|
||||||
# id 必填(避免插入时触发 NOT NULL)
|
# id 必填(避免插入时触发 NOT NULL)
|
||||||
raw_id = data.get("id") if is_created else (data.get("id") or getattr(model, "id", None))
|
raw_id = data.get("id") if is_created else (data.get("id") or getattr(model, "id", None))
|
||||||
|
|
@ -186,11 +309,15 @@ class JobLogAdmin(ModelView, model=JobLog):
|
||||||
name = "任务日志"
|
name = "任务日志"
|
||||||
name_plural = "任务日志"
|
name_plural = "任务日志"
|
||||||
icon = "fa fa-list"
|
icon = "fa fa-list"
|
||||||
|
identity = "job-log"
|
||||||
|
|
||||||
can_create = False
|
can_create = False
|
||||||
can_edit = False
|
can_edit = False
|
||||||
can_delete = False
|
can_delete = False
|
||||||
|
|
||||||
|
# 支持按 job_id 搜索(不启用筛选栏,避免页面溢出)
|
||||||
|
column_searchable_list = [JobLog.job_id]
|
||||||
|
|
||||||
# 列表更适合扫读:保留关键字段 + message(截断)
|
# 列表更适合扫读:保留关键字段 + message(截断)
|
||||||
column_list = [JobLog.id, JobLog.job_id, JobLog.status, JobLog.started_at, JobLog.finished_at, JobLog.message]
|
column_list = [JobLog.id, JobLog.job_id, JobLog.status, JobLog.started_at, JobLog.finished_at, JobLog.message]
|
||||||
# 默认按 started_at 倒序(最新在前)
|
# 默认按 started_at 倒序(最新在前)
|
||||||
|
|
|
||||||
Binary file not shown.
|
|
@ -1,10 +1,28 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from typing import Callable, Iterator
|
from typing import Callable, Iterator
|
||||||
|
|
||||||
|
|
||||||
|
class JobLogIdFilter(logging.Filter):
|
||||||
|
"""
|
||||||
|
仅允许写入“属于指定 job_log_id”的 LogRecord。
|
||||||
|
依赖 setup_logging() 安装的 LogRecordFactory 注入字段:connecthub_job_log_id。
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *, job_log_id: int) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self.job_log_id = int(job_log_id)
|
||||||
|
|
||||||
|
def filter(self, record: logging.LogRecord) -> bool: # noqa: A003
|
||||||
|
try:
|
||||||
|
return getattr(record, "connecthub_job_log_id", None) == self.job_log_id
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
class SafeBufferingHandler(logging.Handler):
|
class SafeBufferingHandler(logging.Handler):
|
||||||
"""
|
"""
|
||||||
只用于“尽力捕获”运行日志:
|
只用于“尽力捕获”运行日志:
|
||||||
|
|
@ -52,7 +70,12 @@ class SafeBufferingHandler(logging.Handler):
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
|
def capture_logs(
|
||||||
|
*,
|
||||||
|
max_bytes: int = 200_000,
|
||||||
|
job_log_id: int | None = None,
|
||||||
|
file_path: str | None = None,
|
||||||
|
) -> Iterator[Callable[[], str]]:
|
||||||
"""
|
"""
|
||||||
捕获当前进程(root logger)输出的日志文本。
|
捕获当前进程(root logger)输出的日志文本。
|
||||||
任何问题都不应影响业务执行。
|
任何问题都不应影响业务执行。
|
||||||
|
|
@ -64,6 +87,31 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
|
||||||
logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
|
logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
file_handler: logging.Handler | None = None
|
||||||
|
flt: JobLogIdFilter | None = None
|
||||||
|
if job_log_id is not None:
|
||||||
|
try:
|
||||||
|
flt = JobLogIdFilter(job_log_id=int(job_log_id))
|
||||||
|
handler.addFilter(flt)
|
||||||
|
except Exception:
|
||||||
|
flt = None
|
||||||
|
|
||||||
|
if file_path:
|
||||||
|
try:
|
||||||
|
parent = os.path.dirname(file_path)
|
||||||
|
if parent:
|
||||||
|
os.makedirs(parent, exist_ok=True)
|
||||||
|
fh = logging.FileHandler(file_path, encoding="utf-8")
|
||||||
|
fh.setLevel(logging.INFO)
|
||||||
|
fh.setFormatter(
|
||||||
|
logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
|
||||||
|
)
|
||||||
|
if flt is not None:
|
||||||
|
fh.addFilter(flt)
|
||||||
|
file_handler = fh
|
||||||
|
except Exception:
|
||||||
|
file_handler = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
root.addHandler(handler)
|
root.addHandler(handler)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|
@ -71,6 +119,16 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
|
||||||
yield lambda: ""
|
yield lambda: ""
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if file_handler is not None:
|
||||||
|
try:
|
||||||
|
root.addHandler(file_handler)
|
||||||
|
except Exception:
|
||||||
|
try:
|
||||||
|
file_handler.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
file_handler = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield handler.get_text
|
yield handler.get_text
|
||||||
finally:
|
finally:
|
||||||
|
|
@ -78,5 +136,14 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
|
||||||
root.removeHandler(handler)
|
root.removeHandler(handler)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
if file_handler is not None:
|
||||||
|
try:
|
||||||
|
root.removeHandler(file_handler)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
file_handler.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,43 @@
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from contextvars import ContextVar, Token
|
||||||
|
|
||||||
|
|
||||||
|
_job_id_var: ContextVar[str | None] = ContextVar("connecthub_job_id", default=None)
|
||||||
|
_job_log_id_var: ContextVar[int | None] = ContextVar("connecthub_job_log_id", default=None)
|
||||||
|
|
||||||
|
|
||||||
|
JobContextTokens = tuple[Token[str | None], Token[int | None]]
|
||||||
|
|
||||||
|
|
||||||
|
def set_job_context(*, job_id: str | None, job_log_id: int | None) -> JobContextTokens:
|
||||||
|
"""
|
||||||
|
设置当前执行上下文(用于日志隔离)。
|
||||||
|
返回 tokens,便于在 finally 中 reset 回原值。
|
||||||
|
"""
|
||||||
|
t1 = _job_id_var.set(job_id)
|
||||||
|
t2 = _job_log_id_var.set(job_log_id)
|
||||||
|
return (t1, t2)
|
||||||
|
|
||||||
|
|
||||||
|
def clear_job_context(tokens: JobContextTokens | None = None) -> None:
|
||||||
|
"""
|
||||||
|
清理当前执行上下文。
|
||||||
|
- 若提供 tokens:reset 回进入上下文前的值(推荐)
|
||||||
|
- 否则:直接置空
|
||||||
|
"""
|
||||||
|
if tokens is not None:
|
||||||
|
_job_id_var.reset(tokens[0])
|
||||||
|
_job_log_id_var.reset(tokens[1])
|
||||||
|
return
|
||||||
|
_job_id_var.set(None)
|
||||||
|
_job_log_id_var.set(None)
|
||||||
|
|
||||||
|
|
||||||
|
def get_job_id() -> str | None:
|
||||||
|
return _job_id_var.get()
|
||||||
|
|
||||||
|
|
||||||
|
def get_job_log_id() -> int | None:
|
||||||
|
return _job_log_id_var.get()
|
||||||
|
|
||||||
|
|
@ -5,6 +5,7 @@ import os
|
||||||
from logging.handlers import RotatingFileHandler
|
from logging.handlers import RotatingFileHandler
|
||||||
|
|
||||||
from app.core.config import settings
|
from app.core.config import settings
|
||||||
|
from app.core.log_context import get_job_id, get_job_log_id
|
||||||
|
|
||||||
|
|
||||||
def setup_logging() -> None:
|
def setup_logging() -> None:
|
||||||
|
|
@ -12,6 +13,25 @@ def setup_logging() -> None:
|
||||||
if getattr(logger, "_connecthub_configured", False):
|
if getattr(logger, "_connecthub_configured", False):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# 为每条日志注入“当前任务上下文”,供 per-run 日志隔离过滤使用。
|
||||||
|
# 仅安装一次(跨所有 logger 生效)。
|
||||||
|
if not getattr(logging, "_connecthub_record_factory_installed", False):
|
||||||
|
old_factory = logging.getLogRecordFactory()
|
||||||
|
|
||||||
|
def _record_factory(*args, **kwargs): # type: ignore[no-untyped-def]
|
||||||
|
record = old_factory(*args, **kwargs)
|
||||||
|
try:
|
||||||
|
setattr(record, "connecthub_job_id", get_job_id())
|
||||||
|
setattr(record, "connecthub_job_log_id", get_job_log_id())
|
||||||
|
except Exception:
|
||||||
|
# best-effort:任何问题都不能影响日志系统
|
||||||
|
setattr(record, "connecthub_job_id", None)
|
||||||
|
setattr(record, "connecthub_job_log_id", None)
|
||||||
|
return record
|
||||||
|
|
||||||
|
logging.setLogRecordFactory(_record_factory)
|
||||||
|
setattr(logging, "_connecthub_record_factory_installed", True)
|
||||||
|
|
||||||
logger.setLevel(logging.INFO)
|
logger.setLevel(logging.INFO)
|
||||||
formatter = logging.Formatter(
|
formatter = logging.Formatter(
|
||||||
fmt="%(asctime)s %(levelname)s %(name)s %(message)s",
|
fmt="%(asctime)s %(levelname)s %(name)s %(message)s",
|
||||||
|
|
|
||||||
Binary file not shown.
Binary file not shown.
|
|
@ -98,3 +98,11 @@ def update_job_log(
|
||||||
session.refresh(log)
|
session.refresh(log)
|
||||||
return log
|
return log
|
||||||
|
|
||||||
|
|
||||||
|
def delete_job_logs_by_job_id(session: Session, job_id: str) -> int:
|
||||||
|
logs = list(session.scalars(select(JobLog).where(JobLog.job_id == job_id)))
|
||||||
|
for log in logs:
|
||||||
|
session.delete(log)
|
||||||
|
session.commit()
|
||||||
|
return len(logs)
|
||||||
|
|
||||||
|
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -114,5 +114,3 @@ class SeeyonClient(BaseClient):
|
||||||
json=body,
|
json=body,
|
||||||
headers={"Content-Type": "application/json"},
|
headers={"Content-Type": "application/json"},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Binary file not shown.
|
|
@ -45,6 +45,15 @@ def tick() -> dict[str, int]:
|
||||||
last_min = _floor_to_minute(last.replace(tzinfo=tz))
|
last_min = _floor_to_minute(last.replace(tzinfo=tz))
|
||||||
else:
|
else:
|
||||||
last_min = _floor_to_minute(last.astimezone(tz))
|
last_min = _floor_to_minute(last.astimezone(tz))
|
||||||
|
# 防御:若 last_run_at 被错误写成 UTC 等导致“在未来”,则忽略该值避免任务永久不触发
|
||||||
|
if last_min > now_min:
|
||||||
|
logger.warning(
|
||||||
|
"job.last_run_at appears in the future (ignored) job_id=%s last_run_at=%s now=%s",
|
||||||
|
job.id,
|
||||||
|
last,
|
||||||
|
now,
|
||||||
|
)
|
||||||
|
else:
|
||||||
if last_min >= now_min:
|
if last_min >= now_min:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
@ -56,7 +65,8 @@ def tick() -> dict[str, int]:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
execute_job.delay(job_id=job.id)
|
execute_job.delay(job_id=job.id)
|
||||||
crud.update_job_last_run_at(session, job.id, now_min.replace(tzinfo=None))
|
# 写入时保留 tz 信息,避免在 PostgreSQL timestamptz 中被误当 UTC 导致“未来 last_run_at”
|
||||||
|
crud.update_job_last_run_at(session, job.id, now_min)
|
||||||
triggered += 1
|
triggered += 1
|
||||||
|
|
||||||
except Exception: # noqa: BLE001
|
except Exception: # noqa: BLE001
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,16 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import traceback as tb
|
import traceback as tb
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
from app.core.log_capture import capture_logs
|
from app.core.log_capture import capture_logs
|
||||||
from app.core.logging import setup_logging
|
from app.core.logging import setup_logging
|
||||||
|
from app.core.log_context import clear_job_context, set_job_context
|
||||||
|
from app.core.config import settings
|
||||||
from app.db import crud
|
from app.db import crud
|
||||||
from app.db.engine import engine, get_session
|
from app.db.engine import engine, get_session
|
||||||
from app.db.models import JobStatus
|
from app.db.models import JobStatus
|
||||||
|
|
@ -55,8 +59,21 @@ def _compose_message(base_message: str, warning_lines: list[str]) -> str:
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_job_dir_name(job_id: str) -> str:
|
||||||
|
"""
|
||||||
|
将 job_id 映射为安全的目录名(避免路径分隔符造成目录穿越/嵌套)。
|
||||||
|
"""
|
||||||
|
s = (job_id or "").strip() or "unknown"
|
||||||
|
return s.replace("/", "_").replace("\\", "_")
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(bind=True, name="connecthub.execute_job")
|
@celery_app.task(bind=True, name="connecthub.execute_job")
|
||||||
def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any] | None = None) -> dict[str, Any]:
|
def execute_job(
|
||||||
|
self,
|
||||||
|
job_id: str | None = None,
|
||||||
|
snapshot_params: dict[str, Any] | None = None,
|
||||||
|
log_id: int | None = None,
|
||||||
|
) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
通用执行入口:
|
通用执行入口:
|
||||||
- 传 job_id:从 DB 读取 Job 定义
|
- 传 job_id:从 DB 读取 Job 定义
|
||||||
|
|
@ -78,13 +95,11 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
|
||||||
traceback = ""
|
traceback = ""
|
||||||
result: dict[str, Any] = {}
|
result: dict[str, Any] = {}
|
||||||
run_log_text = ""
|
run_log_text = ""
|
||||||
log_id: int | None = None
|
job_log_id: int | None = log_id
|
||||||
celery_task_id = getattr(self.request, "id", "") or ""
|
celery_task_id = getattr(self.request, "id", "") or ""
|
||||||
attempt = int(getattr(self.request, "retries", 0) or 0)
|
attempt = int(getattr(self.request, "retries", 0) or 0)
|
||||||
snapshot: dict[str, Any] = {}
|
snapshot: dict[str, Any] = {}
|
||||||
|
|
||||||
try:
|
|
||||||
with capture_logs(max_bytes=200_000) as get_run_log:
|
|
||||||
try:
|
try:
|
||||||
if snapshot_params:
|
if snapshot_params:
|
||||||
job_id = snapshot_params["job_id"]
|
job_id = snapshot_params["job_id"]
|
||||||
|
|
@ -113,7 +128,8 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
# 任务开始即落库一条 RUNNING 记录(若失败则降级为旧行为:结束时再 create)
|
# 任务开始即落库一条 RUNNING 记录(若外部已传入 log_id,则只更新该条;若创建失败则降级为旧行为:结束时再 create)
|
||||||
|
if job_log_id is None:
|
||||||
try:
|
try:
|
||||||
running = crud.create_job_log(
|
running = crud.create_job_log(
|
||||||
session,
|
session,
|
||||||
|
|
@ -128,9 +144,29 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
|
||||||
started_at=started_at,
|
started_at=started_at,
|
||||||
finished_at=None,
|
finished_at=None,
|
||||||
)
|
)
|
||||||
log_id = int(running.id)
|
job_log_id = int(running.id)
|
||||||
except Exception:
|
except Exception:
|
||||||
log_id = None
|
job_log_id = None
|
||||||
|
|
||||||
|
# per-run 全量日志落盘(best-effort)。若 job_log_id 缺失则无法保证唯一性,直接跳过。
|
||||||
|
per_run_log_path: str | None = None
|
||||||
|
if job_log_id is not None and job_id:
|
||||||
|
try:
|
||||||
|
log_root = settings.log_dir or os.path.join(settings.data_dir, "logs")
|
||||||
|
job_dir = os.path.join(log_root, _safe_job_dir_name(str(job_id)))
|
||||||
|
os.makedirs(job_dir, exist_ok=True)
|
||||||
|
tz = ZoneInfo("Asia/Shanghai")
|
||||||
|
ts = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")
|
||||||
|
per_run_log_path = os.path.join(job_dir, f"{ts}_log-{int(job_log_id)}.log")
|
||||||
|
except Exception:
|
||||||
|
per_run_log_path = None
|
||||||
|
logger.warning("prepare per-run log file failed job_id=%s log_id=%s", job_id, job_log_id)
|
||||||
|
|
||||||
|
ctx_tokens = None
|
||||||
|
with capture_logs(max_bytes=200_000, job_log_id=job_log_id, file_path=per_run_log_path) as get_run_log:
|
||||||
|
try:
|
||||||
|
if job_log_id is not None and job_id:
|
||||||
|
ctx_tokens = set_job_context(job_id=str(job_id), job_log_id=int(job_log_id))
|
||||||
|
|
||||||
secrets = decrypt_json(secret_token)
|
secrets = decrypt_json(secret_token)
|
||||||
job_instance = instantiate(handler_path)
|
job_instance = instantiate(handler_path)
|
||||||
|
|
@ -146,6 +182,11 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
|
||||||
traceback = tb.format_exc()
|
traceback = tb.format_exc()
|
||||||
logger.exception("execute_job failed job_id=%s", job_id)
|
logger.exception("execute_job failed job_id=%s", job_id)
|
||||||
finally:
|
finally:
|
||||||
|
try:
|
||||||
|
clear_job_context(ctx_tokens)
|
||||||
|
except Exception:
|
||||||
|
# best-effort:不能影响任务执行
|
||||||
|
pass
|
||||||
try:
|
try:
|
||||||
run_log_text = get_run_log() or ""
|
run_log_text = get_run_log() or ""
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|
@ -154,11 +195,11 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
|
||||||
finished_at = datetime.utcnow()
|
finished_at = datetime.utcnow()
|
||||||
warning_lines = _extract_warning_lines(run_log_text)
|
warning_lines = _extract_warning_lines(run_log_text)
|
||||||
message = _compose_message(message, warning_lines)
|
message = _compose_message(message, warning_lines)
|
||||||
# 结束时:优先更新 RUNNING 那条;若没有则创建最终记录(兼容降级)
|
# 结束时:优先更新 RUNNING 那条;若没有则创建最终记录
|
||||||
if log_id is not None:
|
if job_log_id is not None:
|
||||||
crud.update_job_log(
|
crud.update_job_log(
|
||||||
session,
|
session,
|
||||||
log_id,
|
job_log_id,
|
||||||
status=status,
|
status=status,
|
||||||
message=message,
|
message=message,
|
||||||
traceback=traceback,
|
traceback=traceback,
|
||||||
|
|
|
||||||
Binary file not shown.
|
|
@ -1 +0,0 @@
|
||||||
16
|
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue