Vastai-ConnectHub/extensions/sync_oa_to_didi/job.py

415 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import logging
import time
from typing import Any
from app.integrations.didi import DidiClient
from app.integrations.seeyon import SeeyonClient
from app.jobs.base import BaseJob
# Module-level logger shared by every helper and job defined in this file.
logger = logging.getLogger("connecthub.extensions.sync_oa_to_didi")
def _mask_token(token: str) -> str:
    """Return a log-safe rendering of ``token``.

    A falsy token, or one of 12 characters or fewer, is fully hidden as
    ``"***"``. Longer tokens keep their first 6 and last 4 characters with
    ``"***"`` in between.
    """
    value = token if token else ""
    if len(value) > 12:
        head, tail = value[:6], value[-4:]
        return head + "***" + tail
    return "***"
def _log_text_in_chunks(*, prefix: str, text: str, chunk_bytes: int = 8_000) -> None:
    """
    Write a large text to the log as a series of bounded-size records (best effort).

    The text is split by UTF-8 byte size so that no single record grows too large.
    Per the original author's note, the upstream ``capture_logs`` drops a whole record
    that exceeds its ``max_bytes`` and is responsible for truncating the total volume.

    Chunks are always cut on character boundaries: a multi-byte character is never
    split across two records, so the concatenation of all chunk payloads equals the
    input text and no U+FFFD replacement characters are introduced at the seams.

    Args:
        prefix: Text placed at the start of every log record.
        text: The text to log. A falsy value logs a single ``"<empty>"`` record.
        chunk_bytes: Maximum UTF-8 size of one chunk payload. Values <= 0 fall back
            to 8000. If it is smaller than a single encoded character, that character
            is still emitted whole so the loop always makes progress.
    """
    try:
        if not text:
            logger.info("%s <empty>", prefix)
            return
        if chunk_bytes <= 0:
            chunk_bytes = 8_000
        raw_bytes = text.encode("utf-8", errors="replace")
        size = len(raw_bytes)
        chunks: list[str] = []
        start = 0
        while start < size:
            end = min(start + chunk_bytes, size)
            # Back off while the byte at the cut point is a UTF-8 continuation byte
            # (0b10xxxxxx): cutting there would split a character in two.
            while start < end < size and (raw_bytes[end] & 0xC0) == 0x80:
                end -= 1
            if end == start:
                # chunk_bytes is smaller than this character: take the whole character.
                end = start + 1
                while end < size and (raw_bytes[end] & 0xC0) == 0x80:
                    end += 1
            chunks.append(raw_bytes[start:end].decode("utf-8", errors="replace"))
            start = end
        total = len(chunks)
        for index, chunk in enumerate(chunks, start=1):
            logger.info("%s chunk %s/%s: %s", prefix, index, total, chunk)
    except Exception:
        # Log capture is best effort; no failure here may affect the running job.
        return
class SyncOAToDidiTokenJob(BaseJob):
    """
    Demo job: obtain a Seeyon OA token and log it in masked form.

    public_cfg:
    - base_url: "https://oa.example.com"
    secret_cfg (decrypted):
    - rest_user
    - rest_password
    - loginName (optional)
    """

    job_id = "sync_oa_to_didi.token_demo"

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        """Authenticate against Seeyon and return the masked token.

        Raises:
            ValueError: if ``base_url`` is missing from ``params``, or if
                ``rest_user`` / ``rest_password`` is missing from ``secrets``.
        """
        oa_url = str(params.get("base_url") or "").strip()
        if oa_url == "":
            raise ValueError("public_cfg.base_url is required")

        user = str(secrets.get("rest_user") or "").strip()
        password = str(secrets.get("rest_password") or "").strip()

        raw_login = secrets.get("loginName")
        if raw_login:
            login = str(raw_login).strip()
        else:
            login = None

        if not (user and password):
            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")

        oa_client = SeeyonClient(base_url=oa_url, rest_user=user, rest_password=password, loginName=login)
        try:
            token = oa_client.authenticate()
        finally:
            # Always release the client, even when authentication raises.
            oa_client.close()

        masked_token = _mask_token(token)
        logger.info(
            "Seeyon token acquired (masked) token=%s loginName=%s base_url=%s",
            masked_token,
            login,
            oa_url,
        )
        return {
            "token_masked": masked_token,
            "loginName": login or "",
            "base_url": oa_url,
        }
