包含: - 核心配置文件(AGENTS.md, SOUL.md, USER.md等) - 记忆系统(memory/文件夹) - 技能库(skills/文件夹) - 小说内容(novel/文件夹) - .gitignore配置
428 lines
14 KiB
Python
428 lines
14 KiB
Python
# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates.
|
||
#
|
||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at
|
||
#
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
|
||
#!/usr/bin/env python3
|
||
"""火山引擎联网搜索 API 客户端。
|
||
|
||
官方文档:https://www.volcengine.com/docs/85508/1650263
|
||
签名参考:https://github.com/volcengine/volc-openapi-demos/blob/main/signature/python/sign.py
|
||
|
||
认证优先级:
|
||
1. WEB_SEARCH_API_KEY 环境变量或 --api-key
|
||
2. VOLCENGINE_ACCESS_KEY + VOLCENGINE_SECRET_KEY 环境变量
|
||
3. VeFaaS IAM 临时凭证(需 veadk-python 库)
|
||
|
||
示例:
|
||
python web_search.py "北京天气"
|
||
python web_search.py "OpenAI 最新发布" --time-range OneWeek
|
||
python web_search.py "故宫博物院" --type image --count 3
|
||
"""
|
||
|
||
import argparse
|
||
import datetime
|
||
import getpass
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
import os
|
||
import re
|
||
import shlex
|
||
import sys
|
||
from typing import Optional
|
||
from urllib.parse import quote
|
||
|
||
SERVICE = "volc_torchlight_api"
|
||
VERSION = "2025-01-01"
|
||
REGION = "cn-beijing"
|
||
HOST = "mercury.volcengineapi.com"
|
||
ACTION = "WebSearch"
|
||
INTERNAL_API_URL = "https://open.feedcoopapi.com/search_api/web_search"
|
||
TRAFFIC_TAG_HEADER = "X-Traffic-Tag"
|
||
TRAFFIC_TAG_VALUE = "skill_web_search_common"
|
||
TIME_RANGE_SHORTCUTS = {"OneDay", "OneWeek", "OneMonth", "OneYear"}
|
||
DATE_RANGE_PATTERN = re.compile(r"^(\d{4}-\d{2}-\d{2})\.\.(\d{4}-\d{2}-\d{2})$")
|
||
LEGACY_ENV_PATH = "/root/.openclaw/.env"
|
||
SUMMARY_PREVIEW_LIMIT = 1000
|
||
|
||
|
||
# ---- 依赖加载 ----
|
||
|
||
def _require_requests():
|
||
try:
|
||
import requests
|
||
except ImportError:
|
||
print("Error: requests not installed. Run: pip install requests", file=sys.stderr)
|
||
sys.exit(1)
|
||
return requests
|
||
|
||
|
||
def _load_legacy_env_file(env_path: str = LEGACY_ENV_PATH) -> None:
|
||
if not os.path.exists(env_path):
|
||
return
|
||
|
||
try:
|
||
with open(env_path, "r", encoding="utf-8") as f:
|
||
for raw_line in f:
|
||
line = raw_line.strip()
|
||
if not line or line.startswith("#"):
|
||
continue
|
||
if line.startswith("export "):
|
||
line = line[len("export "):].strip()
|
||
if "=" not in line:
|
||
continue
|
||
|
||
key, value = line.split("=", 1)
|
||
key = key.strip()
|
||
value = value.strip()
|
||
if not key:
|
||
continue
|
||
|
||
try:
|
||
parsed = shlex.split(value, comments=True)
|
||
value = parsed[0] if parsed else ""
|
||
except ValueError:
|
||
value = value.strip("\"'")
|
||
|
||
os.environ.setdefault(key, value)
|
||
except OSError:
|
||
return
|
||
|
||
|
||
# ---- 火山引擎 HMAC-SHA256 签名 (基于官方示例) ----
|
||
|
||
def _hmac_sha256(key: bytes, content: str) -> bytes:
|
||
return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
|
||
|
||
|
||
def _hash_sha256(content: str) -> str:
|
||
return hashlib.sha256(content.encode("utf-8")).hexdigest()
|
||
|
||
|
||
def _norm_query(params: dict) -> str:
|
||
query = ""
|
||
for key in sorted(params.keys()):
|
||
if isinstance(params[key], list):
|
||
for value in params[key]:
|
||
query += quote(key, safe="-_.~") + "=" + quote(value, safe="-_.~") + "&"
|
||
else:
|
||
query += quote(key, safe="-_.~") + "=" + quote(str(params[key]), safe="-_.~") + "&"
|
||
return query[:-1].replace("+", "%20") if query else ""
|
||
|
||
|
||
def _utc_now():
|
||
try:
|
||
from datetime import timezone
|
||
return datetime.datetime.now(timezone.utc)
|
||
except ImportError:
|
||
return datetime.datetime.utcnow()
|
||
|
||
|
||
def _sign_request(method: str, ak: str, sk: str, body: str, session_token: str = "") -> dict:
|
||
now = _utc_now()
|
||
x_date = now.strftime("%Y%m%dT%H%M%SZ")
|
||
short_date = x_date[:8]
|
||
x_content_sha256 = _hash_sha256(body)
|
||
content_type = "application/json"
|
||
|
||
query_params = {"Action": ACTION, "Version": VERSION}
|
||
|
||
signed_header_keys = ["content-type", "host", "x-content-sha256", "x-date", "x-traffic-tag"]
|
||
if session_token:
|
||
signed_header_keys.append("x-security-token")
|
||
signed_header_keys.sort()
|
||
signed_headers_str = ";".join(signed_header_keys)
|
||
|
||
canonical_header_lines = [
|
||
f"content-type:{content_type}",
|
||
f"host:{HOST}",
|
||
f"x-content-sha256:{x_content_sha256}",
|
||
f"x-date:{x_date}",
|
||
f"x-traffic-tag:{TRAFFIC_TAG_VALUE}",
|
||
]
|
||
if session_token:
|
||
canonical_header_lines.append(f"x-security-token:{session_token}")
|
||
canonical_header_lines.sort()
|
||
|
||
canonical_request = "\n".join(
|
||
[
|
||
method.upper(),
|
||
"/",
|
||
_norm_query(query_params),
|
||
"\n".join(canonical_header_lines),
|
||
"",
|
||
signed_headers_str,
|
||
x_content_sha256,
|
||
]
|
||
)
|
||
|
||
credential_scope = f"{short_date}/{REGION}/{SERVICE}/request"
|
||
string_to_sign = "\n".join(
|
||
[
|
||
"HMAC-SHA256",
|
||
x_date,
|
||
credential_scope,
|
||
_hash_sha256(canonical_request),
|
||
]
|
||
)
|
||
|
||
k_date = _hmac_sha256(sk.encode("utf-8"), short_date)
|
||
k_region = _hmac_sha256(k_date, REGION)
|
||
k_service = _hmac_sha256(k_region, SERVICE)
|
||
k_signing = _hmac_sha256(k_service, "request")
|
||
signature = _hmac_sha256(k_signing, string_to_sign).hex()
|
||
|
||
authorization = (
|
||
f"HMAC-SHA256 Credential={ak}/{credential_scope}, "
|
||
f"SignedHeaders={signed_headers_str}, "
|
||
f"Signature={signature}"
|
||
)
|
||
|
||
headers = {
|
||
"Content-Type": content_type,
|
||
"Host": HOST,
|
||
"X-Date": x_date,
|
||
"X-Content-Sha256": x_content_sha256,
|
||
TRAFFIC_TAG_HEADER: TRAFFIC_TAG_VALUE,
|
||
"Authorization": authorization,
|
||
}
|
||
if session_token:
|
||
headers["X-Security-Token"] = session_token
|
||
return headers
|
||
|
||
|
||
# ---- 凭证获取 ----
|
||
|
||
def _get_credentials() -> tuple:
|
||
"""返回 (ak, sk, session_token)。"""
|
||
ak = os.getenv("VOLCENGINE_ACCESS_KEY")
|
||
sk = os.getenv("VOLCENGINE_SECRET_KEY")
|
||
if ak and sk:
|
||
return ak, sk, ""
|
||
|
||
try:
|
||
from veadk.auth.veauth.utils import get_credential_from_vefaas_iam
|
||
|
||
cred = get_credential_from_vefaas_iam()
|
||
return cred.access_key_id, cred.secret_access_key, cred.session_token
|
||
except Exception:
|
||
return None, None, ""
|
||
|
||
|
||
# ---- 请求构建 ----
|
||
|
||
def _get_api_key(cli_api_key: Optional[str]) -> Optional[str]:
|
||
api_key = cli_api_key or os.getenv("WEB_SEARCH_API_KEY")
|
||
return api_key.strip() if api_key else None
|
||
|
||
|
||
def _validate_time_range(time_range: Optional[str]) -> Optional[str]:
|
||
if not time_range:
|
||
return None
|
||
if time_range in TIME_RANGE_SHORTCUTS:
|
||
return time_range
|
||
|
||
match = DATE_RANGE_PATTERN.match(time_range)
|
||
if not match:
|
||
raise ValueError(
|
||
"--time-range 必须是 OneDay/OneWeek/OneMonth/OneYear,或日期区间 YYYY-MM-DD..YYYY-MM-DD。"
|
||
)
|
||
|
||
start_text, end_text = match.groups()
|
||
try:
|
||
start_date = datetime.date.fromisoformat(start_text)
|
||
end_date = datetime.date.fromisoformat(end_text)
|
||
except ValueError as exc:
|
||
raise ValueError("--time-range 中的日期必须是有效的 YYYY-MM-DD。") from exc
|
||
|
||
if start_date > end_date:
|
||
raise ValueError("--time-range 的开始日期不能晚于结束日期。")
|
||
|
||
return time_range
|
||
|
||
|
||
def build_body(
|
||
query: str,
|
||
search_type: str = "web",
|
||
count: int = 5,
|
||
time_range: Optional[str] = None,
|
||
auth_level: int = 0,
|
||
query_rewrite: bool = False,
|
||
) -> dict:
|
||
body = {"Query": query, "SearchType": search_type, "Count": count}
|
||
|
||
if search_type == "web":
|
||
body["NeedSummary"] = True
|
||
filters = {}
|
||
if auth_level > 0:
|
||
filters["AuthInfoLevel"] = auth_level
|
||
if filters:
|
||
body["Filter"] = filters
|
||
if time_range:
|
||
body["TimeRange"] = time_range
|
||
|
||
if query_rewrite:
|
||
body["QueryControl"] = {"QueryRewrite": True}
|
||
|
||
return body
|
||
|
||
|
||
# ---- API 调用 ----
|
||
|
||
def do_search(
|
||
body: dict,
|
||
api_key: Optional[str] = None,
|
||
ak: Optional[str] = None,
|
||
sk: Optional[str] = None,
|
||
session_token: str = "",
|
||
):
|
||
requests = _require_requests()
|
||
body_str = json.dumps(body, ensure_ascii=False)
|
||
if api_key:
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
TRAFFIC_TAG_HEADER: TRAFFIC_TAG_VALUE,
|
||
"Authorization": f"Bearer {api_key}",
|
||
}
|
||
url = INTERNAL_API_URL
|
||
else:
|
||
if not ak or not sk:
|
||
raise ValueError("missing volcengine credentials")
|
||
headers = _sign_request("POST", ak, sk, body_str, session_token)
|
||
url = f"https://{HOST}?Action={ACTION}&Version={VERSION}"
|
||
|
||
response = requests.post(url, headers=headers, data=body_str.encode("utf-8"), timeout=30)
|
||
response.raise_for_status()
|
||
return response.json()
|
||
|
||
|
||
# ---- 输出格式化 ----
|
||
|
||
def format_output(data: dict, search_type: str) -> str:
|
||
result = data.get("Result", {})
|
||
lines = [f"结果数: {result.get('ResultCount', 0)} 耗时: {result.get('TimeCost', 0)}ms", ""]
|
||
|
||
if search_type == "web":
|
||
for item in result.get("WebResults") or []:
|
||
lines.append(f"[{item.get('SortId', '')}] {item.get('Title', '')}")
|
||
|
||
meta_parts = [part for part in [item.get("SiteName", ""), item.get("AuthInfoDes", "")] if part]
|
||
if meta_parts:
|
||
lines.append(f" {' | '.join(meta_parts)}")
|
||
|
||
if item.get("Url"):
|
||
lines.append(f" {item['Url']}")
|
||
|
||
summary = item.get("Summary") or item.get("Snippet", "")
|
||
if summary:
|
||
lines.append(f" {summary[:SUMMARY_PREVIEW_LIMIT]}")
|
||
lines.append("")
|
||
|
||
elif search_type == "image":
|
||
for item in result.get("ImageResults") or []:
|
||
image = item.get("Image", {})
|
||
lines.append(f"[{item.get('SortId', '')}] {item.get('Title', '')}")
|
||
if image.get("Url"):
|
||
lines.append(f" {image['Url']}")
|
||
lines.append(f" {image.get('Width', '?')}x{image.get('Height', '?')} ({image.get('Shape', '')})")
|
||
lines.append("")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
# ---- CLI ----
|
||
|
||
def main():
|
||
_load_legacy_env_file()
|
||
|
||
parser = argparse.ArgumentParser(description="火山引擎联网搜索 API\nhttps://www.volcengine.com/docs/85508/1650263")
|
||
parser.add_argument("query", help="搜索关键词")
|
||
parser.add_argument("--type", "-t", default="web", choices=["web", "image"])
|
||
parser.add_argument("--count", "-c", type=int, default=5)
|
||
parser.add_argument(
|
||
"--time-range",
|
||
help="OneDay/OneWeek/OneMonth/OneYear/YYYY-MM-DD..YYYY-MM-DD",
|
||
)
|
||
parser.add_argument("--auth-level", type=int, default=0, choices=[0, 1])
|
||
parser.add_argument("--query-rewrite", action="store_true", help="开启 Query 改写")
|
||
parser.add_argument("--api-key", help="API Key(优先于环境变量 WEB_SEARCH_API_KEY)")
|
||
parser.add_argument("--prompt-api-key", action="store_true", help="交互式输入 API Key(不回显)")
|
||
|
||
args = parser.parse_args()
|
||
|
||
if args.type == "image" and args.count > 5:
|
||
print("Error: image 类型最多返回 5 条,请调整 --count。", file=sys.stderr)
|
||
sys.exit(1)
|
||
if args.type == "web" and args.count > 50:
|
||
print("Error: web 类型最多返回 50 条,请调整 --count。", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
try:
|
||
time_range = _validate_time_range(args.time_range)
|
||
except ValueError as exc:
|
||
print(f"Error: {exc}", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
api_key = _get_api_key(args.api_key)
|
||
if not api_key and args.prompt_api_key:
|
||
entered = getpass.getpass("API Key: ").strip()
|
||
api_key = entered or None
|
||
|
||
ak = sk = session_token = None
|
||
if not api_key:
|
||
ak, sk, session_token = _get_credentials()
|
||
if not ak or not sk:
|
||
print(
|
||
"Error: 未找到凭证。请配置以下任一方式:\n"
|
||
"1) API Key:设置 WEB_SEARCH_API_KEY 或传入 --api-key\n"
|
||
"2) AK/SK:设置 VOLCENGINE_ACCESS_KEY 和 VOLCENGINE_SECRET_KEY",
|
||
file=sys.stderr,
|
||
)
|
||
sys.exit(1)
|
||
|
||
body = build_body(
|
||
query=args.query,
|
||
search_type=args.type,
|
||
count=args.count,
|
||
time_range=time_range,
|
||
auth_level=args.auth_level,
|
||
query_rewrite=args.query_rewrite,
|
||
)
|
||
|
||
requests = _require_requests()
|
||
try:
|
||
data = do_search(body, api_key=api_key, ak=ak, sk=sk, session_token=session_token or "")
|
||
except requests.exceptions.HTTPError as exc:
|
||
print(f"HTTP Error: {exc}", file=sys.stderr)
|
||
if exc.response is not None:
|
||
print(exc.response.text, file=sys.stderr)
|
||
sys.exit(1)
|
||
except Exception as exc:
|
||
print(f"Error: {exc}", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
if data is None:
|
||
print("No response.", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
error = (data.get("ResponseMetadata") or {}).get("Error")
|
||
if error:
|
||
print(f"API Error [{error.get('Code')}]: {error.get('Message')}", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
print(format_output(data, args.type))
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|