""" """ from __future__ import annotations import re import tempfile import json from pathlib import Path from typing import Any, Dict, Optional, List from SYS.logger import log from SYS.utils import expand_path from SYS.database import db, get_config_all, save_config_value DEFAULT_CONFIG_FILENAME = "config.conf" SCRIPT_DIR = Path(__file__).resolve().parent _CONFIG_CACHE: Dict[str, Dict[str, Any]] = {} def global_config() -> List[Dict[str, Any]]: """Return configuration schema for global settings.""" return [ { "key": "debug", "label": "Debug Output", "default": "false", "choices": ["true", "false"] }, { "key": "auto_update", "label": "Auto-Update", "default": "true", "choices": ["true", "false"] } ] def clear_config_cache() -> None: """Clear the configuration cache.""" _CONFIG_CACHE.clear() def _make_cache_key(config_dir: Optional[Path], filename: str, actual_path: Optional[Path]) -> str: if actual_path: return str(actual_path.resolve()) base_dir = config_dir or SCRIPT_DIR return str((base_dir / filename).resolve()) def get_hydrus_instance( config: Dict[str, Any], instance_name: str = "home" ) -> Optional[Dict[str, Any]]: """Get a specific Hydrus instance config by name. Supports multiple formats: - Current: config["store"]["hydrusnetwork"][instance_name] - Legacy: config["storage"]["hydrus"][instance_name] - Old: config["HydrusNetwork"][instance_name] Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: Dict with access key and URL, or None if not found """ # Canonical: config["store"]["hydrusnetwork"]["home"] store = config.get("store", {}) if isinstance(store, dict): hydrusnetwork = store.get("hydrusnetwork", {}) if isinstance(hydrusnetwork, dict): instance = hydrusnetwork.get(instance_name) if isinstance(instance, dict): return instance return None def get_hydrus_access_key(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]: """Get Hydrus access key for an instance. Config format: - config["store"]["hydrusnetwork"][name]["API"] Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: Access key string, or None if not found """ instance = get_hydrus_instance(config, instance_name) if instance: key = instance.get("API") return str(key).strip() if key else None return None def get_hydrus_url(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]: """Get Hydrus URL for an instance. Config format: - config["store"]["hydrusnetwork"][name]["URL"] Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: URL string, or None if not found """ instance = get_hydrus_instance(config, instance_name) url = instance.get("URL") if instance else None return str(url).strip() if url else None def get_provider_block(config: Dict[str, Any], name: str) -> Dict[str, Any]: provider_cfg = config.get("provider") if not isinstance(provider_cfg, dict): return {} block = provider_cfg.get(str(name).strip().lower()) return block if isinstance(block, dict) else {} def get_soulseek_username(config: Dict[str, Any]) -> Optional[str]: block = get_provider_block(config, "soulseek") val = block.get("username") or block.get("USERNAME") return str(val).strip() if val else None def get_soulseek_password(config: Dict[str, Any]) -> Optional[str]: block = get_provider_block(config, "soulseek") val = block.get("password") or block.get("PASSWORD") return str(val).strip() if val else None def resolve_output_dir(config: Dict[str, Any]) -> Path: """Resolve output directory from config with single source of truth. Priority: 1. config["temp"] - explicitly set temp/output directory 2. config["outfile"] - fallback to outfile setting 3. System Temp - default fallback directory Returns: Path to output directory """ # First try explicit temp setting from config temp_value = config.get("temp") if temp_value: try: path = expand_path(temp_value) # Verify we can access it (not a system directory with permission issues) if path.exists() or path.parent.exists(): return path except Exception: pass # Then try outfile setting outfile_value = config.get("outfile") if outfile_value: try: return expand_path(outfile_value) except Exception: pass # Fallback to system temp directory return Path(tempfile.gettempdir()) def get_local_storage_path(config: Dict[str, Any]) -> Optional[Path]: """Get local storage path from config. Supports multiple formats: - Old: config["storage"]["local"]["path"] - Old: config["Local"]["path"] Args: config: Configuration dict Returns: Path object if found, None otherwise """ # Fall back to storage.local.path format storage = config.get("storage", {}) if isinstance(storage, dict): local_config = storage.get("local", {}) if isinstance(local_config, dict): path_str = local_config.get("path") if path_str: return expand_path(path_str) # Fall back to old Local format local_config = config.get("Local", {}) if isinstance(local_config, dict): path_str = local_config.get("path") if path_str: return expand_path(path_str) return None def get_debrid_api_key(config: Dict[str, Any], service: str = "All-debrid") -> Optional[str]: """Get Debrid API key from config. Config format: - config["store"]["debrid"][]["api_key"] where is the store name (e.g. "all-debrid") Args: config: Configuration dict service: Service name (default: "All-debrid") Returns: API key string if found, None otherwise """ store = config.get("store", {}) if not isinstance(store, dict): return None debrid_config = store.get("debrid", {}) if not isinstance(debrid_config, dict): return None service_key = str(service).strip().lower() entry = debrid_config.get(service_key) if isinstance(entry, dict): api_key = entry.get("api_key") return str(api_key).strip() if api_key else None if isinstance(entry, str): return entry.strip() or None return None def get_provider_credentials(config: Dict[str, Any], provider: str) -> Optional[Dict[str, str]]: """Get provider credentials (email/password) from config. Supports both formats: - New: config["provider"][provider] = {"email": "...", "password": "..."} - Old: config[provider.capitalize()] = {"email": "...", "password": "..."} Args: config: Configuration dict provider: Provider name (e.g., "openlibrary", "soulseek") Returns: Dict with credentials if found, None otherwise """ # Try new format first provider_config = config.get("provider", {}) if isinstance(provider_config, dict): creds = provider_config.get(provider.lower(), {}) if isinstance(creds, dict) and creds: return creds # Fall back to old format (capitalized key) old_key_map = { "openlibrary": "OpenLibrary", "archive": "Archive", "soulseek": "Soulseek", } old_key = old_key_map.get(provider.lower()) if old_key: creds = config.get(old_key, {}) if isinstance(creds, dict) and creds: return creds return None def resolve_cookies_path( config: Dict[str, Any], script_dir: Optional[Path] = None ) -> Optional[Path]: # Support both legacy top-level `cookies=...` and the modular conf style: # [tool=ytdlp] # cookies="C:\\path\\cookies.txt" values: list[Any] = [] try: values.append(config.get("cookies")) except Exception: pass try: tool = config.get("tool") if isinstance(tool, dict): ytdlp = tool.get("ytdlp") if isinstance(ytdlp, dict): values.append(ytdlp.get("cookies")) values.append(ytdlp.get("cookiefile")) except Exception: pass try: ytdlp_block = config.get("ytdlp") if isinstance(ytdlp_block, dict): values.append(ytdlp_block.get("cookies")) values.append(ytdlp_block.get("cookiefile")) except Exception: pass base_dir = script_dir or SCRIPT_DIR for value in values: if not value: continue candidate = expand_path(value) if not candidate.is_absolute(): candidate = expand_path(base_dir / candidate) if candidate.is_file(): return candidate default_path = base_dir / "cookies.txt" if default_path.is_file(): return default_path return None def resolve_debug_log(config: Dict[str, Any]) -> Optional[Path]: value = config.get("download_debug_log") if not value: return None path = expand_path(value) if not path.is_absolute(): path = Path.cwd() / path return path def load_config( config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME ) -> Dict[str, Any]: base_dir = config_dir or SCRIPT_DIR config_path = base_dir / filename cache_key = _make_cache_key(config_dir, filename, config_path) if cache_key in _CONFIG_CACHE: return _CONFIG_CACHE[cache_key] # Load from database db_config = get_config_all() if db_config: _CONFIG_CACHE[cache_key] = db_config return db_config return {} def reload_config( config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME ) -> Dict[str, Any]: cache_key = _make_cache_key(config_dir, filename, None) _CONFIG_CACHE.pop(cache_key, None) return load_config(config_dir=config_dir, filename=filename) def save_config( config: Dict[str, Any], config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME, ) -> None: base_dir = config_dir or SCRIPT_DIR config_path = base_dir / filename # 1. Save to Database try: from SYS.database import db, save_config_value with db.transaction(): # Replace the table contents so removed entries disappear from the DB. db.execute("DELETE FROM config") for key, value in config.items(): if key in ('store', 'provider', 'tool'): if isinstance(value, dict): for subtype, instances in value.items(): if isinstance(instances, dict): # provider/tool are usually config[cat][subtype][key] # but store is config['store'][subtype][name][key] if key == 'store': for name, settings in instances.items(): if isinstance(settings, dict): for k, v in settings.items(): save_config_value(key, subtype, name, k, v) else: for k, v in instances.items(): save_config_value(key, subtype, "default", k, v) else: # global settings if not key.startswith("_"): save_config_value("global", "none", "none", key, value) except Exception as e: log(f"Failed to save config to database: {e}") cache_key = _make_cache_key(config_dir, filename, config_path) clear_config_cache() _CONFIG_CACHE[cache_key] = config def load() -> Dict[str, Any]: """Return the parsed downlow configuration.""" return load_config() def save(config: Dict[str, Any]) -> None: """Persist *config* back to disk.""" save_config(config)