"""`.config` cmdlet: list/edit configuration values and show recent config logs."""

import datetime
import sqlite3
import sys
from contextlib import closing
from pathlib import Path
from typing import List, Dict, Any, Optional, Sequence

from SYS.cmdlet_spec import Cmdlet, CmdletArg
from SYS.config import (
    load_config,
    save_config,
    save_config_and_verify,
    set_nested_config_value,
)
from SYS.database import LOG_DB_PATH, db
from SYS.logger import log
from SYS import pipeline as ctx
from SYS.result_table import Table
from cmdnat._parsing import (
    extract_piped_value as _extract_piped_value,
    extract_value_arg as _extract_value_arg,
    has_flag as _has_flag,
)

CMDLET = Cmdlet(
    name=".config",
    summary="Manage configuration settings",
    usage=".config [key] [value] | .config -log [count]",
    arg=[
        CmdletArg(
            name="key",
            description="Configuration key to update (dot-separated)",
            required=False
        ),
        CmdletArg(
            name="value",
            description="New value for the configuration key",
            required=False
        ),
    ],
)

# Substrings (lower-case) that mark a fallback log line as configuration-related.
_FALLBACK_LOG_TERMS = ("config", "save failed", "saving configuration failed")


def _extract_log_limit(args: Sequence[str], default: int = 30) -> int:
    """Return the row count requested via ``-log N`` / ``--log N`` / ``-log=N``.

    A parsed count is clamped to the range 1..200. ``default`` is returned
    (unclamped) when no log flag is present, when the flag has no value, or
    when the value is not an integer. Only the first log flag is considered.
    """
    try:
        tokens = [str(arg).strip() for arg in (args or []) if str(arg).strip()]
    except Exception:
        return default

    for idx, token in enumerate(tokens):
        lowered = token.lower()

        if lowered in {"-log", "--log"}:
            if idx + 1 < len(tokens):
                candidate = tokens[idx + 1]
                # A following token that starts with "-" is another flag, not a count.
                if candidate and not candidate.startswith("-"):
                    try:
                        return max(1, min(200, int(candidate)))
                    except Exception:
                        return default
            return default

        if lowered.startswith(("-log=", "--log=")):
            _, value = lowered.split("=", 1)
            try:
                return max(1, min(200, int(value)))
            except Exception:
                return default

    return default


def _fallback_log_path() -> Path:
    """Return ``logs/log_fallback.txt`` located beside the file at ``db.db_path``."""
    return Path(db.db_path).with_name("logs") / "log_fallback.txt"


def _load_recent_config_logs(limit: int = 30) -> List[Dict[str, str]]:
    """Load up to ``limit`` recent configuration-related log entries, newest first.

    The ``logs`` table in ``LOG_DB_PATH`` is queried first. If that yields no
    rows (or fails for any reason), matching lines from the fallback text log
    are returned instead, tagged with level ``FALLBACK`` and an empty
    timestamp. A non-positive ``limit`` yields an empty list.

    Each entry is a dict with the keys ``timestamp``, ``level``, ``module``
    and ``message`` (all strings).
    """
    limit = int(limit)
    if limit <= 0:
        # Guard: ``matches[-0:]`` would otherwise return *every* fallback line.
        return []

    rows: List[Dict[str, str]] = []
    sql = """
        SELECT timestamp, level, module, message
        FROM logs
        WHERE lower(module) LIKE ?
           OR lower(message) LIKE ?
           OR lower(message) LIKE ?
           OR lower(message) LIKE ?
        ORDER BY id DESC
        LIMIT ?
    """
    params = (
        "%config%",
        "%config%",
        "%save failed%",
        "%saving configuration failed%",
        limit,
    )

    try:
        # sqlite3's own context manager only scopes the transaction; closing()
        # is what actually releases the connection (and the cursor).
        with closing(sqlite3.connect(str(LOG_DB_PATH), timeout=5.0)) as conn:
            conn.row_factory = sqlite3.Row
            with closing(conn.cursor()) as cur:
                cur.execute(sql, params)
                fetched = cur.fetchall()
        rows = [
            {
                "timestamp": str(row["timestamp"] or ""),
                "level": str(row["level"] or ""),
                "module": str(row["module"] or ""),
                "message": str(row["message"] or ""),
            }
            for row in fetched
        ]
    except Exception:
        # Best effort: any database problem falls through to the text fallback.
        rows = []

    if rows:
        return rows

    fallback = _fallback_log_path()
    try:
        if not fallback.exists():
            return []
        lines = fallback.read_text(encoding="utf-8", errors="replace").splitlines()
        matches = [
            line
            for line in lines
            if any(term in line.lower() for term in _FALLBACK_LOG_TERMS)
        ]
        # Keep the last ``limit`` matches and emit them newest-first.
        for line in reversed(matches[-limit:]):
            rows.append(
                {
                    "timestamp": "",
                    "level": "FALLBACK",
                    "module": "fallback",
                    "message": line,
                }
            )
    except Exception:
        return []

    return rows


