# Medios-Macina/cmdnat/config.py

import datetime
import sqlite3
from pathlib import Path
from typing import List, Dict, Any, Optional, Sequence
from SYS.cmdlet_spec import Cmdlet, CmdletArg
from SYS.config import (
load_config,
save_config,
save_config_and_verify,
set_nested_config_value,
)
from SYS.database import LOG_DB_PATH, db
from SYS.logger import log
from SYS import pipeline as ctx
from SYS.result_table import Table
from cmdnat._parsing import (
extract_piped_value as _extract_piped_value,
extract_value_arg as _extract_value_arg,
has_flag as _has_flag,
)
# Descriptor for the ".config" cmdlet. Both declared arguments are optional.
CMDLET = Cmdlet(
    name=".config",
    summary="Manage configuration settings",
    usage=".config [key] [value] | .config -log [count]",
    arg=[
        CmdletArg(
            name="key",
            description="Configuration key to update (dot-separated)",
            required=False,
        ),
        CmdletArg(
            name="value",
            description="New value for the configuration key",
            required=False,
        ),
    ],
)
def _extract_log_limit(args: Sequence[str], default: int = 30) -> int:
    """Read the entry count that accompanies a ``-log``/``--log`` flag.

    Accepted spellings are ``-log N``, ``--log N``, ``-log=N`` and
    ``--log=N`` (flag matching is case-insensitive). Only the first log flag
    found is considered.

    Args:
        args: Raw command arguments; blank entries are ignored.
        default: Value returned when no usable count is present.

    Returns:
        The parsed count clamped to the range 1..200, or ``default`` when the
        flag is missing, has no value, is followed by another option, or the
        value is not an integer.
    """

    def _clamped(text: str) -> int:
        # Shared by both spellings: parse, then clamp into [1, 200].
        try:
            number = int(text)
        except Exception:
            return default
        return max(1, min(200, number))

    try:
        cleaned = [
            text
            for text in (str(item).strip() for item in (args or []))
            if text
        ]
    except Exception:
        return default

    for position, word in enumerate(cleaned):
        flag = word.lower()
        if flag in ("-log", "--log"):
            following = cleaned[position + 1:position + 2]
            # A following token that looks like another option is not a count.
            if following and not following[0].startswith("-"):
                return _clamped(following[0])
            return default
        if flag.startswith(("-log=", "--log=")):
            return _clamped(flag.partition("=")[2])
    return default
def _fallback_log_path() -> Path:
    """Return the path ``logs/log_fallback.txt`` located beside ``db.db_path``."""
    # with_name swaps the final path component, so "logs" is a sibling of the
    # database file rather than a child of it.
    logs_dir = Path(db.db_path).with_name("logs")
    return logs_dir / "log_fallback.txt"
def _load_recent_config_logs(limit: int = 30) -> List[Dict[str, str]]:
    """Return up to ``limit`` recent configuration-related log entries.

    The ``logs`` table of the SQLite database at ``LOG_DB_PATH`` is queried
    first. A row matches when its module contains ``config`` or its message
    contains ``config``, ``save failed`` or ``saving configuration failed``
    (case-insensitive); rows come back ordered by ``id`` descending.

    When the query fails or yields nothing, the text file returned by
    ``_fallback_log_path()`` is scanned for the same terms instead. Fallback
    entries carry an empty timestamp, level ``FALLBACK`` and module
    ``fallback``, with the last matching line first.

    Args:
        limit: Maximum number of entries to return. A non-positive value
            yields an empty list.

    Returns:
        A list of dicts with the keys ``timestamp``, ``level``, ``module``
        and ``message``; empty when nothing was found or both sources failed.
    """
    limit = int(limit)
    if limit <= 0:
        # Without this guard ``matches[-0:]`` would return *every* fallback
        # line while the SQL ``LIMIT 0`` returns none.
        return []

    sql = """
        SELECT timestamp, level, module, message
        FROM logs
        WHERE lower(module) LIKE ?
           OR lower(message) LIKE ?
           OR lower(message) LIKE ?
           OR lower(message) LIKE ?
        ORDER BY id DESC
        LIMIT ?
    """
    params = (
        "%config%",
        "%config%",
        "%save failed%",
        "%saving configuration failed%",
        limit,
    )

    rows: List[Dict[str, str]] = []
    try:
        conn = sqlite3.connect(str(LOG_DB_PATH), timeout=5.0)
        try:
            conn.row_factory = sqlite3.Row
            fetched = conn.execute(sql, params).fetchall()
        finally:
            # ``with sqlite3.connect(...)`` only commits/rolls back; it never
            # closes the connection, so close it explicitly.
            conn.close()
        rows = [
            {
                "timestamp": str(row["timestamp"] or ""),
                "level": str(row["level"] or ""),
                "module": str(row["module"] or ""),
                "message": str(row["message"] or ""),
            }
            for row in fetched
        ]
    except Exception:
        # Best effort: any database problem falls through to the text file.
        rows = []
    if rows:
        return rows

    terms = ("config", "save failed", "saving configuration failed")
    fallback = _fallback_log_path()
    try:
        if not fallback.exists():
            return []
        lines = fallback.read_text(encoding="utf-8", errors="replace").splitlines()
    except Exception:
        return []

    matches = []
    for line in lines:
        lowered = line.lower()
        if any(term in lowered for term in terms):
            matches.append(line)

    # Keep the last ``limit`` matching lines and present the last one first.
    return [
        {
            "timestamp": "",
            "level": "FALLBACK",
            "module": "fallback",
            "message": line,
        }
        for line in reversed(matches[-limit:])
    ]
