Files
Medios-Macina/cmdlet/screen_shot.py

789 lines
31 KiB
Python
Raw Normal View History

2025-12-11 12:47:30 -08:00
"""Screen-shot cmdlet for capturing screenshots of url in a pipeline.
2025-11-25 20:09:33 -08:00
This cmdlet processes files through the pipeline and creates screenshots using
Playwright, marking them as temporary artifacts for cleanup.
"""
from __future__ import annotations
import contextlib
import hashlib
import sys
import time
2025-12-16 01:45:01 -08:00
from datetime import datetime
2025-11-25 20:09:33 -08:00
import httpx
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
from urllib.parse import urlsplit, quote, urljoin
2025-12-11 19:04:02 -08:00
from SYS.logger import log, debug
from API.HTTP import HTTPClient
from SYS.utils import ensure_directory, unique_path, unique_preserve_order
2025-12-16 23:23:43 -08:00
from . import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
create_pipe_object_result = sh.create_pipe_object_result
normalize_result_input = sh.normalize_result_input
should_show_help = sh.should_show_help
get_field = sh.get_field
parse_cmdlet_args = sh.parse_cmdlet_args
2025-11-25 20:09:33 -08:00
import pipeline as pipeline_context
# ============================================================================
# CMDLET Metadata Declaration
# ============================================================================
# ============================================================================
# Playwright & Screenshot Dependencies
# ============================================================================
2025-12-16 23:23:43 -08:00
from tool.playwright import HAS_PLAYWRIGHT, PlaywrightTimeoutError, PlaywrightTool
2025-11-25 20:09:33 -08:00
try:
from config import resolve_output_dir
except ImportError:
try:
_parent_dir = str(Path(__file__).parent.parent)
if _parent_dir not in sys.path:
sys.path.insert(0, _parent_dir)
from config import resolve_output_dir
except ImportError:
resolve_output_dir = None
# ============================================================================
# Screenshot Constants & Configuration
# ============================================================================
# User agent presented to target sites and archive services during requests.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)

# Default browser viewport for captures.
# NOTE(review): not referenced anywhere in this file — presumably consumed by
# PlaywrightTool defaults; confirm before removing.
DEFAULT_VIEWPORT: dict[str, int] = {"width": 1280, "height": 1200}

# Per-service timeout (seconds) for archive submissions.
ARCHIVE_TIMEOUT = 30.0

# Configurable selectors for specific websites
# Maps a domain substring -> ordered CSS selectors tried for element capture.
SITE_SELECTORS: Dict[str, List[str]] = {
    "twitter.com": [
        "article[role='article']",
        "div[data-testid='tweet']",
        "div[data-testid='cellInnerDiv'] article",
    ],
    "x.com": [
        "article[role='article']",
        "div[data-testid='tweet']",
        "div[data-testid='cellInnerDiv'] article",
    ],
    "instagram.com": [
        "article[role='presentation']",
        "article[role='article']",
        "div[role='dialog'] article",
        "section main article",
    ],
    "reddit.com": [
        "shreddit-post",
        "div[data-testid='post-container']",
        "div[data-click-id='background']",
        "article",
    ],
    "rumble.com": [
        "rumble-player, iframe.rumble",
        "div.video-item--main",
        "main article",
    ],
}
2025-11-25 20:09:33 -08:00
class ScreenshotError(RuntimeError):
    """Raised when screenshot capture or upload fails."""
