""" """ from __future__ import annotations import json import sqlite3 import time from copy import deepcopy from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from SYS.logger import log from SYS.utils import expand_path from SYS.database import db, get_config_all, save_config_value SCRIPT_DIR = Path(__file__).resolve().parent _CONFIG_CACHE: Dict[str, Any] = {} _LAST_SAVED_CONFIG: Dict[str, Any] = {} _CONFIG_SAVE_MAX_RETRIES = 5 _CONFIG_SAVE_RETRY_DELAY = 0.15 def global_config() -> List[Dict[str, Any]]: """Return configuration schema for global settings.""" return [ { "key": "debug", "label": "Debug Output", "default": "false", "choices": ["true", "false"] }, { "key": "auto_update", "label": "Auto-Update", "default": "true", "choices": ["true", "false"] } ] def clear_config_cache() -> None: """Clear the configuration cache and baseline snapshot.""" global _CONFIG_CACHE, _LAST_SAVED_CONFIG _CONFIG_CACHE = {} _LAST_SAVED_CONFIG = {} def get_hydrus_instance( config: Dict[str, Any], instance_name: str = "home" ) -> Optional[Dict[str, Any]]: """Get a specific Hydrus instance config by name. Supports modern config plus a fallback when no exact match exists. """ store = config.get("store", {}) if not isinstance(store, dict): return None hydrusnetwork = store.get("hydrusnetwork", {}) if not isinstance(hydrusnetwork, dict) or not hydrusnetwork: return None instance = hydrusnetwork.get(instance_name) if isinstance(instance, dict): return instance target = str(instance_name or "").lower() for name, conf in hydrusnetwork.items(): if isinstance(conf, dict) and str(name).lower() == target: return conf keys = sorted(hydrusnetwork.keys()) for key in keys: if not str(key or "").startswith("new_"): candidate = hydrusnetwork.get(key) if isinstance(candidate, dict): return candidate first_key = keys[0] candidate = hydrusnetwork.get(first_key) if isinstance(candidate, dict): return candidate return None def get_hydrus_access_key(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]: """Get Hydrus access key for an instance. Config format: - config["store"]["hydrusnetwork"][name]["API"] Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: Access key string, or None if not found """ instance = get_hydrus_instance(config, instance_name) if instance: key = instance.get("API") return str(key).strip() if key else None return None def get_hydrus_url(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]: """Get Hydrus URL for an instance. Config format: - config["store"]["hydrusnetwork"][name]["URL"] Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: URL string, or None if not found """ instance = get_hydrus_instance(config, instance_name) url = instance.get("URL") if instance else None return str(url).strip() if url else None def get_provider_block(config: Dict[str, Any], name: str) -> Dict[str, Any]: provider_cfg = config.get("provider") if not isinstance(provider_cfg, dict): return {} block = provider_cfg.get(str(name).strip().lower()) return block if isinstance(block, dict) else {} def get_soulseek_username(config: Dict[str, Any]) -> Optional[str]: block = get_provider_block(config, "soulseek") val = block.get("username") or block.get("USERNAME") return str(val).strip() if val else None def get_soulseek_password(config: Dict[str, Any]) -> Optional[str]: block = get_provider_block(config, "soulseek") val = block.get("password") or block.get("PASSWORD") return str(val).strip() if val else None def resolve_output_dir(config: Dict[str, Any]) -> Path: """Resolve output directory from config with single source of truth. Priority: 1. config["temp"] - explicitly set temp/output directory 2. config["outfile"] - fallback to outfile setting 3. System Temp - default fallback directory Returns: Path to output directory """ # First try explicit temp setting from config temp_value = config.get("temp") if temp_value: try: path = expand_path(temp_value) # Verify we can access it (not a system directory with permission issues) if path.exists() or path.parent.exists(): return path except Exception: pass # Then try outfile setting outfile_value = config.get("outfile") if outfile_value: try: return expand_path(outfile_value) except Exception: pass # Fallback to system temp directory return Path(tempfile.gettempdir()) def get_local_storage_path(config: Dict[str, Any]) -> Optional[Path]: """Get local storage path from config. Supports multiple formats: - Old: config["storage"]["local"]["path"] - Old: config["Local"]["path"] Args: config: Configuration dict Returns: Path object if found, None otherwise """ # Fall back to storage.local.path format storage = config.get("storage", {}) if isinstance(storage, dict): local_config = storage.get("local", {}) if isinstance(local_config, dict): path_str = local_config.get("path") if path_str: return expand_path(path_str) # Fall back to old Local format local_config = config.get("Local", {}) if isinstance(local_config, dict): path_str = local_config.get("path") if path_str: return expand_path(path_str) return None def get_debrid_api_key(config: Dict[str, Any], service: str = "All-debrid") -> Optional[str]: """Get Debrid API key from config. Config format: - config["store"]["debrid"][]["api_key"] where is the store name (e.g. "all-debrid") Args: config: Configuration dict service: Service name (default: "All-debrid") Returns: API key string if found, None otherwise """ store = config.get("store", {}) if not isinstance(store, dict): return None debrid_config = store.get("debrid", {}) if not isinstance(debrid_config, dict): return None service_key = str(service).strip().lower() entry = debrid_config.get(service_key) if isinstance(entry, dict): api_key = entry.get("api_key") return str(api_key).strip() if api_key else None if isinstance(entry, str): return entry.strip() or None return None def get_provider_credentials(config: Dict[str, Any], provider: str) -> Optional[Dict[str, str]]: """Get provider credentials (email/password) from config. Supports both formats: - New: config["provider"][provider] = {"email": "...", "password": "..."} - Old: config[provider.capitalize()] = {"email": "...", "password": "..."} Args: config: Configuration dict provider: Provider name (e.g., "openlibrary", "soulseek") Returns: Dict with credentials if found, None otherwise """ # Try new format first provider_config = config.get("provider", {}) if isinstance(provider_config, dict): creds = provider_config.get(provider.lower(), {}) if isinstance(creds, dict) and creds: return creds # Fall back to old format (capitalized key) old_key_map = { "openlibrary": "OpenLibrary", "archive": "Archive", "soulseek": "Soulseek", } old_key = old_key_map.get(provider.lower()) if old_key: creds = config.get(old_key, {}) if isinstance(creds, dict) and creds: return creds return None def resolve_cookies_path( config: Dict[str, Any], script_dir: Optional[Path] = None ) -> Optional[Path]: # Support both legacy top-level `cookies=...` and the modular conf style: # [tool=ytdlp] # cookies="C:\\path\\cookies.txt" values: list[Any] = [] try: values.append(config.get("cookies")) except Exception: pass try: tool = config.get("tool") if isinstance(tool, dict): ytdlp = tool.get("ytdlp") if isinstance(ytdlp, dict): values.append(ytdlp.get("cookies")) values.append(ytdlp.get("cookiefile")) except Exception: pass try: ytdlp_block = config.get("ytdlp") if isinstance(ytdlp_block, dict): values.append(ytdlp_block.get("cookies")) values.append(ytdlp_block.get("cookiefile")) except Exception: pass base_dir = script_dir or SCRIPT_DIR for value in values: if not value: continue candidate = expand_path(value) if not candidate.is_absolute(): candidate = expand_path(base_dir / candidate) if candidate.is_file(): return candidate default_path = base_dir / "cookies.txt" if default_path.is_file(): return default_path return None def resolve_debug_log(config: Dict[str, Any]) -> Optional[Path]: value = config.get("download_debug_log") if not value: return None path = expand_path(value) if not path.is_absolute(): path = Path.cwd() / path return path def _flatten_config_entries(config: Dict[str, Any]) -> Dict[Tuple[str, str, str, str], Any]: entries: Dict[Tuple[str, str, str, str], Any] = {} for key, value in config.items(): if key in ('store', 'provider', 'tool') and isinstance(value, dict): for subtype, instances in value.items(): if not isinstance(instances, dict): continue if key == 'store': for name, settings in instances.items(): if not isinstance(settings, dict): continue for k, v in settings.items(): entries[(key, subtype, name, k)] = v else: for k, v in instances.items(): entries[(key, subtype, 'default', k)] = v elif not key.startswith('_') and value is not None: entries[('global', 'none', 'none', key)] = value return entries def _count_changed_entries(old_config: Dict[str, Any], new_config: Dict[str, Any]) -> int: old_entries = _flatten_config_entries(old_config or {}) new_entries = _flatten_config_entries(new_config or {}) changed = {k for k, v in new_entries.items() if old_entries.get(k) != v} removed = {k for k in old_entries if k not in new_entries} return len(changed) + len(removed) def load_config() -> Dict[str, Any]: global _CONFIG_CACHE, _LAST_SAVED_CONFIG if _CONFIG_CACHE: return _CONFIG_CACHE # Load strictly from database db_config = get_config_all() if db_config: _CONFIG_CACHE = db_config _LAST_SAVED_CONFIG = deepcopy(db_config) return db_config _LAST_SAVED_CONFIG = {} return {} def reload_config() -> Dict[str, Any]: clear_config_cache() return load_config() def save_config(config: Dict[str, Any]) -> int: global _CONFIG_CACHE, _LAST_SAVED_CONFIG previous_config = deepcopy(_LAST_SAVED_CONFIG) changed_count = _count_changed_entries(previous_config, config) def _write_entries() -> int: count = 0 with db.transaction(): db.execute("DELETE FROM config") for key, value in config.items(): if key in ('store', 'provider', 'tool'): if isinstance(value, dict): for subtype, instances in value.items(): if isinstance(instances, dict): if key == 'store': for name, settings in instances.items(): if isinstance(settings, dict): for k, v in settings.items(): save_config_value(key, subtype, name, k, v) count += 1 else: for k, v in instances.items(): save_config_value(key, subtype, "default", k, v) count += 1 else: if not key.startswith("_") and value is not None: save_config_value("global", "none", "none", key, value) count += 1 return count saved_entries = 0 attempts = 0 while True: try: saved_entries = _write_entries() log( f"Synced {saved_entries} entries to {db.db_path} " f"({changed_count} changed entries)" ) break except sqlite3.OperationalError as exc: attempts += 1 locked_error = "locked" in str(exc).lower() if not locked_error or attempts >= _CONFIG_SAVE_MAX_RETRIES: log(f"CRITICAL: Database write failed: {exc}") raise delay = _CONFIG_SAVE_RETRY_DELAY * attempts log(f"Database locked; retry {attempts}/{_CONFIG_SAVE_MAX_RETRIES} in {delay:.2f}s") time.sleep(delay) except Exception as exc: log(f"CRITICAL: Configuration save failed: {exc}") raise clear_config_cache() _CONFIG_CACHE = config _LAST_SAVED_CONFIG = deepcopy(config) return saved_entries def load() -> Dict[str, Any]: """Return the parsed downlow configuration.""" return load_config() def save(config: Dict[str, Any]) -> int: """Persist *config* back to disk.""" return save_config(config)