novel-doomsday-resurgence/skills/fanfic-writer/scripts/v2/atomic_io.py

583 lines
19 KiB
Python
Raw Permalink Normal View History

"""
Fanfic Writer v2.0 - Atomic I/O Module
Implements atomic file writes (temp file + fsync + rename), file locking, hash-based integrity checks, directory snapshots, and transaction-style rollback
"""
import hashlib
import json
import os
import shutil
import tempfile
import time
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

try:
    import fcntl  # Unix file locking
except ImportError:
    fcntl = None  # Windows doesn't have fcntl
# ============================================================================
# Atomic Write Operations
# ============================================================================
def _fsync_directory(directory: Path) -> None:
    """Best-effort fsync of *directory* so a rename inside it survives a crash (POSIX only)."""
    if os.name == 'nt':
        # Directories cannot be opened with os.open on Windows; nothing to do.
        return
    try:
        dir_fd = os.open(directory, os.O_RDONLY)
    except OSError:
        return
    try:
        os.fsync(dir_fd)
    except OSError:
        # Some filesystems reject fsync on directories; the file data itself
        # was already fsynced by the caller, so this stays best-effort.
        pass
    finally:
        os.close(dir_fd)


def atomic_write_text(
    path: Path,
    content: str,
    encoding: str = 'utf-8',
    fsync: bool = True
) -> bool:
    """
    Atomically write a text file using the temp-file + fsync + rename pattern.

    Process:
    1. Write *content* to a temp file in the same directory as *path*
       (same filesystem, so the rename below is atomic).
    2. If *fsync* is true, flush and fsync the temp file so its data is on disk.
    3. ``os.replace`` the temp file onto *path* (atomic on POSIX and modern Windows).
    4. If *fsync* is true, on POSIX also fsync the parent directory
       (best-effort) so the rename itself is durable.

    Args:
        path: Destination file (``str`` or ``Path``); parent directories are
            created as needed.
        content: Text to write.
        encoding: Text encoding used for the file.
        fsync: Whether to force the data (and, on POSIX, the directory entry) to disk.

    Returns:
        True on success, False on failure. Failures are reported via ``print``
        rather than raised, and the temp file is removed.
    """
    try:
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Temp file lives next to the target so os.replace never crosses filesystems.
        fd, temp_path = tempfile.mkstemp(
            dir=path.parent,
            prefix=f'.tmp_{path.stem}_',
            suffix='.tmp'
        )
        try:
            with os.fdopen(fd, 'w', encoding=encoding) as f:
                f.write(content)
                if fsync:
                    f.flush()
                    os.fsync(f.fileno())
            os.replace(temp_path, path)
        except BaseException:
            # Remove the temp file on any failure (including KeyboardInterrupt),
            # then let the error propagate.
            try:
                os.unlink(temp_path)
            except OSError:
                pass
            raise
        if fsync:
            _fsync_directory(path.parent)
        return True
    except Exception as e:
        print(f"[Atomic Write Error] Failed to write {path}: {e}")
        return False
def atomic_write_json(
    path: Path,
    data: Any,
    indent: int = 2,
    fsync: bool = True
) -> bool:
    """
    Serialize *data* as JSON and write it to *path* atomically.

    Non-ASCII characters are written as-is (``ensure_ascii=False``), and any
    object ``json`` cannot encode natively is converted with ``str``. The
    write itself is delegated to ``atomic_write_text``.

    Returns:
        True on success; False if serialization or writing fails
        (the error is reported via ``print``).
    """
    try:
        serialized = json.dumps(data, ensure_ascii=False, indent=indent, default=str)
        return atomic_write_text(path, serialized, fsync=fsync)
    except Exception as err:
        print(f"[Atomic Write Error] Failed to serialize JSON for {path}: {err}")
        return False
