jkjnkjkllkjjk

This commit is contained in:
nose
2025-11-30 11:39:04 -08:00
parent ed417c8200
commit 7a13af9a1f
15 changed files with 1150 additions and 363 deletions

View File

@@ -75,6 +75,11 @@ def credential_openlibrary(config: Dict[str, Any]) -> Tuple[Optional[str], Optio
return email, password
class BookNotAvailableError(Exception):
"""Raised when a book is not available for borrowing (waitlisted/in use)."""
pass
def display_error(response: requests.Response, message: str) -> None:
"""Display error and exit."""
log(message, file=sys.stderr)
@@ -133,9 +138,11 @@ def loan(session: requests.Session, book_id: str, verbose: bool = True) -> reque
if response.status_code == 400:
try:
if response.json()["error"] == "This book is not available to borrow at this time. Please try again later.":
debug("This book doesn't need to be borrowed")
return session
debug("Book is not available for borrowing (waitlisted or in use)")
raise BookNotAvailableError("Book is waitlisted or in use")
display_error(response, "Something went wrong when trying to borrow the book.")
except BookNotAvailableError:
raise
except:
display_error(response, "The book cannot be borrowed")
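
For context, a hedged sketch of how a caller might consume the new BookNotAvailableError; borrow_with_retry, its attempt count, and wait interval are illustrative, not part of this commit:

import sys
import time

def borrow_with_retry(session, book_id, attempts=3, wait_seconds=60):
    """Retry borrowing a waitlisted book a few times before giving up (hypothetical helper)."""
    for attempt in range(1, attempts + 1):
        try:
            return loan(session, book_id)  # loan() and BookNotAvailableError come from the diff above
        except BookNotAvailableError:
            log(f"Attempt {attempt}: book is waitlisted/in use; retrying in {wait_seconds}s", file=sys.stderr)
            time.sleep(wait_seconds)
    raise BookNotAvailableError(f"Book {book_id} still unavailable after {attempts} attempts")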
@@ -182,11 +189,21 @@ def get_book_infos(session: requests.Session, url: str) -> Tuple[str, List[str],
# Try to extract the infos URL from the response
try:
# Look for the "url" field in the response
if '"url":"' not in r:
raise ValueError("No 'url' field found in response")
infos_url = "https:" + r.split('"url":"')[1].split('"')[0].replace("\\u0026", "&")
except (IndexError, ValueError) as e:
# Look for the "url" field in the response using regex
# Matches "url":"//archive.org/..."
import re
match = re.search(r'"url"\s*:\s*"([^"]+)"', r)
if not match:
raise ValueError("No 'url' field found in response")
url_path = match.group(1)
if url_path.startswith("//"):
infos_url = "https:" + url_path
else:
infos_url = url_path
infos_url = infos_url.replace("\\u0026", "&")
except (IndexError, ValueError, AttributeError) as e:
# If URL extraction fails, raise with better error message
raise RuntimeError(f"Failed to extract book info URL from response: {e}")
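
The regex path can be exercised in isolation. A minimal sketch, assuming a response body shaped like the JSON this function scrapes (the sample body below is a made-up stand-in):

import re

def extract_infos_url(r: str) -> str:
    """Pull the (possibly protocol-relative) "url" field out of a raw response body."""
    match = re.search(r'"url"\s*:\s*"([^"]+)"', r)
    if not match:
        raise ValueError("No 'url' field found in response")
    url_path = match.group(1)
    # Protocol-relative URLs ("//archive.org/...") need a scheme prepended.
    infos_url = "https:" + url_path if url_path.startswith("//") else url_path
    return infos_url.replace("\\u0026", "&")

body = '{"data": {"url":"//archive.org/infos?a=1\\u0026b=2"}}'
assert extract_infos_url(body) == "https://archive.org/infos?a=1&b=2"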

View File

@@ -27,6 +27,7 @@ import requests
import re
from helper.logger import log, debug
from helper.utils_constant import mime_maps
class StorageBackend(ABC):
@@ -707,6 +708,18 @@ class HydrusStorageBackend(StorageBackend):
if title != f"Hydrus File {file_id}":
break
# Resolve extension from MIME type
mime_type = meta.get("mime")
ext = ""
if mime_type:
for category in mime_maps.values():
for ext_key, info in category.items():
if mime_type in info.get("mimes", []):
ext = info.get("ext", "").lstrip('.')
break
if ext:
break
# Filter results based on query type
# If user provided explicit namespace (has ':'), don't do substring filtering
# Just include what the tag search returned
@@ -726,7 +739,8 @@ class HydrusStorageBackend(StorageBackend):
"origin": "hydrus",
"tags": all_tags,
"file_id": file_id,
"mime": meta.get("mime"),
"mime": mime_type,
"ext": ext,
})
else:
# Free-form search: check if search terms match the title or tags
@@ -758,7 +772,8 @@ class HydrusStorageBackend(StorageBackend):
"origin": "hydrus",
"tags": all_tags,
"file_id": file_id,
"mime": meta.get("mime"),
"mime": mime_type,
"ext": ext,
})
debug(f"Found {len(results)} result(s)")
@@ -971,6 +986,60 @@ class MatrixStorageBackend(StorageBackend):
def get_name(self) -> str:
return "matrix"
def list_rooms(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
"""List joined rooms with their names."""
matrix_conf = config.get('storage', {}).get('matrix', {})
homeserver = matrix_conf.get('homeserver')
access_token = matrix_conf.get('access_token')
if not homeserver or not access_token:
return []
if not homeserver.startswith('http'):
homeserver = f"https://{homeserver}"
headers = {"Authorization": f"Bearer {access_token}"}
try:
# Get joined rooms
resp = requests.get(f"{homeserver}/_matrix/client/v3/joined_rooms", headers=headers, timeout=10)
if resp.status_code != 200:
return []
room_ids = resp.json().get('joined_rooms', [])
rooms = []
for rid in room_ids:
# Try to get room name
name = "Unknown Room"
try:
# Get state event for name
name_resp = requests.get(
f"{homeserver}/_matrix/client/v3/rooms/{rid}/state/m.room.name",
headers=headers,
timeout=2
)
if name_resp.status_code == 200:
name = name_resp.json().get('name', name)
else:
# Try canonical alias
alias_resp = requests.get(
f"{homeserver}/_matrix/client/v3/rooms/{rid}/state/m.room.canonical_alias",
headers=headers,
timeout=2
)
if alias_resp.status_code == 200:
name = alias_resp.json().get('alias', name)
except Exception:
pass
rooms.append({'id': rid, 'name': name})
return rooms
except Exception as e:
log(f"Error listing Matrix rooms: {e}", file=sys.stderr)
return []
def upload(self, file_path: Path, **kwargs: Any) -> str:
"""Upload file to Matrix room.
@@ -993,8 +1062,8 @@ class MatrixStorageBackend(StorageBackend):
access_token = matrix_conf.get('access_token')
room_id = matrix_conf.get('room_id')
if not homeserver or not room_id:
raise ValueError("Matrix homeserver and room_id required")
if not homeserver:
raise ValueError("Matrix homeserver required")
# Ensure homeserver has protocol
if not homeserver.startswith('http'):
@@ -1004,6 +1073,39 @@ class MatrixStorageBackend(StorageBackend):
if not access_token:
raise ValueError("Matrix access_token required (login not yet implemented)")
# Handle room selection if not provided
if not room_id:
log("No room_id configured. Fetching joined rooms...", file=sys.stderr)
rooms = self.list_rooms(config)
if not rooms:
raise ValueError("No joined rooms found or failed to fetch rooms.")
from result_table import ResultTable
table = ResultTable("Matrix Rooms")
for i, room in enumerate(rooms):
row = table.add_row()
row.add_column("#", str(i + 1))
row.add_column("Name", room['name'])
row.add_column("ID", room['id'])
print(table)
# Simple interactive selection
try:
selection = input("Select room # to upload to: ")
idx = int(selection) - 1
if 0 <= idx < len(rooms):
room_id = rooms[idx]['id']
log(f"Selected room: {rooms[idx]['name']} ({room_id})", file=sys.stderr)
else:
raise ValueError("Invalid selection")
except Exception:
raise ValueError("Invalid room selection")
if not room_id:
raise ValueError("Matrix room_id required")
# 1. Upload Media
upload_url = f"{homeserver}/_matrix/media/r3/upload"
headers = {
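
The hunk cuts off at the media upload. For orientation, a minimal sketch of the two-step flow the backend implements, written against the Matrix v3 endpoints; matrix_upload_and_send and its parameters are illustrative, not this repo's API:

import uuid
from pathlib import Path
import requests

def matrix_upload_and_send(homeserver: str, access_token: str, room_id: str, file_path: Path) -> str:
    """Upload a file, then post an m.file message pointing at its mxc:// URI."""
    headers = {"Authorization": f"Bearer {access_token}"}
    # 1. Upload the raw bytes; the server replies {"content_uri": "mxc://..."}.
    upload = requests.post(
        f"{homeserver}/_matrix/media/v3/upload",
        params={"filename": file_path.name},
        headers={**headers, "Content-Type": "application/octet-stream"},
        data=file_path.read_bytes(),
        timeout=60,
    )
    upload.raise_for_status()
    mxc_uri = upload.json()["content_uri"]
    # 2. Reference the media from a room event (PUT with a client-generated txn id).
    event = {"msgtype": "m.file", "body": file_path.name, "url": mxc_uri}
    send = requests.put(
        f"{homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{uuid.uuid4().hex}",
        headers=headers,
        json=event,
        timeout=10,
    )
    send.raise_for_status()
    return mxc_uri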

View File

@@ -1337,19 +1337,44 @@ def is_available(config: dict[str, Any], use_cache: bool = True) -> tuple[bool,
timeout = 10.0
try:
client = HydrusClient(url, access_key, timeout)
# Lightweight probe: get services
# Temporarily suppress error logging for health checks (expected to fail if Hydrus unavailable)
hydrus_logger = logging.getLogger("helper.hydrus")
original_level = hydrus_logger.level
hydrus_logger.setLevel(logging.CRITICAL) # Suppress errors/warnings
# Use HTTPClient directly to avoid session key logic and reduce retries
# This prevents log spam when Hydrus is offline (avoiding 3 retries x 2 requests)
from helper.http_client import HTTPClient
probe_url = f"{url.rstrip('/')}/get_services"
headers = {}
if access_key:
headers["Hydrus-Client-API-Access-Key"] = access_key
# Suppress HTTPClient logging during probe to avoid "Request failed" logs on startup
http_logger = logging.getLogger("helper.http_client")
original_level = http_logger.level
http_logger.setLevel(logging.CRITICAL)
try:
_ = client.get_services()
_HYDRUS_AVAILABLE = True
_HYDRUS_UNAVAILABLE_REASON = None
return True, None
# Use retries=1 (single attempt, no retry) to fail fast
with HTTPClient(timeout=timeout, retries=1, headers=headers, verify_ssl=False) as http:
try:
response = http.get(probe_url)
if response.status_code == 200:
_HYDRUS_AVAILABLE = True
_HYDRUS_UNAVAILABLE_REASON = None
return True, None
else:
# A 4xx/5xx still means the endpoint is reachable, but "available" here means "usable":
# if auth fails (403) we cannot use the service, so report it as unavailable
# with the HTTP status as the reason.
reason = f"HTTP {response.status_code}: {response.reason_phrase}"
_HYDRUS_AVAILABLE = False
_HYDRUS_UNAVAILABLE_REASON = reason
return False, reason
except Exception:
# Re-raise connection errors from HTTPClient so the outer handler can record the reason
raise
finally:
hydrus_logger.setLevel(original_level)
http_logger.setLevel(original_level)
except Exception as exc:
reason = str(exc)
_HYDRUS_AVAILABLE = False
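
The caching contract around this probe is easy to miss in the diff. A condensed sketch of the module-level pattern, using plain requests in place of HTTPClient (the cache global names are taken from the hunk, the rest is illustrative):

import requests
from typing import Optional, Tuple

_HYDRUS_AVAILABLE: Optional[bool] = None
_HYDRUS_UNAVAILABLE_REASON: Optional[str] = None

def probe_hydrus(url: str, access_key: str = "", timeout: float = 10.0) -> Tuple[bool, Optional[str]]:
    """Single-attempt, fail-fast health check against /get_services."""
    global _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON
    headers = {}
    if access_key:
        headers["Hydrus-Client-API-Access-Key"] = access_key
    try:
        resp = requests.get(f"{url.rstrip('/')}/get_services", headers=headers, timeout=timeout)
        ok = resp.status_code == 200
        reason = None if ok else f"HTTP {resp.status_code}"
    except requests.RequestException as exc:
        ok, reason = False, str(exc)
    _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON = ok, reason
    return ok, reason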

View File

@@ -1,21 +1,44 @@
"""Shared Library Genesis search and download helpers."""
"""Shared Library Genesis search and download helpers.
Replaces the old libgen backend with a robust scraper based on libgen-api-enhanced logic.
Targets libgen.is/rs/st mirrors and parses the results table directly.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional
import logging
import re
import requests
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
from urllib.parse import quote, urljoin
from libgen import search_sync, LibgenError
# Optional dependencies
try:
from bs4 import BeautifulSoup
except ImportError:
BeautifulSoup = None
LogFn = Optional[Callable[[str], None]]
ErrorFn = Optional[Callable[[str], None]]
DEFAULT_TIMEOUT = 10.0
DEFAULT_TIMEOUT = 20.0
DEFAULT_LIMIT = 50
logging.getLogger(__name__).setLevel(logging.WARNING)
# Mirrors to try in order
MIRRORS = [
"https://libgen.is",
"https://libgen.rs",
"https://libgen.st",
"http://libgen.is",
"http://libgen.rs",
"http://libgen.st",
"https://libgen.li", # Different structure, fallback
"http://libgen.li",
"https://libgen.gl", # Different structure, fallback
"http://libgen.gl",
]
logging.getLogger(__name__).setLevel(logging.INFO)
def _call(logger: LogFn, message: str) -> None:
@@ -23,168 +46,248 @@ def _call(logger: LogFn, message: str) -> None:
logger(message)
def search_libgen_no_ads(query: str, session: Optional[requests.Session] = None) -> List[Dict[str, Any]]:
"""Search Libgen without triggering ads.php requests."""
try:
from bs4 import BeautifulSoup
except ImportError: # pragma: no cover
logging.warning("BeautifulSoup not available; falling back to standard search")
class LibgenSearch:
"""Robust LibGen searcher."""
def __init__(self, session: Optional[requests.Session] = None):
self.session = session or requests.Session()
self.session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
})
def search(self, query: str, limit: int = DEFAULT_LIMIT) -> List[Dict[str, Any]]:
"""Search LibGen mirrors."""
if not BeautifulSoup:
logging.error("BeautifulSoup not installed. Cannot search LibGen.")
return []
for mirror in MIRRORS:
try:
if "libgen.li" in mirror or "libgen.gl" in mirror:
results = self._search_libgen_li(mirror, query, limit)
else:
results = self._search_libgen_rs(mirror, query, limit)
if results:
return results
except Exception as e:
logging.debug(f"Mirror {mirror} failed: {e}")
continue
return []
mirrors = [
"https://libgen.gl",
"https://libgen.vg",
"https://libgen.la",
"https://libgen.bz",
"https://libgen.gs",
]
def _search_libgen_rs(self, mirror: str, query: str, limit: int) -> List[Dict[str, Any]]:
"""Search libgen.rs/is/st style mirrors."""
# Search URL: /search.php?req=QUERY&res=100&column=def
url = f"{mirror}/search.php"
params = {
"req": query,
"res": 100, # Request more to filter later
"column": "def",
"open": 0,
"view": "simple",
"phrase": 1,
}
resp = self.session.get(url, params=params, timeout=DEFAULT_TIMEOUT)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
# Find the results table; on rs/is/st mirrors it usually has class 'c'
table = soup.find("table", {"class": "c"})
if not table:
# Try finding by structure (table with many rows)
tables = soup.find_all("table")
for t in tables:
if len(t.find_all("tr")) > 5:
table = t
break
if not table:
return []
session = session or requests.Session()
session.headers.setdefault(
"User-Agent",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
)
for mirror in mirrors:
try:
search_url = f"{mirror}/index.php?req={quote(query)}&res=100&covers=on&filesuns=all"
response = session.get(search_url, timeout=DEFAULT_TIMEOUT)
if response.status_code != 200:
results = []
# Skip header row
rows = table.find_all("tr")[1:]
for row in rows:
cols = row.find_all("td")
if len(cols) < 9:
continue
# Columns:
# 0: ID
# 1: Author(s)
# 2: Title
# 3: Publisher
# 4: Year
# 5: Pages
# 6: Language
# 7: Size
# 8: Extension
# 9+: Mirrors
try:
libgen_id = cols[0].get_text(strip=True)
authors = [a.get_text(strip=True) for a in cols[1].find_all("a")]
if not authors:
authors = [cols[1].get_text(strip=True)]
title_tag = cols[2].find("a")
title = title_tag.get_text(strip=True) if title_tag else cols[2].get_text(strip=True)
# Extract MD5 from title link if possible (often in href)
# href='book/index.php?md5=...'
md5 = ""
if title_tag and title_tag.has_attr("href"):
href = title_tag["href"]
match = re.search(r"md5=([a-fA-F0-9]{32})", href)
if match:
md5 = match.group(1)
publisher = cols[3].get_text(strip=True)
year = cols[4].get_text(strip=True)
pages = cols[5].get_text(strip=True)
language = cols[6].get_text(strip=True)
size = cols[7].get_text(strip=True)
extension = cols[8].get_text(strip=True)
# Mirrors
# Usually col 9 is http://library.lol/main/MD5
mirror_links = []
for i in range(9, len(cols)):
a = cols[i].find("a")
if a and a.has_attr("href"):
mirror_links.append(a["href"])
# Construct direct download page link (library.lol)
# If we have MD5, we can guess it: http://library.lol/main/{md5}
if md5:
download_link = f"http://library.lol/main/{md5}"
elif mirror_links:
download_link = mirror_links[0]
else:
download_link = ""
soup = BeautifulSoup(response.content, "html.parser")
table = soup.find("table", {"class": "catalog"})
if table is None:
for candidate in soup.find_all("table"):
rows = candidate.find_all("tr")
if len(rows) > 2:
table = candidate
break
if table is None:
logging.debug("[libgen_no_ads] No results table on %s", mirror)
results.append({
"id": libgen_id,
"title": title,
"author": ", ".join(authors),
"publisher": publisher,
"year": year,
"pages": pages,
"language": language,
"filesize_str": size,
"extension": extension,
"md5": md5,
"mirror_url": download_link,
"cover": "", # Could extract from hover if needed
})
if len(results) >= limit:
break
except Exception as e:
logging.debug(f"Error parsing row: {e}")
continue
return results
rows = table.find_all("tr")[1:]
results: List[Dict[str, Any]] = []
for row in rows:
try:
cells = row.find_all("td")
if len(cells) < 9:
continue
size_cell = cells[7]
file_link = size_cell.find("a")
mirror_link = ""
if file_link:
href = str(file_link.get("href", ""))
if href.startswith("/"):
mirror_link = mirror + href
elif href:
mirror_link = urljoin(mirror, href)
if not mirror_link:
title_link = cells[1].find("a") if len(cells) > 1 else None
if title_link:
href = str(title_link.get("href", ""))
if href.startswith("/"):
mirror_link = mirror + href
elif href:
mirror_link = urljoin(mirror, href)
if not mirror_link:
continue
results.append(
{
"id": "",
"mirror": mirror_link,
"cover": "",
"title": cells[1].get_text(strip=True) if len(cells) > 1 else "Unknown",
"authors": [cells[2].get_text(strip=True)]
if len(cells) > 2
else ["Unknown"],
"publisher": cells[3].get_text(strip=True) if len(cells) > 3 else "",
"year": cells[4].get_text(strip=True) if len(cells) > 4 else "",
"pages": cells[6].get_text(strip=True) if len(cells) > 6 else "",
"language": cells[5].get_text(strip=True) if len(cells) > 5 else "",
"size": cells[7].get_text(strip=True) if len(cells) > 7 else "",
"extension": cells[8].get_text(strip=True) if len(cells) > 8 else "",
"isbn": "",
}
)
except Exception as exc: # pragma: no cover - defensive
logging.debug("[libgen_no_ads] Error parsing row: %s", exc)
continue
if results:
logging.info("[libgen_no_ads] %d results from %s", len(results), mirror)
return results
except Exception as exc: # pragma: no cover - mirror issues
logging.debug("[libgen_no_ads] Mirror %s failed: %s", mirror, exc)
continue
return []
def format_book_info(book: Any) -> Dict[str, Any]:
"""Format Libgen search result into a consistent dictionary."""
filesize_bytes = 0
size_str = getattr(book, "size", "") or ""
if size_str:
parts = size_str.strip().split()
try:
value = float(parts[0])
unit = parts[1].upper() if len(parts) > 1 else "B"
if unit in {"MB", "M"}:
filesize_bytes = int(value * 1024 * 1024)
elif unit in {"GB", "G"}:
filesize_bytes = int(value * 1024 * 1024 * 1024)
elif unit in {"KB", "K"}:
filesize_bytes = int(value * 1024)
else:
filesize_bytes = int(value)
except (ValueError, IndexError): # pragma: no cover - defensive
filesize_bytes = 0
title = getattr(book, "title", "") or ""
isbn = getattr(book, "isbn", "") or ""
if not isbn and title:
import re
match = re.search(
r"((?:[\d]{10,13}(?:\s*[;,]\s*[\d]{10,13})+)|(?:[\d]{10,13})(?:\s*[;,]?\s*[\d\-]{0,50})?)\s*(?:\b|$)",
title,
)
if match:
potential_isbn = match.group(0).strip()
if re.search(r"\d{10,13}", potential_isbn):
isbn = potential_isbn
title = re.sub(r"\s+[a-z]\s*$", "", title[: match.start()].strip(), flags=re.IGNORECASE)
authors_value = getattr(book, "authors", None)
if isinstance(authors_value, Iterable) and not isinstance(authors_value, str):
authors_str = ", ".join(str(author) for author in authors_value)
else:
authors_str = str(authors_value or "Unknown")
download_links = getattr(book, "download_links", None)
mirror_url = None
if download_links and getattr(download_links, "get_link", None):
mirror_url = download_links.get_link
return {
"title": title or "Unknown",
"author": authors_str,
"publisher": getattr(book, "publisher", "") or "",
"year": getattr(book, "year", "") or "",
"pages": getattr(book, "pages", "") or "",
"language": getattr(book, "language", "") or "",
"filesize": filesize_bytes,
"filesize_str": size_str or "Unknown",
"extension": getattr(book, "extension", "") or "",
"isbn": isbn,
"mirror_url": mirror_url,
}
def _search_libgen_li(self, mirror: str, query: str, limit: int) -> List[Dict[str, Any]]:
"""Search libgen.li/gl style mirrors."""
# Search URL: /index.php?req=QUERY&columns[]=t&columns[]=a...
url = f"{mirror}/index.php"
params = {
"req": query,
"res": 100,
"covers": "on",
"filesuns": "all",
}
resp = self.session.get(url, params=params, timeout=DEFAULT_TIMEOUT)
resp.raise_for_status()
soup = BeautifulSoup(resp.text, "html.parser")
table = soup.find("table", {"id": "tablelibgen"})
if not table:
table = soup.find("table", {"class": "table table-striped"})
if not table:
return []
results = []
rows = table.find_all("tr")[1:]
for row in rows:
cols = row.find_all("td")
if len(cols) < 9:
continue
try:
# Structure is different
# 0: Cover
# 1: Title (with link to file.php?id=...)
# 2: Author
# 3: Publisher
# 4: Year
# 5: Language
# 6: Pages
# 7: Size
# 8: Extension
# 9: Mirrors
title_col = cols[1]
title_link = title_col.find("a")
title = title_link.get_text(strip=True) if title_link else title_col.get_text(strip=True)
# Extract ID from link
libgen_id = ""
if title_link and title_link.has_attr("href"):
href = title_link["href"]
# href is usually "file.php?id=..." or "edition.php?id=..."
match = re.search(r"id=(\d+)", href)
if match:
libgen_id = match.group(1)
authors = cols[2].get_text(strip=True)
publisher = cols[3].get_text(strip=True)
year = cols[4].get_text(strip=True)
language = cols[5].get_text(strip=True)
pages = cols[6].get_text(strip=True)
size = cols[7].get_text(strip=True)
extension = cols[8].get_text(strip=True)
# Mirror link
# Usually in col 9 or title link
mirror_url = ""
if title_link:
href = title_link["href"]
if href.startswith("/"):
mirror_url = mirror + href
else:
mirror_url = urljoin(mirror, href)
results.append({
"id": libgen_id,
"title": title,
"author": authors,
"publisher": publisher,
"year": year,
"pages": pages,
"language": language,
"filesize_str": size,
"extension": extension,
"md5": "", # .li doesn't show MD5 easily in table
"mirror_url": mirror_url,
})
if len(results) >= limit:
break
except Exception:
continue
return results
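
Taken together, a short usage sketch of the class above; the query string is only an example:

searcher = LibgenSearch()
for book in searcher.search("operating systems", limit=5):
    print(f'{book["extension"]:>5} {book["filesize_str"]:>9}  {book["title"][:60]}')
    print(f'      mirror: {book["mirror_url"]}')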
def search_libgen(
@@ -195,183 +298,160 @@ def search_libgen(
log_error: ErrorFn = None,
session: Optional[requests.Session] = None,
) -> List[Dict[str, Any]]:
"""Search Libgen returning formatted dictionaries with multiple mirrors.
Uses HTML scraper (search_libgen_no_ads) to find books quickly.
Returns mirror URLs and book IDs that can be used to generate alternative mirrors.
"""
"""Search Libgen using the robust scraper."""
searcher = LibgenSearch(session=session)
try:
_call(log_info, f"[search] Searching Libgen for: {query}")
session = session or requests.Session()
# Use HTML scraper - more reliable and doesn't hang on mirror resolution
_call(log_info, "[search] Using HTML scraper (search_libgen_no_ads)...")
results: List[Any] = search_libgen_no_ads(query, session=session)
if not results:
_call(log_info, "[search] No results from HTML scraper")
return []
formatted: List[Dict[str, Any]] = []
mirrors_list = [
"https://libgen.gl",
"https://libgen.vg",
"https://libgen.la",
"https://libgen.bz",
"https://libgen.gs",
]
for book in results[:limit]:
if isinstance(book, dict):
# Result from search_libgen_no_ads (HTML scraper)
authors = book.get("authors", ["Unknown"])
if isinstance(authors, list):
author_value = ", ".join(str(a) for a in authors)
else:
author_value = str(authors)
# Extract book ID from mirror URL if available
mirror = book.get("mirror", "")
book_id = ""
if mirror and "/file.php?id=" in mirror:
try:
book_id = mirror.split("/file.php?id=")[1].split("&")[0]
except (IndexError, ValueError):
pass
# Build list of alternative mirrors based on book ID
mirrors_dict = {}
if book_id:
for mirror_base in mirrors_list:
mirrors_dict[mirror_base] = f"{mirror_base}/file.php?id={book_id}"
elif mirror:
# Fallback: use the mirror we found
mirrors_dict["primary"] = mirror
formatted.append(
{
"title": book.get("title", "Unknown"),
"author": author_value,
"publisher": book.get("publisher", ""),
"year": book.get("year", ""),
"pages": book.get("pages", ""),
"language": book.get("language", ""),
"filesize": 0,
"filesize_str": book.get("size", "Unknown"),
"extension": book.get("extension", ""),
"isbn": book.get("isbn", ""),
"mirror_url": mirror, # Primary mirror
"mirrors": mirrors_dict, # Alternative mirrors
"book_id": book_id,
}
)
else:
# Fallback: try to format as book object
try:
formatted.append(format_book_info(book))
except Exception:
pass
_call(log_info, f"[search] Found {len(formatted)} result(s)")
return formatted
except LibgenError as exc:
_call(log_error, f"[search] Libgen error: {exc}")
return []
except Exception as exc: # pragma: no cover - defensive
_call(log_error, f"[search] Error: {exc}")
results = searcher.search(query, limit=limit)
_call(log_info, f"[libgen] Found {len(results)} results")
return results
except Exception as e:
_call(log_error, f"[libgen] Search failed: {e}")
return []
def _resolve_download_url(
session: requests.Session,
url: str,
log_info: LogFn = None
) -> Optional[str]:
"""Resolve the final download URL by following the LibGen chain."""
current_url = url
visited = set()
# Max hops to prevent infinite loops
for _ in range(6):
if current_url in visited:
break
visited.add(current_url)
_call(log_info, f"[resolve] Checking: {current_url}")
# Simple heuristic: if it looks like a file, return it
if current_url.lower().endswith(('.pdf', '.epub', '.mobi', '.djvu', '.azw3', '.cbz', '.cbr')):
return current_url
try:
# Some mirrors block HEAD or return 405, so instead of probing with HEAD
# we GET with stream=True and peek at the headers before reading the body
with session.get(current_url, stream=True, timeout=30) as resp:
resp.raise_for_status()
ct = resp.headers.get("Content-Type", "").lower()
if "text/html" not in ct:
# It's a binary file
return current_url
# It's HTML, read content
content = resp.text
except Exception as e:
_call(log_info, f"[resolve] Failed to fetch {current_url}: {e}")
return None
soup = BeautifulSoup(content, "html.parser")
# 1. Check for "GET" link (library.lol / ads.php style)
# Usually <h2>GET</h2> inside <a> or just text "GET"
get_link = soup.find("a", string=re.compile(r"^GET$", re.IGNORECASE))
if not get_link:
# Try finding <a> containing <h2>GET</h2>
h2_get = soup.find("h2", string=re.compile(r"^GET$", re.IGNORECASE))
if h2_get and h2_get.parent.name == "a":
get_link = h2_get.parent
if get_link and get_link.has_attr("href"):
return urljoin(current_url, get_link["href"])
# 2. Check for "series.php" -> "edition.php"
if "series.php" in current_url:
# Find first edition link
edition_link = soup.find("a", href=re.compile(r"edition\.php"))
if edition_link:
current_url = urljoin(current_url, edition_link["href"])
continue
# 3. Check for "edition.php" -> "file.php"
if "edition.php" in current_url:
file_link = soup.find("a", href=re.compile(r"file\.php"))
if file_link:
current_url = urljoin(current_url, file_link["href"])
continue
# 4. Check for "file.php" -> "ads.php" (Libgen badge)
if "file.php" in current_url:
# Look for link with title="libgen" or text "Libgen"
libgen_link = soup.find("a", title="libgen")
if not libgen_link:
libgen_link = soup.find("a", string=re.compile(r"Libgen", re.IGNORECASE))
if libgen_link and libgen_link.has_attr("href"):
current_url = urljoin(current_url, libgen_link["href"])
continue
# 5. Check for "ads.php" -> "get.php" (Fallback if GET link logic above failed)
if "ads.php" in current_url:
get_php_link = soup.find("a", href=re.compile(r"get\.php"))
if get_php_link:
return urljoin(current_url, get_php_link["href"])
# 6. Library.lol / generic fallback
for text in ["Cloudflare", "IPFS.io", "Infura"]:
link = soup.find("a", string=re.compile(text, re.IGNORECASE))
if link and link.has_attr("href"):
return urljoin(current_url, link["href"])
# If we found nothing new, stop
break
return None
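
Concretely, for a libgen.li-style result the loop above typically walks a chain like this before hitting a non-HTML response; the URLs are illustrative:

# file page (HTML, carries the "libgen" badge link)
#   https://libgen.li/file.php?id=12345
# -> ads page (HTML, contains the GET link)
#   https://libgen.li/ads.php?md5=ABCDEF...
# -> direct download, returned as soon as Content-Type is not text/html
#   https://libgen.li/get.php?md5=ABCDEF...
final_url = _resolve_download_url(requests.Session(), "https://libgen.li/file.php?id=12345", log_info=print)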
def download_from_mirror(
mirror_url: str,
output_path: str | Path,
output_path: Path,
*,
log_info: LogFn = None,
log_error: ErrorFn = None,
session: Optional[requests.Session] = None,
) -> bool:
"""Download a Libgen file and write it to disk.
Handles Libgen redirects and ensures proper file download by:
- Following all redirects (default behavior)
- Setting User-Agent header (required by some mirrors)
- Validating that we're downloading binary content, not HTML
- Attempting alternative download method if HTML is returned
"""
"""Download file from a LibGen mirror URL."""
session = session or requests.Session()
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
try:
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
_call(log_info, f"[download] Downloading from mirror: {mirror_url}")
_call(log_info, f"[download] Resolving download link from: {mirror_url}")
# Ensure session has proper headers for Libgen
if 'User-Agent' not in session.headers:
session.headers['User-Agent'] = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
download_url = _resolve_download_url(session, mirror_url, log_info)
# Download with redirects enabled (default) and referer
session.headers['Referer'] = 'https://libgen.gs/'
response = session.get(mirror_url, stream=True, timeout=30, allow_redirects=True)
response.raise_for_status()
# Check if we got HTML instead of a file (common Libgen issue)
content_type = response.headers.get('content-type', '').lower()
if 'text/html' in content_type:
_call(log_error, f"[download] Server returned HTML. Trying alternative method...")
if not download_url:
_call(log_error, "[download] Could not find direct download link")
return False
# Try to extract file ID and use alternative CDN
try:
# Parse the HTML to extract MD5 or file ID
from bs4 import BeautifulSoup
soup = BeautifulSoup(response.text, 'html.parser')
# Look for download link in the HTML
# Common patterns: md5 hash in form, or direct link in anchor tags
download_link = None
# Try to find forms that might contain download functionality
forms = soup.find_all('form')
for form in forms:
action = form.get('action', '')
if 'download' in action.lower() or 'get' in action.lower():
download_link = action
break
if not download_link:
_call(log_error, f"[download] Could not extract alternative download link from HTML")
return False
_call(log_info, f"[download] Using alternative download method: {download_link[:100]}")
# Try downloading from alternative link
response2 = session.get(download_link, stream=True, timeout=30, allow_redirects=True)
response2.raise_for_status()
response = response2 # Use the new response
_call(log_info, f"[download] Downloading from: {download_url}")
# Download the actual file
with session.get(download_url, stream=True, timeout=60) as r:
r.raise_for_status()
except Exception as alt_error:
_call(log_error, f"[download] Alternative method failed: {alt_error}")
# Verify it's not HTML (error page)
ct = r.headers.get("content-type", "").lower()
if "text/html" in ct:
_call(log_error, "[download] Final URL returned HTML, not a file.")
return False
total_size = int(response.headers.get("content-length", 0))
downloaded = 0
with open(output_path, "wb") as handle:
for chunk in response.iter_content(chunk_size=8192):
if not chunk:
continue
handle.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = downloaded / total_size * 100
_call(
log_info,
f"[download] {percent:.1f}% - {downloaded // (1024*1024)}MB / {total_size // (1024*1024)}MB",
)
_call(log_info, f"[download] Downloaded successfully to: {output_path}")
total_size = int(r.headers.get("content-length", 0))
downloaded = 0
with open(output_path, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
# Optional: progress logging
_call(log_info, f"[download] Saved to {output_path}")
return True
except Exception as exc: # pragma: no cover - defensive
_call(log_error, f"[download] Error: {exc}")
except Exception as e:
_call(log_error, f"[download] Download failed: {e}")
return False
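
End to end, the rewritten module is driven like this; the query and output path are examples, and search_libgen's signature is assumed from the hunk above (query, limit, log_info, log_error, session):

from pathlib import Path

results = search_libgen("structure and interpretation", limit=10, log_info=print, log_error=print)
if results:
    ok = download_from_mirror(
        results[0]["mirror_url"],
        Path("downloads") / f'book.{results[0]["extension"] or "bin"}',
        log_info=print,
        log_error=print,
    )
    print("saved" if ok else "failed")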

View File

@@ -238,7 +238,7 @@ def send_to_mpv(file_url: str, title: str, headers: Optional[Dict[str, str]] = N
# Sanitize title for M3U (remove newlines)
safe_title = title.replace("\n", " ").replace("\r", "")
# M3U format: #EXTM3U\n#EXTINF:-1,Title\nURL
m3u_content = f"#EXTM3U\n#EXTINF:-1,{safe_title}\n{file_url}\n"
m3u_content = f"#EXTM3U\n#EXTINF:-1,{safe_title}\n{file_url}"
target = f"memory://{m3u_content}"
else:
target = file_url
@@ -256,9 +256,8 @@ def send_to_mpv(file_url: str, title: str, headers: Optional[Dict[str, str]] = N
# Command 3: Set title (metadata for display) - still useful for window title
if title:
safe_title_prop = title.replace('"', '\\"')
cmd_title = {
"command": ["set_property", "force-media-title", safe_title_prop],
"command": ["set_property", "force-media-title", title],
"request_id": 2
}
client.send_command(cmd_title)
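
The memory:// playlist trick rides on mpv's JSON IPC. A self-contained sketch that speaks to the socket directly; the socket path and helper name are assumptions, not this repo's client wrapper:

import json
import socket

def mpv_load_with_title(file_url: str, title: str, ipc_path: str = "/tmp/mpvsocket") -> None:
    """Queue a URL in a running mpv via JSON IPC, carrying a display title."""
    safe_title = title.replace("\n", " ").replace("\r", "")
    # mpv can read a playlist straight from memory, so no temp .m3u file is needed.
    m3u = f"#EXTM3U\n#EXTINF:-1,{safe_title}\n{file_url}"
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.connect(ipc_path)
        for cmd in (
            {"command": ["loadfile", f"memory://{m3u}"], "request_id": 1},
            {"command": ["set_property", "force-media-title", title], "request_id": 2},
        ):
            sock.sendall(json.dumps(cmd).encode() + b"\n")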