# Medios-Macina/cmdlets/get_file.py
from __future__ import annotations
from typing import Any, Dict, Optional, Sequence
from pathlib import Path
import shutil as _shutil
import subprocess as _subprocess
import json
import sys
from helper.logger import log
import uuid as _uuid
import time as _time
from downlow_helpers.progress import print_progress, print_final_progress
from downlow_helpers.http_client import HTTPClient
import fnmatch as _fnmatch
from . import register
import models
import pipeline as ctx
from helper import hydrus as hydrus_wrapper
from ._shared import Cmdlet, CmdletArg, normalize_hash, looks_like_hash, create_pipe_object_result
from config import resolve_output_dir, get_hydrus_url, get_hydrus_access_key
from downlow_helpers.alldebrid import AllDebridClient
def _is_alldebrid_pipe_data(line: str) -> bool:
"""Check if line is AllDebrid pipe format: ID|filename|size|..."""
parts = line.strip().split('|')
if len(parts) < 5:
return False
try:
# Check if first part is magnet ID (integer)
magnet_id = int(parts[0])
# Check if 3rd part (size) is integer
size = int(parts[2])
# Check if 4th part (status_code) is integer
status_code = int(parts[3])
return magnet_id > 0 and size >= 0 and status_code in {0, 1, 2, 3, 4}
except (ValueError, IndexError):
return False
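# Illustrative example (hypothetical values) of a line this accepts:
#   "12345|movie.mkv|1073741824|4|Ready|100"
# -> magnet_id=12345, size=1073741824, status_code=4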
def _handle_alldebrid_pipe(config: Dict[str, Any], args: Sequence[str]) -> int:
"""Handle AllDebrid magnet downloads from piped stdin."""
# Parse arguments
out_path = None
file_filter = None
i = 0
while i < len(args):
if args[i].lower() in {"-path", "--path", "path"} and i + 1 < len(args):
out_path = Path(args[i + 1]).expanduser()
i += 2
elif args[i].lower() in {"-file", "--file", "file"} and i + 1 < len(args):
file_filter = args[i + 1]
i += 2
else:
i += 1
if not out_path:
log("✗ -path required for AllDebrid downloads", file=sys.stderr)
return 1
# Read magnet IDs from stdin
magnets = []
try:
for line in sys.stdin:
line = line.strip()
if line and _is_alldebrid_pipe_data(line):
parts = line.split('|')
magnet_id = int(parts[0])
magnets.append(magnet_id)
except Exception as e:
log(f"✗ Error reading stdin: {e}", file=sys.stderr)
return 1
if not magnets:
log("✗ No valid magnet IDs in pipe", file=sys.stderr)
return 1
# Get API key
from config import get_debrid_api_key
api_key = get_debrid_api_key(config)
if not api_key:
log("✗ AllDebrid API key not configured", file=sys.stderr)
return 1
# Download from each magnet
client = AllDebridClient(api_key)
total_files = 0
failed_files = 0
log(f"Processing {len(magnets)} magnet(s)...", file=sys.stderr)
for magnet_id in magnets:
try:
# Fetch magnet files using magnet_status with include_files
magnet_info = client.magnet_status(magnet_id, include_files=True)
files_list = _extract_files_from_magnet(magnet_info, file_filter)
if not files_list:
log(f"⊘ No files in magnet {magnet_id}", file=sys.stderr)
continue
log(f"✓ Found {len(files_list)} file(s) in magnet {magnet_id}", file=sys.stderr)
# Download each file
for file_info in files_list:
try:
link = file_info['link']
filename = file_info['name']
# Unlock link to get direct URL
try:
direct_url = client.unlock_link(link)
if not direct_url:
log(f"✗ Failed to unlock link for {filename}", file=sys.stderr)
failed_files += 1
continue
except Exception as e:
log(f"✗ Error unlocking link: {e}", file=sys.stderr)
failed_files += 1
continue
# Download file
output_file = out_path / filename
if _download_file_from_alldebrid(direct_url, output_file, filename, file_info['size']):
log(f"✓ Downloaded: {filename}", file=sys.stderr)
total_files += 1
else:
log(f"✗ Failed to download: {filename}", file=sys.stderr)
failed_files += 1
except Exception as e:
log(f"✗ Error downloading file: {e}", file=sys.stderr)
failed_files += 1
except Exception as e:
log(f"✗ Error processing magnet {magnet_id}: {e}", file=sys.stderr)
failed_files += 1
log(f"✓ Download complete: {total_files} file(s) downloaded, {failed_files} failed", file=sys.stderr)
return 0 if failed_files == 0 else 1
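# Typical pipeline (illustrative): search-debrid emits "ID|filename|size|..." lines that
# this handler consumes, e.g.:
#   search-debrid "query" | get-file -path ~/downloads -file "*.mkv"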
def _extract_files_from_magnet(magnet_info: Dict[str, Any], filter_pattern: Optional[str] = None) -> list:
"""Extract files from magnet file tree, optionally filtering by pattern."""
files = []
def traverse(items: Any, prefix: str = "") -> None:
if not isinstance(items, list):
return
for item in items:
if not isinstance(item, dict):
continue
name = item.get('n', '')
link = item.get('l', '')
size = item.get('s', 0)
entries = item.get('e', [])
# File
if link:
full_path = f"{prefix}/{name}" if prefix else name
if filter_pattern is None or _fnmatch.fnmatch(name.lower(), filter_pattern.lower()):
files.append({'name': name, 'path': full_path, 'size': size, 'link': link})
# Folder
if entries:
full_path = f"{prefix}/{name}" if prefix else name
traverse(entries, full_path)
items = magnet_info.get('files', [])
traverse(items)
return files
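# The traversal above relies on AllDebrid's compact file-tree keys:
#   {'n': name, 'l': download link (files only), 's': size, 'e': [child entries]}
# e.g. {'n': 'Season 1', 'e': [{'n': 'ep1.mkv', 'l': '...', 's': 123}]} yields a file
# whose 'path' is 'Season 1/ep1.mkv'.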
def _download_file_from_alldebrid(url: str, output_path: Path, filename: str, file_size: int) -> bool:
"""Download a single file from AllDebrid with progress bar."""
output_path.parent.mkdir(parents=True, exist_ok=True)
try:
downloaded = 0
chunk_size = 1024 * 1024
start_time = _time.time()
last_update = start_time
with HTTPClient(timeout=30.0, headers={'User-Agent': 'downlow/1.0'}) as client:
response = client.get(url)
response.raise_for_status()
with open(output_path, 'wb', buffering=1024*1024) as f:
for chunk in response.iter_bytes(chunk_size):
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
# Update progress every 0.5 seconds to avoid spam
now = _time.time()
if now - last_update >= 0.5 or downloaded == file_size:
elapsed = now - start_time
speed = downloaded / elapsed if elapsed > 0 else 0
print_progress(filename, downloaded, file_size, speed)
last_update = now
# Print final progress line
elapsed = _time.time() - start_time
print_final_progress(filename, file_size, elapsed)
log(f"{filename} downloaded", file=sys.stderr)
return True
except Exception as e:
log(f"\n[get-file] ✗ Download error: {e}", file=sys.stderr)
return False
def _is_playable_in_mpv(file_path_or_ext: str, mime_type: Optional[str] = None) -> bool:
"""Check if file can be played in MPV based on extension or mime type."""
from helper.utils_constant import mime_maps
# Check mime type first if provided
if mime_type:
mime_lower = mime_type.lower()
# Simple prefix check for common media types
if any(mime_lower.startswith(prefix) for prefix in ['video/', 'audio/', 'image/']):
return True
# Extract extension
if file_path_or_ext.startswith('.'):
ext = file_path_or_ext.lower()
else:
ext = Path(file_path_or_ext).suffix.lower()
if not ext:
return False
# Check if extension is in playable categories
playable_categories = ['video', 'audio', 'image', 'image_sequence']
for category in playable_categories:
if category in mime_maps:
            for info in mime_maps[category].values():
                if info.get('ext', '').lower() == ext:
                    return True
return False
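# e.g. _is_playable_in_mpv('clip.webm') is True provided mime_maps['video'] maps some
# entry to ext '.webm'; a bare '.txt' (no media category, no media mime) falls through to False.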
def _get_fixed_ipc_pipe() -> str:
"""Get the fixed IPC pipe name for persistent MPV connection.
Uses a fixed name 'mpv-medeia-macina' so all playback sessions
connect to the same MPV window/process instead of creating new instances.
"""
import platform
if platform.system() == 'Windows':
return "\\\\.\\pipe\\mpv-medeia-macina"
else:
return "/tmp/mpv-medeia-macina.sock"
def _send_to_mpv_pipe(file_url: str, ipc_pipe: str, title: str, headers: Optional[Dict[str, str]] = None) -> bool:
"""Send loadfile command to existing MPV via IPC pipe.
Returns True if successfully sent to existing MPV, False if pipe unavailable.
"""
import json
import socket
import platform
try:
# Prepare commands
# Use set_property for headers as loadfile options can be unreliable via IPC
header_str = ""
if headers:
header_str = ",".join([f"{k}: {v}" for k, v in headers.items()])
# Command 1: Set headers (or clear them)
cmd_headers = {
"command": ["set_property", "http-header-fields", header_str],
"request_id": 0
}
# Command 2: Load file using memory:// M3U to preserve title
# Sanitize title to avoid breaking M3U format
safe_title = title.replace("\n", " ").replace("\r", "")
m3u_content = f"#EXTM3U\n#EXTINF:-1,{safe_title}\n{file_url}\n"
cmd_load = {
"command": ["loadfile", f"memory://{m3u_content}", "append-play"],
"request_id": 1
}
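        # MPV's JSON IPC speaks one JSON object per line; e.g. the wire form of cmd_load is
        #   {"command": ["loadfile", "memory://...", "append-play"], "request_id": 1}
        # and MPV answers each request with a line like {"request_id": 1, "error": "success"}.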
if platform.system() == 'Windows':
# Windows named pipes require special handling
try:
# Open in r+b to read response
with open(ipc_pipe, 'r+b', buffering=0) as pipe:
# Send headers
pipe.write((json.dumps(cmd_headers) + "\n").encode('utf-8'))
pipe.flush()
pipe.readline() # Consume response for headers
# Send loadfile
pipe.write((json.dumps(cmd_load) + "\n").encode('utf-8'))
pipe.flush()
# Read response
response_line = pipe.readline()
if response_line:
resp = json.loads(response_line.decode('utf-8'))
if resp.get('error') != 'success':
log(f"[get-file] MPV error: {resp.get('error')}", file=sys.stderr)
return False
log(f"[get-file] Sent to existing MPV: {title}", file=sys.stderr)
return True
except (OSError, IOError):
# Pipe not available
return False
else:
# Unix socket for Linux/macOS
if not hasattr(socket, 'AF_UNIX'):
return False
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.connect(ipc_pipe)
# Send headers
sock.sendall((json.dumps(cmd_headers) + "\n").encode('utf-8'))
sock.recv(4096) # Consume response
# Send loadfile
sock.sendall((json.dumps(cmd_load) + "\n").encode('utf-8'))
# Read response
try:
response_data = sock.recv(4096)
if response_data:
resp = json.loads(response_data.decode('utf-8'))
if resp.get('error') != 'success':
log(f"[get-file] MPV error: {resp.get('error')}", file=sys.stderr)
sock.close()
return False
            except (OSError, ValueError):
                # The reply is optional; ignore read/parse failures
                pass
sock.close()
log(f"[get-file] Sent to existing MPV: {title}", file=sys.stderr)
return True
except (OSError, socket.error, ConnectionRefusedError):
# Pipe doesn't exist or MPV not listening - will need to start new instance
return False
except Exception as e:
log(f"[get-file] IPC error: {e}", file=sys.stderr)
return False
def _play_in_mpv(file_url: str, file_title: str, is_stream: bool = False, headers: Optional[Dict[str, str]] = None) -> bool:
"""Play file in MPV using IPC pipe, creating new instance if needed.
Returns True on success, False on error.
"""
ipc_pipe = _get_fixed_ipc_pipe()
import json
import socket
import platform
try:
# First try to send to existing MPV instance
if _send_to_mpv_pipe(file_url, ipc_pipe, file_title, headers):
print(f"Added to MPV: {file_title}")
return True
# No existing MPV or pipe unavailable - start new instance
log(f"[get-file] Starting new MPV instance (pipe: {ipc_pipe})", file=sys.stderr)
cmd = ['mpv', file_url, f'--input-ipc-server={ipc_pipe}']
# Set title for new instance
cmd.append(f'--force-media-title={file_title}')
if headers:
# Format headers for command line
# --http-header-fields="Header1: Val1,Header2: Val2"
header_str = ",".join([f"{k}: {v}" for k, v in headers.items()])
cmd.append(f'--http-header-fields={header_str}')
# Detach process to prevent freezing parent CLI
kwargs = {}
if platform.system() == 'Windows':
            # Detach from the parent console so the CLI does not block; MPV creates its own window.
            kwargs['creationflags'] = 0x00000008  # DETACHED_PROCESS
_subprocess.Popen(cmd, stdin=_subprocess.DEVNULL, stdout=_subprocess.DEVNULL, stderr=_subprocess.DEVNULL, **kwargs)
print(f"{'Streaming' if is_stream else 'Playing'} in MPV: {file_title}")
log(f"[get-file] Started MPV with {file_title} (IPC: {ipc_pipe})", file=sys.stderr)
return True
except FileNotFoundError:
log("Error: MPV not found. Install mpv to play media files", file=sys.stderr)
return False
except Exception as e:
log(f"Error launching MPV: {e}", file=sys.stderr)
return False
def _handle_search_result(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Handle a file from search-file results using FileStorage backend."""
try:
from helper.file_storage import FileStorage
# Helper to get field from both dict and object
def get_field(obj: Any, field: str, default: Any = None) -> Any:
if isinstance(obj, dict):
return obj.get(field, default)
else:
return getattr(obj, field, default)
# Extract file information from ResultItem
storage_name = get_field(result, 'origin', None)
# Also check for 'source' field (from add-file and other cmdlets)
if not storage_name:
storage_name = get_field(result, 'source', None)
file_hash = get_field(result, 'hash_hex', None)
# Also check for file_hash field (from add-file and other cmdlets)
if not file_hash:
file_hash = get_field(result, 'file_hash', None)
file_title = get_field(result, 'title', 'file')
mime_type = get_field(result, 'mime', None)
file_path = get_field(result, 'target', None)
# Also check for 'file_path' field (from add-file and other cmdlets)
if not file_path:
file_path = get_field(result, 'file_path', None)
# Also check for 'path' field (from search-file and other cmdlets)
if not file_path:
file_path = get_field(result, 'path', None)
full_metadata = get_field(result, 'full_metadata', {})
magnet_id = full_metadata.get('magnet_id') if isinstance(full_metadata, dict) else None
if not storage_name:
log("Error: No storage backend specified in result", file=sys.stderr)
return 1
log(f"[get-file] Retrieving file from storage: {storage_name}", file=sys.stderr)
# Handle different storage backends
if storage_name.lower() == 'hydrus':
return _handle_hydrus_file(file_hash, file_title, config, args, mime_type=mime_type)
elif storage_name.lower() == 'local':
return _handle_local_file(file_path, file_title, args, file_hash=file_hash)
elif storage_name.lower() == 'download':
# Downloads are local files
return _handle_local_file(file_path, file_title, args, file_hash=file_hash)
        elif storage_name.lower() in {'debrid', 'alldebrid'}:
# Extract magnet_id from result (search-file stores it in full_metadata or as custom attribute)
if not magnet_id:
magnet_id = get_field(result, 'magnet_id', None)
if not magnet_id:
log("Error: No magnet ID in debrid result", file=sys.stderr)
return 1
return _handle_debrid_file(magnet_id, file_title, config, args)
else:
log(f"Unknown storage backend: {storage_name}", file=sys.stderr)
return 1
except Exception as e:
log(f"Error processing search result: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return 1
def _handle_hydrus_file(file_hash: Optional[str], file_title: str, config: Dict[str, Any], args: Sequence[str], mime_type: Optional[str] = None) -> int:
"""Handle file from Hydrus - auto-play in MPV if media file, otherwise open web URL."""
if not file_hash:
log("Error: No file hash provided", file=sys.stderr)
return 1
try:
hydrus_url = get_hydrus_url(config)
access_key = get_hydrus_access_key(config)
if not hydrus_url or not access_key:
log("Error: Hydrus not configured", file=sys.stderr)
return 1
# Check if it's a playable media file based on filename or mime type
is_media = _is_playable_in_mpv(file_title)
if not is_media and mime_type:
# Check mime type if filename check failed
if any(m in mime_type.lower() for m in ['video/', 'audio/', 'image/']):
is_media = True
force_mpv = any(str(a).lower() in {'-mpv', '--mpv', 'mpv'} for a in args)
force_browser = any(str(a).lower() in {'-web', '--web', 'web', '-browser', '--browser'} for a in args)
# Check MPV availability
from hydrus_health_check import check_mpv_availability
mpv_available, _ = check_mpv_availability()
# Construct URLs for streaming/viewing
# For streaming, we use headers for auth, so we don't put the key in the URL
stream_url = f"{hydrus_url}/get_files/file?hash={file_hash}"
# For browser, we still need the key in the URL
web_url = f"{hydrus_url}/get_files/file?hash={file_hash}&Hydrus-Client-API-Access-Key={access_key}"
headers = {
"Hydrus-Client-API-Access-Key": access_key
}
if force_browser:
# User explicitly wants browser
ipc_pipe = _get_fixed_ipc_pipe()
result_dict = create_pipe_object_result(
source='hydrus',
identifier=file_hash,
file_path=web_url,
cmdlet_name='get-file',
title=file_title,
file_hash=file_hash,
extra={
'ipc': ipc_pipe,
'action_type': 'browser',
'web_url': web_url,
'hydrus_url': hydrus_url,
'access_key': access_key
}
)
ctx.emit(result_dict)
try:
import webbrowser
webbrowser.open(web_url)
log(f"[get-file] Opened in browser: {file_title}", file=sys.stderr)
except Exception:
pass
return 0
elif force_mpv or (is_media and mpv_available):
# Auto-play in MPV for media files (if available), or user requested it
if _play_in_mpv(stream_url, file_title, is_stream=True, headers=headers):
# Emit result as PipeObject-compatible dict for pipelining
ipc_pipe = _get_fixed_ipc_pipe()
result_dict = create_pipe_object_result(
source='hydrus',
identifier=file_hash,
file_path=stream_url,
cmdlet_name='get-file',
title=file_title,
file_hash=file_hash,
extra={
'ipc': ipc_pipe,
'action_type': 'streaming',
'web_url': web_url,
'hydrus_url': hydrus_url,
'access_key': access_key
}
)
ctx.emit(result_dict)
return 0
else:
# Fall back to browser
try:
import webbrowser
webbrowser.open(web_url)
log(f"[get-file] Opened in browser instead", file=sys.stderr)
except Exception:
pass
return 0
else:
# Not media, open in browser
ipc_pipe = _get_fixed_ipc_pipe()
result_dict = create_pipe_object_result(
source='hydrus',
identifier=file_hash,
file_path=web_url,
cmdlet_name='get-file',
title=file_title,
file_hash=file_hash,
extra={
'ipc': ipc_pipe,
'action_type': 'browser',
'web_url': web_url,
'hydrus_url': hydrus_url,
'access_key': access_key
}
)
ctx.emit(result_dict)
try:
import webbrowser
webbrowser.open(web_url)
log(f"[get-file] Opened in browser: {file_title}", file=sys.stderr)
except Exception:
pass
return 0
except Exception as e:
log(f"Error handling Hydrus file: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return 1
def _handle_local_file(file_path: Optional[str], file_title: str, args: Sequence[str], file_hash: Optional[str] = None) -> int:
"""Handle file from local storage - auto-play in MPV if media, otherwise open with default app."""
if not file_path:
log("Error: No file path provided", file=sys.stderr)
return 1
try:
source = Path(file_path)
if not source.exists():
log(f"Error: File not found: {file_path}", file=sys.stderr)
return 1
# Check for explicit user flags
force_mpv = any(str(a).lower() in {'-mpv', '--mpv', 'mpv'} for a in args)
force_default = any(str(a).lower() in {'-open', '--open', 'open'} for a in args)
# Check if it's a playable media file
is_media = _is_playable_in_mpv(str(source))
# Check MPV availability
from hydrus_health_check import check_mpv_availability
mpv_available, _ = check_mpv_availability()
if force_default:
# User explicitly wants default application
import subprocess as sp
import platform
import os
try:
if platform.system() == 'Darwin': # macOS
sp.run(['open', file_path])
elif platform.system() == 'Windows':
os.startfile(file_path)
else: # Linux
sp.run(['xdg-open', file_path])
ctx.emit(f"Opened: {file_title}")
log(f"[get-file] Opened {file_title} with default app", file=sys.stderr)
return 0
except Exception as e:
log(f"Error opening file: {e}", file=sys.stderr)
return 1
elif force_mpv or (is_media and mpv_available):
# Auto-play in MPV for media files (if available), or user requested it
if _play_in_mpv(file_path, file_title, is_stream=False):
# Emit result as PipeObject-compatible dict for pipelining
ipc_pipe = _get_fixed_ipc_pipe()
result_dict = create_pipe_object_result(
source='local',
identifier=str(Path(file_path).stem) if file_path else 'unknown',
file_path=file_path,
cmdlet_name='get-file',
title=file_title,
file_hash=file_hash, # Include hash from search result if available
extra={
'ipc': ipc_pipe, # MPV IPC pipe for Lua script control
'action_type': 'playing' # Distinguish from other get-file actions
}
)
ctx.emit(result_dict)
return 0
else:
# Fall back to default application
try:
import os
import platform
if platform.system() == 'Darwin': # macOS
_subprocess.run(['open', file_path])
elif platform.system() == 'Windows':
os.startfile(file_path)
else: # Linux
_subprocess.run(['xdg-open', file_path])
log(f"[get-file] Opened with default app instead", file=sys.stderr)
except Exception:
pass
return 0
else:
# Not media - open with default application
import subprocess as sp
import platform
import os
try:
if platform.system() == 'Darwin': # macOS
sp.run(['open', file_path])
elif platform.system() == 'Windows':
# Use os.startfile for more reliable Windows handling
os.startfile(file_path)
else: # Linux
sp.run(['xdg-open', file_path])
print(f"Opened: {file_title}")
log(f"[get-file] Opened {file_title} with default app", file=sys.stderr)
# Emit result for downstream processing
result_dict = create_pipe_object_result(
source='local',
identifier=str(Path(file_path).stem) if file_path else 'unknown',
file_path=file_path,
cmdlet_name='get-file',
title=file_title,
file_hash=file_hash,
extra={'action_type': 'opened'}
)
ctx.emit(result_dict)
return 0
except Exception as e:
log(f"Error opening file with default app: {e}", file=sys.stderr)
return 1
except Exception as e:
log(f"Error handling local file: {e}", file=sys.stderr)
return 1
def _handle_debrid_file(magnet_id: int, magnet_title: str, config: Dict[str, Any], args: Sequence[str]) -> int:
"""Handle magnet file from AllDebrid storage - download to local path."""
# Parse output path argument
out_path = None
i = 0
args_list = [str(a) for a in args]
while i < len(args_list):
if args_list[i].lower() in {"-path", "--path", "path"} and i + 1 < len(args_list):
out_path = Path(args_list[i + 1]).expanduser()
i += 2
else:
i += 1
if not out_path:
log("✗ -Path required for debrid downloads", file=sys.stderr)
return 1
# Ensure output directory exists
try:
out_path.mkdir(parents=True, exist_ok=True)
except Exception as e:
log(f"✗ Error creating output directory: {e}", file=sys.stderr)
return 1
# Get API key
from config import get_debrid_api_key
api_key = get_debrid_api_key(config)
if not api_key:
log("✗ AllDebrid API key not configured in config.json", file=sys.stderr)
return 1
try:
client = AllDebridClient(api_key)
log(f"[get-file] Downloading magnet {magnet_id}: {magnet_title}", file=sys.stderr)
# Fetch magnet files
try:
magnet_info = client.magnet_status(magnet_id, include_files=True)
except Exception as e:
log(f"✗ Failed to fetch magnet files: {e}", file=sys.stderr)
return 1
# Extract files from magnet
files_list = _extract_files_from_magnet(magnet_info)
if not files_list:
log(f"✗ No files in magnet {magnet_id}", file=sys.stderr)
return 1
log(f"✓ Found {len(files_list)} file(s) in magnet {magnet_id}", file=sys.stderr)
# Download each file
total_files = 0
failed_files = 0
for file_info in files_list:
try:
link = file_info['link']
filename = file_info['name']
file_size = file_info['size']
# Unlock link to get direct URL
try:
direct_url = client.unlock_link(link)
if not direct_url:
log(f"✗ Failed to unlock link for {filename}", file=sys.stderr)
failed_files += 1
continue
except Exception as e:
log(f"✗ Error unlocking link: {e}", file=sys.stderr)
failed_files += 1
continue
# Download file
output_file = out_path / filename
if _download_file_from_alldebrid(direct_url, output_file, filename, file_size):
log(f"✓ Downloaded: {filename}", file=sys.stderr)
total_files += 1
else:
log(f"✗ Failed to download: {filename}", file=sys.stderr)
failed_files += 1
except Exception as e:
log(f"✗ Error downloading file: {e}", file=sys.stderr)
failed_files += 1
log(f"✓ Download complete: {total_files} file(s) downloaded, {failed_files} failed", file=sys.stderr)
if total_files > 0:
# Emit result for downstream processing
result_dict = create_pipe_object_result(
source='debrid',
identifier=str(magnet_id),
file_path=str(out_path),
cmdlet_name='get-file',
title=magnet_title,
extra={
'magnet_id': magnet_id,
'files_downloaded': total_files,
'download_dir': str(out_path)
}
)
ctx.emit(result_dict)
return 0 if failed_files == 0 else 1
except Exception as e:
log(f"✗ Error processing debrid download: {e}", file=sys.stderr)
import traceback
traceback.print_exc(file=sys.stderr)
return 1
@register(["get-file"]) # primary name
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
# Help: if any help token is present, print CMDLET JSON and exit
try:
if any(str(a).lower() in {"-?", "/?", "--help", "-h", "help", "--cmdlet"} for a in args):
log(json.dumps(CMDLET, ensure_ascii=False, indent=2))
return 0
except Exception:
pass
# Helper to get field from both dict and object
def get_field(obj: Any, field: str, default: Any = None) -> Any:
if isinstance(obj, dict):
return obj.get(field, default)
else:
return getattr(obj, field, default)
# Check if result is a list (from @N selection) and extract the first item
actual_result = result
if isinstance(result, list) and len(result) > 0:
actual_result = result[0]
# Check if this is a FileStorage search result (has origin field indicating a backend)
# This handles both dict and ResultItem objects
origin = get_field(actual_result, 'origin', None)
# Also check for 'source' field (from add-file and other cmdlets)
if not origin:
origin = get_field(actual_result, 'source', None)
    if origin and origin.lower() in {'hydrus', 'local', 'download', 'debrid', 'alldebrid'}:
# This is a search result with explicit origin - handle it via _handle_search_result
return _handle_search_result(actual_result, args, config)
# Handle ResultItem from search-file via @N selection
# The result can be either:
# 1. A single ResultItem (direct call)
# 2. A list of ResultItems (from @N selection in CLI)
result_item = None
if result and hasattr(result, '__class__'):
if result.__class__.__name__ == 'ResultItem':
result_item = result
elif isinstance(result, list) and len(result) > 0:
# @N selection creates a list, extract the first item if it's a ResultItem
if hasattr(result[0], '__class__') and result[0].__class__.__name__ == 'ResultItem':
result_item = result[0]
if result_item:
return _handle_search_result(result_item, args, config)
# Handle PipeObject results from previous get-file call (for chaining)
if result and isinstance(result, dict) and result.get('action', '').startswith('cmdlet:get-file'):
# This is from a previous get-file result - just pass it through
# Don't treat it as a new file to play, just emit for pipeline chaining
ctx.emit(result)
return 0
# Check for AllDebrid pipe input (from search-debrid)
    # Probe the first stdin line to detect the format
    first_line = None
    try:
        # Only probe when stdin is piped; readline() on an interactive TTY would block
        if hasattr(sys.stdin, 'readable') and sys.stdin.readable() and not sys.stdin.isatty():
            first_line = sys.stdin.readline().strip()
except Exception:
pass
if first_line and _is_alldebrid_pipe_data(first_line):
# This is AllDebrid pipe data - handle it separately
# Put the line back by creating a chain with the rest of stdin
import io
try:
remaining_stdin = sys.stdin.read()
        except Exception:
remaining_stdin = ""
sys.stdin = io.StringIO(first_line + '\n' + remaining_stdin)
return _handle_alldebrid_pipe(config, args)
elif first_line:
# Not AllDebrid data, put it back for normal processing
import io
try:
remaining_stdin = sys.stdin.read()
        except Exception:
remaining_stdin = ""
sys.stdin = io.StringIO(first_line + '\n' + remaining_stdin)
# Helpers
def _sanitize_name(text: str) -> str:
allowed = []
for ch in text:
allowed.append(ch if (ch.isalnum() or ch in {"-", "_", " ", "."}) else " ")
return (" ".join("".join(allowed).split()) or "export").strip()
def _ffprobe_duration_seconds(path: Path) -> Optional[float]:
ffprobe_path = _shutil.which('ffprobe')
if not ffprobe_path:
return None
try:
res = _subprocess.run(
[ffprobe_path, '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', str(path)],
stdout=_subprocess.PIPE,
stderr=_subprocess.PIPE,
check=True,
text=True,
)
out = (res.stdout or '').strip()
if not out:
return None
value = float(out)
return value if value > 0 else None
except Exception:
return None
def _parse_args(tokens: Sequence[str]) -> tuple[Optional[Path], Optional[str], Optional[str], Optional[str], bool]:
out_override: Optional[Path] = None
size_spec: Optional[str] = None
convert_spec: Optional[str] = None
hash_spec: Optional[str] = None
export_metadata: bool = False
i = 0
while i < len(tokens):
t = tokens[i]
low = t.lower()
if low in {"-path", "--path", "path"} and i + 1 < len(tokens):
try:
out_override = Path(tokens[i + 1]).expanduser()
except Exception:
out_override = None
i += 2
continue
if low in {"size", "-size", "--size"} and i + 1 < len(tokens):
size_spec = tokens[i + 1]
i += 2
continue
if low in {"convert", "-convert", "--convert"} and i + 1 < len(tokens):
convert_spec = tokens[i + 1]
i += 2
continue
if low in {"-hash", "--hash", "hash"} and i + 1 < len(tokens):
hash_spec = tokens[i + 1]
i += 2
continue
if low in {"-metadata", "--metadata", "metadata"}:
export_metadata = True
i += 1
continue
i += 1
return out_override, size_spec, convert_spec, hash_spec, export_metadata
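    # e.g. _parse_args(['-path', '~/out', 'size', '50%', '-metadata'])
    #   -> (Path('~/out').expanduser(), '50%', None, None, True)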
def _compute_target_bytes(size_spec: Optional[str], source_bytes: int) -> Optional[int]:
if not size_spec:
return None
text = str(size_spec).strip().lower()
if not text:
return None
if text.endswith('%'):
try:
pct = float(text[:-1])
except ValueError:
return None
pct = max(0.0, min(100.0, pct))
target = int(round(source_bytes * (pct / 100.0)))
else:
val = text
if val.endswith('mb'):
val = val[:-2]
elif val.endswith('m'):
val = val[:-1]
try:
mb = float(val)
except ValueError:
return None
target = int(round(mb * 1024 * 1024))
min_bytes = 1 * 1024 * 1024
if target <= 0:
target = min_bytes
return min(target, source_bytes)
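    # e.g. size_spec='50%' with a 100 MiB source -> 52_428_800 bytes; '34MB' -> 35_651_584
    # bytes. A non-positive result is bumped to 1 MiB, and everything is capped at source size.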
def _guess_kind_from_suffix(path: Path) -> str:
sfx = path.suffix.lower()
if sfx in {'.mp4', '.mkv', '.webm', '.mov', '.avi', '.flv', '.mpg', '.mpeg', '.ts', '.m4v', '.wmv'}:
return 'video'
if sfx in {'.mp3', '.flac', '.wav', '.m4a', '.aac', '.ogg', '.opus', '.mka'}:
return 'audio'
return 'other'
def _extract_metadata_from_tags(tags_payload: Dict[str, Any], file_hash: str, input_kind: str = '') -> Dict[str, str]:
"""Extract common metadata fields from Hydrus tags.
Returns a dict mapping FFmpeg metadata keys to values.
Supports: title, artist, album, track, date, genre, etc.
For audio files, applies sensible defaults:
- If no album, uses title as album
- If no track, defaults to 1
- album_artist is set to artist value
"""
metadata = {}
# Map of common tag namespaces to FFmpeg metadata keys
tag_map = {
'title': 'title',
'artist': 'artist',
'album': 'album',
'track': 'track',
'track_number': 'track',
'date': 'date',
'year': 'date',
'genre': 'genre',
'composer': 'composer',
'comment': 'comment',
}
if not tags_payload or 'metadata' not in tags_payload or not tags_payload['metadata']:
return metadata
entry = tags_payload['metadata'][0]
if 'tags' not in entry or not isinstance(entry['tags'], dict):
return metadata
tags_dict = entry['tags']
# Extract metadata from tags
for _service_key, service_data in tags_dict.items():
if not isinstance(service_data, dict):
continue
display_tags = service_data.get('display_tags', {})
if not isinstance(display_tags, dict):
continue
current_tags = display_tags.get('0', [])
if not isinstance(current_tags, list):
continue
for tag in current_tags:
tag_str = str(tag).strip()
if ':' in tag_str:
namespace, value = tag_str.split(':', 1)
namespace = namespace.lower().strip()
value = value.strip()
if namespace in tag_map and value:
ffmpeg_key = tag_map[namespace]
# Use first occurrence
if ffmpeg_key not in metadata:
metadata[ffmpeg_key] = value
# Apply sensible defaults for audio files
if input_kind == 'audio':
# If no album, use title as album
if 'album' not in metadata and 'title' in metadata:
metadata['album'] = metadata['title']
# If no track, default to 1
if 'track' not in metadata:
metadata['track'] = '1'
# If no album_artist, use artist
if 'artist' in metadata:
metadata['album_artist'] = metadata['artist']
return metadata
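    # e.g. display tags ['title:Song', 'artist:Band'] on an audio file yield
    # {'title': 'Song', 'artist': 'Band', 'album': 'Song', 'track': '1', 'album_artist': 'Band'}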
out_override, size_spec, convert_spec, hash_spec, export_metadata = _parse_args(args)
default_dir = resolve_output_dir(config)
media_kind = (get_field(result, 'media_kind', '') or '').lower()
_chk = []
if out_override:
_chk.append(f"Path={out_override}")
if size_spec:
_chk.append(f"Size={size_spec}")
if convert_spec:
_chk.append(f"Convert={convert_spec}")
# Prefer explicit -hash over result hash for logging
file_hash_for_log = None
if hash_spec and looks_like_hash(hash_spec):
file_hash_for_log = normalize_hash(hash_spec)
else:
hash_value = get_field(result, 'hash_hex', None)
file_hash_for_log = normalize_hash(hash_value) if hash_value else None
if _chk or file_hash_for_log:
msg = "get-file: " + ", ".join(_chk) if _chk else "get-file"
if file_hash_for_log:
msg = f"{msg} (Hash={file_hash_for_log})"
ctx.emit(msg)
base_name = _sanitize_name(get_field(result, 'title', None) or '')
if not base_name:
target_attr = get_field(result, 'target', None)
if isinstance(target_attr, str) and target_attr and not target_attr.startswith(('http://', 'https://')):
base_name = _sanitize_name(Path(target_attr).stem)
else:
base_name = 'export'
local_target = get_field(result, 'target', None)
is_url = isinstance(local_target, str) and local_target.startswith(('http://', 'https://'))
# Establish file hash (prefer -hash override when provided and valid)
if hash_spec and looks_like_hash(hash_spec):
file_hash = normalize_hash(hash_spec)
else:
file_hash = normalize_hash(get_field(result, 'hash_hex', None)) if get_field(result, 'hash_hex', None) else None
source_path: Optional[Path] = None
source_size: Optional[int] = None
duration_sec: Optional[float] = None
tags_payload: Dict[str, Any] = {}
urls_payload: Dict[str, Any] = {}
cleanup_source: bool = False
if isinstance(local_target, str) and not is_url and not (hash_spec and file_hash):
p = Path(local_target)
if not p.exists():
log(f"File missing: {p}")
return 1
source_path = p
try:
source_size = p.stat().st_size
except OSError:
source_size = None
duration_sec = _ffprobe_duration_seconds(p)
if file_hash is None:
for sc in (p.with_suffix('.tags'), p.with_suffix('.tags.txt')):
try:
if sc.exists():
text = sc.read_text(encoding='utf-8', errors='ignore')
for line in text.splitlines():
ls = line.strip().lower()
if ls.startswith('hash:'):
candidate = line.split(':', 1)[1].strip() if ':' in line else ''
if looks_like_hash(candidate):
file_hash = candidate.lower()
break
except OSError:
pass
elif file_hash:
try:
client = hydrus_wrapper.get_client(config)
except Exception as exc:
log(f"Hydrus client unavailable: {exc}")
return 1
if client is None:
log("Hydrus client unavailable")
return 1
# Fetch metadata and tags (needed for both -metadata flag and audio tagging)
# Fetch tags
try:
tags_payload = client.fetch_file_metadata(hashes=[file_hash], include_service_keys_to_tags=True)
except Exception:
tags_payload = {}
# Fetch URLs
try:
urls_payload = client.fetch_file_metadata(hashes=[file_hash], include_file_urls=True)
except Exception:
urls_payload = {}
# Extract title from metadata if base_name is still 'export'
if base_name == 'export' and tags_payload:
try:
                file_metadata = tags_payload.get('metadata', [])  # wrapper returns entries under 'metadata' (see below)
if file_metadata and isinstance(file_metadata, list) and len(file_metadata) > 0:
meta = file_metadata[0]
if isinstance(meta, dict):
tags_dict = meta.get('tags', {})
if isinstance(tags_dict, dict):
# Look for title in storage tags
for service in tags_dict.values():
if isinstance(service, dict):
storage = service.get('storage_tags', {})
if isinstance(storage, dict):
for tag_list in storage.values():
if isinstance(tag_list, list):
for tag in tag_list:
if isinstance(tag, str) and tag.lower().startswith('title:'):
title_val = tag.split(':', 1)[1].strip()
if title_val:
base_name = _sanitize_name(title_val)
break
if base_name != 'export':
break
if base_name != 'export':
break
except Exception:
pass
# Normal file export (happens regardless of -metadata flag)
try:
from downlow_helpers.hydrus import hydrus_export as _hydrus_export
except Exception:
_hydrus_export = None # type: ignore
if _hydrus_export is None:
log("Hydrus export helper unavailable")
return 1
download_dir = out_override if (out_override and out_override.is_dir()) else default_dir
try:
download_dir.mkdir(parents=True, exist_ok=True)
except Exception:
# If mkdir fails, fall back to default_dir
download_dir = default_dir
# Verify the directory is writable; if not, fall back to default
try:
test_file = download_dir / f".downlow_write_test_{_uuid.uuid4().hex[:8]}"
test_file.touch()
test_file.unlink()
except (OSError, PermissionError):
# Directory is not writable, use default_dir instead
download_dir = default_dir
try:
download_dir.mkdir(parents=True, exist_ok=True)
except Exception:
pass
token = (_uuid.uuid4().hex[:8])
provisional_stem = f"{base_name}.dlhx_{token}"
provisional = download_dir / f"{provisional_stem}.bin"
        from types import SimpleNamespace
        args_obj = SimpleNamespace()  # argument object consumed by hydrus_export below
setattr(args_obj, 'output', provisional)
setattr(args_obj, 'format', 'copy')
setattr(args_obj, 'tmp_dir', str(download_dir))
setattr(args_obj, 'metadata_json', None)
setattr(args_obj, 'hydrus_url', get_hydrus_url(config, "home") or "http://localhost:45869")
setattr(args_obj, 'access_key', get_hydrus_access_key(config, "home") or "")
setattr(args_obj, 'timeout', float(config.get('HydrusNetwork_Request_Timeout') or 60.0))
try:
file_url = client.file_url(file_hash)
except Exception:
file_url = None
setattr(args_obj, 'file_url', file_url)
setattr(args_obj, 'file_hash', file_hash)
import io as _io, contextlib as _contextlib
_buf = _io.StringIO()
status = 1
with _contextlib.redirect_stdout(_buf):
status = _hydrus_export(args_obj, None)
        if status != 0:
            # _buf holds the stdout redirected from the export helper
            captured = _buf.getvalue().strip()
            if captured:
                log(captured)
            return status
json_text = _buf.getvalue().strip().splitlines()[-1] if _buf.getvalue() else ''
final_from_json: Optional[Path] = None
try:
payload = json.loads(json_text) if json_text else None
if isinstance(payload, dict):
outp = payload.get('output')
if isinstance(outp, str) and outp:
final_from_json = Path(outp)
except Exception:
final_from_json = None
if final_from_json and final_from_json.exists():
source_path = final_from_json
else:
candidates = [p for p in provisional.parent.glob(provisional_stem + '*') if p.exists() and p.is_file()]
non_provisional = [p for p in candidates if p.suffix.lower() not in {'.bin', '.hydrus'}]
pick_from = non_provisional if non_provisional else candidates
if pick_from:
try:
source_path = max(pick_from, key=lambda p: p.stat().st_mtime)
except Exception:
source_path = pick_from[0]
else:
source_path = provisional
try:
source_size = source_size or (source_path.stat().st_size if source_path.exists() else None)
except OSError:
            pass  # keep whatever size we already had
if duration_sec is None:
duration_sec = _ffprobe_duration_seconds(source_path)
cleanup_source = True
else:
log("Selected result is neither a local file nor a Hydrus record")
return 1
convert = (str(convert_spec or '').strip().lower())
if convert not in {'', 'copy', 'mp4', 'webm', 'audio', 'mp3', 'opus'}:
log(f"Unsupported Convert value: {convert_spec}")
return 1
if not convert:
convert = 'copy'
input_kind = media_kind or _guess_kind_from_suffix(source_path)
if input_kind == 'audio' and convert in {'mp4', 'webm'}:
log("Cannot convert audio to video")
return 1
def _ext_for_convert(conv: str, src: Path) -> str:
if conv == 'mp4':
return '.mp4'
if conv == 'webm':
return '.webm'
if conv in {'audio', 'mp3'}:
return '.mp3'
if conv == 'opus':
return '.opus'
return src.suffix or ''
auto_named = True
if out_override is not None and out_override.exists() and out_override.is_dir():
dest_dir = out_override
dest_ext = _ext_for_convert(convert, source_path)
dest_path = dest_dir / f"{base_name}{dest_ext}"
else:
dest_dir = default_dir
dest_ext = _ext_for_convert(convert, source_path)
if out_override and not out_override.exists() and not str(out_override).endswith(('/', '\\')):
dest_path = out_override
auto_named = False
else:
dest_path = (dest_dir / f"{base_name}{dest_ext}")
if source_size is None:
try:
source_size = source_path.stat().st_size
except OSError:
source_size = None
if source_size is None:
log("Unable to determine source size for sizing logic; proceeding without Size targeting")
target_bytes = None
else:
target_bytes = _compute_target_bytes(size_spec, int(source_size))
if target_bytes and (source_size or 0):
try:
from ..downlow import _fmt_bytes as _fmt_bytes_helper
except ImportError:
try:
from downlow import _fmt_bytes as _fmt_bytes_helper # type: ignore
except ImportError:
_fmt_bytes_helper = lambda x: f"{x} bytes" # type: ignore
except Exception:
_fmt_bytes_helper = lambda x: f"{x} bytes" # type: ignore
ctx.emit(f"Resizing target: {_fmt_bytes_helper(source_size)} -> {_fmt_bytes_helper(target_bytes)}")
if convert == 'copy' and (not target_bytes or target_bytes >= (source_size or 0)):
# Simple copy without FFmpeg processing
# Only skip this if we need to write metadata (then FFmpeg handles it)
if not (export_metadata or (tags_payload and tags_payload.get('metadata'))):
try:
dest_path.parent.mkdir(parents=True, exist_ok=True)
final_dest = _unique_path(dest_path)
_shutil.copy2(source_path, final_dest)
ctx.emit(f"Exported to {final_dest}")
log(f"Exported: {final_dest}", file=sys.stderr)
if cleanup_source:
try:
if source_path.exists() and source_path != final_dest:
source_path.unlink()
except OSError:
pass
return 0
except Exception as exc:
log(f"Copy failed: {exc}")
return 1
else:
# Metadata exists, so we need to go through FFmpeg to embed and write sidecar
# Fall through to FFmpeg section below
pass
convert_effective = convert
if convert == 'copy' and target_bytes and (source_size or 0) > target_bytes:
if input_kind == 'video':
convert_effective = 'mp4'
elif input_kind == 'audio':
convert_effective = 'copy'
else:
convert_effective = convert
ffmpeg_path = _shutil.which('ffmpeg')
if not ffmpeg_path:
log("ffmpeg executable not found in PATH")
return 1
# Extract metadata from tags to embed in file
file_metadata = _extract_metadata_from_tags(tags_payload, file_hash or '', input_kind)
if file_metadata:
metadata_msg = ', '.join(f'{k}={v}' for k, v in file_metadata.items())
ctx.emit(f"[metadata] Embedding: {metadata_msg}")
ctx.print_if_visible(f"[get-file] Embedding metadata: {metadata_msg}", file=sys.stderr)
else:
ctx.print_if_visible(f"[get-file] No metadata tags found to embed", file=sys.stderr)
cmd: list[str] = [ffmpeg_path, '-y', '-i', str(source_path)]
# Add metadata flags to FFmpeg command
for key, value in file_metadata.items():
cmd.extend(['-metadata', f'{key}={value}'])
conv = convert_effective
if conv in {'mp4', 'webm', 'copy'}:
video_bitrate: Optional[int] = None
audio_bitrate: int = 128_000
if target_bytes and duration_sec and duration_sec > 0:
total_bps = max(1, int((target_bytes * 8) / duration_sec))
if total_bps <= audio_bitrate + 50_000:
if input_kind == 'video':
video_bitrate = max(50_000, total_bps - audio_bitrate)
else:
video_bitrate = None
else:
video_bitrate = total_bps - audio_bitrate
if conv == 'webm':
cmd += ['-c:v', 'libvpx-vp9']
if video_bitrate:
cmd += ['-b:v', str(video_bitrate)]
else:
cmd += ['-b:v', '0', '-crf', '32']
cmd += ['-c:a', 'libopus', '-b:a', '160k']
elif conv == 'mp4' or (conv == 'copy' and input_kind == 'video'):
cmd += ['-c:v', 'libx265', '-preset', 'medium', '-tag:v', 'hvc1', '-pix_fmt', 'yuv420p']
if video_bitrate:
cmd += ['-b:v', str(video_bitrate)]
else:
cmd += ['-crf', '26']
cmd += ['-c:a', 'aac', '-b:a', '192k']
if conv == 'mp4' or (conv == 'copy' and input_kind == 'video'):
cmd += ['-movflags', '+faststart']
if convert_spec and conv != 'copy':
ctx.emit(f"Converting video -> {conv} (duration={duration_sec or 'unknown'}s)")
else:
if target_bytes and duration_sec and duration_sec > 0:
total_bps = max(1, int((target_bytes * 8) / duration_sec))
abr = max(32_000, min(320_000, total_bps))
else:
abr = 192_000
if conv in {'audio', 'mp3'}:
cmd += ['-vn', '-c:a', 'libmp3lame', '-b:a', str(abr)]
elif conv == 'opus':
cmd += ['-vn', '-c:a', 'libopus', '-b:a', str(abr)]
else:
ext = (source_path.suffix.lower() if source_path else '')
if ext in {'.mp3'}:
cmd += ['-vn', '-c:a', 'libmp3lame', '-b:a', str(abr)]
elif ext in {'.opus', '.ogg'}:
cmd += ['-vn', '-c:a', 'libopus', '-b:a', str(abr)]
elif ext in {'.m4a', '.aac'}:
cmd += ['-vn', '-c:a', 'aac', '-b:a', str(abr)]
else:
cmd += ['-vn', '-c:a', 'libmp3lame', '-b:a', str(abr)]
if convert_spec and conv != 'copy':
ctx.emit(f"Converting audio -> {conv}")
if conv in {'audio','mp3'}:
desired_ext = '.mp3'
elif conv == 'opus':
desired_ext = '.opus'
elif conv == 'webm':
desired_ext = '.webm'
elif conv == 'mp4':
desired_ext = '.mp4'
else:
desired_ext = source_path.suffix
if (not dest_path.suffix) or auto_named or (dest_path.suffix.lower() in {'.hydrus', '.bin'}):
dest_path = dest_path.with_suffix(desired_ext)
suffix_parts: list[str] = []
def _size_label(raw: Optional[str], tb: Optional[int]) -> Optional[str]:
if not raw:
return None
text = str(raw).strip()
if text.endswith('%'):
return text
if not tb:
return None
mb = int(round(tb / (1024*1024)))
return f"{mb}Mb"
label = _size_label(size_spec, locals().get('target_bytes'))
if label:
suffix_parts.append(label)
if convert_spec and convert.lower() != 'copy':
label_map = {'mp4':'MP4','webm':'WEBM','audio':'AUDIO','mp3':'MP3','opus':'OPUS'}
suffix_parts.append(label_map.get(convert.lower(), convert.upper()))
if suffix_parts and auto_named:
_aug = f"{base_name} (" + ",".join(suffix_parts) + ")"
dest_path = dest_path.with_name(_aug + dest_path.suffix)
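    # Auto-named exports encode the options, e.g. 'Title (50%,MP4).mp4' for Size=50% Convert=mp4.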
try:
dest_path.parent.mkdir(parents=True, exist_ok=True)
final_dest = _unique_path(dest_path)
cmd.append(str(final_dest))
completed = _subprocess.run(cmd, stdout=_subprocess.PIPE, stderr=_subprocess.PIPE, text=True)
if completed.returncode != 0:
stderr = (completed.stderr or '').strip()
log(f"ffmpeg failed ({completed.returncode}): {stderr}")
return 1
ctx.emit(f"Exported to {final_dest}")
log(f"Exported: {final_dest}", file=sys.stderr)
# Always write the .tags sidecar with metadata (hash, tags, URLs)
# This ensures metadata is preserved even if FFmpeg embedding didn't work
try:
metadata_lines = []
# Add hash
if file_hash:
metadata_lines.append(f"hash:{file_hash}")
# Extract tags from metadata payload using correct structure
tags_set = set()
if 'metadata' in tags_payload and tags_payload['metadata']:
entry = tags_payload['metadata'][0]
if 'tags' in entry and isinstance(entry['tags'], dict):
for _service_key, service_data in entry['tags'].items():
if isinstance(service_data, dict):
display_tags = service_data.get('display_tags', {})
if isinstance(display_tags, dict):
current_tags = display_tags.get('0', [])
if isinstance(current_tags, list):
tags_set.update(current_tags)
# Add tags (sorted, no prefix)
for tag in sorted(tags_set):
metadata_lines.append(tag)
# Extract and add URLs
if 'metadata' in urls_payload and urls_payload['metadata']:
entry = urls_payload['metadata'][0]
if 'known_urls' in entry and isinstance(entry['known_urls'], list):
for url in entry['known_urls']:
metadata_lines.append(f"known_url:{url}")
# Write sidecar if we have any metadata
if metadata_lines:
sidecar_path = final_dest.parent / f"{final_dest.name}.tags"
sidecar_path.write_text('\n'.join(metadata_lines), encoding='utf-8')
ctx.emit(f"Sidecar: {sidecar_path.name}")
log(f"Tags file: {sidecar_path}", file=sys.stderr)
except Exception as exc:
log(f"Warning: Could not write metadata sidecar: {exc}", file=sys.stderr)
if cleanup_source:
try:
if source_path.exists() and source_path != final_dest:
source_path.unlink()
except OSError:
pass
return 0
except Exception as exc:
log(f"Export failed: {exc}")
return 1
def _unique_path(p: Path) -> Path:
if not p.exists():
return p
stem = p.stem
suffix = p.suffix
parent = p.parent
for i in range(1, 1000):
candidate = parent / f"{stem} ({i}){suffix}"
if not candidate.exists():
return candidate
return p
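# e.g. if 'export.mp4' exists, _unique_path yields 'export (1).mp4', then 'export (2).mp4', ...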
CMDLET = Cmdlet(
name="get-file",
summary="Export files: from Hydrus database OR from AllDebrid magnets via pipe. Auto-detects source and handles accordingly.",
usage="get-file [-Path <dir>] [Size <50%|34MB>] [Convert <mp4|webm|audio|mp3|opus>] [-metadata] [-file <pattern>]",
args=[
CmdletArg("Path", description="Output directory for files."),
CmdletArg("Size", description="Target size (Hydrus only): 50% or 34MB."),
CmdletArg("Convert", description="Convert format (Hydrus only): mp4, webm, audio, mp3, opus."),
CmdletArg("metadata", type="flag", description="Export metadata to .tags file (Hydrus only)."),
CmdletArg("file", description="Filter files by pattern (AllDebrid only)."),
],
details=[
"Hydrus mode: exports media with optional size/format conversion",
"AllDebrid mode: downloads files from piped magnet IDs from search-debrid",
"Auto-detects pipe format and routes to correct handler",
"Magnet pipe format: ID|filename|size|statusCode|status|progress|...",
],
)
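# Illustrative invocations (values are examples, not fixtures):
#   get-file -path D:/exports convert mp4 size 50%            # Hydrus export, resized + converted
#   search-debrid "query" | get-file -path ~/dl -file "*.mkv" # AllDebrid pipe mode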