Files
Medios-Macina/TUI/modalscreen/config_modal.py

1920 lines
83 KiB
Python
Raw Normal View History

2026-01-26 02:29:56 -08:00
import re
2026-01-27 14:56:01 -08:00
from copy import deepcopy
2026-01-30 16:24:08 -08:00
from typing import Any, Dict, List, Optional, Iterable
import traceback
2026-01-26 02:29:56 -08:00
from textual import on, work
2026-01-11 00:39:17 -08:00
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, Vertical, ScrollableContainer
2026-01-26 02:29:56 -08:00
from textual.screen import ModalScreen
2026-01-30 16:24:08 -08:00
from textual.widgets import Static, Button, Input, Label, ListView, ListItem, Rule, Select, Checkbox
2026-01-30 10:47:47 -08:00
from pathlib import Path
2026-01-11 00:39:17 -08:00
2026-01-31 21:32:51 -08:00
from SYS.config import load_config, save_config, save_config_and_verify, reload_config, global_config, count_changed_entries, ConfigSaveConflict
2026-01-30 10:47:47 -08:00
from SYS.database import db
2026-01-30 16:24:08 -08:00
from SYS.logger import log, debug
2026-01-11 00:39:17 -08:00
from Store.registry import _discover_store_classes, _required_keys_for
2026-01-11 02:42:08 -08:00
from ProviderCore.registry import list_providers
2026-01-26 02:29:56 -08:00
from TUI.modalscreen.matrix_room_picker import MatrixRoomPicker
2026-01-11 02:42:08 -08:00
from TUI.modalscreen.selection_modal import SelectionModal
2026-01-31 19:00:04 -08:00
import logging
logger = logging.getLogger(__name__)
2026-01-11 00:39:17 -08:00
class ConfigModal(ModalScreen):
"""A modal for editing the configuration."""
2026-01-14 01:33:25 -08:00
BINDINGS = [
("ctrl+v", "paste", "Paste"),
("ctrl+c", "copy", "Copy"),
]
2026-01-11 00:39:17 -08:00
CSS = """
ConfigModal {
align: center middle;
background: $boost;
}
#config-container {
width: 90%;
height: 90%;
background: $panel;
border: thick $primary;
padding: 1;
}
.section-title {
background: $accent;
color: $text;
padding: 0 1;
margin-bottom: 1;
text-align: center;
text-style: bold;
height: 3;
content-align: center middle;
}
#config-sidebar {
width: 25%;
border-right: solid $surface;
}
#config-content {
width: 75%;
padding: 1;
}
.config-field {
margin-bottom: 1;
height: auto;
}
.config-label {
width: 100%;
text-style: bold;
color: $accent;
}
2026-01-14 01:33:25 -08:00
.field-row {
height: 5;
margin-bottom: 1;
align: left middle;
}
2026-01-11 00:39:17 -08:00
.config-input {
2026-01-14 01:33:25 -08:00
width: 1fr;
}
2026-01-11 00:39:17 -08:00
#config-actions {
height: 3;
align: right middle;
}
.item-row {
2026-01-11 03:56:09 -08:00
height: 5;
2026-01-11 00:39:17 -08:00
margin-bottom: 1;
padding: 0 1;
border: solid $surface;
}
.item-label {
width: 1fr;
2026-01-11 03:56:09 -08:00
height: 3;
2026-01-11 00:39:17 -08:00
content-align: left middle;
}
2026-01-11 03:47:25 -08:00
.item-row Button {
2026-01-11 03:56:09 -08:00
width: 16;
height: 3;
2026-01-11 03:47:25 -08:00
}
2026-01-11 00:39:17 -08:00
Button {
margin: 0 1;
}
2026-01-30 16:24:08 -08:00
/* Inline matrix rooms list sizing & style (larger, scrollable) */
#matrix-rooms-inline {
height: 16;
border: solid $surface;
padding: 1;
margin-bottom: 1;
}
.matrix-room-row {
border-bottom: solid $surface;
padding: 1 0;
align: left middle;
}
2026-01-11 00:39:17 -08:00
"""
def __init__(self) -> None:
    """Create the modal and seed its in-memory editing state."""
    super().__init__()

    # Working copy of the configuration as returned by load_config().
    self.config_data = load_config()

    # Navigation state: which category is listed and which item (if any)
    # is currently open in the editor.
    self.current_category = "globals"
    self.editing_item_type = None  # 'store' or 'provider'
    self.editing_item_name = None

    # Widget-id lookup tables, rebuilt on every render.
    self._button_id_map = {}
    self._input_id_map = {}

    # Matrix provider helper state.
    self._matrix_status: Optional[Static] = None
    self._matrix_test_running = False

    # Copy of config_data taken when an item editor is opened.
    self._editor_snapshot: Optional[Dict[str, Any]] = None

    # Inline matrix rooms controls.
    self._matrix_inline_list: Optional[ListView] = None
    self._matrix_inline_checkbox_map: Dict[str, str] = {}

    # Path to the database file used by this process (for diagnostics).
    self._db_path = str(db.db_path)
2026-01-27 14:56:01 -08:00
def _capture_editor_snapshot(self) -> None:
self._editor_snapshot = deepcopy(self.config_data)
def _revert_unsaved_editor_changes(self) -> None:
if self._editor_snapshot is not None:
self.config_data = deepcopy(self._editor_snapshot)
self._editor_snapshot = None
def _editor_has_changes(self) -> bool:
if self._editor_snapshot is None:
return True
return self.config_data != self._editor_snapshot
2026-01-11 00:39:17 -08:00
def compose(self) -> ComposeResult:
    """Build the modal layout: header, category sidebar, field area, action bar."""
    # (title, list-item id) for each entry of the category sidebar.
    category_entries = (
        ("Global Settings", "cat-globals"),
        ("Stores", "cat-stores"),
        ("Providers", "cat-providers"),
        ("Tools", "cat-tools"),
    )
    # (label, keyword arguments) for each action-bar button, in display order.
    # "Save (durable)" is the synchronous save that waits and verifies the DB
    # persisted critical keys.
    action_specs = (
        ("Save", {"variant": "success", "id": "save-btn"}),
        ("Save (durable)", {"variant": "primary", "id": "save-durable-btn"}),
        ("Add Store", {"variant": "primary", "id": "add-store-btn"}),
        ("Add Provider", {"variant": "primary", "id": "add-provider-btn"}),
        ("Add Tool", {"variant": "primary", "id": "add-tool-btn"}),
        ("Back", {"id": "back-btn"}),
        ("Close", {"variant": "error", "id": "cancel-btn"}),
    )

    with Container(id="config-container"):
        yield Static("CONFIGURATION EDITOR", classes="section-title")
        yield Static(f"DB: {self._db_path}", classes="config-label", id="config-db-path")
        yield Static("Last saved: unknown", classes="config-label", id="config-last-save")

        with Horizontal():
            with Vertical(id="config-sidebar"):
                yield Label("Categories", classes="config-label")
                with ListView(id="category-list"):
                    for title, entry_id in category_entries:
                        yield ListItem(Label(title), id=entry_id)
            with Vertical(id="config-content"):
                yield ScrollableContainer(id="fields-container")

        with Horizontal(id="config-actions"):
            for caption, options in action_specs:
                yield Button(caption, **options)
def on_mount(self) -> None:
    """Hide the "Add ..." buttons, fill in the header labels and render.

    Every step is best-effort: a failure is logged and the remaining steps
    (including the final ``refresh_view``) still run.
    """
    # Local import keeps this method self-contained; datetime is stdlib.
    from datetime import datetime, timezone

    # The "Add ..." buttons only apply to list views; _actual_refresh
    # re-enables the relevant one once a category is shown.
    for selector in ("#add-store-btn", "#add-provider-btn", "#add-tool-btn"):
        try:
            self.query_one(selector, Button).display = False
        except Exception:
            logger.exception("Failed to hide %s in ConfigModal.on_mount", selector)

    # Keep the "DB: " prefix used by compose() so the header stays labelled.
    try:
        self.query_one("#config-db-path", Static).update(f"DB: {self._db_path}")
    except Exception:
        logger.exception("Failed to update config DB path display in ConfigModal.on_mount")

    # Show the database file's modification time as the "last saved" time.
    last_saved = None
    try:
        modified = db.db_path.stat().st_mtime
        # datetime.utcfromtimestamp() is deprecated; build an aware UTC value
        # and drop tzinfo so the rendered text keeps the "<iso>Z" format.
        last_saved = (
            datetime.fromtimestamp(modified, tz=timezone.utc)
            .replace(tzinfo=None)
            .isoformat()
            + "Z"
        )
    except Exception:
        logger.exception("Failed to stat DB path for last-saved time")

    try:
        self.query_one("#config-last-save", Static).update(
            f"Last saved: {last_saved or '(unknown)'}"
        )
    except Exception:
        logger.exception("Failed to update last-saved display in ConfigModal.on_mount")

    self.refresh_view()
def refresh_view(self) -> None:
2026-01-22 02:45:08 -08:00
"""
Refresh the content area. We debounce this call and use a render_id
to avoid race conditions with Textual's async widget mounting.
"""
self._render_id = getattr(self, "_render_id", 0) + 1
if hasattr(self, "_refresh_timer"):
self._refresh_timer.stop()
self._refresh_timer = self.set_timer(0.02, self._actual_refresh)
def _actual_refresh(self) -> None:
    """Clear the field area, update button visibility and queue the re-render.

    The new widgets are mounted from a callback passed to
    ``call_after_refresh``; the callback is skipped if another refresh has
    been requested in the meantime.
    """
    try:
        container = self.query_one("#fields-container", ScrollableContainer)
    except Exception:
        return

    self._button_id_map.clear()
    self._input_id_map.clear()

    # Clear existing widgets.
    container.query("*").remove()

    # Update visibility of the action buttons; the first failing lookup
    # aborts the remaining updates, which is only logged.
    try:
        listing = self.editing_item_name is None
        category = self.current_category
        visibility = (
            ("#add-store-btn", category == "stores" and listing),
            ("#add-provider-btn", category == "providers" and listing),
            ("#add-tool-btn", category == "tools" and listing),
            ("#back-btn", not listing),
            ("#save-btn", not listing or category == "globals"),
        )
        for selector, shown in visibility:
            self.query_one(selector, Button).display = shown
    except Exception:
        logger.exception("Failed to update visibility of config modal action buttons")

    expected_render = self._render_id

    def do_mount():
        # A newer refresh was requested: drop this stale mount request.
        if getattr(self, "_render_id", 0) != expected_render:
            return

        # remove() is async, so make sure nothing is left behind.
        for leftover in list(container.children):
            leftover.remove()

        if self.editing_item_name:
            self.render_item_editor(container)
            return

        renderers = {
            "globals": self.render_globals,
            "stores": self.render_stores,
            "providers": self.render_providers,
            "tools": self.render_tools,
        }
        render = renderers.get(self.current_category)
        if render is not None:
            render(container)

    self.call_after_refresh(do_mount)
