from __future__ import annotations import json import logging import time from typing import Any from app.integrations.didi import DidiClient from app.integrations.seeyon import SeeyonClient from app.jobs.base import BaseJob logger = logging.getLogger("connecthub.extensions.sync_oa_to_didi") def _mask_token(token: str) -> str: token = token or "" if len(token) <= 12: return "***" return f"{token[:6]}***{token[-4:]}" def _log_text_in_chunks(*, prefix: str, text: str, chunk_bytes: int = 8_000) -> None: """ 将大文本尽可能写入 run_log: - 按 UTF-8 字节切分,避免单条日志过大导致整条无法写入(capture_logs 会在超过 max_bytes 时丢弃整条并标记截断) - 由上层 capture_logs(max_bytes=200_000) 负责总量截断 """ try: if not text: logger.info("%s ", prefix) return if chunk_bytes <= 0: chunk_bytes = 8_000 raw_bytes = text.encode("utf-8", errors="replace") total = (len(raw_bytes) + chunk_bytes - 1) // chunk_bytes for i in range(total): b = raw_bytes[i * chunk_bytes : (i + 1) * chunk_bytes] chunk = b.decode("utf-8", errors="replace") logger.info("%s chunk %s/%s: %s", prefix, i + 1, total, chunk) except Exception: # run_log 捕获属于“尽力而为”,任何异常都不应影响任务执行 return class SyncOAToDidiTokenJob(BaseJob): """ 示例 Job:演示致远 OA 的 token 获取与日志记录 public_cfg: - base_url: "https://oa.example.com" secret_cfg (解密后): - rest_user - rest_password - loginName (可选) """ job_id = "sync_oa_to_didi.token_demo" def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]: base_url = str(params.get("base_url") or "").strip() if not base_url: raise ValueError("public_cfg.base_url is required") rest_user = str(secrets.get("rest_user") or "").strip() rest_password = str(secrets.get("rest_password") or "").strip() login_name = secrets.get("loginName") login_name = str(login_name).strip() if login_name else None if not rest_user or not rest_password: raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required") client = SeeyonClient(base_url=base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name) try: token = client.authenticate() finally: client.close() masked = _mask_token(token) logger.info("Seeyon token acquired (masked) token=%s loginName=%s base_url=%s", masked, login_name, base_url) return {"token_masked": masked, "loginName": login_name or "", "base_url": base_url} class SyncOAToDidiExportFormJob(BaseJob): """ 无流程表单导出(CAP4): - 调用:POST /seeyon/rest/cap4/form/soap/export - base_url 不包含 /seeyon/rest(例如 https://oa.example.com:8090) public_cfg: - base_url: "https://oa.example.com:8090" - templateCode: "employee" - senderLoginName: "xxx" (可选) - rightId: "xxx" (可选) - doTrigger: "true" (可选) - param: "0" (可选) - extra: {...} (可选,兜底扩展字段) secret_cfg (解密后): - rest_user - rest_password - loginName """ job_id = "sync_oa_to_didi.export_form_soap" def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]: base_url = str(params.get("base_url") or "").strip() if not base_url: raise ValueError("public_cfg.base_url is required") template_code = str(params.get("templateCode") or "").strip() if not template_code: raise ValueError("public_cfg.templateCode is required") sender_login_name = params.get("senderLoginName") sender_login_name = str(sender_login_name).strip() if sender_login_name else None right_id = params.get("rightId") right_id = str(right_id).strip() if right_id else None do_trigger = params.get("doTrigger") param = params.get("param") param = str(param) if param is not None else None extra = params.get("extra") if extra is not None and not isinstance(extra, dict): raise ValueError("public_cfg.extra must be a JSON object (dict) if provided") rest_user = str(secrets.get("rest_user") or "").strip() rest_password = str(secrets.get("rest_password") or "").strip() login_name = secrets.get("loginName") login_name = str(login_name).strip() if login_name else None if not rest_user or not rest_password: raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required") client = SeeyonClient(base_url=base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name) try: resp = client.export_cap4_form_soap( templateCode=template_code, senderLoginName=sender_login_name, rightId=right_id, doTrigger=do_trigger, param=param, extra=extra, ) raw_text = resp.text or "" content_type = resp.headers.get("content-type", "") if getattr(resp, "headers", None) else "" finally: client.close() # 避免把 raw_text 打到日志或 run_log(会被截断且污染 JobLog) logger.info( "Seeyon export_form_soap done templateCode=%s content_length=%s content_type=%s base_url=%s", template_code, len(raw_text), content_type, base_url, ) _log_text_in_chunks(prefix="Seeyon export_form_soap raw", text=raw_text, chunk_bytes=8_000) return { "raw": raw_text, "meta": { "templateCode": template_code, "content_length": len(raw_text), "content_type": content_type, }, } class SyncOAToDidiLegalEntitySyncJob(BaseJob): """ 从 OA 无流程表单导出中读取“工号/所属公司”,并同步到滴滴: - 公司主体:GET /river/LegalEntity/get(keyword=name),name 完全相等优先 - 员工查询:GET /river/Member/detail(employee_number=工号) - 员工更新:POST /river/Member/edit(更新 legal_entity_id) public_cfg: - oa_base_url: "https://oa.example.com:8090" - oa_templateCode: "employee" - didi_base_url: "https://api.es.xiaojukeji.com" - senderLoginName/rightId/doTrigger/param/extra: 可选(透传到 OA 导出) secret_cfg (解密后): - rest_user/rest_password/loginName: OA 登录 - company_id/client_id/client_secret/sign_key: 滴滴凭证 - phone: 可选(此 Job 不使用) """ job_id = "sync_oa_to_didi.sync_legal_entity" def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]: oa_base_url = str(params.get("oa_base_url") or "").strip() if not oa_base_url: raise ValueError("public_cfg.oa_base_url is required") oa_template_code = str(params.get("oa_templateCode") or "").strip() if not oa_template_code: raise ValueError("public_cfg.oa_templateCode is required") didi_base_url = str(params.get("didi_base_url") or "").strip() if not didi_base_url: raise ValueError("public_cfg.didi_base_url is required") rest_user = str(secrets.get("rest_user") or "").strip() rest_password = str(secrets.get("rest_password") or "").strip() login_name = secrets.get("loginName") login_name = str(login_name).strip() if login_name else None if not rest_user or not rest_password: raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required") company_id = str(secrets.get("company_id") or "").strip() client_id = str(secrets.get("client_id") or "").strip() client_secret = str(secrets.get("client_secret") or "").strip() sign_key = str(secrets.get("sign_key") or "").strip() if not company_id or not client_id or not client_secret or not sign_key: raise ValueError("secret_cfg.company_id/client_id/client_secret/sign_key are required") sender_login_name = params.get("senderLoginName") sender_login_name = str(sender_login_name).strip() if sender_login_name else None right_id = params.get("rightId") right_id = str(right_id).strip() if right_id else None do_trigger = params.get("doTrigger") param = params.get("param") param = str(param) if param is not None else None extra = params.get("extra") if extra is not None and not isinstance(extra, dict): raise ValueError("public_cfg.extra must be a JSON object (dict) if provided") seeyon = SeeyonClient(base_url=oa_base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name) try: resp = seeyon.export_cap4_form_soap( templateCode=oa_template_code, senderLoginName=sender_login_name, rightId=right_id, doTrigger=do_trigger, param=param, extra=extra, ) raw = resp.text or "" content_type = resp.headers.get("content-type", "") if getattr(resp, "headers", None) else "" logger.info("Seeyon export_form_soap done templateCode=%s content_length=%s content_type=%s base_url=%s", oa_template_code, len(raw), content_type, oa_base_url) finally: seeyon.close() payload = json.loads(raw) if raw else {} logger.info(payload) outer = payload.get("data") or {} inner = outer.get("data") or {} inner2 = inner.get("data") or {} definition = inner2.get("definition") or {} logger.info(definition) fields = definition.get("fields") or [] if not isinstance(fields, list): raise RuntimeError("OA export invalid: definition.fields is not a list") emp_field: str | None = None company_field: str | None = None for f in fields: if not isinstance(f, dict): continue display = str(f.get("display") or "") name = str(f.get("name") or "") if display == "工号" and name: emp_field = name if display == "所属公司" and name: company_field = name if not emp_field or not company_field: raise RuntimeError("OA export invalid: cannot locate fields for 工号/所属公司 in definition.fields") rows = inner2.get("data") or [] if not isinstance(rows, list): raise RuntimeError("OA export invalid: data is not a list") didi = DidiClient( base_url=didi_base_url, client_id=client_id, client_secret=client_secret, sign_key=sign_key, ) try: cache_legal_entity: dict[str, str] = {} total_rows = 0 updated = 0 skipped = 0 errors: list[str] = [] for row in rows: total_rows += 1 if not isinstance(row, dict): skipped += 1 logger.warning("OA row is not a dict, skipped") continue master = row.get("masterData") or {} if not isinstance(master, dict): skipped += 1 logger.warning("OA row masterData is not a dict, skipped") continue emp_obj = master.get(emp_field) or {} comp_obj = master.get(company_field) or {} emp_no = "" comp_name = "" if isinstance(emp_obj, dict): emp_no = str(emp_obj.get("value") or emp_obj.get("showValue") or "").strip() if isinstance(comp_obj, dict): comp_name = str(comp_obj.get("value") or comp_obj.get("showValue") or "").strip() if not emp_no or not comp_name: skipped += 1 logger.warning("Missing employee_number/company_name, skipped employee_number=%r company_name=%r", emp_no, comp_name) continue # 公司主体匹配(进程内缓存) legal_entity_id = cache_legal_entity.get(comp_name) if not legal_entity_id: try: data = didi.get_legal_entities(company_id=company_id, offset=0, length=100, keyword=comp_name) records = data.get("records") or [] if not isinstance(records, list): raise RuntimeError("LegalEntity.get invalid: records not a list") matches = [r for r in records if isinstance(r, dict) and str(r.get("name") or "") == comp_name] if len(matches) == 0: skipped += 1 logger.warning("No exact legal entity match, skipped company_name=%r employee_number=%s", comp_name, emp_no) continue if len(matches) > 1: skipped += 1 msg = f"Multiple exact legal entity matches company_name={comp_name!r} count={len(matches)}" errors.append(msg) logger.error(msg) continue legal_entity_id = str(matches[0].get("legal_entity_id") or "").strip() if not legal_entity_id: skipped += 1 logger.warning("Exact match legal_entity_id empty, skipped company_name=%r", comp_name) continue cache_legal_entity[comp_name] = legal_entity_id except Exception as e: # noqa: BLE001 skipped += 1 msg = f"LegalEntity lookup failed company_name={comp_name!r} err={e!r}" errors.append(msg) logger.warning(msg) continue # 员工查询 try: member = didi.get_member_detail(company_id=company_id, employee_number=emp_no) member_id = str(member.get("member_id") or member.get("id") or "").strip() if not member_id: skipped += 1 logger.warning("Member detail missing member_id/id, skipped employee_number=%s", emp_no) continue except Exception as e: # noqa: BLE001 skipped += 1 msg = f"Member.detail failed employee_number={emp_no!r} err={e!r}" errors.append(msg) logger.warning(msg) continue # 员工更新(按文档要求:连续修改间隔 >=150ms) try: didi.edit_member_legal_entity(company_id=company_id, member_id=member_id, employee_number=None, legal_entity_id=legal_entity_id) updated += 1 time.sleep(0.15) except Exception as e: # noqa: BLE001 skipped += 1 msg = f"Member.edit failed employee_number={emp_no!r} member_id={member_id!r} err={e!r}" errors.append(msg) logger.warning(msg) continue return { "total_rows": total_rows, "updated_count": updated, "skipped_count": skipped, "errors": errors[:50], } finally: didi.close()