"""Hydrus API helpers and export utilities."""
from __future__ import annotations
import base64
import http.client
import json
import os
import re
import shutil
import subprocess
import sys
import time
from helper.logger import log, debug
import tempfile
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Iterable, Optional, Sequence, Type, TypeVar, Union, cast
from urllib.parse import urlsplit, urlencode, quote
import httpx
logger = logging.getLogger(__name__)
try: # Optional metadata helper for audio files
import mutagen # type: ignore
except ImportError: # pragma: no cover - best effort
mutagen = None # type: ignore
from .utils import (
decode_cbor,
jsonify,
ensure_directory,
sanitize_metadata_value,
unique_path,
unique_preserve_order,
)
from .http_client import HTTPClient
class HydrusRequestError(RuntimeError):
"""Raised when the Hydrus Client API returns an error response."""
def __init__(self, status: int, message: str, payload: Any | None = None) -> None:
super().__init__(f"Hydrus request failed ({status}): {message}")
self.status = status
self.payload = payload
class HydrusConnectionError(HydrusRequestError):
"""Raised when Hydrus service is unavailable (connection refused, timeout, etc.).
This is an expected error when Hydrus is not running and should not include
a full traceback in logs.
"""
def __init__(self, message: str) -> None:
super().__init__(0, message, None) # status 0 indicates connection error
self.is_connection_error = True
@dataclass(slots=True)
class HydrusRequestSpec:
method: str
endpoint: str
query: dict[str, Any] | None = None
data: Any | None = None
file_path: Path | None = None
content_type: str | None = None
accept: str | None = "application/cbor"
@dataclass(slots=True)
class HydrusClient:
"""Thin wrapper around the Hydrus Client API."""
base_url: str
access_key: str = ""
timeout: float = 60.0
scheme: str = field(init=False)
hostname: str = field(init=False)
port: int = field(init=False)
base_path: str = field(init=False)
_session_key: str = field(init=False, default="", repr=False) # Cached session key
def __post_init__(self) -> None:
if not self.base_url:
raise ValueError("Hydrus base URL is required")
self.base_url = self.base_url.rstrip("/")
parsed = urlsplit(self.base_url)
if parsed.scheme not in {"http", "https"}:
raise ValueError("Hydrus base URL must use http or https")
self.scheme = parsed.scheme
self.hostname = parsed.hostname or "localhost"
self.port = parsed.port or (443 if self.scheme == "https" else 80)
self.base_path = parsed.path.rstrip("/")
self.access_key = self.access_key or ""
# ------------------------------------------------------------------
# low-level helpers
# ------------------------------------------------------------------
def _build_path(self, endpoint: str, query: dict[str, Any] | None = None) -> str:
path = endpoint if endpoint.startswith("/") else f"/{endpoint}"
if self.base_path:
path = f"{self.base_path}{path}"
if query:
encoded = urlencode(query, doseq=True)
if encoded:
path = f"{path}?{encoded}"
return path
def _perform_request(self, spec: HydrusRequestSpec) -> Any:
headers: dict[str, str] = {}
# On first request, try to acquire session key for security
if not self._session_key and self.access_key and spec.endpoint != "/session_key":
try:
logger.debug(f"[Hydrus] Acquiring session key on first request...")
self._acquire_session_key()
except Exception as e:
# If session key acquisition fails, fall back to access key
logger.debug(f"[Hydrus] Session key acquisition failed: {e}. Using access key instead.")
# Use session key if available, otherwise use access key
if self._session_key:
headers["Hydrus-Client-API-Session-Key"] = self._session_key
elif self.access_key:
headers["Hydrus-Client-API-Access-Key"] = self.access_key
if spec.accept:
headers["Accept"] = spec.accept
path = self._build_path(spec.endpoint, spec.query)
url = f"{self.scheme}://{self.hostname}:{self.port}{path}"
# Log request details
logger.debug(f"[Hydrus] {spec.method} {spec.endpoint} (auth: {'session_key' if self._session_key else 'access_key' if self.access_key else 'none'})")
status = 0
reason = ""
body = b""
content_type = ""
try:
with HTTPClient(timeout=self.timeout, headers=headers, verify_ssl=False) as client:
response = None
if spec.file_path is not None:
file_path = Path(spec.file_path)
if not file_path.is_file():
error_msg = f"Upload file not found: {file_path}"
logger.error(f"[Hydrus] {error_msg}")
raise FileNotFoundError(error_msg)
file_size = file_path.stat().st_size
headers["Content-Type"] = spec.content_type or "application/octet-stream"
headers["Content-Length"] = str(file_size)
logger.debug(f"[Hydrus] Uploading file {file_path.name} ({file_size} bytes)")
def file_gen():
with file_path.open("rb") as handle:
while chunk := handle.read(65536):
yield chunk
response = client.request(
spec.method,
url,
content=file_gen(),
headers=headers
)
else:
content = None
json_data = None
if spec.data is not None:
if isinstance(spec.data, (bytes, bytearray)):
content = spec.data
else:
json_data = spec.data
logger.debug(f"[Hydrus] Request body size: {len(content) if content else 'json'}")
response = client.request(
spec.method,
url,
content=content,
json=json_data,
headers=headers
)
status = response.status_code
reason = response.reason_phrase
body = response.content
content_type = response.headers.get("Content-Type", "") or ""
logger.debug(f"[Hydrus] Response {status} {reason} ({len(body)} bytes)")
except (httpx.ConnectError, httpx.TimeoutException, httpx.NetworkError) as exc:
msg = f"Hydrus unavailable: {exc}"
logger.warning(f"[Hydrus] {msg}")
raise HydrusConnectionError(msg) from exc
except httpx.HTTPStatusError as exc:
response = exc.response
status = response.status_code
reason = response.reason_phrase
body = response.content
content_type = response.headers.get("Content-Type", "") or ""
except Exception as exc:
logger.error(f"[Hydrus] Connection error: {exc}", exc_info=True)
raise
payload: Any
payload = {}
if body:
content_main = content_type.split(";", 1)[0].strip().lower()
if "json" in content_main:
try:
payload = json.loads(body.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
payload = body.decode("utf-8", "replace")
elif "cbor" in content_main:
try:
payload = decode_cbor(body)
except Exception:
payload = body
else:
payload = body
if status >= 400:
message = ""
if isinstance(payload, dict):
message = str(payload.get("message") or payload.get("error") or payload)
elif isinstance(payload, str):
message = payload
else:
message = reason or "HTTP error"
logger.error(f"[Hydrus] HTTP {status}: {message}")
# Handle expired session key (419) by clearing cache and retrying once
if status == 419 and self._session_key and "session" in message.lower():
logger.warning(f"[Hydrus] Session key expired, acquiring new one and retrying...")
self._session_key = "" # Clear expired session key
try:
self._acquire_session_key()
# Retry the request with new session key
return self._perform_request(spec)
except Exception as retry_error:
logger.error(f"[Hydrus] Retry failed: {retry_error}", exc_info=True)
# If retry fails, raise the original error
raise HydrusRequestError(status, message, payload) from retry_error
raise HydrusRequestError(status, message, payload)
return payload
def _acquire_session_key(self) -> str:
"""Acquire a session key from the Hydrus API using the access key.
Session keys are temporary authentication tokens that expire after 24 hours
of inactivity, client restart, or if the access key is deleted. They are
more secure than passing access keys in every request.
Returns the session key string.
Raises HydrusRequestError if the request fails.
"""
if not self.access_key:
raise HydrusRequestError(401, "Cannot acquire session key: no access key configured")
# Temporarily use access key to get session key
original_session_key = self._session_key
try:
self._session_key = "" # Clear session key to use access key for this request
result = self._get("/session_key")
session_key = result.get("session_key")
if not session_key:
raise HydrusRequestError(500, "Session key response missing 'session_key' field", result)
self._session_key = session_key
return session_key
except HydrusRequestError:
self._session_key = original_session_key
raise
except Exception as e:
self._session_key = original_session_key
raise HydrusRequestError(500, f"Failed to acquire session key: {e}")
def ensure_session_key(self) -> str:
"""Ensure a valid session key exists, acquiring one if needed.
Returns the session key. If one is already cached, returns it.
Otherwise acquires a new session key from the API.
"""
if self._session_key:
return self._session_key
return self._acquire_session_key()
def _get(self, endpoint: str, *, query: dict[str, Any] | None = None) -> dict[str, Any]:
spec = HydrusRequestSpec("GET", endpoint, query=query)
return cast(dict[str, Any], self._perform_request(spec))
def _post(
self,
endpoint: str,
*,
data: dict[str, Any] | None = None,
file_path: Path | None = None,
content_type: str | None = None,
) -> dict[str, Any]:
spec = HydrusRequestSpec("POST", endpoint, data=data, file_path=file_path, content_type=content_type)
return cast(dict[str, Any], self._perform_request(spec))
def _ensure_hashes(self, hashes: Union[str, Iterable[str]]) -> list[str]:
if isinstance(hashes, str):
return [hashes]
return list(hashes)
def _append_access_key(self, url: str) -> str:
if not self.access_key:
return url
separator = "&" if "?" in url else "?"
# Use the correct parameter name for Hydrus API compatibility
return f"{url}{separator}access_key={quote(self.access_key)}"
# ------------------------------------------------------------------
# public API wrappers
# ------------------------------------------------------------------
def add_file(self, file_path: Path) -> dict[str, Any]:
return self._post("/add_files/add_file", file_path=file_path)
def add_tags(self, file_hashes: Union[str, Iterable[str]], tags: Iterable[str], service_name: str) -> dict[str, Any]:
hashes = self._ensure_hashes(file_hashes)
body = {"hashes": hashes, "service_names_to_tags": {service_name: list(tags)}}
return self._post("/add_tags/add_tags", data=body)
def delete_tags(
self,
file_hashes: Union[str, Iterable[str]],
tags: Iterable[str],
service_name: str,
*,
action: int = 1,
) -> dict[str, Any]:
hashes = self._ensure_hashes(file_hashes)
body = {
"hashes": hashes,
"service_names_to_actions_to_tags": {service_name: {action: list(tags)}},
}
return self._post("/add_tags/add_tags", data=body)
def add_tags_by_key(self, file_hashes: Union[str, Iterable[str]], tags: Iterable[str], service_key: str) -> dict[str, Any]:
hashes = self._ensure_hashes(file_hashes)
body = {"hashes": hashes, "service_keys_to_tags": {service_key: list(tags)}}
return self._post("/add_tags/add_tags", data=body)
def delete_tags_by_key(
self,
file_hashes: Union[str, Iterable[str]],
tags: Iterable[str],
service_key: str,
*,
action: int = 1,
) -> dict[str, Any]:
hashes = self._ensure_hashes(file_hashes)
body = {
"hashes": hashes,
"service_keys_to_actions_to_tags": {service_key: {action: list(tags)}},
}
return self._post("/add_tags/add_tags", data=body)
def associate_url(self, file_hashes: Union[str, Iterable[str]], url: str) -> dict[str, Any]:
hashes = self._ensure_hashes(file_hashes)
if len(hashes) == 1:
body = {"hash": hashes[0], "url_to_add": url}
return self._post("/add_urls/associate_url", data=body)
results: dict[str, Any] = {}
for file_hash in hashes:
body = {"hash": file_hash, "url_to_add": url}
results[file_hash] = self._post("/add_urls/associate_url", data=body)
return {"batched": results}
def delete_url(self, file_hashes: Union[str, Iterable[str]], url: str) -> dict[str, Any]:
hashes = self._ensure_hashes(file_hashes)
if len(hashes) == 1:
body = {"hash": hashes[0], "url_to_delete": url}
return self._post("/add_urls/associate_url", data=body)
results: dict[str, Any] = {}
for file_hash in hashes:
body = {"hash": file_hash, "url_to_delete": url}
results[file_hash] = self._post("/add_urls/associate_url", data=body)
return {"batched": results}
def set_notes(self, file_hashes: Union[str, Iterable[str]], notes: dict[str, str], service_name: str) -> dict[str, Any]:
if not notes:
raise ValueError("notes mapping must not be empty")
hashes = self._ensure_hashes(file_hashes)
body = {"hashes": hashes, "service_names_to_notes": {service_name: notes}}
return self._post("/add_notes/set_notes", data=body)
def delete_notes(
self,
file_hashes: Union[str, Iterable[str]],
note_names: Sequence[str],
service_name: str,
) -> dict[str, Any]:
names = [name for name in note_names if name]
if not names:
raise ValueError("note_names must not be empty")
hashes = self._ensure_hashes(file_hashes)
body = {"hashes": hashes, "service_names_to_deleted_note_names": {service_name: names}}
return self._post("/add_notes/set_notes", data=body)
def get_file_relationships(self, file_hash: str) -> dict[str, Any]:
query = {"hash": file_hash}
return self._get("/manage_file_relationships/get_file_relationships", query=query)
def set_relationship(self, hash_a: str, hash_b: str, relationship: Union[str, int], do_default_content_merge: bool = False) -> dict[str, Any]:
"""Set a relationship between two files in Hydrus.
Args:
hash_a: First file hash
hash_b: Second file hash
relationship: Relationship type - can be string ("king", "alt", "related", etc)
or integer (0-4):
- 0 = duplicates
- 1 = alternate
- 2 = not_related
- 3 = related
- 4 = king
do_default_content_merge: Whether to perform default content merge
Returns:
Response from Hydrus API
"""
# Convert string relationship types to integers
if isinstance(relationship, str):
rel_map = {
"duplicates": 0,
"duplicate": 0,
"alt": 1,
"alternate": 1,
"not_related": 2,
"not related": 2,
"related": 3,
"king": 4,
}
relationship = rel_map.get(relationship.lower(), 3) # Default to "related" (3)
body = {
"relationships": [
{
"hash_a": hash_a,
"hash_b": hash_b,
"relationship": relationship,
"do_default_content_merge": do_default_content_merge,
}
]
}
return self._post("/manage_file_relationships/set_file_relationships", data=body)
def get_services(self) -> dict[str, Any]:
return self._get("/get_services")
def search_files(
self,
tags: Sequence[Any],
*,
file_service_name: str | None = None,
return_hashes: bool = False,
return_file_ids: bool = True,
include_current_tags: bool | None = None,
include_pending_tags: bool | None = None,
file_sort_type: int | None = None,
file_sort_asc: bool | None = None,
file_sort_key: str | None = None,
) -> dict[str, Any]:
if not tags:
raise ValueError("tags must not be empty")
query: dict[str, Any] = {}
query_fields = [
("tags", tags, lambda v: json.dumps(list(v))),
("file_service_name", file_service_name, lambda v: v),
("return_hashes", return_hashes, lambda v: "true" if v else None),
("return_file_ids", return_file_ids, lambda v: "true" if v else None),
(
"include_current_tags",
include_current_tags,
lambda v: "true" if v else "false" if v is not None else None,
),
(
"include_pending_tags",
include_pending_tags,
lambda v: "true" if v else "false" if v is not None else None,
),
("file_sort_type", file_sort_type, lambda v: str(v) if v is not None else None),
("file_sort_asc", file_sort_asc, lambda v: "true" if v else "false" if v is not None else None),
("file_sort_key", file_sort_key, lambda v: v),
]
for key, value, formatter in query_fields:
if value is None or value == []:
continue
formatted = formatter(value)
if formatted is not None:
query[key] = formatted
return self._get("/get_files/search_files", query=query)
def fetch_file_metadata(
self,
*,
file_ids: Sequence[int] | None = None,
hashes: Sequence[str] | None = None,
include_service_keys_to_tags: bool = True,
include_file_urls: bool = False,
include_duration: bool = True,
include_size: bool = True,
include_mime: bool = False,
include_notes: bool = False,
) -> dict[str, Any]:
if not file_ids and not hashes:
raise ValueError("Either file_ids or hashes must be provided")
query: dict[str, Any] = {}
query_fields = [
("file_ids", file_ids, lambda v: json.dumps(list(v))),
("hashes", hashes, lambda v: json.dumps(list(v))),
(
"include_service_keys_to_tags",
include_service_keys_to_tags,
lambda v: "true" if v else None,
),
("include_file_urls", include_file_urls, lambda v: "true" if v else None),
("include_duration", include_duration, lambda v: "true" if v else None),
("include_size", include_size, lambda v: "true" if v else None),
("include_mime", include_mime, lambda v: "true" if v else None),
("include_notes", include_notes, lambda v: "true" if v else None),
]
for key, value, formatter in query_fields:
if not value:
continue
formatted = formatter(value)
if formatted is not None:
query[key] = formatted
return self._get("/get_files/file_metadata", query=query)
def get_file_path(self, file_hash: str) -> dict[str, Any]:
"""Get the local file system path for a given file hash."""
query = {"hash": file_hash}
return self._get("/get_files/file_path", query=query)
def file_url(self, file_hash: str) -> str:
hash_param = quote(file_hash)
# Don't append access_key parameter for file downloads - use header instead
url = f"{self.base_url}/get_files/file?hash={hash_param}"
return url
def thumbnail_url(self, file_hash: str) -> str:
hash_param = quote(file_hash)
# Don't append access_key parameter for file downloads - use header instead
url = f"{self.base_url}/get_files/thumbnail?hash={hash_param}"
return url
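# Illustrative sketch: one way a caller might use HydrusClient to search by tag and
# fetch metadata. The URL, access key, and search predicate are hypothetical
# placeholders; session-key acquisition happens lazily inside _perform_request().
def _example_client_usage() -> None:
    client = HydrusClient(base_url="http://127.0.0.1:45869", access_key="<access key>")
    results = client.search_files(["system:everything"], return_hashes=True)
    hashes = results.get("hashes", [])
    if hashes:
        # Fetch mime/size/tag metadata for the first match.
        meta = client.fetch_file_metadata(hashes=hashes[:1], include_mime=True)
        for entry in meta.get("metadata", []):
            print(entry.get("hash"), entry.get("mime"), entry.get("size"))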
HydrusCliOptionsT = TypeVar("HydrusCliOptionsT", bound="HydrusCliOptions")
@dataclass(slots=True)
class HydrusCliOptions:
url: str
method: str
access_key: str
accept: str
timeout: float
content_type: str | None
body_bytes: bytes | None = None
body_path: Path | None = None
debug: bool = False
@classmethod
def from_namespace(cls: Type[HydrusCliOptionsT], namespace: Any) -> HydrusCliOptionsT:
accept_header = namespace.accept or 'application/cbor'
body_bytes: bytes | None = None
body_path: Path | None = None
if namespace.body_file:
body_path = Path(namespace.body_file)
elif namespace.body is not None:
body_bytes = namespace.body.encode('utf-8')
return cls(
url=namespace.url,
method=namespace.method.upper(),
access_key=namespace.access_key or '',
accept=accept_header,
timeout=namespace.timeout,
content_type=namespace.content_type,
body_bytes=body_bytes,
body_path=body_path,
debug=bool(os.environ.get('DOWNLOW_DEBUG')),
)
def hydrus_request(args, parser) -> int:
if args.body and args.body_file:
parser.error('Only one of --body or --body-file may be supplied')
options = HydrusCliOptions.from_namespace(args)
parsed = urlsplit(options.url)
if parsed.scheme not in ('http', 'https'):
parser.error('Only http and https URLs are supported')
if not parsed.hostname:
parser.error('Invalid Hydrus URL')
headers: dict[str, str] = {}
if options.access_key:
headers['Hydrus-Client-API-Access-Key'] = options.access_key
if options.accept:
headers['Accept'] = options.accept
request_body_bytes: bytes | None = None
body_path: Path | None = None
if options.body_path is not None:
body_path = options.body_path
if not body_path.is_file():
parser.error(f'File not found: {body_path}')
headers.setdefault('Content-Type', options.content_type or 'application/octet-stream')
headers['Content-Length'] = str(body_path.stat().st_size)
elif options.body_bytes is not None:
request_body_bytes = options.body_bytes
headers['Content-Type'] = options.content_type or 'application/json'
assert request_body_bytes is not None
headers['Content-Length'] = str(len(request_body_bytes))
elif options.content_type:
headers['Content-Type'] = options.content_type
if parsed.username or parsed.password:
userinfo = f"{parsed.username or ''}:{parsed.password or ''}".encode('utf-8')
headers['Authorization'] = 'Basic ' + base64.b64encode(userinfo).decode('ascii')
path = parsed.path or '/'
if parsed.query:
path += '?' + parsed.query
port = parsed.port
if port is None:
port = 443 if parsed.scheme == 'https' else 80
connection_cls = http.client.HTTPSConnection if parsed.scheme == 'https' else http.client.HTTPConnection
host = parsed.hostname or 'localhost'
connection = connection_cls(host, port, timeout=options.timeout)
if options.debug:
log(f"Hydrus connecting to {parsed.scheme}://{host}:{port}{path}", file=sys.stderr)
response_bytes: bytes = b''
content_type = ''
status = 0
try:
if body_path is not None:
with body_path.open('rb') as handle:
if options.debug:
size_hint = headers.get('Content-Length', 'unknown')
log(f"Hydrus sending file body ({size_hint} bytes)", file=sys.stderr)
connection.putrequest(options.method, path)
host_header = host
if (parsed.scheme == 'http' and port not in (80, None)) or (parsed.scheme == 'https' and port not in (443, None)):
host_header = f"{host}:{port}"
connection.putheader('Host', host_header)
for key, value in headers.items():
if value:
connection.putheader(key, value)
connection.endheaders()
while True:
chunk = handle.read(65536)
if not chunk:
break
connection.send(chunk)
if options.debug:
log('[downlow.py] Hydrus upload complete; awaiting response', file=sys.stderr)
else:
if options.debug:
size_hint = 'none' if request_body_bytes is None else str(len(request_body_bytes))
log(f"Hydrus sending request body bytes={size_hint}", file=sys.stderr)
sanitized_headers = {k: v for k, v in headers.items() if v}
connection.request(options.method, path, body=request_body_bytes, headers=sanitized_headers)
response = connection.getresponse()
status = response.status
response_bytes = response.read()
if options.debug:
log(f"Hydrus response received ({len(response_bytes)} bytes)", file=sys.stderr)
content_type = response.getheader('Content-Type', '')
except (OSError, http.client.HTTPException) as exc:
log(f"HTTP error: {exc}", file=sys.stderr)
return 1
finally:
connection.close()
content_type_lower = (content_type or '').split(';', 1)[0].strip().lower()
accept_value = options.accept or ''
expect_cbor = 'cbor' in (content_type_lower or '') or 'cbor' in accept_value.lower()
payload = None
decode_error: Exception | None = None
if response_bytes:
if expect_cbor:
try:
payload = decode_cbor(response_bytes)
except Exception as exc: # pragma: no cover - library errors surfaced
decode_error = exc
if payload is None and not expect_cbor:
try:
payload = json.loads(response_bytes.decode('utf-8'))
except (json.JSONDecodeError, UnicodeDecodeError):
payload = response_bytes.decode('utf-8', 'replace')
elif payload is None and expect_cbor and decode_error is not None:
log(f"Expected CBOR response but decoding failed: {decode_error}", file=sys.stderr)
return 1
json_ready = jsonify(payload) if isinstance(payload, (dict, list)) else payload
if options.debug:
log(f"Hydrus {options.method} {options.url} -> {status}", file=sys.stderr)
if isinstance(json_ready, (dict, list)):
log(json.dumps(json_ready, ensure_ascii=False))
elif json_ready is None:
log('{}')
else:
log(json.dumps({'value': json_ready}, ensure_ascii=False))
return 0 if 200 <= status < 400 else 1
def prepare_ffmpeg_metadata(payload: Optional[dict[str, Any]]) -> dict[str, str]:
if not isinstance(payload, dict):
return {}
metadata: dict[str, str] = {}
def set_field(key: str, raw: Any, limit: int = 2000) -> None:
sanitized = sanitize_metadata_value(raw)
if not sanitized:
return
if len(sanitized) > limit:
sanitized = sanitized[:limit]
metadata[key] = sanitized
set_field('title', payload.get('title'))
set_field('artist', payload.get('artist'), 512)
set_field('album', payload.get('album'), 512)
set_field('date', payload.get('year'), 20)
comment = payload.get('comment')
tags_value = payload.get('tags')
tag_strings: list[str] = []
artists_from_tags: list[str] = []
albums_from_tags: list[str] = []
genres_from_tags: list[str] = []
if isinstance(tags_value, list):
for raw_tag in tags_value:
if raw_tag is None:
continue
if not isinstance(raw_tag, str):
raw_tag = str(raw_tag)
tag = raw_tag.strip()
if not tag:
continue
tag_strings.append(tag)
namespace, sep, value = tag.partition(':')
if sep and value:
ns = namespace.strip().lower()
value = value.strip()
if ns in {'artist', 'creator', 'author', 'performer'}:
artists_from_tags.append(value)
elif ns in {'album', 'series', 'collection', 'group'}:
albums_from_tags.append(value)
elif ns in {'genre', 'rating'}:
genres_from_tags.append(value)
elif ns in {'comment', 'description'} and not comment:
comment = value
elif ns in {'year', 'date'} and not payload.get('year'):
set_field('date', value, 20)
else:
genres_from_tags.append(tag)
if 'artist' not in metadata and artists_from_tags:
set_field('artist', ', '.join(unique_preserve_order(artists_from_tags)[:3]), 512)
if 'album' not in metadata and albums_from_tags:
set_field('album', unique_preserve_order(albums_from_tags)[0], 512)
if genres_from_tags:
set_field('genre', ', '.join(unique_preserve_order(genres_from_tags)[:5]), 256)
if tag_strings:
joined_tags = ', '.join(tag_strings[:50])
set_field('keywords', joined_tags, 2000)
if not comment:
comment = joined_tags
if comment:
set_field('comment', comment, 2000)
set_field('description', comment, 2000)
return metadata
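# Illustrative sketch: how namespaced Hydrus tags are folded into ffmpeg metadata
# fields by prepare_ffmpeg_metadata(). The payload below is hypothetical example data.
def _example_prepare_ffmpeg_metadata() -> None:
    payload = {
        "title": "Some Track",
        "tags": ["artist:Example Artist", "album:Example Album", "genre:ambient"],
    }
    metadata = prepare_ffmpeg_metadata(payload)
    # Expected keys: title, artist, album, genre, keywords, comment, description
    # (comment/description fall back to the joined tag list when no comment is given).
    print(metadata)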
def apply_mutagen_metadata(path: Path, metadata: dict[str, str], fmt: str) -> None:
if fmt != 'audio':
return
if not metadata:
return
if mutagen is None:
return
try:
audio = mutagen.File(path, easy=True) # type: ignore[attr-defined]
except Exception as exc: # pragma: no cover - best effort only
log(f"mutagen load failed: {exc}", file=sys.stderr)
return
if audio is None:
return
field_map = {
'title': 'title',
'artist': 'artist',
'album': 'album',
'genre': 'genre',
'comment': 'comment',
'description': 'comment',
'date': 'date',
}
changed = False
for source_key, target_key in field_map.items():
value = metadata.get(source_key)
if not value:
continue
try:
audio[target_key] = [value]
changed = True
except Exception: # pragma: no cover - best effort only
continue
if not changed:
return
try:
audio.save()
except Exception as exc: # pragma: no cover - best effort only
log(f"mutagen save failed: {exc}", file=sys.stderr)
def build_ffmpeg_command(ffmpeg_path: str, input_path: Path, output_path: Path, fmt: str, max_width: int, metadata: Optional[dict[str, str]] = None) -> list[str]:
cmd = [ffmpeg_path, '-y', '-i', str(input_path)]
if fmt in {'mp4', 'webm'} and max_width and max_width > 0:
cmd.extend(['-vf', f"scale='min({max_width},iw)':-2"])
if metadata:
for key, value in metadata.items():
cmd.extend(['-metadata', f'{key}={value}'])
# Video formats
if fmt == 'mp4':
cmd.extend([
'-c:v', 'libx265',
'-preset', 'medium',
'-crf', '26',
'-tag:v', 'hvc1',
'-pix_fmt', 'yuv420p',
'-c:a', 'aac',
'-b:a', '192k',
'-movflags', '+faststart',
])
elif fmt == 'webm':
cmd.extend([
'-c:v', 'libvpx-vp9',
'-b:v', '0',
'-crf', '32',
'-c:a', 'libopus',
'-b:a', '160k',
])
cmd.extend(['-f', 'webm'])
# Audio formats
elif fmt == 'mp3':
cmd.extend([
'-vn',
'-c:a', 'libmp3lame',
'-b:a', '192k',
])
cmd.extend(['-f', 'mp3'])
elif fmt == 'flac':
cmd.extend([
'-vn',
'-c:a', 'flac',
])
cmd.extend(['-f', 'flac'])
elif fmt == 'wav':
cmd.extend([
'-vn',
'-c:a', 'pcm_s16le',
])
cmd.extend(['-f', 'wav'])
elif fmt == 'aac':
cmd.extend([
'-vn',
'-c:a', 'aac',
'-b:a', '192k',
])
cmd.extend(['-f', 'adts'])
elif fmt == 'm4a':
cmd.extend([
'-vn',
'-c:a', 'aac',
'-b:a', '192k',
])
cmd.extend(['-f', 'ipod'])
elif fmt == 'ogg':
cmd.extend([
'-vn',
'-c:a', 'libvorbis',
'-b:a', '192k',
])
cmd.extend(['-f', 'ogg'])
elif fmt == 'opus':
cmd.extend([
'-vn',
'-c:a', 'libopus',
'-b:a', '192k',
])
cmd.extend(['-f', 'opus'])
elif fmt == 'audio':
# Legacy format name for mp3
cmd.extend([
'-vn',
'-c:a', 'libmp3lame',
'-b:a', '192k',
])
cmd.extend(['-f', 'mp3'])
elif fmt != 'copy':
raise ValueError(f'Unsupported format: {fmt}')
cmd.append(str(output_path))
return cmd
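# Illustrative sketch: the argv list build_ffmpeg_command() produces for an mp4
# transcode with a width cap and one metadata field. Paths here are hypothetical.
def _example_build_ffmpeg_command() -> None:
    cmd = build_ffmpeg_command(
        "ffmpeg",
        Path("input.hydrus"),
        Path("output.mp4"),
        fmt="mp4",
        max_width=1280,
        metadata={"title": "Example"},
    )
    # Roughly: ffmpeg -y -i input.hydrus -vf scale='min(1280,iw)':-2
    #          -metadata title=Example -c:v libx265 ... -movflags +faststart output.mp4
    print(" ".join(cmd))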
def hydrus_export(args, _parser) -> int:
output_path: Path = args.output
original_suffix = output_path.suffix
target_dir = output_path.parent
metadata_payload: Optional[dict[str, Any]] = None
metadata_raw = getattr(args, 'metadata_json', None)
if metadata_raw:
try:
parsed = json.loads(metadata_raw)
except json.JSONDecodeError as exc:
log(f"Invalid metadata JSON: {exc}", file=sys.stderr)
return 1
if isinstance(parsed, dict):
metadata_payload = parsed
else:
log('[downlow.py] Metadata JSON must decode to an object', file=sys.stderr)
return 1
ffmpeg_metadata = prepare_ffmpeg_metadata(metadata_payload)
def _normalise_ext(value: Optional[str]) -> Optional[str]:
if not value:
return None
cleaned = value.strip()
if not cleaned:
return None
if not cleaned.startswith('.'): # tolerate inputs like "mp4"
cleaned = '.' + cleaned.lstrip('.')
return cleaned
def _extension_from_mime(mime: Optional[str]) -> Optional[str]:
if not mime:
return None
mime_map = {
# Images / bitmaps
'image/jpeg': '.jpg',
'image/jpg': '.jpg',
'image/png': '.png',
'image/gif': '.gif',
'image/webp': '.webp',
'image/avif': '.avif',
'image/jxl': '.jxl', # JPEG XL
'image/bmp': '.bmp',
'image/heic': '.heic',
'image/heif': '.heif',
'image/x-icon': '.ico',
'image/vnd.microsoft.icon': '.ico',
'image/qoi': '.qoi', # Quite OK Image
'image/tiff': '.tiff',
'image/svg+xml': '.svg',
'image/vnd.adobe.photoshop': '.psd',
# Animation / sequence variants
'image/apng': '.apng',
'image/avif-sequence': '.avifs',
'image/heic-sequence': '.heics',
'image/heif-sequence': '.heifs',
# Video
'video/mp4': '.mp4',
'video/webm': '.webm',
'video/quicktime': '.mov',
'video/ogg': '.ogv',
'video/mpeg': '.mpeg',
'video/x-msvideo': '.avi',
'video/x-flv': '.flv',
'video/x-matroska': '.mkv',
'video/x-ms-wmv': '.wmv',
'video/vnd.rn-realvideo': '.rv',
# Audio
'audio/mpeg': '.mp3',
'audio/mp4': '.m4a',
'audio/ogg': '.ogg',
'audio/flac': '.flac',
'audio/wav': '.wav',
'audio/x-wav': '.wav',
'audio/x-ms-wma': '.wma',
'audio/x-tta': '.tta',
'audio/vnd.wave': '.wav',
'audio/x-wavpack': '.wv',
# Documents / office
'application/pdf': '.pdf',
'application/epub+zip': '.epub',
'application/vnd.djvu': '.djvu',
'application/rtf': '.rtf',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': '.xlsx',
'application/vnd.openxmlformats-officedocument.presentationml.presentation': '.pptx',
'application/msword': '.doc',
'application/vnd.ms-excel': '.xls',
'application/vnd.ms-powerpoint': '.ppt',
# Archive / comicbook / zip-like
'application/zip': '.zip',
'application/x-7z-compressed': '.7z',
'application/x-rar-compressed': '.rar',
'application/gzip': '.gz',
'application/x-tar': '.tar',
            'application/x-cbz': '.cbz',  # typically just a ZIP of images; not an official MIME type, included here for mapping
# App / project / other
'application/clip': '.clip', # Clip Studio
'application/x-krita': '.kra',
'application/x-procreate': '.procreate',
'application/x-shockwave-flash': '.swf',
}
return mime_map.get(mime.lower())
def _extract_hash(file_url: str) -> Optional[str]:
match = re.search(r'[?&]hash=([0-9a-fA-F]+)', file_url)
return match.group(1) if match else None
# Ensure output and temp directories exist using global helper
for dir_path in [target_dir, Path(args.tmp_dir) if args.tmp_dir else target_dir]:
try:
ensure_directory(dir_path)
except RuntimeError as exc:
log(f"{exc}", file=sys.stderr)
return 1
source_suffix = _normalise_ext(getattr(args, 'source_ext', None))
if source_suffix and source_suffix.lower() == '.bin':
source_suffix = None
if source_suffix is None:
hydrus_url = getattr(args, 'hydrus_url', None)
if not hydrus_url:
try:
from config import load_config, get_hydrus_url
hydrus_url = get_hydrus_url(load_config())
except Exception as exc:
hydrus_url = None
if os.environ.get('DOWNLOW_DEBUG'):
log(f"hydrus-export could not load Hydrus URL: {exc}", file=sys.stderr)
if hydrus_url:
try:
setattr(args, 'hydrus_url', hydrus_url)
except Exception:
pass
resolved_suffix: Optional[str] = None
file_hash = getattr(args, 'file_hash', None) or _extract_hash(args.file_url)
if hydrus_url and file_hash:
try:
client = HydrusClient(base_url=hydrus_url, access_key=args.access_key, timeout=args.timeout)
meta_response = client.fetch_file_metadata(hashes=[file_hash], include_mime=True)
entries = meta_response.get('metadata') if isinstance(meta_response, dict) else None
if isinstance(entries, list) and entries:
entry = entries[0]
ext_value = _normalise_ext(entry.get('ext') if isinstance(entry, dict) else None)
if ext_value:
resolved_suffix = ext_value
else:
mime_value = entry.get('mime') if isinstance(entry, dict) else None
resolved_suffix = _extension_from_mime(mime_value)
except Exception as exc: # pragma: no cover - defensive
if os.environ.get('DOWNLOW_DEBUG'):
log(f"hydrus metadata fetch failed: {exc}", file=sys.stderr)
if not resolved_suffix:
fallback_suffix = _normalise_ext(original_suffix)
if fallback_suffix and fallback_suffix.lower() == '.bin':
fallback_suffix = None
resolved_suffix = fallback_suffix or '.hydrus'
source_suffix = resolved_suffix
suffix = source_suffix or '.hydrus'
if suffix and output_path.suffix.lower() in {'', '.bin'}:
if output_path.suffix.lower() != suffix.lower():
output_path = output_path.with_suffix(suffix)
target_dir = output_path.parent
# Determine temp directory (prefer provided tmp_dir, fallback to output location)
temp_dir = Path(getattr(args, 'tmp_dir', None) or target_dir)
try:
ensure_directory(temp_dir)
except RuntimeError:
temp_dir = target_dir
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix, dir=str(temp_dir))
temp_path = Path(temp_file.name)
temp_file.close()
downloaded_bytes = 0
headers = {
'Hydrus-Client-API-Access-Key': args.access_key,
}
try:
downloaded_bytes = download_hydrus_file(args.file_url, headers, temp_path, args.timeout)
if os.environ.get('DOWNLOW_DEBUG'):
log(f"hydrus-export downloaded {downloaded_bytes} bytes", file=sys.stderr)
except httpx.RequestError as exc:
if temp_path.exists():
temp_path.unlink()
log(f"hydrus-export download failed: {exc}", file=sys.stderr)
return 1
except Exception as exc: # pragma: no cover - unexpected
if temp_path.exists():
temp_path.unlink()
log(f"hydrus-export error: {exc}", file=sys.stderr)
return 1
ffmpeg_log: Optional[str] = None
converted_tmp: Optional[Path] = None
try:
final_target = unique_path(output_path)
if args.format == 'copy':
shutil.move(str(temp_path), str(final_target))
result_path = final_target
else:
ffmpeg_path = shutil.which('ffmpeg')
if not ffmpeg_path:
raise RuntimeError('ffmpeg executable not found in PATH')
converted_tmp = final_target.with_suffix(final_target.suffix + '.part')
if converted_tmp.exists():
converted_tmp.unlink()
max_width = args.max_width if args.max_width and args.max_width > 0 else 0
cmd = build_ffmpeg_command(ffmpeg_path, temp_path, converted_tmp, args.format, max_width, metadata=ffmpeg_metadata if ffmpeg_metadata else None)
if os.environ.get('DOWNLOW_DEBUG'):
log(f"ffmpeg command: {' '.join(cmd)}", file=sys.stderr)
completed = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
text=True,
)
ffmpeg_log = (completed.stderr or '').strip()
if completed.returncode != 0:
error_details = ffmpeg_log or (completed.stdout or '').strip()
raise RuntimeError(
f'ffmpeg failed with exit code {completed.returncode}'
+ (f': {error_details}' if error_details else '')
)
shutil.move(str(converted_tmp), str(final_target))
result_path = final_target
apply_mutagen_metadata(result_path, ffmpeg_metadata, args.format)
result_size = result_path.stat().st_size if result_path.exists() else None
payload: dict[str, object] = {'output': str(result_path)}
if downloaded_bytes:
payload['source_bytes'] = downloaded_bytes
if result_size is not None:
payload['size_bytes'] = result_size
if metadata_payload:
payload['metadata_keys'] = sorted(ffmpeg_metadata.keys()) if ffmpeg_metadata else []
log(json.dumps(payload, ensure_ascii=False))
if ffmpeg_log:
log(ffmpeg_log, file=sys.stderr)
return 0
except Exception as exc:
log(f"hydrus-export failed: {exc}", file=sys.stderr)
return 1
finally:
if temp_path.exists():
try:
temp_path.unlink()
except OSError:
pass
if converted_tmp and converted_tmp.exists():
try:
converted_tmp.unlink()
except OSError:
pass
# ============================================================================
# Hydrus Wrapper Functions - Utilities for client initialization and config
# ============================================================================
# This section consolidates functions formerly in hydrus_wrapper.py
# Provides: supported filetypes, client initialization, caching, service resolution
# Official Hydrus supported filetypes
# Source: https://hydrusnetwork.github.io/hydrus/filetypes.html
SUPPORTED_FILETYPES = {
# Images
'image': {
'.jpeg': 'image/jpeg',
'.jpg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.webp': 'image/webp',
'.avif': 'image/avif',
'.jxl': 'image/jxl',
'.bmp': 'image/bmp',
'.heic': 'image/heic',
'.heif': 'image/heif',
'.ico': 'image/x-icon',
'.qoi': 'image/qoi',
'.tiff': 'image/tiff',
},
# Animated Images
'animation': {
'.apng': 'image/apng',
'.avifs': 'image/avif-sequence',
'.heics': 'image/heic-sequence',
'.heifs': 'image/heif-sequence',
},
# Video
'video': {
'.mp4': 'video/mp4',
'.webm': 'video/webm',
'.mkv': 'video/x-matroska',
'.avi': 'video/x-msvideo',
'.flv': 'video/x-flv',
'.mov': 'video/quicktime',
'.mpeg': 'video/mpeg',
'.ogv': 'video/ogg',
'.rm': 'video/vnd.rn-realvideo',
'.wmv': 'video/x-ms-wmv',
},
# Audio
'audio': {
'.mp3': 'audio/mp3',
'.ogg': 'audio/ogg',
'.flac': 'audio/flac',
'.m4a': 'audio/mp4',
'.mka': 'audio/x-matroska',
'.mkv': 'audio/x-matroska',
'.mp4': 'audio/mp4',
'.ra': 'audio/vnd.rn-realaudio',
'.tta': 'audio/x-tta',
'.wav': 'audio/x-wav',
'.wv': 'audio/wavpack',
'.wma': 'audio/x-ms-wma',
},
# Applications & Documents
'application': {
'.swf': 'application/x-shockwave-flash',
'.pdf': 'application/pdf',
'.epub': 'application/epub+zip',
'.djvu': 'image/vnd.djvu',
'.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
'.doc': 'application/msword',
'.xls': 'application/vnd.ms-excel',
'.ppt': 'application/vnd.ms-powerpoint',
'.rtf': 'application/rtf',
},
# Image Project Files
'project': {
        '.clip': 'application/clip',
        '.kra': 'application/x-krita',
        '.procreate': 'application/x-procreate',
        '.psd': 'image/vnd.adobe.photoshop',
        '.sai2': 'application/sai2',
'.svg': 'image/svg+xml',
'.xcf': 'application/x-xcf',
},
# Archives
'archive': {
'.cbz': 'application/vnd.comicbook+zip',
'.7z': 'application/x-7z-compressed',
'.gz': 'application/gzip',
'.rar': 'application/vnd.rar',
'.zip': 'application/zip',
},
}
# Flatten to get all supported extensions
ALL_SUPPORTED_EXTENSIONS = set()
for category_extensions in SUPPORTED_FILETYPES.values():
ALL_SUPPORTED_EXTENSIONS.update(category_extensions.keys())
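# Illustrative sketch: checking a local file's extension against the flattened
# supported-filetype table before attempting an import. The path is hypothetical.
def _example_extension_check() -> None:
    candidate = Path("downloads/example.webm")
    if candidate.suffix.lower() in ALL_SUPPORTED_EXTENSIONS:
        print(f"{candidate.name} looks importable into Hydrus")
    else:
        print(f"{candidate.suffix or '<no extension>'} is not in the supported-filetype table")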
# Global Hydrus client cache to reuse session keys
_hydrus_client_cache: dict[str, Any] = {}
# Cache Hydrus availability across the session
_HYDRUS_AVAILABLE: Optional[bool] = None
_HYDRUS_UNAVAILABLE_REASON: Optional[str] = None
def reset_cache() -> None:
"""Reset the availability cache (useful for testing)."""
global _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON
_HYDRUS_AVAILABLE = None
_HYDRUS_UNAVAILABLE_REASON = None
def is_available(config: dict[str, Any], use_cache: bool = True) -> tuple[bool, Optional[str]]:
"""Check if Hydrus is available and accessible.
Performs a lightweight probe to verify:
- Hydrus URL is configured
- Hydrus client library is available
- Can connect to Hydrus and retrieve services
Results are cached per session unless use_cache=False.
Args:
config: Configuration dict with Hydrus settings
use_cache: If True, use cached result from previous probe
Returns:
Tuple of (is_available: bool, reason: Optional[str])
reason is None if available, or an error message if not
"""
global _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON
if use_cache and _HYDRUS_AVAILABLE is not None:
return _HYDRUS_AVAILABLE, _HYDRUS_UNAVAILABLE_REASON
# Use new config helpers first, fallback to old method
from config import get_hydrus_url, get_hydrus_access_key
url = (get_hydrus_url(config, "home") or "").strip()
if not url:
reason = "Hydrus URL not configured (check config.json HydrusNetwork.home.url)"
_HYDRUS_AVAILABLE = False
_HYDRUS_UNAVAILABLE_REASON = reason
return False, reason
access_key = get_hydrus_access_key(config, "home") or ""
timeout_raw = config.get("HydrusNetwork_Request_Timeout")
try:
timeout = float(timeout_raw) if timeout_raw is not None else 10.0
except (TypeError, ValueError):
timeout = 10.0
try:
client = HydrusClient(url, access_key, timeout)
# Lightweight probe: get services
# Temporarily suppress error logging for health checks (expected to fail if Hydrus unavailable)
hydrus_logger = logging.getLogger("helper.hydrus")
original_level = hydrus_logger.level
hydrus_logger.setLevel(logging.CRITICAL) # Suppress errors/warnings
try:
_ = client.get_services()
_HYDRUS_AVAILABLE = True
_HYDRUS_UNAVAILABLE_REASON = None
return True, None
finally:
hydrus_logger.setLevel(original_level)
except Exception as exc:
reason = str(exc)
_HYDRUS_AVAILABLE = False
_HYDRUS_UNAVAILABLE_REASON = reason
return False, reason
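# Illustrative sketch: probing Hydrus before doing real work. The config dict is
# expected to carry the HydrusNetwork settings referenced above; values come from the caller.
def _example_availability_probe(config: dict[str, Any]) -> None:
    available, reason = is_available(config)
    if available:
        print("Hydrus is reachable")
    else:
        print(f"Hydrus unavailable: {reason}")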
def is_hydrus_available(config: dict[str, Any]) -> bool:
"""Check if Hydrus is available without raising.
Args:
config: Configuration dict
Returns:
True if Hydrus is available, False otherwise
"""
available, _ = is_available(config)
return available
def get_client(config: dict[str, Any]) -> HydrusClient:
"""Create and return a Hydrus client with session key authentication.
Reuses cached client instance to preserve session keys across requests.
Args:
config: Configuration dict with Hydrus settings
Returns:
HydrusClient instance (with active session key)
Raises:
RuntimeError: If Hydrus is not configured or unavailable
"""
# Check availability first - if unavailable, raise immediately
available, reason = is_available(config)
if not available:
raise RuntimeError(f"Hydrus is unavailable: {reason}")
from config import get_hydrus_url, get_hydrus_access_key
# Use new config helpers
hydrus_url = (get_hydrus_url(config, "home") or "").strip()
if not hydrus_url:
raise RuntimeError("Hydrus URL is not configured (check config.json HydrusNetwork.home.url)")
access_key = get_hydrus_access_key(config, "home") or ""
timeout_raw = config.get("HydrusNetwork_Request_Timeout")
try:
timeout = float(timeout_raw) if timeout_raw is not None else 60.0
except (TypeError, ValueError):
timeout = 60.0
# Create cache key from URL and access key
cache_key = f"{hydrus_url}#{access_key}"
# Check if we have a cached client
if cache_key in _hydrus_client_cache:
cached_client = _hydrus_client_cache[cache_key]
# If cached client has a session key, reuse it (don't re-acquire)
if hasattr(cached_client, '_session_key') and cached_client._session_key:
debug(f"Reusing cached session key for {hydrus_url}")
return cached_client
# If no session key in cache, try to get one
try:
cached_client.ensure_session_key()
return cached_client
except Exception as e:
# If verification fails, remove from cache and create new one
debug(f"Cached client invalid, creating new: {e}")
del _hydrus_client_cache[cache_key]
# Create new client
client = HydrusClient(hydrus_url, access_key, timeout)
# Acquire session key for secure authentication
try:
client.ensure_session_key()
except HydrusConnectionError:
# This should not happen since we checked availability above
debug(f"Hydrus service unavailable during client creation")
raise RuntimeError("Hydrus is unavailable") from None
except Exception as e:
# Log other exceptions but don't fail - client can still work with access_key
debug(f"Warning: Could not acquire session key: {e}")
# Cache the client
_hydrus_client_cache[cache_key] = client
return client
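# Illustrative sketch: obtaining a cached, session-keyed client and tagging a file.
# The hash and tag values are hypothetical placeholders.
def _example_get_client_and_tag(config: dict[str, Any]) -> None:
    client = get_client(config)  # raises RuntimeError if Hydrus is unavailable
    service_key = get_tag_service_key(client)
    if service_key:
        client.add_tags_by_key("<sha256 hash>", ["creator:example"], service_key)
    else:
        client.add_tags("<sha256 hash>", ["creator:example"], get_tag_service_name(config))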
def get_tag_service_name(config: dict[str, Any]) -> str:
"""Get the name of the tag service to use for tagging operations.
Currently always returns "my tags" to avoid remote service errors.
Args:
config: Configuration dict (not currently used)
Returns:
Service name string, typically "my tags"
"""
# Always use 'my tags' to avoid remote service errors
return "my tags"
def get_tag_service_key(client: HydrusClient, fallback_name: str = "my tags") -> Optional[str]:
"""Get the service key for a named tag service.
Queries the Hydrus client's services and finds the service key matching
the given name.
Args:
client: HydrusClient instance
fallback_name: Name of the service to find (e.g., "my tags")
Returns:
Service key string if found, None otherwise
"""
try:
services = client.get_services()
except Exception:
return None
if not isinstance(services, dict):
return None
# Hydrus returns services grouped by type; walk all lists and match on name
for group in services.values():
if not isinstance(group, list):
continue
for item in group:
if not isinstance(item, dict):
continue
name = str(item.get("name") or "").strip().lower()
key = item.get("service_key") or item.get("key")
if name == fallback_name.lower() and key:
return str(key)
return None
def is_request_error(exc: Exception) -> bool:
"""Check if an exception is a Hydrus request error.
Args:
exc: Exception to check
Returns:
True if this is a HydrusRequestError
"""
return isinstance(exc, HydrusRequestError)
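# Illustrative sketch: separating connection failures from Hydrus API errors when
# calling client methods. The hash below is a hypothetical placeholder.
def _example_error_handling(client: HydrusClient) -> None:
    try:
        client.fetch_file_metadata(hashes=["<sha256 hash>"])
    except HydrusConnectionError as exc:
        print(f"Hydrus not running: {exc}")
    except Exception as exc:
        if is_request_error(exc):
            print(f"Hydrus API error: {exc}")
        else:
            raise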
CHUNK_SIZE = 1024 * 1024 # 1 MiB
def download_hydrus_file(file_url: str, headers: dict[str, str], destination: Path, timeout: float) -> int:
"""Download *file_url* into *destination* returning the byte count with progress bar."""
from .progress import print_progress, print_final_progress
downloaded = 0
start_time = time.time()
last_update = start_time
# Try to get file size from headers if available
file_size = None
with HTTPClient(timeout=timeout, headers=headers) as client:
response = client.get(file_url)
response.raise_for_status()
# Try to get size from content-length header
try:
file_size = int(response.headers.get('content-length', 0))
except (ValueError, TypeError):
file_size = None
filename = destination.name
with destination.open('wb') as handle:
for chunk in response.iter_bytes(CHUNK_SIZE):
if not chunk:
break
handle.write(chunk)
downloaded += len(chunk)
# Update progress every 0.5 seconds if we know total size
if file_size:
now = time.time()
if now - last_update >= 0.5:
elapsed = now - start_time
speed = downloaded / elapsed if elapsed > 0 else 0
print_progress(filename, downloaded, file_size, speed)
last_update = now
# Print final progress line if we tracked it
if file_size:
elapsed = time.time() - start_time
print_final_progress(filename, file_size, elapsed)
return downloaded
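# Illustrative sketch: streaming a file out of Hydrus by hash using the helpers in
# this module. The client, hash, and destination are supplied by the caller.
def _example_download(client: HydrusClient, file_hash: str, destination: Path) -> int:
    headers = {"Hydrus-Client-API-Access-Key": client.access_key}
    return download_hydrus_file(client.file_url(file_hash), headers, destination, client.timeout)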