from __future__ import annotations

from typing import Any, Dict, Sequence, Optional
import json
import sys
from helper.logger import log
from pathlib import Path
import mimetypes
import os

from helper import hydrus as hydrus_wrapper
from helper.local_library import LocalLibraryDB
from ._shared import Cmdlet, CmdletArg, normalize_hash
from config import get_local_storage_path
import pipeline as ctx
from result_table import ResultTable


def _extract_imported_ts(meta: Dict[str, Any]) -> Optional[int]:
    """Extract an imported timestamp from Hydrus metadata if available."""
    if not isinstance(meta, dict):
        return None
    # Prefer explicit time_imported if present
    explicit = meta.get("time_imported")
    if isinstance(explicit, (int, float)):
        return int(explicit)
    file_services = meta.get("file_services")
    if isinstance(file_services, dict):
        current = file_services.get("current")
        if isinstance(current, dict):
            numeric = [int(v) for v in current.values() if isinstance(v, (int, float))]
            if numeric:
                return min(numeric)
    return None


def _format_imported(ts: Optional[int]) -> str:
    if not ts:
        return ""
    try:
        import datetime as _dt

        return _dt.datetime.utcfromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
    except Exception:
        return ""


def _build_table_row(
    title: str,
    origin: str,
    path: str,
    mime: str,
    size_bytes: Optional[int],
    dur_seconds: Optional[int],
    imported_ts: Optional[int],
    urls: list[str],
    hash_value: Optional[str],
    pages: Optional[int] = None,
) -> Dict[str, Any]:
    size_mb = None
    if isinstance(size_bytes, int):
        try:
            size_mb = int(size_bytes / (1024 * 1024))
        except Exception:
            size_mb = None
    dur_int = int(dur_seconds) if isinstance(dur_seconds, (int, float)) else None
    pages_int = int(pages) if isinstance(pages, (int, float)) else None
    imported_label = _format_imported(imported_ts)
    duration_label = "Duration(s)"
    duration_value = str(dur_int) if dur_int is not None else ""
    if mime and mime.lower().startswith("application/pdf"):
        duration_label = "Pages"
        duration_value = str(pages_int) if pages_int is not None else ""
    columns = [
        ("Title", title or ""),
        ("Hash", hash_value or ""),
        ("MIME", mime or ""),
        ("Size(MB)", str(size_mb) if size_mb is not None else ""),
        (duration_label, duration_value),
        ("Imported", imported_label),
        ("Store", origin or ""),
    ]
    return {
        "title": title or path,
        "path": path,
        "origin": origin,
        "mime": mime,
        "size_bytes": size_bytes,
        "duration_seconds": dur_int,
        "pages": pages_int,
        "imported_ts": imported_ts,
        "imported": imported_label,
        "hash": hash_value,
        "known_urls": urls,
        "columns": columns,
    }


def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
    # Help
    try:
        if any(str(a).lower() in {"-?", "/?", "--help", "-h", "help", "--cmdlet"} for a in _args):
            log(json.dumps(CMDLET.to_dict(), ensure_ascii=False, indent=2))
            return 0
    except Exception:
        pass

    # Helper to get field from both dict and object
    def get_field(obj: Any, field: str, default: Any = None) -> Any:
        if isinstance(obj, dict):
            return obj.get(field, default)
        else:
            return getattr(obj, field, default)

    # Parse -hash override
    override_hash: str | None = None
    args_list = list(_args)
    i = 0
    while i < len(args_list):
        a = args_list[i]
        low = str(a).lower()
        if low in {"-hash", "--hash", "hash"} and i + 1 < len(args_list):
            override_hash = str(args_list[i + 1]).strip()
            break
        i += 1

    # Try to determine if this is a local file or Hydrus file
    local_path = get_field(result, "target", None) or get_field(result, "path", None)
    is_local = False
    if local_path and isinstance(local_path, str) and not local_path.startswith(("http://", "https://")):
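        # Heuristic: anything that is not an http(s) URL is treated as a
        # candidate local filesystem path and resolved below.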
        is_local = True

    # LOCAL FILE PATH
    if is_local and local_path:
        try:
            file_path = Path(str(local_path))
            if file_path.exists() and file_path.is_file():
                # Get the hash from result or compute it
                hash_hex = normalize_hash(override_hash) if override_hash else normalize_hash(get_field(result, "hash_hex", None))

                # If no hash, compute SHA256 of the file
                if not hash_hex:
                    try:
                        import hashlib

                        with open(file_path, 'rb') as f:
                            hash_hex = hashlib.sha256(f.read()).hexdigest()
                    except Exception:
                        hash_hex = None

                # Get MIME type
                mime_type, _ = mimetypes.guess_type(str(file_path))
                if not mime_type:
                    mime_type = "unknown"

                # Pull metadata from local DB if available (for imported timestamp, duration, etc.)
                db_metadata = None
                library_root = get_local_storage_path(config)
                if library_root:
                    try:
                        with LocalLibraryDB(library_root) as db:
                            db_metadata = db.get_metadata(file_path) or None
                    except Exception:
                        db_metadata = None

                # Get file size (prefer DB size if present)
                file_size = None
                if isinstance(db_metadata, dict) and isinstance(db_metadata.get("size"), int):
                    file_size = db_metadata.get("size")
                else:
                    try:
                        file_size = file_path.stat().st_size
                    except Exception:
                        file_size = None

                # Duration/pages
                duration_seconds = None
                pages = None
                if isinstance(db_metadata, dict):
                    if isinstance(db_metadata.get("duration"), (int, float)):
                        duration_seconds = float(db_metadata.get("duration"))
                    if isinstance(db_metadata.get("pages"), (int, float)):
                        pages = int(db_metadata.get("pages"))
                if duration_seconds is None and mime_type and mime_type.startswith("video"):
                    try:
                        import subprocess

                        result_proc = subprocess.run(
                            [
                                "ffprobe", "-v", "error",
                                "-select_streams", "v:0",
                                "-show_entries", "format=duration",
                                "-of", "default=noprint_wrappers=1:nokey=1",
                                str(file_path),
                            ],
                            capture_output=True,
                            text=True,
                            timeout=5,
                        )
                        if result_proc.returncode == 0 and result_proc.stdout.strip():
                            duration_seconds = float(result_proc.stdout.strip())
                    except Exception:
                        pass

                # Known URLs from sidecar or result
                urls = []
                sidecar_path = Path(str(file_path) + '.tags')
                if sidecar_path.exists():
                    try:
                        with open(sidecar_path, 'r', encoding='utf-8') as f:
                            for line in f:
                                line = line.strip()
                                if line.startswith('known_url:'):
                                    url_value = line.replace('known_url:', '', 1).strip()
                                    if url_value:
                                        urls.append(url_value)
                    except Exception:
                        pass
                if not urls:
                    urls_from_result = get_field(result, "known_urls", None) or get_field(result, "urls", None)
                    if isinstance(urls_from_result, list):
                        urls.extend([str(u).strip() for u in urls_from_result if u])

                imported_ts = None
                if isinstance(db_metadata, dict):
                    ts = db_metadata.get("time_imported") or db_metadata.get("time_added")
                    if isinstance(ts, (int, float)):
                        imported_ts = int(ts)
                    elif isinstance(ts, str):
                        try:
                            import datetime as _dt

                            imported_ts = int(_dt.datetime.fromisoformat(ts).timestamp())
                        except Exception:
                            imported_ts = None

                row = _build_table_row(
                    title=file_path.name,
                    origin="local",
                    path=str(file_path),
                    mime=mime_type or "",
                    size_bytes=int(file_size) if isinstance(file_size, int) else None,
                    dur_seconds=duration_seconds,
                    imported_ts=imported_ts,
                    urls=urls,
                    hash_value=hash_hex,
                    pages=pages,
                )

                table_title = file_path.name
                table = ResultTable(table_title)
                table.set_source_command("get-metadata", list(_args))
                table.add_result(row)
                ctx.set_last_result_table_overlay(table, [row], row)
                ctx.emit(row)
                return 0
        except Exception:
            # Fall through to Hydrus if local file handling fails
            pass

    # HYDRUS PATH
    hash_hex = normalize_hash(override_hash) if override_hash else normalize_hash(get_field(result, "hash_hex", None))
    if not hash_hex:
        log("Selected result does not include a Hydrus hash or local path", file=sys.stderr)
        return 1

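    # Resolve a Hydrus client from config; without one the hash cannot be
    # looked up, so report the error and stop.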
    try:
        client = hydrus_wrapper.get_client(config)
    except Exception as exc:
        log(f"Hydrus client unavailable: {exc}", file=sys.stderr)
        return 1
    if client is None:
        log("Hydrus client unavailable", file=sys.stderr)
        return 1

    try:
        payload = client.fetch_file_metadata(
            hashes=[hash_hex],
            include_service_keys_to_tags=False,
            include_file_urls=True,
            include_duration=True,
            include_size=True,
            include_mime=True,
        )
    except Exception as exc:
        log(f"Hydrus metadata fetch failed: {exc}", file=sys.stderr)
        return 1

    items = payload.get("metadata") if isinstance(payload, dict) else None
    if not isinstance(items, list) or not items:
        log("No metadata found.")
        return 0
    meta = items[0] if isinstance(items[0], dict) else None
    if not isinstance(meta, dict):
        log("No metadata found.")
        return 0

    mime = meta.get("mime")
    size = meta.get("size") or meta.get("file_size")
    duration_value = meta.get("duration")
    inner = meta.get("metadata") if isinstance(meta.get("metadata"), dict) else None
    if duration_value is None and isinstance(inner, dict):
        duration_value = inner.get("duration")
    imported_ts = _extract_imported_ts(meta)

    try:
        from .search_file import _hydrus_duration_seconds as _dur_secs
    except Exception:
        _dur_secs = lambda x: x
    dur_seconds = _dur_secs(duration_value)

    urls = meta.get("known_urls") or meta.get("urls")
    urls = [str(u).strip() for u in urls] if isinstance(urls, list) else []

    row = _build_table_row(
        title=hash_hex,
        origin="hydrus",
        path=f"hydrus://file/{hash_hex}",
        mime=mime or "",
        size_bytes=int(size) if isinstance(size, int) else None,
        dur_seconds=int(dur_seconds) if isinstance(dur_seconds, (int, float)) else None,
        imported_ts=imported_ts,
        urls=urls,
        hash_value=hash_hex,
        pages=None,
    )

    table = ResultTable(hash_hex or "Metadata")
    table.set_source_command("get-metadata", list(_args))
    table.add_result(row)
    ctx.set_last_result_table_overlay(table, [row], row)
    ctx.emit(row)
    return 0


CMDLET = Cmdlet(
    name="get-metadata",
    summary="Print metadata for local or Hydrus files (hash, mime, duration, size, URLs).",
    usage="get-metadata [-hash <sha256>]",
    aliases=["meta"],
    args=[
        CmdletArg("hash", description="Override the Hydrus file hash (SHA256) to target instead of the selected result."),
    ],
    details=[
        "- For local files: Shows path, hash (computed if needed), MIME type, size, duration, and known URLs from sidecar.",
        "- For Hydrus files: Shows path (hydrus://), hash, MIME, duration, size, and known URLs.",
        "- Automatically detects local vs Hydrus files.",
        "- Local file hashes are computed via SHA256 if not already available.",
    ],
)
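
# Illustrative example of an emitted row (not executed; values are hypothetical):
# a local MP4 selected from a prior search is emitted roughly as
#   {"title": "clip.mp4", "origin": "local", "mime": "video/mp4",
#    "size_bytes": 7340032, "duration_seconds": 42, "hash": "<sha256>", ...}
# with the "columns" list built by _build_table_row driving the ResultTable
# display (Title, Hash, MIME, Size(MB), Duration(s)/Pages, Imported, Store).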