#!/usr/bin/env python3
"""inkos quality monitoring script.

Analyzes a chapter text file for writing-quality metrics (paragraph
structure, "golden point" keyword density, emotional arc, dialogue ratio),
reports the problems it finds and suggests fixes.
"""

import os
import json
import re
import sys
from datetime import datetime
from pathlib import Path


def _merge_config(defaults, overrides):
    """Return a copy of *defaults* with *overrides* merged in recursively.

    Nested dict sections are merged key by key so that a partial user config
    keeps the default values for everything it does not mention. A non-dict
    override for a section that is a dict in *defaults* is ignored, because
    the analysis code indexes into those sections.
    """
    merged = dict(defaults)
    for key, value in overrides.items():
        base = merged.get(key)
        if isinstance(base, dict):
            if isinstance(value, dict):
                merged[key] = _merge_config(base, value)
        else:
            merged[key] = value
    return merged


class QualityMonitor:
    """Compute quality metrics for a chapter and derive problems and fixes."""

    def __init__(self, config_path=None):
        """Create a monitor, optionally loading a JSON config file.

        Args:
            config_path: Path to a JSON config file, or None for defaults.
        """
        self.config = self.load_config(config_path)
        self.problems = []
        self.recommendations = []

    def load_config(self, config_path):
        """Load the configuration, falling back to built-in defaults.

        Values from the file are merged over the defaults, so a file that
        only sets some keys still yields a complete configuration.

        Args:
            config_path: Path to a JSON config file, or None/empty.

        Returns:
            The effective configuration dict.
        """
        default_config = {
            "paragraph": {
                "min_length": 35,
                "max_consecutive_short": 3,
                "short_warning_ratio": 0.3,
            },
            "golden_points": {
                "required_per_chapter": 3,
                "keywords": ["打脸", "升级", "收获", "碾压", "反转", "爽点"],
            },
            "emotional_arc": {
                "required": True,
                # NOTE(review): "min_change" is not read anywhere in this file.
                "min_change": 0.3,
            },
        }

        if not config_path:
            return default_config

        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            # Loading stays best-effort, but the fallback is no longer silent.
            print(f"警告:无法加载配置文件 {config_path} ({exc}),使用默认配置",
                  file=sys.stderr)
            return default_config

        if not isinstance(loaded, dict):
            print(f"警告:配置文件 {config_path} 格式无效,使用默认配置",
                  file=sys.stderr)
            return default_config

        return _merge_config(default_config, loaded)

    def analyze_chapter(self, chapter_path):
        """Analyze one chapter file and return all metrics.

        Args:
            chapter_path: Path to the chapter text file (read as UTF-8).

        Returns:
            A dict with keys "chapter", "file", "timestamp", "metrics",
            "problems" and "recommendations"; or ``{"error": ...}`` when the
            path is not an existing regular file.
        """
        if not os.path.isfile(chapter_path):
            return {"error": "文件不存在"}

        with open(chapter_path, 'r', encoding='utf-8') as f:
            content = f.read()

        chapter_num = self.extract_chapter_number(chapter_path)

        results = {
            "chapter": chapter_num,
            "file": os.path.basename(chapter_path),
            "timestamp": datetime.now().isoformat(),
            "metrics": {
                "paragraphs": self.analyze_paragraphs(content),
                "golden_points": self.analyze_golden_points(content, chapter_num),
                "emotional_arc": self.analyze_emotional_arc(content),
                "dialogue": self.analyze_dialogue(content),
            },
        }

        # Problems are derived from the metrics, recommendations from problems.
        problems = self.identify_problems(results)
        results["problems"] = problems
        results["recommendations"] = self.generate_recommendations(
            problems, chapter_num
        )
        return results

    def extract_chapter_number(self, filepath):
        """Extract the chapter number from a file name.

        The base name must contain four digits followed by an underscore.

        Returns:
            The number as an int, or 0 when the pattern is not found.
        """
        filename = os.path.basename(filepath)
        match = re.search(r'(\d{4})_', filename)
        return int(match.group(1)) if match else 0

    def analyze_paragraphs(self, content):
        """Analyze paragraph structure.

        Paragraphs are runs of non-blank lines separated by blank lines;
        paragraphs starting with '#' (headings) are ignored. Length counts
        CJK characters in U+4E00..U+9FFF plus ASCII letters and digits.

        Returns:
            A dict of paragraph statistics; "lengths" holds at most the
            first 20 paragraph lengths.
        """
        paragraphs = []
        current_para = []
        for line in content.split('\n'):
            if line.strip():
                current_para.append(line + ' ')
            elif current_para:
                paragraphs.append(''.join(current_para))
                current_para = []
        if current_para:
            paragraphs.append(''.join(current_para))

        # Drop headings; strip the trailing space added while joining lines.
        filtered_paras = [
            para.strip()
            for para in paragraphs
            if para.strip() and not para.strip().startswith('#')
        ]

        min_length = self.config["paragraph"]["min_length"]
        lengths = []
        short_count = 0
        max_consecutive = 0
        current_streak = 0

        for para in filtered_paras:
            chinese_chars = sum(1 for c in para if '\u4e00' <= c <= '\u9fff')
            other_chars = len(re.findall(r'[a-zA-Z0-9]', para))
            total = chinese_chars + other_chars
            lengths.append(total)

            if total < min_length:
                short_count += 1
                current_streak += 1
                max_consecutive = max(max_consecutive, current_streak)
            else:
                current_streak = 0

        total_paras = len(filtered_paras)
        short_ratio = short_count / total_paras if total_paras > 0 else 0

        return {
            "total_paragraphs": total_paras,
            "short_paragraphs": short_count,
            "short_ratio": round(short_ratio, 3),
            "max_consecutive_short": max_consecutive,
            "avg_length": round(sum(lengths) / len(lengths)) if lengths else 0,
            "min_length": min(lengths) if lengths else 0,
            "max_length": max(lengths) if lengths else 0,
            "lengths": lengths[:20],  # keep only the first 20 for display
        }

    def analyze_golden_points(self, content, chapter_num):
        """Analyze "golden point" keyword density.

        Counts how many distinct configured keywords occur in *content* and
        compares that with the expected minimum for the chapter.

        Returns:
            A dict with the keywords found, their occurrence counts, the
            expected minimum, whether it is met, and context samples.
        """
        keywords = self.config["golden_points"]["keywords"]
        required = self.config["golden_points"]["required_per_chapter"]

        found_keywords = [kw for kw in keywords if kw in content]
        keyword_counts = {kw: content.count(kw) for kw in found_keywords}

        # NOTE(review): chapters 2-3 (and chapter 0, i.e. number not parsed)
        # expect 2, which is lower than the default of 3 for later chapters.
        # Confirm this is the intended rule for the opening chapters.
        if chapter_num == 1:
            expected_min = 3
        elif chapter_num <= 3:
            expected_min = 2
        else:
            expected_min = required

        return {
            "found_keywords": found_keywords,
            "total_found": len(found_keywords),
            "expected_min": expected_min,
            "meets_requirement": len(found_keywords) >= expected_min,
            "keyword_counts": keyword_counts,
            "content_samples": self.extract_golden_point_samples(
                content, found_keywords
            ),
        }

    def extract_golden_point_samples(self, content, keywords, sample_count=3):
        """Extract context samples for the given keywords.

        Only the first three keywords are examined. A line qualifies when it
        contains the keyword and is longer than 20 characters after
        stripping; its context is the line plus one neighbor on each side.

        Returns:
            A list of ``{"keyword": ..., "context": ...}`` dicts.
        """
        samples = []
        lines = content.split('\n')

        for keyword in keywords[:3]:
            for i, line in enumerate(lines):
                if keyword in line and len(line.strip()) > 20:
                    start = max(0, i - 1)
                    end = min(len(lines), i + 2)
                    samples.append({
                        "keyword": keyword,
                        "context": '\n'.join(lines[start:end]),
                    })
                    if len(samples) >= sample_count:
                        break
            if len(samples) >= sample_count:
                break

        return samples

    def analyze_emotional_arc(self, content):
        """Analyze the emotional arc (simplified, keyword based).

        Counts positive/negative/neutral emotion words in the whole text and
        counts emotion switches between lines within the first 50 lines.

        Returns:
            A dict of counts, an intensity ratio, the number of switches and
            "has_emotional_arc" (True when there are at least two switches).
        """
        positive_words = ["兴奋", "开心", "满足", "自信", "希望", "轻松"]
        negative_words = ["紧张", "焦虑", "恐惧", "痛苦", "绝望", "压力"]
        neutral_words = ["平静", "思考", "观察", "计算", "等待"]

        positive_count = sum(content.count(word) for word in positive_words)
        negative_count = sum(content.count(word) for word in negative_words)
        neutral_count = sum(content.count(word) for word in neutral_words)

        total = positive_count + negative_count + neutral_count
        # Intensity = share of non-neutral words among all emotion words.
        intensity = (positive_count + negative_count) / total if total else 0

        emotion_changes = 0
        last_emotion = None
        for line in content.split('\n')[:50]:
            # Precedence per line: positive, then negative, then neutral.
            if any(word in line for word in positive_words):
                line_emotion = "positive"
            elif any(word in line for word in negative_words):
                line_emotion = "negative"
            elif any(word in line for word in neutral_words):
                line_emotion = "neutral"
            else:
                continue  # lines without emotion words do not affect the arc

            if last_emotion and line_emotion != last_emotion:
                emotion_changes += 1
            last_emotion = line_emotion

        return {
            "positive_count": positive_count,
            "negative_count": negative_count,
            "neutral_count": neutral_count,
            "total_emotion_words": total,
            "emotional_intensity": round(intensity, 3),
            "emotion_changes": emotion_changes,
            "has_emotional_arc": emotion_changes >= 2,
        }

    def analyze_dialogue(self, content):
        """Analyze dialogue share using simple quote matching.

        Returns:
            A dict with the number of quoted spans, their share of all
            characters, their average length and up to five samples.
        """
        # NOTE(review): only ASCII double quotes and corner brackets are
        # matched, and '.' does not cross newlines. If chapters use curly
        # quotes, dialogue is under-counted; confirm the intended quote set.
        dialogue_pattern = r'["「](.+?)["」]'
        dialogues = re.findall(dialogue_pattern, content)

        total_chars = len(content)
        dialogue_lengths = [len(d) for d in dialogues]
        dialogue_chars = sum(dialogue_lengths)
        dialogue_ratio = dialogue_chars / total_chars if total_chars > 0 else 0
        avg_dialogue_length = (
            dialogue_chars / len(dialogue_lengths) if dialogues else 0
        )

        return {
            "dialogue_count": len(dialogues),
            "dialogue_ratio": round(dialogue_ratio, 3),
            "avg_dialogue_length": round(avg_dialogue_length, 1),
            "sample_dialogues": dialogues[:5],  # first five as examples
        }

    def identify_problems(self, results):
        """Derive a list of problems from the computed metrics.

        Args:
            results: Dict containing a "metrics" dict as built by
                ``analyze_chapter``.

        Returns:
            A list of problem dicts with "type", "severity", "description"
            and "details".
        """
        problems = []
        metrics = results["metrics"]
        para_config = self.config["paragraph"]

        # Paragraph structure
        para_metrics = metrics["paragraphs"]
        if para_metrics["short_ratio"] > para_config["short_warning_ratio"]:
            problems.append({
                "type": "paragraph_structure",
                "severity": "high",
                "description": f"短段落比例过高:{para_metrics['short_ratio']*100:.1f}%",
                "details": f"共{para_metrics['total_paragraphs']}个段落,其中{para_metrics['short_paragraphs']}个短段落"
            })

        if para_metrics["max_consecutive_short"] > para_config["max_consecutive_short"]:
            problems.append({
                "type": "consecutive_short_paragraphs",
                "severity": "medium",
                "description": f"连续短段落过多:{para_metrics['max_consecutive_short']}个",
                "details": "影响阅读流畅性"
            })

        # Golden points
        golden_metrics = metrics["golden_points"]
        if not golden_metrics["meets_requirement"]:
            problems.append({
                "type": "insufficient_golden_points",
                "severity": "high",
                "description": f"爽点不足:找到{golden_metrics['total_found']}个,需要{golden_metrics['expected_min']}个",
                "details": f"找到的关键词:{', '.join(golden_metrics['found_keywords'])}"
            })

        # Emotional arc
        emotion_metrics = metrics["emotional_arc"]
        if self.config["emotional_arc"]["required"] and not emotion_metrics["has_emotional_arc"]:
            problems.append({
                "type": "flat_emotional_arc",
                "severity": "medium",
                "description": "情绪弧线平坦",
                "details": f"情绪变化次数:{emotion_metrics['emotion_changes']}"
            })

        # Dialogue
        dialogue_metrics = metrics["dialogue"]
        if dialogue_metrics["dialogue_ratio"] < 0.2:
            problems.append({
                "type": "low_dialogue_ratio",
                "severity": "low",
                "description": f"对话比例偏低:{dialogue_metrics['dialogue_ratio']*100:.1f}%",
                "details": "番茄小说建议对话比例在30-40%"
            })

        return problems

    def generate_recommendations(self, problems, chapter_num):
        """Generate fix recommendations for the given problems.

        Only "paragraph_structure", "insufficient_golden_points" and
        "flat_emotional_arc" problems produce a recommendation.

        Returns:
            A list of recommendation dicts.
        """
        # Use the configured threshold so the suggested command matches the
        # value the analysis actually applied (35 with the default config).
        min_length = self.config["paragraph"]["min_length"]
        recommendations = []

        for problem in problems:
            problem_type = problem["type"]
            if problem_type == "paragraph_structure":
                recommendations.append({
                    "action": "merge_short_paragraphs",
                    "priority": "high" if problem["severity"] == "high" else "medium",
                    "description": "合并短段落,提高段落平均长度",
                    "command": f"python3 merge_paragraphs.py --chapter {chapter_num} --min-length {min_length}"
                })
            elif problem_type == "insufficient_golden_points":
                recommendations.append({
                    "action": "add_golden_points",
                    "priority": "high",
                    "description": "增加爽点密度",
                    "suggestions": [
                        "增加一个打脸情节",
                        "展现主角的优势",
                        "设置一个小型反转",
                        "增加资源收获"
                    ]
                })
            elif problem_type == "flat_emotional_arc":
                recommendations.append({
                    "action": "enhance_emotional_arc",
                    "priority": "medium",
                    "description": "增强情绪起伏",
                    "suggestions": [
                        "在章节开头设置紧张情绪",
                        "在中间加入情绪释放点",
                        "在结尾设置情绪钩子"
                    ]
                })

        return recommendations

    def generate_report(self, results, output_path=None):
        """Build the quality report and optionally write it as JSON.

        Args:
            results: Result dict as returned by ``analyze_chapter``.
            output_path: When given, the report is written there as UTF-8
                JSON.

        Returns:
            The report dict.
        """
        report = {
            "summary": {
                "chapter": results["chapter"],
                "file": results["file"],
                "timestamp": results["timestamp"],
                "problem_count": len(results["problems"]),
                "recommendation_count": len(results["recommendations"])
            },
            "metrics": results["metrics"],
            "problems": results["problems"],
            "recommendations": results["recommendations"]
        }

        if output_path:
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(report, f, ensure_ascii=False, indent=2)

        return report

    def print_summary(self, report):
        """Print a human-readable summary of *report* to stdout."""
        metrics = report['metrics']

        print(f"\n{'='*60}")
        print(f"章节质量报告 - 第{report['summary']['chapter']}章")
        print(f"{'='*60}")

        print("\n📊 指标概览:")
        print(f" 段落总数:{metrics['paragraphs']['total_paragraphs']}")
        print(f" 短段落比例:{metrics['paragraphs']['short_ratio']*100:.1f}%")
        print(f" 爽点数量:{metrics['golden_points']['total_found']}")
        print(f" 情绪变化:{metrics['emotional_arc']['emotion_changes']}次")
        print(f" 对话比例:{metrics['dialogue']['dialogue_ratio']*100:.1f}%")

        if report['problems']:
            print(f"\n⚠️ 发现问题({len(report['problems'])}个):")
            for i, problem in enumerate(report['problems'], 1):
                print(f" {i}. [{problem['severity'].upper()}] {problem['description']}")

        if report['recommendations']:
            print(f"\n💡 修复建议({len(report['recommendations'])}条):")
            for i, rec in enumerate(report['recommendations'], 1):
                print(f" {i}. [{rec['priority']}] {rec['description']}")


