Files
Medios-Macina/cmdlets/trim_file.py
2025-12-11 12:47:30 -08:00

295 lines
11 KiB
Python

"""Trim a media file using ffmpeg."""
from __future__ import annotations
from typing import Any, Dict, Sequence, List, Optional
from pathlib import Path
import sys
import json
import subprocess
import shutil
import re
from helper.logger import log, debug
from helper.utils import sha256_file
from . import register
from ._shared import (
Cmdlet,
CmdletArg,
parse_cmdlet_args,
normalize_result_input,
extract_tags_from_result,
extract_title_from_result
)
import pipeline as ctx
# Descriptor for the "trim-file" cmdlet. _run passes this object to
# parse_cmdlet_args() to interpret the command-line arguments; the remaining
# fields are descriptive text (summary, usage string, detail lines).
CMDLET = Cmdlet(
    name="trim-file",
    summary="Trim a media file using ffmpeg.",
    usage="trim-file [-path <path>] -range <start-end> [-delete]",
    arg=[
        # Explicit source file; _run appends it to whatever items were piped in.
        CmdletArg("-path", description="Path to the file (optional if piped)."),
        # _run splits this value on the first '-' into start and end timestamps.
        CmdletArg("-range", required=True, description="Time range to trim (e.g. '3:45-3:55' or '00:03:45-00:03:55')."),
        # When set, _run unlinks the source file after a successful trim.
        CmdletArg("-delete", type="flag", description="Delete the original file after trimming."),
    ],
    detail=[
        "Creates a new file with 'clip_' prefix in the filename/title.",
        "Inherits tags from the source file.",
        "Adds a relationship to the source file (if hash is available).",
        "Output can be piped to add-file.",
    ]
)
def _parse_time(time_str: str) -> float:
"""Convert time string (HH:MM:SS or MM:SS or SS) to seconds."""
parts = time_str.strip().split(':')
if len(parts) == 3:
return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2])
elif len(parts) == 2:
return float(parts[0]) * 60 + float(parts[1])
elif len(parts) == 1:
return float(parts[0])
else:
raise ValueError(f"Invalid time format: {time_str}")
def _trim_media(input_path: Path, output_path: Path, start_time: str, end_time: str) -> bool:
    """Cut the span between two timestamps out of a media file using ffmpeg.

    Streams are copied (``-c copy``) rather than re-encoded, so the cut is
    fast and lossless. Source metadata is carried over with
    ``-map_metadata 0``. An existing ``output_path`` is overwritten (``-y``).

    Args:
        input_path: Media file to read.
        output_path: File to write the clip to.
        start_time: Clip start, in any format accepted by ``_parse_time``.
        end_time: Clip end, in any format accepted by ``_parse_time``.

    Returns:
        True if ffmpeg exited with status 0. False if ffmpeg is not on PATH,
        the timestamps are invalid or do not describe a positive duration,
        or ffmpeg could not be run or reported an error. Failures are logged
        to stderr; nothing is raised for them.
    """
    ffmpeg_path = shutil.which('ffmpeg')
    if not ffmpeg_path:
        log("ffmpeg not found in PATH", file=sys.stderr)
        return False

    try:
        start_seconds = _parse_time(start_time)
        end_seconds = _parse_time(end_time)
    except (ValueError, TypeError, AttributeError) as exc:
        log(f"Error parsing time or running ffmpeg: {exc}", file=sys.stderr)
        return False

    duration = end_seconds - start_seconds
    # "not > 0" (rather than "<= 0") also rejects a NaN duration.
    if not duration > 0:
        log(f"Invalid range: start {start_time} >= end {end_time}", file=sys.stderr)
        return False

    # -ss before -i selects input seeking, which is fast on long files.
    # The end of the clip is given as a duration (-t) instead of a position
    # (-to), so it does not depend on how timestamps are rebased after the seek.
    # Both values are passed as the parsed seconds in fixed-point form: that
    # keeps -ss consistent with the computed duration, tolerates whitespace
    # around the user's input, and avoids exponent notation, which is not
    # part of ffmpeg's documented time syntax.
    cmd = [
        ffmpeg_path, '-y',
        '-ss', f"{start_seconds:.6f}",
        '-i', str(input_path),
        '-t', f"{duration:.6f}",
        '-c', 'copy',  # Stream copy for speed and quality preservation
        '-map_metadata', '0',  # Copy metadata
        str(output_path),
    ]
    debug(f"Running ffmpeg: {' '.join(cmd)}")

    try:
        # errors="replace" keeps undecodable bytes in ffmpeg's output from
        # raising and turning a successful trim into a reported failure.
        result = subprocess.run(cmd, capture_output=True, text=True, errors="replace")
    except (OSError, ValueError, subprocess.SubprocessError) as exc:
        log(f"Error parsing time or running ffmpeg: {exc}", file=sys.stderr)
        return False

    if result.returncode != 0:
        log(f"ffmpeg error: {result.stderr}", file=sys.stderr)
        return False
    return True
