Files
Medios-Macina/cmdlet/screen_shot.py

1197 lines
44 KiB
Python

"""Screen-shot cmdlet for capturing screenshots of url in a pipeline.
This cmdlet processes files through the pipeline and creates screenshots using
Playwright, marking them as temporary artifacts for cleanup.
"""
from __future__ import annotations
import hashlib
import sys
import time
from datetime import datetime
import httpx
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
from urllib.parse import urlsplit, quote, urljoin, unquote
from SYS.logger import log, debug
from API.HTTP import HTTPClient
from SYS.pipeline_progress import PipelineProgress
from SYS.utils import ensure_directory, unique_path, unique_preserve_order
from . import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
create_pipe_object_result = sh.create_pipe_object_result
normalize_result_input = sh.normalize_result_input
should_show_help = sh.should_show_help
get_field = sh.get_field
parse_cmdlet_args = sh.parse_cmdlet_args
from SYS import pipeline as pipeline_context
# ============================================================================
# CMDLET Metadata Declaration
# ============================================================================
# ============================================================================
# Playwright & Screenshot Dependencies
# ============================================================================
from tool.playwright import PlaywrightTimeoutError, PlaywrightTool
try:
from SYS.config import resolve_output_dir
except ImportError:
try:
_parent_dir = str(Path(__file__).parent.parent)
if _parent_dir not in sys.path:
sys.path.insert(0, _parent_dir)
from SYS.config import resolve_output_dir
except ImportError:
resolve_output_dir = None
# ============================================================================
# Screenshot Constants & Configuration
# ============================================================================
# User-Agent header sent with the archive-service submissions (see the
# _submit_* helpers); browser pages use Playwright's own UA instead.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0.0.0 Safari/537.36"
)
# Viewport dimensions (pixels) written into the Playwright tool config by _run().
DEFAULT_VIEWPORT: dict[str,
                       int] = {
    "width": 1920,
    "height": 1080
}
# Default value for ScreenshotOptions.archive_timeout.
ARCHIVE_TIMEOUT = 30.0
# WebP has a hard maximum dimension per side.
# Pillow typically fails with: "encoding error 5: Image size exceeds WebP limit of 16383 pixels"
WEBP_MAX_DIM = 16_383
# Configurable selectors for specific websites.
# Keys are domains; values are CSS selectors tried in order when looking for
# the main post/content element to capture instead of the full page.
SITE_SELECTORS: Dict[str,
                     List[str]] = {
    "twitter.com": [
        "article[role='article']",
        "div[data-testid='tweet']",
        "div[data-testid='cellInnerDiv'] article",
    ],
    "x.com": [
        "article[role='article']",
        "div[data-testid='tweet']",
        "div[data-testid='cellInnerDiv'] article",
    ],
    "instagram.com": [
        "article[role='presentation']",
        "article[role='article']",
        "div[role='dialog'] article",
        "section main article",
    ],
    "reddit.com": [
        "shreddit-post",
        "div[data-testid='post-container']",
        "div[data-click-id='background']",
        "article",
    ],
    "rumble.com": [
        "rumble-player, iframe.rumble",
        "div.video-item--main",
        "main article",
    ],
}
class ScreenshotError(RuntimeError):
    """Raised when a screenshot cannot be produced.

    Within this module it signals a failed Playwright capture, missing
    Chromium browser binaries, a missing source image for WebP conversion,
    or Pillow being unavailable.
    """
@dataclass(slots=True)
class ScreenshotOptions:
    """Options controlling screenshot capture and post-processing.

    Note that the capture helpers mutate ``output_format`` in place:
    ``_prepare_output_path()`` stores the resolved format, and
    ``_capture_screenshot()`` switches it to ``"png"`` while producing the
    intermediate image for a WebP request.
    """
    # Directory that receives the screenshot; created if missing. Relative
    # ``output_path`` values are resolved against it.
    output_dir: Path
    # Page to navigate to and capture.
    url: str = ""
    # Explicit output file. When None, a name is generated from the URL slug
    # plus a timestamp.
    output_path: Optional[Path] = None
    # True: capture the full scrollable page. False: capture the first
    # <article> element if present, otherwise the viewport.
    full_page: bool = True
    # Passed to the Playwright tool; forced on for PDF output.
    headless: bool = True
    # Seconds to sleep after navigation (the sleep itself is capped at 10s).
    wait_after_load: float = 6.0
    # Wait up to 10s for an <article> element before capturing.
    wait_for_article: bool = False
    # Replace <video> elements that have a poster with an <img> of that poster.
    replace_video_posters: bool = True
    # Tags to report back; blank entries are dropped and duplicates removed.
    tag: Sequence[str] = ()
    # Submit the URL to the archive services after capturing.
    archive: bool = False
    # Timeout handed to the archive submitters.
    # NOTE(review): the submitters visible in this file accept but do not use it.
    archive_timeout: float = ARCHIVE_TIMEOUT
    # Requested format (webp/png/jpeg/pdf). None lets the output path suffix
    # decide, falling back to png.
    output_format: Optional[str] = None
    # Try to capture a single content element (see target_selectors) before
    # falling back to a page capture. Ignored for PDF output.
    prefer_platform_target: bool = False
    # CSS selectors tried in order for element capture; when empty, the
    # site-specific selectors for the URL are used.
    target_selectors: Optional[Sequence[str]] = None
    # Per-selector wait timeout in milliseconds.
    selector_timeout_ms: int = 10_000
    # Pre-configured Playwright tool; a default one is created when None.
    playwright_tool: Optional[PlaywrightTool] = None
@dataclass(slots=True)
class ScreenshotResult:
    """Details about the captured screenshot."""
    # Final file on disk (the intermediate PNG if WebP conversion failed).
    path: Path
    # Non-blank, de-duplicated tags taken from ScreenshotOptions.tag.
    tag_applied: List[str]
    # Snapshot URLs returned by the archive services (empty when archiving is off).
    archive_url: List[str]
    # The captured URL followed by any archive URLs, de-duplicated in order.
    url: List[str]
    # Non-fatal problems collected during capture, conversion and archiving.
    warnings: List[str] = field(default_factory=list)
