from __future__ import annotations from typing import Any, Dict, Iterable, Optional, Sequence from pathlib import Path from . import _shared as sh from SYS.logger import log, debug from SYS import pipeline as ctx from SYS.result_table_adapters import get_provider from SYS.result_table_renderers import RichRenderer Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg parse_cmdlet_args = sh.parse_cmdlet_args CMDLET = Cmdlet( name="provider-table", summary="Render a provider's result set and optionally run a follow-up cmdlet using the selected row.", usage="provider-table -provider [-sample] [-select ] [-run-cmd ]", arg=[ CmdletArg("provider", type="string", description="Provider name to render (default: example)"), CmdletArg("sample", type="flag", description="Use provider sample/demo items when available."), CmdletArg("select", type="int", description="1-based row index to select and use for follow-up command."), CmdletArg("run-cmd", type="string", description="Cmdlet to invoke with the selected row's selector args."), ], detail=[ "Use a registered provider to build a table and optionally run another cmdlet with selection args.", "Emits pipeline-friendly dicts enriched with `_selection_args` so you can pipe into `select` and other cmdlets.", "Example: provider-table -provider example -sample | select -select 1 | add-file -store default", ], ) def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: parsed = parse_cmdlet_args(args, CMDLET) provider_name = parsed.get("provider") or "example" use_sample = bool(parsed.get("sample", False)) run_cmd = parsed.get("run-cmd") select_raw = parsed.get("select") try: provider = get_provider(provider_name) except Exception as exc: log(f"Unknown provider: {provider_name}", file=sys.stderr) return 1 # Obtain items to feed to the adapter items = None if use_sample: # Try to locate SAMPLE_ITEMS in the adapter's module (convention only) try: mod = __import__(provider.adapter.__module__, fromlist=["*"]) items = getattr(mod, "SAMPLE_ITEMS", None) if items is None: log("Provider does not expose SAMPLE_ITEMS; no sample available", file=sys.stderr) return 1 except Exception: log("Failed to load provider sample", file=sys.stderr) return 1 else: # Require input for non-sample runs inputs = list(result) if isinstance(result, Iterable) else [] if not inputs: log("No input provided. Use -sample for demo or pipe provider items in.", file=sys.stderr) return 1 items = inputs # Build rows try: rows = list(provider.adapter(items)) except Exception as exc: log(f"Provider adapter failed: {exc}", file=sys.stderr) return 1 cols = provider.get_columns(rows) # Emit rows for downstream pipeline consumption (pipable behavior). try: for r in rows: try: item = { "title": getattr(r, "title", None) or None, "path": getattr(r, "path", None) or None, "ext": getattr(r, "ext", None) or None, "size_bytes": getattr(r, "size_bytes", None) or None, "metadata": getattr(r, "metadata", None) or {}, "source": getattr(r, "source", None) or provider.name, "_selection_args": provider.selection_args(r), } ctx.emit(item) except Exception: # Best-effort: continue emitting other rows continue except Exception: # Non-fatal: continue to rendering even if emission fails pass # Render using RichRenderer try: table = RichRenderer().render(rows, cols, provider.metadata) try: from rich.console import Console Console().print(table) except Exception: # Fallback to simple printing for r in rows: print(" ".join(str((c.extractor(r) or "")) for c in cols)) except Exception as exc: log(f"Rendering failed: {exc}", file=sys.stderr) return 1 # If no selection requested, we're done if not select_raw: return 0 try: select_idx = int(select_raw) - 1 except Exception: log("Invalid -select value; must be an integer", file=sys.stderr) return 1 if select_idx < 0 or select_idx >= len(rows): log("-select out of range", file=sys.stderr) return 1 selected = rows[select_idx] sel_args = provider.selection_args(selected) if not run_cmd: # Print selection args for caller log(f"Selection args: {sel_args}", file=sys.stderr) return 0 # Run follow-up cmdlet try: from cmdlet import ensure_cmdlet_modules_loaded, get as get_cmdlet ensure_cmdlet_modules_loaded() cmd_fn = get_cmdlet(run_cmd) if not cmd_fn: log(f"Follow-up cmdlet not found: {run_cmd}", file=sys.stderr) return 1 # Call the cmdlet with no upstream result, but with selection args ret = cmd_fn(None, sel_args, config or {}) return ret except Exception as exc: log(f"Failed to invoke follow-up cmdlet: {exc}", file=sys.stderr) return 1 CMDLET.exec = _run CMDLET.register()