from __future__ import annotations

import asyncio
import logging
from typing import Any, Dict
from urllib.parse import quote

import httpx

from app.config.settings import get_settings

logger = logging.getLogger(__name__)


class HuobanyunClient:
    """Small async HTTP client for the Huobanyun API.

    Credentials, base URL, request timeout and retry count are all read from
    the object returned by ``get_settings()``; settings are captured once at
    construction time.
    """

    # Origin used when no base URL is configured.
    _DEFAULT_BASE_URL = "https://api.huoban.com"

    # Client-error statuses that can succeed on a later attempt (request
    # timeout, rate limiting). Every other non-5xx status error is treated
    # as deterministic and is not retried.
    _RETRYABLE_CLIENT_STATUSES = frozenset({408, 429})

    def __init__(self) -> None:
        self.settings = get_settings()

    def _headers(self) -> Dict[str, str]:
        """Build the request headers from whichever credentials are set.

        ``Content-Type`` is always present; each credential header is only
        added when the corresponding setting is truthy.
        """
        settings = self.settings
        headers = {"Content-Type": "application/json"}
        if settings.huobanyun_api_key:
            headers["Open-Authorization"] = f"Bearer {settings.huobanyun_api_key}"
        if settings.huobanyun_token:
            headers["Authorization"] = f"Bearer {settings.huobanyun_token}"
        if settings.huobanyun_app_id:
            headers["X-App-Id"] = settings.huobanyun_app_id
        if settings.huobanyun_app_secret:
            headers["X-App-Secret"] = settings.huobanyun_app_secret
        return headers

    def _base_url(self) -> str:
        """Return the configured base URL without surrounding whitespace or
        trailing slashes.

        Falls back to the default origin when the setting is unset, blank,
        or consists only of slashes, so callers never build a relative URL.
        """
        configured = (self.settings.huobanyun_base_url or "").strip().rstrip("/")
        return configured or self._DEFAULT_BASE_URL

    @classmethod
    def _is_retryable(cls, exc: Exception) -> bool:
        """Tell whether a failed attempt is worth repeating.

        HTTP status errors are retried only for 5xx responses and the
        statuses in ``_RETRYABLE_CLIENT_STATUSES``. Any other exception
        (transport errors, timeouts, body decoding errors, ...) is retried.
        """
        if isinstance(exc, httpx.HTTPStatusError):
            status = exc.response.status_code
            return status >= 500 or status in cls._RETRYABLE_CLIENT_STATUSES
        return True

    async def _request(self, method: str, url: str, json: Dict[str, Any]) -> Dict[str, Any]:
        """Send a JSON request and return the decoded JSON response body.

        At least one attempt is always made. After a retryable failure the
        call sleeps ``1 + attempt`` seconds (1s, 2s, 3s, ...) and tries
        again, up to ``settings.retry_count`` additional attempts.

        Args:
            method: HTTP method name, e.g. ``"POST"``.
            url: Absolute URL to call.
            json: Payload sent as the JSON request body.

        Returns:
            Whatever ``Response.json()`` decodes from the response body.

        Raises:
            Exception: The error from the last attempt, re-raised unchanged
                once retries are exhausted or the failure is not retryable.
        """
        timeout = self.settings.request_timeout
        retries = self.settings.retry_count
        # Headers do not change between attempts, so build them once.
        headers = self._headers()
        attempt = 0
        # One client for the whole call lets retries reuse the connection pool.
        async with httpx.AsyncClient(timeout=timeout) as client:
            while True:
                try:
                    resp = await client.request(
                        method, url, json=json, headers=headers
                    )
                    resp.raise_for_status()
                    return resp.json()
                except Exception as exc:
                    # Log message reads "Huobanyun request failed".
                    logger.error("伙伴云请求失败: %s", exc)
                    if attempt >= retries or not self._is_retryable(exc):
                        raise
                # Linear backoff before the next attempt.
                await asyncio.sleep(1 + attempt)
                attempt += 1

    async def query(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` to the item-list endpoint.

        Equivalent to :meth:`list_items`; both names are kept for callers.
        """
        return await self.list_items(payload)

    async def writeback(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` to the ``/writeback`` endpoint."""
        url = f"{self._base_url()}/writeback"
        return await self._request("POST", url, payload)

    async def list_items(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` to ``/openapi/v1/item/list`` and return the reply."""
        url = f"{self._base_url()}/openapi/v1/item/list"
        return await self._request("POST", url, payload)

    async def update_item(self, item_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """PUT ``payload`` to ``/openapi/v1/item/{item_id}``.

        ``item_id`` is percent-encoded so characters such as ``/`` or ``?``
        cannot alter the request path; plain alphanumeric ids are unchanged.
        """
        encoded_id = quote(str(item_id), safe="")
        url = f"{self._base_url()}/openapi/v1/item/{encoded_id}"
        return await self._request("PUT", url, payload)