Files
Medios-Macina/cmdlets/pipe.py

650 lines
24 KiB
Python
Raw Normal View History

2025-11-25 20:09:33 -08:00
from typing import Any, Dict, Sequence, List, Optional
import sys
import json
import platform
import socket
import re
import subprocess
from ._shared import Cmdlet, CmdletArg, parse_cmdlet_args
2025-11-26 00:02:33 -08:00
from helper.logger import log, debug
2025-11-25 20:09:33 -08:00
from result_table import ResultTable
2025-11-27 10:59:01 -08:00
from helper.mpv_ipc import get_ipc_pipe_path, MPVIPCClient
2025-11-25 20:09:33 -08:00
import pipeline as ctx
2025-12-01 14:42:30 -08:00
from helper.download import is_url_supported_by_ytdlp
2025-11-25 20:09:33 -08:00
2025-11-27 10:59:01 -08:00
from helper.local_library import LocalLibrarySearchOptimizer
from config import get_local_storage_path
2025-12-01 14:42:30 -08:00
from hydrus_health_check import get_cookies_file_path
2025-11-27 10:59:01 -08:00
def _send_ipc_command(command: Dict[str, Any], silent: bool = False) -> Optional[Any]:
2025-11-25 20:09:33 -08:00
"""Send a command to the MPV IPC pipe and return the response."""
try:
2025-11-27 10:59:01 -08:00
ipc_pipe = get_ipc_pipe_path()
client = MPVIPCClient(socket_path=ipc_pipe)
if not client.connect():
return None # MPV not running
response = client.send_command(command)
client.disconnect()
return response
2025-11-25 20:09:33 -08:00
except Exception as e:
2025-11-27 10:59:01 -08:00
if not silent:
debug(f"IPC Error: {e}", file=sys.stderr)
2025-11-25 20:09:33 -08:00
return None
2025-11-27 10:59:01 -08:00
def _get_playlist(silent: bool = False) -> Optional[List[Dict[str, Any]]]:
"""Get the current playlist from MPV. Returns None if MPV is not running."""
2025-11-25 20:09:33 -08:00
cmd = {"command": ["get_property", "playlist"], "request_id": 100}
2025-11-27 10:59:01 -08:00
resp = _send_ipc_command(cmd, silent=silent)
if resp is None:
return None
if resp.get("error") == "success":
2025-11-25 20:09:33 -08:00
return resp.get("data", [])
return []
2025-11-27 10:59:01 -08:00
def _extract_title_from_item(item: Dict[str, Any]) -> str:
"""Extract a clean title from an MPV playlist item, handling memory:// M3U hacks."""
title = item.get("title")
filename = item.get("filename") or ""
# Special handling for memory:// M3U playlists (used to pass titles via IPC)
if "memory://" in filename and "#EXTINF:" in filename:
try:
# Extract title from #EXTINF:-1,Title
# Use regex to find title between #EXTINF:-1, and newline
match = re.search(r"#EXTINF:-1,(.*?)(?:\n|\r|$)", filename)
if match:
extracted_title = match.group(1).strip()
if not title or title == "memory://":
title = extracted_title
# If we still don't have a title, try to find the URL in the M3U content
if not title:
lines = filename.splitlines()
for line in lines:
line = line.strip()
if line and not line.startswith('#') and not line.startswith('memory://'):
# Found the URL, use it as title
return line
except Exception:
pass
return title or filename or "Unknown"
2025-12-01 14:42:30 -08:00
def _ensure_ytdl_cookies() -> None:
"""Ensure yt-dlp options are set correctly for this session."""
from pathlib import Path
cookies_path = get_cookies_file_path()
if cookies_path:
# Check if file exists and has content (use forward slashes for path checking)
check_path = cookies_path.replace('\\', '/')
file_obj = Path(cookies_path)
if file_obj.exists():
file_size = file_obj.stat().st_size
debug(f"Cookies file verified: {check_path} ({file_size} bytes)")
else:
debug(f"WARNING: Cookies file does not exist: {check_path}", file=sys.stderr)
else:
debug("No cookies file configured")
def _monitor_mpv_logs(duration: float = 3.0) -> None:
"""Monitor MPV logs for a short duration to capture errors."""
try:
client = MPVIPCClient()
if not client.connect():
debug("Failed to connect to MPV for log monitoring", file=sys.stderr)
return
# Request log messages
client.send_command({"command": ["request_log_messages", "warn"]})
import time
start_time = time.time()
while time.time() - start_time < duration:
# We need to read raw lines from the socket
if client.is_windows:
try:
line = client.sock.readline()
if line:
try:
msg = json.loads(line)
if msg.get("event") == "log-message":
text = msg.get("text", "").strip()
prefix = msg.get("prefix", "")
level = msg.get("level", "")
if "ytdl" in prefix or level == "error":
debug(f"[MPV {prefix}] {text}", file=sys.stderr)
except json.JSONDecodeError:
pass
except Exception:
break
else:
# Unix socket handling (simplified)
break
time.sleep(0.05)
client.disconnect()
except Exception:
pass
2025-11-30 11:39:04 -08:00
def _queue_items(items: List[Any], clear_first: bool = False) -> bool:
2025-11-27 10:59:01 -08:00
"""Queue items to MPV, starting it if necessary.
Args:
items: List of items to queue
clear_first: If True, the first item will replace the current playlist
2025-11-30 11:39:04 -08:00
Returns:
True if MPV was started, False if items were queued via IPC.
2025-11-27 10:59:01 -08:00
"""
2025-12-01 14:42:30 -08:00
# Just verify cookies are configured, don't try to set via IPC
_ensure_ytdl_cookies()
2025-11-27 10:59:01 -08:00
for i, item in enumerate(items):
# Extract URL/Path
target = None
title = None
if isinstance(item, dict):
target = item.get("target") or item.get("url") or item.get("path") or item.get("filename")
title = item.get("title") or item.get("name")
elif hasattr(item, "target"):
target = item.target
title = getattr(item, "title", None)
elif isinstance(item, str):
target = item
if target:
2025-12-01 14:42:30 -08:00
# Check if it's a yt-dlp supported URL
is_ytdlp = False
if target.startswith("http") and is_url_supported_by_ytdlp(target):
is_ytdlp = True
2025-11-27 10:59:01 -08:00
# Use memory:// M3U hack to pass title to MPV
2025-12-01 14:42:30 -08:00
# Skip for yt-dlp URLs to ensure proper handling
if title and not is_ytdlp:
2025-11-27 10:59:01 -08:00
# Sanitize title for M3U (remove newlines)
safe_title = title.replace('\n', ' ').replace('\r', '')
m3u_content = f"#EXTM3U\n#EXTINF:-1,{safe_title}\n{target}"
target_to_send = f"memory://{m3u_content}"
else:
target_to_send = target
mode = "append"
if clear_first and i == 0:
mode = "replace"
cmd = {"command": ["loadfile", target_to_send, mode], "request_id": 200}
resp = _send_ipc_command(cmd)
if resp is None:
# MPV not running (or died)
# Start MPV with remaining items
_start_mpv(items[i:])
2025-11-30 11:39:04 -08:00
return True
2025-11-27 10:59:01 -08:00
elif resp.get("error") == "success":
# Also set property for good measure
if title:
title_cmd = {"command": ["set_property", "force-media-title", title], "request_id": 201}
_send_ipc_command(title_cmd)
debug(f"Queued: {title or target}")
else:
error_msg = str(resp.get('error'))
debug(f"Failed to queue item: {error_msg}", file=sys.stderr)
2025-11-30 11:39:04 -08:00
return False
2025-11-27 10:59:01 -08:00
2025-11-25 20:09:33 -08:00
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Manage and play items in the MPV playlist via IPC."""
parsed = parse_cmdlet_args(args, CMDLET)
2025-11-30 11:39:04 -08:00
# Initialize mpv_started flag
mpv_started = False
2025-11-25 20:09:33 -08:00
# Handle positional index argument if provided
index_arg = parsed.get("index")
2025-11-30 11:39:04 -08:00
url_arg = parsed.get("url")
# If index_arg is provided but is not an integer, treat it as a URL
# This allows .pipe "http://..." without -url flag
if index_arg is not None:
try:
int(index_arg)
except ValueError:
# Not an integer, treat as URL if url_arg is not set
if not url_arg:
url_arg = index_arg
index_arg = None
2025-11-25 20:09:33 -08:00
clear_mode = parsed.get("clear")
list_mode = parsed.get("list")
2025-11-25 22:34:41 -08:00
play_mode = parsed.get("play")
pause_mode = parsed.get("pause")
2025-11-27 10:59:01 -08:00
save_mode = parsed.get("save")
load_mode = parsed.get("load")
2025-11-25 22:34:41 -08:00
2025-11-30 11:39:04 -08:00
# Handle URL queuing
mpv_started = False
if url_arg:
mpv_started = _queue_items([url_arg])
2025-12-01 14:42:30 -08:00
# Auto-play the URL when it's queued via .pipe "url" (without explicit flags)
2025-11-30 11:39:04 -08:00
# unless other flags are present
if not (clear_mode or play_mode or pause_mode or save_mode or load_mode):
2025-12-01 14:42:30 -08:00
if mpv_started:
# MPV was just started, wait a moment for it to be ready, then play first item
import time
time.sleep(0.5)
index_arg = "1" # 1-based index for first item
play_mode = True
else:
# MPV was already running, get playlist and play the newly added item
playlist = _get_playlist(silent=True)
if playlist and len(playlist) > 0:
# Auto-play the last item in the playlist (the one we just added)
# Use 1-based indexing
index_arg = str(len(playlist))
play_mode = True
else:
# Fallback: just list the playlist if we can't determine index
list_mode = True
2025-11-30 11:39:04 -08:00
2025-11-27 10:59:01 -08:00
# Handle Save Playlist
if save_mode:
playlist_name = index_arg or f"Playlist {subprocess.check_output(['date', '/t'], shell=True).decode().strip()}"
# If index_arg was used for name, clear it so it doesn't trigger index logic
if index_arg:
index_arg = None
items = _get_playlist()
if not items:
debug("Cannot save: MPV playlist is empty or MPV is not running.")
return 1
# Clean up items for saving (remove current flag, etc)
clean_items = []
for item in items:
# If title was extracted from memory://, we should probably save the original filename
# if it's a URL, or reconstruct a clean object.
# Actually, _extract_title_from_item handles the display title.
# But for playback, we need the 'filename' (which might be memory://...)
# If we save 'memory://...', it will work when loaded back.
clean_items.append(item)
# Use config from context or load it
config_data = config if config else {}
storage_path = get_local_storage_path(config_data)
if not storage_path:
debug("Local storage path not configured.")
return 1
with LocalLibrarySearchOptimizer(storage_path) as db:
if db.save_playlist(playlist_name, clean_items):
debug(f"Playlist saved as '{playlist_name}'")
return 0
else:
debug(f"Failed to save playlist '{playlist_name}'")
return 1
# Handle Load Playlist
current_playlist_name = None
if load_mode:
# Use config from context or load it
config_data = config if config else {}
storage_path = get_local_storage_path(config_data)
if not storage_path:
debug("Local storage path not configured.")
return 1
with LocalLibrarySearchOptimizer(storage_path) as db:
if index_arg:
try:
pl_id = int(index_arg)
2025-11-27 18:35:06 -08:00
# Handle Delete Playlist (if -clear is also passed)
if clear_mode:
if db.delete_playlist(pl_id):
debug(f"Playlist ID {pl_id} deleted.")
# Clear index_arg so we fall through to list mode and show updated list
index_arg = None
# Don't return, let it list the remaining playlists
else:
debug(f"Failed to delete playlist ID {pl_id}.")
return 1
2025-11-27 10:59:01 -08:00
else:
2025-11-27 18:35:06 -08:00
# Handle Load Playlist
result = db.get_playlist_by_id(pl_id)
if result is None:
debug(f"Playlist ID {pl_id} not found.")
return 1
name, items = result
current_playlist_name = name
# Queue items (replacing current playlist)
if items:
_queue_items(items, clear_first=True)
else:
# Empty playlist, just clear
_send_ipc_command({"command": ["playlist-clear"]}, silent=True)
# Switch to list mode to show the result
list_mode = True
index_arg = None
# Fall through to list logic
2025-11-27 10:59:01 -08:00
except ValueError:
debug(f"Invalid playlist ID: {index_arg}")
return 1
2025-11-27 18:35:06 -08:00
# If we deleted or didn't have an index, list playlists
if not index_arg:
2025-11-27 10:59:01 -08:00
playlists = db.get_playlists()
if not playlists:
debug("No saved playlists found.")
return 0
table = ResultTable("Saved Playlists")
for i, pl in enumerate(playlists):
item_count = len(pl.get('items', []))
row = table.add_row()
# row.add_column("ID", str(pl['id'])) # Hidden as per user request
row.add_column("Name", pl['name'])
row.add_column("Items", str(item_count))
row.add_column("Updated", pl['updated_at'])
# Set the playlist items as the result object for this row
# When user selects @N, they get the list of items
# We also set the source command to .pipe -load <ID> so it loads it
table.set_row_selection_args(i, ["-load", str(pl['id'])])
table.set_source_command(".pipe")
# Register results
ctx.set_last_result_table_overlay(table, [p['items'] for p in playlists])
ctx.set_current_stage_table(table)
print(table)
return 0
2025-12-01 14:42:30 -08:00
# Handle Play/Pause commands (but skip if we have index_arg to play a specific item)
if play_mode and index_arg is None:
2025-11-25 22:34:41 -08:00
cmd = {"command": ["set_property", "pause", False], "request_id": 103}
resp = _send_ipc_command(cmd)
if resp and resp.get("error") == "success":
2025-11-26 00:02:33 -08:00
debug("Resumed playback")
2025-11-25 22:34:41 -08:00
return 0
else:
2025-11-26 00:02:33 -08:00
debug("Failed to resume playback (MPV not running?)", file=sys.stderr)
2025-11-25 22:34:41 -08:00
return 1
if pause_mode:
cmd = {"command": ["set_property", "pause", True], "request_id": 104}
resp = _send_ipc_command(cmd)
if resp and resp.get("error") == "success":
2025-11-26 00:02:33 -08:00
debug("Paused playback")
2025-11-25 22:34:41 -08:00
return 0
else:
2025-11-26 00:02:33 -08:00
debug("Failed to pause playback (MPV not running?)", file=sys.stderr)
2025-11-25 22:34:41 -08:00
return 1
2025-11-26 00:29:10 -08:00
# Handle Clear All command (no index provided)
if clear_mode and index_arg is None:
cmd = {"command": ["playlist-clear"], "request_id": 105}
resp = _send_ipc_command(cmd)
if resp and resp.get("error") == "success":
debug("Playlist cleared")
return 0
else:
debug("Failed to clear playlist (MPV not running?)", file=sys.stderr)
return 1
2025-11-25 20:09:33 -08:00
# Handle piped input (add to playlist)
2025-11-27 18:35:06 -08:00
# Skip adding if -list is specified (user just wants to see current playlist)
2025-11-30 11:39:04 -08:00
if result and not list_mode and not url_arg:
2025-11-25 20:09:33 -08:00
# If result is a list of items, add them to playlist
items_to_add = []
if isinstance(result, list):
items_to_add = result
elif isinstance(result, dict):
items_to_add = [result]
2025-11-30 11:39:04 -08:00
if _queue_items(items_to_add):
mpv_started = True
2025-11-25 20:09:33 -08:00
2025-11-27 10:59:01 -08:00
if items_to_add:
2025-11-25 20:09:33 -08:00
# If we added items, we might want to play the first one if nothing is playing?
# For now, just list the playlist
pass
# Get playlist from MPV
items = _get_playlist()
2025-11-27 10:59:01 -08:00
if items is None:
2025-11-30 11:39:04 -08:00
if mpv_started:
2025-12-01 14:42:30 -08:00
# MPV was just started, retry getting playlist after a brief delay
import time
time.sleep(0.3)
items = _get_playlist(silent=True)
2025-11-30 11:39:04 -08:00
2025-12-01 14:42:30 -08:00
if items is None:
# Still can't connect, but MPV is starting
debug("MPV is starting up...")
return 0
else:
debug("MPV is not running. Starting new instance...")
_start_mpv([])
return 0
2025-11-27 10:59:01 -08:00
2025-11-25 20:09:33 -08:00
if not items:
2025-11-27 10:59:01 -08:00
debug("MPV playlist is empty.")
2025-11-25 20:09:33 -08:00
return 0
# If index is provided, perform action (Play or Clear)
if index_arg is not None:
try:
# Handle 1-based index
idx = int(index_arg) - 1
if idx < 0 or idx >= len(items):
2025-11-26 00:02:33 -08:00
debug(f"Index {index_arg} out of range (1-{len(items)}).")
2025-11-25 20:09:33 -08:00
return 1
item = items[idx]
2025-11-27 10:59:01 -08:00
title = _extract_title_from_item(item)
2025-11-25 20:09:33 -08:00
if clear_mode:
# Remove item
cmd = {"command": ["playlist-remove", idx], "request_id": 101}
resp = _send_ipc_command(cmd)
if resp and resp.get("error") == "success":
2025-11-26 00:02:33 -08:00
debug(f"Removed: {title}")
2025-11-25 20:09:33 -08:00
# Refresh items for listing
2025-11-27 10:59:01 -08:00
items = _get_playlist() or []
2025-11-25 20:09:33 -08:00
list_mode = True
index_arg = None
else:
2025-11-26 00:02:33 -08:00
debug(f"Failed to remove item: {resp.get('error') if resp else 'No response'}")
2025-11-25 20:09:33 -08:00
return 1
else:
# Play item
cmd = {"command": ["playlist-play-index", idx], "request_id": 102}
resp = _send_ipc_command(cmd)
if resp and resp.get("error") == "success":
2025-11-25 22:34:41 -08:00
# Ensure playback starts (unpause)
unpause_cmd = {"command": ["set_property", "pause", False], "request_id": 103}
_send_ipc_command(unpause_cmd)
2025-11-26 00:02:33 -08:00
debug(f"Playing: {title}")
2025-12-01 14:42:30 -08:00
# Monitor logs briefly for errors (e.g. ytdl failures)
_monitor_mpv_logs(3.0)
2025-11-25 20:09:33 -08:00
return 0
else:
2025-11-26 00:02:33 -08:00
debug(f"Failed to play item: {resp.get('error') if resp else 'No response'}")
2025-11-25 20:09:33 -08:00
return 1
except ValueError:
2025-11-26 00:02:33 -08:00
debug(f"Invalid index: {index_arg}")
2025-11-25 20:09:33 -08:00
return 1
# List items (Default action or after clear)
2025-11-30 11:39:04 -08:00
if list_mode or (index_arg is None and not url_arg):
2025-11-25 20:09:33 -08:00
if not items:
2025-11-26 00:02:33 -08:00
debug("MPV playlist is empty.")
2025-11-25 20:09:33 -08:00
return 0
2025-11-27 10:59:01 -08:00
# Use the loaded playlist name if available, otherwise default
# Note: current_playlist_name is defined in the load_mode block if a playlist was loaded
try:
table_title = current_playlist_name or "MPV Playlist"
except NameError:
table_title = "MPV Playlist"
table = ResultTable(table_title)
2025-11-25 20:09:33 -08:00
for i, item in enumerate(items):
is_current = item.get("current", False)
2025-11-27 10:59:01 -08:00
title = _extract_title_from_item(item)
2025-11-25 20:09:33 -08:00
# Truncate if too long
2025-11-27 10:59:01 -08:00
if len(title) > 80:
title = title[:77] + "..."
2025-11-25 20:09:33 -08:00
row = table.add_row()
row.add_column("Current", "*" if is_current else "")
row.add_column("Title", title)
table.set_row_selection_args(i, [str(i + 1)])
table.set_source_command(".pipe")
# Register results with pipeline context so @N selection works
ctx.set_last_result_table_overlay(table, items)
ctx.set_current_stage_table(table)
print(table)
return 0
def _start_mpv(items: List[Any]) -> None:
"""Start MPV with a list of items."""
2025-12-01 14:42:30 -08:00
import subprocess
import time as _time_module
# Kill any existing MPV processes to ensure clean start
try:
subprocess.run(['taskkill', '/IM', 'mpv.exe', '/F'],
stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL, timeout=2)
_time_module.sleep(0.5) # Wait for process to die
except Exception:
pass
2025-11-27 10:59:01 -08:00
ipc_pipe = get_ipc_pipe_path()
2025-11-25 20:09:33 -08:00
2025-12-01 14:42:30 -08:00
# Start MPV in idle mode with IPC server
2025-11-27 10:59:01 -08:00
cmd = ['mpv', f'--input-ipc-server={ipc_pipe}', '--idle', '--force-window']
2025-11-25 20:09:33 -08:00
cmd.append('--ytdl-format=bestvideo[height<=?1080]+bestaudio/best[height<=?1080]')
2025-12-01 14:42:30 -08:00
# Use cookies.txt if available, otherwise fallback to browser cookies
cookies_path = get_cookies_file_path()
if cookies_path:
# yt-dlp on Windows needs forward slashes OR properly escaped backslashes
# Using forward slashes is more reliable across systems
cookies_path_normalized = cookies_path.replace('\\', '/')
debug(f"Starting MPV with cookies file: {cookies_path_normalized}")
# yt-dlp expects the cookies option with file path
cmd.append(f'--ytdl-raw-options=cookies={cookies_path_normalized}')
else:
# Use cookies from browser (Chrome) to handle age-restricted content
debug("Starting MPV with browser cookies: chrome")
cmd.append('--ytdl-raw-options=cookies-from-browser=chrome')
2025-11-27 10:59:01 -08:00
try:
kwargs = {}
if platform.system() == 'Windows':
kwargs['creationflags'] = 0x00000008 # DETACHED_PROCESS
2025-12-01 14:42:30 -08:00
# Log the complete MPV command being executed
debug(f"DEBUG: Full MPV command: {' '.join(cmd)}")
subprocess.Popen(cmd, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
debug(f"Started MPV process")
# Wait for IPC pipe to be ready
import time
max_retries = 20
for i in range(max_retries):
time.sleep(0.2)
client = MPVIPCClient(socket_path=ipc_pipe)
if client.connect():
client.disconnect()
break
else:
debug("Timed out waiting for MPV IPC connection", file=sys.stderr)
return
# Queue items via IPC
if items:
_queue_items(items)
2025-11-27 10:59:01 -08:00
except Exception as e:
debug(f"Error starting MPV: {e}", file=sys.stderr)
2025-11-25 20:09:33 -08:00
2025-12-01 14:42:30 -08:00
2025-11-25 20:09:33 -08:00
CMDLET = Cmdlet(
name=".pipe",
aliases=["pipe", "playlist", "queue", "ls-pipe"],
summary="Manage and play items in the MPV playlist via IPC",
2025-11-30 11:39:04 -08:00
usage=".pipe [index|url] [-clear] [-url URL]",
2025-11-25 20:09:33 -08:00
args=[
CmdletArg(
name="index",
2025-11-30 11:39:04 -08:00
type="string", # Changed to string to allow URL detection
description="Index of item to play/clear, or URL to queue",
required=False
),
CmdletArg(
name="url",
type="string",
description="URL to queue",
2025-11-25 20:09:33 -08:00
required=False
),
CmdletArg(
name="clear",
type="flag",
2025-11-26 00:29:10 -08:00
description="Remove the selected item, or clear entire playlist if no index provided"
2025-11-25 20:09:33 -08:00
),
CmdletArg(
name="list",
type="flag",
description="List items (default)"
),
2025-11-25 22:34:41 -08:00
CmdletArg(
name="play",
type="flag",
description="Resume playback"
),
CmdletArg(
name="pause",
type="flag",
description="Pause playback"
),
2025-11-27 10:59:01 -08:00
CmdletArg(
name="save",
type="flag",
description="Save current playlist to database"
),
CmdletArg(
name="load",
type="flag",
description="List saved playlists"
),
2025-11-25 20:09:33 -08:00
],
exec=_run
)