def render_globals(self, container: ScrollableContainer) -> None:
    """Mount an editable widget for every global (top-level) setting.

    Fields declared by ``global_config()`` come first, matched against
    ``config_data`` case-insensitively. They are followed by any other
    top-level, non-dict, non-underscore keys found in ``config_data``.
    Each widget id is recorded in ``_input_id_map`` with the config key it
    edits.

    Args:
        container: The scrollable area the widgets are mounted into.
    """
    bool_words = ("true", "false")

    container.mount(Label("General Configuration", classes="config-label"))

    # Schema fields indexed by lower-cased key.
    schema_map = {f["key"].lower(): f for f in global_config()}
    idx = 0

    # Show fields defined in the schema first.
    for key_lower, field_def in schema_map.items():
        label_text = field_def.get("label") or field_def["key"]
        choices = field_def.get("choices")

        # Find the current value (case-insensitive), keeping the key's
        # stored spelling so saving writes back to the same entry.
        found_key = field_def["key"]
        current_val = None
        for k, v in self.config_data.items():
            if k.lower() == key_lower:
                found_key, current_val = k, str(v)
                break
        if current_val is None:
            default = field_def.get("default")
            # A plain `default or ""` would turn False / 0 defaults into an
            # empty string; keep booleans and numbers as their text form.
            if isinstance(default, (bool, int, float)):
                current_val = str(default)
            else:
                current_val = str(default or "")

        container.mount(Label(label_text))
        inp_id = f"global-{idx}"
        self._input_id_map[inp_id] = found_key

        if choices:
            # Select takes (label, value) pairs. Boolean-like values are
            # lower-cased so 'True' and 'true' do not appear as two options.
            select_options = []
            choice_values = []
            for c in choices:
                if isinstance(c, tuple) and len(c) == 2:
                    option_label, option_value = str(c[0]), str(c[1])
                else:
                    option_label = option_value = str(c)
                if option_value.lower() in bool_words:
                    option_value = option_value.lower()
                    option_label = option_value
                select_options.append((option_label, option_value))
                choice_values.append(option_value)

            cur_val = current_val.lower() if current_val.lower() in bool_words else current_val
            # Keep a value that is not among the declared choices selectable.
            if cur_val not in choice_values:
                select_options.insert(0, (cur_val, cur_val))
            container.mount(Select(select_options, value=cur_val, id=inp_id))
        else:
            row = Horizontal(classes="field-row")
            container.mount(row)
            row.mount(Input(value=current_val, id=inp_id, classes="config-input"))
        idx += 1

    # Show any other top-level keys not covered by the schema.
    for k, v in self.config_data.items():
        if isinstance(v, dict) or k.startswith("_"):
            continue
        if k.lower() in schema_map:
            continue
        inp_id = f"global-{idx}"
        self._input_id_map[inp_id] = k
        container.mount(Label(k))
        row = Horizontal(classes="field-row")
        container.mount(row)
        row.mount(Input(value=str(v), id=inp_id, classes="config-input"))
        row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn"))
        idx += 1
2026-01-11 00:39:17 -08:00
def render_stores(self, container: ScrollableContainer) -> None:
    """Mount one row (label, Edit, Delete) per configured store instance.

    ``config_data["store"]`` is structured as ``{type: {name_key: config}}``.
    Button ids are registered in ``_button_id_map`` with the action, the
    ``store-<type>`` item type and the instance key.
    """
    container.mount(Label("Configured Stores", classes="config-label"))

    stores = self.config_data.get("store", {})
    if not stores:
        container.mount(Static("No stores configured."))
        return

    row_no = 0
    for stype, instances in stores.items():
        if not isinstance(instances, dict):
            continue
        for name_key, conf in instances.items():
            # Prefer a name stored inside the config over the mapping key.
            display_name = name_key
            if isinstance(conf, dict):
                candidates = (conf.get("NAME"), conf.get("name"), conf.get("Name"))
                display_name = next((c for c in candidates if c), name_key)

            edit_id = f"edit-store-{row_no}"
            del_id = f"del-store-{row_no}"
            item_type = f"store-{stype}"
            self._button_id_map[edit_id] = ("edit", item_type, name_key)
            self._button_id_map[del_id] = ("del", item_type, name_key)
            row_no += 1

            container.mount(
                Horizontal(
                    Static(f"{display_name} ({stype})", classes="item-label"),
                    Button("Edit", id=edit_id),
                    Button("Delete", variant="error", id=del_id),
                    classes="item-row",
                )
            )
def render_providers(self, container: ScrollableContainer) -> None:
    """Mount one row (label, Edit, Delete) per entry of ``config_data["provider"]``."""
    container.mount(Label("Configured Providers", classes="config-label"))

    providers = self.config_data.get("provider", {})
    if not providers:
        container.mount(Static("No providers configured."))
        return

    for position, name in enumerate(providers):
        edit_id = f"edit-provider-{position}"
        del_id = f"del-provider-{position}"
        # Map the generated ids back to (action, item type, item name).
        self._button_id_map[edit_id] = ("edit", "provider", name)
        self._button_id_map[del_id] = ("del", "provider", name)
        container.mount(
            Horizontal(
                Static(name, classes="item-label"),
                Button("Edit", id=edit_id),
                Button("Delete", variant="error", id=del_id),
                classes="item-row",
            )
        )
2026-01-30 12:04:37 -08:00
def render_tools(self, container: ScrollableContainer) -> None:
    """Mount one row (label, Edit, Delete) per entry of ``config_data["tool"]``."""
    container.mount(Label("Configured Tools", classes="config-label"))

    tools = self.config_data.get("tool", {})
    if not tools:
        container.mount(Static("No tools configured."))
        return

    for position, name in enumerate(tools):
        edit_id = f"edit-tool-{position}"
        del_id = f"del-tool-{position}"
        # Map the generated ids back to (action, item type, item name).
        self._button_id_map[edit_id] = ("edit", "tool", name)
        self._button_id_map[del_id] = ("del", "tool", name)
        container.mount(
            Horizontal(
                Static(name, classes="item-label"),
                Button("Edit", id=edit_id),
                Button("Delete", variant="error", id=del_id),
                classes="item-row",
            )
        )
2026-01-11 00:39:17 -08:00
def render_item_editor(self, container: ScrollableContainer) -> None:
    """Mount the edit form for the store, provider or tool being edited.

    Fields are mounted in this order:

    1. every key already present in the item's config section (keys that
       start with ``_`` are skipped, and keys are deduplicated
       case-insensitively);
    2. schema fields (``config_schema()``) that the section does not have;
    3. legacy required keys reported by the store/provider class;
    4. for the ``matrix`` provider, the connection-test and default-room
       controls.

    Each widget id is recorded in ``_input_id_map`` with the config key it
    edits.

    Args:
        container: The scrollable area the widgets are mounted into.
    """
    item_type = str(self.editing_item_type or "")
    item_name = str(self.editing_item_name or "")
    store_prefix = "store-"
    is_store = item_type.startswith(store_prefix)
    is_matrix = item_type == "provider" and item_name.strip().lower() == "matrix"

    # Schema field definitions indexed by upper-cased key.
    schema_map: Dict[str, Dict[str, Any]] = {}

    def absorb_schema(owner: Any) -> None:
        # Index owner.config_schema() by upper-cased key when it is callable.
        schema_fn = getattr(owner, "config_schema", None)
        if not callable(schema_fn):
            return
        for field_def in schema_fn():
            schema_key = field_def.get("key")
            if schema_key:
                schema_map[schema_key.upper()] = field_def

    store_cls = None
    provider_cls = None

    if is_store:
        # Strip only the leading prefix; str.replace() would also remove
        # any later "store-" occurrence inside the store type.
        stype = item_type[len(store_prefix):]
        container.mount(Label(f"Editing Store: {item_name} ({stype})", classes="config-label"))
        section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {})
        classes = _discover_store_classes()
        if stype in classes:
            store_cls = classes[stype]
            absorb_schema(store_cls)
    else:
        container.mount(Label(f"Editing {item_type.capitalize()}: {item_name}", classes="config-label"))
        section = self.config_data.get(item_type, {}).get(item_name, {})
        if item_type == "provider":
            from ProviderCore.registry import get_provider_class
            try:
                provider_cls = get_provider_class(item_name)
                if provider_cls:
                    absorb_schema(provider_cls)
            except Exception:
                logger.exception("Failed to retrieve provider config_schema")
        elif item_type == "tool":
            try:
                import importlib
                absorb_schema(importlib.import_module(f"tool.{item_name}"))
            except Exception:
                logger.exception("Failed to retrieve tool config_schema")

    container.mount(Label("Edit Settings"))

    existing_keys_upper = set()
    idx = 0

    # 1. Keys already present in the section.
    for k, v in section.items():
        if k.startswith("_"):
            continue
        # Matrix room lists are managed by the inline rooms UI below and
        # must not be edited as raw text.
        if is_matrix and str(k or "").strip().lower() in ("rooms", "cached_rooms"):
            continue
        # Deduplicate keys case-insensitively (e.g. name vs NAME vs Name).
        k_upper = k.upper()
        if k_upper in existing_keys_upper:
            continue
        existing_keys_upper.add(k_upper)

        schema = schema_map.get(k_upper) or {}
        label_text = schema.get("label") or k
        if schema.get("required"):
            label_text += " *"
        self._mount_item_editor_field(
            container,
            idx,
            k,
            label_text,
            str(v),
            choices=schema.get("choices"),
            secret=bool(schema.get("secret")),
        )
        idx += 1

    # 2. Required/optional schema fields the section does not have yet.
    for k_upper, field_def in schema_map.items():
        if k_upper in existing_keys_upper:
            continue
        existing_keys_upper.add(k_upper)
        key = field_def["key"]
        label_text = field_def.get("label") or key
        if field_def.get("required"):
            label_text += " *"
        self._mount_item_editor_field(
            container,
            idx,
            key,
            label_text,
            self._item_editor_default_text(field_def.get("default")),
            choices=field_def.get("choices"),
            secret=bool(field_def.get("secret")),
            placeholder=field_def.get("placeholder"),
            paste=True,
        )
        idx += 1

    # 3a. Legacy fallback: required keys reported for the store class.
    if store_cls is not None:
        for rk in _required_keys_for(store_cls):
            # Case-insensitive deduplication (path vs PATH).
            if rk.upper() in existing_keys_upper:
                continue
            existing_keys_upper.add(rk.upper())
            self._mount_item_editor_field(container, idx, rk, rk, "")
            idx += 1

    # 3b. Legacy fallback: required_config_keys() of the provider class.
    if provider_cls:
        try:
            for rk in provider_cls.required_config_keys():
                if rk.upper() in existing_keys_upper:
                    continue
                existing_keys_upper.add(rk.upper())
                self._mount_item_editor_field(container, idx, rk, rk, "", paste=True)
                idx += 1
        except Exception:
            logger.exception("Failed to build required config inputs for provider/tool")

    # 4. Matrix-specific helpers.
    if is_matrix:
        self._mount_matrix_inline_controls(container)

