"""Cleanup cmdlet for removing temporary artifacts from pipeline. This cmdlet processes result lists and removes temporary files (marked with is_temp=True), then emits the remaining non-temporary results for further pipeline stages. """ from __future__ import annotations from typing import Any, Dict, Sequence from pathlib import Path import sys from helper.logger import log from . import register from ._shared import Cmdlet, CmdletArg, get_pipe_object_path, normalize_result_input, filter_results_by_temp import models import pipeline as pipeline_context @register(["cleanup"]) def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Remove temporary files from pipeline results. Accepts: - Single result object with is_temp field - List of result objects to clean up Process: - Filters results by is_temp=True - Deletes those files from disk - Emits only non-temporary results Typical pipeline usage: download-data url | screen-shot | add-tag "tag" --all | cleanup """ # Help try: if any(str(a).lower() in {"-?", "/?", "--help", "-h", "help", "--cmdlet"} for a in args): import json log(json.dumps(CMDLET, ensure_ascii=False, indent=2)) return 0 except Exception: pass # Normalize input to list results = normalize_result_input(result) if not results: log("[cleanup] No results to process", file=sys.stderr) return 1 # Separate temporary and permanent results temp_results = pipeline_context.filter_results_by_temp(results, include_temp=True) perm_results = pipeline_context.filter_results_by_temp(results, include_temp=False) # Delete temporary files deleted_count = 0 for temp_result in temp_results: try: file_path = get_pipe_object_path(temp_result) if file_path: path_obj = Path(file_path) if path_obj.exists(): # Delete the file path_obj.unlink() log(f"[cleanup] Deleted temporary file: {path_obj.name}", file=sys.stderr) deleted_count += 1 # Clean up any associated sidecar files for ext in ['.tags', '.metadata']: sidecar = path_obj.parent / (path_obj.name + ext) if sidecar.exists(): try: sidecar.unlink() log(f"[cleanup] Deleted sidecar: {sidecar.name}", file=sys.stderr) except Exception as e: log(f"[cleanup] Warning: Could not delete sidecar {sidecar.name}: {e}", file=sys.stderr) else: log(f"[cleanup] File does not exist: {file_path}", file=sys.stderr) except Exception as e: log(f"[cleanup] Error deleting file: {e}", file=sys.stderr) # Log summary log(f"[cleanup] Deleted {deleted_count} temporary file(s), emitting {len(perm_results)} permanent result(s)", file=sys.stderr) # Emit permanent results for downstream processing for perm_result in perm_results: pipeline_context.emit(perm_result) return 0 CMDLET = Cmdlet( name="cleanup", summary="Remove temporary artifacts from pipeline (marked with is_temp=True).", usage="cleanup", args=[], details=[ "- Accepts pipeline results that may contain temporary files (screenshots, intermediate artifacts)", "- Deletes files marked with is_temp=True from disk", "- Also cleans up associated sidecar files (.tags, .metadata)", "- Emits only non-temporary results for further processing", "- Typical usage at end of pipeline: ... | add-tag \"tag\" --all | cleanup", "- Exit code 0 if cleanup successful, 1 if no results to process", ], )