"""Screen-shot cmdlet for capturing screenshots of url in a pipeline. This cmdlet processes files through the pipeline and creates screenshots using Playwright, marking them as temporary artifacts for cleanup. """ from __future__ import annotations import hashlib import sys import time from datetime import datetime import httpx from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple from urllib.parse import urlsplit, quote, urljoin from SYS.logger import log, debug from API.HTTP import HTTPClient from SYS.utils import ensure_directory, unique_path, unique_preserve_order from . import _shared as sh Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg SharedArgs = sh.SharedArgs create_pipe_object_result = sh.create_pipe_object_result normalize_result_input = sh.normalize_result_input should_show_help = sh.should_show_help get_field = sh.get_field parse_cmdlet_args = sh.parse_cmdlet_args import pipeline as pipeline_context def _live_ui_and_pipe_index() -> tuple[Optional[Any], int]: ui = None try: ui = pipeline_context.get_live_progress() if hasattr(pipeline_context, "get_live_progress") else None except Exception: ui = None pipe_idx: int = 0 try: stage_ctx = pipeline_context.get_stage_context() if hasattr(pipeline_context, "get_stage_context") else None maybe_idx = getattr(stage_ctx, "pipe_index", None) if stage_ctx is not None else None if isinstance(maybe_idx, int): pipe_idx = int(maybe_idx) except Exception: pipe_idx = 0 return ui, pipe_idx def _begin_live_steps(total_steps: int) -> None: """Declare the total number of steps for this cmdlet run (per-pipe).""" ui, pipe_idx = _live_ui_and_pipe_index() if ui is None: return try: begin = getattr(ui, "begin_pipe_steps", None) if callable(begin): begin(int(pipe_idx), total_steps=int(total_steps)) except Exception: return def _step(text: str) -> None: """Emit a *new* step. Each call increments the step counter and advances percent automatically. """ ui, pipe_idx = _live_ui_and_pipe_index() if ui is None: return try: adv = getattr(ui, "advance_pipe_step", None) if callable(adv): adv(int(pipe_idx), str(text)) except Exception: return # ============================================================================ # CMDLET Metadata Declaration # ============================================================================ # ============================================================================ # Playwright & Screenshot Dependencies # ============================================================================ from tool.playwright import HAS_PLAYWRIGHT, PlaywrightTimeoutError, PlaywrightTool try: from config import resolve_output_dir except ImportError: try: _parent_dir = str(Path(__file__).parent.parent) if _parent_dir not in sys.path: sys.path.insert(0, _parent_dir) from config import resolve_output_dir except ImportError: resolve_output_dir = None # ============================================================================ # Screenshot Constants & Configuration # ============================================================================ USER_AGENT = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ) DEFAULT_VIEWPORT: dict[str, int] = {"width": 1920, "height": 1080} ARCHIVE_TIMEOUT = 30.0 # Configurable selectors for specific websites SITE_SELECTORS: Dict[str, List[str]] = { "twitter.com": [ "article[role='article']", "div[data-testid='tweet']", "div[data-testid='cellInnerDiv'] article", ], "x.com": [ "article[role='article']", "div[data-testid='tweet']", "div[data-testid='cellInnerDiv'] article", ], "instagram.com": [ "article[role='presentation']", "article[role='article']", "div[role='dialog'] article", "section main article", ], "reddit.com": [ "shreddit-post", "div[data-testid='post-container']", "div[data-click-id='background']", "article", ], "rumble.com": [ "rumble-player, iframe.rumble", "div.video-item--main", "main article", ], } class ScreenshotError(RuntimeError): """Raised when screenshot capture or upload fails.""" @dataclass(slots=True) class ScreenshotOptions: """Options controlling screenshot capture and post-processing.""" output_dir: Path url: str = "" output_path: Optional[Path] = None full_page: bool = True headless: bool = True wait_after_load: float = 6.0 wait_for_article: bool = False replace_video_posters: bool = True tag: Sequence[str] = () archive: bool = False archive_timeout: float = ARCHIVE_TIMEOUT output_format: Optional[str] = None prefer_platform_target: bool = False target_selectors: Optional[Sequence[str]] = None selector_timeout_ms: int = 10_000 playwright_tool: Optional[PlaywrightTool] = None @dataclass(slots=True) class ScreenshotResult: """Details about the captured screenshot.""" path: Path tag_applied: List[str] archive_url: List[str] url: List[str] warnings: List[str] = field(default_factory=list) # ============================================================================ # Helper Functions # ============================================================================ def _slugify_url(url: str) -> str: """Convert URL to filesystem-safe slug.""" parsed = urlsplit(url) candidate = f"{parsed.netloc}{parsed.path}" if parsed.query: candidate += f"?{parsed.query}" slug = "".join(char if char.isalnum() else "-" for char in candidate.lower()) slug = slug.strip("-") or "screenshot" return slug[:100] def _normalise_format(fmt: Optional[str]) -> str: """Normalize output format to valid values.""" if not fmt: return "webp" value = fmt.strip().lower() if value in {"jpg", "jpeg"}: return "jpeg" if value in {"png", "pdf", "webp"}: return value return "webp" def _format_suffix(fmt: str) -> str: """Get file suffix for format.""" if fmt == "jpeg": return ".jpg" return f".{fmt}" def _matched_site_selectors(url: str) -> List[str]: """Return SITE_SELECTORS for a matched domain; empty if no match. Unlike `_selectors_for_url()`, this does not return a generic fallback. """ u = str(url or "").lower() sels: List[str] = [] for domain, selectors in SITE_SELECTORS.items(): if domain in u: sels.extend(selectors) return sels def _platform_preprocess(url: str, page: Any, warnings: List[str], timeout_ms: int = 10_000) -> None: """Best-effort page tweaks for popular platforms before capture.""" try: u = str(url or "").lower() def _try_click_buttons(names: List[str], passes: int = 2, per_timeout: int = 700) -> int: clicks = 0 for _ in range(max(1, int(passes))): for name in names: try: locator = page.get_by_role("button", name=name) locator.first.click(timeout=int(per_timeout)) clicks += 1 except Exception: pass return clicks # Dismiss common cookie / consent prompts. _try_click_buttons([ "Accept all", "Accept", "I agree", "Agree", "Allow all", "OK", ]) # Some sites need small nudges (best-effort). if "reddit.com" in u: _try_click_buttons(["Accept all", "Accept"]) if ("twitter.com" in u) or ("x.com" in u): _try_click_buttons(["Accept all", "Accept"]) if "instagram.com" in u: _try_click_buttons(["Allow all", "Accept all", "Accept"]) except Exception as exc: debug(f"[_platform_preprocess] skipped: {exc}") return def _submit_wayback(url: str, timeout: float) -> Optional[str]: encoded = quote(url, safe="/:?=&") with HTTPClient(headers={"User-Agent": USER_AGENT}) as client: response = client.get(f"https://web.archive.org/save/{encoded}") content_location = response.headers.get("Content-Location") if content_location: return urljoin("https://web.archive.org", content_location) return str(response.url) def _submit_archive_today(url: str, timeout: float) -> Optional[str]: """Submit URL to Archive.today.""" encoded = quote(url, safe=":/?#[]@!$&'()*+,;=") with HTTPClient(headers={"User-Agent": USER_AGENT}) as client: response = client.get(f"https://archive.today/submit/?url={encoded}") response.raise_for_status() final = str(response.url) if final and ("archive.today" in final or "archive.ph" in final): return final return None def _submit_archive_ph(url: str, timeout: float) -> Optional[str]: """Submit URL to Archive.ph.""" encoded = quote(url, safe=":/?#[]@!$&'()*+,;=") with HTTPClient(headers={"User-Agent": USER_AGENT}) as client: response = client.get(f"https://archive.ph/submit/?url={encoded}") response.raise_for_status() final = str(response.url) if final and "archive.ph" in final: return final return None def _archive_url(url: str, timeout: float) -> Tuple[List[str], List[str]]: """Submit URL to all available archive services.""" archives: List[str] = [] warnings: List[str] = [] for submitter, label in ( (_submit_wayback, "wayback"), (_submit_archive_today, "archive.today"), (_submit_archive_ph, "archive.ph"), ): try: debug(f"Archiving to {label}...") archived = submitter(url, timeout) except httpx.HTTPStatusError as exc: if exc.response.status_code == 429: warnings.append(f"archive {label} rate limited (HTTP 429)") debug(f"{label}: Rate limited (HTTP 429)") else: warnings.append(f"archive {label} failed: HTTP {exc.response.status_code}") debug(f"{label}: HTTP {exc.response.status_code}") except httpx.RequestError as exc: warnings.append(f"archive {label} failed: {exc}") debug(f"{label}: Connection error: {exc}") except Exception as exc: warnings.append(f"archive {label} failed: {exc}") debug(f"{label}: {exc}") else: if archived: archives.append(archived) debug(f"{label}: Success - {archived}") else: debug(f"{label}: No archive link returned") return archives, warnings def _prepare_output_path(options: ScreenshotOptions) -> Path: """Prepare and validate output path for screenshot.""" ensure_directory(options.output_dir) explicit_format = _normalise_format(options.output_format) if options.output_format else None inferred_format: Optional[str] = None if options.output_path is not None: path = options.output_path if not path.is_absolute(): path = options.output_dir / path suffix = path.suffix.lower() if suffix: inferred_format = _normalise_format(suffix[1:]) else: stamp = time.strftime("%Y%m%d_%H%M%S") filename = f"{_slugify_url(options.url)}_{stamp}" path = options.output_dir / filename final_format = explicit_format or inferred_format or "png" if not path.suffix: path = path.with_suffix(_format_suffix(final_format)) else: current_suffix = path.suffix.lower() expected = _format_suffix(final_format) if current_suffix != expected: path = path.with_suffix(expected) options.output_format = final_format return unique_path(path) def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str]) -> None: """Capture screenshot using Playwright.""" debug(f"[_capture] Starting capture for {options.url} -> {destination}") try: _step("loading launching browser") tool = options.playwright_tool or PlaywrightTool({}) # Ensure Chromium engine is used for the screen-shot cmdlet (force for consistency) try: current_browser = getattr(tool.defaults, "browser", "").lower() if getattr(tool, "defaults", None) is not None else "" if current_browser != "chromium": debug(f"[_capture] Overriding Playwright browser '{current_browser}' -> 'chromium' for screen-shot cmdlet") base_cfg = {} try: base_cfg = dict(getattr(tool, "_config", {}) or {}) except Exception: base_cfg = {} tool_block = dict(base_cfg.get("tool") or {}) if isinstance(base_cfg, dict) else {} pw_block = dict(tool_block.get("playwright") or {}) if isinstance(tool_block, dict) else {} pw_block["browser"] = "chromium" tool_block["playwright"] = pw_block if isinstance(base_cfg, dict): base_cfg["tool"] = tool_block tool = PlaywrightTool(base_cfg) except Exception: tool = PlaywrightTool({"tool": {"playwright": {"browser": "chromium"}}}) tool.debug_dump() debug("Launching browser...") format_name = _normalise_format(options.output_format) headless = options.headless or format_name == "pdf" debug(f"[_capture] Format: {format_name}, Headless: {headless}") if format_name == "pdf" and not options.headless: warnings.append("pdf output requires headless Chromium; overriding headless mode") try: with tool.open_page(headless=headless) as page: _step("loading navigating") debug(f"Navigating to {options.url}...") try: tool.goto(page, options.url) debug("Page loaded successfully") _step("loading page loaded") except PlaywrightTimeoutError: warnings.append("navigation timeout; capturing current page state") debug("Navigation timeout; proceeding with current state") _step("loading navigation timeout") # Skip article lookup by default (wait_for_article defaults to False) if options.wait_for_article: try: debug("Waiting for article element...") page.wait_for_selector("article", timeout=10_000) debug("Article element found") except PlaywrightTimeoutError: warnings.append("
selector not found; capturing fallback") debug("Article element not found; using fallback") if options.wait_after_load > 0: debug(f"Waiting {options.wait_after_load}s for page stabilization...") time.sleep(min(10.0, max(0.0, options.wait_after_load))) _step("loading stabilized") _step("capturing preparing") if options.replace_video_posters: debug("Replacing video elements with posters...") page.evaluate( """ document.querySelectorAll('video').forEach(v => { if (v.poster) { const img = document.createElement('img'); img.src = v.poster; img.style.maxWidth = '100%'; img.style.borderRadius = '12px'; v.replaceWith(img); } }); """ ) # Attempt platform-specific target capture if requested (and not PDF) element_captured = False if options.prefer_platform_target and format_name != "pdf": debug(f"[_capture] Target capture enabled") debug("Attempting platform-specific content capture...") _step("capturing locating target") try: _platform_preprocess(options.url, page, warnings) except Exception as e: debug(f"[_capture] Platform preprocess failed: {e}") pass selectors = list(options.target_selectors or []) if not selectors: selectors = _selectors_for_url(options.url) debug(f"[_capture] Trying selectors: {selectors}") for sel in selectors: try: debug(f"Trying selector: {sel}") el = page.wait_for_selector(sel, timeout=max(0, int(options.selector_timeout_ms))) except PlaywrightTimeoutError: debug(f"Selector not found: {sel}") continue try: if el is not None: debug(f"Found element with selector: {sel}") try: el.scroll_into_view_if_needed(timeout=1000) except Exception: pass _step("capturing output") debug(f"Capturing element to {destination}...") el.screenshot(path=str(destination), type=("jpeg" if format_name == "jpeg" else None)) element_captured = True debug("Element captured successfully") break except Exception as exc: warnings.append(f"element capture failed for '{sel}': {exc}") debug(f"Failed to capture element: {exc}") # Fallback to default capture paths if element_captured: _step("capturing saved") elif format_name == "pdf": debug("Generating PDF...") page.emulate_media(media="print") _step("capturing output") page.pdf(path=str(destination), print_background=True) debug(f"PDF saved to {destination}") _step("capturing saved") else: debug(f"Capturing full page to {destination}...") screenshot_kwargs: Dict[str, Any] = {"path": str(destination)} if format_name == "jpeg": screenshot_kwargs["type"] = "jpeg" screenshot_kwargs["quality"] = 90 if options.full_page: _step("capturing output") page.screenshot(full_page=True, **screenshot_kwargs) else: article = page.query_selector("article") if article is not None: article_kwargs = dict(screenshot_kwargs) article_kwargs.pop("full_page", None) _step("capturing output") article.screenshot(**article_kwargs) else: _step("capturing output") page.screenshot(**screenshot_kwargs) debug(f"Screenshot saved to {destination}") _step("capturing saved") except Exception as exc: debug(f"[_capture] Exception launching browser/page: {exc}") msg = str(exc).lower() if any(k in msg for k in ["executable", "not found", "no such file", "cannot find", "install"]): raise ScreenshotError("Chromium Playwright browser binaries not found. Install them: python ./scripts/setup.py --playwright-only --browsers chromium") from exc raise except ScreenshotError: # Re-raise ScreenshotError raised intentionally (do not wrap) raise except Exception as exc: debug(f"[_capture] Exception: {exc}") raise ScreenshotError(f"Failed to capture screenshot: {exc}") from exc def _capture_screenshot(options: ScreenshotOptions) -> ScreenshotResult: """Capture a screenshot for the given options.""" debug(f"[_capture_screenshot] Preparing capture for {options.url}") requested_format = _normalise_format(options.output_format) destination = _prepare_output_path(options) warnings: List[str] = [] will_target = bool(options.prefer_platform_target) and requested_format != "pdf" will_convert = requested_format == "webp" will_archive = bool(options.archive and options.url) total_steps = 9 + (1 if will_target else 0) + (1 if will_convert else 0) + (1 if will_archive else 0) _begin_live_steps(total_steps) _step("loading starting") # Playwright screenshots do not natively support WebP output. # Capture as PNG, then convert via Pillow. capture_path = destination if requested_format == "webp": capture_path = unique_path(destination.with_suffix(".png")) debug(f"[_capture_screenshot] Requested webp; capturing intermediate png -> {capture_path}") options.output_format = "png" _capture(options, capture_path, warnings) if requested_format == "webp": _step("capturing converting to webp") debug(f"[_capture_screenshot] Converting png -> webp: {destination}") try: _convert_to_webp(capture_path, destination) try: capture_path.unlink(missing_ok=True) except Exception: pass except Exception as exc: warnings.append(f"webp conversion failed; keeping png: {exc}") destination = capture_path # Build URL list from captured url and any archives url: List[str] = [options.url] if options.url else [] archive_url: List[str] = [] if options.archive and options.url: _step("capturing archiving") debug(f"[_capture_screenshot] Archiving enabled for {options.url}") archives, archive_warnings = _archive_url(options.url, options.archive_timeout) archive_url.extend(archives) warnings.extend(archive_warnings) if archives: url = unique_preserve_order([*url, *archives]) _step("capturing finalized") applied_tag = unique_preserve_order(list(tag for tag in options.tag if tag.strip())) return ScreenshotResult( path=destination, tag_applied=applied_tag, archive_url=archive_url, url=url, warnings=warnings, ) # ============================================================================ # Main Cmdlet Function # ============================================================================ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Take screenshots of url in the pipeline. Accepts: - Single result object (dict or PipeObject) with 'path' field - List of result objects to screenshot each - Direct URL as string Emits PipeObject-formatted results for each screenshot with: - action: 'cmdlet:screen-shot' - is_temp: True (screenshots are temporary artifacts) - parent_id: hash of the original file/URL Screenshots are created using Playwright and marked as temporary so they can be cleaned up later with the cleanup cmdlet. """ debug(f"[_run] screen-shot invoked with args: {args}") # Help check if should_show_help(args): log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}") return 0 if not HAS_PLAYWRIGHT: log( "playwright is required for screenshot capture; install with: pip install playwright; then: playwright install", file=sys.stderr, ) return 1 # ======================================================================== # ARGUMENT PARSING # ======================================================================== parsed = parse_cmdlet_args(args, CMDLET) format_value = parsed.get("format") if not format_value: # Default format can be set via config.conf tool block: # [tool=playwright] # format="pdf" try: tool_cfg = config.get("tool", {}) if isinstance(config, dict) else {} pw_cfg = tool_cfg.get("playwright") if isinstance(tool_cfg, dict) else None if isinstance(pw_cfg, dict): format_value = pw_cfg.get("format") except Exception: pass if not format_value: format_value = "webp" storage_value = parsed.get("storage") selector_arg = parsed.get("selector") selectors = [selector_arg] if selector_arg else [] archive_enabled = parsed.get("archive", False) # Positional URL argument (if provided) url_arg = parsed.get("url") positional_url = [str(url_arg)] if url_arg else [] # ======================================================================== # INPUT PROCESSING - Extract url from command args or pipeline # ======================================================================== # If the user provided an explicit URL argument, prefer it. url_to_process: List[Tuple[str, Any]] = [] if positional_url: url_to_process = [(u, None) for u in positional_url] else: piped_results = normalize_result_input(result) # Extract url from piped results if piped_results: for item in piped_results: url = ( get_field(item, 'path') or get_field(item, 'url') or get_field(item, 'target') ) if url: url_to_process.append((str(url), item)) if not url_to_process: log(f"No url to process for screen-shot cmdlet", file=sys.stderr) return 1 debug(f"[_run] url to process: {[u for u, _ in url_to_process]}") # If the caller isn't running the shared pipeline Live progress UI (e.g. direct # cmdlet execution), start a minimal local pipeline progress panel so this cmdlet # still shows step-level progress. local_progress_ui = None try: existing_ui = pipeline_context.get_live_progress() if hasattr(pipeline_context, "get_live_progress") else None except Exception: existing_ui = None try: if existing_ui is None and bool(getattr(sys.stderr, "isatty", lambda: False)()): from models import PipelineLiveProgress local_progress_ui = PipelineLiveProgress(["screen-shot"], enabled=True) local_progress_ui.start() try: if hasattr(pipeline_context, "set_live_progress"): pipeline_context.set_live_progress(local_progress_ui) except Exception: pass try: local_progress_ui.begin_pipe(0, total_items=len(url_to_process), items_preview=[u for u, _ in url_to_process]) except Exception: pass except Exception: local_progress_ui = None # ======================================================================== # OUTPUT DIRECTORY RESOLUTION - Priority chain # ======================================================================== screenshot_dir: Optional[Path] = None # Primary: Use --storage if provided (highest priority) if storage_value: try: screenshot_dir = SharedArgs.resolve_storage(storage_value) debug(f"[screen_shot] Using --storage {storage_value}: {screenshot_dir}") except ValueError as e: log(str(e), file=sys.stderr) return 1 # Secondary: Use config-based resolver ONLY if --storage not provided if screenshot_dir is None and resolve_output_dir is not None: try: screenshot_dir = resolve_output_dir(config) debug(f"[screen_shot] Using config resolver: {screenshot_dir}") except Exception: pass # Tertiary: Use config outfile ONLY if neither --storage nor resolver worked if screenshot_dir is None and config and config.get("outfile"): try: screenshot_dir = Path(config["outfile"]).expanduser() debug(f"[screen_shot] Using config outfile: {screenshot_dir}") except Exception: pass # Default: User's Videos directory if screenshot_dir is None: screenshot_dir = Path.home() / "Videos" debug(f"[screen_shot] Using default directory: {screenshot_dir}") ensure_directory(screenshot_dir) # ======================================================================== # PREPARE SCREENSHOT OPTIONS # ======================================================================== format_name = _normalise_format(format_value) filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()] manual_target_selectors = filtered_selectors if filtered_selectors else None all_emitted = [] exit_code = 0 # ======================================================================== # PROCESS url AND CAPTURE SCREENSHOTS # ======================================================================== def _extract_item_tags(item: Any) -> List[str]: if item is None: return [] raw = get_field(item, 'tag') if isinstance(raw, list): return [str(t) for t in raw if t is not None and str(t).strip()] if isinstance(raw, str) and raw.strip(): return [raw.strip()] return [] def _extract_item_title(item: Any) -> str: if item is None: return "" for key in ("title", "name", "filename"): val = get_field(item, key) if val is None: continue text = str(val).strip() if text: return text return "" def _clean_title(text: str) -> str: value = (text or "").strip() if value.lower().startswith("screenshot:"): value = value.split(":", 1)[1].strip() return value for url, origin_item in url_to_process: # Validate URL format if not url.lower().startswith(("http://", "https://", "file://")): log(f"[screen_shot] Skipping non-URL input: {url}", file=sys.stderr) continue try: # Create screenshot with provided options # Force the Playwright engine to Chromium for the screen-shot cmdlet # (this ensures consistent rendering and supports PDF output requirements). pw_local_cfg = {} if isinstance(config, dict): tool_block = dict(config.get("tool") or {}) pw_block = dict(tool_block.get("playwright") or {}) pw_block["browser"] = "chromium" # Use Playwright-native UA/headers (matches bundled Chromium version). pw_block["user_agent"] = "native" pw_block["viewport_width"] = int(DEFAULT_VIEWPORT.get("width", 1920)) pw_block["viewport_height"] = int(DEFAULT_VIEWPORT.get("height", 1080)) tool_block["playwright"] = pw_block pw_local_cfg = dict(config) pw_local_cfg["tool"] = tool_block else: pw_local_cfg = { "tool": { "playwright": { "browser": "chromium", "user_agent": "native", "viewport_width": int(DEFAULT_VIEWPORT.get("width", 1920)), "viewport_height": int(DEFAULT_VIEWPORT.get("height", 1080)), } } } options = ScreenshotOptions( url=url, output_dir=screenshot_dir, output_format=format_name, archive=archive_enabled, target_selectors=None, prefer_platform_target=False, wait_for_article=False, full_page=True, playwright_tool=PlaywrightTool(pw_local_cfg), ) # Auto element capture for known sites (x.com/twitter/etc.). # - If the user provided --selector, treat that as an explicit target. # - Otherwise, if SITE_SELECTORS matches the URL, auto-capture the post/content element. auto_selectors = _matched_site_selectors(url) if manual_target_selectors: options.prefer_platform_target = True options.target_selectors = manual_target_selectors debug(f"[screen_shot] Using explicit selector(s): {manual_target_selectors}") elif auto_selectors: options.prefer_platform_target = True options.target_selectors = auto_selectors debug(f"[screen_shot] Auto selectors matched for url: {auto_selectors}") screenshot_result = _capture_screenshot(options) # Log results and warnings debug(f"Screenshot captured to {screenshot_result.path}") if screenshot_result.archive_url: debug(f"Archives: {', '.join(screenshot_result.archive_url)}") for warning in screenshot_result.warnings: debug(f"Warning: {warning}") # Compute hash of screenshot file screenshot_hash = None try: with open(screenshot_result.path, 'rb') as f: screenshot_hash = hashlib.sha256(f.read()).hexdigest() except Exception: pass # Create PipeObject result - marked as TEMP since derivative artifact capture_date = "" try: capture_date = datetime.fromtimestamp(screenshot_result.path.stat().st_mtime).date().isoformat() except Exception: capture_date = datetime.now().date().isoformat() upstream_title = _clean_title(_extract_item_title(origin_item)) display_title = upstream_title or url upstream_tags = _extract_item_tags(origin_item) filtered_upstream_tags = [ t for t in upstream_tags if not str(t).strip().lower().startswith(("type:", "date:")) ] merged_tags = unique_preserve_order( ["type:screenshot", f"date:{capture_date}"] + filtered_upstream_tags ) pipe_obj = create_pipe_object_result( source='screenshot', store='PATH', identifier=Path(screenshot_result.path).stem, file_path=str(screenshot_result.path), cmdlet_name='screen-shot', title=display_title, hash_value=screenshot_hash, is_temp=True, parent_hash=hashlib.sha256(url.encode()).hexdigest(), tag=merged_tags, extra={ 'source_url': url, 'archive_url': screenshot_result.archive_url, 'url': screenshot_result.url, 'target': str(screenshot_result.path), # Explicit target for add-file } ) # Emit the result so downstream cmdlet (like add-file) can use it pipeline_context.emit(pipe_obj) all_emitted.append(pipe_obj) # If we created a local progress UI, advance it per completed item. if local_progress_ui is not None: try: local_progress_ui.on_emit(0, pipe_obj) except Exception: pass except ScreenshotError as exc: log(f"Error taking screenshot of {url}: {exc}", file=sys.stderr) exit_code = 1 except Exception as exc: log(f"Unexpected error taking screenshot of {url}: {exc}", file=sys.stderr) import traceback traceback.print_exc(file=sys.stderr) exit_code = 1 try: if local_progress_ui is not None: try: local_progress_ui.finish_pipe(0, force_complete=True) except Exception: pass finally: if local_progress_ui is not None: try: local_progress_ui.stop() except Exception: pass try: if hasattr(pipeline_context, "set_live_progress"): pipeline_context.set_live_progress(None) except Exception: pass if not all_emitted: log(f"No screenshots were successfully captured", file=sys.stderr) return 1 # Log completion message (keep this as normal output) log(f"✓ Successfully captured {len(all_emitted)} screenshot(s)") return exit_code CMDLET = Cmdlet( name="screen-shot", summary="Capture a website screenshot", usage="screen-shot [options] or download-data | screen-shot [options]", alias=["screenshot", "ss"], arg=[ SharedArgs.URL, CmdletArg(name="format", type="string", description="Output format: webp, png, jpeg, or pdf"), CmdletArg(name="selector", type="string", description="CSS selector for element capture"), ], detail=[ "Uses Playwright Chromium engine only. Install Chromium with: python ./scripts/setup.py --playwright-only --browsers chromium", "PDF output requires headless Chromium (the cmdlet will enforce headless mode for PDF).", "Screenshots are temporary artifacts stored in the configured `temp` directory.", ] ) CMDLET.exec = _run CMDLET.register()