def _format_log_timestamp_local(raw_value: str) -> str:
    """Convert a stored log timestamp to local time for display.

    The value is parsed as ``YYYY-MM-DD HH:MM:SS`` (optionally with fractional
    seconds), treated as UTC, and rendered in the system's local timezone as
    ``YYYY-MM-DD HH:MM:SS``.

    Args:
        raw_value: Timestamp text; falsy values are treated as empty.

    Returns:
        The localized timestamp, ``""`` for empty input, or the stripped input
        unchanged when it matches neither accepted format.
    """
    cleaned = str(raw_value or "").strip()
    if cleaned == "":
        return ""

    output_format = "%Y-%m-%d %H:%M:%S"
    for candidate_format in (output_format, output_format + ".%f"):
        try:
            naive = datetime.datetime.strptime(cleaned, candidate_format)
            as_utc = naive.replace(tzinfo=datetime.timezone.utc)
            return as_utc.astimezone().strftime(output_format)
        except Exception:
            # Not this format (or not convertible); try the next one.
            pass
    return cleaned
def _show_config_logs(args: Sequence[str]) -> int:
    """Show recent configuration log entries as a result table.

    The entry count comes from ``_extract_log_limit(args)``. When no entries
    are found a notice is printed instead of a table.

    Args:
        args: Raw command arguments containing the log flag.

    Returns:
        Always ``0``.
    """
    limit = _extract_log_limit(args)
    entries = _load_recent_config_logs(limit=limit)
    if not entries:
        db_name = LOG_DB_PATH.name
        fallback_name = _fallback_log_path().name
        print(f"No recent config/save logs found in {db_name} or {fallback_name}.")
        return 0

    table = Table("Configuration Logs")
    table.set_table("config.logs")
    table.set_source_command(".config", ["-log", str(limit)])

    # Columns copied verbatim from each entry; the timestamp is localized first.
    plain_columns = (("Level", "level"), ("Module", "module"), ("Message", "message"))
    for entry in entries:
        row = table.add_row()
        row.add_column(
            "Time (local)",
            _format_log_timestamp_local(entry.get("timestamp", "")),
        )
        for heading, field in plain_columns:
            row.add_column(heading, entry.get(field, ""))

    ctx.set_last_result_table_overlay(table, entries)
    ctx.set_current_stage_table(table)
    print(f"Showing {len(entries)} recent configuration log entries.")
    return 0
