Compare commits

8 Commits
m ... main

Author SHA1 Message Date
cef42cd54a f 2026-01-12 23:30:21 -08:00
3bd36baf5a f 2026-01-12 20:50:39 -08:00
bb4ac1f36b h 2026-01-12 20:50:29 -08:00
b5247936a4 f 2026-01-12 20:33:14 -08:00
8b7f518725 h 2026-01-12 20:26:45 -08:00
749ffb7e34 f 2026-01-12 20:10:18 -08:00
2870abf4de j 2026-01-12 20:01:45 -08:00
be55e6e450 Update readme.md 2026-01-12 22:58:35 -05:00
12 changed files with 624 additions and 152 deletions

View File

@@ -25,7 +25,8 @@ from SYS.utils import sha256_file, expand_path
from SYS.logger import debug as mm_debug
logger = logging.getLogger(__name__)
WORKER_LOG_MAX_ENTRIES = 99
WORKER_LOG_MAX_ENTRIES = 50 # Reduced from 99 to keep log size down
MAX_FINISHED_WORKERS = 100 # Only keep 100 finished workers globally
# Helper: decorate DB write methods to retry transient SQLITE 'database is locked' errors
def _db_retry(max_attempts: int = 6, base_sleep: float = 0.1):
@@ -263,7 +264,31 @@ class API_folder_store:
try:
# Ensure the library root exists; sqlite cannot create parent dirs.
try:
# User safety: Folder store must be created in a blank folder/no files in it.
# If the DB already exists, we skip this check (it's an existing library).
should_check_empty = not self.db_path.exists()
self.library_root.mkdir(parents=True, exist_ok=True)
if should_check_empty:
# Check if there are any files or directories in the library root (excluding the DB itself if it was just created)
# We use a generator and next() for efficiency.
existing_items = [item for item in self.library_root.iterdir() if item.name != self.DB_NAME]
if existing_items:
# Log the items found for debugging
item_names = [i.name for i in existing_items[:5]]
if len(existing_items) > 5:
item_names.append("...")
raise RuntimeError(
f"Safety Check Failed: Local library root must be empty for new stores.\n"
f"Directory: {self.library_root}\n"
f"Found {len(existing_items)} items: {', '.join(item_names)}\n"
f"Please use a clean directory to prevent accidental hashing of existing files."
)
except RuntimeError:
# Re-raise our specific safety error
raise
except Exception as exc:
raise RuntimeError(
f"Cannot create/open library root directory: {self.library_root}: {exc}"
@@ -280,10 +305,22 @@ class API_folder_store:
)
self.connection.row_factory = sqlite3.Row
# Enable Write-Ahead Logging (WAL) for better concurrency
# Performance & Size Optimizations
# 1. WAL mode for better concurrency and fewer locks
self.connection.execute("PRAGMA journal_mode=WAL")
# Enable foreign keys
# 2. auto_vacuum=FULL to automatically reclaim space from deleted rows/logs
self.connection.execute("PRAGMA auto_vacuum = FULL")
# 3. Increase page size for modern file systems
self.connection.execute("PRAGMA page_size = 4096")
# 4. Memory and Sync optimizations
self.connection.execute("PRAGMA synchronous = NORMAL")
self.connection.execute("PRAGMA temp_store = MEMORY")
self.connection.execute("PRAGMA cache_size = -2000")
# Use memory mapping for the entire DB (up to 30MB) for near-instant reads
self.connection.execute("PRAGMA mmap_size = 30000000")
# 5. Standard features
self.connection.execute("PRAGMA foreign_keys = ON")
# Bound how long sqlite will wait on locks before raising.
try:
self.connection.execute("PRAGMA busy_timeout = 5000")
@@ -291,6 +328,10 @@ class API_folder_store:
pass
self._create_tables()
# Run maintenance if the DB has grown suspiciously large
self._run_maintenance_if_needed()
logger.info(f"Database initialized at {self.db_path}")
except Exception as e:
logger.error(f"Failed to initialize database: {e}", exc_info=True)
@@ -302,6 +343,84 @@ class API_folder_store:
self.connection = None
raise
def _run_maintenance_if_needed(self) -> None:
"""Perform a one-time VACUUM if the database file is large."""
try:
if not self.db_path.exists():
return
# Global cleanup of old workers and logs regardless of size
self._global_cleanup()
# If the database is larger than 30MB, run a vacuum to ensure space is reclaimed.
# We only do this on startup to minimize performance impact.
file_stats = self.db_path.stat()
size_mb = file_stats.st_size / (1024 * 1024)
if size_mb > 30:
logger.debug(f"Database size ({size_mb:.1f}MB) exceeds maintenance threshold. Vacuuming...")
# We use a cursor to avoid blocking the main connection state if possible
self.connection.execute("VACUUM")
# Also optimize the query planner indices
self.connection.execute("ANALYZE")
new_size_mb = self.db_path.stat().st_size / (1024 * 1024)
reduction = size_mb - new_size_mb
if reduction > 1.0:
logger.info(f"Maintenance reclaimed {reduction:.1f}MB. Current size: {new_size_mb:.1f}MB")
except Exception as e:
# Maintenance should never block application startup
logger.warning(f"Database maintenance skipped: {e}")
def _global_cleanup(self) -> None:
"""Aggressively prune old workers and logs to prevent database bloat."""
try:
cursor = self.connection.cursor()
# 1. Prune finished/failed workers older than MAX_FINISHED_WORKERS
# We keep the newest ones based on completed_at or started_at
cursor.execute(
"""
DELETE FROM worker
WHERE status != 'running'
AND id NOT IN (
SELECT id FROM worker
WHERE status != 'running'
ORDER BY COALESCE(completed_at, started_at) DESC
LIMIT ?
)
""",
(MAX_FINISHED_WORKERS,)
)
worker_deletes = cursor.rowcount
# 2. Orphans check: Remove logs that no longer have a parent worker
cursor.execute(
"DELETE FROM worker_log WHERE worker_id NOT IN (SELECT worker_id FROM worker)"
)
log_orphans = cursor.rowcount
# 3. Global log limit: Ensure we don't have millions of log rows even if workers are within limit
# Limit total log entries to something reasonable like 5,000
cursor.execute(
"""
DELETE FROM worker_log
WHERE id NOT IN (
SELECT id FROM worker_log
ORDER BY created_at DESC
LIMIT 5000
)
"""
)
log_limit_deletes = cursor.rowcount
if worker_deletes > 0 or log_orphans > 0 or log_limit_deletes > 0:
logger.info(f"Global cleanup: Removed {worker_deletes} workers and {log_orphans + log_limit_deletes} log entries.")
self.connection.commit()
except Exception as e:
logger.warning(f"Global cleanup failed: {e}")
def _create_tables(self) -> None:
"""Create database tables if they don't exist."""
cursor = self.connection.cursor()
@@ -393,6 +512,15 @@ class API_folder_store:
# Notes indices (after migration so columns exist)
cursor.execute("CREATE INDEX IF NOT EXISTS idx_note_hash ON note(hash)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_note_name ON note(name)")
# Additional optimizations for search speed
# Covering index for tags helps query 'tags for hash' without hitting the table
cursor.execute("CREATE INDEX IF NOT EXISTS idx_tag_covering ON tag(hash, tag)")
# Index on metadata size and imports for common sorting
cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata_size ON metadata(size)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_metadata_imported ON metadata(time_imported)")
self.connection.commit()
logger.debug("Database tables created/verified")
@@ -1914,8 +2042,17 @@ class API_folder_store:
total_steps
),
)
worker_rowid = cursor.lastrowid or 0
# Prune occasionally (1 in 50 chance) or just run it to keep it clean
# Running it every time might be overkill, but let's do a light version
cursor.execute(
"DELETE FROM worker WHERE status != 'running' AND id < (SELECT MAX(id) - ? FROM worker)",
(MAX_FINISHED_WORKERS * 2,)
)
self.connection.commit()
return cursor.lastrowid or 0
return worker_rowid
except sqlite3.IntegrityError:
return self.update_worker_status(worker_id, "running")
except Exception as e:

