150 lines
4.2 KiB
Python
150 lines
4.2 KiB
Python
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
from contextlib import contextmanager
|
||
from typing import Callable, Iterator
|
||
|
||
|
||
class JobLogIdFilter(logging.Filter):
    """Pass only the LogRecords that belong to one specific ``job_log_id``.

    The decision is based on the ``connecthub_job_log_id`` attribute of each
    record. According to the original note, that attribute is injected by the
    LogRecordFactory installed by ``setup_logging()`` (not visible in this
    file -- verify against that function).
    """

    def __init__(self, *, job_log_id: int) -> None:
        """Remember the job log id (coerced with ``int``) that records must carry."""
        super().__init__()
        self.job_log_id = int(job_log_id)

    def filter(self, record: logging.LogRecord) -> bool:  # noqa: A003
        """Return whether ``record`` is tagged with this filter's job log id.

        Records without the attribute compare against ``None`` and are
        therefore rejected; any exception raised during the lookup or the
        comparison also rejects the record.
        """
        try:
            tagged_id = getattr(record, "connecthub_job_log_id", None)
            verdict = tagged_id == self.job_log_id
        except Exception:
            return False
        return verdict
|
||
|
||
|
||
class SafeBufferingHandler(logging.Handler):
    """Best-effort, in-memory collector of formatted log lines.

    - ``emit`` swallows every ``Exception`` so that capturing logs can never
      break the task being executed.
    - The buffer has a byte budget (``max_bytes``, measured on the UTF-8
      encoding of each line). The first record that would exceed it causes a
      truncation marker to be appended, after which nothing more is stored.
    """

    def __init__(self, *, max_bytes: int = 200_000, level: int = logging.INFO) -> None:
        """Create an empty buffer with the given byte budget and handler level."""
        super().__init__(level=level)
        self.max_bytes = max_bytes
        # Captured lines, each already terminated with "\n".
        self._buf: list[str] = []
        # Total UTF-8 size of the lines in _buf (the truncation marker is not counted).
        self._size_bytes = 0
        # Set once the budget has been exceeded; emit() is a no-op afterwards.
        self._truncated = False

    def emit(self, record: logging.LogRecord) -> None:  # noqa: D401
        """Format ``record`` and append it to the buffer if the budget allows.

        Any failure (formatting, encoding, or otherwise) drops the record
        silently instead of propagating.
        """
        try:
            if self._truncated:
                return

            entry = self.format(record) + "\n"
            entry_size = len(entry.encode("utf-8", errors="replace"))
            projected_size = self._size_bytes + entry_size

            if projected_size > self.max_bytes:
                # Over budget: leave a marker and stop accepting further records.
                self._buf.append("[TRUNCATED] run_log exceeded max_bytes\n")
                self._truncated = True
                return

            self._buf.append(entry)
            self._size_bytes = projected_size
        except Exception:
            # Safety net: no exception may ever bubble up from the handler.
            return

    def get_text(self) -> str:
        """Return everything captured so far as one string ("" on any failure)."""
        try:
            captured = "".join(self._buf)
        except Exception:
            return ""
        return captured
|
||
|
||
|
||
def _close_handler_quietly(target: logging.Handler | None) -> None:
    """Close ``target`` if it is not None, swallowing any exception."""
    if target is None:
        return
    try:
        target.close()
    except Exception:
        pass


@contextmanager
def capture_logs(
    *,
    max_bytes: int = 200_000,
    job_log_id: int | None = None,
    file_path: str | None = None,
) -> Iterator[Callable[[], str]]:
    """Capture log text emitted through the root logger while the block runs.

    Nothing in here is allowed to interfere with the business logic being
    executed: every setup/teardown failure is swallowed and the capture
    degrades instead.

    Args:
        max_bytes: Byte budget of the in-memory buffer.
        job_log_id: When given, only records accepted by ``JobLogIdFilter``
            for this id are captured. If the filter cannot be built, the
            capture proceeds unfiltered.
        file_path: When given, the same records are also appended to this
            file (parent directories are created on demand). If the file
            cannot be opened, only the in-memory capture is used.

    Yields:
        A zero-argument callable returning the text captured so far. If the
        in-memory handler cannot be attached to the root logger, the callable
        always returns ``""``.
    """
    root = logging.getLogger()
    # One formatter is shared by the in-memory and the file handler.
    formatter = logging.Formatter(
        fmt="%(asctime)s %(levelname)s %(name)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    handler = SafeBufferingHandler(max_bytes=max_bytes)
    handler.setLevel(logging.INFO)
    handler.setFormatter(formatter)

    flt: JobLogIdFilter | None = None
    if job_log_id is not None:
        try:
            flt = JobLogIdFilter(job_log_id=int(job_log_id))
            handler.addFilter(flt)
        except Exception:
            flt = None

    file_handler: logging.Handler | None = None
    if file_path:
        fh: logging.Handler | None = None
        try:
            parent = os.path.dirname(file_path)
            if parent:
                os.makedirs(parent, exist_ok=True)
            fh = logging.FileHandler(file_path, encoding="utf-8")
            fh.setLevel(logging.INFO)
            fh.setFormatter(formatter)
            if flt is not None:
                fh.addFilter(flt)
            file_handler = fh
        except Exception:
            # The file may already be open if configuration failed afterwards;
            # close it rather than dropping the reference.
            _close_handler_quietly(fh)
            file_handler = None

    try:
        root.addHandler(handler)
    except Exception:
        attached = False
    else:
        attached = True

    if not attached:
        # Degrade to an empty capture. The file handler was never attached,
        # so release its file now instead of leaking it. Yielding outside the
        # ``except`` block keeps the caller's exceptions from being chained
        # to this internal failure.
        _close_handler_quietly(file_handler)
        yield lambda: ""
        return

    if file_handler is not None:
        try:
            root.addHandler(file_handler)
        except Exception:
            _close_handler_quietly(file_handler)
            file_handler = None

    try:
        yield handler.get_text
    finally:
        try:
            root.removeHandler(handler)
        except Exception:
            pass
        if file_handler is not None:
            try:
                root.removeHandler(file_handler)
            except Exception:
                pass
            _close_handler_quietly(file_handler)
|
||
|
||
|