def flatten_config(config: Dict[str, Any], parent_key: str = "", sep: str = ".") -> List[Dict[str, Any]]:
    """Flatten a nested configuration mapping into a list of leaf entries.

    Nested dicts are walked depth-first in insertion order; keys starting with
    an underscore are skipped at every level. A dict with no visible leaves
    contributes nothing.

    Args:
        config: Mapping to flatten.
        parent_key: Path prefix applied to every key found in ``config``.
        sep: Separator placed between path segments.

    Returns:
        One dict per leaf with ``key`` (joined path), ``value``,
        ``value_display`` (``str(value)``) and ``type`` (the value's type name).
    """
    flattened: List[Dict[str, Any]] = []
    for name, value in config.items():
        if name.startswith("_"):
            continue
        path = name if not parent_key else f"{parent_key}{sep}{name}"
        if not isinstance(value, dict):
            flattened.append(
                dict(
                    key=path,
                    value=value,
                    value_display=str(value),
                    type=type(value).__name__,
                )
            )
            continue
        flattened += flatten_config(value, path, sep=sep)
    return flattened
def set_nested_config(config: Dict[str, Any], key: str, value: str) -> bool:
    """Set ``key`` to ``value`` in ``config`` through ``set_nested_config_value``.

    Errors are reported by passing ``print`` as the ``on_error`` callback.

    Returns:
        Whatever ``set_nested_config_value`` returns.
    """
    outcome = set_nested_config_value(config, key, value, on_error=print)
    return outcome
def _get_selected_config_key() -> Optional[str]:
    """Return the ``key`` of the first selected item in the last result set.

    Both the selection and the result items are read from the pipeline
    context; a failure in either lookup is treated as "nothing there".

    Returns:
        The item's ``key`` (dict entry or attribute), or ``None`` when there
        is no selection, no items, or the first selected index is out of range.
    """
    fetched = []
    for getter_name in ("get_last_selection", "get_last_result_items"):
        try:
            fetched.append(getattr(ctx, getter_name)() or [])
        except Exception:
            fetched.append([])
    selection, results = fetched

    if not (selection and results):
        return None

    # Only the first selected index is honoured.
    first = selection[0]
    if first < 0 or first >= len(results):
        return None

    chosen = results[first]
    return chosen.get("key") if isinstance(chosen, dict) else getattr(chosen, "key", None)
def _show_config_table(config_data: Dict[str, Any]) -> int:
    """Display the flattened configuration as a Key / Value / Type table.

    Entries are sorted by key. When there is nothing to show a notice is
    printed instead.

    Args:
        config_data: Nested configuration mapping to display.

    Returns:
        Always ``0``.
    """
    entries = flatten_config(config_data)
    if len(entries) == 0:
        print("No configuration entries available.")
        return 0
    entries = sorted(entries, key=lambda entry: entry.get("key"))

    table = Table("Configuration")
    table.set_table("config")
    table.set_source_command(".config", [])

    column_fields = (("Key", "key"), ("Value", "value_display"), ("Type", "type"))
    for entry in entries:
        row = table.add_row()
        for heading, field in column_fields:
            row.add_column(heading, entry.get(field, ""))

    ctx.set_last_result_table_overlay(table, entries)
    ctx.set_current_stage_table(table)
    return 0
def _strip_value_quotes(value: str) -> str:
    """Remove one pair of matching quotes that wraps ``value``.

    Args:
        value: Text that may be wrapped in single or double quotes.

    Returns:
        ``value`` without its first and last character when both are the same
        quote character (``"`` or ``'``); otherwise ``value`` unchanged. Falsy
        input is returned as-is.
    """
    # A lone quote character both starts and ends the string, but it is not a
    # quoted value; require at least two characters so it is left untouched.
    if not value or len(value) < 2:
        return value
    if value[0] == value[-1] and value[0] in ('"', "'"):
        return value[1:-1]
    return value
def _is_log_request(args: Sequence[str]) -> bool:
    """Return True when ``args`` asks for the configuration log view."""
    if _has_flag(args, "-log") or _has_flag(args, "--log"):
        return True
    # _extract_log_limit also accepts the "-log=N" / "--log=N" spelling, so
    # recognise it here explicitly instead of relying on _has_flag to match it.
    return any(
        str(arg).strip().lower().startswith(("-log=", "--log="))
        for arg in (args or [])
    )


