Files
Medios-Macina/SYS/utils.py

634 lines
19 KiB
Python
Raw Permalink Normal View History

2025-11-25 20:09:33 -08:00
"""General-purpose helpers used across the downlow CLI."""
2025-12-29 17:05:03 -08:00
2025-11-25 20:09:33 -08:00
from __future__ import annotations
import json
import hashlib
2025-12-23 16:36:39 -08:00
import subprocess
import shutil
2025-12-29 17:05:03 -08:00
2025-12-23 16:36:39 -08:00
try:
import ffmpeg # type: ignore
except Exception:
ffmpeg = None # type: ignore
2026-01-11 00:52:54 -08:00
import os
2025-11-25 20:09:33 -08:00
import base64
import logging
import time
from pathlib import Path
2026-01-11 00:52:54 -08:00
from typing import Any, Iterable, Optional
2025-11-25 20:09:33 -08:00
from datetime import datetime
from dataclasses import dataclass, field
from fnmatch import fnmatch
from urllib.parse import urlparse
2025-12-11 19:04:02 -08:00
import SYS.utils_constant
2025-11-25 20:09:33 -08:00
try:
import cbor2
except ImportError:
cbor2 = None # type: ignore
CHUNK_SIZE = 1024 * 1024 # 1 MiB
_format_logger = logging.getLogger(__name__)
2025-12-29 17:05:03 -08:00
2026-01-11 00:52:54 -08:00
def expand_path(p: str | Path | None) -> Path | None:
    """Expand ``~`` and environment variables in a path.

    Args:
        p: A path-like value (str or Path), or None.

    Returns:
        The expanded Path, or None when *p* is None.
    """
    if p is None:
        return None
    s = str(p)
    # Courtesy check for $home -> $HOME on POSIX-like systems (where env
    # vars are case-sensitive): if the config literally used "$home" and
    # HOME is defined, rewrite it so expandvars can resolve it.
    # BUG FIX: previously tested `'$HOME' not in os.environ`, which is
    # always true (environ keys never contain '$'); the intent was to
    # check whether the *string* already spells "$HOME".
    if os.name != 'nt' and '$home' in s and '$HOME' not in s:
        if 'HOME' in os.environ:
            s = s.replace('$home', '$HOME')
    expanded = os.path.expandvars(s)
    return Path(expanded).expanduser()
2025-11-25 20:09:33 -08:00
def ensure_directory(path: Path) -> None:
    """Make sure *path* exists as a directory, creating parents as needed.

    Raises:
        RuntimeError: When the directory cannot be created.
    """
    try:
        path.mkdir(parents=True, exist_ok=True)
    except OSError as exc:  # pragma: no cover - surfaced to caller
        message = f"Failed to create directory {path}: {exc}"
        raise RuntimeError(message) from exc
2025-12-29 17:05:03 -08:00
2025-11-25 20:09:33 -08:00
def unique_path(path: Path) -> Path:
    """Return a collision-free variant of *path*.

    If *path* already exists, insert " (n)" before the suffix, increasing
    n until an unused name is found.
    """
    if not path.exists():
        return path
    n = 0
    candidate = path
    while candidate.exists():
        n += 1
        candidate = path.parent / f"{path.stem} ({n}){path.suffix}"
    return candidate
2025-12-29 17:05:03 -08:00
2025-11-25 20:09:33 -08:00
def sanitize_metadata_value(value: Any) -> str | None:
    """Coerce *value* to a clean single-line string, or None when empty.

    NUL bytes, carriage returns and newlines become spaces; surrounding
    whitespace is trimmed.
    """
    if value is None:
        return None
    text = value if isinstance(value, str) else str(value)
    cleaned = text.translate(str.maketrans({"\x00": " ", "\r": " ", "\n": " "})).strip()
    return cleaned or None
2025-12-29 17:05:03 -08:00
def sanitize_filename(name: str, *, max_len: int = 150) -> str:
    """Derive a filesystem-safe filename from *name*.

    Characters invalid on Windows become underscores, runs of whitespace
    collapse to single spaces, and leading/trailing periods are trimmed.
    Falls back to "download" when nothing usable remains; the result is
    truncated to *max_len* characters.
    """
    raw = str(name or "").strip()
    if not raw:
        return "download"
    table = str.maketrans({ch: "_" for ch in '<>:"/\\|?*'})
    safe = " ".join(raw.translate(table).split()).strip(".")
    return (safe or "download")[:max_len]
