novel-doomsday-resurgence/skills/byted-web-search/scripts/web_search.py
唐天洛 cb9b16e5a8 初始提交:番茄小说创作工作区
包含:
- 核心配置文件(AGENTS.md, SOUL.md, USER.md等)
- 记忆系统(memory/文件夹)
- 技能库(skills/文件夹)
- 小说内容(novel/文件夹)
- .gitignore配置
2026-03-30 15:46:26 +08:00

428 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env python3
"""火山引擎联网搜索 API 客户端。
官方文档:https://www.volcengine.com/docs/85508/1650263
签名参考:https://github.com/volcengine/volc-openapi-demos/blob/main/signature/python/sign.py
认证优先级:
1. WEB_SEARCH_API_KEY 环境变量或 --api-key
2. VOLCENGINE_ACCESS_KEY + VOLCENGINE_SECRET_KEY 环境变量
3. VeFaaS IAM 临时凭证(需 veadk-python 库)
示例:
python web_search.py "北京天气"
python web_search.py "OpenAI 最新发布" --time-range OneWeek
python web_search.py "故宫博物院" --type image --count 3
"""
import argparse
import datetime
import getpass
import hashlib
import hmac
import json
import os
import re
import shlex
import sys
from typing import Optional
from urllib.parse import quote
# Service name used in the credential scope and in the signing-key derivation.
SERVICE = "volc_torchlight_api"
# API version sent as the "Version" query parameter.
VERSION = "2025-01-01"
# Region used in the credential scope and in the signing-key derivation.
REGION = "cn-beijing"
# Host of the AK/SK-signed endpoint; also sent (and signed) as the Host header.
HOST = "mercury.volcengineapi.com"
# Action name sent as the "Action" query parameter.
ACTION = "WebSearch"
# Endpoint used instead of HOST when authenticating with an API key (Bearer token).
INTERNAL_API_URL = "https://open.feedcoopapi.com/search_api/web_search"
# Traffic tag header attached to every request; in the AK/SK flow it is also signed.
TRAFFIC_TAG_HEADER = "X-Traffic-Tag"
TRAFFIC_TAG_VALUE = "skill_web_search_common"
# Shortcut values accepted by --time-range.
TIME_RANGE_SHORTCUTS = {"OneDay", "OneWeek", "OneMonth", "OneYear"}
# Explicit date range accepted by --time-range: YYYY-MM-DD..YYYY-MM-DD.
DATE_RANGE_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2})\.\.(\d{4}-\d{2}-\d{2})$")
# Default env file read by _load_legacy_env_file(); its values never override
# variables that are already set in the environment.
LEGACY_ENV_PATH = "/root/.openclaw/.env"
# Maximum number of summary characters printed per web result by format_output().
SUMMARY_PREVIEW_LIMIT = 1000
# ---- 依赖加载 ----
def _require_requests():
try:
import requests
except ImportError:
print("Error: requests not installed. Run: pip install requests", file=sys.stderr)
sys.exit(1)
return requests
def _load_legacy_env_file(env_path: str = LEGACY_ENV_PATH) -> None:
    """Best-effort loader for ``KEY=VALUE`` pairs from a dotenv-style file.

    Blank lines and lines starting with ``#`` are skipped, and an optional
    leading ``export `` is removed. The value is tokenised with
    ``shlex.split(..., comments=True)`` and its first token is used, which
    strips surrounding quotes and a trailing ``# comment``. Variables that
    are already present in ``os.environ`` are never overwritten.

    The file is optional: a missing, unreadable or non-UTF-8 file is ignored
    silently so that it can never prevent the CLI from starting. Pairs parsed
    before a read/decode error occurred remain applied.

    Args:
        env_path: Path of the env file to read.
    """
    try:
        with open(env_path, "r", encoding="utf-8") as env_file:
            for raw_line in env_file:
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue
                if line.startswith("export "):
                    line = line[len("export "):].strip()
                key, separator, value = line.partition("=")
                key = key.strip()
                if not separator or not key:
                    continue
                value = value.strip()
                try:
                    tokens = shlex.split(value, comments=True)
                except ValueError:
                    # Unbalanced quotes: fall back to trimming quote characters.
                    value = value.strip("\"'")
                else:
                    value = tokens[0] if tokens else ""
                os.environ.setdefault(key, value)
    except (OSError, UnicodeDecodeError):
        # OSError covers a missing/unreadable file; UnicodeDecodeError is
        # raised while iterating a file that is not valid UTF-8.
        return
