#!/usr/bin/env python3 """ 《末日重生》项目定时任务管理系统 提供完整的定时同步、版本管理、监控告警功能 """ import os import sys import json import time import subprocess from datetime import datetime, timedelta from pathlib import Path import shutil import logging from typing import Dict, List, Optional, Tuple # 项目配置 PROJECT_DIR = Path("/root/.openclaw/workspace/projects/末日重生_囤货") CONFIG_DIR = PROJECT_DIR / "config" LOG_DIR = PROJECT_DIR / "logs" TOOLS_DIR = PROJECT_DIR / "tools" BACKUP_DIR = PROJECT_DIR / "backups" # 确保目录存在 for dir_path in [CONFIG_DIR, LOG_DIR, BACKUP_DIR]: dir_path.mkdir(parents=True, exist_ok=True) class Scheduler: """定时任务管理器""" def __init__(self): self.config = self.load_config() self.setup_logging() def load_config(self) -> Dict: """加载配置文件""" config_path = CONFIG_DIR / "cron_schedule.json" if config_path.exists(): with open(config_path, 'r', encoding='utf-8') as f: return json.load(f) else: # 默认配置 return { "schedules": { "auto_commit": { "enabled": True, "interval": "30min", "cron_expr": "*/30 * * * *" }, "daily_backup": { "enabled": True, "time": "03:00" }, "status_report": { "enabled": True, "time": "01:00" }, "weekly_cleanup": { "enabled": True, "day": "sunday", "time": "02:00" } } } def setup_logging(self): """配置日志系统""" log_file = LOG_DIR / f"schedule_{datetime.now().strftime('%Y%m')}.log" logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler(log_file, encoding='utf-8'), logging.StreamHandler(sys.stdout) ] ) self.logger = logging.getLogger(__name__) def run_auto_commit(self): """执行自动提交任务""" self.logger.info("开始自动提交任务...") try: # 运行Git版本管理脚本 script_path = TOOLS_DIR / "git_version_manager.sh" if not script_path.exists(): self.logger.error(f"脚本不存在: {script_path}") return False # 检查是否有更改 result = subprocess.run( ["git", "status", "--porcelain"], cwd=PROJECT_DIR, capture_output=True, text=True, timeout=30 ) if result.stdout.strip(): # 有未提交的更改,执行提交 commit_message = f"定时更新: {datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')}" self.logger.info(f"检测到未提交的更改,提交信息: {commit_message}") # 执行提交 commit_result = subprocess.run( [str(script_path), "commit"], cwd=PROJECT_DIR, capture_output=True, text=True, timeout=60 ) if commit_result.returncode == 0: self.logger.info("自动提交成功") return True else: self.logger.error(f"自动提交失败: {commit_result.stderr}") return False else: self.logger.info("没有未提交的更改,跳过提交") return True except subprocess.TimeoutExpired: self.logger.error("自动提交任务超时") return False except Exception as e: self.logger.error(f"自动提交任务异常: {e}") return False def run_backup_task(self, backup_type: str = "full"): """执行备份任务""" self.logger.info(f"开始备份任务,类型: {backup_type}") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_filename = f"backup_{backup_type}_{timestamp}.tar.gz" backup_file = BACKUP_DIR / backup_filename try: # 根据备份类型执行备份 if backup_type == "full": # 完整备份 backup_items = [ "chapters", "outlines", "assets", "tools", "config", "progress" ] backup_cmd = ["tar", "czf", str(backup_file)] + backup_items elif backup_type == "chapters": # 章节备份 backup_cmd = ["tar", "czf", str(backup_file), "chapters"] elif backup_type == "git": # Git包备份 backup_file = BACKUP_DIR / f"git_bundle_{timestamp}.bundle" backup_cmd = ["git", "bundle", "create", str(backup_file), "--all"] else: self.logger.error(f"未知的备份类型: {backup_type}") return False # 执行备份命令 result = subprocess.run( backup_cmd, cwd=PROJECT_DIR, capture_output=True, text=True, timeout=300 ) if result.returncode == 0: backup_size = backup_file.stat().st_size / (1024 * 1024) # MB self.logger.info(f"备份成功: {backup_filename} ({backup_size:.2f} MB)") # 清理旧备份 self.cleanup_old_backups() return True else: self.logger.error(f"备份失败: {result.stderr}") return False except Exception as e: self.logger.error(f"备份任务异常: {e}") return False def cleanup_old_backups(self): """清理旧备份文件""" retention_days = 7 cutoff_time = datetime.now() - timedelta(days=retention_days) deleted_files = [] for backup_file in BACKUP_DIR.glob("*.tar.gz"): file_time = datetime.fromtimestamp(backup_file.stat().st_mtime) if file_time < cutoff_time: try: backup_file.unlink() deleted_files.append(backup_file.name) except Exception as e: self.logger.warning(f"删除旧备份失败 {backup_file}: {e}") if deleted_files: self.logger.info(f"清理旧备份: {len(deleted_files)} 个文件") def run_status_report(self): """执行状态报告任务""" self.logger.info("开始状态报告任务...") try: # 运行Git版本管理脚本生成报告 script_path = TOOLS_DIR / "git_version_manager.sh" if not script_path.exists(): self.logger.error(f"脚本不存在: {script_path}") return False # 执行报告生成 result = subprocess.run( [str(script_path), "report"], cwd=PROJECT_DIR, capture_output=True, text=True, timeout=120 ) if result.returncode == 0: self.logger.info("状态报告生成成功") # 发送通知(可选) if self.config.get("notifications", {}).get("enabled", False): self.send_notification("状态报告已生成") return True else: self.logger.error(f"状态报告生成失败: {result.stderr}") return False except Exception as e: self.logger.error(f"状态报告任务异常: {e}") return False def send_notification(self, message: str): """发送通知(示例)""" # 这里可以实现邮件、钉钉、飞书等通知方式 # 示例:输出到日志和终端 print(f"📢 通知: {message}") self.logger.info(f"通知已发送: {message}") def check_health(self) -> Dict: """检查系统健康状态""" health_status = { "timestamp": datetime.now().isoformat(), "components": {} } # 检查磁盘空间 disk_usage = shutil.disk_usage(PROJECT_DIR) disk_percent = disk_usage.used / disk_usage.total * 100 health_status["components"]["disk"] = { "total_gb": disk_usage.total / (1024**3), "used_gb": disk_usage.used / (1024**3), "free_gb": disk_usage.free / (1024**3), "percent_used": disk_percent, "status": "OK" if disk_percent < 90 else "WARNING" } # 检查Git仓库 git_status = subprocess.run( ["git", "status"], cwd=PROJECT_DIR, capture_output=True, text=True ) health_status["components"]["git"] = { "repository_healthy": git_status.returncode == 0, "status": "OK" if git_status.returncode == 0 else "ERROR" } # 检查项目文件 required_dirs = ["chapters", "outlines", "assets"] for dir_name in required_dirs: dir_path = PROJECT_DIR / dir_name health_status["components"][dir_name] = { "exists": dir_path.exists(), "file_count": len(list(dir_path.glob("*"))) if dir_path.exists() else 0, "status": "OK" if dir_path.exists() else "MISSING" } # 总体状态 all_ok = all( comp.get("status") == "OK" for comp in health_status["components"].values() ) health_status["overall_status"] = "HEALTHY" if all_ok else "UNHEALTHY" return health_status def monitor_task_execution(self): """监控任务执行状态""" health_status = self.check_health() self.logger.info("系统健康状态检查") self.logger.info(f"总体状态: {health_status['overall_status']}") # 记录到监控日志 monitor_log = LOG_DIR / f"monitor_{datetime.now().strftime('%Y%m%d')}.json" monitor_data = { "timestamp": datetime.now().isoformat(), "health": health_status, "tasks": { "auto_commit": self.config.get("schedules", {}).get("auto_commit", {}), "daily_backup": self.config.get("schedules", {}).get("daily_backup", {}), "status_report": self.config.get("schedules", {}).get("status_report", {}) } } try: with open(monitor_log, 'a', encoding='utf-8') as f: f.write(json.dumps(monitor_data, ensure_ascii=False) + "\n") self.logger.info(f"监控数据已记录: {monitor_log}") except Exception as e: self.logger.error(f"记录监控数据失败: {e}") def install_cron_jobs(self): """安装Cron定时任务""" cron_script = """ #!/bin/bash # 《末日重生》定时任务配置 # 自动安装和配置定时同步任务 # 自动提交任务(每30分钟) */30 * * * * cd /root/.openclaw/workspace/projects/末日重生_囤货 && ./tools/git_version_manager.sh commit >> /tmp/末日重生_自动提交.log 2>&1 # 状态报告任务(每天凌晨1点) 0 1 * * * cd /root/.openclaw/workspace/projects/末日重生_囤货 && ./tools/git_version_manager.sh report >> /tmp/末日重生_报告生成.log 2>&1 # 备份任务(每天凌晨3点) 0 3 * * * cd /root/.openclaw/workspace/projects/末日重生_囤货 && ./tools/schedule_manager.py backup >> /tmp/末日重生_备份.log 2>&1 # 健康检查(每小时) 0 * * * * cd /root/.openclaw/workspace/projects/末日重生_囤货 && ./tools/schedule_manager.py health >> /tmp/末日重生_健康检查.log 2>&1 """ cron_file = "/tmp/末日重生_cron.conf" try: with open(cron_file, 'w', encoding='utf-8') as f: f.write(cron_script) # 安装Cron任务 subprocess.run(["crontab", cron_file], check=True) self.logger.info("Cron定时任务安装成功") self.logger.info("已配置以下定时任务:") self.logger.info(" - 自动提交:每30分钟") self.logger.info(" - 状态报告:每天凌晨1点") self.logger.info(" - 项目备份:每天凌晨3点") self.logger.info(" - 健康检查:每小时") os.unlink(cron_file) return True except Exception as e: self.logger.error(f"安装Cron任务失败: {e}") return False def show_dashboard(self): """显示系统仪表板""" health = self.check_health() print("=" * 60) print("📊 《末日重生》项目管理系统仪表板") print("=" * 60) print() print("📈 项目统计") print("─" * 40) # 章节统计 chapters_dir = PROJECT_DIR / "chapters" if chapters_dir.exists(): chapter_files = list(chapters_dir.glob("*.md")) print(f" 📖 章节数量: {len(chapter_files)} 章") # 计算总字数 total_words = 0 for chapter in chapter_files: try: content = chapter.read_text(encoding='utf-8') words = len(content.split()) total_words += words except: continue print(f" 📝 总字数: {total_words} 字") avg_words = total_words // len(chapter_files) if chapter_files else 0 print(f" ⚖️ 平均每章: {avg_words} 字") else: print(" ⚠️ 章节目录不存在") print() print("💾 备份管理") print("─" * 40) backup_files = list(BACKUP_DIR.glob("*.tar.gz")) + list(BACKUP_DIR.glob("*.bundle")) if backup_files: backup_files.sort(key=lambda x: x.stat().st_mtime, reverse=True) print(f" 🗂️ 备份数量: {len(backup_files)} 个") print(f" 🗓️ 最新备份: {backup_files[0].name}") print(f" 备份时间: {datetime.fromtimestamp(backup_files[0].stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S')}") # 计算总备份大小 total_size = sum(f.stat().st_size for f in backup_files) / (1024**3) # GB print(f" 📦 总备份大小: {total_size:.2f} GB") else: print(" ⚠️ 暂无备份文件") print() print("🔄 任务配置") print("─" *40) for task_name, task_config in self.config.get("schedules", {}).items(): status = "✅ 启用" if task_config.get("enabled", False) else "❌ 停用" if task_name == "auto_commit": interval = task_config.get("interval", "未知") print(f" 📝 {task_name}: {status} (间隔: {interval})") elif task_name == "daily_backup": backup_time = task_config.get("time", "未知") print(f" 💾 {task_name}: {status} (时间: {backup_time})") elif task_name == "status_report": report_time = task_config.get("time", "未知") print(f" 📊 {task_name}: {status} (时间: {report_time})") elif task_name == "weekly_cleanup": cleanup_day = task_config.get("day", "未知") cleanup_time = task_config.get("time", "未知") print(f" 🧹 {task_name}: {status} (时间: 每周{cleanup_day}{cleanup_time})") print() print("📈 健康状态") print("─" *40) if health.get("overall_status") == "HEALTHY": print(" 🟢 系统状态: 健康") else: print(" 🔴 系统状态: 异常") # 显示异常组件 for comp_name, comp_status in health.get("components", {}).items(): if comp_status.get("status") != "OK": print(f" ⚠️ {comp_name}: {comp_status.get('status')}") print() print("=" *60) def main(): """主函数""" import argparse parser = argparse.ArgumentParser( description="《末日重生》项目定时任务管理系统", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # 显示系统仪表板 python schedule_manager.py dashboard # 执行自动提交 python schedule_manager.py commit # 执行备份任务 python schedule_manager.py backup --type full # 安装定时任务 python schedule_manager.py install # 检查系统健康 python schedule_manager.py health # 运行状态报告 python schedule_manager.py report """ ) parser.add_argument( "action", choices=[ "dashboard", "commit", "backup", "report", "health", "install" ], help="要执行的操作" ) parser.add_argument( "--type", default="full", choices=["full", "chapters", "git"], help="备份类型" ) parser.add_argument( "--verbose", action="store_true", help="显示详细日志" ) args = parser.parse_args() scheduler = Scheduler() if args.verbose: scheduler.logger.setLevel(logging.DEBUG) if args.action == "dashboard": scheduler.show_dashboard() elif args.action == "commit": success = scheduler.run_auto_commit() exit(0 if success else 1) elif args.action == "backup": success = scheduler.run_backup_task(args.type) exit(0 if success else 1) elif args.action == "report": success = scheduler.run_status_report() exit(0 if success else 1) elif args.action == "health": health_status = scheduler.check_health() print(json.dumps(health_status, ensure_ascii=False, indent=2)) exit(0 if health_status["overall_status"] == "HEALTHY" else 1) elif args.action == "install": success = scheduler.install_cron_jobs() exit(0 if success else 1) if __name__ == "__main__": main()