def main():
    """Command-line entry point.

    Usage: ``quality_monitor.py <chapter file> [config file]``.

    Exit codes: 0 when no high-severity problem was found, 1 for a usage
    error or when a high-severity problem was found, 2 when the chapter
    file could not be analyzed.
    """
    if len(sys.argv) < 2:
        print("用法:python quality_monitor.py <章节文件路径> [配置文件路径]")
        sys.exit(1)

    chapter_path = sys.argv[1]
    config_path = sys.argv[2] if len(sys.argv) > 2 else None

    monitor = QualityMonitor(config_path)
    results = monitor.analyze_chapter(chapter_path)

    # analyze_chapter signals failure with an "error" key; without this
    # check the code below would fail with a KeyError on results['chapter'].
    if "error" in results:
        print(f"错误:{results['error']}:{chapter_path}", file=sys.stderr)
        sys.exit(2)

    # The report file is written to the current working directory.
    report_file = f"quality_report_chapter{results['chapter']}.json"
    report = monitor.generate_report(results, report_file)

    monitor.print_summary(report)
    print(f"\n📄 完整报告已保存到:{report_file}")

    # A non-zero exit code signals that a high-severity problem exists.
    has_high = any(p["severity"] == "high" for p in results["problems"])
    sys.exit(1 if has_high else 0)


if __name__ == "__main__":
    main()