from __future__ import annotations

import logging
from contextlib import contextmanager
from typing import Callable, Iterator

# Appended exactly once, when a record would push the buffer past its budget.
_TRUNCATION_MARKER = "[TRUNCATED] run_log exceeded max_bytes\n"

# Layout of every captured line.
_CAPTURE_FORMAT = "%(asctime)s %(levelname)s %(name)s %(message)s"
_CAPTURE_DATEFMT = "%Y-%m-%d %H:%M:%S"


class SafeBufferingHandler(logging.Handler):
    """Best-effort, in-memory capture of run logs.

    * ``emit`` swallows every ``Exception`` so that log capture can never
      interfere with the task being executed.
    * The buffer has a byte budget (UTF-8 encoded size). The first record
      that would exceed it is dropped, a truncation marker is appended, and
      all later records are ignored. The marker itself is not counted
      against ``max_bytes``.
    """

    def __init__(self, *, max_bytes: int = 200_000, level: int = logging.INFO) -> None:
        """Create an empty buffer.

        Args:
            max_bytes: Maximum total UTF-8 size of the buffered log lines.
            level: Minimum level of records this handler accepts.
        """
        super().__init__(level=level)
        self.max_bytes = max_bytes
        self._buf: list[str] = []
        self._size_bytes = 0
        self._truncated = False

    def emit(self, record: logging.LogRecord) -> None:
        """Format *record* and append it to the buffer; never raises ``Exception``."""
        try:
            if self._truncated:
                return

            line = self.format(record) + "\n"
            # errors="replace" keeps lone surrogates from aborting the size check.
            size = len(line.encode("utf-8", errors="replace"))

            if self._size_bytes + size > self.max_bytes:
                self._buf.append(_TRUNCATION_MARKER)
                self._truncated = True
                return

            self._buf.append(line)
            self._size_bytes += size
        except Exception:
            # Deliberate best-effort: a formatting/encoding failure drops the
            # record instead of propagating into the code that logged it.
            return

    def get_text(self) -> str:
        """Return everything captured so far as one string ("" on failure)."""
        try:
            return "".join(self._buf)
        except Exception:
            return ""


@contextmanager
def capture_logs(*, max_bytes: int = 200_000) -> Iterator[Callable[[], str]]:
    """Capture log text that reaches the root logger while the block runs.

    Yields a zero-argument callable returning the text captured so far; it
    remains usable after the ``with`` block exits. Only records at INFO or
    above that pass the root logger's own level are seen. Failures to
    attach, detach or close the handler are swallowed so that capture never
    affects the business code.

    Args:
        max_bytes: Byte budget forwarded to ``SafeBufferingHandler``.
    """
    root = logging.getLogger()
    handler = SafeBufferingHandler(max_bytes=max_bytes, level=logging.INFO)
    handler.setFormatter(
        logging.Formatter(fmt=_CAPTURE_FORMAT, datefmt=_CAPTURE_DATEFMT)
    )

    try:
        root.addHandler(handler)
    except Exception:
        attached = False
    else:
        attached = True

    if not attached:
        # Degrade to an always-empty capture. Yielding here, outside the
        # ``except`` block, keeps the caller's code from running while the
        # attach failure is still being handled.
        yield lambda: ""
        return

    try:
        yield handler.get_text
    finally:
        try:
            root.removeHandler(handler)
        except Exception:
            pass
        try:
            # Releases the handler per the logging API; the buffer is kept,
            # so the yielded getter still works afterwards.
            handler.close()
        except Exception:
            pass