This commit is contained in:
Marsway 2026-01-13 02:05:39 +08:00
parent c97890e2b7
commit 86e689f453
9 changed files with 420 additions and 94 deletions

View File

@ -22,7 +22,7 @@
<div class="ms-3 d-inline-block dropdown">
<a href="#" class="btn btn-secondary dropdown-toggle" id="dropdownMenuButton1" data-bs-toggle="dropdown"
aria-expanded="false">
Export
导出
</a>
<ul class="dropdown-menu" aria-labelledby="dropdownMenuButton1">
{% for export_type in model_view.export_types %}
@ -36,7 +36,7 @@
<div class="ms-3 d-inline-block">
<a href="{{ url_for('admin:export', identity=model_view.identity, export_type=model_view.export_types[0]) }}"
class="btn btn-secondary">
Export
导出
</a>
</div>
{% endif %}
@ -44,7 +44,7 @@
{% if model_view.can_create %}
<div class="ms-3 d-inline-block">
<a href="{{ url_for('admin:create', identity=model_view.identity) }}" class="btn btn-primary">
+ New {{ model_view.name }}
+ 新建{{ model_view.name }}
</a>
</div>
{% endif %}
@ -56,14 +56,14 @@
<button {% if not model_view.can_delete and not model_view._custom_actions_in_list %} disabled {% endif %}
class="btn btn-light dropdown-toggle" type="button" id="dropdownMenuButton" data-toggle="dropdown"
aria-haspopup="true" aria-expanded="false">
Actions
操作
</button>
{% if model_view.can_delete or model_view._custom_actions_in_list %}
<div class="dropdown-menu" aria-labelledby="dropdownMenuButton">
{% if model_view.can_delete %}
<a class="dropdown-item" id="action-delete" href="#" data-name="{{ model_view.name }}"
data-url="{{ url_for('admin:delete', identity=model_view.identity) }}" data-bs-toggle="modal"
data-bs-target="#modal-delete">Delete selected items</a>
data-bs-target="#modal-delete">删除所选</a>
{% endif %}
{% for custom_action, label in model_view._custom_actions_in_list.items() %}
{% if custom_action in model_view._custom_actions_confirmation %}
@ -85,9 +85,9 @@
<div class="col-md-4 text-muted">
<div class="input-group">
<input id="search-input" type="text" class="form-control"
placeholder="Search: {{ model_view.search_placeholder() }}"
placeholder="搜索:{{ model_view.search_placeholder() }}"
value="{{ request.query_params.get('search', '') }}">
<button id="search-button" class="btn" type="button">Search</button>
<button id="search-button" class="btn" type="button">搜索</button>
<button id="search-reset" class="btn" type="button" {% if not request.query_params.get('search')
%}disabled{% endif %}><i class="fa-solid fa-times"></i></button>
</div>
@ -120,7 +120,7 @@
{% endif %}
</th>
{% endfor %}
<th>Run Now</th>
<th>立即运行</th>
</tr>
</thead>
<tbody>
@ -172,8 +172,8 @@
{% endif %}
{% endfor %}
<td>
<form class="connecthub-run-form" method="post" action="/admin/jobs/{{ get_object_identifier(row) }}/run" onsubmit="return confirm('Run this job now?');">
<button type="submit" class="btn btn-primary btn-sm">Run Now</button>
<form class="connecthub-run-form" method="post" action="/admin/jobs/{{ get_object_identifier(row) }}/run" onsubmit="return confirm('确认立即执行该任务?');">
<button type="submit" class="btn btn-primary btn-sm">立即运行</button>
</form>
</td>
</tr>
@ -182,9 +182,9 @@
</table>
</div>
<div class="card-footer d-flex justify-content-between align-items-center gap-2">
<p class="m-0 text-muted">Showing <span>{{ ((pagination.page - 1) * pagination.page_size) + 1 }}</span> to
<span>{{ min(pagination.page * pagination.page_size, pagination.count) }}</span> of <span>{{ pagination.count
}}</span> items
<p class="m-0 text-muted">显示 <span>{{ ((pagination.page - 1) * pagination.page_size) + 1 }}</span>
<span>{{ min(pagination.page * pagination.page_size, pagination.count) }}</span>,共 <span>{{ pagination.count
}}</span>
</p>
<ul class="pagination m-0 ms-auto">
<li class="page-item {% if not pagination.has_previous %}disabled{% endif %}">
@ -194,7 +194,7 @@
<a class="page-link" href="#">
{% endif %}
<i class="fa-solid fa-chevron-left"></i>
prev
上一页
</a>
</li>
{% for page_control in pagination.page_controls %}
@ -207,21 +207,21 @@
{% else %}
<a class="page-link" href="#">
{% endif %}
next
下一页
<i class="fa-solid fa-chevron-right"></i>
</a>
</li>
</ul>
<div class="dropdown text-muted">
Show
每页显示
<a href="#" class="btn btn-sm btn-light dropdown-toggle" data-toggle="dropdown" aria-haspopup="true"
aria-expanded="false">
{{ request.query_params.get("pageSize") or model_view.page_size }} / Page
{{ request.query_params.get("pageSize") or model_view.page_size }} /
</a>
<div class="dropdown-menu">
{% for page_size_option in model_view.page_size_options %}
<a class="dropdown-item" href="{{ request.url.include_query_params(pageSize=page_size_option, page=pagination.resize(page_size_option).page) }}">
{{ page_size_option }} / Page
{{ page_size_option }} /
</a>
{% endfor %}
</div>
@ -233,7 +233,7 @@
<div class="col-md-3" style="width: 300px; flex-shrink: 0;">
<div id="filter-sidebar" class="card">
<div class="card-header">
<h3 class="card-title">Filters</h3>
<h3 class="card-title">筛选</h3>
</div>
<div class="card-body">
{% for filter in model_view.get_filters() %}
@ -245,8 +245,8 @@
{% set current_op = request.query_params.get(filter.parameter_name + '_op', '') %}
{% if current_filter %}
<div class="mb-2 text-muted small">
Current: {{ current_op }} {{ current_filter }}
<a href="{{ request.url.remove_query_params(filter.parameter_name).remove_query_params(filter.parameter_name + '_op') }}" class="text-decoration-none">[Clear]</a>
当前:{{ current_op }} {{ current_filter }}
<a href="{{ request.url.remove_query_params(filter.parameter_name).remove_query_params(filter.parameter_name + '_op') }}" class="text-decoration-none">[清除]</a>
</div>
{% endif %}
<form method="get" class="d-flex flex-column" style="gap: 8px;">
@ -256,7 +256,7 @@
{% endif %}
{% endfor %}
<select name="{{ filter.parameter_name }}_op" class="form-select form-select-sm" required>
<option value="">Select operation...</option>
<option value="">选择操作...</option>
{% for op_value, op_label in filter.get_operation_options_for_model(model_view.model) %}
<option value="{{ op_value }}" {% if current_op == op_value %}selected{% endif %}>{{ op_label }}</option>
{% endfor %}
@ -267,7 +267,7 @@
class="form-control form-control-sm"
value="{{ current_filter }}"
required>
<button type="submit" class="btn btn-sm btn-outline-primary">Apply Filter</button>
<button type="submit" class="btn btn-sm btn-outline-primary">应用筛选</button>
</form>
</div>
</div>

