diff --git a/CLI.py b/CLI.py index fb59e6b..4e680ad 100644 --- a/CLI.py +++ b/CLI.py @@ -780,9 +780,9 @@ class WorkerStages: class CmdletIntrospection: @staticmethod - def cmdlet_names() -> List[str]: + def cmdlet_names(force: bool = False) -> List[str]: try: - return list_cmdlet_names() or [] + return list_cmdlet_names(force=force) or [] except Exception: return [] @@ -796,11 +796,11 @@ class CmdletIntrospection: return [] @staticmethod - def store_choices(config: Dict[str, Any]) -> List[str]: + def store_choices(config: Dict[str, Any], force: bool = False) -> List[str]: try: # Use the cached startup check from SharedArgs from cmdlet._shared import SharedArgs - return SharedArgs.get_store_choices(config) + return SharedArgs.get_store_choices(config, force=force) except Exception: return [] @@ -810,12 +810,13 @@ class CmdletIntrospection: cmd_name: str, arg_name: str, config: Dict[str, - Any]) -> List[str]: + Any], + force: bool = False) -> List[str]: try: normalized_arg = (arg_name or "").lstrip("-").strip().lower() if normalized_arg in ("storage", "store"): - backends = cls.store_choices(config) + backends = cls.store_choices(config, force=force) if backends: return backends @@ -923,6 +924,9 @@ class CmdletCompleter(Completer): document: Document, complete_event ): # type: ignore[override] + # Refresh cmdlet names from introspection to pick up dynamic updates + self.cmdlet_names = CmdletIntrospection.cmdlet_names(force=True) + text = document.text_before_cursor tokens = text.split() ends_with_space = bool(text) and text[-1].isspace() @@ -1031,7 +1035,8 @@ class CmdletCompleter(Completer): choices = CmdletIntrospection.arg_choices( cmd_name=cmd_name, arg_name=prev_token, - config=config + config=config, + force=True ) if choices: choice_list = choices diff --git a/SYS/cmdlet_catalog.py b/SYS/cmdlet_catalog.py index 51ff176..edc865a 100644 --- a/SYS/cmdlet_catalog.py +++ b/SYS/cmdlet_catalog.py @@ -43,15 +43,15 @@ def _get_registry() -> Dict[str, Any]: return getattr(pkg, "REGISTRY", {}) or {} -def ensure_registry_loaded() -> None: - """Ensure native commands are registered into REGISTRY (idempotent).""" +def ensure_registry_loaded(force: bool = False) -> None: + """Ensure native commands are registered into REGISTRY (idempotent unless force=True).""" pkg = _get_cmdlet_package() if pkg is None: return ensure_fn = getattr(pkg, "ensure_cmdlet_modules_loaded", None) if callable(ensure_fn): try: - ensure_fn() + ensure_fn(force=force) except Exception: pass @@ -171,9 +171,11 @@ def get_cmdlet_metadata( } -def list_cmdlet_metadata(config: Optional[Dict[str, Any]] = None) -> Dict[str, Dict[str, Any]]: +def list_cmdlet_metadata( + force: bool = False, config: Optional[Dict[str, Any]] = None +) -> Dict[str, Dict[str, Any]]: """Collect metadata for all registered cmdlet keyed by canonical name.""" - ensure_registry_loaded() + ensure_registry_loaded(force=force) entries: Dict[str, Dict[str, Any]] = {} registry = _get_registry() for reg_name in registry.keys(): @@ -238,11 +240,13 @@ def list_cmdlet_metadata(config: Optional[Dict[str, Any]] = None) -> Dict[str, D def list_cmdlet_names( - include_aliases: bool = True, config: Optional[Dict[str, Any]] = None + include_aliases: bool = True, + force: bool = False, + config: Optional[Dict[str, Any]] = None, ) -> List[str]: """Return sorted cmdlet names (optionally including aliases).""" - ensure_registry_loaded() - entries = list_cmdlet_metadata(config=config) + ensure_registry_loaded(force=force) + entries = list_cmdlet_metadata(force=force, config=config) names = set() for meta in entries.values(): names.add(meta.get("name", "")) diff --git a/SYS/config.py b/SYS/config.py index 802a7ef..2ee30f0 100644 --- a/SYS/config.py +++ b/SYS/config.py @@ -280,7 +280,14 @@ def _serialize_conf(config: Dict[str, Any]) -> str: lines.append("") lines.append(f"[store={subtype}]") lines.append(f"name={_format_conf_value(name)}") + + # Deduplicate keys case-insensitively and skip "name" + seen_keys = {"NAME", "name"} for k in sorted(block.keys()): + k_upper = k.upper() + if k_upper in seen_keys: + continue + seen_keys.add(k_upper) lines.append(f"{k}={_format_conf_value(block.get(k))}") # Provider blocks @@ -292,7 +299,13 @@ def _serialize_conf(config: Dict[str, Any]) -> str: continue lines.append("") lines.append(f"[provider={prov}]") + + seen_keys = set() for k in sorted(block.keys()): + k_upper = k.upper() + if k_upper in seen_keys: + continue + seen_keys.add(k_upper) lines.append(f"{k}={_format_conf_value(block.get(k))}") # Tool blocks @@ -304,7 +317,13 @@ def _serialize_conf(config: Dict[str, Any]) -> str: continue lines.append("") lines.append(f"[tool={name}]") + + seen_keys = set() for k in sorted(block.keys()): + k_upper = k.upper() + if k_upper in seen_keys: + continue + seen_keys.add(k_upper) lines.append(f"{k}={_format_conf_value(block.get(k))}") return "\n".join(lines).rstrip() + "\n" diff --git a/TUI.py b/TUI.py index 9b5b5bc..54a18fa 100644 --- a/TUI.py +++ b/TUI.py @@ -455,6 +455,7 @@ class PipelineHubApp(App): yield Button("Tags", id="tags-button") yield Button("Metadata", id="metadata-button") yield Button("Relationships", id="relationships-button") + yield Button("Config", id="config-button") yield Static("Ready", id="status-panel") yield OptionList(id="cmd-suggestions") @@ -514,6 +515,9 @@ class PipelineHubApp(App): self.refresh_workers() if self.command_input: self.command_input.focus() + + # Run startup check automatically + self._run_pipeline_background(".status") # ------------------------------------------------------------------ # Actions @@ -547,6 +551,12 @@ class PipelineHubApp(App): self.notify("Enter a pipeline to run", severity="warning", timeout=3) return + # Special interception for .config + if pipeline_text.lower().strip() == ".config": + self._open_config_popup() + self.command_input.value = "" + return + pipeline_text = self._apply_store_path_and_tags(pipeline_text) self._pipeline_running = True @@ -593,6 +603,47 @@ class PipelineHubApp(App): self._open_metadata_popup() elif event.button.id == "relationships-button": self._open_relationships_popup() + elif event.button.id == "config-button": + self._open_config_popup() + + def _open_config_popup(self) -> None: + from TUI.modalscreen.config_modal import ConfigModal + self.push_screen(ConfigModal(), callback=self.on_config_closed) + + def on_config_closed(self, result: Any = None) -> None: + """Call when the config modal is dismissed to reload session data.""" + try: + from SYS.config import load_config + from cmdlet._shared import SharedArgs + # Force a fresh load from disk + cfg = load_config() + + # Clear UI state to show a "fresh" start + self._clear_results() + self._clear_log() + self._append_log_line(">>> RESTARTING SESSION (Config updated)") + self._set_status("Reloading config…", level="info") + + # Clear shared caches (especially store selection choices) + SharedArgs._refresh_store_choices_cache(cfg) + # Update the global SharedArgs choices so cmdlets pick up new stores + SharedArgs.STORE.choices = SharedArgs.get_store_choices(cfg, force=True) + + # Re-build our local dropdown + self._populate_store_options() + # Reload cmdlet names (in case new ones were added or indexed) + self._load_cmdlet_names(force=True) + # Optionally update executor config if needed + self.executor._config_loader.load() + + self.notify("Configuration reloaded") + + # Use the existing background runner to show the status table + # This will append the IGNITIO table to the logs/results + self._run_pipeline_background(".status") + + except Exception as exc: + self.notify(f"Error refreshing config: {exc}", severity="error") def on_input_submitted(self, event: Input.Submitted) -> None: if event.input.id == "pipeline-input": @@ -886,10 +937,10 @@ class PipelineHubApp(App): except Exception: pass - def _load_cmdlet_names(self) -> None: + def _load_cmdlet_names(self, force: bool = False) -> None: try: - ensure_registry_loaded() - names = list_cmdlet_names() or [] + ensure_registry_loaded(force=force) + names = list_cmdlet_names(force=force) or [] self._cmdlet_names = sorted( {str(n).replace("_", "-") diff --git a/TUI/modalscreen/config_modal.py b/TUI/modalscreen/config_modal.py new file mode 100644 index 0000000..b3ceaa7 --- /dev/null +++ b/TUI/modalscreen/config_modal.py @@ -0,0 +1,327 @@ +from textual.app import ComposeResult +from textual.screen import ModalScreen +from textual.containers import Container, Horizontal, Vertical, ScrollableContainer +from textual.widgets import Static, Button, Input, Label, ListView, ListItem, Rule, OptionList, Footer +from textual import on +from textual.message import Message +from typing import Dict, Any, List, Optional +import os +import json + +from pathlib import Path +from SYS.config import load_config, save_config +from Store.registry import _discover_store_classes, _required_keys_for + +class ConfigModal(ModalScreen): + """A modal for editing the configuration.""" + + CSS = """ + ConfigModal { + align: center middle; + background: $boost; + } + + #config-container { + width: 90%; + height: 90%; + background: $panel; + border: thick $primary; + padding: 1; + } + + .section-title { + background: $accent; + color: $text; + padding: 0 1; + margin-bottom: 1; + text-align: center; + text-style: bold; + height: 3; + content-align: center middle; + } + + #config-sidebar { + width: 25%; + border-right: solid $surface; + } + + #config-content { + width: 75%; + padding: 1; + } + + .config-field { + margin-bottom: 1; + height: auto; + } + + .config-label { + width: 100%; + text-style: bold; + color: $accent; + } + + .config-input { + width: 100%; + } + + #config-actions { + height: 3; + align: right middle; + } + + .item-row { + height: 3; + margin-bottom: 1; + padding: 0 1; + border: solid $surface; + } + + .item-label { + width: 1fr; + content-align: left middle; + } + + Button { + margin: 0 1; + } + """ + + def __init__(self) -> None: + super().__init__() + # Load config from the workspace root (parent of SYS) + workspace_root = Path(__file__).resolve().parent.parent.parent + self.config_data = load_config(config_dir=workspace_root) + self.current_category = "globals" + self.editing_item_type = None # 'store' or 'provider' + self.editing_item_name = None + self.workspace_root = workspace_root + + def compose(self) -> ComposeResult: + with Container(id="config-container"): + yield Static("CONFIGURATION EDITOR", classes="section-title") + with Horizontal(): + with Vertical(id="config-sidebar"): + yield Label("Categories", classes="config-label") + with ListView(id="category-list"): + yield ListItem(Label("Global Settings"), id="cat-globals") + yield ListItem(Label("Stores"), id="cat-stores") + yield ListItem(Label("Providers"), id="cat-providers") + + with Vertical(id="config-content"): + yield ScrollableContainer(id="fields-container") + with Horizontal(id="config-actions"): + yield Button("Save", variant="success", id="save-btn") + yield Button("Add Store", variant="primary", id="add-store-btn") + yield Button("Back", id="back-btn") + yield Button("Close", variant="error", id="cancel-btn") + + def on_mount(self) -> None: + self.query_one("#add-store-btn", Button).display = False + self.query_one("#back-btn", Button).display = False + self.refresh_view() + + def refresh_view(self) -> None: + container = self.query_one("#fields-container", ScrollableContainer) + + # Clear existing synchronously + for child in list(container.children): + child.remove() + + # Update visibility of buttons + try: + self.query_one("#add-store-btn", Button).display = (self.current_category == "stores" and self.editing_item_name is None) + self.query_one("#back-btn", Button).display = (self.editing_item_name is not None) + except Exception: + pass + + # We mount using call_after_refresh to ensure the removals are processed by Textual + # before we try to mount new widgets with potentially duplicate IDs. + def do_mount(): + if self.editing_item_name: + self.render_item_editor(container) + elif self.current_category == "globals": + self.render_globals(container) + elif self.current_category == "stores": + self.render_stores(container) + elif self.current_category == "providers": + self.render_providers(container) + + self.call_after_refresh(do_mount) + + def render_globals(self, container: ScrollableContainer) -> None: + container.mount(Label("General Configuration", classes="config-label")) + for k, v in self.config_data.items(): + if not isinstance(v, dict) and not k.startswith("_"): + container.mount(Label(k)) + container.mount(Input(value=str(v), id=f"global-{k}", classes="config-input")) + + def render_stores(self, container: ScrollableContainer) -> None: + container.mount(Label("Configured Stores", classes="config-label")) + stores = self.config_data.get("store", {}) + if not stores: + container.mount(Static("No stores configured.")) + else: + # stores is structured as: {type: {name: config}} + for stype, instances in stores.items(): + if isinstance(instances, dict): + for name, conf in instances.items(): + row = Horizontal( + Static(f"{name} ({stype})", classes="item-label"), + Button("Edit", id=f"edit-store-{stype}-{name}"), + Button("Delete", variant="error", id=f"del-store-{stype}-{name}"), + classes="item-row" + ) + container.mount(row) + + def render_providers(self, container: ScrollableContainer) -> None: + container.mount(Label("Configured Providers", classes="config-label")) + providers = self.config_data.get("provider", {}) + if not providers: + container.mount(Static("No providers configured.")) + else: + for name, _ in providers.items(): + row = Horizontal( + Static(name, classes="item-label"), + Button("Edit", id=f"edit-provider-{name}"), + classes="item-row" + ) + container.mount(row) + + def render_item_editor(self, container: ScrollableContainer) -> None: + item_type = str(self.editing_item_type or "") + item_name = str(self.editing_item_name or "") + + # Parse item_type for store-{stype} or just provider + if item_type.startswith("store-"): + stype = item_type.replace("store-", "") + container.mount(Label(f"Editing Store: {item_name} ({stype})", classes="config-label")) + section = self.config_data.get("store", {}).get(stype, {}).get(item_name, {}) + else: + container.mount(Label(f"Editing {item_type.capitalize()}: {item_name}", classes="config-label")) + section = self.config_data.get(item_type, {}).get(item_name, {}) + + # Show all existing keys + existing_keys_upper = set() + for k, v in section.items(): + if k.startswith("_"): continue + + # Deduplicate keys case-insensitively (e.g. name vs NAME vs Name) + k_upper = k.upper() + if k_upper in existing_keys_upper: + continue + existing_keys_upper.add(k_upper) + + container.mount(Label(k)) + container.mount(Input(value=str(v), id=f"item-{k}", classes="config-input")) + + # If it's a store, we might have required keys + if item_type.startswith("store-"): + stype = item_type.replace("store-", "") + classes = _discover_store_classes() + if stype in classes: + required_keys = _required_keys_for(classes[stype]) + for rk in required_keys: + # Case-insensitive deduplication (fix path vs PATH) + if rk.upper() not in existing_keys_upper: + container.mount(Label(rk)) + container.mount(Input(value="", id=f"item-{rk}", classes="config-input")) + + def create_field(self, name: str, value: Any, id: str) -> Vertical: + # This method is now unused - we mount labels and inputs directly + v = Vertical(classes="config-field") + return v + + def on_list_view_selected(self, event: ListView.Selected) -> None: + if not event.item: return + if event.item.id == "cat-globals": + self.current_category = "globals" + elif event.item.id == "cat-stores": + self.current_category = "stores" + elif event.item.id == "cat-providers": + self.current_category = "providers" + + self.editing_item_name = None + self.editing_item_type = None + self.refresh_view() + + def on_button_pressed(self, event: Button.Pressed) -> None: + bid = event.button.id + if not bid: return + + if bid == "cancel-btn": + self.dismiss() + elif bid == "back-btn": + self.editing_item_name = None + self.editing_item_type = None + self.refresh_view() + elif bid == "save-btn": + self.save_all() + self.notify("Configuration saved!") + # Return to the main list view within the current category + self.editing_item_name = None + self.editing_item_type = None + self.refresh_view() + elif bid.startswith("edit-store-"): + parts = bid.replace("edit-store-", "").split("-", 1) + if len(parts) == 2: + stype, name = parts + self.editing_item_type = f"store-{stype}" + self.editing_item_name = name + self.refresh_view() + elif bid.startswith("edit-provider-"): + self.editing_item_name = bid.replace("edit-provider-", "") + self.editing_item_type = "provider" + self.refresh_view() + elif bid == "add-store-btn": + # Create a simple dialog or default folder store + new_name = "new_folder" + if "store" not in self.config_data: + self.config_data["store"] = {} + if "folder" not in self.config_data["store"]: + self.config_data["store"]["folder"] = {} + self.config_data["store"]["folder"][new_name] = {"NAME": new_name, "path": ""} + self.editing_item_type = "store-folder" + self.editing_item_name = new_name + self.refresh_view() + elif bid.startswith("del-store-"): + parts = bid.replace("del-store-", "").split("-", 1) + if len(parts) == 2: + stype, name = parts + if "store" in self.config_data and stype in self.config_data["store"]: + if name in self.config_data["store"][stype]: + del self.config_data["store"][stype][name] + self.refresh_view() + + @on(Input.Changed) + def on_input_changed(self, event: Input.Changed) -> None: + if not event.input.id: + return + if event.input.id.startswith("global-"): + key = event.input.id.replace("global-", "") + self.config_data[key] = event.value + elif event.input.id.startswith("item-") and self.editing_item_name: + key = event.input.id.replace("item-", "") + it = str(self.editing_item_type or "") + inm = str(self.editing_item_name or "") + + # Handle nested store structure + if it.startswith("store-"): + stype = it.replace("store-", "") + if "store" not in self.config_data: + self.config_data["store"] = {} + if stype not in self.config_data["store"]: + self.config_data["store"][stype] = {} + if inm not in self.config_data["store"][stype]: + self.config_data["store"][stype][inm] = {} + self.config_data["store"][stype][inm][key] = event.value + else: + # Provider or other top-level sections + if it not in self.config_data: + self.config_data[it] = {} + if inm not in self.config_data[it]: + self.config_data[it][inm] = {} + self.config_data[it][inm][key] = event.value + + def save_all(self) -> None: + save_config(self.config_data, config_dir=self.workspace_root) diff --git a/cmdlet/__init__.py b/cmdlet/__init__.py index 8032801..80210a5 100644 --- a/cmdlet/__init__.py +++ b/cmdlet/__init__.py @@ -104,10 +104,10 @@ def _register_native_commands() -> None: pass -def ensure_cmdlet_modules_loaded() -> None: +def ensure_cmdlet_modules_loaded(force: bool = False) -> None: global _MODULES_LOADED - if _MODULES_LOADED: + if _MODULES_LOADED and not force: return for mod_name in _iter_cmdlet_module_names(): diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index 6d9f52d..645b2c2 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -202,7 +202,8 @@ class SharedArgs: ) @staticmethod - def get_store_choices(config: Optional[Dict[str, Any]] = None) -> List[str]: + @staticmethod + def get_store_choices(config: Optional[Dict[str, Any]] = None, force: bool = False) -> List[str]: """Get list of available store backend names. This method returns the cached list of available backends from the most @@ -210,7 +211,8 @@ class SharedArgs: Users must restart to refresh the list if stores are enabled/disabled. Args: - config: Ignored (kept for compatibility); uses cached startup result. + config: Optional config dict. Used if force=True or no cache exists. + force: If True, force a fresh check of the backends. Returns: List of backend names (e.g., ['default', 'test', 'home', 'work']) @@ -219,23 +221,13 @@ class SharedArgs: Example: SharedArgs.STORE.choices = SharedArgs.get_store_choices(config) """ - # Use the cached startup check result if available - if hasattr(SharedArgs, "_cached_available_stores"): + # Use the cached startup check result if available (unless force=True) + if not force and hasattr(SharedArgs, "_cached_available_stores"): return SharedArgs._cached_available_stores or [] - # Fallback to configured names if cache doesn't exist yet - # (This shouldn't happen in normal operation, but provides a safe fallback) - try: - from Store.registry import list_configured_backend_names - if config is None: - try: - from SYS.config import load_config - config = load_config() - except Exception: - return [] - return list_configured_backend_names(config) or [] - except Exception: - return [] + # Refresh the cache + SharedArgs._refresh_store_choices_cache(config) + return SharedArgs._cached_available_stores or [] @staticmethod def _refresh_store_choices_cache(config: Optional[Dict[str, Any]] = None) -> None: diff --git a/cmdnat/config.py b/cmdnat/config.py index be5a2e6..5755b30 100644 --- a/cmdnat/config.py +++ b/cmdnat/config.py @@ -181,7 +181,12 @@ def _strip_value_quotes(value: str) -> str: def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int: - current_config = load_config() + import sys + from pathlib import Path + + # Load from workspace root, not SYS directory + workspace_root = Path(__file__).resolve().parent.parent + current_config = load_config(config_dir=workspace_root) selection_key = _get_selected_config_key() value_from_args = _extract_value_arg(args) if selection_key else None @@ -197,7 +202,7 @@ def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int: new_value = _strip_value_quotes(new_value) try: set_nested_config(current_config, selection_key, new_value) - save_config(current_config) + save_config(current_config, config_dir=workspace_root) print(f"Updated '{selection_key}' to '{new_value}'") return 0 except Exception as exc: @@ -205,6 +210,47 @@ def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int: return 1 if not args: + # Check if we're in an interactive terminal and can launch a Textual modal + if sys.stdin.isatty() and not piped_result: + try: + from textual.app import App, ComposeResult + from TUI.modalscreen.config_modal import ConfigModal + + class ConfigApp(App): + def on_mount(self) -> None: + self.title = "Config Editor" + # We push the modal screen. It will sit on top of the main (blank) screen. + # Using a callback to exit the app when the modal is dismissed. + self.push_screen(ConfigModal(), callback=self.exit_on_close) + + def exit_on_close(self, result: Any = None) -> None: + self.exit() + + with ctx.suspend_live_progress(): + app = ConfigApp() + app.run() + + # After modal exits, show the new status table if possible + try: + from cmdlet._shared import SharedArgs + from cmdnat.status import CMDLET as STATUS_CMDLET + # We reload the config one more time because it might have changed on disk + fresh_config = load_config(config_dir=workspace_root) + + # Force refresh of shared caches (especially stores) + SharedArgs._refresh_store_choices_cache(fresh_config) + # Update the global SharedArgs choices so cmdlets pick up new stores + SharedArgs.STORE.choices = SharedArgs.get_store_choices(fresh_config, force=True) + + return STATUS_CMDLET.exec(None, [], fresh_config) + except Exception: + pass + return 0 + except Exception as exc: + # Fall back to table display if Textual modal fails + print(f"Note: Could not launch interactive editor ({exc}). Showing configuration table:") + return _show_config_table(current_config) + return _show_config_table(current_config) key = args[0] @@ -215,7 +261,7 @@ def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int: value = _strip_value_quotes(" ".join(args[1:])) try: set_nested_config(current_config, key, value) - save_config(current_config) + save_config(current_config, config_dir=workspace_root) print(f"Updated '{key}' to '{value}'") return 0 except Exception as exc: diff --git a/cmdnat/status.py b/cmdnat/status.py new file mode 100644 index 0000000..e98ebac --- /dev/null +++ b/cmdnat/status.py @@ -0,0 +1,252 @@ +from __future__ import annotations + +import sys +import shutil +from typing import Any, Dict, List, Optional, Sequence, Tuple +from datetime import datetime + +from cmdlet._shared import Cmdlet, CmdletArg +from SYS import pipeline as ctx +from SYS.result_table import ResultTable +from SYS.logger import log, set_debug, debug +from SYS.rich_display import stdout_console + +CMDLET = Cmdlet( + name=".status", + summary="Check and display service/provider status", + usage=".status", + arg=[], +) + +def _upper(value: Any) -> str: + text = "" if value is None else str(value) + return text.upper() + +def _add_startup_check( + table: ResultTable, + status: str, + name: str, + *, + provider: str = "", + store: str = "", + files: int | str | None = None, + detail: str = "", +) -> None: + row = table.add_row() + row.add_column("STATUS", _upper(status)) + row.add_column("NAME", _upper(name)) + row.add_column("PROVIDER", _upper(provider or "")) + row.add_column("STORE", _upper(store or "")) + row.add_column("FILES", "" if files is None else str(files)) + row.add_column("DETAIL", _upper(detail or "")) + +def _has_store_subtype(cfg: dict, subtype: str) -> bool: + store_cfg = cfg.get("store") + if not isinstance(store_cfg, dict): + return False + bucket = store_cfg.get(subtype) + if not isinstance(bucket, dict): + return False + return any(isinstance(v, dict) and bool(v) for v in bucket.values()) + +def _has_provider(cfg: dict, name: str) -> bool: + provider_cfg = cfg.get("provider") + if not isinstance(provider_cfg, dict): + return False + block = provider_cfg.get(str(name).strip().lower()) + return isinstance(block, dict) and bool(block) + +def _has_tool(cfg: dict, name: str) -> bool: + tool_cfg = cfg.get("tool") + if not isinstance(tool_cfg, dict): + return False + block = tool_cfg.get(str(name).strip().lower()) + return isinstance(block, dict) and bool(block) + +def _ping_url(url: str, timeout: float = 3.0) -> tuple[bool, str]: + try: + from API.HTTP import HTTPClient + with HTTPClient(timeout=timeout, retries=1) as client: + resp = client.get(url, allow_redirects=True) + code = int(getattr(resp, "status_code", 0) or 0) + ok = 200 <= code < 500 + return ok, f"{url} (HTTP {code})" + except Exception as exc: + return False, f"{url} ({type(exc).__name__})" + +def _provider_display_name(key: str) -> str: + k = (key or "").strip() + low = k.lower() + if low == "openlibrary": return "OpenLibrary" + if low == "alldebrid": return "AllDebrid" + if low == "youtube": return "YouTube" + return k[:1].upper() + k[1:] if k else "Provider" + +def _default_provider_ping_targets(provider_key: str) -> list[str]: + prov = (provider_key or "").strip().lower() + if prov == "openlibrary": return ["https://openlibrary.org"] + if prov == "youtube": return ["https://www.youtube.com"] + if prov == "bandcamp": return ["https://bandcamp.com"] + if prov == "libgen": + try: + from Provider.libgen import MIRRORS + return [str(x).rstrip("/") + "/json.php" for x in (MIRRORS or []) if str(x).strip()] + except ImportError: return [] + return [] + +def _ping_first(urls: list[str]) -> tuple[bool, str]: + for u in urls: + ok, detail = _ping_url(u) + if ok: return True, detail + if urls: + ok, detail = _ping_url(urls[0]) + return ok, detail + return False, "No ping target" + +def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: + startup_table = ResultTable( + "*********************************************" + ) + startup_table.set_no_choice(True).set_preserve_order(True) + startup_table.set_value_case("upper") + + debug_enabled = bool(config.get("debug", False)) + _add_startup_check(startup_table, "ENABLED" if debug_enabled else "DISABLED", "DEBUGGING") + + try: + # MPV check + try: + from MPV.mpv_ipc import MPV + MPV() + mpv_path = shutil.which("mpv") + _add_startup_check(startup_table, "ENABLED", "MPV", detail=mpv_path or "Available") + except Exception as exc: + _add_startup_check(startup_table, "DISABLED", "MPV", detail=str(exc)) + + # Store Registry + store_registry = None + try: + from Store import Store as StoreRegistry + store_registry = StoreRegistry(config=config, suppress_debug=True) + except Exception: + pass + + # Hydrus + if _has_store_subtype(config, "hydrusnetwork"): + hcfg = config.get("store", {}).get("hydrusnetwork", {}) + for iname, icfg in hcfg.items(): + if not isinstance(icfg, dict): continue + nkey = str(icfg.get("NAME") or iname) + uval = str(icfg.get("URL") or "").strip() + ok = bool(store_registry and store_registry.is_available(nkey)) + status = "ENABLED" if ok else "DISABLED" + files = None + detail = uval + if ok and store_registry: + try: + backend = store_registry[nkey] + files = getattr(backend, "total_count", None) + if files is None and hasattr(backend, "get_total_count"): + files = backend.get_total_count() + except Exception: pass + else: + err = store_registry.get_backend_error(iname) if store_registry else None + detail = f"{uval} - {err or 'Unavailable'}" + _add_startup_check(startup_table, status, nkey, store="hydrusnetwork", files=files, detail=detail) + + # Providers + pcfg = config.get("provider", {}) + if isinstance(pcfg, dict) and pcfg: + from ProviderCore.registry import list_providers, list_search_providers, list_file_providers + from Provider.metadata_provider import list_metadata_providers + + p_avail = list_providers(config) or {} + s_avail = list_search_providers(config) or {} + f_avail = list_file_providers(config) or {} + m_avail = list_metadata_providers(config) or {} + + already = {"matrix"} + for pname in pcfg.keys(): + prov = str(pname).lower() + if prov in already: continue + display = _provider_display_name(prov) + + if prov == "alldebrid": + try: + from Provider.alldebrid import _get_debrid_api_key + from API.alldebrid import AllDebridClient + api_key = _get_debrid_api_key(config) + if not api_key: + _add_startup_check(startup_table, "DISABLED", display, provider=prov, detail="Not configured") + else: + client = AllDebridClient(api_key) + _add_startup_check(startup_table, "ENABLED", display, provider=prov, detail=getattr(client, "base_url", "Connected")) + except Exception as exc: + _add_startup_check(startup_table, "DISABLED", display, provider=prov, detail=str(exc)) + already.add(prov) + continue + + is_known = prov in p_avail or prov in s_avail or prov in f_avail or prov in m_avail + if not is_known: + _add_startup_check(startup_table, "UNKNOWN", display, provider=prov, detail="Not registered") + else: + ok_val = p_avail.get(prov) or s_avail.get(prov) or f_avail.get(prov) or m_avail.get(prov) + detail = "Configured" if ok_val else "Not configured" + ping_targets = _default_provider_ping_targets(prov) + if ping_targets: + pok, pdet = _ping_first(ping_targets) + detail = pdet if ok_val else f"{detail} | {pdet}" + _add_startup_check(startup_table, "ENABLED" if ok_val else "DISABLED", display, provider=prov, detail=detail) + already.add(prov) + + # Matrix + if _has_provider(config, "matrix"): + try: + from Provider.matrix import Matrix + m_prov = Matrix(config) + mcfg = config.get("provider", {}).get("matrix", {}) + hs = str(mcfg.get("homeserver") or "").strip() + rid = str(mcfg.get("room_id") or "").strip() + detail = f"{hs} room:{rid}" + _add_startup_check(startup_table, "ENABLED" if m_prov.validate() else "DISABLED", "Matrix", provider="matrix", detail=detail) + except Exception as exc: + _add_startup_check(startup_table, "DISABLED", "Matrix", provider="matrix", detail=str(exc)) + + # Folders + if _has_store_subtype(config, "folder"): + fcfg = config.get("store", {}).get("folder", {}) + for iname, icfg in fcfg.items(): + if not isinstance(icfg, dict): continue + nkey = str(icfg.get("NAME") or iname) + pval = str(icfg.get("PATH") or icfg.get("path") or "").strip() + ok = bool(store_registry and store_registry.is_available(nkey)) + if ok and store_registry: + backend = store_registry[nkey] + scan_ok = getattr(backend, "scan_ok", True) + sdet = getattr(backend, "scan_detail", "Up to date") + stats = getattr(backend, "scan_stats", {}) + files = int(stats.get("files_total_db", 0)) if stats else None + _add_startup_check(startup_table, "SCANNED" if scan_ok else "ERROR", nkey, store="folder", files=files, detail=f"{pval} - {sdet}") + else: + err = store_registry.get_backend_error(iname) if store_registry else None + _add_startup_check(startup_table, "ERROR", nkey, store="folder", detail=f"{pval} - {err or 'Unavailable'}") + + # Cookies + try: + from tool.ytdlp import YtDlpTool + cf = YtDlpTool(config).resolve_cookiefile() + _add_startup_check(startup_table, "FOUND" if cf else "MISSING", "Cookies", detail=str(cf) if cf else "Not found") + except Exception: pass + + except Exception as exc: + debug(f"Status check failed: {exc}") + + if startup_table.rows: + # Mark as rendered to prevent CLI.py from auto-printing it to stdout + # (avoiding duplication in TUI logs, while keeping it in TUI Results) + setattr(startup_table, "_rendered_by_cmdlet", True) + ctx.set_current_stage_table(startup_table) + + return 0 + +CMDLET.exec = _run