from __future__ import annotations

import sys
from typing import Any, Dict, List, Optional
from urllib.parse import quote_plus, urlparse

from ProviderCore.base import Provider, SearchResult
from SYS.logger import log, debug

try:
    from playwright.sync_api import sync_playwright
except ImportError:  # pragma: no cover
    # Playwright is optional; every entry point checks for None before use.
    sync_playwright = None


class Bandcamp(Provider):
    """Search provider for Bandcamp.

    Scrapes Bandcamp's search pages and artist discography pages with a
    headless Chromium instance driven by Playwright. When Playwright is not
    installed, ``search`` returns no results, ``selector`` declines to handle
    the selection, and ``validate`` reports ``False``.
    """

    @staticmethod
    def _base_url(raw_url: str) -> str:
        """Normalize a Bandcamp URL down to ``scheme://netloc``.

        Args:
            raw_url: Any URL-like value; ``None`` and empty values are accepted.

        Returns:
            ``""`` for empty input, ``scheme://netloc`` when both parts can be
            parsed, otherwise the stripped input text unchanged.
        """
        text = str(raw_url or "").strip()
        if not text:
            return ""
        try:
            parsed = urlparse(text)
            if not parsed.scheme or not parsed.netloc:
                return text
            return f"{parsed.scheme}://{parsed.netloc}"
        except Exception:
            # Best effort: hand back the unparsed text rather than failing.
            return text

    @classmethod
    def _discography_url(cls, raw_url: str) -> str:
        """Return the ``/music`` URL for the site that ``raw_url`` belongs to.

        Returns ``""`` when ``raw_url`` normalizes to an empty base URL.
        """
        base = cls._base_url(raw_url)
        if not base:
            return ""
        # Bandcamp discography lives under /music.
        return base.rstrip("/") + "/music"

    @staticmethod
    def _build_search_url(query: str) -> str:
        """Build the Bandcamp search URL for ``query``.

        A query starting with ``artist:`` (case-insensitive, surrounding
        whitespace ignored) searches with ``item_type=b`` using the text after
        the prefix, with surrounding whitespace and double quotes removed.
        Any other query searches with ``item_type=a``. The search term is
        percent-encoded so characters such as ``&``, ``#``, ``+`` or spaces
        cannot alter the query string.
        """
        text = str(query or "").strip()
        prefix = "artist:"
        # Compare and slice the same stripped text so leading whitespace in the
        # raw query cannot shift where the prefix is cut off.
        if text[: len(prefix)].lower() == prefix:
            term = text[len(prefix):].strip().strip('"')
            item_type = "b"
        else:
            term = text
            item_type = "a"
        return f"https://bandcamp.com/search?q={quote_plus(term)}&item_type={item_type}"

    def _scrape_artist_page(
        self, page: Any, artist_url: str, limit: int = 50
    ) -> List[SearchResult]:
        """Scrape an artist page for albums/tracks (discography).

        Args:
            page: Page object used for navigation and element queries
                (presumably a Playwright sync ``Page`` - it is created by
                ``browser.new_page()`` in ``selector``).
            artist_url: Any URL on the artist's site; only scheme and host are
                used.
            limit: Maximum number of grid items to convert into results.

        Returns:
            One ``SearchResult`` per parsed grid item, in page order. Items
            that have no link, no ``href``, or that raise while being parsed
            are skipped. Returns ``[]`` when ``artist_url`` is empty.
        """
        base = self._base_url(artist_url)
        discography_url = self._discography_url(artist_url)
        if not base or not discography_url:
            return []

        debug(f"[bandcamp] Scraping artist page: {discography_url}")
        page.goto(discography_url)
        page.wait_for_load_state("domcontentloaded")

        results: List[SearchResult] = []
        cards = page.query_selector_all("li.music-grid-item") or []
        if not cards:
            # Fallback selector
            cards = page.query_selector_all(".music-grid-item") or []

        site_root = base.rstrip("/")
        for item in cards[:limit]:
            try:
                link = item.query_selector("a")
                if not link:
                    continue
                href = str(link.get_attribute("href") or "").strip()
                if not href:
                    continue

                # Resolve root-relative and bare-relative hrefs against the
                # artist site; absolute URLs are used as they are.
                if href.startswith("/"):
                    target = site_root + href
                elif href.startswith(("http://", "https://")):
                    target = href
                else:
                    target = site_root + "/" + href

                title_node = item.query_selector("p.title") or item.query_selector(".title")
                # Collapse all runs of whitespace in the title to single spaces.
                title = " ".join(title_node.inner_text().split()) if title_node else ""
                if not title:
                    # No usable title element: fall back to the last URL segment.
                    title = target.rsplit("/", 1)[-1]

                if "/album/" in target:
                    kind = "album"
                elif "/track/" in target:
                    kind = "track"
                else:
                    kind = "item"

                results.append(
                    SearchResult(
                        table="bandcamp",
                        title=title,
                        path=target,
                        detail="",
                        annotations=[kind],
                        media_kind="audio",
                        columns=[
                            ("Title", title),
                            ("Type", kind),
                            ("Url", target),
                        ],
                        full_metadata={
                            "type": kind,
                            "url": target,
                            "artist_url": base,
                        },
                    )
                )
            except Exception as exc:
                # One malformed card must not abort the whole discography.
                debug(f"[bandcamp] Error parsing artist item: {exc}")

        return results

    def selector(
        self,
        selected_items: List[Any],
        *,
        ctx: Any,
        stage_is_last: bool = True,
        **_kwargs: Any
    ) -> bool:
        """Handle Bandcamp `@N` selection.

        If the selected item is an ARTIST result, selecting it auto-expands into
        a discography table by scraping the artist URL. Only the first selected
        artist is expanded.

        Args:
            selected_items: Selected rows; each is either a dict or an object
                exposing ``to_dict()`` and/or ``title``/``url``/``path``/
                ``metadata``/``extra`` attributes.
            ctx: Pipeline context; ``set_last_result_table`` and
                ``set_current_stage_table`` are called on it (best effort).
            stage_is_last: The selection is only handled when this is true.

        Returns:
            ``False`` when the selection is not handled here (not the last
            stage, Playwright missing, no artist item selected, or the table
            modules cannot be imported). ``True`` once an artist selection was
            taken on, including when the discography lookup itself failed.
        """
        if not stage_is_last:
            return False

        if sync_playwright is None:
            return False

        # Only handle artist selections.
        chosen: List[Dict[str, Any]] = []
        for item in selected_items or []:
            payload: Dict[str, Any] = {}
            if isinstance(item, dict):
                payload = item
            else:
                try:
                    if hasattr(item, "to_dict"):
                        payload = item.to_dict()  # type: ignore[assignment]
                except Exception:
                    payload = {}
                if not isinstance(payload, dict):
                    # A non-dict to_dict() result would break the .get() calls
                    # below; treat it as missing and use the attribute fallback.
                    payload = {}
                if not payload:
                    try:
                        payload = {
                            "title": getattr(item, "title", None),
                            "url": getattr(item, "url", None),
                            "path": getattr(item, "path", None),
                            "metadata": getattr(item, "metadata", None),
                            "extra": getattr(item, "extra", None),
                        }
                    except Exception:
                        payload = {}

            meta = payload.get("metadata") or payload.get("full_metadata") or {}
            if not isinstance(meta, dict):
                meta = {}
            extra = payload.get("extra")
            if isinstance(extra, dict):
                # Keys in "extra" override same-named metadata keys.
                meta = {**meta, **extra}

            type_val = str(meta.get("type") or "").strip().lower()
            if type_val != "artist":
                continue

            title = str(payload.get("title") or "").strip()
            url_val = str(
                payload.get("url") or payload.get("path") or meta.get("url") or ""
            ).strip()
            base = self._base_url(url_val)
            if not base:
                continue

            chosen.append(
                {
                    "title": title,
                    "url": base,
                    "location": str(meta.get("artist") or "").strip(),
                }
            )

        if not chosen:
            return False

        # Build a new table from artist discography.
        try:
            from SYS.result_table import ResultTable
            from SYS.rich_display import stdout_console
        except Exception:
            return False

        artist_title = chosen[0].get("title") or "artist"
        artist_url = chosen[0].get("url") or ""

        try:
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=True)
                try:
                    page = browser.new_page()
                    discography = self._scrape_artist_page(page, artist_url, limit=50)
                finally:
                    # Close the browser even when scraping raises.
                    browser.close()
        except Exception as exc:
            print(f"bandcamp artist lookup failed: {exc}\n")
            # The selection was ours to handle even though the lookup failed.
            return True

        table = ResultTable(f"Bandcamp: artist:{artist_title}").set_preserve_order(True)
        table.set_table("bandcamp")
        try:
            table.set_value_case("lower")
        except Exception:
            # Deliberate best effort: value casing is cosmetic.
            pass

        results_payload: List[Dict[str, Any]] = []
        for r in discography:
            table.add_result(r)
            try:
                results_payload.append(r.to_dict())
            except Exception:
                # Minimal stand-in payload when the result cannot serialize itself.
                results_payload.append(
                    {
                        "table": "bandcamp",
                        "title": getattr(r, "title", ""),
                        "path": getattr(r, "path", ""),
                    }
                )

        try:
            ctx.set_last_result_table(table, results_payload)
            ctx.set_current_stage_table(table)
        except Exception:
            # Deliberate best effort: still try to display the table below.
            pass

        try:
            stdout_console().print()
            stdout_console().print(table)
        except Exception:
            # Deliberate best effort: a display failure must not fail the selection.
            pass

        return True

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> List[SearchResult]:
        """Search Bandcamp and return the scraped results.

        Args:
            query: Free-text query. A leading ``artist:`` prefix switches the
                search to ``item_type=b``; otherwise ``item_type=a`` is used.
            limit: Maximum number of results to return.
            filters: Accepted for interface compatibility; not used here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The scraped results, or ``[]`` when Playwright is unavailable or
            any error occurs (the error is logged to stderr).
        """
        if sync_playwright is None:
            log(
                "[bandcamp] Playwright not available. Install with: pip install playwright",
                file=sys.stderr,
            )
            return []

        try:
            search_url = self._build_search_url(query)
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=True)
                try:
                    page = browser.new_page()
                    return self._scrape_url(page, search_url, limit)
                finally:
                    # Close the browser even when scraping raises.
                    browser.close()
        except Exception as exc:
            log(f"[bandcamp] Search error: {exc}", file=sys.stderr)
            return []

    def _scrape_url(self, page: Any, url: str, limit: int) -> List[SearchResult]:
        """Load a Bandcamp search page and convert its entries into results.

        Args:
            page: Page object used for navigation and element queries
                (presumably a Playwright sync ``Page`` - it is created by
                ``browser.new_page()`` in ``search``).
            url: Fully built search URL to load.
            limit: Maximum number of ``.searchresult`` entries to convert.

        Returns:
            One ``SearchResult`` per parsed entry, in page order. Entries with
            no ``.heading`` or no link inside it, and entries that raise while
            being parsed, are skipped.
        """
        debug(f"[bandcamp] Scraping: {url}")
        page.goto(url)
        page.wait_for_load_state("domcontentloaded")

        results: List[SearchResult] = []
        search_results = page.query_selector_all(".searchresult")
        if not search_results:
            return results

        for item in search_results[:limit]:
            try:
                heading = item.query_selector(".heading")
                if not heading:
                    continue
                link = heading.query_selector("a")
                if not link:
                    continue

                title = link.inner_text().strip()
                target_url = link.get_attribute("href")
                base_url = self._base_url(str(target_url or ""))
                # The table and metadata show the site root when one can be
                # derived, otherwise the raw href; "path" keeps the raw href.
                display_url = base_url or str(target_url or "")

                subhead = item.query_selector(".subhead")
                artist = subhead.inner_text().strip() if subhead else "Unknown"

                itemtype = item.query_selector(".itemtype")
                media_type = itemtype.inner_text().strip() if itemtype else "album"

                results.append(
                    SearchResult(
                        table="bandcamp",
                        title=title,
                        path=target_url,
                        detail=f"By: {artist}",
                        annotations=[media_type],
                        media_kind="audio",
                        columns=[
                            ("Title", title),
                            ("Location", artist),
                            ("Type", media_type),
                            ("Url", display_url),
                        ],
                        full_metadata={
                            "artist": artist,
                            "type": media_type,
                            "url": display_url,
                        },
                    )
                )
            except Exception as exc:
                # One malformed entry must not abort the whole result page.
                debug(f"[bandcamp] Error parsing result: {exc}")

        return results

    def validate(self) -> bool:
        """Return ``True`` when Playwright was imported successfully."""
        return sync_playwright is not None