This commit is contained in:
2026-01-20 00:31:44 -08:00
parent fcab85455d
commit 1f65f9de2a
5 changed files with 104 additions and 21 deletions

View File

@@ -1376,16 +1376,18 @@ class PipelineLiveProgress:
total = self._pipe_totals[pipe_index]
active = self._subtask_active_index[pipe_index]
# If a stage emits more than expected, extend totals dynamically.
if done >= total:
total = done + 1
self._pipe_totals[pipe_index] = total
pipe_task = self._pipe_tasks[pipe_index]
pipe_progress.update(pipe_task, total=total)
# If a stage emits more than expected, or if we have no subtasks yet, extend dynamically.
if (done >= total) or (not self._subtask_ids[pipe_index]):
if done >= total:
total = done + 1
self._pipe_totals[pipe_index] = total
pipe_task = self._pipe_tasks[pipe_index]
pipe_progress.update(pipe_task, total=total)
# Add a placeholder subtask.
# Add a placeholder/next subtask.
label = _pipeline_progress_item_label(emitted)
sub_id = subtasks.add_task(
f"{self._pipe_labels[pipe_index]}: {_pipeline_progress_item_label(emitted)}"
f"{self._pipe_labels[pipe_index]}: {label}"
)
subtasks.stop_task(sub_id)
subtasks.update(sub_id, visible=False)
@@ -1457,6 +1459,9 @@ class PipelineLiveProgress:
assert overall is not None
total = self._pipe_totals[pipe_index]
if total < 1:
total = 1
self._pipe_totals[pipe_index] = total
done = self._pipe_done[pipe_index]
# Ensure the pipe bar finishes even if cmdlet didnt emit per item.
@@ -1499,6 +1504,16 @@ class PipelineLiveProgress:
self._update_overall()
def complete_all_pipes(self) -> None:
"""Mark every configured pipe as finished so UI bars reach 100%."""
if not self._enabled:
return
for idx in range(len(self._pipe_labels)):
try:
self.finish_pipe(idx)
except Exception:
pass
class PipelineStageContext:
"""Context information for the current pipeline stage."""

View File

@@ -2831,6 +2831,8 @@ class PipelineExecutor:
total_stages=len(stages),
pipe_index=pipe_idx,
worker_id=session.worker_id if session else None,
on_emit=(lambda x: progress_ui.on_emit(pipe_idx, x))
if progress_ui is not None and pipe_idx is not None else None,
)
# Set context for the current run
@@ -2839,6 +2841,12 @@ class PipelineExecutor:
ctx.set_current_stage_text(" ".join(stage_tokens))
ctx.clear_emits()
if progress_ui is not None and pipe_idx is not None:
# Start the pipe task in the UI. For most cmdlets we assume 1 item
# initially; cmdlets that process multiple items (like search)
# should call begin_pipe themselves with the actual count.
progress_ui.begin_pipe(pipe_idx, total_items=1)
# RUN THE CMDLET
cmd_fn(piped_result, stage_args, config)
@@ -2848,10 +2856,9 @@ class PipelineExecutor:
piped_result = stage_emits if len(stage_emits) > 1 else stage_emits[0]
else:
piped_result = None
if progress_ui is not None and pipe_idx is not None:
progress_ui.complete_stage(pipe_idx)
finally:
if progress_ui is not None and pipe_idx is not None:
progress_ui.finish_pipe(pipe_idx)
if session:
try:
session.close()
@@ -2870,6 +2877,10 @@ class PipelineExecutor:
finally:
# Stop Live progress and clear pipeline-level live progress
if progress_ui is not None:
try:
progress_ui.complete_all_pipes()
except Exception:
pass
try:
progress_ui.stop()
except Exception: