f
This commit is contained in:
800
SYS/pipeline.py
800
SYS/pipeline.py
@@ -622,7 +622,9 @@ def set_last_result_table(
|
||||
src_idx = getattr(row, "source_index", None)
|
||||
if isinstance(src_idx, int) and 0 <= src_idx < len(state.last_result_items):
|
||||
sorted_items.append(state.last_result_items[src_idx])
|
||||
if len(sorted_items) == len(result_table.rows):
|
||||
# Only reassign when the table actually contains rows and the reordering
|
||||
# produced a complete mapping. Avoid clearing items when the table has no rows.
|
||||
if result_table.rows and len(sorted_items) == len(result_table.rows):
|
||||
state.last_result_items = sorted_items
|
||||
except Exception:
|
||||
pass
|
||||
@@ -2136,6 +2138,54 @@ class PipelineExecutor:
|
||||
stages[-1] = [cmd] + [str(x) for x in row_args] + tail
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# If no auto stage inserted and there are selection-action tokens available
|
||||
# for the single selected row, apply it as the pipeline stage so a bare
|
||||
# `@N` runs the intended action (e.g., get-file for hash-backed rows).
|
||||
if not stages and selection_indices and len(selection_indices) == 1:
|
||||
try:
|
||||
idx = selection_indices[0]
|
||||
debug(f"@N initial selection idx={idx} last_items={len(ctx.get_last_result_items() or [])}")
|
||||
row_action = None
|
||||
try:
|
||||
row_action = ctx.get_current_stage_table_row_selection_action(idx)
|
||||
except Exception:
|
||||
row_action = None
|
||||
|
||||
if not row_action:
|
||||
try:
|
||||
items = ctx.get_last_result_items() or []
|
||||
if 0 <= idx < len(items):
|
||||
maybe = items[idx]
|
||||
# Provide explicit debug output about the payload selected
|
||||
try:
|
||||
if isinstance(maybe, dict):
|
||||
debug(f"@N payload: hash={maybe.get('hash')} store={maybe.get('store')} _selection_args={maybe.get('_selection_args')} _selection_action={maybe.get('_selection_action')}")
|
||||
else:
|
||||
debug(f"@N payload object type: {type(maybe).__name__}")
|
||||
except Exception:
|
||||
pass
|
||||
if isinstance(maybe, dict):
|
||||
candidate = maybe.get("_selection_action")
|
||||
if isinstance(candidate, (list, tuple)):
|
||||
row_action = [str(x) for x in candidate if x is not None]
|
||||
debug(f"@N restored row_action from payload: {row_action}")
|
||||
except Exception:
|
||||
row_action = None
|
||||
|
||||
if row_action:
|
||||
debug(f"@N applying row action -> {row_action}")
|
||||
stages.append(row_action)
|
||||
if pipeline_session and worker_manager:
|
||||
try:
|
||||
worker_manager.log_step(
|
||||
pipeline_session.worker_id,
|
||||
f"@N applied row action -> {' '.join(row_action)}",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
first_cmd = stages[0][0] if stages and stages[0] else None
|
||||
if isinstance(table_type, str) and table_type.startswith("metadata.") and first_cmd not in (
|
||||
@@ -2372,387 +2422,447 @@ class PipelineExecutor:
|
||||
pipe_index_by_stage: Dict[int,
|
||||
int] = {}
|
||||
|
||||
try:
|
||||
ok, initial_piped = self._maybe_apply_initial_selection(
|
||||
ctx,
|
||||
config,
|
||||
stages,
|
||||
selection_indices=first_stage_selection_indices,
|
||||
first_stage_had_extra_args=first_stage_had_extra_args,
|
||||
worker_manager=worker_manager,
|
||||
pipeline_session=pipeline_session,
|
||||
)
|
||||
if not ok:
|
||||
return
|
||||
if initial_piped is not None:
|
||||
piped_result = initial_piped
|
||||
except Exception as exc:
|
||||
ok, initial_piped = self._maybe_apply_initial_selection(
|
||||
ctx,
|
||||
config,
|
||||
stages,
|
||||
selection_indices=first_stage_selection_indices,
|
||||
first_stage_had_extra_args=first_stage_had_extra_args,
|
||||
worker_manager=worker_manager,
|
||||
pipeline_session=pipeline_session,
|
||||
)
|
||||
if not ok:
|
||||
return
|
||||
if initial_piped is not None:
|
||||
piped_result = initial_piped
|
||||
|
||||
# REPL guard: prevent add-relationship before add-file for download-file pipelines.
|
||||
if not self._validate_download_file_relationship_order(stages):
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = f"{type(exc).__name__}: {exc}"
|
||||
print(f"[error] {type(exc).__name__}: {exc}\n")
|
||||
pipeline_error = "Invalid pipeline order"
|
||||
return
|
||||
|
||||
# REPL guard: prevent add-relationship before add-file for download-file pipelines.
|
||||
if not self._validate_download_file_relationship_order(stages):
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = "Invalid pipeline order"
|
||||
return
|
||||
# ------------------------------------------------------------------
|
||||
# Multi-level pipeline progress (pipes = stages, tasks = items)
|
||||
# ------------------------------------------------------------------
|
||||
progress_ui, pipe_index_by_stage = self._maybe_start_live_progress(config, stages)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Multi-level pipeline progress (pipes = stages, tasks = items)
|
||||
# ------------------------------------------------------------------
|
||||
progress_ui, pipe_index_by_stage = self._maybe_start_live_progress(config, stages)
|
||||
for stage_index, stage_tokens in enumerate(stages):
|
||||
if not stage_tokens:
|
||||
continue
|
||||
|
||||
for stage_index, stage_tokens in enumerate(stages):
|
||||
if not stage_tokens:
|
||||
continue
|
||||
raw_stage_name = str(stage_tokens[0])
|
||||
cmd_name = raw_stage_name.replace("_", "-").lower()
|
||||
stage_args = stage_tokens[1:]
|
||||
|
||||
raw_stage_name = str(stage_tokens[0])
|
||||
cmd_name = raw_stage_name.replace("_", "-").lower()
|
||||
stage_args = stage_tokens[1:]
|
||||
|
||||
if cmd_name == "@":
|
||||
# Prefer piping the last emitted/visible items (e.g. add-file results)
|
||||
# over the result-table subject. The subject can refer to older context
|
||||
# (e.g. a playlist row) and may not contain store+hash.
|
||||
if cmd_name == "@":
|
||||
# Prefer piping the last emitted/visible items (e.g. add-file results)
|
||||
# over the result-table subject. The subject can refer to older context
|
||||
# (e.g. a playlist row) and may not contain store+hash.
|
||||
last_items = None
|
||||
try:
|
||||
last_items = ctx.get_last_result_items()
|
||||
except Exception:
|
||||
last_items = None
|
||||
|
||||
if last_items:
|
||||
from cmdlet._shared import coerce_to_pipe_object
|
||||
|
||||
try:
|
||||
last_items = ctx.get_last_result_items()
|
||||
pipe_items = [
|
||||
coerce_to_pipe_object(x) for x in list(last_items)
|
||||
]
|
||||
except Exception:
|
||||
last_items = None
|
||||
|
||||
if last_items:
|
||||
from cmdlet._shared import coerce_to_pipe_object
|
||||
|
||||
try:
|
||||
pipe_items = [
|
||||
coerce_to_pipe_object(x) for x in list(last_items)
|
||||
]
|
||||
except Exception:
|
||||
pipe_items = list(last_items)
|
||||
piped_result = pipe_items if len(pipe_items
|
||||
) > 1 else pipe_items[0]
|
||||
try:
|
||||
ctx.set_last_items(pipe_items)
|
||||
except Exception:
|
||||
pass
|
||||
if pipeline_session and worker_manager:
|
||||
try:
|
||||
worker_manager.log_step(
|
||||
pipeline_session.worker_id,
|
||||
"@ used last result items"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
subject = ctx.get_last_result_subject()
|
||||
if subject is None:
|
||||
print("No current result context available for '@'\n")
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = "No result items/subject for @"
|
||||
return
|
||||
piped_result = subject
|
||||
pipe_items = list(last_items)
|
||||
piped_result = pipe_items if len(pipe_items
|
||||
) > 1 else pipe_items[0]
|
||||
try:
|
||||
subject_items = subject if isinstance(subject,
|
||||
list) else [subject]
|
||||
ctx.set_last_items(subject_items)
|
||||
ctx.set_last_items(pipe_items)
|
||||
except Exception:
|
||||
pass
|
||||
if pipeline_session and worker_manager:
|
||||
try:
|
||||
worker_manager.log_step(
|
||||
pipeline_session.worker_id,
|
||||
"@ used current table subject"
|
||||
"@ used last result items"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
if cmd_name.startswith("@"): # selection stage
|
||||
selection_token = raw_stage_name
|
||||
selection = SelectionSyntax.parse(selection_token)
|
||||
filter_spec = SelectionFilterSyntax.parse(selection_token)
|
||||
is_select_all = selection_token.strip() == "@*"
|
||||
if selection is None and filter_spec is None and not is_select_all:
|
||||
print(f"Invalid selection: {selection_token}\n")
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = f"Invalid selection {selection_token}"
|
||||
return
|
||||
|
||||
selected_indices = []
|
||||
# Prefer selecting from the last selectable *table* (search/playlist)
|
||||
# rather than from display-only emitted items, unless we're explicitly
|
||||
# selecting from an overlay table.
|
||||
display_table = None
|
||||
subject = ctx.get_last_result_subject()
|
||||
if subject is None:
|
||||
print("No current result context available for '@'\n")
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = "No result items/subject for @"
|
||||
return
|
||||
piped_result = subject
|
||||
try:
|
||||
subject_items = subject if isinstance(subject,
|
||||
list) else [subject]
|
||||
ctx.set_last_items(subject_items)
|
||||
except Exception:
|
||||
pass
|
||||
if pipeline_session and worker_manager:
|
||||
try:
|
||||
display_table = (
|
||||
ctx.get_display_table()
|
||||
if hasattr(ctx,
|
||||
"get_display_table") else None
|
||||
worker_manager.log_step(
|
||||
pipeline_session.worker_id,
|
||||
"@ used current table subject"
|
||||
)
|
||||
except Exception:
|
||||
display_table = None
|
||||
|
||||
stage_table = ctx.get_current_stage_table()
|
||||
# Selection should operate on the table the user sees.
|
||||
# If a display overlay table exists, force it as the current-stage table
|
||||
# so provider selectors (e.g. tidal.album -> tracks) behave consistently.
|
||||
try:
|
||||
if display_table is not None and hasattr(ctx, "set_current_stage_table"):
|
||||
ctx.set_current_stage_table(display_table)
|
||||
stage_table = display_table
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
if not stage_table and display_table is not None:
|
||||
if cmd_name.startswith("@"): # selection stage
|
||||
selection_token = raw_stage_name
|
||||
selection = SelectionSyntax.parse(selection_token)
|
||||
filter_spec = SelectionFilterSyntax.parse(selection_token)
|
||||
is_select_all = selection_token.strip() == "@*"
|
||||
if selection is None and filter_spec is None and not is_select_all:
|
||||
print(f"Invalid selection: {selection_token}\n")
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = f"Invalid selection {selection_token}"
|
||||
return
|
||||
|
||||
selected_indices = []
|
||||
# Prefer selecting from the last selectable *table* (search/playlist)
|
||||
# rather than from display-only emitted items, unless we're explicitly
|
||||
# selecting from an overlay table.
|
||||
display_table = None
|
||||
try:
|
||||
display_table = (
|
||||
ctx.get_display_table()
|
||||
if hasattr(ctx,
|
||||
"get_display_table") else None
|
||||
)
|
||||
except Exception:
|
||||
display_table = None
|
||||
|
||||
stage_table = ctx.get_current_stage_table()
|
||||
# Selection should operate on the table the user sees.
|
||||
# If a display overlay table exists, force it as the current-stage table
|
||||
# so provider selectors (e.g. tidal.album -> tracks) behave consistently.
|
||||
try:
|
||||
if display_table is not None and hasattr(ctx, "set_current_stage_table"):
|
||||
ctx.set_current_stage_table(display_table)
|
||||
stage_table = display_table
|
||||
if not stage_table:
|
||||
stage_table = ctx.get_last_result_table()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not stage_table and display_table is not None:
|
||||
stage_table = display_table
|
||||
if not stage_table:
|
||||
stage_table = ctx.get_last_result_table()
|
||||
|
||||
try:
|
||||
if hasattr(ctx, "debug_table_state"):
|
||||
ctx.debug_table_state(f"selection {selection_token}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if display_table is not None and stage_table is display_table:
|
||||
items_list = ctx.get_last_result_items() or []
|
||||
else:
|
||||
if hasattr(ctx, "get_last_selectable_result_items"):
|
||||
items_list = ctx.get_last_selectable_result_items(
|
||||
) or []
|
||||
else:
|
||||
items_list = ctx.get_last_result_items() or []
|
||||
|
||||
if is_select_all:
|
||||
selected_indices = list(range(len(items_list)))
|
||||
elif filter_spec is not None:
|
||||
selected_indices = [
|
||||
i for i, item in enumerate(items_list)
|
||||
if SelectionFilterSyntax.matches(item, filter_spec)
|
||||
]
|
||||
else:
|
||||
selected_indices = sorted(
|
||||
[i - 1 for i in selection]
|
||||
) # type: ignore[arg-type]
|
||||
|
||||
resolved_items = items_list if items_list else []
|
||||
filtered = [
|
||||
resolved_items[i] for i in selected_indices
|
||||
if 0 <= i < len(resolved_items)
|
||||
]
|
||||
# Debug: show selection resolution and sample payload info
|
||||
try:
|
||||
debug(f"Selection {selection_token} -> resolved_indices={selected_indices} filtered_count={len(filtered)}")
|
||||
if filtered:
|
||||
sample = filtered[0]
|
||||
if isinstance(sample, dict):
|
||||
debug(f"Selection sample: hash={sample.get('hash')} store={sample.get('store')} _selection_args={sample.get('_selection_args')} _selection_action={sample.get('_selection_action')}")
|
||||
else:
|
||||
try:
|
||||
debug(f"Selection sample object: provider={getattr(sample, 'provider', None)} store={getattr(sample, 'store', None)}")
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not filtered:
|
||||
print("No items matched selection\n")
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = "Empty selection"
|
||||
return
|
||||
|
||||
# Filter UX: if the stage token is a filter and it's terminal,
|
||||
# render a filtered table overlay rather than selecting/auto-downloading.
|
||||
stage_is_last = (stage_index + 1 >= len(stages))
|
||||
if filter_spec is not None and stage_is_last:
|
||||
try:
|
||||
if hasattr(ctx, "debug_table_state"):
|
||||
ctx.debug_table_state(f"selection {selection_token}")
|
||||
base_table = stage_table
|
||||
if base_table is None:
|
||||
base_table = ctx.get_last_result_table()
|
||||
|
||||
if base_table is not None and hasattr(base_table, "copy_with_title"):
|
||||
new_table = base_table.copy_with_title(getattr(base_table, "title", "") or "Results")
|
||||
else:
|
||||
new_table = Table(getattr(base_table, "title", "") if base_table is not None else "Results")
|
||||
|
||||
try:
|
||||
if base_table is not None and getattr(base_table, "table", None):
|
||||
new_table.set_table(str(getattr(base_table, "table")))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
# Attach a one-line header so users see the active filter.
|
||||
safe = str(selection_token)[1:].strip()
|
||||
new_table.set_header_line(f'filter: "{safe}"')
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for item in filtered:
|
||||
new_table.add_result(item)
|
||||
|
||||
try:
|
||||
ctx.set_last_result_table_overlay(new_table, items=list(filtered), subject=ctx.get_last_result_subject())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
stdout_console().print()
|
||||
stdout_console().print(new_table)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if display_table is not None and stage_table is display_table:
|
||||
items_list = ctx.get_last_result_items() or []
|
||||
else:
|
||||
if hasattr(ctx, "get_last_selectable_result_items"):
|
||||
items_list = ctx.get_last_selectable_result_items(
|
||||
) or []
|
||||
else:
|
||||
items_list = ctx.get_last_result_items() or []
|
||||
|
||||
if is_select_all:
|
||||
selected_indices = list(range(len(items_list)))
|
||||
elif filter_spec is not None:
|
||||
selected_indices = [
|
||||
i for i, item in enumerate(items_list)
|
||||
if SelectionFilterSyntax.matches(item, filter_spec)
|
||||
]
|
||||
else:
|
||||
selected_indices = sorted(
|
||||
[i - 1 for i in selection]
|
||||
) # type: ignore[arg-type]
|
||||
|
||||
resolved_items = items_list if items_list else []
|
||||
filtered = [
|
||||
resolved_items[i] for i in selected_indices
|
||||
if 0 <= i < len(resolved_items)
|
||||
]
|
||||
if not filtered:
|
||||
print("No items matched selection\n")
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = "Empty selection"
|
||||
return
|
||||
|
||||
# Filter UX: if the stage token is a filter and it's terminal,
|
||||
# render a filtered table overlay rather than selecting/auto-downloading.
|
||||
stage_is_last = (stage_index + 1 >= len(stages))
|
||||
if filter_spec is not None and stage_is_last:
|
||||
try:
|
||||
base_table = stage_table
|
||||
if base_table is None:
|
||||
base_table = ctx.get_last_result_table()
|
||||
|
||||
if base_table is not None and hasattr(base_table, "copy_with_title"):
|
||||
new_table = base_table.copy_with_title(getattr(base_table, "title", "") or "Results")
|
||||
else:
|
||||
new_table = Table(getattr(base_table, "title", "") if base_table is not None else "Results")
|
||||
|
||||
try:
|
||||
if base_table is not None and getattr(base_table, "table", None):
|
||||
new_table.set_table(str(getattr(base_table, "table")))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
# Attach a one-line header so users see the active filter.
|
||||
safe = str(selection_token)[1:].strip()
|
||||
new_table.set_header_line(f'filter: "{safe}"')
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for item in filtered:
|
||||
new_table.add_result(item)
|
||||
|
||||
try:
|
||||
ctx.set_last_result_table_overlay(new_table, items=list(filtered), subject=ctx.get_last_result_subject())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
stdout_console().print()
|
||||
stdout_console().print(new_table)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
# UX: selecting a single URL row from get-url tables should open it.
|
||||
# Only do this when the selection stage is terminal to avoid surprising
|
||||
# side-effects in pipelines like `@1 | download-file`.
|
||||
current_table = ctx.get_current_stage_table(
|
||||
) or ctx.get_last_result_table()
|
||||
if (not is_select_all) and (len(filtered) == 1):
|
||||
try:
|
||||
PipelineExecutor._maybe_open_url_selection(
|
||||
current_table,
|
||||
filtered,
|
||||
stage_is_last=(stage_index + 1 >= len(stages)),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if PipelineExecutor._maybe_run_class_selector(
|
||||
ctx,
|
||||
config,
|
||||
filtered,
|
||||
stage_is_last=(stage_index + 1 >= len(stages))):
|
||||
return
|
||||
|
||||
# Special case: selecting multiple tags from get-tag and piping into delete-tag
|
||||
# should batch into a single operation (one backend call).
|
||||
next_cmd = None
|
||||
try:
|
||||
if stage_index + 1 < len(stages) and stages[stage_index + 1]:
|
||||
next_cmd = str(stages[stage_index + 1][0]
|
||||
).replace("_",
|
||||
"-").lower()
|
||||
except Exception:
|
||||
next_cmd = None
|
||||
|
||||
def _is_tag_row(obj: Any) -> bool:
|
||||
try:
|
||||
if (hasattr(obj,
|
||||
"__class__")
|
||||
and obj.__class__.__name__ == "TagItem"
|
||||
and hasattr(obj,
|
||||
"tag_name")):
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
if isinstance(obj, dict) and obj.get("tag_name"):
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
if (next_cmd in {"delete-tag",
|
||||
"delete_tag"} and len(filtered) > 1
|
||||
and all(_is_tag_row(x) for x in filtered)):
|
||||
from cmdlet._shared import get_field
|
||||
|
||||
tags: List[str] = []
|
||||
first_hash = None
|
||||
first_store = None
|
||||
first_path = None
|
||||
for item in filtered:
|
||||
tag_name = get_field(item, "tag_name")
|
||||
if tag_name:
|
||||
tags.append(str(tag_name))
|
||||
if first_hash is None:
|
||||
first_hash = get_field(item, "hash")
|
||||
if first_store is None:
|
||||
first_store = get_field(item, "store")
|
||||
if first_path is None:
|
||||
first_path = get_field(item,
|
||||
"path") or get_field(
|
||||
item,
|
||||
"target"
|
||||
)
|
||||
|
||||
if tags:
|
||||
grouped = {
|
||||
"table": "tag.selection",
|
||||
"media_kind": "tag",
|
||||
"hash": first_hash,
|
||||
"store": first_store,
|
||||
"path": first_path,
|
||||
"tag": tags,
|
||||
}
|
||||
piped_result = grouped
|
||||
continue
|
||||
|
||||
from cmdlet._shared import coerce_to_pipe_object
|
||||
|
||||
filtered_pipe_objs = [
|
||||
coerce_to_pipe_object(item) for item in filtered
|
||||
]
|
||||
piped_result = (
|
||||
filtered_pipe_objs
|
||||
if len(filtered_pipe_objs) > 1 else filtered_pipe_objs[0]
|
||||
)
|
||||
|
||||
current_table = ctx.get_current_stage_table(
|
||||
) or ctx.get_last_result_table()
|
||||
table_type = (
|
||||
current_table.table
|
||||
if current_table and hasattr(current_table,
|
||||
"table") else None
|
||||
)
|
||||
|
||||
def _norm_stage_cmd(name: Any) -> str:
|
||||
return str(name or "").replace("_", "-").strip().lower()
|
||||
|
||||
next_cmd = None
|
||||
if stage_index + 1 < len(stages) and stages[stage_index + 1]:
|
||||
next_cmd = _norm_stage_cmd(stages[stage_index + 1][0])
|
||||
|
||||
auto_stage = None
|
||||
if isinstance(table_type, str) and table_type:
|
||||
try:
|
||||
from ProviderCore.registry import selection_auto_stage_for_table
|
||||
|
||||
# Preserve historical behavior: only forward selection-stage args
|
||||
# to the auto stage when we are appending a new last stage.
|
||||
at_end = bool(stage_index + 1 >= len(stages))
|
||||
auto_stage = selection_auto_stage_for_table(
|
||||
table_type,
|
||||
stage_args if at_end else None,
|
||||
)
|
||||
except Exception:
|
||||
auto_stage = None
|
||||
|
||||
# Auto-insert downloader stages for provider tables.
|
||||
# IMPORTANT: do not auto-download for filter selections; they may match many rows.
|
||||
if filter_spec is None:
|
||||
if stage_index + 1 >= len(stages):
|
||||
if auto_stage:
|
||||
try:
|
||||
print(f"Auto-running selection via {auto_stage[0]}")
|
||||
except Exception:
|
||||
pass
|
||||
stages.append(list(auto_stage))
|
||||
else:
|
||||
if auto_stage:
|
||||
auto_cmd = _norm_stage_cmd(auto_stage[0])
|
||||
if next_cmd not in (auto_cmd, ".pipe", ".mpv"):
|
||||
debug(f"Auto-inserting {auto_cmd} after selection")
|
||||
stages.insert(stage_index + 1, list(auto_stage))
|
||||
continue
|
||||
|
||||
cmd_fn = REGISTRY.get(cmd_name)
|
||||
if not cmd_fn:
|
||||
# UX: selecting a single URL row from get-url tables should open it.
|
||||
# Only do this when the selection stage is terminal to avoid surprising
|
||||
# side-effects in pipelines like `@1 | download-file`.
|
||||
current_table = ctx.get_current_stage_table(
|
||||
) or ctx.get_last_result_table()
|
||||
if (not is_select_all) and (len(filtered) == 1):
|
||||
try:
|
||||
mod = import_cmd_module(cmd_name)
|
||||
data = getattr(mod, "CMDLET", None) if mod else None
|
||||
if data and hasattr(data, "exec") and callable(getattr(data, "exec")):
|
||||
run_fn = getattr(data, "exec")
|
||||
REGISTRY[cmd_name] = run_fn
|
||||
cmd_fn = run_fn
|
||||
PipelineExecutor._maybe_open_url_selection(
|
||||
current_table,
|
||||
filtered,
|
||||
stage_is_last=(stage_index + 1 >= len(stages)),
|
||||
)
|
||||
except Exception:
|
||||
cmd_fn = None
|
||||
pass
|
||||
|
||||
if not cmd_fn:
|
||||
print(f"Unknown command: {cmd_name}\n")
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = f"Unknown command: {cmd_name}"
|
||||
if PipelineExecutor._maybe_run_class_selector(
|
||||
ctx,
|
||||
config,
|
||||
filtered,
|
||||
stage_is_last=(stage_index + 1 >= len(stages))):
|
||||
return
|
||||
|
||||
# Special case: selecting multiple tags from get-tag and piping into delete-tag
|
||||
# should batch into a single operation (one backend call).
|
||||
next_cmd = None
|
||||
try:
|
||||
if stage_index + 1 < len(stages) and stages[stage_index + 1]:
|
||||
next_cmd = str(stages[stage_index + 1][0]
|
||||
).replace("_",
|
||||
"-").lower()
|
||||
except Exception:
|
||||
next_cmd = None
|
||||
|
||||
def _is_tag_row(obj: Any) -> bool:
|
||||
try:
|
||||
if (hasattr(obj,
|
||||
"__class__")
|
||||
and obj.__class__.__name__ == "TagItem"
|
||||
and hasattr(obj,
|
||||
"tag_name")):
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
if isinstance(obj, dict) and obj.get("tag_name"):
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
return False
|
||||
|
||||
if (next_cmd in {"delete-tag",
|
||||
"delete_tag"} and len(filtered) > 1
|
||||
and all(_is_tag_row(x) for x in filtered)):
|
||||
from cmdlet._shared import get_field
|
||||
|
||||
tags: List[str] = []
|
||||
first_hash = None
|
||||
first_store = None
|
||||
first_path = None
|
||||
for item in filtered:
|
||||
tag_name = get_field(item, "tag_name")
|
||||
if tag_name:
|
||||
tags.append(str(tag_name))
|
||||
if first_hash is None:
|
||||
first_hash = get_field(item, "hash")
|
||||
if first_store is None:
|
||||
first_store = get_field(item, "store")
|
||||
if first_path is None:
|
||||
first_path = get_field(item,
|
||||
"path") or get_field(
|
||||
item,
|
||||
"target"
|
||||
)
|
||||
|
||||
if tags:
|
||||
grouped = {
|
||||
"table": "tag.selection",
|
||||
"media_kind": "tag",
|
||||
"hash": first_hash,
|
||||
"store": first_store,
|
||||
"path": first_path,
|
||||
"tag": tags,
|
||||
}
|
||||
piped_result = grouped
|
||||
continue
|
||||
|
||||
from cmdlet._shared import coerce_to_pipe_object
|
||||
|
||||
filtered_pipe_objs = [
|
||||
coerce_to_pipe_object(item) for item in filtered
|
||||
]
|
||||
piped_result = (
|
||||
filtered_pipe_objs
|
||||
if len(filtered_pipe_objs) > 1 else filtered_pipe_objs[0]
|
||||
)
|
||||
|
||||
current_table = ctx.get_current_stage_table(
|
||||
) or ctx.get_last_result_table()
|
||||
table_type = (
|
||||
current_table.table
|
||||
if current_table and hasattr(current_table,
|
||||
"table") else None
|
||||
)
|
||||
|
||||
def _norm_stage_cmd(name: Any) -> str:
|
||||
return str(name or "").replace("_", "-").strip().lower()
|
||||
|
||||
next_cmd = None
|
||||
if stage_index + 1 < len(stages) and stages[stage_index + 1]:
|
||||
next_cmd = _norm_stage_cmd(stages[stage_index + 1][0])
|
||||
|
||||
auto_stage = None
|
||||
if isinstance(table_type, str) and table_type:
|
||||
try:
|
||||
from ProviderCore.registry import selection_auto_stage_for_table
|
||||
|
||||
# Preserve historical behavior: only forward selection-stage args
|
||||
# to the auto stage when we are appending a new last stage.
|
||||
at_end = bool(stage_index + 1 >= len(stages))
|
||||
auto_stage = selection_auto_stage_for_table(
|
||||
table_type,
|
||||
stage_args if at_end else None,
|
||||
)
|
||||
except Exception:
|
||||
auto_stage = None
|
||||
|
||||
# Auto-insert downloader stages for provider tables.
|
||||
# IMPORTANT: do not auto-download for filter selections; they may match many rows.
|
||||
if filter_spec is None:
|
||||
if stage_index + 1 >= len(stages):
|
||||
if auto_stage:
|
||||
try:
|
||||
print(f"Auto-running selection via {auto_stage[0]}")
|
||||
except Exception:
|
||||
pass
|
||||
stages.append(list(auto_stage))
|
||||
else:
|
||||
if auto_stage:
|
||||
auto_cmd = _norm_stage_cmd(auto_stage[0])
|
||||
if next_cmd not in (auto_cmd, ".pipe", ".mpv"):
|
||||
debug(f"Auto-inserting {auto_cmd} after selection")
|
||||
stages.insert(stage_index + 1, list(auto_stage))
|
||||
continue
|
||||
|
||||
cmd_fn = REGISTRY.get(cmd_name)
|
||||
if not cmd_fn:
|
||||
try:
|
||||
mod = import_cmd_module(cmd_name)
|
||||
data = getattr(mod, "CMDLET", None) if mod else None
|
||||
if data and hasattr(data, "exec") and callable(getattr(data, "exec")):
|
||||
run_fn = getattr(data, "exec")
|
||||
REGISTRY[cmd_name] = run_fn
|
||||
cmd_fn = run_fn
|
||||
except Exception:
|
||||
cmd_fn = None
|
||||
|
||||
if not cmd_fn:
|
||||
print(f"Unknown command: {cmd_name}\n")
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = f"Unknown command: {cmd_name}"
|
||||
return
|
||||
|
||||
try:
|
||||
from SYS.models import PipelineStageContext
|
||||
|
||||
pipe_idx = pipe_index_by_stage.get(stage_index)
|
||||
|
||||
session = WorkerStages.begin_stage(
|
||||
worker_manager,
|
||||
cmd_name=cmd_name,
|
||||
stage_tokens=stage_tokens,
|
||||
config=config,
|
||||
command_text=pipeline_text if pipeline_text else " ".join(stage_tokens),
|
||||
)
|
||||
try:
|
||||
stage_ctx = PipelineStageContext(
|
||||
stage_index=stage_index,
|
||||
total_stages=len(stages),
|
||||
pipe_index=pipe_idx,
|
||||
worker_id=session.worker_id if session else None,
|
||||
)
|
||||
|
||||
# Set context for the current run
|
||||
ctx.set_stage_context(stage_ctx)
|
||||
ctx.set_current_cmdlet_name(cmd_name)
|
||||
ctx.set_current_stage_text(" ".join(stage_tokens))
|
||||
ctx.clear_emits()
|
||||
|
||||
# RUN THE CMDLET
|
||||
cmd_fn(piped_result, stage_args, config)
|
||||
|
||||
# Update piped_result for next stage from emitted items
|
||||
stage_emits = list(stage_ctx.emits)
|
||||
if stage_emits:
|
||||
piped_result = stage_emits if len(stage_emits) > 1 else stage_emits[0]
|
||||
else:
|
||||
piped_result = None
|
||||
|
||||
if progress_ui is not None and pipe_idx is not None:
|
||||
progress_ui.complete_stage(pipe_idx)
|
||||
finally:
|
||||
if session:
|
||||
try:
|
||||
session.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
except Exception as exc:
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = f"{cmd_name}: {exc}"
|
||||
debug(f"Error in stage {stage_index} ({cmd_name}): {exc}")
|
||||
return
|
||||
except Exception as exc:
|
||||
pipeline_status = "failed"
|
||||
pipeline_error = f"{type(exc).__name__}: {exc}"
|
||||
|
||||
Reference in New Issue
Block a user