def _resolve_item_path(item: Any) -> Optional[Any]:
    """Return the file path carried by a pipeline item, or None if it has none."""
    if isinstance(item, dict):
        return item.get("path") or item.get("target")
    if hasattr(item, "path"):
        return item.path
    if isinstance(item, str):
        return item
    return None


def _item_hash(item: Any) -> Optional[Any]:
    """Return the ``hash`` stored on a dict or object item, or None if absent."""
    if isinstance(item, dict):
        return item.get("hash")
    return getattr(item, "hash", None)


def _hash_or_none(path: Path) -> Optional[str]:
    """Return ``sha256_file(path)``, or None (with a debug message) if it fails."""
    try:
        return sha256_file(path)
    except Exception as exc:  # best-effort: a missing hash only skips relationships
        debug(f"Could not hash {path}: {exc}")
        return None


def _retitle_tags(tags: Any, new_title: str) -> List[Any]:
    """Return a copy of ``tags`` carrying exactly one ``title:`` tag for ``new_title``.

    The first existing title tag is replaced in place, any further title tags
    are dropped, and a title tag is appended when there was none. ``None`` is
    treated as an empty tag list; non-string entries are kept untouched.
    """
    title_tag = f"title:{new_title}"
    retitled: List[Any] = []
    title_written = False
    for tag in tags or []:
        if isinstance(tag, str) and tag.lower().startswith("title:"):
            if not title_written:
                retitled.append(title_tag)
                title_written = True
            continue
        retitled.append(tag)
    if not title_written:
        retitled.append(title_tag)
    return retitled


def _record_derivative(config: Dict[str, Any], source_path: Path, source_hash: str, clip_hash: str) -> None:
    """Add ``clip_hash`` to the source file's "derivative" relationships in the local DB.

    The source record is looked up by path first and by hash second. Nothing
    happens when no local storage path is configured or no record is found.
    Any failure is logged to stderr and otherwise ignored, so a database
    problem never fails the trim itself.
    """
    try:
        # Imported lazily, inside the try, so a missing or broken storage
        # backend is reported as a failed update rather than an import error.
        from config import get_local_storage_path
        from helper.folder_store import FolderDB

        storage_path = get_local_storage_path(config)
        if not storage_path:
            return
        with FolderDB(storage_path) as db:
            orig_meta = db.get_metadata(source_path)
            if not orig_meta:
                resolved_path = db.search_hash(source_hash)
                if resolved_path:
                    orig_meta = db.get_metadata(resolved_path)
            if not orig_meta:
                return

            rels = orig_meta.get("relationships", {})
            if not isinstance(rels, dict):
                rels = {}
            # The clip is a "derivative" of the original (the original is its source).
            derivatives = rels.setdefault("derivative", [])
            if clip_hash not in derivatives:
                derivatives.append(clip_hash)
            orig_meta["relationships"] = rels

            # Make sure the stored record carries the source hash as well.
            if not orig_meta.get("hash"):
                orig_meta["hash"] = source_hash

            save_path = Path(orig_meta.get("path") or source_path)
            db.save_metadata(save_path, orig_meta)
            log(f"Updated relationship for original file: {save_path.name}", file=sys.stderr)
    except Exception as e:
        log(f"Failed to update original file relationships: {e}", file=sys.stderr)


