Files
Medios-Macina/Provider/internetarchive.py
2026-01-11 03:24:49 -08:00

895 lines
27 KiB
Python

from __future__ import annotations
import importlib
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote, urlparse
from ProviderCore.base import Provider, SearchResult
from SYS.utils import sanitize_filename
from SYS.logger import log
# Helper for download-file: render selectable formats for a details URL.
def maybe_show_formats_table(
    *,
    raw_urls: Any,
    piped_items: Any,
    parsed: Dict[str, Any],
    config: Dict[str, Any],
    quiet_mode: bool,
    get_field: Any,
) -> Optional[int]:
    """Offer a file picker when the only input is an Internet Archive details page.

    Args:
        raw_urls: URLs given on the command line (may be empty or None).
        piped_items: Items received from a previous pipeline stage (may be
            empty or None).
        parsed: Parsed command arguments; only ``path``/``output`` are read, to
            forward a ``-path`` argument to the follow-up download command.
        config: Accepted for interface compatibility; not read here.
        quiet_mode: Accepted for interface compatibility; not read here.
        get_field: Callable ``get_field(item, name)`` used to read fields from
            a piped item.

    Returns:
        ``None`` when the input is not handled here (not exactly one input, no
        identifier, or not a details URL), ``0`` when a selection table was
        rendered, and ``1`` when the lookup or table construction failed.
    """
    # The picker is intentionally shown even in quiet/background mode: details
    # pages are not directly downloadable, so a selection step is required.
    try:
        input_count = int(len(raw_urls or []) + len(piped_items or []))
    except Exception:
        input_count = 0
    if input_count != 1:
        return None

    item = piped_items[0] if piped_items else None
    has_item = item is not None

    # Prefer the piped item's path/url; fall back to the raw URL.
    target = ""
    if has_item:
        try:
            target = str(
                get_field(item, "path") or get_field(item, "url") or ""
            ).strip()
        except Exception:
            target = ""
    if not target and raw_urls:
        target = str(raw_urls[0]).strip()
    if not target:
        return None

    # Prefer the identifier carried in the item's metadata; otherwise parse it
    # out of the target string.
    identifier = ""
    try:
        metadata = get_field(item, "full_metadata") if has_item else None
        if isinstance(metadata, dict):
            identifier = str(metadata.get("identifier") or "").strip()
    except Exception:
        identifier = ""
    if not identifier:
        try:
            identifier = str(extract_identifier(target) or "").strip()
        except Exception:
            identifier = ""
    if not identifier:
        return None

    # Only item pages get a picker; direct download URLs download immediately.
    try:
        details_page = is_details_url(target)
    except Exception:
        return None
    if not details_page:
        return None

    try:
        files = list_download_files(identifier)
    except Exception as exc:
        log(f"download-file: Internet Archive lookup failed: {exc}", file=sys.stderr)
        return 1
    if not files:
        log("download-file: Internet Archive item has no downloadable files", file=sys.stderr)
        return 1

    title = ""
    if has_item:
        try:
            title = str(get_field(item, "title") or "").strip()
        except Exception:
            title = ""
    if title:
        table_title = f"Internet Archive: {title}".strip().rstrip(":")
    else:
        table_title = f"Internet Archive: {identifier}"

    try:
        from SYS.result_table import ResultTable
        from SYS import pipeline as pipeline_context
    except Exception as exc:
        log(f"download-file: ResultTable unavailable: {exc}", file=sys.stderr)
        return 1

    out_arg = parsed.get("path") or parsed.get("output")
    base_args: List[str] = ["-path", str(out_arg)] if out_arg else []

    table = ResultTable(table_title).set_preserve_order(True)
    table.set_table("internetarchive.format")
    table.set_source_command("download-file", base_args)

    rows: List[Dict[str, Any]] = []
    for entry in files:
        name = str(entry.get("name") or "").strip()
        if not name:
            continue
        fmt = str(entry.get("format") or "").strip()
        src = str(entry.get("source") or "").strip()
        direct_url = str(entry.get("direct_url") or "").strip()
        if not direct_url:
            continue

        # Show the size as an int when it parses; otherwise show the raw value.
        raw_size: Any = entry.get("size")
        shown_size: Any = raw_size
        try:
            shown_size = "" if raw_size in (None, "") else int(raw_size)
        except Exception:
            pass

        row_item: Dict[str, Any] = {
            "table": "internetarchive",
            "title": fmt or name,
            "path": direct_url,
            "url": direct_url,
            "columns": [
                ("Format", fmt),
                ("Name", name),
                ("Size", shown_size),
                ("Source", src),
            ],
            "_selection_args": [direct_url],
            "full_metadata": {
                "identifier": identifier,
                "name": name,
                "format": fmt,
                "source": src,
                "size": raw_size,
            },
        }
        rows.append(row_item)
        table.add_result(row_item)

    if not rows:
        log("download-file: no downloadable files found for this item", file=sys.stderr)
        return 1

    # Registering the table with the pipeline is best effort.
    try:
        pipeline_context.set_last_result_table(table, rows, subject=item)
        pipeline_context.set_current_stage_table(table)
    except Exception:
        pass

    log("Internet Archive item detected: select a file with @N to download", file=sys.stderr)
    return 0
