# Medios-Macina/cmdlet/get_metadata.py
from __future__ import annotations
from typing import Any, Dict, Sequence, Optional
import json
import sys
from SYS.logger import log
from pathlib import Path
from . import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
parse_cmdlet_args = sh.parse_cmdlet_args
get_field = sh.get_field
import pipeline as ctx
from result_table import ResultTable
class Get_Metadata(Cmdlet):
    """Class-based get-metadata cmdlet with self-registration.

    Looks up a file's metadata in a storage backend, keyed by its SHA-256
    hash, and emits a single result-table row with hash, MIME type, size,
    duration (or page count for PDFs), known URLs, and import timestamp.
    """

    def __init__(self) -> None:
        """Declare name/usage/arguments and register with the cmdlet registry."""
        super().__init__(
            name="get-metadata",
            summary="Print metadata for files by hash and storage backend.",
            usage='get-metadata [-query "hash:<sha256>"] [-store <backend>]',
            alias=["meta"],
            arg=[
                SharedArgs.QUERY,
                SharedArgs.STORE,
            ],
            detail=[
                "- Retrieves metadata from storage backend using file hash as identifier.",
                "- Shows hash, MIME type, size, duration/pages, known url, and import timestamp.",
                "- Hash and store are taken from piped result or can be overridden with -query/-store flags.",
                "- All metadata is retrieved from the storage backend's database (single source of truth).",
            ],
            exec=self.run,
        )
        self.register()

    @staticmethod
    def _extract_imported_ts(meta: Dict[str, Any]) -> Optional[int]:
        """Extract an imported timestamp (epoch seconds) from metadata, if available.

        Accepts a numeric ``time_imported`` value or an ISO-8601 string;
        returns ``None`` when absent or unparseable.
        """
        if not isinstance(meta, dict):
            return None
        # Prefer explicit time_imported if present.
        explicit = meta.get("time_imported")
        if isinstance(explicit, (int, float)):
            return int(explicit)
        # Fall back to parsing ISO-format string timestamps.
        if isinstance(explicit, str):
            try:
                import datetime as _dt

                return int(_dt.datetime.fromisoformat(explicit).timestamp())
            except Exception:
                pass
        return None

    @staticmethod
    def _format_imported(ts: Optional[int]) -> str:
        """Format an epoch timestamp as a UTC "YYYY-MM-DD HH:MM:SS" string.

        Returns "" for missing/zero timestamps or on any formatting error.
        """
        if not ts:
            return ""
        try:
            import datetime as _dt

            # datetime.utcfromtimestamp() is deprecated since Python 3.12;
            # an explicit UTC timezone yields the same wall-clock string.
            return _dt.datetime.fromtimestamp(ts, tz=_dt.timezone.utc).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
        except Exception:
            return ""

    @staticmethod
    def _build_table_row(
        title: str,
        store: str,
        path: str,
        mime: str,
        size_bytes: Optional[int],
        dur_seconds: Optional[int],
        imported_ts: Optional[int],
        url: list[str],
        hash_value: Optional[str],
        pages: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Build a result-row dict (plus display ``columns``) from metadata fields.

        Sizes are shown as whole megabytes; for PDFs the duration column is
        relabelled "Pages". Non-numeric duration/pages values become None.
        """
        size_mb = None
        size_int: Optional[int] = None
        if size_bytes is not None:
            try:
                size_int = int(size_bytes)
            except Exception:
                size_int = None
        if isinstance(size_int, int):
            try:
                size_mb = int(size_int / (1024 * 1024))
            except Exception:
                size_mb = None
        dur_int = int(dur_seconds) if isinstance(dur_seconds, (int, float)) else None
        pages_int = int(pages) if isinstance(pages, (int, float)) else None
        imported_label = Get_Metadata._format_imported(imported_ts)
        duration_label = "Duration(s)"
        duration_value = str(dur_int) if dur_int is not None else ""
        if mime and mime.lower().startswith("application/pdf"):
            # PDFs report page count instead of playback duration.
            duration_label = "Pages"
            duration_value = str(pages_int) if pages_int is not None else ""
        columns = [
            ("Title", title or ""),
            ("Hash", hash_value or ""),
            ("MIME", mime or ""),
            ("Size(MB)", str(size_mb) if size_mb is not None else ""),
            (duration_label, duration_value),
            ("Imported", imported_label),
            ("Store", store or ""),
        ]
        return {
            "title": title or path,
            "path": path,
            "store": store,
            "mime": mime,
            "size_bytes": size_int,
            "duration_seconds": dur_int,
            "pages": pages_int,
            "imported_ts": imported_ts,
            "imported": imported_label,
            "hash": hash_value,
            "url": url,
            "columns": columns,
        }

    @staticmethod
    def _add_table_body_row(table: ResultTable, row: Dict[str, Any]) -> None:
        """Add a single row to the ResultTable using the prepared columns.

        NOTE(review): only Hash/MIME/Size(MB) and the duration-or-pages
        column are rendered; Title/Imported/Store built by _build_table_row
        are not added here — presumably shown elsewhere (table title /
        overlay). Confirm before changing.
        """
        columns = row.get("columns") if isinstance(row, dict) else None
        lookup: Dict[str, Any] = {}
        if isinstance(columns, list):
            for col in columns:
                if isinstance(col, tuple) and len(col) == 2:
                    label, value = col
                    lookup[str(label)] = value
        row_obj = table.add_row()
        row_obj.add_column("Hash", lookup.get("Hash", ""))
        row_obj.add_column("MIME", lookup.get("MIME", ""))
        row_obj.add_column("Size(MB)", lookup.get("Size(MB)", ""))
        if "Duration(s)" in lookup:
            row_obj.add_column("Duration(s)", lookup.get("Duration(s)", ""))
        elif "Pages" in lookup:
            row_obj.add_column("Pages", lookup.get("Pages", ""))
        else:
            # Neither label present: keep the column grid aligned.
            row_obj.add_column("Duration(s)", "")

    @staticmethod
    def _normalize_duration(metadata: Dict[str, Any]) -> Any:
        """Resolve a duration in seconds from the various keys backends use.

        Checks "duration", then "duration_seconds", then "length", then a
        numeric "duration_ms" (converted to seconds). String values are
        parsed as plain numbers or as MM:SS / HH:MM:SS clock notation.
        Unparseable colon strings become None; other unparseable strings
        pass through unchanged (they are dropped later by
        _build_table_row's isinstance check).
        """
        duration_seconds = metadata.get("duration")
        if duration_seconds is None:
            duration_seconds = metadata.get("duration_seconds")
        if duration_seconds is None:
            duration_seconds = metadata.get("length")
        if duration_seconds is None and isinstance(
            metadata.get("duration_ms"), (int, float)
        ):
            try:
                duration_seconds = float(metadata["duration_ms"]) / 1000.0
            except Exception:
                duration_seconds = None
        if isinstance(duration_seconds, str):
            s = duration_seconds.strip()
            if s:
                try:
                    duration_seconds = float(s)
                except ValueError:
                    if ":" in s:
                        parts = [p.strip() for p in s.split(":") if p.strip()]
                        if len(parts) in {2, 3} and all(p.isdigit() for p in parts):
                            nums = [int(p) for p in parts]
                            if len(nums) == 2:
                                duration_seconds = float(nums[0] * 60 + nums[1])
                            else:
                                duration_seconds = float(
                                    nums[0] * 3600 + nums[1] * 60 + nums[2]
                                )
                        else:
                            duration_seconds = None
        return duration_seconds

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Main execution entry point.

        Resolves hash/store from the -query/-store flags or the piped
        result, fetches metadata from the storage backend, and emits a
        single row. Returns 0 on success, 1 on any error.
        """
        parsed = parse_cmdlet_args(args, self)
        query_hash = sh.parse_single_hash_query(parsed.get("query"))
        if parsed.get("query") and not query_hash:
            # -query was supplied but contained no parseable hash term.
            log('No hash available - use -query "hash:<sha256>"', file=sys.stderr)
            return 1
        # Explicit flags win over fields carried on the piped result.
        file_hash = query_hash or get_field(result, "hash")
        storage_source = parsed.get("store") or get_field(result, "store")
        if not file_hash:
            log('No hash available - use -query "hash:<sha256>"', file=sys.stderr)
            return 1
        if not storage_source:
            log("No storage backend specified - use -store to specify", file=sys.stderr)
            return 1
        try:
            from Store import Store

            storage = Store(config)
            backend = storage[storage_source]
            # Metadata lives in the backend's database (single source of truth).
            metadata = backend.get_metadata(file_hash)
            if not metadata:
                log(
                    f"No metadata found for hash {file_hash[:8]}... in {storage_source}",
                    file=sys.stderr,
                )
                return 1
            # Title: piped result wins, else a "title:" tag, else hash prefix.
            title = get_field(result, "title") or file_hash[:16]
            if not get_field(result, "title"):
                try:
                    tags, _ = backend.get_tag(file_hash)
                    for tag in tags:
                        if tag.lower().startswith("title:"):
                            title = tag.split(":", 1)[1]
                            break
                except Exception:
                    # Tags are best-effort; the hash prefix suffices.
                    pass
            mime_type = metadata.get("mime") or metadata.get("ext", "")
            file_size = metadata.get("size")
            duration_seconds = self._normalize_duration(metadata)
            pages = metadata.get("pages")
            url = metadata.get("url") or []
            imported_ts = self._extract_imported_ts(metadata)
            # Normalize url to a list (backends may store a JSON-encoded string).
            if isinstance(url, str):
                try:
                    url = json.loads(url)
                except (json.JSONDecodeError, TypeError):
                    url = []
            if not isinstance(url, list):
                url = []
            # Build display row and publish it to the pipeline.
            row = self._build_table_row(
                title=title,
                store=storage_source,
                path=metadata.get("path", ""),
                mime=mime_type,
                size_bytes=file_size,
                dur_seconds=duration_seconds,
                imported_ts=imported_ts,
                url=url,
                hash_value=file_hash,
                pages=pages,
            )
            table_title = f"get-metadata: {title}" if title else "get-metadata"
            table = ResultTable(table_title).init_command(
                table_title, "get-metadata", list(args)
            )
            self._add_table_body_row(table, row)
            ctx.set_last_result_table_overlay(table, [row], row)
            ctx.emit(row)
            return 0
        except KeyError:
            log(f"Storage backend '{storage_source}' not found", file=sys.stderr)
            return 1
        except Exception as exc:
            log(f"Failed to get metadata: {exc}", file=sys.stderr)
            return 1
# Module-level singleton; instantiating it triggers self.register() in __init__.
CMDLET = Get_Metadata()