diff --git a/API/data/alldebrid.json b/API/data/alldebrid.json index 8b0c686..501748d 100644 --- a/API/data/alldebrid.json +++ b/API/data/alldebrid.json @@ -71,7 +71,7 @@ "(wayupload\\.com/[a-z0-9]{12}\\.html)" ], "regexp": "(turbobit5?a?\\.(net|cc|com)/([a-z0-9]{12}))|(turbobif\\.(net|cc|com)/([a-z0-9]{12}))|(turb[o]?\\.(to|cc|pw)\\/([a-z0-9]{12}))|(turbobit\\.(net|cc)/download/free/([a-z0-9]{12}))|((trbbt|tourbobit|torbobit|tbit|turbobita|trbt)\\.(net|cc|com|to)/([a-z0-9]{12}))|((turbobit\\.cloud/turbo/[a-z0-9]+))|((wayupload\\.com/[a-z0-9]{12}\\.html))", - "status": true + "status": false }, "hitfile": { "name": "hitfile", @@ -92,7 +92,7 @@ "(hitfile\\.net/[a-z0-9A-Z]{4,9})" ], "regexp": "(hitf\\.(to|cc)/([a-z0-9A-Z]{4,9}))|(htfl\\.(net|to|cc)/([a-z0-9A-Z]{4,9}))|(hitfile\\.(net)/download/free/([a-z0-9A-Z]{4,9}))|((hitfile\\.net/[a-z0-9A-Z]{4,9}))", - "status": true + "status": false }, "mega": { "name": "mega", @@ -478,10 +478,10 @@ "katfile.vip" ], "regexps": [ - "katfile\\.(cloud|online|vip)/([0-9a-zA-Z]{12})", + "katfile\\.(cloud|online|vip|ws)/([0-9a-zA-Z]{12})", "(katfile\\.com/[0-9a-zA-Z]{12})" ], - "regexp": "(katfile\\.(cloud|online|vip)/([0-9a-zA-Z]{12}))|((katfile\\.com/[0-9a-zA-Z]{12}))", + "regexp": "(katfile\\.(cloud|online|vip|ws)/([0-9a-zA-Z]{12}))|((katfile\\.com/[0-9a-zA-Z]{12}))", "status": true }, "mediafire": { @@ -494,7 +494,7 @@ "mediafire\\.com/(\\?|download/|file/|download\\.php\\?)([0-9a-z]{15})" ], "regexp": "mediafire\\.com/(\\?|download/|file/|download\\.php\\?)([0-9a-z]{15})", - "status": false + "status": true }, "mixdrop": { "name": "mixdrop", @@ -618,7 +618,7 @@ "(upload42\\.com/[0-9a-zA-Z]{12})" ], "regexp": "(upload42\\.com/[0-9a-zA-Z]{12})", - "status": true + "status": false }, "uploadbank": { "name": "uploadbank", diff --git a/MPV/lyric.py b/MPV/lyric.py index 51ce481..91565c0 100644 --- a/MPV/lyric.py +++ b/MPV/lyric.py @@ -60,6 +60,7 @@ _LYRIC_VISIBLE_PROP = "user-data/medeia-lyric-visible" # to a store via the store DB. _ITEM_STORE_PROP = "user-data/medeia-item-store" _ITEM_HASH_PROP = "user-data/medeia-item-hash" +_LEGACY_SUB_TRACK_TITLES = ("medeia-note-sub", "medeia-lyric-sub", "medeia-sub") # Note: We previously used `osd-overlay`, but some mpv builds return # error='invalid parameter' for that command. We now use `show-text`, which is @@ -540,6 +541,39 @@ def _lyric_duration_ms(idx: int, times: List[float], current_t: float) -> int: return 1200 +def _format_vtt_timestamp(seconds: float) -> str: + total_ms = max(0, int(round(float(seconds or 0.0) * 1000.0))) + hours = total_ms // 3600000 + minutes = (total_ms // 60000) % 60 + secs = (total_ms // 1000) % 60 + millis = total_ms % 1000 + return f"{hours:02d}:{minutes:02d}:{secs:02d}.{millis:03d}" + + +def _lrc_entries_to_vtt_text(entries: List[LrcLine]) -> str: + if not entries: + return "WEBVTT\n\n" + + lines: List[str] = ["WEBVTT", ""] + times = [entry.time_s for entry in entries] + for idx, entry in enumerate(entries, start=1): + start_s = max(0.0, float(entry.time_s or 0.0)) + if idx < len(entries): + end_s = max(start_s + 0.25, float(times[idx])) + else: + end_s = start_s + 1.2 + + text = str(entry.text or "").replace("\r\n", "\n").replace("\r", "\n") + cue_text = text if text.strip() else " " + + lines.append(str(idx)) + lines.append(f"{_format_vtt_timestamp(start_s)} --> {_format_vtt_timestamp(end_s)}") + lines.extend(cue_text.split("\n")) + lines.append("") + + return "\n".join(lines) + + def _unwrap_memory_m3u(text: Optional[str]) -> Optional[str]: """Extract the real target URL/path from a memory:// M3U payload.""" if not isinstance(text, str) or not text.startswith("memory://"): @@ -596,6 +630,12 @@ def _notes_cache_root() -> Path: return root +def _generated_sub_root() -> Path: + root = Path(tempfile.gettempdir()) / "medeia-mpv-notes" + root.mkdir(parents=True, exist_ok=True) + return root + + def _notes_cache_key(store: str, file_hash: str) -> str: return hashlib.sha1( f"{str(store or '').strip().lower()}:{str(file_hash or '').strip().lower()}".encode( @@ -790,6 +830,26 @@ def _extract_note_text(notes: Dict[str, str], name: str) -> Optional[str]: return text if text.strip() else None +def _extract_first_note_text( + notes: Dict[str, str], + names: List[str], + *, + predicate: Optional[Any] = None, +) -> tuple[Optional[str], Optional[str]]: + for name in names: + candidate = _extract_note_text(notes, name) + if not candidate: + continue + if predicate is not None: + try: + if not bool(predicate(candidate)): + continue + except Exception: + continue + return name, candidate + return None, None + + def _extract_lrc_from_notes(notes: Dict[str, str]) -> Optional[str]: """Return raw LRC text from the note named 'lyric'.""" return _extract_note_text(notes, "lyric") @@ -811,18 +871,61 @@ def _looks_like_subtitle_text(text: str) -> bool: return False -def _extract_sub_from_notes(notes: Dict[str, str]) -> Optional[str]: - """Return raw subtitle text from note-backed subtitle/transcript keys.""" +def _extract_sub_from_notes(notes: Dict[str, str]) -> tuple[Optional[str], Optional[str]]: + """Return (note_name, subtitle_text) from note-backed subtitle/transcript keys.""" primary = _extract_note_text(notes, "sub") if primary: - return primary - for note_name in _SUBTITLE_NOTE_ALIASES: - candidate = _extract_note_text(notes, note_name) - if candidate and _looks_like_subtitle_text(candidate): - return candidate + return "sub", primary + return _extract_first_note_text( + notes, + list(_SUBTITLE_NOTE_ALIASES), + predicate=_looks_like_subtitle_text, + ) + + +def _display_note_name(note_name: Optional[str]) -> str: + text = re.sub(r"\s+", " ", str(note_name or "").replace("_", " ")).strip() + if not text: + return "subtitle" + lowered = text.casefold() + if lowered == "lyric": + return "lyrics" + if lowered == "sub": + return "subtitles" + return text + + +def _display_media_title(client: MPVIPCClient) -> Optional[str]: + for key in ("metadata/by-key/title", "metadata/by-key/Title", "media-title"): + try: + value = _ipc_get_property(client, key, None) + except Exception: + value = None + if isinstance(value, str): + text = re.sub(r"\s+", " ", value).strip() + if text: + return text return None +def _generated_subtitle_title(client: MPVIPCClient, *, note_name: Optional[str]) -> str: + note_label = _display_note_name(note_name) + media_title = _display_media_title(client) + if media_title: + title = f"{note_label}: {media_title}" + else: + title = note_label + title = re.sub(r"\s+", " ", title).strip() + return title[:96] if len(title) > 96 else title + + +def _filename_slug(text: Optional[str], *, default: str) -> str: + value = re.sub(r"[^A-Za-z0-9._ -]+", " ", str(text or "")) + value = re.sub(r"\s+", "-", value).strip("- ._") + value = value[:48] + return value or default + + def _infer_sub_extension(text: str) -> str: # Best-effort: mpv generally understands SRT/VTT; choose based on content. t = (text or "").lstrip("\ufeff\r\n").lstrip() @@ -839,39 +942,122 @@ def _infer_sub_extension(text: str) -> str: return ".vtt" -def _write_temp_sub_file(*, key: str, text: str) -> Path: +def _write_temp_sub_file(*, key: str, text: str, label: Optional[str] = None) -> Path: # Write to a content-addressed temp path so updates force mpv reload. - tmp_dir = Path(tempfile.gettempdir()) / "medeia-mpv-notes" - tmp_dir.mkdir(parents=True, exist_ok=True) + tmp_dir = _generated_sub_root() ext = _infer_sub_extension(text) digest = hashlib.sha1((key + "\n" + (text or "")).encode("utf-8", errors="ignore") ).hexdigest()[:16] - safe_key = hashlib.sha1((key or "").encode("utf-8", - errors="ignore")).hexdigest()[:12] - path = (tmp_dir / f"sub-{safe_key}-{digest}{ext}").resolve() + prefix = _filename_slug(label, default="subtitle") + path = (tmp_dir / f"{prefix}-{digest}{ext}").resolve() path.write_text(text or "", encoding="utf-8", errors="replace") return path -def _try_remove_selected_external_sub(client: MPVIPCClient) -> None: +def _subtitle_track_snapshot(client: MPVIPCClient) -> List[Dict[str, Any]]: + raw = _ipc_get_property(client, "track-list", []) + return raw if isinstance(raw, list) else [] + + +def _track_external_sub_path(track: Dict[str, Any]) -> Optional[Path]: + if not isinstance(track, dict): + return None + for key in ("external-filename", "external_filename", "demux-filename", "demux_filename"): + raw = track.get(key) + if not isinstance(raw, str): + continue + text = raw.strip() + if not text: + continue + try: + return Path(text).expanduser().resolve() + except Exception: + return Path(text) + return None + + +def _is_medeia_generated_sub_track(track: Dict[str, Any]) -> bool: + if not isinstance(track, dict): + return False + title = str(track.get("title") or "").strip() + if title in _LEGACY_SUB_TRACK_TITLES: + return True + path = _track_external_sub_path(track) + if path is None: + return False try: - client.send_command({ - "command": ["sub-remove"] - }) + path.relative_to(_generated_sub_root().resolve()) + return True except Exception: + return False + + +def _find_medeia_sub_track_ids(client: MPVIPCClient) -> List[int]: + out: List[int] = [] + for track in _subtitle_track_snapshot(client): + if not isinstance(track, dict): + continue + if str(track.get("type") or "") != "sub": + continue + if not _is_medeia_generated_sub_track(track): + continue + try: + track_id = int(track.get("id")) + except Exception: + continue + out.append(track_id) + return out + + +def _log_medeia_sub_tracks(client: MPVIPCClient, reason: str) -> None: + parts: List[str] = [] + for track in _subtitle_track_snapshot(client): + if not isinstance(track, dict): + continue + if str(track.get("type") or "") != "sub": + continue + if not _is_medeia_generated_sub_track(track): + continue + title = str(track.get("title") or "").strip() + source = _track_external_sub_path(track) + parts.append( + f"id={track.get('id')}" + f" title={title!r}" + f" selected={bool(track.get('selected'))}" + f" external={bool(track.get('external'))}" + f" source={source.name if source is not None else ''}" + ) + if parts: + _log(f"Medeia subtitle tracks {reason}: " + " | ".join(parts)) + else: + _log(f"Medeia subtitle tracks {reason}: ") + + +def _remove_medeia_external_subs(client: MPVIPCClient, *, reason: str = "") -> None: + track_ids = _find_medeia_sub_track_ids(client) + if not track_ids: return + _log(f"Removing Medeia subtitle tracks reason={reason or 'unknown'} ids={track_ids}") + for track_id in track_ids: + try: + client.send_command({ + "command": ["sub-remove", int(track_id)] + }) + except Exception: + continue + _log_medeia_sub_tracks(client, f"after-remove:{reason or 'unknown'}") -def _try_add_external_sub(client: MPVIPCClient, path: Path) -> None: +def _try_add_external_sub(client: MPVIPCClient, path: Path, *, title: str) -> None: try: client.send_command( { "command": ["sub-add", str(path), "select", - "medeia-sub"] + str(title or _NOTE_SUB_TRACK_TITLE)] } ) except Exception: @@ -1099,7 +1285,7 @@ class _PlaybackState: entries: List[LrcLine] = field(default_factory=list) times: List[float] = field(default_factory=list) loaded_key: Optional[str] = None - loaded_mode: Optional[str] = None # 'lyric' | 'sub' | None + loaded_mode: Optional[str] = None # 'lyric' | 'sub' | 'lyric-sub' | None loaded_sub_path: Optional[Path] = None last_target: Optional[str] = None fetch_attempt_key: Optional[str] = None @@ -1130,7 +1316,7 @@ class _PlaybackState: self.loaded_key = None self.loaded_mode = None if self.loaded_sub_path is not None: - _try_remove_selected_external_sub(client) + _remove_medeia_external_subs(client, reason="state-clear") self.loaded_sub_path = None @@ -1153,6 +1339,7 @@ def run_auto_overlay( return 3 _log(f"Auto overlay connected (ipc={getattr(mpv, 'ipc_path', None)})") + _remove_medeia_external_subs(client, reason="startup-sweep") state = _PlaybackState() last_idx: Optional[int] = None @@ -1196,6 +1383,12 @@ def run_auto_overlay( if not client.connect(): _log("mpv IPC disconnected; exiting MPV.lyric") return 4 + _remove_medeia_external_subs(client, reason="reconnect-sweep:path") + state.clear(client) + state.last_target = None + last_idx = None + last_text = None + last_visible = None time.sleep(poll_s) continue @@ -1207,11 +1400,14 @@ def run_auto_overlay( last_visible = visible elif last_visible is True and visible is False: _osd_clear_and_restore(client) - _try_remove_selected_external_sub(client) + _remove_medeia_external_subs(client, reason="visibility-off") + state.loaded_sub_path = None last_idx = None last_text = None last_visible = visible elif last_visible is False and visible is True: + if state.loaded_mode in {"sub", "lyric-sub"} and state.loaded_sub_path is None: + state.loaded_key = None last_idx = None last_text = None last_visible = visible @@ -1459,21 +1655,26 @@ def run_auto_overlay( except Exception: _log("Loaded notes keys: ") - sub_text = _extract_note_text(notes, "sub") + sub_note_name, sub_text = _extract_sub_from_notes(notes) if sub_text: # Hand subtitles to mpv's track subsystem; suppress OSD lyric overlay. _osd_clear_and_restore(client) sub_path: Optional[Path] = None + sub_title = _generated_subtitle_title(client, note_name=sub_note_name) try: - sub_path = _write_temp_sub_file(key=state.key, text=sub_text) + sub_path = _write_temp_sub_file(key=state.key, text=sub_text, label=sub_title) except Exception as exc: _log(f"Failed to write sub note temp file: {exc}") if sub_path is not None: - if state.loaded_sub_path is not None: - _try_remove_selected_external_sub(client) - _try_add_external_sub(client, sub_path) + _remove_medeia_external_subs(client, reason="load-note-sub") + _try_add_external_sub(client, sub_path, title=sub_title) state.loaded_sub_path = sub_path + _log( + f"Loaded note-backed native subtitle track" + f" note={sub_note_name!r} title={sub_title!r} path={sub_path}" + ) + _log_medeia_sub_tracks(client, "after-add-note-sub") state.entries = [] state.times = [] @@ -1481,12 +1682,12 @@ def run_auto_overlay( state.loaded_mode = "sub" else: - # Switching away from sub mode: unload the external subtitle. - if state.loaded_mode == "sub" and state.loaded_sub_path is not None: - _try_remove_selected_external_sub(client) + # Switching away from native subtitle mode: unload the external subtitle. + if state.loaded_sub_path is not None: + _remove_medeia_external_subs(client, reason="switch-away-native-sub") state.loaded_sub_path = None - lrc_text = _extract_note_text(notes, "lyric") + lrc_text = _extract_lrc_from_notes(notes) if not lrc_text: _log("No lyric note found (note name: 'lyric')") @@ -1569,10 +1770,47 @@ def run_auto_overlay( else: _log(f"Loaded lyric note ({len(lrc_text)} chars)") parsed = parse_lrc(lrc_text) - state.entries = parsed - state.times = [e.time_s for e in parsed] - state.loaded_key = state.key - state.loaded_mode = "lyric" + if not parsed: + _log("Lyric note contained no timestamped entries") + _osd_clear_and_restore(client) + state.entries = [] + state.times = [] + state.loaded_key = state.key + state.loaded_mode = None + else: + lyric_sub_path: Optional[Path] = None + lyric_sub_title = _generated_subtitle_title(client, note_name="lyric") + try: + lyric_sub_text = _lrc_entries_to_vtt_text(parsed) + lyric_sub_path = _write_temp_sub_file( + key=f"{state.key}:lyric", + text=lyric_sub_text, + label=lyric_sub_title, + ) + except Exception as exc: + _log(f"Failed to write lyric note temp subtitle: {exc}") + + if lyric_sub_path is None: + _osd_clear_and_restore(client) + state.entries = [] + state.times = [] + state.loaded_key = state.key + state.loaded_mode = None + else: + _osd_clear_and_restore(client) + _remove_medeia_external_subs(client, reason="load-lyric-sub") + _try_add_external_sub(client, lyric_sub_path, title=lyric_sub_title) + state.loaded_sub_path = lyric_sub_path + state.entries = [] + state.times = [] + state.loaded_key = state.key + state.loaded_mode = "lyric-sub" + _log( + f"Loaded lyric note as native subtitle track" + f" title={lyric_sub_title!r} entries={len(parsed)}" + f" path={lyric_sub_path}" + ) + _log_medeia_sub_tracks(client, "after-add-lyric-sub") # ---------------------------------------------------------------- # 8. Render the current lyric line. @@ -1590,6 +1828,12 @@ def run_auto_overlay( if not client.connect(): _log("mpv IPC disconnected; exiting MPV.lyric") return 4 + _remove_medeia_external_subs(client, reason="reconnect-sweep:time") + state.clear(client) + state.last_target = None + last_idx = None + last_text = None + last_visible = None time.sleep(poll_s) continue diff --git a/MPV/pipeline_helper.py b/MPV/pipeline_helper.py index fcb63de..2aed604 100644 --- a/MPV/pipeline_helper.py +++ b/MPV/pipeline_helper.py @@ -756,7 +756,6 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: refresh = bool(data.get("refresh") or data.get("reload")) if cached_choices and not refresh: - debug(f"[store-choices] using cached choices={len(cached_choices)}") return { "success": True, "stdout": "", @@ -767,20 +766,14 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: } try: - config_root = _runtime_config_root() choices = _load_store_choices_from_config(force_reload=refresh) if not choices and cached_choices: choices = cached_choices - debug( - f"[store-choices] config returned empty; falling back to cached choices={len(choices)}" - ) if choices: choices = _set_cached_store_choices(choices) - debug(f"[store-choices] config_dir={config_root} choices={len(choices)}") - return { "success": True, "stdout": "", @@ -791,9 +784,6 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: } except Exception as exc: if cached_choices: - debug( - f"[store-choices] refresh failed; returning cached choices={len(cached_choices)} error={type(exc).__name__}: {exc}" - ) return { "success": True, "stdout": "", diff --git a/SYS/cmdlet_spec.py b/SYS/cmdlet_spec.py index 8699db6..5242d37 100644 --- a/SYS/cmdlet_spec.py +++ b/SYS/cmdlet_spec.py @@ -101,10 +101,7 @@ class SharedArgs: if not force and hasattr(SharedArgs, "_cached_available_stores"): return SharedArgs._cached_available_stores or [] - if not force: - SharedArgs._refresh_store_choices_cache(config, skip_instantiation=True) - else: - SharedArgs._refresh_store_choices_cache(config, skip_instantiation=False) + SharedArgs._refresh_store_choices_cache(config, skip_instantiation=False) return SharedArgs._cached_available_stores or [] @staticmethod @@ -119,13 +116,7 @@ class SharedArgs: SharedArgs._cached_available_stores = [] return - try: - from Store.registry import list_configured_backend_names - - SharedArgs._cached_available_stores = list_configured_backend_names(config) or [] - except Exception: - SharedArgs._cached_available_stores = [] - + SharedArgs._cached_available_stores = [] if skip_instantiation: return diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index bb02fed..7783785 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -228,23 +228,18 @@ class SharedArgs: if not force and hasattr(SharedArgs, "_cached_available_stores"): return SharedArgs._cached_available_stores or [] - # Refresh the cache. When not forcing, prefer a lightweight configured-name - # pass to avoid instantiating backends (which may perform work such as opening DBs). - if not force: - SharedArgs._refresh_store_choices_cache(config, skip_instantiation=True) - else: - SharedArgs._refresh_store_choices_cache(config, skip_instantiation=False) + # Autocomplete and shared arg choices must only expose backends that actually + # initialized successfully. Do a full refresh when the cache is missing. + SharedArgs._refresh_store_choices_cache(config, skip_instantiation=False) return SharedArgs._cached_available_stores or [] @staticmethod def _refresh_store_choices_cache(config: Optional[Dict[str, Any]] = None, skip_instantiation: bool = False) -> None: """Refresh the cached store choices list. Should be called once at startup. - This performs a lightweight pass first (reads configured names only, without - instantiating backend classes) to avoid side-effects during autocompletion or - other quick lookups. When `skip_instantiation` is False, the function will - attempt a full StoreRegistry initialization to filter out backends that failed - to initialize properly. + Store choices are user-facing and should only include backends that actually + initialized successfully. When `skip_instantiation` is True, this method keeps + the cache empty rather than surfacing configured-but-disabled store names. Args: config: Config dict. If not provided, will try to load from config module. @@ -259,15 +254,10 @@ class SharedArgs: SharedArgs._cached_available_stores = [] return - # Lightweight pass: return configured names without instantiating backends - try: - from Store.registry import list_configured_backend_names - SharedArgs._cached_available_stores = list_configured_backend_names(config) or [] - except Exception: - SharedArgs._cached_available_stores = [] + SharedArgs._cached_available_stores = [] - # If caller explicitly requested a full scan, instantiate registry to get - # only backends that actually initialized successfully. + # If caller requested a lightweight pass, avoid exposing configured names + # that may be disabled or unavailable. if skip_instantiation: return @@ -278,7 +268,6 @@ class SharedArgs: if available: SharedArgs._cached_available_stores = available except Exception: - # Keep the lightweight list if full initialization fails pass except Exception: SharedArgs._cached_available_stores = [] diff --git a/cmdlet/add_file.py b/cmdlet/add_file.py index 9cca9bd..5c0ce1a 100644 --- a/cmdlet/add_file.py +++ b/cmdlet/add_file.py @@ -338,6 +338,14 @@ class Add_File(Cmdlet): except Exception: is_storage_backend_location = False + if location and not plugin_name and not is_storage_backend_location: + if not Add_File._looks_like_local_export_target(str(location)): + log( + f"Storage backend '{location}' not found. Use -path for local export or configure that store backend.", + file=sys.stderr, + ) + return 1 + # Decide which items to process. # - If directory scan was performed, use those results # - If user provided -path (and it was not reinterpreted as destination), treat this invocation as single-item. @@ -1262,6 +1270,27 @@ class Add_File(Cmdlet): pass return None + @staticmethod + def _looks_like_local_export_target(location: str) -> bool: + target = str(location or "").strip() + if not target: + return False + + target_path = Path(target).expanduser() + try: + if target_path.exists(): + return True + except Exception: + pass + + if target.startswith((".", "~")): + return True + if "\\" in target or "/" in target: + return True + if len(target) >= 2 and target[1] == ":": + return True + return False + @staticmethod def _resolve_source( result: Any, diff --git a/cmdlet/screen_shot.py b/cmdlet/screen_shot.py index 3185975..42edd49 100644 --- a/cmdlet/screen_shot.py +++ b/cmdlet/screen_shot.py @@ -7,6 +7,7 @@ Playwright, marking them as temporary artifacts for cleanup. from __future__ import annotations import hashlib +import io import sys import tempfile import time @@ -17,18 +18,17 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple from urllib.parse import urlsplit, quote, urljoin, unquote -from SYS.logger import log, debug, is_debug_enabled +from SYS.logger import debug_panel, log, is_debug_enabled from SYS.item_accessors import extract_item_tags, get_result_title from API.HTTP import HTTPClient from SYS.pipeline_progress import PipelineProgress -from SYS.utils import ensure_directory, unique_path, unique_preserve_order +from SYS.utils import ensure_directory, sha256_file, unique_path, unique_preserve_order from . import _shared as sh Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg SharedArgs = sh.SharedArgs create_pipe_object_result = sh.create_pipe_object_result -coerce_to_pipe_object = sh.coerce_to_pipe_object normalize_result_input = sh.normalize_result_input should_show_help = sh.should_show_help get_field = sh.get_field @@ -133,6 +133,8 @@ class ScreenshotOptions: prefer_platform_target: bool = False target_selectors: Optional[Sequence[str]] = None selector_timeout_ms: int = 10_000 + interactive_pick: bool = False + interactive_pick_timeout_s: float = 120.0 playwright_tool: Optional[PlaywrightTool] = None @@ -144,6 +146,8 @@ class ScreenshotResult: tag_applied: List[str] archive_url: List[str] url: List[str] + capture_mode: str = "" + capture_target: str = "" warnings: List[str] = field(default_factory=list) @@ -267,6 +271,581 @@ def _format_suffix(fmt: str) -> str: return f".{fmt}" +def _normalize_capture_mode(value: Optional[str]) -> str: + mode = str(value or "").strip().lower() + if mode in {"full", "page", "fullscreen"}: + return "full" + if mode in {"pick", "picker", "interactive", "element", "select"}: + return "interactive" + return "" + + +def _stdin_interactive() -> bool: + try: + return bool(sys.stdin and sys.stdin.isatty()) + except Exception: + return False + + +def _debug_rows(rows: Sequence[tuple[str, Any]]) -> List[tuple[str, Any]]: + normalized: List[tuple[str, Any]] = [] + for key, value in rows: + if isinstance(value, (list, tuple, set)): + value = ", ".join(str(item) for item in value) if value else "" + elif isinstance(value, Path): + value = str(value) + elif value in (None, ""): + value = "" + normalized.append((str(key), value)) + return normalized + + +def _show_debug_panel( + title: str, + rows: Sequence[tuple[str, Any]], + *, + border_style: str = "cyan", +) -> None: + try: + debug_panel(title, _debug_rows(rows), border_style=border_style) + except Exception: + pass + + +def _install_element_picker(page: Any) -> None: + page.evaluate( + """ + () => { + try { + if (typeof window.__medeiaPickerCleanup === 'function') { + window.__medeiaPickerCleanup(); + } + + window.__medeiaPickerResult = null; + + const cssEscape = (value) => { + try { + if (window.CSS && typeof window.CSS.escape === 'function') { + return window.CSS.escape(String(value || '')); + } + } catch (e) {} + return String(value || '').replace(/[^a-zA-Z0-9_-]/g, '\\$&'); + }; + + const buildSelector = (element) => { + if (!(element instanceof Element)) return ''; + if (element.id) return '#' + cssEscape(element.id); + const parts = []; + let node = element; + while (node && node.nodeType === 1 && parts.length < 8) { + let part = String(node.localName || node.tagName || '').toLowerCase(); + if (!part) break; + const classes = Array.from(node.classList || []).filter(Boolean).slice(0, 2); + if (classes.length) { + part += classes.map((name) => '.' + cssEscape(name)).join(''); + } + const parent = node.parentElement; + if (parent) { + const siblings = Array.from(parent.children).filter((child) => child.localName === node.localName); + if (siblings.length > 1) { + part += `:nth-of-type(${siblings.indexOf(node) + 1})`; + } + } + parts.unshift(part); + const selector = parts.join(' > '); + try { + if (document.querySelectorAll(selector).length === 1) { + return selector; + } + } catch (e) {} + node = parent; + } + return parts.join(' > '); + }; + + const box = document.createElement('div'); + box.setAttribute('data-medeia-picker', 'box'); + box.style.position = 'fixed'; + box.style.pointerEvents = 'none'; + box.style.zIndex = '2147483646'; + box.style.border = '2px solid #ffb000'; + box.style.background = 'rgba(255, 176, 0, 0.12)'; + box.style.boxShadow = '0 0 0 99999px rgba(0, 0, 0, 0.12)'; + box.style.display = 'none'; + + const banner = document.createElement('div'); + banner.setAttribute('data-medeia-picker', 'banner'); + banner.style.position = 'fixed'; + banner.style.top = '12px'; + banner.style.left = '50%'; + banner.style.transform = 'translateX(-50%)'; + banner.style.zIndex = '2147483647'; + banner.style.padding = '10px 14px'; + banner.style.background = 'rgba(18, 18, 18, 0.92)'; + banner.style.color = '#ffffff'; + banner.style.font = '13px/1.4 sans-serif'; + banner.style.borderRadius = '10px'; + banner.style.boxShadow = '0 8px 24px rgba(0, 0, 0, 0.35)'; + banner.style.maxWidth = 'min(90vw, 920px)'; + banner.style.pointerEvents = 'none'; + banner.textContent = 'Medeia screenshot picker: hover an element, click to capture it, or press Escape to cancel.'; + + const updateBox = (element) => { + if (!(element instanceof Element)) { + box.style.display = 'none'; + return; + } + const rect = element.getBoundingClientRect(); + box.style.display = 'block'; + box.style.left = rect.left + 'px'; + box.style.top = rect.top + 'px'; + box.style.width = rect.width + 'px'; + box.style.height = rect.height + 'px'; + }; + + const finish = (payload) => { + if (window.__medeiaPickerResult) { + return; + } + window.__medeiaPickerResult = payload; + }; + + const onMove = (event) => { + const target = event.target instanceof Element ? event.target : null; + if (!target || target.closest('[data-medeia-picker]')) { + return; + } + updateBox(target); + }; + + const onPointerDown = (event) => { + const target = event.target instanceof Element ? event.target : null; + if (!target || target.closest('[data-medeia-picker]')) { + return; + } + event.preventDefault(); + event.stopPropagation(); + event.stopImmediatePropagation(); + const rect = target.getBoundingClientRect(); + finish({ + cancelled: false, + selector: buildSelector(target), + tag: String(target.localName || target.tagName || '').toLowerCase(), + text: String((target.textContent || '').trim()).slice(0, 200), + width: Math.round(rect.width || 0), + height: Math.round(rect.height || 0), + }); + }; + + const onKeyDown = (event) => { + if (event.key !== 'Escape') { + return; + } + event.preventDefault(); + event.stopPropagation(); + event.stopImmediatePropagation(); + finish({ cancelled: true }); + }; + + window.__medeiaPickerCleanup = () => { + window.removeEventListener('mousemove', onMove, true); + window.removeEventListener('pointerdown', onPointerDown, true); + window.removeEventListener('keydown', onKeyDown, true); + try { box.remove(); } catch (e) {} + try { banner.remove(); } catch (e) {} + try { delete window.__medeiaPickerCleanup; } catch (e) {} + }; + + window.addEventListener('mousemove', onMove, true); + window.addEventListener('pointerdown', onPointerDown, true); + window.addEventListener('keydown', onKeyDown, true); + document.documentElement.appendChild(box); + document.documentElement.appendChild(banner); + + try { + window.focus(); + } catch (e) {} + try { + document.documentElement.setAttribute('tabindex', '-1'); + document.documentElement.focus({ preventScroll: true }); + } catch (e) {} + } catch (e) { + window.__medeiaPickerResult = { + cancelled: true, + error: String(e || ''), + }; + } + } + """ + ) + + +def _clear_element_picker(page: Any) -> None: + try: + page.evaluate( + """ + () => { + try { + if (typeof window.__medeiaPickerCleanup === 'function') { + window.__medeiaPickerCleanup(); + } + } catch (e) {} + } + """ + ) + except Exception: + pass + + +def _interactive_pick_selector(page: Any, *, timeout_s: float) -> Dict[str, Any]: + picked: Dict[str, Any] = {} + + _install_element_picker(page) + deadline = time.time() + max(5.0, float(timeout_s or 0.0)) + try: + while time.time() < deadline: + try: + if page.is_closed(): + picked["cancelled"] = True + break + except Exception: + break + + try: + payload = page.evaluate("() => window.__medeiaPickerResult || null") + except Exception: + payload = None + + if isinstance(payload, dict) and payload: + picked.update(payload) + break + + time.sleep(0.05) + finally: + _clear_element_picker(page) + + if not picked: + raise ScreenshotError("Timed out waiting for element selection") + if picked.get("cancelled"): + error_text = str(picked.get("error") or "").strip() + if error_text: + raise ScreenshotError(f"Element selection cancelled: {error_text}") + raise ScreenshotError("Element selection cancelled") + + selector = str(picked.get("selector") or "").strip() + if not selector: + raise ScreenshotError("Element picker did not return a valid selector") + return picked + + +def _prepare_capture_page( + tool: PlaywrightTool, + page: Any, + options: ScreenshotOptions, + warnings: List[str], + progress: PipelineProgress, +) -> str: + navigation_status = "loaded" + progress.step("loading navigating") + try: + tool.goto(page, options.url) + progress.step("loading page loaded") + except PlaywrightTimeoutError: + navigation_status = "timeout" + warnings.append("navigation timeout; capturing current page state") + progress.step("loading navigation timeout") + + if options.wait_for_article: + try: + page.wait_for_selector("article", timeout=10_000) + except PlaywrightTimeoutError: + warnings.append("
selector not found; capturing fallback") + + if options.wait_after_load > 0: + time.sleep(min(10.0, max(0.0, options.wait_after_load))) + + progress.step("loading stabilized") + progress.step("capturing preparing") + if options.replace_video_posters: + page.evaluate( + """ + document.querySelectorAll('video').forEach(v => { + if (v.poster) { + const img = document.createElement('img'); + img.src = v.poster; + img.style.maxWidth = '100%'; + img.style.borderRadius = '12px'; + v.replaceWith(img); + } + }); + """ + ) + return navigation_status + + +def _capture_selector_screenshot( + page: Any, + selector: str, + destination: Path, + format_name: str, + selector_timeout_ms: int, +) -> None: + selector_text = str(selector or "").strip() + if not selector_text: + raise ScreenshotError("No selector was provided for element capture") + + timeout_ms = max(10_000, int(selector_timeout_ms or 0)) + locator = page.locator(selector_text).first + locator.wait_for(state="visible", timeout=timeout_ms) + + try: + page.add_style_tag( + content=( + "*,*::before,*::after{animation:none !important;transition:none !important;" + "scroll-behavior:auto !important;}" + ) + ) + except Exception: + pass + + try: + locator.scroll_into_view_if_needed(timeout=min(timeout_ms, 2_500)) + except Exception: + pass + + try: + locator.evaluate( + """ + async (element) => { + const media = Array.from( + element.querySelectorAll('img,video,iframe') + ); + const pending = media.map((node) => { + if (node instanceof HTMLImageElement) { + if (node.complete) { + return Promise.resolve(); + } + return new Promise((resolve) => { + const done = () => resolve(); + node.addEventListener('load', done, { once: true }); + node.addEventListener('error', done, { once: true }); + setTimeout(done, 1500); + }); + } + return Promise.resolve(); + }); + if (pending.length) { + await Promise.allSettled(pending); + } + try { + if (document.fonts && document.fonts.ready) { + await Promise.race([ + document.fonts.ready, + new Promise((resolve) => setTimeout(resolve, 1500)), + ]); + } + } catch (e) {} + } + """ + ) + except Exception: + pass + + def _read_clip() -> Optional[Dict[str, float]]: + try: + clip_value = locator.bounding_box() + except Exception: + clip_value = None + if not isinstance(clip_value, dict): + return None + try: + return { + "x": max(0.0, float(clip_value.get("x") or 0.0)), + "y": max(0.0, float(clip_value.get("y") or 0.0)), + "width": max(1.0, float(clip_value.get("width") or 0.0)), + "height": max(1.0, float(clip_value.get("height") or 0.0)), + } + except Exception: + return None + + def _read_page_rect() -> Optional[Dict[str, float]]: + try: + rect_value = locator.evaluate( + """ + (element) => { + const rect = element.getBoundingClientRect(); + return { + x: Math.max(0, rect.left + window.scrollX), + y: Math.max(0, rect.top + window.scrollY), + width: Math.max(1, rect.width), + height: Math.max(1, rect.height), + }; + } + """ + ) + except Exception: + rect_value = None + if not isinstance(rect_value, dict): + return None + try: + return { + "x": max(0.0, float(rect_value.get("x") or 0.0)), + "y": max(0.0, float(rect_value.get("y") or 0.0)), + "width": max(1.0, float(rect_value.get("width") or 0.0)), + "height": max(1.0, float(rect_value.get("height") or 0.0)), + } + except Exception: + return None + + stable_clip: Optional[Dict[str, float]] = None + stable_reads = 0 + previous_clip: Optional[Dict[str, float]] = None + for _ in range(12): + current_clip = _read_clip() + if current_clip is None: + time.sleep(0.15) + continue + if previous_clip is not None: + dx = abs(current_clip["x"] - previous_clip["x"]) + dy = abs(current_clip["y"] - previous_clip["y"]) + dw = abs(current_clip["width"] - previous_clip["width"]) + dh = abs(current_clip["height"] - previous_clip["height"]) + if max(dx, dy, dw, dh) <= 1.0: + stable_reads += 1 + else: + stable_reads = 0 + previous_clip = current_clip + stable_clip = current_clip + if stable_reads >= 2: + break + time.sleep(0.15) + + clip = stable_clip + if clip is None: + raise ScreenshotError(f"Could not measure selector '{selector_text}'") + x = clip["x"] + y = clip["y"] + width = clip["width"] + height = clip["height"] + page_rect = _read_page_rect() + if page_rect is None: + raise ScreenshotError(f"Could not read page coordinates for selector '{selector_text}'") + + viewport_size = None + try: + viewport_size = page.viewport_size + except Exception: + viewport_size = None + + try: + current_viewport_width = max(1.0, float((viewport_size or {}).get("width") or 0.0)) + current_viewport_height = max(1.0, float((viewport_size or {}).get("height") or 0.0)) + except Exception: + current_viewport_width = 0.0 + current_viewport_height = 0.0 + + required_width = max(1.0, x + width + 8.0) + if required_width > current_viewport_width: + try: + page.set_viewport_size( + { + "width": int(max(current_viewport_width, required_width)), + "height": int(max(current_viewport_height, 1.0)), + } + ) + try: + locator.scroll_into_view_if_needed(timeout=min(timeout_ms, 2_500)) + except Exception: + pass + time.sleep(0.25) + clip = _read_clip() + if clip is None: + raise ScreenshotError(f"Could not re-measure selector '{selector_text}' after viewport resize") + x = clip["x"] + y = clip["y"] + width = clip["width"] + height = clip["height"] + page_rect = _read_page_rect() + if page_rect is None: + raise ScreenshotError(f"Could not re-read page coordinates for selector '{selector_text}'") + current_viewport_width = max(current_viewport_width, required_width) + except Exception as exc: + raise ScreenshotError(f"Could not resize viewport for selector '{selector_text}': {exc}") from exc + + if height > max(1.0, current_viewport_height - 8.0): + try: + from PIL import Image + except Exception as exc: + raise ScreenshotError( + f"Pillow is required for tall element capture: {exc}" + ) from exc + + try: + full_page_bytes = page.screenshot( + full_page=True, + timeout=timeout_ms, + type="png", + ) + except Exception as exc: + raise ScreenshotError( + f"Could not capture full-page screenshot for selector '{selector_text}': {exc}" + ) from exc + + padding = 2.0 + crop_left = max(0, int(page_rect["x"] - padding)) + crop_top = max(0, int(page_rect["y"] - padding)) + crop_right = max(crop_left + 1, int(page_rect["x"] + page_rect["width"] + padding + 0.9999)) + crop_bottom = max(crop_top + 1, int(page_rect["y"] + page_rect["height"] + padding + 0.9999)) + + try: + with Image.open(io.BytesIO(full_page_bytes)) as full_page_image: + bounded_box = ( + max(0, min(crop_left, full_page_image.width - 1)), + max(0, min(crop_top, full_page_image.height - 1)), + max(1, min(crop_right, full_page_image.width)), + max(1, min(crop_bottom, full_page_image.height)), + ) + cropped = full_page_image.crop(bounded_box) + save_kwargs: Dict[str, Any] = {} + if format_name == "jpeg": + cropped = cropped.convert("RGB") + save_kwargs.update({"format": "JPEG", "quality": 90}) + else: + if cropped.mode == "P": + cropped = cropped.convert("RGBA") + save_kwargs.update({"format": "PNG"}) + cropped.save(destination, **save_kwargs) + return + except Exception as exc: + raise ScreenshotError( + f"Could not crop full-page screenshot for selector '{selector_text}': {exc}" + ) from exc + + padding = 2.0 + x = max(0.0, x - padding) + y = max(0.0, y - padding) + width = max(1.0, width + (padding * 2.0)) + height = max(1.0, height + (padding * 2.0)) + + clip_box: Dict[str, float] = { + "x": float(int(x)), + "y": float(int(y)), + "width": float(int(width + 0.9999)), + "height": float(int(height + 0.9999)), + } + + screenshot_kwargs: Dict[str, Any] = { + "path": str(destination), + "timeout": timeout_ms, + "clip": clip_box, + } + if format_name == "jpeg": + screenshot_kwargs["type"] = "jpeg" + screenshot_kwargs["quality"] = 90 + + page.screenshot(**screenshot_kwargs) + + def _convert_to_webp( src_png: Path, dst_webp: Path, @@ -330,9 +909,6 @@ def _convert_to_webp( if scale > 0.0 and scale < 1.0: new_w = max(1, int(w * scale)) new_h = max(1, int(h * scale)) - debug( - f"[_convert_to_webp] Image exceeds WebP limit ({w}x{h}); downscaling -> {new_w}x{new_h}" - ) try: resample = getattr( getattr(Image, @@ -345,10 +921,8 @@ def _convert_to_webp( resample = getattr(Image, "LANCZOS", 1) im = im.resize((new_w, new_h), resample=resample) did_downscale = True - except Exception as exc: - debug( - f"[_convert_to_webp] Downscale failed; attempting direct WEBP save anyway: {exc}" - ) + except Exception: + pass im.save(tmp_path, **save_kwargs) @@ -429,8 +1003,7 @@ def _platform_preprocess( _try_click_buttons(["Accept all", "Accept"]) if "instagram.com" in u: _try_click_buttons(["Allow all", "Accept all", "Accept"]) - except Exception as exc: - debug(f"[_platform_preprocess] skipped: {exc}") + except Exception: return @@ -478,35 +1051,41 @@ def _archive_url(url: str, timeout: float) -> Tuple[List[str], List[str]]: """Submit URL to all available archive services.""" archives: List[str] = [] warnings: List[str] = [] + archive_status: List[tuple[str, Any]] = [] for submitter, label in ( (_submit_wayback, "wayback"), (_submit_archive_today, "archive.today"), (_submit_archive_ph, "archive.ph"), ): try: - debug(f"Archiving to {label}...") archived = submitter(url, timeout) except httpx.HTTPStatusError as exc: if exc.response.status_code == 429: warnings.append(f"archive {label} rate limited (HTTP 429)") - debug(f"{label}: Rate limited (HTTP 429)") + archive_status.append((label, "rate limited (HTTP 429)")) else: warnings.append( f"archive {label} failed: HTTP {exc.response.status_code}" ) - debug(f"{label}: HTTP {exc.response.status_code}") + archive_status.append((label, f"HTTP {exc.response.status_code}")) except httpx.RequestError as exc: warnings.append(f"archive {label} failed: {exc}") - debug(f"{label}: Connection error: {exc}") + archive_status.append((label, f"connection error: {exc}")) except Exception as exc: warnings.append(f"archive {label} failed: {exc}") - debug(f"{label}: {exc}") + archive_status.append((label, exc)) else: if archived: archives.append(archived) - debug(f"{label}: Success - {archived}") + archive_status.append((label, archived)) else: - debug(f"{label}: No archive link returned") + archive_status.append((label, "no archive link returned")) + + if is_debug_enabled() and archive_status: + _show_debug_panel( + "Screenshot Archive", + [("url", url), *archive_status], + ) return archives, warnings @@ -545,9 +1124,10 @@ def _capture( destination: Path, warnings: List[str], progress: PipelineProgress -) -> None: +) -> tuple[str, str]: """Capture screenshot using Playwright.""" - debug(f"[_capture] Starting capture for {options.url} -> {destination}") + capture_mode = "full-page" + capture_target = "" try: progress.step("loading launching browser") tool = options.playwright_tool or PlaywrightTool({}) @@ -562,9 +1142,6 @@ def _capture( None) is not None else "" ) if current_browser != "chromium": - debug( - f"[_capture] Overriding Playwright browser '{current_browser}' -> 'chromium' for screen-shot cmdlet" - ) base_cfg = {} try: base_cfg = dict(getattr(tool, @@ -594,32 +1171,44 @@ def _capture( } }) - if is_debug_enabled(): - try: - from rich.table import Table - from rich import box - t = Table(title="Screenshot Config", show_header=True, header_style="bold magenta", box=box.ROUNDED) - t.add_column("Property", style="cyan") - t.add_column("Value", style="green") - t.add_row("URL", options.url) - t.add_row("Format", _normalize_format(options.output_format)) - - # Browser details - defaults = getattr(tool, "defaults", None) - if defaults: - t.add_row("Browser", getattr(defaults, "browser", "unknown")) - t.add_row("Headless", str(getattr(defaults, "headless", "unknown"))) - t.add_row("Viewport", f"{getattr(defaults, 'viewport_width', '?')}x{getattr(defaults, 'viewport_height', '?')}") - t.add_row("Timeout", f"{getattr(defaults, 'navigation_timeout_ms', '?')}ms") - - t.add_row("Full Page", str(options.full_page)) - t.add_row("Destination", str(destination)) - debug(t) - except Exception: - pass - format_name = _normalize_format(options.output_format) - headless = options.headless or format_name == "pdf" + capture_headless = bool(options.headless) + picker_headless = capture_headless + if options.interactive_pick and format_name != "pdf": + picker_headless = False + capture_headless = True + elif format_name == "pdf": + picker_headless = True + capture_headless = True + + if is_debug_enabled(): + defaults = getattr(tool, "defaults", None) + _show_debug_panel( + "Screenshot Config", + [ + ("url", options.url), + ("format", _normalize_format(options.output_format)), + ("browser", getattr(defaults, "browser", "unknown") if defaults else "unknown"), + ("headless", getattr(defaults, "headless", "unknown") if defaults else "unknown"), + ( + "viewport", + ( + f"{getattr(defaults, 'viewport_width', '?')}x{getattr(defaults, 'viewport_height', '?')}" + if defaults else "" + ), + ), + ("timeout", f"{getattr(defaults, 'navigation_timeout_ms', '?')}ms" if defaults else ""), + ("full_page", options.full_page), + ("interactive_pick", options.interactive_pick), + ("picker_headless", picker_headless), + ("capture_headless", capture_headless), + ("target_selectors", list(options.target_selectors or [])), + ("destination", destination), + ], + border_style="magenta", + ) + + navigation_status = "loaded" if format_name == "pdf" and not options.headless: warnings.append( @@ -627,138 +1216,146 @@ def _capture( ) try: - with tool.open_page(headless=headless) as page: - progress.step("loading navigating") - debug(f"Navigating to {options.url}...") - try: - tool.goto(page, options.url) - debug("Page loaded successfully") - progress.step("loading page loaded") - except PlaywrightTimeoutError: - warnings.append("navigation timeout; capturing current page state") - debug("Navigation timeout; proceeding with current state") - progress.step("loading navigation timeout") - - # Skip article lookup by default (wait_for_article defaults to False) - if options.wait_for_article: - try: - debug("Waiting for article element...") - page.wait_for_selector("article", timeout=10_000) - debug("Article element found") - except PlaywrightTimeoutError: - warnings.append( - "
selector not found; capturing fallback" - ) - debug("Article element not found; using fallback") - - if options.wait_after_load > 0: - debug( - f"Waiting {options.wait_after_load}s for page stabilization..." + element_captured = False + if options.interactive_pick and format_name != "pdf": + selected_selector = "" + with tool.open_page(headless=picker_headless) as page: + navigation_status = _prepare_capture_page( + tool, + page, + options, + warnings, + progress, ) - time.sleep(min(10.0, max(0.0, options.wait_after_load))) - - progress.step("loading stabilized") - - progress.step("capturing preparing") - if options.replace_video_posters: - debug("Replacing video elements with posters...") - page.evaluate( - """ - document.querySelectorAll('video').forEach(v => { - if (v.poster) { - const img = document.createElement('img'); - img.src = v.poster; - img.style.maxWidth = '100%'; - img.style.borderRadius = '12px'; - v.replaceWith(img); - } - }); - """ - ) - # Attempt platform-specific target capture if requested (and not PDF) - element_captured = False - if options.prefer_platform_target and format_name != "pdf": - debug("[_capture] Target capture enabled") - debug("Attempting platform-specific content capture...") progress.step("capturing locating target") - try: - _platform_preprocess(options.url, page, warnings) - except Exception as e: - debug(f"[_capture] Platform preprocess failed: {e}") - pass - selectors = list(options.target_selectors or []) - if not selectors: - selectors = _selectors_for_url(options.url) + picked = _interactive_pick_selector( + page, + timeout_s=options.interactive_pick_timeout_s, + ) + selected_selector = str(picked.get("selector") or "").strip() + if not selected_selector: + raise ScreenshotError("Element picker did not return a valid selector") - debug(f"[_capture] Trying selectors: {selectors}") - for sel in selectors: + capture_mode = "interactive" + capture_target = selected_selector + + progress.step("loading launching browser") + with tool.open_page(headless=capture_headless) as page: + navigation_status = _prepare_capture_page( + tool, + page, + options, + warnings, + progress, + ) + progress.step("capturing output") + _capture_selector_screenshot( + page, + selected_selector, + destination, + format_name, + options.selector_timeout_ms, + ) + element_captured = True + else: + with tool.open_page(headless=capture_headless) as page: + navigation_status = _prepare_capture_page( + tool, + page, + options, + warnings, + progress, + ) + # Attempt platform-specific target capture if requested (and not PDF) + if options.prefer_platform_target and format_name != "pdf": + progress.step("capturing locating target") try: - debug(f"Trying selector: {sel}") - el = page.wait_for_selector( - sel, - timeout=max(0, - int(options.selector_timeout_ms)) - ) - except PlaywrightTimeoutError: - debug(f"Selector not found: {sel}") - continue - try: - if el is not None: - debug(f"Found element with selector: {sel}") - try: - el.scroll_into_view_if_needed(timeout=1000) - except Exception: - pass - progress.step("capturing output") - debug(f"Capturing element to {destination}...") - el.screenshot( - path=str(destination), - type=("jpeg" if format_name == "jpeg" else None), + _platform_preprocess(options.url, page, warnings) + except Exception: + pass + selectors = list(options.target_selectors or []) + if not selectors: + selectors = _selectors_for_url(options.url) + + for sel in selectors: + try: + _capture_selector_screenshot( + page, + sel, + destination, + format_name, + options.selector_timeout_ms, ) element_captured = True - debug("Element captured successfully") + capture_mode = "selector" + capture_target = sel break - except Exception as exc: - warnings.append( - f"element capture failed for '{sel}': {exc}" - ) - debug(f"Failed to capture element: {exc}") - # Fallback to default capture paths - if element_captured: - progress.step("capturing saved") - elif format_name == "pdf": - debug("Generating PDF...") - page.emulate_media(media="print") - progress.step("capturing output") - page.pdf(path=str(destination), print_background=True) - debug(f"PDF saved to {destination}") - progress.step("capturing saved") - else: - debug(f"Capturing full page to {destination}...") - screenshot_kwargs: Dict[str, - Any] = { - "path": str(destination) - } - if format_name == "jpeg": - screenshot_kwargs["type"] = "jpeg" - screenshot_kwargs["quality"] = 90 - if options.full_page: - progress.step("capturing output") - page.screenshot(full_page=True, **screenshot_kwargs) - else: - article = page.query_selector("article") - if article is not None: - article_kwargs = dict(screenshot_kwargs) - article_kwargs.pop("full_page", None) + except PlaywrightTimeoutError: + continue + except Exception as exc: + warnings.append( + f"element capture failed for '{sel}': {exc}" + ) + + # Fallback to default capture paths + if not element_captured: + if format_name == "pdf": + capture_mode = "pdf" + page.emulate_media(media="print") progress.step("capturing output") - article.screenshot(**article_kwargs) + page.pdf(path=str(destination), print_background=True) else: - progress.step("capturing output") - page.screenshot(**screenshot_kwargs) - debug(f"Screenshot saved to {destination}") - progress.step("capturing saved") + screenshot_kwargs: Dict[str, Any] = { + "path": str(destination) + } + if format_name == "jpeg": + screenshot_kwargs["type"] = "jpeg" + screenshot_kwargs["quality"] = 90 + if options.full_page: + progress.step("capturing output") + page.screenshot(full_page=True, **screenshot_kwargs) + capture_mode = "full-page" + else: + article = page.query_selector("article") + if article is not None: + article_kwargs = dict(screenshot_kwargs) + article_kwargs.pop("full_page", None) + progress.step("capturing output") + article.screenshot(**article_kwargs) + capture_mode = "article" + capture_target = "article" + else: + progress.step("capturing output") + page.screenshot(**screenshot_kwargs) + capture_mode = "page" + + if element_captured or capture_mode: + progress.step("capturing saved") + + if is_debug_enabled(): + _show_debug_panel( + "Screenshot Capture", + [ + ("url", options.url), + ("navigation", navigation_status), + ("mode", capture_mode), + ("target", capture_target), + ("wait_after_load_s", options.wait_after_load), + ("warnings", len(warnings)), + ("saved_to", destination), + ], + ) except Exception as exc: - debug(f"[_capture] Exception launching browser/page: {exc}") + if is_debug_enabled(): + _show_debug_panel( + "Screenshot Error", + [ + ("url", options.url), + ("destination", destination), + ("error", exc), + ], + border_style="red", + ) msg = str(exc).lower() if any(k in msg for k in ["executable", "not found", "no such file", "cannot find", "install"]): @@ -770,8 +1367,8 @@ def _capture( # Re-raise ScreenshotError raised intentionally (do not wrap) raise except Exception as exc: - debug(f"[_capture] Exception: {exc}") raise ScreenshotError(f"Failed to capture screenshot: {exc}") from exc + return capture_mode, capture_target def _capture_screenshot( @@ -779,17 +1376,19 @@ def _capture_screenshot( progress: PipelineProgress ) -> ScreenshotResult: """Capture a screenshot for the given options.""" - debug(f"[_capture_screenshot] Preparing capture for {options.url}") requested_format = _normalize_format(options.output_format) destination = _prepare_output_path(options) warnings: List[str] = [] + capture_mode = "" + capture_target = "" - will_target = bool(options.prefer_platform_target) and requested_format != "pdf" + will_target = bool(options.prefer_platform_target or options.interactive_pick) and requested_format != "pdf" will_convert = requested_format == "webp" will_archive = bool(options.archive and options.url) + interactive_extra_steps = 5 if (options.interactive_pick and requested_format != "pdf") else 0 total_steps = ( - 9 + (1 if will_target else 0) + (1 if will_convert else 0) + - (1 if will_archive else 0) + 9 + (1 if will_target else 0) + interactive_extra_steps + + (1 if will_convert else 0) + (1 if will_archive else 0) ) progress.begin_steps(total_steps) progress.step("loading starting") @@ -799,15 +1398,11 @@ def _capture_screenshot( capture_path = destination if requested_format == "webp": capture_path = unique_path(destination.with_suffix(".png")) - debug( - f"[_capture_screenshot] Requested webp; capturing intermediate png -> {capture_path}" - ) options.output_format = "png" - _capture(options, capture_path, warnings, progress) + capture_mode, capture_target = _capture(options, capture_path, warnings, progress) if requested_format == "webp": progress.step("capturing converting to webp") - debug(f"[_capture_screenshot] Converting png -> webp: {destination}") try: did_downscale = _convert_to_webp(capture_path, destination) if did_downscale: @@ -828,7 +1423,6 @@ def _capture_screenshot( archive_url: List[str] = [] if options.archive and options.url: progress.step("capturing archiving") - debug(f"[_capture_screenshot] Archiving enabled for {options.url}") archives, archive_warnings = _archive_url(options.url, options.archive_timeout) archive_url.extend(archives) warnings.extend(archive_warnings) @@ -839,11 +1433,27 @@ def _capture_screenshot( applied_tag = unique_preserve_order(list(tag for tag in options.tag if tag.strip())) + if is_debug_enabled(): + _show_debug_panel( + "Screenshot Output", + [ + ("url", options.url), + ("requested_format", requested_format), + ("path", destination), + ("capture_mode", capture_mode), + ("capture_target", capture_target), + ("archives", archive_url), + ("warnings", warnings), + ], + ) + return ScreenshotResult( path=destination, tag_applied=applied_tag, archive_url=archive_url, url=url, + capture_mode=capture_mode, + capture_target=capture_target, warnings=warnings, ) @@ -854,47 +1464,20 @@ def _capture_screenshot( def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: - """Take screenshots of url in the pipeline. - - Accepts: - - Single result object (dict or PipeObject) with 'path' field - - List of result objects to screenshot each - - Direct URL as string - - Emits PipeObject-formatted results for each screenshot with: - - action: 'cmdlet:screen-shot' - - is_temp: True (screenshots are temporary artifacts) - - parent_id: hash of the original file/URL - - Screenshots are created using Playwright and marked as temporary - so they can be cleaned up later with the cleanup cmdlet. - """ - debug(f"[_run] screen-shot invoked with args: {args}") - - # Help check + """Take screenshots of URL inputs from args or pipeline items.""" if should_show_help(args): log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}") return 0 - - progress = PipelineProgress(pipeline_context) - # ======================================================================== - # ARGUMENT PARSING - # ======================================================================== - parsed = parse_cmdlet_args(args, CMDLET) format_value = parsed.get("format") + capture_mode_value = _normalize_capture_mode(parsed.get("capture_mode")) if not format_value: - # Default format can be set via config.conf tool block: - # [tool=playwright] - # format="pdf" try: - tool_cfg = config.get("tool", - {}) if isinstance(config, - dict) else {} + tool_cfg = config.get("tool", {}) if isinstance(config, dict) else {} pw_cfg = tool_cfg.get("playwright") if isinstance(tool_cfg, dict) else None if isinstance(pw_cfg, dict): format_value = pw_cfg.get("format") @@ -902,35 +1485,23 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: pass if not format_value: format_value = "webp" + storage_value = parsed.get("storage") selector_arg = parsed.get("selector") selectors = [selector_arg] if selector_arg else [] archive_enabled = parsed.get("archive", False) - # Positional URL argument (if provided) url_arg = parsed.get("url") positional_url = [str(url_arg)] if url_arg else [] - # ======================================================================== - # INPUT PROCESSING - Extract url from command args or pipeline - # ======================================================================== - - # If the user provided an explicit URL argument, prefer it. url_to_process: List[Tuple[str, Any]] = [] if positional_url: url_to_process = [(u, None) for u in positional_url] else: piped_results = normalize_result_input(result) - - # Extract url from piped results if piped_results: for item in piped_results: - url = get_field(item, - "path") or get_field(item, - "url" - ) or get_field(item, - "target") - + url = get_field(item, "path") or get_field(item, "url") or get_field(item, "target") if url: url_to_process.append((str(url), item)) @@ -938,49 +1509,52 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: log("No url to process for screen-shot cmdlet", file=sys.stderr) return 1 - debug(f"[_run] url to process: {[u for u, _ in url_to_process]}") - - # ======================================================================== - # OUTPUT DIRECTORY RESOLUTION - Priority chain - # ======================================================================== - screenshot_dir: Optional[Path] = None - - # Primary: Use --storage if provided (highest priority) + screenshot_dir_source = "default temp" if storage_value: try: screenshot_dir = SharedArgs.resolve_storage(storage_value) - debug(f"[screen_shot] Using --storage {storage_value}: {screenshot_dir}") - except ValueError as e: - log(str(e), file=sys.stderr) + screenshot_dir_source = f"--storage {storage_value}" + except ValueError as exc: + log(str(exc), file=sys.stderr) return 1 - - # Secondary: Use config-based resolver ONLY if --storage not provided if screenshot_dir is None and resolve_output_dir is not None: try: screenshot_dir = resolve_output_dir(config) - debug(f"[screen_shot] Using config resolver: {screenshot_dir}") + screenshot_dir_source = "config resolver" except Exception: pass - - # Tertiary: Use config outfile ONLY if neither --storage nor resolver worked if screenshot_dir is None and config and config.get("outfile"): try: screenshot_dir = Path(config["outfile"]).expanduser() - debug(f"[screen_shot] Using config outfile: {screenshot_dir}") + screenshot_dir_source = "config outfile" except Exception: pass - - # Default: system temp directory if screenshot_dir is None: screenshot_dir = Path(tempfile.gettempdir()) - debug(f"[screen_shot] Using default directory: {screenshot_dir}") ensure_directory(screenshot_dir) - # If the caller isn't running the shared pipeline Live progress UI (e.g. direct - # cmdlet execution), start a minimal local pipeline progress panel so this cmdlet - # still shows step-level progress. + format_name = _normalize_format(format_value) + filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()] + manual_target_selectors = filtered_selectors if filtered_selectors else None + interactive_default = bool(len(url_to_process) == 1 and _stdin_interactive()) + + if is_debug_enabled(): + _show_debug_panel( + "screen-shot", + [ + ("args", list(args)), + ("url_count", len(url_to_process)), + ("urls", [u for u, _ in url_to_process]), + ("archive", archive_enabled), + ("format", format_name), + ("capture_mode", capture_mode_value or ("interactive" if interactive_default and format_name != "pdf" else "auto")), + ("output_dir", screenshot_dir), + ("output_dir_source", screenshot_dir_source), + ], + ) + try: progress.ensure_local_ui( label="screen-shot", @@ -990,21 +1564,36 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: except Exception: pass - # ======================================================================== - # PREPARE SCREENSHOT OPTIONS - # ======================================================================== - - format_name = _normalize_format(format_value) - filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()] - manual_target_selectors = filtered_selectors if filtered_selectors else None + shared_playwright_tool: Optional[PlaywrightTool] = None + try: + if isinstance(config, dict): + tool_block = dict(config.get("tool") or {}) + pw_block = dict(tool_block.get("playwright") or {}) + pw_block["browser"] = "chromium" + pw_block["user_agent"] = "native" + pw_block["viewport_width"] = int(DEFAULT_VIEWPORT.get("width", 1920)) + pw_block["viewport_height"] = int(DEFAULT_VIEWPORT.get("height", 1080)) + tool_block["playwright"] = pw_block + pw_local_cfg = dict(config) + pw_local_cfg["tool"] = tool_block + else: + pw_local_cfg = { + "tool": { + "playwright": { + "browser": "chromium", + "user_agent": "native", + "viewport_width": int(DEFAULT_VIEWPORT.get("width", 1920)), + "viewport_height": int(DEFAULT_VIEWPORT.get("height", 1080)), + } + } + } + shared_playwright_tool = PlaywrightTool(pw_local_cfg) + except Exception: + shared_playwright_tool = None all_emitted = [] exit_code = 0 - # ======================================================================== - # PROCESS url AND CAPTURE SCREENSHOTS - # ======================================================================== - def _extract_item_tags(item: Any) -> List[str]: return extract_item_tags(item) @@ -1018,42 +1607,11 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: return value for url, origin_item in url_to_process: - # Validate URL format if not url.lower().startswith(("http://", "https://", "file://")): log(f"[screen_shot] Skipping non-URL input: {url}", file=sys.stderr) continue try: - # Create screenshot with provided options - # Force the Playwright engine to Chromium for the screen-shot cmdlet - # (this ensures consistent rendering and supports PDF output requirements). - pw_local_cfg = {} - if isinstance(config, dict): - tool_block = dict(config.get("tool") or {}) - pw_block = dict(tool_block.get("playwright") or {}) - pw_block["browser"] = "chromium" - # Use Playwright-native UA/headers (matches bundled Chromium version). - pw_block["user_agent"] = "native" - pw_block["viewport_width"] = int(DEFAULT_VIEWPORT.get("width", 1920)) - pw_block["viewport_height"] = int(DEFAULT_VIEWPORT.get("height", 1080)) - tool_block["playwright"] = pw_block - pw_local_cfg = dict(config) - pw_local_cfg["tool"] = tool_block - else: - pw_local_cfg = { - "tool": { - "playwright": { - "browser": "chromium", - "user_agent": "native", - "viewport_width": int(DEFAULT_VIEWPORT.get("width", - 1920)), - "viewport_height": - int(DEFAULT_VIEWPORT.get("height", - 1080)), - } - } - } - options = ScreenshotOptions( url=url, output_dir=screenshot_dir, @@ -1063,48 +1621,35 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: prefer_platform_target=False, wait_for_article=False, full_page=True, - playwright_tool=PlaywrightTool(pw_local_cfg), + interactive_pick=False, + playwright_tool=shared_playwright_tool, ) - # Auto element capture for known sites (x.com/twitter/etc.). - # - If the user provided --selector, treat that as an explicit target. - # - Otherwise, if SITE_SELECTORS matches the URL, auto-capture the post/content element. auto_selectors = _matched_site_selectors(url) if manual_target_selectors: options.prefer_platform_target = True options.target_selectors = manual_target_selectors - debug( - f"[screen_shot] Using explicit selector(s): {manual_target_selectors}" - ) + elif capture_mode_value == "full": + options.prefer_platform_target = False + options.target_selectors = None + elif capture_mode_value == "interactive": + options.interactive_pick = True + elif interactive_default and format_name != "pdf": + options.interactive_pick = True elif auto_selectors: options.prefer_platform_target = True options.target_selectors = auto_selectors - debug(f"[screen_shot] Auto selectors matched for url: {auto_selectors}") screenshot_result = _capture_screenshot(options, progress) - # Log results and warnings - debug(f"Screenshot captured to {screenshot_result.path}") - if screenshot_result.archive_url: - debug(f"Archives: {', '.join(screenshot_result.archive_url)}") - for warning in screenshot_result.warnings: - debug(f"Warning: {warning}") - - # Compute hash of screenshot file screenshot_hash = None try: - with open(screenshot_result.path, "rb") as f: - screenshot_hash = hashlib.sha256(f.read()).hexdigest() + screenshot_hash = sha256_file(screenshot_result.path) except Exception: pass - # Create PipeObject result - marked as TEMP since derivative artifact - capture_date = "" try: - capture_date = ( - datetime.fromtimestamp(screenshot_result.path.stat().st_mtime - ).date().isoformat() - ) + capture_date = datetime.fromtimestamp(screenshot_result.path.stat().st_mtime).date().isoformat() except Exception: capture_date = datetime.now().date().isoformat() @@ -1114,14 +1659,12 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: upstream_tags = _extract_item_tags(origin_item) filtered_upstream_tags = [ - t for t in upstream_tags - if not str(t).strip().lower().startswith(("type:", "date:")) + tag for tag in upstream_tags + if not str(tag).strip().lower().startswith(("type:", "date:")) ] - url_tags = _tags_from_url(url) merged_tags = unique_preserve_order( - ["type:screenshot", - f"date:{capture_date}"] + filtered_upstream_tags + url_tags + ["type:screenshot", f"date:{capture_date}"] + filtered_upstream_tags + url_tags ) pipe_obj = create_pipe_object_result( @@ -1135,39 +1678,34 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: is_temp=True, parent_hash=hashlib.sha256(url.encode()).hexdigest(), tag=merged_tags, - url=url, # Explicitly map url to top-level PipeObject field - source_url=url, # Map source_url as well + url=url, + source_url=url, extra={ "source_url": url, "archive_url": screenshot_result.archive_url, "url": screenshot_result.url, - "target": str(screenshot_result.path), # Explicit target for add-file + "target": str(screenshot_result.path), }, ) - # Emit the result so downstream cmdlet (like add-file) can use it pipeline_context.emit(pipe_obj) all_emitted.append(pipe_obj) - # Debug: show PipeObject preview if enabled if is_debug_enabled(): - try: - debug("[screen-shot] Output PipeObject preview") - po = coerce_to_pipe_object(pipe_obj) - from SYS.logger import _sanitize_pipe_object_for_debug as _sanitize # Or use helper if avail - # Add simple sanitize helper if not available - def _safe_table(obj): - try: - # Try calling debug_table on the object - if hasattr(obj, "debug_table"): - obj.debug_table() - except Exception: - pass - _safe_table(po) - except Exception: - pass + _show_debug_panel( + "screen-shot output", + [ + ("path", screenshot_result.path), + ("hash", screenshot_hash), + ("title", display_title), + ("capture_mode", screenshot_result.capture_mode), + ("capture_target", screenshot_result.capture_target), + ("tags", merged_tags), + ("archives", screenshot_result.archive_url), + ("warnings", screenshot_result.warnings), + ], + ) - # If we created a local progress UI, advance it per completed item. progress.on_emit(pipe_obj) except ScreenshotError as exc: @@ -1186,16 +1724,14 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: log("No screenshots were successfully captured", file=sys.stderr) return 1 - # Log completion message (keep this as normal output) log(f"✓ Successfully captured {len(all_emitted)} screenshot(s)") - return exit_code CMDLET = Cmdlet( name="screen-shot", summary="Capture a website screenshot", - usage="screen-shot [options]", + usage="screen-shot [options] [-query \"format:full\"]", alias=["screenshot", "ss"], arg=[ @@ -1205,17 +1741,27 @@ CMDLET = Cmdlet( type="string", description="Output format: webp, png, jpeg, or pdf" ), + sh.QueryArg( + "capture_mode", + key="format", + aliases=["capture", "mode"], + query_only=True, + description="Capture mode via -query, e.g. format:full or format:interactive" + ), CmdletArg( name="selector", type="string", description="CSS selector for element capture" ), SharedArgs.PATH, + SharedArgs.QUERY, ], detail=[ "Uses Playwright Chromium engine only. Install Chromium with: python ./scripts/bootstrap.py --playwright-only --browsers chromium", "PDF output requires headless Chromium (the cmdlet will enforce headless mode for PDF).", "Screenshots are temporary artifacts stored in the configured `temp` directory.", + "Interactive single-URL runs open a headful browser picker by default so you can hover and click the element to capture.", + "Use -query \"format:full\" to bypass the picker and capture the full page directly.", ], ) diff --git a/tool/playwright.py b/tool/playwright.py index a975b86..467aa1d 100644 --- a/tool/playwright.py +++ b/tool/playwright.py @@ -191,23 +191,19 @@ class PlaywrightTool: if candidate and Path(candidate).exists(): ffmpeg_path = candidate else: - debug(f"Configured ffmpeg path does not exist: {candidate}") + ffmpeg_path = None if not ffmpeg_path: # Prefer a global FFMPEG_PATH env var (shared by tools) before Playwright-specific one env_ffmpeg = os.environ.get("FFMPEG_PATH") if env_ffmpeg and Path(env_ffmpeg).exists(): ffmpeg_path = env_ffmpeg - elif env_ffmpeg: - debug(f"FFMPEG_PATH set but path does not exist: {env_ffmpeg}") if not ffmpeg_path: # Backward-compatible Playwright-specific env var env_ffmpeg2 = os.environ.get("PLAYWRIGHT_FFMPEG_PATH") if env_ffmpeg2 and Path(env_ffmpeg2).exists(): ffmpeg_path = env_ffmpeg2 - elif env_ffmpeg2: - debug(f"PLAYWRIGHT_FFMPEG_PATH set but path does not exist: {env_ffmpeg2}") if not ffmpeg_path: # Try to find bundled ffmpeg in the project (Windows-only, in MPV/ffmpeg/bin) @@ -218,20 +214,14 @@ class PlaywrightTool: ffmpeg_exe = bundled_ffmpeg / ("ffmpeg.exe" if os.name == "nt" else "ffmpeg") if ffmpeg_exe.exists(): ffmpeg_path = str(ffmpeg_exe) - debug(f"Found bundled ffmpeg at: {ffmpeg_path}") - except Exception as e: - debug(f"Error checking for bundled ffmpeg: {e}") + except Exception: + pass if not ffmpeg_path: # Try system ffmpeg if bundled not found system_ffmpeg = shutil.which("ffmpeg") if system_ffmpeg: ffmpeg_path = system_ffmpeg - debug(f"Found system ffmpeg at: {ffmpeg_path}") - else: - # ffmpeg not found - log a debug message but don't fail - # ffmpeg-python may still work with system installation, or user might not need it - debug("ffmpeg not found on PATH. For best compatibility, install ffmpeg: Windows (use bundled or choco install ffmpeg), macOS (brew install ffmpeg), Linux (apt install ffmpeg or equivalent)") return PlaywrightDefaults( browser=browser,