from __future__ import annotations from typing import Any, Dict, Optional, Sequence from pathlib import Path import shutil as _shutil import subprocess as _subprocess import json import sys import platform from helper.logger import log, debug import uuid as _uuid import time as _time from helper.progress import print_progress, print_final_progress from helper.http_client import HTTPClient from helper.mpv_ipc import get_ipc_pipe_path, send_to_mpv import fnmatch as _fnmatch from . import register import models import pipeline as ctx from helper import hydrus as hydrus_wrapper from ._shared import Cmdlet, CmdletArg, normalize_hash, looks_like_hash, create_pipe_object_result from config import resolve_output_dir, get_hydrus_url, get_hydrus_access_key from helper.alldebrid import AllDebridClient def _is_alldebrid_pipe_data(line: str) -> bool: """Check if line is AllDebrid pipe format: ID|filename|size|...""" parts = line.strip().split('|') if len(parts) < 5: return False try: # Check if first part is magnet ID (integer) magnet_id = int(parts[0]) # Check if 3rd part (size) is integer size = int(parts[2]) # Check if 4th part (status_code) is integer status_code = int(parts[3]) return magnet_id > 0 and size >= 0 and status_code in {0, 1, 2, 3, 4} except (ValueError, IndexError): return False def _handle_alldebrid_pipe(config: Dict[str, Any], args: Sequence[str]) -> int: """Handle AllDebrid magnet downloads from piped stdin.""" # Parse arguments out_path = None file_filter = None i = 0 while i < len(args): if args[i].lower() in {"-path", "--path", "path"} and i + 1 < len(args): out_path = Path(args[i + 1]).expanduser() i += 2 elif args[i].lower() in {"-file", "--file", "file"} and i + 1 < len(args): file_filter = args[i + 1] i += 2 else: i += 1 if not out_path: log("✗ -path required for AllDebrid downloads", file=sys.stderr) return 1 # Read magnet IDs from stdin magnets = [] try: for line in sys.stdin: line = line.strip() if line and _is_alldebrid_pipe_data(line): parts = line.split('|') magnet_id = int(parts[0]) magnets.append(magnet_id) except Exception as e: log(f"✗ Error reading stdin: {e}", file=sys.stderr) return 1 if not magnets: log("✗ No valid magnet IDs in pipe", file=sys.stderr) return 1 # Get API key from config import get_debrid_api_key api_key = get_debrid_api_key(config) if not api_key: log("✗ AllDebrid API key not configured", file=sys.stderr) return 1 # Download from each magnet client = AllDebridClient(api_key) total_files = 0 failed_files = 0 log(f"Processing {len(magnets)} magnet(s)...", file=sys.stderr) for magnet_id in magnets: try: # Fetch magnet files using magnet_status with include_files magnet_info = client.magnet_status(magnet_id, include_files=True) files_list = _extract_files_from_magnet(magnet_info, file_filter) if not files_list: log(f"⊘ No files in magnet {magnet_id}", file=sys.stderr) continue log(f"✓ Found {len(files_list)} file(s) in magnet {magnet_id}", file=sys.stderr) # Download each file for file_info in files_list: try: link = file_info['link'] filename = file_info['name'] # Unlock link to get direct URL try: direct_url = client.unlock_link(link) if not direct_url: log(f"✗ Failed to unlock link for {filename}", file=sys.stderr) failed_files += 1 continue except Exception as e: log(f"✗ Error unlocking link: {e}", file=sys.stderr) failed_files += 1 continue # Download file output_file = out_path / filename if _download_file_from_alldebrid(direct_url, output_file, filename, file_info['size']): log(f"✓ Downloaded: {filename}", file=sys.stderr) total_files += 1 else: log(f"✗ Failed to download: {filename}", file=sys.stderr) failed_files += 1 except Exception as e: log(f"✗ Error downloading file: {e}", file=sys.stderr) failed_files += 1 except Exception as e: log(f"✗ Error processing magnet {magnet_id}: {e}", file=sys.stderr) failed_files += 1 log(f"✓ Download complete: {total_files} file(s) downloaded, {failed_files} failed", file=sys.stderr) return 0 if failed_files == 0 else 1 def _extract_files_from_magnet(magnet_info: Dict[str, Any], filter_pattern: Optional[str] = None) -> list: """Extract files from magnet file tree, optionally filtering by pattern.""" files = [] def traverse(items: Any, prefix: str = "") -> None: if not isinstance(items, list): return for item in items: if not isinstance(item, dict): continue name = item.get('n', '') link = item.get('l', '') size = item.get('s', 0) entries = item.get('e', []) # File if link: full_path = f"{prefix}/{name}" if prefix else name if filter_pattern is None or _fnmatch.fnmatch(name.lower(), filter_pattern.lower()): files.append({'name': name, 'path': full_path, 'size': size, 'link': link}) # Folder if entries: full_path = f"{prefix}/{name}" if prefix else name traverse(entries, full_path) items = magnet_info.get('files', []) traverse(items) return files def _download_file_from_alldebrid(url: str, output_path: Path, filename: str, file_size: int) -> bool: """Download a single file from AllDebrid with progress bar.""" output_path.parent.mkdir(parents=True, exist_ok=True) try: downloaded = 0 chunk_size = 1024 * 1024 start_time = _time.time() last_update = start_time with HTTPClient(timeout=30.0, headers={'User-Agent': 'downlow/1.0'}) as client: response = client.get(url) response.raise_for_status() with open(output_path, 'wb', buffering=1024*1024) as f: for chunk in response.iter_bytes(chunk_size): if not chunk: break f.write(chunk) downloaded += len(chunk) # Update progress every 0.5 seconds to avoid spam now = _time.time() if now - last_update >= 0.5 or downloaded == file_size: elapsed = now - start_time speed = downloaded / elapsed if elapsed > 0 else 0 print_progress(filename, downloaded, file_size, speed) last_update = now # Print final progress line elapsed = _time.time() - start_time print_final_progress(filename, file_size, elapsed) log(f"✓ {filename} downloaded", file=sys.stderr) return True except Exception as e: log(f"\n[get-file] ✗ Download error: {e}", file=sys.stderr) return False def _is_playable_in_mpv(file_path_or_ext: str, mime_type: Optional[str] = None) -> bool: """Check if file can be played in MPV based on extension or mime type.""" from helper.utils_constant import mime_maps # Check mime type first if provided if mime_type: mime_lower = mime_type.lower() # Simple prefix check for common media types if any(mime_lower.startswith(prefix) for prefix in ['video/', 'audio/', 'image/']): return True # Extract extension if file_path_or_ext.startswith('.'): ext = file_path_or_ext.lower() else: ext = Path(file_path_or_ext).suffix.lower() if not ext: return False # Check if extension is in playable categories playable_categories = ['video', 'audio', 'image', 'image_sequence'] for category in playable_categories: if category in mime_maps: for key, info in mime_maps[category].items(): if info.get('ext', '').lower() == ext: return True return False def _play_in_mpv(file_url: str, file_title: str, is_stream: bool = False, headers: Optional[Dict[str, str]] = None) -> bool: """Play file in MPV using centralized IPC pipe, creating new instance if needed. Returns True on success, False on error. """ try: # First try to send to existing MPV instance if send_to_mpv(file_url, file_title, headers): debug(f"Added to MPV: {file_title}") return True # No existing MPV or pipe unavailable - start new instance ipc_pipe = get_ipc_pipe_path() debug(f"[get-file] Starting new MPV instance (pipe: {ipc_pipe})", file=sys.stderr) # Build command - start MPV without a file initially, just with IPC server cmd = ['mpv', f'--input-ipc-server={ipc_pipe}'] if headers: # Format headers for command line # --http-header-fields="Header1: Val1,Header2: Val2" header_str = ",".join([f"{k}: {v}" for k, v in headers.items()]) cmd.append(f'--http-header-fields={header_str}') # Add --idle flag so MPV stays running and waits for playlist commands cmd.append('--idle') # Detach process to prevent freezing parent CLI kwargs = {} if platform.system() == 'Windows': kwargs['creationflags'] = 0x00000008 # DETACHED_PROCESS _subprocess.Popen(cmd, stdin=_subprocess.DEVNULL, stdout=_subprocess.DEVNULL, stderr=_subprocess.DEVNULL, **kwargs) debug(f"[get-file] Started MPV instance (IPC: {ipc_pipe})", file=sys.stderr) # Give MPV time to start and open IPC pipe # Windows needs more time than Unix wait_time = 1.0 if platform.system() == 'Windows' else 0.5 debug(f"[get-file] Waiting {wait_time}s for MPV to initialize IPC...", file=sys.stderr) _time.sleep(wait_time) # Try up to 3 times to send the file via IPC for attempt in range(3): debug(f"[get-file] Sending file via IPC (attempt {attempt + 1}/3)", file=sys.stderr) if send_to_mpv(file_url, file_title, headers): debug(f"{'Streaming' if is_stream else 'Playing'} in MPV: {file_title}") debug(f"[get-file] Added to new MPV instance (IPC: {ipc_pipe})", file=sys.stderr) return True if attempt < 2: # Wait before retrying _time.sleep(0.3) # IPC send failed after all retries log("Error: Could not send file to MPV via IPC after startup", file=sys.stderr) return False except FileNotFoundError: log("Error: MPV not found. Install mpv to play media files", file=sys.stderr) return False except Exception as e: log(f"Error launching MPV: {e}", file=sys.stderr) return False def _handle_search_result(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Handle a file from search-file results using FileStorage backend.""" try: from helper.file_storage import FileStorage # Helper to get field from both dict and object def get_field(obj: Any, field: str, default: Any = None) -> Any: if isinstance(obj, dict): return obj.get(field, default) else: return getattr(obj, field, default) # Extract file information from ResultItem storage_name = get_field(result, 'origin', None) # Also check for 'source' field (from add-file and other cmdlets) if not storage_name: storage_name = get_field(result, 'source', None) file_hash = get_field(result, 'hash_hex', None) # Also check for file_hash field (from add-file and other cmdlets) if not file_hash: file_hash = get_field(result, 'file_hash', None) file_title = get_field(result, 'title', 'file') mime_type = get_field(result, 'mime', None) file_path = get_field(result, 'target', None) # Also check for 'file_path' field (from add-file and other cmdlets) if not file_path: file_path = get_field(result, 'file_path', None) # Also check for 'path' field (from search-file and other cmdlets) if not file_path: file_path = get_field(result, 'path', None) full_metadata = get_field(result, 'full_metadata', {}) magnet_id = full_metadata.get('magnet_id') if isinstance(full_metadata, dict) else None if not storage_name: log("Error: No storage backend specified in result", file=sys.stderr) return 1 debug(f"[get-file] Retrieving file from storage: {storage_name}", file=sys.stderr) # Handle different storage backends if storage_name.lower() == 'hydrus': return _handle_hydrus_file(file_hash, file_title, config, args, mime_type=mime_type) elif storage_name.lower() == 'local': return _handle_local_file(file_path, file_title, config, args, file_hash=file_hash) elif storage_name.lower() == 'download': # Downloads are local files return _handle_local_file(file_path, file_title, config, args, file_hash=file_hash) elif storage_name.lower() == 'debrid': # Extract magnet_id from result (search-file stores it in full_metadata or as custom attribute) if not magnet_id: magnet_id = get_field(result, 'magnet_id', None) if not magnet_id: log("Error: No magnet ID in debrid result", file=sys.stderr) return 1 return _handle_debrid_file(magnet_id, file_title, config, args) else: log(f"Unknown storage backend: {storage_name}", file=sys.stderr) return 1 except Exception as e: log(f"Error processing search result: {e}", file=sys.stderr) import traceback traceback.print_exc(file=sys.stderr) return 1 def _handle_hydrus_file(file_hash: Optional[str], file_title: str, config: Dict[str, Any], args: Sequence[str], mime_type: Optional[str] = None) -> int: """Handle file from Hydrus - auto-play in MPV if media file, otherwise open web URL.""" if not file_hash: log("Error: No file hash provided", file=sys.stderr) return 1 try: hydrus_url = get_hydrus_url(config) access_key = get_hydrus_access_key(config) if not hydrus_url or not access_key: log("Error: Hydrus not configured", file=sys.stderr) return 1 # Check if it's a playable media file based on filename or mime type is_media = _is_playable_in_mpv(file_title) if not is_media and mime_type: # Check mime type if filename check failed if any(m in mime_type.lower() for m in ['video/', 'audio/', 'image/']): is_media = True force_mpv = any(str(a).lower() in {'-mpv', '--mpv', 'mpv'} for a in args) force_browser = any(str(a).lower() in {'-web', '--web', 'web', '-browser', '--browser'} for a in args) # Check MPV availability from hydrus_health_check import check_mpv_availability mpv_available, _ = check_mpv_availability() # Construct URLs for streaming/viewing # For streaming, we use headers for auth, so we don't put the key in the URL stream_url = f"{hydrus_url}/get_files/file?hash={file_hash}" # For browser, we still need the key in the URL web_url = f"{hydrus_url}/get_files/file?hash={file_hash}&Hydrus-Client-API-Access-Key={access_key}" headers = { "Hydrus-Client-API-Access-Key": access_key } if force_browser: # User explicitly wants browser ipc_pipe = get_ipc_pipe_path() result_dict = create_pipe_object_result( source='hydrus', identifier=file_hash, file_path=web_url, cmdlet_name='get-file', title=file_title, file_hash=file_hash, extra={ 'ipc': ipc_pipe, 'action_type': 'browser', 'web_url': web_url, 'hydrus_url': hydrus_url, 'access_key': access_key } ) ctx.emit(result_dict) try: import webbrowser webbrowser.open(web_url) debug(f"[get-file] Opened in browser: {file_title}", file=sys.stderr) except Exception: pass return 0 elif force_mpv or (is_media and mpv_available): # Auto-play in MPV for media files (if available), or user requested it if _play_in_mpv(stream_url, file_title, is_stream=True, headers=headers): # Show pipe menu instead of emitting result for display # This allows immediate @N selection from the playlist from . import pipe pipe._run(None, [], config) return 0 else: # Fall back to browser try: import webbrowser webbrowser.open(web_url) debug(f"[get-file] Opened in browser instead", file=sys.stderr) except Exception: pass return 0 else: # Not media, open in browser ipc_pipe = get_ipc_pipe_path() result_dict = create_pipe_object_result( source='hydrus', identifier=file_hash, file_path=web_url, cmdlet_name='get-file', title=file_title, file_hash=file_hash, extra={ 'ipc': ipc_pipe, 'action_type': 'browser', 'web_url': web_url, 'hydrus_url': hydrus_url, 'access_key': access_key } ) ctx.emit(result_dict) try: import webbrowser webbrowser.open(web_url) debug(f"[get-file] Opened in browser: {file_title}", file=sys.stderr) except Exception: pass return 0 except Exception as e: log(f"Error handling Hydrus file: {e}", file=sys.stderr) import traceback traceback.print_exc(file=sys.stderr) return 1 def _handle_local_file(file_path: Optional[str], file_title: str, config: Dict[str, Any], args: Sequence[str], file_hash: Optional[str] = None) -> int: """Handle file from local storage - auto-play in MPV if media, otherwise open with default app.""" if not file_path: log("Error: No file path provided", file=sys.stderr) return 1 try: source = Path(file_path) if not source.exists(): log(f"Error: File not found: {file_path}", file=sys.stderr) return 1 # Check for explicit user flags force_mpv = any(str(a).lower() in {'-mpv', '--mpv', 'mpv'} for a in args) force_default = any(str(a).lower() in {'-open', '--open', 'open'} for a in args) # Check if it's a playable media file is_media = _is_playable_in_mpv(str(source)) # Check MPV availability from hydrus_health_check import check_mpv_availability mpv_available, _ = check_mpv_availability() if force_default: # User explicitly wants default application import subprocess as sp import platform import os try: if platform.system() == 'Darwin': # macOS sp.run(['open', file_path]) elif platform.system() == 'Windows': os.startfile(file_path) else: # Linux sp.run(['xdg-open', file_path]) ctx.emit(f"Opened: {file_title}") debug(f"[get-file] Opened {file_title} with default app", file=sys.stderr) return 0 except Exception as e: log(f"Error opening file: {e}", file=sys.stderr) return 1 elif force_mpv or (is_media and mpv_available): # Auto-play in MPV for media files (if available), or user requested it if _play_in_mpv(file_path, file_title, is_stream=False): # Show pipe menu instead of emitting result for display # This allows immediate @N selection from the playlist from . import pipe pipe._run(None, [], config) return 0 else: # Fall back to default application try: import os import platform if platform.system() == 'Darwin': # macOS _subprocess.run(['open', file_path]) elif platform.system() == 'Windows': os.startfile(file_path) else: # Linux _subprocess.run(['xdg-open', file_path]) debug(f"[get-file] Opened with default app instead", file=sys.stderr) except Exception: pass return 0 else: # Not media - open with default application import subprocess as sp import platform import os try: if platform.system() == 'Darwin': # macOS sp.run(['open', file_path]) elif platform.system() == 'Windows': # Use os.startfile for more reliable Windows handling os.startfile(file_path) else: # Linux sp.run(['xdg-open', file_path]) print(f"Opened: {file_title}") debug(f"[get-file] Opened {file_title} with default app", file=sys.stderr) # Emit result for downstream processing result_dict = create_pipe_object_result( source='local', identifier=str(Path(file_path).stem) if file_path else 'unknown', file_path=file_path, cmdlet_name='get-file', title=file_title, file_hash=file_hash, extra={'action_type': 'opened'} ) ctx.emit(result_dict) return 0 except Exception as e: log(f"Error opening file with default app: {e}", file=sys.stderr) return 1 except Exception as e: log(f"Error handling local file: {e}", file=sys.stderr) return 1 def _handle_debrid_file(magnet_id: int, magnet_title: str, config: Dict[str, Any], args: Sequence[str]) -> int: """Handle magnet file from AllDebrid storage - download to local path.""" # Parse output path argument out_path = None i = 0 args_list = [str(a) for a in args] while i < len(args_list): if args_list[i].lower() in {"-path", "--path", "path"} and i + 1 < len(args_list): out_path = Path(args_list[i + 1]).expanduser() i += 2 else: i += 1 if not out_path: log("✗ -Path required for debrid downloads", file=sys.stderr) return 1 # Ensure output directory exists try: out_path.mkdir(parents=True, exist_ok=True) except Exception as e: log(f"✗ Error creating output directory: {e}", file=sys.stderr) return 1 # Get API key from config import get_debrid_api_key api_key = get_debrid_api_key(config) if not api_key: log("✗ AllDebrid API key not configured in config.json", file=sys.stderr) return 1 try: client = AllDebridClient(api_key) debug(f"[get-file] Downloading magnet {magnet_id}: {magnet_title}", file=sys.stderr) # Fetch magnet files try: magnet_info = client.magnet_status(magnet_id, include_files=True) except Exception as e: log(f"✗ Failed to fetch magnet files: {e}", file=sys.stderr) return 1 # Extract files from magnet files_list = _extract_files_from_magnet(magnet_info) if not files_list: log(f"✗ No files in magnet {magnet_id}", file=sys.stderr) return 1 log(f"✓ Found {len(files_list)} file(s) in magnet {magnet_id}", file=sys.stderr) # Download each file total_files = 0 failed_files = 0 for file_info in files_list: try: link = file_info['link'] filename = file_info['name'] file_size = file_info['size'] # Unlock link to get direct URL try: direct_url = client.unlock_link(link) if not direct_url: log(f"✗ Failed to unlock link for {filename}", file=sys.stderr) failed_files += 1 continue except Exception as e: log(f"✗ Error unlocking link: {e}", file=sys.stderr) failed_files += 1 continue # Download file output_file = out_path / filename if _download_file_from_alldebrid(direct_url, output_file, filename, file_size): log(f"✓ Downloaded: {filename}", file=sys.stderr) total_files += 1 else: log(f"✗ Failed to download: {filename}", file=sys.stderr) failed_files += 1 except Exception as e: log(f"✗ Error downloading file: {e}", file=sys.stderr) failed_files += 1 log(f"✓ Download complete: {total_files} file(s) downloaded, {failed_files} failed", file=sys.stderr) if total_files > 0: # Emit result for downstream processing result_dict = create_pipe_object_result( source='debrid', identifier=str(magnet_id), file_path=str(out_path), cmdlet_name='get-file', title=magnet_title, extra={ 'magnet_id': magnet_id, 'files_downloaded': total_files, 'download_dir': str(out_path) } ) ctx.emit(result_dict) return 0 if failed_files == 0 else 1 except Exception as e: log(f"✗ Error processing debrid download: {e}", file=sys.stderr) import traceback traceback.print_exc(file=sys.stderr) return 1 @register(["get-file"]) # primary name def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: # Help: if any help token is present, print CMDLET JSON and exit try: if any(str(a).lower() in {"-?", "/?", "--help", "-h", "help", "--cmdlet"} for a in args): log(json.dumps(CMDLET, ensure_ascii=False, indent=2)) return 0 except Exception: pass # Helper to get field from both dict and object def get_field(obj: Any, field: str, default: Any = None) -> Any: if isinstance(obj, dict): return obj.get(field, default) else: return getattr(obj, field, default) # Check if result is a list (from @N selection) and extract the first item actual_result = result if isinstance(result, list) and len(result) > 0: actual_result = result[0] # Check if this is a FileStorage search result (has origin field indicating a backend) # This handles both dict and ResultItem objects origin = get_field(actual_result, 'origin', None) # Also check for 'source' field (from add-file and other cmdlets) if not origin: origin = get_field(actual_result, 'source', None) if origin and origin.lower() in {'hydrus', 'local', 'debrid', 'alldebrid'}: # This is a search result with explicit origin - handle it via _handle_search_result return _handle_search_result(actual_result, args, config) # Handle ResultItem from search-file via @N selection # The result can be either: # 1. A single ResultItem (direct call) # 2. A list of ResultItems (from @N selection in CLI) result_item = None if result and hasattr(result, '__class__'): if result.__class__.__name__ == 'ResultItem': result_item = result elif isinstance(result, list) and len(result) > 0: # @N selection creates a list, extract the first item if it's a ResultItem if hasattr(result[0], '__class__') and result[0].__class__.__name__ == 'ResultItem': result_item = result[0] if result_item: return _handle_search_result(result_item, args, config) # Handle PipeObject results from previous get-file call (for chaining) if result and isinstance(result, dict) and result.get('action', '').startswith('cmdlet:get-file'): # This is from a previous get-file result - just pass it through # Don't treat it as a new file to play, just emit for pipeline chaining ctx.emit(result) return 0 # Check for AllDebrid pipe input (from search-debrid) # Try to read first line from stdin to detect format first_line = None try: # Try to read one line without blocking if hasattr(sys.stdin, 'readable') and sys.stdin.readable(): first_line = sys.stdin.readline().strip() except Exception: pass if first_line and _is_alldebrid_pipe_data(first_line): # This is AllDebrid pipe data - handle it separately # Put the line back by creating a chain with the rest of stdin import io try: remaining_stdin = sys.stdin.read() except: remaining_stdin = "" sys.stdin = io.StringIO(first_line + '\n' + remaining_stdin) return _handle_alldebrid_pipe(config, args) elif first_line: # Not AllDebrid data, put it back for normal processing import io try: remaining_stdin = sys.stdin.read() except: remaining_stdin = "" sys.stdin = io.StringIO(first_line + '\n' + remaining_stdin) # Helpers def _sanitize_name(text: str) -> str: allowed = [] for ch in text: allowed.append(ch if (ch.isalnum() or ch in {"-", "_", " ", "."}) else " ") return (" ".join("".join(allowed).split()) or "export").strip() def _ffprobe_duration_seconds(path: Path) -> Optional[float]: ffprobe_path = _shutil.which('ffprobe') if not ffprobe_path: return None try: res = _subprocess.run( [ffprobe_path, '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', str(path)], stdout=_subprocess.PIPE, stderr=_subprocess.PIPE, check=True, text=True, ) out = (res.stdout or '').strip() if not out: return None value = float(out) return value if value > 0 else None except Exception: return None def _parse_args(tokens: Sequence[str]) -> tuple[Optional[Path], Optional[str], Optional[str], Optional[str], bool]: out_override: Optional[Path] = None size_spec: Optional[str] = None convert_spec: Optional[str] = None hash_spec: Optional[str] = None export_metadata: bool = False i = 0 while i < len(tokens): t = tokens[i] low = t.lower() if low in {"-path", "--path", "path"} and i + 1 < len(tokens): try: out_override = Path(tokens[i + 1]).expanduser() except Exception: out_override = None i += 2 continue if low in {"size", "-size", "--size"} and i + 1 < len(tokens): size_spec = tokens[i + 1] i += 2 continue if low in {"convert", "-convert", "--convert"} and i + 1 < len(tokens): convert_spec = tokens[i + 1] i += 2 continue if low in {"-hash", "--hash", "hash"} and i + 1 < len(tokens): hash_spec = tokens[i + 1] i += 2 continue if low in {"-metadata", "--metadata", "metadata"}: export_metadata = True i += 1 continue i += 1 return out_override, size_spec, convert_spec, hash_spec, export_metadata def _compute_target_bytes(size_spec: Optional[str], source_bytes: int) -> Optional[int]: if not size_spec: return None text = str(size_spec).strip().lower() if not text: return None if text.endswith('%'): try: pct = float(text[:-1]) except ValueError: return None pct = max(0.0, min(100.0, pct)) target = int(round(source_bytes * (pct / 100.0))) else: val = text if val.endswith('mb'): val = val[:-2] elif val.endswith('m'): val = val[:-1] try: mb = float(val) except ValueError: return None target = int(round(mb * 1024 * 1024)) min_bytes = 1 * 1024 * 1024 if target <= 0: target = min_bytes return min(target, source_bytes) def _guess_kind_from_suffix(path: Path) -> str: sfx = path.suffix.lower() if sfx in {'.mp4', '.mkv', '.webm', '.mov', '.avi', '.flv', '.mpg', '.mpeg', '.ts', '.m4v', '.wmv'}: return 'video' if sfx in {'.mp3', '.flac', '.wav', '.m4a', '.aac', '.ogg', '.opus', '.mka'}: return 'audio' return 'other' def _extract_metadata_from_tags(tags_payload: Dict[str, Any], file_hash: str, input_kind: str = '') -> Dict[str, str]: """Extract common metadata fields from Hydrus tags. Returns a dict mapping FFmpeg metadata keys to values. Supports: title, artist, album, track, date, genre, etc. For audio files, applies sensible defaults: - If no album, uses title as album - If no track, defaults to 1 - album_artist is set to artist value """ metadata = {} # Map of common tag namespaces to FFmpeg metadata keys tag_map = { 'title': 'title', 'artist': 'artist', 'album': 'album', 'track': 'track', 'track_number': 'track', 'date': 'date', 'year': 'date', 'genre': 'genre', 'composer': 'composer', 'comment': 'comment', } if not tags_payload or 'metadata' not in tags_payload or not tags_payload['metadata']: return metadata entry = tags_payload['metadata'][0] if 'tags' not in entry or not isinstance(entry['tags'], dict): return metadata tags_dict = entry['tags'] # Extract metadata from tags for _service_key, service_data in tags_dict.items(): if not isinstance(service_data, dict): continue display_tags = service_data.get('display_tags', {}) if not isinstance(display_tags, dict): continue current_tags = display_tags.get('0', []) if not isinstance(current_tags, list): continue for tag in current_tags: tag_str = str(tag).strip() if ':' in tag_str: namespace, value = tag_str.split(':', 1) namespace = namespace.lower().strip() value = value.strip() if namespace in tag_map and value: ffmpeg_key = tag_map[namespace] # Use first occurrence if ffmpeg_key not in metadata: metadata[ffmpeg_key] = value # Apply sensible defaults for audio files if input_kind == 'audio': # If no album, use title as album if 'album' not in metadata and 'title' in metadata: metadata['album'] = metadata['title'] # If no track, default to 1 if 'track' not in metadata: metadata['track'] = '1' # If no album_artist, use artist if 'artist' in metadata: metadata['album_artist'] = metadata['artist'] return metadata out_override, size_spec, convert_spec, hash_spec, export_metadata = _parse_args(args) default_dir = resolve_output_dir(config) media_kind = (get_field(result, 'media_kind', '') or '').lower() _chk = [] if out_override: _chk.append(f"Path={out_override}") if size_spec: _chk.append(f"Size={size_spec}") if convert_spec: _chk.append(f"Convert={convert_spec}") # Prefer explicit -hash over result hash for logging file_hash_for_log = None if hash_spec and looks_like_hash(hash_spec): file_hash_for_log = normalize_hash(hash_spec) else: hash_value = get_field(result, 'hash_hex', None) file_hash_for_log = normalize_hash(hash_value) if hash_value else None if _chk or file_hash_for_log: msg = "get-file: " + ", ".join(_chk) if _chk else "get-file" if file_hash_for_log: msg = f"{msg} (Hash={file_hash_for_log})" ctx.emit(msg) base_name = _sanitize_name(get_field(result, 'title', None) or '') if not base_name: target_attr = get_field(result, 'target', None) if isinstance(target_attr, str) and target_attr and not target_attr.startswith(('http://', 'https://')): base_name = _sanitize_name(Path(target_attr).stem) else: base_name = 'export' local_target = get_field(result, 'target', None) is_url = isinstance(local_target, str) and local_target.startswith(('http://', 'https://')) # Establish file hash (prefer -hash override when provided and valid) if hash_spec and looks_like_hash(hash_spec): file_hash = normalize_hash(hash_spec) else: file_hash = normalize_hash(get_field(result, 'hash_hex', None)) if get_field(result, 'hash_hex', None) else None source_path: Optional[Path] = None source_size: Optional[int] = None duration_sec: Optional[float] = None tags_payload: Dict[str, Any] = {} urls_payload: Dict[str, Any] = {} cleanup_source: bool = False if isinstance(local_target, str) and not is_url and not (hash_spec and file_hash): p = Path(local_target) if not p.exists(): log(f"File missing: {p}") return 1 source_path = p try: source_size = p.stat().st_size except OSError: source_size = None duration_sec = _ffprobe_duration_seconds(p) if file_hash is None: for sc in (p.with_suffix('.tags'), p.with_suffix('.tags.txt')): try: if sc.exists(): text = sc.read_text(encoding='utf-8', errors='ignore') for line in text.splitlines(): ls = line.strip().lower() if ls.startswith('hash:'): candidate = line.split(':', 1)[1].strip() if ':' in line else '' if looks_like_hash(candidate): file_hash = candidate.lower() break except OSError: pass elif file_hash: try: client = hydrus_wrapper.get_client(config) except Exception as exc: log(f"Hydrus client unavailable: {exc}") return 1 if client is None: log("Hydrus client unavailable") return 1 # Fetch metadata and tags (needed for both -metadata flag and audio tagging) # Fetch tags try: tags_payload = client.fetch_file_metadata(hashes=[file_hash], include_service_keys_to_tags=True) except Exception: tags_payload = {} # Fetch URLs try: urls_payload = client.fetch_file_metadata(hashes=[file_hash], include_file_urls=True) except Exception: urls_payload = {} # Extract title from metadata if base_name is still 'export' if base_name == 'export' and tags_payload: try: file_metadata = tags_payload.get('file_metadata', []) if file_metadata and isinstance(file_metadata, list) and len(file_metadata) > 0: meta = file_metadata[0] if isinstance(meta, dict): tags_dict = meta.get('tags', {}) if isinstance(tags_dict, dict): # Look for title in storage tags for service in tags_dict.values(): if isinstance(service, dict): storage = service.get('storage_tags', {}) if isinstance(storage, dict): for tag_list in storage.values(): if isinstance(tag_list, list): for tag in tag_list: if isinstance(tag, str) and tag.lower().startswith('title:'): title_val = tag.split(':', 1)[1].strip() if title_val: base_name = _sanitize_name(title_val) break if base_name != 'export': break if base_name != 'export': break except Exception: pass # Normal file export (happens regardless of -metadata flag) try: from helper.hydrus import hydrus_export as _hydrus_export except Exception: _hydrus_export = None # type: ignore if _hydrus_export is None: log("Hydrus export helper unavailable") return 1 download_dir = out_override if (out_override and out_override.is_dir()) else default_dir try: download_dir.mkdir(parents=True, exist_ok=True) except Exception: # If mkdir fails, fall back to default_dir download_dir = default_dir # Verify the directory is writable; if not, fall back to default try: test_file = download_dir / f".downlow_write_test_{_uuid.uuid4().hex[:8]}" test_file.touch() test_file.unlink() except (OSError, PermissionError): # Directory is not writable, use default_dir instead download_dir = default_dir try: download_dir.mkdir(parents=True, exist_ok=True) except Exception: pass token = (_uuid.uuid4().hex[:8]) provisional_stem = f"{base_name}.dlhx_{token}" provisional = download_dir / f"{provisional_stem}.bin" class _Args: pass args_obj = _Args() setattr(args_obj, 'output', provisional) setattr(args_obj, 'format', 'copy') setattr(args_obj, 'tmp_dir', str(download_dir)) setattr(args_obj, 'metadata_json', None) setattr(args_obj, 'hydrus_url', get_hydrus_url(config, "home") or "http://localhost:45869") setattr(args_obj, 'access_key', get_hydrus_access_key(config, "home") or "") setattr(args_obj, 'timeout', float(config.get('HydrusNetwork_Request_Timeout') or 60.0)) try: file_url = client.file_url(file_hash) except Exception: file_url = None setattr(args_obj, 'file_url', file_url) setattr(args_obj, 'file_hash', file_hash) import io as _io, contextlib as _contextlib _buf = _io.StringIO() status = 1 with _contextlib.redirect_stdout(_buf): status = _hydrus_export(args_obj, None) if status != 0: stderr_text = _buf.getvalue().strip() if stderr_text: log(stderr_text) return status json_text = _buf.getvalue().strip().splitlines()[-1] if _buf.getvalue() else '' final_from_json: Optional[Path] = None try: payload = json.loads(json_text) if json_text else None if isinstance(payload, dict): outp = payload.get('output') if isinstance(outp, str) and outp: final_from_json = Path(outp) except Exception: final_from_json = None if final_from_json and final_from_json.exists(): source_path = final_from_json else: candidates = [p for p in provisional.parent.glob(provisional_stem + '*') if p.exists() and p.is_file()] non_provisional = [p for p in candidates if p.suffix.lower() not in {'.bin', '.hydrus'}] pick_from = non_provisional if non_provisional else candidates if pick_from: try: source_path = max(pick_from, key=lambda p: p.stat().st_mtime) except Exception: source_path = pick_from[0] else: source_path = provisional candidates = [p for p in provisional.parent.glob(provisional_stem + '*') if p.exists() and p.is_file()] non_provisional = [p for p in candidates if p.suffix.lower() not in {'.bin', '.hydrus'}] pick_from = non_provisional if non_provisional else candidates if pick_from: try: source_path = max(pick_from, key=lambda p: p.stat().st_mtime) except Exception: source_path = pick_from[0] else: source_path = provisional try: source_size = source_size or (source_path.stat().st_size if source_path.exists() else None) except OSError: source_size = source_size if duration_sec is None: duration_sec = _ffprobe_duration_seconds(source_path) cleanup_source = True else: log("Selected result is neither a local file nor a Hydrus record") return 1 convert = (str(convert_spec or '').strip().lower()) if convert not in {'', 'copy', 'mp4', 'webm', 'audio', 'mp3', 'opus'}: log(f"Unsupported Convert value: {convert_spec}") return 1 if not convert: convert = 'copy' input_kind = media_kind or _guess_kind_from_suffix(source_path) if input_kind == 'audio' and convert in {'mp4', 'webm'}: log("Cannot convert audio to video") return 1 def _ext_for_convert(conv: str, src: Path) -> str: if conv == 'mp4': return '.mp4' if conv == 'webm': return '.webm' if conv in {'audio', 'mp3'}: return '.mp3' if conv == 'opus': return '.opus' return src.suffix or '' auto_named = True if out_override is not None and out_override.exists() and out_override.is_dir(): dest_dir = out_override dest_ext = _ext_for_convert(convert, source_path) dest_path = dest_dir / f"{base_name}{dest_ext}" else: dest_dir = default_dir dest_ext = _ext_for_convert(convert, source_path) if out_override and not out_override.exists() and not str(out_override).endswith(('/', '\\')): dest_path = out_override auto_named = False else: dest_path = (dest_dir / f"{base_name}{dest_ext}") if source_size is None: try: source_size = source_path.stat().st_size except OSError: source_size = None if source_size is None: log("Unable to determine source size for sizing logic; proceeding without Size targeting") target_bytes = None else: target_bytes = _compute_target_bytes(size_spec, int(source_size)) if target_bytes and (source_size or 0): try: from ..downlow import _fmt_bytes as _fmt_bytes_helper except ImportError: try: from downlow import _fmt_bytes as _fmt_bytes_helper # type: ignore except ImportError: _fmt_bytes_helper = lambda x: f"{x} bytes" # type: ignore except Exception: _fmt_bytes_helper = lambda x: f"{x} bytes" # type: ignore ctx.emit(f"Resizing target: {_fmt_bytes_helper(source_size)} -> {_fmt_bytes_helper(target_bytes)}") cleanup_source = locals().get('cleanup_source', False) if convert == 'copy' and (not target_bytes or target_bytes >= (source_size or 0)): # Simple copy without FFmpeg processing # Only skip this if we need to write metadata (then FFmpeg handles it) if not (export_metadata or (tags_payload and tags_payload.get('metadata'))): try: dest_path.parent.mkdir(parents=True, exist_ok=True) final_dest = _unique_path(dest_path) _shutil.copy2(source_path, final_dest) ctx.emit(f"Exported to {final_dest}") log(f"Exported: {final_dest}", file=sys.stderr) if cleanup_source: try: if source_path.exists() and source_path != final_dest: source_path.unlink() except OSError: pass return 0 except Exception as exc: log(f"Copy failed: {exc}") return 1 else: # Metadata exists, so we need to go through FFmpeg to embed and write sidecar # Fall through to FFmpeg section below pass convert_effective = convert if convert == 'copy' and target_bytes and (source_size or 0) > target_bytes: if input_kind == 'video': convert_effective = 'mp4' elif input_kind == 'audio': convert_effective = 'copy' else: convert_effective = convert ffmpeg_path = _shutil.which('ffmpeg') if not ffmpeg_path: log("ffmpeg executable not found in PATH") return 1 # Extract metadata from tags to embed in file file_metadata = _extract_metadata_from_tags(tags_payload, file_hash or '', input_kind) if file_metadata: metadata_msg = ', '.join(f'{k}={v}' for k, v in file_metadata.items()) ctx.emit(f"[metadata] Embedding: {metadata_msg}") ctx.print_if_visible(f"[get-file] Embedding metadata: {metadata_msg}", file=sys.stderr) else: ctx.print_if_visible(f"[get-file] No metadata tags found to embed", file=sys.stderr) cmd: list[str] = [ffmpeg_path, '-y', '-i', str(source_path)] # Add metadata flags to FFmpeg command for key, value in file_metadata.items(): cmd.extend(['-metadata', f'{key}={value}']) conv = convert_effective if conv in {'mp4', 'webm', 'copy'}: video_bitrate: Optional[int] = None audio_bitrate: int = 128_000 if target_bytes and duration_sec and duration_sec > 0: total_bps = max(1, int((target_bytes * 8) / duration_sec)) if total_bps <= audio_bitrate + 50_000: if input_kind == 'video': video_bitrate = max(50_000, total_bps - audio_bitrate) else: video_bitrate = None else: video_bitrate = total_bps - audio_bitrate if conv == 'webm': cmd += ['-c:v', 'libvpx-vp9'] if video_bitrate: cmd += ['-b:v', str(video_bitrate)] else: cmd += ['-b:v', '0', '-crf', '32'] cmd += ['-c:a', 'libopus', '-b:a', '160k'] elif conv == 'mp4' or (conv == 'copy' and input_kind == 'video'): cmd += ['-c:v', 'libx265', '-preset', 'medium', '-tag:v', 'hvc1', '-pix_fmt', 'yuv420p'] if video_bitrate: cmd += ['-b:v', str(video_bitrate)] else: cmd += ['-crf', '26'] cmd += ['-c:a', 'aac', '-b:a', '192k'] if conv == 'mp4' or (conv == 'copy' and input_kind == 'video'): cmd += ['-movflags', '+faststart'] if convert_spec and conv != 'copy': ctx.emit(f"Converting video -> {conv} (duration={duration_sec or 'unknown'}s)") else: if target_bytes and duration_sec and duration_sec > 0: total_bps = max(1, int((target_bytes * 8) / duration_sec)) abr = max(32_000, min(320_000, total_bps)) else: abr = 192_000 if conv in {'audio', 'mp3'}: cmd += ['-vn', '-c:a', 'libmp3lame', '-b:a', str(abr)] elif conv == 'opus': cmd += ['-vn', '-c:a', 'libopus', '-b:a', str(abr)] else: ext = (source_path.suffix.lower() if source_path else '') if ext in {'.mp3'}: cmd += ['-vn', '-c:a', 'libmp3lame', '-b:a', str(abr)] elif ext in {'.opus', '.ogg'}: cmd += ['-vn', '-c:a', 'libopus', '-b:a', str(abr)] elif ext in {'.m4a', '.aac'}: cmd += ['-vn', '-c:a', 'aac', '-b:a', str(abr)] else: cmd += ['-vn', '-c:a', 'libmp3lame', '-b:a', str(abr)] if convert_spec and conv != 'copy': ctx.emit(f"Converting audio -> {conv}") if conv in {'audio','mp3'}: desired_ext = '.mp3' elif conv == 'opus': desired_ext = '.opus' elif conv == 'webm': desired_ext = '.webm' elif conv == 'mp4': desired_ext = '.mp4' else: desired_ext = source_path.suffix if (not dest_path.suffix) or auto_named or (dest_path.suffix.lower() in {'.hydrus', '.bin'}): dest_path = dest_path.with_suffix(desired_ext) suffix_parts: list[str] = [] def _size_label(raw: Optional[str], tb: Optional[int]) -> Optional[str]: if not raw: return None text = str(raw).strip() if text.endswith('%'): return text if not tb: return None mb = int(round(tb / (1024*1024))) return f"{mb}Mb" label = _size_label(size_spec, locals().get('target_bytes')) if label: suffix_parts.append(label) if convert_spec and convert.lower() != 'copy': label_map = {'mp4':'MP4','webm':'WEBM','audio':'AUDIO','mp3':'MP3','opus':'OPUS'} suffix_parts.append(label_map.get(convert.lower(), convert.upper())) if suffix_parts and auto_named: _aug = f"{base_name} (" + ",".join(suffix_parts) + ")" dest_path = dest_path.with_name(_aug + dest_path.suffix) try: dest_path.parent.mkdir(parents=True, exist_ok=True) final_dest = _unique_path(dest_path) cmd.append(str(final_dest)) completed = _subprocess.run(cmd, stdout=_subprocess.PIPE, stderr=_subprocess.PIPE, text=True) if completed.returncode != 0: stderr = (completed.stderr or '').strip() log(f"ffmpeg failed ({completed.returncode}): {stderr}") return 1 ctx.emit(f"Exported to {final_dest}") log(f"Exported: {final_dest}", file=sys.stderr) # Always write the .tags sidecar with metadata (hash, tags, URLs) # This ensures metadata is preserved even if FFmpeg embedding didn't work try: metadata_lines = [] # Add hash if file_hash: metadata_lines.append(f"hash:{file_hash}") # Extract tags from metadata payload using correct structure tags_set = set() if 'metadata' in tags_payload and tags_payload['metadata']: entry = tags_payload['metadata'][0] if 'tags' in entry and isinstance(entry['tags'], dict): for _service_key, service_data in entry['tags'].items(): if isinstance(service_data, dict): display_tags = service_data.get('display_tags', {}) if isinstance(display_tags, dict): current_tags = display_tags.get('0', []) if isinstance(current_tags, list): tags_set.update(current_tags) # Add tags (sorted, no prefix) for tag in sorted(tags_set): metadata_lines.append(tag) # Extract and add URLs if 'metadata' in urls_payload and urls_payload['metadata']: entry = urls_payload['metadata'][0] if 'known_urls' in entry and isinstance(entry['known_urls'], list): for url in entry['known_urls']: metadata_lines.append(f"known_url:{url}") # Write sidecar if we have any metadata if metadata_lines: sidecar_path = final_dest.parent / f"{final_dest.name}.tags" sidecar_path.write_text('\n'.join(metadata_lines), encoding='utf-8') ctx.emit(f"Sidecar: {sidecar_path.name}") log(f"Tags file: {sidecar_path}", file=sys.stderr) except Exception as exc: log(f"Warning: Could not write metadata sidecar: {exc}", file=sys.stderr) if cleanup_source: try: if source_path.exists() and source_path != final_dest: source_path.unlink() except OSError: pass return 0 except Exception as exc: log(f"Export failed: {exc}") return 1 def _unique_path(p: Path) -> Path: if not p.exists(): return p stem = p.stem suffix = p.suffix parent = p.parent for i in range(1, 1000): candidate = parent / f"{stem} ({i}){suffix}" if not candidate.exists(): return candidate return p CMDLET = Cmdlet( name="get-file", summary="Export files: from Hydrus database OR from AllDebrid magnets via pipe. Auto-detects source and handles accordingly.", usage="get-file [-Path