from __future__ import annotations

import logging
import os
from contextlib import contextmanager
from typing import Callable, Iterator

# Shared layout for both the in-memory buffer and the optional log file, so
# the two outputs always look the same.
_LOG_FORMAT = "%(asctime)s %(levelname)s %(name)s %(message)s"
_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Written once, as the final buffer entry, when the byte budget is exhausted.
_TRUNCATION_MARKER = "[TRUNCATED] run_log exceeded max_bytes\n"


class JobLogIdFilter(logging.Filter):
    """Let through only the records that belong to one job log id.

    The id is read from the record attribute ``connecthub_job_log_id``. That
    attribute is expected to be injected by the LogRecordFactory installed by
    ``setup_logging()`` (not shown here); records without it are rejected.
    """

    def __init__(self, *, job_log_id: int) -> None:
        super().__init__()
        self.job_log_id = int(job_log_id)

    def filter(self, record: logging.LogRecord) -> bool:  # noqa: A003
        """Return True when ``record`` carries this filter's job log id."""
        try:
            return getattr(record, "connecthub_job_log_id", None) == self.job_log_id
        except Exception:
            # A misbehaving record attribute must never break logging.
            return False


class SafeBufferingHandler(logging.Handler):
    """Best-effort in-memory capture of formatted log lines.

    * ``emit`` swallows every exception, so capturing can never affect the
      task that is being logged.
    * The buffer is capped at ``max_bytes`` of UTF-8 encoded text. The first
      line that would exceed the cap is dropped, a truncation marker is
      appended instead, and nothing further is recorded.
    """

    def __init__(self, *, max_bytes: int = 200_000, level: int = logging.INFO) -> None:
        super().__init__(level=level)
        self.max_bytes = max_bytes
        self._buf: list[str] = []
        self._size_bytes = 0
        self._truncated = False

    def emit(self, record: logging.LogRecord) -> None:  # noqa: D401
        """Format ``record`` and append it to the buffer if it still fits."""
        try:
            if self._truncated:
                return
            line = self.format(record) + "\n"
            # The budget is counted in encoded bytes, not characters.
            encoded_len = len(line.encode("utf-8", errors="replace"))
            if self._size_bytes + encoded_len > self.max_bytes:
                self._buf.append(_TRUNCATION_MARKER)
                self._truncated = True
                return
            self._buf.append(line)
            self._size_bytes += encoded_len
        except Exception:
            # Nothing raised while formatting or buffering may bubble up.
            return

    def get_text(self) -> str:
        """Return everything captured so far as one string ("" on failure)."""
        try:
            return "".join(self._buf)
        except Exception:
            return ""


def _build_formatter() -> logging.Formatter:
    """Create the formatter shared by the buffer and the file handler."""
    return logging.Formatter(fmt=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)


def _close_quietly(handler: logging.Handler) -> None:
    """Close ``handler``, ignoring any error raised while doing so."""
    try:
        handler.close()
    except Exception:
        pass


def _detach_quietly(logger: logging.Logger, handler: logging.Handler) -> None:
    """Remove ``handler`` from ``logger``, ignoring any error."""
    try:
        logger.removeHandler(handler)
    except Exception:
        pass


def _open_file_handler(
    file_path: str,
    flt: JobLogIdFilter | None,
) -> logging.Handler | None:
    """Open a configured UTF-8 file handler, or return None if that fails.

    Missing parent directories are created. A handler that was opened but
    could not be configured is closed again so its file handle is not leaked.
    """
    try:
        parent = os.path.dirname(file_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        file_handler = logging.FileHandler(file_path, encoding="utf-8")
    except Exception:
        return None
    try:
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(_build_formatter())
        if flt is not None:
            file_handler.addFilter(flt)
    except Exception:
        _close_quietly(file_handler)
        return None
    return file_handler


@contextmanager
def capture_logs(
    *,
    max_bytes: int = 200_000,
    job_log_id: int | None = None,
    file_path: str | None = None,
) -> Iterator[Callable[[], str]]:
    """Capture log text that reaches the root logger while the block runs.

    Capturing is strictly best effort: no failure in here is allowed to affect
    the code running inside the ``with`` block.

    Args:
        max_bytes: Byte budget for the in-memory buffer.
        job_log_id: When given, only records whose ``connecthub_job_log_id``
            attribute equals this id are captured (buffer and file alike). If
            the filter cannot be built, capture falls back to unfiltered.
        file_path: When given, captured records are also appended to this
            file; parent directories are created as needed. If the file cannot
            be opened, only the in-memory buffer is used.

    Yields:
        A zero-argument callable returning the text captured so far. It keeps
        working after the ``with`` block exits. If the buffer handler cannot
        be attached to the root logger, the callable always returns "".
    """
    root = logging.getLogger()
    handler = SafeBufferingHandler(max_bytes=max_bytes, level=logging.INFO)
    handler.setFormatter(_build_formatter())

    flt: JobLogIdFilter | None = None
    if job_log_id is not None:
        try:
            flt = JobLogIdFilter(job_log_id=int(job_log_id))
            handler.addFilter(flt)
        except Exception:
            flt = None

    file_handler: logging.Handler | None = None
    if file_path:
        file_handler = _open_file_handler(file_path, flt)

    try:
        root.addHandler(handler)
    except Exception:
        # Degraded mode: nothing can be captured. Release the already opened
        # log file before handing control to the caller.
        if file_handler is not None:
            _close_quietly(file_handler)
        yield lambda: ""
        return

    if file_handler is not None:
        try:
            root.addHandler(file_handler)
        except Exception:
            _close_quietly(file_handler)
            file_handler = None

    try:
        yield handler.get_text
    finally:
        _detach_quietly(root, handler)
        if file_handler is not None:
            _detach_quietly(root, file_handler)
            _close_quietly(file_handler)