@dataclass(slots=True)
class ScreenshotOptions:
    """Options controlling screenshot capture and post-processing."""
    # Directory screenshots are written into (created if missing).
    output_dir: Path
    # Source URL to capture.
    url: str = ""
    # Explicit destination; relative paths are resolved against output_dir.
    output_path: Optional[Path] = None
    # Capture the whole scrollable page rather than just the viewport.
    full_page: bool = True
    # Run the browser headless (forced on for PDF output).
    headless: bool = True
    # Seconds to sleep after load for the page to stabilise (clamped to 10s).
    wait_after_load: float = 2.0
    # Wait for an <article> element before capturing.
    wait_for_article: bool = False
    # Swap <video> elements for their poster images before capture.
    replace_video_posters: bool = True
    # Tags to attach to the resulting artifact (blank entries ignored).
    tag: Sequence[str] = ()
    # Submit the URL to web archive services after capture.
    archive: bool = False
    # Per-service archive submission timeout in seconds.
    archive_timeout: float = ARCHIVE_TIMEOUT
    # Requested output format: png, jpeg or pdf (normalised before use).
    output_format: Optional[str] = None
    # Try platform-specific element capture (e.g. tweet card) before full page.
    prefer_platform_target: bool = False
    # Explicit CSS selectors to try for element capture.
    target_selectors: Optional[Sequence[str]] = None
    # Per-selector wait budget in milliseconds.
    selector_timeout_ms: int = 10_000
    # Pre-configured Playwright wrapper; a default is constructed when None.
    playwright_tool: Optional[PlaywrightTool] = None
2025-11-25 20:09:33 -08:00
@dataclass(slots=True)
class ScreenshotResult:
    """Details about the captured screenshot."""
    # Final on-disk path of the captured image/PDF.
    path: Path
    # Tags that were applied (whitespace-only entries filtered out).
    tag_applied: List[str]
    # Archive-service URLs produced when archiving was enabled.
    archive_url: List[str]
    # Source URL plus any archive URLs (deduplicated, order preserved).
    url: List[str]
    # Non-fatal problems encountered during capture/archiving.
    warnings: List[str] = field(default_factory=list)
# ============================================================================
# Helper Functions
# ============================================================================
def _slugify_url(url: str) -> str:
"""Convert URL to filesystem-safe slug."""
parsed = urlsplit(url)
candidate = f"{parsed.netloc}{parsed.path}"
if parsed.query:
candidate += f"?{parsed.query}"
slug = "".join(char if char.isalnum() else "-" for char in candidate.lower())
slug = slug.strip("-") or "screenshot"
return slug[:100]
def _normalise_format(fmt: Optional[str]) -> str:
"""Normalize output format to valid values."""
if not fmt:
return "png"
value = fmt.strip().lower()
if value in {"jpg", "jpeg"}:
return "jpeg"
if value in {"png", "pdf"}:
return value
return "png"
def _format_suffix(fmt: str) -> str:
"""Get file suffix for format."""
if fmt == "jpeg":
return ".jpg"
return f".{fmt}"
def _selectors_for_url(url: str) -> List[str]:
    """Return a list of likely content selectors for known platforms.

    Every SITE_SELECTORS entry whose domain substring appears in *url*
    contributes its selectors (in declaration order); falls back to
    ["article"] when nothing matches.
    """
    lowered = url.lower()
    matched = [
        selector
        for domain, selectors in SITE_SELECTORS.items()
        if domain in lowered
        for selector in selectors
    ]
    return matched if matched else ["article"]
def _platform_preprocess(url: str, page: Any, warnings: List[str], timeout_ms: int = 10_000) -> None:
    """Best-effort page tweaks for popular platforms before capture.

    Dismisses cookie/consent banners and expands truncated post text so the
    capture shows full content.  All clicks are best-effort: failures are
    silently ignored.

    NOTE(review): the ``warnings`` and ``timeout_ms`` parameters are not used
    in this body — confirm whether they are vestigial or intended for future use.
    """
    u = url.lower()
    def _try_click_texts(texts: List[str], passes: int = 2, per_timeout: int = 700) -> int:
        # Click the first element matching each text (case-insensitive regex),
        # repeating for `passes` rounds; returns the number of successful clicks.
        clicks = 0
        for _ in range(max(1, passes)):
            for t in texts:
                try:
                    page.locator(f"text=/{t}/i").first.click(timeout=per_timeout)
                    clicks += 1
                except PlaywrightTimeoutError:
                    pass
                except Exception:
                    pass
                # Small pause so the page can react between clicks.
                time.sleep(0.1)
        return clicks
    # Dismiss common cookie/consent prompts
    _try_click_texts(["accept", "i agree", "agree", "got it", "allow all", "consent"])
    # Platform-specific expansions
    if "reddit.com" in u:
        _try_click_texts(["see more", "read more", "show more", "more"])
    if ("twitter.com" in u) or ("x.com" in u):
        _try_click_texts(["show more", "more"])
    if "instagram.com" in u:
        _try_click_texts(["more", "see more"])
    if "tiktok.com" in u:
        _try_click_texts(["more", "see more"])
    if ("facebook.com" in u) or ("fb.watch" in u):
        _try_click_texts(["see more", "show more", "more"])
    if "rumble.com" in u:
        _try_click_texts(["accept", "agree", "close"])
