Files
Medios-Macina/cmdnat/pipe.py
2026-02-04 16:59:04 -08:00

2237 lines
82 KiB
Python

from typing import Any, Dict, Sequence, List, Optional
import os
import sys
import json
import socket
import re
from datetime import datetime
from urllib.parse import urlparse, parse_qs
from pathlib import Path
from cmdlet._shared import Cmdlet, CmdletArg, parse_cmdlet_args, resolve_tidal_manifest_path
from SYS.logger import debug, get_thread_stream, is_debug_enabled, set_debug, set_thread_stream
from SYS.result_table import Table
from MPV.mpv_ipc import MPV
from SYS import pipeline as ctx
from SYS.models import PipeObject
from SYS.config import get_hydrus_access_key, get_hydrus_url
_ALLDEBRID_UNLOCK_CACHE: Dict[str,
str] = {}
def _repo_root() -> Path:
try:
return Path(__file__).resolve().parent.parent
except Exception:
return Path(os.getcwd())
def _playlist_store_path() -> Path:
return _repo_root() / "mpv_playlists.json"
def _load_playlist_store(path: Path) -> Dict[str, Any]:
if not path.exists():
return {"next_id": 1, "playlists": []}
try:
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
return {"next_id": 1, "playlists": []}
data.setdefault("next_id", 1)
data.setdefault("playlists", [])
if not isinstance(data["playlists"], list):
data["playlists"] = []
return data
except Exception:
return {"next_id": 1, "playlists": []}
def _save_playlist_store(path: Path, data: Dict[str, Any]) -> bool:
try:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2), encoding="utf-8")
return True
except Exception:
return False
def _save_playlist(name: str, items: List[Any]) -> bool:
path = _playlist_store_path()
data = _load_playlist_store(path)
playlists = data.get("playlists", [])
now = datetime.utcnow().isoformat(timespec="seconds") + "Z"
for pl in playlists:
if str(pl.get("name")).strip().lower() == str(name).strip().lower():
pl["items"] = list(items)
pl["updated_at"] = now
return _save_playlist_store(path, data)
new_id = int(data.get("next_id") or 1)
data["next_id"] = new_id + 1
playlists.append({
"id": new_id,
"name": name,
"items": list(items),
"updated_at": now,
})
data["playlists"] = playlists
return _save_playlist_store(path, data)
def _get_playlist_by_id(playlist_id: int) -> Optional[tuple[str, List[Any]]]:
data = _load_playlist_store(_playlist_store_path())
for pl in data.get("playlists", []):
try:
if int(pl.get("id")) == int(playlist_id):
return str(pl.get("name") or ""), list(pl.get("items") or [])
except Exception:
continue
return None
def _delete_playlist(playlist_id: int) -> bool:
path = _playlist_store_path()
data = _load_playlist_store(path)
playlists = data.get("playlists", [])
kept = []
removed = False
for pl in playlists:
try:
if int(pl.get("id")) == int(playlist_id):
removed = True
continue
except Exception:
pass
kept.append(pl)
data["playlists"] = kept
return _save_playlist_store(path, data) if removed else False
def _get_playlists() -> List[Dict[str, Any]]:
data = _load_playlist_store(_playlist_store_path())
playlists = data.get("playlists", [])
return [dict(pl) for pl in playlists if isinstance(pl, dict)]
def _repo_log_dir() -> Path:
d = _repo_root() / "Log"
try:
d.mkdir(parents=True, exist_ok=True)
except Exception:
pass
return d
def _helper_log_file() -> Path:
return _repo_log_dir() / "medeia-mpv-helper.log"
def _lua_log_file() -> Path:
return _repo_log_dir() / "medeia-mpv-lua.log"
def _try_enable_mpv_file_logging(mpv_log_path: str, *, attempts: int = 3) -> bool:
"""Best-effort enable mpv log-file + verbose level on a running instance.
Note: mpv may not honor changing log-file at runtime on all builds/platforms.
We still try; if it fails, callers can fall back to restart-on-demand.
"""
if not isinstance(mpv_log_path, str) or not mpv_log_path.strip():
return False
mpv_log_path = mpv_log_path.strip()
try:
Path(mpv_log_path).parent.mkdir(parents=True, exist_ok=True)
with open(mpv_log_path, "a", encoding="utf-8", errors="replace"):
pass
except Exception:
pass
ok = False
for _ in range(max(1, int(attempts))):
try:
# Try to set log-file and verbose level.
r1 = _send_ipc_command(
{
"command": ["set_property",
"options/log-file",
mpv_log_path]
}
)
r2 = _send_ipc_command(
{
"command": ["set_property",
"options/msg-level",
"all=v"]
}
)
ok = bool(
(r1 and r1.get("error") == "success")
or (r2 and r2.get("error") == "success")
)
# Emit a predictable line so the file isn't empty if logging is active.
_send_ipc_command(
{
"command": ["print-text",
f"medeia: log enabled -> {mpv_log_path}"]
},
silent=True
)
except Exception:
ok = False
# If mpv has opened the log file, it should have content shortly.
try:
p = Path(mpv_log_path)
if p.exists() and p.is_file() and p.stat().st_size > 0:
return True
except Exception:
pass
try:
import time
time.sleep(0.15)
except Exception:
break
return bool(ok)
def _get_alldebrid_api_key(config: Optional[Dict[str, Any]]) -> Optional[str]:
try:
if not isinstance(config, dict):
return None
provider_cfg = config.get("provider")
if not isinstance(provider_cfg, dict):
return None
ad_cfg = provider_cfg.get("alldebrid")
if not isinstance(ad_cfg, dict):
return None
key = ad_cfg.get("api_key")
if not isinstance(key, str):
return None
key = key.strip()
return key or None
except Exception:
return None
def _is_alldebrid_protected_url(url: str) -> bool:
try:
if not isinstance(url, str):
return False
u = url.strip()
if not u.startswith(("http://", "https://")):
return False
p = urlparse(u)
host = (p.netloc or "").lower()
path = p.path or ""
# AllDebrid file page links (require auth; not directly streamable by mpv)
return host == "alldebrid.com" and path.startswith("/f/")
except Exception:
return False
def _maybe_unlock_alldebrid_url(url: str, config: Optional[Dict[str, Any]]) -> str:
"""Convert AllDebrid protected file URLs into direct streamable links.
When AllDebrid returns `https://alldebrid.com/f/...`, that URL typically requires
authentication. MPV cannot access it without credentials. We transparently call
the AllDebrid API `link/unlock` (using the configured API key) to obtain a direct
URL that MPV can stream.
"""
if not _is_alldebrid_protected_url(url):
return url
cached = _ALLDEBRID_UNLOCK_CACHE.get(url)
if isinstance(cached, str) and cached:
return cached
api_key = _get_alldebrid_api_key(config)
if not api_key:
return url
try:
from API.alldebrid import AllDebridClient
client = AllDebridClient(api_key)
unlocked = client.unlock_link(url)
if isinstance(unlocked, str) and unlocked.strip():
unlocked = unlocked.strip()
_ALLDEBRID_UNLOCK_CACHE[url] = unlocked
return unlocked
except Exception as e:
debug(f"AllDebrid unlock failed for MPV target: {e}", file=sys.stderr)
return url
def _ensure_lyric_overlay(mpv: MPV) -> None:
try:
mpv.ensure_lyric_loader_running()
except Exception:
pass
def _send_ipc_command(command: Dict[str, Any], silent: bool = False, wait: bool = True) -> Optional[Any]:
"""Send a command to the MPV IPC pipe and return the response."""
try:
mpv = MPV()
return mpv.send(command, silent=silent, wait=wait)
except Exception as e:
if not silent:
debug(f"IPC Error: {e}", file=sys.stderr)
return None
def _extract_store_and_hash(item: Any) -> tuple[Optional[str], Optional[str]]:
store: Optional[str] = None
file_hash: Optional[str] = None
try:
if isinstance(item, dict):
store = item.get("store")
file_hash = item.get("hash") or item.get("file_hash")
else:
store = getattr(item, "store", None)
file_hash = getattr(item, "hash", None) or getattr(item, "file_hash", None)
except Exception:
store = None
file_hash = None
try:
store = str(store).strip() if store else None
except Exception:
store = None
try:
file_hash = str(file_hash).strip().lower() if file_hash else None
except Exception:
file_hash = None
if not file_hash:
try:
text = None
if isinstance(item, dict):
text = item.get("path") or item.get("url") or item.get("filename")
else:
text = getattr(item, "path", None) or getattr(item, "url", None)
if text:
m = re.search(r"[0-9a-f]{64}", str(text).lower())
if m:
file_hash = m.group(0)
except Exception:
pass
return store, file_hash
def _set_mpv_item_context(store: Optional[str], file_hash: Optional[str]) -> None:
# Properties consumed by MPV.lyric
try:
_send_ipc_command(
{
"command": ["set_property", "user-data/medeia-item-store", store or ""],
"request_id": 901,
},
silent=True,
)
_send_ipc_command(
{
"command": ["set_property", "user-data/medeia-item-hash", file_hash or ""],
"request_id": 902,
},
silent=True,
)
except Exception:
pass
def _get_playlist(silent: bool = False) -> Optional[List[Dict[str, Any]]]:
"""Get the current playlist from MPV. Returns None if MPV is not running."""
cmd = {
"command": ["get_property",
"playlist"],
"request_id": 100
}
resp = _send_ipc_command(cmd, silent=silent)
if resp is None:
return None
if resp.get("error") == "success":
return resp.get("data", [])
return []
def _extract_title_from_item(item: Dict[str, Any]) -> str:
"""Extract a clean title from an MPV playlist item, handling memory:// M3U hacks."""
title = item.get("title")
filename = item.get("filename") or ""
# Special handling for memory:// M3U playlists (used to pass titles via IPC)
if "memory://" in filename and "#EXTINF:" in filename:
try:
# Extract title from #EXTINF:-1,Title
# Use regex to find title between #EXTINF:-1, and newline
match = re.search(r"#EXTINF:-1,(.*?)(?:\n|\r|$)", filename)
if match:
extracted_title = match.group(1).strip()
if not title or title == "memory://":
title = extracted_title
# If we still don't have a title, try to find the URL in the M3U content
if not title:
lines = filename.splitlines()
for line in lines:
line = line.strip()
if line and not line.startswith("#") and not line.startswith(
"memory://"):
# Found the URL, use it as title
return line
except Exception:
pass
return title or filename or "Unknown"
def _extract_target_from_memory_uri(text: str) -> Optional[str]:
"""Extract the real target URL/path from a memory:// M3U payload."""
if not isinstance(text, str) or not text.startswith("memory://"):
return None
for line in text.splitlines():
line = line.strip()
if not line or line.startswith("#") or line.startswith("memory://"):
continue
return line
return None
def _find_hydrus_instance_for_hash(hash_str: str, file_storage: Any) -> Optional[str]:
"""Find which Hydrus instance serves a specific file hash.
Args:
hash_str: SHA256 hash (64 hex chars)
file_storage: FileStorage instance with Hydrus backends
Returns:
Instance name (e.g., 'home') or None if not found
"""
# Query each Hydrus backend to see if it has this file
for backend_name in file_storage.list_backends():
backend = file_storage[backend_name]
# Check if this is a Hydrus backend by checking class name
backend_class = type(backend).__name__
if backend_class != "HydrusNetwork":
continue
try:
# Query metadata to see if this instance has the file
metadata = backend.get_metadata(hash_str)
if metadata:
return backend_name
except Exception:
# This instance doesn't have the file or had an error
continue
return None
def _find_hydrus_instance_by_url(url: str, file_storage: Any) -> Optional[str]:
"""Find which Hydrus instance matches a given URL.
Args:
url: Full URL (e.g., http://localhost:45869/get_files/file?hash=...)
file_storage: FileStorage instance with Hydrus backends
Returns:
Instance name (e.g., 'home') or None if not found
"""
from urllib.parse import urlparse
parsed_target = urlparse(url)
target_netloc = parsed_target.netloc.lower()
# Check each Hydrus backend's URL
for backend_name in file_storage.list_backends():
backend = file_storage[backend_name]
backend_class = type(backend).__name__
if backend_class != "HydrusNetwork":
continue
# Get the backend's base URL from its client
try:
backend_url = backend._client.base_url
parsed_backend = urlparse(backend_url)
backend_netloc = parsed_backend.netloc.lower()
# Match by netloc (host:port)
if target_netloc == backend_netloc:
return backend_name
except Exception:
continue
return None
def _normalize_playlist_path(text: Optional[str]) -> Optional[str]:
"""Normalize playlist entry paths for dedupe comparisons."""
if not text:
return None
real = _extract_target_from_memory_uri(text) or text
real = real.strip()
if not real:
return None
# If it's already a bare hydrus hash, use it directly
lower_real = real.lower()
if re.fullmatch(r"[0-9a-f]{64}", lower_real):
return lower_real
# If it's a hydrus file URL, normalize to the hash for dedupe
try:
parsed = urlparse(real)
if parsed.scheme in {"http",
"https",
"hydrus"}:
if parsed.path.endswith("/get_files/file"):
qs = parse_qs(parsed.query)
h = qs.get("hash", [None])[0]
if h and re.fullmatch(r"[0-9a-f]{64}", h.lower()):
return h.lower()
except Exception:
pass
# Normalize slashes for Windows paths and lowercase for comparison
real = real.replace("\\", "/")
return real.lower()
def _infer_store_from_playlist_item(
item: Dict[str,
Any],
file_storage: Optional[Any] = None
) -> str:
"""Infer a friendly store label from an MPV playlist entry.
Args:
item: MPV playlist item dict
file_storage: Optional FileStorage instance for querying specific backend instances
Returns:
Store label (e.g., 'home', 'work', 'local', 'youtube', etc.)
"""
name = item.get("filename") if isinstance(item, dict) else None
target = str(name or "")
# Unwrap memory:// M3U wrapper
memory_target = _extract_target_from_memory_uri(target)
if memory_target:
target = memory_target
# Hydrus hashes: bare 64-hex entries
if re.fullmatch(r"[0-9a-f]{64}", target.lower()):
# If we have file_storage, query each Hydrus instance to find which one has this hash
if file_storage:
hash_str = target.lower()
hydrus_instance = _find_hydrus_instance_for_hash(hash_str, file_storage)
if hydrus_instance:
return hydrus_instance
return "hydrus"
lower = target.lower()
if lower.startswith("magnet:"):
return "magnet"
if lower.startswith("hydrus://"):
# Extract hash from hydrus:// URL if possible
if file_storage:
hash_match = re.search(r"[0-9a-f]{64}", target.lower())
if hash_match:
hash_str = hash_match.group(0)
hydrus_instance = _find_hydrus_instance_for_hash(hash_str, file_storage)
if hydrus_instance:
return hydrus_instance
return "hydrus"
# Windows / UNC paths
if re.match(r"^[a-z]:[\\/]",
target,
flags=re.IGNORECASE) or target.startswith("\\\\"):
return "local"
# file:// url
if lower.startswith("file://"):
return "local"
parsed = urlparse(target)
host = (parsed.netloc or "").lower()
path = parsed.path or ""
if not host:
return ""
host_no_port = host.split(":", 1)[0]
host_stripped = host_no_port[4:] if host_no_port.startswith(
"www."
) else host_no_port
if "youtube" in host_stripped or "youtu.be" in target.lower():
return "youtube"
if "soundcloud" in host_stripped:
return "soundcloud"
if "bandcamp" in host_stripped:
return "bandcamp"
if "get_files" in path or "file?hash=" in path or host_stripped in {"127.0.0.1",
"localhost"}:
# Hydrus API URL - try to extract hash and find instance
if file_storage:
# Try to extract hash from URL parameters
hash_match = re.search(r"hash=([0-9a-f]{64})", target.lower())
if hash_match:
hash_str = hash_match.group(1)
hydrus_instance = _find_hydrus_instance_for_hash(hash_str, file_storage)
if hydrus_instance:
return hydrus_instance
# If no hash in URL, try matching the base URL to configured instances
hydrus_instance = _find_hydrus_instance_by_url(target, file_storage)
if hydrus_instance:
return hydrus_instance
return "hydrus"
if re.match(r"^\d+\.\d+\.\d+\.\d+$", host_stripped) and "get_files" in path:
# IP-based Hydrus URL
if file_storage:
hash_match = re.search(r"hash=([0-9a-f]{64})", target.lower())
if hash_match:
hash_str = hash_match.group(1)
hydrus_instance = _find_hydrus_instance_for_hash(hash_str, file_storage)
if hydrus_instance:
return hydrus_instance
hydrus_instance = _find_hydrus_instance_by_url(target, file_storage)
if hydrus_instance:
return hydrus_instance
return "hydrus"
parts = host_stripped.split(".")
if len(parts) >= 2:
return parts[-2] or host_stripped
return host_stripped
def _build_hydrus_header(config: Dict[str, Any]) -> Optional[str]:
"""Return header string for Hydrus auth if configured."""
try:
key = get_hydrus_access_key(config)
except Exception:
key = None
if not key:
return None
return f"Hydrus-Client-API-Access-Key: {key}"
def _build_ytdl_options(config: Optional[Dict[str,
Any]],
hydrus_header: Optional[str]) -> Optional[str]:
"""Compose ytdl-raw-options string including cookies and optional Hydrus header."""
opts: List[str] = []
cookies_path = None
try:
from tool.ytdlp import YtDlpTool
cookiefile = YtDlpTool(config or {}).resolve_cookiefile()
if cookiefile is not None:
cookies_path = str(cookiefile)
except Exception:
cookies_path = None
if cookies_path:
opts.append(f"cookies={cookies_path.replace('\\', '/')}")
else:
opts.append("cookies-from-browser=chrome")
if hydrus_header:
opts.append(f"add-header={hydrus_header}")
return ",".join(opts) if opts else None
def _is_hydrus_path(path: str, hydrus_url: Optional[str]) -> bool:
if not path:
return False
lower = path.lower()
if "hydrus://" in lower:
return True
parsed = urlparse(path)
host = (parsed.netloc or "").lower()
path_part = parsed.path or ""
if hydrus_url:
try:
hydrus_host = urlparse(hydrus_url).netloc.lower()
if hydrus_host and hydrus_host in host:
return True
except Exception:
pass
if "get_files" in path_part or "file?hash=" in path_part:
return True
if re.match(r"^\d+\.\d+\.\d+\.\d+$", host) and "get_files" in path_part:
return True
return False
def _ensure_ytdl_cookies(config: Optional[Dict[str, Any]] = None) -> None:
"""Ensure yt-dlp options are set correctly for this session."""
from pathlib import Path
cookies_path = None
try:
from tool.ytdlp import YtDlpTool
cookiefile = YtDlpTool(config or {}).resolve_cookiefile()
if cookiefile is not None:
cookies_path = str(cookiefile)
except Exception:
cookies_path = None
if cookies_path:
# Check if file exists and has content (use forward slashes for path checking)
check_path = cookies_path.replace("\\", "/")
file_obj = Path(cookies_path)
if file_obj.exists():
file_size = file_obj.stat().st_size
debug(f"Cookies file verified: {check_path} ({file_size} bytes)")
else:
debug(
f"WARNING: Cookies file does not exist: {check_path}",
file=sys.stderr
)
else:
debug("No cookies file configured")
def _monitor_mpv_logs(duration: float = 3.0) -> None:
"""Monitor MPV logs for a short duration to capture errors."""
try:
mpv = MPV()
client = mpv.client()
if not client.connect():
debug("Failed to connect to MPV for log monitoring", file=sys.stderr)
return
# Request log messages
client.send_command({
"command": ["request_log_messages",
"warn"]
})
# On Windows named pipes, avoid blocking the CLI; skip log read entirely
if client.is_windows:
client.disconnect()
return
import time
start_time = time.time()
# Unix sockets already have timeouts set; read until duration expires
sock_obj = client.sock
if not isinstance(sock_obj, socket.socket):
client.disconnect()
return
while time.time() - start_time < duration:
try:
chunk = sock_obj.recv(4096)
except socket.timeout:
continue
except Exception:
break
if not chunk:
break
for line in chunk.decode("utf-8", errors="ignore").splitlines():
try:
msg = json.loads(line)
if msg.get("event") == "log-message":
text = msg.get("text", "").strip()
prefix = msg.get("prefix", "")
level = msg.get("level", "")
if "ytdl" in prefix or level == "error":
debug(f"[MPV {prefix}] {text}", file=sys.stderr)
except json.JSONDecodeError:
continue
client.disconnect()
except Exception:
pass
def _tail_text_file(path: str,
*,
max_lines: int = 120,
max_bytes: int = 65536) -> List[str]:
try:
p = Path(str(path))
if not p.exists() or not p.is_file():
return []
except Exception:
return []
try:
with open(p, "rb") as f:
try:
f.seek(0, os.SEEK_END)
end = f.tell()
start = max(0, end - int(max_bytes))
f.seek(start, os.SEEK_SET)
except Exception:
pass
data = f.read()
text = data.decode("utf-8", errors="replace")
lines = text.splitlines()
if len(lines) > max_lines:
return lines[-max_lines:]
return lines
except Exception:
return []
def _get_playable_path(
item: Any,
file_storage: Optional[Any],
config: Optional[Dict[str,
Any]]
) -> Optional[tuple[str,
Optional[str]]]:
"""Extract a playable path/URL from an item, handling different store types.
Args:
item: Item to extract path from (dict, PipeObject, or string)
file_storage: FileStorage instance for querying backends
config: Config dict for Hydrus URL
Returns:
Tuple of (path, title) or None if no valid path found
"""
path: Optional[str] = None
title: Optional[str] = None
store: Optional[str] = None
file_hash: Optional[str] = None
# Extract fields from item - prefer a disk path ('path'), but accept 'url' as fallback for providers
if isinstance(item, dict):
path = item.get("path")
# Fallbacks for provider-style entries where URL is stored in 'url' or 'source_url' or 'target'
if not path:
path = item.get("url") or item.get("source_url") or item.get("target")
if not path:
known = item.get("url") or item.get("url") or []
if known and isinstance(known, list):
path = known[0]
title = item.get("title") or item.get("file_title")
store = item.get("store")
file_hash = item.get("hash")
elif (hasattr(item,
"path") or hasattr(item,
"url") or hasattr(item,
"source_url") or hasattr(item,
"store")
or hasattr(item,
"hash")):
# Handle PipeObject / dataclass objects - prefer path, but fall back to url/source_url attributes
path = getattr(item, "path", None)
if not path:
path = (
getattr(item,
"url",
None) or getattr(item,
"source_url",
None) or getattr(item,
"target",
None)
)
if not path:
known = getattr(item,
"url",
None) or (getattr(item,
"extra",
None) or {}).get("url")
if known and isinstance(known, list):
path = known[0]
title = getattr(item, "title", None) or getattr(item, "file_title", None)
store = getattr(item, "store", None)
file_hash = getattr(item, "hash", None)
elif isinstance(item, str):
path = item
# Debug: show incoming values
try:
debug(f"_get_playable_path: store={store}, path={path}, hash={file_hash}")
except Exception:
pass
# Treat common placeholders as missing.
if isinstance(path,
str) and path.strip().lower() in {"",
"n/a",
"na",
"none"}:
path = None
manifest_path = resolve_tidal_manifest_path(item)
if manifest_path:
path = manifest_path
else:
# If this is a tidal:// placeholder and we couldn't resolve a manifest, do not fall back.
try:
if isinstance(path, str) and path.strip().lower().startswith("tidal:"):
try:
meta = None
if isinstance(item, dict):
meta = item.get("full_metadata") or item.get("metadata")
else:
meta = getattr(item, "full_metadata", None) or getattr(item, "metadata", None)
if isinstance(meta, dict) and meta.get("_tidal_manifest_error"):
print(str(meta.get("_tidal_manifest_error")), file=sys.stderr)
except Exception:
pass
print("Tidal selection has no playable DASH MPD manifest.", file=sys.stderr)
return None
except Exception:
pass
if title is not None and not isinstance(title, str):
title = str(title)
if isinstance(file_hash, str):
file_hash = file_hash.strip().lower()
# Resolve hash+store into a playable target (file path or URL).
# This is unrelated to MPV's IPC pipe and keeps "pipe" terminology reserved for:
# - MPV IPC pipe (transport)
# - PipeObject (pipeline data)
backend_target_resolved = False
if store and file_hash and file_hash != "unknown" and file_storage:
try:
backend = file_storage[store]
except Exception:
backend = None
if backend is not None:
backend_class = type(backend).__name__
backend_target_resolved = True
# HydrusNetwork: build a playable API file URL without browser side-effects.
if backend_class == "HydrusNetwork":
try:
client = getattr(backend, "_client", None)
base_url = getattr(client, "url", None)
if base_url:
base_url = str(base_url).rstrip("/")
# Auth is provided via http-header-fields (set in _queue_items).
path = f"{base_url}/get_files/file?hash={file_hash}"
except Exception as e:
debug(
f"Error building Hydrus URL from store '{store}': {e}",
file=sys.stderr
)
else:
backend_target_resolved = False
if isinstance(path, str) and path.startswith(("http://", "https://")) and not backend_target_resolved:
return (path, title)
if not path:
# As a last resort, if we have a hash and no path/url, return the hash.
# _queue_items will convert it to a Hydrus file URL when possible.
if store and file_hash and file_hash != "unknown":
return (str(file_hash), title)
return None
if not isinstance(path, str):
path = str(path)
return (path, title)
def _queue_items(
items: List[Any],
clear_first: bool = False,
config: Optional[Dict[str,
Any]] = None,
start_opts: Optional[Dict[str,
Any]] = None,
wait: bool = True,
) -> bool:
"""Queue items to MPV, starting it if necessary.
Args:
items: List of items to queue
clear_first: If True, the first item will replace the current playlist
wait: If True, wait for MPV to acknowledge the loadfile command.
Returns:
True if MPV was started, False if items were queued via IPC.
"""
# Debug: print incoming items
try:
debug(
f"_queue_items: count={len(items)} types={[type(i).__name__ for i in items]}"
)
except Exception:
pass
# Just verify cookies are configured, don't try to set via IPC
_ensure_ytdl_cookies(config)
hydrus_header = _build_hydrus_header(config or {})
ytdl_opts = _build_ytdl_options(config, hydrus_header)
hydrus_url = None
try:
hydrus_url = get_hydrus_url(config) if config is not None else None
except Exception:
hydrus_url = None
# Initialize Store registry for path resolution
file_storage = None
try:
from Store import Store
file_storage = Store(config or {})
except Exception as e:
debug(f"Warning: Could not initialize Store registry: {e}", file=sys.stderr)
# Dedupe existing playlist before adding more (unless we're replacing it)
existing_targets: set[str] = set()
if not clear_first:
playlist = _get_playlist(silent=True) or []
dup_indexes: List[int] = []
for idx, pl_item in enumerate(playlist):
fname = pl_item.get("filename") if isinstance(pl_item,
dict) else str(pl_item)
alt = pl_item.get("playlist-path") if isinstance(pl_item, dict) else None
norm = _normalize_playlist_path(fname) or _normalize_playlist_path(alt)
if not norm:
continue
if norm in existing_targets:
dup_indexes.append(idx)
else:
existing_targets.add(norm)
# Remove duplicates from playlist starting from the end to keep indices valid
for idx in reversed(dup_indexes):
try:
_send_ipc_command(
{
"command": ["playlist-remove",
idx],
"request_id": 106
},
silent=True
)
except Exception:
pass
new_targets: set[str] = set()
for i, item in enumerate(items):
# Debug: show the item being processed
try:
debug(
f"_queue_items: processing idx={i} type={type(item)} repr={repr(item)[:200]}"
)
except Exception:
pass
# Extract URL/Path using store-aware logic
result = _get_playable_path(item, file_storage, config)
if not result:
debug(f"_queue_items: item idx={i} produced no playable path")
continue
target, title = result
# MPD/DASH playback requires ffmpeg protocol whitelist (file + https + crypto etc).
# Set it via IPC before loadfile so the currently running MPV can play the manifest.
try:
target_str = str(target or "")
if re.search(r"\.mpd($|\?)", target_str.lower()):
_send_ipc_command(
{
"command": [
"set_property",
"options/demuxer-lavf-o",
"protocol_whitelist=file,https,tcp,tls,crypto,data",
],
"request_id": 198,
},
silent=True,
)
except Exception:
pass
# If the target is an AllDebrid protected file URL, unlock it to a direct link for MPV.
try:
if isinstance(target, str):
target = _maybe_unlock_alldebrid_url(target, config)
except Exception:
pass
# Prefer per-item Hydrus instance credentials when the item belongs to a Hydrus store.
effective_hydrus_url = hydrus_url
effective_hydrus_header = hydrus_header
effective_ytdl_opts = ytdl_opts
item_store_name: Optional[str] = None
try:
item_store = None
if isinstance(item, dict):
item_store = item.get("store")
else:
item_store = getattr(item, "store", None)
if item_store:
item_store_name = str(item_store).strip() or None
if item_store and file_storage:
try:
backend = file_storage[str(item_store)]
except Exception:
backend = None
if backend is not None and type(backend).__name__ == "HydrusNetwork":
client = getattr(backend, "_client", None)
base_url = getattr(client, "url", None)
key = getattr(client, "access_key", None)
if base_url:
effective_hydrus_url = str(base_url).rstrip("/")
if key:
effective_hydrus_header = (
f"Hydrus-Client-API-Access-Key: {str(key).strip()}"
)
effective_ytdl_opts = _build_ytdl_options(
config,
effective_hydrus_header
)
except Exception:
pass
if target:
# If we just have a hydrus hash, build a direct file URL for MPV
if re.fullmatch(r"[0-9a-f]{64}",
str(target).strip().lower()) and effective_hydrus_url:
target = (
f"{effective_hydrus_url.rstrip('/')}/get_files/file?hash={str(target).strip()}"
)
norm_key = _normalize_playlist_path(target) or str(target).strip().lower()
if norm_key in existing_targets or norm_key in new_targets:
debug(f"Skipping duplicate playlist entry: {title or target}")
continue
new_targets.add(norm_key)
# Use memory:// M3U hack to pass title to MPV.
# This is especially important for remote URLs (e.g., YouTube) where MPV may otherwise
# show the raw URL as the playlist title.
if title:
# Sanitize title for M3U (remove newlines)
safe_title = title.replace("\n", " ").replace("\r", "")
# Carry the store name for hash URLs so MPV.lyric can resolve the backend.
# This is especially important for local file-server URLs like /get_files/file?hash=...
target_for_m3u = target
try:
if (item_store_name and isinstance(target_for_m3u,
str)
and target_for_m3u.startswith("http")):
if "get_files/file" in target_for_m3u and "store=" not in target_for_m3u:
sep = "&" if "?" in target_for_m3u else "?"
target_for_m3u = f"{target_for_m3u}{sep}store={item_store_name}"
except Exception:
target_for_m3u = target
m3u_content = f"#EXTM3U\n#EXTINF:-1,{safe_title}\n{target_for_m3u}"
target_to_send = f"memory://{m3u_content}"
else:
target_to_send = target
mode = "append"
if clear_first and i == 0:
mode = "replace"
# If we're replacing, this will start playing immediately: set store/hash context
# so MPV.lyric can resolve the correct backend for notes.
if mode == "replace":
try:
s, h = _extract_store_and_hash(item)
_set_mpv_item_context(s, h)
except Exception:
pass
# If this is a Hydrus path, set header property and yt-dlp headers before loading.
# Use the real target (not the memory:// wrapper) for detection.
if effective_hydrus_header and _is_hydrus_path(str(target),
effective_hydrus_url):
header_cmd = {
"command":
["set_property",
"http-header-fields",
effective_hydrus_header],
"request_id":
199,
}
_send_ipc_command(header_cmd, silent=True)
if effective_ytdl_opts:
ytdl_cmd = {
"command":
["set_property",
"ytdl-raw-options",
effective_ytdl_opts],
"request_id": 197,
}
_send_ipc_command(ytdl_cmd, silent=True)
# For memory:// M3U payloads (used to carry titles), use loadlist so mpv parses
# the content as a playlist and does not expose #EXTINF lines as entries.
command_name = "loadfile"
try:
if isinstance(target_to_send, str) and target_to_send.startswith("memory://") and "#EXTM3U" in target_to_send:
command_name = "loadlist"
except Exception:
pass
cmd = {
"command": [command_name,
target_to_send,
mode],
"request_id": 200
}
try:
debug(f"Sending MPV {command_name}: {target_to_send} mode={mode} wait={wait}")
resp = _send_ipc_command(cmd, silent=True, wait=wait)
debug(f"MPV {command_name} response: {resp}")
except Exception as e:
debug(f"Exception sending {command_name} to MPV: {e}", file=sys.stderr)
resp = None
if resp is None:
# MPV not running (or died)
# Start MPV with remaining items
debug(
f"MPV not running/died while queuing, starting MPV with remaining items: {items[i:]}"
)
_start_mpv(items[i:], config=config, start_opts=start_opts)
return True
elif resp.get("error") == "success":
# Do not set `force-media-title` when queueing items. It's a global property and
# would change the MPV window title even if the item isn't currently playing.
debug(f"Queued: {title or target}")
else:
error_msg = str(resp.get("error"))
debug(f"Failed to queue item: {error_msg}", file=sys.stderr)
return False
def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
"""Manage and play items in the MPV playlist via IPC."""
parsed = parse_cmdlet_args(args, CMDLET)
log_requested = bool(parsed.get("log"))
borderless = bool(parsed.get("borderless"))
prev_debug = is_debug_enabled()
prev_stream = get_thread_stream()
devnull_fh = None
mpv_log_path: Optional[str] = None
try:
# Default: keep `.pipe` quiet even if debug is enabled.
# With -log: enable debug and route it to stdout (pipeable), plus enable mpv log-file.
if log_requested:
set_debug(True)
set_thread_stream(sys.stdout)
try:
log_dir = _repo_log_dir()
mpv_log_path = str((log_dir / "medeia-mpv.log").resolve())
except Exception:
mpv_log_path = str(
(
Path(os.environ.get("TEMP") or os.environ.get("TMP") or ".") /
"medeia-mpv.log"
).resolve()
)
# Ensure file exists early so we can tail it even if mpv writes later.
try:
Path(mpv_log_path).parent.mkdir(parents=True, exist_ok=True)
with open(mpv_log_path, "a", encoding="utf-8", errors="replace"):
pass
except Exception:
pass
debug(f"MPV log file: {mpv_log_path}")
# Try to enable mpv file logging on the currently running instance.
# (If mpv wasn't started with --log-file, this may not work everywhere.)
try:
_try_enable_mpv_file_logging(mpv_log_path, attempts=3)
except Exception:
pass
# If mpv is already running, set log options live via IPC.
try:
mpv_live = MPV()
if mpv_live.is_running():
mpv_live.set_property("options/log-file", mpv_log_path)
mpv_live.set_property("options/msg-level", "all=v")
except Exception:
pass
else:
if prev_debug:
try:
devnull_fh = open(
os.devnull,
"w",
encoding="utf-8",
errors="replace"
)
set_thread_stream(devnull_fh)
except Exception:
pass
start_opts: Dict[str,
Any] = {
"borderless": borderless,
"mpv_log_path": mpv_log_path
}
# Store registry is only needed for certain playlist listing/inference paths.
# Keep it lazy so a simple `.pipe <url> -play` doesn't trigger Hydrus/API calls.
file_storage = None
# Initialize mpv_started flag
mpv_started = False
# Handle positional index argument if provided
index_arg = parsed.get("index")
url_arg = parsed.get("url")
# If index_arg is provided but is not an integer, treat it as a URL
# This allows .pipe "http://..." without -url flag
if index_arg is not None:
try:
int(index_arg)
except ValueError:
# Not an integer, treat as URL if url_arg is not set
if not url_arg:
url_arg = index_arg
index_arg = None
clear_mode = parsed.get("clear")
list_mode = parsed.get("list")
play_mode = parsed.get("play")
pause_mode = parsed.get("pause")
replace_mode = parsed.get("replace")
save_mode = parsed.get("save")
load_mode = parsed.get("load")
current_mode = parsed.get("current")
# Pure log mode: `.pipe -log` should not run any playlist actions and
# should not print the playlist table. It should only enable/tail logs
# (handled in the `finally` block).
only_log = bool(
log_requested and not url_arg and index_arg is None and not clear_mode
and not list_mode and not play_mode and not pause_mode and not save_mode
and not load_mode and not current_mode and not replace_mode
)
if only_log:
return 0
# Handle --current flag: emit currently playing item to pipeline
if current_mode:
items = _get_playlist()
if items is None:
debug("MPV is not running or not accessible.", file=sys.stderr)
return 1
# Find the currently playing item
current_item = None
for item in items:
if item.get("current", False):
current_item = item
break
if current_item is None:
debug("No item is currently playing.", file=sys.stderr)
return 1
# Build result object with file info
title = _extract_title_from_item(current_item)
filename = current_item.get("filename", "")
# Emit the current item to pipeline
result_obj = {
"path": filename,
"title": title,
"cmdlet_name": ".mpv",
"source": "pipe",
"__pipe_index": items.index(current_item),
}
ctx.emit(result_obj)
debug(f"Emitted current item: {title}")
return 0
# Handle URL queuing
mpv_started = False
if url_arg:
# If -replace is used, or if we have a URL and -play is requested,
# we prefer 'replace' mode which starts playback immediately and avoids IPC overhead.
# NOTE: Use wait=False for URLs because yt-dlp resolution can be slow and
# would cause the calling Lua script to timeout.
queue_replace = bool(replace_mode)
if play_mode and not replace_mode:
# If -play is used with a URL, treat it as "play this now".
# For better UX, we'll replace the current playlist.
queue_replace = True
mpv_started = _queue_items([url_arg], clear_first=queue_replace, config=config, start_opts=start_opts, wait=False)
if not (clear_mode or play_mode or pause_mode or save_mode or load_mode or replace_mode):
if mpv_started:
# MPV was just started, wait a moment for it to be ready, then play first item
import time
time.sleep(0.5)
index_arg = "1" # 1-based index for first item
play_mode = True
else:
# MPV was already running, just show the updated playlist.
list_mode = True
# If we used queue_replace, the URL is already playing. Clear play/index args to avoid redundant commands.
if queue_replace:
play_mode = False
index_arg = None
# Ensure lyric overlay is running (auto-discovery handled by MPV.lyric).
try:
mpv = MPV()
_ensure_lyric_overlay(mpv)
except Exception:
pass
# Handle Save Playlist
if save_mode:
# Avoid `shell=True` / `date /t` on Windows (can flash a cmd.exe window).
# Use Python's datetime instead.
from datetime import datetime
playlist_name = index_arg or f"Playlist {datetime.now().strftime('%Y-%m-%d')}"
# If index_arg was used for name, clear it so it doesn't trigger index logic
if index_arg:
index_arg = None
items = _get_playlist()
if not items:
debug("Cannot save: MPV playlist is empty or MPV is not running.")
return 1
# Clean up items for saving (remove current flag, etc)
clean_items = []
for item in items:
# If title was extracted from memory://, we should probably save the original filename
# if it's a URL, or reconstruct a clean object.
# Actually, _extract_title_from_item handles the display title.
# But for playback, we need the 'filename' (which might be memory://...)
# If we save 'memory://...', it will work when loaded back.
clean_items.append(item)
if _save_playlist(playlist_name, clean_items):
debug(f"Playlist saved as '{playlist_name}'")
return 0
debug(f"Failed to save playlist '{playlist_name}'")
return 1
# Handle Load Playlist
current_playlist_name = None
if load_mode:
if index_arg:
try:
pl_id = int(index_arg)
# Handle Delete Playlist (if -clear is also passed)
if clear_mode:
if _delete_playlist(pl_id):
debug(f"Playlist ID {pl_id} deleted.")
# Clear index_arg so we fall through to list mode and show updated list
index_arg = None
# Don't return, let it list the remaining playlists
else:
debug(f"Failed to delete playlist ID {pl_id}.")
return 1
else:
# Handle Load Playlist
result = _get_playlist_by_id(pl_id)
if result is None:
debug(f"Playlist ID {pl_id} not found.")
return 1
name, items = result
current_playlist_name = name
# Queue items (replacing current playlist)
if items:
_queue_items(
items,
clear_first=True,
config=config,
start_opts=start_opts
)
else:
# Empty playlist, just clear
_send_ipc_command(
{
"command": ["playlist-clear"]
},
silent=True
)
# Switch to list mode to show the result
list_mode = True
index_arg = None
# Fall through to list logic
except ValueError:
debug(f"Invalid playlist ID: {index_arg}")
return 1
# If we deleted or didn't have an index, list playlists
if not index_arg:
playlists = _get_playlists()
if not playlists:
debug("No saved playlists found.")
return 0
table = Table("Saved Playlists")
for i, pl in enumerate(playlists):
item_count = len(pl.get("items", []))
row = table.add_row()
# row.add_column("ID", str(pl['id'])) # Hidden as per user request
row.add_column("Name", pl["name"])
row.add_column("Items", str(item_count))
row.add_column("Updated", pl.get("updated_at") or "")
# Set the playlist items as the result object for this row
# When user selects @N, they get the list of items
# We also set the source command to .pipe -load <ID> so it loads it
table.set_row_selection_args(i, ["-load", str(pl["id"])])
table.set_source_command(".mpv")
# Register results
ctx.set_last_result_table_overlay(
table,
[p["items"] for p in playlists]
)
ctx.set_current_stage_table(table)
# Do not print directly here.
# Both CmdletExecutor and PipelineExecutor render the current-stage/overlay table,
# so printing here would duplicate output.
return 0
# Everything below was originally outside a try block; keep it inside so `start_opts` is in scope.
# Handle Play/Pause commands (but skip if we have index_arg to play a specific item)
if play_mode and index_arg is None:
cmd = {
"command": ["set_property",
"pause",
False],
"request_id": 103
}
resp = _send_ipc_command(cmd)
if resp and resp.get("error") == "success":
debug("Resumed playback")
return 0
else:
debug("Failed to resume playback (MPV not running?)", file=sys.stderr)
return 1
if pause_mode:
cmd = {
"command": ["set_property",
"pause",
True],
"request_id": 104
}
resp = _send_ipc_command(cmd)
if resp and resp.get("error") == "success":
debug("Paused playback")
return 0
else:
debug("Failed to pause playback (MPV not running?)", file=sys.stderr)
return 1
# Handle Clear All command (no index provided)
if clear_mode and index_arg is None:
cmd = {
"command": ["playlist-clear"],
"request_id": 105
}
resp = _send_ipc_command(cmd)
if resp and resp.get("error") == "success":
debug("Playlist cleared")
return 0
else:
debug("Failed to clear playlist (MPV not running?)", file=sys.stderr)
return 1
# Handle piped input (add to playlist)
# Skip adding if -list is specified (user just wants to see current playlist)
if result and not list_mode and not url_arg:
playlist_before = _get_playlist(silent=True)
idle_before = None
try:
idle_resp = _send_ipc_command(
{
"command": ["get_property",
"idle-active"],
"request_id": 111
},
silent=True
)
if idle_resp and idle_resp.get("error") == "success":
idle_before = bool(idle_resp.get("data"))
except Exception:
idle_before = None
# If result is a list of items, add them to playlist
items_to_add = []
if isinstance(result, list):
items_to_add = result
elif isinstance(result, dict):
items_to_add = [result]
else:
# Handle PipeObject or any other object type
items_to_add = [result]
# Debug: inspect incoming result and attributes
try:
debug(
f"pipe._run: received result type={type(result)} repr={repr(result)[:200]}"
)
debug(
f"pipe._run: attrs path={getattr(result, 'path', None)} url={getattr(result, 'url', None)} store={getattr(result, 'store', None)} hash={getattr(result, 'hash', None)}"
)
except Exception:
pass
queued_started_mpv = False
if items_to_add and _queue_items(items_to_add,
config=config,
start_opts=start_opts):
mpv_started = True
queued_started_mpv = True
# Ensure lyric overlay is running when we queue anything via .pipe.
if items_to_add and not queued_started_mpv:
try:
mpv = MPV()
_ensure_lyric_overlay(mpv)
except Exception:
pass
# Auto-play when a single item is piped and mpv was idle/empty.
if items_to_add and len(items_to_add) == 1 and not queued_started_mpv:
try:
playlist_after = _get_playlist(silent=True)
before_len = len(playlist_before
) if isinstance(playlist_before,
list) else 0
after_len = len(playlist_after
) if isinstance(playlist_after,
list) else 0
should_autoplay = False
if idle_before is True:
should_autoplay = True
elif isinstance(playlist_before,
list) and len(playlist_before) == 0:
should_autoplay = True
if should_autoplay and after_len > 0:
idx_to_play = min(max(0, before_len), after_len - 1)
# Prefer the store/hash from the piped item when auto-playing.
try:
s, h = _extract_store_and_hash(items_to_add[0])
_set_mpv_item_context(s, h)
except Exception:
pass
play_resp = _send_ipc_command(
{
"command": ["playlist-play-index",
idx_to_play],
"request_id": 112
},
silent=True,
)
_send_ipc_command(
{
"command": ["set_property",
"pause",
False],
"request_id": 113
},
silent=True,
)
if play_resp and play_resp.get("error") == "success":
debug("Auto-playing piped item")
# Start lyric overlay (auto-discovery handled by MPV.lyric).
try:
mpv = MPV()
_ensure_lyric_overlay(mpv)
except Exception:
pass
except Exception:
pass
# Get playlist from MPV (silent: we handle MPV-not-running gracefully below)
items = _get_playlist(silent=True)
if items is None:
if mpv_started:
# MPV was just started, retry getting playlist after a brief delay
import time
time.sleep(0.3)
items = _get_playlist(silent=True)
if items is None:
# Still can't connect, but MPV is starting
debug("MPV is starting up...")
return 0
else:
# Do not auto-launch MPV when no action/inputs were provided; avoid surprise startups
no_inputs = not any(
[
result,
url_arg,
index_arg,
clear_mode,
play_mode,
pause_mode,
save_mode,
load_mode,
current_mode,
list_mode,
]
)
if no_inputs:
# User invoked `.pipe` with no args: treat this as an intent to open MPV.
debug("MPV is not running. Starting new instance...")
_start_mpv([], config=config, start_opts=start_opts)
# Re-check playlist after startup; if IPC still isn't ready, just exit cleanly.
try:
import time
time.sleep(0.3)
except Exception:
pass
items = _get_playlist(silent=True)
if items is None:
debug("MPV is starting up...")
return 0
# IPC is ready; continue without restarting MPV again.
else:
debug("MPV is not running. Starting new instance...")
_start_mpv([], config=config, start_opts=start_opts)
return 0
if not items:
debug("MPV playlist is empty.")
return 0
# If index is provided, perform action (Play or Clear)
if index_arg is not None:
try:
# Handle 1-based index
idx = int(index_arg) - 1
if idx < 0 or idx >= len(items):
debug(f"Index {index_arg} out of range (1-{len(items)}).")
return 1
item = items[idx]
title = _extract_title_from_item(item)
filename = item.get("filename", "") if isinstance(item, dict) else ""
hydrus_header = _build_hydrus_header(config or {})
hydrus_url = None
try:
hydrus_url = get_hydrus_url(config) if config is not None else None
except Exception:
hydrus_url = None
if clear_mode:
# Remove item
cmd = {
"command": ["playlist-remove",
idx],
"request_id": 101
}
resp = _send_ipc_command(cmd)
if resp and resp.get("error") == "success":
debug(f"Removed: {title}")
# Refresh items for listing
items = _get_playlist() or []
list_mode = True
index_arg = None
else:
debug(
f"Failed to remove item: {resp.get('error') if resp else 'No response'}"
)
return 1
else:
# Play item
try:
s, h = _extract_store_and_hash(item)
_set_mpv_item_context(s, h)
except Exception:
pass
if hydrus_header and _is_hydrus_path(filename, hydrus_url):
header_cmd = {
"command":
["set_property",
"http-header-fields",
hydrus_header],
"request_id": 198,
}
_send_ipc_command(header_cmd, silent=True)
cmd = {
"command": ["playlist-play-index",
idx],
"request_id": 102
}
resp = _send_ipc_command(cmd)
if resp and resp.get("error") == "success":
# Ensure playback starts (unpause)
unpause_cmd = {
"command": ["set_property",
"pause",
False],
"request_id": 103,
}
_send_ipc_command(unpause_cmd)
debug(f"Playing: {title}")
# Monitor logs briefly for errors (e.g. ytdl failures)
_monitor_mpv_logs(3.0)
# Refresh playlist view so the user sees the new current item immediately
items = _get_playlist(silent=True) or items
list_mode = True
index_arg = None
else:
debug(
f"Failed to play item: {resp.get('error') if resp else 'No response'}"
)
return 1
except ValueError:
debug(f"Invalid index: {index_arg}")
return 1
# List items (Default action or after clear)
if list_mode or (index_arg is None and not url_arg):
if not items:
debug("MPV playlist is empty.")
return 0
if file_storage is None:
try:
from Store import Store
file_storage = Store(config)
except Exception as e:
debug(
f"Warning: Could not initialize Store registry: {e}",
file=sys.stderr
)
# Use the loaded playlist name if available, otherwise default
# Note: current_playlist_name is defined in the load_mode block if a playlist was loaded
try:
table_title = current_playlist_name or "MPV Playlist"
except NameError:
table_title = "MPV Playlist"
table = Table(table_title, preserve_order=True)
# Convert MPV items to PipeObjects with proper hash and store
pipe_objects = []
for i, item in enumerate(items):
is_current = item.get("current", False)
title = _extract_title_from_item(item)
filename = item.get("filename", "")
# Extract the real path/URL from memory:// wrapper if present
real_path = _extract_target_from_memory_uri(filename) or filename
# Try to extract hash from the path/URL
file_hash = None
store_name = None
# Check if it's a Hydrus URL
if "get_files/file" in real_path or "hash=" in real_path:
# Extract hash from Hydrus URL
hash_match = re.search(r"hash=([0-9a-f]{64})", real_path.lower())
if hash_match:
file_hash = hash_match.group(1)
# Try to find which Hydrus instance has this file
if file_storage:
store_name = _find_hydrus_instance_for_hash(
file_hash,
file_storage
)
if not store_name:
store_name = "hydrus"
# Check if it's a hash-based local file
elif real_path:
# Try to extract hash from filename (e.g., C:\path\1e8c46...a1b2.mp4)
path_obj = Path(real_path)
stem = path_obj.stem # filename without extension
if len(stem) == 64 and all(c in "0123456789abcdef"
for c in stem.lower()):
file_hash = stem.lower()
# Fallback to inferred store if we couldn't find it
if not store_name:
store_name = _infer_store_from_playlist_item(
item,
file_storage=file_storage
)
# Build PipeObject with proper metadata
pipe_obj = PipeObject(
hash=file_hash or "unknown",
store=store_name or "unknown",
title=title,
path=real_path,
)
pipe_objects.append(pipe_obj)
# Truncate title for display
display_title = title
if len(display_title) > 80:
display_title = display_title[:77] + "..."
row = table.add_row()
row.add_column("Current", "*" if is_current else "")
row.add_column("Store", store_name or "unknown")
row.add_column("Title", display_title)
table.set_row_selection_args(i, [str(i + 1)])
table.set_source_command(".mpv")
# Register PipeObjects (not raw MPV items) with pipeline context
ctx.set_last_result_table_overlay(table, pipe_objects)
ctx.set_current_stage_table(table)
# Do not print directly here.
# Both CmdletExecutor and PipelineExecutor render the current-stage/overlay table,
# so printing here would duplicate output.
return 0
finally:
if log_requested and isinstance(mpv_log_path, str) and mpv_log_path.strip():
try:
# Give mpv a short moment to flush logs, then print a tail that is easy to copy.
print(f"MPV log file: {mpv_log_path}")
# Best-effort: re-try enabling file logging at the end too (mpv may have
# been unreachable at the start).
try:
_try_enable_mpv_file_logging(mpv_log_path, attempts=2)
except Exception:
pass
tail_lines: List[str] = []
for _ in range(8):
tail_lines = _tail_text_file(mpv_log_path, max_lines=200)
if tail_lines:
break
try:
import time
time.sleep(0.25)
except Exception:
break
if tail_lines:
print("MPV log (tail):")
for ln in tail_lines:
print(ln)
else:
print("MPV log (tail): <empty>")
print(
"Note: On some Windows builds, mpv cannot start writing to --log-file after launch."
)
print(
"If you need full [main2] logs, restart mpv so it starts with --log-file."
)
# Print database logs for mpv module (helper output)
try:
import sqlite3
log_db_path = str((Path(__file__).resolve().parent.parent / "logs.db"))
conn = sqlite3.connect(log_db_path, timeout=5.0)
cur = conn.cursor()
cur.execute(
"SELECT level, module, message FROM logs WHERE module = 'mpv' ORDER BY timestamp DESC LIMIT 200"
)
mpv_logs = cur.fetchall()
cur.close()
conn.close()
print("Helper logs from database (mpv module, most recent first):")
if mpv_logs:
for level, module, message in mpv_logs:
print(f"[{level}] {message}")
else:
print("(no helper logs found)")
except Exception as e:
debug(f"Could not fetch database logs: {e}")
pass
# Also print the helper log tail (this captures Python helper output that won't
# necessarily show up in MPV's own log-file).
try:
helper_path = _helper_log_file()
helper_tail = _tail_text_file(str(helper_path), max_lines=200)
print(f"Helper log file: {str(helper_path)}")
if helper_tail:
print("Helper log (tail):")
for ln in helper_tail:
print(ln)
else:
print("Helper log (tail): <empty>")
except Exception:
pass
# Also print the Lua-side log tail (mp.msg output isn't always written to mpv's log-file).
try:
lua_path = _lua_log_file()
lua_tail = _tail_text_file(str(lua_path), max_lines=200)
print(f"Lua log file: {str(lua_path)}")
if lua_tail:
print("Lua log (tail):")
for ln in lua_tail:
print(ln)
else:
print("Lua log (tail): <empty>")
except Exception:
pass
except Exception:
pass
try:
set_thread_stream(prev_stream)
except Exception:
pass
try:
set_debug(prev_debug)
except Exception:
pass
try:
if devnull_fh is not None:
devnull_fh.close()
except Exception:
pass
def _start_mpv(
items: List[Any],
config: Optional[Dict[str,
Any]] = None,
start_opts: Optional[Dict[str,
Any]] = None,
) -> None:
"""Start MPV with a list of items."""
import time as _time_module
mpv = MPV()
mpv.kill_existing_windows()
_time_module.sleep(0.5) # Wait for process to die
hydrus_header = _build_hydrus_header(config or {})
ytdl_opts = _build_ytdl_options(config, hydrus_header)
cookies_path = None
try:
from tool.ytdlp import YtDlpTool
cookiefile = YtDlpTool(config or {}).resolve_cookiefile()
if cookiefile is not None:
cookies_path = str(cookiefile)
except Exception:
cookies_path = None
if cookies_path:
debug(f"Starting MPV with cookies file: {cookies_path.replace('\\', '/')}")
else:
debug("Starting MPV with browser cookies: chrome")
try:
extra_args: List[str] = [
"--ytdl-format=bestvideo[height<=?1080]+bestaudio/best[height<=?1080]",
]
# If we are going to play a DASH MPD, allow ffmpeg to fetch https segments referenced by the manifest.
try:
needs_mpd_whitelist = False
for it in items or []:
mpd = resolve_tidal_manifest_path(it)
candidate = mpd
if not candidate:
if isinstance(it, dict):
candidate = it.get("path") or it.get("url")
else:
candidate = getattr(it, "path", None) or getattr(it, "url", None)
if candidate and re.search(r"\.mpd($|\?)", str(candidate).lower()):
needs_mpd_whitelist = True
break
if needs_mpd_whitelist:
extra_args.append(
"--demuxer-lavf-o=protocol_whitelist=file,https,tcp,tls,crypto,data"
)
except Exception:
pass
# Optional: borderless window (useful for uosc-like overlay UI without fullscreen).
if start_opts and start_opts.get("borderless"):
extra_args.append("--border=no")
# Optional: mpv logging to file.
mpv_log_path = (start_opts or {}).get("mpv_log_path")
if isinstance(mpv_log_path, str) and mpv_log_path.strip():
extra_args.append(f"--log-file={mpv_log_path}")
extra_args.append("--msg-level=all=v")
# Always start MPV with the bundled Lua script via MPV class.
mpv.start(
extra_args=extra_args,
ytdl_raw_options=ytdl_opts,
http_header_fields=hydrus_header,
detached=True,
)
debug("Started MPV process")
# Wait for IPC pipe to be ready
if not mpv.wait_for_ipc(retries=20, delay_seconds=0.2):
debug("Timed out waiting for MPV IPC connection", file=sys.stderr)
return
# Publish context early so the lyric helper can resolve notes on the first
# target change (the helper may start before playback begins).
try:
if items:
s, h = _extract_store_and_hash(items[0])
_set_mpv_item_context(s, h)
except Exception:
pass
# main.lua is loaded at startup via --script; don't reload it here.
# Ensure lyric overlay is running (auto-discovery handled by MPV.lyric).
_ensure_lyric_overlay(mpv)
# Queue items via IPC
if items:
_queue_items(items, config=config, start_opts=start_opts)
# Auto-play the first item
import time
time.sleep(0.3) # Give MPV a moment to process the queued items
# Play the first item (index 0) and unpause
play_cmd = {
"command": ["playlist-play-index",
0],
"request_id": 102
}
play_resp = _send_ipc_command(play_cmd, silent=True)
if play_resp and play_resp.get("error") == "success":
# Ensure playback starts (unpause)
unpause_cmd = {
"command": ["set_property",
"pause",
False],
"request_id": 103
}
_send_ipc_command(unpause_cmd, silent=True)
debug("Auto-playing first item")
# Overlay already started above; it will follow track changes automatically.
except Exception as e:
debug(f"Error starting MPV: {e}", file=sys.stderr)
CMDLET = Cmdlet(
name=".mpv",
alias=[".pipe", "pipe", "playlist", "queue", "ls-pipe"],
summary="Manage and play items in the MPV playlist via IPC",
usage=".mpv [index|url] [-current] [-clear] [-list] [-url URL] [-log] [-borderless]",
arg=[
CmdletArg(
name="index",
type="string", # Changed to string to allow URL detection
description="Index of item to play/clear, or URL to queue",
required=False,
),
CmdletArg(name="url", type="string", description="URL to queue", required=False),
CmdletArg(
name="clear",
type="flag",
description="Remove the selected item, or clear entire playlist if no index provided",
),
CmdletArg(name="list", type="flag", description="List items (default)"),
CmdletArg(name="play", type="flag", description="Resume playback or play specific index/URL"),
CmdletArg(name="pause", type="flag", description="Pause playback"),
CmdletArg(
name="replace",
type="flag",
description="Replace current playlist when adding index or URL",
),
CmdletArg(
name="save",
type="flag",
description="Save current playlist to database",
requires_db=True,
),
CmdletArg(
name="load",
type="flag",
description="List saved playlists",
requires_db=True,
),
CmdletArg(
name="current",
type="flag",
description="Emit the currently playing item to pipeline for further processing",
),
CmdletArg(
name="log",
type="flag",
description="Enable pipeable debug output and write an mpv log file",
),
CmdletArg(
name="borderless",
type="flag",
description="Start mpv with no window border (uosc-like overlay feel without fullscreen)",
),
],
exec=_run,
)