View File

@ -22,8 +22,8 @@
<table class="table card-table table-vcenter text-nowrap table-hover table-bordered">
<thead>
<tr>
<th class="w-1">Column</th>
<th class="w-1">Value</th>
<th class="w-1">字段</th>
<th class="w-1"></th>
</tr>
</thead>
<tbody>
@ -58,12 +58,12 @@
<div class="row connecthub-action-row">
<div class="col-md-1">
<a href="{{ url_for('admin:list', identity=model_view.identity) }}" class="btn">
Go Back
返回
</a>
</div>
<div class="col-md-1">
<form method="post" action="/admin/joblogs/{{ get_object_identifier(model) }}/retry" style="display:inline;" onsubmit="return confirm('Retry this job log?');">
<button type="submit" class="btn btn-warning">Retry</button>
<form method="post" action="/admin/joblogs/{{ get_object_identifier(model) }}/retry" style="display:inline;" onsubmit="return confirm('确认重试该任务日志?');">
<button type="submit" class="btn btn-warning">重试</button>
</form>
</div>
{% if model_view.can_delete %}
@ -71,14 +71,14 @@
<a href="#" data-name="{{ model_view.name }}" data-pk="{{ get_object_identifier(model) }}"
data-url="{{ model_view._url_for_delete(request, model) }}" data-bs-toggle="modal"
data-bs-target="#modal-delete" class="btn btn-danger">
Delete
删除
</a>
</div>
{% endif %}
{% if model_view.can_edit %}
<div class="col-md-1">
<a href="{{ model_view._build_url_for('admin:edit', request, model) }}" class="btn btn-primary">
Edit
编辑
</a>
</div>
{% endif %}

