"""Screen-shot cmdlet for capturing screenshots of URLs in a pipeline. This cmdlet processes files through the pipeline and creates screenshots using Playwright, marking them as temporary artifacts for cleanup. """ from __future__ import annotations import contextlib import hashlib import importlib import sys import time import httpx from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple from urllib.parse import urlsplit, quote, urljoin from helper.logger import log from helper.http_client import HTTPClient from . import register from ._shared import Cmdlet, CmdletArg, SharedArgs, create_pipe_object_result, normalize_result_input import models import pipeline as pipeline_context # ============================================================================ # CMDLET Metadata Declaration # ============================================================================ # ============================================================================ # Playwright & Screenshot Dependencies # ============================================================================ try: from playwright.sync_api import ( TimeoutError as PlaywrightTimeoutError, ViewportSize, sync_playwright, ) except Exception as exc: raise RuntimeError( "playwright is required for screenshot capture; install with 'pip install playwright'" ) from exc try: from config import resolve_output_dir except ImportError: try: _parent_dir = str(Path(__file__).parent.parent) if _parent_dir not in sys.path: sys.path.insert(0, _parent_dir) from config import resolve_output_dir except ImportError: resolve_output_dir = None # ============================================================================ # Screenshot Constants & Configuration # ============================================================================ USER_AGENT = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ) DEFAULT_VIEWPORT: ViewportSize = {"width": 1280, "height": 1200} ARCHIVE_TIMEOUT = 30.0 class ScreenshotError(RuntimeError): """Raised when screenshot capture or upload fails.""" @dataclass(slots=True) class ScreenshotOptions: """Options controlling screenshot capture and post-processing.""" url: str output_dir: Path output_path: Optional[Path] = None full_page: bool = True headless: bool = True wait_after_load: float = 2.0 wait_for_article: bool = False replace_video_posters: bool = True tags: Sequence[str] = () archive: bool = False archive_timeout: float = ARCHIVE_TIMEOUT known_urls: Sequence[str] = () output_format: Optional[str] = None prefer_platform_target: bool = False target_selectors: Optional[Sequence[str]] = None selector_timeout_ms: int = 10_000 @dataclass(slots=True) class ScreenshotResult: """Details about the captured screenshot.""" path: Path url: str tags_applied: List[str] archive_urls: List[str] known_urls: List[str] warnings: List[str] = field(default_factory=list) # ============================================================================ # Helper Functions # ============================================================================ def _ensure_directory(path: Path) -> None: """Ensure directory exists.""" if not isinstance(path, Path): path = Path(path) path.mkdir(parents=True, exist_ok=True) def _unique_path(path: Path) -> Path: """Get unique path by appending numbers if file exists.""" if not path.exists(): return path stem = path.stem suffix = path.suffix parent = path.parent counter = 1 while True: new_path = parent / f"{stem}_{counter}{suffix}" if not new_path.exists(): return new_path counter += 1 def _unique_preserve_order(items: Sequence[str]) -> List[str]: """Remove duplicates while preserving order.""" seen = set() result = [] for item in items: if item not in seen: seen.add(item) result.append(item) return result def _slugify_url(url: str) -> str: """Convert URL to filesystem-safe slug.""" parsed = urlsplit(url) candidate = f"{parsed.netloc}{parsed.path}" if parsed.query: candidate += f"?{parsed.query}" slug = "".join(char if char.isalnum() else "-" for char in candidate.lower()) slug = slug.strip("-") or "screenshot" return slug[:100] def _normalise_format(fmt: Optional[str]) -> str: """Normalize output format to valid values.""" if not fmt: return "png" value = fmt.strip().lower() if value in {"jpg", "jpeg"}: return "jpeg" if value in {"png", "pdf"}: return value return "png" def _format_suffix(fmt: str) -> str: """Get file suffix for format.""" if fmt == "jpeg": return ".jpg" return f".{fmt}" def _selectors_for_url(url: str) -> List[str]: """Return a list of likely content selectors for known platforms.""" u = url.lower() sels: List[str] = [] # Twitter/X if "twitter.com" in u or "x.com" in u: sels.extend([ "article[role='article']", "div[data-testid='tweet']", "div[data-testid='cellInnerDiv'] article", ]) # Instagram if "instagram.com" in u: sels.extend([ "article[role='presentation']", "article[role='article']", "div[role='dialog'] article", "section main article", ]) # Reddit if "reddit.com" in u: sels.extend([ "shreddit-post", "div[data-testid='post-container']", "div[data-click-id='background']", "article", ]) # Rumble (video post) if "rumble.com" in u: sels.extend([ "rumble-player, iframe.rumble", "div.video-item--main", "main article", ]) return sels or ["article"] def _platform_preprocess(url: str, page: Any, warnings: List[str], timeout_ms: int = 10_000) -> None: """Best-effort page tweaks for popular platforms before capture.""" u = url.lower() def _try_click_texts(texts: List[str], passes: int = 2, per_timeout: int = 700) -> int: clicks = 0 for _ in range(max(1, passes)): for t in texts: try: page.locator(f"text=/{t}/i").first.click(timeout=per_timeout) clicks += 1 except PlaywrightTimeoutError: pass except Exception: pass time.sleep(0.1) return clicks # Dismiss common cookie/consent prompts _try_click_texts(["accept", "i agree", "agree", "got it", "allow all", "consent"]) # Platform-specific expansions if "reddit.com" in u: _try_click_texts(["see more", "read more", "show more", "more"]) if ("twitter.com" in u) or ("x.com" in u): _try_click_texts(["show more", "more"]) if "instagram.com" in u: _try_click_texts(["more", "see more"]) if "tiktok.com" in u: _try_click_texts(["more", "see more"]) if ("facebook.com" in u) or ("fb.watch" in u): _try_click_texts(["see more", "show more", "more"]) if "rumble.com" in u: _try_click_texts(["accept", "agree", "close"]) def _submit_wayback(url: str, timeout: float) -> Optional[str]: """Submit URL to Internet Archive Wayback Machine.""" encoded = quote(url, safe="/:?=&") with HTTPClient() as client: response = client.get(f"https://web.archive.org/save/{encoded}") response.raise_for_status() content_location = response.headers.get("Content-Location") if content_location: return urljoin("https://web.archive.org", content_location) return str(response.url) def _submit_archive_today(url: str, timeout: float) -> Optional[str]: """Submit URL to Archive.today.""" encoded = quote(url, safe=":/?#[]@!$&'()*+,;=") with HTTPClient(headers={"User-Agent": USER_AGENT}) as client: response = client.get(f"https://archive.today/submit/?url={encoded}") response.raise_for_status() final = str(response.url) if final and ("archive.today" in final or "archive.ph" in final): return final return None def _submit_archive_ph(url: str, timeout: float) -> Optional[str]: """Submit URL to Archive.ph.""" encoded = quote(url, safe=":/?#[]@!$&'()*+,;=") with HTTPClient(headers={"User-Agent": USER_AGENT}) as client: response = client.get(f"https://archive.ph/submit/?url={encoded}") response.raise_for_status() final = str(response.url) if final and "archive.ph" in final: return final return None def _archive_url(url: str, timeout: float) -> Tuple[List[str], List[str]]: """Submit URL to all available archive services.""" archives: List[str] = [] warnings: List[str] = [] for submitter, label in ( (_submit_wayback, "wayback"), (_submit_archive_today, "archive.today"), (_submit_archive_ph, "archive.ph"), ): try: log(f"Archiving to {label}...", flush=True) archived = submitter(url, timeout) except httpx.HTTPStatusError as exc: if exc.response.status_code == 429: warnings.append(f"archive {label} rate limited (HTTP 429)") log(f"{label}: Rate limited (HTTP 429)", flush=True) else: warnings.append(f"archive {label} failed: HTTP {exc.response.status_code}") log(f"{label}: HTTP {exc.response.status_code}", flush=True) except httpx.RequestError as exc: warnings.append(f"archive {label} failed: {exc}") log(f"{label}: Connection error: {exc}", flush=True) except Exception as exc: warnings.append(f"archive {label} failed: {exc}") log(f"{label}: {exc}", flush=True) else: if archived: archives.append(archived) log(f"{label}: Success - {archived}", flush=True) else: log(f"{label}: No archive link returned", flush=True) return archives, warnings def _prepare_output_path(options: ScreenshotOptions) -> Path: """Prepare and validate output path for screenshot.""" _ensure_directory(options.output_dir) explicit_format = _normalise_format(options.output_format) if options.output_format else None inferred_format: Optional[str] = None if options.output_path is not None: path = options.output_path if not path.is_absolute(): path = options.output_dir / path suffix = path.suffix.lower() if suffix: inferred_format = _normalise_format(suffix[1:]) else: stamp = time.strftime("%Y%m%d_%H%M%S") filename = f"{_slugify_url(options.url)}_{stamp}" path = options.output_dir / filename final_format = explicit_format or inferred_format or "png" if not path.suffix: path = path.with_suffix(_format_suffix(final_format)) else: current_suffix = path.suffix.lower() expected = _format_suffix(final_format) if current_suffix != expected: path = path.with_suffix(expected) options.output_format = final_format return _unique_path(path) def _capture_with_playwright(options: ScreenshotOptions, destination: Path, warnings: List[str]) -> None: """Capture screenshot using Playwright.""" playwright = None browser = None context = None try: log("Starting Playwright...", flush=True) playwright = sync_playwright().start() log("Launching Chromium browser...", flush=True) format_name = _normalise_format(options.output_format) headless = options.headless or format_name == "pdf" if format_name == "pdf" and not options.headless: warnings.append("pdf output requires headless Chromium; overriding headless mode") browser = playwright.chromium.launch( headless=headless, args=["--disable-blink-features=AutomationControlled"], ) log("Creating browser context...", flush=True) context = browser.new_context( user_agent=USER_AGENT, viewport=DEFAULT_VIEWPORT, ignore_https_errors=True, ) page = context.new_page() log(f"Navigating to {options.url}...", flush=True) try: page.goto(options.url, timeout=90_000, wait_until="domcontentloaded") log("Page loaded successfully", flush=True) except PlaywrightTimeoutError: warnings.append("navigation timeout; capturing current page state") log("Navigation timeout; proceeding with current state", flush=True) # Skip article lookup by default (wait_for_article defaults to False) if options.wait_for_article: try: log("Waiting for article element...", flush=True) page.wait_for_selector("article", timeout=10_000) log("Article element found", flush=True) except PlaywrightTimeoutError: warnings.append("
selector not found; capturing fallback") log("Article element not found; using fallback", flush=True) if options.wait_after_load > 0: log(f"Waiting {options.wait_after_load}s for page stabilization...", flush=True) time.sleep(min(10.0, max(0.0, options.wait_after_load))) if options.replace_video_posters: log("Replacing video elements with posters...", flush=True) page.evaluate( """ document.querySelectorAll('video').forEach(v => { if (v.poster) { const img = document.createElement('img'); img.src = v.poster; img.style.maxWidth = '100%'; img.style.borderRadius = '12px'; v.replaceWith(img); } }); """ ) # Attempt platform-specific target capture if requested (and not PDF) element_captured = False if options.prefer_platform_target and format_name != "pdf": log("Attempting platform-specific content capture...", flush=True) try: _platform_preprocess(options.url, page, warnings) except Exception: pass selectors = list(options.target_selectors or []) if not selectors: selectors = _selectors_for_url(options.url) for sel in selectors: try: log(f"Trying selector: {sel}", flush=True) el = page.wait_for_selector(sel, timeout=max(0, int(options.selector_timeout_ms))) except PlaywrightTimeoutError: log(f"Selector not found: {sel}", flush=True) continue try: if el is not None: log(f"Found element with selector: {sel}", flush=True) try: el.scroll_into_view_if_needed(timeout=1000) except Exception: pass log(f"Capturing element to {destination}...", flush=True) el.screenshot(path=str(destination), type=("jpeg" if format_name == "jpeg" else None)) element_captured = True log("Element captured successfully", flush=True) break except Exception as exc: warnings.append(f"element capture failed for '{sel}': {exc}") log(f"Failed to capture element: {exc}", flush=True) # Fallback to default capture paths if element_captured: pass elif format_name == "pdf": log("Generating PDF...", flush=True) page.emulate_media(media="print") page.pdf(path=str(destination), print_background=True) log(f"PDF saved to {destination}", flush=True) else: log(f"Capturing full page to {destination}...", flush=True) screenshot_kwargs: Dict[str, Any] = {"path": str(destination)} if format_name == "jpeg": screenshot_kwargs["type"] = "jpeg" screenshot_kwargs["quality"] = 90 if options.full_page: page.screenshot(full_page=True, **screenshot_kwargs) else: article = page.query_selector("article") if article is not None: article_kwargs = dict(screenshot_kwargs) article_kwargs.pop("full_page", None) article.screenshot(**article_kwargs) else: page.screenshot(**screenshot_kwargs) log(f"Screenshot saved to {destination}", flush=True) except Exception as exc: raise ScreenshotError(f"Failed to capture screenshot: {exc}") from exc finally: log("Cleaning up browser resources...", flush=True) with contextlib.suppress(Exception): if context is not None: context.close() with contextlib.suppress(Exception): if browser is not None: browser.close() with contextlib.suppress(Exception): if playwright is not None: playwright.stop() log("Cleanup complete", flush=True) def _capture_screenshot(options: ScreenshotOptions) -> ScreenshotResult: """Capture a screenshot for the given options.""" destination = _prepare_output_path(options) warnings: List[str] = [] _capture_with_playwright(options, destination, warnings) known_urls = _unique_preserve_order([options.url, *options.known_urls]) archive_urls: List[str] = [] if options.archive: archives, archive_warnings = _archive_url(options.url, options.archive_timeout) archive_urls.extend(archives) warnings.extend(archive_warnings) if archives: known_urls = _unique_preserve_order([*known_urls, *archives]) applied_tags = _unique_preserve_order(list(tag for tag in options.tags if tag.strip())) return ScreenshotResult( path=destination, url=options.url, tags_applied=applied_tags, archive_urls=archive_urls, known_urls=known_urls, warnings=warnings, ) # ============================================================================ # Main Cmdlet Function # ============================================================================ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Take screenshots of URLs in the pipeline. Accepts: - Single result object (dict or PipeObject) with 'file_path' field - List of result objects to screenshot each - Direct URL as string Emits PipeObject-formatted results for each screenshot with: - action: 'cmdlet:screen-shot' - is_temp: True (screenshots are temporary artifacts) - parent_id: hash of the original file/URL Screenshots are created using Playwright and marked as temporary so they can be cleaned up later with the cleanup cmdlet. """ from ._shared import parse_cmdlet_args # Help check try: if any(str(a).lower() in {"-?", "/?", "--help", "-h", "help", "--cmdlet"} for a in args): log(json.dumps(CMDLET, ensure_ascii=False, indent=2)) return 0 except Exception: pass # ======================================================================== # ARGUMENT PARSING # ======================================================================== parsed = parse_cmdlet_args(args, CMDLET) format_value = parsed.get("format") storage_value = parsed.get("storage") selector_arg = parsed.get("selector") selectors = [selector_arg] if selector_arg else [] archive_enabled = parsed.get("archive", False) # Positional URL argument (if provided) url_arg = parsed.get("url") positional_urls = [str(url_arg)] if url_arg else [] # ======================================================================== # INPUT PROCESSING - Extract URLs from pipeline or command arguments # ======================================================================== piped_results = normalize_result_input(result) urls_to_process = [] # Extract URLs from piped results if piped_results: for item in piped_results: url = None if isinstance(item, dict): url = item.get('file_path') or item.get('path') or item.get('url') or item.get('target') else: url = getattr(item, 'file_path', None) or getattr(item, 'path', None) or getattr(item, 'url', None) or getattr(item, 'target', None) if url: urls_to_process.append(str(url)) # Use positional arguments if no pipeline input if not urls_to_process and positional_urls: urls_to_process = positional_urls if not urls_to_process: log(f"No URLs to process for screen-shot cmdlet", file=sys.stderr) return 1 # ======================================================================== # OUTPUT DIRECTORY RESOLUTION - Priority chain # ======================================================================== screenshot_dir: Optional[Path] = None # Primary: Use --storage if provided (highest priority) if storage_value: try: screenshot_dir = SharedArgs.resolve_storage(storage_value) log(f"[screen_shot] Using --storage {storage_value}: {screenshot_dir}", flush=True) except ValueError as e: log(str(e), file=sys.stderr) return 1 # Secondary: Use config-based resolver ONLY if --storage not provided if screenshot_dir is None and resolve_output_dir is not None: try: screenshot_dir = resolve_output_dir(config) log(f"[screen_shot] Using config resolver: {screenshot_dir}", flush=True) except Exception: pass # Tertiary: Use config outfile ONLY if neither --storage nor resolver worked if screenshot_dir is None and config and config.get("outfile"): try: screenshot_dir = Path(config["outfile"]).expanduser() log(f"[screen_shot] Using config outfile: {screenshot_dir}", flush=True) except Exception: pass # Default: User's Videos directory if screenshot_dir is None: screenshot_dir = Path.home() / "Videos" log(f"[screen_shot] Using default directory: {screenshot_dir}", flush=True) _ensure_directory(screenshot_dir) # ======================================================================== # PREPARE SCREENSHOT OPTIONS # ======================================================================== format_name = _normalise_format(format_value) filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()] target_selectors = filtered_selectors if filtered_selectors else None all_emitted = [] exit_code = 0 # ======================================================================== # PROCESS URLs AND CAPTURE SCREENSHOTS # ======================================================================== for url in urls_to_process: # Validate URL format if not url.lower().startswith(("http://", "https://", "file://")): log(f"[screen_shot] Skipping non-URL input: {url}", file=sys.stderr) continue try: # Create screenshot with provided options options = ScreenshotOptions( url=url, output_dir=screenshot_dir, output_format=format_name, archive=archive_enabled, target_selectors=target_selectors, prefer_platform_target=False, wait_for_article=False, full_page=True, ) screenshot_result = _capture_screenshot(options) # Log results and warnings log(f"Screenshot captured to {screenshot_result.path}", flush=True) if screenshot_result.archive_urls: log(f"Archives: {', '.join(screenshot_result.archive_urls)}", flush=True) for warning in screenshot_result.warnings: log(f"Warning: {warning}", flush=True) # Compute hash of screenshot file screenshot_hash = None try: with open(screenshot_result.path, 'rb') as f: screenshot_hash = hashlib.sha256(f.read()).hexdigest() except Exception: pass # Create PipeObject result - marked as TEMP since derivative artifact pipe_obj = create_pipe_object_result( source='screenshot', identifier=Path(screenshot_result.path).stem, file_path=str(screenshot_result.path), cmdlet_name='screen-shot', title=f"Screenshot: {Path(screenshot_result.path).name}", file_hash=screenshot_hash, is_temp=True, parent_hash=hashlib.sha256(url.encode()).hexdigest(), extra={ 'source_url': url, 'archive_urls': screenshot_result.archive_urls, 'known_urls': screenshot_result.known_urls, 'target': str(screenshot_result.path), # Explicit target for add-file } ) # Emit the result so downstream cmdlets (like add-file) can use it pipeline_context.emit(pipe_obj) all_emitted.append(pipe_obj) except ScreenshotError as exc: log(f"Error taking screenshot of {url}: {exc}", file=sys.stderr) exit_code = 1 except Exception as exc: log(f"Unexpected error taking screenshot of {url}: {exc}", file=sys.stderr) import traceback traceback.print_exc(file=sys.stderr) exit_code = 1 if not all_emitted: log(f"No screenshots were successfully captured", file=sys.stderr) return 1 # Log completion message log(f"✓ Successfully captured {len(all_emitted)} screenshot(s)", flush=True) return exit_code CMDLET = Cmdlet( name="screen-shot", summary="Capture a screenshot of a URL or file and mark as temporary artifact", usage="screen-shot [options] or download-data | screen-shot [options]", aliases=["screenshot", "ss"], args=[ CmdletArg(name="url", type="string", required=False, description="URL to screenshot (or from pipeline)"), CmdletArg(name="format", type="string", description="Output format: png, jpeg, or pdf"), CmdletArg(name="selector", type="string", description="CSS selector for element capture"), SharedArgs.ARCHIVE, # Use shared archive argument SharedArgs.STORAGE, # Use shared storage argument ], details=[ "Take screenshots of URLs with optional archiving and element targeting.", "Screenshots are marked as temporary artifacts for cleanup by the cleanup cmdlet.", "", "Arguments:", " url URL to capture (optional if piped from pipeline)", " --format FORMAT Output format: png (default), jpeg, or pdf", " --selector SEL CSS selector for capturing specific element", " --archive, -arch Archive URL to Wayback/Archive.today/Archive.ph", " --storage LOCATION Storage destination: hydrus, local, 0x0, debrid, or ftp", "", "Examples:", " download-data https://example.com | screen-shot --storage local", " download-data https://twitter.com/user/status/123 | screen-shot --selector 'article[role=article]' --storage hydrus --archive", " screen-shot https://example.com --format jpeg --storage 0x0 --archive", ] )