# ============================================================================
# Helper Functions
# ============================================================================
def _slugify_url(url: str) -> str:
"""Convert URL to filesystem-safe slug."""
parsed = urlsplit(url)
candidate = f"{parsed.netloc}{parsed.path}"
if parsed.query:
candidate += f"?{parsed.query}"
slug = "".join(char if char.isalnum() else "-" for char in candidate.lower())
slug = slug.strip("-") or "screenshot"
return slug[:100]
def _tags_from_url(url: str) -> List[str]:
"""Derive simple tags from a URL.
- site:<domain> (strips leading www.)
- title:<slug> derived from the last path segment, with extension removed
and separators (-, _, %) normalized to spaces.
"""
u = str(url or "").strip()
if not u:
return []
parsed = None
try:
parsed = urlsplit(u)
host = (
str(
getattr(parsed,
"hostname",
None) or getattr(parsed,
"netloc",
"") or ""
).strip().lower()
)
except Exception:
parsed = None
host = ""
if host:
# Drop credentials and port if present.
if "@" in host:
host = host.rsplit("@", 1)[-1]
if ":" in host:
host = host.split(":", 1)[0]
if host.startswith("www."):
host = host[len("www."):]
path = ""
if parsed is not None:
try:
path = str(getattr(parsed, "path", "") or "")
except Exception:
path = ""
last = ""
if path:
try:
last = path.rsplit("/", 1)[-1]
except Exception:
last = ""
try:
last = unquote(last or "")
except Exception:
last = last or ""
if last and "." in last:
# Drop a single trailing extension (e.g. .html, .php).
last = last.rsplit(".", 1)[0]
for sep in ("_", "-", "%"):
if last and sep in last:
last = last.replace(sep, " ")
title = " ".join(str(last or "").split()).strip().lower()
tags: List[str] = []
if host:
tags.append(f"site:{host}")
if title:
tags.append(f"title:{title}")
return tags
def _title_from_url(url: str) -> str:
    """Return the title part of the first ``title:`` tag derived from *url*.

    Delegates to ``_tags_from_url()``; yields ``""`` when no title tag exists.
    """
    prefix = "title:"
    title_tag = next(
        (str(tag) for tag in _tags_from_url(url)
         if str(tag).lower().startswith(prefix)),
        None,
    )
    if title_tag is None:
        return ""
    return title_tag[len(prefix):].strip()
def _normalise_format(fmt: Optional[str]) -> str:
"""Normalize output format to valid values."""
if not fmt:
return "webp"
value = fmt.strip().lower()
if value in {"jpg",
"jpeg"}:
return "jpeg"
if value in {"png",
"pdf",
"webp"}:
return value
return "webp"
def _format_suffix(fmt: str) -> str:
"""Get file suffix for format."""
if fmt == "jpeg":
return ".jpg"
return f".{fmt}"
def _convert_to_webp(
    src_png: Path,
    dst_webp: Path,
    *,
    quality: int = 90,
    method: int = 6,
    max_dim: int = WEBP_MAX_DIM,
    downscale_if_oversize: bool = True,
) -> bool:
    """Re-encode a PNG screenshot as WebP using Pillow.

    Playwright cannot emit WebP itself, so captures are taken as PNG and
    converted here. The output is written to a temporary sibling file and
    then moved over *dst_webp*, so an interrupted conversion never leaves a
    partial destination file.

    Args:
        src_png: Existing source image.
        dst_webp: Destination path; its parent directory is created
            best-effort.
        quality: WebP quality passed to Pillow.
        method: WebP encoder method passed to Pillow.
        max_dim: Maximum allowed size per side, in pixels.
        downscale_if_oversize: Shrink proportionally when a side exceeds
            *max_dim* instead of letting the encoder fail.

    Returns:
        True when the image was downscaled to fit *max_dim*, else False.

    Raises:
        ScreenshotError: The source file is missing or Pillow is unavailable.
        Exception: Any error Pillow raises while opening or saving.
    """
    if not src_png or not Path(src_png).is_file():
        raise ScreenshotError(f"Source image not found: {src_png}")

    dst_webp = Path(dst_webp)
    try:
        dst_webp.parent.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Best-effort: a real problem resurfaces when the file is saved.
        pass

    try:
        from PIL import Image
    except Exception as exc:
        raise ScreenshotError(f"Pillow is required for webp conversion: {exc}") from exc

    # Staging file for the atomic write.
    staging = unique_path(dst_webp.with_suffix(".tmp.webp"))
    try:
        with Image.open(src_png) as image:
            resized = False
            encoder_args: Dict[str, Any] = {
                "format": "WEBP",
                "quality": int(quality),
                "method": int(method),
            }

            # Palette images are expanded to RGBA to avoid palette artifacts;
            # alpha is otherwise preserved by Pillow's WEBP writer.
            if image.mode == "P":
                image = image.convert("RGBA")

            try:
                width, height = image.size
            except Exception:
                width, height = 0, 0

            limit_active = (
                downscale_if_oversize and isinstance(max_dim, int) and max_dim > 0
            )
            if limit_active and (width > max_dim or height > max_dim):
                # Very tall full-page captures exceed the WebP per-side limit;
                # shrink both sides by the same factor so they fit.
                try:
                    ratio = min(
                        float(max_dim) / float(width),
                        float(max_dim) / float(height),
                    )
                except Exception:
                    ratio = 1.0

                if 0.0 < ratio < 1.0:
                    new_w = max(1, int(width * ratio))
                    new_h = max(1, int(height * ratio))
                    debug(
                        f"[_convert_to_webp] Image exceeds WebP limit ({width}x{height}); downscaling -> {new_w}x{new_h}"
                    )
                    try:
                        # Newer Pillow exposes filters on Image.Resampling,
                        # older releases directly on the Image module.
                        filter_home = getattr(Image, "Resampling", Image)
                        resample = getattr(filter_home, "LANCZOS", None)
                        if resample is None:
                            resample = getattr(Image, "LANCZOS", 1)
                        image = image.resize((new_w, new_h), resample=resample)
                        resized = True
                    except Exception as exc:
                        debug(
                            f"[_convert_to_webp] Downscale failed; attempting direct WEBP save anyway: {exc}"
                        )

            image.save(staging, **encoder_args)

        staging.replace(dst_webp)
        return bool(resized)
    finally:
        # After a successful replace the staging file is already gone; this
        # only removes leftovers from a failed conversion.
        try:
            staging.unlink(missing_ok=True)
        except Exception:
            pass