def _ia() -> Any:
try:
return importlib.import_module("internetarchive")
except Exception as exc:
raise Exception(f"internetarchive module not available: {exc}")
def _pick_provider_config(config: Any) -> Dict[str, Any]:
if not isinstance(config, dict):
return {}
provider = config.get("provider")
if not isinstance(provider, dict):
return {}
entry = provider.get("internetarchive")
if isinstance(entry, dict):
return entry
return {}
def _looks_fielded_query(q: str) -> bool:
low = (q or "").lower()
return (":" in low) or (" and " in low) or (" or "
in low) or (" not "
in low) or ("(" in low)
def _extract_identifier_from_any(value: str) -> str:
raw = str(value or "").strip()
if not raw:
return ""
if raw.lower().startswith("ia:"):
return raw.split(":", 1)[1].strip()
if raw.startswith("http://") or raw.startswith("https://"):
try:
from urllib.parse import urlparse
p = urlparse(raw)
host = (p.hostname or "").lower().strip()
path = (p.path or "").strip("/")
except Exception:
return ""
if not host.endswith("archive.org"):
return ""
parts = [x for x in path.split("/") if x]
# /details/<identifier>
if len(parts) >= 2 and parts[0].lower() == "details":
return str(parts[1]).strip()
# /download/<identifier>/<filename>
if len(parts) >= 2 and parts[0].lower() == "download":
return str(parts[1]).strip()
return ""
# Assume bare identifier
return raw
def extract_identifier(value: str) -> str:
    """Return the Internet Archive identifier found in a URL, tag, or bare id.

    Public entry point; the work is done by ``_extract_identifier_from_any``.
    """
    identifier = _extract_identifier_from_any(value)
    return identifier
def is_details_url(url: str) -> bool:
    """Tell whether *url* is an Internet Archive item ("details") page.

    True only for ``http(s)://[sub.]archive.org/details/<identifier>[/...]``.
    Empty input, other schemes, other hosts, and unparsable URLs give False.
    """
    raw = str(url or "").strip()
    if not raw.startswith(("http://", "https://")):
        return False
    try:
        parsed = urlparse(raw)
        host = (parsed.hostname or "").lower().strip()
    except ValueError:
        return False
    # Require archive.org itself or a real subdomain. A plain suffix check
    # would also accept look-alike hosts such as "notarchive.org".
    if host != "archive.org" and not host.endswith(".archive.org"):
        return False
    parts = [segment for segment in (parsed.path or "").split("/") if segment]
    return (
        len(parts) >= 2
        and parts[0].lower() == "details"
        and bool(parts[1].strip())
    )
