From ca467a345332096c1a1826dad2a56dee8a999bdf Mon Sep 17 00:00:00 2001 From: Nose Date: Thu, 22 Jan 2026 03:12:16 -0800 Subject: [PATCH] f --- API/folder.py | 4115 --------------------------------- Store/registry.py | 70 - cmdlet/_shared.py | 5 +- cmdlet/add_file.py | 47 +- cmdlet/add_relationship.py | 4 +- cmdlet/delete_relationship.py | 484 +--- cmdlet/get_tag.py | 164 +- cmdlet/trim_file.py | 37 +- 8 files changed, 26 insertions(+), 4900 deletions(-) delete mode 100644 API/folder.py diff --git a/API/folder.py b/API/folder.py deleted file mode 100644 index 90985ad..0000000 --- a/API/folder.py +++ /dev/null @@ -1,4115 +0,0 @@ -"""Unified local library management system combining database, initialization, migration, and search. - -This module provides: -- SQLite database management for local file metadata caching -- Library scanning and database initialization -- Sidecar file migration from .tag/.metadata files to database -- Optimized search functionality using database indices -- Worker task tracking for background operations -""" - -from __future__ import annotations - -import sqlite3 -import json -import logging -import time -import os -from contextlib import contextmanager -from datetime import datetime -from pathlib import Path, PurePosixPath -from threading import RLock -from typing import Optional, Dict, Any, List, Tuple, Set, Sequence - -from SYS.utils import sha256_file, expand_path -from SYS.logger import debug as _debug - -logger = logging.getLogger(__name__) -WORKER_LOG_MAX_ENTRIES = 50 # Reduced from 99 to keep log size down -MAX_FINISHED_WORKERS = 100 # Only keep 100 finished workers globally - -# Wrapper: only emit folder DB diagnostics when MM_DEBUG=1 -def mm_debug(*args, **kwargs): - if not os.environ.get("MM_DEBUG"): - return - _debug(*args, **kwargs) - -# Helper: decorate DB write methods to retry transient SQLITE 'database is locked' errors -def _db_retry(max_attempts: int = 6, base_sleep: float = 0.1): - def _decorator(func): - def _wrapped(*args, **kwargs): - attempts = 0 - while True: - try: - return func(*args, **kwargs) - except sqlite3.OperationalError as e: - msg = str(e or "").lower() - if "database is locked" in msg and attempts < max_attempts: - attempts += 1 - sleep_time = min(base_sleep * (2 ** (attempts - 1)), 5.0) - logger.debug(f"[db_retry] database locked; retrying in {sleep_time:.2f}s ({attempts}/{max_attempts})") - time.sleep(sleep_time) - continue - raise - return _wrapped - return _decorator - -# Try to import optional dependencies -mutagen: Any -try: - import mutagen -except ImportError: - mutagen = None - -try: - from SYS.metadata import ( - _read_sidecar_metadata, - _derive_sidecar_path, - write_tags, - write_tags_to_file, - embed_metadata_in_file, - read_tags_from_file, - ) - - METADATA_AVAILABLE = True -except ImportError: - _read_sidecar_metadata = None # type: ignore - _derive_sidecar_path = None # type: ignore - write_tags = None # type: ignore - write_tags_to_file = None # type: ignore - embed_metadata_in_file = None # type: ignore - read_tags_from_file = None # type: ignore - METADATA_AVAILABLE = False - -# Media extensions to index -MEDIA_EXTENSIONS = { - ".mp4", - ".mkv", - ".mka", - ".webm", - ".avi", - ".mov", - ".flv", - ".wmv", - ".m4v", - ".mp3", - ".flac", - ".wav", - ".aac", - ".ogg", - ".m4a", - ".wma", - ".jpg", - ".jpeg", - ".png", - ".gif", - ".webp", - ".bmp", - ".tiff", - ".pdf", - ".epub", - ".txt", - ".docx", - ".doc", -} - -# ============================================================================ -# SIDECAR FILE 
HANDLING -# ============================================================================ - - -def read_sidecar(sidecar_path: Path) -> Tuple[Optional[str], List[str], List[str]]: - """Read metadata from a sidecar file. - - Delegates to metadata._read_sidecar_metadata for centralized handling. - - Args: - sidecar_path: Path to .tag sidecar file - - Returns: - Tuple of (hash_value, tags_list, url_list) - Returns (None, [], []) if file doesn't exist or can't be read - """ - if _read_sidecar_metadata is None: - return None, [], [] - - try: - return _read_sidecar_metadata(sidecar_path) - except Exception: - return None, [], [] - - -def write_sidecar( - media_path: Path, - tags: List[str], - url: List[str], - hash_value: Optional[str] = None -) -> bool: - """Write metadata to a sidecar file. - - Delegates to metadata.write_tags for centralized handling. - - Args: - media_path: Path to the media file (sidecar created as media_path.tag) - tags: List of tag strings - url: List of known URL strings - hash_value: Optional SHA256 hash to include - - Returns: - True if successful, False otherwise - """ - if write_tags is None: - return False - - if media_path.exists() and media_path.is_dir(): - return False - - try: - write_tags(media_path, tags, url, hash_value) - return True - except Exception: - return False - - -def find_sidecar(media_path: Path) -> Optional[Path]: - """Find the sidecar file for a media path. - - Uses metadata._derive_sidecar_path for centralized handling. - - Args: - media_path: Path to media file - - Returns: - Path to existing sidecar file, or None if not found - """ - if media_path.is_dir(): - return None - - if _derive_sidecar_path is None: - return None - - try: - # Check for new format: filename.ext.tag - sidecar_path = _derive_sidecar_path(media_path) - if sidecar_path.exists(): - return sidecar_path - except OSError: - pass - - return None - - -def has_sidecar(media_path: Path) -> bool: - """Check if a media file has a sidecar.""" - return find_sidecar(media_path) is not None - - -class API_folder_store: - """SQLite database for caching local library metadata.""" - - DB_NAME = "medios-macina.db" - SCHEMA_VERSION = 4 - # Global lock across all instances to prevent 'database is locked' during concurrent operations. - _shared_db_lock = RLock() - - def __init__(self, library_root: Path): - """Initialize the database at the library root. 
- - Args: - library_root: Path to the local library root directory - """ - self.library_root = expand_path(library_root).resolve() - self.db_path = self.library_root / self.DB_NAME - self.connection: sqlite3.Connection = None # type: ignore - # Use the shared lock - self._db_lock = self._shared_db_lock - mm_debug(f"[folder-db] init: root={self.library_root} db={self.db_path}") - self._init_db() - - @contextmanager - def _with_db_lock(self, *, timeout: float = 8.0): - """Acquire the shared DB lock with a bounded wait to avoid indefinite stalls.""" - locked = False - try: - locked = self._db_lock.acquire(timeout=timeout) - if not locked: - mm_debug(f"[folder-db] lock acquisition timed out after {timeout:.1f}s; proceeding unlocked") - except Exception as exc: - locked = False - mm_debug(f"[folder-db] lock acquisition failed ({exc}); proceeding unlocked") - try: - yield - finally: - if locked: - try: - self._db_lock.release() - except RuntimeError: - pass - - def _normalize_input_path(self, file_path: Path) -> Path: - p = expand_path(file_path).resolve() - # If the path is relative to the current working directory, we check if it's meant to be in the library_root. - # However, because we call .resolve() above, it's already absolute relative to CWD if it was relative. - # But we want it to be absolute relative to library_root if it's not absolute or if it exists in library_root. - - # If it's already under library_root, we are done. - try: - p.relative_to(self.library_root) - return p - except ValueError: - pass - - # If it was a relative path (unresolved), we should have joined it before resolving. - # Let's re-expand without resolve to check if it's absolute. - raw_p = expand_path(file_path) - if not raw_p.is_absolute(): - return (self.library_root / raw_p).resolve() - - return p - - def _to_db_file_path(self, file_path: Path) -> str: - """Convert an on-disk file path to a DB-stored relative path (POSIX separators).""" - p = self._normalize_input_path(file_path) - p_abs = p.resolve() - root_abs = self.library_root.resolve() - rel = p_abs.relative_to(root_abs) - rel_posix = PurePosixPath(*rel.parts).as_posix() - rel_posix = str(rel_posix or "").strip() - if not rel_posix or rel_posix == ".": - raise ValueError(f"Invalid relative path for DB storage: {file_path}") - return rel_posix - - def _from_db_file_path(self, db_file_path: str) -> Path: - """Convert a DB-stored relative path (POSIX separators) into an absolute path.""" - rel_str = str(db_file_path or "").strip() - if not rel_str: - raise ValueError("Missing DB file_path") - rel_parts = PurePosixPath(rel_str).parts - return self.library_root / Path(*rel_parts) - - def _init_db(self) -> None: - """Initialize database connection and create tables if needed.""" - with self._with_db_lock(): - try: - mm_debug(f"[folder-db] opening sqlite db: {self.db_path}") - # Ensure the library root exists; sqlite cannot create parent dirs. - try: - # User safety: Folder store must be created in a blank folder/no files in it. - # If the DB already exists, we skip this check (it's an existing library). 
- should_check_empty = not self.db_path.exists() - - self.library_root.mkdir(parents=True, exist_ok=True) - - if should_check_empty: - # Check if there are any files or directories in the library root (excluding the DB itself if it was just created) - existing_items = [item for item in self.library_root.iterdir() if item.name != self.DB_NAME] - - # Allow an empty 'incoming' directory created by upload flow to exist - # (this prevents a false-positive safety check when an upload endpoint - # creates the incoming dir before DB initialization). - if existing_items: - if len(existing_items) == 1 and existing_items[0].name == "incoming" and existing_items[0].is_dir(): - try: - # If the incoming directory is empty, treat it as harmless. - if not any(existing_items[0].iterdir()): - existing_items = [] - except Exception: - # If we can't inspect it safely, leave the original items in place - pass - - if existing_items: - # Log the items found for debugging - item_names = [i.name for i in existing_items[:5]] - if len(existing_items) > 5: - item_names.append("...") - raise RuntimeError( - f"Safety Check Failed: Local library root must be empty for new stores.\n" - f"Directory: {self.library_root}\n" - f"Found {len(existing_items)} items: {', '.join(item_names)}\n" - f"Please use a clean directory to prevent accidental hashing of existing files." - ) - - except RuntimeError: - # Re-raise our specific safety error - raise - except Exception as exc: - raise RuntimeError( - f"Cannot create/open library root directory: {self.library_root}: {exc}" - ) from exc - - # Use check_same_thread=False to allow multi-threaded access - # This is safe because we're not sharing connections across threads; - # each thread will get its own cursor - # Set a generous timeout to avoid "database is locked" errors during heavy concurrency - self.connection = sqlite3.connect( - str(self.db_path), - check_same_thread=False, - timeout=20.0 - ) - self.connection.row_factory = sqlite3.Row - mm_debug(f"[folder-db] sqlite connection opened: {self.db_path}") - - # Ensure busy_timeout is set immediately for all subsequent ops (including pragmas) - try: - self.connection.execute("PRAGMA busy_timeout = 20000") - except Exception: - pass - - # Performance & Size Optimizations - # 1. WAL mode for better concurrency and fewer locks - self.connection.execute("PRAGMA journal_mode=WAL") - # 2. auto_vacuum=FULL to automatically reclaim space from deleted rows/logs - try: - self.connection.execute("PRAGMA auto_vacuum = FULL") - except sqlite3.OperationalError as exc: - if "locked" not in str(exc).lower(): - raise - logger.warning( - "Database locked; skipping PRAGMA auto_vacuum setup for this session." - ) - # 3. Increase page size for modern file systems - self.connection.execute("PRAGMA page_size = 4096") - # 4. Memory and Sync optimizations - self.connection.execute("PRAGMA synchronous = NORMAL") - self.connection.execute("PRAGMA temp_store = MEMORY") - self.connection.execute("PRAGMA cache_size = -2000") - # Use memory mapping for the entire DB (up to 30MB) for near-instant reads - self.connection.execute("PRAGMA mmap_size = 30000000") - # 5. Standard features - self.connection.execute("PRAGMA foreign_keys = ON") - - # Bound how long sqlite will wait on locks before raising. - # Already set to 20000 above, no need to lower it. 
- # try: - # self.connection.execute("PRAGMA busy_timeout = 5000") - # except Exception: - # pass - - self._create_tables() - - # Run maintenance if the DB has grown suspiciously large - self._run_maintenance_if_needed() - - logger.info(f"Database initialized at {self.db_path}") - except Exception as e: - logger.error(f"Failed to initialize database: {e}", exc_info=True) - if self.connection: - try: - self.connection.close() - except Exception: - pass - self.connection = None - raise - - def _run_maintenance_if_needed(self) -> None: - """Perform a one-time VACUUM if the database file is large.""" - try: - if not self.db_path.exists(): - return - - # Global cleanup of old workers and logs regardless of size - self._global_cleanup() - - # If the database is larger than 64MB, check if a vacuum is worth the time. - # We only do this check on startup to minimize performance impact. - file_stats = self.db_path.stat() - size_mb = file_stats.st_size / (1024 * 1024) - - if size_mb > 64: - # Check fragmentation (freelist count) - try: - freelist = self.connection.execute("PRAGMA freelist_count").fetchone()[0] - page_size = self.connection.execute("PRAGMA page_size").fetchone()[0] - free_mb = (freelist * page_size) / (1024 * 1024) - - # If more than 25% or 10MB of the file is free space, it's worth a VACUUM. - if free_mb > 10 or (free_mb / size_mb) > 0.25: - logger.debug(f"Database size ({size_mb:.1f}MB) has {free_mb:.1f}MB free. Vacuuming...") - self.connection.execute("VACUUM") - # Also optimize the query planner indices - self.connection.execute("ANALYZE") - - new_size_mb = self.db_path.stat().st_size / (1024 * 1024) - reduction = size_mb - new_size_mb - if reduction > 1.0: - logger.info(f"Maintenance reclaimed {reduction:.1f}MB. Current size: {new_size_mb:.1f}MB") - except Exception as inner_e: - logger.debug(f"Refined maintenance check failed: {inner_e}") - # Fallback to simple size threshold if PRAGMA fails - if size_mb > 128: - self.connection.execute("VACUUM") - except Exception as e: - # Maintenance should never block application startup - logger.warning(f"Database maintenance skipped: {e}") - - def _global_cleanup(self) -> None: - """Aggressively prune old workers and logs to prevent database bloat.""" - try: - cursor = self.connection.cursor() - - # 1. Prune finished/failed workers older than MAX_FINISHED_WORKERS - # We keep the newest ones based on completed_at or started_at - cursor.execute( - """ - DELETE FROM worker - WHERE status != 'running' - AND id NOT IN ( - SELECT id FROM worker - WHERE status != 'running' - ORDER BY COALESCE(completed_at, started_at) DESC - LIMIT ? - ) - """, - (MAX_FINISHED_WORKERS,) - ) - worker_deletes = cursor.rowcount - - # 2. Orphans check: Remove logs that no longer have a parent worker - cursor.execute( - "DELETE FROM worker_log WHERE worker_id NOT IN (SELECT worker_id FROM worker)" - ) - log_orphans = cursor.rowcount - - # 3. 
Global log limit: Ensure we don't have millions of log rows even if workers are within limit - # Limit total log entries to something reasonable like 5,000 - cursor.execute( - """ - DELETE FROM worker_log - WHERE id NOT IN ( - SELECT id FROM worker_log - ORDER BY created_at DESC - LIMIT 5000 - ) - """ - ) - log_limit_deletes = cursor.rowcount - - if worker_deletes > 0 or log_orphans > 0 or log_limit_deletes > 0: - logger.info(f"Global cleanup: Removed {worker_deletes} workers and {log_orphans + log_limit_deletes} log entries.") - self.connection.commit() - - except Exception as e: - logger.warning(f"Global cleanup failed: {e}") - - def _create_tables(self) -> None: - """Create database tables if they don't exist.""" - cursor = self.connection.cursor() - - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS file ( - hash TEXT PRIMARY KEY NOT NULL, - file_path TEXT UNIQUE NOT NULL, - file_modified REAL, - indexed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """ - ) - - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS metadata ( - hash TEXT PRIMARY KEY NOT NULL, - url TEXT, - relationships TEXT, - duration REAL, - size INTEGER, - ext TEXT, - type TEXT, - time_imported TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - time_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (hash) REFERENCES file(hash) ON DELETE CASCADE - ) - """ - ) - - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS tag ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - hash TEXT NOT NULL, - tag TEXT NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (hash) REFERENCES file(hash) ON DELETE CASCADE, - UNIQUE(hash, tag) - ) - """ - ) - - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS note ( - hash TEXT NOT NULL, - name TEXT NOT NULL, - note TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (hash) REFERENCES file(hash) ON DELETE CASCADE, - PRIMARY KEY (hash, name) - ) - """ - ) - - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS playlist ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT UNIQUE NOT NULL, - items TEXT NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """ - ) - - # Worker tracking tables (drop legacy workers table if still present) - self._ensure_worker_tables(cursor) - - # Create indices for performance - cursor.execute("CREATE INDEX IF NOT EXISTS idx_file_path ON file(file_path)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_tag_hash ON tag(hash)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_tag_tag ON tag(tag)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata_ext ON metadata(ext)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_worker_id ON worker(worker_id)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_worker_status ON worker(status)") - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_worker_type ON worker(worker_type)" - ) - - # Notes indices (after migration so columns exist) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_note_hash ON note(hash)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_note_name ON note(name)") - - # Additional optimizations for search speed - # Covering index for tags helps query 'tags for hash' without hitting the table - cursor.execute("CREATE INDEX IF NOT EXISTS idx_tag_covering ON tag(hash, tag)") - - # Index on metadata size and imports for 
common sorting - cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata_size ON metadata(size)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata_imported ON metadata(time_imported)") - - self.connection.commit() - logger.debug("Database tables created/verified") - - def _ensure_worker_tables(self, cursor) -> None: - """Ensure the modern worker tables exist, dropping legacy ones if needed.""" - cursor.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='worker'" - ) - has_worker = cursor.fetchone() is not None - if not has_worker: - cursor.execute("DROP TABLE IF EXISTS workers") - cursor.execute( - """ - CREATE TABLE worker ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - worker_id TEXT UNIQUE NOT NULL, - worker_type TEXT NOT NULL, - pipe TEXT, - status TEXT DEFAULT 'running', - title TEXT, - description TEXT, - progress REAL DEFAULT 0.0, - current_step TEXT, - total_steps INTEGER DEFAULT 0, - error_message TEXT, - result_data TEXT, - stdout TEXT DEFAULT '', - steps TEXT DEFAULT '', - started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - completed_at TIMESTAMP, - last_stdout_at TIMESTAMP, - last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - """ - ) - else: - self._ensure_worker_columns(cursor) - - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS worker_log ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - worker_id TEXT NOT NULL, - event_type TEXT NOT NULL, - step TEXT, - channel TEXT, - message TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY(worker_id) REFERENCES worker(worker_id) ON DELETE CASCADE - ) - """ - ) - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_worker_log_worker_id ON worker_log(worker_id)" - ) - - def _ensure_worker_columns(self, cursor) -> None: - """Backfill columns for older worker tables during upgrade.""" - try: - cursor.execute("PRAGMA table_info(worker)") - existing_columns = {row[1] - for row in cursor.fetchall()} - except Exception as exc: - logger.error(f"Error introspecting worker table: {exc}") - return - column_specs = { - "pipe": "TEXT", - "progress": "REAL DEFAULT 0.0", - "current_step": "TEXT", - "total_steps": "INTEGER DEFAULT 0", - "stdout": "TEXT DEFAULT ''", - "steps": "TEXT DEFAULT ''", - "last_stdout_at": "TIMESTAMP", - } - for col_name, ddl in column_specs.items(): - if col_name not in existing_columns: - try: - cursor.execute(f"ALTER TABLE worker ADD COLUMN {col_name} {ddl}") - logger.info(f"Added '{col_name}' column to worker table") - except Exception as exc: - logger.warning( - f"Could not add '{col_name}' column to worker table: {exc}" - ) - - def _insert_worker_log_entry( - self, - cursor, - worker_id: str, - event_type: str, - message: str, - step: Optional[str] = None, - channel: Optional[str] = None, - ) -> None: - if not message: - return - cursor.execute( - """ - INSERT INTO worker_log (worker_id, event_type, step, channel, message) - VALUES (?, ?, ?, ?, ?) - """, - (worker_id, - event_type, - step, - channel, - message), - ) - self._prune_worker_log_entries(cursor, worker_id) - - def _prune_worker_log_entries(self, cursor, worker_id: str) -> None: - """Keep at most WORKER_LOG_MAX_ENTRIES rows per worker by trimming oldest ones.""" - if WORKER_LOG_MAX_ENTRIES <= 0: - return - cursor.execute( - """ - SELECT id FROM worker_log - WHERE worker_id = ? - ORDER BY id DESC - LIMIT 1 OFFSET ? 
- """, - (worker_id, - WORKER_LOG_MAX_ENTRIES - 1), - ) - row = cursor.fetchone() - if not row: - return - cutoff_id = row[0] - cursor.execute( - "DELETE FROM worker_log WHERE worker_id = ? AND id < ?", - (worker_id, - cutoff_id), - ) - - def get_worker_events(self, - worker_id: str, - limit: int = 500) -> List[Dict[str, - Any]]: - """Return chronological worker log events for timelines.""" - try: - cursor = self.connection.cursor() - cursor.execute( - """ - SELECT id, event_type, step, channel, message, created_at - FROM worker_log - WHERE worker_id = ? - ORDER BY id ASC - LIMIT ? - """, - (worker_id, - limit), - ) - return [dict(row) for row in cursor.fetchall()] - except Exception as exc: - logger.error( - f"Error retrieving worker events for {worker_id}: {exc}", - exc_info=True - ) - return [] - - def clear_worker_events( - self, - worker_id: str, - event_type: Optional[str] = None - ) -> None: - """Remove worker log entries, optionally filtered by event type.""" - try: - cursor = self.connection.cursor() - if event_type: - cursor.execute( - "DELETE FROM worker_log WHERE worker_id = ? AND event_type = ?", - (worker_id, - event_type), - ) - else: - cursor.execute( - "DELETE FROM worker_log WHERE worker_id = ?", - (worker_id, - ) - ) - self.connection.commit() - except Exception as exc: - logger.error( - f"Error clearing worker log for {worker_id}: {exc}", - exc_info=True - ) - - @_db_retry() - def _update_metadata_modified_time(self, file_hash: str) -> None: - """Update the time_modified timestamp for a file's metadata.""" - with self._with_db_lock(): - try: - cursor = self.connection.cursor() - cursor.execute( - """ - UPDATE metadata SET time_modified = CURRENT_TIMESTAMP WHERE hash = ? - """, - (file_hash, - ), - ) - self.connection.commit() - except Exception as e: - logger.debug( - f"Could not update metadata modified time for hash {file_hash}: {e}" - ) - - def get_or_create_file_entry( - self, - file_path: Path, - file_hash: Optional[str] = None - ) -> str: - """Get or create a file entry in the database and return the hash. - - Args: - file_path: Path to the file - file_hash: Optional hash (will be computed if not provided) - - Returns: - The file hash (primary key) - """ - abs_path = self._normalize_input_path(file_path) - db_path = self._to_db_file_path(abs_path) - logger.debug(f"[get_or_create_file_entry] Looking up: {db_path}") - mm_debug(f"[folder-db] get_or_create_file_entry start: {db_path}") - - # If hash not provided, compute it - if not file_hash: - file_hash = sha256_file(abs_path) - logger.debug(f"[get_or_create_file_entry] Computed hash: {file_hash}") - mm_debug(f"[folder-db] computed hash: {file_hash}") - - # Retry loop for transient 'database is locked' errors - import time - max_attempts = 6 - attempt = 0 - while True: - try: - with self._with_db_lock(): - cursor = self.connection.cursor() - - mm_debug("[folder-db] SELECT files by file_path") - - # Prefer existing entry by path (file_path is UNIQUE in schema). 
- cursor.execute( - "SELECT hash FROM file WHERE file_path = ?", - (db_path,), - ) - row = cursor.fetchone() - if row and row[0]: - existing_hash = str(row[0]) - return existing_hash - - # Check if file entry exists - mm_debug("[folder-db] SELECT files by hash") - cursor.execute( - "SELECT hash FROM file WHERE hash = ?", - (file_hash,), - ) - row = cursor.fetchone() - - if row: - return file_hash - - mm_debug(f"[folder-db] INSERT file (in_tx={self.connection.in_transaction})") - stat = abs_path.stat() - try: - cursor.execute( - """ - INSERT INTO file (hash, file_path, file_modified) - VALUES (?, ?, ?) - """, - (file_hash, db_path, stat.st_mtime), - ) - mm_debug("[folder-db] INSERT file done") - except sqlite3.IntegrityError: - # Most likely: UNIQUE constraint on file_path. Re-fetch and return. - mm_debug("[folder-db] UNIQUE(file_path) hit; re-fetch") - cursor.execute( - "SELECT hash FROM file WHERE file_path = ?", - (db_path,), - ) - row2 = cursor.fetchone() - if row2 and row2[0]: - existing_hash = str(row2[0]) - return existing_hash - raise - - # Auto-create title tag - filename_without_ext = abs_path.stem - if filename_without_ext: - # Normalize underscores to spaces for consistency - title_value = filename_without_ext.replace("_", " ").strip() - title_tag = f"title:{title_value}" - mm_debug("[folder-db] INSERT title tag") - cursor.execute( - """ - INSERT OR IGNORE INTO tag (hash, tag) - VALUES (?, ?) - """, - (file_hash, title_tag), - ) - - mm_debug("[folder-db] COMMIT (file/tag)") - self.connection.commit() - mm_debug(f"[folder-db] get_or_create_file_entry done: {file_hash}") - return file_hash - - except sqlite3.OperationalError as e: - # Retry on transient locks - msg = str(e or "").lower() - if "database is locked" in msg and attempt < max_attempts: - attempt += 1 - sleep_time = min(0.5 * (2 ** (attempt - 1)), 5.0) - logger.debug(f"[get_or_create_file_entry] Database locked; retrying in {sleep_time:.2f}s (attempt {attempt}/{max_attempts})") - mm_debug(f"[folder-db] LOCKED (get_or_create); sleep {sleep_time:.2f}s ({attempt}/{max_attempts})") - time.sleep(sleep_time) - continue - logger.error(f"[get_or_create_file_entry] OperationalError: {e}", exc_info=True) - raise - - except Exception as e: - logger.error( - f"[get_or_create_file_entry] ❌ Error getting/creating file entry for {file_path}: {e}", - exc_info=True, - ) - raise - - def get_file_hash(self, file_path: Path) -> Optional[str]: - """Get the file hash for a file path, or None if not found.""" - try: - abs_path = self._normalize_input_path(file_path) - str_path = self._to_db_file_path(abs_path) - cursor = self.connection.cursor() - cursor.execute("SELECT hash FROM file WHERE file_path = ?", - (str_path, - )) - row = cursor.fetchone() - return row[0] if row else None - except Exception as e: - logger.error(f"Error getting file hash for {file_path}: {e}", exc_info=True) - return None - - def get_metadata(self, file_hash: str) -> Optional[Dict[str, Any]]: - """Get metadata for a file by hash.""" - max_attempts = 5 - attempt = 0 - while True: - try: - with self._with_db_lock(): - cursor = self.connection.cursor() - - cursor.execute( - """ - SELECT m.* FROM metadata m - WHERE m.hash = ? 
- """, - (file_hash, - ), - ) - - row = cursor.fetchone() - if not row: - return None - - metadata = dict(row) - - # Parse JSON fields - for field in ["url", "relationships"]: - if metadata.get(field): - try: - metadata[field] = json.loads(metadata[field]) - except (json.JSONDecodeError, TypeError): - metadata[field] = [] if field == "url" else {} - - # Ensure relationships is always a dict - if metadata.get("relationships") is None: - metadata["relationships"] = {} - if not isinstance(metadata.get("relationships"), dict): - metadata["relationships"] = {} - - return metadata - except sqlite3.OperationalError as e: - msg = str(e or "").lower() - if "database is locked" in msg and attempt < max_attempts: - attempt += 1 - sleep_time = min(0.1 * (2 ** (attempt - 1)), 1.0) - time.sleep(sleep_time) - continue - logger.error( - f"Error getting metadata for hash {file_hash}: {e}", - exc_info=True - ) - return None - except Exception as e: - logger.error( - f"Error getting metadata for hash {file_hash}: {e}", - exc_info=True - ) - return None - - def set_relationship_by_hash( - self, - file_hash: str, - related_file_hash: str, - rel_type: str = "alt", - *, - bidirectional: bool = True, - ) -> None: - """Set a relationship between two files by hash. - - This is the store/hash-first API. It avoids any dependency on local filesystem - paths and only requires that both hashes exist in the DB. - """ - try: - file_hash = str(file_hash or "").strip().lower() - related_file_hash = str(related_file_hash or "").strip().lower() - rel_type = str(rel_type or "alt").strip() or "alt" - - if not file_hash or not related_file_hash: - raise ValueError("Missing file hash for relationship") - if file_hash == related_file_hash: - return - - cursor = self.connection.cursor() - - # Ensure both hashes exist in file table (metadata has FK to file) - cursor.execute("SELECT 1 FROM file WHERE hash = ?", - (file_hash, - )) - if not cursor.fetchone(): - raise ValueError(f"Hash not found in store DB: {file_hash}") - cursor.execute("SELECT 1 FROM file WHERE hash = ?", - (related_file_hash, - )) - if not cursor.fetchone(): - raise ValueError(f"Hash not found in store DB: {related_file_hash}") - - # Load current relationships for the main file - cursor.execute( - "SELECT relationships FROM metadata WHERE hash = ?", - (file_hash, - ) - ) - row = cursor.fetchone() - relationships_str = row[0] if row else None - - try: - relationships = json.loads(relationships_str - ) if relationships_str else {} - except (json.JSONDecodeError, TypeError): - relationships = {} - if not isinstance(relationships, dict): - relationships = {} - - relationships.setdefault(rel_type, []) - if not isinstance(relationships[rel_type], list): - relationships[rel_type] = [] - if related_file_hash not in relationships[rel_type]: - relationships[rel_type].append(related_file_hash) - - cursor.execute( - """ - INSERT INTO metadata (hash, relationships) - VALUES (?, ?) 
- ON CONFLICT(hash) DO UPDATE SET - relationships = excluded.relationships, - time_modified = CURRENT_TIMESTAMP, - updated_at = CURRENT_TIMESTAMP - """, - (file_hash, - json.dumps(relationships)), - ) - - if bidirectional: - # Update the related file as well - cursor.execute( - "SELECT relationships FROM metadata WHERE hash = ?", - (related_file_hash, - ) - ) - row2 = cursor.fetchone() - relationships_str2 = row2[0] if row2 else None - try: - reverse_relationships = ( - json.loads(relationships_str2) if relationships_str2 else {} - ) - except (json.JSONDecodeError, TypeError): - reverse_relationships = {} - if not isinstance(reverse_relationships, dict): - reverse_relationships = {} - - reverse_relationships.setdefault(rel_type, []) - if not isinstance(reverse_relationships[rel_type], list): - reverse_relationships[rel_type] = [] - if file_hash not in reverse_relationships[rel_type]: - reverse_relationships[rel_type].append(file_hash) - - cursor.execute( - """ - INSERT INTO metadata (hash, relationships) - VALUES (?, ?) - ON CONFLICT(hash) DO UPDATE SET - relationships = excluded.relationships, - time_modified = CURRENT_TIMESTAMP, - updated_at = CURRENT_TIMESTAMP - """, - (related_file_hash, - json.dumps(reverse_relationships)), - ) - - self.connection.commit() - except Exception as e: - logger.error(f"Error setting relationship by hash: {e}", exc_info=True) - raise - - def find_files_pointing_to_hash(self, target_hash: str) -> List[Dict[str, Any]]: - """Find all files that have a relationship pointing to the target hash.""" - try: - target_hash = str(target_hash or "").strip().lower() - if not target_hash: - return [] - - cursor = self.connection.cursor() - - cursor.execute( - """ - SELECT f.hash, f.file_path, m.relationships - FROM metadata m - JOIN file f ON m.hash = f.hash - WHERE m.relationships LIKE ? 
- """, - (f"%{target_hash}%", - ), - ) - - results: List[Dict[str, Any]] = [] - for row in cursor.fetchall(): - src_hash = row[0] - src_path = row[1] - rels_json = row[2] - try: - rels = json.loads(rels_json) if rels_json else {} - except (json.JSONDecodeError, TypeError): - continue - if not isinstance(rels, dict): - continue - for r_type, hashes in rels.items(): - if not isinstance(hashes, list): - continue - if target_hash in [str(h or "").strip().lower() for h in hashes]: - results.append( - { - "hash": src_hash, - "path": src_path, - "type": r_type, - } - ) - return results - except Exception as e: - logger.error( - f"Error finding files pointing to hash {target_hash}: {e}", - exc_info=True - ) - return [] - - @_db_retry() - def save_metadata(self, file_path: Path, metadata: Dict[str, Any]) -> None: - """Save metadata for a file.""" - try: - abs_path = self._normalize_input_path(file_path) - db_path = self._to_db_file_path(abs_path) - logger.debug(f"[save_metadata] Starting save for: {db_path}") - mm_debug(f"[folder-db] save_metadata start: {db_path}") - file_hash = self.get_or_create_file_entry(abs_path, metadata.get("hash")) - logger.debug(f"[save_metadata] Got/created file_hash: {file_hash}") - mm_debug(f"[folder-db] save_metadata file_hash: {file_hash}") - - url = metadata.get("url", []) - if not isinstance(url, str): - url = json.dumps(url) - - relationships = metadata.get("relationships", []) - if not isinstance(relationships, str): - relationships = json.dumps(relationships) - - mm_debug("[folder-db] UPSERT metadata") - - # Determine type from ext if not provided - file_type = metadata.get("type") - ext = metadata.get("ext") - if not file_type and ext: - from SYS.utils_constant import get_type_from_ext - - file_type = get_type_from_ext(str(ext)) - - with self._with_db_lock(): - cursor = self.connection.cursor() - cursor.execute( - """ - INSERT INTO metadata ( - hash, url, relationships, - duration, size, ext, type, - time_imported, time_modified - ) - VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT(hash) DO UPDATE SET - url = excluded.url, - relationships = excluded.relationships, - duration = excluded.duration, - size = excluded.size, - ext = excluded.ext, - type = excluded.type, - time_modified = CURRENT_TIMESTAMP, - updated_at = CURRENT_TIMESTAMP - """, - ( - file_hash, - url, - relationships, - metadata.get("duration"), - metadata.get("size"), - ext, - file_type, - ), - ) - - mm_debug("[folder-db] COMMIT (metadata)") - self.connection.commit() - mm_debug(f"[folder-db] save_metadata done: {file_hash}") - logger.debug(f"[save_metadata] Committed metadata for hash {file_hash}") - except Exception as e: - logger.error( - f"[save_metadata] ❌ Error saving metadata for {file_path}: {e}", - exc_info=True - ) - raise - - def save_file_info( - self, - file_path: Path, - metadata: Dict[str, - Any], - tags: List[str] - ) -> None: - """Save metadata and tags for a file in a single transaction.""" - with self._with_db_lock(): - try: - abs_path = self._normalize_input_path(file_path) - db_path = self._to_db_file_path(abs_path) - logger.debug(f"[save_file_info] Starting save for: {db_path}") - - file_hash = self.get_or_create_file_entry(abs_path, metadata.get("hash")) - - cursor = self.connection.cursor() - - # 1. 
Save Metadata - url = metadata.get("url", []) - if not isinstance(url, str): - url = json.dumps(url) - - relationships = metadata.get("relationships", []) - if not isinstance(relationships, str): - relationships = json.dumps(relationships) - - # Determine type from ext if not provided - file_type = metadata.get("type") - ext = metadata.get("ext") - if not file_type and ext: - from SYS.utils_constant import get_type_from_ext - - file_type = get_type_from_ext(str(ext)) - - cursor.execute( - """ - INSERT INTO metadata ( - hash, url, relationships, - duration, size, ext, type, - time_imported, time_modified - ) - VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT(hash) DO UPDATE SET - url = excluded.url, - relationships = excluded.relationships, - duration = excluded.duration, - size = excluded.size, - ext = excluded.ext, - type = excluded.type, - time_modified = CURRENT_TIMESTAMP, - updated_at = CURRENT_TIMESTAMP - """, - ( - file_hash, - url, - relationships, - metadata.get("duration"), - metadata.get("size"), - ext, - file_type, - ), - ) - - # 2. Save Tags - # We assume tags list is complete and includes title if needed - cursor.execute("DELETE FROM tag WHERE hash = ?", (file_hash, )) - - for tag in tags: - tag = tag.strip() - if tag: - cursor.execute( - """ - INSERT OR IGNORE INTO tag (hash, tag) - VALUES (?, ?) - """, - (file_hash, tag), - ) - - self.connection.commit() - logger.debug( - f"[save_file_info] Committed metadata and tags for hash {file_hash}" - ) - - except Exception as e: - logger.error( - f"[save_file_info] ❌ Error saving file info for {file_path}: {e}", - exc_info=True) - raise - - def get_tags(self, file_hash: str) -> List[str]: - """Get all tags for a file by hash.""" - max_attempts = 5 - attempt = 0 - while True: - try: - with self._with_db_lock(): - cursor = self.connection.cursor() - - cursor.execute( - """ - SELECT t.tag FROM tag t - WHERE t.hash = ? - ORDER BY t.tag - """, - (file_hash, - ), - ) - - return [row[0] for row in cursor.fetchall()] - except sqlite3.OperationalError as e: - msg = str(e or "").lower() - if "database is locked" in msg and attempt < max_attempts: - attempt += 1 - sleep_time = min(0.1 * (2 ** (attempt - 1)), 1.0) - time.sleep(sleep_time) - continue - logger.error(f"Error getting tags for hash {file_hash}: {e}", exc_info=True) - return [] - except Exception as e: - logger.error(f"Error getting tags for hash {file_hash}: {e}", exc_info=True) - return [] - - @_db_retry() - def save_tags(self, file_path: Path, tags: List[str]) -> None: - """Save tags for a file, replacing all existing tags.""" - try: - abs_path = self._normalize_input_path(file_path) - db_path = self._to_db_file_path(abs_path) - logger.debug(f"[save_tags] Starting save for: {db_path}") - - file_hash = self.get_or_create_file_entry(abs_path) - logger.debug(f"[save_tags] Got/created file_hash: {file_hash}") - - cursor = self.connection.cursor() - - cursor.execute( - """ - SELECT tag FROM tag WHERE hash = ? AND tag LIKE 'title:%' - """, - (file_hash, - ), - ) - existing_title = cursor.fetchone() - - cursor.execute("DELETE FROM tag WHERE hash = ?", - (file_hash, - )) - logger.debug(f"[save_tags] Deleted existing tags for hash {file_hash}") - - # Check if new tags provide a title - new_title_provided = any( - str(t).strip().lower().startswith("title:") for t in tags - ) - - if existing_title and not new_title_provided: - cursor.execute( - """ - INSERT INTO tag (hash, tag) VALUES (?, ?) 
- """, - (file_hash, - existing_title[0]), - ) - logger.debug("[save_tags] Preserved existing title tag") - elif not existing_title and not new_title_provided: - filename_without_ext = abs_path.stem - if filename_without_ext: - # Normalize underscores to spaces for consistency - title_value = filename_without_ext.replace("_", " ").strip() - title_tag = f"title:{title_value}" - cursor.execute( - """ - INSERT INTO tag (hash, tag) VALUES (?, ?) - """, - (file_hash, - title_tag), - ) - logger.debug(f"[save_tags] Created auto-title tag: {title_tag}") - - for tag in tags: - tag = tag.strip() - if tag: - cursor.execute( - """ - INSERT OR IGNORE INTO tag (hash, tag) - VALUES (?, ?) - """, - (file_hash, - tag), - ) - - self.connection.commit() - logger.debug(f"[save_tags] Committed {len(tags)} tags for hash {file_hash}") - - # Verify they were actually saved - cursor.execute("SELECT COUNT(*) FROM tag WHERE hash = ?", - (file_hash, - )) - saved_count = cursor.fetchone()[0] - logger.debug( - f"[save_tags] Verified: {saved_count} tags in database for hash {file_hash}" - ) - - self._update_metadata_modified_time(file_hash) - except Exception as e: - logger.error( - f"[save_tags] ❌ Error saving tags for {file_path}: {e}", - exc_info=True - ) - raise - - @_db_retry() - def add_tags(self, file_path: Path, tags: List[str]) -> None: - """Add tags to a file.""" - with self._with_db_lock(): - try: - file_hash = self.get_or_create_file_entry(file_path) - cursor = self.connection.cursor() - - user_title_tag = next( - ( - tag.strip() - for tag in tags if tag.strip().lower().startswith("title:") - ), - None - ) - - if user_title_tag: - cursor.execute( - """ - DELETE FROM tag WHERE hash = ? AND tag LIKE 'title:%' - """, - (file_hash, - ), - ) - else: - cursor.execute( - """ - SELECT COUNT(*) FROM tag WHERE hash = ? AND tag LIKE 'title:%' - """, - (file_hash, - ), - ) - - has_title = cursor.fetchone()[0] > 0 - if not has_title: - filename_without_ext = file_path.stem - if filename_without_ext: - # Normalize underscores to spaces for consistency - title_value = filename_without_ext.replace("_", " ").strip() - title_tag = f"title:{title_value}" - cursor.execute( - """ - INSERT OR IGNORE INTO tag (hash, tag) - VALUES (?, ?) - """, - (file_hash, - title_tag), - ) - - for tag in tags: - tag = tag.strip() - if tag: - cursor.execute( - """ - INSERT OR IGNORE INTO tag (hash, tag) - VALUES (?, ?) - """, - (file_hash, - tag), - ) - - self.connection.commit() - self._update_metadata_modified_time(file_hash) - logger.debug(f"Added {len(tags)} tags for {file_path}") - except Exception as e: - logger.error(f"Error adding tags for {file_path}: {e}", exc_info=True) - raise - - @_db_retry() - def remove_tags(self, file_path: Path, tags: List[str]) -> None: - """Remove specific tags from a file.""" - with self._with_db_lock(): - try: - file_hash = self.get_or_create_file_entry(file_path) - cursor = self.connection.cursor() - - for tag in tags: - tag = tag.strip() - if tag: - cursor.execute( - """ - DELETE FROM tag - WHERE hash = ? - AND tag = ? 
- """, - (file_hash, - tag), - ) - - self.connection.commit() - logger.debug(f"Removed {len(tags)} tags for {file_path}") - except Exception as e: - logger.error(f"Error removing tags for {file_path}: {e}", exc_info=True) - raise - - @_db_retry() - def add_tags_to_hash(self, file_hash: str, tags: List[str]) -> None: - """Add tags to a file by hash.""" - with self._with_db_lock(): - try: - cursor = self.connection.cursor() - - user_title_tag = next( - ( - tag.strip() - for tag in tags if tag.strip().lower().startswith("title:") - ), - None - ) - - if user_title_tag: - cursor.execute( - """ - DELETE FROM tag WHERE hash = ? AND tag LIKE 'title:%' - """, - (file_hash, - ), - ) - - for tag in tags: - tag = tag.strip() - if tag: - cursor.execute( - """ - INSERT OR IGNORE INTO tag (hash, tag) - VALUES (?, ?) - """, - (file_hash, - tag), - ) - - self.connection.commit() - self._update_metadata_modified_time(file_hash) - logger.debug(f"Added {len(tags)} tags for hash {file_hash}") - except Exception as e: - logger.error(f"Error adding tags for hash {file_hash}: {e}", exc_info=True) - raise - - @_db_retry() - def remove_tags_from_hash(self, file_hash: str, tags: List[str]) -> None: - """Remove specific tags from a file by hash.""" - with self._with_db_lock(): - try: - cursor = self.connection.cursor() - - for tag in tags: - tag = tag.strip() - if tag: - cursor.execute( - """ - DELETE FROM tag - WHERE hash = ? - AND tag = ? - """, - (file_hash, - tag), - ) - - self.connection.commit() - logger.debug(f"Removed {len(tags)} tags for hash {file_hash}") - except Exception as e: - logger.error( - f"Error removing tags for hash {file_hash}: {e}", - exc_info=True - ) - raise - - @_db_retry() - def update_metadata_by_hash( - self, - file_hash: str, - metadata_updates: Dict[str, - Any] - ) -> None: - """Update metadata for a file by hash.""" - with self._with_db_lock(): - try: - cursor = self.connection.cursor() - - fields = [] - values = [] - - for key, value in metadata_updates.items(): - if key in ["url", "relationships"]: - if not isinstance(value, str): - value = json.dumps(value) - fields.append(f"{key} = ?") - values.append(value) - - if not fields: - return - - # Ensure a metadata row exists so updates don't silently no-op. - # This can happen for older DBs or entries created without explicit metadata. - cursor.execute( - "INSERT OR IGNORE INTO metadata (hash) VALUES (?)", - (file_hash, - ), - ) - - values.append(file_hash) - - sql = f"UPDATE metadata SET {', '.join(fields)}, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE hash = ?" - cursor.execute(sql, values) - self.connection.commit() - logger.debug(f"Updated metadata for hash {file_hash}") - except Exception as e: - logger.error( - f"Error updating metadata for hash {file_hash}: {e}", - exc_info=True - ) - raise - - def set_relationship( - self, - file_path: Path, - related_file_path: Path, - rel_type: str = "alt", - *, - bidirectional: bool = True, - ) -> None: - """Set a relationship between two local files. 
- - Args: - file_path: Path to the file being related - related_file_path: Path to the related file - rel_type: Type of relationship ('king', 'alt', 'related') - """ - with self._with_db_lock(): - try: - str_path = str(file_path.resolve()) - str_related_path = str(related_file_path.resolve()) - - file_hash = self.get_or_create_file_entry(file_path) - related_file_hash = self.get_or_create_file_entry(related_file_path) - - cursor = self.connection.cursor() - - # Get current relationships for the main file - cursor.execute( - """ - SELECT relationships FROM metadata WHERE hash = ? - """, - (file_hash, - ), - ) - - row = cursor.fetchone() - # Use index access to be safe regardless of row_factory - relationships_str = row[0] if row else None - - try: - if relationships_str: - relationships = json.loads(relationships_str) - else: - relationships = {} - except (json.JSONDecodeError, TypeError): - relationships = {} - - # Ensure relationships is a dict (handle case where DB has a list) - if not isinstance(relationships, dict): - relationships = {} - - # Ensure rel_type key exists - if rel_type not in relationships: - relationships[rel_type] = [] - - # Add the relationship (store as hash string) - if related_file_hash not in relationships[rel_type]: - relationships[rel_type].append(related_file_hash) - - # Save the updated relationships for the main file - cursor.execute( - """ - INSERT INTO metadata (hash, relationships) - VALUES (?, ?) - ON CONFLICT(hash) DO UPDATE SET - relationships = excluded.relationships, - time_modified = CURRENT_TIMESTAMP - """, - (file_hash, - json.dumps(relationships)), - ) - - logger.debug( - f"Set {rel_type} relationship: {str_path} ({file_hash}) -> {str_related_path} ({related_file_hash})" - ) - - if bidirectional: - # Set reverse relationship (bidirectional) - # For 'alt' and 'related', the reverse is the same - # For 'king', the reverse is 'subject' (or we just use 'alt' for simplicity as Hydrus does) - # Let's use the same type for now to keep it simple and consistent with Hydrus 'alternates' - reverse_type = rel_type - - # Update the related file - cursor.execute( - """ - SELECT relationships FROM metadata WHERE hash = ? - """, - (related_file_hash, - ), - ) - - row = cursor.fetchone() - relationships_str = row[0] if row else None - - try: - if relationships_str: - reverse_relationships = json.loads(relationships_str) - else: - reverse_relationships = {} - except (json.JSONDecodeError, TypeError): - reverse_relationships = {} - - if not isinstance(reverse_relationships, dict): - reverse_relationships = {} - - if reverse_type not in reverse_relationships: - reverse_relationships[reverse_type] = [] - - if file_hash not in reverse_relationships[reverse_type]: - reverse_relationships[reverse_type].append(file_hash) - - # Save the updated reverse relationships - cursor.execute( - """ - INSERT INTO metadata (hash, relationships) - VALUES (?, ?) - ON CONFLICT(hash) DO UPDATE SET - relationships = excluded.relationships, - time_modified = CURRENT_TIMESTAMP - """, - (related_file_hash, - json.dumps(reverse_relationships)), - ) - - self.connection.commit() - else: - self.connection.commit() - - except Exception as e: - logger.error(f"Error setting relationship: {e}", exc_info=True) - raise - - def find_files_pointing_to(self, target_path: Path) -> List[Dict[str, Any]]: - """Find all files that have a relationship pointing to the target path. 
- - Args: - target_path: The file path to look for in other files' relationships - - Returns: - List of dicts with {path, type} for files pointing to target - """ - try: - # Prefer the DB's stored identity hash for the target. - target_hash = None - try: - target_hash = self.get_file_hash(target_path) - except Exception: - target_hash = None - - # Fall back to hashing bytes if the path isn't known to the DB. - if not target_hash: - target_hash = sha256_file(target_path) - - if not target_hash: - logger.warning( - f"Cannot find files pointing to {target_path}: unable to compute hash" - ) - return [] - - return self.find_files_pointing_to_hash(target_hash) - except Exception as e: - logger.error( - f"Error finding files pointing to {target_path}: {e}", - exc_info=True - ) - return [] - - def get_note(self, file_hash: str, name: str = "default") -> Optional[str]: - """Get a named note (default note by default) for a file hash.""" - normalized_hash = str(file_hash or "").strip().lower() - if len(normalized_hash) != 64: - return None - - note_name = str(name or "default").strip() or "default" - max_attempts = 5 - import time - - for attempt in range(max_attempts): - try: - with self._with_db_lock(): - cursor = self.connection.cursor() - cursor.execute( - "SELECT note FROM note WHERE hash = ? AND name = ?", - (normalized_hash, - note_name), - ) - row = cursor.fetchone() - if row: - return row[0] - - if note_name != "default": - return None - - cursor.execute( - "SELECT note FROM note WHERE hash = ? ORDER BY updated_at DESC LIMIT 1", - (normalized_hash, - ), - ) - row = cursor.fetchone() - return row[0] if row else None - except sqlite3.OperationalError as e: - msg = str(e or "").lower() - if "database is locked" in msg and attempt < (max_attempts - 1): - sleep_time = min(0.1 * (2 ** attempt), 1.0) - time.sleep(sleep_time) - continue - logger.error( - f"Error getting note for hash {file_hash}: {e}", - exc_info=True - ) - return None - except Exception as e: - logger.error( - f"Error getting note for hash {file_hash}: {e}", - exc_info=True - ) - return None - return None - - def get_notes(self, file_hash: str) -> Dict[str, str]: - """Return all named notes for a file hash.""" - normalized_hash = str(file_hash or "").strip().lower() - if len(normalized_hash) != 64: - return {} - - notes: Dict[str, str] = {} - max_attempts = 5 - import time - - for attempt in range(max_attempts): - try: - with self._with_db_lock(): - cursor = self.connection.cursor() - cursor.execute( - "SELECT name, note FROM note WHERE hash = ?", - (normalized_hash, - ), - ) - rows = cursor.fetchall() or [] - for row in rows: - note_name = str(row[0] or "").strip() - if not note_name: - continue - notes[note_name] = str(row[1] or "") - return notes - except sqlite3.OperationalError as e: - msg = str(e or "").lower() - if "database is locked" in msg and attempt < (max_attempts - 1): - sleep_time = min(0.1 * (2 ** attempt), 1.0) - time.sleep(sleep_time) - continue - logger.error( - f"Error getting notes for hash {file_hash}: {e}", - exc_info=True - ) - return {} - except Exception as e: - logger.error( - f"Error getting notes for hash {file_hash}: {e}", - exc_info=True - ) - return {} - return {} - - def set_note_by_hash(self, file_hash: str, name: str, note: str) -> None: - """Set a named note using a known file hash (no re-hash).""" - note_name = str(name or "").strip() - normalized_hash = str(file_hash or "").strip().lower() - if not note_name: - raise ValueError("Note name is required") - if len(normalized_hash) != 64: - raise 
ValueError("File hash must be a 64-character hex string") - - max_attempts = 5 - import time - - for attempt in range(max_attempts): - try: - with self._with_db_lock(): - cursor = self.connection.cursor() - - cursor.execute( - "SELECT 1 FROM file WHERE hash = ?", - (normalized_hash, - ), - ) - exists = cursor.fetchone() is not None - if not exists: - raise ValueError( - f"Hash {normalized_hash} not found in file table" - ) - - cursor.execute( - """ - INSERT INTO note (hash, name, note) - VALUES (?, ?, ?) - ON CONFLICT(hash, name) DO UPDATE SET - note = excluded.note, - updated_at = CURRENT_TIMESTAMP - """, - (normalized_hash, - note_name, - note), - ) - self.connection.commit() - logger.debug( - f"Saved note '{note_name}' for hash {normalized_hash}" - ) - return - except sqlite3.OperationalError as e: - msg = str(e or "").lower() - if "database is locked" in msg and attempt < (max_attempts - 1): - sleep_time = min(0.1 * (2 ** attempt), 1.0) - time.sleep(sleep_time) - continue - logger.error( - f"Error saving note for hash {normalized_hash}: {e}", - exc_info=True - ) - raise - except Exception as e: - logger.error( - f"Error saving note for hash {normalized_hash}: {e}", - exc_info=True - ) - raise - - def set_note(self, file_path: Path, name: str, note: str) -> None: - """Set a named note for a file path, computing hash if needed.""" - note_name = str(name or "").strip() - if not note_name: - raise ValueError("Note name is required") - - try: - file_hash = self.get_or_create_file_entry(file_path) - self.set_note_by_hash(file_hash, note_name, note) - except Exception as e: - logger.error(f"Error saving note for {file_path}: {e}", exc_info=True) - raise - - def save_note(self, file_path: Path, note: str, name: str = "default") -> None: - """Backward-compatible helper to store a note for a file path.""" - self.set_note(file_path, name, note) - - def delete_note(self, file_hash: str, name: str) -> None: - """Delete a named note for a file by hash.""" - with self._with_db_lock(): - try: - note_name = str(name or "").strip() - if not note_name: - raise ValueError("Note name is required") - cursor = self.connection.cursor() - cursor.execute( - "DELETE FROM note WHERE hash = ? AND name = ?", - (file_hash, - note_name), - ) - self.connection.commit() - except Exception as e: - logger.error( - f"Error deleting note '{name}' for hash {file_hash}: {e}", - exc_info=True - ) - raise - - def search_by_tag(self, tag: str, limit: int = 100) -> List[tuple]: - """Search for files with a specific tag. Returns list of (hash, file_path) tuples.""" - try: - cursor = self.connection.cursor() - - cursor.execute( - """ - SELECT DISTINCT f.hash, f.file_path FROM file f - JOIN tag t ON f.hash = t.hash - WHERE t.tag = ? - LIMIT ? - """, - (tag, - limit), - ) - - rows = cursor.fetchall() or [] - results: List[tuple] = [] - for row in rows: - try: - file_hash = str(row[0]) - db_path = str(row[1]) - results.append((file_hash, str(self._from_db_file_path(db_path)))) - except Exception: - continue - return results - except Exception as e: - logger.error(f"Error searching by tag '{tag}': {e}", exc_info=True) - return [] - - def search_hash(self, file_hash: str) -> Optional[Path]: - """Search for a file by hash.""" - try: - with self._with_db_lock(): - cursor = self.connection.cursor() - - cursor.execute( - """ - SELECT file_path FROM file WHERE hash = ? 
- """, - (file_hash, - ), - ) - - row = cursor.fetchone() - return self._from_db_file_path(row[0]) if row else None - except Exception as e: - logger.error(f"Error searching by hash '{file_hash}': {e}", exc_info=True) - return None - - def update_file_hash(self, file_path: Path, file_hash: str) -> None: - """Deprecated: Hash is managed as primary key. This method is no-op. - - In the new hash-based schema, the file hash is the primary key (immutable). - Use get_or_create_file_entry() to ensure the hash is properly registered. - """ - # This is now a no-op since hash is the immutable primary key - pass - - def rename_file(self, old_path: Path, new_path: Path) -> None: - """Rename a file in the database, preserving all metadata.""" - try: - abs_old = self._normalize_input_path(old_path) - abs_new = self._normalize_input_path(new_path) - str_old_path = self._to_db_file_path(abs_old) - str_new_path = self._to_db_file_path(abs_new) - cursor = self.connection.cursor() - - cursor.execute( - """ - UPDATE file SET file_path = ?, updated_at = CURRENT_TIMESTAMP - WHERE file_path = ? - """, - (str_new_path, - str_old_path), - ) - - self.connection.commit() - logger.debug(f"Renamed file in database: {old_path} → {new_path}") - except Exception as e: - logger.error( - f"Error renaming file from {old_path} to {new_path}: {e}", - exc_info=True - ) - raise - - def cleanup_missing_files(self) -> int: - """Remove entries for files that no longer exist.""" - try: - cursor = self.connection.cursor() - cursor.execute("SELECT hash, file_path FROM file") - - removed_count = 0 - for file_hash, file_path in cursor.fetchall(): - try: - abs_path = self._from_db_file_path(file_path) - except Exception: - abs_path = Path(file_path) - if not abs_path.exists(): - cursor.execute("DELETE FROM file WHERE hash = ?", - (file_hash, - )) - removed_count += 1 - - self.connection.commit() - logger.info(f"Cleaned up {removed_count} missing file entries") - return removed_count - except Exception as e: - logger.error(f"Error cleaning up missing files: {e}", exc_info=True) - return 0 - - def delete_file(self, file_path: Path) -> bool: - """Delete a file from the database by path. - - Cascades to metadata, tags, notes, etc, and also cleans up relationship - backlinks in other files so no file retains dangling references to the - deleted hash. - """ - with self._with_db_lock(): - try: - abs_path = self._normalize_input_path(file_path) - str_path = self._to_db_file_path(abs_path) - cursor = self.connection.cursor() - - # Get the hash first (for logging) - cursor.execute("SELECT hash FROM file WHERE file_path = ?", - (str_path, - )) - row = cursor.fetchone() - if not row: - logger.debug(f"File not found in database: {str_path}") - return False - - file_hash = row[0] - - # Remove backlinks from other files that reference this hash. 
- try: - target_hash = str(file_hash or "").strip().lower() - backlinks = self.find_files_pointing_to_hash(target_hash) - by_src: Dict[str, - set[str]] = {} - for b in backlinks: - src = str((b or {}).get("hash") or "").strip().lower() - rt = str((b or {}).get("type") or "").strip() - if not src or src == target_hash or not rt: - continue - by_src.setdefault(src, set()).add(rt) - - for src_hash, rel_types in by_src.items(): - meta = self.get_metadata(src_hash) or {} - rels = meta.get("relationships") if isinstance(meta, dict) else None - if not isinstance(rels, dict) or not rels: - continue - - changed = False - for rt in rel_types: - key_to_edit = None - for k in list(rels.keys()): - if str(k).lower() == str(rt).lower(): - key_to_edit = str(k) - break - if not key_to_edit: - continue - - bucket = rels.get(key_to_edit) - if not isinstance(bucket, list) or not bucket: - continue - - new_bucket = [ - h for h in bucket - if str(h or "").strip().lower() != target_hash - ] - if len(new_bucket) == len(bucket): - continue - - changed = True - if new_bucket: - rels[key_to_edit] = new_bucket - else: - try: - del rels[key_to_edit] - except Exception: - rels[key_to_edit] = [] - - if changed: - cursor.execute( - """ - INSERT INTO metadata (hash, relationships) - VALUES (?, ?) - ON CONFLICT(hash) DO UPDATE SET - relationships = excluded.relationships, - time_modified = CURRENT_TIMESTAMP, - updated_at = CURRENT_TIMESTAMP - """, - (src_hash, - json.dumps(rels if rels else {})), - ) - except Exception: - # Best-effort cleanup; deletion should still proceed. - pass - - # Delete the file entry (cascades to metadata, tags, notes, etc via foreign keys) - cursor.execute("DELETE FROM file WHERE file_path = ?", - (str_path, - )) - self.connection.commit() - - logger.debug(f"Deleted file from database: {str_path} (hash: {file_hash})") - return cursor.rowcount > 0 - except Exception as e: - logger.error(f"Error deleting file {file_path}: {e}", exc_info=True) - return False - - # ======================================================================== - # WORKER MANAGEMENT - # ======================================================================== - - def insert_worker( - self, - worker_id: str, - worker_type: str, - title: str = "", - description: str = "", - total_steps: int = 0, - pipe: Optional[str] = None, - ) -> int: - """Insert a new worker entry into the database.""" - with self._with_db_lock(): - try: - cursor = self.connection.cursor() - cursor.execute( - """ - INSERT INTO worker (worker_id, worker_type, pipe, status, title, description, total_steps) - VALUES (?, ?, ?, ?, ?, ?, ?) - """, - ( - worker_id, - worker_type, - pipe, - "running", - title, - description, - total_steps - ), - ) - worker_rowid = cursor.lastrowid or 0 - - # Prune occasionally (1 in 50 chance) or just run it to keep it clean - # Running it every time might be overkill, but let's do a light version - cursor.execute( - "DELETE FROM worker WHERE status != 'running' AND id < (SELECT MAX(id) - ? 
FROM worker)", - (MAX_FINISHED_WORKERS * 2,) - ) - - self.connection.commit() - return worker_rowid - except sqlite3.IntegrityError: - return self.update_worker_status(worker_id, "running") - except Exception as e: - logger.error(f"Error inserting worker: {e}", exc_info=True) - return 0 - - def update_worker(self, worker_id: str, **kwargs) -> bool: - """Update worker entry with given fields.""" - with self._with_db_lock(): - try: - allowed_fields = { - "status", - "progress", - "current_step", - "error_message", - "result_data", - "title", - "description", - "completed_at", - "total_steps", - "pipe", - "started_at", - "last_stdout_at", - } - update_fields = { - k: v - for k, v in kwargs.items() if k in allowed_fields - } - - if not update_fields: - return True - - update_fields["last_updated"] = datetime.now().isoformat() - cursor = self.connection.cursor() - set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys()) - values = list(update_fields.values()) + [worker_id] - - cursor.execute( - f""" - UPDATE worker SET {set_clause} WHERE worker_id = ? - """, - values, - ) - - self.connection.commit() - return cursor.rowcount > 0 - except Exception as e: - logger.error(f"Error updating worker {worker_id}: {e}", exc_info=True) - return False - - def update_worker_status(self, worker_id: str, status: str) -> int: - """Update worker status and return its database ID.""" - with self._with_db_lock(): - try: - cursor = self.connection.cursor() - - if status in ("completed", "error"): - cursor.execute( - """ - UPDATE worker - SET status = ?, completed_at = CURRENT_TIMESTAMP, last_updated = CURRENT_TIMESTAMP - WHERE worker_id = ? - """, - (status, - worker_id), - ) - else: - cursor.execute( - """ - UPDATE worker - SET status = ?, last_updated = CURRENT_TIMESTAMP - WHERE worker_id = ? - """, - (status, - worker_id), - ) - - self.connection.commit() - - cursor.execute("SELECT id FROM worker WHERE worker_id = ?", - (worker_id, - )) - row = cursor.fetchone() - return row[0] if row else 0 - except Exception as e: - logger.error(f"Error updating worker status: {e}", exc_info=True) - return 0 - - def get_worker(self, worker_id: str) -> Optional[Dict[str, Any]]: - """Retrieve a worker entry by ID.""" - try: - cursor = self.connection.cursor() - cursor.execute("SELECT * FROM worker WHERE worker_id = ?", - (worker_id, - )) - row = cursor.fetchone() - return dict(row) if row else None - except Exception as e: - logger.error(f"Error retrieving worker: {e}", exc_info=True) - return None - - def get_active_workers(self) -> List[Dict[str, Any]]: - """Get all active (running) workers.""" - try: - cursor = self.connection.cursor() - cursor.execute( - "SELECT * FROM worker WHERE status = 'running' ORDER BY started_at DESC" - ) - return [dict(row) for row in cursor.fetchall()] - except Exception as e: - logger.error(f"Error retrieving active workers: {e}", exc_info=True) - return [] - - def get_all_workers(self, limit: int = 100) -> List[Dict[str, Any]]: - """Get all workers (recent first).""" - try: - cursor = self.connection.cursor() - cursor.execute( - """ - SELECT * FROM worker ORDER BY started_at DESC LIMIT ? 
- """, - (limit, - ), - ) - return [dict(row) for row in cursor.fetchall()] - except Exception as e: - logger.error(f"Error retrieving all workers: {e}", exc_info=True) - return [] - - def delete_worker(self, worker_id: str) -> bool: - """Delete a worker entry.""" - with self._with_db_lock(): - try: - cursor = self.connection.cursor() - cursor.execute("DELETE FROM worker WHERE worker_id = ?", - (worker_id, - )) - self.connection.commit() - return cursor.rowcount > 0 - except Exception as e: - logger.error(f"Error deleting worker: {e}", exc_info=True) - return False - - def cleanup_old_workers(self, days: int = 7) -> int: - """Clean up completed/errored workers older than specified days.""" - try: - cursor = self.connection.cursor() - cursor.execute( - """ - DELETE FROM worker - WHERE status IN ('completed', 'error') - AND completed_at < datetime('now', '-' || ? || ' days') - """, - (days, - ), - ) - self.connection.commit() - return cursor.rowcount - except Exception as e: - logger.error(f"Error cleaning up old workers: {e}", exc_info=True) - return 0 - - def expire_running_workers( - self, - older_than_seconds: int = 300, - status: str = "error", - reason: str | None = None, - worker_id_prefix: str | None = None, - ) -> int: - """Mark long-idle running workers as finished with the given status. - - Args: - older_than_seconds: Minimum idle time before expiring the worker. - status: New status to apply (e.g., "error" or "cancelled"). - reason: Error message to set when none is present. - worker_id_prefix: Optional LIKE pattern (e.g., 'cli_%') to scope updates. - - Returns: - Number of workers updated. - """ - idle_seconds = max(1, int(older_than_seconds)) - cutoff = f"-{idle_seconds} seconds" - auto_reason = reason or "Worker stopped responding; auto-marked as error" - try: - cursor = self.connection.cursor() - if worker_id_prefix: - cursor.execute( - """ - UPDATE worker - SET status = ?, - error_message = CASE - WHEN IFNULL(TRIM(error_message), '') = '' THEN ? - ELSE error_message - END, - completed_at = COALESCE(completed_at, CURRENT_TIMESTAMP), - last_updated = CURRENT_TIMESTAMP - WHERE status = 'running' - AND worker_id LIKE ? - AND COALESCE(last_updated, started_at, created_at) < datetime('now', ?) - """, - (status, - auto_reason, - worker_id_prefix, - cutoff), - ) - else: - cursor.execute( - """ - UPDATE worker - SET status = ?, - error_message = CASE - WHEN IFNULL(TRIM(error_message), '') = '' THEN ? - ELSE error_message - END, - completed_at = COALESCE(completed_at, CURRENT_TIMESTAMP), - last_updated = CURRENT_TIMESTAMP - WHERE status = 'running' - AND COALESCE(last_updated, started_at, created_at) < datetime('now', ?) - """, - (status, - auto_reason, - cutoff), - ) - self.connection.commit() - return cursor.rowcount - except Exception as exc: - logger.error(f"Error expiring stale workers: {exc}", exc_info=True) - return 0 - - def append_worker_stdout( - self, - worker_id: str, - text: str, - step: Optional[str] = None, - channel: str = "stdout" - ) -> bool: - """Append text to a worker's stdout log and timeline.""" - if not text: - return True - with self._with_db_lock(): - try: - # Check if connection is valid - if not self.connection: - logger.warning( - f"Database connection not available for worker {worker_id}" - ) - return False - - payload = text if text.endswith("\n") else f"{text}\n" - cursor = self.connection.cursor() - cursor.execute( - """ - UPDATE worker - SET stdout = CASE - WHEN stdout IS NULL OR stdout = '' THEN ? - WHEN substr(stdout, -1, 1) = '\n' THEN stdout || ? 
- ELSE stdout || '\n' || ? - END, - last_updated = CURRENT_TIMESTAMP, - last_stdout_at = CURRENT_TIMESTAMP - WHERE worker_id = ? - """, - (payload, payload, payload, worker_id), - ) - if cursor.rowcount <= 0: - logger.warning(f"Worker {worker_id} not found for stdout append") - return False - self._insert_worker_log_entry( - cursor, - worker_id, - "stdout", - text, - step, - channel - ) - - self.connection.commit() - return cursor.rowcount > 0 - except sqlite3.ProgrammingError as e: - # Handle "Cannot operate on a closed database" gracefully - if "closed database" in str(e).lower(): - logger.warning( - f"Database connection closed, cannot append stdout for worker {worker_id}" - ) - return False - logger.error( - f"Error appending stdout to worker {worker_id}: {e}", - exc_info=True - ) - return False - except Exception as e: - logger.error( - f"Error appending stdout to worker {worker_id}: {e}", - exc_info=True - ) - return False - - def get_worker_stdout(self, worker_id: str) -> str: - """Get stdout logs for a worker.""" - try: - cursor = self.connection.cursor() - cursor.execute( - "SELECT stdout FROM worker WHERE worker_id = ?", - (worker_id, - ) - ) - row = cursor.fetchone() - return row[0] if row and row[0] else "" - except Exception as e: - logger.error( - f"Error getting worker stdout for {worker_id}: {e}", - exc_info=True - ) - return "" - - def append_worker_steps(self, worker_id: str, step_text: str) -> bool: - """Append a step to a worker's step log and timeline.""" - if not step_text: - return True - try: - cursor = self.connection.cursor() - cursor.execute( - "SELECT steps FROM worker WHERE worker_id = ?", - (worker_id, - ) - ) - row = cursor.fetchone() - - if not row: - logger.warning(f"Worker {worker_id} not found for steps append") - return False - - current_steps = row[0] or "" - timestamp = datetime.now().strftime("%H:%M:%S") - step_entry = f"[{timestamp}] {step_text}" - new_steps = (current_steps + "\n" if current_steps else "") + step_entry - - cursor.execute( - """ - UPDATE worker SET steps = ?, last_updated = CURRENT_TIMESTAMP, - current_step = ? - WHERE worker_id = ? - """, - (new_steps, - step_text, - worker_id), - ) - self._insert_worker_log_entry( - cursor, - worker_id, - "step", - step_text, - step_text, - "step" - ) - - self.connection.commit() - return cursor.rowcount > 0 - except Exception as e: - logger.error( - f"Error appending step to worker {worker_id}: {e}", - exc_info=True - ) - return False - - def get_worker_steps(self, worker_id: str) -> str: - """Get step logs for a worker.""" - try: - cursor = self.connection.cursor() - cursor.execute( - "SELECT steps FROM worker WHERE worker_id = ?", - (worker_id, - ) - ) - row = cursor.fetchone() - return row[0] if row and row[0] else "" - except Exception as e: - logger.error( - f"Error getting worker steps for {worker_id}: {e}", - exc_info=True - ) - return "" - - def clear_worker_stdout(self, worker_id: str) -> bool: - """Clear stdout logs for a worker.""" - try: - cursor = self.connection.cursor() - cursor.execute( - """ - UPDATE worker SET stdout = '', last_updated = CURRENT_TIMESTAMP - WHERE worker_id = ? 
- """, - (worker_id, - ), - ) - self.clear_worker_events(worker_id, event_type="stdout") - self.connection.commit() - return cursor.rowcount > 0 - except Exception as e: - logger.error(f"Error clearing worker stdout: {e}", exc_info=True) - return False - - def clear_finished_workers(self) -> int: - """Delete all workers that are not currently running.""" - try: - cursor = self.connection.cursor() - cursor.execute("DELETE FROM worker WHERE status != 'running'") - self.connection.commit() - return cursor.rowcount - except Exception as e: - logger.error(f"Error clearing finished workers: {e}", exc_info=True) - return 0 - - def close(self) -> None: - """Close the database connection.""" - try: - if self.connection: - self.connection.close() - logger.info("Database connection closed") - except Exception as e: - logger.error(f"Error closing database: {e}", exc_info=True) - - def __enter__(self): - # Acquire shared lock to serialize access across threads - self._lock_cm = self._with_db_lock() - self._lock_cm.__enter__() - - if not self.connection: - self._init_db() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - try: - self.close() - finally: - if hasattr(self, "_lock_cm"): - self._lock_cm.__exit__(exc_type, exc_val, exc_tb) - - -# ============================================================================ -# DATABASE QUERY API -# ============================================================================ - - -class DatabaseAPI: - """Query API wrapper for LocalLibraryDB providing specialized search methods.""" - - def __init__(self, search_dir: Path): - self.search_dir = expand_path(search_dir).resolve() - self.db = API_folder_store(self.search_dir) - try: - mm_debug( - f"[folder-db] DatabaseAPI init: root={self.search_dir} db={self.db.db_path}" - ) - except Exception: - pass - - def __enter__(self): - try: - mm_debug( - f"[folder-db] DatabaseAPI enter: root={self.search_dir} db={self.db.db_path}" - ) - except Exception: - pass - self.db.__enter__() - return self - - def __exit__(self, *args): - try: - mm_debug( - f"[folder-db] DatabaseAPI exit: root={self.search_dir} db={self.db.db_path}" - ) - except Exception: - pass - return self.db.__exit__(*args) - - def get_cursor(self): - return self.db.connection.cursor() - - def get_file_hash_by_hash(self, file_hash: str) -> Optional[str]: - """Get file hash from the database, or None if not found.""" - cursor = self.get_cursor() - cursor.execute( - "SELECT hash FROM file WHERE LOWER(hash) = ?", - (file_hash.lower(), - ) - ) - row = cursor.fetchone() - return row[0] if row else None - - def get_all_file_hashes(self) -> Set[str]: - """Get all file hashes in the database.""" - cursor = self.get_cursor() - cursor.execute("SELECT hash FROM file") - return {row[0] - for row in cursor.fetchall()} - - def get_file_hashes_by_tag_pattern(self, query_pattern: str) -> List[tuple]: - """Get (hash, tag) tuples matching a tag pattern.""" - cursor = self.get_cursor() - cursor.execute( - """ - SELECT DISTINCT f.hash, t.tag - FROM file f - JOIN tag t ON f.hash = t.hash - WHERE LOWER(t.tag) LIKE ? 
- """, - (query_pattern, - ), - ) - return cursor.fetchall() - - def get_file_hashes_by_path_pattern(self, like_pattern: str) -> Set[str]: - """Get hashes of files matching a path pattern.""" - cursor = self.get_cursor() - cursor.execute( - "SELECT DISTINCT hash FROM file WHERE LOWER(file_path) LIKE ?", - (like_pattern, - ) - ) - return {row[0] - for row in cursor.fetchall()} - - def get_file_hashes_by_tag_substring(self, like_pattern: str) -> Set[str]: - """Get hashes of files matching a tag substring.""" - cursor = self.get_cursor() - cursor.execute( - """ - SELECT DISTINCT f.hash - FROM file f - JOIN tag t ON f.hash = t.hash - WHERE LOWER(t.tag) LIKE ? - """, - (like_pattern, - ), - ) - return {row[0] - for row in cursor.fetchall()} - - def get_file_hashes_with_any_url(self, limit: Optional[int] = None) -> Set[str]: - """Get hashes of files that have any non-empty URL metadata.""" - mm_debug( - f"[folder-db] get_file_hashes_with_any_url start: limit={limit or 10000}" - ) - cursor = self.get_cursor() - cursor.execute( - """ - SELECT DISTINCT f.hash - FROM file f - JOIN metadata m ON f.hash = m.hash - WHERE m.url IS NOT NULL - AND TRIM(m.url) != '' - AND TRIM(m.url) != '[]' - LIMIT ? - """, - (limit or 10000, - ), - ) - rows = cursor.fetchall() - mm_debug( - f"[folder-db] get_file_hashes_with_any_url done: {len(rows)} row(s)" - ) - return {row[0] for row in rows} - - def get_file_hashes_by_url_like( - self, - like_pattern: str, - limit: Optional[int] = None - ) -> Set[str]: - """Get hashes of files whose URL metadata contains a substring (case-insensitive).""" - mm_debug( - f"[folder-db] get_file_hashes_by_url_like start: pattern={like_pattern} limit={limit or 10000}" - ) - cursor = self.get_cursor() - cursor.execute( - """ - SELECT DISTINCT f.hash - FROM file f - JOIN metadata m ON f.hash = m.hash - WHERE m.url IS NOT NULL - AND LOWER(m.url) LIKE ? - LIMIT ? - """, - (like_pattern.lower(), - limit or 10000), - ) - rows = cursor.fetchall() - mm_debug( - f"[folder-db] get_file_hashes_by_url_like done: {len(rows)} row(s)" - ) - return {row[0] for row in rows} - - def get_file_hashes_by_ext(self, - ext_value: str, - limit: Optional[int] = None) -> Set[str]: - """Get hashes of files whose metadata ext matches the given extension. - - Matches case-insensitively and ignores any leading '.' in stored ext. - Supports glob wildcards '*' and '?' in the query. - """ - ext_clean = str(ext_value or "").strip().lower().lstrip(".") - ext_clean = "".join(ch for ch in ext_clean if ch.isalnum()) - if not ext_clean: - return set() - - cursor = self.get_cursor() - - has_glob = ("*" in ext_value) or ("?" in ext_value) - if has_glob: - pattern = str(ext_value or "").strip().lower().lstrip(".") - pattern = pattern.replace("%", "\\%").replace("_", "\\_") - pattern = pattern.replace("*", "%").replace("?", "_") - cursor.execute( - """ - SELECT DISTINCT f.hash - FROM file f - JOIN metadata m ON f.hash = m.hash - WHERE LOWER(LTRIM(COALESCE(m.ext, ''), '.')) LIKE ? ESCAPE '\\' - LIMIT ? - """, - (pattern, - limit or 10000), - ) - else: - cursor.execute( - """ - SELECT DISTINCT f.hash - FROM file f - JOIN metadata m ON f.hash = m.hash - WHERE LOWER(LTRIM(COALESCE(m.ext, ''), '.')) = ? - LIMIT ? - """, - (ext_clean, - limit or 10000), - ) - return {row[0] - for row in cursor.fetchall()} - - def get_files_by_ext(self, - ext_value: str, - limit: Optional[int] = None) -> List[tuple]: - """Get files whose metadata ext matches the given extension. - - Returns (hash, file_path, size, ext) tuples. 
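get_file_hashes_by_ext above translates shell-style globs into SQL LIKE patterns: literal '%' and '_' are escaped first, then '*' maps to '%' and '?' to '_', and the query runs with ESCAPE '\'. A runnable sketch of that translation against an in-memory table; glob_to_like and the one-column schema are illustrative, not part of the module:

import sqlite3

def glob_to_like(pattern: str) -> str:
    # Mirror the translation above: escape LIKE metacharacters, then map
    # shell wildcards onto their SQL equivalents.
    cleaned = str(pattern or "").strip().lower().lstrip(".")
    cleaned = cleaned.replace("%", "\\%").replace("_", "\\_")
    return cleaned.replace("*", "%").replace("?", "_")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE metadata (ext TEXT)")
conn.executemany("INSERT INTO metadata VALUES (?)", [(".mp3",), (".mp4",), (".flac",)])
rows = conn.execute(
    "SELECT ext FROM metadata WHERE LOWER(LTRIM(COALESCE(ext, ''), '.')) LIKE ? ESCAPE '\\'",
    (glob_to_like("mp?"),),
).fetchall()
assert {r[0] for r in rows} == {".mp3", ".mp4"}

Escaping before substitution matters: a stored extension containing a literal underscore would otherwise match any single character.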
- """ - ext_clean = str(ext_value or "").strip().lower().lstrip(".") - ext_clean = "".join(ch for ch in ext_clean if ch.isalnum()) - if not ext_clean: - return [] - - cursor = self.get_cursor() - cursor.execute( - """ - SELECT f.hash, f.file_path, - COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM file f - JOIN metadata m ON f.hash = m.hash - WHERE LOWER(LTRIM(COALESCE(m.ext, ''), '.')) = ? - ORDER BY f.file_path - LIMIT ? - """, - (ext_clean, - limit or 10000), - ) - return cursor.fetchall() - - def get_files_with_any_url(self, limit: Optional[int] = None) -> List[tuple]: - """Get files that have any non-empty URL metadata. - - Returns (hash, file_path, size, ext, url) tuples. - """ - mm_debug( - f"[folder-db] get_files_with_any_url start: limit={limit or 10000}" - ) - cursor = self.get_cursor() - cursor.execute( - """ - SELECT f.hash, f.file_path, - COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext, - COALESCE(m.url, '') as url - FROM file f - JOIN metadata m ON f.hash = m.hash - WHERE m.url IS NOT NULL - AND TRIM(m.url) != '' - AND TRIM(m.url) != '[]' - ORDER BY f.file_path - LIMIT ? - """, - (limit or 10000, - ), - ) - rows = cursor.fetchall() - mm_debug( - f"[folder-db] get_files_with_any_url done: {len(rows)} row(s)" - ) - return rows - - def get_files_by_url_like(self, - like_pattern: str, - limit: Optional[int] = None) -> List[tuple]: - """Get files whose URL metadata contains a substring (case-insensitive). - - Returns (hash, file_path, size, ext, url) tuples. - """ - mm_debug( - f"[folder-db] get_files_by_url_like start: pattern={like_pattern} limit={limit or 10000}" - ) - cursor = self.get_cursor() - cursor.execute( - """ - SELECT f.hash, f.file_path, - COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext, - COALESCE(m.url, '') as url - FROM file f - JOIN metadata m ON f.hash = m.hash - WHERE m.url IS NOT NULL - AND LOWER(m.url) LIKE ? - ORDER BY f.file_path - LIMIT ? - """, - (like_pattern.lower(), - limit or 10000), - ) - rows = cursor.fetchall() - mm_debug( - f"[folder-db] get_files_by_url_like done: {len(rows)} row(s)" - ) - return rows - - def get_files_by_url_like_any( - self, - like_patterns: Sequence[str], - limit: Optional[int] = None, - ) -> List[tuple]: - """Get files whose URL metadata matches any of the provided LIKE patterns. - - Returns (hash, file_path, size, ext, url) tuples. - """ - patterns = [str(p or "").strip() for p in (like_patterns or [])] - patterns = [p for p in patterns if p] - if not patterns: - return [] - - mm_debug( - f"[folder-db] get_files_by_url_like_any start: patterns={len(patterns)} limit={limit or 10000}" - ) - cursor = self.get_cursor() - where_or = " OR ".join(["LOWER(m.url) LIKE ?"] * len(patterns)) - query = f""" - SELECT f.hash, f.file_path, - COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext, - COALESCE(m.url, '') as url - FROM file f - JOIN metadata m ON f.hash = m.hash - WHERE m.url IS NOT NULL - AND ({where_or}) - ORDER BY f.file_path - LIMIT ? 
- """ - cursor.execute( - query, - (*[p.lower() for p in patterns], limit or 10000), - ) - rows = cursor.fetchall() - mm_debug( - f"[folder-db] get_files_by_url_like_any done: {len(rows)} row(s)" - ) - return rows - - def get_file_metadata(self, - file_hashes: Set[str], - limit: Optional[int] = None) -> List[tuple]: - """Get metadata for files given their hashes. Returns (hash, file_path, size, extension) tuples.""" - if not file_hashes: - return [] - mm_debug( - f"[folder-db] get_file_metadata start: hashes={len(file_hashes)} limit={limit or len(file_hashes)}" - ) - cursor = self.get_cursor() - placeholders = ",".join(["?"] * len(file_hashes)) - fetch_sql = f""" - SELECT hash, file_path, - COALESCE((SELECT size FROM metadata WHERE hash = file.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = file.hash), '') as ext - FROM file - WHERE hash IN ({placeholders}) - ORDER BY file_path - LIMIT ? - """ - cursor.execute(fetch_sql, (*file_hashes, limit or len(file_hashes))) - rows = cursor.fetchall() - mm_debug( - f"[folder-db] get_file_metadata done: {len(rows)} row(s)" - ) - return rows - - def get_all_files(self, limit: Optional[int] = None) -> List[tuple]: - """Get all files in database. Returns (hash, file_path, size, ext) tuples.""" - cursor = self.get_cursor() - cursor.execute( - """ - SELECT f.hash, f.file_path, - COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM file f - ORDER BY file_path - LIMIT ? - """, - (limit or 1000, - ), - ) - return cursor.fetchall() - - def get_tags_for_file(self, file_hash: str) -> List[str]: - """Get all tags for a file given its hash.""" - mm_debug( - f"[folder-db] get_tags_for_file start: hash={file_hash}" - ) - cursor = self.get_cursor() - cursor.execute("SELECT tag FROM tag WHERE hash = ?", - (file_hash, - )) - rows = cursor.fetchall() - mm_debug( - f"[folder-db] get_tags_for_file done: {len(rows)} row(s)" - ) - return [row[0] for row in rows] - - def get_tags_by_namespace_and_file(self, - file_hash: str, - query_pattern: str) -> List[str]: - """Get tags for a file matching a pattern.""" - cursor = self.get_cursor() - cursor.execute( - """ - SELECT DISTINCT tag FROM tag - WHERE hash = ? - AND LOWER(tag) LIKE ? - """, - (file_hash, - query_pattern), - ) - return [row[0] for row in cursor.fetchall()] - - def get_files_by_namespace_pattern( - self, - query_pattern: str, - limit: Optional[int] = None - ) -> List[tuple]: - """Get files with tags matching a pattern. Returns (hash, file_path, size, ext) tuples.""" - cursor = self.get_cursor() - cursor.execute( - """ - SELECT DISTINCT f.hash, f.file_path, - COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM file f - JOIN tag t ON f.hash = t.hash - WHERE LOWER(t.tag) LIKE ? - ORDER BY f.file_path - LIMIT ? - """, - (query_pattern, - limit or 1000), - ) - return cursor.fetchall() - - def get_files_by_simple_tag_pattern( - self, - query_pattern: str, - limit: Optional[int] = None - ) -> List[tuple]: - """Get files with non-namespaced tags matching a pattern. Returns (hash, file_path, size, ext) tuples.""" - cursor = self.get_cursor() - cursor.execute( - """ - SELECT DISTINCT f.hash, f.file_path, - COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM file f - JOIN tag t ON f.hash = t.hash - WHERE LOWER(t.tag) LIKE ? 
AND LOWER(t.tag) NOT LIKE '%:%' - ORDER BY f.file_path - LIMIT ? - """, - (query_pattern, - limit or 1000), - ) - return cursor.fetchall() - - def get_files_by_multiple_path_conditions( - self, - conditions: List[str], - params: List[str], - limit: Optional[int] = None - ) -> List[tuple]: - """Get files matching multiple path conditions. Returns (hash, file_path, size, ext) tuples.""" - cursor = self.get_cursor() - where_clause = " AND ".join(conditions) - sql = f""" - SELECT DISTINCT f.hash, f.file_path, - COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM file f - WHERE {where_clause} - ORDER BY f.file_path - LIMIT ? - """ - cursor.execute(sql, (*params, limit or 10000)) - return cursor.fetchall() - - def get_files_by_title_tag_pattern( - self, - title_pattern: str, - limit: Optional[int] = None - ) -> List[tuple]: - """Get files with title tags matching a pattern. Returns (hash, file_path, size, ext) tuples.""" - cursor = self.get_cursor() - cursor.execute( - """ - SELECT DISTINCT f.hash, f.file_path, - COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM file f - JOIN tag t ON f.hash = t.hash - WHERE LOWER(t.tag) LIKE ? - ORDER BY f.file_path - LIMIT ? - """, - (title_pattern, - limit or 10000), - ) - return cursor.fetchall() - - -# ============================================================================ -# LIBRARY INITIALIZATION & MIGRATION -# ============================================================================ - - -class LocalLibraryInitializer: - """Initialize and synchronize local library database.""" - - def __init__(self, library_root: Path): - """Initialize the database scanner.""" - self.library_root = expand_path(library_root).resolve() - self.db = API_folder_store(self.library_root) - self.stats = { - "files_scanned": 0, - "files_new": 0, - "files_existing": 0, - "sidecars_imported": 0, - "sidecars_deleted": 0, - "tags_imported": 0, - "metadata_imported": 0, - "errors": 0, - } - - def scan_and_index(self) -> Dict[str, int]: - """Scan library folder and populate database with file entries.""" - try: - logger.info(f"Starting library scan at {self.library_root}") - - media_files = self._find_media_files() - logger.info(f"Found {len(media_files)} media files") - - db_files = self._get_database_files() - logger.info(f"Found {len(db_files)} files in database") - - for file_path in media_files: - self._process_file(file_path, db_files) - - self.db.connection.commit() - self._import_sidecars_batch() - self.db.connection.commit() - - # Ensure files without sidecars are still imported + renamed to hash. - self._hash_and_rename_non_sidecar_media_files() - self.db.connection.commit() - - self._cleanup_orphaned_sidecars() - self.db.connection.commit() - - try: - cursor = self.db.connection.cursor() - cursor.execute("SELECT COUNT(*) FROM file") - row = cursor.fetchone() - self.stats["files_total_db"] = int( - row[0] - ) if row and row[0] is not None else 0 - except Exception: - self.stats["files_total_db"] = 0 - - logger.info(f"Library scan complete. Stats: {self.stats}") - return self.stats - except Exception as e: - logger.error(f"Error during library scan: {e}", exc_info=True) - self.stats["errors"] += 1 - raise - finally: - self.db.close() - - def _hash_and_rename_non_sidecar_media_files(self) -> None: - """Ensure media files are hash-named even when they have no sidecars. 
- - This keeps the library stable across restarts: - - New files get hashed + renamed to - - DB file_path is updated by hash so the same file isn't re-counted as "new". - """ - try: - renamed = 0 - skipped_existing_target = 0 - duplicates_quarantined = 0 - - for file_path in self._find_media_files(): - try: - if not file_path.is_file(): - continue - - stem = file_path.stem.lower() - is_hash_named = len(stem) == 64 and all( - ch in "0123456789abcdef" for ch in stem - ) - if is_hash_named: - continue - - # If any sidecars exist for this file, let the sidecar importer handle it. - if (file_path.with_name(file_path.name + ".tag").exists() or - file_path.with_name(file_path.name + ".metadata").exists() - or file_path.with_name(file_path.name + ".notes").exists()): - continue - - file_hash = sha256_file(file_path) - target_path = file_path.with_name(f"{file_hash}{file_path.suffix}") - - # Ensure the DB entry exists with a title tag derived from the original filename. - # This intentionally happens BEFORE rename. - self.db.get_or_create_file_entry(file_path, file_hash) - - if target_path == file_path: - continue - - if target_path.exists(): - skipped_existing_target += 1 - # The canonical file already exists as a hash-named file. Keep the DB pointing - # at the canonical hash-named path and quarantine this duplicate so it doesn't - # get counted as "new" again on future restarts. - try: - cursor = self.db.connection.cursor() - cursor.execute( - "UPDATE file SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?", - (self.db._to_db_file_path(target_path), - file_hash), - ) - except Exception as exc: - logger.debug( - f"Failed to reset DB path to canonical file for {file_hash}: {exc}" - ) - - try: - dup_dir = self.library_root / ".duplicates" - dup_dir.mkdir(parents=True, exist_ok=True) - - dest = dup_dir / file_path.name - if dest.exists(): - ts = int(datetime.now().timestamp()) - dest = dup_dir / f"{file_path.stem}__dup__{ts}{file_path.suffix}" - - logger.warning( - f"Duplicate content (hash={file_hash}) detected; moving {file_path} -> {dest}" - ) - file_path.rename(dest) - duplicates_quarantined += 1 - except Exception as exc: - logger.warning( - f"Duplicate content (hash={file_hash}) detected but could not quarantine {file_path}: {exc}" - ) - continue - - try: - file_path.rename(target_path) - except Exception as exc: - logger.warning( - f"Failed to rename {file_path} -> {target_path}: {exc}" - ) - self.stats["errors"] += 1 - continue - - # Update DB path by hash (more robust than matching the old path). - try: - cursor = self.db.connection.cursor() - cursor.execute( - "UPDATE file SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?", - (self.db._to_db_file_path(target_path), - file_hash), - ) - except Exception: - pass - - # Ensure basic metadata exists. 
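The naming rule implemented above can be restated compactly, which helps when reasoning about rescans: a file is already canonical when its stem is exactly 64 lowercase hex characters, and otherwise its canonical name is the SHA-256 digest plus the original suffix. A small sketch with illustrative names:

from pathlib import Path

def is_hash_named(path: Path) -> bool:
    # A canonical file's stem is exactly the 64-char lowercase hex SHA-256.
    stem = path.stem.lower()
    return len(stem) == 64 and all(c in "0123456789abcdef" for c in stem)

def canonical_name(path: Path, file_hash: str) -> str:
    # The canonical name keeps the original suffix: "<sha256><ext>".
    return f"{file_hash}{path.suffix}"

digest = "a" * 64
assert is_hash_named(Path(f"/library/{digest}.mp3"))
assert not is_hash_named(Path("/library/song.mp3"))
assert canonical_name(Path("/library/song.mp3"), digest) == digest + ".mp3"

Because the check is purely name-based, already-renamed files are skipped without rehashing, which is what keeps repeated scans cheap.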
- try: - stat_result = target_path.stat() - self.db.save_metadata( - target_path, - { - "hash": file_hash, - "ext": target_path.suffix, - "size": stat_result.st_size, - }, - ) - except Exception: - pass - - renamed += 1 - except Exception as exc: - logger.warning(f"Error hashing/renaming file {file_path}: {exc}") - self.stats["errors"] += 1 - - if renamed: - self.stats["files_hashed_renamed"] = ( - int(self.stats.get("files_hashed_renamed", - 0) or 0) + renamed - ) - if skipped_existing_target: - self.stats["files_hashed_skipped_target_exists"] = ( - int(self.stats.get("files_hashed_skipped_target_exists", - 0) or 0) + skipped_existing_target - ) - if duplicates_quarantined: - self.stats["duplicates_quarantined"] = ( - int(self.stats.get("duplicates_quarantined", - 0) or 0) + duplicates_quarantined - ) - except Exception as exc: - logger.error( - f"Error hashing/renaming non-sidecar media files: {exc}", - exc_info=True - ) - self.stats["errors"] += 1 - - def _find_media_files(self) -> List[Path]: - """Find all media files in the library folder.""" - media_files = [] - try: - for file_path in self.library_root.rglob("*"): - # Don't repeatedly re-scan quarantined duplicates. - try: - if ".duplicates" in file_path.parts: - continue - except Exception: - pass - if file_path.is_file() and file_path.suffix.lower() in MEDIA_EXTENSIONS: - media_files.append(file_path) - except Exception as e: - logger.error(f"Error scanning media files: {e}", exc_info=True) - - return sorted(media_files) - - def _get_database_files(self) -> Dict[str, str]: - """Get existing files from database by normalized path, returns {normalized_path: hash}.""" - try: - cursor = self.db.connection.cursor() - cursor.execute("SELECT hash, file_path FROM file") - - result = {} - for file_hash, file_path in cursor.fetchall(): - try: - abs_path = self.db._from_db_file_path(file_path) - except Exception: - abs_path = Path(file_path) - normalized = str(abs_path.resolve()).lower() - result[normalized] = file_hash - - return result - except Exception as e: - logger.error(f"Error getting database files: {e}", exc_info=True) - return {} - - def _process_file(self, file_path: Path, db_files: Dict[str, str]) -> None: - """Process a single media file.""" - try: - normalized = str(file_path.resolve()).lower() - - if normalized in db_files: - self.stats["files_existing"] += 1 - else: - # Path not known. If this file's hash is already in DB, it's duplicate content and - # should not be counted as "new". 
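The decision being made here reduces to a three-way classification: a path already in the database is "existing"; an unknown path whose hash is already known is duplicate content under a new name; only an unknown path with an unknown hash counts as "new". A sketch with in-memory stand-ins for the database (classify and hash_of are illustrative):

def classify(path: str, known_paths: set, known_hashes: set, hash_of) -> str:
    # Known path: already indexed. Unknown path with a known hash: duplicate
    # content under a new name. Otherwise: genuinely new.
    if path.lower() in known_paths:
        return "existing"
    return "duplicate" if hash_of(path) in known_hashes else "new"

hashes = {"a.mp3": "h1", "copy.mp3": "h1", "b.mp3": "h2"}
known_paths, known_hashes = {"a.mp3"}, {"h1"}
assert classify("a.mp3", known_paths, known_hashes, hashes.get) == "existing"
assert classify("copy.mp3", known_paths, known_hashes, hashes.get) == "duplicate"
assert classify("b.mp3", known_paths, known_hashes, hashes.get) == "new"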
-                file_hash = sha256_file(file_path)
-                try:
-                    cursor = self.db.connection.cursor()
-                    cursor.execute("SELECT 1 FROM file WHERE hash = ?",
-                                   (file_hash,
-                                    ))
-                    exists_by_hash = cursor.fetchone() is not None
-                except Exception:
-                    exists_by_hash = False
-
-                if exists_by_hash:
-                    self.stats["files_existing"] += 1
-                    self.stats["duplicates_found"] = (
-                        int(self.stats.get("duplicates_found",
-                                           0) or 0) + 1
-                    )
-                    logger.info(
-                        f"Duplicate content detected during scan (hash={file_hash}): {file_path}"
-                    )
-                else:
-                    self.db.get_or_create_file_entry(file_path, file_hash)
-                    self.stats["files_new"] += 1
-
-            self.stats["files_scanned"] += 1
-        except Exception as e:
-            logger.warning(f"Error processing file {file_path}: {e}")
-            self.stats["errors"] += 1
-
-    def _import_sidecars_batch(self) -> None:
-        """Batch import sidecars, hash files, and rename files to their hash."""
-        try:
-            sidecar_map = self._collect_sidecars()
-
-            for base_path, sidecars in sidecar_map.items():
-                try:
-                    if not base_path.exists():
-                        continue
-
-                    tags = self._read_tag_sidecars(sidecars)
-                    metadata_info = self._read_metadata_sidecar(sidecars)
-                    note_text = self._read_notes_sidecar(sidecars)
-
-                    hashed_path, file_hash = self._ensure_hashed_filename(base_path, sidecars)
-
-                    # Always trust freshly computed hash
-                    metadata_info["hash"] = file_hash
-                    try:
-                        stat_result = hashed_path.stat()
-                        metadata_info.setdefault("size", stat_result.st_size)
-                        metadata_info.setdefault("ext", hashed_path.suffix)
-                    except OSError:
-                        pass
-
-                    self.db.save_file_info(hashed_path, metadata_info, tags)
-                    if note_text:
-                        self.db.save_note(hashed_path, note_text)
-
-                    # Delete all sidecars after importing
-                    self._delete_sidecars(sidecars)
-
-                    self.stats["sidecars_imported"] += 1
-                except Exception as e:
-                    logger.warning(
-                        f"Error importing sidecar bundle for {base_path}: {e}"
-                    )
-                    self.stats["errors"] += 1
-        except Exception as e:
-            logger.error(f"Error batch importing sidecars: {e}", exc_info=True)
-
-    def _collect_sidecars(self) -> Dict[Path, Dict[str, List[Path]]]:
-        """Collect sidecars grouped by their base media file."""
-        sidecar_map: Dict[Path,
-                          Dict[str,
-                               List[Path]]] = {}
-
-        patterns = [
-            ("*.tag",
-             "tag"),
-            ("*.metadata",
-             "metadata"),
-            ("*.notes",
-             "notes"),
-        ]
-
-        for pattern, key in patterns:
-            for sidecar in self.library_root.rglob(pattern):
-                try:
-                    base = sidecar.with_suffix("")
-                except Exception:
-                    continue
-
-                if not base.exists():
-                    continue
-
-                bucket = sidecar_map.setdefault(
-                    base,
-                    {
-                        "tag": [],
-                        "metadata": [],
-                        "notes": []
-                    }
-                )
-                bucket[key].append(sidecar)
-
-        return sidecar_map
-
-    def _read_tag_sidecars(self, sidecars: Dict[str, List[Path]]) -> List[str]:
-        tags: List[str] = []
-        for tag_path in sidecars.get("tag", []):
-            try:
-                content = tag_path.read_text(encoding="utf-8")
-            except OSError:
-                continue
-
-            for raw_line in content.splitlines():
-                line = raw_line.strip()
-                if line:
-                    tags.append(line)
-        return tags
-
-    def _read_metadata_sidecar(self, sidecars: Dict[str, List[Path]]) -> Dict[str, Any]:
-        metadata: Dict[str,
-                       Any] = {
-                           "url": [],
-                           "relationships": []
-                       }
-
-        meta_path = sidecars.get("metadata", [])
-        if not meta_path:
-            return metadata
-
-        for path in meta_path:
-            try:
-                content = path.read_text(encoding="utf-8")
-            except OSError:
-                continue
-
-            for raw_line in content.splitlines():
-                line = raw_line.strip()
-                if not line or line.startswith("#"):
-                    continue
-
-                lower = line.lower()
-                if lower.startswith("hash:"):
-                    metadata["hash"] = line.split(":", 1)[1].strip()
-                elif lower.startswith("url:"):
-                    url_part = line.split(":", 1)[1].strip()
-                    if url_part:
-                        for url_segment in url_part.replace(",", " ").split():
-                            clean = url_segment.strip()
-                            if clean and clean not in metadata["url"]:
-                                metadata["url"].append(clean)
-                elif lower.startswith("relationship:"):
-                    rel_value = line.split(":", 1)[1].strip()
-                    if rel_value:
-                        metadata["relationships"].append(rel_value)
-
-        return metadata
-
-    def _read_notes_sidecar(self, sidecars: Dict[str, List[Path]]) -> Optional[str]:
-        note_paths = sidecars.get("notes", [])
-        for note_path in note_paths:
-            try:
-                content = note_path.read_text(encoding="utf-8").strip()
-            except OSError:
-                continue
-            if content:
-                return content
-        return None
-
-    def _ensure_hashed_filename(self,
-                                file_path: Path,
-                                sidecars: Dict[str,
-                                               List[Path]]) -> Tuple[Path,
-                                                                     str]:
-        """Compute hash, rename file to hash-based name, and move sidecars accordingly."""
-        file_hash = sha256_file(file_path)
-        target_name = f"{file_hash}{file_path.suffix}"
-        target_path = file_path.with_name(target_name)
-
-        # Nothing to do if already hashed
-        if target_path == file_path:
-            return file_path, file_hash
-
-        try:
-            if target_path.exists():
-                logger.warning(
-                    f"Hash target already exists, keeping original: {target_path}"
-                )
-                return file_path, file_hash
-
-            file_path.rename(target_path)
-            self._rename_sidecars(file_path, target_path, sidecars)
-            try:
-                self.db.rename_file(file_path, target_path)
-            except Exception:
-                # Entry might not exist yet; it will be created during save.
-                pass
-            return target_path, file_hash
-        except Exception as e:
-            logger.warning(f"Failed to rename {file_path} to hash {target_path}: {e}")
-            return file_path, file_hash
-
-    def _rename_sidecars(
-        self,
-        old_base: Path,
-        new_base: Path,
-        sidecars: Dict[str,
-                       List[Path]]
-    ) -> None:
-        """Rename sidecars to follow the new hashed filename."""
-        mappings = [
-            (sidecars.get("tag",
-                          []),
-             ".tag"),
-            (sidecars.get("metadata",
-                          []),
-             ".metadata"),
-            (sidecars.get("notes",
-                          []),
-             ".notes"),
-        ]
-
-        for candidates, suffix in mappings:
-            for source in candidates:
-                try:
-                    dest = new_base.with_name(new_base.name + suffix)
-                except Exception:
-                    continue
-
-                if dest == source:
-                    continue
-
-                try:
-                    source.rename(dest)
-                except Exception as e:
-                    logger.warning(f"Failed to rename sidecar {source} -> {dest}: {e}")
-
-    def _delete_sidecars(self, sidecars: Dict[str, List[Path]]) -> None:
-        """Delete sidecar files after they've been imported."""
-        for sidecar_list in sidecars.values():
-            for sidecar_path in sidecar_list:
-                try:
-                    if sidecar_path.exists():
-                        sidecar_path.unlink()
-                        self.stats["sidecars_deleted"] += 1
-                except Exception as e:
-                    logger.warning(f"Could not delete sidecar {sidecar_path}: {e}")
-
-    def _cleanup_orphaned_sidecars(self) -> None:
-        """Remove sidecars for non-existent files."""
-        try:
-            patterns = ["*.tag", "*.metadata", "*.notes"]
-
-            for pattern in patterns:
-                for sidecar_path in self.library_root.rglob(pattern):
-                    base_path = sidecar_path.with_suffix("")
-                    if not base_path.exists():
-                        try:
-                            sidecar_path.unlink()
-                            self.stats["sidecars_deleted"] += 1
-                        except Exception as e:
-                            logger.warning(
-                                f"Could not delete orphaned sidecar {sidecar_path}: {e}"
-                            )
-        except Exception as e:
-            logger.error(f"Error cleaning up orphaned sidecars: {e}", exc_info=True)
-
-
-def migrate_tags_to_db(library_root: Path, db: API_folder_store) -> int:
-    """Migrate .tag files to the database."""
-    migrated_count = 0
-
-    try:
-        for tags_file in library_root.rglob("*.tag"):
-            try:
-                base_path
= tags_file.with_suffix("") - tags_text = tags_file.read_text(encoding="utf-8") - tags = [line.strip() for line in tags_text.splitlines() if line.strip()] - - db.save_tags(base_path, tags) - migrated_count += 1 - - try: - tags_file.unlink() - logger.info(f"Migrated and deleted {tags_file}") - except Exception as e: - logger.warning(f"Migrated {tags_file} but failed to delete: {e}") - except Exception as e: - logger.warning(f"Failed to migrate {tags_file}: {e}") - - logger.info(f"Migrated {migrated_count} .tag files to database") - return migrated_count - except Exception as e: - logger.error(f"Error during tags migration: {e}", exc_info=True) - return migrated_count - - -def migrate_metadata_to_db(library_root: Path, db: API_folder_store) -> int: - """Migrate .metadata files to the database.""" - migrated_count = 0 - - try: - for metadata_file in library_root.rglob("*.metadata"): - try: - base_path = Path(str(metadata_file)[:-len(".metadata")]) - metadata_text = metadata_file.read_text(encoding="utf-8") - metadata = _parse_metadata_file(metadata_text) - - db.save_metadata(base_path, metadata) - migrated_count += 1 - - try: - metadata_file.unlink() - logger.info(f"Migrated and deleted {metadata_file}") - except Exception as e: - logger.warning( - f"Migrated {metadata_file} but failed to delete: {e}" - ) - except Exception as e: - logger.warning(f"Failed to migrate {metadata_file}: {e}") - - logger.info(f"Migrated {migrated_count} .metadata files to database") - return migrated_count - except Exception as e: - logger.error(f"Error during metadata migration: {e}", exc_info=True) - return migrated_count - - -def _parse_metadata_file(content: str) -> Dict[str, Any]: - """Parse metadata file content.""" - try: - return json.loads(content) - except json.JSONDecodeError: - logger.warning("Could not parse metadata JSON, returning empty dict") - return {} - - -def migrate_all(library_root: Path, - db: Optional[API_folder_store] = None) -> Dict[str, - int]: - """Migrate all sidecar files to database.""" - should_close = db is None - - try: - if db is None: - db = API_folder_store(library_root) - - return { - "tags": migrate_tags_to_db(library_root, - db), - "metadata": migrate_metadata_to_db(library_root, - db), - } - finally: - if should_close and db is not None: - db.close() - - -# ============================================================================ -# SEARCH OPTIMIZATION -# ============================================================================ - - -class LocalLibrarySearchOptimizer: - """Optimizer that uses database for local library searches.""" - - def __init__(self, library_root: Path): - """Initialize the search optimizer.""" - self.library_root = Path(library_root) - self.db: Optional[API_folder_store] = None - - def __enter__(self): - """Context manager entry.""" - self.db = API_folder_store(self.library_root) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" - if self.db: - self.db.close() - - def get_cached_tags(self, file_path: Path) -> List[str]: - """Get tags from database cache.""" - if not self.db: - return [] - file_hash = self.db.get_file_hash(file_path) - return self.db.get_tags(file_hash) if file_hash else [] - - def get_cached_metadata(self, file_path: Path) -> Optional[Dict[str, Any]]: - """Get metadata from database cache.""" - if not self.db: - return None - file_hash = self.db.get_file_hash(file_path) - return self.db.get_metadata(file_hash) if file_hash else None - - def prefetch_metadata(self, file_paths: 
List[Path]) -> None: - """Pre-cache metadata for multiple files.""" - if not self.db: - return - - for file_path in file_paths: - try: - self.db.get_or_create_file_entry(file_path) - except Exception as e: - logger.warning(f"Failed to prefetch {file_path}: {e}") - - def update_search_result_with_cached_data( - self, - search_result: Any, - file_path: Path - ) -> None: - """Update a search result object with cached database data.""" - if not self.db: - return - - try: - file_hash = self.db.get_file_hash(file_path) - if not file_hash: - return - - tags = self.db.get_tags(file_hash) - if tags: - search_result.tag_summary = ", ".join(tags) - - metadata = self.db.get_metadata(file_hash) - if metadata: - if "hash" in metadata: - search_result.hash_hex = metadata["hash"] - if "duration" in metadata: - search_result.duration_seconds = metadata["duration"] - if "media_kind" in metadata: - search_result.media_kind = metadata["media_kind"] - except Exception as e: - logger.warning(f"Failed to update search result for {file_path}: {e}") - - def search_by_tag(self, tag: str, limit: int = 100) -> List[Dict[str, Any]]: - """Fast tag-based search using database.""" - if not self.db: - return [] - - try: - with self.db._with_db_lock(): - cursor = self.db.connection.cursor() - cursor.execute( - """ - SELECT f.hash, f.file_path, m.duration, m.size, m.type as media_kind, m.url - FROM file f - JOIN tag t ON f.hash = t.hash - LEFT JOIN metadata m ON f.hash = m.hash - WHERE t.tag LIKE ? - LIMIT ? - """, - (f"%{tag}%", - limit), - ) - - results = [] - for row in cursor.fetchall(): - res = dict(row) - # Resolve path to absolute string for remote consumption - res["file_path"] = str(self.db._from_db_file_path(res["file_path"])) - results.append(res) - return results - except Exception as e: - logger.error(f"Tag search failed: {e}") - return [] - - def search_by_name(self, query: str, limit: int = 100) -> List[Dict[str, Any]]: - """Fast name-based search using database.""" - if not self.db: - return [] - - try: - with self.db._with_db_lock(): - cursor = self.db.connection.cursor() - cursor.execute( - """ - SELECT f.hash, f.file_path, m.duration, m.size, m.type as media_kind, m.url - FROM file f - LEFT JOIN metadata m ON f.hash = m.hash - WHERE f.file_path LIKE ? - LIMIT ? 
- """, - (f"%{query}%", - limit), - ) - - results = [] - for row in cursor.fetchall(): - res = dict(row) - # Resolve path to absolute string for remote consumption - res["file_path"] = str(self.db._from_db_file_path(res["file_path"])) - results.append(res) - return results - except Exception as e: - logger.error(f"Name search failed: {e}") - return [] - - def save_playlist(self, name: str, items: List[Dict[str, Any]]) -> bool: - """Save a playlist to the database.""" - if not self.db: - return False - try: - cursor = self.db.connection.cursor() - items_json = json.dumps(items) - cursor.execute( - """ - INSERT INTO playlist (name, items, updated_at) - VALUES (?, ?, CURRENT_TIMESTAMP) - ON CONFLICT(name) DO UPDATE SET - items = excluded.items, - updated_at = CURRENT_TIMESTAMP - """, - (name, - items_json), - ) - self.db.connection.commit() - return True - except Exception as e: - logger.error(f"Failed to save playlist {name}: {e}") - return False - - def get_playlists(self) -> List[Dict[str, Any]]: - """Get all saved playlists.""" - if not self.db: - return [] - try: - cursor = self.db.connection.cursor() - cursor.execute( - "SELECT id, name, items, updated_at FROM playlist ORDER BY updated_at DESC" - ) - results = [] - for row in cursor.fetchall(): - try: - items = json.loads(row["items"]) - except json.JSONDecodeError: - items = [] - results.append( - { - "id": row["id"], - "name": row["name"], - "items": items, - "updated_at": row["updated_at"], - } - ) - return results - except Exception as e: - logger.error(f"Failed to get playlists: {e}") - return [] - - def get_playlist(self, name: str) -> Optional[List[Dict[str, Any]]]: - """Get a specific playlist by name.""" - if not self.db: - return None - try: - cursor = self.db.connection.cursor() - cursor.execute("SELECT items FROM playlist WHERE name = ?", - (name, - )) - row = cursor.fetchone() - if row: - try: - return json.loads(row["items"]) - except json.JSONDecodeError: - return [] - return None - except Exception as e: - logger.error(f"Failed to get playlist {name}: {e}") - return None - - def get_playlist_by_id(self, - playlist_id: int) -> Optional[Tuple[str, - List[Dict[str, - Any]]]]: - """Get a specific playlist by ID. Returns (name, items).""" - if not self.db: - return None - try: - cursor = self.db.connection.cursor() - cursor.execute( - "SELECT name, items FROM playlist WHERE id = ?", - (playlist_id, - ) - ) - row = cursor.fetchone() - if row: - try: - items = json.loads(row["items"]) - return (row["name"], items) - except json.JSONDecodeError: - return (row["name"], []) - return None - except Exception as e: - logger.error(f"Failed to get playlist ID {playlist_id}: {e}") - return None - - def delete_playlist(self, playlist_id: int) -> bool: - """Delete a playlist by ID.""" - if not self.db: - return False - try: - cursor = self.db.connection.cursor() - cursor.execute("DELETE FROM playlist WHERE id = ?", - (playlist_id, - )) - self.db.connection.commit() - return cursor.rowcount > 0 - except Exception as e: - logger.error(f"Failed to delete playlist ID {playlist_id}: {e}") - return False - - def search_by_hash(self, file_hash: str) -> Optional[Path]: - """Fast hash-based search using database.""" - if not self.db: - return None - return self.db.search_hash(file_hash) - - def set_relationship( - self, - file_path: Path, - related_file_path: Path, - rel_type: str = "alt", - *, - bidirectional: bool = True, - ) -> None: - """Set a relationship between two files in the database. - - Delegates to LocalLibraryDB.set_relationship(). 
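save_playlist above relies on an upsert keyed by a unique playlist name, with the item list serialized as JSON. A self-contained sketch of that round-trip; the schema here is an assumption for illustration, since the module's playlist DDL is not shown in this hunk:

import json
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed minimal schema: the real table is created elsewhere in the module.
conn.execute(
    "CREATE TABLE playlist ("
    " id INTEGER PRIMARY KEY,"
    " name TEXT UNIQUE,"
    " items TEXT,"
    " updated_at TEXT DEFAULT CURRENT_TIMESTAMP)"
)

def save_playlist(name: str, items: list) -> None:
    # ON CONFLICT(name) requires the UNIQUE constraint on name above.
    conn.execute(
        "INSERT INTO playlist (name, items, updated_at)"
        " VALUES (?, ?, CURRENT_TIMESTAMP)"
        " ON CONFLICT(name) DO UPDATE SET"
        " items = excluded.items, updated_at = CURRENT_TIMESTAMP",
        (name, json.dumps(items)),
    )

save_playlist("mix", [{"hash": "h1"}])
save_playlist("mix", [{"hash": "h1"}, {"hash": "h2"}])  # upserts in place
row = conn.execute("SELECT items FROM playlist WHERE name = ?", ("mix",)).fetchone()
assert len(json.loads(row[0])) == 2
assert conn.execute("SELECT COUNT(*) FROM playlist").fetchone()[0] == 1

Keying on the name rather than the rowid is what lets repeated saves update a playlist in place instead of accumulating rows.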
- - Args: - file_path: Path to the first file - related_file_path: Path to the related file - rel_type: Type of relationship ('king', 'alt', 'related', etc.) - """ - if not self.db: - return - self.db.set_relationship( - file_path, - related_file_path, - rel_type, - bidirectional=bidirectional - ) - - def find_files_pointing_to(self, target_path: Path) -> List[Dict[str, Any]]: - """Find all files that have a relationship pointing to the target path.""" - if not self.db: - return [] - return self.db.find_files_pointing_to(target_path) diff --git a/Store/registry.py b/Store/registry.py index c6f800b..3231f91 100644 --- a/Store/registry.py +++ b/Store/registry.py @@ -164,45 +164,6 @@ class Store: str] = {} self._load_backends() - def _maybe_register_temp_alias( - self, - store_type: str, - backend_name: str, - kwargs: Dict[str, - Any], - backend: BaseStore - ) -> None: - """If a folder backend points at config['temp'], also expose it as the 'temp' backend. - - This keeps config compatibility (e.g. existing 'default') while presenting the temp - directory under a clearer name. - """ - try: - if _normalize_store_type(store_type) != "folder": - return - temp_value = self._config.get("temp") - if not temp_value: - return - path_value = kwargs.get("PATH") or kwargs.get("path") - if not path_value: - return - - temp_path = expand_path(temp_value).resolve() - backend_path = expand_path(path_value).resolve() - if backend_path != temp_path: - return - - # If the user already has a dedicated temp backend, do nothing. - if "temp" in self._backends: - return - - # Keep original name working, but add an alias. - if backend_name != "temp": - self._backends["temp"] = backend - self._backend_types["temp"] = store_type - except Exception: - return - def _load_backends(self) -> None: store_cfg = self._config.get("store") if not isinstance(store_cfg, dict): @@ -256,14 +217,6 @@ class Store: backend_name = str(kwargs.get("NAME") or instance_name) self._backends[backend_name] = backend self._backend_types[backend_name] = store_type - - # If this is the configured temp directory, also alias it as 'temp'. - self._maybe_register_temp_alias( - store_type, - backend_name, - kwargs, - backend - ) except Exception as exc: err_text = str(exc) self._backend_errors[str(instance_name)] = err_text @@ -447,7 +400,6 @@ def list_configured_backend_names(config: Optional[Dict[str, Any]]) -> list[str] Behaviour: - For each configured store type, returns the per-instance NAME override (case-insensitive) when present, otherwise the instance key. - - Includes a 'temp' alias when a folder backend points to the configured 'temp' path. 
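The _maybe_register_temp_alias logic being removed in this hunk reduces to a path-equality predicate: a folder backend was aliased as 'temp' only when its resolved PATH matched the resolved config['temp'] and no dedicated 'temp' backend already existed. A sketch of that predicate under stated assumptions; expand_path below is a stand-in for SYS.utils.expand_path, and should_alias_temp is an illustrative name:

from __future__ import annotations
from pathlib import Path

def expand_path(value: str) -> Path:
    # Stand-in for SYS.utils.expand_path; assumed to expand ~ at minimum.
    return Path(value).expanduser()

def should_alias_temp(store_type: str, backend_path: str | None,
                      temp_value: str | None, existing: set[str]) -> bool:
    # Alias only folder backends, only when config['temp'] is set, and never
    # when a dedicated 'temp' backend is already registered.
    if store_type != "folder" or not temp_value or not backend_path:
        return False
    if "temp" in existing:
        return False
    return expand_path(backend_path).resolve() == expand_path(temp_value).resolve()

assert should_alias_temp("folder", "/srv/mm-temp", "/srv/mm-temp", {"default"})
assert not should_alias_temp("folder", "/srv/mm-temp", "/srv/mm-temp", {"temp"})
assert not should_alias_temp("hydrus", "/srv/mm-temp", "/srv/mm-temp", set())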
""" try: store_cfg = (config or {}).get("store") or {} @@ -468,28 +420,6 @@ def list_configured_backend_names(config: Optional[Dict[str, Any]]) -> list[str] else: names.append(str(instance_name)) - # Best-effort: alias 'temp' when a folder backend points at config['temp'] - try: - temp_value = (config or {}).get("temp") - if temp_value: - temp_path = str(expand_path(temp_value).resolve()) - for raw_store_type, instances in store_cfg.items(): - if not isinstance(instances, dict): - continue - if _normalize_store_type(str(raw_store_type)) != "folder": - continue - for instance_name, instance_config in instances.items(): - if not isinstance(instance_config, dict): - continue - path_value = instance_config.get("PATH") or instance_config.get("path") - if not path_value: - continue - if str(expand_path(path_value).resolve()) == temp_path: - if "temp" not in names: - names.append("temp") - except Exception: - pass - return sorted(set(names)) except Exception: return [] diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index da490f4..f38f481 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -283,8 +283,7 @@ class SharedArgs: "location", type="enum", choices=["hydrus", - "0x0", - "local"], + "0x0"], required=True, description="Destination location", ) @@ -292,7 +291,7 @@ class SharedArgs: DELETE = CmdletArg( "delete", type="flag", - description="Delete the file and its .tag after successful operation.", + description="Delete the file after successful operation.", ) # Metadata arguments diff --git a/cmdlet/add_file.py b/cmdlet/add_file.py index 6e64694..9d74b95 100644 --- a/cmdlet/add_file.py +++ b/cmdlet/add_file.py @@ -34,7 +34,6 @@ coerce_to_path = sh.coerce_to_path build_pipeline_preview = sh.build_pipeline_preview get_field = sh.get_field -from API.folder import read_sidecar, find_sidecar, write_sidecar, API_folder_store from SYS.utils import sha256_file, unique_path from SYS.metadata import write_metadata @@ -2401,31 +2400,6 @@ class Add_File(Cmdlet): List[str], List[str]]: """Load sidecar metadata.""" - if store and store.lower() == "local": - try: - from SYS.config import get_local_storage_path - - db_root = get_local_storage_path(config) - if db_root: - with API_folder_store(Path(db_root)) as db: - file_hash = db.get_file_hash(media_path) - if file_hash: - tags = db.get_tags(file_hash) or [] - metadata = db.get_metadata(file_hash) or {} - url = metadata.get("url") or [] - f_hash = metadata.get("hash") or file_hash - if tags or url or f_hash: - return None, f_hash, tags, url - except Exception: - pass - - try: - sidecar_path = find_sidecar(media_path) - if sidecar_path and sidecar_path.exists(): - h, t, u = read_sidecar(sidecar_path) - return sidecar_path, h, t or [], u or [] - except Exception: - pass return None, None, [], [] @staticmethod @@ -2465,26 +2439,7 @@ class Add_File(Cmdlet): duration: Any, media_kind: str, ): - payload = { - "hash": f_hash, - "url": url, - "relationships": relationships or [], - "duration": duration, - "size": None, - "ext": dest_path.suffix.lower(), - "media_type": media_kind, - "media_kind": media_kind, - } - try: - payload["size"] = dest_path.stat().st_size - except OSError: - payload["size"] = None - - with API_folder_store(library_root) as db: - try: - db.save_file_info(dest_path, payload, tags) - except Exception as exc: - log(f"⚠️ Failed to persist metadata: {exc}", file=sys.stderr) + pass @staticmethod def _copy_sidecars(source_path: Path, target_path: Path): diff --git a/cmdlet/add_relationship.py b/cmdlet/add_relationship.py index 
7d6cbea..6d67ed7 100644 --- a/cmdlet/add_relationship.py +++ b/cmdlet/add_relationship.py @@ -20,7 +20,6 @@ parse_cmdlet_args = sh.parse_cmdlet_args normalize_result_input = sh.normalize_result_input should_show_help = sh.should_show_help get_field = sh.get_field -from API.folder import read_sidecar, find_sidecar, API_folder_store from Store import Store CMDLET = Cmdlet( @@ -862,8 +861,7 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: None) # Legacy LOCAL STORAGE MODE: Handle relationships for local files - # (kept for -path sidecar workflows; store/hash mode above is preferred) - from API.folder import LocalLibrarySearchOptimizer + # (kept as stub - folder store removed) from SYS.config import get_local_storage_path local_storage_path = get_local_storage_path(config) if config else None diff --git a/cmdlet/delete_relationship.py b/cmdlet/delete_relationship.py index a88ae0a..13b2db9 100644 --- a/cmdlet/delete_relationship.py +++ b/cmdlet/delete_relationship.py @@ -1,28 +1,13 @@ -"""Delete file relationships.""" +"""Delete file relationships (Currently Disabled).""" -from __future__ import annotations - -from typing import Any, Dict, Optional, Sequence -import json -from pathlib import Path import sys - -from SYS.logger import log +from typing import Any, Dict, List, Optional from SYS import pipeline as ctx from . import _shared as sh -Cmdlet = sh.Cmdlet -CmdletArg = sh.CmdletArg -SharedArgs = sh.SharedArgs -parse_cmdlet_args = sh.parse_cmdlet_args normalize_hash = sh.normalize_hash -normalize_result_input = sh.normalize_result_input get_field = sh.get_field -should_show_help = sh.should_show_help -from API.folder import API_folder_store -from Store import Store -from SYS.config import get_local_storage_path def _extract_hash(item: Any) -> Optional[str]: @@ -33,471 +18,42 @@ def _extract_hash(item: Any) -> Optional[str]: return normalize_hash(str(h)) if h else None -def _upsert_relationships( - db: API_folder_store, - file_hash: str, - relationships: Dict[str, - Any] -) -> None: - conn = db.connection - if conn is None: - raise RuntimeError("Store DB connection is not initialized") - cursor = conn.cursor() - cursor.execute( - """ - INSERT INTO metadata (hash, relationships) - VALUES (?, ?) 
- ON CONFLICT(hash) DO UPDATE SET - relationships = excluded.relationships, - time_modified = CURRENT_TIMESTAMP, - updated_at = CURRENT_TIMESTAMP - """, - (file_hash, - json.dumps(relationships) if relationships else "{}"), - ) - - -def _remove_reverse_link( - db: API_folder_store, - *, - src_hash: str, - dst_hash: str, - rel_type: str -) -> None: - meta = db.get_metadata(dst_hash) or {} - rels = meta.get("relationships") if isinstance(meta, dict) else None - if not isinstance(rels, dict) or not rels: - return - - key_to_edit: Optional[str] = None - for k in list(rels.keys()): - if str(k).lower() == str(rel_type).lower(): - key_to_edit = str(k) - break - if not key_to_edit: - return - - bucket = rels.get(key_to_edit) - if not isinstance(bucket, list) or not bucket: - return - - new_bucket = [h for h in bucket if str(h).lower() != str(src_hash).lower()] - if new_bucket: - rels[key_to_edit] = new_bucket - else: - try: - del rels[key_to_edit] - except Exception: - rels[key_to_edit] = [] - - _upsert_relationships(db, dst_hash, rels) - - -def _refresh_relationship_view_if_current( - target_hash: Optional[str], - target_path: Optional[str], - other: Optional[str], - config: Dict[str, - Any], -) -> None: - """If the current subject matches the target, refresh relationships via get-relationship.""" +def _run(args: List[str], config: Dict[str, Any]) -> int: try: - from cmdlet import get as get_cmdlet # type: ignore - except Exception: - return - - try: - subject = ctx.get_last_result_subject() - if subject is None: - return - - def norm(val: Any) -> str: - return str(val).lower() - - target_hashes = [norm(v) for v in [target_hash, other] if v] - target_paths = [norm(v) for v in [target_path, other] if v] - - subj_hashes: list[str] = [] - subj_paths: list[str] = [] - for field in ("hydrus_hash", "hash", "hash_hex", "file_hash"): - val = get_field(subject, field) - if val: - subj_hashes.append(norm(val)) - for field in ("file_path", "path", "target"): - val = get_field(subject, field) - if val: - subj_paths.append(norm(val)) - - is_match = False - if target_hashes and any(h in subj_hashes for h in target_hashes): - is_match = True - if target_paths and any(p in subj_paths for p in target_paths): - is_match = True - if not is_match: - return - - refresh_args: list[str] = [] - if target_hash: - refresh_args.extend(["-query", f"hash:{target_hash}"]) - - cmd = get_cmdlet("get-relationship") - if not cmd: - return - cmd(subject, refresh_args, config) - except Exception: - pass - - -def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: - """Delete relationships from files. 
- - Args: - result: Input result(s) from previous cmdlet - args: Command arguments - config: CLI configuration - - Returns: - Exit code (0 = success) - """ - try: - if should_show_help(args): - log( - f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}" - ) + parsed, _, _ = sh.parse_cmdlet_args(args) + if sh.should_show_help(parsed): return 0 - # Parse arguments - parsed_args = parse_cmdlet_args(args, CMDLET) - delete_all_flag = parsed_args.get("all", False) - rel_type_filter = parsed_args.get("type") - override_store = parsed_args.get("store") - override_hashes = sh.parse_hash_query(parsed_args.get("query")) - if parsed_args.get("query") and not override_hashes: - log("Invalid -query value (expected hash:)", file=sys.stderr) - return 1 - raw_path = parsed_args.get("path") - - # Normalize input - results = normalize_result_input(result) - - # Allow store/hash-first usage when no pipeline items were provided - if (not results) and override_hashes: - if not override_store: - log( - "-store is required when using -query without piped items", - file=sys.stderr - ) - return 1 - results = [ - { - "hash": h, - "store": str(override_store) - } for h in override_hashes - ] - - if not results: - # Legacy -path mode below may still apply - if raw_path: - results = [{ - "file_path": raw_path - }] - else: - log("No results to process", file=sys.stderr) - return 1 - - # Decide store (for same-store enforcement + folder-store DB routing) - store_name: Optional[str] = str(override_store - ).strip() if override_store else None - if not store_name: - stores = { - str(get_field(r, - "store")) - for r in results if get_field(r, "store") - } - if len(stores) == 1: - store_name = next(iter(stores)) - elif len(stores) > 1: - log( - "Multiple stores detected in pipeline; use -store to choose one", - file=sys.stderr, - ) - return 1 - - deleted_count = 0 - - # STORE/HASH FIRST: folder-store DB deletion (preferred) - if store_name: - backend = None - store_root: Optional[Path] = None - try: - store = Store(config) - backend = store[str(store_name)] - loc = getattr(backend, "location", None) - if callable(loc): - store_root = Path(str(loc())) - except Exception: - backend = None - store_root = None - - if store_root is not None: - try: - with API_folder_store(store_root) as db: - conn = db.connection - if conn is None: - raise RuntimeError("Store DB connection is not initialized") - for single_result in results: - # Enforce same-store when items carry store info - item_store = get_field(single_result, "store") - if item_store and str(item_store) != str(store_name): - log( - f"Cross-store delete blocked: item store '{item_store}' != '{store_name}'", - file=sys.stderr, - ) - return 1 - - file_hash = _extract_hash(single_result) - if not file_hash: - # Try path -> hash lookup within this store - fp = ( - get_field(single_result, - "file_path") - or get_field(single_result, - "path") - or get_field(single_result, - "target") - ) - if fp: - try: - file_hash = db.get_file_hash(Path(str(fp))) - except Exception: - file_hash = None - if not file_hash: - log( - 'Could not extract file hash for deletion (use -query "hash:" or ensure pipeline includes hash)', - file=sys.stderr, - ) - return 1 - - meta = db.get_metadata(file_hash) or {} - rels = meta.get("relationships" - ) if isinstance(meta, - dict) else None - if not isinstance(rels, dict) or not rels: - continue - - if delete_all_flag: - # remove reverse edges for all types - for rt, hashes in list(rels.items()): - if not isinstance(hashes, list): - 
continue - for other_hash in hashes: - other_norm = normalize_hash(str(other_hash)) - if other_norm: - _remove_reverse_link( - db, - src_hash=file_hash, - dst_hash=other_norm, - rel_type=str(rt), - ) - rels = {} - elif rel_type_filter: - # delete one type (case-insensitive key match) - key_to_delete: Optional[str] = None - for k in list(rels.keys()): - if str(k).lower() == str(rel_type_filter).lower(): - key_to_delete = str(k) - break - if not key_to_delete: - continue - hashes = rels.get(key_to_delete) - if isinstance(hashes, list): - for other_hash in hashes: - other_norm = normalize_hash(str(other_hash)) - if other_norm: - _remove_reverse_link( - db, - src_hash=file_hash, - dst_hash=other_norm, - rel_type=str(key_to_delete), - ) - try: - del rels[key_to_delete] - except Exception: - rels[key_to_delete] = [] - else: - log( - "Specify --all to delete all relationships or -type to delete specific type", - file=sys.stderr, - ) - return 1 - - _upsert_relationships(db, file_hash, rels) - conn.commit() - _refresh_relationship_view_if_current( - file_hash, - None, - None, - config - ) - deleted_count += 1 - - log( - f"Successfully deleted relationships from {deleted_count} file(s)", - file=sys.stderr, - ) - return 0 - except Exception as exc: - log(f"Error deleting store relationships: {exc}", file=sys.stderr) - return 1 - - # LEGACY PATH MODE (single local DB) - # Get storage path - local_storage_path = get_local_storage_path(config) - if not local_storage_path: - log("Local storage path not configured", file=sys.stderr) - return 1 - - try: - with API_folder_store(Path(local_storage_path)) as db: - conn = db.connection - if conn is None: - raise RuntimeError("Store DB connection is not initialized") - - for single_result in results: - # Get file path from result - file_path_from_result = ( - get_field(single_result, - "file_path") or get_field(single_result, - "path") - or get_field(single_result, - "target") - or ( - str(single_result) if not isinstance(single_result, - dict) else None - ) - ) - - if not file_path_from_result: - log("Could not extract file path from result", file=sys.stderr) - return 1 - - file_path_obj = Path(str(file_path_from_result)) - - if not file_path_obj.exists(): - log(f"File not found: {file_path_obj}", file=sys.stderr) - return 1 - - try: - file_hash = db.get_file_hash(file_path_obj) - except Exception: - file_hash = None - file_hash = normalize_hash(str(file_hash)) if file_hash else None - if not file_hash: - log( - f"File not in database: {file_path_obj.name}", - file=sys.stderr - ) - continue - - meta = db.get_metadata(file_hash) or {} - rels = meta.get("relationships") if isinstance(meta, dict) else None - if not isinstance(rels, dict) or not rels: - continue - - if delete_all_flag: - for rt, hashes in list(rels.items()): - if not isinstance(hashes, list): - continue - for other_hash in hashes: - other_norm = normalize_hash(str(other_hash)) - if other_norm: - _remove_reverse_link( - db, - src_hash=file_hash, - dst_hash=other_norm, - rel_type=str(rt), - ) - rels = {} - elif rel_type_filter: - key_to_delete: Optional[str] = None - for k in list(rels.keys()): - if str(k).lower() == str(rel_type_filter).lower(): - key_to_delete = str(k) - break - if not key_to_delete: - continue - hashes = rels.get(key_to_delete) - if isinstance(hashes, list): - for other_hash in hashes: - other_norm = normalize_hash(str(other_hash)) - if other_norm: - _remove_reverse_link( - db, - src_hash=file_hash, - dst_hash=other_norm, - rel_type=str(key_to_delete), - ) - try: - del 
rels[key_to_delete] - except Exception: - rels[key_to_delete] = [] - else: - log( - "Specify --all to delete all relationships or -type to delete specific type", - file=sys.stderr, - ) - return 1 - - _upsert_relationships(db, file_hash, rels) - conn.commit() - _refresh_relationship_view_if_current( - file_hash, - str(file_path_obj), - None, - config - ) - deleted_count += 1 - except Exception as exc: - log(f"Error deleting relationship: {exc}", file=sys.stderr) - return 1 - - log( - f"Successfully deleted relationships from {deleted_count} file(s)", - file=sys.stderr - ) - return 0 + # Relationship deletion is currently not implemented for Hydrus. + # Legacy Folder storage has been removed. + sh.log("Relationship deletion is currently only supported via Folder storage, which has been removed.", file=sys.stderr) + sh.log("Hydrus relationship management via Medios-Macina is planned for a future update.", file=sys.stderr) + return 1 except Exception as exc: - log(f"Error in delete-relationship: {exc}", file=sys.stderr) + sh.log(f"Error in delete-relationship: {exc}", file=sys.stderr) return 1 -CMDLET = Cmdlet( +CMDLET = sh.Cmdlet( name="delete-relationship", - summary="Remove relationships from files.", - usage= - "@1 | delete-relationship --all OR delete-relationship -path --all OR @1-3 | delete-relationship -type alt", + summary="Remove relationships from files (Currently Disabled).", + usage="@1 | delete-relationship --all", arg=[ - SharedArgs.PATH, - SharedArgs.STORE, - SharedArgs.QUERY, - CmdletArg( + sh.SharedArgs.PATH, + sh.SharedArgs.STORE, + sh.SharedArgs.QUERY, + sh.CmdletArg( "all", type="flag", description="Delete all relationships for the file(s)." ), - CmdletArg( + sh.CmdletArg( "type", type="string", - description= - "Delete specific relationship type ('alt', 'king', 'related'). Default: delete all types.", + description="Delete specific relationship type.", ), ], - detail=[ - "- Delete all relationships: pipe files | delete-relationship --all", - "- Delete specific type: pipe files | delete-relationship -type alt", - "- Delete all from file: delete-relationship -path --all", - ], ) CMDLET.exec = _run diff --git a/cmdlet/get_tag.py b/cmdlet/get_tag.py index 595065b..4336d37 100644 --- a/cmdlet/get_tag.py +++ b/cmdlet/get_tag.py @@ -25,7 +25,6 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple from SYS import pipeline as ctx -from API.folder import read_sidecar, write_sidecar from . import _shared as sh normalize_hash = sh.normalize_hash @@ -525,157 +524,6 @@ def _apply_result_updates_from_tags(result: Any, tag_list: List[str]) -> None: pass -def _handle_title_rename(old_path: Path, tags_list: List[str]) -> Optional[Path]: - """If a title: tag is present, rename the file and its .tag sidecar to match. - - Returns the new path if renamed, otherwise returns None. 
- """ - # Extract title from tags - new_title = None - for tag in tags_list: - if isinstance(tag, str) and tag.lower().startswith("title:"): - new_title = tag.split(":", 1)[1].strip() - break - - if not new_title or not old_path.exists(): - return None - - try: - # Build new filename with same extension - old_name = old_path.name - old_suffix = old_path.suffix - - # Create new filename: title + extension - new_name = f"{new_title}{old_suffix}" - new_path = old_path.parent / new_name - - # Don't rename if already the same name - if new_path == old_path: - return None - - # Rename the main file - if new_path.exists(): - log(f"Warning: Target filename already exists: {new_name}", file=sys.stderr) - return None - - old_path.rename(new_path) - log(f"Renamed file: {old_name} → {new_name}", file=sys.stderr) - - # Rename the .tag sidecar if it exists - old_tags_path = old_path.parent / (old_name + ".tag") - if old_tags_path.exists(): - new_tags_path = old_path.parent / (new_name + ".tag") - if new_tags_path.exists(): - log( - f"Warning: Target sidecar already exists: {new_tags_path.name}", - file=sys.stderr - ) - else: - old_tags_path.rename(new_tags_path) - log( - f"Renamed sidecar: {old_tags_path.name} → {new_tags_path.name}", - file=sys.stderr - ) - - return new_path - except Exception as exc: - log(f"Warning: Failed to rename file: {exc}", file=sys.stderr) - return None - - -def _read_sidecar_fallback(p: Path) -> tuple[Optional[str], List[str], List[str]]: - """Fallback sidecar reader if metadata module unavailable. - - Format: - - Lines with "hash:" prefix: file hash - - Lines with "url:" or "url:" prefix: url - - Lines with "relationship:" prefix: ignored (internal relationships) - - Lines with "key:", "namespace:value" format: treated as namespace tags - - Plain lines without colons: freeform tags - - Excluded namespaces (treated as metadata, not tags): hash, url, url, relationship - """ - try: - raw = p.read_text(encoding="utf-8", errors="ignore") - except OSError: - return None, [], [] - t: List[str] = [] - u: List[str] = [] - h: Optional[str] = None - - # Namespaces to exclude from tags - excluded_namespaces = {"hash", - "url", - "url", - "relationship"} - - for line in raw.splitlines(): - s = line.strip() - if not s: - continue - low = s.lower() - - # Check if this is a hash line - if low.startswith("hash:"): - h = s.split(":", 1)[1].strip() if ":" in s else h - # Check if this is a URL line - elif low.startswith("url:") or low.startswith("url:"): - val = s.split(":", 1)[1].strip() if ":" in s else "" - if val: - u.append(val) - # Check if this is an excluded namespace - elif ":" in s: - namespace = s.split(":", 1)[0].strip().lower() - if namespace not in excluded_namespaces: - # Include as namespace tag (e.g., "title: The Freemasons") - t.append(s) - else: - # Plain text without colon = freeform tag - t.append(s) - - return h, t, u - - -def _write_sidecar( - p: Path, - media: Path, - tag_list: List[str], - url: List[str], - hash_in_sidecar: Optional[str] -) -> Path: - """Write tags to sidecar file and handle title-based renaming. - - Returns the new media path if renamed, otherwise returns the original media path. 
- """ - success = write_sidecar(media, tag_list, url, hash_in_sidecar) - if success: - _apply_result_updates_from_tags(None, tag_list) - # Check if we should rename the file based on title tag - new_media = _handle_title_rename(media, tag_list) - if new_media: - return new_media - return media - - # Fallback writer - ordered = [s for s in tag_list if s and s.strip()] - lines = [] - if hash_in_sidecar: - lines.append(f"hash:{hash_in_sidecar}") - lines.extend(ordered) - for u in url: - lines.append(f"url:{u}") - try: - p.write_text("\n".join(lines) + "\n", encoding="utf-8") - # Check if we should rename the file based on title tag - new_media = _handle_title_rename(media, tag_list) - if new_media: - return new_media - return media - except OSError as exc: - log(f"Failed to write sidecar: {exc}", file=sys.stderr) - return media - - def _emit_tag_payload( source: str, tags_list: List[str], @@ -1497,17 +1345,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: str) and file_path and not file_path.lower().startswith( ("http://", "https://"))): - try: - media_path = Path(str(file_path)) - if media_path.exists(): - tags_from_sidecar = read_sidecar(media_path) - if isinstance(tags_from_sidecar, list): - identifier_tags = [ - str(t) for t in tags_from_sidecar - if isinstance(t, (str, bytes)) - ] - except Exception: - pass + pass title_from_tags = _extract_tag_value(identifier_tags, "title") artist_from_tags = _extract_tag_value(identifier_tags, "artist") diff --git a/cmdlet/trim_file.py b/cmdlet/trim_file.py index c7df51c..0b0a6ac 100644 --- a/cmdlet/trim_file.py +++ b/cmdlet/trim_file.py @@ -181,27 +181,6 @@ def _persist_alt_relationship( if len(alt_norm) != 64 or len(king_norm) != 64 or alt_norm == king_norm: return - # Folder-backed local DB - try: - if (type(backend).__name__ == "Folder" and hasattr(backend, - "location") - and callable(getattr(backend, - "location"))): - from API.folder import API_folder_store - from pathlib import Path - - root = Path(str(backend.location())).expanduser() - with API_folder_store(root) as db: - db.set_relationship_by_hash( - alt_norm, - king_norm, - "alt", - bidirectional=False - ) - return - except Exception: - pass - # Hydrus-like backend try: client = getattr(backend, "_client", None) @@ -461,28 +440,14 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: store = Store(config) if store.is_available(store_name): backend = store[str(store_name)] - move_flag = type(backend).__name__ == "Folder" stored_hash = backend.add_file( Path(str(output_path)), title=new_title, tag=new_tags, url=urls, - move=move_flag, + move=False, ) stored_store = store_name - - # Best-effort resolve stored path for folder backends. - try: - if type(backend).__name__ == "Folder" and hasattr( - backend, - "get_file"): - p = backend.get_file(str(stored_hash)) - if isinstance(p, Path): - stored_path = str(p) - elif isinstance(p, str) and p: - stored_path = p - except Exception: - stored_path = None except Exception as exc: log( f"Failed to add clip to store '{store_name}': {exc}",