Files
Medios-Macina/cmdlet/get_metadata.py
2026-02-02 19:49:07 -08:00

418 lines
16 KiB
Python

from __future__ import annotations
from typing import Any, Dict, Sequence, Optional
import json
import sys
from SYS.logger import log
from . import _shared as sh
Cmdlet = sh.Cmdlet
CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
parse_cmdlet_args = sh.parse_cmdlet_args
get_field = sh.get_field
from SYS import pipeline as ctx
from SYS.result_table import Table
class Get_Metadata(Cmdlet):
    """Class-based get-metadata cmdlet with self-registration."""

    def __init__(self) -> None:
        """Initialize the get-metadata cmdlet and self-register it."""
        super().__init__(
            name="get-metadata",
            summary="Print metadata for files by hash and storage backend.",
            usage='get-metadata [-query "hash:<sha256>"] [-store <backend>]',
            alias=["meta"],
            arg=[
                SharedArgs.QUERY,
                SharedArgs.STORE,
            ],
            detail=[
                "- Retrieves metadata from storage backend using file hash as identifier.",
                "- Shows hash, MIME type, size, duration/pages, known url, and import timestamp.",
                "- Hash and store are taken from piped result or can be overridden with -query/-store flags.",
                "- All metadata is retrieved from the storage backend's database (single source of truth).",
            ],
            exec=self.run,
        )
        self.register()

    @staticmethod
    def _extract_imported_ts(meta: Dict[str, Any]) -> Optional[int]:
        """Extract an imported timestamp from metadata if available.

        Attempts to parse the ``time_imported`` field in multiple formats:
        - Numeric Unix timestamp (int/float)
        - ISO format string (e.g., "2024-01-15T10:30:00")

        Args:
            meta: Metadata dictionary from backend (e.g., from get_metadata())

        Returns:
            Unix timestamp as integer if found, None otherwise
        """
        if not isinstance(meta, dict):
            return None
        # Prefer explicit time_imported if present
        explicit = meta.get("time_imported")
        if isinstance(explicit, (int, float)):
            return int(explicit)
        # Try parsing string timestamps
        if isinstance(explicit, str):
            try:
                import datetime as _dt

                return int(_dt.datetime.fromisoformat(explicit).timestamp())
            except Exception:
                pass
        return None

    @staticmethod
    def _format_imported(ts: Optional[int]) -> str:
        """Format Unix timestamp as human-readable date string (UTC).

        Converts Unix timestamp to YYYY-MM-DD HH:MM:SS format.
        Used for displaying file import dates to users.

        Args:
            ts: Unix timestamp (integer) or None

        Returns:
            Formatted date string (e.g., "2024-01-15 10:30:00") or empty string if invalid
        """
        if not ts:
            return ""
        try:
            import datetime as _dt

            # datetime.utcfromtimestamp() is deprecated since Python 3.12;
            # an aware UTC datetime formats to the identical string.
            return _dt.datetime.fromtimestamp(ts, tz=_dt.timezone.utc).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
        except Exception:
            return ""

    @staticmethod
    def _build_table_row(
        title: str,
        store: str,
        path: str,
        mime: str,
        size_bytes: Optional[int],
        dur_seconds: Optional[float],
        imported_ts: Optional[int],
        url: list[str],
        hash_value: Optional[str],
        pages: Optional[int] = None,
        tag: Optional[list[str]] = None,
        ext: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Build a normalized metadata row dict for display and piping.

        Converts raw metadata fields into a standardized row format suitable for:
        - Display in result tables
        - Piping to downstream cmdlets
        - JSON serialization

        Args:
            title: File or resource title
            store: Backend store name (e.g., "hydrus", "local")
            path: File path or resource identifier
            mime: MIME type (e.g., "image/jpeg", "video/mp4")
            size_bytes: File size in bytes
            dur_seconds: Duration in seconds (for video/audio); non-numeric
                values are tolerated and rendered as empty
            imported_ts: Unix timestamp when item was imported
            url: List of known URLs associated with file
            hash_value: File hash (SHA256 or other)
            pages: Number of pages (for PDFs)
            tag: List of tags applied to file
            ext: File extension (e.g., "jpg", "mp4")

        Returns:
            Dictionary with normalized metadata fields and display columns
        """
        size_mb = None
        size_int: Optional[int] = None
        if size_bytes is not None:
            try:
                size_int = int(size_bytes)
            except Exception:
                size_int = None
        if isinstance(size_int, int):
            try:
                size_mb = int(size_int / (1024 * 1024))
            except Exception:
                size_mb = None
        # Non-numeric (e.g. unparseable string) durations/pages collapse to None.
        dur_int = int(dur_seconds) if isinstance(dur_seconds, (int, float)) else None
        pages_int = int(pages) if isinstance(pages, (int, float)) else None
        imported_label = Get_Metadata._format_imported(imported_ts)
        # PDFs show a page count in place of a duration column.
        duration_label = "Duration(s)"
        duration_value = str(dur_int) if dur_int is not None else ""
        if mime and mime.lower().startswith("application/pdf"):
            duration_label = "Pages"
            duration_value = str(pages_int) if pages_int is not None else ""
        columns = [
            ("Title", title or ""),
            ("Hash", hash_value or ""),
            ("MIME", mime or ""),
            ("Size(MB)", str(size_mb) if size_mb is not None else ""),
            (duration_label, duration_value),
            ("Imported", imported_label),
            ("Store", store or ""),
        ]
        return {
            "title": title or path,
            "path": path,
            "store": store,
            "mime": mime,
            "ext": ext or "",
            "size_bytes": size_int,
            "duration_seconds": dur_int,
            "pages": pages_int,
            "imported_ts": imported_ts,
            "imported": imported_label,
            "hash": hash_value,
            "url": url,
            "tag": tag or [],
            "columns": columns,
        }

    @staticmethod
    def _add_table_body_row(table: Table, row: Dict[str, Any]) -> None:
        """Add a single metadata row to the result table.

        Extracts column values from row dict and adds to result table using
        standard column ordering (Hash, MIME, Size, Duration/Pages).

        Args:
            table: Result table to add row to
            row: Metadata row dict (from _build_table_row)
        """
        columns = row.get("columns") if isinstance(row, dict) else None
        lookup: Dict[str, Any] = {}
        if isinstance(columns, list):
            for col in columns:
                if isinstance(col, tuple) and len(col) == 2:
                    label, value = col
                    lookup[str(label)] = value
        row_obj = table.add_row()
        row_obj.add_column("Hash", lookup.get("Hash", ""))
        row_obj.add_column("MIME", lookup.get("MIME", ""))
        row_obj.add_column("Size(MB)", lookup.get("Size(MB)", ""))
        # The fourth column is Duration(s) or Pages depending on MIME type.
        if "Duration(s)" in lookup:
            row_obj.add_column("Duration(s)", lookup.get("Duration(s)", ""))
        elif "Pages" in lookup:
            row_obj.add_column("Pages", lookup.get("Pages", ""))
        else:
            row_obj.add_column("Duration(s)", "")

    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Execute get-metadata cmdlet - retrieve and display file metadata.

        Queries a storage backend (Hydrus, local, etc.) for file metadata using hash.
        Extracts tags embedded in metadata response (avoiding duplicate API calls).
        Displays metadata in rich detail panel and result table.
        Allows piping (@N) to other cmdlets for chaining operations.

        Optimizations:
        - Extracts tags from metadata response (no separate get_tag() call)
        - Single HTTP request to backends per file

        Args:
            result: Piped input (dict with optional hash/store/title/tag fields)
            args: Command line arguments ([-query "hash:..."] [-store backend])
            config: Application configuration dict

        Returns:
            0 on success, 1 on error (no metadata found, backend unavailable, etc.)
        """
        # Parse arguments
        parsed = parse_cmdlet_args(args, self)
        query_hash = sh.parse_single_hash_query(parsed.get("query"))
        if parsed.get("query") and not query_hash:
            log('No hash available - use -query "hash:<sha256>"', file=sys.stderr)
            return 1
        # Get hash and store from parsed args or result
        file_hash = query_hash or get_field(result, "hash")
        storage_source = parsed.get("store") or get_field(result, "store")
        if not file_hash:
            log('No hash available - use -query "hash:<sha256>"', file=sys.stderr)
            return 1
        if not storage_source:
            log("No storage backend specified - use -store to specify", file=sys.stderr)
            return 1
        # Use storage backend to get metadata
        try:
            # Instantiate only the required backend when possible to avoid
            # initializing all configured backends
            try:
                from Store.registry import get_backend_instance

                backend = get_backend_instance(config, storage_source, suppress_debug=True)
            except Exception:
                backend = None
            if backend is None:
                try:
                    from Store import Store

                    storage = Store(config)
                    backend = storage[storage_source]
                except Exception:
                    log(f"Storage backend '{storage_source}' not found", file=sys.stderr)
                    return 1
            # Get metadata from backend
            metadata = backend.get_metadata(file_hash)
            if not metadata:
                log(
                    f"No metadata found for hash {file_hash[:8]}... in {storage_source}",
                    file=sys.stderr,
                )
                return 1
            # Extract title from tags if available
            title = get_field(result, "title") or file_hash[:16]
            # Get tags from input result
            item_tags = get_field(result, "tag") or get_field(result, "tags") or []
            if not isinstance(item_tags, list):
                item_tags = [str(item_tags)]
            else:
                item_tags = [str(t) for t in item_tags]
            # Extract tags from metadata response instead of making a separate
            # get_tag() request. This prevents duplicate API calls to Hydrus
            # (metadata already includes tags).
            metadata_tags = metadata.get("tags")
            if isinstance(metadata_tags, dict):
                # metadata["tags"] is {service_key: {service_data}}
                for service_data in metadata_tags.values():
                    if isinstance(service_data, dict):
                        display_tags = service_data.get("display_tags", {})
                        if isinstance(display_tags, dict):
                            # display_tags is typically {status: tag_list}
                            for tag_list in display_tags.values():
                                if isinstance(tag_list, list):
                                    for t in tag_list:
                                        ts = str(t) if t else ""
                                        if ts and ts not in item_tags:
                                            item_tags.append(ts)
                                            # Check for title tag
                                            if not get_field(result, "title") and ts.lower().startswith("title:"):
                                                parts = ts.split(":", 1)
                                                if len(parts) > 1:
                                                    title = parts[1].strip()
                                    break  # Only use first status level
                    if any(t for t in item_tags if str(t).lower().startswith("title:")):
                        break  # Found title tag, stop searching services
            # Extract metadata fields
            mime_type = metadata.get("mime") or metadata.get("ext", "")
            file_ext = metadata.get("ext", "")  # Extract file extension separately
            file_size = metadata.get("size")
            duration_seconds = metadata.get("duration")
            if duration_seconds is None:
                duration_seconds = metadata.get("duration_seconds")
            if duration_seconds is None:
                duration_seconds = metadata.get("length")
            if duration_seconds is None and isinstance(metadata.get("duration_ms"), (int, float)):
                try:
                    duration_seconds = float(metadata["duration_ms"]) / 1000.0
                except Exception:
                    duration_seconds = None
            if isinstance(duration_seconds, str):
                # Backends may return durations as "123.4" or "MM:SS"/"HH:MM:SS".
                s = duration_seconds.strip()
                if s:
                    try:
                        duration_seconds = float(s)
                    except ValueError:
                        if ":" in s:
                            parts = [p.strip() for p in s.split(":") if p.strip()]
                            if len(parts) in {2, 3} and all(p.isdigit() for p in parts):
                                nums = [int(p) for p in parts]
                                if len(nums) == 2:
                                    duration_seconds = float(nums[0] * 60 + nums[1])
                                else:
                                    duration_seconds = float(
                                        nums[0] * 3600 + nums[1] * 60 + nums[2]
                                    )
                else:
                    duration_seconds = None
            pages = metadata.get("pages")
            url = metadata.get("url") or []
            imported_ts = self._extract_imported_ts(metadata)
            # Normalize url (may be stored as a JSON-encoded string)
            if isinstance(url, str):
                try:
                    url = json.loads(url)
                except (json.JSONDecodeError, TypeError):
                    url = []
            if not isinstance(url, list):
                url = []
            # Build display row
            row = self._build_table_row(
                title=title,
                store=storage_source,
                path=metadata.get("path", ""),
                mime=mime_type,
                size_bytes=file_size,
                dur_seconds=duration_seconds,
                imported_ts=imported_ts,
                url=url,
                hash_value=file_hash,
                pages=pages,
                tag=item_tags,
                ext=file_ext,
            )
            table_title = f"get-metadata: {title}" if title else "get-metadata"
            table = Table(table_title).init_command(table_title, "get-metadata", list(args))
            self._add_table_body_row(table, row)
            # Use helper to display item and make it @-selectable
            from ._shared import display_and_persist_items

            display_and_persist_items([row], title=table_title, subject=row)
            ctx.emit(row)
            return 0
        except KeyError:
            log(f"Storage backend '{storage_source}' not found", file=sys.stderr)
            return 1
        except Exception as exc:
            log(f"Failed to get metadata: {exc}", file=sys.stderr)
            return 1
# Instantiated at import time: __init__ calls self.register(), so importing
# this module is what makes the cmdlet available; CMDLET exposes the instance.
CMDLET = Get_Metadata()