def is_download_file_url(url: str) -> bool:
    """Tell whether *url* points directly at a file of an Internet Archive item.

    True only for
    ``http(s)://[sub.]archive.org/download/<identifier>/<filename>[/...]``.
    Empty input, other schemes, other hosts, and unparsable URLs give False.
    """
    raw = str(url or "").strip()
    if not raw.startswith(("http://", "https://")):
        return False
    try:
        parsed = urlparse(raw)
        host = (parsed.hostname or "").lower().strip()
    except ValueError:
        return False
    # Require archive.org itself or a real subdomain. A plain suffix check
    # would also accept look-alike hosts such as "notarchive.org".
    if host != "archive.org" and not host.endswith(".archive.org"):
        return False
    parts = [segment for segment in (parsed.path or "").split("/") if segment]
    # /download/<identifier>/<filename>
    return (
        len(parts) >= 3
        and parts[0].lower() == "download"
        and bool(parts[1].strip())
        and bool(parts[2].strip())
    )
def list_download_files(identifier: str) -> List[Dict[str, Any]]:
    """List the downloadable files of an Internet Archive item.

    Each returned dict has the keys ``name``, ``size``, ``format``, ``source``
    and ``direct_url``. Files marked as metadata, "archive bittorrent", or
    thumbnails are dropped unless that would leave nothing. The list is ordered
    by lower-cased format, then lower-cased name.

    Returns:
        The list described above; ``[]`` for an empty identifier or an item
        with no listable files.

    Raises:
        Exception: If the ``internetarchive`` module or its ``get_item`` is
            unavailable, or the item lookup fails.
    """
    ident = str(identifier or "").strip()
    if not ident:
        return []

    module = _ia()
    fetch_item = getattr(module, "get_item", None)
    if not callable(fetch_item):
        raise Exception("internetarchive.get_item is not available")
    try:
        item: Any = fetch_item(str(ident))
    except Exception as exc:
        raise Exception(f"Internet Archive item lookup failed: {exc}")

    # First choice: the plain list of dicts exposed as ``item.files``.
    entries: List[Dict[str, Any]] = []
    try:
        listed = getattr(item, "files", None)
        if isinstance(listed, list):
            entries.extend(e for e in listed if isinstance(e, dict))
    except Exception:
        entries = []

    # Fallback: build dicts from the objects yielded by ``item.get_files()``.
    if not entries:
        try:
            for file_obj in item.get_files():
                file_name = getattr(file_obj, "name", None)
                if not file_name and isinstance(file_obj, dict):
                    file_name = file_obj.get("name")
                if not file_name:
                    continue
                record: Dict[str, Any] = {"name": str(file_name)}
                for attr in ("size", "format", "source"):
                    record[attr] = getattr(file_obj, attr, None)
                entries.append(record)
        except Exception:
            entries = []

    if not entries:
        return []

    def _is_auxiliary(entry: Dict[str, Any]) -> bool:
        # Metadata, torrent and thumbnail files are not useful downloads.
        try:
            origin = str(entry.get("source") or "").strip().lower()
            kind = str(entry.get("format") or "").strip().lower()
        except Exception:
            origin = kind = ""
        return (
            origin == "metadata"
            or kind in {"metadata", "archive bittorrent"}
            or kind.startswith("thumbnail")
        )

    dict_entries = [e for e in entries if isinstance(e, dict)]
    # Keep every file when filtering would leave nothing to offer.
    candidates = [e for e in dict_entries if not _is_auxiliary(e)] or dict_entries

    out: List[Dict[str, Any]] = []
    for entry in candidates:
        name = str(entry.get("name") or "").strip()
        if not name:
            continue
        direct_url = f"https://archive.org/download/{ident}/{quote(name, safe='')}"
        out.append(
            {
                "name": name,
                "size": entry.get("size"),
                "format": entry.get("format"),
                "source": entry.get("source"),
                "direct_url": direct_url,
            }
        )

    return sorted(
        out,
        key=lambda e: (
            str(e.get("format") or "").strip().lower(),
            str(e.get("name") or "").strip().lower(),
        ),
    )