def _item_editor_default_text(self, default: Any) -> str:
    """Return the text shown for a schema default.

    Booleans and numbers keep their text form, so ``False`` and ``0`` are not
    collapsed to an empty string; any other falsy default becomes ``""``.
    """
    if isinstance(default, (bool, int, float)):
        return str(default)
    return str(default or "")

def _item_editor_select_options(self, choices: Iterable[Any], current: str):
    """Build ``(options, selected_value)`` for a Select widget.

    ``choices`` entries are either plain values or ``(label, value)`` tuples.
    Boolean-like values are lower-cased so they match regardless of how they
    were stored. A current value that is not among the choices is added as
    the first option so it stays selectable.
    """
    options = []
    values = []
    for choice in choices:
        if isinstance(choice, tuple) and len(choice) == 2:
            option_label, option_value = str(choice[0]), str(choice[1])
        else:
            option_label = option_value = str(choice)
        if option_value.lower() in ("true", "false"):
            option_value = option_value.lower()
            option_label = option_value
        options.append((option_label, option_value))
        values.append(option_value)

    selected = str(current)
    if selected.lower() in ("true", "false"):
        selected = selected.lower()
    if selected not in values:
        options.insert(0, (selected, selected))
    return options, selected

def _mount_item_editor_field(
    self,
    container: ScrollableContainer,
    idx: int,
    key: str,
    label_text: str,
    value: str,
    *,
    choices: Optional[Iterable[Any]] = None,
    secret: bool = False,
    placeholder: Optional[str] = None,
    paste: bool = False,
) -> None:
    """Mount a label plus a Select (when ``choices`` is set) or an Input row.

    The widget gets the id ``item-<idx>``, which is recorded in
    ``_input_id_map`` as editing ``key``. ``secret``, ``placeholder`` and
    ``paste`` only apply to the Input form.
    """
    container.mount(Label(label_text))
    inp_id = f"item-{idx}"
    self._input_id_map[inp_id] = key

    if choices:
        options, selected = self._item_editor_select_options(choices, value)
        container.mount(Select(options, value=selected, id=inp_id))
        return

    row = Horizontal(classes="field-row")
    container.mount(row)
    inp = Input(value=value, id=inp_id, classes="config-input")
    if secret:
        inp.password = True
    if placeholder:
        inp.placeholder = placeholder
    row.mount(inp)
    if paste:
        row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn"))

def _mount_matrix_inline_controls(self, container: ScrollableContainer) -> None:
    """Mount the Matrix connection-test row and the inline default-rooms list.

    The list is pre-filled with the cached rooms plus any saved default room
    ids that are missing from the cache. Failures while collecting or
    rendering rooms are logged and leave the list empty.
    """
    container.mount(Rule())
    container.mount(Label("Matrix helpers", classes="config-label"))
    status = Static("Set homeserver + token, then test before saving", id="matrix-status")
    container.mount(status)

    row = Horizontal(classes="field-row")
    container.mount(row)
    row.mount(Button("Test connection", id="matrix-test-btn"))
    # "Load rooms" refreshes the inline list and caches the results (no popup).
    row.mount(Button("Load rooms", variant="primary", id="matrix-load-btn"))
    self._matrix_status = status

    # Inline rooms list for selecting default rooms.
    container.mount(Label("Default Rooms", classes="config-label", id="matrix-inline-label"))
    rooms_list = ListView(id="matrix-rooms-inline")
    container.mount(rooms_list)

    # Inline actions.
    row2 = Horizontal(classes="field-row")
    container.mount(row2)
    row2.mount(Button("Select All", id="matrix-inline-select-all"))
    row2.mount(Button("Clear All", id="matrix-inline-clear"))
    save_inline = Button("Save defaults", variant="success", id="matrix-inline-save")
    save_inline.disabled = True
    row2.mount(save_inline)

    # Keep the list that was just created instead of re-querying it by id:
    # widgets from a previous render are removed asynchronously, so an id
    # lookup could presumably still match the old list (NOTE(review): confirm).
    self._matrix_inline_checkbox_map = {}
    self._matrix_inline_list = rooms_list

    try:
        existing_ids = self._parse_matrix_rooms_value()
        cached = self._get_cached_matrix_rooms()

        # Start with the cached rooms from the last load.
        rooms_to_render: List[Dict[str, Any]] = []
        if cached:
            rooms_to_render.extend(cached)

        # Make sure every saved default room id has an entry to render.
        if existing_ids:
            cached_ids = {
                str(r.get("room_id") or "").strip()
                for r in rooms_to_render
                if isinstance(r, dict)
            }
            need_resolve = [rid for rid in existing_ids if rid not in cached_ids]
            if need_resolve:
                placeholders = [{"room_id": rid, "name": ""} for rid in need_resolve]
                try:
                    resolved = self._resolve_matrix_rooms_by_ids(need_resolve)
                    rooms_to_render.extend(resolved if resolved else placeholders)
                except Exception:
                    # Fall back to nameless entries so the ids stay visible.
                    logger.exception("Failed to resolve matrix room names; showing room ids only")
                    rooms_to_render.extend(placeholders)

        # Deduplicate by room id while preserving order.
        deduped: List[Dict[str, Any]] = []
        seen_ids = set()
        for r in rooms_to_render:
            try:
                rid = str(r.get("room_id") or "").strip()
                if not rid or rid in seen_ids:
                    continue
                seen_ids.add(rid)
                deduped.append(r)
            except Exception:
                logger.exception("Failed to process a matrix room entry while deduplicating")
                continue

        if deduped:
            try:
                self._render_matrix_rooms_inline(deduped)
            except Exception:
                logger.exception("Failed to render matrix inline rooms")
    except Exception:
        logger.exception("Failed to fetch or process matrix rooms for inline rendering")
2026-01-11 00:39:17 -08:00
def create_field(self, name: str, value: Any, id: str) -> Vertical:
    """Return an empty ``config-field`` container.

    Currently unused: labels and inputs are mounted directly by the render
    methods. None of the arguments are read.
    """
    return Vertical(classes="config-field")
