From 11a13edb84bcac9c7fb4b5b0b0b3a0219dac302b Mon Sep 17 00:00:00 2001
From: nose
Date: Sun, 21 Dec 2025 05:10:09 -0800
Subject: [PATCH] Add pipeline Live progress UI, WebP screenshots, and
 explicit add-note targeting

---
 .gitignore               |   4 +-
 API/HydrusNetwork.py     |   4 +
 CLI.py                   | 254 +++++++++++++++--
 ProviderCore/registry.py |  11 +
 SYS/logger.py            |  42 ++-
 cli_syntax.py            | 120 ++++++++
 cmdlet/_shared.py        |   4 +-
 cmdlet/add_note.py       | 161 ++++++-----
 cmdlet/download_media.py | 129 ++++++++-
 cmdlet/screen_shot.py    | 224 ++++++++++++---
 models.py                | 603 +++++++++++++++++++++++++++++++++++++--
 pipeline.py              |  65 +++++
 result_table.py          | 268 ++++++++++++++---
 tool/playwright.py       |  27 +-
 tool/ytdlp.py            |   9 +-
 15 files changed, 1712 insertions(+), 213 deletions(-)

diff --git a/.gitignore b/.gitignore
index 1e441a5..3584344 100644
--- a/.gitignore
+++ b/.gitignore
@@ -224,4 +224,6 @@ MPV/ffmpeg/*
 MPV/portable_config/*
 Log/
 Log/medeia_macina/telegram.session
-*.session
\ No newline at end of file
+*.session
+example.py
+test*
\ No newline at end of file
diff --git a/API/HydrusNetwork.py b/API/HydrusNetwork.py
index 387f28e..9829596 100644
--- a/API/HydrusNetwork.py
+++ b/API/HydrusNetwork.py
@@ -200,6 +200,10 @@ class HydrusNetwork:
                 content = spec.data
             else:
                 json_data = spec.data
+                # Hydrus expects JSON bodies to be sent with Content-Type: application/json.
+                # httpx will usually set this automatically, but we set it explicitly to
+                # match the Hydrus API docs and avoid edge cases.
+                headers.setdefault("Content-Type", "application/json")

         logger.debug(f"{self._log_prefix()} Request body size: {len(content) if content else 'json'}")
         response = client.request(
diff --git a/CLI.py b/CLI.py
index c4a0c50..a14f250 100644
--- a/CLI.py
+++ b/CLI.py
@@ -1078,15 +1078,16 @@ class CmdletExecutor:
                 filtered_args.append(str(arg))

+            # IMPORTANT: Do not implicitly feed the previous command's results into
+            # a new command unless the user explicitly selected items via @ syntax.
+            # Piping should require `|` (or an explicit @ selection).
             piped_items = ctx.get_last_result_items()
             result: Any = None
-            if piped_items:
+            if piped_items and (select_all or selected_indices):
                 if select_all:
                     result = piped_items
-                elif selected_indices:
-                    result = [piped_items[idx] for idx in selected_indices if 0 <= idx < len(piped_items)]
                 else:
-                    result = piped_items
+                    result = [piped_items[idx] for idx in selected_indices if 0 <= idx < len(piped_items)]

             worker_manager = WorkerManagerRegistry.ensure(config)
             stage_session = WorkerStages.begin_stage(
@@ -1249,6 +1250,12 @@ class PipelineExecutor:
         import pipeline as ctx

         try:
+            try:
+                if hasattr(ctx, "clear_pipeline_stop"):
+                    ctx.clear_pipeline_stop()
+            except Exception:
+                pass
+
             stages = self._split_stages(tokens)
             if not stages:
                 print("Invalid pipeline syntax\n")
@@ -1283,7 +1290,10 @@ class PipelineExecutor:

             config = self._config_loader.load()
             if isinstance(config, dict):
-                config["_quiet_background_output"] = True
+                # This executor is used by both the REPL and the `pipeline` subcommand.
+                # Quiet/background mode is helpful for detached/background runners, but
+                # it suppresses interactive UX (like the pipeline Live progress UI).
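+                # e.g. (illustrative): the interactive REPL constructs this
+                # executor with toolbar output attached, so quiet mode stays
+                # off; the detached `pipeline` subcommand leaves it None.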
+ config["_quiet_background_output"] = bool(self._toolbar_output is None) def _resolve_items_for_selection(table_obj, items_list): return items_list if items_list else [] @@ -1322,12 +1332,19 @@ class PipelineExecutor: _add(getattr(item, "table", None)) try: - from ProviderCore.registry import get_provider + from ProviderCore.registry import get_provider, is_known_provider_name except Exception: get_provider = None # type: ignore + is_known_provider_name = None # type: ignore if get_provider is not None: for key in candidates: + try: + if is_known_provider_name is not None and (not is_known_provider_name(key)): + continue + except Exception: + # If the predicate fails for any reason, fall back to legacy behavior. + pass try: provider = get_provider(key, config) except Exception: @@ -1441,6 +1458,9 @@ class PipelineExecutor: pipeline_status = "completed" pipeline_error = "" + progress_ui = None + pipe_index_by_stage: Dict[int, int] = {} + try: if first_stage_selection_indices: if not ctx.get_current_stage_table_source_command(): @@ -1594,6 +1614,45 @@ class PipelineExecutor: print("No previous results to select from\n") return + # ------------------------------------------------------------------ + # Multi-level pipeline progress (pipes = stages, tasks = items) + # ------------------------------------------------------------------ + try: + quiet_mode = bool(config.get("_quiet_background_output")) if isinstance(config, dict) else False + except Exception: + quiet_mode = False + + try: + import sys as _sys + + if (not quiet_mode) and bool(getattr(_sys.stderr, "isatty", lambda: False)()): + from models import PipelineLiveProgress + + pipe_stage_indices: List[int] = [] + pipe_labels: List[str] = [] + for idx, tokens in enumerate(stages): + if not tokens: + continue + name = str(tokens[0]).replace("_", "-").lower() + if name == "@" or name.startswith("@"): + continue + pipe_stage_indices.append(idx) + pipe_labels.append(name) + + if pipe_labels: + progress_ui = PipelineLiveProgress(pipe_labels, enabled=True) + progress_ui.start() + try: + import pipeline as _pipeline_ctx + if hasattr(_pipeline_ctx, "set_live_progress"): + _pipeline_ctx.set_live_progress(progress_ui) + except Exception: + pass + pipe_index_by_stage = {stage_idx: pipe_idx for pipe_idx, stage_idx in enumerate(pipe_stage_indices)} + except Exception: + progress_ui = None + pipe_index_by_stage = {} + for stage_index, stage_tokens in enumerate(stages): if not stage_tokens: continue @@ -1735,10 +1794,60 @@ class PipelineExecutor: ) stage_worker_id = stage_session.worker_id if stage_session else None + + # Estimate how many per-item tasks this pipe will run. + pipe_idx = pipe_index_by_stage.get(stage_index) + if progress_ui is not None and pipe_idx is not None: + try: + # Prefer piped input for task counts. + if isinstance(piped_result, list): + total_items = len(piped_result) + preview_items: Optional[List[Any]] = list(piped_result) + elif piped_result is not None: + total_items = 1 + preview_items = [piped_result] + else: + # First stage without piped input: infer from URL-ish args. 
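+                            # e.g. (illustrative):
+                            #   download-media -url https://example.com/v | add-file
+                            # scans to preview=["https://example.com/v"] and
+                            # total_items=1; bare magnet:/torrent: tokens count too.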
+ preview: List[Any] = [] + + toks = list(stage_tokens[1:]) + i = 0 + while i < len(toks): + t = str(toks[i]) + low = t.lower().strip() + if low in {"-url", "--url"} and i + 1 < len(toks): + nxt = str(toks[i + 1]) + if nxt and not nxt.startswith("-"): + preview.append(nxt) + i += 2 + continue + if (not t.startswith("-")) and ( + "://" in low or low.startswith(("magnet:", "torrent:")) + ): + preview.append(t) + i += 1 + + preview_items = preview if preview else None + total_items = len(preview) if preview else 1 + + progress_ui.begin_pipe(pipe_idx, total_items=int(total_items), items_preview=preview_items) + except Exception: + pass + + on_emit = None + if progress_ui is not None and pipe_idx is not None: + def _on_emit(obj: Any, _idx: int = int(pipe_idx)) -> None: + try: + progress_ui.on_emit(_idx, obj) + except Exception: + pass + on_emit = _on_emit + pipeline_ctx = ctx.PipelineStageContext( stage_index=stage_index, total_stages=len(stages), worker_id=stage_worker_id, + on_emit=on_emit, ) ctx.set_stage_context(pipeline_ctx) stage_status = "completed" @@ -1784,6 +1893,17 @@ class PipelineExecutor: stage_is_last = stage_index + 1 >= len(stages) + # Graceful early-stop: preflight declined, etc. + try: + stop_req = ctx.get_pipeline_stop() if hasattr(ctx, "get_pipeline_stop") else None + except Exception: + stop_req = None + if stop_req is not None: + # Do not treat as an error; just end the pipeline quietly. + pipeline_status = "completed" + pipeline_error = "" + return + emits: List[Any] = [] if getattr(pipeline_ctx, "emits", None) is not None: emits = list(pipeline_ctx.emits or []) @@ -1825,6 +1945,25 @@ class PipelineExecutor: already_rendered = False if not already_rendered: + # Stop the Live progress display before printing a selectable table. + # Printing while Live is active can cause the table to be truncated/overwritten. + if progress_ui is not None: + try: + if pipe_idx is not None: + progress_ui.finish_pipe(int(pipe_idx), force_complete=True) + except Exception: + pass + try: + progress_ui.stop() + except Exception: + pass + try: + import pipeline as _pipeline_ctx + if hasattr(_pipeline_ctx, "set_live_progress"): + _pipeline_ctx.set_live_progress(None) + except Exception: + pass + progress_ui = None stdout_console().print() stdout_console().print(stage_table) @@ -1845,6 +1984,26 @@ class PipelineExecutor: # table they placed into pipeline context (e.g. get-tag). Prefer a # display table if one exists, otherwise the current-stage table. if stage_is_last: + # Stop the Live progress display before printing the final table. + # This avoids cursor-control interactions that can truncate output. + if progress_ui is not None: + try: + if pipe_idx is not None: + progress_ui.finish_pipe(int(pipe_idx), force_complete=(stage_status == "completed")) + except Exception: + pass + try: + progress_ui.stop() + except Exception: + pass + try: + import pipeline as _pipeline_ctx + if hasattr(_pipeline_ctx, "set_live_progress"): + _pipeline_ctx.set_live_progress(None) + except Exception: + pass + progress_ui = None + final_table = None try: final_table = ctx.get_display_table() if hasattr(ctx, "get_display_table") else None @@ -1853,6 +2012,36 @@ class PipelineExecutor: if final_table is None: final_table = stage_table + # If the cmdlet emitted results but didn't supply a fresh table, it's + # common for `stage_table` to still point at the previous stage's table + # (e.g. add-file's canonical store table). 
In that case, prefer rendering
+            # the emitted results so the user sees the actual output of this stage.
+            if emits and (ctx.get_display_table() if hasattr(ctx, "get_display_table") else None) is None:
+                try:
+                    src_cmd = str(getattr(final_table, "source_command", "") or "").strip().lower() if final_table else ""
+                except Exception:
+                    src_cmd = ""
+                try:
+                    cur_cmd = str(cmd_name or "").strip().replace("_", "-").lower()
+                except Exception:
+                    cur_cmd = ""
+                if (final_table is None) or (not src_cmd) or (src_cmd.replace("_", "-") != cur_cmd):
+                    try:
+                        table_title = CmdletExecutor._get_table_title_for_command(cmd_name, emits, list(stage_args))
+                    except Exception:
+                        table_title = "Results"
+                    table = ResultTable(table_title)
+                    for item in emits:
+                        table.add_result(item)
+                    try:
+                        if hasattr(ctx, "set_last_result_table_overlay"):
+                            ctx.set_last_result_table_overlay(table, emits)
+                        if hasattr(ctx, "set_current_stage_table"):
+                            ctx.set_current_stage_table(table)
+                    except Exception:
+                        pass
+                    final_table = table
+
             if final_table is not None:
                 try:
                     already_rendered = bool(getattr(final_table, "_rendered_by_cmdlet", False))
@@ -1863,18 +2052,7 @@
                     stdout_console().print()
                     stdout_console().print(final_table)

-            # Fallback: if a cmdlet emitted results but did not provide a table,
-            # render a standard ResultTable so last-stage pipelines still show output.
-            if final_table is None and emits:
-                try:
-                    table_title = CmdletExecutor._get_table_title_for_command(cmd_name, emits, list(stage_args))
-                except Exception:
-                    table_title = "Results"
-                table = ResultTable(table_title)
-                for item in emits:
-                    table.add_result(item)
-                stdout_console().print()
-                stdout_console().print(table)
+            # (Fallback handled above by synthesizing an overlay ResultTable.)

             if isinstance(ret_code, int) and ret_code != 0:
                 stage_status = "failed"
@@ -1891,6 +2069,11 @@
                     pipeline_error = f"{stage_label} error: {exc}"
                     return
                 finally:
+                    if progress_ui is not None and pipe_idx is not None:
+                        try:
+                            progress_ui.finish_pipe(int(pipe_idx), force_complete=(stage_status == "completed"))
+                        except Exception:
+                            pass
                     try:
                         if hasattr(ctx, "clear_current_cmdlet_name"):
                             ctx.clear_current_cmdlet_name()
@@ -1925,6 +2108,17 @@
             pipeline_error = str(exc)
             print(f"[error] Failed to execute pipeline: {exc}\n")
         finally:
+            if progress_ui is not None:
+                try:
+                    progress_ui.stop()
+                except Exception:
+                    pass
+                try:
+                    import pipeline as _pipeline_ctx
+                    if hasattr(_pipeline_ctx, "set_live_progress"):
+                        _pipeline_ctx.set_live_progress(None)
+                except Exception:
+                    pass
             if pipeline_session:
                 pipeline_session.close(status=pipeline_status, error_msg=pipeline_error)
     except Exception as exc:
@@ -1933,6 +2127,9 @@

 Welcome = """
 # MEDIOS-MACINA
+Romans 1:22 Professing themselves to be wise, they became fools,
+
+
 Rich can do a pretty *decent* job of rendering markdown.

 1. 
This is a list item @@ -1966,6 +2165,19 @@ class MedeiaCLI: def build_app(self) -> typer.Typer: app = typer.Typer(help="Medeia-Macina CLI") + def _validate_pipeline_option(ctx: typer.Context, param: typer.CallbackParam, value: str): + try: + from cli_syntax import validate_pipeline_text + + syntax_error = validate_pipeline_text(value) + if syntax_error: + raise typer.BadParameter(syntax_error.message) + except typer.BadParameter: + raise + except Exception: + pass + return value + def _complete_search_provider(ctx, param, incomplete: str): # pragma: no cover try: from click.shell_completion import CompletionItem @@ -1996,7 +2208,9 @@ class MedeiaCLI: @app.command("pipeline") def pipeline( - command: str = typer.Option(..., "--pipeline", "-p", help="Pipeline command string to execute"), + command: str = typer.Option( + ..., "--pipeline", "-p", help="Pipeline command string to execute", callback=_validate_pipeline_option + ), seeds_json: Optional[str] = typer.Option(None, "--seeds-json", "-s", help="JSON string of seed items"), ) -> None: import pipeline as ctx @@ -2064,7 +2278,7 @@ class MedeiaCLI: def run_repl(self) -> None: # (Startup banner is optional; keep the REPL quiet by default.) - prompt_text = "🜂🜄🜁🜃|" + prompt_text = "<🜂🜄🜁🜃>" startup_table = ResultTable( "*********************************************" diff --git a/ProviderCore/registry.py b/ProviderCore/registry.py index e324d59..1908ab3 100644 --- a/ProviderCore/registry.py +++ b/ProviderCore/registry.py @@ -40,6 +40,17 @@ _PROVIDERS: Dict[str, Type[Provider]] = { } +def is_known_provider_name(name: str) -> bool: + """Return True if `name` matches a registered provider key. + + This is intentionally cheap (no imports/instantiation) so callers can + probe UI strings (table names, store names, etc.) without triggering + noisy 'Unknown provider' logs. + """ + + return (name or "").strip().lower() in _PROVIDERS + + def _supports_search(provider: Provider) -> bool: return provider.__class__.search is not Provider.search diff --git a/SYS/logger.py b/SYS/logger.py index 02894d0..00e1691 100644 --- a/SYS/logger.py +++ b/SYS/logger.py @@ -119,18 +119,36 @@ def debug_inspect( if not effective_title and prefix: effective_title = prefix - rich_inspect( - obj, - console=console, - title=effective_title, - methods=methods, - docs=docs, - private=private, - dunder=dunder, - sort=sort, - all=all, - value=value, - ) + # Show full identifiers (hashes/paths) without Rich shortening. + # Guard for older Rich versions which may not support max_* parameters. + try: + rich_inspect( + obj, + console=console, + title=effective_title, + methods=methods, + docs=docs, + private=private, + dunder=dunder, + sort=sort, + all=all, + value=value, + max_string=100_000, + max_length=100_000, + ) + except TypeError: + rich_inspect( + obj, + console=console, + title=effective_title, + methods=methods, + docs=docs, + private=private, + dunder=dunder, + sort=sort, + all=all, + value=value, + ) def log(*args, **kwargs) -> None: """Print with automatic file.function prefix. 
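+
+    Example (illustrative; the prefix is derived from the caller, so the
+    module/function shown here is hypothetical):
+
+        log("saved", 3, "items")          # -> "CLI.run_repl: saved 3 items"
+        log("failed", file=sys.stderr)    # routed to stderr, like print()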
diff --git a/cli_syntax.py b/cli_syntax.py
index 67d8ae1..370df09 100644
--- a/cli_syntax.py
+++ b/cli_syntax.py
@@ -12,6 +12,121 @@ class SyntaxErrorDetail:
     expected: Optional[str] = None


+def _split_pipeline_stages(text: str) -> list[str]:
+    """Split a pipeline command into stage strings on unquoted '|' characters."""
+    raw = str(text or "")
+    if not raw:
+        return []
+
+    stages: list[str] = []
+    buf: list[str] = []
+    quote: Optional[str] = None
+    escaped = False
+
+    for ch in raw:
+        if escaped:
+            buf.append(ch)
+            escaped = False
+            continue
+
+        if ch == "\\" and quote is not None:
+            buf.append(ch)
+            escaped = True
+            continue
+
+        if ch in ("\"", "'"):
+            if quote is None:
+                quote = ch
+            elif quote == ch:
+                quote = None
+            buf.append(ch)
+            continue
+
+        if ch == "|" and quote is None:
+            stage = "".join(buf).strip()
+            if stage:
+                stages.append(stage)
+            buf = []
+            continue
+
+        buf.append(ch)
+
+    tail = "".join(buf).strip()
+    if tail:
+        stages.append(tail)
+    return stages
+
+
+def _tokenize_stage(stage_text: str) -> list[str]:
+    """Tokenize a stage string (best-effort)."""
+    import shlex
+
+    text = str(stage_text or "").strip()
+    if not text:
+        return []
+    try:
+        return shlex.split(text)
+    except Exception:
+        return text.split()
+
+
+def _has_flag(tokens: list[str], *flags: str) -> bool:
+    want = {str(f).strip().lower() for f in flags if str(f).strip()}
+    if not want:
+        return False
+    for tok in tokens:
+        low = str(tok).strip().lower()
+        if low in want:
+            return True
+        # Support -arg=value
+        if "=" in low:
+            head = low.split("=", 1)[0].strip()
+            if head in want:
+                return True
+    return False
+
+
+def _validate_add_note_requires_add_file_order(raw: str) -> Optional[SyntaxErrorDetail]:
+    """Enforce: add-note in piped mode must occur after add-file.
+
+    Rationale: add-note requires a known (store, hash) target; piping before add-file
+    means the item likely has no hash yet.
+    """
+    stages = _split_pipeline_stages(raw)
+    if len(stages) <= 1:
+        return None
+
+    parsed: list[tuple[str, list[str]]] = []
+    for stage in stages:
+        tokens = _tokenize_stage(stage)
+        if not tokens:
+            continue
+        cmd = str(tokens[0]).replace("_", "-").strip().lower()
+        parsed.append((cmd, tokens))
+
+    add_file_positions = [i for i, (cmd, _toks) in enumerate(parsed) if cmd == "add-file"]
+    if not add_file_positions:
+        return None
+
+    for i, (cmd, tokens) in enumerate(parsed):
+        if cmd != "add-note":
+            continue
+
+        # If add-note occurs before any add-file stage, it must be explicitly targeted.
+        if any(pos > i for pos in add_file_positions):
+            has_hash = _has_flag(tokens, "-hash", "--hash")
+            has_store = _has_flag(tokens, "-store", "--store")
+            if has_hash and has_store:
+                continue
+            return SyntaxErrorDetail(
+                "Pipeline error: 'add-note' must come after 'add-file' when used with piped input. "
+                "Move 'add-note' after 'add-file', or call it with explicit targeting: "
+                "add-note -store <store> -hash <sha256> -query \"title:<title>,text:<text>\"."
+            )
+
+    return None
+
+
 def validate_pipeline_text(text: str) -> Optional[SyntaxErrorDetail]:
     """Validate raw CLI input before tokenization/execution. 
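+
+    Example (illustrative): piping into add-note before add-file now fails fast:
+
+        >>> err = validate_pipeline_text('download-media -url U | add-note -query "title:a,text:b" | add-file')
+        >>> err is not None and "add-note" in err.message
+        True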
@@ -97,6 +212,11 @@ def validate_pipeline_text(text: str) -> Optional[SyntaxErrorDetail]: if not in_single and not in_double and not ch.isspace(): seen_nonspace_since_pipe = True + # Semantic rules (still lightweight; no cmdlet imports) + semantic_error = _validate_add_note_requires_add_file_order(raw) + if semantic_error is not None: + return semantic_error + return None diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index 8696ea2..13e66fc 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -525,8 +525,8 @@ def parse_cmdlet_args(args: Sequence[str], cmdlet_spec: Dict[str, Any] | Cmdlet) token_lower = token.lower() # Legacy guidance: -hash/--hash was removed in favor of -query "hash:...". - # We don't error hard here because some cmdlets also accept free-form args. - if token_lower in {"-hash", "--hash"}: + # However, some cmdlets may explicitly re-introduce a -hash flag. + if token_lower in {"-hash", "--hash"} and token_lower not in arg_spec_map: try: log("Legacy flag -hash is no longer supported. Use: -query \"hash:<sha256>\"", file=sys.stderr) except Exception: diff --git a/cmdlet/add_note.py b/cmdlet/add_note.py index 07dcb9d..55ddc57 100644 --- a/cmdlet/add_note.py +++ b/cmdlet/add_note.py @@ -3,6 +3,7 @@ from __future__ import annotations from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple import sys +import re from SYS.logger import log @@ -25,13 +26,12 @@ class Add_Note(Cmdlet): super().__init__( name="add-note", summary="Add file store note", - usage="add-note -store <store> [-query \"hash:<sha256>\"] <name> <text...>", + usage="add-note (-query \"title:<title>,text:<text>\") [ -store <store> -hash <sha256> | <piped> ]", alias=[""], arg=[ SharedArgs.STORE, + CmdletArg("hash", type="string", required=False, description="Target file hash (sha256). When omitted, uses piped item hash."), SharedArgs.QUERY, - CmdletArg("name", type="string", required=True, description="The note name/key to set (e.g. 'comment', 'lyric')."), - CmdletArg("text", type="string", required=True, description="Note text/content to store.", variadic=True), ], detail=[ """ @@ -47,6 +47,68 @@ class Add_Note(Cmdlet): pass self.register() + @staticmethod + def _commas_to_spaces_outside_quotes(text: str) -> str: + buf: List[str] = [] + quote: Optional[str] = None + escaped = False + for ch in str(text or ""): + if escaped: + buf.append(ch) + escaped = False + continue + if ch == "\\" and quote is not None: + buf.append(ch) + escaped = True + continue + if ch in ('"', "'"): + if quote is None: + quote = ch + elif quote == ch: + quote = None + buf.append(ch) + continue + if ch == "," and quote is None: + buf.append(" ") + continue + buf.append(ch) + return "".join(buf) + + @staticmethod + def _parse_note_query(query: str) -> Tuple[Optional[str], Optional[str]]: + """Parse note payload from -query. + + Expected: + title:<title>,text:<text> + Commas are treated as separators when not inside quotes. 
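+
+        Example (illustrative; exact quote handling depends on parse_query):
+            title:comment,text:"hello, world"  ->  ("comment", "hello, world")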
+ """ + raw = str(query or "").strip() + if not raw: + return None, None + + try: + from cli_syntax import parse_query, get_field + except Exception: + parse_query = None # type: ignore + get_field = None # type: ignore + + normalized = Add_Note._commas_to_spaces_outside_quotes(raw) + + if callable(parse_query) and callable(get_field): + parsed = parse_query(normalized) + name = get_field(parsed, "title") + text = get_field(parsed, "text") + name_s = str(name or "").strip() if name is not None else "" + text_s = str(text or "").strip() if text is not None else "" + return (name_s or None, text_s or None) + + # Fallback: best-effort regex. + name_match = re.search(r"\btitle\s*:\s*([^,\s]+)", normalized, flags=re.IGNORECASE) + text_match = re.search(r"\btext\s*:\s*(.+)$", normalized, flags=re.IGNORECASE) + note_name = (name_match.group(1).strip() if name_match else "") + note_text = (text_match.group(1).strip() if text_match else "") + return (note_name or None, note_text or None) + def _resolve_hash(self, raw_hash: Optional[str], raw_path: Optional[str], override_hash: Optional[str]) -> Optional[str]: resolved = normalize_hash(override_hash) if override_hash else normalize_hash(raw_hash) if resolved: @@ -72,32 +134,42 @@ class Add_Note(Cmdlet): parsed = parse_cmdlet_args(args, self) store_override = parsed.get("store") - query_hash = sh.parse_single_hash_query(parsed.get("query")) - if parsed.get("query") and not query_hash: - log("[add_note] Error: -query must be of the form hash:<sha256>", file=sys.stderr) - return 1 - note_name = str(parsed.get("name") or "").strip() - text_parts = parsed.get("text") - - if not note_name: - log("[add_note] Error: Requires <name>", file=sys.stderr) + hash_override = normalize_hash(parsed.get("hash")) + note_name, note_text = self._parse_note_query(str(parsed.get("query") or "")) + if not note_name or not note_text: + log("[add_note] Error: -query must include title:<title> and text:<text>", file=sys.stderr) return 1 - if isinstance(text_parts, list): - note_text = " ".join([str(p) for p in text_parts]).strip() - else: - note_text = str(text_parts or "").strip() + if hash_override and not store_override: + log("[add_note] Error: -hash requires -store <store>", file=sys.stderr) + return 1 - # Note text can be omitted when upstream stages provide it (e.g. download-media --write-sub - # attaches notes.sub). In that case we resolve per-item below. - user_provided_text = bool(note_text) + explicit_target = bool(hash_override and store_override) results = normalize_result_input(result) + if results and explicit_target: + # Direct targeting mode: apply note once to the explicit target and + # pass through any piped items unchanged. + try: + store_registry = Store(config) + backend = store_registry[str(store_override)] + ok = bool(backend.set_note(str(hash_override), note_name, note_text, config=config)) + if ok: + ctx.print_if_visible(f"✓ add-note: 1 item in '{store_override}'", file=sys.stderr) + except Exception as exc: + log(f"[add_note] Error: Failed to set note: {exc}", file=sys.stderr) + return 1 + + for res in results: + ctx.emit(res) + return 0 + if not results: - if store_override and query_hash: - results = [{"store": str(store_override), "hash": query_hash}] + if explicit_target: + # Allow standalone use (no piped input) and enable piping the target forward. 
+ results = [{"store": str(store_override), "hash": hash_override}] else: - log("[add_note] Error: Requires piped item(s) or -store and -query \"hash:<sha256>\"", file=sys.stderr) + log("[add_note] Error: Requires piped item(s) from add-file, or explicit -store <store> and -hash <sha256>", file=sys.stderr) return 1 store_registry = Store(config) @@ -106,55 +178,12 @@ class Add_Note(Cmdlet): # Batch write plan: store -> [(hash, name, text), ...] note_ops: Dict[str, List[Tuple[str, str, str]]] = {} - # Optional global fallback for note text from pipeline values. - # Allows patterns like: ... | add-note sub - pipeline_default_text = None - if not user_provided_text: - try: - pipeline_default_text = ctx.load_value(note_name) - except Exception: - pipeline_default_text = None - if isinstance(pipeline_default_text, list): - pipeline_default_text = " ".join([str(x) for x in pipeline_default_text]).strip() - elif pipeline_default_text is not None: - pipeline_default_text = str(pipeline_default_text).strip() - for res in results: if not isinstance(res, dict): ctx.emit(res) continue - # Resolve note text for this item when not provided explicitly. item_note_text = note_text - if not user_provided_text: - # Prefer item-scoped notes dict. - candidate = None - try: - notes = res.get("notes") - if isinstance(notes, dict): - candidate = notes.get(note_name) - except Exception: - candidate = None - - # Also allow direct field fallback: res["sub"], etc. - if candidate is None: - try: - candidate = res.get(note_name) - except Exception: - candidate = None - - if candidate is None: - candidate = pipeline_default_text - - if isinstance(candidate, list): - item_note_text = " ".join([str(x) for x in candidate]).strip() - else: - item_note_text = str(candidate or "").strip() - - if not item_note_text: - log(f"[add_note] Warning: No note text found for '{note_name}'; skipping", file=sys.stderr) - ctx.emit(res) - continue store_name = str(store_override or res.get("store") or "").strip() raw_hash = res.get("hash") @@ -167,7 +196,7 @@ class Add_Note(Cmdlet): resolved_hash = self._resolve_hash( raw_hash=str(raw_hash) if raw_hash else None, raw_path=str(raw_path) if raw_path else None, - override_hash=str(query_hash) if query_hash else None, + override_hash=str(hash_override) if hash_override else None, ) if not resolved_hash: log("[add_note] Warning: Item missing usable hash; skipping", file=sys.stderr) diff --git a/cmdlet/download_media.py b/cmdlet/download_media.py index c04180c..73a697c 100644 --- a/cmdlet/download_media.py +++ b/cmdlet/download_media.py @@ -254,6 +254,22 @@ def list_formats( return None formats = info.get("formats") or [] + + # Some URLs (notably playlist contexts) yield a playlist-shaped payload with + # `entries` rather than a direct video payload. If so, try to pull formats + # from the first concrete entry. 
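+    # Illustrative payload shape (assumed; yt-dlp playlist extraction):
+    #   {"_type": "playlist", "entries": [{"id": "...", "formats": [...]}, ...]}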
+ if (not formats) and isinstance(info.get("entries"), list): + try: + for entry in info.get("entries") or []: + if not isinstance(entry, dict): + continue + entry_formats = entry.get("formats") + if isinstance(entry_formats, list) and entry_formats: + formats = entry_formats + break + except Exception: + pass + if not isinstance(formats, list) or not formats: log("No formats available", file=sys.stderr) return None @@ -704,7 +720,30 @@ def download_media( session_id = None first_section_info = {} if ytdl_options.get("download_sections"): - session_id, first_section_info = _download_with_sections_via_cli(opts.url, ytdl_options, ytdl_options.get("download_sections", []), quiet=opts.quiet) + # The CLI path emits yt-dlp's own progress output; pause the pipeline Live UI + # so those progress bars remain visible instead of being clobbered. + try: + from contextlib import nullcontext + except Exception: + nullcontext = None # type: ignore + + suspend = getattr(pipeline_context, "suspend_live_progress", None) + cm = suspend() if callable(suspend) else (nullcontext() if nullcontext else None) + if cm is None: + session_id, first_section_info = _download_with_sections_via_cli( + opts.url, + ytdl_options, + ytdl_options.get("download_sections", []), + quiet=opts.quiet, + ) + else: + with cm: + session_id, first_section_info = _download_with_sections_via_cli( + opts.url, + ytdl_options, + ytdl_options.get("download_sections", []), + quiet=opts.quiet, + ) info = None else: with yt_dlp.YoutubeDL(ytdl_options) as ydl: # type: ignore[arg-type] @@ -1384,21 +1423,50 @@ class Download_Media(Cmdlet): item["title"] = item.get("name") or item.get("target") or item.get("path") or "Result" # Keep the full payload for history/inspection, but display a focused table. - display_row = { - "title": item.get("title"), - "store": item.get("store"), - "hash": item.get("hash") or item.get("file_hash") or item.get("sha256"), - } + # Use shared extractors so Ext/Size/Store/Hash remain consistent everywhere. 
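+                # Illustrative row (assumed shape, given the keys requested below):
+                #   {"title": "...", "store": "local", "hash": "ab12...", "ext": "mkv", "size": 1048576}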
+ try: + from result_table import build_display_row + except Exception: + build_display_row = None # type: ignore + + if callable(build_display_row): + display_row = build_display_row(item, keys=["title", "store", "hash", "ext", "size"]) + else: + display_row = { + "title": item.get("title"), + "store": item.get("store"), + "hash": item.get("hash") or item.get("file_hash") or item.get("sha256"), + "ext": str(item.get("ext") or ""), + "size": item.get("size") or item.get("size_bytes"), + } table.add_result(display_row) results_list.append(item) pipeline_context.set_current_stage_table(table) pipeline_context.set_last_result_table(table, results_list) - get_stderr_console().print(table) - setattr(table, "_rendered_by_cmdlet", True) - if not Confirm.ask("Continue anyway?", default=False, console=get_stderr_console()): - return False + try: + from contextlib import nullcontext + except Exception: + nullcontext = None # type: ignore + + suspend = getattr(pipeline_context, "suspend_live_progress", None) + cm = suspend() if callable(suspend) else (nullcontext() if nullcontext else None) + if cm is None: + get_stderr_console().print(table) + setattr(table, "_rendered_by_cmdlet", True) + if not Confirm.ask("Continue anyway?", default=False, console=get_stderr_console()): + return False + else: + with cm: + get_stderr_console().print(table) + setattr(table, "_rendered_by_cmdlet", True) + if not Confirm.ask("Continue anyway?", default=False, console=get_stderr_console()): + try: + pipeline_context.request_pipeline_stop(reason="duplicate-url declined", exit_code=0) + except Exception: + pass + return False return True def _preflight_url_duplicates_bulk(urls: Sequence[str]) -> bool: @@ -1597,15 +1665,45 @@ class Download_Media(Cmdlet): hit = backend_hits[0] title = hit.get("title") or hit.get("name") or hit.get("target") or hit.get("path") or "(exists)" file_hash = hit.get("hash") or hit.get("file_hash") or hit.get("sha256") or "" + + try: + from result_table import build_display_row + except Exception: + build_display_row = None # type: ignore + + extracted = { + "title": str(title), + "store": str(hit.get("store") or backend_name), + "hash": str(file_hash or ""), + "ext": "", + "size": None, + } + if callable(build_display_row): + try: + extracted = build_display_row(hit, keys=["title", "store", "hash", "ext", "size"]) + except Exception: + pass + # Ensure we still prefer the precomputed values for title/store/hash. + extracted["title"] = str(title) + extracted["store"] = str(hit.get("store") or backend_name) + extracted["hash"] = str(file_hash or "") + + ext = extracted.get("ext") + size_val = extracted.get("size") + display_row = { "title": str(title), "store": str(hit.get("store") or backend_name), "hash": str(file_hash or ""), + "ext": str(ext or ""), + "size": size_val, "url": original_url, "columns": [ ("Title", str(title)), ("Store", str(hit.get("store") or backend_name)), ("Hash", str(file_hash or "")), + ("Ext", str(ext or "")), + ("Size", size_val), ("URL", original_url), ], } @@ -1615,7 +1713,8 @@ class Download_Media(Cmdlet): debug("Bulk URL preflight: no matches") return True - table = ResultTable(f"URL already exists ({len(matched_urls)} url(s))") + # This table is non-interactive and intentionally wide (we want URL + ext/size). 
+        table = ResultTable(f"URL already exists ({len(matched_urls)} url(s))", max_columns=10)
         table.set_no_choice(True)
         try:
             table.set_preserve_order(True)
@@ -1777,7 +1876,10 @@ class Download_Media(Cmdlet):
             table = ResultTable()
             safe_url = str(url or "").strip()
             table.title = f'download-media -url "{safe_url}"' if safe_url else "download-media"
-            table.set_source_command("download-media", [url])
+            # Selection tables should expand '@N' into a runnable command.
+            # For playlist-item rows we prefer the concrete per-item URL so the
+            # expanded command targets a single video (not the whole playlist).
+            table.set_source_command("download-media", [])
             try:
                 table.set_preserve_order(True)
             except Exception:
@@ -1803,6 +1905,9 @@ class Download_Media(Cmdlet):
                     "detail": str(uploader or ""),
                     "media_kind": "playlist-item",
                     "playlist_index": idx,
+                    # Enable '@N' expansion into a concrete command.
+                    # Prefer selecting the resolved per-item URL when available.
+                    "_selection_args": (["-url", str(entry_url)] if entry_url else ["-url", str(url), "-item", str(idx)]),
                     # Critical for normal @ selection piping: downstream cmdlets
                     # (including download-media itself) look for url/target.
                     "url": entry_url,
diff --git a/cmdlet/screen_shot.py b/cmdlet/screen_shot.py
index cf77630..f5ddcfc 100644
--- a/cmdlet/screen_shot.py
+++ b/cmdlet/screen_shot.py
@@ -6,7 +6,6 @@ Playwright, marking them as temporary artifacts for cleanup.

 from __future__ import annotations

-import contextlib
 import hashlib
 import sys
 import time
@@ -32,6 +31,22 @@ get_field = sh.get_field
 parse_cmdlet_args = sh.parse_cmdlet_args

 import pipeline as pipeline_context

+
+def _set_live_step(text: str) -> None:
+    """Best-effort update to the pipeline Live progress title (if enabled)."""
+    try:
+        ui = pipeline_context.get_live_progress() if hasattr(pipeline_context, "get_live_progress") else None
+    except Exception:
+        ui = None
+    if ui is None:
+        return
+    try:
+        setter = getattr(ui, "set_active_subtask_text", None)
+        if callable(setter):
+            setter(str(text or "").strip())
+    except Exception:
+        pass
+

 # ============================================================================
 # CMDLET Metadata Declaration
 # ============================================================================
@@ -65,7 +80,7 @@ USER_AGENT = (
     "Chrome/120.0.0.0 Safari/537.36"
 )

-DEFAULT_VIEWPORT: dict[str, int] = {"width": 1280, "height": 1200}
+DEFAULT_VIEWPORT: dict[str, int] = {"width": 1920, "height": 1080}

 ARCHIVE_TIMEOUT = 30.0

 # Configurable selectors for specific websites
@@ -114,7 +129,7 @@ class ScreenshotOptions:
     output_path: Optional[Path] = None
     full_page: bool = True
     headless: bool = True
-    wait_after_load: float = 2.0
+    wait_after_load: float = 6.0
     wait_for_article: bool = False
     replace_video_posters: bool = True
     tag: Sequence[str] = ()
@@ -156,13 +171,13 @@ def _slugify_url(url: str) -> str:
 def _normalise_format(fmt: Optional[str]) -> str:
     """Normalize output format to valid values."""
     if not fmt:
-        return "png"
+        return "webp"
     value = fmt.strip().lower()
     if value in {"jpg", "jpeg"}:
         return "jpeg"
-    if value in {"png", "pdf"}:
+    if value in {"png", "pdf", "webp"}:
         return value
-    return "png"
+    return "webp"


 def _format_suffix(fmt: str) -> str:
@@ -172,6 +187,15 @@ def _format_suffix(fmt: str) -> str:
     return f".{fmt}"


+def _convert_to_webp(source_path: Path, dest_path: Path) -> None:
+    """Convert an image file to WebP using Pillow."""
+    from PIL import Image
+
+    with Image.open(source_path) as img:
+        # quality=100 favors fidelity; method=6 trades encode speed for a smaller file. 
+ img.save(dest_path, format="WEBP", quality=100, method=6) + + def _selectors_for_url(url: str) -> List[str]: """Return a list of likely content selectors for known platforms.""" u = url.lower() @@ -184,6 +208,19 @@ def _selectors_for_url(url: str) -> List[str]: return sels or ["article"] +def _matched_site_selectors(url: str) -> List[str]: + """Return SITE_SELECTORS for a matched domain; empty if no match. + + Unlike `_selectors_for_url()`, this does not return a generic fallback. + """ + u = str(url or "").lower() + sels: List[str] = [] + for domain, selectors in SITE_SELECTORS.items(): + if domain in u: + sels.extend(selectors) + return sels + + def _platform_preprocess(url: str, page: Any, warnings: List[str], timeout_ms: int = 10_000) -> None: """Best-effort page tweaks for popular platforms before capture.""" u = url.lower() @@ -322,6 +359,10 @@ def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str]) """Capture screenshot using Playwright.""" debug(f"[_capture] Starting capture for {options.url} -> {destination}") try: + # Two-phase Live progress: + # 1) load + stabilize (ends right after the wait_after_load sleep) + # 2) capture + save (and any post-processing) + _set_live_step("screen-shot: loading") tool = options.playwright_tool or PlaywrightTool({}) # Ensure Chromium engine is used for the screen-shot cmdlet (force for consistency) @@ -329,7 +370,18 @@ def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str]) current_browser = getattr(tool.defaults, "browser", "").lower() if getattr(tool, "defaults", None) is not None else "" if current_browser != "chromium": debug(f"[_capture] Overriding Playwright browser '{current_browser}' -> 'chromium' for screen-shot cmdlet") - tool = PlaywrightTool({"tool": {"playwright": {"browser": "chromium"}}}) + base_cfg = {} + try: + base_cfg = dict(getattr(tool, "_config", {}) or {}) + except Exception: + base_cfg = {} + tool_block = dict(base_cfg.get("tool") or {}) if isinstance(base_cfg, dict) else {} + pw_block = dict(tool_block.get("playwright") or {}) if isinstance(tool_block, dict) else {} + pw_block["browser"] = "chromium" + tool_block["playwright"] = pw_block + if isinstance(base_cfg, dict): + base_cfg["tool"] = tool_block + tool = PlaywrightTool(base_cfg) except Exception: tool = PlaywrightTool({"tool": {"playwright": {"browser": "chromium"}}}) @@ -366,6 +418,9 @@ def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str]) if options.wait_after_load > 0: debug(f"Waiting {options.wait_after_load}s for page stabilization...") time.sleep(min(10.0, max(0.0, options.wait_after_load))) + + # Phase 2 begins here (per request). 
+ _set_live_step("screen-shot: capturing") if options.replace_video_posters: debug("Replacing video elements with posters...") page.evaluate( @@ -384,6 +439,7 @@ def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str]) # Attempt platform-specific target capture if requested (and not PDF) element_captured = False if options.prefer_platform_target and format_name != "pdf": + debug(f"[_capture] Target capture enabled") debug("Attempting platform-specific content capture...") try: _platform_preprocess(options.url, page, warnings) @@ -393,7 +449,7 @@ def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str]) selectors = list(options.target_selectors or []) if not selectors: selectors = _selectors_for_url(options.url) - + debug(f"[_capture] Trying selectors: {selectors}") for sel in selectors: try: @@ -459,14 +515,36 @@ def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str]) def _capture_screenshot(options: ScreenshotOptions) -> ScreenshotResult: """Capture a screenshot for the given options.""" debug(f"[_capture_screenshot] Preparing capture for {options.url}") + requested_format = _normalise_format(options.output_format) destination = _prepare_output_path(options) warnings: List[str] = [] - _capture(options, destination, warnings) + + # Playwright screenshots do not natively support WebP output. + # Capture as PNG, then convert via Pillow. + capture_path = destination + if requested_format == "webp": + capture_path = unique_path(destination.with_suffix(".png")) + debug(f"[_capture_screenshot] Requested webp; capturing intermediate png -> {capture_path}") + options.output_format = "png" + _capture(options, capture_path, warnings) + + if requested_format == "webp": + debug(f"[_capture_screenshot] Converting png -> webp: {destination}") + try: + _convert_to_webp(capture_path, destination) + try: + capture_path.unlink(missing_ok=True) + except Exception: + pass + except Exception as exc: + warnings.append(f"webp conversion failed; keeping png: {exc}") + destination = capture_path # Build URL list from captured url and any archives url: List[str] = [options.url] if options.url else [] archive_url: List[str] = [] if options.archive and options.url: + _set_live_step("screen-shot: archiving") debug(f"[_capture_screenshot] Archiving enabled for {options.url}") archives, archive_warnings = _archive_url(options.url, options.archive_timeout) archive_url.extend(archives) @@ -538,7 +616,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: except Exception: pass if not format_value: - format_value = "png" + format_value = "webp" storage_value = parsed.get("storage") selector_arg = parsed.get("selector") selectors = [selector_arg] if selector_arg else [] @@ -549,27 +627,27 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: positional_url = [str(url_arg)] if url_arg else [] # ======================================================================== - # INPUT PROCESSING - Extract url from pipeline or command arguments + # INPUT PROCESSING - Extract url from command args or pipeline # ======================================================================== - - piped_results = normalize_result_input(result) - url_to_process: List[Tuple[str, Any]] = [] - - # Extract url from piped results - if piped_results: - for item in piped_results: - url = ( - get_field(item, 'path') - or get_field(item, 'url') - or get_field(item, 'target') - ) - if url: - url_to_process.append((str(url), item)) - - # 
Use positional arguments if no pipeline input - if not url_to_process and positional_url: + # If the user provided an explicit URL argument, prefer it. + url_to_process: List[Tuple[str, Any]] = [] + if positional_url: url_to_process = [(u, None) for u in positional_url] + else: + piped_results = normalize_result_input(result) + + # Extract url from piped results + if piped_results: + for item in piped_results: + url = ( + get_field(item, 'path') + or get_field(item, 'url') + or get_field(item, 'target') + ) + + if url: + url_to_process.append((str(url), item)) if not url_to_process: log(f"No url to process for screen-shot cmdlet", file=sys.stderr) @@ -577,6 +655,32 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: debug(f"[_run] url to process: {[u for u, _ in url_to_process]}") + # If the caller isn't running the shared pipeline Live progress UI (e.g. direct + # cmdlet execution), start a minimal local pipeline progress panel so this cmdlet + # still shows step-level progress. + local_progress_ui = None + try: + existing_ui = pipeline_context.get_live_progress() if hasattr(pipeline_context, "get_live_progress") else None + except Exception: + existing_ui = None + try: + if existing_ui is None and bool(getattr(sys.stderr, "isatty", lambda: False)()): + from models import PipelineLiveProgress + + local_progress_ui = PipelineLiveProgress(["screen-shot"], enabled=True) + local_progress_ui.start() + try: + if hasattr(pipeline_context, "set_live_progress"): + pipeline_context.set_live_progress(local_progress_ui) + except Exception: + pass + try: + local_progress_ui.begin_pipe(0, total_items=len(url_to_process), items_preview=[u for u, _ in url_to_process]) + except Exception: + pass + except Exception: + local_progress_ui = None + # ======================================================================== # OUTPUT DIRECTORY RESOLUTION - Priority chain # ======================================================================== @@ -621,7 +725,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: format_name = _normalise_format(format_value) filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()] - target_selectors = filtered_selectors if filtered_selectors else None + manual_target_selectors = filtered_selectors if filtered_selectors else None all_emitted = [] exit_code = 0 @@ -664,6 +768,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: continue try: + _set_live_step("screen-shot: starting") # Create screenshot with provided options # Force the Playwright engine to Chromium for the screen-shot cmdlet # (this ensures consistent rendering and supports PDF output requirements). @@ -672,23 +777,49 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: tool_block = dict(config.get("tool") or {}) pw_block = dict(tool_block.get("playwright") or {}) pw_block["browser"] = "chromium" + # Use Playwright-native UA/headers (matches bundled Chromium version). 
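+                # Resulting tool.playwright overrides (illustrative):
+                #   {"browser": "chromium", "user_agent": "native",
+                #    "viewport_width": 1920, "viewport_height": 1080}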
+ pw_block["user_agent"] = "native" + pw_block["viewport_width"] = int(DEFAULT_VIEWPORT.get("width", 1920)) + pw_block["viewport_height"] = int(DEFAULT_VIEWPORT.get("height", 1080)) tool_block["playwright"] = pw_block pw_local_cfg = dict(config) pw_local_cfg["tool"] = tool_block else: - pw_local_cfg = {"tool": {"playwright": {"browser": "chromium"}}} + pw_local_cfg = { + "tool": { + "playwright": { + "browser": "chromium", + "user_agent": "native", + "viewport_width": int(DEFAULT_VIEWPORT.get("width", 1920)), + "viewport_height": int(DEFAULT_VIEWPORT.get("height", 1080)), + } + } + } options = ScreenshotOptions( url=url, output_dir=screenshot_dir, output_format=format_name, archive=archive_enabled, - target_selectors=target_selectors, + target_selectors=None, prefer_platform_target=False, wait_for_article=False, full_page=True, playwright_tool=PlaywrightTool(pw_local_cfg), ) + + # Auto element capture for known sites (x.com/twitter/etc.). + # - If the user provided --selector, treat that as an explicit target. + # - Otherwise, if SITE_SELECTORS matches the URL, auto-capture the post/content element. + auto_selectors = _matched_site_selectors(url) + if manual_target_selectors: + options.prefer_platform_target = True + options.target_selectors = manual_target_selectors + debug(f"[screen_shot] Using explicit selector(s): {manual_target_selectors}") + elif auto_selectors: + options.prefer_platform_target = True + options.target_selectors = auto_selectors + debug(f"[screen_shot] Auto selectors matched for url: {auto_selectors}") screenshot_result = _capture_screenshot(options) @@ -748,6 +879,13 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: # Emit the result so downstream cmdlet (like add-file) can use it pipeline_context.emit(pipe_obj) all_emitted.append(pipe_obj) + + # If we created a local progress UI, advance it per completed item. 
+ if local_progress_ui is not None: + try: + local_progress_ui.on_emit(0, pipe_obj) + except Exception: + pass except ScreenshotError as exc: log(f"Error taking screenshot of {url}: {exc}", file=sys.stderr) @@ -758,13 +896,31 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: traceback.print_exc(file=sys.stderr) exit_code = 1 + try: + if local_progress_ui is not None: + try: + local_progress_ui.finish_pipe(0, force_complete=True) + except Exception: + pass + finally: + if local_progress_ui is not None: + try: + local_progress_ui.stop() + except Exception: + pass + try: + if hasattr(pipeline_context, "set_live_progress"): + pipeline_context.set_live_progress(None) + except Exception: + pass + if not all_emitted: log(f"No screenshots were successfully captured", file=sys.stderr) return 1 - + # Log completion message (keep this as normal output) log(f"✓ Successfully captured {len(all_emitted)} screenshot(s)") - + return exit_code CMDLET = Cmdlet( name="screen-shot", @@ -773,7 +929,7 @@ CMDLET = Cmdlet( alias=["screenshot", "ss"], arg=[ SharedArgs.URL, - CmdletArg(name="format", type="string", description="Output format: png, jpeg, or pdf"), + CmdletArg(name="format", type="string", description="Output format: webp, png, jpeg, or pdf"), CmdletArg(name="selector", type="string", description="CSS selector for element capture"), ], diff --git a/models.py b/models.py index 3557644..162e6ab 100644 --- a/models.py +++ b/models.py @@ -12,14 +12,20 @@ from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Protocol, TextIO from rich.console import Console +from rich.console import ConsoleOptions +from rich.console import Group +from rich.live import Live +from rich.panel import Panel from rich.progress import ( BarColumn, DownloadColumn, Progress, + SpinnerColumn, TaskID, TaskProgressColumn, TextColumn, TimeRemainingColumn, + TimeElapsedColumn, TransferSpeedColumn, ) @@ -403,14 +409,56 @@ class ProgressBar: self._progress: Optional[Progress] = None self._task_id: Optional[TaskID] = None + # Optional: when a PipelineLiveProgress is active, prefer rendering + # transfers inside it instead of creating a nested Rich Progress. + self._pipeline_ui: Any = None + self._pipeline_label: Optional[str] = None + def _ensure_started(self, *, label: str, total: Optional[int], file: Any = None) -> None: + if self._pipeline_ui is not None and self._pipeline_label: + # Pipeline-backed transfer task is already registered; update its total if needed. + try: + if total is not None and total > 0: + self._pipeline_ui.update_transfer(label=self._pipeline_label, completed=None, total=int(total)) + except Exception: + pass + return + if self._progress is not None and self._task_id is not None: if total is not None and total > 0: self._progress.update(self._task_id, total=int(total)) return + # Prefer integrating with the pipeline Live UI to avoid nested Rich Live instances. + try: + import pipeline as pipeline_context + + ui = pipeline_context.get_live_progress() + if ui is not None and hasattr(ui, "begin_transfer") and hasattr(ui, "update_transfer"): + self._pipeline_ui = ui + self._pipeline_label = str(label or "download") + try: + ui.begin_transfer(label=self._pipeline_label, total=int(total) if isinstance(total, int) and total > 0 else None) + except Exception: + # If pipeline integration fails, fall back to standalone progress. 
+ self._pipeline_ui = None + self._pipeline_label = None + else: + return + except Exception: + pass + stream = file if file is not None else sys.stderr - console = Console(file=stream) + # Use shared stderr console when rendering to stderr (cooperates with PipelineLiveProgress). + if stream is sys.stderr: + try: + from rich_display import stderr_console + + console = stderr_console() + except Exception: + console = Console(file=stream) + else: + console = Console(file=stream) progress = Progress( TextColumn("[progress.description]{task.description}"), BarColumn(), @@ -441,6 +489,17 @@ class ProgressBar: if downloaded is None and total is None: return self._ensure_started(label=label, total=total, file=file) + if self._pipeline_ui is not None and self._pipeline_label: + try: + self._pipeline_ui.update_transfer( + label=self._pipeline_label, + completed=int(downloaded or 0) if downloaded is not None else None, + total=int(total) if isinstance(total, int) and total > 0 else None, + ) + except Exception: + pass + return + if self._progress is None or self._task_id is None: return if total is not None and total > 0: @@ -449,6 +508,15 @@ class ProgressBar: self._progress.update(self._task_id, completed=int(downloaded or 0), refresh=True) def finish(self) -> None: + if self._pipeline_ui is not None and self._pipeline_label: + try: + self._pipeline_ui.finish_transfer(label=self._pipeline_label) + except Exception: + pass + finally: + self._pipeline_ui = None + self._pipeline_label = None + return if self._progress is None: return try: @@ -562,28 +630,519 @@ class ProgressFileReader: # ============================================================================ # Note: Pipeline functions and state variables moved to pipeline.py + +def _pipeline_progress_item_label(value: Any, *, max_len: int = 72) -> str: + def _clip(text: str) -> str: + text = str(text or "").strip() + if not text: + return "(item)" + if len(text) <= max_len: + return text + return text[: max(0, max_len - 1)] + "…" + + try: + if isinstance(value, PipeObject): + if value.title: + return _clip(value.title) + if value.url: + return _clip(value.url) + if value.source_url: + return _clip(value.source_url) + if value.path: + return _clip(value.path) + if value.hash: + return _clip(value.hash) + if isinstance(value, dict): + for key in ("title", "url", "source_url", "path", "hash", "target"): + raw = value.get(key) + if raw is not None and str(raw).strip(): + return _clip(str(raw)) + return _clip(str(value)) + except Exception: + return "(item)" + + +class PipelineLiveProgress: + """Multi-level pipeline progress UI. + + - Each pipeline step (pipe) is a persistent bar. + - Each per-item operation is shown as a transient sub-task (spinner). + + Designed to render to stderr so pipelines remain clean. 
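+
+    Typical driver sequence (illustrative; mirrors how CLI.py uses it):
+
+        ui = PipelineLiveProgress(["download-media", "add-file"])
+        ui.start()
+        ui.begin_pipe(0, total_items=2, items_preview=urls)
+        # ... per emitted item: ui.on_emit(0, item) ...
+        ui.finish_pipe(0, force_complete=True)
+        ui.stop()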
+ """ + + def __init__(self, pipe_labels: List[str], *, enabled: bool = True) -> None: + self._enabled = bool(enabled) + self._pipe_labels = [str(x) for x in (pipe_labels or [])] + + self._console: Optional[Console] = None + self._live: Optional[Live] = None + + self._overall: Optional[Progress] = None + self._pipe_progress: Optional[Progress] = None + self._subtasks: Optional[Progress] = None + self._transfers: Optional[Progress] = None + + self._overall_task: Optional[TaskID] = None + self._pipe_tasks: List[TaskID] = [] + + self._transfer_tasks: Dict[str, TaskID] = {} + + # Per-pipe state + self._pipe_totals: List[int] = [0 for _ in self._pipe_labels] + self._pipe_done: List[int] = [0 for _ in self._pipe_labels] + self._subtask_ids: List[List[TaskID]] = [[] for _ in self._pipe_labels] + self._subtask_active_index: List[int] = [0 for _ in self._pipe_labels] + + # Title line state (active per-item context) + self._active_subtask_text: Optional[str] = None + + def _title_text(self) -> str: + """Compute the Pipeline panel title. + + We keep per-pipe elapsed time on the pipe rows. The panel title is used + to show the currently active item (cmd + url/path) with a lightweight + spinner so the UI reads as "working on X". + """ + + active = str(self._active_subtask_text or "").strip() + if not active: + return "Pipeline" + + # Lightweight spinner frames (similar intent to Rich's simpleDots). + try: + import time + + frames = [".", "..", "..."] + idx = int(time.monotonic() * 4) % len(frames) + prefix = frames[idx] + except Exception: + prefix = "..." + + return f"{prefix} {active}" + + def set_active_subtask_text(self, text: Optional[str]) -> None: + """Update the Pipeline panel title to reflect the current in-item step. + + This is intentionally lightweight: it does not affect pipe counters. + Cmdlets may call this to surface step-level progress for long-running + single-item work (e.g. Playwright page load -> capture -> convert). + """ + if not self._enabled: + return + try: + value = str(text or "").strip() + except Exception: + value = "" + self._active_subtask_text = value or None + + def __rich_console__(self, console: "Console", options: "ConsoleOptions"): + """Renderable hook used by Rich Live. + + Using a dynamic renderable keeps the panel title up to date and animates + the spinner without needing manual Live.update() calls. + """ + + pipe_progress = self._pipe_progress + transfers = self._transfers + overall = self._overall + if pipe_progress is None or transfers is None or overall is None: + # Not started (or stopped). + yield Panel("", title="Pipeline", expand=False) + return + + yield Group( + Panel(Group(pipe_progress, transfers), title=self._title_text(), expand=False), + overall, + ) + + def _render_group(self) -> Group: + # Backward-compatible helper (some callers may still expect a Group). + pipe_progress = self._pipe_progress + transfers = self._transfers + overall = self._overall + assert pipe_progress is not None + assert transfers is not None + assert overall is not None + return Group( + Panel(Group(pipe_progress, transfers), title=self._title_text(), expand=False), + overall, + ) + + def start(self) -> None: + if not self._enabled: + return + if self._live is not None: + return + + # IMPORTANT: use the shared stderr Console instance so that any + # `stderr_console().print(...)` calls from inside cmdlets (e.g. preflight + # tables/prompts in download-media) cooperate with Rich Live rendering. 
+ # If we create a separate Console(file=sys.stderr), output will fight for + # terminal cursor control and appear "blocked"/truncated. + from rich_display import stderr_console + + self._console = stderr_console() + + # Persistent per-pipe bars. + self._pipe_progress = Progress( + TextColumn("{task.description}"), + TimeElapsedColumn(), + BarColumn(), + TaskProgressColumn(), + console=self._console, + transient=False, + ) + + # Transient, per-item spinner for the currently-active subtask. + self._subtasks = Progress( + TextColumn(" "), + SpinnerColumn("simpleDots"), + TextColumn("{task.description}"), + console=self._console, + transient=False, + ) + + # Byte-based transfer bars (download/upload) integrated into the Live view. + self._transfers = Progress( + TextColumn(" {task.description}"), + BarColumn(), + TaskProgressColumn(), + DownloadColumn(), + TransferSpeedColumn(), + TimeRemainingColumn(), + console=self._console, + transient=False, + ) + + self._overall = Progress( + TimeElapsedColumn(), + BarColumn(), + TextColumn("{task.description}"), + console=self._console, + transient=False, + ) + + # Create pipe tasks up-front so the user sees the pipe structure immediately. + self._pipe_tasks = [] + for idx, label in enumerate(self._pipe_labels): + # Start timers only when the pipe actually begins. + task_id = self._pipe_progress.add_task( + f"{idx + 1}/{len(self._pipe_labels)} {label}", + total=1, + start=False, + ) + self._pipe_progress.update(task_id, completed=0, total=1) + self._pipe_tasks.append(task_id) + + self._overall_task = self._overall.add_task( + f"Pipeline: 0/{len(self._pipe_labels)} pipes completed", + total=max(1, len(self._pipe_labels)), + ) + + self._live = Live(self, console=self._console, refresh_per_second=10, transient=True) + self._live.start() + + def pause(self) -> None: + """Temporarily stop Live rendering without losing progress state.""" + if self._live is None: + return + try: + self._live.stop() + finally: + self._live = None + + def resume(self) -> None: + """Resume Live rendering after pause().""" + if not self._enabled: + return + if self._live is not None: + return + if self._console is None or self._pipe_progress is None or self._subtasks is None or self._transfers is None or self._overall is None: + # Not initialized yet; start fresh. + self.start() + return + self._live = Live(self, console=self._console, refresh_per_second=10, transient=True) + self._live.start() + + def stop(self) -> None: + # Safe to call whether Live is running or paused. + if self._live is not None: + try: + self._live.stop() + except Exception: + pass + + self._live = None + self._console = None + self._overall = None + self._pipe_progress = None + self._subtasks = None + self._transfers = None + self._overall_task = None + self._pipe_tasks = [] + self._transfer_tasks = {} + self._active_subtask_text = None + + def begin_transfer(self, *, label: str, total: Optional[int] = None) -> None: + if not self._enabled: + return + if self._transfers is None: + return + key = str(label or "transfer") + if key in self._transfer_tasks: + # If it already exists, treat as an update to total. 
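+            # For example, a caller may announce a transfer before its size is
+            # known and re-announce once it is (illustrative label):
+            #     ui.begin_transfer(label="video.mp4")                   # indeterminate
+            #     ui.begin_transfer(label="video.mp4", total=1_048_576)  # size learned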
+ try: + if total is not None and total > 0: + self._transfers.update(self._transfer_tasks[key], total=int(total)) + except Exception: + pass + return + task_total = int(total) if isinstance(total, int) and total > 0 else None + try: + task_id = self._transfers.add_task(key, total=task_total) + self._transfer_tasks[key] = task_id + except Exception: + pass + + def update_transfer(self, *, label: str, completed: Optional[int], total: Optional[int] = None) -> None: + if not self._enabled: + return + if self._transfers is None: + return + key = str(label or "transfer") + if key not in self._transfer_tasks: + self.begin_transfer(label=key, total=total) + task_id = self._transfer_tasks.get(key) + if task_id is None: + return + try: + kwargs: Dict[str, Any] = {} + if completed is not None: + kwargs["completed"] = int(completed) + if total is not None and total > 0: + kwargs["total"] = int(total) + self._transfers.update(task_id, refresh=True, **kwargs) + except Exception: + pass + + def finish_transfer(self, *, label: str) -> None: + if self._transfers is None: + return + key = str(label or "transfer") + task_id = self._transfer_tasks.pop(key, None) + if task_id is None: + return + try: + self._transfers.remove_task(task_id) + except Exception: + pass + + def _ensure_pipe(self, pipe_index: int) -> bool: + if not self._enabled: + return False + if self._pipe_progress is None or self._subtasks is None or self._overall is None: + return False + if pipe_index < 0 or pipe_index >= len(self._pipe_labels): + return False + return True + + def begin_pipe(self, pipe_index: int, *, total_items: int, items_preview: Optional[List[Any]] = None) -> None: + if not self._ensure_pipe(pipe_index): + return + pipe_progress = self._pipe_progress + subtasks = self._subtasks + assert pipe_progress is not None + assert subtasks is not None + + total_items = int(total_items) if isinstance(total_items, int) else 0 + total_items = max(1, total_items) + self._pipe_totals[pipe_index] = total_items + self._pipe_done[pipe_index] = 0 + self._subtask_active_index[pipe_index] = 0 + self._subtask_ids[pipe_index] = [] + + pipe_task = self._pipe_tasks[pipe_index] + pipe_progress.update(pipe_task, completed=0, total=total_items) + # Start the per-pipe timer now that the pipe is actually running. + try: + pipe_progress.start_task(pipe_task) + except Exception: + pass + + labels: List[str] = [] + if isinstance(items_preview, list) and items_preview: + labels = [_pipeline_progress_item_label(x) for x in items_preview] + + for i in range(total_items): + suffix = labels[i] if i < len(labels) else f"item {i + 1}/{total_items}" + # Use start=False so elapsed time starts when we explicitly start_task(). + sub_id = subtasks.add_task(f"{self._pipe_labels[pipe_index]}: {suffix}", start=False) + subtasks.update(sub_id, visible=False) + self._subtask_ids[pipe_index].append(sub_id) + + # Show the first subtask spinner. 
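+        # (Only one spinner is visible at a time; on_emit() hides the finished
+        # one and reveals the next.)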
+        if self._subtask_ids[pipe_index]:
+            first = self._subtask_ids[pipe_index][0]
+            subtasks.update(first, visible=True)
+            subtasks.start_task(first)
+            try:
+                # `Progress.tasks` is a positional list; resolve the Task by its
+                # id rather than assuming TaskID values match list indices.
+                t = next(x for x in subtasks.tasks if x.id == first)
+                self._active_subtask_text = str(getattr(t, "description", "") or "").strip() or None
+            except Exception:
+                self._active_subtask_text = None
+
+    def on_emit(self, pipe_index: int, emitted: Any) -> None:
+        if not self._ensure_pipe(pipe_index):
+            return
+
+        pipe_progress = self._pipe_progress
+        subtasks = self._subtasks
+        assert pipe_progress is not None
+        assert subtasks is not None
+
+        done = self._pipe_done[pipe_index]
+        total = self._pipe_totals[pipe_index]
+        active = self._subtask_active_index[pipe_index]
+
+        # If a stage emits more than expected, extend totals dynamically.
+        if done >= total:
+            total = done + 1
+            self._pipe_totals[pipe_index] = total
+            pipe_task = self._pipe_tasks[pipe_index]
+            pipe_progress.update(pipe_task, total=total)
+
+            # Add a placeholder subtask.
+            sub_id = subtasks.add_task(
+                f"{self._pipe_labels[pipe_index]}: {_pipeline_progress_item_label(emitted)}"
+            )
+            subtasks.stop_task(sub_id)
+            subtasks.update(sub_id, visible=False)
+            self._subtask_ids[pipe_index].append(sub_id)
+
+        # Complete & hide current active subtask.
+        if active < len(self._subtask_ids[pipe_index]):
+            current = self._subtask_ids[pipe_index][active]
+            try:
+                # If we didn't have a preview label, set it now.
+                subtasks.update(
+                    current,
+                    description=f"{self._pipe_labels[pipe_index]}: {_pipeline_progress_item_label(emitted)}",
+                )
+            except Exception:
+                pass
+            subtasks.stop_task(current)
+            subtasks.update(current, visible=False)
+
+        done += 1
+        self._pipe_done[pipe_index] = done
+
+        pipe_task = self._pipe_tasks[pipe_index]
+        pipe_progress.update(pipe_task, completed=done)
+
+        # Start next subtask spinner.
+        next_index = active + 1
+        self._subtask_active_index[pipe_index] = next_index
+        if next_index < len(self._subtask_ids[pipe_index]):
+            nxt = self._subtask_ids[pipe_index][next_index]
+            subtasks.update(nxt, visible=True)
+            subtasks.start_task(nxt)
+            try:
+                t = next(x for x in subtasks.tasks if x.id == nxt)
+                self._active_subtask_text = str(getattr(t, "description", "") or "").strip() or None
+            except Exception:
+                self._active_subtask_text = None
+        else:
+            self._active_subtask_text = None
+
+    def finish_pipe(self, pipe_index: int, *, force_complete: bool = True) -> None:
+        if not self._ensure_pipe(pipe_index):
+            return
+
+        pipe_progress = self._pipe_progress
+        subtasks = self._subtasks
+        overall = self._overall
+        assert pipe_progress is not None
+        assert subtasks is not None
+        assert overall is not None
+
+        total = self._pipe_totals[pipe_index]
+        done = self._pipe_done[pipe_index]
+
+        # Ensure the pipe bar finishes even if the cmdlet didn't emit per item.
+        if force_complete and done < total:
+            pipe_task = self._pipe_tasks[pipe_index]
+            pipe_progress.update(pipe_task, completed=total)
+            self._pipe_done[pipe_index] = total
+
+        # Hide any remaining subtask spinners.
+        for sub_id in self._subtask_ids[pipe_index]:
+            try:
+                subtasks.stop_task(sub_id)
+                subtasks.update(sub_id, visible=False)
+            except Exception:
+                pass
+
+        # If we just finished the active pipe, clear the title context.
+        self._active_subtask_text = None
+
+        # Stop the per-pipe timer once the pipe is finished.
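+        # (Progress.stop_task() freezes the task's elapsed clock but keeps the
+        # row, so a finished pipe retains its final time on screen.)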
+ try: + pipe_task = self._pipe_tasks[pipe_index] + pipe_progress.stop_task(pipe_task) + except Exception: + pass + + if self._overall_task is not None: + completed = 0 + try: + completed = sum(1 for i in range(len(self._pipe_labels)) if self._pipe_done[i] >= max(1, self._pipe_totals[i])) + except Exception: + completed = 0 + overall.update( + self._overall_task, + completed=min(completed, max(1, len(self._pipe_labels))), + description=f"Pipeline: {completed}/{len(self._pipe_labels)} pipes completed", + ) + class PipelineStageContext: - """Context information for the current pipeline stage.""" - - def __init__(self, stage_index: int, total_stages: int, worker_id: Optional[str] = None): - self.stage_index = stage_index - self.total_stages = total_stages - self.is_last_stage = (stage_index == total_stages - 1) - self.worker_id = worker_id - self.emits: List[Any] = [] - - def emit(self, obj: Any) -> None: - """Emit an object to the next pipeline stage.""" - self.emits.append(obj) - - def get_current_command_text(self) -> str: - """Get the current command text (for backward compatibility).""" - # This is maintained for backward compatibility with old code - # In a real implementation, this would come from the stage context - return "" - - def __repr__(self) -> str: - return f"PipelineStageContext(stage={self.stage_index}/{self.total_stages}, is_last={self.is_last_stage}, worker_id={self.worker_id})" + """Context information for the current pipeline stage.""" + + def __init__( + self, + stage_index: int, + total_stages: int, + worker_id: Optional[str] = None, + on_emit: Optional[Callable[[Any], None]] = None, + ): + self.stage_index = stage_index + self.total_stages = total_stages + self.is_last_stage = (stage_index == total_stages - 1) + self.worker_id = worker_id + self._on_emit = on_emit + self.emits: List[Any] = [] + + def emit(self, obj: Any) -> None: + """Emit an object to the next pipeline stage.""" + self.emits.append(obj) + cb = getattr(self, "_on_emit", None) + if cb: + try: + cb(obj) + except Exception: + pass + + def get_current_command_text(self) -> str: + """Get the current command text (for backward compatibility).""" + # This is maintained for backward compatibility with old code + # In a real implementation, this would come from the stage context + return "" + + def __repr__(self) -> str: + return ( + f"PipelineStageContext(stage={self.stage_index}/{self.total_stages}, " + f"is_last={self.is_last_stage}, worker_id={self.worker_id})" + ) # ============================================================================ diff --git a/pipeline.py b/pipeline.py index bd6f23a..c5059b1 100644 --- a/pipeline.py +++ b/pipeline.py @@ -20,11 +20,54 @@ from __future__ import annotations import sys import shlex +from contextlib import contextmanager from typing import Any, Dict, List, Optional, Sequence from models import PipelineStageContext from SYS.logger import log +# Live progress UI instance (optional). Set by the pipeline runner. +_LIVE_PROGRESS: Any = None + + +def set_live_progress(progress_ui: Any) -> None: + """Register the current Live progress UI so cmdlets can suspend it during prompts.""" + global _LIVE_PROGRESS + _LIVE_PROGRESS = progress_ui + + +def get_live_progress() -> Any: + return _LIVE_PROGRESS + + +@contextmanager +def suspend_live_progress(): + """Temporarily pause Live progress rendering. + + This avoids Rich Live cursor control interfering with interactive tables/prompts + emitted by cmdlets during preflight (e.g. URL-duplicate confirmation). 
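+
+    Typical cmdlet usage (sketch; `ask_user` stands in for any interactive read):
+
+        with suspend_live_progress():
+            answer = ask_user("URL already in store. Download again? [y/N] ")
+        if answer.strip().lower() != "y":
+            request_pipeline_stop(reason="declined by user")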
+ """ + ui = _LIVE_PROGRESS + paused = False + try: + if ui is not None and hasattr(ui, "pause"): + try: + ui.pause() + paused = True + except Exception: + paused = False + yield + finally: + # If a stage requested the pipeline stop (e.g. user declined a preflight prompt), + # do not resume Live rendering. + if get_pipeline_stop() is not None: + return + if paused and ui is not None and hasattr(ui, "resume"): + try: + ui.resume() + except Exception: + pass + def _is_selectable_table(table: Any) -> bool: """Return True when a table can be used for @ selection.""" @@ -96,6 +139,28 @@ _PENDING_PIPELINE_SOURCE: Optional[str] = None _UI_LIBRARY_REFRESH_CALLBACK: Optional[Any] = None +# ============================================================================ +# PIPELINE STOP SIGNAL +# ============================================================================ + +_PIPELINE_STOP: Optional[Dict[str, Any]] = None + + +def request_pipeline_stop(*, reason: str = "", exit_code: int = 0) -> None: + """Request that the pipeline runner stop gracefully after the current stage.""" + global _PIPELINE_STOP + _PIPELINE_STOP = {"reason": str(reason or "").strip(), "exit_code": int(exit_code)} + + +def get_pipeline_stop() -> Optional[Dict[str, Any]]: + return _PIPELINE_STOP + + +def clear_pipeline_stop() -> None: + global _PIPELINE_STOP + _PIPELINE_STOP = None + + # ============================================================================ # PUBLIC API # ============================================================================ diff --git a/result_table.py b/result_table.py index 9c39891..918273a 100644 --- a/result_table.py +++ b/result_table.py @@ -48,6 +48,177 @@ def _sanitize_cell_text(value: Any) -> str: ) +def _format_duration_hms(duration: Any) -> str: + """Format a duration in seconds into a compact h/m/s string. + + Examples: + 3150 -> "52m30s" + 59 -> "59s" + 3600 -> "1h0m0s" + + If the value is not numeric, returns an empty string. + """ + if duration is None: + return "" + try: + if isinstance(duration, str): + s = duration.strip() + if not s: + return "" + # If it's already formatted (contains letters/colon), leave it to caller. + if any(ch.isalpha() for ch in s) or ":" in s: + return "" + seconds = float(s) + else: + seconds = float(duration) + except Exception: + return "" + + if seconds < 0: + return "" + + total_seconds = int(seconds) + minutes, secs = divmod(total_seconds, 60) + hours, minutes = divmod(minutes, 60) + + parts: List[str] = [] + if hours > 0: + parts.append(f"{hours}h") + if minutes > 0 or hours > 0: + parts.append(f"{minutes}m") + parts.append(f"{secs}s") + return "".join(parts) + + +@dataclass(frozen=True) +class TableColumn: + """Reusable column specification. + + This is intentionally separate from `ResultColumn`: + - `ResultColumn` is a rendered (name,value) pair attached to a single row. + - `TableColumn` is a reusable extractor/formatter used to build rows consistently + across cmdlets and stores. 
+ """ + + key: str + header: str + extractor: Callable[[Any], Any] + + def extract(self, item: Any) -> Any: + try: + return self.extractor(item) + except Exception: + return None + + +def _get_first_dict_value(data: Dict[str, Any], keys: List[str]) -> Any: + for k in keys: + if k in data: + v = data.get(k) + if v is not None and str(v).strip() != "": + return v + return None + + +def _as_dict(item: Any) -> Optional[Dict[str, Any]]: + if isinstance(item, dict): + return item + try: + if hasattr(item, "__dict__"): + return dict(getattr(item, "__dict__")) + except Exception: + return None + return None + + +def extract_store_value(item: Any) -> str: + data = _as_dict(item) or {} + store = _get_first_dict_value(data, ["store", "table", "source", "storage"]) # storage is legacy + return str(store or "").strip() + + +def extract_hash_value(item: Any) -> str: + data = _as_dict(item) or {} + hv = _get_first_dict_value(data, ["hash", "hash_hex", "file_hash", "sha256"]) + return str(hv or "").strip() + + +def extract_title_value(item: Any) -> str: + data = _as_dict(item) or {} + title = _get_first_dict_value(data, ["title", "name", "filename"]) + if not title: + title = _get_first_dict_value(data, ["target", "path", "url"]) # last resort display + return str(title or "").strip() + + +def extract_ext_value(item: Any) -> str: + data = _as_dict(item) or {} + + meta = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + raw_path = data.get("path") or data.get("target") or data.get("filename") or data.get("title") + + ext = ( + _get_first_dict_value(data, ["ext", "file_ext", "extension"]) + or _get_first_dict_value(meta, ["ext", "file_ext", "extension"]) + ) + + if (not ext) and raw_path: + try: + suf = Path(str(raw_path)).suffix + if suf: + ext = suf.lstrip(".") + except Exception: + ext = "" + + ext_str = str(ext or "").strip().lstrip(".") + for idx, ch in enumerate(ext_str): + if not ch.isalnum(): + ext_str = ext_str[:idx] + break + return ext_str[:5] + + +def extract_size_bytes_value(item: Any) -> Optional[int]: + data = _as_dict(item) or {} + meta = data.get("metadata") if isinstance(data.get("metadata"), dict) else {} + + size_val = ( + _get_first_dict_value(data, ["size_bytes", "size", "file_size", "bytes", "filesize"]) + or _get_first_dict_value(meta, ["size_bytes", "size", "file_size", "bytes", "filesize"]) + ) + if size_val is None: + return None + try: + s = str(size_val).strip() + if not s: + return None + # Some sources might provide floats or numeric strings + return int(float(s)) + except Exception: + return None + + +COMMON_COLUMNS: Dict[str, TableColumn] = { + "title": TableColumn("title", "Title", extract_title_value), + "store": TableColumn("store", "Store", extract_store_value), + "hash": TableColumn("hash", "Hash", extract_hash_value), + "ext": TableColumn("ext", "Ext", extract_ext_value), + "size": TableColumn("size", "Size", extract_size_bytes_value), +} + + +def build_display_row(item: Any, *, keys: List[str]) -> Dict[str, Any]: + """Build a dict suitable for `ResultTable.add_result()` using shared column specs.""" + out: Dict[str, Any] = {} + for k in keys: + spec = COMMON_COLUMNS.get(k) + if spec is None: + continue + val = spec.extract(item) + out[spec.key] = val + return out + + @dataclass class InputOption: """Represents an interactive input option (cmdlet argument) in a table. @@ -159,6 +330,12 @@ class ResultRow: break str_value = str_value[:5] + # Normalize Duration columns: providers often pass raw seconds. 
+ if normalized_name.lower() == "duration": + formatted = _format_duration_hms(value) + if formatted: + str_value = formatted + self.columns.append(ResultColumn(normalized_name, str_value)) def get_column(self, name: str) -> Optional[str]: @@ -502,16 +679,12 @@ class ResultTable: # Tag summary if hasattr(result, 'tag_summary') and result.tag_summary: - tag_str = str(result.tag_summary) - if len(tag_str) > 60: - tag_str = tag_str[:57] + "..." - row.add_column("Tag", tag_str) + row.add_column("Tag", str(result.tag_summary)) # Duration (for media) if hasattr(result, 'duration_seconds') and result.duration_seconds: - minutes = int(result.duration_seconds // 60) - seconds = int(result.duration_seconds % 60) - row.add_column("Duration", f"{minutes}m {seconds}s") + dur = _format_duration_hms(result.duration_seconds) + row.add_column("Duration", dur or str(result.duration_seconds)) # Size (for files) if hasattr(result, 'size_bytes') and result.size_bytes: @@ -519,10 +692,7 @@ class ResultTable: # Annotations if hasattr(result, 'annotations') and result.annotations: - ann_str = ", ".join(str(a) for a in result.annotations) - if len(ann_str) > 50: - ann_str = ann_str[:47] + "..." - row.add_column("Annotations", ann_str) + row.add_column("Annotations", ", ".join(str(a) for a in result.annotations)) def _add_result_item(self, row: ResultRow, item: Any) -> None: """Extract and add ResultItem fields to row (compact display for search results). @@ -550,7 +720,7 @@ class ResultTable: title = path_obj.stem if title: - row.add_column("Title", title[:90] + ("..." if len(title) > 90 else "")) + row.add_column("Title", title) # Extension column - always add to maintain column order row.add_column("Ext", extension) @@ -573,12 +743,9 @@ class ResultTable: All data preserved in TagItem for piping and operations. Tag row selection is handled by the CLI pipeline (e.g. `@N | ...`). """ - # Tag name (truncate if too long) + # Tag name if hasattr(item, 'tag_name') and item.tag_name: - tag_name = item.tag_name - if len(tag_name) > 60: - tag_name = tag_name[:57] + "..." - row.add_column("Tag", tag_name) + row.add_column("Tag", item.tag_name) # Source/Store (where the tag values come from) if hasattr(item, 'source') and item.source: @@ -593,14 +760,11 @@ class ResultTable: # Title if hasattr(obj, 'title') and obj.title: - row.add_column("Title", obj.title[:50] + ("..." if len(obj.title) > 50 else "")) + row.add_column("Title", obj.title) # File info if hasattr(obj, 'path') and obj.path: - file_str = str(obj.path) - if len(file_str) > 60: - file_str = "..." + file_str[-57:] - row.add_column("Path", file_str) + row.add_column("Path", str(obj.path)) # Tag if hasattr(obj, 'tag') and obj.tag: @@ -611,7 +775,8 @@ class ResultTable: # Duration if hasattr(obj, 'duration') and obj.duration: - row.add_column("Duration", f"{obj.duration:.1f}s") + dur = _format_duration_hms(obj.duration) + row.add_column("Duration", dur or str(obj.duration)) # Warnings if hasattr(obj, 'warnings') and obj.warnings: @@ -652,6 +817,29 @@ class ResultTable: # Strip out hidden metadata fields (prefixed with __) visible_data = {k: v for k, v in data.items() if not is_hidden_field(k)} + # Normalize common fields using shared extractors so nested metadata/path values work. + # This keeps Ext/Size/Store consistent across all dict-based result sources. 
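+        # e.g. {"title": "clip.mp4", "metadata": {"size": "1048576"}} gains
+        # ext="mp4" and size_bytes=1048576 without any provider changes.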
+ try: + store_extracted = extract_store_value(data) + if store_extracted and "store" not in visible_data and "table" not in visible_data and "source" not in visible_data: + visible_data["store"] = store_extracted + except Exception: + pass + + try: + ext_extracted = extract_ext_value(data) + # Always ensure `ext` exists so priority_groups keeps a stable column. + visible_data["ext"] = str(ext_extracted or "") + except Exception: + visible_data.setdefault("ext", "") + + try: + size_extracted = extract_size_bytes_value(data) + if size_extracted is not None and "size_bytes" not in visible_data and "size" not in visible_data: + visible_data["size_bytes"] = size_extracted + except Exception: + pass + # Handle extension separation for local files store_val = str(visible_data.get('store', '') or visible_data.get('table', '') or visible_data.get('source', '')).lower() @@ -671,7 +859,7 @@ class ResultTable: # print(f"DEBUG: Split extension. Title: {visible_data[title_field]}, Ext: {extension}") else: visible_data['ext'] = "" - + # Ensure 'ext' is present so it gets picked up by priority_groups in correct order if 'ext' not in visible_data: visible_data['ext'] = "" @@ -699,9 +887,26 @@ class ResultTable: continue if column_count >= self.max_columns: break - col_value_str = format_value(col_value) - if len(col_value_str) > 60: - col_value_str = col_value_str[:57] + "..." + # When providers supply raw numeric fields, keep formatting consistent. + if isinstance(col_name, str) and col_name.strip().lower() == "size": + try: + if col_value is None or str(col_value).strip() == "": + col_value_str = "" + else: + col_value_str = _format_size(col_value, integer_only=False) + except Exception: + col_value_str = format_value(col_value) + elif isinstance(col_name, str) and col_name.strip().lower() == "duration": + try: + if col_value is None or str(col_value).strip() == "": + col_value_str = "" + else: + dur = _format_duration_hms(col_value) + col_value_str = dur or format_value(col_value) + except Exception: + col_value_str = format_value(col_value) + else: + col_value_str = format_value(col_value) row.add_column(col_name, col_value_str) added_fields.add(col_name.lower()) column_count += 1 @@ -743,9 +948,6 @@ class ResultTable: else: value_str = format_value(visible_data[field]) - if len(value_str) > 60: - value_str = value_str[:57] + "..." - # Map field names to display column names if field in ['store', 'table', 'source']: col_name = "Store" @@ -777,11 +979,7 @@ class ResultTable: if key.startswith('_'): # Skip private attributes continue - value_str = str(value) - if len(value_str) > 60: - value_str = value_str[:57] + "..." 
- - row.add_column(key.replace('_', ' ').title(), value_str) + row.add_column(key.replace('_', ' ').title(), str(value)) def to_rich(self): """Return a Rich renderable representing this table.""" diff --git a/tool/playwright.py b/tool/playwright.py index 0f7c6fe..5b58ac0 100644 --- a/tool/playwright.py +++ b/tool/playwright.py @@ -41,8 +41,8 @@ class PlaywrightDefaults: "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ) - viewport_width: int = 1280 - viewport_height: int = 1200 + viewport_width: int = 1920 + viewport_height: int = 1080 navigation_timeout_ms: int = 90_000 ignore_https_errors: bool = True @@ -149,6 +149,16 @@ class PlaywrightTool: vh = self.defaults.viewport_height if viewport_height is None else int(viewport_height) ihe = self.defaults.ignore_https_errors if ignore_https_errors is None else bool(ignore_https_errors) + # Support Playwright-native headers/user-agent. + # If user_agent is unset/empty or explicitly set to one of these tokens, + # we omit the user_agent override so Playwright uses its bundled Chromium UA. + ua_value: Optional[str] + ua_text = str(ua or "").strip() + if not ua_text or ua_text.lower() in {"native", "playwright", "default"}: + ua_value = None + else: + ua_value = ua_text + pw = None browser = None context = None @@ -164,11 +174,14 @@ class PlaywrightTool: headless=h, args=["--disable-blink-features=AutomationControlled"], ) - context = browser.new_context( - user_agent=ua, - viewport={"width": vw, "height": vh}, - ignore_https_errors=ihe, - ) + context_kwargs: Dict[str, Any] = { + "viewport": {"width": vw, "height": vh}, + "ignore_https_errors": ihe, + } + if ua_value is not None: + context_kwargs["user_agent"] = ua_value + + context = browser.new_context(**context_kwargs) page = context.new_page() yield page finally: diff --git a/tool/ytdlp.py b/tool/ytdlp.py index f379565..b68c722 100644 --- a/tool/ytdlp.py +++ b/tool/ytdlp.py @@ -89,6 +89,11 @@ class YtDlpTool: def _load_defaults(self) -> YtDlpDefaults: cfg = self._config + # NOTE: `YtDlpDefaults` is a slots dataclass. Referencing defaults via + # `YtDlpDefaults.video_format` yields a `member_descriptor`, not the + # default string value. Use an instance for fallback defaults. + _fallback_defaults = YtDlpDefaults() + tool_block = _get_nested(cfg, "tool", "ytdlp") if not isinstance(tool_block, dict): tool_block = {} @@ -128,8 +133,8 @@ class YtDlpTool: fmt_sort = _parse_csv_list(fmt_sort_val) defaults = YtDlpDefaults( - video_format=str(nested_video or video_format or YtDlpDefaults.video_format), - audio_format=str(nested_audio or audio_format or YtDlpDefaults.audio_format), + video_format=str(nested_video or video_format or _fallback_defaults.video_format), + audio_format=str(nested_audio or audio_format or _fallback_defaults.audio_format), format_sort=fmt_sort, )
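
A standalone illustration of the slots-dataclass pitfall the ytdlp change works
around (sketch; the field default below is a stand-in, not the project's real
format string; dataclass slots require Python 3.10+):

    from dataclasses import dataclass

    @dataclass(slots=True)
    class Defaults:
        video_format: str = "bestvideo+bestaudio"

    # On a slots dataclass the class attribute is the slot descriptor, not the
    # default value -- and descriptors are truthy, so a fallback like
    # `x or Defaults.video_format` never fails loudly; str() just yields the repr.
    print(Defaults.video_format)    # <member 'video_format' of 'Defaults' objects>
    print(Defaults().video_format)  # bestvideo+bestaudio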