2025-11-25 20:09:33 -08:00
def unique_preserve_order(values: Iterable[str]) -> list[str]:
    """Drop duplicates from *values* while keeping first-seen order."""
    # dict insertion order gives first-occurrence ordering for free.
    return list(dict.fromkeys(values))
2025-12-29 17:05:03 -08:00
2025-11-25 20:09:33 -08:00
def sha256_file(file_path: Path) -> str:
    """Return the SHA-256 hex digest of *file_path*, read chunk by chunk."""
    digest = hashlib.sha256()
    with file_path.open("rb") as stream:
        # Stream in CHUNK_SIZE pieces so large files stay memory-bounded.
        while block := stream.read(CHUNK_SIZE):
            digest.update(block)
    return digest.hexdigest()
def create_metadata_sidecar(file_path: Path, metadata: dict) -> None:
    """Write a ``<name>.<ext>.metadata`` JSON sidecar next to *file_path*.

    The given *metadata* dict is updated in place with a ``title`` (derived
    from the filename when absent, so the sidecar can be matched during
    batch import), the file's SHA-256 ``hash``, its ``size`` in bytes, a
    coarse ``type`` looked up from the extension map, and whatever fields
    ffprobe can extract (ffprobe values override earlier ones).

    Args:
        file_path: Path to the exported file (must exist on disk).
        metadata: Dictionary of metadata to save; an empty dict is a no-op.

    Raises:
        RuntimeError: If the sidecar file cannot be written.
    """
    if not metadata:
        return

    file_ext = file_path.suffix.lower()

    # Title must match the filename (without extension) for batch import.
    if not metadata.get("title"):
        metadata["title"] = file_path.stem
    metadata["hash"] = sha256_file(file_path)
    # FIX: file_path is already a Path; the old Path(file_path) re-wrap was redundant.
    metadata["size"] = file_path.stat().st_size

    # Map the extension to a coarse mime family; "unknown" when nothing matches.
    # (Replaces the old flag-plus-nested-break loop with a single next().)
    metadata["type"] = next(
        (
            mime_type
            for mime_type, ext_map in SYS.utils_constant.mime_maps.items()
            for info in ext_map.values()
            if info.get("ext") == file_ext
        ),
        "unknown",
    )

    metadata.update(ffprobe(str(file_path)))

    metadata_path = file_path.with_suffix(file_path.suffix + ".metadata")
    try:
        with open(metadata_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)
    except OSError as exc:
        raise RuntimeError(
            f"Failed to write metadata sidecar {metadata_path}: {exc}"
        ) from exc
2025-12-29 17:05:03 -08:00
2025-11-25 20:09:33 -08:00
def create_tags_sidecar(file_path: Path, tags: set) -> None:
    """Write a ``.tag`` sidecar next to *file_path*, one tag per line.

    Tags are sorted first, then stripped and lower-cased on output.
    An empty *tags* set is a no-op.

    Raises:
        RuntimeError: If the sidecar cannot be written.
    """
    if not tags:
        return
    tags_path = file_path.with_suffix(file_path.suffix + ".tag")
    try:
        with open(tags_path, "w", encoding="utf-8") as handle:
            handle.writelines(
                f"{str(tag).strip().lower()}\n" for tag in sorted(tags)
            )
    except Exception as e:
        raise RuntimeError(f"Failed to create tags sidecar {tags_path}: {e}") from e
2025-12-29 17:05:03 -08:00
2025-11-25 20:09:33 -08:00
def _ffprobe_via_module(file_path: str) -> dict | None:
    """Probe with the ffmpeg-python module; None when unavailable or failed."""
    if ffmpeg is None:
        return None
    try:
        return ffmpeg.probe(file_path)
    except Exception as exc:  # pragma: no cover - environment dependent
        _format_logger.debug("ffmpeg.probe failed: %s", exc)
        return None


def _ffprobe_via_binary(ffprobe_cmd: str, file_path: str) -> dict | None:
    """Probe by invoking the external ffprobe binary; None on any failure."""
    try:
        proc = subprocess.run(
            [
                ffprobe_cmd,
                "-v",
                "quiet",
                "-print_format",
                "json",
                "-show_format",
                "-show_streams",
                str(file_path),
            ],
            check=True,
            capture_output=True,
            text=True,
        )
        return json.loads(proc.stdout)
    except Exception as exc:  # pragma: no cover - environment dependent
        _format_logger.debug("External ffprobe failed: %s", exc)
        return None


