"""Unified local library management system combining database, initialization, migration, and search. This module provides: - SQLite database management for local file metadata caching - Library scanning and database initialization - Sidecar file migration from old .tags/.metadata files to database - Optimized search functionality using database indices - Worker task tracking for background operations """ from __future__ import annotations import sqlite3 import json import logging import subprocess import shutil from datetime import datetime from pathlib import Path from typing import Optional, Dict, Any, List, Tuple, Set logger = logging.getLogger(__name__) # Try to import optional dependencies try: import mutagen except ImportError: mutagen = None try: from metadata import ( _read_sidecar_metadata, _derive_sidecar_path, write_tags, write_tags_to_file, embed_metadata_in_file, read_tags_from_file, ) METADATA_AVAILABLE = True except ImportError: _read_sidecar_metadata = None _derive_sidecar_path = None write_tags = None write_tags_to_file = None embed_metadata_in_file = None read_tags_from_file = None METADATA_AVAILABLE = False # Media extensions to index MEDIA_EXTENSIONS = { '.mp4', '.mkv', '.mka', '.webm', '.avi', '.mov', '.flv', '.wmv', '.m4v', '.mp3', '.flac', '.wav', '.aac', '.ogg', '.m4a', '.wma', '.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.tiff', '.pdf', '.epub', '.txt', '.docx', '.doc' } # ============================================================================ # SIDECAR FILE HANDLING # ============================================================================ def read_sidecar(sidecar_path: Path) -> Tuple[Optional[str], List[str], List[str]]: """Read metadata from a sidecar file. Delegates to metadata._read_sidecar_metadata for centralized handling. 
Args: sidecar_path: Path to .tags sidecar file Returns: Tuple of (hash_value, tags_list, urls_list) Returns (None, [], []) if file doesn't exist or can't be read """ if _read_sidecar_metadata is None: return None, [], [] try: return _read_sidecar_metadata(sidecar_path) except Exception: return None, [], [] def write_sidecar(media_path: Path, tags: List[str], known_urls: List[str], hash_value: Optional[str] = None) -> bool: """Write metadata to a sidecar file. Delegates to metadata.write_tags for centralized handling. Args: media_path: Path to the media file (sidecar created as media_path.tags) tags: List of tag strings known_urls: List of known URL strings hash_value: Optional SHA256 hash to include Returns: True if successful, False otherwise """ if write_tags is None: return False if media_path.exists() and media_path.is_dir(): return False try: write_tags(media_path, tags, known_urls, hash_value) return True except Exception: return False def find_sidecar(media_path: Path) -> Optional[Path]: """Find the sidecar file for a media path. Uses metadata._derive_sidecar_path for centralized handling. Args: media_path: Path to media file Returns: Path to existing sidecar file, or None if not found """ if media_path.is_dir(): return None if _derive_sidecar_path is None: return None try: # Check for new format: filename.ext.tags sidecar_path = _derive_sidecar_path(media_path) if sidecar_path.exists(): return sidecar_path except OSError: pass return None def has_sidecar(media_path: Path) -> bool: """Check if a media file has a sidecar.""" return find_sidecar(media_path) is not None class LocalLibraryDB: """SQLite database for caching local library metadata.""" DB_NAME = ".downlow_library.db" SCHEMA_VERSION = 2 def __init__(self, library_root: Path): """Initialize the database at the library root. 
Args: library_root: Path to the local library root directory """ self.library_root = Path(library_root) self.db_path = self.library_root / self.DB_NAME self.connection: Optional[sqlite3.Connection] = None self._init_db() def _init_db(self) -> None: """Initialize database connection and create tables if needed.""" try: # Use check_same_thread=False to allow multi-threaded access # This is safe because we're not sharing connections across threads; # each thread will get its own cursor self.connection = sqlite3.connect(str(self.db_path), check_same_thread=False) self.connection.row_factory = sqlite3.Row self.connection.execute("PRAGMA foreign_keys = ON") self._create_tables() logger.info(f"Database initialized at {self.db_path}") except Exception as e: logger.error(f"Failed to initialize database: {e}", exc_info=True) raise def _create_tables(self) -> None: """Create database tables if they don't exist.""" cursor = self.connection.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS files ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_path TEXT UNIQUE NOT NULL, file_hash TEXT, file_size INTEGER, file_modified REAL, indexed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS metadata ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_id INTEGER UNIQUE NOT NULL, hash TEXT, known_urls TEXT, relationships TEXT, duration REAL, size INTEGER, ext TEXT, media_type TEXT, media_kind TEXT, time_imported TIMESTAMP DEFAULT CURRENT_TIMESTAMP, time_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (file_id) REFERENCES files(id) ON DELETE CASCADE ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS tags ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_id INTEGER NOT NULL, tag TEXT NOT NULL, tag_type TEXT DEFAULT 'user', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (file_id) REFERENCES 
files(id) ON DELETE CASCADE, UNIQUE(file_id, tag) ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS notes ( id INTEGER PRIMARY KEY AUTOINCREMENT, file_id INTEGER UNIQUE NOT NULL, note TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (file_id) REFERENCES files(id) ON DELETE CASCADE ) """) # Worker tracking tables (drop legacy workers table if still present) self._ensure_worker_tables(cursor) # Create indices for performance cursor.execute("CREATE INDEX IF NOT EXISTS idx_files_path ON files(file_path)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_files_hash ON files(file_hash)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_tags_file_id ON tags(file_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_tags_tag ON tags(tag)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata_file_id ON metadata(file_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_worker_id ON worker(worker_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_worker_status ON worker(status)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_worker_type ON worker(worker_type)") self._migrate_metadata_schema(cursor) self.connection.commit() logger.debug("Database tables created/verified") def _ensure_worker_tables(self, cursor) -> None: """Ensure the modern worker tables exist, dropping legacy ones if needed.""" cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='worker'") has_worker = cursor.fetchone() is not None if not has_worker: cursor.execute("DROP TABLE IF EXISTS workers") cursor.execute(""" CREATE TABLE worker ( id INTEGER PRIMARY KEY AUTOINCREMENT, worker_id TEXT UNIQUE NOT NULL, worker_type TEXT NOT NULL, pipe TEXT, status TEXT DEFAULT 'running', title TEXT, description TEXT, progress REAL DEFAULT 0.0, current_step TEXT, total_steps INTEGER DEFAULT 0, error_message TEXT, result_data TEXT, stdout TEXT DEFAULT '', steps TEXT DEFAULT '', started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 
completed_at TIMESTAMP, last_stdout_at TIMESTAMP, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) else: self._ensure_worker_columns(cursor) cursor.execute(""" CREATE TABLE IF NOT EXISTS worker_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, worker_id TEXT NOT NULL, event_type TEXT NOT NULL, step TEXT, channel TEXT, message TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(worker_id) REFERENCES worker(worker_id) ON DELETE CASCADE ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_worker_log_worker_id ON worker_log(worker_id)") def _ensure_worker_columns(self, cursor) -> None: """Backfill columns for older worker tables during upgrade.""" try: cursor.execute('PRAGMA table_info(worker)') existing_columns = {row[1] for row in cursor.fetchall()} except Exception as exc: logger.error(f"Error introspecting worker table: {exc}") return column_specs = { 'pipe': "TEXT", 'progress': "REAL DEFAULT 0.0", 'current_step': "TEXT", 'total_steps': "INTEGER DEFAULT 0", 'stdout': "TEXT DEFAULT ''", 'steps': "TEXT DEFAULT ''", 'last_stdout_at': "TIMESTAMP" } for col_name, ddl in column_specs.items(): if col_name not in existing_columns: try: cursor.execute(f"ALTER TABLE worker ADD COLUMN {col_name} {ddl}") logger.info(f"Added '{col_name}' column to worker table") except Exception as exc: logger.warning(f"Could not add '{col_name}' column to worker table: {exc}") def _insert_worker_log_entry(self, cursor, worker_id: str, event_type: str, message: str, step: Optional[str] = None, channel: Optional[str] = None) -> None: if not message: return cursor.execute(""" INSERT INTO worker_log (worker_id, event_type, step, channel, message) VALUES (?, ?, ?, ?, ?) 
""", (worker_id, event_type, step, channel, message)) def get_worker_events(self, worker_id: str, limit: int = 500) -> List[Dict[str, Any]]: """Return chronological worker log events for timelines.""" try: cursor = self.connection.cursor() cursor.execute(""" SELECT id, event_type, step, channel, message, created_at FROM worker_log WHERE worker_id = ? ORDER BY id ASC LIMIT ? """, (worker_id, limit)) return [dict(row) for row in cursor.fetchall()] except Exception as exc: logger.error(f"Error retrieving worker events for {worker_id}: {exc}", exc_info=True) return [] def clear_worker_events(self, worker_id: str, event_type: Optional[str] = None) -> None: """Remove worker log entries, optionally filtered by event type.""" try: cursor = self.connection.cursor() if event_type: cursor.execute( "DELETE FROM worker_log WHERE worker_id = ? AND event_type = ?", (worker_id, event_type) ) else: cursor.execute("DELETE FROM worker_log WHERE worker_id = ?", (worker_id,)) self.connection.commit() except Exception as exc: logger.error(f"Error clearing worker log for {worker_id}: {exc}", exc_info=True) def _migrate_metadata_schema(self, cursor) -> None: """Add missing columns to metadata table if they don't exist.""" try: cursor.execute('PRAGMA table_info(metadata)') existing_columns = {row[1] for row in cursor.fetchall()} for col_name, col_def in [('size', 'INTEGER'), ('ext', 'TEXT'), ('time_imported', 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP'), ('time_modified', 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP')]: if col_name not in existing_columns: try: cursor.execute(f"ALTER TABLE metadata ADD COLUMN {col_name} {col_def}") logger.info(f"Added '{col_name}' column to metadata table") except Exception as e: logger.warning(f"Could not add '{col_name}' column: {e}") except Exception as e: logger.error(f"Error during metadata schema migration: {e}") def _update_metadata_modified_time(self, file_id: int) -> None: """Update the time_modified timestamp for a file's metadata.""" try: cursor = 
self.connection.cursor() cursor.execute(""" UPDATE metadata SET time_modified = CURRENT_TIMESTAMP WHERE file_id = ? """, (file_id,)) self.connection.commit() except Exception as e: logger.debug(f"Could not update metadata modified time for file_id {file_id}: {e}") def get_or_create_file_entry(self, file_path: Path) -> int: """Get or create a file entry in the database.""" try: str_path = str(file_path.resolve()) logger.debug(f"[get_or_create_file_entry] Looking up: {str_path}") cursor = self.connection.cursor() cursor.execute("SELECT id FROM files WHERE file_path = ?", (str_path,)) row = cursor.fetchone() if row: logger.debug(f"[get_or_create_file_entry] Found existing file_id: {row[0]}") return row[0] logger.debug(f"[get_or_create_file_entry] File entry not found, creating new one") stat = file_path.stat() cursor.execute(""" INSERT INTO files (file_path, file_size, file_modified) VALUES (?, ?, ?) """, (str_path, stat.st_size, stat.st_mtime)) file_id = cursor.lastrowid logger.debug(f"[get_or_create_file_entry] Created new file_id: {file_id}") # Auto-create title tag filename_without_ext = file_path.stem if filename_without_ext: # Normalize underscores to spaces for consistency title_value = filename_without_ext.replace("_", " ").strip() title_tag = f"title:{title_value}" cursor.execute(""" INSERT OR IGNORE INTO tags (file_id, tag, tag_type) VALUES (?, ?, 'user') """, (file_id, title_tag)) logger.debug(f"[get_or_create_file_entry] Auto-created title tag for file_id {file_id}") self.connection.commit() logger.debug(f"[get_or_create_file_entry] Committed file entry {file_id}") return file_id except Exception as e: logger.error(f"[get_or_create_file_entry] ❌ Error getting/creating file entry for {file_path}: {e}", exc_info=True) raise def get_metadata(self, file_path: Path) -> Optional[Dict[str, Any]]: """Get metadata for a file.""" try: str_path = str(file_path.resolve()) cursor = self.connection.cursor() cursor.execute(""" SELECT m.* FROM metadata m JOIN files f ON 
m.file_id = f.id WHERE f.file_path = ? """, (str_path,)) row = cursor.fetchone() if not row: return None metadata = dict(row) # Parse JSON fields for field in ['known_urls', 'relationships']: if metadata.get(field): try: metadata[field] = json.loads(metadata[field]) except (json.JSONDecodeError, TypeError): metadata[field] = [] if field == 'known_urls' else [] return metadata except Exception as e: logger.error(f"Error getting metadata for {file_path}: {e}", exc_info=True) return None def save_metadata(self, file_path: Path, metadata: Dict[str, Any]) -> None: """Save metadata for a file.""" try: str_path = str(file_path.resolve()) logger.debug(f"[save_metadata] Starting save for: {str_path}") file_id = self.get_or_create_file_entry(file_path) logger.debug(f"[save_metadata] Got/created file_id: {file_id}") cursor = self.connection.cursor() known_urls = metadata.get('known_urls', []) if not isinstance(known_urls, str): known_urls = json.dumps(known_urls) relationships = metadata.get('relationships', []) if not isinstance(relationships, str): relationships = json.dumps(relationships) cursor.execute(""" INSERT INTO metadata ( file_id, hash, known_urls, relationships, duration, size, ext, media_type, media_kind, time_imported, time_modified ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT(file_id) DO UPDATE SET hash = excluded.hash, known_urls = excluded.known_urls, relationships = excluded.relationships, duration = excluded.duration, size = excluded.size, ext = excluded.ext, media_type = excluded.media_type, media_kind = excluded.media_kind, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP """, ( file_id, metadata.get('hash'), known_urls, relationships, metadata.get('duration'), metadata.get('size'), metadata.get('ext'), metadata.get('media_type'), metadata.get('media_kind') )) self.connection.commit() logger.debug(f"[save_metadata] ✅ Committed metadata for file_id {file_id}") except Exception as e: 
logger.error(f"[save_metadata] ❌ Error saving metadata for {file_path}: {e}", exc_info=True) raise def get_tags(self, file_path: Path) -> List[str]: """Get all tags for a file.""" try: str_path = str(file_path.resolve()) cursor = self.connection.cursor() cursor.execute(""" SELECT t.tag FROM tags t JOIN files f ON t.file_id = f.id WHERE f.file_path = ? ORDER BY t.tag """, (str_path,)) return [row[0] for row in cursor.fetchall()] except Exception as e: logger.error(f"Error getting tags for {file_path}: {e}", exc_info=True) return [] def save_tags(self, file_path: Path, tags: List[str]) -> None: """Save tags for a file, replacing all existing tags.""" try: str_path = str(file_path.resolve()) logger.debug(f"[save_tags] Starting save for: {str_path}") file_id = self.get_or_create_file_entry(file_path) logger.debug(f"[save_tags] Got/created file_id: {file_id}") cursor = self.connection.cursor() cursor.execute(""" SELECT tag FROM tags WHERE file_id = ? AND tag LIKE 'title:%' """, (file_id,)) existing_title = cursor.fetchone() cursor.execute("DELETE FROM tags WHERE file_id = ?", (file_id,)) logger.debug(f"[save_tags] Deleted existing tags for file_id {file_id}") if existing_title: cursor.execute(""" INSERT INTO tags (file_id, tag, tag_type) VALUES (?, ?, 'user') """, (file_id, existing_title[0])) logger.debug(f"[save_tags] Preserved existing title tag") else: filename_without_ext = file_path.stem if filename_without_ext: # Normalize underscores to spaces for consistency title_value = filename_without_ext.replace("_", " ").strip() title_tag = f"title:{title_value}" cursor.execute(""" INSERT INTO tags (file_id, tag, tag_type) VALUES (?, ?, 'user') """, (file_id, title_tag)) logger.debug(f"[save_tags] Created auto-title tag: {title_tag}") for tag in tags: tag = tag.strip() if tag: cursor.execute(""" INSERT OR IGNORE INTO tags (file_id, tag, tag_type) VALUES (?, ?, 'user') """, (file_id, tag)) self.connection.commit() logger.debug(f"[save_tags] ✅ Committed {len(tags)} tags for 
file_id {file_id}") # Verify they were actually saved cursor.execute("SELECT COUNT(*) FROM tags WHERE file_id = ?", (file_id,)) saved_count = cursor.fetchone()[0] logger.debug(f"[save_tags] Verified: {saved_count} tags in database for file_id {file_id}") self._update_metadata_modified_time(file_id) except Exception as e: logger.error(f"[save_tags] ❌ Error saving tags for {file_path}: {e}", exc_info=True) raise def add_tags(self, file_path: Path, tags: List[str]) -> None: """Add tags to a file.""" try: file_id = self.get_or_create_file_entry(file_path) cursor = self.connection.cursor() user_title_tag = next((tag.strip() for tag in tags if tag.strip().lower().startswith('title:')), None) if user_title_tag: cursor.execute(""" DELETE FROM tags WHERE file_id = ? AND tag LIKE 'title:%' """, (file_id,)) else: cursor.execute(""" SELECT COUNT(*) FROM tags WHERE file_id = ? AND tag LIKE 'title:%' """, (file_id,)) has_title = cursor.fetchone()[0] > 0 if not has_title: filename_without_ext = file_path.stem if filename_without_ext: # Normalize underscores to spaces for consistency title_value = filename_without_ext.replace("_", " ").strip() title_tag = f"title:{title_value}" cursor.execute(""" INSERT OR IGNORE INTO tags (file_id, tag, tag_type) VALUES (?, ?, 'user') """, (file_id, title_tag)) for tag in tags: tag = tag.strip() if tag: cursor.execute(""" INSERT OR IGNORE INTO tags (file_id, tag, tag_type) VALUES (?, ?, 'user') """, (file_id, tag)) self.connection.commit() self._update_metadata_modified_time(file_id) logger.debug(f"Added {len(tags)} tags for {file_path}") except Exception as e: logger.error(f"Error adding tags for {file_path}: {e}", exc_info=True) raise def remove_tags(self, file_path: Path, tags: List[str]) -> None: """Remove specific tags from a file.""" try: str_path = str(file_path.resolve()) cursor = self.connection.cursor() for tag in tags: tag = tag.strip() if tag: cursor.execute(""" DELETE FROM tags WHERE file_id = (SELECT id FROM files WHERE file_path = 
?) AND tag = ? """, (str_path, tag)) self.connection.commit() logger.debug(f"Removed {len(tags)} tags for {file_path}") except Exception as e: logger.error(f"Error removing tags for {file_path}: {e}", exc_info=True) raise def get_note(self, file_path: Path) -> Optional[str]: """Get note for a file.""" try: str_path = str(file_path.resolve()) cursor = self.connection.cursor() cursor.execute(""" SELECT n.note FROM notes n JOIN files f ON n.file_id = f.id WHERE f.file_path = ? """, (str_path,)) row = cursor.fetchone() return row[0] if row else None except Exception as e: logger.error(f"Error getting note for {file_path}: {e}", exc_info=True) return None def save_note(self, file_path: Path, note: str) -> None: """Save note for a file.""" try: file_id = self.get_or_create_file_entry(file_path) cursor = self.connection.cursor() cursor.execute(""" INSERT INTO notes (file_id, note) VALUES (?, ?) ON CONFLICT(file_id) DO UPDATE SET note = excluded.note, updated_at = CURRENT_TIMESTAMP """, (file_id, note)) self.connection.commit() logger.debug(f"Saved note for {file_path}") except Exception as e: logger.error(f"Error saving note for {file_path}: {e}", exc_info=True) raise def search_by_tag(self, tag: str, limit: int = 100) -> List[Path]: """Search for files with a specific tag.""" try: cursor = self.connection.cursor() cursor.execute(""" SELECT DISTINCT f.file_path FROM files f JOIN tags t ON f.id = t.file_id WHERE t.tag = ? LIMIT ? """, (tag, limit)) return [Path(row[0]) for row in cursor.fetchall()] except Exception as e: logger.error(f"Error searching by tag '{tag}': {e}", exc_info=True) return [] def search_by_hash(self, file_hash: str) -> Optional[Path]: """Search for a file by hash.""" try: cursor = self.connection.cursor() cursor.execute(""" SELECT file_path FROM files WHERE file_hash = ? 
""", (file_hash,)) row = cursor.fetchone() return Path(row[0]) if row else None except Exception as e: logger.error(f"Error searching by hash '{file_hash}': {e}", exc_info=True) return None def update_file_hash(self, file_path: Path, file_hash: str) -> None: """Update the file hash.""" try: str_path = str(file_path.resolve()) cursor = self.connection.cursor() cursor.execute(""" UPDATE files SET file_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE file_path = ? """, (file_hash, str_path)) self.connection.commit() logger.debug(f"Updated hash for {file_path}") except Exception as e: logger.error(f"Error updating file hash for {file_path}: {e}", exc_info=True) raise def rename_file(self, old_path: Path, new_path: Path) -> None: """Rename a file in the database, preserving all metadata.""" try: str_old_path = str(old_path.resolve()) str_new_path = str(new_path.resolve()) cursor = self.connection.cursor() cursor.execute(""" UPDATE files SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE file_path = ? 
""", (str_new_path, str_old_path)) self.connection.commit() logger.debug(f"Renamed file in database: {old_path} → {new_path}") except Exception as e: logger.error(f"Error renaming file from {old_path} to {new_path}: {e}", exc_info=True) raise def cleanup_missing_files(self) -> int: """Remove entries for files that no longer exist.""" try: cursor = self.connection.cursor() cursor.execute("SELECT id, file_path FROM files") removed_count = 0 for row_id, file_path in cursor.fetchall(): if not Path(file_path).exists(): cursor.execute("DELETE FROM files WHERE id = ?", (row_id,)) removed_count += 1 self.connection.commit() logger.info(f"Cleaned up {removed_count} missing file entries") return removed_count except Exception as e: logger.error(f"Error cleaning up missing files: {e}", exc_info=True) return 0 # ======================================================================== # WORKER MANAGEMENT # ======================================================================== def insert_worker(self, worker_id: str, worker_type: str, title: str = "", description: str = "", total_steps: int = 0, pipe: Optional[str] = None) -> int: """Insert a new worker entry into the database.""" try: cursor = self.connection.cursor() cursor.execute(""" INSERT INTO worker (worker_id, worker_type, pipe, status, title, description, total_steps) VALUES (?, ?, ?, ?, ?, ?, ?) 
""", (worker_id, worker_type, pipe, 'running', title, description, total_steps)) self.connection.commit() return cursor.lastrowid or 0 except sqlite3.IntegrityError: return self.update_worker_status(worker_id, 'running') except Exception as e: logger.error(f"Error inserting worker: {e}", exc_info=True) return 0 def update_worker(self, worker_id: str, **kwargs) -> bool: """Update worker entry with given fields.""" try: allowed_fields = { 'status', 'progress', 'current_step', 'error_message', 'result_data', 'title', 'description', 'completed_at', 'total_steps', 'pipe', 'started_at', 'last_stdout_at' } update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields} if not update_fields: return True update_fields['last_updated'] = datetime.now().isoformat() cursor = self.connection.cursor() set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys()) values = list(update_fields.values()) + [worker_id] cursor.execute(f""" UPDATE worker SET {set_clause} WHERE worker_id = ? """, values) self.connection.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"Error updating worker {worker_id}: {e}", exc_info=True) return False def update_worker_status(self, worker_id: str, status: str) -> int: """Update worker status and return its database ID.""" try: cursor = self.connection.cursor() if status in ('completed', 'error'): cursor.execute(""" UPDATE worker SET status = ?, completed_at = CURRENT_TIMESTAMP, last_updated = CURRENT_TIMESTAMP WHERE worker_id = ? """, (status, worker_id)) else: cursor.execute(""" UPDATE worker SET status = ?, last_updated = CURRENT_TIMESTAMP WHERE worker_id = ? 
""", (status, worker_id)) self.connection.commit() cursor.execute("SELECT id FROM worker WHERE worker_id = ?", (worker_id,)) row = cursor.fetchone() return row[0] if row else 0 except Exception as e: logger.error(f"Error updating worker status: {e}", exc_info=True) return 0 def get_worker(self, worker_id: str) -> Optional[Dict[str, Any]]: """Retrieve a worker entry by ID.""" try: cursor = self.connection.cursor() cursor.execute("SELECT * FROM worker WHERE worker_id = ?", (worker_id,)) row = cursor.fetchone() return dict(row) if row else None except Exception as e: logger.error(f"Error retrieving worker: {e}", exc_info=True) return None def get_active_workers(self) -> List[Dict[str, Any]]: """Get all active (running) workers.""" try: cursor = self.connection.cursor() cursor.execute("SELECT * FROM worker WHERE status = 'running' ORDER BY started_at DESC") return [dict(row) for row in cursor.fetchall()] except Exception as e: logger.error(f"Error retrieving active workers: {e}", exc_info=True) return [] def get_all_workers(self, limit: int = 100) -> List[Dict[str, Any]]: """Get all workers (recent first).""" try: cursor = self.connection.cursor() cursor.execute(""" SELECT * FROM worker ORDER BY started_at DESC LIMIT ? 
""", (limit,)) return [dict(row) for row in cursor.fetchall()] except Exception as e: logger.error(f"Error retrieving all workers: {e}", exc_info=True) return [] def delete_worker(self, worker_id: str) -> bool: """Delete a worker entry.""" try: cursor = self.connection.cursor() cursor.execute("DELETE FROM worker WHERE worker_id = ?", (worker_id,)) self.connection.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"Error deleting worker: {e}", exc_info=True) return False def cleanup_old_workers(self, days: int = 7) -> int: """Clean up completed/errored workers older than specified days.""" try: cursor = self.connection.cursor() cursor.execute(""" DELETE FROM worker WHERE status IN ('completed', 'error') AND completed_at < datetime('now', '-' || ? || ' days') """, (days,)) self.connection.commit() return cursor.rowcount except Exception as e: logger.error(f"Error cleaning up old workers: {e}", exc_info=True) return 0 def expire_running_workers( self, older_than_seconds: int = 300, status: str = "error", reason: str | None = None, worker_id_prefix: str | None = None, ) -> int: """Mark long-idle running workers as finished with the given status. Args: older_than_seconds: Minimum idle time before expiring the worker. status: New status to apply (e.g., "error" or "cancelled"). reason: Error message to set when none is present. worker_id_prefix: Optional LIKE pattern (e.g., 'cli_%') to scope updates. Returns: Number of workers updated. """ idle_seconds = max(1, int(older_than_seconds)) cutoff = f"-{idle_seconds} seconds" auto_reason = reason or "Worker stopped responding; auto-marked as error" try: cursor = self.connection.cursor() if worker_id_prefix: cursor.execute( """ UPDATE worker SET status = ?, error_message = CASE WHEN IFNULL(TRIM(error_message), '') = '' THEN ? ELSE error_message END, completed_at = COALESCE(completed_at, CURRENT_TIMESTAMP), last_updated = CURRENT_TIMESTAMP WHERE status = 'running' AND worker_id LIKE ? 
AND COALESCE(last_updated, started_at, created_at) < datetime('now', ?) """, (status, auto_reason, worker_id_prefix, cutoff), ) else: cursor.execute( """ UPDATE worker SET status = ?, error_message = CASE WHEN IFNULL(TRIM(error_message), '') = '' THEN ? ELSE error_message END, completed_at = COALESCE(completed_at, CURRENT_TIMESTAMP), last_updated = CURRENT_TIMESTAMP WHERE status = 'running' AND COALESCE(last_updated, started_at, created_at) < datetime('now', ?) """, (status, auto_reason, cutoff), ) self.connection.commit() return cursor.rowcount except Exception as exc: logger.error(f"Error expiring stale workers: {exc}", exc_info=True) return 0 def append_worker_stdout(self, worker_id: str, text: str, step: Optional[str] = None, channel: str = "stdout") -> bool: """Append text to a worker's stdout log and timeline.""" if not text: return True try: cursor = self.connection.cursor() cursor.execute("SELECT stdout FROM worker WHERE worker_id = ?", (worker_id,)) row = cursor.fetchone() if not row: logger.warning(f"Worker {worker_id} not found for stdout append") return False current_stdout = row[0] or "" separator = "" if not current_stdout else ("" if current_stdout.endswith("\n") else "\n") new_stdout = f"{current_stdout}{separator}{text}\n" cursor.execute(""" UPDATE worker SET stdout = ?, last_updated = CURRENT_TIMESTAMP, last_stdout_at = CURRENT_TIMESTAMP WHERE worker_id = ? 
""", (new_stdout, worker_id)) self._insert_worker_log_entry(cursor, worker_id, 'stdout', text, step, channel) self.connection.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"Error appending stdout to worker {worker_id}: {e}", exc_info=True) return False def get_worker_stdout(self, worker_id: str) -> str: """Get stdout logs for a worker.""" try: cursor = self.connection.cursor() cursor.execute("SELECT stdout FROM worker WHERE worker_id = ?", (worker_id,)) row = cursor.fetchone() return row[0] if row and row[0] else "" except Exception as e: logger.error(f"Error getting worker stdout for {worker_id}: {e}", exc_info=True) return "" def append_worker_steps(self, worker_id: str, step_text: str) -> bool: """Append a step to a worker's step log and timeline.""" if not step_text: return True try: cursor = self.connection.cursor() cursor.execute("SELECT steps FROM worker WHERE worker_id = ?", (worker_id,)) row = cursor.fetchone() if not row: logger.warning(f"Worker {worker_id} not found for steps append") return False current_steps = row[0] or "" timestamp = datetime.now().strftime('%H:%M:%S') step_entry = f"[{timestamp}] {step_text}" new_steps = (current_steps + "\n" if current_steps else "") + step_entry cursor.execute(""" UPDATE worker SET steps = ?, last_updated = CURRENT_TIMESTAMP, current_step = ? WHERE worker_id = ? 
""", (new_steps, step_text, worker_id)) self._insert_worker_log_entry(cursor, worker_id, 'step', step_text, step_text, 'step') self.connection.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"Error appending step to worker {worker_id}: {e}", exc_info=True) return False def get_worker_steps(self, worker_id: str) -> str: """Get step logs for a worker.""" try: cursor = self.connection.cursor() cursor.execute("SELECT steps FROM worker WHERE worker_id = ?", (worker_id,)) row = cursor.fetchone() return row[0] if row and row[0] else "" except Exception as e: logger.error(f"Error getting worker steps for {worker_id}: {e}", exc_info=True) return "" def clear_worker_stdout(self, worker_id: str) -> bool: """Clear stdout logs for a worker.""" try: cursor = self.connection.cursor() cursor.execute(""" UPDATE worker SET stdout = '', last_updated = CURRENT_TIMESTAMP WHERE worker_id = ? """, (worker_id,)) self.clear_worker_events(worker_id, event_type='stdout') self.connection.commit() return cursor.rowcount > 0 except Exception as e: logger.error(f"Error clearing worker stdout: {e}", exc_info=True) return False def clear_finished_workers(self) -> int: """Delete all workers that are not currently running.""" try: cursor = self.connection.cursor() cursor.execute("DELETE FROM worker WHERE status != 'running'") self.connection.commit() return cursor.rowcount except Exception as e: logger.error(f"Error clearing finished workers: {e}", exc_info=True) return 0 def close(self) -> None: """Close the database connection.""" try: if self.connection: self.connection.close() logger.info("Database connection closed") except Exception as e: logger.error(f"Error closing database: {e}", exc_info=True) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() # ============================================================================ # LIBRARY INITIALIZATION & MIGRATION # 
# ============================================================================

class LocalLibraryInitializer:
    """Scan a library folder and synchronize it into the library database.

    A scan registers every media file found under ``library_root``, imports
    ``.tags`` sidecars that sit next to an existing file, and deletes
    sidecars whose base file no longer exists.
    """

    def __init__(self, library_root: Path):
        """Open the library database and reset the scan counters.

        Args:
            library_root: Root folder of the library to scan.
        """
        self.library_root = Path(library_root)
        self.db = LocalLibraryDB(library_root)
        # NOTE(review): 'tags_imported' and 'metadata_imported' are reported
        # in the stats but never incremented anywhere in this class.
        self.stats = {
            'files_scanned': 0,
            'files_new': 0,
            'files_existing': 0,
            'sidecars_imported': 0,
            'sidecars_deleted': 0,
            'tags_imported': 0,
            'metadata_imported': 0,
            'errors': 0
        }

    def scan_and_index(self) -> Dict[str, int]:
        """Scan the library folder and populate the database.

        Returns:
            The ``stats`` counter dictionary of this initializer.

        Raises:
            Exception: Any error raised by the scan is logged, counted in
                ``stats['errors']`` and re-raised. The database is closed in
                every case, so the initializer is single-use.
        """
        try:
            logger.info(f"Starting library scan at {self.library_root}")

            media_files = self._find_media_files()
            logger.info(f"Found {len(media_files)} media files")

            db_files = self._get_database_files()
            logger.info(f"Found {len(db_files)} files in database")

            for file_path in media_files:
                self._process_file(file_path, db_files)

            self.db.connection.commit()
            self._import_sidecars_batch()
            self.db.connection.commit()
            self._cleanup_orphaned_sidecars()
            self.db.connection.commit()

            logger.info(f"Library scan complete. Stats: {self.stats}")
            return self.stats
        except Exception as e:
            logger.error(f"Error during library scan: {e}", exc_info=True)
            self.stats['errors'] += 1
            raise
        finally:
            self.db.close()

    def _find_media_files(self) -> List[Path]:
        """Return the sorted media files found recursively under the root.

        A file qualifies when its lower-cased suffix is in
        ``MEDIA_EXTENSIONS``. A traversal error is logged and whatever was
        collected up to that point is returned.
        """
        media_files = []
        try:
            for file_path in self.library_root.rglob("*"):
                if file_path.is_file() and file_path.suffix.lower() in MEDIA_EXTENSIONS:
                    media_files.append(file_path)
        except Exception as e:
            logger.error(f"Error scanning media files: {e}", exc_info=True)
        return sorted(media_files)

    def _get_database_files(self) -> Dict[str, int]:
        """Map normalized paths of files already in the database to their ids.

        Paths are normalized as the lower-cased string of the resolved path,
        the same key ``_process_file`` computes. Returns an empty dict when
        the query fails (the failure is logged).
        """
        try:
            cursor = self.db.connection.cursor()
            cursor.execute("SELECT id, file_path FROM files")
            result = {}
            for file_id, file_path in cursor.fetchall():
                normalized = str(Path(file_path).resolve()).lower()
                result[normalized] = file_id
            return result
        except Exception as e:
            logger.error(f"Error getting database files: {e}", exc_info=True)
            return {}

    def _process_file(self, file_path: Path, db_files: Dict[str, int]) -> None:
        """Count one media file and register it when the database lacks it.

        Args:
            file_path: Media file found by the scan.
            db_files: Known files keyed by normalized path, as returned by
                ``_get_database_files``.
        """
        try:
            normalized = str(file_path.resolve()).lower()
            if normalized in db_files:
                self.stats['files_existing'] += 1
            else:
                self.db.get_or_create_file_entry(file_path)
                self.stats['files_new'] += 1
            self.stats['files_scanned'] += 1
        except Exception as e:
            logger.warning(f"Error processing file {file_path}: {e}")
            self.stats['errors'] += 1

    def _import_sidecars_batch(self) -> None:
        """Import every ``.tags`` sidecar whose base file exists.

        The hash, tags and known URLs read from each sidecar are stored in
        the database. A sidecar that yields nothing is not counted. Failures
        on a single sidecar are logged and counted in ``stats['errors']``
        without aborting the batch.
        """
        try:
            for sidecar_path in self.library_root.rglob("*.tags"):
                try:
                    # Strip the trailing ".tags" to get the described file.
                    base_path = Path(str(sidecar_path)[:-len('.tags')])
                    if not base_path.exists():
                        continue

                    hash_val, tags, urls = read_sidecar(sidecar_path)
                    if hash_val or tags or urls:
                        if hash_val:
                            self.db.update_file_hash(base_path, hash_val)
                        if tags:
                            self.db.save_tags(base_path, tags)
                        if urls:
                            self.db.save_metadata(base_path, {'known_urls': urls})
                        self.stats['sidecars_imported'] += 1
                except Exception as e:
                    logger.warning(f"Error importing sidecar {sidecar_path}: {e}")
                    self.stats['errors'] += 1
        except Exception as e:
            logger.error(f"Error batch importing sidecars: {e}", exc_info=True)

    def _cleanup_orphaned_sidecars(self) -> None:
        """Delete ``.tags`` sidecars whose base file no longer exists.

        Each deletion is counted in ``stats['sidecars_deleted']``; a sidecar
        that cannot be deleted is logged and left in place.
        """
        try:
            for sidecar_path in self.library_root.rglob("*.tags"):
                base_path = Path(str(sidecar_path)[:-len('.tags')])
                if not base_path.exists():
                    try:
                        sidecar_path.unlink()
                        self.stats['sidecars_deleted'] += 1
                    except Exception as e:
                        logger.warning(f"Could not delete orphaned sidecar {sidecar_path}: {e}")
        except Exception as e:
            logger.error(f"Error cleaning up orphaned sidecars: {e}", exc_info=True)


def migrate_tags_to_db(library_root: Path, db: LocalLibraryDB) -> int:
    """Migrate ``.tags`` files under ``library_root`` into the database.

    Every non-blank line of a ``.tags`` file is stored as one tag for the
    file named by stripping the ``.tags`` suffix; the sidecar is deleted
    afterwards. The existence of the base file is not checked.

    NOTE(review): ``read_sidecar`` also extracts a hash and URLs from
    ``.tags`` files, so such lines would presumably be stored as plain tags
    here. Confirm against the sidecar format.

    Args:
        library_root: Folder searched recursively for ``*.tags`` files.
        db: Database that receives the tags.

    Returns:
        Number of sidecars whose tags were saved. Failures are logged and
        never raised.
    """
    migrated_count = 0
    try:
        for tags_file in library_root.rglob("*.tags"):
            try:
                base_path = Path(str(tags_file)[:-len('.tags')])
                tags_text = tags_file.read_text(encoding='utf-8')
                tags = [line.strip() for line in tags_text.splitlines() if line.strip()]

                db.save_tags(base_path, tags)
                migrated_count += 1

                # Deletion is best-effort: the tags are already saved.
                try:
                    tags_file.unlink()
                    logger.info(f"Migrated and deleted {tags_file}")
                except Exception as e:
                    logger.warning(f"Migrated {tags_file} but failed to delete: {e}")
            except Exception as e:
                logger.warning(f"Failed to migrate {tags_file}: {e}")

        logger.info(f"Migrated {migrated_count} .tags files to database")
        return migrated_count
    except Exception as e:
        logger.error(f"Error during tags migration: {e}", exc_info=True)
        return migrated_count


def migrate_metadata_to_db(library_root: Path, db: LocalLibraryDB) -> int:
    """Migrate ``.metadata`` files under ``library_root`` into the database.

    Each file is parsed with ``_parse_metadata_file`` and saved for the file
    named by stripping the ``.metadata`` suffix; the sidecar is deleted
    afterwards. Content that cannot be parsed is saved as an empty dict.

    Args:
        library_root: Folder searched recursively for ``*.metadata`` files.
        db: Database that receives the metadata.

    Returns:
        Number of sidecars whose metadata was saved. Failures are logged and
        never raised.
    """
    migrated_count = 0
    try:
        for metadata_file in library_root.rglob("*.metadata"):
            try:
                base_path = Path(str(metadata_file)[:-len('.metadata')])
                metadata_text = metadata_file.read_text(encoding='utf-8')
                metadata = _parse_metadata_file(metadata_text)

                db.save_metadata(base_path, metadata)
                migrated_count += 1

                # Deletion is best-effort: the metadata is already saved.
                try:
                    metadata_file.unlink()
                    logger.info(f"Migrated and deleted {metadata_file}")
                except Exception as e:
                    logger.warning(f"Migrated {metadata_file} but failed to delete: {e}")
            except Exception as e:
                logger.warning(f"Failed to migrate {metadata_file}: {e}")

        logger.info(f"Migrated {migrated_count} .metadata files to database")
        return migrated_count
    except Exception as e:
        logger.error(f"Error during metadata migration: {e}", exc_info=True)
        return migrated_count


def _parse_metadata_file(content: str) -> Dict[str, Any]:
    """Parse the JSON content of a ``.metadata`` file into a dictionary.

    Args:
        content: Raw text of the metadata file.

    Returns:
        The decoded JSON object, or an empty dict when the content is not
        valid JSON or decodes to something other than an object (a list,
        string, number, ...), so callers always receive a mapping.
    """
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        logger.warning("Could not parse metadata JSON, returning empty dict")
        return {}
    if not isinstance(parsed, dict):
        logger.warning("Metadata JSON is not an object, returning empty dict")
        return {}
    return parsed


def migrate_all(library_root: Path, db: Optional[LocalLibraryDB] = None) -> Dict[str, int]:
    """Migrate all ``.tags`` and ``.metadata`` sidecars to the database.

    Args:
        library_root: Folder searched recursively for sidecar files.
        db: Database to migrate into. When omitted, a ``LocalLibraryDB`` is
            opened for ``library_root`` and closed again before returning;
            a database passed in by the caller is left open.

    Returns:
        ``{'tags': n, 'metadata': m}`` with the number of migrated files of
        each kind.

    Raises:
        Exception: Whatever opening the database raises, unchanged.
    """
    should_close = db is None
    # Open the database outside the try block: if opening fails there is
    # nothing to close, and the original error must not be masked by a
    # failing cleanup on ``None``.
    if db is None:
        db = LocalLibraryDB(library_root)
    try:
        return {
            'tags': migrate_tags_to_db(library_root, db),
            'metadata': migrate_metadata_to_db(library_root, db),
        }
    finally:
        if should_close:
            db.close()


# ============================================================================
# SEARCH OPTIMIZATION
# ============================================================================

class LocalLibrarySearchOptimizer:
    """Database-backed helper for local library searches.

    Intended for use as a context manager: the database is opened on entry
    and closed on exit. While no database is open, every query method
    returns its empty default instead of raising.
    """

    def __init__(self, library_root: Path):
        """Remember the library root; the database is opened on ``__enter__``.

        Args:
            library_root: Root folder of the library.
        """
        self.library_root = Path(library_root)
        self.db: Optional[LocalLibraryDB] = None

    def __enter__(self):
        """Open the library database and return this optimizer."""
        self.db = LocalLibraryDB(self.library_root)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the database and drop the handle; exceptions are not suppressed."""
        if self.db:
            try:
                self.db.close()
            finally:
                # Forget the closed handle so later calls fall back to the
                # "no database" defaults instead of using a closed connection.
                self.db = None

    def get_cached_tags(self, file_path: Path) -> List[str]:
        """Return the tags stored for ``file_path`` (``[]`` without a database)."""
        if not self.db:
            return []
        return self.db.get_tags(file_path)

    def get_cached_metadata(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Return the metadata stored for ``file_path`` (``None`` without a database)."""
        if not self.db:
            return None
        return self.db.get_metadata(file_path)

    def prefetch_metadata(self, file_paths: List[Path]) -> None:
        """Ensure a database entry exists for each of the given files.

        Failures on individual files are logged and skipped.
        """
        if not self.db:
            return
        for file_path in file_paths:
            try:
                self.db.get_or_create_file_entry(file_path)
            except Exception as e:
                logger.warning(f"Failed to prefetch {file_path}: {e}")

    def update_search_result_with_cached_data(self, search_result: Any, file_path: Path) -> None:
        """Copy cached tags and metadata onto a search result object.

        Sets ``tag_summary`` (tags joined with ``", "``) when tags exist, and
        ``hash_hex``, ``duration_seconds`` and ``media_kind`` from the
        metadata keys ``'hash'``, ``'duration'`` and ``'media_kind'`` when
        present. Errors are logged and leave the result partially updated.

        Args:
            search_result: Object whose attributes are assigned in place.
            file_path: File whose cached data is looked up.
        """
        if not self.db:
            return
        try:
            tags = self.db.get_tags(file_path)
            if tags:
                search_result.tag_summary = ", ".join(tags)

            metadata = self.db.get_metadata(file_path)
            if metadata:
                if 'hash' in metadata:
                    search_result.hash_hex = metadata['hash']
                if 'duration' in metadata:
                    search_result.duration_seconds = metadata['duration']
                if 'media_kind' in metadata:
                    search_result.media_kind = metadata['media_kind']
        except Exception as e:
            logger.warning(f"Failed to update search result for {file_path}: {e}")

    def search_by_tag(self, tag: str, limit: int = 100) -> List[Path]:
        """Return files carrying ``tag`` via the database (``[]`` without a database)."""
        if not self.db:
            return []
        return self.db.search_by_tag(tag, limit)

    def search_by_hash(self, file_hash: str) -> Optional[Path]:
        """Return the file with the given hash via the database (``None`` without a database)."""
        if not self.db:
            return None
        return self.db.search_by_hash(file_hash)