khh
Some checks failed
smoke-mm / Install & smoke test mm --help (push) Has been cancelled

This commit is contained in:
nose
2025-12-24 02:13:21 -08:00
parent 8bf04c6b71
commit 24dd18de7e
20 changed files with 1792 additions and 636 deletions

View File

@@ -30,6 +30,8 @@ import time
import logging
import re
import hashlib
import subprocess
import platform
from pathlib import Path
from typing import Any, Dict, Optional
@@ -134,6 +136,91 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]:
"""
op_name = str(op or "").strip().lower()
if op_name in {"run-detached", "run_detached", "pipeline-detached", "pipeline_detached"}:
pipeline_text = ""
seeds = None
if isinstance(data, dict):
pipeline_text = str(data.get("pipeline") or "").strip()
seeds = data.get("seeds")
if not pipeline_text:
return {
"success": False,
"stdout": "",
"stderr": "",
"error": "Missing pipeline",
"table": None,
}
py = sys.executable or "python"
if platform.system() == "Windows":
try:
exe = str(py or "").strip()
except Exception:
exe = ""
low = exe.lower()
if low.endswith("python.exe"):
try:
candidate = exe[:-10] + "pythonw.exe"
if os.path.exists(candidate):
py = candidate
except Exception:
pass
cmd = [py, str((_repo_root() / "CLI.py").resolve()), "pipeline", "--pipeline", pipeline_text]
if seeds is not None:
try:
cmd.extend(["--seeds-json", json.dumps(seeds, ensure_ascii=False)])
except Exception:
# Best-effort; seeds are optional.
pass
popen_kwargs: Dict[str, Any] = {
"stdin": subprocess.DEVNULL,
"stdout": subprocess.DEVNULL,
"stderr": subprocess.DEVNULL,
"cwd": str(_repo_root()),
}
if platform.system() == "Windows":
flags = 0
try:
flags |= int(getattr(subprocess, "DETACHED_PROCESS", 0x00000008))
except Exception:
flags |= 0x00000008
try:
flags |= int(getattr(subprocess, "CREATE_NO_WINDOW", 0x08000000))
except Exception:
flags |= 0x08000000
popen_kwargs["creationflags"] = int(flags)
try:
si = subprocess.STARTUPINFO()
si.dwFlags |= int(getattr(subprocess, "STARTF_USESHOWWINDOW", 0x00000001))
si.wShowWindow = subprocess.SW_HIDE
popen_kwargs["startupinfo"] = si
except Exception:
pass
else:
popen_kwargs["start_new_session"] = True
try:
proc = subprocess.Popen(cmd, **popen_kwargs)
except Exception as exc:
return {
"success": False,
"stdout": "",
"stderr": "",
"error": f"Failed to spawn detached pipeline: {type(exc).__name__}: {exc}",
"table": None,
}
return {
"success": True,
"stdout": "",
"stderr": "",
"error": None,
"table": None,
"pid": int(getattr(proc, "pid", 0) or 0),
}
# Provide store backend choices using the same source as CLI/Typer autocomplete.
if op_name in {"store-choices", "store_choices", "get-store-choices", "get_store_choices"}:
from CLI import MedeiaCLI # noqa: WPS433