#!/usr/bin/env python3
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Volcengine web search API client.

Official documentation: https://www.volcengine.com/docs/85508/1650263
Signature reference: https://github.com/volcengine/volc-openapi-demos/blob/main/signature/python/sign.py

Authentication priority:
  1. WEB_SEARCH_API_KEY environment variable or --api-key
  2. VOLCENGINE_ACCESS_KEY + VOLCENGINE_SECRET_KEY environment variables
  3. VeFaaS IAM temporary credentials (requires the veadk-python library)

Examples:
  python web_search.py "北京天气"
  python web_search.py "OpenAI 最新发布" --time-range OneWeek
  python web_search.py "故宫博物院" --type image --count 3
"""

import argparse
import datetime
import getpass
import hashlib
import hmac
import json
import os
import re
import shlex
import sys
from typing import Optional
from urllib.parse import quote

SERVICE = "volc_torchlight_api"
VERSION = "2025-01-01"
REGION = "cn-beijing"
HOST = "mercury.volcengineapi.com"
ACTION = "WebSearch"
INTERNAL_API_URL = "https://open.feedcoopapi.com/search_api/web_search"
TRAFFIC_TAG_HEADER = "X-Traffic-Tag"
TRAFFIC_TAG_VALUE = "skill_web_search_common"
TIME_RANGE_SHORTCUTS = {"OneDay", "OneWeek", "OneMonth", "OneYear"}
DATE_RANGE_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2})\.\.(\d{4}-\d{2}-\d{2})$")
LEGACY_ENV_PATH = "/root/.openclaw/.env"
SUMMARY_PREVIEW_LIMIT = 1000


# ---- Dependency loading ----


def _require_requests():
    """Import and return the ``requests`` module, exiting with status 1 if absent.

    The import is deferred so that the rest of the module can be loaded
    without ``requests`` being installed.
    """
    try:
        import requests
    except ImportError:
        print("Error: requests not installed. Run: pip install requests", file=sys.stderr)
        sys.exit(1)
    return requests


def _load_legacy_env_file(env_path: str = LEGACY_ENV_PATH) -> None:
    """Best-effort load of ``KEY=VALUE`` lines from *env_path* into ``os.environ``.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped; a
    leading ``export `` is stripped. Values are parsed with ``shlex`` so that
    quotes and trailing comments are handled; variables already present in the
    environment are never overwritten. A missing, unreadable or undecodable
    file is silently ignored.
    """
    try:
        with open(env_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                if line.startswith("export "):
                    line = line[len("export "):].strip()
                if "=" not in line:
                    continue
                key, value = line.split("=", 1)
                key = key.strip()
                value = value.strip()
                if not key:
                    continue
                try:
                    parsed = shlex.split(value, comments=True)
                    value = parsed[0] if parsed else ""
                except ValueError:
                    # Unbalanced quotes: fall back to stripping quote characters.
                    value = value.strip("\"'")
                os.environ.setdefault(key, value)
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError: covers
        # UnicodeDecodeError from a non-UTF-8 file and values the OS refuses
        # (e.g. embedded NUL). This loader must never abort the CLI.
        return


# ---- Volcengine HMAC-SHA256 signing (based on the official sample) ----


def _hmac_sha256(key: bytes, content: str) -> bytes:
    """Return the raw HMAC-SHA256 digest of UTF-8 encoded *content* under *key*."""
    return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()


def _hash_sha256(content: str) -> str:
    """Return the hex SHA-256 digest of UTF-8 encoded *content*."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


def _norm_query(params: dict) -> str:
    """Build the canonical query string: keys sorted, everything percent-encoded.

    A list value yields one ``key=value`` pair per element. Every value is
    converted with ``str()`` before encoding, so non-string scalars and
    non-string list elements are both accepted.
    """
    pairs = []
    for key in sorted(params):
        raw = params[key]
        values = raw if isinstance(raw, list) else [raw]
        encoded_key = quote(key, safe="-_.~")
        for value in values:
            pairs.append(encoded_key + "=" + quote(str(value), safe="-_.~"))
    return "&".join(pairs).replace("+", "%20")


def _utc_now():
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.datetime.now(datetime.timezone.utc)


