diff --git a/PluginCore/backend_registry.py b/PluginCore/backend_registry.py index 6d16931..e5e1a5d 100644 --- a/PluginCore/backend_registry.py +++ b/PluginCore/backend_registry.py @@ -6,6 +6,7 @@ Backends are discovered from their owning plugins and instantiated from config. from __future__ import annotations import importlib +import importlib.util import inspect import re from typing import Any, Dict, Optional, Type @@ -119,6 +120,16 @@ def _discover_plugin_backend_class(backend_type: str) -> Optional[Type[BackendBa return resolved +def _plugin_module_exists(plugin_name: str) -> bool: + normalized = _normalize_backend_type(plugin_name) + if not normalized: + return False + try: + return importlib.util.find_spec(f"plugins.{normalized}") is not None + except Exception: + return False + + def _resolve_backend_class(backend_type: str) -> Optional[Type[BackendBase]]: normalized = _normalize_backend_type(backend_type) if not normalized: @@ -200,7 +211,11 @@ class BackendRegistry: continue backend_cls = _resolve_backend_class(backend_type) if backend_cls is None: - if backend_type not in _PROVIDER_ONLY_BACKEND_NAMES and not self._suppress_debug: + if ( + backend_type not in _PROVIDER_ONLY_BACKEND_NAMES + and not self._suppress_debug + and not _plugin_module_exists(backend_type) + ): debug(f"[BackendRegistry] Unknown backend type '{raw_backend_type}'") continue diff --git a/SYS/worker.py b/SYS/worker.py index 96ef960..3bd9e08 100644 --- a/SYS/worker.py +++ b/SYS/worker.py @@ -185,7 +185,7 @@ class WorkerManagerRegistry: except Exception: from SYS.logger import logger logger.exception("Failed to close existing WorkerManager during registry ensure") - cls._manager = WorkerManager(resolved_root, auto_refresh_interval=0.5) + cls._manager = WorkerManager(auto_refresh_interval=0.5) cls._manager_root = resolved_root manager = cls._manager diff --git a/SYS/worker_manager.py b/SYS/worker_manager.py index 3bcbb98..e4ee8e0 100644 --- a/SYS/worker_manager.py +++ b/SYS/worker_manager.py @@ -277,6 +277,7 @@ class WorkerManager: self._lock = Lock() self.worker_handlers: Dict[str, WorkerLoggingHandler] = {} self._worker_last_step: Dict[str, str] = {} + self._db_lock = Lock() # Buffered stdout/log batching to reduce DB lock contention. self._stdout_buffers: Dict[Tuple[str, str], List[str]] = {} @@ -596,8 +597,30 @@ class WorkerManager: limit: int = 500) -> List[Dict[str, Any]]: """Fetch recorded worker timeline events.""" - with self._db_lock: - return self.db.get_worker_events(worker_id, limit) + try: + with self._db_lock: + rows = db.fetchall( + "SELECT content, channel, timestamp FROM worker_stdout WHERE worker_id = ? ORDER BY timestamp ASC LIMIT ?", + (worker_id, int(limit or 500)), + ) or [] + events: List[Dict[str, Any]] = [] + for row in rows: + payload = dict(row) + events.append( + { + "message": payload.get("content"), + "channel": payload.get("channel") or "stdout", + "created_at": payload.get("timestamp"), + "step": self._worker_last_step.get(worker_id), + } + ) + return events + except Exception as e: + logger.error( + f"[WorkerManager] Error getting worker events for {worker_id}: {e}", + exc_info=True, + ) + return [] def log_step(self, worker_id: str, step_text: str) -> bool: """Log a step to a worker's step history. @@ -610,10 +633,14 @@ class WorkerManager: True if successful """ try: - with self._db_lock: - success = self.db.append_worker_steps(worker_id, step_text) + step_value = str(step_text or "").strip() + if not step_value: + return True + + success = update_worker(worker_id, details=step_value) + append_worker_stdout(worker_id, step_value, channel="step") if success: - self._worker_last_step[worker_id] = step_text + self._worker_last_step[worker_id] = step_value return success except Exception as e: logger.error( @@ -637,7 +664,18 @@ class WorkerManager: """ try: with self._db_lock: - return self.db.get_worker_steps(worker_id) + rows = db.fetchall( + "SELECT content FROM worker_stdout WHERE worker_id = ? AND channel = 'step' ORDER BY timestamp ASC", + (worker_id,), + ) or [] + parts = [str(dict(row).get("content") or "").strip() for row in rows] + parts = [part for part in parts if part] + if parts: + return "\n".join(parts) + worker = db_get_worker(worker_id) + if isinstance(worker, dict): + return str(worker.get("details") or "") + return "" except Exception as e: logger.error( f"[WorkerManager] Error getting steps for worker {worker_id}: {e}", @@ -725,7 +763,17 @@ class WorkerManager: """ try: with self._db_lock: - count = self.db.cleanup_old_workers(days) + cutoff_days = max(0, int(days or 0)) + cutoff_expr = f"-{cutoff_days} days" + db.execute( + "DELETE FROM worker_stdout WHERE worker_id IN (SELECT id FROM workers WHERE status != 'running' AND updated_at < datetime('now', ?))", + (cutoff_expr,), + ) + cur = db.execute( + "DELETE FROM workers WHERE status != 'running' AND updated_at < datetime('now', ?)", + (cutoff_expr,), + ) + count = int(getattr(cur, "rowcount", 0) or 0) if count > 0: logger.info(f"[WorkerManager] Cleaned up {count} old workers") return count diff --git a/cmdlet/file/add.py b/cmdlet/file/add.py index 38aea01..e68e6de 100644 --- a/cmdlet/file/add.py +++ b/cmdlet/file/add.py @@ -42,6 +42,10 @@ from SYS.utils import sha256_file, unique_path, sanitize_filename # Canonical supported filetypes for all stores/cmdlets SUPPORTED_MEDIA_EXTENSIONS = ALL_SUPPORTED_EXTENSIONS +_SCREENSHOT_TIME_SUFFIX_RE = re.compile( + r"^(?P.+?)_(?P<label>(?:\d+h)?(?:\d+m)?\d+s)$", + flags=re.IGNORECASE, +) class _CommandDependencies: @@ -260,7 +264,7 @@ class Add_File(Cmdlet): # Initialize command-scope dependency context (caches Store/plugins) deps = _CommandDependencies(config) - storage_registry = deps.get_backend_registry() + storage_registry: Optional[BackendRegistry] = None source_arg = parsed.get("source") location = parsed.get("instance") @@ -363,9 +367,10 @@ class Add_File(Cmdlet): # Determine if -instance targets a registered backend (vs a filesystem export path). is_storage_backend_location = False - if location: + if location and not plugin_name: try: backend_registry_for_lookup = storage_registry or deps.get_backend_registry() + storage_registry = backend_registry_for_lookup is_storage_backend_location = Add_File._resolve_backend_by_name(backend_registry_for_lookup, str(location)) is not None except Exception: is_storage_backend_location = False @@ -418,6 +423,8 @@ class Add_File(Cmdlet): store_instance=storage_registry, deps=deps, ) + if plugin_storage_backend and storage_registry is None: + storage_registry = deps.get_backend_registry() effective_storage_backend_name = plugin_storage_backend or ( str(location) if location and is_storage_backend_location else None @@ -582,7 +589,7 @@ class Add_File(Cmdlet): ) for idx, item in enumerate(items_to_process, 1): - pipe_obj = coerce_to_pipe_object(item, path_arg) + pipe_obj = coerce_to_pipe_object(item, source_arg) if source_url_arg: try: @@ -1525,16 +1532,17 @@ class Add_File(Cmdlet): return f"Pipeline error: invalid add-file arguments: {exc}" deps = _CommandDependencies(cfg) - storage_registry = deps.get_backend_registry() + storage_registry: Optional[BackendRegistry] = None location = parsed.get("instance") plugin_instance = parsed.get("instance") plugin_name = parsed.get("plugin") is_storage_backend_location = False - if location: + if location and not plugin_name: try: backend_registry_for_lookup = storage_registry or deps.get_backend_registry() + storage_registry = backend_registry_for_lookup is_storage_backend_location = Add_File._resolve_backend_by_name( backend_registry_for_lookup, str(location), @@ -1615,7 +1623,7 @@ class Add_File(Cmdlet): explicit_instance = str(instance_name or "").strip() or None try: - backend_registry = store_instance if store_instance is not None else BackendRegistry(config) + backend_registry = store_instance if store_instance is not None else deps.get_backend_registry() except Exception: backend_registry = None @@ -2234,6 +2242,40 @@ class Add_File(Cmdlet): url_from_result = Add_File._get_url(result, pipe_obj) + def _has_namespace_tag(tags: Sequence[str], namespace: str) -> bool: + namespace_text = str(namespace or "").strip().lower() + if not namespace_text: + return False + prefix = f"{namespace_text}:" + for tag in tags or []: + text = str(tag or "").strip().lower() + if text.startswith(prefix): + return True + return False + + def _extract_screenshot_time_title() -> tuple[Optional[str], Optional[str]]: + # MPV screenshot saves use <title>_<2m46s>.png as a temp filename. + # When we ingest that into a tag-capable backend, recover the clean title + # and surface the capture position as a namespace tag instead of baking it + # into the stored display title. + current_title = str(preferred_title or "").strip() + filename_title = str(media_path.stem or "").strip() + if current_title and current_title != filename_title: + return None, None + if not url_from_result: + return None, None + suffix = str(media_path.suffix or "").strip().lower() + if suffix not in {".png", ".jpg", ".jpeg", ".webp", ".gif", ".bmp", ".avif", ".mhtml"}: + return None, None + match = _SCREENSHOT_TIME_SUFFIX_RE.match(str(media_path.stem or "").strip()) + if not match: + return None, None + title_text = str(match.group("title") or "").strip().replace("_", " ").strip() + label_text = str(match.group("label") or "").strip().lower() + if not title_text or not label_text: + return None, None + return title_text, label_text + preferred_title = pipe_obj.title if not preferred_title: for t in tags_from_result: @@ -2247,6 +2289,12 @@ class Add_File(Cmdlet): if preferred_title: preferred_title = preferred_title.replace("_", " ").strip() + derived_screenshot_title, derived_time_tag = _extract_screenshot_time_title() + if derived_screenshot_title and ( + not preferred_title or str(preferred_title or "").strip() == str(media_path.stem or "").strip() + ): + preferred_title = derived_screenshot_title + store = getattr(pipe_obj, "store", None) _, sidecar_hash, sidecar_tags, sidecar_url = Add_File._load_sidecar_bundle( media_path, store, config @@ -2279,6 +2327,9 @@ class Add_File(Cmdlet): case_sensitive=True ) + if derived_time_tag and not _has_namespace_tag(merged_tags, "time") and not _has_namespace_tag(merged_tags, "timestamp"): + merged_tags.append(f"time:{derived_time_tag}") + if preferred_title: merged_tags.append(f"title:{preferred_title}")