23 KiB
ConnectHub 功能与部署指南
本文件面向开发与运维,汇总 ConnectHub 当前仓库 已实现的全部功能(feature),并给出 开发→上线(docker compose 手动)→备份/回滚/排障 的全流程指南。
约定:本文所有“定位”均以本仓库当前代码为准;
extensions/下内容被定义为 标准示例(可复制改造),不等同于平台核心能力。
定时任务不触发排查:last_run_at 时区导致“被认为在未来”
如果你发现某个 Job 第一次手动触发/定时触发成功后,后续按 cron 到点却一直不再触发(dispatcher.tick 的日志里 triggered: 0),很可能是 last_run_at 的时区写入/解释不一致导致:
- 调度器会用
Asia/Shanghai解释并比较last_run_at,若认为last_run_at >= 当前分钟就会跳过触发; - 如果数据库把
last_run_at存成了... +00:00(UTC),而你期望按+08:00计算,就可能出现“last_run_at 在未来 8 小时”的错觉,导致任务被持续跳过。
解决:
- 新版本已修复:写入
last_run_at时保留时区信息,并在发现last_run_at明显晚于当前分钟(未来时间)时自动忽略该值,避免任务永久不触发。
1. 项目概览
1.1 项目定位
ConnectHub 是一个轻量级企业集成中间件:统一管理多系统集成任务(Job),提供定时调度、执行监控与“一键重试/立即运行”。
1.2 组件与架构(文字版)
- FastAPI + SQLAdmin:提供管理台(任务/日志)与简单健康检查
- PostgreSQL:持久化任务定义与执行日志
- Redis:Celery broker/backend
- Celery worker:执行任务
- Celery beat:每分钟 tick 调度器,按 cron 触发任务
1.3 入口与检查点
- Admin:
http://<host>:8000/admin(由docker-compose.yml暴露8000:8000) - 健康检查:
GET /health- 入口:
app/main.py - 返回:
{"ok": true, "name": "<app_name>"}
- 入口:
2. Feature 全量清单(按模块分组)
本章只列举“仓库当前代码已实现”的能力,并给出源码定位。
2.1 Web/API 与 Admin 管理台(SQLAdmin)
-
FastAPI 应用初始化
- 定位:
app/main.py - 行为:
- 初始化日志系统(见 2.7)
- 确保
/data与可选日志目录存在 - 确保 Fernet key 已生成并持久化(见 2.6)
- 确保 DB schema(见 2.8)
- 定位:
-
Job(任务)管理
- 模型定位:
app/db/models.py(Job) - Admin 视图定位:
app/admin/views.py(JobAdmin) - 字段含义:
id:任务 ID(创建必填,可在表单中编辑主键)enabled:是否启用(调度只触发启用任务)cron_expr:cron 表达式(Asia/Shanghai 时区语义)handler_path:处理器路径(插件类定位,见 2.4)public_cfg:明文配置(JSON object)secret_cfg:密文配置(DB 存 Fernet token;编辑页不回显)last_run_at:最近一次被调度触发的时间(防重复)
- 保存校验(重要):
id必填(创建时)handler_path必须可 import,且类必须继承BaseJobcron_expr必须可被croniter解析public_cfg必须是 合法 JSON 对象(dict)secret_cfg创建时必须是 合法 JSON 对象(dict),并会加密落库;编辑时留空表示不更新,填入则会校验并加密覆盖
- 管理动作(actions):
- 立即运行(Run Now):创建 RUNNING 的 JobLog 并异步执行
- 查看日志:跳转到 JobLog 列表并按 job_id 搜索
- 停用任务(保留日志)
- 清理日志(保留任务)
- 删除任务(含日志)
- 模型定位:
-
JobLog(任务日志)管理
- 模型定位:
app/db/models.py(JobLog,JobStatus) - Admin 视图定位:
app/admin/views.py(JobLogAdmin) - 展示与能力:
- 只读(不可创建/编辑/删除)
- 列表支持按
job_id搜索 - 详情展示:
snapshot_params(快照参数)message(包含基础消息 + warnings 摘要,可能截断)traceback(异常堆栈)run_log(运行日志,可能截断)celery_task_id/attempt/started_at/finished_at
- 模型定位:
-
Admin 路由(Retry / Run Now)
- 定位:
app/admin/routes.py - 能力:
POST /admin/joblogs/{log_id}/retry:基于历史snapshot_params创建新 RUNNING JobLog 并重跑- 保护:当原日志状态为 RUNNING 时禁止重试
- 标记:在 snapshot.meta 中写入
trigger=retry
POST /admin/jobs/{job_id}/run:创建 RUNNING JobLog 并按当前 DB Job 定义执行
- 定位:
2.2 调度系统(Cron + 防重复)
-
Beat 定时 tick
- 定位:
app/tasks/celery_app.py(beat_schedule每 60 秒) - 触发任务名:
connecthub.dispatcher.tick
- 定位:
-
调度器逻辑
- 定位:
app/tasks/dispatcher.py - 行为:
- 只读取
enabled=True的 Jobs(app/db/crud.py:list_enabled_jobs) - 使用
croniter判断“当前分钟是否应触发”(时区 Asia/Shanghai) - 使用
last_run_at防止同一分钟重复触发(对 naive datetime 做时区解释) - 触发执行:
execute_job.delay(job_id=job.id)
- 只读取
- 定位:
2.3 执行引擎(通用 execute_job)
-
任务执行入口
- 定位:
app/tasks/execute.py(Celery task:connecthub.execute_job) - 两种入口模式:
- 传
job_id:从 DB 读取handler_path/public_cfg/secret_cfg - 传
snapshot_params:按快照执行(用于 Retry)
- 传
- 定位:
-
JobLog 生命周期
- 定位:
app/tasks/execute.py+app/db/crud.py - 行为:
- 若未显式传入
log_id,会尽力先创建一条 RUNNING 记录;若创建失败则降级(结束时再创建最终记录) - 执行结束后更新该条 JobLog:状态(SUCCESS/FAILURE)、message、traceback、run_log、attempt、finished_at 等
- 若未显式传入
- 定位:
-
运行日志捕获与截断
- 定位:
app/core/log_capture.py - 策略:
- 捕获 root logger 输出写入
run_log - 以字节数限制(默认 200KB)截断并追加标记
- 捕获 root logger 输出写入
- message 合成策略
- 定位:
app/tasks/execute.py - 从 run_log 提取 WARNING 行并追加到 message,并做总长度保护(50k 字符上限)
- 定位:
- 定位:
2.4 插件机制(handler_path 动态加载)
-
BaseJob 规范
- 定位:
app/jobs/base.py - 要求:插件 Job 必须实现
run(params, secrets)params:来自Job.public_cfg(明文)secrets:来自Job.secret_cfg解密后的明文(仅内存)
- 定位:
-
handler_path 解析与加载
- 定位:
app/plugins/manager.py - 支持格式:
pkg.mod:ClassName(推荐)pkg.mod.ClassName
- 约束:
- 类必须存在且是
BaseJob子类,否则保存/加载失败
- 类必须存在且是
- 定位:
2.5 集成客户端(统一 HTTP 调用层)
-
致远 OA(SeeyonClient)
- 定位:
app/integrations/seeyon.py - 能力:
POST /seeyon/rest/token获取 token(id)- 业务请求自动携带
tokenheader - 遇到 401 或响应包含
Invalid token:自动刷新 token 并重试一次 - CAP4 无流程表单导出:
POST /seeyon/rest/cap4/form/soap/export
- 定位:
-
滴滴(DidiClient)
- 定位:
app/integrations/didi.py - 能力:
POST /river/Auth/authorize获取access_token并缓存(考虑 skew)- 生成签名 sign(当前实现为 MD5)
- 遇到 401:清空 token,重新 authorize 后重试一次
- 关键 API:
- 公司主体查询:
GET /river/LegalEntity/get - 员工明细:
GET /river/Member/detail - 员工修改:
POST /river/Member/edit
- 公司主体查询:
- 定位:
2.6 配置与安全(public_cfg / secret_cfg 与 Fernet)
-
配置来源
- 定位:
app/core/config.py(Settings从.env读取) - 关键变量(默认值见代码与
README.md):DATA_DIR(默认/data)DB_URLREDIS_URLFERNET_KEY_PATH(默认/data/fernet.key)LOG_DIR(默认/data/logs,可为空关闭文件日志)
- 定位:
-
secret_cfg 加密存储与解密执行
- 定位:
app/security/fernet.py - 行为:
- Admin 保存
secret_cfg时加密落库(Fernet token) - Worker 执行时解密为 dict,仅在内存中传给 Job
- Fernet key 在启动时自动生成并写入
FERNET_KEY_PATH(见app/main.py)
- Admin 保存
- 重要约束(上线必读):
- 正式环境必须持久化并保留同一个 Fernet key 文件,否则历史
secret_cfg将无法解密。
- 正式环境必须持久化并保留同一个 Fernet key 文件,否则历史
- 常见脏数据兼容/报错
- token 被引号包裹、混入空白/换行会被清理
- token 被截断会报 “looks truncated”,需重新保存 secret_cfg 重新加密
- 定位:
2.7 日志与可观测
-
全局日志初始化
- 定位:
app/core/logging.py - 行为:
- stdout 输出(INFO)
- 若
LOG_DIR不为空:写入滚动文件connecthub.log(10MB * 5)
- 定位:
-
任务级日志(run_log)
- 定位:
app/core/log_capture.py+app/tasks/execute.py - 行为:捕获执行期间日志写入
JobLog.run_log,超限截断
- 定位:
2.8 数据库与 schema 自升级(轻量)
-
DB Engine/Session
- 定位:
app/db/engine.py - 行为:按
DB_URL创建 engine;sqlite 兼容check_same_thread=False
- 定位:
-
schema 确保与轻量自升级
- 定位:
app/db/schema.py - 行为:
create_all初次建表- 若
job_logs.run_log列不存在:ALTER TABLE补列 - 若
status存在约束且不允许RUNNING:做兼容迁移(SQLite 重建表 / Postgres 调整 CHECK)
- 定位:
2.9 运维脚本与容器化形态
-
docker compose 服务组成
- 定位:
docker-compose.yml - 服务:
redis/postgres/backend/worker/beat - 持久化:
./data/pgdata→ postgres 数据目录./data→ 容器/data(包含 fernet.key、logs 等)
- 定位:
-
开发 overlay
- 定位:
docker-compose.dev.yml - 行为:
- backend:
uvicorn --reload - worker/beat:
watchfiles监听app/与extensions/变更自动重启
- backend:
- 定位:
-
管理脚本
- 定位:
connecthub.sh - 能力:build/start/restart/stop + dev-build/dev-start/... + logs(可 follow、可 tail、可指定 service)
- 定位:
-
镜像构建
- 定位:
docker/Dockerfile - 行为:
pip install .后复制app/与extensions/到镜像内/app
- 定位:
3. 标准示例(extensions)
本章内容来自 extensions/,定位为 标准示例/模板:用于展示“如何写 Job、如何组织集成调用、如何记录日志与处理错误”。上线时可保留,也可按业务需要替换/扩展。
3.1 示例文件与入口
- 示例文件:
extensions/sync_oa_to_didi/job.py - handler_path 填写示例(用于 Admin 创建 Job):
extensions.sync_oa_to_didi.job:SyncOAToDidiTokenJobextensions.sync_oa_to_didi.job:SyncOAToDidiExportFormJobextensions.sync_oa_to_didi.job:SyncOAToDidiLegalEntitySyncJob
3.2 SyncOAToDidiTokenJob(token 获取演示)
- 目的:演示调用致远 OA 获取 token,并在日志中进行基础脱敏输出。
- 需要的 public_cfg 字段
base_url:致远 OA base url(不包含具体路径)
- 需要的 secret_cfg 字段(解密后)
rest_user:REST 帐号rest_password:REST 密码loginName:可选(模拟登录名)
- 行为特征
- 使用
SeeyonClient.authenticate()获取 token - 日志输出会对 token 做 mask(避免完整泄露)
- 使用
3.3 SyncOAToDidiExportFormJob(CAP4 无流程表单导出)
- 目的:调用致远 OA CAP4 表单导出接口,返回原始响应文本,并将大文本按块写入 run_log(尽力而为)。
- 需要的 public_cfg 字段
base_urltemplateCode- 可选:
senderLoginName/rightId/doTrigger/param/extra(扩展字段 dict)
- 需要的 secret_cfg 字段(解密后)
rest_user/rest_password/loginName
- 行为特征
- 返回结构中包含
raw(原始文本)与meta(content_length/content_type 等) - 大文本会拆分 chunk 记录到 run_log(仍受 run_log 总量上限截断)
- 返回结构中包含
3.4 SyncOAToDidiLegalEntitySyncJob(OA→滴滴公司主体同步示例)
- 目的:从 OA 导出数据中解析“工号/所属公司”,并同步到滴滴员工的
legal_entity_id。 - 需要的 public_cfg 字段
oa_base_urloa_templateCodedidi_base_url- 可选:
senderLoginName/rightId/doTrigger/param/extra(透传到 OA 导出)
- 需要的 secret_cfg 字段(解密后)
- OA:
rest_user/rest_password/loginName - 滴滴:
company_id/client_id/client_secret/sign_key
- OA:
- 行为特征(示例策略)
- 从表单 definition 中定位 display 为“工号/所属公司”的字段名
- 公司主体查询结果做进程内缓存
- 员工修改按文档要求增加 150ms 间隔(避免限频)
- 输出统计:总行数/成功/跳过/错误列表(截取前 50)
4. 开发全流程指南(本地开发)
4.1 前置条件
- 安装 Docker 与 docker compose
- 端口规划(默认 compose 暴露):
8000:backend / Admin5432:postgres(如不需要宿主机直连,可在 compose 里移除映射)6379:redis(同上)
4.2 初始化与启动(开发模式)
- 在仓库根目录创建
.env(参考env.example):
cp env.example .env
- 启动(开发 overlay,支持热更新):
docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d --build
或使用脚本:
./connecthub.sh dev-build
./connecthub.sh dev-start
- 首次启动会自动完成(无需手工执行):
- 创建
/data目录(宿主机对应./data) - 生成并持久化 Fernet key(默认
./data/fernet.key) - 初始化/升级数据库 schema(建表、补列、兼容约束)
- 打开 Admin:
http://localhost:8000/admin
4.3 新增一个插件 Job(标准步骤)
- 在
extensions/下创建你的模块与 Job 类(继承BaseJob,实现run(params, secrets))。 - 在 Admin 创建 Job:
id:自定义(建议带命名空间)enabled:truecron_expr:例如* * * * *(每分钟)handler_path:例如extensions.your_ext.job:YourJobpublic_cfg:必须是 JSON object(形如{...})secret_cfg:必须是 JSON object(形如{...},保存时会加密)
- 观察执行结果:
- 通过 Job 上的“立即运行”或等待 cron 触发
- 在 JobLog 中查看
message/traceback/run_log/snapshot_params
4.4 查看日志(开发/排查)
- 查看全部服务日志:
./connecthub.sh log
- 只看 worker(并 follow):
./connecthub.sh log -f worker
4.5 常见开发坑(必须注意)
public_cfg/secret_cfg在 Admin 中保存时必须输入 合法 JSON 对象(双引号、且为{...}),否则会直接阻止落库。- 编辑 Job 时
secret_cfg不回显:留空表示不更新;填写则会校验 JSON 并重新加密覆盖。 - 如果你在不同环境/不同机器上复用数据库,必须同时迁移
/data/fernet.key,否则解密会失败。
5. 上线全流程指南(docker compose 手动部署)
本项目当前形态为“统一环境 + 手动发布”,以下流程以
docker-compose.yml为准(生产不要使用 dev overlay)。
5.1 部署前准备清单
- 目标机依赖:Docker、docker compose
- 部署目录:建议固定到一个路径(例如
/opt/connecthub),保证./data可持久化 - 必须持久化的内容
./data/pgdata:PostgreSQL 数据目录(必须)./data/fernet.key:Fernet key(必须,影响历史 secret_cfg 可解密)./data/logs:文件日志(可选,但推荐便于排障)
- 端口与网络
- 若非必要,建议仅暴露
8000;5432/6379建议仅内网或不对外映射
- 若非必要,建议仅暴露
- 配置文件
.env:与代码同目录放置(composeenv_file: .env)- 核心变量:
DB_URL/REDIS_URL/DATA_DIR/FERNET_KEY_PATH/LOG_DIR
5.2 首次上线(从零部署)
- 准备代码与配置:
cd /path/to/your/deploy/dir
cp env.example .env
- 构建并启动(生产 compose):
docker compose up -d --build
或使用脚本:
./connecthub.sh build
- 验收检查点:
docker compose ps中backend/worker/beat/postgres/redis均为 healthy/runningGET http://<host>:8000/health返回 ok- Admin 可打开:
http://<host>:8000/admin - 能创建一个 Job 并“立即运行”,在 JobLog 中看到 RUNNING→SUCCESS/FAILURE 的完整链路
5.3 日常发布(升级)
- 拉取新版本代码(或替换为新代码包)。
- 重新构建并启动:
docker compose up -d --build
- 验证点(升级后必做):
backend可访问、/health正常beat在运行(否则 cron 不会触发)- worker 能写入 JobLog(检查最新日志是否持续产生)
- 若有 schema 变更:确认
ensure_schema执行未导致写库异常(看 worker 日志)
5.4 回滚策略(手动)
回滚核心目标:恢复“可用的旧版本服务”,并确保数据与 Fernet key 一致。
- 代码/镜像回滚
- 回到上一个可用版本的代码(git tag/commit/发布包)
- 重新
docker compose up -d --build
- 关键风险(必须牢记)
- 不能更换
./data/fernet.key:一旦更换,历史secret_cfg将无法解密,任务会失败。
- 不能更换
- 数据回滚
- 依赖备份(见第 6 章),优先恢复 Postgres 数据,再启动服务验证
6. 备份与恢复(必须项)
本项目需要至少备份以下两类资产:
- PostgreSQL 数据(任务定义与日志)
- Fernet key(密文配置解密必需)
6.1 备份策略(推荐最小集合)
- 必须备份
./data/fernet.key- PostgreSQL 数据(建议做“逻辑备份”或“目录快照”,至少二选一)
- 可选备份
./data/logs/(用于留存审计与排障)
6.2 逻辑备份(推荐,跨机器/跨路径更稳)
- 执行备份(示例:在目标机上对容器内 pg 执行
pg_dump):
docker compose exec -T postgres pg_dump -U connecthub -d connecthub > backup_connecthub.sql
- 同步备份 Fernet key:
cp ./data/fernet.key ./backup_fernet.key
- 恢复(新环境/故障恢复):
- 启动 postgres/redis(可先不启动 worker/beat)
- 导入 SQL:
cat backup_connecthub.sql | docker compose exec -T postgres psql -U connecthub -d connecthub
- 还原 Fernet key:
cp ./backup_fernet.key ./data/fernet.key
- 启动全量服务并验证:
- Admin 能打开
- 历史 Job 能正常运行(证明 secret_cfg 可解密)
6.3 目录快照备份(简单,但对一致性要求更高)
目录级备份前建议短暂停止写入(至少停止 worker/beat),避免备份不一致。
- 停止任务执行(建议):
docker compose stop worker beat
- 打包
pgdata与fernet.key:
tar -czf backup_data_$(date +%F).tgz ./data/pgdata ./data/fernet.key
- 恢复后再启动 worker/beat:
docker compose up -d
7. 排障手册(症状 → 定位 → 处理)
7.1 Admin 保存 Job 报错:public_cfg/secret_cfg 必须是 JSON object
- 症状:保存时报
public_cfg must be a JSON object或secret_cfg must be a JSON object - 定位:
app/admin/views.py(on_model_change校验) - 处理:
- 确认输入为
{...}且 key 使用双引号 - 不要输入数组
[...]、字符串"..."、或空内容
- 确认输入为
7.2 任务不按 cron 触发
- 症状:Job 创建后一直没有新日志产生
- 定位:
- beat 是否运行:
docker compose ps、./connecthub.sh log beat - worker 是否运行:
./connecthub.sh log worker - Job 是否启用:Admin 中
enabled - cron 是否正确:
cron_expr保存时虽校验,但仍可能逻辑不符合预期 - 时区语义:调度使用
Asia/Shanghai(app/tasks/dispatcher.py) - last_run_at 是否阻挡:同一分钟不会重复触发(
last_run_at >= now_min会跳过)
- beat 是否运行:
- 处理:
- 先用 Job 的“立即运行”验证执行链路(排除插件/外部系统问题)
- 确认 beat 正常运行后再排 cron
7.3 Retry 提示“正在运行中,请结束后再重试”
- 症状:点击 Retry 被拒绝
- 定位:
app/admin/routes.py(RUNNING 状态保护) - 处理:
- 等待任务结束(SUCCESS/FAILURE)后再 Retry
- 若任务卡死:检查 worker 进程状态与外部依赖,必要时重启 worker
7.4 secret_cfg 解密失败(Invalid token / looks truncated)
- 症状:任务失败,traceback 提示 Fernet token 无效或被截断
- 定位:
app/security/fernet.py:decrypt_json - 处理:
- 确认
./data/fernet.key未丢失、未被替换 - 若是 token 被截断/污染:在 Admin 重新填写
secret_cfg(JSON object)保存,让系统重新加密
- 确认
7.5 外部系统 API 调用失败(401/超时/限频)
- 定位:
- 致远:
app/integrations/seeyon.py(401/Invalid token 会自动刷新重试一次) - 滴滴:
app/integrations/didi.py(401 会刷新 token 重试一次;示例 Job 有 150ms 间隔)
- 致远:
- 处理:
- 查看 JobLog 的
run_log与traceback - 核对 secret_cfg 中凭证字段是否正确
- 若接口限频:按业务需要在 Job 内增加节流/批处理(参考示例策略)
- 查看 JobLog 的
7.6 run_log/message 太长被截断
- 症状:JobLog 的 run_log 或 message 尾部出现
[TRUNCATED] - 定位:
- run_log 截断:
app/core/log_capture.py - message 截断:
app/tasks/execute.py
- run_log 截断:
- 处理:
- 减少单次输出量(避免把大 payload 全量打印)
- 需要保留大文本时,可按块写日志(示例:
extensions/sync_oa_to_didi/job.py的分块写入方法)
8. 安全建议(上线必读)
-
敏感信息管理
.env不应提交到版本库;上线时使用权限受控的方式分发secret_cfg必须通过 Admin 输入并加密落库,避免在日志/代码中硬编码
-
端口暴露与网络隔离
postgres/redis建议仅内网访问或不对外映射端口- Admin 暴露到公网时建议加一层反向代理与访问控制(本仓库未内置鉴权)
-
日志脱敏
- 任何 token/密码类字段不应完整写入日志
- 可参考示例 Job 的 token mask 思路(
extensions/sync_oa_to_didi/job.py)
-
最小权限与持久化文件保护
- 保护
./data/fernet.key的读权限与备份权限 ./data/pgdata属于核心数据资产,应纳入备份与权限管理
- 保护