approval/app/clients/huobanyun_client.py

67 lines
2.5 KiB
Python

from __future__ import annotations
import asyncio
import logging
from typing import Any, Dict
import httpx
from app.config.settings import get_settings
logger = logging.getLogger(__name__)
class HuobanyunClient:
def __init__(self) -> None:
self.settings = get_settings()
def _headers(self) -> Dict[str, str]:
headers = {"Content-Type": "application/json"}
if self.settings.huobanyun_api_key:
headers["Open-Authorization"] = f"Bearer {self.settings.huobanyun_api_key}"
if self.settings.huobanyun_token:
headers["Authorization"] = f"Bearer {self.settings.huobanyun_token}"
if self.settings.huobanyun_app_id:
headers["X-App-Id"] = self.settings.huobanyun_app_id
if self.settings.huobanyun_app_secret:
headers["X-App-Secret"] = self.settings.huobanyun_app_secret
return headers
def _base_url(self) -> str:
base = self.settings.huobanyun_base_url.strip()
if base:
return base.rstrip("/")
return "https://api.huoban.com"
async def _request(self, method: str, url: str, json: Dict[str, Any]) -> Dict[str, Any]:
timeout = self.settings.request_timeout
retries = self.settings.retry_count
for attempt in range(retries + 1):
try:
async with httpx.AsyncClient(timeout=timeout) as client:
resp = await client.request(method, url, json=json, headers=self._headers())
resp.raise_for_status()
return resp.json()
except Exception as exc:
logger.error("伙伴云请求失败: %s", exc)
if attempt >= retries:
raise
await asyncio.sleep(1 + attempt)
return {}
async def query(self, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self._base_url()}/openapi/v1/item/list"
return await self._request("POST", url, payload)
async def writeback(self, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self._base_url()}/writeback"
return await self._request("POST", url, payload)
async def list_items(self, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self._base_url()}/openapi/v1/item/list"
return await self._request("POST", url, payload)
async def update_item(self, item_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
url = f"{self._base_url()}/openapi/v1/item/{item_id}"
return await self._request("PUT", url, payload)