diff --git a/CLI.py b/CLI.py index 186019f..483d298 100644 --- a/CLI.py +++ b/CLI.py @@ -231,9 +231,25 @@ def _get_table_title_for_command(cmd_name: str, emitted_items: Optional[List[Any 'delete_file': 'Results', 'check-file-status': 'Status', 'check_file_status': 'Status', + 'get-metadata': None, + 'get_metadata': None, } - - return title_map.get(cmd_name, 'Results') + + mapped = title_map.get(cmd_name, 'Results') + if mapped is not None: + return mapped + + # For metadata, derive title from first item if available + if emitted_items: + first = emitted_items[0] + try: + if isinstance(first, dict) and first.get('title'): + return str(first.get('title')) + if hasattr(first, 'title') and getattr(first, 'title'): + return str(getattr(first, 'title')) + except Exception: + pass + return 'Results' def _close_cli_worker_manager() -> None: @@ -409,9 +425,20 @@ def _get_cmdlet_names() -> List[str]: def _import_cmd_module(mod_name: str): """Import a cmdlet/native module from cmdlets or cmdnats packages.""" - for package in ("cmdlets", "cmdnats", None): + # Normalize leading punctuation used in aliases (e.g., .pipe) + normalized = (mod_name or "").strip() + if normalized.startswith('.'): + normalized = normalized.lstrip('.') + # Convert hyphens to underscores to match module filenames + normalized = normalized.replace("-", "_") + if not normalized: + return None + + # Prefer native cmdnats modules first so editable installs of this package + # don't shadow the in-repo implementations (e.g., .pipe autocomplete flags). + for package in ("cmdnats", "cmdlets", None): try: - qualified = f"{package}.{mod_name}" if package else mod_name + qualified = f"{package}.{normalized}" if package else normalized return import_module(qualified) except ModuleNotFoundError: continue @@ -495,6 +522,15 @@ def _get_arg_choices(cmd_name: str, arg_name: str) -> List[str]: merged = sorted(set(provider_choices + meta_choices)) if merged: return merged + + if normalized_arg == "scrape": + try: + from helper.metadata_search import list_metadata_providers + meta_providers = list_metadata_providers(_load_cli_config()) + if meta_providers: + return sorted(meta_providers.keys()) + except Exception: + pass mod = _import_cmd_module(mod_name) data = getattr(mod, "CMDLET", None) if mod else None if data: @@ -536,36 +572,48 @@ if ( text = document.text_before_cursor tokens = text.split() - if not tokens: + # Respect pipeline stages: only use tokens after the last '|' + last_pipe = -1 + for idx, tok in enumerate(tokens): + if tok == "|": + last_pipe = idx + stage_tokens = tokens[last_pipe + 1:] if last_pipe >= 0 else tokens + + if not stage_tokens: for cmd in self.cmdlet_names: yield CompletionType(cmd, start_position=0) - elif len(tokens) == 1: - current = tokens[0].lower() + return + + # Single token at this stage -> suggest command names/keywords + if len(stage_tokens) == 1: + current = stage_tokens[0].lower() for cmd in self.cmdlet_names: if cmd.startswith(current): yield CompletionType(cmd, start_position=-len(current)) for keyword in ["help", "exit", "quit"]: if keyword.startswith(current): yield CompletionType(keyword, start_position=-len(current)) - else: - cmd_name = tokens[0].replace("_", "-").lower() - current_token = tokens[-1].lower() - prev_token = tokens[-2].lower() if len(tokens) > 1 else "" + return - choices = _get_arg_choices(cmd_name, prev_token) - if choices: - for choice in choices: - if choice.lower().startswith(current_token): - yield CompletionType(choice, start_position=-len(current_token)) - return + # 
Otherwise treat first token of stage as command and complete its args + cmd_name = stage_tokens[0].replace("_", "-").lower() + current_token = stage_tokens[-1].lower() + prev_token = stage_tokens[-2].lower() if len(stage_tokens) > 1 else "" - arg_names = _get_cmdlet_args(cmd_name) - for arg in arg_names: - if arg.lower().startswith(current_token): - yield CompletionType(arg, start_position=-len(current_token)) + choices = _get_arg_choices(cmd_name, prev_token) + if choices: + for choice in choices: + if choice.lower().startswith(current_token): + yield CompletionType(choice, start_position=-len(current_token)) + return - if "--help".startswith(current_token): - yield CompletionType("--help", start_position=-len(current_token)) + arg_names = _get_cmdlet_args(cmd_name) + for arg in arg_names: + if arg.lower().startswith(current_token): + yield CompletionType(arg, start_position=-len(current_token)) + + if "--help".startswith(current_token): + yield CompletionType("--help", start_position=-len(current_token)) async def get_completions_async(self, document: Document, complete_event): # type: ignore[override] for completion in self.get_completions(document, complete_event): @@ -689,6 +737,7 @@ def _create_cmdlet_cli(): |246813579|JKLMNOPQR| |369369369|STUVWXYZ0| |483726159|ABCDEFGHI| +|=========+=========| |516273849|JKLMNOPQR| |639639639|STUVWXYZ0| |753186429|ABCDEFGHI| @@ -699,7 +748,7 @@ def _create_cmdlet_cli(): print(banner) # Configurable prompt - prompt_text = ">>>|" + prompt_text = "🜂🜄🜁🜃|" # Pre-acquire Hydrus session key at startup (like hub-ui does) try: @@ -840,7 +889,6 @@ def _create_cmdlet_cli(): return input(prompt) while True: - print("#-------------------------------------------------------------------------#") try: user_input = get_input(prompt_text).strip() except (EOFError, KeyboardInterrupt): @@ -971,6 +1019,19 @@ def _execute_pipeline(tokens: list): if not stages: print("Invalid pipeline syntax\n") return + + # If a previous stage paused for selection, attach its remaining stages when the user runs only @N + pending_tail = ctx.get_pending_pipeline_tail() if hasattr(ctx, 'get_pending_pipeline_tail') else [] + pending_source = ctx.get_pending_pipeline_source() if hasattr(ctx, 'get_pending_pipeline_source') else None + current_source = ctx.get_current_stage_table_source_command() if hasattr(ctx, 'get_current_stage_table_source_command') else None + selection_only = len(stages) == 1 and stages[0] and stages[0][0].startswith('@') + if pending_tail and selection_only: + if pending_source and current_source and current_source == pending_source: + stages.extend(pending_tail) + if hasattr(ctx, 'clear_pending_pipeline_tail'): + ctx.clear_pending_pipeline_tail() + elif hasattr(ctx, 'clear_pending_pipeline_tail'): + ctx.clear_pending_pipeline_tail() # Load config relative to CLI root config = _load_cli_config() @@ -1044,7 +1105,9 @@ def _execute_pipeline(tokens: list): command_expanded = False selected_row_args = [] - if source_cmd: + skip_pipe_expansion = source_cmd == '.pipe' and len(stages) > 0 + + if source_cmd and not skip_pipe_expansion: # Try to find row args for the selected indices for idx in first_stage_selection_indices: row_args = ctx.get_current_stage_table_row_selection_args(idx) @@ -1151,6 +1214,9 @@ def _execute_pipeline(tokens: list): if source_cmd == '.pipe' or source_cmd == '.adjective': should_expand_to_command = True + if source_cmd == '.pipe' and (stage_index + 1 < len(stages) or stage_args): + # When piping playlist rows to another cmdlet, prefer item-based 
selection + should_expand_to_command = False elif source_cmd == 'search-file' and source_args and 'youtube' in source_args: # Special case for youtube search results: @N expands to .pipe if stage_index + 1 >= len(stages): @@ -1170,6 +1236,10 @@ def _execute_pipeline(tokens: list): # Single format object if source_cmd: should_expand_to_command = True + + # If we have a source command but no piped data (paused for selection), expand to command + if not should_expand_to_command and source_cmd and selection is not None and piped_result is None: + should_expand_to_command = True # If expanding to command, replace this stage and re-execute if should_expand_to_command and selection is not None: @@ -1360,6 +1430,27 @@ def _execute_pipeline(tokens: list): # Intermediate stage - thread to next stage piped_result = pipeline_ctx.emits ctx.set_last_result_table(None, pipeline_ctx.emits) + else: + # No output from this stage. If it presented a selectable table (e.g., format list), pause + # and stash the remaining pipeline so @N can resume with the selection applied. + if not is_last_stage: + stage_table_source = ctx.get_current_stage_table_source_command() + row_has_selection = ctx.get_current_stage_table_row_selection_args(0) is not None + if stage_table_source and row_has_selection: + pending_tail = stages[stage_index + 1:] + if pending_tail and pending_tail[0] and pending_tail[0][0].startswith('@'): + pending_tail = pending_tail[1:] + if hasattr(ctx, 'set_pending_pipeline_tail') and pending_tail: + ctx.set_pending_pipeline_tail(pending_tail, stage_table_source) + elif hasattr(ctx, 'clear_pending_pipeline_tail'): + ctx.clear_pending_pipeline_tail() + if pipeline_session and worker_manager: + try: + worker_manager.log_step(pipeline_session.worker_id, "Pipeline paused for @N selection") + except Exception: + pass + print("Pipeline paused: select a format with @N to continue remaining stages") + return if ret_code != 0: stage_status = "failed" diff --git a/LUA/main.lua b/LUA/main.lua index 14af2d9..514a9f8 100644 --- a/LUA/main.lua +++ b/LUA/main.lua @@ -11,12 +11,32 @@ local opts = { } -- Detect CLI path -local script_dir = mp.get_script_directory() +local function detect_script_dir() + local dir = mp.get_script_directory() + if dir and dir ~= "" then return dir end + + -- Fallback to debug info path + local src = debug.getinfo(1, "S").source + if src and src:sub(1, 1) == "@" then + local path = src:sub(2) + local parent = path:match("(.*)[/\\]") + if parent and parent ~= "" then + return parent + end + end + + -- Fallback to working directory + local cwd = utils.getcwd() + if cwd and cwd ~= "" then return cwd end + return nil +end + +local script_dir = detect_script_dir() or "" if not opts.cli_path then -- Assuming the structure is repo/LUA/script.lua and repo/CLI.py -- We need to go up one level local parent_dir = script_dir:match("(.*)[/\\]") - if parent_dir then + if parent_dir and parent_dir ~= "" then opts.cli_path = parent_dir .. 
"/CLI.py" else opts.cli_path = "CLI.py" -- Fallback diff --git a/README.md b/README.md index a130184..4565545 100644 --- a/README.md +++ b/README.md @@ -32,10 +32,10 @@ python cli.py ``` Adding your first file ```python -.pipe -list # List MPV current playing/list -.pipe -save # Save playlist -.pipe -load # lists saved playlist, @# to load -.pipe "https://www.youtube.com/watch?v=_23dFb50Z2Y" # adds to current playlist +.pipe -list # List MPV current playing/list +.pipe -save # Save current MPV playlist to local library +.pipe -load # List saved playlists; use @N to load one +.pipe "https://www.youtube.com/watch?v=_23dFb50Z2Y" # Add URL to current playlist ``` 1. search-file -provider youtube "something in the way" diff --git a/cmdlets/_shared.py b/cmdlets/_shared.py index 00ad710..e3aa4b8 100644 --- a/cmdlets/_shared.py +++ b/cmdlets/_shared.py @@ -1128,6 +1128,47 @@ def merge_sequences(*sources: Optional[Iterable[Any]], case_sensitive: bool = Tr return merged +def collapse_namespace_tags(tags: Optional[Iterable[Any]], namespace: str, prefer: str = "last") -> list[str]: + """Reduce tags so only one entry for a given namespace remains. + + Keeps either the first or last occurrence (default last) while preserving overall order + for non-matching tags. Useful for ensuring a single title: tag. + """ + if not tags: + return [] + ns = str(namespace or "").strip().lower() + if not ns: + return list(tags) if isinstance(tags, list) else list(tags) + + prefer_last = str(prefer or "last").lower() != "first" + ns_prefix = ns + ":" + + items = list(tags) + if prefer_last: + kept: list[str] = [] + seen_ns = False + for tag in reversed(items): + text = str(tag) + if text.lower().startswith(ns_prefix): + if seen_ns: + continue + seen_ns = True + kept.append(text) + kept.reverse() + return kept + else: + kept_ns = False + result: list[str] = [] + for tag in items: + text = str(tag) + if text.lower().startswith(ns_prefix): + if kept_ns: + continue + kept_ns = True + result.append(text) + return result + + def extract_tags_from_result(result: Any) -> list[str]: tags: list[str] = [] if isinstance(result, models.PipeObject): diff --git a/cmdlets/add_file.py b/cmdlets/add_file.py index b23b628..83c8c6a 100644 --- a/cmdlets/add_file.py +++ b/cmdlets/add_file.py @@ -16,6 +16,7 @@ from ._shared import ( extract_tags_from_result, extract_title_from_result, extract_known_urls_from_result, merge_sequences, extract_relationships, extract_duration ) +from ._shared import collapse_namespace_tags from helper.local_library import read_sidecar, find_sidecar, write_sidecar, LocalLibraryDB from helper.utils import sha256_file from metadata import embed_metadata_in_file @@ -133,6 +134,31 @@ def _cleanup_sidecar_files(media_path: Path, *extra_paths: Optional[Path]) -> No continue +def _show_local_result_table(file_hash: Optional[str], config: Dict[str, Any]) -> None: + """Run search-file by hash to display the newly added local file in a table.""" + if not file_hash: + return + try: + from cmdlets import search_file as search_cmd + temp_ctx = models.PipelineStageContext(0, 1) + saved_ctx = ctx.get_stage_context() + ctx.set_stage_context(temp_ctx) + try: + # Call the cmdlet exactly like the user would type: search-file "hash:...,store:local" + search_cmd._run(None, [f"hash:{file_hash},store:local"], config) + try: + table = ctx.get_last_result_table() + if table is not None: + log("") + log(table.format_plain()) + except Exception: + pass + finally: + ctx.set_stage_context(saved_ctx) + except Exception as exc: + 
debug(f"[add-file] Skipped search-file display: {exc}") + + def _persist_local_metadata( library_root: Path, dest_path: Path, @@ -209,7 +235,7 @@ def _handle_local_transfer( try: destination_root.mkdir(parents=True, exist_ok=True) except Exception as exc: - log(f"❌ Cannot prepare destination directory {destination_root}: {exc}", file=sys.stderr) + log(f"Cannot prepare destination directory {destination_root}: {exc}", file=sys.stderr) return 1, None @@ -234,8 +260,8 @@ def _handle_local_transfer( return f"title:{value}" return tag - tags_from_result = [normalize_title_tag(t) for t in tags_from_result] - sidecar_tags = [normalize_title_tag(t) for t in sidecar_tags] + tags_from_result = collapse_namespace_tags([normalize_title_tag(t) for t in tags_from_result], "title", prefer="last") + sidecar_tags = collapse_namespace_tags([normalize_title_tag(t) for t in sidecar_tags], "title", prefer="last") # Merge tags carefully: if URL has title tag, don't include sidecar title tags # This prevents duplicate title: tags when URL provides a title @@ -295,6 +321,7 @@ def _handle_local_transfer( else: # Ensure filename is the hash when adding to local storage resolved_hash = _resolve_file_hash(result, sidecar_hash, media_path) + hashed_move_done = False if resolved_hash: hashed_name = resolved_hash + media_path.suffix target_path = destination_root / hashed_name @@ -305,7 +332,13 @@ def _handle_local_transfer( pass if media_path != target_path: media_path = media_path.rename(target_path) - dest_file = storage["local"].upload(media_path, location=str(destination_root), move=True) + hashed_move_done = True + + if hashed_move_done and media_path.parent.samefile(destination_root): + # Already placed at final destination with hash name; skip extra upload/move + dest_file = str(media_path) + else: + dest_file = storage["local"].upload(media_path, location=str(destination_root), move=True) except Exception as exc: log(f"❌ Failed to move file into {destination_root}: {exc}", file=sys.stderr) return 1, None @@ -316,7 +349,7 @@ def _handle_local_transfer( # If we have a title tag, keep it. Otherwise, derive from filename. 
has_title = any(str(t).strip().lower().startswith("title:") for t in merged_tags) - final_tags = merged_tags + final_tags = collapse_namespace_tags(merged_tags, "title", prefer="last") if not has_title: filename_title = dest_path.stem.replace("_", " ").strip() @@ -326,7 +359,7 @@ def _handle_local_transfer( if not export_mode: _persist_local_metadata(destination_root, dest_path, final_tags, merged_urls, file_hash, relationships, duration, media_kind) _cleanup_sidecar_files(media_path, sidecar_path) - debug(f"✅ Moved to local library: {dest_path}") + _show_local_result_table(file_hash, config or {}) else: debug(f"✅ Exported to destination: {dest_path}") return 0, dest_path @@ -390,9 +423,17 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: location = str(path_value) # Get location from parsed args - now uses SharedArgs.STORAGE so key is "storage" - location = parsed.get("storage") - if location: - location = str(location).lower().strip() + storage_arg = parsed.get("storage") + if location is None: + location = storage_arg + if location: + location = str(location).lower().strip() + elif storage_arg: + # User provided both -path (as destination) and -storage; prefer explicit storage only if it matches + storage_str = str(storage_arg).lower().strip() + if storage_str != str(location).lower(): + log(f"❌ Conflicting destinations: -path '{location}' vs -storage '{storage_str}'", file=sys.stderr) + return 1 # Get file provider from parsed args provider_name = parsed.get("provider") @@ -973,8 +1014,14 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: except OSError as exc: log(f"Failed to delete sidecar: {exc}", file=sys.stderr) - log(f"✅ Successfully completed: {media_path.name} (hash={file_hash})", file=sys.stderr) - + # Decide whether to surface search-file results at end of pipeline + stage_ctx = ctx.get_stage_context() + is_storage_target = location is not None + should_display = is_storage_target and (stage_ctx is None or stage_ctx.is_last_stage) + + if (not should_display) or not file_hash: + log(f"Successfully completed: {media_path.name} (hash={file_hash})", file=sys.stderr) + # Emit result for Hydrus uploads so downstream commands know about it if location == 'hydrus': # Extract title from original result, fallback to filename if not available @@ -999,6 +1046,17 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: # Clear the stage table so downstream @N doesn't try to re-run download-data # Next stage will use these Hydrus file results, not format objects ctx.set_current_stage_table(None) + + # If this is the last stage (or not in a pipeline), show the file via search-file + if should_display and file_hash: + try: + from cmdlets import search_file as search_cmdlet + search_cmdlet._run(None, [f"hash:{file_hash}"], config) + except Exception: + debug("search-file lookup after add-file failed", file=sys.stderr) + elif file_hash: + # Not displaying search results here, so report completion normally + log(f"Successfully completed: {media_path.name} (hash={file_hash})", file=sys.stderr) return 0 diff --git a/cmdlets/add_tags.py b/cmdlets/add_tags.py index cf39c14..fd00da1 100644 --- a/cmdlets/add_tags.py +++ b/cmdlets/add_tags.py @@ -14,7 +14,7 @@ from ._shared import normalize_result_input, filter_results_by_temp from helper import hydrus as hydrus_wrapper from helper.local_library import read_sidecar, write_sidecar, find_sidecar, has_sidecar, LocalLibraryDB from metadata import rename -from ._shared import 
Cmdlet, CmdletArg, normalize_hash, parse_tag_arguments, expand_tag_groups, parse_cmdlet_args +from ._shared import Cmdlet, CmdletArg, normalize_hash, parse_tag_arguments, expand_tag_groups, parse_cmdlet_args, collapse_namespace_tags from config import get_local_storage_path @@ -176,7 +176,7 @@ def _refresh_tags_view(res: Any, hydrus_hash: Optional[str], file_hash: Optional target_hash = hydrus_hash or file_hash refresh_args: List[str] = [] if target_hash: - refresh_args = ["-hash", target_hash] + refresh_args = ["-hash", target_hash, "-store", target_hash] try: subject = ctx.get_last_result_subject() @@ -413,6 +413,9 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: if new_tag not in existing_tags: existing_tags.append(new_tag) + # Ensure only one tag per namespace (e.g., single title:) with latest preferred + existing_tags = collapse_namespace_tags(existing_tags, "title", prefer="last") + # Compute new tags relative to original new_tags_added = [t for t in existing_tags if isinstance(t, str) and t.lower() not in original_tags_lower] total_new_tags += len(new_tags_added) diff --git a/cmdlets/delete_file.py b/cmdlets/delete_file.py index 42fdf02..8eda253 100644 --- a/cmdlets/delete_file.py +++ b/cmdlets/delete_file.py @@ -16,6 +16,35 @@ from config import get_local_storage_path from helper.local_library import LocalLibraryDB +def _refresh_last_search(config: Dict[str, Any]) -> None: + """Re-run the last search-file to refresh the table after deletes.""" + try: + source_cmd = ctx.get_last_result_table_source_command() if hasattr(ctx, "get_last_result_table_source_command") else None + if source_cmd not in {"search-file", "search_file", "search"}: + return + + args = ctx.get_last_result_table_source_args() if hasattr(ctx, "get_last_result_table_source_args") else [] + try: + from cmdlets import search_file as search_file_cmd # type: ignore + except Exception: + return + + # Re-run the prior search to refresh items/table without disturbing history + search_file_cmd._run(None, args, config) + + # Set an overlay so action-command pipeline output displays the refreshed table + try: + new_table = ctx.get_last_result_table() + new_items = ctx.get_last_result_items() + subject = ctx.get_last_result_subject() if hasattr(ctx, "get_last_result_subject") else None + if hasattr(ctx, "set_last_result_table_overlay") and new_table and new_items is not None: + ctx.set_last_result_table_overlay(new_table, new_items, subject) + except Exception: + pass + except Exception as exc: + debug(f"[delete_file] search refresh failed: {exc}", file=sys.stderr) + + def _cleanup_relationships(db_path: Path, file_hash: str) -> int: @@ -342,7 +371,10 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: for item in items: if _process_single_item(item, override_hash, conserve, lib_root, reason, config): success_count += 1 - + + if success_count > 0: + _refresh_last_search(config) + return 0 if success_count > 0 else 1 CMDLET = Cmdlet( diff --git a/cmdlets/download_data.py b/cmdlets/download_data.py index c09aaef..49c4ab0 100644 --- a/cmdlets/download_data.py +++ b/cmdlets/download_data.py @@ -2484,7 +2484,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any], emit_results: # Create result table for format display table = ResultTable(title=f"Available Formats - {probe_info.get('title', 'Unknown')}") - for fmt in formats: + for idx, fmt in enumerate(formats, start=1): row = table.add_row() row.add_column("Format ID", fmt.get("format_id", "")) @@ -2518,38 
+2518,26 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any], emit_results: if fmt.get("filesize"): size_mb = fmt["filesize"] / (1024 * 1024) row.add_column("Size", f"{size_mb:.1f} MB") + + # Enable @N expansion to rerun download-data with -item idx + row.set_selection_args(["-item", str(idx)]) # Set source command for @N expansion table.set_source_command("download-data", [url]) - # Note: Row selection args are not set - users select with @N syntax directly - - # Display table and emit as pipeline result + # Display table log(str(table), flush=True) formats_displayed = True # Store table for @N expansion so CLI can reconstruct commands - # Uses separate current_stage_table instead of result history table pipeline_context.set_current_stage_table(table) - - # Always emit formats so they can be selected with @N - for i, fmt in enumerate(formats, 1): - pipeline_context.emit({ - "format_id": fmt.get("format_id", ""), - "format_string": fmt.get("format", ""), - "resolution": fmt.get("resolution", ""), - "vcodec": fmt.get("vcodec", ""), - "acodec": fmt.get("acodec", ""), - "ext": fmt.get("ext", ""), - "filesize": fmt.get("filesize"), - "source_url": url, - "index": i, - }) - debug(f"Use @N syntax to select a format and download") + pipeline_context.set_last_result_table_overlay(table, formats) + debug("Use @N to pick a format; pipeline paused until selection") else: log(f"✗ No formats available for this URL", file=sys.stderr) - continue # Skip download, just show formats + # Stop pipeline here; selection via @N will re-run download-data with -item + return 0 # ====== AUTO-DETECT MULTIPLE FORMATS ====== # Check if multiple formats exist and handle based on -item flag @@ -2636,35 +2624,21 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any], emit_results: # Set source command for @N expansion table.set_source_command("download-data", [url]) - + # Set row selection args so @N expands to "download-data URL -item N" for i in range(len(formats)): - # i is 0-based index, but -item expects 1-based index table.set_row_selection_args(i, ["-item", str(i + 1)]) - - # Display table and emit formats so they can be selected with @N - debug(str(table)) + + # Display table + log(str(table), flush=True) debug(f"💡 Use @N syntax to select a format and download (e.g., @1)") - + # Store table for @N expansion so CLI can reconstruct commands pipeline_context.set_current_stage_table(table) - - # Emit formats as pipeline results for @N selection - for i, fmt in enumerate(formats, 1): - pipeline_context.emit({ - "format_id": fmt.get("format_id", ""), - "format_string": fmt.get("format", ""), - "resolution": fmt.get("resolution", ""), - "vcodec": fmt.get("vcodec", ""), - "acodec": fmt.get("acodec", ""), - "filesize": fmt.get("filesize"), - "tbr": fmt.get("tbr"), - "source_url": url, - "index": i, - }) - + pipeline_context.set_last_result_table_overlay(table, formats) + formats_displayed = True # Mark that we displayed formats - continue # Skip download, user must select format via @N + return 0 # Pause pipeline; user must select format via @N debug(f"Downloading: {url}") @@ -2951,41 +2925,30 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any], emit_results: if downloaded_files or files_downloaded_directly > 0: total_files = len(downloaded_files) + files_downloaded_directly debug(f"✓ Successfully downloaded {total_files} file(s)") - - # Create a result table for the downloaded files - # This ensures that subsequent @N commands select from these files - # instead of trying 
to expand the previous command (e.g. search-file) - if downloaded_files: - from result_table import ResultTable - table = ResultTable("Downloaded Files") - for i, file_path in enumerate(downloaded_files): - # Ensure file_path is a Path object - if isinstance(file_path, str): - file_path = Path(file_path) - - row = table.add_row() - row.add_column("#", str(i + 1)) - row.add_column("File", file_path.name) - row.add_column("Path", str(file_path)) - try: - size_mb = file_path.stat().st_size / (1024*1024) - row.add_column("Size", f"{size_mb:.1f} MB") - except OSError: - row.add_column("Size", "?") - - # Set selection args to just the file path (or index if we want item selection) - # For item selection fallback, we don't strictly need row args if source command is None - # But setting them helps if we want to support command expansion later - table.set_row_selection_args(i, [str(file_path)]) - - # Register the table but DO NOT set a source command - # This forces CLI to use item-based selection (filtering the pipe) - # instead of command expansion - pipeline_context.set_last_result_table_overlay(table, downloaded_files) - pipeline_context.set_current_stage_table(table) - - # Also print the table so user sees what they got - log(str(table), flush=True) + + stage_ctx = pipeline_context.get_stage_context() + should_display_results = stage_ctx is None or stage_ctx.is_last_stage + + if downloaded_files and should_display_results: + try: + from cmdlets import search_file as search_cmdlet + except Exception: + search_cmdlet = None + + if search_cmdlet: + seen_hashes: set[str] = set() + for file_entry in downloaded_files: + path_obj = Path(file_entry) if not isinstance(file_entry, Path) else file_entry + if not path_obj.is_file(): + continue + file_hash = _compute_file_hash(path_obj) + if file_hash and file_hash not in seen_hashes: + seen_hashes.add(file_hash) + search_cmdlet._run(None, [f"hash:{file_hash}"], config) + else: + debug("search-file not available; skipping post-download display") + elif downloaded_files: + debug("Skipping search-file display because downstream pipeline is present") if db: db.update_worker_status(worker_id, 'completed') diff --git a/cmdlets/get_file.py b/cmdlets/get_file.py index 6096356..a475dd2 100644 --- a/cmdlets/get_file.py +++ b/cmdlets/get_file.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Dict, Optional, Sequence +from typing import Any, Callable, Dict, List, Optional, Sequence from pathlib import Path import shutil as _shutil import subprocess as _subprocess @@ -8,13 +8,15 @@ import json import sys import platform +import threading + from helper.logger import log, debug import uuid as _uuid import time as _time from helper.progress import print_progress, print_final_progress from helper.http_client import HTTPClient -from helper.mpv_ipc import get_ipc_pipe_path, send_to_mpv +from helper.mpv_ipc import get_ipc_pipe_path, send_to_mpv, MPV_LUA_SCRIPT_PATH import fnmatch as _fnmatch from . 
import register @@ -25,6 +27,9 @@ from ._shared import Cmdlet, CmdletArg, normalize_hash, looks_like_hash, create_ from config import resolve_output_dir, get_hydrus_url, get_hydrus_access_key from helper.alldebrid import AllDebridClient +DEFAULT_DEBRID_WAIT_TIMEOUT = 600 +DEBRID_WORKER_PREFIX = "debrid_" + @@ -83,70 +88,13 @@ def _handle_alldebrid_pipe(config: Dict[str, Any], args: Sequence[str]) -> int: log("✗ No valid magnet IDs in pipe", file=sys.stderr) return 1 - # Get API key - from config import get_debrid_api_key - api_key = get_debrid_api_key(config) - if not api_key: - log("✗ AllDebrid API key not configured", file=sys.stderr) - return 1 - - # Download from each magnet - client = AllDebridClient(api_key) - total_files = 0 - failed_files = 0 - - log(f"Processing {len(magnets)} magnet(s)...", file=sys.stderr) - - for magnet_id in magnets: - try: - # Fetch magnet files using magnet_status with include_files - magnet_info = client.magnet_status(magnet_id, include_files=True) - - files_list = _extract_files_from_magnet(magnet_info, file_filter) - - if not files_list: - log(f"⊘ No files in magnet {magnet_id}", file=sys.stderr) - continue - - log(f"✓ Found {len(files_list)} file(s) in magnet {magnet_id}", file=sys.stderr) - - # Download each file - for file_info in files_list: - try: - link = file_info['link'] - filename = file_info['name'] - - # Unlock link to get direct URL - try: - direct_url = client.unlock_link(link) - if not direct_url: - log(f"✗ Failed to unlock link for {filename}", file=sys.stderr) - failed_files += 1 - continue - except Exception as e: - log(f"✗ Error unlocking link: {e}", file=sys.stderr) - failed_files += 1 - continue - - # Download file - output_file = out_path / filename - if _download_file_from_alldebrid(direct_url, output_file, filename, file_info['size']): - log(f"✓ Downloaded: {filename}", file=sys.stderr) - total_files += 1 - else: - log(f"✗ Failed to download: {filename}", file=sys.stderr) - failed_files += 1 - - except Exception as e: - log(f"✗ Error downloading file: {e}", file=sys.stderr) - failed_files += 1 - - except Exception as e: - log(f"✗ Error processing magnet {magnet_id}: {e}", file=sys.stderr) - failed_files += 1 - - log(f"✓ Download complete: {total_files} file(s) downloaded, {failed_files} failed", file=sys.stderr) - return 0 if failed_files == 0 else 1 + return _queue_alldebrid_worker( + config=config, + output_dir=out_path, + magnet_ids=magnets, + title=f"AllDebrid pipe ({len(magnets)} magnet{'s' if len(magnets) != 1 else ''})", + file_filter=file_filter, + ) def _extract_files_from_magnet(magnet_info: Dict[str, Any], filter_pattern: Optional[str] = None) -> list: @@ -219,6 +167,202 @@ def _download_file_from_alldebrid(url: str, output_path: Path, filename: str, fi return False +def _queue_alldebrid_worker( + config: Dict[str, Any], + output_dir: Path, + magnet_ids: Sequence[int], + title: str, + file_filter: Optional[str] = None, + wait_timeout: int = DEFAULT_DEBRID_WAIT_TIMEOUT, +): + """Spawn a background worker to download AllDebrid magnets.""" + from config import get_debrid_api_key + + if not magnet_ids: + log("✗ No magnet IDs provided for AllDebrid download", file=sys.stderr) + return 1 + + api_key = get_debrid_api_key(config) + if not api_key: + log("✗ AllDebrid API key not configured", file=sys.stderr) + return 1 + + worker_id = f"{DEBRID_WORKER_PREFIX}{_uuid.uuid4().hex[:8]}" + worker_manager = config.get('_worker_manager') + if worker_manager: + try: + worker_manager.track_worker( + worker_id, + 
worker_type="download_debrid", + title=title, + description=f"AllDebrid download for {title}", + pipe=ctx.get_current_command_text(), + ) + except Exception as exc: + debug(f"⚠ Failed to register AllDebrid worker: {exc}") + worker_manager = None + + thread = threading.Thread( + target=_run_alldebrid_download_worker, + args=( + worker_id, + api_key, + output_dir, + list(magnet_ids), + file_filter, + title, + worker_manager, + wait_timeout, + ), + daemon=False, + name=f"AllDebridWorker_{worker_id}" + ) + thread.start() + + ctx.emit({ + 'worker_id': worker_id, + 'worker_type': 'download_debrid', + 'status': 'running', + 'message': f"{title} (queued)", + }) + + log(f"🌀 AllDebrid download queued (worker {worker_id})", file=sys.stderr) + return 0 + + +def _run_alldebrid_download_worker( + worker_id: str, + api_key: str, + output_dir: Path, + magnet_ids: List[int], + file_filter: Optional[str], + title: str, + worker_manager: Optional[Any], + wait_timeout: int, +): + """Worker entrypoint that polls AllDebrid and downloads magnet files.""" + def log_progress(message: str) -> None: + safe = f"[Worker {worker_id}] {message}" + debug(safe) + if worker_manager: + try: + worker_manager.log_step(worker_id, message) + except Exception: + pass + + try: + client = AllDebridClient(api_key) + except Exception as exc: + log_progress(f"✗ Failed to initialize AllDebrid client: {exc}") + if worker_manager: + try: + worker_manager.finish_worker(worker_id, "failed", str(exc)) + except Exception: + pass + return + + output_dir.mkdir(parents=True, exist_ok=True) + total_downloaded = 0 + total_failed = 0 + + for magnet_id in magnet_ids: + log_progress(f"⧗ Processing magnet {magnet_id}") + try: + status_info = client.magnet_status(magnet_id) + except Exception as exc: + log_progress(f"✗ Failed to query magnet {magnet_id}: {exc}") + total_failed += 1 + continue + + try: + ready_status = _wait_for_magnet_ready(client, magnet_id, log_progress, wait_timeout) + except Exception as exc: + log_progress(f"✗ Magnet {magnet_id} did not become ready: {exc}") + total_failed += 1 + continue + + try: + magnet_info = client.magnet_status(magnet_id, include_files=True) + except Exception as exc: + log_progress(f"✗ Failed to list files for magnet {magnet_id}: {exc}") + total_failed += 1 + continue + + files_list = _extract_files_from_magnet(magnet_info, file_filter) + if not files_list: + log_progress(f"⊘ Magnet {magnet_id} has no files") + total_failed += 1 + continue + + for file_info in files_list: + name = file_info.get('name', 'unknown') + log_progress(f"⇓ Downloading {name}") + link = file_info.get('link') + if not link: + log_progress(f"✗ Missing link for {name}") + total_failed += 1 + continue + + try: + direct_url = client.unlock_link(link) + except Exception as exc: + log_progress(f"✗ Failed to unlock {name}: {exc}") + total_failed += 1 + continue + + output_file = output_dir / name + if _download_file_from_alldebrid(direct_url, output_file, name, file_info.get('size', 0)): + total_downloaded += 1 + else: + total_failed += 1 + + if total_downloaded or total_failed: + summary = f"{total_downloaded} file(s) downloaded, {total_failed} failed" + else: + summary = "No files were processed" + + log(f"✓ AllDebrid worker {worker_id}: {summary}", file=sys.stderr) + if worker_manager: + status = "success" if total_downloaded > 0 else "failed" + try: + worker_manager.finish_worker(worker_id, status, summary if status == "failed" else "") + except Exception: + pass + + +def _wait_for_magnet_ready( + client: AllDebridClient, + magnet_id: 
int, + log_progress: Callable[[str], None], + wait_timeout: int, +) -> Dict[str, Any]: + elapsed = 0 + last_report = -5 + while elapsed < wait_timeout: + try: + status = client.magnet_status(magnet_id) + except Exception as exc: + log_progress(f"⚠ Live status check failed: {exc}") + _time.sleep(2) + elapsed += 2 + continue + + status_code = int(status.get('statusCode', -1)) + if status_code == 4: + return status + if status_code >= 5: + raise RuntimeError(status.get('status', f"Failed code {status_code}")) + if elapsed - last_report >= 5: + downloaded = status.get('downloaded', 0) + size = status.get('size', 0) + percent = (downloaded / size * 100) if size else 0 + log_progress(f"⧗ {status.get('status', 'processing')} — {percent:.1f}%") + last_report = elapsed + _time.sleep(2) + elapsed += 2 + raise TimeoutError(f"Magnet {magnet_id} not ready after {wait_timeout}s") + + def _is_playable_in_mpv(file_path_or_ext: str, mime_type: Optional[str] = None) -> bool: """Check if file can be played in MPV based on extension or mime type.""" from helper.utils_constant import mime_maps @@ -265,8 +409,13 @@ def _play_in_mpv(file_url: str, file_title: str, is_stream: bool = False, header ipc_pipe = get_ipc_pipe_path() debug(f"[get-file] Starting new MPV instance (pipe: {ipc_pipe})", file=sys.stderr) - # Build command - start MPV without a file initially, just with IPC server + # Build command - start MPV without a file initially, just with IPC server and our Lua helper cmd = ['mpv', f'--input-ipc-server={ipc_pipe}'] + try: + if MPV_LUA_SCRIPT_PATH and Path(MPV_LUA_SCRIPT_PATH).exists(): + cmd.append(f"--scripts-append={MPV_LUA_SCRIPT_PATH}") + except Exception: + pass if headers: # Format headers for command line @@ -468,10 +617,12 @@ def _handle_hydrus_file(file_hash: Optional[str], file_title: str, config: Dict[ elif force_mpv or (is_media and mpv_available): # Auto-play in MPV for media files (if available), or user requested it if _play_in_mpv(stream_url, file_title, is_stream=True, headers=headers): - # Show pipe menu instead of emitting result for display - # This allows immediate @N selection from the playlist - from . import pipe - pipe._run(None, [], config) + # Show unified MPV playlist view (reuse cmdnats.pipe display) + try: + from cmdnats import pipe as mpv_pipe + mpv_pipe._run(None, [], config) + except Exception: + pass return 0 else: # Fall back to browser @@ -580,10 +731,12 @@ def _handle_local_file(file_path: Optional[str], file_title: str, config: Dict[s elif force_mpv or (is_media and mpv_available): # Auto-play in MPV for media files (if available), or user requested it if _play_in_mpv(file_path, file_title, is_stream=False): - # Show pipe menu instead of emitting result for display - # This allows immediate @N selection from the playlist - from . 
import pipe - pipe._run(None, [], config) + # Show unified MPV playlist view (reuse cmdnats.pipe display) + try: + from cmdnats import pipe as mpv_pipe + mpv_pipe._run(None, [], config) + except Exception: + pass return 0 else: # Fall back to default application @@ -661,94 +814,12 @@ def _handle_debrid_file(magnet_id: int, magnet_title: str, config: Dict[str, Any log(f"✗ Error creating output directory: {e}", file=sys.stderr) return 1 - # Get API key - from config import get_debrid_api_key - api_key = get_debrid_api_key(config) - if not api_key: - log("✗ AllDebrid API key not configured in config.json", file=sys.stderr) - return 1 - - try: - client = AllDebridClient(api_key) - - debug(f"[get-file] Downloading magnet {magnet_id}: {magnet_title}", file=sys.stderr) - - # Fetch magnet files - try: - magnet_info = client.magnet_status(magnet_id, include_files=True) - except Exception as e: - log(f"✗ Failed to fetch magnet files: {e}", file=sys.stderr) - return 1 - - # Extract files from magnet - files_list = _extract_files_from_magnet(magnet_info) - - if not files_list: - log(f"✗ No files in magnet {magnet_id}", file=sys.stderr) - return 1 - - log(f"✓ Found {len(files_list)} file(s) in magnet {magnet_id}", file=sys.stderr) - - # Download each file - total_files = 0 - failed_files = 0 - - for file_info in files_list: - try: - link = file_info['link'] - filename = file_info['name'] - file_size = file_info['size'] - - # Unlock link to get direct URL - try: - direct_url = client.unlock_link(link) - if not direct_url: - log(f"✗ Failed to unlock link for {filename}", file=sys.stderr) - failed_files += 1 - continue - except Exception as e: - log(f"✗ Error unlocking link: {e}", file=sys.stderr) - failed_files += 1 - continue - - # Download file - output_file = out_path / filename - if _download_file_from_alldebrid(direct_url, output_file, filename, file_size): - log(f"✓ Downloaded: {filename}", file=sys.stderr) - total_files += 1 - else: - log(f"✗ Failed to download: {filename}", file=sys.stderr) - failed_files += 1 - - except Exception as e: - log(f"✗ Error downloading file: {e}", file=sys.stderr) - failed_files += 1 - - log(f"✓ Download complete: {total_files} file(s) downloaded, {failed_files} failed", file=sys.stderr) - - if total_files > 0: - # Emit result for downstream processing - result_dict = create_pipe_object_result( - source='debrid', - identifier=str(magnet_id), - file_path=str(out_path), - cmdlet_name='get-file', - title=magnet_title, - extra={ - 'magnet_id': magnet_id, - 'files_downloaded': total_files, - 'download_dir': str(out_path) - } - ) - ctx.emit(result_dict) - - return 0 if failed_files == 0 else 1 - - except Exception as e: - log(f"✗ Error processing debrid download: {e}", file=sys.stderr) - import traceback - traceback.print_exc(file=sys.stderr) - return 1 + return _queue_alldebrid_worker( + config=config, + output_dir=out_path, + magnet_ids=[magnet_id], + title=magnet_title or f"magnet {magnet_id}", + ) @register(["get-file"]) # primary name @@ -1043,7 +1114,13 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: else: base_name = 'export' - local_target = get_field(result, 'target', None) + # Accept multiple path-ish fields so @ selection from MPV playlist rows or ad-hoc dicts still resolve. 
+ local_target = ( + get_field(result, 'target', None) + or get_field(result, 'path', None) + or get_field(result, 'file_path', None) + or get_field(result, 'filename', None) + ) is_url = isinstance(local_target, str) and local_target.startswith(('http://', 'https://')) # Establish file hash (prefer -hash override when provided and valid) if hash_spec and looks_like_hash(hash_spec): @@ -1580,19 +1657,22 @@ def _handle_ytdlp_download(url: str, title: str, config: Dict[str, Any], args: S if not force_local: # Default: Stream to MPV if _play_in_mpv(url, title, is_stream=True): - from . import pipe - pipe._run(None, [], config) - return 0 + try: + from cmdnats import pipe as mpv_pipe + mpv_pipe._run(None, [], config) + except Exception: + pass + return 0 else: - # Fallback to browser - try: - import webbrowser - webbrowser.open(url) - debug(f"[get-file] Opened in browser: {title}", file=sys.stderr) - return 0 - except Exception: - pass - return 1 + # Fallback to browser + try: + import webbrowser + webbrowser.open(url) + debug(f"[get-file] Opened in browser: {title}", file=sys.stderr) + return 0 + except Exception: + pass + return 1 # Download mode try: diff --git a/cmdlets/get_metadata.py b/cmdlets/get_metadata.py index ac3b4f0..1ea8c7a 100644 --- a/cmdlets/get_metadata.py +++ b/cmdlets/get_metadata.py @@ -10,7 +10,85 @@ import mimetypes import os from helper import hydrus as hydrus_wrapper +from helper.local_library import LocalLibraryDB from ._shared import Cmdlet, CmdletArg, normalize_hash +from config import get_local_storage_path +import pipeline as ctx +from result_table import ResultTable + + +def _extract_imported_ts(meta: Dict[str, Any]) -> Optional[int]: + """Extract an imported timestamp from Hydrus metadata if available.""" + if not isinstance(meta, dict): + return None + + # Prefer explicit time_imported if present + explicit = meta.get("time_imported") + if isinstance(explicit, (int, float)): + return int(explicit) + + file_services = meta.get("file_services") + if isinstance(file_services, dict): + current = file_services.get("current") + if isinstance(current, dict): + numeric = [int(v) for v in current.values() if isinstance(v, (int, float))] + if numeric: + return min(numeric) + return None + + +def _format_imported(ts: Optional[int]) -> str: + if not ts: + return "" + try: + import datetime as _dt + return _dt.datetime.utcfromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") + except Exception: + return "" + + +def _build_table_row(title: str, origin: str, path: str, mime: str, size_bytes: Optional[int], dur_seconds: Optional[int], imported_ts: Optional[int], urls: list[str], hash_value: Optional[str], pages: Optional[int] = None) -> Dict[str, Any]: + size_mb = None + if isinstance(size_bytes, int): + try: + size_mb = int(size_bytes / (1024 * 1024)) + except Exception: + size_mb = None + + dur_int = int(dur_seconds) if isinstance(dur_seconds, (int, float)) else None + pages_int = int(pages) if isinstance(pages, (int, float)) else None + imported_label = _format_imported(imported_ts) + + duration_label = "Duration(s)" + duration_value = str(dur_int) if dur_int is not None else "" + if mime and mime.lower().startswith("application/pdf"): + duration_label = "Pages" + duration_value = str(pages_int) if pages_int is not None else "" + + columns = [ + ("Title", title or ""), + ("Hash", hash_value or ""), + ("MIME", mime or ""), + ("Size(MB)", str(size_mb) if size_mb is not None else ""), + (duration_label, duration_value), + ("Imported", imported_label), + ("Store", origin or ""), + ] 
+ + return { + "title": title or path, + "path": path, + "origin": origin, + "mime": mime, + "size_bytes": size_bytes, + "duration_seconds": dur_int, + "pages": pages_int, + "imported_ts": imported_ts, + "imported": imported_label, + "hash": hash_value, + "known_urls": urls, + "columns": columns, + } def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: @@ -69,43 +147,50 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: if not mime_type: mime_type = "unknown" - # Get file size - try: - file_size = file_path.stat().st_size - except Exception: - file_size = None - - # Try to get duration if it's a media file + # Pull metadata from local DB if available (for imported timestamp, duration, etc.) + db_metadata = None + library_root = get_local_storage_path(config) + if library_root: + try: + with LocalLibraryDB(library_root) as db: + db_metadata = db.get_metadata(file_path) or None + except Exception: + db_metadata = None + + # Get file size (prefer DB size if present) + file_size = None + if isinstance(db_metadata, dict) and isinstance(db_metadata.get("size"), int): + file_size = db_metadata.get("size") + else: + try: + file_size = file_path.stat().st_size + except Exception: + file_size = None + + # Duration/pages duration_seconds = None - try: - # Try to use ffprobe if available - import subprocess - result_proc = subprocess.run( - ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(file_path)], - capture_output=True, - text=True, - timeout=5 - ) - if result_proc.returncode == 0 and result_proc.stdout.strip(): - try: + pages = None + if isinstance(db_metadata, dict): + if isinstance(db_metadata.get("duration"), (int, float)): + duration_seconds = float(db_metadata.get("duration")) + if isinstance(db_metadata.get("pages"), (int, float)): + pages = int(db_metadata.get("pages")) + + if duration_seconds is None and mime_type and mime_type.startswith("video"): + try: + import subprocess + result_proc = subprocess.run( + ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(file_path)], + capture_output=True, + text=True, + timeout=5 + ) + if result_proc.returncode == 0 and result_proc.stdout.strip(): duration_seconds = float(result_proc.stdout.strip()) - except ValueError: - pass - except Exception: - pass - - # Get format helpers from search module - try: - from .search_file import _format_size as _fmt_size - from .search_file import _format_duration as _fmt_dur - except Exception: - _fmt_size = lambda x: str(x) if x is not None else "" - _fmt_dur = lambda x: str(x) if x is not None else "" - - size_label = _fmt_size(file_size) if file_size is not None else "" - dur_label = _fmt_dur(duration_seconds) if duration_seconds is not None else "" - - # Get known URLs from sidecar or result + except Exception: + pass + + # Known URLs from sidecar or result urls = [] sidecar_path = Path(str(file_path) + '.tags') if sidecar_path.exists(): @@ -119,30 +204,45 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: urls.append(url_value) except Exception: pass - - # Fallback to result URLs if not in sidecar + if not urls: urls_from_result = get_field(result, "known_urls", None) or get_field(result, "urls", None) if isinstance(urls_from_result, list): urls.extend([str(u).strip() for u in urls_from_result if u]) - - # Display local file metadata - log(f"PATH: 
{file_path}") - if hash_hex: - log(f"HASH: {hash_hex}") - if mime_type: - log(f"MIME: {mime_type}") - if size_label: - log(f"Size: {size_label}") - if dur_label: - log(f"Duration: {dur_label}") - if urls: - log("URLs:") - for url in urls: - log(f" {url}") - + + imported_ts = None + if isinstance(db_metadata, dict): + ts = db_metadata.get("time_imported") or db_metadata.get("time_added") + if isinstance(ts, (int, float)): + imported_ts = int(ts) + elif isinstance(ts, str): + try: + import datetime as _dt + imported_ts = int(_dt.datetime.fromisoformat(ts).timestamp()) + except Exception: + imported_ts = None + + row = _build_table_row( + title=file_path.name, + origin="local", + path=str(file_path), + mime=mime_type or "", + size_bytes=int(file_size) if isinstance(file_size, int) else None, + dur_seconds=duration_seconds, + imported_ts=imported_ts, + urls=urls, + hash_value=hash_hex, + pages=pages, + ) + + table_title = file_path.name + table = ResultTable(table_title) + table.set_source_command("get-metadata", list(_args)) + table.add_result(row) + ctx.set_last_result_table_overlay(table, [row], row) + ctx.emit(row) return 0 - except Exception as exc: + except Exception: # Fall through to Hydrus if local file handling fails pass @@ -191,41 +291,37 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: inner = meta.get("metadata") if isinstance(meta.get("metadata"), dict) else None if duration_value is None and isinstance(inner, dict): duration_value = inner.get("duration") + + imported_ts = _extract_imported_ts(meta) try: - from .search_file import _format_size as _fmt_size - from .search_file import _format_duration as _fmt_dur from .search_file import _hydrus_duration_seconds as _dur_secs except Exception: - _fmt_size = lambda x: str(x) if x is not None else "" _dur_secs = lambda x: x - _fmt_dur = lambda x: str(x) if x is not None else "" - + dur_seconds = _dur_secs(duration_value) - dur_label = _fmt_dur(dur_seconds) if dur_seconds is not None else "" - size_label = _fmt_size(size) - - # Display Hydrus file metadata - log(f"PATH: hydrus://file/{hash_hex}") - log(f"Hash: {hash_hex}") - if mime: - log(f"MIME: {mime}") - if dur_label: - log(f"Duration: {dur_label}") - if size_label: - log(f"Size: {size_label}") - urls = meta.get("known_urls") or meta.get("urls") - if isinstance(urls, list) and urls: - log("URLs:") - for url in urls: - try: - text = str(url).strip() - except Exception: - text = "" - if text: - log(f" {text}") - + urls = [str(u).strip() for u in urls] if isinstance(urls, list) else [] + + row = _build_table_row( + title=hash_hex, + origin="hydrus", + path=f"hydrus://file/{hash_hex}", + mime=mime or "", + size_bytes=int(size) if isinstance(size, int) else None, + dur_seconds=int(dur_seconds) if isinstance(dur_seconds, (int, float)) else None, + imported_ts=imported_ts, + urls=urls, + hash_value=hash_hex, + pages=None, + ) + + table = ResultTable(hash_hex or "Metadata") + table.set_source_command("get-metadata", list(_args)) + table.add_result(row) + ctx.set_last_result_table_overlay(table, [row], row) + ctx.emit(row) + return 0 diff --git a/cmdlets/get_relationship.py b/cmdlets/get_relationship.py index 02e25dd..da851d3 100644 --- a/cmdlets/get_relationship.py +++ b/cmdlets/get_relationship.py @@ -57,6 +57,15 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: # Initialize results collection found_relationships = [] # List of dicts: {hash, type, title, path, origin} source_title = "Unknown" + + def _add_relationship(entry: 
Dict[str, Any]) -> None: + """Add relationship if not already present by hash or path.""" + for existing in found_relationships: + if entry.get("hash") and str(existing.get("hash", "")).lower() == str(entry["hash"]).lower(): + return + if entry.get("path") and str(existing.get("path", "")).lower() == str(entry["path"]).lower(): + return + found_relationships.append(entry) # Check for local file first file_path = None @@ -116,9 +125,10 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: except Exception: title = resolved_path.stem - found_relationships.append({ + entry_type = "king" if rel_type.lower() == "alt" else rel_type + _add_relationship({ "hash": h, - "type": rel_type, + "type": entry_type, "title": title, "path": path, "origin": "local" @@ -136,7 +146,12 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: print(f"[DEBUG] Parent path obj: {parent_path_obj}", file=sys.stderr) # Also add the king/parent itself if not already in results - if not any(str(r['hash']).lower() == str(path).lower() for r in found_relationships): + existing_parent = None + for r in found_relationships: + if str(r.get('hash', '')).lower() == str(path).lower() or str(r.get('path', '')).lower() == str(path).lower(): + existing_parent = r + break + if not existing_parent: parent_title = parent_path_obj.stem try: parent_tags = db.get_tags(parent_path_obj) @@ -148,7 +163,7 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: pass print(f"[DEBUG] ➕ Adding king/parent to results: {parent_title}", file=sys.stderr) - found_relationships.append({ + _add_relationship({ "hash": str(path), "type": "king" if rel_type.lower() == "alt" else rel_type, "title": parent_title, @@ -157,11 +172,8 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: }) else: # If already in results, ensure it's marked as king if appropriate - for r in found_relationships: - if str(r['hash']).lower() == str(path).lower(): - if rel_type.lower() == "alt": - r['type'] = "king" - break + if rel_type.lower() == "alt": + existing_parent['type'] = "king" # 1. 
Check forward relationships from parent (siblings) parent_metadata = db.get_metadata(parent_path_obj) @@ -185,13 +197,8 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: print(f"[DEBUG] ⏭️ Hash doesn't resolve, skipping: {child_h}", file=sys.stderr) continue - # Skip the current file we're querying - if str(child_path_obj).lower() == str(path_obj).lower(): - print(f"[DEBUG] ⏭️ Skipping current file: {child_path_obj}", file=sys.stderr) - continue - - # Check if already added (case-insensitive hash check) - if any(str(r['hash']).lower() == str(child_h).lower() for r in found_relationships): + # Check if already added (case-insensitive hash/path check) + if any(str(r.get('hash', '')).lower() == str(child_h).lower() or str(r.get('path', '')).lower() == str(child_path_obj).lower() for r in found_relationships): print(f"[DEBUG] ⏭️ Already in results: {child_h}", file=sys.stderr) continue @@ -207,7 +214,7 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: pass print(f"[DEBUG] ➕ Adding sibling: {child_title}", file=sys.stderr) - found_relationships.append({ + _add_relationship({ "hash": child_h, "type": f"alt" if child_type == "alt" else f"sibling ({child_type})", "title": child_title, @@ -226,13 +233,8 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: child_type = child['type'] print(f"[DEBUG] Reverse child: {child_path}, type: {child_type}", file=sys.stderr) - # Skip the current file - if str(child_path).lower() == str(path_obj).lower(): - print(f"[DEBUG] ⏭️ Skipping self", file=sys.stderr) - continue - - # Skip if already added (check by path, case-insensitive) - if any(str(r.get('path', '')).lower() == str(child_path).lower() for r in found_relationships): + # Skip if already added (check by path/hash, case-insensitive) + if any(str(r.get('path', '')).lower() == str(child_path).lower() or str(r.get('hash', '')).lower() == str(child_path).lower() for r in found_relationships): print(f"[DEBUG] ⏭️ Already in results: {child_path}", file=sys.stderr) continue @@ -248,7 +250,7 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int: pass print(f"[DEBUG] ➕ Adding reverse sibling: {child_title}", file=sys.stderr) - found_relationships.append({ + _add_relationship({ "hash": child_path, "type": f"alt" if child_type == "alt" else f"sibling ({child_type})", "title": child_title, diff --git a/cmdlets/get_tag.py b/cmdlets/get_tag.py index 5f54a8f..2ece642 100644 --- a/cmdlets/get_tag.py +++ b/cmdlets/get_tag.py @@ -12,8 +12,8 @@ from __future__ import annotations import sys -from helper.logger import log -from helper.metadata_search import get_metadata_provider +from helper.logger import log, debug +from helper.metadata_search import get_metadata_provider, list_metadata_providers import subprocess from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple @@ -475,6 +475,21 @@ def _extract_scrapable_identifiers(tags_list: List[str]) -> Dict[str, str]: return identifiers +def _extract_tag_value(tags_list: List[str], namespace: str) -> Optional[str]: + """Get first tag value for a namespace (e.g., artist:, title:).""" + ns = namespace.lower() + for tag in tags_list: + if not isinstance(tag, str) or ':' not in tag: + continue + prefix, _, value = tag.partition(':') + if prefix.strip().lower() != ns: + continue + candidate = value.strip() + if candidate: + return candidate + return None + + def _scrape_url_metadata(url: str) -> Tuple[Optional[str], List[str], List[Tuple[str, 
str]], List[Dict[str, Any]]]: """Scrape metadata from a URL using yt-dlp. @@ -1012,6 +1027,25 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: --emit: Emit result without interactive prompt (quiet mode) -scrape : Scrape metadata from URL or provider name (itunes, openlibrary, googlebooks) """ + args_list = [str(arg) for arg in (args or [])] + raw_args = list(args_list) + + # Support numeric selection tokens (e.g., "@1" leading to argument "1") without treating + # them as hash overrides. This lets users pick from the most recent table overlay/results. + if len(args_list) == 1: + token = args_list[0] + if not token.startswith("-") and token.isdigit(): + try: + idx = int(token) - 1 + items_pool = ctx.get_last_result_items() + if 0 <= idx < len(items_pool): + result = items_pool[idx] + args_list = [] + debug(f"[get_tag] Resolved numeric selection arg {token} -> last_result_items[{idx}]") + else: + debug(f"[get_tag] Numeric selection arg {token} out of range (items={len(items_pool)})") + except Exception as exc: + debug(f"[get_tag] Failed to resolve numeric selection arg {token}: {exc}") # Helper to get field from both dict and object def get_field(obj: Any, field: str, default: Any = None) -> Any: if isinstance(obj, dict): @@ -1020,10 +1054,10 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: return getattr(obj, field, default) # Parse arguments using shared parser - parsed_args = parse_cmdlet_args(args, CMDLET) + parsed_args = parse_cmdlet_args(args_list, CMDLET) # Detect if -scrape flag was provided without a value (parse_cmdlet_args skips missing values) - scrape_flag_present = any(str(arg).lower() in {"-scrape", "--scrape"} for arg in args) + scrape_flag_present = any(str(arg).lower() in {"-scrape", "--scrape"} for arg in args_list) # Extract values hash_override_raw = parsed_args.get("hash") @@ -1033,10 +1067,14 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: scrape_url = parsed_args.get("scrape") scrape_requested = scrape_flag_present or scrape_url is not None + explicit_hash_flag = any(str(arg).lower() in {"-hash", "--hash"} for arg in raw_args) if hash_override_raw is not None: if not hash_override or not looks_like_hash(hash_override): - log("Invalid hash format: expected 64 hex characters", file=sys.stderr) - return 1 + debug(f"[get_tag] Ignoring invalid hash override '{hash_override_raw}' (explicit_flag={explicit_hash_flag})") + if explicit_hash_flag: + log("Invalid hash format: expected 64 hex characters", file=sys.stderr) + return 1 + hash_override = None if scrape_requested and (not scrape_url or str(scrape_url).strip() == ""): log("-scrape requires a URL or provider name", file=sys.stderr) @@ -1085,6 +1123,9 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: identifier_tags = [str(t) for t in tags_from_sidecar if isinstance(t, (str, bytes))] except Exception: pass + + title_from_tags = _extract_tag_value(identifier_tags, "title") + artist_from_tags = _extract_tag_value(identifier_tags, "artist") identifiers = _extract_scrapable_identifiers(identifier_tags) identifier_query: Optional[str] = None @@ -1095,19 +1136,35 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: identifier_query = identifiers.get("musicbrainz") or identifiers.get("musicbrainzalbum") # Determine query from identifier first, else title on the result or filename - title_hint = get_field(result, "title", None) or get_field(result, "name", None) + title_hint = 
title_from_tags or get_field(result, "title", None) or get_field(result, "name", None) if not title_hint: file_path = get_field(result, "path", None) or get_field(result, "filename", None) if file_path: title_hint = Path(str(file_path)).stem + artist_hint = artist_from_tags or get_field(result, "artist", None) or get_field(result, "uploader", None) + if not artist_hint: + meta_field = get_field(result, "metadata", None) + if isinstance(meta_field, dict): + meta_artist = meta_field.get("artist") or meta_field.get("uploader") + if meta_artist: + artist_hint = str(meta_artist) + + combined_query: Optional[str] = None + if not identifier_query and title_hint and artist_hint and provider.name in {"itunes", "musicbrainz"}: + if provider.name == "musicbrainz": + combined_query = f'recording:"{title_hint}" AND artist:"{artist_hint}"' + else: + combined_query = f"{title_hint} {artist_hint}" - query_hint = identifier_query or title_hint + query_hint = identifier_query or combined_query or title_hint if not query_hint: log("No title or identifier available to search for metadata", file=sys.stderr) return 1 if identifier_query: log(f"Using identifier for metadata search: {identifier_query}") + elif combined_query: + log(f"Using title+artist for metadata search: {title_hint} - {artist_hint}") else: log(f"Using title for metadata search: {query_hint}") @@ -1319,6 +1376,13 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: return 0 +_SCRAPE_CHOICES = [] +try: + _SCRAPE_CHOICES = sorted(list_metadata_providers().keys()) +except Exception: + _SCRAPE_CHOICES = ["itunes", "openlibrary", "googlebooks", "google", "musicbrainz"] + + CMDLET = Cmdlet( name="get-tag", summary="Get tags from Hydrus or local sidecar metadata", @@ -1341,8 +1405,9 @@ CMDLET = Cmdlet( CmdletArg( name="-scrape", type="string", - description="Scrape metadata from URL or provider name (returns tags as JSON or table)", - required=False + description="Scrape metadata from URL or provider name (returns tags as JSON or table)", + required=False, + choices=_SCRAPE_CHOICES, ) ] ) diff --git a/cmdlets/search_file.py b/cmdlets/search_file.py index e570e35..e20fbcb 100644 --- a/cmdlets/search_file.py +++ b/cmdlets/search_file.py @@ -6,6 +6,7 @@ from fnmatch import fnmatchcase from pathlib import Path from dataclasses import dataclass, field from collections import OrderedDict +import re import json import os import sys @@ -135,6 +136,25 @@ class ResultItem: STORAGE_ORIGINS = {"local", "hydrus", "debrid"} +def _normalize_extension(ext_value: Any) -> str: + """Sanitize extension strings to alphanumerics and cap at 5 chars.""" + ext = str(ext_value or "").strip().lstrip(".") + + # Stop at common separators to avoid dragging status text into the extension + for sep in (" ", "|", "(", "[", "{", ",", ";"): + if sep in ext: + ext = ext.split(sep, 1)[0] + break + + # If there are multiple dots, take the last token as the extension + if "." 
in ext: + ext = ext.split(".")[-1] + + # Keep only alphanumeric characters and enforce max length + ext = "".join(ch for ch in ext if ch.isalnum()) + return ext[:5] + + def _ensure_storage_columns(payload: Dict[str, Any]) -> Dict[str, Any]: """Attach Title/Store columns for storage-origin results to keep CLI display compact.""" origin_value = str(payload.get("origin") or payload.get("source") or "").lower() @@ -145,11 +165,11 @@ def _ensure_storage_columns(payload: Dict[str, Any]) -> Dict[str, Any]: store_label = payload.get("origin") or payload.get("source") or origin_value # Handle extension - extension = payload.get("ext", "") + extension = _normalize_extension(payload.get("ext", "")) if not extension and title: path_obj = Path(str(title)) if path_obj.suffix: - extension = path_obj.suffix.lstrip('.') + extension = _normalize_extension(path_obj.suffix.lstrip('.')) title = path_obj.stem # Handle size as integer MB (header will include units) @@ -175,7 +195,7 @@ def _ensure_storage_columns(payload: Dict[str, Any]) -> Dict[str, Any]: CMDLET = Cmdlet( name="search-file", - summary="Unified search cmdlet for storage (Hydrus, Local) and providers (Debrid, LibGen, OpenLibrary, Soulseek).", + summary="Unified search cmdlet for storage (Hydrus, Local) and providers (Debrid, LibGen, OpenLibrary, Soulseek).", usage="search-file [query] [-tag TAG] [-size >100MB|<50MB] [-type audio|video|image] [-duration >10:00] [-storage BACKEND] [-provider PROVIDER]", args=[ CmdletArg("query", description="Search query string"), @@ -184,11 +204,11 @@ CMDLET = Cmdlet( CmdletArg("type", description="Filter by type: audio, video, image, document"), CmdletArg("duration", description="Filter by duration: >10:00, <1:30:00"), CmdletArg("limit", type="integer", description="Limit results (default: 45)"), - CmdletArg("storage", description="Search storage backend: hydrus, local (default: all searchable storages)"), - CmdletArg("provider", description="Search provider: libgen, openlibrary, soulseek, debrid, local (overrides -storage)"), + CmdletArg("storage", description="Search storage backend: hydrus, local (default: all searchable storages)"), + CmdletArg("provider", description="Search provider: libgen, openlibrary, soulseek, debrid, local (overrides -storage)"), ], details=[ - "Search across storage (Hydrus, Local) and providers (Debrid, LibGen, OpenLibrary, Soulseek)", + "Search across storage (Hydrus, Local) and providers (Debrid, LibGen, OpenLibrary, Soulseek)", "Use -provider to search a specific source, or -storage to search file backends", "Filter results by: tag, size, type, duration", "Results can be piped to other commands", @@ -206,286 +226,306 @@ CMDLET = Cmdlet( @register(["search-file", "search"]) def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: - """Search across multiple providers: Hydrus, Local, Debrid, LibGen, etc.""" - args_list = [str(arg) for arg in (args or [])] - - # Parse arguments - query = "" - tag_filters: List[str] = [] - size_filter: Optional[Tuple[str, int]] = None - duration_filter: Optional[Tuple[str, float]] = None - type_filter: Optional[str] = None - storage_backend: Optional[str] = None - provider_name: Optional[str] = None - limit = 45 - searched_backends: List[str] = [] - - # Simple argument parsing - i = 0 - while i < len(args_list): - arg = args_list[i] - low = arg.lower() - - if low in {"-provider", "--provider"} and i + 1 < len(args_list): - provider_name = args_list[i + 1].lower() - i += 2 - elif low in {"-storage", "--storage"} and i + 1 < 
len(args_list): - storage_backend = args_list[i + 1].lower() - i += 2 - elif low in {"-tag", "--tag"} and i + 1 < len(args_list): - tag_filters.append(args_list[i + 1]) - i += 2 - elif low in {"-limit", "--limit"} and i + 1 < len(args_list): - try: - limit = int(args_list[i + 1]) - except ValueError: - limit = 100 - i += 2 - elif low in {"-type", "--type"} and i + 1 < len(args_list): - type_filter = args_list[i + 1].lower() - i += 2 - elif not arg.startswith("-"): - if query: - query += " " + arg - else: - query = arg - i += 1 - else: - i += 1 + """Search across multiple providers: Hydrus, Local, Debrid, LibGen, etc.""" + args_list = [str(arg) for arg in (args or [])] + + # Parse arguments + query = "" + tag_filters: List[str] = [] + size_filter: Optional[Tuple[str, int]] = None + duration_filter: Optional[Tuple[str, float]] = None + type_filter: Optional[str] = None + storage_backend: Optional[str] = None + provider_name: Optional[str] = None + limit = 45 + searched_backends: List[str] = [] + + # Simple argument parsing + i = 0 + while i < len(args_list): + arg = args_list[i] + low = arg.lower() + + if low in {"-provider", "--provider"} and i + 1 < len(args_list): + provider_name = args_list[i + 1].lower() + i += 2 + elif low in {"-storage", "--storage"} and i + 1 < len(args_list): + storage_backend = args_list[i + 1].lower() + i += 2 + elif low in {"-tag", "--tag"} and i + 1 < len(args_list): + tag_filters.append(args_list[i + 1]) + i += 2 + elif low in {"-limit", "--limit"} and i + 1 < len(args_list): + try: + limit = int(args_list[i + 1]) + except ValueError: + limit = 100 + i += 2 + elif low in {"-type", "--type"} and i + 1 < len(args_list): + type_filter = args_list[i + 1].lower() + i += 2 + elif not arg.startswith("-"): + if query: + query += " " + arg + else: + query = arg + i += 1 + else: + i += 1 - # Debrid is provider-only now - if storage_backend and storage_backend.lower() == "debrid": - log("Use -provider debrid instead of -storage debrid (debrid is provider-only)", file=sys.stderr) - return 1 - - # Handle piped input (e.g. 
from @N selection) if query is empty - if not query and result: - # If result is a list, take the first item - actual_result = result[0] if isinstance(result, list) and result else result - - # Helper to get field - def get_field(obj: Any, field: str) -> Any: - return getattr(obj, field, None) or (obj.get(field) if isinstance(obj, dict) else None) - - origin = get_field(actual_result, 'origin') - target = get_field(actual_result, 'target') - - # Special handling for Bandcamp artist/album drill-down - if origin == 'bandcamp' and target: - query = target - if not provider_name: - provider_name = 'bandcamp' - - # Generic URL handling - elif target and str(target).startswith(('http://', 'https://')): - query = target - # Try to infer provider from URL if not set - if not provider_name: - if 'bandcamp.com' in target: - provider_name = 'bandcamp' - elif 'youtube.com' in target or 'youtu.be' in target: - provider_name = 'youtube' + # Extract store: filter tokens (works with commas or whitespace) and clean query for backends + store_filter: Optional[str] = None + if query: + match = re.search(r"\bstore:([^\s,]+)", query, flags=re.IGNORECASE) + if match: + store_filter = match.group(1).strip().lower() or None + # Remove any store: tokens so downstream backends see only the actual query + query = re.sub(r"\s*[,]?\s*store:[^\s,]+", " ", query, flags=re.IGNORECASE) + query = re.sub(r"\s{2,}", " ", query) + query = query.strip().strip(',') - if not query: - log("Provide a search query", file=sys.stderr) - return 1 - - # Initialize worker for this search command - from helper.local_library import LocalLibraryDB - from config import get_local_storage_path - import uuid - worker_id = str(uuid.uuid4()) - library_root = get_local_storage_path(config or {}) - if not library_root: - log("No library root configured", file=sys.stderr) - return 1 - - db = None - try: - db = LocalLibraryDB(library_root) - db.insert_worker( - worker_id, - "search", - title=f"Search: {query}", - description=f"Query: {query}", - pipe=ctx.get_current_command_text() - ) - - results_list = [] - import result_table - import importlib - importlib.reload(result_table) - from result_table import ResultTable - - # Create ResultTable for display - table_title = f"Search: {query}" - if provider_name: - table_title += f" [{provider_name}]" - elif storage_backend: - table_title += f" [{storage_backend}]" - - table = ResultTable(table_title) - table.set_source_command("search-file", args_list) - - # Try to search using provider (libgen, soulseek, debrid, openlibrary) - if provider_name: - debug(f"[search_file] Attempting provider search with: {provider_name}") - provider = get_provider(provider_name, config) - if not provider: - log(f"Provider '{provider_name}' not available", file=sys.stderr) - db.update_worker_status(worker_id, 'error') - return 1 - - debug(f"[search_file] Provider loaded, calling search with query: {query}") - search_result = provider.search(query, limit=limit) - debug(f"[search_file] Provider search returned {len(search_result)} results") - - for item in search_result: - # Add to table - table.add_result(item) - - # Emit to pipeline - item_dict = item.to_dict() - results_list.append(item_dict) - ctx.emit(item_dict) - - # Set the result table in context for TUI/CLI display - ctx.set_last_result_table(table, results_list) - - debug(f"[search_file] Emitted {len(results_list)} results") - - # Write results to worker stdout - db.append_worker_stdout(worker_id, json.dumps(results_list, indent=2)) - 
db.update_worker_status(worker_id, 'completed') - return 0 - - # Otherwise search using storage backends (Hydrus, Local) - from helper.file_storage import FileStorage - storage = FileStorage(config=config or {}) - - backend_to_search = storage_backend or None - if backend_to_search: - # Check if requested backend is available - if backend_to_search == "hydrus": - from helper.hydrus import is_hydrus_available - if not is_hydrus_available(config or {}): - log(f"Backend 'hydrus' is not available (Hydrus service not running)", file=sys.stderr) - db.update_worker_status(worker_id, 'error') - return 1 - searched_backends.append(backend_to_search) - if not storage.supports_search(backend_to_search): - log(f"Backend '{backend_to_search}' does not support searching", file=sys.stderr) - db.update_worker_status(worker_id, 'error') - return 1 - results = storage[backend_to_search].search(query, limit=limit) - else: - # Search all searchable backends, but skip hydrus if unavailable - from helper.hydrus import is_hydrus_available - hydrus_available = is_hydrus_available(config or {}) - - all_results = [] - for backend_name in storage.list_searchable_backends(): - # Skip hydrus if not available - if backend_name == "hydrus" and not hydrus_available: - continue - searched_backends.append(backend_name) - try: - backend_results = storage[backend_name].search(query, limit=limit - len(all_results)) - if backend_results: - all_results.extend(backend_results) - if len(all_results) >= limit: - break - except Exception as exc: - log(f"Backend {backend_name} search failed: {exc}", file=sys.stderr) - results = all_results[:limit] + # Debrid is provider-only now + if storage_backend and storage_backend.lower() == "debrid": + log("Use -provider debrid instead of -storage debrid (debrid is provider-only)", file=sys.stderr) + return 1 - # Also query Debrid provider by default (provider-only, but keep legacy coverage when no explicit provider given) - if not provider_name and not storage_backend: - try: - debrid_provider = get_provider("debrid", config) - if debrid_provider and debrid_provider.validate(): - remaining = max(0, limit - len(results)) if isinstance(results, list) else limit - if remaining > 0: - debrid_results = debrid_provider.search(query, limit=remaining) - if debrid_results: - if "debrid" not in searched_backends: - searched_backends.append("debrid") - if results is None: - results = [] - results.extend(debrid_results) - except Exception as exc: - log(f"Debrid provider search failed: {exc}", file=sys.stderr) + # If store: was provided without explicit -storage/-provider, prefer that backend + if store_filter and not provider_name and not storage_backend: + if store_filter in {"hydrus", "local", "debrid"}: + storage_backend = store_filter + + # Handle piped input (e.g. 
from @N selection) if query is empty + if not query and result: + # If result is a list, take the first item + actual_result = result[0] if isinstance(result, list) and result else result + + # Helper to get field + def get_field(obj: Any, field: str) -> Any: + return getattr(obj, field, None) or (obj.get(field) if isinstance(obj, dict) else None) + + origin = get_field(actual_result, 'origin') + target = get_field(actual_result, 'target') + + # Special handling for Bandcamp artist/album drill-down + if origin == 'bandcamp' and target: + query = target + if not provider_name: + provider_name = 'bandcamp' + + # Generic URL handling + elif target and str(target).startswith(('http://', 'https://')): + query = target + # Try to infer provider from URL if not set + if not provider_name: + if 'bandcamp.com' in target: + provider_name = 'bandcamp' + elif 'youtube.com' in target or 'youtu.be' in target: + provider_name = 'youtube' - def _format_storage_label(name: str) -> str: - clean = str(name or "").strip() - if not clean: - return "Unknown" - return clean.replace("_", " ").title() + if not query: + log("Provide a search query", file=sys.stderr) + return 1 + + # Initialize worker for this search command + from helper.local_library import LocalLibraryDB + from config import get_local_storage_path + import uuid + worker_id = str(uuid.uuid4()) + library_root = get_local_storage_path(config or {}) + if not library_root: + log("No library root configured", file=sys.stderr) + return 1 + + db = None + try: + db = LocalLibraryDB(library_root) + db.insert_worker( + worker_id, + "search", + title=f"Search: {query}", + description=f"Query: {query}", + pipe=ctx.get_current_command_text() + ) + + results_list = [] + import result_table + import importlib + importlib.reload(result_table) + from result_table import ResultTable + + # Create ResultTable for display + table_title = f"Search: {query}" + if provider_name: + table_title += f" [{provider_name}]" + elif storage_backend: + table_title += f" [{storage_backend}]" + + table = ResultTable(table_title) + table.set_source_command("search-file", args_list) + + # Try to search using provider (libgen, soulseek, debrid, openlibrary) + if provider_name: + debug(f"[search_file] Attempting provider search with: {provider_name}") + provider = get_provider(provider_name, config) + if not provider: + log(f"Provider '{provider_name}' not available", file=sys.stderr) + db.update_worker_status(worker_id, 'error') + return 1 + + debug(f"[search_file] Provider loaded, calling search with query: {query}") + search_result = provider.search(query, limit=limit) + debug(f"[search_file] Provider search returned {len(search_result)} results") + + for item in search_result: + # Add to table + table.add_result(item) + + # Emit to pipeline + item_dict = item.to_dict() + results_list.append(item_dict) + ctx.emit(item_dict) + + # Set the result table in context for TUI/CLI display + ctx.set_last_result_table(table, results_list) + + debug(f"[search_file] Emitted {len(results_list)} results") + + # Write results to worker stdout + db.append_worker_stdout(worker_id, json.dumps(results_list, indent=2)) + db.update_worker_status(worker_id, 'completed') + return 0 + + # Otherwise search using storage backends (Hydrus, Local) + from helper.file_storage import FileStorage + storage = FileStorage(config=config or {}) + + backend_to_search = storage_backend or None + if backend_to_search: + # Check if requested backend is available + if backend_to_search == "hydrus": + from helper.hydrus 
import is_hydrus_available + if not is_hydrus_available(config or {}): + log(f"Backend 'hydrus' is not available (Hydrus service not running)", file=sys.stderr) + db.update_worker_status(worker_id, 'error') + return 1 + searched_backends.append(backend_to_search) + if not storage.supports_search(backend_to_search): + log(f"Backend '{backend_to_search}' does not support searching", file=sys.stderr) + db.update_worker_status(worker_id, 'error') + return 1 + results = storage[backend_to_search].search(query, limit=limit) + else: + # Search all searchable backends, but skip hydrus if unavailable + from helper.hydrus import is_hydrus_available + hydrus_available = is_hydrus_available(config or {}) + + all_results = [] + for backend_name in storage.list_searchable_backends(): + # Skip hydrus if not available + if backend_name == "hydrus" and not hydrus_available: + continue + searched_backends.append(backend_name) + try: + backend_results = storage[backend_name].search(query, limit=limit - len(all_results)) + if backend_results: + all_results.extend(backend_results) + if len(all_results) >= limit: + break + except Exception as exc: + log(f"Backend {backend_name} search failed: {exc}", file=sys.stderr) + results = all_results[:limit] - storage_counts: OrderedDict[str, int] = OrderedDict((name, 0) for name in searched_backends) - for item in results or []: - origin = getattr(item, 'origin', None) - if origin is None and isinstance(item, dict): - origin = item.get('origin') or item.get('source') - if not origin: - continue - key = str(origin).lower() - if key not in storage_counts: - storage_counts[key] = 0 - storage_counts[key] += 1 + # Also query Debrid provider by default (provider-only, but keep legacy coverage when no explicit provider given) + if not provider_name and not storage_backend: + try: + debrid_provider = get_provider("debrid", config) + if debrid_provider and debrid_provider.validate(): + remaining = max(0, limit - len(results)) if isinstance(results, list) else limit + if remaining > 0: + debrid_results = debrid_provider.search(query, limit=remaining) + if debrid_results: + if "debrid" not in searched_backends: + searched_backends.append("debrid") + if results is None: + results = [] + results.extend(debrid_results) + except Exception as exc: + log(f"Debrid provider search failed: {exc}", file=sys.stderr) - if storage_counts or query: - display_counts = OrderedDict((_format_storage_label(name), count) for name, count in storage_counts.items()) - summary_line = table.set_storage_summary(display_counts, query, inline=True) - if summary_line: - table.title = summary_line - - # Emit results and collect for workers table - if results: - for item in results: - def _as_dict(obj: Any) -> Dict[str, Any]: - if isinstance(obj, dict): - return dict(obj) - if hasattr(obj, "to_dict") and callable(getattr(obj, "to_dict")): - return obj.to_dict() # type: ignore[arg-type] - return {"title": str(obj)} + def _format_storage_label(name: str) -> str: + clean = str(name or "").strip() + if not clean: + return "Unknown" + return clean.replace("_", " ").title() - item_dict = _as_dict(item) - normalized = _ensure_storage_columns(item_dict) - # Add to table using normalized columns to avoid extra fields (e.g., Tags/Name) - table.add_result(normalized) + storage_counts: OrderedDict[str, int] = OrderedDict((name, 0) for name in searched_backends) + for item in results or []: + origin = getattr(item, 'origin', None) + if origin is None and isinstance(item, dict): + origin = item.get('origin') or 
item.get('source') + if not origin: + continue + key = str(origin).lower() + if key not in storage_counts: + storage_counts[key] = 0 + storage_counts[key] += 1 - results_list.append(normalized) - ctx.emit(normalized) - - # Set the result table in context for TUI/CLI display - ctx.set_last_result_table(table, results_list) - - # Write results to worker stdout - db.append_worker_stdout(worker_id, json.dumps(results_list, indent=2)) - else: - log("No results found", file=sys.stderr) - db.append_worker_stdout(worker_id, json.dumps([], indent=2)) - - db.update_worker_status(worker_id, 'completed') - return 0 - - except Exception as exc: - log(f"Search failed: {exc}", file=sys.stderr) - import traceback - traceback.print_exc(file=sys.stderr) - if db: - try: - db.update_worker_status(worker_id, 'error') - except Exception: - pass - return 1 - - finally: - # Always close the database connection - if db: - try: - db.close() - except Exception: - pass + if storage_counts or query: + display_counts = OrderedDict((_format_storage_label(name), count) for name, count in storage_counts.items()) + summary_line = table.set_storage_summary(display_counts, query, inline=True) + if summary_line: + table.title = summary_line + + # Emit results and collect for workers table + if results: + for item in results: + def _as_dict(obj: Any) -> Dict[str, Any]: + if isinstance(obj, dict): + return dict(obj) + if hasattr(obj, "to_dict") and callable(getattr(obj, "to_dict")): + return obj.to_dict() # type: ignore[arg-type] + return {"title": str(obj)} + + item_dict = _as_dict(item) + if store_filter: + origin_val = str(item_dict.get("origin") or item_dict.get("source") or "").lower() + if store_filter != origin_val: + continue + normalized = _ensure_storage_columns(item_dict) + # Add to table using normalized columns to avoid extra fields (e.g., Tags/Name) + table.add_result(normalized) + + results_list.append(normalized) + ctx.emit(normalized) + + # Set the result table in context for TUI/CLI display + ctx.set_last_result_table(table, results_list) + + # Write results to worker stdout + db.append_worker_stdout(worker_id, json.dumps(results_list, indent=2)) + else: + log("No results found", file=sys.stderr) + db.append_worker_stdout(worker_id, json.dumps([], indent=2)) + + db.update_worker_status(worker_id, 'completed') + return 0 + + except Exception as exc: + log(f"Search failed: {exc}", file=sys.stderr) + import traceback + traceback.print_exc(file=sys.stderr) + if db: + try: + db.update_worker_status(worker_id, 'error') + except Exception: + pass + return 1 + + finally: + # Always close the database connection + if db: + try: + db.close() + except Exception: + pass diff --git a/cmdnats/pipe.py b/cmdnats/pipe.py index 76e240e..b9a6cfc 100644 --- a/cmdnats/pipe.py +++ b/cmdnats/pipe.py @@ -5,7 +5,7 @@ import platform import socket import re import subprocess -from urllib.parse import urlparse +from urllib.parse import urlparse, parse_qs from pathlib import Path from cmdlets._shared import Cmdlet, CmdletArg, parse_cmdlet_args from helper.logger import log, debug @@ -87,6 +87,37 @@ def _extract_target_from_memory_uri(text: str) -> Optional[str]: return None +def _normalize_playlist_target(text: Optional[str]) -> Optional[str]: + """Normalize playlist entry targets for dedupe comparisons.""" + if not text: + return None + real = _extract_target_from_memory_uri(text) or text + real = real.strip() + if not real: + return None + # If it's already a bare hydrus hash, use it directly + lower_real = real.lower() + if 
re.fullmatch(r"[0-9a-f]{64}", lower_real): + return lower_real + + # If it's a hydrus file URL, normalize to the hash for dedupe + try: + parsed = urlparse(real) + if parsed.scheme in {"http", "https", "hydrus"}: + if parsed.path.endswith("/get_files/file"): + qs = parse_qs(parsed.query) + h = qs.get("hash", [None])[0] + if h and re.fullmatch(r"[0-9a-f]{64}", h.lower()): + return h.lower() + except Exception: + pass + + # Normalize slashes for Windows paths and lowercase for comparison + real = real.replace('\\', '\\') + real = real.replace('\\', '\\') + return real.lower() + + def _infer_store_from_playlist_item(item: Dict[str, Any]) -> str: """Infer a friendly store label from an MPV playlist entry.""" name = item.get("filename") if isinstance(item, dict) else None @@ -97,6 +128,10 @@ def _infer_store_from_playlist_item(item: Dict[str, Any]) -> str: if memory_target: target = memory_target + # Hydrus hashes: bare 64-hex entries + if re.fullmatch(r"[0-9a-f]{64}", target.lower()): + return "hydrus" + lower = target.lower() if lower.startswith("magnet:"): return "magnet" @@ -245,31 +280,36 @@ def _monitor_mpv_logs(duration: float = 3.0) -> None: # Request log messages client.send_command({"command": ["request_log_messages", "warn"]}) + # On Windows named pipes, avoid blocking the CLI; skip log read entirely + if client.is_windows: + client.disconnect() + return + import time start_time = time.time() + + # Unix sockets already have timeouts set; read until duration expires while time.time() - start_time < duration: - # We need to read raw lines from the socket - if client.is_windows: - try: - line = client.sock.readline() - if line: - try: - msg = json.loads(line) - if msg.get("event") == "log-message": - text = msg.get("text", "").strip() - prefix = msg.get("prefix", "") - level = msg.get("level", "") - if "ytdl" in prefix or level == "error": - debug(f"[MPV {prefix}] {text}", file=sys.stderr) - except json.JSONDecodeError: - pass - except Exception: - break - else: - # Unix socket handling (simplified) + try: + chunk = client.sock.recv(4096) + except socket.timeout: + continue + except Exception: break - time.sleep(0.05) - + if not chunk: + break + for line in chunk.decode("utf-8", errors="ignore").splitlines(): + try: + msg = json.loads(line) + if msg.get("event") == "log-message": + text = msg.get("text", "").strip() + prefix = msg.get("prefix", "") + level = msg.get("level", "") + if "ytdl" in prefix or level == "error": + debug(f"[MPV {prefix}] {text}", file=sys.stderr) + except json.JSONDecodeError: + continue + client.disconnect() except Exception: pass @@ -294,6 +334,31 @@ def _queue_items(items: List[Any], clear_first: bool = False, config: Optional[D except Exception: hydrus_url = None + # Dedupe existing playlist before adding more (unless we're replacing it) + existing_targets: set[str] = set() + if not clear_first: + playlist = _get_playlist(silent=True) or [] + dup_indexes: List[int] = [] + for idx, pl_item in enumerate(playlist): + fname = pl_item.get("filename") if isinstance(pl_item, dict) else str(pl_item) + alt = pl_item.get("playlist-path") if isinstance(pl_item, dict) else None + norm = _normalize_playlist_target(fname) or _normalize_playlist_target(alt) + if not norm: + continue + if norm in existing_targets: + dup_indexes.append(idx) + else: + existing_targets.add(norm) + + # Remove duplicates from playlist starting from the end to keep indices valid + for idx in reversed(dup_indexes): + try: + _send_ipc_command({"command": ["playlist-remove", idx], "request_id": 
106}, silent=True) + except Exception: + pass + + new_targets: set[str] = set() + for i, item in enumerate(items): # Extract URL/Path target = None @@ -309,6 +374,16 @@ def _queue_items(items: List[Any], clear_first: bool = False, config: Optional[D target = item if target: + # If we just have a hydrus hash, build a direct file URL for MPV + if re.fullmatch(r"[0-9a-f]{64}", str(target).strip().lower()) and hydrus_url: + target = f"{hydrus_url.rstrip('/')}/get_files/file?hash={str(target).strip()}" + + norm_key = _normalize_playlist_target(target) or str(target).strip().lower() + if norm_key in existing_targets or norm_key in new_targets: + debug(f"Skipping duplicate playlist entry: {title or target}") + continue + new_targets.add(norm_key) + # Check if it's a yt-dlp supported URL is_ytdlp = False if target.startswith("http") and is_url_supported_by_ytdlp(target): @@ -699,7 +774,11 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: # Monitor logs briefly for errors (e.g. ytdl failures) _monitor_mpv_logs(3.0) - return 0 + + # Refresh playlist view so the user sees the new current item immediately + items = _get_playlist(silent=True) or items + list_mode = True + index_arg = None else: debug(f"Failed to play item: {resp.get('error') if resp else 'No response'}") return 1 diff --git a/cmdnats/worker.py b/cmdnats/worker.py index 3a1287c..98d88cc 100644 --- a/cmdnats/worker.py +++ b/cmdnats/worker.py @@ -1,10 +1,11 @@ """Worker cmdlet: Display workers table in ResultTable format.""" from __future__ import annotations -from typing import Any, Dict, Sequence, List import json import sys +from dataclasses import dataclass from datetime import datetime, timezone +from typing import Any, Dict, Sequence, List from cmdlets import register from cmdlets._shared import Cmdlet, CmdletArg @@ -12,6 +13,9 @@ import pipeline as ctx from helper.logger import log from config import get_local_storage_path +DEFAULT_LIMIT = 100 +WORKER_STATUS_FILTERS = {"running", "completed", "error", "cancelled"} +HELP_FLAGS = {"-?", "/?", "--help", "-h", "help", "--cmdlet"} CMDLET = Cmdlet( name=".worker", @@ -21,6 +25,8 @@ CMDLET = Cmdlet( CmdletArg("status", description="Filter by status: running, completed, error (default: all)"), CmdletArg("limit", type="integer", description="Limit results (default: 100)"), CmdletArg("@N", description="Select worker by index (1-based) and display full logs"), + CmdletArg("-id", description="Show full logs for a specific worker"), + CmdletArg("-clear", type="flag", description="Remove completed workers from the database"), ], details=[ "- Shows all background worker tasks and their output", @@ -37,284 +43,285 @@ CMDLET = Cmdlet( ) +def _has_help_flag(args_list: Sequence[str]) -> bool: + return any(str(arg).lower() in HELP_FLAGS for arg in args_list) + + +@dataclass +class WorkerCommandOptions: + status: str | None = None + limit: int = DEFAULT_LIMIT + worker_id: str | None = None + clear: bool = False + + @register([".worker", "worker", "workers"]) def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: - """Display workers table or show detailed logs for a specific worker.""" - args_list = [str(arg) for arg in (args or [])] - selection_indices = ctx.get_last_selection() - selection_requested = bool(selection_indices) and isinstance(result, list) and len(result) > 0 + """Display workers table or show detailed logs for a specific worker.""" + args_list = [str(arg) for arg in (args or [])] + selection_indices = ctx.get_last_selection() + 
selection_requested = bool(selection_indices) and isinstance(result, list) and len(result) > 0 - # Parse arguments for list view - status_filter: str | None = None - limit = 100 - clear_requested = False - worker_id_arg: str | None = None - i = 0 - while i < len(args_list): - arg = args_list[i] - low = arg.lower() - if low in {"-limit", "--limit"} and i + 1 < len(args_list): - try: - limit = max(1, int(args_list[i + 1])) - except ValueError: - limit = 100 - i += 2 - elif low in {"-id", "--id"} and i + 1 < len(args_list): - worker_id_arg = args_list[i + 1] - i += 2 - elif low in {"-clear", "--clear"}: - clear_requested = True - i += 1 - elif low in {"running", "completed", "error", "cancelled"}: - status_filter = low - i += 1 - elif not arg.startswith("-"): - status_filter = low - i += 1 - else: - i += 1 + if _has_help_flag(args_list): + log(json.dumps(CMDLET, ensure_ascii=False, indent=2)) + return 0 - try: - if any(str(a).lower() in {"-?", "/?", "--help", "-h", "help", "--cmdlet"} for a in args): - log(json.dumps(CMDLET, ensure_ascii=False, indent=2)) - return 0 - except Exception: - pass + options = _parse_worker_args(args_list) - library_root = get_local_storage_path(config or {}) - if not library_root: - log("No library root configured", file=sys.stderr) - return 1 + library_root = get_local_storage_path(config or {}) + if not library_root: + log("No library root configured", file=sys.stderr) + return 1 - try: - from helper.local_library import LocalLibraryDB - with LocalLibraryDB(library_root) as db: - if clear_requested: - count = db.clear_finished_workers() - log(f"Cleared {count} finished workers.") - return 0 - - if worker_id_arg: - worker = db.get_worker(worker_id_arg) - if worker: - events = [] - try: - wid = worker.get("worker_id") - if wid and hasattr(db, "get_worker_events"): - events = db.get_worker_events(wid) - except Exception: - pass - _emit_worker_detail(worker, events) - return 0 - else: - log(f"Worker not found: {worker_id_arg}", file=sys.stderr) - return 1 + try: + from helper.local_library import LocalLibraryDB - if selection_requested: - return _render_worker_selection(db, result) - return _render_worker_list(db, status_filter, limit) - except Exception as exc: - log(f"Workers query failed: {exc}", file=sys.stderr) - import traceback - traceback.print_exc(file=sys.stderr) - return 1 + with LocalLibraryDB(library_root) as db: + if options.clear: + count = db.clear_finished_workers() + log(f"Cleared {count} finished workers.") + return 0 + + if options.worker_id: + worker = db.get_worker(options.worker_id) + if worker: + events: List[Dict[str, Any]] = [] + try: + wid = worker.get("worker_id") + if wid and hasattr(db, "get_worker_events"): + events = db.get_worker_events(wid) + except Exception: + pass + _emit_worker_detail(worker, events) + return 0 + log(f"Worker not found: {options.worker_id}", file=sys.stderr) + return 1 + + if selection_requested: + return _render_worker_selection(db, result) + + return _render_worker_list(db, options.status, options.limit) + except Exception as exc: + log(f"Workers query failed: {exc}", file=sys.stderr) + import traceback + + traceback.print_exc(file=sys.stderr) + return 1 + + +def _parse_worker_args(args_list: Sequence[str]) -> WorkerCommandOptions: + options = WorkerCommandOptions() + i = 0 + while i < len(args_list): + arg = args_list[i] + low = arg.lower() + if low in {"-limit", "--limit"} and i + 1 < len(args_list): + options.limit = _normalize_limit(args_list[i + 1]) + i += 2 + elif low in {"-id", "--id"} and i + 1 < 
len(args_list): + options.worker_id = args_list[i + 1] + i += 2 + elif low in {"-clear", "--clear"}: + options.clear = True + i += 1 + elif low in {"-status", "--status"} and i + 1 < len(args_list): + options.status = args_list[i + 1].lower() + i += 2 + elif low in WORKER_STATUS_FILTERS: + options.status = low + i += 1 + elif not arg.startswith("-"): + options.status = low + i += 1 + else: + i += 1 + return options + + +def _normalize_limit(value: Any) -> int: + try: + return max(1, int(value)) + except (TypeError, ValueError): + return DEFAULT_LIMIT def _render_worker_list(db, status_filter: str | None, limit: int) -> int: - workers = db.get_all_workers(limit=limit) - if status_filter: - workers = [w for w in workers if str(w.get("status", "")).lower() == status_filter] + workers = db.get_all_workers(limit=limit) + if status_filter: + workers = [w for w in workers if str(w.get("status", "")).lower() == status_filter] - if not workers: - log("No workers found", file=sys.stderr) - return 0 + if not workers: + log("No workers found", file=sys.stderr) + return 0 - for worker in workers: - started = worker.get("started_at", "") - ended = worker.get("completed_at", worker.get("last_updated", "")) - - date_str = _extract_date(started) - start_time = _format_event_timestamp(started) - end_time = _format_event_timestamp(ended) - - item = { - "columns": [ - ("Status", worker.get("status", "")), - ("Pipe", _summarize_pipe(worker.get("pipe"))), - ("Date", date_str), - ("Start Time", start_time), - ("End Time", end_time), - ], - "__worker_metadata": worker, - "_selection_args": ["-id", worker.get("worker_id")] - } - ctx.emit(item) - return 0 + for worker in workers: + started = worker.get("started_at", "") + ended = worker.get("completed_at", worker.get("last_updated", "")) + + date_str = _extract_date(started) + start_time = _format_event_timestamp(started) + end_time = _format_event_timestamp(ended) + + item = { + "columns": [ + ("Status", worker.get("status", "")), + ("Pipe", _summarize_pipe(worker.get("pipe"))), + ("Date", date_str), + ("Start Time", start_time), + ("End Time", end_time), + ], + "__worker_metadata": worker, + "_selection_args": ["-id", worker.get("worker_id")], + } + ctx.emit(item) + return 0 def _render_worker_selection(db, selected_items: Any) -> int: - if not isinstance(selected_items, list): - log("Selection payload missing", file=sys.stderr) - return 1 + if not isinstance(selected_items, list): + log("Selection payload missing", file=sys.stderr) + return 1 - emitted = False - for item in selected_items: - worker = _resolve_worker_record(db, item) - if not worker: - continue - events = [] - try: - events = db.get_worker_events(worker.get("worker_id")) if hasattr(db, "get_worker_events") else [] - except Exception: - events = [] - _emit_worker_detail(worker, events) - emitted = True - if not emitted: - log("Selected rows no longer exist", file=sys.stderr) - return 1 - return 0 + emitted = False + for item in selected_items: + worker = _resolve_worker_record(db, item) + if not worker: + continue + events: List[Dict[str, Any]] = [] + try: + events = db.get_worker_events(worker.get("worker_id")) if hasattr(db, "get_worker_events") else [] + except Exception: + events = [] + _emit_worker_detail(worker, events) + emitted = True + if not emitted: + log("Selected rows no longer exist", file=sys.stderr) + return 1 + return 0 def _resolve_worker_record(db, payload: Any) -> Dict[str, Any] | None: - if not isinstance(payload, dict): - return None - worker_data = 
payload.get("__worker_metadata") - worker_id = None - if isinstance(worker_data, dict): - worker_id = worker_data.get("worker_id") - else: - worker_id = payload.get("worker_id") - worker_data = None - if worker_id: - fresh = db.get_worker(worker_id) - if fresh: - return fresh - return worker_data if isinstance(worker_data, dict) else None + if not isinstance(payload, dict): + return None + worker_data = payload.get("__worker_metadata") + worker_id = None + if isinstance(worker_data, dict): + worker_id = worker_data.get("worker_id") + else: + worker_id = payload.get("worker_id") + worker_data = None + if worker_id: + fresh = db.get_worker(worker_id) + if fresh: + return fresh + return worker_data if isinstance(worker_data, dict) else None def _emit_worker_detail(worker: Dict[str, Any], events: List[Dict[str, Any]]) -> None: - # Parse stdout logs into rows - stdout_content = worker.get("stdout", "") or "" - - # Try to parse lines if they follow the standard log format - # Format: YYYY-MM-DD HH:MM:SS - name - level - message - lines = stdout_content.splitlines() - - for line in lines: - line = line.strip() - if not line: - continue - - # Default values - timestamp = "" - level = "INFO" - message = line - - # Try to parse standard format - try: - parts = line.split(" - ", 3) - if len(parts) >= 4: - # Full format - ts_str, _, lvl, msg = parts - timestamp = _format_event_timestamp(ts_str) - level = lvl - message = msg - elif len(parts) == 3: - # Missing name or level - ts_str, lvl, msg = parts - timestamp = _format_event_timestamp(ts_str) - level = lvl - message = msg - except Exception: - pass - - item = { - "columns": [ - ("Time", timestamp), - ("Level", level), - ("Message", message) - ] - } - ctx.emit(item) - - # Also emit events if available and not redundant - # (For now, just focusing on stdout logs as requested) + stdout_content = worker.get("stdout", "") or "" + + lines = stdout_content.splitlines() + + for line in lines: + line = line.strip() + if not line: + continue + + timestamp = "" + level = "INFO" + message = line + + try: + parts = line.split(" - ", 3) + if len(parts) >= 4: + ts_str, _, lvl, msg = parts + timestamp = _format_event_timestamp(ts_str) + level = lvl + message = msg + elif len(parts) == 3: + ts_str, lvl, msg = parts + timestamp = _format_event_timestamp(ts_str) + level = lvl + message = msg + except Exception: + pass + + item = { + "columns": [ + ("Time", timestamp), + ("Level", level), + ("Message", message), + ] + } + ctx.emit(item) + + # Events are already always derived from stdout for now. def _summarize_pipe(pipe_value: Any, limit: int = 60) -> str: - text = str(pipe_value or "").strip() - if not text: - return "(none)" - return text if len(text) <= limit else text[: limit - 3] + "..." + text = str(pipe_value or "").strip() + if not text: + return "(none)" + return text if len(text) <= limit else text[: limit - 3] + "..." 
def _format_event_timestamp(raw_timestamp: Any) -> str: - dt = _parse_to_local(raw_timestamp) - if dt: - return dt.strftime("%H:%M:%S") + dt = _parse_to_local(raw_timestamp) + if dt: + return dt.strftime("%H:%M:%S") - if not raw_timestamp: - return "--:--:--" - text = str(raw_timestamp) - if "T" in text: - time_part = text.split("T", 1)[1] - elif " " in text: - time_part = text.split(" ", 1)[1] - else: - time_part = text - return time_part[:8] if len(time_part) >= 8 else time_part + if not raw_timestamp: + return "--:--:--" + text = str(raw_timestamp) + if "T" in text: + time_part = text.split("T", 1)[1] + elif " " in text: + time_part = text.split(" ", 1)[1] + else: + time_part = text + return time_part[:8] if len(time_part) >= 8 else time_part def _parse_to_local(timestamp_str: Any) -> datetime | None: - if not timestamp_str: - return None - text = str(timestamp_str).strip() - if not text: - return None + if not timestamp_str: + return None + text = str(timestamp_str).strip() + if not text: + return None - try: - # Check for T separator (Python isoformat - Local time) - if 'T' in text: - return datetime.fromisoformat(text) - - # Check for space separator (SQLite CURRENT_TIMESTAMP - UTC) - # Format: YYYY-MM-DD HH:MM:SS - if ' ' in text: - # Assume UTC - dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S") - dt = dt.replace(tzinfo=timezone.utc) - return dt.astimezone() # Convert to local - - except Exception: - pass - - return None + try: + if "T" in text: + return datetime.fromisoformat(text) + if " " in text: + dt = datetime.strptime(text, "%Y-%m-%d %H:%M:%S") + dt = dt.replace(tzinfo=timezone.utc) + return dt.astimezone() + except Exception: + pass + return None def _extract_date(raw_timestamp: Any) -> str: - dt = _parse_to_local(raw_timestamp) - if dt: - return dt.strftime("%m-%d-%y") - - # Fallback - if not raw_timestamp: - return "" - text = str(raw_timestamp) - # Extract YYYY-MM-DD part - date_part = "" - if "T" in text: - date_part = text.split("T", 1)[0] - elif " " in text: - date_part = text.split(" ", 1)[0] - else: - date_part = text - - # Convert YYYY-MM-DD to MM-DD-YY - try: - parts = date_part.split("-") - if len(parts) == 3: - year, month, day = parts - return f"{month}-{day}-{year[2:]}" - except Exception: - pass - return date_part \ No newline at end of file + dt = _parse_to_local(raw_timestamp) + if dt: + return dt.strftime("%m-%d-%y") + + if not raw_timestamp: + return "" + text = str(raw_timestamp) + date_part = "" + if "T" in text: + date_part = text.split("T", 1)[0] + elif " " in text: + date_part = text.split(" ", 1)[0] + else: + date_part = text + + try: + parts = date_part.split("-") + if len(parts) == 3: + year, month, day = parts + return f"{month}-{day}-{year[2:]}" + except Exception: + pass + return date_part diff --git a/helper/download.py b/helper/download.py index cc4acda..29c05e5 100644 --- a/helper/download.py +++ b/helper/download.py @@ -20,7 +20,7 @@ import time import traceback from pathlib import Path from typing import Any, Dict, Iterator, List, Optional -from urllib.parse import urljoin +from urllib.parse import urljoin, urlparse import httpx @@ -62,14 +62,11 @@ def _progress_callback(status: Dict[str, Any]) -> None: percent = status.get("_percent_str", "?") speed = status.get("_speed_str", "?") eta = status.get("_eta_str", "?") - # Print progress to stdout with carriage return to update in place sys.stdout.write(f"\r[download] {percent} at {speed} ETA {eta} ") sys.stdout.flush() elif event == "finished": - # Clear the progress line 
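# The status dicts handled in this callback follow yt-dlp's progress-hook
# convention ("_percent_str", "_speed_str", "_eta_str", "filename",
# "postprocessor"), so _progress_callback is presumably registered via
# something like ydl_opts["progress_hooks"] = [_progress_callback] where the
# YoutubeDL options are built (assumption; the registration site is not shown
# in this hunk).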
sys.stdout.write("\r" + " " * 70 + "\r") sys.stdout.flush() - # Log finished message (visible) debug(f"✓ Download finished: {status.get('filename')}") elif event in ("postprocessing", "processing"): debug(f"Post-processing: {status.get('postprocessor')}") @@ -99,17 +96,7 @@ def is_url_supported_by_ytdlp(url: str) -> bool: def list_formats(url: str, no_playlist: bool = False, playlist_items: Optional[str] = None) -> Optional[List[Dict[str, Any]]]: - """Get list of available formats for a URL using yt-dlp. - - Args: - url: URL to get formats for - no_playlist: If True, ignore playlists and list formats for single video - playlist_items: If specified, only list formats for these playlist items (e.g., "1,3,5-8") - - Returns: - List of format dictionaries with keys: format_id, format, resolution, fps, vcodec, acodec, filesize, etc. - Returns None if yt-dlp is not available or format listing fails. - """ + """Get list of available formats for a URL using yt-dlp.""" _ensure_yt_dlp_ready() try: @@ -118,28 +105,25 @@ def list_formats(url: str, no_playlist: bool = False, playlist_items: Optional[s "no_warnings": True, "socket_timeout": 30, } - - # Add no_playlist option if specified + if no_playlist: ydl_opts["noplaylist"] = True - - # Add playlist_items filter if specified + if playlist_items: ydl_opts["playlist_items"] = playlist_items with yt_dlp.YoutubeDL(ydl_opts) as ydl: debug(f"Fetching format list for: {url}") info = ydl.extract_info(url, download=False) - + formats = info.get("formats", []) if not formats: log("No formats available", file=sys.stderr) return None - - # Parse and extract relevant format info + result_formats = [] for fmt in formats: - format_info = { + result_formats.append({ "format_id": fmt.get("format_id", ""), "format": fmt.get("format", ""), "ext": fmt.get("ext", ""), @@ -150,13 +134,12 @@ def list_formats(url: str, no_playlist: bool = False, playlist_items: Optional[s "vcodec": fmt.get("vcodec", "none"), "acodec": fmt.get("acodec", "none"), "filesize": fmt.get("filesize"), - "tbr": fmt.get("tbr"), # Total bitrate - } - result_formats.append(format_info) - + "tbr": fmt.get("tbr"), + }) + debug(f"Found {len(result_formats)} available formats") return result_formats - + except Exception as e: log(f"✗ Error fetching formats: {e}", file=sys.stderr) return None @@ -779,8 +762,28 @@ def download_media( debug_logger.write_record("libgen-resolve-failed", {"url": opts.url}) return _download_direct_file(opts.url, opts.output_dir, debug_logger) - # Try yt-dlp first if URL is supported - if not is_url_supported_by_ytdlp(opts.url): + # Handle GoFile shares with a dedicated resolver before yt-dlp/direct fallbacks + try: + netloc = urlparse(opts.url).netloc.lower() + except Exception: + netloc = "" + if "gofile.io" in netloc: + msg = "GoFile links are currently unsupported" + debug(msg) + if debug_logger is not None: + debug_logger.write_record("gofile-unsupported", {"url": opts.url}) + raise DownloadError(msg) + + # Determine if yt-dlp should be used + ytdlp_supported = is_url_supported_by_ytdlp(opts.url) + if ytdlp_supported: + probe_result = probe_url(opts.url, no_playlist=opts.no_playlist) + if probe_result is None: + log(f"URL supported by yt-dlp but no media detected, falling back to direct download: {opts.url}") + if debug_logger is not None: + debug_logger.write_record("ytdlp-skip-no-media", {"url": opts.url}) + return _download_direct_file(opts.url, opts.output_dir, debug_logger) + else: log(f"URL not supported by yt-dlp, trying direct download: {opts.url}") if debug_logger is 
not None: debug_logger.write_record("direct-file-attempt", {"url": opts.url}) diff --git a/helper/file_storage.py b/helper/file_storage.py index 088bf28..758156e 100644 --- a/helper/file_storage.py +++ b/helper/file_storage.py @@ -28,6 +28,41 @@ import re from helper.logger import log, debug from helper.utils_constant import mime_maps +from helper.utils import sha256_file + + +HEX_DIGITS = set("0123456789abcdef") + + +def _normalize_hex_hash(value: Optional[str]) -> Optional[str]: + """Return a normalized 64-character lowercase hash or None.""" + if value is None: + return None + + try: + cleaned = ''.join(ch for ch in str(value).strip().lower() if ch in HEX_DIGITS) + except Exception: + return None + + if len(cleaned) == 64: + return cleaned + return None + + +def _resolve_file_hash(candidate: Optional[str], path: Path) -> Optional[str]: + """Return the given hash if valid, otherwise compute sha256 from disk.""" + normalized = _normalize_hex_hash(candidate) + if normalized is not None: + return normalized + + if not path.exists(): + return None + + try: + return sha256_file(path) + except Exception as exc: + debug(f"Failed to compute hash for {path}: {exc}") + return None class StorageBackend(ABC): @@ -198,6 +233,39 @@ class LocalStorageBackend(StorageBackend): search_dir = Path(location).expanduser() debug(f"Searching local storage at: {search_dir}") + # Support comma-separated AND queries (token1,token2,...). Each token must match. + tokens = [t.strip() for t in query.split(',') if t.strip()] + + # Require explicit namespace for hash lookups to avoid accidental filename matches + if not match_all and len(tokens) == 1 and _normalize_hex_hash(query_lower): + debug("Hash queries require 'hash:' prefix for local search") + return results + + # Require explicit namespace for hash lookups to avoid accidental filename matches + if not match_all and _normalize_hex_hash(query_lower): + debug("Hash queries require 'hash:' prefix for local search") + return results + + def _create_entry(file_path: Path, tags: list[str], size_bytes: int | None, db_hash: Optional[str]) -> dict[str, Any]: + path_str = str(file_path) + entry = { + "name": file_path.stem, + "title": next((t.split(':', 1)[1] for t in tags if t.lower().startswith('title:')), file_path.stem), + "ext": file_path.suffix.lstrip('.'), + "path": path_str, + "target": path_str, + "origin": "local", + "size": size_bytes, + "size_bytes": size_bytes, + "tags": tags, + } + hash_value = _resolve_file_hash(db_hash, file_path) + if hash_value: + entry["hash"] = hash_value + entry["hash_hex"] = hash_value + entry["file_hash"] = hash_value + return entry + try: if not search_dir.exists(): debug(f"Search directory does not exist: {search_dir}") @@ -209,17 +277,196 @@ class LocalStorageBackend(StorageBackend): cursor = db.connection.cursor() # Check if query is a tag namespace search (format: "namespace:pattern") + if tokens and len(tokens) > 1: + # AND mode across comma-separated tokens + def _like_pattern(term: str) -> str: + return term.replace('*', '%').replace('?', '_') + + def _ids_for_token(token: str, cursor) -> set[int]: + token = token.strip() + if not token: + return set() + + # Namespaced token + if ':' in token and not token.startswith(':'): + namespace, pattern = token.split(':', 1) + namespace = namespace.strip().lower() + pattern = pattern.strip().lower() + + if namespace == 'hash': + normalized_hash = _normalize_hex_hash(pattern) + if not normalized_hash: + return set() + cursor.execute( + """ + SELECT id FROM files + WHERE 
LOWER(file_hash) = ? + """, + (normalized_hash,) + ) + return {row[0] for row in cursor.fetchall()} + + if namespace == 'store': + # Local backend only serves local store + if pattern not in {'local', 'file', 'filesystem'}: + return set() + cursor.execute("SELECT id FROM files") + return {row[0] for row in cursor.fetchall()} + + # Generic namespace match on tags + query_pattern = f"{namespace}:%" + cursor.execute( + """ + SELECT DISTINCT f.id, t.tag + FROM files f + JOIN tags t ON f.id = t.file_id + WHERE LOWER(t.tag) LIKE ? + """, + (query_pattern,) + ) + matched: set[int] = set() + for file_id, tag_val in cursor.fetchall(): + if not tag_val: + continue + tag_lower = str(tag_val).lower() + if not tag_lower.startswith(f"{namespace}:"): + continue + value = tag_lower[len(namespace)+1:] + if fnmatch(value, pattern): + matched.add(int(file_id)) + return matched + + # Bare token: match filename OR any tag (including title) + term = token.lower() + like_pattern = f"%{_like_pattern(term)}%" + + ids: set[int] = set() + # Filename match + cursor.execute( + """ + SELECT DISTINCT id FROM files + WHERE LOWER(file_path) LIKE ? + """, + (like_pattern,) + ) + ids.update(int(row[0]) for row in cursor.fetchall()) + + # Tag match (any namespace, including title) + cursor.execute( + """ + SELECT DISTINCT f.id + FROM files f + JOIN tags t ON f.id = t.file_id + WHERE LOWER(t.tag) LIKE ? + """, + (like_pattern,) + ) + ids.update(int(row[0]) for row in cursor.fetchall()) + return ids + + try: + with LocalLibraryDB(search_dir) as db: + cursor = db.connection.cursor() + matching_ids: set[int] | None = None + for token in tokens: + ids = _ids_for_token(token, cursor) + matching_ids = ids if matching_ids is None else matching_ids & ids + if not matching_ids: + return results + + if not matching_ids: + return results + + # Fetch rows for matching IDs + placeholders = ",".join(["?"] * len(matching_ids)) + fetch_sql = f""" + SELECT id, file_path, file_size, file_hash + FROM files + WHERE id IN ({placeholders}) + ORDER BY file_path + LIMIT ? + """ + cursor.execute(fetch_sql, (*matching_ids, limit or len(matching_ids))) + rows = cursor.fetchall() + for file_id, file_path_str, size_bytes, file_hash in rows: + if not file_path_str: + continue + file_path = Path(file_path_str) + if not file_path.exists(): + continue + if size_bytes is None: + try: + size_bytes = file_path.stat().st_size + except OSError: + size_bytes = None + cursor.execute( + """ + SELECT tag FROM tags WHERE file_id = ? + """, + (file_id,), + ) + tags = [row[0] for row in cursor.fetchall()] + entry = _create_entry(file_path, tags, size_bytes, file_hash) + results.append(entry) + if limit is not None and len(results) >= limit: + return results + return results + except Exception as exc: + log(f"⚠️ AND search failed: {exc}", file=sys.stderr) + debug(f"AND search exception details: {exc}") + return [] + if ":" in query and not query.startswith(":"): namespace, pattern = query.split(":", 1) namespace = namespace.strip().lower() pattern = pattern.strip().lower() debug(f"Performing namespace search: {namespace}:{pattern}") + + # Special-case hash: lookups against file_hash column + if namespace == "hash": + normalized_hash = _normalize_hex_hash(pattern) + if not normalized_hash: + return results + cursor.execute( + """ + SELECT id, file_path, file_size, file_hash + FROM files + WHERE LOWER(file_hash) = ? + ORDER BY file_path + LIMIT ? 
+ """, + (normalized_hash, limit or 1000), + ) + + for file_id, file_path_str, size_bytes, file_hash in cursor.fetchall(): + if not file_path_str: + continue + file_path = Path(file_path_str) + if not file_path.exists(): + continue + if size_bytes is None: + try: + size_bytes = file_path.stat().st_size + except OSError: + size_bytes = None + cursor.execute( + """ + SELECT tag FROM tags WHERE file_id = ? + """, + (file_id,), + ) + all_tags = [row[0] for row in cursor.fetchall()] + entry = _create_entry(file_path, all_tags, size_bytes, file_hash) + results.append(entry) + if limit is not None and len(results) >= limit: + return results + return results # Search for tags matching the namespace and pattern query_pattern = f"{namespace}:%" cursor.execute(""" - SELECT DISTINCT f.id, f.file_path, f.file_size + SELECT DISTINCT f.id, f.file_path, f.file_size, f.file_hash FROM files f JOIN tags t ON f.id = t.file_id WHERE LOWER(t.tag) LIKE ? @@ -231,7 +478,7 @@ class LocalStorageBackend(StorageBackend): debug(f"Found {len(rows)} potential matches in DB") # Filter results by pattern match - for file_id, file_path_str, size_bytes in rows: + for file_id, file_path_str, size_bytes, file_hash in rows: if not file_path_str: continue @@ -254,30 +501,14 @@ class LocalStorageBackend(StorageBackend): if fnmatch(value, pattern): file_path = Path(file_path_str) if file_path.exists(): - path_str = str(file_path) if size_bytes is None: size_bytes = file_path.stat().st_size - - # Fetch all tags for this file cursor.execute(""" SELECT tag FROM tags WHERE file_id = ? """, (file_id,)) all_tags = [row[0] for row in cursor.fetchall()] - - # Use title tag if present - title_tag = next((t.split(':', 1)[1] for t in all_tags if t.lower().startswith('title:')), None) - - results.append({ - "name": file_path.stem, - "title": title_tag or file_path.stem, - "ext": file_path.suffix.lstrip('.'), - "path": path_str, - "target": path_str, - "origin": "local", - "size": size_bytes, - "size_bytes": size_bytes, - "tags": all_tags, - }) + entry = _create_entry(file_path, all_tags, size_bytes, file_hash) + results.append(entry) else: debug(f"File missing on disk: {file_path}") break # Don't add same file multiple times @@ -309,7 +540,7 @@ class LocalStorageBackend(StorageBackend): where_clause = " AND ".join(conditions) cursor.execute(f""" - SELECT DISTINCT f.id, f.file_path, f.file_size + SELECT DISTINCT f.id, f.file_path, f.file_size, f.file_hash FROM files f WHERE {where_clause} ORDER BY f.file_path @@ -344,7 +575,7 @@ class LocalStorageBackend(StorageBackend): word_regex = None seen_files = set() - for file_id, file_path_str, size_bytes in rows: + for file_id, file_path_str, size_bytes, file_hash in rows: if not file_path_str or file_path_str in seen_files: continue @@ -361,26 +592,12 @@ class LocalStorageBackend(StorageBackend): if size_bytes is None: size_bytes = file_path.stat().st_size - # Fetch tags for this file cursor.execute(""" SELECT tag FROM tags WHERE file_id = ? 
""", (file_id,)) tags = [row[0] for row in cursor.fetchall()] - - # Use title tag if present - title_tag = next((t.split(':', 1)[1] for t in tags if t.lower().startswith('title:')), None) - - results.append({ - "name": file_path.stem, - "title": title_tag or file_path.stem, - "ext": file_path.suffix.lstrip('.'), - "path": path_str, - "target": path_str, - "origin": "local", - "size": size_bytes, - "size_bytes": size_bytes, - "tags": tags, - }) + entry = _create_entry(file_path, tags, size_bytes, file_hash) + results.append(entry) if limit is not None and len(results) >= limit: return results @@ -390,7 +607,7 @@ class LocalStorageBackend(StorageBackend): for term in terms: cursor.execute( """ - SELECT DISTINCT f.id, f.file_path, f.file_size + SELECT DISTINCT f.id, f.file_path, f.file_size, f.file_hash FROM files f JOIN tags t ON f.id = t.file_id WHERE LOWER(t.tag) LIKE ? @@ -399,7 +616,7 @@ class LocalStorageBackend(StorageBackend): """, (f"title:%{term}%", fetch_limit), ) - for file_id, file_path_str, size_bytes in cursor.fetchall(): + for file_id, file_path_str, size_bytes, file_hash in cursor.fetchall(): if not file_path_str: continue entry = title_hits.get(file_id) @@ -411,6 +628,7 @@ class LocalStorageBackend(StorageBackend): title_hits[file_id] = { "path": file_path_str, "size": size_bytes, + "hash": file_hash, "count": 1, } @@ -441,19 +659,8 @@ class LocalStorageBackend(StorageBackend): (file_id,), ) tags = [row[0] for row in cursor.fetchall()] - title_tag = next((t.split(':', 1)[1] for t in tags if t.lower().startswith('title:')), None) - - results.append({ - "name": file_path.stem, - "title": title_tag or file_path.stem, - "ext": file_path.suffix.lstrip('.'), - "path": str(file_path), - "target": str(file_path), - "origin": "local", - "size": size_bytes, - "size_bytes": size_bytes, - "tags": tags, - }) + entry = _create_entry(file_path, tags, size_bytes, info.get("hash")) + results.append(entry) if limit is not None and len(results) >= limit: return results @@ -465,7 +672,7 @@ class LocalStorageBackend(StorageBackend): query_pattern = f"%{query_lower}%" cursor.execute(""" - SELECT DISTINCT f.id, f.file_path, f.file_size + SELECT DISTINCT f.id, f.file_path, f.file_size, f.file_hash FROM files f JOIN tags t ON f.id = t.file_id WHERE LOWER(t.tag) LIKE ? AND LOWER(t.tag) NOT LIKE '%:%' @@ -474,7 +681,7 @@ class LocalStorageBackend(StorageBackend): """, (query_pattern, limit or 1000)) tag_rows = cursor.fetchall() - for file_id, file_path_str, size_bytes in tag_rows: + for file_id, file_path_str, size_bytes, file_hash in tag_rows: if not file_path_str or file_path_str in seen_files: continue seen_files.add(file_path_str) @@ -490,21 +697,8 @@ class LocalStorageBackend(StorageBackend): SELECT tag FROM tags WHERE file_id = ? 
""", (file_id,)) tags = [row[0] for row in cursor.fetchall()] - - # Use title tag if present - title_tag = next((t.split(':', 1)[1] for t in tags if t.lower().startswith('title:')), None) - - results.append({ - "name": file_path.stem, - "title": title_tag or file_path.stem, - "ext": file_path.suffix.lstrip('.'), - "path": path_str, - "target": path_str, - "origin": "local", - "size": size_bytes, - "size_bytes": size_bytes, - "tags": tags, - }) + entry = _create_entry(file_path, tags, size_bytes, file_hash) + results.append(entry) if limit is not None and len(results) >= limit: return results @@ -512,14 +706,14 @@ class LocalStorageBackend(StorageBackend): else: # Match all - get all files from database cursor.execute(""" - SELECT id, file_path, file_size + SELECT id, file_path, file_size, file_hash FROM files ORDER BY file_path LIMIT ? """, (limit or 1000,)) rows = cursor.fetchall() - for file_id, file_path_str, size_bytes in rows: + for file_id, file_path_str, size_bytes, file_hash in rows: if file_path_str: file_path = Path(file_path_str) if file_path.exists(): @@ -532,21 +726,8 @@ class LocalStorageBackend(StorageBackend): SELECT tag FROM tags WHERE file_id = ? """, (file_id,)) tags = [row[0] for row in cursor.fetchall()] - - # Use title tag if present - title_tag = next((t.split(':', 1)[1] for t in tags if t.lower().startswith('title:')), None) - - results.append({ - "name": file_path.stem, - "title": title_tag or file_path.stem, - "ext": file_path.suffix.lstrip('.'), - "path": path_str, - "target": path_str, - "origin": "local", - "size": size_bytes, - "size_bytes": size_bytes, - "tags": tags, - }) + entry = _create_entry(file_path, tags, size_bytes, file_hash) + results.append(entry) if results: debug(f"Returning {len(results)} results from DB") diff --git a/helper/local_library.py b/helper/local_library.py index c3d692a..68def1c 100644 --- a/helper/local_library.py +++ b/helper/local_library.py @@ -22,6 +22,7 @@ from typing import Optional, Dict, Any, List, Tuple, Set from .utils import sha256_file logger = logging.getLogger(__name__) +WORKER_LOG_MAX_ENTRIES = 99 # Try to import optional dependencies try: @@ -352,6 +353,29 @@ class LocalLibraryDB: INSERT INTO worker_log (worker_id, event_type, step, channel, message) VALUES (?, ?, ?, ?, ?) """, (worker_id, event_type, step, channel, message)) + self._prune_worker_log_entries(cursor, worker_id) + + def _prune_worker_log_entries(self, cursor, worker_id: str) -> None: + """Keep at most WORKER_LOG_MAX_ENTRIES rows per worker by trimming oldest ones.""" + if WORKER_LOG_MAX_ENTRIES <= 0: + return + cursor.execute( + """ + SELECT id FROM worker_log + WHERE worker_id = ? + ORDER BY id DESC + LIMIT 1 OFFSET ? + """, + (worker_id, WORKER_LOG_MAX_ENTRIES - 1), + ) + row = cursor.fetchone() + if not row: + return + cutoff_id = row[0] + cursor.execute( + "DELETE FROM worker_log WHERE worker_id = ? 
AND id < ?", + (worker_id, cutoff_id), + ) def get_worker_events(self, worker_id: str, limit: int = 500) -> List[Dict[str, Any]]: """Return chronological worker log events for timelines.""" diff --git a/helper/metadata_search.py b/helper/metadata_search.py index f99d316..26b1237 100644 --- a/helper/metadata_search.py +++ b/helper/metadata_search.py @@ -7,6 +7,11 @@ import sys from helper.logger import log, debug +try: # Optional dependency + import musicbrainzngs # type: ignore +except ImportError: # pragma: no cover - optional + musicbrainzngs = None + class MetadataProvider(ABC): """Base class for metadata providers (music, movies, books, etc.).""" @@ -266,6 +271,86 @@ class GoogleBooksMetadataProvider(MetadataProvider): return tags +class MusicBrainzMetadataProvider(MetadataProvider): + """Metadata provider for MusicBrainz recordings.""" + + @property + def name(self) -> str: # type: ignore[override] + return "musicbrainz" + + def search(self, query: str, limit: int = 10) -> List[Dict[str, Any]]: + if not musicbrainzngs: + log("musicbrainzngs is not installed; skipping MusicBrainz scrape", file=sys.stderr) + return [] + + q = (query or "").strip() + if not q: + return [] + + try: + # Ensure user agent is set (required by MusicBrainz) + musicbrainzngs.set_useragent("Medeia-Macina", "0.1") + except Exception: + pass + + try: + resp = musicbrainzngs.search_recordings(query=q, limit=limit) + recordings = resp.get("recording-list") or resp.get("recordings") or [] + except Exception as exc: + log(f"MusicBrainz search failed: {exc}", file=sys.stderr) + return [] + + items: List[Dict[str, Any]] = [] + for rec in recordings[:limit]: + if not isinstance(rec, dict): + continue + title = rec.get("title") or "" + + artist = "" + artist_credit = rec.get("artist-credit") or rec.get("artist_credit") + if isinstance(artist_credit, list) and artist_credit: + first = artist_credit[0] + if isinstance(first, dict): + artist = first.get("name") or first.get("artist", {}).get("name", "") + elif isinstance(first, str): + artist = first + + album = "" + release_list = rec.get("release-list") or rec.get("releases") or rec.get("release") + if isinstance(release_list, list) and release_list: + first_rel = release_list[0] + if isinstance(first_rel, dict): + album = first_rel.get("title", "") or "" + release_date = first_rel.get("date") or "" + else: + album = str(first_rel) + release_date = "" + else: + release_date = rec.get("first-release-date") or "" + + year = str(release_date)[:4] if release_date else "" + mbid = rec.get("id") or "" + + items.append({ + "title": title, + "artist": artist, + "album": album, + "year": year, + "provider": self.name, + "mbid": mbid, + "raw": rec, + }) + + return items + + def to_tags(self, item: Dict[str, Any]) -> List[str]: + tags = super().to_tags(item) + mbid = item.get("mbid") + if mbid: + tags.append(f"musicbrainz:{mbid}") + return tags + + # Registry --------------------------------------------------------------- _METADATA_PROVIDERS: Dict[str, Type[MetadataProvider]] = { @@ -273,6 +358,7 @@ _METADATA_PROVIDERS: Dict[str, Type[MetadataProvider]] = { "openlibrary": OpenLibraryMetadataProvider, "googlebooks": GoogleBooksMetadataProvider, "google": GoogleBooksMetadataProvider, + "musicbrainz": MusicBrainzMetadataProvider, } diff --git a/helper/mpv_ipc.py b/helper/mpv_ipc.py index 42d50e0..7f18795 100644 --- a/helper/mpv_ipc.py +++ b/helper/mpv_ipc.py @@ -12,6 +12,7 @@ import os import platform import socket import time as _time +from pathlib import Path from typing import Any, 
Dict, Optional, List from helper.logger import debug @@ -19,6 +20,7 @@ from helper.logger import debug # Fixed pipe name for persistent MPV connection across all Python sessions FIXED_IPC_PIPE_NAME = "mpv-medeia-macina" +MPV_LUA_SCRIPT_PATH = str(Path(__file__).resolve().parent.parent / "LUA" / "main.lua") class MPVIPCError(Exception): @@ -45,6 +47,48 @@ def get_ipc_pipe_path() -> str: return f"/tmp/{FIXED_IPC_PIPE_NAME}.sock" +def _unwrap_memory_target(text: Optional[str]) -> Optional[str]: + """Return the real target from a memory:// M3U payload if present.""" + if not isinstance(text, str) or not text.startswith("memory://"): + return text + for line in text.splitlines(): + line = line.strip() + if not line or line.startswith('#') or line.startswith('memory://'): + continue + return line + return text + + +def _normalize_target(text: Optional[str]) -> Optional[str]: + """Normalize playlist targets for deduping across raw/memory:// wrappers.""" + if not text: + return None + real = _unwrap_memory_target(text) + if not real: + return None + real = real.strip() + if not real: + return None + + lower = real.lower() + # Hydrus bare hash + if len(lower) == 64 and all(ch in "0123456789abcdef" for ch in lower): + return lower + + # Hydrus file URL with hash query + try: + parsed = __import__("urllib.parse").parse.urlparse(real) + qs = __import__("urllib.parse").parse.parse_qs(parsed.query) + hash_qs = qs.get("hash", [None])[0] + if hash_qs and len(hash_qs) == 64 and all(ch in "0123456789abcdef" for ch in hash_qs.lower()): + return hash_qs.lower() + except Exception: + pass + + # Normalize paths/urls for comparison + return lower.replace('\\', '\\') + + class MPVIPCClient: """Client for communicating with mpv via IPC socket/pipe. @@ -171,11 +215,18 @@ class MPVIPCClient: # Check if this is the response to our request if resp.get("request_id") == request.get("request_id"): return resp - - # If it's an error without request_id (shouldn't happen for commands) - if "error" in resp and "request_id" not in resp: - # Might be an event or async error - pass + + # Handle async log messages/events for visibility + event_type = resp.get("event") + if event_type == "log-message": + level = resp.get("level", "info") + prefix = resp.get("prefix", "") + text = resp.get("text", "").strip() + debug(f"[MPV {level}] {prefix} {text}".strip()) + elif event_type: + debug(f"[MPV event] {event_type}: {resp}") + elif "error" in resp and "request_id" not in resp: + debug(f"[MPV error] {resp}") except json.JSONDecodeError: pass @@ -230,7 +281,13 @@ def send_to_mpv(file_url: str, title: str, headers: Optional[Dict[str, str]] = N return False try: - # Command 1: Set headers if provided + # Command 0: Subscribe to log messages so MPV console errors surface in REPL + _subscribe_log_messages(client) + + # Command 1: Ensure our Lua helper is loaded for in-window controls + _ensure_lua_script_loaded(client) + + # Command 2: Set headers if provided if headers: header_str = ",".join([f"{k}: {v}" for k, v in headers.items()]) cmd_headers = { @@ -238,22 +295,46 @@ def send_to_mpv(file_url: str, title: str, headers: Optional[Dict[str, str]] = N "request_id": 0 } client.send_command(cmd_headers) + + # Deduplicate: if target already exists in playlist, just play it + normalized_new = _normalize_target(file_url) + existing_index = None + existing_title = None + if normalized_new: + playlist_resp = client.send_command({"command": ["get_property", "playlist"], "request_id": 98}) + if playlist_resp and playlist_resp.get("error") == 
"success": + for idx, item in enumerate(playlist_resp.get("data", []) or []): + for key in ("playlist-path", "filename"): + norm_existing = _normalize_target(item.get(key)) if isinstance(item, dict) else None + if norm_existing and norm_existing == normalized_new: + existing_index = idx + existing_title = item.get("title") if isinstance(item, dict) else None + break + if existing_index is not None: + break + + if existing_index is not None and append: + play_cmd = {"command": ["playlist-play-index", existing_index], "request_id": 99} + play_resp = client.send_command(play_cmd) + if play_resp and play_resp.get("error") == "success": + client.send_command({"command": ["set_property", "pause", False], "request_id": 100}) + safe_title = (title or existing_title or "").replace("\n", " ").replace("\r", " ").strip() + if safe_title: + client.send_command({"command": ["set_property", "force-media-title", safe_title], "request_id": 101}) + debug(f"Already in playlist, playing existing entry: {safe_title or file_url}") + return True - # Command 2: Load file - # Use memory:// M3U to preserve title in playlist if provided - # This is required for YouTube URLs and proper playlist display - if title: - # Sanitize title for M3U (remove newlines) - safe_title = title.replace("\n", " ").replace("\r", "") - # M3U format: #EXTM3U\n#EXTINF:-1,Title\nURL - m3u_content = f"#EXTM3U\n#EXTINF:-1,{safe_title}\n{file_url}" - target = f"memory://{m3u_content}" - else: - target = file_url - + # Command 2: Load file and inject title via memory:// wrapper so playlist shows friendly names immediately + target = file_url load_mode = "append-play" if append else "replace" + safe_title = (title or "").replace("\n", " ").replace("\r", " ").strip() + target_to_send = target + if safe_title and not str(target).startswith("memory://"): + m3u_content = f"#EXTM3U\n#EXTINF:-1,{safe_title}\n{target}" + target_to_send = f"memory://{m3u_content}" + cmd_load = { - "command": ["loadfile", target, load_mode], + "command": ["loadfile", target_to_send, load_mode], "request_id": 1 } @@ -263,14 +344,14 @@ def send_to_mpv(file_url: str, title: str, headers: Optional[Dict[str, str]] = N return False # Command 3: Set title (metadata for display) - still useful for window title - if title: + if safe_title: cmd_title = { - "command": ["set_property", "force-media-title", title], + "command": ["set_property", "force-media-title", safe_title], "request_id": 2 } client.send_command(cmd_title) - debug(f"Sent to existing MPV: {title}") + debug(f"Sent to existing MPV: {safe_title or title}") return True except Exception as e: @@ -295,3 +376,29 @@ def get_mpv_client(socket_path: Optional[str] = None) -> Optional[MPVIPCClient]: return client return None + +def _subscribe_log_messages(client: MPVIPCClient) -> None: + """Ask MPV to emit log messages over IPC so we can surface console errors.""" + try: + client.send_command({"command": ["request_log_messages", "warn"], "request_id": 11}) + except Exception as exc: + debug(f"Failed to subscribe to MPV logs: {exc}") + + +def _ensure_lua_script_loaded(client: MPVIPCClient) -> None: + """Load the bundled MPV Lua script to enable in-window controls. + + Safe to call repeatedly; mpv will simply reload the script if already present. 
+ """ + try: + script_path = MPV_LUA_SCRIPT_PATH + if not script_path or not os.path.exists(script_path): + return + resp = client.send_command({"command": ["load-script", script_path], "request_id": 12}) + if resp and resp.get("error") == "success": + debug(f"Loaded MPV Lua script: {script_path}") + else: + debug(f"MPV Lua load response: {resp}") + except Exception as exc: + debug(f"Failed to load MPV Lua script: {exc}") + diff --git a/hydrus_health_check.py b/hydrus_health_check.py index 7fcdd54..44c8202 100644 --- a/hydrus_health_check.py +++ b/hydrus_health_check.py @@ -55,16 +55,16 @@ def check_hydrus_availability(config: Dict[str, Any]) -> Tuple[bool, Optional[st is_available, reason = _is_hydrus_available(config, use_cache=False) if is_available: - logger.info("[Hydrus Health Check] ✅ Hydrus API is AVAILABLE") + logger.info("[Hydrus Health Check] Hydrus API is AVAILABLE") return True, None else: reason_str = f": {reason}" if reason else "" - logger.warning(f"[Hydrus Health Check] ❌ Hydrus API is UNAVAILABLE{reason_str}") + logger.warning(f"[Hydrus Health Check] Hydrus API is UNAVAILABLE{reason_str}") return False, reason except Exception as e: error_msg = str(e) - logger.error(f"[Hydrus Health Check] ❌ Error checking Hydrus availability: {error_msg}") + logger.error(f"[Hydrus Health Check] Error checking Hydrus availability: {error_msg}") return False, error_msg @@ -88,16 +88,16 @@ def initialize_hydrus_health_check(config: Dict[str, Any]) -> None: _HYDRUS_CHECK_COMPLETE = True if is_available: - debug("✅ Hydrus: ENABLED - All Hydrus features available", file=sys.stderr) + debug("Hydrus: ENABLED - All Hydrus features available", file=sys.stderr) else: - debug(f"⚠️ Hydrus: DISABLED - {reason or 'Connection failed'}", file=sys.stderr) + debug(f"Hydrus: DISABLED - {reason or 'Connection failed'}", file=sys.stderr) except Exception as e: logger.error(f"[Startup] Failed to initialize Hydrus health check: {e}", exc_info=True) _HYDRUS_AVAILABLE = False _HYDRUS_UNAVAILABLE_REASON = str(e) _HYDRUS_CHECK_COMPLETE = True - debug(f"⚠️ Hydrus: DISABLED - Error during health check: {e}", file=sys.stderr) + debug(f"Hydrus: DISABLED - Error during health check: {e}", file=sys.stderr) def check_debrid_availability(config: Dict[str, Any]) -> Tuple[bool, Optional[str]]: diff --git a/pipeline.py b/pipeline.py index b18a819..7638f49 100644 --- a/pipeline.py +++ b/pipeline.py @@ -85,6 +85,10 @@ _PIPELINE_COMMAND_TEXT: str = "" _PIPELINE_VALUES: Dict[str, Any] = {} _PIPELINE_MISSING = object() +# Preserve downstream pipeline stages when a command pauses for @N selection +_PENDING_PIPELINE_TAIL: List[List[str]] = [] +_PENDING_PIPELINE_SOURCE: Optional[str] = None + # Global callback to notify UI when library content changes _UI_LIBRARY_REFRESH_CALLBACK: Optional[Any] = None @@ -262,11 +266,50 @@ def load_value(key: str, default: Any = None) -> Any: return current +def set_pending_pipeline_tail(stages: Optional[Sequence[Sequence[str]]], source_command: Optional[str] = None) -> None: + """Store the remaining pipeline stages when execution pauses for @N selection. 
+ + Args: + stages: Iterable of pipeline stage token lists + source_command: Command that produced the selection table (for validation) + """ + global _PENDING_PIPELINE_TAIL, _PENDING_PIPELINE_SOURCE + try: + pending: List[List[str]] = [] + for stage in stages or []: + if isinstance(stage, (list, tuple)): + pending.append([str(token) for token in stage]) + _PENDING_PIPELINE_TAIL = pending + clean_source = (source_command or "").strip() + _PENDING_PIPELINE_SOURCE = clean_source if clean_source else None + except Exception: + # Keep existing pending tail on failure + pass + + +def get_pending_pipeline_tail() -> List[List[str]]: + """Get a copy of the pending pipeline tail (stages queued after selection).""" + return [list(stage) for stage in _PENDING_PIPELINE_TAIL] + + +def get_pending_pipeline_source() -> Optional[str]: + """Get the source command associated with the pending pipeline tail.""" + return _PENDING_PIPELINE_SOURCE + + +def clear_pending_pipeline_tail() -> None: + """Clear any stored pending pipeline tail.""" + global _PENDING_PIPELINE_TAIL, _PENDING_PIPELINE_SOURCE + _PENDING_PIPELINE_TAIL = [] + _PENDING_PIPELINE_SOURCE = None + + def reset() -> None: """Reset all pipeline state. Called between pipeline executions.""" global _PIPE_EMITS, _PIPE_ACTIVE, _PIPE_IS_LAST, _PIPELINE_VALUES global _LAST_PIPELINE_CAPTURE, _PIPELINE_REFRESHED, _PIPELINE_LAST_ITEMS global _PIPELINE_COMMAND_TEXT, _LAST_RESULT_SUBJECT, _DISPLAY_SUBJECT + global _PENDING_PIPELINE_TAIL, _PENDING_PIPELINE_SOURCE _PIPE_EMITS = [] _PIPE_ACTIVE = False @@ -278,6 +321,8 @@ def reset() -> None: _PIPELINE_COMMAND_TEXT = "" _LAST_RESULT_SUBJECT = None _DISPLAY_SUBJECT = None + _PENDING_PIPELINE_TAIL = [] + _PENDING_PIPELINE_SOURCE = None def get_emitted_items() -> List[Any]: diff --git a/result_table.py b/result_table.py index 20b0038..f668bd1 100644 --- a/result_table.py +++ b/result_table.py @@ -118,6 +118,16 @@ class ResultRow: def add_column(self, name: str, value: Any) -> None: """Add a column to this row.""" str_value = str(value) if value is not None else "" + + # Normalize extension columns globally and cap to 5 characters + if str(name).strip().lower() == "ext": + str_value = str_value.strip().lstrip(".") + for idx, ch in enumerate(str_value): + if not ch.isalnum(): + str_value = str_value[:idx] + break + str_value = str_value[:5] + self.columns.append(ResultColumn(name, str_value)) def get_column(self, name: str) -> Optional[str]: @@ -618,48 +628,78 @@ class ResultTable: for row in self.rows: for col in row.columns: col_name = col.name + value_width = len(col.value) + if col_name.lower() == "ext": + value_width = min(value_width, 5) col_widths[col_name] = max( col_widths.get(col_name, 0), len(col.name), - len(col.value) + value_width ) # Calculate row number column width num_width = len(str(len(self.rows))) + 1 # +1 for padding - - lines = [] - - # Add title if present - if self.title: - lines.append("=" * self.title_width) - lines.append(self.title.center(self.title_width)) - lines.append("=" * self.title_width) + # Preserve column order + column_names = list(col_widths.keys()) + + def capped_width(name: str) -> int: + cap = 5 if name.lower() == "ext" else 90 + return min(col_widths[name], cap) + + widths = [num_width] + [capped_width(name) for name in column_names] + base_inner_width = sum(widths) + (len(widths) - 1) * 3 # account for " | " separators + + # Compute final table width (with side walls) to accommodate headers/titles + table_width = base_inner_width + 2 # side walls + if self.title: + 
table_width = max(table_width, len(self.title) + 2) if self.header_lines: - lines.extend(self.header_lines) - + table_width = max(table_width, max(len(line) for line in self.header_lines) + 2) + + def wrap(text: str) -> str: + """Wrap content with side walls and pad to table width.""" + if len(text) > table_width - 2: + text = text[: table_width - 5] + "..." # keep walls intact + return "|" + text.ljust(table_width - 2) + "|" + + lines = [] + + # Title block + if self.title: + lines.append("|" + "=" * (table_width - 2) + "|") + lines.append(wrap(self.title.center(table_width - 2))) + lines.append("|" + "=" * (table_width - 2) + "|") + + # Optional header metadata lines + for meta in self.header_lines: + lines.append(wrap(meta)) + # Add header with # column header_parts = ["#".ljust(num_width)] separator_parts = ["-" * num_width] - for col_name in col_widths: - width = min(col_widths[col_name], 90) # Cap column width (increased for expanded titles) + for col_name in column_names: + width = capped_width(col_name) header_parts.append(col_name.ljust(width)) separator_parts.append("-" * width) - - lines.append(" | ".join(header_parts)) - lines.append("-+-".join(separator_parts)) - + + lines.append(wrap(" | ".join(header_parts))) + lines.append(wrap("-+-".join(separator_parts))) + # Add rows with row numbers for row_num, row in enumerate(self.rows, 1): row_parts = [str(row_num).ljust(num_width)] - for col_name in col_widths: - width = min(col_widths[col_name], 90) # Increased cap for expanded titles + for col_name in column_names: + width = capped_width(col_name) col_value = row.get_column(col_name) or "" if len(col_value) > width: - col_value = col_value[:width - 3] + "..." + col_value = col_value[: width - 3] + "..." row_parts.append(col_value.ljust(width)) - lines.append(" | ".join(row_parts)) - + lines.append(wrap(" | ".join(row_parts))) + + # Bottom border to close the rectangle + lines.append("|" + "=" * (table_width - 2) + "|") + return "\n".join(lines) def format_compact(self) -> str:
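# A minimal usage sketch of the hash helpers added in helper/file_storage.py above.
# It assumes the module-private functions can be imported directly, which is for
# illustration only; they are internal to the local storage backend.
from pathlib import Path

from helper.file_storage import _normalize_hex_hash, _resolve_file_hash

digest = "ab" * 32  # a bare 64-character lowercase hex digest

# Normalization tolerates case and surrounding whitespace, but rejects anything
# that does not reduce to exactly 64 hex characters.
assert _normalize_hex_hash(digest.upper()) == digest
assert _normalize_hex_hash(f"  {digest}  ") == digest
assert _normalize_hex_hash("not-a-hash") is None

# A valid candidate hash is returned as-is without touching the filesystem;
# hashing from disk is only a fallback, so a missing file with no candidate
# yields None (assuming missing.bin does not exist).
assert _resolve_file_hash(digest, Path("missing.bin")) == digest
assert _resolve_file_hash(None, Path("missing.bin")) is None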