View File

@ -22,7 +22,7 @@
<div class="ms-3 d-inline-block dropdown">
<a href="#" class="btn btn-secondary dropdown-toggle" id="dropdownMenuButton1" data-bs-toggle="dropdown"
aria-expanded="false">
Export
导出
</a>
<ul class="dropdown-menu" aria-labelledby="dropdownMenuButton1">
{% for export_type in model_view.export_types %}
@ -36,7 +36,7 @@
<div class="ms-3 d-inline-block">
<a href="{{ url_for('admin:export', identity=model_view.identity, export_type=model_view.export_types[0]) }}"
class="btn btn-secondary">
Export
导出
</a>
</div>
{% endif %}
@ -44,7 +44,7 @@
{% if model_view.can_create %}
<div class="ms-3 d-inline-block">
<a href="{{ url_for('admin:create', identity=model_view.identity) }}" class="btn btn-primary">
+ New {{ model_view.name }}
+ 新建{{ model_view.name }}
</a>
</div>
{% endif %}
@ -56,14 +56,14 @@
<button {% if not model_view.can_delete and not model_view._custom_actions_in_list %} disabled {% endif %}
class="btn btn-light dropdown-toggle" type="button" id="dropdownMenuButton" data-toggle="dropdown"
aria-haspopup="true" aria-expanded="false">
Actions
操作
</button>
{% if model_view.can_delete or model_view._custom_actions_in_list %}
<div class="dropdown-menu" aria-labelledby="dropdownMenuButton">
{% if model_view.can_delete %}
<a class="dropdown-item" id="action-delete" href="#" data-name="{{ model_view.name }}"
data-url="{{ url_for('admin:delete', identity=model_view.identity) }}" data-bs-toggle="modal"
data-bs-target="#modal-delete">Delete selected items</a>
data-bs-target="#modal-delete">删除所选</a>
{% endif %}
{% for custom_action, label in model_view._custom_actions_in_list.items() %}
{% if custom_action in model_view._custom_actions_confirmation %}
@ -85,9 +85,9 @@
<div class="col-md-4 text-muted">
<div class="input-group">
<input id="search-input" type="text" class="form-control"
placeholder="Search: {{ model_view.search_placeholder() }}"
placeholder="搜索:{{ model_view.search_placeholder() }}"
value="{{ request.query_params.get('search', '') }}">
<button id="search-button" class="btn" type="button">Search</button>
<button id="search-button" class="btn" type="button">搜索</button>
<button id="search-reset" class="btn" type="button" {% if not request.query_params.get('search')
%}disabled{% endif %}><i class="fa-solid fa-times"></i></button>
</div>
@ -120,7 +120,7 @@
{% endif %}
</th>
{% endfor %}
<th>Retry</th>
<th>重试</th>
</tr>
</thead>
<tbody>
@ -172,8 +172,8 @@
{% endif %}
{% endfor %}
<td>
<form class="connecthub-retry-form" method="post" action="/admin/joblogs/{{ get_object_identifier(row) }}/retry" onsubmit="return confirm('Retry this job log?');">
<button type="submit" class="btn btn-warning btn-sm">Retry</button>
<form class="connecthub-retry-form" method="post" action="/admin/joblogs/{{ get_object_identifier(row) }}/retry" onsubmit="return confirm('确认重试该任务日志?');">
<button type="submit" class="btn btn-warning btn-sm">重试</button>
</form>
</td>
</tr>
@ -182,9 +182,9 @@
</table>
</div>
<div class="card-footer d-flex justify-content-between align-items-center gap-2">
<p class="m-0 text-muted">Showing <span>{{ ((pagination.page - 1) * pagination.page_size) + 1 }}</span> to
<span>{{ min(pagination.page * pagination.page_size, pagination.count) }}</span> of <span>{{ pagination.count
}}</span> items
<p class="m-0 text-muted">显示 <span>{{ ((pagination.page - 1) * pagination.page_size) + 1 }}</span>
<span>{{ min(pagination.page * pagination.page_size, pagination.count) }}</span>,共 <span>{{ pagination.count
}}</span>
</p>
<ul class="pagination m-0 ms-auto">
<li class="page-item {% if not pagination.has_previous %}disabled{% endif %}">
@ -194,7 +194,7 @@
<a class="page-link" href="#">
{% endif %}
<i class="fa-solid fa-chevron-left"></i>
prev
上一页
</a>
</li>
{% for page_control in pagination.page_controls %}
@ -207,21 +207,21 @@
{% else %}
<a class="page-link" href="#">
{% endif %}
next
下一页
<i class="fa-solid fa-chevron-right"></i>
</a>
</li>
</ul>
<div class="dropdown text-muted">
Show
每页显示
<a href="#" class="btn btn-sm btn-light dropdown-toggle" data-toggle="dropdown" aria-haspopup="true"
aria-expanded="false">
{{ request.query_params.get("pageSize") or model_view.page_size }} / Page
{{ request.query_params.get("pageSize") or model_view.page_size }} /
</a>
<div class="dropdown-menu">
{% for page_size_option in model_view.page_size_options %}
<a class="dropdown-item" href="{{ request.url.include_query_params(pageSize=page_size_option, page=pagination.resize(page_size_option).page) }}">
{{ page_size_option }} / Page
{{ page_size_option }} /
</a>
{% endfor %}
</div>
@ -233,7 +233,7 @@
<div class="col-md-3" style="width: 300px; flex-shrink: 0;">
<div id="filter-sidebar" class="card">
<div class="card-header">
<h3 class="card-title">Filters</h3>
<h3 class="card-title">筛选</h3>
</div>
<div class="card-body">
{% for filter in model_view.get_filters() %}
@ -245,8 +245,8 @@
{% set current_op = request.query_params.get(filter.parameter_name + '_op', '') %}
{% if current_filter %}
<div class="mb-2 text-muted small">
Current: {{ current_op }} {{ current_filter }}
<a href="{{ request.url.remove_query_params(filter.parameter_name).remove_query_params(filter.parameter_name + '_op') }}" class="text-decoration-none">[Clear]</a>
当前:{{ current_op }} {{ current_filter }}
<a href="{{ request.url.remove_query_params(filter.parameter_name).remove_query_params(filter.parameter_name + '_op') }}" class="text-decoration-none">[清除]</a>
</div>
{% endif %}
<form method="get" class="d-flex flex-column" style="gap: 8px;">
@ -256,7 +256,7 @@
{% endif %}
{% endfor %}
<select name="{{ filter.parameter_name }}_op" class="form-select form-select-sm" required>
<option value="">Select operation...</option>
<option value="">选择操作...</option>
{% for op_value, op_label in filter.get_operation_options_for_model(model_view.model) %}
<option value="{{ op_value }}" {% if current_op == op_value %}selected{% endif %}>{{ op_label }}</option>
{% endfor %}
@ -267,7 +267,7 @@
class="form-control form-control-sm"
value="{{ current_filter }}"
required>
<button type="submit" class="btn btn-sm btn-outline-primary">Apply Filter</button>
<button type="submit" class="btn btn-sm btn-outline-primary">应用筛选</button>
</form>
</div>
</div>

