pipeline: use PIPELINE_MISSING sentinel and MAX_RESULT_TABLE_HISTORY; remove unused _LIVE_PROGRESS
This commit is contained in:
196
SYS/pipeline.py
196
SYS/pipeline.py
@@ -12,13 +12,11 @@ from typing import Any, Dict, List, Optional, Sequence
|
||||
from SYS.models import PipelineStageContext
|
||||
from SYS.logger import log
|
||||
|
||||
_LIVE_PROGRESS: Any = None
|
||||
|
||||
def set_live_progress(progress_ui: Any) -> None:
|
||||
"""Register the current Live progress UI so cmdlets can suspend it during prompts."""
|
||||
state = _get_pipeline_state()
|
||||
state.live_progress = progress_ui
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_live_progress() -> Any:
|
||||
@@ -134,42 +132,9 @@ def new_pipeline_state():
|
||||
_CTX_STATE.reset(token)
|
||||
|
||||
|
||||
def _sync_module_state(state: PipelineState) -> None:
|
||||
"""Synchronize module-level pipeline globals from a PipelineState instance.
|
||||
# Legacy module-level synchronization removed — module-level pipeline globals are no longer maintained.
|
||||
# Use `get_pipeline_state()` to access or mutate the per-run PipelineState.
|
||||
|
||||
Centralizing `global` declarations reduces the chance of syntax problems during
|
||||
incremental migration (all module globals are updated from a single place).
|
||||
"""
|
||||
global _CURRENT_CONTEXT, _LAST_SEARCH_QUERY, _PIPELINE_REFRESHED, _PIPELINE_LAST_ITEMS
|
||||
global _LAST_RESULT_TABLE, _LAST_RESULT_ITEMS, _LAST_RESULT_SUBJECT, _RESULT_TABLE_HISTORY, _RESULT_TABLE_FORWARD
|
||||
global _CURRENT_STAGE_TABLE, _DISPLAY_ITEMS, _DISPLAY_TABLE, _DISPLAY_SUBJECT
|
||||
global _PIPELINE_LAST_SELECTION, _PIPELINE_COMMAND_TEXT, _CURRENT_CMDLET_NAME, _CURRENT_STAGE_TEXT
|
||||
global _PIPELINE_VALUES, _PENDING_PIPELINE_TAIL, _PENDING_PIPELINE_SOURCE, _UI_LIBRARY_REFRESH_CALLBACK
|
||||
global _PIPELINE_STOP, _LIVE_PROGRESS
|
||||
|
||||
_CURRENT_CONTEXT = state.current_context
|
||||
_LAST_SEARCH_QUERY = state.last_search_query
|
||||
_PIPELINE_REFRESHED = state.pipeline_refreshed
|
||||
_PIPELINE_LAST_ITEMS = state.last_items
|
||||
_LAST_RESULT_TABLE = state.last_result_table
|
||||
_LAST_RESULT_ITEMS = state.last_result_items
|
||||
_LAST_RESULT_SUBJECT = state.last_result_subject
|
||||
_RESULT_TABLE_HISTORY = state.result_table_history
|
||||
_RESULT_TABLE_FORWARD = state.result_table_forward
|
||||
_CURRENT_STAGE_TABLE = state.current_stage_table
|
||||
_DISPLAY_ITEMS = state.display_items
|
||||
_DISPLAY_TABLE = state.display_table
|
||||
_DISPLAY_SUBJECT = state.display_subject
|
||||
_PIPELINE_LAST_SELECTION = state.last_selection
|
||||
_PIPELINE_COMMAND_TEXT = state.pipeline_command_text
|
||||
_CURRENT_CMDLET_NAME = state.current_cmdlet_name
|
||||
_CURRENT_STAGE_TEXT = state.current_stage_text
|
||||
_PIPELINE_VALUES = state.pipeline_values
|
||||
_PENDING_PIPELINE_TAIL = state.pending_pipeline_tail
|
||||
_PENDING_PIPELINE_SOURCE = state.pending_pipeline_source
|
||||
_UI_LIBRARY_REFRESH_CALLBACK = state.ui_library_refresh_callback
|
||||
_PIPELINE_STOP = state.pipeline_stop
|
||||
_LIVE_PROGRESS = state.live_progress
|
||||
|
||||
|
||||
# Public accessors for pipeline state (for external callers that need to inspect
|
||||
@@ -180,81 +145,11 @@ def get_pipeline_state() -> PipelineState:
|
||||
return _get_pipeline_state()
|
||||
|
||||
|
||||
def sync_module_state(state: PipelineState) -> None:
|
||||
"""Synchronize module-level globals from a PipelineState instance (public wrapper)."""
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# PIPELINE STATE
|
||||
# ============================================================================
|
||||
|
||||
# Current pipeline context
|
||||
_CURRENT_CONTEXT: Optional[PipelineStageContext] = None
|
||||
|
||||
# Remember last search query to support refreshing results after pipeline actions
|
||||
_LAST_SEARCH_QUERY: Optional[str] = None
|
||||
|
||||
# Track whether the last pipeline execution already refreshed and displayed results
|
||||
_PIPELINE_REFRESHED: bool = False
|
||||
|
||||
# Cache the last pipeline outputs so non-interactive callers can inspect results
|
||||
_PIPELINE_LAST_ITEMS: List[Any] = []
|
||||
|
||||
# Store the last result table for @ selection syntax (e.g., @2, @2-5, @{1,3,5})
|
||||
_LAST_RESULT_TABLE: Optional[Any] = None
|
||||
_LAST_RESULT_ITEMS: List[Any] = []
|
||||
# Subject for the current result table (e.g., the file whose tags/url are displayed)
|
||||
_LAST_RESULT_SUBJECT: Optional[Any] = None
|
||||
|
||||
# History of result tables for @.. navigation (LIFO stack, max 20 tables)
|
||||
_RESULT_TABLE_HISTORY: List[tuple[Optional[Any], List[Any], Optional[Any]]] = []
|
||||
_MAX_RESULT_TABLE_HISTORY = 20
|
||||
|
||||
# Forward history for @,, navigation (LIFO stack for popped tables)
|
||||
_RESULT_TABLE_FORWARD: List[tuple[Optional[Any], List[Any], Optional[Any]]] = []
|
||||
|
||||
# Current stage table for @N expansion (separate from history)
|
||||
_CURRENT_STAGE_TABLE: Optional[Any] = None
|
||||
|
||||
# Items displayed by non-selectable commands (get-tag, delete-tag, etc.)
|
||||
_DISPLAY_ITEMS: List[Any] = []
|
||||
|
||||
# Table for display-only commands (overlay)
|
||||
_DISPLAY_TABLE: Optional[Any] = None
|
||||
# Subject for overlay/display-only tables (takes precedence over _LAST_RESULT_SUBJECT)
|
||||
_DISPLAY_SUBJECT: Optional[Any] = None
|
||||
|
||||
# Track the indices the user selected via @ syntax for the current invocation
|
||||
_PIPELINE_LAST_SELECTION: List[int] = []
|
||||
|
||||
# Track the currently executing command/pipeline string for worker attribution
|
||||
_PIPELINE_COMMAND_TEXT: str = ""
|
||||
|
||||
# Track the currently executing cmdlet name so debug helpers can label objects
|
||||
# with the active stage (e.g., "1 - add-file").
|
||||
_CURRENT_CMDLET_NAME: str = ""
|
||||
|
||||
# Track the currently executing stage text (best-effort, quotes preserved).
|
||||
_CURRENT_STAGE_TEXT: str = ""
|
||||
|
||||
# Shared scratchpad for cmdlet/funacts to stash structured data between stages
|
||||
_PIPELINE_VALUES: Dict[str,
|
||||
Any] = {}
|
||||
_PIPELINE_MISSING = object()
|
||||
|
||||
# Preserve downstream pipeline stages when a command pauses for @N selection
|
||||
_PENDING_PIPELINE_TAIL: List[List[str]] = []
|
||||
_PENDING_PIPELINE_SOURCE: Optional[str] = None
|
||||
|
||||
# Global callback to notify UI when library content changes
|
||||
_UI_LIBRARY_REFRESH_CALLBACK: Optional[Any] = None
|
||||
|
||||
# ============================================================================
|
||||
# PIPELINE STOP SIGNAL
|
||||
# ============================================================================
|
||||
|
||||
_PIPELINE_STOP: Optional[Dict[str, Any]] = None
|
||||
# No module-level pipeline runtime variables; per-run pipeline state is stored in PipelineState (use `get_pipeline_state()`).
|
||||
MAX_RESULT_TABLE_HISTORY = 20
|
||||
PIPELINE_MISSING = object()
|
||||
|
||||
|
||||
def request_pipeline_stop(*, reason: str = "", exit_code: int = 0) -> None:
|
||||
@@ -264,7 +159,6 @@ def request_pipeline_stop(*, reason: str = "", exit_code: int = 0) -> None:
|
||||
"reason": str(reason or "").strip(),
|
||||
"exit_code": int(exit_code)
|
||||
}
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_pipeline_stop() -> Optional[Dict[str, Any]]:
|
||||
@@ -275,7 +169,6 @@ def get_pipeline_stop() -> Optional[Dict[str, Any]]:
|
||||
def clear_pipeline_stop() -> None:
|
||||
state = _get_pipeline_state()
|
||||
state.pipeline_stop = None
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@@ -287,7 +180,6 @@ def set_stage_context(context: Optional[PipelineStageContext]) -> None:
|
||||
"""Set the current pipeline stage context."""
|
||||
state = _get_pipeline_state()
|
||||
state.current_context = context
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_stage_context() -> Optional[PipelineStageContext]:
|
||||
@@ -343,7 +235,8 @@ def store_value(key: str, value: Any) -> None:
|
||||
if not text:
|
||||
return
|
||||
try:
|
||||
_PIPELINE_VALUES[text] = value
|
||||
state = _get_pipeline_state()
|
||||
state.pipeline_values[text] = value
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -361,11 +254,13 @@ def load_value(key: str, default: Any = None) -> Any:
|
||||
if not parts:
|
||||
return default
|
||||
root_key = parts[0].lower()
|
||||
container = _PIPELINE_VALUES.get(root_key, _PIPELINE_MISSING)
|
||||
if container is _PIPELINE_MISSING:
|
||||
state = _get_pipeline_state()
|
||||
container = state.pipeline_values.get(root_key, PIPELINE_MISSING)
|
||||
if container is PIPELINE_MISSING:
|
||||
return default
|
||||
if len(parts) == 1:
|
||||
return container
|
||||
|
||||
current: Any = container
|
||||
for fragment in parts[1:]:
|
||||
if isinstance(current, dict):
|
||||
@@ -373,12 +268,12 @@ def load_value(key: str, default: Any = None) -> Any:
|
||||
if fragment in current:
|
||||
current = current[fragment]
|
||||
continue
|
||||
match = _PIPELINE_MISSING
|
||||
match = PIPELINE_MISSING
|
||||
for key_name, value in current.items():
|
||||
if isinstance(key_name, str) and key_name.lower() == fragment_lower:
|
||||
match = value
|
||||
break
|
||||
if match is _PIPELINE_MISSING:
|
||||
if match is PIPELINE_MISSING:
|
||||
return default
|
||||
current = match
|
||||
continue
|
||||
@@ -418,8 +313,6 @@ def set_pending_pipeline_tail(
|
||||
state.pending_pipeline_tail = pending
|
||||
clean_source = (source_command or "").strip()
|
||||
state.pending_pipeline_source = clean_source if clean_source else None
|
||||
# Sync module-level variables
|
||||
_sync_module_state(state)
|
||||
except Exception:
|
||||
# Keep existing pending tail on failure
|
||||
pass
|
||||
@@ -442,8 +335,6 @@ def clear_pending_pipeline_tail() -> None:
|
||||
state = _get_pipeline_state()
|
||||
state.pending_pipeline_tail = []
|
||||
state.pending_pipeline_source = None
|
||||
# Sync module-level variables
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def reset() -> None:
|
||||
@@ -451,23 +342,24 @@ def reset() -> None:
|
||||
state = _get_pipeline_state()
|
||||
state.reset()
|
||||
|
||||
# Sync module-level variables for backwards compatibility
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_emitted_items() -> List[Any]:
|
||||
"""
|
||||
Get a copy of all items emitted by the current pipeline stage.
|
||||
"""
|
||||
if _CURRENT_CONTEXT is not None:
|
||||
return list(_CURRENT_CONTEXT.emits)
|
||||
state = _get_pipeline_state()
|
||||
ctx = state.current_context
|
||||
if ctx is not None:
|
||||
return list(ctx.emits)
|
||||
return []
|
||||
|
||||
|
||||
def clear_emits() -> None:
|
||||
"""Clear the emitted items list (called between stages)."""
|
||||
if _CURRENT_CONTEXT is not None:
|
||||
_CURRENT_CONTEXT.emits.clear()
|
||||
state = _get_pipeline_state()
|
||||
ctx = state.current_context
|
||||
if ctx is not None:
|
||||
ctx.emits.clear()
|
||||
|
||||
|
||||
def set_last_selection(indices: Sequence[int]) -> None:
|
||||
@@ -478,8 +370,6 @@ def set_last_selection(indices: Sequence[int]) -> None:
|
||||
"""
|
||||
state = _get_pipeline_state()
|
||||
state.last_selection = list(indices or [])
|
||||
# Sync module-level variables
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_last_selection() -> List[int]:
|
||||
@@ -492,14 +382,12 @@ def clear_last_selection() -> None:
|
||||
"""Clear the cached selection indices after a cmdlet finishes."""
|
||||
state = _get_pipeline_state()
|
||||
state.last_selection = []
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def set_current_command_text(command_text: Optional[str]) -> None:
|
||||
"""Record the raw pipeline/command text for downstream consumers."""
|
||||
state = _get_pipeline_state()
|
||||
state.pipeline_command_text = (command_text or "").strip()
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_current_command_text(default: str = "") -> str:
|
||||
@@ -513,7 +401,6 @@ def clear_current_command_text() -> None:
|
||||
"""Clear the cached command text after execution completes."""
|
||||
state = _get_pipeline_state()
|
||||
state.pipeline_command_text = ""
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def split_pipeline_text(pipeline_text: str) -> List[str]:
|
||||
@@ -571,7 +458,6 @@ def set_current_stage_text(stage_text: Optional[str]) -> None:
|
||||
"""Record the raw stage text currently being executed."""
|
||||
state = _get_pipeline_state()
|
||||
state.current_stage_text = str(stage_text or "").strip()
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_current_stage_text(default: str = "") -> str:
|
||||
@@ -585,14 +471,12 @@ def clear_current_stage_text() -> None:
|
||||
"""Clear the cached stage text after a stage completes."""
|
||||
state = _get_pipeline_state()
|
||||
state.current_stage_text = ""
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def set_current_cmdlet_name(cmdlet_name: Optional[str]) -> None:
|
||||
"""Record the currently executing cmdlet name (stage-local)."""
|
||||
state = _get_pipeline_state()
|
||||
state.current_cmdlet_name = str(cmdlet_name or "").strip()
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_current_cmdlet_name(default: str = "") -> str:
|
||||
@@ -606,14 +490,12 @@ def clear_current_cmdlet_name() -> None:
|
||||
"""Clear the cached cmdlet name after a stage completes."""
|
||||
state = _get_pipeline_state()
|
||||
state.current_cmdlet_name = ""
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def set_search_query(query: Optional[str]) -> None:
|
||||
"""Set the last search query for refresh purposes."""
|
||||
state = _get_pipeline_state()
|
||||
state.last_search_query = query
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_search_query() -> Optional[str]:
|
||||
@@ -626,7 +508,6 @@ def set_pipeline_refreshed(refreshed: bool) -> None:
|
||||
"""Track whether the pipeline already refreshed results."""
|
||||
state = _get_pipeline_state()
|
||||
state.pipeline_refreshed = refreshed
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def was_pipeline_refreshed() -> bool:
|
||||
@@ -640,9 +521,6 @@ def set_last_items(items: list) -> None:
|
||||
state = _get_pipeline_state()
|
||||
state.last_items = list(items) if items else []
|
||||
|
||||
# Sync module-level variable
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_last_items() -> List[Any]:
|
||||
"""Get the last pipeline outputs."""
|
||||
@@ -656,7 +534,6 @@ def set_ui_library_refresh_callback(callback: Any) -> None:
|
||||
"""
|
||||
state = _get_pipeline_state()
|
||||
state.ui_library_refresh_callback = callback
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_ui_library_refresh_callback() -> Optional[Any]:
|
||||
@@ -704,7 +581,7 @@ def set_last_result_table(
|
||||
)
|
||||
)
|
||||
# Keep history size limited
|
||||
if len(state.result_table_history) > _MAX_RESULT_TABLE_HISTORY:
|
||||
if len(state.result_table_history) > MAX_RESULT_TABLE_HISTORY:
|
||||
state.result_table_history.pop(0)
|
||||
|
||||
# Set new current table and clear any display items/table
|
||||
@@ -735,9 +612,6 @@ def set_last_result_table(
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Sync module-level variables for backwards compatibility
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def set_last_result_table_overlay(
|
||||
result_table: Optional[Any],
|
||||
@@ -773,9 +647,6 @@ def set_last_result_table_overlay(
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Sync module-level variables
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def set_last_result_table_preserve_history(
|
||||
result_table: Optional[Any],
|
||||
@@ -792,8 +663,6 @@ def set_last_result_table_preserve_history(
|
||||
state.last_result_items = items or []
|
||||
state.last_result_subject = subject
|
||||
|
||||
# Sync module-level variables
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def set_last_result_items_only(items: Optional[List[Any]]) -> None:
|
||||
@@ -809,9 +678,6 @@ def set_last_result_items_only(items: Optional[List[Any]]) -> None:
|
||||
state.display_table = None
|
||||
state.display_subject = None
|
||||
|
||||
# Sync module-level variables
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def restore_previous_result_table() -> bool:
|
||||
"""
|
||||
@@ -827,12 +693,8 @@ def restore_previous_result_table() -> bool:
|
||||
# If an underlying table exists, we're done.
|
||||
# Otherwise, fall through to history restore so @.. actually returns to the last table.
|
||||
if state.last_result_table is not None:
|
||||
# Sync module-level variables
|
||||
_sync_module_state(state)
|
||||
return True
|
||||
if not state.result_table_history:
|
||||
# Sync
|
||||
_sync_module_state(state)
|
||||
return True
|
||||
|
||||
if not state.result_table_history:
|
||||
@@ -858,9 +720,6 @@ def restore_previous_result_table() -> bool:
|
||||
state.display_table = None
|
||||
state.display_subject = None
|
||||
|
||||
# Sync module-level variables
|
||||
_sync_module_state(state)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@@ -878,12 +737,8 @@ def restore_next_result_table() -> bool:
|
||||
# If an underlying table exists, we're done.
|
||||
# Otherwise, fall through to forward restore when available.
|
||||
if state.last_result_table is not None:
|
||||
# Sync module-level vars
|
||||
_sync_module_state(state)
|
||||
return True
|
||||
if not state.result_table_forward:
|
||||
# Sync and return
|
||||
_sync_module_state(state)
|
||||
return True
|
||||
|
||||
if not state.result_table_forward:
|
||||
@@ -911,9 +766,6 @@ def restore_next_result_table() -> bool:
|
||||
state.display_table = None
|
||||
state.display_subject = None
|
||||
|
||||
# Sync module-level variables
|
||||
_sync_module_state(state)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@@ -1035,9 +887,6 @@ def set_current_stage_table(result_table: Optional[Any]) -> None:
|
||||
state = _get_pipeline_state()
|
||||
state.current_stage_table = result_table
|
||||
|
||||
# Sync module-level variable
|
||||
_sync_module_state(state)
|
||||
|
||||
|
||||
def get_current_stage_table() -> Optional[Any]:
|
||||
"""Get the current pipeline stage table (if any)."""
|
||||
@@ -1107,4 +956,3 @@ def clear_last_result() -> None:
|
||||
state.last_result_table = None
|
||||
state.last_result_items = []
|
||||
state.last_result_subject = None
|
||||
_sync_module_state(state)
|
||||
|
||||
Reference in New Issue
Block a user