def _matched_site_selectors(url: str) -> List[str]:
    """Return the SITE_SELECTORS entries whose domain matches the URL's host.

    A domain matches when the URL's hostname equals it or is a subdomain of
    it (``x.com``, ``www.x.com``, ``mobile.x.com``). Matching is done on the
    parsed hostname rather than on the raw URL text, so look-alike hosts
    (``dropbox.com`` contains ``x.com``) and domains that merely appear in a
    path or query string do not trigger site-specific selectors.

    Unlike `_selectors_for_url()`, this does not return a generic fallback:
    the result is empty when nothing matches or no hostname can be parsed.
    """
    text = str(url or "").strip().lower()
    if not text:
        return []

    host = ""
    # The second candidate covers scheme-less input such as "x.com/user",
    # which urlsplit() would otherwise treat as a bare path.
    for candidate in (text, f"//{text}"):
        try:
            host = urlsplit(candidate).hostname or ""
        except ValueError:
            host = ""
        if host:
            break
    if not host:
        return []

    matched: List[str] = []
    for domain, selectors in SITE_SELECTORS.items():
        if host == domain or host.endswith(f".{domain}"):
            matched.extend(selectors)
    return matched
def _selectors_for_url(url: str) -> List[str]:
    """Return the CSS selectors worth trying for *url*.

    Deliberately minimal: only the known per-site selectors are returned and
    no generic fallback is added, because the capture code already falls back
    to a page capture when no selector matches.
    """
    known_selectors = _matched_site_selectors(url)
    return known_selectors
def _platform_preprocess(
url: str,
page: Any,
warnings: List[str],
timeout_ms: int = 10_000
) -> None:
"""Best-effort page tweaks for popular platforms before capture."""
try:
u = str(url or "").lower()
def _try_click_buttons(
names: List[str],
passes: int = 2,
per_timeout: int = 700
) -> int:
clicks = 0
for _ in range(max(1, int(passes))):
for name in names:
try:
locator = page.get_by_role("button", name=name)
locator.first.click(timeout=int(per_timeout))
clicks += 1
except Exception:
pass
return clicks
# Dismiss common cookie / consent prompts.
_try_click_buttons(
[
"Accept all",
"Accept",
"I agree",
"Agree",
"Allow all",
"OK",
]
)
# Some sites need small nudges (best-effort).
if "reddit.com" in u:
_try_click_buttons(["Accept all", "Accept"])
if ("twitter.com" in u) or ("x.com" in u):
_try_click_buttons(["Accept all", "Accept"])
if "instagram.com" in u:
_try_click_buttons(["Allow all", "Accept all", "Accept"])
except Exception as exc:
debug(f"[_platform_preprocess] skipped: {exc}")
return
def _submit_wayback(url: str, timeout: float) -> Optional[str]:
    """Ask the Wayback Machine to save *url* and return the snapshot address.

    Prefers the ``Content-Location`` response header (resolved against
    ``https://web.archive.org``); otherwise the response's own URL is
    returned.

    NOTE(review): *timeout* is accepted but not forwarded to the HTTP client,
    and the response status is not checked here — confirm whether HTTPClient
    handles both.
    """
    save_endpoint = "https://web.archive.org/save/" + quote(url, safe="/:?=&")
    request_headers = {"User-Agent": USER_AGENT}
    with HTTPClient(headers=request_headers) as http:
        reply = http.get(save_endpoint)
        snapshot_path = reply.headers.get("Content-Location")
        if not snapshot_path:
            return str(reply.url)
        return urljoin("https://web.archive.org", snapshot_path)
def _submit_archive_today(url: str, timeout: float) -> Optional[str]:
    """Submit URL to Archive.today and return the resulting snapshot URL.

    Returns the final response URL when it points at archive.today or
    archive.ph, otherwise None. HTTP error statuses propagate from
    ``raise_for_status()``.

    NOTE(review): *timeout* is accepted but not forwarded to the HTTP client —
    confirm whether HTTPClient applies its own default.
    """
    # The target travels as a query-parameter *value*, so every reserved
    # character must be percent-encoded. Leaving "&", "#", "=" or "?" raw
    # would let the target's own query string or fragment split or truncate
    # the ``url=`` parameter.
    encoded = quote(url, safe="")
    with HTTPClient(headers={
            "User-Agent": USER_AGENT
    }) as client:
        response = client.get(f"https://archive.today/submit/?url={encoded}")
        response.raise_for_status()
        final = str(response.url)
        if final and ("archive.today" in final or "archive.ph" in final):
            return final
    return None
