"""
SearchProvider: Unified interface for different search backends.
This module defines a base class and registry for search providers that can be
used by search-file and other search-related cmdlets to handle different sources:
- Local file storage (LocalStorageBackend)
- Hydrus database
- AllDebrid magnets (search-debrid)
- Library Genesis / OpenLibrary books (search-libgen)
- Soulseek P2P network (search-soulseek)
- IMDB movies (future)
- Other sources
Usage:
from helper.search_provider import SearchProvider, get_provider
provider = get_provider("libgen")
results = provider.search("python programming", limit=10)
for result in results:
print(result["title"], result["target"], result["annotations"])
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Sequence, Tuple
from dataclasses import dataclass
from pathlib import Path
import sys
import subprocess
import json
import shutil
from helper.logger import log, debug
@dataclass
class SearchResult:
"""Unified search result format across all providers."""
# Required fields
origin: str # Provider name: "libgen", "soulseek", "debrid", "local", "hydrus", etc.
title: str # Display title/filename
target: str # Unique identifier or download target (URL, path, magnet hash, etc.)
# Optional fields
detail: str = "" # Additional details (size, status, format, etc.)
    annotations: Optional[List[str]] = None  # Tags/annotations: ["ready", "120MB", "mp3", etc.]
media_kind: str = "other" # Type: "book", "audio", "video", "file", "magnet", etc.
size_bytes: Optional[int] = None # File size in bytes
tags: Optional[set[str]] = None # Searchable tags
full_metadata: Optional[Dict[str, Any]] = None # Extra metadata (author, year, etc.)
    columns: Optional[List[Tuple[str, str]]] = None  # Display columns: [("Header", "value"), ...] for result table
def __post_init__(self):
"""Ensure mutable defaults are properly initialized."""
if self.annotations is None:
self.annotations = []
if self.tags is None:
self.tags = set()
if self.full_metadata is None:
self.full_metadata = {}
if self.columns is None:
self.columns = []
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for JSON serialization."""
data = {
"origin": self.origin,
"title": self.title,
"target": self.target,
"detail": self.detail,
"annotations": self.annotations,
"media_kind": self.media_kind,
"size_bytes": self.size_bytes,
"tags": list(self.tags) if self.tags else [],
"full_metadata": self.full_metadata,
}
if self.columns:
data["columns"] = list(self.columns)
return data
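# Illustrative round-trip (values are made up, not from a real provider):
#     r = SearchResult(origin="libgen", title="Dune", target="http://example.invalid/dune")
#     r.annotations            # -> [] (mutable defaults filled in by __post_init__)
#     r.to_dict()["tags"]      # -> [] (sets are serialized as lists)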
class SearchProvider(ABC):
"""Abstract base class for search providers."""
# Provider-specific field definitions: list of (api_field_name, display_column_name, formatter_func)
# Override in subclasses to define which fields to request and how to display them
# Example: [("title", "Title", None), ("author_name", "Author(s)", lambda x: ", ".join(x) if isinstance(x, list) else x)]
RESULT_FIELDS: List[Tuple[str, str, Optional[Any]]] = []
    def __init__(self, config: Optional[Dict[str, Any]] = None):
"""
Initialize provider with optional configuration.
Args:
config: Configuration dictionary (global config dict)
"""
self.config = config or {}
self.name = self.__class__.__name__.replace("Provider", "").lower()
@abstractmethod
def search(
self,
query: str,
limit: int = 50,
filters: Optional[Dict[str, Any]] = None,
**kwargs
) -> List[SearchResult]:
"""
Search for items matching the query.
Args:
query: Search query string. Special value "*" means "match all"
limit: Maximum number of results to return
filters: Optional filtering criteria (type, size, status, etc.)
**kwargs: Provider-specific arguments
Returns:
List of SearchResult objects
"""
pass
    @abstractmethod
    def get_result_args(self) -> List[str]:
        """
        Get the command-line flag(s) used to pass a search result to downstream cmdlets.

        Examples: libgen returns ["-url"], soulseek returns ["-path"],
        debrid returns ["-id"]. Implementations return only the flag(s);
        the caller pairs them with the result's target value.

        Returns:
            List of arguments to append to the cmdlet invocation
        """
        pass
def parse_args(self, args: Sequence[str]) -> Tuple[str, Dict[str, Any]]:
"""
Parse provider-specific command-line arguments.
Args:
args: Sequence of command-line arguments
Returns:
Tuple of (query, filters_dict)
"""
# Default implementation: first arg is query, rest are filters
query = args[0] if args else ""
filters = {}
return query, filters
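    # Default parsing example (hypothetical args): parse_args(["beatles", "-album", "Abbey Road"])
    # returns ("beatles", {}); subclasses override this to turn flags into filters.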
def validate(self) -> bool:
"""
Validate that provider is properly configured and ready to use.
Returns:
True if provider is available, False otherwise
"""
return True
def get_columns_format(self) -> List[str]:
"""
Define which columns this provider displays in result table.
Returns:
List of column names to display.
Each provider can override to customize result table appearance.
Examples: ["Title", "Author", "Year"] for books
["Title", "Duration", "Format"] for media
["Title", "Size", "Status"] for files
Default: Empty list (uses traditional detail/origin/media_kind/target)
"""
return [col_name for _, col_name, _ in self.RESULT_FIELDS] if self.RESULT_FIELDS else []
def get_api_fields_string(self) -> str:
"""
Generate comma-separated API fields string from RESULT_FIELDS.
Returns:
Comma-separated string of API field names to request
Example: "title,author_name,first_publish_year,isbn,key"
"""
if not self.RESULT_FIELDS:
return ""
return ",".join(field_name for field_name, _, _ in self.RESULT_FIELDS)
def build_columns_from_doc(self, doc: Dict[str, Any], idx: int = None) -> List[Tuple[str, str]]:
"""
Dynamically build columns from a result document using RESULT_FIELDS definition.
Args:
doc: API response document (dict with field values)
idx: Optional index/number for the result (typically added as first column)
Returns:
List of (header, value) tuples ready for SearchResult.columns
"""
columns = []
# Add index as first column if provided
if idx is not None:
columns.append(("#", str(idx)))
# Process each field definition
for api_field_name, display_col_name, formatter_func in self.RESULT_FIELDS:
value = doc.get(api_field_name, "")
# Apply formatter if defined
if formatter_func and value:
value = formatter_func(value)
# Convert to string and add to columns
value_str = str(value) if value else "Unknown"
columns.append((display_col_name, value_str))
return columns
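# How RESULT_FIELDS drives the two helpers above, for a hypothetical provider:
#     RESULT_FIELDS = [("title", "Title", None),
#                      ("author_name", "Author", lambda x: ", ".join(x))]
#     get_api_fields_string()  # -> "title,author_name"
#     build_columns_from_doc({"title": "Dune", "author_name": ["F. Herbert"]}, idx=1)
#     # -> [("#", "1"), ("Title", "Dune"), ("Author", "F. Herbert")]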
class LocalStorageProvider(SearchProvider):
"""Search provider for local file system storage."""
    def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
self.name = "local"
# Import here to avoid circular dependency
from helper.file_storage import FileStorage
self.storage = FileStorage(config)
def search(
self,
query: str,
limit: int = 50,
filters: Optional[Dict[str, Any]] = None,
**kwargs
) -> List[SearchResult]:
"""Search local file storage."""
filters = filters or {}
backend_name = filters.get("backend", "local")
try:
# Use the backend from FileStorage
results = self.storage[backend_name].search(query, limit=limit)
search_results = []
for result_dict in results:
path = result_dict.get("path", "")
size = result_dict.get("size")
annotations = []
if size:
annotations.append(f"{size / 1e6:.1f}MB")
search_results.append(SearchResult(
origin="local",
title=path.split("\\")[-1] if path else "Unknown",
target=path,
detail=f"Local: {path}",
annotations=annotations,
size_bytes=size,
))
return search_results
except Exception as e:
log(f"[local] Search error: {e}", file=sys.stderr)
return []
def get_result_args(self) -> List[str]:
"""Local storage uses -path argument."""
return ["-path"]
def validate(self) -> bool:
"""Local storage is always available."""
return True
class LibGenProvider(SearchProvider):
"""Search provider for Library Genesis books."""
# Define fields to display (note: LibGen doesn't have API field mapping like OpenLibrary)
# These are extracted from the book dict directly
RESULT_FIELDS = [
("title", "Title", None),
("author", "Author(s)", None),
("year", "Year", None),
]
    def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
self.name = "libgen"
def search(
self,
query: str,
limit: int = 50,
filters: Optional[Dict[str, Any]] = None,
**kwargs
) -> List[SearchResult]:
"""Search Library Genesis for books.
Supports dynamic query format:
- isbn:0557677203
- author:"Albert Pike"
- title:"Book Title"
- Combination: isbn:0557677203 author:"Albert Pike" free text
Priority: ISBN is the authoritative key for searching.
"""
filters = filters or {}
try:
from helper.unified_book_downloader import UnifiedBookDownloader
from helper.query_parser import parse_query, get_field, get_free_text
debug(f"[libgen] Starting search for: {query}")
# Parse the query to extract structured fields
parsed = parse_query(query)
isbn = get_field(parsed, 'isbn')
author = get_field(parsed, 'author')
title = get_field(parsed, 'title')
free_text = get_free_text(parsed)
# Build the search query for libgen
# Priority: isbn (authoritative key) > title > author > free_text
if isbn:
search_query = isbn
elif title:
search_query = title
elif author:
search_query = author
else:
search_query = free_text or query
debug(f"[libgen] Built search query: {search_query}")
downloader = UnifiedBookDownloader(config=self.config)
search_fn = getattr(downloader, "search_libgen", None)
if not callable(search_fn):
log("[libgen] Searcher unavailable", file=sys.stderr)
return []
debug(f"[libgen] Calling search_libgen with query: {search_query}")
books = search_fn(search_query, limit=limit)
debug(f"[libgen] Got {len(books) if books else 0} results from search_libgen")
search_results = []
for idx, book in enumerate(books, 1):
# Build columns dynamically from RESULT_FIELDS
columns = self.build_columns_from_doc(book, idx)
title = book.get("title", "Unknown")
author = book.get("author", "Unknown")
year = book.get("year", "Unknown")
filesize = book.get("filesize_str", "Unknown")
isbn = book.get("isbn", "")
mirror_url = book.get("mirror_url", "")
# Build detail with author and year
detail = f"By: {author}"
if year and year != "Unknown":
detail += f" ({year})"
annotations = [f"{filesize}"]
if isbn:
annotations.append(f"ISBN: {isbn}")
search_results.append(SearchResult(
origin="libgen",
title=title,
target=mirror_url or f"libgen:{book.get('id', '')}",
detail=detail,
annotations=annotations,
media_kind="book",
columns=columns,
full_metadata={
"number": idx,
"author": author,
"year": year,
"isbn": isbn,
"filesize": filesize,
"mirrors": book.get("mirrors", {}),
"book_id": book.get("book_id", ""),
"md5": book.get("md5", ""),
},
))
debug(f"[libgen] Returning {len(search_results)} formatted results")
return search_results
except Exception as e:
log(f"[libgen] Search error: {e}", file=sys.stderr)
import traceback
log(traceback.format_exc(), file=sys.stderr)
return []
def get_result_args(self) -> List[str]:
"""LibGen results use -url for download or -mirror for selection."""
return ["-url"]
def validate(self) -> bool:
"""Check if LibGen downloader is available."""
try:
from helper.unified_book_downloader import UnifiedBookDownloader
return True
except Exception:
return False
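# Query routing example for LibGenProvider.search (fields per helper.query_parser):
#     'isbn:0557677203 author:"Albert Pike"' queries LibGen with "0557677203",
#     because ISBN outranks title, author, and free text.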
class SoulSeekProvider(SearchProvider):
"""Search provider for Soulseek P2P network."""
# Allowed music file extensions
MUSIC_EXTENSIONS = {
'.flac', '.mp3', '.m4a', '.aac', '.ogg', '.opus',
'.wav', '.alac', '.wma', '.ape', '.aiff', '.dsf',
'.dff', '.wv', '.tta', '.tak', '.ac3', '.dts'
}
# Display columns for search results
RESULT_FIELDS = [
("track_num", "Track", None),
("title", "Title", None),
("artist", "Artist", lambda x: (str(x)[:32] + '...') if x and len(str(x)) > 35 else x),
("album", "Album", lambda x: (str(x)[:32] + '...') if x and len(str(x)) > 35 else x),
("size", "Size", lambda x: f"{int(int(x)/1024/1024)} MB" if x else ""),
]
# Soulseek config
USERNAME = "asjhkjljhkjfdsd334"
PASSWORD = "khhhg"
DOWNLOAD_DIR = "./downloads"
MAX_WAIT_TRANSFER = 1200
    def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
self.name = "soulseek"
async def perform_search(
self,
query: str,
timeout: float = 9.0,
limit: int = 50
) -> List[Dict[str, Any]]:
"""Perform async Soulseek search and return flattened results."""
        import os
from aioslsk.client import SoulSeekClient
from aioslsk.settings import Settings, CredentialsSettings
os.makedirs(self.DOWNLOAD_DIR, exist_ok=True)
settings = Settings(credentials=CredentialsSettings(username=self.USERNAME, password=self.PASSWORD))
client = SoulSeekClient(settings)
try:
await client.start()
await client.login()
except Exception as e:
log(f"[soulseek] Login failed: {type(e).__name__}: {e}", file=sys.stderr)
return []
try:
search_request = await client.searches.search(query)
await self._collect_search_results(client, search_request, timeout=timeout)
flat = self._flatten_search_results(search_request)[:limit]
return flat
except Exception as e:
log(f"[soulseek] Search error: {type(e).__name__}: {e}", file=sys.stderr)
return []
finally:
try:
await client.stop()
except Exception:
pass
def _flatten_search_results(self, search_request) -> List[dict]:
"""Extract files from SearchRequest.results."""
flat: List[dict] = []
for result in search_request.results:
username = getattr(result, "username", "?")
for file_data in getattr(result, "shared_items", []):
flat.append({
"file": file_data,
"username": username,
"filename": getattr(file_data, "filename", "?"),
"size": getattr(file_data, "filesize", 0),
})
for file_data in getattr(result, "locked_results", []):
flat.append({
"file": file_data,
"username": username,
"filename": getattr(file_data, "filename", "?"),
"size": getattr(file_data, "filesize", 0),
})
return flat
async def _collect_search_results(self, client, search_request, timeout: float = 75.0) -> None:
"""Collect search results by waiting."""
import asyncio
import time
debug(f"[soulseek] Collecting results for {timeout}s...")
end = time.time() + timeout
last_count = 0
while time.time() < end:
current_count = len(search_request.results)
if current_count > last_count:
debug(f"[soulseek] Got {current_count} result(s) so far...")
last_count = current_count
await asyncio.sleep(0.5)
async def download_file(
self,
username: str,
filename: str,
file_size: int,
target_dir: Optional[str] = None
) -> bool:
"""Download a file from Soulseek to a specific directory."""
        import os
        from aioslsk.client import SoulSeekClient
        from aioslsk.settings import Settings, CredentialsSettings
download_dir = target_dir if target_dir else self.DOWNLOAD_DIR
os.makedirs(download_dir, exist_ok=True)
settings = Settings(credentials=CredentialsSettings(username=self.USERNAME, password=self.PASSWORD))
settings.shares.download = download_dir
client = SoulSeekClient(settings)
try:
await client.start()
await client.login()
debug(f"[soulseek] Starting: {filename} from {username}")
transfer = await client.transfers.download(username, filename)
if transfer is None:
log("[soulseek] Failed: transfer object is None")
return False
success = await self._wait_for_transfer(client, transfer, file_size=file_size, max_wait=self.MAX_WAIT_TRANSFER)
return success
except Exception as e:
log(f"[soulseek] Download error: {type(e).__name__}: {e}", file=sys.stderr)
return False
finally:
try:
await client.stop()
except Exception:
pass
async def _wait_for_transfer(self, client, transfer_obj: Any, file_size: Any = None, max_wait: float = 1200) -> bool:
"""Wait for transfer finish using event listeners with TQDM progress bar.
Returns:
True if transfer completed successfully, False if failed or timed out.
"""
import asyncio
import time
from aioslsk.events import TransferProgressEvent
from tqdm import tqdm
if transfer_obj is None:
log("[soulseek] No transfer object returned")
return False
transfer_finished = False
transfer_success = False
pbar = None
total_size = file_size
last_speed_time = time.time()
last_speed = 0
async def on_progress(event):
nonlocal last_speed_time, last_speed, transfer_finished, transfer_success, pbar, total_size
if not hasattr(event, 'updates') or not event.updates:
return
for transfer, _, curr_snapshot in event.updates:
if (transfer.username == transfer_obj.username and transfer.remote_path == transfer_obj.remote_path):
bytes_xfer = getattr(curr_snapshot, 'bytes_transfered', 0)
state_name = curr_snapshot.state.name if hasattr(curr_snapshot, 'state') else "?"
speed = getattr(curr_snapshot, 'speed', 0)
if total_size is None and hasattr(transfer, 'file_attributes'):
try:
size = getattr(transfer, 'file_size', None) or getattr(transfer, 'size', None)
if size:
total_size = size
except Exception:
pass
if pbar is None:
total = total_size if total_size else 100 * 1024 * 1024
pbar = tqdm(total=total, unit='B', unit_scale=True, desc='[transfer]')
if pbar:
pbar.n = bytes_xfer
if speed > 0:
pbar.set_postfix({"speed": f"{speed/1024:.1f} KB/s", "state": state_name})
pbar.refresh()
if state_name in ('FINISHED', 'COMPLETE'):
if pbar:
pbar.close()
debug(f"[soulseek] Transfer {state_name.lower()}")
transfer_finished = True
transfer_success = True
return
elif state_name in ('ABORTED', 'FAILED', 'PAUSED'):
if pbar:
pbar.close()
debug(f"[soulseek] Transfer {state_name.lower()}")
transfer_finished = True
transfer_success = False
return
if total_size and bytes_xfer >= total_size:
if pbar:
pbar.close()
debug(f"[soulseek] Transfer complete ({bytes_xfer / 1024 / 1024:.1f} MB)")
transfer_finished = True
transfer_success = True
return
if speed == 0 and bytes_xfer > 0:
now = time.time()
if now - last_speed_time > 3:
if pbar:
pbar.close()
debug(f"[soulseek] Transfer complete ({bytes_xfer / 1024 / 1024:.1f} MB)")
transfer_finished = True
transfer_success = True
return
else:
last_speed_time = time.time()
last_speed = speed
client.events.register(TransferProgressEvent, on_progress)
end = time.time() + max_wait
while time.time() < end:
if transfer_finished:
break
await asyncio.sleep(0.5)
client.events.unregister(TransferProgressEvent, on_progress)
if pbar:
pbar.close()
if not transfer_finished:
log(f"[soulseek] Timed out after {max_wait}s; transfer may still be in progress")
return False
else:
return transfer_success
def search(
self,
query: str,
limit: int = 50,
filters: Optional[Dict[str, Any]] = None,
**kwargs
) -> List[SearchResult]:
"""Search Soulseek P2P network (synchronous wrapper)."""
import asyncio
import re
filters = filters or {}
try:
# Run async search
flat_results = asyncio.run(self.perform_search(query, timeout=9.0, limit=limit))
if not flat_results:
return []
# Filter to music files only
music_results = []
for item in flat_results:
filename = item['filename']
if '.' in filename:
ext = '.' + filename.rsplit('.', 1)[-1].lower()
else:
ext = ''
if ext in self.MUSIC_EXTENSIONS:
music_results.append(item)
if not music_results:
return []
# Extract metadata for all results
enriched_results = []
for item in music_results:
filename = item['filename']
# Extract extension
if '.' in filename:
_, ext = filename.rsplit('.', 1)
ext = '.' + ext.lower()
else:
ext = ''
# Get display filename
if '\\' in filename:
display_name = filename.rsplit('\\', 1)[-1]
elif '/' in filename:
display_name = filename.rsplit('/', 1)[-1]
else:
display_name = filename
# Extract path hierarchy for artist/album
path_parts = filename.replace('\\', '/').split('/')
artist = ''
album = ''
if len(path_parts) >= 3:
artist = path_parts[-3]
album = path_parts[-2]
if ' - ' in album and re.match(r'^\d{4}', album):
album = album.split(' - ', 1)[1]
elif len(path_parts) == 2:
artist = path_parts[-2]
# Extract track number and title
base_name = display_name.rsplit('.', 1)[0] if '.' in display_name else display_name
track_num = ''
title = base_name
filename_artist = ''
# First, extract track number if present (e.g., "30 Stumfol - Prisoner" -> track=30, rest="Stumfol - Prisoner")
match = re.match(r'^(\d{1,3})\s*[\.\-]?\s+(.+)$', base_name)
if match:
track_num = match.group(1)
remainder = match.group(2)
# Now parse "Artist - Title" from the remainder
# If there's a " - " separator, split on it
if ' - ' in remainder:
parts = remainder.split(' - ', 1)
filename_artist = parts[0].strip()
title = parts[1].strip()
else:
# No artist-title separator, use the whole remainder as title
title = remainder
else:
# No track number, check if there's "Artist - Title" format
if ' - ' in base_name:
parts = base_name.split(' - ', 1)
filename_artist = parts[0].strip()
title = parts[1].strip()
# Use filename_artist if extracted, otherwise fall back to path artist
if filename_artist:
artist = filename_artist
enriched_results.append({
**item,
'artist': artist,
'album': album,
'title': title,
'track_num': track_num,
'ext': ext
})
# Apply filters if specified
if filters:
artist_filter = filters.get('artist', '').lower() if filters.get('artist') else ''
album_filter = filters.get('album', '').lower() if filters.get('album') else ''
track_filter = filters.get('track', '').lower() if filters.get('track') else ''
if artist_filter or album_filter or track_filter:
filtered_results = []
for item in enriched_results:
if artist_filter and artist_filter not in (item['artist'] or '').lower():
continue
if album_filter and album_filter not in (item['album'] or '').lower():
continue
if track_filter and track_filter not in (item['title'] or '').lower():
continue
filtered_results.append(item)
enriched_results = filtered_results
# Sort: .flac first, then others
enriched_results.sort(key=lambda item: (item['ext'].lower() != '.flac', -item['size']))
# Convert to SearchResult format
search_results = []
for idx, item in enumerate(enriched_results, 1):
artist_display = item['artist'] if item['artist'] else "(no artist)"
album_display = item['album'] if item['album'] else "(no album)"
size_mb = int(round(item['size'] / 1024 / 1024))
if item['track_num']:
track_title = f"[{item['track_num']}] {item['title']}"
else:
track_title = item['title'] or "(untitled)"
# Build columns from enriched metadata
columns = self.build_columns_from_doc(item, idx=idx)
search_results.append(SearchResult(
origin="soulseek",
title=track_title,
target=item['filename'],
detail=f"Artist: {artist_display} | Album: {album_display}",
annotations=[f"{size_mb} MB", item['ext']],
media_kind="audio",
size_bytes=item['size'],
columns=columns,
full_metadata={
"artist": item['artist'],
"album": item['album'],
"track_num": item['track_num'],
"username": item['username'],
"filename": item['filename'],
"ext": item['ext'],
},
))
return search_results
except Exception as e:
log(f"Soulseek search error: {e}", file=sys.stderr)
return []
def get_result_args(self) -> List[str]:
"""Soulseek results use filename/path for results."""
return ["-path"]
def validate(self) -> bool:
"""Check if Soulseek client is available."""
try:
import aioslsk # type: ignore
return True
except ImportError:
return False
class DebridProvider(SearchProvider):
"""Search provider for AllDebrid magnets."""
# Status code mappings
STATUS_MAP = {
0: "In Queue",
1: "Downloading",
2: "Compressing",
3: "Uploading",
4: "Ready",
5: "Upload Failed",
6: "Unpack Error",
7: "Not Downloaded",
8: "File Too Big",
9: "Internal Error",
10: "Download Timeout",
11: "Deleted",
12: "Processing Failed",
13: "Processing Failed",
14: "Tracker Error",
15: "No Peers"
}
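    # e.g. STATUS_MAP[4] == "Ready"; codes 0-3 are in-flight, 5 and above are failures.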
    def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
self.name = "debrid"
self._magnet_files_cache = {}
def _format_size(self, bytes_val: float) -> str:
"""Format bytes to human readable size."""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_val < 1024:
return f"{bytes_val:.2f} {unit}"
bytes_val /= 1024
return f"{bytes_val:.2f} PB"
def _get_status_display(self, status_code: int) -> str:
"""Get human-readable status for AllDebrid status codes."""
return self.STATUS_MAP.get(status_code, f"Unknown ({status_code})")
def _should_filter_magnet(self, status_code: int, status_text: str) -> bool:
"""Check if magnet should be filtered out (expired/deleted)."""
# Filter expired/deleted entries
return status_code in (5, 6, 7, 8, 11, 12, 13, 14)
def _fuzzy_match(self, text: str, pattern: str) -> bool:
"""Check if pattern fuzzy-matches text (case-insensitive, substring matching)."""
return pattern.lower() in text.lower()
def search(
self,
query: str,
limit: int = 50,
filters: Optional[Dict[str, Any]] = None,
**kwargs
) -> List[SearchResult]:
"""Search AllDebrid magnets with optional status and name filtering.
Args:
query: Search query (magnet filename or '*' for all)
limit: Max results to return
filters: Optional dict with 'status' filter ('all', 'active', 'ready', 'error')
Returns:
List of SearchResult objects
"""
filters = filters or {}
try:
from helper.alldebrid import AllDebridClient
from config import get_debrid_api_key
api_key = get_debrid_api_key(self.config)
if not api_key:
log("[debrid] API key not configured", file=sys.stderr)
return []
client = AllDebridClient(api_key)
# Parse status filter
status_filter_param = filters.get('status', 'all').lower() if filters.get('status') else 'all'
# Get magnets with optional status filter
response = client._request("magnet/status", {})
if response.get("status") != "success":
log(f"[debrid] API error: {response.get('error', 'Unknown')}", file=sys.stderr)
return []
magnets = response.get("data", {}).get("magnets", [])
# Handle both list and dict formats
if isinstance(magnets, dict):
magnets = list(magnets.values())
# Filter by status if specified
if status_filter_param == 'active':
magnets = [m for m in magnets if m.get('statusCode', -1) in (0, 1, 2, 3)]
elif status_filter_param == 'ready':
magnets = [m for m in magnets if m.get('statusCode', -1) == 4]
elif status_filter_param == 'error':
magnets = [m for m in magnets if m.get('statusCode', -1) in (5, 6, 8, 9, 10, 12, 13, 14, 15)]
# 'all' includes everything
# Filter by query (fuzzy match on filename)
results = []
count = 0
for magnet in magnets:
if count >= limit:
break
filename = magnet.get("filename", "")
status_code = magnet.get("statusCode", -1)
status_text = magnet.get("status", "Unknown")
# Skip expired/deleted unless 'all' filter
if status_filter_param != 'all' and self._should_filter_magnet(status_code, status_text):
continue
# Apply query filter (skip if doesn't match)
if query and query != "*" and not self._fuzzy_match(filename, query):
continue
magnet_id = magnet.get("id")
size = magnet.get("size", 0)
downloaded = magnet.get("downloaded", 0)
progress = (downloaded / size * 100) if size > 0 else 0
                # Status indicator: ready / in progress / failed
                if status_code == 4:
                    status_emoji = "✅"
                elif status_code < 4:
                    status_emoji = "⏳"
                else:
                    status_emoji = "❌"
annotations = [self._get_status_display(status_code)]
if size > 0:
annotations.append(self._format_size(size))
if progress > 0 and progress < 100:
annotations.append(f"{progress:.1f}%")
results.append(SearchResult(
origin="debrid",
title=filename or "Unknown",
target=str(magnet_id),
detail=f"{status_emoji} {self._get_status_display(status_code)} | {self._format_size(size)}",
annotations=annotations,
media_kind="magnet",
size_bytes=size,
full_metadata={
"magnet_id": magnet_id,
"status_code": status_code,
"status_text": status_text,
"progress": progress,
"downloaded": downloaded,
"seeders": magnet.get("seeders", 0),
"download_speed": magnet.get("downloadSpeed", 0),
},
))
count += 1
# Cache metadata for ready magnets
if results:
self._cache_ready_magnet_metadata(client, [r for r in results if r.full_metadata.get('status_code') == 4])
return results
except Exception as e:
log(f"Debrid search error: {e}", file=sys.stderr)
return []
def _cache_ready_magnet_metadata(self, client, results: List[SearchResult]) -> None:
"""Cache file metadata for ready magnets."""
if not results:
return
try:
ready_ids = [r.full_metadata.get('magnet_id') for r in results if r.full_metadata.get('status_code') == 4]
if ready_ids:
self._magnet_files_cache = client.magnet_links(ready_ids)
log(f"[debrid] Cached metadata for {len(self._magnet_files_cache)} ready magnet(s)", file=sys.stderr)
except Exception as e:
log(f"[debrid] Warning: Could not cache magnet metadata: {e}", file=sys.stderr)
def get_magnet_metadata(self, magnet_id: int) -> Optional[Dict[str, Any]]:
"""Get cached metadata for a magnet."""
return self._magnet_files_cache.get(str(magnet_id))
def get_result_args(self) -> List[str]:
"""Debrid results use magnet ID for download."""
return ["-id"]
def validate(self) -> bool:
"""Check if AllDebrid is configured."""
from config import get_debrid_api_key
return bool(get_debrid_api_key(self.config))
class OpenLibraryProvider(SearchProvider):
"""Search provider for OpenLibrary."""
# Define fields to request from API and how to display them
RESULT_FIELDS = [
("title", "Title", None),
("author_name", "Author", lambda x: ", ".join(x) if isinstance(x, list) else x),
("first_publish_year", "Year", None),
("status", "Status", None),
]
    def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
self.name = "openlibrary"
def _derive_status(self, doc: Dict[str, Any]) -> tuple[str, Optional[str]]:
"""Determine availability label and archive identifier."""
ebook_access = str(doc.get("ebook_access", "") or "").strip().lower()
has_fulltext = bool(doc.get("has_fulltext"))
ia_entries = doc.get("ia")
archive_id = ""
if isinstance(ia_entries, list):
for entry in ia_entries:
if isinstance(entry, str) and entry.strip():
archive_id = entry.strip()
break
elif isinstance(ia_entries, str) and ia_entries.strip():
archive_id = ia_entries.strip()
elif isinstance(doc.get("ocaid"), str) and doc["ocaid"].strip():
archive_id = doc["ocaid"].strip()
available = False
if ebook_access in {"borrowable", "public", "full"}:
available = True
elif has_fulltext:
available = True
elif archive_id:
available = True
status = "download" if available else "?Libgen"
return status, archive_id or None
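    # Example (made-up docs): {"ebook_access": "public", "ia": ["dune00herb"]} yields
    # ("download", "dune00herb"); a doc with no access signals yields ("?Libgen", None).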
def search(
self,
query: str,
limit: int = 50,
filters: Optional[Dict[str, Any]] = None,
**kwargs
) -> List[SearchResult]:
"""Search OpenLibrary for books.
Smart search that detects ISBN, OCLC, OpenLibrary ID, and falls back to title search.
"""
filters = filters or {}
try:
import requests
query_clean = query.strip()
search_url = "https://openlibrary.org/search.json"
# Try to detect query type (ISBN, OCLC, OL ID, or title)
if query_clean.isdigit() and len(query_clean) in (10, 13):
# ISBN search
url = f"https://openlibrary.org/isbn/{query_clean}.json"
response = requests.get(url, timeout=9)
if response.status_code == 200:
book_data = response.json()
return [self._format_isbn_result(book_data, query_clean)]
elif response.status_code == 404:
return []
# Default to title/general search
params = {
"q": query_clean,
"limit": limit,
"fields": f"{self.get_api_fields_string()},isbn,oclc_numbers,lccn,number_of_pages_median,language,key,ebook_access,ia,has_fulltext",
}
response = requests.get(search_url, params=params, timeout=9)
response.raise_for_status()
data = response.json()
search_results = []
for idx, doc in enumerate(data.get("docs", []), 1):
# Extract OLID first (needed for metadata)
olid = doc.get("key", "").split("/")[-1]
# Determine status/availability
status, archive_id = self._derive_status(doc)
doc["status"] = status
# Build columns dynamically from RESULT_FIELDS (now includes status)
columns = self.build_columns_from_doc(doc, idx)
# Extract additional metadata
title = doc.get("title", "Unknown")
authors = doc.get("author_name", ["Unknown"])
year = doc.get("first_publish_year", "")
isbn_list = doc.get("isbn", [])
isbn = isbn_list[0] if isbn_list else ""
oclc_list = doc.get("oclc_numbers", [])
oclc = oclc_list[0] if oclc_list else ""
lccn_list = doc.get("lccn", [])
lccn = lccn_list[0] if lccn_list else ""
pages = doc.get("number_of_pages_median", "")
languages = doc.get("language", [])
language = languages[0] if languages else ""
author_str = ", ".join(authors) if authors else "Unknown"
# Build detail with author and year
detail = f"By: {author_str}"
if year:
detail += f" ({year})"
# Build annotations with additional info
annotations = []
if pages:
annotations.append(f"{pages} pages")
if isbn:
annotations.append(f"ISBN: {isbn}")
search_results.append(SearchResult(
origin="openlibrary",
title=title,
target=f"https://openlibrary.org/books/{olid}",
detail=detail,
annotations=annotations,
media_kind="book",
columns=columns,
full_metadata={
"number": idx,
"authors": authors,
"year": year,
"isbn": isbn,
"oclc": oclc,
"lccn": lccn,
"pages": pages,
"language": language,
"olid": olid,
"ebook_access": doc.get("ebook_access", ""),
"status": status,
"archive_id": archive_id,
},
))
            # Sort results: downloadable first, then "?Libgen", then anything else
def sort_key(result):
status = (result.full_metadata.get("status") or "").strip().lower()
if status == "download":
return (0, result.title)
elif status.startswith("?libgen"):
return (1, result.title)
else:
return (2, result.title)
search_results.sort(key=sort_key)
# Rebuild number field after sorting
for new_idx, result in enumerate(search_results, 1):
result.full_metadata["number"] = new_idx
# Update the # column in columns
if result.columns and result.columns[0][0] == "#":
result.columns[0] = ("#", str(new_idx))
return search_results
except Exception as e:
log(f"OpenLibrary search error: {e}", file=sys.stderr)
return []
def _format_isbn_result(self, book_data: Dict[str, Any], isbn: str) -> SearchResult:
"""Format a book result from ISBN endpoint."""
# Get title from book data
title = book_data.get("title", "Unknown")
# Get authors
author_list = []
for author_key in book_data.get("authors", []):
if isinstance(author_key, dict):
author_list.append(author_key.get("name", ""))
elif isinstance(author_key, str):
author_list.append(author_key)
author_str = ", ".join(filter(None, author_list)) if author_list else "Unknown"
# Extract other metadata
year = book_data.get("first_publish_year", "")
publishers = book_data.get("publishers", [])
publisher = publishers[0].get("name", "") if publishers and isinstance(publishers[0], dict) else ""
pages = book_data.get("number_of_pages", "")
languages = book_data.get("languages", [])
language = languages[0].get("key", "").replace("/languages/", "") if languages else ""
olid = book_data.get("key", "").split("/")[-1] if book_data.get("key") else ""
# Build doc for column rendering
doc = {
"title": title,
"author_name": author_list,
"first_publish_year": year,
"ebook_access": book_data.get("ebook_access", ""),
"has_fulltext": bool(book_data.get("ocaid")),
"ia": [book_data.get("ocaid")] if book_data.get("ocaid") else [],
"ocaid": book_data.get("ocaid", ""),
}
status, archive_id = self._derive_status(doc)
doc["status"] = status
# Build detail
detail = f"By: {author_str}"
if year:
detail += f" ({year})"
# Build annotations
annotations = []
if pages:
annotations.append(f"{pages} pages")
annotations.append(f"ISBN: {isbn}")
# Build columns using shared helper for consistency
columns = self.build_columns_from_doc(doc, idx=1)
return SearchResult(
origin="openlibrary",
title=title,
target=f"https://openlibrary.org/books/{olid}",
detail=detail,
annotations=annotations,
media_kind="book",
columns=columns,
full_metadata={
"number": 1,
"authors": author_list,
"year": year,
"isbn": isbn,
"oclc": "",
"lccn": "",
"pages": pages,
"language": language,
"olid": olid,
"publisher": publisher,
"ebook_access": doc.get("ebook_access", ""),
"status": status,
"archive_id": archive_id,
},
)
def get_result_args(self) -> List[str]:
"""OpenLibrary results are info/links only."""
return ["-info"]
def validate(self) -> bool:
"""OpenLibrary is always available (no auth needed)."""
return True
class GogGamesProvider(SearchProvider):
"""Search provider for GOG Games."""
    def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
self.name = "gog"
self.base_url = "https://gog-games.to"
self.headers = {
"Referer": "https://gog-games.to/",
"Origin": "https://gog-games.to",
"X-Requested-With": "XMLHttpRequest"
}
def _request(self, client, endpoint: str, is_json: bool = True) -> Any:
"""Helper for API requests."""
url = f"{self.base_url}/api/web/{endpoint}"
try:
response = client.get(url, headers=self.headers)
if response.status_code == 200:
return response.json() if is_json else response.text
elif response.status_code == 404:
return None
else:
log(f"[gog] API request failed: {response.status_code} for {endpoint}", file=sys.stderr)
return None
except Exception as e:
log(f"[gog] Request error: {e}", file=sys.stderr)
return None
def get_all_games(self, client) -> List[Dict[str, Any]]:
"""Fetch all games from the API."""
return self._request(client, "all-games") or []
def get_game_details(self, client, slug: str) -> Optional[Dict[str, Any]]:
"""Fetch details for a specific game."""
return self._request(client, f"query-game/{slug}")
def get_game_md5(self, client, slug: str) -> Optional[str]:
"""Fetch MD5 checksums for a game."""
return self._request(client, f"download-md5/{slug}", is_json=False)
def search(
self,
query: str,
limit: int = 50,
filters: Optional[Dict[str, Any]] = None,
**kwargs
) -> List[SearchResult]:
"""Search GOG Games."""
from helper.http_client import HTTPClient
results = []
query_norm = query.strip().lower()
with HTTPClient() as client:
# 1. Fetch all games to perform fuzzy search
all_games = self.get_all_games(client)
matches = []
if all_games:
for game in all_games:
if (query_norm in game.get("title", "").lower() or
query_norm in game.get("slug", "").lower()):
matches.append(game)
# 2. Fallback: If no matches and query looks like a slug, try direct lookup
if not matches and "_" in query_norm:
details = self.get_game_details(client, query_norm)
if details and "game_info" in details:
matches.append(details["game_info"])
for game in matches[:limit]:
slug = game.get("slug")
title = game.get("title", slug)
infohash = game.get("infohash")
gog_url = game.get("gog_url", "")
# Note: 'all-games' endpoint doesn't provide file size.
# We set size to 0 to avoid N+1 requests.
if infohash:
magnet_link = f"magnet:?xt=urn:btih:{infohash}&dn={slug}"
results.append(SearchResult(
origin="gog",
title=title,
target=magnet_link,
media_kind="magnet",
detail="Magnet Link",
size_bytes=0,
annotations=["Magnet"],
full_metadata=game
))
else:
results.append(SearchResult(
origin="gog",
title=title,
target=gog_url,
media_kind="game",
detail="No magnet available",
size_bytes=0,
annotations=["No Magnet"],
full_metadata=game
))
return results
def get_result_args(self) -> List[str]:
"""GOG results are URLs."""
return ["-url"]
def validate(self) -> bool:
"""GOG Games is a public website."""
return True
class YoutubeSearchProvider(SearchProvider):
"""
Search provider for YouTube using yt-dlp.
"""
RESULT_FIELDS = [
("title", "Title", None),
("uploader", "Uploader", None),
("duration_string", "Duration", None),
("view_count", "Views", lambda x: f"{x:,}" if x else ""),
]
def search(self, query: str, limit: int = 10, filters: Optional[Dict[str, Any]] = None, **kwargs) -> List[SearchResult]:
"""
Search YouTube using yt-dlp.
Args:
query: Search query
limit: Maximum number of results
filters: Optional filtering criteria (ignored for now)
Returns:
List of SearchResult objects
"""
# Check if yt-dlp is available
ytdlp_path = shutil.which("yt-dlp")
if not ytdlp_path:
log("yt-dlp not found in PATH", file=sys.stderr)
return []
# Construct command
# ytsearchN:query searches for N results
search_query = f"ytsearch{limit}:{query}"
cmd = [
ytdlp_path,
"--dump-json",
"--flat-playlist", # Don't resolve video details fully, faster
"--no-warnings",
search_query
]
try:
# Run yt-dlp
# We need to capture stdout. yt-dlp outputs one JSON object per line for search results
process = subprocess.run(
cmd,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace"
)
if process.returncode != 0:
log(f"yt-dlp search failed: {process.stderr}", file=sys.stderr)
return []
results = []
for line in process.stdout.splitlines():
if not line.strip():
continue
try:
data = json.loads(line)
# Extract fields
title = data.get("title", "Unknown Title")
url = data.get("url")
if not url:
# Sometimes flat-playlist gives 'id', construct URL
video_id = data.get("id")
if video_id:
url = f"https://www.youtube.com/watch?v={video_id}"
else:
continue
uploader = data.get("uploader", "Unknown Uploader")
duration = data.get("duration") # seconds
view_count = data.get("view_count")
# Format duration
duration_str = ""
if duration:
try:
m, s = divmod(int(duration), 60)
h, m = divmod(m, 60)
if h > 0:
duration_str = f"{h}:{m:02d}:{s:02d}"
else:
duration_str = f"{m}:{s:02d}"
except (ValueError, TypeError):
pass
# Create annotations
annotations = []
if duration_str:
annotations.append(duration_str)
if view_count:
# Simple format for views
try:
vc = int(view_count)
if vc >= 1000000:
views_str = f"{vc/1000000:.1f}M views"
elif vc >= 1000:
views_str = f"{vc/1000:.1f}K views"
else:
views_str = f"{vc} views"
annotations.append(views_str)
except (ValueError, TypeError):
pass
annotations.append("youtube")
# Create result
result = SearchResult(
origin="youtube",
title=title,
target=url,
detail=f"by {uploader}",
annotations=annotations,
media_kind="video",
full_metadata=data,
columns=[
("Title", title),
("Uploader", uploader),
("Duration", duration_str),
("Views", str(view_count) if view_count else "")
]
)
results.append(result)
except json.JSONDecodeError:
continue
return results
except Exception as e:
log(f"Error running yt-dlp: {e}", file=sys.stderr)
return []
def get_result_args(self) -> List[str]:
"""YouTube results are URLs."""
return ["-url"]
def validate(self) -> bool:
"""Check if yt-dlp is installed."""
return shutil.which("yt-dlp") is not None
# Provider registry
_PROVIDERS = {
"local": LocalStorageProvider,
"libgen": LibGenProvider,
"soulseek": SoulSeekProvider,
"debrid": DebridProvider,
"openlibrary": OpenLibraryProvider,
"gog": GogGamesProvider,
"youtube": YoutubeSearchProvider,
}
def get_provider(name: str, config: Optional[Dict[str, Any]] = None) -> Optional[SearchProvider]:
"""
Get a search provider by name.
Args:
        name: Provider name (case-insensitive): "local", "libgen", "soulseek", "debrid", "openlibrary", "gog", "youtube"
config: Optional configuration dictionary
Returns:
SearchProvider instance or None if not found
"""
provider_class = _PROVIDERS.get(name.lower())
if provider_class is None:
log(f"Unknown search provider: {name}", file=sys.stderr)
return None
try:
provider = provider_class(config)
if not provider.validate():
log(f"Provider '{name}' is not properly configured or available", file=sys.stderr)
return None
return provider
except Exception as e:
log(f"Error initializing provider '{name}': {e}", file=sys.stderr)
return None
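# Typical lookup (assumes yt-dlp is on PATH so the provider validates):
#     provider = get_provider("youtube")
#     if provider:
#         for r in provider.search("crash course python", limit=3):
#             print(r.title, r.target)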
def list_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
"""
List all available providers and whether they're available.
Args:
config: Optional configuration dictionary
Returns:
Dictionary mapping provider names to availability (True/False)
"""
availability = {}
for name, provider_class in _PROVIDERS.items():
try:
provider = provider_class(config)
availability[name] = provider.validate()
except Exception:
availability[name] = False
return availability
def register_provider(name: str, provider_class: type) -> None:
"""
Register a new search provider.
Args:
name: Provider name (lowercase)
provider_class: Class that inherits from SearchProvider
"""
_PROVIDERS[name.lower()] = provider_class
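# Minimal sketch of third-party registration (names here are hypothetical):
#     class EchoProvider(SearchProvider):
#         def search(self, query, limit=50, filters=None, **kwargs):
#             return [SearchResult(origin="echo", title=query, target=query)]
#         def get_result_args(self):
#             return ["-query"]
#     register_provider("echo", EchoProvider)
#     get_provider("echo").search("hello")[0].title  # -> "hello"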
class FileProvider(ABC):
"""Abstract base class for file hosting providers."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
self.config = config or {}
self.name = self.__class__.__name__.replace("FileProvider", "").lower()
    @abstractmethod
    def upload(self, file_path: str, **kwargs: Any) -> str:
        """Upload a file and return the URL."""
        pass
def validate(self) -> bool:
"""Check if provider is available/configured."""
return True
class ZeroXZeroFileProvider(FileProvider):
"""File provider for 0x0.st."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
self.name = "0x0"
self.base_url = "https://0x0.st"
def upload(self, file_path: str, **kwargs: Any) -> str:
"""Upload file to 0x0.st."""
from helper.http_client import HTTPClient
import os
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
try:
# 0x0.st expects 'file' field in multipart/form-data
# Use a custom User-Agent to avoid 403 Forbidden
headers = {"User-Agent": "Medeia-Macina/1.0"}
with HTTPClient(headers=headers) as client:
with open(file_path, 'rb') as f:
files = {'file': f}
response = client.post(self.base_url, files=files)
if response.status_code == 200:
return response.text.strip()
else:
raise Exception(f"Upload failed: {response.status_code} - {response.text}")
except Exception as e:
log(f"[0x0] Upload error: {e}", file=sys.stderr)
raise
def validate(self) -> bool:
return True
class MatrixFileProvider(FileProvider):
"""File provider for Matrix (Element) chat rooms."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
self.name = "matrix"
def validate(self) -> bool:
"""Check if Matrix is configured."""
if not self.config: return False
matrix_conf = self.config.get('storage', {}).get('matrix', {})
return bool(matrix_conf.get('homeserver') and matrix_conf.get('room_id') and (matrix_conf.get('access_token') or matrix_conf.get('password')))
def upload(self, file_path: str, **kwargs: Any) -> str:
"""Upload file to Matrix room."""
import requests
import mimetypes
from pathlib import Path
import json
debug(f"[Matrix] Starting upload for: {file_path}")
debug(f"[Matrix] kwargs: {kwargs}")
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
        matrix_conf = self.config.get('storage', {}).get('matrix', {})
        homeserver = matrix_conf.get('homeserver')
        access_token = matrix_conf.get('access_token')
        room_id = matrix_conf.get('room_id')
        if not (homeserver and access_token and room_id):
            raise Exception("Matrix upload requires homeserver, access_token, and room_id in config")
        if not homeserver.startswith('http'):
            homeserver = f"https://{homeserver}"
# 1. Upload Media
# Use v3 API
upload_url = f"{homeserver}/_matrix/media/v3/upload"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/octet-stream"
}
mime_type, _ = mimetypes.guess_type(path)
if mime_type:
headers["Content-Type"] = mime_type
filename = path.name
debug(f"[Matrix] Uploading media to {upload_url} with mime_type: {mime_type}")
with open(path, 'rb') as f:
resp = requests.post(upload_url, headers=headers, data=f, params={"filename": filename})
if resp.status_code != 200:
raise Exception(f"Matrix upload failed: {resp.text}")
content_uri = resp.json().get('content_uri')
if not content_uri:
raise Exception("No content_uri returned from Matrix upload")
debug(f"[Matrix] Media uploaded, content_uri: {content_uri}")
# 2. Send Message
# Use v3 API
send_url = f"{homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message"
# Determine msgtype with better fallback for audio
msgtype = "m.file"
ext = path.suffix.lower()
# Explicit check for common audio extensions to force m.audio
# This prevents audio files being treated as generic files or video
AUDIO_EXTS = {'.mp3', '.flac', '.wav', '.m4a', '.aac', '.ogg', '.opus', '.wma', '.mka', '.alac'}
VIDEO_EXTS = {'.mp4', '.mkv', '.webm', '.mov', '.avi', '.flv', '.mpg', '.mpeg', '.ts', '.m4v', '.wmv'}
IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.tiff'}
if ext in AUDIO_EXTS:
msgtype = "m.audio"
elif ext in VIDEO_EXTS:
msgtype = "m.video"
elif ext in IMAGE_EXTS:
msgtype = "m.image"
elif mime_type:
if mime_type.startswith("audio/"): msgtype = "m.audio"
elif mime_type.startswith("video/"): msgtype = "m.video"
elif mime_type.startswith("image/"): msgtype = "m.image"
debug(f"[Matrix] Determined msgtype: {msgtype} (ext: {ext}, mime: {mime_type})")
info = {
"mimetype": mime_type,
"size": path.stat().st_size
}
# Try to get duration for audio/video
if msgtype in ("m.audio", "m.video"):
try:
# Try mutagen first (lightweight)
# Use dynamic import to avoid top-level dependency if not installed
# Note: mutagen.File is available at package level at runtime but type checkers might miss it
import mutagen # type: ignore
m = mutagen.File(str(path)) # type: ignore
if m and m.info and hasattr(m.info, 'length'):
duration_ms = int(m.info.length * 1000)
info['duration'] = duration_ms
debug(f"[Matrix] Extracted duration: {duration_ms}ms")
except Exception as e:
debug(f"[Matrix] Failed to extract duration: {e}")
payload = {
"msgtype": msgtype,
"body": filename,
"url": content_uri,
"info": info
}
debug(f"[Matrix] Sending message payload: {json.dumps(payload, indent=2)}")
resp = requests.post(send_url, headers=headers, json=payload)
if resp.status_code != 200:
raise Exception(f"Matrix send message failed: {resp.text}")
event_id = resp.json().get('event_id')
return f"https://matrix.to/#/{room_id}/{event_id}"
# File provider registry
_FILE_PROVIDERS = {
    "0x0": ZeroXZeroFileProvider,
    "matrix": MatrixFileProvider,
}
def get_file_provider(name: str, config: Optional[Dict[str, Any]] = None) -> Optional[FileProvider]:
"""
Get a file hosting provider by name.
Args:
name: Provider name (case-insensitive): "0x0"
config: Optional configuration dictionary
Returns:
FileProvider instance or None if not found
"""
provider_class = _FILE_PROVIDERS.get(name.lower())
if provider_class is None:
log(f"Unknown file provider: {name}", file=sys.stderr)
return None
try:
provider = provider_class(config)
if not provider.validate():
log(f"File provider '{name}' is not properly configured or available", file=sys.stderr)
return None
return provider
except Exception as e:
log(f"Error initializing file provider '{name}': {e}", file=sys.stderr)
return None
def list_file_providers(config: Optional[Dict[str, Any]] = None) -> Dict[str, bool]:
"""
List all available file hosting providers and whether they're available.
Args:
config: Optional configuration dictionary
Returns:
Dictionary mapping provider names to availability (True/False)
"""
availability = {}
for name, provider_class in _FILE_PROVIDERS.items():
try:
provider = provider_class(config)
availability[name] = provider.validate()
except Exception:
availability[name] = False
return availability
def register_file_provider(name: str, provider_class: type) -> None:
"""
Register a new file hosting provider.
Args:
name: Provider name (lowercase)
provider_class: Class that inherits from FileProvider
"""
_FILE_PROVIDERS[name.lower()] = provider_class
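if __name__ == "__main__":
    # Smoke test: report which providers validate with an empty config.
    # Availability checks only; no network searches are performed here.
    for kind, listing in (("search", list_providers()), ("file", list_file_providers())):
        for provider_name, available in listing.items():
            print(f"{kind}:{provider_name} -> {'ok' if available else 'unavailable'}")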