from __future__ import annotations from typing import Any, Dict, Sequence, Optional import json import sys from SYS.logger import log from . import _shared as sh Cmdlet = sh.Cmdlet CmdletArg = sh.CmdletArg SharedArgs = sh.SharedArgs parse_cmdlet_args = sh.parse_cmdlet_args get_field = sh.get_field from SYS import pipeline as ctx from SYS.result_table import Table class Get_Metadata(Cmdlet): """Class-based get-metadata cmdlet with self-registration.""" def __init__(self) -> None: """Initialize get-metadata cmdlet.""" super().__init__( name="get-metadata", summary="Print metadata for files by hash and storage backend.", usage='get-metadata [-query "hash:"] [-store ]', alias=["meta"], arg=[ SharedArgs.QUERY, SharedArgs.STORE, ], detail=[ "- Retrieves metadata from storage backend using file hash as identifier.", "- Shows hash, MIME type, size, duration/pages, known url, and import timestamp.", "- Hash and store are taken from piped result or can be overridden with -query/-store flags.", "- All metadata is retrieved from the storage backend's database (single source of truth).", ], exec=self.run, ) self.register() @staticmethod def _extract_imported_ts(meta: Dict[str, Any]) -> Optional[int]: """Extract an imported timestamp from metadata if available. Attempts to parse imported timestamp from metadata dict in multiple formats: - Numeric Unix timestamp (int/float) - ISO format string (e.g., "2024-01-15T10:30:00") Args: meta: Metadata dictionary from backend (e.g., from get_metadata()) Returns: Unix timestamp as integer if found, None otherwise """ if not isinstance(meta, dict): return None # Prefer explicit time_imported if present explicit = meta.get("time_imported") if isinstance(explicit, (int, float)): return int(explicit) # Try parsing string timestamps if isinstance(explicit, str): try: import datetime as _dt return int(_dt.datetime.fromisoformat(explicit).timestamp()) except Exception: pass return None @staticmethod def _format_imported(ts: Optional[int]) -> str: """Format Unix timestamp as human-readable date string (UTC). Converts Unix timestamp to YYYY-MM-DD HH:MM:SS format. Used for displaying file import dates to users. Args: ts: Unix timestamp (integer) or None Returns: Formatted date string (e.g., "2024-01-15 10:30:00") or empty string if invalid """ if not ts: return "" try: import datetime as _dt return _dt.datetime.utcfromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") except Exception: return "" @staticmethod def _build_table_row( title: str, store: str, path: str, mime: str, size_bytes: Optional[int], dur_seconds: Optional[int], imported_ts: Optional[int], url: list[str], hash_value: Optional[str], pages: Optional[int] = None, tag: Optional[List[str]] = None, ext: Optional[str] = None, ) -> Dict[str, Any]: """Build a normalized metadata row dict for display and piping. Converts raw metadata fields into a standardized row format suitable for: - Display in result tables - Piping to downstream cmdlets - JSON serialization Args: title: File or resource title store: Backend store name (e.g., "hydrus", "local") path: File path or resource identifier mime: MIME type (e.g., "image/jpeg", "video/mp4") size_bytes: File size in bytes dur_seconds: Duration in seconds (for video/audio) imported_ts: Unix timestamp when item was imported url: List of known URLs associated with file hash_value: File hash (SHA256 or other) pages: Number of pages (for PDFs) tag: List of tags applied to file ext: File extension (e.g., "jpg", "mp4") Returns: Dictionary with normalized metadata fields and display columns """ size_mb = None size_int: Optional[int] = None if size_bytes is not None: try: size_int = int(size_bytes) except Exception: size_int = None if isinstance(size_int, int): try: size_mb = int(size_int / (1024 * 1024)) except Exception: size_mb = None dur_int = int(dur_seconds) if isinstance(dur_seconds, (int, float)) else None pages_int = int(pages) if isinstance(pages, (int, float)) else None imported_label = Get_Metadata._format_imported(imported_ts) duration_label = "Duration(s)" duration_value = str(dur_int) if dur_int is not None else "" if mime and mime.lower().startswith("application/pdf"): duration_label = "Pages" duration_value = str(pages_int) if pages_int is not None else "" columns = [ ("Title", title or ""), ("Hash", hash_value or ""), ("MIME", mime or ""), ("Size(MB)", str(size_mb) if size_mb is not None else ""), (duration_label, duration_value), ("Imported", imported_label), ("Store", store or ""), ] return { "title": title or path, "path": path, "store": store, "mime": mime, "ext": ext or "", "size_bytes": size_int, "duration_seconds": dur_int, "pages": pages_int, "imported_ts": imported_ts, "imported": imported_label, "hash": hash_value, "url": url, "tag": tag or [], "columns": columns, } @staticmethod def _add_table_body_row(table: Table, row: Dict[str, Any]) -> None: """Add a single metadata row to the result table. Extracts column values from row dict and adds to result table using standard column ordering (Hash, MIME, Size, Duration/Pages). Args: table: Result table to add row to row: Metadata row dict (from _build_table_row) """ columns = row.get("columns") if isinstance(row, dict) else None lookup: Dict[str, Any] = {} if isinstance(columns, list): for col in columns: if isinstance(col, tuple) and len(col) == 2: label, value = col lookup[str(label)] = value row_obj = table.add_row() row_obj.add_column("Hash", lookup.get("Hash", "")) row_obj.add_column("MIME", lookup.get("MIME", "")) row_obj.add_column("Size(MB)", lookup.get("Size(MB)", "")) if "Duration(s)" in lookup: row_obj.add_column("Duration(s)", lookup.get("Duration(s)", "")) elif "Pages" in lookup: row_obj.add_column("Pages", lookup.get("Pages", "")) else: row_obj.add_column("Duration(s)", "") def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Execute get-metadata cmdlet - retrieve and display file metadata. Queries a storage backend (Hydrus, local, etc.) for file metadata using hash. Extracts tags embedded in metadata response (avoiding duplicate API calls). Displays metadata in rich detail panel and result table. Allows piping (@N) to other cmdlets for chaining operations. Optimizations: - Extracts tags from metadata response (no separate get_tag() call) - Single HTTP request to backends per file Args: result: Piped input (dict with optional hash/store/title/tag fields) args: Command line arguments ([-query "hash:..."] [-store backend]) config: Application configuration dict Returns: 0 on success, 1 on error (no metadata found, backend unavailable, etc.) """ # Parse arguments parsed = parse_cmdlet_args(args, self) query_hash = sh.parse_single_hash_query(parsed.get("query")) if parsed.get("query") and not query_hash: log('No hash available - use -query "hash:"', file=sys.stderr) return 1 # Get hash and store from parsed args or result file_hash = query_hash or get_field(result, "hash") storage_source = parsed.get("store") or get_field(result, "store") if not file_hash: log('No hash available - use -query "hash:"', file=sys.stderr) return 1 if not storage_source: log("No storage backend specified - use -store to specify", file=sys.stderr) return 1 # Use storage backend to get metadata try: # Instantiate only the required backend when possible to avoid initializing all configured backends try: from Store.registry import get_backend_instance backend = get_backend_instance(config, storage_source, suppress_debug=True) except Exception: backend = None if backend is None: try: from Store import Store storage = Store(config) backend = storage[storage_source] except Exception: log(f"Storage backend '{storage_source}' not found", file=sys.stderr) return 1 # Get metadata from backend metadata = backend.get_metadata(file_hash) if not metadata: log( f"No metadata found for hash {file_hash[:8]}... in {storage_source}", file=sys.stderr, ) return 1 # Extract title from tags if available title = get_field(result, "title") or file_hash[:16] # Get tags from input result item_tags = get_field(result, "tag") or get_field(result, "tags") or [] if not isinstance(item_tags, list): item_tags = [str(item_tags)] else: item_tags = [str(t) for t in item_tags] # Extract tags from metadata response instead of making a separate get_tag() request # This prevents duplicate API calls to Hydrus (metadata already includes tags) metadata_tags = metadata.get("tags") if isinstance(metadata_tags, dict): # metadata["tags"] is {service_key: {service_data}} for service_data in metadata_tags.values(): if isinstance(service_data, dict): display_tags = service_data.get("display_tags", {}) if isinstance(display_tags, dict): # display_tags is typically {status: tag_list} for tag_list in display_tags.values(): if isinstance(tag_list, list): for t in tag_list: ts = str(t) if t else "" if ts and ts not in item_tags: item_tags.append(ts) # Check for title tag if not get_field(result, "title") and ts.lower().startswith("title:"): parts = ts.split(":", 1) if len(parts) > 1: title = parts[1].strip() break # Only use first status level if any(t for t in item_tags if str(t).lower().startswith("title:")): break # Found title tag, stop searching services # Extract metadata fields mime_type = metadata.get("mime") or metadata.get("ext", "") file_ext = metadata.get("ext", "") # Extract file extension separately file_size = metadata.get("size") duration_seconds = metadata.get("duration") if duration_seconds is None: duration_seconds = metadata.get("duration_seconds") if duration_seconds is None: duration_seconds = metadata.get("length") if duration_seconds is None and isinstance(metadata.get("duration_ms"), (int, float)): try: duration_seconds = float(metadata["duration_ms"]) / 1000.0 except Exception: duration_seconds = None if isinstance(duration_seconds, str): s = duration_seconds.strip() if s: try: duration_seconds = float(s) except ValueError: if ":" in s: parts = [p.strip() for p in s.split(":") if p.strip()] if len(parts) in {2, 3} and all(p.isdigit() for p in parts): nums = [int(p) for p in parts] if len(nums) == 2: duration_seconds = float(nums[0] * 60 + nums[1]) else: duration_seconds = float( nums[0] * 3600 + nums[1] * 60 + nums[2] ) else: duration_seconds = None pages = metadata.get("pages") url = metadata.get("url") or [] imported_ts = self._extract_imported_ts(metadata) # Normalize url if isinstance(url, str): try: url = json.loads(url) except (json.JSONDecodeError, TypeError): url = [] if not isinstance(url, list): url = [] # Build display row row = self._build_table_row( title=title, store=storage_source, path=metadata.get("path", ""), mime=mime_type, size_bytes=file_size, dur_seconds=duration_seconds, imported_ts=imported_ts, url=url, hash_value=file_hash, pages=pages, tag=item_tags, ext=file_ext, ) table_title = f"get-metadata: {title}" if title else "get-metadata" table = Table(table_title ).init_command(table_title, "get-metadata", list(args)) self._add_table_body_row(table, row) # Use helper to display item and make it @-selectable from ._shared import display_and_persist_items display_and_persist_items([row], title=table_title, subject=row) ctx.emit(row) return 0 except KeyError: log(f"Storage backend '{storage_source}' not found", file=sys.stderr) return 1 except Exception as exc: log(f"Failed to get metadata: {exc}", file=sys.stderr) return 1 CMDLET = Get_Metadata()