Files
Medios-Macina/SYS/config.py
2026-01-23 16:46:48 -08:00

443 lines
13 KiB
Python

""" """
from __future__ import annotations
import re
import tempfile
import json
import sqlite3
import time
from pathlib import Path
from typing import Any, Dict, Optional, List
from SYS.logger import log
from SYS.utils import expand_path
from SYS.database import db, get_config_all, save_config_value
# Default name of the configuration file; used as the `filename` default by
# load_config(), reload_config() and save_config().
DEFAULT_CONFIG_FILENAME = "config.conf"
# Directory containing this module; the fallback base directory when callers
# do not pass an explicit config/script directory.
SCRIPT_DIR = Path(__file__).resolve().parent
# In-process cache of configurations, keyed by the string produced by
# _make_cache_key() (a resolved config file path).
_CONFIG_CACHE: Dict[str, Dict[str, Any]] = {}
# Upper bound on save_config() attempts when the database reports "locked".
_CONFIG_SAVE_MAX_RETRIES = 5
# Base retry delay in seconds (passed to time.sleep); save_config() multiplies
# it by the attempt number.
_CONFIG_SAVE_RETRY_DELAY = 0.15
def global_config() -> List[Dict[str, Any]]:
    """Describe the global settings this module knows about.

    Returns:
        A fresh list of schema entries, one per setting. Each entry carries
        the setting's ``key``, a human-readable ``label``, its ``default``
        value and the permitted ``choices`` (all expressed as strings).
    """

    def _toggle(key: str, label: str, default: str) -> Dict[str, Any]:
        # Both global settings are "true"/"false" switches; each entry gets
        # its own choices list so callers may mutate one without the other.
        return {
            "key": key,
            "label": label,
            "default": default,
            "choices": ["true", "false"],
        }

    return [
        _toggle("debug", "Debug Output", "false"),
        _toggle("auto_update", "Auto-Update", "true"),
    ]
def clear_config_cache() -> None:
    """Drop every cached configuration held in ``_CONFIG_CACHE``.

    The cache dict is emptied in place, so any other reference to the same
    dict object observes the cleared state as well.
    """
    _CONFIG_CACHE.clear()
def _make_cache_key(config_dir: Optional[Path], filename: str, actual_path: Optional[Path]) -> str:
    """Build the cache key for a configuration file.

    Args:
        config_dir: Directory expected to hold the file; ``SCRIPT_DIR`` is
            used when this is not given.
        filename: Name of the configuration file inside that directory.
        actual_path: Explicit path to the file. When given it takes
            precedence over ``config_dir`` / ``filename``.

    Returns:
        The resolved path of the chosen location, as a string.
    """
    if not actual_path:
        # No explicit path: derive it from the directory and file name.
        actual_path = (config_dir or SCRIPT_DIR) / filename
    return str(actual_path.resolve())
def get_hydrus_instance(
    config: Dict[str, Any], instance_name: str = "home"
) -> Optional[Dict[str, Any]]:
    """Return the settings dict of one Hydrus instance.

    Instances are read from ``config["store"]["hydrusnetwork"]``. Lookup
    order:
        1. the entry stored under exactly ``instance_name``;
        2. the first entry whose name matches case-insensitively;
        3. the first entry (names sorted) whose name does not start with
           ``"new_"``;
        4. the entry under the first sorted name.

    Only dict-valued entries are ever returned.

    Args:
        config: Configuration dict.
        instance_name: Name of the wanted instance (default: "home").

    Returns:
        The instance settings, or ``None`` when the section is missing,
        empty, malformed, or contains no usable entry.
    """
    store = config.get("store", {})
    if not isinstance(store, dict):
        return None

    instances = store.get("hydrusnetwork", {})
    if not (isinstance(instances, dict) and instances):
        return None

    # 1. Exact name.
    exact = instances.get(instance_name)
    if isinstance(exact, dict):
        return exact

    # 2. Case-insensitive name.
    wanted = str(instance_name or "").lower()
    relaxed = next(
        (
            settings
            for name, settings in instances.items()
            if isinstance(settings, dict) and str(name).lower() == wanted
        ),
        None,
    )
    if relaxed is not None:
        return relaxed

    # 3. First entry, in sorted name order, that is not a "new_" entry.
    ordered_names = sorted(instances.keys())
    preferred = next(
        (
            instances.get(name)
            for name in ordered_names
            if not str(name or "").startswith("new_")
            and isinstance(instances.get(name), dict)
        ),
        None,
    )
    if preferred is not None:
        return preferred

    # 4. Last resort: whatever sits under the first sorted name.
    last_resort = instances.get(ordered_names[0])
    return last_resort if isinstance(last_resort, dict) else None
