From 25eb7fdacec14045b3df6cd5dbc5c9d617011016 Mon Sep 17 00:00:00 2001 From: Marsway Date: Tue, 13 Jan 2026 15:39:08 +0800 Subject: [PATCH] update --- Log.md | 34 ++ app/core/log_capture.py | 69 +++- app/core/log_context.py | 43 +++ app/core/logging.py | 20 + app/tasks/execute.py | 127 ++++--- docs/connecthub_功能与部署指南.md | 603 ------------------------------ 6 files changed, 246 insertions(+), 650 deletions(-) create mode 100644 Log.md create mode 100644 app/core/log_context.py delete mode 100644 docs/connecthub_功能与部署指南.md diff --git a/Log.md b/Log.md new file mode 100644 index 0000000..fc57c06 --- /dev/null +++ b/Log.md @@ -0,0 +1,34 @@ +# Job 执行全量日志(落盘) + +本项目在任务执行时,会将该次执行的 **全量日志** 落盘到 `data/logs`(容器内为 `/data/logs`)下,便于排障与留存。 + +## 1. 路径规则 + +- 根目录:`./data/logs/` +- 按任务分目录:`./data/logs//` +- 单次执行日志文件名: + - `YYYY-MM-DD_HH-mm-ss_log-.log` + - 时间戳使用 **Asia/Shanghai**(与调度时区一致) + +示例: + +```text +./data/logs/sync_oa_to_didi.sync_legal_entity/2026-01-13_10-20-33_log-1234.log +``` + +> 注意:若 `job_id` 中包含路径分隔符(`/` 或 `\`),会被替换为 `_`,避免产生目录穿越或多级目录。 + +## 2. 与 Admin 的 JobLog.run_log 的区别 + +- **`JobLog.run_log`(入库)** + - 展示在 Admin 的 JobLog 详情页 + - 有字节上限(会截断),适合快速浏览 +- **`data/logs/.../*.log`(落盘全量)** + - 尽力写入,不做长度截断 + - 适合完整排障、留存、归档 + +## 3. 运维建议 + +- 建议将 `./data/logs` 纳入备份或日志归档策略(按业务合规要求)。 +- 如需自动清理(防止无限增长),推荐在宿主机使用 `logrotate` 或定时任务做保留策略(本项目不内置自动清理逻辑)。 + diff --git a/app/core/log_capture.py b/app/core/log_capture.py index 46dd4b6..c3fe0d5 100644 --- a/app/core/log_capture.py +++ b/app/core/log_capture.py @@ -1,10 +1,28 @@ from __future__ import annotations import logging +import os from contextlib import contextmanager from typing import Callable, Iterator +class JobLogIdFilter(logging.Filter): + """ + 仅允许写入“属于指定 job_log_id”的 LogRecord。 + 依赖 setup_logging() 安装的 LogRecordFactory 注入字段:connecthub_job_log_id。 + """ + + def __init__(self, *, job_log_id: int) -> None: + super().__init__() + self.job_log_id = int(job_log_id) + + def filter(self, record: logging.LogRecord) -> bool: # noqa: A003 + try: + return getattr(record, "connecthub_job_log_id", None) == self.job_log_id + except Exception: + return False + + class SafeBufferingHandler(logging.Handler): """ 只用于“尽力捕获”运行日志: @@ -52,7 +70,12 @@ class SafeBufferingHandler(logging.Handler): @contextmanager -def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]: +def capture_logs( + *, + max_bytes: int = 200_000, + job_log_id: int | None = None, + file_path: str | None = None, +) -> Iterator[Callable[[], str]]: """ 捕获当前进程(root logger)输出的日志文本。 任何问题都不应影响业务执行。 @@ -64,6 +87,31 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]: logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S") ) + file_handler: logging.Handler | None = None + flt: JobLogIdFilter | None = None + if job_log_id is not None: + try: + flt = JobLogIdFilter(job_log_id=int(job_log_id)) + handler.addFilter(flt) + except Exception: + flt = None + + if file_path: + try: + parent = os.path.dirname(file_path) + if parent: + os.makedirs(parent, exist_ok=True) + fh = logging.FileHandler(file_path, encoding="utf-8") + fh.setLevel(logging.INFO) + fh.setFormatter( + logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S") + ) + if flt is not None: + fh.addFilter(flt) + file_handler = fh + except Exception: + file_handler = None + try: root.addHandler(handler) except Exception: @@ -71,6 +119,16 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]: yield lambda: "" return + if file_handler is not None: + try: + root.addHandler(file_handler) + except Exception: + try: + file_handler.close() + except Exception: + pass + file_handler = None + try: yield handler.get_text finally: @@ -78,5 +136,14 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]: root.removeHandler(handler) except Exception: pass + if file_handler is not None: + try: + root.removeHandler(file_handler) + except Exception: + pass + try: + file_handler.close() + except Exception: + pass diff --git a/app/core/log_context.py b/app/core/log_context.py new file mode 100644 index 0000000..80f6dd3 --- /dev/null +++ b/app/core/log_context.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from contextvars import ContextVar, Token + + +_job_id_var: ContextVar[str | None] = ContextVar("connecthub_job_id", default=None) +_job_log_id_var: ContextVar[int | None] = ContextVar("connecthub_job_log_id", default=None) + + +JobContextTokens = tuple[Token[str | None], Token[int | None]] + + +def set_job_context(*, job_id: str | None, job_log_id: int | None) -> JobContextTokens: + """ + 设置当前执行上下文(用于日志隔离)。 + 返回 tokens,便于在 finally 中 reset 回原值。 + """ + t1 = _job_id_var.set(job_id) + t2 = _job_log_id_var.set(job_log_id) + return (t1, t2) + + +def clear_job_context(tokens: JobContextTokens | None = None) -> None: + """ + 清理当前执行上下文。 + - 若提供 tokens:reset 回进入上下文前的值(推荐) + - 否则:直接置空 + """ + if tokens is not None: + _job_id_var.reset(tokens[0]) + _job_log_id_var.reset(tokens[1]) + return + _job_id_var.set(None) + _job_log_id_var.set(None) + + +def get_job_id() -> str | None: + return _job_id_var.get() + + +def get_job_log_id() -> int | None: + return _job_log_id_var.get() + diff --git a/app/core/logging.py b/app/core/logging.py index 1fa274a..454fbc6 100644 --- a/app/core/logging.py +++ b/app/core/logging.py @@ -5,6 +5,7 @@ import os from logging.handlers import RotatingFileHandler from app.core.config import settings +from app.core.log_context import get_job_id, get_job_log_id def setup_logging() -> None: @@ -12,6 +13,25 @@ def setup_logging() -> None: if getattr(logger, "_connecthub_configured", False): return + # 为每条日志注入“当前任务上下文”,供 per-run 日志隔离过滤使用。 + # 仅安装一次(跨所有 logger 生效)。 + if not getattr(logging, "_connecthub_record_factory_installed", False): + old_factory = logging.getLogRecordFactory() + + def _record_factory(*args, **kwargs): # type: ignore[no-untyped-def] + record = old_factory(*args, **kwargs) + try: + setattr(record, "connecthub_job_id", get_job_id()) + setattr(record, "connecthub_job_log_id", get_job_log_id()) + except Exception: + # best-effort:任何问题都不能影响日志系统 + setattr(record, "connecthub_job_id", None) + setattr(record, "connecthub_job_log_id", None) + return record + + logging.setLogRecordFactory(_record_factory) + setattr(logging, "_connecthub_record_factory_installed", True) + logger.setLevel(logging.INFO) formatter = logging.Formatter( fmt="%(asctime)s %(levelname)s %(name)s %(message)s", diff --git a/app/tasks/execute.py b/app/tasks/execute.py index f312a2a..60bd66f 100644 --- a/app/tasks/execute.py +++ b/app/tasks/execute.py @@ -1,12 +1,16 @@ from __future__ import annotations import logging +import os import traceback as tb from datetime import datetime from typing import Any +from zoneinfo import ZoneInfo from app.core.log_capture import capture_logs from app.core.logging import setup_logging +from app.core.log_context import clear_job_context, set_job_context +from app.core.config import settings from app.db import crud from app.db.engine import engine, get_session from app.db.models import JobStatus @@ -55,6 +59,14 @@ def _compose_message(base_message: str, warning_lines: list[str]) -> str: return msg +def _safe_job_dir_name(job_id: str) -> str: + """ + 将 job_id 映射为安全的目录名(避免路径分隔符造成目录穿越/嵌套)。 + """ + s = (job_id or "").strip() or "unknown" + return s.replace("/", "_").replace("\\", "_") + + @celery_app.task(bind=True, name="connecthub.execute_job") def execute_job( self, @@ -89,54 +101,72 @@ def execute_job( snapshot: dict[str, Any] = {} try: - with capture_logs(max_bytes=200_000) as get_run_log: + if snapshot_params: + job_id = snapshot_params["job_id"] + handler_path = snapshot_params["handler_path"] + public_cfg = snapshot_params.get("public_cfg", {}) or {} + secret_token = snapshot_params.get("secret_cfg", "") or "" + else: + if not job_id: + raise ValueError("job_id or snapshot_params is required") + job = crud.get_job(session, job_id) + if not job: + raise ValueError(f"Job not found: {job_id}") + handler_path = job.handler_path + public_cfg = job.public_cfg or {} + secret_token = job.secret_cfg or "" + + snapshot = snapshot_params or { + "job_id": job_id, + "handler_path": handler_path, + "public_cfg": public_cfg, + "secret_cfg": secret_token, + "meta": { + "trigger": "celery", + "celery_task_id": celery_task_id, + "started_at": started_at.isoformat(), + }, + } + + # 任务开始即落库一条 RUNNING 记录(若外部已传入 log_id,则只更新该条;若创建失败则降级为旧行为:结束时再 create) + if job_log_id is None: try: - if snapshot_params: - job_id = snapshot_params["job_id"] - handler_path = snapshot_params["handler_path"] - public_cfg = snapshot_params.get("public_cfg", {}) or {} - secret_token = snapshot_params.get("secret_cfg", "") or "" - else: - if not job_id: - raise ValueError("job_id or snapshot_params is required") - job = crud.get_job(session, job_id) - if not job: - raise ValueError(f"Job not found: {job_id}") - handler_path = job.handler_path - public_cfg = job.public_cfg or {} - secret_token = job.secret_cfg or "" + running = crud.create_job_log( + session, + job_id=str(job_id or ""), + status=JobStatus.RUNNING, + snapshot_params=snapshot, + message="运行中", + traceback="", + run_log="", + celery_task_id=celery_task_id, + attempt=attempt, + started_at=started_at, + finished_at=None, + ) + job_log_id = int(running.id) + except Exception: + job_log_id = None - snapshot = snapshot_params or { - "job_id": job_id, - "handler_path": handler_path, - "public_cfg": public_cfg, - "secret_cfg": secret_token, - "meta": { - "trigger": "celery", - "celery_task_id": celery_task_id, - "started_at": started_at.isoformat(), - }, - } + # per-run 全量日志落盘(best-effort)。若 job_log_id 缺失则无法保证唯一性,直接跳过。 + per_run_log_path: str | None = None + if job_log_id is not None and job_id: + try: + log_root = settings.log_dir or os.path.join(settings.data_dir, "logs") + job_dir = os.path.join(log_root, _safe_job_dir_name(str(job_id))) + os.makedirs(job_dir, exist_ok=True) + tz = ZoneInfo("Asia/Shanghai") + ts = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S") + per_run_log_path = os.path.join(job_dir, f"{ts}_log-{int(job_log_id)}.log") + except Exception: + per_run_log_path = None + logger.warning("prepare per-run log file failed job_id=%s log_id=%s", job_id, job_log_id) - # 任务开始即落库一条 RUNNING 记录(若外部已传入 log_id,则只更新该条;若创建失败则降级为旧行为:结束时再 create) - if job_log_id is None: - try: - running = crud.create_job_log( - session, - job_id=str(job_id or ""), - status=JobStatus.RUNNING, - snapshot_params=snapshot, - message="运行中", - traceback="", - run_log="", - celery_task_id=celery_task_id, - attempt=attempt, - started_at=started_at, - finished_at=None, - ) - job_log_id = int(running.id) - except Exception: - job_log_id = None + ctx_tokens = None + with capture_logs(max_bytes=200_000, job_log_id=job_log_id, file_path=per_run_log_path) as get_run_log: + try: + if job_log_id is not None and job_id: + ctx_tokens = set_job_context(job_id=str(job_id), job_log_id=int(job_log_id)) secrets = decrypt_json(secret_token) job_instance = instantiate(handler_path) @@ -152,6 +182,11 @@ def execute_job( traceback = tb.format_exc() logger.exception("execute_job failed job_id=%s", job_id) finally: + try: + clear_job_context(ctx_tokens) + except Exception: + # best-effort:不能影响任务执行 + pass try: run_log_text = get_run_log() or "" except Exception: @@ -160,7 +195,7 @@ def execute_job( finished_at = datetime.utcnow() warning_lines = _extract_warning_lines(run_log_text) message = _compose_message(message, warning_lines) - # 结束时:优先更新 RUNNING 那条;若没有则创建最终记录(兼容降级) + # 结束时:优先更新 RUNNING 那条;若没有则创建最终记录 if job_log_id is not None: crud.update_job_log( session, diff --git a/docs/connecthub_功能与部署指南.md b/docs/connecthub_功能与部署指南.md deleted file mode 100644 index 9353d82..0000000 --- a/docs/connecthub_功能与部署指南.md +++ /dev/null @@ -1,603 +0,0 @@ -# ConnectHub 功能与部署指南 - -本文件面向开发与运维,汇总 ConnectHub 当前仓库 **已实现的全部功能(feature)**,并给出 **开发→上线(docker compose 手动)→备份/回滚/排障** 的全流程指南。 - -> 约定:本文所有“定位”均以本仓库当前代码为准;`extensions/` 下内容被定义为 **标准示例**(可复制改造),不等同于平台核心能力。 - ---- - -### 定时任务不触发排查:last_run_at 时区导致“被认为在未来” - -如果你发现某个 Job 第一次手动触发/定时触发成功后,后续按 cron 到点却一直不再触发(`dispatcher.tick` 的日志里 `triggered: 0`),很可能是 `last_run_at` 的时区写入/解释不一致导致: - -- 调度器会用 `Asia/Shanghai` 解释并比较 `last_run_at`,若认为 `last_run_at >= 当前分钟` 就会跳过触发; -- 如果数据库把 `last_run_at` 存成了 `... +00:00`(UTC),而你期望按 `+08:00` 计算,就可能出现“last_run_at 在未来 8 小时”的错觉,导致任务被持续跳过。 - -解决: -- 新版本已修复:写入 `last_run_at` 时保留时区信息,并在发现 `last_run_at` 明显晚于当前分钟(未来时间)时自动忽略该值,避免任务永久不触发。 - -## 1. 项目概览 - -### 1.1 项目定位 - -ConnectHub 是一个轻量级企业集成中间件:统一管理多系统集成任务(Job),提供定时调度、执行监控与“一键重试/立即运行”。 - -### 1.2 组件与架构(文字版) - -- **FastAPI + SQLAdmin**:提供管理台(任务/日志)与简单健康检查 -- **PostgreSQL**:持久化任务定义与执行日志 -- **Redis**:Celery broker/backend -- **Celery worker**:执行任务 -- **Celery beat**:每分钟 tick 调度器,按 cron 触发任务 - -### 1.3 入口与检查点 - -- **Admin**:`http://:8000/admin`(由 `docker-compose.yml` 暴露 `8000:8000`) -- **健康检查**:`GET /health` - - 入口:`app/main.py` - - 返回:`{"ok": true, "name": ""}` - ---- - -## 2. Feature 全量清单(按模块分组) - -本章只列举“仓库当前代码已实现”的能力,并给出源码定位。 - -### 2.1 Web/API 与 Admin 管理台(SQLAdmin) - -- **FastAPI 应用初始化** - - **定位**:`app/main.py` - - **行为**: - - 初始化日志系统(见 2.7) - - 确保 `/data` 与可选日志目录存在 - - 确保 Fernet key 已生成并持久化(见 2.6) - - 确保 DB schema(见 2.8) - -- **Job(任务)管理** - - **模型定位**:`app/db/models.py`(`Job`) - - **Admin 视图定位**:`app/admin/views.py`(`JobAdmin`) - - **字段含义**: - - `id`:任务 ID(创建必填,可在表单中编辑主键) - - `enabled`:是否启用(调度只触发启用任务) - - `cron_expr`:cron 表达式(Asia/Shanghai 时区语义) - - `handler_path`:处理器路径(插件类定位,见 2.4) - - `public_cfg`:明文配置(JSON object) - - `secret_cfg`:密文配置(DB 存 Fernet token;编辑页不回显) - - `last_run_at`:最近一次被调度触发的时间(防重复) - - **保存校验(重要)**: - - `id` 必填(创建时) - - `handler_path` 必须可 import,且类必须继承 `BaseJob` - - `cron_expr` 必须可被 `croniter` 解析 - - `public_cfg` 必须是 **合法 JSON 对象(dict)** - - `secret_cfg` 创建时必须是 **合法 JSON 对象(dict)**,并会加密落库;编辑时留空表示不更新,填入则会校验并加密覆盖 - - **管理动作(actions)**: - - 立即运行(Run Now):创建 RUNNING 的 JobLog 并异步执行 - - 查看日志:跳转到 JobLog 列表并按 job_id 搜索 - - 停用任务(保留日志) - - 清理日志(保留任务) - - 删除任务(含日志) - -- **JobLog(任务日志)管理** - - **模型定位**:`app/db/models.py`(`JobLog`,`JobStatus`) - - **Admin 视图定位**:`app/admin/views.py`(`JobLogAdmin`) - - **展示与能力**: - - 只读(不可创建/编辑/删除) - - 列表支持按 `job_id` 搜索 - - 详情展示: - - `snapshot_params`(快照参数) - - `message`(包含基础消息 + warnings 摘要,可能截断) - - `traceback`(异常堆栈) - - `run_log`(运行日志,可能截断) - - `celery_task_id` / `attempt` / `started_at` / `finished_at` - -- **Admin 路由(Retry / Run Now)** - - **定位**:`app/admin/routes.py` - - **能力**: - - `POST /admin/joblogs/{log_id}/retry`:基于历史 `snapshot_params` 创建新 RUNNING JobLog 并重跑 - - 保护:当原日志状态为 RUNNING 时禁止重试 - - 标记:在 snapshot.meta 中写入 `trigger=retry` - - `POST /admin/jobs/{job_id}/run`:创建 RUNNING JobLog 并按当前 DB Job 定义执行 - -### 2.2 调度系统(Cron + 防重复) - -- **Beat 定时 tick** - - **定位**:`app/tasks/celery_app.py`(`beat_schedule` 每 60 秒) - - **触发任务名**:`connecthub.dispatcher.tick` - -- **调度器逻辑** - - **定位**:`app/tasks/dispatcher.py` - - **行为**: - - 只读取 `enabled=True` 的 Jobs(`app/db/crud.py:list_enabled_jobs`) - - 使用 `croniter` 判断“当前分钟是否应触发”(时区 Asia/Shanghai) - - 使用 `last_run_at` 防止同一分钟重复触发(对 naive datetime 做时区解释) - - 触发执行:`execute_job.delay(job_id=job.id)` - -### 2.3 执行引擎(通用 execute_job) - -- **任务执行入口** - - **定位**:`app/tasks/execute.py`(Celery task:`connecthub.execute_job`) - - **两种入口模式**: - - 传 `job_id`:从 DB 读取 `handler_path/public_cfg/secret_cfg` - - 传 `snapshot_params`:按快照执行(用于 Retry) - -- **JobLog 生命周期** - - **定位**:`app/tasks/execute.py` + `app/db/crud.py` - - **行为**: - - 若未显式传入 `log_id`,会尽力先创建一条 RUNNING 记录;若创建失败则降级(结束时再创建最终记录) - - 执行结束后更新该条 JobLog:状态(SUCCESS/FAILURE)、message、traceback、run_log、attempt、finished_at 等 - -- **运行日志捕获与截断** - - **定位**:`app/core/log_capture.py` - - **策略**: - - 捕获 root logger 输出写入 `run_log` - - 以字节数限制(默认 200KB)截断并追加标记 - - **message 合成策略** - - **定位**:`app/tasks/execute.py` - - 从 run_log 提取 WARNING 行并追加到 message,并做总长度保护(50k 字符上限) - -### 2.4 插件机制(handler_path 动态加载) - -- **BaseJob 规范** - - **定位**:`app/jobs/base.py` - - **要求**:插件 Job 必须实现 `run(params, secrets)` - - `params`:来自 `Job.public_cfg`(明文) - - `secrets`:来自 `Job.secret_cfg` 解密后的明文(仅内存) - -- **handler_path 解析与加载** - - **定位**:`app/plugins/manager.py` - - **支持格式**: - - `pkg.mod:ClassName`(推荐) - - `pkg.mod.ClassName` - - **约束**: - - 类必须存在且是 `BaseJob` 子类,否则保存/加载失败 - -### 2.5 集成客户端(统一 HTTP 调用层) - -- **致远 OA(SeeyonClient)** - - **定位**:`app/integrations/seeyon.py` - - **能力**: - - `POST /seeyon/rest/token` 获取 token(`id`) - - 业务请求自动携带 `token` header - - 遇到 401 或响应包含 `Invalid token`:自动刷新 token 并重试一次 - - CAP4 无流程表单导出:`POST /seeyon/rest/cap4/form/soap/export` - -- **滴滴(DidiClient)** - - **定位**:`app/integrations/didi.py` - - **能力**: - - `POST /river/Auth/authorize` 获取 `access_token` 并缓存(考虑 skew) - - 生成签名 sign(当前实现为 MD5) - - 遇到 401:清空 token,重新 authorize 后重试一次 - - 关键 API: - - 公司主体查询:`GET /river/LegalEntity/get` - - 员工明细:`GET /river/Member/detail` - - 员工修改:`POST /river/Member/edit` - -### 2.6 配置与安全(public_cfg / secret_cfg 与 Fernet) - -- **配置来源** - - **定位**:`app/core/config.py`(`Settings` 从 `.env` 读取) - - 关键变量(默认值见代码与 `README.md`): - - `DATA_DIR`(默认 `/data`) - - `DB_URL` - - `REDIS_URL` - - `FERNET_KEY_PATH`(默认 `/data/fernet.key`) - - `LOG_DIR`(默认 `/data/logs`,可为空关闭文件日志) - -- **secret_cfg 加密存储与解密执行** - - **定位**:`app/security/fernet.py` - - **行为**: - - Admin 保存 `secret_cfg` 时加密落库(Fernet token) - - Worker 执行时解密为 dict,仅在内存中传给 Job - - Fernet key 在启动时自动生成并写入 `FERNET_KEY_PATH`(见 `app/main.py`) - - **重要约束(上线必读)**: - - **正式环境必须持久化并保留同一个 Fernet key 文件**,否则历史 `secret_cfg` 将无法解密。 - - **常见脏数据兼容/报错** - - token 被引号包裹、混入空白/换行会被清理 - - token 被截断会报 “looks truncated”,需重新保存 secret_cfg 重新加密 - -### 2.7 日志与可观测 - -- **全局日志初始化** - - **定位**:`app/core/logging.py` - - **行为**: - - stdout 输出(INFO) - - 若 `LOG_DIR` 不为空:写入滚动文件 `connecthub.log`(10MB * 5) - -- **任务级日志(run_log)** - - **定位**:`app/core/log_capture.py` + `app/tasks/execute.py` - - **行为**:捕获执行期间日志写入 `JobLog.run_log`,超限截断 - -### 2.8 数据库与 schema 自升级(轻量) - -- **DB Engine/Session** - - **定位**:`app/db/engine.py` - - **行为**:按 `DB_URL` 创建 engine;sqlite 兼容 `check_same_thread=False` - -- **schema 确保与轻量自升级** - - **定位**:`app/db/schema.py` - - **行为**: - - `create_all` 初次建表 - - 若 `job_logs.run_log` 列不存在:`ALTER TABLE` 补列 - - 若 `status` 存在约束且不允许 `RUNNING`:做兼容迁移(SQLite 重建表 / Postgres 调整 CHECK) - -### 2.9 运维脚本与容器化形态 - -- **docker compose 服务组成** - - **定位**:`docker-compose.yml` - - 服务:`redis` / `postgres` / `backend` / `worker` / `beat` - - 持久化: - - `./data/pgdata` → postgres 数据目录 - - `./data` → 容器 `/data`(包含 fernet.key、logs 等) - -- **开发 overlay** - - **定位**:`docker-compose.dev.yml` - - 行为: - - backend:`uvicorn --reload` - - worker/beat:`watchfiles` 监听 `app/` 与 `extensions/` 变更自动重启 - -- **管理脚本** - - **定位**:`connecthub.sh` - - 能力:build/start/restart/stop + dev-build/dev-start/... + logs(可 follow、可 tail、可指定 service) - -- **镜像构建** - - **定位**:`docker/Dockerfile` - - 行为:`pip install .` 后复制 `app/` 与 `extensions/` 到镜像内 `/app` - ---- - -## 3. 标准示例(extensions) - -本章内容来自 `extensions/`,定位为 **标准示例/模板**:用于展示“如何写 Job、如何组织集成调用、如何记录日志与处理错误”。上线时可保留,也可按业务需要替换/扩展。 - -### 3.1 示例文件与入口 - -- **示例文件**:`extensions/sync_oa_to_didi/job.py` -- **handler_path 填写示例**(用于 Admin 创建 Job): - - `extensions.sync_oa_to_didi.job:SyncOAToDidiTokenJob` - - `extensions.sync_oa_to_didi.job:SyncOAToDidiExportFormJob` - - `extensions.sync_oa_to_didi.job:SyncOAToDidiLegalEntitySyncJob` - -### 3.2 SyncOAToDidiTokenJob(token 获取演示) - -- **目的**:演示调用致远 OA 获取 token,并在日志中进行基础脱敏输出。 -- **需要的 public_cfg 字段** - - `base_url`:致远 OA base url(不包含具体路径) -- **需要的 secret_cfg 字段(解密后)** - - `rest_user`:REST 帐号 - - `rest_password`:REST 密码 - - `loginName`:可选(模拟登录名) -- **行为特征** - - 使用 `SeeyonClient.authenticate()` 获取 token - - 日志输出会对 token 做 mask(避免完整泄露) - -### 3.3 SyncOAToDidiExportFormJob(CAP4 无流程表单导出) - -- **目的**:调用致远 OA CAP4 表单导出接口,返回原始响应文本,并将大文本按块写入 run_log(尽力而为)。 -- **需要的 public_cfg 字段** - - `base_url` - - `templateCode` - - 可选:`senderLoginName` / `rightId` / `doTrigger` / `param` / `extra`(扩展字段 dict) -- **需要的 secret_cfg 字段(解密后)** - - `rest_user` / `rest_password` / `loginName` -- **行为特征** - - 返回结构中包含 `raw`(原始文本)与 `meta`(content_length/content_type 等) - - 大文本会拆分 chunk 记录到 run_log(仍受 run_log 总量上限截断) - -### 3.4 SyncOAToDidiLegalEntitySyncJob(OA→滴滴公司主体同步示例) - -- **目的**:从 OA 导出数据中解析“工号/所属公司”,并同步到滴滴员工的 `legal_entity_id`。 -- **需要的 public_cfg 字段** - - `oa_base_url` - - `oa_templateCode` - - `didi_base_url` - - 可选:`senderLoginName/rightId/doTrigger/param/extra`(透传到 OA 导出) -- **需要的 secret_cfg 字段(解密后)** - - OA:`rest_user/rest_password/loginName` - - 滴滴:`company_id/client_id/client_secret/sign_key` -- **行为特征(示例策略)** - - 从表单 definition 中定位 display 为“工号/所属公司”的字段名 - - 公司主体查询结果做进程内缓存 - - 员工修改按文档要求增加 150ms 间隔(避免限频) - - 输出统计:总行数/成功/跳过/错误列表(截取前 50) - ---- - -## 4. 开发全流程指南(本地开发) - -### 4.1 前置条件 - -- 安装 Docker 与 docker compose -- 端口规划(默认 compose 暴露): - - `8000`:backend / Admin - - `5432`:postgres(如不需要宿主机直连,可在 compose 里移除映射) - - `6379`:redis(同上) - -### 4.2 初始化与启动(开发模式) - -1) 在仓库根目录创建 `.env`(参考 `env.example`): - -```bash -cp env.example .env -``` - -2) 启动(开发 overlay,支持热更新): - -```bash -docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d --build -``` - -或使用脚本: - -```bash -./connecthub.sh dev-build -./connecthub.sh dev-start -``` - -3) 首次启动会自动完成(无需手工执行): -- 创建 `/data` 目录(宿主机对应 `./data`) -- 生成并持久化 Fernet key(默认 `./data/fernet.key`) -- 初始化/升级数据库 schema(建表、补列、兼容约束) - -4) 打开 Admin: -- `http://localhost:8000/admin` - -### 4.3 新增一个插件 Job(标准步骤) - -1) 在 `extensions/` 下创建你的模块与 Job 类(继承 `BaseJob`,实现 `run(params, secrets)`)。 -2) 在 Admin 创建 Job: -- `id`:自定义(建议带命名空间) -- `enabled`:true -- `cron_expr`:例如 `* * * * *`(每分钟) -- `handler_path`:例如 `extensions.your_ext.job:YourJob` -- `public_cfg`:必须是 JSON object(形如 `{...}`) -- `secret_cfg`:必须是 JSON object(形如 `{...}`,保存时会加密) -3) 观察执行结果: -- 通过 Job 上的“立即运行”或等待 cron 触发 -- 在 JobLog 中查看 `message/traceback/run_log/snapshot_params` - -### 4.4 查看日志(开发/排查) - -- 查看全部服务日志: - -```bash -./connecthub.sh log -``` - -- 只看 worker(并 follow): - -```bash -./connecthub.sh log -f worker -``` - -### 4.5 常见开发坑(必须注意) - -- `public_cfg/secret_cfg` 在 Admin 中保存时必须输入 **合法 JSON 对象**(双引号、且为 `{...}`),否则会直接阻止落库。 -- 编辑 Job 时 `secret_cfg` **不回显**:留空表示不更新;填写则会校验 JSON 并重新加密覆盖。 -- 如果你在不同环境/不同机器上复用数据库,必须同时迁移 `/data/fernet.key`,否则解密会失败。 - ---- - -## 5. 上线全流程指南(docker compose 手动部署) - -> 本项目当前形态为“统一环境 + 手动发布”,以下流程以 `docker-compose.yml` 为准(生产不要使用 dev overlay)。 - -### 5.1 部署前准备清单 - -- **目标机依赖**:Docker、docker compose -- **部署目录**:建议固定到一个路径(例如 `/opt/connecthub`),保证 `./data` 可持久化 -- **必须持久化的内容** - - `./data/pgdata`:PostgreSQL 数据目录(必须) - - `./data/fernet.key`:Fernet key(必须,影响历史 secret_cfg 可解密) - - `./data/logs`:文件日志(可选,但推荐便于排障) -- **端口与网络** - - 若非必要,建议仅暴露 `8000`;`5432/6379` 建议仅内网或不对外映射 -- **配置文件** - - `.env`:与代码同目录放置(compose `env_file: .env`) - - 核心变量:`DB_URL` / `REDIS_URL` / `DATA_DIR` / `FERNET_KEY_PATH` / `LOG_DIR` - -### 5.2 首次上线(从零部署) - -1) 准备代码与配置: - -```bash -cd /path/to/your/deploy/dir -cp env.example .env -``` - -2) 构建并启动(生产 compose): - -```bash -docker compose up -d --build -``` - -或使用脚本: - -```bash -./connecthub.sh build -``` - -3) 验收检查点: -- `docker compose ps` 中 `backend/worker/beat/postgres/redis` 均为 healthy/running -- `GET http://:8000/health` 返回 ok -- Admin 可打开:`http://:8000/admin` -- 能创建一个 Job 并“立即运行”,在 JobLog 中看到 RUNNING→SUCCESS/FAILURE 的完整链路 - -### 5.3 日常发布(升级) - -1) 拉取新版本代码(或替换为新代码包)。 -2) 重新构建并启动: - -```bash -docker compose up -d --build -``` - -3) 验证点(升级后必做): -- `backend` 可访问、`/health` 正常 -- `beat` 在运行(否则 cron 不会触发) -- worker 能写入 JobLog(检查最新日志是否持续产生) -- 若有 schema 变更:确认 `ensure_schema` 执行未导致写库异常(看 worker 日志) - -### 5.4 回滚策略(手动) - -> 回滚核心目标:恢复“可用的旧版本服务”,并确保数据与 Fernet key 一致。 - -- **代码/镜像回滚** - - 回到上一个可用版本的代码(git tag/commit/发布包) - - 重新 `docker compose up -d --build` -- **关键风险(必须牢记)** - - **不能更换 `./data/fernet.key`**:一旦更换,历史 `secret_cfg` 将无法解密,任务会失败。 -- **数据回滚** - - 依赖备份(见第 6 章),优先恢复 Postgres 数据,再启动服务验证 - ---- - -## 6. 备份与恢复(必须项) - -本项目需要至少备份以下两类资产: -- **PostgreSQL 数据**(任务定义与日志) -- **Fernet key**(密文配置解密必需) - -### 6.1 备份策略(推荐最小集合) - -- **必须备份** - - `./data/fernet.key` - - PostgreSQL 数据(建议做“逻辑备份”或“目录快照”,至少二选一) -- **可选备份** - - `./data/logs/`(用于留存审计与排障) - -### 6.2 逻辑备份(推荐,跨机器/跨路径更稳) - -1) 执行备份(示例:在目标机上对容器内 pg 执行 `pg_dump`): - -```bash -docker compose exec -T postgres pg_dump -U connecthub -d connecthub > backup_connecthub.sql -``` - -2) 同步备份 Fernet key: - -```bash -cp ./data/fernet.key ./backup_fernet.key -``` - -3) 恢复(新环境/故障恢复): -- 启动 postgres/redis(可先不启动 worker/beat) -- 导入 SQL: - -```bash -cat backup_connecthub.sql | docker compose exec -T postgres psql -U connecthub -d connecthub -``` - -- 还原 Fernet key: - -```bash -cp ./backup_fernet.key ./data/fernet.key -``` - -- 启动全量服务并验证: - - Admin 能打开 - - 历史 Job 能正常运行(证明 secret_cfg 可解密) - -### 6.3 目录快照备份(简单,但对一致性要求更高) - -> 目录级备份前建议短暂停止写入(至少停止 worker/beat),避免备份不一致。 - -1) 停止任务执行(建议): - -```bash -docker compose stop worker beat -``` - -2) 打包 `pgdata` 与 `fernet.key`: - -```bash -tar -czf backup_data_$(date +%F).tgz ./data/pgdata ./data/fernet.key -``` - -3) 恢复后再启动 worker/beat: - -```bash -docker compose up -d -``` - ---- - -## 7. 排障手册(症状 → 定位 → 处理) - -### 7.1 Admin 保存 Job 报错:public_cfg/secret_cfg 必须是 JSON object - -- **症状**:保存时报 `public_cfg must be a JSON object` 或 `secret_cfg must be a JSON object` -- **定位**:`app/admin/views.py`(`on_model_change` 校验) -- **处理**: - - 确认输入为 `{...}` 且 key 使用双引号 - - 不要输入数组 `[...]`、字符串 `"..."`、或空内容 - -### 7.2 任务不按 cron 触发 - -- **症状**:Job 创建后一直没有新日志产生 -- **定位**: - - beat 是否运行:`docker compose ps`、`./connecthub.sh log beat` - - worker 是否运行:`./connecthub.sh log worker` - - Job 是否启用:Admin 中 `enabled` - - cron 是否正确:`cron_expr` 保存时虽校验,但仍可能逻辑不符合预期 - - 时区语义:调度使用 `Asia/Shanghai`(`app/tasks/dispatcher.py`) - - last_run_at 是否阻挡:同一分钟不会重复触发(`last_run_at >= now_min` 会跳过) -- **处理**: - - 先用 Job 的“立即运行”验证执行链路(排除插件/外部系统问题) - - 确认 beat 正常运行后再排 cron - -### 7.3 Retry 提示“正在运行中,请结束后再重试” - -- **症状**:点击 Retry 被拒绝 -- **定位**:`app/admin/routes.py`(RUNNING 状态保护) -- **处理**: - - 等待任务结束(SUCCESS/FAILURE)后再 Retry - - 若任务卡死:检查 worker 进程状态与外部依赖,必要时重启 worker - -### 7.4 secret_cfg 解密失败(Invalid token / looks truncated) - -- **症状**:任务失败,traceback 提示 Fernet token 无效或被截断 -- **定位**:`app/security/fernet.py:decrypt_json` -- **处理**: - - 确认 `./data/fernet.key` 未丢失、未被替换 - - 若是 token 被截断/污染:在 Admin 重新填写 `secret_cfg`(JSON object)保存,让系统重新加密 - -### 7.5 外部系统 API 调用失败(401/超时/限频) - -- **定位**: - - 致远:`app/integrations/seeyon.py`(401/Invalid token 会自动刷新重试一次) - - 滴滴:`app/integrations/didi.py`(401 会刷新 token 重试一次;示例 Job 有 150ms 间隔) -- **处理**: - - 查看 JobLog 的 `run_log` 与 `traceback` - - 核对 secret_cfg 中凭证字段是否正确 - - 若接口限频:按业务需要在 Job 内增加节流/批处理(参考示例策略) - -### 7.6 run_log/message 太长被截断 - -- **症状**:JobLog 的 run_log 或 message 尾部出现 `[TRUNCATED]` -- **定位**: - - run_log 截断:`app/core/log_capture.py` - - message 截断:`app/tasks/execute.py` -- **处理**: - - 减少单次输出量(避免把大 payload 全量打印) - - 需要保留大文本时,可按块写日志(示例:`extensions/sync_oa_to_didi/job.py` 的分块写入方法) - ---- - -## 8. 安全建议(上线必读) - -- **敏感信息管理** - - `.env` 不应提交到版本库;上线时使用权限受控的方式分发 - - `secret_cfg` 必须通过 Admin 输入并加密落库,避免在日志/代码中硬编码 - -- **端口暴露与网络隔离** - - `postgres/redis` 建议仅内网访问或不对外映射端口 - - Admin 暴露到公网时建议加一层反向代理与访问控制(本仓库未内置鉴权) - -- **日志脱敏** - - 任何 token/密码类字段不应完整写入日志 - - 可参考示例 Job 的 token mask 思路(`extensions/sync_oa_to_didi/job.py`) - -- **最小权限与持久化文件保护** - - 保护 `./data/fernet.key` 的读权限与备份权限 - - `./data/pgdata` 属于核心数据资产,应纳入备份与权限管理 -