View File

@ -45,8 +45,8 @@ def _truncate(s: str, n: int = 120) -> str:
class JobAdmin(ModelView, model=Job):
name = "Job"
name_plural = "Jobs"
name = "任务"
name_plural = "任务"
icon = "fa fa-cogs"
column_list = [Job.id, Job.enabled, Job.cron_expr, Job.handler_path, Job.updated_at]
@ -72,10 +72,34 @@ class JobAdmin(ModelView, model=Job):
# 列表页模板:加入每行 Run Now
list_template = "job_list.html"
column_labels = {
"id": "任务ID",
"enabled": "启用",
"cron_expr": "Cron 表达式",
"handler_path": "处理器",
"public_cfg": "明文配置",
"secret_cfg": "密文配置",
"last_run_at": "上次运行时间",
"created_at": "创建时间",
"updated_at": "更新时间",
}
column_formatters = {
Job.created_at: lambda m, a: _fmt_dt_seconds(m.created_at),
Job.updated_at: lambda m, a: _fmt_dt_seconds(m.updated_at),
Job.last_run_at: lambda m, a: _fmt_dt_seconds(m.last_run_at),
}
column_formatters_detail = {
Job.created_at: lambda m, a: _fmt_dt_seconds(m.created_at),
Job.updated_at: lambda m, a: _fmt_dt_seconds(m.updated_at),
Job.last_run_at: lambda m, a: _fmt_dt_seconds(m.last_run_at),
}
@action(
name="run_now",
label="Run Now",
confirmation_message="Trigger this job now?",
label="立即运行",
confirmation_message="确认立即执行该任务?",
add_in_list=True,
add_in_detail=True,
)
@ -130,8 +154,8 @@ class JobAdmin(ModelView, model=Job):
class JobLogAdmin(ModelView, model=JobLog):
name = "JobLog"
name_plural = "JobLogs"
name = "任务日志"
name_plural = "任务日志"
icon = "fa fa-list"
can_create = False
@ -161,6 +185,20 @@ class JobLogAdmin(ModelView, model=JobLog):
# 为 JobLog 详情页单独指定模板(用于加入 Retry 按钮)
details_template = "joblog_details.html"
column_labels = {
"id": "日志ID",
"job_id": "任务ID",
"status": "状态",
"snapshot_params": "快照参数",
"message": "消息",
"traceback": "异常堆栈",
"run_log": "运行日志",
"celery_task_id": "Celery任务ID",
"attempt": "重试次数",
"started_at": "开始时间",
"finished_at": "结束时间",
}
column_formatters = {
JobLog.started_at: lambda m, a: _fmt_dt_seconds(m.started_at),
JobLog.finished_at: lambda m, a: _fmt_dt_seconds(m.finished_at),

View File

@ -62,3 +62,39 @@ def get_job_log(session: Session, log_id: int) -> JobLog | None:
return session.get(JobLog, log_id)
def update_job_log(
session: Session,
log_id: int,
*,
status: JobStatus | None = None,
message: str | None = None,
traceback: str | None = None,
run_log: str | None = None,
celery_task_id: str | None = None,
attempt: int | None = None,
finished_at: datetime | None = None,
) -> JobLog | None:
log = session.get(JobLog, log_id)
if not log:
return None
if status is not None:
log.status = status
if message is not None:
log.message = message
if traceback is not None:
log.traceback = traceback
if run_log is not None:
log.run_log = run_log
if celery_task_id is not None:
log.celery_task_id = celery_task_id
if attempt is not None:
log.attempt = attempt
if finished_at is not None:
log.finished_at = finished_at
session.add(log)
session.commit()
session.refresh(log)
return log

View File

@ -33,6 +33,7 @@ class Job(Base):
class JobStatus(str, enum.Enum):
RUNNING = "RUNNING"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
RETRY = "RETRY"

View File

@ -11,6 +11,136 @@ def _has_column(engine: Engine, table: str, col: str) -> bool:
return any(c.get("name") == col for c in cols)
def _sqlite_table_sql(conn, table: str) -> str:
row = conn.execute(
text("SELECT sql FROM sqlite_master WHERE type='table' AND name=:name"),
{"name": table},
).fetchone()
return str(row[0] or "") if row else ""
def _ensure_job_logs_status_allows_running(engine: Engine) -> None:
"""
status 新增 RUNNING 时的轻量自升级
- SQLite如存在 CHECK 且不包含 RUNNING则通过重建表方式迁移移除旧 CHECK确保允许 RUNNING
- PostgreSQL如存在 status CHECK 且不包含 RUNNING drop & recreate
"""
dialect = engine.dialect.name
if dialect not in ("sqlite", "postgresql"):
return
insp = inspect(engine)
try:
cols = insp.get_columns("job_logs")
except Exception:
return
existing_cols = {c.get("name") for c in cols if c.get("name")}
with engine.begin() as conn:
if dialect == "sqlite":
sql = _sqlite_table_sql(conn, "job_logs")
# 没有 CHECK 约束则无需迁移;有 CHECK 但已包含 RUNNING 也无需迁移
if not sql or "CHECK" not in sql or "RUNNING" in sql:
return
# 重建表:去掉旧 CHECK允许 RUNNING并确保列存在缺列用默认值补齐
conn.execute(text("ALTER TABLE job_logs RENAME TO job_logs_old"))
conn.execute(
text(
"""
CREATE TABLE job_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id VARCHAR NOT NULL,
status VARCHAR(16) NOT NULL,
snapshot_params TEXT NOT NULL DEFAULT '{}',
message TEXT NOT NULL DEFAULT '',
traceback TEXT NOT NULL DEFAULT '',
run_log TEXT NOT NULL DEFAULT '',
celery_task_id VARCHAR NOT NULL DEFAULT '',
attempt INTEGER NOT NULL DEFAULT 0,
started_at DATETIME NOT NULL,
finished_at DATETIME
)
"""
)
)
def _expr(col: str, default_expr: str) -> str:
return col if col in existing_cols else f"{default_expr} AS {col}"
insert_cols = [
"id",
"job_id",
"status",
"snapshot_params",
"message",
"traceback",
"run_log",
"celery_task_id",
"attempt",
"started_at",
"finished_at",
]
select_exprs = [
_expr("id", "NULL"),
_expr("job_id", "''"),
_expr("status", "''"),
_expr("snapshot_params", "'{}'"),
_expr("message", "''"),
_expr("traceback", "''"),
_expr("run_log", "''"),
_expr("celery_task_id", "''"),
_expr("attempt", "0"),
_expr("started_at", "CURRENT_TIMESTAMP"),
_expr("finished_at", "NULL"),
]
conn.execute(
text(
f"INSERT INTO job_logs ({', '.join(insert_cols)}) "
f"SELECT {', '.join(select_exprs)} FROM job_logs_old"
)
)
conn.execute(text("DROP TABLE job_logs_old"))
# 还原 job_id 索引SQLAlchemy 默认命名 ix_job_logs_job_id
conn.execute(text("CREATE INDEX IF NOT EXISTS ix_job_logs_job_id ON job_logs (job_id)"))
return
if dialect == "postgresql":
try:
checks = insp.get_check_constraints("job_logs") or []
except Exception:
checks = []
need = False
drop_names: list[str] = []
for ck in checks:
name = str(ck.get("name") or "")
sqltext = str(ck.get("sqltext") or "")
if "status" in sqltext and "RUNNING" not in sqltext:
need = True
if name:
drop_names.append(name)
if not need:
return
# 先尽力 drop 旧约束(名称不确定),再创建统一的新约束
for n in drop_names:
conn.execute(text(f'ALTER TABLE job_logs DROP CONSTRAINT IF EXISTS "{n}"'))
conn.execute(text("ALTER TABLE job_logs DROP CONSTRAINT IF EXISTS ck_job_logs_status"))
conn.execute(
text(
"ALTER TABLE job_logs "
"ADD CONSTRAINT ck_job_logs_status "
"CHECK (status IN ('RUNNING','SUCCESS','FAILURE','RETRY'))"
)
)
return
def ensure_schema(engine: Engine) -> None:
"""
轻量自升级 SQLite/PostgreSQL
@ -24,4 +154,7 @@ def ensure_schema(engine: Engine) -> None:
if not _has_column(engine, "job_logs", "run_log"):
conn.execute(text("ALTER TABLE job_logs ADD COLUMN run_log TEXT NOT NULL DEFAULT ''"))
# job_logs.status: ensure new enum value RUNNING is accepted by DB constraints
_ensure_job_logs_status_allows_running(engine)

View File

@ -18,6 +18,42 @@ from app.tasks.celery_app import celery_app
logger = logging.getLogger("connecthub.tasks.execute")
_MAX_MESSAGE_WARNING_LINES = 200
_MAX_MESSAGE_CHARS = 50_000
def _extract_warning_lines(run_log_text: str) -> list[str]:
"""
run_log 文本里提取 WARNING 保留原始行文本
capture_logs 的格式为'%(asctime)s %(levelname)s %(name)s %(message)s'
"""
run_log_text = run_log_text or ""
lines = run_log_text.splitlines()
return [ln for ln in lines if " WARNING " in f" {ln} "]
def _compose_message(base_message: str, warning_lines: list[str]) -> str:
"""
base_message + warnings(具体内容) + summary并做截断保护
"""
base_message = base_message or ""
warning_lines = warning_lines or []
parts: list[str] = [base_message]
if warning_lines:
parts.append(f"WARNINGS ({len(warning_lines)}):")
if len(warning_lines) <= _MAX_MESSAGE_WARNING_LINES:
parts.extend(warning_lines)
else:
parts.extend(warning_lines[:_MAX_MESSAGE_WARNING_LINES])
parts.append(f"[TRUNCATED] warnings exceeded {_MAX_MESSAGE_WARNING_LINES} lines")
parts.append(f"SUMMARY: warnings={len(warning_lines)}")
msg = "\n".join([p for p in parts if p is not None])
if len(msg) > _MAX_MESSAGE_CHARS:
msg = msg[: _MAX_MESSAGE_CHARS - 64] + "\n[TRUNCATED] message exceeded 50000 chars"
return msg
@celery_app.task(bind=True, name="connecthub.execute_job")
def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any] | None = None) -> dict[str, Any]:
@ -42,6 +78,10 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
traceback = ""
result: dict[str, Any] = {}
run_log_text = ""
log_id: int | None = None
celery_task_id = getattr(self.request, "id", "") or ""
attempt = int(getattr(self.request, "retries", 0) or 0)
snapshot: dict[str, Any] = {}
try:
with capture_logs(max_bytes=200_000) as get_run_log:
@ -61,6 +101,37 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
public_cfg = job.public_cfg or {}
secret_token = job.secret_cfg or ""
snapshot = snapshot_params or {
"job_id": job_id,
"handler_path": handler_path,
"public_cfg": public_cfg,
"secret_cfg": secret_token,
"meta": {
"trigger": "celery",
"celery_task_id": celery_task_id,
"started_at": started_at.isoformat(),
},
}
# 任务开始即落库一条 RUNNING 记录(若失败则降级为旧行为:结束时再 create
try:
running = crud.create_job_log(
session,
job_id=str(job_id or ""),
status=JobStatus.RUNNING,
snapshot_params=snapshot,
message="运行中",
traceback="",
run_log="",
celery_task_id=celery_task_id,
attempt=attempt,
started_at=started_at,
finished_at=None,
)
log_id = int(running.id)
except Exception:
log_id = None
secrets = decrypt_json(secret_token)
job_instance = instantiate(handler_path)
out = job_instance.run(params=public_cfg, secrets=secrets)
@ -81,6 +152,23 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
run_log_text = ""
finally:
finished_at = datetime.utcnow()
warning_lines = _extract_warning_lines(run_log_text)
message = _compose_message(message, warning_lines)
# 结束时:优先更新 RUNNING 那条;若没有则创建最终记录(兼容降级)
if log_id is not None:
crud.update_job_log(
session,
log_id,
status=status,
message=message,
traceback=traceback,
run_log=run_log_text,
celery_task_id=celery_task_id,
attempt=attempt,
finished_at=finished_at,
)
else:
if not snapshot:
snapshot = snapshot_params or {
"job_id": job_id,
"handler_path": handler_path if "handler_path" in locals() else "",
@ -88,7 +176,7 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
"secret_cfg": secret_token if "secret_token" in locals() else "",
"meta": {
"trigger": "celery",
"celery_task_id": getattr(self.request, "id", "") or "",
"celery_task_id": celery_task_id,
"started_at": started_at.isoformat(),
},
}
@ -100,8 +188,8 @@ def execute_job(self, job_id: str | None = None, snapshot_params: dict[str, Any]
message=message,
traceback=traceback,
run_log=run_log_text,
celery_task_id=getattr(self.request, "id", "") or "",
attempt=int(getattr(self.request, "retries", 0) or 0),
celery_task_id=celery_task_id,
attempt=attempt,
started_at=started_at,
finished_at=finished_at,
)

View File

@ -91,7 +91,7 @@ class SyncOAToDidiExportFormJob(BaseJob):
- base_url 不包含 /seeyon/rest例如 https://oa.example.com:8090
public_cfg:
- base_url: "https://oa.example.com:8090"
- base_url: "https://oad.example.com:8090"
- templateCode: "employee"
- senderLoginName: "xxx" (可选)
- rightId: "xxx" (可选)
@ -276,6 +276,8 @@ class SyncOAToDidiLegalEntitySyncJob(BaseJob):
if not isinstance(rows, list):
raise RuntimeError("OA export invalid: data is not a list")
logger.info("开始同步OA->滴滴 模板=%s 总行数=%s", oa_template_code, len(rows))
didi = DidiClient(
base_url=didi_base_url,
client_id=client_id,
@ -288,17 +290,20 @@ class SyncOAToDidiLegalEntitySyncJob(BaseJob):
updated = 0
skipped = 0
errors: list[str] = []
warn_count = 0
for row in rows:
total_rows += 1
if not isinstance(row, dict):
skipped += 1
logger.warning("OA row is not a dict, skipped")
warn_count += 1
logger.warning("跳过OA 行数据不是对象(dict)")
continue
master = row.get("masterData") or {}
if not isinstance(master, dict):
skipped += 1
logger.warning("OA row masterData is not a dict, skipped")
warn_count += 1
logger.warning("跳过OA 行 masterData 不是对象(dict)")
continue
emp_obj = master.get(emp_field) or {}
@ -312,13 +317,17 @@ class SyncOAToDidiLegalEntitySyncJob(BaseJob):
if not emp_no or not comp_name:
skipped += 1
logger.warning("Missing employee_number/company_name, skipped employee_number=%r company_name=%r", emp_no, comp_name)
warn_count += 1
logger.warning("跳过:缺少工号或所属公司 employee_number=%r company_name=%r", emp_no, comp_name)
continue
logger.info("正在处理:工号=%s 所属公司=%s", emp_no, comp_name)
# 公司主体匹配(进程内缓存)
legal_entity_id = cache_legal_entity.get(comp_name)
if not legal_entity_id:
try:
logger.info("正在查询公司主体name=%s", comp_name)
data = didi.get_legal_entities(company_id=company_id, offset=0, length=100, keyword=comp_name)
records = data.get("records") or []
if not isinstance(records, list):
@ -326,54 +335,75 @@ class SyncOAToDidiLegalEntitySyncJob(BaseJob):
matches = [r for r in records if isinstance(r, dict) and str(r.get("name") or "") == comp_name]
if len(matches) == 0:
skipped += 1
logger.warning("No exact legal entity match, skipped company_name=%r employee_number=%s", comp_name, emp_no)
warn_count += 1
logger.warning("跳过:滴滴公司主体无精确匹配 company_name=%r employee_number=%s", comp_name, emp_no)
continue
if len(matches) > 1:
skipped += 1
msg = f"Multiple exact legal entity matches company_name={comp_name!r} count={len(matches)}"
errors.append(msg)
logger.error(msg)
logger.error("跳过:滴滴公司主体精确匹配多条 company_name=%r count=%s employee_number=%s", comp_name, len(matches), emp_no)
continue
legal_entity_id = str(matches[0].get("legal_entity_id") or "").strip()
if not legal_entity_id:
skipped += 1
logger.warning("Exact match legal_entity_id empty, skipped company_name=%r", comp_name)
warn_count += 1
logger.warning("跳过:滴滴公司主体 legal_entity_id 为空 company_name=%r", comp_name)
continue
cache_legal_entity[comp_name] = legal_entity_id
logger.info("公司主体匹配成功name=%s legal_entity_id=%s", comp_name, legal_entity_id)
except Exception as e: # noqa: BLE001
skipped += 1
msg = f"LegalEntity lookup failed company_name={comp_name!r} err={e!r}"
errors.append(msg)
logger.warning(msg)
warn_count += 1
logger.warning("跳过:查询滴滴公司主体失败 company_name=%r err=%r", comp_name, e)
continue
else:
logger.info("公司主体缓存命中name=%s legal_entity_id=%s", comp_name, legal_entity_id)
# 员工查询
try:
logger.info("正在查询滴滴员工employee_number=%s", emp_no)
member = didi.get_member_detail(company_id=company_id, employee_number=emp_no)
member_id = str(member.get("member_id") or member.get("id") or "").strip()
if not member_id:
skipped += 1
logger.warning("Member detail missing member_id/id, skipped employee_number=%s", emp_no)
warn_count += 1
logger.warning("跳过:滴滴员工明细缺少 member_id/id employee_number=%s", emp_no)
continue
logger.info("员工查询成功employee_number=%s member_id=%s", emp_no, member_id)
except Exception as e: # noqa: BLE001
skipped += 1
msg = f"Member.detail failed employee_number={emp_no!r} err={e!r}"
errors.append(msg)
logger.warning(msg)
warn_count += 1
logger.warning("跳过:查询滴滴员工失败 employee_number=%r err=%r", emp_no, e)
continue
# 员工更新(按文档要求:连续修改间隔 >=150ms
try:
logger.info("正在更新员工公司主体member_id=%s legal_entity_id=%s", member_id, legal_entity_id)
didi.edit_member_legal_entity(company_id=company_id, member_id=member_id, employee_number=None, legal_entity_id=legal_entity_id)
updated += 1
time.sleep(0.15)
logger.info("同步成功employee_number=%s legal_entity_id=%s", emp_no, legal_entity_id)
except Exception as e: # noqa: BLE001
skipped += 1
msg = f"Member.edit failed employee_number={emp_no!r} member_id={member_id!r} err={e!r}"
errors.append(msg)
logger.warning(msg)
warn_count += 1
logger.warning("同步失败employee_number=%r member_id=%r err=%r", emp_no, member_id, e)
continue
logger.info(
"同步完成:总行数=%s 成功=%s 跳过=%s warnings=%s errors=%s",
total_rows,
updated,
skipped,
warn_count,
len(errors),
)
return {
"total_rows": total_rows,
"updated_count": updated,