"""Screen-shot cmdlet for capturing screenshots of URLs in a pipeline.
|
||
|
|
|
||
|
|
This cmdlet processes files through the pipeline and creates screenshots using
|
||
|
|
Playwright, marking them as temporary artifacts for cleanup.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import contextlib
|
||
|
|
import hashlib
|
||
|
|
import importlib
|
||
|
|
import sys
|
||
|
|
import time
|
||
|
|
import httpx
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any, Dict, List, Optional, Sequence, Tuple
|
||
|
|
from urllib.parse import urlsplit, quote, urljoin
|
||
|
|
|
||
|
|
from helper.logger import log
|
||
|
|
from helper.http_client import HTTPClient
|
||
|
|
|
||
|
|
from . import register
|
||
|
|
from ._shared import Cmdlet, CmdletArg, SharedArgs, create_pipe_object_result, normalize_result_input
|
||
|
|
import models
|
||
|
|
import pipeline as pipeline_context
|
||
|
|
|
||
|


# ============================================================================
# CMDLET Metadata Declaration (CMDLET itself is defined at the end of module)
# ============================================================================


# ============================================================================
# Playwright & Screenshot Dependencies
# ============================================================================

try:
    from playwright.sync_api import (
        TimeoutError as PlaywrightTimeoutError,
        ViewportSize,
        sync_playwright,
    )
except Exception as exc:
    raise RuntimeError(
        "playwright is required for screenshot capture; install with 'pip install playwright'"
    ) from exc

try:
    from config import resolve_output_dir
except ImportError:
    try:
        # Fallback: make the package's parent directory importable so a
        # top-level config module can be found when run outside the package.
        _parent_dir = str(Path(__file__).parent.parent)
        if _parent_dir not in sys.path:
            sys.path.insert(0, _parent_dir)
        from config import resolve_output_dir
    except ImportError:
        resolve_output_dir = None


# ============================================================================
# Screenshot Constants & Configuration
# ============================================================================

USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)

DEFAULT_VIEWPORT: ViewportSize = {"width": 1280, "height": 1200}
ARCHIVE_TIMEOUT = 30.0


class ScreenshotError(RuntimeError):
    """Raised when screenshot capture or upload fails."""


@dataclass(slots=True)
class ScreenshotOptions:
    """Options controlling screenshot capture and post-processing."""

    url: str
    output_dir: Path
    output_path: Optional[Path] = None
    full_page: bool = True
    headless: bool = True
    wait_after_load: float = 2.0
    wait_for_article: bool = False
    replace_video_posters: bool = True
    tags: Sequence[str] = ()
    archive: bool = False
    archive_timeout: float = ARCHIVE_TIMEOUT
    known_urls: Sequence[str] = ()
    output_format: Optional[str] = None
    prefer_platform_target: bool = False
    target_selectors: Optional[Sequence[str]] = None
    selector_timeout_ms: int = 10_000


@dataclass(slots=True)
class ScreenshotResult:
    """Details about the captured screenshot."""

    path: Path
    url: str
    tags_applied: List[str]
    archive_urls: List[str]
    known_urls: List[str]
    warnings: List[str] = field(default_factory=list)


# ============================================================================
# Helper Functions
# ============================================================================

def _ensure_directory(path: Path) -> None:
    """Ensure directory exists."""
    if not isinstance(path, Path):
        path = Path(path)
    path.mkdir(parents=True, exist_ok=True)


def _unique_path(path: Path) -> Path:
    """Get unique path by appending numbers if file exists."""
    if not path.exists():
        return path
    stem = path.stem
    suffix = path.suffix
    parent = path.parent
    counter = 1
    while True:
        new_path = parent / f"{stem}_{counter}{suffix}"
        if not new_path.exists():
            return new_path
        counter += 1
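
# Example (illustrative): if "shot.png" already exists,
# _unique_path(Path("shot.png")) returns Path("shot_1.png"), then
# Path("shot_2.png"), and so on.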


def _unique_preserve_order(items: Sequence[str]) -> List[str]:
    """Remove duplicates while preserving order."""
    seen = set()
    result = []
    for item in items:
        if item not in seen:
            seen.add(item)
            result.append(item)
    return result


def _slugify_url(url: str) -> str:
    """Convert URL to filesystem-safe slug."""
    parsed = urlsplit(url)
    candidate = f"{parsed.netloc}{parsed.path}"
    if parsed.query:
        candidate += f"?{parsed.query}"
    slug = "".join(char if char.isalnum() else "-" for char in candidate.lower())
    slug = slug.strip("-") or "screenshot"
    return slug[:100]
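
# Example (illustrative): _slugify_url("https://Example.com/a b?q=1")
# -> "example-com-a-b-q-1" (alphanumerics kept, everything else dashed,
# truncated to 100 characters).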


def _normalise_format(fmt: Optional[str]) -> str:
    """Normalize output format to valid values."""
    if not fmt:
        return "png"
    value = fmt.strip().lower()
    if value in {"jpg", "jpeg"}:
        return "jpeg"
    if value in {"png", "pdf"}:
        return value
    return "png"


def _format_suffix(fmt: str) -> str:
    """Get file suffix for format."""
    if fmt == "jpeg":
        return ".jpg"
    return f".{fmt}"
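
# Examples (illustrative): _normalise_format("JPG") -> "jpeg" and
# _format_suffix("jpeg") -> ".jpg"; unrecognized formats fall back to "png".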


def _selectors_for_url(url: str) -> List[str]:
    """Return a list of likely content selectors for known platforms."""
    u = url.lower()
    sels: List[str] = []
    # Twitter/X
    if "twitter.com" in u or "x.com" in u:
        sels.extend([
            "article[role='article']",
            "div[data-testid='tweet']",
            "div[data-testid='cellInnerDiv'] article",
        ])
    # Instagram
    if "instagram.com" in u:
        sels.extend([
            "article[role='presentation']",
            "article[role='article']",
            "div[role='dialog'] article",
            "section main article",
        ])
    # Reddit
    if "reddit.com" in u:
        sels.extend([
            "shreddit-post",
            "div[data-testid='post-container']",
            "div[data-click-id='background']",
            "article",
        ])
    # Rumble (video post)
    if "rumble.com" in u:
        sels.extend([
            "rumble-player, iframe.rumble",
            "div.video-item--main",
            "main article",
        ])
    return sels or ["article"]
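
# Example (illustrative): _selectors_for_url("https://x.com/user/status/1")
# returns the three Twitter/X selectors above; a URL on an unrecognized host
# falls back to ["article"].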


def _platform_preprocess(url: str, page: Any, warnings: List[str], timeout_ms: int = 10_000) -> None:
    """Best-effort page tweaks for popular platforms before capture."""
    u = url.lower()

    def _try_click_texts(texts: List[str], passes: int = 2, per_timeout: int = 700) -> int:
        clicks = 0
        for _ in range(max(1, passes)):
            for t in texts:
                try:
                    page.locator(f"text=/{t}/i").first.click(timeout=per_timeout)
                    clicks += 1
                except PlaywrightTimeoutError:
                    pass
                except Exception:
                    pass
            time.sleep(0.1)
        return clicks

    # Dismiss common cookie/consent prompts
    _try_click_texts(["accept", "i agree", "agree", "got it", "allow all", "consent"])

    # Platform-specific expansions
    if "reddit.com" in u:
        _try_click_texts(["see more", "read more", "show more", "more"])
    if ("twitter.com" in u) or ("x.com" in u):
        _try_click_texts(["show more", "more"])
    if "instagram.com" in u:
        _try_click_texts(["more", "see more"])
    if "tiktok.com" in u:
        _try_click_texts(["more", "see more"])
    if ("facebook.com" in u) or ("fb.watch" in u):
        _try_click_texts(["see more", "show more", "more"])
    if "rumble.com" in u:
        _try_click_texts(["accept", "agree", "close"])


def _submit_wayback(url: str, timeout: float) -> Optional[str]:
    """Submit URL to Internet Archive Wayback Machine."""
    encoded = quote(url, safe="/:?=&")
    with HTTPClient() as client:
        response = client.get(f"https://web.archive.org/save/{encoded}")
        response.raise_for_status()
        content_location = response.headers.get("Content-Location")
        if content_location:
            return urljoin("https://web.archive.org", content_location)
        return str(response.url)


def _submit_archive_today(url: str, timeout: float) -> Optional[str]:
    """Submit URL to Archive.today."""
    encoded = quote(url, safe=":/?#[]@!$&'()*+,;=")
    with HTTPClient(headers={"User-Agent": USER_AGENT}) as client:
        response = client.get(f"https://archive.today/submit/?url={encoded}")
        response.raise_for_status()
        final = str(response.url)
        if final and ("archive.today" in final or "archive.ph" in final):
            return final
    return None


def _submit_archive_ph(url: str, timeout: float) -> Optional[str]:
    """Submit URL to Archive.ph."""
    encoded = quote(url, safe=":/?#[]@!$&'()*+,;=")
    with HTTPClient(headers={"User-Agent": USER_AGENT}) as client:
        response = client.get(f"https://archive.ph/submit/?url={encoded}")
        response.raise_for_status()
        final = str(response.url)
        if final and "archive.ph" in final:
            return final
    return None


def _archive_url(url: str, timeout: float) -> Tuple[List[str], List[str]]:
    """Submit URL to all available archive services."""
    archives: List[str] = []
    warnings: List[str] = []
    for submitter, label in (
        (_submit_wayback, "wayback"),
        (_submit_archive_today, "archive.today"),
        (_submit_archive_ph, "archive.ph"),
    ):
        try:
            log(f"Archiving to {label}...", flush=True)
            archived = submitter(url, timeout)
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == 429:
                warnings.append(f"archive {label} rate limited (HTTP 429)")
                log(f"{label}: Rate limited (HTTP 429)", flush=True)
            else:
                warnings.append(f"archive {label} failed: HTTP {exc.response.status_code}")
                log(f"{label}: HTTP {exc.response.status_code}", flush=True)
        except httpx.RequestError as exc:
            warnings.append(f"archive {label} failed: {exc}")
            log(f"{label}: Connection error: {exc}", flush=True)
        except Exception as exc:
            warnings.append(f"archive {label} failed: {exc}")
            log(f"{label}: {exc}", flush=True)
        else:
            if archived:
                archives.append(archived)
                log(f"{label}: Success - {archived}", flush=True)
            else:
                log(f"{label}: No archive link returned", flush=True)
    return archives, warnings
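
# Standalone usage sketch (illustrative):
#   archives, warns = _archive_url("https://example.com", ARCHIVE_TIMEOUT)
# Each submitter is tried independently, so one service failing (or rate
# limiting with HTTP 429) does not stop the others from being attempted.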


def _prepare_output_path(options: ScreenshotOptions) -> Path:
    """Prepare and validate output path for screenshot."""
    _ensure_directory(options.output_dir)
    explicit_format = _normalise_format(options.output_format) if options.output_format else None
    inferred_format: Optional[str] = None
    if options.output_path is not None:
        path = options.output_path
        if not path.is_absolute():
            path = options.output_dir / path
        suffix = path.suffix.lower()
        if suffix:
            inferred_format = _normalise_format(suffix[1:])
    else:
        stamp = time.strftime("%Y%m%d_%H%M%S")
        filename = f"{_slugify_url(options.url)}_{stamp}"
        path = options.output_dir / filename
    final_format = explicit_format or inferred_format or "png"
    if not path.suffix:
        path = path.with_suffix(_format_suffix(final_format))
    else:
        current_suffix = path.suffix.lower()
        expected = _format_suffix(final_format)
        if current_suffix != expected:
            path = path.with_suffix(expected)
    options.output_format = final_format
    return _unique_path(path)
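
# Example (illustrative timestamp): with no explicit output_path, a jpeg
# capture of "https://example.com" lands at
# <output_dir>/example-com_20240101_120000.jpg (slug + timestamp + suffix).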


def _capture_with_playwright(options: ScreenshotOptions, destination: Path, warnings: List[str]) -> None:
    """Capture screenshot using Playwright."""
    playwright = None
    browser = None
    context = None
    try:
        log("Starting Playwright...", flush=True)
        playwright = sync_playwright().start()
        log("Launching Chromium browser...", flush=True)
        format_name = _normalise_format(options.output_format)
        headless = options.headless or format_name == "pdf"
        if format_name == "pdf" and not options.headless:
            warnings.append("pdf output requires headless Chromium; overriding headless mode")
        browser = playwright.chromium.launch(
            headless=headless,
            args=["--disable-blink-features=AutomationControlled"],
        )
        log("Creating browser context...", flush=True)
        context = browser.new_context(
            user_agent=USER_AGENT,
            viewport=DEFAULT_VIEWPORT,
            ignore_https_errors=True,
        )
        page = context.new_page()
        log(f"Navigating to {options.url}...", flush=True)
        try:
            page.goto(options.url, timeout=90_000, wait_until="domcontentloaded")
            log("Page loaded successfully", flush=True)
        except PlaywrightTimeoutError:
            warnings.append("navigation timeout; capturing current page state")
            log("Navigation timeout; proceeding with current state", flush=True)

        # Skip article lookup by default (wait_for_article defaults to False)
        if options.wait_for_article:
            try:
                log("Waiting for article element...", flush=True)
                page.wait_for_selector("article", timeout=10_000)
                log("Article element found", flush=True)
            except PlaywrightTimeoutError:
                warnings.append("<article> selector not found; capturing fallback")
                log("Article element not found; using fallback", flush=True)

        if options.wait_after_load > 0:
            log(f"Waiting {options.wait_after_load}s for page stabilization...", flush=True)
            time.sleep(min(10.0, max(0.0, options.wait_after_load)))
        if options.replace_video_posters:
            log("Replacing video elements with posters...", flush=True)
            page.evaluate(
                """
                document.querySelectorAll('video').forEach(v => {
                    if (v.poster) {
                        const img = document.createElement('img');
                        img.src = v.poster;
                        img.style.maxWidth = '100%';
                        img.style.borderRadius = '12px';
                        v.replaceWith(img);
                    }
                });
                """
            )
        # Attempt platform-specific target capture if requested (and not PDF)
        element_captured = False
        if options.prefer_platform_target and format_name != "pdf":
            log("Attempting platform-specific content capture...", flush=True)
            try:
                _platform_preprocess(options.url, page, warnings)
            except Exception:
                pass
            selectors = list(options.target_selectors or [])
            if not selectors:
                selectors = _selectors_for_url(options.url)
            for sel in selectors:
                try:
                    log(f"Trying selector: {sel}", flush=True)
                    el = page.wait_for_selector(sel, timeout=max(0, int(options.selector_timeout_ms)))
                except PlaywrightTimeoutError:
                    log(f"Selector not found: {sel}", flush=True)
                    continue
                try:
                    if el is not None:
                        log(f"Found element with selector: {sel}", flush=True)
                        try:
                            el.scroll_into_view_if_needed(timeout=1000)
                        except Exception:
                            pass
                        log(f"Capturing element to {destination}...", flush=True)
                        el.screenshot(path=str(destination), type=("jpeg" if format_name == "jpeg" else None))
                        element_captured = True
                        log("Element captured successfully", flush=True)
                        break
                except Exception as exc:
                    warnings.append(f"element capture failed for '{sel}': {exc}")
                    log(f"Failed to capture element: {exc}", flush=True)
        # Fallback to default capture paths
        if element_captured:
            pass
        elif format_name == "pdf":
            log("Generating PDF...", flush=True)
            page.emulate_media(media="print")
            page.pdf(path=str(destination), print_background=True)
            log(f"PDF saved to {destination}", flush=True)
        else:
            log(f"Capturing full page to {destination}...", flush=True)
            screenshot_kwargs: Dict[str, Any] = {"path": str(destination)}
            if format_name == "jpeg":
                screenshot_kwargs["type"] = "jpeg"
                screenshot_kwargs["quality"] = 90
            if options.full_page:
                page.screenshot(full_page=True, **screenshot_kwargs)
            else:
                article = page.query_selector("article")
                if article is not None:
                    article_kwargs = dict(screenshot_kwargs)
                    article_kwargs.pop("full_page", None)
                    article.screenshot(**article_kwargs)
                else:
                    page.screenshot(**screenshot_kwargs)
            log(f"Screenshot saved to {destination}", flush=True)
    except Exception as exc:
        raise ScreenshotError(f"Failed to capture screenshot: {exc}") from exc
    finally:
        log("Cleaning up browser resources...", flush=True)
        with contextlib.suppress(Exception):
            if context is not None:
                context.close()
        with contextlib.suppress(Exception):
            if browser is not None:
                browser.close()
        with contextlib.suppress(Exception):
            if playwright is not None:
                playwright.stop()
        log("Cleanup complete", flush=True)


def _capture_screenshot(options: ScreenshotOptions) -> ScreenshotResult:
    """Capture a screenshot for the given options."""
    destination = _prepare_output_path(options)
    warnings: List[str] = []
    _capture_with_playwright(options, destination, warnings)

    known_urls = _unique_preserve_order([options.url, *options.known_urls])
    archive_urls: List[str] = []
    if options.archive:
        archives, archive_warnings = _archive_url(options.url, options.archive_timeout)
        archive_urls.extend(archives)
        warnings.extend(archive_warnings)
        if archives:
            known_urls = _unique_preserve_order([*known_urls, *archives])

    applied_tags = _unique_preserve_order([tag for tag in options.tags if tag.strip()])

    return ScreenshotResult(
        path=destination,
        url=options.url,
        tags_applied=applied_tags,
        archive_urls=archive_urls,
        known_urls=known_urls,
        warnings=warnings,
    )
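
# Standalone usage sketch (illustrative; assumes Chromium was installed via
# `playwright install chromium`):
#   shot = _capture_screenshot(
#       ScreenshotOptions(url="https://example.com", output_dir=Path("shots"))
#   )
#   print(shot.path, shot.warnings)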


# ============================================================================
# Main Cmdlet Function
# ============================================================================

def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Take screenshots of URLs in the pipeline.

    Accepts:
    - Single result object (dict or PipeObject) with a 'file_path' field
    - List of result objects (a screenshot is taken for each)
    - Direct URL as a string

    Emits PipeObject-formatted results for each screenshot with:
    - action: 'cmdlet:screen-shot'
    - is_temp: True (screenshots are temporary artifacts)
    - parent_hash: SHA-256 of the original file/URL

    Screenshots are created using Playwright and marked as temporary
    so they can be cleaned up later with the cleanup cmdlet.
    """
    from ._shared import parse_cmdlet_args

    # Help check
    try:
        if any(str(a).lower() in {"-?", "/?", "--help", "-h", "help", "--cmdlet"} for a in args):
            log(json.dumps(CMDLET, ensure_ascii=False, indent=2))
            return 0
    except Exception:
        pass

    # ========================================================================
    # ARGUMENT PARSING
    # ========================================================================

    parsed = parse_cmdlet_args(args, CMDLET)

    format_value = parsed.get("format")
    storage_value = parsed.get("storage")
    selector_arg = parsed.get("selector")
    selectors = [selector_arg] if selector_arg else []
    archive_enabled = parsed.get("archive", False)

    # Positional URL argument (if provided)
    url_arg = parsed.get("url")
    positional_urls = [str(url_arg)] if url_arg else []

    # ========================================================================
    # INPUT PROCESSING - Extract URLs from pipeline or command arguments
    # ========================================================================

    piped_results = normalize_result_input(result)
    urls_to_process = []

    # Extract URLs from piped results
    if piped_results:
        for item in piped_results:
            url = None
            if isinstance(item, dict):
                url = item.get('file_path') or item.get('path') or item.get('url') or item.get('target')
            else:
                url = getattr(item, 'file_path', None) or getattr(item, 'path', None) or getattr(item, 'url', None) or getattr(item, 'target', None)

            if url:
                urls_to_process.append(str(url))

    # Use positional arguments if no pipeline input
    if not urls_to_process and positional_urls:
        urls_to_process = positional_urls

    if not urls_to_process:
        log("No URLs to process for screen-shot cmdlet", file=sys.stderr)
        return 1

    # ========================================================================
    # OUTPUT DIRECTORY RESOLUTION - Priority chain
    # ========================================================================

    screenshot_dir: Optional[Path] = None

    # Primary: Use --storage if provided (highest priority)
    if storage_value:
        try:
            screenshot_dir = SharedArgs.resolve_storage(storage_value)
            log(f"[screen_shot] Using --storage {storage_value}: {screenshot_dir}", flush=True)
        except ValueError as e:
            log(str(e), file=sys.stderr)
            return 1

    # Secondary: Use config-based resolver ONLY if --storage not provided
    if screenshot_dir is None and resolve_output_dir is not None:
        try:
            screenshot_dir = resolve_output_dir(config)
            log(f"[screen_shot] Using config resolver: {screenshot_dir}", flush=True)
        except Exception:
            pass

    # Tertiary: Use config outfile ONLY if neither --storage nor resolver worked
    if screenshot_dir is None and config and config.get("outfile"):
        try:
            screenshot_dir = Path(config["outfile"]).expanduser()
            log(f"[screen_shot] Using config outfile: {screenshot_dir}", flush=True)
        except Exception:
            pass

    # Default: User's Videos directory
    if screenshot_dir is None:
        screenshot_dir = Path.home() / "Videos"
        log(f"[screen_shot] Using default directory: {screenshot_dir}", flush=True)

    _ensure_directory(screenshot_dir)

    # ========================================================================
    # PREPARE SCREENSHOT OPTIONS
    # ========================================================================

    format_name = _normalise_format(format_value)
    filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()]
    target_selectors = filtered_selectors if filtered_selectors else None

    all_emitted = []
    exit_code = 0
    # ========================================================================
    # PROCESS URLs AND CAPTURE SCREENSHOTS
    # ========================================================================

    for url in urls_to_process:
        # Validate URL format
        if not url.lower().startswith(("http://", "https://", "file://")):
            log(f"[screen_shot] Skipping non-URL input: {url}", file=sys.stderr)
            continue

        try:
            # Create screenshot with provided options
            options = ScreenshotOptions(
                url=url,
                output_dir=screenshot_dir,
                output_format=format_name,
                archive=archive_enabled,
                target_selectors=target_selectors,
                prefer_platform_target=False,
                wait_for_article=False,
                full_page=True,
            )

            screenshot_result = _capture_screenshot(options)

            # Log results and warnings
            log(f"Screenshot captured to {screenshot_result.path}", flush=True)
            if screenshot_result.archive_urls:
                log(f"Archives: {', '.join(screenshot_result.archive_urls)}", flush=True)
            for warning in screenshot_result.warnings:
                log(f"Warning: {warning}", flush=True)

            # Compute hash of screenshot file
            screenshot_hash = None
            try:
                with open(screenshot_result.path, 'rb') as f:
                    screenshot_hash = hashlib.sha256(f.read()).hexdigest()
            except Exception:
                pass

            # Create PipeObject result - marked as TEMP since derivative artifact
            pipe_obj = create_pipe_object_result(
                source='screenshot',
                identifier=Path(screenshot_result.path).stem,
                file_path=str(screenshot_result.path),
                cmdlet_name='screen-shot',
                title=f"Screenshot: {Path(screenshot_result.path).name}",
                file_hash=screenshot_hash,
                is_temp=True,
                parent_hash=hashlib.sha256(url.encode()).hexdigest(),
                extra={
                    'source_url': url,
                    'archive_urls': screenshot_result.archive_urls,
                    'known_urls': screenshot_result.known_urls,
                    'target': str(screenshot_result.path),  # Explicit target for add-file
                }
            )

            # Emit the result so downstream cmdlets (like add-file) can use it
            pipeline_context.emit(pipe_obj)
            all_emitted.append(pipe_obj)

        except ScreenshotError as exc:
            log(f"Error taking screenshot of {url}: {exc}", file=sys.stderr)
            exit_code = 1
        except Exception as exc:
            log(f"Unexpected error taking screenshot of {url}: {exc}", file=sys.stderr)
            import traceback
            traceback.print_exc(file=sys.stderr)
            exit_code = 1

    if not all_emitted:
        log("No screenshots were successfully captured", file=sys.stderr)
        return 1

    # Log completion message
    log(f"✓ Successfully captured {len(all_emitted)} screenshot(s)", flush=True)

    return exit_code


CMDLET = Cmdlet(
    name="screen-shot",
    summary="Capture a screenshot of a URL or file and mark as temporary artifact",
    usage="screen-shot <url> [options] or download-data <url> | screen-shot [options]",
    aliases=["screenshot", "ss"],
    args=[
        CmdletArg(name="url", type="string", required=False, description="URL to screenshot (or from pipeline)"),
        CmdletArg(name="format", type="string", description="Output format: png, jpeg, or pdf"),
        CmdletArg(name="selector", type="string", description="CSS selector for element capture"),
        SharedArgs.ARCHIVE,  # Use shared archive argument
        SharedArgs.STORAGE,  # Use shared storage argument
    ],
    details=[
        "Take screenshots of URLs with optional archiving and element targeting.",
        "Screenshots are marked as temporary artifacts for cleanup by the cleanup cmdlet.",
        "",
        "Arguments:",
        "  url                 URL to capture (optional if piped from pipeline)",
        "  --format FORMAT     Output format: png (default), jpeg, or pdf",
        "  --selector SEL      CSS selector for capturing specific element",
        "  --archive, -arch    Archive URL to Wayback/Archive.today/Archive.ph",
        "  --storage LOCATION  Storage destination: hydrus, local, 0x0, debrid, or ftp",
        "",
        "Examples:",
        "  download-data https://example.com | screen-shot --storage local",
        "  download-data https://twitter.com/user/status/123 | screen-shot --selector 'article[role=article]' --storage hydrus --archive",
        "  screen-shot https://example.com --format jpeg --storage 0x0 --archive",
    ]
)
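

if __name__ == "__main__":  # pragma: no cover
    # Ad-hoc smoke test (illustrative sketch, not part of the pipeline):
    # captures example.com into the current directory. Assumes Chromium has
    # been installed via `playwright install chromium`.
    _shot = _capture_screenshot(
        ScreenshotOptions(url="https://example.com", output_dir=Path.cwd())
    )
    print(_shot.path)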