430 lines
18 KiB
Python
430 lines
18 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import time
|
||
from typing import Any
|
||
|
||
from app.integrations.didi import DidiClient
|
||
from app.integrations.seeyon import SeeyonClient
|
||
from app.jobs.base import BaseJob
|
||
|
||
|
||
logger = logging.getLogger("connecthub.extensions.sync_oa_to_didi")
|
||
|
||
|
||
def _mask_token(token: str) -> str:
|
||
token = token or ""
|
||
if len(token) <= 12:
|
||
return "***"
|
||
return f"{token[:6]}***{token[-4:]}"
|
||
|
||
|
||
def _log_text_in_chunks(*, prefix: str, text: str, chunk_bytes: int = 8_000) -> None:
|
||
"""
|
||
将大文本尽可能写入 run_log:
|
||
- 按 UTF-8 字节切分,避免单条日志过大导致整条无法写入(capture_logs 会在超过 max_bytes 时丢弃整条并标记截断)
|
||
- 由上层 capture_logs(max_bytes=200_000) 负责总量截断
|
||
"""
|
||
try:
|
||
if not text:
|
||
logger.info("%s <empty>", prefix)
|
||
return
|
||
|
||
if chunk_bytes <= 0:
|
||
chunk_bytes = 8_000
|
||
|
||
raw_bytes = text.encode("utf-8", errors="replace")
|
||
total = (len(raw_bytes) + chunk_bytes - 1) // chunk_bytes
|
||
for i in range(total):
|
||
b = raw_bytes[i * chunk_bytes : (i + 1) * chunk_bytes]
|
||
chunk = b.decode("utf-8", errors="replace")
|
||
logger.info("%s chunk %s/%s: %s", prefix, i + 1, total, chunk)
|
||
except Exception:
|
||
# run_log 捕获属于“尽力而为”,任何异常都不应影响任务执行
|
||
return
|
||
|
||
|
||
class SyncOAToDidiTokenJob(BaseJob):
|
||
"""
|
||
示例 Job:演示致远 OA 的 token 获取与日志记录
|
||
|
||
public_cfg:
|
||
- base_url: "https://oa.example.com"
|
||
|
||
secret_cfg (解密后):
|
||
- rest_user
|
||
- rest_password
|
||
- loginName (可选)
|
||
"""
|
||
|
||
job_id = "sync_oa_to_didi.token_demo"
|
||
|
||
def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
|
||
base_url = str(params.get("base_url") or "").strip()
|
||
if not base_url:
|
||
raise ValueError("public_cfg.base_url is required")
|
||
|
||
rest_user = str(secrets.get("rest_user") or "").strip()
|
||
rest_password = str(secrets.get("rest_password") or "").strip()
|
||
login_name = secrets.get("loginName")
|
||
login_name = str(login_name).strip() if login_name else None
|
||
|
||
if not rest_user or not rest_password:
|
||
raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
|
||
|
||
client = SeeyonClient(base_url=base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
|
||
try:
|
||
token = client.authenticate()
|
||
finally:
|
||
client.close()
|
||
|
||
masked = _mask_token(token)
|
||
logger.info("Seeyon token acquired (masked) token=%s loginName=%s base_url=%s", masked, login_name, base_url)
|
||
return {"token_masked": masked, "loginName": login_name or "", "base_url": base_url}
|
||
|
||
|
||
class SyncOAToDidiExportFormJob(BaseJob):
|
||
"""
|
||
无流程表单导出(CAP4):
|
||
- 调用:POST /seeyon/rest/cap4/form/soap/export
|
||
- base_url 不包含 /seeyon/rest(例如 https://oa.example.com:8090)
|
||
|
||
public_cfg:
|
||
- base_url: "https://oad.example.com:8090"
|
||
- templateCode: "employee"
|
||
- senderLoginName: "xxx" (可选)
|
||
- rightId: "xxx" (可选)
|
||
- doTrigger: "true" (可选)
|
||
- param: "0" (可选)
|
||
- extra: {...} (可选,兜底扩展字段)
|
||
|
||
secret_cfg (解密后):
|
||
- rest_user
|
||
- rest_password
|
||
- loginName
|
||
"""
|
||
|
||
job_id = "sync_oa_to_didi.export_form_soap"
|
||
|
||
def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
|
||
base_url = str(params.get("base_url") or "").strip()
|
||
if not base_url:
|
||
raise ValueError("public_cfg.base_url is required")
|
||
|
||
template_code = str(params.get("templateCode") or "").strip()
|
||
if not template_code:
|
||
raise ValueError("public_cfg.templateCode is required")
|
||
|
||
sender_login_name = params.get("senderLoginName")
|
||
sender_login_name = str(sender_login_name).strip() if sender_login_name else None
|
||
|
||
right_id = params.get("rightId")
|
||
right_id = str(right_id).strip() if right_id else None
|
||
|
||
do_trigger = params.get("doTrigger")
|
||
param = params.get("param")
|
||
param = str(param) if param is not None else None
|
||
|
||
extra = params.get("extra")
|
||
if extra is not None and not isinstance(extra, dict):
|
||
raise ValueError("public_cfg.extra must be a JSON object (dict) if provided")
|
||
|
||
rest_user = str(secrets.get("rest_user") or "").strip()
|
||
rest_password = str(secrets.get("rest_password") or "").strip()
|
||
login_name = secrets.get("loginName")
|
||
login_name = str(login_name).strip() if login_name else None
|
||
|
||
if not rest_user or not rest_password:
|
||
raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
|
||
|
||
client = SeeyonClient(base_url=base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
|
||
try:
|
||
resp = client.export_cap4_form_soap(
|
||
templateCode=template_code,
|
||
senderLoginName=sender_login_name,
|
||
rightId=right_id,
|
||
doTrigger=do_trigger,
|
||
param=param,
|
||
extra=extra,
|
||
)
|
||
raw_text = resp.text or ""
|
||
content_type = resp.headers.get("content-type", "") if getattr(resp, "headers", None) else ""
|
||
finally:
|
||
client.close()
|
||
|
||
# 避免把 raw_text 打到日志或 run_log(会被截断且污染 JobLog)
|
||
logger.info(
|
||
"Seeyon export_form_soap done templateCode=%s content_length=%s content_type=%s base_url=%s",
|
||
template_code,
|
||
len(raw_text),
|
||
content_type,
|
||
base_url,
|
||
)
|
||
_log_text_in_chunks(prefix="Seeyon export_form_soap raw", text=raw_text, chunk_bytes=8_000)
|
||
|
||
return {
|
||
"raw": raw_text,
|
||
"meta": {
|
||
"templateCode": template_code,
|
||
"content_length": len(raw_text),
|
||
"content_type": content_type,
|
||
},
|
||
}
|
||
|
||
|
||
class SyncOAToDidiLegalEntitySyncJob(BaseJob):
|
||
"""
|
||
从 OA 无流程表单导出中读取“工号/所属公司”,并同步到滴滴:
|
||
- 公司主体:GET /river/LegalEntity/get(keyword=name),name 完全相等优先
|
||
- 员工查询:GET /river/Member/detail(employee_number=工号)
|
||
- 员工更新:POST /river/Member/edit(更新 legal_entity_id)
|
||
|
||
public_cfg:
|
||
- oa_base_url: "https://oa.example.com:8090"
|
||
- oa_templateCode: "employee"
|
||
- didi_base_url: "https://api.es.xiaojukeji.com"
|
||
- senderLoginName/rightId/doTrigger/param/extra: 可选(透传到 OA 导出)
|
||
|
||
secret_cfg (解密后):
|
||
- rest_user/rest_password/loginName: OA 登录
|
||
- company_id/client_id/client_secret/sign_key: 滴滴凭证
|
||
- phone: 可选(此 Job 不使用)
|
||
"""
|
||
|
||
job_id = "sync_oa_to_didi.sync_legal_entity"
|
||
|
||
def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
|
||
oa_base_url = str(params.get("oa_base_url") or "").strip()
|
||
if not oa_base_url:
|
||
raise ValueError("public_cfg.oa_base_url is required")
|
||
oa_template_code = str(params.get("oa_templateCode") or "").strip()
|
||
if not oa_template_code:
|
||
raise ValueError("public_cfg.oa_templateCode is required")
|
||
|
||
didi_base_url = str(params.get("didi_base_url") or "").strip()
|
||
if not didi_base_url:
|
||
raise ValueError("public_cfg.didi_base_url is required")
|
||
|
||
rest_user = str(secrets.get("rest_user") or "").strip()
|
||
rest_password = str(secrets.get("rest_password") or "").strip()
|
||
login_name = secrets.get("loginName")
|
||
login_name = str(login_name).strip() if login_name else None
|
||
if not rest_user or not rest_password:
|
||
raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
|
||
|
||
company_id = str(secrets.get("company_id") or "").strip()
|
||
client_id = str(secrets.get("client_id") or "").strip()
|
||
client_secret = str(secrets.get("client_secret") or "").strip()
|
||
sign_key = str(secrets.get("sign_key") or "").strip()
|
||
if not company_id or not client_id or not client_secret or not sign_key:
|
||
raise ValueError("secret_cfg.company_id/client_id/client_secret/sign_key are required")
|
||
|
||
sender_login_name = params.get("senderLoginName")
|
||
sender_login_name = str(sender_login_name).strip() if sender_login_name else None
|
||
right_id = params.get("rightId")
|
||
right_id = str(right_id).strip() if right_id else None
|
||
do_trigger = params.get("doTrigger")
|
||
param = params.get("param")
|
||
param = str(param) if param is not None else None
|
||
extra = params.get("extra")
|
||
if extra is not None and not isinstance(extra, dict):
|
||
raise ValueError("public_cfg.extra must be a JSON object (dict) if provided")
|
||
|
||
seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
|
||
try:
|
||
resp = seeyon.export_cap4_form_soap(
|
||
templateCode=oa_template_code,
|
||
senderLoginName=sender_login_name,
|
||
rightId=right_id,
|
||
doTrigger=do_trigger,
|
||
param=param,
|
||
extra=extra,
|
||
)
|
||
raw = resp.text or ""
|
||
finally:
|
||
seeyon.close()
|
||
|
||
payload = json.loads(raw) if raw else {}
|
||
outer = payload.get("data") or {}
|
||
# 成功返回时 outer["data"] 即为表单对象:{"definition": {...}, "data": [...]}
|
||
form = outer.get("data") or {}
|
||
|
||
if not isinstance(form, dict):
|
||
raise RuntimeError("OA export invalid: data.data is not an object (dict)")
|
||
|
||
definition = form.get("definition") or {}
|
||
fields = definition.get("fields") or []
|
||
if not isinstance(fields, list):
|
||
raise RuntimeError("OA export invalid: definition.fields is not a list")
|
||
|
||
emp_field: str | None = None
|
||
company_field: str | None = None
|
||
sync_field: str | None = None
|
||
for f in fields:
|
||
if not isinstance(f, dict):
|
||
continue
|
||
display = str(f.get("display") or "")
|
||
name = str(f.get("name") or "")
|
||
if display == "工号" and name:
|
||
emp_field = name
|
||
if display == "所属公司" and name:
|
||
company_field = name
|
||
if display == "是否同步滴滴" and name:
|
||
sync_field = name
|
||
if not emp_field or not company_field:
|
||
raise RuntimeError("OA export invalid: cannot locate fields for 工号/所属公司 in definition.fields")
|
||
|
||
rows = form.get("data") or []
|
||
if not isinstance(rows, list):
|
||
raise RuntimeError("OA export invalid: data is not a list")
|
||
|
||
logger.info("开始同步:OA->滴滴 模板=%s 总行数=%s", oa_template_code, len(rows))
|
||
|
||
didi = DidiClient(
|
||
base_url=didi_base_url,
|
||
client_id=client_id,
|
||
client_secret=client_secret,
|
||
sign_key=sign_key,
|
||
)
|
||
try:
|
||
cache_legal_entity: dict[str, str] = {}
|
||
total_rows = 0
|
||
updated = 0
|
||
skipped = 0
|
||
errors: list[str] = []
|
||
warn_count = 0
|
||
|
||
for row in rows:
|
||
total_rows += 1
|
||
if not isinstance(row, dict):
|
||
skipped += 1
|
||
warn_count += 1
|
||
logger.warning("跳过:OA 行数据不是对象(dict)")
|
||
continue
|
||
master = row.get("masterData") or {}
|
||
if not isinstance(master, dict):
|
||
skipped += 1
|
||
warn_count += 1
|
||
logger.warning("跳过:OA 行 masterData 不是对象(dict)")
|
||
continue
|
||
|
||
emp_obj = master.get(emp_field) or {}
|
||
comp_obj = master.get(company_field) or {}
|
||
emp_no = ""
|
||
comp_name = ""
|
||
if isinstance(emp_obj, dict):
|
||
emp_no = str(emp_obj.get("value") or emp_obj.get("showValue") or "").strip()
|
||
if isinstance(comp_obj, dict):
|
||
comp_name = str(comp_obj.get("value") or comp_obj.get("showValue") or "").strip()
|
||
|
||
if not emp_no or not comp_name:
|
||
skipped += 1
|
||
warn_count += 1
|
||
logger.warning("跳过:缺少工号或所属公司 employee_number=%r company_name=%r", emp_no, comp_name)
|
||
continue
|
||
|
||
# 是否同步滴滴:字段存在且值为 "N" 则跳过;字段不存在则默认继续(兼容旧表单)
|
||
if sync_field:
|
||
sync_obj = master.get(sync_field) or {}
|
||
sync_val = ""
|
||
if isinstance(sync_obj, dict):
|
||
sync_val = str(sync_obj.get("value") or sync_obj.get("showValue") or "").strip()
|
||
if sync_val == "N":
|
||
skipped += 1
|
||
warn_count += 1
|
||
logger.warning("跳过:是否同步滴滴=N employee_number=%s company_name=%s", emp_no, comp_name)
|
||
continue
|
||
|
||
logger.info("正在处理:工号=%s 所属公司=%s", emp_no, comp_name)
|
||
|
||
# 公司主体匹配(进程内缓存)
|
||
legal_entity_id = cache_legal_entity.get(comp_name)
|
||
if not legal_entity_id:
|
||
try:
|
||
logger.info("正在查询公司主体:name=%s", comp_name)
|
||
data = didi.get_legal_entities(company_id=company_id, offset=0, length=100, keyword=comp_name)
|
||
records = data.get("records") or []
|
||
if not isinstance(records, list):
|
||
raise RuntimeError("LegalEntity.get invalid: records not a list")
|
||
matches = [r for r in records if isinstance(r, dict) and str(r.get("name") or "") == comp_name]
|
||
if len(matches) == 0:
|
||
skipped += 1
|
||
warn_count += 1
|
||
logger.warning("跳过:滴滴公司主体无精确匹配 company_name=%r employee_number=%s", comp_name, emp_no)
|
||
continue
|
||
if len(matches) > 1:
|
||
skipped += 1
|
||
msg = f"Multiple exact legal entity matches company_name={comp_name!r} count={len(matches)}"
|
||
errors.append(msg)
|
||
logger.error("跳过:滴滴公司主体精确匹配多条 company_name=%r count=%s employee_number=%s", comp_name, len(matches), emp_no)
|
||
continue
|
||
legal_entity_id = str(matches[0].get("legal_entity_id") or "").strip()
|
||
if not legal_entity_id:
|
||
skipped += 1
|
||
warn_count += 1
|
||
logger.warning("跳过:滴滴公司主体 legal_entity_id 为空 company_name=%r", comp_name)
|
||
continue
|
||
cache_legal_entity[comp_name] = legal_entity_id
|
||
logger.info("公司主体匹配成功:name=%s legal_entity_id=%s", comp_name, legal_entity_id)
|
||
except Exception as e: # noqa: BLE001
|
||
skipped += 1
|
||
msg = f"LegalEntity lookup failed company_name={comp_name!r} err={e!r}"
|
||
errors.append(msg)
|
||
warn_count += 1
|
||
logger.warning("跳过:查询滴滴公司主体失败 company_name=%r err=%r", comp_name, e)
|
||
continue
|
||
else:
|
||
logger.info("公司主体缓存命中:name=%s legal_entity_id=%s", comp_name, legal_entity_id)
|
||
|
||
# 员工查询
|
||
try:
|
||
logger.info("正在查询滴滴员工:employee_number=%s", emp_no)
|
||
member = didi.get_member_detail(company_id=company_id, employee_number=emp_no)
|
||
member_id = str(member.get("member_id") or member.get("id") or "").strip()
|
||
if not member_id:
|
||
skipped += 1
|
||
warn_count += 1
|
||
logger.warning("跳过:滴滴员工明细缺少 member_id/id employee_number=%s", emp_no)
|
||
continue
|
||
logger.info("员工查询成功:employee_number=%s member_id=%s", emp_no, member_id)
|
||
except Exception as e: # noqa: BLE001
|
||
skipped += 1
|
||
msg = f"Member.detail failed employee_number={emp_no!r} err={e!r}"
|
||
errors.append(msg)
|
||
warn_count += 1
|
||
logger.warning("跳过:查询滴滴员工失败 employee_number=%r err=%r", emp_no, e)
|
||
continue
|
||
|
||
# 员工更新(按文档要求:连续修改间隔 >=150ms)
|
||
try:
|
||
logger.info("正在更新员工公司主体:member_id=%s legal_entity_id=%s", member_id, legal_entity_id)
|
||
didi.edit_member_legal_entity(company_id=company_id, member_id=member_id, employee_number=None, legal_entity_id=legal_entity_id)
|
||
updated += 1
|
||
time.sleep(0.15)
|
||
logger.info("同步成功:employee_number=%s legal_entity_id=%s", emp_no, legal_entity_id)
|
||
except Exception as e: # noqa: BLE001
|
||
skipped += 1
|
||
msg = f"Member.edit failed employee_number={emp_no!r} member_id={member_id!r} err={e!r}"
|
||
errors.append(msg)
|
||
warn_count += 1
|
||
logger.warning("同步失败:employee_number=%r member_id=%r err=%r", emp_no, member_id, e)
|
||
continue
|
||
|
||
logger.info(
|
||
"同步完成:总行数=%s 成功=%s 跳过=%s warnings=%s errors=%s",
|
||
total_rows,
|
||
updated,
|
||
skipped,
|
||
warn_count,
|
||
len(errors),
|
||
)
|
||
return {
|
||
"total_rows": total_rows,
|
||
"updated_count": updated,
|
||
"skipped_count": skipped,
|
||
"errors": errors[:50],
|
||
}
|
||
finally:
|
||
didi.close()
|