def _sign_request(method: str, ak: str, sk: str, body: str, session_token: str = "") -> dict:
    """Return the HTTP headers, including ``Authorization``, for a signed request.

    Args:
        method: HTTP method; upper-cased for the canonical request.
        ak: Access key id placed in the credential.
        sk: Secret key used to derive the signing key.
        body: Exact request body text; its SHA-256 is signed.
        session_token: Optional security token; when non-empty it is signed
            and sent as ``X-Security-Token``.
    """
    now = _utc_now()
    x_date = now.strftime("%Y%m%dT%H%M%SZ")
    short_date = x_date[:8]
    x_content_sha256 = _hash_sha256(body)
    content_type = "application/json"
    query_params = {"Action": ACTION, "Version": VERSION}

    # Single source of truth for the signed headers, so the SignedHeaders
    # list and the canonical header lines cannot drift apart.
    signed_headers = {
        "content-type": content_type,
        "host": HOST,
        "x-content-sha256": x_content_sha256,
        "x-date": x_date,
        "x-traffic-tag": TRAFFIC_TAG_VALUE,
    }
    if session_token:
        signed_headers["x-security-token"] = session_token
    header_names = sorted(signed_headers)
    signed_headers_str = ";".join(header_names)
    canonical_headers = "\n".join(f"{name}:{signed_headers[name]}" for name in header_names)

    canonical_request = "\n".join(
        [
            method.upper(),
            "/",
            _norm_query(query_params),
            canonical_headers,
            "",
            signed_headers_str,
            x_content_sha256,
        ]
    )
    credential_scope = f"{short_date}/{REGION}/{SERVICE}/request"
    string_to_sign = "\n".join(
        [
            "HMAC-SHA256",
            x_date,
            credential_scope,
            _hash_sha256(canonical_request),
        ]
    )

    # Derive the signing key: secret -> date -> region -> service -> "request".
    k_date = _hmac_sha256(sk.encode("utf-8"), short_date)
    k_region = _hmac_sha256(k_date, REGION)
    k_service = _hmac_sha256(k_region, SERVICE)
    k_signing = _hmac_sha256(k_service, "request")
    signature = _hmac_sha256(k_signing, string_to_sign).hex()

    authorization = (
        f"HMAC-SHA256 Credential={ak}/{credential_scope}, "
        f"SignedHeaders={signed_headers_str}, "
        f"Signature={signature}"
    )
    headers = {
        "Content-Type": content_type,
        "Host": HOST,
        "X-Date": x_date,
        "X-Content-Sha256": x_content_sha256,
        TRAFFIC_TAG_HEADER: TRAFFIC_TAG_VALUE,
        "Authorization": authorization,
    }
    if session_token:
        headers["X-Security-Token"] = session_token
    return headers


# ---- Credential lookup ----


def _get_credentials() -> tuple:
    """Return ``(ak, sk, session_token)``.

    Environment variables VOLCENGINE_ACCESS_KEY / VOLCENGINE_SECRET_KEY win
    (with an empty session token). Otherwise the optional ``veadk`` helper is
    tried; if it is missing or fails, ``(None, None, "")`` is returned.
    """
    ak = os.getenv("VOLCENGINE_ACCESS_KEY")
    sk = os.getenv("VOLCENGINE_SECRET_KEY")
    if ak and sk:
        return ak, sk, ""
    try:
        from veadk.auth.veauth.utils import get_credential_from_vefaas_iam

        cred = get_credential_from_vefaas_iam()
        return cred.access_key_id, cred.secret_access_key, cred.session_token
    except Exception:
        # Deliberately broad: veadk is optional and any failure here simply
        # means "no credentials from this source"; the caller reports it.
        return None, None, ""


# ---- Request building ----


def _get_api_key(cli_api_key: Optional[str]) -> Optional[str]:
    """Return the first non-blank API key from the CLI value, then the env var.

    Whitespace is stripped. A blank or whitespace-only CLI value does not
    shadow WEB_SEARCH_API_KEY. Returns ``None`` when no usable key exists.
    """
    for candidate in (cli_api_key, os.getenv("WEB_SEARCH_API_KEY")):
        if candidate:
            stripped = candidate.strip()
            if stripped:
                return stripped
    return None


