Files
Medios-Macina/Provider/openlibrary.py
2025-12-12 21:55:38 -08:00

359 lines
12 KiB
Python

from __future__ import annotations
import shutil
import sys
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import requests
from ProviderCore.base import SearchProvider, SearchResult
from ProviderCore.download import download_file, sanitize_filename
from cli_syntax import get_field, get_free_text, parse_query
from SYS.logger import log
from SYS.utils import unique_path
def _looks_like_isbn(text: str) -> bool:
t = (text or "").replace("-", "").strip()
return t.isdigit() and len(t) in (10, 13)
def _first_str(value: Any) -> Optional[str]:
if isinstance(value, str):
v = value.strip()
return v if v else None
if isinstance(value, list) and value:
first = value[0]
if isinstance(first, str):
v = first.strip()
return v if v else None
return str(first) if first is not None else None
return None
def _resolve_edition_id(doc: Dict[str, Any]) -> str:
# OpenLibrary Search API typically provides edition_key: ["OL...M", ...]
edition_key = doc.get("edition_key")
if isinstance(edition_key, list) and edition_key:
return str(edition_key[0]).strip()
# Fallback: sometimes key can be /books/OL...M
key = doc.get("key")
if isinstance(key, str) and key.startswith("/books/"):
return key.split("/books/", 1)[1].strip("/")
return ""
def _check_lendable(session: requests.Session, edition_id: str) -> Tuple[bool, str]:
"""Return (lendable, status_text) using OpenLibrary volumes API."""
try:
if not edition_id or not edition_id.startswith("OL") or not edition_id.endswith("M"):
return False, "not-an-edition"
url = f"https://openlibrary.org/api/volumes/brief/json/OLID:{edition_id}"
resp = session.get(url, timeout=10)
resp.raise_for_status()
data = resp.json() or {}
wrapped = data.get(f"OLID:{edition_id}")
if not isinstance(wrapped, dict):
return False, "no-availability"
items = wrapped.get("items")
if not isinstance(items, list) or not items:
return False, "no-items"
first = items[0]
status_val = ""
if isinstance(first, dict):
status_val = str(first.get("status", ""))
else:
status_val = str(first)
return ("lendable" in status_val.lower()), status_val
except requests.exceptions.Timeout:
return False, "api-timeout"
except Exception:
return False, "api-error"
def _resolve_archive_id(session: requests.Session, edition_id: str, ia_candidates: List[str]) -> str:
# Prefer IA identifiers already present in search results.
if ia_candidates:
first = ia_candidates[0].strip()
if first:
return first
# Otherwise query the edition JSON.
try:
resp = session.get(f"https://openlibrary.org/books/{edition_id}.json", timeout=10)
resp.raise_for_status()
data = resp.json() or {}
ocaid = data.get("ocaid")
if isinstance(ocaid, str) and ocaid.strip():
return ocaid.strip()
identifiers = data.get("identifiers")
if isinstance(identifiers, dict):
ia = identifiers.get("internet_archive")
ia_id = _first_str(ia)
if ia_id:
return ia_id
except Exception:
pass
return ""
class OpenLibrary(SearchProvider):
"""Search provider for OpenLibrary books + Archive.org direct/borrow download."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
self._session = requests.Session()
def search(
self,
query: str,
limit: int = 50,
filters: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[SearchResult]:
filters = filters or {}
parsed = parse_query(query)
isbn = get_field(parsed, "isbn")
author = get_field(parsed, "author")
title = get_field(parsed, "title")
free_text = get_free_text(parsed)
q = (isbn or title or author or free_text or query or "").strip()
if not q:
return []
if _looks_like_isbn(q):
q = f"isbn:{q.replace('-', '')}"
try:
resp = self._session.get(
"https://openlibrary.org/search.json",
params={"q": q, "limit": int(limit)},
timeout=10,
)
resp.raise_for_status()
data = resp.json() or {}
except Exception as exc:
log(f"[openlibrary] Search failed: {exc}", file=sys.stderr)
return []
results: List[SearchResult] = []
docs = data.get("docs") or []
if not isinstance(docs, list):
return []
for doc in docs[: int(limit)]:
if not isinstance(doc, dict):
continue
book_title = str(doc.get("title") or "").strip() or "Unknown"
authors = doc.get("author_name") or []
if isinstance(authors, str):
authors = [authors]
if not isinstance(authors, list):
authors = []
authors_list = [str(a) for a in authors if a]
year_val = doc.get("first_publish_year")
year = str(year_val) if year_val is not None else ""
edition_id = _resolve_edition_id(doc)
ia_val = doc.get("ia") or []
if isinstance(ia_val, str):
ia_val = [ia_val]
if not isinstance(ia_val, list):
ia_val = []
ia_ids = [str(x) for x in ia_val if x]
isbn_list = doc.get("isbn") or []
if isinstance(isbn_list, str):
isbn_list = [isbn_list]
if not isinstance(isbn_list, list):
isbn_list = []
isbn_13 = next((str(i) for i in isbn_list if len(str(i)) == 13), "")
isbn_10 = next((str(i) for i in isbn_list if len(str(i)) == 10), "")
columns = [
("Title", book_title),
("Author", ", ".join(authors_list)),
("Year", year),
("OLID", edition_id),
]
annotations: List[str] = []
if isbn_13:
annotations.append(f"isbn_13:{isbn_13}")
elif isbn_10:
annotations.append(f"isbn_10:{isbn_10}")
if ia_ids:
annotations.append("archive")
results.append(
SearchResult(
table="openlibrary",
title=book_title,
path=(f"https://openlibrary.org/books/{edition_id}" if edition_id else "https://openlibrary.org"),
detail=(
(f"By: {', '.join(authors_list)}" if authors_list else "")
+ (f" ({year})" if year else "")
).strip(),
annotations=annotations,
media_kind="book",
columns=columns,
full_metadata={
"openlibrary_id": edition_id,
"authors": authors_list,
"year": year,
"isbn_10": isbn_10,
"isbn_13": isbn_13,
"ia": ia_ids,
"raw": doc,
},
)
)
return results
def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]:
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
meta = result.full_metadata or {}
edition_id = str(meta.get("openlibrary_id") or "").strip()
if not edition_id:
log("[openlibrary] Missing openlibrary_id; cannot download", file=sys.stderr)
return None
ia_ids = meta.get("ia") or []
if isinstance(ia_ids, str):
ia_ids = [ia_ids]
if not isinstance(ia_ids, list):
ia_ids = []
ia_candidates = [str(x) for x in ia_ids if x]
archive_id = _resolve_archive_id(self._session, edition_id, ia_candidates)
if not archive_id:
log("[openlibrary] No archive identifier available; cannot download", file=sys.stderr)
return None
safe_title = sanitize_filename(result.title)
# 1) Direct download if available.
try:
from API.archive_client import check_direct_download
can_direct, pdf_url = check_direct_download(archive_id)
except Exception:
can_direct, pdf_url = False, ""
if can_direct and pdf_url:
out_path = unique_path(output_dir / f"{safe_title}.pdf")
ok = download_file(pdf_url, out_path, session=self._session)
if ok:
return out_path
log("[openlibrary] Direct download failed", file=sys.stderr)
return None
# 2) Borrow flow (credentials required).
try:
from API.archive_client import BookNotAvailableError, credential_openlibrary, download as archive_download
from API.archive_client import get_book_infos, loan, login
email, password = credential_openlibrary(self.config or {})
if not email or not password:
log("[openlibrary] Archive credentials missing; cannot borrow", file=sys.stderr)
return None
lendable, reason = _check_lendable(self._session, edition_id)
if not lendable:
log(f"[openlibrary] Not lendable: {reason}", file=sys.stderr)
return None
session = login(email, password)
try:
session = loan(session, archive_id, verbose=False)
except BookNotAvailableError:
log("[openlibrary] Book not available to borrow", file=sys.stderr)
return None
except SystemExit:
log("[openlibrary] Borrow failed", file=sys.stderr)
return None
urls = [f"https://archive.org/borrow/{archive_id}", f"https://archive.org/details/{archive_id}"]
title = safe_title
links: Optional[List[str]] = None
last_exc: Optional[Exception] = None
for u in urls:
try:
title_raw, links, _metadata = get_book_infos(session, u)
if title_raw:
title = sanitize_filename(title_raw)
break
except Exception as exc:
last_exc = exc
continue
if not links:
log(f"[openlibrary] Failed to extract pages: {last_exc}", file=sys.stderr)
return None
temp_dir = tempfile.mkdtemp(prefix=f"{title}_", dir=str(output_dir))
try:
images = archive_download(session=session, n_threads=10, directory=temp_dir, links=links, scale=3, book_id=archive_id)
try:
import img2pdf # type: ignore
pdf_bytes = img2pdf.convert(images) if images else None
if not pdf_bytes:
log("[openlibrary] PDF conversion failed", file=sys.stderr)
try:
shutil.rmtree(temp_dir)
except Exception:
pass
return None
pdf_path = unique_path(output_dir / f"{title}.pdf")
with open(pdf_path, "wb") as f:
f.write(pdf_bytes)
try:
shutil.rmtree(temp_dir)
except Exception:
pass
return pdf_path
except ImportError:
# Keep images folder.
return Path(temp_dir)
except Exception:
try:
shutil.rmtree(temp_dir)
except Exception:
pass
raise
except Exception as exc:
log(f"[openlibrary] Borrow workflow error: {exc}", file=sys.stderr)
return None
def validate(self) -> bool:
return True