def atomic_write_jsonl(
path: Path,
records: List[Dict[str, Any]],
append: bool = False,
fsync: bool = True
) -> bool:
"""
Atomically write JSONL file (JSON Lines format)
If append=True and file exists, new records are appended
Otherwise, file is overwritten
"""
try:
lines = []
if append and path.exists():
# Read existing content
with open(path, 'r', encoding='utf-8') as f:
existing = f.read()
if existing.strip():
lines.append(existing.rstrip('\n'))
# Add new records
for record in records:
lines.append(json.dumps(record, ensure_ascii=False, default=str))
content = '\n'.join(lines) + '\n'
return atomic_write_text(path, content, fsync=fsync)
except Exception as e:
print(f"[Atomic Write Error] Failed to write JSONL {path}: {e}")
return False
def atomic_append_jsonl(
    path: Path,
    record: Dict[str, Any],
    fsync: bool = True
) -> bool:
    """
    Append one *record* to the JSONL file at *path*, atomically.

    Thin convenience wrapper over ``atomic_write_jsonl(..., append=True)``.
    Each call rewrites the whole file, so batch records through
    ``atomic_write_jsonl`` when appending many at once.

    Returns:
        Whatever ``atomic_write_jsonl`` returns (True on success).
    """
    batch = [record]
    return atomic_write_jsonl(path, batch, append=True, fsync=fsync)
# ============================================================================
# File Locking (for exclusive access)
# ============================================================================
class FileLock:
    """
    Cross-platform exclusive file lock, used as a context manager.

    Uses ``fcntl.flock`` on Unix and ``msvcrt.locking`` (first byte) on
    Windows. Acquisition is retried until ``timeout`` seconds have elapsed;
    if the lock is still held elsewhere, ``RuntimeError`` is raised.

    Usage:
        with FileLock(run_dir / ".lock", timeout=5.0):
            ...
    """

    # Seconds to wait between acquisition attempts while the lock is held.
    _POLL_INTERVAL = 0.05

    def __init__(self, path: Path, timeout: float = 10.0):
        self.path = Path(path)
        # Maximum seconds to keep retrying; 0 means a single attempt.
        self.timeout = timeout
        # Open handle while the lock is held, otherwise None.
        self.lock_file = None

    def _try_lock(self) -> None:
        """Make one non-blocking lock attempt; raise OSError if the lock is held elsewhere."""
        if os.name == 'nt':  # Windows
            import msvcrt
            # msvcrt.locking locks bytes starting at the current position.
            self.lock_file.seek(0)
            msvcrt.locking(self.lock_file.fileno(), msvcrt.LK_NBLCK, 1)
        else:  # Unix/Linux/Mac
            import fcntl
            fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)

    def _unlock(self) -> None:
        """Release the lock taken by ``_try_lock``."""
        if os.name == 'nt':
            import msvcrt
            self.lock_file.seek(0)
            msvcrt.locking(self.lock_file.fileno(), msvcrt.LK_UNLCK, 1)
        else:
            import fcntl
            fcntl.flock(self.lock_file.fileno(), fcntl.LOCK_UN)

    def __enter__(self):
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # Create the lock file if it does not exist yet.
        self.path.touch(exist_ok=True)
        self.lock_file = open(self.path, 'r+')
        deadline = time.monotonic() + max(self.timeout, 0.0)
        last_error = None
        while True:
            try:
                self._try_lock()
                return self
            except OSError as err:
                last_error = err
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(self._POLL_INTERVAL, remaining))
        # Timed out: release the handle before reporting failure.
        self.lock_file.close()
        self.lock_file = None
        raise RuntimeError(f"Could not acquire lock on {self.path}") from last_error

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.lock_file is None:
            return False
        try:
            self._unlock()
        except OSError:
            # Closing the handle below should release the lock as well.
            pass
        finally:
            self.lock_file.close()
            self.lock_file = None
        return False  # never suppress exceptions from the with-body
# ============================================================================
# Checksum / Hash for Integrity Verification
# ============================================================================
def compute_file_hash(path: Path, algorithm: str = 'sha256') -> str:
    """
    Return the hex digest of the file at *path*.

    *algorithm* is any name accepted by ``hashlib.new``. The file is streamed
    in 8192-byte chunks, so large files are never read into memory at once.
    """
    digest = hashlib.new(algorithm)
    with open(path, 'rb') as stream:
        while True:
            block = stream.read(8192)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
