import datetime import sqlite3 from pathlib import Path from typing import List, Dict, Any, Optional, Sequence from SYS.cmdlet_spec import Cmdlet, CmdletArg from SYS.config import ( load_config, save_config, save_config_and_verify, set_nested_config_value, ) from SYS.database import LOG_DB_PATH, db from SYS.logger import log from SYS import pipeline as ctx from SYS.result_table import Table from cmdnat._parsing import ( VALUE_ARG_FLAGS, extract_piped_value as _extract_piped_value, extract_arg_value as _extract_arg_value, extract_value_arg as _extract_value_arg, has_flag as _has_flag, ) _PREFERENCES_BROWSE_PATH = "__preferences__" _PLUGINS_BROWSE_PATH = "__plugins__" _PLUGIN_CATEGORY_KEYS = ("plugin", "provider", "tool") _KNOWN_SECTION_LABELS = { "plugin": "Plugins", "provider": "Plugins", "tool": "Plugins", } _KNOWN_SECTION_DESCRIPTIONS = { _PREFERENCES_BROWSE_PATH: "Global preferences and simple values", _PLUGINS_BROWSE_PATH: "All configured plugins and plugin instances", "provider": "Plugin configuration", "plugin": "Plugin configuration", "tool": "Plugin configuration", } _SENSITIVE_CONFIG_KEYS = { "access_key", "access_token", "api", "api_key", "apikey", "authorization", "bearer_token", "cookie", "cookies", "password", "secret", "token", } CMDLET = Cmdlet( name=".config", summary="Manage configuration settings", usage=".config [key] [value] | .config -log [count]", arg=[ CmdletArg( name="key", description="Configuration key to update (dot-separated)", required=False ), CmdletArg( name="value", description="New value for the configuration key", required=False ), ], ) def _extract_log_limit(args: Sequence[str], default: int = 30) -> int: try: tokens = [str(arg).strip() for arg in (args or []) if str(arg).strip()] except Exception: return default for idx, token in enumerate(tokens): lowered = token.lower() if lowered in {"-log", "--log"}: if idx + 1 < len(tokens): candidate = tokens[idx + 1] if candidate and not candidate.startswith("-"): try: return max(1, min(200, int(candidate))) except Exception: return default return default if lowered.startswith("-log=") or lowered.startswith("--log="): _, value = lowered.split("=", 1) try: return max(1, min(200, int(value))) except Exception: return default return default def _fallback_log_path() -> Path: return Path(db.db_path).with_name("logs") / "log_fallback.txt" def _load_recent_config_logs(limit: int = 30) -> List[Dict[str, str]]: rows: List[Dict[str, str]] = [] sql = """ SELECT timestamp, level, module, message FROM logs WHERE lower(module) LIKE ? OR lower(message) LIKE ? OR lower(message) LIKE ? OR lower(message) LIKE ? ORDER BY id DESC LIMIT ? """ params = ( "%config%", "%config%", "%save failed%", "%saving configuration failed%", int(limit), ) try: with sqlite3.connect(str(LOG_DB_PATH), timeout=5.0) as conn: conn.row_factory = sqlite3.Row cur = conn.cursor() cur.execute(sql, params) fetched = cur.fetchall() cur.close() for row in fetched: rows.append( { "timestamp": str(row["timestamp"] or ""), "level": str(row["level"] or ""), "module": str(row["module"] or ""), "message": str(row["message"] or ""), } ) except Exception: rows = [] if rows: return rows fallback = _fallback_log_path() try: if not fallback.exists(): return [] lines = fallback.read_text(encoding="utf-8", errors="replace").splitlines() matches = [ line for line in lines if any(term in line.lower() for term in ("config", "save failed", "saving configuration failed")) ] for line in reversed(matches[-limit:]): rows.append( { "timestamp": "", "level": "FALLBACK", "module": "fallback", "message": line, } ) except Exception: return [] return rows def _format_log_timestamp_local(raw_value: str) -> str: text = str(raw_value or "").strip() if not text: return "" for pattern in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f"): try: parsed = datetime.datetime.strptime(text, pattern).replace(tzinfo=datetime.timezone.utc) return parsed.astimezone().strftime("%Y-%m-%d %H:%M:%S") except Exception: continue return text def _show_config_logs(args: Sequence[str]) -> int: limit = _extract_log_limit(args) rows = _load_recent_config_logs(limit=limit) if not rows: print( f"No recent config/save logs found in {LOG_DB_PATH.name} or {_fallback_log_path().name}." ) return 0 table = Table("Configuration Logs") table.set_table("config.logs") table.set_source_command(".config", ["-log", str(limit)]) for row_data in rows: row = table.add_row() row.add_column("Time (local)", _format_log_timestamp_local(row_data.get("timestamp", ""))) row.add_column("Level", row_data.get("level", "")) row.add_column("Module", row_data.get("module", "")) row.add_column("Message", row_data.get("message", "")) ctx.set_last_result_table_overlay(table, rows) ctx.set_current_stage_table(table) print(f"Showing {len(rows)} recent configuration log entries.") return 0 def set_nested_config(config: Dict[str, Any], key: str, value: str) -> bool: return set_nested_config_value(config, key, value, on_error=print) def _visible_config_entries(config_data: Any) -> List[tuple[str, Any]]: if not isinstance(config_data, dict): return [] return [ (str(key), value) for key, value in config_data.items() if isinstance(key, str) and not key.startswith("_") ] def _format_config_label(value: Any) -> str: text = str(value or "").strip() if not text: return "Configuration" if text == _PREFERENCES_BROWSE_PATH: return "Preferences" if text == _PLUGINS_BROWSE_PATH: return "Plugins" return text.replace("_", " ").replace("-", " ").strip().title() def _format_config_path_label(browse_path: Optional[str]) -> str: text = str(browse_path or "").strip() if not text: return "Root" if text == _PREFERENCES_BROWSE_PATH: return "Preferences" if text == _PLUGINS_BROWSE_PATH: return "Plugins" parts = [part for part in text.split(".") if part] formatted: List[str] = [] for idx, part in enumerate(parts): if idx == 0 and part in _PLUGIN_CATEGORY_KEYS: formatted.append("Plugins") else: formatted.append(_format_config_label(part)) return " / ".join(formatted) def _format_config_value(value: Any) -> str: if isinstance(value, bool): return "true" if value else "false" if value is None: return "null" if isinstance(value, (list, tuple, set)): return ", ".join(str(item) for item in value) return str(value) def _is_sensitive_config_key(key_path: str) -> bool: leaf = str(key_path or "").split(".")[-1].strip().lower() return leaf in _SENSITIVE_CONFIG_KEYS def _format_config_entry_count(value: Any) -> str: count = len(_visible_config_entries(value)) if isinstance(value, dict) else 0 if count == 1: return "1 entry" return f"{count} entries" def _iter_plugin_branches(config_data: Dict[str, Any]) -> List[tuple[str, str, Any]]: branches: List[tuple[str, str, Any]] = [] if not isinstance(config_data, dict): return branches for category in _PLUGIN_CATEGORY_KEYS: category_block = config_data.get(category) if not isinstance(category_block, dict): continue for name, value in _visible_config_entries(category_block): branches.append((category, name, value)) return branches def _collect_plugin_root_items(config_data: Dict[str, Any]) -> List[Dict[str, Any]]: plugin_items: Dict[str, Dict[str, Any]] = {} for category, name, value in _iter_plugin_branches(config_data): key = str(name or "").strip().lower() if not key: continue existing = plugin_items.get(key) if existing is None: plugin_items[key] = { "kind": "section", "title": _format_config_label(name), "browse_path": f"{category}.{name}", "summary": _format_config_entry_count(value), "type": "section", "description": "Plugin configuration", } continue if str(category) == "plugin" and not str(existing.get("browse_path") or "").startswith("plugin."): existing["browse_path"] = f"{category}.{name}" try: current_count = int(str(existing.get("summary") or "0").split()[0]) except Exception: current_count = 0 extra_count = len(_visible_config_entries(value)) if isinstance(value, dict) else 0 merged_count = current_count + extra_count existing["summary"] = "1 entry" if merged_count == 1 else f"{merged_count} entries" return sorted(plugin_items.values(), key=lambda item: str(item.get("title") or "").lower()) def _resolve_config_branch( config_data: Dict[str, Any], browse_path: Optional[str], ) -> Optional[Dict[str, Any]]: text = str(browse_path or "").strip() if not text: return config_data if isinstance(config_data, dict) else None if text == _PREFERENCES_BROWSE_PATH: return { key: value for key, value in _visible_config_entries(config_data) if not isinstance(value, dict) } if text == _PLUGINS_BROWSE_PATH: return { str(item.get("title") or ""): item for item in _collect_plugin_root_items(config_data) } current: Any = config_data for part in text.split("."): if not isinstance(current, dict): return None current = current.get(part) return current if isinstance(current, dict) else None def _build_section_item( *, title: str, browse_path: str, value: Any, description: Optional[str] = None, ) -> Dict[str, Any]: return { "kind": "section", "title": title, "browse_path": browse_path, "summary": _format_config_entry_count(value), "type": "section", "description": str(description or "").strip() or _KNOWN_SECTION_DESCRIPTIONS.get(browse_path, ""), } def _build_value_item( *, key_path: str, name: str, value: Any, ) -> Dict[str, Any]: display_value = "***" if _is_sensitive_config_key(key_path) else _format_config_value(value) path_parts = [part for part in str(key_path or "").split(".") if part] display_path = " / ".join( [_format_config_path_label(".".join(path_parts[:-1]))] if len(path_parts) > 1 else [] + [_format_config_label(path_parts[-1])] if path_parts else [_format_config_label(name)] ) return { "kind": "value", "key": key_path, "name": name, "title": _format_config_label(name), "value": value, "value_display": display_value, "display_path": display_path, "type": type(value).__name__, } def _build_root_config_items(config_data: Dict[str, Any]) -> List[Dict[str, Any]]: items: List[Dict[str, Any]] = [] visible_entries = _visible_config_entries(config_data) preferences = { key: value for key, value in visible_entries if not isinstance(value, dict) } if preferences: items.append( _build_section_item( title="Preferences", browse_path=_PREFERENCES_BROWSE_PATH, value=preferences, ) ) plugin_items = _collect_plugin_root_items(config_data) if plugin_items: items.append( _build_section_item( title="Plugins", browse_path=_PLUGINS_BROWSE_PATH, value={item["title"]: item for item in plugin_items}, ) ) other_sections: List[Dict[str, Any]] = [] for key, value in visible_entries: if key in set(_PLUGIN_CATEGORY_KEYS) or not isinstance(value, dict): continue other_sections.append( _build_section_item( title=_format_config_label(key), browse_path=key, value=value, ) ) other_sections.sort(key=lambda item: str(item.get("title") or "").lower()) items.extend(other_sections) return items def _build_nested_config_items( config_data: Dict[str, Any], browse_path: str, ) -> List[Dict[str, Any]]: if browse_path == _PLUGINS_BROWSE_PATH: return _collect_plugin_root_items(config_data) branch = _resolve_config_branch(config_data, browse_path) if branch is None: return [] section_items: List[Dict[str, Any]] = [] value_items: List[Dict[str, Any]] = [] is_preferences_view = browse_path == _PREFERENCES_BROWSE_PATH for key, value in _visible_config_entries(branch): full_key = key if is_preferences_view else f"{browse_path}.{key}" if isinstance(value, dict): section_items.append( _build_section_item( title=_format_config_label(key), browse_path=full_key, value=value, ) ) else: value_items.append( _build_value_item( key_path=full_key, name=key, value=value, ) ) section_items.sort(key=lambda item: str(item.get("title") or "").lower()) value_items.sort(key=lambda item: str(item.get("name") or "").lower()) return section_items + value_items def _build_config_items( config_data: Dict[str, Any], browse_path: Optional[str] = None, ) -> List[Dict[str, Any]]: text = str(browse_path or "").strip() if not text: return _build_root_config_items(config_data) return _build_nested_config_items(config_data, text) def _build_config_table_title(browse_path: Optional[str]) -> str: text = str(browse_path or "").strip() if not text: return "Configuration" return f"Configuration: {_format_config_path_label(text)}" def _build_config_header_lines(browse_path: Optional[str]) -> List[str]: text = str(browse_path or "").strip() if not text: return [ "Use @N on a section to drill in. Use @.. to go back.", ] return [ f"Path: {_format_config_path_label(text)}", "Use @N on a section to drill in. Use @N | .config to update a setting. Use @.. to go back.", ] def _extract_browse_arg(args: Sequence[str]) -> Optional[str]: return _extract_arg_value(args, flags={"-browse", "--browse"}, allow_positional=False) def _extract_selected_update_value(args: Sequence[str]) -> Optional[str]: explicit = _extract_arg_value(args, flags=VALUE_ARG_FLAGS, allow_positional=False) if explicit is not None: return explicit tokens = [str(arg).strip() for arg in (args or []) if str(arg).strip()] positional = [token for token in tokens if not token.startswith("-")] if len(positional) == 1: return positional[0] return None def _get_selected_config_item() -> Optional[Dict[str, Any]]: try: indices = ctx.get_last_selection() or [] except Exception: indices = [] try: items = ctx.get_last_result_items() or [] except Exception: items = [] if not indices or not items: return None idx = indices[0] if idx < 0 or idx >= len(items): return None item = items[idx] if isinstance(item, dict): return item normalized: Dict[str, Any] = {} for key in ("kind", "key", "title", "browse_path", "name", "value", "value_display", "type"): try: value = getattr(item, key, None) except Exception: value = None if value is not None: normalized[key] = value return normalized or None def _show_config_table( config_data: Dict[str, Any], *, browse_path: Optional[str] = None, ) -> int: items = _build_config_items(config_data, browse_path=browse_path) if not items: path_text = _format_config_path_label(browse_path) print(f"No configuration entries available for {path_text}.") return 0 table = Table(_build_config_table_title(browse_path), preserve_order=True) table.set_table("config") if browse_path: table.set_source_command(".config", ["-browse", str(browse_path)]) else: table.set_source_command(".config", []) table.set_header_lines(_build_config_header_lines(browse_path)) for idx, item in enumerate(items): row = table.add_row() row.add_column("Name", item.get("title", "")) row.add_column("Value", item.get("summary") or item.get("value_display", "")) row.add_column("Type", item.get("type", "")) if item.get("kind") == "section" and item.get("browse_path"): table.set_row_selection_action( idx, [".config", "-browse", str(item.get("browse_path"))], ) ctx.set_last_result_table(table, items) ctx.set_current_stage_table(table) return 0 def _save_updated_config(config_data: Dict[str, Any], key_path: str) -> None: try: key_l = str(key_path or "").lower() except Exception: key_l = "" if "alldebrid" in key_l or "all-debrid" in key_l: save_config_and_verify(config_data) return save_config(config_data) def _resolve_direct_browse_path( config_data: Dict[str, Any], token: str, ) -> Optional[str]: text = str(token or "").strip() if not text: return None lowered = text.lower() if lowered in {"preferences", "prefs"}: return _PREFERENCES_BROWSE_PATH if lowered in {"plugins", "plugin", "providers", "provider", "tools", "tool"}: return _PLUGINS_BROWSE_PATH branch = _resolve_config_branch(config_data, text) if isinstance(branch, dict): return text return None def _strip_value_quotes(value: str) -> str: if not value: return value if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")): return value[1:-1] return value def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int: import sys if _has_flag(args, "-log") or _has_flag(args, "--log"): return _show_config_logs(args) # Load configuration from the database current_config = load_config() browse_path = _extract_browse_arg(args) if browse_path: return _show_config_table(current_config, browse_path=browse_path) selection_item = _get_selected_config_item() value_from_pipe = _extract_piped_value(piped_result) selection_kind = str((selection_item or {}).get("kind") or "").strip().lower() selection_key = str((selection_item or {}).get("key") or "").strip() or None selection_browse_path = str((selection_item or {}).get("browse_path") or "").strip() or None selection_display_path = str((selection_item or {}).get("display_path") or selection_key or "").strip() or selection_key if selection_kind == "section" and selection_browse_path and not args and value_from_pipe is None: return _show_config_table(current_config, browse_path=selection_browse_path) if selection_kind == "value" and selection_key: new_value = value_from_pipe or _extract_selected_update_value(args) if new_value is not None: new_value = _strip_value_quotes(new_value) try: set_nested_config(current_config, selection_key, new_value) _save_updated_config(current_config, selection_key) print(f"Updated '{selection_display_path}' to '{new_value}'") return 0 except Exception as exc: log(f"Error updating config '{selection_key}': {exc}") print(f"Error updating config: {exc}") return 1 if not args: if sys.stdin.isatty() and not piped_result: print( "Interactive TUI config editor has been discontinued. " "Showing configuration table instead." ) return _show_config_table(current_config) key = args[0] if len(args) < 2: browse_target = _resolve_direct_browse_path(current_config, key) if browse_target: return _show_config_table(current_config, browse_path=browse_target) print(f"Error: Value required for key '{key}'") return 1 value = _strip_value_quotes(" ".join(args[1:])) try: set_nested_config(current_config, key, value) _save_updated_config(current_config, key) print(f"Updated '{key}' to '{value}'") return 0 except Exception as exc: log(f"Error updating config '{key}': {exc}") print(f"Error updating config: {exc}") return 1 CMDLET.exec = _run