#!/usr/bin/env python3
"""New-chapter monitor.

Watches the chapter directory configured in ``CONFIG`` and runs the sibling
``sync_chapters.py`` script whenever a new chapter file shows up.

Three detection strategies are provided:

* ``monitor_changes``  - polling by file name (the one ``main`` uses);
* ``check_for_new_chapters`` - polling that treats every existing chapter as
  new on its first pass;
* ``watch_changes`` - event driven, requires the optional ``watchdog``
  package.

``find_new_chapters`` additionally compares content hashes against the sync
state file, so it can also report chapters that were edited in place.
"""

import os
import json
import time
import hashlib
from datetime import datetime
import subprocess
import sys
import threading
from typing import Any, Dict, List, Optional

try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
except ImportError:
    # watchdog is only needed by watch_changes(); the polling monitors used
    # by main() must keep working without it.
    Observer = None
    FileSystemEventHandler = object

# Configuration
# NOTE(review): "log_file" lives under /root/.openclaw/ while "state_file"
# lives under /root/.openclaw/workspace/ - confirm that this is intentional.
CONFIG = {
    "novel_path": "/root/.openclaw/workspace/tomato-novel/books/末日大战",
    "chapters_dir": "chapters",
    "monitor_dir": "/root/.openclaw/workspace/tomato-novel/books/末日大战/chapters",
    "state_file": "/root/.openclaw/workspace/feishu_sync_system/sync_state.json",
    "log_file": "/root/.openclaw/feishu_sync_system/monitor_log.txt",
    "check_interval": 300,  # polling interval in seconds (5 minutes)
    "last_check_time": None,
    "new_chapters_found": [],
}

# File-name fragments that mark backups, reports and fix/QA artefacts rather
# than real chapters.
_EXCLUDED_MARKERS = ('_backup', '_report', '_fix', '_修复', '_质检')


def _chapters_dir() -> str:
    """Return the chapter directory derived from ``CONFIG``."""
    return os.path.join(CONFIG["novel_path"], CONFIG["chapters_dir"])


def _is_chapter_file(filename: str) -> bool:
    """Return True when *filename* looks like a real chapter file.

    A chapter is a ``.md`` file that does not start with ``0000_`` and does
    not contain any of the ``_EXCLUDED_MARKERS`` fragments.
    """
    if not filename.endswith('.md') or filename.startswith('0000_'):
        return False
    return not any(marker in filename for marker in _EXCLUDED_MARKERS)


def _write_log(tag: str, message: str) -> None:
    """Print a timestamped entry and append it to ``CONFIG["log_file"]``.

    Failing to write the log file is reported on stderr instead of raised,
    so that a logging problem cannot terminate the monitoring loops.
    """
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    log_entry = f"[{tag}] {timestamp} - {message}\n"
    print(log_entry.strip())

    log_path = CONFIG["log_file"]
    log_dir = os.path.dirname(log_path)
    try:
        # os.makedirs('') raises, so skip it for a bare file name.
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        with open(log_path, 'a', encoding='utf-8') as f:
            f.write(log_entry)
    except OSError as e:
        print(f"[MONITOR.ERROR] {timestamp} - 写入日志失败: {e}", file=sys.stderr)


def log_info(message):
    """Record an informational log entry."""
    _write_log("MONITOR", message)


def log_error(message):
    """Record an error log entry."""
    _write_log("MONITOR.ERROR", message)


def load_sync_state():
    """Load the sync state from ``CONFIG["state_file"]``.

    Returns:
        The parsed JSON object, or ``{"synced_chapters": {}}`` when the file
        is missing, unreadable, not valid JSON, or not a JSON object.
    """
    default_state = {"synced_chapters": {}}
    try:
        with open(CONFIG["state_file"], 'r', encoding='utf-8') as f:
            state = json.load(f)
    except FileNotFoundError:
        # No state yet is the normal first-run situation; stay quiet.
        return default_state
    except (OSError, ValueError) as e:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        log_error(f"读取同步状态失败 {CONFIG['state_file']}: {e}")
        return default_state

    if not isinstance(state, dict):
        log_error(f"同步状态格式无效: {CONFIG['state_file']}")
        return default_state
    return state


