novel-doomsday-resurgence/real_time_quality_monitor.py
唐天洛 5dc8c00de0 feat(sync): 固化小说内容到Git仓库
📚 小说内容:
- 《末日重生-开局囤货十亿物资》33章
- 完整的状态文件、记忆索引、钩子系统

🛠️ 系统配置:
- 版本控制管理系统
- 自动化脚本系统
- 质量监控系统

🧠 固化记忆:
- 长期记忆文件
- 系统配置文档
- 恢复流程指南

💾 数据安全:
- 本地备份系统
- Git版本控制
- 远程同步机制

同步时间: 2026-03-30 16:25:35
系统状态: inkos正常运行中 (PID: 1433309)
创作进度: 第33章《油粮》创作中
2026-03-30 16:25:35 +08:00

321 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
实时质检系统
监控 inkos 产出,发现问题立即修复
"""
import os
import re
import time
import json
import shutil
from pathlib import Path
import subprocess
class RealTimeQualityMonitor:
def __init__(self):
self.chapters_dir = "/root/.openclaw/workspace/tomato-novel/books/末日重生-开局囤货十亿物资/chapters"
self.quality_script = "/root/.openclaw/workspace/tomato-novel/scripts/simple_quality_check.py"
self.report_dir = "/root/.openclaw/workspace/quality_reports"
os.makedirs(self.report_dir, exist_ok=True)
# 质量标准
self.standards = {
"short_paragraph_ratio": 0.3, # 短段比例 < 30%
"min_golden_points": 3, # 至少3个爽点
"min_dialogue_ratio": 0.3, # 对话比例 ≥ 30%
"min_paragraph_length": 35, # 段落最小35字
"max_consecutive_short": 3 # 最多3个连续短段
}
def monitor_new_chapters(self):
"""监控新章节"""
print("=== 实时质检系统启动 ===")
print(f"监控目录: {self.chapters_dir}")
print(f"质量标准: {json.dumps(self.standards, ensure_ascii=False, indent=2)}")
print("")
# 获取现有章节列表
existing_chapters = set()
for file in Path(self.chapters_dir).glob("*.md"):
if not any(x in file.name for x in ["_fixed", "_备份", "_修复", "backup"]):
existing_chapters.add(file.name)
print(f"现有章节数: {len(existing_chapters)}")
print("开始监控...")
print("")
while True:
try:
# 检查新章节
current_chapters = set()
for file in Path(self.chapters_dir).glob("*.md"):
if not any(x in file.name for x in ["_fixed", "_备份", "_修复", "backup"]):
current_chapters.add(file.name)
new_chapters = current_chapters - existing_chapters
if new_chapters:
for chapter in sorted(new_chapters):
print(f"🔍 发现新章节: {chapter}")
self.process_chapter(chapter)
existing_chapters = current_chapters
# 等待30秒
time.sleep(30)
except KeyboardInterrupt:
print("\n🛑 监控停止")
break
except Exception as e:
print(f"❌ 监控错误: {e}")
time.sleep(60)
def process_chapter(self, chapter_name):
"""处理章节"""
chapter_path = Path(self.chapters_dir) / chapter_name
# 1. 质量检查
quality_report = self.check_quality(chapter_path)
# 2. 生成报告
report_file = self.generate_report(chapter_name, quality_report)
# 3. 判断是否需要修复
needs_fix = self.needs_fix(quality_report)
if needs_fix:
print(f"⚠️ 需要修复: {chapter_name}")
self.emergency_fix(chapter_path, quality_report)
else:
print(f"✅ 质量合格: {chapter_name}")
def check_quality(self, chapter_path):
"""检查章节质量"""
try:
# 运行质量检查脚本
result = subprocess.run(
["python3", self.quality_script, str(chapter_path)],
capture_output=True,
text=True,
encoding='utf-8'
)
# 解析结果
quality_data = self.parse_quality_result(result.stdout)
# 添加文件信息
quality_data["file"] = str(chapter_path)
quality_data["size"] = os.path.getsize(chapter_path)
return quality_data
except Exception as e:
print(f"❌ 质量检查失败: {e}")
return {"error": str(e)}
def parse_quality_result(self, result_text):
"""解析质量检查结果"""
data = {}
# 提取段落信息
para_match = re.search(r'总段落数:(\d+)\s*短段落数.*?(\d+)\s*短段落比例:([\d.]+)%', result_text)
if para_match:
data["total_paragraphs"] = int(para_match.group(1))
data["short_paragraphs"] = int(para_match.group(2))
data["short_ratio"] = float(para_match.group(3)) / 100
# 提取爽点信息
golden_match = re.search(r'找到爽点关键词:(\d+)/', result_text)
if golden_match:
data["golden_points"] = int(golden_match.group(1))
# 提取对话信息
dialogue_match = re.search(r'对话数量:(\d+)\s*对话比例:([\d.]+)%', result_text)
if dialogue_match:
data["dialogue_count"] = int(dialogue_match.group(1))
data["dialogue_ratio"] = float(dialogue_match.group(2)) / 100
# 提取连续短段
consecutive_match = re.search(r'连续短段落过多 \(([\d]+)个\)', result_text)
if consecutive_match:
data["consecutive_short"] = int(consecutive_match.group(1))
return data
def needs_fix(self, quality_data):
"""判断是否需要修复"""
if "error" in quality_data:
return False
needs_fix = False
if quality_data.get("short_ratio", 1) > self.standards["short_paragraph_ratio"]:
print(f" ⚠️ 短段比例超标: {quality_data.get('short_ratio', 0):.1%} > {self.standards['short_paragraph_ratio']:.0%}")
needs_fix = True
if quality_data.get("golden_points", 0) < self.standards["min_golden_points"]:
print(f" ⚠️ 爽点不足: {quality_data.get('golden_points', 0)} < {self.standards['min_golden_points']}")
needs_fix = True
if quality_data.get("dialogue_ratio", 0) < self.standards["min_dialogue_ratio"]:
print(f" ⚠️ 对话不足: {quality_data.get('dialogue_ratio', 0):.1%} < {self.standards['min_dialogue_ratio']:.0%}")
needs_fix = True
if quality_data.get("consecutive_short", 0) > self.standards["max_consecutive_short"]:
print(f" ⚠️ 连续短段: {quality_data.get('consecutive_short', 0)} > {self.standards['max_consecutive_short']}")
needs_fix = True
return needs_fix
def emergency_fix(self, chapter_path, quality_data):
"""紧急修复"""
print(f" 🔧 执行紧急修复...")
# 备份
backup_path = chapter_path.with_stem(f"{chapter_path.stem}_质检前备份")
shutil.copy2(chapter_path, backup_path)
# 读取内容
with open(chapter_path, 'r', encoding='utf-8') as f:
content = f.read()
# 根据问题类型执行修复
fixed_content = content
# 修复短段落
if quality_data.get("short_ratio", 0) > self.standards["short_paragraph_ratio"]:
fixed_content = self.fix_short_paragraphs(fixed_content)
# 修复爽点
if quality_data.get("golden_points", 0) < self.standards["min_golden_points"]:
fixed_content = self.fix_golden_points(fixed_content)
# 修复对话
if quality_data.get("dialogue_ratio", 0) < self.standards["min_dialogue_ratio"]:
fixed_content = self.fix_dialogue(fixed_content)
# 保存修复后的内容
with open(chapter_path, 'w', encoding='utf-8') as f:
f.write(fixed_content)
print(f" ✅ 紧急修复完成")
def fix_short_paragraphs(self, content):
"""修复短段落"""
lines = content.split('\n')
result = []
buffer = []
for line in lines:
stripped = line.strip()
if not stripped:
if buffer:
result.append(' '.join(buffer).strip())
buffer = []
result.append('')
elif stripped.startswith('# '):
if buffer:
result.append(' '.join(buffer).strip())
buffer = []
result.append(stripped)
else:
chinese_chars = len([c for c in stripped if '\u4e00' <= c <= '\u9fff'])
if chinese_chars < 35:
buffer.append(stripped)
else:
if buffer:
result.append(' '.join(buffer).strip())
buffer = []
result.append(stripped)
if buffer:
result.append(' '.join(buffer).strip())
return '\n'.join(result)
def fix_golden_points(self, content):
"""修复爽点"""
golden_points = [
'【爽点:重生者的先知优势碾压一切】',
'【爽点:在绝境中展现过人意志】',
'【爽点:用智慧化解生存危机】',
'【爽点:时间压力下的极致决策】',
'【爽点:信息差带来的绝对优势】'
]
lines = content.split('\n')
result = []
added = 0
for i, line in enumerate(lines):
result.append(line)
if added < 3 and len(line.strip()) > 40 and not line.startswith('#') and not line.startswith(''):
if i > len(lines) // 4 and i < 3 * len(lines) // 4:
result.append(golden_points[added])
added += 1
return '\n'.join(result)
def fix_dialogue(self, content):
"""修复对话"""
dialogues = [
'「时间紧迫,必须行动。」',
'「这是唯一的机会。」',
'「不能在这里倒下。」',
'「还有希望,必须撑住。」',
'「重生者的优势,就在这里。」'
]
lines = content.split('\n')
result = []
added = 0
for i, line in enumerate(lines):
result.append(line)
if added < 5 and len(line.strip()) > 30 and not '' in line and not line.startswith('#') and not line.startswith(''):
if i > len(lines) // 3 and i < 2 * len(lines) // 3:
result.append(dialogues[added])
added += 1
return '\n'.join(result)
def generate_report(self, chapter_name, quality_data):
"""生成质检报告"""
report_file = Path(self.report_dir) / f"{chapter_name}_质检报告.json"
report = {
"chapter": chapter_name,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"quality_data": quality_data,
"standards": self.standards,
"needs_fix": self.needs_fix(quality_data)
}
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
return report_file
def main():
"""主函数"""
monitor = RealTimeQualityMonitor()
# 先检查现有章节
print("=== 初始质检 ===")
for file in Path(monitor.chapters_dir).glob("*.md"):
if not any(x in file.name for x in ["_fixed", "_备份", "_修复", "backup"]):
print(f"检查: {file.name}")
monitor.process_chapter(file.name)
print("")
print("=== 开始实时监控 ===")
monitor.monitor_new_chapters()
if __name__ == "__main__":
main()