Files
Medios-Macina/TUI/modalscreen/config_modal.py
T

1884 lines
80 KiB
Python

import re
from copy import deepcopy
from typing import Any, Dict, List, Optional, Iterable
import traceback
from textual import on, work
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, Vertical, ScrollableContainer
from textual.screen import ModalScreen
from textual.widgets import Static, Button, Input, Label, ListView, ListItem, Rule, Select, Checkbox, TextArea
from pathlib import Path
from SYS.config import (
load_config,
save_config,
save_config_and_verify,
reload_config,
count_changed_entries,
ConfigSaveConflict,
coerce_config_value,
)
from SYS.database import db
from SYS.logger import log, debug
from SYS.plugin_config import (
build_default_provider_config,
build_default_store_config,
build_default_tool_config,
get_configurable_provider_types,
get_configurable_store_types,
get_configurable_tool_types,
get_global_schema,
get_item_schema_map,
get_required_config_keys,
)
from ProviderCore.registry import get_plugin, get_plugin_class
from TUI.modalscreen.matrix_room_picker import MatrixRoomPicker
from TUI.modalscreen.selection_modal import SelectionModal
import logging
logger = logging.getLogger(__name__)
class ConfigModal(ModalScreen):
"""A modal for editing the configuration."""
BINDINGS = [
("ctrl+v", "paste", "Paste"),
("ctrl+c", "copy", "Copy"),
]
CSS = """
ConfigModal {
align: center middle;
background: $boost;
}
#config-container {
width: 90%;
height: 90%;
background: $panel;
border: thick $primary;
padding: 1;
}
.section-title {
background: $accent;
color: $text;
padding: 0 1;
margin-bottom: 1;
text-align: center;
text-style: bold;
height: 3;
content-align: center middle;
}
#config-sidebar {
width: 25%;
border-right: solid $surface;
}
#config-content {
width: 75%;
padding: 1;
}
.config-field {
margin-bottom: 1;
height: auto;
}
.config-label {
width: 100%;
text-style: bold;
color: $accent;
}
.field-row {
height: 5;
margin-bottom: 1;
align: left middle;
}
.config-input {
width: 1fr;
}
.config-textarea {
height: 8;
margin-bottom: 1;
}
.config-group {
color: $accent;
text-style: bold;
margin-top: 1;
margin-bottom: 1;
}
#config-actions {
height: 3;
align: right middle;
}
.item-row {
height: 5;
margin-bottom: 1;
padding: 0 1;
border: solid $surface;
}
.item-label {
width: 1fr;
height: 3;
content-align: left middle;
}
.item-row Button {
width: 16;
height: 3;
}
Button {
margin: 0 1;
}
/* Inline matrix rooms list sizing & style (larger, scrollable) */
#matrix-rooms-inline {
height: 16;
border: solid $surface;
padding: 1;
margin-bottom: 1;
}
.matrix-room-row {
border-bottom: solid $surface;
padding: 1 0;
align: left middle;
}
"""
def __init__(self) -> None:
super().__init__()
# Load config from the workspace root (parent of SYS)
self.config_data = load_config()
self.current_category = "globals"
self.editing_item_type = None # 'store' or 'provider'
self.editing_item_name = None
self._button_id_map = {}
self._provider_button_map: Dict[str, tuple[str, str]] = {}
self._input_id_map = {}
self._matrix_status: Optional[Static] = None
self._matrix_test_running = False
self._provider_status: Optional[Static] = None
self._provider_action_running = False
self._editor_snapshot: Optional[Dict[str, Any]] = None
# Inline matrix rooms controls
self._matrix_inline_list: Optional[ListView] = None
self._matrix_inline_checkbox_map: Dict[str, str] = {}
# Path to the database file used by this process (for diagnostics)
self._db_path = str(db.db_path)
def _capture_editor_snapshot(self) -> None:
self._editor_snapshot = deepcopy(self.config_data)
def _revert_unsaved_editor_changes(self) -> None:
if self._editor_snapshot is not None:
self.config_data = deepcopy(self._editor_snapshot)
self._editor_snapshot = None
def _editor_has_changes(self) -> bool:
if self._editor_snapshot is None:
return True
return self.config_data != self._editor_snapshot
def compose(self) -> ComposeResult:
with Container(id="config-container"):
yield Static("CONFIGURATION EDITOR", classes="section-title")
yield Static(f"DB: {self._db_path}", classes="config-label", id="config-db-path")
yield Static("Last saved: unknown", classes="config-label", id="config-last-save")
with Horizontal():
with Vertical(id="config-sidebar"):
yield Label("Categories", classes="config-label")
with ListView(id="category-list"):
yield ListItem(Label("Global Settings"), id="cat-globals")
yield ListItem(Label("Stores"), id="cat-stores")
yield ListItem(Label("Plugins"), id="cat-providers")
yield ListItem(Label("Tools"), id="cat-tools")
with Vertical(id="config-content"):
yield ScrollableContainer(id="fields-container")
with Horizontal(id="config-actions"):
yield Button("Save", variant="success", id="save-btn")
# Durable synchronous save: waits and verifies DB persisted critical keys
yield Button("Save (durable)", variant="primary", id="save-durable-btn")
yield Button("Add Store", variant="primary", id="add-store-btn")
yield Button("Add Plugin", variant="primary", id="add-provider-btn")
yield Button("Add Tool", variant="primary", id="add-tool-btn")
yield Button("Back", id="back-btn")
yield Button("Close", variant="error", id="cancel-btn")
def on_mount(self) -> None:
self.query_one("#add-store-btn", Button).display = False
self.query_one("#add-provider-btn", Button).display = False
try:
self.query_one("#add-tool-btn", Button).display = False
except Exception:
logger.exception("Failed to hide add-tool button in ConfigModal.on_mount")
# Update DB path and last-saved on mount
try:
self.query_one("#config-db-path", Static).update(self._db_path)
except Exception:
logger.exception("Failed to update config DB path display in ConfigModal.on_mount")
try:
mtime = None
try:
mtime = db.db_path.stat().st_mtime
mtime = __import__('datetime').datetime.utcfromtimestamp(mtime).isoformat() + "Z"
except Exception:
logger.exception("Failed to stat DB path for last-saved time")
mtime = None
self.query_one("#config-last-save", Static).update(f"Last saved: {mtime or '(unknown)'}")
except Exception:
logger.exception("Failed to update last-saved display in ConfigModal.on_mount")
self.refresh_view()
def refresh_view(self) -> None:
"""
Refresh the content area. We debounce this call and use a render_id
to avoid race conditions with Textual's async widget mounting.
"""
self._render_id = getattr(self, "_render_id", 0) + 1
if hasattr(self, "_refresh_timer"):
self._refresh_timer.stop()
self._refresh_timer = self.set_timer(0.02, self._actual_refresh)
def _actual_refresh(self) -> None:
try:
container = self.query_one("#fields-container", ScrollableContainer)
except Exception:
return
self._button_id_map.clear()
self._provider_button_map.clear()
self._input_id_map.clear()
# Clear existing
container.query("*").remove()
# Update visibility of buttons
try:
self.query_one("#add-store-btn", Button).display = (self.current_category == "stores" and self.editing_item_name is None)
self.query_one("#add-provider-btn", Button).display = (self.current_category == "providers" and self.editing_item_name is None)
self.query_one("#add-tool-btn", Button).display = (self.current_category == "tools" and self.editing_item_name is None)
self.query_one("#back-btn", Button).display = (self.editing_item_name is not None)
self.query_one("#save-btn", Button).display = (self.editing_item_name is not None or self.current_category == "globals")
except Exception:
logger.exception("Failed to update visibility of config modal action buttons")
render_id = self._render_id
def do_mount():
# If a new refresh was started, ignore this old mount request
if getattr(self, "_render_id", 0) != render_id:
return
# Final check that container is empty. remove() is async.
if container.children:
for child in list(container.children):
child.remove()
if self.editing_item_name:
self.render_item_editor(container)
elif self.current_category == "globals":
self.render_globals(container)
elif self.current_category == "stores":
self.render_stores(container)
elif self.current_category == "providers":
self.render_providers(container)
elif self.current_category == "tools":
self.render_tools(container)
self.call_after_refresh(do_mount)
def render_globals(self, container: ScrollableContainer) -> None:
container.mount(Label("General Configuration", classes="config-label"))
schema = get_global_schema()
schema_map = {f["key"].lower(): f for f in schema}
existing_keys_lower = set()
render_state = {"group": None, "mounted_any": False}
idx = 0
# Show fields defined in schema first
for field_def in schema:
key_lower = field_def["key"].lower()
existing_keys_lower.add(key_lower)
self._mount_schema_group(container, field_def, render_state)
# Find current value (case-insensitive)
current_val = None
found_key = field_def["key"]
for k, v in self.config_data.items():
if k.lower() == key_lower:
current_val = str(v)
found_key = k
break
if current_val is None:
current_val = str(field_def.get("default") or "")
inp_id = f"global-{idx}"
self._input_id_map[inp_id] = found_key
self._mount_schema_field(container, field_def, inp_id, current_val, allow_paste=True)
render_state["mounted_any"] = True
idx += 1
# Show any other top-level keys not in schema
for k, v in self.config_data.items():
if not isinstance(v, dict) and not k.startswith("_"):
if k.lower() in existing_keys_lower:
continue
inp_id = f"global-{idx}"
self._input_id_map[inp_id] = k
container.mount(Label(k))
row = Horizontal(classes="field-row")
container.mount(row)
row.mount(Input(value=str(v), id=inp_id, classes="config-input"))
row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn"))
idx += 1
def _mount_schema_group(self, container: ScrollableContainer, field_def: Dict[str, Any], state: Dict[str, Any]) -> None:
group_name = str(field_def.get("group") or "").strip()
if not group_name:
return
if state.get("group") == group_name:
return
if state.get("mounted_any"):
container.mount(Rule())
container.mount(Label(group_name, classes="config-group"))
state["group"] = group_name
def _normalized_select_options(self, choices: Any, current_val: Any) -> tuple[list[tuple[str, str]], str]:
select_options: list[tuple[str, str]] = []
choice_values: list[str] = []
for choice in list(choices or []):
if isinstance(choice, tuple) and len(choice) == 2:
label = str(choice[0])
value = str(choice[1])
else:
label = str(choice)
value = str(choice)
if value.lower() in ("true", "false"):
value = value.lower()
label = value
select_options.append((label, value))
choice_values.append(value)
normalized_current = str(current_val) if current_val is not None else ""
if normalized_current.lower() in ("true", "false"):
normalized_current = normalized_current.lower()
if normalized_current not in choice_values:
select_options.insert(0, (normalized_current, normalized_current))
return select_options, normalized_current
def _mount_schema_field(
self,
container: ScrollableContainer,
field_def: Dict[str, Any],
widget_id: str,
current_value: Any,
*,
allow_paste: bool,
) -> None:
label_text = str(field_def.get("label") or field_def.get("key") or widget_id)
if field_def.get("required"):
label_text += " *"
container.mount(Label(label_text))
choices = field_def.get("choices")
field_type = str(field_def.get("type") or "").strip().lower()
if choices:
select_options, normalized_current = self._normalized_select_options(choices, current_value)
container.mount(Select(select_options, value=normalized_current, id=widget_id))
return
if field_type in {"multiline", "textarea"}:
text_area = TextArea(str(current_value or ""), id=widget_id, classes="config-textarea")
placeholder = field_def.get("placeholder")
if placeholder:
try:
text_area.tooltip = str(placeholder)
except Exception:
pass
container.mount(text_area)
return
row = Horizontal(classes="field-row")
container.mount(row)
input_widget = Input(value=str(current_value or ""), id=widget_id, classes="config-input")
if field_def.get("secret") or field_type == "secret":
input_widget.password = True
if field_def.get("placeholder"):
input_widget.placeholder = str(field_def.get("placeholder") or "")
elif field_type == "path":
input_widget.placeholder = "Path"
row.mount(input_widget)
if allow_paste:
row.mount(Button("Paste", id=f"paste-{widget_id}", classes="paste-btn"))
def render_stores(self, container: ScrollableContainer) -> None:
container.mount(Label("Configured Stores", classes="config-label"))
stores = self.config_data.get("store", {})
if not stores:
container.mount(Static("No stores configured."))
else:
# stores is structured as: {type: {name_key: config}}
idx = 0
for stype, instances in stores.items():
if isinstance(instances, dict):
for name_key, conf in instances.items():
# Use the name field from the config if it exists, otherwise use the key
display_name = name_key
if isinstance(conf, dict):
display_name = (
conf.get("NAME")
or conf.get("name")
or conf.get("Name")
or name_key
)
edit_id = f"edit-store-{idx}"
del_id = f"del-store-{idx}"
self._button_id_map[edit_id] = ("edit", f"store-{stype}", name_key)
self._button_id_map[del_id] = ("del", f"store-{stype}", name_key)
idx += 1
row = Horizontal(
Static(f"{display_name} ({stype})", classes="item-label"),
Button("Edit", id=edit_id),
Button("Delete", variant="error", id=del_id),
classes="item-row"
)
container.mount(row)
def render_providers(self, container: ScrollableContainer) -> None:
container.mount(Label("Configured Plugins", classes="config-label"))
providers = self.config_data.get("provider", {})
if not providers:
container.mount(Static("No plugins configured."))
else:
for i, (name, _) in enumerate(providers.items()):
edit_id = f"edit-provider-{i}"
del_id = f"del-provider-{i}"
self._button_id_map[edit_id] = ("edit", "provider", name)
self._button_id_map[del_id] = ("del", "provider", name)
row = Horizontal(
Static(name, classes="item-label"),
Button("Edit", id=edit_id),
Button("Delete", variant="error", id=del_id),
classes="item-row"
)
container.mount(row)
def render_tools(self, container: ScrollableContainer) -> None:
container.mount(Label("Configured Tools", classes="config-label"))
tools = self.config_data.get("tool", {})
if not tools:
container.mount(Static("No tools configured."))
else:
for i, (name, _) in enumerate(tools.items()):
edit_id = f"edit-tool-{i}"
del_id = f"del-tool-{i}"
self._button_id_map[edit_id] = ("edit", "tool", name)
self._button_id_map[del_id] = ("del", "tool", name)
row = Horizontal(
Static(name, classes="item-label"),
Button("Edit", id=edit_id),
Button("Delete", variant="error", id=del_id),
classes="item-row"
)
container.mount(row)
def render_item_editor(self, container: ScrollableContainer) -> None:
item_type = str(self.editing_item_type or "")
item_name = str(self.editing_item_name or "")
item_schema_map = get_item_schema_map(item_type, item_name)
render_state = {"group": None, "mounted_any": False}
# Parse item_type for store-{stype} or just provider
if item_type.startswith("store-"):
stype = item_type.replace("store-", "")
container.mount(Label(f"Editing Store: {item_name} ({stype})", classes="config-label"))
section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {})
else:
container.mount(Label(f"Editing {item_type.capitalize()}: {item_name}", classes="config-label"))
section = self.config_data.get(item_type, {}).get(item_name, {})
# Use columns for better layout of inputs with paste buttons
container.mount(Label("Edit Settings"))
# render_item_editor will handle the inputs for us if we set these
# but wait, render_item_editor is called from refresh_view, not here.
# actually we don't need to do anything else here because refresh_view calls render_item_editor
# which now handles the paste buttons.
# Show all existing keys
existing_keys_upper = set()
idx = 0
for k, v in section.items():
if k.startswith("_"): continue
# Skip low-level keys that shouldn't be editable via the form UI
if (
item_type == "provider"
and isinstance(item_name, str)
and item_name.strip().lower() == "matrix"
and str(k or "").strip().lower() in ("rooms", "cached_rooms")
):
# These are managed by the inline UI and should not be edited directly.
continue
# Deduplicate keys case-insensitively (e.g. name vs NAME vs Name)
k_upper = k.upper()
if k_upper in existing_keys_upper:
continue
existing_keys_upper.add(k_upper)
# Determine display props from schema
label_text = k
is_secret = False
choices = None
schema = item_schema_map.get(k_upper)
if schema:
self._mount_schema_group(container, schema, render_state)
if schema.get("secret"):
is_secret = True
choices = schema.get("choices")
inp_id = f"item-{idx}"
self._input_id_map[inp_id] = k
if schema:
self._mount_schema_field(container, schema, inp_id, v, allow_paste=True)
else:
container.mount(Label(label_text))
row = Horizontal(classes="field-row")
container.mount(row)
inp = Input(value=str(v), id=inp_id, classes="config-input")
if is_secret:
inp.password = True
row.mount(inp)
render_state["mounted_any"] = True
idx += 1
# Add required/optional fields from schema that are missing
for k_upper, field_def in item_schema_map.items():
if k_upper not in existing_keys_upper:
existing_keys_upper.add(k_upper)
key = field_def["key"]
self._mount_schema_group(container, field_def, render_state)
default_val = str(field_def.get("default") or "")
inp_id = f"item-{idx}"
self._input_id_map[inp_id] = key
self._mount_schema_field(container, field_def, inp_id, default_val, allow_paste=True)
render_state["mounted_any"] = True
idx += 1
for required_key in get_required_config_keys(item_type, item_name):
if required_key.upper() in existing_keys_upper:
continue
existing_keys_upper.add(required_key.upper())
container.mount(Label(required_key))
inp_id = f"item-{idx}"
self._input_id_map[inp_id] = required_key
row = Horizontal(classes="field-row")
container.mount(row)
row.mount(Input(value="", id=inp_id, classes="config-input"))
row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn"))
idx += 1
if item_type == "provider" and isinstance(item_name, str):
provider = self._instantiate_provider_for_editor(item_name, self.config_data)
if provider is not None:
provider_actions = provider.config_actions() or []
if provider_actions:
container.mount(Rule())
container.mount(Label(f"{provider.label} helpers", classes="config-label"))
helper_text = str(provider.config_helper_text() or "Use these helpers to validate provider settings.").strip()
status = Static(helper_text, id="provider-status")
container.mount(status)
self._provider_status = status
row = Horizontal(classes="field-row")
container.mount(row)
for action in provider_actions:
action_id = str(action.get("id") or "").strip()
if not action_id:
continue
button_id = f"provider-action-{item_name}-{action_id}".replace(" ", "-")
self._provider_button_map[button_id] = (item_name, action_id)
row.mount(
Button(
str(action.get("label") or action_id.replace("_", " ").title()),
id=button_id,
variant=str(action.get("variant") or "default"),
)
)
if (
item_type == "provider"
and isinstance(item_name, str)
and item_name.strip().lower() == "matrix"
):
container.mount(Rule())
container.mount(Label("Matrix helpers", classes="config-label"))
status = Static("Set homeserver + token, then test before saving", id="matrix-status")
container.mount(status)
row = Horizontal(classes="field-row")
container.mount(row)
row.mount(Button("Test connection", id="matrix-test-btn"))
# Load rooms refreshes the inline list and caches the results (no popup)
row.mount(Button("Load rooms", variant="primary", id="matrix-load-btn"))
self._matrix_status = status
# Inline rooms list for selecting default rooms (populated after a successful test)
container.mount(Label("Default Rooms", classes="config-label", id="matrix-inline-label"))
# Start with empty list; it will be filled when test loads rooms
container.mount(ListView(id="matrix-rooms-inline"))
# Inline actions
row2 = Horizontal(classes="field-row")
container.mount(row2)
row2.mount(Button("Select All", id="matrix-inline-select-all"))
row2.mount(Button("Clear All", id="matrix-inline-clear"))
save_inline = Button("Save defaults", variant="success", id="matrix-inline-save")
save_inline.disabled = True
row2.mount(save_inline)
# Local bookkeeping maps
try:
self._matrix_inline_checkbox_map = {}
self._matrix_inline_list = self.query_one("#matrix-rooms-inline", ListView)
# Do NOT auto-render cached rooms here; only show explicitly saved defaults
try:
existing_ids = self._parse_matrix_rooms_value()
cached = self._get_cached_matrix_rooms()
rooms_to_render: List[Dict[str, Any]] = []
# Start with cached rooms (from last Load). These are shown
# in the inline Default Rooms list but are unselected unless
# they are in the saved defaults list.
if cached:
rooms_to_render.extend(cached)
# Ensure saved default room ids are present and will be selected
if existing_ids:
cached_ids = {str(r.get("room_id") or "").strip() for r in rooms_to_render if isinstance(r, dict)}
need_resolve = [rid for rid in existing_ids if rid not in cached_ids]
if need_resolve:
try:
resolved = self._resolve_matrix_rooms_by_ids(need_resolve)
if resolved:
rooms_to_render.extend(resolved)
else:
rooms_to_render.extend([{"room_id": rid, "name": ""} for rid in need_resolve])
except Exception:
rooms_to_render.extend([{"room_id": rid, "name": ""} for rid in need_resolve])
# Deduplicate while preserving order
deduped: List[Dict[str, Any]] = []
seen_ids: set[str] = set()
for r in rooms_to_render:
try:
rid = str(r.get("room_id") or "").strip()
if not rid or rid in seen_ids:
continue
seen_ids.add(rid)
deduped.append(r)
except Exception:
logger.exception("Failed to process a matrix room entry while deduplicating")
continue
if self._matrix_inline_list is not None and deduped:
try:
self._render_matrix_rooms_inline(deduped)
except Exception:
logger.exception("Failed to render matrix inline rooms")
except Exception:
logger.exception("Failed to fetch or process matrix rooms for inline rendering")
except Exception:
self._matrix_inline_checkbox_map = {}
self._matrix_inline_list = None
def create_field(self, name: str, value: Any, id: str) -> Vertical:
# This method is now unused - we mount labels and inputs directly
v = Vertical(classes="config-field")
return v
def on_list_view_selected(self, event: ListView.Selected) -> None:
# Only respond to selections from the left-hand category list. Avoid
# resetting editor state when other ListViews (like the inline rooms
# list) trigger selection events.
if not event.item:
return
item_id = getattr(event.item, "id", None)
if item_id not in ("cat-globals", "cat-stores", "cat-providers", "cat-tools"):
return
if item_id == "cat-globals":
self.current_category = "globals"
elif item_id == "cat-stores":
self.current_category = "stores"
elif item_id == "cat-providers":
self.current_category = "providers"
elif item_id == "cat-tools":
self.current_category = "tools"
# Reset editor state and refresh view for the new category
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
def on_button_pressed(self, event: Button.Pressed) -> None:
bid = event.button.id
if not bid: return
if bid == "cancel-btn":
self._revert_unsaved_editor_changes()
self.dismiss()
elif bid == "back-btn":
self._revert_unsaved_editor_changes()
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
elif bid == "save-btn":
self._synchronize_inputs_to_config()
if not self.validate_current_editor():
return
if self.editing_item_name and not self._editor_has_changes():
self.notify("No changes to save", severity="warning", timeout=3)
return
try:
saved = self.save_all()
if saved == 0:
msg = f"Configuration saved (no rows changed) to {db.db_path.name}"
else:
msg = f"Configuration saved ({saved} change(s)) to {db.db_path.name}"
# Make the success notification visible a bit longer so it's not missed
self.notify(msg, timeout=5)
# Return to the main list view within the current category
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
self._editor_snapshot = None
except ConfigSaveConflict as exc:
# A concurrent on-disk change was detected; do not overwrite it.
self.notify(
"Save aborted: configuration changed on disk. The editor will refresh.",
severity="error",
timeout=10,
)
# Refresh our in-memory view from disk and drop the editor snapshot
try:
self.config_data = reload_config()
except Exception:
logger.exception("Failed to reload config after save conflict")
self._editor_snapshot = None
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
except Exception as exc:
try:
log(f"Configuration save failed: {exc}")
except Exception:
logger.exception("Failed to write save failure to logs")
self.notify(f"Save failed: {exc}", severity="error", timeout=10)
elif bid == "save-durable-btn":
# Perform a synchronous, verified save and notify status to the user.
self._synchronize_inputs_to_config()
if not self.validate_current_editor():
return
if self.editing_item_name and not self._editor_has_changes():
self.notify("No changes to save", severity="warning", timeout=3)
return
try:
from SYS.config import save_config_and_verify
saved = save_config_and_verify(self.config_data, retries=3, delay=0.1)
try:
self.config_data = reload_config()
except Exception:
logger.exception("Failed to reload config after durable save")
if saved == 0:
msg = f"Configuration saved (no rows changed) to {db.db_path.name}"
else:
msg = f"Configuration saved ({saved} change(s)) to {db.db_path.name} (verified)"
try:
self.notify(msg, timeout=6)
except Exception:
logger.exception("Failed to show notification message in ConfigModal")
# Return to the main list view within the current category
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
self._editor_snapshot = None
except Exception as exc:
try:
log(f"Durable configuration save failed: {exc}")
except Exception:
logger.exception("Failed to write durable save failure to logs")
self.notify(f"Durable save failed: {exc}", severity="error", timeout=10)
try:
log(f"Durable save failed: {exc}")
except Exception:
logger.exception("Failed to call log() for durable save error")
elif bid in self._button_id_map:
action, itype, name = self._button_id_map[bid]
if action == "edit":
self._capture_editor_snapshot()
self.editing_item_type = itype
self.editing_item_name = name
self.refresh_view()
elif action == "del":
removed = False
if itype.startswith("store-"):
stype = itype.replace("store-", "")
if "store" in self.config_data and stype in self.config_data["store"]:
if name in self.config_data["store"][stype]:
del self.config_data["store"][stype][name]
removed = True
elif itype == "provider":
if "provider" in self.config_data and name in self.config_data["provider"]:
del self.config_data["provider"][name]
removed = True
if str(name).strip().lower() == "alldebrid":
self._remove_alldebrid_store_entry()
elif itype == "tool":
if "tool" in self.config_data and name in self.config_data["tool"]:
del self.config_data["tool"][name]
removed = True
if removed:
try:
saved = self.save_all()
self.notify("Saving configuration...", timeout=3)
except Exception as exc:
try:
log(f"Configuration save failed while deleting config entry: {exc}")
except Exception:
logger.exception("Failed to write config delete save failure to logs")
self.notify(f"Save failed: {exc}", severity="error", timeout=10)
self.refresh_view()
elif bid in self._provider_button_map:
provider_name, action_id = self._provider_button_map[bid]
self._request_provider_action(provider_name, action_id)
elif bid == "add-store-btn":
options = get_configurable_store_types()
self.app.push_screen(SelectionModal("Select Store Type", options), callback=self.on_store_type_selected)
elif bid == "add-provider-btn":
options = get_configurable_provider_types()
self.app.push_screen(SelectionModal("Select Plugin Type", options), callback=self.on_provider_type_selected)
elif bid == "add-tool-btn":
options = get_configurable_tool_types() or ["ytdlp"]
if options:
options.sort()
self.app.push_screen(SelectionModal("Select Tool", options), callback=self.on_tool_type_selected)
elif bid == "matrix-test-btn":
self._request_matrix_test()
elif bid == "matrix-load-btn":
# Refresh the inline rooms list and cache the results (no popup)
self._request_matrix_load()
elif bid == "matrix-inline-select-all":
for checkbox_id in list(self._matrix_inline_checkbox_map.keys()):
try:
cb = self.query_one(f"#{checkbox_id}", Checkbox)
cb.value = True
except Exception:
logger.exception("Failed to set matrix inline checkbox to True for '%s'", checkbox_id)
try:
self.query_one("#matrix-inline-save", Button).disabled = False
except Exception:
logger.exception("Failed to enable matrix inline save button")
elif bid == "matrix-inline-clear":
for checkbox_id in list(self._matrix_inline_checkbox_map.keys()):
try:
cb = self.query_one(f"#{checkbox_id}", Checkbox)
cb.value = False
except Exception:
logger.exception("Failed to set matrix inline checkbox to False for '%s'", checkbox_id)
try:
self.query_one("#matrix-inline-save", Button).disabled = True
except Exception:
logger.exception("Failed to disable matrix inline save button")
elif bid == "matrix-inline-save":
selected: List[str] = []
for checkbox_id, room_id in self._matrix_inline_checkbox_map.items():
try:
cb = self.query_one(f"#{checkbox_id}", Checkbox)
if cb.value and room_id:
selected.append(room_id)
except Exception:
logger.exception("Failed to read matrix inline checkbox '%s'", checkbox_id)
if not selected:
if self._matrix_status:
self._matrix_status.update("No default rooms were saved.")
return
matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {})
matrix_block["rooms"] = ", ".join(selected)
changed = count_changed_entries(self.config_data)
try:
entries = save_config(self.config_data)
except Exception as exc:
try:
log(f"Saving Matrix default rooms failed: {exc}")
except Exception:
logger.exception("Failed to write Matrix room save failure to logs")
if self._matrix_status:
self._matrix_status.update(f"Saving default rooms failed: {exc}")
return
self.config_data = reload_config()
if self._matrix_status:
status = f"Saved {len(selected)} default room(s) ({changed} change(s)) to {db.db_path.name}."
self._matrix_status.update(status)
try:
self.query_one("#matrix-inline-save", Button).disabled = True
except Exception:
logger.exception("Failed to disable matrix inline save button")
self.refresh_view()
async def focus_and_paste(self, inp: Input) -> None:
if hasattr(self.app, "paste_from_clipboard"):
text = await self.app.paste_from_clipboard()
if text:
# Replace selection or append
inp.value = str(inp.value) + text
inp.focus()
self.notify("Pasted from clipboard")
else:
self.notify("Clipboard not supported in this terminal", severity="warning")
async def action_paste(self) -> None:
focused = self.focused
if isinstance(focused, Input):
await self.focus_and_paste(focused)
async def action_copy(self) -> None:
focused = self.focused
if isinstance(focused, Input) and focused.value:
if hasattr(self.app, "copy_to_clipboard"):
self.app.copy_to_clipboard(str(focused.value))
self.notify("Copied to clipboard")
else:
self.notify("Clipboard not supported in this terminal", severity="warning")
def _instantiate_provider_for_editor(self, provider_name: str, config_data: Optional[Dict[str, Any]] = None) -> Optional[Any]:
try:
provider_class = get_plugin_class(provider_name)
except Exception:
provider_class = None
if provider_class is None:
return None
try:
return provider_class(config_data or self.config_data)
except Exception:
logger.exception("Failed to instantiate provider '%s' for config helper", provider_name)
return None
def _request_provider_action(self, provider_name: str, action_id: str) -> None:
if self._provider_action_running:
return
self._synchronize_inputs_to_config()
self._provider_action_running = True
if self._provider_status is not None:
self._provider_status.update(f"Running {action_id.replace('_', ' ')}")
self._provider_action_background(provider_name, action_id, deepcopy(self.config_data))
@work(thread=True)
def _provider_action_background(self, provider_name: str, action_id: str, config_snapshot: Dict[str, Any]) -> None:
try:
provider = self._instantiate_provider_for_editor(provider_name, config_snapshot)
if provider is None:
raise RuntimeError(f"Provider '{provider_name}' is unavailable")
result = provider.run_config_action(action_id)
if not isinstance(result, dict):
result = {"ok": False, "message": f"Provider '{provider_name}' returned an invalid config action result."}
except Exception as exc:
result = {"ok": False, "message": str(exc) or f"Provider action '{action_id}' failed."}
try:
self.app.call_from_thread(self._provider_action_complete, provider_name, action_id, result)
except Exception:
self._provider_action_complete(provider_name, action_id, result)
def _provider_action_complete(self, provider_name: str, action_id: str, result: Dict[str, Any]) -> None:
self._provider_action_running = False
ok = bool(result.get("ok"))
message = str(result.get("message") or f"Provider action '{action_id}' finished.")
updates = result.get("config_updates")
if ok and isinstance(updates, dict):
provider_block = self.config_data.setdefault("provider", {}).setdefault(provider_name, {})
if isinstance(provider_block, dict):
provider_block.update(updates)
message = f"{message}"
try:
self.refresh_view()
except Exception:
logger.exception("Failed to refresh config view after provider action")
if self._provider_status is not None:
self._provider_status.update(message)
try:
self.notify(message, severity="error" if not ok else "information", timeout=8)
except Exception:
logger.exception("Failed to notify provider action result for %s/%s", provider_name, action_id)
# Backup/restore helpers removed: forensics/audit mode disabled and restore UI removed.
def on_store_type_selected(self, stype: str) -> None:
if not stype:
return
self._capture_editor_snapshot()
existing_names: set[str] = set()
store_block = self.config_data.get("store")
if isinstance(store_block, dict):
st_entries = store_block.get(stype)
if isinstance(st_entries, dict):
existing_names = {str(name) for name in st_entries.keys() if name}
base_name = f"new_{stype}"
new_name = base_name
suffix = 1
while new_name in existing_names:
suffix += 1
new_name = f"{base_name}_{suffix}"
if "store" not in self.config_data:
self.config_data["store"] = {}
if stype not in self.config_data["store"]:
self.config_data["store"][stype] = {}
# Default config for the new store
new_config = build_default_store_config(stype, new_name)
self.config_data["store"][stype][new_name] = new_config
self.editing_item_type = f"store-{stype}"
self.editing_item_name = new_name
self.refresh_view()
def on_provider_type_selected(self, ptype: str) -> None:
if not ptype: return
self._capture_editor_snapshot()
if "provider" not in self.config_data:
self.config_data["provider"] = {}
# For providers, they are usually top-level entries in 'provider' dict
if ptype not in self.config_data["provider"]:
self.config_data["provider"][ptype] = build_default_provider_config(ptype)
self.editing_item_type = "provider"
self.editing_item_name = ptype
self.refresh_view()
def on_tool_type_selected(self, tname: str) -> None:
if not tname:
return
self._capture_editor_snapshot()
if "tool" not in self.config_data:
self.config_data["tool"] = {}
if tname not in self.config_data["tool"]:
self.config_data["tool"][tname] = build_default_tool_config(tname)
self.editing_item_type = "tool"
self.editing_item_name = tname
self.refresh_view()
def _update_config_value(self, widget_id: str, value: Any) -> None:
if widget_id not in self._input_id_map:
return
key = self._input_id_map[widget_id]
raw_value = value
is_blank_string = isinstance(raw_value, str) and not raw_value.strip()
existing_value: Any = None
item_type = str(self.editing_item_type or "")
item_name = str(self.editing_item_name or "")
if widget_id.startswith("global-"):
existing_value = self.config_data.get(key)
elif widget_id.startswith("item-") and item_name:
if item_type.startswith("store-"):
stype = item_type.replace("store-", "")
store_block = self.config_data.get("store")
if isinstance(store_block, dict):
type_block = store_block.get(stype)
if isinstance(type_block, dict):
section = type_block.get(item_name)
if isinstance(section, dict):
existing_value = section.get(key)
else:
section_block = self.config_data.get(item_type)
if isinstance(section_block, dict):
section = section_block.get(item_name)
if isinstance(section, dict):
existing_value = section.get(key)
if is_blank_string and existing_value is None:
return
# Try to preserve boolean/integer types
processed_value = coerce_config_value(raw_value, existing_value)
if widget_id.startswith("global-"):
self.config_data[key] = processed_value
elif widget_id.startswith("item-") and item_name:
if item_type.startswith("store-"):
stype = item_type.replace("store-", "")
if "store" not in self.config_data:
self.config_data["store"] = {}
if stype not in self.config_data["store"]:
self.config_data["store"][stype] = {}
if item_name not in self.config_data["store"][stype]:
self.config_data["store"][stype][item_name] = {}
# Special case: Renaming the store via the NAME field
if key.upper() == "NAME" and processed_value and str(processed_value) != item_name:
new_name = str(processed_value)
self.config_data["store"][stype][new_name] = self.config_data["store"][stype].pop(item_name)
self.editing_item_name = new_name
item_name = new_name
self.config_data["store"][stype][item_name][key] = processed_value
else:
if item_type not in self.config_data:
self.config_data[item_type] = {}
if item_name not in self.config_data[item_type]:
self.config_data[item_type][item_name] = {}
self.config_data[item_type][item_name][key] = processed_value
def _synchronize_inputs_to_config(self) -> None:
"""Capture current input/select values before saving."""
widgets = list(self.query(Input)) + list(self.query(Select)) + list(self.query(TextArea))
for widget in widgets:
widget_id = widget.id
if not widget_id or widget_id not in self._input_id_map:
continue
if isinstance(widget, Select):
if widget.value == Select.BLANK:
continue
value = widget.value
elif isinstance(widget, TextArea):
value = widget.text
else:
value = widget.value
self._update_config_value(widget_id, value)
def _remove_alldebrid_store_entry(self) -> bool:
"""Remove the mirrored AllDebrid store entry that would recreate the provider."""
store_block = self.config_data.get("store")
if not isinstance(store_block, dict):
return False
debrid = store_block.get("debrid")
if not isinstance(debrid, dict):
return False
removed = False
for key in list(debrid.keys()):
if str(key or "").strip().lower() == "all-debrid":
debrid.pop(key, None)
removed = True
if not debrid:
store_block.pop("debrid", None)
if not store_block:
self.config_data.pop("store", None)
return removed
def _get_matrix_provider_block(self) -> Dict[str, Any]:
providers = self.config_data.get("provider")
if not isinstance(providers, dict):
return {}
block = providers.get("matrix")
return block if isinstance(block, dict) else {}
def _parse_matrix_rooms_value(self) -> List[str]:
block = self._get_matrix_provider_block()
raw = block.get("rooms")
if isinstance(raw, (list, tuple, set)):
return [str(item).strip() for item in raw if str(item).strip()]
text = str(raw or "").strip()
if not text:
return []
return [segment for segment in re.split(r"[\s,]+", text) if segment]
def _request_matrix_test(self) -> None:
if self._matrix_test_running:
return
self._synchronize_inputs_to_config()
# Quick client-side pre-check before attempting to save/test to provide
# immediate guidance when required fields are missing.
try:
matrix_block = self.config_data.get("provider", {}).get("matrix", {})
hs = matrix_block.get("homeserver")
token = matrix_block.get("access_token")
if not hs or not token:
if self._matrix_status:
self._matrix_status.update("Matrix test skipped: please set both 'homeserver' and 'access_token' before testing.")
return
except Exception:
logger.exception("Failed to check matrix configuration before testing")
if self._matrix_status:
self._matrix_status.update("Saving configuration before testing…")
changed = count_changed_entries(self.config_data)
try:
entries = save_config(self.config_data)
except Exception as exc:
try:
log(f"Saving configuration before Matrix test failed: {exc}")
except Exception:
logger.exception("Failed to write Matrix test pre-save failure to logs")
if self._matrix_status:
self._matrix_status.update(f"Saving configuration failed: {exc}")
self._matrix_test_running = False
return
self.config_data = reload_config()
if self._matrix_status:
self._matrix_status.update(f"Saved configuration ({changed} change(s)) to {db.db_path.name}. Testing Matrix connection…")
self._matrix_test_running = True
self._matrix_test_background()
@work(thread=True)
def _matrix_test_background(self) -> None:
try:
provider = get_plugin("matrix", self.config_data)
if provider is None:
raise RuntimeError("Matrix plugin unavailable")
rooms = provider.list_rooms()
self.app.call_from_thread(self._matrix_test_result, True, rooms, None)
except Exception as exc:
# Log full traceback for diagnostics but present a concise, actionable
# message to the user in the UI.
tb = traceback.format_exc()
try:
debug(f"[matrix] Test connection failed: {exc}\n{tb}")
except Exception:
logger.exception("Failed to debug matrix test failure")
msg = str(exc) or "Matrix test failed"
m_lower = msg.lower()
if "auth" in m_lower or "authentication" in m_lower:
msg = msg + ". Please verify your access token and try again."
elif "homeserver" in m_lower or "missing" in m_lower:
msg = msg + ". Check your homeserver URL (include https://)."
else:
msg = msg + " (see logs for details)"
self.app.call_from_thread(self._matrix_test_result, False, [], msg)
def _get_cached_matrix_rooms(self) -> List[Dict[str, Any]]:
"""Return cached rooms stored in the provider config (normalized).
The config value can be a list/dict, a JSON string, or a Python literal
string (repr). This method normalizes the input and returns a list of
dicts containing 'room_id' and 'name'. Malformed inputs are ignored.
"""
try:
block = self._get_matrix_provider_block()
raw = block.get("cached_rooms")
if not raw:
return []
# If it's already a list or tuple, normalize each element
if isinstance(raw, (list, tuple)):
return self._normalize_cached_raw(list(raw))
# If it's a dict, wrap and normalize
if isinstance(raw, dict):
return self._normalize_cached_raw([raw])
# If it's a string, try JSON -> ast.literal_eval -> regex ID extraction
if isinstance(raw, str):
s = str(raw).strip()
if not s:
return []
# Try JSON first (strict)
try:
import json
parsed = json.loads(s)
if isinstance(parsed, (list, tuple, dict)):
return self._normalize_cached_raw(parsed if isinstance(parsed, (list, tuple)) else [parsed])
except Exception:
logger.exception("Failed to parse cached_rooms JSON for provider matrix")
# Try Python literal eval (accepts single quotes, repr-style lists)
try:
import ast
parsed = ast.literal_eval(s)
if isinstance(parsed, (list, tuple, dict)):
return self._normalize_cached_raw(parsed if isinstance(parsed, (list, tuple)) else [parsed])
except Exception:
logger.exception("Failed to parse cached_rooms as Python literal for provider matrix")
# Try to extract dict-like pairs for room_id/name when the string looks like
# a Python repr or partial dict fragment (e.g., "'room_id': '!r1', 'name': 'Room'"
try:
import re
pair_pat = re.compile(r"[\"']room_id[\"']\s*:\s*[\"'](?P<id>[^\"']+)[\"']\s*,\s*[\"']name[\"']\s*:\s*[\"'](?P<name>[^\"']+)[\"']")
pairs = [m.groupdict() for m in pair_pat.finditer(s)]
if pairs:
out = []
for p in pairs:
rid = str(p.get("id") or "").strip()
name = str(p.get("name") or "").strip()
if rid:
out.append({"room_id": rid, "name": name})
if out:
return out
# As a last resort, extract candidate room ids via regex (look for leading '!')
ids = re.findall(r"![-A-Za-z0-9._=]+(?::[-A-Za-z0-9._=]+)?", s)
if ids:
return [{"room_id": rid, "name": ""} for rid in ids]
except Exception:
logger.exception("Failed to extract cached_rooms pairs or ids for provider matrix")
return []
except Exception:
logger.exception("Failed to parse cached_rooms for provider matrix")
return []
def _normalize_cached_raw(self, parsed: List[Any]) -> List[Dict[str, Any]]:
out: List[Dict[str, Any]] = []
for it in parsed:
try:
if isinstance(it, dict):
rid = str(it.get("room_id") or "").strip()
name = str(it.get("name") or "").strip()
if rid:
out.append({"room_id": rid, "name": name})
elif isinstance(it, str):
s = str(it or "").strip()
if s:
out.append({"room_id": s, "name": ""})
except Exception:
logger.exception("Failed to normalize cached_rooms entry: %r", it)
continue
return out
def _request_matrix_load(self) -> None:
"""Save current config and request a background load of joined rooms.
This replaces the old "Choose default rooms" popup and instead refreshes
the inline default rooms list and caches the results to config.
"""
if self._matrix_test_running:
return
self._synchronize_inputs_to_config()
# Quick client-side pre-check for required fields
try:
matrix_block = self.config_data.get("provider", {}).get("matrix", {})
hs = matrix_block.get("homeserver")
token = matrix_block.get("access_token")
if not hs or not token:
if self._matrix_status:
self._matrix_status.update("Load skipped: please set both 'homeserver' and 'access_token' before loading rooms.")
return
except Exception:
logger.exception("Failed to check matrix configuration before load")
if self._matrix_status:
self._matrix_status.update("Saving configuration before loading rooms…")
changed = count_changed_entries(self.config_data)
try:
entries = save_config(self.config_data)
except Exception as exc:
try:
log(f"Saving configuration before Matrix room load failed: {exc}")
except Exception:
logger.exception("Failed to write Matrix load pre-save failure to logs")
if self._matrix_status:
self._matrix_status.update(f"Saving configuration failed: {exc}")
self._matrix_test_running = False
return
self.config_data = reload_config()
if self._matrix_status:
self._matrix_status.update(f"Saved configuration ({changed} change(s)) to {db.db_path.name}. Loading Matrix rooms…")
self._matrix_test_running = True
self._matrix_load_background()
@work(thread=True)
def _matrix_load_background(self) -> None:
try:
provider = get_plugin("matrix", self.config_data)
if provider is None:
raise RuntimeError("Matrix plugin unavailable")
rooms = provider.list_rooms()
self.app.call_from_thread(self._matrix_load_result, True, rooms, None)
except Exception as exc:
tb = traceback.format_exc()
try:
debug(f"[matrix] Load rooms failed: {exc}\n{tb}")
except Exception:
logger.exception("Failed to debug matrix load failure")
msg = str(exc) or "Matrix load failed"
if "auth" in msg.lower():
msg = msg + ". Please verify your access token and try again."
self.app.call_from_thread(self._matrix_load_result, False, [], msg)
def _matrix_load_result(self, success: bool, rooms: List[Dict[str, Any]], error: Optional[str]) -> None:
# Called on the main thread via call_from_thread
self._matrix_test_running = False
if not success:
full_msg = f"Matrix load failed: {error or '(error)'}"
if self._matrix_status:
self._matrix_status.update(full_msg)
try:
self.notify(full_msg, severity="error", timeout=8)
except Exception:
logger.exception("Failed to show Matrix load failure notification")
return
# Populate inline list
try:
self._render_matrix_rooms_inline(rooms)
except Exception:
logger.exception("Failed to render inline matrix rooms")
# Persist cached rooms so they are available on next editor open
try:
matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {})
matrix_block["cached_rooms"] = rooms
# Schedule a background save of the config (non-blocking)
try:
self.save_all()
except Exception:
# Fallback to direct save when save_all is unavailable (tests)
try:
save_config(self.config_data)
except Exception:
logger.exception("Failed to persist cached matrix rooms via save_config() fallback")
if self._matrix_status:
self._matrix_status.update(f"Loaded and cached {len(rooms)} room(s).")
try:
self.notify(f"Loaded {len(rooms)} rooms and cached the results", timeout=5)
except Exception:
logger.exception("Failed to notify loaded-and-cached message for Matrix rooms")
except Exception:
logger.exception("Failed to cache Matrix rooms after load")
def _open_matrix_room_picker(
self,
*,
prefetched_rooms: Optional[List[Dict[str, Any]]] = None,
) -> None:
existing = self._parse_matrix_rooms_value()
self.app.push_screen(
MatrixRoomPicker(
self.config_data,
existing=existing,
rooms=prefetched_rooms,
),
callback=self.on_matrix_rooms_selected,
)
def _render_matrix_rooms_inline(self, rooms: List[Dict[str, Any]]) -> None:
"""
Populate the inline matrix rooms ListView with checkboxes based on the
list of rooms returned from a successful test. If the inline ListView
is not present in the current editor view, fall back to opening the
MatrixRoomPicker popup with the results.
"""
try:
inline_list = self._matrix_inline_list or self.query_one("#matrix-rooms-inline", ListView)
except Exception:
inline_list = None
if inline_list is None:
# Inline view isn't available in this context; cache the rooms and persist
try:
matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {})
matrix_block["cached_rooms"] = rooms
try:
self.save_all()
except Exception:
try:
save_config(self.config_data)
except Exception:
logger.exception("Failed to persist cached matrix rooms via save_config() fallback")
if self._matrix_status:
self._matrix_status.update(f"Loaded {len(rooms)} rooms (cached)")
try:
self.notify(f"Loaded {len(rooms)} rooms and cached the results", timeout=5)
except Exception:
logger.exception("Failed to notify loaded-and-cached message for Matrix rooms")
except Exception:
logger.exception("Failed to cache Matrix rooms when inline view unavailable")
return
# Clear current entries
for child in list(inline_list.children):
child.remove()
self._matrix_inline_checkbox_map.clear()
self._matrix_inline_list = inline_list
# Determine existing selection from current config (so saved defaults are pre-selected)
existing = set(self._parse_matrix_rooms_value())
if not rooms:
if self._matrix_status:
self._matrix_status.update("Matrix returned no rooms.")
try:
save_btn = self.query_one("#matrix-inline-save", Button)
save_btn.disabled = True
except Exception:
logger.exception("Failed to disable matrix inline save button when no rooms returned")
return
any_selected = False
# Filter and normalize rooms to avoid malformed cache entries or split-word artifacts.
normalized: List[Dict[str, str]] = []
for r in rooms:
try:
rid = str(r.get("room_id") or "").strip()
name = str(r.get("name") or "").strip()
except Exception:
continue
# Ignore obviously malformed tokens coming from bad caching/parsing
le_rid = rid.lower()
le_name = name.lower()
if "room_id" in le_rid or "room_id" in le_name:
continue
# Require a valid room id (Matrix ids usually start with '!' and often contain ':')
if not rid or (not rid.startswith("!") and ":" not in rid):
# Skip entries without a sensible ID (we rely on IDs for saving)
continue
normalized.append({"room_id": rid, "name": name})
for idx, room in enumerate(normalized):
room_id = room.get("room_id") or ""
name = room.get("name") or ""
checkbox_id = f"matrix-inline-room-{idx}"
label_text = name or room_id or "Matrix Room"
checked = bool(room_id and room_id in existing)
if checked:
any_selected = True
from textual.widgets import Checkbox as _Checkbox # local import to avoid top-level change
checkbox = _Checkbox(label_text, id=checkbox_id, value=checked)
self._matrix_inline_checkbox_map[checkbox_id] = room_id
inline_list.mount(ListItem(checkbox, classes="matrix-room-row"))
if self._matrix_status:
self._matrix_status.update("Loaded rooms. Select one or more and save.")
try:
save_btn = self.query_one("#matrix-inline-save", Button)
save_btn.disabled = not any_selected
except Exception:
logger.exception("Failed to set matrix inline save button disabled state")
def _resolve_matrix_rooms_by_ids(self, ids: Iterable[str]) -> List[Dict[str, Any]]:
"""
Resolve room display names for a list of room IDs using the Matrix provider.
Returns a list of dictionaries with keys 'room_id' and 'name' on success, or an
empty list on failure.
"""
try:
ids_list = [str(i).strip() for i in ids if str(i).strip()]
except Exception:
return []
if not ids_list:
return []
# Only attempt network resolution if homeserver + token are present
block = self._get_matrix_provider_block()
hs = block.get("homeserver")
token = block.get("access_token")
if not hs or not token:
return []
try:
provider = get_plugin("matrix", self.config_data)
if provider is None:
return []
rooms = provider.list_rooms(room_ids=ids_list)
return rooms or []
except Exception as exc:
try:
debug(f"[config] failed to resolve matrix room names: {exc}")
except Exception:
logger.exception("Failed to debug matrix name resolution")
return []
def on_matrix_rooms_selected(self, result: Any = None) -> None:
if not isinstance(result, list):
if self._matrix_status:
self._matrix_status.update("Room selection cancelled.")
return
cleaned: List[str] = []
for item in result:
candidate = str(item or "").strip()
if candidate and candidate not in cleaned:
cleaned.append(candidate)
if not cleaned:
if self._matrix_status:
self._matrix_status.update("No default rooms were saved.")
return
matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {})
matrix_block["rooms"] = ", ".join(cleaned)
changed = count_changed_entries(self.config_data)
try:
entries = save_config(self.config_data)
except Exception as exc:
if self._matrix_status:
self._matrix_status.update(f"Saving default rooms failed: {exc}")
return
self.config_data = reload_config()
if self._matrix_status:
status = f"Saved {len(cleaned)} default room(s) ({changed} change(s)) to {db.db_path.name}."
self._matrix_status.update(status)
self.refresh_view()
@on(Input.Changed)
@on(Checkbox.Changed)
def on_checkbox_changed(self, event: Checkbox.Changed) -> None:
# Only respond to inline matrix checkboxes
try:
cbid = event.checkbox.id
except Exception:
cbid = None
if not cbid or cbid not in self._matrix_inline_checkbox_map:
return
any_selected = False
for checkbox_id in self._matrix_inline_checkbox_map.keys():
try:
cb = self.query_one(f"#{checkbox_id}", Checkbox)
if cb.value:
any_selected = True
break
except Exception:
continue
try:
self.query_one("#matrix-inline-save", Button).disabled = not any_selected
except Exception:
logger.exception("Failed to update matrix inline save button")
def on_input_changed(self, event: Input.Changed) -> None:
if event.input.id:
self._update_config_value(event.input.id, event.value)
@on(TextArea.Changed)
def on_text_area_changed(self, event: TextArea.Changed) -> None:
if event.text_area.id:
self._update_config_value(event.text_area.id, event.text_area.text)
@on(Select.Changed)
def on_select_changed(self, event: Select.Changed) -> None:
if event.select.id:
# Select value can be the 'Select.BLANK' sentinel
if event.value != Select.BLANK:
self._update_config_value(event.select.id, event.value)
def save_all(self) -> int:
self._synchronize_inputs_to_config()
# Compute change count prior to persisting so callers can report the number of
# actual changes rather than the total number of rows written to the DB.
changed = count_changed_entries(self.config_data)
# Snapshot config for background save
snapshot = deepcopy(self.config_data)
# Schedule the save to run on a background worker so the UI doesn't block.
try:
# Prefer Textual's worker when running inside the app
self._save_background(snapshot, changed)
except Exception:
# Fallback: start a plain thread that runs the underlying task body
import threading
func = getattr(self._save_background, "__wrapped__", None)
if func:
thread = threading.Thread(target=lambda: func(self, snapshot, changed), daemon=True)
else:
thread = threading.Thread(target=lambda: self._save_background(snapshot, changed), daemon=True)
thread.start()
# Ensure DB path indicator is current and show saving status
self._db_path = str(db.db_path)
try:
self.query_one("#config-db-path", Static).update(self._db_path)
except Exception:
logger.exception("Failed to update config db path label")
try:
self.query_one("#config-last-save", Static).update("Last saved: (saving...)")
except Exception:
logger.exception("Failed to update config last-save label")
log(f"ConfigModal scheduled save (changed={changed})")
return changed
@work(thread=True)
def _save_background(self, cfg: Dict[str, Any], changed: int) -> None:
try:
# Use the verified save path which will check that crucial keys
# (like AllDebrid API keys) persisted to disk. This ensures the UI
# surface reports a failure immediately if post-save verification fails.
saved_entries = save_config_and_verify(cfg)
try:
appobj = self.app
except Exception:
appobj = None
if appobj and hasattr(appobj, 'call_from_thread'):
appobj.call_from_thread(self._on_save_complete, True, None, changed, saved_entries)
else:
# If no app (e.g., running under tests), call completion directly
self._on_save_complete(True, None, changed, saved_entries)
except ConfigSaveConflict as exc:
try:
appobj = self.app
except Exception:
appobj = None
if appobj and hasattr(appobj, 'call_from_thread'):
appobj.call_from_thread(self._on_save_complete, False, str(exc), changed, 0)
else:
self._on_save_complete(False, str(exc), changed, 0)
except Exception as exc:
# Bubble up verification/other save errors back to the UI so the
# user knows persistent storage failed.
try:
appobj = self.app
except Exception:
appobj = None
if appobj and hasattr(appobj, 'call_from_thread'):
appobj.call_from_thread(self._on_save_complete, False, str(exc), changed, 0)
else:
self._on_save_complete(False, str(exc), changed, 0)
except Exception as exc:
try:
appobj = self.app
except Exception:
appobj = None
if appobj and hasattr(appobj, 'call_from_thread'):
appobj.call_from_thread(self._on_save_complete, False, str(exc), changed, 0)
else:
self._on_save_complete(False, str(exc), changed, 0)
def _on_save_complete(self, success: bool, error: Optional[str], changed: int, saved_entries: int) -> None:
# Safely determine whether a Textual app context is available. Accessing
# `self.app` can raise when not running inside the TUI; handle that.
try:
appobj = self.app
except Exception:
appobj = None
if success:
try:
self.config_data = reload_config()
except Exception:
logger.exception("Failed to reload config after save completion")
# Update last-saved label with file timestamp for visibility
db_mtime = None
try:
db_mtime = db.db_path.stat().st_mtime
db_mtime = __import__('datetime').datetime.utcfromtimestamp(db_mtime).isoformat() + "Z"
except Exception:
db_mtime = None
if appobj:
try:
if changed == 0:
label_text = f"Last saved: (no changes)"
else:
label_text = f"Last saved: {changed} change(s) at {db_mtime or '(unknown)'}"
try:
self.query_one("#config-last-save", Static).update(label_text)
except Exception:
logger.exception("Failed to update last-save label with timestamp")
except Exception:
logger.exception("Failed to compute last-save label text")
try:
self.refresh_view()
except Exception:
logger.exception("Failed to refresh config editor view after save completion")
try:
self.notify(f"Configuration saved ({changed} change(s)) to {db.db_path.name}", timeout=5)
except Exception:
logger.exception("Failed to show configuration saved notification")
else:
# No TUI available; log instead of updating UI
log(f"Configuration saved ({changed} change(s)) to {db.db_path.name}")
log(f"ConfigModal saved {saved_entries} configuration entries (changed={changed})")
else:
# Save failed; notify via UI if available, otherwise log
if appobj:
try:
self.notify(f"Save failed: {error}", severity="error", timeout=10)
except Exception:
logger.exception("Failed to show save failed notification")
try:
self.config_data = reload_config()
except Exception:
logger.exception("Failed to reload config after save failure")
try:
self.refresh_view()
except Exception:
logger.exception("Failed to refresh view after save failure")
else:
log(f"Save failed: {error}")
def validate_current_editor(self) -> bool:
"""Ensure all required fields for the current item are filled."""
if not self.editing_item_name:
return True
item_type = str(self.editing_item_type or "")
item_name = str(self.editing_item_name or "")
section = {}
if item_type.startswith("store-"):
stype = item_type.replace("store-", "")
section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {})
elif item_type == "provider":
section = self.config_data.get("provider", {}).get(item_name, {})
elif item_type == "tool":
section = self.config_data.get("tool", {}).get(item_name, {})
# Check required keys
for rk in get_required_config_keys(item_type, item_name):
# Case-insensitive lookup for the required key in the current section
val = None
rk_upper = rk.upper()
for k, v in section.items():
if k.upper() == rk_upper:
val = v
break
if not val or not str(val).strip():
self.notify(f"Required field '{rk}' cannot be blank", severity="error")
return False
return True