def _submit_archive_ph(url: str, timeout: float) -> Optional[str]:
    """Submit URL to Archive.ph and return the resulting snapshot URL.

    Returns the final response URL when it points at archive.ph, otherwise
    None. HTTP error statuses propagate from ``raise_for_status()``.

    NOTE(review): *timeout* is accepted but not forwarded to the HTTP client —
    confirm whether HTTPClient applies its own default.
    """
    # The target travels as a query-parameter *value*, so every reserved
    # character must be percent-encoded. Leaving "&", "#", "=" or "?" raw
    # would let the target's own query string or fragment split or truncate
    # the ``url=`` parameter.
    encoded = quote(url, safe="")
    with HTTPClient(headers={
            "User-Agent": USER_AGENT
    }) as client:
        response = client.get(f"https://archive.ph/submit/?url={encoded}")
        response.raise_for_status()
        final = str(response.url)
        if final and "archive.ph" in final:
            return final
    return None
def _archive_url(url: str, timeout: float) -> Tuple[List[str], List[str]]:
    """Submit *url* to every archive service, collecting links and warnings.

    Services are tried in a fixed order (wayback, archive.today, archive.ph).
    A failing service never aborts the others: its error is recorded as a
    warning and the next service is attempted.

    Returns:
        ``(archives, warnings)`` — snapshot URLs that were returned, and one
        human-readable message per failed service.
    """
    services = (
        (_submit_wayback, "wayback"),
        (_submit_archive_today, "archive.today"),
        (_submit_archive_ph, "archive.ph"),
    )
    collected: List[str] = []
    problems: List[str] = []

    for submit, name in services:
        try:
            debug(f"Archiving to {name}...")
            link = submit(url, timeout)
        except httpx.HTTPStatusError as exc:
            status = exc.response.status_code
            if status == 429:
                problems.append(f"archive {name} rate limited (HTTP 429)")
                debug(f"{name}: Rate limited (HTTP 429)")
            else:
                problems.append(f"archive {name} failed: HTTP {status}")
                debug(f"{name}: HTTP {status}")
            continue
        except httpx.RequestError as exc:
            problems.append(f"archive {name} failed: {exc}")
            debug(f"{name}: Connection error: {exc}")
            continue
        except Exception as exc:
            problems.append(f"archive {name} failed: {exc}")
            debug(f"{name}: {exc}")
            continue

        # Reached only when the submitter returned without raising.
        if not link:
            debug(f"{name}: No archive link returned")
            continue
        collected.append(link)
        debug(f"{name}: Success - {link}")

    return collected, problems
def _prepare_output_path(options: ScreenshotOptions) -> Path:
    """Work out where the screenshot goes and which format it will use.

    The format is chosen in this order: an explicit ``options.output_format``,
    the suffix of ``options.output_path``, then ``png``. The path's suffix is
    forced to agree with that format, and the chosen format is written back
    to ``options.output_format``.

    Side effects: ensures ``options.output_dir`` exists and mutates
    ``options.output_format``.

    Returns:
        The target path after being passed through ``unique_path()``.
    """
    ensure_directory(options.output_dir)

    requested = (
        _normalise_format(options.output_format) if options.output_format else None
    )
    from_suffix: Optional[str] = None

    if options.output_path is None:
        # No explicit file name: derive one from the URL and the current time.
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        target = options.output_dir / f"{_slugify_url(options.url)}_{timestamp}"
    else:
        target = options.output_path
        if not target.is_absolute():
            target = options.output_dir / target
        extension = target.suffix.lower()
        if extension:
            from_suffix = _normalise_format(extension[1:])

    chosen = requested or from_suffix or "png"

    # A missing suffix and a mismatching suffix are both handled by replacing
    # it; a suffix that matches case-insensitively is kept as written.
    wanted_suffix = _format_suffix(chosen)
    if target.suffix.lower() != wanted_suffix:
        target = target.with_suffix(wanted_suffix)

    options.output_format = chosen
    return unique_path(target)
