g
This commit is contained in:
213
SYS/pipeline.py
213
SYS/pipeline.py
@@ -1655,6 +1655,9 @@ class PipelineExecutor:
|
||||
if not selection_indices:
|
||||
return True, None
|
||||
|
||||
# ============================================================================
|
||||
# PHASE 1: Synchronize current stage table with display table
|
||||
# ============================================================================
|
||||
# Selection should operate on the *currently displayed* selectable table.
|
||||
# Some navigation flows (e.g. @.. back) can show a display table without
|
||||
# updating current_stage_table. Provider selectors rely on current_stage_table
|
||||
@@ -1689,6 +1692,60 @@ class PipelineExecutor:
|
||||
except Exception:
|
||||
logger.exception("Failed to sync current_stage_table from display/last table in _maybe_apply_initial_selection")
|
||||
|
||||
# ============================================================================
|
||||
# Helper functions for row action/args discovery (performance: inline caching)
|
||||
# ============================================================================
|
||||
def _get_row_action(idx: int, items_cache: List[Any] | None = None) -> List[str] | None:
|
||||
"""Retrieve row selection_action from table or payload fallback."""
|
||||
try:
|
||||
action = ctx.get_current_stage_table_row_selection_action(idx)
|
||||
if action:
|
||||
return [str(x) for x in action if x is not None]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fallback to serialized _selection_action in payload
|
||||
if items_cache is None:
|
||||
try:
|
||||
items_cache = ctx.get_last_result_items() or []
|
||||
except Exception:
|
||||
items_cache = []
|
||||
|
||||
if 0 <= idx < len(items_cache):
|
||||
item = items_cache[idx]
|
||||
if isinstance(item, dict):
|
||||
candidate = item.get("_selection_action")
|
||||
if isinstance(candidate, (list, tuple)):
|
||||
return [str(x) for x in candidate if x is not None]
|
||||
return None
|
||||
|
||||
def _get_row_args(idx: int, items_cache: List[Any] | None = None) -> List[str] | None:
|
||||
"""Retrieve row selection_args from table or payload fallback."""
|
||||
try:
|
||||
args = ctx.get_current_stage_table_row_selection_args(idx)
|
||||
if args:
|
||||
return [str(x) for x in args if x is not None]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fallback to serialized _selection_args in payload
|
||||
if items_cache is None:
|
||||
try:
|
||||
items_cache = ctx.get_last_result_items() or []
|
||||
except Exception:
|
||||
items_cache = []
|
||||
|
||||
if 0 <= idx < len(items_cache):
|
||||
item = items_cache[idx]
|
||||
if isinstance(item, dict):
|
||||
candidate = item.get("_selection_args")
|
||||
if isinstance(candidate, (list, tuple)):
|
||||
return [str(x) for x in candidate if x is not None]
|
||||
return None
|
||||
|
||||
# ============================================================================
|
||||
# PHASE 2: Parse source command and table metadata
|
||||
# ============================================================================
|
||||
source_cmd = None
|
||||
source_args_raw = None
|
||||
try:
|
||||
@@ -1715,6 +1772,9 @@ class PipelineExecutor:
|
||||
"table") else None
|
||||
)
|
||||
|
||||
# ============================================================================
|
||||
# PHASE 3: Handle command expansion for @N syntax
|
||||
# ============================================================================
|
||||
command_expanded = False
|
||||
example_selector_triggered = False
|
||||
normalized_source_cmd = str(source_cmd or "").replace("_", "-").strip().lower()
|
||||
@@ -1859,9 +1919,10 @@ class PipelineExecutor:
|
||||
|
||||
debug(f"@N: stage_table={stage_table is not None}, display_table={display_table is not None}")
|
||||
|
||||
# Prefer selecting from the last selectable *table* (search/playlist)
|
||||
# rather than from display-only emitted items, unless we're explicitly
|
||||
# selecting from an overlay table.
|
||||
# ====================================================================
|
||||
# PHASE 4: Retrieve and filter items from current result set
|
||||
# ====================================================================
|
||||
# Cache items_list to avoid redundant lookups in helper functions below.
|
||||
try:
|
||||
if display_table is not None and stage_table is display_table:
|
||||
items_list = ctx.get_last_result_items() or []
|
||||
@@ -2034,72 +2095,30 @@ class PipelineExecutor:
|
||||
if src_norm in {".worker", "worker", "workers"}:
|
||||
if len(selection_indices) == 1:
|
||||
idx = selection_indices[0]
|
||||
row_args = None
|
||||
try:
|
||||
row_args = ctx.get_current_stage_table_row_selection_args(idx)
|
||||
except Exception:
|
||||
row_args = None
|
||||
if not row_args:
|
||||
try:
|
||||
row_args = ctx.get_last_result_table_row_selection_args(idx)
|
||||
except Exception:
|
||||
row_args = None
|
||||
if not row_args:
|
||||
try:
|
||||
items = ctx.get_last_result_items() or []
|
||||
if 0 <= idx < len(items):
|
||||
maybe = items[idx]
|
||||
if isinstance(maybe, dict):
|
||||
candidate = maybe.get("_selection_args")
|
||||
if isinstance(candidate, (list, tuple)):
|
||||
row_args = [str(x) for x in candidate if x is not None]
|
||||
except Exception:
|
||||
row_args = row_args or None
|
||||
|
||||
row_args = _get_row_args(idx, items_list)
|
||||
if row_args:
|
||||
stages.append(
|
||||
[str(source_cmd_for_selection)]
|
||||
+ [str(x) for x in row_args if x is not None]
|
||||
+ row_args
|
||||
+ [str(x) for x in source_args_for_selection if x is not None]
|
||||
)
|
||||
|
||||
def _apply_row_action_to_stage(stage_idx: int) -> bool:
|
||||
"""Apply row selection_action to a specific stage, replacing it."""
|
||||
if not selection_indices or len(selection_indices) != 1:
|
||||
return False
|
||||
try:
|
||||
row_action = ctx.get_current_stage_table_row_selection_action(
|
||||
selection_indices[0]
|
||||
)
|
||||
except Exception:
|
||||
row_action = None
|
||||
row_action = _get_row_action(selection_indices[0], items_list)
|
||||
if not row_action:
|
||||
# Fallback to serialized payload when the table row is unavailable
|
||||
try:
|
||||
items = ctx.get_last_result_items() or []
|
||||
if 0 <= selection_indices[0] < len(items):
|
||||
maybe = items[selection_indices[0]]
|
||||
if isinstance(maybe, dict):
|
||||
candidate = maybe.get("_selection_action")
|
||||
if isinstance(candidate, (list, tuple)):
|
||||
row_action = [str(x) for x in candidate if x is not None]
|
||||
debug(f"@N row {selection_indices[0]} restored action from payload: {row_action}")
|
||||
except Exception:
|
||||
row_action = row_action or None
|
||||
if not row_action:
|
||||
debug(f"@N row {selection_indices[0]} has no selection_action")
|
||||
return False
|
||||
normalized = [str(x) for x in row_action if x is not None]
|
||||
if not normalized:
|
||||
return False
|
||||
debug(f"Applying row action for row {selection_indices[0]} -> {normalized}")
|
||||
if 0 <= stage_idx < len(stages):
|
||||
debug(f"Replacing stage {stage_idx} {stages[stage_idx]} with row action {normalized}")
|
||||
stages[stage_idx] = normalized
|
||||
stages[stage_idx] = row_action
|
||||
return True
|
||||
return False
|
||||
|
||||
# ====================================================================
|
||||
# PHASE 5: Auto-insert stages based on table type and user selection
|
||||
# ====================================================================
|
||||
if not stages:
|
||||
debug(f"@N: stages is empty, checking auto_stage and metadata")
|
||||
if isinstance(table_type, str) and table_type.startswith("metadata."):
|
||||
print("Auto-applying metadata selection via get-tag")
|
||||
stages.append(["get-tag"])
|
||||
@@ -2123,78 +2142,31 @@ class PipelineExecutor:
|
||||
# Only support single-row selection for auto-attach here
|
||||
if len(selection_indices) == 1:
|
||||
idx = selection_indices[0]
|
||||
row_args = ctx.get_current_stage_table_row_selection_args(idx)
|
||||
if not row_args:
|
||||
try:
|
||||
items = ctx.get_last_result_items() or []
|
||||
if 0 <= idx < len(items):
|
||||
maybe = items[idx]
|
||||
if isinstance(maybe, dict):
|
||||
candidate = maybe.get("_selection_args")
|
||||
if isinstance(candidate, (list, tuple)):
|
||||
row_args = [str(x) for x in candidate if x is not None]
|
||||
except Exception:
|
||||
row_args = row_args or None
|
||||
row_args = _get_row_args(idx, items_list)
|
||||
if row_args:
|
||||
# Place selection args before any existing source args
|
||||
inserted = stages[-1]
|
||||
if inserted:
|
||||
cmd = inserted[0]
|
||||
tail = [str(x) for x in inserted[1:]]
|
||||
stages[-1] = [cmd] + [str(x) for x in row_args] + tail
|
||||
stages[-1] = [cmd] + row_args + tail
|
||||
except Exception:
|
||||
logger.exception("Failed to attach selection args to auto-inserted stage")
|
||||
|
||||
# Look for row_action in payload if still no stages
|
||||
if not stages and selection_indices and len(selection_indices) == 1:
|
||||
debug(f"@N: No stages and no auto_stage, looking for row_action in payload")
|
||||
try:
|
||||
idx = selection_indices[0]
|
||||
debug(f"@N: idx={idx}, looking for row_action")
|
||||
row_action = None
|
||||
try:
|
||||
row_action = ctx.get_current_stage_table_row_selection_action(idx)
|
||||
debug(f"@N: row_action from table={row_action}")
|
||||
except Exception as exc:
|
||||
debug(f"@N: Exception getting row_selection_action: {exc}")
|
||||
row_action = None
|
||||
|
||||
if not row_action:
|
||||
debug(f"@N: row_action not found from table, checking payload")
|
||||
row_action = _get_row_action(selection_indices[0], items_list)
|
||||
if row_action:
|
||||
debug(f"@N: applying row_action {row_action}")
|
||||
stages.append(row_action)
|
||||
if pipeline_session and worker_manager:
|
||||
try:
|
||||
items = ctx.get_last_result_items() or []
|
||||
debug(f"@N: got items, length={len(items)}")
|
||||
if 0 <= idx < len(items):
|
||||
maybe = items[idx]
|
||||
try:
|
||||
if isinstance(maybe, dict):
|
||||
debug(f"@N: payload is dict with _selection_action={maybe.get('_selection_action')}")
|
||||
else:
|
||||
debug(f"@N: payload type={type(maybe).__name__}")
|
||||
except Exception:
|
||||
pass
|
||||
if isinstance(maybe, dict):
|
||||
candidate = maybe.get("_selection_action")
|
||||
if isinstance(candidate, (list, tuple)):
|
||||
row_action = [str(x) for x in candidate if x is not None]
|
||||
debug(f"@N: extracted row_action from payload={row_action}")
|
||||
except Exception as exc:
|
||||
debug(f"@N: Exception checking payload: {exc}")
|
||||
row_action = None
|
||||
|
||||
if row_action:
|
||||
debug(f"@N: FOUND row_action, appending {row_action}")
|
||||
stages.append(row_action)
|
||||
if pipeline_session and worker_manager:
|
||||
try:
|
||||
worker_manager.log_step(
|
||||
pipeline_session.worker_id,
|
||||
f"@N applied row action -> {' '.join(row_action)}",
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to record pipeline log step for applied row action (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))
|
||||
except Exception:
|
||||
logger.exception("Failed to apply single-row selection action")
|
||||
worker_manager.log_step(
|
||||
pipeline_session.worker_id,
|
||||
f"@N applied row action -> {' '.join(row_action)}",
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to record pipeline log step for applied row action (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))
|
||||
else:
|
||||
first_cmd = stages[0][0] if stages and stages[0] else None
|
||||
if isinstance(table_type, str) and table_type.startswith("metadata.") and first_cmd not in (
|
||||
@@ -2224,24 +2196,13 @@ class PipelineExecutor:
|
||||
if not _apply_row_action_to_stage(0):
|
||||
if len(selection_indices) == 1:
|
||||
idx = selection_indices[0]
|
||||
row_args = ctx.get_current_stage_table_row_selection_args(idx)
|
||||
if not row_args:
|
||||
try:
|
||||
items = ctx.get_last_result_items() or []
|
||||
if 0 <= idx < len(items):
|
||||
maybe = items[idx]
|
||||
if isinstance(maybe, dict):
|
||||
candidate = maybe.get("_selection_args")
|
||||
if isinstance(candidate, (list, tuple)):
|
||||
row_args = [str(x) for x in candidate if x is not None]
|
||||
except Exception:
|
||||
row_args = row_args or None
|
||||
row_args = _get_row_args(idx, items_list)
|
||||
if row_args:
|
||||
inserted = stages[0]
|
||||
if inserted:
|
||||
cmd = inserted[0]
|
||||
tail = [str(x) for x in inserted[1:]]
|
||||
stages[0] = [cmd] + [str(x) for x in row_args] + tail
|
||||
stages[0] = [cmd] + row_args + tail
|
||||
except Exception:
|
||||
logger.exception("Failed to attach selection args to inserted auto stage (alternate branch)")
|
||||
|
||||
|
||||
@@ -117,6 +117,11 @@ def _required_keys_for(store_cls: Type[BaseStore]) -> list[str]:
|
||||
)
|
||||
|
||||
|
||||
# Store type names that have been converted to providers-only.
|
||||
# These should be silently skipped without warning.
|
||||
_PROVIDER_ONLY_STORE_NAMES = frozenset(("debrid", "alldebrid"))
|
||||
|
||||
|
||||
def _build_kwargs(store_cls: Type[BaseStore], instance_name: str, instance_config: Any) -> Dict[str, Any]:
|
||||
if isinstance(instance_config, dict):
|
||||
cfg_dict = dict(instance_config)
|
||||
@@ -180,7 +185,8 @@ class Store:
|
||||
continue
|
||||
store_cls = classes_by_type.get(store_type)
|
||||
if store_cls is None:
|
||||
if not self._suppress_debug:
|
||||
# Skip provider-only names without debug warning
|
||||
if store_type not in _PROVIDER_ONLY_STORE_NAMES and not self._suppress_debug:
|
||||
debug(f"[Store] Unknown store type '{raw_store_type}'")
|
||||
continue
|
||||
|
||||
|
||||
Reference in New Issue
Block a user