""" """ from __future__ import annotations import re import tempfile from pathlib import Path from typing import Any, Dict, Optional from SYS.logger import log from SYS.utils import expand_path DEFAULT_CONFIG_FILENAME = "config.conf" SCRIPT_DIR = Path(__file__).resolve().parent _CONFIG_CACHE: Dict[str, Dict[str, Any]] = {} def global_config() -> List[Dict[str, Any]]: """Return configuration schema for global settings.""" return [ { "key": "debug", "label": "Debug Output", "default": "false", "choices": ["true", "false"] }, { "key": "auto_update", "label": "Auto-Update", "default": "true", "choices": ["true", "false"] } ] def clear_config_cache() -> None: """Clear the configuration cache.""" _CONFIG_CACHE.clear() def _strip_inline_comment(line: str) -> str: # Strip comments in a way that's friendly to common .conf usage: # - Full-line comments starting with '#' or ';' # - Inline comments starting with '#' or ';' *outside quotes* # (e.g. dtype="float16" # optional) stripped = line.strip() if not stripped: return "" if stripped.startswith("#") or stripped.startswith(";"): return "" in_single = False in_double = False for i, ch in enumerate(line): if ch == "'" and not in_double: in_single = not in_single continue if ch == '"' and not in_single: in_double = not in_double continue if in_single or in_double: continue if ch in {"#", ";"}: # Treat as a comment start only when preceded by whitespace. # This keeps values like paths or tokens containing '#' working # when quoted, and reduces surprises for unquoted values. if i == 0 or line[i - 1].isspace(): return line[:i].rstrip() return line def _parse_scalar(value: str) -> Any: v = value.strip() if not v: return "" if (v.startswith('"') and v.endswith('"')) or (v.startswith("'") and v.endswith("'")): return v[1:-1] low = v.lower() if low in {"true", "yes", "on", "1"}: return True if low in {"false", "no", "off", "0"}: return False if re.fullmatch(r"-?\d+", v): try: return int(v) except Exception: return v if re.fullmatch(r"-?\d+\.\d+", v): try: return float(v) except Exception: return v return v def _set_nested(d: Dict[str, Any], dotted_key: str, value: Any) -> None: parts = [p for p in dotted_key.split(".") if p] if not parts: return cur: Dict[str, Any] = d for p in parts[:-1]: nxt = cur.get(p) if not isinstance(nxt, dict): nxt = {} cur[p] = nxt cur = nxt cur[parts[-1]] = value def _merge_dict_inplace(base: Dict[str, Any], patch: Dict[str, Any]) -> Dict[str, Any]: for k, v in patch.items(): if isinstance(v, dict) and isinstance(base.get(k), dict): _merge_dict_inplace(base[k], v) # type: ignore[index] else: base[k] = v return base def _apply_conf_block( config: Dict[str, Any], kind: str, subtype: str, block: Dict[str, Any] ) -> None: kind_l = str(kind).strip().lower() subtype_l = str(subtype).strip().lower() if kind_l == "store": # Store instances are keyed by NAME (preferred). If a block uses `name=...`, # normalize it into NAME to keep a single canonical key. name = block.get("NAME") if not name: name = block.get("name") if name: block = dict(block) block.pop("name", None) block["NAME"] = name if not name: return name_l = str(name).strip().lower() payload = dict(block) store = config.setdefault("store", {}) if not isinstance(store, dict): config["store"] = {} store = config["store"] bucket = store.setdefault(subtype_l, {}) if not isinstance(bucket, dict): store[subtype_l] = {} bucket = store[subtype_l] existing = bucket.get(name_l) if isinstance(existing, dict): _merge_dict_inplace(existing, payload) else: bucket[name_l] = payload return if kind_l == "provider": provider_name = str(subtype).strip().lower() provider = config.setdefault("provider", {}) if not isinstance(provider, dict): config["provider"] = {} provider = config["provider"] existing = provider.get(provider_name) if isinstance(existing, dict): _merge_dict_inplace(existing, block) else: provider[provider_name] = dict(block) return if kind_l == "tool": tool_name = str(subtype).strip().lower() if not tool_name: return tool = config.setdefault("tool", {}) if not isinstance(tool, dict): config["tool"] = {} tool = config["tool"] existing = tool.get(tool_name) if isinstance(existing, dict): _merge_dict_inplace(existing, block) else: tool[tool_name] = dict(block) return def parse_conf_text(text: str, *, base: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """Parse a lightweight .conf format into the app's config dict. Supported patterns: - Top-level key/value: temp="./temp" - Sections: [store=folder] + name/path lines - Sections: [store=hydrusnetwork] + name/access key/url lines - Sections: [provider=OpenLibrary] + email/password lines - Dotted keys: store.folder.default.path="C:\\Media" (optional) """ config: Dict[str, Any] = dict(base or {}) current_kind: Optional[str] = None current_subtype: Optional[str] = None current_block: Dict[str, Any] = {} def flush() -> None: nonlocal current_kind, current_subtype, current_block if current_kind and current_subtype and current_block: _apply_conf_block(config, current_kind, current_subtype, current_block) current_kind = None current_subtype = None current_block = {} for raw_line in text.splitlines(): line = _strip_inline_comment(raw_line) if not line.strip(): continue stripped = line.strip() if stripped.startswith("[") and stripped.endswith("]"): flush() header = stripped[1:-1].strip() if "=" in header: k, v = header.split("=", 1) current_kind = k.strip() current_subtype = v.strip() else: # Unknown header style; ignore block current_kind = None current_subtype = None continue if "=" not in stripped: continue key, value = stripped.split("=", 1) key = key.strip() parsed_val = _parse_scalar(value) if current_kind and current_subtype: current_block[key] = parsed_val else: if "." in key: _set_nested(config, key, parsed_val) else: config[key] = parsed_val flush() return config def _load_conf_config(base_dir: Path, config_path: Path) -> Dict[str, Any]: config: Dict[str, Any] = {} raw = config_path.read_text(encoding="utf-8") config = parse_conf_text(raw, base=config) conf_dir = base_dir / "config.d" if conf_dir.exists() and conf_dir.is_dir(): for frag in sorted(conf_dir.glob("*.conf")): try: frag_raw = frag.read_text(encoding="utf-8") config = parse_conf_text(frag_raw, base=config) except OSError as exc: log(f"Failed to read {frag}: {exc}") return config def _format_conf_value(val: Any) -> str: if isinstance(val, bool): return "true" if val else "false" if isinstance(val, (int, float)): return str(val) if val is None: return '""' s = str(val) s = s.replace('"', '\\"') return f'"{s}"' def _serialize_conf(config: Dict[str, Any]) -> str: lines: list[str] = [] # Top-level scalars first for key in sorted(config.keys()): if key in {"store", "provider", "tool"}: continue value = config.get(key) if isinstance(value, dict): continue lines.append(f"{key}={_format_conf_value(value)}") # Store blocks store = config.get("store") if isinstance(store, dict): for subtype in sorted(store.keys()): bucket = store.get(subtype) if not isinstance(bucket, dict): continue for name in sorted(bucket.keys()): block = bucket.get(name) if not isinstance(block, dict): continue lines.append("") lines.append(f"[store={subtype}]") lines.append(f"name={_format_conf_value(name)}") # Deduplicate keys case-insensitively and skip "name" seen_keys = {"NAME", "name"} for k in sorted(block.keys()): k_upper = k.upper() if k_upper in seen_keys: continue seen_keys.add(k_upper) lines.append(f"{k}={_format_conf_value(block.get(k))}") # Provider blocks provider = config.get("provider") if isinstance(provider, dict): for prov in sorted(provider.keys()): block = provider.get(prov) if not isinstance(block, dict): continue lines.append("") lines.append(f"[provider={prov}]") seen_keys = set() for k in sorted(block.keys()): k_upper = k.upper() if k_upper in seen_keys: continue seen_keys.add(k_upper) lines.append(f"{k}={_format_conf_value(block.get(k))}") # Tool blocks tool = config.get("tool") if isinstance(tool, dict): for name in sorted(tool.keys()): block = tool.get(name) if not isinstance(block, dict): continue lines.append("") lines.append(f"[tool={name}]") seen_keys = set() for k in sorted(block.keys()): k_upper = k.upper() if k_upper in seen_keys: continue seen_keys.add(k_upper) lines.append(f"{k}={_format_conf_value(block.get(k))}") return "\n".join(lines).rstrip() + "\n" def _make_cache_key(config_dir: Optional[Path], filename: str, actual_path: Optional[Path]) -> str: if actual_path: return str(actual_path.resolve()) base_dir = config_dir or SCRIPT_DIR return str((base_dir / filename).resolve()) def get_hydrus_instance( config: Dict[str, Any], instance_name: str = "home" ) -> Optional[Dict[str, Any]]: """Get a specific Hydrus instance config by name. Supports multiple formats: - Current: config["store"]["hydrusnetwork"][instance_name] - Legacy: config["storage"]["hydrus"][instance_name] - Old: config["HydrusNetwork"][instance_name] Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: Dict with access key and URL, or None if not found """ # Canonical: config["store"]["hydrusnetwork"]["home"] store = config.get("store", {}) if isinstance(store, dict): hydrusnetwork = store.get("hydrusnetwork", {}) if isinstance(hydrusnetwork, dict): instance = hydrusnetwork.get(instance_name) if isinstance(instance, dict): return instance return None def get_hydrus_access_key(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]: """Get Hydrus access key for an instance. Config format: - config["store"]["hydrusnetwork"][name]["API"] Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: Access key string, or None if not found """ instance = get_hydrus_instance(config, instance_name) if instance: key = instance.get("API") return str(key).strip() if key else None return None def get_hydrus_url(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]: """Get Hydrus URL for an instance. Config format: - config["store"]["hydrusnetwork"][name]["URL"] Args: config: Configuration dict instance_name: Name of the Hydrus instance (default: "home") Returns: URL string, or None if not found """ instance = get_hydrus_instance(config, instance_name) url = instance.get("URL") if instance else None return str(url).strip() if url else None def get_provider_block(config: Dict[str, Any], name: str) -> Dict[str, Any]: provider_cfg = config.get("provider") if not isinstance(provider_cfg, dict): return {} block = provider_cfg.get(str(name).strip().lower()) return block if isinstance(block, dict) else {} def get_soulseek_username(config: Dict[str, Any]) -> Optional[str]: block = get_provider_block(config, "soulseek") val = block.get("username") or block.get("USERNAME") return str(val).strip() if val else None def get_soulseek_password(config: Dict[str, Any]) -> Optional[str]: block = get_provider_block(config, "soulseek") val = block.get("password") or block.get("PASSWORD") return str(val).strip() if val else None def resolve_output_dir(config: Dict[str, Any]) -> Path: """Resolve output directory from config with single source of truth. Priority: 1. config["temp"] - explicitly set temp/output directory 2. config["outfile"] - fallback to outfile setting 3. System Temp - default fallback directory Returns: Path to output directory """ # First try explicit temp setting from config temp_value = config.get("temp") if temp_value: try: path = expand_path(temp_value) # Verify we can access it (not a system directory with permission issues) if path.exists() or path.parent.exists(): return path except Exception: pass # Then try outfile setting outfile_value = config.get("outfile") if outfile_value: try: return expand_path(outfile_value) except Exception: pass # Fallback to system temp directory return Path(tempfile.gettempdir()) def get_local_storage_path(config: Dict[str, Any]) -> Optional[Path]: """Get local storage path from config. Supports multiple formats: - New: config["store"]["folder"]["any_name"]["path"] - Old: config["storage"]["local"]["path"] - Old: config["Local"]["path"] Args: config: Configuration dict Returns: Path object if found, None otherwise """ # Try new format: iterate all folder stores and use the first valid path found. store = config.get("store", {}) if isinstance(store, dict): folder_config = store.get("folder", {}) if isinstance(folder_config, dict): for name, inst_cfg in folder_config.items(): if isinstance(inst_cfg, dict): p = inst_cfg.get("path") or inst_cfg.get("PATH") if p: return expand_path(p) # Fall back to storage.local.path format storage = config.get("storage", {}) if isinstance(storage, dict): local_config = storage.get("local", {}) if isinstance(local_config, dict): path_str = local_config.get("path") if path_str: return expand_path(path_str) # Fall back to old Local format local_config = config.get("Local", {}) if isinstance(local_config, dict): path_str = local_config.get("path") if path_str: return expand_path(path_str) return None def get_debrid_api_key(config: Dict[str, Any], service: str = "All-debrid") -> Optional[str]: """Get Debrid API key from config. Config format: - config["store"]["debrid"][]["api_key"] where is the store name (e.g. "all-debrid") Args: config: Configuration dict service: Service name (default: "All-debrid") Returns: API key string if found, None otherwise """ store = config.get("store", {}) if not isinstance(store, dict): return None debrid_config = store.get("debrid", {}) if not isinstance(debrid_config, dict): return None service_key = str(service).strip().lower() entry = debrid_config.get(service_key) if isinstance(entry, dict): api_key = entry.get("api_key") return str(api_key).strip() if api_key else None if isinstance(entry, str): return entry.strip() or None return None def get_provider_credentials(config: Dict[str, Any], provider: str) -> Optional[Dict[str, str]]: """Get provider credentials (email/password) from config. Supports both formats: - New: config["provider"][provider] = {"email": "...", "password": "..."} - Old: config[provider.capitalize()] = {"email": "...", "password": "..."} Args: config: Configuration dict provider: Provider name (e.g., "openlibrary", "soulseek") Returns: Dict with credentials if found, None otherwise """ # Try new format first provider_config = config.get("provider", {}) if isinstance(provider_config, dict): creds = provider_config.get(provider.lower(), {}) if isinstance(creds, dict) and creds: return creds # Fall back to old format (capitalized key) old_key_map = { "openlibrary": "OpenLibrary", "archive": "Archive", "soulseek": "Soulseek", } old_key = old_key_map.get(provider.lower()) if old_key: creds = config.get(old_key, {}) if isinstance(creds, dict) and creds: return creds return None def resolve_cookies_path( config: Dict[str, Any], script_dir: Optional[Path] = None ) -> Optional[Path]: # Support both legacy top-level `cookies=...` and the modular conf style: # [tool=ytdlp] # cookies="C:\\path\\cookies.txt" values: list[Any] = [] try: values.append(config.get("cookies")) except Exception: pass try: tool = config.get("tool") if isinstance(tool, dict): ytdlp = tool.get("ytdlp") if isinstance(ytdlp, dict): values.append(ytdlp.get("cookies")) values.append(ytdlp.get("cookiefile")) except Exception: pass try: ytdlp_block = config.get("ytdlp") if isinstance(ytdlp_block, dict): values.append(ytdlp_block.get("cookies")) values.append(ytdlp_block.get("cookiefile")) except Exception: pass base_dir = script_dir or SCRIPT_DIR for value in values: if not value: continue candidate = expand_path(value) if not candidate.is_absolute(): candidate = expand_path(base_dir / candidate) if candidate.is_file(): return candidate default_path = base_dir / "cookies.txt" if default_path.is_file(): return default_path return None def resolve_debug_log(config: Dict[str, Any]) -> Optional[Path]: value = config.get("download_debug_log") if not value: return None path = expand_path(value) if not path.is_absolute(): path = Path.cwd() / path return path def load_config( config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME ) -> Dict[str, Any]: base_dir = config_dir or SCRIPT_DIR config_path = base_dir / filename cache_key = _make_cache_key(config_dir, filename, config_path) if cache_key in _CONFIG_CACHE: return _CONFIG_CACHE[cache_key] if config_path.suffix.lower() != ".conf": log(f"Unsupported config format: {config_path.name} (only .conf is supported)") _CONFIG_CACHE[cache_key] = {} return {} try: data = _load_conf_config(base_dir, config_path) except FileNotFoundError: _CONFIG_CACHE[cache_key] = {} return {} except OSError as exc: log(f"Failed to read {config_path}: {exc}") _CONFIG_CACHE[cache_key] = {} return {} _CONFIG_CACHE[cache_key] = data return data def reload_config( config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME ) -> Dict[str, Any]: cache_key = _make_cache_key(config_dir, filename, None) _CONFIG_CACHE.pop(cache_key, None) return load_config(config_dir=config_dir, filename=filename) def clear_config_cache() -> None: _CONFIG_CACHE.clear() def save_config( config: Dict[str, Any], config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME, ) -> None: base_dir = config_dir or SCRIPT_DIR config_path = base_dir / filename if config_path.suffix.lower() != ".conf": raise RuntimeError( f"Unsupported config format: {config_path.name} (only .conf is supported)" ) try: config_path.write_text(_serialize_conf(config), encoding="utf-8") except OSError as exc: raise RuntimeError(f"Failed to write config to {config_path}: {exc}") from exc cache_key = _make_cache_key(config_dir, filename, config_path) _CONFIG_CACHE[cache_key] = config def load() -> Dict[str, Any]: """Return the parsed downlow configuration.""" return load_config() def save(config: Dict[str, Any]) -> None: """Persist *config* back to disk.""" save_config(config)