Files
Medios-Macina/TUI/modalscreen/config_modal.py
2026-01-30 12:04:37 -08:00

1329 lines
54 KiB
Python

import re
from copy import deepcopy
from typing import Any, Dict, List, Optional
from textual import on, work
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, Vertical, ScrollableContainer
from textual.screen import ModalScreen
from textual.widgets import Static, Button, Input, Label, ListView, ListItem, Rule, Select
from pathlib import Path
from SYS.config import load_config, save_config, reload_config, global_config, count_changed_entries, ConfigSaveConflict
from SYS.database import db
from SYS.logger import log
from Store.registry import _discover_store_classes, _required_keys_for
from ProviderCore.registry import list_providers
from TUI.modalscreen.matrix_room_picker import MatrixRoomPicker
from TUI.modalscreen.selection_modal import SelectionModal
class ConfigModal(ModalScreen):
"""A modal for editing the configuration."""
BINDINGS = [
("ctrl+v", "paste", "Paste"),
("ctrl+c", "copy", "Copy"),
]
CSS = """
ConfigModal {
align: center middle;
background: $boost;
}
#config-container {
width: 90%;
height: 90%;
background: $panel;
border: thick $primary;
padding: 1;
}
.section-title {
background: $accent;
color: $text;
padding: 0 1;
margin-bottom: 1;
text-align: center;
text-style: bold;
height: 3;
content-align: center middle;
}
#config-sidebar {
width: 25%;
border-right: solid $surface;
}
#config-content {
width: 75%;
padding: 1;
}
.config-field {
margin-bottom: 1;
height: auto;
}
.config-label {
width: 100%;
text-style: bold;
color: $accent;
}
.field-row {
height: 5;
margin-bottom: 1;
align: left middle;
}
.config-input {
width: 1fr;
}
.paste-btn {
width: 10;
margin-left: 1;
}
#config-actions {
height: 3;
align: right middle;
}
.item-row {
height: 5;
margin-bottom: 1;
padding: 0 1;
border: solid $surface;
}
.item-label {
width: 1fr;
height: 3;
content-align: left middle;
}
.item-row Button {
width: 16;
height: 3;
}
Button {
margin: 0 1;
}
"""
def __init__(self) -> None:
super().__init__()
# Load config from the workspace root (parent of SYS)
self.config_data = load_config()
self.current_category = "globals"
self.editing_item_type = None # 'store' or 'provider'
self.editing_item_name = None
self._button_id_map = {}
self._input_id_map = {}
self._matrix_status: Optional[Static] = None
self._matrix_test_running = False
self._editor_snapshot: Optional[Dict[str, Any]] = None
# Path to the database file used by this process (for diagnostics)
self._db_path = str(db.db_path)
def _capture_editor_snapshot(self) -> None:
self._editor_snapshot = deepcopy(self.config_data)
def _revert_unsaved_editor_changes(self) -> None:
if self._editor_snapshot is not None:
self.config_data = deepcopy(self._editor_snapshot)
self._editor_snapshot = None
def _editor_has_changes(self) -> bool:
if self._editor_snapshot is None:
return True
return self.config_data != self._editor_snapshot
def compose(self) -> ComposeResult:
with Container(id="config-container"):
yield Static("CONFIGURATION EDITOR", classes="section-title")
yield Static(f"DB: {self._db_path}", classes="config-label", id="config-db-path")
yield Static("Last saved: unknown", classes="config-label", id="config-last-save")
with Horizontal():
with Vertical(id="config-sidebar"):
yield Label("Categories", classes="config-label")
with ListView(id="category-list"):
yield ListItem(Label("Global Settings"), id="cat-globals")
yield ListItem(Label("Stores"), id="cat-stores")
yield ListItem(Label("Providers"), id="cat-providers")
yield ListItem(Label("Tools"), id="cat-tools")
with Vertical(id="config-content"):
yield ScrollableContainer(id="fields-container")
with Horizontal(id="config-actions"):
yield Button("Save", variant="success", id="save-btn")
yield Button("Add Store", variant="primary", id="add-store-btn")
yield Button("Add Provider", variant="primary", id="add-provider-btn")
yield Button("Add Tool", variant="primary", id="add-tool-btn")
yield Button("Back", id="back-btn")
yield Button("Close", variant="error", id="cancel-btn")
def on_mount(self) -> None:
self.query_one("#add-store-btn", Button).display = False
self.query_one("#add-provider-btn", Button).display = False
try:
self.query_one("#add-tool-btn", Button).display = False
except Exception:
pass
# Update DB path and last-saved on mount
try:
self.query_one("#config-db-path", Static).update(self._db_path)
except Exception:
pass
try:
mtime = None
try:
mtime = db.db_path.stat().st_mtime
mtime = __import__('datetime').datetime.utcfromtimestamp(mtime).isoformat() + "Z"
except Exception:
mtime = None
self.query_one("#config-last-save", Static).update(f"Last saved: {mtime or '(unknown)'}")
except Exception:
pass
self.refresh_view()
def refresh_view(self) -> None:
"""
Refresh the content area. We debounce this call and use a render_id
to avoid race conditions with Textual's async widget mounting.
"""
self._render_id = getattr(self, "_render_id", 0) + 1
if hasattr(self, "_refresh_timer"):
self._refresh_timer.stop()
self._refresh_timer = self.set_timer(0.02, self._actual_refresh)
def _actual_refresh(self) -> None:
try:
container = self.query_one("#fields-container", ScrollableContainer)
except Exception:
return
self._button_id_map.clear()
self._input_id_map.clear()
# Clear existing
container.query("*").remove()
# Update visibility of buttons
try:
self.query_one("#add-store-btn", Button).display = (self.current_category == "stores" and self.editing_item_name is None)
self.query_one("#add-provider-btn", Button).display = (self.current_category == "providers" and self.editing_item_name is None)
self.query_one("#add-tool-btn", Button).display = (self.current_category == "tools" and self.editing_item_name is None)
self.query_one("#back-btn", Button).display = (self.editing_item_name is not None)
self.query_one("#save-btn", Button).display = (self.editing_item_name is not None or self.current_category == "globals")
except Exception:
pass
render_id = self._render_id
def do_mount():
# If a new refresh was started, ignore this old mount request
if getattr(self, "_render_id", 0) != render_id:
return
# Final check that container is empty. remove() is async.
if container.children:
for child in list(container.children):
child.remove()
if self.editing_item_name:
self.render_item_editor(container)
elif self.current_category == "globals":
self.render_globals(container)
elif self.current_category == "stores":
self.render_stores(container)
elif self.current_category == "providers":
self.render_providers(container)
elif self.current_category == "tools":
self.render_tools(container)
self.call_after_refresh(do_mount)
def render_globals(self, container: ScrollableContainer) -> None:
container.mount(Label("General Configuration", classes="config-label"))
# Get global schema
schema_map = {f["key"].lower(): f for f in global_config()}
existing_keys_lower = set()
idx = 0
# Show fields defined in schema first
for key_lower, field_def in schema_map.items():
existing_keys_lower.add(key_lower)
label_text = field_def.get("label") or field_def["key"]
choices = field_def.get("choices")
# Find current value (case-insensitive)
current_val = None
found_key = field_def["key"]
for k, v in self.config_data.items():
if k.lower() == key_lower:
current_val = str(v)
found_key = k
break
if current_val is None:
current_val = str(field_def.get("default") or "")
container.mount(Label(label_text))
inp_id = f"global-{idx}"
self._input_id_map[inp_id] = found_key
if choices:
select_options = [(str(c), str(c)) for c in choices]
if current_val not in [str(c) for c in choices]:
select_options.insert(0, (current_val, current_val))
sel = Select(select_options, value=current_val, id=inp_id)
container.mount(sel)
else:
row = Horizontal(classes="field-row")
container.mount(row)
row.mount(Input(value=current_val, id=inp_id, classes="config-input"))
row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn"))
idx += 1
# Show any other top-level keys not in schema
for k, v in self.config_data.items():
if not isinstance(v, dict) and not k.startswith("_"):
if k.lower() in existing_keys_lower:
continue
inp_id = f"global-{idx}"
self._input_id_map[inp_id] = k
container.mount(Label(k))
row = Horizontal(classes="field-row")
container.mount(row)
row.mount(Input(value=str(v), id=inp_id, classes="config-input"))
row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn"))
idx += 1
def render_stores(self, container: ScrollableContainer) -> None:
container.mount(Label("Configured Stores", classes="config-label"))
stores = self.config_data.get("store", {})
if not stores:
container.mount(Static("No stores configured."))
else:
# stores is structured as: {type: {name_key: config}}
idx = 0
for stype, instances in stores.items():
if isinstance(instances, dict):
for name_key, conf in instances.items():
# Use the name field from the config if it exists, otherwise use the key
display_name = name_key
if isinstance(conf, dict):
display_name = (
conf.get("NAME")
or conf.get("name")
or conf.get("Name")
or name_key
)
edit_id = f"edit-store-{idx}"
del_id = f"del-store-{idx}"
self._button_id_map[edit_id] = ("edit", f"store-{stype}", name_key)
self._button_id_map[del_id] = ("del", f"store-{stype}", name_key)
idx += 1
row = Horizontal(
Static(f"{display_name} ({stype})", classes="item-label"),
Button("Edit", id=edit_id),
Button("Delete", variant="error", id=del_id),
classes="item-row"
)
container.mount(row)
def render_providers(self, container: ScrollableContainer) -> None:
container.mount(Label("Configured Providers", classes="config-label"))
providers = self.config_data.get("provider", {})
if not providers:
container.mount(Static("No providers configured."))
else:
for i, (name, _) in enumerate(providers.items()):
edit_id = f"edit-provider-{i}"
del_id = f"del-provider-{i}"
self._button_id_map[edit_id] = ("edit", "provider", name)
self._button_id_map[del_id] = ("del", "provider", name)
row = Horizontal(
Static(name, classes="item-label"),
Button("Edit", id=edit_id),
Button("Delete", variant="error", id=del_id),
classes="item-row"
)
container.mount(row)
def render_tools(self, container: ScrollableContainer) -> None:
container.mount(Label("Configured Tools", classes="config-label"))
tools = self.config_data.get("tool", {})
if not tools:
container.mount(Static("No tools configured."))
else:
for i, (name, _) in enumerate(tools.items()):
edit_id = f"edit-tool-{i}"
del_id = f"del-tool-{i}"
self._button_id_map[edit_id] = ("edit", "tool", name)
self._button_id_map[del_id] = ("del", "tool", name)
row = Horizontal(
Static(name, classes="item-label"),
Button("Edit", id=edit_id),
Button("Delete", variant="error", id=del_id),
classes="item-row"
)
container.mount(row)
def render_item_editor(self, container: ScrollableContainer) -> None:
item_type = str(self.editing_item_type or "")
item_name = str(self.editing_item_name or "")
provider_schema_map = {}
# Parse item_type for store-{stype} or just provider
if item_type.startswith("store-"):
stype = item_type.replace("store-", "")
container.mount(Label(f"Editing Store: {item_name} ({stype})", classes="config-label"))
section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {})
# Fetch Store schema
classes = _discover_store_classes()
if stype in classes:
cls = classes[stype]
if hasattr(cls, "config_schema") and callable(cls.config_schema):
for field_def in cls.config_schema():
k = field_def.get("key")
if k:
provider_schema_map[k.upper()] = field_def
else:
container.mount(Label(f"Editing {item_type.capitalize()}: {item_name}", classes="config-label"))
section = self.config_data.get(item_type, {}).get(item_name, {})
# Fetch Provider schema
if item_type == "provider":
from ProviderCore.registry import get_provider_class
try:
pcls = get_provider_class(item_name)
if pcls and hasattr(pcls, "config_schema") and callable(pcls.config_schema):
for field_def in pcls.config_schema():
k = field_def.get("key")
if k:
provider_schema_map[k.upper()] = field_def
except Exception:
pass
# Fetch Tool schema
if item_type == "tool":
try:
import importlib
mod = importlib.import_module(f"tool.{item_name}")
if hasattr(mod, "config_schema") and callable(mod.config_schema):
for field_def in mod.config_schema():
k = field_def.get("key")
if k:
provider_schema_map[k.upper()] = field_def
except Exception:
pass
# Use columns for better layout of inputs with paste buttons
container.mount(Label("Edit Settings"))
# render_item_editor will handle the inputs for us if we set these
# but wait, render_item_editor is called from refresh_view, not here.
# actually we don't need to do anything else here because refresh_view calls render_item_editor
# which now handles the paste buttons.
# Show all existing keys
existing_keys_upper = set()
idx = 0
for k, v in section.items():
if k.startswith("_"): continue
# Deduplicate keys case-insensitively (e.g. name vs NAME vs Name)
k_upper = k.upper()
if k_upper in existing_keys_upper:
continue
existing_keys_upper.add(k_upper)
# Determine display props from schema
label_text = k
is_secret = False
choices = None
schema = provider_schema_map.get(k_upper)
if schema:
label_text = schema.get("label") or k
if schema.get("required"):
label_text += " *"
if schema.get("secret"):
is_secret = True
choices = schema.get("choices")
container.mount(Label(label_text))
inp_id = f"item-{idx}"
self._input_id_map[inp_id] = k
if choices:
# Select takes a list of (label, value) tuples
select_options = []
choice_values = []
for c in choices:
if isinstance(c, tuple) and len(c) == 2:
select_options.append((str(c[0]), str(c[1])))
choice_values.append(str(c[1]))
else:
select_options.append((str(c), str(c)))
choice_values.append(str(c))
# If current value not in choices, add it or stay blank
current_val = str(v)
if current_val not in choice_values:
select_options.insert(0, (current_val, current_val))
sel = Select(select_options, value=current_val, id=inp_id)
container.mount(sel)
else:
row = Horizontal(classes="field-row")
container.mount(row)
inp = Input(value=str(v), id=inp_id, classes="config-input")
if is_secret:
inp.password = True
row.mount(inp)
row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn"))
idx += 1
# Add required/optional fields from schema that are missing
for k_upper, field_def in provider_schema_map.items():
if k_upper not in existing_keys_upper:
existing_keys_upper.add(k_upper)
key = field_def["key"]
label_text = field_def.get("label") or key
if field_def.get("required"):
label_text += " *"
default_val = str(field_def.get("default") or "")
choices = field_def.get("choices")
container.mount(Label(label_text))
inp_id = f"item-{idx}"
self._input_id_map[inp_id] = key
if choices:
select_options = []
choice_values = []
for c in choices:
if isinstance(c, tuple) and len(c) == 2:
select_options.append((str(c[0]), str(c[1])))
choice_values.append(str(c[1]))
else:
select_options.append((str(c), str(c)))
choice_values.append(str(c))
if default_val not in choice_values:
select_options.insert(0, (default_val, default_val))
sel = Select(select_options, value=default_val, id=inp_id)
container.mount(sel)
else:
row = Horizontal(classes="field-row")
container.mount(row)
inp = Input(value=default_val, id=inp_id, classes="config-input")
if field_def.get("secret"):
inp.password = True
if field_def.get("placeholder"):
inp.placeholder = field_def.get("placeholder")
row.mount(inp)
row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn"))
idx += 1
# If it's a store, we might have required keys (legacy check fallback)
if item_type.startswith("store-"):
stype = item_type.replace("store-", "")
classes = _discover_store_classes()
if stype in classes:
required_keys = _required_keys_for(classes[stype])
for rk in required_keys:
# Case-insensitive deduplication (fix path vs PATH)
if rk.upper() not in existing_keys_upper:
container.mount(Label(rk))
inp_id = f"item-{idx}"
self._input_id_map[inp_id] = rk
row = Horizontal(classes="field-row")
container.mount(row)
row.mount(Input(value="", id=inp_id, classes="config-input"))
row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn"))
idx += 1
# If it's a provider, we might have required keys (legacy check fallback)
if item_type == "provider":
# 2. Legacy required_config_keys
from ProviderCore.registry import get_provider_class
try:
pcls = get_provider_class(item_name)
if pcls:
required_keys = pcls.required_config_keys()
for rk in required_keys:
if rk.upper() not in existing_keys_upper:
container.mount(Label(rk))
inp_id = f"item-{idx}"
self._input_id_map[inp_id] = rk
row = Horizontal(classes="field-row")
container.mount(row)
row.mount(Input(value="", id=inp_id, classes="config-input"))
row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn"))
idx += 1
except Exception:
pass
if (
item_type == "provider"
and isinstance(item_name, str)
and item_name.strip().lower() == "matrix"
):
container.mount(Rule())
container.mount(Label("Matrix helpers", classes="config-label"))
status = Static("Set homeserver + token, then test before saving", id="matrix-status")
container.mount(status)
row = Horizontal(classes="field-row")
container.mount(row)
row.mount(Button("Test connection", id="matrix-test-btn"))
row.mount(Button("Choose default rooms", variant="primary", id="matrix-rooms-btn"))
self._matrix_status = status
def create_field(self, name: str, value: Any, id: str) -> Vertical:
# This method is now unused - we mount labels and inputs directly
v = Vertical(classes="config-field")
return v
def on_list_view_selected(self, event: ListView.Selected) -> None:
if not event.item: return
if event.item.id == "cat-globals":
self.current_category = "globals"
elif event.item.id == "cat-stores":
self.current_category = "stores"
elif event.item.id == "cat-providers":
self.current_category = "providers"
elif event.item.id == "cat-tools":
self.current_category = "tools"
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
def on_button_pressed(self, event: Button.Pressed) -> None:
bid = event.button.id
if not bid: return
if bid == "cancel-btn":
self._revert_unsaved_editor_changes()
self.dismiss()
elif bid == "back-btn":
self._revert_unsaved_editor_changes()
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
elif bid == "save-btn":
self._synchronize_inputs_to_config()
if not self.validate_current_editor():
return
if self.editing_item_name and not self._editor_has_changes():
self.notify("No changes to save", severity="warning", timeout=3)
return
try:
saved = self.save_all()
if saved == 0:
msg = f"Configuration saved (no rows changed) to {db.db_path.name}"
else:
msg = f"Configuration saved ({saved} change(s)) to {db.db_path.name}"
# Make the success notification visible a bit longer so it's not missed
self.notify(msg, timeout=5)
# Return to the main list view within the current category
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
self._editor_snapshot = None
except ConfigSaveConflict as exc:
# A concurrent on-disk change was detected; do not overwrite it.
self.notify(
"Save aborted: configuration changed on disk. The editor will refresh.",
severity="error",
timeout=10,
)
# Refresh our in-memory view from disk and drop the editor snapshot
try:
self.config_data = reload_config()
except Exception:
pass
self._editor_snapshot = None
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
except Exception as exc:
self.notify(f"Save failed: {exc}", severity="error", timeout=10)
elif bid in self._button_id_map:
action, itype, name = self._button_id_map[bid]
if action == "edit":
self._capture_editor_snapshot()
self.editing_item_type = itype
self.editing_item_name = name
self.refresh_view()
elif action == "del":
removed = False
if itype.startswith("store-"):
stype = itype.replace("store-", "")
if "store" in self.config_data and stype in self.config_data["store"]:
if name in self.config_data["store"][stype]:
del self.config_data["store"][stype][name]
removed = True
elif itype == "provider":
if "provider" in self.config_data and name in self.config_data["provider"]:
del self.config_data["provider"][name]
removed = True
if str(name).strip().lower() == "alldebrid":
self._remove_alldebrid_store_entry()
elif itype == "tool":
if "tool" in self.config_data and name in self.config_data["tool"]:
del self.config_data["tool"][name]
removed = True
if removed:
try:
saved = self.save_all()
self.notify("Saving configuration...", timeout=3)
except Exception as exc:
self.notify(f"Save failed: {exc}", severity="error", timeout=10)
self.refresh_view()
elif bid == "add-store-btn":
all_classes = _discover_store_classes()
options = []
for stype, cls in all_classes.items():
if hasattr(cls, "config_schema") and callable(cls.config_schema):
try:
if cls.config_schema():
options.append(stype)
except Exception:
pass
self.app.push_screen(SelectionModal("Select Store Type", options), callback=self.on_store_type_selected)
elif bid == "add-provider-btn":
provider_names = list(list_providers().keys())
options = []
from ProviderCore.registry import get_provider_class
for ptype in provider_names:
pcls = get_provider_class(ptype)
if pcls and hasattr(pcls, "config_schema") and callable(pcls.config_schema):
try:
if pcls.config_schema():
options.append(ptype)
except Exception:
pass
self.app.push_screen(SelectionModal("Select Provider Type", options), callback=self.on_provider_type_selected)
elif bid == "add-tool-btn":
# Discover tool modules that advertise a config_schema()
options = []
try:
import pkgutil
import importlib
import tool as _tool_pkg
for _mod in pkgutil.iter_modules(_tool_pkg.__path__):
try:
mod = importlib.import_module(f"tool.{_mod.name}")
if hasattr(mod, "config_schema") and callable(mod.config_schema):
options.append(_mod.name)
except Exception:
continue
except Exception:
# Fallback to known entry
options = ["ytdlp"]
if options:
options.sort()
self.app.push_screen(SelectionModal("Select Tool", options), callback=self.on_tool_type_selected)
elif bid == "matrix-test-btn":
self._request_matrix_test()
elif bid == "matrix-rooms-btn":
self._open_matrix_room_picker()
# Restore UI removed: backups/audit remain available programmatically
elif bid.startswith("paste-"):
# Programmatic paste button
target_id = bid.replace("paste-", "")
try:
inp = self.query_one(f"#{target_id}", Input)
self.focus_and_paste(inp)
except Exception:
pass
async def focus_and_paste(self, inp: Input) -> None:
if hasattr(self.app, "paste_from_clipboard"):
text = await self.app.paste_from_clipboard()
if text:
# Replace selection or append
inp.value = str(inp.value) + text
inp.focus()
self.notify("Pasted from clipboard")
else:
self.notify("Clipboard not supported in this terminal", severity="warning")
async def action_paste(self) -> None:
focused = self.focused
if isinstance(focused, Input):
await self.focus_and_paste(focused)
async def action_copy(self) -> None:
focused = self.focused
if isinstance(focused, Input) and focused.value:
if hasattr(self.app, "copy_to_clipboard"):
self.app.copy_to_clipboard(str(focused.value))
self.notify("Copied to clipboard")
else:
self.notify("Clipboard not supported in this terminal", severity="warning")
# Backup/restore helpers removed: forensics/audit mode disabled and restore UI removed.
def on_store_type_selected(self, stype: str) -> None:
if not stype:
return
self._capture_editor_snapshot()
existing_names: set[str] = set()
store_block = self.config_data.get("store")
if isinstance(store_block, dict):
st_entries = store_block.get(stype)
if isinstance(st_entries, dict):
existing_names = {str(name) for name in st_entries.keys() if name}
base_name = f"new_{stype}"
new_name = base_name
suffix = 1
while new_name in existing_names:
suffix += 1
new_name = f"{base_name}_{suffix}"
if "store" not in self.config_data:
self.config_data["store"] = {}
if stype not in self.config_data["store"]:
self.config_data["store"][stype] = {}
# Default config for the new store
new_config = {"NAME": new_name}
classes = _discover_store_classes()
if stype in classes:
cls = classes[stype]
# Use schema for defaults if present
if hasattr(cls, "config_schema") and callable(cls.config_schema):
for field_def in cls.config_schema():
key = field_def.get("key")
if key:
val = field_def.get("default", "")
# Don't override NAME if we already set it to new_stype
if key.upper() == "NAME":
continue
new_config[key] = val
else:
# Fallback to required keys list
required = _required_keys_for(cls)
for rk in required:
if rk.upper() != "NAME":
new_config[rk] = ""
self.config_data["store"][stype][new_name] = new_config
self.editing_item_type = f"store-{stype}"
self.editing_item_name = new_name
self.refresh_view()
def on_provider_type_selected(self, ptype: str) -> None:
if not ptype: return
self._capture_editor_snapshot()
if "provider" not in self.config_data:
self.config_data["provider"] = {}
# For providers, they are usually top-level entries in 'provider' dict
if ptype not in self.config_data["provider"]:
from ProviderCore.registry import get_provider_class
try:
pcls = get_provider_class(ptype)
new_config = {}
if pcls:
# Use schema for defaults
if hasattr(pcls, "config_schema") and callable(pcls.config_schema):
for field_def in pcls.config_schema():
key = field_def.get("key")
if key:
new_config[key] = field_def.get("default", "")
else:
# Fallback to legacy required keys
required = pcls.required_config_keys()
for rk in required:
new_config[rk] = ""
self.config_data["provider"][ptype] = new_config
except Exception:
self.config_data["provider"][ptype] = {}
self.editing_item_type = "provider"
self.editing_item_name = ptype
self.refresh_view()
def on_tool_type_selected(self, tname: str) -> None:
if not tname:
return
self._capture_editor_snapshot()
if "tool" not in self.config_data:
self.config_data["tool"] = {}
if tname not in self.config_data["tool"]:
new_config = {}
try:
import importlib
mod = importlib.import_module(f"tool.{tname}")
if hasattr(mod, "config_schema") and callable(mod.config_schema):
for field_def in mod.config_schema():
key = field_def.get("key")
if key:
new_config[key] = field_def.get("default", "")
except Exception:
pass
self.config_data["tool"][tname] = new_config
self.editing_item_type = "tool"
self.editing_item_name = tname
self.refresh_view()
def _update_config_value(self, widget_id: str, value: Any) -> None:
if widget_id not in self._input_id_map:
return
key = self._input_id_map[widget_id]
raw_value = value
is_blank_string = isinstance(raw_value, str) and not raw_value.strip()
existing_value: Any = None
item_type = str(self.editing_item_type or "")
item_name = str(self.editing_item_name or "")
if widget_id.startswith("global-"):
existing_value = self.config_data.get(key)
elif widget_id.startswith("item-") and item_name:
if item_type.startswith("store-"):
stype = item_type.replace("store-", "")
store_block = self.config_data.get("store")
if isinstance(store_block, dict):
type_block = store_block.get(stype)
if isinstance(type_block, dict):
section = type_block.get(item_name)
if isinstance(section, dict):
existing_value = section.get(key)
else:
section_block = self.config_data.get(item_type)
if isinstance(section_block, dict):
section = section_block.get(item_name)
if isinstance(section, dict):
existing_value = section.get(key)
if is_blank_string and existing_value is None:
return
# Try to preserve boolean/integer types
processed_value = raw_value
if isinstance(raw_value, str):
low = raw_value.lower()
if low == "true":
processed_value = True
elif low == "false":
processed_value = False
elif raw_value.isdigit():
processed_value = int(raw_value)
if widget_id.startswith("global-"):
self.config_data[key] = processed_value
elif widget_id.startswith("item-") and item_name:
if item_type.startswith("store-"):
stype = item_type.replace("store-", "")
if "store" not in self.config_data:
self.config_data["store"] = {}
if stype not in self.config_data["store"]:
self.config_data["store"][stype] = {}
if item_name not in self.config_data["store"][stype]:
self.config_data["store"][stype][item_name] = {}
# Special case: Renaming the store via the NAME field
if key.upper() == "NAME" and processed_value and str(processed_value) != item_name:
new_name = str(processed_value)
self.config_data["store"][stype][new_name] = self.config_data["store"][stype].pop(item_name)
self.editing_item_name = new_name
item_name = new_name
self.config_data["store"][stype][item_name][key] = processed_value
else:
if item_type not in self.config_data:
self.config_data[item_type] = {}
if item_name not in self.config_data[item_type]:
self.config_data[item_type][item_name] = {}
self.config_data[item_type][item_name][key] = processed_value
def _synchronize_inputs_to_config(self) -> None:
"""Capture current input/select values before saving."""
widgets = list(self.query(Input)) + list(self.query(Select))
for widget in widgets:
widget_id = widget.id
if not widget_id or widget_id not in self._input_id_map:
continue
if isinstance(widget, Select):
if widget.value == Select.BLANK:
continue
value = widget.value
else:
value = widget.value
self._update_config_value(widget_id, value)
def _remove_alldebrid_store_entry(self) -> bool:
"""Remove the mirrored AllDebrid store entry that would recreate the provider."""
store_block = self.config_data.get("store")
if not isinstance(store_block, dict):
return False
debrid = store_block.get("debrid")
if not isinstance(debrid, dict):
return False
removed = False
for key in list(debrid.keys()):
if str(key or "").strip().lower() == "all-debrid":
debrid.pop(key, None)
removed = True
if not debrid:
store_block.pop("debrid", None)
if not store_block:
self.config_data.pop("store", None)
return removed
def _get_matrix_provider_block(self) -> Dict[str, Any]:
providers = self.config_data.get("provider")
if not isinstance(providers, dict):
return {}
block = providers.get("matrix")
return block if isinstance(block, dict) else {}
def _parse_matrix_rooms_value(self) -> List[str]:
block = self._get_matrix_provider_block()
raw = block.get("rooms")
if isinstance(raw, (list, tuple, set)):
return [str(item).strip() for item in raw if str(item).strip()]
text = str(raw or "").strip()
if not text:
return []
return [segment for segment in re.split(r"[\s,]+", text) if segment]
def _request_matrix_test(self) -> None:
if self._matrix_test_running:
return
self._synchronize_inputs_to_config()
if self._matrix_status:
self._matrix_status.update("Saving configuration before testing…")
changed = count_changed_entries(self.config_data)
try:
entries = save_config(self.config_data)
except Exception as exc:
if self._matrix_status:
self._matrix_status.update(f"Saving configuration failed: {exc}")
self._matrix_test_running = False
return
self.config_data = reload_config()
if self._matrix_status:
self._matrix_status.update(f"Saved configuration ({changed} change(s)) to {db.db_path.name}. Testing Matrix connection…")
self._matrix_test_running = True
self._matrix_test_background()
@work(thread=True)
def _matrix_test_background(self) -> None:
try:
from Provider.matrix import Matrix
provider = Matrix(self.config_data)
rooms = provider.list_rooms()
self.app.call_from_thread(self._matrix_test_result, True, rooms, None)
except Exception as exc:
self.app.call_from_thread(self._matrix_test_result, False, [], str(exc))
def _matrix_test_result(
self,
success: bool,
rooms: List[Dict[str, Any]],
error: Optional[str],
) -> None:
self._matrix_test_running = False
if success:
msg = f"Matrix test succeeded ({len(rooms)} room(s))."
if self._matrix_status:
self._matrix_status.update(msg)
self._open_matrix_room_picker(prefetched_rooms=rooms)
else:
if self._matrix_status:
self._matrix_status.update(f"Matrix test failed: {error}")
def _open_matrix_room_picker(
self,
*,
prefetched_rooms: Optional[List[Dict[str, Any]]] = None,
) -> None:
existing = self._parse_matrix_rooms_value()
self.app.push_screen(
MatrixRoomPicker(
self.config_data,
existing=existing,
rooms=prefetched_rooms,
),
callback=self.on_matrix_rooms_selected,
)
def on_matrix_rooms_selected(self, result: Any = None) -> None:
if not isinstance(result, list):
if self._matrix_status:
self._matrix_status.update("Room selection cancelled.")
return
cleaned: List[str] = []
for item in result:
candidate = str(item or "").strip()
if candidate and candidate not in cleaned:
cleaned.append(candidate)
if not cleaned:
if self._matrix_status:
self._matrix_status.update("No default rooms were saved.")
return
matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {})
matrix_block["rooms"] = ", ".join(cleaned)
changed = count_changed_entries(self.config_data)
try:
entries = save_config(self.config_data)
except Exception as exc:
if self._matrix_status:
self._matrix_status.update(f"Saving default rooms failed: {exc}")
return
self.config_data = reload_config()
if self._matrix_status:
status = f"Saved {len(cleaned)} default room(s) ({changed} change(s)) to {db.db_path.name}."
self._matrix_status.update(status)
self.refresh_view()
@on(Input.Changed)
def on_input_changed(self, event: Input.Changed) -> None:
if event.input.id:
self._update_config_value(event.input.id, event.value)
@on(Select.Changed)
def on_select_changed(self, event: Select.Changed) -> None:
if event.select.id:
# Select value can be the 'Select.BLANK' sentinel
if event.value != Select.BLANK:
self._update_config_value(event.select.id, event.value)
def save_all(self) -> int:
self._synchronize_inputs_to_config()
# Compute change count prior to persisting so callers can report the number of
# actual changes rather than the total number of rows written to the DB.
changed = count_changed_entries(self.config_data)
# Snapshot config for background save
snapshot = deepcopy(self.config_data)
# Schedule the save to run on a background worker so the UI doesn't block.
try:
# Prefer Textual's worker when running inside the app
self._save_background(snapshot, changed)
except Exception:
# Fallback: start a plain thread that runs the underlying task body
import threading
func = getattr(self._save_background, "__wrapped__", None)
if func:
thread = threading.Thread(target=lambda: func(self, snapshot, changed), daemon=True)
else:
thread = threading.Thread(target=lambda: self._save_background(snapshot, changed), daemon=True)
thread.start()
# Ensure DB path indicator is current and show saving status
self._db_path = str(db.db_path)
try:
self.query_one("#config-db-path", Static).update(self._db_path)
except Exception:
pass
try:
self.query_one("#config-last-save", Static).update("Last saved: (saving...)")
except Exception:
pass
log(f"ConfigModal scheduled save (changed={changed})")
return changed
@work(thread=True)
def _save_background(self, cfg: Dict[str, Any], changed: int) -> None:
try:
saved_entries = save_config(cfg)
try:
appobj = self.app
except Exception:
appobj = None
if appobj and hasattr(appobj, 'call_from_thread'):
appobj.call_from_thread(self._on_save_complete, True, None, changed, saved_entries)
else:
# If no app (e.g., running under tests), call completion directly
self._on_save_complete(True, None, changed, saved_entries)
except ConfigSaveConflict as exc:
try:
appobj = self.app
except Exception:
appobj = None
if appobj and hasattr(appobj, 'call_from_thread'):
appobj.call_from_thread(self._on_save_complete, False, str(exc), changed, 0)
else:
self._on_save_complete(False, str(exc), changed, 0)
except Exception as exc:
try:
appobj = self.app
except Exception:
appobj = None
if appobj and hasattr(appobj, 'call_from_thread'):
appobj.call_from_thread(self._on_save_complete, False, str(exc), changed, 0)
else:
self._on_save_complete(False, str(exc), changed, 0)
def _on_save_complete(self, success: bool, error: Optional[str], changed: int, saved_entries: int) -> None:
# Safely determine whether a Textual app context is available. Accessing
# `self.app` can raise when not running inside the TUI; handle that.
try:
appobj = self.app
except Exception:
appobj = None
if success:
try:
self.config_data = reload_config()
except Exception:
pass
# Update last-saved label with file timestamp for visibility
db_mtime = None
try:
db_mtime = db.db_path.stat().st_mtime
db_mtime = __import__('datetime').datetime.utcfromtimestamp(db_mtime).isoformat() + "Z"
except Exception:
db_mtime = None
if appobj:
try:
if changed == 0:
label_text = f"Last saved: (no changes)"
else:
label_text = f"Last saved: {changed} change(s) at {db_mtime or '(unknown)'}"
try:
self.query_one("#config-last-save", Static).update(label_text)
except Exception:
pass
except Exception:
pass
try:
self.refresh_view()
except Exception:
pass
try:
self.notify(f"Configuration saved ({changed} change(s)) to {db.db_path.name}", timeout=5)
except Exception:
pass
else:
# No TUI available; log instead of updating UI
log(f"Configuration saved ({changed} change(s)) to {db.db_path.name}")
log(f"ConfigModal saved {saved_entries} configuration entries (changed={changed})")
else:
# Save failed; notify via UI if available, otherwise log
if appobj:
try:
self.notify(f"Save failed: {error}", severity="error", timeout=10)
except Exception:
pass
try:
self.config_data = reload_config()
except Exception:
pass
try:
self.refresh_view()
except Exception:
pass
else:
log(f"Save failed: {error}")
def validate_current_editor(self) -> bool:
"""Ensure all required fields for the current item are filled."""
if not self.editing_item_name:
return True
item_type = str(self.editing_item_type or "")
item_name = str(self.editing_item_name or "")
required_keys = []
section = {}
if item_type.startswith("store-"):
stype = item_type.replace("store-", "")
classes = _discover_store_classes()
if stype in classes:
required_keys = list(_required_keys_for(classes[stype]))
section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {})
elif item_type == "provider":
from ProviderCore.registry import get_provider_class
try:
pcls = get_provider_class(item_name)
if pcls:
# Collect required keys from schema
if hasattr(pcls, "config_schema") and callable(pcls.config_schema):
for field_def in pcls.config_schema():
if field_def.get("required"):
k = field_def.get("key")
if k and k not in required_keys:
required_keys.append(k)
# Merge with legacy required keys
for rk in pcls.required_config_keys():
if rk not in required_keys:
required_keys.append(rk)
except Exception:
pass
section = self.config_data.get("provider", {}).get(item_name, {})
elif item_type == "tool":
try:
import importlib
mod = importlib.import_module(f"tool.{item_name}")
if hasattr(mod, "config_schema") and callable(mod.config_schema):
for field_def in mod.config_schema():
if field_def.get("required"):
k = field_def.get("key")
if k and k not in required_keys:
required_keys.append(k)
except Exception:
pass
section = self.config_data.get("tool", {}).get(item_name, {})
# Check required keys
for rk in required_keys:
# Case-insensitive lookup for the required key in the current section
val = None
rk_upper = rk.upper()
for k, v in section.items():
if k.upper() == rk_upper:
val = v
break
if not val or not str(val).strip():
self.notify(f"Required field '{rk}' cannot be blank", severity="error")
return False
return True