def _extract_download_filename_from_url(url: str) -> str:
raw = str(url or "").strip()
if not raw:
return ""
if not (raw.startswith("http://") or raw.startswith("https://")):
return ""
try:
from urllib.parse import urlparse
p = urlparse(raw)
host = (p.hostname or "").lower().strip()
path = (p.path or "").strip("/")
except Exception:
return ""
if not host.endswith("archive.org"):
return ""
parts = [x for x in path.split("/") if x]
# /download/<identifier>/<filename>
if len(parts) >= 3 and parts[0].lower() == "download":
return str(parts[2]).strip()
return ""
def _normalize_identifier(s: str) -> str:
text = str(s or "").strip().lower()
if not text:
return ""
# Internet Archive identifiers are fairly permissive; keep alnum, '_', '-', '.' and collapse the rest.
text = re.sub(r"[^a-z0-9_.-]+", "-", text)
text = re.sub(r"-+", "-", text).strip("-._")
if len(text) > 80:
text = text[:80].rstrip("-._")
return text
def _best_file_candidate(files: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
if not files:
return None
def _is_metadata(f: Dict[str, Any]) -> bool:
source = str(f.get("source") or "").strip().lower()
fmt = str(f.get("format") or "").strip().lower()
if source == "metadata":
return True
if fmt in {"metadata",
"archive bittorrent"}:
return True
if fmt.startswith("thumbnail"):
return True
return False
def _size(f: Dict[str, Any]) -> int:
try:
return int(f.get("size") or 0)
except Exception:
return 0
candidates = [f for f in files if not _is_metadata(f)]
if not candidates:
candidates = list(files)
# Prefer originals.
originals = [
f for f in candidates
if str(f.get("source") or "").strip().lower() == "original"
]
pool = originals if originals else candidates
pool = [f for f in pool if str(f.get("name") or "").strip()]
if not pool:
return None
pool.sort(key=_size, reverse=True)
return pool[0]
class InternetArchive(Provider):
    """Internet Archive provider using the `internetarchive` Python module.

    Supports:
    - search-file -provider internetarchive <query>
    - download-file / provider.download() from search results
    - add-file -provider internetarchive (uploads)
    """

    URL = ("archive.org",)

    @classmethod
    def config(cls) -> List[Dict[str, Any]]:
        """Describe the settings this provider reads from its config section."""
        return [
            {
                "key": "access_key",
                "label": "Access Key (for uploads)",
                "default": "",
                "secret": True,
            },
            {
                "key": "secret_key",
                "label": "Secret Key (for uploads)",
                "default": "",
                "secret": True,
            },
            {
                "key": "collection",
                "label": "Default Collection",
                "default": "",
            },
            {
                "key": "mediatype",
                "label": "Default Mediatype",
                "default": "",
            },
        ]

    TABLE_AUTO_STAGES = {
        "internetarchive": ["download-file"],
        "internetarchive.folder": ["download-file"],
        "internetarchive.format": ["download-file"],
        "internetarchive.formats": ["download-file"],
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Read credentials and upload defaults from the provider config."""
        super().__init__(config)
        conf = _pick_provider_config(self.config)
        self._access_key = conf.get("access_key")
        self._secret_key = conf.get("secret_key")
        # Both the short and the "default_"-prefixed key names are accepted.
        self._collection = conf.get("collection") or conf.get("default_collection")
        self._mediatype = conf.get("mediatype") or conf.get("default_mediatype")

    def validate(self) -> bool:
        """Return True when the ``internetarchive`` module can be imported."""
        try:
            _ia()
        except Exception:
            return False
        return True

    @staticmethod
    def _media_kind_from_mediatype(mediatype: str) -> str:
        """Map an Internet Archive mediatype to this application's media kind.

        Unknown or empty mediatypes map to ``"file"``.
        """
        kinds = {
            "texts": "book",
            "audio": "audio",
            "etree": "audio",
            "movies": "video",
            "image": "image",
        }
        return kinds.get(str(mediatype or "").strip().lower(), "file")

    @staticmethod
    def _item_files(item: Any) -> List[Dict[str, Any]]:
        """Collect an item's file entries as plain dicts.

        Uses the dict entries of ``item.files`` when that is a list; otherwise
        falls back to ``item.get_files()`` and copies name/size/format/source
        from each object. Any failure yields an empty list.
        """
        files: List[Dict[str, Any]] = []
        try:
            raw_files = getattr(item, "files", None)
            if isinstance(raw_files, list):
                files = [f for f in raw_files if isinstance(f, dict)]
        except Exception:
            files = []
        if files:
            return files

        try:
            for f in item.get_files():
                name = getattr(f, "name", None)
                if not name and isinstance(f, dict):
                    name = f.get("name")
                if not name:
                    continue
                files.append(
                    {
                        "name": str(name),
                        "size": getattr(f, "size", None),
                        "format": getattr(f, "format", None),
                        "source": getattr(f, "source", None),
                    }
                )
        except Exception:
            files = []
        return files

    def search(
        self,
        query: str,
        limit: int = 50,
        filters: Optional[Dict[str, Any]] = None,
        **_kwargs: Any,
    ) -> List[SearchResult]:
        """Search Internet Archive items and return them as search results.

        Plain-text queries (no field/boolean syntax, and not ``*`` / ``*.*``)
        are wrapped as a title phrase search. Each result points at the item's
        ``/details/`` page.

        Args:
            query: Search text or fielded query.
            limit: Maximum number of results; falsy or unparsable values fall
                back to 50.
            filters: Accepted for interface compatibility; not read here.

        Raises:
            Exception: If the module or ``search_items`` is unavailable, or
                the search call fails.
        """
        ia = _ia()
        search_items = getattr(ia, "search_items", None)
        if not callable(search_items):
            raise Exception("internetarchive.search_items is not available")

        q = str(query or "").strip()
        if not q:
            return []
        # If the user supplied a plain string, default to title search.
        if not _looks_fielded_query(q) and q not in {"*", "*.*"}:
            q = f'title:("{q}")'

        fields = [
            "identifier",
            "title",
            "mediatype",
            "creator",
            "date",
            "collection",
        ]
        try:
            found: Any = search_items(q, fields=fields)
        except Exception as exc:
            raise Exception(f"Internet Archive search failed: {exc}") from exc

        # Parse the limit once instead of on every row.
        try:
            max_results = int(limit or 50)
        except (TypeError, ValueError):
            max_results = 50

        out: List[SearchResult] = []
        for row in found:
            if len(out) >= max_results:
                break
            if not isinstance(row, dict):
                continue
            identifier = str(row.get("identifier") or "").strip()
            if not identifier:
                continue

            title = str(row.get("title") or identifier).strip() or identifier
            mediatype = str(row.get("mediatype") or "").strip()
            creator_raw = row.get("creator")
            if isinstance(creator_raw, list):
                creator = ", ".join(str(x) for x in creator_raw if x)
            else:
                creator = str(creator_raw or "").strip()
            date = str(row.get("date") or "").strip()

            annotations = [value for value in (mediatype, date, creator) if value]
            detail_parts = [value for value in (creator, date) if value]

            out.append(
                SearchResult(
                    table="internetarchive.folder",
                    title=title,
                    path=f"https://archive.org/details/{identifier}",
                    detail=" · ".join(detail_parts),
                    annotations=annotations,
                    media_kind=self._media_kind_from_mediatype(mediatype),
                    size_bytes=None,
                    tag=set(),
                    columns=[
                        ("title", title),
                        ("mediatype", mediatype),
                        ("date", date),
                        ("creator", creator),
                    ],
                    full_metadata=dict(row),
                )
            )
        return out

    def download_url(self, url: str, output_dir: Path) -> Optional[Path]:
        """Download an Internet Archive URL.

        Supports:
        - https://archive.org/details/<identifier>
        - https://archive.org/download/<identifier>/<filename>
        """
        sr = SearchResult(
            table="internetarchive",
            title=str(url),
            path=str(url),
            full_metadata={},
        )
        return self.download(sr, output_dir)

    def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]:
        """Download one file of the item referenced by ``result.path``.

        When the path names a specific file, that file is fetched; otherwise
        the best candidate from the item's file list is chosen.

        Returns:
            The path of the downloaded file, or ``None`` when no identifier
            can be extracted or the file cannot be found after downloading.

        Raises:
            Exception: If the module functions are unavailable, the item
                lookup fails, the item has no downloadable file, or the
                download fails.
        """
        ia = _ia()
        get_item = getattr(ia, "get_item", None)
        download_fn = getattr(ia, "download", None)
        if not callable(get_item):
            raise Exception("internetarchive.get_item is not available")
        if not callable(download_fn):
            raise Exception("internetarchive.download is not available")

        source = str(getattr(result, "path", "") or "")
        identifier = _extract_identifier_from_any(source)
        if not identifier:
            return None

        try:
            requested_filename = _extract_download_filename_from_url(source)
        except Exception:
            requested_filename = ""

        # Accept str as well as Path so the path arithmetic below cannot fail
        # after the download has already happened.
        output_dir = Path(output_dir)
        try:
            output_dir.mkdir(parents=True, exist_ok=True)
        except OSError as exc:
            # Best effort: the download call reports the failure if the
            # directory really is unusable.
            log(
                f"[internetarchive] Could not create {output_dir}: {exc}",
                file=sys.stderr,
            )

        try:
            item: Any = get_item(identifier)
        except Exception as exc:
            raise Exception(f"Internet Archive item lookup failed: {exc}") from exc

        chosen_name = requested_filename
        if not chosen_name:
            chosen = _best_file_candidate(self._item_files(item))
            if chosen is not None:
                chosen_name = str(chosen.get("name") or "").strip()
        if not chosen_name:
            raise Exception("Internet Archive item has no downloadable files")

        # Download the selected file.
        try:
            download_fn(
                identifier,
                files=[chosen_name],
                destdir=str(output_dir),
                no_directory=True,
                ignore_existing=True,
                verbose=False,
            )
        except TypeError:
            # Older versions may not support some flags. Wrap the retry too so
            # both paths report failures the same way.
            try:
                download_fn(
                    identifier,
                    files=[chosen_name],
                    destdir=str(output_dir),
                )
            except Exception as exc:
                raise Exception(f"Internet Archive download failed: {exc}") from exc
        except Exception as exc:
            raise Exception(f"Internet Archive download failed: {exc}") from exc

        # Resolve downloaded path (library behavior varies by version/flags).
        for candidate in (
            output_dir / chosen_name,
            output_dir / identifier / chosen_name,
        ):
            try:
                if candidate.exists():
                    return candidate
            except OSError:
                continue

        # As a last resort, try to find by basename. ``chosen_name`` may hold
        # a sub-directory, which a directory entry's name never contains.
        wanted = Path(chosen_name).name
        try:
            for root in (output_dir, output_dir / identifier):
                if root.exists() and root.is_dir():
                    for child in root.iterdir():
                        if child.is_file() and child.name == wanted:
                            return child
        except OSError:
            pass
        return None

    def upload(self, file_path: str, **kwargs: Any) -> str:
        """Upload a file to Internet Archive.

        If a piped item (``pipe_obj`` keyword) carries a tag ``ia:<identifier>``
        or ``internetarchive:<identifier>``, that identifier is used. Otherwise
        one is derived from the title or file name plus the first ten
        characters of the hash. The identifier is normalized in both cases.

        Returns:
            The item URL (``https://archive.org/details/<identifier>``).

        Raises:
            FileNotFoundError: If *file_path* does not exist.
            Exception: If the module or ``upload`` is unavailable, no
                identifier can be determined, or the upload fails.
        """
        ia = _ia()
        upload_fn = getattr(ia, "upload", None)
        if not callable(upload_fn):
            raise Exception("internetarchive.upload is not available")

        p = Path(str(file_path))
        if not p.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        pipe_obj = kwargs.get("pipe_obj")
        title = ""
        file_hash = ""
        tags: List[str] = []
        try:
            if pipe_obj is not None:
                title = str(getattr(pipe_obj, "title", "") or "").strip()
                file_hash = str(getattr(pipe_obj, "hash", "") or "").strip()
                tags_val = getattr(pipe_obj, "tag", None)
                # Tags may be a set (search() in this class builds results
                # with ``tag=set()``), so accept the common containers. Sets
                # are sorted to make the chosen tag deterministic.
                if isinstance(tags_val, (set, frozenset)):
                    tags = sorted(str(t) for t in tags_val if t)
                elif isinstance(tags_val, (list, tuple)):
                    tags = [str(t) for t in tags_val if t]
        except Exception:
            title = ""
            file_hash = ""
            tags = []

        identifier = ""
        for t in tags:
            tag = str(t or "").strip()
            if tag.lower().startswith(("ia:", "internetarchive:")):
                identifier = tag.split(":", 1)[1].strip()
                break

        if not identifier:
            slug = _normalize_identifier(title or p.stem)
            suffix = str(file_hash)[:10] if file_hash else ""
            if slug and suffix:
                identifier = f"{slug}-{suffix}"
            elif slug:
                identifier = slug
            elif suffix:
                identifier = f"medeia-{suffix}"
            else:
                identifier = _normalize_identifier(p.stem) or "medeia-upload"

        identifier = _normalize_identifier(identifier)
        if not identifier:
            raise Exception("Could not determine Internet Archive identifier")

        meta: Dict[str, Any] = {"title": title or p.stem}
        if isinstance(self._collection, str) and self._collection.strip():
            meta["collection"] = self._collection.strip()
        if isinstance(self._mediatype, str) and self._mediatype.strip():
            meta["mediatype"] = self._mediatype.strip()

        # Build upload options; credentials are optional if the user has
        # internetarchive configured globally. Environment variables win over
        # the provider config.
        upload_kwargs: Dict[str, Any] = {"metadata": meta}
        ak = os.getenv("IA_ACCESS_KEY") or self._access_key
        sk = os.getenv("IA_SECRET_KEY") or self._secret_key
        if isinstance(ak, str) and ak.strip():
            upload_kwargs["access_key"] = ak.strip()
        if isinstance(sk, str) and sk.strip():
            upload_kwargs["secret_key"] = sk.strip()

        # Use a friendly uploaded filename.
        upload_name = sanitize_filename(p.name)
        files = {upload_name: str(p)}

        try:
            resp: Any = upload_fn(identifier, files=files, **upload_kwargs)
        except TypeError:
            # Older versions may require positional args.
            resp = upload_fn(identifier, files, meta)
        except Exception as exc:
            log(f"[internetarchive] Upload error: {exc}", file=sys.stderr)
            raise

        # Drain generator responses to catch failures.
        try:
            if resp is not None:
                for r in resp:
                    if isinstance(r, dict) and r.get("success") is False:
                        raise Exception(str(r.get("error") or r))
        except Exception as exc:
            raise Exception(f"Internet Archive upload failed: {exc}") from exc

        item_url = f"https://archive.org/details/{identifier}"

        # Recording the URL on the piped object is best effort; the upload
        # itself has already succeeded at this point.
        try:
            if pipe_obj is not None:
                from Store import Store

                Store(
                    self.config,
                    suppress_debug=True,
                ).try_add_url_for_pipe_object(pipe_obj, item_url)
        except Exception:
            pass

        return item_url