def on_list_view_selected(self, event: ListView.Selected) -> None:
2026-01-30 16:24:08 -08:00
# Only respond to selections from the left-hand category list. Avoid
# resetting editor state when other ListViews (like the inline rooms
# list) trigger selection events.
if not event.item:
return
item_id = getattr(event.item, "id", None)
if item_id not in ("cat-globals", "cat-stores", "cat-providers", "cat-tools"):
return
if item_id == "cat-globals":
2026-01-11 00:39:17 -08:00
self.current_category = "globals"
2026-01-30 16:24:08 -08:00
elif item_id == "cat-stores":
2026-01-11 00:39:17 -08:00
self.current_category = "stores"
2026-01-30 16:24:08 -08:00
elif item_id == "cat-providers":
2026-01-11 00:39:17 -08:00
self.current_category = "providers"
2026-01-30 16:24:08 -08:00
elif item_id == "cat-tools":
2026-01-30 12:04:37 -08:00
self.current_category = "tools"
2026-01-30 16:24:08 -08:00
# Reset editor state and refresh view for the new category
2026-01-11 00:39:17 -08:00
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
def on_button_pressed(self, event: Button.Pressed) -> None:
bid = event.button.id
if not bid: return
if bid == "cancel-btn":
2026-01-27 14:56:01 -08:00
self._revert_unsaved_editor_changes()
2026-01-11 00:39:17 -08:00
self.dismiss()
elif bid == "back-btn":
2026-01-27 14:56:01 -08:00
self._revert_unsaved_editor_changes()
2026-01-11 00:39:17 -08:00
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
elif bid == "save-btn":
2026-01-23 16:46:48 -08:00
self._synchronize_inputs_to_config()
2026-01-11 02:42:08 -08:00
if not self.validate_current_editor():
return
2026-01-27 14:56:01 -08:00
if self.editing_item_name and not self._editor_has_changes():
2026-01-30 10:47:47 -08:00
self.notify("No changes to save", severity="warning", timeout=3)
2026-01-27 14:56:01 -08:00
return
2026-01-12 20:10:18 -08:00
try:
2026-01-23 16:46:48 -08:00
saved = self.save_all()
if saved == 0:
2026-01-30 10:47:47 -08:00
msg = f"Configuration saved (no rows changed) to {db.db_path.name}"
else:
msg = f"Configuration saved ({saved} change(s)) to {db.db_path.name}"
# Make the success notification visible a bit longer so it's not missed
self.notify(msg, timeout=5)
2026-01-12 20:10:18 -08:00
# Return to the main list view within the current category
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
2026-01-27 14:56:01 -08:00
self._editor_snapshot = None
2026-01-30 10:47:47 -08:00
except ConfigSaveConflict as exc:
# A concurrent on-disk change was detected; do not overwrite it.
self.notify(
"Save aborted: configuration changed on disk. The editor will refresh.",
severity="error",
timeout=10,
)
# Refresh our in-memory view from disk and drop the editor snapshot
try:
self.config_data = reload_config()
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to reload config after save conflict")
2026-01-30 10:47:47 -08:00
self._editor_snapshot = None
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
2026-01-12 20:10:18 -08:00
except Exception as exc:
self.notify(f"Save failed: {exc}", severity="error", timeout=10)
2026-01-31 15:37:17 -08:00
elif bid == "save-durable-btn":
# Perform a synchronous, verified save and notify status to the user.
self._synchronize_inputs_to_config()
if not self.validate_current_editor():
return
if self.editing_item_name and not self._editor_has_changes():
self.notify("No changes to save", severity="warning", timeout=3)
return
try:
from SYS.config import save_config_and_verify
saved = save_config_and_verify(self.config_data, retries=3, delay=0.1)
try:
self.config_data = reload_config()
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to reload config after durable save")
2026-01-31 15:37:17 -08:00
if saved == 0:
msg = f"Configuration saved (no rows changed) to {db.db_path.name}"
else:
msg = f"Configuration saved ({saved} change(s)) to {db.db_path.name} (verified)"
try:
self.notify(msg, timeout=6)
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to show notification message in ConfigModal")
2026-01-31 15:37:17 -08:00
# Return to the main list view within the current category
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
self._editor_snapshot = None
except Exception as exc:
self.notify(f"Durable save failed: {exc}", severity="error", timeout=10)
try:
log(f"Durable save failed: {exc}")
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to call log() for durable save error")
2026-01-11 03:34:35 -08:00
elif bid in self._button_id_map:
action, itype, name = self._button_id_map[bid]
if action == "edit":
2026-01-27 14:56:01 -08:00
self._capture_editor_snapshot()
2026-01-11 03:34:35 -08:00
self.editing_item_type = itype
2026-01-11 00:39:17 -08:00
self.editing_item_name = name
self.refresh_view()
2026-01-11 03:34:35 -08:00
elif action == "del":
2026-01-23 15:02:19 -08:00
removed = False
2026-01-11 03:34:35 -08:00
if itype.startswith("store-"):
stype = itype.replace("store-", "")
if "store" in self.config_data and stype in self.config_data["store"]:
if name in self.config_data["store"][stype]:
del self.config_data["store"][stype][name]
2026-01-23 15:02:19 -08:00
removed = True
2026-01-11 03:34:35 -08:00
elif itype == "provider":
if "provider" in self.config_data and name in self.config_data["provider"]:
del self.config_data["provider"][name]
2026-01-23 15:02:19 -08:00
removed = True
2026-01-30 10:47:47 -08:00
if str(name).strip().lower() == "alldebrid":
self._remove_alldebrid_store_entry()
2026-01-30 12:04:37 -08:00
elif itype == "tool":
if "tool" in self.config_data and name in self.config_data["tool"]:
del self.config_data["tool"][name]
removed = True
2026-01-23 15:02:19 -08:00
if removed:
try:
2026-01-23 16:46:48 -08:00
saved = self.save_all()
2026-01-30 10:47:47 -08:00
self.notify("Saving configuration...", timeout=3)
2026-01-23 15:02:19 -08:00
except Exception as exc:
self.notify(f"Save failed: {exc}", severity="error", timeout=10)
2026-01-11 03:34:35 -08:00
self.refresh_view()
2026-01-11 00:39:17 -08:00
elif bid == "add-store-btn":
2026-01-11 03:24:49 -08:00
all_classes = _discover_store_classes()
options = []
for stype, cls in all_classes.items():
2026-01-21 22:52:52 -08:00
if hasattr(cls, "config_schema") and callable(cls.config_schema):
2026-01-11 03:24:49 -08:00
try:
2026-01-19 06:24:09 -08:00
if cls.config_schema():
2026-01-11 03:24:49 -08:00
options.append(stype)
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to inspect store class config_schema for '%s'", stype)
2026-01-11 02:42:08 -08:00
self.app.push_screen(SelectionModal("Select Store Type", options), callback=self.on_store_type_selected)
elif bid == "add-provider-btn":
2026-01-11 03:24:49 -08:00
provider_names = list(list_providers().keys())
options = []
from ProviderCore.registry import get_provider_class
for ptype in provider_names:
pcls = get_provider_class(ptype)
2026-01-21 22:52:52 -08:00
if pcls and hasattr(pcls, "config_schema") and callable(pcls.config_schema):
2026-01-11 03:24:49 -08:00
try:
2026-01-19 06:24:09 -08:00
if pcls.config_schema():
2026-01-11 03:24:49 -08:00
options.append(ptype)
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to inspect provider class config_schema for '%s'", ptype)
2026-01-11 02:42:08 -08:00
self.app.push_screen(SelectionModal("Select Provider Type", options), callback=self.on_provider_type_selected)
2026-01-30 12:04:37 -08:00
elif bid == "add-tool-btn":
# Discover tool modules that advertise a config_schema()
options = []
try:
import pkgutil
import importlib
import tool as _tool_pkg
for _mod in pkgutil.iter_modules(_tool_pkg.__path__):
try:
mod = importlib.import_module(f"tool.{_mod.name}")
if hasattr(mod, "config_schema") and callable(mod.config_schema):
options.append(_mod.name)
except Exception:
continue
except Exception:
# Fallback to known entry
options = ["ytdlp"]
if options:
options.sort()
self.app.push_screen(SelectionModal("Select Tool", options), callback=self.on_tool_type_selected)
2026-01-26 02:29:56 -08:00
elif bid == "matrix-test-btn":
self._request_matrix_test()
2026-01-30 16:24:08 -08:00
elif bid == "matrix-load-btn":
# Refresh the inline rooms list and cache the results (no popup)
self._request_matrix_load()
elif bid == "matrix-inline-select-all":
for checkbox_id in list(self._matrix_inline_checkbox_map.keys()):
try:
cb = self.query_one(f"#{checkbox_id}", Checkbox)
cb.value = True
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to set matrix inline checkbox to True for '%s'", checkbox_id)
2026-01-14 01:33:25 -08:00
try:
2026-01-30 16:24:08 -08:00
self.query_one("#matrix-inline-save", Button).disabled = False
2026-01-14 01:33:25 -08:00
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to enable matrix inline save button")
2026-01-30 16:24:08 -08:00
elif bid == "matrix-inline-clear":
for checkbox_id in list(self._matrix_inline_checkbox_map.keys()):
try:
cb = self.query_one(f"#{checkbox_id}", Checkbox)
cb.value = False
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to set matrix inline checkbox to False for '%s'", checkbox_id)
2026-01-30 16:24:08 -08:00
try:
self.query_one("#matrix-inline-save", Button).disabled = True
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to disable matrix inline save button")
2026-01-30 16:24:08 -08:00
elif bid == "matrix-inline-save":
selected: List[str] = []
for checkbox_id, room_id in self._matrix_inline_checkbox_map.items():
try:
cb = self.query_one(f"#{checkbox_id}", Checkbox)
if cb.value and room_id:
selected.append(room_id)
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to read matrix inline checkbox '%s'", checkbox_id)
2026-01-30 16:24:08 -08:00
if not selected:
if self._matrix_status:
self._matrix_status.update("No default rooms were saved.")
return
matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {})
matrix_block["rooms"] = ", ".join(selected)
changed = count_changed_entries(self.config_data)
try:
entries = save_config(self.config_data)
except Exception as exc:
if self._matrix_status:
self._matrix_status.update(f"Saving default rooms failed: {exc}")
return
self.config_data = reload_config()
if self._matrix_status:
status = f"Saved {len(selected)} default room(s) ({changed} change(s)) to {db.db_path.name}."
self._matrix_status.update(status)
try:
self.query_one("#matrix-inline-save", Button).disabled = True
except Exception:
2026-01-31 19:00:04 -08:00
logger.exception("Failed to disable matrix inline save button")
2026-01-30 16:24:08 -08:00
self.refresh_view()
2026-01-14 01:33:25 -08:00
async def focus_and_paste(self, inp: Input) -> None:
    """Append the app's clipboard text to *inp* and give it focus.

    A warning notification is shown when the app object exposes no
    ``paste_from_clipboard`` coroutine. Nothing happens when the clipboard
    call returns an empty/falsy value.
    """
    if not hasattr(self.app, "paste_from_clipboard"):
        self.notify("Clipboard not supported in this terminal", severity="warning")
        return
    clip_text = await self.app.paste_from_clipboard()
    if not clip_text:
        return
    # The clipboard text is always appended to the end of the current value.
    inp.value = str(inp.value) + clip_text
    inp.focus()
    self.notify("Pasted from clipboard")
async def action_paste(self) -> None:
    """Handle the paste action: paste into the focused widget if it is an Input."""
    target = self.focused
    if not isinstance(target, Input):
        return
    await self.focus_and_paste(target)