@register(["trim-file"])
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Trim every input media file to the requested time range.

    Inputs are the items piped in through ``result`` plus the ``-path``
    argument, if given. For each input that exists on disk a clip named
    ``clip_<original name>`` is written next to it. A result dict (path,
    title, tags, hash, relationship to the source) is then emitted to the
    pipeline, and the source is deleted when ``-delete`` was passed.

    Args:
        result: Piped-in pipeline result(s), normalised by ``normalize_result_input``.
        args: Raw cmdlet arguments, parsed against ``CMDLET``.
        config: Application configuration, used to locate the local storage DB.

    Returns:
        0 if at least one file was trimmed, otherwise 1 (bad or missing
        ``-range``, no inputs, or every trim failed).
    """
    parsed = parse_cmdlet_args(args, CMDLET)

    range_arg = parsed.get("range")
    if not range_arg or '-' not in range_arg:
        log("Error: -range argument required (format: start-end)", file=sys.stderr)
        return 1
    start_str, end_str = (part.strip() for part in range_arg.split('-', 1))

    # Validate the range once, before touching any file, instead of letting
    # every input spawn its own failing trim attempt.
    try:
        range_is_valid = _parse_time(end_str) > _parse_time(start_str)
    except ValueError as exc:
        log(f"Error: invalid -range '{range_arg}': {exc}", file=sys.stderr)
        return 1
    if not range_is_valid:
        log(f"Invalid range: start {start_str} >= end {end_str}", file=sys.stderr)
        return 1

    delete_original = parsed.get("delete", False)
    path_arg = parsed.get("path")

    inputs = normalize_result_input(result)
    if path_arg:
        inputs.append({"path": path_arg})
    if not inputs:
        log("No input files provided.", file=sys.stderr)
        return 1

    success_count = 0
    for item in inputs:
        file_path = _resolve_item_path(item)
        if not file_path:
            continue

        path_obj = Path(file_path)
        if not path_obj.exists():
            log(f"File not found: {file_path}", file=sys.stderr)
            continue

        # The clip is written next to the source with a "clip_" prefix.
        output_path = path_obj.parent / f"clip_{path_obj.name}"

        log(f"Trimming {path_obj.name} ({start_str} to {end_str})...", file=sys.stderr)
        if not _trim_media(path_obj, output_path, start_str, end_str):
            log(f"Failed to trim {path_obj.name}", file=sys.stderr)
            continue
        log(f"Created clip: {output_path}", file=sys.stderr)
        success_count += 1

        # Prefer a hash already attached to the item; otherwise hash the file.
        source_hash = _item_hash(item) or _hash_or_none(path_obj)

        # The clip inherits the source's tags, with the title tag re-pointed
        # at the clip's own title.
        title = extract_title_from_result(item) or path_obj.stem
        new_title = f"clip_{title}"
        new_tags = _retitle_tags(extract_tags_from_result(item), new_title)

        clip_hash = _hash_or_none(output_path)
        if source_hash and clip_hash:
            _record_derivative(config, path_obj, source_hash, clip_hash)

        ctx.emit({
            "path": str(output_path),
            "title": new_title,
            "tags": new_tags,
            "media_kind": "video",  # Assumption; not derived from the file
            "hash": clip_hash,
            "relationships": {
                # The source is the "king" of this clip.
                "king": [source_hash] if source_hash else []
            },
        })

        if delete_original:
            # Only the media file itself is removed; sidecar files are left alone.
            try:
                path_obj.unlink()
                log(f"Deleted original file: {path_obj}", file=sys.stderr)
            except OSError as e:
                log(f"Failed to delete original: {e}", file=sys.stderr)

    return 0 if success_count > 0 else 1