"""
|
|
SearchProvider: Unified interface for different search backends.
|
|
|
|
This module defines a base class and registry for search providers that can be
|
|
used by search-file and other search-related cmdlets to handle different sources:
|
|
- Local file storage (LocalStorageBackend)
|
|
- Hydrus database
|
|
- AllDebrid magnets (search-debrid)
|
|
- Library Genesis / OpenLibrary books (search-libgen)
|
|
- Soulseek P2P network (search-soulseek)
|
|
- IMDB movies (future)
|
|
- Other sources
|
|
|
|
Usage:
|
|
from helper.search_provider import SearchProvider, get_provider
|
|
|
|
provider = get_provider("libgen")
|
|
results = provider.search("python programming", limit=10)
|
|
|
|
for result in results:
|
|
print(result["title"], result["target"], result["annotations"])
|
|
"""
|
|
|
|
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import json
import shutil
import subprocess
import sys

from helper.logger import log, debug


@dataclass
class SearchResult:
    """Unified search result format across all providers."""

    # Required fields
    origin: str  # Provider name: "libgen", "soulseek", "debrid", "local", "hydrus", etc.
    title: str  # Display title/filename
    target: str  # Unique identifier or download target (URL, path, magnet hash, etc.)

    # Optional fields
    detail: str = ""  # Additional details (size, status, format, etc.)
    annotations: Optional[List[str]] = None  # Tags/annotations: ["ready", "120MB", "mp3", etc.]
    media_kind: str = "other"  # Type: "book", "audio", "video", "file", "magnet", etc.
    size_bytes: Optional[int] = None  # File size in bytes
    tags: Optional[set[str]] = None  # Searchable tags
    full_metadata: Optional[Dict[str, Any]] = None  # Extra metadata (author, year, etc.)
    columns: Optional[List[Tuple[str, str]]] = None  # Display columns: [("Header", "value"), ...] for the result table

    def __post_init__(self):
        """Ensure mutable defaults are properly initialized."""
        if self.annotations is None:
            self.annotations = []
        if self.tags is None:
            self.tags = set()
        if self.full_metadata is None:
            self.full_metadata = {}
        if self.columns is None:
            self.columns = []

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary for JSON serialization."""
        data = {
            "origin": self.origin,
            "title": self.title,
            "target": self.target,
            "detail": self.detail,
            "annotations": self.annotations,
            "media_kind": self.media_kind,
            "size_bytes": self.size_bytes,
            "tags": list(self.tags) if self.tags else [],
            "full_metadata": self.full_metadata,
        }
        if self.columns:
            data["columns"] = list(self.columns)
        return data
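
# Illustrative sketch (values invented): providers construct these for you,
# but building one by hand shows the shape downstream cmdlets consume.
#
#   result = SearchResult(
#       origin="local",
#       title="song.flac",
#       target="C:/music/song.flac",
#       annotations=["42.0MB"],
#       media_kind="audio",
#       size_bytes=44_040_192,
#   )
#   payload = result.to_dict()  # JSON-safe dict; `tags` is emitted as a list

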
class SearchProvider(ABC):
    """Abstract base class for search providers."""

    # Provider-specific field definitions: a list of
    # (api_field_name, display_column_name, formatter_func) tuples.
    # Override in subclasses to define which fields to request and how to display them.
    # Example: [("title", "Title", None),
    #           ("author_name", "Author(s)", lambda x: ", ".join(x) if isinstance(x, list) else x)]
    RESULT_FIELDS: List[Tuple[str, str, Optional[Any]]] = []

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize the provider with optional configuration.

        Args:
            config: Configuration dictionary (the global config dict)
        """
        self.config = config or {}
        self.name = self.__class__.__name__.replace("Provider", "").lower()

    @abstractmethod
    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[SearchResult]:
        """
        Search for items matching the query.

        Args:
            query: Search query string. The special value "*" means "match all".
            limit: Maximum number of results to return
            filters: Optional filtering criteria (type, size, status, etc.)
            **kwargs: Provider-specific arguments

        Returns:
            List of SearchResult objects
        """
        pass

    @abstractmethod
    def get_result_args(self) -> List[str]:
        """
        Get command-line arguments from a search result to pass to downstream cmdlets.

        Examples: libgen returns ["-url"], local and soulseek return ["-path"],
        debrid returns ["-id"]; the caller appends the result's target.

        Returns:
            List of arguments to append to the cmdlet invocation
        """
        pass

    def parse_args(self, args: Sequence[str]) -> Tuple[str, Dict[str, Any]]:
        """
        Parse provider-specific command-line arguments.

        Args:
            args: Sequence of command-line arguments

        Returns:
            Tuple of (query, filters_dict)
        """
        # Default implementation: the first arg is the query; no filters are parsed.
        query = args[0] if args else ""
        filters: Dict[str, Any] = {}
        return query, filters

    def validate(self) -> bool:
        """
        Validate that the provider is properly configured and ready to use.

        Returns:
            True if the provider is available, False otherwise
        """
        return True

    def get_columns_format(self) -> List[str]:
        """
        Define which columns this provider displays in the result table.

        Each provider can override RESULT_FIELDS to customize the table, e.g.
        ["Title", "Author", "Year"] for books, ["Title", "Duration", "Format"]
        for media, or ["Title", "Size", "Status"] for files.

        Returns:
            List of column names to display. An empty list means the
            traditional detail/origin/media_kind/target layout is used.
        """
        return [col_name for _, col_name, _ in self.RESULT_FIELDS] if self.RESULT_FIELDS else []

    def get_api_fields_string(self) -> str:
        """
        Generate a comma-separated API fields string from RESULT_FIELDS.

        Returns:
            Comma-separated string of API field names to request,
            e.g. "title,author_name,first_publish_year,isbn,key"
        """
        if not self.RESULT_FIELDS:
            return ""
        return ",".join(field_name for field_name, _, _ in self.RESULT_FIELDS)

    def build_columns_from_doc(self, doc: Dict[str, Any], idx: Optional[int] = None) -> List[Tuple[str, str]]:
        """
        Dynamically build display columns from a result document using RESULT_FIELDS.

        Args:
            doc: API response document (dict with field values)
            idx: Optional index/number for the result (added as the first column)

        Returns:
            List of (header, value) tuples ready for SearchResult.columns
        """
        columns: List[Tuple[str, str]] = []

        # Add the index as the first column if provided
        if idx is not None:
            columns.append(("#", str(idx)))

        # Process each field definition
        for api_field_name, display_col_name, formatter_func in self.RESULT_FIELDS:
            value = doc.get(api_field_name, "")

            # Apply the formatter if one is defined
            if formatter_func and value:
                value = formatter_func(value)

            # Convert to string and add to the columns
            value_str = str(value) if value else "Unknown"
            columns.append((display_col_name, value_str))

        return columns
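
# A minimal subclass sketch (hypothetical "echo" provider, illustration only):
#
#   class EchoProvider(SearchProvider):
#       RESULT_FIELDS = [("title", "Title", None)]
#
#       def search(self, query, limit=50, filters=None, **kwargs):
#           doc = {"title": query}
#           return [SearchResult(
#               origin="echo",
#               title=query,
#               target=f"echo:{query}",
#               columns=self.build_columns_from_doc(doc, idx=1),
#           )]
#
#       def get_result_args(self):
#           return ["-path"]
#
# See register_provider() below for wiring such a class into the registry.

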
class LocalStorageProvider(SearchProvider):
    """Search provider for local file system storage."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.name = "local"
        # Import here to avoid a circular dependency
        from helper.file_storage import FileStorage
        self.storage = FileStorage(config)

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[SearchResult]:
        """Search local file storage."""
        filters = filters or {}
        backend_name = filters.get("backend", "local")

        try:
            # Use the requested backend from FileStorage
            results = self.storage[backend_name].search(query, limit=limit)

            search_results = []
            for result_dict in results:
                path = result_dict.get("path", "")
                size = result_dict.get("size")
                annotations = []

                if size:
                    annotations.append(f"{size / 1e6:.1f}MB")

                search_results.append(SearchResult(
                    origin="local",
                    title=Path(path).name if path else "Unknown",  # handles both path separators
                    target=path,
                    detail=f"Local: {path}",
                    annotations=annotations,
                    size_bytes=size,
                ))

            return search_results

        except Exception as e:
            log(f"[local] Search error: {e}", file=sys.stderr)
            return []

    def get_result_args(self) -> List[str]:
        """Local storage uses the -path argument."""
        return ["-path"]

    def validate(self) -> bool:
        """Local storage is always available."""
        return True
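
# Usage sketch: "*" matches everything; the "backend" filter selects which
# FileStorage backend to query (defaults to "local"):
#
#   provider = LocalStorageProvider(config)
#   hits = provider.search("*", limit=20, filters={"backend": "local"})

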
class LibGenProvider(SearchProvider):
    """Search provider for Library Genesis books."""

    # Fields to display. Note: LibGen has no API field mapping like OpenLibrary;
    # these are extracted from the book dict directly.
    RESULT_FIELDS = [
        ("title", "Title", None),
        ("author", "Author(s)", None),
        ("year", "Year", None),
    ]

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.name = "libgen"

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[SearchResult]:
        """Search Library Genesis for books.

        Supports a structured query format:
        - isbn:0557677203
        - author:"Albert Pike"
        - title:"Book Title"
        - Combinations: isbn:0557677203 author:"Albert Pike" free text

        Priority: ISBN is the authoritative key for searching.
        """
        filters = filters or {}

        try:
            from helper.unified_book_downloader import UnifiedBookDownloader
            from helper.query_parser import parse_query, get_field, get_free_text

            debug(f"[libgen] Starting search for: {query}")

            # Parse the query to extract structured fields
            parsed = parse_query(query)
            isbn = get_field(parsed, 'isbn')
            author = get_field(parsed, 'author')
            title = get_field(parsed, 'title')
            free_text = get_free_text(parsed)

            # Build the search query for libgen.
            # Priority: isbn (authoritative key) > title > author > free text.
            if isbn:
                search_query = isbn
            elif title:
                search_query = title
            elif author:
                search_query = author
            else:
                search_query = free_text or query

            debug(f"[libgen] Built search query: {search_query}")

            downloader = UnifiedBookDownloader(config=self.config)
            search_fn = getattr(downloader, "search_libgen", None)

            if not callable(search_fn):
                log("[libgen] Searcher unavailable", file=sys.stderr)
                return []

            debug(f"[libgen] Calling search_libgen with query: {search_query}")
            books = search_fn(search_query, limit=limit)
            debug(f"[libgen] Got {len(books) if books else 0} results from search_libgen")

            search_results = []
            for idx, book in enumerate(books, 1):
                # Build columns dynamically from RESULT_FIELDS
                columns = self.build_columns_from_doc(book, idx)

                title = book.get("title", "Unknown")
                author = book.get("author", "Unknown")
                year = book.get("year", "Unknown")
                filesize = book.get("filesize_str", "Unknown")
                isbn = book.get("isbn", "")
                mirror_url = book.get("mirror_url", "")

                # Build detail with author and year
                detail = f"By: {author}"
                if year and year != "Unknown":
                    detail += f" ({year})"

                annotations = [f"{filesize}"]
                if isbn:
                    annotations.append(f"ISBN: {isbn}")

                search_results.append(SearchResult(
                    origin="libgen",
                    title=title,
                    target=mirror_url or f"libgen:{book.get('id', '')}",
                    detail=detail,
                    annotations=annotations,
                    media_kind="book",
                    columns=columns,
                    full_metadata={
                        "number": idx,
                        "author": author,
                        "year": year,
                        "isbn": isbn,
                        "filesize": filesize,
                        "mirrors": book.get("mirrors", {}),
                        "book_id": book.get("book_id", ""),
                        "md5": book.get("md5", ""),
                    },
                ))

            debug(f"[libgen] Returning {len(search_results)} formatted results")
            return search_results

        except Exception as e:
            log(f"[libgen] Search error: {e}", file=sys.stderr)
            import traceback
            log(traceback.format_exc(), file=sys.stderr)
            return []

    def get_result_args(self) -> List[str]:
        """LibGen results use -url for download (or -mirror for selection)."""
        return ["-url"]

    def validate(self) -> bool:
        """Check if the LibGen downloader is importable."""
        try:
            from helper.unified_book_downloader import UnifiedBookDownloader  # noqa: F401
            return True
        except Exception:
            return False
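
# Usage sketch of the structured query syntax handled above (parsing is
# delegated to helper.query_parser; ISBN wins over title, author, free text):
#
#   provider = LibGenProvider(config)
#   provider.search('isbn:0557677203')
#   provider.search('isbn:0557677203 author:"Albert Pike" free text')

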
class SoulSeekProvider(SearchProvider):
    """Search provider for the Soulseek P2P network."""

    # Allowed music file extensions
    MUSIC_EXTENSIONS = {
        '.flac', '.mp3', '.m4a', '.aac', '.ogg', '.opus',
        '.wav', '.alac', '.wma', '.ape', '.aiff', '.dsf',
        '.dff', '.wv', '.tta', '.tak', '.ac3', '.dts'
    }

    # Display columns for search results
    RESULT_FIELDS = [
        ("track_num", "Track", None),
        ("title", "Title", None),
        ("artist", "Artist", lambda x: (str(x)[:32] + '...') if x and len(str(x)) > 35 else x),
        ("album", "Album", lambda x: (str(x)[:32] + '...') if x and len(str(x)) > 35 else x),
        ("size", "Size", lambda x: f"{int(int(x)/1024/1024)} MB" if x else ""),
    ]

    # Soulseek config
    USERNAME = "asjhkjljhkjfdsd334"
    PASSWORD = "khhhg"
    DOWNLOAD_DIR = "./downloads"
    MAX_WAIT_TRANSFER = 1200  # seconds

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.name = "soulseek"

    async def perform_search(
        self,
        query: str,
        timeout: float = 9.0,
        limit: int = 50
    ) -> List[Dict[str, Any]]:
        """Perform an async Soulseek search and return flattened results."""
        import os

        from aioslsk.client import SoulSeekClient
        from aioslsk.settings import Settings, CredentialsSettings

        os.makedirs(self.DOWNLOAD_DIR, exist_ok=True)

        settings = Settings(credentials=CredentialsSettings(username=self.USERNAME, password=self.PASSWORD))
        client = SoulSeekClient(settings)

        try:
            await client.start()
            await client.login()
        except Exception as e:
            log(f"[soulseek] Login failed: {type(e).__name__}: {e}", file=sys.stderr)
            return []

        try:
            search_request = await client.searches.search(query)
            await self._collect_search_results(client, search_request, timeout=timeout)
            return self._flatten_search_results(search_request)[:limit]
        except Exception as e:
            log(f"[soulseek] Search error: {type(e).__name__}: {e}", file=sys.stderr)
            return []
        finally:
            try:
                await client.stop()
            except Exception:
                pass

    def _flatten_search_results(self, search_request) -> List[dict]:
        """Extract files from SearchRequest.results into flat dicts."""
        flat: List[dict] = []
        for result in search_request.results:
            username = getattr(result, "username", "?")

            for file_data in getattr(result, "shared_items", []):
                flat.append({
                    "file": file_data,
                    "username": username,
                    "filename": getattr(file_data, "filename", "?"),
                    "size": getattr(file_data, "filesize", 0),
                })

            for file_data in getattr(result, "locked_results", []):
                flat.append({
                    "file": file_data,
                    "username": username,
                    "filename": getattr(file_data, "filename", "?"),
                    "size": getattr(file_data, "filesize", 0),
                })

        return flat

    async def _collect_search_results(self, client, search_request, timeout: float = 75.0) -> None:
        """Collect search results by polling until the timeout elapses."""
        import asyncio
        import time

        debug(f"[soulseek] Collecting results for {timeout}s...")
        end = time.time() + timeout
        last_count = 0
        while time.time() < end:
            current_count = len(search_request.results)
            if current_count > last_count:
                debug(f"[soulseek] Got {current_count} result(s) so far...")
                last_count = current_count
            await asyncio.sleep(0.5)

    async def download_file(
        self,
        username: str,
        filename: str,
        file_size: int,
        target_dir: Optional[str] = None
    ) -> bool:
        """Download a file from Soulseek into a specific directory."""
        import os

        from aioslsk.client import SoulSeekClient
        from aioslsk.settings import Settings, CredentialsSettings

        download_dir = target_dir if target_dir else self.DOWNLOAD_DIR
        os.makedirs(download_dir, exist_ok=True)

        settings = Settings(credentials=CredentialsSettings(username=self.USERNAME, password=self.PASSWORD))
        settings.shares.download = download_dir
        client = SoulSeekClient(settings)

        try:
            await client.start()
            await client.login()

            debug(f"[soulseek] Starting: {filename} from {username}")

            transfer = await client.transfers.download(username, filename)
            if transfer is None:
                log("[soulseek] Failed: transfer object is None")
                return False

            return await self._wait_for_transfer(client, transfer, file_size=file_size, max_wait=self.MAX_WAIT_TRANSFER)

        except Exception as e:
            log(f"[soulseek] Download error: {type(e).__name__}: {e}", file=sys.stderr)
            return False

        finally:
            try:
                await client.stop()
            except Exception:
                pass

    async def _wait_for_transfer(self, client, transfer_obj: Any, file_size: Any = None, max_wait: float = 1200) -> bool:
        """Wait for a transfer to finish using event listeners and a tqdm progress bar.

        Returns:
            True if the transfer completed successfully, False if it failed or timed out.
        """
        import asyncio
        import time

        from aioslsk.events import TransferProgressEvent
        from tqdm import tqdm

        if transfer_obj is None:
            log("[soulseek] No transfer object returned")
            return False

        transfer_finished = False
        transfer_success = False
        pbar = None
        total_size = file_size
        last_speed_time = time.time()
        last_speed = 0

        async def on_progress(event):
            nonlocal last_speed_time, last_speed, transfer_finished, transfer_success, pbar, total_size
            if not hasattr(event, 'updates') or not event.updates:
                return

            for transfer, _, curr_snapshot in event.updates:
                if (transfer.username == transfer_obj.username and transfer.remote_path == transfer_obj.remote_path):
                    bytes_xfer = getattr(curr_snapshot, 'bytes_transfered', 0)
                    state_name = curr_snapshot.state.name if hasattr(curr_snapshot, 'state') else "?"
                    speed = getattr(curr_snapshot, 'speed', 0)

                    if total_size is None and hasattr(transfer, 'file_attributes'):
                        try:
                            size = getattr(transfer, 'file_size', None) or getattr(transfer, 'size', None)
                            if size:
                                total_size = size
                        except Exception:
                            pass

                    if pbar is None:
                        # Fall back to a nominal 100 MB total when the size is unknown
                        total = total_size if total_size else 100 * 1024 * 1024
                        pbar = tqdm(total=total, unit='B', unit_scale=True, desc='[transfer]')

                    if pbar:
                        pbar.n = bytes_xfer
                        if speed > 0:
                            pbar.set_postfix({"speed": f"{speed/1024:.1f} KB/s", "state": state_name})
                        pbar.refresh()

                    if state_name in ('FINISHED', 'COMPLETE'):
                        if pbar:
                            pbar.close()
                        debug(f"[soulseek] Transfer {state_name.lower()}")
                        transfer_finished = True
                        transfer_success = True
                        return
                    elif state_name in ('ABORTED', 'FAILED', 'PAUSED'):
                        if pbar:
                            pbar.close()
                        debug(f"[soulseek] Transfer {state_name.lower()}")
                        transfer_finished = True
                        transfer_success = False
                        return

                    if total_size and bytes_xfer >= total_size:
                        if pbar:
                            pbar.close()
                        debug(f"[soulseek] Transfer complete ({bytes_xfer / 1024 / 1024:.1f} MB)")
                        transfer_finished = True
                        transfer_success = True
                        return

                    # Heuristic: a zero speed after bytes have moved for >3s is
                    # treated as a completed transfer whose final state event was missed.
                    if speed == 0 and bytes_xfer > 0:
                        now = time.time()
                        if now - last_speed_time > 3:
                            if pbar:
                                pbar.close()
                            debug(f"[soulseek] Transfer complete ({bytes_xfer / 1024 / 1024:.1f} MB)")
                            transfer_finished = True
                            transfer_success = True
                            return
                    else:
                        last_speed_time = time.time()

                    last_speed = speed

        client.events.register(TransferProgressEvent, on_progress)
        end = time.time() + max_wait

        while time.time() < end:
            if transfer_finished:
                break
            await asyncio.sleep(0.5)

        client.events.unregister(TransferProgressEvent, on_progress)

        if pbar:
            pbar.close()

        if not transfer_finished:
            log(f"[soulseek] Timed out after {max_wait}s; transfer may still be in progress")
            return False
        return transfer_success

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[SearchResult]:
        """Search the Soulseek P2P network (synchronous wrapper)."""
        import asyncio
        import re

        filters = filters or {}

        try:
            # Run the async search
            flat_results = asyncio.run(self.perform_search(query, timeout=9.0, limit=limit))

            if not flat_results:
                return []

            # Filter to music files only
            music_results = []
            for item in flat_results:
                filename = item['filename']
                if '.' in filename:
                    ext = '.' + filename.rsplit('.', 1)[-1].lower()
                else:
                    ext = ''

                if ext in self.MUSIC_EXTENSIONS:
                    music_results.append(item)

            if not music_results:
                return []

            # Extract metadata for all results
            enriched_results = []
            for item in music_results:
                filename = item['filename']

                # Extract the extension
                if '.' in filename:
                    _, ext = filename.rsplit('.', 1)
                    ext = '.' + ext.lower()
                else:
                    ext = ''

                # Get the display filename (basename of either path style)
                if '\\' in filename:
                    display_name = filename.rsplit('\\', 1)[-1]
                elif '/' in filename:
                    display_name = filename.rsplit('/', 1)[-1]
                else:
                    display_name = filename

                # Derive artist/album from the path hierarchy
                path_parts = filename.replace('\\', '/').split('/')
                artist = ''
                album = ''

                if len(path_parts) >= 3:
                    artist = path_parts[-3]
                    album = path_parts[-2]
                    # Strip a leading "YYYY - " prefix from album directories
                    if ' - ' in album and re.match(r'^\d{4}', album):
                        album = album.split(' - ', 1)[1]
                elif len(path_parts) == 2:
                    artist = path_parts[-2]

                # Extract the track number and title
                base_name = display_name.rsplit('.', 1)[0] if '.' in display_name else display_name
                track_num = ''
                title = base_name
                filename_artist = ''

                # First, extract the track number if present
                # (e.g. "30 Stumfol - Prisoner" -> track=30, rest="Stumfol - Prisoner")
                match = re.match(r'^(\d{1,3})\s*[\.\-]?\s+(.+)$', base_name)
                if match:
                    track_num = match.group(1)
                    remainder = match.group(2)

                    # Parse "Artist - Title" from the remainder if the separator is present
                    if ' - ' in remainder:
                        parts = remainder.split(' - ', 1)
                        filename_artist = parts[0].strip()
                        title = parts[1].strip()
                    else:
                        # No artist-title separator; use the whole remainder as the title
                        title = remainder
                else:
                    # No track number; check for an "Artist - Title" format
                    if ' - ' in base_name:
                        parts = base_name.split(' - ', 1)
                        filename_artist = parts[0].strip()
                        title = parts[1].strip()

                # Prefer the artist from the filename, falling back to the path artist
                if filename_artist:
                    artist = filename_artist

                enriched_results.append({
                    **item,
                    'artist': artist,
                    'album': album,
                    'title': title,
                    'track_num': track_num,
                    'ext': ext
                })

            # Apply filters if specified
            if filters:
                artist_filter = filters.get('artist', '').lower() if filters.get('artist') else ''
                album_filter = filters.get('album', '').lower() if filters.get('album') else ''
                track_filter = filters.get('track', '').lower() if filters.get('track') else ''

                if artist_filter or album_filter or track_filter:
                    filtered_results = []
                    for item in enriched_results:
                        if artist_filter and artist_filter not in (item['artist'] or '').lower():
                            continue
                        if album_filter and album_filter not in (item['album'] or '').lower():
                            continue
                        if track_filter and track_filter not in (item['title'] or '').lower():
                            continue
                        filtered_results.append(item)

                    enriched_results = filtered_results

            # Sort: .flac first, then by size (largest first)
            enriched_results.sort(key=lambda item: (item['ext'].lower() != '.flac', -item['size']))

            # Convert to SearchResult format
            search_results = []
            for idx, item in enumerate(enriched_results, 1):
                artist_display = item['artist'] if item['artist'] else "(no artist)"
                album_display = item['album'] if item['album'] else "(no album)"
                size_mb = int(round(item['size'] / 1024 / 1024))

                if item['track_num']:
                    track_title = f"[{item['track_num']}] {item['title']}"
                else:
                    track_title = item['title'] or "(untitled)"

                # Build columns from the enriched metadata
                columns = self.build_columns_from_doc(item, idx=idx)

                search_results.append(SearchResult(
                    origin="soulseek",
                    title=track_title,
                    target=item['filename'],
                    detail=f"Artist: {artist_display} | Album: {album_display}",
                    annotations=[f"{size_mb} MB", item['ext']],
                    media_kind="audio",
                    size_bytes=item['size'],
                    columns=columns,
                    full_metadata={
                        "artist": item['artist'],
                        "album": item['album'],
                        "track_num": item['track_num'],
                        "username": item['username'],
                        "filename": item['filename'],
                        "ext": item['ext'],
                    },
                ))

            return search_results

        except Exception as e:
            log(f"Soulseek search error: {e}", file=sys.stderr)
            return []

    def get_result_args(self) -> List[str]:
        """Soulseek results are addressed by filename/path."""
        return ["-path"]

    def validate(self) -> bool:
        """Check if the Soulseek client library is available."""
        try:
            import aioslsk  # type: ignore  # noqa: F401
            return True
        except ImportError:
            return False
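
# Usage sketch: search() blocks the caller; downloads are coroutines and need
# an event loop. The field names below come from full_metadata as built in
# search():
#
#   provider = SoulSeekProvider(config)
#   hits = provider.search("artist track", limit=10)
#   if hits:
#       import asyncio
#       meta = hits[0].full_metadata
#       ok = asyncio.run(provider.download_file(
#           meta["username"], meta["filename"], hits[0].size_bytes))

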
class DebridProvider(SearchProvider):
    """Search provider for AllDebrid magnets."""

    # Status code mappings
    STATUS_MAP = {
        0: "In Queue",
        1: "Downloading",
        2: "Compressing",
        3: "Uploading",
        4: "Ready",
        5: "Upload Failed",
        6: "Unpack Error",
        7: "Not Downloaded",
        8: "File Too Big",
        9: "Internal Error",
        10: "Download Timeout",
        11: "Deleted",
        12: "Processing Failed",
        13: "Processing Failed",
        14: "Tracker Error",
        15: "No Peers",
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.name = "debrid"
        self._magnet_files_cache = {}

    def _format_size(self, bytes_val: float) -> str:
        """Format bytes as a human-readable size."""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes_val < 1024:
                return f"{bytes_val:.2f} {unit}"
            bytes_val /= 1024
        return f"{bytes_val:.2f} PB"

    def _get_status_display(self, status_code: int) -> str:
        """Get a human-readable status for an AllDebrid status code."""
        return self.STATUS_MAP.get(status_code, f"Unknown ({status_code})")

    def _should_filter_magnet(self, status_code: int, status_text: str) -> bool:
        """Check if a magnet should be filtered out (expired/deleted/failed)."""
        return status_code in (5, 6, 7, 8, 11, 12, 13, 14)

    def _fuzzy_match(self, text: str, pattern: str) -> bool:
        """Case-insensitive substring match."""
        return pattern.lower() in text.lower()

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[SearchResult]:
        """Search AllDebrid magnets with optional status and name filtering.

        Args:
            query: Search query (magnet filename, or '*' for all)
            limit: Max results to return
            filters: Optional dict with a 'status' filter ('all', 'active', 'ready', 'error')

        Returns:
            List of SearchResult objects
        """
        filters = filters or {}

        try:
            from helper.alldebrid import AllDebridClient
            from config import get_debrid_api_key

            api_key = get_debrid_api_key(self.config)

            if not api_key:
                log("[debrid] API key not configured", file=sys.stderr)
                return []

            client = AllDebridClient(api_key)

            # Parse the status filter
            status_filter_param = filters.get('status', 'all').lower() if filters.get('status') else 'all'

            # Fetch all magnets
            response = client._request("magnet/status", {})

            if response.get("status") != "success":
                log(f"[debrid] API error: {response.get('error', 'Unknown')}", file=sys.stderr)
                return []

            magnets = response.get("data", {}).get("magnets", [])

            # Handle both list and dict formats
            if isinstance(magnets, dict):
                magnets = list(magnets.values())

            # Filter by status if specified ('all' includes everything)
            if status_filter_param == 'active':
                magnets = [m for m in magnets if m.get('statusCode', -1) in (0, 1, 2, 3)]
            elif status_filter_param == 'ready':
                magnets = [m for m in magnets if m.get('statusCode', -1) == 4]
            elif status_filter_param == 'error':
                magnets = [m for m in magnets if m.get('statusCode', -1) in (5, 6, 8, 9, 10, 12, 13, 14, 15)]

            # Filter by query (fuzzy match on filename)
            results = []
            count = 0
            for magnet in magnets:
                if count >= limit:
                    break

                filename = magnet.get("filename", "")
                status_code = magnet.get("statusCode", -1)
                status_text = magnet.get("status", "Unknown")

                # Skip expired/deleted entries unless the 'all' filter is active
                if status_filter_param != 'all' and self._should_filter_magnet(status_code, status_text):
                    continue

                # Apply the query filter (skip non-matches)
                if query and query != "*" and not self._fuzzy_match(filename, query):
                    continue

                magnet_id = magnet.get("id")
                size = magnet.get("size", 0)
                downloaded = magnet.get("downloaded", 0)
                progress = (downloaded / size * 100) if size > 0 else 0

                # Pick a status emoji
                if status_code == 4:
                    status_emoji = "✓"
                elif status_code < 4:
                    status_emoji = "⧗"
                else:
                    status_emoji = "✗"

                annotations = [self._get_status_display(status_code)]
                if size > 0:
                    annotations.append(self._format_size(size))
                if 0 < progress < 100:
                    annotations.append(f"{progress:.1f}%")

                results.append(SearchResult(
                    origin="debrid",
                    title=filename or "Unknown",
                    target=str(magnet_id),
                    detail=f"{status_emoji} {self._get_status_display(status_code)} | {self._format_size(size)}",
                    annotations=annotations,
                    media_kind="magnet",
                    size_bytes=size,
                    full_metadata={
                        "magnet_id": magnet_id,
                        "status_code": status_code,
                        "status_text": status_text,
                        "progress": progress,
                        "downloaded": downloaded,
                        "seeders": magnet.get("seeders", 0),
                        "download_speed": magnet.get("downloadSpeed", 0),
                    },
                ))

                count += 1

            # Cache file metadata for ready magnets
            if results:
                self._cache_ready_magnet_metadata(client, [r for r in results if r.full_metadata.get('status_code') == 4])

            return results

        except Exception as e:
            log(f"Debrid search error: {e}", file=sys.stderr)
            return []

    def _cache_ready_magnet_metadata(self, client, results: List[SearchResult]) -> None:
        """Cache file metadata for ready magnets."""
        if not results:
            return

        try:
            ready_ids = [r.full_metadata.get('magnet_id') for r in results if r.full_metadata.get('status_code') == 4]
            if ready_ids:
                self._magnet_files_cache = client.magnet_links(ready_ids)
                log(f"[debrid] Cached metadata for {len(self._magnet_files_cache)} ready magnet(s)", file=sys.stderr)
        except Exception as e:
            log(f"[debrid] Warning: Could not cache magnet metadata: {e}", file=sys.stderr)

    def get_magnet_metadata(self, magnet_id: int) -> Optional[Dict[str, Any]]:
        """Get cached metadata for a magnet."""
        return self._magnet_files_cache.get(str(magnet_id))

    def get_result_args(self) -> List[str]:
        """Debrid results use the magnet ID for download."""
        return ["-id"]

    def validate(self) -> bool:
        """Check if AllDebrid is configured."""
        from config import get_debrid_api_key
        return bool(get_debrid_api_key(self.config))
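
# Usage sketch: the status filter values accepted above are 'all', 'active',
# 'ready', and 'error'; '*' matches every filename.
#
#   provider = DebridProvider(config)
#   ready = provider.search("*", filters={"status": "ready"})
#   stuck = provider.search("ubuntu", filters={"status": "error"})

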
class OpenLibraryProvider(SearchProvider):
    """Search provider for OpenLibrary."""

    # Fields to request from the API and how to display them
    RESULT_FIELDS = [
        ("title", "Title", None),
        ("author_name", "Author", lambda x: ", ".join(x) if isinstance(x, list) else x),
        ("first_publish_year", "Year", None),
        ("status", "Status", None),
    ]

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.name = "openlibrary"

    def _derive_status(self, doc: Dict[str, Any]) -> tuple[str, Optional[str]]:
        """Determine the availability label and archive.org identifier."""
        ebook_access = str(doc.get("ebook_access", "") or "").strip().lower()
        has_fulltext = bool(doc.get("has_fulltext"))
        ia_entries = doc.get("ia")
        archive_id = ""
        if isinstance(ia_entries, list):
            for entry in ia_entries:
                if isinstance(entry, str) and entry.strip():
                    archive_id = entry.strip()
                    break
        elif isinstance(ia_entries, str) and ia_entries.strip():
            archive_id = ia_entries.strip()
        elif isinstance(doc.get("ocaid"), str) and doc["ocaid"].strip():
            archive_id = doc["ocaid"].strip()

        available = (
            ebook_access in {"borrowable", "public", "full"}
            or has_fulltext
            or bool(archive_id)
        )

        status = "download" if available else "?Libgen"
        return status, archive_id or None

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[SearchResult]:
        """Search OpenLibrary for books.

        Smart search that detects bare ISBN queries and falls back to a
        general title search for everything else.
        """
        filters = filters or {}

        try:
            import requests

            query_clean = query.strip()
            search_url = "https://openlibrary.org/search.json"

            # Detect the query type: a bare 10/13-digit string is treated as an ISBN
            if query_clean.isdigit() and len(query_clean) in (10, 13):
                # ISBN lookup
                url = f"https://openlibrary.org/isbn/{query_clean}.json"
                response = requests.get(url, timeout=9)
                if response.status_code == 200:
                    book_data = response.json()
                    return [self._format_isbn_result(book_data, query_clean)]
                elif response.status_code == 404:
                    return []

            # Default to a title/general search
            params = {
                "q": query_clean,
                "limit": limit,
                "fields": f"{self.get_api_fields_string()},isbn,oclc_numbers,lccn,number_of_pages_median,language,key,ebook_access,ia,has_fulltext",
            }

            response = requests.get(search_url, params=params, timeout=9)
            response.raise_for_status()
            data = response.json()

            search_results = []
            for idx, doc in enumerate(data.get("docs", []), 1):
                # Extract the OLID first (needed for metadata)
                olid = doc.get("key", "").split("/")[-1]

                # Determine status/availability
                status, archive_id = self._derive_status(doc)
                doc["status"] = status

                # Build columns dynamically from RESULT_FIELDS (now includes status)
                columns = self.build_columns_from_doc(doc, idx)

                # Extract additional metadata
                title = doc.get("title", "Unknown")
                authors = doc.get("author_name", ["Unknown"])
                year = doc.get("first_publish_year", "")
                isbn_list = doc.get("isbn", [])
                isbn = isbn_list[0] if isbn_list else ""
                oclc_list = doc.get("oclc_numbers", [])
                oclc = oclc_list[0] if oclc_list else ""
                lccn_list = doc.get("lccn", [])
                lccn = lccn_list[0] if lccn_list else ""
                pages = doc.get("number_of_pages_median", "")
                languages = doc.get("language", [])
                language = languages[0] if languages else ""

                author_str = ", ".join(authors) if authors else "Unknown"

                # Build detail with author and year
                detail = f"By: {author_str}"
                if year:
                    detail += f" ({year})"

                # Build annotations with additional info
                annotations = []
                if pages:
                    annotations.append(f"{pages} pages")
                if isbn:
                    annotations.append(f"ISBN: {isbn}")

                search_results.append(SearchResult(
                    origin="openlibrary",
                    title=title,
                    target=f"https://openlibrary.org/books/{olid}",
                    detail=detail,
                    annotations=annotations,
                    media_kind="book",
                    columns=columns,
                    full_metadata={
                        "number": idx,
                        "authors": authors,
                        "year": year,
                        "isbn": isbn,
                        "oclc": oclc,
                        "lccn": lccn,
                        "pages": pages,
                        "language": language,
                        "olid": olid,
                        "ebook_access": doc.get("ebook_access", ""),
                        "status": status,
                        "archive_id": archive_id,
                    },
                ))

            # Sort results: downloadable first, then ?Libgen, then anything else
            def sort_key(result):
                status = (result.full_metadata.get("status") or "").strip().lower()
                if status == "download":
                    return (0, result.title)
                elif status.startswith("?libgen"):
                    return (1, result.title)
                else:
                    return (2, result.title)

            search_results.sort(key=sort_key)

            # Renumber after sorting
            for new_idx, result in enumerate(search_results, 1):
                result.full_metadata["number"] = new_idx
                # Update the # column as well
                if result.columns and result.columns[0][0] == "#":
                    result.columns[0] = ("#", str(new_idx))

            return search_results

        except Exception as e:
            log(f"OpenLibrary search error: {e}", file=sys.stderr)
            return []

    def _format_isbn_result(self, book_data: Dict[str, Any], isbn: str) -> SearchResult:
        """Format a book result from the ISBN endpoint."""
        # Get the title from the book data
        title = book_data.get("title", "Unknown")

        # Get the authors
        author_list = []
        for author_key in book_data.get("authors", []):
            if isinstance(author_key, dict):
                author_list.append(author_key.get("name", ""))
            elif isinstance(author_key, str):
                author_list.append(author_key)

        author_str = ", ".join(filter(None, author_list)) if author_list else "Unknown"

        # Extract other metadata
        year = book_data.get("first_publish_year", "")
        publishers = book_data.get("publishers", [])
        publisher = publishers[0].get("name", "") if publishers and isinstance(publishers[0], dict) else ""
        pages = book_data.get("number_of_pages", "")
        languages = book_data.get("languages", [])
        language = languages[0].get("key", "").replace("/languages/", "") if languages else ""
        olid = book_data.get("key", "").split("/")[-1] if book_data.get("key") else ""

        # Build a doc for column rendering
        doc = {
            "title": title,
            "author_name": author_list,
            "first_publish_year": year,
            "ebook_access": book_data.get("ebook_access", ""),
            "has_fulltext": bool(book_data.get("ocaid")),
            "ia": [book_data.get("ocaid")] if book_data.get("ocaid") else [],
            "ocaid": book_data.get("ocaid", ""),
        }
        status, archive_id = self._derive_status(doc)
        doc["status"] = status

        # Build detail
        detail = f"By: {author_str}"
        if year:
            detail += f" ({year})"

        # Build annotations
        annotations = []
        if pages:
            annotations.append(f"{pages} pages")
        annotations.append(f"ISBN: {isbn}")

        # Build columns using the shared helper for consistency
        columns = self.build_columns_from_doc(doc, idx=1)

        return SearchResult(
            origin="openlibrary",
            title=title,
            target=f"https://openlibrary.org/books/{olid}",
            detail=detail,
            annotations=annotations,
            media_kind="book",
            columns=columns,
            full_metadata={
                "number": 1,
                "authors": author_list,
                "year": year,
                "isbn": isbn,
                "oclc": "",
                "lccn": "",
                "pages": pages,
                "language": language,
                "olid": olid,
                "publisher": publisher,
                "ebook_access": doc.get("ebook_access", ""),
                "status": status,
                "archive_id": archive_id,
            },
        )

    def get_result_args(self) -> List[str]:
        """OpenLibrary results are info/links only."""
        return ["-info"]

    def validate(self) -> bool:
        """OpenLibrary is always available (no auth needed)."""
        return True
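
# Usage sketch: a bare 10- or 13-digit query goes through the ISBN endpoint
# and yields at most one result; anything else uses the general search API:
#
#   provider = OpenLibraryProvider()
#   one = provider.search("9780140328721")        # ISBN lookup
#   many = provider.search("moby dick", limit=5)  # general search

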
class GogGamesProvider(SearchProvider):
    """Search provider for GOG Games."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.name = "gog"
        self.base_url = "https://gog-games.to"
        self.headers = {
            "Referer": "https://gog-games.to/",
            "Origin": "https://gog-games.to",
            "X-Requested-With": "XMLHttpRequest",
        }

    def _request(self, client, endpoint: str, is_json: bool = True) -> Any:
        """Helper for API requests."""
        url = f"{self.base_url}/api/web/{endpoint}"
        try:
            response = client.get(url, headers=self.headers)
            if response.status_code == 200:
                return response.json() if is_json else response.text
            elif response.status_code == 404:
                return None
            else:
                log(f"[gog] API request failed: {response.status_code} for {endpoint}", file=sys.stderr)
                return None
        except Exception as e:
            log(f"[gog] Request error: {e}", file=sys.stderr)
            return None

    def get_all_games(self, client) -> List[Dict[str, Any]]:
        """Fetch all games from the API."""
        return self._request(client, "all-games") or []

    def get_game_details(self, client, slug: str) -> Optional[Dict[str, Any]]:
        """Fetch details for a specific game."""
        return self._request(client, f"query-game/{slug}")

    def get_game_md5(self, client, slug: str) -> Optional[str]:
        """Fetch MD5 checksums for a game."""
        return self._request(client, f"download-md5/{slug}", is_json=False)

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[SearchResult]:
        """Search GOG Games."""
        from helper.http_client import HTTPClient

        results = []
        query_norm = query.strip().lower()

        with HTTPClient() as client:
            # 1. Fetch all games to perform a fuzzy search
            all_games = self.get_all_games(client)

            matches = []
            if all_games:
                for game in all_games:
                    if (query_norm in game.get("title", "").lower() or
                            query_norm in game.get("slug", "").lower()):
                        matches.append(game)

            # 2. Fallback: if nothing matched and the query looks like a slug, try a direct lookup
            if not matches and "_" in query_norm:
                details = self.get_game_details(client, query_norm)
                if details and "game_info" in details:
                    matches.append(details["game_info"])

            for game in matches[:limit]:
                slug = game.get("slug")
                title = game.get("title", slug)
                infohash = game.get("infohash")
                gog_url = game.get("gog_url", "")

                # Note: the 'all-games' endpoint doesn't provide file sizes;
                # size is set to 0 to avoid N+1 detail requests.

                if infohash:
                    magnet_link = f"magnet:?xt=urn:btih:{infohash}&dn={slug}"
                    results.append(SearchResult(
                        origin="gog",
                        title=title,
                        target=magnet_link,
                        media_kind="magnet",
                        detail="Magnet Link",
                        size_bytes=0,
                        annotations=["Magnet"],
                        full_metadata=game,
                    ))
                else:
                    results.append(SearchResult(
                        origin="gog",
                        title=title,
                        target=gog_url,
                        media_kind="game",
                        detail="No magnet available",
                        size_bytes=0,
                        annotations=["No Magnet"],
                        full_metadata=game,
                    ))

        return results

    def get_result_args(self) -> List[str]:
        """GOG results are URLs."""
        return ["-url"]

    def validate(self) -> bool:
        """GOG Games is a public website; no configuration needed."""
        return True


class YoutubeSearchProvider(SearchProvider):
    """Search provider for YouTube using yt-dlp."""

    RESULT_FIELDS = [
        ("title", "Title", None),
        ("uploader", "Uploader", None),
        ("duration_string", "Duration", None),
        ("view_count", "Views", lambda x: f"{x:,}" if x else ""),
    ]

    def search(self, query: str, limit: int = 10, filters: Optional[Dict[str, Any]] = None, **kwargs) -> List[SearchResult]:
        """
        Search YouTube using yt-dlp.

        Args:
            query: Search query
            limit: Maximum number of results
            filters: Optional filtering criteria (currently ignored)

        Returns:
            List of SearchResult objects
        """
        # Check that yt-dlp is available
        ytdlp_path = shutil.which("yt-dlp")
        if not ytdlp_path:
            log("yt-dlp not found in PATH", file=sys.stderr)
            return []

        # Construct the command; "ytsearchN:query" searches for N results
        search_query = f"ytsearch{limit}:{query}"

        cmd = [
            ytdlp_path,
            "--dump-json",
            "--flat-playlist",  # Don't fully resolve video details; faster
            "--no-warnings",
            search_query
        ]

        try:
            # Run yt-dlp and capture stdout; it emits one JSON object per line
            # for search results.
            process = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace"
            )

            if process.returncode != 0:
                log(f"yt-dlp search failed: {process.stderr}", file=sys.stderr)
                return []

            results = []
            for line in process.stdout.splitlines():
                if not line.strip():
                    continue

                try:
                    data = json.loads(line)

                    # Extract fields
                    title = data.get("title", "Unknown Title")
                    url = data.get("url")
                    if not url:
                        # Sometimes --flat-playlist only gives 'id'; construct the URL
                        video_id = data.get("id")
                        if video_id:
                            url = f"https://www.youtube.com/watch?v={video_id}"
                        else:
                            continue

                    uploader = data.get("uploader", "Unknown Uploader")
                    duration = data.get("duration")  # seconds
                    view_count = data.get("view_count")

                    # Format the duration
                    duration_str = ""
                    if duration:
                        try:
                            m, s = divmod(int(duration), 60)
                            h, m = divmod(m, 60)
                            if h > 0:
                                duration_str = f"{h}:{m:02d}:{s:02d}"
                            else:
                                duration_str = f"{m}:{s:02d}"
                        except (ValueError, TypeError):
                            pass

                    # Create annotations
                    annotations = []
                    if duration_str:
                        annotations.append(duration_str)
                    if view_count:
                        # Compact view-count format
                        try:
                            vc = int(view_count)
                            if vc >= 1_000_000:
                                views_str = f"{vc/1_000_000:.1f}M views"
                            elif vc >= 1000:
                                views_str = f"{vc/1000:.1f}K views"
                            else:
                                views_str = f"{vc} views"
                            annotations.append(views_str)
                        except (ValueError, TypeError):
                            pass

                    annotations.append("youtube")

                    # Create the result
                    result = SearchResult(
                        origin="youtube",
                        title=title,
                        target=url,
                        detail=f"by {uploader}",
                        annotations=annotations,
                        media_kind="video",
                        full_metadata=data,
                        columns=[
                            ("Title", title),
                            ("Uploader", uploader),
                            ("Duration", duration_str),
                            ("Views", str(view_count) if view_count else "")
                        ]
                    )
                    results.append(result)

                except json.JSONDecodeError:
                    continue

            return results

        except Exception as e:
            log(f"Error running yt-dlp: {e}", file=sys.stderr)
            return []

    def get_result_args(self) -> List[str]:
        """YouTube results are URLs."""
        return ["-url"]

    def validate(self) -> bool:
        """Check if yt-dlp is installed."""
        return shutil.which("yt-dlp") is not None
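
# The search above shells out to yt-dlp; it is equivalent to running, e.g.:
#
#   yt-dlp --dump-json --flat-playlist --no-warnings "ytsearch10:some query"
#
# and parsing one JSON object per stdout line.

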
# Provider registry
_PROVIDERS = {
    "local": LocalStorageProvider,
    "libgen": LibGenProvider,
    "soulseek": SoulSeekProvider,
    "debrid": DebridProvider,
    "openlibrary": OpenLibraryProvider,
    "gog": GogGamesProvider,
    "youtube": YoutubeSearchProvider,
}


def get_provider(name: str, config: Optional[Dict[str, Any]] = None) -> Optional[SearchProvider]:
    """
    Get a search provider by name.

    Args:
        name: Provider name (case-insensitive): "local", "libgen", "soulseek",
            "debrid", "openlibrary", "gog", "youtube"
        config: Optional configuration dictionary

    Returns:
        SearchProvider instance, or None if not found or unavailable
    """
    provider_class = _PROVIDERS.get(name.lower())

    if provider_class is None:
        log(f"Unknown search provider: {name}", file=sys.stderr)
        return None

    try:
        provider = provider_class(config)
        if not provider.validate():
            log(f"Provider '{name}' is not properly configured or available", file=sys.stderr)
            return None
        return provider

    except Exception as e:
        log(f"Error initializing provider '{name}': {e}", file=sys.stderr)
        return None


def list_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
    """
    List all registered providers and whether each is available.

    Args:
        config: Optional configuration dictionary

    Returns:
        Dictionary mapping provider names to availability (True/False)
    """
    availability = {}
    for name, provider_class in _PROVIDERS.items():
        try:
            provider = provider_class(config)
            availability[name] = provider.validate()
        except Exception:
            availability[name] = False
    return availability


def register_provider(name: str, provider_class: type) -> None:
    """
    Register a new search provider.

    Args:
        name: Provider name (lowercase)
        provider_class: Class that inherits from SearchProvider
    """
    _PROVIDERS[name.lower()] = provider_class
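
# Usage sketch: resolve a provider by name (validate() gates availability),
# or wire in a custom one such as the hypothetical EchoProvider sketched above:
#
#   provider = get_provider("libgen", config)
#   if provider is not None:
#       results = provider.search("python programming", limit=10)
#
#   register_provider("echo", EchoProvider)  # get_provider("echo") now resolves

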
class FileProvider(ABC):
    """Abstract base class for file hosting providers."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        self.name = self.__class__.__name__.replace("FileProvider", "").lower()

    @abstractmethod
    def upload(self, file_path: str) -> str:
        """Upload a file and return the URL."""
        pass

    def validate(self) -> bool:
        """Check if the provider is available/configured."""
        return True


class ZeroXZeroFileProvider(FileProvider):
    """File provider for 0x0.st."""

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__(config)
        self.name = "0x0"
        self.base_url = "https://0x0.st"

    def upload(self, file_path: str) -> str:
        """Upload a file to 0x0.st and return the hosted URL."""
        from helper.http_client import HTTPClient
        import os

        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        try:
            # 0x0.st expects a 'file' field in multipart/form-data.
            # Use a custom User-Agent to avoid 403 Forbidden.
            headers = {"User-Agent": "Medeia-Macina/1.0"}
            with HTTPClient(headers=headers) as client:
                with open(file_path, 'rb') as f:
                    files = {'file': f}
                    response = client.post(self.base_url, files=files)

            if response.status_code == 200:
                return response.text.strip()
            raise Exception(f"Upload failed: {response.status_code} - {response.text}")

        except Exception as e:
            log(f"[0x0] Upload error: {e}", file=sys.stderr)
            raise

    def validate(self) -> bool:
        return True


# File provider registry
_FILE_PROVIDERS = {
    "0x0": ZeroXZeroFileProvider,
}


def get_file_provider(name: str, config: Optional[Dict[str, Any]] = None) -> Optional[FileProvider]:
    """
    Get a file hosting provider by name.

    Args:
        name: Provider name (case-insensitive): "0x0"
        config: Optional configuration dictionary

    Returns:
        FileProvider instance, or None if not found or unavailable
    """
    provider_class = _FILE_PROVIDERS.get(name.lower())

    if provider_class is None:
        log(f"Unknown file provider: {name}", file=sys.stderr)
        return None

    try:
        provider = provider_class(config)
        if not provider.validate():
            log(f"File provider '{name}' is not properly configured or available", file=sys.stderr)
            return None
        return provider

    except Exception as e:
        log(f"Error initializing file provider '{name}': {e}", file=sys.stderr)
        return None


def list_file_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
    """
    List all registered file hosting providers and whether each is available.

    Args:
        config: Optional configuration dictionary

    Returns:
        Dictionary mapping provider names to availability (True/False)
    """
    availability = {}
    for name, provider_class in _FILE_PROVIDERS.items():
        try:
            provider = provider_class(config)
            availability[name] = provider.validate()
        except Exception:
            availability[name] = False
    return availability


def register_file_provider(name: str, provider_class: type) -> None:
    """
    Register a new file hosting provider.

    Args:
        name: Provider name (lowercase)
        provider_class: Class that inherits from FileProvider
    """
    _FILE_PROVIDERS[name.lower()] = provider_class
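
# Usage sketch: upload a local file to 0x0.st and get the hosted URL back
# (the path below is illustrative):
#
#   uploader = get_file_provider("0x0")
#   if uploader is not None:
#       url = uploader.upload("/path/to/file.bin")
#       print(url)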