def get_chapter_info(chapter_path):
    """Collect metadata about one chapter file.

    Args:
        chapter_path: Path of a chapter file, expected to be named like
            ``0001_冰点记忆.md``.

    Returns:
        A dict with ``number``, ``title``, ``path``, ``hash``, ``size``,
        ``word_count`` and ``modified_time``, or ``None`` when the file
        cannot be read (the failure is logged).
    """
    try:
        with open(chapter_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Chapter number is the text before the first underscore; anything
        # non-numeric (or a name without an underscore) counts as 0.
        filename = os.path.basename(chapter_path)
        prefix, separator, _ = filename.partition('_')
        chapter_num = 0
        if separator:
            try:
                chapter_num = int(prefix)
            except ValueError:
                chapter_num = 0

        # Title is the first "# " heading within the first 11 lines. The
        # slice drops the unsplit remainder that str.split leaves in its
        # last piece, so the body can never leak into the title.
        title = ""
        for line in content.split('\n', 11)[:11]:
            if line.startswith('# '):
                title = line[2:].strip()
                break

        if not title:
            # Fall back to the file name without extension and number prefix.
            stem = filename[:-len('.md')] if filename.endswith('.md') else filename
            title = stem[len(prefix) + 1:] if separator else stem

        # MD5 is used purely for change detection, not for security.
        content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()

        return {
            "number": chapter_num,
            "title": title,
            "path": chapter_path,
            "hash": content_hash,
            "size": len(content),
            # NOTE(review): characters // 3 looks like a rough estimate -
            # confirm what consumers expect here.
            "word_count": len(content) // 3,
            "modified_time": os.path.getmtime(chapter_path),
        }
    except (OSError, ValueError) as e:
        log_error(f"读取章节失败 {chapter_path}: {e}")
        return None


def find_new_chapters():
    """Find chapters that are new or changed relative to the sync state.

    A chapter is reported when its number is missing from the state's
    ``synced_chapters`` mapping or when its stored hash differs from the
    current content hash.

    Returns:
        A list of chapter-info dicts (see ``get_chapter_info``), ordered by
        file name. Empty when the chapter directory does not exist.
    """
    chapters_dir = _chapters_dir()
    if not os.path.isdir(chapters_dir):
        log_info(f"章节目录不存在: {chapters_dir}")
        return []

    synced_chapters = load_sync_state().get("synced_chapters", {})
    if not isinstance(synced_chapters, dict):
        synced_chapters = {}

    # Sorted so that the result order does not depend on the file system.
    chapter_paths = [
        os.path.join(chapters_dir, filename)
        for filename in sorted(os.listdir(chapters_dir))
        if _is_chapter_file(filename)
    ]

    new_chapters = []
    for chapter_path in chapter_paths:
        chapter_info = get_chapter_info(chapter_path)
        if not chapter_info:
            continue

        # State keys are chapter numbers rendered as strings.
        chapter_num = str(chapter_info["number"])

        if chapter_num not in synced_chapters:
            new_chapters.append(chapter_info)
            log_info(f"发现新章节: 第{chapter_num}章 - {chapter_info['title']}")
            continue

        entry = synced_chapters[chapter_num]
        synced_hash = entry.get("hash", "") if isinstance(entry, dict) else ""
        if synced_hash != chapter_info["hash"]:
            log_info(f"章节内容已修改: 第{chapter_num}章 - {chapter_info['title']}")
            new_chapters.append(chapter_info)

    return new_chapters


class ChapterChangeHandler(FileSystemEventHandler):
    """File-system event handler that triggers a sync for new chapter files."""

    def __init__(self):
        self.last_chapters = set()
        # Paths of every chapter file seen by on_created, in arrival order.
        self.new_chapters = []
        super().__init__()

    def on_created(self, event):
        """Handle a creation event; non-chapter files and directories are ignored."""
        if event.is_directory:
            return

        filename = os.path.basename(event.src_path)
        if not _is_chapter_file(filename):
            return

        log_info(f"检测到新章节文件: {filename}")
        self.new_chapters.append(event.src_path)

        chapter_info = get_chapter_info(event.src_path)
        if chapter_info:
            log_info(f"新章节: 第{chapter_info['number']}章 - {chapter_info['title']}")

        trigger_sync()


def check_for_new_chapters():
    """Poll the chapter directory forever and sync when new files appear.

    The known-files set starts empty, so every chapter that already exists
    is reported as new on the first pass. Never returns.
    """
    log_info("开始监控新章节")

    last_chapters = set()

    while True:
        try:
            log_info("检查新章节...")

            chapters_dir = _chapters_dir()
            if not os.path.isdir(chapters_dir):
                log_info("章节目录不存在,等待创建...")
                time.sleep(CONFIG["check_interval"])
                continue

            current_chapters = {
                filename
                for filename in os.listdir(chapters_dir)
                if _is_chapter_file(filename)
            }

            new_chapters = current_chapters - last_chapters
            if new_chapters:
                for chapter_file in sorted(new_chapters):
                    log_info(f"发现新章节: {chapter_file}")

                    chapter_path = os.path.join(chapters_dir, chapter_file)
                    chapter_info = get_chapter_info(chapter_path)
                    if chapter_info:
                        log_info(f"新章节: 第{chapter_info['number']}章 - {chapter_info['title']}")

                # One sync run per batch of new files.
                trigger_sync()

            last_chapters = current_chapters
            log_info(f"当前章节数: {len(current_chapters)}")

            time.sleep(CONFIG["check_interval"])

        except Exception as e:
            log_error(f"监控出错: {e}")
            time.sleep(60)  # back off for 60 seconds before retrying


def trigger_sync():
    """Run ``sync_chapters.py`` (located next to this file) and log the outcome.

    The script is executed with the current interpreter and this call blocks
    until it finishes. Failures are logged, never raised.
    """
    log_info("触发章节同步")

    try:
        script_path = os.path.join(os.path.dirname(__file__), "sync_chapters.py")

        if not os.path.exists(script_path):
            log_error(f"同步脚本不存在: {script_path}")
            return

        result = subprocess.run(
            [sys.executable, script_path],
            capture_output=True,
            text=True,
        )

        if result.returncode == 0:
            log_info("同步脚本执行成功")
            log_info(f"输出:\n{result.stdout}")
        else:
            log_error(f"同步脚本执行失败: {result.stderr}")

    except Exception as e:
        log_error(f"触发同步失败: {e}")


def watch_changes():
    """Watch ``CONFIG["monitor_dir"]`` with watchdog until interrupted.

    Requires the optional ``watchdog`` package; when it is not installed an
    error is logged and the function returns immediately.
    """
    log_info("启动实时文件监控")

    if Observer is None:
        log_error("实时监控失败: 未安装 watchdog")
        return

    event_handler = ChapterChangeHandler()
    observer = Observer()
    started = False

    try:
        observer.schedule(event_handler, CONFIG["monitor_dir"], recursive=True)
        observer.start()
        started = True

        log_info("文件监控已启动,正在监控章节目录...")

        # Keep the main thread alive while the observer thread works.
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        log_info("监控停止")
    except Exception as e:
        log_error(f"实时监控失败: {e}")
    finally:
        # Joining a thread that never started raises RuntimeError, so only
        # shut the observer down when start() actually succeeded.
        if started:
            observer.stop()
            observer.join()


def get_chapter_list():
    """Return the sorted file names of all chapters currently on disk.

    Returns an empty list when the chapter directory does not exist.
    """
    chapters_dir = _chapters_dir()
    if not os.path.isdir(chapters_dir):
        return []

    return sorted(
        filename
        for filename in os.listdir(chapters_dir)
        if _is_chapter_file(filename)
    )


def monitor_changes():
    """Poll the chapter list forever and sync whenever new file names appear.

    Chapters present at start-up are taken as the baseline. Detection is by
    file name only: edits to an existing chapter are not noticed here (see
    ``find_new_chapters`` for hash-based detection). Never returns.
    """
    log_info("启动定期监控")

    last_chapters = get_chapter_list()
    log_info(f"初始章节数: {len(last_chapters)}")

    while True:
        try:
            current_chapters = get_chapter_list()

            # Compare names rather than counts so that a file added while
            # another one was removed or renamed is still picked up.
            known = set(last_chapters)
            added = [name for name in current_chapters if name not in known]

            if added:
                for chapter_file in added:
                    log_info(f"发现新章节: {chapter_file}")

                trigger_sync()

            last_chapters = current_chapters
            time.sleep(CONFIG["check_interval"])

        except Exception as e:
            log_error(f"监控出错: {e}")
            time.sleep(60)  # back off for 60 seconds before retrying


def main():
    """Entry point: log the configuration and start the polling monitor."""
    log_info("=" * 50)
    log_info("新章节监控系统启动")
    log_info(f"监控目录: {CONFIG['monitor_dir']}")
    log_info(f"检查间隔: {CONFIG['check_interval']}秒")
    log_info("=" * 50)

    try:
        if not os.path.exists(CONFIG["novel_path"]):
            log_error(f"小说路径不存在: {CONFIG['novel_path']}")
            return

        monitor_changes()

    except KeyboardInterrupt:
        log_info("监控系统被用户中断")
    except Exception as e:
        log_error(f"监控系统运行失败: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()