diff --git a/MPV/LUA/main.lua b/MPV/LUA/main.lua index d494860..35d3efa 100644 --- a/MPV/LUA/main.lua +++ b/MPV/LUA/main.lua @@ -42,6 +42,7 @@ local LOAD_URL_MENU_TYPE = 'medios_load_url' local DOWNLOAD_FORMAT_MENU_TYPE = 'medios_download_pick_format' local DOWNLOAD_STORE_MENU_TYPE = 'medios_download_pick_store' +local SCREENSHOT_TAG_MENU_TYPE = 'medeia_screenshot_tags' -- Menu types for the command submenu and trim prompt local CMD_MENU_TYPE = 'medios_cmd_menu' @@ -386,6 +387,12 @@ local function _path_exists(path) return utils.file_info(path) ~= nil end +local function _normalize_fs_path(path) + path = trim(tostring(path or '')) + path = path:gsub('^"+', ''):gsub('"+$', '') + return trim(path) +end + local function _build_python_candidates(configured_python, prefer_no_console) local candidates = {} local seen = {} @@ -585,6 +592,9 @@ end -- Forward declaration (defined later) used by helper auto-start. local _resolve_python_exe +local _refresh_store_cache +local _uosc_open_list_picker +local _run_pipeline_detached local _cached_store_names = {} local _store_cache_loaded = false @@ -830,18 +840,15 @@ local function attempt_start_pipeline_helper_async(callback) return end - local cwd = _detect_repo_root() - local cwd_arg = cwd ~= '' and cwd or nil - local args = { python, '-m', 'MPV.pipeline_helper', '--ipc', get_mpv_ipc_path(), '--timeout', '30' } - _lua_log('attempt_start_pipeline_helper_async: spawning helper python=' .. tostring(python) .. ' cwd=' .. tostring(cwd_arg or '')) + _lua_log('attempt_start_pipeline_helper_async: spawning helper python=' .. tostring(python)) -- Spawn detached; don't wait for it here (async). - local ok, result, detail = _run_subprocess_command({ name = 'subprocess', args = args, cwd = cwd_arg, detach = true }) + local ok, result, detail = _run_subprocess_command({ name = 'subprocess', args = args, detach = true }) _lua_log('attempt_start_pipeline_helper_async: detached spawn result ' .. tostring(detail or '')) if not ok then _lua_log('attempt_start_pipeline_helper_async: detached spawn failed, retrying blocking') - ok, result, detail = _run_subprocess_command({ name = 'subprocess', args = args, cwd = cwd_arg }) + ok, result, detail = _run_subprocess_command({ name = 'subprocess', args = args }) _lua_log('attempt_start_pipeline_helper_async: blocking spawn result ' .. tostring(detail or '')) end @@ -913,6 +920,7 @@ local function _run_helper_request_async(req, timeout_seconds, cb) local function done(resp, err) local err_text = err and tostring(err) or '' + local quiet = type(req) == 'table' and req.quiet and true or false local is_timeout = err_text:find('timeout waiting response', 1, true) ~= nil local retry_count = type(req) == 'table' and tonumber(req._retry or 0) or 0 local is_retryable = is_timeout and type(req) == 'table' @@ -938,7 +946,11 @@ local function _run_helper_request_async(req, timeout_seconds, cb) end if err then - _lua_log('ipc-async: done id=' .. tostring(id) .. ' ERROR: ' .. tostring(err)) + if quiet then + _lua_log('ipc-async: done id=' .. tostring(id) .. ' unavailable ' .. tostring(label)) + else + _lua_log('ipc-async: done id=' .. tostring(id) .. ' ERROR: ' .. tostring(err)) + end else _lua_log('ipc-async: done id=' .. tostring(id) .. ' success=' .. tostring(resp and resp.success)) end @@ -1044,88 +1056,16 @@ local function _run_helper_request_async(req, timeout_seconds, cb) ensure_helper_and_send() end -local function _run_helper_request_response(req, timeout_seconds) - -- Legacy synchronous wrapper for compatibility with run_pipeline_via_ipc_response. - -- TODO: Migrate all callers to async _run_helper_request_async and remove this. - _last_ipc_error = '' - if not ensure_pipeline_helper_running() then - local rv = tostring(mp.get_property(PIPELINE_READY_PROP) or mp.get_property_native(PIPELINE_READY_PROP) or '') - _lua_log('ipc: helper not ready (ready=' .. rv .. '); attempting request anyway') - _last_ipc_error = 'helper not ready' - end - - do - -- Best-effort wait for heartbeat, but do not hard-fail the request. - local deadline = mp.get_time() + 1.5 - while mp.get_time() < deadline do - if _is_pipeline_helper_ready() then - break - end - mp.wait_event(0.05) - end - if not _is_pipeline_helper_ready() then - local rv = tostring(mp.get_property(PIPELINE_READY_PROP) or mp.get_property_native(PIPELINE_READY_PROP) or '') - _lua_log('ipc: proceeding without helper heartbeat; ready=' .. rv) - _last_ipc_error = 'helper heartbeat missing (ready=' .. rv .. ')' - end - end - - if type(req) ~= 'table' then - return nil - end - - local id = tostring(req.id or '') - if id == '' then - id = tostring(math.floor(mp.get_time() * 1000)) .. '-' .. tostring(math.random(100000, 999999)) - req.id = id - end - - local label = '' - if req.op then - label = 'op=' .. tostring(req.op) - elseif req.pipeline then - label = 'cmd=' .. tostring(req.pipeline) - else - label = '(unknown)' - end - _lua_log('ipc: send request id=' .. tostring(id) .. ' ' .. label) - - local req_json = utils.format_json(req) - _last_ipc_last_req_json = req_json - mp.set_property(PIPELINE_RESP_PROP, '') - mp.set_property(PIPELINE_REQ_PROP, req_json) - -- Read-back for debugging: confirms MPV accepted the property write. - local echoed = mp.get_property(PIPELINE_REQ_PROP) or '' - if echoed == '' then - _lua_log('ipc: WARNING request property echoed empty after set') - end - - local deadline = mp.get_time() + (timeout_seconds or 5) - while mp.get_time() < deadline do - local resp_json = mp.get_property(PIPELINE_RESP_PROP) - if resp_json and resp_json ~= '' then - _last_ipc_last_resp_json = resp_json - local ok, resp = pcall(utils.parse_json, resp_json) - if ok and resp and resp.id == id then - _lua_log('ipc: got response id=' .. tostring(id) .. ' success=' .. tostring(resp.success)) - return resp - end - end - mp.wait_event(0.05) - end - - _lua_log('ipc: timeout waiting response; ' .. label) - _last_ipc_error = 'timeout waiting response (' .. label .. ')' - return nil -end - --- IPC helper: return the whole response object (stdout/stderr/error/table) -local function run_pipeline_via_ipc_response(pipeline_cmd, seeds, timeout_seconds) +local function run_pipeline_via_ipc_async(pipeline_cmd, seeds, timeout_seconds, cb) local req = { pipeline = pipeline_cmd } if seeds then req.seeds = seeds end - return _run_helper_request_response(req, timeout_seconds) + _run_helper_request_async(req, timeout_seconds, function(resp, err) + if type(cb) == 'function' then + cb(resp, err) + end + end) end local function _url_can_direct_load(url) @@ -1330,28 +1270,16 @@ local function _check_store_for_existing_url(store, url, cb) local needle = tostring(needles[idx]) idx = idx + 1 local query = 'url:' .. needle - local pipeline_cmd = 'search-file -query ' .. quote_pipeline_arg(query) .. ' | output-json' _lua_log('store-check: probing global query=' .. tostring(query)) - _run_helper_request_async({ pipeline = pipeline_cmd }, 4.0, function(resp, err) + _run_helper_request_async({ op = 'url-exists', data = { url = url, needles = { needle } }, quiet = true }, 2.5, function(resp, err) if resp and resp.success then - local output = trim(tostring(resp.stdout or '')) - if output == '' then + local data = resp.data + if type(data) ~= 'table' or #data == 0 then run_next(nil) return end - - local ok, data = pcall(utils.parse_json, output) - if ok and type(data) == 'table' then - if #data > 0 then - cb(data, nil, needle) - return - end - run_next(nil) - return - end - - run_next('malformed JSON response') + cb(data, nil, needle) return end @@ -1556,6 +1484,160 @@ local function _strip_title_extension(title, path) return title end +local _pending_screenshot = nil + +local function _normalize_tag_list(value) + local tags = {} + local seen = {} + + local function add_tag(text) + text = trim(tostring(text or '')) + if text == '' then + return + end + local key = text:lower() + if seen[key] then + return + end + seen[key] = true + tags[#tags + 1] = text + end + + if type(value) == 'table' then + for _, item in ipairs(value) do + add_tag(item) + end + return tags + end + + local text = tostring(value or '') + for token in text:gmatch('[^,;\r\n]+') do + add_tag(token) + end + return tags +end + +local function _start_screenshot_store_save(store, out_path, tags) + store = _normalize_store_name(store) + out_path = _normalize_fs_path(out_path) + if store == '' or out_path == '' then + mp.osd_message('Screenshot upload failed: invalid store or path', 5) + return false + end + + local tag_list = _normalize_tag_list(tags) + local cmd = 'add-file -store ' .. quote_pipeline_arg(store) + local seeds = { { path = out_path } } + if #tag_list > 0 then + seeds[1].tag = tag_list + end + _set_selected_store(store) + + if _run_pipeline_detached(cmd, function(_, err) + mp.osd_message('Screenshot upload failed to start: ' .. tostring(err or 'unknown'), 5) + end, seeds) then + local tag_suffix = (#tag_list > 0) and (' | tags: ' .. tostring(#tag_list)) or '' + mp.osd_message('Screenshot saved to store: ' .. store .. tag_suffix, 3) + return true + end + + mp.osd_message('Screenshot upload failed to start', 5) + return false +end + +local function _commit_pending_screenshot(tags) + if type(_pending_screenshot) ~= 'table' or not _pending_screenshot.path or not _pending_screenshot.store then + return + end + local store = tostring(_pending_screenshot.store or '') + local out_path = tostring(_pending_screenshot.path or '') + _pending_screenshot = nil + _start_screenshot_store_save(store, out_path, tags) +end + +local function _apply_screenshot_tag_query(query) + _commit_pending_screenshot(_normalize_tag_list(query)) +end + +local function _open_screenshot_tag_prompt(store, out_path) + store = _normalize_store_name(store) + out_path = _normalize_fs_path(out_path) + if store == '' or out_path == '' then + return + end + + _pending_screenshot = { store = store, path = out_path } + + if not ensure_uosc_loaded() then + _commit_pending_screenshot(nil) + return + end + + local menu_data = { + type = SCREENSHOT_TAG_MENU_TYPE, + title = 'Screenshot tags', + search_style = 'palette', + search_debounce = 'submit', + on_search = { 'script-message-to', mp.get_script_name(), 'medeia-image-screenshot-tags-search' }, + footnote = 'Optional comma-separated tags. Press Enter to save, or choose Save without tags.', + items = { + { + title = 'Save without tags', + hint = 'Skip optional tags', + value = { 'script-message-to', mp.get_script_name(), 'medeia-image-screenshot-tags-event', utils.format_json({ query = '' }) }, + }, + }, + } + + mp.commandv('script-message-to', 'uosc', 'open-menu', utils.format_json(menu_data)) +end + +local function _open_store_picker_for_pending_screenshot() + if type(_pending_screenshot) ~= 'table' or not _pending_screenshot.path then + return + end + + local function build_items() + local selected = _get_selected_store() + local items = {} + + if type(_cached_store_names) == 'table' and #_cached_store_names > 0 then + for _, name in ipairs(_cached_store_names) do + name = trim(tostring(name or '')) + if name ~= '' then + items[#items + 1] = { + title = name, + hint = (selected ~= '' and name == selected) and 'Current store' or '', + active = (selected ~= '' and name == selected) and true or false, + value = { 'script-message-to', mp.get_script_name(), 'medeia-image-screenshot-pick-store', utils.format_json({ store = name }) }, + } + end + end + else + items[#items + 1] = { + title = 'No stores found', + hint = 'Configure stores in config.conf', + selectable = false, + } + end + + return items + end + + _uosc_open_list_picker(DOWNLOAD_STORE_MENU_TYPE, 'Save screenshot', build_items()) + + mp.add_timeout(0.05, function() + if type(_pending_screenshot) ~= 'table' or not _pending_screenshot.path then + return + end + _refresh_store_cache(1.5, function(success, changed) + if success and changed then + _uosc_open_list_picker(DOWNLOAD_STORE_MENU_TYPE, 'Save screenshot', build_items()) + end + end) + end) +end + local function _capture_screenshot() local function _format_time_label(seconds) local total = math.max(0, math.floor(tonumber(seconds or 0) or 0)) @@ -1585,8 +1667,12 @@ local function _capture_screenshot() local safe_title = _sanitize_filename_component(raw_title) local filename = safe_title .. '_' .. label .. '.png' - local temp_dir = mp.get_property('user-data/medeia-config-temp') or os.getenv('TEMP') or os.getenv('TMP') or '/tmp' + local temp_dir = _normalize_fs_path(mp.get_property('user-data/medeia-config-temp')) + if temp_dir == '' then + temp_dir = _normalize_fs_path(os.getenv('TEMP') or os.getenv('TMP') or '/tmp') + end local out_path = utils.join_path(temp_dir, filename) + out_path = _normalize_fs_path(out_path) local function do_screenshot(mode) mode = mode or 'video' @@ -1611,28 +1697,35 @@ local function _capture_screenshot() end _ensure_selected_store_loaded() - local selected_store = _get_selected_store() - selected_store = trim(tostring(selected_store or '')) - selected_store = selected_store:gsub('^\"', ''):gsub('\"$', '') - if selected_store == '' then - mp.osd_message('Select a store first (Store button)', 2) - return + local function dispatch_screenshot_save() + local store_count = (type(_cached_store_names) == 'table') and #_cached_store_names or 0 + local selected_store = _normalize_store_name(_get_selected_store()) + + if store_count > 1 then + _pending_screenshot = { path = out_path } + _open_store_picker_for_pending_screenshot() + return + end + + if selected_store == '' and store_count == 1 then + selected_store = _normalize_store_name(_cached_store_names[1]) + end + + if selected_store == '' then + mp.osd_message('Select a store first (Store button)', 2) + return + end + + _open_screenshot_tag_prompt(selected_store, out_path) end - - mp.osd_message('Saving screenshot...', 1) - - -- optimization: use persistent pipeline helper instead of spawning new python process - -- escape paths for command line - local cmd = 'add-file -store "' .. selected_store .. '" -path "' .. out_path .. '"' - - local resp = run_pipeline_via_ipc_response(cmd, nil, 10) - - if resp and resp.success then - mp.osd_message('Screenshot saved to store: ' .. selected_store, 3) + + if not _store_cache_loaded then + _refresh_store_cache(1.5, function() + dispatch_screenshot_save() + end) else - local err = (resp and resp.error) or (resp and resp.stderr) or 'IPC error' - mp.osd_message('Screenshot upload failed: ' .. tostring(err), 5) + dispatch_screenshot_save() end end @@ -1640,6 +1733,36 @@ mp.register_script_message('medeia-image-screenshot', function() _capture_screenshot() end) +mp.register_script_message('medeia-image-screenshot-pick-store', function(json) + if type(_pending_screenshot) ~= 'table' or not _pending_screenshot.path then + return + end + local ok, ev = pcall(utils.parse_json, json) + if not ok or type(ev) ~= 'table' then + return + end + + local store = _normalize_store_name(ev.store) + if store == '' then + return + end + + local out_path = tostring(_pending_screenshot.path or '') + _open_screenshot_tag_prompt(store, out_path) +end) + +mp.register_script_message('medeia-image-screenshot-tags-search', function(query) + _apply_screenshot_tag_query(query) +end) + +mp.register_script_message('medeia-image-screenshot-tags-event', function(json) + local ok, ev = pcall(utils.parse_json, json) + if not ok or type(ev) ~= 'table' then + return + end + _apply_screenshot_tag_query(ev.query) +end) + local CLIP_MARKER_SLOT_COUNT = 2 local clip_markers = {} @@ -2061,7 +2184,7 @@ local function _run_pipeline_request_async(pipeline_cmd, seeds, timeout_seconds, _run_helper_request_async(req, timeout_seconds or 30, cb) end -local function _refresh_store_cache(timeout_seconds, on_complete) +_refresh_store_cache = function(timeout_seconds, on_complete) ensure_mpv_ipc_server() local prev_count = (type(_cached_store_names) == 'table') and #_cached_store_names or 0 @@ -2174,7 +2297,7 @@ local function _refresh_store_cache(timeout_seconds, on_complete) return false end -local function _uosc_open_list_picker(menu_type, title, items) +_uosc_open_list_picker = function(menu_type, title, items) local menu_data = { type = menu_type, title = title, @@ -2606,7 +2729,7 @@ _refresh_current_store_url_status = function(reason) local err_text = trim(tostring(err or '')) if err_text ~= '' then _set_current_store_url_status(store, url, 'error', err_text, 0, needle) - _lua_log('store-check: failed err=' .. tostring(err_text)) + _lua_log('store-check: lookup unavailable') return end @@ -3281,10 +3404,6 @@ local function _run_pipeline_cli_detached(pipeline_cmd, seeds) args = args, detach = true, } - local repo_root = _detect_repo_root() - if repo_root ~= '' then - cmd.cwd = repo_root - end local ok, result, detail = _run_subprocess_command(cmd) if ok then @@ -3296,11 +3415,11 @@ local function _run_pipeline_cli_detached(pipeline_cmd, seeds) return false, detail or _describe_subprocess_result(result) end -local function _run_pipeline_detached(pipeline_cmd, on_failure) +_run_pipeline_detached = function(pipeline_cmd, on_failure, seeds) if not pipeline_cmd or pipeline_cmd == '' then return false end - local ok, detail = _run_pipeline_cli_detached(pipeline_cmd, nil) + local ok, detail = _run_pipeline_cli_detached(pipeline_cmd, seeds) if ok then return true end @@ -3313,7 +3432,7 @@ local function _run_pipeline_detached(pipeline_cmd, on_failure) return false end - _run_helper_request_async({ op = 'run-detached', data = { pipeline = pipeline_cmd } }, 1.0, function(resp, err) + _run_helper_request_async({ op = 'run-detached', data = { pipeline = pipeline_cmd, seeds = seeds } }, 1.0, function(resp, err) if resp and resp.success then return end @@ -3826,21 +3945,49 @@ end -- Helper to run pipeline and parse JSON output function M.run_pipeline_json(pipeline_cmd, seeds, cb) cb = cb or function() end - if not pipeline_cmd:match('output%-json$') then - pipeline_cmd = pipeline_cmd .. ' | output-json' + pipeline_cmd = trim(tostring(pipeline_cmd or '')) + if pipeline_cmd == '' then + cb(nil, 'empty pipeline command') + return end - M.run_pipeline(pipeline_cmd, seeds, function(output, err) - if output then - local ok, data = pcall(utils.parse_json, output) - if ok then - cb(data, nil) + + ensure_mpv_ipc_server() + + local lower_cmd = pipeline_cmd:lower() + local is_mpv_load = lower_cmd:match('%.mpv%s+%-url') ~= nil + local timeout_seconds = is_mpv_load and 45 or 30 + _run_pipeline_request_async(pipeline_cmd, seeds, timeout_seconds, function(resp, err) + if resp and resp.success then + if type(resp.data) == 'table' then + cb(resp.data, nil) return end - _lua_log('Failed to parse JSON: ' .. output) - cb(nil, 'malformed JSON response') + + local output = trim(tostring(resp.stdout or '')) + if output ~= '' then + local ok, data = pcall(utils.parse_json, output) + if ok then + cb(data, nil) + return + end + _lua_log('Failed to parse JSON: ' .. output) + cb(nil, 'malformed JSON response') + return + end + + cb({}, nil) return end - cb(nil, err) + + local details = err or '' + if details == '' and type(resp) == 'table' then + if resp.error and tostring(resp.error) ~= '' then + details = tostring(resp.error) + elseif resp.stderr and tostring(resp.stderr) ~= '' then + details = tostring(resp.stderr) + end + end + cb(nil, details ~= '' and details or 'unknown') end) end @@ -4081,34 +4228,33 @@ local function _start_trim_with_range(range) _lua_log('trim: final upload_cmd=' .. pipeline_cmd) _lua_log('trim: === CALLING PIPELINE HELPER FOR UPLOAD ===') - -- Optimization: use persistent pipeline helper - local response = run_pipeline_via_ipc_response(pipeline_cmd, nil, 60) - - if not response then - response = { success = false, error = "Timeout or IPC error" } - end - - _lua_log('trim: api response success=' .. tostring(response.success)) - _lua_log('trim: api response error=' .. tostring(response.error or 'nil')) - _lua_log('trim: api response stderr=' .. tostring(response.stderr or 'nil')) - _lua_log('trim: api response returncode=' .. tostring(response.returncode or 'nil')) - - if response.stderr and response.stderr ~= '' then - _lua_log('trim: STDERR OUTPUT: ' .. response.stderr) - end - - if response.success then - local msg = 'Trim and upload completed' - if selected_store then - msg = msg .. ' (store: ' .. selected_store .. ')' + run_pipeline_via_ipc_async(pipeline_cmd, nil, 60, function(response, err) + if not response then + response = { success = false, error = err or 'Timeout or IPC error' } end - mp.osd_message(msg, 5) - _lua_log('trim: SUCCESS - ' .. msg) - else - local err_msg = response.error or response.stderr or 'unknown error' - mp.osd_message('Upload failed: ' .. err_msg, 5) - _lua_log('trim: upload FAILED - ' .. err_msg) - end + + _lua_log('trim: api response success=' .. tostring(response.success)) + _lua_log('trim: api response error=' .. tostring(response.error or 'nil')) + _lua_log('trim: api response stderr=' .. tostring(response.stderr or 'nil')) + _lua_log('trim: api response returncode=' .. tostring(response.returncode or 'nil')) + + if response.stderr and response.stderr ~= '' then + _lua_log('trim: STDERR OUTPUT: ' .. response.stderr) + end + + if response.success then + local msg = 'Trim and upload completed' + if selected_store then + msg = msg .. ' (store: ' .. selected_store .. ')' + end + mp.osd_message(msg, 5) + _lua_log('trim: SUCCESS - ' .. msg) + else + local err_msg = err or response.error or response.stderr or 'unknown error' + mp.osd_message('Upload failed: ' .. err_msg, 5) + _lua_log('trim: upload FAILED - ' .. err_msg) + end + end) end function M.open_trim_prompt() diff --git a/MPV/lyric.py b/MPV/lyric.py index b508263..60ecd0d 100644 --- a/MPV/lyric.py +++ b/MPV/lyric.py @@ -1079,6 +1079,7 @@ class _PlaybackState: fetch_attempt_at: float = 0.0 cache_wait_key: Optional[str] = None cache_wait_started_at: float = 0.0 + cache_wait_next_probe_at: float = 0.0 def clear(self, client: MPVIPCClient, *, clear_hash: bool = True) -> None: """Reset backend resolution and clean up any active OSD / external subtitle. @@ -1096,6 +1097,7 @@ class _PlaybackState: self.times = [] self.cache_wait_key = None self.cache_wait_started_at = 0.0 + self.cache_wait_next_probe_at = 0.0 if self.loaded_key is not None: _osd_clear_and_restore(client) self.loaded_key = None @@ -1244,6 +1246,7 @@ def run_auto_overlay( state.key = None state.cache_wait_key = None state.cache_wait_started_at = 0.0 + state.cache_wait_next_probe_at = 0.0 if store_override and (not hash_override or hash_override == state.file_hash): reg = _make_registry() @@ -1390,15 +1393,21 @@ def run_auto_overlay( if state.cache_wait_key != state.key: state.cache_wait_key = state.key state.cache_wait_started_at = now + state.cache_wait_next_probe_at = 0.0 + elif state.cache_wait_next_probe_at > now: + time.sleep(max(0.05, min(0.5, state.cache_wait_next_probe_at - now))) + continue pending = is_notes_prefetch_pending(state.store_name, state.file_hash) waited_s = max(0.0, now - float(state.cache_wait_started_at or now)) if pending and waited_s < pending_wait_s: - time.sleep(min(max(poll_s, 0.05), 0.2)) + state.cache_wait_next_probe_at = now + max(0.2, min(0.5, pending_wait_s - waited_s)) + time.sleep(max(0.05, min(0.5, state.cache_wait_next_probe_at - now))) continue if waited_s < cache_wait_s: - time.sleep(min(max(poll_s, 0.05), 0.2)) + state.cache_wait_next_probe_at = now + max(0.2, min(0.5, cache_wait_s - waited_s)) + time.sleep(max(0.05, min(0.5, state.cache_wait_next_probe_at - now))) continue try: @@ -1413,6 +1422,7 @@ def run_auto_overlay( state.cache_wait_key = None state.cache_wait_started_at = 0.0 + state.cache_wait_next_probe_at = 0.0 try: _log( diff --git a/MPV/pipeline_helper.py b/MPV/pipeline_helper.py index 2329b05..fb7feb4 100644 --- a/MPV/pipeline_helper.py +++ b/MPV/pipeline_helper.py @@ -112,10 +112,32 @@ def _start_ready_heartbeat(ipc_path: str, stop_event: threading.Event) -> thread return thread -def _run_pipeline(pipeline_text: str, *, seeds: Any = None) -> Dict[str, Any]: +def _run_pipeline( + pipeline_text: str, + *, + seeds: Any = None, + json_output: bool = False, +) -> Dict[str, Any]: # Import after sys.path fix. from TUI.pipeline_runner import PipelineRunner # noqa: WPS433 + def _json_safe(value: Any) -> Any: + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, dict): + out: Dict[str, Any] = {} + for key, item in value.items(): + out[str(key)] = _json_safe(item) + return out + if isinstance(value, (list, tuple, set)): + return [_json_safe(item) for item in value] + if hasattr(value, "to_dict") and callable(getattr(value, "to_dict")): + try: + return _json_safe(value.to_dict()) + except Exception: + pass + return str(value) + def _table_to_payload(table: Any) -> Optional[Dict[str, Any]]: if table is None: return None @@ -190,12 +212,20 @@ def _run_pipeline(pipeline_text: str, *, seeds: Any = None) -> Dict[str, Any]: except Exception: table_payload = None + data_payload = None + if json_output: + try: + data_payload = _json_safe(getattr(result, "emitted", None) or []) + except Exception: + data_payload = [] + return { "success": bool(result.success), "stdout": result.stdout or "", "stderr": result.stderr or "", "error": result.error, "table": table_payload, + "data": data_payload, } @@ -372,6 +402,112 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]: "choices": [], } + if op_name in {"url-exists", + "url_exists", + "find-url", + "find_url"}: + try: + from Store import Store # noqa: WPS433 + + cfg = load_config() or {} + storage = Store(config=cfg, suppress_debug=True) + + raw_needles: list[str] = [] + if isinstance(data, dict): + maybe_needles = data.get("needles") + if isinstance(maybe_needles, (list, tuple, set)): + for item in maybe_needles: + text = str(item or "").strip() + if text and text not in raw_needles: + raw_needles.append(text) + elif isinstance(maybe_needles, str): + text = maybe_needles.strip() + if text: + raw_needles.append(text) + + if not raw_needles: + text = str(data.get("url") or "").strip() + if text: + raw_needles.append(text) + + if not raw_needles: + return { + "success": False, + "stdout": "", + "stderr": "", + "error": "Missing url", + "table": None, + "data": [], + } + + matches: list[dict[str, Any]] = [] + seen_keys: set[str] = set() + + for backend_name in storage.list_backends() or []: + try: + backend = storage[backend_name] + except Exception: + continue + + search_fn = getattr(backend, "search", None) + if not callable(search_fn): + continue + + for needle in raw_needles: + query = f"url:{needle}" + try: + results = backend.search( + query, + limit=1, + minimal=True, + url_only=True, + ) or [] + except Exception: + continue + + for item in results: + if hasattr(item, "to_dict") and callable(getattr(item, "to_dict")): + try: + item = item.to_dict() + except Exception: + item = {"title": str(item)} + elif not isinstance(item, dict): + item = {"title": str(item)} + + payload = dict(item) + payload.setdefault("store", str(backend_name)) + payload.setdefault("needle", str(needle)) + + key = str(payload.get("hash") or payload.get("url") or payload.get("title") or needle).strip().lower() + if key in seen_keys: + continue + seen_keys.add(key) + matches.append(payload) + + if matches: + break + + if matches: + break + + return { + "success": True, + "stdout": "", + "stderr": "", + "error": None, + "table": None, + "data": matches, + } + except Exception as exc: + return { + "success": False, + "stdout": "", + "stderr": "", + "error": f"url-exists failed: {type(exc).__name__}: {exc}", + "table": None, + "data": [], + } + # Provide yt-dlp format list for a URL (for MPV "Change format" menu). # Returns a ResultTable-like payload so the Lua UI can render without running cmdlets. if op_name in {"ytdlp-formats", @@ -1084,6 +1220,7 @@ def main(argv: Optional[list[str]] = None) -> int: data = req.get("data") pipeline_text = str(req.get("pipeline") or "").strip() seeds = req.get("seeds") + json_output = bool(req.get("json") or req.get("output_json")) if not req_id: continue @@ -1122,7 +1259,11 @@ def main(argv: Optional[list[str]] = None) -> int: req_id=req_id, ) else: - run = _run_pipeline(pipeline_text, seeds=seeds) + run = _run_pipeline( + pipeline_text, + seeds=seeds, + json_output=json_output, + ) resp = { "id": req_id, @@ -1133,6 +1274,7 @@ def main(argv: Optional[list[str]] = None) -> int: ""), "error": run.get("error"), "table": run.get("table"), + "data": run.get("data"), } if "choices" in run: resp["choices"] = run.get("choices") diff --git a/SYS/database.py b/SYS/database.py index 50ec343..7cea074 100644 --- a/SYS/database.py +++ b/SYS/database.py @@ -527,15 +527,72 @@ def get_config_all() -> Dict[str, Any]: # Worker Management Methods for medios.db +def _worker_db_connect(timeout: float = 0.75) -> sqlite3.Connection: + conn = sqlite3.connect( + str(DB_PATH), + timeout=timeout, + check_same_thread=False, + ) + conn.row_factory = sqlite3.Row + try: + busy_ms = max(1, int(timeout * 1000)) + conn.execute(f"PRAGMA busy_timeout = {busy_ms}") + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + except sqlite3.Error: + pass + return conn + + +def _worker_db_execute( + query: str, + params: tuple = (), + *, + fetch: Optional[str] = None, + timeout: float = 0.75, + retries: int = 1, +) -> Any: + attempts = 0 + while True: + conn: Optional[sqlite3.Connection] = None + try: + conn = _worker_db_connect(timeout=timeout) + cursor = conn.cursor() + try: + cursor.execute(query, params) + if fetch == "one": + result = cursor.fetchone() + elif fetch == "all": + result = cursor.fetchall() + else: + result = cursor.rowcount + conn.commit() + return result + finally: + cursor.close() + except sqlite3.OperationalError as exc: + msg = str(exc).lower() + if "locked" in msg and attempts < retries: + attempts += 1 + time.sleep(0.05 * attempts) + continue + raise + finally: + if conn is not None: + try: + conn.close() + except Exception: + pass + def insert_worker(worker_id: str, worker_type: str, title: str = "", description: str = "") -> bool: try: - db.execute( + _worker_db_execute( "INSERT INTO workers (id, type, title, description, status) VALUES (?, ?, ?, ?, 'running')", - (worker_id, worker_type, title, description) + (worker_id, worker_type, title, description), ) return True except Exception as exc: - logger.exception("Failed to insert worker %s: %s", worker_id, exc) + logger.warning("Failed to insert worker %s: %s", worker_id, exc) return False def update_worker(worker_id: str, **kwargs) -> bool: @@ -559,20 +616,20 @@ def update_worker(worker_id: str, **kwargs) -> bool: vals.append(worker_id) try: - db.execute(query, tuple(vals)) + _worker_db_execute(query, tuple(vals)) return True except Exception as exc: - logger.exception("Failed to update worker %s: %s", worker_id, exc) + logger.warning("Failed to update worker %s: %s", worker_id, exc) return False def append_worker_stdout(worker_id: str, content: str, channel: str = 'stdout'): try: - db.execute( + _worker_db_execute( "INSERT INTO worker_stdout (worker_id, channel, content) VALUES (?, ?, ?)", - (worker_id, channel, content) + (worker_id, channel, content), ) except Exception as exc: - logger.exception("Failed to append worker stdout for %s", worker_id) + logger.warning("Failed to append worker stdout for %s: %s", worker_id, exc) def get_worker_stdout(worker_id: str, channel: Optional[str] = None) -> str: query = "SELECT content FROM worker_stdout WHERE worker_id = ?" @@ -582,20 +639,30 @@ def get_worker_stdout(worker_id: str, channel: Optional[str] = None) -> str: params.append(channel) query += " ORDER BY timestamp ASC" - rows = db.fetchall(query, tuple(params)) + rows = _worker_db_execute(query, tuple(params), fetch="all") or [] return "\n".join(row['content'] for row in rows) def get_active_workers() -> List[Dict[str, Any]]: - rows = db.fetchall("SELECT * FROM workers WHERE status = 'running' ORDER BY created_at DESC") + rows = _worker_db_execute( + "SELECT * FROM workers WHERE status = 'running' ORDER BY created_at DESC", + fetch="all", + ) or [] return [dict(row) for row in rows] def get_worker(worker_id: str) -> Optional[Dict[str, Any]]: - row = db.fetchone("SELECT * FROM workers WHERE id = ?", (worker_id,)) + row = _worker_db_execute( + "SELECT * FROM workers WHERE id = ?", + (worker_id,), + fetch="one", + ) return dict(row) if row else None def expire_running_workers(older_than_seconds: int = 300, status: str = 'error', reason: str = 'timeout') -> int: # SQLITE doesn't have a simple way to do DATETIME - INTERVAL, so we'll use strftime/unixepoch if available # or just do regular update for all running ones for now as a simple fallback query = f"UPDATE workers SET status = ?, error_message = ? WHERE status = 'running'" - db.execute(query, (status, reason)) + try: + _worker_db_execute(query, (status, reason), timeout=0.5, retries=0) + except Exception: + return 0 return 0 # We don't easily get the rowcount from db.execute right now diff --git a/Store/HydrusNetwork.py b/Store/HydrusNetwork.py index 689143a..2eb0f98 100644 --- a/Store/HydrusNetwork.py +++ b/Store/HydrusNetwork.py @@ -684,6 +684,30 @@ class HydrusNetwork(Store): continue return ids_out, hashes_out + def _fetch_search_metadata( + *, + file_ids: Optional[Sequence[Any]] = None, + hashes: Optional[Sequence[Any]] = None, + include_tags: bool = True, + include_urls: bool = True, + include_mime: bool = True, + ) -> list[dict[str, Any]]: + try: + payload = client.fetch_file_metadata( + file_ids=file_ids, + hashes=hashes, + include_service_keys_to_tags=include_tags, + include_file_url=include_urls, + include_duration=False, + include_size=True, + include_mime=include_mime, + ) + except Exception: + return [] + + metadata = payload.get("metadata", []) if isinstance(payload, dict) else [] + return metadata if isinstance(metadata, list) else [] + def _iter_url_filtered_metadata( url_value: str | None, want_any: bool, @@ -927,6 +951,55 @@ class HydrusNetwork(Store): return metas_out[:fetch_limit] + def _cap_metadata_candidates( + file_ids_in: list[int], + hashes_in: list[str], + *, + requested_limit: Any, + freeform_mode: bool = False, + fallback_scan: bool = False, + ) -> tuple[list[int], list[str]]: + """Cap metadata hydration to a sane subset of Hydrus hits. + + Hydrus native tag search is fast, but fetching metadata for every + matched file can explode for broad queries. Keep the native search, + but only hydrate a bounded working set and let downstream filtering + stop once enough display rows are collected. + """ + + try: + base_limit = int(requested_limit or 100) + except Exception: + base_limit = 100 + if base_limit <= 0: + base_limit = 100 + + hydrate_limit = base_limit + if freeform_mode: + hydrate_limit = max(hydrate_limit * 4, 200) + if fallback_scan: + hydrate_limit = max(hydrate_limit * 2, 200) + hydrate_limit = min(hydrate_limit, 1000) + + ids_out = list(file_ids_in or []) + hashes_out = list(hashes_in or []) + total_candidates = len(ids_out) + len(hashes_out) + if total_candidates <= hydrate_limit: + return ids_out, hashes_out + + debug( + f"{prefix} limiting metadata hydration to {hydrate_limit} of {total_candidates} candidate(s)" + ) + + if ids_out: + ids_out = ids_out[:hydrate_limit] + remaining = max(0, hydrate_limit - len(ids_out)) + hashes_out = hashes_out[:remaining] if remaining > 0 else [] + else: + hashes_out = hashes_out[:hydrate_limit] + + return ids_out, hashes_out + query_lower = query.lower().strip() # Support `ext:` anywhere in the query. We filter results by the @@ -1172,7 +1245,7 @@ class HydrusNetwork(Store): payloads.append( client.search_files( tags=title_predicates, - return_hashes=True, + return_hashes=False, return_file_ids=True, ) ) @@ -1187,7 +1260,7 @@ class HydrusNetwork(Store): payloads.append( client.search_files( tags=[f"title:{query_lower}*"], - return_hashes=True, + return_hashes=False, return_file_ids=True, ) ) @@ -1198,7 +1271,7 @@ class HydrusNetwork(Store): payloads.append( client.search_files( tags=freeform_predicates, - return_hashes=True, + return_hashes=False, return_file_ids=True, ) ) @@ -1206,15 +1279,12 @@ class HydrusNetwork(Store): pass id_set: set[int] = set() - hash_set: set[str] = set() for payload in payloads: - ids_part, hashes_part = _extract_search_ids(payload) + ids_part, _ = _extract_search_ids(payload) for fid in ids_part: id_set.add(fid) - for hh in hashes_part: - hash_set.add(hh) file_ids = list(id_set) - hashes = list(hash_set) + hashes = [] else: if not tags: debug(f"{prefix} 0 result(s)") @@ -1222,10 +1292,11 @@ class HydrusNetwork(Store): search_result = client.search_files( tags=tags, - return_hashes=True, + return_hashes=False, return_file_ids=True ) - file_ids, hashes = _extract_search_ids(search_result) + file_ids, _ = _extract_search_ids(search_result) + hashes = [] # Fast path: ext-only search. Avoid fetching metadata for an unbounded # system:everything result set; fetch in chunks until we have enough. @@ -1242,21 +1313,13 @@ class HydrusNetwork(Store): if len(results) >= limit: break chunk = file_ids[start:start + chunk_size] - try: - payload = client.fetch_file_metadata( - file_ids=chunk, - include_service_keys_to_tags=True, - include_file_url=True, - include_duration=True, - include_size=True, - include_mime=True, - ) - except Exception: - continue - metas = payload.get("metadata", - []) if isinstance(payload, - dict) else [] - if not isinstance(metas, list): + metas = _fetch_search_metadata( + file_ids=chunk, + include_tags=True, + include_urls=True, + include_mime=True, + ) + if not metas: continue for meta in metas: if len(results) >= limit: @@ -1312,26 +1375,27 @@ class HydrusNetwork(Store): debug(f"{prefix} 0 result(s)") return [] + file_ids, hashes = _cap_metadata_candidates( + file_ids, + hashes, + requested_limit=limit, + freeform_mode=freeform_union_search, + ) + if file_ids: - metadata = client.fetch_file_metadata( + metadata_list = _fetch_search_metadata( file_ids=file_ids, - include_service_keys_to_tags=True, - include_file_url=True, - include_duration=True, - include_size=True, + include_tags=True, + include_urls=True, include_mime=True, ) - metadata_list = metadata.get("metadata", []) elif hashes: - metadata = client.fetch_file_metadata( + metadata_list = _fetch_search_metadata( hashes=hashes, - include_service_keys_to_tags=True, - include_file_url=True, - include_duration=True, - include_size=True, + include_tags=True, + include_urls=True, include_mime=True, ) - metadata_list = metadata.get("metadata", []) else: metadata_list = [] @@ -1341,31 +1405,34 @@ class HydrusNetwork(Store): try: search_result = client.search_files( tags=["system:everything"], - return_hashes=True, + return_hashes=False, return_file_ids=True, ) - file_ids, hashes = _extract_search_ids(search_result) + file_ids, _ = _extract_search_ids(search_result) + hashes = [] + + file_ids, hashes = _cap_metadata_candidates( + file_ids, + hashes, + requested_limit=limit, + freeform_mode=True, + fallback_scan=True, + ) if file_ids: - metadata = client.fetch_file_metadata( + metadata_list = _fetch_search_metadata( file_ids=file_ids, - include_service_keys_to_tags=True, - include_file_url=True, - include_duration=True, - include_size=True, + include_tags=True, + include_urls=True, include_mime=True, ) - metadata_list = metadata.get("metadata", []) elif hashes: - metadata = client.fetch_file_metadata( + metadata_list = _fetch_search_metadata( hashes=hashes, - include_service_keys_to_tags=True, - include_file_url=True, - include_duration=True, - include_size=True, + include_tags=True, + include_urls=True, include_mime=True, ) - metadata_list = metadata.get("metadata", []) except Exception: pass diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py index f8d989d..0c23a37 100644 --- a/cmdlet/search_file.py +++ b/cmdlet/search_file.py @@ -75,6 +75,60 @@ class _WorkerLogger: pass +def _truncate_worker_text(value: Any, max_len: int = 120) -> str: + text = str(value or "").strip() + if len(text) <= max_len: + return text + if max_len <= 3: + return text[:max_len] + return f"{text[:max_len - 3].rstrip()}..." + + +def _summarize_worker_result(item: Dict[str, Any]) -> str: + title = ( + item.get("title") + or item.get("name") + or item.get("path") + or item.get("url") + or item.get("hash") + or "Result" + ) + details: list[str] = [] + + store_val = str(item.get("store") or item.get("source") or "").strip() + if store_val: + details.append(store_val) + + ext_val = str(item.get("ext") or item.get("mime") or "").strip() + if ext_val: + details.append(ext_val) + + hash_val = str( + item.get("hash") or item.get("file_hash") or item.get("hash_hex") or "" + ).strip() + if hash_val: + details.append(hash_val[:12]) + + suffix = f" [{' | '.join(details)}]" if details else "" + return f"- {_truncate_worker_text(title)}{suffix}" + + +def _summarize_worker_results(results: Sequence[Dict[str, Any]], preview_limit: int = 8) -> str: + count = len(results) + lines = [f"{count} result(s)"] + if count <= 0: + return lines[0] + + for item in results[:preview_limit]: + lines.append(_summarize_worker_result(item)) + + remaining = count - min(count, preview_limit) + if remaining > 0: + lines.append(f"... {remaining} more") + + return "\n".join(lines) + + class search_file(Cmdlet): """Class-based search-file cmdlet for searching storage backends.""" @@ -1127,7 +1181,7 @@ class search_file(Cmdlet): except Exception: pass try: - append_worker_stdout(worker_id, json.dumps([], indent=2)) + append_worker_stdout(worker_id, _summarize_worker_results([])) update_worker(worker_id, status="completed") except Exception: pass @@ -1195,7 +1249,7 @@ class search_file(Cmdlet): ctx.set_current_stage_table(table) try: - append_worker_stdout(worker_id, json.dumps(results_list, indent=2)) + append_worker_stdout(worker_id, _summarize_worker_results(results_list)) update_worker(worker_id, status="completed") except Exception: pass @@ -1498,7 +1552,7 @@ class search_file(Cmdlet): if not results: log(f"No results found for query: {query}", file=sys.stderr) try: - append_worker_stdout(worker_id, json.dumps([], indent=2)) + append_worker_stdout(worker_id, _summarize_worker_results([])) update_worker(worker_id, status="completed") except Exception: pass @@ -1534,7 +1588,7 @@ class search_file(Cmdlet): ctx.set_current_stage_table(table) try: - append_worker_stdout(worker_id, json.dumps(results_list, indent=2)) + append_worker_stdout(worker_id, _summarize_worker_results(results_list)) update_worker(worker_id, status="completed") except Exception: pass @@ -2034,8 +2088,7 @@ class search_file(Cmdlet): ctx.set_last_result_table(table, results_list) db.append_worker_stdout( worker_id, - json.dumps(results_list, - indent=2) + _summarize_worker_results(results_list) ) db.update_worker_status(worker_id, "completed") return 0 @@ -2047,7 +2100,7 @@ class search_file(Cmdlet): ctx.set_last_result_table_preserve_history(table, []) except Exception: pass - db.append_worker_stdout(worker_id, json.dumps([], indent=2)) + db.append_worker_stdout(worker_id, _summarize_worker_results([])) db.update_worker_status(worker_id, "completed") return 0 @@ -2259,8 +2312,7 @@ class search_file(Cmdlet): ctx.set_last_result_table(table, results_list) db.append_worker_stdout( worker_id, - json.dumps(results_list, - indent=2) + _summarize_worker_results(results_list) ) else: log("No results found", file=sys.stderr) @@ -2270,7 +2322,7 @@ class search_file(Cmdlet): ctx.set_last_result_table_preserve_history(table, []) except Exception: pass - db.append_worker_stdout(worker_id, json.dumps([], indent=2)) + db.append_worker_stdout(worker_id, _summarize_worker_results([])) db.update_worker_status(worker_id, "completed") return 0