def _opt_int(mapping: dict, key: str) -> int | None:
    """int(mapping[key]) when the key is present, else None."""
    return int(mapping.get(key, 0)) if key in mapping else None


def _probe_to_metadata(probe: dict) -> dict:
    """Flatten a raw ffprobe result into the downlow metadata fields."""
    fmt = probe.get("format", {})
    metadata = {
        "duration": float(fmt.get("duration", 0)) if "duration" in fmt else None,
        "size": _opt_int(fmt, "size"),
        "format_name": fmt.get("format_name", None),
    }
    # Later streams of the same type overwrite earlier ones (original behavior).
    for stream in probe.get("streams", []):
        codec_type = stream.get("codec_type")
        if codec_type == "audio":
            metadata["audio_codec"] = stream.get("codec_name")
            metadata["bitrate"] = _opt_int(stream, "bit_rate")
            metadata["samplerate"] = _opt_int(stream, "sample_rate")
            metadata["channels"] = _opt_int(stream, "channels")
        elif codec_type in ("video", "image"):
            # The video and image branches were identical except for the
            # codec key name; collapsed into one branch.
            metadata[f"{codec_type}_codec"] = stream.get("codec_name")
            metadata["width"] = _opt_int(stream, "width")
            metadata["height"] = _opt_int(stream, "height")
    return metadata


def ffprobe(file_path: str) -> dict:
    """Probe a media file and return a metadata dictionary.

    Prefers the python `ffmpeg` module (ffmpeg-python) when available; falls
    back to the external `ffprobe` binary if found on PATH. If neither is
    available or probing fails, an empty dict is returned.

    Args:
        file_path: Path of the media file to probe.

    Returns:
        Flat dict of format/stream metadata, or {} when probing failed.
    """
    probe = _ffprobe_via_module(file_path)
    if probe is None:
        ffprobe_cmd = shutil.which("ffprobe")
        if not ffprobe_cmd:
            # Matches the original log even when the module exists but failed.
            _format_logger.debug("No ffmpeg Python module and no ffprobe binary found")
            return {}
        probe = _ffprobe_via_binary(ffprobe_cmd, file_path)
    if not isinstance(probe, dict):
        return {}
    return _probe_to_metadata(probe)
# ============================================================================
# CBOR Utilities - Consolidated from cbor.py
# ============================================================================
"""CBOR utilities backed by the `cbor2` library."""
def decode_cbor(data: bytes) -> Any:
    """Decode *data* from CBOR into native Python objects.

    Empty/None input yields None without touching the optional dependency.

    Raises:
        ImportError: When cbor2 is not installed and *data* is non-empty.
    """
    if not data:
        return None
    if cbor2 is not None:
        return cbor2.loads(data)
    raise ImportError("cbor2 library is required for CBOR decoding")
def jsonify(value: Any) -> Any:
    """Recursively convert *value* into a JSON-friendly structure.

    Dict keys are stringified, bytes become {"__bytes__": <base64>}, lists
    are converted element-wise; anything else passes through unchanged.
    """
    if isinstance(value, bytes):
        return {"__bytes__": base64.b64encode(value).decode("ascii")}
    if isinstance(value, list):
        return [jsonify(entry) for entry in value]
    if isinstance(value, dict):
        return {str(k): jsonify(v) for k, v in value.items()}
    return value
# ============================================================================
# Format Utilities - Consolidated from format_utils.py
# ============================================================================
"""Formatting utilities for displaying metadata consistently across the application."""
def format_bytes(bytes_value) -> str:
    """Format bytes to human-readable format (e.g., '1.5 MB', '250 KB').

    Args:
        bytes_value: Size in bytes (int or float). Other types are returned
            via str() unchanged; None and non-positive values give '0 B'.

    Returns:
        Formatted string like '1.5 MB' or '756 MB'.
    """
    # BUG FIX: the type check must come first — non-numeric input (e.g. a
    # string) used to raise TypeError on `bytes_value <= 0` before the
    # str() fallback at the bottom could ever run.
    if not isinstance(bytes_value, (int, float)):
        if bytes_value is None:
            return "0 B"
        return str(bytes_value)
    if bytes_value <= 0:
        return "0 B"
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if bytes_value < 1024:
            if unit == "B":
                return f"{int(bytes_value)} {unit}"
            return f"{bytes_value:.1f} {unit}"
        bytes_value /= 1024
    return f"{bytes_value:.1f} PB"
