from __future__ import annotations

import logging
import traceback as tb
from datetime import datetime
from typing import Any

from app.core.log_capture import capture_logs
from app.core.logging import setup_logging
from app.db import crud
from app.db.engine import engine, get_session
from app.db.models import JobStatus
from app.db.schema import ensure_schema
from app.plugins.manager import instantiate
from app.security.fernet import decrypt_json
from app.tasks.celery_app import celery_app

logger = logging.getLogger("connecthub.tasks.execute")


@celery_app.task(bind=True, name="connecthub.execute_job")
def execute_job(
    self,
    job_id: str | None = None,
    snapshot_params: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Generic execution entry point for a job.

    Two ways to call it:

    - Pass ``job_id``: the Job definition is loaded from the DB.
    - Pass ``snapshot_params``: the job is re-run from a snapshot (used by
      the Admin one-click retry). When truthy it takes precedence over
      ``job_id`` and must contain ``"job_id"`` and ``"handler_path"``;
      ``"public_cfg"`` and ``"secret_cfg"`` are optional.

    Any exception raised while resolving or running the job is caught,
    logged, and recorded as a FAILURE; a job log row is written in either
    case via ``crud.create_job_log``.

    Args:
        job_id: Identifier of the job to load from the DB.
        snapshot_params: Snapshot of a previous run to replay.

    Returns:
        A dict with the keys ``status`` (``JobStatus`` value), ``job_id``,
        ``result`` (the handler's return value if it was a dict, otherwise
        ``{}``) and ``message`` (``"OK"`` or ``repr`` of the exception).

    Raises:
        Exception: Errors from ``get_session()`` or from
            ``crud.create_job_log`` are not handled here and propagate.
    """
    setup_logging()

    # Ensure the schema is upgraded (so DB writes do not fail even if the
    # worker starts before the migration has been applied elsewhere).
    try:
        ensure_schema(engine)
    except Exception:  # noqa: BLE001
        # A failed schema upgrade must not block execution (worst case the
        # run_log cannot be written), but it should not be invisible either.
        logger.warning("ensure_schema failed; continuing", exc_info=True)

    # NOTE(review): naive UTC timestamps are kept as in the original; the
    # DB column's timezone handling is not visible from this file.
    started_at = datetime.utcnow()
    session = get_session()

    status = JobStatus.SUCCESS
    message = ""
    traceback_text = ""
    result: dict[str, Any] = {}
    run_log_text = ""

    # Initialized up front so the snapshot written in ``finally`` is
    # well-defined even when resolution fails before they are assigned.
    handler_path = ""
    public_cfg: dict[str, Any] = {}
    secret_token = ""

    try:
        with capture_logs(max_bytes=200_000) as get_run_log:
            try:
                if snapshot_params:
                    job_id = snapshot_params["job_id"]
                    handler_path = snapshot_params["handler_path"]
                    public_cfg = snapshot_params.get("public_cfg", {}) or {}
                    secret_token = snapshot_params.get("secret_cfg", "") or ""
                else:
                    if not job_id:
                        raise ValueError("job_id or snapshot_params is required")
                    job = crud.get_job(session, job_id)
                    if not job:
                        raise ValueError(f"Job not found: {job_id}")
                    handler_path = job.handler_path
                    public_cfg = job.public_cfg or {}
                    secret_token = job.secret_cfg or ""

                secrets = decrypt_json(secret_token)
                job_instance = instantiate(handler_path)
                out = job_instance.run(params=public_cfg, secrets=secrets)
                # Only dict results are surfaced; anything else is dropped.
                if isinstance(out, dict):
                    result = out
                message = "OK"
            except Exception as e:  # noqa: BLE001 (framework-wide)
                # If triggered by a Celery retry, the framework could be
                # extended here to retry automatically; this version only
                # records the failure.
                status = JobStatus.FAILURE
                message = repr(e)
                traceback_text = tb.format_exc()
                logger.exception("execute_job failed job_id=%s", job_id)
            finally:
                # Collect captured logs while the capture context is still
                # active; log retrieval is best-effort.
                try:
                    run_log_text = get_run_log() or ""
                except Exception:  # noqa: BLE001
                    run_log_text = ""
    finally:
        finished_at = datetime.utcnow()
        celery_task_id = getattr(self.request, "id", "") or ""
        # A provided snapshot is stored as-is; otherwise build one from
        # whatever was resolved before success or failure.
        snapshot = snapshot_params or {
            "job_id": job_id,
            "handler_path": handler_path,
            "public_cfg": public_cfg,
            "secret_cfg": secret_token,
            "meta": {
                "trigger": "celery",
                "celery_task_id": celery_task_id,
                "started_at": started_at.isoformat(),
            },
        }
        try:
            crud.create_job_log(
                session,
                job_id=str(job_id or ""),
                status=status,
                snapshot_params=snapshot,
                message=message,
                traceback=traceback_text,
                run_log=run_log_text,
                celery_task_id=celery_task_id,
                attempt=int(getattr(self.request, "retries", 0) or 0),
                started_at=started_at,
                finished_at=finished_at,
            )
        finally:
            # Always release the session, even if writing the log row fails.
            session.close()

    return {
        "status": status.value,
        "job_id": job_id,
        "result": result,
        "message": message,
    }