def _capture(
    options: ScreenshotOptions,
    destination: Path,
    warnings: List[str],
    progress: PipelineProgress
) -> None:
    """Capture a screenshot (or PDF) of ``options.url`` into *destination*.

    Flow:
      1. Obtain a Playwright tool and force it onto the Chromium engine.
      2. Open a page and navigate; a navigation timeout is downgraded to a
         warning and the current page state is captured.
      3. Optionally wait for an <article> element and sleep for stabilization.
      4. Optionally replace <video> elements with their poster images.
      5. Capture, in order of preference: a targeted element (when
         ``prefer_platform_target`` is set and the format is not PDF), a PDF,
         or a page/article screenshot.

    Args:
        options: Capture settings; ``output_format`` decides the capture type.
        destination: File the capture is written to.
        warnings: Receives non-fatal problem descriptions (mutated in place).
        progress: Step reporter for the pipeline progress UI.

    Raises:
        ScreenshotError: For every failure. Errors whose message looks like a
            missing browser executable get an install hint; everything else
            is wrapped with "Failed to capture screenshot".
    """
    debug(f"[_capture] Starting capture for {options.url} -> {destination}")
    try:
        progress.step("loading launching browser")
        tool = options.playwright_tool or PlaywrightTool({})
        # Ensure Chromium engine is used for the screen-shot cmdlet (force for consistency)
        try:
            current_browser = (
                getattr(tool.defaults,
                        "browser",
                        "").lower() if getattr(tool,
                                               "defaults",
                                               None) is not None else ""
            )
            if current_browser != "chromium":
                debug(
                    f"[_capture] Overriding Playwright browser '{current_browser}' -> 'chromium' for screen-shot cmdlet"
                )
                # Rebuild the tool from a copy of its config with only the
                # browser setting replaced.
                base_cfg = {}
                try:
                    base_cfg = dict(getattr(tool,
                                            "_config",
                                            {}) or {})
                except Exception:
                    base_cfg = {}
                tool_block = dict(base_cfg.get("tool") or {}
                                  ) if isinstance(base_cfg,
                                                  dict) else {}
                pw_block = (
                    dict(tool_block.get("playwright") or {})
                    if isinstance(tool_block,
                                  dict) else {}
                )
                pw_block["browser"] = "chromium"
                tool_block["playwright"] = pw_block
                if isinstance(base_cfg, dict):
                    base_cfg["tool"] = tool_block
                tool = PlaywrightTool(base_cfg)
        except Exception:
            # Anything unexpected while inspecting the tool: fall back to a
            # minimal Chromium-only configuration.
            tool = PlaywrightTool({
                "tool": {
                    "playwright": {
                        "browser": "chromium"
                    }
                }
            })
        tool.debug_dump()
        debug("Launching browser...")
        format_name = _normalise_format(options.output_format)
        # PDF generation always runs headless, whatever the caller asked for.
        headless = options.headless or format_name == "pdf"
        debug(f"[_capture] Format: {format_name}, Headless: {headless}")
        if format_name == "pdf" and not options.headless:
            warnings.append(
                "pdf output requires headless Chromium; overriding headless mode"
            )
        try:
            with tool.open_page(headless=headless) as page:
                progress.step("loading navigating")
                debug(f"Navigating to {options.url}...")
                try:
                    tool.goto(page, options.url)
                    debug("Page loaded successfully")
                    progress.step("loading page loaded")
                except PlaywrightTimeoutError:
                    # A slow page is still worth capturing in whatever state it reached.
                    warnings.append("navigation timeout; capturing current page state")
                    debug("Navigation timeout; proceeding with current state")
                    progress.step("loading navigation timeout")
                # Skip article lookup by default (wait_for_article defaults to False)
                if options.wait_for_article:
                    try:
                        debug("Waiting for article element...")
                        page.wait_for_selector("article", timeout=10_000)
                        debug("Article element found")
                    except PlaywrightTimeoutError:
                        warnings.append(
                            "<article> selector not found; capturing fallback"
                        )
                        debug("Article element not found; using fallback")
                if options.wait_after_load > 0:
                    debug(
                        f"Waiting {options.wait_after_load}s for page stabilization..."
                    )
                    # The requested wait is clamped to the range 0..10 seconds.
                    time.sleep(min(10.0, max(0.0, options.wait_after_load)))
                progress.step("loading stabilized")
                progress.step("capturing preparing")
                if options.replace_video_posters:
                    debug("Replacing video elements with posters...")
                    page.evaluate(
                        """
document.querySelectorAll('video').forEach(v => {
if (v.poster) {
const img = document.createElement('img');
img.src = v.poster;
img.style.maxWidth = '100%';
img.style.borderRadius = '12px';
v.replaceWith(img);
}
});
"""
                    )
                # Attempt platform-specific target capture if requested (and not PDF)
                element_captured = False
                if options.prefer_platform_target and format_name != "pdf":
                    debug(f"[_capture] Target capture enabled")
                    debug("Attempting platform-specific content capture...")
                    progress.step("capturing locating target")
                    try:
                        _platform_preprocess(options.url, page, warnings)
                    except Exception as e:
                        debug(f"[_capture] Platform preprocess failed: {e}")
                        pass
                    # Explicit selectors win; otherwise use the per-site defaults.
                    selectors = list(options.target_selectors or [])
                    if not selectors:
                        selectors = _selectors_for_url(options.url)
                    debug(f"[_capture] Trying selectors: {selectors}")
                    for sel in selectors:
                        try:
                            debug(f"Trying selector: {sel}")
                            el = page.wait_for_selector(
                                sel,
                                timeout=max(0,
                                            int(options.selector_timeout_ms))
                            )
                        except PlaywrightTimeoutError:
                            debug(f"Selector not found: {sel}")
                            continue
                        try:
                            if el is not None:
                                debug(f"Found element with selector: {sel}")
                                try:
                                    el.scroll_into_view_if_needed(timeout=1000)
                                except Exception:
                                    pass
                                progress.step("capturing output")
                                debug(f"Capturing element to {destination}...")
                                el.screenshot(
                                    path=str(destination),
                                    type=("jpeg" if format_name == "jpeg" else None),
                                )
                                element_captured = True
                                debug("Element captured successfully")
                                # First successful element capture wins.
                                break
                        except Exception as exc:
                            # Record the failure and try the next selector.
                            warnings.append(
                                f"element capture failed for '{sel}': {exc}"
                            )
                            debug(f"Failed to capture element: {exc}")
                # Fallback to default capture paths
                if element_captured:
                    progress.step("capturing saved")
                elif format_name == "pdf":
                    debug("Generating PDF...")
                    page.emulate_media(media="print")
                    progress.step("capturing output")
                    page.pdf(path=str(destination), print_background=True)
                    debug(f"PDF saved to {destination}")
                    progress.step("capturing saved")
                else:
                    debug(f"Capturing full page to {destination}...")
                    screenshot_kwargs: Dict[str,
                                            Any] = {
                        "path": str(destination)
                    }
                    if format_name == "jpeg":
                        screenshot_kwargs["type"] = "jpeg"
                        screenshot_kwargs["quality"] = 90
                    if options.full_page:
                        progress.step("capturing output")
                        page.screenshot(full_page=True, **screenshot_kwargs)
                    else:
                        # Not full-page: prefer the first <article>, else the viewport.
                        article = page.query_selector("article")
                        if article is not None:
                            article_kwargs = dict(screenshot_kwargs)
                            article_kwargs.pop("full_page", None)
                            progress.step("capturing output")
                            article.screenshot(**article_kwargs)
                        else:
                            progress.step("capturing output")
                            page.screenshot(**screenshot_kwargs)
                    debug(f"Screenshot saved to {destination}")
                    progress.step("capturing saved")
        except Exception as exc:
            debug(f"[_capture] Exception launching browser/page: {exc}")
            # Heuristic: these substrings are taken to mean the browser
            # binaries are missing, which gets a dedicated install hint.
            msg = str(exc).lower()
            if any(k in msg for k in ["executable", "not found", "no such file",
                                      "cannot find", "install"]):
                raise ScreenshotError(
                    "Chromium Playwright browser binaries not found. Install them: python ./scripts/bootstrap.py --playwright-only --browsers chromium"
                ) from exc
            raise
    except ScreenshotError:
        # Re-raise ScreenshotError raised intentionally (do not wrap)
        raise
    except Exception as exc:
        debug(f"[_capture] Exception: {exc}")
        raise ScreenshotError(f"Failed to capture screenshot: {exc}") from exc