class SyncOAToDidiExportFormJob(BaseJob):
    """
    Export a flow-less (CAP4) form from Seeyon OA.

    - Endpoint named by the original author: POST /seeyon/rest/cap4/form/soap/export
    - base_url must not include /seeyon/rest, e.g. https://oa.example.com:8090

    public_cfg:
    - base_url: "https://oa.example.com:8090"
    - templateCode: "employee"
    - senderLoginName: "xxx" (optional)
    - rightId: "xxx" (optional)
    - doTrigger: "true" (optional)
    - param: "0" (optional)
    - extra: {...} (optional, catch-all for additional fields)
    secret_cfg (decrypted):
    - rest_user
    - rest_password
    - loginName
    """

    job_id = "sync_oa_to_didi.export_form_soap"

    @staticmethod
    def _optional_text(value: Any) -> str | None:
        """Return ``str(value).strip()`` for a truthy value, otherwise ``None``."""
        if value:
            return str(value).strip()
        return None

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        """Run the export and return the raw response text plus a small metadata dict.

        Raises:
            ValueError: if ``base_url`` or ``templateCode`` is missing, if ``extra``
                is provided but is not a dict, or if ``rest_user`` / ``rest_password``
                is missing.
        """
        oa_url = str(params.get("base_url") or "").strip()
        if oa_url == "":
            raise ValueError("public_cfg.base_url is required")

        template = str(params.get("templateCode") or "").strip()
        if template == "":
            raise ValueError("public_cfg.templateCode is required")

        sender = self._optional_text(params.get("senderLoginName"))
        right = self._optional_text(params.get("rightId"))
        # doTrigger is forwarded exactly as configured, without conversion.
        trigger = params.get("doTrigger")

        raw_param = params.get("param")
        export_param = None if raw_param is None else str(raw_param)

        extra_fields = params.get("extra")
        if not (extra_fields is None or isinstance(extra_fields, dict)):
            raise ValueError("public_cfg.extra must be a JSON object (dict) if provided")

        user = str(secrets.get("rest_user") or "").strip()
        password = str(secrets.get("rest_password") or "").strip()
        login = self._optional_text(secrets.get("loginName"))
        if not (user and password):
            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")

        oa_client = SeeyonClient(base_url=oa_url, rest_user=user, rest_password=password, loginName=login)
        try:
            response = oa_client.export_cap4_form_soap(
                templateCode=template,
                senderLoginName=sender,
                rightId=right,
                doTrigger=trigger,
                param=export_param,
                extra=extra_fields,
            )
            body = response.text or ""
            headers = getattr(response, "headers", None)
            if headers:
                content_type = headers.get("content-type", "")
            else:
                content_type = ""
        finally:
            # Always release the client, even when the export raises.
            oa_client.close()

        body_length = len(body)
        # The summary record carries only metadata; the raw body is written
        # separately, in bounded chunks, by _log_text_in_chunks below.
        logger.info(
            "Seeyon export_form_soap done templateCode=%s content_length=%s content_type=%s base_url=%s",
            template,
            body_length,
            content_type,
            oa_url,
        )
        _log_text_in_chunks(prefix="Seeyon export_form_soap raw", text=body, chunk_bytes=8_000)

        meta = {
            "templateCode": template,
            "content_length": body_length,
            "content_type": content_type,
        }
        return {"raw": body, "meta": meta}
