Files
Medios-Macina/cmdlets/get_metadata.py
2025-12-11 19:04:02 -08:00

230 lines
8.4 KiB
Python

from __future__ import annotations
from typing import Any, Dict, Sequence, Optional
import json
import sys
from SYS.logger import log
from pathlib import Path
from ._shared import Cmdlet, CmdletArg, SharedArgs, parse_cmdlet_args, get_field
import pipeline as ctx
from result_table import ResultTable
class Get_Metadata(Cmdlet):
    """Cmdlet that prints the stored metadata of a file identified by hash.

    Constructing an instance configures the ``get-metadata`` command (alias
    ``meta``) and then calls ``self.register()``.
    """

    def __init__(self) -> None:
        """Configure the cmdlet definition and register the instance."""
        super().__init__(
            name="get-metadata",
            summary="Print metadata for files by hash and storage backend.",
            usage="get-metadata [-hash <sha256>] [-store <backend>]",
            alias=["meta"],
            arg=[
                SharedArgs.HASH,
                SharedArgs.STORE,
            ],
            detail=[
                "- Retrieves metadata from storage backend using file hash as identifier.",
                "- Shows hash, MIME type, size, duration/pages, known url, and import timestamp.",
                "- Hash and store are taken from piped result or can be overridden with -hash/-store flags.",
                "- All metadata is retrieved from the storage backend's database (single source of truth).",
            ],
            exec=self.run,
        )
        self.register()

    @staticmethod
    def _extract_imported_ts(meta: Dict[str, Any]) -> Optional[int]:
        """Return the ``time_imported`` entry of *meta* as Unix epoch seconds.

        Args:
            meta: Metadata mapping as returned by a storage backend.

        Returns:
            The timestamp as an ``int``, or ``None`` when *meta* is not a
            dict, has no usable ``time_imported`` entry, or the entry cannot
            be converted.
        """
        if not isinstance(meta, dict):
            return None

        explicit = meta.get("time_imported")

        if isinstance(explicit, (int, float)):
            try:
                return int(explicit)
            except (ValueError, OverflowError):
                # NaN / infinity have no integer representation; treat the
                # value as missing instead of failing the whole command.
                return None

        if isinstance(explicit, str):
            import datetime as _dt

            text = explicit.strip()
            # datetime.fromisoformat() accepts a trailing "Z" only from
            # Python 3.11 on; spell the UTC offset out for older versions.
            if text[-1:] in ("Z", "z"):
                text = text[:-1] + "+00:00"
            try:
                moment = _dt.datetime.fromisoformat(text)
                if moment.tzinfo is None:
                    # A naive datetime's .timestamp() is interpreted in the
                    # host's local zone, while _format_imported renders UTC,
                    # so the displayed time would drift by the local offset.
                    # NOTE(review): assumes backends store naive timestamps
                    # in UTC -- confirm against the storage implementations.
                    moment = moment.replace(tzinfo=_dt.timezone.utc)
                return int(moment.timestamp())
            except (ValueError, OverflowError, OSError):
                return None

        return None

    @staticmethod
    def _format_imported(ts: Optional[int]) -> str:
        """Render *ts* (epoch seconds) as ``YYYY-MM-DD HH:MM:SS`` in UTC.

        Args:
            ts: Unix timestamp, or ``None``.

        Returns:
            The formatted string, or ``""`` when *ts* is falsy (``None`` or
            ``0``) or cannot be converted.
        """
        if not ts:
            return ""

        import datetime as _dt

        try:
            # utcfromtimestamp() is deprecated since Python 3.12; an aware
            # UTC datetime formats to exactly the same text.
            moment = _dt.datetime.fromtimestamp(ts, tz=_dt.timezone.utc)
            return moment.strftime("%Y-%m-%d %H:%M:%S")
        except (OverflowError, OSError, ValueError, TypeError):
            return ""

    @staticmethod
    def _build_table_row(title: str, store: str, path: str, mime: str, size_bytes: Optional[int],
                         dur_seconds: Optional[int], imported_ts: Optional[int], url: list[str],
                         hash_value: Optional[str], pages: Optional[int] = None) -> Dict[str, Any]:
        """Assemble the result dict emitted for one file.

        Args:
            title: Display title; the ``"title"`` entry falls back to *path*
                when it is empty.
            store: Name of the storage backend.
            path: File path reported by the backend.
            mime: MIME type (or extension) string.
            size_bytes: File size in bytes; only ``int`` values are converted
                to megabytes.
            dur_seconds: Duration in seconds, if known.
            imported_ts: Import time as epoch seconds, if known.
            url: List of known URLs.
            hash_value: File hash.
            pages: Page count, shown instead of the duration for PDFs.

        Returns:
            A dict with the raw fields plus ``"columns"``, a list of
            ``(label, text)`` tuples prepared for table display.
        """
        size_mb: Optional[int] = None
        if isinstance(size_bytes, int):
            try:
                # Whole megabytes, truncated (anything below 1 MiB shows 0).
                size_mb = int(size_bytes / (1024 * 1024))
            except OverflowError:
                size_mb = None

        dur_int = int(dur_seconds) if isinstance(dur_seconds, (int, float)) else None
        pages_int = int(pages) if isinstance(pages, (int, float)) else None
        imported_label = Get_Metadata._format_imported(imported_ts)

        # PDFs have no duration; the same column slot carries the page count.
        if mime and mime.lower().startswith("application/pdf"):
            duration_label = "Pages"
            duration_value = str(pages_int) if pages_int is not None else ""
        else:
            duration_label = "Duration(s)"
            duration_value = str(dur_int) if dur_int is not None else ""

        columns = [
            ("Title", title or ""),
            ("Hash", hash_value or ""),
            ("MIME", mime or ""),
            ("Size(MB)", str(size_mb) if size_mb is not None else ""),
            (duration_label, duration_value),
            ("Imported", imported_label),
            ("Store", store or ""),
        ]

        return {
            "title": title or path,
            "path": path,
            "store": store,
            "mime": mime,
            "size_bytes": size_bytes,
            "duration_seconds": dur_int,
            "pages": pages_int,
            "imported_ts": imported_ts,
            "imported": imported_label,
            "hash": hash_value,
            "url": url,
            "columns": columns,
        }

    @staticmethod
    def _add_table_body_row(table: ResultTable, row: Dict[str, Any]) -> None:
        """Append one row to *table* from the ``"columns"`` entry of *row*.

        Only the Hash, MIME, Size(MB) and Duration(s)/Pages columns are
        added; other prepared columns are ignored here. Missing values are
        rendered as empty strings.
        """
        columns = row.get("columns") if isinstance(row, dict) else None

        lookup: Dict[str, Any] = {}
        if isinstance(columns, list):
            lookup = {
                str(col[0]): col[1]
                for col in columns
                if isinstance(col, tuple) and len(col) == 2
            }

        row_obj = table.add_row()
        for label in ("Hash", "MIME", "Size(MB)"):
            row_obj.add_column(label, lookup.get(label, ""))

        # "Duration(s)" wins when present; "Pages" is used only when it is
        # the one available; otherwise an empty "Duration(s)" cell is added.
        if "Pages" in lookup and "Duration(s)" not in lookup:
            extent_label = "Pages"
        else:
            extent_label = "Duration(s)"
        row_obj.add_column(extent_label, lookup.get(extent_label, ""))

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Look up metadata for a hash in a storage backend and emit it.

        Args:
            result: Piped-in result; supplies ``hash``, ``store`` and
                ``title`` when they are not given as flags.
            args: Raw command-line arguments for this cmdlet.
            config: Configuration passed to ``Store``.

        Returns:
            ``0`` on success, ``1`` when the hash or store is missing, the
            backend is unknown, no metadata exists, or any other error
            occurs (the reason is logged to stderr).
        """
        parsed = parse_cmdlet_args(args, self)

        # Explicit flags take precedence over values from the piped result.
        file_hash = parsed.get("hash") or get_field(result, "hash")
        storage_source = parsed.get("store") or get_field(result, "store")

        if not file_hash:
            log("No hash available - use -hash to specify", file=sys.stderr)
            return 1

        if not storage_source:
            log("No storage backend specified - use -store to specify", file=sys.stderr)
            return 1

        try:
            from Store import Store

            storage = Store(config)

            # Only this lookup signals an unknown backend. Catching KeyError
            # around the whole body would mislabel unrelated KeyErrors from
            # later steps as "backend not found".
            try:
                backend = storage[storage_source]
            except KeyError:
                log(f"Storage backend '{storage_source}' not found", file=sys.stderr)
                return 1

            metadata = backend.get_metadata(file_hash)
            if not metadata:
                log(f"No metadata found for hash {file_hash[:8]}... in {storage_source}", file=sys.stderr)
                return 1

            # Title: piped value first, then a "title:" tag, then a hash prefix.
            piped_title = get_field(result, "title")
            title = piped_title or file_hash[:16]
            if not piped_title:
                try:
                    tags, _ = backend.get_tag(file_hash)
                    for tag in tags:
                        if tag.lower().startswith("title:"):
                            title = tag.split(":", 1)[1]
                            break
                except Exception:
                    # Best effort: a failed tag lookup must not prevent the
                    # metadata from being shown; keep the fallback title.
                    pass

            mime_type = metadata.get("mime") or metadata.get("ext", "")
            file_size = metadata.get("size")
            duration_seconds = metadata.get("duration")
            pages = metadata.get("pages")
            url = metadata.get("url") or []
            imported_ts = self._extract_imported_ts(metadata)

            # The url field may arrive as a JSON-encoded string; anything
            # that does not decode to a list is discarded.
            if isinstance(url, str):
                try:
                    url = json.loads(url)
                except (json.JSONDecodeError, TypeError):
                    url = []
            if not isinstance(url, list):
                url = []

            row = self._build_table_row(
                title=title,
                store=storage_source,
                path=metadata.get("path", ""),
                mime=mime_type,
                size_bytes=file_size,
                dur_seconds=duration_seconds,
                imported_ts=imported_ts,
                url=url,
                hash_value=file_hash,
                pages=pages,
            )

            table = ResultTable(title).init_command("get-metadata", list(args))
            self._add_table_body_row(table, row)
            ctx.set_last_result_table_overlay(table, [row], row)
            ctx.emit(row)
            return 0

        except Exception as exc:
            log(f"Failed to get metadata: {exc}", file=sys.stderr)
            return 1
# Module-level cmdlet instance. Constructing it runs Get_Metadata.__init__,
# which finishes by calling self.register().
CMDLET = Get_Metadata()