Vastai-ConnectHub/app/integrations/base.py

73 lines
2.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import logging
import time
from typing import Any
import httpx
logger = logging.getLogger("connecthub.integrations")
class BaseClient:
"""
统一的外部系统访问 SDK 基类。
业务 Job 禁止直接写 HTTP只能调用 integrations 下的 Client。
"""
def __init__(
self,
*,
base_url: str,
timeout_s: float = 10.0,
retries: int = 2,
retry_backoff_s: float = 0.5,
headers: dict[str, str] | None = None,
) -> None:
self.base_url = base_url.rstrip("/")
self.timeout_s = timeout_s
self.retries = retries
self.retry_backoff_s = retry_backoff_s
self.headers = headers or {}
self._client = httpx.Client(
base_url=self.base_url,
timeout=httpx.Timeout(self.timeout_s),
headers=self.headers,
)
def close(self) -> None:
self._client.close()
def request(self, method: str, path: str, **kwargs: Any) -> httpx.Response:
url = path if path.startswith("/") else f"/{path}"
extra_headers = kwargs.pop("headers", None) or {}
merged_headers = {**self.headers, **extra_headers} if extra_headers else None
last_exc: Exception | None = None
for attempt in range(self.retries + 1):
try:
start = time.time()
resp = self._client.request(method=method, url=url, headers=merged_headers, **kwargs)
elapsed_ms = int((time.time() - start) * 1000)
logger.info("HTTP %s %s -> %s (%sms)", method, url, resp.status_code, elapsed_ms)
resp.raise_for_status()
return resp
except Exception as e: # noqa: BLE001 (framework-wide)
last_exc = e
logger.warning("HTTP failed (%s %s) attempt=%s err=%r", method, url, attempt + 1, e)
if attempt < self.retries:
time.sleep(self.retry_backoff_s * (2**attempt))
continue
raise
assert last_exc is not None
raise last_exc
def get_json(self, path: str, **kwargs: Any) -> Any:
return self.request("GET", path, **kwargs).json()
def post_json(self, path: str, json: Any = None, **kwargs: Any) -> Any:
return self.request("POST", path, json=json, **kwargs).json()