hj
Some checks failed
smoke-mm / Install & smoke test mm --help (push) Has been cancelled

This commit is contained in:
2025-12-26 21:04:09 -08:00
parent 9310478a37
commit a595453a9b
7 changed files with 611 additions and 5 deletions

521
Provider/internetarchive.py Normal file
View File

@@ -0,0 +1,521 @@
from __future__ import annotations
import importlib
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from ProviderCore.base import Provider, SearchResult
from ProviderCore.download import sanitize_filename
from SYS.logger import log
def _ia() -> Any:
try:
return importlib.import_module("internetarchive")
except Exception as exc:
raise Exception(f"internetarchive module not available: {exc}")
def _pick_provider_config(config: Any) -> Dict[str, Any]:
if not isinstance(config, dict):
return {}
provider = config.get("provider")
if not isinstance(provider, dict):
return {}
entry = provider.get("internetarchive")
if isinstance(entry, dict):
return entry
return {}
def _looks_fielded_query(q: str) -> bool:
low = (q or "").lower()
return (":" in low) or (" and " in low) or (" or " in low) or (" not " in low) or ("(" in low)
def _extract_identifier_from_any(value: str) -> str:
raw = str(value or "").strip()
if not raw:
return ""
if raw.lower().startswith("ia:"):
return raw.split(":", 1)[1].strip()
if raw.startswith("http://") or raw.startswith("https://"):
try:
from urllib.parse import urlparse
p = urlparse(raw)
host = (p.hostname or "").lower().strip()
path = (p.path or "").strip("/")
except Exception:
return ""
if not host.endswith("archive.org"):
return ""
parts = [x for x in path.split("/") if x]
# /details/<identifier>
if len(parts) >= 2 and parts[0].lower() == "details":
return str(parts[1]).strip()
# /download/<identifier>/<filename>
if len(parts) >= 2 and parts[0].lower() == "download":
return str(parts[1]).strip()
return ""
# Assume bare identifier
return raw
def _extract_download_filename_from_url(url: str) -> str:
raw = str(url or "").strip()
if not raw:
return ""
if not (raw.startswith("http://") or raw.startswith("https://")):
return ""
try:
from urllib.parse import urlparse
p = urlparse(raw)
host = (p.hostname or "").lower().strip()
path = (p.path or "").strip("/")
except Exception:
return ""
if not host.endswith("archive.org"):
return ""
parts = [x for x in path.split("/") if x]
# /download/<identifier>/<filename>
if len(parts) >= 3 and parts[0].lower() == "download":
return str(parts[2]).strip()
return ""
def _normalize_identifier(s: str) -> str:
text = str(s or "").strip().lower()
if not text:
return ""
# Internet Archive identifiers are fairly permissive; keep alnum, '_', '-', '.' and collapse the rest.
text = re.sub(r"[^a-z0-9_.-]+", "-", text)
text = re.sub(r"-+", "-", text).strip("-._")
if len(text) > 80:
text = text[:80].rstrip("-._")
return text
def _best_file_candidate(files: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
if not files:
return None
def _is_metadata(f: Dict[str, Any]) -> bool:
source = str(f.get("source") or "").strip().lower()
fmt = str(f.get("format") or "").strip().lower()
if source == "metadata":
return True
if fmt in {"metadata", "archive bittorrent"}:
return True
if fmt.startswith("thumbnail"):
return True
return False
def _size(f: Dict[str, Any]) -> int:
try:
return int(f.get("size") or 0)
except Exception:
return 0
candidates = [f for f in files if not _is_metadata(f)]
if not candidates:
candidates = list(files)
# Prefer originals.
originals = [f for f in candidates if str(f.get("source") or "").strip().lower() == "original"]
pool = originals if originals else candidates
pool = [f for f in pool if str(f.get("name") or "").strip()]
if not pool:
return None
pool.sort(key=_size, reverse=True)
return pool[0]
class InternetArchive(Provider):
"""Internet Archive provider using the `internetarchive` Python module.
Supports:
- search-provider -provider internetarchive <query>
- download-file / provider.download() from search results
- add-file -provider internetarchive (uploads)
Config (optional):
[provider=internetarchive]
access_key="..." # optional (upload)
secret_key="..." # optional (upload)
collection="..." # optional (upload)
mediatype="..." # optional (upload)
"""
def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
conf = _pick_provider_config(self.config)
self._access_key = conf.get("access_key")
self._secret_key = conf.get("secret_key")
self._collection = conf.get("collection") or conf.get("default_collection")
self._mediatype = conf.get("mediatype") or conf.get("default_mediatype")
def validate(self) -> bool:
try:
_ia()
return True
except Exception:
return False
@staticmethod
def _media_kind_from_mediatype(mediatype: str) -> str:
mt = str(mediatype or "").strip().lower()
if mt in {"texts"}:
return "book"
if mt in {"audio", "etree"}:
return "audio"
if mt in {"movies"}:
return "video"
if mt in {"image"}:
return "image"
return "file"
def search(
self,
query: str,
limit: int = 50,
filters: Optional[Dict[str, Any]] = None,
**_kwargs: Any,
) -> List[SearchResult]:
ia = _ia()
search_items = getattr(ia, "search_items", None)
if not callable(search_items):
raise Exception("internetarchive.search_items is not available")
q = str(query or "").strip()
if not q:
return []
# If the user supplied a plain string, default to title search.
if not _looks_fielded_query(q) and q not in {"*", "*.*"}:
q = f'title:("{q}")'
fields = [
"identifier",
"title",
"mediatype",
"creator",
"date",
"downloads",
"collection",
]
try:
search: Any = search_items(q, fields=fields)
except Exception as exc:
raise Exception(f"Internet Archive search failed: {exc}")
out: List[SearchResult] = []
for row in search:
if len(out) >= int(limit or 50):
break
if not isinstance(row, dict):
continue
identifier = str(row.get("identifier") or "").strip()
if not identifier:
continue
title = str(row.get("title") or identifier).strip() or identifier
mediatype = str(row.get("mediatype") or "").strip()
creator = str(row.get("creator") or "").strip()
date = str(row.get("date") or "").strip()
annotations: List[str] = []
if mediatype:
annotations.append(mediatype)
if date:
annotations.append(date)
if creator:
annotations.append(creator)
detail_parts: List[str] = []
if creator:
detail_parts.append(creator)
if date:
detail_parts.append(date)
path = f"https://archive.org/details/{identifier}"
sr = SearchResult(
table="internetarchive",
title=title,
path=path,
detail=" · ".join(detail_parts),
annotations=annotations,
media_kind=self._media_kind_from_mediatype(mediatype),
size_bytes=None,
tag=set(),
columns=[
("identifier", identifier),
("mediatype", mediatype),
("date", date),
],
full_metadata=dict(row),
)
out.append(sr)
return out
def download_url(self, url: str, output_dir: Path) -> Optional[Path]:
"""Download an Internet Archive URL.
Supports:
- https://archive.org/details/<identifier>
- https://archive.org/download/<identifier>/<filename>
"""
sr = SearchResult(table="internetarchive", title=str(url), path=str(url), full_metadata={})
return self.download(sr, output_dir)
def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]:
ia = _ia()
get_item = getattr(ia, "get_item", None)
download_fn = getattr(ia, "download", None)
if not callable(get_item):
raise Exception("internetarchive.get_item is not available")
if not callable(download_fn):
raise Exception("internetarchive.download is not available")
identifier = _extract_identifier_from_any(str(getattr(result, "path", "") or ""))
if not identifier:
return None
requested_filename = ""
try:
requested_filename = _extract_download_filename_from_url(str(result.path))
except Exception:
requested_filename = ""
try:
output_dir.mkdir(parents=True, exist_ok=True)
except Exception:
pass
try:
item: Any = get_item(identifier)
except Exception as exc:
raise Exception(f"Internet Archive item lookup failed: {exc}")
files: List[Dict[str, Any]] = []
try:
raw_files = getattr(item, "files", None)
if isinstance(raw_files, list):
for f in raw_files:
if isinstance(f, dict):
files.append(f)
except Exception:
files = []
if not files:
try:
for f in item.get_files():
name = getattr(f, "name", None)
if not name and isinstance(f, dict):
name = f.get("name")
if not name:
continue
files.append({
"name": str(name),
"size": getattr(f, "size", None),
"format": getattr(f, "format", None),
"source": getattr(f, "source", None),
})
except Exception:
files = []
chosen_name = ""
if requested_filename:
chosen_name = requested_filename
else:
chosen = _best_file_candidate(files)
if chosen is not None:
chosen_name = str(chosen.get("name") or "").strip()
if not chosen_name:
raise Exception("Internet Archive item has no downloadable files")
# Download the selected file.
try:
download_fn(
identifier,
files=[chosen_name],
destdir=str(output_dir),
no_directory=True,
ignore_existing=True,
verbose=False,
)
except TypeError:
# Older versions may not support some flags.
download_fn(
identifier,
files=[chosen_name],
destdir=str(output_dir),
)
except Exception as exc:
raise Exception(f"Internet Archive download failed: {exc}")
# Resolve downloaded path (library behavior varies by version/flags).
candidates = [
output_dir / chosen_name,
output_dir / identifier / chosen_name,
]
for p in candidates:
try:
if p.exists():
return p
except Exception:
continue
# As a last resort, try to find by basename.
try:
for root in (output_dir, output_dir / identifier):
if root.exists() and root.is_dir():
for child in root.iterdir():
if child.is_file() and child.name == chosen_name:
return child
except Exception:
pass
return None
def upload(self, file_path: str, **kwargs: Any) -> str:
"""Upload a file to Internet Archive.
If a piped item includes a tag `ia:<identifier>`, uploads to that identifier.
Otherwise creates a new identifier derived from the filename/title and hash.
Returns the item URL.
"""
ia = _ia()
upload_fn = getattr(ia, "upload", None)
if not callable(upload_fn):
raise Exception("internetarchive.upload is not available")
p = Path(str(file_path))
if not p.exists():
raise FileNotFoundError(f"File not found: {file_path}")
pipe_obj = kwargs.get("pipe_obj")
title = ""
file_hash = ""
tags: List[str] = []
try:
if pipe_obj is not None:
title = str(getattr(pipe_obj, "title", "") or "").strip()
file_hash = str(getattr(pipe_obj, "hash", "") or "").strip()
tags_val = getattr(pipe_obj, "tag", None)
if isinstance(tags_val, list):
tags = [str(t) for t in tags_val if t]
except Exception:
title = ""
file_hash = ""
tags = []
identifier = ""
for t in tags:
low = str(t or "").strip()
if low.lower().startswith("ia:"):
identifier = low.split(":", 1)[1].strip()
break
if low.lower().startswith("internetarchive:"):
identifier = low.split(":", 1)[1].strip()
break
if not identifier:
base_title = title or p.stem
slug = _normalize_identifier(base_title)
suffix = ""
if file_hash:
suffix = str(file_hash)[:10]
if slug and suffix:
identifier = f"{slug}-{suffix}"
elif slug:
identifier = slug
elif suffix:
identifier = f"medeia-{suffix}"
else:
identifier = _normalize_identifier(p.stem) or "medeia-upload"
identifier = _normalize_identifier(identifier)
if not identifier:
raise Exception("Could not determine Internet Archive identifier")
meta: Dict[str, Any] = {}
if title:
meta["title"] = title
else:
meta["title"] = p.stem
if isinstance(self._collection, str) and self._collection.strip():
meta["collection"] = self._collection.strip()
if isinstance(self._mediatype, str) and self._mediatype.strip():
meta["mediatype"] = self._mediatype.strip()
# Build upload options; credentials are optional if the user has internetarchive configured globally.
upload_kwargs: Dict[str, Any] = {"metadata": meta}
ak = os.getenv("IA_ACCESS_KEY") or self._access_key
sk = os.getenv("IA_SECRET_KEY") or self._secret_key
if isinstance(ak, str) and ak.strip():
upload_kwargs["access_key"] = ak.strip()
if isinstance(sk, str) and sk.strip():
upload_kwargs["secret_key"] = sk.strip()
# Use a friendly uploaded filename.
upload_name = sanitize_filename(p.name)
files = {upload_name: str(p)}
try:
resp: Any = upload_fn(identifier, files=files, **upload_kwargs)
except TypeError:
# Older versions may require positional args.
resp = upload_fn(identifier, files, meta)
except Exception as exc:
log(f"[internetarchive] Upload error: {exc}", file=sys.stderr)
raise
# Drain generator responses to catch failures.
try:
if resp is not None:
for r in resp:
if isinstance(r, dict) and r.get("success") is False:
raise Exception(str(r.get("error") or r))
except Exception as exc:
raise Exception(f"Internet Archive upload failed: {exc}")
item_url = f"https://archive.org/details/{identifier}"
try:
if pipe_obj is not None:
from Store import Store
Store(self.config, suppress_debug=True).try_add_url_for_pipe_object(pipe_obj, item_url)
except Exception:
pass
return item_url