def get_hydrus_access_key(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]:
    """Return the access key configured for a Hydrus instance.

    The key is read from the ``"API"`` entry of the instance resolved by
    ``get_hydrus_instance`` (``config["store"]["hydrusnetwork"][name]``).

    Args:
        config: Configuration dict.
        instance_name: Name of the Hydrus instance (default: "home").

    Returns:
        The key converted to ``str`` with surrounding whitespace removed,
        or ``None`` when no instance or no key is configured.
    """
    instance = get_hydrus_instance(config, instance_name)
    if not instance:
        return None

    access_key = instance.get("API")
    if not access_key:
        return None
    return str(access_key).strip()
def get_hydrus_url(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]:
    """Return the URL configured for a Hydrus instance.

    The URL is read from the ``"URL"`` entry of the instance resolved by
    ``get_hydrus_instance`` (``config["store"]["hydrusnetwork"][name]``).

    Args:
        config: Configuration dict.
        instance_name: Name of the Hydrus instance (default: "home").

    Returns:
        The URL converted to ``str`` with surrounding whitespace removed,
        or ``None`` when no instance or no URL is configured.
    """
    found = get_hydrus_instance(config, instance_name)
    if found:
        raw_url = found.get("URL")
        if raw_url:
            return str(raw_url).strip()
    return None
def get_provider_block(config: Dict[str, Any], name: str) -> Dict[str, Any]:
    """Return the settings dict stored under ``config["provider"][name]``.

    The provider name is converted to ``str``, stripped and lower-cased
    before the lookup.

    Args:
        config: Configuration dict.
        name: Provider name.

    Returns:
        The provider's settings dict, or an empty dict when the provider
        section or the entry is missing or is not a dict.
    """
    providers = config.get("provider")
    if isinstance(providers, dict):
        section = providers.get(str(name).strip().lower())
        if isinstance(section, dict):
            return section
    return {}
def get_soulseek_username(config: Dict[str, Any]) -> Optional[str]:
    """Return the Soulseek username from the ``soulseek`` provider block.

    The ``"username"`` entry is preferred; ``"USERNAME"`` is consulted when
    the former is missing or empty.

    Returns:
        The value as a stripped string, or ``None`` when nothing is set.
    """
    soulseek = get_provider_block(config, "soulseek")
    username = soulseek.get("username")
    if not username:
        username = soulseek.get("USERNAME")
    if username:
        return str(username).strip()
    return None
def get_soulseek_password(config: Dict[str, Any]) -> Optional[str]:
    """Return the Soulseek password from the ``soulseek`` provider block.

    The ``"password"`` entry is preferred; ``"PASSWORD"`` is consulted when
    the former is missing or empty.

    Returns:
        The value as a stripped string, or ``None`` when nothing is set.
    """
    soulseek = get_provider_block(config, "soulseek")
    password = soulseek.get("password")
    if not password:
        password = soulseek.get("PASSWORD")
    if password:
        return str(password).strip()
    return None
def resolve_output_dir(config: Dict[str, Any]) -> Path:
    """Pick the output directory described by *config*.

    Candidates, in priority order:
        1. ``config["temp"]`` - used only when the expanded path, or its
           parent directory, already exists.
        2. ``config["outfile"]`` - used as soon as it expands successfully.
        3. The system temporary directory.

    Any exception raised while evaluating a candidate makes that candidate
    be skipped rather than propagating.

    Returns:
        Path to the output directory.
    """
    configured_temp = config.get("temp")
    if configured_temp:
        try:
            temp_dir = expand_path(configured_temp)
            # Accept the directory if it exists or could be created directly
            # under an existing parent.
            usable = temp_dir.exists() or temp_dir.parent.exists()
        except Exception:
            usable = False
        if usable:
            return temp_dir

    configured_outfile = config.get("outfile")
    if configured_outfile:
        try:
            out_dir = expand_path(configured_outfile)
        except Exception:
            pass
        else:
            return out_dir

    # Nothing usable configured: fall back to the system temp directory.
    return Path(tempfile.gettempdir())
def get_local_storage_path(config: Dict[str, Any]) -> Optional[Path]:
    """Look up the local storage directory in *config*.

    Two layouts are consulted, in this order:
        1. ``config["storage"]["local"]["path"]``
        2. ``config["Local"]["path"]``

    Args:
        config: Configuration dict.

    Returns:
        The first non-empty path found, passed through ``expand_path``;
        ``None`` when neither layout provides one.
    """

    def _configured_path(section: Any) -> Any:
        # Only dict sections can carry a "path" entry.
        return section.get("path") if isinstance(section, dict) else None

    storage = config.get("storage", {})
    nested_local = storage.get("local", {}) if isinstance(storage, dict) else None

    raw_path = _configured_path(nested_local)
    if not raw_path:
        # Nothing under storage.local: try the top-level "Local" section.
        raw_path = _configured_path(config.get("Local", {}))

    return expand_path(raw_path) if raw_path else None
def get_debrid_api_key(config: Dict[str, Any], service: str = "All-debrid") -> Optional[str]:
    """Get a Debrid API key from config.

    Config format:
        - ``config["store"]["debrid"][<name>]["api_key"]``
        - ``config["store"]["debrid"][<name>]`` given directly as a string

    where ``<name>`` is the store name (e.g. "all-debrid"). The requested
    service name is stripped and lower-cased before the lookup.

    Args:
        config: Configuration dict.
        service: Service name (default: "All-debrid").

    Returns:
        The API key with surrounding whitespace removed, or ``None`` when
        no key is configured. A key that is empty or whitespace-only counts
        as "not configured" in both supported formats.
    """
    store = config.get("store", {})
    if not isinstance(store, dict):
        return None

    debrid_config = store.get("debrid", {})
    if not isinstance(debrid_config, dict):
        return None

    entry = debrid_config.get(str(service).strip().lower())

    if isinstance(entry, dict):
        api_key = entry.get("api_key")
        if not api_key:
            return None
        # `or None` keeps this branch consistent with the plain-string
        # branch below: a blank key is reported as missing, never as "".
        return str(api_key).strip() or None

    if isinstance(entry, str):
        return entry.strip() or None

    return None
def get_provider_credentials(config: Dict[str, Any], provider: str) -> Optional[Dict[str, str]]:
    """Get provider credentials (e.g. email/password) from config.

    Supports both formats:
        - New: ``config["provider"][<provider>] = {"email": "...", ...}``
        - Old: a top-level section named ``"OpenLibrary"``, ``"Archive"``
          or ``"Soulseek"`` (only these three legacy names are recognised).

    The provider name is converted to ``str``, stripped and lower-cased
    before either lookup, the same normalisation ``get_provider_block``
    applies, so names with stray whitespace or different casing still match.

    Args:
        config: Configuration dict.
        provider: Provider name (e.g., "openlibrary", "soulseek").

    Returns:
        The non-empty credentials dict if found, ``None`` otherwise.
    """
    provider_key = str(provider).strip().lower()

    # Try the new format first.
    provider_config = config.get("provider", {})
    if isinstance(provider_config, dict):
        creds = provider_config.get(provider_key, {})
        if isinstance(creds, dict) and creds:
            return creds

    # Fall back to the old format: fixed, mixed-case top-level section names.
    legacy_sections = {
        "openlibrary": "OpenLibrary",
        "archive": "Archive",
        "soulseek": "Soulseek",
    }
    legacy_name = legacy_sections.get(provider_key)
    if legacy_name:
        creds = config.get(legacy_name, {})
        if isinstance(creds, dict) and creds:
            return creds

    return None
