nose
2025-12-01 01:10:16 -08:00
parent 2b93edac10
commit 6b9ed7d4ab
17 changed files with 1644 additions and 470 deletions

@@ -28,9 +28,16 @@ from typing import Any, Dict, List, Optional, Sequence, Tuple
from dataclasses import dataclass
from pathlib import Path
import sys
try:
    from playwright.sync_api import sync_playwright
    PLAYWRIGHT_AVAILABLE = True
except ImportError:
    PLAYWRIGHT_AVAILABLE = False
import subprocess
import json
import shutil
from urllib.parse import quote_plus, urlparse
from helper.logger import log, debug
@@ -1580,8 +1587,293 @@ class YoutubeSearchProvider(SearchProvider):
        return shutil.which("yt-dlp") is not None
class BandcampProvider(SearchProvider):
    """
    Search provider for Bandcamp using a Playwright-based scraper.

    Supports free-text search (with optional "artist:", "album:",
    "track:" or "label:" prefixes selecting the Bandcamp item_type)
    and direct scraping of artist or album page URLs.
    """
    RESULT_FIELDS = [
        ("name", "Name", None),
        ("artist", "Artist/Loc", None),
        ("type", "Type", None)
    ]
    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[SearchResult]:
        if not PLAYWRIGHT_AVAILABLE:
            log("Playwright library not available. Install it with 'pip install playwright', then fetch a browser via 'playwright install chromium'.")
            return []
        results = []
        try:
            with sync_playwright() as p:
                # Launch browser (headless)
                browser = p.chromium.launch(headless=True)
                page = browser.new_page()
                # Check if query is a URL (Artist/Album Scraping Mode)
                if query.startswith("http://") or query.startswith("https://"):
                    try:
                        return self._scrape_url(page, query, limit)
                    finally:
                        browser.close()
                # Search Mode
                # Parse the query for a type prefix; match it case-insensitively
                # but preserve the case of the search terms themselves.
                search_type = "t"  # Default to track
                clean_query = query
                prefix_map = {
                    "artist:": "b",
                    "album:": "a",
                    "track:": "t",
                    "label:": "b",
                }
                q_lower = query.lower()
                for prefix, type_code in prefix_map.items():
                    if q_lower.startswith(prefix):
                        search_type = type_code
                        clean_query = query[len(prefix):].strip()
                        break
                # Filters override prefix
                if filters:
                    ftype = filters.get("type", "").lower()
                    if ftype in ["album", "albums"]:
                        search_type = "a"
                    elif ftype in ["artist", "artists", "label", "labels"]:
                        search_type = "b"
                    elif ftype in ["track", "tracks"]:
                        search_type = "t"
                # Construct URL with item_type; quote the query so spaces and
                # special characters survive the round-trip.
                url = f"https://bandcamp.com/search?q={quote_plus(clean_query)}&item_type={search_type}"
                debug(f"[Bandcamp] Navigating to search URL: {url}")
                page.goto(url)
                page.wait_for_load_state("domcontentloaded")
                # Wait for results
                try:
                    # Wait for the search results to appear in the DOM
                    page.wait_for_selector(".searchresult", timeout=10000)
                except Exception as e:
                    # No results found or timeout
                    log(f"Bandcamp search timeout or no results: {e}")
                    browser.close()
                    return []
                # Extract items
                items = page.query_selector_all(".searchresult")
                debug(f"[Bandcamp] Found {len(items)} results")
                for item in items:
                    if len(results) >= limit:
                        break
                    try:
                        # Extract data
                        heading_el = item.query_selector(".heading a")
                        if not heading_el:
                            debug("[Bandcamp] Skipping item: No heading found")
                            continue
                        name = heading_el.inner_text().strip()
                        item_url = heading_el.get_attribute("href")
                        # Clean URL (remove query params)
                        if item_url and "?" in item_url:
                            item_url = item_url.split("?")[0]
                        item_type_el = item.query_selector(".itemtype")
                        item_type = item_type_el.inner_text().strip() if item_type_el else "Unknown"
                        subhead_el = item.query_selector(".subhead")
                        subhead = subhead_el.inner_text().strip() if subhead_el else ""
                        art_el = item.query_selector(".art img")
                        img = art_el.get_attribute("src") if art_el else None
                        # Map to metadata
                        metadata = {
                            "name": name,
                            "type": item_type,
                            "url": item_url,
                            "img": img,
                            "subhead": subhead
                        }
                        # Refine metadata based on type: albums and tracks carry
                        # "by <artist>" subheads, artists/labels carry a location.
                        artist_or_loc = subhead
                        item_type_upper = item_type.upper()
                        if "ALBUM" in item_type_upper or "TRACK" in item_type_upper:
                            artist_or_loc = subhead.replace("by ", "").strip()
                            metadata["artist"] = artist_or_loc
                        elif "ARTIST" in item_type_upper or "LABEL" in item_type_upper:
                            metadata["location"] = subhead
                        columns = [
                            ("Name", name),
                            ("Artist/Loc", artist_or_loc),
                            ("Type", item_type)
                        ]
                        results.append(SearchResult(
                            origin="bandcamp",
                            title=name,
                            target=item_url,
                            full_metadata=metadata,
                            columns=columns
                        ))
                    except Exception as e:
                        # Skip malformed items
                        debug(f"[Bandcamp] Error parsing item: {e}")
                        continue
                browser.close()
        except Exception as e:
            log(f"Bandcamp search error: {e}")
            return []
        return results
    def _scrape_url(self, page, url: str, limit: int) -> List[SearchResult]:
        """Scrape a Bandcamp artist or album page."""
        debug(f"[Bandcamp] Scraping URL: {url}")
        # If it's an artist root page, go to /music to see all releases
        if ".bandcamp.com" in url and "/music" not in url and "/album/" not in url and "/track/" not in url:
            url = url.rstrip("/") + "/music"
            debug(f"[Bandcamp] Adjusted to music page: {url}")
        page.goto(url)
        page.wait_for_load_state("domcontentloaded")
        results = []
        # Base origin for resolving relative links, e.g. "https://artist.bandcamp.com"
        parsed = urlparse(url)
        base = f"{parsed.scheme}://{parsed.netloc}"
        # Check for grid items (Artist page /music)
        grid_items = page.query_selector_all(".music-grid-item")
        if grid_items:
            debug(f"[Bandcamp] Found {len(grid_items)} grid items")
            # Try to get global artist name from page metadata/header as fallback
            page_artist = ""
            try:
                og_site_name = page.query_selector('meta[property="og:site_name"]')
                if og_site_name:
                    page_artist = og_site_name.get_attribute("content") or ""
                if not page_artist:
                    band_name = page.query_selector('#band-name-location .title')
                    if band_name:
                        page_artist = band_name.inner_text().strip()
            except Exception:
                pass
            for item in grid_items:
                if len(results) >= limit:
                    break
                try:
                    title_el = item.query_selector(".title")
                    # Sanitize title: collapse all whitespace (including
                    # newlines, which break the results table)
                    title = title_el.inner_text().strip() if title_el else "Unknown"
                    title = " ".join(title.split())
                    link_el = item.query_selector("a")
                    href = link_el.get_attribute("href") if link_el else ""
                    if href and not href.startswith("http"):
                        # Relative link, construct full URL
                        href = base + href
                    artist_el = item.query_selector(".artist")
                    artist = artist_el.inner_text().replace("by ", "").strip() if artist_el else ""
                    # Use page artist if item artist is missing
                    if not artist and page_artist:
                        artist = page_artist
                    # Sanitize artist the same way as the title
                    artist = " ".join(artist.split())
                    columns = [
                        ("Name", title),
                        ("Artist", artist),
                        ("Type", "Album/Track")
                    ]
                    results.append(SearchResult(
                        origin="bandcamp",
                        title=title,
                        target=href,
                        full_metadata={"artist": artist},
                        columns=columns
                    ))
                except Exception as e:
                    debug(f"[Bandcamp] Error parsing grid item: {e}")
                    continue
            return results
        # Check for track list (Album page)
        track_rows = page.query_selector_all(".track_row_view")
        if track_rows:
            debug(f"[Bandcamp] Found {len(track_rows)} track rows")
            # Get Album Artist
            artist_el = page.query_selector("#name-section h3 span a")
            album_artist = artist_el.inner_text().strip() if artist_el else "Unknown"
            for row in track_rows:
                if len(results) >= limit:
                    break
                try:
                    title_el = row.query_selector(".track-title")
                    # Sanitize title
                    title = title_el.inner_text().strip() if title_el else "Unknown"
                    title = " ".join(title.split())
                    # Track link
                    link_el = row.query_selector(".title a")
                    href = link_el.get_attribute("href") if link_el else ""
                    if href and not href.startswith("http"):
                        href = base + href
                    duration_el = row.query_selector(".time")
                    duration = duration_el.inner_text().strip() if duration_el else ""
                    columns = [
                        ("Name", title),
                        ("Artist", album_artist),
                        ("Duration", duration)
                    ]
                    results.append(SearchResult(
                        origin="bandcamp",
                        title=title,
                        target=href,
                        full_metadata={"artist": album_artist, "duration": duration},
                        columns=columns
                    ))
                except Exception as e:
                    debug(f"[Bandcamp] Error parsing track row: {e}")
                    continue
            return results
        debug("[Bandcamp] No recognizable items found on page")
        return []

    def get_result_args(self) -> List[str]:
        return ["-url"]
# Provider registry
_PROVIDERS = {
    "bandcamp": BandcampProvider,
    "local": LocalStorageProvider,
    "libgen": LibGenProvider,
    "soulseek": SoulSeekProvider,