Files
Medios-Macina/helper/libgen_service.py

378 lines
15 KiB
Python
Raw Permalink Normal View History

2025-11-25 20:09:33 -08:00
"""Shared Library Genesis search and download helpers."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional
import logging
import requests
from urllib.parse import quote, urljoin
from libgen import search_sync, LibgenError
LogFn = Optional[Callable[[str], None]]
ErrorFn = Optional[Callable[[str], None]]
DEFAULT_TIMEOUT = 10.0
DEFAULT_LIMIT = 50
logging.getLogger(__name__).setLevel(logging.WARNING)
def _call(logger: LogFn, message: str) -> None:
if logger:
logger(message)
def search_libgen_no_ads(query: str, session: Optional[requests.Session] = None) -> List[Dict[str, Any]]:
    """Search Libgen without triggering ads.php requests.

    Scrapes the result table from the first responsive mirror and returns a
    list of raw result dicts; returns [] when bs4 is missing or every mirror
    fails.
    """
    try:
        from bs4 import BeautifulSoup
    except ImportError:  # pragma: no cover
        logging.warning("BeautifulSoup not available; falling back to standard search")
        return []

    http = session or requests.Session()
    http.headers.setdefault(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    )

    def _anchor_url(anchor, base: str) -> str:
        # Resolve an <a> element's href against the mirror; '' when absent.
        if not anchor:
            return ""
        href = str(anchor.get("href", ""))
        if href.startswith("/"):
            return base + href
        if href:
            return urljoin(base, href)
        return ""

    mirror_bases = (
        "https://libgen.gl",
        "https://libgen.vg",
        "https://libgen.la",
        "https://libgen.bz",
        "https://libgen.gs",
    )
    for base in mirror_bases:
        try:
            page = http.get(
                f"{base}/index.php?req={quote(query)}&res=100&covers=on&filesuns=all",
                timeout=DEFAULT_TIMEOUT,
            )
            if page.status_code != 200:
                continue
            soup = BeautifulSoup(page.content, "html.parser")
            table = soup.find("table", {"class": "catalog"})
            if table is None:
                # Fallback: the first table with more than two rows is
                # assumed to be the results table.
                table = next(
                    (t for t in soup.find_all("table") if len(t.find_all("tr")) > 2),
                    None,
                )
            if table is None:
                logging.debug("[libgen_no_ads] No results table on %s", base)
                continue
            found: List[Dict[str, Any]] = []
            for row in table.find_all("tr")[1:]:
                try:
                    cells = row.find_all("td")
                    if len(cells) < 9:
                        continue
                    # Prefer the direct file link in the size column; fall
                    # back to the title link.
                    link = _anchor_url(cells[7].find("a"), base)
                    if not link:
                        link = _anchor_url(cells[1].find("a"), base)
                    if not link:
                        continue
                    # The len(cells) >= 9 guard above makes direct indexing safe.
                    found.append(
                        {
                            "id": "",
                            "mirror": link,
                            "cover": "",
                            "title": cells[1].get_text(strip=True),
                            "authors": [cells[2].get_text(strip=True)],
                            "publisher": cells[3].get_text(strip=True),
                            "year": cells[4].get_text(strip=True),
                            "pages": cells[6].get_text(strip=True),
                            "language": cells[5].get_text(strip=True),
                            "size": cells[7].get_text(strip=True),
                            "extension": cells[8].get_text(strip=True),
                            "isbn": "",
                        }
                    )
                except Exception as exc:  # pragma: no cover - defensive
                    logging.debug("[libgen_no_ads] Error parsing row: %s", exc)
                    continue
            if found:
                logging.info("[libgen_no_ads] %d results from %s", len(found), base)
                return found
        except Exception as exc:  # pragma: no cover - mirror issues
            logging.debug("[libgen_no_ads] Mirror %s failed: %s", base, exc)
            continue
    return []
def format_book_info(book: Any) -> Dict[str, Any]:
    """Format a Libgen search result into a consistent dictionary.

    Parameters
    ----------
    book:
        Any object exposing Libgen result attributes (``title``, ``authors``,
        ``size``, ``isbn``, ``download_links``, ...). Missing attributes fall
        back to empty defaults.

    Returns
    -------
    Dict with normalised string fields plus ``filesize`` in bytes
    (0 when the size string cannot be parsed).
    """
    import re

    # Parse the human-readable size ("4 MB") into bytes. Unknown units are
    # treated as raw byte counts; unparseable strings yield 0.
    # FIX: the original if/elif chain only knew KB/MB/GB, so "1 TB" parsed
    # as 1 byte — generalized to a multiplier table including TB.
    filesize_bytes = 0
    size_str = getattr(book, "size", "") or ""
    if size_str:
        parts = size_str.strip().split()
        multipliers = {
            "KB": 1024, "K": 1024,
            "MB": 1024 ** 2, "M": 1024 ** 2,
            "GB": 1024 ** 3, "G": 1024 ** 3,
            "TB": 1024 ** 4, "T": 1024 ** 4,
        }
        try:
            value = float(parts[0])
            unit = parts[1].upper() if len(parts) > 1 else "B"
            filesize_bytes = int(value * multipliers.get(unit, 1))
        except (ValueError, IndexError):  # pragma: no cover - defensive
            filesize_bytes = 0

    title = getattr(book, "title", "") or ""
    isbn = getattr(book, "isbn", "") or ""
    if not isbn and title:
        # Some mirrors append ISBN digits to the title; pull them into the
        # isbn field and strip them (plus a trailing stray letter) from title.
        match = re.search(
            r"((?:[\d]{10,13}(?:\s*[;,]\s*[\d]{10,13})+)|(?:[\d]{10,13})(?:\s*[;,]?\s*[\d\-]{0,50})?)\s*(?:\b|$)",
            title,
        )
        if match:
            potential_isbn = match.group(0).strip()
            if re.search(r"\d{10,13}", potential_isbn):
                isbn = potential_isbn
                title = re.sub(r"\s+[a-z]\s*$", "", title[: match.start()].strip(), flags=re.IGNORECASE)

    # Authors may arrive as a list/iterable or a single string.
    authors_value = getattr(book, "authors", None)
    if isinstance(authors_value, Iterable) and not isinstance(authors_value, str):
        authors_str = ", ".join(str(author) for author in authors_value)
    else:
        authors_str = str(authors_value or "Unknown")

    # download_links (when present) exposes the primary mirror as an
    # attribute named get_link (not a callable).
    download_links = getattr(book, "download_links", None)
    mirror_url = None
    if download_links and getattr(download_links, "get_link", None):
        mirror_url = download_links.get_link

    return {
        "title": title or "Unknown",
        "author": authors_str,
        "publisher": getattr(book, "publisher", "") or "",
        "year": getattr(book, "year", "") or "",
        "pages": getattr(book, "pages", "") or "",
        "language": getattr(book, "language", "") or "",
        "filesize": filesize_bytes,
        "filesize_str": size_str or "Unknown",
        "extension": getattr(book, "extension", "") or "",
        "isbn": isbn,
        "mirror_url": mirror_url,
    }
def search_libgen(
    query: str,
    limit: int = DEFAULT_LIMIT,
    *,
    log_info: LogFn = None,
    log_error: ErrorFn = None,
    session: Optional[requests.Session] = None,
) -> List[Dict[str, Any]]:
    """Search Libgen returning formatted dictionaries with multiple mirrors.

    Uses the HTML scraper (search_libgen_no_ads) to find books quickly and
    returns mirror URLs plus book IDs usable to generate alternative mirrors.
    """
    mirror_bases = (
        "https://libgen.gl",
        "https://libgen.vg",
        "https://libgen.la",
        "https://libgen.bz",
        "https://libgen.gs",
    )

    def _normalise(entry: Dict[str, Any]) -> Dict[str, Any]:
        """Shape one raw scraper dict into the public result format."""
        raw_authors = entry.get("authors", ["Unknown"])
        if isinstance(raw_authors, list):
            author_value = ", ".join(str(a) for a in raw_authors)
        else:
            author_value = str(raw_authors)
        # Pull the numeric book ID out of a file.php link when present.
        mirror = entry.get("mirror", "")
        book_id = ""
        if mirror and "/file.php?id=" in mirror:
            try:
                book_id = mirror.split("/file.php?id=")[1].split("&")[0]
            except (IndexError, ValueError):
                pass
        # With an ID we can synthesise the same file on every known mirror;
        # otherwise fall back to the single link we scraped.
        if book_id:
            mirrors_dict = {b: f"{b}/file.php?id={book_id}" for b in mirror_bases}
        elif mirror:
            mirrors_dict = {"primary": mirror}
        else:
            mirrors_dict = {}
        return {
            "title": entry.get("title", "Unknown"),
            "author": author_value,
            "publisher": entry.get("publisher", ""),
            "year": entry.get("year", ""),
            "pages": entry.get("pages", ""),
            "language": entry.get("language", ""),
            "filesize": 0,
            "filesize_str": entry.get("size", "Unknown"),
            "extension": entry.get("extension", ""),
            "isbn": entry.get("isbn", ""),
            "mirror_url": mirror,
            "mirrors": mirrors_dict,
            "book_id": book_id,
        }

    try:
        _call(log_info, f"[search] Searching Libgen for: {query}")
        http = session or requests.Session()
        _call(log_info, "[search] Using HTML scraper (search_libgen_no_ads)...")
        raw_results = search_libgen_no_ads(query, session=http)
        if not raw_results:
            _call(log_info, "[search] No results from HTML scraper")
            return []
        formatted: List[Dict[str, Any]] = []
        for book in raw_results[:limit]:
            if isinstance(book, dict):
                formatted.append(_normalise(book))
            else:
                # Fallback: treat it as a libgen book object.
                try:
                    formatted.append(format_book_info(book))
                except Exception:
                    pass
        _call(log_info, f"[search] Found {len(formatted)} result(s)")
        return formatted
    except LibgenError as exc:
        _call(log_error, f"[search] Libgen error: {exc}")
        return []
    except Exception as exc:  # pragma: no cover - defensive
        _call(log_error, f"[search] Error: {exc}")
        return []
def download_from_mirror(
    mirror_url: str,
    output_path: str | Path,
    *,
    log_info: LogFn = None,
    log_error: ErrorFn = None,
    session: Optional[requests.Session] = None,
) -> bool:
    """Download a Libgen file and write it to disk.

    Handles Libgen redirects and ensures proper file download by:
    - Following all redirects (default behavior)
    - Setting User-Agent header (required by some mirrors)
    - Validating that we're downloading binary content, not HTML
    - Attempting an alternative download method if HTML is returned

    Returns True on success, False on any failure (errors are reported via
    ``log_error`` rather than raised).
    """
    session = session or requests.Session()
    try:
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        _call(log_info, f"[download] Downloading from mirror: {mirror_url}")
        # Some mirrors reject requests without a browser-like User-Agent.
        if 'User-Agent' not in session.headers:
            session.headers['User-Agent'] = (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
            )
        # Download with redirects enabled (default) and a referer header.
        session.headers['Referer'] = 'https://libgen.gs/'
        response = session.get(mirror_url, stream=True, timeout=30, allow_redirects=True)
        response.raise_for_status()
        # An HTML content-type means we landed on a page, not the file
        # (common Libgen behaviour) — try to dig the real link out of it.
        content_type = response.headers.get('content-type', '').lower()
        if 'text/html' in content_type:
            _call(log_error, "[download] Server returned HTML. Trying alternative method...")
            try:
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(response.text, 'html.parser')
                download_link = None
                # Look for a form whose action suggests a download endpoint.
                for form in soup.find_all('form'):
                    action = form.get('action', '')
                    if 'download' in action.lower() or 'get' in action.lower():
                        # BUGFIX: form actions are frequently relative
                        # ("/get.php?md5=..."); the original passed them to
                        # session.get() verbatim, which raises MissingSchema.
                        # Resolve against the final response URL instead.
                        download_link = urljoin(response.url, str(action))
                        break
                if not download_link:
                    _call(log_error, "[download] Could not extract alternative download link from HTML")
                    return False
                _call(log_info, f"[download] Using alternative download method: {download_link[:100]}")
                response2 = session.get(download_link, stream=True, timeout=30, allow_redirects=True)
                response2.raise_for_status()
                response = response2  # Continue with the new response below.
            except Exception as alt_error:
                _call(log_error, f"[download] Alternative method failed: {alt_error}")
                return False
        # Stream the body to disk in 8 KiB chunks, reporting progress when
        # the server provided a content-length.
        total_size = int(response.headers.get("content-length", 0))
        downloaded = 0
        with open(output_path, "wb") as handle:
            for chunk in response.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                handle.write(chunk)
                downloaded += len(chunk)
                if total_size > 0:
                    percent = downloaded / total_size * 100
                    _call(
                        log_info,
                        f"[download] {percent:.1f}% - {downloaded // (1024*1024)}MB / {total_size // (1024*1024)}MB",
                    )
        _call(log_info, f"[download] Downloaded successfully to: {output_path}")
        return True
    except Exception as exc:  # pragma: no cover - defensive
        _call(log_error, f"[download] Error: {exc}")
        return False