def format_duration(seconds) -> str:
    """Render a duration in seconds as 'H:MM:SS', 'M:SS', or 'Ns'.

    Args:
        seconds: Duration in seconds (int, float, or numeric string).

    Returns:
        Formatted string like '1:23:45' or '5:30'; 'N/A' for None, empty
        or negative input; non-numeric values come back via str().
    """
    if seconds is None or seconds == "":
        return "N/A"

    if isinstance(seconds, str):
        try:
            seconds = float(seconds)
        except ValueError:
            return str(seconds)

    if not isinstance(seconds, (int, float)):
        return str(seconds)

    total = int(seconds)
    if total < 0:
        return "N/A"

    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)

    if hours:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    if minutes:
        return f"{minutes}:{secs:02d}"
    return f"{secs}s"
def format_timestamp(timestamp_str) -> str:
    """Format ISO timestamp to readable format.

    Args:
        timestamp_str: ISO format timestamp string (a trailing 'Z' is
            treated as UTC), or None/any other value.

    Returns:
        Formatted string like "2025-10-28 19:36:01", "N/A" for falsy
        input, or str(value) when parsing fails.
    """
    if not timestamp_str:
        return "N/A"

    # BUG FIX: non-string truthy input used to fall through to an unbound
    # local (`dt`) and only "work" because the resulting NameError was
    # caught below; return its str() explicitly instead.
    if not isinstance(timestamp_str, str):
        return str(timestamp_str)

    try:
        if "T" in timestamp_str:
            # Normalise the 'Z' UTC suffix, which fromisoformat (< 3.11)
            # does not accept.
            dt = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
        else:
            dt = datetime.fromisoformat(timestamp_str)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        _format_logger.debug(f"Could not parse timestamp '{timestamp_str}': {e}")

    return str(timestamp_str)
def format_metadata_value(key: str, value) -> str:
    """Format a metadata value based on its key for display.

    Central formatting rule for all metadata display: size fields go
    through format_bytes, duration fields through format_duration,
    timestamp fields through format_timestamp; everything else is str()'d.

    Args:
        key: Metadata field name.
        value: Value to format.

    Returns:
        Formatted string for display ('N/A' for None or empty string).
    """
    if value is None or value == "":
        return "N/A"

    if key in {"size", "file_size"}:
        return format_bytes(value)

    if key in {"duration", "length"}:
        return format_duration(value)

    timestamp_keys = {
        "time_modified",
        "time_imported",
        "created_at",
        "updated_at",
        "indexed_at",
        "timestamp",
    }
    if key in timestamp_keys:
        return format_timestamp(value)

    return str(value)
# ============================================================================
# Link Utilities - Consolidated from link_utils.py
# ============================================================================
2025-12-11 12:47:30 -08:00
"""Link utilities - Extract and process url from various sources."""
2025-11-25 20:09:33 -08:00
def extract_link_from_args(args: Iterable[str]) -> Any | None:
    """Extract an HTTP/HTTPS URL from command arguments.

    Only the first argument is considered.

    Args:
        args: Command arguments.

    Returns:
        URL string if the first argument is an http(s) URL, None otherwise.
    """
    items = args if isinstance(args, (list, tuple)) else list(args)
    if not items:
        return None
    candidate = str(items[0])
    return candidate if candidate.startswith(("http://", "https://")) else None
def extract_link_from_result(result: Any) -> Any | None:
    """Extract URL from a result object (dict or object with attributes).

    Checks, in order: url, link, href — returning the first truthy one
    (falsy values fall through to the next candidate).

    Args:
        result: Result object from pipeline (dict or object).

    Returns:
        URL value if found, None otherwise.
    """
    if isinstance(result, dict):
        url, link, href = result.get("url"), result.get("link"), result.get("href")
    else:
        url = getattr(result, "url", None)
        link = getattr(result, "link", None)
        href = getattr(result, "href", None)
    return url or link or href