def _persist_config_update(current_config: Dict[str, Any], key: str, value: str) -> int:
    """Apply ``key = value`` to ``current_config``, save it, and return an exit code."""
    try:
        # NOTE(review): assumes an explicit False from set_nested_config means
        # the update was rejected (it reports through ``print``) - confirm
        # against SYS.config.set_nested_config_value.
        if set_nested_config(current_config, key, value) is False:
            return 1

        # For AllDebrid API changes, use the verified save path to ensure the
        # new API key persisted to disk; otherwise use the normal save.
        key_l = str(key or "").lower()
        if "alldebrid" in key_l or "all-debrid" in key_l:
            try:
                save_config_and_verify(current_config)
            except Exception as exc:
                log(f"Configuration save verification failed for '{key}': {exc}")
                print(f"Error saving configuration (verification failed): {exc}")
                return 1
        else:
            save_config(current_config)
        print(f"Updated '{key}' to '{value}'")
        return 0
    except Exception as exc:
        log(f"Error updating config '{key}': {exc}")
        print(f"Error updating config: {exc}")
        return 1


def _launch_config_editor(current_config: Dict[str, Any]) -> int:
    """Run the Textual config modal, then show the status cmdlet's output.

    Falls back to the plain configuration table when the editor cannot be
    launched.
    """
    try:
        from textual.app import App
        from TUI.modalscreen.config_modal import ConfigModal

        class ConfigApp(App):
            def on_mount(self) -> None:
                self.title = "Config Editor"
                # Push the modal on top of the (blank) main screen and exit
                # the app as soon as the modal is dismissed.
                self.push_screen(ConfigModal(), callback=self.exit_on_close)

            def exit_on_close(self, result: Any = None) -> None:
                self.exit()

        with ctx.suspend_live_progress():
            ConfigApp().run()
    except Exception as exc:
        print(f"Note: Could not launch interactive editor ({exc}). Showing configuration table:")
        return _show_config_table(current_config)

    # After the modal exits, show the status table if possible.
    try:
        from cmdlet._shared import SharedArgs
        from cmdnat.status import CMDLET as STATUS_CMDLET

        # Reload: the configuration may have changed on disk while editing.
        fresh_config = load_config()
        # Refresh shared caches so cmdlets pick up new stores.
        SharedArgs._refresh_store_choices_cache(fresh_config)
        SharedArgs.STORE.choices = SharedArgs.get_store_choices(fresh_config, force=True)
        return STATUS_CMDLET.exec(None, [], fresh_config)
    except Exception as exc:
        # Best effort only, but leave a trace instead of swallowing silently.
        log(f"Could not show status after config edit: {exc}")
    return 0


def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int:
    """Entry point of the ``.config`` cmdlet.

    Dispatch order:
      1. A ``-log``/``--log`` flag shows recent configuration log entries.
      2. With a selected row from a previous result table, the selected key is
         set to the piped value or, failing that, the value taken from ``args``.
      3. With no arguments, the interactive editor is launched when stdin is a
         TTY and nothing was piped; otherwise the configuration table is shown.
      4. Otherwise ``args[0]`` is the key and the remaining arguments, joined
         with spaces, are the value.

    Args:
        piped_result: Value received from the previous pipeline stage, if any.
        args: Raw command arguments.
        config: Unused here; the configuration is reloaded with ``load_config()``.

    Returns:
        ``0`` on success, ``1`` when a value is missing or the update/save fails.
    """
    import sys

    if _is_log_request(args):
        return _show_config_logs(args)

    # Load configuration from the database
    current_config = load_config()

    selection_key = _get_selected_config_key()
    value_from_args = _extract_value_arg(args) if selection_key else None
    value_from_pipe = _extract_piped_value(piped_result)

    if selection_key:
        new_value = value_from_pipe or value_from_args
        if not new_value:
            print(
                "Provide a new value via pipe or argument: @N | .config <value>"
            )
            return 1
        return _persist_config_update(
            current_config, selection_key, _strip_value_quotes(new_value)
        )

    if not args:
        # Only launch the Textual modal from an interactive terminal.
        if sys.stdin.isatty() and not piped_result:
            return _launch_config_editor(current_config)
        return _show_config_table(current_config)

    key = args[0]
    if len(args) < 2:
        print(f"Error: Value required for key '{key}'")
        return 1

    value = _strip_value_quotes(" ".join(args[1:]))
    return _persist_config_update(current_config, key, value)
# Attach the implementation above as the cmdlet descriptor's exec callable.
CMDLET.exec = _run