def _submit_wayback(url: str, timeout: float) -> Optional[str]:
    """Submit *url* to the Internet Archive Wayback Machine.

    Returns the archived snapshot URL (from the Content-Location header when
    present, otherwise the final response URL).

    NOTE(review): *timeout* is currently unused — HTTPClient defaults apply;
    confirm whether it should be forwarded.
    """
    target = f"https://web.archive.org/save/{quote(url, safe='/:?=&')}"
    with HTTPClient() as client:
        response = client.get(target)
        response.raise_for_status()
        location = response.headers.get("Content-Location")
        if location:
            return urljoin("https://web.archive.org", location)
        return str(response.url)
def _submit_archive_today(url: str, timeout: float) -> Optional[str]:
    """Submit *url* to Archive.today.

    Returns the final landing URL only when it stayed on an archive.today /
    archive.ph domain; otherwise None.

    NOTE(review): *timeout* is currently unused — HTTPClient defaults apply.
    """
    target = "https://archive.today/submit/?url=" + quote(url, safe=":/?#[]@!$&'()*+,;=")
    with HTTPClient(headers={"User-Agent": USER_AGENT}) as client:
        response = client.get(target)
        response.raise_for_status()
        landed = str(response.url)
    if landed and ("archive.today" in landed or "archive.ph" in landed):
        return landed
    return None
def _submit_archive_ph(url: str, timeout: float) -> Optional[str]:
    """Submit *url* to Archive.ph.

    Returns the final landing URL only when it stayed on archive.ph;
    otherwise None.

    NOTE(review): *timeout* is currently unused — HTTPClient defaults apply.
    """
    target = "https://archive.ph/submit/?url=" + quote(url, safe=":/?#[]@!$&'()*+,;=")
    with HTTPClient(headers={"User-Agent": USER_AGENT}) as client:
        response = client.get(target)
        response.raise_for_status()
        landed = str(response.url)
    if landed and "archive.ph" in landed:
        return landed
    return None
def _archive_url(url: str, timeout: float) -> Tuple[List[str], List[str]]:
    """Submit *url* to every archive service.

    Returns a tuple of (archive links collected, human-readable warnings).
    Each service is tried independently; one failing never stops the others.
    """
    links: List[str] = []
    notes: List[str] = []
    services = (
        ("wayback", _submit_wayback),
        ("archive.today", _submit_archive_today),
        ("archive.ph", _submit_archive_ph),
    )
    for label, submit in services:
        try:
            debug(f"Archiving to {label}...")
            archived = submit(url, timeout)
        except httpx.HTTPStatusError as exc:
            status = exc.response.status_code
            # 429 gets a dedicated message so callers can tell rate limiting apart.
            if status == 429:
                notes.append(f"archive {label} rate limited (HTTP 429)")
                debug(f"{label}: Rate limited (HTTP 429)")
            else:
                notes.append(f"archive {label} failed: HTTP {status}")
                debug(f"{label}: HTTP {status}")
        except httpx.RequestError as exc:
            notes.append(f"archive {label} failed: {exc}")
            debug(f"{label}: Connection error: {exc}")
        except Exception as exc:
            notes.append(f"archive {label} failed: {exc}")
            debug(f"{label}: {exc}")
        else:
            if archived:
                links.append(archived)
                debug(f"{label}: Success - {archived}")
            else:
                debug(f"{label}: No archive link returned")
    return links, notes