View File

@@ -693,6 +693,58 @@ def clear_config_cache() -> None:
_CONFIG_CACHE.clear()
def _validate_config_safety(config: Dict[str, Any]) -> None:
"""Check for dangerous configurations, like folder stores in non-empty dirs."""
store = config.get("store")
if not isinstance(store, dict):
return
folder_stores = store.get("folder")
if not isinstance(folder_stores, dict):
return
for name, cfg in folder_stores.items():
if not isinstance(cfg, dict):
continue
path_str = cfg.get("path") or cfg.get("PATH")
if not path_str:
continue
try:
p = expand_path(path_str).resolve()
# If the path doesn't exist yet, it's fine (will be created empty)
if not p.exists():
continue
if not p.is_dir():
continue
# DB name from API/folder.py
db_file = p / "medios-macina.db"
if db_file.exists():
# Existing portable library, allowed to re-attach
continue
# Check if directory has any files (other than the DB we just checked)
items = list(p.iterdir())
if items:
item_names = [i.name for i in items[:3]]
if len(items) > 3:
item_names.append("...")
raise RuntimeError(
f"Configuration Error: Local library '{name}' target directory is not empty.\n"
f"Path: {p}\n"
f"Found {len(items)} items: {', '.join(item_names)}\n"
f"To prevent accidental mass-hashing, new libraries must be set to unique, empty folders."
)
except RuntimeError:
raise
except Exception:
# We don't want to crash on invalid paths during validation if they aren't 'unsafe'
pass
def save_config(
config: Dict[str, Any],
config_dir: Optional[Path] = None,
@@ -706,6 +758,9 @@ def save_config(
f"Unsupported config format: {config_path.name} (only .conf is supported)"
)
# Safety Check: Validate folder stores are in empty dirs or existing libraries
_validate_config_safety(config)
try:
config_path.write_text(_serialize_conf(config), encoding="utf-8")
except OSError as exc:

View File

@@ -34,6 +34,14 @@ except ImportError:
TEXTUAL_AVAILABLE = False
# Import ResultModel from the API for unification
try:
from SYS.result_table_api import ResultModel
except ImportError:
# Fallback if not available yet in directory structure (unlikely)
ResultModel = None
def _sanitize_cell_text(value: Any) -> str:
"""Coerce to a single-line, tab-free string suitable for terminal display."""
if value is None:
@@ -741,8 +749,11 @@ class ResultTable:
row = self.add_row()
row.payload = result
# Handle ResultModel from the new strict API (SYS/result_table_api.py)
if ResultModel and isinstance(result, ResultModel):
self._add_result_model(row, result)
# Handle TagItem from get_tag.py (tag display with index)
if hasattr(result, "__class__") and result.__class__.__name__ == "TagItem":
elif hasattr(result, "__class__") and result.__class__.__name__ == "TagItem":
self._add_tag_item(row, result)
# Handle ResultItem from search_file.py (compact display)
elif hasattr(result, "__class__") and result.__class__.__name__ == "ResultItem":
@@ -781,6 +792,62 @@ class ResultTable:
payloads.append(payload)
return payloads
@classmethod
def from_api_table(cls, api_table: Any) -> "ResultTable":
"""Convert a strict SYS.result_table_api.ResultTable into an interactive monolith ResultTable.
This allows providers using the new strict API to benefit from the monolith's
interactive selection (@N) and rich layout features.
"""
# Duck typing check instead of strict isinstance to keep dependencies light
if not hasattr(api_table, "rows") or not hasattr(api_table, "columns"):
return cls(str(api_table))
title = getattr(api_table, "provider", "")
# Try to get provider metadata title if available
meta = getattr(api_table, "meta", {})
if meta and isinstance(meta, dict):
title = meta.get("title") or title
instance = cls(title)
# Import adapters if we want to extract selection args automatically
# but let's keep it simple: we rely on add_result logic for most things.
# Iterate rows and build interactive ones
for r in api_table.rows:
row = instance.add_row()
row.payload = r
# Use columns defined in the API table
for col in api_table.columns:
try:
val = col.extractor(r)
if col.format_fn:
val = col.format_fn(val)
row.add_column(col.header, val)
except Exception:
pass
return instance
def _add_result_model(self, row: ResultRow, result: ResultModel) -> None:
"""Extract and add ResultModel fields from the new API to row."""
row.add_column("Title", result.title)
if result.ext:
row.add_column("Ext", result.ext)
if result.size_bytes is not None:
# Use the existing format_mb helper in this file
row.add_column("Size", format_mb(result.size_bytes))
if result.source:
row.add_column("Source", result.source)
# Add a placeholder for metadata-like display if needed in the main table
# but usually metadata is handled by the detail panel now
def _add_search_result(self, row: ResultRow, result: Any) -> None:
"""Extract and add SearchResult fields to row."""
cols = getattr(result, "columns", None)
@@ -1828,3 +1895,230 @@ def format_result(result: Any, title: str = "") -> str:
table.add_result(result)
return str(table)
def extract_item_metadata(item: Any) -> Dict[str, Any]:
"""Extract a comprehensive set of metadata from an item for the ItemDetailView.
Now supports SYS.result_table_api.ResultModel as a first-class input.
"""
if item is None:
return {}
out = {}
# Handle ResultModel specifically for better detail display
if ResultModel and isinstance(item, ResultModel):
if item.title: out["Title"] = item.title
if item.path: out["Path"] = item.path
if item.ext: out["Ext"] = item.ext
if item.size_bytes: out["Size"] = format_mb(item.size_bytes)
if item.source: out["Store"] = item.source
# Merge internal metadata dict
if item.metadata:
for k, v in item.metadata.items():
# Convert keys to readable labels (snake_case -> Title Case)
label = str(k).replace("_", " ").title()
if label not in out and v is not None:
out[label] = v
# URLs/Tags/Relations from metadata if present
data = item.metadata or {}
url = _get_first_dict_value(data, ["url", "URL"])
if url: out["Url"] = url
rels = _get_first_dict_value(data, ["relationships", "rel"])
if rels: out["Relations"] = rels
tags = _get_first_dict_value(data, ["tags", "tag"])
if tags: out["Tags"] = tags
return out
# Fallback to existing extraction logic for legacy objects/dicts
# Use existing extractors from match-standard result table columns
title = extract_title_value(item)
if title:
out["Title"] = title
else:
# Fallback for raw dicts
data = _as_dict(item) or {}
t = data.get("title") or data.get("name") or data.get("TITLE")
if t: out["Title"] = t
hv = extract_hash_value(item)
if hv: out["Hash"] = hv
store = extract_store_value(item)
if store: out["Store"] = store
# Path/Target
data = _as_dict(item) or {}
path = data.get("path") or data.get("target") or data.get("filename")
if path: out["Path"] = path
ext = extract_ext_value(item)
if ext:
out["Ext"] = ext
else:
e = data.get("ext") or data.get("extension")
if e: out["Ext"] = e
size = extract_size_bytes_value(item)
if size:
out["Size"] = size
else:
s = data.get("size") or data.get("size_bytes")
if s: out["Size"] = s
# Duration
dur = _get_first_dict_value(data, ["duration_seconds", "duration"])
if dur:
out["Duration"] = _format_duration_hms(dur)
# URL
url = _get_first_dict_value(data, ["url", "URL"])
if url:
out["Url"] = url
else:
out["Url"] = None # Explicitly None for <null> display
# Relationships
rels = _get_first_dict_value(data, ["relationships", "rel"])
if rels:
out["Relations"] = rels
else:
out["Relations"] = None
# Tags Summary
tags = _get_first_dict_value(data, ["tags", "tag"])
if tags: out["Tags"] = tags
return out
class ItemDetailView(ResultTable):
"""A specialized view that displays item details alongside a list of related items (tags, urls, etc).
This is used for 'get-tag', 'get-url' and similar cmdlets where we want to contextually show
what is being operated on (the main item) along with the selection list.
"""
def __init__(
self,
title: str = "",
item_metadata: Optional[Dict[str, Any]] = None,
**kwargs
):
super().__init__(title, **kwargs)
self.item_metadata = item_metadata or {}
def to_rich(self):
"""Render the item details panel above the standard results table."""
from rich.table import Table as RichTable
from rich.panel import Panel
from rich.console import Group
from rich.columns import Columns
from rich.text import Text
# 1. Create Detail Grid (matching rich_display.py style)
details_table = RichTable.grid(expand=True, padding=(0, 2))
details_table.add_column("Key", style="cyan", justify="right", width=15)
details_table.add_column("Value", style="white")
# Canonical display order for metadata
order = ["Title", "Hash", "Store", "Path", "Ext", "Size", "Duration", "Url", "Relations"]
has_details = False
# Add ordered items first
for key in order:
val = self.item_metadata.get(key)
if val is None:
val = self.item_metadata.get(key.lower())
if val is None:
val = self.item_metadata.get(key.upper())
# Special formatting for certain types
if key == "Title" and val:
val = f"[bold]{val}[/bold]"
if key == "Size" and val and isinstance(val, (int, float, str)) and str(val).isdigit():
val = _format_size(int(val), integer_only=False)
if key == "Relations" and isinstance(val, list) and val:
if isinstance(val[0], dict):
val = "\n".join([f"[dim]→[/dim] {r.get('type','rel')}: {r.get('title','?')}" for r in val])
else:
val = "\n".join([f"[dim]→[/dim] {r}" for r in val])
if val is not None and val != "":
details_table.add_row(f"{key}:", str(val))
has_details = True
elif key in ["Url", "Relations"]:
# User requested <null> for these if blank
details_table.add_row(f"{key}:", "[dim]<null>[/dim]")
has_details = True
# Add any remaining metadata not in the canonical list
for k, v in self.item_metadata.items():
k_norm = k.lower()
if k_norm not in [x.lower() for x in order] and v and k_norm not in ["tags", "tag"]:
label = k.capitalize() if len(k) > 1 else k.upper()
details_table.add_row(f"{label}:", str(v))
has_details = True
# Tags Summary
tags = self.item_metadata.get("Tags") or self.item_metadata.get("tags") or self.item_metadata.get("tag")
if tags and isinstance(tags, (list, str)):
if isinstance(tags, str):
tags = [t.strip() for t in tags.split(",") if t.strip()]
tags_sorted = sorted(map(str, tags))
tag_cols = Columns([f"[dim]#[/dim]{t}" for t in tags_sorted], equal=True, expand=True)
details_table.add_row("", "") # Spacer
details_table.add_row("Tags:", tag_cols)
has_details = True
# 2. Get the standard table render (if there are rows or a specific title)
original_title = self.title
original_header_lines = self.header_lines
self.title = ""
self.header_lines = []
results_renderable = None
# We only show the results panel if there's data OR if the user explicitly set a title (cmdlet mode)
if self.rows or original_title:
self.title = original_title
try:
results_renderable = super().to_rich()
finally:
self.title = "" # Keep it clean for element assembly
# 3. Assemble components
elements = []
if has_details:
elements.append(Panel(
details_table,
title="[bold green]Item Details[/bold green]",
border_style="green",
padding=(1, 2)
))
if results_renderable:
# If it's a Panel already (from super().to_rich() with title), use it directly
# but force the border style to green for consistency
if isinstance(results_renderable, Panel):
results_renderable.border_style = "green"
# Add a bit of padding inside if it contains a table
elements.append(results_renderable)
else:
# Wrap the raw table/text in a titled panel
display_title = "Items"
if original_title:
display_title = original_title
# Add a bit of padding
results_group = Group(Text(""), results_renderable, Text(""))
elements.append(Panel(results_group, title=display_title, border_style="green"))
return Group(*elements)

View File

@@ -262,88 +262,20 @@ def render_image_to_console(image_path: str | Path, max_width: int | None = None
def render_item_details_panel(item: Dict[str, Any]) -> None:
"""Render a comprehensive details panel for a result item."""
from rich.table import Table
from rich.columns import Columns
title = (
item.get("title")
or item.get("name")
or item.get("TITLE")
or "Unnamed Item"
)
# Main layout table for the panel
details_table = Table.grid(expand=True)
details_table.add_column(style="cyan", no_wrap=True, width=15)
details_table.add_column(style="white")
# Basic Info
details_table.add_row("Title", f"[bold]{title}[/bold]")
"""Render a comprehensive details panel for a result item using unified ItemDetailView."""
from SYS.result_table import ItemDetailView, extract_item_metadata
if "store" in item:
details_table.add_row("Store", str(item["store"]))
metadata = extract_item_metadata(item)
if "hash" in item:
details_table.add_row("Hash", str(item["hash"]))
# Create a specialized view with no results rows (only the metadata panel)
# We set no_choice=True to hide the "#" column (not that there are any rows).
view = ItemDetailView(item_metadata=metadata).set_no_choice(True)
# Metadata / Path
if "path" in item or "target" in item:
path = item.get("path") or item.get("target")
details_table.add_row("Path", str(path))
if "ext" in item or "extension" in item:
ext = item.get("ext") or item.get("extension")
details_table.add_row("Extension", str(ext))
if "size_bytes" in item or "size" in item:
size = item.get("size_bytes") or item.get("size")
if isinstance(size, (int, float)):
if size > 1024 * 1024 * 1024:
size_str = f"{size / (1024*1024*1024):.1f} GB"
elif size > 1024 * 1024:
size_str = f"{size / (1024*1024):.1f} MB"
elif size > 1024:
size_str = f"{size / 1024:.1f} KB"
else:
size_str = f"{size} bytes"
details_table.add_row("Size", size_str)
# URL(s)
urls = item.get("url") or item.get("URL") or []
if isinstance(urls, str):
urls = [urls]
if isinstance(urls, list) and urls:
url_text = "\n".join(map(str, urls))
details_table.add_row("URL(s)", url_text)
# Tags
tags = item.get("tag") or item.get("tags") or []
if isinstance(tags, str):
tags = [tags]
if isinstance(tags, list) and tags:
# Sort and filter tags to look nice
tags_sorted = sorted(map(str, tags))
# Group tags by namespace if they have them
tag_cols = Columns([f"[dim]#[/dim]{t}" for t in tags_sorted], equal=True, expand=True)
details_table.add_row("", "") # Spacer
details_table.add_row("Tags", tag_cols)
# Relationships (if any)
rels = item.get("relationships") or item.get("rel") or []
if isinstance(rels, list) and rels:
rel_text = "\n".join([f"[dim]→[/dim] {r}" for r in rels])
details_table.add_row("Relations", rel_text)
panel = Panel(
details_table,
title=f"[bold green]Item Details[/bold green]",
border_style="green",
padding=(1, 2),
expand=True
)
# We want to print ONLY the elements from ItemDetailView, so we don't use stdout_console().print(view)
# as that would include the (empty) results panel.
# Actually, let's just use to_rich and print it.
stdout_console().print()
stdout_console().print(panel)
stdout_console().print(view.to_rich())

View File

@@ -450,12 +450,15 @@ class ConfigModal(ModalScreen):
elif bid == "save-btn":
if not self.validate_current_editor():
return
self.save_all()
self.notify("Configuration saved!")
# Return to the main list view within the current category
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
try:
self.save_all()
self.notify("Configuration saved!")
# Return to the main list view within the current category
self.editing_item_name = None
self.editing_item_type = None
self.refresh_view()
except Exception as exc:
self.notify(f"Save failed: {exc}", severity="error", timeout=10)
elif bid in self._button_id_map:
action, itype, name = self._button_id_map[bid]
if action == "edit":

View File

@@ -1634,7 +1634,15 @@ class Add_File(Cmdlet):
c for c in title_value if c.isalnum() or c in " ._-()[]{}'`"
).strip()
base_name = safe_title or media_path.stem
new_name = base_name + media_path.suffix
# Fix to prevent double extensions (e.g., file.exe.exe)
# If the base name already ends with the extension of the media file,
# don't append it again.
file_ext = media_path.suffix
if file_ext and base_name.lower().endswith(file_ext.lower()):
new_name = base_name
else:
new_name = base_name + file_ext
destination_root.mkdir(parents=True, exist_ok=True)
target_path = destination_root / new_name

View File

@@ -669,17 +669,34 @@ class Add_Tag(Cmdlet):
# treat add-tag as a pipeline mutation (carry tags forward for add-file) instead of a store write.
if not store_override:
store_name_str = str(store_name) if store_name is not None else ""
local_mode_requested = (
(not store_name_str) or (store_name_str.upper() == "PATH")
or (store_name_str.lower() == "local")
)
is_known_backend = bool(store_name_str) and store_registry.is_available(
store_name_str
)
is_known_backend = False
try:
is_known_backend = bool(store_name_str) and store_registry.is_available(
store_name_str
)
except Exception:
pass
if local_mode_requested and raw_path:
# If the item isn't in a configured store backend yet (e.g., store=PATH),
# treat add-tag as a pipeline mutation (carry tags forward for add-file)
# instead of a store write.
if not is_known_backend:
try:
if Path(str(raw_path)).expanduser().exists():
# We allow metadata updates even if file doesn't exist locally,
# but check path existence if valid path provided.
proceed_local = True
if raw_path:
try:
if not Path(str(raw_path)).expanduser().exists():
# If path is provided but missing, we might prefer skipping?
# But for pipeline metadata, purely missing file shouldn't block tagging.
# So we allow it.
pass
except Exception:
pass
if proceed_local:
existing_tag_list = _extract_item_tags(res)
existing_lower = {
t.lower()
@@ -799,14 +816,9 @@ class Add_Tag(Cmdlet):
except Exception:
pass
if local_mode_requested:
log(
"[add_tag] Error: Missing usable local path for tagging (or provide -store)",
file=sys.stderr,
)
return 1
if store_name_str and not is_known_backend:
# If it's not a known backend and we didn't handle it above as a local/pipeline
# metadata edit, then it's an error.
log(
f"[add_tag] Error: Unknown store '{store_name_str}'. Available: {store_registry.list_backends()}",
file=sys.stderr,

View File

@@ -101,7 +101,20 @@ class Get_Note(Cmdlet):
store_registry = Store(config)
any_notes = False
display_items: List[Dict[str, Any]] = []
note_table: Optional[ResultTable] = None
# We assume single subject for get-note detail view
main_res = results[0]
from SYS.result_table import ItemDetailView, extract_item_metadata
metadata = extract_item_metadata(main_res)
note_table = (
ItemDetailView("Notes", item_metadata=metadata)
.set_table("note")
.set_value_case("preserve")
.set_preserve_order(True)
)
note_table.set_source_command("get-note", [])
for res in results:
if not isinstance(res, dict):
@@ -125,6 +138,12 @@ class Get_Note(Cmdlet):
)
if not resolved_hash:
continue
# Update metadata if we resolved a hash that wasn't in source
if resolved_hash and not metadata.get("Hash"):
metadata["Hash"] = resolved_hash
if store_name and not metadata.get("Store"):
metadata["Store"] = store_name
try:
backend = store_registry[store_name]
@@ -148,13 +167,6 @@ class Get_Note(Cmdlet):
continue
any_notes = True
if note_table is None:
note_table = (
ResultTable("note")
.set_table("note")
.set_value_case("preserve")
.set_preserve_order(True)
)
# Emit each note as its own row so CLI renders a proper note table
for k in sorted(notes.keys(), key=lambda x: str(x).lower()):
v = notes.get(k)
@@ -176,13 +188,18 @@ class Get_Note(Cmdlet):
}
display_items.append(payload)
if note_table is not None:
note_table.add_result(payload)
row = note_table.add_row()
row.add_column("Name", str(k))
row.add_column("Text", preview.strip())
ctx.emit(payload)
# Always set the table overlay even if empty to show item details
ctx.set_last_result_table_overlay(note_table, display_items, subject=result)
if not any_notes:
ctx.emit("No notes found.")
elif note_table is not None:
ctx.set_last_result_table(note_table, display_items, subject=result)
log("No notes found.")
return 0

View File

@@ -499,22 +499,20 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
if not found_relationships:
log(f"Hydrus relationships fetch failed: {exc}", file=sys.stderr)
if not found_relationships:
try:
from rich.panel import Panel
from SYS.rich_display import stdout_console
title = source_title or (hash_hex[:16] + "..." if hash_hex else "Item")
stdout_console().print(
Panel(f"{title} has no relationships",
title="Relationships")
)
except Exception:
log("No relationships found.")
return 0
# Display results
table = ResultTable(f"Relationships: {source_title}"
from SYS.result_table import ItemDetailView, extract_item_metadata
# Prepare metadata for the detail view
metadata = extract_item_metadata(result)
if hash_hex:
metadata["Hash"] = hash_hex
# Overlays
if source_title and source_title != "Unknown":
metadata["Title"] = source_title
table = ItemDetailView(f"Relationships", item_metadata=metadata
).init_command("get-relationship",
[])
@@ -568,11 +566,15 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
f"hash:{item['hash']}"]
)
ctx.set_last_result_table(table, pipeline_results)
# Ensure empty state is still navigable/visible
ctx.set_last_result_table_overlay(table, pipeline_results)
from SYS.rich_display import stdout_console
stdout_console().print(table)
if not found_relationships:
log("No relationships found.")
return 0