def _format_log_timestamp_local(raw_value: str) -> str:
    """Convert a ``YYYY-MM-DD HH:MM:SS[.ffffff]`` timestamp to local time.

    The parsed value is interpreted as UTC and rendered in the system's local
    timezone without fractional seconds. Text that matches neither pattern is
    returned stripped but otherwise unchanged; empty input yields ``""``.
    """
    text = str(raw_value or "").strip()
    if not text:
        return ""

    for pattern in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f"):
        try:
            parsed = datetime.datetime.strptime(text, pattern).replace(
                tzinfo=datetime.timezone.utc
            )
            return parsed.astimezone().strftime("%Y-%m-%d %H:%M:%S")
        except (ValueError, OverflowError, OSError):
            continue

    return text


def _show_config_logs(args: Sequence[str]) -> int:
    """Render recent configuration log entries as a result table.

    Returns 0 in all cases; prints a notice instead of a table when nothing
    was found.
    """
    limit = _extract_log_limit(args)
    rows = _load_recent_config_logs(limit=limit)
    if not rows:
        print(
            f"No recent config/save logs found in {LOG_DB_PATH.name} or {_fallback_log_path().name}."
        )
        return 0

    table = Table("Configuration Logs")
    table.set_table("config.logs")
    table.set_source_command(".config", ["-log", str(limit)])

    for row_data in rows:
        row = table.add_row()
        row.add_column("Time (local)", _format_log_timestamp_local(row_data.get("timestamp", "")))
        row.add_column("Level", row_data.get("level", ""))
        row.add_column("Module", row_data.get("module", ""))
        row.add_column("Message", row_data.get("message", ""))

    ctx.set_last_result_table_overlay(table, rows)
    ctx.set_current_stage_table(table)
    print(f"Showing {len(rows)} recent configuration log entries.")
    return 0


def flatten_config(config: Dict[str, Any], parent_key: str = "", sep: str = ".") -> List[Dict[str, Any]]:
    """Flatten a nested config dict into a list of leaf entries.

    Nested dicts are walked recursively and their keys joined with ``sep``.
    Keys beginning with ``_`` are skipped together with everything below
    them. Each entry has the keys ``key``, ``value``, ``value_display``
    (``str(value)``) and ``type`` (the value's type name).
    """
    items: List[Dict[str, Any]] = []
    for k, v in config.items():
        # Coerce to str so non-string keys neither crash startswith() nor
        # produce mixed-type keys that cannot be sorted later.
        name = str(k)
        if name.startswith("_"):
            continue

        new_key = f"{parent_key}{sep}{name}" if parent_key else name
        if isinstance(v, dict):
            items.extend(flatten_config(v, new_key, sep=sep))
        else:
            items.append({
                "key": new_key,
                "value": v,
                "value_display": str(v),
                "type": type(v).__name__,
            })
    return items


def set_nested_config(config: Dict[str, Any], key: str, value: str) -> bool:
    """Set ``key`` to ``value`` in ``config`` via ``set_nested_config_value``.

    Errors are reported through ``print``; the helper's result is returned
    unchanged.
    """
    return set_nested_config_value(config, key, value, on_error=print)


def _get_selected_config_key() -> Optional[str]:
    """Return the ``key`` of the first selected item of the last result table.

    Returns ``None`` when there is no selection, no items, or the selected
    index does not address an item.
    """
    try:
        indices = ctx.get_last_selection() or []
    except Exception:
        indices = []
    try:
        items = ctx.get_last_result_items() or []
    except Exception:
        items = []

    if not indices or not items:
        return None

    idx = indices[0]
    if not isinstance(idx, int) or idx < 0 or idx >= len(items):
        return None

    item = items[idx]
    if isinstance(item, dict):
        return item.get("key")
    return getattr(item, "key", None)


def _show_config_table(config_data: Dict[str, Any]) -> int:
    """Render the flattened configuration, sorted by key, as a result table.

    Returns 0 in all cases; prints a notice when there is nothing to show.
    """
    items = flatten_config(config_data)
    if not items:
        print("No configuration entries available.")
        return 0

    items.sort(key=lambda entry: entry.get("key"))

    table = Table("Configuration")
    table.set_table("config")
    table.set_source_command(".config", [])

    for item in items:
        row = table.add_row()
        row.add_column("Key", item.get("key", ""))
        row.add_column("Value", item.get("value_display", ""))
        row.add_column("Type", item.get("type", ""))

    ctx.set_last_result_table_overlay(table, items)
    ctx.set_current_stage_table(table)
    return 0


def _strip_value_quotes(value: str) -> str:
    """Remove one pair of matching surrounding quotes (``"`` or ``'``).

    Values shorter than two characters are returned unchanged, so a lone
    quote character is not mistaken for an empty quoted string.
    """
    if not value or len(value) < 2:
        return value
    if (value.startswith('"') and value.endswith('"')) or (
        value.startswith("'") and value.endswith("'")
    ):
        return value[1:-1]
    return value