async def action_copy(self) -> None:
    """Handle the copy action: copy the focused Input's value to the clipboard.

    Does nothing unless the focused widget is an Input with a non-empty
    value. Warns when the app object has no ``copy_to_clipboard`` method.
    """
    source = self.focused
    if not (isinstance(source, Input) and source.value):
        return
    if not hasattr(self.app, "copy_to_clipboard"):
        self.notify("Clipboard not supported in this terminal", severity="warning")
        return
    self.app.copy_to_clipboard(str(source.value))
    self.notify("Copied to clipboard")
2026-01-11 02:42:08 -08:00
2026-01-30 12:04:37 -08:00
# Backup/restore helpers removed: forensics/audit mode disabled and restore UI removed.
2026-01-30 10:47:47 -08:00
2026-01-11 02:42:08 -08:00
def on_store_type_selected(self, stype: str) -> None:
    """Create a new store entry of type *stype* and open it in the editor.

    Called as the SelectionModal callback; a falsy *stype* (nothing chosen)
    is ignored. The new entry is named ``new_<stype>`` (with a numeric
    suffix starting at ``_2`` when that name is taken) and is pre-filled
    from the store class's ``config_schema()`` defaults or, failing that,
    from its required keys with empty values.
    """
    if not stype:
        return
    self._capture_editor_snapshot()

    # Names already used by this store type, so the new one can be unique.
    taken: set[str] = set()
    stores = self.config_data.get("store")
    if isinstance(stores, dict):
        same_type = stores.get(stype)
        if isinstance(same_type, dict):
            taken = {str(existing) for existing in same_type.keys() if existing}

    stem = f"new_{stype}"
    new_name = stem
    counter = 1
    while new_name in taken:
        counter += 1
        new_name = f"{stem}_{counter}"

    if "store" not in self.config_data:
        self.config_data["store"] = {}
    if stype not in self.config_data["store"]:
        self.config_data["store"][stype] = {}

    # Seed the entry with its name, then layer defaults on top.
    entry = {"NAME": new_name}
    known_classes = _discover_store_classes()
    if stype in known_classes:
        store_cls = known_classes[stype]
        if callable(getattr(store_cls, "config_schema", None)):
            for spec in store_cls.config_schema():
                field_key = spec.get("key")
                # NAME was already set above; never let the schema override it.
                if not field_key or field_key.upper() == "NAME":
                    continue
                entry[field_key] = spec.get("default", "")
        else:
            # No schema available: fall back to the required-keys list.
            for required_key in _required_keys_for(store_cls):
                if required_key.upper() != "NAME":
                    entry[required_key] = ""

    self.config_data["store"][stype][new_name] = entry
    self.editing_item_type = f"store-{stype}"
    self.editing_item_name = new_name
    self.refresh_view()
def on_provider_type_selected(self, ptype: str) -> None:
    """Ensure a provider entry for *ptype* exists and open it in the editor.

    Called as the SelectionModal callback; a falsy *ptype* (nothing chosen)
    is ignored. An existing ``provider[ptype]`` block is left untouched.
    Otherwise a new block is pre-filled from the provider class's
    ``config_schema()`` defaults or, failing that, from
    ``required_config_keys()`` with empty values.

    If building the defaults raises, the failure is logged and an empty
    block is stored so the user can still edit the provider.
    """
    if not ptype:
        return
    self._capture_editor_snapshot()
    if "provider" not in self.config_data:
        self.config_data["provider"] = {}
    # Providers are keyed directly by type name inside the 'provider' dict.
    if ptype not in self.config_data["provider"]:
        from ProviderCore.registry import get_provider_class

        new_config: Dict[str, Any] = {}
        try:
            pcls = get_provider_class(ptype)
            if pcls:
                if hasattr(pcls, "config_schema") and callable(pcls.config_schema):
                    # Use schema for defaults
                    for field_def in pcls.config_schema():
                        key = field_def.get("key")
                        if key:
                            new_config[key] = field_def.get("default", "")
                else:
                    # Fallback to legacy required keys
                    for rk in pcls.required_config_keys():
                        new_config[rk] = ""
        except Exception:
            # Previously swallowed silently; log it like the tool handler does
            # and discard any partially built defaults.
            logger.exception("Failed to build default config for provider '%s'", ptype)
            new_config = {}
        self.config_data["provider"][ptype] = new_config
    self.editing_item_type = "provider"
    self.editing_item_name = ptype
    self.refresh_view()
2026-01-11 00:39:17 -08:00
2026-01-30 12:04:37 -08:00
def on_tool_type_selected(self, tname: str) -> None:
    """Ensure a tool entry for *tname* exists and open it in the editor.

    Called as the SelectionModal callback; a falsy *tname* is ignored. When
    the tool has no config block yet, one is built from the defaults that
    ``tool.<tname>.config_schema()`` advertises. Import or schema errors are
    logged, and whatever defaults were collected so far are kept.
    """
    if not tname:
        return
    self._capture_editor_snapshot()
    tools = self.config_data.setdefault("tool", {})
    if tname not in tools:
        defaults = {}
        try:
            import importlib

            tool_module = importlib.import_module(f"tool.{tname}")
            if callable(getattr(tool_module, "config_schema", None)):
                for spec in tool_module.config_schema():
                    field_key = spec.get("key")
                    if field_key:
                        defaults[field_key] = spec.get("default", "")
        except Exception:
            logger.exception("Failed to load config_schema for tool '%s'", tname)
        self.config_data["tool"][tname] = defaults
    self.editing_item_type = "tool"
    self.editing_item_name = tname
    self.refresh_view()
2026-01-11 03:47:25 -08:00
def _update_config_value(self, widget_id: str, value: Any) -> None:
    """Write a widget's value into ``self.config_data``.

    *widget_id* is looked up in ``self._input_id_map`` to find the config
    key; unknown ids are ignored. Ids starting with ``global-`` write to the
    top level of the config. Ids starting with ``item-`` write into the item
    currently being edited (``editing_item_type`` / ``editing_item_name``).

    String values are coerced: ``"true"``/``"false"`` (case-insensitive)
    become booleans and all-decimal strings become ``int``. A blank string
    is dropped when the key has no existing value, so untouched empty fields
    do not create config entries.

    Editing a store's ``NAME`` field renames the store: the entry is moved
    to the new key and ``editing_item_name`` follows it.
    """
    if widget_id not in self._input_id_map:
        return
    key = self._input_id_map[widget_id]
    raw_value = value
    is_blank_string = isinstance(raw_value, str) and not raw_value.strip()
    existing_value: Any = None
    item_type = str(self.editing_item_type or "")
    item_name = str(self.editing_item_name or "")

    # Look up the current value (read-only) to decide whether a blank matters.
    if widget_id.startswith("global-"):
        existing_value = self.config_data.get(key)
    elif widget_id.startswith("item-") and item_name:
        if item_type.startswith("store-"):
            stype = item_type.replace("store-", "")
            store_block = self.config_data.get("store")
            if isinstance(store_block, dict):
                type_block = store_block.get(stype)
                if isinstance(type_block, dict):
                    section = type_block.get(item_name)
                    if isinstance(section, dict):
                        existing_value = section.get(key)
        else:
            section_block = self.config_data.get(item_type)
            if isinstance(section_block, dict):
                section = section_block.get(item_name)
                if isinstance(section, dict):
                    existing_value = section.get(key)
    if is_blank_string and existing_value is None:
        return

    # Try to preserve boolean/integer types
    processed_value = raw_value
    if isinstance(raw_value, str):
        low = raw_value.lower()
        if low == "true":
            processed_value = True
        elif low == "false":
            processed_value = False
        elif raw_value.isdecimal():
            # isdecimal(), not isdigit(): isdigit() also accepts characters
            # such as superscript digits that int() rejects with ValueError.
            processed_value = int(raw_value)

    if widget_id.startswith("global-"):
        self.config_data[key] = processed_value
    elif widget_id.startswith("item-") and item_name:
        if item_type.startswith("store-"):
            stype = item_type.replace("store-", "")
            if "store" not in self.config_data:
                self.config_data["store"] = {}
            if stype not in self.config_data["store"]:
                self.config_data["store"][stype] = {}
            if item_name not in self.config_data["store"][stype]:
                self.config_data["store"][stype][item_name] = {}
            # Special case: Renaming the store via the NAME field
            if key.upper() == "NAME" and processed_value and str(processed_value) != item_name:
                new_name = str(processed_value)
                self.config_data["store"][stype][new_name] = self.config_data["store"][stype].pop(item_name)
                self.editing_item_name = new_name
                item_name = new_name
            self.config_data["store"][stype][item_name][key] = processed_value
        else:
            if item_type not in self.config_data:
                self.config_data[item_type] = {}
            if item_name not in self.config_data[item_type]:
                self.config_data[item_type][item_name] = {}
            self.config_data[item_type][item_name][key] = processed_value
2026-01-23 16:46:48 -08:00
def _synchronize_inputs_to_config(self) -> None:
    """Push the current value of every mapped Input/Select into the config.

    Widgets without an id, or whose id is not in ``self._input_id_map``, are
    skipped, as are Select widgets still showing ``Select.BLANK``.
    """
    for widget in [*self.query(Input), *self.query(Select)]:
        wid = widget.id
        if not wid or wid not in self._input_id_map:
            continue
        # A Select with nothing chosen carries no value worth storing.
        if isinstance(widget, Select) and widget.value == Select.BLANK:
            continue
        self._update_config_value(wid, widget.value)
2026-01-11 03:47:25 -08:00
2026-01-30 10:47:47 -08:00
def _remove_alldebrid_store_entry(self) -> bool:
    """Drop the ``all-debrid`` entry from ``store.debrid``, pruning empty parents.

    Keys are matched case-insensitively after stripping whitespace. An empty
    ``debrid`` block, and then an empty ``store`` block, are removed as well.

    Returns:
        True if at least one matching entry was removed, else False.
    """
    stores = self.config_data.get("store")
    if not isinstance(stores, dict):
        return False
    debrid_block = stores.get("debrid")
    if not isinstance(debrid_block, dict):
        return False

    doomed = [k for k in debrid_block if str(k or "").strip().lower() == "all-debrid"]
    for k in doomed:
        debrid_block.pop(k, None)

    if not debrid_block:
        stores.pop("debrid", None)
    if not stores:
        self.config_data.pop("store", None)
    return bool(doomed)
