diff --git a/API/folder.py b/API/folder.py index 7c6713a..35da959 100644 --- a/API/folder.py +++ b/API/folder.py @@ -261,87 +261,88 @@ class API_folder_store: def _init_db(self) -> None: """Initialize database connection and create tables if needed.""" - try: - # Ensure the library root exists; sqlite cannot create parent dirs. + with self._db_lock: try: - # User safety: Folder store must be created in a blank folder/no files in it. - # If the DB already exists, we skip this check (it's an existing library). - should_check_empty = not self.db_path.exists() - - self.library_root.mkdir(parents=True, exist_ok=True) - - if should_check_empty: - # Check if there are any files or directories in the library root (excluding the DB itself if it was just created) - # We use a generator and next() for efficiency. - existing_items = [item for item in self.library_root.iterdir() if item.name != self.DB_NAME] - if existing_items: - # Log the items found for debugging - item_names = [i.name for i in existing_items[:5]] - if len(existing_items) > 5: - item_names.append("...") - raise RuntimeError( - f"Safety Check Failed: Local library root must be empty for new stores.\n" - f"Directory: {self.library_root}\n" - f"Found {len(existing_items)} items: {', '.join(item_names)}\n" - f"Please use a clean directory to prevent accidental hashing of existing files." - ) - - except RuntimeError: - # Re-raise our specific safety error - raise - except Exception as exc: - raise RuntimeError( - f"Cannot create/open library root directory: {self.library_root}: {exc}" - ) from exc - - # Use check_same_thread=False to allow multi-threaded access - # This is safe because we're not sharing connections across threads; - # each thread will get its own cursor - # Set a generous timeout to avoid "database is locked" errors during heavy concurrency - self.connection = sqlite3.connect( - str(self.db_path), - check_same_thread=False, - timeout=5.0 - ) - self.connection.row_factory = sqlite3.Row - - # Performance & Size Optimizations - # 1. WAL mode for better concurrency and fewer locks - self.connection.execute("PRAGMA journal_mode=WAL") - # 2. auto_vacuum=FULL to automatically reclaim space from deleted rows/logs - self.connection.execute("PRAGMA auto_vacuum = FULL") - # 3. Increase page size for modern file systems - self.connection.execute("PRAGMA page_size = 4096") - # 4. Memory and Sync optimizations - self.connection.execute("PRAGMA synchronous = NORMAL") - self.connection.execute("PRAGMA temp_store = MEMORY") - self.connection.execute("PRAGMA cache_size = -2000") - # Use memory mapping for the entire DB (up to 30MB) for near-instant reads - self.connection.execute("PRAGMA mmap_size = 30000000") - # 5. Standard features - self.connection.execute("PRAGMA foreign_keys = ON") - - # Bound how long sqlite will wait on locks before raising. - try: - self.connection.execute("PRAGMA busy_timeout = 5000") - except Exception: - pass - - self._create_tables() - - # Run maintenance if the DB has grown suspiciously large - self._run_maintenance_if_needed() - - logger.info(f"Database initialized at {self.db_path}") - except Exception as e: - logger.error(f"Failed to initialize database: {e}", exc_info=True) - if self.connection: + # Ensure the library root exists; sqlite cannot create parent dirs. try: - self.connection.close() + # User safety: Folder store must be created in a blank folder/no files in it. + # If the DB already exists, we skip this check (it's an existing library). + should_check_empty = not self.db_path.exists() + + self.library_root.mkdir(parents=True, exist_ok=True) + + if should_check_empty: + # Check if there are any files or directories in the library root (excluding the DB itself if it was just created) + # We use a generator and next() for efficiency. + existing_items = [item for item in self.library_root.iterdir() if item.name != self.DB_NAME] + if existing_items: + # Log the items found for debugging + item_names = [i.name for i in existing_items[:5]] + if len(existing_items) > 5: + item_names.append("...") + raise RuntimeError( + f"Safety Check Failed: Local library root must be empty for new stores.\n" + f"Directory: {self.library_root}\n" + f"Found {len(existing_items)} items: {', '.join(item_names)}\n" + f"Please use a clean directory to prevent accidental hashing of existing files." + ) + + except RuntimeError: + # Re-raise our specific safety error + raise + except Exception as exc: + raise RuntimeError( + f"Cannot create/open library root directory: {self.library_root}: {exc}" + ) from exc + + # Use check_same_thread=False to allow multi-threaded access + # This is safe because we're not sharing connections across threads; + # each thread will get its own cursor + # Set a generous timeout to avoid "database is locked" errors during heavy concurrency + self.connection = sqlite3.connect( + str(self.db_path), + check_same_thread=False, + timeout=5.0 + ) + self.connection.row_factory = sqlite3.Row + + # Performance & Size Optimizations + # 1. WAL mode for better concurrency and fewer locks + self.connection.execute("PRAGMA journal_mode=WAL") + # 2. auto_vacuum=FULL to automatically reclaim space from deleted rows/logs + self.connection.execute("PRAGMA auto_vacuum = FULL") + # 3. Increase page size for modern file systems + self.connection.execute("PRAGMA page_size = 4096") + # 4. Memory and Sync optimizations + self.connection.execute("PRAGMA synchronous = NORMAL") + self.connection.execute("PRAGMA temp_store = MEMORY") + self.connection.execute("PRAGMA cache_size = -2000") + # Use memory mapping for the entire DB (up to 30MB) for near-instant reads + self.connection.execute("PRAGMA mmap_size = 30000000") + # 5. Standard features + self.connection.execute("PRAGMA foreign_keys = ON") + + # Bound how long sqlite will wait on locks before raising. + try: + self.connection.execute("PRAGMA busy_timeout = 5000") except Exception: pass - self.connection = None - raise + + self._create_tables() + + # Run maintenance if the DB has grown suspiciously large + self._run_maintenance_if_needed() + + logger.info(f"Database initialized at {self.db_path}") + except Exception as e: + logger.error(f"Failed to initialize database: {e}", exc_info=True) + if self.connection: + try: + self.connection.close() + except Exception: + pass + self.connection = None + raise def _run_maintenance_if_needed(self) -> None: """Perform a one-time VACUUM if the database file is large.""" @@ -352,22 +353,34 @@ class API_folder_store: # Global cleanup of old workers and logs regardless of size self._global_cleanup() - # If the database is larger than 30MB, run a vacuum to ensure space is reclaimed. - # We only do this on startup to minimize performance impact. + # If the database is larger than 64MB, check if a vacuum is worth the time. + # We only do this check on startup to minimize performance impact. file_stats = self.db_path.stat() size_mb = file_stats.st_size / (1024 * 1024) - if size_mb > 30: - logger.debug(f"Database size ({size_mb:.1f}MB) exceeds maintenance threshold. Vacuuming...") - # We use a cursor to avoid blocking the main connection state if possible - self.connection.execute("VACUUM") - # Also optimize the query planner indices - self.connection.execute("ANALYZE") - - new_size_mb = self.db_path.stat().st_size / (1024 * 1024) - reduction = size_mb - new_size_mb - if reduction > 1.0: - logger.info(f"Maintenance reclaimed {reduction:.1f}MB. Current size: {new_size_mb:.1f}MB") + if size_mb > 64: + # Check fragmentation (freelist count) + try: + freelist = self.connection.execute("PRAGMA freelist_count").fetchone()[0] + page_size = self.connection.execute("PRAGMA page_size").fetchone()[0] + free_mb = (freelist * page_size) / (1024 * 1024) + + # If more than 25% or 10MB of the file is free space, it's worth a VACUUM. + if free_mb > 10 or (free_mb / size_mb) > 0.25: + logger.debug(f"Database size ({size_mb:.1f}MB) has {free_mb:.1f}MB free. Vacuuming...") + self.connection.execute("VACUUM") + # Also optimize the query planner indices + self.connection.execute("ANALYZE") + + new_size_mb = self.db_path.stat().st_size / (1024 * 1024) + reduction = size_mb - new_size_mb + if reduction > 1.0: + logger.info(f"Maintenance reclaimed {reduction:.1f}MB. Current size: {new_size_mb:.1f}MB") + except Exception as inner_e: + logger.debug(f"Refined maintenance check failed: {inner_e}") + # Fallback to simple size threshold if PRAGMA fails + if size_mb > 128: + self.connection.execute("VACUUM") except Exception as e: # Maintenance should never block application startup logger.warning(f"Database maintenance skipped: {e}") @@ -710,20 +723,21 @@ class API_folder_store: @_db_retry() def _update_metadata_modified_time(self, file_hash: str) -> None: """Update the time_modified timestamp for a file's metadata.""" - try: - cursor = self.connection.cursor() - cursor.execute( - """ - UPDATE metadata SET time_modified = CURRENT_TIMESTAMP WHERE hash = ? - """, - (file_hash, - ), - ) - self.connection.commit() - except Exception as e: - logger.debug( - f"Could not update metadata modified time for hash {file_hash}: {e}" - ) + with self._db_lock: + try: + cursor = self.connection.cursor() + cursor.execute( + """ + UPDATE metadata SET time_modified = CURRENT_TIMESTAMP WHERE hash = ? + """, + (file_hash, + ), + ) + self.connection.commit() + except Exception as e: + logger.debug( + f"Could not update metadata modified time for hash {file_hash}: {e}" + ) def get_or_create_file_entry( self, @@ -1149,90 +1163,87 @@ class API_folder_store: tags: List[str] ) -> None: """Save metadata and tags for a file in a single transaction.""" - try: - abs_path = self._normalize_input_path(file_path) - db_path = self._to_db_file_path(abs_path) - logger.debug(f"[save_file_info] Starting save for: {db_path}") + with self._db_lock: + try: + abs_path = self._normalize_input_path(file_path) + db_path = self._to_db_file_path(abs_path) + logger.debug(f"[save_file_info] Starting save for: {db_path}") - file_hash = self.get_or_create_file_entry(abs_path, metadata.get("hash")) + file_hash = self.get_or_create_file_entry(abs_path, metadata.get("hash")) - cursor = self.connection.cursor() + cursor = self.connection.cursor() - # 1. Save Metadata - url = metadata.get("url", []) - if not isinstance(url, str): - url = json.dumps(url) + # 1. Save Metadata + url = metadata.get("url", []) + if not isinstance(url, str): + url = json.dumps(url) - relationships = metadata.get("relationships", []) - if not isinstance(relationships, str): - relationships = json.dumps(relationships) + relationships = metadata.get("relationships", []) + if not isinstance(relationships, str): + relationships = json.dumps(relationships) - # Determine type from ext if not provided - file_type = metadata.get("type") - ext = metadata.get("ext") - if not file_type and ext: - from SYS.utils_constant import get_type_from_ext + # Determine type from ext if not provided + file_type = metadata.get("type") + ext = metadata.get("ext") + if not file_type and ext: + from SYS.utils_constant import get_type_from_ext - file_type = get_type_from_ext(str(ext)) + file_type = get_type_from_ext(str(ext)) - cursor.execute( - """ - INSERT INTO metadata ( - hash, url, relationships, - duration, size, ext, type, - time_imported, time_modified - ) - VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) - ON CONFLICT(hash) DO UPDATE SET - url = excluded.url, - relationships = excluded.relationships, - duration = excluded.duration, - size = excluded.size, - ext = excluded.ext, - type = excluded.type, - time_modified = CURRENT_TIMESTAMP, - updated_at = CURRENT_TIMESTAMP - """, - ( - file_hash, - url, - relationships, - metadata.get("duration"), - metadata.get("size"), - ext, - file_type, - ), - ) - - # 2. Save Tags - # We assume tags list is complete and includes title if needed - cursor.execute("DELETE FROM tag WHERE hash = ?", - (file_hash, - )) - - for tag in tags: - tag = tag.strip() - if tag: - cursor.execute( - """ - INSERT OR IGNORE INTO tag (hash, tag) - VALUES (?, ?) - """, - (file_hash, - tag), + cursor.execute( + """ + INSERT INTO metadata ( + hash, url, relationships, + duration, size, ext, type, + time_imported, time_modified ) + VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) + ON CONFLICT(hash) DO UPDATE SET + url = excluded.url, + relationships = excluded.relationships, + duration = excluded.duration, + size = excluded.size, + ext = excluded.ext, + type = excluded.type, + time_modified = CURRENT_TIMESTAMP, + updated_at = CURRENT_TIMESTAMP + """, + ( + file_hash, + url, + relationships, + metadata.get("duration"), + metadata.get("size"), + ext, + file_type, + ), + ) - self.connection.commit() - logger.debug( - f"[save_file_info] Committed metadata and tags for hash {file_hash}" - ) + # 2. Save Tags + # We assume tags list is complete and includes title if needed + cursor.execute("DELETE FROM tag WHERE hash = ?", (file_hash, )) - except Exception as e: - logger.error( - f"[save_file_info] ❌ Error saving file info for {file_path}: {e}", - exc_info=True - ) - raise + for tag in tags: + tag = tag.strip() + if tag: + cursor.execute( + """ + INSERT OR IGNORE INTO tag (hash, tag) + VALUES (?, ?) + """, + (file_hash, tag), + ) + + self.connection.commit() + logger.debug( + f"[save_file_info] Committed metadata and tags for hash {file_hash}" + ) + + except Exception as e: + logger.error( + f"[save_file_info] ❌ Error saving file info for {file_path}: {e}", + exc_info=True) + raise def get_tags(self, file_hash: str) -> List[str]: """Get all tags for a file by hash.""" @@ -1346,165 +1357,169 @@ class API_folder_store: @_db_retry() def add_tags(self, file_path: Path, tags: List[str]) -> None: """Add tags to a file.""" - try: - file_hash = self.get_or_create_file_entry(file_path) - cursor = self.connection.cursor() + with self._db_lock: + try: + file_hash = self.get_or_create_file_entry(file_path) + cursor = self.connection.cursor() - user_title_tag = next( - ( - tag.strip() - for tag in tags if tag.strip().lower().startswith("title:") - ), - None - ) - - if user_title_tag: - cursor.execute( - """ - DELETE FROM tag WHERE hash = ? AND tag LIKE 'title:%' - """, - (file_hash, - ), - ) - else: - cursor.execute( - """ - SELECT COUNT(*) FROM tag WHERE hash = ? AND tag LIKE 'title:%' - """, - (file_hash, - ), + user_title_tag = next( + ( + tag.strip() + for tag in tags if tag.strip().lower().startswith("title:") + ), + None ) - has_title = cursor.fetchone()[0] > 0 - if not has_title: - filename_without_ext = file_path.stem - if filename_without_ext: - # Normalize underscores to spaces for consistency - title_value = filename_without_ext.replace("_", " ").strip() - title_tag = f"title:{title_value}" + if user_title_tag: + cursor.execute( + """ + DELETE FROM tag WHERE hash = ? AND tag LIKE 'title:%' + """, + (file_hash, + ), + ) + else: + cursor.execute( + """ + SELECT COUNT(*) FROM tag WHERE hash = ? AND tag LIKE 'title:%' + """, + (file_hash, + ), + ) + + has_title = cursor.fetchone()[0] > 0 + if not has_title: + filename_without_ext = file_path.stem + if filename_without_ext: + # Normalize underscores to spaces for consistency + title_value = filename_without_ext.replace("_", " ").strip() + title_tag = f"title:{title_value}" + cursor.execute( + """ + INSERT OR IGNORE INTO tag (hash, tag) + VALUES (?, ?) + """, + (file_hash, + title_tag), + ) + + for tag in tags: + tag = tag.strip() + if tag: cursor.execute( """ INSERT OR IGNORE INTO tag (hash, tag) VALUES (?, ?) """, (file_hash, - title_tag), + tag), ) - for tag in tags: - tag = tag.strip() - if tag: - cursor.execute( - """ - INSERT OR IGNORE INTO tag (hash, tag) - VALUES (?, ?) - """, - (file_hash, - tag), - ) - - self.connection.commit() - self._update_metadata_modified_time(file_hash) - logger.debug(f"Added {len(tags)} tags for {file_path}") - except Exception as e: - logger.error(f"Error adding tags for {file_path}: {e}", exc_info=True) - raise + self.connection.commit() + self._update_metadata_modified_time(file_hash) + logger.debug(f"Added {len(tags)} tags for {file_path}") + except Exception as e: + logger.error(f"Error adding tags for {file_path}: {e}", exc_info=True) + raise @_db_retry() def remove_tags(self, file_path: Path, tags: List[str]) -> None: """Remove specific tags from a file.""" - try: - file_hash = self.get_or_create_file_entry(file_path) - cursor = self.connection.cursor() + with self._db_lock: + try: + file_hash = self.get_or_create_file_entry(file_path) + cursor = self.connection.cursor() - for tag in tags: - tag = tag.strip() - if tag: - cursor.execute( - """ - DELETE FROM tag - WHERE hash = ? - AND tag = ? - """, - (file_hash, - tag), - ) + for tag in tags: + tag = tag.strip() + if tag: + cursor.execute( + """ + DELETE FROM tag + WHERE hash = ? + AND tag = ? + """, + (file_hash, + tag), + ) - self.connection.commit() - logger.debug(f"Removed {len(tags)} tags for {file_path}") - except Exception as e: - logger.error(f"Error removing tags for {file_path}: {e}", exc_info=True) - raise + self.connection.commit() + logger.debug(f"Removed {len(tags)} tags for {file_path}") + except Exception as e: + logger.error(f"Error removing tags for {file_path}: {e}", exc_info=True) + raise @_db_retry() def add_tags_to_hash(self, file_hash: str, tags: List[str]) -> None: """Add tags to a file by hash.""" - try: - cursor = self.connection.cursor() + with self._db_lock: + try: + cursor = self.connection.cursor() - user_title_tag = next( - ( - tag.strip() - for tag in tags if tag.strip().lower().startswith("title:") - ), - None - ) - - if user_title_tag: - cursor.execute( - """ - DELETE FROM tag WHERE hash = ? AND tag LIKE 'title:%' - """, - (file_hash, - ), + user_title_tag = next( + ( + tag.strip() + for tag in tags if tag.strip().lower().startswith("title:") + ), + None ) - for tag in tags: - tag = tag.strip() - if tag: + if user_title_tag: cursor.execute( """ - INSERT OR IGNORE INTO tag (hash, tag) - VALUES (?, ?) + DELETE FROM tag WHERE hash = ? AND tag LIKE 'title:%' """, (file_hash, - tag), + ), ) - self.connection.commit() - self._update_metadata_modified_time(file_hash) - logger.debug(f"Added {len(tags)} tags for hash {file_hash}") - except Exception as e: - logger.error(f"Error adding tags for hash {file_hash}: {e}", exc_info=True) - raise + for tag in tags: + tag = tag.strip() + if tag: + cursor.execute( + """ + INSERT OR IGNORE INTO tag (hash, tag) + VALUES (?, ?) + """, + (file_hash, + tag), + ) + + self.connection.commit() + self._update_metadata_modified_time(file_hash) + logger.debug(f"Added {len(tags)} tags for hash {file_hash}") + except Exception as e: + logger.error(f"Error adding tags for hash {file_hash}: {e}", exc_info=True) + raise @_db_retry() def remove_tags_from_hash(self, file_hash: str, tags: List[str]) -> None: """Remove specific tags from a file by hash.""" - try: - cursor = self.connection.cursor() + with self._db_lock: + try: + cursor = self.connection.cursor() - for tag in tags: - tag = tag.strip() - if tag: - cursor.execute( - """ - DELETE FROM tag - WHERE hash = ? - AND tag = ? - """, - (file_hash, - tag), - ) + for tag in tags: + tag = tag.strip() + if tag: + cursor.execute( + """ + DELETE FROM tag + WHERE hash = ? + AND tag = ? + """, + (file_hash, + tag), + ) - self.connection.commit() - logger.debug(f"Removed {len(tags)} tags for hash {file_hash}") - except Exception as e: - logger.error( - f"Error removing tags for hash {file_hash}: {e}", - exc_info=True - ) - raise + self.connection.commit() + logger.debug(f"Removed {len(tags)} tags for hash {file_hash}") + except Exception as e: + logger.error( + f"Error removing tags for hash {file_hash}: {e}", + exc_info=True + ) + raise @_db_retry() def update_metadata_by_hash( @@ -1514,42 +1529,43 @@ class API_folder_store: Any] ) -> None: """Update metadata for a file by hash.""" - try: - cursor = self.connection.cursor() + with self._db_lock: + try: + cursor = self.connection.cursor() - fields = [] - values = [] + fields = [] + values = [] - for key, value in metadata_updates.items(): - if key in ["url", "relationships"]: - if not isinstance(value, str): - value = json.dumps(value) - fields.append(f"{key} = ?") - values.append(value) + for key, value in metadata_updates.items(): + if key in ["url", "relationships"]: + if not isinstance(value, str): + value = json.dumps(value) + fields.append(f"{key} = ?") + values.append(value) - if not fields: - return + if not fields: + return - # Ensure a metadata row exists so updates don't silently no-op. - # This can happen for older DBs or entries created without explicit metadata. - cursor.execute( - "INSERT OR IGNORE INTO metadata (hash) VALUES (?)", - (file_hash, - ), - ) + # Ensure a metadata row exists so updates don't silently no-op. + # This can happen for older DBs or entries created without explicit metadata. + cursor.execute( + "INSERT OR IGNORE INTO metadata (hash) VALUES (?)", + (file_hash, + ), + ) - values.append(file_hash) + values.append(file_hash) - sql = f"UPDATE metadata SET {', '.join(fields)}, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE hash = ?" - cursor.execute(sql, values) - self.connection.commit() - logger.debug(f"Updated metadata for hash {file_hash}") - except Exception as e: - logger.error( - f"Error updating metadata for hash {file_hash}: {e}", - exc_info=True - ) - raise + sql = f"UPDATE metadata SET {', '.join(fields)}, time_modified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE hash = ?" + cursor.execute(sql, values) + self.connection.commit() + logger.debug(f"Updated metadata for hash {file_hash}") + except Exception as e: + logger.error( + f"Error updating metadata for hash {file_hash}: {e}", + exc_info=True + ) + raise def set_relationship( self, @@ -1566,102 +1582,50 @@ class API_folder_store: related_file_path: Path to the related file rel_type: Type of relationship ('king', 'alt', 'related') """ - try: - str_path = str(file_path.resolve()) - str_related_path = str(related_file_path.resolve()) - - file_hash = self.get_or_create_file_entry(file_path) - related_file_hash = self.get_or_create_file_entry(related_file_path) - - cursor = self.connection.cursor() - - # Get current relationships for the main file - cursor.execute( - """ - SELECT relationships FROM metadata WHERE hash = ? - """, - (file_hash, - ), - ) - - row = cursor.fetchone() - # Use index access to be safe regardless of row_factory - relationships_str = row[0] if row else None - + with self._db_lock: try: - if relationships_str: - relationships = json.loads(relationships_str) - else: - relationships = {} - except (json.JSONDecodeError, TypeError): - relationships = {} + str_path = str(file_path.resolve()) + str_related_path = str(related_file_path.resolve()) - # Ensure relationships is a dict (handle case where DB has a list) - if not isinstance(relationships, dict): - relationships = {} + file_hash = self.get_or_create_file_entry(file_path) + related_file_hash = self.get_or_create_file_entry(related_file_path) - # Ensure rel_type key exists - if rel_type not in relationships: - relationships[rel_type] = [] + cursor = self.connection.cursor() - # Add the relationship (store as hash string) - if related_file_hash not in relationships[rel_type]: - relationships[rel_type].append(related_file_hash) - - # Save the updated relationships for the main file - cursor.execute( - """ - INSERT INTO metadata (hash, relationships) - VALUES (?, ?) - ON CONFLICT(hash) DO UPDATE SET - relationships = excluded.relationships, - time_modified = CURRENT_TIMESTAMP - """, - (file_hash, - json.dumps(relationships)), - ) - - logger.debug( - f"Set {rel_type} relationship: {str_path} ({file_hash}) -> {str_related_path} ({related_file_hash})" - ) - - if bidirectional: - # Set reverse relationship (bidirectional) - # For 'alt' and 'related', the reverse is the same - # For 'king', the reverse is 'subject' (or we just use 'alt' for simplicity as Hydrus does) - # Let's use the same type for now to keep it simple and consistent with Hydrus 'alternates' - reverse_type = rel_type - - # Update the related file + # Get current relationships for the main file cursor.execute( """ SELECT relationships FROM metadata WHERE hash = ? """, - (related_file_hash, - ), + (file_hash, + ), ) row = cursor.fetchone() + # Use index access to be safe regardless of row_factory relationships_str = row[0] if row else None try: if relationships_str: - reverse_relationships = json.loads(relationships_str) + relationships = json.loads(relationships_str) else: - reverse_relationships = {} + relationships = {} except (json.JSONDecodeError, TypeError): - reverse_relationships = {} + relationships = {} - if not isinstance(reverse_relationships, dict): - reverse_relationships = {} + # Ensure relationships is a dict (handle case where DB has a list) + if not isinstance(relationships, dict): + relationships = {} - if reverse_type not in reverse_relationships: - reverse_relationships[reverse_type] = [] + # Ensure rel_type key exists + if rel_type not in relationships: + relationships[rel_type] = [] - if file_hash not in reverse_relationships[reverse_type]: - reverse_relationships[reverse_type].append(file_hash) + # Add the relationship (store as hash string) + if related_file_hash not in relationships[rel_type]: + relationships[rel_type].append(related_file_hash) - # Save the updated reverse relationships + # Save the updated relationships for the main file cursor.execute( """ INSERT INTO metadata (hash, relationships) @@ -1670,17 +1634,70 @@ class API_folder_store: relationships = excluded.relationships, time_modified = CURRENT_TIMESTAMP """, - (related_file_hash, - json.dumps(reverse_relationships)), + (file_hash, + json.dumps(relationships)), ) - self.connection.commit() - else: - self.connection.commit() + logger.debug( + f"Set {rel_type} relationship: {str_path} ({file_hash}) -> {str_related_path} ({related_file_hash})" + ) - except Exception as e: - logger.error(f"Error setting relationship: {e}", exc_info=True) - raise + if bidirectional: + # Set reverse relationship (bidirectional) + # For 'alt' and 'related', the reverse is the same + # For 'king', the reverse is 'subject' (or we just use 'alt' for simplicity as Hydrus does) + # Let's use the same type for now to keep it simple and consistent with Hydrus 'alternates' + reverse_type = rel_type + + # Update the related file + cursor.execute( + """ + SELECT relationships FROM metadata WHERE hash = ? + """, + (related_file_hash, + ), + ) + + row = cursor.fetchone() + relationships_str = row[0] if row else None + + try: + if relationships_str: + reverse_relationships = json.loads(relationships_str) + else: + reverse_relationships = {} + except (json.JSONDecodeError, TypeError): + reverse_relationships = {} + + if not isinstance(reverse_relationships, dict): + reverse_relationships = {} + + if reverse_type not in reverse_relationships: + reverse_relationships[reverse_type] = [] + + if file_hash not in reverse_relationships[reverse_type]: + reverse_relationships[reverse_type].append(file_hash) + + # Save the updated reverse relationships + cursor.execute( + """ + INSERT INTO metadata (hash, relationships) + VALUES (?, ?) + ON CONFLICT(hash) DO UPDATE SET + relationships = excluded.relationships, + time_modified = CURRENT_TIMESTAMP + """, + (related_file_hash, + json.dumps(reverse_relationships)), + ) + + self.connection.commit() + else: + self.connection.commit() + + except Exception as e: + logger.error(f"Error setting relationship: {e}", exc_info=True) + raise def find_files_pointing_to(self, target_path: Path) -> List[Dict[str, Any]]: """Find all files that have a relationship pointing to the target path. @@ -1757,50 +1774,52 @@ class API_folder_store: def set_note(self, file_path: Path, name: str, note: str) -> None: """Set a named note for a file.""" - try: - note_name = str(name or "").strip() - if not note_name: - raise ValueError("Note name is required") + with self._db_lock: + try: + note_name = str(name or "").strip() + if not note_name: + raise ValueError("Note name is required") - file_hash = self.get_or_create_file_entry(file_path) - cursor = self.connection.cursor() - cursor.execute( - """ - INSERT INTO note (hash, name, note) - VALUES (?, ?, ?) - ON CONFLICT(hash, name) DO UPDATE SET - note = excluded.note, - updated_at = CURRENT_TIMESTAMP - """, - (file_hash, - note_name, - note), - ) - self.connection.commit() - logger.debug(f"Saved note '{note_name}' for {file_path}") - except Exception as e: - logger.error(f"Error saving note for {file_path}: {e}", exc_info=True) - raise + file_hash = self.get_or_create_file_entry(file_path) + cursor = self.connection.cursor() + cursor.execute( + """ + INSERT INTO note (hash, name, note) + VALUES (?, ?, ?) + ON CONFLICT(hash, name) DO UPDATE SET + note = excluded.note, + updated_at = CURRENT_TIMESTAMP + """, + (file_hash, + note_name, + note), + ) + self.connection.commit() + logger.debug(f"Saved note '{note_name}' for {file_path}") + except Exception as e: + logger.error(f"Error saving note for {file_path}: {e}", exc_info=True) + raise def delete_note(self, file_hash: str, name: str) -> None: """Delete a named note for a file by hash.""" - try: - note_name = str(name or "").strip() - if not note_name: - raise ValueError("Note name is required") - cursor = self.connection.cursor() - cursor.execute( - "DELETE FROM note WHERE hash = ? AND name = ?", - (file_hash, - note_name), - ) - self.connection.commit() - except Exception as e: - logger.error( - f"Error deleting note '{name}' for hash {file_hash}: {e}", - exc_info=True - ) - raise + with self._db_lock: + try: + note_name = str(name or "").strip() + if not note_name: + raise ValueError("Note name is required") + cursor = self.connection.cursor() + cursor.execute( + "DELETE FROM note WHERE hash = ? AND name = ?", + (file_hash, + note_name), + ) + self.connection.commit() + except Exception as e: + logger.error( + f"Error deleting note '{name}' for hash {file_hash}: {e}", + exc_info=True + ) + raise def search_by_tag(self, tag: str, limit: int = 100) -> List[tuple]: """Search for files with a specific tag. Returns list of (hash, file_path) tuples.""" @@ -1920,99 +1939,100 @@ class API_folder_store: backlinks in other files so no file retains dangling references to the deleted hash. """ - try: - abs_path = self._normalize_input_path(file_path) - str_path = self._to_db_file_path(abs_path) - cursor = self.connection.cursor() - - # Get the hash first (for logging) - cursor.execute("SELECT hash FROM file WHERE file_path = ?", - (str_path, - )) - row = cursor.fetchone() - if not row: - logger.debug(f"File not found in database: {str_path}") - return False - - file_hash = row[0] - - # Remove backlinks from other files that reference this hash. + with self._db_lock: try: - target_hash = str(file_hash or "").strip().lower() - backlinks = self.find_files_pointing_to_hash(target_hash) - by_src: Dict[str, - set[str]] = {} - for b in backlinks: - src = str((b or {}).get("hash") or "").strip().lower() - rt = str((b or {}).get("type") or "").strip() - if not src or src == target_hash or not rt: - continue - by_src.setdefault(src, set()).add(rt) + abs_path = self._normalize_input_path(file_path) + str_path = self._to_db_file_path(abs_path) + cursor = self.connection.cursor() - for src_hash, rel_types in by_src.items(): - meta = self.get_metadata(src_hash) or {} - rels = meta.get("relationships") if isinstance(meta, dict) else None - if not isinstance(rels, dict) or not rels: - continue + # Get the hash first (for logging) + cursor.execute("SELECT hash FROM file WHERE file_path = ?", + (str_path, + )) + row = cursor.fetchone() + if not row: + logger.debug(f"File not found in database: {str_path}") + return False - changed = False - for rt in rel_types: - key_to_edit = None - for k in list(rels.keys()): - if str(k).lower() == str(rt).lower(): - key_to_edit = str(k) - break - if not key_to_edit: + file_hash = row[0] + + # Remove backlinks from other files that reference this hash. + try: + target_hash = str(file_hash or "").strip().lower() + backlinks = self.find_files_pointing_to_hash(target_hash) + by_src: Dict[str, + set[str]] = {} + for b in backlinks: + src = str((b or {}).get("hash") or "").strip().lower() + rt = str((b or {}).get("type") or "").strip() + if not src or src == target_hash or not rt: + continue + by_src.setdefault(src, set()).add(rt) + + for src_hash, rel_types in by_src.items(): + meta = self.get_metadata(src_hash) or {} + rels = meta.get("relationships") if isinstance(meta, dict) else None + if not isinstance(rels, dict) or not rels: continue - bucket = rels.get(key_to_edit) - if not isinstance(bucket, list) or not bucket: - continue + changed = False + for rt in rel_types: + key_to_edit = None + for k in list(rels.keys()): + if str(k).lower() == str(rt).lower(): + key_to_edit = str(k) + break + if not key_to_edit: + continue - new_bucket = [ - h for h in bucket - if str(h or "").strip().lower() != target_hash - ] - if len(new_bucket) == len(bucket): - continue + bucket = rels.get(key_to_edit) + if not isinstance(bucket, list) or not bucket: + continue - changed = True - if new_bucket: - rels[key_to_edit] = new_bucket - else: - try: - del rels[key_to_edit] - except Exception: - rels[key_to_edit] = [] + new_bucket = [ + h for h in bucket + if str(h or "").strip().lower() != target_hash + ] + if len(new_bucket) == len(bucket): + continue - if changed: - cursor.execute( - """ - INSERT INTO metadata (hash, relationships) - VALUES (?, ?) - ON CONFLICT(hash) DO UPDATE SET - relationships = excluded.relationships, - time_modified = CURRENT_TIMESTAMP, - updated_at = CURRENT_TIMESTAMP - """, - (src_hash, - json.dumps(rels if rels else {})), - ) - except Exception: - # Best-effort cleanup; deletion should still proceed. - pass + changed = True + if new_bucket: + rels[key_to_edit] = new_bucket + else: + try: + del rels[key_to_edit] + except Exception: + rels[key_to_edit] = [] - # Delete the file entry (cascades to metadata, tags, notes, etc via foreign keys) - cursor.execute("DELETE FROM file WHERE file_path = ?", - (str_path, - )) - self.connection.commit() + if changed: + cursor.execute( + """ + INSERT INTO metadata (hash, relationships) + VALUES (?, ?) + ON CONFLICT(hash) DO UPDATE SET + relationships = excluded.relationships, + time_modified = CURRENT_TIMESTAMP, + updated_at = CURRENT_TIMESTAMP + """, + (src_hash, + json.dumps(rels if rels else {})), + ) + except Exception: + # Best-effort cleanup; deletion should still proceed. + pass - logger.debug(f"Deleted file from database: {str_path} (hash: {file_hash})") - return cursor.rowcount > 0 - except Exception as e: - logger.error(f"Error deleting file {file_path}: {e}", exc_info=True) - return False + # Delete the file entry (cascades to metadata, tags, notes, etc via foreign keys) + cursor.execute("DELETE FROM file WHERE file_path = ?", + (str_path, + )) + self.connection.commit() + + logger.debug(f"Deleted file from database: {str_path} (hash: {file_hash})") + return cursor.rowcount > 0 + except Exception as e: + logger.error(f"Error deleting file {file_path}: {e}", exc_info=True) + return False # ======================================================================== # WORKER MANAGEMENT @@ -2065,46 +2085,47 @@ class API_folder_store: def update_worker(self, worker_id: str, **kwargs) -> bool: """Update worker entry with given fields.""" - try: - allowed_fields = { - "status", - "progress", - "current_step", - "error_message", - "result_data", - "title", - "description", - "completed_at", - "total_steps", - "pipe", - "started_at", - "last_stdout_at", - } - update_fields = { - k: v - for k, v in kwargs.items() if k in allowed_fields - } + with self._db_lock: + try: + allowed_fields = { + "status", + "progress", + "current_step", + "error_message", + "result_data", + "title", + "description", + "completed_at", + "total_steps", + "pipe", + "started_at", + "last_stdout_at", + } + update_fields = { + k: v + for k, v in kwargs.items() if k in allowed_fields + } - if not update_fields: - return True + if not update_fields: + return True - update_fields["last_updated"] = datetime.now().isoformat() - cursor = self.connection.cursor() - set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys()) - values = list(update_fields.values()) + [worker_id] + update_fields["last_updated"] = datetime.now().isoformat() + cursor = self.connection.cursor() + set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys()) + values = list(update_fields.values()) + [worker_id] - cursor.execute( - f""" - UPDATE worker SET {set_clause} WHERE worker_id = ? - """, - values, - ) + cursor.execute( + f""" + UPDATE worker SET {set_clause} WHERE worker_id = ? + """, + values, + ) - self.connection.commit() - return cursor.rowcount > 0 - except Exception as e: - logger.error(f"Error updating worker {worker_id}: {e}", exc_info=True) - return False + self.connection.commit() + return cursor.rowcount > 0 + except Exception as e: + logger.error(f"Error updating worker {worker_id}: {e}", exc_info=True) + return False def update_worker_status(self, worker_id: str, status: str) -> int: """Update worker status and return its database ID.""" @@ -2187,16 +2208,17 @@ class API_folder_store: def delete_worker(self, worker_id: str) -> bool: """Delete a worker entry.""" - try: - cursor = self.connection.cursor() - cursor.execute("DELETE FROM worker WHERE worker_id = ?", - (worker_id, - )) - self.connection.commit() - return cursor.rowcount > 0 - except Exception as e: - logger.error(f"Error deleting worker: {e}", exc_info=True) - return False + with self._db_lock: + try: + cursor = self.connection.cursor() + cursor.execute("DELETE FROM worker WHERE worker_id = ?", + (worker_id, + )) + self.connection.commit() + return cursor.rowcount > 0 + except Exception as e: + logger.error(f"Error deleting worker: {e}", exc_info=True) + return False def cleanup_old_workers(self, days: int = 7) -> int: """Clean up completed/errored workers older than specified days.""" @@ -2294,71 +2316,72 @@ class API_folder_store: """Append text to a worker's stdout log and timeline.""" if not text: return True - try: - # Check if connection is valid - if not self.connection: - logger.warning( - f"Database connection not available for worker {worker_id}" + with self._db_lock: + try: + # Check if connection is valid + if not self.connection: + logger.warning( + f"Database connection not available for worker {worker_id}" + ) + return False + + cursor = self.connection.cursor() + cursor.execute( + "SELECT stdout FROM worker WHERE worker_id = ?", + (worker_id, + ) + ) + row = cursor.fetchone() + + if not row: + logger.warning(f"Worker {worker_id} not found for stdout append") + return False + + current_stdout = row[0] or "" + separator = ( + "" if not current_stdout else + ("" if current_stdout.endswith("\n") else "\n") + ) + new_stdout = f"{current_stdout}{separator}{text}\n" + + cursor.execute( + """ + UPDATE worker SET stdout = ?, last_updated = CURRENT_TIMESTAMP, + last_stdout_at = CURRENT_TIMESTAMP + WHERE worker_id = ? + """, + (new_stdout, + worker_id), + ) + self._insert_worker_log_entry( + cursor, + worker_id, + "stdout", + text, + step, + channel + ) + + self.connection.commit() + return cursor.rowcount > 0 + except sqlite3.ProgrammingError as e: + # Handle "Cannot operate on a closed database" gracefully + if "closed database" in str(e).lower(): + logger.warning( + f"Database connection closed, cannot append stdout for worker {worker_id}" + ) + return False + logger.error( + f"Error appending stdout to worker {worker_id}: {e}", + exc_info=True ) return False - - cursor = self.connection.cursor() - cursor.execute( - "SELECT stdout FROM worker WHERE worker_id = ?", - (worker_id, - ) - ) - row = cursor.fetchone() - - if not row: - logger.warning(f"Worker {worker_id} not found for stdout append") - return False - - current_stdout = row[0] or "" - separator = ( - "" if not current_stdout else - ("" if current_stdout.endswith("\n") else "\n") - ) - new_stdout = f"{current_stdout}{separator}{text}\n" - - cursor.execute( - """ - UPDATE worker SET stdout = ?, last_updated = CURRENT_TIMESTAMP, - last_stdout_at = CURRENT_TIMESTAMP - WHERE worker_id = ? - """, - (new_stdout, - worker_id), - ) - self._insert_worker_log_entry( - cursor, - worker_id, - "stdout", - text, - step, - channel - ) - - self.connection.commit() - return cursor.rowcount > 0 - except sqlite3.ProgrammingError as e: - # Handle "Cannot operate on a closed database" gracefully - if "closed database" in str(e).lower(): - logger.warning( - f"Database connection closed, cannot append stdout for worker {worker_id}" + except Exception as e: + logger.error( + f"Error appending stdout to worker {worker_id}: {e}", + exc_info=True ) return False - logger.error( - f"Error appending stdout to worker {worker_id}: {e}", - exc_info=True - ) - return False - except Exception as e: - logger.error( - f"Error appending stdout to worker {worker_id}: {e}", - exc_info=True - ) - return False def get_worker_stdout(self, worker_id: str) -> str: """Get stdout logs for a worker.""" diff --git a/SYS/models.py b/SYS/models.py index 07bb199..615d91b 100644 --- a/SYS/models.py +++ b/SYS/models.py @@ -7,6 +7,7 @@ import os import shutil import sys import time +from threading import RLock from dataclasses import dataclass, field from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Protocol, TextIO @@ -755,6 +756,7 @@ class PipelineLiveProgress: def __init__(self, pipe_labels: List[str], *, enabled: bool = True) -> None: self._enabled = bool(enabled) self._pipe_labels = [str(x) for x in (pipe_labels or [])] + self._lock = RLock() self._console: Optional[Console] = None self._live: Optional[Live] = None @@ -826,26 +828,27 @@ class PipelineLiveProgress: the spinner without needing manual Live.update() calls. """ - pipe_progress = self._pipe_progress - status = self._status - transfers = self._transfers - overall = self._overall - if pipe_progress is None or transfers is None or overall is None: - # Not started (or stopped). - yield Panel("", title="Pipeline", expand=False) - return + with self._lock: + pipe_progress = self._pipe_progress + status = self._status + transfers = self._transfers + overall = self._overall + if pipe_progress is None or transfers is None or overall is None: + # Not started (or stopped). + yield Panel("", title="Pipeline", expand=False) + return - body_parts: List[Any] = [pipe_progress] - if status is not None and self._status_tasks: - body_parts.append(status) - body_parts.append(transfers) + body_parts: List[Any] = [pipe_progress] + if status is not None and self._status_tasks: + body_parts.append(status) + body_parts.append(transfers) - yield Group( - Panel(Group(*body_parts), - title=self._title_text(), - expand=False), - overall - ) + yield Group( + Panel(Group(*body_parts), + title=self._title_text(), + expand=False), + overall + ) def _render_group(self) -> Group: # Backward-compatible helper (some callers may still expect a Group). @@ -1029,52 +1032,58 @@ class PipelineLiveProgress: return if not self._ensure_pipe(int(pipe_index)): return - prog = self._status - if prog is None: - return + + with self._lock: + prog = self._status + if prog is None: + return - try: - pidx = int(pipe_index) - msg = str(text or "").strip() - except Exception: - return - - # For long single-item work, hide the per-item spinner line and use this - # dedicated status line instead. - if self._pipe_percent_mode.get(pidx, False): try: - self._hide_pipe_subtasks(pidx) + pidx = int(pipe_index) + msg = str(text or "").strip() + except Exception: + return + + # For long single-item work, hide the per-item spinner line and use this + # dedicated status line instead. + if self._pipe_percent_mode.get(pidx, False): + try: + self._hide_pipe_subtasks(pidx) + except Exception: + pass + + task_id = self._status_tasks.get(pidx) + if task_id is None: + try: + task_id = prog.add_task(msg) + except Exception: + return + self._status_tasks[pidx] = task_id + + try: + prog.update(task_id, description=msg, refresh=True) except Exception: pass - task_id = self._status_tasks.get(pidx) - if task_id is None: + def clear_pipe_status_text(self, pipe_index: int) -> None: + if not self._enabled: + return + + with self._lock: + prog = self._status + if prog is None: + return try: - task_id = prog.add_task(msg) + pidx = int(pipe_index) except Exception: return - self._status_tasks[pidx] = task_id - - try: - prog.update(task_id, description=msg, refresh=True) - except Exception: - pass - - def clear_pipe_status_text(self, pipe_index: int) -> None: - prog = self._status - if prog is None: - return - try: - pidx = int(pipe_index) - except Exception: - return - task_id = self._status_tasks.pop(pidx, None) - if task_id is None: - return - try: - prog.remove_task(task_id) - except Exception: - pass + task_id = self._status_tasks.pop(pidx, None) + if task_id is None: + return + try: + prog.remove_task(task_id) + except Exception: + pass def set_pipe_percent(self, pipe_index: int, percent: int) -> None: """Update the pipe bar as a percent (only when single-item mode is enabled).""" @@ -1095,6 +1104,31 @@ class PipelineLiveProgress: pct = max(0, min(100, int(percent))) pipe_task = self._pipe_tasks[pidx] pipe_progress.update(pipe_task, completed=pct, total=100, refresh=True) + self._update_overall() + except Exception: + pass + + def _update_overall(self) -> None: + """Update the overall pipeline progress task.""" + if self._overall is None or self._overall_task is None: + return + + completed = 0 + try: + # Count a pipe as completed if its 'done' count matches or exceeds the advertised total. + completed = sum( + 1 for i in range(len(self._pipe_labels)) + if self._pipe_done[i] >= max(1, self._pipe_totals[i]) + ) + except Exception: + completed = 0 + + try: + self._overall.update( + self._overall_task, + completed=min(completed, max(1, len(self._pipe_labels))), + description=f"Pipeline: {completed}/{len(self._pipe_labels)} pipes completed", + ) except Exception: pass @@ -1108,24 +1142,25 @@ class PipelineLiveProgress: if not self._ensure_pipe(int(pipe_index)): return - try: - pidx = int(pipe_index) - tot = max(1, int(total_steps)) - except Exception: - return + with self._lock: + try: + pidx = int(pipe_index) + tot = max(1, int(total_steps)) + except Exception: + return - self._pipe_step_total[pidx] = tot - self._pipe_step_done[pidx] = 0 + self._pipe_step_total[pidx] = tot + self._pipe_step_done[pidx] = 0 - # Reset status line and percent. - try: - self.clear_pipe_status_text(pidx) - except Exception: - pass - try: - self.set_pipe_percent(pidx, 0) - except Exception: - pass + # Reset status line and percent. + try: + self.clear_pipe_status_text(pidx) + except Exception: + pass + try: + self.set_pipe_percent(pidx, 0) + except Exception: + pass def advance_pipe_step(self, pipe_index: int, text: str) -> None: """Advance the pipe's step counter by one. @@ -1287,6 +1322,8 @@ class PipelineLiveProgress: except Exception: pass + self._update_overall() + labels: List[str] = [] if isinstance(items_preview, list) and items_preview: labels = [_pipeline_progress_item_label(x) for x in items_preview] @@ -1372,6 +1409,8 @@ class PipelineLiveProgress: else: pipe_progress.update(pipe_task, completed=done) + self._update_overall() + # Clear any status line now that it emitted. try: self.clear_pipe_status_text(pipe_index) @@ -1452,23 +1491,7 @@ class PipelineLiveProgress: except Exception: pass - if self._overall_task is not None: - completed = 0 - try: - completed = sum( - 1 for i in range(len(self._pipe_labels)) - if self._pipe_done[i] >= max(1, self._pipe_totals[i]) - ) - except Exception: - completed = 0 - overall.update( - self._overall_task, - completed=min(completed, - max(1, - len(self._pipe_labels))), - description= - f"Pipeline: {completed}/{len(self._pipe_labels)} pipes completed", - ) + self._update_overall() class PipelineStageContext: diff --git a/Store/HydrusNetwork.py b/Store/HydrusNetwork.py index 411c7a3..eb205c3 100644 --- a/Store/HydrusNetwork.py +++ b/Store/HydrusNetwork.py @@ -325,8 +325,11 @@ class HydrusNetwork(Store): ] try: - # Compute file hash - file_hash = sha256_file(file_path) + # Compute file hash (or use hint from kwargs to avoid redundant IO) + file_hash = kwargs.get("hash") or kwargs.get("file_hash") + if not file_hash: + file_hash = sha256_file(file_path) + debug(f"{self._log_prefix()} file hash: {file_hash}") # Use persistent client with session key diff --git a/cmdlet/add_file.py b/cmdlet/add_file.py index 3046e57..f60558c 100644 --- a/cmdlet/add_file.py +++ b/cmdlet/add_file.py @@ -369,11 +369,12 @@ class Add_File(Cmdlet): # Many add-file flows don't emit intermediate items, so without steps the pipe can look "stuck". use_steps = False steps_started = False - step2_done = False - step3_done = False try: ui, _ = progress.ui_and_pipe_index() use_steps = (ui is not None) and (len(items_to_process) == 1) + if use_steps: + progress.begin_steps(5) + steps_started = True except Exception: use_steps = False @@ -545,10 +546,8 @@ class Add_File(Cmdlet): temp_dir_to_cleanup: Optional[Path] = None delete_after_item = delete_after try: - if use_steps and (not steps_started): - progress.begin_steps(5) + if use_steps and steps_started: progress.step("resolving source") - steps_started = True media_path, file_hash, temp_dir_to_cleanup = self._resolve_source( item, path_arg, pipe_obj, config, store_instance=storage_registry @@ -560,32 +559,20 @@ class Add_File(Cmdlet): failures += 1 continue + # Update pipe_obj with resolved path + pipe_obj.path = str(media_path) + + # When using -path (filesystem export), allow all file types. + # When using -store (backend), restrict to SUPPORTED_MEDIA_EXTENSIONS. + allow_all_files = not (location and is_storage_backend_location) + if not self._validate_source(media_path, allow_all_extensions=allow_all_files): + failures += 1 + continue + if use_steps and steps_started: - progress.step("hashing file") - - # Update pipe_obj with resolved path - pipe_obj.path = str(media_path) - - # When using -path (filesystem export), allow all file types. - # When using -store (backend), restrict to SUPPORTED_MEDIA_EXTENSIONS. - allow_all_files = not (location and is_storage_backend_location) - if not self._validate_source(media_path, allow_all_extensions=allow_all_files): - failures += 1 - continue - - if use_steps and steps_started and (not step2_done): + if not file_hash: + progress.step("hashing file") progress.step("ingesting file") - step2_done = True - - # Update pipe_obj with resolved path - pipe_obj.path = str(media_path) - - # When using -path (filesystem export), allow all file types. - # When using -store (backend), restrict to SUPPORTED_MEDIA_EXTENSIONS. - allow_all_files = not (location and is_storage_backend_location) - if not self._validate_source(media_path, allow_all_extensions=allow_all_files): - failures += 1 - continue if provider_name: if str(provider_name).strip().lower() == "matrix": @@ -690,9 +677,8 @@ class Add_File(Cmdlet): seen: set[str] = set() hashes = [h for h in hashes if not (h in seen or seen.add(h))] - if use_steps and steps_started and (not step3_done): + if use_steps and steps_started: progress.step("refreshing display") - step3_done = True refreshed_items = Add_File._try_emit_search_file_by_hashes( store=str(location), @@ -700,6 +686,7 @@ class Add_File(Cmdlet): config=config, store_instance=storage_registry, ) + debug(f"[add-file] Internal refresh returned refreshed_items count={len(refreshed_items) if refreshed_items else 0}") if not refreshed_items: # Fallback: at least show the add-file payloads as a display overlay from SYS.result_table import ResultTable @@ -756,7 +743,7 @@ class Add_File(Cmdlet): from cmdlet.search_file import CMDLET as search_file_cmdlet query = "hash:" + ",".join(hashes) - args = ["-store", str(store), query] + args = ["-store", str(store), "-internal-refresh", query] debug(f'[add-file] Refresh: search-file -store {store} "{query}"') # Run search-file under a temporary stage context so its ctx.emit() calls diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py index 5d3c062..6731696 100644 --- a/cmdlet/search_file.py +++ b/cmdlet/search_file.py @@ -258,9 +258,6 @@ class search_file(Cmdlet): try: results_list: List[Dict[str, Any]] = [] - from SYS import result_table - - importlib.reload(result_table) from SYS.result_table import ResultTable provider_text = str(provider_name or "").strip() @@ -453,8 +450,8 @@ class search_file(Cmdlet): args_list = [str(arg) for arg in (args or [])] refresh_mode = any( - str(a).strip().lower() in {"--refresh", - "-refresh"} for a in args_list + str(a).strip().lower() in {"--refresh", "-refresh", "-internal-refresh"} + for a in args_list ) def _format_command_title(command: str, raw_args: List[str]) -> str: @@ -470,7 +467,7 @@ class search_file(Cmdlet): cleaned = [ str(a) for a in (raw_args or []) - if str(a).strip().lower() not in {"--refresh", "-refresh"} + if str(a).strip().lower() not in {"--refresh", "-refresh", "-internal-refresh"} ] if not cleaned: return command @@ -626,6 +623,10 @@ class search_file(Cmdlet): continue if not library_root: + # Internal refreshes should not trigger config panels or stop progress. + if "-internal-refresh" in args_list: + return 1 + from SYS import pipeline as ctx_mod progress = None if hasattr(ctx_mod, "get_pipeline_state"): @@ -641,19 +642,16 @@ class search_file(Cmdlet): # Use context manager to ensure database is always closed with API_folder_store(library_root) as db: try: - db.insert_worker( - worker_id, - "search-file", - title=f"Search: {query}", - description=f"Query: {query}", - pipe=ctx.get_current_command_text(), - ) + if "-internal-refresh" not in args_list: + db.insert_worker( + worker_id, + "search-file", + title=f"Search: {query}", + description=f"Query: {query}", + pipe=ctx.get_current_command_text(), + ) results_list = [] - from SYS import result_table - import importlib - - importlib.reload(result_table) from SYS.result_table import ResultTable table = ResultTable(command_title) @@ -802,6 +800,16 @@ class search_file(Cmdlet): if found_any: table.title = command_title + # Add-file refresh quality-of-life: if exactly 1 item is being refreshed, + # show the detailed item panel instead of a single-row table. + if refresh_mode and len(results_list) == 1: + try: + from SYS.rich_display import render_item_details_panel + render_item_details_panel(results_list[0]) + table._rendered_by_cmdlet = True + except Exception: + pass + if refresh_mode: ctx.set_last_result_table_preserve_history( table, @@ -918,6 +926,15 @@ class search_file(Cmdlet): table.title = command_title + # If exactly 1 item is being refreshed, show the detailed item panel. + if refresh_mode and len(results_list) == 1: + try: + from SYS.rich_display import render_item_details_panel + render_item_details_panel(results_list[0]) + table._rendered_by_cmdlet = True + except Exception: + pass + if refresh_mode: ctx.set_last_result_table_preserve_history(table, results_list) else: