From 67c272db4bb5a6835943a5d97febc5e1343649d0 Mon Sep 17 00:00:00 2001 From: Nose Date: Wed, 22 Apr 2026 21:19:55 -0700 Subject: [PATCH] added mhtml support and fixed some bugs in the process --- API/data/alldebrid.json | 8 +- CLI.py | 99 ++++++++++++++++ MPV/LUA/main.lua | 126 +++++++++++++++++--- MPV/pipeline_helper.py | 10 +- Provider/ytdlp.py | 31 ++--- SYS/cmdlet_catalog.py | 6 + SYS/pipeline.py | 4 +- cmdlet/screen_shot.py | 249 +++++++++++++++++++++++++++++++++++++--- tool/ytdlp.py | 97 +++++++++++++++- 9 files changed, 564 insertions(+), 66 deletions(-) diff --git a/API/data/alldebrid.json b/API/data/alldebrid.json index 501748d..55e2a61 100644 --- a/API/data/alldebrid.json +++ b/API/data/alldebrid.json @@ -92,7 +92,7 @@ "(hitfile\\.net/[a-z0-9A-Z]{4,9})" ], "regexp": "(hitf\\.(to|cc)/([a-z0-9A-Z]{4,9}))|(htfl\\.(net|to|cc)/([a-z0-9A-Z]{4,9}))|(hitfile\\.(net)/download/free/([a-z0-9A-Z]{4,9}))|((hitfile\\.net/[a-z0-9A-Z]{4,9}))", - "status": false + "status": true }, "mega": { "name": "mega", @@ -494,7 +494,7 @@ "mediafire\\.com/(\\?|download/|file/|download\\.php\\?)([0-9a-z]{15})" ], "regexp": "mediafire\\.com/(\\?|download/|file/|download\\.php\\?)([0-9a-z]{15})", - "status": true + "status": false }, "mixdrop": { "name": "mixdrop", @@ -17869,9 +17869,9 @@ "dl-protect.best" ], "regexps": [ - "dl\\-protect\\.(best|info|net|link|cc)/([0-9a-zA-Z]{8})" + "dl\\-protect\\.(best|info|net|link|cc)/([^/]+)" ], - "regexp": "dl\\-protect\\.(best|info|net|link|cc)/([0-9a-zA-Z]{8})" + "regexp": "dl\\-protect\\.(best|info|net|link|cc)/([^/]+)" }, "ed-protect": { "name": "ed-protect", diff --git a/CLI.py b/CLI.py index 05982a7..8d098e1 100644 --- a/CLI.py +++ b/CLI.py @@ -546,6 +546,35 @@ class CmdletIntrospection: except Exception: return [] + @staticmethod + def query_args(cmd_name: str, + config: Optional[Dict[str, + Any]] = None) -> List[Dict[str, + Any]]: + try: + meta = get_cmdlet_metadata(cmd_name, config=config) or {} + except Exception: + return [] + + args = 
meta.get("args", []) if isinstance(meta, dict) else [] + if not isinstance(args, list): + return [] + + query_args: List[Dict[str, Any]] = [] + for arg in args: + if not isinstance(arg, dict): + continue + key = str(arg.get("query_key") or "").strip().lower() + aliases = [ + str(value).strip().lower() + for value in (arg.get("query_aliases") or []) + if str(value).strip() + ] + if not key and not aliases: + continue + query_args.append(arg) + return query_args + class CmdletCompleter(Completer): """Prompt-toolkit completer for the Medeia cmdlet REPL.""" @@ -678,6 +707,76 @@ class CmdletCompleter(Completer): if cmd_name == "search-file": provider_name = self._flag_value(stage_tokens, "-plugin", "--plugin") + query_specs = CmdletIntrospection.query_args(cmd_name, config) + query_flag_index = -1 + for idx, tok in enumerate(stage_tokens): + if str(tok or "").strip().lower() in {"-query", "--query"}: + query_flag_index = idx + + if query_specs and query_flag_index >= 0: + query_parts = stage_tokens[query_flag_index + 1:] + query_started_quoted = bool(query_parts and str(query_parts[0] or "")[:1] in {"'", '"'}) + + query_fragment: Optional[str] = None + if prev_token in {"-query", "--query"} and current_token[:1] in {"'", '"'}: + query_fragment = current_token + elif query_started_quoted and not ends_with_space: + query_fragment = current_token + elif query_started_quoted and ends_with_space and ":" in prev_token: + query_fragment = "" + + if query_fragment is not None: + field_choices: Dict[str, List[str]] = {} + ordered_fields: List[str] = [] + for spec in query_specs: + key = str(spec.get("query_key") or spec.get("name") or "").strip().lower() + if not key: + continue + if key not in field_choices: + ordered_fields.append(key) + field_choices[key] = [str(choice) for choice in list(spec.get("choices", []) or [])] + for alias in spec.get("query_aliases", []) or []: + alias_text = str(alias or "").strip().lower() + if not alias_text: + continue + 
field_choices.setdefault(alias_text, field_choices[key]) + + raw_fragment = str(query_fragment or "") + segment = raw_fragment[1:] if raw_fragment[:1] in {"'", '"'} else raw_fragment + if "," in segment: + segment = segment.rsplit(",", 1)[-1].lstrip() + segment = segment.lstrip() + + if ":" in segment: + field, partial = segment.split(":", 1) + field = field.strip().lower() + partial_lower = partial.strip().lower() + + inline_choices = [] + if cmd_name == "search-file" and provider_name: + inline_choices = plugin_inline_query_choices(provider_name, field, config) + + choice_pool = inline_choices or field_choices.get(field, []) + if choice_pool: + filtered = ( + [choice for choice in choice_pool if partial_lower in str(choice).lower()] + if partial_lower else list(choice_pool) + ) + for choice in (filtered or choice_pool): + yield Completion(str(choice), start_position=-len(partial)) + return + else: + partial_lower = segment.strip().lower() + field_pool = ordered_fields + filtered_fields = ( + [field for field in field_pool if field.startswith(partial_lower)] + if partial_lower else field_pool + ) + for field in (filtered_fields or field_pool): + yield Completion(f"{field}:", start_position=-len(segment)) + if filtered_fields or field_pool: + return + if ( cmd_name == "search-file" and provider_name diff --git a/MPV/LUA/main.lua b/MPV/LUA/main.lua index 16feb65..b23bd94 100644 --- a/MPV/LUA/main.lua +++ b/MPV/LUA/main.lua @@ -1996,6 +1996,37 @@ function M._suspicious_ytdl_format_reason(fmt, url, raw) end end + if fmt:match('^%d+%-%d+$') and type(raw) == 'table' and type(raw.formats) == 'table' then + for _, item in ipairs(raw.formats) do + if type(item) == 'table' and trim(tostring(item.format_id or '')) == fmt then + local protocol = trim(tostring(item.protocol or '')):lower() + local size_bytes = item.filesize or item.filesize_approx + local vcodec = tostring(item.vcodec or 'none') + local acodec = tostring(item.acodec or 'none') + if (protocol == 'm3u8' or 
protocol == 'm3u8_native') + and not size_bytes + and vcodec ~= 'none' + and acodec ~= 'none' then + return 'format is transient hls variant selector' + end + break + end + end + end + + if fmt:match('^%d+%-%w+$') and type(raw) == 'table' and type(raw.formats) == 'table' then + for _, item in ipairs(raw.formats) do + if type(item) == 'table' and trim(tostring(item.format_id or '')) == fmt then + local vcodec = tostring(item.vcodec or 'none') + local acodec = tostring(item.acodec or 'none') + if vcodec == 'none' and acodec ~= 'none' then + return 'format is unstable audio variant selector' + end + break + end + end + end + return nil end @@ -4485,11 +4516,62 @@ local function _is_browseable_raw_format(fmt) return false end + local protocol = trim(tostring(fmt.protocol or '')):lower() + local size_bytes = fmt.filesize or fmt.filesize_approx + if protocol ~= '' + and (protocol == 'm3u8' or protocol == 'm3u8_native') + and format_id:match('^%d+%-%d+$') + and not size_bytes then + local hls_vcodec = tostring(fmt.vcodec or 'none') + local hls_acodec = tostring(fmt.acodec or 'none') + if hls_vcodec ~= 'none' and hls_acodec ~= 'none' then + return false + end + end + local vcodec = tostring(fmt.vcodec or 'none') local acodec = tostring(fmt.acodec or 'none') return not (vcodec == 'none' and acodec == 'none') end +function M._raw_format_display_id(fmt) + local format_id = trim(tostring(fmt and fmt.format_id or '')) + if format_id == '' then + return '' + end + local vcodec = tostring(fmt and fmt.vcodec or 'none') + local acodec = tostring(fmt and fmt.acodec or 'none') + if vcodec == 'none' and acodec ~= 'none' then + local base = format_id:match('^(%d+)%-%w+$') + if base and base ~= '' then + return base + end + end + return format_id +end + +function M._raw_format_selection_id(fmt) + local display_id = M._raw_format_display_id(fmt) + if display_id == '' then + return '' + end + local vcodec = tostring(fmt and fmt.vcodec or 'none') + local acodec = tostring(fmt and 
fmt.acodec or 'none') + if vcodec ~= 'none' and acodec == 'none' then + return display_id .. '+ba' + end + return display_id +end + +function M._raw_format_picker_score(fmt) + local note = trim(tostring(fmt and (fmt.format_note or fmt.format) or '')):lower() + local format_id = trim(tostring(fmt and fmt.format_id or '')):lower() + local prefers_original = (note:find('original', 1, true) or note:find('default', 1, true)) and 1 or 0 + local avoids_drc = (format_id:find('-drc', 1, true) or note:find('drc', 1, true)) and 0 or 1 + local magnitude = tonumber(fmt and (fmt.filesize or fmt.filesize_approx or fmt.abr or fmt.tbr) or 0) or 0 + return prefers_original * 1000000000000 + avoids_drc * 1000000000 + magnitude +end + local function _build_formats_table_from_raw_info(url, raw) if raw == nil then raw = mp.get_property_native('ytdl-raw-info') @@ -4505,10 +4587,12 @@ local function _build_formats_table_from_raw_info(url, raw) local rows = {} local browseable_count = 0 + local seen_selection_ids = {} for _, fmt in ipairs(formats) do if _is_browseable_raw_format(fmt) then browseable_count = browseable_count + 1 local format_id = trim(tostring(fmt.format_id or '')) + local display_id = M._raw_format_display_id(fmt) local resolution = trim(tostring(fmt.resolution or '')) if resolution == '' then local width = tonumber(fmt.width) @@ -4522,25 +4606,37 @@ local function _build_formats_table_from_raw_info(url, raw) local ext = trim(tostring(fmt.ext or '')) local size = _format_bytes_compact(fmt.filesize or fmt.filesize_approx) - local vcodec = tostring(fmt.vcodec or 'none') - local acodec = tostring(fmt.acodec or 'none') - local selection_id = format_id - if vcodec ~= 'none' and acodec == 'none' then - selection_id = format_id .. 
'+ba' + local selection_id = M._raw_format_selection_id(fmt) + if selection_id ~= '' then + local candidate = { + columns = { + { name = 'ID', value = display_id ~= '' and display_id or format_id }, + { name = 'Resolution', value = resolution }, + { name = 'Ext', value = ext }, + { name = 'Size', value = size }, + }, + selection_args = { '-format', selection_id }, + _picker_score = M._raw_format_picker_score(fmt), + } + local existing_index = seen_selection_ids[selection_id] + if existing_index then + local existing = rows[existing_index] + local existing_score = tonumber(existing and existing._picker_score or 0) or 0 + if candidate._picker_score > existing_score then + rows[existing_index] = candidate + end + else + rows[#rows + 1] = candidate + seen_selection_ids[selection_id] = #rows + end end - - rows[#rows + 1] = { - columns = { - { name = 'ID', value = format_id }, - { name = 'Resolution', value = resolution }, - { name = 'Ext', value = ext }, - { name = 'Size', value = size }, - }, - selection_args = { '-format', selection_id }, - } end end + for _, row in ipairs(rows) do + row._picker_score = nil + end + if browseable_count == 0 then return { title = 'Formats', rows = {} }, nil end diff --git a/MPV/pipeline_helper.py b/MPV/pipeline_helper.py index 2aed604..1eeea69 100644 --- a/MPV/pipeline_helper.py +++ b/MPV/pipeline_helper.py @@ -68,6 +68,7 @@ from SYS.logger import set_debug, debug, set_thread_stream # noqa: E402 from SYS.repl_queue import enqueue_repl_command # noqa: E402 from SYS.utils import format_bytes # noqa: E402 from ProviderCore.registry import get_plugin, get_plugin_class # noqa: E402 +from tool.ytdlp import get_display_format_id, get_selection_format_id # noqa: E402 REQUEST_PROP = "user-data/medeia-pipeline-request" RESPONSE_PROP = "user-data/medeia-pipeline-response" @@ -1028,6 +1029,7 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: format_id = str(fmt.get("format_id") or "").strip() if not format_id: continue + display_id = 
get_display_format_id(fmt) or format_id # Prefer human-ish resolution. resolution = str(fmt.get("resolution") or "").strip() @@ -1045,11 +1047,7 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: ext = str(fmt.get("ext") or "").strip() size = _format_bytes(fmt.get("filesize") or fmt.get("filesize_approx")) - vcodec = str(fmt.get("vcodec") or "none") - acodec = str(fmt.get("acodec") or "none") - selection_id = format_id - if vcodec != "none" and acodec == "none": - selection_id = f"{format_id}+ba" + selection_id = get_selection_format_id(fmt, video_audio_suffix="ba") or format_id # Build selection args compatible with MPV Lua picker. # Use -format instead of -query so Lua can extract the ID easily. @@ -1060,7 +1058,7 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: "columns": [ { "name": "ID", - "value": format_id + "value": display_id }, { "name": "Resolution", diff --git a/Provider/ytdlp.py b/Provider/ytdlp.py index 177c6c1..26bb54a 100644 --- a/Provider/ytdlp.py +++ b/Provider/ytdlp.py @@ -30,7 +30,9 @@ from tool.ytdlp import ( _download_with_timeout, _format_chapters_note, _read_text_file, + collapse_picker_formats, format_for_table_selection, + get_selection_format_id, is_browseable_format, is_url_supported_by_ytdlp, list_formats, @@ -349,25 +351,20 @@ def _format_id_for_query_index( raise ValueError("Unable to list formats for the URL") if s_val and not s_val.startswith("#"): - if any(str(f.get("format_id", "")) == s_val for f in fmts): - return s_val + for item in fmts: + if str(item.get("format_id", "")) == s_val: + normalized = get_selection_format_id(item, video_audio_suffix="bestaudio") + return normalized or s_val - candidate_formats = [f for f in fmts if is_browseable_format(f)] + candidate_formats = collapse_picker_formats(fmts, video_audio_suffix="bestaudio") filtered_formats = candidate_formats if candidate_formats else list(fmts) if idx <= 0 or idx > len(filtered_formats): raise ValueError(f"Format index {idx} out of range") chosen = 
filtered_formats[idx - 1] - selection_format_id = str(chosen.get("format_id") or "").strip() + selection_format_id = get_selection_format_id(chosen, video_audio_suffix="bestaudio") if not selection_format_id: raise ValueError("Selected format has no format_id") - try: - vcodec = str(chosen.get("vcodec", "none")) - acodec = str(chosen.get("acodec", "none")) - if vcodec != "none" and acodec == "none": - selection_format_id = f"{selection_format_id}+bestaudio" - except Exception: - pass return selection_format_id @@ -633,7 +630,7 @@ class ytdlp(TableProviderMixin, Provider): ) -> List[Dict[str, Any]]: if not isinstance(formats, list): return [] - browseable = [fmt for fmt in formats if isinstance(fmt, dict) and is_browseable_format(fmt)] + browseable = collapse_picker_formats(formats, video_audio_suffix="ba") return browseable if browseable else list(formats) def enrich_playlist_entries( @@ -797,7 +794,7 @@ class ytdlp(TableProviderMixin, Provider): if not formats or len(formats) <= 1: return False - candidate_formats = [f for f in formats if is_browseable_format(f)] + candidate_formats = collapse_picker_formats(formats, video_audio_suffix="bestaudio") filtered_formats = candidate_formats if candidate_formats else list(formats) base_cmd = f'download-file "{url}"' remaining_args = [arg for arg in args if arg not in [url] and not str(arg).startswith("-")] @@ -810,13 +807,7 @@ class ytdlp(TableProviderMixin, Provider): results_list: List[Dict[str, Any]] = [] for idx, fmt in enumerate(filtered_formats, 1): - format_id = fmt.get("format_id", "") - selection_format_id = format_id - try: - if str(fmt.get("vcodec", "none")) != "none" and str(fmt.get("acodec", "none")) == "none" and format_id: - selection_format_id = f"{format_id}+bestaudio" - except Exception: - selection_format_id = format_id + selection_format_id = get_selection_format_id(fmt, video_audio_suffix="bestaudio") format_dict = format_for_table_selection( fmt, diff --git a/SYS/cmdlet_catalog.py 
b/SYS/cmdlet_catalog.py index ea3158f..70778d4 100644 --- a/SYS/cmdlet_catalog.py +++ b/SYS/cmdlet_catalog.py @@ -122,6 +122,9 @@ def _normalize_arg(arg: Any) -> Dict[str, Any]: "choices": arg.get("choices", []) or [], "alias": arg.get("alias", ""), "variadic": arg.get("variadic", False), + "query_key": arg.get("query_key", None), + "query_aliases": arg.get("query_aliases", []) or [], + "query_only": bool(arg.get("query_only", False)), "requires_db": bool(arg.get("requires_db", False)), } @@ -134,6 +137,9 @@ def _normalize_arg(arg: Any) -> Dict[str, Any]: "choices": getattr(arg, "choices", []) or [], "alias": getattr(arg, "alias", ""), "variadic": getattr(arg, "variadic", False), + "query_key": getattr(arg, "query_key", None), + "query_aliases": getattr(arg, "query_aliases", []) or [], + "query_only": bool(getattr(arg, "query_only", False)), "requires_db": bool(getattr(arg, "requires_db", False)), } diff --git a/SYS/pipeline.py b/SYS/pipeline.py index 54e19b2..76478b3 100644 --- a/SYS/pipeline.py +++ b/SYS/pipeline.py @@ -1983,6 +1983,8 @@ class PipelineExecutor: command_expanded = False example_selector_triggered = False normalized_source_cmd = str(source_cmd or "").replace("_", "-").strip().lower() + prefer_row_action = False + preferred_row_action = None if normalized_source_cmd in HELP_EXAMPLE_SOURCE_COMMANDS and selection_indices: try: @@ -2011,8 +2013,6 @@ class PipelineExecutor: else: selected_row_args: List[str] = [] skip_pipe_expansion = source_cmd in {".pipe", ".mpv"} and len(stages) > 0 - prefer_row_action = False - preferred_row_action = None if len(selection_indices) == 1 and not stages: try: row_action = _get_row_action(selection_indices[0]) diff --git a/cmdlet/screen_shot.py b/cmdlet/screen_shot.py index 49dd3b6..1430a1e 100644 --- a/cmdlet/screen_shot.py +++ b/cmdlet/screen_shot.py @@ -73,6 +73,61 @@ DEFAULT_VIEWPORT: dict[str, } ARCHIVE_TIMEOUT = 30.0 +ADBLOCK_HOST_PATTERNS: tuple[str, ...] 
= ( + "doubleclick.net", + "googlesyndication.com", + "googleadservices.com", + "google-analytics.com", + "googletagmanager.com", + "googletagservices.com", + "adservice.google.", + "adsystem.com", + "adnxs.com", + "taboola.com", + "outbrain.com", + "criteo.com", + "casalemedia.com", + "rubiconproject.com", + "pubmatic.com", + "scorecardresearch.com", + "quantserve.com", + "zedo.com", + "moatads.com", + "amazon-adsystem.com", + "media.net", +) + +ADBLOCK_URL_PATTERNS: tuple[str, ...] = ( + "/ads/", + "?ads=", + "&ads=", + "advertisement", + "googlesyndication", + "doubleclick", + "adservice", + "adserver", + "prebid", + "taboola", + "outbrain", + "amazon-adsystem", +) + +ADBLOCK_CSS_SELECTORS: tuple[str, ...] = ( + "[id*='ad-']", + "[id^='ad-']", + "[id*='ads-']", + "[class*=' ad-']", + "[class^='ad-']", + "[class*='ads-']", + "[class*='advert']", + "[id*='sponsor']", + "[class*='sponsor']", + "iframe[src*='doubleclick.net']", + "iframe[src*='googlesyndication.com']", + "iframe[src*='taboola.com']", + "iframe[src*='outbrain.com']", +) + # WebP has a hard maximum dimension per side. 
# Pillow typically fails with: "encoding error 5: Image size exceeds WebP limit of 16383 pixels" WEBP_MAX_DIM = 16_383 @@ -136,6 +191,7 @@ class ScreenshotOptions: interactive_pick: bool = False interactive_pick_timeout_s: float = 120.0 quality: int = 8 + adblock: bool = True playwright_tool: Optional[PlaywrightTool] = None @@ -255,11 +311,14 @@ def _normalize_format(fmt: Optional[str]) -> str: if not fmt: return "webp" value = fmt.strip().lower() + if value in {"mht", "mhtml"}: + return "mhtml" if value in {"jpg", "jpeg"}: return "jpeg" if value in {"png", "pdf", + "mhtml", "webp"}: return value return "webp" @@ -281,6 +340,10 @@ def _normalize_capture_mode(value: Optional[str]) -> str: return "" +def _format_supports_target_selection(fmt: Optional[str]) -> bool: + return _normalize_format(fmt) not in {"pdf", "mhtml"} + + def _normalize_quality(value: Any) -> int: try: quality = int(str(value).strip()) @@ -289,6 +352,92 @@ def _normalize_quality(value: Any) -> int: return max(1, min(10, quality)) +def _normalize_bool(value: Any, *, default: bool = False) -> bool: + if value is None: + return bool(default) + if isinstance(value, bool): + return value + text = str(value).strip().lower() + if not text: + return bool(default) + if text in {"1", "true", "yes", "on", "enable", "enabled"}: + return True + if text in {"0", "false", "no", "off", "disable", "disabled"}: + return False + return bool(default) + + +def _url_matches_adblock(url: str) -> bool: + lowered = str(url or "").strip().lower() + if not lowered: + return False + try: + host = str(urlsplit(lowered).hostname or "").strip().lower() + except Exception: + host = "" + if host and any(pattern in host for pattern in ADBLOCK_HOST_PATTERNS): + return True + return any(pattern in lowered for pattern in ADBLOCK_URL_PATTERNS) + + +def _install_adblock(page: Any) -> Optional[Dict[str, int]]: + try: + state: Dict[str, int] = {"blocked": 0} + + def _route(route: Any) -> None: + try: + request = route.request + url = 
str(getattr(request, "url", "") or "") + resource_type = str(getattr(request, "resource_type", "") or "").strip().lower() + if resource_type != "document" and _url_matches_adblock(url): + state["blocked"] = int(state.get("blocked", 0)) + 1 + route.abort("blockedbyclient") + return + except Exception: + pass + route.continue_() + + page.route("**/*", _route) + return state + except Exception: + return None + + +def _remove_ad_elements(page: Any) -> int: + try: + selectors_json = repr(list(ADBLOCK_CSS_SELECTORS)) + removed = page.evaluate( + f""" + () => {{ + const selectors = {selectors_json}; + const seen = new Set(); + let removed = 0; + for (const selector of selectors) {{ + let nodes = []; + try {{ + nodes = Array.from(document.querySelectorAll(selector)); + }} catch (e) {{ + continue; + }} + for (const node of nodes) {{ + if (!(node instanceof Element)) continue; + if (seen.has(node)) continue; + seen.add(node); + try {{ + node.remove(); + removed += 1; + }} catch (e) {{}} + }} + }} + return removed; + }} + """ + ) + return int(removed or 0) + except Exception: + return 0 + + def _jpeg_quality_from_level(level: int) -> int: normalized = _normalize_quality(level) if normalized >= 10: @@ -577,6 +726,9 @@ def _prepare_capture_page( progress: PipelineProgress, ) -> str: navigation_status = "loaded" + adblock_state: Optional[Dict[str, int]] = None + if options.adblock: + adblock_state = _install_adblock(page) progress.step("loading navigating") try: tool.goto(page, options.url) @@ -611,6 +763,14 @@ def _prepare_capture_page( }); """ ) + removed_ads = 0 + if options.adblock: + removed_ads = _remove_ad_elements(page) + blocked_count = int((adblock_state or {}).get("blocked", 0)) + if blocked_count or removed_ads: + warnings.append( + f"adblock filtered {blocked_count} request(s) and removed {removed_ads} page element(s)" + ) return navigation_status @@ -1034,6 +1194,32 @@ def _capture_selector_screenshot( page.screenshot(**screenshot_kwargs) +def _capture_mhtml(page: 
Any, destination: Path) -> None: + session = None + try: + context = getattr(page, "context", None) + if context is None or not hasattr(context, "new_cdp_session"): + raise ScreenshotError("MHTML output requires Chromium CDP session support") + + session = context.new_cdp_session(page) + session.send("Page.enable") + snapshot = session.send("Page.captureSnapshot", {"format": "mhtml"}) + data = snapshot.get("data") if isinstance(snapshot, dict) else None + if not data: + raise ScreenshotError("Chromium did not return any MHTML snapshot data") + destination.write_text(str(data), encoding="utf-8", newline="") + except ScreenshotError: + raise + except Exception as exc: + raise ScreenshotError(f"Could not capture MHTML snapshot: {exc}") from exc + finally: + if session is not None: + try: + session.detach() + except Exception: + pass + + def _convert_to_webp( src_png: Path, dst_webp: Path, @@ -1364,7 +1550,7 @@ def _capture( format_name = _normalize_format(options.output_format) capture_headless = bool(options.headless) picker_headless = capture_headless - if options.interactive_pick and format_name != "pdf": + if options.interactive_pick and _format_supports_target_selection(format_name): picker_headless = False capture_headless = True elif format_name == "pdf": @@ -1405,10 +1591,19 @@ def _capture( warnings.append( "pdf output requires headless Chromium; overriding headless mode" ) + if not _format_supports_target_selection(format_name): + if options.interactive_pick: + warnings.append( + f"{format_name} output captures the full page; interactive element picking is ignored" + ) + if options.prefer_platform_target: + warnings.append( + f"{format_name} output captures the full page; selector targeting is ignored" + ) try: element_captured = False - if options.interactive_pick and format_name != "pdf": + if options.interactive_pick and _format_supports_target_selection(format_name): selected_selector = "" with tool.open_page( headless=picker_headless, @@ -1463,7 +1658,7 
@@ def _capture( progress, ) # Attempt platform-specific target capture if requested (and not PDF) - if options.prefer_platform_target and format_name != "pdf": + if options.prefer_platform_target and _format_supports_target_selection(format_name): progress.step("capturing locating target") try: _platform_preprocess(options.url, page, warnings) @@ -1501,6 +1696,10 @@ def _capture( page.emulate_media(media="print") progress.step("capturing output") page.pdf(path=str(destination), print_background=True) + elif format_name == "mhtml": + capture_mode = "mhtml" + progress.step("capturing output") + _capture_mhtml(page, destination) else: screenshot_kwargs: Dict[str, Any] = { "path": str(destination) @@ -1579,10 +1778,10 @@ def _capture_screenshot( capture_mode = "" capture_target = "" - will_target = bool(options.prefer_platform_target or options.interactive_pick) and requested_format != "pdf" + will_target = bool(options.prefer_platform_target or options.interactive_pick) and _format_supports_target_selection(requested_format) will_convert = requested_format == "webp" will_archive = bool(options.archive and options.url) - interactive_extra_steps = 5 if (options.interactive_pick and requested_format != "pdf") else 0 + interactive_extra_steps = 5 if (options.interactive_pick and _format_supports_target_selection(requested_format)) else 0 total_steps = ( 9 + (1 if will_target else 0) + interactive_extra_steps + (1 if will_convert else 0) + (1 if will_archive else 0) @@ -1685,6 +1884,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: format_value = parsed.get("format") capture_mode_value = _normalize_capture_mode(parsed.get("capture_mode")) raw_quality_value = parsed.get("quality") + adblock_value = parsed.get("adblock") quality_value: Optional[int] = None if not format_value: try: @@ -1709,6 +1909,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: quality_value = None if quality_value is None: quality_value = 
_normalize_quality(None) + adblock_enabled = _normalize_bool(adblock_value, default=True) storage_value = parsed.get("storage") selector_arg = parsed.get("selector") @@ -1774,7 +1975,8 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: ("archive", archive_enabled), ("format", format_name), ("quality", quality_value), - ("capture_mode", capture_mode_value or ("interactive" if interactive_default and format_name != "pdf" else "auto")), + ("adblock", adblock_enabled), + ("capture_mode", capture_mode_value or ("interactive" if interactive_default and _format_supports_target_selection(format_name) else "auto")), ("output_dir", screenshot_dir), ("output_dir_source", screenshot_dir_source), ], @@ -1848,6 +2050,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: full_page=True, interactive_pick=False, quality=quality_value, + adblock=adblock_enabled, playwright_tool=shared_playwright_tool, ) @@ -1860,7 +2063,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: options.target_selectors = None elif capture_mode_value == "interactive": options.interactive_pick = True - elif interactive_default and format_name != "pdf": + elif interactive_default and _format_supports_target_selection(format_name): options.interactive_pick = True elif auto_selectors: options.prefer_platform_target = True @@ -1957,29 +2160,43 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: CMDLET = Cmdlet( name="screen-shot", summary="Capture a website screenshot", - usage="screen-shot [options] [-query \"format:full quality:10\"]", + usage="screen-shot [options] [-query \"format:webp quality:10 mode:full\"]", alias=["screenshot", "ss"], arg=[ SharedArgs.URL, - CmdletArg( - name="format", + sh.QueryArg( + "format", + key="format", type="string", - description="Output format: webp, png, jpeg, or pdf" + choices=["webp", "png", "jpeg", "jpg", "pdf", "mhtml", "mht"], + query_only=True, + 
description="Output format via -query, e.g. format:webp, format:pdf, or format:mhtml" ), sh.QueryArg( "capture_mode", - key="format", + key="mode", aliases=["capture", "mode"], + choices=["full", "interactive"], query_only=True, - description="Capture mode via -query, e.g. format:full or format:interactive" + description="Capture mode via -query, e.g. mode:full or mode:interactive" ), sh.QueryArg( "quality", key="quality", + choices=["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"], query_only=True, description="Screenshot quality via -query, 1-10. 10 uses highest quality and lossless webp." ), + sh.QueryArg( + "adblock", + key="adblock", + aliases=["ads", "blockads"], + choices=["true", "false", "on", "off", "yes", "no", "1", "0"], + handler=lambda value: _normalize_bool(value, default=True), + query_only=True, + description="Ad and tracker blocking via -query. Defaults to true; use adblock:false to disable." + ), CmdletArg( name="selector", type="string", @@ -1991,9 +2208,13 @@ CMDLET = Cmdlet( detail=[ "Uses Playwright Chromium engine only. 
Install Chromium with: python ./scripts/bootstrap.py --playwright-only --browsers chromium", "PDF output requires headless Chromium (the cmdlet will enforce headless mode for PDF).", + "MHTML output uses Chromium page snapshots to save the full page as a single archival file.", + "Basic ad and tracker blocking is enabled by default during capture so MHTML archives are less likely to embed ad content.", "Screenshots are temporary artifacts stored in the configured `temp` directory.", "Interactive single-URL runs open a headful browser picker by default so you can hover and click the element to capture.", - "Use -query \"format:full\" to bypass the picker and capture the full page directly.", + "Use -query \"mode:full\" to bypass the picker and capture the full page directly.", + "Use -query \"format:webp\", \"format:pdf\", or \"format:mhtml\" to choose the output format.", + "Use -query \"adblock:false\" if a site breaks and you need the raw unfiltered page.", "Use -query \"quality:1\" through \"quality:10\" to control jpeg/webp compression. 
quality:10 uses lossless webp.", ], ) diff --git a/tool/ytdlp.py b/tool/ytdlp.py index 18e1256..28ccb9f 100644 --- a/tool/ytdlp.py +++ b/tool/ytdlp.py @@ -613,6 +613,18 @@ def is_browseable_format(fmt: Any) -> bool: if format_id.lower().startswith("sb"): return False + + protocol = str(fmt.get("protocol") or "").strip().lower() + size_bytes = fmt.get("filesize") or fmt.get("filesize_approx") + if ( + protocol in {"m3u8", "m3u8_native"} + and re.fullmatch(r"\d+-\d+", format_id) + and not size_bytes + ): + vcodec = str(fmt.get("vcodec", "none")) + acodec = str(fmt.get("acodec", "none")) + if vcodec != "none" and acodec != "none": + return False # Filter out formats with no audio and no video vcodec = str(fmt.get("vcodec", "none")) @@ -620,6 +632,80 @@ def is_browseable_format(fmt: Any) -> bool: return not (vcodec == "none" and acodec == "none") +def get_selection_format_id( + fmt: Dict[str, Any], + *, + video_audio_suffix: str = "ba", +) -> str: + format_id = str(fmt.get("format_id") or "").strip() + if not format_id: + return "" + + vcodec = str(fmt.get("vcodec", "none")) + acodec = str(fmt.get("acodec", "none")) + selector_id = format_id + + match = re.fullmatch(r"(?P<base>\d+)-[A-Za-z0-9]+", format_id) + if match and vcodec == "none" and acodec != "none": + selector_id = match.group("base") + + if selector_id and vcodec != "none" and acodec == "none" and video_audio_suffix: + selector_id = f"{selector_id}+{video_audio_suffix}" + + return selector_id + + +def get_display_format_id(fmt: Dict[str, Any]) -> str: + format_id = str(fmt.get("format_id") or "").strip() + if not format_id: + return "" + selector_id = get_selection_format_id(fmt, video_audio_suffix="") + return selector_id or format_id + + +def _picker_format_score(fmt: Dict[str, Any]) -> tuple[int, int, float]: + note = str(fmt.get("format_note") or fmt.get("format") or "").strip().lower() + format_id = str(fmt.get("format_id") or "").strip().lower() + prefers_original = 1 if ("original" in note or "default" in 
note) else 0 + avoids_drc = 0 if ("-drc" in format_id or "drc" in note) else 1 + magnitude = 0.0 + for key in ("filesize", "filesize_approx", "abr", "tbr"): + value = fmt.get(key) + if isinstance(value, (int, float)): + magnitude = float(value) + break + if isinstance(value, str): + try: + magnitude = float(value.strip()) + break + except Exception: + pass + return (prefers_original, avoids_drc, magnitude) + + +def collapse_picker_formats( + formats: Sequence[Dict[str, Any]], + *, + video_audio_suffix: str = "ba", +) -> List[Dict[str, Any]]: + collapsed: Dict[str, Dict[str, Any]] = {} + order: List[str] = [] + for fmt in formats: + if not isinstance(fmt, dict) or not is_browseable_format(fmt): + continue + selector_id = get_selection_format_id(fmt, video_audio_suffix=video_audio_suffix) + if not selector_id: + continue + current = collapsed.get(selector_id) + if current is None: + collapsed[selector_id] = fmt + order.append(selector_id) + continue + if _picker_format_score(fmt) > _picker_format_score(current): + collapsed[selector_id] = fmt + return [collapsed[key] for key in order if key in collapsed] + + def format_for_table_selection( fmt: Dict[str, Any], url: str, @@ -648,6 +734,7 @@ def format_for_table_selection( results = [format_for_table_selection(f, url, i+1) for i, f in enumerate(browseable)] """ format_id = fmt.get("format_id", "") + display_format_id = get_display_format_id(fmt) resolution = fmt.get("resolution", "") ext = fmt.get("ext", "") vcodec = fmt.get("vcodec", "none") @@ -657,10 +744,10 @@ def format_for_table_selection( # If not provided, compute selection format ID (add +ba for video-only) if selection_format_id is None: - selection_format_id = format_id + selection_format_id = get_selection_format_id(fmt, video_audio_suffix="ba") try: - if vcodec != "none" and acodec == "none" and format_id: - selection_format_id = f"{format_id}+ba" + if not selection_format_id and format_id: + selection_format_id = format_id except Exception: from 
SYS.logger import logger logger.exception("Failed to compute selection_format_id for format: %s", fmt) @@ -694,14 +781,14 @@ def format_for_table_selection( # Build table row return { "table": "download-file", - "title": f"Format {format_id}", + "title": f"Format {display_format_id or format_id}", "url": url, "target": url, "detail": format_desc, "annotations": [ext, resolution] if resolution else [ext], "media_kind": "format", "columns": [ - ("ID", format_id), + ("ID", display_format_id or format_id), ("Resolution", resolution or "N/A"), ("Ext", ext), ("Size", size_str or ""),