"""Shared Library Genesis search and download helpers. Replaces the old libgen backend with a robust scraper based on libgen-api-enhanced logic. Targets libgen.is/rs/st mirrors and parses the results table directly. """ from __future__ import annotations import logging import re import requests from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple from urllib.parse import quote, urljoin, urlparse, unquote # Optional dependencies try: from bs4 import BeautifulSoup except ImportError: BeautifulSoup = None LogFn = Optional[Callable[[str], None]] ErrorFn = Optional[Callable[[str], None]] DEFAULT_TIMEOUT = 20.0 DEFAULT_LIMIT = 50 # Mirrors to try in order MIRRORS = [ "https://libgen.is", "https://libgen.rs", "https://libgen.st", "http://libgen.is", "http://libgen.rs", "http://libgen.st", "https://libgen.li", # Different structure, fallback "http://libgen.li", "https://libgen.gl", # Different structure, fallback "http://libgen.gl", ] logging.getLogger(__name__).setLevel(logging.INFO) def _call(logger: LogFn, message: str) -> None: if logger: logger(message) class LibgenSearch: """Robust LibGen searcher.""" def __init__(self, session: Optional[requests.Session] = None): self.session = session or requests.Session() self.session.headers.update({ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" }) def search(self, query: str, limit: int = DEFAULT_LIMIT) -> List[Dict[str, Any]]: """Search LibGen mirrors.""" if not BeautifulSoup: logging.error("BeautifulSoup not installed. Cannot search LibGen.") return [] for mirror in MIRRORS: try: if "libgen.li" in mirror or "libgen.gl" in mirror: results = self._search_libgen_li(mirror, query, limit) else: results = self._search_libgen_rs(mirror, query, limit) if results: return results except Exception as e: logging.debug(f"Mirror {mirror} failed: {e}") continue return [] def _search_libgen_rs(self, mirror: str, query: str, limit: int) -> List[Dict[str, Any]]: """Search libgen.rs/is/st style mirrors.""" # Search URL: /search.php?req=QUERY&res=100&column=def url = f"{mirror}/search.php" params = { "req": query, "res": 100, # Request more to filter later "column": "def", "open": 0, "view": "simple", "phrase": 1, } resp = self.session.get(url, params=params, timeout=DEFAULT_TIMEOUT) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") # Find the table with results. usually class 'c' table = soup.find("table", {"class": "c"}) if not table: # Try finding by structure (table with many rows) tables = soup.find_all("table") for t in tables: if len(t.find_all("tr")) > 5: table = t break if not table: return [] results = [] # Skip header row rows = table.find_all("tr")[1:] for row in rows: cols = row.find_all("td") if len(cols) < 9: continue # Columns: # 0: ID # 1: Author(s) # 2: Title # 3: Publisher # 4: Year # 5: Pages # 6: Language # 7: Size # 8: Extension # 9+: Mirrors try: libgen_id = cols[0].get_text(strip=True) authors = [a.get_text(strip=True) for a in cols[1].find_all("a")] if not authors: authors = [cols[1].get_text(strip=True)] title_tag = cols[2].find("a") title = title_tag.get_text(strip=True) if title_tag else cols[2].get_text(strip=True) # Extract MD5 from title link if possible (often in href) # href='book/index.php?md5=...' 
md5 = "" if title_tag and title_tag.has_attr("href"): href = title_tag["href"] match = re.search(r"md5=([a-fA-F0-9]{32})", href) if match: md5 = match.group(1) publisher = cols[3].get_text(strip=True) year = cols[4].get_text(strip=True) pages = cols[5].get_text(strip=True) language = cols[6].get_text(strip=True) size = cols[7].get_text(strip=True) extension = cols[8].get_text(strip=True) # Mirrors # Usually col 9 is http://library.lol/main/MD5 mirror_links = [] for i in range(9, len(cols)): a = cols[i].find("a") if a and a.has_attr("href"): mirror_links.append(a["href"]) # Construct direct download page link (library.lol) # If we have MD5, we can guess it: http://library.lol/main/{md5} if md5: download_link = f"http://library.lol/main/{md5}" elif mirror_links: download_link = mirror_links[0] else: download_link = "" results.append({ "id": libgen_id, "title": title, "author": ", ".join(authors), "publisher": publisher, "year": year, "pages": pages, "language": language, "filesize_str": size, "extension": extension, "md5": md5, "mirror_url": download_link, "cover": "", # Could extract from hover if needed }) if len(results) >= limit: break except Exception as e: logging.debug(f"Error parsing row: {e}") continue return results def _search_libgen_li(self, mirror: str, query: str, limit: int) -> List[Dict[str, Any]]: """Search libgen.li/gl style mirrors.""" # Search URL: /index.php?req=QUERY&columns[]=t&columns[]=a... url = f"{mirror}/index.php" params = { "req": query, "res": 100, "covers": "on", "filesuns": "all", } resp = self.session.get(url, params=params, timeout=DEFAULT_TIMEOUT) resp.raise_for_status() soup = BeautifulSoup(resp.text, "html.parser") table = soup.find("table", {"id": "tablelibgen"}) if not table: table = soup.find("table", {"class": "table table-striped"}) if not table: return [] results = [] rows = table.find_all("tr")[1:] for row in rows: cols = row.find_all("td") if len(cols) < 9: continue try: # Structure is different # 0: Cover # 1: Title (with link to file.php?id=...) # 2: Author # 3: Publisher # 4: Year # 5: Language # 6: Pages # 7: Size # 8: Extension # 9: Mirrors title_col = cols[1] title_link = title_col.find("a") title = title_link.get_text(strip=True) if title_link else title_col.get_text(strip=True) # Extract ID from link libgen_id = "" if title_link and title_link.has_attr("href"): href = title_link["href"] # href is usually "file.php?id=..." or "edition.php?id=..." 

def search_libgen(
    query: str,
    limit: int = DEFAULT_LIMIT,
    *,
    log_info: LogFn = None,
    log_error: ErrorFn = None,
    session: Optional[requests.Session] = None,
) -> List[Dict[str, Any]]:
    """Search Libgen using the robust scraper."""
    searcher = LibgenSearch(session=session)
    try:
        results = searcher.search(query, limit=limit)
        _call(log_info, f"[libgen] Found {len(results)} results")
        return results
    except Exception as e:
        _call(log_error, f"[libgen] Search failed: {e}")
        return []


def _resolve_download_url(
    session: requests.Session, url: str, log_info: LogFn = None
) -> Optional[str]:
    """Resolve the final download URL by following the LibGen page chain."""
    current_url = url
    visited = set()

    # Max hops to prevent infinite loops
    for _ in range(6):
        if current_url in visited:
            break
        visited.add(current_url)

        _call(log_info, f"[resolve] Checking: {current_url}")

        # Simple heuristic: if it already looks like a file, return it
        if current_url.lower().endswith((".pdf", ".epub", ".mobi", ".djvu", ".azw3", ".cbz", ".cbr")):
            return current_url

        try:
            # HEAD would be lighter for sniffing the content type, but some
            # mirrors block HEAD or return 405, so GET with stream=True to
            # peek at the headers without downloading the whole body.
            with session.get(current_url, stream=True, timeout=30) as resp:
                resp.raise_for_status()
                ct = resp.headers.get("Content-Type", "").lower()
                if "text/html" not in ct:
                    # It's a binary file
                    return current_url
                # It's HTML; read the page content
                content = resp.text
        except Exception as e:
            _call(log_info, f"[resolve] Failed to fetch {current_url}: {e}")
            return None

        soup = BeautifulSoup(content, "html.parser")

        # 1. Check for a "GET" link (library.lol / ads.php style).
        #    Usually an <h2>GET</h2> inside an <a>, or just the text "GET".
        get_link = soup.find("a", string=re.compile(r"^GET$", re.IGNORECASE))
        if not get_link:
            # Try finding an <a> containing <h2>GET</h2>
            h2_get = soup.find("h2", string=re.compile(r"^GET$", re.IGNORECASE))
            if h2_get and h2_get.parent.name == "a":
                get_link = h2_get.parent
        if get_link and get_link.has_attr("href"):
            return urljoin(current_url, get_link["href"])

        # 2. "series.php" -> "edition.php"
        if "series.php" in current_url:
            # Find the first edition link
            edition_link = soup.find("a", href=re.compile(r"edition\.php"))
            if edition_link:
                current_url = urljoin(current_url, edition_link["href"])
                continue

        # 3. "edition.php" -> "file.php"
        if "edition.php" in current_url:
            file_link = soup.find("a", href=re.compile(r"file\.php"))
            if file_link:
                current_url = urljoin(current_url, file_link["href"])
                continue

        # 4. "file.php" -> "ads.php" (Libgen badge)
        if "file.php" in current_url:
            # Look for a link with title="libgen" or text "Libgen"
            libgen_link = soup.find("a", title="libgen")
            if not libgen_link:
                libgen_link = soup.find("a", string=re.compile(r"Libgen", re.IGNORECASE))
            if libgen_link and libgen_link.has_attr("href"):
                current_url = urljoin(current_url, libgen_link["href"])
                continue

        # 5. "ads.php" -> "get.php" (fallback if the GET link logic above failed)
        if "ads.php" in current_url:
            get_php_link = soup.find("a", href=re.compile(r"get\.php"))
            if get_php_link:
                return urljoin(current_url, get_php_link["href"])

        # 6. library.lol / generic fallback
        for text in ["Cloudflare", "IPFS.io", "Infura"]:
            link = soup.find("a", string=re.compile(text, re.IGNORECASE))
            if link and link.has_attr("href"):
                return urljoin(current_url, link["href"])

        # If we found nothing new, stop
        break

    return None
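
# Illustrative walk of the resolver (hypothetical URLs; actual hops vary by
# mirror): starting from a .li-style table link it follows
#
#   https://libgen.li/edition.php?id=123    (step 3: -> file.php)
#   https://libgen.li/file.php?id=456       (step 4: -> ads.php via the badge)
#   https://libgen.li/ads.php?md5=<md5>     (step 5: -> get.php, returned)
#
# while a library.lol landing page short-circuits at step 1 through its
# "GET" anchor.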

def _guess_filename_extension(download_url: str, headers: Dict[str, str]) -> Optional[str]:
    """Guess the file extension from headers or the download URL."""
    content_disposition = headers.get("content-disposition", "")
    if content_disposition:
        match = re.search(
            r'filename\*?=(?:UTF-8\'\'|"?)([^";]+)',
            content_disposition,
            flags=re.IGNORECASE,
        )
        if match:
            filename = unquote(match.group(1).strip('"'))
            suffix = Path(filename).suffix
            if suffix:
                return suffix.lstrip(".")

    parsed = urlparse(download_url)
    suffix = Path(parsed.path).suffix
    if suffix:
        return suffix.lstrip(".")

    content_type = headers.get("content-type", "").lower()
    mime_map = {
        "application/pdf": "pdf",
        "application/epub+zip": "epub",
        "application/x-mobipocket-ebook": "mobi",
        "application/x-cbr": "cbr",
        "application/x-cbz": "cbz",
        "application/zip": "zip",
    }
    for mime, ext in mime_map.items():
        if mime in content_type:
            return ext
    return None


def _apply_extension(path: Path, extension: Optional[str]) -> Path:
    """Rename the path to match the detected extension, if needed."""
    if not extension:
        return path
    suffix = extension if extension.startswith(".") else f".{extension}"
    if path.suffix.lower() == suffix.lower():
        return path

    candidate = path.with_suffix(suffix)
    base_stem = path.stem
    counter = 1
    while candidate.exists() and counter < 100:
        candidate = path.with_name(f"{base_stem}({counter}){suffix}")
        counter += 1
    try:
        path.replace(candidate)
        return candidate
    except Exception:
        return path
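
# Example of the extension fix-up above (hypothetical values): if the final
# URL has no suffix but the server replies with
# Content-Type: application/epub+zip, _guess_filename_extension() returns
# "epub" and _apply_extension(Path("book"), "epub") renames the file to
# "book.epub" (or "book(1).epub" if that name is already taken).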

def download_from_mirror(
    mirror_url: str,
    output_path: Path,
    *,
    log_info: LogFn = None,
    log_error: ErrorFn = None,
    session: Optional[requests.Session] = None,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> Tuple[bool, Optional[Path]]:
    """Download a file from a LibGen mirror URL with optional progress tracking."""
    session = session or requests.Session()
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        _call(log_info, f"[download] Resolving download link from: {mirror_url}")
        download_url = _resolve_download_url(session, mirror_url, log_info)
        if not download_url:
            _call(log_error, "[download] Could not find direct download link")
            return False, None

        _call(log_info, f"[download] Downloading from: {download_url}")

        downloaded = 0
        total_size = 0
        headers: Dict[str, str] = {}

        with session.get(download_url, stream=True, timeout=60) as r:
            r.raise_for_status()
            # Lower-case the header names so the case-sensitive lookups below
            # and in _guess_filename_extension() work regardless of how the
            # server cased them (a plain dict(r.headers) would not).
            headers = {k.lower(): v for k, v in r.headers.items()}

            # Verify it's not HTML (an error page)
            ct = headers.get("content-type", "").lower()
            if "text/html" in ct:
                _call(log_error, "[download] Final URL returned HTML, not a file.")
                return False, None

            total_size = int(headers.get("content-length", 0) or 0)

            with open(output_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        downloaded += len(chunk)
                        if progress_callback:
                            progress_callback(downloaded, total_size)

        final_extension = _guess_filename_extension(download_url, headers)
        final_path = _apply_extension(output_path, final_extension)

        if progress_callback and total_size > 0:
            progress_callback(downloaded, total_size)

        _call(log_info, f"[download] Saved to {final_path}")
        return True, final_path
    except Exception as e:
        _call(log_error, f"[download] Download failed: {e}")
        return False, None
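

if __name__ == "__main__":
    # Minimal manual smoke test (a sketch, not part of the API): search for a
    # title and download the first hit into the current directory. Assumes
    # network access and at least one reachable mirror; the query and output
    # filename are placeholders.
    hits = search_libgen("python programming", limit=5, log_info=print, log_error=print)
    for h in hits:
        print(f"{h['id']}: {h['title']} [{h['extension']}, {h['filesize_str']}]")
    if hits:
        ok, path = download_from_mirror(
            hits[0]["mirror_url"],
            Path("libgen_download.bin"),  # extension is fixed up after download
            log_info=print,
            log_error=print,
            progress_callback=lambda done, total: print(f"\r{done}/{total or '?'}", end=""),
        )
        print(f"\nsuccess={ok}, path={path}")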