diff --git a/CLI.py b/CLI.py
index 089925c..9f3ec03 100644
--- a/CLI.py
+++ b/CLI.py
@@ -15,17 +15,33 @@ import os
 from pathlib import Path
 
 if not os.environ.get("MM_DEBUG"):
     try:
-        conf_path = Path(__file__).resolve().parent / "config.conf"
-        if conf_path.exists():
-            for ln in conf_path.read_text(encoding="utf-8").splitlines():
-                ln_strip = ln.strip()
-                if ln_strip.startswith("debug"):
-                    parts = ln_strip.split("=", 1)
-                    if len(parts) >= 2:
-                        val = parts[1].strip().strip('"').strip("'").strip().lower()
-                        if val in ("1", "true", "yes", "on"):
-                            os.environ["MM_DEBUG"] = "1"
-                    break
+        # Check database first
+        db_path = Path(__file__).resolve().parent / "medios.db"
+        if db_path.exists():
+            import sqlite3
+            with sqlite3.connect(str(db_path)) as conn:
+                cur = conn.cursor()
+                # Check for global debug key
+                cur.execute("SELECT value FROM config WHERE key = 'debug' AND category = 'global'")
+                row = cur.fetchone()
+                if row:
+                    val = str(row[0]).strip().lower()
+                    if val in ("1", "true", "yes", "on"):
+                        os.environ["MM_DEBUG"] = "1"
+
+        # Fallback to legacy config.conf if not set by DB
+        if not os.environ.get("MM_DEBUG"):
+            conf_path = Path(__file__).resolve().parent / "config.conf"
+            if conf_path.exists():
+                for ln in conf_path.read_text(encoding="utf-8").splitlines():
+                    ln_strip = ln.strip()
+                    if ln_strip.startswith("debug"):
+                        parts = ln_strip.split("=", 1)
+                        if len(parts) >= 2:
+                            val = parts[1].strip().strip('"').strip("'").strip().lower()
+                            if val in ("1", "true", "yes", "on"):
+                                os.environ["MM_DEBUG"] = "1"
+                        break
     except Exception:
         pass
@@ -297,7 +313,7 @@ class CmdletIntrospection:
         if normalized_arg in ("storage", "store"):
             # Use cached/lightweight names for completions to avoid instantiating backends
-            # (instantiating backends may perform initialization such as opening folder DBs).
+            # (instantiating backends may perform heavy initialization).
             backends = cls.store_choices(config, force=False)
             if backends:
                 return backends
@@ -1484,7 +1500,7 @@ class CLI:
         @app.command("remote-server")
         def remote_server(
             storage_path: str = typer.Argument(
-                None, help="Path to the store folder or store name from config"
+                None, help="Path to the storage root"
             ),
             port: int = typer.Option(None, "--port", help="Port to run the server on"),
             api_key: str | None = typer.Option(None, "--api-key", help="API key for authentication"),
@@ -1492,81 +1508,13 @@ class CLI:
             debug_server: bool = False,
             background: bool = False,
         ) -> None:
-            """Start the remote storage Flask server.
+            """Start the remote storage server.
 
-            If no path is provided, it looks for [networking=zerotier] 'serve' and 'port' in config.
-            'serve' can be a path or the name of a [store=folder] entry.
-
-            Examples:
-                mm remote-server C:\\path\\to\\store --port 999 --api-key mykey
-                mm remote-server my_folder_name
-                mm remote-server --background
+            NOTE: The legacy local storage server has been removed. Use HydrusNetwork
+            integrations instead.
             """
-            try:
-                from scripts import remote_storage_server as rss
-            except Exception as exc:
-                print(
-                    "Error: remote_storage_server script not available:",
-                    exc,
-                    file=sys.stderr,
-                )
-                return
-
-            # Ensure Flask present
-            if not getattr(rss, "HAS_FLASK", False):
-                print(
-                    "ERROR: Flask and flask-cors required; install with: pip install flask flask-cors",
-                    file=sys.stderr,
-                )
-                return
-
-            from SYS.config import load_config
-
-            conf = load_config()
-
-            # Resolve from Networking config if omitted
-            zt_conf = conf.get("networking", {}).get("zerotier", {})
-            if not storage_path:
-                storage_path = zt_conf.get("serve")
-            if port is None:
-                port = int(zt_conf.get("port") or 999)
-            if api_key is None:
-                api_key = zt_conf.get("api_key")
-
-            if not storage_path:
-                print(
-                    "Error: No storage path provided and no [networking=zerotier] 'serve' configured.",
-                    file=sys.stderr,
-                )
-                return
-
-            from pathlib import Path
-
-            # Check if storage_path is a named folder store
-            folders = conf.get("store", {}).get("folder", {})
-            found_path = None
-            for name, block in folders.items():
-                if name.lower() == storage_path.lower():
-                    found_path = block.get("path") or block.get("PATH")
-                    break
-
-            if found_path:
-                storage = Path(found_path).resolve()
-            else:
-                storage = Path(storage_path).resolve()
-
-            if not storage.exists():
-                print(f"Error: Storage path does not exist: {storage}", file=sys.stderr)
-                return
-
-            rss.STORAGE_PATH = storage
-            rss.API_KEY = api_key
-
-            try:
-                app_obj = rss.create_app()
-            except Exception as exc:
-                print("Failed to create remote_storage_server app:", exc, file=sys.stderr)
-                return
+            print(
+                "Error: remote-server is no longer available because legacy local storage has been removed.",
+                file=sys.stderr,
+            )
+            return
 
-            print(
-                f"Starting remote storage server at http://{host}:{port}, storage: {storage}"
-            )
@@ -2102,64 +2053,6 @@ Come to love it when others take what you share, as there is no greater joy
                 detail=str(exc)
             )
 
-        if _has_store_subtype(config, "folder"):
-            store_cfg = config.get("store")
-            folder_cfg = store_cfg.get("folder", {}) if isinstance(store_cfg, dict) else {}
-            if isinstance(folder_cfg, dict) and folder_cfg:
-                for instance_name, instance_cfg in folder_cfg.items():
-                    if not isinstance(instance_cfg, dict):
-                        continue
-                    name_key = str(instance_cfg.get("NAME") or instance_name)
-                    path_val = str(
-                        instance_cfg.get("PATH") or instance_cfg.get("path") or ""
-                    ).strip()
-
-                    ok = bool(
-                        store_registry
-                        and store_registry.is_available(name_key)
-                    )
-                    if ok and store_registry:
-                        backend = store_registry[name_key]
-                        scan_ok = bool(getattr(backend, "scan_ok", True))
-                        scan_detail = str(
-                            getattr(backend, "scan_detail", "") or ""
-                        )
-                        stats = getattr(backend, "scan_stats", None)
-                        files = None
-                        if isinstance(stats, dict):
-                            total_db = stats.get("files_total_db")
-                            if isinstance(total_db, (int, float)):
-                                files = int(total_db)
-                        status = "SCANNED" if scan_ok else "ERROR"
-                        detail = (path_val + (" - " if path_val else "")) + (scan_detail or "Up to date")
-                        _add_startup_check(
-                            status,
-                            name_key,
-                            store="folder",
-                            files=files,
-                            detail=detail
-                        )
-                    else:
-                        err = None
-                        if store_registry:
-                            err = store_registry.get_backend_error(
-                                instance_name
-                            ) or store_registry.get_backend_error(name_key)
-                        detail = (path_val + (" - " if path_val else "")) + (err or "Unavailable")
-                        _add_startup_check(
-                            "ERROR",
-                            name_key,
-                            store="folder",
-                            detail=detail
-                        )
-
         if _has_store_subtype(config, "debrid"):
             try:
                 from SYS.config import get_debrid_api_key
diff --git a/SYS/config.py b/SYS/config.py
index e9d9501..1ae0846 100644
--- a/SYS/config.py
+++ b/SYS/config.py
@@ -8,6 +8,7 @@ from pathlib import Path
 from typing import Any, Dict, Optional, List
 from SYS.logger import log
 from SYS.utils import expand_path
+from SYS.database import db, get_config_all, save_config_value
 
 DEFAULT_CONFIG_FILENAME = "config.conf"
 SCRIPT_DIR = Path(__file__).resolve().parent
@@ -208,10 +209,9 @@ def parse_conf_text(text: str, *, base: Optional[Dict[str, Any]] = None) -> Dict
 
     Supported patterns:
     - Top-level key/value: temp="./temp"
-    - Sections: [store=folder] + name/path lines
     - Sections: [store=hydrusnetwork] + name/access key/url lines
     - Sections: [provider=OpenLibrary] + email/password lines
-    - Dotted keys: store.folder.default.path="C:\\Media" (optional)
+    - Dotted keys: store.hydrusnetwork.<name>.url="http://..." (optional)
     """
 
     config: Dict[str, Any] = dict(base or {})
@@ -519,7 +519,6 @@ def get_local_storage_path(config: Dict[str, Any]) -> Optional[Path]:
     """Get local storage path from config.
 
     Supports multiple formats:
-    - New: config["store"]["folder"]["any_name"]["path"]
     - Old: config["storage"]["local"]["path"]
     - Old: config["Local"]["path"]
 
@@ -529,17 +528,6 @@ def get_local_storage_path(config: Dict[str, Any]) -> Optional[Path]:
     Returns:
         Path object if found, None otherwise
     """
-    # Try new format: iterate all folder stores and use the first valid path found.
-    store = config.get("store", {})
-    if isinstance(store, dict):
-        folder_config = store.get("folder", {})
-        if isinstance(folder_config, dict):
-            for name, inst_cfg in folder_config.items():
-                if isinstance(inst_cfg, dict):
-                    p = inst_cfg.get("path") or inst_cfg.get("PATH")
-                    if p:
-                        return expand_path(p)
-
     # Fall back to storage.local.path format
     storage = config.get("storage", {})
     if isinstance(storage, dict):
@@ -686,32 +674,76 @@ def resolve_debug_log(config: Dict[str, Any]) -> Optional[Path]:
     return path
 
 
+def migrate_conf_to_db(config: Dict[str, Any]) -> None:
+    """Migrate the configuration dictionary to the database."""
+    log("Migrating configuration from .conf to database...")
+    for key, value in config.items():
+        if key in ("store", "provider", "tool", "networking"):
+            cat = key
+            sub_dict = value
+            if isinstance(sub_dict, dict):
+                for subtype, subtype_items in sub_dict.items():
+                    if isinstance(subtype_items, dict):
+                        # For provider/tool/networking, subtype is the name (e.g. alldebrid)
+                        # but for store, it's the type (e.g. hydrusnetwork)
+                        if cat == "store" and str(subtype).strip().lower() == "folder":
+                            continue
+                        if cat != "store":
+                            for k, v in subtype_items.items():
+                                save_config_value(cat, subtype, "", k, v)
+                        else:
+                            for name, items in subtype_items.items():
+                                if isinstance(items, dict):
+                                    for k, v in items.items():
+                                        save_config_value(cat, subtype, name, k, v)
+        else:
+            # Global setting
+            save_config_value("global", "", "", key, value)
+    log("Configuration migration complete!")
+
+
 def load_config(
     config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME
 ) -> Dict[str, Any]:
     base_dir = config_dir or SCRIPT_DIR
     config_path = base_dir / filename
     cache_key = _make_cache_key(config_dir, filename, config_path)
+
     if cache_key in _CONFIG_CACHE:
         return _CONFIG_CACHE[cache_key]
 
-    if config_path.suffix.lower() != ".conf":
-        log(f"Unsupported config format: {config_path.name} (only .conf is supported)")
-        _CONFIG_CACHE[cache_key] = {}
-        return {}
+    # 1. Try loading from database first
+    db_config = get_config_all()
+    if db_config:
+        _CONFIG_CACHE[cache_key] = db_config
+        return db_config
 
-    try:
-        data = _load_conf_config(base_dir, config_path)
-    except FileNotFoundError:
-        _CONFIG_CACHE[cache_key] = {}
-        return {}
-    except OSError as exc:
-        log(f"Failed to read {config_path}: {exc}")
-        _CONFIG_CACHE[cache_key] = {}
-        return {}
+    # 2. If DB is empty, try loading from legacy config.conf
+    if config_path.exists():
+        if config_path.suffix.lower() != ".conf":
+            log(f"Unsupported config format: {config_path.name} (only .conf is supported)")
+            _CONFIG_CACHE[cache_key] = {}
+            return {}
+
+        try:
+            config = _load_conf_config(base_dir, config_path)
+            # Migrate to database
+            migrate_conf_to_db(config)
+
+            # Optional: Rename old config file to mark as migrated
+            try:
+                migrated_path = config_path.with_name(config_path.name + ".migrated")
+                config_path.rename(migrated_path)
+                log(f"Legacy config file renamed to {migrated_path.name}")
+            except Exception as e:
+                log(f"Could not rename legacy config file: {e}")
 
-    _CONFIG_CACHE[cache_key] = data
-    return data
+            _CONFIG_CACHE[cache_key] = config
+            return config
+        except Exception as e:
+            log(f"Failed to load legacy config at {config_path}: {e}")
+            _CONFIG_CACHE[cache_key] = {}
+            return {}
+
+    _CONFIG_CACHE[cache_key] = {}
+    return {}
 
 
 def reload_config(
@@ -723,55 +755,12 @@ def reload_config(
 
 
 def _validate_config_safety(config: Dict[str, Any]) -> None:
-    """Check for dangerous configurations, like folder stores in non-empty dirs."""
-    store = config.get("store")
-    if not isinstance(store, dict):
-        return
+    """Validate configuration safety.
 
-    folder_stores = store.get("folder")
-    if not isinstance(folder_stores, dict):
-        return
-
-    for name, cfg in folder_stores.items():
-        if not isinstance(cfg, dict):
-            continue
-
-        path_str = cfg.get("path") or cfg.get("PATH")
-        if not path_str:
-            continue
-
-        try:
-            p = expand_path(path_str).resolve()
-            # If the path doesn't exist yet, it's fine (will be created empty)
-            if not p.exists():
-                continue
-
-            if not p.is_dir():
-                continue
-
-            # DB name from API/folder.py
-            db_file = p / "medios-macina.db"
-            if db_file.exists():
-                # Existing portable library, allowed to re-attach
-                continue
-
-            # Check if directory has any files (other than the DB we just checked)
-            items = list(p.iterdir())
-            if items:
-                item_names = [i.name for i in items[:3]]
-                if len(items) > 3:
-                    item_names.append("...")
-                raise RuntimeError(
-                    f"Configuration Error: Local library '{name}' target directory is not empty.\n"
-                    f"Path: {p}\n"
-                    f"Found {len(items)} items: {', '.join(item_names)}\n"
-                    f"To prevent accidental mass-hashing, new libraries must be set to unique, empty folders."
-                )
-        except RuntimeError:
-            raise
-        except Exception:
-            # We don't want to crash on invalid paths during validation if they aren't 'unsafe'
-            pass
+    Folder store validation has been removed because the folder store backend
+    is no longer supported.
+    """
+    return
 
 
 def save_config(
@@ -787,7 +776,7 @@ def save_config(
             f"Unsupported config format: {config_path.name} (only .conf is supported)"
         )
 
-    # Safety Check: Validate folder stores are in empty dirs or existing libraries
+    # Safety Check: placeholder (folder store validation removed)
     _validate_config_safety(config)
 
     try:
diff --git a/SYS/database.py b/SYS/database.py
new file mode 100644
index 0000000..f1df271
--- /dev/null
+++ b/SYS/database.py
@@ -0,0 +1,253 @@
+from __future__ import annotations
+
+import sqlite3
+import json
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+# The database is located in the project root
+ROOT_DIR = Path(__file__).resolve().parent.parent
+DB_PATH = ROOT_DIR / "medios.db"
+
+class Database:
+    _instance: Optional[Database] = None
+
+    def __new__(cls):
+        if cls._instance is None:
+            cls._instance = super(Database, cls).__new__(cls)
+            cls._instance._init_db()
+        return cls._instance
+
+    def _init_db(self):
+        self.conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
+        self.conn.row_factory = sqlite3.Row
+        self._create_tables()
+
+    def _create_tables(self):
+        cursor = self.conn.cursor()
+
+        # Config table: stores all settings previously in config.conf
+        # category: global, store, provider, tool, networking
+        # subtype: e.g., hydrusnetwork, folder, alldebrid
+        # item_name: e.g., hn-local, default
+        # key: the setting key
+        # value: the setting value (serialized to string)
+        cursor.execute("""
+            CREATE TABLE IF NOT EXISTS config (
+                category TEXT,
+                subtype TEXT,
+                item_name TEXT,
+                key TEXT,
+                value TEXT,
+                PRIMARY KEY (category, subtype, item_name, key)
+            )
+        """)
+
+        # Logs table
+        cursor.execute("""
+            CREATE TABLE IF NOT EXISTS logs (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
+                level TEXT,
+                module TEXT,
+                message TEXT
+            )
+        """)
+
+        # Workers table (for background tasks)
+        cursor.execute("""
+            CREATE TABLE IF NOT EXISTS workers (
+                id TEXT PRIMARY KEY,
+                type TEXT,
+                title TEXT,
+                description TEXT,
+                status TEXT DEFAULT 'running',
+                progress REAL DEFAULT 0.0,
+                details TEXT,
+                result TEXT DEFAULT 'pending',
+                error_message TEXT,
+                created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+                updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
+            )
+        """)
+
+        # Worker stdout/logs
+        cursor.execute("""
+            CREATE TABLE IF NOT EXISTS worker_stdout (
+                worker_id TEXT,
+                channel TEXT DEFAULT 'stdout',
+                content TEXT,
+                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
+                FOREIGN KEY(worker_id) REFERENCES workers(id)
+            )
+        """)
+
+        self.conn.commit()
+
+    def get_connection(self):
+        return self.conn
+
+    def execute(self, query: str, params: tuple = ()):
+        cursor = self.conn.cursor()
+        cursor.execute(query, params)
+        self.conn.commit()
+        return cursor
+
+    def fetchall(self, query: str, params: tuple = ()):
+        cursor = self.conn.cursor()
+        cursor.execute(query, params)
+        return cursor.fetchall()
+
+    def fetchone(self, query: str, params: tuple = ()):
+        cursor = self.conn.cursor()
+        cursor.execute(query, params)
+        return cursor.fetchone()
+
+# Singleton instance
+db = Database()
+
+def get_db() -> Database:
+    return db
+
+def log_to_db(level: str, module: str, message: str):
+    """Log a message to the database."""
+    try:
+        db.execute(
+            "INSERT INTO logs (level, module, message) VALUES (?, ?, ?)",
+            (level, module, message)
+        )
+    except Exception:
+        # Avoid recursive logging errors if DB is locked
+        pass
+
+# Initialize DB logger in the unified logger
+try:
+    from SYS.logger import set_db_logger
+    set_db_logger(log_to_db)
+except ImportError:
+    pass
+
+def save_config_value(category: str, subtype: str, item_name: str, key: str, value: Any):
+    """Save a configuration value to the database."""
+    val_str = json.dumps(value) if not isinstance(value, str) else value
+    db.execute(
+        "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
+        (category, subtype, item_name, key, val_str)
+    )
+
+def get_config_all() -> Dict[str, Any]:
+    """Retrieve all configuration from the database in the legacy dict format."""
+    try:
+        db.execute("DELETE FROM config WHERE category='store' AND LOWER(subtype)='folder'")
+    except Exception:
+        pass
+    rows = db.fetchall("SELECT category, subtype, item_name, key, value FROM config")
+    config: Dict[str, Any] = {}
+
+    for row in rows:
+        cat = row['category']
+        sub = row['subtype']
+        name = row['item_name']
+        key = row['key']
+        val = row['value']
+
+        # Drop legacy folder store entries (folder store is removed).
+        if cat == 'store' and str(sub).strip().lower() == 'folder':
+            continue
+
+        # Try to parse JSON value, fallback to string
+        try:
+            parsed_val = json.loads(val)
+        except Exception:
+            parsed_val = val
+
+        if cat == 'global':
+            config[key] = parsed_val
+        else:
+            # Modular structure: config[cat][sub][name][key]
+            if cat in ('provider', 'tool', 'networking'):
+                cat_dict = config.setdefault(cat, {})
+                sub_dict = cat_dict.setdefault(sub, {})
+                sub_dict[key] = parsed_val
+            elif cat == 'store':
+                cat_dict = config.setdefault(cat, {})
+                sub_dict = cat_dict.setdefault(sub, {})
+                name_dict = sub_dict.setdefault(name, {})
+                name_dict[key] = parsed_val
+            else:
+                config.setdefault(cat, {})[key] = parsed_val
+
+    return config
+
+# Worker Management Methods for medios.db
+
+def insert_worker(worker_id: str, worker_type: str, title: str = "", description: str = "") -> bool:
+    try:
+        db.execute(
+            "INSERT INTO workers (id, type, title, description, status) VALUES (?, ?, ?, ?, 'running')",
+            (worker_id, worker_type, title, description)
+        )
+        return True
+    except Exception:
+        return False
+
+def update_worker(worker_id: str, **kwargs) -> bool:
+    if not kwargs:
+        return True
+
+    # Filter valid columns
+    valid_cols = {'type', 'title', 'description', 'status', 'progress', 'details', 'result', 'error_message'}
+    cols = []
+    vals = []
+    for k, v in kwargs.items():
+        if k in valid_cols:
+            cols.append(f"{k} = ?")
+            vals.append(v)
+
+    if not cols:
+        return True
+
+    cols.append("updated_at = CURRENT_TIMESTAMP")
+    query = f"UPDATE workers SET {', '.join(cols)} WHERE id = ?"
+    vals.append(worker_id)
+
+    try:
+        db.execute(query, tuple(vals))
+        return True
+    except Exception:
+        return False
+
+def append_worker_stdout(worker_id: str, content: str, channel: str = 'stdout'):
+    try:
+        db.execute(
+            "INSERT INTO worker_stdout (worker_id, channel, content) VALUES (?, ?, ?)",
+            (worker_id, channel, content)
+        )
+    except Exception:
+        pass
+
+def get_worker_stdout(worker_id: str, channel: Optional[str] = None) -> str:
+    query = "SELECT content FROM worker_stdout WHERE worker_id = ?"
+    params = [worker_id]
+    if channel:
+        query += " AND channel = ?"
+        params.append(channel)
+    query += " ORDER BY timestamp ASC"
+
+    rows = db.fetchall(query, tuple(params))
+    return "\n".join(row['content'] for row in rows)
+
+def get_active_workers() -> List[Dict[str, Any]]:
+    rows = db.fetchall("SELECT * FROM workers WHERE status = 'running' ORDER BY created_at DESC")
+    return [dict(row) for row in rows]
+
+def get_worker(worker_id: str) -> Optional[Dict[str, Any]]:
+    row = db.fetchone("SELECT * FROM workers WHERE id = ?", (worker_id,))
+    return dict(row) if row else None
+
+def expire_running_workers(older_than_seconds: int = 300, status: str = 'error', reason: str = 'timeout') -> int:
+    # SQLite supports datetime arithmetic via modifiers, so only expire rows
+    # whose last update is older than the cutoff, and report how many changed.
+    cur = db.execute(
+        "UPDATE workers SET status = ?, error_message = ? "
+        "WHERE status = 'running' AND updated_at <= datetime('now', ?)",
+        (status, reason, f"-{int(older_than_seconds)} seconds")
+    )
+    return int(getattr(cur, "rowcount", 0) or 0)
diff --git a/SYS/logger.py b/SYS/logger.py
index 1baa943..bc847b3 100644
--- a/SYS/logger.py
+++ b/SYS/logger.py
@@ -4,9 +4,17 @@ import sys
 import inspect
 import threading
 from pathlib import Path
+from typing import Optional
 
 from SYS.rich_display import console_for
 
+# Global DB logger set later to avoid circular imports
+_DB_LOGGER = None
+
+def set_db_logger(func):
+    global _DB_LOGGER
+    _DB_LOGGER = func
+
 _DEBUG_ENABLED = False
 
 _thread_local = threading.local()
@@ -207,6 +215,15 @@ def log(*args, **kwargs) -> None:
             console_for(file).print(prefix, *args, sep=sep, end=end)
         else:
             console_for(file).print(*args, sep=sep, end=end)
+
+        # Log to database if available
+        if _DB_LOGGER:
+            try:
+                msg = sep.join(map(str, args))
+                level = "DEBUG" if add_prefix else "INFO"
+                _DB_LOGGER(level, f"{file_name}.{func_name}", msg)
+            except Exception:
+                pass
     finally:
         del frame
         del caller_frame
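The worker helpers added in SYS/database.py are the whole persistence surface that WorkerManager (below) now relies on. The expected lifecycle, as a sketch:

```python
import uuid
from SYS.database import (
    insert_worker, update_worker, append_worker_stdout, get_worker,
)

wid = str(uuid.uuid4())
insert_worker(wid, "demo", title="Example task")       # status defaults to 'running'
update_worker(wid, progress=0.5, details="halfway")    # only whitelisted columns apply
append_worker_stdout(wid, "step 1 done\n")
update_worker(wid, status="finished", result="completed")
assert get_worker(wid)["status"] == "finished"
```

Note that `set_db_logger` exists purely to break the SYS.logger / SYS.database import cycle: logger.py defines the hook with `_DB_LOGGER = None`, and database.py installs `log_to_db` at the bottom of its own import, after the `db` singleton exists.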
diff --git a/SYS/worker_manager.py b/SYS/worker_manager.py
index ab2510a..1b72185 100644
--- a/SYS/worker_manager.py
+++ b/SYS/worker_manager.py
@@ -11,8 +11,17 @@ from datetime import datetime
 from threading import Thread, Lock
 import time
 
-from API.folder import API_folder_store
 from SYS.logger import log
+from SYS.database import (
+    db,
+    insert_worker,
+    update_worker,
+    append_worker_stdout,
+    get_worker_stdout as db_get_worker_stdout,
+    get_active_workers as db_get_active_workers,
+    get_worker as db_get_worker,
+    expire_running_workers as db_expire_running_workers
+)
 
 logger = logging.getLogger(__name__)
 
@@ -157,7 +166,6 @@ class WorkerLoggingHandler(logging.StreamHandler):
     def __init__(
         self,
         worker_id: str,
-        db: API_folder_store,
         manager: Optional["WorkerManager"] = None,
         buffer_size: int = 50,
     ):
@@ -165,12 +173,10 @@ class WorkerLoggingHandler(logging.StreamHandler):
 
         Args:
             worker_id: ID of the worker to capture logs for
-            db: Reference to LocalLibraryDB for storing logs
             buffer_size: Number of logs to buffer before flushing to DB
         """
         super().__init__()
         self.worker_id = worker_id
-        self.db = db
         self.manager = manager
         self.buffer_size = buffer_size
         self.buffer: list[str] = []
@@ -232,7 +238,7 @@ class WorkerLoggingHandler(logging.StreamHandler):
                     channel="log"
                 )
             else:
-                self.db.append_worker_stdout(
+                append_worker_stdout(
                     self.worker_id,
                     log_text,
                     channel="log"
@@ -255,29 +261,22 @@ class WorkerLoggingHandler(logging.StreamHandler):
 
 
 class WorkerManager:
-    """Manages persistent worker tasks with auto-refresh capability."""
+    """Manages persistent worker tasks using the central medios.db."""
 
-    def __init__(self, library_root: Path, auto_refresh_interval: float = 2.0):
+    def __init__(self, auto_refresh_interval: float = 2.0):
         """Initialize the worker manager.
 
         Args:
-            library_root: Root directory for the local library database
            auto_refresh_interval: Seconds between auto-refresh checks (0 = disabled)
         """
-        self.library_root = Path(library_root)
-        self.db = API_folder_store(library_root)
         self.auto_refresh_interval = auto_refresh_interval
         self.refresh_callbacks: List[Callable] = []
         self.refresh_thread: Optional[Thread] = None
         self._stop_refresh = False
         self._lock = Lock()
-        # Reuse the DB's own lock so there is exactly one lock guarding the
-        # sqlite connection (and it is safe for re-entrant/nested DB usage).
-        self._db_lock = self.db._db_lock
-        self.worker_handlers: Dict[str, WorkerLoggingHandler] = {}  # Track active handlers
-        self._worker_last_step: Dict[str, str] = {}
+        self.worker_handlers: Dict[str, WorkerLoggingHandler] = {}
+        self._worker_last_step: Dict[str, str] = {}
+
         # Buffered stdout/log batching to reduce DB lock contention.
         self._stdout_buffers: Dict[Tuple[str, str], List[str]] = {}
         self._stdout_buffer_sizes: Dict[Tuple[str, str], int] = {}
@@ -328,13 +327,11 @@ class WorkerManager:
             Count of workers updated.
         """
         try:
-            with self._db_lock:
-                return self.db.expire_running_workers(
-                    older_than_seconds=older_than_seconds,
-                    status=status,
-                    reason=reason,
-                    worker_id_prefix=worker_id_prefix,
-                )
+            return db_expire_running_workers(
+                older_than_seconds=older_than_seconds,
+                status=status,
+                reason=reason or "Stale worker expired"
+            )
         except Exception as exc:
             logger.error(f"Failed to expire stale workers: {exc}", exc_info=True)
             return 0
@@ -362,7 +359,7 @@ class WorkerManager:
             The logging handler that was created, or None if there was an error
         """
         try:
-            handler = WorkerLoggingHandler(worker_id, self.db, manager=self)
+            handler = WorkerLoggingHandler(worker_id, manager=self)
 
             with self._lock:
                 self.worker_handlers[worker_id] = handler
@@ -437,16 +434,13 @@ class WorkerManager:
             True if worker was inserted successfully
         """
         try:
-            with self._db_lock:
-                result = self.db.insert_worker(
-                    worker_id,
-                    worker_type,
-                    title,
-                    description,
-                    total_steps,
-                    pipe=pipe
-                )
-            if result > 0:
+            success = insert_worker(
+                worker_id,
+                worker_type,
+                title,
+                description
+            )
+            if success:
                 logger.debug(
                     f"[WorkerManager] Tracking worker: {worker_id} ({worker_type})"
                 )
@@ -482,18 +476,16 @@ class WorkerManager:
             if progress > 0:
                 kwargs["progress"] = progress
             if current_step:
-                kwargs["current_step"] = current_step
+                kwargs["details"] = current_step
             if details:
                 kwargs["description"] = details
             if error:
                 kwargs["error_message"] = error
 
             if kwargs:
-                kwargs["last_updated"] = datetime.now().isoformat()
-                if "current_step" in kwargs and kwargs["current_step"]:
-                    self._worker_last_step[worker_id] = str(kwargs["current_step"])
-                with self._db_lock:
-                    return self.db.update_worker(worker_id, **kwargs)
+                if "details" in kwargs and kwargs["details"]:
+                    self._worker_last_step[worker_id] = str(kwargs["details"])
+                return update_worker(worker_id, **kwargs)
             return True
         except Exception as e:
             logger.error(
@@ -515,7 +507,7 @@ class WorkerManager:
             worker_id: Unique identifier for the worker
             result: Result status ('completed', 'error', 'cancelled')
             error_msg: Error message if any
-            result_data: Result data as JSON string
+            result_data: Result data as JSON string (saved in details)
 
         Returns:
             True if update was successful
@@ -526,16 +518,15 @@ class WorkerManager:
             except Exception:
                 pass
             kwargs = {
-                "status": result,
-                "completed_at": datetime.now().isoformat()
+                "status": "finished",
+                "result": result
             }
             if error_msg:
                 kwargs["error_message"] = error_msg
             if result_data:
-                kwargs["result_data"] = result_data
+                kwargs["details"] = result_data
 
-            with self._db_lock:
-                success = self.db.update_worker(worker_id, **kwargs)
+            success = update_worker(worker_id, **kwargs)
             logger.info(f"[WorkerManager] Worker finished: {worker_id} ({result})")
             self._worker_last_step.pop(worker_id, None)
             return success
@@ -553,8 +544,7 @@ class WorkerManager:
             List of active worker dictionaries
         """
         try:
-            with self._db_lock:
-                return self.db.get_active_workers()
+            return db_get_active_workers()
         except Exception as e:
             logger.error(
                 f"[WorkerManager] Error getting active workers: {e}",
@@ -572,14 +562,9 @@ class WorkerManager:
             List of finished worker dictionaries
         """
         try:
-            with self._db_lock:
-                all_workers = self.db.get_all_workers(limit=limit)
-            # Filter to only finished workers
-            finished = [
-                w for w in all_workers
-                if w.get("status") in ["completed", "error", "cancelled"]
-            ]
-            return finished
+            # database.py has no get_all_workers yet, so query directly.
+            rows = db.fetchall(
+                "SELECT * FROM workers WHERE status = 'finished' ORDER BY updated_at DESC LIMIT ?",
+                (limit,),
+            )
+            return [dict(row) for row in rows]
         except Exception as e:
             logger.error(
                 f"[WorkerManager] Error getting finished workers: {e}",
@@ -597,14 +582,13 @@ class WorkerManager:
             Worker data or None if not found
         """
         try:
-            with self._db_lock:
-                return self.db.get_worker(worker_id)
+            return db_get_worker(worker_id)
         except Exception as e:
             logger.error(
                 f"[WorkerManager] Error getting worker {worker_id}: {e}",
                 exc_info=True
             )
             return None
@@ -815,14 +806,11 @@ class WorkerManager:
             ok = True
             for wid, ch, step, payload in pending_flush:
                 try:
-                    with self._db_lock:
-                        result = self.db.append_worker_stdout(
-                            wid,
-                            payload,
-                            step=step,
-                            channel=ch
-                        )
-                    ok = ok and result
+                    append_worker_stdout(
+                        wid,
+                        payload,
+                        channel=ch
+                    )
                 except Exception as e:
                     logger.error(
                         f"[WorkerManager] Error flushing stdout for {wid}: {e}",
@@ -851,7 +839,6 @@ class WorkerManager:
         if not chunks:
             return True
         text = "".join(chunks)
-        step = self._stdout_buffer_steps.get(key)
         self._stdout_buffers[key] = []
         self._stdout_buffer_sizes[key] = 0
         self._stdout_last_flush[key] = time.monotonic()
@@ -860,13 +847,12 @@ class WorkerManager:
         if not text:
             return True
         try:
-            with self._db_lock:
-                return self.db.append_worker_stdout(
-                    worker_id,
-                    text,
-                    step=step,
-                    channel=channel,
-                )
+            append_worker_stdout(
+                worker_id,
+                text,
+                channel=channel,
+            )
+            return True
         except Exception as e:
             logger.error(
                 f"[WorkerManager] Error flushing stdout for {worker_id}: {e}",
@@ -884,8 +870,7 @@ class WorkerManager:
             Worker's stdout or empty string
         """
         try:
-            with self._db_lock:
-                return self.db.get_worker_stdout(worker_id)
+            return db_get_worker_stdout(worker_id)
         except Exception as e:
             logger.error(f"[WorkerManager] Error getting stdout: {e}", exc_info=True)
             return ""
@@ -909,21 +894,20 @@ class WorkerManager:
             True if clear was successful
         """
         try:
-            with self._db_lock:
-                return self.db.clear_worker_stdout(worker_id)
+            # clear_worker_stdout is not implemented in database.py yet; delete directly.
+            db.execute("DELETE FROM worker_stdout WHERE worker_id = ?", (worker_id,))
+            return True
         except Exception as e:
             logger.error(f"[WorkerManager] Error clearing stdout: {e}", exc_info=True)
             return False
 
     def close(self) -> None:
-        """Close the worker manager and database connection."""
+        """Close the worker manager."""
         self.stop_auto_refresh()
         try:
            self._flush_all_stdout_buffers()
         except Exception:
             pass
-        with self._db_lock:
-            self.db.close()
         logger.info("[WorkerManager] Closed")
 
     def _flush_all_stdout_buffers(self) -> None:
diff --git a/Store/Folder.py b/Store/Folder.py
index a0678f0..5cb0139 100644
--- a/Store/Folder.py
+++ b/Store/Folder.py
@@ -90,6 +90,7 @@ class Folder(Store):
         NAME: Optional[str] = None,
         PATH: Optional[str] = None,
     ) -> None:
+        log("WARNING: Folder store is DEPRECATED and will be removed in a future version. Please migrate to HydrusNetwork.")
         if name is None and NAME is not None:
             name = str(NAME)
         if location is None and PATH is not None:
diff --git a/Store/registry.py b/Store/registry.py
index 540a6f0..c808cd5 100644
--- a/Store/registry.py
+++ b/Store/registry.py
@@ -66,6 +66,8 @@ def _discover_store_classes() -> Dict[str, Type[BaseStore]]:
         module_name = module_info.name
         if module_name in {"__init__", "_base", "registry"}:
             continue
+        if module_name.lower() == "folder":
+            continue
 
         try:
             module = importlib.import_module(f"Store.{module_name}")
@@ -215,6 +217,8 @@ class Store:
                 continue
 
             store_type = _normalize_store_type(str(raw_store_type))
+            if store_type == "folder":
+                continue
             store_cls = classes_by_type.get(store_type)
             if store_cls is None:
                 if not self._suppress_debug:
diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py
index 3e7ef84..da490f4 100644
--- a/cmdlet/_shared.py
+++ b/cmdlet/_shared.py
@@ -2751,58 +2751,8 @@ def register_url_with_local_library(
     Returns:
         True if url were registered, False otherwise
     """
-
-    try:
-        from SYS.config import get_local_storage_path
-        from API.folder import API_folder_store
-
-        file_path = get_field(pipe_obj, "path")
-        url_field = get_field(pipe_obj, "url", [])
-        urls: List[str] = []
-        if isinstance(url_field, str):
-            urls = [u.strip() for u in url_field.split(",") if u.strip()]
-        elif isinstance(url_field, (list, tuple)):
-            urls = [u for u in url_field if isinstance(u, str) and u.strip()]
-
-        if not file_path or not urls:
-            return False
-
-        path_obj = Path(file_path)
-        if not path_obj.exists():
-            return False
-
-        storage_path = get_local_storage_path(config)
-        if not storage_path:
-            return False
-
-        # Optimization: Don't open DB if file isn't in library root
-        try:
-            path_obj.resolve().relative_to(Path(storage_path).resolve())
-        except ValueError:
-            return False
-
-        with API_folder_store(storage_path) as db:
-            file_hash = db.get_file_hash(path_obj)
-            if not file_hash:
-                return False
-            metadata = db.get_metadata(file_hash) or {}
-            existing_url = metadata.get("url") or []
-
-            # Add any new url
-            changed = False
-            for u in urls:
-                if u not in existing_url:
-                    existing_url.append(u)
-                    changed = True
-
-            if changed:
-                metadata["url"] = existing_url
-                db.save_metadata(path_obj, metadata)
-                return True
-
-            return True  # url already existed
-    except Exception:
-        return False
+    # Folder store removed; local library URL registration is disabled.
+    return False
 
 
 def resolve_tidal_manifest_path(item: Any) -> Optional[str]:
diff --git a/cmdlet/delete_file.py b/cmdlet/delete_file.py
index e24643f..b9b133b 100644
--- a/cmdlet/delete_file.py
+++ b/cmdlet/delete_file.py
@@ -7,7 +7,6 @@ import sys
 from pathlib import Path
 
 from SYS.logger import debug, log
-from Store.Folder import Folder
 from Store import Store
 from . import _shared as sh
 from API import HydrusNetwork as hydrus_wrapper
@@ -280,61 +279,23 @@ class Delete_File(sh.Cmdlet):
             except Exception:
                 size_bytes = None
 
-            # If lib_root is provided and this is from a folder store, use the Folder class
-            if lib_root:
-                try:
-                    folder = Folder(Path(lib_root), name=store or "local")
-                    if folder.delete_file(str(path)):
-                        local_deleted = True
-                        deleted_rows.append(
-                            {
-                                "title": str(title_val).strip() if title_val else path.name,
-                                "store": store_label,
-                                "hash": hash_hex or sh.normalize_hash(path.stem) or "",
-                                "size_bytes": size_bytes,
-                                "ext": _get_ext_from_item() or path.suffix.lstrip("."),
-                            }
-                        )
-                except Exception as exc:
-                    debug(f"Folder.delete_file failed: {exc}", file=sys.stderr)
-                    # Fallback to manual deletion
-                    try:
-                        if path.exists() and path.is_file():
-                            path.unlink()
-                            local_deleted = True
-                            deleted_rows.append(
-                                {
-                                    "title": str(title_val).strip() if title_val else path.name,
-                                    "store": store_label,
-                                    "hash": hash_hex or sh.normalize_hash(path.stem) or "",
-                                    "size_bytes": size_bytes,
-                                    "ext": _get_ext_from_item() or path.suffix.lstrip("."),
-                                }
-                            )
-                    except Exception as exc:
-                        log(f"Local delete failed: {exc}", file=sys.stderr)
-            else:
-                # No lib_root, just delete the file
-                try:
-                    if path.exists() and path.is_file():
-                        path.unlink()
-                        local_deleted = True
-                        deleted_rows.append(
-                            {
-                                "title": str(title_val).strip() if title_val else path.name,
-                                "store": store_label,
-                                "hash": hash_hex or sh.normalize_hash(path.stem) or "",
-                                "size_bytes": size_bytes,
-                                "ext": _get_ext_from_item() or path.suffix.lstrip("."),
-                            }
-                        )
-                except Exception as exc:
-                    log(f"Local delete failed: {exc}", file=sys.stderr)
+            # Delete the local file directly
+            try:
+                if path.exists() and path.is_file():
+                    path.unlink()
+                    local_deleted = True
+                    deleted_rows.append(
+                        {
+                            "title": str(title_val).strip() if title_val else path.name,
+                            "store": store_label,
+                            "hash": hash_hex or sh.normalize_hash(path.stem) or "",
+                            "size_bytes": size_bytes,
+                            "ext": _get_ext_from_item() or path.suffix.lstrip("."),
+                        }
+                    )
+            except Exception as exc:
+                log(f"Local delete failed: {exc}", file=sys.stderr)
 
             # Remove common sidecars regardless of file removal success
             for sidecar in (
@@ -533,24 +494,6 @@ class Delete_File(sh.Cmdlet):
                 log("Invalid -query value (expected hash:)", file=sys.stderr)
                 return 1
 
-        # If no lib_root provided, try to get the first folder store from config
-        if not lib_root:
-            try:
-                storage_config = config.get("storage", {})
-                folder_config = storage_config.get("folder", {})
-                if folder_config:
-                    # Get first folder store path
-                    for store_name, store_config in folder_config.items():
-                        if isinstance(store_config, dict):
-                            path = store_config.get("path")
-                            if path:
-                                lib_root = path
-                                break
-            except Exception:
-                pass
-
         reason = " ".join(token for token in reason_tokens if str(token).strip()).strip()
diff --git a/cmdlet/get_file.py b/cmdlet/get_file.py
index 22533a8..3005cdb 100644
--- a/cmdlet/get_file.py
+++ b/cmdlet/get_file.py
@@ -175,27 +175,6 @@ class Get_File(sh.Cmdlet):
             log(f"Error: Backend could not retrieve file for hash {file_hash}")
             return 1
 
-        # Folder store UX: without -path, just open the file in the default app.
-        # Only export/copy when -path is explicitly provided.
-        backend_name = type(backend).__name__
-        is_folder_backend = backend_name.lower() == "folder"
-        if is_folder_backend and not output_path:
-            display_title = resolve_display_title() or source_path.stem or "Opened"
-            ext_for_emit = metadata.get("ext") or source_path.suffix.lstrip(".")
-            self._open_file_default(source_path)
-            log(f"Opened: {source_path}", file=sys.stderr)
-            ctx.emit(
-                {
-                    "hash": file_hash,
-                    "store": store_name,
-                    "path": str(source_path),
-                    "title": str(display_title),
-                    "ext": str(ext_for_emit or ""),
-                }
-            )
-            debug("[get-file] Completed successfully")
-            return 0
-
-        # Otherwise: export/copy to output_dir.
+        # Export/copy to output_dir when -path is provided.
         if output_path:
             output_dir = Path(output_path).expanduser()
diff --git a/cmdlet/get_relationship.py b/cmdlet/get_relationship.py
index 5ab0e04..7d8647b 100644
--- a/cmdlet/get_relationship.py
+++ b/cmdlet/get_relationship.py
@@ -2,7 +2,6 @@ from __future__ import annotations
 
 from typing import Any, Dict, Sequence, Optional
 import sys
-from pathlib import Path
 
 from SYS.logger import log
 
@@ -19,12 +18,11 @@ get_hash_for_operation = sh.get_hash_for_operation
 fetch_hydrus_metadata = sh.fetch_hydrus_metadata
 should_show_help = sh.should_show_help
 get_field = sh.get_field
-from API.folder import API_folder_store
 from Store import Store
 
 CMDLET = Cmdlet(
     name="get-relationship",
-    summary="Print relationships for the selected file (Hydrus or Local).",
+    summary="Print relationships for the selected file (Hydrus).",
     usage='get-relationship [-query "hash:"]',
     alias=[],
     arg=[
@@ -32,155 +30,12 @@ CMDLET = Cmdlet(
         SharedArgs.STORE,
     ],
     detail=[
-        "- Lists relationship data as returned by Hydrus or Local DB.",
+        "- Lists relationship data as returned by Hydrus.",
     ],
 )
 
 
 def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
-    # Help
-    if should_show_help(_args):
-        log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}")
-        return 0
-
-    # Parse -query and -store override
-    override_query: str | None = None
-    override_store: str | None = None
-    args_list = list(_args)
-    i = 0
-    while i < len(args_list):
-        a = args_list[i]
-        low = str(a).lower()
-        if low in {"-query", "--query", "query"} and i + 1 < len(args_list):
-            override_query = str(args_list[i + 1]).strip()
-            i += 2
-            continue
-        if low in {"-store", "--store", "store"} and i + 1 < len(args_list):
-            override_store = str(args_list[i + 1]).strip()
-            i += 2
-            continue
-        i += 1
-
-    override_hash: str | None = (
-        sh.parse_single_hash_query(override_query) if override_query else None
-    )
-    if override_query and not override_hash:
-        log('get-relationship requires -query "hash:"', file=sys.stderr)
-        return 1
-
-    # Handle @N selection which creates a list
-    # This cmdlet is single-subject; require disambiguation when multiple items are provided.
-    if isinstance(result, list):
-        if len(result) == 0:
-            result = None
-        elif len(result) > 1 and not override_hash:
-            log(
-                'get-relationship expects a single item; select one row (e.g. @1) or pass -query "hash:"',
-                file=sys.stderr,
-            )
-            return 1
-        else:
-            result = result[0]
-
-    # Initialize results collection
-    found_relationships = []  # List of dicts: {hash, type, title, path, store}
-    source_title = "Unknown"
-
-    def _add_relationship(entry: Dict[str, Any]) -> None:
-        """Add relationship if not already present by hash or path."""
-        for existing in found_relationships:
-            if (entry.get("hash")
-                    and str(existing.get("hash", "")).lower() == str(entry["hash"]).lower()):
-                return
-            if (entry.get("path")
-                    and str(existing.get("path", "")).lower() == str(entry["path"]).lower()):
-                return
-        found_relationships.append(entry)
-
-    # Store/hash-first subject resolution
-    store_name: Optional[str] = override_store
-    if not store_name:
-        store_name = get_field(result, "store")
-
-    hash_hex = (
-        normalize_hash(override_hash)
-        if override_hash else normalize_hash(get_hash_for_operation(None, result))
-    )
-
-    if not source_title or source_title == "Unknown":
-        source_title = (
-            get_field(result, "title") or get_field(result, "name")
-            or (hash_hex[:16] + "..." if hash_hex else "Unknown")
-        )
-
-    local_db_checked = False
-
-    if store_name and hash_hex:
-        try:
-            store = Store(config)
-            backend = store[str(store_name)]
-
-            # Folder store relationships
-            # IMPORTANT: only treat the Folder backend as a local DB store.
-            # Other backends may expose a location() method but are not SQLite folder stores.
-            if (type(backend).__name__ == "Folder" and hasattr(backend, "location")
-                    and callable(getattr(backend, "location"))):
-                storage_path = Path(str(backend.location()))
-                with API_folder_store(storage_path) as db:
-                    local_db_checked = True
-
-                    # Update source title from tags if possible
-                    try:
-                        tags = db.get_tags(hash_hex)
-                        for t in tags:
-                            if isinstance(t, str) and t.lower().startswith("title:"):
-                                source_title = t[6:].strip()
-                                break
-                    except Exception:
-                        pass
-
-                    metadata = db.get_metadata(hash_hex)
-                    rels = (metadata or {}).get("relationships")
-                    king_hashes: list[str] = []
-
-                    # Forward relationships
-                    if isinstance(rels, dict):
-                        for rel_type, hashes in rels.items():
-                            if not isinstance(hashes, list):
-                                continue
-                            for related_hash in hashes:
-                                related_hash = normalize_hash(str(related_hash))
-                                if not related_hash or related_hash == hash_hex:
-                                    continue
-
-                                entry_type = (
-                                    "king" if str(rel_type).lower() == "alt" else str(rel_type)
-                                )
-                                if entry_type == "king":
-                                    king_hashes.append(related_hash)
-
-                                related_title = related_hash[:16] + "..."
-                                try:
-                                    rel_tags = db.get_tags(related_hash)
-                                    for t in rel_tags:
-                                        if isinstance(t, str) and t.lower().startswith("title:"):
-                                            related_title = t[6:].strip()
-                                            break
-                                except Exception:
-                                    pass
-
-                                _add_relationship(
@@ -270,10 +125,7 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
         except Exception as e:
             log(f"Error checking store relationships: {e}", file=sys.stderr)
 
-    # If we found local relationships, we can stop or merge with Hydrus?
-    # For now, if we found local ones, let's show them.
-    # But if the file is also in Hydrus, we might want those too.
-    # Let's try Hydrus if we have a hash.
+    # Fetch Hydrus relationships if we have a hash.
 
     hash_hex = (
         normalize_hash(override_hash)
@@ -281,7 +133,7 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
         if override_hash else normalize_hash(get_hash_for_operation(None,
        result))
     )
 
-    if hash_hex and not local_db_checked:
+    if hash_hex:
         try:
             client = None
             store_label = "hydrus"
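In search_file.py below, the folder-DB context manager is replaced by a thin `_WorkerLogger` shim so the existing `with ... as db:` call sites keep working while routing to the module-level helpers in SYS.database. Its intended use, sketched:

```python
from cmdlet.search_file import _WorkerLogger

worker_id = "example-worker"  # normally a uuid4 string
with _WorkerLogger(worker_id) as db:
    db.insert_worker(worker_id, "search-file", title="Search: foo")
    db.append_worker_stdout(worker_id, "[]")
    db.update_worker_status(worker_id, "completed")
```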
diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py
index d8d2495..e86870e 100644
--- a/cmdlet/search_file.py
+++ b/cmdlet/search_file.py
@@ -1,4 +1,4 @@
-"""search-file cmdlet: Search for files in storage backends (Folder, Hydrus)."""
+"""search-file cmdlet: Search for files in storage backends (Hydrus)."""
 
 from __future__ import annotations
 
@@ -11,12 +11,12 @@ import sys
 from SYS.logger import log, debug
 
 from ProviderCore.registry import get_search_provider, list_search_providers
-from SYS.config import get_local_storage_path
 from SYS.rich_display import (
     show_provider_config_panel,
     show_store_config_panel,
     show_available_providers_panel,
 )
+from SYS.database import insert_worker, update_worker, append_worker_stdout
 
 from ._shared import (
     Cmdlet,
@@ -32,17 +32,52 @@ from SYS import pipeline as ctx
 
 STORAGE_ORIGINS = {"local", "hydrus",
-                   "folder",
                    "zerotier"}
 
+
+class _WorkerLogger:
+    def __init__(self, worker_id: str) -> None:
+        self.worker_id = worker_id
+
+    def __enter__(self) -> "_WorkerLogger":
+        return self
+
+    def __exit__(self, exc_type, exc, tb) -> None:  # type: ignore[override]
+        return None
+
+    def insert_worker(
+        self,
+        worker_id: str,
+        worker_type: str,
+        title: str = "",
+        description: str = "",
+        **kwargs: Any,
+    ) -> None:
+        try:
+            insert_worker(worker_id, worker_type, title=title, description=description)
+        except Exception:
+            pass
+
+    def update_worker_status(self, worker_id: str, status: str) -> None:
+        try:
+            update_worker(worker_id, status=status)
+        except Exception:
+            pass
+
+    def append_worker_stdout(self, worker_id: str, content: str) -> None:
+        try:
+            append_worker_stdout(worker_id, content)
+        except Exception:
+            pass
+
+
 class search_file(Cmdlet):
     """Class-based search-file cmdlet for searching storage backends."""
 
     def __init__(self) -> None:
         super().__init__(
             name="search-file",
-            summary="Search storage backends (Folder, Hydrus) or external providers (via -provider).",
+            summary="Search storage backends (Hydrus) or external providers (via -provider).",
             usage="search-file [-query ] [-store BACKEND] [-limit N] [-provider NAME]",
             arg=[
                 CmdletArg(
@@ -65,7 +100,7 @@ class search_file(Cmdlet):
                 ),
             ],
             detail=[
-                "Search across storage backends: Folder stores and Hydrus instances",
+                "Search across storage backends: Hydrus instances",
                 "Use -store to search a specific backend by name",
                 "URL search: url:* (any URL) or url: (URL substring)",
                 "Extension search: ext: (e.g., ext:png)",
@@ -74,12 +109,12 @@ class search_file(Cmdlet):
                 "Examples:",
                 "search-file -query foo                      # Search all storage backends",
                 "search-file -store home -query '*'          # Search 'home' Hydrus instance",
-                "search-file -store test -query 'video'      # Search 'test' folder store",
+                "search-file -store home -query 'video'      # Search 'home' Hydrus instance",
                 "search-file -query 'hash:deadbeef...'       # Search by SHA256 hash",
                 "search-file -query 'url:*'                  # Files that have any URL",
                 "search-file -query 'url:youtube.com'        # Files whose URL contains substring",
                 "search-file -query 'ext:png'                # Files whose metadata ext is png",
-                "search-file -query 'system:filetype = png'  # Hydrus: native; Folder: maps to metadata.ext",
+                "search-file -query 'system:filetype = png'  # Hydrus: native",
                 "",
                 "Provider search (-provider):",
                 "search-file -provider youtube 'tutorial'    # Search YouTube provider",
@@ -210,49 +245,15 @@ class search_file(Cmdlet):
             return 1
 
         worker_id = str(uuid.uuid4())
-        library_root = get_local_storage_path(config or {}) if get_local_storage_path else None
-
-        if not library_root:
-            try:
-                from Store.registry import get_backend_instance
-                # Try the first configured folder backend without instantiating all backends
-                store_cfg = (config or {}).get("store") or {}
-                folder_cfg = None
-                for raw_store_type, instances in store_cfg.items():
-                    if _normalize_store_type(str(raw_store_type)) == "folder":
-                        folder_cfg = instances
-                        break
-                if isinstance(folder_cfg, dict):
-                    for instance_name, instance_config in folder_cfg.items():
-                        try:
-                            backend = get_backend_instance(config, instance_name, suppress_debug=True)
-                            if backend and type(backend).__name__ == "Folder":
-                                library_root = expand_path(getattr(backend, "_location", None))
-                                if library_root:
-                                    break
-                        except Exception:
-                            pass
-            except Exception:
-                pass
-
-        db = None
-        # Disable Folder DB usage for "external" searches when not using a folder store
-        # db = None
-        if library_root and False:  # Disabled to prevent 'database is locked' errors during external searches
-            try:
-                from API.folder import API_folder_store
-
-                db = API_folder_store(library_root)
-                db.__enter__()
-                db.insert_worker(
-                    worker_id,
-                    "search-file",
-                    title=f"Search: {query}",
-                    description=f"Provider: {provider_name}, Query: {query}",
-                    pipe=ctx.get_current_command_text(),
-                )
-            except Exception:
-                db = None
+        try:
+            insert_worker(
+                worker_id,
+                "search-file",
+                title=f"Search: {query}",
+                description=f"Provider: {provider_name}, Query: {query}",
+            )
+        except Exception:
+            pass
 
         try:
             results_list: List[Dict[str, Any]] = []
@@ -381,9 +382,11 @@ class search_file(Cmdlet):
 
             if not results:
                 log(f"No results found for query: {query}", file=sys.stderr)
-                if db is not None:
-                    db.append_worker_stdout(worker_id, json.dumps([], indent=2))
-                    db.update_worker_status(worker_id, "completed")
+                try:
+                    append_worker_stdout(worker_id, json.dumps([], indent=2))
+                    update_worker(worker_id, status="completed")
+                except Exception:
+                    pass
                 return 0
 
             for search_result in results:
@@ -415,9 +418,11 @@ class search_file(Cmdlet):
 
             ctx.set_current_stage_table(table)
 
-            if db is not None:
-                db.append_worker_stdout(worker_id, json.dumps(results_list, indent=2))
-                db.update_worker_status(worker_id, "completed")
+            try:
+                append_worker_stdout(worker_id, json.dumps(results_list, indent=2))
+                update_worker(worker_id, status="completed")
+            except Exception:
+                pass
 
             return 0
 
@@ -426,18 +431,11 @@ class search_file(Cmdlet):
             import traceback
             debug(traceback.format_exc())
 
-            if db is not None:
-                try:
-                    db.update_worker_status(worker_id, "error")
-                except Exception:
-                    pass
+            try:
+                update_worker(worker_id, status="error")
+            except Exception:
+                pass
             return 1
-        finally:
-            if db is not None:
-                try:
-                    db.__exit__(None, None, None)
-                except Exception:
-                    pass
 
     # --- Execution ------------------------------------------------------
     def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
@@ -591,37 +589,12 @@ class search_file(Cmdlet):
             log("Provide a search query", file=sys.stderr)
             return 1
 
-        from API.folder import API_folder_store
-
         worker_id = str(uuid.uuid4())
 
         from Store import Store
         storage_registry = Store(config=config or {})
-
-        library_root = get_local_storage_path(config or {})
-        if not library_root:
-            # Fallback for search-file: if no global folder path is found,
-            # try to use the specific backend mentioned in -store or the first available folder backend.
-            if storage_backend:
-                try:
-                    backend = storage_registry[storage_backend]
-                    if backend and type(backend).__name__ == "Folder":
-                        library_root = expand_path(getattr(backend, "_location", None))
-                except Exception:
-                    pass
-            else:
-                # Try all backends until we find a Folder one
-                for name in storage_registry.list_backends():
-                    try:
-                        backend = storage_registry[name]
-                        if type(backend).__name__ == "Folder":
-                            library_root = expand_path(getattr(backend, "_location", None))
-                            if library_root:
-                                break
-                    except Exception:
-                        continue
-        if not library_root:
+        if not storage_registry.list_backends():
             # Internal refreshes should not trigger config panels or stop progress.
             if "-internal-refresh" in args_list:
                 return 1
@@ -635,11 +608,11 @@ class search_file(Cmdlet):
                     progress.stop()
                 except Exception:
                     pass
-            show_store_config_panel(["Folder Store"])
+            show_store_config_panel(["Hydrus Network"])
             return 1
 
-        # Use context manager to ensure database is always closed
-        with API_folder_store(library_root) as db:
+        # Use a lightweight worker logger to track search results in the central DB
+        with _WorkerLogger(worker_id) as db:
             try:
                 if "-internal-refresh" not in args_list:
                     db.insert_worker(
@@ -713,18 +686,7 @@ class search_file(Cmdlet):
 
                     # Resolve a path/URL string if possible
                     path_str: Optional[str] = None
-                    # IMPORTANT: avoid calling get_file() for remote backends.
-                    # For Hydrus, get_file() returns a browser URL (and may include access keys),
-                    # which should not be pulled during search/refresh.
-                    try:
-                        if type(resolved_backend).__name__ == "Folder":
-                            maybe_path = resolved_backend.get_file(h)
-                            if isinstance(maybe_path, Path):
-                                path_str = str(maybe_path)
-                            elif isinstance(maybe_path, str) and maybe_path:
-                                path_str = maybe_path
-                    except Exception:
-                        path_str = None
+                    # Avoid calling get_file() for remote backends during search/refresh.
 
                     meta_obj: Dict[str, Any] = {}
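The pipe.py changes below move MPV playlists out of the folder DB into a flat `mpv_playlists.json` at the repo root. A round-trip sketch of the new helpers (URLs illustrative; `_get_playlist_by_id` returns `None` for unknown ids):

```python
from cmdnat.pipe import _save_playlist, _get_playlists, _get_playlist_by_id, _delete_playlist

_save_playlist("evening", ["https://example.com/a.mp4", "memory://b"])
pls = _get_playlists()          # [{'id': 1, 'name': 'evening', 'items': [...], 'updated_at': '...Z'}]
name, items = _get_playlist_by_id(pls[0]["id"])
assert name == "evening" and len(items) == 2
_delete_playlist(pls[0]["id"])  # returns False if the id is unknown
```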
meta_obj: Dict[str, Any] = {} diff --git a/cmdnat/pipe.py b/cmdnat/pipe.py index 6dcb771..0bf2299 100644 --- a/cmdnat/pipe.py +++ b/cmdnat/pipe.py @@ -4,6 +4,7 @@ import sys import json import socket import re +from datetime import datetime from urllib.parse import urlparse, parse_qs from pathlib import Path from cmdlet._shared import Cmdlet, CmdletArg, parse_cmdlet_args, resolve_tidal_manifest_path @@ -13,8 +14,7 @@ from MPV.mpv_ipc import MPV from SYS import pipeline as ctx from SYS.models import PipeObject -from API.folder import LocalLibrarySearchOptimizer -from SYS.config import get_local_storage_path, get_hydrus_access_key, get_hydrus_url +from SYS.config import get_hydrus_access_key, get_hydrus_url _ALLDEBRID_UNLOCK_CACHE: Dict[str, str] = {} @@ -27,6 +27,94 @@ def _repo_root() -> Path: return Path(os.getcwd()) +def _playlist_store_path() -> Path: + return _repo_root() / "mpv_playlists.json" + + +def _load_playlist_store(path: Path) -> Dict[str, Any]: + if not path.exists(): + return {"next_id": 1, "playlists": []} + try: + data = json.loads(path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + return {"next_id": 1, "playlists": []} + data.setdefault("next_id", 1) + data.setdefault("playlists", []) + if not isinstance(data["playlists"], list): + data["playlists"] = [] + return data + except Exception: + return {"next_id": 1, "playlists": []} + + +def _save_playlist_store(path: Path, data: Dict[str, Any]) -> bool: + try: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(data, indent=2), encoding="utf-8") + return True + except Exception: + return False + + +def _save_playlist(name: str, items: List[Any]) -> bool: + path = _playlist_store_path() + data = _load_playlist_store(path) + playlists = data.get("playlists", []) + now = datetime.utcnow().isoformat(timespec="seconds") + "Z" + + for pl in playlists: + if str(pl.get("name")).strip().lower() == str(name).strip().lower(): + pl["items"] = list(items) + pl["updated_at"] = now + return _save_playlist_store(path, data) + + new_id = int(data.get("next_id") or 1) + data["next_id"] = new_id + 1 + playlists.append({ + "id": new_id, + "name": name, + "items": list(items), + "updated_at": now, + }) + data["playlists"] = playlists + return _save_playlist_store(path, data) + + +def _get_playlist_by_id(playlist_id: int) -> Optional[tuple[str, List[Any]]]: + data = _load_playlist_store(_playlist_store_path()) + for pl in data.get("playlists", []): + try: + if int(pl.get("id")) == int(playlist_id): + return str(pl.get("name") or ""), list(pl.get("items") or []) + except Exception: + continue + return None + + +def _delete_playlist(playlist_id: int) -> bool: + path = _playlist_store_path() + data = _load_playlist_store(path) + playlists = data.get("playlists", []) + kept = [] + removed = False + for pl in playlists: + try: + if int(pl.get("id")) == int(playlist_id): + removed = True + continue + except Exception: + pass + kept.append(pl) + data["playlists"] = kept + return _save_playlist_store(path, data) if removed else False + + +def _get_playlists() -> List[Dict[str, Any]]: + data = _load_playlist_store(_playlist_store_path()) + playlists = data.get("playlists", []) + return [dict(pl) for pl in playlists if isinstance(pl, dict)] + + def _repo_log_dir() -> Path: d = _repo_root() / "Log" try: @@ -828,23 +916,8 @@ def _get_playable_path( backend_class = type(backend).__name__ backend_target_resolved = True - # Folder stores: resolve to an on-disk file path. 
- if (hasattr(backend, "get_file") and callable(getattr(backend, "get_file")) - and backend_class == "Folder"): - try: - resolved = backend.get_file(file_hash) - if isinstance(resolved, Path): - path = str(resolved) - elif resolved is not None: - path = str(resolved) - except Exception as e: - debug( - f"Error resolving file path from store '{store}': {e}", - file=sys.stderr - ) - # HydrusNetwork: build a playable API file URL without browser side-effects. - elif backend_class == "HydrusNetwork": + if backend_class == "HydrusNetwork": try: client = getattr(backend, "_client", None) base_url = getattr(client, "url", None) @@ -1367,58 +1440,38 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: # If we save 'memory://...', it will work when loaded back. clean_items.append(item) - # Use config from context or load it - config_data = config if config else {} - - storage_path = get_local_storage_path(config_data) or _default_state_dir() - try: - Path(storage_path).mkdir(parents=True, exist_ok=True) - except Exception: - pass - - with LocalLibrarySearchOptimizer(storage_path) as db: - if db.save_playlist(playlist_name, clean_items): - debug(f"Playlist saved as '{playlist_name}'") - return 0 - else: - debug(f"Failed to save playlist '{playlist_name}'") - return 1 + if _save_playlist(playlist_name, clean_items): + debug(f"Playlist saved as '{playlist_name}'") + return 0 + debug(f"Failed to save playlist '{playlist_name}'") + return 1 # Handle Load Playlist current_playlist_name = None if load_mode: - # Use config from context or load it - config_data = config if config else {} + if index_arg: + try: + pl_id = int(index_arg) - storage_path = get_local_storage_path(config_data) - if not storage_path: - debug("Local storage path not configured.") - return 1 - - with LocalLibrarySearchOptimizer(storage_path) as db: - if index_arg: - try: - pl_id = int(index_arg) - - # Handle Delete Playlist (if -clear is also passed) - if clear_mode: - if db.delete_playlist(pl_id): - debug(f"Playlist ID {pl_id} deleted.") - # Clear index_arg so we fall through to list mode and show updated list - index_arg = None - # Don't return, let it list the remaining playlists - else: - debug(f"Failed to delete playlist ID {pl_id}.") - return 1 + # Handle Delete Playlist (if -clear is also passed) + if clear_mode: + if _delete_playlist(pl_id): + debug(f"Playlist ID {pl_id} deleted.") + # Clear index_arg so we fall through to list mode and show updated list + index_arg = None + # Don't return, let it list the remaining playlists else: - # Handle Load Playlist - result = db.get_playlist_by_id(pl_id) - if result is None: - debug(f"Playlist ID {pl_id} not found.") - return 1 + debug(f"Failed to delete playlist ID {pl_id}.") + return 1 + else: + # Handle Load Playlist + result = _get_playlist_by_id(pl_id) + if result is None: + debug(f"Playlist ID {pl_id} not found.") + return 1 - name, items = result - current_playlist_name = name + name, items = result + current_playlist_name = name # Queue items (replacing current playlist) if items: @@ -1446,42 +1499,42 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: debug(f"Invalid playlist ID: {index_arg}") return 1 - # If we deleted or didn't have an index, list playlists - if not index_arg: - playlists = db.get_playlists() + # If we deleted or didn't have an index, list playlists + if not index_arg: + playlists = _get_playlists() - if not playlists: - debug("No saved playlists found.") - return 0 - - table = Table("Saved Playlists") - 
-                for i, pl in enumerate(playlists):
-                    item_count = len(pl.get("items", []))
-                    row = table.add_row()
-                    # row.add_column("ID", str(pl['id'])) # Hidden as per user request
-                    row.add_column("Name", pl["name"])
-                    row.add_column("Items", str(item_count))
-                    row.add_column("Updated", pl["updated_at"])
-
-                    # Set the playlist items as the result object for this row
-                    # When user selects @N, they get the list of items
-                    # We also set the source command to .pipe -load so it loads it
-                    table.set_row_selection_args(i, ["-load", str(pl["id"])])
-
-                table.set_source_command(".mpv")
-
-                # Register results
-                ctx.set_last_result_table_overlay(
-                    table,
-                    [p["items"] for p in playlists]
-                )
-                ctx.set_current_stage_table(table)
-
-                # Do not print directly here.
-                # Both CmdletExecutor and PipelineExecutor render the current-stage/overlay table,
-                # so printing here would duplicate output.
-                return 0
-
+        if not playlists:
+            debug("No saved playlists found.")
+            return 0
+
+        table = Table("Saved Playlists")
+        for i, pl in enumerate(playlists):
+            item_count = len(pl.get("items", []))
+            row = table.add_row()
+            # row.add_column("ID", str(pl['id'])) # Hidden as per user request
+            row.add_column("Name", pl["name"])
+            row.add_column("Items", str(item_count))
+            row.add_column("Updated", pl.get("updated_at") or "")
+
+            # Set the playlist items as the result object for this row.
+            # When the user selects @N, they get that playlist's item list.
+            # The source command is .mpv, so selecting a row re-runs `.mpv -load <id>`.
+            table.set_row_selection_args(i, ["-load", str(pl["id"])])
+
+        table.set_source_command(".mpv")
+
+        # Register results
+        ctx.set_last_result_table_overlay(
+            table,
+            [p["items"] for p in playlists]
+        )
+        ctx.set_current_stage_table(table)
+
+        # Do not print directly here.
+        # Both CmdletExecutor and PipelineExecutor render the current-stage/overlay table,
+        # so printing here would duplicate output.
+        return 0
+
+    # Everything below was originally outside a try block; keep it inside so `start_opts` is in scope.
# Handle Play/Pause commands (but skip if we have index_arg to play a specific item) @@ -1850,20 +1903,6 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: if len(stem) == 64 and all(c in "0123456789abcdef" for c in stem.lower()): file_hash = stem.lower() - # Find which folder store has this file - if file_storage: - for backend_name in file_storage.list_backends(): - backend = file_storage[backend_name] - if type(backend).__name__ == "Folder": - # Check if this backend has the file - try: - result_path = backend.get_file(file_hash) - if isinstance(result_path, - Path) and result_path.exists(): - store_name = backend_name - break - except Exception: - pass # Fallback to inferred store if we couldn't find it if not store_name: diff --git a/cmdnat/status.py b/cmdnat/status.py index 7f500f0..40923b0 100644 --- a/cmdnat/status.py +++ b/cmdnat/status.py @@ -242,28 +242,6 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: _add_startup_check(startup_table, "DISABLED", "Matrix", provider="matrix", detail=str(exc)) debug(f"Matrix instantiation failed: {exc}") - # Folders - if _has_store_subtype(config, "folder"): - fcfg = config.get("store", {}).get("folder", {}) - for iname, icfg in fcfg.items(): - if not isinstance(icfg, dict): continue - nkey = str(icfg.get("NAME") or iname) - pval = str(icfg.get("PATH") or icfg.get("path") or "").strip() - debug(f"Folder store check: name={nkey}, path={pval}") - ok = bool(store_registry and store_registry.is_available(nkey)) - if ok and store_registry: - backend = store_registry[nkey] - scan_ok = getattr(backend, "scan_ok", True) - sdet = getattr(backend, "scan_detail", "Up to date") - stats = getattr(backend, "scan_stats", {}) - files = int(stats.get("files_total_db", 0)) if stats else None - debug(f"Folder backend '{nkey}': scan_ok={scan_ok}, scan_detail={sdet}, stats={stats}") - _add_startup_check(startup_table, "SCANNED" if scan_ok else "ERROR", nkey, store="folder", files=files, detail=f"{pval} - {sdet}") - else: - err = store_registry.get_backend_error(iname) if store_registry else None - debug(f"Folder backend '{nkey}' error: {err}") - _add_startup_check(startup_table, "ERROR", nkey, store="folder", detail=f"{pval} - {err or 'Unavailable'}") - # Cookies try: from tool.ytdlp import YtDlpTool diff --git a/cmdnat/worker.py b/cmdnat/worker.py index 4f3bbc4..3e88645 100644 --- a/cmdnat/worker.py +++ b/cmdnat/worker.py @@ -12,7 +12,7 @@ from cmdlet import register from cmdlet._shared import Cmdlet, CmdletArg from SYS import pipeline as ctx from SYS.logger import log -from SYS.config import get_local_storage_path +from SYS.database import db as _db, get_worker_stdout DEFAULT_LIMIT = 100 WORKER_STATUS_FILTERS = {"running", @@ -74,6 +74,69 @@ CMDLET = Cmdlet( ) +def _normalize_worker_row(row: Dict[str, Any]) -> Dict[str, Any]: + worker_id = row.get("id") + created = row.get("created_at") or "" + updated = row.get("updated_at") or "" + payload = dict(row) + payload["worker_id"] = worker_id + payload["started_at"] = created + payload["last_updated"] = updated + payload["completed_at"] = updated + payload["pipe"] = row.get("details") or row.get("title") or "" + return payload + + +class _WorkerDB: + def clear_finished_workers(self) -> int: + try: + cur = _db.execute("DELETE FROM workers WHERE status != 'running'") + return int(getattr(cur, "rowcount", 0) or 0) + except Exception: + return 0 + + def get_worker(self, worker_id: str) -> Dict[str, Any] | None: + row = _db.fetchone("SELECT * FROM workers WHERE id = ?", 
+            (worker_id,))
+        if not row:
+            return None
+        worker = _normalize_worker_row(dict(row))
+        try:
+            worker["stdout"] = get_worker_stdout(worker_id)
+        except Exception:
+            worker["stdout"] = ""
+        return worker
+
+    def get_worker_events(self, worker_id: str) -> List[Dict[str, Any]]:
+        try:
+            rows = _db.fetchall(
+                "SELECT content, channel, timestamp FROM worker_stdout WHERE worker_id = ? ORDER BY timestamp ASC",
+                (worker_id,),
+            )
+        except Exception:
+            rows = []
+        events: List[Dict[str, Any]] = []
+        for row in rows:
+            try:
+                events.append({
+                    "message": row.get("content"),
+                    "channel": row.get("channel") or "stdout",
+                    "created_at": row.get("timestamp"),
+                })
+            except Exception:
+                continue
+        return events
+
+    def get_all_workers(self, limit: int = 100) -> List[Dict[str, Any]]:
+        try:
+            rows = _db.fetchall(
+                "SELECT * FROM workers ORDER BY created_at DESC LIMIT ?",
+                (int(limit or 100),),
+            )
+        except Exception:
+            rows = []
+        return [_normalize_worker_row(dict(row)) for row in rows]
+
+
 def _has_help_flag(args_list: Sequence[str]) -> bool:
     return any(str(arg).lower() in HELP_FLAGS for arg in args_list)
@@ -101,39 +164,33 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
     options = _parse_worker_args(args_list)
 
-    library_root = get_local_storage_path(config or {})
-    if not library_root:
-        log("No library root configured. Use the .config command to set up storage.", file=sys.stderr)
-        return 1
-
     try:
-        from API.folder import API_folder_store
+        db = _WorkerDB()
 
-        with API_folder_store(library_root) as db:
-            if options.clear:
-                count = db.clear_finished_workers()
-                log(f"Cleared {count} finished workers.")
-                return 0
+        if options.clear:
+            count = db.clear_finished_workers()
+            log(f"Cleared {count} finished workers.")
+            return 0
 
-            if options.worker_id:
-                worker = db.get_worker(options.worker_id)
-                if worker:
-                    events: List[Dict[str, Any]] = []
-                    try:
-                        wid = worker.get("worker_id")
-                        if wid and hasattr(db, "get_worker_events"):
-                            events = db.get_worker_events(wid)
-                    except Exception:
-                        pass
-                    _emit_worker_detail(worker, events)
-                    return 0
-                log(f"Worker not found: {options.worker_id}", file=sys.stderr)
-                return 1
+        if options.worker_id:
+            worker = db.get_worker(options.worker_id)
+            if worker:
+                events: List[Dict[str, Any]] = []
+                try:
+                    wid = worker.get("worker_id")
+                    if wid:
+                        events = db.get_worker_events(wid)
+                except Exception:
+                    pass
+                _emit_worker_detail(worker, events)
+                return 0
+            log(f"Worker not found: {options.worker_id}", file=sys.stderr)
+            return 1
 
-            if selection_requested:
-                return _render_worker_selection(db, result)
-
-            return _render_worker_list(db, options.status, options.limit)
+        if selection_requested:
+            return _render_worker_selection(db, result)
+
+        return _render_worker_list(db, options.status, options.limit)
     except Exception as exc:
         log(f"Workers query failed: {exc}", file=sys.stderr)
         import traceback
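_WorkerDB is a thin adapter over the shared database module, so the cmdlet no longer needs a configured library root at all. A self-contained sketch of the table shape it assumes, exercised with plain sqlite3 (column names are taken from _normalize_worker_row and get_worker_events above; the types and constraints are guesses, not the actual SYS.database schema):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.executescript("""
        CREATE TABLE workers (
            id TEXT PRIMARY KEY,
            status TEXT,
            title TEXT,
            details TEXT,
            created_at TEXT,
            updated_at TEXT
        );
        CREATE TABLE worker_stdout (
            worker_id TEXT,
            content TEXT,
            channel TEXT,
            timestamp TEXT
        );
    """)
    conn.execute(
        "INSERT INTO workers VALUES ('w1', 'running', 'demo', 'download x', "
        "'2024-01-01T00:00:00', '2024-01-01T00:05:00')"
    )

    # Mirror what _normalize_worker_row() does with a fetched row.
    row = dict(conn.execute("SELECT * FROM workers WHERE id = ?", ("w1",)).fetchone())
    row["worker_id"] = row["id"]
    row["pipe"] = row.get("details") or row.get("title") or ""
    print(row["worker_id"], row["status"], row["pipe"])  # w1 running download x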
diff --git a/scripts/bootstrap.py b/scripts/bootstrap.py
index f7b720a..97f0162 100644
--- a/scripts/bootstrap.py
+++ b/scripts/bootstrap.py
@@ -738,7 +738,39 @@ def main() -> int:
     return 0
 
 def _update_config_value(root: Path, key: str, value: str) -> bool:
+    db_path = root / "medios.db"
     config_path = root / "config.conf"
+
+    # Try database first
+    if db_path.exists():
+        try:
+            import sqlite3
+            with sqlite3.connect(str(db_path)) as conn:
+                # We want to set store.hydrusnetwork.hydrus.<key>.
+                cur = conn.cursor()
+                # Check if a hydrusnetwork store exists
+                cur.execute("SELECT 1 FROM config WHERE category='store' AND subtype='hydrusnetwork'")
+                if cur.fetchone():
+                    cur.execute(
+                        "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
+                        ('store', 'hydrusnetwork', 'hydrus', key, value)
+                    )
+                else:
+                    # Create the section
+                    cur.execute(
+                        "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
+                        ('store', 'hydrusnetwork', 'hydrus', 'name', 'hydrus')
+                    )
+                    cur.execute(
+                        "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
+                        ('store', 'hydrusnetwork', 'hydrus', key, value)
+                    )
+                conn.commit()
+                return True
+        except Exception as e:
+            print(f"Error updating database config: {e}")
+
+    # Fallback to config.conf
     if not config_path.exists():
         fallback = root / "config.conf.remove"
         if fallback.exists():
@@ -759,7 +791,7 @@ def main() -> int:
         config_path.write_text(new_content, encoding="utf-8")
         return True
     except Exception as e:
-        print(f"Error updating config: {e}")
+        print(f"Error updating legacy config: {e}")
     return False
 
 def _interactive_menu() -> str | int:
@@ -1512,7 +1544,10 @@ if (Test-Path (Join-Path $repo 'CLI.py')) {
             "if not defined MM_NO_UPDATE (\n"
             "  if exist \"!REPO!\\.git\" (\n"
             "    set \"AUTO_UPDATE=true\"\n"
-            "    if exist \"!REPO!\\config.conf\" (\n"
+            "    if exist \"!REPO!\\medios.db\" (\n"
+            "      \"sqlite3\" \"!REPO!\\medios.db\" \"SELECT value FROM config WHERE key='auto_update' AND category='global'\" | findstr /i /r \"false no off 0\" >nul 2>&1\n"
+            "      if !errorlevel! == 0 set \"AUTO_UPDATE=false\"\n"
+            "    ) else if exist \"!REPO!\\config.conf\" (\n"
             "      findstr /i /r \"auto_update.*=.*false auto_update.*=.*no auto_update.*=.*off auto_update.*=.*0\" \"!REPO!\\config.conf\" >nul 2>&1\n"
             "      if !errorlevel! == 0 set \"AUTO_UPDATE=false\"\n"
             "    )\n"
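A caveat that applies both to this writer and to the one in scripts/hydrusnetwork.py below: INSERT OR REPLACE only collapses duplicates when the config table enforces uniqueness over the addressed columns; otherwise every call appends a new row. A self-contained sketch under that assumption (the actual medios.db schema is not shown in this patch):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE config (
            category  TEXT NOT NULL,
            subtype   TEXT,
            item_name TEXT,
            key       TEXT NOT NULL,
            value     TEXT,
            UNIQUE (category, subtype, item_name, key)
        )
    """)
    for _ in range(2):  # the second write replaces the first instead of duplicating it
        conn.execute(
            "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) "
            "VALUES (?, ?, ?, ?, ?)",
            ("store", "hydrusnetwork", "hydrus", "gitclone", r"C:\tools\hydrus"),
        )
    assert conn.execute("SELECT COUNT(*) FROM config").fetchone()[0] == 1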
diff --git a/scripts/hydrusnetwork.py b/scripts/hydrusnetwork.py
index ae237e5..58ae2a2 100644
--- a/scripts/hydrusnetwork.py
+++ b/scripts/hydrusnetwork.py
@@ -144,23 +144,62 @@ def run_git_pull(git: str, dest: Path) -> None:
 
 def update_medios_config(hydrus_path: Path) -> bool:
-    """Attempt to update config.conf in the Medios-Macina root with the hydrus path.
+    """Attempt to update Medios-Macina root configuration with the hydrus path.
 
     This helps link the newly installed Hydrus instance with the main project.
+    We check for medios.db first, then fall back to config.conf.
     """
-    # Scripts is in /scripts, so parent is root.
     script_dir = Path(__file__).resolve().parent
     root = script_dir.parent
+    db_path = root / "medios.db"
     config_path = root / "config.conf"
+    hydrus_abs_path = str(hydrus_path.resolve())
+
+    # Try database first
+    if db_path.exists():
+        try:
+            import sqlite3
+            with sqlite3.connect(str(db_path)) as conn:
+                conn.row_factory = sqlite3.Row
+                cur = conn.cursor()
+
+                # We want to set store.hydrusnetwork.hydrus.gitclone.
+                # First check whether a hydrusnetwork store exists at all.
+                cur.execute("SELECT 1 FROM config WHERE category='store' AND subtype='hydrusnetwork'")
+                if cur.fetchone():
+                    # Update or insert gitclone for the hydrus subtype.
+                    # Note: we assume the instance name is 'hydrus'; that is what a
+                    # freshly created store is called, but an existing install may differ.
+                    cur.execute(
+                        "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
+                        ('store', 'hydrusnetwork', 'hydrus', 'gitclone', hydrus_abs_path)
+                    )
+                else:
+                    # Create the section
+                    cur.execute(
+                        "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
+                        ('store', 'hydrusnetwork', 'hydrus', 'name', 'hydrus')
+                    )
+                    cur.execute(
+                        "INSERT OR REPLACE INTO config (category, subtype, item_name, key, value) VALUES (?, ?, ?, ?, ?)",
+                        ('store', 'hydrusnetwork', 'hydrus', 'gitclone', hydrus_abs_path)
+                    )
+                conn.commit()
+                logging.info("✅ Linked Hydrus installation in medios.db (gitclone=\"%s\")", hydrus_abs_path)
+                return True
+        except Exception as e:
+            logging.error("Failed to update medios.db: %s", e)
+
+    # Fallback to config.conf
     if not config_path.exists():
-        logging.debug("MM config.conf not found at %s; skipping auto-link.", config_path)
+        logging.debug("MM config.conf not found at %s; skipping legacy auto-link.", config_path)
         return False
 
     try:
         content = config_path.read_text(encoding="utf-8")
         key = "gitclone"
-        value = str(hydrus_path.resolve())
+        value = hydrus_abs_path
 
         # Pattern to replace existing gitclone in the hydrusnetwork section
         pattern = rf'^(\s*{re.escape(key)}\s*=\s*)(.*)$'
@@ -178,6 +217,5 @@ def update_medios_config(hydrus_path: Path) -> bool:
         logging.info("✅ Linked Hydrus installation in Medios-Macina config (gitclone=\"%s\")", value)
         return True
     except Exception as e:
-        return False
-        logging.debug("Failed to update MM config: %s", e)
+        logging.error("Failed to update config.conf: %s", e)
         return False
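After either writer runs, the link is easy to verify from the repo root. A hedged check that assumes a medios.db in the working directory and the same column layout as the sketch above:

    import sqlite3

    # Read back the value update_medios_config() is expected to have written.
    with sqlite3.connect("medios.db") as conn:
        row = conn.execute(
            "SELECT value FROM config "
            "WHERE category = 'store' AND subtype = 'hydrusnetwork' AND key = 'gitclone'"
        ).fetchone()
    print(row[0] if row else "gitclone not set")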