From 6e9a0c28ff58b3495c94f77292375857b68cdf89 Mon Sep 17 00:00:00 2001 From: Nose Date: Fri, 2 Jan 2026 02:28:59 -0800 Subject: [PATCH] j --- API/folder.py | 597 +++++------- CLI.py | 23 +- SYS/cmdlet_api.py | 220 +++++ SYS/metadata.py | 71 +- SYS/worker_manager.py | 85 +- Store/Folder.py | 136 ++- Store/HydrusNetwork.py | 39 +- cmdlet/_shared.py | 7 +- cmdlet/add_file.py | 2025 +++------------------------------------ cmdlet/add_note.py | 109 ++- cmdlet/download_file.py | 220 ++++- cmdlet/merge_file.py | 4 +- cmdnat/help.py | 200 +++- 13 files changed, 1402 insertions(+), 2334 deletions(-) create mode 100644 SYS/cmdlet_api.py diff --git a/API/folder.py b/API/folder.py index bf018c6..9aa1bff 100644 --- a/API/folder.py +++ b/API/folder.py @@ -15,15 +15,38 @@ import json import logging import subprocess import shutil +import time from datetime import datetime from pathlib import Path, PurePosixPath +from threading import RLock from typing import Optional, Dict, Any, List, Tuple, Set from SYS.utils import sha256_file +from SYS.logger import debug as mm_debug logger = logging.getLogger(__name__) WORKER_LOG_MAX_ENTRIES = 99 +# Helper: decorate DB write methods to retry transient SQLITE 'database is locked' errors +def _db_retry(max_attempts: int = 6, base_sleep: float = 0.1): + def _decorator(func): + def _wrapped(*args, **kwargs): + attempts = 0 + while True: + try: + return func(*args, **kwargs) + except sqlite3.OperationalError as e: + msg = str(e or "").lower() + if "database is locked" in msg and attempts < max_attempts: + attempts += 1 + sleep_time = min(base_sleep * (2 ** (attempts - 1)), 5.0) + logger.debug(f"[db_retry] database locked; retrying in {sleep_time:.2f}s ({attempts}/{max_attempts})") + time.sleep(sleep_time) + continue + raise + return _wrapped + return _decorator + # Try to import optional dependencies try: import mutagen @@ -188,6 +211,10 @@ class API_folder_store: self.library_root = Path(library_root) self.db_path = self.library_root / self.DB_NAME self.connection: Optional[sqlite3.Connection] = None + # sqlite3 connections are not safe for concurrent use across threads. + # We intentionally keep a single connection per API_folder_store instance, + # so we must serialize all DB operations on that connection. + self._db_lock = RLock() self._init_db() def _normalize_input_path(self, file_path: Path) -> Path: @@ -234,7 +261,7 @@ class API_folder_store: self.connection = sqlite3.connect( str(self.db_path), check_same_thread=False, - timeout=60.0 + timeout=5.0 ) self.connection.row_factory = sqlite3.Row @@ -242,6 +269,11 @@ class API_folder_store: self.connection.execute("PRAGMA journal_mode=WAL") # Enable foreign keys self.connection.execute("PRAGMA foreign_keys = ON") + # Bound how long sqlite will wait on locks before raising. 
+ try: + self.connection.execute("PRAGMA busy_timeout = 5000") + except Exception: + pass self._create_tables() logger.info(f"Database initialized at {self.db_path}") @@ -261,7 +293,7 @@ class API_folder_store: cursor.execute( """ - CREATE TABLE IF NOT EXISTS files ( + CREATE TABLE IF NOT EXISTS file ( hash TEXT PRIMARY KEY NOT NULL, file_path TEXT UNIQUE NOT NULL, file_modified REAL, @@ -285,19 +317,19 @@ class API_folder_store: time_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (hash) REFERENCES files(hash) ON DELETE CASCADE + FOREIGN KEY (hash) REFERENCES file(hash) ON DELETE CASCADE ) """ ) cursor.execute( """ - CREATE TABLE IF NOT EXISTS tags ( + CREATE TABLE IF NOT EXISTS tag ( id INTEGER PRIMARY KEY AUTOINCREMENT, hash TEXT NOT NULL, tag TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (hash) REFERENCES files(hash) ON DELETE CASCADE, + FOREIGN KEY (hash) REFERENCES file(hash) ON DELETE CASCADE, UNIQUE(hash, tag) ) """ @@ -305,13 +337,13 @@ class API_folder_store: cursor.execute( """ - CREATE TABLE IF NOT EXISTS notes ( + CREATE TABLE IF NOT EXISTS note ( hash TEXT NOT NULL, name TEXT NOT NULL, note TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (hash) REFERENCES files(hash) ON DELETE CASCADE, + FOREIGN KEY (hash) REFERENCES file(hash) ON DELETE CASCADE, PRIMARY KEY (hash, name) ) """ @@ -319,7 +351,7 @@ class API_folder_store: cursor.execute( """ - CREATE TABLE IF NOT EXISTS playlists ( + CREATE TABLE IF NOT EXISTS playlist ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, items TEXT NOT NULL, @@ -333,9 +365,9 @@ class API_folder_store: self._ensure_worker_tables(cursor) # Create indices for performance - cursor.execute("CREATE INDEX IF NOT EXISTS idx_files_path ON files(file_path)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_tags_hash ON tags(hash)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_tags_tag ON tags(tag)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_file_path ON file(file_path)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_tag_hash ON tag(hash)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_tag_tag ON tag(tag)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata_ext ON metadata(ext)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_worker_id ON worker(worker_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_worker_status ON worker(status)") @@ -343,12 +375,9 @@ class API_folder_store: "CREATE INDEX IF NOT EXISTS idx_worker_type ON worker(worker_type)" ) - self._migrate_metadata_schema(cursor) - self._migrate_notes_schema(cursor) - # Notes indices (after migration so columns exist) - cursor.execute("CREATE INDEX IF NOT EXISTS idx_notes_hash ON notes(hash)") - cursor.execute("CREATE INDEX IF NOT EXISTS idx_notes_name ON notes(name)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_note_hash ON note(hash)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_note_name ON note(name)") self.connection.commit() logger.debug("Database tables created/verified") @@ -535,127 +564,7 @@ class API_folder_store: exc_info=True ) - def _migrate_metadata_schema(self, cursor) -> None: - """Ensure metadata schema is up-to-date. - - - If a legacy schema is detected, attempt to import/upgrade (best-effort). - - If the hash-based schema exists, add any missing columns expected by current code. 
- """ - try: - # Check if this is a fresh new database (hash-based schema) - cursor.execute("PRAGMA table_info(metadata)") - existing_columns = {row[1] - for row in cursor.fetchall()} - - # Legacy migration: If old schema exists, try to import data. - # Old schema would have had: id (INTEGER PRIMARY KEY), file_hash (TEXT), etc. - if "hash" not in existing_columns: - if "id" in existing_columns and "file_hash" in existing_columns: - logger.info( - "Detected legacy metadata schema - importing to new hash-based schema" - ) - # This would be complex legacy migration - for now just note it. - logger.info( - "Legacy metadata table detected but import not yet implemented" - ) - return - - # Unknown/unsupported schema; nothing we can safely do here. - return - - # Hash-based schema exists: add any missing columns expected by current code. - # These are safe ALTER TABLE additions for older DBs. - column_specs = { - "size": "INTEGER", - "ext": "TEXT", - "type": "TEXT", - "url": "TEXT", - "relationships": "TEXT", - "duration": "REAL", - "time_imported": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP", - "time_modified": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP", - "created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP", - "updated_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP", - } - - for col_name, col_def in column_specs.items(): - if col_name not in existing_columns: - try: - cursor.execute( - f"ALTER TABLE metadata ADD COLUMN {col_name} {col_def}" - ) - existing_columns.add(col_name) - logger.info(f"Added '{col_name}' column to metadata table") - except Exception as e: - logger.debug(f"Column '{col_name}' may already exist: {e}") - - # Populate type column from ext if not already populated. - if "type" in existing_columns and "ext" in existing_columns: - try: - from SYS.utils_constant import get_type_from_ext - - cursor.execute( - "SELECT hash, ext FROM metadata WHERE type IS NULL OR type = ''" - ) - rows = cursor.fetchall() - for file_hash, ext in rows: - file_type = get_type_from_ext(ext or "") - cursor.execute( - "UPDATE metadata SET type = ? 
WHERE hash = ?", - (file_type, - file_hash) - ) - if rows: - logger.info( - f"Populated type column for {len(rows)} metadata entries" - ) - except Exception as e: - logger.debug(f"Could not populate type column: {e}") - - self.connection.commit() - except Exception as e: - logger.debug(f"Note: Schema import/migration completed with status: {e}") - - def _migrate_notes_schema(self, cursor) -> None: - """Migrate legacy notes schema (hash PRIMARY KEY, note) to named notes (hash,name PRIMARY KEY).""" - try: - cursor.execute("PRAGMA table_info(notes)") - cols = [row[1] for row in cursor.fetchall()] - if not cols: - return - if "name" in cols: - return - - logger.info("Migrating legacy notes table to named notes schema") - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS notes_new ( - hash TEXT NOT NULL, - name TEXT NOT NULL, - note TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - FOREIGN KEY (hash) REFERENCES files(hash) ON DELETE CASCADE, - PRIMARY KEY (hash, name) - ) - """ - ) - - # Copy existing notes into the default key - cursor.execute( - """ - INSERT INTO notes_new (hash, name, note, created_at, updated_at) - SELECT hash, 'default', note, created_at, updated_at - FROM notes - """ - ) - - cursor.execute("DROP TABLE notes") - cursor.execute("ALTER TABLE notes_new RENAME TO notes") - self.connection.commit() - except Exception as exc: - logger.debug(f"Notes schema migration skipped/failed: {exc}") - + @_db_retry() def _update_metadata_modified_time(self, file_hash: str) -> None: """Update the time_modified timestamp for a file's metadata.""" try: @@ -687,108 +596,112 @@ class API_folder_store: Returns: The file hash (primary key) """ - try: - abs_path = self._normalize_input_path(file_path) - db_path = self._to_db_file_path(abs_path) - logger.debug(f"[get_or_create_file_entry] Looking up: {db_path}") + abs_path = self._normalize_input_path(file_path) + db_path = self._to_db_file_path(abs_path) + logger.debug(f"[get_or_create_file_entry] Looking up: {db_path}") + mm_debug(f"[folder-db] get_or_create_file_entry start: {db_path}") - # If hash not provided, compute it - if not file_hash: - file_hash = sha256_file(abs_path) - logger.debug(f"[get_or_create_file_entry] Computed hash: {file_hash}") + # If hash not provided, compute it + if not file_hash: + file_hash = sha256_file(abs_path) + logger.debug(f"[get_or_create_file_entry] Computed hash: {file_hash}") + mm_debug(f"[folder-db] computed hash: {file_hash}") - cursor = self.connection.cursor() - - # Prefer existing entry by path (file_path is UNIQUE in schema). 
- cursor.execute("SELECT hash FROM files WHERE file_path = ?", - (db_path, - )) - row = cursor.fetchone() - if row and row[0]: - existing_hash = str(row[0]) - if existing_hash != file_hash: - logger.debug( - f"[get_or_create_file_entry] Found existing file_path with different hash: path={db_path} existing={existing_hash} computed={file_hash}" - ) - else: - logger.debug( - f"[get_or_create_file_entry] Found existing file_path: {db_path} -> {existing_hash}" - ) - return existing_hash - - # Check if file entry exists - cursor.execute("SELECT hash FROM files WHERE hash = ?", - (file_hash, - )) - row = cursor.fetchone() - - if row: - logger.debug( - f"[get_or_create_file_entry] Found existing file hash: {file_hash}" - ) - return file_hash - - logger.debug( - f"[get_or_create_file_entry] File entry not found, creating new one" - ) - stat = abs_path.stat() + # Retry loop for transient 'database is locked' errors + import time + max_attempts = 6 + attempt = 0 + while True: try: - cursor.execute( - """ - INSERT INTO files (hash, file_path, file_modified) - VALUES (?, ?, ?) - """, - (file_hash, - db_path, - stat.st_mtime), - ) - except sqlite3.IntegrityError: - # Most likely: UNIQUE constraint on file_path. Re-fetch and return. - cursor.execute( - "SELECT hash FROM files WHERE file_path = ?", - (db_path, - ) - ) - row2 = cursor.fetchone() - if row2 and row2[0]: - existing_hash = str(row2[0]) - logger.debug( - f"[get_or_create_file_entry] Recovered from UNIQUE(file_path): {db_path} -> {existing_hash}" + with self._db_lock: + cursor = self.connection.cursor() + + mm_debug("[folder-db] SELECT files by file_path") + + # Prefer existing entry by path (file_path is UNIQUE in schema). + cursor.execute( + "SELECT hash FROM file WHERE file_path = ?", + (db_path,), ) - return existing_hash + row = cursor.fetchone() + if row and row[0]: + existing_hash = str(row[0]) + return existing_hash + + # Check if file entry exists + mm_debug("[folder-db] SELECT files by hash") + cursor.execute( + "SELECT hash FROM file WHERE hash = ?", + (file_hash,), + ) + row = cursor.fetchone() + + if row: + return file_hash + + mm_debug(f"[folder-db] INSERT file (in_tx={self.connection.in_transaction})") + stat = abs_path.stat() + try: + cursor.execute( + """ + INSERT INTO file (hash, file_path, file_modified) + VALUES (?, ?, ?) + """, + (file_hash, db_path, stat.st_mtime), + ) + mm_debug("[folder-db] INSERT file done") + except sqlite3.IntegrityError: + # Most likely: UNIQUE constraint on file_path. Re-fetch and return. + mm_debug("[folder-db] UNIQUE(file_path) hit; re-fetch") + cursor.execute( + "SELECT hash FROM file WHERE file_path = ?", + (db_path,), + ) + row2 = cursor.fetchone() + if row2 and row2[0]: + existing_hash = str(row2[0]) + return existing_hash + raise + + # Auto-create title tag + filename_without_ext = abs_path.stem + if filename_without_ext: + # Normalize underscores to spaces for consistency + title_value = filename_without_ext.replace("_", " ").strip() + title_tag = f"title:{title_value}" + mm_debug("[folder-db] INSERT title tag") + cursor.execute( + """ + INSERT OR IGNORE INTO tag (hash, tag) + VALUES (?, ?) 
+ """, + (file_hash, title_tag), + ) + + mm_debug("[folder-db] COMMIT (file/tag)") + self.connection.commit() + mm_debug(f"[folder-db] get_or_create_file_entry done: {file_hash}") + return file_hash + + except sqlite3.OperationalError as e: + # Retry on transient locks + msg = str(e or "").lower() + if "database is locked" in msg and attempt < max_attempts: + attempt += 1 + sleep_time = min(0.5 * (2 ** (attempt - 1)), 5.0) + logger.debug(f"[get_or_create_file_entry] Database locked; retrying in {sleep_time:.2f}s (attempt {attempt}/{max_attempts})") + mm_debug(f"[folder-db] LOCKED (get_or_create); sleep {sleep_time:.2f}s ({attempt}/{max_attempts})") + time.sleep(sleep_time) + continue + logger.error(f"[get_or_create_file_entry] OperationalError: {e}", exc_info=True) raise - logger.debug( - f"[get_or_create_file_entry] Created new file entry for hash: {file_hash}" - ) - - # Auto-create title tag - filename_without_ext = abs_path.stem - if filename_without_ext: - # Normalize underscores to spaces for consistency - title_value = filename_without_ext.replace("_", " ").strip() - title_tag = f"title:{title_value}" - cursor.execute( - """ - INSERT OR IGNORE INTO tags (hash, tag) - VALUES (?, ?) - """, - (file_hash, - title_tag), + except Exception as e: + logger.error( + f"[get_or_create_file_entry] ❌ Error getting/creating file entry for {file_path}: {e}", + exc_info=True, ) - logger.debug( - f"[get_or_create_file_entry] Auto-created title tag for hash {file_hash}" - ) - - self.connection.commit() - logger.debug(f"[get_or_create_file_entry] Committed file entry {file_hash}") - return file_hash - except Exception as e: - logger.error( - f"[get_or_create_file_entry] ❌ Error getting/creating file entry for {file_path}: {e}", - exc_info=True, - ) - raise + raise def get_file_hash(self, file_path: Path) -> Optional[str]: """Get the file hash for a file path, or None if not found.""" @@ -796,7 +709,7 @@ class API_folder_store: abs_path = self._normalize_input_path(file_path) str_path = self._to_db_file_path(abs_path) cursor = self.connection.cursor() - cursor.execute("SELECT hash FROM files WHERE file_path = ?", + cursor.execute("SELECT hash FROM file WHERE file_path = ?", (str_path, )) row = cursor.fetchone() @@ -872,13 +785,13 @@ class API_folder_store: cursor = self.connection.cursor() - # Ensure both hashes exist in files table (metadata has FK to files) - cursor.execute("SELECT 1 FROM files WHERE hash = ?", + # Ensure both hashes exist in file table (metadata has FK to file) + cursor.execute("SELECT 1 FROM file WHERE hash = ?", (file_hash, )) if not cursor.fetchone(): raise ValueError(f"Hash not found in store DB: {file_hash}") - cursor.execute("SELECT 1 FROM files WHERE hash = ?", + cursor.execute("SELECT 1 FROM file WHERE hash = ?", (related_file_hash, )) if not cursor.fetchone(): @@ -975,7 +888,7 @@ class API_folder_store: """ SELECT f.hash, f.file_path, m.relationships FROM metadata m - JOIN files f ON m.hash = f.hash + JOIN file f ON m.hash = f.hash WHERE m.relationships LIKE ? 
""", (f"%{target_hash}%", @@ -1012,17 +925,17 @@ class API_folder_store: ) return [] + @_db_retry() def save_metadata(self, file_path: Path, metadata: Dict[str, Any]) -> None: """Save metadata for a file.""" try: abs_path = self._normalize_input_path(file_path) db_path = self._to_db_file_path(abs_path) logger.debug(f"[save_metadata] Starting save for: {db_path}") - + mm_debug(f"[folder-db] save_metadata start: {db_path}") file_hash = self.get_or_create_file_entry(abs_path, metadata.get("hash")) logger.debug(f"[save_metadata] Got/created file_hash: {file_hash}") - - cursor = self.connection.cursor() + mm_debug(f"[folder-db] save_metadata file_hash: {file_hash}") url = metadata.get("url", []) if not isinstance(url, str): @@ -1032,6 +945,8 @@ class API_folder_store: if not isinstance(relationships, str): relationships = json.dumps(relationships) + mm_debug("[folder-db] UPSERT metadata") + # Determine type from ext if not provided file_type = metadata.get("type") ext = metadata.get("ext") @@ -1040,36 +955,40 @@ class API_folder_store: file_type = get_type_from_ext(str(ext)) - cursor.execute( - """ - INSERT INTO metadata ( - hash, url, relationships, - duration, size, ext, type, - time_imported, time_modified + with self._db_lock: + cursor = self.connection.cursor() + cursor.execute( + """ + INSERT INTO metadata ( + hash, url, relationships, + duration, size, ext, type, + time_imported, time_modified + ) + VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT(hash) DO UPDATE SET + url = excluded.url, + relationships = excluded.relationships, + duration = excluded.duration, + size = excluded.size, + ext = excluded.ext, + type = excluded.type, + time_modified = CURRENT_TIMESTAMP, + updated_at = CURRENT_TIMESTAMP + """, + ( + file_hash, + url, + relationships, + metadata.get("duration"), + metadata.get("size"), + ext, + file_type, + ), ) - VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT(hash) DO UPDATE SET - url = excluded.url, - relationships = excluded.relationships, - duration = excluded.duration, - size = excluded.size, - ext = excluded.ext, - type = excluded.type, - time_modified = CURRENT_TIMESTAMP, - updated_at = CURRENT_TIMESTAMP - """, - ( - file_hash, - url, - relationships, - metadata.get("duration"), - metadata.get("size"), - ext, - file_type, - ), - ) - self.connection.commit() + mm_debug("[folder-db] COMMIT (metadata)") + self.connection.commit() + mm_debug(f"[folder-db] save_metadata done: {file_hash}") logger.debug(f"[save_metadata] Committed metadata for hash {file_hash}") except Exception as e: logger.error( @@ -1143,7 +1062,7 @@ class API_folder_store: # 2. Save Tags # We assume tags list is complete and includes title if needed - cursor.execute("DELETE FROM tags WHERE hash = ?", + cursor.execute("DELETE FROM tag WHERE hash = ?", (file_hash, )) @@ -1152,7 +1071,7 @@ class API_folder_store: if tag: cursor.execute( """ - INSERT OR IGNORE INTO tags (hash, tag) + INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?) """, (file_hash, @@ -1178,7 +1097,7 @@ class API_folder_store: cursor.execute( """ - SELECT t.tag FROM tags t + SELECT t.tag FROM tag t WHERE t.hash = ? 
ORDER BY t.tag """, @@ -1191,6 +1110,7 @@ class API_folder_store: logger.error(f"Error getting tags for hash {file_hash}: {e}", exc_info=True) return [] + @_db_retry() def save_tags(self, file_path: Path, tags: List[str]) -> None: """Save tags for a file, replacing all existing tags.""" try: @@ -1205,14 +1125,14 @@ class API_folder_store: cursor.execute( """ - SELECT tag FROM tags WHERE hash = ? AND tag LIKE 'title:%' + SELECT tag FROM tag WHERE hash = ? AND tag LIKE 'title:%' """, (file_hash, ), ) existing_title = cursor.fetchone() - cursor.execute("DELETE FROM tags WHERE hash = ?", + cursor.execute("DELETE FROM tag WHERE hash = ?", (file_hash, )) logger.debug(f"[save_tags] Deleted existing tags for hash {file_hash}") @@ -1225,7 +1145,7 @@ class API_folder_store: if existing_title and not new_title_provided: cursor.execute( """ - INSERT INTO tags (hash, tag) VALUES (?, ?) + INSERT INTO tag (hash, tag) VALUES (?, ?) """, (file_hash, existing_title[0]), @@ -1239,7 +1159,7 @@ class API_folder_store: title_tag = f"title:{title_value}" cursor.execute( """ - INSERT INTO tags (hash, tag) VALUES (?, ?) + INSERT INTO tag (hash, tag) VALUES (?, ?) """, (file_hash, title_tag), @@ -1251,7 +1171,7 @@ class API_folder_store: if tag: cursor.execute( """ - INSERT OR IGNORE INTO tags (hash, tag) + INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?) """, (file_hash, @@ -1262,7 +1182,7 @@ class API_folder_store: logger.debug(f"[save_tags] Committed {len(tags)} tags for hash {file_hash}") # Verify they were actually saved - cursor.execute("SELECT COUNT(*) FROM tags WHERE hash = ?", + cursor.execute("SELECT COUNT(*) FROM tag WHERE hash = ?", (file_hash, )) saved_count = cursor.fetchone()[0] @@ -1278,6 +1198,7 @@ class API_folder_store: ) raise + @_db_retry() def add_tags(self, file_path: Path, tags: List[str]) -> None: """Add tags to a file.""" try: @@ -1295,7 +1216,7 @@ class API_folder_store: if user_title_tag: cursor.execute( """ - DELETE FROM tags WHERE hash = ? AND tag LIKE 'title:%' + DELETE FROM tag WHERE hash = ? AND tag LIKE 'title:%' """, (file_hash, ), @@ -1303,7 +1224,7 @@ class API_folder_store: else: cursor.execute( """ - SELECT COUNT(*) FROM tags WHERE hash = ? AND tag LIKE 'title:%' + SELECT COUNT(*) FROM tag WHERE hash = ? AND tag LIKE 'title:%' """, (file_hash, ), @@ -1318,7 +1239,7 @@ class API_folder_store: title_tag = f"title:{title_value}" cursor.execute( """ - INSERT OR IGNORE INTO tags (hash, tag) + INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?) """, (file_hash, @@ -1330,7 +1251,7 @@ class API_folder_store: if tag: cursor.execute( """ - INSERT OR IGNORE INTO tags (hash, tag) + INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?) """, (file_hash, @@ -1344,6 +1265,7 @@ class API_folder_store: logger.error(f"Error adding tags for {file_path}: {e}", exc_info=True) raise + @_db_retry() def remove_tags(self, file_path: Path, tags: List[str]) -> None: """Remove specific tags from a file.""" try: @@ -1355,7 +1277,7 @@ class API_folder_store: if tag: cursor.execute( """ - DELETE FROM tags + DELETE FROM tag WHERE hash = ? AND tag = ? """, @@ -1369,6 +1291,7 @@ class API_folder_store: logger.error(f"Error removing tags for {file_path}: {e}", exc_info=True) raise + @_db_retry() def add_tags_to_hash(self, file_hash: str, tags: List[str]) -> None: """Add tags to a file by hash.""" try: @@ -1385,7 +1308,7 @@ class API_folder_store: if user_title_tag: cursor.execute( """ - DELETE FROM tags WHERE hash = ? AND tag LIKE 'title:%' + DELETE FROM tag WHERE hash = ? 
AND tag LIKE 'title:%' """, (file_hash, ), @@ -1396,7 +1319,7 @@ class API_folder_store: if tag: cursor.execute( """ - INSERT OR IGNORE INTO tags (hash, tag) + INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?) """, (file_hash, @@ -1410,6 +1333,7 @@ class API_folder_store: logger.error(f"Error adding tags for hash {file_hash}: {e}", exc_info=True) raise + @_db_retry() def remove_tags_from_hash(self, file_hash: str, tags: List[str]) -> None: """Remove specific tags from a file by hash.""" try: @@ -1420,7 +1344,7 @@ class API_folder_store: if tag: cursor.execute( """ - DELETE FROM tags + DELETE FROM tag WHERE hash = ? AND tag = ? """, @@ -1437,6 +1361,7 @@ class API_folder_store: ) raise + @_db_retry() def update_metadata_by_hash( self, file_hash: str, @@ -1663,7 +1588,7 @@ class API_folder_store: try: cursor = self.connection.cursor() cursor.execute( - "SELECT name, note FROM notes WHERE hash = ? ORDER BY name ASC", + "SELECT name, note FROM note WHERE hash = ? ORDER BY name ASC", (file_hash, ), ) @@ -1696,7 +1621,7 @@ class API_folder_store: cursor = self.connection.cursor() cursor.execute( """ - INSERT INTO notes (hash, name, note) + INSERT INTO note (hash, name, note) VALUES (?, ?, ?) ON CONFLICT(hash, name) DO UPDATE SET note = excluded.note, @@ -1720,7 +1645,7 @@ class API_folder_store: raise ValueError("Note name is required") cursor = self.connection.cursor() cursor.execute( - "DELETE FROM notes WHERE hash = ? AND name = ?", + "DELETE FROM note WHERE hash = ? AND name = ?", (file_hash, note_name), ) @@ -1739,8 +1664,8 @@ class API_folder_store: cursor.execute( """ - SELECT DISTINCT f.hash, f.file_path FROM files f - JOIN tags t ON f.hash = t.hash + SELECT DISTINCT f.hash, f.file_path FROM file f + JOIN tag t ON f.hash = t.hash WHERE t.tag = ? LIMIT ? """, @@ -1769,7 +1694,7 @@ class API_folder_store: cursor.execute( """ - SELECT file_path FROM files WHERE hash = ? + SELECT file_path FROM file WHERE hash = ? """, (file_hash, ), @@ -1801,7 +1726,7 @@ class API_folder_store: cursor.execute( """ - UPDATE files SET file_path = ?, updated_at = CURRENT_TIMESTAMP + UPDATE file SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE file_path = ? 
""", (str_new_path, @@ -1821,7 +1746,7 @@ class API_folder_store: """Remove entries for files that no longer exist.""" try: cursor = self.connection.cursor() - cursor.execute("SELECT hash, file_path FROM files") + cursor.execute("SELECT hash, file_path FROM file") removed_count = 0 for file_hash, file_path in cursor.fetchall(): @@ -1830,7 +1755,7 @@ class API_folder_store: except Exception: abs_path = Path(file_path) if not abs_path.exists(): - cursor.execute("DELETE FROM files WHERE hash = ?", + cursor.execute("DELETE FROM file WHERE hash = ?", (file_hash, )) removed_count += 1 @@ -1855,7 +1780,7 @@ class API_folder_store: cursor = self.connection.cursor() # Get the hash first (for logging) - cursor.execute("SELECT hash FROM files WHERE file_path = ?", + cursor.execute("SELECT hash FROM file WHERE file_path = ?", (str_path, )) row = cursor.fetchone() @@ -1932,7 +1857,7 @@ class API_folder_store: pass # Delete the file entry (cascades to metadata, tags, notes, etc via foreign keys) - cursor.execute("DELETE FROM files WHERE file_path = ?", + cursor.execute("DELETE FROM file WHERE file_path = ?", (str_path, )) self.connection.commit() @@ -2436,7 +2361,7 @@ class DatabaseAPI: """Get file hash from the database, or None if not found.""" cursor = self.get_cursor() cursor.execute( - "SELECT hash FROM files WHERE LOWER(hash) = ?", + "SELECT hash FROM file WHERE LOWER(hash) = ?", (file_hash.lower(), ) ) @@ -2446,7 +2371,7 @@ class DatabaseAPI: def get_all_file_hashes(self) -> Set[str]: """Get all file hashes in the database.""" cursor = self.get_cursor() - cursor.execute("SELECT hash FROM files") + cursor.execute("SELECT hash FROM file") return {row[0] for row in cursor.fetchall()} @@ -2456,8 +2381,8 @@ class DatabaseAPI: cursor.execute( """ SELECT DISTINCT f.hash, t.tag - FROM files f - JOIN tags t ON f.hash = t.hash + FROM file f + JOIN tag t ON f.hash = t.hash WHERE LOWER(t.tag) LIKE ? """, (query_pattern, @@ -2469,7 +2394,7 @@ class DatabaseAPI: """Get hashes of files matching a path pattern.""" cursor = self.get_cursor() cursor.execute( - "SELECT DISTINCT hash FROM files WHERE LOWER(file_path) LIKE ?", + "SELECT DISTINCT hash FROM file WHERE LOWER(file_path) LIKE ?", (like_pattern, ) ) @@ -2482,8 +2407,8 @@ class DatabaseAPI: cursor.execute( """ SELECT DISTINCT f.hash - FROM files f - JOIN tags t ON f.hash = t.hash + FROM file f + JOIN tag t ON f.hash = t.hash WHERE LOWER(t.tag) LIKE ? """, (like_pattern, @@ -2498,7 +2423,7 @@ class DatabaseAPI: cursor.execute( """ SELECT DISTINCT f.hash - FROM files f + FROM file f JOIN metadata m ON f.hash = m.hash WHERE m.url IS NOT NULL AND TRIM(m.url) != '' @@ -2521,7 +2446,7 @@ class DatabaseAPI: cursor.execute( """ SELECT DISTINCT f.hash - FROM files f + FROM file f JOIN metadata m ON f.hash = m.hash WHERE m.url IS NOT NULL AND LOWER(m.url) LIKE ? @@ -2556,7 +2481,7 @@ class DatabaseAPI: cursor.execute( """ SELECT DISTINCT f.hash - FROM files f + FROM file f JOIN metadata m ON f.hash = m.hash WHERE LOWER(LTRIM(COALESCE(m.ext, ''), '.')) LIKE ? ESCAPE '\\' LIMIT ? @@ -2568,7 +2493,7 @@ class DatabaseAPI: cursor.execute( """ SELECT DISTINCT f.hash - FROM files f + FROM file f JOIN metadata m ON f.hash = m.hash WHERE LOWER(LTRIM(COALESCE(m.ext, ''), '.')) = ? LIMIT ? 
@@ -2597,7 +2522,7 @@ class DatabaseAPI: SELECT f.hash, f.file_path, COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM files f + FROM file f JOIN metadata m ON f.hash = m.hash WHERE LOWER(LTRIM(COALESCE(m.ext, ''), '.')) = ? ORDER BY f.file_path @@ -2619,7 +2544,7 @@ class DatabaseAPI: SELECT f.hash, f.file_path, COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM files f + FROM file f JOIN metadata m ON f.hash = m.hash WHERE m.url IS NOT NULL AND TRIM(m.url) != '' @@ -2645,7 +2570,7 @@ class DatabaseAPI: SELECT f.hash, f.file_path, COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM files f + FROM file f JOIN metadata m ON f.hash = m.hash WHERE m.url IS NOT NULL AND LOWER(m.url) LIKE ? @@ -2667,9 +2592,9 @@ class DatabaseAPI: placeholders = ",".join(["?"] * len(file_hashes)) fetch_sql = f""" SELECT hash, file_path, - COALESCE((SELECT size FROM metadata WHERE hash = files.hash), 0) as size, - COALESCE((SELECT ext FROM metadata WHERE hash = files.hash), '') as ext - FROM files + COALESCE((SELECT size FROM metadata WHERE hash = file.hash), 0) as size, + COALESCE((SELECT ext FROM metadata WHERE hash = file.hash), '') as ext + FROM file WHERE hash IN ({placeholders}) ORDER BY file_path LIMIT ? @@ -2685,7 +2610,7 @@ class DatabaseAPI: SELECT f.hash, f.file_path, COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM files f + FROM file f ORDER BY file_path LIMIT ? """, @@ -2697,7 +2622,7 @@ class DatabaseAPI: def get_tags_for_file(self, file_hash: str) -> List[str]: """Get all tags for a file given its hash.""" cursor = self.get_cursor() - cursor.execute("SELECT tag FROM tags WHERE hash = ?", + cursor.execute("SELECT tag FROM tag WHERE hash = ?", (file_hash, )) return [row[0] for row in cursor.fetchall()] @@ -2709,7 +2634,7 @@ class DatabaseAPI: cursor = self.get_cursor() cursor.execute( """ - SELECT DISTINCT tag FROM tags + SELECT DISTINCT tag FROM tag WHERE hash = ? AND LOWER(tag) LIKE ? """, @@ -2730,8 +2655,8 @@ class DatabaseAPI: SELECT DISTINCT f.hash, f.file_path, COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM files f - JOIN tags t ON f.hash = t.hash + FROM file f + JOIN tag t ON f.hash = t.hash WHERE LOWER(t.tag) LIKE ? ORDER BY f.file_path LIMIT ? @@ -2753,8 +2678,8 @@ class DatabaseAPI: SELECT DISTINCT f.hash, f.file_path, COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM files f - JOIN tags t ON f.hash = t.hash + FROM file f + JOIN tag t ON f.hash = t.hash WHERE LOWER(t.tag) LIKE ? AND LOWER(t.tag) NOT LIKE '%:%' ORDER BY f.file_path LIMIT ? @@ -2777,7 +2702,7 @@ class DatabaseAPI: SELECT DISTINCT f.hash, f.file_path, COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM files f + FROM file f WHERE {where_clause} ORDER BY f.file_path LIMIT ? 
@@ -2797,8 +2722,8 @@ class DatabaseAPI: SELECT DISTINCT f.hash, f.file_path, COALESCE((SELECT size FROM metadata WHERE hash = f.hash), 0) as size, COALESCE((SELECT ext FROM metadata WHERE hash = f.hash), '') as ext - FROM files f - JOIN tags t ON f.hash = t.hash + FROM file f + JOIN tag t ON f.hash = t.hash WHERE LOWER(t.tag) LIKE ? ORDER BY f.file_path LIMIT ? @@ -2859,7 +2784,7 @@ class LocalLibraryInitializer: try: cursor = self.db.connection.cursor() - cursor.execute("SELECT COUNT(*) FROM files") + cursor.execute("SELECT COUNT(*) FROM file") row = cursor.fetchone() self.stats["files_total_db"] = int( row[0] @@ -2924,7 +2849,7 @@ class LocalLibraryInitializer: try: cursor = self.db.connection.cursor() cursor.execute( - "UPDATE files SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?", + "UPDATE file SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?", (self.db._to_db_file_path(target_path), file_hash), ) @@ -2966,7 +2891,7 @@ class LocalLibraryInitializer: try: cursor = self.db.connection.cursor() cursor.execute( - "UPDATE files SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?", + "UPDATE file SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?", (self.db._to_db_file_path(target_path), file_hash), ) @@ -3036,7 +2961,7 @@ class LocalLibraryInitializer: """Get existing files from database by normalized path, returns {normalized_path: hash}.""" try: cursor = self.db.connection.cursor() - cursor.execute("SELECT hash, file_path FROM files") + cursor.execute("SELECT hash, file_path FROM file") result = {} for file_hash, file_path in cursor.fetchall(): @@ -3065,7 +2990,7 @@ class LocalLibraryInitializer: file_hash = sha256_file(file_path) try: cursor = self.db.connection.cursor() - cursor.execute("SELECT 1 FROM files WHERE hash = ?", + cursor.execute("SELECT 1 FROM file WHERE hash = ?", (file_hash, )) exists_by_hash = cursor.fetchone() is not None @@ -3502,8 +3427,8 @@ class LocalLibrarySearchOptimizer: cursor.execute( """ SELECT f.file_path - FROM files f - JOIN tags t ON f.id = t.file_id + FROM file f + JOIN tag t ON f.hash = t.hash WHERE t.tag LIKE ? LIMIT ? 
""", @@ -3511,7 +3436,7 @@ class LocalLibrarySearchOptimizer: limit), ) - return [Path(row[0]) for row in cursor.fetchall()] + return [self.db._from_db_file_path(row[0]) for row in cursor.fetchall()] except Exception as e: logger.error(f"Tag search failed: {e}") return [] @@ -3525,7 +3450,7 @@ class LocalLibrarySearchOptimizer: items_json = json.dumps(items) cursor.execute( """ - INSERT INTO playlists (name, items, updated_at) + INSERT INTO playlist (name, items, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP) ON CONFLICT(name) DO UPDATE SET items = excluded.items, @@ -3547,7 +3472,7 @@ class LocalLibrarySearchOptimizer: try: cursor = self.db.connection.cursor() cursor.execute( - "SELECT id, name, items, updated_at FROM playlists ORDER BY updated_at DESC" + "SELECT id, name, items, updated_at FROM playlist ORDER BY updated_at DESC" ) results = [] for row in cursor.fetchall(): @@ -3574,7 +3499,7 @@ class LocalLibrarySearchOptimizer: return None try: cursor = self.db.connection.cursor() - cursor.execute("SELECT items FROM playlists WHERE name = ?", + cursor.execute("SELECT items FROM playlist WHERE name = ?", (name, )) row = cursor.fetchone() @@ -3598,7 +3523,7 @@ class LocalLibrarySearchOptimizer: try: cursor = self.db.connection.cursor() cursor.execute( - "SELECT name, items FROM playlists WHERE id = ?", + "SELECT name, items FROM playlist WHERE id = ?", (playlist_id, ) ) @@ -3620,7 +3545,7 @@ class LocalLibrarySearchOptimizer: return False try: cursor = self.db.connection.cursor() - cursor.execute("DELETE FROM playlists WHERE id = ?", + cursor.execute("DELETE FROM playlist WHERE id = ?", (playlist_id, )) self.db.connection.commit() diff --git a/CLI.py b/CLI.py index 5d45ed1..9730b28 100644 --- a/CLI.py +++ b/CLI.py @@ -3824,9 +3824,28 @@ class PipelineExecutor: # Fall back to default selection rendering on any failure. pass + items = piped_result if isinstance(piped_result, list) else [piped_result] + + # Special-case: selecting notes should show the text content directly. 
+ note_like_items = [ + i for i in items + if isinstance(i, dict) and ("note_text" in i or "note" in i) + ] + if note_like_items: + for idx, item in enumerate(note_like_items, 1): + note_name = str( + item.get("note_name") + or item.get("name") + or f"note {idx}" + ).strip() + note_text = str(item.get("note_text") or item.get("note") or "") + note_text = note_text[:999] + stdout_console().print() + stdout_console().print(f"{note_name}:\n{note_text}") + ctx.set_last_result_items_only(items) + return + table = ResultTable("Selection Result") - items = piped_result if isinstance(piped_result, - list) else [piped_result] for item in items: table.add_result(item) ctx.set_last_result_items_only(items) diff --git a/SYS/cmdlet_api.py b/SYS/cmdlet_api.py new file mode 100644 index 0000000..4a5f1a1 --- /dev/null +++ b/SYS/cmdlet_api.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +import contextlib +import io +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional, Sequence + +from SYS import pipeline as ctx +from SYS.models import PipelineStageContext +from SYS.rich_display import capture_rich_output + + +CmdletCallable = Callable[[Any, Sequence[str], Dict[str, Any]], int] + + +@dataclass(slots=True) +class CmdletRunResult: + """Programmatic result for a single cmdlet invocation.""" + + name: str + args: Sequence[str] + exit_code: int = 0 + emitted: List[Any] = field(default_factory=list) + + # Best-effort: cmdlets can publish tables/items via pipeline state even when + # they don't emit pipeline items. + result_table: Optional[Any] = None + result_items: List[Any] = field(default_factory=list) + result_subject: Optional[Any] = None + + stdout: str = "" + stderr: str = "" + error: Optional[str] = None + + +def _normalize_cmd_name(name: str) -> str: + return str(name or "").replace("_", "-").strip().lower() + + +def resolve_cmdlet(cmd_name: str) -> Optional[CmdletCallable]: + """Resolve a cmdlet callable by name from the registry (aliases supported).""" + try: + from SYS.cmdlet_catalog import ensure_registry_loaded + + ensure_registry_loaded() + except Exception: + pass + + try: + import cmdlet as cmdlet_pkg + + return cmdlet_pkg.get(cmd_name) + except Exception: + return None + + +def run_cmdlet( + cmd: str | CmdletCallable, + args: Sequence[str] | None, + config: Dict[str, Any], + *, + piped: Any = None, + isolate: bool = True, + capture_output: bool = True, + stage_index: int = 0, + total_stages: int = 1, + pipe_index: Optional[int] = None, + worker_id: Optional[str] = None, +) -> CmdletRunResult: + """Run a single cmdlet programmatically and return structured results. + + This is intended for TUI/webapp consumers that want cmdlet behavior without + going through the interactive CLI loop. + + Notes: + - When `isolate=True` (default) this runs inside `ctx.new_pipeline_state()` so + global CLI pipeline state is not mutated. + - Output capturing covers both normal `print()` and Rich output via + `capture_rich_output()`. 
+ """ + + normalized_args: Sequence[str] = list(args or []) + + if isinstance(cmd, str): + name = _normalize_cmd_name(cmd) + cmd_fn = resolve_cmdlet(name) + else: + name = getattr(cmd, "__name__", "cmdlet") + cmd_fn = cmd + + result = CmdletRunResult(name=name, args=normalized_args) + + if not callable(cmd_fn): + result.exit_code = 1 + result.error = f"Unknown command: {name}" + result.stderr = result.error + return result + + stage_ctx = PipelineStageContext( + stage_index=int(stage_index), + total_stages=int(total_stages), + pipe_index=pipe_index, + worker_id=worker_id, + ) + + stdout_buffer = io.StringIO() + stderr_buffer = io.StringIO() + + stage_text = " ".join([name, *list(normalized_args)]).strip() + + state_cm = ctx.new_pipeline_state() if isolate else contextlib.nullcontext() + + with state_cm: + # Keep behavior predictable: start from a clean slate. + try: + ctx.reset() + except Exception: + pass + + try: + ctx.set_stage_context(stage_ctx) + except Exception: + pass + + try: + ctx.set_current_cmdlet_name(name) + except Exception: + pass + + try: + ctx.set_current_stage_text(stage_text) + except Exception: + pass + + try: + ctx.set_current_command_text(stage_text) + except Exception: + pass + + try: + run_cm = ( + capture_rich_output(stdout=stdout_buffer, stderr=stderr_buffer) + if capture_output + else contextlib.nullcontext() + ) + with run_cm: + with ( + contextlib.redirect_stdout(stdout_buffer) + if capture_output + else contextlib.nullcontext() + ): + with ( + contextlib.redirect_stderr(stderr_buffer) + if capture_output + else contextlib.nullcontext() + ): + result.exit_code = int(cmd_fn(piped, list(normalized_args), config)) + except Exception as exc: + result.exit_code = 1 + result.error = f"{type(exc).__name__}: {exc}" + finally: + result.stdout = stdout_buffer.getvalue() + result.stderr = stderr_buffer.getvalue() + + # Prefer cmdlet emits (pipeline semantics). + try: + result.emitted = list(stage_ctx.emits or []) + except Exception: + result.emitted = [] + + # Mirror CLI behavior: if cmdlet emitted items and there is no overlay table, + # make emitted items the last result items for downstream consumers. + try: + has_overlay = bool(ctx.get_display_table()) + except Exception: + has_overlay = False + + if result.emitted and not has_overlay: + try: + ctx.set_last_result_items_only(list(result.emitted)) + except Exception: + pass + + # Best-effort snapshot of visible results. + try: + result.result_table = ( + ctx.get_display_table() or ctx.get_current_stage_table() or ctx.get_last_result_table() + ) + except Exception: + result.result_table = None + + try: + result.result_items = list(ctx.get_last_result_items() or []) + except Exception: + result.result_items = [] + + try: + result.result_subject = ctx.get_last_result_subject() + except Exception: + result.result_subject = None + + # Cleanup stage-local markers. + try: + ctx.clear_current_stage_text() + except Exception: + pass + try: + ctx.clear_current_cmdlet_name() + except Exception: + pass + try: + ctx.clear_current_command_text() + except Exception: + pass + try: + ctx.set_stage_context(None) + except Exception: + pass + + return result diff --git a/SYS/metadata.py b/SYS/metadata.py index c2d4ba9..c9f1623 100644 --- a/SYS/metadata.py +++ b/SYS/metadata.py @@ -512,36 +512,46 @@ def import_pending_sidecars(db_root: Path, db: Any) -> None: if not base_path.exists(): continue - # Ensure file entry exists - file_id: Optional[int] = None + # Ensure file entry exists (folder store schema is keyed by hash). 
+ file_hash_value: Optional[str] = None + if sha256_file and base_path.exists(): + try: + file_hash_value = sha256_file(base_path) + except Exception: + file_hash_value = None + + if not file_hash_value: + continue + + try: + db_file_path = ( + db._to_db_file_path(base_path) # type: ignore[attr-defined] + if hasattr(db, "_to_db_file_path") + else str(base_path) + ) + except Exception: + db_file_path = str(base_path) + + try: + file_modified = float(base_path.stat().st_mtime) + except Exception: + file_modified = None + try: cursor = db.connection.cursor() if db.connection else None if cursor: cursor.execute( - "SELECT id FROM files WHERE file_path = ?", - (str(base_path), - ) + "SELECT hash FROM file WHERE file_path = ?", + (str(db_file_path),), ) result = cursor.fetchone() - file_id = result[0] if result else None - except Exception: - file_id = None - - if not file_id: - try: - cursor = db.connection.cursor() if db.connection else None - if cursor: + if not result: cursor.execute( - 'INSERT INTO files (file_path, indexed_at, updated_at) VALUES (?, datetime("now"), datetime("now"))', - (str(base_path), - ), + "INSERT INTO file (hash, file_path, file_modified) VALUES (?, ?, ?)", + (file_hash_value, str(db_file_path), file_modified), ) db.connection.commit() - file_id = cursor.lastrowid - except Exception: - continue - - if not file_id: + except Exception: continue if sidecar_path.suffix == ".tag": @@ -557,15 +567,9 @@ def import_pending_sidecars(db_root: Path, db: Any) -> None: try: cursor = db.connection.cursor() if db.connection else None if cursor: - file_hash_value: Optional[str] = None - if hasattr(db, "get_file_hash"): - try: - file_hash_value = db.get_file_hash(file_id) - except Exception: - file_hash_value = None for tag in tags: cursor.execute( - "INSERT OR IGNORE INTO tags (hash, tag) VALUES (?, ?)", + "INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?)", (file_hash_value, tag), ) @@ -608,13 +612,15 @@ def import_pending_sidecars(db_root: Path, db: Any) -> None: except Exception: pass + if not hash_value: + hash_value = file_hash_value + try: cursor = db.connection.cursor() if db.connection else None if cursor: cursor.execute( - 'INSERT OR REPLACE INTO metadata (file_id, hash, url, relationships, time_imported, time_modified) VALUES (?, ?, ?, ?, datetime("now"), datetime("now"))', + 'INSERT OR REPLACE INTO metadata (hash, url, relationships, time_imported, time_modified) VALUES (?, ?, ?, datetime("now"), datetime("now"))', ( - file_id, hash_value, json.dumps(url), json.dumps(relationships), @@ -634,9 +640,8 @@ def import_pending_sidecars(db_root: Path, db: Any) -> None: cursor = db.connection.cursor() if db.connection else None if cursor: cursor.execute( - 'INSERT INTO notes (file_id, note, created_at, updated_at) VALUES (?, ?, datetime("now"), datetime("now")) ON CONFLICT(file_id) DO UPDATE SET note = excluded.note, updated_at = datetime("now")', - (file_id, - content), + 'INSERT INTO note (hash, name, note, created_at, updated_at) VALUES (?, ?, ?, datetime("now"), datetime("now")) ON CONFLICT(hash, name) DO UPDATE SET note = excluded.note, updated_at = datetime("now")', + (file_hash_value, "default", content), ) db.connection.commit() except Exception: diff --git a/SYS/worker_manager.py b/SYS/worker_manager.py index ddcc209..d95ca16 100644 --- a/SYS/worker_manager.py +++ b/SYS/worker_manager.py @@ -263,6 +263,9 @@ class WorkerManager: self.refresh_thread: Optional[Thread] = None self._stop_refresh = False self._lock = Lock() + # Reuse the DB's own lock so there is exactly one 
lock guarding the + # sqlite connection (and it is safe for re-entrant/nested DB usage). + self._db_lock = self.db._db_lock self.worker_handlers: Dict[str, WorkerLoggingHandler] = {} # Track active handlers self._worker_last_step: Dict[str, @@ -272,7 +275,8 @@ class WorkerManager: """Close the database connection.""" if self.db: try: - self.db.close() + with self._db_lock: + self.db.close() except Exception: pass @@ -317,12 +321,13 @@ class WorkerManager: Count of workers updated. """ try: - return self.db.expire_running_workers( - older_than_seconds=older_than_seconds, - status=status, - reason=reason, - worker_id_prefix=worker_id_prefix, - ) + with self._db_lock: + return self.db.expire_running_workers( + older_than_seconds=older_than_seconds, + status=status, + reason=reason, + worker_id_prefix=worker_id_prefix, + ) except Exception as exc: logger.error(f"Failed to expire stale workers: {exc}", exc_info=True) return 0 @@ -419,14 +424,15 @@ class WorkerManager: True if worker was inserted successfully """ try: - result = self.db.insert_worker( - worker_id, - worker_type, - title, - description, - total_steps, - pipe=pipe - ) + with self._db_lock: + result = self.db.insert_worker( + worker_id, + worker_type, + title, + description, + total_steps, + pipe=pipe + ) if result > 0: logger.debug( f"[WorkerManager] Tracking worker: {worker_id} ({worker_type})" @@ -473,7 +479,8 @@ class WorkerManager: kwargs["last_updated"] = datetime.now().isoformat() if "current_step" in kwargs and kwargs["current_step"]: self._worker_last_step[worker_id] = str(kwargs["current_step"]) - return self.db.update_worker(worker_id, **kwargs) + with self._db_lock: + return self.db.update_worker(worker_id, **kwargs) return True except Exception as e: logger.error( @@ -510,7 +517,8 @@ class WorkerManager: if result_data: kwargs["result_data"] = result_data - success = self.db.update_worker(worker_id, **kwargs) + with self._db_lock: + success = self.db.update_worker(worker_id, **kwargs) logger.info(f"[WorkerManager] Worker finished: {worker_id} ({result})") self._worker_last_step.pop(worker_id, None) return success @@ -528,7 +536,8 @@ class WorkerManager: List of active worker dictionaries """ try: - return self.db.get_active_workers() + with self._db_lock: + return self.db.get_active_workers() except Exception as e: logger.error( f"[WorkerManager] Error getting active workers: {e}", @@ -546,7 +555,8 @@ class WorkerManager: List of finished worker dictionaries """ try: - all_workers = self.db.get_all_workers(limit=limit) + with self._db_lock: + all_workers = self.db.get_all_workers(limit=limit) # Filter to only finished workers finished = [ w for w in all_workers @@ -570,7 +580,8 @@ class WorkerManager: Worker data or None if not found """ try: - return self.db.get_worker(worker_id) + with self._db_lock: + return self.db.get_worker(worker_id) except Exception as e: logger.error( f"[WorkerManager] Error getting worker {worker_id}: {e}", @@ -583,7 +594,8 @@ class WorkerManager: limit: int = 500) -> List[Dict[str, Any]]: """Fetch recorded worker timeline events.""" - return self.db.get_worker_events(worker_id, limit) + with self._db_lock: + return self.db.get_worker_events(worker_id, limit) def log_step(self, worker_id: str, step_text: str) -> bool: """Log a step to a worker's step history. 
@@ -596,7 +608,8 @@ class WorkerManager: True if successful """ try: - success = self.db.append_worker_steps(worker_id, step_text) + with self._db_lock: + success = self.db.append_worker_steps(worker_id, step_text) if success: self._worker_last_step[worker_id] = step_text return success @@ -621,7 +634,8 @@ class WorkerManager: Steps text or empty string if not found """ try: - return self.db.get_worker_steps(worker_id) + with self._db_lock: + return self.db.get_worker_steps(worker_id) except Exception as e: logger.error( f"[WorkerManager] Error getting steps for worker {worker_id}: {e}", @@ -705,7 +719,8 @@ class WorkerManager: Number of workers deleted """ try: - count = self.db.cleanup_old_workers(days) + with self._db_lock: + count = self.db.cleanup_old_workers(days) if count > 0: logger.info(f"[WorkerManager] Cleaned up {count} old workers") return count @@ -729,12 +744,13 @@ class WorkerManager: """ try: step_label = self._get_last_step(worker_id) - return self.db.append_worker_stdout( - worker_id, - text, - step=step_label, - channel=channel - ) + with self._db_lock: + return self.db.append_worker_stdout( + worker_id, + text, + step=step_label, + channel=channel + ) except Exception as e: logger.error(f"[WorkerManager] Error appending stdout: {e}", exc_info=True) return False @@ -749,7 +765,8 @@ class WorkerManager: Worker's stdout or empty string """ try: - return self.db.get_worker_stdout(worker_id) + with self._db_lock: + return self.db.get_worker_stdout(worker_id) except Exception as e: logger.error(f"[WorkerManager] Error getting stdout: {e}", exc_info=True) return "" @@ -773,7 +790,8 @@ class WorkerManager: True if clear was successful """ try: - return self.db.clear_worker_stdout(worker_id) + with self._db_lock: + return self.db.clear_worker_stdout(worker_id) except Exception as e: logger.error(f"[WorkerManager] Error clearing stdout: {e}", exc_info=True) return False @@ -781,5 +799,6 @@ class WorkerManager: def close(self) -> None: """Close the worker manager and database connection.""" self.stop_auto_refresh() - self.db.close() + with self._db_lock: + self.db.close() logger.info("[WorkerManager] Closed") diff --git a/Store/Folder.py b/Store/Folder.py index 95a7674..fb29875 100644 --- a/Store/Folder.py +++ b/Store/Folder.py @@ -60,7 +60,7 @@ class Folder(Store): if location is None and PATH is not None: location = str(PATH) - self._location = location + self._location = str(location) if location is not None else "" self._name = name # Scan status (set during init) @@ -221,7 +221,7 @@ class Folder(Store): # Ensure DB points to the renamed path (update by hash). 
try: cursor.execute( - "UPDATE files SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?", + "UPDATE file SET file_path = ?, updated_at = CURRENT_TIMESTAMP WHERE hash = ?", (db._to_db_file_path(hash_path), file_hash), ) @@ -269,9 +269,9 @@ class Folder(Store): cursor.execute( """ SELECT f.hash, f.file_path - FROM files f + FROM file f WHERE NOT EXISTS ( - SELECT 1 FROM tags t WHERE t.hash = f.hash AND LOWER(t.tag) LIKE 'title:%' + SELECT 1 FROM tag t WHERE t.hash = f.hash AND LOWER(t.tag) LIKE 'title:%' ) """ ) @@ -298,7 +298,7 @@ class Folder(Store): # Third pass: discover files on disk that aren't in the database yet # These are hash-named files that were added after initial indexing - cursor.execute("SELECT LOWER(hash) FROM files") + cursor.execute("SELECT LOWER(hash) FROM file") db_hashes = {row[0] for row in cursor.fetchall()} @@ -484,10 +484,17 @@ class Folder(Store): except Exception: duration_value = None - # Save to database + # Save to database (metadata + tag/url updates share one connection) with API_folder_store(Path(self._location)) as db: - db.get_or_create_file_entry(save_file) - # Save metadata including extension + conn = getattr(db, "connection", None) + if conn is None: + raise RuntimeError("Folder store DB connection unavailable") + cursor = conn.cursor() + + debug( + f"[Folder.add_file] saving metadata for hash {file_hash}", + file=sys.stderr, + ) ext_clean = file_ext.lstrip(".") if file_ext else "" db.save_metadata( save_file, @@ -498,14 +505,77 @@ class Folder(Store): "duration": duration_value, }, ) + debug( + f"[Folder.add_file] metadata stored for hash {file_hash}", + file=sys.stderr, + ) - # Add tags if provided - if tag_list: - self.add_tag(file_hash, tag_list) + if tag_list: + try: + debug( + f"[Folder.add_file] merging {len(tag_list)} tags for {file_hash}", + file=sys.stderr, + ) + from SYS.metadata import compute_namespaced_tag_overwrite - # Add url if provided - if url: - self.add_url(file_hash, url) + existing_tags = [ + t for t in (db.get_tags(file_hash) or []) + if isinstance(t, str) and t.strip() + ] + _to_remove, _to_add, merged = compute_namespaced_tag_overwrite( + existing_tags, tag_list or [] + ) + if _to_remove or _to_add: + cursor.execute("DELETE FROM tag WHERE hash = ?", + (file_hash,)) + for t in merged: + tag_val = str(t).strip().lower() + if tag_val: + cursor.execute( + "INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?)", + (file_hash, tag_val), + ) + conn.commit() + debug( + f"[Folder.add_file] tags rewritten for {file_hash}", + file=sys.stderr, + ) + try: + db._update_metadata_modified_time(file_hash) + except Exception: + pass + except Exception as exc: + debug(f"Local DB tag merge failed: {exc}", file=sys.stderr) + + if url: + try: + debug( + f"[Folder.add_file] merging {len(url)} URLs for {file_hash}", + file=sys.stderr, + ) + from SYS.metadata import normalize_urls + + existing_meta = db.get_metadata(file_hash) or {} + existing_urls = normalize_urls(existing_meta.get("url")) + incoming_urls = normalize_urls(url) + changed = False + for entry in list(incoming_urls or []): + if not entry: + continue + if entry not in existing_urls: + existing_urls.append(entry) + changed = True + if changed: + db.update_metadata_by_hash( + file_hash, + {"url": existing_urls}, + ) + debug( + f"[Folder.add_file] URLs merged for {file_hash}", + file=sys.stderr, + ) + except Exception as exc: + debug(f"Local DB URL merge failed: {exc}", file=sys.stderr) ##log(f"✓ Added to local storage: {save_file.name}", file=sys.stderr) return file_hash @@ 
-1373,6 +1443,34 @@ class Folder(Store): debug(f"Failed to get metadata for hash {file_hash}: {exc}") return None + def set_relationship(self, alt_hash: str, king_hash: str, kind: str = "alt") -> bool: + """Persist a relationship in the folder store DB. + + This is a thin wrapper around the folder DB API so cmdlets can avoid + backend-specific branching. + """ + try: + if not self._location: + return False + + alt_norm = _normalize_hash(alt_hash) + king_norm = _normalize_hash(king_hash) + if not alt_norm or not king_norm or alt_norm == king_norm: + return False + + from API.folder import API_folder_store + + with API_folder_store(Path(self._location).expanduser()) as db: + db.set_relationship_by_hash( + alt_norm, + king_norm, + str(kind or "alt"), + bidirectional=False, + ) + return True + except Exception: + return False + def get_tag(self, file_identifier: str, **kwargs: Any) -> Tuple[List[str], str]: """Get tags for a local file by hash. @@ -1432,14 +1530,14 @@ class Folder(Store): # Folder DB tag table is case-sensitive and add_tags_to_hash() is additive. # To enforce lowercase-only tags and namespace overwrites, rewrite the full tag set. cursor = db.connection.cursor() - cursor.execute("DELETE FROM tags WHERE hash = ?", + cursor.execute("DELETE FROM tag WHERE hash = ?", (hash, )) for t in merged: t = str(t).strip().lower() if t: cursor.execute( - "INSERT OR IGNORE INTO tags (hash, tag) VALUES (?, ?)", + "INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?)", (hash, t), ) @@ -1953,7 +2051,7 @@ class Folder(Store): placeholders = ",".join(["?"] * len(chunk)) try: cursor.execute( - f"SELECT hash, file_path FROM files WHERE hash IN ({placeholders})", + f"SELECT hash, file_path FROM file WHERE hash IN ({placeholders})", chunk, ) rows = cursor.fetchall() or [] @@ -1987,13 +2085,13 @@ class Folder(Store): # Prefer upsert when supported, else fall back to INSERT OR REPLACE. try: cursor.executemany( - "INSERT INTO notes (hash, name, note) VALUES (?, ?, ?) " + "INSERT INTO note (hash, name, note) VALUES (?, ?, ?) " "ON CONFLICT(hash, name) DO UPDATE SET note = excluded.note, updated_at = CURRENT_TIMESTAMP", inserts, ) except Exception: cursor.executemany( - "INSERT OR REPLACE INTO notes (hash, name, note) VALUES (?, ?, ?)", + "INSERT OR REPLACE INTO note (hash, name, note) VALUES (?, ?, ?)", inserts, ) diff --git a/Store/HydrusNetwork.py b/Store/HydrusNetwork.py index 24677f4..2a186cf 100644 --- a/Store/HydrusNetwork.py +++ b/Store/HydrusNetwork.py @@ -218,6 +218,23 @@ class HydrusNetwork(Store): def get_name(self) -> str: return self.NAME + def set_relationship(self, alt_hash: str, king_hash: str, kind: str = "alt") -> bool: + """Persist a relationship via the Hydrus client API for this backend instance.""" + try: + alt_norm = str(alt_hash or "").strip().lower() + king_norm = str(king_hash or "").strip().lower() + if len(alt_norm) != 64 or len(king_norm) != 64 or alt_norm == king_norm: + return False + + client = getattr(self, "_client", None) + if client is None or not hasattr(client, "set_relationship"): + return False + + client.set_relationship(alt_norm, king_norm, str(kind or "alt")) + return True + except Exception: + return False + def add_file(self, file_path: Path, **kwargs: Any) -> str: """Upload file to Hydrus with full metadata support. 
@@ -284,9 +301,8 @@ class HydrusNetwork(Store): file_exists = True break if file_exists: - log( - f"ℹ️ Duplicate detected - file already in Hydrus with hash: {file_hash}", - file=sys.stderr, + debug( + f"{self._log_prefix()} Duplicate detected - file already in Hydrus with hash: {file_hash}" ) except Exception: pass @@ -301,9 +317,8 @@ class HydrusNetwork(Store): # Upload file if not already present if not file_exists: - log( - f"{self._log_prefix()} Uploading: {file_path.name}", - file=sys.stderr + debug( + f"{self._log_prefix()} Uploading: {file_path.name}" ) response = client.add_file(file_path) @@ -320,7 +335,7 @@ class HydrusNetwork(Store): raise Exception(f"Hydrus response missing file hash: {response}") file_hash = hydrus_hash - log(f"{self._log_prefix()} hash: {file_hash}", file=sys.stderr) + debug(f"{self._log_prefix()} hash: {file_hash}") # Add tags if provided (both for new and existing files) if tag_list: @@ -335,9 +350,8 @@ class HydrusNetwork(Store): f"{self._log_prefix()} Adding {len(tag_list)} tag(s): {tag_list}" ) client.add_tag(file_hash, tag_list, service_name) - log( - f"{self._log_prefix()} Tags added via '{service_name}'", - file=sys.stderr + debug( + f"{self._log_prefix()} Tags added via '{service_name}'" ) except Exception as exc: log( @@ -347,9 +361,8 @@ class HydrusNetwork(Store): # Associate url if provided (both for new and existing files) if url: - log( - f"{self._log_prefix()} Associating {len(url)} URL(s) with file", - file=sys.stderr + debug( + f"{self._log_prefix()} Associating {len(url)} URL(s) with file" ) for url in url: if url: diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index 72f75f1..d67f20d 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -186,7 +186,7 @@ class SharedArgs: name="path", type="string", choices=[], # Dynamically populated via get_store_choices() - description="Selects store", + description="selects store", ) URL = CmdletArg( @@ -194,6 +194,11 @@ class SharedArgs: type="string", description="http parser", ) + PROVIDER = CmdletArg( + name="provider", + type="string", + description="selects provider", + ) @staticmethod def get_store_choices(config: Optional[Dict[str, Any]] = None) -> List[str]: diff --git a/cmdlet/add_file.py b/cmdlet/add_file.py index 4b36174..33eb167 100644 --- a/cmdlet/add_file.py +++ b/cmdlet/add_file.py @@ -1,22 +1,19 @@ from __future__ import annotations -from typing import Any, Dict, Optional, Sequence, Tuple, List, Union +from typing import Any, Dict, Optional, Sequence, Tuple, List from pathlib import Path +from copy import deepcopy import sys import shutil -import tempfile import re -from urllib.parse import urlsplit, parse_qs from SYS import models from SYS import pipeline as ctx -from API import HydrusNetwork as hydrus_wrapper -from SYS.logger import log, debug +from SYS.logger import log, debug, is_debug_enabled from SYS.pipeline_progress import PipelineProgress from SYS.utils_constant import ALL_SUPPORTED_EXTENSIONS from Store import Store from . import _shared as sh -from SYS.result_table import ResultTable Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg @@ -37,6 +34,33 @@ from SYS.metadata import write_metadata # Canonical supported filetypes for all stores/cmdlets SUPPORTED_MEDIA_EXTENSIONS = ALL_SUPPORTED_EXTENSIONS +DEBUG_PIPE_NOTE_PREVIEW_LENGTH = 256 + + +def _truncate_debug_note_text(value: Any) -> str: + raw = str(value or "") + if len(raw) <= DEBUG_PIPE_NOTE_PREVIEW_LENGTH: + return raw + return raw[:DEBUG_PIPE_NOTE_PREVIEW_LENGTH].rstrip() + "..." 
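+# Example (illustrative): a 1,000-character subtitle note is previewed as its first
+# 256 characters, right-stripped, with "..." appended, keeping debug output short.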
+ + +def _sanitize_pipe_object_for_debug(pipe_obj: models.PipeObject) -> models.PipeObject: + safe_po = deepcopy(pipe_obj) + try: + extra = safe_po.extra + if isinstance(extra, dict): + sanitized = dict(extra) + notes = sanitized.get("notes") + if isinstance(notes, dict): + truncated_notes: Dict[str, str] = {} + for note_name, note_value in notes.items(): + truncated_notes[str(note_name)] = _truncate_debug_note_text(note_value) + sanitized["notes"] = truncated_notes + safe_po.extra = sanitized + except Exception: + pass + return safe_po + def _maybe_apply_florencevision_tags( media_path: Path, @@ -54,11 +78,11 @@ def _maybe_apply_florencevision_tags( If strict=false (default), failures log a warning and return the original tags. If strict=true, failures raise to abort the ingest. """ + strict = False try: tool_block = (config or {}).get("tool") fv_block = tool_block.get("florencevision") if isinstance(tool_block, dict) else None enabled = False - strict = False if isinstance(fv_block, dict): enabled = bool(fv_block.get("enabled")) strict = bool(fv_block.get("strict")) @@ -135,28 +159,13 @@ class Add_File(Cmdlet): super().__init__( name="add-file", summary= - "Upload a media file to specified location (Hydrus, file provider, or local directory).", + "Ingest a local media file to a store backend, file provider, or local directory.", usage= "add-file (-path | ) (-storage | -provider ) [-delete]", arg=[ SharedArgs.PATH, SharedArgs.STORE, - CmdletArg( - name="provider", - type="string", - required=False, - description= - "File hosting provider (e.g., 0x0, file.io, internetarchive)", - alias="prov", - ), - CmdletArg( - name="room", - type="string", - required=False, - description= - "Matrix room_id (when -provider matrix). If omitted, a room picker table is shown.", - alias="room_id", - ), + SharedArgs.PROVIDER, CmdletArg( name="delete", type="flag", @@ -166,6 +175,7 @@ class Add_File(Cmdlet): ), ], detail=[ + "Note: add-file ingests local files. To fetch remote sources, use download-file and pipe into add-file.", "- Storage location options (use -storage):", " hydrus: Upload to Hydrus database with metadata tagging", " local: Copy file to local directory", @@ -173,7 +183,6 @@ class Add_File(Cmdlet): "- File provider options (use -provider):", " 0x0: Upload to 0x0.st for temporary hosting", " file.io: Upload to file.io for temporary hosting", - " matrix: Upload to a Matrix room (requires Matrix config)", " internetarchive: Upload to archive.org (optional tag: ia: to upload into an existing item)", ], exec=self.run, @@ -188,7 +197,6 @@ class Add_File(Cmdlet): path_arg = parsed.get("path") location = parsed.get("store") provider_name = parsed.get("provider") - provider_room = parsed.get("room") delete_after = parsed.get("delete", False) # Convenience: when piping a file into add-file, allow `-path ` @@ -351,6 +359,36 @@ class Add_File(Cmdlet): f"[add-file] PARSED args: location={location}, provider={provider_name}, delete={delete_after}" ) + # add-file is ingestion-only: it does not download URLs here. + + # Show a concise PipeObject preview when debug logging is enabled to aid pipeline troubleshooting. 
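+        # Only the first few piped items are previewed, and note bodies are shortened
+        # via _sanitize_pipe_object_for_debug before the table is rendered.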
+ if is_debug_enabled(): + preview_items = ( + items_to_process if isinstance(items_to_process, list) + else [items_to_process] + ) + max_preview = 5 + for idx, item in enumerate(preview_items[:max_preview]): + po = item if isinstance(item, models.PipeObject) else None + if po is None: + try: + po = coerce_to_pipe_object(item, path_arg) + except Exception: + po = None + if po is None: + debug(f"[add-file] PIPE item[{idx}] preview (non-PipeObject)") + continue + debug(f"[add-file] PIPE item[{idx}] PipeObject preview") + try: + safe_po = _sanitize_pipe_object_for_debug(po) + safe_po.debug_table() + except Exception: + pass + if len(preview_items) > max_preview: + debug( + f"[add-file] Skipping {len(preview_items) - max_preview} additional piped item(s) in debug preview" + ) + # If this invocation was directory selector mode, show a selectable table and stop. # The user then runs @N (optionally piped), which replays add-file with selected paths. if dir_scan_mode: @@ -454,752 +492,50 @@ class Add_File(Cmdlet): and len(items_to_process) > 1 ) - # If we are going to persist results (-store / -provider) and the piped input contains - # URL download targets (e.g. playlist rows), preflight URL duplicates once up-front. - # IMPORTANT: Do not treat a *source URL* on an already-local file (e.g. screen-shot) - # as a download target; that would trigger yt-dlp preflights for non-yt-dlp URLs. - skip_url_downloads: set[str] = set() - download_mode_hint: Optional[str] = None - forced_ytdl_format: Optional[str] = None - if (provider_name or location) and isinstance(items_to_process, - list) and items_to_process: - url_candidates: List[str] = [] - for it in items_to_process: - try: - po_probe = coerce_to_pipe_object(it, path_arg) - except Exception: - continue - - # If the piped item already points at a local file, we are *ingesting* it, - # not downloading it. Skip URL-preflight and yt-dlp probing for those. - try: - po_path = getattr(po_probe, "path", None) - po_path_s = str(po_path or "").strip() - if po_path_s and not po_path_s.lower().startswith(("http://", - "https://", - "magnet:", - "torrent:")): - continue - except Exception: - pass - - try: - for u in self._get_url(it, po_probe) or []: - s = str(u or "").strip() - if not s: - continue - if s.lower().startswith(("http://", - "https://", - "magnet:", - "torrent:")): - url_candidates.append(s) - except Exception: - continue - - # Only meaningful when targeting a registered backend store. - if url_candidates and is_storage_backend_location and location: - # De-dupe in-order to keep logs stable. - seen: set[str] = set() - unique_urls: List[str] = [] - for u in url_candidates: - if u in seen: - continue - seen.add(u) - unique_urls.append(u) - - try: - skip_url_downloads = self._preflight_url_duplicates_bulk( - unique_urls, - config - ) - except Exception: - skip_url_downloads = set() - - # Batch-level format preflight: - # - If the sample URL only has one available format, force it for the batch. - # - If the sample URL appears audio-only (no video codecs), prefer audio mode. 
- try: - from tool.ytdlp import ( - YtDlpTool, - is_url_supported_by_ytdlp, - list_formats, - ) - - sample_url = unique_urls[0] if unique_urls else None - if sample_url and is_url_supported_by_ytdlp(str(sample_url)): - cf = None - try: - cookie_path = YtDlpTool(config).resolve_cookiefile() - if cookie_path is not None and cookie_path.is_file(): - cf = str(cookie_path) - except Exception: - cf = None - - fmts = list_formats( - str(sample_url), - no_playlist=False, - playlist_items=None, - cookiefile=cf, - ) - - if isinstance(fmts, list) and fmts: - has_video = False - try: - for f in fmts: - if not isinstance(f, dict): - continue - vcodec = str(f.get("vcodec", - "none") - or "none").strip().lower() - if vcodec and vcodec != "none": - has_video = True - break - except Exception: - has_video = False - - download_mode_hint = "video" if has_video else "audio" - - if len(fmts) == 1 and isinstance(fmts[0], dict): - fid = str(fmts[0].get("format_id") or "").strip() - if fid: - forced_ytdl_format = fid - except Exception: - download_mode_hint = download_mode_hint - forced_ytdl_format = forced_ytdl_format - - processed_url_items: set[str] = set() - for item in items_to_process: pipe_obj = coerce_to_pipe_object(item, path_arg) temp_dir_to_cleanup: Optional[Path] = None delete_after_item = delete_after try: - media_path_or_url, file_hash = self._resolve_source( + media_path, file_hash = self._resolve_source( item, path_arg, pipe_obj, config ) debug( - f"[add-file] RESOLVED source: path={media_path_or_url}, hash={file_hash[:12] if file_hash else 'N/A'}..." + f"[add-file] RESOLVED source: path={media_path}, hash={file_hash[:12] if file_hash else 'N/A'}..." ) - if not media_path_or_url: + if not media_path: failures += 1 continue - # If we got a hifi://track/ placeholder, resolve it to a decoded MPD first. 
- try: - if isinstance(media_path_or_url, Path): - mp_url = str(media_path_or_url) - if mp_url.lower().startswith("hifi:"): - manifest_path = sh.resolve_tidal_manifest_path(item) - if not manifest_path: - try: - meta = getattr(item, "full_metadata", None) - if isinstance(meta, dict) and meta.get("_tidal_manifest_error"): - log(str(meta.get("_tidal_manifest_error")), file=sys.stderr) - except Exception: - pass - log("HIFI selection has no playable DASH MPD manifest.", file=sys.stderr) - failures += 1 - continue - media_path_or_url = Path(manifest_path) - pipe_obj.path = str(media_path_or_url) - elif isinstance(media_path_or_url, str): - if str(media_path_or_url).strip().lower().startswith("hifi:"): - manifest_path = sh.resolve_tidal_manifest_path(item) - if not manifest_path: - try: - meta = getattr(item, "full_metadata", None) - if isinstance(meta, dict) and meta.get("_tidal_manifest_error"): - log(str(meta.get("_tidal_manifest_error")), file=sys.stderr) - except Exception: - pass - log("HIFI selection has no playable DASH MPD manifest.", file=sys.stderr) - failures += 1 - continue - media_path_or_url = Path(manifest_path) - pipe_obj.path = str(media_path_or_url) - except Exception: - pass - - manifest_source: Optional[Union[str, Path]] = None - tidal_metadata = None - try: - if isinstance(item, dict): - tidal_metadata = item.get("full_metadata") or item.get("metadata") - else: - tidal_metadata = ( - getattr(item, "full_metadata", None) - or getattr(item, "metadata", None) - ) - except Exception: - tidal_metadata = None - - if not tidal_metadata and isinstance(pipe_obj.extra, dict): - tidal_metadata = pipe_obj.extra.get("full_metadata") or pipe_obj.extra.get("metadata") - - if isinstance(tidal_metadata, dict): - manifest_source = ( - tidal_metadata.get("_tidal_manifest_path") - or tidal_metadata.get("_tidal_manifest_url") - ) - if not manifest_source: - target_is_mpd = False - if isinstance(media_path_or_url, Path): - target_is_mpd = str(media_path_or_url).lower().endswith(".mpd") - elif isinstance(media_path_or_url, str): - target_is_mpd = media_path_or_url.lower().endswith(".mpd") - if target_is_mpd: - manifest_source = media_path_or_url - - if manifest_source: - downloaded, tmp_dir = self._download_manifest_with_ffmpeg(manifest_source) - if downloaded is None: - failures += 1 - continue - media_path_or_url = str(downloaded) - pipe_obj.path = str(downloaded) - pipe_obj.is_temp = True - delete_after_item = True - if tmp_dir is not None: - temp_dir_to_cleanup = tmp_dir - - is_url_target = isinstance( - media_path_or_url, - str - ) and str(media_path_or_url).lower().startswith( - ("http://", - "https://", - "magnet:", - "torrent:") - ) - if use_steps and (not steps_started) and (not is_url_target): + if use_steps and (not steps_started): progress.begin_steps(3) progress.step("resolving source") steps_started = True # Update pipe_obj with resolved path - pipe_obj.path = str(media_path_or_url) - - table = None - full_metadata = None - if isinstance(pipe_obj.extra, dict): - table = pipe_obj.extra.get("table") - full_metadata = pipe_obj.extra.get("full_metadata") - - provider_table = str( - table or getattr(pipe_obj, "provider", "") - ).strip().lower() - if (provider_table == "alldebrid" - and isinstance(media_path_or_url, str) - and media_path_or_url.lower().startswith( - ("http://", "https://")) - and (provider_name or location)): - url_str = str(media_path_or_url) - if url_str in skip_url_downloads: - log( - f"Skipping download (already stored): {url_str}", - file=sys.stderr, - ) - successes 
+= 1 - continue - - temp_dir_candidate = Path( - tempfile.mkdtemp(prefix="medios_alldebrid_") - ) - downloaded_path: Optional[Path] = None - try: - from ProviderCore.registry import get_search_provider - - provider = get_search_provider("alldebrid", config) - if provider is not None: - downloaded = provider.download( - pipe_obj, - temp_dir_candidate, - ) - if downloaded: - downloaded_path = Path(downloaded) - except Exception as exc: - log( - f"[add-file] AllDebrid download failed: {exc}", - file=sys.stderr, - ) - if downloaded_path and downloaded_path.exists(): - media_path_or_url = downloaded_path - pipe_obj.path = str(downloaded_path) - pipe_obj.is_temp = True - delete_after_item = True - temp_dir_to_cleanup = temp_dir_candidate - processed_url_items.add(url_str) - else: - shutil.rmtree(temp_dir_candidate, ignore_errors=True) - - # URL targets: prefer provider-aware download for OpenLibrary selections. - if isinstance(media_path_or_url, - str) and media_path_or_url.lower().startswith( - ("http://", - "https://", - "magnet:", - "torrent:")): - is_openlibrary = (str(table or "").lower() == "openlibrary") or ( - "openlibrary.org/books/" in media_path_or_url.lower() - ) - if is_openlibrary: - # Enrich tags from OpenLibrary metadata so the stored file has book tags (author/pages/etc). - try: - from Provider.openlibrary import OpenLibrary as _OpenLibrary - - olid = None - archive_id = None - if isinstance(full_metadata, dict): - olid = full_metadata.get( - "openlibrary_id" - ) or full_metadata.get("openlibrary") - archive_id = full_metadata.get("archive_id") - - if not olid: - import re - - m = re.search( - r"/books/(OL\d+M)", - str(media_path_or_url), - flags=re.IGNORECASE - ) - if m: - olid = m.group(1) - - scraped_tags: List[str] = [] - if olid: - scraped_tags.extend( - _OpenLibrary.scrape_openlibrary_metadata(str(olid)) - or [] - ) - if archive_id: - scraped_tags.append(f"internet_archive:{archive_id}") - - if scraped_tags: - existing = list(pipe_obj.tag or []) - pipe_obj.tag = merge_sequences( - existing, - scraped_tags, - case_sensitive=False - ) - except Exception: - pass - - from ProviderCore.registry import get_search_provider - from ProviderCore.base import SearchResult - - provider = get_search_provider("openlibrary", config) - if provider is None: - log( - "[add-file] OpenLibrary provider not available", - file=sys.stderr - ) - failures += 1 - continue - - # Lean, non-debug status output (the ResultTable search follows after ingest). - try: - title_text = str(getattr(pipe_obj, - "title", - "") or "").strip() - if not title_text and isinstance(full_metadata, dict): - title_text = str(full_metadata.get("title") - or "").strip() - tags_list = list(getattr(pipe_obj, "tag", None) or []) - tags_text = ", ".join( - str(t).strip() for t in tags_list if str(t).strip() - ) - log(f"Title: {title_text or 'Unknown'}") - log(f"Tags: {tags_text}") - except Exception: - pass - - temp_dir_to_cleanup = Path( - tempfile.mkdtemp(prefix="medios_openlibrary_") - ) - - # Wire OpenLibrary download progress into pipeline Live UI (no tqdm spam). 
- def _ol_progress( - kind: str, - completed: int, - total: Optional[int], - label: str - ) -> None: - try: - if kind == "pages" and total: - progress.set_status( - f"downloading pages {completed}/{total}" - ) - progress.set_percent( - int(round((completed / max(1, - total)) * 100.0)) - ) - elif kind == "bytes" and total: - progress.set_status( - f"downloading {label} {completed}/{total} bytes" - ) - progress.set_percent( - int(round((completed / max(1, - total)) * 100.0)) - ) - else: - progress.set_status("downloading") - except Exception: - return - - try: - progress.set_percent(0) - progress.set_status("downloading openlibrary") - except Exception: - pass - - sr = SearchResult( - table="openlibrary", - title=str(getattr(pipe_obj, - "title", - None) or "Unknown"), - path=str(media_path_or_url), - full_metadata=full_metadata - if isinstance(full_metadata, - dict) else {}, - ) - downloaded = provider.download( - sr, - temp_dir_to_cleanup, - progress_callback=_ol_progress - ) - if downloaded is None: - log( - "[add-file] OpenLibrary download failed", - file=sys.stderr - ) - failures += 1 - continue - - downloaded_path = Path(downloaded) - if downloaded_path.exists() and downloaded_path.is_dir(): - log( - "[add-file] OpenLibrary download produced a directory (PDF conversion failed). Cannot ingest.", - file=sys.stderr, - ) - failures += 1 - continue - - media_path_or_url = str(downloaded_path) - pipe_obj.path = str(downloaded_path) - delete_after_item = True - - try: - if ui is not None: - ui.set_pipe_percent(int(pipe_idx), 100) - ui.set_pipe_status_text(int(pipe_idx), "downloaded") - except Exception: - pass - - # For non-provider URLs, or if still a URL after provider attempt, delegate to download-media. - if isinstance(media_path_or_url, - str) and media_path_or_url.lower().startswith( - ("http://", - "https://", - "magnet:", - "torrent:")): - # Hydrus file URLs are direct file downloads and may require Hydrus auth headers. - # If the user provided a destination (-provider or -store), download now and continue. - if ((provider_name or location) - and isinstance(media_path_or_url, - str) - and media_path_or_url.lower().startswith(("http://", - "https://"))): - downloaded = self._try_download_hydrus_file_url( - file_url=str(media_path_or_url), - pipe_obj=pipe_obj, - config=config, - ) - if downloaded is not None: - downloaded_path, downloaded_temp_dir = downloaded - temp_dir_to_cleanup = downloaded_temp_dir - media_path_or_url = str(downloaded_path) - pipe_obj.path = str(downloaded_path) - pipe_obj.is_temp = True - delete_after_item = True - - # If it's still a URL target, fall back to the legacy delegate. - if isinstance(media_path_or_url, - str) and media_path_or_url.lower().startswith( - ("http://", - "https://", - "magnet:", - "torrent:")): - # If the user provided a destination (-store / -provider), download here and then - # continue normal add-file logic so the downloaded file is actually ingested. - url_str = str(media_path_or_url) - if provider_name or location: - # Avoid re-processing the same URL multiple times in a batch. - if url_str in processed_url_items: - successes += 1 - continue - processed_url_items.add(url_str) - - # If bulk preflight found this URL already stored, skip downloading. 
- if url_str in skip_url_downloads: - log( - f"Skipping download (already stored): {url_str}", - file=sys.stderr, - ) - successes += 1 - continue - - downloaded_pipe_dicts = ( - self._download_streaming_url_as_pipe_objects( - url_str, - config, - mode_hint=download_mode_hint, - ytdl_format_hint=forced_ytdl_format, - ) - ) - if not downloaded_pipe_dicts: - failures += 1 - continue - - # Merge original tags/notes/relationships into each downloaded item and ingest. - for dl_item in downloaded_pipe_dicts: - try: - if isinstance(dl_item, dict): - # Merge tags - base_tags = list( - getattr(pipe_obj, - "tag", - None) or [] - ) - if base_tags: - dl_tags = list(dl_item.get("tag") or []) - dl_item["tag"] = merge_sequences( - dl_tags, - base_tags, - case_sensitive=False - ) - - # Carry notes/relationships forward when present on the original. - base_notes = getattr( - pipe_obj, - "notes", - None - ) - if base_notes and ("notes" not in dl_item): - dl_item["notes"] = base_notes - base_rels = getattr( - pipe_obj, - "relationships", - None - ) - if base_rels and ("relationships" - not in dl_item): - dl_item["relationships"] = base_rels - except Exception: - pass - - dl_pipe_obj = coerce_to_pipe_object(dl_item, None) - try: - dl_media_path = Path( - str(getattr(dl_pipe_obj, - "path", - "") or "") - ) - except Exception: - dl_media_path = None - - if dl_media_path is None or not self._validate_source( - dl_media_path): - failures += 1 - continue - - if provider_name: - if str(provider_name - ).strip().lower() == "matrix": - room_id = None - if provider_room: - room_id = str(provider_room).strip() - if not room_id: - try: - matrix_conf = ( - config.get("provider", - {} - ).get("matrix", - {}) - if isinstance(config, - dict) else {} - ) - room_id = ( - str( - matrix_conf.get("room_id") - or "" - ).strip() or None - ) - except Exception: - room_id = None - if not room_id: - pending = [ - { - "path": - str(dl_media_path), - "pipe_obj": - dl_pipe_obj, - "delete_after": - bool(delete_after_item), - } - ] - return self._matrix_prompt_room_selection( - pending, - config, - list(args) - ) - - code = self._handle_matrix_upload( - dl_media_path, - dl_pipe_obj, - config, - delete_after_item, - room_id=room_id, - ) - else: - code = self._handle_provider_upload( - dl_media_path, - provider_name, - dl_pipe_obj, - config, - delete_after_item, - ) - if code == 0: - successes += 1 - else: - failures += 1 - continue - - if location: - try: - store = Store(config) - backends = store.list_backends() - if location in backends: - code = self._handle_storage_backend( - dl_item, - dl_media_path, - location, - dl_pipe_obj, - config, - delete_after_item, - collect_payloads=collected_payloads, - collect_relationship_pairs= - pending_relationship_pairs, - defer_url_association= - defer_url_association, - pending_url_associations= - pending_url_associations, - suppress_last_stage_overlay= - want_final_search_file, - auto_search_file= - auto_search_file_after_add, - ) - else: - code = self._handle_local_export( - dl_media_path, - location, - dl_pipe_obj, - config, - delete_after_item, - ) - except Exception as exc: - debug( - f"[add-file] ERROR: Failed to resolve location: {exc}" - ) - log( - f"Invalid location: {location}", - file=sys.stderr - ) - failures += 1 - continue - - if code == 0: - successes += 1 - else: - failures += 1 - continue - - # Finished processing all downloaded items for this URL. - continue - - # No destination specified: keep legacy behavior (download-media only). 
- code = self._delegate_to_download_file( - item, - url_str, - location, - provider_name, - args, - config - ) - if code == 0: - successes += 1 - else: - failures += 1 - continue - - media_path = ( - Path(media_path_or_url) if isinstance(media_path_or_url, - str) else media_path_or_url - ) + pipe_obj.path = str(media_path) if not self._validate_source(media_path): failures += 1 continue if provider_name: - # Matrix provider can prompt for a room selection if one is not configured. if str(provider_name).strip().lower() == "matrix": - room_id = None - if provider_room: - room_id = str(provider_room).strip() - if not room_id: - try: - matrix_conf = ( - config.get("provider", - {}).get("matrix", - {}) if isinstance(config, - dict) else {} - ) - room_id = str(matrix_conf.get("room_id") or "" - ).strip() or None - except Exception: - room_id = None - - if not room_id: - pending = [ - { - "path": str(media_path), - "pipe_obj": pipe_obj, - "delete_after": bool(delete_after_item), - } - ] - return self._matrix_prompt_room_selection( - pending, - config, - list(args) - ) - - code = self._handle_matrix_upload( - media_path, - pipe_obj, - config, - delete_after_item, - room_id=room_id - ) - else: - code = self._handle_provider_upload( - media_path, - provider_name, - pipe_obj, - config, - delete_after_item + log( + "Matrix uploads are handled by .matrix (not add-file).", + file=sys.stderr, ) + failures += 1 + continue + + code = self._handle_provider_upload( + media_path, + provider_name, + pipe_obj, + config, + delete_after_item + ) if code == 0: successes += 1 else: @@ -1521,10 +857,9 @@ class Add_File(Cmdlet): config: Dict[str, Any] ) -> None: - """Persist relationships into the appropriate backend DB/API. + """Persist relationships to backends that support relationships. - - Folder stores: write to the per-store SQLite DB (directional alt->king). - - Hydrus stores: call Hydrus relationship API. + This delegates to an optional backend method: `set_relationship(alt, king, kind)`. """ if not pending: return @@ -1543,84 +878,25 @@ class Add_File(Cmdlet): except Exception: continue - backend_type = type(backend).__name__.lower() - - # Folder-backed local DB - location_fn = getattr(backend, "location", None) - is_folder = type(backend).__name__ == "Folder" and callable(location_fn) - if is_folder and location_fn is not None: - try: - root = Path(str(location_fn())).expanduser() - with API_folder_store(root) as db: - processed_pairs: set[tuple[str, str]] = set() - for alt_hash, king_hash in sorted(pairs): - if not alt_hash or not king_hash or alt_hash == king_hash: - continue - if (alt_hash, king_hash) in processed_pairs: - continue - # Hash-first store DB write; skips if either hash isn't in this store DB. - try: - db.set_relationship_by_hash( - str(alt_hash), - str(king_hash), - "alt", - bidirectional=False - ) - except Exception: - continue - processed_pairs.add((alt_hash, king_hash)) - except Exception: - pass + setter = getattr(backend, "set_relationship", None) + if not callable(setter): continue - # Hydrus - if "hydrus" in backend_type or hasattr(backend, "_client"): - client: Any = getattr(backend, "_client", None) - # Do not fall back to a global/default Hydrus client here; relationships must not be cross-store. 
- if client is None or not hasattr(client, "set_relationship"): + processed_pairs: set[tuple[str, str]] = set() + for alt_hash, king_hash in sorted(pairs): + if not alt_hash or not king_hash or alt_hash == king_hash: + continue + if (alt_hash, king_hash) in processed_pairs: + continue + alt_norm = str(alt_hash).strip().lower() + king_norm = str(king_hash).strip().lower() + if len(alt_norm) != 64 or len(king_norm) != 64: + continue + try: + setter(alt_norm, king_norm, "alt") + processed_pairs.add((alt_hash, king_hash)) + except Exception: continue - - def _hash_exists(hash_hex: str) -> bool: - try: - if not hasattr(client, "fetch_file_metadata"): - return False - payload = client.fetch_file_metadata( - hashes=[hash_hex], - include_service_keys_to_tags=False, - include_file_url=False, - include_duration=False, - include_size=False, - include_mime=False, - include_notes=False, - ) - meta = payload.get("metadata" - ) if isinstance(payload, - dict) else None - return bool(isinstance(meta, list) and meta) - except Exception: - return False - - processed_pairs: set[tuple[str, str]] = set() - for alt_hash, king_hash in sorted(pairs): - if not alt_hash or not king_hash or alt_hash == king_hash: - continue - if (alt_hash, king_hash) in processed_pairs: - continue - try: - alt_norm = str(alt_hash).strip().lower() - king_norm = str(king_hash).strip().lower() - if len(alt_norm) != 64 or len(king_norm) != 64: - continue - if not _hash_exists(alt_norm) or not _hash_exists(king_norm): - continue - client.set_relationship(alt_norm, king_norm, "alt") - processed_pairs.add((alt_hash, king_hash)) - except Exception: - pass - continue - - # Other backends: no-op - _ = backend_type @staticmethod def _resolve_source( @@ -1629,12 +905,11 @@ class Add_File(Cmdlet): pipe_obj: models.PipeObject, config: Dict[str, Any], - ) -> Tuple[Optional[Path | str], + ) -> Tuple[Optional[Path], Optional[str]]: """Resolve the source file path from args or pipeline result. - Returns (media_path_or_url, file_hash) - where media_path_or_url can be a Path object or a URL string. + Returns (media_path, file_hash). """ # PRIORITY 1a: Try hash+path from directory scan result (has 'path' and 'hash' keys) if isinstance(result, dict): @@ -1673,14 +948,6 @@ class Add_File(Cmdlet): if isinstance(media_path, Path) and media_path.exists(): pipe_obj.path = str(media_path) return media_path, str(result_hash) - if isinstance(media_path, - str) and media_path.lower().startswith( - ("http://", - "https://", - "magnet:", - "torrent:")): - pipe_obj.path = media_path - return media_path, str(result_hash) except Exception as exc: debug(f"[add-file] Failed to retrieve via hash+store: {exc}") @@ -1696,49 +963,19 @@ class Add_File(Cmdlet): if pipe_path: pipe_path_str = str(pipe_path) debug(f"Resolved pipe_path: {pipe_path_str}") - if pipe_path_str.startswith("hydrus:"): - file_hash = pipe_path_str.split(":", 1)[1] - store_name = getattr(pipe_obj, "store", None) - if not store_name and isinstance(pipe_obj.extra, dict): - store_name = pipe_obj.extra.get("store") - media_path, success = Add_File._fetch_hydrus_path( - file_hash, config, store_name=str(store_name).strip() if store_name else None - ) - return media_path, file_hash if success else None if pipe_path_str.lower().startswith(("http://", "https://", "magnet:", - "torrent:")): - return pipe_path_str, None + "torrent:", + "hifi:", + "hydrus:")): + log( + "add-file ingests local files only. 
Use download-file first.", + file=sys.stderr, + ) + return None, None return Path(pipe_path_str), None - # PRIORITY 4: Try from pipe_obj.url (for streaming url without downloaded file) - pipe_url = getattr(pipe_obj, "url", None) - if pipe_url and isinstance(pipe_url, str): - # Check if it's a URL - if pipe_url.lower().startswith(("http://", - "https://", - "magnet:", - "torrent:")): - debug(f"Detected URL in pipe_obj.url: {pipe_url}") - return pipe_url, None - - # Try from hydrus hash in pipe_obj.extra or hash - hydrus_hash = None - if isinstance(pipe_obj.extra, dict): - hydrus_hash = pipe_obj.extra.get("hydrus_hash" - ) or pipe_obj.extra.get("hash") - hydrus_hash = hydrus_hash or pipe_obj.hash - - if hydrus_hash and hydrus_hash != "unknown": - store_name = getattr(pipe_obj, "store", None) - if not store_name and isinstance(pipe_obj.extra, dict): - store_name = pipe_obj.extra.get("store") - media_path, success = Add_File._fetch_hydrus_path( - str(hydrus_hash), config, store_name=str(store_name).strip() if store_name else None - ) - return media_path, str(hydrus_hash) if success else None - # Try from result (if it's a string path or URL) if isinstance(result, str): debug(f"Checking result string: {result}") @@ -1746,9 +983,14 @@ class Add_File(Cmdlet): if result.lower().startswith(("http://", "https://", "magnet:", - "torrent:")): - debug(f"Detected URL in result string: {result}") - return result, None # Return URL string directly + "torrent:", + "hifi:", + "hydrus:")): + log( + "add-file ingests local files only. Use download-file first.", + file=sys.stderr, + ) + return None, None media_path = Path(result) pipe_obj.path = str(media_path) return media_path, None @@ -1762,9 +1004,14 @@ class Add_File(Cmdlet): if first_item.lower().startswith(("http://", "https://", "magnet:", - "torrent:")): - debug(f"Detected URL in result list: {first_item}") - return first_item, None # Return URL string directly + "torrent:", + "hifi:", + "hydrus:")): + log( + "add-file ingests local files only. Use download-file first.", + file=sys.stderr, + ) + return None, None media_path = Path(first_item) pipe_obj.path = str(media_path) return media_path, None @@ -1789,8 +1036,7 @@ class Add_File(Cmdlet): pipe_obj.path = str(media_path) return media_path, first_item.get("hash") except Exception: - # Fallback to returning string if not a path - return str(path_candidate), first_item.get("hash") + return None, first_item.get("hash") # If first item is a PipeObject object try: @@ -1865,62 +1111,6 @@ class Add_File(Cmdlet): return files_info - @staticmethod - def _fetch_hydrus_path( - file_hash: str, - config: Dict[str, - Any], - store_name: Optional[str] = None, - ) -> Tuple[Optional[Path], - bool]: - """Fetch the physical path of a file from Hydrus using its hash.""" - if not file_hash: - return None, False - - try: - client = None - if store_name: - # Store specified: do not fall back to a global/default Hydrus client. 
- try: - store = Store(config) - backend = store[str(store_name)] - candidate = getattr(backend, "_client", None) - if candidate is not None and hasattr(candidate, "get_file_path"): - client = candidate - except Exception: - client = None - if client is None: - log( - f"❌ Hydrus client unavailable for store '{store_name}'", - file=sys.stderr - ) - return None, False - else: - client = hydrus_wrapper.get_client(config) - if not client: - log("❌ Hydrus client not available", file=sys.stderr) - return None, False - - response = client.get_file_path(file_hash) - file_path_str = response.get("path") - if not file_path_str: - log( - f"❌ Hydrus file_path endpoint did not return a path", - file=sys.stderr - ) - return None, False - - media_path = Path(file_path_str) - if not media_path.exists(): - log(f"❌ Hydrus file path does not exist: {media_path}", file=sys.stderr) - return None, False - - log(f"✓ Retrieved Hydrus file path: {media_path}", file=sys.stderr) - return media_path, True - except Exception as exc: - log(f"❌ Failed to get Hydrus file path: {exc}", file=sys.stderr) - return None, False - @staticmethod def _validate_source(media_path: Optional[Path]) -> bool: """Validate that the source file exists and is supported.""" @@ -1929,12 +1119,15 @@ class Add_File(Cmdlet): target_str = str(media_path) - # If it's a URL target, we skip file existence checks + # add-file does not accept URL inputs. if target_str.lower().startswith(("http://", "https://", "magnet:", - "torrent:")): - return True + "torrent:", + "hifi:", + "hydrus:")): + log("add-file ingests local files only.", file=sys.stderr) + return False if not media_path.exists() or not media_path.is_file(): log(f"File not found: {media_path}") @@ -1948,355 +1141,10 @@ class Add_File(Cmdlet): return True - @staticmethod - def _is_url_target(media_path: Optional[Path]) -> bool: - """Check if the target is a URL that needs downloading.""" - if media_path and str(media_path).lower().startswith(("http://", "https://")): - return True - return False - - @staticmethod - def _sanitize_filename(value: str) -> str: - # Minimal Windows-safe filename sanitization. - text = str(value or "").strip() - if not text: - return "file" - invalid = '<>:"/\\|?*' - text = "".join("_" if (ch in invalid or ord(ch) < 32) else ch for ch in text) - text = re.sub(r"\s+", " ", text).strip(" .") - return text or "file" - - @staticmethod - def _parse_hydrus_file_url(file_url: str) -> Optional[str]: - """Return the sha256 hash from a Hydrus /get_files/file URL, or None.""" - try: - split = urlsplit(str(file_url)) - if split.scheme.lower() not in {"http", - "https"}: - return None - path_lower = (split.path or "").lower() - if "/get_files/file" not in path_lower: - return None - params = parse_qs(split.query or "") - raw = None - if "hash" in params and params["hash"]: - raw = params["hash"][0] - if not raw: - return None - hash_val = str(raw).strip().lower() - if not re.fullmatch(r"[0-9a-f]{64}", hash_val): - return None - return hash_val - except Exception: - return None - - def _try_download_hydrus_file_url( - self, - *, - file_url: str, - pipe_obj: models.PipeObject, - config: Dict[str, - Any], - ) -> Optional[tuple[Path, - Path]]: - """If *file_url* is a Hydrus file URL, download it to temp and return (path, temp_dir).""" - file_hash = self._parse_hydrus_file_url(file_url) - if not file_hash: - return None - - # Resolve Hydrus backend for auth. 
- store_name = str(getattr(pipe_obj, "store", "") or "").strip() - if ":" in store_name: - store_name = store_name.split(":", 1)[-1].strip() - - backend = None - try: - store_registry = Store(config) - if store_name and store_registry.is_available(store_name): - candidate = store_registry[store_name] - if type(candidate).__name__.lower() == "hydrusnetwork": - backend = candidate - except Exception: - backend = None - - if backend is None: - try: - store_registry = Store(config) - target_prefix = str(file_url).split("/get_files/file", 1)[0].rstrip("/") - for backend_name in store_registry.list_backends(): - candidate = store_registry[backend_name] - if type(candidate).__name__.lower() != "hydrusnetwork": - continue - base_url = str(getattr(candidate, "URL", "") or "").rstrip("/") - if base_url and (target_prefix.lower() == base_url.lower() - or target_prefix.lower().startswith( - base_url.lower())): - backend = candidate - break - except Exception: - backend = None - - if backend is None: - debug( - "[add-file] Hydrus file URL detected but no Hydrus backend matched for auth" - ) - return None - - api_key = str(getattr(backend, "API", "") or "").strip() - if not api_key: - debug( - f"[add-file] Hydrus backend '{getattr(backend, 'NAME', '') or store_name}' missing API key" - ) - return None - - # Best-effort filename from title + ext. - ext = "" - try: - if isinstance(pipe_obj.extra, dict): - ext = str(pipe_obj.extra.get("ext") or "").strip().lstrip(".") - except Exception: - ext = "" - if not ext: - ext = "bin" - - title_hint = str(getattr(pipe_obj, "title", "") or "").strip() - base_name = ( - self._sanitize_filename(title_hint) - if title_hint else f"hydrus_{file_hash[:12]}" - ) - - temp_dir = Path(tempfile.mkdtemp(prefix="medios_hydrus_")) - destination = unique_path(temp_dir / f"{base_name}.{ext}") - - headers = { - "Hydrus-Client-API-Access-Key": api_key - } - timeout = 60.0 - try: - client = getattr(backend, "_client", None) - timeout_val = getattr(client, "timeout", None) - if timeout_val is not None: - timeout = float(timeout_val) - except Exception: - timeout = 60.0 - - try: - log( - f"[add-file] Downloading Hydrus file via API ({getattr(backend, 'NAME', '') or store_name})", - file=sys.stderr, - ) - downloaded_bytes = hydrus_wrapper.download_hydrus_file( - str(file_url), - headers, - destination, - timeout - ) - if downloaded_bytes <= 0 and not destination.exists(): - return None - return destination, temp_dir - except Exception as exc: - log(f"[add-file] Hydrus download failed: {exc}", file=sys.stderr) - try: - shutil.rmtree(temp_dir, ignore_errors=True) - except Exception: - pass - return None - - def _delegate_to_download_file( - self, - result: Any, - url_str: str, - location: Optional[str], - provider_name: Optional[str], - args: Sequence[str], - config: Dict[str, - Any], - ) -> int: - """Delegate URL handling to download-file cmdlet (yt-dlp path).""" - log( - f"Target is a URL, delegating to download-file: {url_str}", - file=sys.stderr - ) - # Reuse the globally-registered cmdlet instance to avoid duplicative registration - from cmdlet.download_file import CMDLET as dl_cmdlet - - dl_args = list(args) if args else [] - - # Add the URL to the argument list for download-media - dl_args.insert(0, url_str) - - # If result has selection_args (like -item from @N selection), include them - if isinstance(result, dict) and "_selection_args" in result: - selection_args = result["_selection_args"] - if selection_args: - dl_args.extend(selection_args) - else: - extra_val = getattr(result, 
"extra", None) - if isinstance(extra_val, dict) and "_selection_args" in extra_val: - selection_args = extra_val["_selection_args"] - if selection_args: - dl_args.extend(selection_args) - - # download-file doesn't support -storage flag - # It downloads to the configured directory, then add-file will handle storage - # Note: Provider uploads (0x0) are not supported via this path - - # Call download-file with the URL in args - return dl_cmdlet.run(None, dl_args, config) - - @staticmethod - def _download_manifest_with_ffmpeg(source: Union[str, Path]) -> Tuple[Optional[Path], Optional[Path]]: - """Run ffmpeg on the manifest or stream URL and return a local file path for ingestion.""" - import subprocess - - ffmpeg_bin = shutil.which("ffmpeg") - if not ffmpeg_bin: - log("ffmpeg not found on PATH; cannot download HIFI manifest.", file=sys.stderr) - return None, None - - tmp_dir = Path(tempfile.mkdtemp(prefix="medeia_hifi_mpd_")) - stream_mp4 = tmp_dir / "stream.mp4" - - input_target: Optional[str] = None - if isinstance(source, Path): - input_target = str(source) - elif isinstance(source, str): - candidate = source.strip() - if candidate.lower().startswith("file://"): - try: - from urllib.parse import unquote, urlparse - - parsed = urlparse(candidate) - raw_path = unquote(parsed.path or "") - raw_path = raw_path.lstrip("/") - candidate = raw_path - except Exception: - pass - input_target = candidate - - if not input_target: - return None, None - - try: - subprocess.run( - [ - ffmpeg_bin, - "-hide_banner", - "-loglevel", - "error", - "-y", - "-protocol_whitelist", - "file,https,tcp,tls,crypto,data", - "-i", - input_target, - "-c", - "copy", - str(stream_mp4), - ], - check=True, - capture_output=True, - text=True, - ) - except subprocess.CalledProcessError as exc: - err = (exc.stderr or "").strip() - if err: - log(f"ffmpeg manifest download failed: {err}", file=sys.stderr) - else: - log(f"ffmpeg manifest download failed (exit {exc.returncode})", file=sys.stderr) - return None, tmp_dir - except Exception as exc: - log(f"ffmpeg manifest download failed: {exc}", file=sys.stderr) - return None, tmp_dir - - codec = None - ffprobe_bin = shutil.which("ffprobe") - if ffprobe_bin: - try: - probe = subprocess.run( - [ - ffprobe_bin, - "-v", - "error", - "-select_streams", - "a:0", - "-show_entries", - "stream=codec_name", - "-of", - "default=nw=1:nk=1", - str(stream_mp4), - ], - capture_output=True, - text=True, - check=True, - ) - codec = (probe.stdout or "").strip().lower() or None - except Exception: - codec = None - - ext = None - if codec == "flac": - ext = "flac" - elif codec == "aac": - ext = "m4a" - elif codec == "mp3": - ext = "mp3" - elif codec == "opus": - ext = "opus" - else: - ext = "mka" - - audio_out = tmp_dir / f"audio.{ext}" - try: - subprocess.run( - [ - ffmpeg_bin, - "-hide_banner", - "-loglevel", - "error", - "-y", - "-i", - str(stream_mp4), - "-vn", - "-c:a", - "copy", - str(audio_out), - ], - check=True, - capture_output=True, - text=True, - ) - if audio_out.exists(): - return audio_out, tmp_dir - except subprocess.CalledProcessError as exc: - err = (exc.stderr or "").strip() - if err: - log(f"ffmpeg audio extract failed: {err}", file=sys.stderr) - except Exception: - pass - - if stream_mp4.exists(): - return stream_mp4, tmp_dir - return None, tmp_dir - @staticmethod def _get_url(result: Any, pipe_obj: models.PipeObject) -> List[str]: from SYS.metadata import normalize_urls - # If this is a HIFI selection, we only support the decoded MPD (never tidal.com URLs). 
- is_hifi = False - try: - if isinstance(result, dict): - is_hifi = str(result.get("table") or result.get("provider") or "").strip().lower().startswith("hifi") - else: - is_hifi = str(getattr(result, "table", "") or getattr(result, "provider", "")).strip().lower().startswith("hifi") - except Exception: - is_hifi = False - try: - if not is_hifi: - is_hifi = str(getattr(pipe_obj, "path", "") or "").strip().lower().startswith("hifi:") - except Exception: - pass - # Prefer explicit PipeObject.url if present urls: List[str] = [] try: @@ -2320,13 +1168,6 @@ class Add_File(Cmdlet): if not urls: urls = normalize_urls(extract_url_from_result(result)) - # If this is a Tidal/HIFI selection with a decodable manifest, do NOT fall back to - # tidal.com track URLs. The only supported target is the decoded local MPD. - manifest_path = sh.resolve_tidal_manifest_path(result) - if manifest_path: - return [manifest_path] - if is_hifi: - return [] return urls @staticmethod @@ -2820,578 +1661,6 @@ class Add_File(Cmdlet): return 0 - @staticmethod - def _preflight_url_duplicates_bulk(urls: Sequence[str], - config: Dict[str, - Any]) -> set[str]: - """Return a set of URLs that appear to already exist in any searchable backend. - - This is a best-effort check used to avoid re-downloading already-stored media when - a batch of URL items is piped into add-file. - """ - skip: set[str] = set() - try: - storage = Store(config) - backend_names = list(storage.list_searchable_backends() or []) - except Exception: - return skip - - for raw in urls: - u = str(raw or "").strip() - if not u: - continue - - for backend_name in backend_names: - try: - if str(backend_name).strip().lower() == "temp": - continue - except Exception: - pass - try: - backend = storage[backend_name] - except Exception: - continue - - try: - hits = backend.search(f"url:{u}", limit=1) or [] - except Exception: - hits = [] - if hits: - skip.add(u) - break - - return skip - - @staticmethod - def _download_streaming_url_as_pipe_objects( - url: str, - config: Dict[str, - Any], - *, - mode_hint: Optional[str] = None, - ytdl_format_hint: Optional[str] = None, - ) -> List[Dict[str, - Any]]: - """Download a yt-dlp-supported URL and return PipeObject-style dict(s). - - This does not rely on pipeline stage context and is used so add-file can ingest - URL selections directly (download -> add to store/provider) in one invocation. - """ - url_str = str(url or "").strip() - if not url_str: - return [] - - try: - from SYS.models import DownloadOptions - from tool.ytdlp import ( - YtDlpTool, - _best_subtitle_sidecar, - _download_with_timeout, - _format_chapters_note, - _read_text_file, - is_url_supported_by_ytdlp, - list_formats, - ) - except Exception: - return [] - - if not is_url_supported_by_ytdlp(url_str): - return [] - - try: - from SYS.config import resolve_output_dir - - out_dir = resolve_output_dir(config) - if out_dir is None: - return [] - except Exception: - return [] - - cookies_path = None - try: - cookie_candidate = YtDlpTool(config).resolve_cookiefile() - if cookie_candidate is not None and cookie_candidate.is_file(): - cookies_path = cookie_candidate - except Exception: - cookies_path = None - - quiet_download = False - try: - quiet_download = bool((config or {}).get("_quiet_background_output")) - except Exception: - quiet_download = False - - # Decide download mode. - # Default to video unless we have a hint or the URL appears to be audio-only. 
- mode = str(mode_hint or "").strip().lower() if mode_hint else "" - if mode not in {"audio", - "video"}: - mode = "video" - # Best-effort: infer from formats for this URL (one-time, no playlist probing). - try: - cf = ( - str(cookies_path) - if cookies_path is not None and cookies_path.is_file() else None - ) - fmts_probe = list_formats( - url_str, - no_playlist=False, - playlist_items=None, - cookiefile=cf - ) - if isinstance(fmts_probe, list) and fmts_probe: - has_video = False - for f in fmts_probe: - if not isinstance(f, dict): - continue - vcodec = str(f.get("vcodec", "none") or "none").strip().lower() - if vcodec and vcodec != "none": - has_video = True - break - mode = "video" if has_video else "audio" - except Exception: - mode = "video" - - # Pick a safe initial format selector. - # Important: yt-dlp defaults like "251/140" are YouTube-specific and break Bandcamp. - fmt_hint = str(ytdl_format_hint).strip() if ytdl_format_hint else "" - if fmt_hint: - chosen_format: Optional[str] = fmt_hint - else: - chosen_format = None - if mode == "audio": - # Generic audio selector that works across extractors. - chosen_format = "bestaudio/best" - - opts = DownloadOptions( - url=url_str, - mode=mode, - output_dir=Path(out_dir), - cookies_path=cookies_path, - ytdl_format=chosen_format, - quiet=quiet_download, - embed_chapters=True, - write_sub=True, - ) - - # Download with a small amount of resilience for format errors. - try: - result_obj = _download_with_timeout(opts, timeout_seconds=300) - except Exception as exc: - msg = str(exc) - # If a format is invalid/unsupported, try: - # - if only one format exists, retry with that id - # - else for audio-only sources, retry with bestaudio/best - try: - format_error = "Requested format is not available" in msg - except Exception: - format_error = False - - if format_error: - try: - cf = ( - str(cookies_path) - if cookies_path is not None and cookies_path.is_file() else None - ) - fmts = list_formats( - url_str, - no_playlist=False, - playlist_items=None, - cookiefile=cf - ) - if isinstance(fmts, - list) and len(fmts) == 1 and isinstance(fmts[0], - dict): - fid = str(fmts[0].get("format_id") or "").strip() - if fid: - opts = DownloadOptions( - url=url_str, - mode=mode, - output_dir=Path(out_dir), - cookies_path=cookies_path, - ytdl_format=fid, - quiet=quiet_download, - embed_chapters=True, - write_sub=True, - ) - result_obj = _download_with_timeout( - opts, - timeout_seconds=300 - ) - # proceed - else: - raise - elif mode == "audio" and (not chosen_format - or chosen_format != "bestaudio/best"): - opts = DownloadOptions( - url=url_str, - mode=mode, - output_dir=Path(out_dir), - cookies_path=cookies_path, - ytdl_format="bestaudio/best", - quiet=quiet_download, - embed_chapters=True, - write_sub=True, - ) - result_obj = _download_with_timeout(opts, timeout_seconds=300) - else: - raise - except Exception as exc2: - log( - f"[add-file] Download failed for {url_str}: {exc2}", - file=sys.stderr - ) - return [] - else: - log(f"[add-file] Download failed for {url_str}: {exc}", file=sys.stderr) - return [] - - results: List[Any] - if isinstance(result_obj, list): - results = list(result_obj) - else: - paths = getattr(result_obj, "paths", None) - if isinstance(paths, list) and paths: - # Section downloads: create one result per file. 
- from SYS.models import DownloadMediaResult - - results = [] - for p in paths: - try: - p_path = Path(p) - except Exception: - continue - if not p_path.exists() or p_path.is_dir(): - continue - try: - hv = sha256_file(p_path) - except Exception: - hv = None - try: - results.append( - DownloadMediaResult( - path=p_path, - info=getattr(result_obj, - "info", - {}) or {}, - tag=list(getattr(result_obj, - "tag", - []) or []), - source_url=getattr(result_obj, - "source_url", - None) or url_str, - hash_value=hv, - ) - ) - except Exception: - continue - else: - results = [result_obj] - - out: List[Dict[str, Any]] = [] - for downloaded in results: - try: - po = dl_cmdlet._build_pipe_object(downloaded, url_str, opts) - - # Attach chapter timestamps note (best-effort). - try: - info = ( - downloaded.info - if isinstance(getattr(downloaded, - "info", - None), - dict) else {} - ) - except Exception: - info = {} - try: - chapters_text = _format_chapters_note(info) - except Exception: - chapters_text = None - if chapters_text: - notes = po.get("notes") - if not isinstance(notes, dict): - notes = {} - notes.setdefault("chapters", chapters_text) - po["notes"] = notes - - # Capture subtitle sidecar into notes and remove it so add-file won't ingest it later. - try: - media_path = Path(str(po.get("path") or "")) - except Exception: - media_path = None - if media_path is not None and media_path.exists( - ) and media_path.is_file(): - try: - sub_path = _best_subtitle_sidecar(media_path) - except Exception: - sub_path = None - if sub_path is not None: - sub_text = _read_text_file(sub_path) - if sub_text: - notes = po.get("notes") - if not isinstance(notes, dict): - notes = {} - notes["sub"] = sub_text - po["notes"] = notes - try: - sub_path.unlink() - except Exception: - pass - - # Mark as temp artifact from download-media so add-file can auto-delete after ingest. - po["action"] = "cmdlet:download-media" - po["is_temp"] = True - out.append(po) - except Exception: - continue - - return out - - @staticmethod - def _download_soulseek_file(result: Any, config: Dict[str, Any]) -> Optional[Path]: - """ - Download a file from Soulseek peer. - - Extracts username and filename from soulseek result metadata and initiates download. 
- """ - try: - import asyncio - from ProviderCore.registry import download_soulseek_file - from pathlib import Path - - # Extract metadata from result - full_metadata = {} - if isinstance(result, dict): - full_metadata = result.get("full_metadata", - {}) - elif (hasattr(result, - "extra") and isinstance(result.extra, - dict) - and "full_metadata" in result.extra): - full_metadata = result.extra.get("full_metadata", - {}) - elif hasattr(result, "full_metadata"): - # Direct attribute access (fallback) - val = getattr(result, - "full_metadata", - {}) - if isinstance(val, dict): - full_metadata = val - - username = full_metadata.get("username") - filename = full_metadata.get("filename") - - if not username or not filename: - debug( - f"[add-file] ERROR: Could not extract soulseek metadata from result (type={type(result).__name__})" - ) - extra_val = getattr(result, "extra", None) - if isinstance(extra_val, dict): - debug(f"[add-file] Result extra keys: {list(extra_val.keys())}") - return None - - if not username or not filename: - debug( - f"[add-file] ERROR: Missing soulseek metadata (username={username}, filename={filename})" - ) - return None - - debug(f"[add-file] Starting soulseek download: {username} -> {filename}") - - # Read Soulseek login credentials from config (client credentials), separate from peer username. - try: - from SYS.config import get_soulseek_username, get_soulseek_password - - client_user = get_soulseek_username(config) or "" - client_pass = get_soulseek_password(config) or "" - except Exception: - client_user = "" - client_pass = "" - - if not client_user or not client_pass: - debug( - "[add-file] ERROR: Soulseek credentials missing (set [provider=soulseek] username/password in config.conf)" - ) - return None - - # Determine output directory (prefer downloads folder in config) - output_dir = ( - Path(config.get("output_dir", - "./downloads")) - if isinstance(config.get("output_dir"), - str) else Path("./downloads") - ) - output_dir.mkdir(parents=True, exist_ok=True) - - # Run async download in event loop - try: - loop = asyncio.get_event_loop() - if loop.is_closed(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - except RuntimeError: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - downloaded_path = loop.run_until_complete( - download_soulseek_file( - username=username, - filename=filename, - output_dir=output_dir, - client_username=client_user, - client_password=client_pass, - timeout=1200, # 20 minutes - ) - ) - - return downloaded_path - - except Exception as e: - log( - f"[add-file] Soulseek download error: {type(e).__name__}: {e}", - file=sys.stderr - ) - debug(f"[add-file] Soulseek download traceback: {e}") - return None - - @staticmethod - def _handle_matrix_upload( - media_path: Path, - pipe_obj: models.PipeObject, - config: Dict[str, - Any], - delete_after: bool, - *, - room_id: str, - ) -> int: - """Upload to Matrix and update the PipeObject. - - Matrix needs a room_id. If you don't have one, use the interactive - room picker path which resumes via `-matrix-send`. 
- """ - from Provider.matrix import Matrix - - log(f"Uploading via matrix: {media_path.name}", file=sys.stderr) - - try: - provider = Matrix(config) - except Exception as exc: - log(f"Matrix not available: {exc}", file=sys.stderr) - return 1 - - try: - hoster_url = provider.upload_to_room( - str(media_path), - str(room_id), - pipe_obj=pipe_obj - ) - log(f"File uploaded: {hoster_url}", file=sys.stderr) - - f_hash = Add_File._resolve_file_hash(None, media_path, pipe_obj, None) - - except Exception as exc: - log(f"Upload failed: {exc}", file=sys.stderr) - return 1 - - # Update PipeObject and emit - extra_updates: Dict[str, - Any] = { - "provider": "matrix", - "provider_url": hoster_url, - "room_id": str(room_id), - } - if isinstance(pipe_obj.extra, dict): - existing_known = list(pipe_obj.extra.get("url") or []) - if hoster_url and hoster_url not in existing_known: - existing_known.append(hoster_url) - extra_updates["url"] = existing_known - - file_path = pipe_obj.path or (str(media_path) if media_path else None) or "" - Add_File._update_pipe_object_destination( - pipe_obj, - hash_value=f_hash or "unknown", - store="matrix", - path=file_path, - tag=pipe_obj.tag, - title=pipe_obj.title or (media_path.name if media_path else None), - extra_updates=extra_updates, - ) - Add_File._emit_pipe_object(pipe_obj) - Add_File._cleanup_after_success(media_path, delete_source=bool(delete_after)) - return 0 - - @staticmethod - def _matrix_prompt_room_selection( - pending_items: List[Dict[str, - Any]], - config: Dict[str, - Any], - original_args: List[str], - ) -> int: - """Show rooms table and pause pipeline for @N selection.""" - from Provider.matrix import Matrix - - # Stash pending uploads so @N on the matrix table can trigger Matrix.upload_to_room. - ctx.store_value("matrix_pending_uploads", pending_items) - - try: - provider = Matrix(config) - except Exception as exc: - log(f"Matrix not available: {exc}", file=sys.stderr) - return 1 - - try: - rooms = provider.list_rooms() - except Exception as exc: - log(f"Failed to list Matrix rooms: {exc}", file=sys.stderr) - return 1 - - if not rooms: - log("No joined rooms found.", file=sys.stderr) - return 0 - - table = ResultTable("Matrix Rooms") - table.set_table("matrix") - table.set_source_command("add-file", list(original_args or [])) - - for room in rooms: - row = table.add_row() - name = str(room.get("name") or "").strip() if isinstance(room, dict) else "" - rid = str(room.get("room_id") or "" - ).strip() if isinstance(room, - dict) else "" - row.add_column("Name", name) - row.add_column("Room", rid) - - room_items: List[Dict[str, Any]] = [] - for room in rooms: - if not isinstance(room, dict): - continue - rid = str(room.get("room_id") or "").strip() - name = str(room.get("name") or "").strip() - room_items.append( - { - **room, - "store": "matrix", - "provider": "matrix", - "title": name or rid or "Matrix Room", - } - ) - - # Overlay table: user selects @N on this Matrix rooms table to upload. - ctx.set_last_result_table_overlay(table, room_items) - ctx.set_current_stage_table(table) - - print() - from SYS.rich_display import stdout_console - - stdout_console().print(table) - print( - "\nSelect room(s) with @N (e.g. 
@1 or @1-3) to upload the selected item(s)" - ) - return 0 - @staticmethod def _handle_provider_upload( media_path: Path, @@ -3598,6 +1867,10 @@ class Add_File(Cmdlet): log(f"[add-file] FlorenceVision tagging error: {exc}", file=sys.stderr) return 1 + debug( + f"[add-file] Storing into backend '{backend_name}' path='{media_path}' title='{title}'" + ) + # Call backend's add_file with full metadata # Backend returns hash as identifier file_identifier = backend.add_file( @@ -3606,6 +1879,9 @@ class Add_File(Cmdlet): tag=tags, url=[] if (defer_url_association and url) else url, ) + debug( + f"[add-file] backend.add_file returned identifier {file_identifier} (len={len(str(file_identifier)) if file_identifier is not None else 'None'})" + ) ##log(f"✓ File added to '{backend_name}': {file_identifier}", file=sys.stderr) stored_path: Optional[str] = None @@ -3667,27 +1943,36 @@ class Add_File(Cmdlet): try: setter = getattr(backend, "set_note", None) if callable(setter): + debug( + f"[add-file] Writing sub note (len={len(str(sub_note))}) to {backend_name}:{resolved_hash}" + ) setter(resolved_hash, "sub", sub_note) - except Exception: - pass + except Exception as exc: + debug(f"[add-file] sub note write failed: {exc}") chapters_note = Add_File._get_note_text(result, pipe_obj, "chapters") if chapters_note: try: setter = getattr(backend, "set_note", None) if callable(setter): + debug( + f"[add-file] Writing chapters note (len={len(str(chapters_note))}) to {backend_name}:{resolved_hash}" + ) setter(resolved_hash, "chapters", chapters_note) - except Exception: - pass + except Exception as exc: + debug(f"[add-file] chapters note write failed: {exc}") caption_note = Add_File._get_note_text(result, pipe_obj, "caption") if caption_note: try: setter = getattr(backend, "set_note", None) if callable(setter): + debug( + f"[add-file] Writing caption note (len={len(str(caption_note))}) to {backend_name}:{resolved_hash}" + ) setter(resolved_hash, "caption", caption_note) - except Exception: - pass + except Exception as exc: + debug(f"[add-file] caption note write failed: {exc}") meta: Dict[str, Any] = {} diff --git a/cmdlet/add_note.py b/cmdlet/add_note.py index 3ff4458..7c61bbb 100644 --- a/cmdlet/add_note.py +++ b/cmdlet/add_note.py @@ -23,6 +23,14 @@ from SYS.utils import sha256_file class Add_Note(Cmdlet): + DEFAULT_QUERY_HINTS = ( + "title:", + "text:", + "hash:", + "caption:", + "sub:", + "subtitle:", + ) def __init__(self) -> None: super().__init__( @@ -124,6 +132,45 @@ class Add_Note(Cmdlet): note_text = text_match.group(1).strip() if text_match else "" return (note_name or None, note_text or None) + @classmethod + def _looks_like_note_query_token(cls, token: Any) -> bool: + text = str(token or "").strip().lower() + if not text: + return False + return any(hint in text for hint in cls.DEFAULT_QUERY_HINTS) + + @classmethod + def _default_query_args(cls, args: Sequence[str]) -> List[str]: + tokens: List[str] = list(args or []) + lower_tokens = {str(tok).lower() for tok in tokens if tok is not None} + if "-query" in lower_tokens or "--query" in lower_tokens: + return tokens + + for idx, tok in enumerate(tokens): + token_text = str(tok or "") + if not token_text or token_text.startswith("-"): + continue + if not cls._looks_like_note_query_token(token_text): + continue + + combined_parts = [token_text] + end = idx + 1 + while end < len(tokens): + next_text = str(tokens[end] or "") + if not next_text or next_text.startswith("-"): + break + if not cls._looks_like_note_query_token(next_text): + break + 
combined_parts.append(next_text) + end += 1 + + combined_query = " ".join(combined_parts) + tokens[idx:end] = [combined_query] + tokens.insert(idx, "-query") + return tokens + + return tokens + def _resolve_hash( self, raw_hash: Optional[str], @@ -153,11 +200,14 @@ class Add_Note(Cmdlet): log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}") return 0 - parsed = parse_cmdlet_args(args, self) + parsed_args = self._default_query_args(args) + parsed = parse_cmdlet_args(parsed_args, self) store_override = parsed.get("store") hash_override = normalize_hash(parsed.get("hash")) note_name, note_text = self._parse_note_query(str(parsed.get("query") or "")) + note_name = str(note_name or "").strip() + note_text = str(note_text or "").strip() if not note_name or not note_text: log( "[add_note] Error: -query must include title: and text:<text>", @@ -173,7 +223,6 @@ class Add_Note(Cmdlet): return 1 explicit_target = bool(hash_override and store_override) - results = normalize_result_input(result) if results and explicit_target: # Direct targeting mode: apply note once to the explicit target and @@ -194,14 +243,22 @@ class Add_Note(Cmdlet): f"✓ add-note: 1 item in '{store_override}'", file=sys.stderr ) + log( + "[add_note] Updated 1/1 item(s)", + file=sys.stderr + ) + for res in results: + ctx.emit(res) + return 0 + log( + "[add_note] Warning: Note write reported failure", + file=sys.stderr + ) + return 1 except Exception as exc: log(f"[add_note] Error: Failed to set note: {exc}", file=sys.stderr) return 1 - for res in results: - ctx.emit(res) - return 0 - if not results: if explicit_target: # Allow standalone use (no piped input) and enable piping the target forward. @@ -217,7 +274,7 @@ class Add_Note(Cmdlet): return 1 store_registry = Store(config) - updated = 0 + planned_ops = 0 # Batch write plan: store -> [(hash, name, text), ...] note_ops: Dict[str, @@ -271,12 +328,12 @@ class Add_Note(Cmdlet): []).append((resolved_hash, note_name, item_note_text)) - updated += 1 + planned_ops += 1 ctx.emit(res) # Execute bulk writes per store. 
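Before the write path below: the `DEFAULT_QUERY_HINTS` handling added above folds bare `title:`/`text:` tokens into an implicit `-query` before normal argument parsing. A small illustration of the expected rewrite (the argument values are hypothetical, and the snippet assumes the `cmdlet` package imports cleanly outside the CLI):

```python
from cmdlet.add_note import Add_Note

# Bare query tokens are merged into a single -query argument...
assert Add_Note._default_query_args(["title:chapters", "text:Intro", "-store", "local"]) == [
    "-query", "title:chapters text:Intro", "-store", "local",
]

# ...while an explicit -query leaves the argument list untouched.
assert Add_Note._default_query_args(["-query", "title:chapters text:Intro"]) == [
    "-query", "title:chapters text:Intro",
]
```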
- wrote_any = False + successful_writes = 0 for store_name, ops in note_ops.items(): if not ops: continue @@ -285,16 +342,23 @@ class Add_Note(Cmdlet): except Exception: continue + store_success = 0 bulk_fn = getattr(backend, "set_note_bulk", None) if callable(bulk_fn): try: ok = bool(bulk_fn(list(ops), config=config)) - wrote_any = wrote_any or ok or True - ctx.print_if_visible( - f"✓ add-note: {len(ops)} item(s) in '{store_name}'", - file=sys.stderr + if ok: + store_success += len(ops) + ctx.print_if_visible( + f"✓ add-note: {len(ops)} item(s) in '{store_name}'", + file=sys.stderr + ) + successful_writes += store_success + continue + log( + f"[add_note] Warning: bulk set_note returned False for '{store_name}'", + file=sys.stderr, ) - continue except Exception as exc: log( f"[add_note] Warning: bulk set_note failed for '{store_name}': {exc}; falling back", @@ -305,12 +369,23 @@ class Add_Note(Cmdlet): for file_hash, name, text in ops: try: ok = bool(backend.set_note(file_hash, name, text, config=config)) - wrote_any = wrote_any or ok + if ok: + store_success += 1 except Exception: continue - log(f"[add_note] Updated {updated} item(s)", file=sys.stderr) - return 0 if (updated > 0 and wrote_any) else (0 if updated > 0 else 1) + if store_success: + successful_writes += store_success + ctx.print_if_visible( + f"✓ add-note: {store_success} item(s) in '{store_name}'", + file=sys.stderr + ) + + log( + f"[add_note] Updated {successful_writes}/{planned_ops} item(s)", + file=sys.stderr + ) + return 0 if successful_writes > 0 else 1 CMDLET = Add_Note() diff --git a/cmdlet/download_file.py b/cmdlet/download_file.py index a1308a3..575265b 100644 --- a/cmdlet/download_file.py +++ b/cmdlet/download_file.py @@ -33,6 +33,7 @@ from rich.prompt import Confirm from tool.ytdlp import ( YtDlpTool, _best_subtitle_sidecar, + _SUBTITLE_EXTS, _download_with_timeout, _format_chapters_note, _read_text_file, @@ -2413,7 +2414,7 @@ class Download_File(Cmdlet): except Exception: continue try: - if p_path.suffix.lower() in _best_subtitle_sidecar.__defaults__[0]: + if p_path.suffix.lower() in _SUBTITLE_EXTS: continue except Exception: pass @@ -2936,6 +2937,223 @@ class Download_File(Cmdlet): "media_kind": "video" if opts.mode == "video" else "audio", } + @staticmethod + def download_streaming_url_as_pipe_objects( + url: str, + config: Dict[str, Any], + *, + mode_hint: Optional[str] = None, + ytdl_format_hint: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """Download a yt-dlp-supported URL and return PipeObject-style dict(s). + + This is a lightweight helper intended for cmdlets that need to expand streaming URLs + into local files without re-implementing yt-dlp glue. 
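A rough usage sketch of this helper as other cmdlets are expected to call it (merge-file switches to exactly this call later in the patch). The URL and config values are placeholders, and the keys read from each returned dict correspond to the PipeObject-style fields built below:

```python
from cmdlet.download_file import Download_File

config = {"output_dir": "./downloads"}  # placeholder; real callers pass the loaded app config

# Expand a streaming URL into local, temp-flagged PipeObject-style dicts.
items = Download_File.download_streaming_url_as_pipe_objects(
    "https://example.com/watch?v=abc123",  # placeholder URL
    config,
    mode_hint="audio",       # optional: force audio; otherwise available formats are probed
    ytdl_format_hint=None,   # optional: explicit yt-dlp format string
)
for item in items:
    print(item["path"], item.get("hash"), item.get("title"))
```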
+ """ + url_str = str(url or "").strip() + if not url_str: + return [] + + if not is_url_supported_by_ytdlp(url_str): + return [] + + try: + from SYS.config import resolve_output_dir + + out_dir = resolve_output_dir(config) + if out_dir is None: + return [] + except Exception: + return [] + + cookies_path = None + try: + cookie_candidate = YtDlpTool(config).resolve_cookiefile() + if cookie_candidate is not None and cookie_candidate.is_file(): + cookies_path = cookie_candidate + except Exception: + cookies_path = None + + quiet_download = False + try: + quiet_download = bool((config or {}).get("_quiet_background_output")) + except Exception: + quiet_download = False + + mode = str(mode_hint or "").strip().lower() if mode_hint else "" + if mode not in {"audio", "video"}: + mode = "video" + try: + cf = ( + str(cookies_path) + if cookies_path is not None and cookies_path.is_file() else None + ) + fmts_probe = list_formats( + url_str, + no_playlist=False, + playlist_items=None, + cookiefile=cf, + ) + if isinstance(fmts_probe, list) and fmts_probe: + has_video = False + for f in fmts_probe: + if not isinstance(f, dict): + continue + vcodec = str(f.get("vcodec", "none") or "none").strip().lower() + if vcodec and vcodec != "none": + has_video = True + break + mode = "video" if has_video else "audio" + except Exception: + mode = "video" + + fmt_hint = str(ytdl_format_hint).strip() if ytdl_format_hint else "" + chosen_format: Optional[str] + if fmt_hint: + chosen_format = fmt_hint + else: + chosen_format = None + if mode == "audio": + chosen_format = "bestaudio/best" + + opts = DownloadOptions( + url=url_str, + mode=mode, + output_dir=Path(out_dir), + cookies_path=cookies_path, + ytdl_format=chosen_format, + quiet=quiet_download, + embed_chapters=True, + write_sub=True, + ) + + try: + result_obj = _download_with_timeout(opts, timeout_seconds=300) + except Exception as exc: + log(f"[download-file] Download failed for {url_str}: {exc}", file=sys.stderr) + return [] + + results: List[Any] + if isinstance(result_obj, list): + results = list(result_obj) + else: + paths = getattr(result_obj, "paths", None) + if isinstance(paths, list) and paths: + results = [] + for p in paths: + try: + p_path = Path(p) + except Exception: + continue + if not p_path.exists() or p_path.is_dir(): + continue + try: + hv = sha256_file(p_path) + except Exception: + hv = None + try: + results.append( + DownloadMediaResult( + path=p_path, + info=getattr(result_obj, "info", {}) or {}, + tag=list(getattr(result_obj, "tag", []) or []), + source_url=getattr(result_obj, "source_url", None) or url_str, + hash_value=hv, + ) + ) + except Exception: + continue + else: + results = [result_obj] + + out: List[Dict[str, Any]] = [] + for downloaded in results: + try: + info = ( + downloaded.info + if isinstance(getattr(downloaded, "info", None), dict) else {} + ) + except Exception: + info = {} + + try: + media_path = Path(str(getattr(downloaded, "path", "") or "")) + except Exception: + continue + if not media_path.exists() or media_path.is_dir(): + continue + + try: + hash_value = getattr(downloaded, "hash_value", None) or sha256_file(media_path) + except Exception: + hash_value = None + + title = None + try: + title = info.get("title") + except Exception: + title = None + title = title or media_path.stem + + tags = list(getattr(downloaded, "tag", []) or []) + if title and f"title:{title}" not in tags: + tags.insert(0, f"title:{title}") + + final_url = None + try: + page_url = info.get("webpage_url") or info.get("original_url") or 
info.get("url") + if page_url: + final_url = str(page_url) + except Exception: + final_url = None + if not final_url: + final_url = url_str + + po: Dict[str, Any] = { + "path": str(media_path), + "hash": hash_value, + "title": title, + "url": final_url, + "tag": tags, + "action": "cmdlet:download-file", + "is_temp": True, + "ytdl_format": getattr(opts, "ytdl_format", None), + "store": getattr(opts, "storage_name", None) or getattr(opts, "storage_location", None) or "PATH", + "media_kind": "video" if opts.mode == "video" else "audio", + } + + try: + chapters_text = _format_chapters_note(info) + except Exception: + chapters_text = None + if chapters_text: + notes = po.get("notes") + if not isinstance(notes, dict): + notes = {} + notes.setdefault("chapters", chapters_text) + po["notes"] = notes + + try: + sub_path = _best_subtitle_sidecar(media_path) + except Exception: + sub_path = None + if sub_path is not None: + sub_text = _read_text_file(sub_path) + if sub_text: + notes = po.get("notes") + if not isinstance(notes, dict): + notes = {} + notes["sub"] = sub_text + po["notes"] = notes + try: + sub_path.unlink() + except Exception: + pass + + out.append(po) + + return out + @staticmethod def _normalise_hash_hex(value: Optional[str]) -> Optional[str]: if not value or not isinstance(value, str): diff --git a/cmdlet/merge_file.py b/cmdlet/merge_file.py index 474e2c9..48aa577 100644 --- a/cmdlet/merge_file.py +++ b/cmdlet/merge_file.py @@ -191,7 +191,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: mode_hint = None forced_format = None - from cmdlet.add_file import Add_File + from cmdlet.download_file import Download_File expanded: List[Dict[str, Any]] = [] downloaded_any = False @@ -204,7 +204,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: expanded.append(it) continue - downloaded = Add_File._download_streaming_url_as_pipe_objects( + downloaded = Download_File.download_streaming_url_as_pipe_objects( u, config, mode_hint=mode_hint, diff --git a/cmdnat/help.py b/cmdnat/help.py index 8b8bcdf..2e9a497 100644 --- a/cmdnat/help.py +++ b/cmdnat/help.py @@ -1,10 +1,11 @@ from __future__ import annotations -from typing import Any, Dict, Sequence, List, Optional +from typing import Any, Dict, Sequence, List, Optional, Tuple import shlex import sys from cmdlet._shared import Cmdlet, CmdletArg, parse_cmdlet_args +from cmdlet import REGISTRY as CMDLET_REGISTRY, ensure_cmdlet_modules_loaded from SYS.logger import log from SYS.result_table import ResultTable from SYS import pipeline as ctx @@ -27,6 +28,118 @@ def _examples_for_cmd(name: str) -> List[str]: return lookup.get(key, []) +def _normalize_cmdlet_key(name: Optional[str]) -> str: + return str(name or "").replace("_", "-").lower().strip() + + +def _cmdlet_aliases(cmdlet_obj: Cmdlet) -> List[str]: + aliases: List[str] = [] + for attr in ("alias", "aliases"): + raw_aliases = getattr(cmdlet_obj, attr, None) + if isinstance(raw_aliases, (list, tuple, set)): + candidates = raw_aliases + else: + candidates = (raw_aliases,) + for alias in candidates or (): + text = str(alias or "").strip() + if text: + aliases.append(text) + seen: set[str] = set() + deduped: List[str] = [] + for alias in aliases: + key = alias.lower() + if key in seen: + continue + seen.add(key) + deduped.append(alias) + return deduped + + +def _cmdlet_arg_to_dict(arg: CmdletArg) -> Dict[str, Any]: + return { + "name": str(getattr(arg, "name", "") or ""), + "type": str(getattr(arg, "type", "") or ""), + "required": 
bool(getattr(arg, "required", False)), + "description": str(getattr(arg, "description", "") or ""), + "choices": [str(c) for c in list(getattr(arg, "choices", []) or [])], + "alias": str(getattr(arg, "alias", "") or ""), + "variadic": bool(getattr(arg, "variadic", False)), + "usage": str(getattr(arg, "usage", "") or ""), + "query_key": getattr(arg, "query_key", None), + "query_aliases": [str(c) for c in list(getattr(arg, "query_aliases", []) or [])], + "query_only": bool(getattr(arg, "query_only", False)), + "requires_db": bool(getattr(arg, "requires_db", False)), + } + + +def _build_alias_map_from_metadata(metadata: Dict[str, Dict[str, Any]]) -> Dict[str, str]: + mapping: Dict[str, str] = {} + for name, meta in metadata.items(): + canonical = _normalize_cmdlet_key(name) + if canonical: + mapping[canonical] = name + for alias in meta.get("aliases", []) or []: + alias_key = _normalize_cmdlet_key(alias) + if alias_key: + mapping[alias_key] = name + return mapping + + +def _gather_metadata_from_cmdlet_classes() -> Tuple[Dict[str, Dict[str, Any]], Dict[str, str]]: + metadata: Dict[str, Dict[str, Any]] = {} + alias_map: Dict[str, str] = {} + try: + ensure_cmdlet_modules_loaded() + except Exception: + pass + + for module in list(sys.modules.values()): + mod_name = getattr(module, "__name__", "") or "" + if not (mod_name.startswith("cmdlet.") or mod_name == "cmdlet" or mod_name.startswith("cmdnat.")): + continue + cmdlet_obj = getattr(module, "CMDLET", None) + if not isinstance(cmdlet_obj, Cmdlet): + continue + canonical_key = _normalize_cmdlet_key(getattr(cmdlet_obj, "name", None) or "") + if not canonical_key: + continue + entry = { + "name": str(getattr(cmdlet_obj, "name", "") or canonical_key), + "summary": str(getattr(cmdlet_obj, "summary", "") or ""), + "usage": str(getattr(cmdlet_obj, "usage", "") or ""), + "aliases": _cmdlet_aliases(cmdlet_obj), + "details": list(getattr(cmdlet_obj, "detail", []) or []), + "args": [_cmdlet_arg_to_dict(a) for a in getattr(cmdlet_obj, "arg", []) or []], + "raw": getattr(cmdlet_obj, "raw", None), + } + metadata[canonical_key] = entry + alias_map[canonical_key] = canonical_key + for alias in entry["aliases"]: + alias_key = _normalize_cmdlet_key(alias) + if alias_key: + alias_map[alias_key] = canonical_key + + for registry_name in CMDLET_REGISTRY.keys(): + normalized = _normalize_cmdlet_key(registry_name) + if not normalized or normalized in alias_map: + continue + alias_map[normalized] = normalized + metadata.setdefault( + normalized, + { + "name": normalized, + "aliases": [], + "usage": "", + "summary": "", + "details": [], + "args": [], + "raw": None, + }, + ) + + return metadata, alias_map + + def _find_cmd_metadata(name: str, metadata: Dict[str, Dict[str, @@ -148,17 +261,90 @@ def _render_detail(meta: Dict[str, Any], args: Sequence[str]) -> None: def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: + catalog: Any | None = None + cmdlet_names: List[str] = [] + metadata: Dict[str, Dict[str, Any]] = {} + alias_map: Dict[str, str] = {} + try: import cmdlet_catalog as _catalog - CMDLET.arg[0].choices = _normalize_choice_list( - _catalog.list_cmdlet_names(config=config) - ) - metadata = _catalog.list_cmdlet_metadata(config=config) + catalog = _catalog except Exception: - CMDLET.arg[0].choices = [] - metadata = {} + catalog = None + if catalog is not None: + try: + cmdlet_names = catalog.list_cmdlet_names(config=config) + except Exception: + cmdlet_names = [] + try: + metadata = catalog.list_cmdlet_metadata(config=config) + except 
Exception: + metadata = {} + + if metadata: + alias_map = _build_alias_map_from_metadata(metadata) + else: + metadata, alias_map = _gather_metadata_from_cmdlet_classes() + + if not metadata: + fallback_names = sorted(set(cmdlet_names or list(CMDLET_REGISTRY.keys()))) + if fallback_names: + base_meta: Dict[str, Dict[str, Any]] = {} + for cmdname in fallback_names: + canonical = str(cmdname or "").replace("_", "-").lower() + entry: Dict[str, Any] + candidate: Dict[str, Any] | None = None + if catalog is not None: + try: + candidate = catalog.get_cmdlet_metadata(cmdname, config=config) + except Exception: + candidate = None + if candidate: + canonical = candidate.get("name", canonical) + entry = candidate + else: + entry = { + "name": canonical, + "aliases": [], + "usage": "", + "summary": "", + "details": [], + "args": [], + "raw": None, + } + base = base_meta.setdefault( + canonical, + { + "name": canonical, + "aliases": [], + "usage": "", + "summary": "", + "details": [], + "args": [], + "raw": entry.get("raw"), + }, + ) + if entry.get("aliases"): + base_aliases = set(base.get("aliases", [])) + base_aliases.update([a for a in entry.get("aliases", []) if a]) + base["aliases"] = sorted(base_aliases) + if not base.get("usage") and entry.get("usage"): + base["usage"] = entry["usage"] + if not base.get("summary") and entry.get("summary"): + base["summary"] = entry["summary"] + if not base.get("details") and entry.get("details"): + base["details"] = entry["details"] + if not base.get("args") and entry.get("args"): + base["args"] = entry["args"] + if not base.get("raw") and entry.get("raw"): + base["raw"] = entry["raw"] + metadata = base_meta + alias_map = _build_alias_map_from_metadata(metadata) + + choice_candidates = list(alias_map.keys()) if alias_map else list(metadata.keys()) + CMDLET.arg[0].choices = _normalize_choice_list(choice_candidates) parsed = parse_cmdlet_args(args, CMDLET) filter_text = parsed.get("filter")
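With these changes, help resolves names through a normalized alias map rather than relying on the catalog alone. A small sketch of how a user-typed name resolves against that map; the helper name and the aliases shown are illustrative, not the real registry contents:

```python
from typing import Dict, Optional


def resolve_cmdlet_name(user_input: str, alias_map: Dict[str, str]) -> Optional[str]:
    """Resolve a typed cmdlet name or alias to its canonical key (sketch).

    Mirrors _normalize_cmdlet_key: underscores become hyphens and the lookup
    is case-insensitive; the real path also falls back to metadata gathered
    from loaded cmdlet classes when the catalog is unavailable.
    """
    key = str(user_input or "").replace("_", "-").lower().strip()
    return alias_map.get(key)


# Illustrative alias map of the shape _build_alias_map_from_metadata produces:
alias_map = {"add-file": "add-file", "add-note": "add-note", "dl": "download-file"}
assert resolve_cmdlet_name("Add_File", alias_map) == "add-file"
assert resolve_cmdlet_name("dl", alias_map) == "download-file"
```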