From 73f3005393cda0635113437a693ac0c40601912f Mon Sep 17 00:00:00 2001 From: Nose Date: Sat, 3 Jan 2026 03:37:48 -0800 Subject: [PATCH] j --- API/hifi.py | 13 + CLI.py | 378 ++++++++-------- MPV/LUA/main.lua | 39 +- MPV/lyric.py | 449 ++++++++++++++++---- MPV/mpv_ipc.py | 12 +- MPV/portable_config/script-opts/medeia.conf | 2 +- Provider/HIFI.py | 434 ++++++++++++++++++- Provider/bandcamp.py | 4 + Provider/internetarchive.py | 15 +- Provider/libgen.py | 4 + Provider/openlibrary.py | 233 ++++++---- Provider/podcastindex.py | 5 + Provider/soulseek.py | 4 + Provider/youtube.py | 6 + ProviderCore/base.py | 80 ++++ ProviderCore/registry.py | 34 ++ SYS/cmdlet_catalog.py | 20 + cmdlet/_shared.py | 3 + cmdlet/add_file.py | 15 + cmdlet/download_file.py | 220 ++++++++-- cmdlet/search_file.py | 26 +- cmdnat/help.py | 132 ++++-- cmdnat/pipe.py | 105 ++++- 23 files changed, 1791 insertions(+), 442 deletions(-) diff --git a/API/hifi.py b/API/hifi.py index 93f8df1..c04004b 100644 --- a/API/hifi.py +++ b/API/hifi.py @@ -18,6 +18,7 @@ class HifiApiClient: - GET /search/ with exactly one of s, a, v, p - GET /track/ with id (and optional quality) - GET /info/ with id + - GET /lyrics/ with id """ def __init__(self, base_url: str = DEFAULT_BASE_URL, *, timeout: float = 10.0) -> None: @@ -66,3 +67,15 @@ class HifiApiClient: raise HifiApiError("track_id must be positive") return self._get_json("info/", params={"id": track_int}) + + def lyrics(self, track_id: int) -> Dict[str, Any]: + """Fetch lyrics (including subtitles/LRC) for a track.""" + + try: + track_int = int(track_id) + except Exception as exc: + raise HifiApiError(f"track_id must be int-compatible: {exc}") from exc + if track_int <= 0: + raise HifiApiError("track_id must be positive") + + return self._get_json("lyrics/", params={"id": track_int}) diff --git a/CLI.py b/CLI.py index 9730b28..d5ac921 100644 --- a/CLI.py +++ b/CLI.py @@ -68,6 +68,27 @@ from SYS.cmdlet_catalog import ( from SYS.config import get_local_storage_path, load_config from SYS.result_table import ResultTable +HELP_EXAMPLE_SOURCE_COMMANDS = { + ".help-example", + "help-example", +} + + +def _split_pipeline_tokens(tokens: Sequence[str]) -> List[List[str]]: + """Split example tokens into per-stage command sequences using pipe separators.""" + + stages: List[List[str]] = [] + current: List[str] = [] + for token in tokens: + if token == "|": + if current: + stages.append(current) + current = [] + continue + current.append(str(token)) + if current: + stages.append(current) + return [stage for stage in stages if stage] class SelectionSyntax: """Parses @ selection syntax into 1-based indices.""" @@ -1397,6 +1418,7 @@ class CmdletExecutor: "get-relationship", "get-rel", ".pipe", + ".mpv", ".matrix", ".telegram", "telegram", @@ -2418,96 +2440,117 @@ class PipelineExecutor: ) command_expanded = False + example_selector_triggered = False + normalized_source_cmd = str(source_cmd or "").replace("_", "-").strip().lower() - if table_type in {"youtube", - "soulseek"}: - command_expanded = False - elif source_cmd == "search-file" and source_args and "youtube" in source_args: - command_expanded = False - else: - selected_row_args: List[str] = [] - skip_pipe_expansion = source_cmd == ".pipe" and len(stages) > 0 - # Command expansion via @N: - # - Default behavior: expand ONLY for single-row selections. - # - Special case: allow multi-row expansion for add-file directory tables by - # combining selected rows into a single `-path file1,file2,...` argument. - if source_cmd and not skip_pipe_expansion: - src = str(source_cmd).replace("_", "-").strip().lower() + if normalized_source_cmd in HELP_EXAMPLE_SOURCE_COMMANDS and selection_indices: + try: + idx = selection_indices[0] + row_args = ctx.get_current_stage_table_row_selection_args(idx) + except Exception: + row_args = None + tokens: List[str] = [] + if isinstance(row_args, list) and row_args: + tokens = [str(x) for x in row_args if x is not None] + if tokens: + stage_groups = _split_pipeline_tokens(tokens) + if stage_groups: + for stage in reversed(stage_groups): + stages.insert(0, stage) + selection_indices = [] + command_expanded = True + example_selector_triggered = True - if src == "add-file" and selection_indices: - row_args_list: List[List[str]] = [] - for idx in selection_indices: + if not example_selector_triggered: + if table_type in {"youtube", + "soulseek"}: + command_expanded = False + elif source_cmd == "search-file" and source_args and "youtube" in source_args: + command_expanded = False + else: + selected_row_args: List[str] = [] + skip_pipe_expansion = source_cmd in {".pipe", ".mpv"} and len(stages) > 0 + # Command expansion via @N: + # - Default behavior: expand ONLY for single-row selections. + # - Special case: allow multi-row expansion for add-file directory tables by + # combining selected rows into a single `-path file1,file2,...` argument. + if source_cmd and not skip_pipe_expansion: + src = str(source_cmd).replace("_", "-").strip().lower() + + if src == "add-file" and selection_indices: + row_args_list: List[List[str]] = [] + for idx in selection_indices: + try: + row_args = ctx.get_current_stage_table_row_selection_args( + idx + ) + except Exception: + row_args = None + if isinstance(row_args, list) and row_args: + row_args_list.append( + [str(x) for x in row_args if x is not None] + ) + + # Combine `['-path', ]` from each row into one `-path` arg. + paths: List[str] = [] + can_merge = bool(row_args_list) and ( + len(row_args_list) == len(selection_indices) + ) + if can_merge: + for ra in row_args_list: + if len(ra) == 2 and str(ra[0]).strip().lower() in { + "-path", + "--path", + "-p", + }: + p = str(ra[1]).strip() + if p: + paths.append(p) + else: + can_merge = False + break + + if can_merge and paths: + selected_row_args.extend(["-path", ",".join(paths)]) + elif len(selection_indices) == 1 and row_args_list: + selected_row_args.extend(row_args_list[0]) + else: + # Only perform @N command expansion for *single-item* selections. + # For multi-item selections (e.g. @*, @1-5), expanding to one row + # would silently drop items. In those cases we pipe items downstream. + if len(selection_indices) == 1: + idx = selection_indices[0] + row_args = ctx.get_current_stage_table_row_selection_args(idx) + if row_args: + selected_row_args.extend(row_args) + + if selected_row_args: + if isinstance(source_cmd, list): + cmd_list: List[str] = [str(x) for x in source_cmd if x is not None] + elif isinstance(source_cmd, str): + cmd_list = [source_cmd] + else: + cmd_list = [] + + expanded_stage: List[str] = cmd_list + source_args + selected_row_args + + if first_stage_had_extra_args and stages: + expanded_stage += stages[0] + stages[0] = expanded_stage + else: + stages.insert(0, expanded_stage) + + if pipeline_session and worker_manager: try: - row_args = ctx.get_current_stage_table_row_selection_args( - idx + worker_manager.log_step( + pipeline_session.worker_id, + f"@N expansion: {source_cmd} + {' '.join(str(x) for x in selected_row_args)}", ) except Exception: - row_args = None - if isinstance(row_args, list) and row_args: - row_args_list.append( - [str(x) for x in row_args if x is not None] - ) + pass - # Combine `['-path', ]` from each row into one `-path` arg. - paths: List[str] = [] - can_merge = bool(row_args_list) and ( - len(row_args_list) == len(selection_indices) - ) - if can_merge: - for ra in row_args_list: - if len(ra) == 2 and str(ra[0]).strip().lower() in { - "-path", - "--path", - "-p", - }: - p = str(ra[1]).strip() - if p: - paths.append(p) - else: - can_merge = False - break - - if can_merge and paths: - selected_row_args.extend(["-path", ",".join(paths)]) - elif len(selection_indices) == 1 and row_args_list: - selected_row_args.extend(row_args_list[0]) - else: - # Only perform @N command expansion for *single-item* selections. - # For multi-item selections (e.g. @*, @1-5), expanding to one row - # would silently drop items. In those cases we pipe items downstream. - if len(selection_indices) == 1: - idx = selection_indices[0] - row_args = ctx.get_current_stage_table_row_selection_args(idx) - if row_args: - selected_row_args.extend(row_args) - - if selected_row_args: - if isinstance(source_cmd, list): - cmd_list: List[str] = [str(x) for x in source_cmd if x is not None] - elif isinstance(source_cmd, str): - cmd_list = [source_cmd] - else: - cmd_list = [] - - expanded_stage: List[str] = cmd_list + source_args + selected_row_args - - if first_stage_had_extra_args and stages: - expanded_stage += stages[0] - stages[0] = expanded_stage - else: - stages.insert(0, expanded_stage) - - if pipeline_session and worker_manager: - try: - worker_manager.log_step( - pipeline_session.worker_id, - f"@N expansion: {source_cmd} + {' '.join(str(x) for x in selected_row_args)}", - ) - except Exception: - pass - - selection_indices = [] - command_expanded = True + selection_indices = [] + command_expanded = True if (not command_expanded) and selection_indices: stage_table = None @@ -2597,74 +2640,44 @@ class PipelineExecutor: "table") else None ) + def _norm_cmd(name: Any) -> str: + return str(name or "").replace("_", "-").strip().lower() + + auto_stage = None + if isinstance(table_type, str) and table_type: + try: + from ProviderCore.registry import selection_auto_stage_for_table + + auto_stage = selection_auto_stage_for_table(table_type) + except Exception: + auto_stage = None + if not stages: - if table_type == "youtube": - print("Auto-running YouTube selection via download-file") - stages.append(["download-file"]) - elif table_type == "bandcamp": - print("Auto-running Bandcamp selection via download-file") - stages.append(["download-file"]) - elif table_type == "internetarchive": - print("Auto-loading Internet Archive item via download-file") - stages.append(["download-file"]) - elif table_type == "podcastindex.episodes": - print("Auto-piping selection to download-file") - stages.append(["download-file"]) - elif table_type in {"soulseek", - "openlibrary", - "libgen"}: - print("Auto-piping selection to download-file") - stages.append(["download-file"]) - elif isinstance(table_type, str) and table_type.startswith("metadata."): + if isinstance(table_type, str) and table_type.startswith("metadata."): print("Auto-applying metadata selection via get-tag") stages.append(["get-tag"]) + elif auto_stage: + try: + print(f"Auto-running selection via {auto_stage[0]}") + except Exception: + pass + stages.append(list(auto_stage)) else: first_cmd = stages[0][0] if stages and stages[0] else None - if table_type == "soulseek" and first_cmd not in ( - "download-file", - ".pipe", - ): - debug("Auto-inserting download-file after Soulseek selection") - stages.insert(0, ["download-file"]) - if table_type == "youtube" and first_cmd not in ( - "download-file", - ".pipe", - ): - debug("Auto-inserting download-file after YouTube selection") - stages.insert(0, ["download-file"]) - if table_type == "bandcamp" and first_cmd not in ( - "download-file", - ".pipe", - ): - print("Auto-inserting download-file after Bandcamp selection") - stages.insert(0, ["download-file"]) - if table_type == "internetarchive" and first_cmd not in ( - "download-file", - ".pipe", - ): - debug( - "Auto-inserting download-file after Internet Archive selection" - ) - stages.insert(0, ["download-file"]) - if table_type == "podcastindex.episodes" and first_cmd not in ( - "download-file", - ".pipe", - ): - print("Auto-inserting download-file after PodcastIndex episode selection") - stages.insert(0, ["download-file"]) - if table_type == "libgen" and first_cmd not in ( - "download-file", - ".pipe", - ): - print("Auto-inserting download-file after Libgen selection") - stages.insert(0, ["download-file"]) if isinstance(table_type, str) and table_type.startswith("metadata.") and first_cmd not in ( "get-tag", "get_tag", ".pipe", + ".mpv", ): print("Auto-inserting get-tag after metadata selection") stages.insert(0, ["get-tag"]) + elif auto_stage: + first_cmd_norm = _norm_cmd(first_cmd) + auto_cmd_norm = _norm_cmd(auto_stage[0]) + if first_cmd_norm not in (auto_cmd_norm, ".pipe", ".mpv"): + debug(f"Auto-inserting {auto_cmd_norm} after selection") + stages.insert(0, list(auto_stage)) return True, piped_result else: @@ -2744,7 +2757,7 @@ class PipelineExecutor: # `.pipe` (MPV) is an interactive launcher; disable pipeline Live progress # for it because it doesn't meaningfully "complete" (mpv may keep running) # and Live output interferes with MPV playlist UI. - if name == ".pipe": + if name in {".pipe", ".mpv"}: continue # `.matrix` uses a two-phase picker (@N then .matrix -send). Pipeline Live # progress can linger across those phases and interfere with interactive output. @@ -3161,62 +3174,37 @@ class PipelineExecutor: if stage_index + 1 < len(stages) and stages[stage_index + 1]: next_cmd = _norm_stage_cmd(stages[stage_index + 1][0]) + auto_stage = None + if isinstance(table_type, str) and table_type: + try: + from ProviderCore.registry import selection_auto_stage_for_table + + # Preserve historical behavior: only forward selection-stage args + # to the auto stage when we are appending a new last stage. + at_end = bool(stage_index + 1 >= len(stages)) + auto_stage = selection_auto_stage_for_table( + table_type, + stage_args if at_end else None, + ) + except Exception: + auto_stage = None + # Auto-insert downloader stages for provider tables. # IMPORTANT: do not auto-download for filter selections; they may match many rows. if filter_spec is None: if stage_index + 1 >= len(stages): - if table_type == "youtube": - print("Auto-running YouTube selection via download-file") - stages.append(["download-file", *stage_args]) - elif table_type == "bandcamp": - print("Auto-running Bandcamp selection via download-file") - stages.append(["download-file"]) - elif table_type == "internetarchive": - print("Auto-loading Internet Archive item via download-file") - stages.append(["download-file"]) - elif table_type == "podcastindex.episodes": - print("Auto-piping selection to download-file") - stages.append(["download-file"]) - elif table_type in {"soulseek", "openlibrary", "libgen"}: - print("Auto-piping selection to download-file") - stages.append(["download-file"]) + if auto_stage: + try: + print(f"Auto-running selection via {auto_stage[0]}") + except Exception: + pass + stages.append(list(auto_stage)) else: - if table_type == "soulseek" and next_cmd not in ( - "download-file", - ".pipe", - ): - debug("Auto-inserting download-file after Soulseek selection") - stages.insert(stage_index + 1, ["download-file"]) - if table_type == "youtube" and next_cmd not in ( - "download-file", - ".pipe", - ): - debug("Auto-inserting download-file after YouTube selection") - stages.insert(stage_index + 1, ["download-file"]) - if table_type == "bandcamp" and next_cmd not in ( - "download-file", - ".pipe", - ): - print("Auto-inserting download-file after Bandcamp selection") - stages.insert(stage_index + 1, ["download-file"]) - if table_type == "internetarchive" and next_cmd not in ( - "download-file", - ".pipe", - ): - debug("Auto-inserting download-file after Internet Archive selection") - stages.insert(stage_index + 1, ["download-file"]) - if table_type == "podcastindex.episodes" and next_cmd not in ( - "download-file", - ".pipe", - ): - print("Auto-inserting download-file after PodcastIndex episode selection") - stages.insert(stage_index + 1, ["download-file"]) - if table_type == "libgen" and next_cmd not in ( - "download-file", - ".pipe", - ): - print("Auto-inserting download-file after Libgen selection") - stages.insert(stage_index + 1, ["download-file"]) + if auto_stage: + auto_cmd = _norm_stage_cmd(auto_stage[0]) + if next_cmd not in (auto_cmd, ".pipe", ".mpv"): + debug(f"Auto-inserting {auto_cmd} after selection") + stages.insert(stage_index + 1, list(auto_stage)) continue cmd_fn = REGISTRY.get(cmd_name) @@ -3386,9 +3374,9 @@ class PipelineExecutor: except Exception: pass - # `.pipe` is typically the terminal interactive stage (MPV UI). + # `.pipe`/`.mpv` is typically the terminal interactive stage (MPV UI). # Stop Live progress before running it so output doesn't get stuck behind Live. - if (cmd_name == ".pipe" and progress_ui is not None + if (cmd_name in {".pipe", ".mpv"} and progress_ui is not None and (stage_index + 1 >= len(stages))): try: progress_ui.stop() @@ -3495,7 +3483,7 @@ class PipelineExecutor: "bandcamp", "youtube", } or stage_table_source in {"download-file"} - or stage_table_type in {"internetarchive.formats"} + or stage_table_type in {"internetarchive.format", "internetarchive.formats"} or stage_table_source in {"download-file"})): try: is_selectable = not bool( diff --git a/MPV/LUA/main.lua b/MPV/LUA/main.lua index 86b26da..896ac6d 100644 --- a/MPV/LUA/main.lua +++ b/MPV/LUA/main.lua @@ -297,6 +297,10 @@ end local _cached_store_names = {} local _store_cache_loaded = false +-- Optional index into _cached_store_names (used by some older menu code paths). +-- If unset, callers should fall back to reading SELECTED_STORE_PROP. +local _selected_store_index = nil + local SELECTED_STORE_PROP = 'user-data/medeia-selected-store' local STORE_PICKER_MENU_TYPE = 'medeia_store_picker' local _selected_store_loaded = false @@ -438,7 +442,7 @@ local function get_mpv_ipc_path() end local function ensure_mpv_ipc_server() - -- `.pipe -play` (Python) controls MPV via JSON IPC. If mpv was started + -- `.mpv -play` (Python) controls MPV via JSON IPC. If mpv was started -- without --input-ipc-server, make sure we set one so the running instance -- can be controlled (instead of Python spawning a separate mpv). local ipc = mp.get_property('input-ipc-server') @@ -2192,6 +2196,37 @@ local function _call_mpv_api(request) end end +-- Run a Medeia pipeline command via the Python pipeline helper (IPC request/response). +-- Returns stdout string on success, or nil on failure. +function M.run_pipeline(pipeline_cmd, seeds) + pipeline_cmd = trim(tostring(pipeline_cmd or '')) + if pipeline_cmd == '' then + return nil + end + + ensure_mpv_ipc_server() + + local resp = run_pipeline_via_ipc_response(pipeline_cmd, seeds, 30) + if type(resp) == 'table' and resp.success then + return resp.stdout or '' + end + + local err = '' + if type(resp) == 'table' then + if resp.error and tostring(resp.error) ~= '' then + err = tostring(resp.error) + elseif resp.stderr and tostring(resp.stderr) ~= '' then + err = tostring(resp.stderr) + end + end + if err ~= '' then + _lua_log('pipeline failed cmd=' .. tostring(pipeline_cmd) .. ' err=' .. err) + else + _lua_log('pipeline failed cmd=' .. tostring(pipeline_cmd) .. ' err=') + end + return nil +end + -- Helper to run pipeline and parse JSON output function M.run_pipeline_json(pipeline_cmd, seeds) -- Append | output-json if not present @@ -2584,7 +2619,7 @@ mp.register_script_message('medios-load-url-event', function(json) end ensure_mpv_ipc_server() - local out = M.run_pipeline('.pipe -url ' .. quote_pipeline_arg(url) .. ' -play') + local out = M.run_pipeline('.mpv -url ' .. quote_pipeline_arg(url) .. ' -play') if out ~= nil then if ensure_uosc_loaded() then mp.commandv('script-message-to', 'uosc', 'close-menu', LOAD_URL_MENU_TYPE) diff --git a/MPV/lyric.py b/MPV/lyric.py index 828a036..d6bdf5d 100644 --- a/MPV/lyric.py +++ b/MPV/lyric.py @@ -54,9 +54,18 @@ _SINGLE_INSTANCE_LOCK_FH: Optional[TextIO] = None _LYRIC_VISIBLE_PROP = "user-data/medeia-lyric-visible" -# mpv osd-overlay IDs are scoped to the IPC client connection. -# MPV.lyric keeps a persistent connection, so we can safely reuse a constant ID. -_LYRIC_OSD_OVERLAY_ID = 4242 +# Optional overrides set by the playlist controller (.pipe/.mpv) so the lyric +# helper can resolve notes even when the local file path cannot be mapped back +# to a store via the store DB (common for Folder stores). +_ITEM_STORE_PROP = "user-data/medeia-item-store" +_ITEM_HASH_PROP = "user-data/medeia-item-hash" + +# Note: We previously used `osd-overlay`, but some mpv builds return +# error='invalid parameter' for that command. We now use `show-text`, which is +# widely supported across mpv versions. + +_OSD_STYLE_SAVED: Optional[Dict[str, Any]] = None +_OSD_STYLE_APPLIED: bool = False def _single_instance_lock_path(ipc_path: str) -> Path: @@ -70,7 +79,7 @@ def _single_instance_lock_path(ipc_path: str) -> Path: def _acquire_single_instance_lock(ipc_path: str) -> bool: """Ensure only one MPV.lyric process runs per IPC server. - This prevents duplicate overlays (e.g. one old show-text overlay + one new osd-overlay). + This prevents duplicate overlays (e.g. multiple lyric helpers racing to update OSD). """ global _SINGLE_INSTANCE_LOCK_FH @@ -123,39 +132,29 @@ def _ass_escape(text: str) -> str: return t -def _format_lyric_as_subtitle(text: str) -> str: - # Bottom-center like a subtitle (ASS alignment 2). - # NOTE: show-text escapes ASS by default; we use osd-overlay so this is honored. - return "{\\an2}" + _ass_escape(text) +def _osd_set_text(client: MPVIPCClient, text: str, *, duration_ms: int = 1000) -> Optional[dict]: + # Signature: show-text [] [] + # Duration 0 clears immediately; we generally set it to cover until next update. + try: + d = int(duration_ms) + except Exception: + d = 1000 + if d < 0: + d = 0 + return client.send_command({ + "command": [ + "show-text", + str(text or ""), + d, + ] + }) -def _osd_overlay_set_ass(client: MPVIPCClient, ass_text: str) -> Optional[dict]: - # Use osd-overlay with ass-events so ASS override tags (e.g. {\an2}) are applied. - # Keep z low so UI scripts (like uosc) can draw above it if they use higher z. - return client.send_command( - { - "command": { - "name": "osd-overlay", - "id": _LYRIC_OSD_OVERLAY_ID, - "format": "ass-events", - "data": ass_text, - "res_y": 720, - "z": -50, - } - } - ) - - -def _osd_overlay_clear(client: MPVIPCClient) -> None: - client.send_command( - { - "command": { - "name": "osd-overlay", - "id": _LYRIC_OSD_OVERLAY_ID, - "format": "none" - } - } - ) +def _osd_clear(client: MPVIPCClient) -> None: + try: + _osd_set_text(client, "", duration_ms=0) + except Exception: + return def _log(msg: str) -> None: @@ -191,6 +190,104 @@ def _ipc_get_property( return default +def _ipc_set_property(client: MPVIPCClient, name: str, value: Any) -> bool: + resp = client.send_command({ + "command": ["set_property", + name, + value] + }) + return bool(resp and resp.get("error") == "success") + + +def _osd_capture_style(client: MPVIPCClient) -> Dict[str, Any]: + keys = [ + "osd-align-x", + "osd-align-y", + "osd-font-size", + "osd-margin-y", + ] + out: Dict[str, Any] = {} + for k in keys: + try: + out[k] = _ipc_get_property(client, k, None) + except Exception: + out[k] = None + return out + + +def _osd_apply_lyric_style(client: MPVIPCClient, *, config: Dict[str, Any]) -> None: + """Apply bottom-center + larger font for lyric show-text messages. + + This modifies mpv's global OSD settings, so we save and restore them. + """ + global _OSD_STYLE_SAVED, _OSD_STYLE_APPLIED + + if not _OSD_STYLE_APPLIED: + if _OSD_STYLE_SAVED is None: + _OSD_STYLE_SAVED = _osd_capture_style(client) + + try: + _ipc_set_property(client, "osd-align-x", "center") + _ipc_set_property(client, "osd-align-y", "bottom") + + scale = config.get("lyric_osd_font_scale", 1.15) + try: + scale_f = float(scale) + except Exception: + scale_f = 1.15 + if scale_f < 1.0: + scale_f = 1.0 + + old_size = None + try: + if _OSD_STYLE_SAVED is not None: + old_size = _OSD_STYLE_SAVED.get("osd-font-size") + except Exception: + old_size = None + if isinstance(old_size, (int, float)): + new_size = int(max(10, round(float(old_size) * scale_f))) + else: + # mpv default is typically ~55; choose a conservative readable size. + new_size = int(config.get("lyric_osd_font_size", 64)) + + _ipc_set_property(client, "osd-font-size", new_size) + + min_margin_y = int(config.get("lyric_osd_min_margin_y", 60)) + old_margin_y = None + try: + if _OSD_STYLE_SAVED is not None: + old_margin_y = _OSD_STYLE_SAVED.get("osd-margin-y") + except Exception: + old_margin_y = None + if isinstance(old_margin_y, (int, float)): + _ipc_set_property(client, "osd-margin-y", int(max(old_margin_y, min_margin_y))) + else: + _ipc_set_property(client, "osd-margin-y", min_margin_y) + except Exception: + return + + _OSD_STYLE_APPLIED = True + + +def _osd_restore_style(client: MPVIPCClient) -> None: + global _OSD_STYLE_SAVED, _OSD_STYLE_APPLIED + + if not _OSD_STYLE_APPLIED: + return + + try: + saved = _OSD_STYLE_SAVED or {} + for k, v in saved.items(): + if v is None: + continue + try: + _ipc_set_property(client, k, v) + except Exception: + pass + finally: + _OSD_STYLE_APPLIED = False + + def _http_get_json(url: str, *, timeout_s: float = 10.0) -> Optional[dict]: try: req = Request( @@ -460,9 +557,26 @@ def _load_config_best_effort() -> dict: try: from SYS.config import load_config - cfg = load_config() - return cfg if isinstance(cfg, - dict) else {} + # `SYS.config.load_config()` defaults to loading `config.conf` from the + # SYS/ directory, but this repo keeps `config.conf` at the repo root. + # MPV.lyric is often spawned from mpv (not the CLI), so we must locate + # the repo root ourselves. + try: + repo_root = Path(__file__).resolve().parent.parent + except Exception: + repo_root = None + + cfg = None + if repo_root is not None: + try: + cfg = load_config(config_dir=repo_root) + except Exception: + cfg = None + + if cfg is None: + cfg = load_config() + + return cfg if isinstance(cfg, dict) else {} except Exception: return {} @@ -745,6 +859,22 @@ def _resolve_store_backend_for_target( return name, backend + # Fallback for Folder stores: + # If the mpv target is inside a configured Folder store root and the filename + # is hash-named, accept the inferred store even if the store DB doesn't map + # hash->path (e.g. DB missing entry, external copy, etc.). + try: + inferred = _infer_store_for_target(target=target, config=config) + if inferred and inferred in backend_names: + backend = reg[inferred] + if type(backend).__name__ == "Folder": + p = Path(target) + stem = str(p.stem or "").strip().lower() + if stem and stem == str(file_hash or "").strip().lower(): + return inferred, backend + except Exception: + pass + return None, None @@ -856,6 +986,8 @@ def run_auto_overlay( last_text: Optional[str] = None last_visible: Optional[bool] = None + global _OSD_STYLE_SAVED, _OSD_STYLE_APPLIED + while True: try: # Toggle support (mpv Lua script sets this property; default to visible). @@ -868,13 +1000,20 @@ def run_auto_overlay( raw_path = _ipc_get_property(client, "path", None, raise_on_disconnect=True) except ConnectionError: try: - _osd_overlay_clear(client) + _osd_clear(client) + except Exception: + pass + try: + _osd_restore_style(client) except Exception: pass try: client.disconnect() except Exception: pass + # If mpv restarted, recapture baseline OSD settings on reconnect. + _OSD_STYLE_SAVED = None + _OSD_STYLE_APPLIED = False if not client.connect(): _log("mpv IPC disconnected; exiting MPV.lyric") return 4 @@ -888,7 +1027,11 @@ def run_auto_overlay( elif last_visible is True and visible is False: # Clear immediately when switching off. try: - _osd_overlay_clear(client) + _osd_clear(client) + except Exception: + pass + try: + _osd_restore_style(client) except Exception: pass # Also remove any external subtitle that may be showing lyrics so @@ -923,7 +1066,11 @@ def run_auto_overlay( # Non-http streams (ytdl://, edl://, rtmp://, etc.) are never valid for lyrics. if last_loaded_key is not None: try: - _osd_overlay_clear(client) + _osd_clear(client) + except Exception: + pass + try: + _osd_restore_style(client) except Exception: pass if last_loaded_sub_path is not None: @@ -941,6 +1088,28 @@ def run_auto_overlay( time.sleep(poll_s) continue + # Optional override from the playlist controller: `.mpv` can publish the + # intended store/hash in mpv user-data. We use this both on target change + # and as a late-arriving fallback (the helper may start before `.mpv` + # sets the properties). + store_override = None + hash_override = None + try: + store_override = _ipc_get_property(client, _ITEM_STORE_PROP, None) + hash_override = _ipc_get_property(client, _ITEM_HASH_PROP, None) + except Exception: + store_override = None + hash_override = None + + try: + store_override = str(store_override).strip() if store_override else None + except Exception: + store_override = None + try: + hash_override = str(hash_override).strip().lower() if hash_override else None + except Exception: + hash_override = None + if target != last_target: last_target = target last_idx = None @@ -953,7 +1122,11 @@ def run_auto_overlay( entries = [] times = [] if last_loaded_key is not None: - _osd_overlay_clear(client) + _osd_clear(client) + try: + _osd_restore_style(client) + except Exception: + pass last_loaded_key = None last_loaded_mode = None if last_loaded_sub_path is not None: @@ -962,6 +1135,22 @@ def run_auto_overlay( time.sleep(poll_s) continue + if store_override and (not hash_override or hash_override == current_file_hash): + try: + from Store import Store as StoreRegistry + + reg = StoreRegistry(cfg, suppress_debug=True) + current_backend = reg[store_override] + current_store_name = store_override + current_key = f"{current_store_name}:{current_file_hash}" + _log( + f"Resolved via mpv override store={current_store_name!r} hash={current_file_hash!r} valid=True" + ) + except Exception: + current_backend = None + current_store_name = None + current_key = None + if is_http: # HTTP/HTTPS targets are only valid if they map to a store backend. store_from_url = _extract_store_from_url_target(target) @@ -977,7 +1166,11 @@ def run_auto_overlay( entries = [] times = [] if last_loaded_key is not None: - _osd_overlay_clear(client) + _osd_clear(client) + try: + _osd_restore_style(client) + except Exception: + pass last_loaded_key = None last_loaded_mode = None if last_loaded_sub_path is not None: @@ -1002,7 +1195,11 @@ def run_auto_overlay( entries = [] times = [] if last_loaded_key is not None: - _osd_overlay_clear(client) + _osd_clear(client) + try: + _osd_restore_style(client) + except Exception: + pass last_loaded_key = None last_loaded_mode = None if last_loaded_sub_path is not None: @@ -1026,7 +1223,11 @@ def run_auto_overlay( entries = [] times = [] if last_loaded_key is not None: - _osd_overlay_clear(client) + _osd_clear(client) + try: + _osd_restore_style(client) + except Exception: + pass last_loaded_key = None last_loaded_mode = None if last_loaded_sub_path is not None: @@ -1042,15 +1243,16 @@ def run_auto_overlay( else: # Local files: resolve store item via store DB. If not resolvable, lyrics are disabled. - current_store_name, current_backend = _resolve_store_backend_for_target( - target=target, - file_hash=current_file_hash, - config=cfg, - ) - current_key = ( - f"{current_store_name}:{current_file_hash}" - if current_store_name and current_file_hash else None - ) + if not current_key or not current_backend: + current_store_name, current_backend = _resolve_store_backend_for_target( + target=target, + file_hash=current_file_hash, + config=cfg, + ) + current_key = ( + f"{current_store_name}:{current_file_hash}" + if current_store_name and current_file_hash else None + ) _log( f"Resolved store={current_store_name!r} hash={current_file_hash!r} valid={bool(current_key)}" @@ -1063,7 +1265,11 @@ def run_auto_overlay( entries = [] times = [] if last_loaded_key is not None: - _osd_overlay_clear(client) + _osd_clear(client) + try: + _osd_restore_style(client) + except Exception: + pass last_loaded_key = None last_loaded_mode = None if last_loaded_sub_path is not None: @@ -1072,6 +1278,29 @@ def run_auto_overlay( time.sleep(poll_s) continue + # Late-arriving context fallback: if we still don't have a store/backend for a + # local file, but `.mpv` has since populated user-data overrides, apply them + # without requiring a track change. + if (not is_http) and target and (not current_key or not current_backend): + try: + current_file_hash = _infer_hash_for_target(target) or current_file_hash + except Exception: + pass + + if (store_override and current_file_hash and (not hash_override or hash_override == current_file_hash)): + try: + from Store import Store as StoreRegistry + + reg = StoreRegistry(cfg, suppress_debug=True) + current_backend = reg[store_override] + current_store_name = store_override + current_key = f"{current_store_name}:{current_file_hash}" + _log( + f"Resolved via mpv override store={current_store_name!r} hash={current_file_hash!r} valid=True" + ) + except Exception: + pass + # Load/reload lyrics when we have a resolvable key and it differs from what we loaded. # This is important for the autofetch path: the note can appear without the mpv target changing. if (current_key and current_key != last_loaded_key and current_store_name @@ -1097,7 +1326,11 @@ def run_auto_overlay( if sub_text: # Treat subtitles as an alternative to lyrics; do not show the lyric overlay. try: - _osd_overlay_clear(client) + _osd_clear(client) + except Exception: + pass + try: + _osd_restore_style(client) except Exception: pass @@ -1194,30 +1427,56 @@ def run_auto_overlay( entries = [] times = [] if last_loaded_key is not None: - _osd_overlay_clear(client) + _osd_clear(client) + try: + _osd_restore_style(client) + except Exception: + pass last_loaded_key = None last_loaded_mode = None else: - _log(f"Loaded lyric note ({len(lrc_text)} chars)") + if not lrc_text: + # No lyric note, and we didn't run autofetch this tick. + # Clear any previous overlay and avoid crashing on None. + try: + _osd_clear(client) + except Exception: + pass + try: + _osd_restore_style(client) + except Exception: + pass + entries = [] + times = [] + last_loaded_key = current_key + last_loaded_mode = None + else: + _log(f"Loaded lyric note ({len(lrc_text)} chars)") - parsed = parse_lrc(lrc_text) - entries = parsed - times = [e.time_s for e in entries] - last_loaded_key = current_key - last_loaded_mode = "lyric" + parsed = parse_lrc(lrc_text) + entries = parsed + times = [e.time_s for e in entries] + last_loaded_key = current_key + last_loaded_mode = "lyric" try: # mpv returns None when idle/no file. t = _ipc_get_property(client, "time-pos", None, raise_on_disconnect=True) except ConnectionError: try: - _osd_overlay_clear(client) + _osd_clear(client) + except Exception: + pass + try: + _osd_restore_style(client) except Exception: pass try: client.disconnect() except Exception: pass + _OSD_STYLE_SAVED = None + _OSD_STYLE_APPLIED = False if not client.connect(): _log("mpv IPC disconnected; exiting MPV.lyric") return 4 @@ -1229,10 +1488,34 @@ def run_auto_overlay( continue if not entries: + # Nothing to show; ensure any previous text is cleared. + if last_text is not None: + try: + _osd_clear(client) + except Exception: + pass + try: + _osd_restore_style(client) + except Exception: + pass + last_text = None + last_idx = None time.sleep(poll_s) continue if not visible: + # User toggled lyrics off. + if last_text is not None: + try: + _osd_clear(client) + except Exception: + pass + try: + _osd_restore_style(client) + except Exception: + pass + last_text = None + last_idx = None time.sleep(poll_s) continue @@ -1244,8 +1527,23 @@ def run_auto_overlay( line = entries[idx] if idx != last_idx or line.text != last_text: - # osd-overlay has no duration; refresh periodically. - resp = _osd_overlay_set_ass(client, _format_lyric_as_subtitle(line.text)) + try: + if last_loaded_mode == "lyric": + _osd_apply_lyric_style(client, config=cfg) + except Exception: + pass + + # Show until the next lyric timestamp (or a sane max) to avoid flicker. + dur_ms = 1200 + try: + if idx + 1 < len(times): + nxt = float(times[idx + 1]) + cur = float(t) + dur_ms = int(max(250, min(8000, (nxt - cur) * 1000))) + except Exception: + dur_ms = 1200 + + resp = _osd_set_text(client, line.text, duration_ms=dur_ms) if resp is None: client.disconnect() if not client.connect(): @@ -1253,7 +1551,7 @@ def run_auto_overlay( return 4 elif isinstance(resp, dict) and resp.get("error") not in (None, "success"): try: - _log(f"mpv osd-overlay returned error={resp.get('error')!r}") + _log(f"mpv show-text returned error={resp.get('error')!r}") except Exception: pass last_idx = idx @@ -1285,7 +1583,7 @@ def run_overlay(*, mpv: MPV, entries: List[LrcLine], poll_s: float = 0.15) -> in t = _ipc_get_property(client, "time-pos", None, raise_on_disconnect=True) except ConnectionError: try: - _osd_overlay_clear(client) + _osd_clear(client) except Exception: pass try: @@ -1311,8 +1609,17 @@ def run_overlay(*, mpv: MPV, entries: List[LrcLine], poll_s: float = 0.15) -> in line = entries[idx] if idx != last_idx or line.text != last_text: - # osd-overlay has no duration; refresh periodically. - resp = _osd_overlay_set_ass(client, _format_lyric_as_subtitle(line.text)) + # Show until the next lyric timestamp (or a sane max) to avoid flicker. + dur_ms = 1200 + try: + if idx + 1 < len(times): + nxt = float(times[idx + 1]) + cur = float(t) + dur_ms = int(max(250, min(8000, (nxt - cur) * 1000))) + except Exception: + dur_ms = 1200 + + resp = _osd_set_text(client, line.text, duration_ms=dur_ms) if resp is None: client.disconnect() if not client.connect(): @@ -1320,7 +1627,7 @@ def run_overlay(*, mpv: MPV, entries: List[LrcLine], poll_s: float = 0.15) -> in return 4 elif isinstance(resp, dict) and resp.get("error") not in (None, "success"): try: - _log(f"mpv osd-overlay returned error={resp.get('error')!r}") + _log(f"mpv show-text returned error={resp.get('error')!r}") except Exception: pass last_idx = idx diff --git a/MPV/mpv_ipc.py b/MPV/mpv_ipc.py index 7d47f96..fa0fad8 100644 --- a/MPV/mpv_ipc.py +++ b/MPV/mpv_ipc.py @@ -488,7 +488,6 @@ class MPV: tmp_dir = Path(os.environ.get("TEMP") or os.environ.get("TMP") or ".") except Exception: tmp_dir = Path(".") - log_path = str((tmp_dir / "medeia-mpv-lyric.log").resolve()) # Ensure the module can be imported even when the app is launched from a different cwd. # Repo root = parent of the MPV package directory. @@ -497,6 +496,17 @@ class MPV: except Exception: repo_root = Path.cwd() + # Prefer a stable in-repo log so users can inspect it easily. + log_path = None + try: + log_dir = (repo_root / "Log") + log_dir.mkdir(parents=True, exist_ok=True) + log_path = str((log_dir / "medeia-mpv-lyric.log").resolve()) + except Exception: + log_path = None + if not log_path: + log_path = str((tmp_dir / "medeia-mpv-lyric.log").resolve()) + py = sys.executable if platform.system() == "Windows": py = _windows_pythonw_exe(py) or py diff --git a/MPV/portable_config/script-opts/medeia.conf b/MPV/portable_config/script-opts/medeia.conf index 4077249..0cdb96d 100644 --- a/MPV/portable_config/script-opts/medeia.conf +++ b/MPV/portable_config/script-opts/medeia.conf @@ -1,2 +1,2 @@ # Medeia MPV script options -store=tutorial +store=default diff --git a/Provider/HIFI.py b/Provider/HIFI.py index 8c150c5..e5fd819 100644 --- a/Provider/HIFI.py +++ b/Provider/HIFI.py @@ -1,12 +1,15 @@ from __future__ import annotations import re +import shutil import sys +from pathlib import Path +import subprocess from typing import Any, Dict, List, Optional, Tuple from API.hifi import HifiApiClient from ProviderCore.base import Provider, SearchResult -from SYS.logger import log +from SYS.logger import debug, log DEFAULT_API_URLS = ( "https://tidal-api.binimum.org", @@ -27,6 +30,10 @@ _SEGMENT_BOUNDARY_RE = re.compile(r"(?=\b\w+\s*:)") class HIFI(Provider): + + TABLE_AUTO_PREFIXES = { + "hifi": ["download-file"], + } """Provider that targets the HiFi-RestAPI (Tidal proxy) search endpoint. The CLI can supply a list of fail-over URLs via ``provider.hifi.api_urls`` or @@ -86,6 +93,379 @@ class HIFI(Provider): return results[:limit] + @staticmethod + def _safe_filename(value: Any, *, fallback: str = "hifi") -> str: + text = str(value or "").strip() + if not text: + return fallback + text = re.sub(r"[<>:\"/\\|?*\x00-\x1f]", "_", text) + text = re.sub(r"\s+", " ", text).strip().strip(". ") + return text[:120] if text else fallback + + @staticmethod + def _parse_track_id(value: Any) -> Optional[int]: + if value is None: + return None + try: + track_id = int(value) + except Exception: + return None + return track_id if track_id > 0 else None + + def _extract_track_id_from_result(self, result: SearchResult) -> Optional[int]: + md = getattr(result, "full_metadata", None) + if isinstance(md, dict): + track_id = self._parse_track_id(md.get("trackId") or md.get("id")) + if track_id: + return track_id + + path = str(getattr(result, "path", "") or "").strip() + if path: + m = re.search(r"hifi:(?://)?track[\\/](\d+)", path, flags=re.IGNORECASE) + if m: + return self._parse_track_id(m.group(1)) + return None + + @staticmethod + def _find_ffmpeg() -> Optional[str]: + exe = shutil.which("ffmpeg") + if exe: + return exe + try: + repo_root = Path(__file__).resolve().parents[1] + bundled = repo_root / "MPV" / "ffmpeg" / "bin" / "ffmpeg.exe" + if bundled.is_file(): + return str(bundled) + except Exception: + pass + return None + + @staticmethod + def _find_ffprobe() -> Optional[str]: + exe = shutil.which("ffprobe") + if exe: + return exe + try: + repo_root = Path(__file__).resolve().parents[1] + bundled = repo_root / "MPV" / "ffmpeg" / "bin" / "ffprobe.exe" + if bundled.is_file(): + return str(bundled) + except Exception: + pass + return None + + def _probe_audio_codec(self, input_ref: str) -> Optional[str]: + """Best-effort probe for primary audio codec name (lowercase).""" + + candidate = str(input_ref or "").strip() + if not candidate: + return None + + ffprobe_path = self._find_ffprobe() + if ffprobe_path: + cmd = [ + ffprobe_path, + "-v", + "error", + "-select_streams", + "a:0", + "-show_entries", + "stream=codec_name", + "-of", + "default=nw=1:nk=1", + candidate, + ] + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + check=False, + ) + if proc.returncode == 0: + codec = str(proc.stdout or "").strip().lower() + if codec: + return codec + except Exception: + pass + + # Fallback: parse `ffmpeg -i` stream info. + ffmpeg_path = self._find_ffmpeg() + if not ffmpeg_path: + return None + try: + proc = subprocess.run( + [ffmpeg_path, "-hide_banner", "-i", candidate], + capture_output=True, + text=True, + check=False, + ) + text = (proc.stderr or "") + "\n" + (proc.stdout or "") + m = re.search(r"Audio:\s*([A-Za-z0-9_]+)", text) + if m: + return str(m.group(1)).strip().lower() + except Exception: + pass + return None + + @staticmethod + def _preferred_audio_suffix(codec: Optional[str], metadata: Optional[Dict[str, Any]] = None) -> str: + c = str(codec or "").strip().lower() + if c == "flac": + return ".flac" + if c in {"aac", "alac"}: + return ".m4a" + # Default to Matroska Audio for unknown / uncommon codecs. + return ".mka" + + @staticmethod + def _has_nonempty_file(path: Path) -> bool: + try: + return path.is_file() and path.stat().st_size > 0 + except Exception: + return False + + def _ffmpeg_demux_to_audio( + self, + *, + input_ref: str, + output_path: Path, + lossless_fallback: bool = True, + ) -> Optional[Path]: + ffmpeg_path = self._find_ffmpeg() + if not ffmpeg_path: + debug("[hifi] ffmpeg not found; cannot materialize audio from MPD") + return None + + if self._has_nonempty_file(output_path): + return output_path + + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + except Exception: + pass + + protocol_whitelist = "file,https,http,tcp,tls,crypto,data" + + def _run(cmd: List[str]) -> bool: + try: + proc = subprocess.run( + cmd, + capture_output=True, + text=True, + check=False, + ) + if proc.returncode == 0 and self._has_nonempty_file(output_path): + return True + if proc.stderr: + debug(f"[hifi] ffmpeg failed: {proc.stderr.strip()}") + except Exception as exc: + debug(f"[hifi] ffmpeg invocation failed: {exc}") + return False + + # Prefer remux (fast, no transcode). + cmd_copy = [ + ffmpeg_path, + "-y", + "-hide_banner", + "-loglevel", + "error", + "-protocol_whitelist", + protocol_whitelist, + "-i", + str(input_ref), + "-vn", + "-c", + "copy", + str(output_path), + ] + if _run(cmd_copy): + return output_path + + if not lossless_fallback: + return None + + # Fallback: decode/transcode to FLAC to guarantee a supported file. + flac_path = ( + output_path + if output_path.suffix.lower() == ".flac" + else output_path.with_suffix(".flac") + ) + if self._has_nonempty_file(flac_path): + return flac_path + + # Avoid leaving a partial FLAC behind if we're transcoding into the final name. + tmp_flac_path = flac_path + if flac_path == output_path: + tmp_flac_path = output_path.with_name(f"{output_path.stem}.tmp{output_path.suffix}") + + cmd_flac = [ + ffmpeg_path, + "-y", + "-hide_banner", + "-loglevel", + "error", + "-protocol_whitelist", + protocol_whitelist, + "-i", + str(input_ref), + "-vn", + "-c:a", + "flac", + str(tmp_flac_path), + ] + try: + proc = subprocess.run( + cmd_flac, + capture_output=True, + text=True, + check=False, + ) + if proc.returncode == 0 and self._has_nonempty_file(tmp_flac_path): + if tmp_flac_path != flac_path: + try: + tmp_flac_path.replace(flac_path) + except Exception: + # If rename fails, still return the temp file. + return tmp_flac_path + return flac_path + if proc.stderr: + debug(f"[hifi] ffmpeg flac fallback failed: {proc.stderr.strip()}") + except Exception as exc: + debug(f"[hifi] ffmpeg flac fallback invocation failed: {exc}") + return None + + def download(self, result: SearchResult, output_dir: Path) -> Optional[Path]: + """Materialize a playable audio file from a Tidal DASH manifest.""" + + try: + output_dir.mkdir(parents=True, exist_ok=True) + except Exception: + pass + + raw_path = str(getattr(result, "path", "") or "").strip() + + md: Dict[str, Any] = {} + if isinstance(getattr(result, "full_metadata", None), dict): + md = dict(getattr(result, "full_metadata") or {}) + + if not md.get("manifest"): + track_id = self._extract_track_id_from_result(result) + if track_id: + detail = self._fetch_track_details(track_id) + if isinstance(detail, dict) and detail: + try: + md.update(detail) + except Exception: + md = detail + + # Best-effort: fetch synced lyric subtitles for MPV (LRC). + try: + track_id_for_lyrics = self._extract_track_id_from_result(result) + except Exception: + track_id_for_lyrics = None + if track_id_for_lyrics and not md.get("_tidal_lyrics_subtitles"): + lyr = self._fetch_track_lyrics(track_id_for_lyrics) + if isinstance(lyr, dict) and lyr: + try: + md.setdefault("lyrics", lyr) + except Exception: + pass + try: + subtitles = lyr.get("subtitles") + if isinstance(subtitles, str) and subtitles.strip(): + md["_tidal_lyrics_subtitles"] = subtitles.strip() + except Exception: + pass + + # Ensure downstream cmdlets see our enriched metadata. + try: + if isinstance(getattr(result, "full_metadata", None), dict): + result.full_metadata.update(md) + else: + result.full_metadata = md + except Exception: + pass + + try: + from cmdlet._shared import resolve_tidal_manifest_path + except Exception: + return None + + resolved = resolve_tidal_manifest_path({"full_metadata": md, "path": raw_path, "title": getattr(result, "title", "")}) + if not resolved: + return None + + resolved_text = str(resolved).strip() + if not resolved_text: + return None + + track_id = self._extract_track_id_from_result(result) + title_part = self._safe_filename(getattr(result, "title", None), fallback="hifi") + hash_part = self._safe_filename(md.get("manifestHash"), fallback="") + + stem_parts = ["hifi"] + if track_id: + stem_parts.append(str(track_id)) + if hash_part: + stem_parts.append(hash_part[:12]) + if title_part: + stem_parts.append(title_part) + stem = "-".join([p for p in stem_parts if p])[:180].rstrip("- ") + + codec = self._probe_audio_codec(resolved_text) + suffix = self._preferred_audio_suffix(codec, md) + + # If resolve_tidal_manifest_path returned a URL, prefer feeding it directly to ffmpeg. + if resolved_text.lower().startswith("http"): + out_file = output_dir / f"{stem}{suffix}" + materialized = self._ffmpeg_demux_to_audio(input_ref=resolved_text, output_path=out_file) + if materialized is not None: + return materialized + + # As a fallback, try downloading the URL directly if it looks like a file. + try: + import httpx + + resp = httpx.get(resolved_text, timeout=float(getattr(self, "api_timeout", 10.0))) + resp.raise_for_status() + content = resp.content + direct_path = output_dir / f"{stem}.bin" + with open(direct_path, "wb") as fh: + fh.write(content) + return direct_path + except Exception: + return None + + try: + source_path = Path(resolved_text) + except Exception: + return None + + if source_path.is_file() and source_path.suffix.lower() == ".mpd": + # Materialize audio from the local MPD. + out_file = output_dir / f"{stem}{suffix}" + materialized = self._ffmpeg_demux_to_audio(input_ref=str(source_path), output_path=out_file) + if materialized is not None: + return materialized + return None + + # If we somehow got a local audio file already, copy it to output_dir. + if source_path.is_file() and source_path.suffix.lower() in {".m4a", ".mp3", ".flac", ".wav", ".mka", ".mp4"}: + dest = output_dir / f"{stem}{source_path.suffix.lower()}" + if self._has_nonempty_file(dest): + return dest + try: + shutil.copyfile(source_path, dest) + return dest + except Exception: + return None + + # As a last resort, attempt to treat the local path as an ffmpeg input. + out_file = output_dir / f"{stem}{suffix}" + materialized = self._ffmpeg_demux_to_audio(input_ref=resolved_text, output_path=out_file) + return materialized + def _get_api_client_for_base(self, base_url: str) -> Optional[HifiApiClient]: base = base_url.rstrip("/") for client in self.api_clients: @@ -126,6 +506,8 @@ class HIFI(Provider): deduped: List[Dict[str, Any]] = [] for item in items: track_id = item.get("id") or item.get("trackId") + if track_id is None: + continue try: track_int = int(track_id) except Exception: @@ -381,6 +763,29 @@ class HIFI(Provider): continue return None + def _fetch_track_lyrics(self, track_id: int) -> Optional[Dict[str, Any]]: + if track_id <= 0: + return None + for base in self.api_urls: + endpoint = f"{base.rstrip('/')}/lyrics/" + try: + client = self._get_api_client_for_base(base) + payload = client.lyrics(track_id) if client else None + if not isinstance(payload, dict): + continue + + lyrics_obj = payload.get("lyrics") + if isinstance(lyrics_obj, dict) and lyrics_obj: + return lyrics_obj + + data_obj = payload.get("data") + if isinstance(data_obj, dict) and data_obj: + return data_obj + except Exception as exc: + debug(f"[hifi] Lyrics lookup failed for {endpoint}: {exc}") + continue + return None + def _build_track_columns(self, detail: Dict[str, Any], track_id: int) -> List[Tuple[str, str]]: values: List[Tuple[str, str]] = [ ("Track ID", str(track_id)), @@ -405,6 +810,27 @@ class HIFI(Provider): if not stage_is_last: return False + try: + current_table = ctx.get_current_stage_table() + except Exception: + current_table = None + table_type = ( + current_table.table + if current_table and hasattr(current_table, "table") + else None + ) + if isinstance(table_type, str) and table_type.strip().lower() == "hifi.track": + try: + meta = ( + current_table.get_table_metadata() + if current_table is not None and hasattr(current_table, "get_table_metadata") + else {} + ) + except Exception: + meta = {} + if isinstance(meta, dict) and meta.get("resolved_manifest"): + return False + contexts = self._extract_track_selection_context(selected_items) if not contexts: return False @@ -426,6 +852,10 @@ class HIFI(Provider): table = ResultTable("HIFI Track").set_preserve_order(True) table.set_table("hifi.track") + try: + table.set_table_metadata({"provider": "hifi", "view": "track", "resolved_manifest": True}) + except Exception: + pass results_payload: List[Dict[str, Any]] = [] for track_id, title, path, detail in track_details: # Decode the DASH MPD manifest to a local file and use it as the selectable/playable path. @@ -455,7 +885,7 @@ class HIFI(Provider): columns.insert(insert_pos, ("Album", album_title)) result = SearchResult( - table="hifi.track", + table="hifi", title=title, path=resolved_path, detail=f"id:{track_id}", diff --git a/Provider/bandcamp.py b/Provider/bandcamp.py index 4ef7f61..4a42bfc 100644 --- a/Provider/bandcamp.py +++ b/Provider/bandcamp.py @@ -16,6 +16,10 @@ except ImportError: # pragma: no cover class Bandcamp(Provider): """Search provider for Bandcamp.""" + TABLE_AUTO_STAGES = { + "bandcamp": ["download-file"], + } + @staticmethod def _base_url(raw_url: str) -> str: """Normalize a Bandcamp URL down to scheme://netloc.""" diff --git a/Provider/internetarchive.py b/Provider/internetarchive.py index 5964c78..f0f2a4f 100644 --- a/Provider/internetarchive.py +++ b/Provider/internetarchive.py @@ -27,8 +27,8 @@ def maybe_show_formats_table( Returns an exit code when handled; otherwise None. """ - if quiet_mode: - return None + # Do not suppress the picker in quiet/background mode: this selector UX is + # required for Internet Archive "details" pages (which are not directly downloadable). try: total_inputs = int(len(raw_urls or []) + len(piped_items or [])) @@ -107,7 +107,7 @@ def maybe_show_formats_table( base_args.extend(["-path", str(out_arg)]) table = ResultTable(table_title).set_preserve_order(True) - table.set_table("internetarchive.formats") + table.set_table("internetarchive.format") table.set_source_command("download-file", base_args) rows: List[Dict[str, Any]] = [] @@ -474,6 +474,13 @@ class InternetArchive(Provider): """ URL = ("archive.org",) + TABLE_AUTO_STAGES = { + "internetarchive": ["download-file"], + "internetarchive.folder": ["download-file"], + "internetarchive.format": ["download-file"], + "internetarchive.formats": ["download-file"], + } + def __init__(self, config: Optional[Dict[str, Any]] = None): super().__init__(config) conf = _pick_provider_config(self.config) @@ -577,7 +584,7 @@ class InternetArchive(Provider): path = f"https://archive.org/details/{identifier}" sr = SearchResult( - table="internetarchive", + table="internetarchive.folder", title=title, path=path, detail=" ยท ".join(detail_parts), diff --git a/Provider/libgen.py b/Provider/libgen.py index 54e27e9..5b96475 100644 --- a/Provider/libgen.py +++ b/Provider/libgen.py @@ -656,6 +656,10 @@ def _libgen_metadata_to_tags(meta: Dict[str, Any]) -> List[str]: class Libgen(Provider): + + TABLE_AUTO_STAGES = { + "libgen": ["download-file"], + } # Domains that should be routed to this provider when the user supplies a URL. # (Used by ProviderCore.registry.match_provider_name_for_url) URL_DOMAINS = ( diff --git a/Provider/openlibrary.py b/Provider/openlibrary.py index 6012934..47d85d6 100644 --- a/Provider/openlibrary.py +++ b/Provider/openlibrary.py @@ -214,10 +214,15 @@ def _archive_id_from_url(url: str) -> str: # - /details//... # - /borrow/ # - /download//... - if len(parts) >= 2 and parts[0].lower() in {"details", - "borrow", - "download", - "stream"}: + # - /stream//... + # - /metadata/ + if len(parts) >= 2 and parts[0].lower() in { + "details", + "borrow", + "download", + "stream", + "metadata", + }: return str(parts[1]).strip() # Sometimes the identifier is the first segment. @@ -225,37 +230,38 @@ def _archive_id_from_url(url: str) -> str: first = str(parts[0]).strip() if first and first.lower() not in {"account", "services", + "metadata", "search", "advancedsearch.php"}: return first - - def edition_id_from_url(u: str) -> str: - """Extract an OpenLibrary edition id (OL...M) from a book URL.""" - try: - p = urlparse(str(u)) - parts = [x for x in (p.path or "").split("/") if x] - except Exception: - parts = [] - if len(parts) >= 2 and str(parts[0]).lower() == "books": - return str(parts[1]).strip() - return "" - - - def title_hint_from_url_slug(u: str) -> str: - """Derive a human-friendly title hint from the URL slug.""" - try: - p = urlparse(str(u)) - parts = [x for x in (p.path or "").split("/") if x] - slug = parts[-1] if parts else "" - except Exception: - slug = "" - slug = (slug or "").strip().replace("_", " ") - return slug or "OpenLibrary" - return "" +def edition_id_from_url(u: str) -> str: + """Extract an OpenLibrary edition id (OL...M) from a book URL.""" + try: + p = urlparse(str(u)) + parts = [x for x in (p.path or "").split("/") if x] + except Exception: + parts = [] + if len(parts) >= 2 and str(parts[0]).lower() == "books": + return str(parts[1]).strip() + return "" + + +def title_hint_from_url_slug(u: str) -> str: + """Derive a human-friendly title hint from the URL slug.""" + try: + p = urlparse(str(u)) + parts = [x for x in (p.path or "").split("/") if x] + slug = parts[-1] if parts else "" + except Exception: + slug = "" + slug = (slug or "").strip().replace("_", " ") + return slug or "OpenLibrary" + + def _coerce_archive_field_list(value: Any) -> List[str]: """Coerce an Archive.org metadata field to a list of strings.""" if value is None: @@ -433,6 +439,22 @@ def _fetch_archive_item_metadata(archive_id: str, class OpenLibrary(Provider): + + TABLE_AUTO_STAGES = { + "openlibrary": ["download-file"], + } + + REQUIRED_CONFIG_KEYS = ( + "email", + "password", + ) + + DEFAULT_ARCHIVE_SCALE = 4 + QUALITY_TO_ARCHIVE_SCALE = { + "high": 2, + "medium": 5, + "low": 8, + } # Domains that should be routed to this provider when the user supplies a URL. # (Used by ProviderCore.registry.match_provider_name_for_url) URL_DOMAINS = ( @@ -449,6 +471,41 @@ class OpenLibrary(Provider): class BookNotAvailableError(Exception): """Raised when a book is not available for borrowing (waitlisted/in use).""" + def search_result_from_url(self, url: str) -> Optional[SearchResult]: + """Build a minimal SearchResult from a bare OpenLibrary/Archive URL.""" + edition_id = edition_id_from_url(url) + title_hint = title_hint_from_url_slug(url) + return SearchResult( + table="openlibrary", + title=title_hint, + path=str(url), + media_kind="book", + full_metadata={"openlibrary_id": edition_id} if edition_id else {}, + ) + + def download_url( + self, + url: str, + output_dir: Path, + progress_callback: Optional[Callable[[str, int, Optional[int], str], None]] = None, + ) -> Optional[Dict[str, Any]]: + """Download a book directly from an OpenLibrary/Archive URL. + + Returns a dict with the downloaded path and SearchResult when successful. + """ + sr = self.search_result_from_url(url) + if sr is None: + return None + + downloaded = self.download(sr, output_dir, progress_callback) + if not downloaded: + return None + + return { + "path": Path(downloaded), + "search_result": sr, + } + @staticmethod def _credential_archive(config: Dict[str, Any]) -> Tuple[Optional[str], @@ -491,6 +548,57 @@ class OpenLibrary(Provider): str(password) if password is not None else None ) + @classmethod + def _archive_scale_from_config(cls, config: Dict[str, Any]) -> int: + """Resolve Archive.org book-reader scale from provider config. + + Config: + [provider=OpenLibrary] + quality="medium" # High=2, Medium=5, Low=8 + + Default when missing/invalid: 4. + """ + + default_scale = int(getattr(cls, "DEFAULT_ARCHIVE_SCALE", 4) or 4) + if not isinstance(config, dict): + return default_scale + + provider_config = config.get("provider", {}) + openlibrary_config = None + if isinstance(provider_config, dict): + openlibrary_config = provider_config.get("openlibrary") + if not isinstance(openlibrary_config, dict): + openlibrary_config = {} + + raw_quality = openlibrary_config.get("quality") + if raw_quality is None: + return default_scale + + if isinstance(raw_quality, (int, float)): + try: + val = int(raw_quality) + except Exception: + return default_scale + return val if val > 0 else default_scale + + try: + q = str(raw_quality).strip().lower() + except Exception: + return default_scale + if not q: + return default_scale + + mapped = cls.QUALITY_TO_ARCHIVE_SCALE.get(q) + if isinstance(mapped, int) and mapped > 0: + return mapped + + # Allow numeric strings (e.g. quality="4"). + try: + val = int(q) + except Exception: + return default_scale + return val if val > 0 else default_scale + @staticmethod def _archive_error_body(response: requests.Response) -> str: try: @@ -1444,64 +1552,6 @@ class OpenLibrary(Provider): log("[openlibrary] Direct download failed", file=sys.stderr) return None - # --- Convenience helpers for URL-driven downloads (used by download-file) --- - - def search_result_from_url(self, url: str) -> Optional[SearchResult]: - """Build a minimal SearchResult from a bare OpenLibrary URL.""" - edition_id = edition_id_from_url(url) - title_hint = title_hint_from_url_slug(url) - return SearchResult( - table="openlibrary", - title=title_hint, - path=str(url), - media_kind="book", - full_metadata={"openlibrary_id": edition_id} if edition_id else {}, - ) - - def download_url( - self, - url: str, - output_dir: Path, - progress_callback: Optional[Callable[[str, int, Optional[int], str], None]] = None, - ) -> Optional[Dict[str, Any]]: - """Download a book directly from an OpenLibrary URL. - - Returns a dict with the downloaded path and SearchResult when successful. - """ - sr = self.search_result_from_url(url) - if sr is None: - return None - - downloaded = self.download(sr, output_dir, progress_callback) - if not downloaded: - return None - - return { - "path": Path(downloaded), - "search_result": sr, - } - try: - if progress_callback is not None: - progress_callback("step", 0, None, "direct download") - except Exception: - pass - out_path = unique_path(output_dir / f"{safe_title}.pdf") - ok = download_file( - pdf_url, - out_path, - session=self._session, - progress_callback=( - ( - lambda downloaded, total, label: - progress_callback("bytes", downloaded, total, label) - ) if progress_callback is not None else None - ), - ) - if ok: - return out_path - log("[openlibrary] Direct download failed", file=sys.stderr) - return None - # 2) Borrow flow (credentials required). try: email, password = self._credential_archive(self.config or {}) @@ -1510,6 +1560,15 @@ class OpenLibrary(Provider): "[openlibrary] Archive credentials missing; cannot borrow", file=sys.stderr ) + try: + from SYS.rich_display import show_provider_config_panel + + show_provider_config_panel( + "openlibrary", + keys=self.required_config_keys(), + ) + except Exception: + pass return None lendable = True @@ -1590,7 +1649,7 @@ class OpenLibrary(Provider): n_threads=10, directory=temp_dir, links=links, - scale=3, + scale=self._archive_scale_from_config(self.config or {}), book_id=archive_id, progress_callback=( ( diff --git a/Provider/podcastindex.py b/Provider/podcastindex.py index 8c2e3b9..665d2d0 100644 --- a/Provider/podcastindex.py +++ b/Provider/podcastindex.py @@ -29,6 +29,11 @@ def _get_podcastindex_credentials(config: Dict[str, Any]) -> Tuple[str, str]: class PodcastIndex(Provider): """Search provider for PodcastIndex.org.""" + TABLE_AUTO_STAGES = { + "podcastindex": ["download-file"], + "podcastindex.episodes": ["download-file"], + } + @staticmethod def _format_duration(value: Any) -> str: def _to_seconds(v: Any) -> Optional[int]: diff --git a/Provider/soulseek.py b/Provider/soulseek.py index 12da283..19dc0e4 100644 --- a/Provider/soulseek.py +++ b/Provider/soulseek.py @@ -204,6 +204,10 @@ def _suppress_aioslsk_noise() -> Any: class Soulseek(Provider): + + TABLE_AUTO_STAGES = { + "soulseek": ["download-file"], + } """Search provider for Soulseek P2P network.""" MUSIC_EXTENSIONS = { diff --git a/Provider/youtube.py b/Provider/youtube.py index 3db0a70..8fa722b 100644 --- a/Provider/youtube.py +++ b/Provider/youtube.py @@ -10,6 +10,12 @@ from SYS.logger import log class YouTube(Provider): """Search provider for YouTube using the yt_dlp Python package.""" + TABLE_AUTO_STAGES = { + "youtube": ["download-file"], + } + # If the user provides extra args on the selection stage, forward them to download-file. + AUTO_STAGE_USE_SELECTION_ARGS = True + def search( self, query: str, diff --git a/ProviderCore/base.py b/ProviderCore/base.py index 467b542..2f6a8c4 100644 --- a/ProviderCore/base.py +++ b/ProviderCore/base.py @@ -55,10 +55,40 @@ class Provider(ABC): URL: Sequence[str] = () + # Optional provider-driven defaults for what to do when a user selects @N from a + # provider table. The CLI uses this to auto-insert stages (e.g. download-file) + # without hardcoding table names. + # + # Example: + # TABLE_AUTO_STAGES = {"youtube": ["download-file"]} + # TABLE_AUTO_PREFIXES = {"hifi": ["download-file"]} # matches hifi.* + TABLE_AUTO_STAGES: Dict[str, Sequence[str]] = {} + TABLE_AUTO_PREFIXES: Dict[str, Sequence[str]] = {} + AUTO_STAGE_USE_SELECTION_ARGS: bool = False + + # Optional provider-declared configuration keys. + # Used for dynamically generating config panels (e.g., missing credentials). + REQUIRED_CONFIG_KEYS: Sequence[str] = () + def __init__(self, config: Optional[Dict[str, Any]] = None): self.config = config or {} self.name = self.__class__.__name__.lower() + @classmethod + def required_config_keys(cls) -> List[str]: + keys = getattr(cls, "REQUIRED_CONFIG_KEYS", None) + if not keys: + return [] + out: List[str] = [] + try: + for k in list(keys): + s = str(k or "").strip() + if s: + out.append(s) + except Exception: + return [] + return out + # Standard lifecycle/auth hook. def login(self, **_kwargs: Any) -> bool: return True @@ -109,6 +139,56 @@ class Provider(ABC): _ = stage_is_last return False + @classmethod + def selection_auto_stage( + cls, + table_type: str, + stage_args: Optional[Sequence[str]] = None, + ) -> Optional[List[str]]: + """Return a stage to auto-run after selecting from `table_type`. + + This is used by the CLI to auto-insert default stages for provider tables + (e.g. select a YouTube row -> auto-run download-file). + + Providers can implement this via class attributes (TABLE_AUTO_STAGES / + TABLE_AUTO_PREFIXES) or by overriding this method. + """ + t = str(table_type or "").strip().lower() + if not t: + return None + + stage: Optional[Sequence[str]] = None + try: + stage = cls.TABLE_AUTO_STAGES.get(t) + except Exception: + stage = None + + if stage is None: + try: + for prefix, cmd in (cls.TABLE_AUTO_PREFIXES or {}).items(): + p = str(prefix or "").strip().lower() + if not p: + continue + if t == p or t.startswith(p + ".") or t.startswith(p): + stage = cmd + break + except Exception: + stage = None + + if not stage: + return None + + out = [str(x) for x in stage if str(x or "").strip()] + if not out: + return None + + if cls.AUTO_STAGE_USE_SELECTION_ARGS and stage_args: + try: + out.extend([str(x) for x in stage_args if str(x or "").strip()]) + except Exception: + pass + return out + @classmethod def url_patterns(cls) -> Tuple[str, ...]: """Return normalized URL patterns that this provider handles.""" diff --git a/ProviderCore/registry.py b/ProviderCore/registry.py index 93f6b6f..748c706 100644 --- a/ProviderCore/registry.py +++ b/ProviderCore/registry.py @@ -49,6 +49,38 @@ _PROVIDERS: Dict[str, } +def get_provider_class(name: str) -> Optional[Type[Provider]]: + """Return the provider class for a registered provider name, if any.""" + key = str(name or "").strip().lower() + return _PROVIDERS.get(key) + + +def selection_auto_stage_for_table( + table_type: str, + stage_args: Optional[Sequence[str]] = None, +) -> Optional[list[str]]: + """Return the provider-suggested stage to auto-run for a selected table. + + This is used by the CLI to avoid hardcoding table names and behaviors. + """ + t = str(table_type or "").strip().lower() + if not t: + return None + + # Provider tables are usually either: + # - "youtube" (no dot) + # - "hifi.tracks" (prefix = provider name) + provider_key = t.split(".", 1)[0] if "." in t else t + provider_class = get_provider_class(provider_key) or get_provider_class(t) + if provider_class is None: + return None + + try: + return provider_class.selection_auto_stage(t, stage_args) + except Exception: + return None + + def is_known_provider_name(name: str) -> bool: """Return True if `name` matches a registered provider key. @@ -251,4 +283,6 @@ __all__ = [ "match_provider_name_for_url", "get_provider_for_url", "download_soulseek_file", + "get_provider_class", + "selection_auto_stage_for_table", ] diff --git a/SYS/cmdlet_catalog.py b/SYS/cmdlet_catalog.py index feb446d..51ff176 100644 --- a/SYS/cmdlet_catalog.py +++ b/SYS/cmdlet_catalog.py @@ -147,6 +147,14 @@ def get_cmdlet_metadata( details = getattr(data, "detail", base.get("detail", [])) or [] args_list = getattr(data, "arg", base.get("arg", [])) or [] args = [_normalize_arg(arg) for arg in args_list] + examples_list = getattr(data, "examples", base.get("examples", [])) or [] + if not examples_list: + examples_list = getattr(data, "example", base.get("example", [])) or [] + examples = [] + for example in examples_list: + text = str(example or "").strip() + if text: + examples.append(text) if _should_hide_db_args(config): args = [a for a in args if not a.get("requires_db")] @@ -158,6 +166,7 @@ def get_cmdlet_metadata( "summary": summary, "details": details, "args": args, + "examples": examples, "raw": data, } @@ -183,6 +192,7 @@ def list_cmdlet_metadata(config: Optional[Dict[str, Any]] = None) -> Dict[str, D "summary": "", "details": [], "args": [], + "examples": meta.get("examples", []), "raw": meta.get("raw"), }, ) @@ -198,6 +208,15 @@ def list_cmdlet_metadata(config: Optional[Dict[str, Any]] = None) -> Dict[str, D base["details"] = meta["details"] if not base.get("args") and meta.get("args"): base["args"] = meta["args"] + example_sources: List[str] = [] + for attr in ("examples", "example"): + values = meta.get(attr, []) if isinstance(meta, dict) else [] + example_sources.extend(values or []) + merged_examples = [e for e in base.get("examples", []) or []] + for example_entry in example_sources: + if example_entry not in merged_examples: + merged_examples.append(example_entry) + base["examples"] = merged_examples if not base.get("raw"): base["raw"] = meta.get("raw") entries[canonical] = base @@ -211,6 +230,7 @@ def list_cmdlet_metadata(config: Optional[Dict[str, Any]] = None) -> Dict[str, D "summary": "", "details": [], "args": [], + "examples": [], "raw": None, }, ) diff --git a/cmdlet/_shared.py b/cmdlet/_shared.py index d67f20d..2772c02 100644 --- a/cmdlet/_shared.py +++ b/cmdlet/_shared.py @@ -28,6 +28,7 @@ class CmdletArg: """Argument type: 'string', 'int', 'flag', 'enum', etc.""" required: bool = False """Whether this argument is required""" + description: str = "" """Human-readable description of the argument""" choices: List[str] = field(default_factory=list) @@ -424,6 +425,8 @@ class Cmdlet: """List of arguments accepted by this cmdlet""" detail: List[str] = field(default_factory=list) """Detailed explanation lines (for help text)""" + examples: List[str] = field(default_factory=list) + """Example invocations shown in `.help`.""" # Execution function: func(result, args, config) -> int exec: Optional[Callable[[Any, Sequence[str], diff --git a/cmdlet/add_file.py b/cmdlet/add_file.py index 33eb167..4cbe781 100644 --- a/cmdlet/add_file.py +++ b/cmdlet/add_file.py @@ -185,6 +185,9 @@ class Add_File(Cmdlet): " file.io: Upload to file.io for temporary hosting", " internetarchive: Upload to archive.org (optional tag: ia: to upload into an existing item)", ], + examples=[ + 'download-file "https://themathesontrust.org/papers/christianity/alcock-alphabet1.pdf" | add-file -store tutorial', + ], exec=self.run, ) self.register() @@ -1950,6 +1953,18 @@ class Add_File(Cmdlet): except Exception as exc: debug(f"[add-file] sub note write failed: {exc}") + lyric_note = Add_File._get_note_text(result, pipe_obj, "lyric") + if lyric_note: + try: + setter = getattr(backend, "set_note", None) + if callable(setter): + debug( + f"[add-file] Writing lyric note (len={len(str(lyric_note))}) to {backend_name}:{resolved_hash}" + ) + setter(resolved_hash, "lyric", lyric_note) + except Exception as exc: + debug(f"[add-file] lyric note write failed: {exc}") + chapters_note = Add_File._get_note_text(result, pipe_obj, "chapters") if chapters_note: try: diff --git a/cmdlet/download_file.py b/cmdlet/download_file.py index 575265b..bb1ebf6 100644 --- a/cmdlet/download_file.py +++ b/cmdlet/download_file.py @@ -15,6 +15,8 @@ from typing import Any, Dict, List, Optional, Sequence from urllib.parse import urlparse from contextlib import AbstractContextManager, nullcontext +import requests + from API.alldebrid import is_magnet_link from Provider import internetarchive as ia_provider from Provider import alldebrid as ad_provider @@ -142,6 +144,85 @@ class Download_File(Cmdlet): return expanded_urls + @staticmethod + def _rewrite_archive_org_urls(raw_urls: Sequence[str]) -> List[str]: + """Rewrite Archive.org URLs using metadata JSON to pick the right flow. + + - /metadata/: + - if lendable (collection contains inlibrary/printdisabled/lendinglibrary) -> /borrow/ + - else -> /details/ + - /details/: + - if lendable -> /borrow/ + + This makes `download-file` do the right thing for borrow-only items. + """ + + out: List[str] = [] + for u in list(raw_urls or []): + s = str(u or "").strip() + if not s: + continue + + try: + p = urlparse(s) + host = (p.hostname or "").strip().lower() + path = (p.path or "").strip() + except Exception: + out.append(s) + continue + + if not host or (host != "archive.org" and not host.endswith(".archive.org")): + out.append(s) + continue + + low_path = path.lower().strip() + if not (low_path.startswith("/metadata/") or low_path.startswith("/details/")): + out.append(s) + continue + + parts = [x for x in path.split("/") if x] + if len(parts) < 2: + out.append(s) + continue + head = str(parts[0] or "").strip().lower() + archive_id = str(parts[1] or "").strip() + if head not in {"metadata", "details"} or not archive_id: + out.append(s) + continue + + lendable = False + try: + meta_url = f"https://archive.org/metadata/{archive_id}" + resp = requests.get(meta_url, timeout=8) + resp.raise_for_status() + data = resp.json() if resp is not None else {} + meta = data.get("metadata", {}) if isinstance(data, dict) else {} + collection = meta.get("collection") if isinstance(meta, dict) else None + + values: List[str] = [] + if isinstance(collection, list): + values = [str(x).strip().lower() for x in collection if str(x).strip()] + elif isinstance(collection, str): + values = [collection.strip().lower()] if collection.strip() else [] + + lendable = any(v in {"inlibrary", "printdisabled", "lendinglibrary"} for v in values) + except Exception: + lendable = False + + if lendable: + debug(f"[download-file] archive.org item '{archive_id}' looks lendable; using borrow flow") + out.append(f"https://archive.org/borrow/{archive_id}") + continue + + # Non-lendable: turn metadata URLs into details URLs so IA picker can show files. + if head == "metadata": + out.append(f"https://archive.org/details/{archive_id}") + continue + + out.append(s) + + return out + @staticmethod def _collect_piped_items_if_no_urls(result: Any, raw_urls: Sequence[str]) -> List[Any]: @@ -232,6 +313,14 @@ class Download_File(Cmdlet): title_val = (title_hint or downloaded_path.stem or "Unknown").strip() or downloaded_path.stem hash_value = self._compute_file_hash(downloaded_path) + notes: Optional[Dict[str, str]] = None + try: + if isinstance(full_metadata, dict): + subtitles = full_metadata.get("_tidal_lyrics_subtitles") + if isinstance(subtitles, str) and subtitles.strip(): + notes = {"lyric": subtitles} + except Exception: + notes = None tag: List[str] = [] if tags_hint: tag.extend([str(t) for t in tags_hint if t]) @@ -253,6 +342,8 @@ class Download_File(Cmdlet): payload["provider"] = str(provider_hint) if full_metadata: payload["full_metadata"] = full_metadata + if notes: + payload["notes"] = notes if source and str(source).startswith("http"): payload["url"] = source elif source: @@ -890,52 +981,85 @@ class Download_File(Cmdlet): msg += f" (availability={availability or ''} reason={reason or ''})" log(msg, file=sys.stderr) - # Fallback: run a LibGen title search so the user can pick an alternative source. + # Fallback: show a LibGen selectable ResultTable (no emits) so the user can pick @N. + # This intentionally mirrors `search-file -provider libgen` UX: results table + selection. try: title_text = str(title or "").strip() if not title_text and isinstance(full_metadata, dict): title_text = str(full_metadata.get("title") or "").strip() - if title_text: + if title_text and get_search_provider and SearchResult: log( f"[download-file] Not available on OpenLibrary; searching LibGen for: {title_text}", file=sys.stderr, ) - from cmdlet.search_file import CMDLET as _SEARCH_FILE_CMDLET - - fallback_query = title_text - exec_fn = getattr(_SEARCH_FILE_CMDLET, "exec", None) - if not callable(exec_fn): + libgen_provider = get_search_provider("libgen", config) + if libgen_provider is None: log( - "[download-file] search-file cmdlet unavailable; cannot run LibGen fallback search", + "[download-file] LibGen provider unavailable; cannot run fallback search", file=sys.stderr, ) continue - ret = exec_fn( - None, - ["-provider", - "libgen", - "-query", - fallback_query], - config, - ) - - # Promote the search-file table to a display overlay so it renders. try: - table_obj = pipeline_context.get_last_result_table() - items_obj = pipeline_context.get_last_result_items() - if table_obj is not None: - pipeline_context.set_last_result_table_overlay( - table_obj, - items_obj - ) + from SYS.result_table import ResultTable + except Exception: + ResultTable = None # type: ignore[assignment] + + if ResultTable is None: + log( + "[download-file] ResultTable unavailable; cannot render LibGen fallback search", + file=sys.stderr, + ) + continue + + fallback_query = title_text + # Keep parity with search-file provider default when user didn't specify a limit. + results = libgen_provider.search(fallback_query, limit=50) + if not results: + log( + f"[download-file] LibGen: no results found for: {fallback_query}", + file=sys.stderr, + ) + continue + + table_title = f"Libgen: {fallback_query}".strip().rstrip(":") + table_obj = ResultTable(table_title).set_preserve_order(False) + table_obj.set_table("libgen") + try: + table_obj.set_table_metadata({"provider": "libgen"}) except Exception: pass + # Mark as produced by download-file so the pipeline runner pauses and stores tail stages. + table_obj.set_source_command("download-file", []) + + results_list: List[Dict[str, Any]] = [] + for search_result in results: + item_dict = ( + search_result.to_dict() + if hasattr(search_result, "to_dict") + else dict(search_result) + if isinstance(search_result, dict) + else {"title": str(search_result)} + ) + if "table" not in item_dict: + item_dict["table"] = "libgen" + table_obj.add_result(search_result) + results_list.append(item_dict) + + # Seed selection state for @N and pause the pipeline. try: - return int(ret) # type: ignore[arg-type] + pipeline_context.set_last_result_table(table_obj, results_list) except Exception: - return 1 + pass + try: + pipeline_context.set_current_stage_table(table_obj) + except Exception: + pass + + # Returning 0 with a selectable stage table and no emits causes the CLI to render + # the table and pause, preserving the downstream pipeline tail. + return 0 except Exception: pass @@ -976,6 +1100,15 @@ class Download_File(Cmdlet): ) continue + # Prefer provider-enriched metadata (providers may mutate sr.full_metadata). + if provider_sr is not None: + try: + sr_md = getattr(provider_sr, "full_metadata", None) + if isinstance(sr_md, dict) and sr_md: + full_metadata = sr_md + except Exception: + pass + # Allow providers to add/enrich tags and metadata during download. if str(table or "").lower() == "libgen" and provider_sr is not None: try: @@ -3305,6 +3438,7 @@ class Download_File(Cmdlet): parsed = parse_cmdlet_args(args, self) raw_url = self._normalize_urls(parsed) + raw_url = self._rewrite_archive_org_urls(raw_url) piped_items = self._collect_piped_items_if_no_urls(result, raw_url) had_piped_input = False @@ -3346,6 +3480,26 @@ class Download_File(Cmdlet): log("No url or piped items to download", file=sys.stderr) return 1 + # Internet Archive details URLs should present a downloadable file picker + # before we try any streaming/ytdlp probing. + try: + quiet_mode = ( + bool(config.get("_quiet_background_output")) + if isinstance(config, dict) else False + ) + except Exception: + quiet_mode = False + ia_picker_exit = ia_provider.maybe_show_formats_table( + raw_urls=raw_url, + piped_items=piped_items, + parsed=parsed, + config=config, + quiet_mode=quiet_mode, + get_field=get_field, + ) + if ia_picker_exit is not None: + return int(ia_picker_exit) + streaming_candidates = self._append_urls_from_piped_result(list(raw_url), result) supported_streaming, unsupported_streaming = self._filter_supported_urls(streaming_candidates) @@ -3360,13 +3514,13 @@ class Download_File(Cmdlet): ) if streaming_exit_code == 0: streaming_downloaded += 1 + # Only remove URLs from further processing when streaming succeeded. + raw_url = [u for u in raw_url if u not in supported_streaming] + if not raw_url and not unsupported_streaming: + piped_items = [] - raw_url = [u for u in raw_url if u not in supported_streaming] - if not raw_url and not unsupported_streaming: - piped_items = [] - - if not raw_url and not piped_items: - return int(streaming_exit_code or 0) + if not raw_url and not piped_items: + return int(streaming_exit_code or 0) quiet_mode = ( bool(config.get("_quiet_background_output")) diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py index 2497d8b..1ab7dbf 100644 --- a/cmdlet/search_file.py +++ b/cmdlet/search_file.py @@ -110,6 +110,17 @@ class search_file(Cmdlet): ext = "".join(ch for ch in ext if ch.isalnum()) return ext[:5] + @staticmethod + def _get_hifi_view_from_query(query: str) -> str: + text = str(query or "").strip() + if not text: + return "track" + if re.search(r"\balbum\s*:", text, flags=re.IGNORECASE): + return "album" + if re.search(r"\bartist\s*:", text, flags=re.IGNORECASE): + return "artist" + return "track" + def _ensure_storage_columns(self, payload: Dict[str, Any]) -> Dict[str, Any]: """Ensure storage results have the necessary fields for result_table display.""" store_value = str(payload.get("store") or "").lower() @@ -236,9 +247,18 @@ class search_file(Cmdlet): table_title = f"{provider_label}: {query}".strip().rstrip(":") preserve_order = provider_lower in {"youtube", "openlibrary", "loc"} - table = ResultTable(table_title).set_preserve_order(preserve_order) - table.set_table(provider_name) + table_type = provider_name table_meta: Dict[str, Any] = {"provider": provider_name} + if provider_lower == "hifi": + view = self._get_hifi_view_from_query(query) + table_meta["view"] = view + table_type = f"hifi.{view}" + elif provider_lower == "internetarchive": + # Internet Archive search results are effectively folders (items); selecting @N + # should open a list of downloadable files for the chosen item. + table_type = "internetarchive.folder" + table = ResultTable(table_title).set_preserve_order(preserve_order) + table.set_table(table_type) if provider_lower == "alldebrid": table_meta["view"] = "files" if effective_open_id is not None else "folders" if effective_open_id is not None: @@ -277,7 +297,7 @@ class search_file(Cmdlet): ) if "table" not in item_dict: - item_dict["table"] = provider_name + item_dict["table"] = table_type row_index = len(table.rows) table.add_result(search_result) diff --git a/cmdnat/help.py b/cmdnat/help.py index 2e9a497..b793432 100644 --- a/cmdnat/help.py +++ b/cmdnat/help.py @@ -15,7 +15,10 @@ def _normalize_choice_list(arg_names: Optional[List[str]]) -> List[str]: return sorted(set(arg_names or [])) -def _examples_for_cmd(name: str) -> List[str]: +_HELP_EXAMPLE_SOURCE_COMMAND = ".help-example" + + +def _example_for_cmd(name: str) -> List[str]: """Return example invocations for a given command (best-effort).""" lookup = { ".adjective": [ @@ -28,6 +31,21 @@ def _examples_for_cmd(name: str) -> List[str]: return lookup.get(key, []) +def _parse_example_tokens(example: str) -> List[str]: + """Split an example string into CLI tokens suitable for @N selection.""" + + text = str(example or "").strip() + if not text: + return [] + + try: + tokens = shlex.split(text) + except Exception: + tokens = text.split() + + return [token for token in tokens if token] + + def _normalize_cmdlet_key(name: Optional[str]) -> str: return str(name or "").replace("_", "-").lower().strip() @@ -103,6 +121,16 @@ def _gather_metadata_from_cmdlet_classes() -> Tuple[Dict[str, Dict[str, Any]], D canonical_key = _normalize_cmdlet_key(getattr(cmdlet_obj, "name", None) or "") if not canonical_key: continue + example_entries: List[str] = [] + seen_example_entries: set[str] = set() + for attr in ("examples", "example"): + for value in (getattr(cmdlet_obj, attr, []) or []): + text = str(value or "").strip() + if not text or text in seen_example_entries: + continue + seen_example_entries.add(text) + example_entries.append(text) + entry = { "name": str(getattr(cmdlet_obj, "name", "") or canonical_key), "summary": str(getattr(cmdlet_obj, "summary", "") or ""), @@ -110,6 +138,7 @@ def _gather_metadata_from_cmdlet_classes() -> Tuple[Dict[str, Dict[str, Any]], D "aliases": _cmdlet_aliases(cmdlet_obj), "details": list(getattr(cmdlet_obj, "detail", []) or []), "args": [_cmdlet_arg_to_dict(a) for a in getattr(cmdlet_obj, "arg", []) or []], + "examples": example_entries, "raw": getattr(cmdlet_obj, "raw", None), } metadata[canonical_key] = entry @@ -185,60 +214,66 @@ def _render_list( ctx.set_last_result_table(table, items) ctx.set_current_stage_table(table) + setattr(table, "_rendered_by_cmdlet", True) from SYS.rich_display import stdout_console stdout_console().print(table) -def _render_detail(meta: Dict[str, Any], args: Sequence[str]) -> None: - title = f"Help: {meta.get('name', '') or 'cmd'}" - table = ResultTable(title) - table.set_source_command(".help", list(args)) - - header_lines: List[str] = [] +def _render_detail(meta: Dict[str, Any], _args: Sequence[str]) -> None: + cmd_name = str(meta.get("name", "") or "cmd") + title = f"Help: {cmd_name}" summary = meta.get("summary", "") usage = meta.get("usage", "") aliases = meta.get("aliases", []) or [] - examples = _examples_for_cmd(meta.get("name", "")) - first_example_tokens: List[str] = [] - first_example_cmd: Optional[str] = None - if examples: - try: - split_tokens = shlex.split(examples[0]) - if split_tokens: - first_example_cmd = split_tokens[0] - first_example_tokens = split_tokens[1:] - except Exception: - pass + details = meta.get("details", []) or [] + seen_examples: set[str] = set() + explicit_example: List[str] = [] + for attr in ("examples", "example"): + for value in (meta.get(attr, []) or []): + text = str(value or "").strip() + if not text or text in seen_examples: + continue + seen_examples.add(text) + explicit_example.append(text) + fallback_example = _example_for_cmd(cmd_name) + for fallback in fallback_example: + text = str(fallback or "").strip() + if not text or text in seen_examples: + continue + seen_examples.add(text) + explicit_example.append(text) + + header_lines: List[str] = [] if summary: header_lines.append(summary) if usage: header_lines.append(f"Usage: {usage}") if aliases: header_lines.append("Aliases: " + ", ".join(aliases)) - if examples: - header_lines.append("Examples: " + " | ".join(examples)) - if header_lines: - table.set_header_lines(header_lines) + if details: + header_lines.extend(str(line) for line in details if str(line).strip()) + if explicit_example: + header_lines.append("Examples available below") args_meta = meta.get("args", []) or [] - example_text = " | ".join(examples) - # If we have an example, use it as the source command so @N runs that example - if first_example_cmd: - table.set_source_command(first_example_cmd, []) + + args_table = ResultTable(title) + if header_lines: + args_table.set_header_lines(header_lines) + args_table.set_preserve_order(True) + args_table.set_no_choice(True) + if not args_meta: - row = table.add_row() + row = args_table.add_row() row.add_column("Arg", "(none)") row.add_column("Type", "") row.add_column("Req", "") row.add_column("Description", "") - row.add_column("Example", example_text) - if first_example_tokens: - table.set_row_selection_args(len(table.rows) - 1, first_example_tokens) else: for arg in args_meta: - row = table.add_row() + row = args_table.add_row() name = arg.get("name") or "" row.add_column("Arg", f"-{name}" if name else "") row.add_column("Type", arg.get("type", "")) @@ -249,15 +284,38 @@ def _render_detail(meta: Dict[str, Any], args: Sequence[str]) -> None: choice_text = f"choices: {', '.join(choices)}" desc = f"{desc} ({choice_text})" if desc else choice_text row.add_column("Description", desc) - row.add_column("Example", example_text) - if first_example_tokens: - table.set_row_selection_args(len(table.rows) - 1, first_example_tokens) - ctx.set_last_result_table_overlay(table, [meta]) - ctx.set_current_stage_table(table) + example_table = ResultTable(f"{cmd_name} Examples") + example_table.set_preserve_order(True) + example_table.set_header_line("Select @N to insert the example command into the REPL.") + + example_items: List[str] = [] + if explicit_example: + for idx, example_cmd in enumerate(explicit_example): + example_text = str(example_cmd or "").strip() + row = example_table.add_row() + row.add_column("Example", example_text or "(empty example)") + example_items.append(example_text) + if example_text: + tokens = _parse_example_tokens(example_text) + if tokens: + example_table.set_row_selection_args(idx, tokens) + else: + example_table.set_no_choice(True) + row = example_table.add_row() + row.add_column("Example", "(no examples available)") + + ctx.set_last_result_table(example_table, example_items) + ctx.set_current_stage_table(example_table) + setattr(example_table, "_rendered_by_cmdlet", True) + example_table.set_source_command(_HELP_EXAMPLE_SOURCE_COMMAND) from SYS.rich_display import stdout_console - stdout_console().print(table) + stdout_console().print() + stdout_console().print(args_table) + stdout_console().print() + stdout_console().print(example_table) + stdout_console().print() def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: diff --git a/cmdnat/pipe.py b/cmdnat/pipe.py index 3cc3709..4b294c5 100644 --- a/cmdnat/pipe.py +++ b/cmdnat/pipe.py @@ -201,6 +201,69 @@ def _send_ipc_command(command: Dict[str, Any], silent: bool = False) -> Optional return None +def _extract_store_and_hash(item: Any) -> tuple[Optional[str], Optional[str]]: + store: Optional[str] = None + file_hash: Optional[str] = None + + try: + if isinstance(item, dict): + store = item.get("store") + file_hash = item.get("hash") or item.get("file_hash") + else: + store = getattr(item, "store", None) + file_hash = getattr(item, "hash", None) or getattr(item, "file_hash", None) + except Exception: + store = None + file_hash = None + + try: + store = str(store).strip() if store else None + except Exception: + store = None + + try: + file_hash = str(file_hash).strip().lower() if file_hash else None + except Exception: + file_hash = None + + if not file_hash: + try: + text = None + if isinstance(item, dict): + text = item.get("path") or item.get("url") or item.get("filename") + else: + text = getattr(item, "path", None) or getattr(item, "url", None) + if text: + m = re.search(r"[0-9a-f]{64}", str(text).lower()) + if m: + file_hash = m.group(0) + except Exception: + pass + + return store, file_hash + + +def _set_mpv_item_context(store: Optional[str], file_hash: Optional[str]) -> None: + # Properties consumed by MPV.lyric + try: + _send_ipc_command( + { + "command": ["set_property", "user-data/medeia-item-store", store or ""], + "request_id": 901, + }, + silent=True, + ) + _send_ipc_command( + { + "command": ["set_property", "user-data/medeia-item-hash", file_hash or ""], + "request_id": 902, + }, + silent=True, + ) + except Exception: + pass + + def _get_playlist(silent: bool = False) -> Optional[List[Dict[str, Any]]]: """Get the current playlist from MPV. Returns None if MPV is not running.""" cmd = { @@ -1014,6 +1077,15 @@ def _queue_items( if clear_first and i == 0: mode = "replace" + # If we're replacing, this will start playing immediately: set store/hash context + # so MPV.lyric can resolve the correct backend for notes. + if mode == "replace": + try: + s, h = _extract_store_and_hash(item) + _set_mpv_item_context(s, h) + except Exception: + pass + # If this is a Hydrus path, set header property and yt-dlp headers before loading. # Use the real target (not the memory:// wrapper) for detection. if effective_hydrus_header and _is_hydrus_path(str(target), @@ -1209,7 +1281,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: result_obj = { "path": filename, "title": title, - "cmdlet_name": ".pipe", + "cmdlet_name": ".mpv", "source": "pipe", "__pipe_index": items.index(current_item), } @@ -1392,7 +1464,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: # We also set the source command to .pipe -load so it loads it table.set_row_selection_args(i, ["-load", str(pl["id"])]) - table.set_source_command(".pipe") + table.set_source_command(".mpv") # Register results ctx.set_last_result_table_overlay( @@ -1528,6 +1600,13 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: if should_autoplay and after_len > 0: idx_to_play = min(max(0, before_len), after_len - 1) + + # Prefer the store/hash from the piped item when auto-playing. + try: + s, h = _extract_store_and_hash(items_to_add[0]) + _set_mpv_item_context(s, h) + except Exception: + pass play_resp = _send_ipc_command( { "command": ["playlist-play-index", @@ -1657,6 +1736,11 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: return 1 else: # Play item + try: + s, h = _extract_store_and_hash(item) + _set_mpv_item_context(s, h) + except Exception: + pass if hydrus_header and _is_hydrus_path(filename, hydrus_url): header_cmd = { "command": @@ -1805,7 +1889,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: table.set_row_selection_args(i, [str(i + 1)]) - table.set_source_command(".pipe") + table.set_source_command(".mpv") # Register PipeObjects (not raw MPV items) with pipeline context ctx.set_last_result_table_overlay(table, pipe_objects) @@ -1980,6 +2064,15 @@ def _start_mpv( debug("Timed out waiting for MPV IPC connection", file=sys.stderr) return + # Publish context early so the lyric helper can resolve notes on the first + # target change (the helper may start before playback begins). + try: + if items: + s, h = _extract_store_and_hash(items[0]) + _set_mpv_item_context(s, h) + except Exception: + pass + # main.lua is loaded at startup via --script; don't reload it here. # Ensure lyric overlay is running (auto-discovery handled by MPV.lyric). @@ -2020,10 +2113,10 @@ def _start_mpv( CMDLET = Cmdlet( - name=".pipe", - alias=["pipe", "playlist", "queue", "ls-pipe"], + name=".mpv", + alias=[".pipe", "pipe", "playlist", "queue", "ls-pipe"], summary="Manage and play items in the MPV playlist via IPC", - usage=".pipe [index|url] [-current] [-clear] [-list] [-url URL] [-log] [-borderless]", + usage=".mpv [index|url] [-current] [-clear] [-list] [-url URL] [-log] [-borderless]", arg=[ CmdletArg( name="index",