def _validate_time_range(time_range: Optional[str]) -> Optional[str]:
    """Validate ``--time-range`` and return it unchanged (``None`` if empty).

    Accepts one of TIME_RANGE_SHORTCUTS or ``YYYY-MM-DD..YYYY-MM-DD`` with two
    real calendar dates in non-decreasing order.

    Raises:
        ValueError: if the value matches neither form, contains an invalid
            date, or the start date is after the end date.
    """
    if not time_range:
        return None
    if time_range in TIME_RANGE_SHORTCUTS:
        return time_range
    match = DATE_RANGE_PATTERN.match(time_range)
    if not match:
        raise ValueError(
            "--time-range 必须是 OneDay/OneWeek/OneMonth/OneYear,或日期区间 YYYY-MM-DD..YYYY-MM-DD。"
        )
    start_text, end_text = match.groups()
    try:
        start_date = datetime.date.fromisoformat(start_text)
        end_date = datetime.date.fromisoformat(end_text)
    except ValueError as exc:
        raise ValueError("--time-range 中的日期必须是有效的 YYYY-MM-DD。") from exc
    if start_date > end_date:
        raise ValueError("--time-range 的开始日期不能晚于结束日期。")
    return time_range


def build_body(
    query: str,
    search_type: str = "web",
    count: int = 5,
    time_range: Optional[str] = None,
    auth_level: int = 0,
    query_rewrite: bool = False,
) -> dict:
    """Assemble the JSON request body for a search.

    ``NeedSummary`` is set only for web searches; ``Filter``, ``TimeRange``
    and ``QueryControl`` are included only when the corresponding argument is
    set (``auth_level > 0``, truthy ``time_range``, ``query_rewrite``).
    """
    body = {"Query": query, "SearchType": search_type, "Count": count}
    if search_type == "web":
        body["NeedSummary"] = True
    filters = {}
    if auth_level > 0:
        filters["AuthInfoLevel"] = auth_level
    if filters:
        body["Filter"] = filters
    if time_range:
        body["TimeRange"] = time_range
    if query_rewrite:
        body["QueryControl"] = {"QueryRewrite": True}
    return body


# ---- API call ----


def do_search(
    body: dict,
    api_key: Optional[str] = None,
    ak: Optional[str] = None,
    sk: Optional[str] = None,
    session_token: str = "",
):
    """POST *body* to the search API and return the decoded JSON response.

    With *api_key* the request goes to INTERNAL_API_URL using a Bearer token;
    otherwise it is signed with *ak*/*sk* (and optional *session_token*) and
    sent to HOST.

    Raises:
        ValueError: neither an API key nor a complete AK/SK pair was given.
        requests.exceptions.HTTPError: the server answered with an error status.
    """
    requests = _require_requests()
    # Serialize once: the signature covers exactly these bytes.
    body_str = json.dumps(body, ensure_ascii=False)
    if api_key:
        headers = {
            "Content-Type": "application/json",
            TRAFFIC_TAG_HEADER: TRAFFIC_TAG_VALUE,
            "Authorization": f"Bearer {api_key}",
        }
        url = INTERNAL_API_URL
    else:
        if not ak or not sk:
            raise ValueError("missing volcengine credentials")
        headers = _sign_request("POST", ak, sk, body_str, session_token)
        url = f"https://{HOST}?Action={ACTION}&Version={VERSION}"
    response = requests.post(url, headers=headers, data=body_str.encode("utf-8"), timeout=30)
    response.raise_for_status()
    return response.json()


# ---- Output formatting ----


def format_output(data: dict, search_type: str) -> str:
    """Render an API response as plain text for the terminal.

    Missing or ``null`` ``Result`` / ``WebResults`` / ``ImageResults`` /
    ``Image`` fields are treated as empty. Web summaries are truncated to
    SUMMARY_PREVIEW_LIMIT characters.
    """
    # ``or {}`` rather than a .get default: the key may be present with a
    # JSON null value, which a default would not replace.
    result = data.get("Result") or {}
    lines = [f"结果数: {result.get('ResultCount', 0)} 耗时: {result.get('TimeCost', 0)}ms", ""]
    if search_type == "web":
        for item in result.get("WebResults") or []:
            lines.append(f"[{item.get('SortId', '')}] {item.get('Title', '')}")
            meta_parts = [part for part in [item.get("SiteName", ""), item.get("AuthInfoDes", "")] if part]
            if meta_parts:
                lines.append(f" {' | '.join(meta_parts)}")
            if item.get("Url"):
                lines.append(f" {item['Url']}")
            summary = item.get("Summary") or item.get("Snippet", "")
            if summary:
                lines.append(f" {summary[:SUMMARY_PREVIEW_LIMIT]}")
            lines.append("")
    elif search_type == "image":
        for item in result.get("ImageResults") or []:
            image = item.get("Image") or {}
            lines.append(f"[{item.get('SortId', '')}] {item.get('Title', '')}")
            if image.get("Url"):
                lines.append(f" {image['Url']}")
            lines.append(f" {image.get('Width', '?')}x{image.get('Height', '?')} ({image.get('Shape', '')})")
            lines.append("")
    return "\n".join(lines)