2026-01-26 02:29:56 -08:00
def _get_matrix_provider_block(self) -> Dict[str, Any]:
    """Return ``config_data["provider"]["matrix"]`` if it is a dict, else ``{}``."""
    providers = self.config_data.get("provider")
    matrix = providers.get("matrix") if isinstance(providers, dict) else None
    return matrix if isinstance(matrix, dict) else {}
def _parse_matrix_rooms_value(self) -> List[str]:
    """Return the configured Matrix ``rooms`` value as a list of non-empty strings.

    A list/tuple/set is stringified and stripped element-wise. Any other
    value is stringified and split on runs of whitespace and/or commas.
    """
    rooms_value = self._get_matrix_provider_block().get("rooms")
    if isinstance(rooms_value, (list, tuple, set)):
        stripped = (str(entry).strip() for entry in rooms_value)
        return [entry for entry in stripped if entry]
    flat = str(rooms_value or "").strip()
    if not flat:
        return []
    return [piece for piece in re.split(r"[\s,]+", flat) if piece]
def _request_matrix_test(self) -> None:
    """Save the current config, then start a background Matrix connection test.

    Does nothing while ``_matrix_test_running`` is set. The test is skipped
    with a status message when ``homeserver`` or ``access_token`` is missing.
    A failed save is reported in the status line and aborts the test.
    """
    if self._matrix_test_running:
        return
    self._synchronize_inputs_to_config()

    # Cheap pre-check so the user gets immediate guidance on missing fields.
    try:
        matrix_cfg = self.config_data.get("provider", {}).get("matrix", {})
        homeserver = matrix_cfg.get("homeserver")
        access_token = matrix_cfg.get("access_token")
        if not (homeserver and access_token):
            if self._matrix_status:
                self._matrix_status.update("Matrix test skipped: please set both 'homeserver' and 'access_token' before testing.")
            return
    except Exception:
        logger.exception("Failed to check matrix configuration before testing")

    if self._matrix_status:
        self._matrix_status.update("Saving configuration before testing…")
    change_count = count_changed_entries(self.config_data)
    try:
        save_config(self.config_data)
    except Exception as exc:
        if self._matrix_status:
            self._matrix_status.update(f"Saving configuration failed: {exc}")
        self._matrix_test_running = False
        return

    self.config_data = reload_config()
    if self._matrix_status:
        self._matrix_status.update(f"Saved configuration ({change_count} change(s)) to {db.db_path.name}. Testing Matrix connection…")
    self._matrix_test_running = True
    self._matrix_test_background()
@work(thread=True)
def _matrix_test_background(self) -> None:
    """Worker body: list Matrix rooms and hand the outcome to ``_matrix_test_result``.

    Success and failure are both reported through
    ``self.app.call_from_thread``. On failure the traceback is sent to
    ``debug`` and the user-facing message gets a short hint chosen from
    keywords found in the exception text.
    """
    try:
        from Provider.matrix import Matrix

        found_rooms = Matrix(self.config_data).list_rooms()
        self.app.call_from_thread(self._matrix_test_result, True, found_rooms, None)
    except Exception as exc:
        # Keep the full traceback in the logs; show the user something concise.
        trace_text = traceback.format_exc()
        try:
            debug(f"[matrix] Test connection failed: {exc}\n{trace_text}")
        except Exception:
            logger.exception("Failed to debug matrix test failure")

        base_msg = str(exc) or "Matrix test failed"
        lowered = base_msg.lower()
        # "authentication" contains "auth", so one substring check covers both.
        if "auth" in lowered:
            hint = ". Please verify your access token and try again."
        elif "homeserver" in lowered or "missing" in lowered:
            hint = ". Check your homeserver URL (include https://)."
        else:
            hint = " (see logs for details)"
        self.app.call_from_thread(self._matrix_test_result, False, [], base_msg + hint)
def _get_cached_matrix_rooms(self) -> List[Dict[str, Any]]:
    """Return cached rooms stored in the provider config (normalized).

    Reads ``cached_rooms`` from the Matrix provider block. The value can be
    a list/tuple, a dict, a JSON string, or a Python literal string (repr).
    Strings are tried in order as JSON, then ``ast.literal_eval``, then a
    regex for ``'room_id': ..., 'name': ...`` pairs, and finally a regex for
    bare room ids starting with ``!``.

    Returns:
        A list of dicts with keys ``'room_id'`` and ``'name'``. An empty
        list is returned when nothing is cached, nothing could be parsed,
        or an unexpected error occurred (the error is logged).
    """
    try:
        block = self._get_matrix_provider_block()
        raw = block.get("cached_rooms")
        if not raw:
            return []
        # If it's already a list or tuple, normalize each element
        if isinstance(raw, (list, tuple)):
            return self._normalize_cached_raw(list(raw))
        # If it's a dict, wrap and normalize
        if isinstance(raw, dict):
            return self._normalize_cached_raw([raw])
        # If it's a string, try JSON -> ast.literal_eval -> regex ID extraction
        if isinstance(raw, str):
            s = str(raw).strip()
            if not s:
                return []
            # Try JSON first (strict)
            try:
                import json
                parsed = json.loads(s)
                if isinstance(parsed, (list, tuple, dict)):
                    return self._normalize_cached_raw(parsed if isinstance(parsed, (list, tuple)) else [parsed])
            except Exception:
                # NOTE: a non-JSON string lands here on every call, so this
                # logs a traceback before the next fallback is tried.
                logger.exception("Failed to parse cached_rooms JSON for provider matrix")
            # Try Python literal eval (accepts single quotes, repr-style lists)
            try:
                import ast
                parsed = ast.literal_eval(s)
                if isinstance(parsed, (list, tuple, dict)):
                    return self._normalize_cached_raw(parsed if isinstance(parsed, (list, tuple)) else [parsed])
            except Exception:
                logger.exception("Failed to parse cached_rooms as Python literal for provider matrix")
            # Try to extract dict-like pairs for room_id/name when the string looks like
            # a Python repr or partial dict fragment (e.g., "'room_id': '!r1', 'name': 'Room'")
            try:
                import re
                pair_pat = re.compile(r"[\"']room_id[\"']\s*:\s*[\"'](?P<id>[^\"']+)[\"']\s*,\s*[\"']name[\"']\s*:\s*[\"'](?P<name>[^\"']+)[\"']")
                pairs = [m.groupdict() for m in pair_pat.finditer(s)]
                if pairs:
                    out = []
                    for p in pairs:
                        rid = str(p.get("id") or "").strip()
                        name = str(p.get("name") or "").strip()
                        if rid:
                            out.append({"room_id": rid, "name": name})
                    if out:
                        return out
                # As a last resort, extract candidate room ids via regex (look for leading '!')
                ids = re.findall(r"![-A-Za-z0-9._=]+(?::[-A-Za-z0-9._=]+)?", s)
                if ids:
                    # Names are unknown at this point, so they are left empty.
                    return [{"room_id": rid, "name": ""} for rid in ids]
            except Exception:
                logger.exception("Failed to extract cached_rooms pairs or ids for provider matrix")
        # Unsupported type, or a string that none of the strategies could parse.
        return []
    except Exception:
        logger.exception("Failed to parse cached_rooms for provider matrix")
        return []
def _normalize_cached_raw(self, parsed: List[Any]) -> List[Dict[str, Any]]:
    """Convert raw cached-room entries into ``{"room_id", "name"}`` dicts.

    Dict entries contribute their stripped ``room_id``/``name`` when the id
    is non-empty. String entries are treated as a bare room id with an empty
    name. Anything else is ignored; an entry that raises is logged and
    skipped.
    """
    rooms: List[Dict[str, Any]] = []
    for entry in parsed:
        try:
            if isinstance(entry, dict):
                room_id = str(entry.get("room_id") or "").strip()
                if room_id:
                    label = str(entry.get("name") or "").strip()
                    rooms.append({"room_id": room_id, "name": label})
            elif isinstance(entry, str):
                room_id = entry.strip()
                if room_id:
                    rooms.append({"room_id": room_id, "name": ""})
        except Exception:
            logger.exception("Failed to normalize cached_rooms entry: %r", entry)
    return rooms
def _request_matrix_load(self) -> None:
    """Save the current config, then start a background load of Matrix rooms.

    Does nothing while ``_matrix_test_running`` is set. The load is skipped
    with a status message when ``homeserver`` or ``access_token`` is missing.
    A failed save is reported in the status line and aborts the load.
    """
    if self._matrix_test_running:
        return
    self._synchronize_inputs_to_config()

    # Quick client-side pre-check for required fields
    try:
        matrix_cfg = self.config_data.get("provider", {}).get("matrix", {})
        homeserver = matrix_cfg.get("homeserver")
        access_token = matrix_cfg.get("access_token")
        if not (homeserver and access_token):
            if self._matrix_status:
                self._matrix_status.update("Load skipped: please set both 'homeserver' and 'access_token' before loading rooms.")
            return
    except Exception:
        logger.exception("Failed to check matrix configuration before load")

    if self._matrix_status:
        self._matrix_status.update("Saving configuration before loading rooms…")
    change_count = count_changed_entries(self.config_data)
    try:
        save_config(self.config_data)
    except Exception as exc:
        if self._matrix_status:
            self._matrix_status.update(f"Saving configuration failed: {exc}")
        self._matrix_test_running = False
        return

    self.config_data = reload_config()
    if self._matrix_status:
        self._matrix_status.update(f"Saved configuration ({change_count} change(s)) to {db.db_path.name}. Loading Matrix rooms…")
    # The load reuses the test's running flag.
    self._matrix_test_running = True
    self._matrix_load_background()
