This commit is contained in:
nose
2025-12-23 16:36:39 -08:00
parent 16316bb3fd
commit 8bf04c6b71
25 changed files with 3165 additions and 234 deletions

View File

@@ -4,6 +4,7 @@
from __future__ import annotations
import json
import shutil
import sys
from collections.abc import Iterable as IterableABC
@@ -1275,6 +1276,233 @@ def get_pipe_object_path(pipe_object: Any) -> Optional[str]:
return None
def _extract_flag_value(args: Sequence[str], *flags: str) -> Optional[str]:
"""Return the value for the first matching flag in args.
This is intentionally lightweight (no cmdlet spec required) so callers in CLI/pipeline
can share the same behavior.
"""
if not args:
return None
want = {str(f).strip().lower() for f in flags if str(f).strip()}
if not want:
return None
try:
tokens = [str(a) for a in args]
except Exception:
tokens = list(args) # type: ignore[list-item]
for i, tok in enumerate(tokens):
low = str(tok).strip().lower()
if low in want:
if i + 1 >= len(tokens):
return None
nxt = str(tokens[i + 1])
# Allow paths like "-"? Treat missing value as None.
if not nxt.strip():
return None
# Don't consume another flag as value.
if nxt.startswith("-"):
return None
return nxt
return None
def _unique_destination_path(dest: Path) -> Path:
"""Generate a non-colliding destination path by appending " (N)"."""
try:
if not dest.exists():
return dest
except Exception:
return dest
parent = dest.parent
stem = dest.stem
suffix = dest.suffix
for i in range(1, 10_000):
candidate = parent / f"{stem} ({i}){suffix}"
try:
if not candidate.exists():
return candidate
except Exception:
return candidate
return dest
def apply_output_path_from_pipeobjects(
*,
cmd_name: str,
args: Sequence[str],
emits: Sequence[Any],
) -> List[Any]:
"""If the user supplied `-path`, move emitted temp/PATH files there.
This enables a dynamic pattern:
- Any cmdlet can include `SharedArgs.PATH`.
- If it emits a file-backed PipeObject (`path` exists on disk) and the item is
a temp/PATH artifact, then `-path <dest>` will save it to that location.
Rules:
- Only affects items whose `action` matches the current cmdlet.
- Only affects items that look like local artifacts (`is_temp` True or `store` == PATH).
- Updates the emitted object's `path` (and `target` when it points at the same file).
"""
dest_raw = _extract_flag_value(args, "-path", "--path")
if not dest_raw:
return list(emits or [])
cmd_norm = str(cmd_name or "").replace("_", "-").strip().lower()
if not cmd_norm:
return list(emits or [])
try:
dest_hint_dir = str(dest_raw).endswith(("/", "\\"))
except Exception:
dest_hint_dir = False
try:
dest_path = Path(str(dest_raw)).expanduser()
except Exception:
return list(emits or [])
items = list(emits or [])
# Identify which emitted items are actually file artifacts produced by this cmdlet.
artifact_indices: List[int] = []
artifact_paths: List[Path] = []
for idx, item in enumerate(items):
action = str(get_field(item, "action", "") or "").strip().lower()
if not action.startswith("cmdlet:"):
continue
action_name = action.split(":", 1)[-1].strip().lower()
if action_name != cmd_norm:
continue
store = str(get_field(item, "store", "") or "").strip().lower()
is_temp = bool(get_field(item, "is_temp", False))
if not (is_temp or store == "path"):
continue
src_str = get_pipe_object_path(item)
if not src_str:
continue
try:
src = Path(str(src_str)).expanduser()
except Exception:
continue
try:
if not src.exists() or not src.is_file():
continue
except Exception:
continue
artifact_indices.append(idx)
artifact_paths.append(src)
if not artifact_indices:
return items
# Decide whether the destination is a directory or a single file.
if len(artifact_indices) > 1:
# Multiple artifacts: always treat destination as a directory.
if dest_path.suffix:
dest_dir = dest_path.parent
else:
dest_dir = dest_path
try:
dest_dir.mkdir(parents=True, exist_ok=True)
except Exception as exc:
log(f"Failed to create destination directory: {dest_dir} ({exc})", file=sys.stderr)
return items
for idx, src in zip(artifact_indices, artifact_paths):
final = dest_dir / src.name
final = _unique_destination_path(final)
try:
if src.resolve() == final.resolve():
continue
except Exception:
pass
try:
shutil.move(str(src), str(final))
except Exception as exc:
log(f"Failed to save output to {final}: {exc}", file=sys.stderr)
continue
_apply_saved_path_update(items[idx], old_path=str(src), new_path=str(final))
return items
# Single artifact: destination can be a directory or a concrete file path.
src = artifact_paths[0]
idx = artifact_indices[0]
final: Path
try:
if dest_hint_dir or (dest_path.exists() and dest_path.is_dir()):
final = dest_path / src.name
else:
final = dest_path
except Exception:
final = dest_path
try:
final.parent.mkdir(parents=True, exist_ok=True)
except Exception as exc:
log(f"Failed to create destination directory: {final.parent} ({exc})", file=sys.stderr)
return items
final = _unique_destination_path(final)
try:
if src.resolve() != final.resolve():
shutil.move(str(src), str(final))
except Exception as exc:
log(f"Failed to save output to {final}: {exc}", file=sys.stderr)
return items
_apply_saved_path_update(items[idx], old_path=str(src), new_path=str(final))
return items
def _apply_saved_path_update(item: Any, *, old_path: str, new_path: str) -> None:
"""Update a PipeObject-like item after its backing file has moved."""
old_str = str(old_path)
new_str = str(new_path)
if isinstance(item, dict):
try:
if str(item.get("path") or "") == old_str:
item["path"] = new_str
except Exception:
pass
try:
if str(item.get("target") or "") == old_str:
item["target"] = new_str
except Exception:
pass
try:
extra = item.get("extra")
if isinstance(extra, dict):
if str(extra.get("target") or "") == old_str:
extra["target"] = new_str
if str(extra.get("path") or "") == old_str:
extra["path"] = new_str
except Exception:
pass
return
# models.PipeObject or PipeObject-ish
try:
if getattr(item, "path", None) == old_str:
setattr(item, "path", new_str)
except Exception:
pass
try:
extra = getattr(item, "extra", None)
if isinstance(extra, dict):
if str(extra.get("target") or "") == old_str:
extra["target"] = new_str
if str(extra.get("path") or "") == old_str:
extra["path"] = new_str
except Exception:
pass
def get_pipe_object_hash(pipe_object: Any) -> Optional[str]:
"""Extract file hash from PipeObject, dict, or pipeline-friendly object."""
if pipe_object is None: