novel-doomsday-resurgence/tools/schedule_manager.py

561 lines
19 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
《末日重生》项目定时任务管理系统
提供完整的定时同步、版本管理、监控告警功能
"""
import os
import sys
import json
import time
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
import shutil
import logging
from typing import Dict, List, Optional, Tuple
# Project layout: every working directory hangs off the project root.
PROJECT_DIR = Path("/root/.openclaw/workspace/projects/末日重生_囤货")
CONFIG_DIR = PROJECT_DIR.joinpath("config")
LOG_DIR = PROJECT_DIR.joinpath("logs")
TOOLS_DIR = PROJECT_DIR.joinpath("tools")
BACKUP_DIR = PROJECT_DIR.joinpath("backups")

# Create the directories this tool writes to as soon as the module is
# imported. TOOLS_DIR is intentionally not in this list: it is only read.
for dir_path in (CONFIG_DIR, LOG_DIR, BACKUP_DIR):
    dir_path.mkdir(parents=True, exist_ok=True)
class Scheduler:
    """Scheduled-task manager for the novel project.

    Bundles the jobs that cron (or a person) triggers through the CLI:
    automatic commits, backups, status reports, health checks, crontab
    installation and a text dashboard.  The ``run_*`` and ``install_*``
    methods log failures and return ``False`` rather than raising, so the
    caller can turn the result into a process exit code.
    """

    # Markers that delimit the crontab section owned by this tool, so a
    # re-install can replace its own jobs without touching anything else.
    _CRON_BEGIN = "# >>> schedule_manager managed jobs >>>"
    _CRON_END = "# <<< schedule_manager managed jobs <<<"

    # File patterns of the archives produced by run_backup_task().
    _BACKUP_PATTERNS = ("*.tar.gz", "*.bundle")

    def __init__(self):
        """Load the schedule configuration, then set up logging."""
        self.config = self.load_config()
        self.setup_logging()

    @staticmethod
    def _default_config() -> Dict:
        """Return the built-in schedule used when no config file exists."""
        return {
            "schedules": {
                "auto_commit": {
                    "enabled": True,
                    "interval": "30min",
                    "cron_expr": "*/30 * * * *"
                },
                "daily_backup": {
                    "enabled": True,
                    "time": "03:00"
                },
                "status_report": {
                    "enabled": True,
                    "time": "01:00"
                },
                "weekly_cleanup": {
                    "enabled": True,
                    "day": "sunday",
                    "time": "02:00"
                }
            }
        }

    def load_config(self) -> Dict:
        """Load ``cron_schedule.json`` from the config directory.

        Returns:
            The parsed JSON document, or the built-in defaults when the file
            does not exist.

        Raises:
            json.JSONDecodeError: if the file exists but is not valid JSON.
        """
        config_path = CONFIG_DIR / "cron_schedule.json"
        # Open directly instead of exists()-then-open: one syscall fewer and
        # no window in which the file can disappear between the two steps.
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            return self._default_config()

    def setup_logging(self):
        """Send log records to a per-month log file and to stdout."""
        log_file = LOG_DIR / f"schedule_{datetime.now().strftime('%Y%m')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s [%(levelname)s] %(message)s',
            handlers=[
                logging.FileHandler(log_file, encoding='utf-8'),
                logging.StreamHandler(sys.stdout)
            ]
        )
        self.logger = logging.getLogger(__name__)

    def run_auto_commit(self):
        """Commit pending changes through ``git_version_manager.sh commit``.

        Returns:
            True when the commit succeeded or there was nothing to commit;
            False when the helper script is missing, git fails, a command
            times out, or the script exits non-zero.
        """
        self.logger.info("开始自动提交任务...")
        try:
            script_path = TOOLS_DIR / "git_version_manager.sh"
            if not script_path.exists():
                self.logger.error(f"脚本不存在: {script_path}")
                return False

            # Ask git whether the working tree has anything to commit.
            result = subprocess.run(
                ["git", "status", "--porcelain"],
                cwd=PROJECT_DIR,
                capture_output=True,
                text=True,
                timeout=30
            )
            # A failing git (e.g. not a repository) also prints nothing on
            # stdout; without this check it would be reported as "clean".
            if result.returncode != 0:
                self.logger.error(f"检查Git状态失败: {result.stderr}")
                return False

            if not result.stdout.strip():
                self.logger.info("没有未提交的更改,跳过提交")
                return True

            # This message is only logged; the script is invoked with the
            # single argument "commit".
            commit_message = f"定时更新: {datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')}"
            self.logger.info(f"检测到未提交的更改,提交信息: {commit_message}")
            commit_result = subprocess.run(
                [str(script_path), "commit"],
                cwd=PROJECT_DIR,
                capture_output=True,
                text=True,
                timeout=60
            )
            if commit_result.returncode == 0:
                self.logger.info("自动提交成功")
                return True
            self.logger.error(f"自动提交失败: {commit_result.stderr}")
            return False
        except subprocess.TimeoutExpired:
            self.logger.error("自动提交任务超时")
            return False
        except Exception as e:
            self.logger.error(f"自动提交任务异常: {e}")
            return False

    def run_backup_task(self, backup_type: str = "full"):
        """Create a backup archive in the backup directory.

        Args:
            backup_type: ``"full"`` (tar.gz of the project directories that
                exist), ``"chapters"`` (tar.gz of ``chapters`` only) or
                ``"git"`` (``git bundle`` of all refs).

        Returns:
            True when the archive was created and old backups were pruned;
            False for an unknown type or any failure.  A partially written
            archive is removed on failure.
        """
        self.logger.info(f"开始备份任务,类型: {backup_type}")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = BACKUP_DIR / f"backup_{backup_type}_{timestamp}.tar.gz"
        archive_ready = False
        try:
            if backup_type == "full":
                wanted = [
                    "chapters", "outlines", "assets",
                    "tools", "config", "progress"
                ]
                # tar exits non-zero if any listed path is missing, so only
                # pass the directories that are actually there.
                backup_items = [
                    item for item in wanted if (PROJECT_DIR / item).exists()
                ]
                skipped = [item for item in wanted if item not in backup_items]
                if skipped:
                    self.logger.warning(f"备份项不存在,已跳过: {', '.join(skipped)}")
                if not backup_items:
                    self.logger.error("备份失败: 没有可备份的目录")
                    return False
                backup_cmd = ["tar", "czf", str(backup_file)] + backup_items
            elif backup_type == "chapters":
                backup_cmd = ["tar", "czf", str(backup_file), "chapters"]
            elif backup_type == "git":
                backup_file = BACKUP_DIR / f"git_bundle_{timestamp}.bundle"
                backup_cmd = ["git", "bundle", "create", str(backup_file), "--all"]
            else:
                self.logger.error(f"未知的备份类型: {backup_type}")
                return False

            result = subprocess.run(
                backup_cmd,
                cwd=PROJECT_DIR,
                capture_output=True,
                text=True,
                timeout=300
            )
            if result.returncode != 0:
                self.logger.error(f"备份失败: {result.stderr}")
                return False

            # From here on the archive is complete and must be kept even if
            # the bookkeeping below fails.
            archive_ready = True
            backup_size = backup_file.stat().st_size / (1024 * 1024)  # MB
            # Log the real file name (a bundle is not named *.tar.gz).
            self.logger.info(f"备份成功: {backup_file.name} ({backup_size:.2f} MB)")
            self.cleanup_old_backups()
            return True
        except subprocess.TimeoutExpired:
            self.logger.error("备份任务超时")
            return False
        except Exception as e:
            self.logger.error(f"备份任务异常: {e}")
            return False
        finally:
            if not archive_ready:
                self._discard_partial_backup(backup_file)

    def _discard_partial_backup(self, backup_file: Path) -> None:
        """Remove an incomplete archive left behind by a failed backup."""
        try:
            backup_file.unlink()
        except FileNotFoundError:
            pass  # nothing was written
        except OSError as e:
            self.logger.warning(f"删除不完整的备份失败 {backup_file}: {e}")

    def cleanup_old_backups(self, retention_days: int = 7):
        """Delete backup archives older than the retention window.

        Both tar archives and git bundles are pruned, judged by their
        modification time.

        Args:
            retention_days: Age in days beyond which a backup is deleted.
        """
        cutoff_time = datetime.now() - timedelta(days=retention_days)
        deleted_files = []
        for pattern in self._BACKUP_PATTERNS:
            for backup_file in BACKUP_DIR.glob(pattern):
                try:
                    file_time = datetime.fromtimestamp(backup_file.stat().st_mtime)
                    if file_time < cutoff_time:
                        backup_file.unlink()
                        deleted_files.append(backup_file.name)
                except OSError as e:
                    # Best effort: one stubborn file must not stop the sweep.
                    self.logger.warning(f"删除旧备份失败 {backup_file}: {e}")
        if deleted_files:
            self.logger.info(f"清理旧备份: {len(deleted_files)} 个文件")

    def run_status_report(self):
        """Generate the status report via ``git_version_manager.sh report``.

        Sends a notification afterwards when the configuration has
        ``notifications.enabled`` set.

        Returns:
            True when the script exited with status 0, False otherwise.
        """
        self.logger.info("开始状态报告任务...")
        try:
            script_path = TOOLS_DIR / "git_version_manager.sh"
            if not script_path.exists():
                self.logger.error(f"脚本不存在: {script_path}")
                return False

            result = subprocess.run(
                [str(script_path), "report"],
                cwd=PROJECT_DIR,
                capture_output=True,
                text=True,
                timeout=120
            )
            if result.returncode != 0:
                self.logger.error(f"状态报告生成失败: {result.stderr}")
                return False

            self.logger.info("状态报告生成成功")
            if self.config.get("notifications", {}).get("enabled", False):
                self.send_notification("状态报告已生成")
            return True
        except subprocess.TimeoutExpired:
            self.logger.error("状态报告任务超时")
            return False
        except Exception as e:
            self.logger.error(f"状态报告任务异常: {e}")
            return False

    def send_notification(self, message: str):
        """Emit a notification (placeholder: prints and logs the message).

        A real channel (mail, chat webhook, ...) can be plugged in here.
        """
        print(f"📢 通知: {message}")
        self.logger.info(f"通知已发送: {message}")

    def check_health(self) -> Dict:
        """Collect a health snapshot of disk, git repository and project dirs.

        Probe failures (missing project directory, git not installed, git
        timing out) are reported as an ``"ERROR"`` component instead of
        raising, so a health check can always produce a result.

        Returns:
            A dict with ``timestamp``, per-component details under
            ``components`` and ``overall_status`` (``"HEALTHY"`` only when
            every component status is ``"OK"``).
        """
        components = {}
        health_status = {
            "timestamp": datetime.now().isoformat(),
            "components": components
        }

        # Disk space of the filesystem holding the project.
        try:
            disk_usage = shutil.disk_usage(PROJECT_DIR)
        except OSError as e:
            components["disk"] = {"status": "ERROR", "error": str(e)}
        else:
            disk_percent = (
                disk_usage.used / disk_usage.total * 100
                if disk_usage.total else 0.0
            )
            components["disk"] = {
                "total_gb": disk_usage.total / (1024**3),
                "used_gb": disk_usage.used / (1024**3),
                "free_gb": disk_usage.free / (1024**3),
                "percent_used": disk_percent,
                "status": "OK" if disk_percent < 90 else "WARNING"
            }

        # Git repository: healthy when `git status` exits with 0.
        try:
            git_status = subprocess.run(
                ["git", "status"],
                cwd=PROJECT_DIR,
                capture_output=True,
                text=True,
                timeout=30
            )
            git_ok = git_status.returncode == 0
        except (OSError, subprocess.SubprocessError):
            git_ok = False
        components["git"] = {
            "repository_healthy": git_ok,
            "status": "OK" if git_ok else "ERROR"
        }

        # Required project directories.
        for dir_name in ("chapters", "outlines", "assets"):
            dir_path = PROJECT_DIR / dir_name
            present = dir_path.exists()
            components[dir_name] = {
                "exists": present,
                "file_count": sum(1 for _ in dir_path.glob("*")) if present else 0,
                "status": "OK" if present else "MISSING"
            }

        all_ok = all(comp.get("status") == "OK" for comp in components.values())
        health_status["overall_status"] = "HEALTHY" if all_ok else "UNHEALTHY"
        return health_status

    def monitor_task_execution(self):
        """Run a health check and append it to today's monitor log.

        Each call appends one JSON object per line to
        ``monitor_YYYYMMDD.json`` in the log directory.
        """
        health_status = self.check_health()
        self.logger.info("系统健康状态检查")
        self.logger.info(f"总体状态: {health_status['overall_status']}")

        schedules = self.config.get("schedules", {})
        monitor_log = LOG_DIR / f"monitor_{datetime.now().strftime('%Y%m%d')}.json"
        monitor_data = {
            "timestamp": datetime.now().isoformat(),
            "health": health_status,
            "tasks": {
                name: schedules.get(name, {})
                for name in ("auto_commit", "daily_backup", "status_report")
            }
        }
        try:
            with open(monitor_log, 'a', encoding='utf-8') as f:
                f.write(json.dumps(monitor_data, ensure_ascii=False) + "\n")
            self.logger.info(f"监控数据已记录: {monitor_log}")
        except Exception as e:
            self.logger.error(f"记录监控数据失败: {e}")

    @staticmethod
    def _build_cron_lines() -> List[str]:
        """Return the crontab lines (comments and jobs) this tool installs."""
        cd = f"cd {PROJECT_DIR}"
        return [
            "# 《末日重生》定时任务配置",
            "# 自动安装和配置定时同步任务",
            "# 自动提交任务每30分钟",
            f"*/30 * * * * {cd} && ./tools/git_version_manager.sh commit >> /tmp/末日重生_自动提交.log 2>&1",
            "# 状态报告任务每天凌晨1点",
            f"0 1 * * * {cd} && ./tools/git_version_manager.sh report >> /tmp/末日重生_报告生成.log 2>&1",
            "# 备份任务每天凌晨3点",
            f"0 3 * * * {cd} && ./tools/schedule_manager.py backup >> /tmp/末日重生_备份.log 2>&1",
            "# 健康检查(每小时)",
            f"0 * * * * {cd} && ./tools/schedule_manager.py health >> /tmp/末日重生_健康检查.log 2>&1",
        ]

    @classmethod
    def _merge_crontab(cls, existing: str, managed_lines: List[str]) -> str:
        """Merge the managed job section into an existing crontab text.

        Lines outside the managed section are preserved in order.  A previous
        managed section (between the begin/end markers) is replaced, and any
        stray line identical to a managed line is dropped so that jobs
        installed without markers are not duplicated.

        Args:
            existing: Current crontab content (may be empty).
            managed_lines: Lines to place inside the managed section.

        Returns:
            The new crontab text, terminated by a newline.
        """
        managed = set(managed_lines)
        kept = []
        pending = None  # lines collected since an unclosed begin marker
        for line in existing.split("\n"):
            stripped = line.strip()
            if pending is None:
                if stripped == cls._CRON_BEGIN:
                    pending = []
                elif stripped not in managed:
                    kept.append(line)
            elif stripped == cls._CRON_END:
                pending = None  # old managed section fully consumed
            else:
                pending.append(line)
        if pending is not None:
            # Begin marker without an end marker: the tail is not provably
            # ours, so keep it rather than silently deleting user jobs.
            kept.extend(l for l in pending if l.strip() not in managed)
        while kept and not kept[-1].strip():
            kept.pop()
        section = [cls._CRON_BEGIN] + list(managed_lines) + [cls._CRON_END]
        return "\n".join(kept + section) + "\n"

    def install_cron_jobs(self):
        """Install this tool's cron jobs into the current user's crontab.

        The existing crontab is read with ``crontab -l`` and only the section
        managed by this tool is replaced; all other entries are preserved.
        The result is piped to ``crontab -`` so no temporary file is needed.

        Returns:
            True when the crontab was updated, False on any failure.
        """
        try:
            current = subprocess.run(
                ["crontab", "-l"],
                capture_output=True,
                timeout=30
            )
            # A non-zero exit is what crontab reports when the user has no
            # crontab yet; start from an empty table in that case.
            # surrogateescape round-trips any non-UTF-8 bytes unchanged.
            existing = (
                current.stdout.decode("utf-8", "surrogateescape")
                if current.returncode == 0 else ""
            )
            merged = self._merge_crontab(existing, self._build_cron_lines())
            subprocess.run(
                ["crontab", "-"],
                input=merged.encode("utf-8", "surrogateescape"),
                check=True,
                timeout=30
            )
            self.logger.info("Cron定时任务安装成功")
            self.logger.info("已配置以下定时任务:")
            self.logger.info(" - 自动提交每30分钟")
            self.logger.info(" - 状态报告每天凌晨1点")
            self.logger.info(" - 项目备份每天凌晨3点")
            self.logger.info(" - 健康检查:每小时")
            return True
        except Exception as e:
            self.logger.error(f"安装Cron任务失败: {e}")
            return False

    @staticmethod
    def _count_words(text: str) -> int:
        """Count the non-whitespace characters in *text*.

        Chinese prose has no spaces between words, so splitting on
        whitespace would count paragraphs rather than characters.
        """
        return sum(1 for ch in text if not ch.isspace())

    def show_dashboard(self):
        """Print a text dashboard: chapter stats, backups, schedule, health."""
        health = self.check_health()
        separator = "-" * 40

        print("=" * 60)
        print("📊 《末日重生》项目管理系统仪表板")
        print("=" * 60)
        print()

        print("📈 项目统计")
        print(separator)
        chapters_dir = PROJECT_DIR / "chapters"
        if chapters_dir.exists():
            chapter_files = list(chapters_dir.glob("*.md"))
            print(f" 📖 章节数量: {len(chapter_files)}")
            total_words = 0
            for chapter in chapter_files:
                try:
                    total_words += self._count_words(
                        chapter.read_text(encoding='utf-8')
                    )
                except (OSError, UnicodeDecodeError):
                    # Unreadable chapters are left out of the total.
                    continue
            print(f" 📝 总字数: {total_words}")
            avg_words = total_words // len(chapter_files) if chapter_files else 0
            print(f" ⚖️ 平均每章: {avg_words}")
        else:
            print(" ⚠️ 章节目录不存在")
        print()

        print("💾 备份管理")
        print(separator)
        backup_files = [
            path
            for pattern in self._BACKUP_PATTERNS
            for path in BACKUP_DIR.glob(pattern)
        ]
        if backup_files:
            backup_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
            newest = backup_files[0]
            newest_time = datetime.fromtimestamp(newest.stat().st_mtime)
            print(f" 🗂️ 备份数量: {len(backup_files)}")
            print(f" 🗓️ 最新备份: {newest.name}")
            print(f" 备份时间: {newest_time.strftime('%Y-%m-%d %H:%M:%S')}")
            total_size = sum(f.stat().st_size for f in backup_files) / (1024**3)  # GB
            print(f" 📦 总备份大小: {total_size:.2f} GB")
        else:
            print(" ⚠️ 暂无备份文件")
        print()

        print("🔄 任务配置")
        print(separator)
        for task_name, task_config in self.config.get("schedules", {}).items():
            status = "✅ 启用" if task_config.get("enabled", False) else "❌ 停用"
            if task_name == "auto_commit":
                interval = task_config.get("interval", "未知")
                print(f" 📝 {task_name}: {status} (间隔: {interval})")
            elif task_name == "daily_backup":
                backup_time = task_config.get("time", "未知")
                print(f" 💾 {task_name}: {status} (时间: {backup_time})")
            elif task_name == "status_report":
                report_time = task_config.get("time", "未知")
                print(f" 📊 {task_name}: {status} (时间: {report_time})")
            elif task_name == "weekly_cleanup":
                cleanup_day = task_config.get("day", "未知")
                cleanup_time = task_config.get("time", "未知")
                print(f" 🧹 {task_name}: {status} (时间: 每周{cleanup_day}{cleanup_time})")
        print()

        print("📈 健康状态")
        print(separator)
        if health.get("overall_status") == "HEALTHY":
            print(" 🟢 系统状态: 健康")
        else:
            print(" 🔴 系统状态: 异常")
            # List every component that is not OK.
            for comp_name, comp_status in health.get("components", {}).items():
                if comp_status.get("status") != "OK":
                    print(f" ⚠️ {comp_name}: {comp_status.get('status')}")
        print()
        print("=" * 60)
def main():
    """Command-line entry point.

    Parses the requested action, runs it on a ``Scheduler`` and exits with
    status 0 on success or 1 on failure.  ``dashboard`` only prints and
    returns normally; ``health`` prints the health snapshot as JSON and
    exits 1 unless the overall status is ``HEALTHY``.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="《末日重生》项目定时任务管理系统",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
