Compare commits

...

1 Commits
dev2 ... main

Author SHA1 Message Date
Marsway 6566549a05 publish: version 0.1 2026-01-13 17:28:37 +08:00
39 changed files with 4772 additions and 212 deletions

2
.env
View File

@ -1,6 +1,6 @@
APP_NAME=ConnectHub
DATA_DIR=/data
DB_URL=sqlite:////data/connecthub.db
DB_URL=postgresql+psycopg://connecthub:connecthub_pwd_change_me@postgres:5432/connecthub
REDIS_URL=redis://redis:6379/0
FERNET_KEY_PATH=/data/fernet.key
DEV_MODE=1

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
*.db
*.log
pgdata/
__pycache__/
*.pyc

View File

@ -48,9 +48,9 @@ ConnectHub 是一个轻量级企业集成中间件:统一管理多系统集成
- `env.example`:环境变量示例(由于环境限制,仓库中使用该文件名;本地运行时请手动创建 `.env` 并参考此文件)
- 关键变量:
- `DATA_DIR=/data`:容器内数据目录
- `DB_URL=sqlite:////data/connecthub.db`SQLite DB 文件
- `DB_URL=postgresql+psycopg://connecthub:connecthub_pwd_change_me@postgres:5432/connecthub`PostgreSQL 连接串(容器内通过 service name `postgres` 访问)
- `REDIS_URL=redis://redis:6379/0`Celery Broker/Backend
- `FERNET_KEY_PATH=/data/fernet.key`Fernet key 文件(自动生成并持久化)
- `FERNET_KEY_PATH=/data/fernet.key`Fernet key 文件(自动生成并持久化**正式环境必须保留同一个 key否则历史 secret_cfg 将无法解密**
- `LOG_DIR=/data/logs`:日志目录(可选)
### 核心框架实现要点
@ -67,6 +67,44 @@ ConnectHub 是一个轻量级企业集成中间件:统一管理多系统集成
- 位置:`app/integrations/base.py`
- 规范:业务 Job 禁止直接写 HTTP必须通过 Client 访问外部系统(统一超时、重试、日志)。
#### SeeyonClient致远 OA
- 位置:`app/integrations/seeyon.py`
- 认证方式:`POST /seeyon/rest/token` 获取 `id` 作为 token并在业务请求 header 中携带 `token: <id>`(参考:[调用Rest接口](https://open.seeyoncloud.com/seeyonapi/781/))。
- 最小配置示例:
- `public_cfg`
```json
{"base_url":"https://oa.example.com"}
```
- `secret_cfg`(会被加密落库):
```json
{"rest_user":"REST帐号","rest_password":"REST密码","loginName":"可选-模拟登录名"}
```
- 注意:在 Admin 中保存 `public_cfg/secret_cfg` 时必须输入 **合法 JSON 对象(双引号、且为 `{...}`**,否则会直接报错并阻止落库。
- token 失效处理:遇到 401 或响应包含 `Invalid token`,自动刷新 token 并重试一次。
#### 示例插件sync_oa_to_didi仅演示 token 获取日志)
- 插件 Job`extensions/sync_oa_to_didi/job.py` 的 `SyncOAToDidiTokenJob`
- 在 Admin 创建 Job 时可使用:
- `handler_path`: `extensions.sync_oa_to_didi.job:SyncOAToDidiTokenJob`
- `public_cfg`
```json
{"base_url":"https://oa.example.com"}
```
- `secret_cfg`(会被加密落库):
```json
{"rest_user":"REST帐号","rest_password":"REST密码","loginName":"可选-模拟登录名"}
```
#### SecurityFernet 加解密)
- 位置:`app/security/fernet.py`

View File

@ -1,15 +1,23 @@
from __future__ import annotations
from datetime import datetime
from urllib.parse import quote_plus
from fastapi import APIRouter, HTTPException, Request
from starlette.responses import RedirectResponse
from app.db import crud
from app.db.engine import get_session
from app.db.models import JobStatus
from app.tasks.execute import execute_job
router = APIRouter()
def _redirect_with_error(referer: str, msg: str) -> RedirectResponse:
    """Redirect back to *referer* with *msg* appended as an ``error`` query param.

    The message is URL-encoded with ``quote_plus``; status 303 (See Other)
    forces the browser to re-issue a GET after the originating POST action.
    """
    if "?" in referer:
        target = f"{referer}&error={quote_plus(msg)}"
    else:
        target = f"{referer}?error={quote_plus(msg)}"
    return RedirectResponse(target, status_code=303)
@router.post("/admin/joblogs/{log_id}/retry")
def retry_joblog(request: Request, log_id: int):
@ -18,20 +26,68 @@ def retry_joblog(request: Request, log_id: int):
log = crud.get_job_log(session, log_id)
if not log:
raise HTTPException(status_code=404, detail="JobLog not found")
# 关键:用 snapshot_params 重新触发任务(其中 secret_cfg 仍为密文)
execute_job.delay(snapshot_params=log.snapshot_params)
if log.status == JobStatus.RUNNING:
referer = request.headers.get("Referer") or str(request.url_for("admin:details", identity="joblog", pk=str(log_id)))
return _redirect_with_error(referer, "该任务日志正在运行中,请结束后再重试。")
# 创建新的 RUNNING JobLog并跳转到该条详情页
snapshot = dict(log.snapshot_params or {})
meta = dict(snapshot.get("meta") or {})
meta["trigger"] = "retry"
meta["started_at"] = datetime.utcnow().isoformat()
snapshot["meta"] = meta
new_log = crud.create_job_log(
session,
job_id=str(log.job_id),
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id="",
attempt=0,
started_at=datetime.utcnow(),
finished_at=None,
)
execute_job.delay(snapshot_params=snapshot, log_id=int(new_log.id))
url = request.url_for("admin:details", identity="job-log", pk=str(new_log.id))
return RedirectResponse(url, status_code=303)
finally:
session.close()
referer = request.headers.get("Referer") or "/admin"
return RedirectResponse(referer, status_code=303)
@router.post("/admin/jobs/{job_id}/run")
def run_job(request: Request, job_id: str):
# 触发一次立即执行
execute_job.delay(job_id=job_id)
referer = request.headers.get("Referer") or "/admin"
return RedirectResponse(referer, status_code=303)
session = get_session()
try:
job = crud.get_job(session, job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found")
snapshot = {
"job_id": job.id,
"handler_path": job.handler_path,
"public_cfg": job.public_cfg or {},
"secret_cfg": job.secret_cfg or "",
"meta": {"trigger": "run_now", "started_at": datetime.utcnow().isoformat()},
}
new_log = crud.create_job_log(
session,
job_id=str(job.id),
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id="",
attempt=0,
started_at=datetime.utcnow(),
finished_at=None,
)
execute_job.delay(job_id=job.id, log_id=int(new_log.id))
url = request.url_for("admin:details", identity="job-log", pk=str(new_log.id))
return RedirectResponse(url, status_code=303)
finally:
session.close()

View File

@ -0,0 +1,44 @@
{# Custom SQLAdmin edit page for Job: adds a write-only secret_cfg textarea.
   The stored ciphertext is deliberately never rendered back to the client. #}
{% extends "sqladmin/edit.html" %}
{% block content %}
{{ super() }}
{# Extra card below the default edit form. #}
<div class="card mt-3">
<div class="card-body">
<div class="row mb-3">
<label class="form-label col-sm-2 col-form-label">密文配置secret_cfg</label>
<div class="col-sm-10">
<textarea id="connecthub-secret-cfg" class="form-control" rows="8" placeholder='留空表示不修改;填写将覆盖并加密保存。示例:{"token":"xxx"}'></textarea>
<div class="form-text">
出于安全考虑,编辑页不回显历史密文。留空表示不修改;填写 JSON 对象将覆盖原值并重新加密保存。
</div>
</div>
</div>
</div>
</div>
{% endblock %}
{% block tail %}
{{ super() }}
<script>
(function () {
// The default SQLAdmin edit page renders one form; mirror the textarea value
// into a hidden "secret_cfg" input so it is submitted with that form.
const form = document.querySelector("form");
const textarea = document.getElementById("connecthub-secret-cfg");
if (!form || !textarea) return;
let hidden = form.querySelector('input[name="secret_cfg"]');
if (!hidden) {
hidden = document.createElement("input");
hidden.type = "hidden";
hidden.name = "secret_cfg";
form.appendChild(hidden);
}
// Copy on submit so the latest textarea content always wins.
form.addEventListener("submit", function () {
hidden.value = textarea.value || "";
});
})();
</script>
{% endblock %}

View File

@ -22,7 +22,7 @@
<div class="ms-3 d-inline-block dropdown">
<a href="#" class="btn btn-secondary dropdown-toggle" id="dropdownMenuButton1" data-bs-toggle="dropdown"
aria-expanded="false">
Export
导出
</a>
<ul class="dropdown-menu" aria-labelledby="dropdownMenuButton1">
{% for export_type in model_view.export_types %}
@ -36,7 +36,7 @@
<div class="ms-3 d-inline-block">
<a href="{{ url_for('admin:export', identity=model_view.identity, export_type=model_view.export_types[0]) }}"
class="btn btn-secondary">
Export
导出
</a>
</div>
{% endif %}
@ -44,7 +44,7 @@
{% if model_view.can_create %}
<div class="ms-3 d-inline-block">
<a href="{{ url_for('admin:create', identity=model_view.identity) }}" class="btn btn-primary">
+ New {{ model_view.name }}
+ 新建{{ model_view.name }}
</a>
</div>
{% endif %}
@ -56,14 +56,14 @@
<button {% if not model_view.can_delete and not model_view._custom_actions_in_list %} disabled {% endif %}
class="btn btn-light dropdown-toggle" type="button" id="dropdownMenuButton" data-toggle="dropdown"
aria-haspopup="true" aria-expanded="false">
Actions
操作
</button>
{% if model_view.can_delete or model_view._custom_actions_in_list %}
<div class="dropdown-menu" aria-labelledby="dropdownMenuButton">
{% if model_view.can_delete %}
<a class="dropdown-item" id="action-delete" href="#" data-name="{{ model_view.name }}"
data-url="{{ url_for('admin:delete', identity=model_view.identity) }}" data-bs-toggle="modal"
data-bs-target="#modal-delete">Delete selected items</a>
data-bs-target="#modal-delete">删除所选</a>
{% endif %}
{% for custom_action, label in model_view._custom_actions_in_list.items() %}
{% if custom_action in model_view._custom_actions_confirmation %}
@ -85,9 +85,9 @@
<div class="col-md-4 text-muted">
<div class="input-group">
<input id="search-input" type="text" class="form-control"
placeholder="Search: {{ model_view.search_placeholder() }}"
placeholder="搜索:{{ model_view.search_placeholder() }}"
value="{{ request.query_params.get('search', '') }}">
<button id="search-button" class="btn" type="button">Search</button>
<button id="search-button" class="btn" type="button">搜索</button>
<button id="search-reset" class="btn" type="button" {% if not request.query_params.get('search')
%}disabled{% endif %}><i class="fa-solid fa-times"></i></button>
</div>
@ -120,7 +120,7 @@
{% endif %}
</th>
{% endfor %}
<th>Run Now</th>
<th>立即运行</th>
</tr>
</thead>
<tbody>
@ -172,8 +172,8 @@
{% endif %}
{% endfor %}
<td>
<form class="connecthub-run-form" method="post" action="/admin/jobs/{{ get_object_identifier(row) }}/run" onsubmit="return confirm('Run this job now?');">
<button type="submit" class="btn btn-primary btn-sm">Run Now</button>
<form class="connecthub-run-form" method="post" action="/admin/jobs/{{ get_object_identifier(row) }}/run" onsubmit="return confirm('确认立即执行该任务?');">
<button type="submit" class="btn btn-primary btn-sm">立即运行</button>
</form>
</td>
</tr>
@ -182,9 +182,9 @@
</table>
</div>
<div class="card-footer d-flex justify-content-between align-items-center gap-2">
<p class="m-0 text-muted">Showing <span>{{ ((pagination.page - 1) * pagination.page_size) + 1 }}</span> to
<span>{{ min(pagination.page * pagination.page_size, pagination.count) }}</span> of <span>{{ pagination.count
}}</span> items
<p class="m-0 text-muted">显示 <span>{{ ((pagination.page - 1) * pagination.page_size) + 1 }}</span>
<span>{{ min(pagination.page * pagination.page_size, pagination.count) }}</span>,共 <span>{{ pagination.count
}}</span>
</p>
<ul class="pagination m-0 ms-auto">
<li class="page-item {% if not pagination.has_previous %}disabled{% endif %}">
@ -194,7 +194,7 @@
<a class="page-link" href="#">
{% endif %}
<i class="fa-solid fa-chevron-left"></i>
prev
上一页
</a>
</li>
{% for page_control in pagination.page_controls %}
@ -207,21 +207,21 @@
{% else %}
<a class="page-link" href="#">
{% endif %}
next
下一页
<i class="fa-solid fa-chevron-right"></i>
</a>
</li>
</ul>
<div class="dropdown text-muted">
Show
每页显示
<a href="#" class="btn btn-sm btn-light dropdown-toggle" data-toggle="dropdown" aria-haspopup="true"
aria-expanded="false">
{{ request.query_params.get("pageSize") or model_view.page_size }} / Page
{{ request.query_params.get("pageSize") or model_view.page_size }} /
</a>
<div class="dropdown-menu">
{% for page_size_option in model_view.page_size_options %}
<a class="dropdown-item" href="{{ request.url.include_query_params(pageSize=page_size_option, page=pagination.resize(page_size_option).page) }}">
{{ page_size_option }} / Page
{{ page_size_option }} /
</a>
{% endfor %}
</div>
@ -233,7 +233,7 @@
<div class="col-md-3" style="width: 300px; flex-shrink: 0;">
<div id="filter-sidebar" class="card">
<div class="card-header">
<h3 class="card-title">Filters</h3>
<h3 class="card-title">筛选</h3>
</div>
<div class="card-body">
{% for filter in model_view.get_filters() %}
@ -245,8 +245,8 @@
{% set current_op = request.query_params.get(filter.parameter_name + '_op', '') %}
{% if current_filter %}
<div class="mb-2 text-muted small">
Current: {{ current_op }} {{ current_filter }}
<a href="{{ request.url.remove_query_params(filter.parameter_name).remove_query_params(filter.parameter_name + '_op') }}" class="text-decoration-none">[Clear]</a>
当前:{{ current_op }} {{ current_filter }}
<a href="{{ request.url.remove_query_params(filter.parameter_name).remove_query_params(filter.parameter_name + '_op') }}" class="text-decoration-none">[清除]</a>
</div>
{% endif %}
<form method="get" class="d-flex flex-column" style="gap: 8px;">
@ -256,7 +256,7 @@
{% endif %}
{% endfor %}
<select name="{{ filter.parameter_name }}_op" class="form-select form-select-sm" required>
<option value="">Select operation...</option>
<option value="">选择操作...</option>
{% for op_value, op_label in filter.get_operation_options_for_model(model_view.model) %}
<option value="{{ op_value }}" {% if current_op == op_value %}selected{% endif %}>{{ op_label }}</option>
{% endfor %}
@ -267,7 +267,7 @@
class="form-control form-control-sm"
value="{{ current_filter }}"
required>
<button type="submit" class="btn btn-sm btn-outline-primary">Apply Filter</button>
<button type="submit" class="btn btn-sm btn-outline-primary">应用筛选</button>
</form>
</div>
</div>

View File

@ -17,13 +17,18 @@
{% endfor %}: {{ get_object_identifier(model) }}
</h3>
</div>
{% if request.query_params.get('error') %}
<div class="alert alert-danger m-3" role="alert">
{{ request.query_params.get('error') }}
</div>
{% endif %}
<div class="card-body border-bottom py-3">
<div class="table-responsive">
<table class="table card-table table-vcenter text-nowrap table-hover table-bordered">
<thead>
<tr>
<th class="w-1">Column</th>
<th class="w-1">Value</th>
<th class="w-1">字段</th>
<th class="w-1"></th>
</tr>
</thead>
<tbody>
@ -58,12 +63,12 @@
<div class="row connecthub-action-row">
<div class="col-md-1">
<a href="{{ url_for('admin:list', identity=model_view.identity) }}" class="btn">
Go Back
返回
</a>
</div>
<div class="col-md-1">
<form method="post" action="/admin/joblogs/{{ get_object_identifier(model) }}/retry" style="display:inline;" onsubmit="return confirm('Retry this job log?');">
<button type="submit" class="btn btn-warning">Retry</button>
<form method="post" action="/admin/joblogs/{{ get_object_identifier(model) }}/retry" style="display:inline;" onsubmit="return confirm('确认重试该任务日志?');">
<button type="submit" class="btn btn-warning">重试</button>
</form>
</div>
{% if model_view.can_delete %}
@ -71,14 +76,14 @@
<a href="#" data-name="{{ model_view.name }}" data-pk="{{ get_object_identifier(model) }}"
data-url="{{ model_view._url_for_delete(request, model) }}" data-bs-toggle="modal"
data-bs-target="#modal-delete" class="btn btn-danger">
Delete
删除
</a>
</div>
{% endif %}
{% if model_view.can_edit %}
<div class="col-md-1">
<a href="{{ model_view._build_url_for('admin:edit', request, model) }}" class="btn btn-primary">
Edit
编辑
</a>
</div>
{% endif %}

View File

@ -22,7 +22,7 @@
<div class="ms-3 d-inline-block dropdown">
<a href="#" class="btn btn-secondary dropdown-toggle" id="dropdownMenuButton1" data-bs-toggle="dropdown"
aria-expanded="false">
Export
导出
</a>
<ul class="dropdown-menu" aria-labelledby="dropdownMenuButton1">
{% for export_type in model_view.export_types %}
@ -36,7 +36,7 @@
<div class="ms-3 d-inline-block">
<a href="{{ url_for('admin:export', identity=model_view.identity, export_type=model_view.export_types[0]) }}"
class="btn btn-secondary">
Export
导出
</a>
</div>
{% endif %}
@ -44,26 +44,31 @@
{% if model_view.can_create %}
<div class="ms-3 d-inline-block">
<a href="{{ url_for('admin:create', identity=model_view.identity) }}" class="btn btn-primary">
+ New {{ model_view.name }}
+ 新建{{ model_view.name }}
</a>
</div>
{% endif %}
</div>
</div>
{% if request.query_params.get('error') %}
<div class="alert alert-danger m-3" role="alert">
{{ request.query_params.get('error') }}
</div>
{% endif %}
<div class="card-body border-bottom py-3">
<div class="d-flex justify-content-between">
<div class="dropdown col-4">
<button {% if not model_view.can_delete and not model_view._custom_actions_in_list %} disabled {% endif %}
class="btn btn-light dropdown-toggle" type="button" id="dropdownMenuButton" data-toggle="dropdown"
aria-haspopup="true" aria-expanded="false">
Actions
操作
</button>
{% if model_view.can_delete or model_view._custom_actions_in_list %}
<div class="dropdown-menu" aria-labelledby="dropdownMenuButton">
{% if model_view.can_delete %}
<a class="dropdown-item" id="action-delete" href="#" data-name="{{ model_view.name }}"
data-url="{{ url_for('admin:delete', identity=model_view.identity) }}" data-bs-toggle="modal"
data-bs-target="#modal-delete">Delete selected items</a>
data-bs-target="#modal-delete">删除所选</a>
{% endif %}
{% for custom_action, label in model_view._custom_actions_in_list.items() %}
{% if custom_action in model_view._custom_actions_confirmation %}
@ -85,9 +90,9 @@
<div class="col-md-4 text-muted">
<div class="input-group">
<input id="search-input" type="text" class="form-control"
placeholder="Search: {{ model_view.search_placeholder() }}"
placeholder="搜索:{{ model_view.search_placeholder() }}"
value="{{ request.query_params.get('search', '') }}">
<button id="search-button" class="btn" type="button">Search</button>
<button id="search-button" class="btn" type="button">搜索</button>
<button id="search-reset" class="btn" type="button" {% if not request.query_params.get('search')
%}disabled{% endif %}><i class="fa-solid fa-times"></i></button>
</div>
@ -120,7 +125,7 @@
{% endif %}
</th>
{% endfor %}
<th>Retry</th>
<th>重试</th>
</tr>
</thead>
<tbody>
@ -172,8 +177,8 @@
{% endif %}
{% endfor %}
<td>
<form class="connecthub-retry-form" method="post" action="/admin/joblogs/{{ get_object_identifier(row) }}/retry" onsubmit="return confirm('Retry this job log?');">
<button type="submit" class="btn btn-warning btn-sm">Retry</button>
<form class="connecthub-retry-form" method="post" action="/admin/joblogs/{{ get_object_identifier(row) }}/retry" onsubmit="return confirm('确认重试该任务日志?');">
<button type="submit" class="btn btn-warning btn-sm">重试</button>
</form>
</td>
</tr>
@ -182,9 +187,9 @@
</table>
</div>
<div class="card-footer d-flex justify-content-between align-items-center gap-2">
<p class="m-0 text-muted">Showing <span>{{ ((pagination.page - 1) * pagination.page_size) + 1 }}</span> to
<span>{{ min(pagination.page * pagination.page_size, pagination.count) }}</span> of <span>{{ pagination.count
}}</span> items
<p class="m-0 text-muted">显示 <span>{{ ((pagination.page - 1) * pagination.page_size) + 1 }}</span>
<span>{{ min(pagination.page * pagination.page_size, pagination.count) }}</span>,共 <span>{{ pagination.count
}}</span>
</p>
<ul class="pagination m-0 ms-auto">
<li class="page-item {% if not pagination.has_previous %}disabled{% endif %}">
@ -194,7 +199,7 @@
<a class="page-link" href="#">
{% endif %}
<i class="fa-solid fa-chevron-left"></i>
prev
上一页
</a>
</li>
{% for page_control in pagination.page_controls %}
@ -207,21 +212,21 @@
{% else %}
<a class="page-link" href="#">
{% endif %}
next
下一页
<i class="fa-solid fa-chevron-right"></i>
</a>
</li>
</ul>
<div class="dropdown text-muted">
Show
每页显示
<a href="#" class="btn btn-sm btn-light dropdown-toggle" data-toggle="dropdown" aria-haspopup="true"
aria-expanded="false">
{{ request.query_params.get("pageSize") or model_view.page_size }} / Page
{{ request.query_params.get("pageSize") or model_view.page_size }} /
</a>
<div class="dropdown-menu">
{% for page_size_option in model_view.page_size_options %}
<a class="dropdown-item" href="{{ request.url.include_query_params(pageSize=page_size_option, page=pagination.resize(page_size_option).page) }}">
{{ page_size_option }} / Page
{{ page_size_option }} /
</a>
{% endfor %}
</div>
@ -233,7 +238,7 @@
<div class="col-md-3" style="width: 300px; flex-shrink: 0;">
<div id="filter-sidebar" class="card">
<div class="card-header">
<h3 class="card-title">Filters</h3>
<h3 class="card-title">筛选</h3>
</div>
<div class="card-body">
{% for filter in model_view.get_filters() %}
@ -245,8 +250,8 @@
{% set current_op = request.query_params.get(filter.parameter_name + '_op', '') %}
{% if current_filter %}
<div class="mb-2 text-muted small">
Current: {{ current_op }} {{ current_filter }}
<a href="{{ request.url.remove_query_params(filter.parameter_name).remove_query_params(filter.parameter_name + '_op') }}" class="text-decoration-none">[Clear]</a>
当前:{{ current_op }} {{ current_filter }}
<a href="{{ request.url.remove_query_params(filter.parameter_name).remove_query_params(filter.parameter_name + '_op') }}" class="text-decoration-none">[清除]</a>
</div>
{% endif %}
<form method="get" class="d-flex flex-column" style="gap: 8px;">
@ -256,7 +261,7 @@
{% endif %}
{% endfor %}
<select name="{{ filter.parameter_name }}_op" class="form-select form-select-sm" required>
<option value="">Select operation...</option>
<option value="">选择操作...</option>
{% for op_value, op_label in filter.get_operation_options_for_model(model_view.model) %}
<option value="{{ op_value }}" {% if current_op == op_value %}selected{% endif %}>{{ op_label }}</option>
{% endfor %}
@ -267,7 +272,7 @@
class="form-control form-control-sm"
value="{{ current_filter }}"
required>
<button type="submit" class="btn btn-sm btn-outline-primary">Apply Filter</button>
<button type="submit" class="btn btn-sm btn-outline-primary">应用筛选</button>
</form>
</div>
</div>

View File

@ -3,14 +3,19 @@ from __future__ import annotations
import json
from datetime import datetime
from typing import Any
from urllib.parse import quote_plus
from zoneinfo import ZoneInfo
from croniter import croniter
from markupsafe import Markup
from sqladmin import ModelView, action
from sqladmin.filters import OperationColumnFilter
from sqladmin.models import Request
from starlette.responses import RedirectResponse
from app.db import crud
from app.db.engine import get_session
from app.db.models import JobStatus
from app.db.models import Job, JobLog
from app.plugins.manager import load_job_class
from app.security.fernet import encrypt_json
@ -45,9 +50,10 @@ def _truncate(s: str, n: int = 120) -> str:
class JobAdmin(ModelView, model=Job):
name = "Job"
name_plural = "Jobs"
name = "任务"
name_plural = "任务"
icon = "fa fa-cogs"
can_delete = False
column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at]
column_details_list = [
@ -69,23 +75,171 @@ class JobAdmin(ModelView, model=Job):
# 为 Job 详情页指定模板(用于调整按钮间距)
details_template = "job_details.html"
# 编辑页secret_cfg 只写不读(不回显密文;留空表示不更新)
edit_template = "job_edit.html"
# 列表页模板:加入每行 Run Now
list_template = "job_list.html"
# 编辑页排除 secret_cfg避免回显密文由自定义模板额外渲染一个空输入框
# 注意SQLAdmin 这里需要字段名字符串(不是 SQLAlchemy Column 对象)
form_edit_rules = ["id", "enabled", "cron_expr", "handler_path", "public_cfg"]
column_labels = {
"id": "任务ID",
"enabled": "启用",
"cron_expr": "Cron 表达式",
"handler_path": "处理器",
"public_cfg": "明文配置",
"secret_cfg": "密文配置",
"last_run_at": "上次运行时间",
"created_at": "创建时间",
"updated_at": "更新时间",
}
column_formatters = {
Job.created_at: lambda m, a: _fmt_dt_seconds(m.created_at),
Job.updated_at: lambda m, a: _fmt_dt_seconds(m.updated_at),
Job.last_run_at: lambda m, a: _fmt_dt_seconds(m.last_run_at),
}
column_formatters_detail = {
Job.created_at: lambda m, a: _fmt_dt_seconds(m.created_at),
Job.updated_at: lambda m, a: _fmt_dt_seconds(m.updated_at),
Job.last_run_at: lambda m, a: _fmt_dt_seconds(m.last_run_at),
}
@action(
name="run_now",
label="Run Now",
confirmation_message="Trigger this job now?",
label="立即运行",
confirmation_message="确认立即执行该任务?",
add_in_list=True,
add_in_detail=True,
)
async def run_now(self, request: Request): # type: ignore[override]
pks = request.query_params.get("pks", "").split(",")
for pk in [p for p in pks if p]:
execute_job.delay(job_id=pk)
ids = [p for p in pks if p]
if not ids:
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
session = get_session()
created_log_id: int | None = None
try:
for pk in ids:
job = crud.get_job(session, pk)
if not job:
continue
snapshot = {
"job_id": job.id,
"handler_path": job.handler_path,
"public_cfg": job.public_cfg or {},
"secret_cfg": job.secret_cfg or "",
"meta": {"trigger": "admin_run_now"},
}
log = crud.create_job_log(
session,
job_id=job.id,
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id="",
attempt=0,
started_at=datetime.utcnow(),
finished_at=None,
)
if created_log_id is None:
created_log_id = int(log.id)
execute_job.delay(job_id=job.id, log_id=int(log.id))
finally:
session.close()
if created_log_id is not None:
url = request.url_for("admin:details", identity="job-log", pk=str(created_log_id))
return RedirectResponse(url, status_code=303)
return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
@action(
    name="view_logs",
    label="查看日志",
    add_in_list=True,
    add_in_detail=True,
)
async def view_logs(self, request: Request):  # type: ignore[override]
    """Admin action: jump to the JobLog list filtered by the first selected job id.

    Reuses the log list's search box (``?search=<job_id>``) instead of a
    dedicated filter; with no selection, opens the unfiltered list.
    """
    pks = request.query_params.get("pks", "").split(",")
    # Only the first non-empty pk is used even if several rows were selected.
    pk = next((p for p in pks if p), "")
    base = str(request.url_for("admin:list", identity="job-log"))
    if pk:
        return RedirectResponse(f"{base}?search={quote_plus(pk)}", status_code=303)
    return RedirectResponse(base, status_code=303)
@action(
    name="disable_job",
    label="停用任务(保留日志)",
    confirmation_message="确认停用该任务(保留历史日志)?",
    add_in_list=True,
    add_in_detail=False,
)
async def disable_job(self, request: Request):  # type: ignore[override]
    """Admin action: disable the selected jobs while keeping their logs.

    Only flips ``Job.enabled`` to False; JobLog rows are untouched.
    Redirects back to the referring page (or the job list) afterwards.
    """
    pks = request.query_params.get("pks", "").split(",")
    ids = [p for p in pks if p]  # drop empty fragments from the comma-split
    session = get_session()
    try:
        for pk in ids:
            job = crud.get_job(session, pk)
            if not job:
                continue  # silently skip ids that no longer exist
            job.enabled = False
            session.add(job)
            session.commit()  # commit per job so earlier updates survive a later failure
    finally:
        session.close()
    referer = request.headers.get("Referer")
    return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
@action(
    name="clear_job_logs",
    label="清理日志(保留任务)",
    confirmation_message="确认清理该任务的所有日志(保留任务本身)?",
    add_in_list=True,
    add_in_detail=False,
)
async def clear_job_logs(self, request: Request):  # type: ignore[override]
    """Admin action: delete every JobLog row belonging to the selected jobs.

    The Job rows themselves are preserved.
    NOTE(review): no commit happens here — presumably
    crud.delete_job_logs_by_job_id commits internally; verify.
    """
    pks = request.query_params.get("pks", "").split(",")
    ids = [p for p in pks if p]  # drop empty fragments from the comma-split
    session = get_session()
    try:
        for pk in ids:
            crud.delete_job_logs_by_job_id(session, pk)
    finally:
        session.close()
    referer = request.headers.get("Referer")
    return RedirectResponse(referer or request.url_for("admin:list", identity=self.identity), status_code=303)
@action(
    name="delete_job_with_logs",
    label="删除任务(含日志)",
    confirmation_message="确认删除该任务及其所有日志?此操作不可恢复。",
    add_in_list=True,
    add_in_detail=False,
)
async def delete_job_with_logs(self, request: Request):  # type: ignore[override]
    """Admin action: permanently delete the selected jobs and all their logs.

    Logs are removed first, then the Job row itself; the operation is
    irreversible. Redirects to the job list when done.
    """
    pks = request.query_params.get("pks", "").split(",")
    ids = [p for p in pks if p]  # drop empty fragments from the comma-split
    session = get_session()
    try:
        for pk in ids:
            job = crud.get_job(session, pk)
            if not job:
                continue  # silently skip ids that no longer exist
            crud.delete_job_logs_by_job_id(session, job.id)
            session.delete(job)
            session.commit()  # commit per job so earlier deletions survive a later failure
    finally:
        session.close()
    return RedirectResponse(request.url_for("admin:list", identity=self.identity), status_code=303)
async def on_model_change(self, data: dict, model: Job, is_created: bool, request) -> None: # type: ignore[override]
# id 必填(避免插入时触发 NOT NULL
raw_id = data.get("id") if is_created else (data.get("id") or getattr(model, "id", None))
@ -106,45 +260,64 @@ class JobAdmin(ModelView, model=Job):
itr = croniter(str(cron_expr).strip(), base)
_ = itr.get_next(datetime)
# public_cfg 允许以 JSON 字符串输入
pcfg = _maybe_json(data.get("public_cfg"))
# public_cfg:必须是合法 JSON 对象dict否则直接报错阻止落库
pcfg = data.get("public_cfg")
if isinstance(pcfg, str):
try:
pcfg = json.loads(pcfg)
except json.JSONDecodeError as e:
raise ValueError("public_cfg must be a JSON object") from e
if not isinstance(pcfg, dict):
raise ValueError("public_cfg must be a JSON object")
if isinstance(pcfg, dict):
data["public_cfg"] = pcfg
# secret_cfg若用户输入 JSON 字符串,则自动加密落库;若输入已是 token则原样保存
scfg = data.get("secret_cfg", "")
if scfg is None:
data["secret_cfg"] = ""
return
# secret_cfg
# - 创建:必须是合法 JSON 对象dict并且保存时必须加密落库
# - 编辑:出于安全考虑不回显密文;若留空则保留原密文不更新;若填写则按 JSON 校验并加密覆盖
if is_created:
scfg = data.get("secret_cfg")
if isinstance(scfg, str):
s = scfg.strip()
if not s:
data["secret_cfg"] = ""
return
parsed = _maybe_json(s)
if isinstance(parsed, dict):
data["secret_cfg"] = encrypt_json(parsed)
else:
# 非 JSON视为已加密 token
data["secret_cfg"] = s
return
if isinstance(scfg, dict):
try:
scfg = json.loads(scfg)
except json.JSONDecodeError as e:
raise ValueError("secret_cfg must be a JSON object") from e
if not isinstance(scfg, dict):
raise ValueError("secret_cfg must be a JSON object")
data["secret_cfg"] = encrypt_json(scfg)
return
raise ValueError("secret_cfg must be JSON object or encrypted token string")
else:
# 自定义编辑页会以 textarea 传回 secret_cfg可能不存在或为空
try:
form = await request.form()
raw = form.get("secret_cfg")
except Exception:
raw = None
raw_s = str(raw).strip() if raw is not None else ""
if not raw_s:
# 留空:不更新密文字段
data.pop("secret_cfg", None)
else:
try:
scfg2 = json.loads(raw_s)
except json.JSONDecodeError as e:
raise ValueError("secret_cfg must be a JSON object") from e
if not isinstance(scfg2, dict):
raise ValueError("secret_cfg must be a JSON object")
data["secret_cfg"] = encrypt_json(scfg2)
class JobLogAdmin(ModelView, model=JobLog):
name = "JobLog"
name_plural = "JobLogs"
name = "任务日志"
name_plural = "任务日志"
icon = "fa fa-list"
identity = "job-log"
can_create = False
can_edit = False
can_delete = False
# 支持按 job_id 搜索(不启用筛选栏,避免页面溢出)
column_searchable_list = [JobLog.job_id]
# 列表更适合扫读:保留关键字段 + message截断
column_list = [JobLog.id, JobLog.job_id, JobLog.status, JobLog.started_at, JobLog.finished_at, JobLog.message]
# 默认按 started_at 倒序(最新在前)
@ -168,6 +341,20 @@ class JobLogAdmin(ModelView, model=JobLog):
# 为 JobLog 详情页单独指定模板(用于加入 Retry 按钮)
details_template = "joblog_details.html"
column_labels = {
"id": "日志ID",
"job_id": "任务ID",
"status": "状态",
"snapshot_params": "快照参数",
"message": "消息",
"traceback": "异常堆栈",
"run_log": "运行日志",
"celery_task_id": "Celery任务ID",
"attempt": "重试次数",
"started_at": "开始时间",
"finished_at": "结束时间",
}
column_formatters = {
JobLog.started_at: lambda m, a: _fmt_dt_seconds(m.started_at),
JobLog.finished_at: lambda m, a: _fmt_dt_seconds(m.finished_at),
@ -177,6 +364,11 @@ class JobLogAdmin(ModelView, model=JobLog):
column_formatters_detail = {
JobLog.started_at: lambda m, a: _fmt_dt_seconds(m.started_at),
JobLog.finished_at: lambda m, a: _fmt_dt_seconds(m.finished_at),
JobLog.message: lambda m, a: Markup(
"<pre style='max-height:240px;overflow:auto;white-space:pre-wrap'>"
+ (m.message or "")
+ "</pre>"
),
JobLog.traceback: lambda m, a: Markup(f"<pre style='white-space:pre-wrap'>{m.traceback or ''}</pre>"),
JobLog.run_log: lambda m, a: Markup(
"<pre style='max-height:480px;overflow:auto;white-space:pre-wrap'>"

View File

@ -8,7 +8,7 @@ class Settings(BaseSettings):
app_name: str = "ConnectHub"
data_dir: str = "/data"
db_url: str = "sqlite:////data/connecthub.db"
db_url: str = "postgresql+psycopg://connecthub:connecthub_pwd_change_me@postgres:5432/connecthub"
redis_url: str = "redis://redis:6379/0"
fernet_key_path: str = "/data/fernet.key"
dev_mode: bool = False

View File

@ -1,10 +1,28 @@
from __future__ import annotations
import logging
import os
from contextlib import contextmanager
from typing import Callable, Iterator
class JobLogIdFilter(logging.Filter):
    """Admit only LogRecords that belong to one specific job_log_id.

    Depends on the LogRecordFactory installed by setup_logging(), which
    stamps each record with a ``connecthub_job_log_id`` attribute; records
    without that attribute (or with a different id) are rejected.
    """

    def __init__(self, *, job_log_id: int) -> None:
        super().__init__()
        self.job_log_id = int(job_log_id)

    def filter(self, record: logging.LogRecord) -> bool:  # noqa: A003
        # Never let a filtering problem break logging itself.
        try:
            return self.job_log_id == getattr(record, "connecthub_job_log_id", None)
        except Exception:
            return False
class SafeBufferingHandler(logging.Handler):
"""
只用于尽力捕获运行日志
@ -52,7 +70,12 @@ class SafeBufferingHandler(logging.Handler):
@contextmanager
def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
def capture_logs(
*,
max_bytes: int = 200_000,
job_log_id: int | None = None,
file_path: str | None = None,
) -> Iterator[Callable[[], str]]:
"""
捕获当前进程root logger输出的日志文本
任何问题都不应影响业务执行
@ -64,6 +87,31 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
)
file_handler: logging.Handler | None = None
flt: JobLogIdFilter | None = None
if job_log_id is not None:
try:
flt = JobLogIdFilter(job_log_id=int(job_log_id))
handler.addFilter(flt)
except Exception:
flt = None
if file_path:
try:
parent = os.path.dirname(file_path)
if parent:
os.makedirs(parent, exist_ok=True)
fh = logging.FileHandler(file_path, encoding="utf-8")
fh.setLevel(logging.INFO)
fh.setFormatter(
logging.Formatter(fmt="%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
)
if flt is not None:
fh.addFilter(flt)
file_handler = fh
except Exception:
file_handler = None
try:
root.addHandler(handler)
except Exception:
@ -71,6 +119,16 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
yield lambda: ""
return
if file_handler is not None:
try:
root.addHandler(file_handler)
except Exception:
try:
file_handler.close()
except Exception:
pass
file_handler = None
try:
yield handler.get_text
finally:
@ -78,5 +136,14 @@ def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
root.removeHandler(handler)
except Exception:
pass
if file_handler is not None:
try:
root.removeHandler(file_handler)
except Exception:
pass
try:
file_handler.close()
except Exception:
pass

43
app/core/log_context.py Normal file
View File

@ -0,0 +1,43 @@
from __future__ import annotations
from contextvars import ContextVar, Token
_job_id_var: ContextVar[str | None] = ContextVar("connecthub_job_id", default=None)
_job_log_id_var: ContextVar[int | None] = ContextVar("connecthub_job_log_id", default=None)
JobContextTokens = tuple[Token[str | None], Token[int | None]]
def set_job_context(*, job_id: str | None, job_log_id: int | None) -> JobContextTokens:
"""
设置当前执行上下文用于日志隔离
返回 tokens便于在 finally reset 回原值
"""
t1 = _job_id_var.set(job_id)
t2 = _job_log_id_var.set(job_log_id)
return (t1, t2)
def clear_job_context(tokens: JobContextTokens | None = None) -> None:
"""
清理当前执行上下文
- 若提供 tokensreset 回进入上下文前的值推荐
- 否则直接置空
"""
if tokens is not None:
_job_id_var.reset(tokens[0])
_job_log_id_var.reset(tokens[1])
return
_job_id_var.set(None)
_job_log_id_var.set(None)
def get_job_id() -> str | None:
return _job_id_var.get()
def get_job_log_id() -> int | None:
return _job_log_id_var.get()

View File

@ -5,6 +5,7 @@ import os
from logging.handlers import RotatingFileHandler
from app.core.config import settings
from app.core.log_context import get_job_id, get_job_log_id
def setup_logging() -> None:
@ -12,6 +13,25 @@ def setup_logging() -> None:
if getattr(logger, "_connecthub_configured", False):
return
# 为每条日志注入“当前任务上下文”,供 per-run 日志隔离过滤使用。
# 仅安装一次(跨所有 logger 生效)。
if not getattr(logging, "_connecthub_record_factory_installed", False):
old_factory = logging.getLogRecordFactory()
def _record_factory(*args, **kwargs): # type: ignore[no-untyped-def]
record = old_factory(*args, **kwargs)
try:
setattr(record, "connecthub_job_id", get_job_id())
setattr(record, "connecthub_job_log_id", get_job_log_id())
except Exception:
# best-effort任何问题都不能影响日志系统
setattr(record, "connecthub_job_id", None)
setattr(record, "connecthub_job_log_id", None)
return record
logging.setLogRecordFactory(_record_factory)
setattr(logging, "_connecthub_record_factory_installed", True)
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
fmt="%(asctime)s %(levelname)s %(name)s %(message)s",

View File

@ -62,3 +62,47 @@ def get_job_log(session: Session, log_id: int) -> JobLog | None:
return session.get(JobLog, log_id)
def update_job_log(
    session: Session,
    log_id: int,
    *,
    status: JobStatus | None = None,
    message: str | None = None,
    traceback: str | None = None,
    run_log: str | None = None,
    celery_task_id: str | None = None,
    attempt: int | None = None,
    finished_at: datetime | None = None,
) -> JobLog | None:
    """
    Partially update a JobLog row; fields left as None are untouched.

    Returns the refreshed row, or None when *log_id* does not exist.
    """
    log = session.get(JobLog, log_id)
    if log is None:
        return None
    updates = {
        "status": status,
        "message": message,
        "traceback": traceback,
        "run_log": run_log,
        "celery_task_id": celery_task_id,
        "attempt": attempt,
        "finished_at": finished_at,
    }
    for field, value in updates.items():
        if value is not None:
            setattr(log, field, value)
    session.add(log)
    session.commit()
    session.refresh(log)
    return log
def delete_job_logs_by_job_id(session: Session, job_id: str) -> int:
    """Delete every JobLog row belonging to *job_id*; return how many were removed."""
    rows = list(session.scalars(select(JobLog).where(JobLog.job_id == job_id)))
    for row in rows:
        session.delete(row)
    session.commit()
    return len(rows)

View File

@ -6,11 +6,11 @@ from sqlalchemy.orm import Session, sessionmaker
from app.core.config import settings
engine = create_engine(
settings.db_url,
connect_args={"check_same_thread": False} if settings.db_url.startswith("sqlite") else {},
future=True,
)
_kwargs = {"future": True}
if settings.db_url.startswith("sqlite"):
_kwargs["connect_args"] = {"check_same_thread": False}
engine = create_engine(settings.db_url, **_kwargs)
SessionLocal = sessionmaker(bind=engine, class_=Session, autoflush=False, autocommit=False, future=True)

View File

@ -33,6 +33,7 @@ class Job(Base):
class JobStatus(str, enum.Enum):
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
RETRY = "RETRY"

View File

@ -1,26 +1,160 @@
from __future__ import annotations
from sqlalchemy import Engine, text
from sqlalchemy import Engine, inspect, text
from app.db.models import Base
def _has_column(conn, table: str, col: str) -> bool:
rows = conn.execute(text(f"PRAGMA table_info({table})")).fetchall()
return any(r[1] == col for r in rows) # PRAGMA columns: (cid, name, type, notnull, dflt_value, pk)
def _has_column(engine: Engine, table: str, col: str) -> bool:
    """Return True when *table* already has a column named *col* (via SQLAlchemy inspector)."""
    names = {c.get("name") for c in inspect(engine).get_columns(table)}
    return col in names
def _sqlite_table_sql(conn, table: str) -> str:
    """Fetch the original CREATE TABLE statement for *table* from sqlite_master ('' when absent)."""
    row = conn.execute(
        text("SELECT sql FROM sqlite_master WHERE type='table' AND name=:name"),
        {"name": table},
    ).fetchone()
    if row is None:
        return ""
    return str(row[0] or "")
def _ensure_job_logs_status_allows_running(engine: Engine) -> None:
    """
    Lightweight self-upgrade for the newly added ``RUNNING`` status value.

    - SQLite: if the table carries a CHECK constraint that does not mention
      RUNNING, migrate by rebuilding the table (removes the stale CHECK) so
      RUNNING is accepted.
    - PostgreSQL: if a status CHECK exists without RUNNING, drop it and
      recreate a constraint that includes RUNNING.
    """
    dialect = engine.dialect.name
    if dialect not in ("sqlite", "postgresql"):
        # Other backends: leave schema alone.
        return
    insp = inspect(engine)
    try:
        cols = insp.get_columns("job_logs")
    except Exception:
        # Table absent or introspection failed -> nothing to migrate.
        return
    existing_cols = {c.get("name") for c in cols if c.get("name")}
    with engine.begin() as conn:
        if dialect == "sqlite":
            sql = _sqlite_table_sql(conn, "job_logs")
            # No CHECK constraint -> no migration needed; a CHECK that already
            # mentions RUNNING is also fine.
            if not sql or "CHECK" not in sql or "RUNNING" in sql:
                return
            # Rebuild the table: drop the old CHECK (allowing RUNNING) and
            # ensure all columns exist (missing ones are filled with defaults).
            conn.execute(text("ALTER TABLE job_logs RENAME TO job_logs_old"))
            conn.execute(
                text(
                    """
                    CREATE TABLE job_logs (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        job_id VARCHAR NOT NULL,
                        status VARCHAR(16) NOT NULL,
                        snapshot_params TEXT NOT NULL DEFAULT '{}',
                        message TEXT NOT NULL DEFAULT '',
                        traceback TEXT NOT NULL DEFAULT '',
                        run_log TEXT NOT NULL DEFAULT '',
                        celery_task_id VARCHAR NOT NULL DEFAULT '',
                        attempt INTEGER NOT NULL DEFAULT 0,
                        started_at DATETIME NOT NULL,
                        finished_at DATETIME
                    )
                    """
                )
            )

            def _expr(col: str, default_expr: str) -> str:
                # Select the existing column, or synthesize a default for a
                # column the old table did not have.
                return col if col in existing_cols else f"{default_expr} AS {col}"

            insert_cols = [
                "id",
                "job_id",
                "status",
                "snapshot_params",
                "message",
                "traceback",
                "run_log",
                "celery_task_id",
                "attempt",
                "started_at",
                "finished_at",
            ]
            select_exprs = [
                _expr("id", "NULL"),
                _expr("job_id", "''"),
                _expr("status", "''"),
                _expr("snapshot_params", "'{}'"),
                _expr("message", "''"),
                _expr("traceback", "''"),
                _expr("run_log", "''"),
                _expr("celery_task_id", "''"),
                _expr("attempt", "0"),
                _expr("started_at", "CURRENT_TIMESTAMP"),
                _expr("finished_at", "NULL"),
            ]
            conn.execute(
                text(
                    f"INSERT INTO job_logs ({', '.join(insert_cols)}) "
                    f"SELECT {', '.join(select_exprs)} FROM job_logs_old"
                )
            )
            conn.execute(text("DROP TABLE job_logs_old"))
            # Restore the job_id index (SQLAlchemy's default name is ix_job_logs_job_id).
            conn.execute(text("CREATE INDEX IF NOT EXISTS ix_job_logs_job_id ON job_logs (job_id)"))
            return
        if dialect == "postgresql":
            try:
                checks = insp.get_check_constraints("job_logs") or []
            except Exception:
                checks = []
            need = False
            drop_names: list[str] = []
            for ck in checks:
                name = str(ck.get("name") or "")
                sqltext = str(ck.get("sqltext") or "")
                # Only status CHECKs that predate the RUNNING value need replacing.
                if "status" in sqltext and "RUNNING" not in sqltext:
                    need = True
                    if name:
                        drop_names.append(name)
            if not need:
                return
            # Best-effort drop of the old constraints (names vary by how the
            # table was created), then create one canonical replacement.
            for n in drop_names:
                conn.execute(text(f'ALTER TABLE job_logs DROP CONSTRAINT IF EXISTS "{n}"'))
            conn.execute(text("ALTER TABLE job_logs DROP CONSTRAINT IF EXISTS ck_job_logs_status"))
            conn.execute(
                text(
                    "ALTER TABLE job_logs "
                    "ADD CONSTRAINT ck_job_logs_status "
                    "CHECK (status IN ('RUNNING','SUCCESS','FAILURE','RETRY'))"
                )
            )
            return
def ensure_schema(engine: Engine) -> None:
"""
SQLite 轻量自升级
- create_all 不会更新既有表结构因此用 PRAGMA + ALTER TABLE 补列
轻量自升级 SQLite/PostgreSQL
- create_all 不会更新既有表结构因此用 inspector + ALTER TABLE 补列
- 必须保证任何失败都不影响主流程上层可选择忽略异常
"""
Base.metadata.create_all(bind=engine)
with engine.begin() as conn:
# job_logs.run_log
if not _has_column(conn, "job_logs", "run_log"):
if not _has_column(engine, "job_logs", "run_log"):
conn.execute(text("ALTER TABLE job_logs ADD COLUMN run_log TEXT NOT NULL DEFAULT ''"))
# job_logs.status: ensure new enum value RUNNING is accepted by DB constraints
_ensure_job_logs_status_allows_running(engine)

View File

@ -1,5 +1,7 @@
"""系统集成适配器"""
from app.integrations.base import BaseClient
from app.integrations.didi import DidiClient
from app.integrations.seeyon import SeeyonClient
__all__ = ["BaseClient"]
__all__ = ["BaseClient", "DidiClient", "SeeyonClient"]

View File

@ -42,11 +42,13 @@ class BaseClient:
def request(self, method: str, path: str, **kwargs: Any) -> httpx.Response:
url = path if path.startswith("/") else f"/{path}"
extra_headers = kwargs.pop("headers", None) or {}
merged_headers = {**self.headers, **extra_headers} if extra_headers else None
last_exc: Exception | None = None
for attempt in range(self.retries + 1):
try:
start = time.time()
resp = self._client.request(method=method, url=url, **kwargs)
resp = self._client.request(method=method, url=url, headers=merged_headers, **kwargs)
elapsed_ms = int((time.time() - start) * 1000)
logger.info("HTTP %s %s -> %s (%sms)", method, url, resp.status_code, elapsed_ms)
resp.raise_for_status()

411
app/integrations/didi.py Normal file
View File

@ -0,0 +1,411 @@
from __future__ import annotations
import hashlib
import json as jsonlib
import logging
import time
from typing import Any
import httpx
from app.integrations.base import BaseClient
logger = logging.getLogger("connecthub.integrations.didi")
def _contains_unsupported_sign_chars(s: str) -> bool:
# 文档提示:签名中不支持 \0 \t \n \x0B \r 以及空格进行加密处理。
# 这里仅做检测与告警,不自动清洗,避免服务端/客户端不一致。
return any(ch in s for ch in ("\0", "\t", "\n", "\x0b", "\r", " "))
class DidiClient(BaseClient):
    """
    Didi enterprise management API client (2024 edition).

    - POST /river/Auth/authorize to obtain an access_token (cached,
      typically ~30 min; refreshed once on 401).
    - Generates the request ``sign`` per the docs (MD5 by default).

    References:
    - https://opendocs.xiaojukeji.com/version2024/10951
    - https://opendocs.xiaojukeji.com/version2024/10945
    """

    def __init__(
        self,
        *,
        base_url: str,
        client_id: str,
        client_secret: str,
        sign_key: str,
        grant_type: str = "client_credentials",
        token_skew_s: int = 30,
        timeout_s: float = 10.0,
        retries: int = 2,
        retry_backoff_s: float = 0.5,
        headers: dict[str, str] | None = None,
    ) -> None:
        super().__init__(
            base_url=base_url,
            timeout_s=timeout_s,
            retries=retries,
            retry_backoff_s=retry_backoff_s,
            headers=headers,
        )
        self.client_id = client_id
        self.client_secret = client_secret
        self.sign_key = sign_key
        self.grant_type = grant_type
        # Seconds subtracted from expires_in so the token is refreshed early.
        self.token_skew_s = token_skew_s
        # Token cache state; populated by authorize().
        self._access_token: str | None = None
        self._token_expires_at: float | None = None
        self._token_type: str | None = None

    def gen_sign(self, params: dict[str, Any], *, sign_method: str = "md5") -> str:
        """
        Signature algorithm (MD5 by default):
        1) add sign_key to the signed params (it participates in the
           computation but is never transmitted);
        2) sort parameter names ascending;
        3) join as a=xxx&b=yyy...;
        4) md5/sha256 hex digest (lowercase).

        Docs: https://opendocs.xiaojukeji.com/version2024/10945
        """
        if sign_method.lower() != "md5":
            raise ValueError("Only md5 sign_method is supported in this client (default)")
        p = dict(params or {})
        p["sign_key"] = self.sign_key
        # Sort keys and stringify values (None -> "", otherwise str + strip).
        items: list[tuple[str, str]] = []
        for k in sorted(p.keys()):
            v = p.get(k)
            sv = "" if v is None else str(v).strip()
            items.append((str(k), sv))
        sign_str = "&".join([f"{k}={v}" for k, v in items])
        if _contains_unsupported_sign_chars(sign_str):
            # Warn only; scrubbing here could diverge from the server's view.
            logger.warning("Didi sign_str contains unsupported chars per docs (signing anyway)")
        return hashlib.md5(sign_str.encode("utf-8")).hexdigest()

    def authorize(self) -> str:
        """
        Obtain an access_token.

        POST /river/Auth/authorize
        Docs: https://opendocs.xiaojukeji.com/version2024/10951
        """
        ts = int(time.time())
        body: dict[str, Any] = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "grant_type": self.grant_type,
            "timestamp": ts,
        }
        body["sign"] = self.gen_sign(body)
        resp = super().request(
            "POST",
            "/river/Auth/authorize",
            json=body,
            headers={"Content-Type": "application/json"},
        )
        data = resp.json() if resp.content else {}
        access_token = str(data.get("access_token", "") or "")
        expires_in = int(data.get("expires_in", 0) or 0)
        token_type = str(data.get("token_type", "") or "Bearer")
        if not access_token:
            raise RuntimeError("Didi authorize failed (access_token missing)")
        now = time.time()
        skew = max(0, int(self.token_skew_s or 0))
        # expires_in is in seconds; per the docs typically 1800.
        self._access_token = access_token
        self._token_type = token_type
        self._token_expires_at = now + max(0, expires_in - skew)
        logger.info("Didi access_token acquired (cached) expires_in=%s token_type=%s", expires_in, token_type)
        return access_token

    def _get_access_token(self) -> str:
        # Return the cached token while still valid; otherwise re-authorize.
        now = time.time()
        if self._access_token and self._token_expires_at and now < self._token_expires_at:
            return self._access_token
        return self.authorize()

    def _build_signed_query(self, *, company_id: str, extra_params: dict[str, Any]) -> dict[str, Any]:
        """
        Build the query params that are both signed and actually transmitted
        (sign_key is never included):
        - client_id/access_token/company_id/timestamp + extra_params + sign.
        """
        token = self._get_access_token()
        ts = int(time.time())
        params: dict[str, Any] = {
            "client_id": self.client_id,
            "access_token": token,
            "company_id": company_id,
            "timestamp": ts,
        }
        if extra_params:
            params.update(extra_params)
        params["sign"] = self.gen_sign({k: v for k, v in params.items() if k != "sign"})
        return params

    @staticmethod
    def _raise_if_errno(api_name: str, payload: Any) -> None:
        # Raise RuntimeError unless the payload is a dict with errno == 0.
        try:
            errno = payload.get("errno")
            errmsg = payload.get("errmsg")
        except Exception as e:  # noqa: BLE001
            raise RuntimeError(f"{api_name} invalid response (not a dict)") from e
        if errno is None:
            raise RuntimeError(f"{api_name} invalid response (errno missing)")
        try:
            errno_i = int(errno)
        except Exception:
            errno_i = -1
        if errno_i != 0:
            raise RuntimeError(f"{api_name} failed errno={errno} errmsg={errmsg!r}")

    def get_legal_entities(
        self,
        *,
        company_id: str,
        offset: int,
        length: int,
        keyword: str | None = None,
        legal_entity_id: str | None = None,
        out_legal_entity_id: str | None = None,
    ) -> dict[str, Any]:
        """
        Query company legal entities.

        GET /river/LegalEntity/get
        Returns the response ``data`` dict ({total, records}).
        """
        extra: dict[str, Any] = {"offset": offset, "length": length}
        if keyword:
            extra["keyword"] = keyword
        if legal_entity_id:
            extra["legal_entity_id"] = legal_entity_id
        if out_legal_entity_id:
            extra["out_legal_entity_id"] = out_legal_entity_id
        params = self._build_signed_query(company_id=company_id, extra_params=extra)
        resp = super().request(
            "GET",
            "/river/LegalEntity/get",
            params=params,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        payload = resp.json() if resp.content else {}
        self._raise_if_errno("LegalEntity.get", payload)
        data = payload.get("data") or {}
        if not isinstance(data, dict):
            raise RuntimeError("LegalEntity.get invalid response (data not a dict)")
        return data  # {total, records}

    def get_member_detail(
        self,
        *,
        company_id: str,
        employee_number: str | None = None,
        member_id: str | None = None,
        phone: str | None = None,
    ) -> dict[str, Any]:
        """
        Employee detail lookup.

        GET /river/Member/detail
        Exactly one identifier is used, preferred in this order:
        member_id > employee_number > phone.
        """
        extra: dict[str, Any] = {}
        if member_id:
            extra["member_id"] = member_id
        elif employee_number:
            extra["employee_number"] = employee_number
        elif phone:
            extra["phone"] = phone
        else:
            raise ValueError("member_id/employee_number/phone cannot all be empty")
        params = self._build_signed_query(company_id=company_id, extra_params=extra)
        resp = super().request(
            "GET",
            "/river/Member/detail",
            params=params,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        payload = resp.json() if resp.content else {}
        self._raise_if_errno("Member.detail", payload)
        data = payload.get("data") or {}
        if not isinstance(data, dict):
            raise RuntimeError("Member.detail invalid response (data not a dict)")
        return data

    def edit_member_legal_entity(
        self,
        *,
        company_id: str,
        member_id: str | None,
        employee_number: str | None,
        legal_entity_id: str,
    ) -> None:
        """
        Employee edit: move an employee to another legal entity
        (legal_entity_id).

        POST /river/Member/edit
        Raises RuntimeError when the API reports a non-zero errno.
        """
        if not member_id and not employee_number:
            raise ValueError("member_id or employee_number is required")
        if not legal_entity_id:
            raise ValueError("legal_entity_id is required")
        token = self._get_access_token()
        ts = int(time.time())
        # The nested "data" object is serialized to a compact JSON string so
        # that the signed value and the transmitted value are identical.
        data_str = self.dumps_data_for_sign({"legal_entity_id": legal_entity_id})
        body: dict[str, Any] = {
            "client_id": self.client_id,
            "access_token": token,
            "company_id": company_id,
            "timestamp": ts,
            "data": data_str,
        }
        if member_id:
            body["member_id"] = member_id
        if employee_number:
            body["employee_number"] = employee_number
        body["sign"] = self.gen_sign({k: v for k, v in body.items() if k != "sign"})
        resp = super().request(
            "POST",
            "/river/Member/edit",
            json=body,
            headers={"Content-Type": "application/json"},
        )
        payload = resp.json() if resp.content else {}
        self._raise_if_errno("Member.edit", payload)
        return None

    def request_authed(
        self,
        method: str,
        path: str,
        *,
        params: dict[str, Any] | None = None,
        json: Any = None,
        data: Any = None,
        headers: dict[str, str] | None = None,
        signed_params: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> httpx.Response:
        """
        Unified tokenized (and optionally signed) request.

        - Sends ``Authorization: Bearer <access_token>``.
        - When *signed_params* is given: auto-fill timestamp, compute sign,
          and inject into json/params/data (preferring a dict json, then
          params, then data; falling back to a fresh json dict).
        - On 401: clear the token, re-authorize, and retry exactly once.
        """
        token = self._get_access_token()
        token_type = self._token_type or "Bearer"
        extra_headers = dict(headers or {})
        extra_headers["Authorization"] = f"{token_type} {token}"
        sp: dict[str, Any] | None = None
        if signed_params is not None:
            sp = dict(signed_params)
            if "timestamp" not in sp:
                sp["timestamp"] = int(time.time())
            # If this endpoint signs over access_token, it must be filled in.
            if "access_token" in sp and not sp.get("access_token"):
                sp["access_token"] = token
            sp["sign"] = self.gen_sign({k: v for k, v in sp.items() if k != "sign"})

        def _inject(target_json: Any, target_params: dict[str, Any] | None, target_data: Any) -> tuple[Any, dict[str, Any] | None, Any]:
            # Merge the signed params into whichever payload carrier is a dict.
            if sp is None:
                return target_json, target_params, target_data
            if isinstance(target_json, dict):
                merged = dict(target_json)
                merged.update(sp)
                return merged, target_params, target_data
            if isinstance(target_params, dict):
                merged_p = dict(target_params)
                merged_p.update(sp)
                return target_json, merged_p, target_data
            if isinstance(target_data, dict):
                merged_d = dict(target_data)
                merged_d.update(sp)
                return target_json, target_params, merged_d
            # Default: send the signed params as the json body.
            return dict(sp), target_params, target_data

        json2, params2, data2 = _inject(json, params, data)
        try:
            return super().request(method, path, params=params2, json=json2, data=data2, headers=extra_headers, **kwargs)
        except httpx.HTTPStatusError as e:
            resp = e.response
            if resp.status_code != 401:
                raise
            # 401: token invalid or expired — refresh and retry once only.
            logger.info("Didi access_token invalid (401), refreshing and retrying once")
            self._access_token = None
            self._token_expires_at = None
            self._token_type = None
            token2 = self._get_access_token()
            token_type2 = self._token_type or "Bearer"
            extra_headers2 = dict(headers or {})
            extra_headers2["Authorization"] = f"{token_type2} {token2}"
            # If the signed params include access_token, it must be updated
            # and the sign recomputed before retrying.
            if signed_params is not None:
                sp2 = dict(signed_params)
                if "timestamp" not in sp2:
                    sp2["timestamp"] = int(time.time())
                if "access_token" in sp2:
                    sp2["access_token"] = token2
                sp2["sign"] = self.gen_sign({k: v for k, v in sp2.items() if k != "sign"})
                json2_retry, params2_retry, data2_retry = _inject(json, params, data)
                # _inject closes over sp; re-apply the same carrier choice with
                # sp2 here to avoid restructuring the helper.
                if isinstance(json, dict):
                    json2_retry = dict(json)
                    json2_retry.update(sp2)
                elif isinstance(params, dict):
                    params2_retry = dict(params)
                    params2_retry.update(sp2)
                elif isinstance(data, dict):
                    data2_retry = dict(data)
                    data2_retry.update(sp2)
                else:
                    json2_retry = dict(sp2)
                return super().request(
                    method,
                    path,
                    params=params2_retry,
                    json=json2_retry,
                    data=data2_retry,
                    headers=extra_headers2,
                    **kwargs,
                )
            return super().request(method, path, params=params2, json=json2, data=data2, headers=extra_headers2, **kwargs)

    def post_signed_json(self, path: str, *, body: dict[str, Any]) -> httpx.Response:
        """
        Convenience: JSON POST with auto timestamp/sign and Authorization.

        Note: when the body contains complex fields (e.g. an object under
        "data"), callers should json.dumps(...) them to a string first so
        the signed and transmitted values match.
        """
        if not isinstance(body, dict):
            raise ValueError("body must be a dict")
        return self.request_authed(
            "POST",
            path,
            json=body,
            signed_params=body,
            headers={"Content-Type": "application/json"},
        )

    @staticmethod
    def dumps_data_for_sign(data_obj: Any) -> str:
        """
        Serialize a complex ``data`` object to the string that participates
        in signing (compact JSON, matching the documented examples).
        """
        return jsonlib.dumps(data_obj, ensure_ascii=False, separators=(",", ":"))

116
app/integrations/seeyon.py Normal file
View File

@ -0,0 +1,116 @@
from __future__ import annotations
import logging
from typing import Any
import httpx
from app.integrations.base import BaseClient
logger = logging.getLogger("connecthub.integrations.seeyon")
class SeeyonClient(BaseClient):
    """
    Seeyon OA REST client.

    - POST /seeyon/rest/token to obtain a token (the response ``id``);
    - business requests automatically carry the ``token`` header;
    - on 401 / "Invalid token" the token is refreshed and the request
      retried exactly once.
    """

    def __init__(self, *, base_url: str, rest_user: str, rest_password: str, loginName: str | None = None) -> None:
        super().__init__(base_url=base_url)
        self.rest_user = rest_user
        self.rest_password = rest_password
        # Optional login name to act as (per Seeyon REST docs).
        self.loginName = loginName
        # Cached token; acquired lazily and dropped on invalidation.
        self._token: str | None = None

    def authenticate(self) -> str:
        """
        Acquire a REST token and cache it.

        Raises RuntimeError when the response carries no usable ``id``
        (Seeyon returns "-1" on auth failure).
        """
        body: dict[str, Any] = {
            "userName": self.rest_user,
            "password": self.rest_password,
        }
        if self.loginName:
            body["loginName"] = self.loginName
        # Docs: POST /seeyon/rest/token
        resp = super().request(
            "POST",
            "/seeyon/rest/token",
            json=body,
            headers={"Accept": "application/json", "Content-Type": "application/json"},
        )
        data = resp.json()
        token = str(data.get("id", "") or "")
        if not token or token == "-1":
            raise RuntimeError("Seeyon auth failed (token id missing or -1)")
        self._token = token
        logger.info("Seeyon token acquired")
        return token

    def _get_token(self) -> str:
        # Return the cached token, authenticating on first use.
        return self._token or self.authenticate()

    def request(self, method: str, path: str, **kwargs: Any) -> httpx.Response:  # type: ignore[override]
        """
        Perform a request with the ``token`` header attached; on an invalid
        token (401 or "Invalid token" body) refresh and retry once.
        """
        token = self._get_token()
        headers = dict(kwargs.pop("headers", {}) or {})
        headers["token"] = token
        try:
            return super().request(method, path, headers=headers, **kwargs)
        except httpx.HTTPStatusError as e:
            # Token invalidation: 401, or the body mentions "Invalid token".
            resp = e.response
            text = ""
            try:
                text = resp.text or ""
            except Exception:
                text = ""
            if resp.status_code == 401 or ("Invalid token" in text):
                logger.info("Seeyon token invalid, refreshing and retrying once")
                self._token = None
                token2 = self._get_token()
                headers["token"] = token2
                # Retry once only; a second failure propagates.
                return super().request(method, path, headers=headers, **kwargs)
            raise

    def export_cap4_form_soap(
        self,
        *,
        templateCode: str,
        senderLoginName: str | None = None,
        rightId: str | None = None,
        doTrigger: str | bool | None = None,
        param: str | None = None,
        extra: dict[str, Any] | None = None,
    ) -> httpx.Response:
        """
        Flow-less form export (CAP4).

        POST /seeyon/rest/cap4/form/soap/export
        Returns the raw httpx.Response so the caller can read resp.text,
        resp.headers, etc.
        """
        body: dict[str, Any] = {"templateCode": templateCode}
        if senderLoginName:
            body["senderLoginName"] = senderLoginName
        if rightId:
            body["rightId"] = rightId
        if doTrigger is not None:
            body["doTrigger"] = doTrigger
        if param is not None:
            body["param"] = param
        if extra:
            # Fallback extension fields: only injected when the key does not
            # clash, so explicitly supplied parameters are never overwritten.
            for k, v in extra.items():
                if k not in body:
                    body[k] = v
        return self.request(
            "POST",
            "/seeyon/rest/cap4/form/soap/export",
            json=body,
            headers={"Content-Type": "application/json"},
        )

View File

@ -54,6 +54,29 @@ def encrypt_json(obj: dict[str, Any]) -> str:
def decrypt_json(token: str) -> dict[str, Any]:
if not token:
return {}
token = token.strip()
# 常见脏数据:被包了引号
if (token.startswith('"') and token.endswith('"')) or (token.startswith("'") and token.endswith("'")):
token = token[1:-1].strip()
# 常见脏数据:中间混入换行/空白(复制粘贴/渲染导致)
token = "".join(token.split())
# 兼容:历史/手工输入导致误存明文 JSON
if token.startswith("{"):
try:
obj = json.loads(token)
if isinstance(obj, dict):
return obj
except Exception:
pass
# 兼容:末尾 padding '=' 被裁剪导致 base64 解码失败len % 4 != 0
data_len = len(token.rstrip("="))
# base64 非 padding 字符长度为 4n+1 时不可恢复:大概率是 token 被截断/丢字符
if data_len % 4 == 1:
raise ValueError("Invalid secret_cfg token (looks truncated). Please re-save secret_cfg to re-encrypt.")
if token and (len(token) % 4) != 0:
token = token + ("=" * (-len(token) % 4))
try:
raw = _fernet().decrypt(token.encode("utf-8"))
except InvalidToken as e:

View File

@ -45,6 +45,15 @@ def tick() -> dict[str, int]:
last_min = _floor_to_minute(last.replace(tzinfo=tz))
else:
last_min = _floor_to_minute(last.astimezone(tz))
# 防御:若 last_run_at 被错误写成 UTC 等导致“在未来”,则忽略该值避免任务永久不触发
if last_min > now_min:
logger.warning(
"job.last_run_at appears in the future (ignored) job_id=%s last_run_at=%s now=%s",
job.id,
last,
now,
)
else:
if last_min >= now_min:
continue
@ -56,7 +65,8 @@ def tick() -> dict[str, int]:
continue
execute_job.delay(job_id=job.id)
crud.update_job_last_run_at(session, job.id, now_min.replace(tzinfo=None))
# 写入时保留 tz 信息,避免在 PostgreSQL timestamptz 中被误当 UTC 导致“未来 last_run_at”
crud.update_job_last_run_at(session, job.id, now_min)
triggered += 1
except Exception: # noqa: BLE001

View File

@ -1,12 +1,16 @@
from __future__ import annotations
import logging
import os
import traceback as tb
from datetime import datetime
from typing import Any
from zoneinfo import ZoneInfo
from app.core.log_capture import capture_logs
from app.core.logging import setup_logging
from app.core.log_context import clear_job_context, set_job_context
from app.core.config import settings
from app.db import crud
from app.db.engine import engine, get_session
from app.db.models import JobStatus
@ -18,9 +22,58 @@ from app.tasks.celery_app import celery_app
logger = logging.getLogger("connecthub.tasks.execute")
_MAX_MESSAGE_WARNING_LINES = 200
_MAX_MESSAGE_CHARS = 50_000
def _extract_warning_lines(run_log_text: str) -> list[str]:
"""
run_log 文本里提取 WARNING 保留原始行文本
capture_logs 的格式为'%(asctime)s %(levelname)s %(name)s %(message)s'
"""
run_log_text = run_log_text or ""
lines = run_log_text.splitlines()
return [ln for ln in lines if " WARNING " in f" {ln} "]
def _compose_message(base_message: str, warning_lines: list[str]) -> str:
"""
base_message + warnings(具体内容) + summary并做截断保护
"""
base_message = base_message or ""
warning_lines = warning_lines or []
parts: list[str] = [base_message]
if warning_lines:
parts.append(f"WARNINGS ({len(warning_lines)}):")
if len(warning_lines) <= _MAX_MESSAGE_WARNING_LINES:
parts.extend(warning_lines)
else:
parts.extend(warning_lines[:_MAX_MESSAGE_WARNING_LINES])
parts.append(f"[TRUNCATED] warnings exceeded {_MAX_MESSAGE_WARNING_LINES} lines")
parts.append(f"SUMMARY: warnings={len(warning_lines)}")
msg = "\n".join([p for p in parts if p is not None])
if len(msg) > _MAX_MESSAGE_CHARS:
msg = msg[: _MAX_MESSAGE_CHARS - 64] + "\n[TRUNCATED] message exceeded 50000 chars"
return msg
def _safe_job_dir_name(job_id: str) -> str:
"""
job_id 映射为安全的目录名避免路径分隔符造成目录穿越/嵌套
"""
s = (job_id or "").strip() or "unknown"
return s.replace("/", "_").replace("\\", "_")
@celery_app.task(bind=True, name="connecthub.execute_job")
def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any] | None = None) -> dict[str, Any]:
def execute_job(
self,
job_id: str | None = None,
snapshot_params: dict[str, Any] | None = None,
log_id: int | None = None,
) -> dict[str, Any]:
"""
通用执行入口
- job_id DB 读取 Job 定义
@ -42,9 +95,11 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
traceback = ""
result: dict[str, Any] = {}
run_log_text = ""
job_log_id: int | None = log_id
celery_task_id = getattr(self.request, "id", "") or ""
attempt = int(getattr(self.request, "retries", 0) or 0)
snapshot: dict[str, Any] = {}
try:
with capture_logs(max_bytes=200_000) as get_run_log:
try:
if snapshot_params:
job_id = snapshot_params["job_id"]
@ -61,6 +116,58 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
public_cfg = job.public_cfg or {}
secret_token = job.secret_cfg or ""
snapshot = snapshot_params or {
"job_id": job_id,
"handler_path": handler_path,
"public_cfg": public_cfg,
"secret_cfg": secret_token,
"meta": {
"trigger": "celery",
"celery_task_id": celery_task_id,
"started_at": started_at.isoformat(),
},
}
# 任务开始即落库一条 RUNNING 记录(若外部已传入 log_id则只更新该条若创建失败则降级为旧行为结束时再 create
if job_log_id is None:
try:
running = crud.create_job_log(
session,
job_id=str(job_id or ""),
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id=celery_task_id,
attempt=attempt,
started_at=started_at,
finished_at=None,
)
job_log_id = int(running.id)
except Exception:
job_log_id = None
# per-run 全量日志落盘best-effort。若 job_log_id 缺失则无法保证唯一性,直接跳过。
per_run_log_path: str | None = None
if job_log_id is not None and job_id:
try:
log_root = settings.log_dir or os.path.join(settings.data_dir, "logs")
job_dir = os.path.join(log_root, _safe_job_dir_name(str(job_id)))
os.makedirs(job_dir, exist_ok=True)
tz = ZoneInfo("Asia/Shanghai")
ts = datetime.now(tz).strftime("%Y-%m-%d_%H-%M-%S")
per_run_log_path = os.path.join(job_dir, f"{ts}_log-{int(job_log_id)}.log")
except Exception:
per_run_log_path = None
logger.warning("prepare per-run log file failed job_id=%s log_id=%s", job_id, job_log_id)
ctx_tokens = None
with capture_logs(max_bytes=200_000, job_log_id=job_log_id, file_path=per_run_log_path) as get_run_log:
try:
if job_log_id is not None and job_id:
ctx_tokens = set_job_context(job_id=str(job_id), job_log_id=int(job_log_id))
secrets = decrypt_json(secret_token)
job_instance = instantiate(handler_path)
out = job_instance.run(params=public_cfg, secrets=secrets)
@ -75,12 +182,34 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
traceback = tb.format_exc()
logger.exception("execute_job failed job_id=%s", job_id)
finally:
try:
clear_job_context(ctx_tokens)
except Exception:
# best-effort不能影响任务执行
pass
try:
run_log_text = get_run_log() or ""
except Exception:
run_log_text = ""
finally:
finished_at = datetime.utcnow()
warning_lines = _extract_warning_lines(run_log_text)
message = _compose_message(message, warning_lines)
# 结束时:优先更新 RUNNING 那条;若没有则创建最终记录
if job_log_id is not None:
crud.update_job_log(
session,
job_log_id,
status=status,
message=message,
traceback=traceback,
run_log=run_log_text,
celery_task_id=celery_task_id,
attempt=attempt,
finished_at=finished_at,
)
else:
if not snapshot:
snapshot = snapshot_params or {
"job_id": job_id,
"handler_path": handler_path if "handler_path" in locals() else "",
@ -88,7 +217,7 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
"secret_cfg": secret_token if "secret_token" in locals() else "",
"meta": {
"trigger": "celery",
"celery_task_id": getattr(self.request, "id", "") or "",
"celery_task_id": celery_task_id,
"started_at": started_at.isoformat(),
},
}
@ -100,8 +229,8 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
message=message,
traceback=traceback,
run_log=run_log_text,
celery_task_id=getattr(self.request, "id", "") or "",
attempt=int(getattr(self.request, "retries", 0) or 0),
celery_task_id=celery_task_id,
attempt=attempt,
started_at=started_at,
finished_at=finished_at,
)

Binary file not shown.

File diff suppressed because it is too large Load Diff

View File

@ -23,5 +23,7 @@ services:
- ./extensions:/app/extensions
command: >
sh -c "watchfiles --filter python 'celery -A app.tasks.celery_app:celery_app beat --loglevel=INFO' /app/app /app/extensions"
postgres:
ports:
- "5432:5432"

View File

@ -4,6 +4,18 @@ services:
ports:
- "6379:6379"
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: connecthub
POSTGRES_USER: connecthub
POSTGRES_PASSWORD: connecthub_pwd_change_me
volumes:
- ./data/pgdata:/var/lib/postgresql/data
# 如需宿主机直连可打开该映射
ports:
- "5432:5432"
backend:
build:
context: .
@ -18,6 +30,7 @@ services:
sh -c "if [ \"${DEV_MODE:-0}\" = \"1\" ]; then uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload; else uvicorn app.main:app --host 0.0.0.0 --port 8000; fi"
depends_on:
- redis
- postgres
worker:
build:
@ -31,6 +44,7 @@ services:
sh -c "celery -A app.tasks.celery_app:celery_app worker --loglevel=INFO"
depends_on:
- redis
- postgres
beat:
build:
@ -44,5 +58,6 @@ services:
sh -c "celery -A app.tasks.celery_app:celery_app beat --loglevel=INFO"
depends_on:
- redis
- postgres

View File

@ -5,7 +5,22 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
# APT 镜像源(默认使用清华 TUNA如需切回官方源可在 build 时覆盖该参数)
# 示例docker build --build-arg APT_MIRROR=deb.debian.org -t connecthub .
ARG APT_MIRROR=mirrors.tuna.tsinghua.edu.cn
RUN set -eux; \
# 兼容 Debian 新旧 sources 格式(/etc/apt/sources.list 或 deb822 的 /etc/apt/sources.list.d/debian.sources
if [ -f /etc/apt/sources.list ]; then \
sed -i "s|http://deb.debian.org/debian|https://${APT_MIRROR}/debian|g" /etc/apt/sources.list; \
sed -i "s|http://security.debian.org/debian-security|https://${APT_MIRROR}/debian-security|g" /etc/apt/sources.list; \
fi; \
if [ -f /etc/apt/sources.list.d/debian.sources ]; then \
sed -i "s|URIs: http://deb.debian.org/debian|URIs: https://${APT_MIRROR}/debian|g" /etc/apt/sources.list.d/debian.sources; \
sed -i "s|URIs: http://security.debian.org/debian-security|URIs: https://${APT_MIRROR}/debian-security|g" /etc/apt/sources.list.d/debian.sources; \
fi; \
apt-get update; \
apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*

View File

@ -1,6 +1,6 @@
APP_NAME=ConnectHub
DATA_DIR=/data
DB_URL=sqlite:////data/connecthub.db
DB_URL=postgresql+psycopg://connecthub:connecthub_pwd_change_me@postgres:5432/connecthub
REDIS_URL=redis://redis:6379/0
FERNET_KEY_PATH=/data/fernet.key
DEV_MODE=1

View File

@ -1,3 +0,0 @@
#

View File

@ -1,16 +0,0 @@
from __future__ import annotations
from app.integrations.base import BaseClient
class ExampleClient(BaseClient):
    """Demo client; a real integration would wrap an OA/HR/ERP API here.

    No actual external request is performed — only the structure and the
    calling convention are kept for illustration.
    """

    def ping(self) -> dict:
        # A real implementation would be: return self.get_json("/ping")
        return {"ok": True}

View File

@ -1,33 +0,0 @@
from __future__ import annotations
import logging
from typing import Any
from app.jobs.base import BaseJob
from extensions.example.client import ExampleClient
logger = logging.getLogger("connecthub.extensions.example")
class ExampleJob(BaseJob):
    """Demo job: build the demo client, ping it, and log the result."""

    job_id = "example.hello"

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        # params: plain-text config, e.g. {"name": "Mars"}
        who = params.get("name", "World")
        # secrets: decrypted values, e.g. {"token": "..."}
        bearer = secrets.get("token", "<missing>")
        demo_client = ExampleClient(base_url="https://baidu.com", headers={"Authorization": f"Bearer {bearer}"})
        try:
            pong = demo_client.ping()
        finally:
            demo_client.close()
        logger.info("ExampleJob ran name=%s pong=%s", who, pong)
        return {"hello": who, "pong": pong}

View File

@ -0,0 +1,3 @@
#
"""OA 到滴滴的同步任务包"""

View File

@ -0,0 +1,429 @@
from __future__ import annotations
import json
import logging
import time
from typing import Any
from app.integrations.didi import DidiClient
from app.integrations.seeyon import SeeyonClient
from app.jobs.base import BaseJob
logger = logging.getLogger("connecthub.extensions.sync_oa_to_didi")
def _mask_token(token: str) -> str:
token = token or ""
if len(token) <= 12:
return "***"
return f"{token[:6]}***{token[-4:]}"
def _log_text_in_chunks(*, prefix: str, text: str, chunk_bytes: int = 8_000) -> None:
    """Write a large text into the run log as a series of small records.

    The text is split on UTF-8 byte boundaries so no single log record grows
    large enough for the capture layer to drop it wholesale (capture_logs
    discards an entire record over max_bytes and marks truncation); the upper
    capture_logs(max_bytes=200_000) still enforces the total budget.
    """
    try:
        if not text:
            logger.info("%s <empty>", prefix)
            return
        size = chunk_bytes if chunk_bytes > 0 else 8_000
        encoded = text.encode("utf-8", errors="replace")
        pieces = (len(encoded) + size - 1) // size
        for idx in range(pieces):
            part = encoded[idx * size : (idx + 1) * size].decode("utf-8", errors="replace")
            logger.info("%s chunk %s/%s: %s", prefix, idx + 1, pieces, part)
    except Exception:
        # Run-log capture is best-effort: no exception here may affect the job.
        return
class SyncOAToDidiTokenJob(BaseJob):
    """Demo job: acquire a Seeyon OA token and log it in masked form.

    public_cfg:
        - base_url: "https://oa.example.com"
    secret_cfg (decrypted):
        - rest_user
        - rest_password
        - loginName (optional)
    """

    job_id = "sync_oa_to_didi.token_demo"

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        oa_url = str(params.get("base_url") or "").strip()
        if not oa_url:
            raise ValueError("public_cfg.base_url is required")

        user = str(secrets.get("rest_user") or "").strip()
        password = str(secrets.get("rest_password") or "").strip()
        raw_login = secrets.get("loginName")
        login = str(raw_login).strip() if raw_login else None
        if not (user and password):
            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")

        client = SeeyonClient(base_url=oa_url, rest_user=user, rest_password=password, loginName=login)
        try:
            token = client.authenticate()
        finally:
            client.close()

        masked = _mask_token(token)
        logger.info("Seeyon token acquired (masked) token=%s loginName=%s base_url=%s", masked, login, oa_url)
        return {"token_masked": masked, "loginName": login or "", "base_url": oa_url}
class SyncOAToDidiExportFormJob(BaseJob):
    """Export a flow-less (CAP4) form from Seeyon OA.

    Calls POST /seeyon/rest/cap4/form/soap/export; base_url must NOT include
    /seeyon/rest (e.g. https://oa.example.com:8090).

    public_cfg:
        - base_url: "https://oad.example.com:8090"
        - templateCode: "employee"
        - senderLoginName: "xxx" (optional)
        - rightId: "xxx" (optional)
        - doTrigger: "true" (optional)
        - param: "0" (optional)
        - extra: {...} (optional catch-all extension fields)
    secret_cfg (decrypted):
        - rest_user
        - rest_password
        - loginName
    """

    job_id = "sync_oa_to_didi.export_form_soap"

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        oa_url = str(params.get("base_url") or "").strip()
        if not oa_url:
            raise ValueError("public_cfg.base_url is required")
        template = str(params.get("templateCode") or "").strip()
        if not template:
            raise ValueError("public_cfg.templateCode is required")

        raw_sender = params.get("senderLoginName")
        sender = str(raw_sender).strip() if raw_sender else None
        raw_right = params.get("rightId")
        right = str(raw_right).strip() if raw_right else None
        trigger = params.get("doTrigger")
        raw_param = params.get("param")
        param_value = str(raw_param) if raw_param is not None else None
        extra = params.get("extra")
        if extra is not None and not isinstance(extra, dict):
            raise ValueError("public_cfg.extra must be a JSON object (dict) if provided")

        user = str(secrets.get("rest_user") or "").strip()
        password = str(secrets.get("rest_password") or "").strip()
        raw_login = secrets.get("loginName")
        login = str(raw_login).strip() if raw_login else None
        if not (user and password):
            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")

        client = SeeyonClient(base_url=oa_url, rest_user=user, rest_password=password, loginName=login)
        try:
            resp = client.export_cap4_form_soap(
                templateCode=template,
                senderLoginName=sender,
                rightId=right,
                doTrigger=trigger,
                param=param_value,
                extra=extra,
            )
            body_text = resp.text or ""
            content_type = resp.headers.get("content-type", "") if getattr(resp, "headers", None) else ""
        finally:
            client.close()

        # Keep the raw payload out of a single log/run_log record — it would
        # be truncated and would pollute the JobLog.
        logger.info(
            "Seeyon export_form_soap done templateCode=%s content_length=%s content_type=%s base_url=%s",
            template,
            len(body_text),
            content_type,
            oa_url,
        )
        _log_text_in_chunks(prefix="Seeyon export_form_soap raw", text=body_text, chunk_bytes=8_000)
        return {
            "raw": body_text,
            "meta": {
                "templateCode": template,
                "content_length": len(body_text),
                "content_type": content_type,
            },
        }
class SyncOAToDidiLegalEntitySyncJob(BaseJob):
    """Sync employee legal entities from an OA form export into Didi.

    Reads the employee number ("工号") and owning company ("所属公司") from a
    Seeyon CAP4 flow-less form export and, per row:

    - legal entity lookup: GET /river/LegalEntity/get (keyword=name; only an
      exact name match is accepted)
    - member lookup:       GET /river/Member/detail (employee_number=<工号>)
    - member update:       POST /river/Member/edit (sets legal_entity_id)

    public_cfg:
      - oa_base_url: "https://oa.example.com:8090"
      - oa_templateCode: "employee"
      - didi_base_url: "https://api.es.xiaojukeji.com"
      - senderLoginName/rightId/doTrigger/param/extra: optional pass-through
        to the OA export call
    secret_cfg (decrypted):
      - rest_user/rest_password/loginName: OA credentials
      - company_id/client_id/client_secret/sign_key: Didi credentials
      - phone: optional, not used by this job
    """

    job_id = "sync_oa_to_didi.sync_legal_entity"

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        """Run the sync; returns counters plus up to 50 recorded error strings."""
        # Required public configuration.
        oa_base_url = str(params.get("oa_base_url") or "").strip()
        if not oa_base_url:
            raise ValueError("public_cfg.oa_base_url is required")
        oa_template_code = str(params.get("oa_templateCode") or "").strip()
        if not oa_template_code:
            raise ValueError("public_cfg.oa_templateCode is required")
        didi_base_url = str(params.get("didi_base_url") or "").strip()
        if not didi_base_url:
            raise ValueError("public_cfg.didi_base_url is required")
        # Required OA credentials (loginName is optional).
        rest_user = str(secrets.get("rest_user") or "").strip()
        rest_password = str(secrets.get("rest_password") or "").strip()
        login_name = secrets.get("loginName")
        login_name = str(login_name).strip() if login_name else None
        if not rest_user or not rest_password:
            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
        # Required Didi credentials.
        company_id = str(secrets.get("company_id") or "").strip()
        client_id = str(secrets.get("client_id") or "").strip()
        client_secret = str(secrets.get("client_secret") or "").strip()
        sign_key = str(secrets.get("sign_key") or "").strip()
        if not company_id or not client_id or not client_secret or not sign_key:
            raise ValueError("secret_cfg.company_id/client_id/client_secret/sign_key are required")
        # Optional parameters forwarded verbatim to the OA export call.
        sender_login_name = params.get("senderLoginName")
        sender_login_name = str(sender_login_name).strip() if sender_login_name else None
        right_id = params.get("rightId")
        right_id = str(right_id).strip() if right_id else None
        do_trigger = params.get("doTrigger")
        param = params.get("param")
        param = str(param) if param is not None else None
        extra = params.get("extra")
        if extra is not None and not isinstance(extra, dict):
            raise ValueError("public_cfg.extra must be a JSON object (dict) if provided")
        # Seeyon OA client: export the form, then release the connection.
        seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
        try:
            resp = seeyon.export_cap4_form_soap(
                templateCode=oa_template_code,
                senderLoginName=sender_login_name,
                rightId=right_id,
                doTrigger=do_trigger,
                param=param,
                extra=extra,
            )
            raw = resp.text or ""
        finally:
            seeyon.close()
        payload = json.loads(raw) if raw else {}
        outer = payload.get("data") or {}
        # On success, outer["data"] is the form object: {"definition": {...}, "data": [...]}
        form = outer.get("data") or {}
        if not isinstance(form, dict):
            raise RuntimeError("OA export invalid: data.data is not an object (dict)")
        definition = form.get("definition") or {}
        fields = definition.get("fields") or []
        if not isinstance(fields, list):
            raise RuntimeError("OA export invalid: definition.fields is not a list")
        # Resolve internal field names by their display labels.
        emp_field: str | None = None      # display "工号" (employee number)
        company_field: str | None = None  # display "所属公司" (owning company)
        sync_field: str | None = None     # display "是否同步滴滴" (sync opt-out flag)
        for f in fields:
            if not isinstance(f, dict):
                continue
            display = str(f.get("display") or "")
            name = str(f.get("name") or "")
            if display == "工号" and name:
                emp_field = name
            if display == "所属公司" and name:
                company_field = name
            if display == "是否同步滴滴" and name:
                sync_field = name
        if not emp_field or not company_field:
            raise RuntimeError("OA export invalid: cannot locate fields for 工号/所属公司 in definition.fields")
        rows = form.get("data") or []
        if not isinstance(rows, list):
            raise RuntimeError("OA export invalid: data is not a list")
        logger.info("开始同步OA->滴滴 模板=%s 总行数=%s", oa_template_code, len(rows))
        didi = DidiClient(
            base_url=didi_base_url,
            client_id=client_id,
            client_secret=client_secret,
            sign_key=sign_key,
        )
        try:
            cache_legal_entity: dict[str, str] = {}  # company name -> legal_entity_id
            total_rows = 0
            updated = 0
            skipped = 0
            errors: list[str] = []
            warn_count = 0
            for row in rows:
                total_rows += 1
                if not isinstance(row, dict):
                    skipped += 1
                    warn_count += 1
                    logger.warning("跳过OA 行数据不是对象(dict)")
                    continue
                master = row.get("masterData") or {}
                if not isinstance(master, dict):
                    skipped += 1
                    warn_count += 1
                    logger.warning("跳过OA 行 masterData 不是对象(dict)")
                    continue
                # Form cells are objects; prefer "value", fall back to "showValue".
                emp_obj = master.get(emp_field) or {}
                comp_obj = master.get(company_field) or {}
                emp_no = ""
                comp_name = ""
                if isinstance(emp_obj, dict):
                    emp_no = str(emp_obj.get("value") or emp_obj.get("showValue") or "").strip()
                if isinstance(comp_obj, dict):
                    comp_name = str(comp_obj.get("value") or comp_obj.get("showValue") or "").strip()
                if not emp_no or not comp_name:
                    skipped += 1
                    warn_count += 1
                    logger.warning("跳过:缺少工号或所属公司 employee_number=%r company_name=%r", emp_no, comp_name)
                    continue
                # Sync-to-Didi flag: skip when the field exists with value "N";
                # when the field is absent, proceed (compatible with old forms).
                if sync_field:
                    sync_obj = master.get(sync_field) or {}
                    sync_val = ""
                    if isinstance(sync_obj, dict):
                        sync_val = str(sync_obj.get("value") or sync_obj.get("showValue") or "").strip()
                    if sync_val == "N":
                        skipped += 1
                        warn_count += 1
                        logger.warning("跳过:是否同步滴滴=N employee_number=%s company_name=%s", emp_no, comp_name)
                        continue
                logger.info("正在处理:工号=%s 所属公司=%s", emp_no, comp_name)
                # Legal-entity match (per-run in-process cache).
                legal_entity_id = cache_legal_entity.get(comp_name)
                if not legal_entity_id:
                    try:
                        logger.info("正在查询公司主体name=%s", comp_name)
                        data = didi.get_legal_entities(company_id=company_id, offset=0, length=100, keyword=comp_name)
                        records = data.get("records") or []
                        if not isinstance(records, list):
                            raise RuntimeError("LegalEntity.get invalid: records not a list")
                        # Keyword search is fuzzy; require exact name equality.
                        matches = [r for r in records if isinstance(r, dict) and str(r.get("name") or "") == comp_name]
                        if len(matches) == 0:
                            skipped += 1
                            warn_count += 1
                            logger.warning("跳过:滴滴公司主体无精确匹配 company_name=%r employee_number=%s", comp_name, emp_no)
                            continue
                        if len(matches) > 1:
                            # Ambiguous match is recorded as an error, not just a warning.
                            skipped += 1
                            msg = f"Multiple exact legal entity matches company_name={comp_name!r} count={len(matches)}"
                            errors.append(msg)
                            logger.error("跳过:滴滴公司主体精确匹配多条 company_name=%r count=%s employee_number=%s", comp_name, len(matches), emp_no)
                            continue
                        legal_entity_id = str(matches[0].get("legal_entity_id") or "").strip()
                        if not legal_entity_id:
                            skipped += 1
                            warn_count += 1
                            logger.warning("跳过:滴滴公司主体 legal_entity_id 为空 company_name=%r", comp_name)
                            continue
                        cache_legal_entity[comp_name] = legal_entity_id
                        logger.info("公司主体匹配成功name=%s legal_entity_id=%s", comp_name, legal_entity_id)
                    except Exception as e:  # noqa: BLE001
                        skipped += 1
                        msg = f"LegalEntity lookup failed company_name={comp_name!r} err={e!r}"
                        errors.append(msg)
                        warn_count += 1
                        logger.warning("跳过:查询滴滴公司主体失败 company_name=%r err=%r", comp_name, e)
                        continue
                else:
                    logger.info("公司主体缓存命中name=%s legal_entity_id=%s", comp_name, legal_entity_id)
                # Member lookup.
                try:
                    logger.info("正在查询滴滴员工employee_number=%s", emp_no)
                    member = didi.get_member_detail(company_id=company_id, employee_number=emp_no)
                    member_id = str(member.get("member_id") or member.get("id") or "").strip()
                    if not member_id:
                        skipped += 1
                        warn_count += 1
                        logger.warning("跳过:滴滴员工明细缺少 member_id/id employee_number=%s", emp_no)
                        continue
                    logger.info("员工查询成功employee_number=%s member_id=%s", emp_no, member_id)
                except Exception as e:  # noqa: BLE001
                    skipped += 1
                    msg = f"Member.detail failed employee_number={emp_no!r} err={e!r}"
                    errors.append(msg)
                    warn_count += 1
                    logger.warning("跳过:查询滴滴员工失败 employee_number=%r err=%r", emp_no, e)
                    continue
                # Member update (per Didi docs: >=150ms between consecutive edits).
                try:
                    logger.info("正在更新员工公司主体member_id=%s legal_entity_id=%s", member_id, legal_entity_id)
                    didi.edit_member_legal_entity(company_id=company_id, member_id=member_id, employee_number=None, legal_entity_id=legal_entity_id)
                    updated += 1
                    time.sleep(0.15)
                    logger.info("同步成功employee_number=%s legal_entity_id=%s", emp_no, legal_entity_id)
                except Exception as e:  # noqa: BLE001
                    skipped += 1
                    msg = f"Member.edit failed employee_number={emp_no!r} member_id={member_id!r} err={e!r}"
                    errors.append(msg)
                    warn_count += 1
                    logger.warning("同步失败employee_number=%r member_id=%r err=%r", emp_no, member_id, e)
                    continue
            logger.info(
                "同步完成:总行数=%s 成功=%s 跳过=%s warnings=%s errors=%s",
                total_rows,
                updated,
                skipped,
                warn_count,
                len(errors),
            )
            return {
                "total_rows": total_rows,
                "updated_count": updated,
                "skipped_count": skipped,
                "errors": errors[:50],
            }
        finally:
            didi.close()

View File

@ -8,6 +8,7 @@ dependencies = [
"uvicorn[standard]>=0.27",
"sqladmin>=0.16.1",
"sqlalchemy>=2.0",
"psycopg[binary]>=3.1",
"pydantic>=2.6",
"pydantic-settings>=2.1",
"cryptography>=41",