"""Unified local library management system combining database, initialization, migration, and search.
|
|
|
|
This module provides:
|
|
- SQLite database management for local file metadata caching
|
|
- Library scanning and database initialization
|
|
- Sidecar file migration from .tag/.metadata files to database
|
|
- Optimized search functionality using database indices
|
|
- Worker task tracking for background operations
|
|
"""
|
|
|
|
from __future__ import annotations

import sqlite3
import json
import logging
import subprocess
import shutil
from datetime import datetime
from pathlib import Path, PurePosixPath
from typing import Optional, Dict, Any, List, Tuple, Set

from SYS.utils import sha256_file

logger = logging.getLogger(__name__)

WORKER_LOG_MAX_ENTRIES = 99
|
|
|
|
# Try to import optional dependencies
try:
    import mutagen
except ImportError:
    mutagen = None

try:
    from metadata import (
        _read_sidecar_metadata,
        _derive_sidecar_path,
        write_tags,
        write_tags_to_file,
        embed_metadata_in_file,
        read_tags_from_file,
    )

    METADATA_AVAILABLE = True
except ImportError:
    _read_sidecar_metadata = None
    _derive_sidecar_path = None
    write_tags = None
    write_tags_to_file = None
    embed_metadata_in_file = None
    read_tags_from_file = None
    METADATA_AVAILABLE = False
|
|
|
|
# Media extensions to index
|
|
MEDIA_EXTENSIONS = {
    # video
    ".mp4", ".mkv", ".webm", ".avi", ".mov", ".flv", ".wmv", ".m4v",
    # audio
    ".mka", ".mp3", ".flac", ".wav", ".aac", ".ogg", ".m4a", ".wma",
    # images
    ".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".tiff",
    # documents
    ".pdf", ".epub", ".txt", ".docx", ".doc",
}
|
|
|
|
# ============================================================================
# SIDECAR FILE HANDLING
# ============================================================================
|
|
|
|
|
|
def read_sidecar(sidecar_path: Path) -> Tuple[Optional[str], List[str], List[str]]:
|
|
"""Read metadata from a sidecar file.
|
|
|
|
Delegates to metadata._read_sidecar_metadata for centralized handling.
|
|
|
|
Args:
|
|
sidecar_path: Path to .tag sidecar file
|
|
|
|
Returns:
|
|
Tuple of (hash_value, tags_list, url_list)
|
|
Returns (None, [], []) if file doesn't exist or can't be read
|
|
"""
|
|
if _read_sidecar_metadata is None:
|
|
return None, [], []
|
|
|
|
try:
|
|
return _read_sidecar_metadata(sidecar_path)
|
|
except Exception:
|
|
return None, [], []
|
|
|
|
|
|
def write_sidecar(
|
|
media_path: Path,
|
|
tags: List[str],
|
|
url: List[str],
|
|
hash_value: Optional[str] = None
|
|
) -> bool:
|
|
"""Write metadata to a sidecar file.
|
|
|
|
Delegates to metadata.write_tags for centralized handling.
|
|
|
|
Args:
|
|
media_path: Path to the media file (sidecar created as media_path.tag)
|
|
tags: List of tag strings
|
|
url: List of known URL strings
|
|
hash_value: Optional SHA256 hash to include
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
if write_tags is None:
|
|
return False
|
|
|
|
if media_path.exists() and media_path.is_dir():
|
|
return False
|
|
|
|
try:
|
|
write_tags(media_path, tags, url, hash_value)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def find_sidecar(media_path: Path) -> Optional[Path]:
|
|
"""Find the sidecar file for a media path.
|
|
|
|
Uses metadata._derive_sidecar_path for centralized handling.
|
|
|
|
Args:
|
|
media_path: Path to media file
|
|
|
|
Returns:
|
|
Path to existing sidecar file, or None if not found
|
|
"""
|
|
if media_path.is_dir():
|
|
return None
|
|
|
|
if _derive_sidecar_path is None:
|
|
return None
|
|
|
|
try:
|
|
# Check for new format: filename.ext.tag
|
|
sidecar_path = _derive_sidecar_path(media_path)
|
|
if sidecar_path.exists():
|
|
return sidecar_path
|
|
except OSError:
|
|
pass
|
|
|
|
return None
|
|
|
|
|
|
def has_sidecar(media_path: Path) -> bool:
|
|
"""Check if a media file has a sidecar."""
|
|
return find_sidecar(media_path) is not None
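# Example (sketch): round-tripping tags through a .tag sidecar. These helpers
# quietly return False/empty values when the optional `metadata` module is not
# importable; the file name below is hypothetical.
#
#     media = Path("video.mkv")
#     if write_sidecar(media, ["title:Video"], ["https://example.com/source"]):
#         sidecar = find_sidecar(media)
#         if sidecar is not None:
#             hash_value, tags, urls = read_sidecar(sidecar)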
|
|
|
|
|
|
class API_folder_store:
|
|
"""SQLite database for caching local library metadata."""
|
|
|
|
DB_NAME = "medios-macina.db"
|
|
SCHEMA_VERSION = 4
|
|
|
|
def __init__(self, library_root: Path):
|
|
"""Initialize the database at the library root.
|
|
|
|
Args:
|
|
library_root: Path to the local library root directory
|
|
"""
|
|
self.library_root = Path(library_root)
|
|
self.db_path = self.library_root / self.DB_NAME
|
|
self.connection: Optional[sqlite3.Connection] = None
|
|
self._init_db()
|
|
|
|
def _normalize_input_path(self, file_path: Path) -> Path:
|
|
p = Path(file_path).expanduser()
|
|
if not p.is_absolute():
|
|
p = self.library_root / p
|
|
return p
|
|
|
|
def _to_db_file_path(self, file_path: Path) -> str:
|
|
"""Convert an on-disk file path to a DB-stored relative path (POSIX separators)."""
|
|
p = self._normalize_input_path(file_path)
|
|
p_abs = p.resolve()
|
|
root_abs = self.library_root.resolve()
|
|
rel = p_abs.relative_to(root_abs)
|
|
rel_posix = PurePosixPath(*rel.parts).as_posix()
|
|
rel_posix = str(rel_posix or "").strip()
|
|
if not rel_posix or rel_posix == ".":
|
|
raise ValueError(f"Invalid relative path for DB storage: {file_path}")
|
|
return rel_posix
|
|
|
|
def _from_db_file_path(self, db_file_path: str) -> Path:
|
|
"""Convert a DB-stored relative path (POSIX separators) into an absolute path."""
|
|
rel_str = str(db_file_path or "").strip()
|
|
if not rel_str:
|
|
raise ValueError("Missing DB file_path")
|
|
rel_parts = PurePosixPath(rel_str).parts
|
|
return self.library_root / Path(*rel_parts)
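    # Example (sketch): file paths are stored relative to library_root using POSIX
    # separators, so rows round-trip to absolute paths on any platform. The values
    # below are hypothetical.
    #
    #     store = API_folder_store(Path("/data/library"))
    #     store._to_db_file_path(Path("/data/library/music/song.flac"))  # -> "music/song.flac"
    #     store._from_db_file_path("music/song.flac")  # -> Path("/data/library/music/song.flac")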
|
|
|
|
def _init_db(self) -> None:
|
|
"""Initialize database connection and create tables if needed."""
|
|
try:
|
|
# Ensure the library root exists; sqlite cannot create parent dirs.
|
|
try:
|
|
self.library_root.mkdir(parents=True, exist_ok=True)
|
|
except Exception as exc:
|
|
raise RuntimeError(
|
|
f"Cannot create/open library root directory: {self.library_root}: {exc}"
|
|
) from exc
|
|
|
|
            # check_same_thread=False allows this single connection to be used from
            # multiple threads; each thread uses its own cursor, and WAL mode plus a
            # generous timeout keep concurrent access from hitting "database is locked".
|
|
self.connection = sqlite3.connect(
|
|
str(self.db_path),
|
|
check_same_thread=False,
|
|
timeout=60.0
|
|
)
|
|
self.connection.row_factory = sqlite3.Row
|
|
|
|
# Enable Write-Ahead Logging (WAL) for better concurrency
|
|
self.connection.execute("PRAGMA journal_mode=WAL")
|
|
# Enable foreign keys
|
|
self.connection.execute("PRAGMA foreign_keys = ON")
|
|
|
|
self._create_tables()
|
|
logger.info(f"Database initialized at {self.db_path}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize database: {e}", exc_info=True)
|
|
if self.connection:
|
|
try:
|
|
self.connection.close()
|
|
except Exception:
|
|
pass
|
|
self.connection = None
|
|
raise
|
|
|
|
def _create_tables(self) -> None:
|
|
"""Create database tables if they don't exist."""
|
|
cursor = self.connection.cursor()
|
|
|
|
cursor.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS files (
|
|
hash TEXT PRIMARY KEY NOT NULL,
|
|
file_path TEXT UNIQUE NOT NULL,
|
|
file_modified REAL,
|
|
indexed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""
|
|
)
|
|
|
|
cursor.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS metadata (
|
|
hash TEXT PRIMARY KEY NOT NULL,
|
|
url TEXT,
|
|
relationships TEXT,
|
|
duration REAL,
|
|
size INTEGER,
|
|
ext TEXT,
|
|
type TEXT,
|
|
time_imported TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
time_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (hash) REFERENCES files(hash) ON DELETE CASCADE
|
|
)
|
|
"""
|
|
)
|
|
|
|
cursor.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS tags (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
hash TEXT NOT NULL,
|
|
tag TEXT NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (hash) REFERENCES files(hash) ON DELETE CASCADE,
|
|
UNIQUE(hash, tag)
|
|
)
|
|
"""
|
|
)
|
|
|
|
cursor.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS notes (
|
|
hash TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
note TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (hash) REFERENCES files(hash) ON DELETE CASCADE,
|
|
PRIMARY KEY (hash, name)
|
|
)
|
|
"""
|
|
)
|
|
|
|
cursor.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS playlists (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT UNIQUE NOT NULL,
|
|
items TEXT NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""
|
|
)
|
|
|
|
# Worker tracking tables (drop legacy workers table if still present)
|
|
self._ensure_worker_tables(cursor)
|
|
|
|
# Create indices for performance
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_files_path ON files(file_path)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_tags_hash ON tags(hash)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_tags_tag ON tags(tag)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata_ext ON metadata(ext)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_worker_id ON worker(worker_id)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_worker_status ON worker(status)")
|
|
cursor.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_worker_type ON worker(worker_type)"
|
|
)
|
|
|
|
self._migrate_metadata_schema(cursor)
|
|
self._migrate_notes_schema(cursor)
|
|
|
|
# Notes indices (after migration so columns exist)
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_notes_hash ON notes(hash)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_notes_name ON notes(name)")
|
|
self.connection.commit()
|
|
logger.debug("Database tables created/verified")
|
|
|
|
def _ensure_worker_tables(self, cursor) -> None:
|
|
"""Ensure the modern worker tables exist, dropping legacy ones if needed."""
|
|
cursor.execute(
|
|
"SELECT name FROM sqlite_master WHERE type='table' AND name='worker'"
|
|
)
|
|
has_worker = cursor.fetchone() is not None
|
|
if not has_worker:
|
|
cursor.execute("DROP TABLE IF EXISTS workers")
|
|
cursor.execute(
|
|
"""
|
|
CREATE TABLE worker (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
worker_id TEXT UNIQUE NOT NULL,
|
|
worker_type TEXT NOT NULL,
|
|
pipe TEXT,
|
|
status TEXT DEFAULT 'running',
|
|
title TEXT,
|
|
description TEXT,
|
|
progress REAL DEFAULT 0.0,
|
|
current_step TEXT,
|
|
total_steps INTEGER DEFAULT 0,
|
|
error_message TEXT,
|
|
result_data TEXT,
|
|
stdout TEXT DEFAULT '',
|
|
steps TEXT DEFAULT '',
|
|
started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
completed_at TIMESTAMP,
|
|
last_stdout_at TIMESTAMP,
|
|
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
"""
|
|
)
|
|
else:
|
|
self._ensure_worker_columns(cursor)
|
|
|
|
cursor.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS worker_log (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
worker_id TEXT NOT NULL,
|
|
event_type TEXT NOT NULL,
|
|
step TEXT,
|
|
channel TEXT,
|
|
message TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY(worker_id) REFERENCES worker(worker_id) ON DELETE CASCADE
|
|
)
|
|
"""
|
|
)
|
|
cursor.execute(
|
|
"CREATE INDEX IF NOT EXISTS idx_worker_log_worker_id ON worker_log(worker_id)"
|
|
)
|
|
|
|
def _ensure_worker_columns(self, cursor) -> None:
|
|
"""Backfill columns for older worker tables during upgrade."""
|
|
try:
|
|
cursor.execute("PRAGMA table_info(worker)")
|
|
            existing_columns = {row[1] for row in cursor.fetchall()}
|
|
except Exception as exc:
|
|
logger.error(f"Error introspecting worker table: {exc}")
|
|
return
|
|
column_specs = {
|
|
"pipe": "TEXT",
|
|
"progress": "REAL DEFAULT 0.0",
|
|
"current_step": "TEXT",
|
|
"total_steps": "INTEGER DEFAULT 0",
|
|
"stdout": "TEXT DEFAULT ''",
|
|
"steps": "TEXT DEFAULT ''",
|
|
"last_stdout_at": "TIMESTAMP",
|
|
}
|
|
for col_name, ddl in column_specs.items():
|
|
if col_name not in existing_columns:
|
|
try:
|
|
cursor.execute(f"ALTER TABLE worker ADD COLUMN {col_name} {ddl}")
|
|
logger.info(f"Added '{col_name}' column to worker table")
|
|
except Exception as exc:
|
|
logger.warning(
|
|
f"Could not add '{col_name}' column to worker table: {exc}"
|
|
)
|
|
|
|
def _insert_worker_log_entry(
|
|
self,
|
|
cursor,
|
|
worker_id: str,
|
|
event_type: str,
|
|
message: str,
|
|
step: Optional[str] = None,
|
|
channel: Optional[str] = None,
|
|
) -> None:
|
|
if not message:
|
|
return
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO worker_log (worker_id, event_type, step, channel, message)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""",
|
|
            (worker_id, event_type, step, channel, message),
|
|
)
|
|
self._prune_worker_log_entries(cursor, worker_id)
|
|
|
|
def _prune_worker_log_entries(self, cursor, worker_id: str) -> None:
|
|
"""Keep at most WORKER_LOG_MAX_ENTRIES rows per worker by trimming oldest ones."""
|
|
if WORKER_LOG_MAX_ENTRIES <= 0:
|
|
return
|
|
cursor.execute(
|
|
"""
|
|
SELECT id FROM worker_log
|
|
WHERE worker_id = ?
|
|
ORDER BY id DESC
|
|
LIMIT 1 OFFSET ?
|
|
""",
|
|
            (worker_id, WORKER_LOG_MAX_ENTRIES - 1),
|
|
)
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
return
|
|
cutoff_id = row[0]
|
|
cursor.execute(
|
|
"DELETE FROM worker_log WHERE worker_id = ? AND id < ?",
|
|
            (worker_id, cutoff_id),
|
|
)
|
|
|
|
    def get_worker_events(self, worker_id: str, limit: int = 500) -> List[Dict[str, Any]]:
|
|
"""Return chronological worker log events for timelines."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
cursor.execute(
|
|
"""
|
|
SELECT id, event_type, step, channel, message, created_at
|
|
FROM worker_log
|
|
WHERE worker_id = ?
|
|
ORDER BY id ASC
|
|
LIMIT ?
|
|
""",
|
|
                (worker_id, limit),
|
|
)
|
|
return [dict(row) for row in cursor.fetchall()]
|
|
except Exception as exc:
|
|
logger.error(
|
|
f"Error retrieving worker events for {worker_id}: {exc}",
|
|
exc_info=True
|
|
)
|
|
return []
|
|
|
|
def clear_worker_events(
|
|
self,
|
|
worker_id: str,
|
|
event_type: Optional[str] = None
|
|
) -> None:
|
|
"""Remove worker log entries, optionally filtered by event type."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
if event_type:
|
|
cursor.execute(
|
|
"DELETE FROM worker_log WHERE worker_id = ? AND event_type = ?",
|
|
                    (worker_id, event_type),
|
|
)
|
|
else:
|
|
                cursor.execute("DELETE FROM worker_log WHERE worker_id = ?", (worker_id,))
|
|
self.connection.commit()
|
|
except Exception as exc:
|
|
logger.error(
|
|
f"Error clearing worker log for {worker_id}: {exc}",
|
|
exc_info=True
|
|
)
|
|
|
|
def _migrate_metadata_schema(self, cursor) -> None:
|
|
"""Ensure metadata schema is up-to-date.
|
|
|
|
- If a legacy schema is detected, attempt to import/upgrade (best-effort).
|
|
- If the hash-based schema exists, add any missing columns expected by current code.
|
|
"""
|
|
try:
|
|
# Check if this is a fresh new database (hash-based schema)
|
|
cursor.execute("PRAGMA table_info(metadata)")
|
|
            existing_columns = {row[1] for row in cursor.fetchall()}
|
|
|
|
# Legacy migration: If old schema exists, try to import data.
|
|
# Old schema would have had: id (INTEGER PRIMARY KEY), file_hash (TEXT), etc.
|
|
if "hash" not in existing_columns:
|
|
if "id" in existing_columns and "file_hash" in existing_columns:
|
|
logger.info(
|
|
"Detected legacy metadata schema - importing to new hash-based schema"
|
|
)
|
|
# This would be complex legacy migration - for now just note it.
|
|
logger.info(
|
|
"Legacy metadata table detected but import not yet implemented"
|
|
)
|
|
return
|
|
|
|
# Unknown/unsupported schema; nothing we can safely do here.
|
|
return
|
|
|
|
# Hash-based schema exists: add any missing columns expected by current code.
|
|
# These are safe ALTER TABLE additions for older DBs.
|
|
column_specs = {
|
|
"size": "INTEGER",
|
|
"ext": "TEXT",
|
|
"type": "TEXT",
|
|
"url": "TEXT",
|
|
"relationships": "TEXT",
|
|
"duration": "REAL",
|
|
"time_imported": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
|
|
"time_modified": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
|
|
"created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
|
|
"updated_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
|
|
}
|
|
|
|
for col_name, col_def in column_specs.items():
|
|
if col_name not in existing_columns:
|
|
try:
|
|
cursor.execute(
|
|
f"ALTER TABLE metadata ADD COLUMN {col_name} {col_def}"
|
|
)
|
|
existing_columns.add(col_name)
|
|
logger.info(f"Added '{col_name}' column to metadata table")
|
|
except Exception as e:
|
|
logger.debug(f"Column '{col_name}' may already exist: {e}")
|
|
|
|
# Populate type column from ext if not already populated.
|
|
if "type" in existing_columns and "ext" in existing_columns:
|
|
try:
|
|
from SYS.utils_constant import get_type_from_ext
|
|
|
|
cursor.execute(
|
|
"SELECT hash, ext FROM metadata WHERE type IS NULL OR type = ''"
|
|
)
|
|
rows = cursor.fetchall()
|
|
for file_hash, ext in rows:
|
|
file_type = get_type_from_ext(ext or "")
|
|
cursor.execute(
|
|
"UPDATE metadata SET type = ? WHERE hash = ?",
|
|
                            (file_type, file_hash)
|
|
)
|
|
if rows:
|
|
logger.info(
|
|
f"Populated type column for {len(rows)} metadata entries"
|
|
)
|
|
except Exception as e:
|
|
logger.debug(f"Could not populate type column: {e}")
|
|
|
|
self.connection.commit()
|
|
except Exception as e:
|
|
logger.debug(f"Note: Schema import/migration completed with status: {e}")
|
|
|
|
def _migrate_notes_schema(self, cursor) -> None:
|
|
"""Migrate legacy notes schema (hash PRIMARY KEY, note) to named notes (hash,name PRIMARY KEY)."""
|
|
try:
|
|
cursor.execute("PRAGMA table_info(notes)")
|
|
cols = [row[1] for row in cursor.fetchall()]
|
|
if not cols:
|
|
return
|
|
if "name" in cols:
|
|
return
|
|
|
|
logger.info("Migrating legacy notes table to named notes schema")
|
|
cursor.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS notes_new (
|
|
hash TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
note TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (hash) REFERENCES files(hash) ON DELETE CASCADE,
|
|
PRIMARY KEY (hash, name)
|
|
)
|
|
"""
|
|
)
|
|
|
|
# Copy existing notes into the default key
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO notes_new (hash, name, note, created_at, updated_at)
|
|
SELECT hash, 'default', note, created_at, updated_at
|
|
FROM notes
|
|
"""
|
|
)
|
|
|
|
cursor.execute("DROP TABLE notes")
|
|
cursor.execute("ALTER TABLE notes_new RENAME TO notes")
|
|
self.connection.commit()
|
|
except Exception as exc:
|
|
logger.debug(f"Notes schema migration skipped/failed: {exc}")
|
|
|
|
def _update_metadata_modified_time(self, file_hash: str) -> None:
|
|
"""Update the time_modified timestamp for a file's metadata."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
cursor.execute(
|
|
"""
|
|
UPDATE metadata SET time_modified = CURRENT_TIMESTAMP WHERE hash = ?
|
|
""",
|
|
                (file_hash,),
|
|
)
|
|
self.connection.commit()
|
|
except Exception as e:
|
|
logger.debug(
|
|
f"Could not update metadata modified time for hash {file_hash}: {e}"
|
|
)
|
|
|
|
def get_or_create_file_entry(
|
|
self,
|
|
file_path: Path,
|
|
file_hash: Optional[str] = None
|
|
) -> str:
|
|
"""Get or create a file entry in the database and return the hash.
|
|
|
|
Args:
|
|
file_path: Path to the file
|
|
file_hash: Optional hash (will be computed if not provided)
|
|
|
|
Returns:
|
|
The file hash (primary key)
|
|
"""
|
|
try:
|
|
abs_path = self._normalize_input_path(file_path)
|
|
db_path = self._to_db_file_path(abs_path)
|
|
logger.debug(f"[get_or_create_file_entry] Looking up: {db_path}")
|
|
|
|
# If hash not provided, compute it
|
|
if not file_hash:
|
|
file_hash = sha256_file(abs_path)
|
|
logger.debug(f"[get_or_create_file_entry] Computed hash: {file_hash}")
|
|
|
|
cursor = self.connection.cursor()
|
|
|
|
# Prefer existing entry by path (file_path is UNIQUE in schema).
|
|
cursor.execute("SELECT hash FROM files WHERE file_path = ?",
|
|
(db_path,
|
|
))
|
|
row = cursor.fetchone()
|
|
if row and row[0]:
|
|
existing_hash = str(row[0])
|
|
if existing_hash != file_hash:
|
|
logger.debug(
|
|
f"[get_or_create_file_entry] Found existing file_path with different hash: path={db_path} existing={existing_hash} computed={file_hash}"
|
|
)
|
|
else:
|
|
logger.debug(
|
|
f"[get_or_create_file_entry] Found existing file_path: {db_path} -> {existing_hash}"
|
|
)
|
|
return existing_hash
|
|
|
|
# Check if file entry exists
|
|
cursor.execute("SELECT hash FROM files WHERE hash = ?",
|
|
(file_hash,
|
|
))
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
logger.debug(
|
|
f"[get_or_create_file_entry] Found existing file hash: {file_hash}"
|
|
)
|
|
return file_hash
|
|
|
|
            logger.debug("[get_or_create_file_entry] File entry not found, creating new one")
|
|
stat = abs_path.stat()
|
|
try:
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO files (hash, file_path, file_modified)
|
|
VALUES (?, ?, ?)
|
|
""",
|
|
                    (file_hash, db_path, stat.st_mtime),
|
|
)
|
|
except sqlite3.IntegrityError:
|
|
# Most likely: UNIQUE constraint on file_path. Re-fetch and return.
|
|
                cursor.execute("SELECT hash FROM files WHERE file_path = ?", (db_path,))
|
|
row2 = cursor.fetchone()
|
|
if row2 and row2[0]:
|
|
existing_hash = str(row2[0])
|
|
logger.debug(
|
|
f"[get_or_create_file_entry] Recovered from UNIQUE(file_path): {db_path} -> {existing_hash}"
|
|
)
|
|
return existing_hash
|
|
raise
|
|
|
|
logger.debug(
|
|
f"[get_or_create_file_entry] Created new file entry for hash: {file_hash}"
|
|
)
|
|
|
|
# Auto-create title tag
|
|
filename_without_ext = abs_path.stem
|
|
if filename_without_ext:
|
|
# Normalize underscores to spaces for consistency
|
|
title_value = filename_without_ext.replace("_", " ").strip()
|
|
title_tag = f"title:{title_value}"
|
|
cursor.execute(
|
|
"""
|
|
INSERT OR IGNORE INTO tags (hash, tag)
|
|
VALUES (?, ?)
|
|
""",
|
|
                    (file_hash, title_tag),
|
|
)
|
|
logger.debug(
|
|
f"[get_or_create_file_entry] Auto-created title tag for hash {file_hash}"
|
|
)
|
|
|
|
self.connection.commit()
|
|
logger.debug(f"[get_or_create_file_entry] Committed file entry {file_hash}")
|
|
return file_hash
|
|
except Exception as e:
|
|
logger.error(
|
|
f"[get_or_create_file_entry] ❌ Error getting/creating file entry for {file_path}: {e}",
|
|
exc_info=True,
|
|
)
|
|
raise
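    # Example (sketch): lookups prefer the stored relative path, so re-registering an
    # unchanged file returns the original hash instead of creating a duplicate row.
    # The `store` instance and path below are hypothetical.
    #
    #     h1 = store.get_or_create_file_entry(Path("music/song.flac"))
    #     h2 = store.get_or_create_file_entry(Path("music/song.flac"))
    #     assert h1 == h2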
|
|
|
|
def get_file_hash(self, file_path: Path) -> Optional[str]:
|
|
"""Get the file hash for a file path, or None if not found."""
|
|
try:
|
|
abs_path = self._normalize_input_path(file_path)
|
|
str_path = self._to_db_file_path(abs_path)
|
|
cursor = self.connection.cursor()
|
|
cursor.execute("SELECT hash FROM files WHERE file_path = ?",
|
|
(str_path,
|
|
))
|
|
row = cursor.fetchone()
|
|
return row[0] if row else None
|
|
except Exception as e:
|
|
logger.error(f"Error getting file hash for {file_path}: {e}", exc_info=True)
|
|
return None
|
|
|
|
def get_metadata(self, file_hash: str) -> Optional[Dict[str, Any]]:
|
|
"""Get metadata for a file by hash."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
|
|
cursor.execute(
|
|
"""
|
|
SELECT m.* FROM metadata m
|
|
WHERE m.hash = ?
|
|
""",
|
|
                (file_hash,),
|
|
)
|
|
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
return None
|
|
|
|
metadata = dict(row)
|
|
|
|
# Parse JSON fields
|
|
for field in ["url", "relationships"]:
|
|
if metadata.get(field):
|
|
try:
|
|
metadata[field] = json.loads(metadata[field])
|
|
except (json.JSONDecodeError, TypeError):
|
|
metadata[field] = [] if field == "url" else {}
|
|
|
|
# Ensure relationships is always a dict
|
|
if metadata.get("relationships") is None:
|
|
metadata["relationships"] = {}
|
|
if not isinstance(metadata.get("relationships"), dict):
|
|
metadata["relationships"] = {}
|
|
|
|
return metadata
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error getting metadata for hash {file_hash}: {e}",
|
|
exc_info=True
|
|
)
|
|
return None
|
|
|
|
def set_relationship_by_hash(
|
|
self,
|
|
file_hash: str,
|
|
related_file_hash: str,
|
|
rel_type: str = "alt",
|
|
*,
|
|
bidirectional: bool = True,
|
|
) -> None:
|
|
"""Set a relationship between two files by hash.
|
|
|
|
This is the store/hash-first API. It avoids any dependency on local filesystem
|
|
paths and only requires that both hashes exist in the DB.
|
|
"""
|
|
try:
|
|
file_hash = str(file_hash or "").strip().lower()
|
|
related_file_hash = str(related_file_hash or "").strip().lower()
|
|
rel_type = str(rel_type or "alt").strip() or "alt"
|
|
|
|
if not file_hash or not related_file_hash:
|
|
raise ValueError("Missing file hash for relationship")
|
|
if file_hash == related_file_hash:
|
|
return
|
|
|
|
cursor = self.connection.cursor()
|
|
|
|
# Ensure both hashes exist in files table (metadata has FK to files)
|
|
cursor.execute("SELECT 1 FROM files WHERE hash = ?",
|
|
(file_hash,
|
|
))
|
|
if not cursor.fetchone():
|
|
raise ValueError(f"Hash not found in store DB: {file_hash}")
|
|
cursor.execute("SELECT 1 FROM files WHERE hash = ?",
|
|
(related_file_hash,
|
|
))
|
|
if not cursor.fetchone():
|
|
raise ValueError(f"Hash not found in store DB: {related_file_hash}")
|
|
|
|
# Load current relationships for the main file
|
|
            cursor.execute("SELECT relationships FROM metadata WHERE hash = ?", (file_hash,))
|
|
row = cursor.fetchone()
|
|
relationships_str = row[0] if row else None
|
|
|
|
try:
|
|
                relationships = json.loads(relationships_str) if relationships_str else {}
|
|
except (json.JSONDecodeError, TypeError):
|
|
relationships = {}
|
|
if not isinstance(relationships, dict):
|
|
relationships = {}
|
|
|
|
relationships.setdefault(rel_type, [])
|
|
if not isinstance(relationships[rel_type], list):
|
|
relationships[rel_type] = []
|
|
if related_file_hash not in relationships[rel_type]:
|
|
relationships[rel_type].append(related_file_hash)
|
|
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO metadata (hash, relationships)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(hash) DO UPDATE SET
|
|
relationships = excluded.relationships,
|
|
time_modified = CURRENT_TIMESTAMP,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
""",
|
|
                (file_hash, json.dumps(relationships)),
|
|
)
|
|
|
|
if bidirectional:
|
|
# Update the related file as well
|
|
                cursor.execute("SELECT relationships FROM metadata WHERE hash = ?", (related_file_hash,))
|
|
row2 = cursor.fetchone()
|
|
relationships_str2 = row2[0] if row2 else None
|
|
try:
|
|
reverse_relationships = (
|
|
json.loads(relationships_str2) if relationships_str2 else {}
|
|
)
|
|
except (json.JSONDecodeError, TypeError):
|
|
reverse_relationships = {}
|
|
if not isinstance(reverse_relationships, dict):
|
|
reverse_relationships = {}
|
|
|
|
reverse_relationships.setdefault(rel_type, [])
|
|
if not isinstance(reverse_relationships[rel_type], list):
|
|
reverse_relationships[rel_type] = []
|
|
if file_hash not in reverse_relationships[rel_type]:
|
|
reverse_relationships[rel_type].append(file_hash)
|
|
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO metadata (hash, relationships)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(hash) DO UPDATE SET
|
|
relationships = excluded.relationships,
|
|
time_modified = CURRENT_TIMESTAMP,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
""",
|
|
                    (related_file_hash, json.dumps(reverse_relationships)),
|
|
)
|
|
|
|
self.connection.commit()
|
|
except Exception as e:
|
|
logger.error(f"Error setting relationship by hash: {e}", exc_info=True)
|
|
raise
|
|
|
|
def find_files_pointing_to_hash(self, target_hash: str) -> List[Dict[str, Any]]:
|
|
"""Find all files that have a relationship pointing to the target hash."""
|
|
try:
|
|
target_hash = str(target_hash or "").strip().lower()
|
|
if not target_hash:
|
|
return []
|
|
|
|
cursor = self.connection.cursor()
|
|
|
|
cursor.execute(
|
|
"""
|
|
SELECT f.hash, f.file_path, m.relationships
|
|
FROM metadata m
|
|
JOIN files f ON m.hash = f.hash
|
|
WHERE m.relationships LIKE ?
|
|
""",
|
|
(f"%{target_hash}%",
|
|
),
|
|
)
|
|
|
|
results: List[Dict[str, Any]] = []
|
|
for row in cursor.fetchall():
|
|
src_hash = row[0]
|
|
src_path = row[1]
|
|
rels_json = row[2]
|
|
try:
|
|
rels = json.loads(rels_json) if rels_json else {}
|
|
except (json.JSONDecodeError, TypeError):
|
|
continue
|
|
if not isinstance(rels, dict):
|
|
continue
|
|
for r_type, hashes in rels.items():
|
|
if not isinstance(hashes, list):
|
|
continue
|
|
if target_hash in [str(h or "").strip().lower() for h in hashes]:
|
|
results.append(
|
|
{
|
|
"hash": src_hash,
|
|
"path": src_path,
|
|
"type": r_type,
|
|
}
|
|
)
|
|
return results
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error finding files pointing to hash {target_hash}: {e}",
|
|
exc_info=True
|
|
)
|
|
return []
|
|
|
|
def save_metadata(self, file_path: Path, metadata: Dict[str, Any]) -> None:
|
|
"""Save metadata for a file."""
|
|
try:
|
|
abs_path = self._normalize_input_path(file_path)
|
|
db_path = self._to_db_file_path(abs_path)
|
|
logger.debug(f"[save_metadata] Starting save for: {db_path}")
|
|
|
|
file_hash = self.get_or_create_file_entry(abs_path, metadata.get("hash"))
|
|
logger.debug(f"[save_metadata] Got/created file_hash: {file_hash}")
|
|
|
|
cursor = self.connection.cursor()
|
|
|
|
url = metadata.get("url", [])
|
|
if not isinstance(url, str):
|
|
url = json.dumps(url)
|
|
|
|
relationships = metadata.get("relationships", [])
|
|
if not isinstance(relationships, str):
|
|
relationships = json.dumps(relationships)
|
|
|
|
# Determine type from ext if not provided
|
|
file_type = metadata.get("type")
|
|
ext = metadata.get("ext")
|
|
if not file_type and ext:
|
|
from SYS.utils_constant import get_type_from_ext
|
|
|
|
file_type = get_type_from_ext(str(ext))
|
|
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO metadata (
|
|
hash, url, relationships,
|
|
duration, size, ext, type,
|
|
time_imported, time_modified
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
ON CONFLICT(hash) DO UPDATE SET
|
|
url = excluded.url,
|
|
relationships = excluded.relationships,
|
|
duration = excluded.duration,
|
|
size = excluded.size,
|
|
ext = excluded.ext,
|
|
type = excluded.type,
|
|
time_modified = CURRENT_TIMESTAMP,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
""",
|
|
(
|
|
file_hash,
|
|
url,
|
|
relationships,
|
|
metadata.get("duration"),
|
|
metadata.get("size"),
|
|
ext,
|
|
file_type,
|
|
),
|
|
)
|
|
|
|
self.connection.commit()
|
|
logger.debug(f"[save_metadata] Committed metadata for hash {file_hash}")
|
|
except Exception as e:
|
|
logger.error(
|
|
f"[save_metadata] ❌ Error saving metadata for {file_path}: {e}",
|
|
exc_info=True
|
|
)
|
|
raise
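    # Example (sketch): the metadata dict uses plain Python values; list/dict fields
    # such as "url" and "relationships" are JSON-encoded before storage, and keys not
    # named in the INSERT are ignored. The values below are hypothetical.
    #
    #     store.save_metadata(Path("music/song.flac"), {
    #         "url": ["https://example.com/a"],
    #         "size": 4_194_304,
    #         "ext": ".flac",
    #     })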
|
|
|
|
def save_file_info(
|
|
self,
|
|
file_path: Path,
|
|
        metadata: Dict[str, Any],
|
|
tags: List[str]
|
|
) -> None:
|
|
"""Save metadata and tags for a file in a single transaction."""
|
|
try:
|
|
abs_path = self._normalize_input_path(file_path)
|
|
db_path = self._to_db_file_path(abs_path)
|
|
logger.debug(f"[save_file_info] Starting save for: {db_path}")
|
|
|
|
file_hash = self.get_or_create_file_entry(abs_path, metadata.get("hash"))
|
|
|
|
cursor = self.connection.cursor()
|
|
|
|
# 1. Save Metadata
|
|
url = metadata.get("url", [])
|
|
if not isinstance(url, str):
|
|
url = json.dumps(url)
|
|
|
|
relationships = metadata.get("relationships", [])
|
|
if not isinstance(relationships, str):
|
|
relationships = json.dumps(relationships)
|
|
|
|
# Determine type from ext if not provided
|
|
file_type = metadata.get("type")
|
|
ext = metadata.get("ext")
|
|
if not file_type and ext:
|
|
from SYS.utils_constant import get_type_from_ext
|
|
|
|
file_type = get_type_from_ext(str(ext))
|
|
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO metadata (
|
|
hash, url, relationships,
|
|
duration, size, ext, type,
|
|
time_imported, time_modified
|
|
)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
|
|
ON CONFLICT(hash) DO UPDATE SET
|
|
url = excluded.url,
|
|
relationships = excluded.relationships,
|
|
duration = excluded.duration,
|
|
size = excluded.size,
|
|
ext = excluded.ext,
|
|
type = excluded.type,
|
|
time_modified = CURRENT_TIMESTAMP,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
""",
|
|
(
|
|
file_hash,
|
|
url,
|
|
relationships,
|
|
metadata.get("duration"),
|
|
metadata.get("size"),
|
|
ext,
|
|
file_type,
|
|
),
|
|
)
|
|
|
|
# 2. Save Tags
|
|
# We assume tags list is complete and includes title if needed
|
|
cursor.execute("DELETE FROM tags WHERE hash = ?",
|
|
(file_hash,
|
|
))
|
|
|
|
for tag in tags:
|
|
tag = tag.strip()
|
|
if tag:
|
|
cursor.execute(
|
|
"""
|
|
INSERT OR IGNORE INTO tags (hash, tag)
|
|
VALUES (?, ?)
|
|
""",
|
|
                        (file_hash, tag),
|
|
)
|
|
|
|
self.connection.commit()
|
|
logger.debug(
|
|
f"[save_file_info] Committed metadata and tags for hash {file_hash}"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"[save_file_info] ❌ Error saving file info for {file_path}: {e}",
|
|
exc_info=True
|
|
)
|
|
raise
|
|
|
|
def get_tags(self, file_hash: str) -> List[str]:
|
|
"""Get all tags for a file by hash."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
|
|
cursor.execute(
|
|
"""
|
|
SELECT t.tag FROM tags t
|
|
WHERE t.hash = ?
|
|
ORDER BY t.tag
|
|
""",
|
|
                (file_hash,),
|
|
)
|
|
|
|
return [row[0] for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logger.error(f"Error getting tags for hash {file_hash}: {e}", exc_info=True)
|
|
return []
|
|
|
|
def save_tags(self, file_path: Path, tags: List[str]) -> None:
|
|
"""Save tags for a file, replacing all existing tags."""
|
|
try:
|
|
abs_path = self._normalize_input_path(file_path)
|
|
db_path = self._to_db_file_path(abs_path)
|
|
logger.debug(f"[save_tags] Starting save for: {db_path}")
|
|
|
|
file_hash = self.get_or_create_file_entry(abs_path)
|
|
logger.debug(f"[save_tags] Got/created file_hash: {file_hash}")
|
|
|
|
cursor = self.connection.cursor()
|
|
|
|
cursor.execute(
|
|
"""
|
|
SELECT tag FROM tags WHERE hash = ? AND tag LIKE 'title:%'
|
|
""",
|
|
                (file_hash,),
|
|
)
|
|
existing_title = cursor.fetchone()
|
|
|
|
cursor.execute("DELETE FROM tags WHERE hash = ?",
|
|
(file_hash,
|
|
))
|
|
logger.debug(f"[save_tags] Deleted existing tags for hash {file_hash}")
|
|
|
|
# Check if new tags provide a title
|
|
new_title_provided = any(
|
|
str(t).strip().lower().startswith("title:") for t in tags
|
|
)
|
|
|
|
if existing_title and not new_title_provided:
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO tags (hash, tag) VALUES (?, ?)
|
|
""",
|
|
                    (file_hash, existing_title[0]),
|
|
)
|
|
logger.debug(f"[save_tags] Preserved existing title tag")
|
|
elif not existing_title and not new_title_provided:
|
|
filename_without_ext = abs_path.stem
|
|
if filename_without_ext:
|
|
# Normalize underscores to spaces for consistency
|
|
title_value = filename_without_ext.replace("_", " ").strip()
|
|
title_tag = f"title:{title_value}"
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO tags (hash, tag) VALUES (?, ?)
|
|
""",
|
|
                        (file_hash, title_tag),
|
|
)
|
|
logger.debug(f"[save_tags] Created auto-title tag: {title_tag}")
|
|
|
|
for tag in tags:
|
|
tag = tag.strip()
|
|
if tag:
|
|
cursor.execute(
|
|
"""
|
|
INSERT OR IGNORE INTO tags (hash, tag)
|
|
VALUES (?, ?)
|
|
""",
|
|
                        (file_hash, tag),
|
|
)
|
|
|
|
self.connection.commit()
|
|
logger.debug(f"[save_tags] Committed {len(tags)} tags for hash {file_hash}")
|
|
|
|
# Verify they were actually saved
|
|
cursor.execute("SELECT COUNT(*) FROM tags WHERE hash = ?",
|
|
(file_hash,
|
|
))
|
|
saved_count = cursor.fetchone()[0]
|
|
logger.debug(
|
|
f"[save_tags] Verified: {saved_count} tags in database for hash {file_hash}"
|
|
)
|
|
|
|
self._update_metadata_modified_time(file_hash)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"[save_tags] ❌ Error saving tags for {file_path}: {e}",
|
|
exc_info=True
|
|
)
|
|
raise
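    # Example (sketch): save_tags replaces the tag set but keeps an existing title: tag
    # unless the caller supplies one, and derives a title from the file name for brand
    # new entries. The file name below is hypothetical.
    #
    #     store.save_tags(Path("clips/My_Clip.mp4"), ["genre:demo"])
    #     store.get_tags(store.get_file_hash(Path("clips/My_Clip.mp4")))
    #     # -> ["genre:demo", "title:My Clip"]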
|
|
|
|
def add_tags(self, file_path: Path, tags: List[str]) -> None:
|
|
"""Add tags to a file."""
|
|
try:
|
|
file_hash = self.get_or_create_file_entry(file_path)
|
|
cursor = self.connection.cursor()
|
|
|
|
user_title_tag = next(
|
|
(
|
|
tag.strip()
|
|
for tag in tags if tag.strip().lower().startswith("title:")
|
|
),
|
|
None
|
|
)
|
|
|
|
if user_title_tag:
|
|
cursor.execute(
|
|
"""
|
|
DELETE FROM tags WHERE hash = ? AND tag LIKE 'title:%'
|
|
""",
|
|
                    (file_hash,),
|
|
)
|
|
else:
|
|
cursor.execute(
|
|
"""
|
|
SELECT COUNT(*) FROM tags WHERE hash = ? AND tag LIKE 'title:%'
|
|
""",
|
|
                    (file_hash,),
|
|
)
|
|
|
|
has_title = cursor.fetchone()[0] > 0
|
|
if not has_title:
|
|
filename_without_ext = file_path.stem
|
|
if filename_without_ext:
|
|
# Normalize underscores to spaces for consistency
|
|
title_value = filename_without_ext.replace("_", " ").strip()
|
|
title_tag = f"title:{title_value}"
|
|
cursor.execute(
|
|
"""
|
|
INSERT OR IGNORE INTO tags (hash, tag)
|
|
VALUES (?, ?)
|
|
""",
|
|
                            (file_hash, title_tag),
|
|
)
|
|
|
|
for tag in tags:
|
|
tag = tag.strip()
|
|
if tag:
|
|
cursor.execute(
|
|
"""
|
|
INSERT OR IGNORE INTO tags (hash, tag)
|
|
VALUES (?, ?)
|
|
""",
|
|
                        (file_hash, tag),
|
|
)
|
|
|
|
self.connection.commit()
|
|
self._update_metadata_modified_time(file_hash)
|
|
logger.debug(f"Added {len(tags)} tags for {file_path}")
|
|
except Exception as e:
|
|
logger.error(f"Error adding tags for {file_path}: {e}", exc_info=True)
|
|
raise
|
|
|
|
def remove_tags(self, file_path: Path, tags: List[str]) -> None:
|
|
"""Remove specific tags from a file."""
|
|
try:
|
|
file_hash = self.get_or_create_file_entry(file_path)
|
|
cursor = self.connection.cursor()
|
|
|
|
for tag in tags:
|
|
tag = tag.strip()
|
|
if tag:
|
|
cursor.execute(
|
|
"""
|
|
DELETE FROM tags
|
|
WHERE hash = ?
|
|
AND tag = ?
|
|
""",
|
|
                        (file_hash, tag),
|
|
)
|
|
|
|
self.connection.commit()
|
|
logger.debug(f"Removed {len(tags)} tags for {file_path}")
|
|
except Exception as e:
|
|
logger.error(f"Error removing tags for {file_path}: {e}", exc_info=True)
|
|
raise
|
|
|
|
def add_tags_to_hash(self, file_hash: str, tags: List[str]) -> None:
|
|
"""Add tags to a file by hash."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
|
|
user_title_tag = next(
|
|
(
|
|
tag.strip()
|
|
for tag in tags if tag.strip().lower().startswith("title:")
|
|
),
|
|
None
|
|
)
|
|
|
|
if user_title_tag:
|
|
cursor.execute(
|
|
"""
|
|
DELETE FROM tags WHERE hash = ? AND tag LIKE 'title:%'
|
|
""",
|
|
                    (file_hash,),
|
|
)
|
|
|
|
for tag in tags:
|
|
tag = tag.strip()
|
|
if tag:
|
|
cursor.execute(
|
|
"""
|
|
INSERT OR IGNORE INTO tags (hash, tag)
|
|
VALUES (?, ?)
|
|
""",
|
|
                        (file_hash, tag),
|
|
)
|
|
|
|
self.connection.commit()
|
|
self._update_metadata_modified_time(file_hash)
|
|
logger.debug(f"Added {len(tags)} tags for hash {file_hash}")
|
|
except Exception as e:
|
|
logger.error(f"Error adding tags for hash {file_hash}: {e}", exc_info=True)
|
|
raise
|
|
|
|
def remove_tags_from_hash(self, file_hash: str, tags: List[str]) -> None:
|
|
"""Remove specific tags from a file by hash."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
|
|
for tag in tags:
|
|
tag = tag.strip()
|
|
if tag:
|
|
cursor.execute(
|
|
"""
|
|
DELETE FROM tags
|
|
WHERE hash = ?
|
|
AND tag = ?
|
|
""",
|
|
                        (file_hash, tag),
|
|
)
|
|
|
|
self.connection.commit()
|
|
logger.debug(f"Removed {len(tags)} tags for hash {file_hash}")
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error removing tags for hash {file_hash}: {e}",
|
|
exc_info=True
|
|
)
|
|
raise
|
|
|
|
def update_metadata_by_hash(
|
|
self,
|
|
file_hash: str,
|
|
        metadata_updates: Dict[str, Any]
|
|
) -> None:
|
|
"""Update metadata for a file by hash."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
|
|
fields = []
|
|
values = []
|
|
|
|
for key, value in metadata_updates.items():
|
|
if key in ["url", "relationships"]:
|
|
if not isinstance(value, str):
|
|
value = json.dumps(value)
|
|
fields.append(f"{key} = ?")
|
|
values.append(value)
|
|
|
|
if not fields:
|
|
return
|
|
|
|
# Ensure a metadata row exists so updates don't silently no-op.
|
|
# This can happen for older DBs or entries created without explicit metadata.
|
|
cursor.execute(
|
|
"INSERT OR IGNORE INTO metadata (hash) VALUES (?)",
|
|
                (file_hash,),
|
|
)
|
|
|
|
values.append(file_hash)
|
|
|
|
sql = f"UPDATE metadata SET {', '.join(fields)}, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE hash = ?"
|
|
cursor.execute(sql, values)
|
|
self.connection.commit()
|
|
logger.debug(f"Updated metadata for hash {file_hash}")
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error updating metadata for hash {file_hash}: {e}",
|
|
exc_info=True
|
|
)
|
|
raise
|
|
|
|
def set_relationship(
|
|
self,
|
|
file_path: Path,
|
|
related_file_path: Path,
|
|
rel_type: str = "alt",
|
|
*,
|
|
bidirectional: bool = True,
|
|
) -> None:
|
|
"""Set a relationship between two local files.
|
|
|
|
Args:
|
|
file_path: Path to the file being related
|
|
related_file_path: Path to the related file
|
|
rel_type: Type of relationship ('king', 'alt', 'related')
|
|
"""
|
|
try:
|
|
str_path = str(file_path.resolve())
|
|
str_related_path = str(related_file_path.resolve())
|
|
|
|
file_hash = self.get_or_create_file_entry(file_path)
|
|
related_file_hash = self.get_or_create_file_entry(related_file_path)
|
|
|
|
cursor = self.connection.cursor()
|
|
|
|
# Get current relationships for the main file
|
|
cursor.execute(
|
|
"""
|
|
SELECT relationships FROM metadata WHERE hash = ?
|
|
""",
|
|
                (file_hash,),
|
|
)
|
|
|
|
row = cursor.fetchone()
|
|
# Use index access to be safe regardless of row_factory
|
|
relationships_str = row[0] if row else None
|
|
|
|
try:
|
|
if relationships_str:
|
|
relationships = json.loads(relationships_str)
|
|
else:
|
|
relationships = {}
|
|
except (json.JSONDecodeError, TypeError):
|
|
relationships = {}
|
|
|
|
# Ensure relationships is a dict (handle case where DB has a list)
|
|
if not isinstance(relationships, dict):
|
|
relationships = {}
|
|
|
|
# Ensure rel_type key exists
|
|
if rel_type not in relationships:
|
|
relationships[rel_type] = []
|
|
|
|
# Add the relationship (store as hash string)
|
|
if related_file_hash not in relationships[rel_type]:
|
|
relationships[rel_type].append(related_file_hash)
|
|
|
|
# Save the updated relationships for the main file
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO metadata (hash, relationships)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(hash) DO UPDATE SET
|
|
relationships = excluded.relationships,
|
|
time_modified = CURRENT_TIMESTAMP
|
|
""",
|
|
                (file_hash, json.dumps(relationships)),
|
|
)
|
|
|
|
logger.debug(
|
|
f"Set {rel_type} relationship: {str_path} ({file_hash}) -> {str_related_path} ({related_file_hash})"
|
|
)
|
|
|
|
if bidirectional:
|
|
# Set reverse relationship (bidirectional)
|
|
# For 'alt' and 'related', the reverse is the same
|
|
# For 'king', the reverse is 'subject' (or we just use 'alt' for simplicity as Hydrus does)
|
|
# Let's use the same type for now to keep it simple and consistent with Hydrus 'alternates'
|
|
reverse_type = rel_type
|
|
|
|
# Update the related file
|
|
cursor.execute(
|
|
"""
|
|
SELECT relationships FROM metadata WHERE hash = ?
|
|
""",
|
|
                    (related_file_hash,),
|
|
)
|
|
|
|
row = cursor.fetchone()
|
|
relationships_str = row[0] if row else None
|
|
|
|
try:
|
|
if relationships_str:
|
|
reverse_relationships = json.loads(relationships_str)
|
|
else:
|
|
reverse_relationships = {}
|
|
except (json.JSONDecodeError, TypeError):
|
|
reverse_relationships = {}
|
|
|
|
if not isinstance(reverse_relationships, dict):
|
|
reverse_relationships = {}
|
|
|
|
if reverse_type not in reverse_relationships:
|
|
reverse_relationships[reverse_type] = []
|
|
|
|
if file_hash not in reverse_relationships[reverse_type]:
|
|
reverse_relationships[reverse_type].append(file_hash)
|
|
|
|
# Save the updated reverse relationships
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO metadata (hash, relationships)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(hash) DO UPDATE SET
|
|
relationships = excluded.relationships,
|
|
time_modified = CURRENT_TIMESTAMP
|
|
""",
|
|
                    (related_file_hash, json.dumps(reverse_relationships)),
|
|
)
|
|
|
|
            self.connection.commit()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error setting relationship: {e}", exc_info=True)
|
|
raise
|
|
|
|
def find_files_pointing_to(self, target_path: Path) -> List[Dict[str, Any]]:
|
|
"""Find all files that have a relationship pointing to the target path.
|
|
|
|
Args:
|
|
target_path: The file path to look for in other files' relationships
|
|
|
|
Returns:
|
|
List of dicts with {path, type} for files pointing to target
|
|
"""
|
|
try:
|
|
# Prefer the DB's stored identity hash for the target.
|
|
target_hash = None
|
|
try:
|
|
target_hash = self.get_file_hash(target_path)
|
|
except Exception:
|
|
target_hash = None
|
|
|
|
# Fall back to hashing bytes if the path isn't known to the DB.
|
|
if not target_hash:
|
|
target_hash = sha256_file(target_path)
|
|
|
|
if not target_hash:
|
|
logger.warning(
|
|
f"Cannot find files pointing to {target_path}: unable to compute hash"
|
|
)
|
|
return []
|
|
|
|
return self.find_files_pointing_to_hash(target_hash)
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error finding files pointing to {target_path}: {e}",
|
|
exc_info=True
|
|
)
|
|
return []
|
|
|
|
def get_note(self, file_hash: str) -> Optional[str]:
|
|
"""Get the default note for a file by hash."""
|
|
try:
|
|
notes = self.get_notes(file_hash)
|
|
if not notes:
|
|
return None
|
|
return notes.get("default")
|
|
except Exception as e:
|
|
logger.error(f"Error getting note for hash {file_hash}: {e}", exc_info=True)
|
|
return None
|
|
|
|
def get_notes(self, file_hash: str) -> Dict[str, str]:
|
|
"""Get all notes for a file by hash."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
cursor.execute(
|
|
"SELECT name, note FROM notes WHERE hash = ? ORDER BY name ASC",
|
|
                (file_hash,),
|
|
)
|
|
            out: Dict[str, str] = {}
|
|
for name, note in cursor.fetchall() or []:
|
|
if not name:
|
|
continue
|
|
out[str(name)] = str(note or "")
|
|
return out
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error getting notes for hash {file_hash}: {e}",
|
|
exc_info=True
|
|
)
|
|
return {}
|
|
|
|
def save_note(self, file_path: Path, note: str) -> None:
|
|
"""Save the default note for a file."""
|
|
self.set_note(file_path, "default", note)
|
|
|
|
def set_note(self, file_path: Path, name: str, note: str) -> None:
|
|
"""Set a named note for a file."""
|
|
try:
|
|
note_name = str(name or "").strip()
|
|
if not note_name:
|
|
raise ValueError("Note name is required")
|
|
|
|
file_hash = self.get_or_create_file_entry(file_path)
|
|
cursor = self.connection.cursor()
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO notes (hash, name, note)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT(hash, name) DO UPDATE SET
|
|
note = excluded.note,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
""",
|
|
                (file_hash, note_name, note),
|
|
)
|
|
self.connection.commit()
|
|
logger.debug(f"Saved note '{note_name}' for {file_path}")
|
|
except Exception as e:
|
|
logger.error(f"Error saving note for {file_path}: {e}", exc_info=True)
|
|
raise
|
|
|
|
def delete_note(self, file_hash: str, name: str) -> None:
|
|
"""Delete a named note for a file by hash."""
|
|
try:
|
|
note_name = str(name or "").strip()
|
|
if not note_name:
|
|
raise ValueError("Note name is required")
|
|
cursor = self.connection.cursor()
|
|
cursor.execute(
|
|
"DELETE FROM notes WHERE hash = ? AND name = ?",
|
|
                (file_hash, note_name),
|
|
)
|
|
self.connection.commit()
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error deleting note '{name}' for hash {file_hash}: {e}",
|
|
exc_info=True
|
|
)
|
|
raise
|
|
|
|
def search_by_tag(self, tag: str, limit: int = 100) -> List[tuple]:
|
|
"""Search for files with a specific tag. Returns list of (hash, file_path) tuples."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
|
|
cursor.execute(
|
|
"""
|
|
SELECT DISTINCT f.hash, f.file_path FROM files f
|
|
JOIN tags t ON f.hash = t.hash
|
|
WHERE t.tag = ?
|
|
LIMIT ?
|
|
""",
|
|
                (tag, limit),
|
|
)
|
|
|
|
rows = cursor.fetchall() or []
|
|
results: List[tuple] = []
|
|
for row in rows:
|
|
try:
|
|
file_hash = str(row[0])
|
|
db_path = str(row[1])
|
|
results.append((file_hash, str(self._from_db_file_path(db_path))))
|
|
except Exception:
|
|
continue
|
|
return results
|
|
except Exception as e:
|
|
logger.error(f"Error searching by tag '{tag}': {e}", exc_info=True)
|
|
return []
|
|
|
|
def search_hash(self, file_hash: str) -> Optional[Path]:
|
|
"""Search for a file by hash."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
|
|
cursor.execute(
|
|
"""
|
|
SELECT file_path FROM files WHERE hash = ?
|
|
""",
|
|
                (file_hash,),
|
|
)
|
|
|
|
row = cursor.fetchone()
|
|
return self._from_db_file_path(row[0]) if row else None
|
|
except Exception as e:
|
|
logger.error(f"Error searching by hash '{file_hash}': {e}", exc_info=True)
|
|
return None
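    # Example (sketch): search helpers return absolute on-disk paths rebuilt from the
    # stored relative paths. The tag value below is hypothetical.
    #
    #     for file_hash, path in store.search_by_tag("title:Intro", limit=10):
    #         print(file_hash, path)
    #     local_path = store.search_hash(file_hash)  # Path or None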
|
|
|
|
def update_file_hash(self, file_path: Path, file_hash: str) -> None:
|
|
"""Deprecated: Hash is managed as primary key. This method is no-op.
|
|
|
|
In the new hash-based schema, the file hash is the primary key (immutable).
|
|
Use get_or_create_file_entry() to ensure the hash is properly registered.
|
|
"""
|
|
# This is now a no-op since hash is the immutable primary key
|
|
pass
|
|
|
|
def rename_file(self, old_path: Path, new_path: Path) -> None:
|
|
"""Rename a file in the database, preserving all metadata."""
|
|
try:
|
|
abs_old = self._normalize_input_path(old_path)
|
|
abs_new = self._normalize_input_path(new_path)
|
|
str_old_path = self._to_db_file_path(abs_old)
|
|
str_new_path = self._to_db_file_path(abs_new)
|
|
cursor = self.connection.cursor()
|
|
|
|
cursor.execute(
|
|
"""
|
|
UPDATE files SET file_path = ?, updated_at = CURRENT_TIMESTAMP
|
|
WHERE file_path = ?
|
|
""",
|
|
                (str_new_path, str_old_path),
|
|
)
|
|
|
|
self.connection.commit()
|
|
logger.debug(f"Renamed file in database: {old_path} → {new_path}")
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Error renaming file from {old_path} to {new_path}: {e}",
|
|
exc_info=True
|
|
)
|
|
raise
|
|
|
|
def cleanup_missing_files(self) -> int:
|
|
"""Remove entries for files that no longer exist."""
|
|
try:
|
|
cursor = self.connection.cursor()
|
|
cursor.execute("SELECT hash, file_path FROM files")
|
|
|
|
removed_count = 0
|
|
for file_hash, file_path in cursor.fetchall():
|
|
try:
|
|
abs_path = self._from_db_file_path(file_path)
|
|
except Exception:
|
|
abs_path = Path(file_path)
|
|
if not abs_path.exists():
|
|
cursor.execute("DELETE FROM files WHERE hash = ?",
|
|
(file_hash,
|
|
))
|
|
removed_count += 1
|
|
|
|
self.connection.commit()
|
|
logger.info(f"Cleaned up {removed_count} missing file entries")
|
|
return removed_count
|
|
except Exception as e:
|
|
logger.error(f"Error cleaning up missing files: {e}", exc_info=True)
|
|
return 0
|
|
|
|
def delete_file(self, file_path: Path) -> bool:
|
|
"""Delete a file from the database by path.
|
|
|
|
        Cascades to metadata, tags, notes, etc., and also cleans up relationship
|
|
backlinks in other files so no file retains dangling references to the
|
|
deleted hash.
|
|
"""
|
|
try:
|
|
abs_path = self._normalize_input_path(file_path)
|
|
str_path = self._to_db_file_path(abs_path)
|
|
cursor = self.connection.cursor()
|
|
|
|
# Get the hash first (for logging)
|
|
cursor.execute("SELECT hash FROM files WHERE file_path = ?",
|
|
(str_path,
|
|
))
|
|
row = cursor.fetchone()
|
|
if not row:
|
|
logger.debug(f"File not found in database: {str_path}")
|
|
return False
|
|
|
|
file_hash = row[0]
|
|
|
|
# Remove backlinks from other files that reference this hash.
|
|
try:
|
|
target_hash = str(file_hash or "").strip().lower()
|
|
backlinks = self.find_files_pointing_to_hash(target_hash)
|
|
                by_src: Dict[str, Set[str]] = {}
|
|
for b in backlinks:
|
|
src = str((b or {}).get("hash") or "").strip().lower()
|
|
rt = str((b or {}).get("type") or "").strip()
|
|
if not src or src == target_hash or not rt:
|
|
continue
|
|
by_src.setdefault(src, set()).add(rt)
|
|
|
|
for src_hash, rel_types in by_src.items():
|
|
meta = self.get_metadata(src_hash) or {}
|
|
rels = meta.get("relationships") if isinstance(meta, dict) else None
|
|
if not isinstance(rels, dict) or not rels:
|
|
continue
|
|
|
|
changed = False
|
|
for rt in rel_types:
|
|
key_to_edit = None
|
|
for k in list(rels.keys()):
|
|
if str(k).lower() == str(rt).lower():
|
|
key_to_edit = str(k)
|
|
break
|
|
if not key_to_edit:
|
|
continue
|
|
|
|
bucket = rels.get(key_to_edit)
|
|
if not isinstance(bucket, list) or not bucket:
|
|
continue
|
|
|
|
new_bucket = [
|
|
h for h in bucket
|
|
if str(h or "").strip().lower() != target_hash
|
|
]
|
|
if len(new_bucket) == len(bucket):
|
|
continue
|
|
|
|
changed = True
|
|
if new_bucket:
|
|
rels[key_to_edit] = new_bucket
|
|
else:
|
|
try:
|
|
del rels[key_to_edit]
|
|
except Exception:
|
|
rels[key_to_edit] = []
|
|
|
|
if changed:
|
|
cursor.execute(
|
|
"""
|
|
INSERT INTO metadata (hash, relationships)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(hash) DO UPDATE SET
|
|
relationships = excluded.relationships,
|
|
time_modified = CURRENT_TIMESTAMP,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
""",
|
|
                            (src_hash, json.dumps(rels if rels else {})),
|
|
)
|
|
except Exception:
|
|
# Best-effort cleanup; deletion should still proceed.
|
|
pass
|
|
|
|
# Delete the file entry (cascades to metadata, tags, notes, etc via foreign keys)
|
|
cursor.execute("DELETE FROM files WHERE file_path = ?",
|
|
(str_path,
|
|
))
|
|
self.connection.commit()
|
|
|
|
logger.debug(f"Deleted file from database: {str_path} (hash: {file_hash})")
|
|
return cursor.rowcount > 0
|
|
except Exception as e:
|
|
logger.error(f"Error deleting file {file_path}: {e}", exc_info=True)
|
|
return False
|
|
|
|
# ========================================================================
|
|
# WORKER MANAGEMENT
|
|
# ========================================================================
|
|
|
|
    def insert_worker(
        self,
        worker_id: str,
        worker_type: str,
        title: str = "",
        description: str = "",
        total_steps: int = 0,
        pipe: Optional[str] = None,
    ) -> int:
        """Insert a new worker entry into the database."""
        try:
            cursor = self.connection.cursor()
            cursor.execute(
                """
                INSERT INTO worker (worker_id, worker_type, pipe, status, title, description, total_steps)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (worker_id, worker_type, pipe, "running", title, description, total_steps),
            )
            self.connection.commit()
            return cursor.lastrowid or 0
        except sqlite3.IntegrityError:
            return self.update_worker_status(worker_id, "running")
        except Exception as e:
            logger.error(f"Error inserting worker: {e}", exc_info=True)
            return 0

    def update_worker(self, worker_id: str, **kwargs) -> bool:
        """Update worker entry with given fields."""
        try:
            allowed_fields = {
                "status",
                "progress",
                "current_step",
                "error_message",
                "result_data",
                "title",
                "description",
                "completed_at",
                "total_steps",
                "pipe",
                "started_at",
                "last_stdout_at",
            }
            update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}

            if not update_fields:
                return True

            update_fields["last_updated"] = datetime.now().isoformat()
            cursor = self.connection.cursor()
            set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys())
            values = list(update_fields.values()) + [worker_id]

            cursor.execute(
                f"UPDATE worker SET {set_clause} WHERE worker_id = ?",
                values,
            )

            self.connection.commit()
            return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error updating worker {worker_id}: {e}", exc_info=True)
            return False

    def update_worker_status(self, worker_id: str, status: str) -> int:
        """Update worker status and return its database ID."""
        try:
            cursor = self.connection.cursor()

            if status in ("completed", "error"):
                cursor.execute(
                    """
                    UPDATE worker
                    SET status = ?, completed_at = CURRENT_TIMESTAMP, last_updated = CURRENT_TIMESTAMP
                    WHERE worker_id = ?
                    """,
                    (status, worker_id),
                )
            else:
                cursor.execute(
                    """
                    UPDATE worker
                    SET status = ?, last_updated = CURRENT_TIMESTAMP
                    WHERE worker_id = ?
                    """,
                    (status, worker_id),
                )

            self.connection.commit()

            cursor.execute("SELECT id FROM worker WHERE worker_id = ?", (worker_id,))
            row = cursor.fetchone()
            return row[0] if row else 0
        except Exception as e:
            logger.error(f"Error updating worker status: {e}", exc_info=True)
            return 0

    def get_worker(self, worker_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve a worker entry by ID."""
        try:
            cursor = self.connection.cursor()
            cursor.execute("SELECT * FROM worker WHERE worker_id = ?", (worker_id,))
            row = cursor.fetchone()
            return dict(row) if row else None
        except Exception as e:
            logger.error(f"Error retrieving worker: {e}", exc_info=True)
            return None

    def get_active_workers(self) -> List[Dict[str, Any]]:
        """Get all active (running) workers."""
        try:
            cursor = self.connection.cursor()
            cursor.execute(
                "SELECT * FROM worker WHERE status = 'running' ORDER BY started_at DESC"
            )
            return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error retrieving active workers: {e}", exc_info=True)
            return []

    def get_all_workers(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Get all workers (recent first)."""
        try:
            cursor = self.connection.cursor()
            cursor.execute(
                "SELECT * FROM worker ORDER BY started_at DESC LIMIT ?",
                (limit,),
            )
            return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Error retrieving all workers: {e}", exc_info=True)
            return []

    def delete_worker(self, worker_id: str) -> bool:
        """Delete a worker entry."""
        try:
            cursor = self.connection.cursor()
            cursor.execute("DELETE FROM worker WHERE worker_id = ?", (worker_id,))
            self.connection.commit()
            return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error deleting worker: {e}", exc_info=True)
            return False

    def cleanup_old_workers(self, days: int = 7) -> int:
        """Clean up completed/errored workers older than specified days."""
        try:
            cursor = self.connection.cursor()
            cursor.execute(
                """
                DELETE FROM worker
                WHERE status IN ('completed', 'error')
                AND completed_at < datetime('now', '-' || ? || ' days')
                """,
                (days,),
            )
            self.connection.commit()
            return cursor.rowcount
        except Exception as e:
            logger.error(f"Error cleaning up old workers: {e}", exc_info=True)
            return 0

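    # Example (illustrative sketch; the worker id is hypothetical and `db` is assumed
    # to be an open instance of this class): the expected lifecycle is
    # insert -> periodic update -> terminal status:
    #
    #     db.insert_worker("cli_1234", "download", title="Fetch album", total_steps=3)
    #     db.update_worker("cli_1234", progress=50, current_step="downloading")
    #     db.update_worker_status("cli_1234", "completed")
    #
    # update_worker() silently drops keys outside its allow-list, so callers may pass a
    # superset of fields without risking column-name injection in the generated SQL.
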
    def expire_running_workers(
        self,
        older_than_seconds: int = 300,
        status: str = "error",
        reason: str | None = None,
        worker_id_prefix: str | None = None,
    ) -> int:
        """Mark long-idle running workers as finished with the given status.

        Args:
            older_than_seconds: Minimum idle time before expiring the worker.
            status: New status to apply (e.g., "error" or "cancelled").
            reason: Error message to set when none is present.
            worker_id_prefix: Optional LIKE pattern (e.g., 'cli_%') to scope updates.

        Returns:
            Number of workers updated.
        """
        idle_seconds = max(1, int(older_than_seconds))
        cutoff = f"-{idle_seconds} seconds"
        auto_reason = reason or "Worker stopped responding; auto-marked as error"
        try:
            cursor = self.connection.cursor()
            if worker_id_prefix:
                cursor.execute(
                    """
                    UPDATE worker
                    SET status = ?,
                        error_message = CASE
                            WHEN IFNULL(TRIM(error_message), '') = '' THEN ?
                            ELSE error_message
                        END,
                        completed_at = COALESCE(completed_at, CURRENT_TIMESTAMP),
                        last_updated = CURRENT_TIMESTAMP
                    WHERE status = 'running'
                      AND worker_id LIKE ?
                      AND COALESCE(last_updated, started_at, created_at) < datetime('now', ?)
                    """,
                    (status, auto_reason, worker_id_prefix, cutoff),
                )
            else:
                cursor.execute(
                    """
                    UPDATE worker
                    SET status = ?,
                        error_message = CASE
                            WHEN IFNULL(TRIM(error_message), '') = '' THEN ?
                            ELSE error_message
                        END,
                        completed_at = COALESCE(completed_at, CURRENT_TIMESTAMP),
                        last_updated = CURRENT_TIMESTAMP
                    WHERE status = 'running'
                      AND COALESCE(last_updated, started_at, created_at) < datetime('now', ?)
                    """,
                    (status, auto_reason, cutoff),
                )
            self.connection.commit()
            return cursor.rowcount
        except Exception as exc:
            logger.error(f"Error expiring stale workers: {exc}", exc_info=True)
            return 0

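    # Example (illustrative sketch; assumes an open instance `db`): a maintenance pass
    # run at startup or on a timer might expire CLI-spawned workers that have been
    # silent for ten minutes:
    #
    #     stale = db.expire_running_workers(
    #         older_than_seconds=600,
    #         status="error",
    #         worker_id_prefix="cli_%",
    #     )
    #     logger.info(f"Expired {stale} stale workers")
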
    def append_worker_stdout(
        self,
        worker_id: str,
        text: str,
        step: Optional[str] = None,
        channel: str = "stdout",
    ) -> bool:
        """Append text to a worker's stdout log and timeline."""
        if not text:
            return True
        try:
            # Check if connection is valid
            if not self.connection:
                logger.warning(f"Database connection not available for worker {worker_id}")
                return False

            cursor = self.connection.cursor()
            cursor.execute("SELECT stdout FROM worker WHERE worker_id = ?", (worker_id,))
            row = cursor.fetchone()

            if not row:
                logger.warning(f"Worker {worker_id} not found for stdout append")
                return False

            current_stdout = row[0] or ""
            separator = (
                "" if not current_stdout else
                ("" if current_stdout.endswith("\n") else "\n")
            )
            new_stdout = f"{current_stdout}{separator}{text}\n"

            cursor.execute(
                """
                UPDATE worker SET stdout = ?, last_updated = CURRENT_TIMESTAMP,
                    last_stdout_at = CURRENT_TIMESTAMP
                WHERE worker_id = ?
                """,
                (new_stdout, worker_id),
            )
            self._insert_worker_log_entry(cursor, worker_id, "stdout", text, step, channel)

            self.connection.commit()
            return cursor.rowcount > 0
        except sqlite3.ProgrammingError as e:
            # Handle "Cannot operate on a closed database" gracefully
            if "closed database" in str(e).lower():
                logger.warning(
                    f"Database connection closed, cannot append stdout for worker {worker_id}"
                )
                return False
            logger.error(f"Error appending stdout to worker {worker_id}: {e}", exc_info=True)
            return False
        except Exception as e:
            logger.error(f"Error appending stdout to worker {worker_id}: {e}", exc_info=True)
            return False

    def get_worker_stdout(self, worker_id: str) -> str:
        """Get stdout logs for a worker."""
        try:
            cursor = self.connection.cursor()
            cursor.execute("SELECT stdout FROM worker WHERE worker_id = ?", (worker_id,))
            row = cursor.fetchone()
            return row[0] if row and row[0] else ""
        except Exception as e:
            logger.error(f"Error getting worker stdout for {worker_id}: {e}", exc_info=True)
            return ""

    def append_worker_steps(self, worker_id: str, step_text: str) -> bool:
        """Append a step to a worker's step log and timeline."""
        if not step_text:
            return True
        try:
            cursor = self.connection.cursor()
            cursor.execute("SELECT steps FROM worker WHERE worker_id = ?", (worker_id,))
            row = cursor.fetchone()

            if not row:
                logger.warning(f"Worker {worker_id} not found for steps append")
                return False

            current_steps = row[0] or ""
            timestamp = datetime.now().strftime("%H:%M:%S")
            step_entry = f"[{timestamp}] {step_text}"
            new_steps = (current_steps + "\n" if current_steps else "") + step_entry

            cursor.execute(
                """
                UPDATE worker SET steps = ?, last_updated = CURRENT_TIMESTAMP,
                    current_step = ?
                WHERE worker_id = ?
                """,
                (new_steps, step_text, worker_id),
            )
            self._insert_worker_log_entry(cursor, worker_id, "step", step_text, step_text, "step")

            self.connection.commit()
            return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error appending step to worker {worker_id}: {e}", exc_info=True)
            return False

    def get_worker_steps(self, worker_id: str) -> str:
        """Get step logs for a worker."""
        try:
            cursor = self.connection.cursor()
            cursor.execute("SELECT steps FROM worker WHERE worker_id = ?", (worker_id,))
            row = cursor.fetchone()
            return row[0] if row and row[0] else ""
        except Exception as e:
            logger.error(f"Error getting worker steps for {worker_id}: {e}", exc_info=True)
            return ""

    def clear_worker_stdout(self, worker_id: str) -> bool:
        """Clear stdout logs for a worker."""
        try:
            cursor = self.connection.cursor()
            cursor.execute(
                """
                UPDATE worker SET stdout = '', last_updated = CURRENT_TIMESTAMP
                WHERE worker_id = ?
                """,
                (worker_id,),
            )
            self.clear_worker_events(worker_id, event_type="stdout")
            self.connection.commit()
            return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Error clearing worker stdout: {e}", exc_info=True)
            return False

    def clear_finished_workers(self) -> int:
        """Delete all workers that are not currently running."""
        try:
            cursor = self.connection.cursor()
            cursor.execute("DELETE FROM worker WHERE status != 'running'")
            self.connection.commit()
            return cursor.rowcount
        except Exception as e:
            logger.error(f"Error clearing finished workers: {e}", exc_info=True)
            return 0

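    # Example (illustrative sketch; the command and worker id are hypothetical, `db` is
    # an open instance of this class): stream a child process's output into the worker
    # log line by line so a UI can tail it:
    #
    #     proc = subprocess.Popen(
    #         ["some-downloader", "<url>"],
    #         stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
    #     )
    #     for line in proc.stdout:
    #         db.append_worker_stdout("cli_1234", line.rstrip("\n"))
    #     db.update_worker_status("cli_1234", "completed" if proc.wait() == 0 else "error")
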
    def close(self) -> None:
        """Close the database connection."""
        try:
            if self.connection:
                self.connection.close()
                logger.info("Database connection closed")
        except Exception as e:
            logger.error(f"Error closing database: {e}", exc_info=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


# ============================================================================
# DATABASE QUERY API
# ============================================================================


class DatabaseAPI:
    """Query API wrapper for LocalLibraryDB providing specialized search methods."""

    def __init__(self, search_dir: Path):
        self.search_dir = search_dir
        self.db = API_folder_store(search_dir)

    def __enter__(self):
        self.db.__enter__()
        return self

    def __exit__(self, *args):
        return self.db.__exit__(*args)

    def get_cursor(self):
        return self.db.connection.cursor()

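    # Example (illustrative sketch; the library path is hypothetical): DatabaseAPI is
    # intended to be used as a context manager so the underlying connection is closed:
    #
    #     with DatabaseAPI(Path("~/media/library").expanduser()) as api:
    #         hashes = api.get_file_hashes_by_tag_substring("%artist:%")
    #         rows = api.get_file_metadata(hashes, limit=50)
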
    def get_file_hash_by_hash(self, file_hash: str) -> Optional[str]:
        """Get file hash from the database, or None if not found."""
        cursor = self.get_cursor()
        cursor.execute("SELECT hash FROM files WHERE LOWER(hash) = ?", (file_hash.lower(),))
        row = cursor.fetchone()
        return row[0] if row else None

    def get_all_file_hashes(self) -> Set[str]:
        """Get all file hashes in the database."""
        cursor = self.get_cursor()
        cursor.execute("SELECT hash FROM files")
        return {row[0] for row in cursor.fetchall()}

    def get_file_hashes_by_tag_pattern(self, query_pattern: str) -> List[tuple]:
        """Get (hash, tag) tuples matching a tag pattern."""
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT DISTINCT f.hash, t.tag
            FROM files f
            JOIN tags t ON f.hash = t.hash
            WHERE LOWER(t.tag) LIKE ?
            """,
            (query_pattern,),
        )
        return cursor.fetchall()

    def get_file_hashes_by_path_pattern(self, like_pattern: str) -> Set[str]:
        """Get hashes of files matching a path pattern."""
        cursor = self.get_cursor()
        cursor.execute(
            "SELECT DISTINCT hash FROM files WHERE LOWER(file_path) LIKE ?",
            (like_pattern,),
        )
        return {row[0] for row in cursor.fetchall()}

    def get_file_hashes_by_tag_substring(self, like_pattern: str) -> Set[str]:
        """Get hashes of files matching a tag substring."""
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT DISTINCT f.hash
            FROM files f
            JOIN tags t ON f.hash = t.hash
            WHERE LOWER(t.tag) LIKE ?
            """,
            (like_pattern,),
        )
        return {row[0] for row in cursor.fetchall()}

    def get_file_hashes_with_any_url(self, limit: Optional[int] = None) -> Set[str]:
        """Get hashes of files that have any non-empty URL metadata."""
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT DISTINCT f.hash
            FROM files f
            JOIN metadata m ON f.hash = m.hash
            WHERE m.url IS NOT NULL
              AND TRIM(m.url) != ''
              AND TRIM(m.url) != '[]'
            LIMIT ?
            """,
            (limit or 10000,),
        )
        return {row[0] for row in cursor.fetchall()}

    def get_file_hashes_by_url_like(
        self,
        like_pattern: str,
        limit: Optional[int] = None,
    ) -> Set[str]:
        """Get hashes of files whose URL metadata contains a substring (case-insensitive)."""
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT DISTINCT f.hash
            FROM files f
            JOIN metadata m ON f.hash = m.hash
            WHERE m.url IS NOT NULL
              AND LOWER(m.url) LIKE ?
            LIMIT ?
            """,
            (like_pattern.lower(), limit or 10000),
        )
        return {row[0] for row in cursor.fetchall()}

    def get_file_hashes_by_ext(self, ext_value: str, limit: Optional[int] = None) -> Set[str]:
        """Get hashes of files whose metadata ext matches the given extension.

        Matches case-insensitively and ignores any leading '.' in stored ext.
        Supports glob wildcards '*' and '?' in the query.
        """
        ext_clean = str(ext_value or "").strip().lower().lstrip(".")
        ext_clean = "".join(ch for ch in ext_clean if ch.isalnum())
        if not ext_clean:
            return set()

        cursor = self.get_cursor()

        has_glob = ("*" in ext_value) or ("?" in ext_value)
        if has_glob:
            pattern = str(ext_value or "").strip().lower().lstrip(".")
            pattern = pattern.replace("%", "\\%").replace("_", "\\_")
            pattern = pattern.replace("*", "%").replace("?", "_")
            cursor.execute(
                """
                SELECT DISTINCT f.hash
                FROM files f
                JOIN metadata m ON f.hash = m.hash
                WHERE LOWER(LTRIM(COALESCE(m.ext, ''), '.')) LIKE ? ESCAPE '\\'
                LIMIT ?
                """,
                (pattern, limit or 10000),
            )
        else:
            cursor.execute(
                """
                SELECT DISTINCT f.hash
                FROM files f
                JOIN metadata m ON f.hash = m.hash
                WHERE LOWER(LTRIM(COALESCE(m.ext, ''), '.')) = ?
                LIMIT ?
                """,
                (ext_clean, limit or 10000),
            )
        return {row[0] for row in cursor.fetchall()}

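    # Example (illustrative sketch; `api` is a DatabaseAPI instance): the glob-to-LIKE
    # translation above escapes literal SQL wildcards, then maps "*" -> "%" and
    # "?" -> "_", so:
    #
    #     api.get_file_hashes_by_ext("mp*")    # LIKE 'mp%'  -> mp3, mp4, ...
    #     api.get_file_hashes_by_ext("m?a")    # LIKE 'm_a'  -> m4a, mka, ...
    #     api.get_file_hashes_by_ext(".FLAC")  # exact, case-insensitive match on 'flac'
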
    def get_files_by_ext(self, ext_value: str, limit: Optional[int] = None) -> List[tuple]:
        """Get files whose metadata ext matches the given extension.

        Returns (hash, file_path, size, ext) tuples.
        """
        ext_clean = str(ext_value or "").strip().lower().lstrip(".")
        ext_clean = "".join(ch for ch in ext_clean if ch.isalnum())
        if not ext_clean:
            return []

        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT f.hash, f.file_path,
                   COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size,
                   COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext
            FROM files f
            JOIN metadata m ON f.hash = m.hash
            WHERE LOWER(LTRIM(COALESCE(m.ext, ''), '.')) = ?
            ORDER BY f.file_path
            LIMIT ?
            """,
            (ext_clean, limit or 10000),
        )
        return cursor.fetchall()

    def get_files_with_any_url(self, limit: Optional[int] = None) -> List[tuple]:
        """Get files that have any non-empty URL metadata.

        Returns (hash, file_path, size, ext) tuples.
        """
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT f.hash, f.file_path,
                   COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size,
                   COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext
            FROM files f
            JOIN metadata m ON f.hash = m.hash
            WHERE m.url IS NOT NULL
              AND TRIM(m.url) != ''
              AND TRIM(m.url) != '[]'
            ORDER BY f.file_path
            LIMIT ?
            """,
            (limit or 10000,),
        )
        return cursor.fetchall()

    def get_files_by_url_like(self, like_pattern: str, limit: Optional[int] = None) -> List[tuple]:
        """Get files whose URL metadata contains a substring (case-insensitive).

        Returns (hash, file_path, size, ext) tuples.
        """
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT f.hash, f.file_path,
                   COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size,
                   COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext
            FROM files f
            JOIN metadata m ON f.hash = m.hash
            WHERE m.url IS NOT NULL
              AND LOWER(m.url) LIKE ?
            ORDER BY f.file_path
            LIMIT ?
            """,
            (like_pattern.lower(), limit or 10000),
        )
        return cursor.fetchall()

    def get_file_metadata(self, file_hashes: Set[str], limit: Optional[int] = None) -> List[tuple]:
        """Get metadata for files given their hashes. Returns (hash, file_path, size, extension) tuples."""
        if not file_hashes:
            return []
        cursor = self.get_cursor()
        placeholders = ",".join(["?"] * len(file_hashes))
        fetch_sql = f"""
            SELECT hash, file_path,
                   COALESCE((SELECT size FROM metadata WHERE hash = files.hash), 0) as size,
                   COALESCE((SELECT ext FROM metadata WHERE hash = files.hash), '') as ext
            FROM files
            WHERE hash IN ({placeholders})
            ORDER BY file_path
            LIMIT ?
        """
        cursor.execute(fetch_sql, (*file_hashes, limit or len(file_hashes)))
        return cursor.fetchall()

    def get_all_files(self, limit: Optional[int] = None) -> List[tuple]:
        """Get all files in database. Returns (hash, file_path, size, ext) tuples."""
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT f.hash, f.file_path,
                   COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size,
                   COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext
            FROM files f
            ORDER BY file_path
            LIMIT ?
            """,
            (limit or 1000,),
        )
        return cursor.fetchall()

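    # Example (illustrative sketch; the hashes are placeholders, `api` is a DatabaseAPI
    # instance): get_file_metadata() expands the hash set into positional placeholders,
    # so for three hashes the generated SQL is "WHERE hash IN (?,?,?) ... LIMIT ?" with
    # the hashes plus the limit bound as parameters:
    #
    #     rows = api.get_file_metadata({"aaa...", "bbb...", "ccc..."}, limit=10)
    #     for file_hash, file_path, size, ext in rows:
    #         print(file_hash, file_path, size, ext)
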
    def get_tags_for_file(self, file_hash: str) -> List[str]:
        """Get all tags for a file given its hash."""
        cursor = self.get_cursor()
        cursor.execute("SELECT tag FROM tags WHERE hash = ?", (file_hash,))
        return [row[0] for row in cursor.fetchall()]

    def get_tags_by_namespace_and_file(self, file_hash: str, query_pattern: str) -> List[str]:
        """Get tags for a file matching a pattern."""
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT DISTINCT tag FROM tags
            WHERE hash = ?
              AND LOWER(tag) LIKE ?
            """,
            (file_hash, query_pattern),
        )
        return [row[0] for row in cursor.fetchall()]

    def get_files_by_namespace_pattern(
        self,
        query_pattern: str,
        limit: Optional[int] = None,
    ) -> List[tuple]:
        """Get files with tags matching a pattern. Returns (hash, file_path, size, ext) tuples."""
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT DISTINCT f.hash, f.file_path,
                   COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size,
                   COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext
            FROM files f
            JOIN tags t ON f.hash = t.hash
            WHERE LOWER(t.tag) LIKE ?
            ORDER BY f.file_path
            LIMIT ?
            """,
            (query_pattern, limit or 1000),
        )
        return cursor.fetchall()

    def get_files_by_simple_tag_pattern(
        self,
        query_pattern: str,
        limit: Optional[int] = None,
    ) -> List[tuple]:
        """Get files with non-namespaced tags matching a pattern. Returns (hash, file_path, size, ext) tuples."""
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT DISTINCT f.hash, f.file_path,
                   COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size,
                   COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext
            FROM files f
            JOIN tags t ON f.hash = t.hash
            WHERE LOWER(t.tag) LIKE ? AND LOWER(t.tag) NOT LIKE '%:%'
            ORDER BY f.file_path
            LIMIT ?
            """,
            (query_pattern, limit or 1000),
        )
        return cursor.fetchall()

    def get_files_by_multiple_path_conditions(
        self,
        conditions: List[str],
        params: List[str],
        limit: Optional[int] = None,
    ) -> List[tuple]:
        """Get files matching multiple path conditions. Returns (hash, file_path, size, ext) tuples."""
        cursor = self.get_cursor()
        where_clause = " AND ".join(conditions)
        sql = f"""
            SELECT DISTINCT f.hash, f.file_path,
                   COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size,
                   COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext
            FROM files f
            WHERE {where_clause}
            ORDER BY f.file_path
            LIMIT ?
        """
        cursor.execute(sql, (*params, limit or 10000))
        return cursor.fetchall()

    def get_files_by_title_tag_pattern(
        self,
        title_pattern: str,
        limit: Optional[int] = None,
    ) -> List[tuple]:
        """Get files with title tags matching a pattern. Returns (hash, file_path, size, ext) tuples."""
        cursor = self.get_cursor()
        cursor.execute(
            """
            SELECT DISTINCT f.hash, f.file_path,
                   COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size,
                   COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext
            FROM files f
            JOIN tags t ON f.hash = t.hash
            WHERE LOWER(t.tag) LIKE ?
            ORDER BY f.file_path
            LIMIT ?
            """,
            (title_pattern, limit or 10000),
        )
        return cursor.fetchall()


# ============================================================================
# LIBRARY INITIALIZATION & MIGRATION
# ============================================================================


class LocalLibraryInitializer:
    """Initialize and synchronize local library database."""

    def __init__(self, library_root: Path):
        """Initialize the database scanner."""
        self.library_root = Path(library_root)
        self.db = API_folder_store(library_root)
        self.stats = {
            "files_scanned": 0,
            "files_new": 0,
            "files_existing": 0,
            "sidecars_imported": 0,
            "sidecars_deleted": 0,
            "tags_imported": 0,
            "metadata_imported": 0,
            "errors": 0,
        }

    def scan_and_index(self) -> Dict[str, int]:
        """Scan library folder and populate database with file entries."""
        try:
            logger.info(f"Starting library scan at {self.library_root}")

            media_files = self._find_media_files()
            logger.info(f"Found {len(media_files)} media files")

            db_files = self._get_database_files()
            logger.info(f"Found {len(db_files)} files in database")

            for file_path in media_files:
                self._process_file(file_path, db_files)

            self.db.connection.commit()
            self._import_sidecars_batch()
            self.db.connection.commit()

            # Ensure files without sidecars are still imported + renamed to hash.
            self._hash_and_rename_non_sidecar_media_files()
            self.db.connection.commit()

            self._cleanup_orphaned_sidecars()
            self.db.connection.commit()

            try:
                cursor = self.db.connection.cursor()
                cursor.execute("SELECT COUNT(*) FROM files")
                row = cursor.fetchone()
                self.stats["files_total_db"] = int(row[0]) if row and row[0] is not None else 0
            except Exception:
                self.stats["files_total_db"] = 0

            logger.info(f"Library scan complete. Stats: {self.stats}")
            return self.stats
        except Exception as e:
            logger.error(f"Error during library scan: {e}", exc_info=True)
            self.stats["errors"] += 1
            raise
        finally:
            self.db.close()

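    # Example (illustrative sketch; the library path is hypothetical): a full scan is a
    # single call, and the returned stats dict can be logged or surfaced in a UI:
    #
    #     initializer = LocalLibraryInitializer(Path("~/media/library").expanduser())
    #     stats = initializer.scan_and_index()
    #     print(stats["files_new"], "new files,", stats["sidecars_imported"], "sidecar bundles imported")
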
    def _hash_and_rename_non_sidecar_media_files(self) -> None:
        """Ensure media files are hash-named even when they have no sidecars.

        This keeps the library stable across restarts:
        - New files get hashed + renamed to <sha256><ext>
        - DB file_path is updated by hash so the same file isn't re-counted as "new".
        """
        try:
            renamed = 0
            skipped_existing_target = 0
            duplicates_quarantined = 0

            for file_path in self._find_media_files():
                try:
                    if not file_path.is_file():
                        continue

                    stem = file_path.stem.lower()
                    is_hash_named = len(stem) == 64 and all(
                        ch in "0123456789abcdef" for ch in stem
                    )
                    if is_hash_named:
                        continue

                    # If any sidecars exist for this file, let the sidecar importer handle it.
                    if (file_path.with_name(file_path.name + ".tag").exists()
                            or file_path.with_name(file_path.name + ".metadata").exists()
                            or file_path.with_name(file_path.name + ".notes").exists()):
                        continue

                    file_hash = sha256_file(file_path)
                    target_path = file_path.with_name(f"{file_hash}{file_path.suffix}")

                    # Ensure the DB entry exists with a title tag derived from the original filename.
                    # This intentionally happens BEFORE rename.
                    self.db.get_or_create_file_entry(file_path, file_hash)

                    if target_path == file_path:
                        continue

                    if target_path.exists():
                        skipped_existing_target += 1
                        # The canonical file already exists as a hash-named file. Keep the DB pointing
                        # at the canonical hash-named path and quarantine this duplicate so it doesn't
                        # get counted as "new" again on future restarts.
                        try:
                            cursor = self.db.connection.cursor()
                            cursor.execute(
                                "UPDATE files SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
                                (self.db._to_db_file_path(target_path), file_hash),
                            )
                        except Exception as exc:
                            logger.debug(
                                f"Failed to reset DB path to canonical file for {file_hash}: {exc}"
                            )

                        try:
                            dup_dir = self.library_root / ".duplicates"
                            dup_dir.mkdir(parents=True, exist_ok=True)

                            dest = dup_dir / file_path.name
                            if dest.exists():
                                ts = int(datetime.now().timestamp())
                                dest = dup_dir / f"{file_path.stem}__dup__{ts}{file_path.suffix}"

                            logger.warning(
                                f"Duplicate content (hash={file_hash}) detected; moving {file_path} -> {dest}"
                            )
                            file_path.rename(dest)
                            duplicates_quarantined += 1
                        except Exception as exc:
                            logger.warning(
                                f"Duplicate content (hash={file_hash}) detected but could not quarantine {file_path}: {exc}"
                            )
                        continue

                    try:
                        file_path.rename(target_path)
                    except Exception as exc:
                        logger.warning(f"Failed to rename {file_path} -> {target_path}: {exc}")
                        self.stats["errors"] += 1
                        continue

                    # Update DB path by hash (more robust than matching the old path).
                    try:
                        cursor = self.db.connection.cursor()
                        cursor.execute(
                            "UPDATE files SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?",
                            (self.db._to_db_file_path(target_path), file_hash),
                        )
                    except Exception:
                        pass

                    # Ensure basic metadata exists.
                    try:
                        stat_result = target_path.stat()
                        self.db.save_metadata(
                            target_path,
                            {
                                "hash": file_hash,
                                "ext": target_path.suffix,
                                "size": stat_result.st_size,
                            },
                        )
                    except Exception:
                        pass

                    renamed += 1
                except Exception as exc:
                    logger.warning(f"Error hashing/renaming file {file_path}: {exc}")
                    self.stats["errors"] += 1

            if renamed:
                self.stats["files_hashed_renamed"] = (
                    int(self.stats.get("files_hashed_renamed", 0) or 0) + renamed
                )
            if skipped_existing_target:
                self.stats["files_hashed_skipped_target_exists"] = (
                    int(self.stats.get("files_hashed_skipped_target_exists", 0) or 0)
                    + skipped_existing_target
                )
            if duplicates_quarantined:
                self.stats["duplicates_quarantined"] = (
                    int(self.stats.get("duplicates_quarantined", 0) or 0) + duplicates_quarantined
                )
        except Exception as exc:
            logger.error(
                f"Error hashing/renaming non-sidecar media files: {exc}",
                exc_info=True,
            )
            self.stats["errors"] += 1

    def _find_media_files(self) -> List[Path]:
        """Find all media files in the library folder."""
        media_files = []
        try:
            for file_path in self.library_root.rglob("*"):
                # Don't repeatedly re-scan quarantined duplicates.
                try:
                    if ".duplicates" in file_path.parts:
                        continue
                except Exception:
                    pass
                if file_path.is_file() and file_path.suffix.lower() in MEDIA_EXTENSIONS:
                    media_files.append(file_path)
        except Exception as e:
            logger.error(f"Error scanning media files: {e}", exc_info=True)

        return sorted(media_files)

    def _get_database_files(self) -> Dict[str, str]:
        """Get existing files from database by normalized path, returns {normalized_path: hash}."""
        try:
            cursor = self.db.connection.cursor()
            cursor.execute("SELECT hash, file_path FROM files")

            result = {}
            for file_hash, file_path in cursor.fetchall():
                try:
                    abs_path = self.db._from_db_file_path(file_path)
                except Exception:
                    abs_path = Path(file_path)
                normalized = str(abs_path.resolve()).lower()
                result[normalized] = file_hash

            return result
        except Exception as e:
            logger.error(f"Error getting database files: {e}", exc_info=True)
            return {}

    def _process_file(self, file_path: Path, db_files: Dict[str, str]) -> None:
        """Process a single media file."""
        try:
            normalized = str(file_path.resolve()).lower()

            if normalized in db_files:
                self.stats["files_existing"] += 1
            else:
                # Path not known. If this file's hash is already in DB, it's duplicate content and
                # should not be counted as "new".
                file_hash = sha256_file(file_path)
                try:
                    cursor = self.db.connection.cursor()
                    cursor.execute("SELECT 1 FROM files WHERE hash = ?", (file_hash,))
                    exists_by_hash = cursor.fetchone() is not None
                except Exception:
                    exists_by_hash = False

                if exists_by_hash:
                    self.stats["files_existing"] += 1
                    self.stats["duplicates_found"] = (
                        int(self.stats.get("duplicates_found", 0) or 0) + 1
                    )
                    logger.info(
                        f"Duplicate content detected during scan (hash={file_hash}): {file_path}"
                    )
                else:
                    self.db.get_or_create_file_entry(file_path, file_hash)
                    self.stats["files_new"] += 1

            self.stats["files_scanned"] += 1
        except Exception as e:
            logger.warning(f"Error processing file {file_path}: {e}")
            self.stats["errors"] += 1

    def _import_sidecars_batch(self) -> None:
        """Batch import sidecars, hash files, and rename files to their hash."""
        try:
            sidecar_map = self._collect_sidecars()

            for base_path, sidecars in sidecar_map.items():
                try:
                    if not base_path.exists():
                        continue

                    tags = self._read_tag_sidecars(sidecars)
                    metadata_info = self._read_metadata_sidecar(sidecars)
                    note_text = self._read_notes_sidecar(sidecars)

                    hashed_path, file_hash = self._ensure_hashed_filename(base_path, sidecars)

                    # Always trust freshly computed hash
                    metadata_info["hash"] = file_hash
                    try:
                        stat_result = hashed_path.stat()
                        metadata_info.setdefault("size", stat_result.st_size)
                        metadata_info.setdefault("ext", hashed_path.suffix)
                    except OSError:
                        pass

                    self.db.save_file_info(hashed_path, metadata_info, tags)
                    if note_text:
                        self.db.save_note(hashed_path, note_text)

                    # Delete all sidecars after importing
                    self._delete_sidecars(sidecars)

                    self.stats["sidecars_imported"] += 1
                except Exception as e:
                    logger.warning(f"Error importing sidecar bundle for {base_path}: {e}")
                    self.stats["errors"] += 1
        except Exception as e:
            logger.error(f"Error batch importing sidecars: {e}", exc_info=True)

    def _collect_sidecars(self) -> Dict[Path, Dict[str, List[Path]]]:
        """Collect sidecars grouped by their base media file."""
        sidecar_map: Dict[Path, Dict[str, List[Path]]] = {}

        patterns = [
            ("*.tag", "tag"),
            ("*.metadata", "metadata"),
            ("*.notes", "notes"),
        ]

        for pattern, key in patterns:
            for sidecar in self.library_root.rglob(pattern):
                try:
                    base = sidecar.with_suffix("")
                except Exception:
                    continue

                if not base.exists():
                    continue

                bucket = sidecar_map.setdefault(
                    base,
                    {"tag": [], "metadata": [], "notes": []},
                )
                bucket[key].append(sidecar)

        return sidecar_map

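    # Example (illustrative sketch; the file names are hypothetical): for a library entry
    #
    #     concert.mp4
    #     concert.mp4.tag        -> one tag per line, e.g. "artist:someone"
    #     concert.mp4.metadata   -> "hash:", "url:", "relationship:" lines
    #     concert.mp4.notes      -> free-form note text
    #
    # _collect_sidecars() groups the three sidecars under the base path "concert.mp4",
    # and _import_sidecars_batch() folds them into the database before deleting them.
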
    def _read_tag_sidecars(self, sidecars: Dict[str, List[Path]]) -> List[str]:
        tags: List[str] = []
        for tag_path in sidecars.get("tag", []):
            try:
                content = tag_path.read_text(encoding="utf-8")
            except OSError:
                continue

            for raw_line in content.splitlines():
                line = raw_line.strip()
                if line:
                    tags.append(line)
        return tags

    def _read_metadata_sidecar(self, sidecars: Dict[str, List[Path]]) -> Dict[str, Any]:
        metadata: Dict[str, Any] = {"url": [], "relationships": []}

        meta_path = sidecars.get("metadata", [])
        if not meta_path:
            return metadata

        for path in meta_path:
            try:
                content = path.read_text(encoding="utf-8")
            except OSError:
                continue

            for raw_line in content.splitlines():
                line = raw_line.strip()
                if not line or line.startswith("#"):
                    continue

                lower = line.lower()
                if lower.startswith("hash:"):
                    metadata["hash"] = line.split(":", 1)[1].strip()
                elif lower.startswith("url:"):
                    url_part = line.split(":", 1)[1].strip()
                    if url_part:
                        for url_segment in url_part.replace(",", " ").split():
                            clean = url_segment.strip()
                            if clean and clean not in metadata["url"]:
                                metadata["url"].append(clean)
                elif lower.startswith("relationship:"):
                    rel_value = line.split(":", 1)[1].strip()
                    if rel_value:
                        metadata["relationships"].append(rel_value)

        return metadata

    def _read_notes_sidecar(self, sidecars: Dict[str, List[Path]]) -> Optional[str]:
        note_paths = sidecars.get("notes", [])
        for note_path in note_paths:
            try:
                content = note_path.read_text(encoding="utf-8").strip()
            except OSError:
                continue
            if content:
                return content
        return None

    def _ensure_hashed_filename(
        self,
        file_path: Path,
        sidecars: Dict[str, List[Path]],
    ) -> Tuple[Path, str]:
        """Compute hash, rename file to hash-based name, and move sidecars accordingly."""
        file_hash = sha256_file(file_path)
        target_name = f"{file_hash}{file_path.suffix}"
        target_path = file_path.with_name(target_name)

        # Nothing to do if already hashed
        if target_path == file_path:
            return file_path, file_hash

        try:
            if target_path.exists():
                logger.warning(f"Hash target already exists, keeping original: {target_path}")
                return file_path, file_hash

            file_path.rename(target_path)
            self._rename_sidecars(file_path, target_path, sidecars)
            try:
                self.db.rename_file(file_path, target_path)
            except Exception:
                # Entry might not exist yet; it will be created during save.
                pass
            return target_path, file_hash
        except Exception as e:
            logger.warning(f"Failed to rename {file_path} to hash {target_path}: {e}")
            return file_path, file_hash

    def _rename_sidecars(
        self,
        old_base: Path,
        new_base: Path,
        sidecars: Dict[str, List[Path]],
    ) -> None:
        """Rename sidecars to follow the new hashed filename."""
        mappings = [
            (sidecars.get("tag", []), ".tag"),
            (sidecars.get("metadata", []), ".metadata"),
            (sidecars.get("notes", []), ".notes"),
        ]

        for candidates, suffix in mappings:
            for source in candidates:
                try:
                    dest = new_base.with_name(new_base.name + suffix)
                except Exception:
                    continue

                if dest == source:
                    continue

                try:
                    source.rename(dest)
                except Exception as e:
                    logger.warning(f"Failed to rename sidecar {source} -> {dest}: {e}")

    def _delete_sidecars(self, sidecars: Dict[str, List[Path]]) -> None:
        """Delete sidecar files after they've been imported."""
        for sidecar_list in sidecars.values():
            for sidecar_path in sidecar_list:
                try:
                    if sidecar_path.exists():
                        sidecar_path.unlink()
                        self.stats["sidecars_deleted"] += 1
                except Exception as e:
                    logger.warning(f"Could not delete sidecar {sidecar_path}: {e}")

    def _cleanup_orphaned_sidecars(self) -> None:
        """Remove sidecars for non-existent files."""
        try:
            patterns = ["*.tag", "*.metadata", "*.notes"]

            for pattern in patterns:
                for sidecar_path in self.library_root.rglob(pattern):
                    base_path = sidecar_path.with_suffix("")
                    if not base_path.exists():
                        try:
                            sidecar_path.unlink()
                            self.stats["sidecars_deleted"] += 1
                        except Exception as e:
                            logger.warning(
                                f"Could not delete orphaned sidecar {sidecar_path}: {e}"
                            )
        except Exception as e:
            logger.error(f"Error cleaning up orphaned sidecars: {e}", exc_info=True)


def migrate_tags_to_db(library_root: Path, db: API_folder_store) -> int:
    """Migrate .tag files to the database."""
    migrated_count = 0

    try:
        for tags_file in library_root.rglob("*.tag"):
            try:
                base_path = tags_file.with_suffix("")
                tags_text = tags_file.read_text(encoding="utf-8")
                tags = [line.strip() for line in tags_text.splitlines() if line.strip()]

                db.save_tags(base_path, tags)
                migrated_count += 1

                try:
                    tags_file.unlink()
                    logger.info(f"Migrated and deleted {tags_file}")
                except Exception as e:
                    logger.warning(f"Migrated {tags_file} but failed to delete: {e}")
            except Exception as e:
                logger.warning(f"Failed to migrate {tags_file}: {e}")

        logger.info(f"Migrated {migrated_count} .tag files to database")
        return migrated_count
    except Exception as e:
        logger.error(f"Error during tags migration: {e}", exc_info=True)
        return migrated_count


def migrate_metadata_to_db(library_root: Path, db: API_folder_store) -> int:
    """Migrate .metadata files to the database."""
    migrated_count = 0

    try:
        for metadata_file in library_root.rglob("*.metadata"):
            try:
                base_path = Path(str(metadata_file)[:-len(".metadata")])
                metadata_text = metadata_file.read_text(encoding="utf-8")
                metadata = _parse_metadata_file(metadata_text)

                db.save_metadata(base_path, metadata)
                migrated_count += 1

                try:
                    metadata_file.unlink()
                    logger.info(f"Migrated and deleted {metadata_file}")
                except Exception as e:
                    logger.warning(f"Migrated {metadata_file} but failed to delete: {e}")
            except Exception as e:
                logger.warning(f"Failed to migrate {metadata_file}: {e}")

        logger.info(f"Migrated {migrated_count} .metadata files to database")
        return migrated_count
    except Exception as e:
        logger.error(f"Error during metadata migration: {e}", exc_info=True)
        return migrated_count


def _parse_metadata_file(content: str) -> Dict[str, Any]:
    """Parse metadata file content."""
    try:
        return json.loads(content)
    except json.JSONDecodeError:
        logger.warning("Could not parse metadata JSON, returning empty dict")
        return {}


def migrate_all(library_root: Path, db: Optional[API_folder_store] = None) -> Dict[str, int]:
    """Migrate all sidecar files to database."""
    should_close = db is None

    try:
        if db is None:
            db = API_folder_store(library_root)

        return {
            "tags": migrate_tags_to_db(library_root, db),
            "metadata": migrate_metadata_to_db(library_root, db),
        }
    finally:
        if should_close:
            db.close()


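# Example (illustrative sketch; the path is hypothetical): a one-shot migration that
# opens and closes its own connection, returning per-type counts:
#
#     counts = migrate_all(Path("~/media/library").expanduser())
#     print(f"migrated {counts['tags']} .tag and {counts['metadata']} .metadata files")

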
# ============================================================================
# SEARCH OPTIMIZATION
# ============================================================================


class LocalLibrarySearchOptimizer:
    """Optimizer that uses database for local library searches."""

    def __init__(self, library_root: Path):
        """Initialize the search optimizer."""
        self.library_root = Path(library_root)
        self.db: Optional[API_folder_store] = None

    def __enter__(self):
        """Context manager entry."""
        self.db = API_folder_store(self.library_root)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""
        if self.db:
            self.db.close()

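    # Example (illustrative sketch; the path is hypothetical): the optimizer only holds
    # a live connection inside its context, so callers wrap queries in a with-block:
    #
    #     with LocalLibrarySearchOptimizer(Path("~/media/library").expanduser()) as opt:
    #         for path in opt.search_by_tag("artist:someone", limit=25):
    #             print(path)
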
    def get_cached_tags(self, file_path: Path) -> List[str]:
        """Get tags from database cache."""
        if not self.db:
            return []
        return self.db.get_tags(file_path)

    def get_cached_metadata(self, file_path: Path) -> Optional[Dict[str, Any]]:
        """Get metadata from database cache."""
        if not self.db:
            return None
        return self.db.get_metadata(file_path)

    def prefetch_metadata(self, file_paths: List[Path]) -> None:
        """Pre-cache metadata for multiple files."""
        if not self.db:
            return

        for file_path in file_paths:
            try:
                self.db.get_or_create_file_entry(file_path)
            except Exception as e:
                logger.warning(f"Failed to prefetch {file_path}: {e}")

    def update_search_result_with_cached_data(self, search_result: Any, file_path: Path) -> None:
        """Update a search result object with cached database data."""
        if not self.db:
            return

        try:
            tags = self.db.get_tags(file_path)
            if tags:
                search_result.tag_summary = ", ".join(tags)

            metadata = self.db.get_metadata(file_path)
            if metadata:
                if "hash" in metadata:
                    search_result.hash_hex = metadata["hash"]
                if "duration" in metadata:
                    search_result.duration_seconds = metadata["duration"]
                if "media_kind" in metadata:
                    search_result.media_kind = metadata["media_kind"]
        except Exception as e:
            logger.warning(f"Failed to update search result for {file_path}: {e}")

    def search_by_tag(self, tag: str, limit: int = 100) -> List[Path]:
        """Fast tag-based search using database."""
        if not self.db:
            return []

        try:
            cursor = self.db.connection.cursor()
            # Tags are keyed by file hash in this module's schema, so join on hash.
            cursor.execute(
                """
                SELECT f.file_path
                FROM files f
                JOIN tags t ON f.hash = t.hash
                WHERE t.tag LIKE ?
                LIMIT ?
                """,
                (f"%{tag}%", limit),
            )

            return [Path(row[0]) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"Tag search failed: {e}")
            return []

    def save_playlist(self, name: str, items: List[Dict[str, Any]]) -> bool:
        """Save a playlist to the database."""
        if not self.db:
            return False
        try:
            cursor = self.db.connection.cursor()
            items_json = json.dumps(items)
            cursor.execute(
                """
                INSERT INTO playlists (name, items, updated_at)
                VALUES (?, ?, CURRENT_TIMESTAMP)
                ON CONFLICT(name) DO UPDATE SET
                    items = excluded.items,
                    updated_at = CURRENT_TIMESTAMP
                """,
                (name, items_json),
            )
            self.db.connection.commit()
            return True
        except Exception as e:
            logger.error(f"Failed to save playlist {name}: {e}")
            return False

    def get_playlists(self) -> List[Dict[str, Any]]:
        """Get all saved playlists."""
        if not self.db:
            return []
        try:
            cursor = self.db.connection.cursor()
            cursor.execute(
                "SELECT id, name, items, updated_at FROM playlists ORDER BY updated_at DESC"
            )
            results = []
            for row in cursor.fetchall():
                try:
                    items = json.loads(row["items"])
                except json.JSONDecodeError:
                    items = []
                results.append(
                    {
                        "id": row["id"],
                        "name": row["name"],
                        "items": items,
                        "updated_at": row["updated_at"],
                    }
                )
            return results
        except Exception as e:
            logger.error(f"Failed to get playlists: {e}")
            return []

    def get_playlist(self, name: str) -> Optional[List[Dict[str, Any]]]:
        """Get a specific playlist by name."""
        if not self.db:
            return None
        try:
            cursor = self.db.connection.cursor()
            cursor.execute("SELECT items FROM playlists WHERE name = ?", (name,))
            row = cursor.fetchone()
            if row:
                try:
                    return json.loads(row["items"])
                except json.JSONDecodeError:
                    return []
            return None
        except Exception as e:
            logger.error(f"Failed to get playlist {name}: {e}")
            return None

    def get_playlist_by_id(self, playlist_id: int) -> Optional[Tuple[str, List[Dict[str, Any]]]]:
        """Get a specific playlist by ID. Returns (name, items)."""
        if not self.db:
            return None
        try:
            cursor = self.db.connection.cursor()
            cursor.execute("SELECT name, items FROM playlists WHERE id = ?", (playlist_id,))
            row = cursor.fetchone()
            if row:
                try:
                    items = json.loads(row["items"])
                    return (row["name"], items)
                except json.JSONDecodeError:
                    return (row["name"], [])
            return None
        except Exception as e:
            logger.error(f"Failed to get playlist ID {playlist_id}: {e}")
            return None

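    # Example (illustrative sketch; item fields are hypothetical, `opt` is an entered
    # LocalLibrarySearchOptimizer): playlists are stored as JSON blobs, so a round trip
    # looks like:
    #
    #     opt.save_playlist("favorites", [{"hash": "abc123", "title": "Demo"}])
    #     items = opt.get_playlist("favorites")   # -> list of dicts, or None if missing
    #     all_lists = opt.get_playlists()         # -> newest-first summaries with ids
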
    def delete_playlist(self, playlist_id: int) -> bool:
        """Delete a playlist by ID."""
        if not self.db:
            return False
        try:
            cursor = self.db.connection.cursor()
            cursor.execute("DELETE FROM playlists WHERE id = ?", (playlist_id,))
            self.db.connection.commit()
            return cursor.rowcount > 0
        except Exception as e:
            logger.error(f"Failed to delete playlist ID {playlist_id}: {e}")
            return False

    def search_by_hash(self, file_hash: str) -> Optional[Path]:
        """Fast hash-based search using database."""
        if not self.db:
            return None
        return self.db.search_hash(file_hash)

    def set_relationship(
        self,
        file_path: Path,
        related_file_path: Path,
        rel_type: str = "alt",
        *,
        bidirectional: bool = True,
    ) -> None:
        """Set a relationship between two files in the database.

        Delegates to LocalLibraryDB.set_relationship().

        Args:
            file_path: Path to the first file
            related_file_path: Path to the related file
            rel_type: Type of relationship ('king', 'alt', 'related', etc.)
            bidirectional: When True (default), also record the reverse relationship
        """
        if not self.db:
            return
        self.db.set_relationship(
            file_path,
            related_file_path,
            rel_type,
            bidirectional=bidirectional,
        )

    def find_files_pointing_to(self, target_path: Path) -> List[Dict[str, Any]]:
        """Find all files that have a relationship pointing to the target path."""
        if not self.db:
            return []
        return self.db.find_files_pointing_to(target_path)