# Files
# Medios-Macina/cmdlet/get_file.py
# 2026-01-23 21:32:34 -08:00
#
# 452 lines
# 16 KiB
# Python

from __future__ import annotations
from typing import Any, Dict, Sequence
from pathlib import Path
import os
import sys
import shutil
import subprocess
import tempfile
import threading
import time
import http.server
from urllib.parse import quote
import webbrowser
from urllib.parse import urljoin
from urllib.request import pathname2url
from SYS import pipeline as ctx
from . import _shared as sh
from SYS.logger import log, debug
from Store import Store
from SYS.config import resolve_output_dir
from API.HTTP import _download_direct_file
class Get_File(sh.Cmdlet):
"""Export files to local path via hash+store."""
def __init__(self) -> None:
    """Describe the ``get-file`` cmdlet to the base class and register it."""
    # Arguments the cmdlet accepts: the shared query/store/path flags plus
    # an optional explicit output filename.
    accepted_args = [
        sh.SharedArgs.QUERY,
        sh.SharedArgs.STORE,
        sh.SharedArgs.PATH,
        sh.CmdletArg(
            "name",
            description="Output filename (default: from metadata title)",
        ),
    ]
    # Free-form help lines passed as the cmdlet's ``detail``.
    help_lines = [
        "- Exports file from storage backend to local path",
        '- Uses selected item\'s hash, or -query "hash:<sha256>"',
        "- Preserves file extension and metadata",
    ]
    super().__init__(
        name="get-file",
        summary="Export file to local path",
        usage="@1 | get-file -path ./output",
        arg=accepted_args,
        detail=help_lines,
        exec=self.run,
    )
    self.register()
