Files
Medios-Macina/helper/archive_client.py

568 lines
19 KiB
Python
Raw Permalink Normal View History

2025-11-25 20:09:33 -08:00
"""Archive.org API client for borrowing and downloading books.
This module provides low-level functions for interacting with Archive.org:
- Authentication (login, credential management)
- Borrowing (loan, return_loan)
- Book metadata extraction (get_book_infos, get_book_metadata)
- Image downloading and deobfuscation
- PDF creation with metadata
Used by unified_book_downloader.py for the borrowing workflow.
"""
from __future__ import annotations
import base64
import hashlib
import logging
import os
import re
import sys
import time
from concurrent import futures
from typing import Any, Dict, List, Optional, Sequence, Tuple
import requests
from helper.logger import log, debug
try:
from Crypto.Cipher import AES # type: ignore
from Crypto.Util import Counter # type: ignore
except ImportError:
AES = None # type: ignore
Counter = None # type: ignore
try:
from tqdm import tqdm # type: ignore
except ImportError:
tqdm = None # type: ignore
def credential_openlibrary(config: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
    """Resolve OpenLibrary/Archive.org credentials from a config mapping.

    Three layouts are recognized, newest first:
    - {"provider": {"openlibrary": {"email": ..., "password": ...}}}
    - {"Archive": {"email": ..., "password": ...}}
    - flat {"archive_org_email": ..., "archive_org_password": ...}

    Returns:
        (email, password) tuple; either element may be None.
    """
    if not isinstance(config, dict):
        return None, None
    # Collect candidate (email, password) pairs from the nested layouts,
    # in priority order; first pair with any value set wins.
    candidates = []
    provider = config.get("provider")
    if isinstance(provider, dict):
        openlibrary = provider.get("openlibrary")
        if isinstance(openlibrary, dict):
            candidates.append((openlibrary.get("email"), openlibrary.get("password")))
    legacy = config.get("Archive")
    if isinstance(legacy, dict):
        candidates.append((legacy.get("email"), legacy.get("password")))
    for email, password in candidates:
        if email or password:
            return email, password
    # Last resort: the old flat keys (may both be None).
    return config.get("archive_org_email"), config.get("archive_org_password")
def display_error(response: requests.Response, message: str) -> None:
    """Log *message* plus the raw response body to stderr, then exit(1)."""
    for line in (message, response.text):
        log(line, file=sys.stderr)
    sys.exit(1)
def login(email: str, password: str) -> requests.Session:
    """Authenticate against archive.org and return the session.

    Args:
        email: Archive.org account email.
        password: Archive.org account password.

    Returns:
        requests.Session carrying the login cookies.

    Raises:
        SystemExit: On bad credentials or any other login failure.
    """
    session = requests.Session()
    # Prime session cookies before submitting the credentials.
    session.get("https://archive.org/account/login", timeout=30)
    payload = {"username": email, "password": password}
    response = session.post("https://archive.org/account/login", data=payload, timeout=30)
    body = response.text
    if "bad_login" in body:
        log("Invalid credentials!", file=sys.stderr)
        sys.exit(1)
    if "Successful login" not in body:
        # display_error() never returns (it exits the process).
        display_error(response, "[-] Error while login:")
    debug("Successful login")
    return session
def loan(session: requests.Session, book_id: str, verbose: bool = True) -> requests.Session:
    """Borrow a book from archive.org (14-day loan).

    Args:
        session: Authenticated requests.Session from login()
        book_id: Archive.org book identifier (e.g., 'ia_book_id')
        verbose: Whether to log a message on success

    Returns:
        Session with active loan

    Raises:
        SystemExit on loan failure
    """
    data = {"action": "grant_access", "identifier": book_id}
    # Side-effect request (grants search-inside access); body not needed.
    session.post("https://archive.org/services/loans/loan/searchInside.php", data=data, timeout=30)
    data["action"] = "browse_book"
    response = session.post("https://archive.org/services/loans/loan/", data=data, timeout=30)
    if response.status_code == 400:
        # Bug fix: the previous bare `except:` also caught the SystemExit
        # raised by display_error() inside the try, which then printed a
        # second, misleading "cannot be borrowed" message before exiting.
        # Catch only real parsing failures (bad JSON / missing key).
        try:
            error = response.json()["error"]
        except Exception:
            display_error(response, "The book cannot be borrowed")
        else:
            if error == "This book is not available to borrow at this time. Please try again later.":
                # Freely readable books reject the loan request; not an error.
                debug("This book doesn't need to be borrowed")
                return session
            display_error(response, "Something went wrong when trying to borrow the book.")
    data["action"] = "create_token"
    response = session.post("https://archive.org/services/loans/loan/", data=data, timeout=30)
    if "token" in response.text:
        if verbose:
            debug("Successful loan")
        return session
    display_error(response, "Something went wrong when trying to borrow the book.")
    sys.exit(1)  # Unreachable (display_error exits) but satisfies type checker
def return_loan(session: requests.Session, book_id: str) -> None:
    """Return a borrowed book, releasing the loan.

    Args:
        session: Authenticated requests.Session with an active loan.
        book_id: Archive.org book identifier.
    """
    payload = {"action": "return_loan", "identifier": book_id}
    response = session.post("https://archive.org/services/loans/loan/", data=payload, timeout=30)
    # `and` short-circuits, so .json() is only parsed on HTTP 200.
    returned = response.status_code == 200 and response.json()["success"]
    if returned:
        debug("Book returned")
    else:
        display_error(response, "Something went wrong when trying to return the book")
def get_book_infos(session: requests.Session, url: str) -> Tuple[str, List[str], Dict[str, Any]]:
    """Extract book title, page image links and metadata from the viewer page.

    Args:
        session: Authenticated requests.Session.
        url: Book URL (e.g., https://archive.org/borrow/book_id or /details/book_id).

    Returns:
        Tuple of (title, page_links, metadata).

    Raises:
        RuntimeError: If the info URL or the page list cannot be extracted.
    """
    viewer_html = session.get(url, timeout=30).text
    # The viewer page embeds a JSON blob containing a "url" field that points
    # at the book-info endpoint; pull it out with plain string surgery.
    if '"url":"' not in viewer_html:
        raise RuntimeError(
            "Failed to extract book info URL from response: No 'url' field found in response"
        )
    raw_url = viewer_html.split('"url":"')[1].split('"')[0]
    infos_url = "https:" + raw_url.replace("\\u0026", "&")
    payload = session.get(infos_url, timeout=30).json()["data"]
    title = payload["brOptions"]["bookTitle"].strip().replace(" ", "_")
    title = "".join(ch for ch in title if ch not in '<>:"/\\|?*')  # Filter forbidden chars
    title = title[:150]  # Trim to avoid long file names
    metadata = payload["metadata"]
    page_links: List[str] = []
    # brOptions.data is normally a list of lists of page dicts, but single
    # dicts have been observed too — accept both shapes defensively.
    try:
        for entry in payload.get("brOptions", {}).get("data", []):
            if isinstance(entry, list):
                page_links.extend(
                    page["uri"] for page in entry if isinstance(page, dict) and "uri" in page
                )
            elif isinstance(entry, dict) and "uri" in entry:
                page_links.append(entry["uri"])
    except (KeyError, IndexError, TypeError) as exc:
        log(f"Warning: Error parsing page links: {exc}", file=sys.stderr)
        # Continue with whatever links were collected so far.
    if not page_links:
        log("Error while getting image links - no pages found", file=sys.stderr)
        raise RuntimeError("No pages found in book data")
    count = len(page_links)
    debug(f"Found {count} pages" if count > 1 else f"Found {count} page")
    return title, page_links, metadata
def image_name(pages: int, page: int, directory: str) -> str:
    """Build the zero-padded image path for a page.

    Pads the page number to the width of the total page count so that
    lexicographic file ordering matches page order (e.g. 005.jpg < 010.jpg).

    Args:
        pages: Total number of pages (determines padding width).
        page: Current page number (0-indexed).
        directory: Directory the image is saved in.

    Returns:
        Full path to the page's .jpg file.
    """
    width = len(str(pages))
    return f"{directory}/{str(page).zfill(width)}.jpg"
def deobfuscate_image(image_data: bytes, link: str, obf_header: str) -> bytes:
    """Decrypt Archive.org's obfuscated page images (AES-128-CTR).

    Only the first 1024 bytes of the image are encrypted; the remainder is
    plain JPEG data. Based on: https://github.com/justimm

    Args:
        image_data: Raw (partially encrypted) image bytes.
        link: Image URL; its path component is hashed to derive the AES key.
        obf_header: X-Obfuscate header value (format: "1|BASE64_COUNTER").

    Returns:
        Decrypted image bytes.

    Raises:
        RuntimeError: If pycryptodome is not installed.
        ValueError: On a malformed or unsupported X-Obfuscate header.
    """
    if not AES or not Counter:
        raise RuntimeError("Crypto library not available")
    try:
        version, counter_b64 = obf_header.split("|")
    except Exception as exc:
        raise ValueError("Invalid X-Obfuscate header format") from exc
    if version != "1":
        raise ValueError("Unsupported obfuscation version: " + version)
    # Key = first 16 bytes of SHA-1 over the URL with scheme+host replaced by "/".
    url_path = re.sub(r"^https?:\/\/.*?\/", "/", link)
    key = hashlib.sha1(url_path.encode("utf-8")).digest()[:16]
    counter_raw = base64.b64decode(counter_b64)
    if len(counter_raw) != 16:
        raise ValueError(f"Expected counter to be 16 bytes, got {len(counter_raw)}")
    # 16-byte counter block = 8-byte fixed prefix + 64-bit big-endian counter.
    nonce = counter_raw[:8]
    start_value = int.from_bytes(counter_raw[8:], byteorder="big")
    ctr = Counter.new(64, prefix=nonce, initial_value=start_value, little_endian=False)  # type: ignore
    cipher = AES.new(key, AES.MODE_CTR, counter=ctr)  # type: ignore
    # Decrypt only the obfuscated 1 KiB head; append the untouched tail.
    return cipher.decrypt(image_data[:1024]) + image_data[1024:]
def download_one_image(
    session: requests.Session,
    link: str,
    i: int,
    directory: str,
    book_id: str,
    pages: int,
    max_retries: int = 20,
) -> None:
    """Download a single book page image.

    Handles obfuscated images and re-borrows the book on HTTP 403.

    Fixes over the previous version:
    - the bare ``except:`` (which swallowed KeyboardInterrupt) is now
      ``except Exception``;
    - the retry loop is bounded (``max_retries``) and always sleeps between
      attempts — an unexpected status (e.g. 404) used to spin a tight
      infinite loop with no delay.

    Args:
        session: Authenticated requests.Session
        link: Direct image URL
        i: Page index (0-based)
        directory: Directory to save to
        book_id: Archive.org book ID (for re-borrowing on 403)
        pages: Total number of pages
        max_retries: Maximum download attempts before giving up on this page.
    """
    headers = {
        "Referer": "https://archive.org/",
        "Accept": "image/avif,image/webp,image/apng,image/*,*/*;q=0.8",
        "Sec-Fetch-Site": "same-site",
        "Sec-Fetch-Mode": "no-cors",
        "Sec-Fetch-Dest": "image",
    }
    response = None
    success = False
    for _ in range(max_retries):
        try:
            response = session.get(link, headers=headers, timeout=30)
            if response.status_code == 403:
                # Loan likely expired mid-download; re-borrow and retry.
                session = loan(session, book_id, verbose=False)
            elif response.status_code == 200:
                success = True
                break
        except Exception:
            pass  # Network hiccup; fall through to the sleep and retry.
        time.sleep(1)
    if not success or response is None:
        log(f"Failed to download page {i}", file=sys.stderr)
        return
    image = image_name(pages, i, directory)
    obf_header = response.headers.get("X-Obfuscate")
    if obf_header:
        try:
            image_content = deobfuscate_image(response.content, link, obf_header)
        except Exception as e:
            log(f"Deobfuscation failed: {e}", file=sys.stderr)
            return
    else:
        image_content = response.content
    with open(image, "wb") as f:
        f.write(image_content)
def download(
    session: requests.Session,
    n_threads: int,
    directory: str,
    links: List[str],
    scale: int,
    book_id: str,
) -> List[str]:
    """Download all book pages as images using a thread pool.

    Fix: page indices now come from ``enumerate`` instead of
    ``links.index(link)``, which was O(n²) and — worse — returned the *first*
    match for duplicate URLs, so duplicated pages overwrote one file and the
    remaining indices were never written.

    Args:
        session: Authenticated requests.Session
        n_threads: Number of download threads
        directory: Directory to save images to
        links: List of image URLs
        scale: Image resolution (0=highest, 10=lowest)
        book_id: Archive.org book ID (for re-borrowing)

    Returns:
        List of downloaded image file paths (one per input link).
    """
    debug("Downloading pages...")
    urls = [f"{link}&rotate=0&scale={scale}" for link in links]
    pages = len(urls)
    with futures.ThreadPoolExecutor(max_workers=n_threads) as executor:
        tasks = [
            executor.submit(
                download_one_image,
                session=session,
                link=url,
                i=index,
                directory=directory,
                book_id=book_id,
                pages=pages,
            )
            for index, url in enumerate(urls)
        ]
        # Drain completions; tqdm (if installed) shows a progress bar.
        completed = futures.as_completed(tasks)
        if tqdm:
            completed = tqdm(completed, total=len(tasks))  # type: ignore
        for _ in completed:
            pass
    return [image_name(pages, index, directory) for index in range(pages)]
def check_direct_download(book_id: str) -> Tuple[bool, str]:
    """Check whether a book is directly downloadable without borrowing.

    Queries the Archive.org metadata API for an 'original'-source PDF and
    verifies the download URL answers with HTTP 200.

    Args:
        book_id: Archive.org book identifier.

    Returns:
        (True, pdf_url) when a downloadable PDF exists, else (False, "").
    """
    try:
        meta_response = requests.get(f"https://archive.org/metadata/{book_id}", timeout=10)
        meta_response.raise_for_status()
        metadata = meta_response.json()
        for file_info in metadata.get("files", []):
            name = file_info.get("name", "")
            # Only the original (uploaded) PDF counts; derivatives are skipped.
            if not name.endswith(".pdf") or file_info.get("source") != "original":
                continue
            pdf_url = f"https://archive.org/download/{book_id}/{name.replace(' ', '%20')}"
            # Confirm the URL is actually reachable before reporting success.
            head = requests.head(pdf_url, timeout=5, allow_redirects=True)
            if head.status_code == 200:
                return True, pdf_url
        return False, ""
    except Exception as e:
        log(f"Error checking direct download: {e}", file=sys.stderr)
        return False, ""
def get_openlibrary_by_isbn(isbn: str) -> Dict[str, Any]:
    """Fetch book data from OpenLibrary's ISBN lookup API.

    Args:
        isbn: ISBN-10 or ISBN-13 to search for.

    Returns:
        Metadata dict for the first match, or {} on no match / any error.
    """
    try:
        api_url = f"https://openlibrary.org/api/books?bibkeys=ISBN:{isbn}&jscmd=data&format=json"
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()
        payload = response.json()
        if not payload:
            return {}
        # The API keys results by "ISBN:<n>"; take the first entry.
        first_key = next(iter(payload))
        return payload[first_key]
    except Exception as e:
        log(f"Error fetching OpenLibrary data by ISBN: {e}", file=sys.stderr)
        return {}
def extract_isbn_from_metadata(metadata: Dict[str, Any]) -> str:
    """Extract a cleaned ISBN from archive.org metadata.

    Scans a fixed list of candidate field names; for each, accepts a string
    (or the first element of a list) and strips hyphens/spaces. Only values
    that normalize to 10 or 13 characters are accepted.

    Args:
        metadata: Archive.org metadata dictionary.

    Returns:
        ISBN string without hyphens/spaces, or "" if none found.
    """
    # NOTE(review): "identifer_isbn" looks like a typo of "identifier_isbn",
    # but it is a lookup key and is kept as-is — confirm against real metadata.
    candidate_fields = (
        "isbn", "ISBN", "isbn_13", "isbn_10", "isbns",
        "isbn-10", "isbn-13", "identifer_isbn",
    )
    for field in candidate_fields:
        if field not in metadata:
            continue
        value = metadata[field]
        if isinstance(value, list):
            value = value[0] if value else None
        if not value or not isinstance(value, str):
            continue
        cleaned = value.replace("-", "").replace(" ", "")
        if len(cleaned) in (10, 13):
            return cleaned
    return ""
def normalize_url(url: str) -> str:
    """Convert an openlibrary.org book URL to an archive.org URL.

    Looks up the actual Archive.org identifier via the OpenLibrary API
    (``ocaid`` first, then ``identifiers.internet_archive``); falls back to
    using the OpenLibrary book ID directly if the lookup fails or yields no
    Internet Archive identifier.

    Refactor: the book-id fallback URL was previously re-derived (re-split
    and re-formatted) in three separate places; it is now computed once.

    Args:
        url: Book URL (archive.org or openlibrary.org format).

    Returns:
        Normalized archive.org URL, or the original URL if it cannot be parsed.
    """
    url = url.strip()
    # Already in archive.org format — nothing to do.
    if url.startswith("https://archive.org/details/"):
        return url
    if "openlibrary.org/books/" not in url:
        # Unknown format: return unchanged.
        return url
    parts = url.split("/books/")
    if len(parts) < 2:
        return url
    # Extract the OpenLibrary book ID (e.g., OL6796852M).
    book_id = parts[1].split("/")[0]
    fallback = f"https://archive.org/details/{book_id}"
    try:
        api_url = f"https://openlibrary.org/books/{book_id}.json"
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()
        data = response.json()
        # Prefer ocaid (Open Content Alliance ID) — the most common mapping.
        if "ocaid" in data:
            return f"https://archive.org/details/{data['ocaid']}"
        if "identifiers" in data:
            identifiers = data["identifiers"]
            if "internet_archive" in identifiers:
                ia_ids = identifiers["internet_archive"]
                # May be a single ID or a list of IDs; take the first.
                ia_id = ia_ids[0] if isinstance(ia_ids, list) and ia_ids else ia_ids
                return f"https://archive.org/details/{ia_id}"
        # No IA identifier found; try the OpenLibrary ID as the item name.
        log(f"No Internet Archive ID found for {book_id}. Attempting with OpenLibrary ID.", file=sys.stderr)
        return fallback
    except requests.RequestException as e:
        log(f"Could not fetch OpenLibrary metadata: {e}", file=sys.stderr)
        return fallback
    except (KeyError, IndexError) as e:
        log(f"Error parsing OpenLibrary response: {e}", file=sys.stderr)
        return fallback