def _prepare_output_path(options: ScreenshotOptions) -> Path:
    """Resolve the screenshot destination path.

    Honors an explicit ``options.output_path`` (resolved against
    ``options.output_dir`` when relative), otherwise derives a name from the
    URL slug plus a timestamp.  The suffix is forced to match the effective
    output format, ``options.output_format`` is updated to that format, and
    the returned path is made collision-free via ``unique_path``.
    """
    ensure_directory(options.output_dir)
    explicit = _normalise_format(options.output_format) if options.output_format else None
    inferred: Optional[str] = None
    if options.output_path is None:
        # No explicit path: build "<url-slug>_<timestamp>" inside output_dir.
        stamp = time.strftime("%Y%m%d_%H%M%S")
        path = options.output_dir / f"{_slugify_url(options.url)}_{stamp}"
    else:
        path = options.output_path
        if not path.is_absolute():
            path = options.output_dir / path
        if path.suffix:
            # An existing suffix hints at the desired format.
            inferred = _normalise_format(path.suffix.lower()[1:])
    # Explicit format wins over the inferred one; default is png.
    final_format = explicit or inferred or "png"
    expected = _format_suffix(final_format)
    if not path.suffix or path.suffix.lower() != expected:
        path = path.with_suffix(expected)
    options.output_format = final_format
    return unique_path(path)