def resolve_cookies_path(
    config: Dict[str, Any], script_dir: Optional[Path] = None
) -> Optional[Path]:
    """Locate a cookies file referenced by *config*.

    Configured values are tried in this order; the first one that points
    at an existing file wins:
        1. top-level ``cookies``
        2. ``tool.ytdlp.cookies``, then ``tool.ytdlp.cookiefile``
        3. ``ytdlp.cookies``, then ``ytdlp.cookiefile``

    Relative values are resolved against ``script_dir`` (``SCRIPT_DIR`` when
    not given). If no configured value matches, ``cookies.txt`` in that same
    directory is used when it exists.

    Args:
        config: Configuration dict.
        script_dir: Base directory for relative paths and the default file.

    Returns:
        Path of the cookies file, or ``None`` when none can be found.
    """
    candidates: list[Any] = []

    def _collect_from(block: Any) -> None:
        # A ytdlp block may name the file under either key.
        if isinstance(block, dict):
            candidates.append(block.get("cookies"))
            candidates.append(block.get("cookiefile"))

    # Legacy top-level `cookies=...`.
    try:
        candidates.append(config.get("cookies"))
    except Exception:
        pass

    # Modular conf style, e.g. a [tool=ytdlp] section with a cookies= entry.
    try:
        tool_section = config.get("tool")
        if isinstance(tool_section, dict):
            _collect_from(tool_section.get("ytdlp"))
    except Exception:
        pass

    # Top-level ytdlp block.
    try:
        _collect_from(config.get("ytdlp"))
    except Exception:
        pass

    base_dir = script_dir or SCRIPT_DIR
    for raw in candidates:
        if raw:
            resolved = expand_path(raw)
            if not resolved.is_absolute():
                resolved = expand_path(base_dir / resolved)
            if resolved.is_file():
                return resolved

    fallback = base_dir / "cookies.txt"
    return fallback if fallback.is_file() else None
def resolve_debug_log(config: Dict[str, Any]) -> Optional[Path]:
    """Return the path configured under ``download_debug_log``.

    The value is passed through ``expand_path``; a result that is still
    relative is anchored to the current working directory.

    Returns:
        The log path, or ``None`` when the setting is missing or empty.
    """
    raw = config.get("download_debug_log")
    if not raw:
        return None

    log_path = expand_path(raw)
    return log_path if log_path.is_absolute() else Path.cwd() / log_path
def load_config(
    config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME
) -> Dict[str, Any]:
    """Return the configuration, served from the cache when possible.

    The cache key is derived from ``config_dir`` (``SCRIPT_DIR`` when not
    given) and ``filename``. On a cache miss the configuration is fetched
    with ``get_config_all()``; a non-empty result is cached and returned.

    Returns:
        The configuration dict. When the database yields nothing, a new
        empty dict is returned and nothing is cached.
    """
    config_path = (config_dir or SCRIPT_DIR) / filename
    cache_key = _make_cache_key(config_dir, filename, config_path)

    try:
        return _CONFIG_CACHE[cache_key]
    except KeyError:
        pass  # Not cached yet: fall through to the database.

    stored = get_config_all()
    if not stored:
        return {}

    _CONFIG_CACHE[cache_key] = stored
    return stored
def reload_config(
    config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME
) -> Dict[str, Any]:
    """Discard the cached configuration for this location and load it again.

    Only the cache entry matching ``config_dir`` / ``filename`` is removed;
    the fresh value comes from ``load_config`` with the same arguments.

    Returns:
        The configuration dict returned by ``load_config``.
    """
    stale_key = _make_cache_key(
        config_dir, filename, (config_dir or SCRIPT_DIR) / filename
    )
    _CONFIG_CACHE.pop(stale_key, None)
    return load_config(config_dir=config_dir, filename=filename)
