novel-tools/sync_tools/feishu_sync.py

288 lines
9.4 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
飞书小说同步工具
将本地章节同步到飞书文档
"""
import os
import json
import time
from datetime import datetime
import requests
class FeishuNovelSync:
def __init__(self, config_path="configs/feishu_config.json"):
"""初始化同步工具"""
self.config = self.load_config(config_path)
self.base_url = "https://open.feishu.cn/open-apis"
self.headers = {
"Authorization": f"Bearer {self.config.get('access_token')}",
"Content-Type": "application/json"
}
def load_config(self, config_path):
"""加载配置文件"""
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
else:
# 默认配置
return {
"app_id": "",
"app_secret": "",
"access_token": "",
"refresh_token": "",
"expire_time": 0
}
def save_config(self, config_path="configs/feishu_config.json"):
"""保存配置文件"""
os.makedirs(os.path.dirname(config_path), exist_ok=True)
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(self.config, f, ensure_ascii=False, indent=2)
def get_access_token(self):
"""获取访问令牌"""
if self.config.get('expire_time', 0) > time.time():
return self.config.get('access_token')
url = f"{self.base_url}/auth/v3/tenant_access_token/internal"
data = {
"app_id": self.config['app_id'],
"app_secret": self.config['app_secret']
}
try:
response = requests.post(url, json=data)
result = response.json()
if result.get('code') == 0:
self.config['access_token'] = result['tenant_access_token']
self.config['expire_time'] = time.time() + result['expire'] - 60
self.save_config()
return self.config['access_token']
else:
print(f"获取token失败: {result}")
return None
except Exception as e:
print(f"请求失败: {e}")
return None
def create_doc(self, title, folder_token=None):
"""创建飞书文档"""
token = self.get_access_token()
if not token:
return None
url = f"{self.base_url}/drive/v1/files/create"
headers = self.headers.copy()
headers["Authorization"] = f"Bearer {token}"
data = {
"title": title,
"type": "doc",
"folder_token": folder_token or ""
}
try:
response = requests.post(url, headers=headers, json=data)
result = response.json()
if result.get('code') == 0:
doc_token = result['data']['token']
print(f"文档创建成功: {title} (token: {doc_token})")
return doc_token
else:
print(f"创建文档失败: {result}")
return None
except Exception as e:
print(f"创建文档请求失败: {e}")
return None
def update_doc(self, doc_token, content):
"""更新文档内容"""
token = self.get_access_token()
if not token:
return False
url = f"{self.base_url}/drive/v1/files/{doc_token}/content"
headers = self.headers.copy()
headers["Authorization"] = f"Bearer {token}"
# 构建文档内容
doc_content = {
"title": "小说章节",
"body": {
"blocks": [
{
"type": "paragraph",
"paragraph": {
"elements": [
{
"type": "text",
"text": content
}
]
}
}
]
}
}
try:
response = requests.put(url, headers=headers, json=doc_content)
result = response.json()
if result.get('code') == 0:
print(f"文档更新成功: {doc_token}")
return True
else:
print(f"更新文档失败: {result}")
return False
except Exception as e:
print(f"更新文档请求失败: {e}")
return False
def sync_chapter(self, chapter_path, doc_token=None, title=None):
"""同步单个章节"""
if not os.path.exists(chapter_path):
print(f"章节文件不存在: {chapter_path}")
return None
# 读取章节内容
with open(chapter_path, 'r', encoding='utf-8') as f:
content = f.read()
# 获取章节标题
if not title:
title = os.path.basename(chapter_path).replace('.md', '')
# 如果没有文档token先创建文档
if not doc_token:
doc_token = self.create_doc(title)
if not doc_token:
return None
# 更新文档内容
if self.update_doc(doc_token, content):
return doc_token
else:
return None
def sync_project(self, project_path, folder_token=None):
"""同步整个项目"""
if not os.path.exists(project_path):
print(f"项目路径不存在: {project_path}")
return False
chapters_dir = os.path.join(project_path, "chapters")
if not os.path.exists(chapters_dir):
print(f"章节目录不存在: {chapters_dir}")
return False
# 获取所有章节文件
chapter_files = []
for root, dirs, files in os.walk(chapters_dir):
for file in files:
if file.endswith('.md'):
chapter_files.append(os.path.join(root, file))
if not chapter_files:
print("没有找到章节文件")
return False
print(f"找到 {len(chapter_files)} 个章节文件")
# 按文件名排序
chapter_files.sort()
# 同步每个章节
results = []
for i, chapter_file in enumerate(chapter_files, 1):
print(f"[{i}/{len(chapter_files)}] 同步: {os.path.basename(chapter_file)}")
chapter_title = os.path.basename(chapter_file).replace('.md', '')
doc_token = self.sync_chapter(chapter_file, None, chapter_title)
if doc_token:
results.append({
"file": chapter_file,
"title": chapter_title,
"doc_token": doc_token,
"status": "success"
})
print(f" 成功: https://example.feishu.cn/docx/{doc_token}")
else:
results.append({
"file": chapter_file,
"title": chapter_title,
"doc_token": None,
"status": "failed"
})
print(f" 失败")
# 避免请求过快
time.sleep(1)
# 保存同步结果
sync_log = {
"project": project_path,
"sync_time": datetime.now().isoformat(),
"total_chapters": len(chapter_files),
"success": len([r for r in results if r['status'] == 'success']),
"failed": len([r for r in results if r['status'] == 'failed']),
"details": results
}
log_file = os.path.join(project_path, "sync", "sync_log.json")
os.makedirs(os.path.dirname(log_file), exist_ok=True)
with open(log_file, 'w', encoding='utf-8') as f:
json.dump(sync_log, f, ensure_ascii=False, indent=2)
print(f"\n同步完成:")
print(f" 总章节: {len(chapter_files)}")
print(f" 成功: {sync_log['success']}")
print(f" 失败: {sync_log['failed']}")
print(f" 日志: {log_file}")
return sync_log['success'] > 0
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description="飞书小说同步工具")
parser.add_argument("--project", help="项目路径", default=".")
parser.add_argument("--config", help="配置文件路径", default="configs/feishu_config.json")
parser.add_argument("--chapter", help="单个章节文件路径")
parser.add_argument("--init", action="store_true", help="初始化配置")
args = parser.parse_args()
sync_tool = FeishuNovelSync(args.config)
if args.init:
# 初始化配置
app_id = input("请输入 App ID: ")
app_secret = input("请输入 App Secret: ")
sync_tool.config.update({
"app_id": app_id,
"app_secret": app_secret
})
sync_tool.save_config()
print("配置已保存")
return
if args.chapter:
# 同步单个章节
sync_tool.sync_chapter(args.chapter)
else:
# 同步整个项目
sync_tool.sync_project(args.project)
if __name__ == "__main__":
main()