def _capture_screenshot(
    options: ScreenshotOptions,
    progress: PipelineProgress
) -> ScreenshotResult:
    """Capture a screenshot for the given options.

    Resolves the output path and format, captures via ``_capture()``,
    converts to WebP when that format was requested, and optionally submits
    the URL to the archive services.

    WebP handling: Playwright cannot write WebP, so an intermediate PNG is
    captured and converted with Pillow. The PNG is deleted after a lossless
    conversion, kept next to the WebP when the image had to be downscaled,
    and returned as the result when the conversion fails.

    Side effects: ``options.output_format`` is overwritten with the resolved
    format, and with ``"png"`` when a WebP request is captured.

    Args:
        options: Capture settings.
        progress: Step reporter for the pipeline progress UI.

    Returns:
        ScreenshotResult with the final path, applied tags, archive URLs,
        the combined URL list and any warnings.

    Raises:
        ScreenshotError: When the capture itself fails.
    """
    debug(f"[_capture_screenshot] Preparing capture for {options.url}")
    # Resolve the path FIRST: _prepare_output_path() settles the effective
    # format (explicit format > output_path suffix > png) and writes it back
    # to options.output_format. Reading the format before that would treat an
    # unset format as "webp" and write WebP bytes into a .png/.jpg target.
    destination = _prepare_output_path(options)
    requested_format = _normalise_format(options.output_format)
    warnings: List[str] = []
    will_target = bool(options.prefer_platform_target) and requested_format != "pdf"
    will_convert = requested_format == "webp"
    will_archive = bool(options.archive and options.url)
    # 9 steps are always reported; the optional phases add one each.
    total_steps = (
        9 + (1 if will_target else 0) + (1 if will_convert else 0) +
        (1 if will_archive else 0)
    )
    progress.begin_steps(total_steps)
    progress.step("loading starting")
    # Playwright screenshots do not natively support WebP output.
    # Capture as PNG, then convert via Pillow.
    capture_path = destination
    if will_convert:
        capture_path = unique_path(destination.with_suffix(".png"))
        debug(
            f"[_capture_screenshot] Requested webp; capturing intermediate png -> {capture_path}"
        )
        options.output_format = "png"
    _capture(options, capture_path, warnings, progress)
    if will_convert:
        progress.step("capturing converting to webp")
        debug(f"[_capture_screenshot] Converting png -> webp: {destination}")
        try:
            did_downscale = _convert_to_webp(capture_path, destination)
            if did_downscale:
                # The WebP is a reduced copy, so the full-size PNG is kept.
                warnings.append(
                    f"webp conversion used downscaling to fit {WEBP_MAX_DIM}px limit; keeping original png: {capture_path.name}"
                )
            else:
                try:
                    capture_path.unlink(missing_ok=True)
                except Exception:
                    pass
        except Exception as exc:
            # Conversion is optional: hand back the PNG instead of failing.
            warnings.append(f"webp conversion failed; keeping png: {exc}")
            destination = capture_path
    # Build URL list from captured url and any archives
    url: List[str] = [options.url] if options.url else []
    archive_url: List[str] = []
    if will_archive:
        progress.step("capturing archiving")
        debug(f"[_capture_screenshot] Archiving enabled for {options.url}")
        archives, archive_warnings = _archive_url(options.url, options.archive_timeout)
        archive_url.extend(archives)
        warnings.extend(archive_warnings)
        if archives:
            url = unique_preserve_order([*url, *archives])
    progress.step("capturing finalized")
    applied_tag = unique_preserve_order([tag for tag in options.tag if tag.strip()])
    return ScreenshotResult(
        path=destination,
        tag_applied=applied_tag,
        archive_url=archive_url,
        url=url,
        warnings=warnings,
    )
