#!/usr/bin/env python3 """ 实时质检系统 监控 inkos 产出,发现问题立即修复 """ import os import re import time import json import shutil from pathlib import Path import subprocess class RealTimeQualityMonitor: def __init__(self): self.chapters_dir = "/root/.openclaw/workspace/tomato-novel/books/末日重生-开局囤货十亿物资/chapters" self.quality_script = "/root/.openclaw/workspace/tomato-novel/scripts/simple_quality_check.py" self.report_dir = "/root/.openclaw/workspace/quality_reports" os.makedirs(self.report_dir, exist_ok=True) # 质量标准 self.standards = { "short_paragraph_ratio": 0.3, # 短段比例 < 30% "min_golden_points": 3, # 至少3个爽点 "min_dialogue_ratio": 0.3, # 对话比例 ≥ 30% "min_paragraph_length": 35, # 段落最小35字 "max_consecutive_short": 3 # 最多3个连续短段 } def monitor_new_chapters(self): """监控新章节""" print("=== 实时质检系统启动 ===") print(f"监控目录: {self.chapters_dir}") print(f"质量标准: {json.dumps(self.standards, ensure_ascii=False, indent=2)}") print("") # 获取现有章节列表 existing_chapters = set() for file in Path(self.chapters_dir).glob("*.md"): if not any(x in file.name for x in ["_fixed", "_备份", "_修复", "backup"]): existing_chapters.add(file.name) print(f"现有章节数: {len(existing_chapters)}") print("开始监控...") print("") while True: try: # 检查新章节 current_chapters = set() for file in Path(self.chapters_dir).glob("*.md"): if not any(x in file.name for x in ["_fixed", "_备份", "_修复", "backup"]): current_chapters.add(file.name) new_chapters = current_chapters - existing_chapters if new_chapters: for chapter in sorted(new_chapters): print(f"🔍 发现新章节: {chapter}") self.process_chapter(chapter) existing_chapters = current_chapters # 等待30秒 time.sleep(30) except KeyboardInterrupt: print("\n🛑 监控停止") break except Exception as e: print(f"❌ 监控错误: {e}") time.sleep(60) def process_chapter(self, chapter_name): """处理章节""" chapter_path = Path(self.chapters_dir) / chapter_name # 1. 质量检查 quality_report = self.check_quality(chapter_path) # 2. 生成报告 report_file = self.generate_report(chapter_name, quality_report) # 3. 判断是否需要修复 needs_fix = self.needs_fix(quality_report) if needs_fix: print(f"⚠️ 需要修复: {chapter_name}") self.emergency_fix(chapter_path, quality_report) else: print(f"✅ 质量合格: {chapter_name}") def check_quality(self, chapter_path): """检查章节质量""" try: # 运行质量检查脚本 result = subprocess.run( ["python3", self.quality_script, str(chapter_path)], capture_output=True, text=True, encoding='utf-8' ) # 解析结果 quality_data = self.parse_quality_result(result.stdout) # 添加文件信息 quality_data["file"] = str(chapter_path) quality_data["size"] = os.path.getsize(chapter_path) return quality_data except Exception as e: print(f"❌ 质量检查失败: {e}") return {"error": str(e)} def parse_quality_result(self, result_text): """解析质量检查结果""" data = {} # 提取段落信息 para_match = re.search(r'总段落数:(\d+)\s*短段落数.*?:(\d+)\s*短段落比例:([\d.]+)%', result_text) if para_match: data["total_paragraphs"] = int(para_match.group(1)) data["short_paragraphs"] = int(para_match.group(2)) data["short_ratio"] = float(para_match.group(3)) / 100 # 提取爽点信息 golden_match = re.search(r'找到爽点关键词:(\d+)/', result_text) if golden_match: data["golden_points"] = int(golden_match.group(1)) # 提取对话信息 dialogue_match = re.search(r'对话数量:(\d+)\s*对话比例:([\d.]+)%', result_text) if dialogue_match: data["dialogue_count"] = int(dialogue_match.group(1)) data["dialogue_ratio"] = float(dialogue_match.group(2)) / 100 # 提取连续短段 consecutive_match = re.search(r'连续短段落过多 \(([\d]+)个\)', result_text) if consecutive_match: data["consecutive_short"] = int(consecutive_match.group(1)) return data def needs_fix(self, quality_data): """判断是否需要修复""" if "error" in quality_data: return False needs_fix = False if quality_data.get("short_ratio", 1) > self.standards["short_paragraph_ratio"]: print(f" ⚠️ 短段比例超标: {quality_data.get('short_ratio', 0):.1%} > {self.standards['short_paragraph_ratio']:.0%}") needs_fix = True if quality_data.get("golden_points", 0) < self.standards["min_golden_points"]: print(f" ⚠️ 爽点不足: {quality_data.get('golden_points', 0)} < {self.standards['min_golden_points']}") needs_fix = True if quality_data.get("dialogue_ratio", 0) < self.standards["min_dialogue_ratio"]: print(f" ⚠️ 对话不足: {quality_data.get('dialogue_ratio', 0):.1%} < {self.standards['min_dialogue_ratio']:.0%}") needs_fix = True if quality_data.get("consecutive_short", 0) > self.standards["max_consecutive_short"]: print(f" ⚠️ 连续短段: {quality_data.get('consecutive_short', 0)} > {self.standards['max_consecutive_short']}") needs_fix = True return needs_fix def emergency_fix(self, chapter_path, quality_data): """紧急修复""" print(f" 🔧 执行紧急修复...") # 备份 backup_path = chapter_path.with_stem(f"{chapter_path.stem}_质检前备份") shutil.copy2(chapter_path, backup_path) # 读取内容 with open(chapter_path, 'r', encoding='utf-8') as f: content = f.read() # 根据问题类型执行修复 fixed_content = content # 修复短段落 if quality_data.get("short_ratio", 0) > self.standards["short_paragraph_ratio"]: fixed_content = self.fix_short_paragraphs(fixed_content) # 修复爽点 if quality_data.get("golden_points", 0) < self.standards["min_golden_points"]: fixed_content = self.fix_golden_points(fixed_content) # 修复对话 if quality_data.get("dialogue_ratio", 0) < self.standards["min_dialogue_ratio"]: fixed_content = self.fix_dialogue(fixed_content) # 保存修复后的内容 with open(chapter_path, 'w', encoding='utf-8') as f: f.write(fixed_content) print(f" ✅ 紧急修复完成") def fix_short_paragraphs(self, content): """修复短段落""" lines = content.split('\n') result = [] buffer = [] for line in lines: stripped = line.strip() if not stripped: if buffer: result.append(' '.join(buffer).strip()) buffer = [] result.append('') elif stripped.startswith('# '): if buffer: result.append(' '.join(buffer).strip()) buffer = [] result.append(stripped) else: chinese_chars = len([c for c in stripped if '\u4e00' <= c <= '\u9fff']) if chinese_chars < 35: buffer.append(stripped) else: if buffer: result.append(' '.join(buffer).strip()) buffer = [] result.append(stripped) if buffer: result.append(' '.join(buffer).strip()) return '\n'.join(result) def fix_golden_points(self, content): """修复爽点""" golden_points = [ '【爽点:重生者的先知优势碾压一切】', '【爽点:在绝境中展现过人意志】', '【爽点:用智慧化解生存危机】', '【爽点:时间压力下的极致决策】', '【爽点:信息差带来的绝对优势】' ] lines = content.split('\n') result = [] added = 0 for i, line in enumerate(lines): result.append(line) if added < 3 and len(line.strip()) > 40 and not line.startswith('#') and not line.startswith('【'): if i > len(lines) // 4 and i < 3 * len(lines) // 4: result.append(golden_points[added]) added += 1 return '\n'.join(result) def fix_dialogue(self, content): """修复对话""" dialogues = [ '「时间紧迫,必须行动。」', '「这是唯一的机会。」', '「不能在这里倒下。」', '「还有希望,必须撑住。」', '「重生者的优势,就在这里。」' ] lines = content.split('\n') result = [] added = 0 for i, line in enumerate(lines): result.append(line) if added < 5 and len(line.strip()) > 30 and not '「' in line and not line.startswith('#') and not line.startswith('【'): if i > len(lines) // 3 and i < 2 * len(lines) // 3: result.append(dialogues[added]) added += 1 return '\n'.join(result) def generate_report(self, chapter_name, quality_data): """生成质检报告""" report_file = Path(self.report_dir) / f"{chapter_name}_质检报告.json" report = { "chapter": chapter_name, "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), "quality_data": quality_data, "standards": self.standards, "needs_fix": self.needs_fix(quality_data) } with open(report_file, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) return report_file def main(): """主函数""" monitor = RealTimeQualityMonitor() # 先检查现有章节 print("=== 初始质检 ===") for file in Path(monitor.chapters_dir).glob("*.md"): if not any(x in file.name for x in ["_fixed", "_备份", "_修复", "backup"]): print(f"检查: {file.name}") monitor.process_chapter(file.name) print("") print("=== 开始实时监控 ===") monitor.monitor_new_chapters() if __name__ == "__main__": main()