View File

@@ -13,6 +13,74 @@ from urllib.parse import urlparse
from ProviderCore.base import Provider, SearchResult
_TELEGRAM_DEFAULT_TIMESTAMP_STEM_RE = re.compile(
r"^(?P<prefix>photo|video|document|audio|voice|animation)_(?P<date>\d{4}-\d{2}-\d{2})_(?P<time>\d{2}-\d{2}-\d{2})(?: \(\d+\))?$",
flags=re.IGNORECASE,
)
def _unique_path(path: Path) -> Path:
try:
if not path.exists():
return path
except Exception:
return path
stem = path.stem
suffix = path.suffix
parent = path.parent
for i in range(1, 10_000):
candidate = parent / f"{stem} ({i}){suffix}"
try:
if not candidate.exists():
return candidate
except Exception:
return candidate
return parent / f"{stem} (copy){suffix}"
def _maybe_strip_telegram_timestamped_default_filename(*, downloaded_path: Path) -> Path:
"""Normalize Telethon's default timestamped names.
Examples:
- photo_2025-12-27_02-58-09.jpg -> photo.jpg
"""
try:
stem = downloaded_path.stem
suffix = downloaded_path.suffix
except Exception:
return downloaded_path
if not suffix:
return downloaded_path
m = _TELEGRAM_DEFAULT_TIMESTAMP_STEM_RE.fullmatch(str(stem))
if not m:
return downloaded_path
prefix = str(m.group("prefix") or "").strip().lower()
if not prefix:
return downloaded_path
new_candidate = downloaded_path.with_name(f"{prefix}{suffix}")
if new_candidate == downloaded_path:
return downloaded_path
new_path = _unique_path(new_candidate)
try:
if downloaded_path.exists():
try:
downloaded_path.rename(new_path)
return new_path
except Exception:
shutil.move(str(downloaded_path), str(new_path))
return new_path
except Exception:
return downloaded_path
return downloaded_path
def _looks_like_telegram_message_url(url: str) -> bool:
try:
parsed = urlparse(str(url))
@@ -945,6 +1013,13 @@ class Telegram(Provider):
raise Exception("Telegram download returned no file")
downloaded_path = Path(str(downloaded))
# Telethon's default media filenames include timestamps (e.g. photo_YYYY-MM-DD_HH-MM-SS.jpg).
# Strip those timestamps ONLY when Telegram didn't provide an explicit filename.
if not file_name:
downloaded_path = _maybe_strip_telegram_timestamped_default_filename(
downloaded_path=downloaded_path,
)
date_iso = None
try:
if msg_date is not None and hasattr(msg_date, "isoformat"):