# ============================================================================
# Main Cmdlet Function
# ============================================================================
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Take screenshots of URLs given as an argument or piped in.

    Input:
      - An explicit URL argument takes precedence over piped input.
      - Otherwise each piped result contributes the first non-empty of its
        'path', 'url' or 'target' fields.
      - Only values starting with http://, https:// or file:// are captured;
        anything else is skipped with a message on stderr.

    Output directory, in priority order: the storage argument, the config
    resolver (``resolve_output_dir``), ``config["outfile"]``, then
    ``~/Videos``.

    For each captured URL a PipeObject is emitted with:
      - cmdlet_name 'screen-shot', source 'screenshot', store 'PATH'
      - is_temp True (screenshots are temporary artifacts)
      - parent_hash: SHA-256 of the URL
      - hash_value: SHA-256 of the screenshot file (None if unreadable)
      - tags: type:screenshot, date:<capture date>, upstream tags (minus
        type:/date:) and tags derived from the URL

    Returns:
        0 when every attempted capture succeeded; 1 when help was not
        requested and there was nothing to process, the storage argument was
        invalid, any capture failed, or nothing was captured at all.
    """
    debug(f"[_run] screen-shot invoked with args: {args}")
    # Help check
    if should_show_help(args):
        log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}")
        return 0
    progress = PipelineProgress(pipeline_context)
    # ========================================================================
    # ARGUMENT PARSING
    # ========================================================================
    parsed = parse_cmdlet_args(args, CMDLET)
    format_value = parsed.get("format")
    if not format_value:
        # Default format can be set via config.conf tool block:
        # [tool=playwright]
        # format="pdf"
        try:
            tool_cfg = config.get("tool",
                                  {}) if isinstance(config,
                                                    dict) else {}
            pw_cfg = tool_cfg.get("playwright") if isinstance(tool_cfg, dict) else None
            if isinstance(pw_cfg, dict):
                format_value = pw_cfg.get("format")
        except Exception:
            pass
    if not format_value:
        format_value = "webp"
    # NOTE(review): "storage" and "archive" are read here but are not among
    # the arguments visibly declared on CMDLET — confirm they can be set.
    storage_value = parsed.get("storage")
    selector_arg = parsed.get("selector")
    selectors = [selector_arg] if selector_arg else []
    archive_enabled = parsed.get("archive", False)
    # Positional URL argument (if provided)
    url_arg = parsed.get("url")
    positional_url = [str(url_arg)] if url_arg else []
    # ========================================================================
    # INPUT PROCESSING - Extract url from command args or pipeline
    # ========================================================================
    # If the user provided an explicit URL argument, prefer it.
    # Each entry pairs the URL with the piped item it came from (None for an
    # explicit argument) so upstream title/tags can be reused later.
    url_to_process: List[Tuple[str, Any]] = []
    if positional_url:
        url_to_process = [(u, None) for u in positional_url]
    else:
        piped_results = normalize_result_input(result)
        # Extract url from piped results
        if piped_results:
            for item in piped_results:
                url = get_field(item,
                                "path") or get_field(item,
                                                     "url"
                                                     ) or get_field(item,
                                                                    "target")
                if url:
                    url_to_process.append((str(url), item))
    if not url_to_process:
        log(f"No url to process for screen-shot cmdlet", file=sys.stderr)
        return 1
    debug(f"[_run] url to process: {[u for u, _ in url_to_process]}")
    # ========================================================================
    # OUTPUT DIRECTORY RESOLUTION - Priority chain
    # ========================================================================
    screenshot_dir: Optional[Path] = None
    # Primary: Use --storage if provided (highest priority)
    if storage_value:
        try:
            screenshot_dir = SharedArgs.resolve_storage(storage_value)
            debug(f"[screen_shot] Using --storage {storage_value}: {screenshot_dir}")
        except ValueError as e:
            log(str(e), file=sys.stderr)
            return 1
    # Secondary: Use config-based resolver ONLY if --storage not provided
    if screenshot_dir is None and resolve_output_dir is not None:
        try:
            screenshot_dir = resolve_output_dir(config)
            debug(f"[screen_shot] Using config resolver: {screenshot_dir}")
        except Exception:
            pass
    # Tertiary: Use config outfile ONLY if neither --storage nor resolver worked
    if screenshot_dir is None and config and config.get("outfile"):
        try:
            screenshot_dir = Path(config["outfile"]).expanduser()
            debug(f"[screen_shot] Using config outfile: {screenshot_dir}")
        except Exception:
            pass
    # Default: User's Videos directory
    if screenshot_dir is None:
        screenshot_dir = Path.home() / "Videos"
        debug(f"[screen_shot] Using default directory: {screenshot_dir}")
    ensure_directory(screenshot_dir)
    # If the caller isn't running the shared pipeline Live progress UI (e.g. direct
    # cmdlet execution), start a minimal local pipeline progress panel so this cmdlet
    # still shows step-level progress.
    try:
        progress.ensure_local_ui(
            label="screen-shot",
            total_items=len(url_to_process),
            items_preview=[u for u, _ in url_to_process],
        )
    except Exception:
        pass
    # ========================================================================
    # PREPARE SCREENSHOT OPTIONS
    # ========================================================================
    format_name = _normalise_format(format_value)
    filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()]
    manual_target_selectors = filtered_selectors if filtered_selectors else None
    all_emitted = []
    exit_code = 0
    # ========================================================================
    # PROCESS url AND CAPTURE SCREENSHOTS
    # ========================================================================
    def _extract_item_tags(item: Any) -> List[str]:
        # Tags of the upstream item as a list of non-blank strings.
        if item is None:
            return []
        raw = get_field(item, "tag")
        if isinstance(raw, list):
            return [str(t) for t in raw if t is not None and str(t).strip()]
        if isinstance(raw, str) and raw.strip():
            return [raw.strip()]
        return []
    def _extract_item_title(item: Any) -> str:
        # First non-blank of the upstream item's title/name/filename.
        if item is None:
            return ""
        for key in ("title", "name", "filename"):
            val = get_field(item, key)
            if val is None:
                continue
            text = str(val).strip()
            if text:
                return text
        return ""
    def _clean_title(text: str) -> str:
        # Strip a leading "screenshot:" prefix (case-insensitive).
        value = (text or "").strip()
        if value.lower().startswith("screenshot:"):
            value = value.split(":", 1)[1].strip()
        return value
    for url, origin_item in url_to_process:
        # Validate URL format
        if not url.lower().startswith(("http://", "https://", "file://")):
            log(f"[screen_shot] Skipping non-URL input: {url}", file=sys.stderr)
            continue
        try:
            # Create screenshot with provided options
            # Force the Playwright engine to Chromium for the screen-shot cmdlet
            # (this ensures consistent rendering and supports PDF output requirements).
            pw_local_cfg = {}
            if isinstance(config, dict):
                # Copy the nested blocks so the caller's config is not mutated.
                tool_block = dict(config.get("tool") or {})
                pw_block = dict(tool_block.get("playwright") or {})
                pw_block["browser"] = "chromium"
                # Use Playwright-native UA/headers (matches bundled Chromium version).
                pw_block["user_agent"] = "native"
                pw_block["viewport_width"] = int(DEFAULT_VIEWPORT.get("width", 1920))
                pw_block["viewport_height"] = int(DEFAULT_VIEWPORT.get("height", 1080))
                tool_block["playwright"] = pw_block
                pw_local_cfg = dict(config)
                pw_local_cfg["tool"] = tool_block
            else:
                pw_local_cfg = {
                    "tool": {
                        "playwright": {
                            "browser": "chromium",
                            "user_agent": "native",
                            "viewport_width": int(DEFAULT_VIEWPORT.get("width",
                                                                       1920)),
                            "viewport_height":
                            int(DEFAULT_VIEWPORT.get("height",
                                                     1080)),
                        }
                    }
                }
            options = ScreenshotOptions(
                url=url,
                output_dir=screenshot_dir,
                output_format=format_name,
                archive=archive_enabled,
                target_selectors=None,
                prefer_platform_target=False,
                wait_for_article=False,
                full_page=True,
                playwright_tool=PlaywrightTool(pw_local_cfg),
            )
            # Auto element capture for known sites (x.com/twitter/etc.).
            # - If the user provided --selector, treat that as an explicit target.
            # - Otherwise, if SITE_SELECTORS matches the URL, auto-capture the post/content element.
            auto_selectors = _matched_site_selectors(url)
            if manual_target_selectors:
                options.prefer_platform_target = True
                options.target_selectors = manual_target_selectors
                debug(
                    f"[screen_shot] Using explicit selector(s): {manual_target_selectors}"
                )
            elif auto_selectors:
                options.prefer_platform_target = True
                options.target_selectors = auto_selectors
                debug(f"[screen_shot] Auto selectors matched for url: {auto_selectors}")
            screenshot_result = _capture_screenshot(options, progress)
            # Log results and warnings
            debug(f"Screenshot captured to {screenshot_result.path}")
            if screenshot_result.archive_url:
                debug(f"Archives: {', '.join(screenshot_result.archive_url)}")
            for warning in screenshot_result.warnings:
                debug(f"Warning: {warning}")
            # Compute hash of screenshot file
            screenshot_hash = None
            try:
                with open(screenshot_result.path, "rb") as f:
                    screenshot_hash = hashlib.sha256(f.read()).hexdigest()
            except Exception:
                pass
            # Create PipeObject result - marked as TEMP since derivative artifact
            # Capture date comes from the file's mtime, falling back to today.
            capture_date = ""
            try:
                capture_date = (
                    datetime.fromtimestamp(screenshot_result.path.stat().st_mtime
                                           ).date().isoformat()
                )
            except Exception:
                capture_date = datetime.now().date().isoformat()
            # Title preference: upstream item title, then URL-derived title, then the URL.
            upstream_title = _clean_title(_extract_item_title(origin_item))
            url_title = _title_from_url(url)
            display_title = upstream_title or url_title or url
            upstream_tags = _extract_item_tags(origin_item)
            # Upstream type:/date: tags are dropped; this cmdlet supplies its own.
            filtered_upstream_tags = [
                t for t in upstream_tags
                if not str(t).strip().lower().startswith(("type:", "date:"))
            ]
            url_tags = _tags_from_url(url)
            merged_tags = unique_preserve_order(
                ["type:screenshot",
                 f"date:{capture_date}"] + filtered_upstream_tags + url_tags
            )
            pipe_obj = create_pipe_object_result(
                source="screenshot",
                store="PATH",
                identifier=Path(screenshot_result.path).stem,
                file_path=str(screenshot_result.path),
                cmdlet_name="screen-shot",
                title=display_title,
                hash_value=screenshot_hash,
                is_temp=True,
                parent_hash=hashlib.sha256(url.encode()).hexdigest(),
                tag=merged_tags,
                extra={
                    "source_url": url,
                    "archive_url": screenshot_result.archive_url,
                    "url": screenshot_result.url,
                    "target": str(screenshot_result.path),  # Explicit target for add-file
                },
            )
            # Emit the result so downstream cmdlet (like add-file) can use it
            pipeline_context.emit(pipe_obj)
            all_emitted.append(pipe_obj)
            # If we created a local progress UI, advance it per completed item.
            progress.on_emit(pipe_obj)
        except ScreenshotError as exc:
            # One failed URL marks the run as failed but does not stop the rest.
            log(f"Error taking screenshot of {url}: {exc}", file=sys.stderr)
            exit_code = 1
        except Exception as exc:
            log(f"Unexpected error taking screenshot of {url}: {exc}", file=sys.stderr)
            import traceback
            traceback.print_exc(file=sys.stderr)
            exit_code = 1
    progress.close_local_ui(force_complete=True)
    if not all_emitted:
        log(f"No screenshots were successfully captured", file=sys.stderr)
        return 1
    # Log completion message (keep this as normal output)
    log(f"✓ Successfully captured {len(all_emitted)} screenshot(s)")
    return exit_code
# Cmdlet metadata: name, aliases, accepted arguments and help text.
# NOTE(review): _run() also reads "storage" and "archive" from the parsed
# arguments; neither is visibly declared in `arg` below — confirm whether
# SharedArgs.PATH/URL cover them.
# NOTE(review): the last `detail` line mentions a configured `temp`
# directory, while _run() resolves the output directory from storage,
# the config resolver, config["outfile"] or ~/Videos — confirm the wording.
CMDLET = Cmdlet(
    name="screen-shot",
    summary="Capture a website screenshot",
    usage="screen-shot <url> [options]",
    alias=["screenshot",
           "ss"],
    arg=[
        SharedArgs.URL,
        CmdletArg(
            name="format",
            type="string",
            description="Output format: webp, png, jpeg, or pdf"
        ),
        CmdletArg(
            name="selector",
            type="string",
            description="CSS selector for element capture"
        ),
        SharedArgs.PATH,
    ],
    detail=[
        "Uses Playwright Chromium engine only. Install Chromium with: python ./scripts/bootstrap.py --playwright-only --browsers chromium",
        "PDF output requires headless Chromium (the cmdlet will enforce headless mode for PDF).",
        "Screenshots are temporary artifacts stored in the configured `temp` directory.",
    ],
)
# Bind the implementation, then register the cmdlet (a module-level side
# effect that runs at import time).
CMDLET.exec = _run
CMDLET.register()