from __future__ import annotations from typing import Any, Dict, Sequence from pathlib import Path import os import sys import shutil import subprocess import webbrowser import pipeline as ctx from ._shared import Cmdlet, CmdletArg, SharedArgs, parse_cmdlet_args, get_field, normalize_hash from SYS.logger import log, debug from Store import Store from config import resolve_output_dir class Get_File(Cmdlet): """Export files to local path via hash+store.""" def __init__(self) -> None: """Initialize get-file cmdlet.""" super().__init__( name="get-file", summary="Export file to local path", usage="@1 | get-file -path C:\\Downloads", arg=[ SharedArgs.HASH, SharedArgs.STORE, SharedArgs.PATH, CmdletArg("name", description="Output filename (default: from metadata title)"), ], detail=[ "- Exports file from storage backend to local path", "- Uses hash+store to retrieve file", "- Preserves file extension and metadata", ], exec=self.run, ) self.register() def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Export file via hash+store backend.""" debug(f"[get-file] run() called with result type: {type(result)}") parsed = parse_cmdlet_args(args, self) debug(f"[get-file] parsed args: {parsed}") # Extract hash and store from result or args file_hash = parsed.get("hash") or get_field(result, "hash") store_name = parsed.get("store") or get_field(result, "store") output_path = parsed.get("path") output_name = parsed.get("name") debug(f"[get-file] file_hash={file_hash[:12] if file_hash else None}... store_name={store_name}") if not file_hash: log("Error: No file hash provided") return 1 if not store_name: log("Error: No store name provided") return 1 # Normalize hash file_hash = normalize_hash(file_hash) if not file_hash: log("Error: Invalid hash format") return 1 debug(f"[get-file] Getting storage backend: {store_name}") # Get storage backend store = Store(config) backend = store[store_name] debug(f"[get-file] Backend retrieved: {type(backend).__name__}") # Get file metadata to determine name and extension debug(f"[get-file] Getting metadata for hash...") metadata = backend.get_metadata(file_hash) if not metadata: log(f"Error: File metadata not found for hash {file_hash[:12]}...") return 1 debug(f"[get-file] Metadata retrieved: title={metadata.get('title')}, ext={metadata.get('ext')}") def resolve_display_title() -> str: candidates = [ get_field(result, "title"), get_field(result, "name"), get_field(result, "filename"), (metadata.get("title") if isinstance(metadata, dict) else None), (metadata.get("name") if isinstance(metadata, dict) else None), (metadata.get("filename") if isinstance(metadata, dict) else None), ] for candidate in candidates: if candidate is None: continue text = str(candidate).strip() if text: return text return "" debug(f"[get-file] Calling backend.get_file({file_hash[:12]}...)") # Get file from backend (may return Path or URL string depending on backend) source_path = backend.get_file(file_hash) debug(f"[get-file] backend.get_file returned: {source_path}") # Check if backend returned a URL (HydrusNetwork case) if isinstance(source_path, str) and (source_path.startswith("http://") or source_path.startswith("https://")): # Hydrus backend returns a URL; open it only for this explicit user action. try: webbrowser.open(source_path) except Exception as exc: log(f"Error opening browser: {exc}", file=sys.stderr) else: log(f"Opened in browser: {source_path}", file=sys.stderr) # Emit result for pipeline ctx.emit({ "hash": file_hash, "store": store_name, "url": source_path, "title": resolve_display_title() or "Opened", }) return 0 # Otherwise treat as file path (local/folder backends) if isinstance(source_path, str): source_path = Path(source_path) if not source_path or not source_path.exists(): log(f"Error: Backend could not retrieve file for hash {file_hash[:12]}...") return 1 # Folder store UX: without -path, just open the file in the default app. # Only export/copy when -path is explicitly provided. backend_name = type(backend).__name__ is_folder_backend = backend_name.lower() == "folder" if is_folder_backend and not output_path: display_title = resolve_display_title() or source_path.stem or "Opened" ext_for_emit = metadata.get("ext") or source_path.suffix.lstrip(".") self._open_file_default(source_path) log(f"Opened: {source_path}", file=sys.stderr) ctx.emit({ "hash": file_hash, "store": store_name, "path": str(source_path), "title": str(display_title), "ext": str(ext_for_emit or ""), }) debug("[get-file] Completed successfully") return 0 # Otherwise: export/copy to output_dir. if output_path: output_dir = Path(output_path).expanduser() else: output_dir = resolve_output_dir(config) debug(f"[get-file] Output dir: {output_dir}") output_dir.mkdir(parents=True, exist_ok=True) # Determine output filename (only when exporting) if output_name: filename = output_name else: title = (metadata.get("title") if isinstance(metadata, dict) else None) or resolve_display_title() or "export" filename = self._sanitize_filename(title) # Add extension if metadata has it ext = metadata.get("ext") if ext and not filename.endswith(ext): if not ext.startswith('.'): ext = '.' + ext filename += ext dest_path = self._unique_path(output_dir / filename) # Copy file to destination debug(f"[get-file] Copying {source_path} -> {dest_path}", file=sys.stderr) shutil.copy2(source_path, dest_path) log(f"Exported: {dest_path}", file=sys.stderr) # Emit result for pipeline ctx.emit({ "hash": file_hash, "store": store_name, "path": str(dest_path), "title": filename, }) debug(f"[get-file] Completed successfully") return 0 def _open_file_default(self, path: Path) -> None: """Open a local file in the OS default application.""" try: if sys.platform.startswith("win"): os.startfile(str(path)) # type: ignore[attr-defined] return if sys.platform == "darwin": subprocess.Popen(["open", str(path)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return subprocess.Popen(["xdg-open", str(path)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except Exception as exc: log(f"Error opening file: {exc}", file=sys.stderr) def _sanitize_filename(self, name: str) -> str: """Sanitize filename by removing invalid characters.""" allowed_chars = [] for ch in str(name): if ch.isalnum() or ch in {'-', '_', ' ', '.'}: allowed_chars.append(ch) else: allowed_chars.append(' ') # Collapse multiple spaces sanitized = ' '.join(''.join(allowed_chars).split()) return sanitized or "export" def _unique_path(self, path: Path) -> Path: """Generate unique path by adding (1), (2), etc. if file exists.""" if not path.exists(): return path stem = path.stem suffix = path.suffix parent = path.parent counter = 1 while True: new_path = parent / f"{stem} ({counter}){suffix}" if not new_path.exists(): return new_path counter += 1 # Instantiate and register cmdlet Add_File_Instance = Get_File()