View File

@@ -24,6 +24,7 @@ from Provider.youtube import YouTube
from Provider.fileio import FileIO
from Provider.zeroxzero import ZeroXZero
from Provider.loc import LOC
from Provider.internetarchive import InternetArchive
_PROVIDERS: Dict[str, Type[Provider]] = {
@@ -31,6 +32,7 @@ _PROVIDERS: Dict[str, Type[Provider]] = {
"alldebrid": AllDebrid,
"libgen": Libgen,
"openlibrary": OpenLibrary,
"internetarchive": InternetArchive,
"soulseek": Soulseek,
"bandcamp": Bandcamp,
"youtube": YouTube,

View File

@@ -49,7 +49,7 @@ class Add_File(Cmdlet):
arg=[
SharedArgs.PATH,
SharedArgs.STORE,
CmdletArg(name="provider", type="string", required=False, description="File hosting provider (e.g., 0x0)", alias="prov"),
CmdletArg(name="provider", type="string", required=False, description="File hosting provider (e.g., 0x0, file.io, internetarchive)", alias="prov"),
CmdletArg(
name="room",
type="string",
@@ -66,6 +66,9 @@ class Add_File(Cmdlet):
" <path>: Copy file to specified directory",
"- File provider options (use -provider):",
" 0x0: Upload to 0x0.st for temporary hosting",
" file.io: Upload to file.io for temporary hosting",
" matrix: Upload to a Matrix room (requires Matrix config)",
" internetarchive: Upload to archive.org (optional tag: ia:<identifier> to upload into an existing item)",
],
exec=self.run,
)

View File

@@ -31,10 +31,10 @@ class Search_Provider(Cmdlet):
def __init__(self):
super().__init__(
name="search-provider",
summary="Search external providers (bandcamp, libgen, soulseek, youtube, alldebrid, loc)",
summary="Search external providers (bandcamp, libgen, soulseek, youtube, alldebrid, loc, internetarchive)",
usage="search-provider -provider <provider> <query> [-limit N] [-open ID]",
arg=[
CmdletArg("provider", type="string", required=True, description="Provider name: bandcamp, libgen, soulseek, youtube, alldebrid, loc"),
CmdletArg("provider", type="string", required=True, description="Provider name: bandcamp, libgen, soulseek, youtube, alldebrid, loc, internetarchive"),
CmdletArg("query", type="string", required=True, description="Search query (supports provider-specific syntax)"),
CmdletArg("limit", type="int", description="Maximum results to return (default: 50)"),
CmdletArg("open", type="int", description="(alldebrid) Open folder/magnet by ID and list its files"),
@@ -54,12 +54,15 @@ class Search_Provider(Cmdlet):
" Example: search-provider -provider soulseek \"pink floyd\"",
"- youtube: Search YouTube for videos",
" Example: search-provider -provider youtube \"tutorial\"",
"- internetarchive: Search archive.org items (advancedsearch syntax)",
" Example: search-provider -provider internetarchive \"title:(lincoln) AND mediatype:texts\"",
"",
"Query syntax:",
"- bandcamp: Use 'artist:Name' to search by artist",
"- libgen: Supports isbn:, author:, title: prefixes",
"- soulseek: Plain text search",
"- youtube: Plain text search",
"- internetarchive: Archive.org advancedsearch query syntax",
"",
"Results can be piped to other cmdlet:",
" search-provider -provider bandcamp \"artist:grace\" | @1 | download-data",

View File

@@ -5,7 +5,7 @@ Medios-Macina is a CLI media manager and toolkit focused on downloading, tagging
## Features
- **Flexible syntax structure:** chain commands with `|` and select options from tables with `@N`.
- **Multiple file stores:** *HYDRUSNETWORK, FOLDER*
- **Provider plugin integration:** *YOUTUBE, OPENLIBRARY/ARCHIVE.ORG, SOULSEEK, LIBGEN, ALLDEBRID, TELEGRAM, BANDCAMP*
- **Provider plugin integration:** *YOUTUBE, OPENLIBRARY, INTERNETARCHIVE, SOULSEEK, LIBGEN, ALLDEBRID, TELEGRAM, BANDCAMP*
- **Module Mixing:** *[Playwright](https://github.com/microsoft/playwright), [yt-dlp](https://github.com/yt-dlp/yt-dlp), [aioslsk](https://github.com/JurgenR/aioslsk), [telethon](https://github.com/LonamiWebs/Telethon),[typer](https://github.com/fastapi/typer)*
- **MPV Manager:** Play audio, video, and even images in a custom designed MPV with trimming, screenshotting, and more built right in!
@@ -77,7 +77,8 @@ search-store "ext:mp3"
- **HydrusNetwork**: use for database-backed media storage and advanced tagging (requires running Hydrus client/server).
- **Local folder**: copy files to a configured directory (fast and simple).
- **YouTube / yt-dlp**: robust media downloader for YouTube and many hosts.
- **OpenLibrary / Archive.org**: scripted metadata scraping and optional downloads.
- **OpenLibrary**: book metadata, borrowing, and Archive.org downloads.
- **Internet Archive**: search/download/upload via the `internetarchive` module.
- **Soulseek, LibGen, All-Debrid, Others**: provider support is modular—add or configure providers in `config.conf`.
## Troubleshooting & tips 🛠️

View File

@@ -9,6 +9,7 @@ yt-dlp[default]>=2023.11.0
requests>=2.31.0
httpx>=0.25.0
telethon>=1.36.0
internetarchive>=4.1.0
# Document and data handling
pypdf>=3.0.0