"""Screen-shot cmdlet for capturing screenshots of url in a pipeline. This cmdlet processes files through the pipeline and creates screenshots using Playwright, marking them as temporary artifacts for cleanup. """ from __future__ import annotations import hashlib import io import sys import tempfile import time from datetime import datetime import httpx from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple from urllib.parse import urlsplit, quote, urljoin, unquote from SYS.logger import debug_panel, log, is_debug_enabled from SYS.item_accessors import extract_item_tags, get_result_title from API.HTTP import HTTPClient from SYS.pipeline_progress import PipelineProgress from SYS.utils import ensure_directory, sha256_file, unique_path, unique_preserve_order from .. import _shared as sh Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg SharedArgs = sh.SharedArgs create_pipe_object_result = sh.create_pipe_object_result normalize_result_input = sh.normalize_result_input should_show_help = sh.should_show_help get_field = sh.get_field parse_cmdlet_args = sh.parse_cmdlet_args from SYS import pipeline as pipeline_context # ============================================================================ # CMDLET Metadata Declaration # ============================================================================ # ============================================================================ # Playwright & Screenshot Dependencies # ============================================================================ from tool.playwright import PlaywrightTimeoutError, PlaywrightTool try: from SYS.config import resolve_output_dir except ImportError: try: _parent_dir = str(Path(__file__).parent.parent) if _parent_dir not in sys.path: sys.path.insert(0, _parent_dir) from SYS.config import resolve_output_dir except ImportError: resolve_output_dir = None # ============================================================================ 
# Screenshot Constants & Configuration # ============================================================================ USER_AGENT = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ) DEFAULT_VIEWPORT: dict[str, int] = { "width": 1920, "height": 1080 } ARCHIVE_TIMEOUT = 30.0 ADBLOCK_HOST_PATTERNS: tuple[str, ...] = ( "doubleclick.net", "googlesyndication.com", "googleadservices.com", "google-analytics.com", "googletagmanager.com", "googletagservices.com", "adservice.google.", "adsystem.com", "adnxs.com", "taboola.com", "outbrain.com", "criteo.com", "casalemedia.com", "rubiconproject.com", "pubmatic.com", "scorecardresearch.com", "quantserve.com", "zedo.com", "moatads.com", "amazon-adsystem.com", "media.net", ) ADBLOCK_URL_PATTERNS: tuple[str, ...] = ( "/ads/", "?ads=", "&ads=", "advertisement", "googlesyndication", "doubleclick", "adservice", "adserver", "prebid", "taboola", "outbrain", "amazon-adsystem", ) ADBLOCK_CSS_SELECTORS: tuple[str, ...] = ( "[id*='ad-']", "[id^='ad-']", "[id*='ads-']", "[class*=' ad-']", "[class^='ad-']", "[class*='ads-']", "[class*='advert']", "[id*='sponsor']", "[class*='sponsor']", "iframe[src*='doubleclick.net']", "iframe[src*='googlesyndication.com']", "iframe[src*='taboola.com']", "iframe[src*='outbrain.com']", ) # WebP has a hard maximum dimension per side. 
# Pillow typically fails with: "encoding error 5: Image size exceeds WebP limit of 16383 pixels" WEBP_MAX_DIM = 16_383 # Configurable selectors for specific websites SITE_SELECTORS: Dict[str, List[str]] = { "twitter.com": [ "article[role='article']", "div[data-testid='tweet']", "div[data-testid='cellInnerDiv'] article", ], "x.com": [ "article[role='article']", "div[data-testid='tweet']", "div[data-testid='cellInnerDiv'] article", ], "instagram.com": [ "article[role='presentation']", "article[role='article']", "div[role='dialog'] article", "section main article", ], "reddit.com": [ "shreddit-post", "div[data-testid='post-container']", "div[data-click-id='background']", "article", ], "rumble.com": [ "rumble-player, iframe.rumble", "div.video-item--main", "main article", ], } class ScreenshotError(RuntimeError): """Raised when screenshot capture or upload fails.""" @dataclass(slots=True) class ScreenshotOptions: """Options controlling screenshot capture and post-processing.""" output_dir: Path url: str = "" output_path: Optional[Path] = None full_page: bool = True headless: bool = True wait_after_load: float = 6.0 wait_for_article: bool = False replace_video_posters: bool = True tag: Sequence[str] = () archive: bool = False archive_timeout: float = ARCHIVE_TIMEOUT output_format: Optional[str] = None prefer_platform_target: bool = False target_selectors: Optional[Sequence[str]] = None selector_timeout_ms: int = 10_000 interactive_pick: bool = False interactive_pick_timeout_s: float = 120.0 quality: int = 8 adblock: bool = True playwright_tool: Optional[PlaywrightTool] = None @dataclass(slots=True) class ScreenshotResult: """Details about the captured screenshot.""" path: Path tag_applied: List[str] archive_url: List[str] url: List[str] capture_mode: str = "" capture_target: str = "" warnings: List[str] = field(default_factory=list) # ============================================================================ # Helper Functions # 
============================================================================ def _slugify_url(url: str) -> str: """Convert URL to filesystem-safe slug.""" parsed = urlsplit(url) candidate = f"{parsed.netloc}{parsed.path}" if parsed.query: candidate += f"?{parsed.query}" slug = "".join(char if char.isalnum() else "-" for char in candidate.lower()) slug = slug.strip("-") or "screenshot" return slug[:100] def _tags_from_url(url: str) -> List[str]: """Derive simple tags from a URL. - site: (strips leading www.) - title: derived from the last path segment, with extension removed and separators (-, _, %) normalized to spaces. """ u = str(url or "").strip() if not u: return [] parsed = None try: parsed = urlsplit(u) host = ( str( getattr(parsed, "hostname", None) or getattr(parsed, "netloc", "") or "" ).strip().lower() ) except Exception: parsed = None host = "" if host: # Drop credentials and port if present. if "@" in host: host = host.rsplit("@", 1)[-1] if ":" in host: host = host.split(":", 1)[0] if host.startswith("www."): host = host[len("www."):] path = "" if parsed is not None: try: path = str(getattr(parsed, "path", "") or "") except Exception: path = "" last = "" if path: try: last = path.rsplit("/", 1)[-1] except Exception: last = "" try: last = unquote(last or "") except Exception: last = last or "" if last and "." in last: # Drop a single trailing extension (e.g. .html, .php). 
last = last.rsplit(".", 1)[0] for sep in ("_", "-", "%"): if last and sep in last: last = last.replace(sep, " ") title = " ".join(str(last or "").split()).strip().lower() tags: List[str] = [] if host: tags.append(f"site:{host}") if title: tags.append(f"title:{title}") return tags def _title_from_url(url: str) -> str: """Return the normalized title derived from a URL's last path segment.""" for t in _tags_from_url(url): if str(t).lower().startswith("title:"): return str(t)[len("title:"):].strip() return "" def _normalize_format(fmt: Optional[str]) -> str: """Normalize output format to valid values.""" if not fmt: return "webp" value = fmt.strip().lower() if value in {"mht", "mhtml"}: return "mhtml" if value in {"jpg", "jpeg"}: return "jpeg" if value in {"png", "pdf", "mhtml", "webp"}: return value return "webp" def _format_suffix(fmt: str) -> str: """Get file suffix for format.""" if fmt == "jpeg": return ".jpg" return f".{fmt}" def _normalize_capture_mode(value: Optional[str]) -> str: mode = str(value or "").strip().lower() if mode in {"full", "page", "fullscreen"}: return "full" if mode in {"pick", "picker", "interactive", "element", "select"}: return "interactive" return "" def _format_supports_target_selection(fmt: Optional[str]) -> bool: return _normalize_format(fmt) not in {"pdf", "mhtml"} def _normalize_quality(value: Any) -> int: try: quality = int(str(value).strip()) except Exception: quality = 8 return max(1, min(10, quality)) def _normalize_bool(value: Any, *, default: bool = False) -> bool: if value is None: return bool(default) if isinstance(value, bool): return value text = str(value).strip().lower() if not text: return bool(default) if text in {"1", "true", "yes", "on", "enable", "enabled"}: return True if text in {"0", "false", "no", "off", "disable", "disabled"}: return False return bool(default) def _url_matches_adblock(url: str) -> bool: lowered = str(url or "").strip().lower() if not lowered: return False try: host = 
str(urlsplit(lowered).hostname or "").strip().lower() except Exception: host = "" if host and any(pattern in host for pattern in ADBLOCK_HOST_PATTERNS): return True return any(pattern in lowered for pattern in ADBLOCK_URL_PATTERNS) def _install_adblock(page: Any) -> Optional[Dict[str, int]]: try: state: Dict[str, int] = {"blocked": 0} def _route(route: Any) -> None: try: request = route.request url = str(getattr(request, "url", "") or "") resource_type = str(getattr(request, "resource_type", "") or "").strip().lower() if resource_type != "document" and _url_matches_adblock(url): state["blocked"] = int(state.get("blocked", 0)) + 1 route.abort("blockedbyclient") return except Exception: pass route.continue_() page.route("**/*", _route) return state except Exception: return None def _remove_ad_elements(page: Any) -> int: try: selectors_json = repr(list(ADBLOCK_CSS_SELECTORS)) removed = page.evaluate( f""" () => {{ const selectors = {selectors_json}; const seen = new Set(); let removed = 0; for (const selector of selectors) {{ let nodes = []; try {{ nodes = Array.from(document.querySelectorAll(selector)); }} catch (e) {{ continue; }} for (const node of nodes) {{ if (!(node instanceof Element)) continue; if (seen.has(node)) continue; seen.add(node); try {{ node.remove(); removed += 1; }} catch (e) {{}} }} }} return removed; }} """ ) return int(removed or 0) except Exception: return 0 def _jpeg_quality_from_level(level: int) -> int: normalized = _normalize_quality(level) if normalized >= 10: return 100 return 45 + ((normalized - 1) * 6) def _webp_quality_settings(level: int) -> Dict[str, Any]: normalized = _normalize_quality(level) if normalized >= 10: return { "quality": 100, "method": 6, "lossless": True, } return { "quality": 45 + ((normalized - 1) * 6), "method": 6, "lossless": False, } def _stdin_interactive() -> bool: try: return bool(sys.stdin and sys.stdin.isatty()) except Exception: return False def _debug_rows(rows: Sequence[tuple[str, Any]]) -> List[tuple[str, 
Any]]: normalized: List[tuple[str, Any]] = [] for key, value in rows: if isinstance(value, (list, tuple, set)): value = ", ".join(str(item) for item in value) if value else "" elif isinstance(value, Path): value = str(value) elif value in (None, ""): value = "" normalized.append((str(key), value)) return normalized def _show_debug_panel( title: str, rows: Sequence[tuple[str, Any]], *, border_style: str = "cyan", ) -> None: try: debug_panel(title, _debug_rows(rows), border_style=border_style) except Exception: pass def _install_element_picker(page: Any) -> None: page.evaluate( """ () => { try { if (typeof window.__medeiaPickerCleanup === 'function') { window.__medeiaPickerCleanup(); } window.__medeiaPickerResult = null; const cssEscape = (value) => { try { if (window.CSS && typeof window.CSS.escape === 'function') { return window.CSS.escape(String(value || '')); } } catch (e) {} return String(value || '').replace(/[^a-zA-Z0-9_-]/g, '\\$&'); }; const buildSelector = (element) => { if (!(element instanceof Element)) return ''; if (element.id) return '#' + cssEscape(element.id); const parts = []; let node = element; while (node && node.nodeType === 1 && parts.length < 8) { let part = String(node.localName || node.tagName || '').toLowerCase(); if (!part) break; const classes = Array.from(node.classList || []).filter(Boolean).slice(0, 2); if (classes.length) { part += classes.map((name) => '.' 
+ cssEscape(name)).join(''); } const parent = node.parentElement; if (parent) { const siblings = Array.from(parent.children).filter((child) => child.localName === node.localName); if (siblings.length > 1) { part += `:nth-of-type(${siblings.indexOf(node) + 1})`; } } parts.unshift(part); const selector = parts.join(' > '); try { if (document.querySelectorAll(selector).length === 1) { return selector; } } catch (e) {} node = parent; } return parts.join(' > '); }; const box = document.createElement('div'); box.setAttribute('data-medeia-picker', 'box'); box.style.position = 'fixed'; box.style.pointerEvents = 'none'; box.style.zIndex = '2147483646'; box.style.border = '2px solid #ffb000'; box.style.background = 'rgba(255, 176, 0, 0.12)'; box.style.boxShadow = '0 0 0 99999px rgba(0, 0, 0, 0.12)'; box.style.display = 'none'; const banner = document.createElement('div'); banner.setAttribute('data-medeia-picker', 'banner'); banner.style.position = 'fixed'; banner.style.top = '12px'; banner.style.left = '50%'; banner.style.transform = 'translateX(-50%)'; banner.style.zIndex = '2147483647'; banner.style.padding = '10px 14px'; banner.style.background = 'rgba(18, 18, 18, 0.92)'; banner.style.color = '#ffffff'; banner.style.font = '13px/1.4 sans-serif'; banner.style.borderRadius = '10px'; banner.style.boxShadow = '0 8px 24px rgba(0, 0, 0, 0.35)'; banner.style.maxWidth = 'min(90vw, 920px)'; banner.style.pointerEvents = 'none'; banner.textContent = 'Medeia screenshot picker: hover an element, click to capture it, or press Escape to cancel.'; const updateBox = (element) => { if (!(element instanceof Element)) { box.style.display = 'none'; return; } const rect = element.getBoundingClientRect(); box.style.display = 'block'; box.style.left = rect.left + 'px'; box.style.top = rect.top + 'px'; box.style.width = rect.width + 'px'; box.style.height = rect.height + 'px'; }; const finish = (payload) => { if (window.__medeiaPickerResult) { return; } window.__medeiaPickerResult = payload; }; 
const onMove = (event) => { const target = event.target instanceof Element ? event.target : null; if (!target || target.closest('[data-medeia-picker]')) { return; } updateBox(target); }; const onPointerDown = (event) => { const target = event.target instanceof Element ? event.target : null; if (!target || target.closest('[data-medeia-picker]')) { return; } event.preventDefault(); event.stopPropagation(); event.stopImmediatePropagation(); const rect = target.getBoundingClientRect(); finish({ cancelled: false, selector: buildSelector(target), tag: String(target.localName || target.tagName || '').toLowerCase(), text: String((target.textContent || '').trim()).slice(0, 200), width: Math.round(rect.width || 0), height: Math.round(rect.height || 0), }); }; const onKeyDown = (event) => { if (event.key !== 'Escape') { return; } event.preventDefault(); event.stopPropagation(); event.stopImmediatePropagation(); finish({ cancelled: true }); }; window.__medeiaPickerCleanup = () => { window.removeEventListener('mousemove', onMove, true); window.removeEventListener('pointerdown', onPointerDown, true); window.removeEventListener('keydown', onKeyDown, true); try { box.remove(); } catch (e) {} try { banner.remove(); } catch (e) {} try { delete window.__medeiaPickerCleanup; } catch (e) {} }; window.addEventListener('mousemove', onMove, true); window.addEventListener('pointerdown', onPointerDown, true); window.addEventListener('keydown', onKeyDown, true); document.documentElement.appendChild(box); document.documentElement.appendChild(banner); try { window.focus(); } catch (e) {} try { document.documentElement.setAttribute('tabindex', '-1'); document.documentElement.focus({ preventScroll: true }); } catch (e) {} } catch (e) { window.__medeiaPickerResult = { cancelled: true, error: String(e || ''), }; } } """ ) def _clear_element_picker(page: Any) -> None: try: page.evaluate( """ () => { try { if (typeof window.__medeiaPickerCleanup === 'function') { window.__medeiaPickerCleanup(); } } 
catch (e) {} } """ ) except Exception: pass def _interactive_pick_selector(page: Any, *, timeout_s: float) -> Dict[str, Any]: picked: Dict[str, Any] = {} _install_element_picker(page) deadline = time.time() + max(5.0, float(timeout_s or 0.0)) try: while time.time() < deadline: try: if page.is_closed(): picked["cancelled"] = True break except Exception: break try: payload = page.evaluate("() => window.__medeiaPickerResult || null") except Exception: payload = None if isinstance(payload, dict) and payload: picked.update(payload) break time.sleep(0.05) finally: _clear_element_picker(page) if not picked: raise ScreenshotError("Timed out waiting for element selection") if picked.get("cancelled"): error_text = str(picked.get("error") or "").strip() if error_text: raise ScreenshotError(f"Element selection cancelled: {error_text}") raise ScreenshotError("Element selection cancelled") selector = str(picked.get("selector") or "").strip() if not selector: raise ScreenshotError("Element picker did not return a valid selector") return picked def _prepare_capture_page( tool: PlaywrightTool, page: Any, options: ScreenshotOptions, warnings: List[str], progress: PipelineProgress, ) -> str: navigation_status = "loaded" adblock_state: Optional[Dict[str, int]] = None if options.adblock: adblock_state = _install_adblock(page) progress.step("loading navigating") try: tool.goto(page, options.url) progress.step("loading page loaded") except PlaywrightTimeoutError: navigation_status = "timeout" warnings.append("navigation timeout; capturing current page state") progress.step("loading navigation timeout") if options.wait_for_article: try: page.wait_for_selector("article", timeout=10_000) except PlaywrightTimeoutError: warnings.append("
def _capture_selector_screenshot(
    page: Any,
    selector: str,
    destination: Path,
    format_name: str,
    selector_timeout_ms: int,
    quality_level: int,
) -> None:
    """Capture the first element matching *selector* into *destination*.

    Steps: wait for the element to be visible, freeze animations, wait for its
    images and fonts, measure it until the bounding box is stable, widen the
    viewport if the element does not fit horizontally, then either

    * stitch several clipped viewport screenshots with Pillow when the element
      is taller than the viewport, or
    * take a single clipped ``page.screenshot``.

    Only ``format_name == "jpeg"`` selects JPEG output; every other value
    produces PNG data here.

    Raises:
        ScreenshotError: when no selector is given, the element cannot be
            measured, the viewport cannot be resized, Pillow is missing for a
            tall capture, or stitching fails. Errors raised by
            ``locator.wait_for`` and the final ``page.screenshot`` propagate
            unchanged.
    """
    selector_text = str(selector or "").strip()
    if not selector_text:
        raise ScreenshotError("No selector was provided for element capture")
    # The effective timeout is never below 10 seconds.
    timeout_ms = max(10_000, int(selector_timeout_ms or 0))
    locator = page.locator(selector_text).first
    locator.wait_for(state="visible", timeout=timeout_ms)
    # Disable animations/transitions so repeated measurements and stitched
    # slices see the same layout (best effort).
    try:
        page.add_style_tag(
            content=(
                "*,*::before,*::after{animation:none !important;transition:none !important;"
                "scroll-behavior:auto !important;}"
            )
        )
    except Exception:
        pass
    try:
        locator.scroll_into_view_if_needed(timeout=min(timeout_ms, 2_500))
    except Exception:
        pass
    # Wait (max 1.5 s each) for images inside the element and for web fonts.
    try:
        locator.evaluate(
            """
            async (element) => {
              const media = Array.from(
                element.querySelectorAll('img,video,iframe')
              );
              const pending = media.map((node) => {
                if (node instanceof HTMLImageElement) {
                  if (node.complete) {
                    return Promise.resolve();
                  }
                  return new Promise((resolve) => {
                    const done = () => resolve();
                    node.addEventListener('load', done, { once: true });
                    node.addEventListener('error', done, { once: true });
                    setTimeout(done, 1500);
                  });
                }
                return Promise.resolve();
              });
              if (pending.length) {
                await Promise.allSettled(pending);
              }
              try {
                if (document.fonts && document.fonts.ready) {
                  await Promise.race([
                    document.fonts.ready,
                    new Promise((resolve) => setTimeout(resolve, 1500)),
                  ]);
                }
              } catch (e) {}
            }
            """
        )
    except Exception:
        pass

    def _read_clip() -> Optional[Dict[str, float]]:
        # Bounding box as reported by Playwright, clamped to non-negative
        # origin and at least 1x1; None when it cannot be read.
        try:
            clip_value = locator.bounding_box()
        except Exception:
            clip_value = None
        if not isinstance(clip_value, dict):
            return None
        try:
            return {
                "x": max(0.0, float(clip_value.get("x") or 0.0)),
                "y": max(0.0, float(clip_value.get("y") or 0.0)),
                "width": max(1.0, float(clip_value.get("width") or 0.0)),
                "height": max(1.0, float(clip_value.get("height") or 0.0)),
            }
        except Exception:
            return None

    def _read_page_rect() -> Optional[Dict[str, float]]:
        # Element rectangle in document coordinates (client rect + scroll).
        try:
            rect_value = locator.evaluate(
                """
                (element) => {
                  const rect = element.getBoundingClientRect();
                  return {
                    x: Math.max(0, rect.left + window.scrollX),
                    y: Math.max(0, rect.top + window.scrollY),
                    width: Math.max(1, rect.width),
                    height: Math.max(1, rect.height),
                  };
                }
                """
            )
        except Exception:
            rect_value = None
        if not isinstance(rect_value, dict):
            return None
        try:
            return {
                "x": max(0.0, float(rect_value.get("x") or 0.0)),
                "y": max(0.0, float(rect_value.get("y") or 0.0)),
                "width": max(1.0, float(rect_value.get("width") or 0.0)),
                "height": max(1.0, float(rect_value.get("height") or 0.0)),
            }
        except Exception:
            return None

    def _read_viewport_rect() -> Optional[Dict[str, float]]:
        # Element rectangle relative to the current viewport (may be negative
        # or exceed the viewport when partly scrolled out of view).
        try:
            rect_value = locator.evaluate(
                """
                (element) => {
                  const rect = element.getBoundingClientRect();
                  return {
                    left: rect.left,
                    top: rect.top,
                    right: rect.right,
                    bottom: rect.bottom,
                    width: rect.width,
                    height: rect.height,
                  };
                }
                """
            )
        except Exception:
            rect_value = None
        if not isinstance(rect_value, dict):
            return None
        try:
            return {
                "left": float(rect_value.get("left") or 0.0),
                "top": float(rect_value.get("top") or 0.0),
                "right": float(rect_value.get("right") or 0.0),
                "bottom": float(rect_value.get("bottom") or 0.0),
                "width": max(1.0, float(rect_value.get("width") or 0.0)),
                "height": max(1.0, float(rect_value.get("height") or 0.0)),
            }
        except Exception:
            return None

    def _read_scroll_metrics() -> Dict[str, float]:
        # Current scroll offsets, viewport size and maximum vertical scroll.
        # Falls back to the viewport size measured by the enclosing function
        # (current_viewport_width/height are assigned before the first call).
        try:
            metrics_value = page.evaluate(
                """
                () => {
                  const root = document.documentElement || document.body;
                  const body = document.body;
                  const scrollHeight = Math.max(
                    root ? root.scrollHeight || 0 : 0,
                    body ? body.scrollHeight || 0 : 0,
                  );
                  const innerWidth = window.innerWidth || 0;
                  const innerHeight = window.innerHeight || 0;
                  return {
                    scrollX: window.scrollX || window.pageXOffset || 0,
                    scrollY: window.scrollY || window.pageYOffset || 0,
                    innerWidth,
                    innerHeight,
                    maxScrollY: Math.max(0, scrollHeight - innerHeight),
                  };
                }
                """
            )
        except Exception:
            metrics_value = None
        if not isinstance(metrics_value, dict):
            return {
                "scrollX": 0.0,
                "scrollY": 0.0,
                "innerWidth": max(1.0, current_viewport_width),
                "innerHeight": max(1.0, current_viewport_height),
                "maxScrollY": 0.0,
            }
        try:
            return {
                "scrollX": max(0.0, float(metrics_value.get("scrollX") or 0.0)),
                "scrollY": max(0.0, float(metrics_value.get("scrollY") or 0.0)),
                "innerWidth": max(1.0, float(metrics_value.get("innerWidth") or current_viewport_width or 1.0)),
                "innerHeight": max(1.0, float(metrics_value.get("innerHeight") or current_viewport_height or 1.0)),
                "maxScrollY": max(0.0, float(metrics_value.get("maxScrollY") or 0.0)),
            }
        except Exception:
            return {
                "scrollX": 0.0,
                "scrollY": 0.0,
                "innerWidth": max(1.0, current_viewport_width),
                "innerHeight": max(1.0, current_viewport_height),
                "maxScrollY": 0.0,
            }

    # Poll the bounding box (up to 12 reads, 150 ms apart) until two
    # consecutive reads differ from their predecessor by at most 1 px; if it
    # never settles, the last successful read is used.
    stable_clip: Optional[Dict[str, float]] = None
    stable_reads = 0
    previous_clip: Optional[Dict[str, float]] = None
    for _ in range(12):
        current_clip = _read_clip()
        if current_clip is None:
            time.sleep(0.15)
            continue
        if previous_clip is not None:
            dx = abs(current_clip["x"] - previous_clip["x"])
            dy = abs(current_clip["y"] - previous_clip["y"])
            dw = abs(current_clip["width"] - previous_clip["width"])
            dh = abs(current_clip["height"] - previous_clip["height"])
            if max(dx, dy, dw, dh) <= 1.0:
                stable_reads += 1
            else:
                stable_reads = 0
        previous_clip = current_clip
        stable_clip = current_clip
        if stable_reads >= 2:
            break
        time.sleep(0.15)
    clip = stable_clip
    if clip is None:
        raise ScreenshotError(f"Could not measure selector '{selector_text}'")
    x = clip["x"]
    y = clip["y"]
    width = clip["width"]
    height = clip["height"]
    page_rect = _read_page_rect()
    if page_rect is None:
        raise ScreenshotError(f"Could not read page coordinates for selector '{selector_text}'")
    viewport_size = None
    try:
        viewport_size = page.viewport_size
    except Exception:
        viewport_size = None
    try:
        current_viewport_width = max(1.0, float((viewport_size or {}).get("width") or 0.0))
        current_viewport_height = max(1.0, float((viewport_size or {}).get("height") or 0.0))
    except Exception:
        current_viewport_width = 0.0
        current_viewport_height = 0.0
    # Widen the viewport when the element (plus 8 px margin) would be cut off
    # on the right, then re-measure because layout may have changed.
    required_width = max(1.0, x + width + 8.0)
    if required_width > current_viewport_width:
        try:
            page.set_viewport_size(
                {
                    "width": int(max(current_viewport_width, required_width)),
                    "height": int(max(current_viewport_height, 1.0)),
                }
            )
            try:
                locator.scroll_into_view_if_needed(timeout=min(timeout_ms, 2_500))
            except Exception:
                pass
            time.sleep(0.25)
            clip = _read_clip()
            if clip is None:
                raise ScreenshotError(f"Could not re-measure selector '{selector_text}' after viewport resize")
            x = clip["x"]
            y = clip["y"]
            width = clip["width"]
            height = clip["height"]
            page_rect = _read_page_rect()
            if page_rect is None:
                raise ScreenshotError(f"Could not re-read page coordinates for selector '{selector_text}'")
            current_viewport_width = max(current_viewport_width, required_width)
        except Exception as exc:
            # NOTE: this also re-wraps the ScreenshotErrors raised just above.
            raise ScreenshotError(f"Could not resize viewport for selector '{selector_text}': {exc}") from exc
    # Tall element: scroll through it and stitch viewport-sized slices.
    if height > max(1.0, current_viewport_height - 8.0):
        try:
            from PIL import Image
        except Exception as exc:
            raise ScreenshotError(
                f"Pillow is required for tall element capture: {exc}"
            ) from exc
        padding = 2.0
        # Output canvas covers the element's document rectangle plus padding.
        output_left = max(0.0, page_rect["x"] - padding)
        output_top = max(0.0, page_rect["y"] - padding)
        output_width = max(1, int(page_rect["width"] + (padding * 2.0) + 0.9999))
        output_height = max(1, int(page_rect["height"] + (padding * 2.0) + 0.9999))
        canvas_mode = "RGB" if format_name == "jpeg" else "RGBA"
        canvas_bg = (255, 255, 255) if canvas_mode == "RGB" else (255, 255, 255, 0)
        stitched = Image.new(canvas_mode, (output_width, output_height), canvas_bg)
        stitched_bottom = 0  # lowest canvas row filled so far
        overlap_px = 24  # rows re-captured between consecutive slices
        step_cursor = 0  # offset from output_top to scroll to next
        # Upper bound on iterations so a page that refuses to scroll cannot
        # loop forever.
        max_iterations = max(10, int((output_height / max(1.0, current_viewport_height)) * 6.0) + 12)
        try:
            for _ in range(max_iterations):
                metrics = _read_scroll_metrics()
                desired_scroll_y = min(
                    metrics["maxScrollY"],
                    max(0.0, output_top + float(step_cursor)),
                )
                page.evaluate("(y) => window.scrollTo(0, y)", desired_scroll_y)
                page.wait_for_timeout(125)
                # Wait two animation frames so the scrolled layout is painted.
                try:
                    locator.evaluate(
                        """
                        async () => {
                          await new Promise((resolve) => requestAnimationFrame(() => requestAnimationFrame(resolve)));
                        }
                        """
                    )
                except Exception:
                    pass
                metrics = _read_scroll_metrics()
                viewport_rect = _read_viewport_rect()
                if viewport_rect is None:
                    continue
                # Part of the element currently visible inside the viewport.
                visible_left = max(0.0, viewport_rect["left"] - padding)
                visible_top = max(0.0, viewport_rect["top"] - padding)
                visible_right = min(metrics["innerWidth"], viewport_rect["right"] + padding)
                visible_bottom = min(metrics["innerHeight"], viewport_rect["bottom"] + padding)
                if visible_right <= visible_left or visible_bottom <= visible_top:
                    # Nothing visible: stop at the page bottom, else advance.
                    if metrics["scrollY"] >= metrics["maxScrollY"]:
                        break
                    step_cursor += max(1, int(metrics["innerHeight"] * 0.6))
                    continue
                clip_box = {
                    "x": float(int(visible_left)),
                    "y": float(int(visible_top)),
                    "width": float(int((visible_right - visible_left) + 0.9999)),
                    "height": float(int((visible_bottom - visible_top) + 0.9999)),
                }
                piece_bytes = page.screenshot(
                    timeout=timeout_ms,
                    type="png",
                    clip=clip_box,
                )
                # Translate the slice origin from viewport to canvas coordinates.
                capture_page_x = metrics["scrollX"] + visible_left
                capture_page_y = metrics["scrollY"] + visible_top
                paste_x = int(round(capture_page_x - output_left))
                paste_y = int(round(capture_page_y - output_top))
                with Image.open(io.BytesIO(piece_bytes)) as piece_image:
                    if canvas_mode == "RGB":
                        piece = piece_image.convert("RGB")
                    else:
                        piece = piece_image.convert("RGBA")
                # Crop the slice to the part that lies on the canvas.
                crop_left = max(0, -paste_x)
                crop_top = max(0, -paste_y)
                crop_right = min(piece.width, output_width - paste_x)
                crop_bottom = min(piece.height, output_height - paste_y)
                if crop_right <= crop_left or crop_bottom <= crop_top:
                    continue
                if crop_left or crop_top or crop_right != piece.width or crop_bottom != piece.height:
                    piece = piece.crop((crop_left, crop_top, crop_right, crop_bottom))
                dest_x = max(0, paste_x + crop_left)
                dest_y = max(0, paste_y + crop_top)
                stitched.paste(piece, (dest_x, dest_y))
                piece_bottom = dest_y + piece.height
                if piece_bottom <= stitched_bottom + 1:
                    # No progress: stop at the page bottom, else nudge forward.
                    if metrics["scrollY"] >= metrics["maxScrollY"]:
                        break
                    step_cursor += max(1, int(metrics["innerHeight"] * 0.6))
                    continue
                stitched_bottom = max(stitched_bottom, piece_bottom)
                if stitched_bottom >= output_height:
                    break
                step_cursor = max(0, stitched_bottom - overlap_px)
            if stitched_bottom <= 0:
                raise ScreenshotError(
                    f"Could not capture stitched slices for selector '{selector_text}'"
                )
            save_kwargs: Dict[str, Any] = {}
            if format_name == "jpeg":
                save_kwargs.update({"format": "JPEG", "quality": _jpeg_quality_from_level(quality_level)})
            else:
                save_kwargs.update({"format": "PNG"})
            stitched.save(destination, **save_kwargs)
            return
        except ScreenshotError:
            raise
        except Exception as exc:
            raise ScreenshotError(
                f"Could not stitch tall selector capture for '{selector_text}': {exc}"
            ) from exc
    # Element fits in the viewport: single clipped screenshot with 2 px padding.
    padding = 2.0
    x = max(0.0, x - padding)
    y = max(0.0, y - padding)
    width = max(1.0, width + (padding * 2.0))
    height = max(1.0, height + (padding * 2.0))
    clip_box: Dict[str, float] = {
        "x": float(int(x)),
        "y": float(int(y)),
        "width": float(int(width + 0.9999)),
        "height": float(int(height + 0.9999)),
    }
    screenshot_kwargs: Dict[str, Any] = {
        "path": str(destination),
        "timeout": timeout_ms,
        "clip": clip_box,
    }
    if format_name == "jpeg":
        screenshot_kwargs["type"] = "jpeg"
        screenshot_kwargs["quality"] = _jpeg_quality_from_level(quality_level)
    page.screenshot(**screenshot_kwargs)


def _capture_mhtml(page: Any, destination: Path) -> None:
    """Save the page as an MHTML snapshot via a Chromium CDP session.

    Raises:
        ScreenshotError: when the page context has no ``new_cdp_session``,
            no snapshot data is returned, or any other step fails.
    """
    session = None
    try:
        context = getattr(page, "context", None)
        if context is None or not hasattr(context, "new_cdp_session"):
            raise ScreenshotError("MHTML output requires Chromium CDP session support")
        session = context.new_cdp_session(page)
        session.send("Page.enable")
        snapshot = session.send("Page.captureSnapshot", {"format": "mhtml"})
        data = snapshot.get("data") if isinstance(snapshot, dict) else None
        if not data:
            raise ScreenshotError("Chromium did not return any MHTML snapshot data")
        # newline="" writes the snapshot's own line endings untranslated.
        destination.write_text(str(data), encoding="utf-8", newline="")
    except ScreenshotError:
        raise
    except Exception as exc:
        raise ScreenshotError(f"Could not capture MHTML snapshot: {exc}") from exc
    finally:
        # Always release the CDP session, ignoring detach failures.
        if session is not None:
            try:
                session.detach()
            except Exception:
                pass


def _convert_to_webp(
    src_png: Path,
    dst_webp: Path,
    *,
    quality: int = 90,
    method: int = 6,
    lossless: bool = False,
    max_dim: int = WEBP_MAX_DIM,
    downscale_if_oversize: bool = True,
) -> bool:
    """Convert a PNG screenshot to WebP via Pillow.

    Playwright does not currently support emitting WebP directly.

    Images with a side larger than *max_dim* are downscaled proportionally
    when *downscale_if_oversize* is true.

    Returns:
        True when the image was downscaled, otherwise False.

    Raises:
        ScreenshotError: when the source file is missing or Pillow cannot be
            imported. Errors from opening or saving the image propagate.
    """
    if not src_png or not Path(src_png).is_file():
        raise ScreenshotError(f"Source image not found: {src_png}")
    dst_webp = Path(dst_webp)
    try:
        dst_webp.parent.mkdir(parents=True, exist_ok=True)
    except Exception:
        pass
    try:
        from PIL import Image
    except Exception as exc:
        raise ScreenshotError(f"Pillow is required for webp conversion: {exc}") from exc
    # Write atomically to avoid partial files if conversion is interrupted.
    tmp_path = unique_path(dst_webp.with_suffix(".tmp.webp"))
    try:
        with Image.open(src_png) as im:
            did_downscale = False
            save_kwargs: Dict[str, Any] = {
                "format": "WEBP",
                "quality": int(quality),
                "method": int(method),
                "lossless": bool(lossless),
            }
            # Preserve alpha when present; Pillow handles it for WEBP.
            # Normalize palette images to RGBA to avoid odd palette artifacts.
            if im.mode == "P":
                im = im.convert("RGBA")
            # WebP enforces a hard max dimension per side (16383px).
            # When full-page captures are very tall, downscale proportionally to fit.
            try:
                w, h = im.size
            except Exception:
                w, h = 0, 0
            if (downscale_if_oversize and isinstance(max_dim, int) and max_dim > 0 and (w > max_dim or h > max_dim)):
                scale = 1.0
                try:
                    scale = min(float(max_dim) / float(w), float(max_dim) / float(h))
                except Exception:
                    scale = 1.0
                if scale > 0.0 and scale < 1.0:
                    new_w = max(1, int(w * scale))
                    new_h = max(1, int(h * scale))
                    try:
                        # Pillow >= 9.1 exposes Image.Resampling; older
                        # versions keep LANCZOS on the Image module itself.
                        resample = getattr(
                            getattr(Image, "Resampling", Image),
                            "LANCZOS",
                            None
                        )
                        if resample is None:
                            resample = getattr(Image, "LANCZOS", 1)
                        im = im.resize((new_w, new_h), resample=resample)
                        did_downscale = True
                    except Exception:
                        # Resize failure is ignored; save() is then attempted
                        # on the original-size image.
                        pass
            im.save(tmp_path, **save_kwargs)
        tmp_path.replace(dst_webp)
        return bool(did_downscale)
    finally:
        # After a successful replace() the temp file no longer exists;
        # otherwise this removes the partial file.
        try:
            tmp_path.unlink(missing_ok=True)
        except Exception:
            pass
if im.mode == "P": im = im.convert("RGBA") # WebP enforces a hard max dimension per side (16383px). # When full-page captures are very tall, downscale proportionally to fit. try: w, h = im.size except Exception: w, h = 0, 0 if (downscale_if_oversize and isinstance(max_dim, int) and max_dim > 0 and (w > max_dim or h > max_dim)): scale = 1.0 try: scale = min(float(max_dim) / float(w), float(max_dim) / float(h)) except Exception: scale = 1.0 if scale > 0.0 and scale < 1.0: new_w = max(1, int(w * scale)) new_h = max(1, int(h * scale)) try: resample = getattr( getattr(Image, "Resampling", Image), "LANCZOS", None ) if resample is None: resample = getattr(Image, "LANCZOS", 1) im = im.resize((new_w, new_h), resample=resample) did_downscale = True except Exception: pass im.save(tmp_path, **save_kwargs) tmp_path.replace(dst_webp) return bool(did_downscale) finally: try: tmp_path.unlink(missing_ok=True) except Exception: pass def _matched_site_selectors(url: str) -> List[str]: """Return SITE_SELECTORS for a matched domain; empty if no match. Unlike `_selectors_for_url()`, this does not return a generic fallback. """ u = str(url or "").lower() sels: List[str] = [] for domain, selectors in SITE_SELECTORS.items(): if domain in u: sels.extend(selectors) return sels def _selectors_for_url(url: str) -> List[str]: """Return selectors to try for a URL. For now, prefer a minimal behavior: only return known SITE_SELECTORS. (The cmdlet already falls back to full-page capture when no selectors match.) 
""" return _matched_site_selectors(url) def _platform_preprocess( url: str, page: Any, warnings: List[str], timeout_ms: int = 10_000 ) -> None: """Best-effort page tweaks for popular platforms before capture.""" try: u = str(url or "").lower() def _try_click_buttons( names: List[str], passes: int = 2, per_timeout: int = 700 ) -> int: clicks = 0 for _ in range(max(1, int(passes))): for name in names: try: locator = page.get_by_role("button", name=name) locator.first.click(timeout=int(per_timeout)) clicks += 1 except Exception: pass return clicks # Dismiss common cookie / consent prompts. _try_click_buttons( [ "Accept all", "Accept", "I agree", "Agree", "Allow all", "OK", ] ) # Some sites need small nudges (best-effort). if "reddit.com" in u: _try_click_buttons(["Accept all", "Accept"]) if ("twitter.com" in u) or ("x.com" in u): _try_click_buttons(["Accept all", "Accept"]) if "instagram.com" in u: _try_click_buttons(["Allow all", "Accept all", "Accept"]) except Exception: return def _submit_wayback(url: str, timeout: float) -> Optional[str]: encoded = quote(url, safe="/:?=&") with HTTPClient(headers={ "User-Agent": USER_AGENT }) as client: response = client.get(f"https://web.archive.org/save/{encoded}") content_location = response.headers.get("Content-Location") if content_location: return urljoin("https://web.archive.org", content_location) return str(response.url) def _submit_archive_today(url: str, timeout: float) -> Optional[str]: """Submit URL to Archive.today.""" encoded = quote(url, safe=":/?#[]@!$&'()*+,;=") with HTTPClient(headers={ "User-Agent": USER_AGENT }) as client: response = client.get(f"https://archive.today/submit/?url={encoded}") response.raise_for_status() final = str(response.url) if final and ("archive.today" in final or "archive.ph" in final): return final return None def _submit_archive_ph(url: str, timeout: float) -> Optional[str]: """Submit URL to Archive.ph.""" encoded = quote(url, safe=":/?#[]@!$&'()*+,;=") with HTTPClient(headers={ 
"User-Agent": USER_AGENT }) as client: response = client.get(f"https://archive.ph/submit/?url={encoded}") response.raise_for_status() final = str(response.url) if final and "archive.ph" in final: return final return None def _archive_url(url: str, timeout: float) -> Tuple[List[str], List[str]]: """Submit URL to all available archive services.""" archives: List[str] = [] warnings: List[str] = [] archive_status: List[tuple[str, Any]] = [] for submitter, label in ( (_submit_wayback, "wayback"), (_submit_archive_today, "archive.today"), (_submit_archive_ph, "archive.ph"), ): try: archived = submitter(url, timeout) except httpx.HTTPStatusError as exc: if exc.response.status_code == 429: warnings.append(f"archive {label} rate limited (HTTP 429)") archive_status.append((label, "rate limited (HTTP 429)")) else: warnings.append( f"archive {label} failed: HTTP {exc.response.status_code}" ) archive_status.append((label, f"HTTP {exc.response.status_code}")) except httpx.RequestError as exc: warnings.append(f"archive {label} failed: {exc}") archive_status.append((label, f"connection error: {exc}")) except Exception as exc: warnings.append(f"archive {label} failed: {exc}") archive_status.append((label, exc)) else: if archived: archives.append(archived) archive_status.append((label, archived)) else: archive_status.append((label, "no archive link returned")) if is_debug_enabled() and archive_status: _show_debug_panel( "Screenshot Archive", [("url", url), *archive_status], ) return archives, warnings def _prepare_output_path(options: ScreenshotOptions) -> Path: """Prepare and validate output path for screenshot.""" ensure_directory(options.output_dir) explicit_format = _normalize_format( options.output_format ) if options.output_format else None inferred_format: Optional[str] = None if options.output_path is not None: path = options.output_path if not path.is_absolute(): path = options.output_dir / path suffix = path.suffix.lower() if suffix: inferred_format = 
_normalize_format(suffix[1:]) else: stamp = time.strftime("%Y%m%d_%H%M%S") filename = f"{_slugify_url(options.url)}_{stamp}" path = options.output_dir / filename final_format = explicit_format or inferred_format or "png" if not path.suffix: path = path.with_suffix(_format_suffix(final_format)) else: current_suffix = path.suffix.lower() expected = _format_suffix(final_format) if current_suffix != expected: path = path.with_suffix(expected) options.output_format = final_format return unique_path(path) def _capture( options: ScreenshotOptions, destination: Path, warnings: List[str], progress: PipelineProgress ) -> tuple[str, str]: """Capture screenshot using Playwright.""" capture_mode = "full-page" capture_target = "" try: progress.step("loading launching browser") tool = options.playwright_tool or PlaywrightTool({}) # Ensure Chromium engine is used for the screen-shot cmdlet (force for consistency) try: current_browser = ( getattr(tool.defaults, "browser", "").lower() if getattr(tool, "defaults", None) is not None else "" ) if current_browser != "chromium": base_cfg = {} try: base_cfg = dict(getattr(tool, "_config", {}) or {}) except Exception: base_cfg = {} tool_block = dict(base_cfg.get("tool") or {} ) if isinstance(base_cfg, dict) else {} pw_block = ( dict(tool_block.get("playwright") or {}) if isinstance(tool_block, dict) else {} ) pw_block["browser"] = "chromium" tool_block["playwright"] = pw_block if isinstance(base_cfg, dict): base_cfg["tool"] = tool_block tool = PlaywrightTool(base_cfg) except Exception: tool = PlaywrightTool({ "tool": { "playwright": { "browser": "chromium" } } }) format_name = _normalize_format(options.output_format) capture_headless = bool(options.headless) picker_headless = capture_headless if options.interactive_pick and _format_supports_target_selection(format_name): picker_headless = False capture_headless = True elif format_name == "pdf": picker_headless = True capture_headless = True if is_debug_enabled(): defaults = getattr(tool, 
"defaults", None) _show_debug_panel( "Screenshot Config", [ ("url", options.url), ("format", _normalize_format(options.output_format)), ("quality", options.quality), ("browser", getattr(defaults, "browser", "unknown") if defaults else "unknown"), ("headless", getattr(defaults, "headless", "unknown") if defaults else "unknown"), ( "viewport", ( f"{getattr(defaults, 'viewport_width', '?')}x{getattr(defaults, 'viewport_height', '?')}" if defaults else "" ), ), ("timeout", f"{getattr(defaults, 'navigation_timeout_ms', '?')}ms" if defaults else ""), ("full_page", options.full_page), ("interactive_pick", options.interactive_pick), ("picker_headless", picker_headless), ("capture_headless", capture_headless), ("target_selectors", list(options.target_selectors or [])), ("destination", destination), ], border_style="magenta", ) navigation_status = "loaded" if format_name == "pdf" and not options.headless: warnings.append( "pdf output requires headless Chromium; overriding headless mode" ) if not _format_supports_target_selection(format_name): if options.interactive_pick: warnings.append( f"{format_name} output captures the full page; interactive element picking is ignored" ) if options.prefer_platform_target: warnings.append( f"{format_name} output captures the full page; selector targeting is ignored" ) try: element_captured = False if options.interactive_pick and _format_supports_target_selection(format_name): selected_selector = "" with tool.open_page( headless=picker_headless, emulate_viewport=picker_headless, start_maximized=not picker_headless, ) as page: navigation_status = _prepare_capture_page( tool, page, options, warnings, progress, ) progress.step("capturing locating target") picked = _interactive_pick_selector( page, timeout_s=options.interactive_pick_timeout_s, ) selected_selector = str(picked.get("selector") or "").strip() if not selected_selector: raise ScreenshotError("Element picker did not return a valid selector") capture_mode = "interactive" 
capture_target = selected_selector progress.step("loading launching browser") with tool.open_page(headless=capture_headless) as page: navigation_status = _prepare_capture_page( tool, page, options, warnings, progress, ) progress.step("capturing output") _capture_selector_screenshot( page, selected_selector, destination, format_name, options.selector_timeout_ms, options.quality, ) element_captured = True else: with tool.open_page(headless=capture_headless) as page: navigation_status = _prepare_capture_page( tool, page, options, warnings, progress, ) # Attempt platform-specific target capture if requested (and not PDF) if options.prefer_platform_target and _format_supports_target_selection(format_name): progress.step("capturing locating target") try: _platform_preprocess(options.url, page, warnings) except Exception: pass selectors = list(options.target_selectors or []) if not selectors: selectors = _selectors_for_url(options.url) for sel in selectors: try: _capture_selector_screenshot( page, sel, destination, format_name, options.selector_timeout_ms, options.quality, ) element_captured = True capture_mode = "selector" capture_target = sel break except PlaywrightTimeoutError: continue except Exception as exc: warnings.append( f"element capture failed for '{sel}': {exc}" ) # Fallback to default capture paths if not element_captured: if format_name == "pdf": capture_mode = "pdf" page.emulate_media(media="print") progress.step("capturing output") page.pdf(path=str(destination), print_background=True) elif format_name == "mhtml": capture_mode = "mhtml" progress.step("capturing output") _capture_mhtml(page, destination) else: screenshot_kwargs: Dict[str, Any] = { "path": str(destination) } if format_name == "jpeg": screenshot_kwargs["type"] = "jpeg" screenshot_kwargs["quality"] = _jpeg_quality_from_level(options.quality) if options.full_page: progress.step("capturing output") page.screenshot(full_page=True, **screenshot_kwargs) capture_mode = "full-page" else: article = 
page.query_selector("article") if article is not None: article_kwargs = dict(screenshot_kwargs) article_kwargs.pop("full_page", None) progress.step("capturing output") article.screenshot(**article_kwargs) capture_mode = "article" capture_target = "article" else: progress.step("capturing output") page.screenshot(**screenshot_kwargs) capture_mode = "page" if element_captured or capture_mode: progress.step("capturing saved") if is_debug_enabled(): _show_debug_panel( "Screenshot Capture", [ ("url", options.url), ("navigation", navigation_status), ("mode", capture_mode), ("target", capture_target), ("wait_after_load_s", options.wait_after_load), ("warnings", len(warnings)), ("saved_to", destination), ], ) except Exception as exc: if is_debug_enabled(): _show_debug_panel( "Screenshot Error", [ ("url", options.url), ("destination", destination), ("error", exc), ], border_style="red", ) msg = str(exc).lower() if any(k in msg for k in ["executable", "not found", "no such file", "cannot find", "install"]): raise ScreenshotError( "Chromium Playwright browser binaries not found. 
Install them: python ./scripts/bootstrap.py --playwright-only --browsers chromium" ) from exc raise except ScreenshotError: # Re-raise ScreenshotError raised intentionally (do not wrap) raise except Exception as exc: raise ScreenshotError(f"Failed to capture screenshot: {exc}") from exc return capture_mode, capture_target def _capture_screenshot( options: ScreenshotOptions, progress: PipelineProgress ) -> ScreenshotResult: """Capture a screenshot for the given options.""" requested_format = _normalize_format(options.output_format) destination = _prepare_output_path(options) warnings: List[str] = [] capture_mode = "" capture_target = "" will_target = bool(options.prefer_platform_target or options.interactive_pick) and _format_supports_target_selection(requested_format) will_convert = requested_format == "webp" will_archive = bool(options.archive and options.url) interactive_extra_steps = 5 if (options.interactive_pick and _format_supports_target_selection(requested_format)) else 0 total_steps = ( 9 + (1 if will_target else 0) + interactive_extra_steps + (1 if will_convert else 0) + (1 if will_archive else 0) ) progress.begin_steps(total_steps) progress.step("loading starting") # Playwright screenshots do not natively support WebP output. # Capture as PNG, then convert via Pillow. 
capture_path = destination if requested_format == "webp": capture_path = unique_path(destination.with_suffix(".png")) options.output_format = "png" capture_mode, capture_target = _capture(options, capture_path, warnings, progress) if requested_format == "webp": progress.step("capturing converting to webp") try: webp_settings = _webp_quality_settings(options.quality) did_downscale = _convert_to_webp( capture_path, destination, quality=int(webp_settings["quality"]), method=int(webp_settings["method"]), lossless=bool(webp_settings["lossless"]), ) if did_downscale: try: destination.unlink(missing_ok=True) except Exception: pass destination = capture_path warnings.append( f"webp conversion required downscaling to fit {WEBP_MAX_DIM}px limit; using original png instead: {capture_path.name}" ) else: try: capture_path.unlink(missing_ok=True) except Exception: pass except Exception as exc: warnings.append(f"webp conversion failed; keeping png: {exc}") destination = capture_path # Build URL list from captured url and any archives url: List[str] = [options.url] if options.url else [] archive_url: List[str] = [] if options.archive and options.url: progress.step("capturing archiving") archives, archive_warnings = _archive_url(options.url, options.archive_timeout) archive_url.extend(archives) warnings.extend(archive_warnings) if archives: url = unique_preserve_order([*url, *archives]) progress.step("capturing finalized") applied_tag = unique_preserve_order(list(tag for tag in options.tag if tag.strip())) if is_debug_enabled(): _show_debug_panel( "Screenshot Output", [ ("url", options.url), ("requested_format", requested_format), ("path", destination), ("capture_mode", capture_mode), ("capture_target", capture_target), ("archives", archive_url), ("warnings", warnings), ], ) return ScreenshotResult( path=destination, tag_applied=applied_tag, archive_url=archive_url, url=url, capture_mode=capture_mode, capture_target=capture_target, warnings=warnings, ) # 
# ============================================================================
# Main Cmdlet Function
# ============================================================================


def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Take screenshots of URL inputs from args or pipeline items.

    Args:
        result: Piped input; consulted only when no URL argument is given.
            Each item contributes its ``path``, ``url`` or ``target`` field
            (first truthy one wins).
        args: Raw cmdlet arguments, parsed against ``CMDLET``.
        config: Application config; ``tool.playwright`` supplies fallback
            ``format`` / ``screenshot_quality`` values and ``outfile`` is one
            of the output-directory fallbacks.

    Returns:
        ``0`` when help was shown or every processed URL succeeded; ``1``
        when there was nothing to process, the storage location could not be
        resolved, no screenshot was emitted, or at least one URL failed.

    Side effects:
        Emits one pipe object per captured screenshot through
        ``pipeline_context.emit`` and reports progress via ``PipelineProgress``.
    """
    if should_show_help(args):
        log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}")
        return 0

    progress = PipelineProgress(pipeline_context)
    parsed = parse_cmdlet_args(args, CMDLET)
    format_value = parsed.get("format")
    capture_mode_value = _normalize_capture_mode(parsed.get("capture_mode"))
    raw_quality_value = parsed.get("quality")
    adblock_value = parsed.get("adblock")
    quality_value: Optional[int] = None

    # Format precedence: explicit argument, then config tool.playwright.format,
    # then "webp".
    if not format_value:
        try:
            tool_cfg = config.get("tool", {}) if isinstance(config, dict) else {}
            pw_cfg = tool_cfg.get("playwright") if isinstance(tool_cfg, dict) else None
            if isinstance(pw_cfg, dict):
                format_value = pw_cfg.get("format")
        except Exception:
            pass
    if not format_value:
        format_value = "webp"

    # Quality precedence: explicit argument, then config
    # tool.playwright.screenshot_quality, then whatever
    # _normalize_quality(None) yields.
    if raw_quality_value not in (None, ""):
        quality_value = _normalize_quality(raw_quality_value)
    else:
        try:
            tool_cfg = config.get("tool", {}) if isinstance(config, dict) else {}
            pw_cfg = tool_cfg.get("playwright") if isinstance(tool_cfg, dict) else None
            if isinstance(pw_cfg, dict) and pw_cfg.get("screenshot_quality") not in (None, ""):
                quality_value = _normalize_quality(pw_cfg.get("screenshot_quality"))
        except Exception:
            quality_value = None
    if quality_value is None:
        quality_value = _normalize_quality(None)

    adblock_enabled = _normalize_bool(adblock_value, default=True)
    # NOTE(review): "storage" and "archive" are read here but are not among
    # the arguments declared explicitly in CMDLET below - presumably provided
    # through the shared/query arguments; verify.
    storage_value = parsed.get("storage")
    selector_arg = parsed.get("selector")
    selectors = [selector_arg] if selector_arg else []
    archive_enabled = parsed.get("archive", False)

    # An explicit URL argument takes priority over piped items.
    url_arg = parsed.get("url")
    positional_url = [str(url_arg)] if url_arg else []
    url_to_process: List[Tuple[str, Any]] = []
    if positional_url:
        url_to_process = [(u, None) for u in positional_url]
    else:
        piped_results = normalize_result_input(result)
        if piped_results:
            for item in piped_results:
                url = get_field(item, "path") or get_field(item, "url") or get_field(item, "target")
                if url:
                    url_to_process.append((str(url), item))

    if not url_to_process:
        log("No url to process for screen-shot cmdlet", file=sys.stderr)
        return 1

    # Output directory precedence: storage argument, config resolver,
    # config["outfile"], system temp directory.
    screenshot_dir: Optional[Path] = None
    screenshot_dir_source = "default temp"
    if storage_value:
        try:
            screenshot_dir = SharedArgs.resolve_storage(storage_value)
            screenshot_dir_source = f"--storage {storage_value}"
        except ValueError as exc:
            log(str(exc), file=sys.stderr)
            return 1
    if screenshot_dir is None and resolve_output_dir is not None:
        try:
            screenshot_dir = resolve_output_dir(config)
            screenshot_dir_source = "config resolver"
        except Exception:
            pass
    if screenshot_dir is None and config and config.get("outfile"):
        try:
            screenshot_dir = Path(config["outfile"]).expanduser()
            screenshot_dir_source = "config outfile"
        except Exception:
            pass
    if screenshot_dir is None:
        screenshot_dir = Path(tempfile.gettempdir())
    ensure_directory(screenshot_dir)

    format_name = _normalize_format(format_value)
    filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()]
    manual_target_selectors = filtered_selectors if filtered_selectors else None
    # The element picker is only offered by default for a single URL when
    # _stdin_interactive() reports an interactive session.
    interactive_default = bool(len(url_to_process) == 1 and _stdin_interactive())

    if is_debug_enabled():
        _show_debug_panel(
            "screen-shot",
            [
                ("args", list(args)),
                ("url_count", len(url_to_process)),
                ("urls", [u for u, _ in url_to_process]),
                ("archive", archive_enabled),
                ("format", format_name),
                ("quality", quality_value),
                ("adblock", adblock_enabled),
                ("capture_mode", capture_mode_value or ("interactive" if interactive_default and _format_supports_target_selection(format_name) else "auto")),
                ("output_dir", screenshot_dir),
                ("output_dir_source", screenshot_dir_source),
            ],
        )

    # Progress UI is optional; failing to set it up must not abort the run.
    try:
        progress.ensure_local_ui(
            label="screen-shot",
            total_items=len(url_to_process),
            items_preview=[u for u, _ in url_to_process],
        )
    except Exception:
        pass

    # Build one PlaywrightTool shared by all URLs, forcing Chromium, the
    # "native" user agent setting and the default viewport on top of a copy of
    # the caller's config. On failure the per-capture code builds its own tool.
    shared_playwright_tool: Optional[PlaywrightTool] = None
    try:
        if isinstance(config, dict):
            tool_block = dict(config.get("tool") or {})
            pw_block = dict(tool_block.get("playwright") or {})
            pw_block["browser"] = "chromium"
            pw_block["user_agent"] = "native"
            pw_block["viewport_width"] = int(DEFAULT_VIEWPORT.get("width", 1920))
            pw_block["viewport_height"] = int(DEFAULT_VIEWPORT.get("height", 1080))
            tool_block["playwright"] = pw_block
            pw_local_cfg = dict(config)
            pw_local_cfg["tool"] = tool_block
        else:
            pw_local_cfg = {
                "tool": {
                    "playwright": {
                        "browser": "chromium",
                        "user_agent": "native",
                        "viewport_width": int(DEFAULT_VIEWPORT.get("width", 1920)),
                        "viewport_height": int(DEFAULT_VIEWPORT.get("height", 1080)),
                    }
                }
            }
        shared_playwright_tool = PlaywrightTool(pw_local_cfg)
    except Exception:
        shared_playwright_tool = None

    all_emitted = []
    exit_code = 0

    def _extract_item_tags(item: Any) -> List[str]:
        # Thin wrapper around the shared accessor.
        return extract_item_tags(item)

    def _extract_item_title(item: Any) -> str:
        # Looks up "title", "name", then "filename"; empty string when absent.
        return get_result_title(item, "title", "name", "filename") or ""

    def _clean_title(text: str) -> str:
        # Strip a leading "screenshot:" prefix (case-insensitive) so titles do
        # not accumulate it across repeated runs.
        value = (text or "").strip()
        if value.lower().startswith("screenshot:"):
            value = value.split(":", 1)[1].strip()
        return value

    for url, origin_item in url_to_process:
        # Skipped inputs are logged but do not change the exit code.
        if not url.lower().startswith(("http://", "https://", "file://")):
            log(f"[screen_shot] Skipping non-URL input: {url}", file=sys.stderr)
            continue

        try:
            # Start from a plain full-page capture; the chain below switches
            # on targeting or the interactive picker.
            options = ScreenshotOptions(
                url=url,
                output_dir=screenshot_dir,
                output_format=format_name,
                archive=archive_enabled,
                target_selectors=None,
                prefer_platform_target=False,
                wait_for_article=False,
                full_page=True,
                interactive_pick=False,
                quality=quality_value,
                adblock=adblock_enabled,
                playwright_tool=shared_playwright_tool,
            )

            # Capture-mode precedence: explicit selector argument, mode "full",
            # mode "interactive", interactive default (single URL, format
            # supports targeting), then known site selectors.
            auto_selectors = _matched_site_selectors(url)
            if manual_target_selectors:
                options.prefer_platform_target = True
                options.target_selectors = manual_target_selectors
            elif capture_mode_value == "full":
                options.prefer_platform_target = False
                options.target_selectors = None
            elif capture_mode_value == "interactive":
                options.interactive_pick = True
            elif interactive_default and _format_supports_target_selection(format_name):
                options.interactive_pick = True
            elif auto_selectors:
                options.prefer_platform_target = True
                options.target_selectors = auto_selectors

            screenshot_result = _capture_screenshot(options, progress)

            # Hashing is best-effort; the result is emitted without a hash
            # when it fails.
            screenshot_hash = None
            try:
                screenshot_hash = sha256_file(screenshot_result.path)
            except Exception:
                pass

            # Date tag comes from the file's mtime, falling back to today.
            try:
                capture_date = datetime.fromtimestamp(screenshot_result.path.stat().st_mtime).date().isoformat()
            except Exception:
                capture_date = datetime.now().date().isoformat()

            upstream_title = _clean_title(_extract_item_title(origin_item))
            url_title = _title_from_url(url)
            display_title = upstream_title or url_title or url

            # Drop upstream type:/date: tags so the screenshot's own values
            # are the only ones in those namespaces.
            upstream_tags = _extract_item_tags(origin_item)
            filtered_upstream_tags = [
                tag for tag in upstream_tags
                if not str(tag).strip().lower().startswith(("type:", "date:"))
            ]
            url_tags = _tags_from_url(url)
            merged_tags = unique_preserve_order(
                ["type:screenshot", f"date:{capture_date}"] + filtered_upstream_tags + url_tags
            )

            pipe_obj = create_pipe_object_result(
                source="screenshot",
                store="PATH",
                identifier=Path(screenshot_result.path).stem,
                file_path=str(screenshot_result.path),
                cmdlet_name="screen-shot",
                title=display_title,
                hash_value=screenshot_hash,
                is_temp=True,
                # Parent hash is the SHA-256 of the source URL string.
                parent_hash=hashlib.sha256(url.encode()).hexdigest(),
                tag=merged_tags,
                url=url,
                source_url=url,
                extra={
                    "source_url": url,
                    "archive_url": screenshot_result.archive_url,
                    "url": screenshot_result.url,
                    "target": str(screenshot_result.path),
                },
            )

            pipeline_context.emit(pipe_obj)
            all_emitted.append(pipe_obj)

            if is_debug_enabled():
                _show_debug_panel(
                    "screen-shot output",
                    [
                        ("path", screenshot_result.path),
                        ("hash", screenshot_hash),
                        ("title", display_title),
                        ("capture_mode", screenshot_result.capture_mode),
                        ("capture_target", screenshot_result.capture_target),
                        ("tags", merged_tags),
                        ("archives", screenshot_result.archive_url),
                        ("warnings", screenshot_result.warnings),
                    ],
                )

            progress.on_emit(pipe_obj)

        except ScreenshotError as exc:
            # Expected capture failure: report and continue with the next URL.
            log(f"Error taking screenshot of {url}: {exc}", file=sys.stderr)
            exit_code = 1
        except Exception as exc:
            # Unexpected failure: include the traceback for diagnosis.
            log(f"Unexpected error taking screenshot of {url}: {exc}", file=sys.stderr)
            import traceback
            traceback.print_exc(file=sys.stderr)
            exit_code = 1

    progress.close_local_ui(force_complete=True)

    if not all_emitted:
        log("No screenshots were successfully captured", file=sys.stderr)
        return 1

    # exit_code stays 1 when some URLs failed even though others succeeded.
    log(f"✓ Successfully captured {len(all_emitted)} screenshot(s)")
    return exit_code