2025-11-25 20:09:33 -08:00
2025-11-27 10:59:01 -08:00
def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str]) -> None:
    """Capture screenshot using Playwright.

    Drives a (forced-Chromium) Playwright page to ``options.url`` and writes
    the capture (png/jpeg/pdf) to ``destination``.  Non-fatal issues are
    appended to ``warnings``; unrecoverable failures raise ``ScreenshotError``.
    """
    debug(f"[_capture] Starting capture for {options.url} -> {destination}")
    try:
        tool = options.playwright_tool or PlaywrightTool({})
        # Ensure Chromium engine is used for the screen-shot cmdlet (force for consistency)
        try:
            current_browser = getattr(tool.defaults, "browser", "").lower() if getattr(tool, "defaults", None) is not None else ""
            if current_browser != "chromium":
                debug(f"[_capture] Overriding Playwright browser '{current_browser}' -> 'chromium' for screen-shot cmdlet")
                tool = PlaywrightTool({"tool": {"playwright": {"browser": "chromium"}}})
        except Exception:
            # Defaults were unreadable; fall back to a fresh Chromium-forced tool.
            tool = PlaywrightTool({"tool": {"playwright": {"browser": "chromium"}}})
        tool.debug_dump()
        debug("Launching browser...")
        format_name = _normalise_format(options.output_format)
        # PDF generation requires headless Chromium, so force headless for pdf.
        headless = options.headless or format_name == "pdf"
        debug(f"[_capture] Format: {format_name}, Headless: {headless}")
        if format_name == "pdf" and not options.headless:
            warnings.append("pdf output requires headless Chromium; overriding headless mode")
        try:
            with tool.open_page(headless=headless) as page:
                debug(f"Navigating to {options.url}...")
                try:
                    tool.goto(page, options.url)
                    debug("Page loaded successfully")
                except PlaywrightTimeoutError:
                    # Navigation timed out: capture whatever has rendered so far.
                    warnings.append("navigation timeout; capturing current page state")
                    debug("Navigation timeout; proceeding with current state")
                # Skip article lookup by default (wait_for_article defaults to False)
                if options.wait_for_article:
                    try:
                        debug("Waiting for article element...")
                        page.wait_for_selector("article", timeout=10_000)
                        debug("Article element found")
                    except PlaywrightTimeoutError:
                        warnings.append("<article> selector not found; capturing fallback")
                        debug("Article element not found; using fallback")
                if options.wait_after_load > 0:
                    debug(f"Waiting {options.wait_after_load}s for page stabilization...")
                    # Clamp the stabilisation delay to the [0, 10] second range.
                    time.sleep(min(10.0, max(0.0, options.wait_after_load)))
                if options.replace_video_posters:
                    debug("Replacing video elements with posters...")
                    # Videos often render as black boxes in screenshots; swap in
                    # their poster images so the capture shows meaningful content.
                    page.evaluate(
                        """
                        document.querySelectorAll('video').forEach(v => {
                            if (v.poster) {
                                const img = document.createElement('img');
                                img.src = v.poster;
                                img.style.maxWidth = '100%';
                                img.style.borderRadius = '12px';
                                v.replaceWith(img);
                            }
                        });
                        """
                    )
                # Attempt platform-specific target capture if requested (and not PDF)
                element_captured = False
                if options.prefer_platform_target and format_name != "pdf":
                    debug("Attempting platform-specific content capture...")
                    try:
                        _platform_preprocess(options.url, page, warnings)
                    except Exception as e:
                        debug(f"[_capture] Platform preprocess failed: {e}")
                        pass
                    selectors = list(options.target_selectors or [])
                    if not selectors:
                        selectors = _selectors_for_url(options.url)
                    debug(f"[_capture] Trying selectors: {selectors}")
                    # Try each selector in order; first successful element wins.
                    for sel in selectors:
                        try:
                            debug(f"Trying selector: {sel}")
                            el = page.wait_for_selector(sel, timeout=max(0, int(options.selector_timeout_ms)))
                        except PlaywrightTimeoutError:
                            debug(f"Selector not found: {sel}")
                            continue
                        try:
                            if el is not None:
                                debug(f"Found element with selector: {sel}")
                                try:
                                    el.scroll_into_view_if_needed(timeout=1000)
                                except Exception:
                                    pass
                                debug(f"Capturing element to {destination}...")
                                el.screenshot(path=str(destination), type=("jpeg" if format_name == "jpeg" else None))
                                element_captured = True
                                debug("Element captured successfully")
                                break
                        except Exception as exc:
                            warnings.append(f"element capture failed for '{sel}': {exc}")
                            debug(f"Failed to capture element: {exc}")
                # Fallback to default capture paths
                if element_captured:
                    pass
                elif format_name == "pdf":
                    debug("Generating PDF...")
                    page.emulate_media(media="print")
                    page.pdf(path=str(destination), print_background=True)
                    debug(f"PDF saved to {destination}")
                else:
                    debug(f"Capturing full page to {destination}...")
                    screenshot_kwargs: Dict[str, Any] = {"path": str(destination)}
                    if format_name == "jpeg":
                        screenshot_kwargs["type"] = "jpeg"
                        screenshot_kwargs["quality"] = 90
                    if options.full_page:
                        page.screenshot(full_page=True, **screenshot_kwargs)
                    else:
                        # Viewport capture: prefer the <article> element when present.
                        article = page.query_selector("article")
                        if article is not None:
                            article_kwargs = dict(screenshot_kwargs)
                            article_kwargs.pop("full_page", None)
                            article.screenshot(**article_kwargs)
                        else:
                            page.screenshot(**screenshot_kwargs)
                    debug(f"Screenshot saved to {destination}")
        except Exception as exc:
            debug(f"[_capture] Exception launching browser/page: {exc}")
            # Detect missing browser binaries and surface an actionable message.
            msg = str(exc).lower()
            if any(k in msg for k in ["executable", "not found", "no such file", "cannot find", "install"]):
                raise ScreenshotError("Chromium Playwright browser binaries not found. Install them: python ./scripts/setup.py --playwright-only --browsers chromium") from exc
            raise
    except ScreenshotError:
        # Re-raise ScreenshotError raised intentionally (do not wrap)
        raise
    except Exception as exc:
        debug(f"[_capture] Exception: {exc}")
        raise ScreenshotError(f"Failed to capture screenshot: {exc}") from exc
