import re from copy import deepcopy from typing import Any, Dict, List, Optional, Iterable import traceback from textual import on, work from textual.app import ComposeResult from textual.containers import Container, Horizontal, Vertical, ScrollableContainer from textual.screen import ModalScreen from textual.widgets import Static, Button, Input, Label, ListView, ListItem, Rule, Select, Checkbox from pathlib import Path from SYS.config import load_config, save_config, reload_config, global_config, count_changed_entries, ConfigSaveConflict from SYS.database import db from SYS.logger import log, debug from Store.registry import _discover_store_classes, _required_keys_for from ProviderCore.registry import list_providers from TUI.modalscreen.matrix_room_picker import MatrixRoomPicker from TUI.modalscreen.selection_modal import SelectionModal class ConfigModal(ModalScreen): """A modal for editing the configuration.""" BINDINGS = [ ("ctrl+v", "paste", "Paste"), ("ctrl+c", "copy", "Copy"), ] CSS = """ ConfigModal { align: center middle; background: $boost; } #config-container { width: 90%; height: 90%; background: $panel; border: thick $primary; padding: 1; } .section-title { background: $accent; color: $text; padding: 0 1; margin-bottom: 1; text-align: center; text-style: bold; height: 3; content-align: center middle; } #config-sidebar { width: 25%; border-right: solid $surface; } #config-content { width: 75%; padding: 1; } .config-field { margin-bottom: 1; height: auto; } .config-label { width: 100%; text-style: bold; color: $accent; } .field-row { height: 5; margin-bottom: 1; align: left middle; } .config-input { width: 1fr; } #config-actions { height: 3; align: right middle; } .item-row { height: 5; margin-bottom: 1; padding: 0 1; border: solid $surface; } .item-label { width: 1fr; height: 3; content-align: left middle; } .item-row Button { width: 16; height: 3; } Button { margin: 0 1; } /* Inline matrix rooms list sizing & style (larger, scrollable) */ #matrix-rooms-inline { height: 16; border: solid $surface; padding: 1; margin-bottom: 1; } .matrix-room-row { border-bottom: solid $surface; padding: 1 0; align: left middle; } """ def __init__(self) -> None: super().__init__() # Load config from the workspace root (parent of SYS) self.config_data = load_config() self.current_category = "globals" self.editing_item_type = None # 'store' or 'provider' self.editing_item_name = None self._button_id_map = {} self._input_id_map = {} self._matrix_status: Optional[Static] = None self._matrix_test_running = False self._editor_snapshot: Optional[Dict[str, Any]] = None # Inline matrix rooms controls self._matrix_inline_list: Optional[ListView] = None self._matrix_inline_checkbox_map: Dict[str, str] = {} # Path to the database file used by this process (for diagnostics) self._db_path = str(db.db_path) def _capture_editor_snapshot(self) -> None: self._editor_snapshot = deepcopy(self.config_data) def _revert_unsaved_editor_changes(self) -> None: if self._editor_snapshot is not None: self.config_data = deepcopy(self._editor_snapshot) self._editor_snapshot = None def _editor_has_changes(self) -> bool: if self._editor_snapshot is None: return True return self.config_data != self._editor_snapshot def compose(self) -> ComposeResult: with Container(id="config-container"): yield Static("CONFIGURATION EDITOR", classes="section-title") yield Static(f"DB: {self._db_path}", classes="config-label", id="config-db-path") yield Static("Last saved: unknown", classes="config-label", id="config-last-save") with Horizontal(): with Vertical(id="config-sidebar"): yield Label("Categories", classes="config-label") with ListView(id="category-list"): yield ListItem(Label("Global Settings"), id="cat-globals") yield ListItem(Label("Stores"), id="cat-stores") yield ListItem(Label("Providers"), id="cat-providers") yield ListItem(Label("Tools"), id="cat-tools") with Vertical(id="config-content"): yield ScrollableContainer(id="fields-container") with Horizontal(id="config-actions"): yield Button("Save", variant="success", id="save-btn") yield Button("Add Store", variant="primary", id="add-store-btn") yield Button("Add Provider", variant="primary", id="add-provider-btn") yield Button("Add Tool", variant="primary", id="add-tool-btn") yield Button("Back", id="back-btn") yield Button("Close", variant="error", id="cancel-btn") def on_mount(self) -> None: self.query_one("#add-store-btn", Button).display = False self.query_one("#add-provider-btn", Button).display = False try: self.query_one("#add-tool-btn", Button).display = False except Exception: pass # Update DB path and last-saved on mount try: self.query_one("#config-db-path", Static).update(self._db_path) except Exception: pass try: mtime = None try: mtime = db.db_path.stat().st_mtime mtime = __import__('datetime').datetime.utcfromtimestamp(mtime).isoformat() + "Z" except Exception: mtime = None self.query_one("#config-last-save", Static).update(f"Last saved: {mtime or '(unknown)'}") except Exception: pass self.refresh_view() def refresh_view(self) -> None: """ Refresh the content area. We debounce this call and use a render_id to avoid race conditions with Textual's async widget mounting. """ self._render_id = getattr(self, "_render_id", 0) + 1 if hasattr(self, "_refresh_timer"): self._refresh_timer.stop() self._refresh_timer = self.set_timer(0.02, self._actual_refresh) def _actual_refresh(self) -> None: try: container = self.query_one("#fields-container", ScrollableContainer) except Exception: return self._button_id_map.clear() self._input_id_map.clear() # Clear existing container.query("*").remove() # Update visibility of buttons try: self.query_one("#add-store-btn", Button).display = (self.current_category == "stores" and self.editing_item_name is None) self.query_one("#add-provider-btn", Button).display = (self.current_category == "providers" and self.editing_item_name is None) self.query_one("#add-tool-btn", Button).display = (self.current_category == "tools" and self.editing_item_name is None) self.query_one("#back-btn", Button).display = (self.editing_item_name is not None) self.query_one("#save-btn", Button).display = (self.editing_item_name is not None or self.current_category == "globals") except Exception: pass render_id = self._render_id def do_mount(): # If a new refresh was started, ignore this old mount request if getattr(self, "_render_id", 0) != render_id: return # Final check that container is empty. remove() is async. if container.children: for child in list(container.children): child.remove() if self.editing_item_name: self.render_item_editor(container) elif self.current_category == "globals": self.render_globals(container) elif self.current_category == "stores": self.render_stores(container) elif self.current_category == "providers": self.render_providers(container) elif self.current_category == "tools": self.render_tools(container) self.call_after_refresh(do_mount) def render_globals(self, container: ScrollableContainer) -> None: container.mount(Label("General Configuration", classes="config-label")) # Get global schema schema_map = {f["key"].lower(): f for f in global_config()} existing_keys_lower = set() idx = 0 # Show fields defined in schema first for key_lower, field_def in schema_map.items(): existing_keys_lower.add(key_lower) label_text = field_def.get("label") or field_def["key"] choices = field_def.get("choices") # Find current value (case-insensitive) current_val = None found_key = field_def["key"] for k, v in self.config_data.items(): if k.lower() == key_lower: current_val = str(v) found_key = k break if current_val is None: current_val = str(field_def.get("default") or "") container.mount(Label(label_text)) inp_id = f"global-{idx}" self._input_id_map[inp_id] = found_key if choices: # Normalize boolean-like choices to lowercase ('true'/'false') to avoid duplicate choices normalized_choices = [] for c in choices: s = str(c) if s.lower() in ("true", "false"): normalized_choices.append(s.lower()) else: normalized_choices.append(s) select_options = [(str(c), str(c)) for c in normalized_choices] # Normalize current value as well cur_val = str(current_val) if current_val is not None else "" if cur_val.lower() in ("true", "false"): cur_val = cur_val.lower() if cur_val not in normalized_choices: select_options.insert(0, (cur_val, cur_val)) sel = Select(select_options, value=cur_val, id=inp_id) container.mount(sel) else: row = Horizontal(classes="field-row") container.mount(row) row.mount(Input(value=current_val, id=inp_id, classes="config-input")) idx += 1 # Show any other top-level keys not in schema for k, v in self.config_data.items(): if not isinstance(v, dict) and not k.startswith("_"): if k.lower() in existing_keys_lower: continue inp_id = f"global-{idx}" self._input_id_map[inp_id] = k container.mount(Label(k)) row = Horizontal(classes="field-row") container.mount(row) row.mount(Input(value=str(v), id=inp_id, classes="config-input")) row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn")) idx += 1 def render_stores(self, container: ScrollableContainer) -> None: container.mount(Label("Configured Stores", classes="config-label")) stores = self.config_data.get("store", {}) if not stores: container.mount(Static("No stores configured.")) else: # stores is structured as: {type: {name_key: config}} idx = 0 for stype, instances in stores.items(): if isinstance(instances, dict): for name_key, conf in instances.items(): # Use the name field from the config if it exists, otherwise use the key display_name = name_key if isinstance(conf, dict): display_name = ( conf.get("NAME") or conf.get("name") or conf.get("Name") or name_key ) edit_id = f"edit-store-{idx}" del_id = f"del-store-{idx}" self._button_id_map[edit_id] = ("edit", f"store-{stype}", name_key) self._button_id_map[del_id] = ("del", f"store-{stype}", name_key) idx += 1 row = Horizontal( Static(f"{display_name} ({stype})", classes="item-label"), Button("Edit", id=edit_id), Button("Delete", variant="error", id=del_id), classes="item-row" ) container.mount(row) def render_providers(self, container: ScrollableContainer) -> None: container.mount(Label("Configured Providers", classes="config-label")) providers = self.config_data.get("provider", {}) if not providers: container.mount(Static("No providers configured.")) else: for i, (name, _) in enumerate(providers.items()): edit_id = f"edit-provider-{i}" del_id = f"del-provider-{i}" self._button_id_map[edit_id] = ("edit", "provider", name) self._button_id_map[del_id] = ("del", "provider", name) row = Horizontal( Static(name, classes="item-label"), Button("Edit", id=edit_id), Button("Delete", variant="error", id=del_id), classes="item-row" ) container.mount(row) def render_tools(self, container: ScrollableContainer) -> None: container.mount(Label("Configured Tools", classes="config-label")) tools = self.config_data.get("tool", {}) if not tools: container.mount(Static("No tools configured.")) else: for i, (name, _) in enumerate(tools.items()): edit_id = f"edit-tool-{i}" del_id = f"del-tool-{i}" self._button_id_map[edit_id] = ("edit", "tool", name) self._button_id_map[del_id] = ("del", "tool", name) row = Horizontal( Static(name, classes="item-label"), Button("Edit", id=edit_id), Button("Delete", variant="error", id=del_id), classes="item-row" ) container.mount(row) def render_item_editor(self, container: ScrollableContainer) -> None: item_type = str(self.editing_item_type or "") item_name = str(self.editing_item_name or "") provider_schema_map = {} # Parse item_type for store-{stype} or just provider if item_type.startswith("store-"): stype = item_type.replace("store-", "") container.mount(Label(f"Editing Store: {item_name} ({stype})", classes="config-label")) section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {}) # Fetch Store schema classes = _discover_store_classes() if stype in classes: cls = classes[stype] if hasattr(cls, "config_schema") and callable(cls.config_schema): for field_def in cls.config_schema(): k = field_def.get("key") if k: provider_schema_map[k.upper()] = field_def else: container.mount(Label(f"Editing {item_type.capitalize()}: {item_name}", classes="config-label")) section = self.config_data.get(item_type, {}).get(item_name, {}) # Fetch Provider schema if item_type == "provider": from ProviderCore.registry import get_provider_class try: pcls = get_provider_class(item_name) if pcls and hasattr(pcls, "config_schema") and callable(pcls.config_schema): for field_def in pcls.config_schema(): k = field_def.get("key") if k: provider_schema_map[k.upper()] = field_def except Exception: pass # Fetch Tool schema if item_type == "tool": try: import importlib mod = importlib.import_module(f"tool.{item_name}") if hasattr(mod, "config_schema") and callable(mod.config_schema): for field_def in mod.config_schema(): k = field_def.get("key") if k: provider_schema_map[k.upper()] = field_def except Exception: pass # Use columns for better layout of inputs with paste buttons container.mount(Label("Edit Settings")) # render_item_editor will handle the inputs for us if we set these # but wait, render_item_editor is called from refresh_view, not here. # actually we don't need to do anything else here because refresh_view calls render_item_editor # which now handles the paste buttons. # Show all existing keys existing_keys_upper = set() idx = 0 for k, v in section.items(): if k.startswith("_"): continue # Skip low-level keys that shouldn't be editable via the form UI if ( item_type == "provider" and isinstance(item_name, str) and item_name.strip().lower() == "matrix" and str(k or "").strip().lower() in ("rooms", "cached_rooms") ): # These are managed by the inline UI and should not be edited directly. continue # Deduplicate keys case-insensitively (e.g. name vs NAME vs Name) k_upper = k.upper() if k_upper in existing_keys_upper: continue existing_keys_upper.add(k_upper) # Determine display props from schema label_text = k is_secret = False choices = None schema = provider_schema_map.get(k_upper) if schema: label_text = schema.get("label") or k if schema.get("required"): label_text += " *" if schema.get("secret"): is_secret = True choices = schema.get("choices") container.mount(Label(label_text)) inp_id = f"item-{idx}" self._input_id_map[inp_id] = k if choices: # Select takes a list of (label, value) tuples; normalize boolean-like values select_options = [] choice_values = [] for c in choices: if isinstance(c, tuple) and len(c) == 2: label = str(c[0]) val = str(c[1]) else: label = str(c) val = str(c) if val.lower() in ("true", "false"): val = val.lower() label = val select_options.append((label, val)) choice_values.append(val) # If current value not in choices, add it or stay blank current_val = str(v) if current_val.lower() in ("true", "false"): current_val = current_val.lower() if current_val not in choice_values: select_options.insert(0, (current_val, current_val)) sel = Select(select_options, value=current_val, id=inp_id) container.mount(sel) else: row = Horizontal(classes="field-row") container.mount(row) inp = Input(value=str(v), id=inp_id, classes="config-input") if is_secret: inp.password = True row.mount(inp) idx += 1 # Add required/optional fields from schema that are missing for k_upper, field_def in provider_schema_map.items(): if k_upper not in existing_keys_upper: existing_keys_upper.add(k_upper) key = field_def["key"] label_text = field_def.get("label") or key if field_def.get("required"): label_text += " *" default_val = str(field_def.get("default") or "") choices = field_def.get("choices") container.mount(Label(label_text)) inp_id = f"item-{idx}" self._input_id_map[inp_id] = key if choices: select_options = [] choice_values = [] for c in choices: if isinstance(c, tuple) and len(c) == 2: label = str(c[0]) val = str(c[1]) else: label = str(c) val = str(c) if val.lower() in ("true", "false"): val = val.lower() label = val select_options.append((label, val)) choice_values.append(val) # Normalize default/current value current_val = str(default_val) if default_val is not None else "" if current_val.lower() in ("true", "false"): current_val = current_val.lower() if current_val not in choice_values: select_options.insert(0, (current_val, current_val)) sel = Select(select_options, value=current_val, id=inp_id) container.mount(sel) else: row = Horizontal(classes="field-row") container.mount(row) inp = Input(value=default_val, id=inp_id, classes="config-input") if field_def.get("secret"): inp.password = True if field_def.get("placeholder"): inp.placeholder = field_def.get("placeholder") row.mount(inp) row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn")) idx += 1 # If it's a store, we might have required keys (legacy check fallback) if item_type.startswith("store-"): stype = item_type.replace("store-", "") classes = _discover_store_classes() if stype in classes: required_keys = _required_keys_for(classes[stype]) for rk in required_keys: # Case-insensitive deduplication (fix path vs PATH) if rk.upper() not in existing_keys_upper: container.mount(Label(rk)) inp_id = f"item-{idx}" self._input_id_map[inp_id] = rk row = Horizontal(classes="field-row") container.mount(row) row.mount(Input(value="", id=inp_id, classes="config-input")) idx += 1 # If it's a provider, we might have required keys (legacy check fallback) if item_type == "provider": # 2. Legacy required_config_keys from ProviderCore.registry import get_provider_class try: pcls = get_provider_class(item_name) if pcls: required_keys = pcls.required_config_keys() for rk in required_keys: if rk.upper() not in existing_keys_upper: container.mount(Label(rk)) inp_id = f"item-{idx}" self._input_id_map[inp_id] = rk row = Horizontal(classes="field-row") container.mount(row) row.mount(Input(value="", id=inp_id, classes="config-input")) row.mount(Button("Paste", id=f"paste-{inp_id}", classes="paste-btn")) idx += 1 except Exception: pass if ( item_type == "provider" and isinstance(item_name, str) and item_name.strip().lower() == "matrix" ): container.mount(Rule()) container.mount(Label("Matrix helpers", classes="config-label")) status = Static("Set homeserver + token, then test before saving", id="matrix-status") container.mount(status) row = Horizontal(classes="field-row") container.mount(row) row.mount(Button("Test connection", id="matrix-test-btn")) # Load rooms refreshes the inline list and caches the results (no popup) row.mount(Button("Load rooms", variant="primary", id="matrix-load-btn")) self._matrix_status = status # Inline rooms list for selecting default rooms (populated after a successful test) container.mount(Label("Default Rooms", classes="config-label", id="matrix-inline-label")) # Start with empty list; it will be filled when test loads rooms container.mount(ListView(id="matrix-rooms-inline")) # Inline actions row2 = Horizontal(classes="field-row") container.mount(row2) row2.mount(Button("Select All", id="matrix-inline-select-all")) row2.mount(Button("Clear All", id="matrix-inline-clear")) save_inline = Button("Save defaults", variant="success", id="matrix-inline-save") save_inline.disabled = True row2.mount(save_inline) # Local bookkeeping maps try: self._matrix_inline_checkbox_map = {} self._matrix_inline_list = self.query_one("#matrix-rooms-inline", ListView) # Do NOT auto-render cached rooms here; only show explicitly saved defaults try: existing_ids = self._parse_matrix_rooms_value() cached = self._get_cached_matrix_rooms() rooms_to_render: List[Dict[str, Any]] = [] # Start with cached rooms (from last Load). These are shown # in the inline Default Rooms list but are unselected unless # they are in the saved defaults list. if cached: rooms_to_render.extend(cached) # Ensure saved default room ids are present and will be selected if existing_ids: cached_ids = {str(r.get("room_id") or "").strip() for r in rooms_to_render if isinstance(r, dict)} need_resolve = [rid for rid in existing_ids if rid not in cached_ids] if need_resolve: try: resolved = self._resolve_matrix_rooms_by_ids(need_resolve) if resolved: rooms_to_render.extend(resolved) else: rooms_to_render.extend([{"room_id": rid, "name": ""} for rid in need_resolve]) except Exception: rooms_to_render.extend([{"room_id": rid, "name": ""} for rid in need_resolve]) # Deduplicate while preserving order deduped: List[Dict[str, Any]] = [] seen_ids: set[str] = set() for r in rooms_to_render: try: rid = str(r.get("room_id") or "").strip() if not rid or rid in seen_ids: continue seen_ids.add(rid) deduped.append(r) except Exception: continue if self._matrix_inline_list is not None and deduped: try: self._render_matrix_rooms_inline(deduped) except Exception: pass except Exception: pass except Exception: self._matrix_inline_checkbox_map = {} self._matrix_inline_list = None def create_field(self, name: str, value: Any, id: str) -> Vertical: # This method is now unused - we mount labels and inputs directly v = Vertical(classes="config-field") return v def on_list_view_selected(self, event: ListView.Selected) -> None: # Only respond to selections from the left-hand category list. Avoid # resetting editor state when other ListViews (like the inline rooms # list) trigger selection events. if not event.item: return item_id = getattr(event.item, "id", None) if item_id not in ("cat-globals", "cat-stores", "cat-providers", "cat-tools"): return if item_id == "cat-globals": self.current_category = "globals" elif item_id == "cat-stores": self.current_category = "stores" elif item_id == "cat-providers": self.current_category = "providers" elif item_id == "cat-tools": self.current_category = "tools" # Reset editor state and refresh view for the new category self.editing_item_name = None self.editing_item_type = None self.refresh_view() def on_button_pressed(self, event: Button.Pressed) -> None: bid = event.button.id if not bid: return if bid == "cancel-btn": self._revert_unsaved_editor_changes() self.dismiss() elif bid == "back-btn": self._revert_unsaved_editor_changes() self.editing_item_name = None self.editing_item_type = None self.refresh_view() elif bid == "save-btn": self._synchronize_inputs_to_config() if not self.validate_current_editor(): return if self.editing_item_name and not self._editor_has_changes(): self.notify("No changes to save", severity="warning", timeout=3) return try: saved = self.save_all() if saved == 0: msg = f"Configuration saved (no rows changed) to {db.db_path.name}" else: msg = f"Configuration saved ({saved} change(s)) to {db.db_path.name}" # Make the success notification visible a bit longer so it's not missed self.notify(msg, timeout=5) # Return to the main list view within the current category self.editing_item_name = None self.editing_item_type = None self.refresh_view() self._editor_snapshot = None except ConfigSaveConflict as exc: # A concurrent on-disk change was detected; do not overwrite it. self.notify( "Save aborted: configuration changed on disk. The editor will refresh.", severity="error", timeout=10, ) # Refresh our in-memory view from disk and drop the editor snapshot try: self.config_data = reload_config() except Exception: pass self._editor_snapshot = None self.editing_item_name = None self.editing_item_type = None self.refresh_view() except Exception as exc: self.notify(f"Save failed: {exc}", severity="error", timeout=10) elif bid in self._button_id_map: action, itype, name = self._button_id_map[bid] if action == "edit": self._capture_editor_snapshot() self.editing_item_type = itype self.editing_item_name = name self.refresh_view() elif action == "del": removed = False if itype.startswith("store-"): stype = itype.replace("store-", "") if "store" in self.config_data and stype in self.config_data["store"]: if name in self.config_data["store"][stype]: del self.config_data["store"][stype][name] removed = True elif itype == "provider": if "provider" in self.config_data and name in self.config_data["provider"]: del self.config_data["provider"][name] removed = True if str(name).strip().lower() == "alldebrid": self._remove_alldebrid_store_entry() elif itype == "tool": if "tool" in self.config_data and name in self.config_data["tool"]: del self.config_data["tool"][name] removed = True if removed: try: saved = self.save_all() self.notify("Saving configuration...", timeout=3) except Exception as exc: self.notify(f"Save failed: {exc}", severity="error", timeout=10) self.refresh_view() elif bid == "add-store-btn": all_classes = _discover_store_classes() options = [] for stype, cls in all_classes.items(): if hasattr(cls, "config_schema") and callable(cls.config_schema): try: if cls.config_schema(): options.append(stype) except Exception: pass self.app.push_screen(SelectionModal("Select Store Type", options), callback=self.on_store_type_selected) elif bid == "add-provider-btn": provider_names = list(list_providers().keys()) options = [] from ProviderCore.registry import get_provider_class for ptype in provider_names: pcls = get_provider_class(ptype) if pcls and hasattr(pcls, "config_schema") and callable(pcls.config_schema): try: if pcls.config_schema(): options.append(ptype) except Exception: pass self.app.push_screen(SelectionModal("Select Provider Type", options), callback=self.on_provider_type_selected) elif bid == "add-tool-btn": # Discover tool modules that advertise a config_schema() options = [] try: import pkgutil import importlib import tool as _tool_pkg for _mod in pkgutil.iter_modules(_tool_pkg.__path__): try: mod = importlib.import_module(f"tool.{_mod.name}") if hasattr(mod, "config_schema") and callable(mod.config_schema): options.append(_mod.name) except Exception: continue except Exception: # Fallback to known entry options = ["ytdlp"] if options: options.sort() self.app.push_screen(SelectionModal("Select Tool", options), callback=self.on_tool_type_selected) elif bid == "matrix-test-btn": self._request_matrix_test() elif bid == "matrix-load-btn": # Refresh the inline rooms list and cache the results (no popup) self._request_matrix_load() elif bid == "matrix-inline-select-all": for checkbox_id in list(self._matrix_inline_checkbox_map.keys()): try: cb = self.query_one(f"#{checkbox_id}", Checkbox) cb.value = True except Exception: pass try: self.query_one("#matrix-inline-save", Button).disabled = False except Exception: pass elif bid == "matrix-inline-clear": for checkbox_id in list(self._matrix_inline_checkbox_map.keys()): try: cb = self.query_one(f"#{checkbox_id}", Checkbox) cb.value = False except Exception: pass try: self.query_one("#matrix-inline-save", Button).disabled = True except Exception: pass elif bid == "matrix-inline-save": selected: List[str] = [] for checkbox_id, room_id in self._matrix_inline_checkbox_map.items(): try: cb = self.query_one(f"#{checkbox_id}", Checkbox) if cb.value and room_id: selected.append(room_id) except Exception: pass if not selected: if self._matrix_status: self._matrix_status.update("No default rooms were saved.") return matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {}) matrix_block["rooms"] = ", ".join(selected) changed = count_changed_entries(self.config_data) try: entries = save_config(self.config_data) except Exception as exc: if self._matrix_status: self._matrix_status.update(f"Saving default rooms failed: {exc}") return self.config_data = reload_config() if self._matrix_status: status = f"Saved {len(selected)} default room(s) ({changed} change(s)) to {db.db_path.name}." self._matrix_status.update(status) try: self.query_one("#matrix-inline-save", Button).disabled = True except Exception: pass self.refresh_view() async def focus_and_paste(self, inp: Input) -> None: if hasattr(self.app, "paste_from_clipboard"): text = await self.app.paste_from_clipboard() if text: # Replace selection or append inp.value = str(inp.value) + text inp.focus() self.notify("Pasted from clipboard") else: self.notify("Clipboard not supported in this terminal", severity="warning") async def action_paste(self) -> None: focused = self.focused if isinstance(focused, Input): await self.focus_and_paste(focused) async def action_copy(self) -> None: focused = self.focused if isinstance(focused, Input) and focused.value: if hasattr(self.app, "copy_to_clipboard"): self.app.copy_to_clipboard(str(focused.value)) self.notify("Copied to clipboard") else: self.notify("Clipboard not supported in this terminal", severity="warning") # Backup/restore helpers removed: forensics/audit mode disabled and restore UI removed. def on_store_type_selected(self, stype: str) -> None: if not stype: return self._capture_editor_snapshot() existing_names: set[str] = set() store_block = self.config_data.get("store") if isinstance(store_block, dict): st_entries = store_block.get(stype) if isinstance(st_entries, dict): existing_names = {str(name) for name in st_entries.keys() if name} base_name = f"new_{stype}" new_name = base_name suffix = 1 while new_name in existing_names: suffix += 1 new_name = f"{base_name}_{suffix}" if "store" not in self.config_data: self.config_data["store"] = {} if stype not in self.config_data["store"]: self.config_data["store"][stype] = {} # Default config for the new store new_config = {"NAME": new_name} classes = _discover_store_classes() if stype in classes: cls = classes[stype] # Use schema for defaults if present if hasattr(cls, "config_schema") and callable(cls.config_schema): for field_def in cls.config_schema(): key = field_def.get("key") if key: val = field_def.get("default", "") # Don't override NAME if we already set it to new_stype if key.upper() == "NAME": continue new_config[key] = val else: # Fallback to required keys list required = _required_keys_for(cls) for rk in required: if rk.upper() != "NAME": new_config[rk] = "" self.config_data["store"][stype][new_name] = new_config self.editing_item_type = f"store-{stype}" self.editing_item_name = new_name self.refresh_view() def on_provider_type_selected(self, ptype: str) -> None: if not ptype: return self._capture_editor_snapshot() if "provider" not in self.config_data: self.config_data["provider"] = {} # For providers, they are usually top-level entries in 'provider' dict if ptype not in self.config_data["provider"]: from ProviderCore.registry import get_provider_class try: pcls = get_provider_class(ptype) new_config = {} if pcls: # Use schema for defaults if hasattr(pcls, "config_schema") and callable(pcls.config_schema): for field_def in pcls.config_schema(): key = field_def.get("key") if key: new_config[key] = field_def.get("default", "") else: # Fallback to legacy required keys required = pcls.required_config_keys() for rk in required: new_config[rk] = "" self.config_data["provider"][ptype] = new_config except Exception: self.config_data["provider"][ptype] = {} self.editing_item_type = "provider" self.editing_item_name = ptype self.refresh_view() def on_tool_type_selected(self, tname: str) -> None: if not tname: return self._capture_editor_snapshot() if "tool" not in self.config_data: self.config_data["tool"] = {} if tname not in self.config_data["tool"]: new_config = {} try: import importlib mod = importlib.import_module(f"tool.{tname}") if hasattr(mod, "config_schema") and callable(mod.config_schema): for field_def in mod.config_schema(): key = field_def.get("key") if key: new_config[key] = field_def.get("default", "") except Exception: pass self.config_data["tool"][tname] = new_config self.editing_item_type = "tool" self.editing_item_name = tname self.refresh_view() def _update_config_value(self, widget_id: str, value: Any) -> None: if widget_id not in self._input_id_map: return key = self._input_id_map[widget_id] raw_value = value is_blank_string = isinstance(raw_value, str) and not raw_value.strip() existing_value: Any = None item_type = str(self.editing_item_type or "") item_name = str(self.editing_item_name or "") if widget_id.startswith("global-"): existing_value = self.config_data.get(key) elif widget_id.startswith("item-") and item_name: if item_type.startswith("store-"): stype = item_type.replace("store-", "") store_block = self.config_data.get("store") if isinstance(store_block, dict): type_block = store_block.get(stype) if isinstance(type_block, dict): section = type_block.get(item_name) if isinstance(section, dict): existing_value = section.get(key) else: section_block = self.config_data.get(item_type) if isinstance(section_block, dict): section = section_block.get(item_name) if isinstance(section, dict): existing_value = section.get(key) if is_blank_string and existing_value is None: return # Try to preserve boolean/integer types processed_value = raw_value if isinstance(raw_value, str): low = raw_value.lower() if low == "true": processed_value = True elif low == "false": processed_value = False elif raw_value.isdigit(): processed_value = int(raw_value) if widget_id.startswith("global-"): self.config_data[key] = processed_value elif widget_id.startswith("item-") and item_name: if item_type.startswith("store-"): stype = item_type.replace("store-", "") if "store" not in self.config_data: self.config_data["store"] = {} if stype not in self.config_data["store"]: self.config_data["store"][stype] = {} if item_name not in self.config_data["store"][stype]: self.config_data["store"][stype][item_name] = {} # Special case: Renaming the store via the NAME field if key.upper() == "NAME" and processed_value and str(processed_value) != item_name: new_name = str(processed_value) self.config_data["store"][stype][new_name] = self.config_data["store"][stype].pop(item_name) self.editing_item_name = new_name item_name = new_name self.config_data["store"][stype][item_name][key] = processed_value else: if item_type not in self.config_data: self.config_data[item_type] = {} if item_name not in self.config_data[item_type]: self.config_data[item_type][item_name] = {} self.config_data[item_type][item_name][key] = processed_value def _synchronize_inputs_to_config(self) -> None: """Capture current input/select values before saving.""" widgets = list(self.query(Input)) + list(self.query(Select)) for widget in widgets: widget_id = widget.id if not widget_id or widget_id not in self._input_id_map: continue if isinstance(widget, Select): if widget.value == Select.BLANK: continue value = widget.value else: value = widget.value self._update_config_value(widget_id, value) def _remove_alldebrid_store_entry(self) -> bool: """Remove the mirrored AllDebrid store entry that would recreate the provider.""" store_block = self.config_data.get("store") if not isinstance(store_block, dict): return False debrid = store_block.get("debrid") if not isinstance(debrid, dict): return False removed = False for key in list(debrid.keys()): if str(key or "").strip().lower() == "all-debrid": debrid.pop(key, None) removed = True if not debrid: store_block.pop("debrid", None) if not store_block: self.config_data.pop("store", None) return removed def _get_matrix_provider_block(self) -> Dict[str, Any]: providers = self.config_data.get("provider") if not isinstance(providers, dict): return {} block = providers.get("matrix") return block if isinstance(block, dict) else {} def _parse_matrix_rooms_value(self) -> List[str]: block = self._get_matrix_provider_block() raw = block.get("rooms") if isinstance(raw, (list, tuple, set)): return [str(item).strip() for item in raw if str(item).strip()] text = str(raw or "").strip() if not text: return [] return [segment for segment in re.split(r"[\s,]+", text) if segment] def _request_matrix_test(self) -> None: if self._matrix_test_running: return self._synchronize_inputs_to_config() # Quick client-side pre-check before attempting to save/test to provide # immediate guidance when required fields are missing. try: matrix_block = self.config_data.get("provider", {}).get("matrix", {}) hs = matrix_block.get("homeserver") token = matrix_block.get("access_token") if not hs or not token: if self._matrix_status: self._matrix_status.update("Matrix test skipped: please set both 'homeserver' and 'access_token' before testing.") return except Exception: pass if self._matrix_status: self._matrix_status.update("Saving configuration before testing…") changed = count_changed_entries(self.config_data) try: entries = save_config(self.config_data) except Exception as exc: if self._matrix_status: self._matrix_status.update(f"Saving configuration failed: {exc}") self._matrix_test_running = False return self.config_data = reload_config() if self._matrix_status: self._matrix_status.update(f"Saved configuration ({changed} change(s)) to {db.db_path.name}. Testing Matrix connection…") self._matrix_test_running = True self._matrix_test_background() @work(thread=True) def _matrix_test_background(self) -> None: try: from Provider.matrix import Matrix provider = Matrix(self.config_data) rooms = provider.list_rooms() self.app.call_from_thread(self._matrix_test_result, True, rooms, None) except Exception as exc: # Log full traceback for diagnostics but present a concise, actionable # message to the user in the UI. tb = traceback.format_exc() try: debug(f"[matrix] Test connection failed: {exc}\n{tb}") except Exception: pass msg = str(exc) or "Matrix test failed" m_lower = msg.lower() if "auth" in m_lower or "authentication" in m_lower: msg = msg + ". Please verify your access token and try again." elif "homeserver" in m_lower or "missing" in m_lower: msg = msg + ". Check your homeserver URL (include https://)." else: msg = msg + " (see logs for details)" self.app.call_from_thread(self._matrix_test_result, False, [], msg) def _get_cached_matrix_rooms(self) -> List[Dict[str, Any]]: """Return cached rooms stored in the provider config (normalized). The config value can be a list/dict, a JSON string, or a Python literal string (repr). This method normalizes the input and returns a list of dicts containing 'room_id' and 'name'. Malformed inputs are ignored. """ try: block = self._get_matrix_provider_block() raw = block.get("cached_rooms") if not raw: return [] # If it's already a list or tuple, normalize each element if isinstance(raw, (list, tuple)): return self._normalize_cached_raw(list(raw)) # If it's a dict, wrap and normalize if isinstance(raw, dict): return self._normalize_cached_raw([raw]) # If it's a string, try JSON -> ast.literal_eval -> regex ID extraction if isinstance(raw, str): s = str(raw).strip() if not s: return [] # Try JSON first (strict) try: import json parsed = json.loads(s) if isinstance(parsed, (list, tuple, dict)): return self._normalize_cached_raw(parsed if isinstance(parsed, (list, tuple)) else [parsed]) except Exception: pass # Try Python literal eval (accepts single quotes, repr-style lists) try: import ast parsed = ast.literal_eval(s) if isinstance(parsed, (list, tuple, dict)): return self._normalize_cached_raw(parsed if isinstance(parsed, (list, tuple)) else [parsed]) except Exception: pass # Try to extract dict-like pairs for room_id/name when the string looks like # a Python repr or partial dict fragment (e.g., "'room_id': '!r1', 'name': 'Room'" try: import re pair_pat = re.compile(r"[\"']room_id[\"']\s*:\s*[\"'](?P[^\"']+)[\"']\s*,\s*[\"']name[\"']\s*:\s*[\"'](?P[^\"']+)[\"']") pairs = [m.groupdict() for m in pair_pat.finditer(s)] if pairs: out = [] for p in pairs: rid = str(p.get("id") or "").strip() name = str(p.get("name") or "").strip() if rid: out.append({"room_id": rid, "name": name}) if out: return out # As a last resort, extract candidate room ids via regex (look for leading '!') ids = re.findall(r"![-A-Za-z0-9._=]+(?::[-A-Za-z0-9._=]+)?", s) if ids: return [{"room_id": rid, "name": ""} for rid in ids] except Exception: pass return [] except Exception: pass return [] def _normalize_cached_raw(self, parsed: List[Any]) -> List[Dict[str, Any]]: out: List[Dict[str, Any]] = [] for it in parsed: try: if isinstance(it, dict): rid = str(it.get("room_id") or "").strip() name = str(it.get("name") or "").strip() if rid: out.append({"room_id": rid, "name": name}) elif isinstance(it, str): s = str(it or "").strip() if s: out.append({"room_id": s, "name": ""}) except Exception: continue return out def _request_matrix_load(self) -> None: """Save current config and request a background load of joined rooms. This replaces the old "Choose default rooms" popup and instead refreshes the inline default rooms list and caches the results to config. """ if self._matrix_test_running: return self._synchronize_inputs_to_config() # Quick client-side pre-check for required fields try: matrix_block = self.config_data.get("provider", {}).get("matrix", {}) hs = matrix_block.get("homeserver") token = matrix_block.get("access_token") if not hs or not token: if self._matrix_status: self._matrix_status.update("Load skipped: please set both 'homeserver' and 'access_token' before loading rooms.") return except Exception: pass if self._matrix_status: self._matrix_status.update("Saving configuration before loading rooms…") changed = count_changed_entries(self.config_data) try: entries = save_config(self.config_data) except Exception as exc: if self._matrix_status: self._matrix_status.update(f"Saving configuration failed: {exc}") self._matrix_test_running = False return self.config_data = reload_config() if self._matrix_status: self._matrix_status.update(f"Saved configuration ({changed} change(s)) to {db.db_path.name}. Loading Matrix rooms…") self._matrix_test_running = True self._matrix_load_background() @work(thread=True) def _matrix_load_background(self) -> None: try: from Provider.matrix import Matrix provider = Matrix(self.config_data) rooms = provider.list_rooms() self.app.call_from_thread(self._matrix_load_result, True, rooms, None) except Exception as exc: tb = traceback.format_exc() try: debug(f"[matrix] Load rooms failed: {exc}\n{tb}") except Exception: pass msg = str(exc) or "Matrix load failed" if "auth" in msg.lower(): msg = msg + ". Please verify your access token and try again." self.app.call_from_thread(self._matrix_load_result, False, [], msg) def _matrix_load_result(self, success: bool, rooms: List[Dict[str, Any]], error: Optional[str]) -> None: # Called on the main thread via call_from_thread self._matrix_test_running = False if not success: full_msg = f"Matrix load failed: {error or '(error)'}" if self._matrix_status: self._matrix_status.update(full_msg) try: self.notify(full_msg, severity="error", timeout=8) except Exception: pass return # Populate inline list try: self._render_matrix_rooms_inline(rooms) except Exception: pass # Persist cached rooms so they are available on next editor open try: matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {}) matrix_block["cached_rooms"] = rooms # Schedule a background save of the config (non-blocking) try: self.save_all() except Exception: # Fallback to direct save when save_all is unavailable (tests) try: save_config(self.config_data) except Exception: pass if self._matrix_status: self._matrix_status.update(f"Loaded and cached {len(rooms)} room(s).") try: self.notify(f"Loaded {len(rooms)} rooms and cached the results", timeout=5) except Exception: pass except Exception: pass def _open_matrix_room_picker( self, *, prefetched_rooms: Optional[List[Dict[str, Any]]] = None, ) -> None: existing = self._parse_matrix_rooms_value() self.app.push_screen( MatrixRoomPicker( self.config_data, existing=existing, rooms=prefetched_rooms, ), callback=self.on_matrix_rooms_selected, ) def _render_matrix_rooms_inline(self, rooms: List[Dict[str, Any]]) -> None: """ Populate the inline matrix rooms ListView with checkboxes based on the list of rooms returned from a successful test. If the inline ListView is not present in the current editor view, fall back to opening the MatrixRoomPicker popup with the results. """ try: inline_list = self._matrix_inline_list or self.query_one("#matrix-rooms-inline", ListView) except Exception: inline_list = None if inline_list is None: # Inline view isn't available in this context; cache the rooms and persist try: matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {}) matrix_block["cached_rooms"] = rooms try: self.save_all() except Exception: try: save_config(self.config_data) except Exception: pass if self._matrix_status: self._matrix_status.update(f"Loaded {len(rooms)} rooms (cached)") try: self.notify(f"Loaded {len(rooms)} rooms and cached the results", timeout=5) except Exception: pass except Exception: pass return # Clear current entries for child in list(inline_list.children): child.remove() self._matrix_inline_checkbox_map.clear() self._matrix_inline_list = inline_list # Determine existing selection from current config (so saved defaults are pre-selected) existing = set(self._parse_matrix_rooms_value()) if not rooms: if self._matrix_status: self._matrix_status.update("Matrix returned no rooms.") try: save_btn = self.query_one("#matrix-inline-save", Button) save_btn.disabled = True except Exception: pass return any_selected = False # Filter and normalize rooms to avoid malformed cache entries or split-word artifacts. normalized: List[Dict[str, str]] = [] for r in rooms: try: rid = str(r.get("room_id") or "").strip() name = str(r.get("name") or "").strip() except Exception: continue # Ignore obviously malformed tokens coming from bad caching/parsing le_rid = rid.lower() le_name = name.lower() if "room_id" in le_rid or "room_id" in le_name: continue # Require a valid room id (Matrix ids usually start with '!' and often contain ':') if not rid or (not rid.startswith("!") and ":" not in rid): # Skip entries without a sensible ID (we rely on IDs for saving) continue normalized.append({"room_id": rid, "name": name}) for idx, room in enumerate(normalized): room_id = room.get("room_id") or "" name = room.get("name") or "" checkbox_id = f"matrix-inline-room-{idx}" label_text = name or room_id or "Matrix Room" checked = bool(room_id and room_id in existing) if checked: any_selected = True from textual.widgets import Checkbox as _Checkbox # local import to avoid top-level change checkbox = _Checkbox(label_text, id=checkbox_id, value=checked) self._matrix_inline_checkbox_map[checkbox_id] = room_id inline_list.mount(ListItem(checkbox, classes="matrix-room-row")) if self._matrix_status: self._matrix_status.update("Loaded rooms. Select one or more and save.") try: save_btn = self.query_one("#matrix-inline-save", Button) save_btn.disabled = not any_selected except Exception: pass def _resolve_matrix_rooms_by_ids(self, ids: Iterable[str]) -> List[Dict[str, Any]]: """ Resolve room display names for a list of room IDs using the Matrix provider. Returns a list of dictionaries with keys 'room_id' and 'name' on success, or an empty list on failure. """ try: ids_list = [str(i).strip() for i in ids if str(i).strip()] except Exception: return [] if not ids_list: return [] # Only attempt network resolution if homeserver + token are present block = self._get_matrix_provider_block() hs = block.get("homeserver") token = block.get("access_token") if not hs or not token: return [] try: from Provider.matrix import Matrix provider = Matrix(self.config_data) rooms = provider.list_rooms(room_ids=ids_list) return rooms or [] except Exception as exc: try: debug(f"[config] failed to resolve matrix room names: {exc}") except Exception: pass return [] def on_matrix_rooms_selected(self, result: Any = None) -> None: if not isinstance(result, list): if self._matrix_status: self._matrix_status.update("Room selection cancelled.") return cleaned: List[str] = [] for item in result: candidate = str(item or "").strip() if candidate and candidate not in cleaned: cleaned.append(candidate) if not cleaned: if self._matrix_status: self._matrix_status.update("No default rooms were saved.") return matrix_block = self.config_data.setdefault("provider", {}).setdefault("matrix", {}) matrix_block["rooms"] = ", ".join(cleaned) changed = count_changed_entries(self.config_data) try: entries = save_config(self.config_data) except Exception as exc: if self._matrix_status: self._matrix_status.update(f"Saving default rooms failed: {exc}") return self.config_data = reload_config() if self._matrix_status: status = f"Saved {len(cleaned)} default room(s) ({changed} change(s)) to {db.db_path.name}." self._matrix_status.update(status) self.refresh_view() @on(Input.Changed) @on(Checkbox.Changed) def on_checkbox_changed(self, event: Checkbox.Changed) -> None: # Only respond to inline matrix checkboxes try: cbid = event.checkbox.id except Exception: cbid = None if not cbid or cbid not in self._matrix_inline_checkbox_map: return any_selected = False for checkbox_id in self._matrix_inline_checkbox_map.keys(): try: cb = self.query_one(f"#{checkbox_id}", Checkbox) if cb.value: any_selected = True break except Exception: continue try: self.query_one("#matrix-inline-save", Button).disabled = not any_selected except Exception: pass def on_input_changed(self, event: Input.Changed) -> None: if event.input.id: self._update_config_value(event.input.id, event.value) @on(Select.Changed) def on_select_changed(self, event: Select.Changed) -> None: if event.select.id: # Select value can be the 'Select.BLANK' sentinel if event.value != Select.BLANK: self._update_config_value(event.select.id, event.value) def save_all(self) -> int: self._synchronize_inputs_to_config() # Compute change count prior to persisting so callers can report the number of # actual changes rather than the total number of rows written to the DB. changed = count_changed_entries(self.config_data) # Snapshot config for background save snapshot = deepcopy(self.config_data) # Schedule the save to run on a background worker so the UI doesn't block. try: # Prefer Textual's worker when running inside the app self._save_background(snapshot, changed) except Exception: # Fallback: start a plain thread that runs the underlying task body import threading func = getattr(self._save_background, "__wrapped__", None) if func: thread = threading.Thread(target=lambda: func(self, snapshot, changed), daemon=True) else: thread = threading.Thread(target=lambda: self._save_background(snapshot, changed), daemon=True) thread.start() # Ensure DB path indicator is current and show saving status self._db_path = str(db.db_path) try: self.query_one("#config-db-path", Static).update(self._db_path) except Exception: pass try: self.query_one("#config-last-save", Static).update("Last saved: (saving...)") except Exception: pass log(f"ConfigModal scheduled save (changed={changed})") return changed @work(thread=True) def _save_background(self, cfg: Dict[str, Any], changed: int) -> None: try: saved_entries = save_config(cfg) try: appobj = self.app except Exception: appobj = None if appobj and hasattr(appobj, 'call_from_thread'): appobj.call_from_thread(self._on_save_complete, True, None, changed, saved_entries) else: # If no app (e.g., running under tests), call completion directly self._on_save_complete(True, None, changed, saved_entries) except ConfigSaveConflict as exc: try: appobj = self.app except Exception: appobj = None if appobj and hasattr(appobj, 'call_from_thread'): appobj.call_from_thread(self._on_save_complete, False, str(exc), changed, 0) else: self._on_save_complete(False, str(exc), changed, 0) except Exception as exc: try: appobj = self.app except Exception: appobj = None if appobj and hasattr(appobj, 'call_from_thread'): appobj.call_from_thread(self._on_save_complete, False, str(exc), changed, 0) else: self._on_save_complete(False, str(exc), changed, 0) def _on_save_complete(self, success: bool, error: Optional[str], changed: int, saved_entries: int) -> None: # Safely determine whether a Textual app context is available. Accessing # `self.app` can raise when not running inside the TUI; handle that. try: appobj = self.app except Exception: appobj = None if success: try: self.config_data = reload_config() except Exception: pass # Update last-saved label with file timestamp for visibility db_mtime = None try: db_mtime = db.db_path.stat().st_mtime db_mtime = __import__('datetime').datetime.utcfromtimestamp(db_mtime).isoformat() + "Z" except Exception: db_mtime = None if appobj: try: if changed == 0: label_text = f"Last saved: (no changes)" else: label_text = f"Last saved: {changed} change(s) at {db_mtime or '(unknown)'}" try: self.query_one("#config-last-save", Static).update(label_text) except Exception: pass except Exception: pass try: self.refresh_view() except Exception: pass try: self.notify(f"Configuration saved ({changed} change(s)) to {db.db_path.name}", timeout=5) except Exception: pass else: # No TUI available; log instead of updating UI log(f"Configuration saved ({changed} change(s)) to {db.db_path.name}") log(f"ConfigModal saved {saved_entries} configuration entries (changed={changed})") else: # Save failed; notify via UI if available, otherwise log if appobj: try: self.notify(f"Save failed: {error}", severity="error", timeout=10) except Exception: pass try: self.config_data = reload_config() except Exception: pass try: self.refresh_view() except Exception: pass else: log(f"Save failed: {error}") def validate_current_editor(self) -> bool: """Ensure all required fields for the current item are filled.""" if not self.editing_item_name: return True item_type = str(self.editing_item_type or "") item_name = str(self.editing_item_name or "") required_keys = [] section = {} if item_type.startswith("store-"): stype = item_type.replace("store-", "") classes = _discover_store_classes() if stype in classes: required_keys = list(_required_keys_for(classes[stype])) section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {}) elif item_type == "provider": from ProviderCore.registry import get_provider_class try: pcls = get_provider_class(item_name) if pcls: # Collect required keys from schema if hasattr(pcls, "config_schema") and callable(pcls.config_schema): for field_def in pcls.config_schema(): if field_def.get("required"): k = field_def.get("key") if k and k not in required_keys: required_keys.append(k) # Merge with legacy required keys for rk in pcls.required_config_keys(): if rk not in required_keys: required_keys.append(rk) except Exception: pass section = self.config_data.get("provider", {}).get(item_name, {}) elif item_type == "tool": try: import importlib mod = importlib.import_module(f"tool.{item_name}") if hasattr(mod, "config_schema") and callable(mod.config_schema): for field_def in mod.config_schema(): if field_def.get("required"): k = field_def.get("key") if k and k not in required_keys: required_keys.append(k) except Exception: pass section = self.config_data.get("tool", {}).get(item_name, {}) # Check required keys for rk in required_keys: # Case-insensitive lookup for the required key in the current section val = None rk_upper = rk.upper() for k, v in section.items(): if k.upper() == rk_upper: val = v break if not val or not str(val).strip(): self.notify(f"Required field '{rk}' cannot be blank", severity="error") return False return True