diff --git a/SYS/pipeline.py b/SYS/pipeline.py index 17cc539..5214121 100644 --- a/SYS/pipeline.py +++ b/SYS/pipeline.py @@ -1655,6 +1655,9 @@ class PipelineExecutor: if not selection_indices: return True, None + # ============================================================================ + # PHASE 1: Synchronize current stage table with display table + # ============================================================================ # Selection should operate on the *currently displayed* selectable table. # Some navigation flows (e.g. @.. back) can show a display table without # updating current_stage_table. Provider selectors rely on current_stage_table @@ -1689,6 +1692,60 @@ class PipelineExecutor: except Exception: logger.exception("Failed to sync current_stage_table from display/last table in _maybe_apply_initial_selection") + # ============================================================================ + # Helper functions for row action/args discovery (performance: inline caching) + # ============================================================================ + def _get_row_action(idx: int, items_cache: List[Any] | None = None) -> List[str] | None: + """Retrieve row selection_action from table or payload fallback.""" + try: + action = ctx.get_current_stage_table_row_selection_action(idx) + if action: + return [str(x) for x in action if x is not None] + except Exception: + pass + + # Fallback to serialized _selection_action in payload + if items_cache is None: + try: + items_cache = ctx.get_last_result_items() or [] + except Exception: + items_cache = [] + + if 0 <= idx < len(items_cache): + item = items_cache[idx] + if isinstance(item, dict): + candidate = item.get("_selection_action") + if isinstance(candidate, (list, tuple)): + return [str(x) for x in candidate if x is not None] + return None + + def _get_row_args(idx: int, items_cache: List[Any] | None = None) -> List[str] | None: + """Retrieve row selection_args from table or payload fallback.""" + try: + args = ctx.get_current_stage_table_row_selection_args(idx) + if args: + return [str(x) for x in args if x is not None] + except Exception: + pass + + # Fallback to serialized _selection_args in payload + if items_cache is None: + try: + items_cache = ctx.get_last_result_items() or [] + except Exception: + items_cache = [] + + if 0 <= idx < len(items_cache): + item = items_cache[idx] + if isinstance(item, dict): + candidate = item.get("_selection_args") + if isinstance(candidate, (list, tuple)): + return [str(x) for x in candidate if x is not None] + return None + + # ============================================================================ + # PHASE 2: Parse source command and table metadata + # ============================================================================ source_cmd = None source_args_raw = None try: @@ -1715,6 +1772,9 @@ class PipelineExecutor: "table") else None ) + # ============================================================================ + # PHASE 3: Handle command expansion for @N syntax + # ============================================================================ command_expanded = False example_selector_triggered = False normalized_source_cmd = str(source_cmd or "").replace("_", "-").strip().lower() @@ -1859,9 +1919,10 @@ class PipelineExecutor: debug(f"@N: stage_table={stage_table is not None}, display_table={display_table is not None}") - # Prefer selecting from the last selectable *table* (search/playlist) - # rather than from display-only emitted items, unless we're explicitly - # selecting from an overlay table. + # ==================================================================== + # PHASE 4: Retrieve and filter items from current result set + # ==================================================================== + # Cache items_list to avoid redundant lookups in helper functions below. try: if display_table is not None and stage_table is display_table: items_list = ctx.get_last_result_items() or [] @@ -2034,72 +2095,30 @@ class PipelineExecutor: if src_norm in {".worker", "worker", "workers"}: if len(selection_indices) == 1: idx = selection_indices[0] - row_args = None - try: - row_args = ctx.get_current_stage_table_row_selection_args(idx) - except Exception: - row_args = None - if not row_args: - try: - row_args = ctx.get_last_result_table_row_selection_args(idx) - except Exception: - row_args = None - if not row_args: - try: - items = ctx.get_last_result_items() or [] - if 0 <= idx < len(items): - maybe = items[idx] - if isinstance(maybe, dict): - candidate = maybe.get("_selection_args") - if isinstance(candidate, (list, tuple)): - row_args = [str(x) for x in candidate if x is not None] - except Exception: - row_args = row_args or None - + row_args = _get_row_args(idx, items_list) if row_args: stages.append( [str(source_cmd_for_selection)] - + [str(x) for x in row_args if x is not None] + + row_args + [str(x) for x in source_args_for_selection if x is not None] ) def _apply_row_action_to_stage(stage_idx: int) -> bool: + """Apply row selection_action to a specific stage, replacing it.""" if not selection_indices or len(selection_indices) != 1: return False - try: - row_action = ctx.get_current_stage_table_row_selection_action( - selection_indices[0] - ) - except Exception: - row_action = None + row_action = _get_row_action(selection_indices[0], items_list) if not row_action: - # Fallback to serialized payload when the table row is unavailable - try: - items = ctx.get_last_result_items() or [] - if 0 <= selection_indices[0] < len(items): - maybe = items[selection_indices[0]] - if isinstance(maybe, dict): - candidate = maybe.get("_selection_action") - if isinstance(candidate, (list, tuple)): - row_action = [str(x) for x in candidate if x is not None] - debug(f"@N row {selection_indices[0]} restored action from payload: {row_action}") - except Exception: - row_action = row_action or None - if not row_action: - debug(f"@N row {selection_indices[0]} has no selection_action") return False - normalized = [str(x) for x in row_action if x is not None] - if not normalized: - return False - debug(f"Applying row action for row {selection_indices[0]} -> {normalized}") if 0 <= stage_idx < len(stages): - debug(f"Replacing stage {stage_idx} {stages[stage_idx]} with row action {normalized}") - stages[stage_idx] = normalized + stages[stage_idx] = row_action return True return False + # ==================================================================== + # PHASE 5: Auto-insert stages based on table type and user selection + # ==================================================================== if not stages: - debug(f"@N: stages is empty, checking auto_stage and metadata") if isinstance(table_type, str) and table_type.startswith("metadata."): print("Auto-applying metadata selection via get-tag") stages.append(["get-tag"]) @@ -2123,78 +2142,31 @@ class PipelineExecutor: # Only support single-row selection for auto-attach here if len(selection_indices) == 1: idx = selection_indices[0] - row_args = ctx.get_current_stage_table_row_selection_args(idx) - if not row_args: - try: - items = ctx.get_last_result_items() or [] - if 0 <= idx < len(items): - maybe = items[idx] - if isinstance(maybe, dict): - candidate = maybe.get("_selection_args") - if isinstance(candidate, (list, tuple)): - row_args = [str(x) for x in candidate if x is not None] - except Exception: - row_args = row_args or None + row_args = _get_row_args(idx, items_list) if row_args: # Place selection args before any existing source args inserted = stages[-1] if inserted: cmd = inserted[0] tail = [str(x) for x in inserted[1:]] - stages[-1] = [cmd] + [str(x) for x in row_args] + tail + stages[-1] = [cmd] + row_args + tail except Exception: logger.exception("Failed to attach selection args to auto-inserted stage") # Look for row_action in payload if still no stages if not stages and selection_indices and len(selection_indices) == 1: - debug(f"@N: No stages and no auto_stage, looking for row_action in payload") - try: - idx = selection_indices[0] - debug(f"@N: idx={idx}, looking for row_action") - row_action = None - try: - row_action = ctx.get_current_stage_table_row_selection_action(idx) - debug(f"@N: row_action from table={row_action}") - except Exception as exc: - debug(f"@N: Exception getting row_selection_action: {exc}") - row_action = None - - if not row_action: - debug(f"@N: row_action not found from table, checking payload") + row_action = _get_row_action(selection_indices[0], items_list) + if row_action: + debug(f"@N: applying row_action {row_action}") + stages.append(row_action) + if pipeline_session and worker_manager: try: - items = ctx.get_last_result_items() or [] - debug(f"@N: got items, length={len(items)}") - if 0 <= idx < len(items): - maybe = items[idx] - try: - if isinstance(maybe, dict): - debug(f"@N: payload is dict with _selection_action={maybe.get('_selection_action')}") - else: - debug(f"@N: payload type={type(maybe).__name__}") - except Exception: - pass - if isinstance(maybe, dict): - candidate = maybe.get("_selection_action") - if isinstance(candidate, (list, tuple)): - row_action = [str(x) for x in candidate if x is not None] - debug(f"@N: extracted row_action from payload={row_action}") - except Exception as exc: - debug(f"@N: Exception checking payload: {exc}") - row_action = None - - if row_action: - debug(f"@N: FOUND row_action, appending {row_action}") - stages.append(row_action) - if pipeline_session and worker_manager: - try: - worker_manager.log_step( - pipeline_session.worker_id, - f"@N applied row action -> {' '.join(row_action)}", - ) - except Exception: - logger.exception("Failed to record pipeline log step for applied row action (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None)) - except Exception: - logger.exception("Failed to apply single-row selection action") + worker_manager.log_step( + pipeline_session.worker_id, + f"@N applied row action -> {' '.join(row_action)}", + ) + except Exception: + logger.exception("Failed to record pipeline log step for applied row action (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None)) else: first_cmd = stages[0][0] if stages and stages[0] else None if isinstance(table_type, str) and table_type.startswith("metadata.") and first_cmd not in ( @@ -2224,24 +2196,13 @@ class PipelineExecutor: if not _apply_row_action_to_stage(0): if len(selection_indices) == 1: idx = selection_indices[0] - row_args = ctx.get_current_stage_table_row_selection_args(idx) - if not row_args: - try: - items = ctx.get_last_result_items() or [] - if 0 <= idx < len(items): - maybe = items[idx] - if isinstance(maybe, dict): - candidate = maybe.get("_selection_args") - if isinstance(candidate, (list, tuple)): - row_args = [str(x) for x in candidate if x is not None] - except Exception: - row_args = row_args or None + row_args = _get_row_args(idx, items_list) if row_args: inserted = stages[0] if inserted: cmd = inserted[0] tail = [str(x) for x in inserted[1:]] - stages[0] = [cmd] + [str(x) for x in row_args] + tail + stages[0] = [cmd] + row_args + tail except Exception: logger.exception("Failed to attach selection args to inserted auto stage (alternate branch)") diff --git a/Store/registry.py b/Store/registry.py index 0421423..324911d 100644 --- a/Store/registry.py +++ b/Store/registry.py @@ -117,6 +117,11 @@ def _required_keys_for(store_cls: Type[BaseStore]) -> list[str]: ) +# Store type names that have been converted to providers-only. +# These should be silently skipped without warning. +_PROVIDER_ONLY_STORE_NAMES = frozenset(("debrid", "alldebrid")) + + def _build_kwargs(store_cls: Type[BaseStore], instance_name: str, instance_config: Any) -> Dict[str, Any]: if isinstance(instance_config, dict): cfg_dict = dict(instance_config) @@ -180,7 +185,8 @@ class Store: continue store_cls = classes_by_type.get(store_type) if store_cls is None: - if not self._suppress_debug: + # Skip provider-only names without debug warning + if store_type not in _PROVIDER_ONLY_STORE_NAMES and not self._suppress_debug: debug(f"[Store] Unknown store type '{raw_store_type}'") continue