diff --git a/SYS/cli_syntax.py b/SYS/cli_syntax.py new file mode 100644 index 0000000..b6fe1e0 --- /dev/null +++ b/SYS/cli_syntax.py @@ -0,0 +1,328 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, Optional + +import re + + +@dataclass(frozen=True) +class SyntaxErrorDetail: + message: str + expected: Optional[str] = None + + +def _split_pipeline_stages(text: str) -> list[str]: + """Split a pipeline command into stage strings on unquoted '|' characters.""" + raw = str(text or "") + if not raw: + return [] + + stages: list[str] = [] + buf: list[str] = [] + quote: Optional[str] = None + escaped = False + + for ch in raw: + if escaped: + buf.append(ch) + escaped = False + continue + + if ch == "\\" and quote is not None: + buf.append(ch) + escaped = True + continue + + if ch in ('"', "'"): + if quote is None: + quote = ch + elif quote == ch: + quote = None + buf.append(ch) + continue + + if ch == "|" and quote is None: + stage = "".join(buf).strip() + if stage: + stages.append(stage) + buf = [] + continue + + buf.append(ch) + + tail = "".join(buf).strip() + if tail: + stages.append(tail) + return stages + + +def _tokenize_stage(stage_text: str) -> list[str]: + """Tokenize a stage string (best-effort).""" + import shlex + + text = str(stage_text or "").strip() + if not text: + return [] + try: + return shlex.split(text) + except Exception: + return text.split() + + +def _has_flag(tokens: list[str], *flags: str) -> bool: + want = {str(f).strip().lower() for f in flags if str(f).strip()} + if not want: + return False + for tok in tokens: + low = str(tok).strip().lower() + if low in want: + return True + # Support -arg=value + if "=" in low: + head = low.split("=", 1)[0].strip() + if head in want: + return True + return False + + +def _get_flag_value(tokens: list[str], *flags: str) -> Optional[str]: + """Return the value for a flag from tokenized args. + + Supports: + - -flag value + - --flag value + - -flag=value + - --flag=value + """ + want = {str(f).strip().lower() for f in flags if str(f).strip()} + if not want: + return None + for idx, tok in enumerate(tokens): + low = str(tok).strip().lower() + if "=" in low: + head, val = low.split("=", 1) + if head.strip() in want: + return tok.split("=", 1)[1] + if low in want and idx + 1 < len(tokens): + return tokens[idx + 1] + return None + + +def _validate_add_note_requires_add_file_order(raw: str) -> Optional[SyntaxErrorDetail]: + """Enforce: add-note in piped mode must occur after add-file. + + Rationale: add-note requires a known (store, hash) target; piping before add-file + means the item likely has no hash yet. + """ + stages = _split_pipeline_stages(raw) + if len(stages) <= 1: + return None + + parsed: list[tuple[str, list[str]]] = [] + for stage in stages: + tokens = _tokenize_stage(stage) + if not tokens: + continue + cmd = str(tokens[0]).replace("_", "-").strip().lower() + parsed.append((cmd, tokens)) + + add_file_positions = [i for i, (cmd, _toks) in enumerate(parsed) if cmd == "add-file"] + if not add_file_positions: + return None + + for i, (cmd, tokens) in enumerate(parsed): + if cmd != "add-note": + continue + + # If add-note occurs before any add-file stage, it must be explicitly targeted. + if any(pos > i for pos in add_file_positions): + has_hash = _has_flag(tokens, "-hash", "--hash") + has_store = _has_flag(tokens, "-store", "--store") + + # Also accept explicit targeting via -query "store: hash: ...". + query_val = _get_flag_value(tokens, "-query", "--query") + has_store_hash_in_query = False + if query_val: + try: + parsed_q = parse_query(str(query_val)) + q_hash = get_field(parsed_q, "hash") or get_field(parsed_q, "sha256") + q_store = get_field(parsed_q, "store") + has_store_hash_in_query = bool( + str(q_hash or "").strip() and str(q_store or "").strip() + ) + except Exception: + has_store_hash_in_query = False + + if (has_hash and has_store) or has_store_hash_in_query: + continue + return SyntaxErrorDetail( + "Pipeline error: 'add-note' must come after 'add-file' when used with piped input. " + "Move 'add-note' after 'add-file', or call it with explicit targeting: " + 'add-note -query "store: hash: title:,text:<text>".' + ) + + return None + + +def validate_pipeline_text(text: str) -> Optional[SyntaxErrorDetail]: + """Validate raw CLI input before tokenization/execution. + + This is intentionally lightweight and focuses on user-facing syntax issues: + - Unbalanced single/double quotes + - Dangling or empty pipeline stages (|) + + Returns: + None if valid, otherwise a SyntaxErrorDetail describing the issue. + """ + if text is None: + return SyntaxErrorDetail("Empty command") + + raw = text.strip() + if not raw: + return SyntaxErrorDetail("Empty command") + + in_single = False + in_double = False + escaped = False + last_pipe_outside_quotes: Optional[int] = None + + for idx, ch in enumerate(raw): + if escaped: + escaped = False + continue + + if ch == "\\" and (in_single or in_double): + escaped = True + continue + + if ch == '"' and not in_single: + in_double = not in_double + continue + + if ch == "'" and not in_double: + in_single = not in_single + continue + + if ch == "|" and not in_single and not in_double: + # Record pipe locations to catch empty stages/dangling pipe. + if last_pipe_outside_quotes is not None and last_pipe_outside_quotes == idx - 1: + return SyntaxErrorDetail("Syntax error: empty pipeline stage (found '||').") + last_pipe_outside_quotes = idx + + if in_double: + return SyntaxErrorDetail("Syntax error: missing closing " + '"' + ".", expected='"') + if in_single: + return SyntaxErrorDetail("Syntax error: missing closing '.", expected="'") + + # Dangling pipe at end / pipe as first non-space character + if raw.startswith("|"): + return SyntaxErrorDetail("Syntax error: pipeline cannot start with '|'.") + if raw.endswith("|"): + return SyntaxErrorDetail("Syntax error: pipeline cannot end with '|'.") + + # Empty stage like "cmd1 | | cmd2" (spaces between pipes) + if "|" in raw: + # Simple pass: look for pipes that have only whitespace between them. + # We only check outside quotes by re-scanning and counting non-space chars between pipes. + in_single = False + in_double = False + escaped = False + seen_nonspace_since_pipe = True # start true to allow leading command + for ch in raw: + if escaped: + escaped = False + continue + if ch == "\\" and (in_single or in_double): + escaped = True + continue + if ch == '"' and not in_single: + in_double = not in_double + continue + if ch == "'" and not in_double: + in_single = not in_single + continue + if ch == "|" and not in_single and not in_double: + if not seen_nonspace_since_pipe: + return SyntaxErrorDetail( + "Syntax error: empty pipeline stage (use a command between '|')." + ) + seen_nonspace_since_pipe = False + continue + if not in_single and not in_double and not ch.isspace(): + seen_nonspace_since_pipe = True + + # Semantic rules (still lightweight; no cmdlet imports) + semantic_error = _validate_add_note_requires_add_file_order(raw) + if semantic_error is not None: + return semantic_error + + return None + + +def parse_query(query: str) -> Dict[str, Any]: + """Parse a query string into field:value pairs and free text. + + Supports syntax like: + - isbn:0557677203 + - author:"Albert Pike" + - title:"Morals and Dogma" year:2010 + - Mixed with free text: Morals isbn:0557677203 + + Returns: + Dict with keys: + - fields: Dict[str, str] + - text: str + - raw: str + """ + + result: Dict[str, Any] = { + "fields": {}, + "text": "", + "raw": query, + } + + if not query or not query.strip(): + return result + + raw = query.strip() + remaining_parts: list[str] = [] + + # Match field:value where value is either a quoted string or a non-space token. + pattern = r'(\w+):(?:"([^"]*)"|(\S+))' + + pos = 0 + for match in re.finditer(pattern, raw): + if match.start() > pos: + before_text = raw[pos : match.start()].strip() + if before_text: + remaining_parts.append(before_text) + + field_name = (match.group(1) or "").lower() + field_value = match.group(2) if match.group(2) is not None else match.group(3) + if field_name: + result["fields"][field_name] = field_value + + pos = match.end() + + if pos < len(raw): + remaining_text = raw[pos:].strip() + if remaining_text: + remaining_parts.append(remaining_text) + + result["text"] = " ".join(remaining_parts) + return result + + +def get_field( + parsed_query: Dict[str, Any], field_name: str, default: Optional[str] = None +) -> Optional[str]: + """Get a field value from a parsed query.""" + + return parsed_query.get("fields", {}).get((field_name or "").lower(), default) + + +def get_free_text(parsed_query: Dict[str, Any]) -> str: + """Get the free-text portion of a parsed query.""" + + return str(parsed_query.get("text", "") or "") diff --git a/SYS/cmdlet_catalog.py b/SYS/cmdlet_catalog.py new file mode 100644 index 0000000..92ce848 --- /dev/null +++ b/SYS/cmdlet_catalog.py @@ -0,0 +1,250 @@ +from __future__ import annotations + +from importlib import import_module +from typing import Any, Dict, List, Optional + +try: + from .config import get_local_storage_path +except Exception: + get_local_storage_path = None # type: ignore + + +def _should_hide_db_args(config: Optional[Dict[str, Any]]) -> bool: + """Return True when the library root/local DB is not configured.""" + if not isinstance(config, dict): + return False + if get_local_storage_path is None: + return False + try: + return not bool(get_local_storage_path(config)) + except Exception: + return False + + +try: + from cmdlet import REGISTRY +except Exception: + REGISTRY = {} # type: ignore + +try: + from cmdnat import register_native_commands as _register_native_commands +except Exception: + _register_native_commands = None + + +def ensure_registry_loaded() -> None: + """Ensure native commands are registered into REGISTRY (idempotent).""" + if _register_native_commands and REGISTRY is not None: + try: + _register_native_commands(REGISTRY) + except Exception: + pass + + +def _normalize_mod_name(mod_name: str) -> str: + """Normalize a command/module name for import resolution.""" + normalized = (mod_name or "").strip() + if normalized.startswith("."): + normalized = normalized.lstrip(".") + normalized = normalized.replace("-", "_") + return normalized + + +def import_cmd_module(mod_name: str): + """Import a cmdlet/native module from cmdnat or cmdlet packages.""" + normalized = _normalize_mod_name(mod_name) + if not normalized: + return None + for package in ("cmdnat", "cmdlet", None): + try: + qualified = f"{package}.{normalized}" if package else normalized + return import_module(qualified) + except ModuleNotFoundError: + continue + except Exception: + continue + return None + + +def _normalize_arg(arg: Any) -> Dict[str, Any]: + """Convert a CmdletArg/dict into a plain metadata dict.""" + if isinstance(arg, dict): + name = arg.get("name", "") + return { + "name": str(name).lstrip("-"), + "type": arg.get("type", "string"), + "required": bool(arg.get("required", False)), + "description": arg.get("description", ""), + "choices": arg.get("choices", []) or [], + "alias": arg.get("alias", ""), + "variadic": arg.get("variadic", False), + "requires_db": bool(arg.get("requires_db", False)), + } + + name = getattr(arg, "name", "") or "" + return { + "name": str(name).lstrip("-"), + "type": getattr(arg, "type", "string"), + "required": bool(getattr(arg, "required", False)), + "description": getattr(arg, "description", ""), + "choices": getattr(arg, "choices", []) or [], + "alias": getattr(arg, "alias", ""), + "variadic": getattr(arg, "variadic", False), + "requires_db": bool(getattr(arg, "requires_db", False)), + } + + +def get_cmdlet_metadata( + cmd_name: str, config: Optional[Dict[str, Any]] = None +) -> Optional[Dict[str, Any]]: + """Return normalized metadata for a cmdlet, if available (aliases supported).""" + ensure_registry_loaded() + normalized = cmd_name.replace("-", "_") + mod = import_cmd_module(normalized) + data = getattr(mod, "CMDLET", None) if mod else None + + if data is None: + try: + reg_fn = (REGISTRY or {}).get(cmd_name.replace("_", "-").lower()) + if reg_fn: + owner_mod = getattr(reg_fn, "__module__", "") + if owner_mod: + owner = import_module(owner_mod) + data = getattr(owner, "CMDLET", None) + except Exception: + data = None + + if not data: + return None + + if hasattr(data, "to_dict"): + base = data.to_dict() + elif isinstance(data, dict): + base = data + else: + base = {} + + name = getattr(data, "name", base.get("name", cmd_name)) or cmd_name + aliases = getattr(data, "alias", base.get("alias", [])) or [] + usage = getattr(data, "usage", base.get("usage", "")) + summary = getattr(data, "summary", base.get("summary", "")) + details = getattr(data, "detail", base.get("detail", [])) or [] + args_list = getattr(data, "arg", base.get("arg", [])) or [] + args = [_normalize_arg(arg) for arg in args_list] + + if _should_hide_db_args(config): + args = [a for a in args if not a.get("requires_db")] + + return { + "name": str(name).replace("_", "-").lower(), + "aliases": [str(a).replace("_", "-").lower() for a in aliases if a], + "usage": usage, + "summary": summary, + "details": details, + "args": args, + "raw": data, + } + + +def list_cmdlet_metadata(config: Optional[Dict[str, Any]] = None) -> Dict[str, Dict[str, Any]]: + """Collect metadata for all registered cmdlet keyed by canonical name.""" + ensure_registry_loaded() + entries: Dict[str, Dict[str, Any]] = {} + for reg_name in (REGISTRY or {}).keys(): + meta = get_cmdlet_metadata(reg_name, config=config) + canonical = str(reg_name).replace("_", "-").lower() + + if meta: + canonical = meta.get("name", canonical) + aliases = meta.get("aliases", []) + base = entries.get( + canonical, + { + "name": canonical, + "aliases": [], + "usage": "", + "summary": "", + "details": [], + "args": [], + "raw": meta.get("raw"), + }, + ) + merged_aliases = set(base.get("aliases", [])) | set(aliases) + if canonical != reg_name: + merged_aliases.add(reg_name) + base["aliases"] = sorted(a for a in merged_aliases if a and a != canonical) + if not base.get("usage") and meta.get("usage"): + base["usage"] = meta["usage"] + if not base.get("summary") and meta.get("summary"): + base["summary"] = meta["summary"] + if not base.get("details") and meta.get("details"): + base["details"] = meta["details"] + if not base.get("args") and meta.get("args"): + base["args"] = meta["args"] + if not base.get("raw"): + base["raw"] = meta.get("raw") + entries[canonical] = base + else: + entries.setdefault( + canonical, + { + "name": canonical, + "aliases": [], + "usage": "", + "summary": "", + "details": [], + "args": [], + "raw": None, + }, + ) + return entries + + +def list_cmdlet_names( + include_aliases: bool = True, config: Optional[Dict[str, Any]] = None +) -> List[str]: + """Return sorted cmdlet names (optionally including aliases).""" + ensure_registry_loaded() + entries = list_cmdlet_metadata(config=config) + names = set() + for meta in entries.values(): + names.add(meta.get("name", "")) + if include_aliases: + for alias in meta.get("aliases", []): + names.add(alias) + return sorted(n for n in names if n) + + +def get_cmdlet_arg_flags(cmd_name: str, config: Optional[Dict[str, Any]] = None) -> List[str]: + """Return flag variants for cmdlet arguments (e.g., -name/--name).""" + meta = get_cmdlet_metadata(cmd_name, config=config) + if not meta: + return [] + + flags: List[str] = [] + seen: set[str] = set() + + for arg in meta.get("args", []): + name = str(arg.get("name") or "").strip().lstrip("-") + if not name: + continue + for candidate in (f"-{name}", f"--{name}"): + if candidate not in seen: + flags.append(candidate) + seen.add(candidate) + + return flags + + +def get_cmdlet_arg_choices( + cmd_name: str, arg_name: str, config: Optional[Dict[str, Any]] = None +) -> List[str]: + """Return declared choices for a cmdlet argument.""" + meta = get_cmdlet_metadata(cmd_name, config=config) + if not meta: + return [] + target = arg_name.lstrip("-") + for arg in meta.get("args", []): + if arg.get("name") == target: + return list(arg.get("choices", []) or []) + return [] diff --git a/SYS/config.py b/SYS/config.py new file mode 100644 index 0000000..413fd95 --- /dev/null +++ b/SYS/config.py @@ -0,0 +1,659 @@ +""" """ + +from __future__ import annotations + +import re +from pathlib import Path +from typing import Any, Dict, Optional +from SYS.logger import log + +DEFAULT_CONFIG_FILENAME = "config.conf" +SCRIPT_DIR = Path(__file__).resolve().parent + +_CONFIG_CACHE: Dict[str, Dict[str, Any]] = {} + + +def _strip_inline_comment(line: str) -> str: + # Keep it simple: only strip full-line comments and inline comments that start after whitespace. + # Users can always quote values that contain '#' or ';'. + stripped = line.strip() + if not stripped: + return "" + if stripped.startswith("#") or stripped.startswith(";"): + return "" + return line + + +def _parse_scalar(value: str) -> Any: + v = value.strip() + if not v: + return "" + + if (v.startswith('"') and v.endswith('"')) or (v.startswith("'") and v.endswith("'")): + return v[1:-1] + + low = v.lower() + if low in {"true", "yes", "on", "1"}: + return True + if low in {"false", "no", "off", "0"}: + return False + + if re.fullmatch(r"-?\d+", v): + try: + return int(v) + except Exception: + return v + if re.fullmatch(r"-?\d+\.\d+", v): + try: + return float(v) + except Exception: + return v + + return v + + +def _set_nested(d: Dict[str, Any], dotted_key: str, value: Any) -> None: + parts = [p for p in dotted_key.split(".") if p] + if not parts: + return + cur: Dict[str, Any] = d + for p in parts[:-1]: + nxt = cur.get(p) + if not isinstance(nxt, dict): + nxt = {} + cur[p] = nxt + cur = nxt + cur[parts[-1]] = value + + +def _merge_dict_inplace(base: Dict[str, Any], patch: Dict[str, Any]) -> Dict[str, Any]: + for k, v in patch.items(): + if isinstance(v, dict) and isinstance(base.get(k), dict): + _merge_dict_inplace(base[k], v) # type: ignore[index] + else: + base[k] = v + return base + + +def _apply_conf_block( + config: Dict[str, Any], kind: str, subtype: str, block: Dict[str, Any] +) -> None: + kind_l = str(kind).strip().lower() + subtype_l = str(subtype).strip().lower() + + if kind_l == "store": + # Store instances are keyed by NAME (preferred). If a block uses `name=...`, + # normalize it into NAME to keep a single canonical key. + name = block.get("NAME") + if not name: + name = block.get("name") + if name: + block = dict(block) + block.pop("name", None) + block["NAME"] = name + + if not name: + return + + name_l = str(name).strip().lower() + payload = dict(block) + store = config.setdefault("store", {}) + if not isinstance(store, dict): + config["store"] = {} + store = config["store"] + bucket = store.setdefault(subtype_l, {}) + if not isinstance(bucket, dict): + store[subtype_l] = {} + bucket = store[subtype_l] + existing = bucket.get(name_l) + if isinstance(existing, dict): + _merge_dict_inplace(existing, payload) + else: + bucket[name_l] = payload + return + + if kind_l == "provider": + provider_name = str(subtype).strip().lower() + provider = config.setdefault("provider", {}) + if not isinstance(provider, dict): + config["provider"] = {} + provider = config["provider"] + existing = provider.get(provider_name) + if isinstance(existing, dict): + _merge_dict_inplace(existing, block) + else: + provider[provider_name] = dict(block) + return + + if kind_l == "tool": + tool_name = str(subtype).strip().lower() + if not tool_name: + return + tool = config.setdefault("tool", {}) + if not isinstance(tool, dict): + config["tool"] = {} + tool = config["tool"] + existing = tool.get(tool_name) + if isinstance(existing, dict): + _merge_dict_inplace(existing, block) + else: + tool[tool_name] = dict(block) + return + + +def parse_conf_text(text: str, *, base: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """Parse a lightweight .conf format into the app's config dict. + + Supported patterns: + - Top-level key/value: temp="C:\\Users\\Me\\Downloads" + - Sections: [store=folder] + name/path lines + - Sections: [store=hydrusnetwork] + name/access key/url lines + - Sections: [provider=OpenLibrary] + email/password lines + - Dotted keys: store.folder.default.path="C:\\Media" (optional) + """ + config: Dict[str, Any] = dict(base or {}) + + current_kind: Optional[str] = None + current_subtype: Optional[str] = None + current_block: Dict[str, Any] = {} + + def flush() -> None: + nonlocal current_kind, current_subtype, current_block + if current_kind and current_subtype and current_block: + _apply_conf_block(config, current_kind, current_subtype, current_block) + current_kind = None + current_subtype = None + current_block = {} + + for raw_line in text.splitlines(): + line = _strip_inline_comment(raw_line) + if not line.strip(): + continue + + stripped = line.strip() + if stripped.startswith("[") and stripped.endswith("]"): + flush() + header = stripped[1:-1].strip() + if "=" in header: + k, v = header.split("=", 1) + current_kind = k.strip() + current_subtype = v.strip() + else: + # Unknown header style; ignore block + current_kind = None + current_subtype = None + continue + + if "=" not in stripped: + continue + + key, value = stripped.split("=", 1) + key = key.strip() + parsed_val = _parse_scalar(value) + + if current_kind and current_subtype: + current_block[key] = parsed_val + else: + if "." in key: + _set_nested(config, key, parsed_val) + else: + config[key] = parsed_val + + flush() + return config + + +def _load_conf_config(base_dir: Path, config_path: Path) -> Dict[str, Any]: + config: Dict[str, Any] = {} + raw = config_path.read_text(encoding="utf-8") + config = parse_conf_text(raw, base=config) + + conf_dir = base_dir / "config.d" + if conf_dir.exists() and conf_dir.is_dir(): + for frag in sorted(conf_dir.glob("*.conf")): + try: + frag_raw = frag.read_text(encoding="utf-8") + config = parse_conf_text(frag_raw, base=config) + except OSError as exc: + log(f"Failed to read {frag}: {exc}") + + return config + + +def _format_conf_value(val: Any) -> str: + if isinstance(val, bool): + return "true" if val else "false" + if isinstance(val, (int, float)): + return str(val) + if val is None: + return '""' + s = str(val) + s = s.replace('"', '\\"') + return f'"{s}"' + + +def _serialize_conf(config: Dict[str, Any]) -> str: + lines: list[str] = [] + + # Top-level scalars first + for key in sorted(config.keys()): + if key in {"store", "provider", "tool"}: + continue + value = config.get(key) + if isinstance(value, dict): + continue + lines.append(f"{key}={_format_conf_value(value)}") + + # Store blocks + store = config.get("store") + if isinstance(store, dict): + for subtype in sorted(store.keys()): + bucket = store.get(subtype) + if not isinstance(bucket, dict): + continue + for name in sorted(bucket.keys()): + block = bucket.get(name) + if not isinstance(block, dict): + continue + lines.append("") + lines.append(f"[store={subtype}]") + lines.append(f"name={_format_conf_value(name)}") + for k in sorted(block.keys()): + lines.append(f"{k}={_format_conf_value(block.get(k))}") + + # Provider blocks + provider = config.get("provider") + if isinstance(provider, dict): + for prov in sorted(provider.keys()): + block = provider.get(prov) + if not isinstance(block, dict): + continue + lines.append("") + lines.append(f"[provider={prov}]") + for k in sorted(block.keys()): + lines.append(f"{k}={_format_conf_value(block.get(k))}") + + # Tool blocks + tool = config.get("tool") + if isinstance(tool, dict): + for name in sorted(tool.keys()): + block = tool.get(name) + if not isinstance(block, dict): + continue + lines.append("") + lines.append(f"[tool={name}]") + for k in sorted(block.keys()): + lines.append(f"{k}={_format_conf_value(block.get(k))}") + + return "\n".join(lines).rstrip() + "\n" + + +def _make_cache_key(config_dir: Optional[Path], filename: str, actual_path: Optional[Path]) -> str: + if actual_path: + return str(actual_path.resolve()) + base_dir = config_dir or SCRIPT_DIR + return str((base_dir / filename).resolve()) + + +def get_hydrus_instance( + config: Dict[str, Any], instance_name: str = "home" +) -> Optional[Dict[str, Any]]: + """Get a specific Hydrus instance config by name. + + Supports multiple formats: + - Current: config["store"]["hydrusnetwork"][instance_name] + - Legacy: config["storage"]["hydrus"][instance_name] + - Old: config["HydrusNetwork"][instance_name] + + Args: + config: Configuration dict + instance_name: Name of the Hydrus instance (default: "home") + + Returns: + Dict with access key and URL, or None if not found + """ + # Canonical: config["store"]["hydrusnetwork"]["home"] + store = config.get("store", {}) + if isinstance(store, dict): + hydrusnetwork = store.get("hydrusnetwork", {}) + if isinstance(hydrusnetwork, dict): + instance = hydrusnetwork.get(instance_name) + if isinstance(instance, dict): + return instance + return None + + +def get_hydrus_access_key(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]: + """Get Hydrus access key for an instance. + + Config format: + - config["store"]["hydrusnetwork"][name]["API"] + + Args: + config: Configuration dict + instance_name: Name of the Hydrus instance (default: "home") + + Returns: + Access key string, or None if not found + """ + instance = get_hydrus_instance(config, instance_name) + if instance: + key = instance.get("API") + return str(key).strip() if key else None + + return None + + +def get_hydrus_url(config: Dict[str, Any], instance_name: str = "home") -> Optional[str]: + """Get Hydrus URL for an instance. + + Config format: + - config["store"]["hydrusnetwork"][name]["URL"] + + Args: + config: Configuration dict + instance_name: Name of the Hydrus instance (default: "home") + + Returns: + URL string, or None if not found + """ + instance = get_hydrus_instance(config, instance_name) + url = instance.get("URL") if instance else None + return str(url).strip() if url else None + + +def get_provider_block(config: Dict[str, Any], name: str) -> Dict[str, Any]: + provider_cfg = config.get("provider") + if not isinstance(provider_cfg, dict): + return {} + block = provider_cfg.get(str(name).strip().lower()) + return block if isinstance(block, dict) else {} + + +def get_soulseek_username(config: Dict[str, Any]) -> Optional[str]: + block = get_provider_block(config, "soulseek") + val = block.get("username") or block.get("USERNAME") + return str(val).strip() if val else None + + +def get_soulseek_password(config: Dict[str, Any]) -> Optional[str]: + block = get_provider_block(config, "soulseek") + val = block.get("password") or block.get("PASSWORD") + return str(val).strip() if val else None + + +def resolve_output_dir(config: Dict[str, Any]) -> Path: + """Resolve output directory from config with single source of truth. + + Priority: + 1. config["temp"] - explicitly set temp/output directory + 2. config["outfile"] - fallback to outfile setting + 3. Home/Videos - safe user directory fallback + + Returns: + Path to output directory + """ + # First try explicit temp setting from config + temp_value = config.get("temp") + if temp_value: + try: + path = Path(str(temp_value)).expanduser() + # Verify we can access it (not a system directory with permission issues) + if path.exists() or path.parent.exists(): + return path + except Exception: + pass + + # Then try outfile setting + outfile_value = config.get("outfile") + if outfile_value: + try: + return Path(str(outfile_value)).expanduser() + except Exception: + pass + + # Fallback to user's Videos directory + return Path.home() / "Videos" + + +def get_local_storage_path(config: Dict[str, Any]) -> Optional[Path]: + """Get local storage path from config. + + Supports multiple formats: + - New: config["store"]["folder"]["default"]["path"] + - Old: config["storage"]["local"]["path"] + - Old: config["Local"]["path"] + + Args: + config: Configuration dict + + Returns: + Path object if found, None otherwise + """ + # Try new format first: store.folder.default.path + store = config.get("store", {}) + if isinstance(store, dict): + folder_config = store.get("folder", {}) + if isinstance(folder_config, dict): + default_config = folder_config.get("default", {}) + if isinstance(default_config, dict): + path_str = default_config.get("path") + if path_str: + return Path(str(path_str)).expanduser() + + # Fall back to storage.local.path format + storage = config.get("storage", {}) + if isinstance(storage, dict): + local_config = storage.get("local", {}) + if isinstance(local_config, dict): + path_str = local_config.get("path") + if path_str: + return Path(str(path_str)).expanduser() + + # Fall back to old Local format + local_config = config.get("Local", {}) + if isinstance(local_config, dict): + path_str = local_config.get("path") + if path_str: + return Path(str(path_str)).expanduser() + + return None + + +def get_debrid_api_key(config: Dict[str, Any], service: str = "All-debrid") -> Optional[str]: + """Get Debrid API key from config. + + Config format: + - config["store"]["debrid"][<name>]["api_key"] + where <name> is the store name (e.g. "all-debrid") + + Args: + config: Configuration dict + service: Service name (default: "All-debrid") + + Returns: + API key string if found, None otherwise + """ + store = config.get("store", {}) + if not isinstance(store, dict): + return None + + debrid_config = store.get("debrid", {}) + if not isinstance(debrid_config, dict): + return None + + service_key = str(service).strip().lower() + entry = debrid_config.get(service_key) + + if isinstance(entry, dict): + api_key = entry.get("api_key") + return str(api_key).strip() if api_key else None + + if isinstance(entry, str): + return entry.strip() or None + + return None + + +def get_provider_credentials(config: Dict[str, Any], provider: str) -> Optional[Dict[str, str]]: + """Get provider credentials (email/password) from config. + + Supports both formats: + - New: config["provider"][provider] = {"email": "...", "password": "..."} + - Old: config[provider.capitalize()] = {"email": "...", "password": "..."} + + Args: + config: Configuration dict + provider: Provider name (e.g., "openlibrary", "soulseek") + + Returns: + Dict with credentials if found, None otherwise + """ + # Try new format first + provider_config = config.get("provider", {}) + if isinstance(provider_config, dict): + creds = provider_config.get(provider.lower(), {}) + if isinstance(creds, dict) and creds: + return creds + + # Fall back to old format (capitalized key) + old_key_map = { + "openlibrary": "OpenLibrary", + "archive": "Archive", + "soulseek": "Soulseek", + } + old_key = old_key_map.get(provider.lower()) + if old_key: + creds = config.get(old_key, {}) + if isinstance(creds, dict) and creds: + return creds + + return None + + +def resolve_cookies_path( + config: Dict[str, Any], script_dir: Optional[Path] = None +) -> Optional[Path]: + # Support both legacy top-level `cookies=...` and the modular conf style: + # [tool=ytdlp] + # cookies="C:\\path\\cookies.txt" + values: list[Any] = [] + try: + values.append(config.get("cookies")) + except Exception: + pass + + try: + tool = config.get("tool") + if isinstance(tool, dict): + ytdlp = tool.get("ytdlp") + if isinstance(ytdlp, dict): + values.append(ytdlp.get("cookies")) + values.append(ytdlp.get("cookiefile")) + except Exception: + pass + + try: + ytdlp_block = config.get("ytdlp") + if isinstance(ytdlp_block, dict): + values.append(ytdlp_block.get("cookies")) + values.append(ytdlp_block.get("cookiefile")) + except Exception: + pass + + base_dir = script_dir or SCRIPT_DIR + for value in values: + if not value: + continue + candidate = Path(str(value)).expanduser() + if not candidate.is_absolute(): + candidate = (base_dir / candidate).expanduser() + if candidate.is_file(): + return candidate + + default_path = base_dir / "cookies.txt" + if default_path.is_file(): + return default_path + return None + + +def resolve_debug_log(config: Dict[str, Any]) -> Optional[Path]: + value = config.get("download_debug_log") + if not value: + return None + path = Path(str(value)).expanduser() + if not path.is_absolute(): + path = Path.cwd() / path + return path + + +def load_config( + config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME +) -> Dict[str, Any]: + base_dir = config_dir or SCRIPT_DIR + config_path = base_dir / filename + cache_key = _make_cache_key(config_dir, filename, config_path) + if cache_key in _CONFIG_CACHE: + return _CONFIG_CACHE[cache_key] + + if config_path.suffix.lower() != ".conf": + log(f"Unsupported config format: {config_path.name} (only .conf is supported)") + _CONFIG_CACHE[cache_key] = {} + return {} + + try: + data = _load_conf_config(base_dir, config_path) + except FileNotFoundError: + _CONFIG_CACHE[cache_key] = {} + return {} + except OSError as exc: + log(f"Failed to read {config_path}: {exc}") + _CONFIG_CACHE[cache_key] = {} + return {} + + _CONFIG_CACHE[cache_key] = data + return data + + +def reload_config( + config_dir: Optional[Path] = None, filename: str = DEFAULT_CONFIG_FILENAME +) -> Dict[str, Any]: + cache_key = _make_cache_key(config_dir, filename, None) + _CONFIG_CACHE.pop(cache_key, None) + return load_config(config_dir=config_dir, filename=filename) + + +def clear_config_cache() -> None: + _CONFIG_CACHE.clear() + + +def save_config( + config: Dict[str, Any], + config_dir: Optional[Path] = None, + filename: str = DEFAULT_CONFIG_FILENAME, +) -> None: + base_dir = config_dir or SCRIPT_DIR + config_path = base_dir / filename + + if config_path.suffix.lower() != ".conf": + raise RuntimeError( + f"Unsupported config format: {config_path.name} (only .conf is supported)" + ) + + try: + config_path.write_text(_serialize_conf(config), encoding="utf-8") + except OSError as exc: + raise RuntimeError(f"Failed to write config to {config_path}: {exc}") from exc + + cache_key = _make_cache_key(config_dir, filename, config_path) + _CONFIG_CACHE[cache_key] = config + + +def load() -> Dict[str, Any]: + """Return the parsed downlow configuration.""" + return load_config() + + +def save(config: Dict[str, Any]) -> None: + """Persist *config* back to disk.""" + save_config(config)