Files
Medios-Macina/cmdlet/screen_shot.py
nose 5104689a53 df
2025-12-17 03:16:41 -08:00

757 lines
28 KiB
Python

"""Screen-shot cmdlet for capturing screenshots of url in a pipeline.
This cmdlet processes files through the pipeline and creates screenshots using
Playwright, marking them as temporary artifacts for cleanup.
"""
from __future__ import annotations
import contextlib
import hashlib
import sys
import time
from datetime import datetime
import httpx
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
from urllib.parse import urlsplit, quote, urljoin
from SYS.logger import log, debug
from API.HTTP import HTTPClient
from SYS.utils import ensure_directory, unique_path, unique_preserve_order
from . import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
create_pipe_object_result = sh.create_pipe_object_result
normalize_result_input = sh.normalize_result_input
should_show_help = sh.should_show_help
get_field = sh.get_field
parse_cmdlet_args = sh.parse_cmdlet_args
import pipeline as pipeline_context
# ============================================================================
# CMDLET Metadata Declaration
# ============================================================================
# ============================================================================
# Playwright & Screenshot Dependencies
# ============================================================================
from tool.playwright import HAS_PLAYWRIGHT, PlaywrightTimeoutError, PlaywrightTool
try:
from config import resolve_output_dir
except ImportError:
try:
_parent_dir = str(Path(__file__).parent.parent)
if _parent_dir not in sys.path:
sys.path.insert(0, _parent_dir)
from config import resolve_output_dir
except ImportError:
resolve_output_dir = None
# ============================================================================
# Screenshot Constants & Configuration
# ============================================================================
USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
DEFAULT_VIEWPORT: dict[str, int] = {"width": 1280, "height": 1200}
ARCHIVE_TIMEOUT = 30.0
# Configurable selectors for specific websites
SITE_SELECTORS: Dict[str, List[str]] = {
"twitter.com": [
"article[role='article']",
"div[data-testid='tweet']",
"div[data-testid='cellInnerDiv'] article",
],
"x.com": [
"article[role='article']",
"div[data-testid='tweet']",
"div[data-testid='cellInnerDiv'] article",
],
"instagram.com": [
"article[role='presentation']",
"article[role='article']",
"div[role='dialog'] article",
"section main article",
],
"reddit.com": [
"shreddit-post",
"div[data-testid='post-container']",
"div[data-click-id='background']",
"article",
],
"rumble.com": [
"rumble-player, iframe.rumble",
"div.video-item--main",
"main article",
],
}
class ScreenshotError(RuntimeError):
"""Raised when screenshot capture or upload fails."""
@dataclass(slots=True)
class ScreenshotOptions:
"""Options controlling screenshot capture and post-processing."""
output_dir: Path
url: str = ""
output_path: Optional[Path] = None
full_page: bool = True
headless: bool = True
wait_after_load: float = 2.0
wait_for_article: bool = False
replace_video_posters: bool = True
tag: Sequence[str] = ()
archive: bool = False
archive_timeout: float = ARCHIVE_TIMEOUT
output_format: Optional[str] = None
prefer_platform_target: bool = False
target_selectors: Optional[Sequence[str]] = None
selector_timeout_ms: int = 10_000
playwright_tool: Optional[PlaywrightTool] = None
@dataclass(slots=True)
class ScreenshotResult:
"""Details about the captured screenshot."""
path: Path
tag_applied: List[str]
archive_url: List[str]
url: List[str]
warnings: List[str] = field(default_factory=list)
# ============================================================================
# Helper Functions
# ============================================================================
def _slugify_url(url: str) -> str:
"""Convert URL to filesystem-safe slug."""
parsed = urlsplit(url)
candidate = f"{parsed.netloc}{parsed.path}"
if parsed.query:
candidate += f"?{parsed.query}"
slug = "".join(char if char.isalnum() else "-" for char in candidate.lower())
slug = slug.strip("-") or "screenshot"
return slug[:100]
def _normalise_format(fmt: Optional[str]) -> str:
"""Normalize output format to valid values."""
if not fmt:
return "png"
value = fmt.strip().lower()
if value in {"jpg", "jpeg"}:
return "jpeg"
if value in {"png", "pdf"}:
return value
return "png"
def _format_suffix(fmt: str) -> str:
"""Get file suffix for format."""
if fmt == "jpeg":
return ".jpg"
return f".{fmt}"
def _selectors_for_url(url: str) -> List[str]:
"""Return a list of likely content selectors for known platforms."""
u = url.lower()
sels: List[str] = []
for domain, selectors in SITE_SELECTORS.items():
if domain in u:
sels.extend(selectors)
return sels or ["article"]
def _platform_preprocess(url: str, page: Any, warnings: List[str], timeout_ms: int = 10_000) -> None:
"""Best-effort page tweaks for popular platforms before capture."""
u = url.lower()
def _try_click_texts(texts: List[str], passes: int = 2, per_timeout: int = 700) -> int:
clicks = 0
for _ in range(max(1, passes)):
for t in texts:
try:
page.locator(f"text=/{t}/i").first.click(timeout=per_timeout)
clicks += 1
except PlaywrightTimeoutError:
pass
except Exception:
pass
time.sleep(0.1)
return clicks
# Dismiss common cookie/consent prompts
_try_click_texts(["accept", "i agree", "agree", "got it", "allow all", "consent"])
# Platform-specific expansions
if "reddit.com" in u:
_try_click_texts(["see more", "read more", "show more", "more"])
if ("twitter.com" in u) or ("x.com" in u):
_try_click_texts(["show more", "more"])
if "instagram.com" in u:
_try_click_texts(["more", "see more"])
if "tiktok.com" in u:
_try_click_texts(["more", "see more"])
if ("facebook.com" in u) or ("fb.watch" in u):
_try_click_texts(["see more", "show more", "more"])
if "rumble.com" in u:
_try_click_texts(["accept", "agree", "close"])
def _submit_wayback(url: str, timeout: float) -> Optional[str]:
"""Submit URL to Internet Archive Wayback Machine."""
encoded = quote(url, safe="/:?=&")
with HTTPClient() as client:
response = client.get(f"https://web.archive.org/save/{encoded}")
response.raise_for_status()
content_location = response.headers.get("Content-Location")
if content_location:
return urljoin("https://web.archive.org", content_location)
return str(response.url)
def _submit_archive_today(url: str, timeout: float) -> Optional[str]:
"""Submit URL to Archive.today."""
encoded = quote(url, safe=":/?#[]@!$&'()*+,;=")
with HTTPClient(headers={"User-Agent": USER_AGENT}) as client:
response = client.get(f"https://archive.today/submit/?url={encoded}")
response.raise_for_status()
final = str(response.url)
if final and ("archive.today" in final or "archive.ph" in final):
return final
return None
def _submit_archive_ph(url: str, timeout: float) -> Optional[str]:
"""Submit URL to Archive.ph."""
encoded = quote(url, safe=":/?#[]@!$&'()*+,;=")
with HTTPClient(headers={"User-Agent": USER_AGENT}) as client:
response = client.get(f"https://archive.ph/submit/?url={encoded}")
response.raise_for_status()
final = str(response.url)
if final and "archive.ph" in final:
return final
return None
def _archive_url(url: str, timeout: float) -> Tuple[List[str], List[str]]:
"""Submit URL to all available archive services."""
archives: List[str] = []
warnings: List[str] = []
for submitter, label in (
(_submit_wayback, "wayback"),
(_submit_archive_today, "archive.today"),
(_submit_archive_ph, "archive.ph"),
):
try:
log(f"Archiving to {label}...", flush=True)
archived = submitter(url, timeout)
except httpx.HTTPStatusError as exc:
if exc.response.status_code == 429:
warnings.append(f"archive {label} rate limited (HTTP 429)")
log(f"{label}: Rate limited (HTTP 429)", flush=True)
else:
warnings.append(f"archive {label} failed: HTTP {exc.response.status_code}")
log(f"{label}: HTTP {exc.response.status_code}", flush=True)
except httpx.RequestError as exc:
warnings.append(f"archive {label} failed: {exc}")
log(f"{label}: Connection error: {exc}", flush=True)
except Exception as exc:
warnings.append(f"archive {label} failed: {exc}")
log(f"{label}: {exc}", flush=True)
else:
if archived:
archives.append(archived)
log(f"{label}: Success - {archived}", flush=True)
else:
log(f"{label}: No archive link returned", flush=True)
return archives, warnings
def _prepare_output_path(options: ScreenshotOptions) -> Path:
"""Prepare and validate output path for screenshot."""
ensure_directory(options.output_dir)
explicit_format = _normalise_format(options.output_format) if options.output_format else None
inferred_format: Optional[str] = None
if options.output_path is not None:
path = options.output_path
if not path.is_absolute():
path = options.output_dir / path
suffix = path.suffix.lower()
if suffix:
inferred_format = _normalise_format(suffix[1:])
else:
stamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"{_slugify_url(options.url)}_{stamp}"
path = options.output_dir / filename
final_format = explicit_format or inferred_format or "png"
if not path.suffix:
path = path.with_suffix(_format_suffix(final_format))
else:
current_suffix = path.suffix.lower()
expected = _format_suffix(final_format)
if current_suffix != expected:
path = path.with_suffix(expected)
options.output_format = final_format
return unique_path(path)
def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str]) -> None:
"""Capture screenshot using Playwright."""
debug(f"[_capture] Starting capture for {options.url} -> {destination}")
try:
tool = options.playwright_tool or PlaywrightTool({})
tool.debug_dump()
log("Launching browser...", flush=True)
format_name = _normalise_format(options.output_format)
headless = options.headless or format_name == "pdf"
debug(f"[_capture] Format: {format_name}, Headless: {headless}")
if format_name == "pdf" and not options.headless:
warnings.append("pdf output requires headless Chromium; overriding headless mode")
with tool.open_page(headless=headless) as page:
log(f"Navigating to {options.url}...", flush=True)
try:
tool.goto(page, options.url)
log("Page loaded successfully", flush=True)
except PlaywrightTimeoutError:
warnings.append("navigation timeout; capturing current page state")
log("Navigation timeout; proceeding with current state", flush=True)
# Skip article lookup by default (wait_for_article defaults to False)
if options.wait_for_article:
try:
log("Waiting for article element...", flush=True)
page.wait_for_selector("article", timeout=10_000)
log("Article element found", flush=True)
except PlaywrightTimeoutError:
warnings.append("<article> selector not found; capturing fallback")
log("Article element not found; using fallback", flush=True)
if options.wait_after_load > 0:
log(f"Waiting {options.wait_after_load}s for page stabilization...", flush=True)
time.sleep(min(10.0, max(0.0, options.wait_after_load)))
if options.replace_video_posters:
log("Replacing video elements with posters...", flush=True)
page.evaluate(
"""
document.querySelectorAll('video').forEach(v => {
if (v.poster) {
const img = document.createElement('img');
img.src = v.poster;
img.style.maxWidth = '100%';
img.style.borderRadius = '12px';
v.replaceWith(img);
}
});
"""
)
# Attempt platform-specific target capture if requested (and not PDF)
element_captured = False
if options.prefer_platform_target and format_name != "pdf":
log("Attempting platform-specific content capture...", flush=True)
try:
_platform_preprocess(options.url, page, warnings)
except Exception as e:
debug(f"[_capture] Platform preprocess failed: {e}")
pass
selectors = list(options.target_selectors or [])
if not selectors:
selectors = _selectors_for_url(options.url)
debug(f"[_capture] Trying selectors: {selectors}")
for sel in selectors:
try:
log(f"Trying selector: {sel}", flush=True)
el = page.wait_for_selector(sel, timeout=max(0, int(options.selector_timeout_ms)))
except PlaywrightTimeoutError:
log(f"Selector not found: {sel}", flush=True)
continue
try:
if el is not None:
log(f"Found element with selector: {sel}", flush=True)
try:
el.scroll_into_view_if_needed(timeout=1000)
except Exception:
pass
log(f"Capturing element to {destination}...", flush=True)
el.screenshot(path=str(destination), type=("jpeg" if format_name == "jpeg" else None))
element_captured = True
log("Element captured successfully", flush=True)
break
except Exception as exc:
warnings.append(f"element capture failed for '{sel}': {exc}")
log(f"Failed to capture element: {exc}", flush=True)
# Fallback to default capture paths
if element_captured:
pass
elif format_name == "pdf":
log("Generating PDF...", flush=True)
page.emulate_media(media="print")
page.pdf(path=str(destination), print_background=True)
log(f"PDF saved to {destination}", flush=True)
else:
log(f"Capturing full page to {destination}...", flush=True)
screenshot_kwargs: Dict[str, Any] = {"path": str(destination)}
if format_name == "jpeg":
screenshot_kwargs["type"] = "jpeg"
screenshot_kwargs["quality"] = 90
if options.full_page:
page.screenshot(full_page=True, **screenshot_kwargs)
else:
article = page.query_selector("article")
if article is not None:
article_kwargs = dict(screenshot_kwargs)
article_kwargs.pop("full_page", None)
article.screenshot(**article_kwargs)
else:
page.screenshot(**screenshot_kwargs)
log(f"Screenshot saved to {destination}", flush=True)
except Exception as exc:
debug(f"[_capture] Exception: {exc}")
raise ScreenshotError(f"Failed to capture screenshot: {exc}") from exc
def _capture_screenshot(options: ScreenshotOptions) -> ScreenshotResult:
"""Capture a screenshot for the given options."""
debug(f"[_capture_screenshot] Preparing capture for {options.url}")
destination = _prepare_output_path(options)
warnings: List[str] = []
_capture(options, destination, warnings)
# Build URL list from captured url and any archives
url: List[str] = [options.url] if options.url else []
archive_url: List[str] = []
if options.archive and options.url:
debug(f"[_capture_screenshot] Archiving enabled for {options.url}")
archives, archive_warnings = _archive_url(options.url, options.archive_timeout)
archive_url.extend(archives)
warnings.extend(archive_warnings)
if archives:
url = unique_preserve_order([*url, *archives])
applied_tag = unique_preserve_order(list(tag for tag in options.tag if tag.strip()))
return ScreenshotResult(
path=destination,
tag_applied=applied_tag,
archive_url=archive_url,
url=url,
warnings=warnings,
)
# ============================================================================
# Main Cmdlet Function
# ============================================================================
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Take screenshots of url in the pipeline.
Accepts:
- Single result object (dict or PipeObject) with 'path' field
- List of result objects to screenshot each
- Direct URL as string
Emits PipeObject-formatted results for each screenshot with:
- action: 'cmdlet:screen-shot'
- is_temp: True (screenshots are temporary artifacts)
- parent_id: hash of the original file/URL
Screenshots are created using Playwright and marked as temporary
so they can be cleaned up later with the cleanup cmdlet.
"""
debug(f"[_run] screen-shot invoked with args: {args}")
# Help check
if should_show_help(args):
log(f"Cmdlet: {CMDLET.name}\nSummary: {CMDLET.summary}\nUsage: {CMDLET.usage}")
return 0
if not HAS_PLAYWRIGHT:
log(
"playwright is required for screenshot capture; install with: pip install playwright; then: playwright install",
file=sys.stderr,
)
return 1
# ========================================================================
# ARGUMENT PARSING
# ========================================================================
parsed = parse_cmdlet_args(args, CMDLET)
format_value = parsed.get("format")
if not format_value:
# Default format can be set via config.conf tool block:
# [tool=playwright]
# format="pdf"
try:
tool_cfg = config.get("tool", {}) if isinstance(config, dict) else {}
pw_cfg = tool_cfg.get("playwright") if isinstance(tool_cfg, dict) else None
if isinstance(pw_cfg, dict):
format_value = pw_cfg.get("format")
except Exception:
pass
if not format_value:
format_value = "png"
storage_value = parsed.get("storage")
selector_arg = parsed.get("selector")
selectors = [selector_arg] if selector_arg else []
archive_enabled = parsed.get("archive", False)
# Positional URL argument (if provided)
url_arg = parsed.get("url")
positional_url = [str(url_arg)] if url_arg else []
# ========================================================================
# INPUT PROCESSING - Extract url from pipeline or command arguments
# ========================================================================
piped_results = normalize_result_input(result)
url_to_process: List[Tuple[str, Any]] = []
# Extract url from piped results
if piped_results:
for item in piped_results:
url = (
get_field(item, 'path')
or get_field(item, 'url')
or get_field(item, 'target')
)
if url:
url_to_process.append((str(url), item))
# Use positional arguments if no pipeline input
if not url_to_process and positional_url:
url_to_process = [(u, None) for u in positional_url]
if not url_to_process:
log(f"No url to process for screen-shot cmdlet", file=sys.stderr)
return 1
debug(f"[_run] url to process: {[u for u, _ in url_to_process]}")
# ========================================================================
# OUTPUT DIRECTORY RESOLUTION - Priority chain
# ========================================================================
screenshot_dir: Optional[Path] = None
# Primary: Use --storage if provided (highest priority)
if storage_value:
try:
screenshot_dir = SharedArgs.resolve_storage(storage_value)
log(f"[screen_shot] Using --storage {storage_value}: {screenshot_dir}", flush=True)
except ValueError as e:
log(str(e), file=sys.stderr)
return 1
# Secondary: Use config-based resolver ONLY if --storage not provided
if screenshot_dir is None and resolve_output_dir is not None:
try:
screenshot_dir = resolve_output_dir(config)
log(f"[screen_shot] Using config resolver: {screenshot_dir}", flush=True)
except Exception:
pass
# Tertiary: Use config outfile ONLY if neither --storage nor resolver worked
if screenshot_dir is None and config and config.get("outfile"):
try:
screenshot_dir = Path(config["outfile"]).expanduser()
log(f"[screen_shot] Using config outfile: {screenshot_dir}", flush=True)
except Exception:
pass
# Default: User's Videos directory
if screenshot_dir is None:
screenshot_dir = Path.home() / "Videos"
log(f"[screen_shot] Using default directory: {screenshot_dir}", flush=True)
ensure_directory(screenshot_dir)
# ========================================================================
# PREPARE SCREENSHOT OPTIONS
# ========================================================================
format_name = _normalise_format(format_value)
filtered_selectors = [str(s).strip() for s in selectors if str(s).strip()]
target_selectors = filtered_selectors if filtered_selectors else None
all_emitted = []
exit_code = 0
# ========================================================================
# PROCESS url AND CAPTURE SCREENSHOTS
# ========================================================================
def _extract_item_tags(item: Any) -> List[str]:
if item is None:
return []
raw = get_field(item, 'tag')
if isinstance(raw, list):
return [str(t) for t in raw if t is not None and str(t).strip()]
if isinstance(raw, str) and raw.strip():
return [raw.strip()]
return []
def _extract_item_title(item: Any) -> str:
if item is None:
return ""
for key in ("title", "name", "filename"):
val = get_field(item, key)
if val is None:
continue
text = str(val).strip()
if text:
return text
return ""
def _clean_title(text: str) -> str:
value = (text or "").strip()
if value.lower().startswith("screenshot:"):
value = value.split(":", 1)[1].strip()
return value
for url, origin_item in url_to_process:
# Validate URL format
if not url.lower().startswith(("http://", "https://", "file://")):
log(f"[screen_shot] Skipping non-URL input: {url}", file=sys.stderr)
continue
try:
# Create screenshot with provided options
options = ScreenshotOptions(
url=url,
output_dir=screenshot_dir,
output_format=format_name,
archive=archive_enabled,
target_selectors=target_selectors,
prefer_platform_target=False,
wait_for_article=False,
full_page=True,
playwright_tool=PlaywrightTool(config),
)
screenshot_result = _capture_screenshot(options)
# Log results and warnings
log(f"Screenshot captured to {screenshot_result.path}", flush=True)
if screenshot_result.archive_url:
log(f"Archives: {', '.join(screenshot_result.archive_url)}", flush=True)
for warning in screenshot_result.warnings:
log(f"Warning: {warning}", flush=True)
# Compute hash of screenshot file
screenshot_hash = None
try:
with open(screenshot_result.path, 'rb') as f:
screenshot_hash = hashlib.sha256(f.read()).hexdigest()
except Exception:
pass
# Create PipeObject result - marked as TEMP since derivative artifact
capture_date = ""
try:
capture_date = datetime.fromtimestamp(screenshot_result.path.stat().st_mtime).date().isoformat()
except Exception:
capture_date = datetime.now().date().isoformat()
upstream_title = _clean_title(_extract_item_title(origin_item))
display_title = upstream_title or url
upstream_tags = _extract_item_tags(origin_item)
filtered_upstream_tags = [
t for t in upstream_tags
if not str(t).strip().lower().startswith(("type:", "date:"))
]
merged_tags = unique_preserve_order(
["type:screenshot", f"date:{capture_date}"] + filtered_upstream_tags
)
pipe_obj = create_pipe_object_result(
source='screenshot',
store='PATH',
identifier=Path(screenshot_result.path).stem,
file_path=str(screenshot_result.path),
cmdlet_name='screen-shot',
title=display_title,
hash_value=screenshot_hash,
is_temp=True,
parent_hash=hashlib.sha256(url.encode()).hexdigest(),
tag=merged_tags,
extra={
'source_url': url,
'archive_url': screenshot_result.archive_url,
'url': screenshot_result.url,
'target': str(screenshot_result.path), # Explicit target for add-file
}
)
# Emit the result so downstream cmdlet (like add-file) can use it
pipeline_context.emit(pipe_obj)
all_emitted.append(pipe_obj)
except ScreenshotError as exc:
log(f"Error taking screenshot of {url}: {exc}", file=sys.stderr)
exit_code = 1
except Exception as exc:
log(f"Unexpected error taking screenshot of {url}: {exc}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
exit_code = 1
if not all_emitted:
log(f"No screenshots were successfully captured", file=sys.stderr)
return 1
# Log completion message
log(f"✓ Successfully captured {len(all_emitted)} screenshot(s)", flush=True)
return exit_code
CMDLET = Cmdlet(
name="screen-shot",
summary="Capture a website screenshot",
usage="screen-shot <url> [options] or download-data <url> | screen-shot [options]",
alias=["screenshot", "ss"],
arg=[
SharedArgs.URL,
CmdletArg(name="format", type="string", description="Output format: png, jpeg, or pdf"),
CmdletArg(name="selector", type="string", description="CSS selector for element capture"),
],
detail=
["""
"""]
)
CMDLET.exec = _run
CMDLET.register()