def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Export the file identified by hash+store, or open its URL in a browser.

    The hash is taken from ``-query "hash:<sha256>"`` or from the piped item's
    ``hash`` field; the store from ``-store`` or the item's ``store`` field.
    When no ``-path`` is given the backend is asked for a URL (``url=True``);
    if it answers with an http(s) URL, that URL is opened in the web browser
    and emitted.  Otherwise the file is copied (local source) or downloaded
    (remote URL) into the output directory and the resulting path is emitted.

    Args:
        result: Piped pipeline item, read through ``sh.get_field``.
        args: Raw cmdlet arguments, parsed with ``sh.parse_cmdlet_args``.
        config: Configuration handed to the backend lookup and to
            ``resolve_output_dir``.

    Returns:
        0 on success, 1 when a required input is missing or the file cannot
        be retrieved (a message is logged in that case).
    """
    debug(f"[get-file] run() called with result type: {type(result)}")
    parsed = sh.parse_cmdlet_args(args, self)
    debug(f"[get-file] parsed args: {parsed}")

    query_hash = sh.parse_single_hash_query(parsed.get("query"))
    if parsed.get("query") and not query_hash:
        log("Error: -query must be of the form hash:<sha256>")
        return 1

    # Extract hash and store from result or args
    file_hash = query_hash or sh.get_field(result, "hash")
    store_name = parsed.get("store") or sh.get_field(result, "store")
    output_path = parsed.get("path")
    output_name = parsed.get("name")
    debug(f"[get-file] file_hash={file_hash} store_name={store_name}")

    if not file_hash:
        log(
            'Error: No file hash provided (pipe an item or use -query "hash:<sha256>")'
        )
        return 1
    if not store_name:
        log("Error: No store name provided")
        return 1

    # Normalize hash
    file_hash = sh.normalize_hash(file_hash)
    if not file_hash:
        log("Error: Invalid hash format")
        return 1

    debug(f"[get-file] Getting storage backend: {store_name}")
    # Prefer instantiating only the named backend to avoid initializing all configured backends
    try:
        from Store.registry import get_backend_instance

        backend = get_backend_instance(config, store_name, suppress_debug=True)
    except Exception:
        backend = None
    if backend is None:
        # Fallback to full registry when targeted instantiation fails
        try:
            store = Store(config)
            backend = store[store_name]
        except Exception:
            log(f"Error: Storage backend '{store_name}' not found", file=sys.stderr)
            return 1
    debug(f"[get-file] Backend retrieved: {type(backend).__name__}")

    # Get file metadata to determine name and extension
    debug("[get-file] Getting metadata for hash...")
    metadata = backend.get_metadata(file_hash)
    if not metadata:
        log(f"Error: File metadata not found for hash {file_hash}")
        return 1
    debug(
        f"[get-file] Metadata retrieved: title={metadata.get('title')}, ext={metadata.get('ext')}"
    )

    def resolve_display_title() -> str:
        """Return the first non-blank title/name/filename, item fields first."""
        meta = metadata if isinstance(metadata, dict) else {}
        keys = ("title", "name", "filename")
        candidates = [sh.get_field(result, key) for key in keys]
        candidates += [meta.get(key) for key in keys]
        for candidate in candidates:
            if candidate is None:
                continue
            text = str(candidate).strip()
            if text:
                return text
        return ""

    debug(f"[get-file] Calling backend.get_file({file_hash})")
    # Get file from backend (may return Path or URL string depending on backend).
    # We pass url=True if no explicit path was provided, which hints the backend
    # (specifically Hydrus) to return a browser-friendly URL instead of a local path.
    want_url = output_path is None
    debug(f"[get-file] Requesting file from backend (url_hint={want_url})...")
    source_path = backend.get_file(file_hash, url=want_url)
    debug(f"[get-file] backend.get_file returned: {source_path}")

    download_url = None
    if isinstance(source_path, str):
        if source_path.startswith(("http://", "https://")):
            download_url = source_path
        else:
            source_path = Path(source_path)

    if download_url and output_path is None:
        # Hydrus backend returns a URL; open it only when no output path
        try:
            webbrowser.open(download_url)
        except Exception as exc:
            log(f"Error opening browser: {exc}", file=sys.stderr)
        else:
            debug(f"Opened in browser: {download_url}", file=sys.stderr)
        ctx.emit(
            {
                "hash": file_hash,
                "store": store_name,
                "url": download_url,
                "title": resolve_display_title() or "Opened",
            }
        )
        return 0

    if download_url is None:
        if not source_path or not source_path.exists():
            log(f"Error: Backend could not retrieve file for hash {file_hash}")
            return 1

    # Otherwise: export/copy to output_dir.
    if output_path:
        output_dir = Path(output_path).expanduser()
    else:
        output_dir = resolve_output_dir(config)
    debug(f"[get-file] Output dir: {output_dir}")
    output_dir.mkdir(parents=True, exist_ok=True)

    # Determine output filename (only when exporting)
    if output_name:
        filename = output_name
    else:
        title = (
            (metadata.get("title") if isinstance(metadata, dict) else None)
            or resolve_display_title()
            or "export"
        )
        filename = self._sanitize_filename(title)

    # Add the metadata extension unless the name already carries it.  The
    # extension is normalised to ".ext" *before* the comparison so a dot-less
    # "mp4" is not mistaken for the tail of a name such as "summp4", and the
    # comparison ignores case so "photo.JPG" does not become "photo.JPG.jpg".
    ext = metadata.get("ext")
    ext = str(ext).strip() if ext else ""
    if ext and not ext.startswith("."):
        ext = "." + ext
    # len(ext) > 1 skips a bare "." that would only add a trailing dot.
    if len(ext) > 1 and not filename.lower().endswith(ext.lower()):
        filename += ext

    dest_path: Path
    if download_url:
        downloaded = _download_direct_file(
            download_url,
            output_dir,
            quiet=True,
            suggested_filename=filename,
        )
        dest_path = downloaded.path
        debug(f"[get-file] Downloaded remote file to {dest_path}", file=sys.stderr)
    else:
        dest_path = self._unique_path(output_dir / filename)
        # Copy file to destination
        debug(f"[get-file] Copying {source_path} -> {dest_path}", file=sys.stderr)
        shutil.copy2(source_path, dest_path)

    log(f"Exported: {dest_path}", file=sys.stderr)
    # Emit result for pipeline
    ctx.emit(
        {
            "hash": file_hash,
            "store": store_name,
            "path": str(dest_path),
            "title": filename,
        }
    )
    debug("[get-file] Completed successfully")
    return 0
def _open_file_default(self, path: Path) -> None:
    """Open *path* with a browser or the platform's default opener.

    Any exception raised while trying is logged rather than propagated.
    """
    # Image types that are routed to the web browser.
    browser_image_suffixes = (
        ".png",
        ".jpg",
        ".jpeg",
        ".gif",
        ".webp",
        ".bmp",
        ".tif",
        ".tiff",
        ".svg",
    )
    try:
        extension = str(path.suffix or "").lower()
        on_windows = sys.platform.startswith("win")

        # On Windows, file associations for common media types can point at
        # editors (Paint/VS Code). Prefer opening a localhost URL.
        if on_windows and self._open_local_file_in_browser_via_http(path):
            return

        # Use default web browser for images.
        if extension in browser_image_suffixes and self._open_image_in_default_browser(path):
            return

        if on_windows:
            os.startfile(str(path))  # type: ignore[attr-defined]
            return

        # macOS ships "open"; every other platform falls back to "xdg-open".
        launcher = "open" if sys.platform == "darwin" else "xdg-open"
        subprocess.Popen(
            [launcher, str(path)],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception as exc:
        log(f"Error opening file: {exc}", file=sys.stderr)
def _open_local_file_in_browser_via_http(self, file_path: Path) -> bool:
    """Serve a single local file via localhost HTTP and open it in the browser.

    This avoids Windows file-association issues (e.g., PNG -> Paint,
    HTML -> VS Code).  The server is bound to 127.0.0.1 on an ephemeral port
    and is shut down after ten minutes, or immediately when the browser
    could not be opened.

    Args:
        file_path: File to serve; resolved before use.

    Returns:
        True if ``webbrowser.open`` reported success, False if the path could
        not be resolved, the server could not be created, or the browser
        could not be opened.
    """
    try:
        resolved = file_path.resolve()
        directory = resolved.parent
        filename = resolved.name
    except Exception:
        return False

    # The base handler percent-decodes the request path and cuts it at "?"
    # and "#".  Always hand it the percent-encoded form so that names which
    # contain "%", "?" or "#" map back to exactly this file and never to a
    # sibling in the same directory.
    served_path = "/" + quote(filename)
    accepted_paths = {"/", "", "/" + filename, served_path}

    class OneFileHandler(http.server.SimpleHTTPRequestHandler):
        """Handler that serves only the one file; everything else is a 404."""

        def __init__(self, *handler_args, **handler_kwargs):
            super().__init__(
                *handler_args,
                directory=str(directory),
                **handler_kwargs
            )

        def log_message(self, format: str, *args) -> None:  # noqa: A003
            # Keep normal output clean.
            return

        def _serve_only_file(self, delegate) -> None:
            """Rewrite an accepted request to the canonical path and serve it."""
            if self.path not in accepted_paths:
                self.send_error(404)
                return
            self.path = served_path
            delegate()

        def do_GET(self) -> None:  # noqa: N802
            self._serve_only_file(super().do_GET)

        def do_HEAD(self) -> None:  # noqa: N802
            self._serve_only_file(super().do_HEAD)

    try:
        httpd = http.server.ThreadingHTTPServer(("127.0.0.1", 0), OneFileHandler)
    except Exception:
        return False

    port = httpd.server_address[1]
    url = f"http://127.0.0.1:{port}{served_path}"

    # Run server in the background.
    server_thread = threading.Thread(
        target=httpd.serve_forever,
        kwargs={"poll_interval": 0.2},
        daemon=True,
    )
    server_thread.start()

    def stop_server() -> None:
        """Best-effort shutdown; errors are ignored on purpose."""
        try:
            httpd.shutdown()
        except Exception:
            pass
        try:
            httpd.server_close()
        except Exception:
            pass

    # Auto-shutdown after a timeout to avoid lingering servers.
    shutdown_timer = threading.Timer(10 * 60, stop_server)
    shutdown_timer.daemon = True
    shutdown_timer.start()

    try:
        debug(f"[get-file] Opening via localhost: {url}")
        opened = bool(webbrowser.open(url))
    except Exception:
        opened = False

    if not opened:
        # Nobody will request the file, so release the port right away
        # instead of keeping the server alive for the full timeout.
        shutdown_timer.cancel()
        stop_server()
    return opened
def _open_image_in_default_browser(self, image_path: Path) -> bool:
    """Open an image file in the user's default web browser.

    We intentionally avoid opening the image path directly on Windows because
    file associations may point to editors/viewers (e.g., Paint).  The image
    is first offered through the localhost server; only if that fails is a
    tiny HTML wrapper written to the temp directory and opened instead (HTML
    is typically associated with the default browser).

    Args:
        image_path: Image to display; resolved before use.

    Returns:
        True if one of the two routes reported success, False otherwise.
    """
    import html  # local import: only the wrapper markup needs it

    try:
        resolved = image_path.resolve()
        image_url = urljoin("file:", pathname2url(str(resolved)))
    except Exception:
        return False

    # Prefer localhost server when possible (reliable on Windows).  It is
    # tried before the wrapper is written so that a temp-dir write failure
    # cannot block it and no wrapper file is created when it is not needed.
    if self._open_local_file_in_browser_via_http(image_path):
        return True

    # Create a stable wrapper filename to reduce temp-file spam.
    wrapper_path = Path(
        tempfile.gettempdir()
    ) / f"medeia-open-image-{resolved.stem}.html"

    # File names may contain "<", "&" or quotes; escape them so they cannot
    # break (or inject into) the generated markup.
    safe_name = html.escape(resolved.name, quote=True)
    safe_url = html.escape(image_url, quote=True)
    try:
        wrapper_path.write_text(
            "\n".join(
                [
                    "<!doctype html>",
                    '<meta charset="utf-8">',
                    f"<title>{safe_name}</title>",
                    "<style>html,body{margin:0;padding:0;background:#000}img{display:block;max-width:100vw;max-height:100vh;margin:auto}</style>",
                    f'<img src="{safe_url}" alt="{safe_name}">',
                ]
            ),
            encoding="utf-8",
        )
    except Exception:
        return False

    try:
        return bool(webbrowser.open(wrapper_path.as_uri()))
    except Exception:
        return False
def _sanitize_filename(self, name: str) -> str:
"""Sanitize filename by removing invalid characters."""
allowed_chars = []
for ch in str(name):
if ch.isalnum() or ch in {"-",
"_",
" ",
"."}:
allowed_chars.append(ch)
else:
allowed_chars.append(" ")
# Collapse multiple spaces
sanitized = " ".join("".join(allowed_chars).split())
return sanitized or "export"
def _unique_path(self, path: Path) -> Path:
"""Generate unique path by adding (1), (2), etc. if file exists."""
if not path.exists():
return path
stem = path.stem
suffix = path.suffix
parent = path.parent
counter = 1
while True:
new_path = parent / f"{stem} ({counter}){suffix}"
if not new_path.exists():
return new_path
counter += 1
# Instantiate the cmdlet once; Get_File.__init__ registers it.
Get_File_Instance = Get_File()
# Backward-compatible alias: the instance was previously exposed only under
# this name (apparently copied from another cmdlet), so keep it importable.
Add_File_Instance = Get_File_Instance