# Files
# Medios-Macina/tool/playwright.py
# 2026-01-09 13:41:18 -08:00
#
# 499 lines
# 19 KiB
# Python

from __future__ import annotations
import contextlib
import os
import re
import shutil
import tempfile
import traceback
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, Optional, Union
from SYS.logger import debug
from playwright.sync_api import TimeoutError as PlaywrightTimeoutError
from playwright.sync_api import sync_playwright
# Re-export for consumers (e.g. cmdlets catching navigation timeouts)
__all__ = [
"PlaywrightTimeoutError",
"PlaywrightTool",
"PlaywrightDefaults",
"PlaywrightDownloadResult",
]
def _get_nested(config: Dict[str, Any], *path: str) -> Any:
cur: Any = config
for key in path:
if not isinstance(cur, dict):
return None
cur = cur.get(key)
return cur
def _resolve_out_dir(arg_outdir: Optional[Union[str, Path]]) -> Path:
"""Resolve an output directory using config when possible."""
if arg_outdir:
p = Path(arg_outdir)
p.mkdir(parents=True, exist_ok=True)
return p
try:
from SYS.config import load_config, resolve_output_dir
cfg = load_config()
p = resolve_output_dir(cfg)
try:
p.mkdir(parents=True, exist_ok=True)
except Exception:
pass
return p
except Exception:
return Path(tempfile.mkdtemp(prefix="pwdl_"))
def _find_filename_from_cd(cd: str) -> Optional[str]:
if not cd:
return None
m = re.search(r"filename\*?=(?:UTF-8''\s*)?\"?([^\";]+)\"?", cd)
if m:
return m.group(1)
return None
@dataclass(slots=True)
class PlaywrightDefaults:
browser: str = "chromium" # chromium|firefox|webkit
headless: bool = True
user_agent: str = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
)
viewport_width: int = 1920
viewport_height: int = 1080
navigation_timeout_ms: int = 90_000
ignore_https_errors: bool = True
ffmpeg_path: Optional[str] = None # Path to ffmpeg executable; auto-detected if None
@dataclass(slots=True)
class PlaywrightDownloadResult:
ok: bool
path: Optional[Path] = None
url: Optional[str] = None
mode: Optional[str] = None
error: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"ok": bool(self.ok),
"path": str(self.path) if self.path else None,
"url": self.url,
"mode": self.mode,
"error": self.error,
}
class PlaywrightTool:
    """Small wrapper to standardize Playwright defaults and lifecycle.

    This is meant to keep cmdlets/providers from duplicating:
    - sync_playwright start/stop
    - browser launch/context creation
    - user-agent/viewport defaults
    - ffmpeg path resolution (for video recording)

    Config overrides. Each setting ``<name>`` is looked up, in order, at:
    1. ``config["tool"]["playwright"][<name>]``
    2. ``config["playwright"][<name>]``
    3. ``config["playwright_<name>"]``

    Recognised names:
    - browser ("chromium" | "firefox" | "webkit")
    - headless
    - user_agent
    - viewport_width
    - viewport_height
    - navigation_timeout_ms
    - ignore_https_errors
    - ffmpeg_path (auto-detected if not set)

    Boolean settings also accept the strings "false", "0", "no" and "off"
    (any case) as False.

    FFmpeg resolution (in order):
    1. Config key: ffmpeg_path
    2. Environment variable: PLAYWRIGHT_FFMPEG_PATH
    3. Project bundled: MPV/ffmpeg/bin/ffmpeg[.exe]
    4. System PATH: which ffmpeg
    """

    # Chromium-specific launch switch; it is not passed to Firefox/WebKit.
    _CHROMIUM_ARGS = ("--disable-blink-features=AutomationControlled",)
    _SUPPORTED_BROWSERS = frozenset({"chromium", "firefox", "webkit"})
    # String spellings that must read as False (plain bool("false") is True).
    _FALSE_STRINGS = frozenset({"", "0", "false", "no", "off"})
    # user_agent values meaning "do not override the browser's own UA".
    _NATIVE_UA_TOKENS = frozenset({"native", "playwright", "default"})

    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Copy ``config`` (shallowly) and resolve the effective defaults."""
        self._config: Dict[str, Any] = dict(config or {})
        self.defaults = self._load_defaults()

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------
    @classmethod
    def _coerce_bool(cls, value: Any) -> bool:
        """Interpret a config value as a boolean.

        Strings are compared case-insensitively against ``_FALSE_STRINGS``;
        every other string is True. Non-strings use ordinary truthiness.
        """
        if isinstance(value, str):
            return value.strip().lower() not in cls._FALSE_STRINGS
        return bool(value)

    @staticmethod
    def _resolve_ffmpeg_path(configured: Any) -> Optional[str]:
        """Return the ffmpeg executable path, or None when none can be found.

        Order: explicit config value, ``PLAYWRIGHT_FFMPEG_PATH``, the bundled
        ``MPV/ffmpeg/bin`` copy two directories above this file, then PATH.
        """
        explicit = str(configured).strip() if configured else ""
        if explicit:
            return explicit

        from_env = (os.environ.get("PLAYWRIGHT_FFMPEG_PATH") or "").strip()
        if from_env:
            return from_env

        try:
            repo_root = Path(__file__).resolve().parent.parent
            exe_name = "ffmpeg.exe" if os.name == "nt" else "ffmpeg"
            bundled = repo_root / "MPV" / "ffmpeg" / "bin" / exe_name
            if bundled.exists():
                return str(bundled)
        except (OSError, RuntimeError, NameError):
            # Best-effort probe: an unreadable tree or missing __file__ just
            # means there is no bundled copy to use.
            pass

        return shutil.which("ffmpeg")

    def _load_defaults(self) -> "PlaywrightDefaults":
        """Build the effective defaults from ``self._config``.

        Invalid values degrade to the built-in default instead of raising:
        an unknown browser becomes "chromium", and a non-numeric size or
        timeout falls back to the ``PlaywrightDefaults`` value.
        """
        cfg = self._config
        base = PlaywrightDefaults()

        tool_block = _get_nested(cfg, "tool", "playwright")
        if not isinstance(tool_block, dict):
            tool_block = {}
        pw_block = cfg.get("playwright")
        if not isinstance(pw_block, dict):
            pw_block = {}

        def lookup(name: str, fallback: Any) -> Any:
            # First non-None value wins, most specific location first.
            candidates = (
                tool_block.get(name),
                pw_block.get(name),
                cfg.get(f"playwright_{name}"),
            )
            for candidate in candidates:
                if candidate is not None:
                    return candidate
            return fallback

        def lookup_int(name: str, fallback: int) -> int:
            try:
                return int(lookup(name, fallback))
            except (TypeError, ValueError, OverflowError):
                return fallback

        browser = str(lookup("browser", base.browser)).strip().lower()
        if browser not in self._SUPPORTED_BROWSERS:
            browser = "chromium"

        return PlaywrightDefaults(
            browser=browser,
            headless=self._coerce_bool(lookup("headless", base.headless)),
            user_agent=str(lookup("user_agent", base.user_agent)),
            viewport_width=lookup_int("viewport_width", base.viewport_width),
            viewport_height=lookup_int("viewport_height", base.viewport_height),
            navigation_timeout_ms=lookup_int(
                "navigation_timeout_ms", base.navigation_timeout_ms
            ),
            ignore_https_errors=self._coerce_bool(
                lookup("ignore_https_errors", base.ignore_https_errors)
            ),
            ffmpeg_path=self._resolve_ffmpeg_path(lookup("ffmpeg_path", None)),
        )

    def require(self) -> None:
        """Ensure Playwright is present; raise a helpful RuntimeError if not.

        Uses an explicit check rather than ``assert`` so it still works when
        Python runs with ``-O``.

        Raises:
            RuntimeError: If ``sync_playwright`` is None.
        """
        if sync_playwright is None:
            raise RuntimeError(
                "playwright is required; install with: pip install playwright; then: playwright install"
            )

    # ------------------------------------------------------------------
    # Browser lifecycle
    # ------------------------------------------------------------------
    @contextlib.contextmanager
    def open_page(
        self,
        *,
        headless: Optional[bool] = None,
        user_agent: Optional[str] = None,
        viewport_width: Optional[int] = None,
        viewport_height: Optional[int] = None,
        ignore_https_errors: Optional[bool] = None,
        accept_downloads: bool = False,
    ) -> Iterator[Any]:
        """Context manager yielding a Playwright page with sane defaults.

        Every keyword left as None takes its value from ``self.defaults``.
        On exit the context and browser are closed and the Playwright driver
        is stopped, in that order, even if the body raised.

        Args:
            headless: Run without a visible window.
            user_agent: UA override. An empty value or one of "native",
                "playwright", "default" leaves the browser's own UA in place.
            viewport_width: Viewport width in pixels.
            viewport_height: Viewport height in pixels.
            ignore_https_errors: Accept invalid TLS certificates.
            accept_downloads: Allow the context to capture downloads.

        Raises:
            RuntimeError: If Playwright is unavailable (see ``require``).
        """
        self.require()
        base = self.defaults

        h = base.headless if headless is None else bool(headless)
        ua = base.user_agent if user_agent is None else str(user_agent)
        vw = base.viewport_width if viewport_width is None else int(viewport_width)
        vh = base.viewport_height if viewport_height is None else int(viewport_height)
        ihe = (
            base.ignore_https_errors
            if ignore_https_errors is None
            else bool(ignore_https_errors)
        )

        # Support Playwright-native headers/user-agent: omit the override so
        # the browser reports its own UA.
        ua_text = str(ua or "").strip()
        ua_value: Optional[str] = ua_text
        if not ua_text or ua_text.lower() in self._NATIVE_UA_TOKENS:
            ua_value = None

        pw = None
        browser = None
        context = None
        try:
            pw = sync_playwright().start()

            engine = base.browser
            browser_type = getattr(pw, engine, None)
            if browser_type is None:
                browser_type, engine = pw.chromium, "chromium"

            launch_kwargs: Dict[str, Any] = {"headless": h}
            if engine == "chromium":
                launch_kwargs["args"] = list(self._CHROMIUM_ARGS)
            browser = browser_type.launch(**launch_kwargs)

            context_kwargs: Dict[str, Any] = {
                "viewport": {"width": vw, "height": vh},
                "ignore_https_errors": ihe,
                "accept_downloads": bool(accept_downloads),
            }
            if ua_value is not None:
                context_kwargs["user_agent"] = ua_value

            context = browser.new_context(**context_kwargs)
            yield context.new_page()
        finally:
            # Teardown is best effort: a failure while closing must not mask
            # an exception raised by the caller's body, and must not stop the
            # remaining resources from being released.
            for resource, closer in (
                (context, "close"),
                (browser, "close"),
                (pw, "stop"),
            ):
                if resource is None:
                    continue
                with contextlib.suppress(Exception):
                    getattr(resource, closer)()

    def goto(self, page: Any, url: str) -> None:
        """Navigate ``page`` to ``url`` using the configured timeout.

        Waits for ``domcontentloaded``. Any exception raised by the page's
        ``goto`` propagates unchanged.
        """
        page.goto(
            url,
            timeout=int(self.defaults.navigation_timeout_ms),
            wait_until="domcontentloaded",
        )

    # ------------------------------------------------------------------
    # Download helper
    # ------------------------------------------------------------------
    def download_file(
        self,
        url: str,
        *,
        selector: str = "form#dl_form button[type=submit]",
        out_dir: Optional[Union[str, Path]] = None,
        timeout_sec: int = 60,
        headless_first: bool = False,
        debug_mode: bool = False,
    ) -> "PlaywrightDownloadResult":
        """Download a file by clicking a selector and capturing the result.

        The helper mirrors the standalone `scripts/playwright_fetch.py` logic
        and tries multiple click strategies (expect_download, tooltip continue,
        submitDL, JS/mouse click) to coax stubborn sites. The full sequence is
        attempted once per browser mode until one strategy yields a file.

        Args:
            url: Page to open.
            selector: CSS selector of the element that triggers the download.
            out_dir: Target directory; resolved via ``_resolve_out_dir``.
            timeout_sec: Per-strategy wait in seconds (minimum 10 s applied).
                None falls back to the configured navigation timeout.
            headless_first: False tries headful then headless; True reverses
                the order.
            debug_mode: Emit progress and failure tracebacks through ``debug``.

        Returns:
            A ``PlaywrightDownloadResult``. This method does not raise for
            browser/navigation failures; the last error message is returned
            in ``error`` with ``ok=False``.
        """
        try:
            self.require()
        except Exception as exc:
            return PlaywrightDownloadResult(ok=False, error=str(exc))

        out_path_base = _resolve_out_dir(out_dir)

        if timeout_sec is not None:
            requested_ms = int(timeout_sec) * 1000
        else:
            requested_ms = int(self.defaults.navigation_timeout_ms)
        timeout_ms = max(10_000, requested_ms)
        nav_timeout_ms = max(timeout_ms, int(self.defaults.navigation_timeout_ms))
        selector_timeout_ms = 10_000

        # Preserve legacy behaviour: headless_first=False tries headful then
        # headless; True reverses the order.
        modes = (True, False) if headless_first else (False, True)

        strategies = (
            self._try_direct_download,
            self._try_tooltip_download,
            self._try_submit_response,
            self._try_click_response,
        )

        last_error: Optional[str] = None
        for mode in modes:
            try:
                if debug_mode:
                    debug(f"[playwright] download url={url} selector={selector} headless={mode} out_dir={out_path_base}")
                with self.open_page(headless=mode, accept_downloads=True) as page:
                    page.goto(url, wait_until="networkidle", timeout=nav_timeout_ms)
                    page.wait_for_selector(selector, timeout=selector_timeout_ms)
                    self._wait_for_block_clear(page, timeout_ms=6000)
                    el = page.query_selector(selector)

                    for strategy in strategies:
                        try:
                            result = strategy(
                                page, el, selector, out_path_base, timeout_ms
                            )
                        except Exception as strategy_exc:
                            # Remember why it failed and move to the next strategy.
                            last_error = str(strategy_exc) or last_error
                            continue
                        if result is not None:
                            return result
            except Exception as exc:
                last_error = str(exc)
                if debug_mode:
                    with contextlib.suppress(Exception):
                        debug(f"[playwright] attempt failed (headless={mode}): {traceback.format_exc()}")
                continue

        return PlaywrightDownloadResult(ok=False, error=last_error or "no download captured")

    @staticmethod
    def _click_target(page: Any, el: Any, selector: str) -> None:
        """Click the resolved element handle, or the selector if there is none."""
        if el:
            el.click()
        else:
            page.click(selector)

    @staticmethod
    def _safe_filename(candidate: Any) -> str:
        """Reduce a remote-supplied name to a bare file name.

        Directory components (``/`` or ``\\``) are dropped so the name cannot
        escape the output directory. Returns "" when nothing usable remains.
        """
        text = str(candidate or "").replace("\\", "/")
        leaf = text.rsplit("/", 1)[-1].strip()
        return "" if leaf in {"", ".", ".."} else leaf

    @classmethod
    def _filename_from_url(cls, url: Any) -> str:
        """Return the last path segment of ``url`` (query/fragment excluded), or ""."""
        from urllib.parse import unquote, urlsplit

        try:
            path = urlsplit(str(url or "")).path
        except ValueError:
            return ""
        # Decode before sanitising so an encoded "/" cannot smuggle a directory.
        return cls._safe_filename(unquote(path))

    @staticmethod
    def _is_attachment_response(response: Any) -> bool:
        """True for a 200 response that carries a Content-Disposition header."""
        return response.status == 200 and any(
            str(key).lower() == "content-disposition" for key in response.headers
        )

    def _save_download(
        self, dl: Any, out_dir: Path, mode: str
    ) -> "PlaywrightDownloadResult":
        """Persist a Playwright download object and wrap it in a success result."""
        filename = (
            self._safe_filename(dl.suggested_filename)
            or self._filename_from_url(dl.url)
            or "download"
        )
        out_path = out_dir / filename
        dl.save_as(str(out_path))
        return PlaywrightDownloadResult(ok=True, path=out_path, url=dl.url, mode=mode)

    def _response_result(
        self, resp: Any, out_dir: Path, mode: str
    ) -> Optional["PlaywrightDownloadResult"]:
        """Write a captured response to disk; None if there is nothing saved."""
        if not resp:
            return None
        out_path = self._save_response(resp, out_dir)
        if not out_path:
            return None
        return PlaywrightDownloadResult(
            ok=True, path=out_path, url=getattr(resp, "url", None), mode=mode
        )

    def _try_direct_download(
        self, page: Any, el: Any, selector: str, out_dir: Path, timeout_ms: int
    ) -> Optional["PlaywrightDownloadResult"]:
        """Strategy 1: click and wait for a browser download event."""
        try:
            with page.expect_download(timeout=timeout_ms) as dl_info:
                self._click_target(page, el, selector)
        except PlaywrightTimeoutError as exc:
            # Keep the short, stable message callers have always seen here.
            raise RuntimeError("download timeout") from exc
        return self._save_download(dl_info.value, out_dir, "download")

    def _try_tooltip_download(
        self, page: Any, el: Any, selector: str, out_dir: Path, timeout_ms: int
    ) -> Optional["PlaywrightDownloadResult"]:
        """Strategy 2: press the tooltip button if present, then click again."""
        btn = page.query_selector("#tooltip4 input[type=button]")
        if btn:
            btn.click()
        with page.expect_download(timeout=timeout_ms) as dl_info:
            self._click_target(page, el, selector)
        return self._save_download(dl_info.value, out_dir, "tooltip-download")

    def _try_submit_response(
        self, page: Any, el: Any, selector: str, out_dir: Path, timeout_ms: int
    ) -> Optional["PlaywrightDownloadResult"]:
        """Strategy 3: call the page's ``submitDL`` handler and capture the response."""
        # The waiter is registered before the action so a fast response
        # cannot slip past between triggering and waiting.
        with page.expect_response(
            self._is_attachment_response, timeout=timeout_ms
        ) as resp_info:
            page.evaluate("() => { try { submitDL(document.forms['dl_form'], 'tooltip4'); } catch (e) {} }")
        return self._response_result(resp_info.value, out_dir, "response")

    def _try_click_response(
        self, page: Any, el: Any, selector: str, out_dir: Path, timeout_ms: int
    ) -> Optional["PlaywrightDownloadResult"]:
        """Strategy 4: JS click plus a real mouse click, then capture the response."""
        # The selector is passed as an argument, never spliced into the
        # script, so quotes in it cannot break (or inject into) the JS.
        click_by_selector = "sel => document.querySelector(sel).click()"

        with page.expect_response(
            self._is_attachment_response, timeout=timeout_ms
        ) as resp_info:
            if el:
                try:
                    page.evaluate("el => el.click()", el)
                except Exception:
                    page.evaluate(click_by_selector, selector)
                # Follow up with a physical click at the element's centre;
                # purely opportunistic, so failures here are ignored.
                with contextlib.suppress(Exception):
                    box = el.bounding_box()
                    if box:
                        cx = box["x"] + box["width"] / 2
                        cy = box["y"] + box["height"] / 2
                        page.mouse.move(cx, cy)
                        page.mouse.click(cx, cy)
            else:
                page.evaluate(click_by_selector, selector)
        return self._response_result(resp_info.value, out_dir, "response-fallback")

    # ------------------------------------------------------------------
    # Diagnostics and low-level helpers
    # ------------------------------------------------------------------
    def debug_dump(self) -> None:
        """Log the effective browser settings; never raises."""
        with contextlib.suppress(Exception):
            debug(
                f"[playwright] browser={self.defaults.browser} headless={self.defaults.headless} "
                f"viewport={self.defaults.viewport_width}x{self.defaults.viewport_height} "
                f"nav_timeout_ms={self.defaults.navigation_timeout_ms}"
            )

    def _wait_for_block_clear(self, page: Any, timeout_ms: int = 8000) -> bool:
        """Wait until the page's ``blocked_*`` window flag reads false.

        The in-page predicate inspects the first own property of ``window``
        whose name starts with ``blocked_`` and passes when it is ``false``;
        it also passes when no such property exists.

        Returns:
            True when the predicate passed within ``timeout_ms``; False on
            timeout or any other failure.
        """
        try:
            page.wait_for_function(
                "() => { for (const k in window) { if (Object.prototype.hasOwnProperty.call(window, k) && k.startsWith('blocked_')) { try { return window[k] === false; } catch(e) {} return false; } } return true; }",
                timeout=timeout_ms,
            )
        except Exception:
            return False
        return True

    def _save_response(self, response: Any, out_dir: Path) -> Optional[Path]:
        """Write a response body into ``out_dir`` and return the file path.

        The file name comes from the Content-Disposition header, then from
        the response URL's last path segment, then "download". Names are
        reduced to a bare file name first, since both sources are controlled
        by the remote server.

        Returns:
            The written path, or None if anything failed (the failure is
            reported through ``debug``).
        """
        try:
            try:
                headers = getattr(response, "headers", {}) or {}
                cd = "".join(
                    value
                    for key, value in headers.items()
                    if str(key).lower() == "content-disposition"
                )
            except Exception:
                cd = ""

            filename = (
                self._safe_filename(_find_filename_from_cd(cd))
                or self._filename_from_url(getattr(response, "url", ""))
                or "download"
            )
            body = response.body()
            out_path = out_dir / filename
            out_path.write_bytes(body)
            return out_path
        except Exception as exc:
            with contextlib.suppress(Exception):
                debug(f"[playwright] failed to save response: {exc}")
            return None