# ---- CLI ----


def _build_parser() -> argparse.ArgumentParser:
    """Create the command-line argument parser."""
    parser = argparse.ArgumentParser(description="火山引擎联网搜索 API\nhttps://www.volcengine.com/docs/85508/1650263")
    parser.add_argument("query", help="搜索关键词")
    parser.add_argument("--type", "-t", default="web", choices=["web", "image"])
    parser.add_argument("--count", "-c", type=int, default=5)
    parser.add_argument(
        "--time-range",
        help="OneDay/OneWeek/OneMonth/OneYear/YYYY-MM-DD..YYYY-MM-DD",
    )
    parser.add_argument("--auth-level", type=int, default=0, choices=[0, 1])
    parser.add_argument("--query-rewrite", action="store_true", help="开启 Query 改写")
    parser.add_argument("--api-key", help="API Key(优先于环境变量 WEB_SEARCH_API_KEY)")
    parser.add_argument("--prompt-api-key", action="store_true", help="交互式输入 API Key(不回显)")
    return parser


def main():
    """CLI entry point: parse arguments, resolve credentials, search, print.

    Exits with status 1 on invalid arguments, missing credentials, transport
    or HTTP errors, and errors reported in the API response.
    """
    _load_legacy_env_file()
    args = _build_parser().parse_args()

    if args.count < 1:
        print("Error: --count 必须是正整数。", file=sys.stderr)
        sys.exit(1)
    if args.type == "image" and args.count > 5:
        print("Error: image 类型最多返回 5 条,请调整 --count。", file=sys.stderr)
        sys.exit(1)
    if args.type == "web" and args.count > 50:
        print("Error: web 类型最多返回 50 条,请调整 --count。", file=sys.stderr)
        sys.exit(1)

    try:
        time_range = _validate_time_range(args.time_range)
    except ValueError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    api_key = _get_api_key(args.api_key)
    if not api_key and args.prompt_api_key:
        entered = getpass.getpass("API Key: ").strip()
        api_key = entered or None

    ak = sk = session_token = None
    if not api_key:
        ak, sk, session_token = _get_credentials()
        if not ak or not sk:
            print(
                "Error: 未找到凭证。请配置以下任一方式:\n"
                "1) API Key:设置 WEB_SEARCH_API_KEY 或传入 --api-key\n"
                "2) AK/SK:设置 VOLCENGINE_ACCESS_KEY 和 VOLCENGINE_SECRET_KEY",
                file=sys.stderr,
            )
            sys.exit(1)

    body = build_body(
        query=args.query,
        search_type=args.type,
        count=args.count,
        time_range=time_range,
        auth_level=args.auth_level,
        query_rewrite=args.query_rewrite,
    )

    # Resolved here as well so the HTTPError class is available to ``except``.
    requests = _require_requests()
    try:
        data = do_search(body, api_key=api_key, ak=ak, sk=sk, session_token=session_token or "")
    except requests.exceptions.HTTPError as exc:
        print(f"HTTP Error: {exc}", file=sys.stderr)
        if exc.response is not None:
            print(exc.response.text, file=sys.stderr)
        sys.exit(1)
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)

    if data is None:
        print("No response.", file=sys.stderr)
        sys.exit(1)
    if not isinstance(data, dict):
        # Guard the .get() calls below against a JSON array/scalar reply.
        print("Error: unexpected response format.", file=sys.stderr)
        sys.exit(1)

    error = (data.get("ResponseMetadata") or {}).get("Error")
    if error:
        print(f"API Error [{error.get('Code')}]: {error.get('Message')}", file=sys.stderr)
        sys.exit(1)

    print(format_output(data, args.type))


if __name__ == "__main__":
    main()