"""Archive.org API client for borrowing and downloading books. This module provides low-level functions for interacting with Archive.org: - Authentication (login, credential management) - Borrowing (loan, return_loan) - Book metadata extraction (get_book_infos, get_book_metadata) - Image downloading and deobfuscation - PDF creation with metadata Used by unified_book_downloader.py for the borrowing workflow. """ from __future__ import annotations import base64 import hashlib import logging import os import re import sys import time from concurrent import futures from typing import Any, Dict, List, Optional, Sequence, Tuple import requests from helper.logger import log, debug try: from Crypto.Cipher import AES # type: ignore from Crypto.Util import Counter # type: ignore except ImportError: AES = None # type: ignore Counter = None # type: ignore try: from tqdm import tqdm # type: ignore except ImportError: tqdm = None # type: ignore def credential_openlibrary(config: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]: """Get OpenLibrary/Archive.org email and password from config. Supports both formats: - New: {"provider": {"openlibrary": {"email": "...", "password": "..."}}} - Old: {"Archive": {"email": "...", "password": "..."}} {"archive_org_email": "...", "archive_org_password": "..."} Returns: (email, password) tuple, each can be None """ if not isinstance(config, dict): return None, None # Try new format first provider_config = config.get("provider", {}) if isinstance(provider_config, dict): openlibrary_config = provider_config.get("openlibrary", {}) if isinstance(openlibrary_config, dict): email = openlibrary_config.get("email") password = openlibrary_config.get("password") if email or password: return email, password # Try old nested format archive_config = config.get("Archive") if isinstance(archive_config, dict): email = archive_config.get("email") password = archive_config.get("password") if email or password: return email, password # Fall back to old flat format email = config.get("archive_org_email") password = config.get("archive_org_password") return email, password def display_error(response: requests.Response, message: str) -> None: """Display error and exit.""" log(message, file=sys.stderr) log(response.text, file=sys.stderr) sys.exit(1) def login(email: str, password: str) -> requests.Session: """Login to archive.org. Args: email: Archive.org email password: Archive.org password Returns: Authenticated requests.Session Raises: SystemExit on login failure """ session = requests.Session() session.get("https://archive.org/account/login", timeout=30) data = {"username": email, "password": password} response = session.post("https://archive.org/account/login", data=data, timeout=30) if "bad_login" in response.text: log("Invalid credentials!", file=sys.stderr) sys.exit(1) if "Successful login" in response.text: debug("Successful login") return session display_error(response, "[-] Error while login:") sys.exit(1) # Unreachable but satisfies type checker def loan(session: requests.Session, book_id: str, verbose: bool = True) -> requests.Session: """Borrow a book from archive.org (14-day loan). Args: session: Authenticated requests.Session from login() book_id: Archive.org book identifier (e.g., 'ia_book_id') verbose: Whether to log messages Returns: Session with active loan Raises: SystemExit on loan failure """ data = {"action": "grant_access", "identifier": book_id} response = session.post("https://archive.org/services/loans/loan/searchInside.php", data=data, timeout=30) data["action"] = "browse_book" response = session.post("https://archive.org/services/loans/loan/", data=data, timeout=30) if response.status_code == 400: try: if response.json()["error"] == "This book is not available to borrow at this time. Please try again later.": debug("This book doesn't need to be borrowed") return session display_error(response, "Something went wrong when trying to borrow the book.") except: display_error(response, "The book cannot be borrowed") data["action"] = "create_token" response = session.post("https://archive.org/services/loans/loan/", data=data, timeout=30) if "token" in response.text: if verbose: debug("Successful loan") return session display_error(response, "Something went wrong when trying to borrow the book.") sys.exit(1) # Unreachable but satisfies type checker def return_loan(session: requests.Session, book_id: str) -> None: """Return a borrowed book. Args: session: Authenticated requests.Session with active loan book_id: Archive.org book identifier """ data = {"action": "return_loan", "identifier": book_id} response = session.post("https://archive.org/services/loans/loan/", data=data, timeout=30) if response.status_code == 200 and response.json()["success"]: debug("Book returned") else: display_error(response, "Something went wrong when trying to return the book") def get_book_infos(session: requests.Session, url: str) -> Tuple[str, List[str], Dict[str, Any]]: """Extract book information and page links from archive.org viewer. Args: session: Authenticated requests.Session url: Book URL (e.g., https://archive.org/borrow/book_id or /details/book_id) Returns: Tuple of (title, page_links, metadata) Raises: RuntimeError: If page data cannot be extracted """ r = session.get(url, timeout=30).text # Try to extract the infos URL from the response try: # Look for the "url" field in the response if '"url":"' not in r: raise ValueError("No 'url' field found in response") infos_url = "https:" + r.split('"url":"')[1].split('"')[0].replace("\\u0026", "&") except (IndexError, ValueError) as e: # If URL extraction fails, raise with better error message raise RuntimeError(f"Failed to extract book info URL from response: {e}") response = session.get(infos_url, timeout=30) data = response.json()["data"] title = data["brOptions"]["bookTitle"].strip().replace(" ", "_") title = "".join(c for c in title if c not in '<>:"/\\|?*') # Filter forbidden chars title = title[:150] # Trim to avoid long file names metadata = data["metadata"] links = [] # Safely extract page links from brOptions data try: br_data = data.get("brOptions", {}).get("data", []) for item in br_data: if isinstance(item, list): for page in item: if isinstance(page, dict) and "uri" in page: links.append(page["uri"]) elif isinstance(item, dict) and "uri" in item: links.append(item["uri"]) except (KeyError, IndexError, TypeError) as e: log(f"Warning: Error parsing page links: {e}", file=sys.stderr) # Continue with whatever links we found if len(links) > 1: debug(f"Found {len(links)} pages") return title, links, metadata elif len(links) == 1: debug(f"Found {len(links)} page") return title, links, metadata else: log("Error while getting image links - no pages found", file=sys.stderr) raise RuntimeError("No pages found in book data") def image_name(pages: int, page: int, directory: str) -> str: """Generate image filename for page. Args: pages: Total number of pages page: Current page number (0-indexed) directory: Directory to save to Returns: Full path to image file """ return f"{directory}/{(len(str(pages)) - len(str(page))) * '0'}{page}.jpg" def deobfuscate_image(image_data: bytes, link: str, obf_header: str) -> bytes: """Decrypt obfuscated image data using AES-CTR. This handles Archive.org's image obfuscation for borrowed books. Based on: https://github.com/justimm Args: image_data: Encrypted image bytes link: Image URL (used to derive AES key) obf_header: X-Obfuscate header value (format: "1|BASE64_COUNTER") Returns: Decrypted image bytes """ if not AES or not Counter: raise RuntimeError("Crypto library not available") try: version, counter_b64 = obf_header.split("|") except Exception as e: raise ValueError("Invalid X-Obfuscate header format") from e if version != "1": raise ValueError("Unsupported obfuscation version: " + version) # Derive AES key from URL aesKey = re.sub(r"^https?:\/\/.*?\/", "/", link) sha1_digest = hashlib.sha1(aesKey.encode("utf-8")).digest() key = sha1_digest[:16] # Decode counter counter_bytes = base64.b64decode(counter_b64) if len(counter_bytes) != 16: raise ValueError(f"Expected counter to be 16 bytes, got {len(counter_bytes)}") prefix = counter_bytes[:8] initial_value = int.from_bytes(counter_bytes[8:], byteorder="big") # Create AES-CTR cipher ctr = Counter.new(64, prefix=prefix, initial_value=initial_value, little_endian=False) # type: ignore cipher = AES.new(key, AES.MODE_CTR, counter=ctr) # type: ignore decrypted_part = cipher.decrypt(image_data[:1024]) new_data = decrypted_part + image_data[1024:] return new_data def download_one_image( session: requests.Session, link: str, i: int, directory: str, book_id: str, pages: int, ) -> None: """Download a single book page image. Handles obfuscated images and re-borrowing on 403 errors. Args: session: Authenticated requests.Session link: Direct image URL i: Page index (0-based) directory: Directory to save to book_id: Archive.org book ID (for re-borrowing on 403) pages: Total number of pages """ headers = { "Referer": "https://archive.org/", "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8", "Sec-Fetch-Site": "same-site", "Sec-Fetch-Mode": "no-cors", "Sec-Fetch-Dest": "image", } retry = True response = None while retry: try: response = session.get(link, headers=headers, timeout=30) if response.status_code == 403: session = loan(session, book_id, verbose=False) raise Exception("Borrow again") if response.status_code == 200: retry = False except: time.sleep(1) image = image_name(pages, i, directory) if response is None: log(f"Failed to download page {i}", file=sys.stderr) return obf_header = response.headers.get("X-Obfuscate") image_content = None if obf_header: try: image_content = deobfuscate_image(response.content, link, obf_header) except Exception as e: log(f"Deobfuscation failed: {e}", file=sys.stderr) return else: image_content = response.content with open(image, "wb") as f: f.write(image_content) def download( session: requests.Session, n_threads: int, directory: str, links: List[str], scale: int, book_id: str, ) -> List[str]: """Download all book pages as images. Uses thread pool for parallel downloads. Args: session: Authenticated requests.Session n_threads: Number of download threads directory: Directory to save images to links: List of image URLs scale: Image resolution (0=highest, 10=lowest) book_id: Archive.org book ID (for re-borrowing) Returns: List of downloaded image file paths """ debug("Downloading pages...") links = [f"{link}&rotate=0&scale={scale}" for link in links] pages = len(links) tasks = [] with futures.ThreadPoolExecutor(max_workers=n_threads) as executor: for link in links: i = links.index(link) tasks.append( executor.submit( download_one_image, session=session, link=link, i=i, directory=directory, book_id=book_id, pages=pages, ) ) if tqdm: for _ in tqdm(futures.as_completed(tasks), total=len(tasks)): # type: ignore pass else: for _ in futures.as_completed(tasks): pass images = [image_name(pages, i, directory) for i in range(len(links))] return images def check_direct_download(book_id: str) -> Tuple[bool, str]: """Check if a book can be downloaded directly without borrowing. Searches Archive.org metadata for downloadable PDF files. Args: book_id: Archive.org book identifier Returns: Tuple of (can_download: bool, pdf_url: str) """ try: # First, try to get the metadata to find the actual PDF filename metadata_url = f"https://archive.org/metadata/{book_id}" response = requests.get(metadata_url, timeout=10) response.raise_for_status() metadata = response.json() # Find PDF file in files list if "files" in metadata: for file_info in metadata["files"]: filename = file_info.get("name", "") if filename.endswith(".pdf") and file_info.get("source") == "original": # Found the original PDF pdf_filename = filename pdf_url = f"https://archive.org/download/{book_id}/{pdf_filename.replace(' ', '%20')}" # Verify it's accessible check_response = requests.head(pdf_url, timeout=5, allow_redirects=True) if check_response.status_code == 200: return True, pdf_url return False, "" except Exception as e: log(f"Error checking direct download: {e}", file=sys.stderr) return False, "" def get_openlibrary_by_isbn(isbn: str) -> Dict[str, Any]: """Fetch book data from OpenLibrary using ISBN. Args: isbn: ISBN-10 or ISBN-13 to search for Returns: Dictionary with book metadata from OpenLibrary """ try: # Try ISBN API first api_url = f"https://openlibrary.org/api/books?bibkeys=ISBN:{isbn}&jscmd=data&format=json" response = requests.get(api_url, timeout=10) response.raise_for_status() data = response.json() if data: # Get first result key = list(data.keys())[0] return data[key] return {} except Exception as e: log(f"Error fetching OpenLibrary data by ISBN: {e}", file=sys.stderr) return {} def extract_isbn_from_metadata(metadata: Dict[str, Any]) -> str: """Extract ISBN from archive.org metadata. Looks for ISBN in various metadata fields. Args: metadata: Archive.org metadata dictionary Returns: ISBN string (clean, no hyphens) or empty string if not found """ # Try various common metadata fields isbn_fields = [ "isbn", "ISBN", "isbn_13", "isbn_10", "isbns", "isbn-10", "isbn-13", "identifer_isbn" ] for field in isbn_fields: if field in metadata: isbn_val = metadata[field] if isinstance(isbn_val, list): isbn_val = isbn_val[0] if isbn_val else None if isbn_val and isinstance(isbn_val, str): # Clean ISBN (remove hyphens, spaces) isbn_clean = isbn_val.replace("-", "").replace(" ", "") if len(isbn_clean) in [10, 13]: return isbn_clean return "" def normalize_url(url: str) -> str: """Convert openlibrary.org URL to archive.org URL. Looks up the actual Archive.org ID from OpenLibrary API. Args: url: Book URL (archive.org or openlibrary.org format) Returns: Normalized archive.org URL """ url = url.strip() # Already archive.org format if url.startswith("https://archive.org/details/"): return url # Convert openlibrary.org format by querying the OpenLibrary API if "openlibrary.org/books/" in url: try: # Extract the book ID (e.g., OL6796852M) parts = url.split("/books/") if len(parts) > 1: book_id = parts[1].split("/")[0] # Query OpenLibrary API to get the book metadata api_url = f"https://openlibrary.org/books/{book_id}.json" response = requests.get(api_url, timeout=10) response.raise_for_status() data = response.json() # Look for identifiers including internet_archive or ocaid # First try ocaid (Open Content Alliance ID) - this is most common if "ocaid" in data: ocaid = data["ocaid"] return f"https://archive.org/details/{ocaid}" # Check for identifiers object if "identifiers" in data: identifiers = data["identifiers"] # Look for internet_archive ID if "internet_archive" in identifiers: ia_ids = identifiers["internet_archive"] if isinstance(ia_ids, list) and ia_ids: ia_id = ia_ids[0] else: ia_id = ia_ids return f"https://archive.org/details/{ia_id}" # If no IA identifier found, use the book ID as fallback log(f"No Internet Archive ID found for {book_id}. Attempting with OpenLibrary ID.", file=sys.stderr) return f"https://archive.org/details/{book_id}" except requests.RequestException as e: log(f"Could not fetch OpenLibrary metadata: {e}", file=sys.stderr) # Fallback to using the book ID directly parts = url.split("/books/") if len(parts) > 1: book_id = parts[1].split("/")[0] return f"https://archive.org/details/{book_id}" except (KeyError, IndexError) as e: log(f"Error parsing OpenLibrary response: {e}", file=sys.stderr) # Fallback to using the book ID directly parts = url.split("/books/") if len(parts) > 1: book_id = parts[1].split("/")[0] return f"https://archive.org/details/{book_id}" # Return original if can't parse return url