# ---- 火山引擎 HMAC-SHA256 签名 (基于官方示例) ----
def _hmac_sha256(key: bytes, content: str) -> bytes:
return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
def _hash_sha256(content: str) -> str:
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def _norm_query(params: dict) -> str:
query = ""
for key in sorted(params.keys()):
if isinstance(params[key], list):
for value in params[key]:
query += quote(key, safe="-_.~") + "=" + quote(value, safe="-_.~") + "&"
else:
query += quote(key, safe="-_.~") + "=" + quote(str(params[key]), safe="-_.~") + "&"
return query[:-1].replace("+", "%20") if query else ""
def _utc_now():
try:
from datetime import timezone
return datetime.datetime.now(timezone.utc)
except ImportError:
return datetime.datetime.utcnow()
def _sign_request(method: str, ak: str, sk: str, body: str, session_token: str = "") -> dict:
    """Build the HTTP headers for an HMAC-SHA256 signed request to ``HOST``.

    The canonical request uses path ``/`` and the ``Action``/``Version``
    query parameters. The signed headers are ``content-type``, ``host``,
    ``x-content-sha256``, ``x-date`` and ``x-traffic-tag``, plus
    ``x-security-token`` when a session token is supplied.

    Args:
        method: HTTP method; upper-cased for the canonical request.
        ak: Access key id placed in the ``Credential`` field.
        sk: Secret key from which the signing key is derived.
        body: Exact request body text; its SHA-256 is part of the signature.
        session_token: Optional temporary-credential token.

    Returns:
        Header dict including ``Authorization`` (and ``X-Security-Token``
        when ``session_token`` is non-empty).
    """
    timestamp = _utc_now().strftime("%Y%m%dT%H%M%SZ")
    day = timestamp[:8]
    payload_hash = _hash_sha256(body)
    mime_type = "application/json"

    # Single source of truth for what gets signed: lowercase name -> value.
    to_sign = {
        "content-type": mime_type,
        "host": HOST,
        "x-content-sha256": payload_hash,
        "x-date": timestamp,
        "x-traffic-tag": TRAFFIC_TAG_VALUE,
    }
    if session_token:
        to_sign["x-security-token"] = session_token
    ordered_names = sorted(to_sign)
    signed_headers_str = ";".join(ordered_names)
    canonical_headers = "\n".join(f"{name}:{to_sign[name]}" for name in ordered_names)

    canonical_request = "\n".join(
        (
            method.upper(),
            "/",
            _norm_query({"Action": ACTION, "Version": VERSION}),
            canonical_headers,
            "",  # blank line separating the headers from the signed-header list
            signed_headers_str,
            payload_hash,
        )
    )
    credential_scope = f"{day}/{REGION}/{SERVICE}/request"
    string_to_sign = "\n".join(
        ("HMAC-SHA256", timestamp, credential_scope, _hash_sha256(canonical_request))
    )

    # Derive the signing key by chaining HMACs: date -> region -> service -> "request".
    signing_key = sk.encode("utf-8")
    for scope_part in (day, REGION, SERVICE, "request"):
        signing_key = _hmac_sha256(signing_key, scope_part)
    signature = _hmac_sha256(signing_key, string_to_sign).hex()

    headers = {
        "Content-Type": mime_type,
        "Host": HOST,
        "X-Date": timestamp,
        "X-Content-Sha256": payload_hash,
        TRAFFIC_TAG_HEADER: TRAFFIC_TAG_VALUE,
        "Authorization": (
            f"HMAC-SHA256 Credential={ak}/{credential_scope}, "
            f"SignedHeaders={signed_headers_str}, "
            f"Signature={signature}"
        ),
    }
    if session_token:
        headers["X-Security-Token"] = session_token
    return headers
