361 lines
12 KiB
Python
361 lines
12 KiB
Python
|
|
"""Unified configuration helpers for downlow."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
from pathlib import Path
|
|
from helper.logger import log
|
|
|
|
DEFAULT_CONFIG_FILENAME = "config.json"
|
|
SCRIPT_DIR = Path(__file__).resolve().parent
|
|
|
|
_CONFIG_CACHE: Dict[str, Dict[str, Any]] = {}
|
|
|
|
|
|
def _make_cache_key(config_dir: Optional[Path], filename: str, actual_path: Optional[Path]) -> str:
|
|
if actual_path:
|
|
return str(actual_path.resolve())
|
|
base_dir = (config_dir or SCRIPT_DIR)
|
|
return str((base_dir / filename).resolve())
|
|
|
|
|
|
|
|
def get_hydrus_instance(config: Dict[str, Any], instance_name: str = "home") -> Optional[Dict[str, Any]]:
|
|
"""Get a specific Hydrus instance config by name.
|
|
|
|
Supports both formats:
|
|
- New: config["storage"]["hydrus"][instance_name] = {"key": "...", "url": "..."}
|
|
- Old: config["HydrusNetwork"][instance_name] = {"key": "...", "url": "..."}
|
|
|
|
Args:
|
|
config: Configuration dict
|
|
instance_name: Name of the Hydrus instance (default: "home")
|
|
|
|
Returns:
|
|
Dict with "key" and "url" keys, or None if not found
|
|
"""
|
|
# Try new format first
|
|
storage = config.get("storage", {})
|
|
if isinstance(storage, dict):
|
|
hydrus_config = storage.get("hydrus", {})
|
|
if isinstance(hydrus_config, dict):
|
|
instance = hydrus_config.get(instance_name)
|
|
if isinstance(instance, dict):
|
|
return instance
|
|
|
|
# Fall back to old format
|
|
hydrus_network = config.get("HydrusNetwork")
|
|
if not isinstance(hydrus_network, dict):
|
|
return None
|
|
|
|
instance = hydrus_network.get(instance_name)
|
|
if isinstance(instance, dict):
|
|
return instance
|
|
|
|
return None
|
|
|
|
|
|
def get_hydrus_access_key(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]:
|
|
"""Get Hydrus access key for an instance.
|
|
|
|
Supports both old flat format and new nested format:
|
|
- Old: config["HydrusNetwork_Access_Key"]
|
|
- New: config["HydrusNetwork"][instance_name]["key"]
|
|
|
|
Args:
|
|
config: Configuration dict
|
|
instance_name: Name of the Hydrus instance (default: "home")
|
|
|
|
Returns:
|
|
Access key string, or None if not found
|
|
"""
|
|
instance = get_hydrus_instance(config, instance_name)
|
|
key = instance.get("key") if instance else config.get("HydrusNetwork_Access_Key")
|
|
return str(key).strip() if key else None
|
|
|
|
|
|
def get_hydrus_url(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]:
|
|
"""Get Hydrus URL for an instance.
|
|
|
|
Supports both old flat format and new nested format:
|
|
- Old: config["HydrusNetwork_URL"] or constructed from IP/Port/HTTPS
|
|
- New: config["HydrusNetwork"][instance_name]["url"]
|
|
|
|
Args:
|
|
config: Configuration dict
|
|
instance_name: Name of the Hydrus instance (default: "home")
|
|
|
|
Returns:
|
|
URL string, or None if not found
|
|
"""
|
|
instance = get_hydrus_instance(config, instance_name)
|
|
url = instance.get("url") if instance else config.get("HydrusNetwork_URL")
|
|
if url: # Check if not None and not empty
|
|
return str(url).strip()
|
|
# Build from IP/Port/HTTPS if not found
|
|
host = str(config.get("HydrusNetwork_IP") or "localhost").strip() or "localhost"
|
|
port = str(config.get("HydrusNetwork_Port") or "45869").strip()
|
|
scheme = "https" if str(config.get("HydrusNetwork_Use_HTTPS") or "").strip().lower() in {"1", "true", "yes", "on"} else "http"
|
|
authority = host if not (":" in host and not host.startswith("[")) else f"[{host}]"
|
|
return f"{scheme}://{authority}:{port}"
|
|
|
|
|
|
|
|
def resolve_output_dir(config: Dict[str, Any]) -> Path:
|
|
"""Resolve output directory from config with single source of truth.
|
|
|
|
Priority:
|
|
1. config["temp"] - explicitly set temp/output directory
|
|
2. config["outfile"] - fallback to outfile setting
|
|
3. Home/Videos - safe user directory fallback
|
|
|
|
Returns:
|
|
Path to output directory
|
|
"""
|
|
# First try explicit temp setting from config
|
|
temp_value = config.get("temp")
|
|
if temp_value:
|
|
try:
|
|
path = Path(str(temp_value)).expanduser()
|
|
# Verify we can access it (not a system directory with permission issues)
|
|
if path.exists() or path.parent.exists():
|
|
return path
|
|
except Exception:
|
|
pass
|
|
|
|
# Then try outfile setting
|
|
outfile_value = config.get("outfile")
|
|
if outfile_value:
|
|
try:
|
|
return Path(str(outfile_value)).expanduser()
|
|
except Exception:
|
|
pass
|
|
|
|
# Fallback to user's Videos directory
|
|
return Path.home() / "Videos"
|
|
|
|
|
|
def get_local_storage_path(config: Dict[str, Any]) -> Optional[Path]:
|
|
"""Get local storage path from config.
|
|
|
|
Supports both formats:
|
|
- New: config["storage"]["local"]["path"]
|
|
- Old: config["Local"]["path"]
|
|
|
|
Args:
|
|
config: Configuration dict
|
|
|
|
Returns:
|
|
Path object if found, None otherwise
|
|
"""
|
|
# Try new format first
|
|
storage = config.get("storage", {})
|
|
if isinstance(storage, dict):
|
|
local_config = storage.get("local", {})
|
|
if isinstance(local_config, dict):
|
|
path_str = local_config.get("path")
|
|
if path_str:
|
|
return Path(str(path_str)).expanduser()
|
|
|
|
# Fall back to old format
|
|
local_config = config.get("Local", {})
|
|
if isinstance(local_config, dict):
|
|
path_str = local_config.get("path")
|
|
if path_str:
|
|
return Path(str(path_str)).expanduser()
|
|
|
|
return None
|
|
|
|
|
|
def get_debrid_api_key(config: Dict[str, Any], service: str = "All-debrid") -> Optional[str]:
|
|
"""Get Debrid API key from config.
|
|
|
|
Supports both formats:
|
|
- New: config["storage"]["debrid"]["All-debrid"]
|
|
- Old: config["Debrid"]["All-debrid"]
|
|
|
|
Args:
|
|
config: Configuration dict
|
|
service: Service name (default: "All-debrid")
|
|
|
|
Returns:
|
|
API key string if found, None otherwise
|
|
"""
|
|
# Try new format first
|
|
storage = config.get("storage", {})
|
|
if isinstance(storage, dict):
|
|
debrid_config = storage.get("debrid", {})
|
|
if isinstance(debrid_config, dict):
|
|
api_key = debrid_config.get(service)
|
|
if api_key: # Check if not None and not empty
|
|
return str(api_key).strip() if api_key else None
|
|
|
|
# Fall back to old format
|
|
debrid_config = config.get("Debrid", {})
|
|
if isinstance(debrid_config, dict):
|
|
api_key = debrid_config.get(service)
|
|
if api_key: # Check if not None and not empty
|
|
return str(api_key).strip() if api_key else None
|
|
|
|
return None
|
|
|
|
|
|
def get_provider_credentials(config: Dict[str, Any], provider: str) -> Optional[Dict[str, str]]:
|
|
"""Get provider credentials (email/password) from config.
|
|
|
|
Supports both formats:
|
|
- New: config["provider"][provider] = {"email": "...", "password": "..."}
|
|
- Old: config[provider.capitalize()] = {"email": "...", "password": "..."}
|
|
|
|
Args:
|
|
config: Configuration dict
|
|
provider: Provider name (e.g., "openlibrary", "soulseek")
|
|
|
|
Returns:
|
|
Dict with credentials if found, None otherwise
|
|
"""
|
|
# Try new format first
|
|
provider_config = config.get("provider", {})
|
|
if isinstance(provider_config, dict):
|
|
creds = provider_config.get(provider.lower(), {})
|
|
if isinstance(creds, dict) and creds:
|
|
return creds
|
|
|
|
# Fall back to old format (capitalized key)
|
|
old_key_map = {
|
|
"openlibrary": "OpenLibrary",
|
|
"archive": "Archive",
|
|
"soulseek": "Soulseek",
|
|
}
|
|
old_key = old_key_map.get(provider.lower())
|
|
if old_key:
|
|
creds = config.get(old_key, {})
|
|
if isinstance(creds, dict) and creds:
|
|
return creds
|
|
|
|
return None
|
|
|
|
|
|
def resolve_cookies_path(config: Dict[str, Any], script_dir: Optional[Path] = None) -> Optional[Path]:
|
|
value = config.get("cookies") or config.get("Cookies_Path")
|
|
if value:
|
|
candidate = Path(str(value)).expanduser()
|
|
if candidate.is_file():
|
|
return candidate
|
|
base_dir = script_dir or SCRIPT_DIR
|
|
default_path = base_dir / "cookies.txt"
|
|
if default_path.is_file():
|
|
return default_path
|
|
return None
|
|
|
|
def resolve_debug_log(config: Dict[str, Any]) -> Optional[Path]:
|
|
value = config.get("download_debug_log")
|
|
if not value:
|
|
return None
|
|
path = Path(str(value)).expanduser()
|
|
if not path.is_absolute():
|
|
path = Path.cwd() / path
|
|
return path
|
|
|
|
def load_config(config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME) -> Dict[str, Any]:
|
|
base_dir = config_dir or SCRIPT_DIR
|
|
config_path = base_dir / filename
|
|
cache_key = _make_cache_key(config_dir, filename, config_path)
|
|
if cache_key in _CONFIG_CACHE:
|
|
return _CONFIG_CACHE[cache_key]
|
|
|
|
try:
|
|
raw = config_path.read_text(encoding="utf-8")
|
|
except FileNotFoundError:
|
|
# Try alternate filename if default not found
|
|
if filename == DEFAULT_CONFIG_FILENAME:
|
|
alt_path = base_dir / "downlow.json"
|
|
try:
|
|
raw = alt_path.read_text(encoding="utf-8")
|
|
config_path = alt_path
|
|
cache_key = _make_cache_key(config_dir, filename, alt_path)
|
|
except FileNotFoundError:
|
|
_CONFIG_CACHE[cache_key] = {}
|
|
return {}
|
|
except OSError as exc:
|
|
log(f"Failed to read {alt_path}: {exc}")
|
|
_CONFIG_CACHE[cache_key] = {}
|
|
return {}
|
|
else:
|
|
_CONFIG_CACHE[cache_key] = {}
|
|
return {}
|
|
except OSError as exc:
|
|
log(f"Failed to read {config_path}: {exc}")
|
|
_CONFIG_CACHE[cache_key] = {}
|
|
return {}
|
|
|
|
raw = raw.strip()
|
|
if not raw:
|
|
_CONFIG_CACHE[cache_key] = {}
|
|
return {}
|
|
try:
|
|
data = json.loads(raw)
|
|
except json.JSONDecodeError as exc:
|
|
log(f"Invalid JSON in {config_path}: {exc}")
|
|
_CONFIG_CACHE[cache_key] = {}
|
|
return {}
|
|
if not isinstance(data, dict):
|
|
log(f"Expected object in {config_path}, got {type(data).__name__}")
|
|
_CONFIG_CACHE[cache_key] = {}
|
|
return {}
|
|
|
|
_CONFIG_CACHE[cache_key] = data
|
|
return data
|
|
|
|
|
|
def reload_config(config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME) -> Dict[str, Any]:
|
|
cache_key = _make_cache_key(config_dir, filename, None)
|
|
_CONFIG_CACHE.pop(cache_key, None)
|
|
return load_config(config_dir=config_dir, filename=filename)
|
|
|
|
|
|
def clear_config_cache() -> None:
|
|
_CONFIG_CACHE.clear()
|
|
|
|
def save_config(
|
|
config: Dict[str, Any],
|
|
config_dir: Optional[Path] = None,
|
|
filename: str = DEFAULT_CONFIG_FILENAME,
|
|
) -> None:
|
|
base_dir = config_dir or SCRIPT_DIR
|
|
config_path = base_dir / filename
|
|
|
|
# Load existing config to preserve keys that aren't being changed
|
|
try:
|
|
existing_raw = config_path.read_text(encoding="utf-8")
|
|
existing_data = json.loads(existing_raw.strip())
|
|
if isinstance(existing_data, dict):
|
|
# Merge: existing config as base, then overlay with new config
|
|
merged = existing_data.copy()
|
|
merged.update(config)
|
|
config = merged
|
|
except (FileNotFoundError, OSError, json.JSONDecodeError):
|
|
# File doesn't exist or is invalid, use provided config as-is
|
|
pass
|
|
|
|
try:
|
|
config_path.write_text(
|
|
json.dumps(config, ensure_ascii=False, indent=2, sort_keys=True) + "\n",
|
|
encoding="utf-8",
|
|
)
|
|
except OSError as exc:
|
|
raise RuntimeError(f"Failed to write config to {config_path}: {exc}") from exc
|
|
|
|
cache_key = _make_cache_key(config_dir, filename, config_path)
|
|
_CONFIG_CACHE[cache_key] = config
|
|
|
|
def load() -> Dict[str, Any]:
|
|
"""Return the parsed downlow configuration."""
|
|
return load_config()
|
|
|
|
def save(config: Dict[str, Any]) -> None:
|
|
"""Persist *config* back to disk."""
|
|
save_config(config)
|