def extract_link(result: Any, args: Iterable[str]) -> Any | None:
    """Extract a link from args or result, with args taking priority.

    Args:
        result: Pipeline result object.
        args: Command arguments.

    Returns:
        URL string if found, None otherwise.
    """
    from_args = extract_link_from_args(args)
    return from_args if from_args else extract_link_from_result(result)
def get_api_key(config: dict[str, Any], service: str, key_path: str) -> str | None:
    """Get API key from config with fallback support.

    Args:
        config: Configuration dictionary.
        service: Service name for logging (kept for call-site symmetry;
            not used in the lookup itself).
        key_path: Dot-notation path to key (e.g., "Debrid.All-debrid").

    Returns:
        API key if found and not empty, None otherwise.
    """
    try:
        node: Any = config
        for segment in key_path.split("."):
            if not isinstance(node, dict):
                return None
            node = node.get(segment)
        if isinstance(node, str):
            return node.strip() or None
        return None
    except Exception:
        # Best-effort lookup: any malformed config shape degrades to None.
        return None
def add_direct_link_to_result(
    result: Any,
    direct_link: str,
    original_link: str
) -> None:
    """Add direct link information to result object.

    Args:
        result: Result object to modify (dict or object).
        direct_link: The unlocked/direct URL.
        original_link: The original restricted URL.
    """
    updates = (("direct_link", direct_link), ("original_link", original_link))
    if isinstance(result, dict):
        for field_name, field_value in updates:
            result[field_name] = field_value
    else:
        for field_name, field_value in updates:
            setattr(result, field_name, field_value)
# ============================================================================
# URL Policy Resolution - Consolidated from url_parser.py
# ============================================================================
"""URL policy resolution for downlow workflows."""
@dataclass(slots=True)
class UrlPolicy:
"""Describe how a URL should be handled by download and screenshot flows."""
skip_download: bool = False
skip_metadata: bool = False
force_screenshot: bool = False
extra_tags: list[str] = field(default_factory=list)
def apply_tags(self, sources: Iterable[str]) -> list[str]:
tags = [tag.strip() for tag in self.extra_tags if tag and tag.strip()]
for value in sources:
text = str(value).strip()
if text:
tags.append(text)
return tags
def _normalise_rule(rule: dict[str, Any]) -> dict[str, Any] | None:
pattern = str(rule.get("pattern") or rule.get("host") or "").strip()
if not pattern:
return None
skip_download = bool(rule.get("skip_download"))
skip_metadata = bool(rule.get("skip_metadata"))
force_screenshot = bool(rule.get("force_screenshot"))
extra_tags_raw = rule.get("extra_tags")
if isinstance(extra_tags_raw, str):
extra_tags = [
part.strip() for part in extra_tags_raw.split(",") if part.strip()
]
2025-11-25 20:09:33 -08:00
elif isinstance(extra_tags_raw, (list, tuple, set)):
extra_tags = [str(item).strip() for item in extra_tags_raw if str(item).strip()]
else:
extra_tags = []
return {
"pattern": pattern,
"skip_download": skip_download,
"skip_metadata": skip_metadata,
"force_screenshot": force_screenshot,
"extra_tags": extra_tags,
}
def resolve_url_policy(config: dict[str, Any], url: str) -> UrlPolicy:
    """Merge every matching ``url_policies`` rule from *config* for *url*.

    A rule matches when its glob pattern matches the URL's host or its
    host+path. Boolean flags are OR-ed across matching rules; extra tags
    accumulate without duplicates. Missing/invalid config yields defaults.
    """
    rules = config.get("url_policies")
    if not rules or not isinstance(rules, list):
        return UrlPolicy()

    parsed = urlparse(url)
    host = parsed.netloc
    host_and_path = f"{parsed.netloc}{parsed.path}"

    policy = UrlPolicy()
    for raw_rule in rules:
        if not isinstance(raw_rule, dict):
            continue
        rule = _normalise_rule(raw_rule)
        if rule is None:
            continue
        glob = rule["pattern"]
        if not (fnmatch(host, glob) or fnmatch(host_and_path, glob)):
            continue
        policy.skip_download = policy.skip_download or rule["skip_download"]
        policy.skip_metadata = policy.skip_metadata or rule["skip_metadata"]
        policy.force_screenshot = policy.force_screenshot or rule["force_screenshot"]
        for tag in rule["extra_tags"]:
            if tag not in policy.extra_tags:
                policy.extra_tags.append(tag)
    return policy