# Medios-Macina/helper/utils.py
"""General-purpose helpers used across the downlow CLI."""
from __future__ import annotations

import base64
import hashlib
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from fnmatch import fnmatch
from pathlib import Path
from typing import Any, Iterable
from urllib.parse import urlparse

import ffmpeg

import helper.utils_constant

try:
    import cbor2
except ImportError:
    cbor2 = None  # type: ignore

CHUNK_SIZE = 1024 * 1024  # 1 MiB

_format_logger = logging.getLogger(__name__)

def ensure_directory(path: Path) -> None:
    """Ensure *path* exists as a directory."""
    try:
        path.mkdir(parents=True, exist_ok=True)
    except OSError as exc:  # pragma: no cover - surfaced to caller
        raise RuntimeError(f"Failed to create directory {path}: {exc}") from exc


def unique_path(path: Path) -> Path:
    """Return a unique path by appending " (n)" to the stem if needed."""
    if not path.exists():
        return path
    stem = path.stem
    suffix = path.suffix
    parent = path.parent
    counter = 1
    while True:
        candidate = parent / f"{stem} ({counter}){suffix}"
        if not candidate.exists():
            return candidate
        counter += 1
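
# A minimal usage sketch for unique_path (hypothetical filenames; output shown
# for a POSIX directory that already contains "report.txt" and "report (1).txt"):
#
#     >>> unique_path(Path("report.txt"))
#     PosixPath('report (2).txt')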

def sanitize_metadata_value(value: Any) -> str | None:
    """Coerce *value* to a single-line string, or None if nothing remains."""
    if value is None:
        return None
    if not isinstance(value, str):
        value = str(value)
    value = value.replace('\x00', ' ').replace('\r', ' ').replace('\n', ' ').strip()
    if not value:
        return None
    return value


def unique_preserve_order(values: Iterable[str]) -> list[str]:
    """Deduplicate *values* while preserving first-seen order."""
    seen: set[str] = set()
    ordered: list[str] = []
    for value in values:
        if value not in seen:
            seen.add(value)
            ordered.append(value)
    return ordered

def sha256_file(file_path: Path) -> str:
    """Return the SHA-256 hex digest of *file_path*."""
    hasher = hashlib.sha256()
    with file_path.open('rb') as handle:
        for chunk in iter(lambda: handle.read(CHUNK_SIZE), b''):
            hasher.update(chunk)
    return hasher.hexdigest()
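
# Hashing reads the file in CHUNK_SIZE blocks, so memory stays flat even for
# multi-gigabyte files. A usage sketch (the digest shown is the well-known
# SHA-256 of empty input, so it is deterministic for an empty file):
#
#     >>> sha256_file(Path("empty.bin"))
#     'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'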

def create_metadata_sidecar(file_path: Path, metadata: dict) -> None:
    """Create a .metadata sidecar file with JSON metadata.

    The metadata dict should contain a title. If not present, it is derived
    from the filename, which ensures the .metadata file can be matched during
    batch import.

    Args:
        file_path: Path to the exported file
        metadata: Dictionary of metadata to save
    """
    if not metadata:
        return
    file_name = file_path.stem
    file_ext = file_path.suffix.lower()
    # Ensure metadata has a title field that matches the filename (without
    # extension) so the sidecar can be matched and imported properly during
    # batch import.
    if not metadata.get('title'):
        metadata['title'] = file_name
    metadata['hash'] = sha256_file(file_path)
    metadata['size'] = file_path.stat().st_size
    format_found = False
    for mime_type, ext_map in helper.utils_constant.mime_maps.items():
        for info in ext_map.values():
            if info.get("ext") == file_ext:
                metadata['type'] = mime_type
                format_found = True
                break
        if format_found:
            break
    else:
        metadata['type'] = 'unknown'
    # Merge probe data, but never let an absent probe field (None) clobber the
    # values computed above (notably 'size'), and don't let a probe failure on
    # a non-media file abort the sidecar write.
    try:
        probe_data = ffprobe(str(file_path))
    except Exception as exc:
        _format_logger.debug(f"ffprobe failed for {file_path}: {exc}")
        probe_data = {}
    metadata.update({k: v for k, v in probe_data.items() if v is not None})
    metadata_path = file_path.with_suffix(file_path.suffix + '.metadata')
    try:
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)
    except OSError as exc:
        raise RuntimeError(f"Failed to write metadata sidecar {metadata_path}: {exc}") from exc

def create_tags_sidecar(file_path: Path, tags: set) -> None:
    """Create a .tags sidecar file with tags (one per line).

    Args:
        file_path: Path to the exported file
        tags: Set of tag strings
    """
    if not tags:
        return
    tags_path = file_path.with_suffix(file_path.suffix + '.tags')
    try:
        with open(tags_path, 'w', encoding='utf-8') as f:
            for tag in sorted(tags):
                f.write(f"{tag}\n")
    except OSError as exc:
        raise RuntimeError(f"Failed to create tags sidecar {tags_path}: {exc}") from exc

def ffprobe(file_path: str) -> dict:
    """Probe *file_path* with ffprobe and return a flat metadata dict.

    Fields that the probe does not report are returned as None.
    """
    probe = ffmpeg.probe(file_path)
    metadata = {}
    # Format-level info
    fmt = probe.get("format", {})
    metadata["duration"] = float(fmt["duration"]) if "duration" in fmt else None
    metadata["size"] = int(fmt["size"]) if "size" in fmt else None
    metadata["format_name"] = fmt.get("format_name")
    # Stream-level info
    for stream in probe.get("streams", []):
        codec_type = stream.get("codec_type")
        if codec_type == "audio":
            metadata["audio_codec"] = stream.get("codec_name")
            metadata["bitrate"] = int(stream["bit_rate"]) if "bit_rate" in stream else None
            metadata["samplerate"] = int(stream["sample_rate"]) if "sample_rate" in stream else None
            metadata["channels"] = int(stream["channels"]) if "channels" in stream else None
        elif codec_type == "video":
            metadata["video_codec"] = stream.get("codec_name")
            metadata["width"] = int(stream["width"]) if "width" in stream else None
            metadata["height"] = int(stream["height"]) if "height" in stream else None
        elif codec_type == "image":
            metadata["image_codec"] = stream.get("codec_name")
            metadata["width"] = int(stream["width"]) if "width" in stream else None
            metadata["height"] = int(stream["height"]) if "height" in stream else None
    return metadata
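
# A sketch of the shape ffprobe returns for a typical audio file (field values
# here are illustrative, not real probe output):
#
#     >>> ffprobe("song.flac")
#     {'duration': 213.4, 'size': 24816382, 'format_name': 'flac',
#      'audio_codec': 'flac', 'bitrate': 930000, 'samplerate': 44100,
#      'channels': 2}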

# ============================================================================
# CBOR Utilities - Consolidated from cbor.py
# ============================================================================
"""CBOR utilities backed by the `cbor2` library."""


def decode_cbor(data: bytes) -> Any:
    """Decode *data* from CBOR into native Python objects."""
    if not data:
        return None
    if cbor2 is None:
        raise ImportError("cbor2 library is required for CBOR decoding")
    return cbor2.loads(data)


def jsonify(value: Any) -> Any:
    """Convert *value* into a JSON-friendly structure."""
    if isinstance(value, dict):
        return {str(key): jsonify(val) for key, val in value.items()}
    if isinstance(value, list):
        return [jsonify(item) for item in value]
    if isinstance(value, bytes):
        return {"__bytes__": base64.b64encode(value).decode("ascii")}
    return value
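
# Round-trip sketch: CBOR bytes decoded with decode_cbor, then made
# JSON-serialisable with jsonify (requires the optional cbor2 dependency):
#
#     >>> payload = cbor2.dumps({"name": "clip", "raw": b"\x00\x01"})
#     >>> jsonify(decode_cbor(payload))
#     {'name': 'clip', 'raw': {'__bytes__': 'AAE='}}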

# ============================================================================
# Format Utilities - Consolidated from format_utils.py
# ============================================================================
"""Formatting utilities for displaying metadata consistently across the application."""


def format_bytes(bytes_value) -> str:
    """Format bytes as a human-readable string (e.g., '1.5 MB', '250.0 KB').

    Args:
        bytes_value: Size in bytes (int or float)

    Returns:
        Formatted string like '1.5 MB', or str(bytes_value) for non-numeric
        input
    """
    if bytes_value is None:
        return "0 B"
    # Check the type before comparing, so non-numeric input falls through to
    # str() instead of raising a TypeError on the <= comparison.
    if not isinstance(bytes_value, (int, float)):
        return str(bytes_value)
    if bytes_value <= 0:
        return "0 B"
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if bytes_value < 1024:
            if unit == "B":
                return f"{int(bytes_value)} {unit}"
            return f"{bytes_value:.1f} {unit}"
        bytes_value /= 1024
    return f"{bytes_value:.1f} PB"

def format_duration(seconds) -> str:
    """Format a duration in seconds as a clock-style string.

    Args:
        seconds: Duration in seconds (int, float, or numeric string)

    Returns:
        Formatted string like '1:23:45', '5:30', or '45s'
    """
    if seconds is None or seconds == '':
        return "N/A"
    if isinstance(seconds, str):
        try:
            seconds = float(seconds)
        except ValueError:
            return str(seconds)
    if not isinstance(seconds, (int, float)):
        return str(seconds)
    total_seconds = int(seconds)
    if total_seconds < 0:
        return "N/A"
    hours = total_seconds // 3600
    minutes = (total_seconds % 3600) // 60
    secs = total_seconds % 60
    if hours > 0:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    elif minutes > 0:
        return f"{minutes}:{secs:02d}"
    else:
        return f"{secs}s"

def format_timestamp(timestamp_str) -> str:
    """Format an ISO timestamp as a readable string.

    Args:
        timestamp_str: ISO format timestamp string or None

    Returns:
        Formatted string like "2025-10-28 19:36:01", or the original value if
        parsing fails
    """
    if not timestamp_str:
        return "N/A"
    try:
        if isinstance(timestamp_str, str):
            if 'T' in timestamp_str:
                # Full ISO timestamps, including a trailing 'Z' for UTC
                dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
            else:
                # Date-only or space-separated ISO values
                dt = datetime.fromisoformat(timestamp_str)
            return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        _format_logger.debug(f"Could not parse timestamp '{timestamp_str}': {e}")
    return str(timestamp_str)
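
# A deterministic example, matching the docstring:
#
#     >>> format_timestamp('2025-10-28T19:36:01Z')
#     '2025-10-28 19:36:01'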

def format_metadata_value(key: str, value) -> str:
    """Format a metadata value based on its key for display.

    This is the central formatting rule for all metadata display.

    Args:
        key: Metadata field name
        value: Value to format

    Returns:
        Formatted string for display
    """
    if value is None or value == '':
        return "N/A"
    # Apply field-specific formatting
    if key in ('size', 'file_size'):
        return format_bytes(value)
    elif key in ('duration', 'length'):
        return format_duration(value)
    elif key in ('time_modified', 'time_imported', 'created_at', 'updated_at', 'indexed_at', 'timestamp'):
        return format_timestamp(value)
    else:
        return str(value)
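
# Dispatch sketch: the key decides which formatter runs.
#
#     >>> format_metadata_value('size', 1536)
#     '1.5 KB'
#     >>> format_metadata_value('duration', 330)
#     '5:30'
#     >>> format_metadata_value('title', 'My Clip')
#     'My Clip'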

# ============================================================================
# Link Utilities - Consolidated from link_utils.py
# ============================================================================
"""Link utilities - Extract and process URLs from various sources."""


def extract_link_from_args(args: Iterable[str]) -> str | None:
    """Extract an HTTP/HTTPS URL from command arguments.

    Args:
        args: Command arguments

    Returns:
        URL string if found, None otherwise
    """
    args_list = args if isinstance(args, (list, tuple)) else list(args)
    if not args_list:
        return None
    potential_link = str(args_list[0])
    if potential_link.startswith(('http://', 'https://')):
        return potential_link
    return None

def extract_link_from_result(result: Any) -> Any | None:
    """Extract a URL from a result object (dict or object with attributes).

    Args:
        result: Result object from pipeline (dict or object)

    Returns:
        URL string if found, None otherwise
    """
    if isinstance(result, dict):
        return result.get('url') or result.get('link') or result.get('href')
    return (
        getattr(result, 'url', None) or
        getattr(result, 'link', None) or
        getattr(result, 'href', None)
    )


def extract_link(result: Any, args: Iterable[str]) -> Any | None:
    """Extract a link from args or result (args take priority).

    Args:
        result: Pipeline result object
        args: Command arguments

    Returns:
        URL string if found, None otherwise
    """
    # Try args first
    link = extract_link_from_args(args)
    if link:
        return link
    # Fall back to the result object
    return extract_link_from_result(result)
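
# Priority sketch: an explicit URL argument wins over anything on the result.
#
#     >>> extract_link({'url': 'https://example.com/b'}, ['https://example.com/a'])
#     'https://example.com/a'
#     >>> extract_link({'url': 'https://example.com/b'}, ['not-a-url'])
#     'https://example.com/b'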

def get_api_key(config: dict[str, Any], service: str, key_path: str) -> str | None:
    """Get an API key from config with fallback support.

    Args:
        config: Configuration dictionary
        service: Service name (reserved for logging)
        key_path: Dot-notation path to the key (e.g., "Debrid.All-debrid")

    Returns:
        API key if found and not empty, None otherwise
    """
    try:
        parts = key_path.split('.')
        value = config
        for part in parts:
            if isinstance(value, dict):
                value = value.get(part)
            else:
                return None
        if isinstance(value, str):
            return value.strip() or None
        return None
    except Exception:
        return None
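
# Lookup sketch: the dot-notation path walks nested dicts, and the value is
# stripped before being returned.
#
#     >>> cfg = {"Debrid": {"All-debrid": "  abc123  "}}
#     >>> get_api_key(cfg, "alldebrid", "Debrid.All-debrid")
#     'abc123'
#     >>> get_api_key(cfg, "alldebrid", "Debrid.Missing") is None
#     True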

def add_direct_link_to_result(result: Any, direct_link: str, original_link: str) -> None:
    """Add direct-link information to a result object.

    Args:
        result: Result object to modify (dict or object)
        direct_link: The unlocked/direct URL
        original_link: The original restricted URL
    """
    if isinstance(result, dict):
        result['direct_link'] = direct_link
        result['original_link'] = original_link
    else:
        setattr(result, 'direct_link', direct_link)
        setattr(result, 'original_link', original_link)

# ============================================================================
# URL Policy Resolution - Consolidated from url_parser.py
# ============================================================================
"""URL policy resolution for downlow workflows."""


@dataclass(slots=True)
class UrlPolicy:
    """Describe how a URL should be handled by download and screenshot flows."""

    skip_download: bool = False
    skip_metadata: bool = False
    force_screenshot: bool = False
    extra_tags: list[str] = field(default_factory=list)

    def apply_tags(self, sources: Iterable[str]) -> list[str]:
        """Combine the policy's extra tags with *sources*, stripping blanks."""
        tags = [tag.strip() for tag in self.extra_tags if tag and tag.strip()]
        for value in sources:
            text = str(value).strip()
            if text:
                tags.append(text)
        return tags

def _normalise_rule(rule: dict[str, Any]) -> dict[str, Any] | None:
    """Validate a raw policy rule, returning a normalised copy or None."""
    pattern = str(rule.get("pattern") or rule.get("host") or "").strip()
    if not pattern:
        return None
    skip_download = bool(rule.get("skip_download"))
    skip_metadata = bool(rule.get("skip_metadata"))
    force_screenshot = bool(rule.get("force_screenshot"))
    extra_tags_raw = rule.get("extra_tags")
    if isinstance(extra_tags_raw, str):
        extra_tags = [part.strip() for part in extra_tags_raw.split(",") if part.strip()]
    elif isinstance(extra_tags_raw, (list, tuple, set)):
        extra_tags = [str(item).strip() for item in extra_tags_raw if str(item).strip()]
    else:
        extra_tags = []
    return {
        "pattern": pattern,
        "skip_download": skip_download,
        "skip_metadata": skip_metadata,
        "force_screenshot": force_screenshot,
        "extra_tags": extra_tags,
    }

def resolve_url_policy(config: dict[str, Any], url: str) -> UrlPolicy:
    """Merge every configured rule that matches *url* into a single UrlPolicy."""
    policies_raw = config.get("url_policies")
    if not policies_raw or not isinstance(policies_raw, list):
        return UrlPolicy()
    parsed = urlparse(url)
    subject = f"{parsed.netloc}{parsed.path}"
    host = parsed.netloc
    resolved = UrlPolicy()
    for rule_raw in policies_raw:
        if not isinstance(rule_raw, dict):
            continue
        rule = _normalise_rule(rule_raw)
        if rule is None:
            continue
        pattern = rule["pattern"]
        # A rule matches on the bare host or on host+path.
        if not (fnmatch(host, pattern) or fnmatch(subject, pattern)):
            continue
        if rule["skip_download"]:
            resolved.skip_download = True
        if rule["skip_metadata"]:
            resolved.skip_metadata = True
        if rule["force_screenshot"]:
            resolved.force_screenshot = True
        for tag in rule["extra_tags"]:
            if tag not in resolved.extra_tags:
                resolved.extra_tags.append(tag)
    return resolved
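
# End-to-end sketch: a hypothetical config with one glob rule, resolved
# against a matching URL.
#
#     >>> cfg = {"url_policies": [{"pattern": "*.example.com",
#     ...                          "skip_download": True,
#     ...                          "extra_tags": "mirror, archived"}]}
#     >>> policy = resolve_url_policy(cfg, "https://cdn.example.com/file.bin")
#     >>> policy.skip_download, policy.extra_tags
#     (True, ['mirror', 'archived'])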