"""Trim a media file using ffmpeg.""" from __future__ import annotations from typing import Any, Dict, Sequence, List, Optional from pathlib import Path import sys import json import subprocess import shutil import re from helper.logger import log, debug from helper.utils import sha256_file from . import register from ._shared import ( Cmdlet, CmdletArg, parse_cmdlet_args, normalize_result_input, extract_tags_from_result, extract_title_from_result ) import pipeline as ctx CMDLET = Cmdlet( name="trim-file", summary="Trim a media file using ffmpeg.", usage="trim-file [-path ] -range [-delete]", args=[ CmdletArg("-path", description="Path to the file (optional if piped)."), CmdletArg("-range", required=True, description="Time range to trim (e.g. '3:45-3:55' or '00:03:45-00:03:55')."), CmdletArg("-delete", type="flag", description="Delete the original file after trimming."), ], details=[ "Creates a new file with 'clip_' prefix in the filename/title.", "Inherits tags from the source file.", "Adds a relationship to the source file (if hash is available).", "Output can be piped to add-file.", ] ) def _parse_time(time_str: str) -> float: """Convert time string (HH:MM:SS or MM:SS or SS) to seconds.""" parts = time_str.strip().split(':') if len(parts) == 3: return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2]) elif len(parts) == 2: return float(parts[0]) * 60 + float(parts[1]) elif len(parts) == 1: return float(parts[0]) else: raise ValueError(f"Invalid time format: {time_str}") def _trim_media(input_path: Path, output_path: Path, start_time: str, end_time: str) -> bool: """Trim media file using ffmpeg.""" ffmpeg_path = shutil.which('ffmpeg') if not ffmpeg_path: log("ffmpeg not found in PATH", file=sys.stderr) return False # Calculate duration to avoid seeking issues if possible, or just use -to # Using -ss before -i is faster (input seeking) but might be less accurate. # Using -ss after -i is slower (output seeking) but accurate. # For trimming, accuracy is usually preferred, but for long files input seeking is better. # We'll use input seeking (-ss before -i) and -to. cmd = [ ffmpeg_path, '-y', '-ss', start_time, '-i', str(input_path), '-to', end_time, '-c', 'copy', # Stream copy for speed and quality preservation '-map_metadata', '0', # Copy metadata str(output_path) ] # If stream copy fails (e.g. cutting not on keyframe), we might need re-encoding. # But let's try copy first as it's standard for "trimming" without quality loss. # Note: -to with input seeking (-ss before -i) resets timestamp, so -to refers to duration? # No, -to refers to position in output if used after -ss? # Actually, if -ss is before -i, the timestamps are reset to 0. # So -to should be (end - start). # Alternatively, use -t (duration). try: s = _parse_time(start_time) e = _parse_time(end_time) duration = e - s if duration <= 0: log(f"Invalid range: start {start_time} >= end {end_time}", file=sys.stderr) return False cmd = [ ffmpeg_path, '-y', '-ss', start_time, '-i', str(input_path), '-t', str(duration), '-c', 'copy', '-map_metadata', '0', str(output_path) ] debug(f"Running ffmpeg: {' '.join(cmd)}") result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: log(f"ffmpeg error: {result.stderr}", file=sys.stderr) return False return True except Exception as e: log(f"Error parsing time or running ffmpeg: {e}", file=sys.stderr) return False @register(["trim-file"]) def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: """Trim a media file.""" # Parse arguments parsed = parse_cmdlet_args(args, CMDLET) range_arg = parsed.get("range") if not range_arg or '-' not in range_arg: log("Error: -range argument required (format: start-end)", file=sys.stderr) return 1 start_str, end_str = range_arg.split('-', 1) delete_original = parsed.get("delete", False) path_arg = parsed.get("path") # Collect inputs inputs = normalize_result_input(result) # If path arg provided, add it to inputs if path_arg: inputs.append({"file_path": path_arg}) if not inputs: log("No input files provided.", file=sys.stderr) return 1 success_count = 0 for item in inputs: # Resolve file path file_path = None if isinstance(item, dict): file_path = item.get("file_path") or item.get("path") or item.get("target") elif hasattr(item, "file_path"): file_path = item.file_path elif isinstance(item, str): file_path = item if not file_path: continue path_obj = Path(file_path) if not path_obj.exists(): log(f"File not found: {file_path}", file=sys.stderr) continue # Determine output path # Prepend clip_ to filename new_filename = f"clip_{path_obj.name}" output_path = path_obj.parent / new_filename # Trim log(f"Trimming {path_obj.name} ({start_str} to {end_str})...", file=sys.stderr) if _trim_media(path_obj, output_path, start_str, end_str): log(f"Created clip: {output_path}", file=sys.stderr) success_count += 1 # Prepare result for pipeline # 1. Get source hash for relationship source_hash = None if isinstance(item, dict): source_hash = item.get("hash") or item.get("file_hash") elif hasattr(item, "file_hash"): source_hash = item.file_hash if not source_hash: try: source_hash = sha256_file(path_obj) except Exception: pass # 2. Get tags tags = extract_tags_from_result(item) # 3. Get title and modify it title = extract_title_from_result(item) if not title: title = path_obj.stem new_title = f"clip_{title}" # Update title tag if present new_tags = [] has_title_tag = False for t in tags: if t.lower().startswith("title:"): new_tags.append(f"title:{new_title}") has_title_tag = True else: new_tags.append(t) if not has_title_tag: new_tags.append(f"title:{new_title}") # 4. Calculate clip hash and update original file's relationships clip_hash = None try: clip_hash = sha256_file(output_path) except Exception: pass if source_hash and clip_hash: # Update original file in local DB if possible try: from config import get_local_storage_path from helper.local_library import LocalLibraryDB storage_path = get_local_storage_path(config) if storage_path: with LocalLibraryDB(storage_path) as db: # Get original file metadata # We need to find the original file by hash or path # Try path first orig_meta = db.get_metadata(path_obj) if not orig_meta and source_hash: # Try by hash orig_path_resolved = db.search_by_hash(source_hash) if orig_path_resolved: orig_meta = db.get_metadata(orig_path_resolved) if orig_meta: # Update relationships rels = orig_meta.get("relationships", {}) if not isinstance(rels, dict): rels = {} # Add clip as "derivative" (since original is the source) if "derivative" not in rels: rels["derivative"] = [] if clip_hash not in rels["derivative"]: rels["derivative"].append(clip_hash) # Save back to DB # We need to preserve other metadata orig_meta["relationships"] = rels # Ensure hash is set in metadata if we have it if source_hash and not orig_meta.get("hash"): orig_meta["hash"] = source_hash # We need the path to save save_path = Path(orig_meta.get("file_path") or path_obj) db.save_metadata(save_path, orig_meta) log(f"Updated relationship for original file: {save_path.name}", file=sys.stderr) except Exception as e: log(f"Failed to update original file relationships: {e}", file=sys.stderr) # 5. Construct result result_dict = { "file_path": str(output_path), "path": str(output_path), "title": new_title, "tags": new_tags, "media_kind": "video", # Assumption, or derive "hash": clip_hash, # Pass calculated hash "relationships": { # The source is the KING of this clip "king": [source_hash] if source_hash else [] } } # Emit result ctx.emit(result_dict) # Delete original if requested if delete_original: try: path_obj.unlink() log(f"Deleted original file: {path_obj}", file=sys.stderr) # Also try to delete sidecars? # Maybe leave that to user or cleanup cmdlet except Exception as e: log(f"Failed to delete original: {e}", file=sys.stderr) else: log(f"Failed to trim {path_obj.name}", file=sys.stderr) return 0 if success_count > 0 else 1