# Cmdlet declaration: name, aliases, accepted arguments and help text.
# The format / mode / quality / adblock options are declared query-only
# (query_only=True), i.e. they are passed through -query.
CMDLET = Cmdlet(
    name="screen-shot",
    summary="Capture a website screenshot",
    usage="screen-shot [options] [-query \"format:webp quality:10 mode:full\"]",
    alias=["screenshot", "ss"],
    arg=[
        SharedArgs.URL,
        sh.QueryArg(
            "format",
            key="format",
            type="string",
            choices=["webp", "png", "jpeg", "jpg", "pdf", "mhtml", "mht"],
            query_only=True,
            description="Output format via -query, e.g. format:webp, format:pdf, or format:mhtml"
        ),
        sh.QueryArg(
            "capture_mode",
            key="mode",
            aliases=["capture", "mode"],
            choices=["full", "interactive"],
            query_only=True,
            description="Capture mode via -query, e.g. mode:full or mode:interactive"
        ),
        sh.QueryArg(
            "quality",
            key="quality",
            choices=["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"],
            query_only=True,
            description="Screenshot quality via -query, 1-10. 10 uses highest quality and lossless webp."
        ),
        sh.QueryArg(
            "adblock",
            key="adblock",
            aliases=["ads", "blockads"],
            choices=["true", "false", "on", "off", "yes", "no", "1", "0"],
            # Normalizes the textual choice to a bool, defaulting to True.
            handler=lambda value: _normalize_bool(value, default=True),
            query_only=True,
            description="Ad and tracker blocking via -query. Defaults to true; use adblock:false to disable."
        ),
        CmdletArg(
            name="selector",
            type="string",
            description="CSS selector for element capture"
        ),
        SharedArgs.PATH,
        SharedArgs.QUERY,
    ],
    detail=[
        "Uses Playwright Chromium engine only. Install Chromium with: python ./scripts/bootstrap.py --playwright-only --browsers chromium",
        "PDF output requires headless Chromium (the cmdlet will enforce headless mode for PDF).",
        "MHTML output uses Chromium page snapshots to save the full page as a single archival file.",
        "Basic ad and tracker blocking is enabled by default during capture so MHTML archives are less likely to embed ad content.",
        "Screenshots are temporary artifacts stored in the configured `temp` directory.",
        "Interactive single-URL runs open a headful browser picker by default so you can hover and click the element to capture.",
        "Use -query \"mode:full\" to bypass the picker and capture the full page directly.",
        "Use -query \"format:webp\", \"format:pdf\", or \"format:mhtml\" to choose the output format.",
        "Use -query \"adblock:false\" if a site breaks and you need the raw unfiltered page.",
        "Use -query \"quality:1\" through \"quality:10\" to control jpeg/webp compression. quality:10 uses lossless webp.",
    ],
)

# Bind the entry point and register the cmdlet at import time.
CMDLET.exec = _run
CMDLET.register()