from __future__ import annotations import contextlib import os import re import shutil import tempfile import traceback from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterator, Optional, Union from SYS.logger import debug from playwright.sync_api import TimeoutError as PlaywrightTimeoutError from playwright.sync_api import sync_playwright # Re-export for consumers (e.g. cmdlets catching navigation timeouts) __all__ = [ "PlaywrightTimeoutError", "PlaywrightTool", "PlaywrightDefaults", "PlaywrightDownloadResult", ] def _get_nested(config: Dict[str, Any], *path: str) -> Any: cur: Any = config for key in path: if not isinstance(cur, dict): return None cur = cur.get(key) return cur def _resolve_out_dir(arg_outdir: Optional[Union[str, Path]]) -> Path: """Resolve an output directory using config when possible.""" if arg_outdir: p = Path(arg_outdir) p.mkdir(parents=True, exist_ok=True) return p try: from SYS.config import load_config, resolve_output_dir cfg = load_config() p = resolve_output_dir(cfg) try: p.mkdir(parents=True, exist_ok=True) except Exception: pass return p except Exception: return Path(tempfile.mkdtemp(prefix="pwdl_")) def _find_filename_from_cd(cd: str) -> Optional[str]: if not cd: return None m = re.search(r"filename\*?=(?:UTF-8''\s*)?\"?([^\";]+)\"?", cd) if m: return m.group(1) return None @dataclass(slots=True) class PlaywrightDefaults: browser: str = "chromium" # chromium|firefox|webkit headless: bool = True user_agent: str = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ) viewport_width: int = 1920 viewport_height: int = 1080 navigation_timeout_ms: int = 90_000 ignore_https_errors: bool = True ffmpeg_path: Optional[str] = None # Path to ffmpeg executable; auto-detected if None @dataclass(slots=True) class PlaywrightDownloadResult: ok: bool path: Optional[Path] = None url: Optional[str] = None mode: Optional[str] = None error: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return { "ok": bool(self.ok), "path": str(self.path) if self.path else None, "url": self.url, "mode": self.mode, "error": self.error, } class PlaywrightTool: """Small wrapper to standardize Playwright defaults and lifecycle. This is meant to keep cmdlets/providers from duplicating: - sync_playwright start/stop - browser launch/context creation - user-agent/viewport defaults - ffmpeg path resolution (for video recording) Config overrides (top-level keys): - playwright.browser="chromium" - playwright.headless=true - playwright.user_agent="..." - playwright.viewport_width=1280 - playwright.viewport_height=1200 - playwright.navigation_timeout_ms=90000 - playwright.ignore_https_errors=true - playwright.ffmpeg_path="/path/to/ffmpeg" (auto-detected if not set) FFmpeg resolution (in order): 1. Config key: playwright.ffmpeg_path 2. Environment variable: PLAYWRIGHT_FFMPEG_PATH 3. Project bundled: MPV/ffmpeg/bin/ffmpeg[.exe] 4. System PATH: which ffmpeg """ def __init__(self, config: Optional[Dict[str, Any]] = None) -> None: self._config: Dict[str, Any] = dict(config or {}) self.defaults = self._load_defaults() def _load_defaults(self) -> PlaywrightDefaults: cfg = self._config defaults = PlaywrightDefaults() tool_block = _get_nested(cfg, "tool", "playwright") if not isinstance(tool_block, dict): tool_block = {} pw_block = cfg.get("playwright") if isinstance(cfg.get("playwright"), dict) else {} if not isinstance(pw_block, dict): pw_block = {} def _get(name: str, fallback: Any) -> Any: val = tool_block.get(name) if val is None: val = pw_block.get(name) if val is None: val = cfg.get(f"playwright_{name}") if val is None: val = _get_nested(cfg, "playwright", name) return fallback if val is None else val browser = str(_get("browser", defaults.browser)).strip().lower() or "chromium" if browser not in {"chromium", "firefox", "webkit"}: browser = "chromium" headless_raw = _get("headless", defaults.headless) headless = bool(headless_raw) ua = str(_get("user_agent", defaults.user_agent)) def _int(name: str, fallback: int) -> int: raw = _get(name, fallback) try: return int(raw) except Exception: return fallback vw = _int("viewport_width", defaults.viewport_width) vh = _int("viewport_height", defaults.viewport_height) nav_timeout = _int("navigation_timeout_ms", defaults.navigation_timeout_ms) ignore_https = bool(_get("ignore_https_errors", defaults.ignore_https_errors)) # Try to find ffmpeg: config override, environment variable, bundled, then system # This checks if ffmpeg is actually available (not just the path to it) ffmpeg_path: Optional[str] = None config_ffmpeg = _get("ffmpeg_path", None) if config_ffmpeg: # User explicitly configured ffmpeg path candidate = str(config_ffmpeg).strip() if Path(candidate).exists(): ffmpeg_path = candidate else: debug(f"Configured ffmpeg path does not exist: {candidate}") if not ffmpeg_path: # Check environment variable (supports project ffmpeg) env_ffmpeg = os.environ.get("PLAYWRIGHT_FFMPEG_PATH") if env_ffmpeg and Path(env_ffmpeg).exists(): ffmpeg_path = env_ffmpeg elif env_ffmpeg: debug(f"PLAYWRIGHT_FFMPEG_PATH set but path does not exist: {env_ffmpeg}") if not ffmpeg_path: # Try to find bundled ffmpeg in the project (Windows-only, in MPV/ffmpeg/bin) try: repo_root = Path(__file__).resolve().parent.parent bundled_ffmpeg = repo_root / "MPV" / "ffmpeg" / "bin" if bundled_ffmpeg.exists(): ffmpeg_exe = bundled_ffmpeg / ("ffmpeg.exe" if os.name == "nt" else "ffmpeg") if ffmpeg_exe.exists(): ffmpeg_path = str(ffmpeg_exe) debug(f"Found bundled ffmpeg at: {ffmpeg_path}") except Exception as e: debug(f"Error checking for bundled ffmpeg: {e}") if not ffmpeg_path: # Try system ffmpeg if bundled not found system_ffmpeg = shutil.which("ffmpeg") if system_ffmpeg: ffmpeg_path = system_ffmpeg debug(f"Found system ffmpeg at: {ffmpeg_path}") else: # ffmpeg not found - log a debug message but don't fail # ffmpeg-python may still work with system installation, or user might not need it debug("ffmpeg not found on PATH. For best compatibility, install ffmpeg: Windows (use bundled or choco install ffmpeg), macOS (brew install ffmpeg), Linux (apt install ffmpeg or equivalent)") return PlaywrightDefaults( browser=browser, headless=headless, user_agent=ua, viewport_width=vw, viewport_height=vh, navigation_timeout_ms=nav_timeout, ignore_https_errors=ignore_https, ffmpeg_path=ffmpeg_path, ) def require(self) -> None: """Ensure Playwright is present; raise a helpful RuntimeError if not.""" try: assert sync_playwright is not None except Exception: raise RuntimeError( "playwright is required; install with: pip install playwright; then: playwright install" ) def ffmpeg_available(self) -> bool: """Check if ffmpeg is available on the system.""" return bool(self.defaults.ffmpeg_path) def require_ffmpeg(self) -> None: """Require ffmpeg to be available; raise a helpful error if not. This should be called before operations that need ffmpeg (e.g., video recording). """ if not self.ffmpeg_available(): raise RuntimeError( "ffmpeg is required but not found on your system.\n" "Install it using:\n" " Windows: choco install ffmpeg (if using Chocolatey) or use the bundled version in MPV/ffmpeg\n" " macOS: brew install ffmpeg\n" " Linux: apt install ffmpeg (Ubuntu/Debian) or equivalent for your distribution\n" "\n" "Or set the PLAYWRIGHT_FFMPEG_PATH environment variable to point to your ffmpeg executable." ) @contextlib.contextmanager def open_page( self, *, headless: Optional[bool] = None, user_agent: Optional[str] = None, viewport_width: Optional[int] = None, viewport_height: Optional[int] = None, ignore_https_errors: Optional[bool] = None, accept_downloads: bool = False, ) -> Iterator[Any]: """Context manager yielding a Playwright page with sane defaults.""" self.require() h = self.defaults.headless if headless is None else bool(headless) ua = self.defaults.user_agent if user_agent is None else str(user_agent) vw = self.defaults.viewport_width if viewport_width is None else int( viewport_width ) vh = self.defaults.viewport_height if viewport_height is None else int( viewport_height ) ihe = ( self.defaults.ignore_https_errors if ignore_https_errors is None else bool(ignore_https_errors) ) # Support Playwright-native headers/user-agent. # If user_agent is unset/empty or explicitly set to one of these tokens, # we omit the user_agent override so Playwright uses its bundled Chromium UA. ua_value: Optional[str] ua_text = str(ua or "").strip() if not ua_text or ua_text.lower() in {"native", "playwright", "default"}: ua_value = None else: ua_value = ua_text pw = None browser = None context = None try: assert sync_playwright is not None pw = sync_playwright().start() browser_type = getattr(pw, self.defaults.browser, None) if browser_type is None: browser_type = pw.chromium browser = browser_type.launch( headless=h, args=["--disable-blink-features=AutomationControlled"], ) context_kwargs: Dict[str, Any] = { "viewport": { "width": vw, "height": vh }, "ignore_https_errors": ihe, "accept_downloads": bool(accept_downloads), } if ua_value is not None: context_kwargs["user_agent"] = ua_value context = browser.new_context(**context_kwargs) page = context.new_page() yield page finally: try: if context is not None: context.close() except Exception: pass try: if browser is not None: browser.close() except Exception: pass try: if pw is not None: pw.stop() except Exception: pass def goto(self, page: Any, url: str) -> None: """Navigate with configured timeout.""" try: page.goto( url, timeout=int(self.defaults.navigation_timeout_ms), wait_until="domcontentloaded" ) except Exception: raise def download_file( self, url: str, *, selector: str = "form#dl_form button[type=submit]", out_dir: Optional[Union[str, Path]] = None, timeout_sec: int = 60, headless_first: bool = False, debug_mode: bool = False, ) -> PlaywrightDownloadResult: """Download a file by clicking a selector and capturing the response. The helper mirrors the standalone `scripts/playwright_fetch.py` logic and tries multiple click strategies (expect_download, tooltip continue, submitDL, JS/mouse click) to coax stubborn sites. """ try: self.require() except Exception as exc: return PlaywrightDownloadResult(ok=False, error=str(exc)) out_path_base = _resolve_out_dir(out_dir) timeout_ms = max(10_000, int(timeout_sec) * 1000 if timeout_sec is not None else int(self.defaults.navigation_timeout_ms)) nav_timeout_ms = max(timeout_ms, int(self.defaults.navigation_timeout_ms)) selector_timeout_ms = 10_000 # Preserve legacy behaviour: headless_first=False tries headful then headless; True reverses the order. order = [True, False] if headless_first else [False, True] seen = set() modes = [] for m in order: if m in seen: continue seen.add(m) modes.append(m) last_error: Optional[str] = None for mode in modes: try: if debug_mode: debug(f"[playwright] download url={url} selector={selector} headless={mode} out_dir={out_path_base}") with self.open_page(headless=mode, accept_downloads=True) as page: page.goto(url, wait_until="networkidle", timeout=nav_timeout_ms) page.wait_for_selector(selector, timeout=selector_timeout_ms) self._wait_for_block_clear(page, timeout_ms=6000) el = page.query_selector(selector) # 1) Direct click with expect_download try: with page.expect_download(timeout=timeout_ms) as dl_info: if el: el.click() else: page.click(selector) dl = dl_info.value filename = dl.suggested_filename or Path(dl.url).name or "download" out_path = out_path_base / filename dl.save_as(str(out_path)) return PlaywrightDownloadResult(ok=True, path=out_path, url=dl.url, mode="download") except PlaywrightTimeoutError: last_error = "download timeout" except Exception as click_exc: last_error = str(click_exc) or last_error # 2) Tooltip continue flow try: btn = page.query_selector("#tooltip4 input[type=button]") if btn: btn.click() with page.expect_download(timeout=timeout_ms) as dl_info: if el: el.click() else: page.click(selector) dl = dl_info.value filename = dl.suggested_filename or Path(dl.url).name or "download" out_path = out_path_base / filename dl.save_as(str(out_path)) return PlaywrightDownloadResult(ok=True, path=out_path, url=dl.url, mode="tooltip-download") except Exception as tooltip_exc: last_error = str(tooltip_exc) or last_error # 3) Submit handler that respects tooltip flow try: page.evaluate("() => { try { submitDL(document.forms['dl_form'], 'tooltip4'); } catch (e) {} }") resp = page.wait_for_response( lambda r: r.status == 200 and any(k.lower() == 'content-disposition' for k in r.headers.keys()), timeout=timeout_ms, ) if resp: out_path = self._save_response(resp, out_path_base) if out_path: return PlaywrightDownloadResult(ok=True, path=out_path, url=getattr(resp, "url", None), mode="response") except Exception as resp_exc: last_error = str(resp_exc) or last_error # 4) JS/mouse click and capture response try: if el: try: page.evaluate("el => el.click()", el) except Exception: page.evaluate(f"() => document.querySelector('{selector}').click()") else: page.evaluate(f"() => document.querySelector('{selector}').click()") if el: try: box = el.bounding_box() if box: page.mouse.move(box['x'] + box['width'] / 2, box['y'] + box['height'] / 2) page.mouse.click(box['x'] + box['width'] / 2, box['y'] + box['height'] / 2) except Exception: pass resp = page.wait_for_response( lambda r: r.status == 200 and any(k.lower() == 'content-disposition' for k in r.headers.keys()), timeout=timeout_ms, ) if resp: out_path = self._save_response(resp, out_path_base) if out_path: return PlaywrightDownloadResult(ok=True, path=out_path, url=getattr(resp, "url", None), mode="response-fallback") except Exception as final_exc: last_error = str(final_exc) or last_error except Exception as exc: last_error = str(exc) if debug_mode: try: debug(f"[playwright] attempt failed (headless={mode}): {traceback.format_exc()}") except Exception: pass continue return PlaywrightDownloadResult(ok=False, error=last_error or "no download captured") def debug_dump(self) -> None: try: debug( f"[playwright] browser={self.defaults.browser} headless={self.defaults.headless} " f"viewport={self.defaults.viewport_width}x{self.defaults.viewport_height} " f"nav_timeout_ms={self.defaults.navigation_timeout_ms}" ) except Exception: pass def _wait_for_block_clear(self, page: Any, timeout_ms: int = 8000) -> bool: try: page.wait_for_function( "() => { for (const k in window) { if (Object.prototype.hasOwnProperty.call(window, k) && k.startsWith('blocked_')) { try { return window[k] === false; } catch(e) {} return false; } } return true; }", timeout=timeout_ms, ) return True except Exception: return False def _save_response(self, response: Any, out_dir: Path) -> Optional[Path]: try: cd = "" try: headers = getattr(response, "headers", {}) or {} cd = "".join([v for k, v in headers.items() if str(k).lower() == "content-disposition"]) except Exception: cd = "" filename = _find_filename_from_cd(cd) or Path(str(getattr(response, "url", "") or "")).name or "download" body = response.body() out_path = out_dir / filename out_path.write_bytes(body) return out_path except Exception as exc: try: debug(f"[playwright] failed to save response: {exc}") except Exception: pass return None