def _capture_screenshot(options: ScreenshotOptions) -> ScreenshotResult:
    """Capture a screenshot for the given options.

    Resolves the destination, performs the capture, optionally submits the
    URL to archive services, and returns a ScreenshotResult describing the
    outcome (including any non-fatal warnings).
    """
    debug(f"[_capture_screenshot] Preparing capture for {options.url}")
    destination = _prepare_output_path(options)
    warnings: List[str] = []
    _capture(options, destination, warnings)

    # Start the URL list with the source URL; archives get appended below.
    url: List[str] = [options.url] if options.url else []
    archive_url: List[str] = []
    if options.archive and options.url:
        debug(f"[_capture_screenshot] Archiving enabled for {options.url}")
        archives, archive_warnings = _archive_url(options.url, options.archive_timeout)
        archive_url.extend(archives)
        warnings.extend(archive_warnings)
        if archives:
            url = unique_preserve_order([*url, *archives])

    # Drop blank tags and de-duplicate while keeping first-seen order.
    applied = unique_preserve_order([t for t in options.tag if t.strip()])
    return ScreenshotResult(
        path=destination,
        tag_applied=applied,
        archive_url=archive_url,
        url=url,
        warnings=warnings,
    )
# ============================================================================
# Main Cmdlet Function
# ============================================================================
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Take screenshots of url in the pipeline.

    Accepts:
    - Single result object (dict or PipeObject) with 'path' field
    - List of result objects to screenshot each
    - Direct URL as string
    Emits PipeObject-formatted results for each screenshot with:
    - action: 'cmdlet:screen-shot'
    - is_temp: True (screenshots are temporary artifacts)
    - parent_id: hash of the original file/URL
    Screenshots are created using Playwright and marked as temporary
    so they can be cleaned up later with the cleanup cmdlet.

    Returns 0 on full success, 1 when nothing was captured or any URL failed.
    """
    debug(f"[_run] screen-shot invoked with args: {args}")
    # Help check
    if should_show_help(args):
        log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}")
        return 0

    # Bail out early when the Playwright dependency is missing entirely.
    if not HAS_PLAYWRIGHT:
        log(
            "playwright is required for screenshot capture; install with: pip install playwright; then: playwright install",
            file=sys.stderr,
        )
        return 1
    # ========================================================================
    # ARGUMENT PARSING
    # ========================================================================
    parsed = parse_cmdlet_args(args, CMDLET)
    format_value = parsed.get("format")
    if not format_value:
        # Default format can be set via config.conf tool block:
        # [tool=playwright]
        # format="pdf"
        try:
            tool_cfg = config.get("tool", {}) if isinstance(config, dict) else {}
            pw_cfg = tool_cfg.get("playwright") if isinstance(tool_cfg, dict) else None
            if isinstance(pw_cfg, dict):
                format_value = pw_cfg.get("format")
        except Exception:
            pass
    if not format_value:
        format_value = "png"
    # NOTE(review): 'storage' and 'archive' are read here but not declared in
    # CMDLET.arg — confirm parse_cmdlet_args tolerates undeclared options.
    storage_value = parsed.get("storage")
    selector_arg = parsed.get("selector")
    selectors = [selector_arg] if selector_arg else []
    archive_enabled = parsed.get("archive", False)
    # Positional URL argument (if provided)
    url_arg = parsed.get("url")
    positional_url = [str(url_arg)] if url_arg else []
    # ========================================================================
    # INPUT PROCESSING - Extract url from pipeline or command arguments
    # ========================================================================
    piped_results = normalize_result_input(result)
    # Each entry pairs a URL with the pipeline item it came from (None for CLI args).
    url_to_process: List[Tuple[str, Any]] = []

    # Extract url from piped results
    if piped_results:
        for item in piped_results:
            url = (
                get_field(item, 'path')
                or get_field(item, 'url')
                or get_field(item, 'target')
            )
            if url:
                url_to_process.append((str(url), item))
    # Use positional arguments if no pipeline input
    if not url_to_process and positional_url:
        url_to_process = [(u, None) for u in positional_url]

    if not url_to_process:
        log(f"No url to process for screen-shot cmdlet", file=sys.stderr)
        return 1
    debug(f"[_run] url to process: {[u for u, _ in url_to_process]}")

    # ========================================================================
    # OUTPUT DIRECTORY RESOLUTION - Priority chain
    # ========================================================================
    screenshot_dir: Optional[Path] = None
    # Primary: Use --storage if provided (highest priority)
    if storage_value:
        try:
            screenshot_dir = SharedArgs.resolve_storage(storage_value)
            debug(f"[screen_shot] Using --storage {storage_value}: {screenshot_dir}")
        except ValueError as e:
            log(str(e), file=sys.stderr)
            return 1
    # Secondary: Use config-based resolver ONLY if --storage not provided
    if screenshot_dir is None and resolve_output_dir is not None:
        try:
            screenshot_dir = resolve_output_dir(config)
            debug(f"[screen_shot] Using config resolver: {screenshot_dir}")
        except Exception:
            pass
    # Tertiary: Use config outfile ONLY if neither --storage nor resolver worked
    if screenshot_dir is None and config and config.get("outfile"):
        try:
            screenshot_dir = Path(config["outfile"]).expanduser()
            debug(f"[screen_shot] Using config outfile: {screenshot_dir}")
        except Exception:
            pass
    # Default: User's Videos directory
    if screenshot_dir is None:
        screenshot_dir = Path.home() / "Videos"
        debug(f"[screen_shot] Using default directory: {screenshot_dir}")

    ensure_directory(screenshot_dir)
    # ========================================================================
    # PREPARE SCREENSHOT OPTIONS
    # ========================================================================
    format_name = _normalise_format(format_value)
    filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()]
    target_selectors = filtered_selectors if filtered_selectors else None
    all_emitted: List[Any] = []
    exit_code = 0
    # ========================================================================
    # PROCESS url AND CAPTURE SCREENSHOTS
    # ========================================================================
    def _extract_item_tags(item: Any) -> List[str]:
        # Pull tags off the upstream pipeline item; accepts list or single string.
        if item is None:
            return []
        raw = get_field(item, 'tag')
        if isinstance(raw, list):
            return [str(t) for t in raw if t is not None and str(t).strip()]
        if isinstance(raw, str) and raw.strip():
            return [raw.strip()]
        return []
    def _extract_item_title(item: Any) -> str:
        # First non-empty of title/name/filename on the upstream item.
        if item is None:
            return ""
        for key in ("title", "name", "filename"):
            val = get_field(item, key)
            if val is None:
                continue
            text = str(val).strip()
            if text:
                return text
        return ""
    def _clean_title(text: str) -> str:
        # Strip a leading "screenshot:" prefix (case-insensitive) if present.
        value = (text or "").strip()
        if value.lower().startswith("screenshot:"):
            value = value.split(":", 1)[1].strip()
        return value
    for url, origin_item in url_to_process:
        # Validate URL format
        if not url.lower().startswith(("http://", "https://", "file://")):
            log(f"[screen_shot] Skipping non-URL input: {url}", file=sys.stderr)
            continue
        try:
            # Create screenshot with provided options
            # Force the Playwright engine to Chromium for the screen-shot cmdlet
            # (this ensures consistent rendering and supports PDF output requirements).
            pw_local_cfg = {}
            if isinstance(config, dict):
                # Shallow-copy config so the caller's dict is never mutated.
                tool_block = dict(config.get("tool") or {})
                pw_block = dict(tool_block.get("playwright") or {})
                pw_block["browser"] = "chromium"
                tool_block["playwright"] = pw_block
                pw_local_cfg = dict(config)
                pw_local_cfg["tool"] = tool_block
            else:
                pw_local_cfg = {"tool": {"playwright": {"browser": "chromium"}}}
            options = ScreenshotOptions(
                url=url,
                output_dir=screenshot_dir,
                output_format=format_name,
                archive=archive_enabled,
                target_selectors=target_selectors,
                prefer_platform_target=False,
                wait_for_article=False,
                full_page=True,
                playwright_tool=PlaywrightTool(pw_local_cfg),
            )
            screenshot_result = _capture_screenshot(options)
            # Log results and warnings
            debug(f"Screenshot captured to {screenshot_result.path}")
            if screenshot_result.archive_url:
                debug(f"Archives: {', '.join(screenshot_result.archive_url)}")
            for warning in screenshot_result.warnings:
                debug(f"Warning: {warning}")
            # Compute hash of screenshot file
            screenshot_hash = None
            try:
                with open(screenshot_result.path, 'rb') as f:
                    screenshot_hash = hashlib.sha256(f.read()).hexdigest()
            except Exception:
                pass
            # Create PipeObject result - marked as TEMP since derivative artifact
            capture_date = ""
            try:
                # Prefer the file's mtime as the capture date; fall back to today.
                capture_date = datetime.fromtimestamp(screenshot_result.path.stat().st_mtime).date().isoformat()
            except Exception:
                capture_date = datetime.now().date().isoformat()
            upstream_title = _clean_title(_extract_item_title(origin_item))
            display_title = upstream_title or url
            upstream_tags = _extract_item_tags(origin_item)
            # Drop upstream type:/date: tags — this cmdlet assigns its own.
            filtered_upstream_tags = [
                t for t in upstream_tags
                if not str(t).strip().lower().startswith(("type:", "date:"))
            ]
            merged_tags = unique_preserve_order(
                ["type:screenshot", f"date:{capture_date}"] + filtered_upstream_tags
            )
            pipe_obj = create_pipe_object_result(
                source='screenshot',
                store='PATH',
                identifier=Path(screenshot_result.path).stem,
                file_path=str(screenshot_result.path),
                cmdlet_name='screen-shot',
                title=display_title,
                hash_value=screenshot_hash,
                is_temp=True,
                parent_hash=hashlib.sha256(url.encode()).hexdigest(),
                tag=merged_tags,
                extra={
                    'source_url': url,
                    'archive_url': screenshot_result.archive_url,
                    'url': screenshot_result.url,
                    'target': str(screenshot_result.path), # Explicit target for add-file
                }
            )
            # Emit the result so downstream cmdlet (like add-file) can use it
            pipeline_context.emit(pipe_obj)
            all_emitted.append(pipe_obj)
        except ScreenshotError as exc:
            log(f"Error taking screenshot of {url}: {exc}", file=sys.stderr)
            exit_code = 1
        except Exception as exc:
            log(f"Unexpected error taking screenshot of {url}: {exc}", file=sys.stderr)
            import traceback
            traceback.print_exc(file=sys.stderr)
            exit_code = 1
    if not all_emitted:
        log(f"No screenshots were successfully captured", file=sys.stderr)
        return 1
    # Log completion message (keep this as normal output)
    log(f"✓ Successfully captured {len(all_emitted)} screenshot(s)")
    return exit_code
# Cmdlet registration: metadata consumed by the pipeline dispatcher.
# NOTE(review): _run also reads 'storage' and 'archive' from parsed arguments,
# but they are not declared in `arg` below — confirm whether they should be
# listed here or are handled as shared/implicit options.
CMDLET = Cmdlet(
    name="screen-shot",
    summary="Capture a website screenshot",
    usage="screen-shot <url> [options] or download-data <url> | screen-shot [options]",
    alias=["screenshot", "ss"],
    arg=[
        SharedArgs.URL,
        CmdletArg(name="format", type="string", description="Output format: png, jpeg, or pdf"),
        CmdletArg(name="selector", type="string", description="CSS selector for element capture"),
    ],
    detail=[
        "Uses Playwright Chromium engine only. Install Chromium with: python ./scripts/setup.py --playwright-only --browsers chromium",
        "PDF output requires headless Chromium (the cmdlet will enforce headless mode for PDF).",
        "Screenshots are temporary artifacts stored in the configured `temp` directory.",
    ]
)

# Bind the implementation and register with the global cmdlet registry.
CMDLET.exec = _run
CMDLET.register()