# ---- 凭证获取 ----
def _get_credentials() -> tuple:
"""返回 (ak, sk, session_token)。"""
ak = os.getenv("VOLCENGINE_ACCESS_KEY")
sk = os.getenv("VOLCENGINE_SECRET_KEY")
if ak and sk:
return ak, sk, ""
try:
from veadk.auth.veauth.utils import get_credential_from_vefaas_iam
cred = get_credential_from_vefaas_iam()
return cred.access_key_id, cred.secret_access_key, cred.session_token
except Exception:
return None, None, ""
# ---- 请求构建 ----
def _get_api_key(cli_api_key: Optional[str]) -> Optional[str]:
api_key = cli_api_key or os.getenv("WEB_SEARCH_API_KEY")
return api_key.strip() if api_key else None
def _validate_time_range(time_range: Optional[str]) -> Optional[str]:
if not time_range:
return None
if time_range in TIME_RANGE_SHORTCUTS:
return time_range
match = DATE_RANGE_PATTERN.match(time_range)
if not match:
raise ValueError(
"--time-range 必须是 OneDay/OneWeek/OneMonth/OneYear或日期区间 YYYY-MM-DD..YYYY-MM-DD。"
)
start_text, end_text = match.groups()
try:
start_date = datetime.date.fromisoformat(start_text)
end_date = datetime.date.fromisoformat(end_text)
except ValueError as exc:
raise ValueError("--time-range 中的日期必须是有效的 YYYY-MM-DD。") from exc
if start_date > end_date:
raise ValueError("--time-range 的开始日期不能晚于结束日期。")
return time_range
def build_body(
    query: str,
    search_type: str = "web",
    count: int = 5,
    time_range: Optional[str] = None,
    auth_level: int = 0,
    query_rewrite: bool = False,
) -> dict:
    """Assemble the JSON request body for a search call.

    ``Query``, ``SearchType`` and ``Count`` are always present. The remaining
    keys are added only when they apply:

    * ``NeedSummary`` (``True``) for ``search_type == "web"``
    * ``Filter.AuthInfoLevel`` when ``auth_level`` is greater than 0
    * ``TimeRange`` when ``time_range`` is non-empty
    * ``QueryControl.QueryRewrite`` (``True``) when ``query_rewrite`` is set
    """
    payload = {"Query": query, "SearchType": search_type, "Count": count}
    # Optional entries in the order they should appear; None means "omit".
    optional_entries = [
        ("NeedSummary", True if search_type == "web" else None),
        ("Filter", {"AuthInfoLevel": auth_level} if auth_level > 0 else None),
        ("TimeRange", time_range or None),
        ("QueryControl", {"QueryRewrite": True} if query_rewrite else None),
    ]
    payload.update((name, value) for name, value in optional_entries if value is not None)
    return payload
# ---- API 调用 ----
def do_search(
    body: dict,
    api_key: Optional[str] = None,
    ak: Optional[str] = None,
    sk: Optional[str] = None,
    session_token: str = "",
):
    """POST the search request and return the decoded JSON response.

    With ``api_key`` the request goes to ``INTERNAL_API_URL`` using a Bearer
    token. Otherwise it is signed with ``ak``/``sk`` (and the optional
    ``session_token``) and sent to ``HOST``.

    Raises:
        ValueError: If neither an API key nor a complete AK/SK pair is given.
        requests.exceptions.HTTPError: Via ``raise_for_status()`` on a
            non-success HTTP status.
    """
    http = _require_requests()
    # Serialise once: the same text is hashed for the signature and sent on the wire.
    payload = json.dumps(body, ensure_ascii=False)
    if not api_key:
        if not (ak and sk):
            raise ValueError("missing volcengine credentials")
        request_headers = _sign_request("POST", ak, sk, payload, session_token)
        endpoint = f"https://{HOST}?Action={ACTION}&Version={VERSION}"
    else:
        request_headers = {
            "Content-Type": "application/json",
            TRAFFIC_TAG_HEADER: TRAFFIC_TAG_VALUE,
            "Authorization": f"Bearer {api_key}",
        }
        endpoint = INTERNAL_API_URL
    reply = http.post(
        endpoint,
        headers=request_headers,
        data=payload.encode("utf-8"),
        timeout=30,
    )
    reply.raise_for_status()
    return reply.json()
# ---- 输出格式化 ----
def format_output(data: dict, search_type: str) -> str:
    """Render an API response as human-readable text.

    The first line shows the result count and time cost, followed by a blank
    line and one paragraph per result. For ``"web"`` each paragraph holds the
    title, optional site/authority info, URL and a summary (``Summary`` or,
    failing that, ``Snippet``) truncated to ``SUMMARY_PREVIEW_LIMIT``
    characters. For ``"image"`` it holds the title, image URL and dimensions.

    ``Result``, the result lists and ``Image`` may be missing or JSON ``null``;
    all of these are treated as empty.

    Args:
        data: Decoded JSON response.
        search_type: ``"web"`` or ``"image"``; any other value prints only
            the header.

    Returns:
        The formatted text, lines joined with ``\\n``.
    """
    # ``or {}`` (rather than a .get default) also covers an explicit null.
    result = data.get("Result") or {}
    lines = [f"结果数: {result.get('ResultCount', 0)} 耗时: {result.get('TimeCost', 0)}ms", ""]
    if search_type == "web":
        for item in result.get("WebResults") or []:
            lines.append(f"[{item.get('SortId', '')}] {item.get('Title', '')}")
            meta_parts = [part for part in (item.get("SiteName", ""), item.get("AuthInfoDes", "")) if part]
            if meta_parts:
                lines.append(f" {' | '.join(meta_parts)}")
            if item.get("Url"):
                lines.append(f" {item['Url']}")
            summary = item.get("Summary") or item.get("Snippet", "")
            if summary:
                lines.append(f" {summary[:SUMMARY_PREVIEW_LIMIT]}")
            lines.append("")
    elif search_type == "image":
        for item in result.get("ImageResults") or []:
            image = item.get("Image") or {}
            lines.append(f"[{item.get('SortId', '')}] {item.get('Title', '')}")
            if image.get("Url"):
                lines.append(f" {image['Url']}")
            lines.append(f" {image.get('Width', '?')}x{image.get('Height', '?')} ({image.get('Shape', '')})")
            lines.append("")
    return "\n".join(lines)
