Files
Medios-Macina/cmdlet/get_file.py
2025-12-22 02:11:53 -08:00

382 lines
14 KiB
Python

from __future__ import annotations
from typing import Any, Dict, Sequence
from pathlib import Path
import os
import sys
import shutil
import subprocess
import tempfile
import threading
import time
import http.server
from urllib.parse import quote
import webbrowser
from urllib.parse import urljoin
from urllib.request import pathname2url
import pipeline as ctx
from . import _shared as sh
from SYS.logger import log, debug
from Store import Store
from config import resolve_output_dir
class Get_File(sh.Cmdlet):
"""Export files to local path via hash+store."""
def __init__(self) -> None:
"""Initialize get-file cmdlet."""
super().__init__(
name="get-file",
summary="Export file to local path",
usage="@1 | get-file -path C:\\Downloads",
arg=[
sh.SharedArgs.QUERY,
sh.SharedArgs.STORE,
sh.SharedArgs.PATH,
sh.CmdletArg("name", description="Output filename (default: from metadata title)"),
],
detail=[
"- Exports file from storage backend to local path",
"- Uses selected item's hash, or -query \"hash:<sha256>\"",
"- Preserves file extension and metadata",
],
exec=self.run,
)
self.register()
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Export file via hash+store backend."""
debug(f"[get-file] run() called with result type: {type(result)}")
parsed = sh.parse_cmdlet_args(args, self)
debug(f"[get-file] parsed args: {parsed}")
query_hash = sh.parse_single_hash_query(parsed.get("query"))
if parsed.get("query") and not query_hash:
log("Error: -query must be of the form hash:<sha256>")
return 1
# Extract hash and store from result or args
file_hash = query_hash or sh.get_field(result, "hash")
store_name = parsed.get("store") or sh.get_field(result, "store")
output_path = parsed.get("path")
output_name = parsed.get("name")
debug(f"[get-file] file_hash={file_hash} store_name={store_name}")
if not file_hash:
log("Error: No file hash provided (pipe an item or use -query \"hash:<sha256>\")")
return 1
if not store_name:
log("Error: No store name provided")
return 1
# Normalize hash
file_hash = sh.normalize_hash(file_hash)
if not file_hash:
log("Error: Invalid hash format")
return 1
debug(f"[get-file] Getting storage backend: {store_name}")
# Get storage backend
store = Store(config)
backend = store[store_name]
debug(f"[get-file] Backend retrieved: {type(backend).__name__}")
# Get file metadata to determine name and extension
debug(f"[get-file] Getting metadata for hash...")
metadata = backend.get_metadata(file_hash)
if not metadata:
log(f"Error: File metadata not found for hash {file_hash}")
return 1
debug(f"[get-file] Metadata retrieved: title={metadata.get('title')}, ext={metadata.get('ext')}")
def resolve_display_title() -> str:
candidates = [
sh.get_field(result, "title"),
sh.get_field(result, "name"),
sh.get_field(result, "filename"),
(metadata.get("title") if isinstance(metadata, dict) else None),
(metadata.get("name") if isinstance(metadata, dict) else None),
(metadata.get("filename") if isinstance(metadata, dict) else None),
]
for candidate in candidates:
if candidate is None:
continue
text = str(candidate).strip()
if text:
return text
return ""
debug(f"[get-file] Calling backend.get_file({file_hash})")
# Get file from backend (may return Path or URL string depending on backend)
source_path = backend.get_file(file_hash)
debug(f"[get-file] backend.get_file returned: {source_path}")
# Check if backend returned a URL (HydrusNetwork case)
if isinstance(source_path, str) and (source_path.startswith("http://") or source_path.startswith("https://")):
# Hydrus backend returns a URL; open it only for this explicit user action.
try:
webbrowser.open(source_path)
except Exception as exc:
log(f"Error opening browser: {exc}", file=sys.stderr)
else:
debug(f"Opened in browser: {source_path}", file=sys.stderr)
# Emit result for pipeline
ctx.emit({
"hash": file_hash,
"store": store_name,
"url": source_path,
"title": resolve_display_title() or "Opened",
})
return 0
# Otherwise treat as file path (local/folder backends)
if isinstance(source_path, str):
source_path = Path(source_path)
if not source_path or not source_path.exists():
log(f"Error: Backend could not retrieve file for hash {file_hash}")
return 1
# Folder store UX: without -path, just open the file in the default app.
# Only export/copy when -path is explicitly provided.
backend_name = type(backend).__name__
is_folder_backend = backend_name.lower() == "folder"
if is_folder_backend and not output_path:
display_title = resolve_display_title() or source_path.stem or "Opened"
ext_for_emit = metadata.get("ext") or source_path.suffix.lstrip(".")
self._open_file_default(source_path)
log(f"Opened: {source_path}", file=sys.stderr)
ctx.emit({
"hash": file_hash,
"store": store_name,
"path": str(source_path),
"title": str(display_title),
"ext": str(ext_for_emit or ""),
})
debug("[get-file] Completed successfully")
return 0
# Otherwise: export/copy to output_dir.
if output_path:
output_dir = Path(output_path).expanduser()
else:
output_dir = resolve_output_dir(config)
debug(f"[get-file] Output dir: {output_dir}")
output_dir.mkdir(parents=True, exist_ok=True)
# Determine output filename (only when exporting)
if output_name:
filename = output_name
else:
title = (metadata.get("title") if isinstance(metadata, dict) else None) or resolve_display_title() or "export"
filename = self._sanitize_filename(title)
# Add extension if metadata has it
ext = metadata.get("ext")
if ext and not filename.endswith(ext):
if not ext.startswith('.'):
ext = '.' + ext
filename += ext
dest_path = self._unique_path(output_dir / filename)
# Copy file to destination
debug(f"[get-file] Copying {source_path} -> {dest_path}", file=sys.stderr)
shutil.copy2(source_path, dest_path)
log(f"Exported: {dest_path}", file=sys.stderr)
# Emit result for pipeline
ctx.emit({
"hash": file_hash,
"store": store_name,
"path": str(dest_path),
"title": filename,
})
debug(f"[get-file] Completed successfully")
return 0
def _open_file_default(self, path: Path) -> None:
"""Open a local file in the OS default application."""
try:
suffix = str(path.suffix or "").lower()
if sys.platform.startswith("win"):
# On Windows, file associations for common media types can point at
# editors (Paint/VS Code). Prefer opening a localhost URL.
if self._open_local_file_in_browser_via_http(path):
return
if suffix in {".png", ".jpg", ".jpeg", ".gif", ".webp", ".bmp", ".tif", ".tiff", ".svg"}:
# Use default web browser for images.
if self._open_image_in_default_browser(path):
return
if sys.platform.startswith("win"):
os.startfile(str(path)) # type: ignore[attr-defined]
return
if sys.platform == "darwin":
subprocess.Popen(["open", str(path)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return
subprocess.Popen(["xdg-open", str(path)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except Exception as exc:
log(f"Error opening file: {exc}", file=sys.stderr)
def _open_local_file_in_browser_via_http(self, file_path: Path) -> bool:
"""Serve a single local file via localhost HTTP and open in browser.
This avoids Windows file-association issues (e.g., PNG -> Paint, HTML -> VS Code).
The server is bound to 127.0.0.1 on an ephemeral port and is shut down after
a timeout.
"""
try:
resolved = file_path.resolve()
directory = resolved.parent
filename = resolved.name
except Exception:
return False
class OneFileHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *handler_args, **handler_kwargs):
super().__init__(*handler_args, directory=str(directory), **handler_kwargs)
def log_message(self, format: str, *args) -> None: # noqa: A003
# Keep normal output clean.
return
def do_GET(self) -> None: # noqa: N802
if self.path in {"/", ""}:
self.path = "/" + filename
return super().do_GET()
if self.path == "/" + filename or self.path == "/" + quote(filename):
return super().do_GET()
self.send_error(404)
def do_HEAD(self) -> None: # noqa: N802
if self.path in {"/", ""}:
self.path = "/" + filename
return super().do_HEAD()
if self.path == "/" + filename or self.path == "/" + quote(filename):
return super().do_HEAD()
self.send_error(404)
try:
httpd = http.server.ThreadingHTTPServer(("127.0.0.1", 0), OneFileHandler)
except Exception:
return False
port = httpd.server_address[1]
url = f"http://127.0.0.1:{port}/{quote(filename)}"
# Run server in the background.
server_thread = threading.Thread(target=httpd.serve_forever, kwargs={"poll_interval": 0.2}, daemon=True)
server_thread.start()
# Auto-shutdown after a timeout to avoid lingering servers.
def shutdown_later() -> None:
time.sleep(10 * 60)
try:
httpd.shutdown()
except Exception:
pass
try:
httpd.server_close()
except Exception:
pass
threading.Thread(target=shutdown_later, daemon=True).start()
try:
debug(f"[get-file] Opening via localhost: {url}")
return bool(webbrowser.open(url))
except Exception:
return False
def _open_image_in_default_browser(self, image_path: Path) -> bool:
"""Open an image file in the user's default web browser.
We intentionally avoid opening the image path directly on Windows because
file associations may point to editors/viewers (e.g., Paint). Instead we
generate a tiny HTML wrapper and open that (HTML is typically associated
with the default browser).
"""
try:
resolved = image_path.resolve()
image_url = urljoin("file:", pathname2url(str(resolved)))
except Exception:
return False
# Create a stable wrapper filename to reduce temp-file spam.
wrapper_path = Path(tempfile.gettempdir()) / f"medeia-open-image-{resolved.stem}.html"
try:
wrapper_path.write_text(
"\n".join(
[
"<!doctype html>",
"<meta charset=\"utf-8\">",
f"<title>{resolved.name}</title>",
"<style>html,body{margin:0;padding:0;background:#000}img{display:block;max-width:100vw;max-height:100vh;margin:auto}</style>",
f"<img src=\"{image_url}\" alt=\"{resolved.name}\">",
]
),
encoding="utf-8",
)
except Exception:
return False
# Prefer localhost server when possible (reliable on Windows).
if self._open_local_file_in_browser_via_http(image_path):
return True
wrapper_url = wrapper_path.as_uri()
try:
return bool(webbrowser.open(wrapper_url))
except Exception:
return False
def _sanitize_filename(self, name: str) -> str:
"""Sanitize filename by removing invalid characters."""
allowed_chars = []
for ch in str(name):
if ch.isalnum() or ch in {'-', '_', ' ', '.'}:
allowed_chars.append(ch)
else:
allowed_chars.append(' ')
# Collapse multiple spaces
sanitized = ' '.join(''.join(allowed_chars).split())
return sanitized or "export"
def _unique_path(self, path: Path) -> Path:
"""Generate unique path by adding (1), (2), etc. if file exists."""
if not path.exists():
return path
stem = path.stem
suffix = path.suffix
parent = path.parent
counter = 1
while True:
new_path = parent / f"{stem} ({counter}){suffix}"
if not new_path.exists():
return new_path
counter += 1
# Instantiate and register cmdlet
Add_File_Instance = Get_File()