"""Screen-shot cmdlet for capturing screenshots of url in a pipeline. This cmdlet processes files through the pipeline and creates screenshots using Playwright, marking them as temporary artifacts for cleanup. """ from __future__ import annotations import hashlib import sys import time from datetime import datetime import httpx from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple from urllib.parse import urlsplit, quote, urljoin, unquote from SYS.logger import log, debug from API.HTTP import HTTPClient from SYS.pipeline_progress import PipelineProgress from SYS.utils import ensure_directory, unique_path, unique_preserve_order from . import _shared as sh Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg SharedArgs = sh.SharedArgs create_pipe_object_result = sh.create_pipe_object_result normalize_result_input = sh.normalize_result_input should_show_help = sh.should_show_help get_field = sh.get_field parse_cmdlet_args = sh.parse_cmdlet_args import pipeline as pipeline_context # ============================================================================ # CMDLET Metadata Declaration # ============================================================================ # ============================================================================ # Playwright & Screenshot Dependencies # ============================================================================ from tool.playwright import HAS_PLAYWRIGHT, PlaywrightTimeoutError, PlaywrightTool try: from config import resolve_output_dir except ImportError: try: _parent_dir = str(Path(__file__).parent.parent) if _parent_dir not in sys.path: sys.path.insert(0, _parent_dir) from config import resolve_output_dir except ImportError: resolve_output_dir = None # ============================================================================ # Screenshot Constants & Configuration # ============================================================================ USER_AGENT = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ) DEFAULT_VIEWPORT: dict[str, int] = {"width": 1920, "height": 1080} ARCHIVE_TIMEOUT = 30.0 # WebP has a hard maximum dimension per side. # Pillow typically fails with: "encoding error 5: Image size exceeds WebP limit of 16383 pixels" WEBP_MAX_DIM = 16_383 # Configurable selectors for specific websites SITE_SELECTORS: Dict[str, List[str]] = { "twitter.com": [ "article[role='article']", "div[data-testid='tweet']", "div[data-testid='cellInnerDiv'] article", ], "x.com": [ "article[role='article']", "div[data-testid='tweet']", "div[data-testid='cellInnerDiv'] article", ], "instagram.com": [ "article[role='presentation']", "article[role='article']", "div[role='dialog'] article", "section main article", ], "reddit.com": [ "shreddit-post", "div[data-testid='post-container']", "div[data-click-id='background']", "article", ], "rumble.com": [ "rumble-player, iframe.rumble", "div.video-item--main", "main article", ], } class ScreenshotError(RuntimeError): """Raised when screenshot capture or upload fails.""" @dataclass(slots=True) class ScreenshotOptions: """Options controlling screenshot capture and post-processing.""" output_dir: Path url: str = "" output_path: Optional[Path] = None full_page: bool = True headless: bool = True wait_after_load: float = 6.0 wait_for_article: bool = False replace_video_posters: bool = True tag: Sequence[str] = () archive: bool = False archive_timeout: float = ARCHIVE_TIMEOUT output_format: Optional[str] = None prefer_platform_target: bool = False target_selectors: Optional[Sequence[str]] = None selector_timeout_ms: int = 10_000 playwright_tool: Optional[PlaywrightTool] = None @dataclass(slots=True) class ScreenshotResult: """Details about the captured screenshot.""" path: Path tag_applied: List[str] archive_url: List[str] url: List[str] warnings: List[str] = field(default_factory=list) # ============================================================================ # Helper Functions # ============================================================================ def _slugify_url(url: str) -> str: """Convert URL to filesystem-safe slug.""" parsed = urlsplit(url) candidate = f"{parsed.netloc}{parsed.path}" if parsed.query: candidate += f"?{parsed.query}" slug = "".join(char if char.isalnum() else "-" for char in candidate.lower()) slug = slug.strip("-") or "screenshot" return slug[:100] def _tags_from_url(url: str) -> List[str]: """Derive simple tags from a URL. - site: (strips leading www.) - title: derived from the last path segment, with extension removed and separators (-, _, %) normalized to spaces. """ u = str(url or "").strip() if not u: return [] parsed = None try: parsed = urlsplit(u) host = str(getattr(parsed, "hostname", None) or getattr(parsed, "netloc", "") or "").strip().lower() except Exception: parsed = None host = "" if host: # Drop credentials and port if present. if "@" in host: host = host.rsplit("@", 1)[-1] if ":" in host: host = host.split(":", 1)[0] if host.startswith("www."): host = host[len("www.") :] path = "" if parsed is not None: try: path = str(getattr(parsed, "path", "") or "") except Exception: path = "" last = "" if path: try: last = path.rsplit("/", 1)[-1] except Exception: last = "" try: last = unquote(last or "") except Exception: last = last or "" if last and "." in last: # Drop a single trailing extension (e.g. .html, .php). last = last.rsplit(".", 1)[0] for sep in ("_", "-", "%"): if last and sep in last: last = last.replace(sep, " ") title = " ".join(str(last or "").split()).strip().lower() tags: List[str] = [] if host: tags.append(f"site:{host}") if title: tags.append(f"title:{title}") return tags def _title_from_url(url: str) -> str: """Return the normalized title derived from a URL's last path segment.""" for t in _tags_from_url(url): if str(t).lower().startswith("title:"): return str(t)[len("title:") :].strip() return "" def _normalise_format(fmt: Optional[str]) -> str: """Normalize output format to valid values.""" if not fmt: return "webp" value = fmt.strip().lower() if value in {"jpg", "jpeg"}: return "jpeg" if value in {"png", "pdf", "webp"}: return value return "webp" def _format_suffix(fmt: str) -> str: """Get file suffix for format.""" if fmt == "jpeg": return ".jpg" return f".{fmt}" def _convert_to_webp( src_png: Path, dst_webp: Path, *, quality: int = 90, method: int = 6, max_dim: int = WEBP_MAX_DIM, downscale_if_oversize: bool = True, ) -> bool: """Convert a PNG screenshot to WebP via Pillow. Playwright does not currently support emitting WebP directly. """ if not src_png or not Path(src_png).is_file(): raise ScreenshotError(f"Source image not found: {src_png}") dst_webp = Path(dst_webp) try: dst_webp.parent.mkdir(parents=True, exist_ok=True) except Exception: pass try: from PIL import Image except Exception as exc: raise ScreenshotError(f"Pillow is required for webp conversion: {exc}") from exc # Write atomically to avoid partial files if conversion is interrupted. tmp_path = unique_path(dst_webp.with_suffix(".tmp.webp")) try: with Image.open(src_png) as im: did_downscale = False save_kwargs: Dict[str, Any] = { "format": "WEBP", "quality": int(quality), "method": int(method), } # Preserve alpha when present; Pillow handles it for WEBP. # Normalize palette images to RGBA to avoid odd palette artifacts. if im.mode == "P": im = im.convert("RGBA") # WebP enforces a hard max dimension per side (16383px). # When full-page captures are very tall, downscale proportionally to fit. try: w, h = im.size except Exception: w, h = 0, 0 if downscale_if_oversize and isinstance(max_dim, int) and max_dim > 0 and (w > max_dim or h > max_dim): scale = 1.0 try: scale = min(float(max_dim) / float(w), float(max_dim) / float(h)) except Exception: scale = 1.0 if scale > 0.0 and scale < 1.0: new_w = max(1, int(w * scale)) new_h = max(1, int(h * scale)) debug( f"[_convert_to_webp] Image exceeds WebP limit ({w}x{h}); downscaling -> {new_w}x{new_h}" ) try: resample = getattr(getattr(Image, "Resampling", Image), "LANCZOS", None) if resample is None: resample = getattr(Image, "LANCZOS", 1) im = im.resize((new_w, new_h), resample=resample) did_downscale = True except Exception as exc: debug(f"[_convert_to_webp] Downscale failed; attempting direct WEBP save anyway: {exc}") im.save(tmp_path, **save_kwargs) tmp_path.replace(dst_webp) return bool(did_downscale) finally: try: tmp_path.unlink(missing_ok=True) except Exception: pass def _matched_site_selectors(url: str) -> List[str]: """Return SITE_SELECTORS for a matched domain; empty if no match. Unlike `_selectors_for_url()`, this does not return a generic fallback. """ u = str(url or "").lower() sels: List[str] = [] for domain, selectors in SITE_SELECTORS.items(): if domain in u: sels.extend(selectors) return sels def _selectors_for_url(url: str) -> List[str]: """Return selectors to try for a URL. For now, prefer a minimal behavior: only return known SITE_SELECTORS. (The cmdlet already falls back to full-page capture when no selectors match.) """ return _matched_site_selectors(url) def _platform_preprocess(url: str, page: Any, warnings: List[str], timeout_ms: int = 10_000) -> None: """Best-effort page tweaks for popular platforms before capture.""" try: u = str(url or "").lower() def _try_click_buttons(names: List[str], passes: int = 2, per_timeout: int = 700) -> int: clicks = 0 for _ in range(max(1, int(passes))): for name in names: try: locator = page.get_by_role("button", name=name) locator.first.click(timeout=int(per_timeout)) clicks += 1 except Exception: pass return clicks # Dismiss common cookie / consent prompts. _try_click_buttons([ "Accept all", "Accept", "I agree", "Agree", "Allow all", "OK", ]) # Some sites need small nudges (best-effort). if "reddit.com" in u: _try_click_buttons(["Accept all", "Accept"]) if ("twitter.com" in u) or ("x.com" in u): _try_click_buttons(["Accept all", "Accept"]) if "instagram.com" in u: _try_click_buttons(["Allow all", "Accept all", "Accept"]) except Exception as exc: debug(f"[_platform_preprocess] skipped: {exc}") return def _submit_wayback(url: str, timeout: float) -> Optional[str]: encoded = quote(url, safe="/:?=&") with HTTPClient(headers={"User-Agent": USER_AGENT}) as client: response = client.get(f"https://web.archive.org/save/{encoded}") content_location = response.headers.get("Content-Location") if content_location: return urljoin("https://web.archive.org", content_location) return str(response.url) def _submit_archive_today(url: str, timeout: float) -> Optional[str]: """Submit URL to Archive.today.""" encoded = quote(url, safe=":/?#[]@!$&'()*+,;=") with HTTPClient(headers={"User-Agent": USER_AGENT}) as client: response = client.get(f"https://archive.today/submit/?url={encoded}") response.raise_for_status() final = str(response.url) if final and ("archive.today" in final or "archive.ph" in final): return final return None def _submit_archive_ph(url: str, timeout: float) -> Optional[str]: """Submit URL to Archive.ph.""" encoded = quote(url, safe=":/?#[]@!$&'()*+,;=") with HTTPClient(headers={"User-Agent": USER_AGENT}) as client: response = client.get(f"https://archive.ph/submit/?url={encoded}") response.raise_for_status() final = str(response.url) if final and "archive.ph" in final: return final return None def _archive_url(url: str, timeout: float) -> Tuple[List[str], List[str]]: """Submit URL to all available archive services.""" archives: List[str] = [] warnings: List[str] = [] for submitter, label in ( (_submit_wayback, "wayback"), (_submit_archive_today, "archive.today"), (_submit_archive_ph, "archive.ph"), ): try: debug(f"Archiving to {label}...") archived = submitter(url, timeout) except httpx.HTTPStatusError as exc: if exc.response.status_code == 429: warnings.append(f"archive {label} rate limited (HTTP 429)") debug(f"{label}: Rate limited (HTTP 429)") else: warnings.append(f"archive {label} failed: HTTP {exc.response.status_code}") debug(f"{label}: HTTP {exc.response.status_code}") except httpx.RequestError as exc: warnings.append(f"archive {label} failed: {exc}") debug(f"{label}: Connection error: {exc}") except Exception as exc: warnings.append(f"archive {label} failed: {exc}") debug(f"{label}: {exc}") else: if archived: archives.append(archived) debug(f"{label}: Success - {archived}") else: debug(f"{label}: No archive link returned") return archives, warnings def _prepare_output_path(options: ScreenshotOptions) -> Path: """Prepare and validate output path for screenshot.""" ensure_directory(options.output_dir) explicit_format = _normalise_format(options.output_format) if options.output_format else None inferred_format: Optional[str] = None if options.output_path is not None: path = options.output_path if not path.is_absolute(): path = options.output_dir / path suffix = path.suffix.lower() if suffix: inferred_format = _normalise_format(suffix[1:]) else: stamp = time.strftime("%Y%m%d_%H%M%S") filename = f"{_slugify_url(options.url)}_{stamp}" path = options.output_dir / filename final_format = explicit_format or inferred_format or "png" if not path.suffix: path = path.with_suffix(_format_suffix(final_format)) else: current_suffix = path.suffix.lower() expected = _format_suffix(final_format) if current_suffix != expected: path = path.with_suffix(expected) options.output_format = final_format return unique_path(path) def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str], progress: PipelineProgress) -> None: """Capture screenshot using Playwright.""" debug(f"[_capture] Starting capture for {options.url} -> {destination}") try: progress.step("loading launching browser") tool = options.playwright_tool or PlaywrightTool({}) # Ensure Chromium engine is used for the screen-shot cmdlet (force for consistency) try: current_browser = getattr(tool.defaults, "browser", "").lower() if getattr(tool, "defaults", None) is not None else "" if current_browser != "chromium": debug(f"[_capture] Overriding Playwright browser '{current_browser}' -> 'chromium' for screen-shot cmdlet") base_cfg = {} try: base_cfg = dict(getattr(tool, "_config", {}) or {}) except Exception: base_cfg = {} tool_block = dict(base_cfg.get("tool") or {}) if isinstance(base_cfg, dict) else {} pw_block = dict(tool_block.get("playwright") or {}) if isinstance(tool_block, dict) else {} pw_block["browser"] = "chromium" tool_block["playwright"] = pw_block if isinstance(base_cfg, dict): base_cfg["tool"] = tool_block tool = PlaywrightTool(base_cfg) except Exception: tool = PlaywrightTool({"tool": {"playwright": {"browser": "chromium"}}}) tool.debug_dump() debug("Launching browser...") format_name = _normalise_format(options.output_format) headless = options.headless or format_name == "pdf" debug(f"[_capture] Format: {format_name}, Headless: {headless}") if format_name == "pdf" and not options.headless: warnings.append("pdf output requires headless Chromium; overriding headless mode") try: with tool.open_page(headless=headless) as page: progress.step("loading navigating") debug(f"Navigating to {options.url}...") try: tool.goto(page, options.url) debug("Page loaded successfully") progress.step("loading page loaded") except PlaywrightTimeoutError: warnings.append("navigation timeout; capturing current page state") debug("Navigation timeout; proceeding with current state") progress.step("loading navigation timeout") # Skip article lookup by default (wait_for_article defaults to False) if options.wait_for_article: try: debug("Waiting for article element...") page.wait_for_selector("article", timeout=10_000) debug("Article element found") except PlaywrightTimeoutError: warnings.append("
selector not found; capturing fallback") debug("Article element not found; using fallback") if options.wait_after_load > 0: debug(f"Waiting {options.wait_after_load}s for page stabilization...") time.sleep(min(10.0, max(0.0, options.wait_after_load))) progress.step("loading stabilized") progress.step("capturing preparing") if options.replace_video_posters: debug("Replacing video elements with posters...") page.evaluate( """ document.querySelectorAll('video').forEach(v => { if (v.poster) { const img = document.createElement('img'); img.src = v.poster; img.style.maxWidth = '100%'; img.style.borderRadius = '12px'; v.replaceWith(img); } }); """ ) # Attempt platform-specific target capture if requested (and not PDF) element_captured = False if options.prefer_platform_target and format_name != "pdf": debug(f"[_capture] Target capture enabled") debug("Attempting platform-specific content capture...") progress.step("capturing locating target") try: _platform_preprocess(options.url, page, warnings) except Exception as e: debug(f"[_capture] Platform preprocess failed: {e}") pass selectors = list(options.target_selectors or []) if not selectors: selectors = _selectors_for_url(options.url) debug(f"[_capture] Trying selectors: {selectors}") for sel in selectors: try: debug(f"Trying selector: {sel}") el = page.wait_for_selector(sel, timeout=max(0, int(options.selector_timeout_ms))) except PlaywrightTimeoutError: debug(f"Selector not found: {sel}") continue try: if el is not None: debug(f"Found element with selector: {sel}") try: el.scroll_into_view_if_needed(timeout=1000) except Exception: pass progress.step("capturing output") debug(f"Capturing element to {destination}...") el.screenshot(path=str(destination), type=("jpeg" if format_name == "jpeg" else None)) element_captured = True debug("Element captured successfully") break except Exception as exc: warnings.append(f"element capture failed for '{sel}': {exc}") debug(f"Failed to capture element: {exc}") # Fallback to default capture paths if element_captured: progress.step("capturing saved") elif format_name == "pdf": debug("Generating PDF...") page.emulate_media(media="print") progress.step("capturing output") page.pdf(path=str(destination), print_background=True) debug(f"PDF saved to {destination}") progress.step("capturing saved") else: debug(f"Capturing full page to {destination}...") screenshot_kwargs: Dict[str, Any] = {"path": str(destination)} if format_name == "jpeg": screenshot_kwargs["type"] = "jpeg" screenshot_kwargs["quality"] = 90 if options.full_page: progress.step("capturing output") page.screenshot(full_page=True, **screenshot_kwargs) else: article = page.query_selector("article") if article is not None: article_kwargs = dict(screenshot_kwargs) article_kwargs.pop("full_page", None) progress.step("capturing output") article.screenshot(**article_kwargs) else: progress.step("capturing output") page.screenshot(**screenshot_kwargs) debug(f"Screenshot saved to {destination}") progress.step("capturing saved") except Exception as exc: debug(f"[_capture] Exception launching browser/page: {exc}") msg = str(exc).lower() if any(k in msg for k in ["executable", "not found", "no such file", "cannot find", "install"]): raise ScreenshotError("Chromium Playwright browser binaries not found. Install them: python ./scripts/setup.py --playwright-only --browsers chromium") from exc raise except ScreenshotError: # Re-raise ScreenshotError raised intentionally (do not wrap) raise except Exception as exc: debug(f"[_capture] Exception: {exc}") raise ScreenshotError(f"Failed to capture screenshot: {exc}") from exc def _capture_screenshot(options: ScreenshotOptions, progress: PipelineProgress) -> ScreenshotResult: """Capture a screenshot for the given options.""" debug(f"[_capture_screenshot] Preparing capture for {options.url}") requested_format = _normalise_format(options.output_format) destination = _prepare_output_path(options) warnings: List[str] = [] will_target = bool(options.prefer_platform_target) and requested_format != "pdf" will_convert = requested_format == "webp" will_archive = bool(options.archive and options.url) total_steps = 9 + (1 if will_target else 0) + (1 if will_convert else 0) + (1 if will_archive else 0) progress.begin_steps(total_steps) progress.step("loading starting") # Playwright screenshots do not natively support WebP output. # Capture as PNG, then convert via Pillow. capture_path = destination if requested_format == "webp": capture_path = unique_path(destination.with_suffix(".png")) debug(f"[_capture_screenshot] Requested webp; capturing intermediate png -> {capture_path}") options.output_format = "png" _capture(options, capture_path, warnings, progress) if requested_format == "webp": progress.step("capturing converting to webp") debug(f"[_capture_screenshot] Converting png -> webp: {destination}") try: did_downscale = _convert_to_webp(capture_path, destination) if did_downscale: warnings.append( f"webp conversion used downscaling to fit {WEBP_MAX_DIM}px limit; keeping original png: {capture_path.name}" ) else: try: capture_path.unlink(missing_ok=True) except Exception: pass except Exception as exc: warnings.append(f"webp conversion failed; keeping png: {exc}") destination = capture_path # Build URL list from captured url and any archives url: List[str] = [options.url] if options.url else [] archive_url: List[str] = [] if options.archive and options.url: progress.step("capturing archiving") debug(f"[_capture_screenshot] Archiving enabled for {options.url}") archives, archive_warnings = _archive_url(options.url, options.archive_timeout) archive_url.extend(archives) warnings.extend(archive_warnings) if archives: url = unique_preserve_order([*url, *archives]) progress.step("capturing finalized") applied_tag = unique_preserve_order(list(tag for tag in options.tag if tag.strip())) return ScreenshotResult( path=destination, tag_applied=applied_tag, archive_url=archive_url, url=url, warnings=warnings, ) # ============================================================================ # Main Cmdlet Function # ============================================================================ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Take screenshots of url in the pipeline. Accepts: - Single result object (dict or PipeObject) with 'path' field - List of result objects to screenshot each - Direct URL as string Emits PipeObject-formatted results for each screenshot with: - action: 'cmdlet:screen-shot' - is_temp: True (screenshots are temporary artifacts) - parent_id: hash of the original file/URL Screenshots are created using Playwright and marked as temporary so they can be cleaned up later with the cleanup cmdlet. """ debug(f"[_run] screen-shot invoked with args: {args}") # Help check if should_show_help(args): log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}") return 0 if not HAS_PLAYWRIGHT: log( "playwright is required for screenshot capture; install with: pip install playwright; then: playwright install", file=sys.stderr, ) return 1 progress = PipelineProgress(pipeline_context) # ======================================================================== # ARGUMENT PARSING # ======================================================================== parsed = parse_cmdlet_args(args, CMDLET) format_value = parsed.get("format") if not format_value: # Default format can be set via config.conf tool block: # [tool=playwright] # format="pdf" try: tool_cfg = config.get("tool", {}) if isinstance(config, dict) else {} pw_cfg = tool_cfg.get("playwright") if isinstance(tool_cfg, dict) else None if isinstance(pw_cfg, dict): format_value = pw_cfg.get("format") except Exception: pass if not format_value: format_value = "webp" storage_value = parsed.get("storage") selector_arg = parsed.get("selector") selectors = [selector_arg] if selector_arg else [] archive_enabled = parsed.get("archive", False) # Positional URL argument (if provided) url_arg = parsed.get("url") positional_url = [str(url_arg)] if url_arg else [] # ======================================================================== # INPUT PROCESSING - Extract url from command args or pipeline # ======================================================================== # If the user provided an explicit URL argument, prefer it. url_to_process: List[Tuple[str, Any]] = [] if positional_url: url_to_process = [(u, None) for u in positional_url] else: piped_results = normalize_result_input(result) # Extract url from piped results if piped_results: for item in piped_results: url = ( get_field(item, 'path') or get_field(item, 'url') or get_field(item, 'target') ) if url: url_to_process.append((str(url), item)) if not url_to_process: log(f"No url to process for screen-shot cmdlet", file=sys.stderr) return 1 debug(f"[_run] url to process: {[u for u, _ in url_to_process]}") # ======================================================================== # OUTPUT DIRECTORY RESOLUTION - Priority chain # ======================================================================== screenshot_dir: Optional[Path] = None # Primary: Use --storage if provided (highest priority) if storage_value: try: screenshot_dir = SharedArgs.resolve_storage(storage_value) debug(f"[screen_shot] Using --storage {storage_value}: {screenshot_dir}") except ValueError as e: log(str(e), file=sys.stderr) return 1 # Secondary: Use config-based resolver ONLY if --storage not provided if screenshot_dir is None and resolve_output_dir is not None: try: screenshot_dir = resolve_output_dir(config) debug(f"[screen_shot] Using config resolver: {screenshot_dir}") except Exception: pass # Tertiary: Use config outfile ONLY if neither --storage nor resolver worked if screenshot_dir is None and config and config.get("outfile"): try: screenshot_dir = Path(config["outfile"]).expanduser() debug(f"[screen_shot] Using config outfile: {screenshot_dir}") except Exception: pass # Default: User's Videos directory if screenshot_dir is None: screenshot_dir = Path.home() / "Videos" debug(f"[screen_shot] Using default directory: {screenshot_dir}") ensure_directory(screenshot_dir) # If the caller isn't running the shared pipeline Live progress UI (e.g. direct # cmdlet execution), start a minimal local pipeline progress panel so this cmdlet # still shows step-level progress. try: progress.ensure_local_ui( label="screen-shot", total_items=len(url_to_process), items_preview=[u for u, _ in url_to_process], ) except Exception: pass # ======================================================================== # PREPARE SCREENSHOT OPTIONS # ======================================================================== format_name = _normalise_format(format_value) filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()] manual_target_selectors = filtered_selectors if filtered_selectors else None all_emitted = [] exit_code = 0 # ======================================================================== # PROCESS url AND CAPTURE SCREENSHOTS # ======================================================================== def _extract_item_tags(item: Any) -> List[str]: if item is None: return [] raw = get_field(item, 'tag') if isinstance(raw, list): return [str(t) for t in raw if t is not None and str(t).strip()] if isinstance(raw, str) and raw.strip(): return [raw.strip()] return [] def _extract_item_title(item: Any) -> str: if item is None: return "" for key in ("title", "name", "filename"): val = get_field(item, key) if val is None: continue text = str(val).strip() if text: return text return "" def _clean_title(text: str) -> str: value = (text or "").strip() if value.lower().startswith("screenshot:"): value = value.split(":", 1)[1].strip() return value for url, origin_item in url_to_process: # Validate URL format if not url.lower().startswith(("http://", "https://", "file://")): log(f"[screen_shot] Skipping non-URL input: {url}", file=sys.stderr) continue try: # Create screenshot with provided options # Force the Playwright engine to Chromium for the screen-shot cmdlet # (this ensures consistent rendering and supports PDF output requirements). pw_local_cfg = {} if isinstance(config, dict): tool_block = dict(config.get("tool") or {}) pw_block = dict(tool_block.get("playwright") or {}) pw_block["browser"] = "chromium" # Use Playwright-native UA/headers (matches bundled Chromium version). pw_block["user_agent"] = "native" pw_block["viewport_width"] = int(DEFAULT_VIEWPORT.get("width", 1920)) pw_block["viewport_height"] = int(DEFAULT_VIEWPORT.get("height", 1080)) tool_block["playwright"] = pw_block pw_local_cfg = dict(config) pw_local_cfg["tool"] = tool_block else: pw_local_cfg = { "tool": { "playwright": { "browser": "chromium", "user_agent": "native", "viewport_width": int(DEFAULT_VIEWPORT.get("width", 1920)), "viewport_height": int(DEFAULT_VIEWPORT.get("height", 1080)), } } } options = ScreenshotOptions( url=url, output_dir=screenshot_dir, output_format=format_name, archive=archive_enabled, target_selectors=None, prefer_platform_target=False, wait_for_article=False, full_page=True, playwright_tool=PlaywrightTool(pw_local_cfg), ) # Auto element capture for known sites (x.com/twitter/etc.). # - If the user provided --selector, treat that as an explicit target. # - Otherwise, if SITE_SELECTORS matches the URL, auto-capture the post/content element. auto_selectors = _matched_site_selectors(url) if manual_target_selectors: options.prefer_platform_target = True options.target_selectors = manual_target_selectors debug(f"[screen_shot] Using explicit selector(s): {manual_target_selectors}") elif auto_selectors: options.prefer_platform_target = True options.target_selectors = auto_selectors debug(f"[screen_shot] Auto selectors matched for url: {auto_selectors}") screenshot_result = _capture_screenshot(options, progress) # Log results and warnings debug(f"Screenshot captured to {screenshot_result.path}") if screenshot_result.archive_url: debug(f"Archives: {', '.join(screenshot_result.archive_url)}") for warning in screenshot_result.warnings: debug(f"Warning: {warning}") # Compute hash of screenshot file screenshot_hash = None try: with open(screenshot_result.path, 'rb') as f: screenshot_hash = hashlib.sha256(f.read()).hexdigest() except Exception: pass # Create PipeObject result - marked as TEMP since derivative artifact capture_date = "" try: capture_date = datetime.fromtimestamp(screenshot_result.path.stat().st_mtime).date().isoformat() except Exception: capture_date = datetime.now().date().isoformat() upstream_title = _clean_title(_extract_item_title(origin_item)) url_title = _title_from_url(url) display_title = upstream_title or url_title or url upstream_tags = _extract_item_tags(origin_item) filtered_upstream_tags = [ t for t in upstream_tags if not str(t).strip().lower().startswith(("type:", "date:")) ] url_tags = _tags_from_url(url) merged_tags = unique_preserve_order( ["type:screenshot", f"date:{capture_date}"] + filtered_upstream_tags + url_tags ) pipe_obj = create_pipe_object_result( source='screenshot', store='PATH', identifier=Path(screenshot_result.path).stem, file_path=str(screenshot_result.path), cmdlet_name='screen-shot', title=display_title, hash_value=screenshot_hash, is_temp=True, parent_hash=hashlib.sha256(url.encode()).hexdigest(), tag=merged_tags, extra={ 'source_url': url, 'archive_url': screenshot_result.archive_url, 'url': screenshot_result.url, 'target': str(screenshot_result.path), # Explicit target for add-file } ) # Emit the result so downstream cmdlet (like add-file) can use it pipeline_context.emit(pipe_obj) all_emitted.append(pipe_obj) # If we created a local progress UI, advance it per completed item. progress.on_emit(pipe_obj) except ScreenshotError as exc: log(f"Error taking screenshot of {url}: {exc}", file=sys.stderr) exit_code = 1 except Exception as exc: log(f"Unexpected error taking screenshot of {url}: {exc}", file=sys.stderr) import traceback traceback.print_exc(file=sys.stderr) exit_code = 1 progress.close_local_ui(force_complete=True) if not all_emitted: log(f"No screenshots were successfully captured", file=sys.stderr) return 1 # Log completion message (keep this as normal output) log(f"✓ Successfully captured {len(all_emitted)} screenshot(s)") return exit_code CMDLET = Cmdlet( name="screen-shot", summary="Capture a website screenshot", usage="screen-shot [options] or download-data | screen-shot [options]", alias=["screenshot", "ss"], arg=[ SharedArgs.URL, CmdletArg(name="format", type="string", description="Output format: webp, png, jpeg, or pdf"), CmdletArg(name="selector", type="string", description="CSS selector for element capture"), ], detail=[ "Uses Playwright Chromium engine only. Install Chromium with: python ./scripts/setup.py --playwright-only --browsers chromium", "PDF output requires headless Chromium (the cmdlet will enforce headless mode for PDF).", "Screenshots are temporary artifacts stored in the configured `temp` directory.", ] ) CMDLET.exec = _run CMDLET.register()