This commit is contained in:
nose
2025-11-27 10:59:01 -08:00
parent e9b505e609
commit 9eff65d1af
30 changed files with 2099 additions and 1095 deletions

View File

@@ -9,6 +9,7 @@ from __future__ import annotations
import contextlib
import hashlib
import importlib
import json
import sys
import time
import httpx
@@ -17,8 +18,9 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
from urllib.parse import urlsplit, quote, urljoin
from helper.logger import log
from helper.logger import log, debug
from helper.http_client import HTTPClient
from helper.utils import ensure_directory, unique_path, unique_preserve_order
from . import register
from ._shared import Cmdlet, CmdletArg, SharedArgs, create_pipe_object_result, normalize_result_input
@@ -70,6 +72,38 @@ USER_AGENT = (
DEFAULT_VIEWPORT: ViewportSize = {"width": 1280, "height": 1200}
ARCHIVE_TIMEOUT = 30.0
# Configurable selectors for specific websites
SITE_SELECTORS: Dict[str, List[str]] = {
"twitter.com": [
"article[role='article']",
"div[data-testid='tweet']",
"div[data-testid='cellInnerDiv'] article",
],
"x.com": [
"article[role='article']",
"div[data-testid='tweet']",
"div[data-testid='cellInnerDiv'] article",
],
"instagram.com": [
"article[role='presentation']",
"article[role='article']",
"div[role='dialog'] article",
"section main article",
],
"reddit.com": [
"shreddit-post",
"div[data-testid='post-container']",
"div[data-click-id='background']",
"article",
],
"rumble.com": [
"rumble-player, iframe.rumble",
"div.video-item--main",
"main article",
],
}
class ScreenshotError(RuntimeError):
"""Raised when screenshot capture or upload fails."""
@@ -113,39 +147,6 @@ class ScreenshotResult:
# Helper Functions
# ============================================================================
def _ensure_directory(path: Path) -> None:
"""Ensure directory exists."""
if not isinstance(path, Path):
path = Path(path)
path.mkdir(parents=True, exist_ok=True)
def _unique_path(path: Path) -> Path:
"""Get unique path by appending numbers if file exists."""
if not path.exists():
return path
stem = path.stem
suffix = path.suffix
parent = path.parent
counter = 1
while True:
new_path = parent / f"{stem}_{counter}{suffix}"
if not new_path.exists():
return new_path
counter += 1
def _unique_preserve_order(items: Sequence[str]) -> List[str]:
"""Remove duplicates while preserving order."""
seen = set()
result = []
for item in items:
if item not in seen:
seen.add(item)
result.append(item)
return result
def _slugify_url(url: str) -> str:
"""Convert URL to filesystem-safe slug."""
parsed = urlsplit(url)
@@ -180,36 +181,11 @@ def _selectors_for_url(url: str) -> List[str]:
"""Return a list of likely content selectors for known platforms."""
u = url.lower()
sels: List[str] = []
# Twitter/X
if "twitter.com" in u or "x.com" in u:
sels.extend([
"article[role='article']",
"div[data-testid='tweet']",
"div[data-testid='cellInnerDiv'] article",
])
# Instagram
if "instagram.com" in u:
sels.extend([
"article[role='presentation']",
"article[role='article']",
"div[role='dialog'] article",
"section main article",
])
# Reddit
if "reddit.com" in u:
sels.extend([
"shreddit-post",
"div[data-testid='post-container']",
"div[data-click-id='background']",
"article",
])
# Rumble (video post)
if "rumble.com" in u:
sels.extend([
"rumble-player, iframe.rumble",
"div.video-item--main",
"main article",
])
for domain, selectors in SITE_SELECTORS.items():
if domain in u:
sels.extend(selectors)
return sels or ["article"]
@@ -321,7 +297,7 @@ def _archive_url(url: str, timeout: float) -> Tuple[List[str], List[str]]:
def _prepare_output_path(options: ScreenshotOptions) -> Path:
"""Prepare and validate output path for screenshot."""
_ensure_directory(options.output_dir)
ensure_directory(options.output_dir)
explicit_format = _normalise_format(options.output_format) if options.output_format else None
inferred_format: Optional[str] = None
if options.output_path is not None:
@@ -344,20 +320,23 @@ def _prepare_output_path(options: ScreenshotOptions) -> Path:
if current_suffix != expected:
path = path.with_suffix(expected)
options.output_format = final_format
return _unique_path(path)
return unique_path(path)
def _capture_with_playwright(options: ScreenshotOptions, destination: Path, warnings: List[str]) -> None:
def _capture(options: ScreenshotOptions, destination: Path, warnings: List[str]) -> None:
"""Capture screenshot using Playwright."""
debug(f"[_capture] Starting capture for {options.url} -> {destination}")
playwright = None
browser = None
context = None
try:
log("Starting Playwright...", flush=True)
debug("Starting Playwright...", flush=True)
playwright = sync_playwright().start()
log("Launching Chromium browser...", flush=True)
format_name = _normalise_format(options.output_format)
headless = options.headless or format_name == "pdf"
debug(f"[_capture] Format: {format_name}, Headless: {headless}")
if format_name == "pdf" and not options.headless:
warnings.append("pdf output requires headless Chromium; overriding headless mode")
browser = playwright.chromium.launch(
@@ -413,11 +392,14 @@ def _capture_with_playwright(options: ScreenshotOptions, destination: Path, warn
log("Attempting platform-specific content capture...", flush=True)
try:
_platform_preprocess(options.url, page, warnings)
except Exception:
except Exception as e:
debug(f"[_capture] Platform preprocess failed: {e}")
pass
selectors = list(options.target_selectors or [])
if not selectors:
selectors = _selectors_for_url(options.url)
debug(f"[_capture] Trying selectors: {selectors}")
for sel in selectors:
try:
log(f"Trying selector: {sel}", flush=True)
@@ -466,6 +448,7 @@ def _capture_with_playwright(options: ScreenshotOptions, destination: Path, warn
page.screenshot(**screenshot_kwargs)
log(f"Screenshot saved to {destination}", flush=True)
except Exception as exc:
debug(f"[_capture] Exception: {exc}")
raise ScreenshotError(f"Failed to capture screenshot: {exc}") from exc
finally:
log("Cleaning up browser resources...", flush=True)
@@ -483,20 +466,22 @@ def _capture_with_playwright(options: ScreenshotOptions, destination: Path, warn
def _capture_screenshot(options: ScreenshotOptions) -> ScreenshotResult:
"""Capture a screenshot for the given options."""
debug(f"[_capture_screenshot] Preparing capture for {options.url}")
destination = _prepare_output_path(options)
warnings: List[str] = []
_capture_with_playwright(options, destination, warnings)
_capture(options, destination, warnings)
known_urls = _unique_preserve_order([options.url, *options.known_urls])
known_urls = unique_preserve_order([options.url, *options.known_urls])
archive_urls: List[str] = []
if options.archive:
debug(f"[_capture_screenshot] Archiving enabled for {options.url}")
archives, archive_warnings = _archive_url(options.url, options.archive_timeout)
archive_urls.extend(archives)
warnings.extend(archive_warnings)
if archives:
known_urls = _unique_preserve_order([*known_urls, *archives])
known_urls = unique_preserve_order([*known_urls, *archives])
applied_tags = _unique_preserve_order(list(tag for tag in options.tags if tag.strip()))
applied_tags = unique_preserve_order(list(tag for tag in options.tags if tag.strip()))
return ScreenshotResult(
path=destination,
@@ -530,6 +515,8 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""
from ._shared import parse_cmdlet_args
debug(f"[_run] screen-shot invoked with args: {args}")
# Help check
try:
if any(str(a).lower() in {"-?", "/?", "--help", "-h", "help", "--cmdlet"} for a in args):
@@ -581,6 +568,8 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
log(f"No URLs to process for screen-shot cmdlet", file=sys.stderr)
return 1
debug(f"[_run] URLs to process: {urls_to_process}")
# ========================================================================
# OUTPUT DIRECTORY RESOLUTION - Priority chain
# ========================================================================
@@ -617,7 +606,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
screenshot_dir = Path.home() / "Videos"
log(f"[screen_shot] Using default directory: {screenshot_dir}", flush=True)
_ensure_directory(screenshot_dir)
ensure_directory(screenshot_dir)
# ========================================================================
# PREPARE SCREENSHOT OPTIONS