k
This commit is contained in:
@@ -4,6 +4,7 @@ Pipeline execution context and state management for cmdlet.
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from contextvars import ContextVar
|
||||
@@ -102,6 +103,7 @@ class PipelineState:
|
||||
ui_library_refresh_callback: Optional[Any] = None
|
||||
pipeline_stop: Optional[Dict[str, Any]] = None
|
||||
live_progress: Any = None
|
||||
last_execution_result: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
def reset(self) -> None:
|
||||
self.current_context = None
|
||||
@@ -127,6 +129,7 @@ class PipelineState:
|
||||
self.ui_library_refresh_callback = None
|
||||
self.pipeline_stop = None
|
||||
self.live_progress = None
|
||||
self.last_execution_result = {}
|
||||
|
||||
|
||||
# ContextVar for per-run state (prototype)
|
||||
@@ -315,6 +318,29 @@ def load_value(key: str, default: Any = None) -> Any:
|
||||
return current
|
||||
|
||||
|
||||
def set_last_execution_result(
|
||||
*,
|
||||
status: str,
|
||||
error: str = "",
|
||||
command_text: str = "",
|
||||
) -> None:
|
||||
state = _get_pipeline_state()
|
||||
text_status = str(status or "").strip().lower() or "unknown"
|
||||
state.last_execution_result = {
|
||||
"status": text_status,
|
||||
"success": text_status == "completed",
|
||||
"error": str(error or "").strip(),
|
||||
"command_text": str(command_text or "").strip(),
|
||||
"finished_at": time.time(),
|
||||
}
|
||||
|
||||
|
||||
def get_last_execution_result() -> Dict[str, Any]:
|
||||
state = _get_pipeline_state()
|
||||
payload = state.last_execution_result
|
||||
return dict(payload) if isinstance(payload, dict) else {}
|
||||
|
||||
|
||||
def set_pending_pipeline_tail(
|
||||
stages: Optional[Sequence[Sequence[str]]],
|
||||
source_command: Optional[str] = None
|
||||
@@ -3042,3 +3068,11 @@ class PipelineExecutor:
|
||||
f"Pipeline {pipeline_status}: {pipeline_error or ''}")
|
||||
except Exception:
|
||||
logger.exception("Failed to log final pipeline status (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))
|
||||
try:
|
||||
set_last_execution_result(
|
||||
status=pipeline_status,
|
||||
error=pipeline_error,
|
||||
command_text=pipeline_text,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to record last execution result for pipeline")
|
||||
|
||||
@@ -11,6 +11,13 @@ def repl_queue_dir(root: Path) -> Path:
|
||||
return Path(root) / "Log" / "repl_queue"
|
||||
|
||||
|
||||
def _legacy_repl_queue_glob(root: Path) -> list[Path]:
|
||||
log_dir = Path(root) / "Log"
|
||||
if not log_dir.exists():
|
||||
return []
|
||||
return list(log_dir.glob("medeia-repl-queue-*.json"))
|
||||
|
||||
|
||||
def enqueue_repl_command(
|
||||
root: Path,
|
||||
command: str,
|
||||
@@ -41,11 +48,24 @@ def enqueue_repl_command(
|
||||
|
||||
def pop_repl_commands(root: Path, *, limit: int = 8) -> List[Dict[str, Any]]:
|
||||
queue_dir = repl_queue_dir(root)
|
||||
if not queue_dir.exists():
|
||||
legacy_entries = _legacy_repl_queue_glob(root)
|
||||
if not queue_dir.exists() and not legacy_entries:
|
||||
return []
|
||||
|
||||
items: List[Dict[str, Any]] = []
|
||||
for entry in sorted(queue_dir.glob("*.json"))[: max(1, int(limit or 1))]:
|
||||
entries: List[Path] = []
|
||||
if queue_dir.exists():
|
||||
entries.extend(queue_dir.glob("*.json"))
|
||||
entries.extend(legacy_entries)
|
||||
|
||||
def _sort_key(path: Path) -> tuple[float, str]:
|
||||
try:
|
||||
ts = float(path.stat().st_mtime)
|
||||
except Exception:
|
||||
ts = 0.0
|
||||
return (ts, path.name)
|
||||
|
||||
for entry in sorted(entries, key=_sort_key)[: max(1, int(limit or 1))]:
|
||||
try:
|
||||
payload = json.loads(entry.read_text(encoding="utf-8"))
|
||||
except Exception:
|
||||
|
||||
Reference in New Issue
Block a user