# ---- CLI ----
def main():
    """Command-line entry point: parse options, run one search, print the result.

    Steps: load the legacy env file, parse and validate arguments, resolve
    credentials (API key first, then AK/SK), send the request and print the
    formatted response to stdout. Every failure is reported on stderr and
    ends the process with exit status 1.
    """

    def _fail(message):
        # Report on stderr and stop; never returns.
        print(message, file=sys.stderr)
        sys.exit(1)

    _load_legacy_env_file()

    cli = argparse.ArgumentParser(description="火山引擎联网搜索 API\nhttps://www.volcengine.com/docs/85508/1650263")
    cli.add_argument("query", help="搜索关键词")
    cli.add_argument("--type", "-t", default="web", choices=["web", "image"])
    cli.add_argument("--count", "-c", type=int, default=5)
    cli.add_argument(
        "--time-range",
        help="OneDay/OneWeek/OneMonth/OneYear/YYYY-MM-DD..YYYY-MM-DD",
    )
    cli.add_argument("--auth-level", type=int, default=0, choices=[0, 1])
    cli.add_argument("--query-rewrite", action="store_true", help="开启 Query 改写")
    cli.add_argument("--api-key", help="API Key优先于环境变量 WEB_SEARCH_API_KEY")
    cli.add_argument("--prompt-api-key", action="store_true", help="交互式输入 API Key不回显")
    args = cli.parse_args()

    # Per-type upper bound for --count, with the message shown when it is exceeded.
    # argparse restricts --type to exactly these two keys.
    count_limits = {
        "image": (5, "Error: image 类型最多返回 5 条,请调整 --count。"),
        "web": (50, "Error: web 类型最多返回 50 条,请调整 --count。"),
    }
    max_count, limit_message = count_limits[args.type]
    if args.count > max_count:
        _fail(limit_message)

    try:
        time_range = _validate_time_range(args.time_range)
    except ValueError as exc:
        _fail(f"Error: {exc}")

    api_key = _get_api_key(args.api_key)
    if args.prompt_api_key and not api_key:
        api_key = getpass.getpass("API Key: ").strip() or None

    ak = sk = session_token = None
    if not api_key:
        # No API key available: fall back to AK/SK (environment or VeFaaS IAM).
        ak, sk, session_token = _get_credentials()
        if not (ak and sk):
            _fail(
                "Error: 未找到凭证。请配置以下任一方式:\n"
                "1) API Key设置 WEB_SEARCH_API_KEY 或传入 --api-key\n"
                "2) AK/SK设置 VOLCENGINE_ACCESS_KEY 和 VOLCENGINE_SECRET_KEY"
            )

    body = build_body(
        query=args.query,
        search_type=args.type,
        count=args.count,
        time_range=time_range,
        auth_level=args.auth_level,
        query_rewrite=args.query_rewrite,
    )

    # Needed here only to name the HTTPError class in the except clause.
    http = _require_requests()
    try:
        data = do_search(body, api_key=api_key, ak=ak, sk=sk, session_token=session_token or "")
    except http.exceptions.HTTPError as exc:
        print(f"HTTP Error: {exc}", file=sys.stderr)
        if exc.response is None:
            sys.exit(1)
        # Show the server's response body as well.
        _fail(exc.response.text)
    except Exception as exc:
        _fail(f"Error: {exc}")

    if data is None:
        _fail("No response.")

    error = (data.get("ResponseMetadata") or {}).get("Error")
    if error:
        _fail(f"API Error [{error.get('Code')}]: {error.get('Message')}")

    print(format_output(data, args.type))


if __name__ == "__main__":
    main()