def verify_file_integrity(path: Path, expected_hash: str, algorithm: str = 'sha256') -> bool:
    """
    Check that the file at *path* hashes to *expected_hash*.

    Args:
        path: File to check (``str`` or ``Path``).
        expected_hash: Expected hex digest. It is compared case-insensitively
            and with surrounding whitespace ignored (e.g. a trailing newline
            read from a checksum file).
        algorithm: Any algorithm name accepted by ``hashlib.new``.

    Returns:
        True if *path* is a regular file whose digest matches. False if it is
        missing, is not a regular file (e.g. a directory), or does not match.
    """
    path = Path(path)
    if not path.is_file():
        return False
    actual_hash = compute_file_hash(path, algorithm)
    # hexdigest() is lowercase, so normalize the expected value to match.
    return actual_hash == expected_hash.strip().lower()
# ============================================================================
# Snapshot Management
# ============================================================================
class SnapshotManager:
    """
    Manage directory snapshots under ``<archive_dir>/snapshots`` for rollback.

    Each snapshot is a directory containing a copy of every snapshotted source
    directory (stored under the source's base name), plus an optional
    ``.snapshot_meta.json`` metadata file.
    """

    def __init__(self, archive_dir: Path):
        self.archive_dir = Path(archive_dir)
        self.snapshots_dir = self.archive_dir / "snapshots"
        self.snapshots_dir.mkdir(parents=True, exist_ok=True)

    def _new_snapshot_dir(self, snapshot_name: str) -> Path:
        """
        Create and return a fresh, empty snapshot directory.

        The name is ``<snapshot_name>_<YYYYmmdd_HHMMSS>``. If that already
        exists (two snapshots with the same name in the same second), a
        ``_<n>`` suffix is added so an earlier snapshot is never merged into.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base = f"{snapshot_name}_{timestamp}"
        candidate = self.snapshots_dir / base
        counter = 1
        while True:
            try:
                # exist_ok=False: creation itself is the uniqueness check.
                candidate.mkdir(parents=True)
                return candidate
            except FileExistsError:
                candidate = self.snapshots_dir / f"{base}_{counter}"
                counter += 1

    def create_snapshot(
        self,
        source_dirs: List[Path],
        snapshot_name: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Path:
        """
        Create a snapshot of the given directories.

        Args:
            source_dirs: Directories to copy. Missing paths and non-directories
                are skipped.
            snapshot_name: Name prefix for this snapshot (e.g. "ch015_attempt1").
            metadata: Optional metadata. If given, it is written to
                ``.snapshot_meta.json`` together with ``created_at`` and
                ``source_dirs``.

        Returns:
            Path to the newly created (always new) snapshot directory.
        """
        # Materialize once: the list is iterated for copying and for metadata.
        sources = [Path(src) for src in source_dirs]
        snapshot_dir = self._new_snapshot_dir(snapshot_name)

        for src_dir in sources:
            if not src_dir.is_dir():
                continue
            # dirs_exist_ok merges sources that share a base name, as before.
            shutil.copytree(src_dir, snapshot_dir / src_dir.name, dirs_exist_ok=True)

        if metadata:
            meta_path = snapshot_dir / ".snapshot_meta.json"
            with open(meta_path, 'w', encoding='utf-8') as f:
                json.dump({
                    'created_at': datetime.now().isoformat(),
                    'source_dirs': [str(d) for d in sources],
                    **metadata
                }, f, indent=2, ensure_ascii=False)

        return snapshot_dir

    @staticmethod
    def _replace_dir_from(src: Path, target_dir: Path) -> None:
        """
        Replace *target_dir* with a copy of *src*, copying before deleting.

        The copy is staged in a hidden sibling directory first, so a failed
        copy leaves the current *target_dir* untouched.
        """
        staging = target_dir.with_name(f".{target_dir.name}.restore_tmp")
        # Clear leftovers from an earlier interrupted restore.
        shutil.rmtree(staging, ignore_errors=True)
        shutil.copytree(src, staging)
        try:
            if target_dir.exists():
                shutil.rmtree(target_dir)
            os.replace(staging, target_dir)
        except Exception:
            shutil.rmtree(staging, ignore_errors=True)
            raise

    def restore_snapshot(
        self,
        snapshot_dir: Path,
        target_dirs: List[Path]
    ) -> bool:
        """
        Restore target directories from a snapshot.

        Each target is replaced by ``snapshot_dir / target.name`` when the
        snapshot contains it. Targets absent from the snapshot are left as-is.

        Args:
            snapshot_dir: Path to the snapshot directory.
            target_dirs: Directories to restore.

        Returns:
            True on success, False on failure (reported via ``print``).
        """
        try:
            snapshot_dir = Path(snapshot_dir)
            for target in target_dirs:
                target_dir = Path(target)
                src = snapshot_dir / target_dir.name
                if src.exists():
                    self._replace_dir_from(src, target_dir)
            return True
        except Exception as e:
            print(f"[Snapshot Error] Failed to restore from {snapshot_dir}: {e}")
            return False

    def list_snapshots(self) -> List[Path]:
        """List snapshot directories, oldest first (by mtime, then name)."""
        if not self.snapshots_dir.exists():
            return []
        # Only directories are snapshots; stray files would break rmtree in cleanup.
        return sorted(
            (p for p in self.snapshots_dir.iterdir() if p.is_dir()),
            key=lambda p: (p.stat().st_mtime, p.name),
        )

    def cleanup_old_snapshots(self, keep_count: int = 10) -> int:
        """
        Remove old snapshots, keeping only the most recent *keep_count*.

        Args:
            keep_count: Number of newest snapshots to keep; 0 removes all.

        Returns:
            Number of snapshots removed.

        Raises:
            ValueError: If *keep_count* is negative.
        """
        if keep_count < 0:
            raise ValueError(f"keep_count must be >= 0, got {keep_count}")
        snapshots = self.list_snapshots()
        # Explicit count: the old snapshots[:-keep_count] slice kept everything when keep_count == 0.
        excess = len(snapshots) - keep_count
        if excess <= 0:
            return 0
        removed = 0
        for snap in snapshots[:excess]:
            try:
                shutil.rmtree(snap)
                removed += 1
            except OSError as e:
                print(f"[Snapshot Cleanup Error] Failed to remove {snap}: {e}")
        return removed
# ============================================================================
# Rollback Manager
# ============================================================================
class RollbackManager:
    """
    Transaction-like rollback for a run directory.

    ``begin_transaction`` snapshots the run's ``4-state``, ``chapters`` and
    ``drafts`` directories. ``rollback_transaction`` restores them from that
    snapshot. ``commit_transaction`` pops the transaction and takes an audit
    snapshot. Open transactions form a stack (innermost last).
    """

    def __init__(self, run_dir: Path):
        self.run_dir = Path(run_dir)
        self.archive_dir = self.run_dir / "archive"
        self.reverted_dir = self.archive_dir / "reverted"
        self.reverted_dir.mkdir(parents=True, exist_ok=True)
        self.snapshot_manager = SnapshotManager(self.archive_dir)
        # Open transactions, innermost last: {'id', 'name', 'pre_snapshot'}.
        self._transaction_stack: List[Dict[str, Any]] = []

    def _tracked_dirs(self, include_drafts: bool = True) -> List[Path]:
        """Return the run sub-directories covered by transaction snapshots."""
        dirs = [self.run_dir / "4-state", self.run_dir / "chapters"]
        if include_drafts:
            dirs.append(self.run_dir / "drafts")
        return dirs

    def begin_transaction(self, name: str) -> str:
        """
        Begin a transaction by snapshotting the tracked directories.

        Args:
            name: Label used in the transaction ID and snapshot name.

        Returns:
            Transaction ID of the form ``tx_<name>_<YYYYmmdd_HHMMSS>``.
        """
        tx_id = f"tx_{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        snapshot = self.snapshot_manager.create_snapshot(
            source_dirs=self._tracked_dirs(),
            snapshot_name=f"tx_{name}",
            metadata={'transaction_id': tx_id, 'phase': 'pre'}
        )
        self._transaction_stack.append({
            'id': tx_id,
            'name': name,
            'pre_snapshot': snapshot
        })
        return tx_id

    def commit_transaction(self) -> bool:
        """
        Commit the innermost transaction: pop it and take an audit snapshot.

        The pre-transaction snapshot directory is left on disk.

        Returns:
            False if no transaction is open, True otherwise.
        """
        if not self._transaction_stack:
            return False
        tx = self._transaction_stack.pop()
        # The audit snapshot covers state and chapters only (drafts excluded, as before).
        # NOTE(review): confirm drafts are intentionally omitted from the audit copy.
        self.snapshot_manager.create_snapshot(
            source_dirs=self._tracked_dirs(include_drafts=False),
            snapshot_name=f"tx_{tx['name']}_committed",
            metadata={'transaction_id': tx['id'], 'phase': 'post'}
        )
        return True

    def rollback_transaction(self) -> bool:
        """
        Roll back the innermost transaction to its pre-transaction snapshot.

        The transaction is popped even if the restore fails.

        Returns:
            False if no transaction is open; otherwise the restore result.
        """
        if not self._transaction_stack:
            return False
        tx = self._transaction_stack.pop()
        return self.snapshot_manager.restore_snapshot(tx['pre_snapshot'], self._tracked_dirs())

    def revert_chapter(self, chapter_num: int, chapter_path: Path) -> Path:
        """
        Revert a chapter by moving its file into ``archive/reverted/``.

        The archive name is ``reverted_ch<NNN>_<timestamp>_<filename>``. A
        counter is inserted after the timestamp if that name is already taken,
        so an earlier archive is never overwritten.

        Args:
            chapter_num: Chapter number (zero-padded to 3 digits in the name).
            chapter_path: Chapter file to move (``str`` or ``Path``).

        Returns:
            Path to the archived file.

        Raises:
            FileNotFoundError: If *chapter_path* does not exist.
        """
        chapter_path = Path(chapter_path)
        if not chapter_path.exists():
            raise FileNotFoundError(f"Chapter file not found: {chapter_path}")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        stem = f"reverted_ch{chapter_num:03d}_{timestamp}"
        archive_path = self.reverted_dir / f"{stem}_{chapter_path.name}"
        counter = 1
        # shutil.move may silently replace an existing file, so pick a free name.
        while archive_path.exists():
            archive_path = self.reverted_dir / f"{stem}_{counter}_{chapter_path.name}"
            counter += 1

        shutil.move(str(chapter_path), str(archive_path))
        return archive_path
# ============================================================================
# State Commit Helper (Transaction-like)
# ============================================================================
class StateCommit:
    """
    Context manager that writes queued files as one transaction.

    Usage:
        with StateCommit(rollback_manager, "chapter_15") as commit:
            commit.add_file(state_path, new_state, 'json')
            commit.add_file(chapter_path, chapter_content)

    Entering begins a transaction on *rollback_manager*. On a clean exit the
    queued files are written in order with the atomic writers. If all writes
    succeed the transaction is committed; otherwise it is rolled back and an
    error is printed. If the body raises, nothing queued is written, the
    transaction is rolled back, and the exception propagates.

    After exit, ``committed`` is True if the transaction was committed and
    False if it was rolled back.
    """

    def __init__(self, rollback_manager: RollbackManager, name: str):
        self.rollback_manager = rollback_manager
        self.name = name
        self.tx_id: Optional[str] = None
        # Outcome of the last exit: None until the context has exited.
        self.committed: Optional[bool] = None
        # Queued writes in insertion order: (path, content, content_type).
        self._files_to_write: List[Tuple[Path, Union[str, Dict, List], str]] = []

    def __enter__(self):
        self.tx_id = self.rollback_manager.begin_transaction(self.name)
        return self

    @staticmethod
    def _write_one(path: Path, content: Union[str, Dict, List], content_type: str) -> bool:
        """Write one queued file: 'json' and 'jsonl' use their writers, anything else is text."""
        if content_type == 'json':
            return atomic_write_json(path, content)
        if content_type == 'jsonl':
            return atomic_write_jsonl(path, content)
        return atomic_write_text(path, content)

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # Exception in the with-body: undo and let it propagate.
            self.rollback_manager.rollback_transaction()
            self.committed = False
            return False

        # all() stops at the first failed write, so later files are not written.
        success = all(
            self._write_one(path, content, content_type)
            for path, content, content_type in self._files_to_write
        )
        if success:
            self.rollback_manager.commit_transaction()
            self.committed = True
        else:
            restored = self.rollback_manager.rollback_transaction()
            self.committed = False
            # Without this the failure would be invisible to the caller.
            print(f"[StateCommit Error] Write failed in transaction {self.tx_id}; "
                  f"rollback {'succeeded' if restored else 'FAILED'}")
        return False

    def add_file(self, path: Path, content: Union[str, Dict, List], content_type: str = 'text'):
        """
        Queue a file to be written on commit.

        Args:
            path: Destination file.
            content: Text for 'text', a JSON-serializable value for 'json',
                or a list of records for 'jsonl'.
            content_type: 'json', 'jsonl', or anything else (written as text).
        """
        self._files_to_write.append((path, content, content_type))
# ============================================================================
# Module Test
# ============================================================================
if __name__ == "__main__":
    import sys

    print("=== Atomic I/O Module Test ===\n")
    failures: List[str] = []

    def check(label: str, ok: bool, detail: str = "") -> None:
        """Print a ``[Test] label: PASS/FAIL`` line and record failures."""
        suffix = f" {detail}" if detail else ""
        print(f"[Test] {label}: {'PASS' if ok else 'FAIL'}{suffix}")
        if not ok:
            failures.append(label)

    with tempfile.TemporaryDirectory() as tmp:
        tmpdir = Path(tmp)

        # Atomic text write + read-back
        test_file = tmpdir / "test.txt"
        content = "Hello, Atomic World!"
        check("Atomic write text", atomic_write_text(test_file, content))
        check("Content verification", test_file.read_text(encoding='utf-8') == content)

        # JSON write + round-trip
        json_file = tmpdir / "test.json"
        data = {"name": "test", "value": 42, "nested": {"key": "value"}}
        check("Atomic write JSON", atomic_write_json(json_file, data))
        check("JSON round-trip", json.loads(json_file.read_text(encoding='utf-8')) == data)

        # JSONL write + append
        jsonl_file = tmpdir / "test.jsonl"
        records = [
            {"event": "start", "ts": "2026-01-01"},
            {"event": "progress", "ts": "2026-01-02"},
            {"event": "end", "ts": "2026-01-03"}
        ]
        check("Atomic write JSONL", atomic_write_jsonl(jsonl_file, records))
        new_record = {"event": "append", "ts": "2026-01-04"}
        check("Atomic append JSONL", atomic_append_jsonl(jsonl_file, new_record))
        lines = jsonl_file.read_text(encoding='utf-8').splitlines()
        check("JSONL line count", len(lines) == 4, f"(expected 4, got {len(lines)})")

        # File hash + integrity
        hash1 = compute_file_hash(test_file)
        print(f"[Test] File hash computed: {hash1[:16]}...")
        check("Integrity verification", verify_file_integrity(test_file, hash1))

        # SnapshotManager: snapshot, modify, restore
        snapshot_mgr = SnapshotManager(tmpdir / "archive")
        source_dir = tmpdir / "source"
        source_dir.mkdir()
        (source_dir / "file1.txt").write_text("content1", encoding='utf-8')
        (source_dir / "file2.txt").write_text("content2", encoding='utf-8')
        snapshot = snapshot_mgr.create_snapshot([source_dir], "test_snapshot")
        check("Snapshot created", snapshot.exists())
        (source_dir / "file1.txt").write_text("modified", encoding='utf-8')
        check("Snapshot restore", snapshot_mgr.restore_snapshot(snapshot, [source_dir]))
        restored_content = (source_dir / "file1.txt").read_text(encoding='utf-8')
        check("Restored content", restored_content == 'content1')

    print("\n=== All tests completed ===")
    if failures:
        # Non-zero exit so scripts/CI notice failures instead of grepping for FAIL.
        print(f"{len(failures)} test(s) failed: {', '.join(failures)}")
        sys.exit(1)