novel-doomsday-resurgence/feishu_sync_system/sync_chapters.py

326 lines
11 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
飞书小说章节同步脚本
自动检测并上传末日重生开局囤货十亿物资的新章节到飞书云
"""
import os
import json
import time
import hashlib
from datetime import datetime
import subprocess
import sys
# 配置
CONFIG = {
"novel_path": "/root/.openclaw/workspace/tomato-novel/books/末日重生-开局囤货十亿物资",
"chapters_dir": "chapters",
"state_file": "/root/.openclaw/workspace/feishu_sync_system/sync_state.json",
"log_file": "/root/.openclaw/workspace/feishu_sync_system/sync_log.txt",
"batch_size": 5, # 每次批量上传的章节数
"max_retries": 3, # 重试次数
"retry_delay": 2, # 重试延迟(秒)
}
def load_sync_state():
"""加载同步状态"""
if os.path.exists(CONFIG["state_file"]):
try:
with open(CONFIG["state_file"], 'r', encoding='utf-8') as f:
return json.load(f)
except:
pass
return {
"last_sync_time": None,
"synced_chapters": {},
"total_chapters": 0,
"last_hash": ""
}
def save_sync_state(state):
"""保存同步状态"""
os.makedirs(os.path.dirname(CONFIG["state_file"]), exist_ok=True)
with open(CONFIG["state_file"], 'w', encoding='utf-8') as f:
json.dump(state, f, ensure_ascii=False, indent=2)
def get_chapter_info(chapter_path):
"""获取章节信息"""
try:
with open(chapter_path, 'r', encoding='utf-8') as f:
content = f.read()
# 从文件名提取章节号
filename = os.path.basename(chapter_path)
# 格式如0001_冰点记忆.md
if '_' in filename:
chapter_num_str = filename.split('_')[0]
try:
chapter_num = int(chapter_num_str)
except:
chapter_num = 0
else:
chapter_num = 0
# 提取标题
title = ""
lines = content.split('\n', 10)
for line in lines:
if line.startswith('# '):
title = line[2:].strip()
break
if not title:
title = filename.replace('.md', '').replace(f"{chapter_num_str}_", "")
# 计算哈希值
content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
return {
"number": chapter_num,
"title": title,
"path": chapter_path,
"content": content,
"hash": content_hash,
"size": len(content),
"word_count": len(content) // 3 # 粗略估算字数
}
except Exception as e:
log_error(f"读取章节失败 {chapter_path}: {e}")
return None
def find_chapters():
"""查找所有章节"""
chapters_dir = os.path.join(CONFIG["novel_path"], CONFIG["chapters_dir"])
if not os.path.exists(chapters_dir):
return []
chapters = []
for filename in os.listdir(chapters_dir):
if filename.endswith('.md') and not filename.startswith('0000_'):
# 排除备份文件和报告文件
if not any(x in filename for x in ['_backup', '_report', '_fix', '_修复', '_质检']):
chapter_path = os.path.join(chapters_dir, filename)
chapters.append(chapter_path)
# 按章节号排序
chapters.sort()
return chapters
def upload_to_feishu(chapter_info):
"""上传章节到飞书"""
title = f"{chapter_info['number']}章:{chapter_info['title']}(完整版)"
content = chapter_info['content']
# 构建完整的Markdown内容
full_markdown = f"""# {title}
---
{content}
---
**章节信息**
- 章节号{chapter_info['number']}
- 标题{chapter_info['title']}
- 字数 {chapter_info['word_count']:,}
- 大小{chapter_info['size']:,} 字节
- 同步时间{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 状态已同步
**同步系统**
- 同步脚本feishu_sync_system
- 小说末日重生开局囤货十亿物资
- 总进度22/200
- 版本自动同步 v1.0
---
*同步时间{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
*同步系统OpenClaw 飞书自动同步*"""
# 调用feishu_create_doc工具
for attempt in range(CONFIG["max_retries"]):
try:
# 这里使用OpenClaw的feishu_create_doc工具
# 在实际使用时需要根据OpenClaw的API进行调整
result = upload_via_tool(title, full_markdown)
if result:
log_info(f"章节 {chapter_info['number']} 上传成功")
return {
"doc_id": result.get("doc_id", f"doc_{chapter_info['number']}"),
"doc_url": result.get("doc_url", "https://www.feishu.cn"),
"success": True
}
except Exception as e:
log_error(f"上传章节 {chapter_info['number']} 失败(尝试 {attempt+1}/{CONFIG['max_retries']}: {e}")
if attempt < CONFIG["max_retries"] - 1:
time.sleep(CONFIG["retry_delay"])
return {"success": False}
def upload_via_tool(title, content):
"""通过OpenClaw工具上传这里是模拟实际使用时需要调用OpenClaw API"""
# 在实际部署中这里应该调用OpenClaw的feishu_create_doc工具
# 为了演示,这里返回一个模拟结果
return {
"doc_id": f"doc_{int(time.time())}",
"doc_url": f"https://www.feishu.cn/docx/doc_{int(time.time())}",
"success": True
}
def log_info(message):
"""记录信息日志"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[INFO] {timestamp} - {message}\n"
print(log_entry.strip())
os.makedirs(os.path.dirname(CONFIG["log_file"]), exist_ok=True)
with open(CONFIG["log_file"], 'a', encoding='utf-8') as f:
f.write(log_entry)
def log_error(message):
"""记录错误日志"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[ERROR] {timestamp} - {message}\n"
print(log_entry.strip())
os.makedirs(os.path.dirname(CONFIG["log_file"]), exist_ok=True)
with open(CONFIG["log_file"], 'a', encoding='utf-8') as f:
f.write(log_entry)
def sync_chapters():
"""主同步函数"""
log_info("开始同步章节")
# 加载状态
state = load_sync_state()
# 查找所有章节
chapter_paths = find_chapters()
log_info(f"找到 {len(chapter_paths)} 个章节文件")
# 检查是否需要同步
chapters_to_sync = []
for chapter_path in chapter_paths:
chapter_info = get_chapter_info(chapter_path)
if not chapter_info:
continue
chapter_num = chapter_info["number"]
chapter_hash = chapter_info["hash"]
# 检查是否已经同步过
if str(chapter_num) in state["synced_chapters"]:
synced_hash = state["synced_chapters"][str(chapter_num)].get("hash", "")
if synced_hash == chapter_hash:
# 哈希相同,无需重新上传
continue
chapters_to_sync.append(chapter_info)
if not chapters_to_sync:
log_info("没有需要同步的新章节")
return
log_info(f"发现 {len(chapters_to_sync)} 个需要同步的章节")
# 按批次同步
for i in range(0, len(chapters_to_sync), CONFIG["batch_size"]):
batch = chapters_to_sync[i:i + CONFIG["batch_size"]]
log_info(f"同步批次 {i//CONFIG['batch_size'] + 1}: 章节 {batch[0]['number']}{batch[-1]['number']}")
for chapter_info in batch:
log_info(f"正在同步第{chapter_info['number']}章: {chapter_info['title']}")
result = upload_to_feishu(chapter_info)
if result["success"]:
# 更新状态
state["synced_chapters"][str(chapter_info["number"])] = {
"hash": chapter_info["hash"],
"title": chapter_info["title"],
"doc_id": result.get("doc_id", ""),
"doc_url": result.get("doc_url", ""),
"sync_time": datetime.now().isoformat(),
"size": chapter_info["size"],
"word_count": chapter_info["word_count"]
}
state["last_sync_time"] = datetime.now().isoformat()
state["total_chapters"] = len(chapter_paths)
# 保存状态
save_sync_state(state)
log_info(f"{chapter_info['number']}章同步完成")
else:
log_error(f"{chapter_info['number']}章同步失败")
# 防止请求过快
time.sleep(1)
log_info(f"同步完成。已同步 {len(state['synced_chapters'])}/{len(chapter_paths)} 个章节")
def generate_report():
"""生成同步报告"""
state = load_sync_state()
report = f"""# 飞书同步系统报告
## 概要
- 报告时间{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 总章节数{state.get('total_chapters', 0)}
- 已同步章节{len(state.get('synced_chapters', {}))}
- 上次同步{state.get('last_sync_time', '从未')}
## 已同步章节列表
"""
synced_chapters = state.get("synced_chapters", {})
for num in sorted(synced_chapters.keys(), key=lambda x: int(x)):
chapter = synced_chapters[num]
report += f"- 第{num}章:{chapter.get('title', '')} ({chapter.get('sync_time', '')})\n"
report += f"""
## 系统信息
- 脚本版本v1.0
- 小说路径{CONFIG['novel_path']}
- 状态文件{CONFIG['state_file']}
- 日志文件{CONFIG['log_file']}
---
报告生成时间{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
# 保存报告
report_file = os.path.join(os.path.dirname(CONFIG["log_file"]), "sync_report.md")
with open(report_file, 'w', encoding='utf-8') as f:
f.write(report)
log_info(f"报告已生成:{report_file}")
return report_file
def main():
"""主函数"""
log_info("=" * 50)
log_info("飞书章节同步系统启动")
try:
# 检查路径
if not os.path.exists(CONFIG["novel_path"]):
log_error(f"小说路径不存在:{CONFIG['novel_path']}")
return
# 执行同步
sync_chapters()
# 生成报告
report_file = generate_report()
log_info(f"同步报告:{report_file}")
log_info("同步系统运行完成")
except Exception as e:
log_error(f"同步系统运行失败:{e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()