View File

@@ -322,15 +322,23 @@ def _emit_tags_as_table(
This replaces _print_tag_list to make tags pipe-able.
Stores the table via ctx.set_last_result_table_overlay (or ctx.set_last_result_table) for downstream @ selection.
"""
from SYS.result_table import ResultTable
from SYS.result_table import ItemDetailView, extract_item_metadata
# Create ResultTable with just tag column (no title)
# Keep the title stable and avoid including hash fragments.
table_title = "tag"
# Prepare metadata for the detail view
metadata = extract_item_metadata(subject)
# Overlays/Overrides from explicit args if subject was partial
if item_title:
table_title = f"tag: {item_title}"
metadata["Title"] = item_title
if file_hash:
metadata["Hash"] = file_hash
if store:
metadata["Store"] = service_name if service_name else store
if path:
metadata["Path"] = path
table = ResultTable(table_title, max_columns=1)
# Create ItemDetailView
table = ItemDetailView("Tags", item_metadata=metadata, max_columns=1)
table.set_source_command("get-tag", [])
# Create TagItem for each tag
@@ -1745,10 +1753,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
storage = Store(config)
backend = storage[store_name]
current, source = backend.get_tag(file_hash, config=config)
if not current:
log("No tags found", file=sys.stderr)
return 1
current = list(current or [])
service_name = ""
except KeyError:

View File

@@ -421,14 +421,20 @@ class Get_Url(Cmdlet):
from SYS.metadata import normalize_urls
urls = normalize_urls(urls)
title = str(get_field(result, "title") or "").strip()
table_title = "Title"
if title:
table_title = f"Title: {title}"
from SYS.result_table import ItemDetailView, extract_item_metadata
# Prepare metadata for the detail view
metadata = extract_item_metadata(result)
if file_hash:
metadata["Hash"] = file_hash
if store_name:
metadata["Store"] = store_name
table = (
ResultTable(
table_title,
ItemDetailView(
"Urls",
item_metadata=metadata,
max_columns=1
).set_preserve_order(True).set_table("url").set_value_case("preserve")
)
@@ -447,13 +453,14 @@ class Get_Url(Cmdlet):
# Use overlay mode to avoid "merging" with the previous status/table state.
# This is idiomatic for detail views and prevents the search table from being
# contaminated by partial re-renders.
ctx.set_last_result_table_overlay(table if items else None, items, subject=result)
ctx.set_last_result_table_overlay(table, items, subject=result)
# Emit items at the end for pipeline continuity
for item in items:
ctx.emit(item)
if not items:
# Still log it but the panel will show the item context
log("No url found", file=sys.stderr)
return 0

View File

@@ -42,7 +42,7 @@ Medios-Macina is a CLI file media manager and toolkit focused on downloading, ta
<details>
<summary>installation console command</summary>
<pre><code>git clone https://code.glowers.club/goyimnose/Medios-Macina.git
<pre><code>git clone --depth 1 https://code.glowers.club/goyimnose/Medios-Macina.git
python Medios-Macina/scripts/bootstrap.py
</code></pre>
</details>