from __future__ import annotations import logging from typing import Any from app.integrations.seeyon import SeeyonClient from app.jobs.base import BaseJob logger = logging.getLogger("connecthub.extensions.sync_oa_to_didi") def _mask_token(token: str) -> str: token = token or "" if len(token) <= 12: return "***" return f"{token[:6]}***{token[-4:]}" def _log_text_in_chunks(*, prefix: str, text: str, chunk_bytes: int = 8_000) -> None: """ 将大文本尽可能写入 run_log: - 按 UTF-8 字节切分,避免单条日志过大导致整条无法写入(capture_logs 会在超过 max_bytes 时丢弃整条并标记截断) - 由上层 capture_logs(max_bytes=200_000) 负责总量截断 """ try: if not text: logger.info("%s ", prefix) return if chunk_bytes <= 0: chunk_bytes = 8_000 raw_bytes = text.encode("utf-8", errors="replace") total = (len(raw_bytes) + chunk_bytes - 1) // chunk_bytes for i in range(total): b = raw_bytes[i * chunk_bytes : (i + 1) * chunk_bytes] chunk = b.decode("utf-8", errors="replace") logger.info("%s chunk %s/%s: %s", prefix, i + 1, total, chunk) except Exception: # run_log 捕获属于“尽力而为”,任何异常都不应影响任务执行 return class SyncOAToDidiTokenJob(BaseJob): """ 示例 Job:演示致远 OA 的 token 获取与日志记录 public_cfg: - base_url: "https://oa.example.com" secret_cfg (解密后): - rest_user - rest_password - loginName (可选) """ job_id = "sync_oa_to_didi.token_demo" def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]: base_url = str(params.get("base_url") or "").strip() if not base_url: raise ValueError("public_cfg.base_url is required") rest_user = str(secrets.get("rest_user") or "").strip() rest_password = str(secrets.get("rest_password") or "").strip() login_name = secrets.get("loginName") login_name = str(login_name).strip() if login_name else None if not rest_user or not rest_password: raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required") client = SeeyonClient(base_url=base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name) try: token = client.authenticate() finally: client.close() masked = _mask_token(token) logger.info("Seeyon token acquired (masked) token=%s loginName=%s base_url=%s", masked, login_name, base_url) return {"token_masked": masked, "loginName": login_name or "", "base_url": base_url} class SyncOAToDidiExportFormJob(BaseJob): """ 无流程表单导出(CAP4): - 调用:POST /seeyon/rest/cap4/form/soap/export - base_url 不包含 /seeyon/rest(例如 https://oa.example.com:8090) public_cfg: - base_url: "https://oa.example.com:8090" - templateCode: "employee" - senderLoginName: "xxx" (可选) - rightId: "xxx" (可选) - doTrigger: "true" (可选) - param: "0" (可选) - extra: {...} (可选,兜底扩展字段) secret_cfg (解密后): - rest_user - rest_password - loginName (可选) """ job_id = "sync_oa_to_didi.export_form_soap" def run(self, params: dict[str, Any], secrets: dict[str, Any]) -> dict[str, Any]: base_url = str(params.get("base_url") or "").strip() if not base_url: raise ValueError("public_cfg.base_url is required") template_code = str(params.get("templateCode") or "").strip() if not template_code: raise ValueError("public_cfg.templateCode is required") sender_login_name = params.get("senderLoginName") sender_login_name = str(sender_login_name).strip() if sender_login_name else None right_id = params.get("rightId") right_id = str(right_id).strip() if right_id else None do_trigger = params.get("doTrigger") param = params.get("param") param = str(param) if param is not None else None extra = params.get("extra") if extra is not None and not isinstance(extra, dict): raise ValueError("public_cfg.extra must be a JSON object (dict) if provided") rest_user = str(secrets.get("rest_user") or "").strip() rest_password = str(secrets.get("rest_password") or "").strip() login_name = secrets.get("loginName") login_name = str(login_name).strip() if login_name else None if not rest_user or not rest_password: raise ValueError("secret_cfg.rest_user and secret_cfg.rest_password are required") client = SeeyonClient(base_url=base_url, rest_user=rest_user, rest_password=rest_password, loginName=login_name) try: resp = client.export_cap4_form_soap( templateCode=template_code, senderLoginName=sender_login_name, rightId=right_id, doTrigger=do_trigger, param=param, extra=extra, ) raw_text = resp.text or "" content_type = resp.headers.get("content-type", "") if getattr(resp, "headers", None) else "" finally: client.close() # 避免把 raw_text 打到日志或 run_log(会被截断且污染 JobLog) logger.info( "Seeyon export_form_soap done templateCode=%s content_length=%s content_type=%s base_url=%s", template_code, len(raw_text), content_type, base_url, ) _log_text_in_chunks(prefix="Seeyon export_form_soap raw", text=raw_text, chunk_bytes=8_000) return { "raw": raw_text, "meta": { "templateCode": template_code, "content_length": len(raw_text), "content_type": content_type, }, }