This commit is contained in:
Marsway 2026-01-13 15:39:08 +08:00
parent 6b581bcf71
commit 25eb7fdace
6 changed files with 246 additions and 650 deletions

34
Log.md Normal file
View File

@ -0,0 +1,34 @@
# Job 执行全量日志(落盘)
本项目在任务执行时,会将该次执行的 **全量日志** 落盘到 `data/logs`(容器内为 `/data/logs`)下,便于排障与留存。
## 1. 路径规则
- 根目录:`./data/logs/`
- 按任务分目录:`./data/logs/<job_id>/`
- 单次执行日志文件名:
- `YYYY-MM-DD_HH-mm-ss_log-<job_log_id>.log`
- 时间戳使用 **Asia/Shanghai**(与调度时区一致)
示例:
```text
./data/logs/sync_oa_to_didi.sync_legal_entity/2026-01-13_10-20-33_log-1234.log
```
> 注意:若 `job_id` 中包含路径分隔符(`/` 或 `\`),会被替换为 `_`,避免产生目录穿越或多级目录。
## 2. 与 Admin 的 JobLog.run_log 的区别
- **`JobLog.run_log`(入库)**
- 展示在 Admin 的 JobLog 详情页
- 有字节上限(会截断),适合快速浏览
- **`data/logs/.../*.log`(落盘全量)**
- 尽力写入,不做长度截断
- 适合完整排障、留存、归档
## 3. 运维建议
- 建议将 `./data/logs` 纳入备份或日志归档策略(按业务合规要求)。
- 如需自动清理(防止无限增长),推荐在宿主机使用 `logrotate` 或定时任务做保留策略(本项目不内置自动清理逻辑)。

View File

@ -1,10 +1,28 @@
from __future__ import annotations
import logging
import os
from contextlib import contextmanager
from typing import Callable, Iterator
class JobLogIdFilter(logging.Filter):
"""
仅允许写入属于指定 job_log_id LogRecord
依赖 setup_logging() 安装的 LogRecordFactory 注入字段connecthub_job_log_id
"""
def __init__(self, *, job_log_id: int) -> None:
super().__init__()
self.job_log_id = int(job_log_id)
def filter(self, record: logging.LogRecord) -> bool: # noqa: A003
try:
return getattr(record, "connecthub_job_log_id", None) == self.job_log_id
except Exception:
return False
class SafeBufferingHandler(logging.Handler):
"""
只用于尽力捕获运行日志
@ -52,7 +70,12 @@ class SafeBufferingHandler(logging.Handler):
@contextmanager
def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
def capture_logs(
*,
max_bytes: int = 200_000,
job_log_id: int | None = None,
file_path: str | None = None,
) -> Iterator[Callable[[], str]]:
"""
捕获当前进程root logger输出的日志文本
任何问题都不应影响业务执行
@ -64,6 +87,31 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
)
file_handler: logging.Handler | None = None
flt: JobLogIdFilter | None = None
if job_log_id is not None:
try:
flt = JobLogIdFilter(job_log_id=int(job_log_id))
handler.addFilter(flt)
except Exception:
flt = None
if file_path:
try:
parent = os.path.dirname(file_path)
if parent:
os.makedirs(parent, exist_ok=True)
fh = logging.FileHandler(file_path, encoding="utf-8")
fh.setLevel(logging.INFO)
fh.setFormatter(
logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
)
if flt is not None:
fh.addFilter(flt)
file_handler = fh
except Exception:
file_handler = None
try:
root.addHandler(handler)
except Exception:
@ -71,6 +119,16 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
yield lambda: ""
return
if file_handler is not None:
try:
root.addHandler(file_handler)
except Exception:
try:
file_handler.close()
except Exception:
pass
file_handler = None
try:
yield handler.get_text
finally:
@ -78,5 +136,14 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
root.removeHandler(handler)
except Exception:
pass
if file_handler is not None:
try:
root.removeHandler(file_handler)
except Exception:
pass
try:
file_handler.close()
except Exception:
pass

43
app/core/log_context.py Normal file
View File

@ -0,0 +1,43 @@
from __future__ import annotations
from contextvars import ContextVar, Token
_job_id_var: ContextVar[str | None] = ContextVar("connecthub_job_id", default=None)
_job_log_id_var: ContextVar[int | None] = ContextVar("connecthub_job_log_id", default=None)
JobContextTokens = tuple[Token[str | None], Token[int | None]]
def set_job_context(*, job_id: str | None, job_log_id: int | None) -> JobContextTokens:
"""
设置当前执行上下文用于日志隔离
返回 tokens便于在 finally reset 回原值
"""
t1 = _job_id_var.set(job_id)
t2 = _job_log_id_var.set(job_log_id)
return (t1, t2)
def clear_job_context(tokens: JobContextTokens | None = None) -> None:
"""
清理当前执行上下文
- 若提供 tokensreset 回进入上下文前的值推荐
- 否则直接置空
"""
if tokens is not None:
_job_id_var.reset(tokens[0])
_job_log_id_var.reset(tokens[1])
return
_job_id_var.set(None)
_job_log_id_var.set(None)
def get_job_id() -> str | None:
return _job_id_var.get()
def get_job_log_id() -> int | None:
return _job_log_id_var.get()

View File

@ -5,6 +5,7 @@ import os
from logging.handlers import RotatingFileHandler
from app.core.config import settings
from app.core.log_context import get_job_id, get_job_log_id
def setup_logging() -> None:
@ -12,6 +13,25 @@ def setup_logging() -> None:
if getattr(logger, "_connecthub_configured", False):
return
# 为每条日志注入“当前任务上下文”,供 per-run 日志隔离过滤使用。
# 仅安装一次(跨所有 logger 生效)。
if not getattr(logging, "_connecthub_record_factory_installed", False):
old_factory = logging.getLogRecordFactory()
def _record_factory(*args, **kwargs): # type: ignore[no-untyped-def]
record = old_factory(*args, **kwargs)
try:
setattr(record, "connecthub_job_id", get_job_id())
setattr(record, "connecthub_job_log_id", get_job_log_id())
except Exception:
# best-effort任何问题都不能影响日志系统
setattr(record, "connecthub_job_id", None)
setattr(record, "connecthub_job_log_id", None)
return record
logging.setLogRecordFactory(_record_factory)
setattr(logging, "_connecthub_record_factory_installed", True)
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
fmt="%(asctime)s %(levelname)s %(name)s %(message)s",

View File

@ -1,12 +1,16 @@
from __future__ import annotations
import logging
import os
import traceback as tb
from datetime import datetime
from typing import Any
from zoneinfo import ZoneInfo
from app.core.log_capture import capture_logs
from app.core.logging import setup_logging
from app.core.log_context import clear_job_context, set_job_context
from app.core.config import settings
from app.db import crud
from app.db.engine import engine, get_session
from app.db.models import JobStatus
@ -55,6 +59,14 @@ def _compose_message(base_message: str, warning_lines: list[str]) -> str:
return msg
def _safe_job_dir_name(job_id: str) -> str:
"""
job_id 映射为安全的目录名避免路径分隔符造成目录穿越/嵌套
"""
s = (job_id or "").strip() or "unknown"
return s.replace("/", "_").replace("\\", "_")
@celery_app.task(bind=True, name="connecthub.execute_job")
def execute_job(
self,
@ -89,54 +101,72 @@ def execute_job(
snapshot: dict[str, Any] = {}
try:
with capture_logs(max_bytes=200_000) as get_run_log:
if snapshot_params:
job_id = snapshot_params["job_id"]
handler_path = snapshot_params["handler_path"]
public_cfg = snapshot_params.get("public_cfg", {}) or {}
secret_token = snapshot_params.get("secret_cfg", "") or ""
else:
if not job_id:
raise ValueError("job_id or snapshot_params is required")
job = crud.get_job(session, job_id)
if not job:
raise ValueError(f"Job not found: {job_id}")
handler_path = job.handler_path
public_cfg = job.public_cfg or {}
secret_token = job.secret_cfg or ""
snapshot = snapshot_params or {
"job_id": job_id,
"handler_path": handler_path,
"public_cfg": public_cfg,
"secret_cfg": secret_token,
"meta": {
"trigger": "celery",
"celery_task_id": celery_task_id,
"started_at": started_at.isoformat(),
},
}
# 任务开始即落库一条 RUNNING 记录(若外部已传入 log_id则只更新该条若创建失败则降级为旧行为结束时再 create
if job_log_id is None:
try:
if snapshot_params:
job_id = snapshot_params["job_id"]
handler_path = snapshot_params["handler_path"]
public_cfg = snapshot_params.get("public_cfg", {}) or {}
secret_token = snapshot_params.get("secret_cfg", "") or ""
else:
if not job_id:
raise ValueError("job_id or snapshot_params is required")
job = crud.get_job(session, job_id)
if not job:
raise ValueError(f"Job not found: {job_id}")
handler_path = job.handler_path
public_cfg = job.public_cfg or {}
secret_token = job.secret_cfg or ""
running = crud.create_job_log(
session,
job_id=str(job_id or ""),
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id=celery_task_id,
attempt=attempt,
started_at=started_at,
finished_at=None,
)
job_log_id = int(running.id)
except Exception:
job_log_id = None
snapshot = snapshot_params or {
"job_id": job_id,
"handler_path": handler_path,
"public_cfg": public_cfg,
"secret_cfg": secret_token,
"meta": {
"trigger": "celery",
"celery_task_id": celery_task_id,
"started_at": started_at.isoformat(),
},
}
# per-run 全量日志落盘best-effort。若 job_log_id 缺失则无法保证唯一性,直接跳过。
per_run_log_path: str | None = None
if job_log_id is not None and job_id:
try:
log_root = settings.log_dir or os.path.join(settings.data_dir, "logs")
job_dir = os.path.join(log_root, _safe_job_dir_name(str(job_id)))
os.makedirs(job_dir, exist_ok=True)
tz = ZoneInfo("Asia/Shanghai")
ts = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")
per_run_log_path = os.path.join(job_dir, f"{ts}_log-{int(job_log_id)}.log")
except Exception:
per_run_log_path = None
logger.warning("prepare per-run log file failed job_id=%s log_id=%s", job_id, job_log_id)
# 任务开始即落库一条 RUNNING 记录(若外部已传入 log_id则只更新该条若创建失败则降级为旧行为结束时再 create
if job_log_id is None:
try:
running = crud.create_job_log(
session,
job_id=str(job_id or ""),
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id=celery_task_id,
attempt=attempt,
started_at=started_at,
finished_at=None,
)
job_log_id = int(running.id)
except Exception:
job_log_id = None
ctx_tokens = None
with capture_logs(max_bytes=200_000, job_log_id=job_log_id, file_path=per_run_log_path) as get_run_log:
try:
if job_log_id is not None and job_id:
ctx_tokens = set_job_context(job_id=str(job_id), job_log_id=int(job_log_id))
secrets = decrypt_json(secret_token)
job_instance = instantiate(handler_path)
@ -152,6 +182,11 @@ def execute_job(
traceback = tb.format_exc()
logger.exception("execute_job failed job_id=%s", job_id)
finally:
try:
clear_job_context(ctx_tokens)
except Exception:
# best-effort不能影响任务执行
pass
try:
run_log_text = get_run_log() or ""
except Exception:
@ -160,7 +195,7 @@ def execute_job(
finished_at = datetime.utcnow()
warning_lines = _extract_warning_lines(run_log_text)
message = _compose_message(message, warning_lines)
# 结束时:优先更新 RUNNING 那条;若没有则创建最终记录(兼容降级)
# 结束时:优先更新 RUNNING 那条;若没有则创建最终记录
if job_log_id is not None:
crud.update_job_log(
session,

View File

@ -1,603 +0,0 @@
# ConnectHub 功能与部署指南
本文件面向开发与运维,汇总 ConnectHub 当前仓库 **已实现的全部功能feature**,并给出 **开发→上线docker compose 手动)→备份/回滚/排障** 的全流程指南。
> 约定:本文所有“定位”均以本仓库当前代码为准;`extensions/` 下内容被定义为 **标准示例**(可复制改造),不等同于平台核心能力。
---
### 定时任务不触发排查last_run_at 时区导致“被认为在未来”
如果你发现某个 Job 第一次手动触发/定时触发成功后,后续按 cron 到点却一直不再触发(`dispatcher.tick` 的日志里 `triggered: 0`),很可能是 `last_run_at` 的时区写入/解释不一致导致:
- 调度器会用 `Asia/Shanghai` 解释并比较 `last_run_at`,若认为 `last_run_at >= 当前分钟` 就会跳过触发;
- 如果数据库把 `last_run_at` 存成了 `... +00:00`UTC而你期望按 `+08:00` 计算就可能出现“last_run_at 在未来 8 小时”的错觉,导致任务被持续跳过。
解决:
- 新版本已修复:写入 `last_run_at` 时保留时区信息,并在发现 `last_run_at` 明显晚于当前分钟(未来时间)时自动忽略该值,避免任务永久不触发。
## 1. 项目概览
### 1.1 项目定位
ConnectHub 是一个轻量级企业集成中间件统一管理多系统集成任务Job提供定时调度、执行监控与“一键重试/立即运行”。
### 1.2 组件与架构(文字版)
- **FastAPI + SQLAdmin**:提供管理台(任务/日志)与简单健康检查
- **PostgreSQL**:持久化任务定义与执行日志
- **Redis**Celery broker/backend
- **Celery worker**:执行任务
- **Celery beat**:每分钟 tick 调度器,按 cron 触发任务
### 1.3 入口与检查点
- **Admin**`http://<host>:8000/admin`(由 `docker-compose.yml` 暴露 `8000:8000`
- **健康检查**`GET /health`
- 入口:`app/main.py`
- 返回:`{"ok": true, "name": "<app_name>"}`
---
## 2. Feature 全量清单(按模块分组)
本章只列举“仓库当前代码已实现”的能力,并给出源码定位。
### 2.1 Web/API 与 Admin 管理台SQLAdmin
- **FastAPI 应用初始化**
- **定位**`app/main.py`
- **行为**
- 初始化日志系统(见 2.7
- 确保 `/data` 与可选日志目录存在
- 确保 Fernet key 已生成并持久化(见 2.6
- 确保 DB schema见 2.8
- **Job任务管理**
- **模型定位**`app/db/models.py``Job`
- **Admin 视图定位**`app/admin/views.py``JobAdmin`
- **字段含义**
- `id`:任务 ID创建必填可在表单中编辑主键
- `enabled`:是否启用(调度只触发启用任务)
- `cron_expr`cron 表达式Asia/Shanghai 时区语义)
- `handler_path`:处理器路径(插件类定位,见 2.4
- `public_cfg`明文配置JSON object
- `secret_cfg`密文配置DB 存 Fernet token编辑页不回显
- `last_run_at`:最近一次被调度触发的时间(防重复)
- **保存校验(重要)**
- `id` 必填(创建时)
- `handler_path` 必须可 import且类必须继承 `BaseJob`
- `cron_expr` 必须可被 `croniter` 解析
- `public_cfg` 必须是 **合法 JSON 对象dict**
- `secret_cfg` 创建时必须是 **合法 JSON 对象dict**,并会加密落库;编辑时留空表示不更新,填入则会校验并加密覆盖
- **管理动作actions**
- 立即运行Run Now创建 RUNNING 的 JobLog 并异步执行
- 查看日志:跳转到 JobLog 列表并按 job_id 搜索
- 停用任务(保留日志)
- 清理日志(保留任务)
- 删除任务(含日志)
- **JobLog任务日志管理**
- **模型定位**`app/db/models.py``JobLog``JobStatus`
- **Admin 视图定位**`app/admin/views.py``JobLogAdmin`
- **展示与能力**
- 只读(不可创建/编辑/删除)
- 列表支持按 `job_id` 搜索
- 详情展示:
- `snapshot_params`(快照参数)
- `message`(包含基础消息 + warnings 摘要,可能截断)
- `traceback`(异常堆栈)
- `run_log`(运行日志,可能截断)
- `celery_task_id` / `attempt` / `started_at` / `finished_at`
- **Admin 路由Retry / Run Now**
- **定位**`app/admin/routes.py`
- **能力**
- `POST /admin/joblogs/{log_id}/retry`:基于历史 `snapshot_params` 创建新 RUNNING JobLog 并重跑
- 保护:当原日志状态为 RUNNING 时禁止重试
- 标记:在 snapshot.meta 中写入 `trigger=retry`
- `POST /admin/jobs/{job_id}/run`:创建 RUNNING JobLog 并按当前 DB Job 定义执行
### 2.2 调度系统Cron + 防重复)
- **Beat 定时 tick**
- **定位**`app/tasks/celery_app.py``beat_schedule` 每 60 秒)
- **触发任务名**`connecthub.dispatcher.tick`
- **调度器逻辑**
- **定位**`app/tasks/dispatcher.py`
- **行为**
- 只读取 `enabled=True` 的 Jobs`app/db/crud.py:list_enabled_jobs`
- 使用 `croniter` 判断“当前分钟是否应触发”(时区 Asia/Shanghai
- 使用 `last_run_at` 防止同一分钟重复触发(对 naive datetime 做时区解释)
- 触发执行:`execute_job.delay(job_id=job.id)`
### 2.3 执行引擎(通用 execute_job
- **任务执行入口**
- **定位**`app/tasks/execute.py`Celery task`connecthub.execute_job`
- **两种入口模式**
- 传 `job_id`:从 DB 读取 `handler_path/public_cfg/secret_cfg`
- 传 `snapshot_params`:按快照执行(用于 Retry
- **JobLog 生命周期**
- **定位**`app/tasks/execute.py` + `app/db/crud.py`
- **行为**
- 若未显式传入 `log_id`,会尽力先创建一条 RUNNING 记录;若创建失败则降级(结束时再创建最终记录)
- 执行结束后更新该条 JobLog状态SUCCESS/FAILURE、message、traceback、run_log、attempt、finished_at 等
- **运行日志捕获与截断**
- **定位**`app/core/log_capture.py`
- **策略**
- 捕获 root logger 输出写入 `run_log`
- 以字节数限制(默认 200KB截断并追加标记
- **message 合成策略**
- **定位**`app/tasks/execute.py`
- 从 run_log 提取 WARNING 行并追加到 message并做总长度保护50k 字符上限)
### 2.4 插件机制handler_path 动态加载)
- **BaseJob 规范**
- **定位**`app/jobs/base.py`
- **要求**:插件 Job 必须实现 `run(params, secrets)`
- `params`:来自 `Job.public_cfg`(明文)
- `secrets`:来自 `Job.secret_cfg` 解密后的明文(仅内存)
- **handler_path 解析与加载**
- **定位**`app/plugins/manager.py`
- **支持格式**
- `pkg.mod:ClassName`(推荐)
- `pkg.mod.ClassName`
- **约束**
- 类必须存在且是 `BaseJob` 子类,否则保存/加载失败
### 2.5 集成客户端(统一 HTTP 调用层)
- **致远 OASeeyonClient**
- **定位**`app/integrations/seeyon.py`
- **能力**
- `POST /seeyon/rest/token` 获取 token`id`
- 业务请求自动携带 `token` header
- 遇到 401 或响应包含 `Invalid token`:自动刷新 token 并重试一次
- CAP4 无流程表单导出:`POST /seeyon/rest/cap4/form/soap/export`
- **滴滴DidiClient**
- **定位**`app/integrations/didi.py`
- **能力**
- `POST /river/Auth/authorize` 获取 `access_token` 并缓存(考虑 skew
- 生成签名 sign当前实现为 MD5
- 遇到 401清空 token重新 authorize 后重试一次
- 关键 API
- 公司主体查询:`GET /river/LegalEntity/get`
- 员工明细:`GET /river/Member/detail`
- 员工修改:`POST /river/Member/edit`
### 2.6 配置与安全public_cfg / secret_cfg 与 Fernet
- **配置来源**
- **定位**`app/core/config.py``Settings` 从 `.env` 读取)
- 关键变量(默认值见代码与 `README.md`
- `DATA_DIR`(默认 `/data`
- `DB_URL`
- `REDIS_URL`
- `FERNET_KEY_PATH`(默认 `/data/fernet.key`
- `LOG_DIR`(默认 `/data/logs`,可为空关闭文件日志)
- **secret_cfg 加密存储与解密执行**
- **定位**`app/security/fernet.py`
- **行为**
- Admin 保存 `secret_cfg` 时加密落库Fernet token
- Worker 执行时解密为 dict仅在内存中传给 Job
- Fernet key 在启动时自动生成并写入 `FERNET_KEY_PATH`(见 `app/main.py`
- **重要约束(上线必读)**
- **正式环境必须持久化并保留同一个 Fernet key 文件**,否则历史 `secret_cfg` 将无法解密。
- **常见脏数据兼容/报错**
- token 被引号包裹、混入空白/换行会被清理
- token 被截断会报 “looks truncated”需重新保存 secret_cfg 重新加密
### 2.7 日志与可观测
- **全局日志初始化**
- **定位**`app/core/logging.py`
- **行为**
- stdout 输出INFO
- 若 `LOG_DIR` 不为空:写入滚动文件 `connecthub.log`10MB * 5
- **任务级日志run_log**
- **定位**`app/core/log_capture.py` + `app/tasks/execute.py`
- **行为**:捕获执行期间日志写入 `JobLog.run_log`,超限截断
### 2.8 数据库与 schema 自升级(轻量)
- **DB Engine/Session**
- **定位**`app/db/engine.py`
- **行为**:按 `DB_URL` 创建 enginesqlite 兼容 `check_same_thread=False`
- **schema 确保与轻量自升级**
- **定位**`app/db/schema.py`
- **行为**
- `create_all` 初次建表
- 若 `job_logs.run_log` 列不存在:`ALTER TABLE` 补列
- 若 `status` 存在约束且不允许 `RUNNING`做兼容迁移SQLite 重建表 / Postgres 调整 CHECK
### 2.9 运维脚本与容器化形态
- **docker compose 服务组成**
- **定位**`docker-compose.yml`
- 服务:`redis` / `postgres` / `backend` / `worker` / `beat`
- 持久化:
- `./data/pgdata` → postgres 数据目录
- `./data` → 容器 `/data`(包含 fernet.key、logs 等)
- **开发 overlay**
- **定位**`docker-compose.dev.yml`
- 行为:
- backend`uvicorn --reload`
- worker/beat`watchfiles` 监听 `app/``extensions/` 变更自动重启
- **管理脚本**
- **定位**`connecthub.sh`
- 能力build/start/restart/stop + dev-build/dev-start/... + logs可 follow、可 tail、可指定 service
- **镜像构建**
- **定位**`docker/Dockerfile`
- 行为:`pip install .` 后复制 `app/``extensions/` 到镜像内 `/app`
---
## 3. 标准示例extensions
本章内容来自 `extensions/`,定位为 **标准示例/模板**:用于展示“如何写 Job、如何组织集成调用、如何记录日志与处理错误”。上线时可保留也可按业务需要替换/扩展。
### 3.1 示例文件与入口
- **示例文件**`extensions/sync_oa_to_didi/job.py`
- **handler_path 填写示例**(用于 Admin 创建 Job
- `extensions.sync_oa_to_didi.job:SyncOAToDidiTokenJob`
- `extensions.sync_oa_to_didi.job:SyncOAToDidiExportFormJob`
- `extensions.sync_oa_to_didi.job:SyncOAToDidiLegalEntitySyncJob`
### 3.2 SyncOAToDidiTokenJobtoken 获取演示)
- **目的**:演示调用致远 OA 获取 token并在日志中进行基础脱敏输出。
- **需要的 public_cfg 字段**
- `base_url`:致远 OA base url不包含具体路径
- **需要的 secret_cfg 字段(解密后)**
- `rest_user`REST 帐号
- `rest_password`REST 密码
- `loginName`:可选(模拟登录名)
- **行为特征**
- 使用 `SeeyonClient.authenticate()` 获取 token
- 日志输出会对 token 做 mask避免完整泄露
### 3.3 SyncOAToDidiExportFormJobCAP4 无流程表单导出)
- **目的**:调用致远 OA CAP4 表单导出接口,返回原始响应文本,并将大文本按块写入 run_log尽力而为
- **需要的 public_cfg 字段**
- `base_url`
- `templateCode`
- 可选:`senderLoginName` / `rightId` / `doTrigger` / `param` / `extra`(扩展字段 dict
- **需要的 secret_cfg 字段(解密后)**
- `rest_user` / `rest_password` / `loginName`
- **行为特征**
- 返回结构中包含 `raw`(原始文本)与 `meta`content_length/content_type 等)
- 大文本会拆分 chunk 记录到 run_log仍受 run_log 总量上限截断)
### 3.4 SyncOAToDidiLegalEntitySyncJobOA→滴滴公司主体同步示例
- **目的**:从 OA 导出数据中解析“工号/所属公司”,并同步到滴滴员工的 `legal_entity_id`
- **需要的 public_cfg 字段**
- `oa_base_url`
- `oa_templateCode`
- `didi_base_url`
- 可选:`senderLoginName/rightId/doTrigger/param/extra`(透传到 OA 导出)
- **需要的 secret_cfg 字段(解密后)**
- OA`rest_user/rest_password/loginName`
- 滴滴:`company_id/client_id/client_secret/sign_key`
- **行为特征(示例策略)**
- 从表单 definition 中定位 display 为“工号/所属公司”的字段名
- 公司主体查询结果做进程内缓存
- 员工修改按文档要求增加 150ms 间隔(避免限频)
- 输出统计:总行数/成功/跳过/错误列表(截取前 50
---
## 4. 开发全流程指南(本地开发)
### 4.1 前置条件
- 安装 Docker 与 docker compose
- 端口规划(默认 compose 暴露):
- `8000`backend / Admin
- `5432`postgres如不需要宿主机直连可在 compose 里移除映射)
- `6379`redis同上
### 4.2 初始化与启动(开发模式)
1) 在仓库根目录创建 `.env`(参考 `env.example`
```bash
cp env.example .env
```
2) 启动(开发 overlay支持热更新
```bash
docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d --build
```
或使用脚本:
```bash
./connecthub.sh dev-build
./connecthub.sh dev-start
```
3) 首次启动会自动完成(无需手工执行):
- 创建 `/data` 目录(宿主机对应 `./data`
- 生成并持久化 Fernet key默认 `./data/fernet.key`
- 初始化/升级数据库 schema建表、补列、兼容约束
4) 打开 Admin
- `http://localhost:8000/admin`
### 4.3 新增一个插件 Job标准步骤
1) 在 `extensions/` 下创建你的模块与 Job 类(继承 `BaseJob`,实现 `run(params, secrets)`)。
2) 在 Admin 创建 Job
- `id`:自定义(建议带命名空间)
- `enabled`true
- `cron_expr`:例如 `* * * * *`(每分钟)
- `handler_path`:例如 `extensions.your_ext.job:YourJob`
- `public_cfg`:必须是 JSON object形如 `{...}`
- `secret_cfg`:必须是 JSON object形如 `{...}`,保存时会加密)
3) 观察执行结果:
- 通过 Job 上的“立即运行”或等待 cron 触发
- 在 JobLog 中查看 `message/traceback/run_log/snapshot_params`
### 4.4 查看日志(开发/排查)
- 查看全部服务日志:
```bash
./connecthub.sh log
```
- 只看 worker并 follow
```bash
./connecthub.sh log -f worker
```
### 4.5 常见开发坑(必须注意)
- `public_cfg/secret_cfg` 在 Admin 中保存时必须输入 **合法 JSON 对象**(双引号、且为 `{...}`),否则会直接阻止落库。
- 编辑 Job 时 `secret_cfg` **不回显**:留空表示不更新;填写则会校验 JSON 并重新加密覆盖。
- 如果你在不同环境/不同机器上复用数据库,必须同时迁移 `/data/fernet.key`,否则解密会失败。
---
## 5. 上线全流程指南docker compose 手动部署)
> 本项目当前形态为“统一环境 + 手动发布”,以下流程以 `docker-compose.yml` 为准(生产不要使用 dev overlay
### 5.1 部署前准备清单
- **目标机依赖**Docker、docker compose
- **部署目录**:建议固定到一个路径(例如 `/opt/connecthub`),保证 `./data` 可持久化
- **必须持久化的内容**
- `./data/pgdata`PostgreSQL 数据目录(必须)
- `./data/fernet.key`Fernet key必须影响历史 secret_cfg 可解密)
- `./data/logs`:文件日志(可选,但推荐便于排障)
- **端口与网络**
- 若非必要,建议仅暴露 `8000``5432/6379` 建议仅内网或不对外映射
- **配置文件**
- `.env`与代码同目录放置compose `env_file: .env`
- 核心变量:`DB_URL` / `REDIS_URL` / `DATA_DIR` / `FERNET_KEY_PATH` / `LOG_DIR`
### 5.2 首次上线(从零部署)
1) 准备代码与配置:
```bash
cd /path/to/your/deploy/dir
cp env.example .env
```
2) 构建并启动(生产 compose
```bash
docker compose up -d --build
```
或使用脚本:
```bash
./connecthub.sh build
```
3) 验收检查点:
- `docker compose ps``backend/worker/beat/postgres/redis` 均为 healthy/running
- `GET http://<host>:8000/health` 返回 ok
- Admin 可打开:`http://<host>:8000/admin`
- 能创建一个 Job 并“立即运行”,在 JobLog 中看到 RUNNING→SUCCESS/FAILURE 的完整链路
### 5.3 日常发布(升级)
1) 拉取新版本代码(或替换为新代码包)。
2) 重新构建并启动:
```bash
docker compose up -d --build
```
3) 验证点(升级后必做):
- `backend` 可访问、`/health` 正常
- `beat` 在运行(否则 cron 不会触发)
- worker 能写入 JobLog检查最新日志是否持续产生
- 若有 schema 变更:确认 `ensure_schema` 执行未导致写库异常(看 worker 日志)
### 5.4 回滚策略(手动)
> 回滚核心目标:恢复“可用的旧版本服务”,并确保数据与 Fernet key 一致。
- **代码/镜像回滚**
- 回到上一个可用版本的代码git tag/commit/发布包)
- 重新 `docker compose up -d --build`
- **关键风险(必须牢记)**
- **不能更换 `./data/fernet.key`**:一旦更换,历史 `secret_cfg` 将无法解密,任务会失败。
- **数据回滚**
- 依赖备份(见第 6 章),优先恢复 Postgres 数据,再启动服务验证
---
## 6. 备份与恢复(必须项)
本项目需要至少备份以下两类资产:
- **PostgreSQL 数据**(任务定义与日志)
- **Fernet key**(密文配置解密必需)
### 6.1 备份策略(推荐最小集合)
- **必须备份**
- `./data/fernet.key`
- PostgreSQL 数据(建议做“逻辑备份”或“目录快照”,至少二选一)
- **可选备份**
- `./data/logs/`(用于留存审计与排障)
### 6.2 逻辑备份(推荐,跨机器/跨路径更稳)
1) 执行备份(示例:在目标机上对容器内 pg 执行 `pg_dump`
```bash
docker compose exec -T postgres pg_dump -U connecthub -d connecthub > backup_connecthub.sql
```
2) 同步备份 Fernet key
```bash
cp ./data/fernet.key ./backup_fernet.key
```
3) 恢复(新环境/故障恢复):
- 启动 postgres/redis可先不启动 worker/beat
- 导入 SQL
```bash
cat backup_connecthub.sql | docker compose exec -T postgres psql -U connecthub -d connecthub
```
- 还原 Fernet key
```bash
cp ./backup_fernet.key ./data/fernet.key
```
- 启动全量服务并验证:
- Admin 能打开
- 历史 Job 能正常运行(证明 secret_cfg 可解密)
### 6.3 目录快照备份(简单,但对一致性要求更高)
> 目录级备份前建议短暂停止写入(至少停止 worker/beat避免备份不一致。
1) 停止任务执行(建议):
```bash
docker compose stop worker beat
```
2) 打包 `pgdata``fernet.key`
```bash
tar -czf backup_data_$(date +%F).tgz ./data/pgdata ./data/fernet.key
```
3) 恢复后再启动 worker/beat
```bash
docker compose up -d
```
---
## 7. 排障手册(症状 → 定位 → 处理)
### 7.1 Admin 保存 Job 报错public_cfg/secret_cfg 必须是 JSON object
- **症状**:保存时报 `public_cfg must be a JSON object``secret_cfg must be a JSON object`
- **定位**`app/admin/views.py``on_model_change` 校验)
- **处理**
- 确认输入为 `{...}` 且 key 使用双引号
- 不要输入数组 `[...]`、字符串 `"..."`、或空内容
### 7.2 任务不按 cron 触发
- **症状**Job 创建后一直没有新日志产生
- **定位**
- beat 是否运行:`docker compose ps`、`./connecthub.sh log beat`
- worker 是否运行:`./connecthub.sh log worker`
- Job 是否启用Admin 中 `enabled`
- cron 是否正确:`cron_expr` 保存时虽校验,但仍可能逻辑不符合预期
- 时区语义:调度使用 `Asia/Shanghai``app/tasks/dispatcher.py`
- last_run_at 是否阻挡:同一分钟不会重复触发(`last_run_at >= now_min` 会跳过)
- **处理**
- 先用 Job 的“立即运行”验证执行链路(排除插件/外部系统问题)
- 确认 beat 正常运行后再排 cron
### 7.3 Retry 提示“正在运行中,请结束后再重试”
- **症状**:点击 Retry 被拒绝
- **定位**`app/admin/routes.py`RUNNING 状态保护)
- **处理**
- 等待任务结束SUCCESS/FAILURE后再 Retry
- 若任务卡死:检查 worker 进程状态与外部依赖,必要时重启 worker
### 7.4 secret_cfg 解密失败Invalid token / looks truncated
- **症状**任务失败traceback 提示 Fernet token 无效或被截断
- **定位**`app/security/fernet.py:decrypt_json`
- **处理**
- 确认 `./data/fernet.key` 未丢失、未被替换
- 若是 token 被截断/污染:在 Admin 重新填写 `secret_cfg`JSON object保存让系统重新加密
### 7.5 外部系统 API 调用失败401/超时/限频)
- **定位**
- 致远:`app/integrations/seeyon.py`401/Invalid token 会自动刷新重试一次)
- 滴滴:`app/integrations/didi.py`401 会刷新 token 重试一次;示例 Job 有 150ms 间隔)
- **处理**
- 查看 JobLog 的 `run_log``traceback`
- 核对 secret_cfg 中凭证字段是否正确
- 若接口限频:按业务需要在 Job 内增加节流/批处理(参考示例策略)
### 7.6 run_log/message 太长被截断
- **症状**JobLog 的 run_log 或 message 尾部出现 `[TRUNCATED]`
- **定位**
- run_log 截断:`app/core/log_capture.py`
- message 截断:`app/tasks/execute.py`
- **处理**
- 减少单次输出量(避免把大 payload 全量打印)
- 需要保留大文本时,可按块写日志(示例:`extensions/sync_oa_to_didi/job.py` 的分块写入方法)
---
## 8. 安全建议(上线必读)
- **敏感信息管理**
- `.env` 不应提交到版本库;上线时使用权限受控的方式分发
- `secret_cfg` 必须通过 Admin 输入并加密落库,避免在日志/代码中硬编码
- **端口暴露与网络隔离**
- `postgres/redis` 建议仅内网访问或不对外映射端口
- Admin 暴露到公网时建议加一层反向代理与访问控制(本仓库未内置鉴权)
- **日志脱敏**
- 任何 token/密码类字段不应完整写入日志
- 可参考示例 Job 的 token mask 思路(`extensions/sync_oa_to_didi/job.py`
- **最小权限与持久化文件保护**
- 保护 `./data/fernet.key` 的读权限与备份权限
- `./data/pgdata` 属于核心数据资产,应纳入备份与权限管理