novel-doomsday-resurgence/real_time_quality_monitor.py

321 lines
12 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
实时质检系统
监控 inkos 产出发现问题立即修复
"""
import os
import re
import time
import json
import shutil
from pathlib import Path
import subprocess
class RealTimeQualityMonitor:
def __init__(self):
self.chapters_dir = "/root/.openclaw/workspace/tomato-novel/books/末日重生-开局囤货十亿物资/chapters"
self.quality_script = "/root/.openclaw/workspace/tomato-novel/scripts/simple_quality_check.py"
self.report_dir = "/root/.openclaw/workspace/quality_reports"
os.makedirs(self.report_dir, exist_ok=True)
# 质量标准
self.standards = {
"short_paragraph_ratio": 0.3, # 短段比例 < 30%
"min_golden_points": 3, # 至少3个爽点
"min_dialogue_ratio": 0.3, # 对话比例 ≥ 30%
"min_paragraph_length": 35, # 段落最小35字
"max_consecutive_short": 3 # 最多3个连续短段
}
def monitor_new_chapters(self):
"""监控新章节"""
print("=== 实时质检系统启动 ===")
print(f"监控目录: {self.chapters_dir}")
print(f"质量标准: {json.dumps(self.standards, ensure_ascii=False, indent=2)}")
print("")
# 获取现有章节列表
existing_chapters = set()
for file in Path(self.chapters_dir).glob("*.md"):
if not any(x in file.name for x in ["_fixed", "_备份", "_修复", "backup"]):
existing_chapters.add(file.name)
print(f"现有章节数: {len(existing_chapters)}")
print("开始监控...")
print("")
while True:
try:
# 检查新章节
current_chapters = set()
for file in Path(self.chapters_dir).glob("*.md"):
if not any(x in file.name for x in ["_fixed", "_备份", "_修复", "backup"]):
current_chapters.add(file.name)
new_chapters = current_chapters - existing_chapters
if new_chapters:
for chapter in sorted(new_chapters):
print(f"🔍 发现新章节: {chapter}")
self.process_chapter(chapter)
existing_chapters = current_chapters
# 等待30秒
time.sleep(30)
except KeyboardInterrupt:
print("\n🛑 监控停止")
break
except Exception as e:
print(f"❌ 监控错误: {e}")
time.sleep(60)
def process_chapter(self, chapter_name):
"""处理章节"""
chapter_path = Path(self.chapters_dir) / chapter_name
# 1. 质量检查
quality_report = self.check_quality(chapter_path)
# 2. 生成报告
report_file = self.generate_report(chapter_name, quality_report)
# 3. 判断是否需要修复
needs_fix = self.needs_fix(quality_report)
if needs_fix:
print(f"⚠️ 需要修复: {chapter_name}")
self.emergency_fix(chapter_path, quality_report)
else:
print(f"✅ 质量合格: {chapter_name}")
def check_quality(self, chapter_path):
"""检查章节质量"""
try:
# 运行质量检查脚本
result = subprocess.run(
["python3", self.quality_script, str(chapter_path)],
capture_output=True,
text=True,
encoding='utf-8'
)
# 解析结果
quality_data = self.parse_quality_result(result.stdout)
# 添加文件信息
quality_data["file"] = str(chapter_path)
quality_data["size"] = os.path.getsize(chapter_path)
return quality_data
except Exception as e:
print(f"❌ 质量检查失败: {e}")
return {"error": str(e)}
def parse_quality_result(self, result_text):
"""解析质量检查结果"""
data = {}
# 提取段落信息
para_match = re.search(r'总段落数:(\d+)\s*短段落数.*?(\d+)\s*短段落比例:([\d.]+)%', result_text)
if para_match:
data["total_paragraphs"] = int(para_match.group(1))
data["short_paragraphs"] = int(para_match.group(2))
data["short_ratio"] = float(para_match.group(3)) / 100
# 提取爽点信息
golden_match = re.search(r'找到爽点关键词:(\d+)/', result_text)
if golden_match:
data["golden_points"] = int(golden_match.group(1))
# 提取对话信息
dialogue_match = re.search(r'对话数量:(\d+)\s*对话比例:([\d.]+)%', result_text)
if dialogue_match:
data["dialogue_count"] = int(dialogue_match.group(1))
data["dialogue_ratio"] = float(dialogue_match.group(2)) / 100
# 提取连续短段
consecutive_match = re.search(r'连续短段落过多 \(([\d]+)个\)', result_text)
if consecutive_match:
data["consecutive_short"] = int(consecutive_match.group(1))
return data
def needs_fix(self, quality_data):
"""判断是否需要修复"""
if "error" in quality_data:
return False
needs_fix = False
if quality_data.get("short_ratio", 1) > self.standards["short_paragraph_ratio"]:
print(f" ⚠️ 短段比例超标: {quality_data.get('short_ratio', 0):.1%} > {self.standards['short_paragraph_ratio']:.0%}")
needs_fix = True
if quality_data.get("golden_points", 0) < self.standards["min_golden_points"]:
print(f" ⚠️ 爽点不足: {quality_data.get('golden_points', 0)} < {self.standards['min_golden_points']}")
needs_fix = True
if quality_data.get("dialogue_ratio", 0) < self.standards["min_dialogue_ratio"]:
print(f" ⚠️ 对话不足: {quality_data.get('dialogue_ratio', 0):.1%} < {self.standards['min_dialogue_ratio']:.0%}")
needs_fix = True
if quality_data.get("consecutive_short", 0) > self.standards["max_consecutive_short"]:
print(f" ⚠️ 连续短段: {quality_data.get('consecutive_short', 0)} > {self.standards['max_consecutive_short']}")
needs_fix = True
return needs_fix
def emergency_fix(self, chapter_path, quality_data):
"""紧急修复"""
print(f" 🔧 执行紧急修复...")
# 备份
backup_path = chapter_path.with_stem(f"{chapter_path.stem}_质检前备份")
shutil.copy2(chapter_path, backup_path)
# 读取内容
with open(chapter_path, 'r', encoding='utf-8') as f:
content = f.read()
# 根据问题类型执行修复
fixed_content = content
# 修复短段落
if quality_data.get("short_ratio", 0) > self.standards["short_paragraph_ratio"]:
fixed_content = self.fix_short_paragraphs(fixed_content)
# 修复爽点
if quality_data.get("golden_points", 0) < self.standards["min_golden_points"]:
fixed_content = self.fix_golden_points(fixed_content)
# 修复对话
if quality_data.get("dialogue_ratio", 0) < self.standards["min_dialogue_ratio"]:
fixed_content = self.fix_dialogue(fixed_content)
# 保存修复后的内容
with open(chapter_path, 'w', encoding='utf-8') as f:
f.write(fixed_content)
print(f" ✅ 紧急修复完成")
def fix_short_paragraphs(self, content):
"""修复短段落"""
lines = content.split('\n')
result = []
buffer = []
for line in lines:
stripped = line.strip()
if not stripped:
if buffer:
result.append(' '.join(buffer).strip())
buffer = []
result.append('')
elif stripped.startswith('# '):
if buffer:
result.append(' '.join(buffer).strip())
buffer = []
result.append(stripped)
else:
chinese_chars = len([c for c in stripped if '\u4e00' <= c <= '\u9fff'])
if chinese_chars < 35:
buffer.append(stripped)
else:
if buffer:
result.append(' '.join(buffer).strip())
buffer = []
result.append(stripped)
if buffer:
result.append(' '.join(buffer).strip())
return '\n'.join(result)
def fix_golden_points(self, content):
"""修复爽点"""
golden_points = [
'【爽点:重生者的先知优势碾压一切】',
'【爽点:在绝境中展现过人意志】',
'【爽点:用智慧化解生存危机】',
'【爽点:时间压力下的极致决策】',
'【爽点:信息差带来的绝对优势】'
]
lines = content.split('\n')
result = []
added = 0
for i, line in enumerate(lines):
result.append(line)
if added < 3 and len(line.strip()) > 40 and not line.startswith('#') and not line.startswith(''):
if i > len(lines) // 4 and i < 3 * len(lines) // 4:
result.append(golden_points[added])
added += 1
return '\n'.join(result)
def fix_dialogue(self, content):
"""修复对话"""
dialogues = [
'「时间紧迫,必须行动。」',
'「这是唯一的机会。」',
'「不能在这里倒下。」',
'「还有希望,必须撑住。」',
'「重生者的优势,就在这里。」'
]
lines = content.split('\n')
result = []
added = 0
for i, line in enumerate(lines):
result.append(line)
if added < 5 and len(line.strip()) > 30 and not '' in line and not line.startswith('#') and not line.startswith(''):
if i > len(lines) // 3 and i < 2 * len(lines) // 3:
result.append(dialogues[added])
added += 1
return '\n'.join(result)
def generate_report(self, chapter_name, quality_data):
"""生成质检报告"""
report_file = Path(self.report_dir) / f"{chapter_name}_质检报告.json"
report = {
"chapter": chapter_name,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"quality_data": quality_data,
"standards": self.standards,
"needs_fix": self.needs_fix(quality_data)
}
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
return report_file
def main():
"""主函数"""
monitor = RealTimeQualityMonitor()
# 先检查现有章节
print("=== 初始质检 ===")
for file in Path(monitor.chapters_dir).glob("*.md"):
if not any(x in file.name for x in ["_fixed", "_备份", "_修复", "backup"]):
print(f"检查: {file.name}")
monitor.process_chapter(file.name)
print("")
print("=== 开始实时监控 ===")
monitor.monitor_new_chapters()
if __name__ == "__main__":
main()