"""Unified configuration helpers for downlow.""" from __future__ import annotations import json from pathlib import Path from typing import Any, Dict, Optional from pathlib import Path from helper.logger import log DEFAULT_CONFIG_FILENAME = "config.json" SCRIPT_DIR = Path(__file__).resolve().parent _CONFIG_CACHE: Dict[str, Dict[str, Any]] = {} def _make_cache_key(config_dir: Optional[Path], filename: str, actual_path: Optional[Path]) -> str: if actual_path: return str(actual_path.resolve()) base_dir = (config_dir or SCRIPT_DIR) return str((base_dir / filename).resolve()) def get_hydrus_instance(config: Dict[str, Any], instance_name: str = "home") -> Optional[Dict[str, Any]]: """Get a specific Hydrus instance config by name. Supports both formats: - New: config["storage"]["hydrus"][instance_name] = {"key": "...", "url": "..."} - Old: config["HydrusNetwork"][instance_name] = {"key": "...", "url": "..."} Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: Dict with "key" and "url" keys, or None if not found """ # Try new format first storage = config.get("storage", {}) if isinstance(storage, dict): hydrus_config = storage.get("hydrus", {}) if isinstance(hydrus_config, dict): instance = hydrus_config.get(instance_name) if isinstance(instance, dict): return instance # Fall back to old format hydrus_network = config.get("HydrusNetwork") if not isinstance(hydrus_network, dict): return None instance = hydrus_network.get(instance_name) if isinstance(instance, dict): return instance return None def get_hydrus_access_key(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]: """Get Hydrus access key for an instance. Supports both old flat format and new nested format: - Old: config["HydrusNetwork_Access_Key"] - New: config["HydrusNetwork"][instance_name]["key"] Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: Access key string, or None if not found """ instance = get_hydrus_instance(config, instance_name) key = instance.get("key") if instance else config.get("HydrusNetwork_Access_Key") return str(key).strip() if key else None def get_hydrus_url(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]: """Get Hydrus URL for an instance. Supports both old flat format and new nested format: - Old: config["HydrusNetwork_URL"] or constructed from IP/Port/HTTPS - New: config["HydrusNetwork"][instance_name]["url"] Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: URL string, or None if not found """ instance = get_hydrus_instance(config, instance_name) url = instance.get("url") if instance else config.get("HydrusNetwork_URL") if url: # Check if not None and not empty return str(url).strip() # Build from IP/Port/HTTPS if not found host = str(config.get("HydrusNetwork_IP") or "localhost").strip() or "localhost" port = str(config.get("HydrusNetwork_Port") or "45869").strip() scheme = "https" if str(config.get("HydrusNetwork_Use_HTTPS") or "").strip().lower() in {"1", "true", "yes", "on"} else "http" authority = host if not (":" in host and not host.startswith("[")) else f"[{host}]" return f"{scheme}://{authority}:{port}" def resolve_output_dir(config: Dict[str, Any]) -> Path: """Resolve output directory from config with single source of truth. Priority: 1. config["temp"] - explicitly set temp/output directory 2. config["outfile"] - fallback to outfile setting 3. Home/Videos - safe user directory fallback Returns: Path to output directory """ # First try explicit temp setting from config temp_value = config.get("temp") if temp_value: try: path = Path(str(temp_value)).expanduser() # Verify we can access it (not a system directory with permission issues) if path.exists() or path.parent.exists(): return path except Exception: pass # Then try outfile setting outfile_value = config.get("outfile") if outfile_value: try: return Path(str(outfile_value)).expanduser() except Exception: pass # Fallback to user's Videos directory return Path.home() / "Videos" def get_local_storage_path(config: Dict[str, Any]) -> Optional[Path]: """Get local storage path from config. Supports both formats: - New: config["storage"]["local"]["path"] - Old: config["Local"]["path"] Args: config: Configuration dict Returns: Path object if found, None otherwise """ # Try new format first storage = config.get("storage", {}) if isinstance(storage, dict): local_config = storage.get("local", {}) if isinstance(local_config, dict): path_str = local_config.get("path") if path_str: return Path(str(path_str)).expanduser() # Fall back to old format local_config = config.get("Local", {}) if isinstance(local_config, dict): path_str = local_config.get("path") if path_str: return Path(str(path_str)).expanduser() return None def get_debrid_api_key(config: Dict[str, Any], service: str = "All-debrid") -> Optional[str]: """Get Debrid API key from config. Supports both formats: - New: config["storage"]["debrid"]["All-debrid"] - Old: config["Debrid"]["All-debrid"] Args: config: Configuration dict service: Service name (default: "All-debrid") Returns: API key string if found, None otherwise """ # Try new format first storage = config.get("storage", {}) if isinstance(storage, dict): debrid_config = storage.get("debrid", {}) if isinstance(debrid_config, dict): api_key = debrid_config.get(service) if api_key: # Check if not None and not empty return str(api_key).strip() if api_key else None # Fall back to old format debrid_config = config.get("Debrid", {}) if isinstance(debrid_config, dict): api_key = debrid_config.get(service) if api_key: # Check if not None and not empty return str(api_key).strip() if api_key else None return None def get_provider_credentials(config: Dict[str, Any], provider: str) -> Optional[Dict[str, str]]: """Get provider credentials (email/password) from config. Supports both formats: - New: config["provider"][provider] = {"email": "...", "password": "..."} - Old: config[provider.capitalize()] = {"email": "...", "password": "..."} Args: config: Configuration dict provider: Provider name (e.g., "openlibrary", "soulseek") Returns: Dict with credentials if found, None otherwise """ # Try new format first provider_config = config.get("provider", {}) if isinstance(provider_config, dict): creds = provider_config.get(provider.lower(), {}) if isinstance(creds, dict) and creds: return creds # Fall back to old format (capitalized key) old_key_map = { "openlibrary": "OpenLibrary", "archive": "Archive", "soulseek": "Soulseek", } old_key = old_key_map.get(provider.lower()) if old_key: creds = config.get(old_key, {}) if isinstance(creds, dict) and creds: return creds return None def resolve_cookies_path(config: Dict[str, Any], script_dir: Optional[Path] = None) -> Optional[Path]: value = config.get("cookies") or config.get("Cookies_Path") if value: candidate = Path(str(value)).expanduser() if candidate.is_file(): return candidate base_dir = script_dir or SCRIPT_DIR default_path = base_dir / "cookies.txt" if default_path.is_file(): return default_path return None def resolve_debug_log(config: Dict[str, Any]) -> Optional[Path]: value = config.get("download_debug_log") if not value: return None path = Path(str(value)).expanduser() if not path.is_absolute(): path = Path.cwd() / path return path def load_config(config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME) -> Dict[str, Any]: base_dir = config_dir or SCRIPT_DIR config_path = base_dir / filename cache_key = _make_cache_key(config_dir, filename, config_path) if cache_key in _CONFIG_CACHE: return _CONFIG_CACHE[cache_key] try: raw = config_path.read_text(encoding="utf-8") except FileNotFoundError: # Try alternate filename if default not found if filename == DEFAULT_CONFIG_FILENAME: alt_path = base_dir / "downlow.json" try: raw = alt_path.read_text(encoding="utf-8") config_path = alt_path cache_key = _make_cache_key(config_dir, filename, alt_path) except FileNotFoundError: _CONFIG_CACHE[cache_key] = {} return {} except OSError as exc: log(f"Failed to read {alt_path}: {exc}") _CONFIG_CACHE[cache_key] = {} return {} else: _CONFIG_CACHE[cache_key] = {} return {} except OSError as exc: log(f"Failed to read {config_path}: {exc}") _CONFIG_CACHE[cache_key] = {} return {} raw = raw.strip() if not raw: _CONFIG_CACHE[cache_key] = {} return {} try: data = json.loads(raw) except json.JSONDecodeError as exc: log(f"Invalid JSON in {config_path}: {exc}") _CONFIG_CACHE[cache_key] = {} return {} if not isinstance(data, dict): log(f"Expected object in {config_path}, got {type(data).__name__}") _CONFIG_CACHE[cache_key] = {} return {} _CONFIG_CACHE[cache_key] = data return data def reload_config(config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME) -> Dict[str, Any]: cache_key = _make_cache_key(config_dir, filename, None) _CONFIG_CACHE.pop(cache_key, None) return load_config(config_dir=config_dir, filename=filename) def clear_config_cache() -> None: _CONFIG_CACHE.clear() def save_config( config: Dict[str, Any], config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME, ) -> None: base_dir = config_dir or SCRIPT_DIR config_path = base_dir / filename # Load existing config to preserve keys that aren't being changed try: existing_raw = config_path.read_text(encoding="utf-8") existing_data = json.loads(existing_raw.strip()) if isinstance(existing_data, dict): # Merge: existing config as base, then overlay with new config merged = existing_data.copy() merged.update(config) config = merged except (FileNotFoundError, OSError, json.JSONDecodeError): # File doesn't exist or is invalid, use provided config as-is pass try: config_path.write_text( json.dumps(config, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8", ) except OSError as exc: raise RuntimeError(f"Failed to write config to {config_path}: {exc}") from exc cache_key = _make_cache_key(config_dir, filename, config_path) _CONFIG_CACHE[cache_key] = config def load() -> Dict[str, Any]: """Return the parsed downlow configuration.""" return load_config() def save(config: Dict[str, Any]) -> None: """Persist *config* back to disk.""" save_config(config)