@work(thread=True)
def _matrix_load_background(self) -> None:
    """Worker body: list Matrix rooms and hand the outcome to ``_matrix_load_result``.

    Success and failure are both reported through
    ``self.app.call_from_thread``. On failure the traceback is sent to
    ``debug``, and an access-token hint is appended when the error text
    mentions "auth".
    """
    try:
        from Provider.matrix import Matrix

        found_rooms = Matrix(self.config_data).list_rooms()
        self.app.call_from_thread(self._matrix_load_result, True, found_rooms, None)
    except Exception as exc:
        trace_text = traceback.format_exc()
        try:
            debug(f"[matrix] Load rooms failed: {exc}\n{trace_text}")
        except Exception:
            logger.exception("Failed to debug matrix load failure")

        user_msg = str(exc) or "Matrix load failed"
        if "auth" in user_msg.lower():
            user_msg += ". Please verify your access token and try again."
        self.app.call_from_thread(self._matrix_load_result, False, [], user_msg)
def _matrix_load_result(self, success: bool, rooms: List[Dict[str, Any]], error: Optional[str]) -> None:
    """Handle the outcome of ``_matrix_load_background``.

    Invoked through ``call_from_thread``. Clears the running flag, then
    either reports the failure or renders the rooms inline and stores them
    under ``provider.matrix.cached_rooms`` before saving.
    """
    self._matrix_test_running = False

    if not success:
        reason = error or "(error)"
        failure_text = f"Matrix load failed: {reason}"
        if self._matrix_status:
            self._matrix_status.update(failure_text)
        try:
            self.notify(failure_text, severity="error", timeout=8)
        except Exception:
            logger.exception("Failed to show Matrix load failure notification")
        return

    # Populate inline list
    try:
        self._render_matrix_rooms_inline(rooms)
    except Exception:
        logger.exception("Failed to render inline matrix rooms")

    # Persist cached rooms so they are available on next editor open
    try:
        providers = self.config_data.setdefault("provider", {})
        providers.setdefault("matrix", {})["cached_rooms"] = rooms
        try:
            self.save_all()
        except Exception:
            # Fallback to direct save when save_all is unavailable (tests)
            try:
                save_config(self.config_data)
            except Exception:
                logger.exception("Failed to persist cached matrix rooms via save_config() fallback")
        if self._matrix_status:
            self._matrix_status.update(f"Loaded and cached {len(rooms)} room(s).")
        try:
            self.notify(f"Loaded {len(rooms)} rooms and cached the results", timeout=5)
        except Exception:
            logger.exception("Failed to notify loaded-and-cached message for Matrix rooms")
    except Exception:
        logger.exception("Failed to cache Matrix rooms after load")
2026-01-26 02:29:56 -08:00
def _open_matrix_room_picker(
    self,
    *,
    prefetched_rooms: Optional[List[Dict[str, Any]]] = None,
) -> None:
    """Push a MatrixRoomPicker screen seeded with the currently configured rooms.

    *prefetched_rooms* is forwarded to the picker as its ``rooms`` argument.
    The picker's result is delivered to ``on_matrix_rooms_selected``.
    """
    current_rooms = self._parse_matrix_rooms_value()
    picker = MatrixRoomPicker(
        self.config_data,
        existing=current_rooms,
        rooms=prefetched_rooms,
    )
    self.app.push_screen(picker, callback=self.on_matrix_rooms_selected)
2026-01-30 16:24:08 -08:00
def _render_matrix_rooms_inline(self, rooms: List[Dict[str, Any]]) -> None:
    """
    Populate the inline matrix rooms ListView with one checkbox per room.

    If the inline ListView cannot be found in the current view, the rooms
    are only stored under ``provider.matrix.cached_rooms`` and saved; no
    widgets are created.

    Otherwise the list is cleared and rebuilt. Entries without a plausible
    room id are dropped, rooms already present in the configured ``rooms``
    value are pre-checked, and the inline save button is enabled only when
    at least one room is checked.
    """
    try:
        inline_list = self._matrix_inline_list or self.query_one("#matrix-rooms-inline", ListView)
    except Exception:
        inline_list = None
    if inline_list is None:
        # Inline view isn't available in this context; cache the rooms and persist
        try:
            matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {})
            matrix_block["cached_rooms"] = rooms
            try:
                self.save_all()
            except Exception:
                # save_all failed; try writing the config directly instead.
                try:
                    save_config(self.config_data)
                except Exception:
                    logger.exception("Failed to persist cached matrix rooms via save_config() fallback")
            if self._matrix_status:
                self._matrix_status.update(f"Loaded {len(rooms)} rooms (cached)")
            try:
                self.notify(f"Loaded {len(rooms)} rooms and cached the results", timeout=5)
            except Exception:
                logger.exception("Failed to notify loaded-and-cached message for Matrix rooms")
        except Exception:
            logger.exception("Failed to cache Matrix rooms when inline view unavailable")
        return
    # Clear current entries
    for child in list(inline_list.children):
        child.remove()
    self._matrix_inline_checkbox_map.clear()
    self._matrix_inline_list = inline_list
    # Determine existing selection from current config (so saved defaults are pre-selected)
    existing = set(self._parse_matrix_rooms_value())
    if not rooms:
        if self._matrix_status:
            self._matrix_status.update("Matrix returned no rooms.")
        try:
            save_btn = self.query_one("#matrix-inline-save", Button)
            save_btn.disabled = True
        except Exception:
            logger.exception("Failed to disable matrix inline save button when no rooms returned")
        return
    any_selected = False
    # Filter and normalize rooms to avoid malformed cache entries or split-word artifacts.
    normalized: List[Dict[str, str]] = []
    for r in rooms:
        try:
            rid = str(r.get("room_id") or "").strip()
            name = str(r.get("name") or "").strip()
        except Exception:
            # Entry is not dict-like; skip it.
            continue
        # Ignore obviously malformed tokens coming from bad caching/parsing
        le_rid = rid.lower()
        le_name = name.lower()
        if "room_id" in le_rid or "room_id" in le_name:
            continue
        # Require a valid room id (Matrix ids usually start with '!' and often contain ':')
        if not rid or (not rid.startswith("!") and ":" not in rid):
            # Skip entries without a sensible ID (we rely on IDs for saving)
            continue
        normalized.append({"room_id": rid, "name": name})
    for idx, room in enumerate(normalized):
        room_id = room.get("room_id") or ""
        name = room.get("name") or ""
        # Checkbox ids are positional; the map below ties each one to its room id.
        checkbox_id = f"matrix-inline-room-{idx}"
        label_text = name or room_id or "Matrix Room"
        checked = bool(room_id and room_id in existing)
        if checked:
            any_selected = True
        from textual.widgets import Checkbox as _Checkbox  # local import to avoid top-level change
        checkbox = _Checkbox(label_text, id=checkbox_id, value=checked)
        self._matrix_inline_checkbox_map[checkbox_id] = room_id
        inline_list.mount(ListItem(checkbox, classes="matrix-room-row"))
    if self._matrix_status:
        self._matrix_status.update("Loaded rooms. Select one or more and save.")
    try:
        save_btn = self.query_one("#matrix-inline-save", Button)
        # Saving only makes sense once at least one room is checked.
        save_btn.disabled = not any_selected
    except Exception:
        logger.exception("Failed to set matrix inline save button disabled state")
2026-01-30 16:24:08 -08:00
def _resolve_matrix_rooms_by_ids(self, ids: Iterable[str]) -> List[Dict[str, Any]]:
    """
    Look up rooms for the given room IDs through the Matrix provider.

    Returns whatever ``Matrix.list_rooms(room_ids=...)`` yields, or an empty
    list when *ids* is empty or unusable, when ``homeserver`` or
    ``access_token`` is not configured, or when the lookup fails.
    """
    try:
        wanted = [text for text in (str(raw).strip() for raw in ids) if text]
    except Exception:
        return []
    if not wanted:
        return []

    # Only attempt network resolution if homeserver + token are present
    matrix_cfg = self._get_matrix_provider_block()
    if not (matrix_cfg.get("homeserver") and matrix_cfg.get("access_token")):
        return []

    try:
        from Provider.matrix import Matrix

        resolved = Matrix(self.config_data).list_rooms(room_ids=wanted)
        return resolved or []
    except Exception as exc:
        try:
            debug(f"[config] failed to resolve matrix room names: {exc}")
        except Exception:
            logger.exception("Failed to debug matrix name resolution")
        return []
2026-01-26 02:29:56 -08:00
def on_matrix_rooms_selected(self, result: Any = None) -> None:
    """Store the rooms chosen in the MatrixRoomPicker as the default rooms.

    A non-list *result* is treated as a cancellation. Entries are
    stringified, stripped and de-duplicated in order, then written to
    ``provider.matrix.rooms`` as a comma-separated string. The config is
    saved and reloaded, and the view is refreshed.
    """
    if not isinstance(result, list):
        if self._matrix_status:
            self._matrix_status.update("Room selection cancelled.")
        return

    # dict.fromkeys keeps first-seen order while dropping duplicates.
    stripped = (str(item or "").strip() for item in result)
    chosen: List[str] = list(dict.fromkeys(text for text in stripped if text))
    if not chosen:
        if self._matrix_status:
            self._matrix_status.update("No default rooms were saved.")
        return

    providers = self.config_data.setdefault("provider", {})
    providers.setdefault("matrix", {})["rooms"] = ", ".join(chosen)
    change_count = count_changed_entries(self.config_data)
    try:
        save_config(self.config_data)
    except Exception as exc:
        if self._matrix_status:
            self._matrix_status.update(f"Saving default rooms failed: {exc}")
        return

    self.config_data = reload_config()
    if self._matrix_status:
        summary = f"Saved {len(chosen)} default room(s) ({change_count} change(s)) to {db.db_path.name}."
        self._matrix_status.update(summary)
    self.refresh_view()