def _apply_config_update(
    current_config: Dict[str, Any],
    key: str,
    value: str,
    *,
    verify: bool = False,
) -> int:
    """Set ``key`` to ``value``, persist the config and report the outcome.

    With ``verify`` the config is saved through ``save_config_and_verify``,
    otherwise through ``save_config``. Returns 0 on success and 1 on failure.
    """
    try:
        # NOTE(review): assumes an explicit False from set_nested_config means
        # the value was rejected (already reported via on_error=print) —
        # confirm against SYS.config. In that case nothing is saved.
        if set_nested_config(current_config, key, value) is False:
            return 1

        if verify:
            try:
                save_config_and_verify(current_config)
            except Exception as exc:
                log(f"Configuration save verification failed for '{key}': {exc}")
                print(f"Error saving configuration (verification failed): {exc}")
                return 1
        else:
            save_config(current_config)

        print(f"Updated '{key}' to '{value}'")
        return 0
    except Exception as exc:
        log(f"Error updating config '{key}': {exc}")
        print(f"Error updating config: {exc}")
        return 1


def _run_config_editor(current_config: Dict[str, Any]) -> int:
    """Launch the Textual config modal, then show the status table.

    Falls back to the plain configuration table when the modal cannot be
    imported or run.
    """
    try:
        from textual.app import App
        from TUI.modalscreen.config_modal import ConfigModal

        class ConfigApp(App):
            def on_mount(self) -> None:
                self.title = "Config Editor"
                # We push the modal screen. It will sit on top of the main (blank) screen.
                # Using a callback to exit the app when the modal is dismissed.
                self.push_screen(ConfigModal(), callback=self.exit_on_close)

            def exit_on_close(self, result: Any = None) -> None:
                self.exit()

        with ctx.suspend_live_progress():
            app = ConfigApp()
            app.run()

        # After modal exits, show the new status table if possible
        try:
            from cmdlet._shared import SharedArgs
            from cmdnat.status import CMDLET as STATUS_CMDLET

            # We reload the config one more time because it might have changed on disk
            fresh_config = load_config()

            # Force refresh of shared caches (especially stores)
            SharedArgs._refresh_store_choices_cache(fresh_config)
            # Update the global SharedArgs choices so cmdlets pick up new stores
            SharedArgs.STORE.choices = SharedArgs.get_store_choices(fresh_config, force=True)

            return STATUS_CMDLET.exec(None, [], fresh_config)
        except Exception:
            # Best effort: the editor itself succeeded, so a failing status
            # refresh is not treated as an error.
            pass
        return 0
    except Exception as exc:
        # Fall back to table display if Textual modal fails
        print(f"Note: Could not launch interactive editor ({exc}). Showing configuration table:")
        return _show_config_table(current_config)


def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int:
    """Entry point of the ``.config`` cmdlet.

    Behaviour, in order of precedence:

    * ``-log`` / ``--log``: show recent configuration log entries.
    * A row of the previous result table is selected: set that key to the
      piped value or, failing that, the value argument.
    * No arguments: open the interactive editor when stdin is a TTY and
      nothing is piped, otherwise print the configuration table.
    * ``key value...``: set ``key`` to the remaining arguments joined by
      spaces.

    The ``config`` parameter is not used; the configuration is reloaded with
    ``load_config``. Returns 0 on success and 1 on failure.
    """
    if _has_flag(args, "-log") or _has_flag(args, "--log"):
        return _show_config_logs(args)

    # Load configuration from the database
    current_config = load_config()

    selection_key = _get_selected_config_key()
    value_from_args = _extract_value_arg(args) if selection_key else None
    value_from_pipe = _extract_piped_value(piped_result)

    if selection_key:
        new_value = value_from_pipe or value_from_args
        if not new_value:
            print(
                "Provide a new value via pipe or argument: @N | .config "
            )
            return 1

        new_value = _strip_value_quotes(new_value)
        # For AllDebrid API changes, use the verified save path to ensure
        # the new API key persisted to disk; otherwise fall back to normal save.
        key_l = str(selection_key or "").lower()
        needs_verify = "alldebrid" in key_l or "all-debrid" in key_l
        return _apply_config_update(
            current_config, selection_key, new_value, verify=needs_verify
        )

    if not args:
        # Check if we're in an interactive terminal and can launch a Textual modal
        if sys.stdin.isatty() and not piped_result:
            return _run_config_editor(current_config)
        return _show_config_table(current_config)

    key = args[0]
    if len(args) < 2:
        print(f"Error: Value required for key '{key}'")
        return 1

    value = _strip_value_quotes(" ".join(args[1:]))
    # NOTE(review): this path always uses the plain save, even for AllDebrid
    # keys — confirm whether it should also use the verified save.
    return _apply_config_update(current_config, key, value)


CMDLET.exec = _run