# 显示系统仪表板
python schedule_manager.py dashboard
# 执行自动提交
python schedule_manager.py commit
# 执行备份任务
python schedule_manager.py backup --type full
# 安装定时任务
python schedule_manager.py install
# 检查系统健康
python schedule_manager.py health
# 运行状态报告
python schedule_manager.py report
"""
    )
    parser.add_argument(
        "action",
        choices=[
            "dashboard", "commit", "backup", "report", "health", "install"
        ],
        help="要执行的操作"
    )
    parser.add_argument(
        "--type",
        default="full",
        choices=["full", "chapters", "git"],
        help="备份类型"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="显示详细日志"
    )
    args = parser.parse_args()

    scheduler = Scheduler()
    if args.verbose:
        scheduler.logger.setLevel(logging.DEBUG)

    if args.action == "dashboard":
        scheduler.show_dashboard()
        return

    if args.action == "health":
        health_status = scheduler.check_health()
        print(json.dumps(health_status, ensure_ascii=False, indent=2))
        # sys.exit rather than the site-provided exit(): the latter is meant
        # for interactive sessions and is absent when site is not loaded.
        sys.exit(0 if health_status["overall_status"] == "HEALTHY" else 1)

    # The remaining actions all return a success flag.
    jobs = {
        "commit": scheduler.run_auto_commit,
        "backup": lambda: scheduler.run_backup_task(args.type),
        "report": scheduler.run_status_report,
        "install": scheduler.install_cron_jobs,
    }
    success = jobs[args.action]()
    sys.exit(0 if success else 1)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()