2026-01-11 03:47:25 -08:00
@on(Checkbox.Changed)
def on_checkbox_changed(self, event: Checkbox.Changed) -> None:
    """Enable the inline Matrix save button only while a room checkbox is ticked.

    Checkboxes whose id is not a key of ``self._matrix_inline_checkbox_map`` are
    ignored. For a tracked checkbox, every tracked checkbox is queried and the
    ``#matrix-inline-save`` button is disabled unless at least one has a truthy value.
    """
    # Only respond to inline matrix checkboxes
    try:
        cbid = event.checkbox.id
    except Exception:
        cbid = None
    if not cbid or cbid not in self._matrix_inline_checkbox_map:
        return

    any_selected = False
    for checkbox_id in self._matrix_inline_checkbox_map:
        try:
            if self.query_one(f"#{checkbox_id}", Checkbox).value:
                any_selected = True
                break
        except Exception:
            # Best-effort: a checkbox that cannot be queried is skipped.
            continue

    try:
        self.query_one("#matrix-inline-save", Button).disabled = not any_selected
    except Exception:
        logger.exception("Failed to update matrix inline save button")

@on(Input.Changed)
def on_input_changed(self, event: Input.Changed) -> None:
    """Write the new value of an Input into the config, keyed by the widget's id.

    Inputs without an id are ignored.
    """
    if event.input.id:
        self._update_config_value(event.input.id, event.value)
@on(Select.Changed)
def on_select_changed(self, event: Select.Changed) -> None:
    """Write a changed Select value into the config, keyed by the widget's id.

    Selects without an id, and selections equal to ``Select.BLANK``, are ignored.
    """
    widget_id = event.select.id
    # Select value can be the 'Select.BLANK' sentinel, which is never stored.
    if widget_id and event.value != Select.BLANK:
        self._update_config_value(widget_id, event.value)
2026-01-11 00:39:17 -08:00
2026-01-23 16:46:48 -08:00
def save_all(self) -> int:
    """Sync widget values into the config and schedule a background save.

    Returns:
        The number of changed entries, computed before the save is scheduled so
        callers can report actual changes rather than rows written.
    """
    self._synchronize_inputs_to_config()

    change_count = count_changed_entries(self.config_data)
    # The background task works on its own copy of the config.
    frozen_cfg = deepcopy(self.config_data)

    try:
        # Prefer Textual's worker when running inside the app
        self._save_background(frozen_cfg, change_count)
    except Exception:
        # Fallback: run the undecorated task body (when exposed via __wrapped__)
        # on a plain daemon thread so the caller is still not blocked.
        import threading

        raw_task = getattr(self._save_background, "__wrapped__", None)
        if raw_task:
            target, args = raw_task, (self, frozen_cfg, change_count)
        else:
            target, args = self._save_background, (frozen_cfg, change_count)
        threading.Thread(target=target, args=args, daemon=True).start()

    # Ensure DB path indicator is current and show saving status
    self._db_path = str(db.db_path)
    label_updates = (
        ("#config-db-path", self._db_path, "Failed to update config db path label"),
        ("#config-last-save", "Last saved: (saving...)", "Failed to update config last-save label"),
    )
    for selector, text, failure_message in label_updates:
        try:
            self.query_one(selector, Static).update(text)
        except Exception:
            logger.exception(failure_message)

    log(f"ConfigModal scheduled save (changed={change_count})")
    return change_count
@work(thread=True)
def _save_background(self, cfg: Dict[str, Any], changed: int) -> None:
    """Persist ``cfg`` with ``save_config_and_verify`` and report the outcome.

    The outcome is passed to ``_on_save_complete``: through
    ``app.call_from_thread`` when an app exposing that method is reachable,
    otherwise by a direct call.

    Args:
        cfg: Configuration snapshot to write.
        changed: Change count computed by the caller; forwarded unchanged.
    """

    def _deliver(success: bool, error: Optional[str], saved_entries: int) -> None:
        """Hand the result to ``_on_save_complete`` via the app when possible."""
        try:
            appobj = self.app
        except Exception:
            appobj = None
        if appobj and hasattr(appobj, 'call_from_thread'):
            appobj.call_from_thread(self._on_save_complete, success, error, changed, saved_entries)
        else:
            # If no app (e.g., running under tests), call completion directly
            self._on_save_complete(success, error, changed, saved_entries)

    try:
        # Use the verified save path which will check that crucial keys
        # (like AllDebrid API keys) persisted to disk. This ensures the UI
        # surface reports a failure immediately if post-save verification fails.
        saved_entries = save_config_and_verify(cfg)
        _deliver(True, None, saved_entries)
    except ConfigSaveConflict as exc:
        _deliver(False, str(exc), 0)
    except Exception as exc:
        # Bubble up verification/other save errors back to the UI so the
        # user knows persistent storage failed.
        _deliver(False, str(exc), 0)
def _on_save_complete(self, success: bool, error: Optional[str], changed: int, saved_entries: int) -> None:
    """Update the modal (or the log) once a background save has finished.

    Args:
        success: Whether the save succeeded.
        error: Failure description shown to the user when ``success`` is false.
        changed: Number of changed entries reported in labels and messages.
        saved_entries: Entry count returned by the save; only logged.

    With an app available, a successful save reloads the config, refreshes the
    ``#config-last-save`` label and the view, and shows a notification; a failed
    save shows an error notification, then reloads the config and refreshes the
    view. Without an app the outcome is written with ``log`` instead.
    """
    from datetime import datetime, timezone

    # Safely determine whether a Textual app context is available. Accessing
    # `self.app` can raise when not running inside the TUI; handle that.
    try:
        appobj = self.app
    except Exception:
        appobj = None

    if not success:
        # Save failed; notify via UI if available, otherwise log
        if not appobj:
            log(f"Save failed: {error}")
            return
        try:
            self.notify(f"Save failed: {error}", severity="error", timeout=10)
        except Exception:
            logger.exception("Failed to show save failed notification")
        try:
            self.config_data = reload_config()
        except Exception:
            logger.exception("Failed to reload config after save failure")
        try:
            self.refresh_view()
        except Exception:
            logger.exception("Failed to refresh view after save failure")
        return

    try:
        self.config_data = reload_config()
    except Exception:
        logger.exception("Failed to reload config after save completion")

    # Update last-saved label with file timestamp for visibility. The value is
    # rendered as a naive-UTC ISO string with a literal "Z" suffix.
    try:
        mtime = db.db_path.stat().st_mtime
        db_mtime = (
            datetime.fromtimestamp(mtime, tz=timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        )
    except Exception:
        db_mtime = None

    if appobj:
        try:
            if changed == 0:
                label_text = "Last saved: (no changes)"
            else:
                label_text = f"Last saved: {changed} change(s) at {db_mtime or '(unknown)'}"
            self.query_one("#config-last-save", Static).update(label_text)
        except Exception:
            logger.exception("Failed to update last-save label with timestamp")
        try:
            self.refresh_view()
        except Exception:
            logger.exception("Failed to refresh config editor view after save completion")
        try:
            self.notify(f"Configuration saved ({changed} change(s)) to {db.db_path.name}", timeout=5)
        except Exception:
            logger.exception("Failed to show configuration saved notification")
    else:
        # No TUI available; log instead of updating UI
        log(f"Configuration saved ({changed} change(s)) to {db.db_path.name}")

    log(f"ConfigModal saved {saved_entries} configuration entries (changed={changed})")
2026-01-11 02:42:08 -08:00
def validate_current_editor(self) -> bool:
    """Ensure all required fields for the current item are filled.

    Required keys come from the store class (``store-<type>`` items), from the
    provider's ``config_schema()`` plus ``required_config_keys()`` (``provider``
    items), or from the tool module's ``config_schema()`` (``tool`` items). Keys
    are matched against the item's config section case-insensitively.

    Returns:
        ``True`` when nothing is being edited or every required key has a
        non-blank value; ``False`` after showing an error notification for the
        first required key that is missing or blank.
    """
    if not self.editing_item_name:
        return True

    item_type = str(self.editing_item_type or "")
    item_name = str(self.editing_item_name or "")
    required_keys: List[str] = []
    section: Any = {}

    def _add_required_from_schema(schema_owner: Any) -> None:
        """Append keys flagged ``required`` by ``schema_owner.config_schema()``."""
        schema_fn = getattr(schema_owner, "config_schema", None)
        if not callable(schema_fn):
            return
        for field_def in schema_fn():
            key = field_def.get("key")
            if field_def.get("required") and key and key not in required_keys:
                required_keys.append(key)

    if item_type.startswith("store-"):
        # Strip only the leading marker; a plain replace() would also remove any
        # later "store-" occurrence inside the type name.
        stype = item_type[len("store-"):]
        classes = _discover_store_classes()
        if stype in classes:
            required_keys = list(_required_keys_for(classes[stype]))
        section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {})
    elif item_type == "provider":
        from ProviderCore.registry import get_provider_class
        try:
            pcls = get_provider_class(item_name)
            if pcls:
                # Collect required keys from schema
                _add_required_from_schema(pcls)
                # Merge with legacy required keys
                for rk in pcls.required_config_keys():
                    if rk not in required_keys:
                        required_keys.append(rk)
        except Exception:
            logger.exception("Failed to inspect provider class '%s' for required keys", item_name)
        section = self.config_data.get("provider", {}).get(item_name, {})
    elif item_type == "tool":
        try:
            import importlib
            mod = importlib.import_module(f"tool.{item_name}")
            _add_required_from_schema(mod)
        except Exception:
            logger.exception("Failed to inspect tool module 'tool.%s' for required keys", item_name)
        section = self.config_data.get("tool", {}).get(item_name, {})

    # A section that is not a mapping holds no values, so every required key
    # is reported as blank instead of raising on ``.items()``.
    if not isinstance(section, dict):
        section = {}

    # Check required keys
    for rk in required_keys:
        # Case-insensitive lookup for the required key in the current section
        rk_upper = rk.upper()
        val = next((v for k, v in section.items() if k.upper() == rk_upper), None)
        if not val or not str(val).strip():
            self.notify(f"Required field '{rk}' cannot be blank", severity="error")
            return False
    return True