From 66e6c6eb72516a902f2720bba42c0e8149d88fce Mon Sep 17 00:00:00 2001 From: Nose Date: Sun, 18 Jan 2026 10:50:42 -0800 Subject: [PATCH] h --- API/HTTP.py | 4 +- CLI.py | 336 +------------------------- Provider/HIFI.py | 16 +- Provider/Tidal.py | 16 +- Provider/alldebrid.py | 4 +- Provider/bandcamp.py | 4 +- Provider/hello_provider.py | 4 +- Provider/internetarchive.py | 4 +- Provider/podcastindex.py | 4 +- SYS/cli_parsing.py | 460 ++++++++++++++++++++++++++++++++++++ SYS/result_table.py | 94 ++++---- SYS/rich_display.py | 10 +- TUI.py | 4 +- TUI/modalscreen/search.py | 6 +- TUI/pipeline_runner.py | 4 +- cmdlet/_shared.py | 75 +++--- cmdlet/add_file.py | 12 +- cmdlet/add_tag.py | 4 +- cmdlet/delete_file.py | 6 +- cmdlet/download_file.py | 14 +- cmdlet/get_metadata.py | 6 +- cmdlet/get_note.py | 4 +- cmdlet/get_relationship.py | 2 +- cmdlet/get_tag.py | 4 +- cmdlet/get_url.py | 8 +- cmdlet/search_file.py | 10 +- cmdnat/adjective.py | 6 +- cmdnat/config.py | 4 +- cmdnat/help.py | 16 +- cmdnat/matrix.py | 8 +- cmdnat/pipe.py | 6 +- cmdnat/status.py | 71 +++++- cmdnat/telegram.py | 4 +- cmdnat/zerotier.py | 4 +- 34 files changed, 718 insertions(+), 516 deletions(-) create mode 100644 SYS/cli_parsing.py diff --git a/API/HTTP.py b/API/HTTP.py index becbf47..5a19857 100644 --- a/API/HTTP.py +++ b/API/HTTP.py @@ -191,10 +191,10 @@ class HTTPClient: if not is_debug_enabled(): return try: - from rich.table import Table + from rich.table import Table as RichTable from rich.panel import Panel - grid = Table.grid(padding=(0, 1)) + grid = RichTable.grid(padding=(0, 1)) grid.add_column("Key", style="cyan", no_wrap=True) grid.add_column("Value") for key, val in rows: diff --git a/CLI.py b/CLI.py index b8d7348..8ea9130 100644 --- a/CLI.py +++ b/CLI.py @@ -32,7 +32,7 @@ from rich.layout import Layout from rich.panel import Panel from rich.markdown import Markdown from rich.bar import Bar -from rich.table import Table +from rich.table import Table as RichTable from SYS.rich_display import ( IMAGE_EXTENSIONS, render_image_to_console, @@ -73,7 +73,7 @@ from SYS.cmdlet_catalog import ( list_cmdlet_names, ) from SYS.config import get_local_storage_path, load_config -from SYS.result_table import ResultTable +from SYS.result_table import Table from ProviderCore.registry import provider_inline_query_choices HELP_EXAMPLE_SOURCE_COMMANDS = { @@ -98,330 +98,12 @@ def _split_pipeline_tokens(tokens: Sequence[str]) -> List[List[str]]: stages.append(current) return [stage for stage in stages if stage] -class SelectionSyntax: - """Parses @ selection syntax into 1-based indices.""" - - _RANGE_RE = re.compile(r"^[0-9\-]+$") - - @staticmethod - def parse(token: str) -> Optional[Set[int]]: - """Return 1-based indices or None when not a concrete selection. - - Concrete selections: - - @2 - - @2-5 - - @{1,3,5} - - @2,5,7-9 - - Special (non-concrete) selectors return None: - - @* (select all) - - @.. (history prev) - - @,, (history next) - """ - - if not token or not token.startswith("@"): - return None - - selector = token[1:].strip() - if selector in (".", ",", "*"): - return None - - if selector.startswith("{") and selector.endswith("}"): - selector = selector[1:-1].strip() - - indices: Set[int] = set() - for part in selector.split(","): - part = part.strip() - if not part: - continue - - if "-" in part: - pieces = part.split("-", 1) - if len(pieces) != 2: - return None - start_str = pieces[0].strip() - end_str = pieces[1].strip() - if not start_str or not end_str: - return None - try: - start = int(start_str) - end = int(end_str) - except ValueError: - return None - if start <= 0 or end <= 0 or start > end: - return None - indices.update(range(start, end + 1)) - continue - - try: - value = int(part) - except ValueError: - return None - if value <= 0: - return None - indices.add(value) - - return indices if indices else None +# Selection parsing and REPL lexer moved to SYS.cli_parsing +from SYS.cli_parsing import SelectionSyntax, SelectionFilterSyntax, MedeiaLexer -class SelectionFilterSyntax: - """Parses and applies @"COL:filter" selection filters. +# SelectionFilterSyntax moved to SYS.cli_parsing (imported above) - Notes: - - CLI tokenization (shlex) strips quotes, so a user input of `@"TITLE:foo"` - arrives as `@TITLE:foo`. We support both forms. - - Filters apply to the *current selectable table items* (in-memory), not to - provider searches. - """ - - _OP_RE = re.compile(r"^(>=|<=|!=|==|>|<|=)\s*(.+)$") - _DUR_TOKEN_RE = re.compile(r"(?i)(\d+)\s*([hms])") - - @staticmethod - def parse(token: str) -> Optional[List[Tuple[str, str]]]: - """Return list of (column, raw_expression) or None when not a filter token.""" - - if not token or not str(token).startswith("@"): - return None - - if token.strip() == "@*": - return None - - # If this is a concrete numeric selection (@2, @1-3, @{1,3}), do not treat it as a filter. - try: - if SelectionSyntax.parse(str(token)) is not None: - return None - except Exception: - pass - - raw = str(token)[1:].strip() - if not raw: - return None - - # If quotes survived tokenization, strip a single symmetric wrapper. - if len(raw) >= 2 and raw[0] == raw[-1] and raw[0] in ('"', "'"): - raw = raw[1:-1].strip() - - # Shorthand: @"foo" means Title contains "foo". - if ":" not in raw: - if raw: - return [("Title", raw)] - return None - - parts = [p.strip() for p in raw.split(",") if p.strip()] - conditions: List[Tuple[str, str]] = [] - for part in parts: - if ":" not in part: - return None - col, expr = part.split(":", 1) - col = str(col or "").strip() - expr = str(expr or "").strip() - if not col: - return None - conditions.append((col, expr)) - - return conditions if conditions else None - - @staticmethod - def _norm_key(text: str) -> str: - return re.sub(r"\s+", " ", str(text or "").strip().lower()) - - @staticmethod - def _item_column_map(item: Any) -> Dict[str, str]: - out: Dict[str, str] = {} - - def _set(k: Any, v: Any) -> None: - key = SelectionFilterSyntax._norm_key(str(k or "")) - if not key: - return - if v is None: - return - try: - if isinstance(v, (list, tuple, set)): - text = ", ".join(str(x) for x in v if x is not None) - else: - text = str(v) - except Exception: - return - out[key] = text - - if isinstance(item, dict): - # Display columns (primary UX surface) - cols = item.get("columns") - if isinstance(cols, list): - for pair in cols: - try: - if isinstance(pair, (list, tuple)) and len(pair) == 2: - _set(pair[0], pair[1]) - except Exception: - continue - # Direct keys as fallback - for k, v in item.items(): - if k == "columns": - continue - _set(k, v) - else: - cols = getattr(item, "columns", None) - if isinstance(cols, list): - for pair in cols: - try: - if isinstance(pair, (list, tuple)) and len(pair) == 2: - _set(pair[0], pair[1]) - except Exception: - continue - for k in ("title", "path", "detail", "provider", "store", "table"): - try: - _set(k, getattr(item, k, None)) - except Exception: - pass - - return out - - @staticmethod - def _parse_duration_seconds(text: str) -> Optional[int]: - s = str(text or "").strip() - if not s: - return None - - if s.isdigit(): - try: - return max(0, int(s)) - except Exception: - return None - - # clock format: M:SS or H:MM:SS - if ":" in s: - parts = [p.strip() for p in s.split(":")] - if len(parts) == 2 and all(p.isdigit() for p in parts): - m, sec = parts - return max(0, int(m) * 60 + int(sec)) - if len(parts) == 3 and all(p.isdigit() for p in parts): - h, m, sec = parts - return max(0, int(h) * 3600 + int(m) * 60 + int(sec)) - - # token format: 1h2m3s (tokens can appear in any combination) - total = 0 - found = False - for m in SelectionFilterSyntax._DUR_TOKEN_RE.finditer(s): - found = True - n = int(m.group(1)) - unit = m.group(2).lower() - if unit == "h": - total += n * 3600 - elif unit == "m": - total += n * 60 - elif unit == "s": - total += n - if found: - return max(0, int(total)) - - return None - - @staticmethod - def _parse_float(text: str) -> Optional[float]: - s = str(text or "").strip() - if not s: - return None - s = s.replace(",", "") - try: - return float(s) - except Exception: - return None - - @staticmethod - def _parse_op(expr: str) -> tuple[Optional[str], str]: - text = str(expr or "").strip() - if not text: - return None, "" - m = SelectionFilterSyntax._OP_RE.match(text) - if not m: - return None, text - return m.group(1), str(m.group(2) or "").strip() - - @staticmethod - def matches(item: Any, conditions: List[Tuple[str, str]]) -> bool: - colmap = SelectionFilterSyntax._item_column_map(item) - - for col, expr in conditions: - key = SelectionFilterSyntax._norm_key(col) - actual = colmap.get(key) - - # Convenience aliases for common UX names. - if actual is None: - if key == "duration": - actual = colmap.get("duration") - elif key == "title": - actual = colmap.get("title") - - if actual is None: - return False - - op, rhs = SelectionFilterSyntax._parse_op(expr) - left_text = str(actual or "").strip() - right_text = str(rhs or "").strip() - - if op is None: - if not right_text: - return False - if right_text.lower() not in left_text.lower(): - return False - continue - - # Comparator: try duration parsing first when it looks time-like. - prefer_duration = ( - key == "duration" - or any(ch in right_text for ch in (":", "h", "m", "s")) - or any(ch in left_text for ch in (":", "h", "m", "s")) - ) - - left_num: Optional[float] = None - right_num: Optional[float] = None - - if prefer_duration: - ldur = SelectionFilterSyntax._parse_duration_seconds(left_text) - rdur = SelectionFilterSyntax._parse_duration_seconds(right_text) - if ldur is not None and rdur is not None: - left_num = float(ldur) - right_num = float(rdur) - - if left_num is None or right_num is None: - left_num = SelectionFilterSyntax._parse_float(left_text) - right_num = SelectionFilterSyntax._parse_float(right_text) - - if left_num is not None and right_num is not None: - if op in ("=", "=="): - if not (left_num == right_num): - return False - elif op == "!=": - if not (left_num != right_num): - return False - elif op == ">": - if not (left_num > right_num): - return False - elif op == ">=": - if not (left_num >= right_num): - return False - elif op == "<": - if not (left_num < right_num): - return False - elif op == "<=": - if not (left_num <= right_num): - return False - else: - return False - continue - - # Fallback to string equality for =/!= when numeric parsing fails. - if op in ("=", "=="): - if left_text.lower() != right_text.lower(): - return False - elif op == "!=": - if left_text.lower() == right_text.lower(): - return False - else: - return False - - return True class WorkerOutputMirror(io.TextIOBase): @@ -1204,6 +886,8 @@ class MedeiaLexer(Lexer): return get_line +from SYS.cli_parsing import MedeiaLexer as _MigratedMedeiaLexer +MedeiaLexer = _MigratedMedeiaLexer class ConfigLoader: @@ -4665,7 +4349,7 @@ class MedeiaCLI: ] def rainbow_pillar(colors, height=21, bar_width=36): - table = Table.grid(padding=0) + table = RichTable.grid(padding=0) table.add_column(no_wrap=True) for i in range(height): @@ -4727,10 +4411,10 @@ Come to love it when others take what you share, as there is no greater joy prompt_text = "<🜂🜄|🜁🜃>" - startup_table = ResultTable( + startup_table = Table( "*********************************************" ) - startup_table.set_no_choice(True).set_preserve_order(True) + startup_table._interactive(True)._perseverance(True) startup_table.set_value_case("upper") def _upper(value: Any) -> str: diff --git a/Provider/HIFI.py b/Provider/HIFI.py index 7b6795f..1c0dc08 100644 --- a/Provider/HIFI.py +++ b/Provider/HIFI.py @@ -735,7 +735,7 @@ class HIFI(Provider): try: from SYS.rich_display import stdout_console - from SYS.result_table import ResultTable + from SYS.result_table import Table except Exception: return @@ -743,7 +743,7 @@ class HIFI(Provider): if artist_name: label = f"{artist_name} - {label}" - table = ResultTable(f"HIFI Tracks: {label}").set_preserve_order(True) + table = Table(f"HIFI Tracks: {label}")._perseverance(True) table.set_table("hifi.track") try: table.set_table_metadata( @@ -1901,11 +1901,11 @@ class HIFI(Provider): try: from SYS.rich_display import stdout_console - from SYS.result_table import ResultTable + from SYS.result_table import Table except Exception: return False - table = ResultTable(f"HIFI Albums: {artist_name}").set_preserve_order(False) + table = Table(f"HIFI Albums: {artist_name}")._perseverance(False) table.set_table("hifi.album") try: table.set_table_metadata({"provider": "hifi", "view": "album", "artist_id": artist_id, "artist_name": artist_name}) @@ -1957,7 +1957,7 @@ class HIFI(Provider): try: from SYS.rich_display import stdout_console - from SYS.result_table import ResultTable + from SYS.result_table import Table except Exception: return False @@ -1965,7 +1965,7 @@ class HIFI(Provider): if artist_name: label = f"{artist_name} - {album_title}" # Preserve album order (disc/track) rather than sorting by title. - table = ResultTable(f"HIFI Tracks: {label}").set_preserve_order(True) + table = Table(f"HIFI Tracks: {label}")._perseverance(True) table.set_table("hifi.track") try: table.set_table_metadata( @@ -2027,11 +2027,11 @@ class HIFI(Provider): try: from SYS.rich_display import stdout_console - from SYS.result_table import ResultTable + from SYS.result_table import Table except Exception: return False - table = ResultTable("HIFI Track").set_preserve_order(True) + table = Table("HIFI Track")._perseverance(True) table.set_table("hifi.track") try: table.set_table_metadata({"provider": "hifi", "view": "track", "resolved_manifest": True}) diff --git a/Provider/Tidal.py b/Provider/Tidal.py index 759f1ef..60f0181 100644 --- a/Provider/Tidal.py +++ b/Provider/Tidal.py @@ -732,7 +732,7 @@ class Tidal(Provider): try: from SYS.rich_display import stdout_console - from SYS.result_table import ResultTable + from SYS.result_table import Table except Exception: return @@ -740,7 +740,7 @@ class Tidal(Provider): if artist_name: label = f"{artist_name} - {label}" - table = ResultTable(f"Tidal Tracks: {label}").set_preserve_order(True) + table = Table(f"Tidal Tracks: {label}")._perseverance(True) table.set_table("tidal.track") try: table.set_table_metadata( @@ -1901,11 +1901,11 @@ class Tidal(Provider): try: from SYS.rich_display import stdout_console - from SYS.result_table import ResultTable + from SYS.result_table import Table except Exception: return False - table = ResultTable(f"Tidal Albums: {artist_name}").set_preserve_order(False) + table = Table(f"Tidal Albums: {artist_name}")._perseverance(False) table.set_table("tidal.album") try: table.set_table_metadata({"provider": "tidal", "view": "album", "artist_id": artist_id, "artist_name": artist_name}) @@ -1957,7 +1957,7 @@ class Tidal(Provider): try: from SYS.rich_display import stdout_console - from SYS.result_table import ResultTable + from SYS.result_table import Table except Exception: return False @@ -1965,7 +1965,7 @@ class Tidal(Provider): if artist_name: label = f"{artist_name} - {album_title}" # Preserve album order (disc/track) rather than sorting by title. - table = ResultTable(f"Tidal Tracks: {label}").set_preserve_order(True) + table = Table(f"Tidal Tracks: {label}")._perseverance(True) table.set_table("tidal.track") try: table.set_table_metadata( @@ -2033,11 +2033,11 @@ class Tidal(Provider): try: from SYS.rich_display import stdout_console - from SYS.result_table import ResultTable + from SYS.result_table import Table except Exception: return False - table = ResultTable("Tidal Track").set_preserve_order(True) + table = Table("Tidal Track")._perseverance(True) table.set_table("tidal.track") try: table.set_table_metadata({"provider": "tidal", "view": "track", "resolved_manifest": True}) diff --git a/Provider/alldebrid.py b/Provider/alldebrid.py index 6544be4..70fc38b 100644 --- a/Provider/alldebrid.py +++ b/Provider/alldebrid.py @@ -1415,12 +1415,12 @@ class AllDebrid(TableProviderMixin, Provider): return True try: - from SYS.result_table import ResultTable + from SYS.result_table import Table from SYS.rich_display import stdout_console except Exception: return True - table = ResultTable(f"AllDebrid Files: {title}").set_preserve_order(True) + table = Table(f"AllDebrid Files: {title}")._perseverance(True) table.set_table("alldebrid") try: table.set_table_metadata({"provider": "alldebrid", "view": "files", "magnet_id": magnet_id}) diff --git a/Provider/bandcamp.py b/Provider/bandcamp.py index a494771..961ced8 100644 --- a/Provider/bandcamp.py +++ b/Provider/bandcamp.py @@ -206,7 +206,7 @@ class Bandcamp(Provider): # Build a new table from artist discography. try: - from SYS.result_table import ResultTable + from SYS.result_table import Table from SYS.rich_display import stdout_console except Exception: return False @@ -223,7 +223,7 @@ class Bandcamp(Provider): print(f"bandcamp artist lookup failed: {exc}\n") return True - table = ResultTable(f"Bandcamp: artist:{artist_title}").set_preserve_order(True) + table = Table(f"Bandcamp: artist:{artist_title}")._perseverance(True) table.set_table("bandcamp") try: table.set_value_case("lower") diff --git a/Provider/hello_provider.py b/Provider/hello_provider.py index 05ac4fa..b005e74 100644 --- a/Provider/hello_provider.py +++ b/Provider/hello_provider.py @@ -143,13 +143,13 @@ class HelloProvider(Provider): title = target.get("title") or f"hello-{idx}" try: - from SYS.result_table import ResultTable + from SYS.result_table import Table from SYS.rich_display import stdout_console except Exception: # If ResultTable isn't available, consider selection handled return True - table = ResultTable(f"Hello Details: {title}").set_preserve_order(True) + table = Table(f"Hello Details: {title}")._perseverance(True) table.set_table("hello") try: table.set_table_metadata({"provider": "hello", "view": "details", "example_index": idx}) diff --git a/Provider/internetarchive.py b/Provider/internetarchive.py index d92358e..c22959a 100644 --- a/Provider/internetarchive.py +++ b/Provider/internetarchive.py @@ -95,7 +95,7 @@ def maybe_show_formats_table( ) try: - from SYS.result_table import ResultTable + from SYS.result_table import Table from SYS import pipeline as pipeline_context except Exception as exc: log(f"download-file: ResultTable unavailable: {exc}", file=sys.stderr) @@ -106,7 +106,7 @@ def maybe_show_formats_table( if out_arg: base_args.extend(["-path", str(out_arg)]) - table = ResultTable(table_title).set_preserve_order(True) + table = Table(table_title)._perseverance(True) table.set_table("internetarchive.format") table.set_source_command("download-file", base_args) diff --git a/Provider/podcastindex.py b/Provider/podcastindex.py index 3d02cf3..ba1f6e8 100644 --- a/Provider/podcastindex.py +++ b/Provider/podcastindex.py @@ -219,12 +219,12 @@ class PodcastIndex(Provider): return True try: - from SYS.result_table import ResultTable + from SYS.result_table import Table from SYS.rich_display import stdout_console except Exception: return True - table = ResultTable(f"PodcastIndex Episodes: {feed_title}").set_preserve_order(True) + table = Table(f"PodcastIndex Episodes: {feed_title}")._perseverance(True) table.set_table("podcastindex.episodes") try: table.set_value_case("lower") diff --git a/SYS/cli_parsing.py b/SYS/cli_parsing.py new file mode 100644 index 0000000..4d2bb14 --- /dev/null +++ b/SYS/cli_parsing.py @@ -0,0 +1,460 @@ +"""CLI parsing helpers moved out of `CLI.py`. + +Contains selection parsing and the REPL lexer so `CLI.py` can be smaller and +these pure helpers are easier to test. +""" + +from __future__ import annotations + +import re +from typing import Any, Dict, List, Optional, Set, Tuple + +# Prompt-toolkit lexer types are optional at import time; fall back to lightweight +# stubs if prompt_toolkit is not available so imports remain safe for testing. +try: + from prompt_toolkit.document import Document + from prompt_toolkit.lexers import Lexer +except Exception: # pragma: no cover - optional dependency + Document = object # type: ignore + + class Lexer: # simple fallback base + pass + + +class SelectionSyntax: + """Parses @ selection syntax into 1-based indices.""" + + _RANGE_RE = re.compile(r"^[0-9\-]+$") + + @staticmethod + def parse(token: str) -> Optional[Set[int]]: + """Return 1-based indices or None when not a concrete selection. + + Concrete selections: + - @2 + - @2-5 + - @{1,3,5} + - @2,5,7-9 + + Special (non-concrete) selectors return None: + - @* (select all) + - @.. (history prev) + - @,, (history next) + """ + + if not token or not token.startswith("@"): + return None + + selector = token[1:].strip() + if selector in (".", ",", "*"): + return None + + if selector.startswith("{") and selector.endswith("}"): + selector = selector[1:-1].strip() + + indices: Set[int] = set() + for part in selector.split(","): + part = part.strip() + if not part: + continue + + if "-" in part: + pieces = part.split("-", 1) + if len(pieces) != 2: + return None + start_str = pieces[0].strip() + end_str = pieces[1].strip() + if not start_str or not end_str: + return None + try: + start = int(start_str) + end = int(end_str) + except ValueError: + return None + if start <= 0 or end <= 0 or start > end: + return None + indices.update(range(start, end + 1)) + continue + + try: + value = int(part) + except ValueError: + return None + if value <= 0: + return None + indices.add(value) + + return indices if indices else None + + +class SelectionFilterSyntax: + """Parses and applies @"COL:filter" selection filters. + + Notes: + - CLI tokenization (shlex) strips quotes, so a user input of `@"TITLE:foo"` + arrives as `@TITLE:foo`. We support both forms. + - Filters apply to the *current selectable table items* (in-memory), not to + provider searches. + """ + + _OP_RE = re.compile(r"^(>=|<=|!=|==|>|<|=)\s*(.+)$") + _DUR_TOKEN_RE = re.compile(r"(?i)(\d+)\s*([hms])") + + @staticmethod + def parse(token: str) -> Optional[List[Tuple[str, str]]]: + """Return list of (column, raw_expression) or None when not a filter token.""" + + if not token or not str(token).startswith("@"): + return None + + if token.strip() == "@*": + return None + + # If this is a concrete numeric selection (@2, @1-3, @{1,3}), do not treat it as a filter. + try: + if SelectionSyntax.parse(str(token)) is not None: + return None + except Exception: + pass + + raw = str(token)[1:].strip() + if not raw: + return None + + # If quotes survived tokenization, strip a single symmetric wrapper. + if len(raw) >= 2 and raw[0] == raw[-1] and raw[0] in ('"', "'"): + raw = raw[1:-1].strip() + + # Shorthand: @"foo" means Title contains "foo". + if ":" not in raw: + if raw: + return [("Title", raw)] + return None + + parts = [p.strip() for p in raw.split(",") if p.strip()] + conditions: List[Tuple[str, str]] = [] + for part in parts: + if ":" not in part: + return None + col, expr = part.split(":", 1) + col = str(col or "").strip() + expr = str(expr or "").strip() + if not col: + return None + conditions.append((col, expr)) + + return conditions if conditions else None + + @staticmethod + def _norm_key(text: str) -> str: + return re.sub(r"\s+", " ", str(text or "").strip().lower()) + + @staticmethod + def _item_column_map(item: Any) -> Dict[str, str]: + out: Dict[str, str] = {} + + def _set(k: Any, v: Any) -> None: + key = SelectionFilterSyntax._norm_key(str(k or "")) + if not key: + return + if v is None: + return + try: + if isinstance(v, (list, tuple, set)): + text = ", ".join(str(x) for x in v if x is not None) + else: + text = str(v) + except Exception: + return + out[key] = text + + if isinstance(item, dict): + # Display columns (primary UX surface) + cols = item.get("columns") + if isinstance(cols, list): + for pair in cols: + try: + if isinstance(pair, (list, tuple)) and len(pair) == 2: + _set(pair[0], pair[1]) + except Exception: + continue + # Direct keys as fallback + for k, v in item.items(): + if k == "columns": + continue + _set(k, v) + else: + cols = getattr(item, "columns", None) + if isinstance(cols, list): + for pair in cols: + try: + if isinstance(pair, (list, tuple)) and len(pair) == 2: + _set(pair[0], pair[1]) + except Exception: + continue + for k in ("title", "path", "detail", "provider", "store", "table"): + try: + _set(k, getattr(item, k, None)) + except Exception: + pass + + return out + + @staticmethod + def _parse_duration_seconds(text: str) -> Optional[int]: + s = str(text or "").strip() + if not s: + return None + + if s.isdigit(): + try: + return max(0, int(s)) + except Exception: + return None + + # clock format: M:SS or H:MM:SS + if ":" in s: + parts = [p.strip() for p in s.split(":")] + if len(parts) == 2 and all(p.isdigit() for p in parts): + m, sec = parts + return max(0, int(m) * 60 + int(sec)) + if len(parts) == 3 and all(p.isdigit() for p in parts): + h, m, sec = parts + return max(0, int(h) * 3600 + int(m) * 60 + int(sec)) + + # token format: 1h2m3s (tokens can appear in any combination) + total = 0 + found = False + for m in SelectionFilterSyntax._DUR_TOKEN_RE.finditer(s): + found = True + n = int(m.group(1)) + unit = m.group(2).lower() + if unit == "h": + total += n * 3600 + elif unit == "m": + total += n * 60 + elif unit == "s": + total += n + if found: + return max(0, int(total)) + + return None + + @staticmethod + def _parse_float(text: str) -> Optional[float]: + s = str(text or "").strip() + if not s: + return None + s = s.replace(",", "") + try: + return float(s) + except Exception: + return None + + @staticmethod + def _parse_op(expr: str) -> Tuple[Optional[str], str]: + text = str(expr or "").strip() + if not text: + return None, "" + m = SelectionFilterSyntax._OP_RE.match(text) + if not m: + return None, text + return m.group(1), str(m.group(2) or "").strip() + + @staticmethod + def matches(item: Any, conditions: List[Tuple[str, str]]) -> bool: + colmap = SelectionFilterSyntax._item_column_map(item) + + for col, expr in conditions: + key = SelectionFilterSyntax._norm_key(col) + actual = colmap.get(key) + + # Convenience aliases for common UX names. + if actual is None: + if key == "duration": + actual = colmap.get("duration") + elif key == "title": + actual = colmap.get("title") + + if actual is None: + return False + + op, rhs = SelectionFilterSyntax._parse_op(expr) + left_text = str(actual or "").strip() + right_text = str(rhs or "").strip() + + if op is None: + if not right_text: + return False + if right_text.lower() not in left_text.lower(): + return False + continue + + # Comparator: try duration parsing first when it looks time-like. + prefer_duration = ( + key == "duration" + or any(ch in right_text for ch in (":", "h", "m", "s")) + or any(ch in left_text for ch in (":", "h", "m", "s")) + ) + + left_num: Optional[float] = None + right_num: Optional[float] = None + + if prefer_duration: + ldur = SelectionFilterSyntax._parse_duration_seconds(left_text) + rdur = SelectionFilterSyntax._parse_duration_seconds(right_text) + if ldur is not None and rdur is not None: + left_num = float(ldur) + right_num = float(rdur) + + if left_num is None or right_num is None: + left_num = SelectionFilterSyntax._parse_float(left_text) + right_num = SelectionFilterSyntax._parse_float(right_text) + + if left_num is not None and right_num is not None: + if op in ("=", "=="): + if not (left_num == right_num): + return False + elif op == "!=": + if not (left_num != right_num): + return False + elif op == ">": + if not (left_num > right_num): + return False + elif op == ">=": + if not (left_num >= right_num): + return False + elif op == "<": + if not (left_num < right_num): + return False + elif op == "<=": + if not (left_num <= right_num): + return False + else: + return False + continue + + # Fallback to string equality for =/!= when numeric parsing fails. + if op in ("=", "=="): + if left_text.lower() != right_text.lower(): + return False + elif op == "!=": + if left_text.lower() == right_text.lower(): + return False + else: + return False + + return True + + +class MedeiaLexer(Lexer): + def lex_document(self, document: "Document"): # type: ignore[override] + + def get_line(lineno: int): + line = document.lines[lineno] + tokens: List[tuple[str, str]] = [] + + pattern = re.compile( + r""" + (\s+) | # 1. Whitespace + (\|) | # 2. Pipe + ("(?:[^"\\]|\\.)*"|'(?:[^'\\]|\\.)*') | # 3. Quoted string + ([^\s\|]+) # 4. Word + """, + re.VERBOSE, + ) + + is_cmdlet = True + + def _emit_keyed_value(word: str) -> bool: + """Emit `key:` prefixes (comma-separated) as argument tokens. + + Designed for values like: + clip:3m4s-3m14s,1h22m-1h33m,item:2-3 + + Avoids special-casing URLs (://) and Windows drive paths (C:\\...). + Returns True if it handled the token. + """ + if not word or ":" not in word: + return False + # Avoid URLs and common scheme patterns. + if "://" in word: + return False + # Avoid Windows drive paths (e.g., C:\\foo or D:/bar) + if re.match(r"^[A-Za-z]:[\\/]", word): + return False + + key_prefix = re.compile(r"^([A-Za-z_][A-Za-z0-9_-]*:)(.*)$") + parts = word.split(",") + handled_any = False + for i, part in enumerate(parts): + if i > 0: + tokens.append(("class:value", ",")) + if part == "": + continue + m = key_prefix.match(part) + if m: + tokens.append(("class:argument", m.group(1))) + if m.group(2): + tokens.append(("class:value", m.group(2))) + handled_any = True + else: + tokens.append(("class:value", part)) + handled_any = True + + return handled_any + + for match in pattern.finditer(line): + ws, pipe, quote, word = match.groups() + if ws: + tokens.append(("", ws)) + continue + if pipe: + tokens.append(("class:pipe", pipe)) + is_cmdlet = True + continue + if quote: + # If the quoted token contains a keyed spec (clip:/item:/hash:), + # highlight the `key:` portion in argument-blue even inside quotes. + if len(quote) >= 2 and quote[0] == quote[-1] and quote[0] in ('"', "'"): + q = quote[0] + inner = quote[1:-1] + start_index = len(tokens) + if _emit_keyed_value(inner): + # _emit_keyed_value already appended tokens for inner; insert opening quote + # before that chunk, then add the closing quote. + tokens.insert(start_index, ("class:string", q)) + tokens.append(("class:string", q)) + is_cmdlet = False + continue + + tokens.append(("class:string", quote)) + is_cmdlet = False + continue + if not word: + continue + + if word.startswith("@"): # selection tokens + rest = word[1:] + if rest and re.fullmatch(r"[0-9\-\*,]+", rest): + tokens.append(("class:selection_at", "@")) + tokens.append(("class:selection_range", rest)) + is_cmdlet = False + continue + if rest == "": + tokens.append(("class:selection_at", "@")) + is_cmdlet = False + continue + + if is_cmdlet: + tokens.append(("class:cmdlet", word)) + is_cmdlet = False + elif word.startswith("-"): + tokens.append(("class:argument", word)) + else: + if not _emit_keyed_value(word): + tokens.append(("class:value", word)) + + return tokens + + return get_line diff --git a/SYS/result_table.py b/SYS/result_table.py index 4886bfb..01788fe 100644 --- a/SYS/result_table.py +++ b/SYS/result_table.py @@ -341,7 +341,7 @@ class TUIResultCard: @dataclass -class ResultColumn: +class Column: """Represents a single column in a result table.""" name: str @@ -361,10 +361,10 @@ class ResultColumn: @dataclass -class ResultRow: +class Row: """Represents a single row in a result table.""" - columns: List[ResultColumn] = field(default_factory=list) + columns: List[Column] = field(default_factory=list) selection_args: Optional[List[str]] = None """Arguments to use for this row when selected via @N syntax (e.g., ['-item', '3'])""" selection_action: Optional[List[str]] = None @@ -398,7 +398,7 @@ class ResultRow: if formatted: str_value = formatted - self.columns.append(ResultColumn(normalized_name, str_value)) + self.columns.append(Column(normalized_name, str_value)) def get_column(self, name: str) -> Optional[str]: """Get column value by name.""" @@ -420,7 +420,7 @@ class ResultRow: return " | ".join(str(col) for col in self.columns) -class ResultTable: +class Table: """Unified table formatter for search results, metadata, and pipeline objects. Provides a structured way to display results in the CLI with consistent formatting. @@ -491,7 +491,7 @@ class ResultTable: self.max_columns = ( max_columns if max_columns is not None else 5 ) # Default 5 for cleaner display - self.rows: List[ResultRow] = [] + self.rows: List[Row] = [] self.column_widths: Dict[str, int] = {} self.input_options: Dict[str, @@ -503,9 +503,9 @@ class ResultTable: """Base arguments for the source command""" self.header_lines: List[str] = [] """Optional metadata lines rendered under the title""" - self.preserve_order: bool = preserve_order + self.perseverance: bool = preserve_order """If True, skip automatic sorting so display order matches input order.""" - self.no_choice: bool = False + self.interactive: bool = False """When True, suppress row numbers/selection to make the table non-interactive.""" self.table: Optional[str] = None """Table type (e.g., 'youtube', 'soulseek') for context-aware selection logic.""" @@ -516,7 +516,7 @@ class ResultTable: self.value_case: str = "lower" """Display-only value casing: 'lower' (default), 'upper', or 'preserve'.""" - def set_value_case(self, value_case: str) -> "ResultTable": + def set_value_case(self, value_case: str) -> "Table": """Configure display-only casing for rendered cell values.""" case = str(value_case or "").strip().lower() if case not in {"lower", @@ -535,12 +535,12 @@ class ResultTable: return text return text.lower() - def set_table(self, table: str) -> "ResultTable": + def set_table(self, table: str) -> "Table": """Set the table type for context-aware selection logic.""" self.table = table return self - def set_table_metadata(self, metadata: Optional[Dict[str, Any]]) -> "ResultTable": + def set_table_metadata(self, metadata: Optional[Dict[str, Any]]) -> "Table": """Attach provider/table metadata for downstream selection logic.""" self.table_metadata = dict(metadata or {}) return self @@ -552,19 +552,19 @@ class ResultTable: except Exception: return {} - def set_no_choice(self, no_choice: bool = True) -> "ResultTable": + def _interactive(self, interactive: bool = True) -> "Table": """Mark the table as non-interactive (no row numbers, no selection parsing).""" - self.no_choice = bool(no_choice) + self.interactive = bool(interactive) return self - def set_preserve_order(self, preserve: bool = True) -> "ResultTable": + def _perseverance(self, perseverance: bool = True) -> "Table": """Configure whether this table should skip automatic sorting.""" - self.preserve_order = bool(preserve) + self.perseverance = bool(perseverance) return self - def add_row(self) -> ResultRow: + def add_row(self) -> Row: """Add a new row to the table and return it for configuration.""" - row = ResultRow() + row = Row() row.source_index = len(self.rows) self.rows.append(row) return row @@ -573,7 +573,7 @@ class ResultTable: self, command: str, args: Optional[List[str]] = None - ) -> "ResultTable": + ) -> "Table": """Set the source command that generated this table. This is used for @N expansion: when user runs @2 | next-cmd, it will expand to: @@ -596,7 +596,7 @@ class ResultTable: command: str, args: Optional[List[str]] = None, preserve_order: bool = False, - ) -> "ResultTable": + ) -> "Table": """Initialize table with title, command, args, and preserve_order in one call. Consolidates common initialization pattern: ResultTable(title) + set_source_command(cmd, args) + set_preserve_order(preserve_order) @@ -613,10 +613,10 @@ class ResultTable: self.title = title self.source_command = command self.source_args = args or [] - self.preserve_order = preserve_order + self.perseverance = preserve_order return self - def copy_with_title(self, new_title: str) -> "ResultTable": + def copy_with_title(self, new_title: str) -> "Table": """Create a new table copying settings from this one but with a new title. Consolidates pattern: new_table = ResultTable(title); new_table.set_source_command(...) @@ -628,16 +628,16 @@ class ResultTable: Returns: New ResultTable with copied settings and new title """ - new_table = ResultTable( + new_table = Table( title=new_title, title_width=self.title_width, max_columns=self.max_columns, - preserve_order=self.preserve_order, + preserve_order=self.perseverance, ) new_table.source_command = self.source_command new_table.source_args = list(self.source_args) if self.source_args else [] new_table.input_options = dict(self.input_options) if self.input_options else {} - new_table.no_choice = self.no_choice + new_table.interactive = self.interactive new_table.table = self.table new_table.table_metadata = ( dict(self.table_metadata) if getattr(self, "table_metadata", None) else {} @@ -663,12 +663,12 @@ class ResultTable: if 0 <= row_index < len(self.rows): self.rows[row_index].selection_action = selection_action - def set_header_lines(self, lines: List[str]) -> "ResultTable": + def set_header_lines(self, lines: List[str]) -> "Table": """Attach metadata lines that render beneath the title.""" self.header_lines = [line for line in lines if line] return self - def set_header_line(self, line: str) -> "ResultTable": + def set_header_line(self, line: str) -> "Table": """Attach a single metadata line beneath the title.""" return self.set_header_lines([line] if line else []) @@ -699,7 +699,7 @@ class ResultTable: self.set_header_line(summary) return summary - def sort_by_title(self) -> "ResultTable": + def sort_by_title(self) -> "Table": """Sort rows alphabetically by Title or Name column. Looks for columns named 'Title', 'Name', or 'Tag' (in that order). @@ -737,7 +737,7 @@ class ResultTable: return self - def add_result(self, result: Any) -> "ResultTable": + def add_result(self, result: Any) -> "Table": """Add a result object (SearchResult, PipeObject, ResultItem, TagItem, or dict) as a row. Args: @@ -793,7 +793,7 @@ class ResultTable: return payloads @classmethod - def from_api_table(cls, api_table: Any) -> "ResultTable": + def from_api_table(cls, api_table: Any) -> "Table": """Convert a strict SYS.result_table_api.ResultTable into an interactive monolith ResultTable. This allows providers using the new strict API to benefit from the monolith's @@ -831,7 +831,7 @@ class ResultTable: return instance - def _add_result_model(self, row: ResultRow, result: ResultModel) -> None: + def _add_result_model(self, row: Row, result: ResultModel) -> None: """Extract and add ResultModel fields from the new API to row.""" row.add_column("Title", result.title) @@ -848,7 +848,7 @@ class ResultTable: # Add a placeholder for metadata-like display if needed in the main table # but usually metadata is handled by the detail panel now - def _add_search_result(self, row: ResultRow, result: Any) -> None: + def _add_search_result(self, row: Row, result: Any) -> None: """Extract and add SearchResult fields to row.""" cols = getattr(result, "columns", None) used_explicit_columns = False @@ -925,7 +925,7 @@ class ResultTable: if selection_action: row.selection_action = [str(a) for a in selection_action if a is not None] - def _add_result_item(self, row: ResultRow, item: Any) -> None: + def _add_result_item(self, row: Row, item: Any) -> None: """Extract and add ResultItem fields to row (compact display for search results). Shows only essential columns: @@ -970,7 +970,7 @@ class ResultTable: if hasattr(item, "size_bytes") and item.size_bytes: row.add_column("Size", _format_size(item.size_bytes, integer_only=False)) - def _add_tag_item(self, row: ResultRow, item: Any) -> None: + def _add_tag_item(self, row: Row, item: Any) -> None: """Extract and add TagItem fields to row (compact tag display). Shows the Tag column with the tag name and Source column to identify @@ -986,7 +986,7 @@ class ResultTable: if hasattr(item, "source") and item.source: row.add_column("Store", item.source) - def _add_pipe_object(self, row: ResultRow, obj: Any) -> None: + def _add_pipe_object(self, row: Row, obj: Any) -> None: """Extract and add PipeObject fields to row.""" # Source and identifier if hasattr(obj, "source") and obj.source: @@ -1019,7 +1019,7 @@ class ResultTable: warnings_str += f" (+{len(obj.warnings) - 2} more)" row.add_column("Warnings", warnings_str) - def _add_dict(self, row: ResultRow, data: Dict[str, Any]) -> None: + def _add_dict(self, row: Row, data: Dict[str, Any]) -> None: """Extract and add dict fields to row using first-match priority groups. Respects max_columns limit to keep table compact and readable. @@ -1251,7 +1251,7 @@ class ResultTable: # Don't display it added_fields.add("_selection_args") - def _add_generic_object(self, row: ResultRow, obj: Any) -> None: + def _add_generic_object(self, row: Row, obj: Any) -> None: """Extract and add fields from generic objects.""" if hasattr(obj, "__dict__"): for key, value in obj.__dict__.items(): @@ -1282,7 +1282,7 @@ class ResultTable: show_lines=False, ) - if not self.no_choice: + if not self.interactive: table.add_column("#", justify="right", no_wrap=True) # Render headers in uppercase, but keep original column keys for lookup. @@ -1301,7 +1301,7 @@ class ResultTable: for row_idx, row in enumerate(self.rows, 1): cells: List[str] = [] - if not self.no_choice: + if not self.interactive: cells.append(str(row_idx)) for name in col_names: val = row.get_column(name) or "" @@ -1381,7 +1381,7 @@ class ResultTable: """Iterate over rows.""" return iter(self.rows) - def __getitem__(self, index: int) -> ResultRow: + def __getitem__(self, index: int) -> Row: """Get row by index.""" return self.rows[index] @@ -1410,7 +1410,7 @@ class ResultTable: If accept_args=False: List of 0-based indices, or None if cancelled If accept_args=True: Dict with "indices" and "args" keys, or None if cancelled """ - if self.no_choice: + if self.interactive: from SYS.rich_display import stdout_console stdout_console().print(self) @@ -1494,7 +1494,7 @@ class ResultTable: Returns: List of 0-based indices, or None if invalid """ - if self.no_choice: + if self.interactive: return None indices = set() @@ -1604,7 +1604,7 @@ class ResultTable: "args": cmdlet_args } - def add_input_option(self, option: InputOption) -> "ResultTable": + def add_input_option(self, option: InputOption) -> "Table": """Add an interactive input option to the table. Input options allow users to specify cmdlet arguments interactively, @@ -1708,7 +1708,7 @@ class ResultTable: result[name] = value return result - def select_by_index(self, index: int) -> Optional[ResultRow]: + def select_by_index(self, index: int) -> Optional[Row]: """Get a row by 1-based index (user-friendly). Args: @@ -1740,7 +1740,7 @@ class ResultTable: return rows def _format_datatable_row(self, - row: ResultRow, + row: Row, source: str = "unknown") -> List[str]: """Format a ResultRow for DataTable display. @@ -1769,7 +1769,7 @@ class ResultTable: cards.append(card) return cards - def _row_to_card(self, row: ResultRow) -> TUIResultCard: + def _row_to_card(self, row: Row) -> TUIResultCard: """Convert a ResultRow to a TUIResultCard. Args: @@ -1886,7 +1886,7 @@ def format_result(result: Any, title: str = "") -> str: Returns: Formatted string """ - table = ResultTable(title) + table = Table(title) if isinstance(result, list): for item in result: @@ -1997,7 +1997,7 @@ def extract_item_metadata(item: Any) -> Dict[str, Any]: return out -class ItemDetailView(ResultTable): +class ItemDetailView(Table): """A specialized view that displays item details alongside a list of related items (tags, urls, etc). This is used for 'get-tag', 'get-url' and similar cmdlets where we want to contextually show diff --git a/SYS/rich_display.py b/SYS/rich_display.py index c0f3a4a..7e2148e 100644 --- a/SYS/rich_display.py +++ b/SYS/rich_display.py @@ -80,7 +80,7 @@ def show_provider_config_panel( provider_names: str | List[str], ) -> None: """Show a Rich panel explaining how to configure providers.""" - from rich.table import Table + from rich.table import Table as RichTable from rich.text import Text from rich.console import Group @@ -89,7 +89,7 @@ def show_provider_config_panel( else: providers = provider_names - table = Table.grid(padding=(0, 1)) + table = RichTable.grid(padding=(0, 1)) table.add_column(style="bold red") for provider in providers: @@ -116,7 +116,7 @@ def show_store_config_panel( store_names: str | List[str], ) -> None: """Show a Rich panel explaining how to configure storage backends.""" - from rich.table import Table + from rich.table import Table as RichTable from rich.text import Text from rich.console import Group @@ -125,7 +125,7 @@ def show_store_config_panel( else: stores = store_names - table = Table.grid(padding=(0, 1)) + table = RichTable.grid(padding=(0, 1)) table.add_column(style="bold yellow") for store in stores: @@ -269,7 +269,7 @@ def render_item_details_panel(item: Dict[str, Any], *, title: Optional[str] = No # Create a specialized view with no results rows (only the metadata panel) # We set no_choice=True to hide the "#" column (not that there are any rows). - view = ItemDetailView(item_metadata=metadata, detail_title=title).set_no_choice(True) + view = ItemDetailView(item_metadata=metadata, detail_title=title)._interactive(True) # Ensure no title leaks in (prevents an empty "No results" table from rendering). try: view.title = "" diff --git a/TUI.py b/TUI.py index 6494285..b247283 100644 --- a/TUI.py +++ b/TUI.py @@ -40,7 +40,7 @@ for path in (REPO_ROOT, TUI_DIR): sys.path.insert(0, str_path) from TUI.pipeline_runner import PipelineRunResult # type: ignore # noqa: E402 -from SYS.result_table import ResultTable, extract_hash_value, extract_store_value # type: ignore # noqa: E402 +from SYS.result_table import Table, extract_hash_value, extract_store_value # type: ignore # noqa: E402 from SYS.config import load_config # type: ignore # noqa: E402 from Store.registry import Store as StoreRegistry # type: ignore # noqa: E402 @@ -435,7 +435,7 @@ class PipelineHubApp(App): self.results_table: Optional[DataTable] = None self.worker_table: Optional[DataTable] = None self.status_panel: Optional[Static] = None - self.current_result_table: Optional[ResultTable] = None + self.current_result_table: Optional[Table] = None self.suggestion_list: Optional[OptionList] = None self._cmdlet_names: List[str] = [] self._pipeline_running = False diff --git a/TUI/modalscreen/search.py b/TUI/modalscreen/search.py index bf5451b..f1ab2a1 100644 --- a/TUI/modalscreen/search.py +++ b/TUI/modalscreen/search.py @@ -15,7 +15,7 @@ import asyncio # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent.parent)) from SYS.config import load_config, resolve_output_dir -from SYS.result_table import ResultTable +from SYS.result_table import Table from ProviderCore.registry import get_search_provider logger = logging.getLogger(__name__) @@ -59,7 +59,7 @@ class SearchModal(ModalScreen): self.tags_textarea: Optional[TextArea] = None self.library_source_select: Optional[Select] = None self.current_results: List[Any] = [] # List of SearchResult objects - self.current_result_table: Optional[ResultTable] = None + self.current_result_table: Optional[Table] = None self.is_searching = False self.current_worker = None # Track worker for search operations @@ -189,7 +189,7 @@ class SearchModal(ModalScreen): self.current_worker.log_step(f"Found {len(results)} results") # Create ResultTable - table = ResultTable(f"Search Results: {query}") + table = Table(f"Search Results: {query}") for res in results: row = table.add_row() # Add columns from result.columns diff --git a/TUI/pipeline_runner.py b/TUI/pipeline_runner.py index e6d393c..f1b182d 100644 --- a/TUI/pipeline_runner.py +++ b/TUI/pipeline_runner.py @@ -31,7 +31,7 @@ except Exception: WorkerManagerRegistry = None from SYS.logger import set_debug from SYS.rich_display import capture_rich_output -from SYS.result_table import ResultTable +from SYS.result_table import Table @dataclass(slots=True) @@ -199,7 +199,7 @@ class PipelineRunner: if table is None and items: try: - synth = ResultTable("Results") + synth = Table("Results") for item in items: synth.add_result(item) table = synth diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index e2d317e..598963c 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -19,7 +19,7 @@ from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Set from dataclasses import dataclass, field from SYS import models from SYS import pipeline as pipeline_context -from SYS.result_table import ResultTable +from SYS.result_table import Table from SYS.rich_display import stderr_console as get_stderr_console from rich.prompt import Confirm from contextlib import AbstractContextManager, nullcontext @@ -1722,28 +1722,34 @@ def _print_live_safe_stderr(message: str) -> None: except Exception: return - cm = None + cm: AbstractContextManager[Any] | None = None try: from SYS import pipeline as _pipeline_ctx # type: ignore suspend = getattr(_pipeline_ctx, "suspend_live_progress", None) - cm = suspend() if callable(suspend) else None + candidate = suspend() if callable(suspend) else None + if isinstance(candidate, AbstractContextManager): + cm = candidate + elif candidate is not None and hasattr(candidate, "__enter__") and hasattr(candidate, "__exit__"): + cm = candidate # type: ignore[arg-type] except Exception: cm = None - try: - from contextlib import nullcontext - except Exception: - nullcontext = None # type: ignore if cm is None: - cm = nullcontext() if callable(nullcontext) else None + cm = nullcontext() try: - if cm is not None: - with cm: - stderr_console.print(str(message)) - else: - stderr_console.print(str(message)) + console = stderr_console() + print_func = getattr(console, "print", None) + except Exception: + return + + if not callable(print_func): + return + + try: + with cm: + print_func(str(message)) except Exception: return @@ -1919,21 +1925,21 @@ def _print_saved_output_panel(item: Any, final_path: Path) -> None: # If Rich Live progress is active, pause it while printing so the panel # doesn't get overwritten/truncated by Live's cursor control. + cm: AbstractContextManager[Any] | None = None try: from SYS import pipeline as _pipeline_ctx # type: ignore suspend = getattr(_pipeline_ctx, "suspend_live_progress", None) - cm = suspend() if callable(suspend) else None + cm_candidate = suspend() if callable(suspend) else None + if isinstance(cm_candidate, AbstractContextManager): + cm = cm_candidate + elif cm_candidate is not None and hasattr(cm_candidate, "__enter__") and hasattr(cm_candidate, "__exit__"): + cm = cm_candidate # type: ignore[arg-type] except Exception: cm = None - try: - from contextlib import nullcontext - except Exception: - nullcontext = None # type: ignore - if cm is None: - cm = nullcontext() if callable(nullcontext) else None + cm = nullcontext() try: location = str(final_path) @@ -1974,11 +1980,17 @@ def _print_saved_output_panel(item: Any, final_path: Path) -> None: grid.add_row("Hash", file_hash or "(unknown)") try: - if cm is not None: - with cm: - stderr_console.print(Panel(grid, title="Saved", expand=False)) - else: - stderr_console.print(Panel(grid, title="Saved", expand=False)) + console = stderr_console() + print_func = getattr(console, "print", None) + except Exception: + return + + if not callable(print_func): + return + + try: + with cm: + print_func(Panel(grid, title="Saved", expand=False)) except Exception: return @@ -2635,7 +2647,7 @@ def propagate_metadata( if p_obj.hash and p_obj.hash != "unknown": prev_by_hash[p_obj.hash] = p_obj - normalized: List[models.PipeObject] = [] + normalized: List[Any] = [] # Pre-calculate length matching for heuristic is_same_length = len(new_items) == len(prev_normalized) @@ -3688,11 +3700,12 @@ def check_url_exists_in_storage( if isinstance(response, dict): raw_hashes = response.get("hashes") or response.get("file_hashes") raw_ids = response.get("file_ids") + hash_list = raw_hashes if isinstance(raw_hashes, list) else [] has_ids = isinstance(raw_ids, list) and len(raw_ids) > 0 - has_hashes = isinstance(raw_hashes, list) and len(raw_hashes) > 0 + has_hashes = len(hash_list) > 0 if has_hashes: try: - found_hash = str(raw_hashes[0]).strip() + found_hash = str(hash_list[0]).strip() except Exception: found_hash = None if has_ids or has_hashes: @@ -3816,10 +3829,10 @@ def check_url_exists_in_storage( _mark_preflight_checked() return True - table = ResultTable(f"URL already exists ({len(matched_urls)} url(s))", max_columns=10) - table.set_no_choice(True) + table = Table(f"URL already exists ({len(matched_urls)} url(s))", max_columns=10) + table._interactive(True) try: - table.set_preserve_order(True) + table._perseverance(True) except Exception: pass diff --git a/cmdlet/add_file.py b/cmdlet/add_file.py index abf86ea..fa0c66f 100644 --- a/cmdlet/add_file.py +++ b/cmdlet/add_file.py @@ -419,7 +419,7 @@ class Add_File(Cmdlet): # The user then runs @N (optionally piped), which replays add-file with selected paths. if dir_scan_mode: try: - from SYS.result_table import ResultTable + from SYS.result_table import Table from pathlib import Path as _Path # Build base args to replay: keep everything except the directory -path. @@ -437,7 +437,7 @@ class Add_File(Cmdlet): continue base_args.append(t) - table = ResultTable(title="Files in Directory", preserve_order=True) + table = Table(title="Files in Directory", preserve_order=True) table.set_table("add-file.directory") table.set_source_command("add-file", base_args) @@ -668,7 +668,7 @@ class Add_File(Cmdlet): # Legacy search-file refresh is no longer used for final display. if want_final_search_file and collected_payloads: try: - from SYS.result_table import ResultTable + from SYS.result_table import Table from SYS.rich_display import render_item_details_panel # Stop the live pipeline progress UI before rendering the details panels. @@ -701,7 +701,7 @@ class Add_File(Cmdlet): for idx, payload in enumerate(collected_payloads, 1): render_item_details_panel(payload, title=f"#{idx} Item Details") - table = ResultTable("Result") + table = Table("Result") for payload in collected_payloads: table.add_result(payload) setattr(table, "_rendered_by_cmdlet", True) @@ -1442,9 +1442,9 @@ class Add_File(Cmdlet): return try: - from SYS.result_table import ResultTable + from SYS.result_table import Table - table = ResultTable("Result") + table = Table("Result") table.add_result(payload) # Overlay so @1 refers to this add-file result without overwriting search history ctx.set_last_result_table_overlay(table, [payload], subject=payload) diff --git a/cmdlet/add_tag.py b/cmdlet/add_tag.py index 9569c3e..dc82d96 100644 --- a/cmdlet/add_tag.py +++ b/cmdlet/add_tag.py @@ -1020,12 +1020,12 @@ class Add_Tag(Cmdlet): if is_last_stage and display_items: try: from SYS.rich_display import render_item_details_panel - from SYS.result_table import ResultTable + from SYS.result_table import Table for idx, item in enumerate(display_items, 1): render_item_details_panel(item, title=f"#{idx} Item Details") - table = ResultTable("Result") + table = Table("Result") for item in display_items: table.add_result(item) setattr(table, "_rendered_by_cmdlet", True) diff --git a/cmdlet/delete_file.py b/cmdlet/delete_file.py index a26e476..55f958d 100644 --- a/cmdlet/delete_file.py +++ b/cmdlet/delete_file.py @@ -13,7 +13,7 @@ from Store import Store from . import _shared as sh from API import HydrusNetwork as hydrus_wrapper from SYS import pipeline as ctx -from SYS.result_table import ResultTable, _format_size +from SYS.result_table import Table, _format_size from SYS.rich_display import stdout_console @@ -581,8 +581,8 @@ class Delete_File(sh.Cmdlet): deleted_rows.extend(rows) if deleted_rows: - table = ResultTable("Deleted") - table.set_no_choice(True).set_preserve_order(True) + table = Table("Deleted") + table._interactive(True)._perseverance(True) for row in deleted_rows: result_row = table.add_row() result_row.add_column("Title", row.get("title", "")) diff --git a/cmdlet/download_file.py b/cmdlet/download_file.py index 0ed1079..a1fd59b 100644 --- a/cmdlet/download_file.py +++ b/cmdlet/download_file.py @@ -21,7 +21,7 @@ from API.HTTP import _download_direct_file from SYS.models import DownloadError, DownloadOptions, DownloadMediaResult from SYS.logger import log, debug from SYS.pipeline_progress import PipelineProgress -from SYS.result_table import ResultTable +from SYS.result_table import Table from SYS.rich_display import stderr_console as get_stderr_console from SYS import pipeline as pipeline_context from SYS.utils import sha256_file @@ -746,12 +746,12 @@ class Download_File(Cmdlet): try: from SYS.rich_display import render_item_details_panel - from SYS.result_table import ResultTable + from SYS.result_table import Table for idx, item in enumerate(emitted_items, 1): render_item_details_panel(item, title=f"#{idx} Item Details") - table = ResultTable("Result") + table = Table("Result") for item in emitted_items: table.add_result(item) setattr(table, "_rendered_by_cmdlet", True) @@ -1184,7 +1184,7 @@ class Download_File(Cmdlet): return f"https://www.youtube.com/watch?v={entry_id.strip()}" return None - table = ResultTable() + table = Table() safe_url = str(url or "").strip() table.title = f'download-file -url "{safe_url}"' if safe_url else "download-file" if table_type: @@ -1194,7 +1194,7 @@ class Download_File(Cmdlet): table.table = table_type table.set_source_command("download-file", []) try: - table.set_preserve_order(True) + table._perseverance(True) except Exception: pass @@ -1318,7 +1318,7 @@ class Download_File(Cmdlet): if remaining_args: base_cmd += " " + " ".join(remaining_args) - table = ResultTable(title=f"Available formats for {url}", max_columns=10, preserve_order=True) + table = Table(title=f"Available formats for {url}", max_columns=10, preserve_order=True) table.set_table("ytdlp.formatlist") table.set_source_command("download-file", [url]) @@ -1601,7 +1601,7 @@ class Download_File(Cmdlet): if formats: formats_to_show = formats - table = ResultTable(title=f"Available formats for {url}", max_columns=10, preserve_order=True) + table = Table(title=f"Available formats for {url}", max_columns=10, preserve_order=True) table.set_table("ytdlp.formatlist") table.set_source_command("download-file", [url]) diff --git a/cmdlet/get_metadata.py b/cmdlet/get_metadata.py index 5186a12..073d669 100644 --- a/cmdlet/get_metadata.py +++ b/cmdlet/get_metadata.py @@ -15,7 +15,7 @@ SharedArgs = sh.SharedArgs parse_cmdlet_args = sh.parse_cmdlet_args get_field = sh.get_field from SYS import pipeline as ctx -from SYS.result_table import ResultTable +from SYS.result_table import Table class Get_Metadata(Cmdlet): @@ -147,7 +147,7 @@ class Get_Metadata(Cmdlet): } @staticmethod - def _add_table_body_row(table: ResultTable, row: Dict[str, Any]) -> None: + def _add_table_body_row(table: Table, row: Dict[str, Any]) -> None: """Add a single row to the ResultTable using the prepared columns.""" columns = row.get("columns") if isinstance(row, dict) else None lookup: Dict[str, @@ -285,7 +285,7 @@ class Get_Metadata(Cmdlet): ) table_title = f"get-metadata: {title}" if title else "get-metadata" - table = ResultTable(table_title + table = Table(table_title ).init_command(table_title, "get-metadata", list(args)) diff --git a/cmdlet/get_note.py b/cmdlet/get_note.py index 96cd28d..bbf98ba 100644 --- a/cmdlet/get_note.py +++ b/cmdlet/get_note.py @@ -7,7 +7,7 @@ import sys from SYS.logger import log from SYS import pipeline as ctx -from SYS.result_table import ResultTable +from SYS.result_table import Table from . import _shared as sh Cmdlet = sh.Cmdlet @@ -112,7 +112,7 @@ class Get_Note(Cmdlet): ItemDetailView("Notes", item_metadata=metadata) .set_table("note") .set_value_case("preserve") - .set_preserve_order(True) + ._perseverance(True) ) note_table.set_source_command("get-note", []) diff --git a/cmdlet/get_relationship.py b/cmdlet/get_relationship.py index 5d7d27a..a5adee0 100644 --- a/cmdlet/get_relationship.py +++ b/cmdlet/get_relationship.py @@ -23,7 +23,7 @@ should_show_help = sh.should_show_help get_field = sh.get_field from API.folder import API_folder_store from SYS.config import get_local_storage_path -from SYS.result_table import ResultTable +from SYS.result_table import Table from Store import Store CMDLET = Cmdlet( diff --git a/cmdlet/get_tag.py b/cmdlet/get_tag.py index a6c98ba..72a42ab 100644 --- a/cmdlet/get_tag.py +++ b/cmdlet/get_tag.py @@ -1552,9 +1552,9 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: ) return 0 - from SYS.result_table import ResultTable + from SYS.result_table import Table - table = ResultTable(f"Metadata: {provider.name}") + table = Table(f"Metadata: {provider.name}") table.set_table(f"metadata.{provider.name}") table.set_source_command("get-tag", []) selection_payload = [] diff --git a/cmdlet/get_url.py b/cmdlet/get_url.py index fee913b..d3512e5 100644 --- a/cmdlet/get_url.py +++ b/cmdlet/get_url.py @@ -18,7 +18,7 @@ Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = ( sh.normalize_hash, ) from SYS.logger import log -from SYS.result_table import ResultTable +from SYS.result_table import Table from Store import Store from SYS import pipeline as ctx @@ -575,10 +575,10 @@ class Get_Url(Cmdlet): display_items: List[Dict[str, Any]] = [] table = ( - ResultTable( + Table( "url", max_columns=5 - ).set_preserve_order(True).set_table("url").set_value_case("preserve") + )._perseverance(True).set_table("url").set_value_case("preserve") ) table.set_source_command("get-url", ["-url", search_pattern]) @@ -660,7 +660,7 @@ class Get_Url(Cmdlet): "Urls", item_metadata=metadata, max_columns=1 - ).set_preserve_order(True).set_table("url").set_value_case("preserve") + )._perseverance(True).set_table("url").set_value_case("preserve") ) table.set_source_command("get-url", []) diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py index 62a9e0b..954a31c 100644 --- a/cmdlet/search_file.py +++ b/cmdlet/search_file.py @@ -260,7 +260,7 @@ class search_file(Cmdlet): try: results_list: List[Dict[str, Any]] = [] - from SYS.result_table import ResultTable + from SYS.result_table import Table provider_text = str(provider_name or "").strip() provider_lower = provider_text.lower() @@ -311,7 +311,7 @@ class search_file(Cmdlet): # Internet Archive search results are effectively folders (items); selecting @N # should open a list of downloadable files for the chosen item. table_type = "internetarchive.folder" - table = ResultTable(table_title).set_preserve_order(preserve_order) + table = Table(table_title)._perseverance(preserve_order) table.set_table(table_type) if provider_lower == "alldebrid": table_meta["view"] = "files" if effective_open_id is not None else "folders" @@ -654,16 +654,16 @@ class search_file(Cmdlet): ) results_list = [] - from SYS.result_table import ResultTable + from SYS.result_table import Table - table = ResultTable(command_title) + table = Table(command_title) try: table.set_source_command("search-file", list(args_list)) except Exception: pass if hash_query: try: - table.set_preserve_order(True) + table._perseverance(True) except Exception: pass diff --git a/cmdnat/adjective.py b/cmdnat/adjective.py index 684d77b..aee2756 100644 --- a/cmdnat/adjective.py +++ b/cmdnat/adjective.py @@ -4,7 +4,7 @@ import sys from typing import List, Dict, Any, Optional, Sequence from cmdlet._shared import Cmdlet, CmdletArg, parse_cmdlet_args from SYS.logger import log -from SYS.result_table import ResultTable +from SYS.result_table import Table from SYS import pipeline as ctx ADJECTIVE_FILE = os.path.join( @@ -42,7 +42,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: # If no args, list categories if not args: - table = ResultTable("Adjective Categories") + table = Table("Adjective Categories") for i, (category, tags) in enumerate(data.items()): row = table.add_row() row.add_column("#", str(i + 1)) @@ -125,7 +125,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: # List tags in category (Default action if no flags or after modification) tags = data.get(category, []) - table = ResultTable(f"Tags in '{category}'") + table = Table(f"Tags in '{category}'") for i, tag in enumerate(tags): row = table.add_row() row.add_column("#", str(i + 1)) diff --git a/cmdnat/config.py b/cmdnat/config.py index 5755b30..63ed51d 100644 --- a/cmdnat/config.py +++ b/cmdnat/config.py @@ -3,7 +3,7 @@ from typing import List, Dict, Any, Optional, Sequence from cmdlet._shared import Cmdlet, CmdletArg from SYS.config import load_config, save_config from SYS import pipeline as ctx -from SYS.result_table import ResultTable +from SYS.result_table import Table CMDLET = Cmdlet( name=".config", @@ -157,7 +157,7 @@ def _show_config_table(config_data: Dict[str, Any]) -> int: return 0 items.sort(key=lambda x: x.get("key")) - table = ResultTable("Configuration") + table = Table("Configuration") table.set_table("config") table.set_source_command(".config", []) diff --git a/cmdnat/help.py b/cmdnat/help.py index b793432..6d1be45 100644 --- a/cmdnat/help.py +++ b/cmdnat/help.py @@ -7,7 +7,7 @@ import sys from cmdlet._shared import Cmdlet, CmdletArg, parse_cmdlet_args from cmdlet import REGISTRY as CMDLET_REGISTRY, ensure_cmdlet_modules_loaded from SYS.logger import log -from SYS.result_table import ResultTable +from SYS.result_table import Table from SYS import pipeline as ctx @@ -191,7 +191,7 @@ def _render_list( filter_text: Optional[str], args: Sequence[str] ) -> None: - table = ResultTable("Help") + table = Table("Help") table.set_source_command(".help", list(args)) items: List[Dict[str, Any]] = [] @@ -259,11 +259,11 @@ def _render_detail(meta: Dict[str, Any], _args: Sequence[str]) -> None: args_meta = meta.get("args", []) or [] - args_table = ResultTable(title) + args_table = Table(title) if header_lines: args_table.set_header_lines(header_lines) - args_table.set_preserve_order(True) - args_table.set_no_choice(True) + args_table._perseverance(True) + args_table._interactive(True) if not args_meta: row = args_table.add_row() @@ -285,8 +285,8 @@ def _render_detail(meta: Dict[str, Any], _args: Sequence[str]) -> None: desc = f"{desc} ({choice_text})" if desc else choice_text row.add_column("Description", desc) - example_table = ResultTable(f"{cmd_name} Examples") - example_table.set_preserve_order(True) + example_table = Table(f"{cmd_name} Examples") + example_table._perseverance(True) example_table.set_header_line("Select @N to insert the example command into the REPL.") example_items: List[str] = [] @@ -301,7 +301,7 @@ def _render_detail(meta: Dict[str, Any], _args: Sequence[str]) -> None: if tokens: example_table.set_row_selection_args(idx, tokens) else: - example_table.set_no_choice(True) + example_table._interactive(True) row = example_table.add_row() row.add_column("Example", "(no examples available)") diff --git a/cmdnat/matrix.py b/cmdnat/matrix.py index d66bad6..053d05e 100644 --- a/cmdnat/matrix.py +++ b/cmdnat/matrix.py @@ -10,7 +10,7 @@ from urllib.parse import parse_qs, urlparse from cmdlet._shared import Cmdlet, CmdletArg from SYS.logger import log, debug -from SYS.result_table import ResultTable +from SYS.result_table import Table from SYS import pipeline as ctx _MATRIX_PENDING_ITEMS_KEY = "matrix_pending_items" @@ -583,7 +583,7 @@ def _resolve_upload_path(item: Any, config: Dict[str, Any]) -> Optional[str]: def _show_main_menu() -> int: """Display main menu: Rooms or Settings.""" - table = ResultTable("Matrix (select with @N)") + table = Table("Matrix (select with @N)") table.set_table("matrix") table.set_source_command(".matrix", []) @@ -613,7 +613,7 @@ def _show_main_menu() -> int: def _show_settings_table(config: Dict[str, Any]) -> int: """Display Matrix configuration settings as a modifiable table.""" - table = ResultTable("Matrix Settings (select with @N to modify)") + table = Table("Matrix Settings (select with @N to modify)") table.set_table("matrix") table.set_source_command(".matrix", ["-settings"]) @@ -800,7 +800,7 @@ def _show_rooms_table(config: Dict[str, Any]) -> int: log("No joined rooms found.", file=sys.stderr) return 0 - table = ResultTable("Matrix Rooms (select with @N)") + table = Table("Matrix Rooms (select with @N)") table.set_table("matrix") table.set_source_command(".matrix", []) diff --git a/cmdnat/pipe.py b/cmdnat/pipe.py index 9f7b030..3332f50 100644 --- a/cmdnat/pipe.py +++ b/cmdnat/pipe.py @@ -9,7 +9,7 @@ from urllib.parse import urlparse, parse_qs from pathlib import Path from cmdlet._shared import Cmdlet, CmdletArg, parse_cmdlet_args, resolve_tidal_manifest_path from SYS.logger import debug, get_thread_stream, is_debug_enabled, set_debug, set_thread_stream -from SYS.result_table import ResultTable +from SYS.result_table import Table from MPV.mpv_ipc import MPV from SYS import pipeline as ctx from SYS.models import PipeObject @@ -1455,7 +1455,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: debug("No saved playlists found.") return 0 - table = ResultTable("Saved Playlists") + table = Table("Saved Playlists") for i, pl in enumerate(playlists): item_count = len(pl.get("items", [])) row = table.add_row() @@ -1813,7 +1813,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: except NameError: table_title = "MPV Playlist" - table = ResultTable(table_title, preserve_order=True) + table = Table(table_title, preserve_order=True) # Convert MPV items to PipeObjects with proper hash and store pipe_objects = [] diff --git a/cmdnat/status.py b/cmdnat/status.py index 3b46549..76d7995 100644 --- a/cmdnat/status.py +++ b/cmdnat/status.py @@ -7,7 +7,7 @@ from datetime import datetime from cmdlet._shared import Cmdlet, CmdletArg from SYS import pipeline as ctx -from SYS.result_table import ResultTable +from SYS.result_table import Table from SYS.logger import log, set_debug, debug from SYS.rich_display import stdout_console @@ -23,7 +23,7 @@ def _upper(value: Any) -> str: return text.upper() def _add_startup_check( - table: ResultTable, + table: Table, status: str, name: str, *, @@ -104,13 +104,20 @@ def _ping_first(urls: list[str]) -> tuple[bool, str]: return False, "No ping target" def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: - startup_table = ResultTable( + startup_table = Table( "*********************************************" ) - startup_table.set_no_choice(True).set_preserve_order(True) + startup_table._interactive(True)._perseverance(True) startup_table.set_value_case("upper") debug_enabled = bool(config.get("debug", False)) + try: + # Ensure global debug state follows config so HTTPClient and other helpers + # emit debug-level information during the status check. + set_debug(debug_enabled) + except Exception: + pass + debug(f"Status check: debug_enabled={debug_enabled}") _add_startup_check(startup_table, "ENABLED" if debug_enabled else "DISABLED", "DEBUGGING") try: @@ -120,16 +127,24 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: MPV() mpv_path = shutil.which("mpv") _add_startup_check(startup_table, "ENABLED", "MPV", detail=mpv_path or "Available") + debug(f"MPV check OK: path={mpv_path or 'Available'}") except Exception as exc: _add_startup_check(startup_table, "DISABLED", "MPV", detail=str(exc)) + debug(f"MPV check failed: {exc}") # Store Registry store_registry = None try: from Store import Store as StoreRegistry store_registry = StoreRegistry(config=config, suppress_debug=True) - except Exception: - pass + try: + backends = store_registry.list_backends() + except Exception: + backends = [] + debug(f"StoreRegistry initialized. backends={backends}") + except Exception as exc: + debug(f"StoreRegistry initialization failed: {exc}") + store_registry = None # Hydrus if _has_store_subtype(config, "hydrusnetwork"): @@ -138,6 +153,7 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: if not isinstance(icfg, dict): continue nkey = str(icfg.get("NAME") or iname) uval = str(icfg.get("URL") or "").strip() + debug(f"Hydrus network check: name={nkey}, url={uval}") ok = bool(store_registry and store_registry.is_available(nkey)) status = "ENABLED" if ok else "DISABLED" files = None @@ -148,9 +164,12 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: files = getattr(backend, "total_count", None) if files is None and hasattr(backend, "get_total_count"): files = backend.get_total_count() - except Exception: pass + debug(f"Hydrus backend '{nkey}' available: files={files}") + except Exception as exc: + debug(f"Hydrus backend '{nkey}' check failed: {exc}") else: err = store_registry.get_backend_error(iname) if store_registry else None + debug(f"Hydrus backend '{nkey}' not available: {err}") detail = f"{uval} - {err or 'Unavailable'}" _add_startup_check(startup_table, status, nkey, store="hydrusnetwork", files=files, detail=detail) @@ -164,6 +183,7 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: s_avail = list_search_providers(config) or {} f_avail = list_file_providers(config) or {} m_avail = list_metadata_providers(config) or {} + debug(f"Provider registries: providers={list(p_avail.keys())}, search={list(s_avail.keys())}, file={list(f_avail.keys())}, metadata={list(m_avail.keys())}") already = {"matrix"} for pname in pcfg.keys(): @@ -176,25 +196,31 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: from Provider.alldebrid import _get_debrid_api_key from API.alldebrid import AllDebridClient api_key = _get_debrid_api_key(config) + debug(f"AllDebrid configured: api_key_present={bool(api_key)}") if not api_key: _add_startup_check(startup_table, "DISABLED", display, provider=prov, detail="Not configured") else: client = AllDebridClient(api_key) _add_startup_check(startup_table, "ENABLED", display, provider=prov, detail=getattr(client, "base_url", "Connected")) + debug(f"AllDebrid client connected: base_url={getattr(client, 'base_url', 'unknown')}") except Exception as exc: _add_startup_check(startup_table, "DISABLED", display, provider=prov, detail=str(exc)) + debug(f"AllDebrid check failed: {exc}") already.add(prov) continue is_known = prov in p_avail or prov in s_avail or prov in f_avail or prov in m_avail if not is_known: _add_startup_check(startup_table, "UNKNOWN", display, provider=prov, detail="Not registered") + debug(f"Provider {prov} not registered") else: ok_val = p_avail.get(prov) or s_avail.get(prov) or f_avail.get(prov) or m_avail.get(prov) detail = "Configured" if ok_val else "Not configured" ping_targets = _default_provider_ping_targets(prov) if ping_targets: + debug(f"Provider {prov} ping targets: {ping_targets}") pok, pdet = _ping_first(ping_targets) + debug(f"Provider {prov} ping result: ok={pok}, detail={pdet}") detail = pdet if ok_val else f"{detail} | {pdet}" _add_startup_check(startup_table, "ENABLED" if ok_val else "DISABLED", display, provider=prov, detail=detail) already.add(prov) @@ -208,9 +234,16 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: hs = str(mcfg.get("homeserver") or "").strip() rid = str(mcfg.get("room_id") or "").strip() detail = f"{hs} room:{rid}" - _add_startup_check(startup_table, "ENABLED" if m_prov.validate() else "DISABLED", "Matrix", provider="matrix", detail=detail) + valid = False + try: + valid = bool(m_prov.validate()) + except Exception as exc: + debug(f"Matrix validate failed: {exc}") + _add_startup_check(startup_table, "ENABLED" if valid else "DISABLED", "Matrix", provider="matrix", detail=detail) + debug(f"Matrix check: homeserver={hs}, room_id={rid}, validate={valid}") except Exception as exc: _add_startup_check(startup_table, "DISABLED", "Matrix", provider="matrix", detail=str(exc)) + debug(f"Matrix instantiation failed: {exc}") # Folders if _has_store_subtype(config, "folder"): @@ -219,6 +252,7 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: if not isinstance(icfg, dict): continue nkey = str(icfg.get("NAME") or iname) pval = str(icfg.get("PATH") or icfg.get("path") or "").strip() + debug(f"Folder store check: name={nkey}, path={pval}") ok = bool(store_registry and store_registry.is_available(nkey)) if ok and store_registry: backend = store_registry[nkey] @@ -226,9 +260,11 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: sdet = getattr(backend, "scan_detail", "Up to date") stats = getattr(backend, "scan_stats", {}) files = int(stats.get("files_total_db", 0)) if stats else None + debug(f"Folder backend '{nkey}': scan_ok={scan_ok}, scan_detail={sdet}, stats={stats}") _add_startup_check(startup_table, "SCANNED" if scan_ok else "ERROR", nkey, store="folder", files=files, detail=f"{pval} - {sdet}") else: err = store_registry.get_backend_error(iname) if store_registry else None + debug(f"Folder backend '{nkey}' error: {err}") _add_startup_check(startup_table, "ERROR", nkey, store="folder", detail=f"{pval} - {err or 'Unavailable'}") # Cookies @@ -236,20 +272,27 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: from tool.ytdlp import YtDlpTool cf = YtDlpTool(config).resolve_cookiefile() _add_startup_check(startup_table, "FOUND" if cf else "MISSING", "Cookies", detail=str(cf) if cf else "Not found") - except Exception: pass + debug(f"Cookies: resolved cookiefile={cf}") + except Exception as exc: + debug(f"Cookies check failed: {exc}") # ZeroTier Hosting zt_conf = config.get("networking", {}).get("zerotier", {}) if zt_conf.get("serve"): from SYS.background_services import ensure_zerotier_server_running - ensure_zerotier_server_running() - + try: + debug("ZeroTier hosting enabled; ensuring server is running") + ensure_zerotier_server_running() + except Exception as exc: + debug(f"ensure_zerotier_server_running failed: {exc}") + serve_target = zt_conf.get("serve") port = zt_conf.get("port") or 999 status = "OFFLINE" detail = f"Sharing: {serve_target} on port {port}" try: from API.HTTP import HTTPClient + debug(f"Probing ZeroTier health on 127.0.0.1:{port}") # Probing 127.0.0.1 is more reliable on Windows than localhost with HTTPClient(timeout=1.0, retries=0) as client: resp = client.get(f"http://127.0.0.1:{port}/health") @@ -257,8 +300,9 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: status = "ONLINE" payload = resp.json() detail += f" (Live: {payload.get('name', 'unknown')})" - except Exception: - pass + debug(f"ZeroTier host responded: status={resp.status_code}, payload_keys={list(payload.keys()) if isinstance(payload, dict) else 'unknown'}") + except Exception as exc: + debug(f"ZeroTier probe failed: {exc}") _add_startup_check(startup_table, status, "ZeroTier Host", detail=detail) except Exception as exc: @@ -269,6 +313,7 @@ def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int: # (avoiding duplication in TUI logs, while keeping it in TUI Results) setattr(startup_table, "_rendered_by_cmdlet", True) ctx.set_current_stage_table(startup_table) + debug(f"Status check completed: {len(startup_table.rows)} checks recorded") return 0 diff --git a/cmdnat/telegram.py b/cmdnat/telegram.py index 7ec9a1b..063c130 100644 --- a/cmdnat/telegram.py +++ b/cmdnat/telegram.py @@ -6,7 +6,7 @@ from typing import Any, Dict, List, Optional, Sequence from cmdlet._shared import Cmdlet, CmdletArg from SYS.logger import log -from SYS.result_table import ResultTable +from SYS.result_table import Table from SYS import pipeline as ctx _TELEGRAM_PENDING_ITEMS_KEY = "telegram_pending_items" @@ -299,7 +299,7 @@ def _run(_result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: ) return 0 - table = ResultTable("Telegram Chats") + table = Table("Telegram Chats") table.set_table("telegram") table.set_source_command(".telegram", []) diff --git a/cmdnat/zerotier.py b/cmdnat/zerotier.py index d7553fc..ab9f55d 100644 --- a/cmdnat/zerotier.py +++ b/cmdnat/zerotier.py @@ -11,14 +11,14 @@ if str(root) not in sys.path: from cmdlet._shared import Cmdlet, CmdletArg from SYS.config import load_config -from SYS.result_table import ResultTable +from SYS.result_table import Table from API import zerotier as zt def exec_zerotier(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: # Use provided config or fall back to CWD load cfg = config if config else load_config(Path.cwd()) - table = ResultTable("ZeroTier Status") + table = Table("ZeroTier Status") # 1. Local Hub Status row = table.add_row()