"""Shared Library Genesis search and download helpers.""" from __future__ import annotations from pathlib import Path from typing import Any, Callable, Dict, Iterable, List, Optional import logging import requests from urllib.parse import quote, urljoin from libgen import search_sync, LibgenError LogFn = Optional[Callable[[str], None]] ErrorFn = Optional[Callable[[str], None]] DEFAULT_TIMEOUT = 10.0 DEFAULT_LIMIT = 50 logging.getLogger(__name__).setLevel(logging.WARNING) def _call(logger: LogFn, message: str) -> None: if logger: logger(message) def search_libgen_no_ads(query: str, session: Optional[requests.Session] = None) -> List[Dict[str, Any]]: """Search Libgen without triggering ads.php requests.""" try: from bs4 import BeautifulSoup except ImportError: # pragma: no cover logging.warning("BeautifulSoup not available; falling back to standard search") return [] mirrors = [ "https://libgen.gl", "https://libgen.vg", "https://libgen.la", "https://libgen.bz", "https://libgen.gs", ] session = session or requests.Session() session.headers.setdefault( "User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", ) for mirror in mirrors: try: search_url = f"{mirror}/index.php?req={quote(query)}&res=100&covers=on&filesuns=all" response = session.get(search_url, timeout=DEFAULT_TIMEOUT) if response.status_code != 200: continue soup = BeautifulSoup(response.content, "html.parser") table = soup.find("table", {"class": "catalog"}) if table is None: for candidate in soup.find_all("table"): rows = candidate.find_all("tr") if len(rows) > 2: table = candidate break if table is None: logging.debug("[libgen_no_ads] No results table on %s", mirror) continue rows = table.find_all("tr")[1:] results: List[Dict[str, Any]] = [] for row in rows: try: cells = row.find_all("td") if len(cells) < 9: continue size_cell = cells[7] file_link = size_cell.find("a") mirror_link = "" if file_link: href = str(file_link.get("href", "")) if href.startswith("/"): mirror_link = mirror + href elif href: mirror_link = urljoin(mirror, href) if not mirror_link: title_link = cells[1].find("a") if len(cells) > 1 else None if title_link: href = str(title_link.get("href", "")) if href.startswith("/"): mirror_link = mirror + href elif href: mirror_link = urljoin(mirror, href) if not mirror_link: continue results.append( { "id": "", "mirror": mirror_link, "cover": "", "title": cells[1].get_text(strip=True) if len(cells) > 1 else "Unknown", "authors": [cells[2].get_text(strip=True)] if len(cells) > 2 else ["Unknown"], "publisher": cells[3].get_text(strip=True) if len(cells) > 3 else "", "year": cells[4].get_text(strip=True) if len(cells) > 4 else "", "pages": cells[6].get_text(strip=True) if len(cells) > 6 else "", "language": cells[5].get_text(strip=True) if len(cells) > 5 else "", "size": cells[7].get_text(strip=True) if len(cells) > 7 else "", "extension": cells[8].get_text(strip=True) if len(cells) > 8 else "", "isbn": "", } ) except Exception as exc: # pragma: no cover - defensive logging.debug("[libgen_no_ads] Error parsing row: %s", exc) continue if results: logging.info("[libgen_no_ads] %d results from %s", len(results), mirror) return results except Exception as exc: # pragma: no cover - mirror issues logging.debug("[libgen_no_ads] Mirror %s failed: %s", mirror, exc) continue return [] def format_book_info(book: Any) -> Dict[str, Any]: """Format Libgen search result into a consistent dictionary.""" filesize_bytes = 0 size_str = getattr(book, "size", "") or "" if size_str: parts = size_str.strip().split() try: value = float(parts[0]) unit = parts[1].upper() if len(parts) > 1 else "B" if unit in {"MB", "M"}: filesize_bytes = int(value * 1024 * 1024) elif unit in {"GB", "G"}: filesize_bytes = int(value * 1024 * 1024 * 1024) elif unit in {"KB", "K"}: filesize_bytes = int(value * 1024) else: filesize_bytes = int(value) except (ValueError, IndexError): # pragma: no cover - defensive filesize_bytes = 0 title = getattr(book, "title", "") or "" isbn = getattr(book, "isbn", "") or "" if not isbn and title: import re match = re.search( r"((?:[\d]{10,13}(?:\s*[;,]\s*[\d]{10,13})+)|(?:[\d]{10,13})(?:\s*[;,]?\s*[\d\-]{0,50})?)\s*(?:\b|$)", title, ) if match: potential_isbn = match.group(0).strip() if re.search(r"\d{10,13}", potential_isbn): isbn = potential_isbn title = re.sub(r"\s+[a-z]\s*$", "", title[: match.start()].strip(), flags=re.IGNORECASE) authors_value = getattr(book, "authors", None) if isinstance(authors_value, Iterable) and not isinstance(authors_value, str): authors_str = ", ".join(str(author) for author in authors_value) else: authors_str = str(authors_value or "Unknown") download_links = getattr(book, "download_links", None) mirror_url = None if download_links and getattr(download_links, "get_link", None): mirror_url = download_links.get_link return { "title": title or "Unknown", "author": authors_str, "publisher": getattr(book, "publisher", "") or "", "year": getattr(book, "year", "") or "", "pages": getattr(book, "pages", "") or "", "language": getattr(book, "language", "") or "", "filesize": filesize_bytes, "filesize_str": size_str or "Unknown", "extension": getattr(book, "extension", "") or "", "isbn": isbn, "mirror_url": mirror_url, } def search_libgen( query: str, limit: int = DEFAULT_LIMIT, *, log_info: LogFn = None, log_error: ErrorFn = None, session: Optional[requests.Session] = None, ) -> List[Dict[str, Any]]: """Search Libgen returning formatted dictionaries with multiple mirrors. Uses HTML scraper (search_libgen_no_ads) to find books quickly. Returns mirror URLs and book IDs that can be used to generate alternative mirrors. """ try: _call(log_info, f"[search] Searching Libgen for: {query}") session = session or requests.Session() # Use HTML scraper - more reliable and doesn't hang on mirror resolution _call(log_info, "[search] Using HTML scraper (search_libgen_no_ads)...") results: List[Any] = search_libgen_no_ads(query, session=session) if not results: _call(log_info, "[search] No results from HTML scraper") return [] formatted: List[Dict[str, Any]] = [] mirrors_list = [ "https://libgen.gl", "https://libgen.vg", "https://libgen.la", "https://libgen.bz", "https://libgen.gs", ] for book in results[:limit]: if isinstance(book, dict): # Result from search_libgen_no_ads (HTML scraper) authors = book.get("authors", ["Unknown"]) if isinstance(authors, list): author_value = ", ".join(str(a) for a in authors) else: author_value = str(authors) # Extract book ID from mirror URL if available mirror = book.get("mirror", "") book_id = "" if mirror and "/file.php?id=" in mirror: try: book_id = mirror.split("/file.php?id=")[1].split("&")[0] except (IndexError, ValueError): pass # Build list of alternative mirrors based on book ID mirrors_dict = {} if book_id: for mirror_base in mirrors_list: mirrors_dict[mirror_base] = f"{mirror_base}/file.php?id={book_id}" elif mirror: # Fallback: use the mirror we found mirrors_dict["primary"] = mirror formatted.append( { "title": book.get("title", "Unknown"), "author": author_value, "publisher": book.get("publisher", ""), "year": book.get("year", ""), "pages": book.get("pages", ""), "language": book.get("language", ""), "filesize": 0, "filesize_str": book.get("size", "Unknown"), "extension": book.get("extension", ""), "isbn": book.get("isbn", ""), "mirror_url": mirror, # Primary mirror "mirrors": mirrors_dict, # Alternative mirrors "book_id": book_id, } ) else: # Fallback: try to format as book object try: formatted.append(format_book_info(book)) except Exception: pass _call(log_info, f"[search] Found {len(formatted)} result(s)") return formatted except LibgenError as exc: _call(log_error, f"[search] Libgen error: {exc}") return [] except Exception as exc: # pragma: no cover - defensive _call(log_error, f"[search] Error: {exc}") return [] def download_from_mirror( mirror_url: str, output_path: str | Path, *, log_info: LogFn = None, log_error: ErrorFn = None, session: Optional[requests.Session] = None, ) -> bool: """Download a Libgen file and write it to disk. Handles Libgen redirects and ensures proper file download by: - Following all redirects (default behavior) - Setting User-Agent header (required by some mirrors) - Validating that we're downloading binary content, not HTML - Attempting alternative download method if HTML is returned """ session = session or requests.Session() try: output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) _call(log_info, f"[download] Downloading from mirror: {mirror_url}") # Ensure session has proper headers for Libgen if 'User-Agent' not in session.headers: session.headers['User-Agent'] = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" ) # Download with redirects enabled (default) and referer session.headers['Referer'] = 'https://libgen.gs/' response = session.get(mirror_url, stream=True, timeout=30, allow_redirects=True) response.raise_for_status() # Check if we got HTML instead of a file (common Libgen issue) content_type = response.headers.get('content-type', '').lower() if 'text/html' in content_type: _call(log_error, f"[download] Server returned HTML. Trying alternative method...") # Try to extract file ID and use alternative CDN try: # Parse the HTML to extract MD5 or file ID from bs4 import BeautifulSoup soup = BeautifulSoup(response.text, 'html.parser') # Look for download link in the HTML # Common patterns: md5 hash in form, or direct link in anchor tags download_link = None # Try to find forms that might contain download functionality forms = soup.find_all('form') for form in forms: action = form.get('action', '') if 'download' in action.lower() or 'get' in action.lower(): download_link = action break if not download_link: _call(log_error, f"[download] Could not extract alternative download link from HTML") return False _call(log_info, f"[download] Using alternative download method: {download_link[:100]}") # Try downloading from alternative link response2 = session.get(download_link, stream=True, timeout=30, allow_redirects=True) response2.raise_for_status() response = response2 # Use the new response except Exception as alt_error: _call(log_error, f"[download] Alternative method failed: {alt_error}") return False total_size = int(response.headers.get("content-length", 0)) downloaded = 0 with open(output_path, "wb") as handle: for chunk in response.iter_content(chunk_size=8192): if not chunk: continue handle.write(chunk) downloaded += len(chunk) if total_size > 0: percent = downloaded / total_size * 100 _call( log_info, f"[download] {percent:.1f}% - {downloaded // (1024*1024)}MB / {total_size // (1024*1024)}MB", ) _call(log_info, f"[download] Downloaded successfully to: {output_path}") return True except Exception as exc: # pragma: no cover - defensive _call(log_error, f"[download] Error: {exc}") return False