class SyncOAToDidiLegalEntitySyncJob(BaseJob):
    """
    Read "employee number / company" pairs from an OA flow-less form export and sync them to Didi.

    Per row (endpoints as named by the original author):
    - legal entity lookup: GET /river/LegalEntity/get (keyword=name); only a single
      exact ``name`` match is accepted
    - member lookup: GET /river/Member/detail (employee_number=<employee number>)
    - member update: POST /river/Member/edit (updates legal_entity_id)

    public_cfg:
    - oa_base_url: "https://oa.example.com:8090"
    - oa_templateCode: "employee"
    - didi_base_url: "https://api.es.xiaojukeji.com"
    - senderLoginName/rightId/doTrigger/param/extra: optional (passed through to the OA export)
    secret_cfg (decrypted):
    - rest_user/rest_password/loginName: OA login
    - company_id/client_id/client_secret/sign_key: Didi credentials
    - phone: optional (not used by this job)
    """

    job_id = "sync_oa_to_didi.sync_legal_entity"

    # Minimum pause after every member edit attempt; the original code comment states
    # the Didi documentation requires >= 150 ms between consecutive modifications.
    _EDIT_INTERVAL_SECONDS = 0.15

    @staticmethod
    def _parse_export(raw: str) -> tuple[str, str, list[Any]]:
        """Parse the OA export body into (employee field, company field, rows).

        Every level of the expected ``{"data": {"data": {"definition": {"fields": [...]},
        "data": [...]}}}`` shape is type-checked before it is dereferenced, so a
        malformed response fails with a descriptive ``RuntimeError`` instead of an
        ``AttributeError``.

        Raises:
            RuntimeError: if the structure is not as expected or the two required
                fields cannot be located by their display names.
            json.JSONDecodeError: if ``raw`` is non-empty but not valid JSON.
        """
        payload = json.loads(raw) if raw else {}
        if not isinstance(payload, dict):
            raise RuntimeError("OA export invalid: response is not a JSON object (dict)")
        outer = payload.get("data") or {}
        if not isinstance(outer, dict):
            raise RuntimeError("OA export invalid: data is not an object (dict)")
        # On success outer["data"] is the form object: {"definition": {...}, "data": [...]}
        form = outer.get("data") or {}
        if not isinstance(form, dict):
            raise RuntimeError("OA export invalid: data.data is not an object (dict)")
        definition = form.get("definition") or {}
        if not isinstance(definition, dict):
            raise RuntimeError("OA export invalid: definition is not an object (dict)")
        fields = definition.get("fields") or []
        if not isinstance(fields, list):
            raise RuntimeError("OA export invalid: definition.fields is not a list")

        emp_field: str | None = None
        company_field: str | None = None
        for f in fields:
            if not isinstance(f, dict):
                continue
            display = str(f.get("display") or "")
            name = str(f.get("name") or "")
            # Fields are located by display label; if a label repeats, the last one wins.
            if display == "工号" and name:
                emp_field = name
            if display == "所属公司" and name:
                company_field = name
        if not emp_field or not company_field:
            raise RuntimeError("OA export invalid: cannot locate fields for 工号/所属公司 in definition.fields")

        rows = form.get("data") or []
        if not isinstance(rows, list):
            raise RuntimeError("OA export invalid: data is not a list")
        return emp_field, company_field, rows

    @staticmethod
    def _cell_text(master: dict[str, Any], field_name: str) -> str:
        """Return the stripped ``value`` (falling back to ``showValue``) of a masterData cell.

        Returns ``""`` when the cell is missing or is not a dict.
        """
        cell = master.get(field_name)
        if not isinstance(cell, dict):
            return ""
        return str(cell.get("value") or cell.get("showValue") or "").strip()

    def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
        """Export the OA form and update each employee's legal entity in Didi.

        Row-level failures are logged, counted and collected; they never abort the run.

        Returns:
            A dict with ``total_rows``, ``updated_count``, ``skipped_count`` and
            ``errors`` (at most the first 50 error messages).

        Raises:
            ValueError: if a required public_cfg / secret_cfg entry is missing or
                ``extra`` is not a dict.
            RuntimeError: if the OA export does not have the expected structure.
        """
        oa_base_url = str(params.get("oa_base_url") or "").strip()
        if not oa_base_url:
            raise ValueError("public_cfg.oa_base_url is required")
        oa_template_code = str(params.get("oa_templateCode") or "").strip()
        if not oa_template_code:
            raise ValueError("public_cfg.oa_templateCode is required")
        didi_base_url = str(params.get("didi_base_url") or "").strip()
        if not didi_base_url:
            raise ValueError("public_cfg.didi_base_url is required")

        rest_user = str(secrets.get("rest_user") or "").strip()
        rest_password = str(secrets.get("rest_password") or "").strip()
        login_name = secrets.get("loginName")
        login_name = str(login_name).strip() if login_name else None
        if not rest_user or not rest_password:
            raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")

        company_id = str(secrets.get("company_id") or "").strip()
        client_id = str(secrets.get("client_id") or "").strip()
        client_secret = str(secrets.get("client_secret") or "").strip()
        sign_key = str(secrets.get("sign_key") or "").strip()
        if not company_id or not client_id or not client_secret or not sign_key:
            raise ValueError("secret_cfg.company_id/client_id/client_secret/sign_key are required")

        sender_login_name = params.get("senderLoginName")
        sender_login_name = str(sender_login_name).strip() if sender_login_name else None
        right_id = params.get("rightId")
        right_id = str(right_id).strip() if right_id else None
        do_trigger = params.get("doTrigger")
        param = params.get("param")
        param = str(param) if param is not None else None
        extra = params.get("extra")
        if extra is not None and not isinstance(extra, dict):
            raise ValueError("public_cfg.extra must be a JSON object (dict) if provided")

        seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
        try:
            resp = seeyon.export_cap4_form_soap(
                templateCode=oa_template_code,
                senderLoginName=sender_login_name,
                rightId=right_id,
                doTrigger=do_trigger,
                param=param,
                extra=extra,
            )
            raw = resp.text or ""
        finally:
            seeyon.close()

        emp_field, company_field, rows = self._parse_export(raw)
        logger.info("开始同步OA->滴滴 模板=%s 总行数=%s", oa_template_code, len(rows))

        didi = DidiClient(
            base_url=didi_base_url,
            client_id=client_id,
            client_secret=client_secret,
            sign_key=sign_key,
        )
        try:
            # company name -> legal_entity_id; only successful lookups are cached.
            cache_legal_entity: dict[str, str] = {}
            total_rows = 0
            updated = 0
            skipped = 0
            errors: list[str] = []
            warn_count = 0

            for row in rows:
                total_rows += 1
                if not isinstance(row, dict):
                    skipped += 1
                    warn_count += 1
                    logger.warning("跳过OA 行数据不是对象(dict)")
                    continue
                master = row.get("masterData") or {}
                if not isinstance(master, dict):
                    skipped += 1
                    warn_count += 1
                    logger.warning("跳过OA 行 masterData 不是对象(dict)")
                    continue

                emp_no = self._cell_text(master, emp_field)
                comp_name = self._cell_text(master, company_field)
                if not emp_no or not comp_name:
                    skipped += 1
                    warn_count += 1
                    logger.warning("跳过:缺少工号或所属公司 employee_number=%r company_name=%r", emp_no, comp_name)
                    continue
                logger.info("正在处理:工号=%s 所属公司=%s", emp_no, comp_name)

                # Legal entity match (in-process cache).
                legal_entity_id = cache_legal_entity.get(comp_name)
                if not legal_entity_id:
                    try:
                        logger.info("正在查询公司主体name=%s", comp_name)
                        data = didi.get_legal_entities(company_id=company_id, offset=0, length=100, keyword=comp_name)
                        records = data.get("records") or []
                        if not isinstance(records, list):
                            raise RuntimeError("LegalEntity.get invalid: records not a list")
                        # The keyword search may return partial matches; keep exact names only.
                        matches = [r for r in records if isinstance(r, dict) and str(r.get("name") or "") == comp_name]
                        if len(matches) == 0:
                            skipped += 1
                            warn_count += 1
                            logger.warning("跳过:滴滴公司主体无精确匹配 company_name=%r employee_number=%s", comp_name, emp_no)
                            continue
                        if len(matches) > 1:
                            skipped += 1
                            msg = f"Multiple exact legal entity matches company_name={comp_name!r} count={len(matches)}"
                            errors.append(msg)
                            logger.error("跳过:滴滴公司主体精确匹配多条 company_name=%r count=%s employee_number=%s", comp_name, len(matches), emp_no)
                            continue
                        legal_entity_id = str(matches[0].get("legal_entity_id") or "").strip()
                        if not legal_entity_id:
                            skipped += 1
                            warn_count += 1
                            logger.warning("跳过:滴滴公司主体 legal_entity_id 为空 company_name=%r", comp_name)
                            continue
                        cache_legal_entity[comp_name] = legal_entity_id
                        logger.info("公司主体匹配成功name=%s legal_entity_id=%s", comp_name, legal_entity_id)
                    except Exception as e:  # noqa: BLE001
                        skipped += 1
                        msg = f"LegalEntity lookup failed company_name={comp_name!r} err={e!r}"
                        errors.append(msg)
                        warn_count += 1
                        logger.warning("跳过:查询滴滴公司主体失败 company_name=%r err=%r", comp_name, e)
                        continue
                else:
                    logger.info("公司主体缓存命中name=%s legal_entity_id=%s", comp_name, legal_entity_id)

                # Member lookup.
                try:
                    logger.info("正在查询滴滴员工employee_number=%s", emp_no)
                    member = didi.get_member_detail(company_id=company_id, employee_number=emp_no)
                    member_id = str(member.get("member_id") or member.get("id") or "").strip()
                    if not member_id:
                        skipped += 1
                        warn_count += 1
                        logger.warning("跳过:滴滴员工明细缺少 member_id/id employee_number=%s", emp_no)
                        continue
                    logger.info("员工查询成功employee_number=%s member_id=%s", emp_no, member_id)
                except Exception as e:  # noqa: BLE001
                    skipped += 1
                    msg = f"Member.detail failed employee_number={emp_no!r} err={e!r}"
                    errors.append(msg)
                    warn_count += 1
                    logger.warning("跳过:查询滴滴员工失败 employee_number=%r err=%r", emp_no, e)
                    continue

                # Member update.
                try:
                    logger.info("正在更新员工公司主体member_id=%s legal_entity_id=%s", member_id, legal_entity_id)
                    didi.edit_member_legal_entity(company_id=company_id, member_id=member_id, employee_number=None, legal_entity_id=legal_entity_id)
                    updated += 1
                    logger.info("同步成功employee_number=%s legal_entity_id=%s", emp_no, legal_entity_id)
                except Exception as e:  # noqa: BLE001
                    skipped += 1
                    msg = f"Member.edit failed employee_number={emp_no!r} member_id={member_id!r} err={e!r}"
                    errors.append(msg)
                    warn_count += 1
                    logger.warning("同步失败employee_number=%r member_id=%r err=%r", emp_no, member_id, e)
                finally:
                    # Throttle after every edit attempt, failed ones included: a failed
                    # request still counts as a modification call, so the next edit
                    # must not follow it immediately.
                    time.sleep(self._EDIT_INTERVAL_SECONDS)

            logger.info(
                "同步完成:总行数=%s 成功=%s 跳过=%s warnings=%s errors=%s",
                total_rows,
                updated,
                skipped,
                warn_count,
                len(errors),
            )
            return {
                "total_rows": total_rows,
                "updated_count": updated,
                "skipped_count": skipped,
                # Cap the returned list; the full count is in the summary log above.
                "errors": errors[:50],
            }
        finally:
            didi.close()