from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional

from app.clients.huobanyun_client import HuobanyunClient
from app.schemas.feishu_external import (
    FeishuExternalItem,
    FeishuExternalQueryRequest,
    FeishuExternalQueryResponse,
)
from app.schemas.huobanyun import HuobanyunQueryResponse

logger = logging.getLogger(__name__)


class HuobanyunService:
    """Query the Huobanyun project table and reshape its responses.

    ``get_projects_list`` returns a paged, flattened project list;
    ``query`` returns an options/texts structure for an external
    (Feishu) selector request.
    """

    # Table queried when the caller does not name one.
    _DEFAULT_TABLE_ID = "2100000015544940"
    # Fixed page size used by ``query``; ``page_token`` is the next offset.
    _QUERY_PAGE_SIZE = 50

    # Huobanyun field ids of the project table (labels per _FIELD_KEY_MAP).
    _FIELD_PROJECT_NO = "2200000149785345"
    _FIELD_PROJECT_NAME = "2200000150711223"
    _FIELD_ORDER_STATUS = "2200000150497330"
    _FIELD_ORDER_AMOUNT = "2200000149785349"
    _FIELD_LINKED_PUBLIC = "2200000589775224"
    _FIELD_LINKED_PRIVATE = "2200000589775228"

    # Human-readable field label -> Huobanyun field id.
    _FIELD_KEY_MAP: Dict[str, str] = {
        "项目单号": _FIELD_PROJECT_NO,
        "项目名称": _FIELD_PROJECT_NAME,
        "订单状态": _FIELD_ORDER_STATUS,
        "下单金额": _FIELD_ORDER_AMOUNT,
        "是否已关联对公付款审批": _FIELD_LINKED_PUBLIC,
        "是否已关联对私付款审批": _FIELD_LINKED_PRIVATE,
    }

    # Keys probed, in priority order, when unwrapping a dict field value.
    _VALUE_KEYS = ("value", "text", "name", "title", "label", "id")

    _TRUE_STRINGS = frozenset({"是", "true", "1", "yes", "y"})
    _FALSE_STRINGS = frozenset({"否", "false", "0", "no", "n"})

    def __init__(self) -> None:
        self.client = HuobanyunClient()

    def _extract_value(self, value: Any) -> Any:
        """Unwrap a raw field value into its most useful scalar form.

        * dict: the value under the first key of ``_VALUE_KEYS`` that is
          present; the dict itself if none is.
        * single-element list: that element, unwrapped recursively.
        * anything else (including empty or multi-element lists): unchanged.
        """
        if isinstance(value, dict):
            for key in self._VALUE_KEYS:
                if key in value:
                    return value[key]
            return value
        if isinstance(value, list) and len(value) == 1:
            return self._extract_value(value[0])
        return value

    def _pick_field(self, fields: Dict[str, Any], keys: List[str]) -> Any:
        """Return the unwrapped value of the first of ``keys`` found in ``fields``.

        Returns ``""`` when none of the keys is present.
        """
        for key in keys:
            if key in fields:
                return self._extract_value(fields.get(key))
        return ""

    def _resolve_field_key(self, key: str) -> str:
        """Translate a known field label to its field id; pass others through."""
        return self._FIELD_KEY_MAP.get(key, key)

    def _to_bool(self, value: Any) -> Optional[bool]:
        """Coerce a field value to a boolean, or ``None`` if undecidable.

        bools are returned as-is, numbers are true when non-zero, strings
        are matched case-insensitively against the yes/no vocabularies, and
        lists are true when non-empty. Anything else yields ``None``.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, (int, float)):
            return value != 0
        if isinstance(value, str):
            normalized = value.strip().lower()
            if normalized in self._TRUE_STRINGS:
                return True
            if normalized in self._FALSE_STRINGS:
                return False
        if isinstance(value, list):
            return len(value) > 0
        return None

    @staticmethod
    def _only_records(items: Any) -> List[Dict[str, Any]]:
        """Return the dict entries of ``items``, warning about skipped ones."""
        if not isinstance(items, (list, tuple)):
            return []
        records = [entry for entry in items if isinstance(entry, dict)]
        skipped = len(items) - len(records)
        if skipped:
            logger.warning("伙伴云 items 中有 %s 条非字典记录,已跳过", skipped)
        return records

    def _parse_list_response(self, data: Any) -> tuple[Any, Any]:
        """Extract ``(items, total)`` from a ``list_items`` response.

        Accepts a dict (optionally wrapping the payload under ``"data"``)
        or a bare list; any other shape is logged and treated as empty.
        """
        if isinstance(data, list):
            logger.warning("伙伴云返回为列表,按 items 处理")
            return data, len(data)
        if not isinstance(data, dict):
            logger.error("伙伴云返回结构异常: %s", type(data))
            return [], 0

        logger.info(
            "伙伴云列表响应: keys=%s code=%s message=%s",
            list(data.keys()),
            data.get("code"),
            data.get("message"),
        )
        data_block = data.get("data", data)
        if isinstance(data_block, list):
            logger.warning("伙伴云 data 字段为列表,按 items 处理")
            return data_block, len(data_block)
        if not isinstance(data_block, dict):
            logger.error("伙伴云 data 字段结构异常: %s", type(data_block))
            return [], 0

        # ``"items": null`` must not reach len()/iteration below.
        items = data_block.get("items") or []
        if not items and isinstance(data.get("items"), list):
            items = data["items"]
        total = data_block.get("filtered", data_block.get("total", len(items)))
        return items, total

    async def get_projects_list(
        self,
        project_name: str | None = None,
        project_no: str | None = None,
        page: int = 1,
        size: int = 50,
    ) -> Dict[str, Any]:
        """Fetch one page of projects, optionally filtered by name and/or number.

        Args:
            project_name: If truthy, filter on the project-name field.
            project_no: If truthy, filter on the ``proj_id`` field.
            page: 1-based page number; values <= 0 are treated as 1.
            size: Page size; values <= 0 become 50, larger values are
                capped at 100.

        Returns:
            ``{"total": ..., "items": [...]}`` where each item has the keys
            ``project_no``, ``project_name``, ``order_status``,
            ``order_amount``, ``linked_public_payment`` and
            ``linked_private_payment``.
        """
        limit = 50 if size <= 0 else min(size, 100)
        page = 1 if page <= 0 else page
        offset = (page - 1) * limit

        filter_items: List[Dict[str, Any]] = []
        if project_name:
            filter_items.append(
                {"field": self._FIELD_PROJECT_NAME, "query": {"eqm": [project_name]}}
            )
        if project_no:
            filter_items.append(
                {"field": "proj_id", "query": {"eqm": [project_no]}}
            )

        payload: Dict[str, Any] = {
            "table_id": self._DEFAULT_TABLE_ID,
            "limit": limit,
            "offset": offset,
        }
        if filter_items:
            payload["filter"] = {"and": filter_items}
        payload["order"] = {"field_id": "created_on", "type": "desc"}
        payload["with_field_config"] = 0

        data = await self.client.list_items(payload)
        items, total = self._parse_list_response(data)
        records = self._only_records(items)

        if records:
            sample_fields = records[0].get("fields") or {}
            logger.info("项目字段样例 keys=%s", list(sample_fields.keys()))

        mapped_items: List[Dict[str, Any]] = []
        for record in records:
            fields = record.get("fields") or {}
            # Distinct local names: do not rebind the filter parameters.
            item_no = self._pick_field(fields, ["proj_id", self._FIELD_PROJECT_NO])
            item_name = self._pick_field(fields, [self._FIELD_PROJECT_NAME])
            if not item_name:
                item_name = record.get("title", "")
            linked_public = self._pick_field(fields, [self._FIELD_LINKED_PUBLIC])
            linked_private = self._pick_field(fields, [self._FIELD_LINKED_PRIVATE])
            mapped_items.append(
                {
                    "project_no": item_no,
                    "project_name": item_name,
                    "order_status": self._pick_field(
                        fields, [self._FIELD_ORDER_STATUS]
                    ),
                    "order_amount": self._pick_field(
                        fields, [self._FIELD_ORDER_AMOUNT]
                    ),
                    "linked_public_payment": self._to_bool(linked_public),
                    "linked_private_payment": self._to_bool(linked_private),
                }
            )
        return {"total": total, "items": mapped_items}

    async def query(self, req: FeishuExternalQueryRequest) -> FeishuExternalQueryResponse:
        """Run a selector query and return options plus their display texts.

        The table id comes from ``req.raw`` (``table_id`` / ``tableId`` /
        ``table``) or the default table. The filter is, in priority order:
        the resolved key matched against the query text, a caller-supplied
        ``raw["filter"]``, or the query text matched against ``title``.
        ``req.page_token`` is read as a numeric offset; anything
        non-numeric starts from 0.

        Returns:
            A dict with ``options``, ``texts``, ``has_more`` and
            ``next_page_token`` (the next offset as a string, or ``""``).

        NOTE(review): the signature promises FeishuExternalQueryResponse but
        a plain dict is returned; presumably the caller coerces it — confirm.
        """
        raw = req.raw or {}
        table_id = (
            raw.get("table_id")
            or raw.get("tableId")
            or raw.get("table")
            or self._DEFAULT_TABLE_ID
        )
        limit = self._QUERY_PAGE_SIZE

        # isdecimal(), unlike isdigit(), only accepts characters int() can
        # parse (isdigit() is also true for e.g. superscript digits).
        token = str(req.page_token) if req.page_token else ""
        offset = int(token) if token.isdecimal() else 0

        key = req.key or raw.get("key") or raw.get("field") or ""
        key = self._resolve_field_key(str(key)) if key else ""
        query_value = req.query or req.keyword or ""

        payload: Dict[str, Any] = {
            "table_id": table_id,
            "limit": limit,
            "offset": offset,
        }
        if key and query_value:
            payload["filter"] = {
                "and": [{"field": key, "query": {"eqm": [query_value]}}]
            }
        elif raw.get("filter"):
            payload["filter"] = raw.get("filter")
        elif query_value:
            payload["filter"] = {
                "and": [{"field": "title", "query": {"eqm": [query_value]}}]
            }
        payload["order"] = {"field_id": "created_on", "type": "desc"}
        payload["with_field_config"] = 0

        data = await self.client.query(payload)

        # Normalise once so both ``items`` and ``has_more`` are read from a
        # dict; a list/None ``data`` payload yields an empty page.
        data_block = data.get("data") if isinstance(data, dict) else None
        if not isinstance(data_block, dict):
            data_block = {}
        records = self._only_records(data_block.get("items"))
        has_more = bool(data_block.get("has_more"))
        next_token = str(offset + limit) if has_more else ""

        options: List[Dict[str, Any]] = []
        texts: Dict[str, str] = {}
        for record in records:
            fields = record.get("fields") or {}
            item_id = record.get("item_id", "")
            if key:
                value = self._extract_value(fields.get(key, ""))
            else:
                value = record.get("title", "")
            value_str = "" if value is None else str(value)
            # Prefer the item id for the i18n key; fall back to the text.
            i18n_key = f"@i18n@{item_id}" if item_id else f"@i18n@{value_str}"
            options.append({"id": str(item_id), "value": i18n_key, "isDefault": False})
            texts[i18n_key] = value_str

        return {
            "options": options,
            "texts": texts,
            "has_more": has_more,
            "next_page_token": next_token,
        }