67 lines
2.5 KiB
Python
67 lines
2.5 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from typing import Any, Dict
|
|
|
|
import httpx
|
|
|
|
from app.config.settings import get_settings
|
|
|
|
# Module-level logger, named after this module's import path via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HuobanyunClient:
    """Async HTTP client for the Huobanyun open API.

    Connection parameters (base URL, credentials, timeout, retry count) are
    read from the object returned by ``get_settings()``. Every public method
    sends one JSON request and returns the decoded JSON response body.
    """

    # Used when no usable base URL is configured.
    _DEFAULT_BASE_URL = "https://api.huoban.com"
    # Endpoint shared by ``query`` and ``list_items``.
    _ITEM_LIST_PATH = "/openapi/v1/item/list"

    def __init__(self) -> None:
        """Load the application settings used for all subsequent requests."""
        self.settings = get_settings()

    def _headers(self) -> Dict[str, str]:
        """Build the request headers from the configured credentials.

        ``Content-Type`` is always present; each credential header is added
        only when the corresponding setting is truthy.
        """
        cfg = self.settings
        headers = {"Content-Type": "application/json"}
        if cfg.huobanyun_api_key:
            headers["Open-Authorization"] = f"Bearer {cfg.huobanyun_api_key}"
        if cfg.huobanyun_token:
            headers["Authorization"] = f"Bearer {cfg.huobanyun_token}"
        if cfg.huobanyun_app_id:
            headers["X-App-Id"] = cfg.huobanyun_app_id
        if cfg.huobanyun_app_secret:
            headers["X-App-Secret"] = cfg.huobanyun_app_secret
        return headers

    def _base_url(self) -> str:
        """Return the API base URL without a trailing slash.

        Falls back to the default base URL when the setting is missing,
        blank, or consists only of slashes, so callers never end up with a
        relative URL such as ``/openapi/...``.
        """
        configured = str(self.settings.huobanyun_base_url or "").strip().rstrip("/")
        return configured or self._DEFAULT_BASE_URL

    @staticmethod
    def _is_retryable(exc: Exception) -> bool:
        """Tell whether retrying after ``exc`` can plausibly succeed.

        Errors that carry an HTTP response with a 4xx status are treated as
        permanent, except 408 (request timeout) and 429 (rate limited).
        Everything else (transport errors, 5xx responses, undecodable
        bodies) is considered transient.
        """
        # Duck-typed: any error exposing ``response.status_code`` is classified.
        status = getattr(getattr(exc, "response", None), "status_code", None)
        if isinstance(status, int) and 400 <= status < 500:
            return status in (408, 429)
        return True

    async def _request(self, method: str, url: str, json: Dict[str, Any]) -> Dict[str, Any]:
        """Send a JSON request and return the decoded JSON response.

        Args:
            method: HTTP method name, e.g. ``"POST"``.
            url: Absolute URL to call.
            json: Payload serialized as the JSON request body.

        Returns:
            The response body decoded from JSON.

        Raises:
            Exception: The error from the last attempt is re-raised unchanged
                once the retry budget is exhausted or the failure is a
                non-retryable client error.
        """
        timeout = self.settings.request_timeout
        # Clamp so a missing or negative retry count still performs one real
        # attempt instead of silently returning an empty result.
        retries = max(int(self.settings.retry_count or 0), 0)
        headers = self._headers()

        # One client for all attempts so retries can reuse pooled connections.
        async with httpx.AsyncClient(timeout=timeout) as client:
            attempt = 0
            while True:
                try:
                    resp = await client.request(method, url, json=json, headers=headers)
                    resp.raise_for_status()
                    return resp.json()
                except Exception as exc:
                    # Deliberately broad: every failure is logged, then either
                    # retried or re-raised to the caller unchanged.
                    logger.error("伙伴云请求失败: %s", exc)
                    if attempt >= retries or not self._is_retryable(exc):
                        raise
                attempt += 1
                # Linear backoff: 1s, 2s, 3s, ...
                await asyncio.sleep(attempt)

    async def query(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` to the item-list endpoint and return the response."""
        url = f"{self._base_url()}{self._ITEM_LIST_PATH}"
        return await self._request("POST", url, payload)

    async def writeback(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` to the ``/writeback`` endpoint and return the response."""
        url = f"{self._base_url()}/writeback"
        return await self._request("POST", url, payload)

    async def list_items(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` to the item-list endpoint and return the response."""
        url = f"{self._base_url()}{self._ITEM_LIST_PATH}"
        return await self._request("POST", url, payload)

    async def update_item(self, item_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """PUT ``payload`` to the endpoint of item ``item_id`` and return the response."""
        url = f"{self._base_url()}/openapi/v1/item/{item_id}"
        return await self._request("PUT", url, payload)