from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.containers import Container, Horizontal, Vertical, ScrollableContainer
from textual.widgets import Static, Button, Input, Label, ListView, ListItem, Rule, OptionList, Footer, Select
from textual import on
from textual.message import Message
from typing import Dict, Any, List, Optional
import os
import json
from pathlib import Path
from SYS.config import load_config, save_config, global_config
from Store.registry import _discover_store_classes, _required_keys_for
from ProviderCore.registry import list_providers
from TUI.modalscreen.selection_modal import SelectionModal


class ConfigModal(ModalScreen):
    """A modal for editing the configuration.

    The modal shows a category list on the left (globals, networking, stores,
    providers) and either an item list or an item editor on the right.
    Edits are written into ``self.config_data`` as the user types and are
    persisted with ``save_all``.
    """

    BINDINGS = [
        ("ctrl+v", "paste", "Paste"),
        ("ctrl+c", "copy", "Copy"),
    ]

    CSS = """
    ConfigModal {
        align: center middle;
        background: $boost;
    }

    #config-container {
        width: 90%;
        height: 90%;
        background: $panel;
        border: thick $primary;
        padding: 1;
    }

    .section-title {
        background: $accent;
        color: $text;
        padding: 0 1;
        margin-bottom: 1;
        text-align: center;
        text-style: bold;
        height: 3;
        content-align: center middle;
    }

    #config-sidebar {
        width: 25%;
        border-right: solid $surface;
    }

    #config-content {
        width: 75%;
        padding: 1;
    }

    .config-field {
        margin-bottom: 1;
        height: auto;
    }

    .config-label {
        width: 100%;
        text-style: bold;
        color: $accent;
    }

    .field-row {
        height: 5;
        margin-bottom: 1;
        align: left middle;
    }

    .config-input {
        width: 1fr;
    }

    .paste-btn {
        width: 10;
        margin-left: 1;
    }

    #config-actions {
        height: 3;
        align: right middle;
    }

    .item-row {
        height: 5;
        margin-bottom: 1;
        padding: 0 1;
        border: solid $surface;
    }

    .item-label {
        width: 1fr;
        height: 3;
        content-align: left middle;
    }

    .item-row Button {
        width: 16;
        height: 3;
    }

    Button {
        margin: 0 1;
    }
    """

    # Prefix used in ``editing_item_type`` / button map entries for stores.
    _STORE_PREFIX = "store-"

    def __init__(self) -> None:
        super().__init__()
        # Load config from the workspace root (parent of SYS)
        workspace_root = Path(__file__).resolve().parent.parent.parent
        self.config_data = load_config(config_dir=workspace_root)
        self.current_category = "globals"
        self.editing_item_type = None  # 'store-<type>', 'provider' or 'networking'
        self.editing_item_name = None
        self.workspace_root = workspace_root
        # Widget id -> (action, item type, item name) for Edit/Delete buttons.
        self._button_id_map = {}
        # Widget id -> config key for Input/Select widgets.
        self._input_id_map = {}
        # ZeroTier network id as last persisted; input events mutate
        # config_data immediately, so this is the only reliable "old" value.
        self._saved_zt_network_id = self._current_zt_network_id()

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _strip_prefix(value: str, prefix: str) -> str:
        """Return *value* without a leading *prefix* (only the prefix is removed)."""
        if value.startswith(prefix):
            return value[len(prefix):]
        return value

    @staticmethod
    def _unique_name(base: str, existing: Any) -> str:
        """Return *base*, or ``base_2``, ``base_3``... if it is already in *existing*."""
        if base not in existing:
            return base
        suffix = 2
        while f"{base}_{suffix}" in existing:
            suffix += 1
        return f"{base}_{suffix}"

    def _current_zt_network_id(self) -> str:
        """Return the ZeroTier ``network_id`` currently held in ``config_data``."""
        if not isinstance(self.config_data, dict):
            return ""
        networking = self.config_data.get("networking")
        if not isinstance(networking, dict):
            return ""
        zt_cfg = networking.get("zerotier")
        if not isinstance(zt_cfg, dict):
            return ""
        return str(zt_cfg.get("network_id") or "").strip()

    def _mount_text_field(
        self,
        container: ScrollableContainer,
        inp_id: str,
        value: str,
        secret: bool = False,
        placeholder: Optional[str] = None,
    ) -> None:
        """Mount an Input plus its Paste button as one ``field-row``.

        The row is built with its children up front so nothing is mounted
        into a widget that is not attached yet.
        """
        inp = Input(value=value, id=inp_id, classes="config-input")
        if secret:
            inp.password = True
        if placeholder:
            inp.placeholder = placeholder
        paste_btn = Button("Paste", id=f"paste-{inp_id}", classes="paste-btn")
        container.mount(Horizontal(inp, paste_btn, classes="field-row"))

    def _mount_select_field(
        self,
        container: ScrollableContainer,
        inp_id: str,
        choices: Any,
        current: str,
    ) -> None:
        """Mount a Select for *choices*, making sure *current* is selectable."""
        # Select takes a list of (label, value) tuples
        values = [str(c) for c in choices]
        select_options = [(v, v) for v in values]
        # If the current value is not one of the choices, add it so the
        # Select can be constructed with it as the initial value.
        if current not in values:
            select_options.insert(0, (current, current))
        container.mount(Select(select_options, value=current, id=inp_id))

    # ------------------------------------------------------------------
    # Layout
    # ------------------------------------------------------------------

    def compose(self) -> ComposeResult:
        """Build the static layout: title, sidebar, content area and action bar."""
        # NOTE(review): nesting reconstructed from a whitespace-collapsed
        # source; the action bar is assumed to be a direct child of the
        # outer container - confirm against the intended layout.
        with Container(id="config-container"):
            yield Static("CONFIGURATION EDITOR", classes="section-title")
            with Horizontal():
                with Vertical(id="config-sidebar"):
                    yield Label("Categories", classes="config-label")
                    with ListView(id="category-list"):
                        yield ListItem(Label("Global Settings"), id="cat-globals")
                        yield ListItem(Label("Networking"), id="cat-networking")
                        yield ListItem(Label("Stores"), id="cat-stores")
                        yield ListItem(Label("Providers"), id="cat-providers")
                with Vertical(id="config-content"):
                    yield ScrollableContainer(id="fields-container")
            with Horizontal(id="config-actions"):
                yield Button("Save", variant="success", id="save-btn")
                yield Button("Add Store", variant="primary", id="add-store-btn")
                yield Button("Add Provider", variant="primary", id="add-provider-btn")
                yield Button("Add Net", variant="primary", id="add-net-btn")
                yield Button("Back", id="back-btn")
                yield Button("Close", variant="error", id="cancel-btn")

    def on_mount(self) -> None:
        """Hide the category-specific "Add" buttons and render the first view."""
        self.query_one("#add-store-btn", Button).display = False
        self.query_one("#add-provider-btn", Button).display = False
        self.query_one("#add-net-btn", Button).display = False
        self.refresh_view()

    def refresh_view(self) -> None:
        """Clear the content area and re-render it for the current state."""
        container = self.query_one("#fields-container", ScrollableContainer)
        self._button_id_map.clear()
        self._input_id_map.clear()

        # Clear existing children
        for child in list(container.children):
            child.remove()

        # Update visibility of buttons
        in_list_view = self.editing_item_name is None
        try:
            self.query_one("#add-store-btn", Button).display = (
                self.current_category == "stores" and in_list_view
            )
            self.query_one("#add-provider-btn", Button).display = (
                self.current_category == "providers" and in_list_view
            )
            self.query_one("#add-net-btn", Button).display = (
                self.current_category == "networking" and in_list_view
            )
            self.query_one("#back-btn", Button).display = not in_list_view
            self.query_one("#save-btn", Button).display = (
                not in_list_view or self.current_category == "globals"
            )
        except Exception:
            # Best effort: a missing button must not prevent rendering.
            pass

        # We mount using call_after_refresh to ensure the removals are processed by Textual
        # before we try to mount new widgets with potentially duplicate IDs.
        def do_mount():
            if self.editing_item_name:
                self.render_item_editor(container)
            elif self.current_category == "globals":
                self.render_globals(container)
            elif self.current_category == "networking":
                self.render_networking(container)
            elif self.current_category == "stores":
                self.render_stores(container)
            elif self.current_category == "providers":
                self.render_providers(container)

        self.call_after_refresh(do_mount)

    # ------------------------------------------------------------------
    # Renderers
    # ------------------------------------------------------------------

    def render_globals(self, container: ScrollableContainer) -> None:
        """Render top-level (non-dict) settings: schema fields first, then extras."""
        container.mount(Label("General Configuration", classes="config-label"))

        # Get global schema
        schema_map = {f["key"].lower(): f for f in global_config()}
        existing_keys_lower = set()
        idx = 0

        # Show fields defined in schema first
        for key_lower, field_def in schema_map.items():
            existing_keys_lower.add(key_lower)
            label_text = field_def.get("label") or field_def["key"]
            choices = field_def.get("choices")

            # Find current value (case-insensitive)
            current_val = None
            found_key = field_def["key"]
            for k, v in self.config_data.items():
                if k.lower() == key_lower:
                    current_val = str(v)
                    found_key = k
                    break
            if current_val is None:
                current_val = str(field_def.get("default") or "")

            container.mount(Label(label_text))
            inp_id = f"global-{idx}"
            self._input_id_map[inp_id] = found_key
            if choices:
                self._mount_select_field(container, inp_id, choices, current_val)
            else:
                self._mount_text_field(container, inp_id, current_val)
            idx += 1

        # Show any other top-level keys not in schema
        for k, v in self.config_data.items():
            if isinstance(v, dict) or k.startswith("_"):
                continue
            if k.lower() in existing_keys_lower:
                continue
            inp_id = f"global-{idx}"
            self._input_id_map[inp_id] = k
            container.mount(Label(k))
            self._mount_text_field(container, inp_id, str(v))
            idx += 1

    def render_networking(self, container: ScrollableContainer) -> None:
        """Render local ZeroTier networks and the configured networking services."""
        container.mount(Label("ZeroTier Networks (local)", classes="config-label"))
        from API import zerotier as zt

        # Show whether we have an explicit authtoken available and its source
        try:
            token_src = zt._get_token_path()
        except Exception:
            token_src = None

        if token_src == "env":
            container.mount(Static(
                "Auth: authtoken provided via env var (ZEROTIER_AUTH_TOKEN) — no admin required",
                classes="config-note",
            ))
        elif token_src:
            container.mount(Static(
                f"Auth: authtoken file found: {token_src} — no admin required",
                classes="config-note",
            ))
        else:
            container.mount(Static(
                "Auth: authtoken not found in workspace; TUI may need admin to join networks",
                classes="config-warning",
            ))

        try:
            local_nets = zt.list_networks()
            if not local_nets:
                container.mount(Static("No active ZeroTier networks found on this machine."))
            else:
                for n in local_nets:
                    row = Horizontal(
                        Static(f"{n.name} [{n.id}] - {n.status}", classes="item-label"),
                        Button("Leave", variant="error", id=f"zt-leave-{n.id}"),
                        classes="item-row",
                    )
                    container.mount(row)
        except Exception as exc:
            container.mount(Static(f"Error listing ZeroTier networks: {exc}"))

        container.mount(Rule())
        container.mount(Label("Networking Services", classes="config-label"))

        net = self.config_data.get("networking", {})
        if not net:
            container.mount(Static("No networking services configured."))
            return

        for idx, ntype in enumerate(net):
            edit_id = f"edit-net-{idx}"
            del_id = f"del-net-{idx}"
            self._button_id_map[edit_id] = ("edit", "networking", ntype)
            self._button_id_map[del_id] = ("del", "networking", ntype)
            row = Horizontal(
                Static(ntype, classes="item-label"),
                Button("Edit", id=edit_id),
                Button("Delete", variant="error", id=del_id),
                classes="item-row",
            )
            container.mount(row)

    def render_stores(self, container: ScrollableContainer) -> None:
        """Render one Edit/Delete row per configured store instance."""
        container.mount(Label("Configured Stores", classes="config-label"))
        stores = self.config_data.get("store", {})
        if not stores:
            container.mount(Static("No stores configured."))
            return

        # stores is structured as: {type: {name_key: config}}
        idx = 0
        for stype, instances in stores.items():
            if not isinstance(instances, dict):
                continue
            for name_key, conf in instances.items():
                # Use the name field from the config if it exists, otherwise use the key
                display_name = name_key
                if isinstance(conf, dict):
                    display_name = (
                        conf.get("NAME")
                        or conf.get("name")
                        or conf.get("Name")
                        or name_key
                    )

                edit_id = f"edit-store-{idx}"
                del_id = f"del-store-{idx}"
                self._button_id_map[edit_id] = ("edit", f"store-{stype}", name_key)
                self._button_id_map[del_id] = ("del", f"store-{stype}", name_key)
                idx += 1

                row = Horizontal(
                    Static(f"{display_name} ({stype})", classes="item-label"),
                    Button("Edit", id=edit_id),
                    Button("Delete", variant="error", id=del_id),
                    classes="item-row",
                )
                container.mount(row)

    def render_providers(self, container: ScrollableContainer) -> None:
        """Render one Edit/Delete row per configured provider."""
        container.mount(Label("Configured Providers", classes="config-label"))
        providers = self.config_data.get("provider", {})
        if not providers:
            container.mount(Static("No providers configured."))
            return

        for i, name in enumerate(providers):
            edit_id = f"edit-provider-{i}"
            del_id = f"del-provider-{i}"
            self._button_id_map[edit_id] = ("edit", "provider", name)
            self._button_id_map[del_id] = ("del", "provider", name)
            row = Horizontal(
                Static(name, classes="item-label"),
                Button("Edit", id=edit_id),
                Button("Delete", variant="error", id=del_id),
                classes="item-row",
            )
            container.mount(row)

    def render_item_editor(self, container: ScrollableContainer) -> None:
        """Render the editor for the store/provider/networking item being edited.

        Fields come from three sources, in order: keys already present in the
        item's config section, fields declared by the item's schema that are
        not yet present, and legacy "required keys" not covered by either.
        """
        item_type = str(self.editing_item_type or "")
        item_name = str(self.editing_item_name or "")
        is_store = item_type.startswith(self._STORE_PREFIX)
        stype = self._strip_prefix(item_type, self._STORE_PREFIX) if is_store else ""
        store_classes = {}
        provider_cls = None

        # Schema fields keyed by upper-cased config key.
        provider_schema_map = {}

        if is_store:
            container.mount(Label(f"Editing Store: {item_name} ({stype})", classes="config-label"))
            section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {})

            # Fetch Store schema
            store_classes = _discover_store_classes()
            cls = store_classes.get(stype)
            if cls is not None and hasattr(cls, "config") and callable(cls.config):
                for field_def in cls.config() or []:
                    k = field_def.get("key")
                    if k:
                        provider_schema_map[k.upper()] = field_def
        else:
            container.mount(Label(f"Editing {item_type.capitalize()}: {item_name}", classes="config-label"))
            section = self.config_data.get(item_type, {}).get(item_name, {})

            # Fetch Provider schema
            if item_type == "provider":
                from ProviderCore.registry import get_provider_class
                try:
                    provider_cls = get_provider_class(item_name)
                    if provider_cls and hasattr(provider_cls, "config") and callable(provider_cls.config):
                        for field_def in provider_cls.config() or []:
                            k = field_def.get("key")
                            if k:
                                provider_schema_map[k.upper()] = field_def
                except Exception:
                    # Schema is optional; fall back to existing/legacy keys.
                    pass

            # Fetch Networking schema
            if item_type == "networking" and item_name == "zerotier":
                zt_schema = [
                    {"key": "api_key", "label": "ZeroTier Central API Token", "default": "", "secret": True},
                    {"key": "network_id", "label": "Network ID to Join", "default": ""},
                ]
                for f in zt_schema:
                    provider_schema_map[f["key"].upper()] = f

        container.mount(Label("Edit Settings"))

        # Show all existing keys
        existing_keys_upper = set()
        idx = 0
        for k, v in section.items():
            if k.startswith("_"):
                continue

            # Deduplicate keys case-insensitively (e.g. name vs NAME vs Name)
            k_upper = k.upper()
            if k_upper in existing_keys_upper:
                continue
            existing_keys_upper.add(k_upper)

            # Determine display props from schema
            label_text = k
            is_secret = False
            choices = None
            schema = provider_schema_map.get(k_upper)
            if schema:
                label_text = schema.get("label") or k
                if schema.get("required"):
                    label_text += " *"
                is_secret = bool(schema.get("secret"))
                choices = schema.get("choices")

            container.mount(Label(label_text))
            inp_id = f"item-{idx}"
            self._input_id_map[inp_id] = k
            if choices:
                self._mount_select_field(container, inp_id, choices, str(v))
            else:
                self._mount_text_field(container, inp_id, str(v), secret=is_secret)
            idx += 1

        # Add required/optional fields from schema that are missing
        for k_upper, field_def in provider_schema_map.items():
            if k_upper in existing_keys_upper:
                continue
            existing_keys_upper.add(k_upper)

            key = field_def["key"]
            label_text = field_def.get("label") or key
            if field_def.get("required"):
                label_text += " *"
            default_val = str(field_def.get("default") or "")
            choices = field_def.get("choices")

            container.mount(Label(label_text))
            inp_id = f"item-{idx}"
            self._input_id_map[inp_id] = key
            if choices:
                # The default may be blank or otherwise absent from choices;
                # the helper makes it selectable instead of failing.
                self._mount_select_field(container, inp_id, choices, default_val)
            else:
                self._mount_text_field(
                    container,
                    inp_id,
                    default_val,
                    secret=bool(field_def.get("secret")),
                    placeholder=field_def.get("placeholder"),
                )
            idx += 1

        # Legacy fallback: required keys not covered by existing keys or schema.
        legacy_required = []
        if is_store:
            if stype in store_classes:
                legacy_required = list(_required_keys_for(store_classes[stype]))
        elif item_type == "provider":
            try:
                if provider_cls:
                    legacy_required = list(provider_cls.required_config_keys())
            except Exception:
                # Best effort: providers without legacy keys are fine.
                legacy_required = []

        for rk in legacy_required:
            # Case-insensitive deduplication (fix path vs PATH)
            rk_upper = rk.upper()
            if rk_upper in existing_keys_upper:
                continue
            existing_keys_upper.add(rk_upper)
            container.mount(Label(rk))
            inp_id = f"item-{idx}"
            self._input_id_map[inp_id] = rk
            self._mount_text_field(container, inp_id, "")
            idx += 1

    def create_field(self, name: str, value: Any, id: str) -> Vertical:
        """Return an empty ``config-field`` container.

        This method is now unused - labels and inputs are mounted directly.
        """
        return Vertical(classes="config-field")

    # ------------------------------------------------------------------
    # Event handlers
    # ------------------------------------------------------------------

    def on_list_view_selected(self, event: ListView.Selected) -> None:
        """Switch category when a sidebar entry is selected."""
        if not event.item:
            return

        categories = {
            "cat-globals": "globals",
            "cat-networking": "networking",
            "cat-stores": "stores",
            "cat-providers": "providers",
        }
        selected = categories.get(event.item.id)
        if selected is not None:
            self.current_category = selected

        self.editing_item_name = None
        self.editing_item_type = None
        self.refresh_view()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dispatch a button press based on the button's id."""
        bid = event.button.id
        if not bid:
            return

        if bid == "cancel-btn":
            self.dismiss()
        elif bid == "back-btn":
            self.editing_item_name = None
            self.editing_item_type = None
            self.refresh_view()
        elif bid == "save-btn":
            self._save_current_editor()
        elif bid.startswith("zt-leave-"):
            nid = self._strip_prefix(bid, "zt-leave-")
            from API import zerotier as zt
            try:
                zt.leave_network(nid)
                self.notify(f"Left ZeroTier network {nid}")
                self.refresh_view()
            except Exception as exc:
                self.notify(f"Failed to leave: {exc}", severity="error")
        elif bid in self._button_id_map:
            action, itype, name = self._button_id_map[bid]
            if action == "edit":
                self.editing_item_type = itype
                self.editing_item_name = name
                self.refresh_view()
            elif action == "del":
                if self._delete_item(itype, name):
                    # The Save button is hidden in list views, so persist the
                    # deletion here; otherwise it would be lost on close.
                    try:
                        self.save_all()
                    except Exception as exc:
                        self.notify(f"Save failed: {exc}", severity="error", timeout=10)
                self.refresh_view()
        elif bid == "add-store-btn":
            options = []
            for stype, cls in _discover_store_classes().items():
                if hasattr(cls, "config") and callable(cls.config):
                    try:
                        if cls.config():
                            options.append(stype)
                    except Exception:
                        # Skip store types whose schema cannot be read.
                        pass
            self.app.push_screen(
                SelectionModal("Select Store Type", options),
                callback=self.on_store_type_selected,
            )
        elif bid == "add-provider-btn":
            from ProviderCore.registry import get_provider_class
            options = []
            for ptype in list(list_providers().keys()):
                pcls = get_provider_class(ptype)
                if pcls and hasattr(pcls, "config") and callable(pcls.config):
                    try:
                        if pcls.config():
                            options.append(ptype)
                    except Exception:
                        # Skip providers whose schema cannot be read.
                        pass
            self.app.push_screen(
                SelectionModal("Select Provider Type", options),
                callback=self.on_provider_type_selected,
            )
        elif bid == "add-net-btn":
            options = ["zerotier"]
            self.app.push_screen(
                SelectionModal("Select Networking Service", options),
                callback=self.on_net_type_selected,
            )
        elif bid.startswith("paste-"):
            # Programmatic paste button
            target_id = self._strip_prefix(bid, "paste-")
            try:
                inp = self.query_one(f"#{target_id}", Input)
            except Exception:
                return
            # focus_and_paste is a coroutine function; schedule it on the
            # message pump instead of calling it (which would never run it).
            self.call_later(self.focus_and_paste, inp)

    def _delete_item(self, itype: str, name: str) -> bool:
        """Remove an item from ``config_data``; return True if something was removed."""
        if itype.startswith(self._STORE_PREFIX):
            stype = self._strip_prefix(itype, self._STORE_PREFIX)
            stores = self.config_data.get("store")
            bucket = stores.get(stype) if isinstance(stores, dict) else None
        elif itype in ("provider", "networking"):
            bucket = self.config_data.get(itype)
        else:
            bucket = None

        if isinstance(bucket, dict) and name in bucket:
            del bucket[name]
            return True
        return False

    def _save_current_editor(self) -> None:
        """Validate and persist the current editor, then return to the list view.

        When the ZeroTier networking entry is saved with a network id that
        differs from the last persisted one, also try to join that network.
        """
        if not self.validate_current_editor():
            return

        editing_zerotier = (
            self.editing_item_type == "networking"
            and self.editing_item_name == "zerotier"
        )
        # Read the persisted id BEFORE saving: config_data already holds the
        # edited value, so comparing against it would never detect a change.
        old_id = getattr(self, "_saved_zt_network_id", "")

        try:
            self.save_all()
            if editing_zerotier:
                new_id = self._current_zt_network_id()
                if new_id and new_id != old_id:
                    from API import zerotier as zt
                    try:
                        if zt.join_network(new_id):
                            self.notify(f"Joined ZeroTier network {new_id}")
                        else:
                            self.notify(
                                f"Config saved, but failed to join network {new_id}",
                                severity="warning",
                            )
                    except Exception as exc:
                        self.notify(f"Join error: {exc}", severity="error")

            self.notify("Configuration saved!")
            # Return to the main list view within the current category
            self.editing_item_name = None
            self.editing_item_type = None
            self.refresh_view()
        except Exception as exc:
            self.notify(f"Save failed: {exc}", severity="error", timeout=10)

    async def focus_and_paste(self, inp: Input) -> None:
        """Append clipboard text to *inp* and focus it, if the app supports it."""
        if hasattr(self.app, "paste_from_clipboard"):
            text = await self.app.paste_from_clipboard()
            if text:
                # Append to the current value
                inp.value = str(inp.value) + text
                inp.focus()
                self.notify("Pasted from clipboard")
        else:
            self.notify("Clipboard not supported in this terminal", severity="warning")

    async def action_paste(self) -> None:
        """Key binding: paste into the focused Input."""
        focused = self.focused
        if isinstance(focused, Input):
            await self.focus_and_paste(focused)

    async def action_copy(self) -> None:
        """Key binding: copy the focused Input's value to the clipboard."""
        focused = self.focused
        if isinstance(focused, Input) and focused.value:
            if hasattr(self.app, "copy_to_clipboard"):
                self.app.copy_to_clipboard(str(focused.value))
                self.notify("Copied to clipboard")
            else:
                self.notify("Clipboard not supported in this terminal", severity="warning")

    # ------------------------------------------------------------------
    # "Add ..." callbacks
    # ------------------------------------------------------------------

    def on_store_type_selected(self, stype: str) -> None:
        """Create a new store of *stype* and open it in the editor.

        ZeroTier stores are created through a multi-step wizard instead.
        """
        if not stype:
            return

        if stype == "zerotier":
            self._run_zerotier_store_wizard()
            return

        stores = self.config_data.setdefault("store", {})
        instances = stores.setdefault(stype, {})
        # Never clobber an existing store that still has the default name.
        new_name = self._unique_name(f"new_{stype}", instances)

        # Default config for the new store
        new_config = {"NAME": new_name}
        classes = _discover_store_classes()
        if stype in classes:
            cls = classes[stype]
            # Use schema for defaults if present
            if hasattr(cls, "config") and callable(cls.config):
                for field_def in cls.config() or []:
                    key = field_def.get("key")
                    if not key:
                        continue
                    # Don't override NAME, it was set above
                    if key.upper() == "NAME":
                        continue
                    new_config[key] = field_def.get("default", "")
            else:
                # Fallback to required keys list
                for rk in _required_keys_for(cls):
                    if rk.upper() != "NAME":
                        new_config[rk] = ""

        instances[new_name] = new_config
        self.editing_item_type = f"store-{stype}"
        self.editing_item_name = new_name
        self.refresh_view()

    def _run_zerotier_store_wizard(self) -> None:
        """Walk the user through network -> mode -> (share store | pick peer)."""
        from API import zerotier as zt

        # 1. Choose Network
        try:
            joined = zt.list_networks()
        except Exception as exc:
            self.notify(f"Error listing ZeroTier networks: {exc}", severity="error")
            return
        if not joined:
            self.notify("Error: Join a ZeroTier network first in 'Networking'", severity="error")
            return

        net_options = [f"{n.name or 'Network'} ({n.id})" for n in joined]

        def on_net_selected(net_choice: str):
            if not net_choice:
                return
            net_id = net_choice.split("(")[-1].rstrip(")")

            # 2. Host or Connect?
            def on_mode_selected(mode: str):
                if not mode:
                    return
                if mode == "Host (Share a local store)":
                    self._zerotier_host_step(net_id)
                else:
                    self._zerotier_connect_step(zt, net_id)

            self.app.push_screen(
                SelectionModal(
                    "ZeroTier Mode",
                    ["Host (Share a local store)", "Connect (Use a remote store)"],
                ),
                callback=on_mode_selected,
            )

        self.app.push_screen(
            SelectionModal("Select ZeroTier Network", net_options),
            callback=on_net_selected,
        )

    def _zerotier_host_step(self, net_id: str) -> None:
        """Wizard step 3a: pick a local store to share on *net_id*."""
        local_stores = []
        for s_type, s_data in self.config_data.get("store", {}).items():
            if s_type == "zerotier" or not isinstance(s_data, dict):
                continue
            for s_name in s_data:
                local_stores.append(f"{s_name} ({s_type})")

        if not local_stores:
            self.notify("No local stores available to share.", severity="error")
            return

        def on_share_selected(share_choice: str):
            if not share_choice:
                return
            # Options are "<name> (<type>)"; split on the LAST " (" so store
            # names that themselves contain " (" are preserved.
            share_name = share_choice.rsplit(" (", 1)[0]

            # Update networking config
            zt_net = self.config_data.setdefault("networking", {}).setdefault("zerotier", {})
            zt_net["serve"] = share_name
            zt_net["network_id"] = net_id
            if not zt_net.get("port"):
                zt_net["port"] = "999"

            try:
                self.save_all()
                from SYS.background_services import ensure_zerotier_server_running
                ensure_zerotier_server_running()
                self.notify(f"ZeroTier auto-saved: Sharing '{share_name}' on network {net_id}")
            except Exception as e:
                self.notify(f"Auto-save failed: {e}", severity="error")
            self.refresh_view()

        self.app.push_screen(
            SelectionModal("Select Local Store to Share", local_stores),
            callback=on_share_selected,
        )

    def _zerotier_connect_step(self, zt: Any, net_id: str) -> None:
        """Wizard step 3b: discover peers on *net_id* and add one as a store."""
        # Discover Peers (Port 999 only)
        central_token = self.config_data.get("networking", {}).get("zerotier", {}).get("api_key")
        self.notify(f"Scanning Network {net_id} for peers...")
        try:
            probes = zt.discover_services_on_network(net_id, ports=[999], api_token=central_token)
        except Exception as e:
            self.notify(f"Discovery error: {e}", severity="error")
            return

        store_cfg = self.config_data.setdefault("store", {}).setdefault("zerotier", {})

        if not probes:
            self.notify("No peers found on port 999. Use manual setup.", severity="warning")
            # Create empty template without overwriting an earlier one
            new_name = self._unique_name("zt_remote", store_cfg)
            store_cfg[new_name] = {
                "NAME": new_name,
                "NETWORK_ID": net_id,
                "HOST": "",
                "PORT": "999",
                "SERVICE": "remote",
            }
            try:
                self.save_all()
                self.notify("ZeroTier manual template created.")
            except Exception as e:
                self.notify(f"Auto-save failed: {e}", severity="error")
            self.editing_item_type = "store-zerotier"
            self.editing_item_name = new_name
            self.refresh_view()
            return

        peer_options = []
        for p in probes:
            peer_name = "Unnamed Peer"
            if isinstance(p.payload, dict):
                peer_name = p.payload.get("name") or p.payload.get("NAME") or peer_name
            peer_options.append(f"{p.address} ({peer_name})")

        def on_peer_selected(peer_choice: str):
            if not peer_choice:
                return
            p_addr = peer_choice.split(" ")[0]
            match = next((p for p in probes if p.address == p_addr), None)

            new_name = f"zt_{p_addr.replace('.', '_')}"
            new_config = {
                "NAME": new_name,
                "NETWORK_ID": net_id,
                "HOST": p_addr,
                "PORT": "999",
                "SERVICE": "remote",
            }
            if match and match.service_hint == "hydrus":
                new_config["SERVICE"] = "hydrus"
                new_config["PORT"] = "45869"

            peer_store_cfg = self.config_data.setdefault("store", {}).setdefault("zerotier", {})
            peer_store_cfg[new_name] = new_config
            try:
                self.save_all()
                self.notify(f"ZeroTier auto-saved: Store '{new_name}' added.")
            except Exception as e:
                self.notify(f"Auto-save failed: {e}", severity="error")

            self.editing_item_type = "store-zerotier"
            self.editing_item_name = new_name
            self.refresh_view()

        self.app.push_screen(
            SelectionModal("Select Remote Peer", peer_options),
            callback=on_peer_selected,
        )

    def on_provider_type_selected(self, ptype: str) -> None:
        """Create a config entry for provider *ptype* (if absent) and edit it."""
        if not ptype:
            return

        providers = self.config_data.setdefault("provider", {})

        # For providers, they are usually top-level entries in 'provider' dict
        if ptype not in providers:
            from ProviderCore.registry import get_provider_class
            try:
                pcls = get_provider_class(ptype)
                new_config = {}
                if pcls:
                    # Use schema for defaults
                    if hasattr(pcls, "config") and callable(pcls.config):
                        for field_def in pcls.config() or []:
                            key = field_def.get("key")
                            if key:
                                new_config[key] = field_def.get("default", "")
                    else:
                        # Fallback to legacy required keys
                        for rk in pcls.required_config_keys():
                            new_config[rk] = ""
                providers[ptype] = new_config
            except Exception:
                # Still let the user edit an empty entry.
                providers[ptype] = {}

        self.editing_item_type = "provider"
        self.editing_item_name = ptype
        self.refresh_view()

    def on_net_type_selected(self, ntype: str) -> None:
        """Ensure a networking entry for *ntype* exists and open it in the editor."""
        if not ntype:
            return
        self.editing_item_type = "networking"
        self.editing_item_name = ntype

        # Ensure it exists in config_data
        self.config_data.setdefault("networking", {}).setdefault(ntype, {})
        self.refresh_view()

    # ------------------------------------------------------------------
    # Config mutation / persistence
    # ------------------------------------------------------------------

    def _update_config_value(self, widget_id: str, value: Any) -> None:
        """Write *value* into ``config_data`` at the key mapped to *widget_id*."""
        if widget_id not in self._input_id_map:
            return
        key = self._input_id_map[widget_id]

        if widget_id.startswith("global-"):
            self.config_data[key] = value
            return

        if not (widget_id.startswith("item-") and self.editing_item_name):
            return

        it = str(self.editing_item_type or "")
        inm = str(self.editing_item_name or "")

        if it.startswith(self._STORE_PREFIX):
            # Handle nested store structure: store -> type -> name -> key
            stype = self._strip_prefix(it, self._STORE_PREFIX)
            section = (
                self.config_data.setdefault("store", {})
                .setdefault(stype, {})
                .setdefault(inm, {})
            )
        else:
            # Provider or other top-level sections
            section = self.config_data.setdefault(it, {}).setdefault(inm, {})
        section[key] = value

    @on(Input.Changed)
    def on_input_changed(self, event: Input.Changed) -> None:
        """Mirror Input edits into ``config_data``."""
        if event.input.id:
            self._update_config_value(event.input.id, event.value)

    @on(Select.Changed)
    def on_select_changed(self, event: Select.Changed) -> None:
        """Mirror Select changes into ``config_data`` (blank selections are ignored)."""
        if event.select.id:
            # Select value can be the 'Select.BLANK' sentinel
            if event.value != Select.BLANK:
                self._update_config_value(event.select.id, event.value)

    def save_all(self) -> None:
        """Persist ``config_data`` to the workspace root."""
        save_config(self.config_data, config_dir=self.workspace_root)
        # Remember what is now on disk so the next save can detect a change.
        self._saved_zt_network_id = self._current_zt_network_id()

    def validate_current_editor(self) -> bool:
        """Ensure all required fields for the current item are filled.

        Returns True when nothing is being edited or every required key has
        a non-blank value; otherwise notifies the user and returns False.
        """
        if not self.editing_item_name:
            return True

        item_type = str(self.editing_item_type or "")
        item_name = str(self.editing_item_name or "")

        required_keys = []
        section = {}

        if item_type.startswith(self._STORE_PREFIX):
            stype = self._strip_prefix(item_type, self._STORE_PREFIX)
            classes = _discover_store_classes()
            if stype in classes:
                required_keys = list(_required_keys_for(classes[stype]))
            section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {})
        elif item_type == "provider":
            from ProviderCore.registry import get_provider_class
            try:
                pcls = get_provider_class(item_name)
                if pcls:
                    # Collect required keys from schema
                    if hasattr(pcls, "config") and callable(pcls.config):
                        for field_def in pcls.config() or []:
                            if field_def.get("required"):
                                k = field_def.get("key")
                                if k and k not in required_keys:
                                    required_keys.append(k)
                    # Merge with legacy required keys
                    for rk in pcls.required_config_keys():
                        if rk not in required_keys:
                            required_keys.append(rk)
            except Exception:
                # Validation is best effort when the provider cannot be loaded.
                pass
            section = self.config_data.get("provider", {}).get(item_name, {})

        # Check required keys
        for rk in required_keys:
            # Case-insensitive lookup for the required key in the current section
            val = None
            rk_upper = rk.upper()
            for k, v in section.items():
                if k.upper() == rk_upper:
                    val = v
                    break

            # Only None / whitespace counts as blank; 0 and False are values.
            if val is None or not str(val).strip():
                self.notify(f"Required field '{rk}' cannot be blank", severity="error")
                return False

        return True