novel-doomsday-resurgence/feishu_sync_system/monitor_new_chapters.py

338 lines
12 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
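"""
New-chapter monitoring script.
Watches whether new chapters have been created for the novel
末日重生开局囤货十亿物资 and triggers a sync when they appear.
"""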
import os
import json
import time
import hashlib
from datetime import datetime
import subprocess
import sys
import threading
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Configuration shared by every function in this script.
CONFIG = {
    # Root directory of the monitored novel.
    "novel_path": "/root/.openclaw/workspace/tomato-novel/books/末日大战",
    # Sub-directory of novel_path that holds the chapter files.
    "chapters_dir": "chapters",
    # Directory handed to the watchdog observer in watch_changes(); it is the
    # same location as novel_path joined with chapters_dir.
    "monitor_dir": "/root/.openclaw/workspace/tomato-novel/books/末日大战/chapters",
    # JSON file read by load_sync_state().
    "state_file": "/root/.openclaw/workspace/feishu_sync_system/sync_state.json",
    # File the logging helpers append to.
    # NOTE(review): unlike state_file, this path has no "workspace" segment -
    # confirm that is intentional.
    "log_file": "/root/.openclaw/feishu_sync_system/monitor_log.txt",
    "check_interval": 300,  # polling interval in seconds (5 minutes)
    # The two entries below are not read anywhere in the visible part of
    # this file.
    "last_check_time": None,
    "new_chapters_found": [],
}
def load_sync_state():
    """Load the persisted sync state from ``CONFIG["state_file"]``.

    Returns:
        dict: The JSON object stored in the state file. When the file does
        not exist, cannot be read, is not valid JSON, or does not hold a
        JSON object, a fresh empty state ``{"synced_chapters": {}}`` is
        returned instead.
    """
    empty_state = {"synced_chapters": {}}
    state_path = CONFIG["state_file"]
    try:
        with open(state_path, 'r', encoding='utf-8') as f:
            state = json.load(f)
    except FileNotFoundError:
        # A missing state file simply means nothing has been synced yet.
        return empty_state
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError. Report the problem instead of hiding it, but keep the
        # best-effort contract of falling back to an empty state.
        log_error(f"读取同步状态失败 {state_path}: {e}")
        return empty_state
    if not isinstance(state, dict):
        # Callers use state.get(...), so anything but a JSON object is unusable.
        log_error(f"同步状态格式无效 {state_path}")
        return empty_state
    return state
def log_info(message):
    """Print an informational message and append it to the monitor log file.

    The entry is formatted as ``[MONITOR] <timestamp> - <message>``, echoed
    to stdout and appended to ``CONFIG["log_file"]``. The log directory is
    created when missing.

    Args:
        message: Text to record.

    Writing the file is best-effort: an ``OSError`` is reported on stderr
    and not propagated, so a logging failure cannot stop the monitor.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[MONITOR] {timestamp} - {message}\n"
    print(log_entry.strip())
    log_path = CONFIG["log_file"]
    try:
        log_dir = os.path.dirname(log_path)
        # dirname is "" for a bare file name, and os.makedirs("") raises.
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        with open(log_path, 'a', encoding='utf-8') as f:
            f.write(log_entry)
    except OSError as e:
        print(f"[MONITOR.ERROR] 写入日志文件失败 {log_path}: {e}", file=sys.stderr)
def log_error(message):
    """Print an error message and append it to the monitor log file.

    The entry is formatted as ``[MONITOR.ERROR] <timestamp> - <message>``,
    echoed to stdout and appended to ``CONFIG["log_file"]``. The log
    directory is created when missing.

    Args:
        message: Text to record.

    This function is called from ``except`` handlers, so writing the file is
    best-effort: an ``OSError`` is reported on stderr and not propagated,
    otherwise a full disk or unwritable directory would turn a handled
    error into a crash.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[MONITOR.ERROR] {timestamp} - {message}\n"
    print(log_entry.strip())
    log_path = CONFIG["log_file"]
    try:
        log_dir = os.path.dirname(log_path)
        # dirname is "" for a bare file name, and os.makedirs("") raises.
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        with open(log_path, 'a', encoding='utf-8') as f:
            f.write(log_entry)
    except OSError as e:
        print(f"[MONITOR.ERROR] 写入日志文件失败 {log_path}: {e}", file=sys.stderr)
def get_chapter_info(chapter_path):
    """Read a chapter file and collect its metadata.

    The chapter number comes from the file-name prefix before the first
    underscore (``0001_冰点记忆.md`` -> ``1``); it is ``0`` when there is no
    underscore or the prefix is not an integer. The title is the first
    ``# `` heading found among the leading lines of the file; when there is
    none, it falls back to the file name without the ``.md`` suffix and
    without the numeric prefix.

    Args:
        chapter_path: Path of the chapter file (read as UTF-8).

    Returns:
        dict | None: A dict with the keys ``number``, ``title``, ``path``,
        ``hash`` (MD5 hex digest of the UTF-8 encoded content), ``size``
        (number of characters), ``word_count`` (``size // 3``) and
        ``modified_time`` (file mtime), or ``None`` when the file cannot be
        read; the failure is reported through ``log_error``.
    """
    try:
        with open(chapter_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # File names look like 0001_冰点记忆.md
        filename = os.path.basename(chapter_path)
        stem = filename[:-len('.md')] if filename.endswith('.md') else filename
        prefix, separator, remainder = stem.partition('_')

        chapter_num = 0
        if separator:
            try:
                chapter_num = int(prefix)
            except ValueError:
                chapter_num = 0

        # Only the leading lines are inspected for a heading.
        title = ""
        for line in content.split('\n', 10):
            if line.startswith('# '):
                title = line[2:].strip()
                break
        if not title:
            # File names without an underscore have no prefix to remove; the
            # stem itself is used, so such chapters are no longer dropped.
            title = remainder if separator else stem

        content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
        return {
            "number": chapter_num,
            "title": title,
            "path": chapter_path,
            "hash": content_hash,
            "size": len(content),
            "word_count": len(content) // 3,
            "modified_time": os.path.getmtime(chapter_path)
        }
    except Exception as e:
        # Best-effort boundary: an unreadable chapter is reported and
        # skipped rather than stopping the monitor.
        log_error(f"读取章节失败 {chapter_path}: {e}")
        return None
def find_new_chapters():
    """Return the chapters that are new or changed since the last sync.

    Chapter files are taken from ``get_chapter_list()`` (sorted by file
    name, with backup/report/fix files excluded), so the result order is
    deterministic. Each chapter is compared with the ``synced_chapters``
    mapping of the sync state, keyed by the chapter number as a string: a
    chapter is reported when its number is unknown or when its content hash
    differs from the stored ``hash``.

    Returns:
        list[dict]: ``get_chapter_info`` dicts of the new or modified
        chapters; empty when the chapter directory does not exist.
    """
    chapters_dir = os.path.join(CONFIG["novel_path"], CONFIG["chapters_dir"])
    if not os.path.exists(chapters_dir):
        log_info(f"章节目录不存在: {chapters_dir}")
        return []

    synced_chapters = load_sync_state().get("synced_chapters", {})

    new_chapters = []
    for filename in get_chapter_list():
        chapter_info = get_chapter_info(os.path.join(chapters_dir, filename))
        if not chapter_info:
            continue
        chapter_num = str(chapter_info["number"])
        synced_entry = synced_chapters.get(chapter_num)
        if not isinstance(synced_entry, dict):
            # Unknown chapter number (or an unusable state entry): treat as new.
            log_info(f"发现新章节: 第{chapter_num}章 - {chapter_info['title']}")
            new_chapters.append(chapter_info)
        elif synced_entry.get("hash", "") != chapter_info["hash"]:
            # Same chapter number, different content.
            log_info(f"章节内容已修改: 第{chapter_num}章 - {chapter_info['title']}")
            new_chapters.append(chapter_info)
    return new_chapters
class ChapterChangeHandler(FileSystemEventHandler):
    """Watchdog event handler that reacts to newly created chapter files.

    Attributes are kept for callers that inspect the handler:
    ``new_chapters`` collects the paths of every chapter file seen by
    ``on_created``; ``last_chapters`` is initialised but not used by this
    class.
    """

    # Name fragments that mark auxiliary files rather than chapters. Matches
    # the list used by find_new_chapters() and get_chapter_list(), so that
    # '_修复' / '_质检' files no longer trigger a sync here.
    _EXCLUDED_MARKERS = ('_backup', '_report', '_fix', '_修复', '_质检')

    def __init__(self):
        self.last_chapters = set()
        self.new_chapters = []
        super().__init__()

    def on_created(self, event):
        """Log a newly created chapter file and trigger a sync.

        Directories, non-``.md`` files, ``0000_``-prefixed files and files
        whose name contains one of the excluded markers are ignored.

        Args:
            event: Watchdog file-system event; ``is_directory`` and
                ``src_path`` are read.
        """
        if event.is_directory or not event.src_path.endswith('.md'):
            return
        filename = os.path.basename(event.src_path)
        if filename.startswith('0000_'):
            return
        if any(marker in filename for marker in self._EXCLUDED_MARKERS):
            return

        log_info(f"检测到新章节文件: {filename}")
        self.new_chapters.append(event.src_path)
        # NOTE(review): the file may still be being written when the creation
        # event fires, so the metadata logged here can be incomplete - confirm.
        chapter_info = get_chapter_info(event.src_path)
        if chapter_info:
            log_info(f"新章节: 第{chapter_info['number']}章 - {chapter_info['title']}")
        trigger_sync()
def check_for_new_chapters():
    """Poll the chapter directory forever and sync when new files appear.

    Every ``CONFIG["check_interval"]`` seconds the current chapter file
    names (from ``get_chapter_list()``, so the same files are excluded as in
    the rest of the script) are compared with the names seen on the previous
    pass. New names are logged and one sync is triggered per pass, since the
    sync script is run without arguments and handles all chapters at once.

    The first pass starts from an empty set, so every existing chapter is
    reported as new. Errors are logged and the loop resumes after 60
    seconds. This function does not return.
    """
    log_info("开始监控新章节")
    last_chapters = set()
    while True:
        try:
            log_info("检查新章节...")
            chapters_dir = os.path.join(CONFIG["novel_path"], CONFIG["chapters_dir"])
            if not os.path.exists(chapters_dir):
                log_info("章节目录不存在,等待创建...")
                time.sleep(CONFIG["check_interval"])
                continue

            current_chapters = set(get_chapter_list())
            # Sorted so the log lists new chapters in file-name order.
            new_chapters = sorted(current_chapters - last_chapters)
            for chapter_file in new_chapters:
                log_info(f"发现新章节: {chapter_file}")
                chapter_info = get_chapter_info(os.path.join(chapters_dir, chapter_file))
                if chapter_info:
                    log_info(f"新章节: 第{chapter_info['number']}章 - {chapter_info['title']}")
            if new_chapters:
                trigger_sync()

            last_chapters = current_chapters
            log_info(f"当前章节数: {len(current_chapters)}")
            time.sleep(CONFIG["check_interval"])
        except Exception as e:
            log_error(f"监控出错: {e}")
            time.sleep(60)  # wait 60 seconds before retrying after an error
def trigger_sync():
    """Run ``sync_chapters.py`` from this script's directory and log the result.

    The script is started with the current Python interpreter and its
    output is captured. A zero exit code logs success plus the captured
    stdout; a non-zero exit code logs the captured stderr as an error. A
    missing script, or any exception while launching it, is logged as an
    error and not raised.
    """
    log_info("触发章节同步")
    try:
        script_path = os.path.join(os.path.dirname(__file__), "sync_chapters.py")
        if not os.path.exists(script_path):
            log_error(f"同步脚本不存在: {script_path}")
            return

        # Run the sync script as a child process and wait for it to finish.
        completed = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
        )
        if completed.returncode != 0:
            log_error(f"同步脚本执行失败: {completed.stderr}")
            return

        log_info("同步脚本执行成功")
        log_info(f"输出:\n{completed.stdout}")
    except Exception as e:
        log_error(f"触发同步失败: {e}")
def watch_changes():
    """Watch ``CONFIG["monitor_dir"]`` with a watchdog observer until interrupted.

    A ``ChapterChangeHandler`` is scheduled recursively on the directory and
    the function then blocks, sleeping in one-second steps. ``Ctrl+C`` stops
    the watch; any other exception is logged. The observer is stopped and
    joined on the way out, but only if its thread was actually started:
    joining a thread that never started raises ``RuntimeError``, which
    previously masked the original start-up failure.
    """
    log_info("启动实时文件监控")
    event_handler = ChapterChangeHandler()
    observer = Observer()
    try:
        observer.schedule(event_handler, CONFIG["monitor_dir"], recursive=True)
        observer.start()
        log_info("文件监控已启动,正在监控章节目录...")
        # Keep the main thread alive while the observer thread does the work.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        log_info("监控停止")
    except Exception as e:
        log_error(f"实时监控失败: {e}")
    finally:
        # Single shutdown path for both the interrupt and the error case.
        if observer.is_alive():
            observer.stop()
            observer.join()
def get_chapter_list():
    """Return the sorted file names of all chapter files.

    A chapter file is an ``.md`` file in the chapter directory whose name
    does not start with ``0000_`` and does not contain any of the
    backup/report/fix markers.

    Returns:
        list[str]: Chapter file names in ascending order; empty when the
        chapter directory does not exist.
    """
    chapters_dir = os.path.join(CONFIG["novel_path"], CONFIG["chapters_dir"])
    if not os.path.exists(chapters_dir):
        return []

    excluded_markers = ['_backup', '_report', '_fix', '_修复', '_质检']

    def is_chapter(name):
        # Same three checks as a plain nested-if filter, expressed as guards.
        if not name.endswith('.md'):
            return False
        if name.startswith('0000_'):
            return False
        return not any(marker in name for marker in excluded_markers)

    return sorted(name for name in os.listdir(chapters_dir) if is_chapter(name))
def monitor_changes():
    """Poll the chapter list forever and sync when new chapter files appear.

    Every ``CONFIG["check_interval"]`` seconds the list from
    ``get_chapter_list()`` is compared with the previous one. Any file name
    that was not present before is logged and one sync is triggered for the
    pass. Detection is based on the names themselves, not on the list
    length, so a chapter added in the same interval in which another one is
    removed or renamed is still picked up.

    When the number of chapters is unchanged the new names are logged as
    updated files, otherwise as new chapters. Chapters that exist at
    start-up are not reported. Errors are logged and the loop resumes after
    60 seconds. This function does not return.
    """
    log_info("启动定期监控")
    last_chapters = get_chapter_list()
    log_info(f"初始章节数: {len(last_chapters)}")
    while True:
        try:
            current_chapters = get_chapter_list()

            # Set lookup keeps the comparison linear in the number of chapters.
            known_chapters = set(last_chapters)
            added_chapters = [name for name in current_chapters if name not in known_chapters]
            if added_chapters:
                same_count = len(current_chapters) == len(last_chapters)
                for chapter_file in added_chapters:
                    if same_count:
                        log_info(f"章节文件更新: {chapter_file}")
                    else:
                        log_info(f"发现新章节: {chapter_file}")
                # The sync script takes no arguments, so one run covers
                # every added file.
                trigger_sync()

            last_chapters = current_chapters
            time.sleep(CONFIG["check_interval"])
        except Exception as e:
            log_error(f"监控出错: {e}")
            time.sleep(60)  # wait 60 seconds before retrying after an error
def main():
    """Entry point: log a start-up banner and run the polling monitor.

    Returns early, after logging an error, when ``CONFIG["novel_path"]``
    does not exist. ``Ctrl+C`` is logged as a user interruption; any other
    exception is logged and its traceback printed.
    """
    separator = "=" * 50
    startup_lines = (
        separator,
        "新章节监控系统启动",
        f"监控目录: {CONFIG['monitor_dir']}",
        f"检查间隔: {CONFIG['check_interval']}",
        separator,
    )
    for line in startup_lines:
        log_info(line)

    try:
        if os.path.exists(CONFIG["novel_path"]):
            # Blocks until interrupted.
            monitor_changes()
        else:
            log_error(f"小说路径不存在: {CONFIG['novel_path']}")
    except KeyboardInterrupt:
        log_info("监控系统被用户中断")
    except Exception as e:
        log_error(f"监控系统运行失败: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()