5.9 KiB
5.9 KiB
ConnectHub 开发手册
ConnectHub 是一个轻量级企业集成中间件:统一管理多系统集成任务(Job),提供定时调度、执行监控与“一键重试”。
项目结构树
.
├── app
│ ├── admin
│ │ ├── routes.py
│ │ ├── templates
│ │ │ └── joblog_details.html
│ │ └── views.py
│ ├── core
│ │ ├── config.py
│ │ └── logging.py
│ ├── db
│ │ ├── crud.py
│ │ ├── engine.py
│ │ └── models.py
│ ├── integrations
│ │ └── base.py
│ ├── jobs
│ │ └── base.py
│ ├── plugins
│ │ └── manager.py
│ ├── security
│ │ └── fernet.py
│ ├── tasks
│ │ ├── celery_app.py
│ │ ├── dispatcher.py
│ │ └── execute.py
│ └── main.py
├── extensions
│ └── example
│ ├── client.py
│ └── job.py
├── docker
│ └── Dockerfile
├── docker-compose.yml
├── env.example
└── pyproject.toml
环境与配置
env.example:环境变量示例(由于环境限制,仓库中使用该文件名;本地运行时请手动创建.env并参考此文件)- 关键变量:
DATA_DIR=/data:容器内数据目录DB_URL=postgresql+psycopg://connecthub:connecthub_pwd_change_me@postgres:5432/connecthub:PostgreSQL 连接串(容器内通过 service namepostgres访问)REDIS_URL=redis://redis:6379/0:Celery Broker/BackendFERNET_KEY_PATH=/data/fernet.key:Fernet key 文件(自动生成并持久化;正式环境必须保留同一个 key,否则历史 secret_cfg 将无法解密)LOG_DIR=/data/logs:日志目录(可选)
核心框架实现要点
BaseJob(插件规范)
- 位置:
app/jobs/base.py - 规范:插件必须实现
run(params, secrets),其中:params来自Job.public_cfg(明文)secrets来自Job.secret_cfg解密后的明文(仅内存)
BaseClient(适配器/SDK)
- 位置:
app/integrations/base.py - 规范:业务 Job 禁止直接写 HTTP;必须通过 Client 访问外部系统(统一超时、重试、日志)。
SeeyonClient(致远 OA)
- 位置:
app/integrations/seeyon.py - 认证方式:
POST /seeyon/rest/token获取id作为 token,并在业务请求 header 中携带token: <id>(参考:调用Rest接口)。 - 最小配置示例:
public_cfg:
{"base_url":"https://oa.example.com"}
secret_cfg(会被加密落库):
{"rest_user":"REST帐号","rest_password":"REST密码","loginName":"可选-模拟登录名"}
- token 失效处理:遇到 401 或响应包含
Invalid token,自动刷新 token 并重试一次。
示例插件:sync_oa_to_didi(仅演示 token 获取日志)
- 插件 Job:
extensions/sync_oa_to_didi/job.py的SyncOAToDidiTokenJob - 在 Admin 创建 Job 时可使用:
handler_path:extensions.sync_oa_to_didi.job:SyncOAToDidiTokenJobpublic_cfg:
{"base_url":"https://oa.example.com"}
secret_cfg(会被加密落库):
{"rest_user":"REST帐号","rest_password":"REST密码","loginName":"可选-模拟登录名"}
Security(Fernet 加解密)
- 位置:
app/security/fernet.py - 说明:
secret_cfg在数据库中保存 Fernet 密文 token- Worker 执行前自动解密,仅在内存中传给 Job
- key 自动生成到
FERNET_KEY_PATH(默认/data/fernet.key),volume 挂载后可持久化
PluginManager(动态加载)
- 位置:
app/plugins/manager.py handler_path推荐格式:extensions.example.job:ExampleJob
数据层与 Admin(SQLAdmin)
- 模型:
app/db/models.py(Job / JobLog) - Admin:
Job:可视化增删改查JobLog:可视化查看执行日志(只读)JobLog详情页自定义Retry按钮:点击后读取snapshot_params并触发重试
- 关键文件:
app/admin/views.py:ModelView 定义;保存 Job 时自动加密secret_cfgapp/admin/templates/joblog_details.html:JobLog 详情模板覆盖,加入 Retry 按钮app/admin/routes.py:处理 Retry POST 并触发 Celery
任务引擎(Celery)
- Celery app:
app/tasks/celery_app.py - 调度:
- Beat 每分钟触发一次
connecthub.dispatcher.tick dispatcher.tick读取 DB Jobs,根据cron_expr到点触发connecthub.execute_job
- Beat 每分钟触发一次
- 执行:
app/tasks/execute.py的execute_job:读库/解密/加载插件/执行/写 JobLog(含异常堆栈)
运行指南
- 在仓库根目录创建
.env(参考env.example) - 生产模式启动:
docker compose up -d --build
- 打开 Admin:
http://localhost:8000/admin
- 创建一个示例 Job(ExampleJob):
id:example.hellocron_expr:* * * * *(每分钟)handler_path:extensions.example.job:ExampleJobpublic_cfg:{"name":"Mars"}secret_cfg:{"token":"demo-token"}(保存时自动加密落库)
- 等待 Beat 触发执行,或在 JobLog 里查看结果;若失败/想复跑,在 JobLog 详情页点击 Retry。
开发模式(实时更新代码)
开发阶段可以使用 dev compose 叠加文件,实现:
backend:uvicorn --reloadworker/beat:监听代码变更后自动重启进程加载新代码
启动命令(二选一):
# 方式 A:直接 docker compose 叠加
docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d --build
# 方式 B:使用脚本
./connecthub.sh dev-build
./connecthub.sh dev-start
生产环境请只使用 docker-compose.yml(不挂载源码、不启用 reload/watch),发布通过重新 build 镜像完成。