from __future__ import annotations from typing import Any, Dict, Sequence, Optional import json import sys from SYS.logger import log from pathlib import Path from ._shared import Cmdlet, CmdletArg, SharedArgs, parse_cmdlet_args, get_field import pipeline as ctx from result_table import ResultTable class Get_Metadata(Cmdlet): """Class-based get-metadata cmdlet with self-registration.""" def __init__(self) -> None: """Initialize get-metadata cmdlet.""" super().__init__( name="get-metadata", summary="Print metadata for files by hash and storage backend.", usage="get-metadata [-hash ] [-store ]", alias=["meta"], arg=[ SharedArgs.HASH, SharedArgs.STORE, ], detail=[ "- Retrieves metadata from storage backend using file hash as identifier.", "- Shows hash, MIME type, size, duration/pages, known url, and import timestamp.", "- Hash and store are taken from piped result or can be overridden with -hash/-store flags.", "- All metadata is retrieved from the storage backend's database (single source of truth).", ], exec=self.run, ) self.register() @staticmethod def _extract_imported_ts(meta: Dict[str, Any]) -> Optional[int]: """Extract an imported timestamp from metadata if available.""" if not isinstance(meta, dict): return None # Prefer explicit time_imported if present explicit = meta.get("time_imported") if isinstance(explicit, (int, float)): return int(explicit) # Try parsing string timestamps if isinstance(explicit, str): try: import datetime as _dt return int(_dt.datetime.fromisoformat(explicit).timestamp()) except Exception: pass return None @staticmethod def _format_imported(ts: Optional[int]) -> str: """Format timestamp as readable string.""" if not ts: return "" try: import datetime as _dt return _dt.datetime.utcfromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") except Exception: return "" @staticmethod def _build_table_row(title: str, store: str, path: str, mime: str, size_bytes: Optional[int], dur_seconds: Optional[int], imported_ts: Optional[int], url: list[str], hash_value: Optional[str], pages: Optional[int] = None) -> Dict[str, Any]: """Build a table row dict with metadata fields.""" size_mb = None if isinstance(size_bytes, int): try: size_mb = int(size_bytes / (1024 * 1024)) except Exception: size_mb = None dur_int = int(dur_seconds) if isinstance(dur_seconds, (int, float)) else None pages_int = int(pages) if isinstance(pages, (int, float)) else None imported_label = Get_Metadata._format_imported(imported_ts) duration_label = "Duration(s)" duration_value = str(dur_int) if dur_int is not None else "" if mime and mime.lower().startswith("application/pdf"): duration_label = "Pages" duration_value = str(pages_int) if pages_int is not None else "" columns = [ ("Title", title or ""), ("Hash", hash_value or ""), ("MIME", mime or ""), ("Size(MB)", str(size_mb) if size_mb is not None else ""), (duration_label, duration_value), ("Imported", imported_label), ("Store", store or ""), ] return { "title": title or path, "path": path, "store": store, "mime": mime, "size_bytes": size_bytes, "duration_seconds": dur_int, "pages": pages_int, "imported_ts": imported_ts, "imported": imported_label, "hash": hash_value, "url": url, "columns": columns, } @staticmethod def _add_table_body_row(table: ResultTable, row: Dict[str, Any]) -> None: """Add a single row to the ResultTable using the prepared columns.""" columns = row.get("columns") if isinstance(row, dict) else None lookup: Dict[str, Any] = {} if isinstance(columns, list): for col in columns: if isinstance(col, tuple) and len(col) == 2: label, value = col lookup[str(label)] = value row_obj = table.add_row() row_obj.add_column("Hash", lookup.get("Hash", "")) row_obj.add_column("MIME", lookup.get("MIME", "")) row_obj.add_column("Size(MB)", lookup.get("Size(MB)", "")) if "Duration(s)" in lookup: row_obj.add_column("Duration(s)", lookup.get("Duration(s)", "")) elif "Pages" in lookup: row_obj.add_column("Pages", lookup.get("Pages", "")) else: row_obj.add_column("Duration(s)", "") def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Main execution entry point.""" # Parse arguments parsed = parse_cmdlet_args(args, self) # Get hash and store from parsed args or result file_hash = parsed.get("hash") or get_field(result, "hash") storage_source = parsed.get("store") or get_field(result, "store") if not file_hash: log("No hash available - use -hash to specify", file=sys.stderr) return 1 if not storage_source: log("No storage backend specified - use -store to specify", file=sys.stderr) return 1 # Use storage backend to get metadata try: from Store import Store storage = Store(config) backend = storage[storage_source] # Get metadata from backend metadata = backend.get_metadata(file_hash) if not metadata: log(f"No metadata found for hash {file_hash[:8]}... in {storage_source}", file=sys.stderr) return 1 # Extract title from tags if available title = get_field(result, "title") or file_hash[:16] if not get_field(result, "title"): # Try to get title from tags try: tags, _ = backend.get_tag(file_hash) for tag in tags: if tag.lower().startswith("title:"): title = tag.split(":", 1)[1] break except Exception: pass # Extract metadata fields mime_type = metadata.get("mime") or metadata.get("ext", "") file_size = metadata.get("size") duration_seconds = metadata.get("duration") pages = metadata.get("pages") url = metadata.get("url") or [] imported_ts = self._extract_imported_ts(metadata) # Normalize url if isinstance(url, str): try: url = json.loads(url) except (json.JSONDecodeError, TypeError): url = [] if not isinstance(url, list): url = [] # Build display row row = self._build_table_row( title=title, store=storage_source, path=metadata.get("path", ""), mime=mime_type, size_bytes=file_size, dur_seconds=duration_seconds, imported_ts=imported_ts, url=url, hash_value=file_hash, pages=pages, ) table_title = title table = ResultTable(table_title).init_command("get-metadata", list(args)) self._add_table_body_row(table, row) ctx.set_last_result_table_overlay(table, [row], row) ctx.emit(row) return 0 except KeyError: log(f"Storage backend '{storage_source}' not found", file=sys.stderr) return 1 except Exception as exc: log(f"Failed to get metadata: {exc}", file=sys.stderr) return 1 CMDLET = Get_Metadata()