Vastai-ConnectHub/extensions/sync_oa_to_didi/job.py

382 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import logging
import time
from typing import Any
from app.integrations.didi import DidiClient
from app.integrations.seeyon import SeeyonClient
from app.jobs.base import BaseJob
logger = logging.getLogger("connecthub.extensions.sync_oa_to_didi")
def _mask_token(token: str) -> str:
token = token or ""
if len(token) <= 12:
return "***"
return f"{token[:6]}***{token[-4:]}"
def _log_text_in_chunks(*, prefix: str, text: str, chunk_bytes: int = 8_000) -> None:
"""
将大文本尽可能写入 run_log
- 按 UTF-8 字节切分避免单条日志过大导致整条无法写入capture_logs 会在超过 max_bytes 时丢弃整条并标记截断)
- 由上层 capture_logs(max_bytes=200_000) 负责总量截断
"""
try:
if not text:
logger.info("%s <empty>", prefix)
return
if chunk_bytes <= 0:
chunk_bytes = 8_000
raw_bytes = text.encode("utf-8", errors="replace")
total = (len(raw_bytes) + chunk_bytes - 1) // chunk_bytes
for i in range(total):
b = raw_bytes[i * chunk_bytes : (i + 1) * chunk_bytes]
chunk = b.decode("utf-8", errors="replace")
logger.info("%s chunk %s/%s: %s", prefix, i + 1, total, chunk)
except Exception:
# run_log 捕获属于“尽力而为”,任何异常都不应影响任务执行
return
class SyncOAToDidiTokenJob(BaseJob):
"""
示例 Job演示致远 OA 的 token 获取与日志记录
public_cfg:
- base_url: "https://oa.example.com"
secret_cfg (解密后):
- rest_user
- rest_password
- loginName (可选)
"""
job_id = "sync_oa_to_didi.token_demo"
def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
base_url = str(params.get("base_url") or "").strip()
if not base_url:
raise ValueError("public_cfg.base_url is required")
rest_user = str(secrets.get("rest_user") or "").strip()
rest_password = str(secrets.get("rest_password") or "").strip()
login_name = secrets.get("loginName")
login_name = str(login_name).strip() if login_name else None
if not rest_user or not rest_password:
raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
client = SeeyonClient(base_url=base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
try:
token = client.authenticate()
finally:
client.close()
masked = _mask_token(token)
logger.info("Seeyon token acquired (masked) token=%s loginName=%s base_url=%s", masked, login_name, base_url)
return {"token_masked": masked, "loginName": login_name or "", "base_url": base_url}
class SyncOAToDidiExportFormJob(BaseJob):
"""
无流程表单导出CAP4
- 调用POST /seeyon/rest/cap4/form/soap/export
- base_url 不包含 /seeyon/rest例如 https://oa.example.com:8090
public_cfg:
- base_url: "https://oa.example.com:8090"
- templateCode: "employee"
- senderLoginName: "xxx" (可选)
- rightId: "xxx" (可选)
- doTrigger: "true" (可选)
- param: "0" (可选)
- extra: {...} (可选,兜底扩展字段)
secret_cfg (解密后):
- rest_user
- rest_password
- loginName
"""
job_id = "sync_oa_to_didi.export_form_soap"
def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
base_url = str(params.get("base_url") or "").strip()
if not base_url:
raise ValueError("public_cfg.base_url is required")
template_code = str(params.get("templateCode") or "").strip()
if not template_code:
raise ValueError("public_cfg.templateCode is required")
sender_login_name = params.get("senderLoginName")
sender_login_name = str(sender_login_name).strip() if sender_login_name else None
right_id = params.get("rightId")
right_id = str(right_id).strip() if right_id else None
do_trigger = params.get("doTrigger")
param = params.get("param")
param = str(param) if param is not None else None
extra = params.get("extra")
if extra is not None and not isinstance(extra, dict):
raise ValueError("public_cfg.extra must be a JSON object (dict) if provided")
rest_user = str(secrets.get("rest_user") or "").strip()
rest_password = str(secrets.get("rest_password") or "").strip()
login_name = secrets.get("loginName")
login_name = str(login_name).strip() if login_name else None
if not rest_user or not rest_password:
raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
client = SeeyonClient(base_url=base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
try:
resp = client.export_cap4_form_soap(
templateCode=template_code,
senderLoginName=sender_login_name,
rightId=right_id,
doTrigger=do_trigger,
param=param,
extra=extra,
)
raw_text = resp.text or ""
content_type = resp.headers.get("content-type", "") if getattr(resp, "headers", None) else ""
finally:
client.close()
# 避免把 raw_text 打到日志或 run_log会被截断且污染 JobLog
logger.info(
"Seeyon export_form_soap done templateCode=%s content_length=%s content_type=%s base_url=%s",
template_code,
len(raw_text),
content_type,
base_url,
)
_log_text_in_chunks(prefix="Seeyon export_form_soap raw", text=raw_text, chunk_bytes=8_000)
return {
"raw": raw_text,
"meta": {
"templateCode": template_code,
"content_length": len(raw_text),
"content_type": content_type,
},
}
class SyncOAToDidiLegalEntitySyncJob(BaseJob):
"""
从 OA 无流程表单导出中读取“工号/所属公司”,并同步到滴滴:
- 公司主体GET /river/LegalEntity/getkeyword=namename 完全相等优先
- 员工查询GET /river/Member/detailemployee_number=工号)
- 员工更新POST /river/Member/edit更新 legal_entity_id
public_cfg:
- oa_base_url: "https://oa.example.com:8090"
- oa_templateCode: "employee"
- didi_base_url: "https://api.es.xiaojukeji.com"
- senderLoginName/rightId/doTrigger/param/extra: 可选(透传到 OA 导出)
secret_cfg (解密后):
- rest_user/rest_password/loginName: OA 登录
- company_id/client_id/client_secret/sign_key: 滴滴凭证
- phone: 可选(此 Job 不使用)
"""
job_id = "sync_oa_to_didi.sync_legal_entity"
def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]:
oa_base_url = str(params.get("oa_base_url") or "").strip()
if not oa_base_url:
raise ValueError("public_cfg.oa_base_url is required")
oa_template_code = str(params.get("oa_templateCode") or "").strip()
if not oa_template_code:
raise ValueError("public_cfg.oa_templateCode is required")
didi_base_url = str(params.get("didi_base_url") or "").strip()
if not didi_base_url:
raise ValueError("public_cfg.didi_base_url is required")
rest_user = str(secrets.get("rest_user") or "").strip()
rest_password = str(secrets.get("rest_password") or "").strip()
login_name = secrets.get("loginName")
login_name = str(login_name).strip() if login_name else None
if not rest_user or not rest_password:
raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required")
company_id = str(secrets.get("company_id") or "").strip()
client_id = str(secrets.get("client_id") or "").strip()
client_secret = str(secrets.get("client_secret") or "").strip()
sign_key = str(secrets.get("sign_key") or "").strip()
if not company_id or not client_id or not client_secret or not sign_key:
raise ValueError("secret_cfg.company_id/client_id/client_secret/sign_key are required")
sender_login_name = params.get("senderLoginName")
sender_login_name = str(sender_login_name).strip() if sender_login_name else None
right_id = params.get("rightId")
right_id = str(right_id).strip() if right_id else None
do_trigger = params.get("doTrigger")
param = params.get("param")
param = str(param) if param is not None else None
extra = params.get("extra")
if extra is not None and not isinstance(extra, dict):
raise ValueError("public_cfg.extra must be a JSON object (dict) if provided")
seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name)
try:
resp = seeyon.export_cap4_form_soap(
templateCode=oa_template_code,
senderLoginName=sender_login_name,
rightId=right_id,
doTrigger=do_trigger,
param=param,
extra=extra,
)
raw = resp.text or ""
finally:
seeyon.close()
payload = json.loads(raw) if raw else {}
outer = payload.get("data") or {}
inner = outer.get("data") or {}
inner2 = inner.get("data") or {}
definition = inner2.get("definition") or {}
fields = definition.get("fields") or []
if not isinstance(fields, list):
raise RuntimeError("OA export invalid: definition.fields is not a list")
emp_field: str | None = None
company_field: str | None = None
for f in fields:
if not isinstance(f, dict):
continue
display = str(f.get("display") or "")
name = str(f.get("name") or "")
if display == "工号" and name:
emp_field = name
if display == "所属公司" and name:
company_field = name
if not emp_field or not company_field:
raise RuntimeError("OA export invalid: cannot locate fields for 工号/所属公司 in definition.fields")
rows = inner2.get("data") or []
if not isinstance(rows, list):
raise RuntimeError("OA export invalid: data is not a list")
didi = DidiClient(
base_url=didi_base_url,
client_id=client_id,
client_secret=client_secret,
sign_key=sign_key,
)
try:
cache_legal_entity: dict[str, str] = {}
total_rows = 0
updated = 0
skipped = 0
errors: list[str] = []
for row in rows:
total_rows += 1
if not isinstance(row, dict):
skipped += 1
logger.warning("OA row is not a dict, skipped")
continue
master = row.get("masterData") or {}
if not isinstance(master, dict):
skipped += 1
logger.warning("OA row masterData is not a dict, skipped")
continue
emp_obj = master.get(emp_field) or {}
comp_obj = master.get(company_field) or {}
emp_no = ""
comp_name = ""
if isinstance(emp_obj, dict):
emp_no = str(emp_obj.get("value") or emp_obj.get("showValue") or "").strip()
if isinstance(comp_obj, dict):
comp_name = str(comp_obj.get("value") or comp_obj.get("showValue") or "").strip()
if not emp_no or not comp_name:
skipped += 1
logger.warning("Missing employee_number/company_name, skipped employee_number=%r company_name=%r", emp_no, comp_name)
continue
# 公司主体匹配(进程内缓存)
legal_entity_id = cache_legal_entity.get(comp_name)
if not legal_entity_id:
try:
data = didi.get_legal_entities(company_id=company_id, offset=0, length=100, keyword=comp_name)
records = data.get("records") or []
if not isinstance(records, list):
raise RuntimeError("LegalEntity.get invalid: records not a list")
matches = [r for r in records if isinstance(r, dict) and str(r.get("name") or "") == comp_name]
if len(matches) == 0:
skipped += 1
logger.warning("No exact legal entity match, skipped company_name=%r employee_number=%s", comp_name, emp_no)
continue
if len(matches) > 1:
skipped += 1
msg = f"Multiple exact legal entity matches company_name={comp_name!r} count={len(matches)}"
errors.append(msg)
logger.error(msg)
continue
legal_entity_id = str(matches[0].get("legal_entity_id") or "").strip()
if not legal_entity_id:
skipped += 1
logger.warning("Exact match legal_entity_id empty, skipped company_name=%r", comp_name)
continue
cache_legal_entity[comp_name] = legal_entity_id
except Exception as e: # noqa: BLE001
skipped += 1
msg = f"LegalEntity lookup failed company_name={comp_name!r} err={e!r}"
errors.append(msg)
logger.warning(msg)
continue
# 员工查询
try:
member = didi.get_member_detail(company_id=company_id, employee_number=emp_no)
member_id = str(member.get("member_id") or member.get("id") or "").strip()
if not member_id:
skipped += 1
logger.warning("Member detail missing member_id/id, skipped employee_number=%s", emp_no)
continue
except Exception as e: # noqa: BLE001
skipped += 1
msg = f"Member.detail failed employee_number={emp_no!r} err={e!r}"
errors.append(msg)
logger.warning(msg)
continue
# 员工更新(按文档要求:连续修改间隔 >=150ms
try:
didi.edit_member_legal_entity(company_id=company_id, member_id=member_id, employee_number=None, legal_entity_id=legal_entity_id)
updated += 1
time.sleep(0.15)
except Exception as e: # noqa: BLE001
skipped += 1
msg = f"Member.edit failed employee_number={emp_no!r} member_id={member_id!r} err={e!r}"
errors.append(msg)
logger.warning(msg)
continue
return {
"total_rows": total_rows,
"updated_count": updated,
"skipped_count": skipped,
"errors": errors[:50],
}
finally:
didi.close()