def save_config(
    config: Dict[str, Any],
    config_dir: Optional[Path] = None,
    filename: str = DEFAULT_CONFIG_FILENAME,
) -> int:
    """Replace the stored configuration with *config*.

    Inside one ``db.transaction()`` block every row of the ``config`` table
    is deleted and *config* is written back through ``save_config_value``,
    one call per setting:

        - ``store``: ``(key, subtype, instance name, setting, value)`` for
          each setting of each named instance.
        - ``provider`` / ``tool``: ``(key, subtype, "default", setting,
          value)`` for each setting of each subtype.
        - anything else: ``("global", "none", "none", key, value)``, skipping
          keys that start with ``"_"`` and values that are ``None``.

    Non-dict values found where a dict is expected are skipped.

    A ``sqlite3.OperationalError`` whose message contains "locked" is
    retried, sleeping ``_CONFIG_SAVE_RETRY_DELAY * attempt`` seconds between
    attempts, until ``_CONFIG_SAVE_MAX_RETRIES`` attempts have failed.

    After a successful save the whole config cache is cleared and *config*
    is cached under this location's key.

    Args:
        config: Configuration dict to persist.
        config_dir: Directory used for the cache key (``SCRIPT_DIR`` when
            not given).
        filename: File name used for the cache key.

    Returns:
        Number of entries written.

    Raises:
        sqlite3.OperationalError: On a non-"locked" error, or once the retry
            budget is exhausted.
        Exception: Any other error raised while saving is logged and
            re-raised unchanged.
    """
    config_path = (config_dir or SCRIPT_DIR) / filename
    sectioned_keys = ("store", "provider", "tool")

    def _rows():
        """Lazily yield the argument tuples for ``save_config_value``."""
        for top_key, top_value in config.items():
            if top_key not in sectioned_keys:
                # Plain global setting.
                if not top_key.startswith("_") and top_value is not None:
                    yield ("global", "none", "none", top_key, top_value)
                continue
            if not isinstance(top_value, dict):
                continue
            for subtype, members in top_value.items():
                if not isinstance(members, dict):
                    continue
                if top_key != "store":
                    # provider/tool: settings sit directly under the subtype.
                    for setting, setting_value in members.items():
                        yield (top_key, subtype, "default", setting, setting_value)
                    continue
                # store: one more level, keyed by instance name.
                for instance_name, settings in members.items():
                    if not isinstance(settings, dict):
                        continue
                    for setting, setting_value in settings.items():
                        yield (top_key, subtype, instance_name, setting, setting_value)

    def _write_entries() -> int:
        written = 0
        with db.transaction():
            db.execute("DELETE FROM config")
            # Rows are produced lazily, so traversal and writes interleave
            # inside the transaction block.
            for row in _rows():
                save_config_value(*row)
                written += 1
        return written

    saved_entries = 0
    failures = 0
    while True:
        try:
            saved_entries = _write_entries()
            log(f"Saved {saved_entries} configuration entries to database.")
            break
        except sqlite3.OperationalError as exc:
            failures += 1
            retryable = (
                "locked" in str(exc).lower()
                and failures < _CONFIG_SAVE_MAX_RETRIES
            )
            if not retryable:
                log(f"CRITICAL: Failed to save config to database: {exc}")
                raise
            # Back off a little longer after each failed attempt.
            pause = _CONFIG_SAVE_RETRY_DELAY * failures
            log(
                f"Database busy locking medios.db (attempt {failures}/{_CONFIG_SAVE_MAX_RETRIES}); retrying in {pause:.2f}s."
            )
            time.sleep(pause)
        except Exception as exc:
            log(f"CRITICAL: Failed to save config to database: {exc}")
            raise

    cache_key = _make_cache_key(config_dir, filename, config_path)
    clear_config_cache()
    _CONFIG_CACHE[cache_key] = config
    return saved_entries
def load() -> Dict[str, Any]:
    """Return the configuration via ``load_config`` with default arguments."""
    return load_config()
def save(config: Dict[str, Any]) -> int:
    """Persist *config* via ``save_config`` with default location arguments.

    Returns:
        The number of entries written, as reported by ``save_config``.
    """
    return save_config(config)