110 lines
3.4 KiB
Python
110 lines
3.4 KiB
Python
from __future__ import annotations
|
|
|
|
import sys
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from ProviderCore.base import SearchProvider, SearchResult
|
|
from SYS.logger import log, debug
|
|
|
|
try:
|
|
from playwright.sync_api import sync_playwright
|
|
except ImportError: # pragma: no cover
|
|
sync_playwright = None
|
|
|
|
|
|
class Bandcamp(SearchProvider):
    """Search provider for Bandcamp.

    Results are obtained by loading Bandcamp's public search page in a
    headless Chromium instance driven by Playwright and reading the
    ``.searchresult`` elements from the rendered DOM.
    """

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[SearchResult]:
        """Search Bandcamp and return up to ``limit`` results.

        A query starting with ``artist:`` (case-insensitive, optional
        surrounding double quotes around the name) is sent with
        ``item_type=b``; any other query is sent with ``item_type=a``.

        Args:
            query: Free-text search string, optionally ``artist:``-prefixed.
            limit: Maximum number of result elements to parse.
            filters: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The parsed results, or an empty list when Playwright is not
            installed or any error occurs during the search (the error is
            logged to stderr rather than raised).
        """
        if sync_playwright is None:
            log(
                "[bandcamp] Playwright not available. Install with: pip install playwright",
                file=sys.stderr,
            )
            return []

        try:
            search_url = self._build_search_url(query)

            with sync_playwright() as p:
                browser = p.chromium.launch(headless=True)
                try:
                    page = browser.new_page()
                    return self._scrape_url(page, search_url, limit)
                finally:
                    # Close the browser even when scraping raises, so a failed
                    # search does not leave it open until Playwright shuts down.
                    browser.close()

        except Exception as exc:
            # Best-effort provider: report the failure and yield no results.
            log(f"[bandcamp] Search error: {exc}", file=sys.stderr)
            return []

    @staticmethod
    def _build_search_url(query: str) -> str:
        """Return the Bandcamp search URL for ``query`` with the term URL-encoded."""
        # Local import keeps this block self-contained (stdlib only).
        from urllib.parse import quote_plus

        prefix = "artist:"
        text = query.strip()

        if text.lower().startswith(prefix):
            # Slice the *stripped* text so leading whitespace in the raw query
            # cannot shift the prefix boundary.
            term = text[len(prefix):].strip().strip('"')
            item_type = "b"  # NOTE(review): presumably "bands/artists" - confirm
        else:
            term = text
            item_type = "a"  # NOTE(review): presumably "albums" - confirm

        # Encode the term so characters such as '&', '#', '+' or spaces cannot
        # break or alter the query string.
        return f"https://bandcamp.com/search?q={quote_plus(term)}&item_type={item_type}"

    def _scrape_url(self, page: Any, url: str, limit: int) -> List[SearchResult]:
        """Load ``url`` in ``page`` and parse its search result elements.

        Args:
            page: A Playwright page object (sync API).
            url: Fully-formed search URL to navigate to.
            limit: Maximum number of ``.searchresult`` elements to parse.

        Returns:
            One ``SearchResult`` per successfully parsed element. Elements
            lacking a ``.heading`` link are skipped; elements whose parsing
            raises are logged via ``debug`` and skipped.
        """
        debug(f"[bandcamp] Scraping: {url}")

        page.goto(url)
        page.wait_for_load_state("domcontentloaded")

        results: List[SearchResult] = []

        search_results = page.query_selector_all(".searchresult")
        if not search_results:
            return results

        for item in search_results[:limit]:
            try:
                heading = item.query_selector(".heading")
                if not heading:
                    continue

                link = heading.query_selector("a")
                if not link:
                    continue

                title = link.inner_text().strip()
                # May be None if the anchor has no href; passed through as-is.
                target_url = link.get_attribute("href")

                subhead = item.query_selector(".subhead")
                artist = subhead.inner_text().strip() if subhead else "Unknown"

                itemtype = item.query_selector(".itemtype")
                media_type = itemtype.inner_text().strip() if itemtype else "album"

                results.append(
                    SearchResult(
                        table="bandcamp",
                        title=title,
                        path=target_url,
                        detail=f"By: {artist}",
                        annotations=[media_type],
                        media_kind="audio",
                        columns=[
                            ("Name", title),
                            ("Artist", artist),
                            ("Type", media_type),
                        ],
                        full_metadata={
                            "artist": artist,
                            "type": media_type,
                        },
                    )
                )

            except Exception as exc:
                # One malformed entry should not discard the remaining results.
                debug(f"[bandcamp] Error parsing result: {exc}")

        return results

    def validate(self) -> bool:
        """Return True when Playwright was importable, i.e. the provider can run."""
        return sync_playwright is not None
|