This commit is contained in:
2026-01-17 02:36:06 -08:00
parent 3a7c443004
commit c6fd6b4224
9 changed files with 440 additions and 226 deletions

View File

@@ -92,7 +92,7 @@
"(hitfile\\.net/[a-z0-9A-Z]{4,9})"
],
"regexp": "(hitf\\.(to|cc)/([a-z0-9A-Z]{4,9}))|(htfl\\.(net|to|cc)/([a-z0-9A-Z]{4,9}))|(hitfile\\.(net)/download/free/([a-z0-9A-Z]{4,9}))|((hitfile\\.net/[a-z0-9A-Z]{4,9}))",
"status": true
"status": false
},
"mega": {
"name": "mega",

View File

@@ -344,7 +344,7 @@ class MPV:
def _q(s: str) -> str:
return '"' + s.replace("\\", "\\\\").replace('"', '\\"') + '"'
pipeline = f"download-file -url {_q(url)} -format {_q(fmt)}"
pipeline = f"download-file -url {_q(url)} -query {_q(f'format:{fmt}')}"
if store:
pipeline += f" | add-file -store {_q(store)}"
else:

View File

@@ -518,7 +518,7 @@ def _run_op(op: str, data: Any) -> Dict[str, Any]:
size = _format_bytes(fmt.get("filesize") or fmt.get("filesize_approx"))
# Build selection args compatible with MPV Lua picker.
selection_args = ["-format", format_id]
selection_args = ["-query", f"format:{format_id}"]
rows.append(
{

View File

@@ -266,6 +266,22 @@ def _fetch_torrent_bytes(target: str) -> Optional[bytes]:
return None
_ALD_MAGNET_PREFIX = "alldebrid:magnet:"
def _parse_alldebrid_magnet_id(target: str) -> Optional[int]:
candidate = str(target or "").strip()
if not candidate:
return None
if not candidate.lower().startswith(_ALD_MAGNET_PREFIX):
return None
try:
magnet_id_raw = candidate[len(_ALD_MAGNET_PREFIX):].strip()
return int(magnet_id_raw)
except Exception:
return None
def resolve_magnet_spec(target: str) -> Optional[str]:
"""Resolve a magnet/hash/torrent URL into a magnet/hash string."""
candidate = str(target or "").strip()
@@ -558,14 +574,14 @@ class AllDebrid(TableProviderMixin, Provider):
1. User runs: search-file -provider alldebrid "ubuntu"
2. Results show magnet folders and (optionally) files
3. User selects a row: @1
4. Selection metadata routes to download-file with -magnet-id
5. download-file calls provider.download_items() with magnet_id
4. Selection metadata routes to download-file with -url alldebrid:magnet:<id>
5. download-file invokes provider.download_items() via provider URL handling
6. Provider fetches files, unlocks locked URLs, and downloads
"""
# Magnet URIs should be routed through this provider.
TABLE_AUTO_STAGES = {"alldebrid": ["download-file"]}
AUTO_STAGE_USE_SELECTION_ARGS = True
URL = ("magnet:",)
URL = ("magnet:", "alldebrid:magnet:")
URL_DOMAINS = ()
@classmethod
@@ -631,6 +647,19 @@ class AllDebrid(TableProviderMixin, Provider):
return resolve_magnet_spec(str(target)) if isinstance(target, str) else None
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]:
magnet_id = _parse_alldebrid_magnet_id(url)
if magnet_id is not None:
return True, {
"action": "download_items",
"path": f"{_ALD_MAGNET_PREFIX}{magnet_id}",
"title": f"magnet-{magnet_id}",
"metadata": {
"magnet_id": magnet_id,
"provider": "alldebrid",
"provider_view": "files",
},
}
spec = resolve_magnet_spec(url)
if not spec:
return False, None
@@ -1180,8 +1209,8 @@ class AllDebrid(TableProviderMixin, Provider):
"provider": "alldebrid",
"provider_view": "files",
# Selection metadata for table system
"_selection_args": ["-magnet-id", str(magnet_id)],
"_selection_action": ["download-file", "-provider", "alldebrid", "-magnet-id", str(magnet_id)],
"_selection_args": ["-url", f"{_ALD_MAGNET_PREFIX}{magnet_id}"],
"_selection_action": ["download-file", "-provider", "alldebrid", "-url", f"{_ALD_MAGNET_PREFIX}{magnet_id}"],
}
results.append(
@@ -1295,8 +1324,8 @@ class AllDebrid(TableProviderMixin, Provider):
"provider_view": "folders",
"magnet_name": magnet_name,
# Selection metadata: allow @N expansion to drive downloads directly
"_selection_args": ["-magnet-id", str(magnet_id)],
"_selection_action": ["download-file", "-provider", "alldebrid", "-magnet-id", str(magnet_id)],
"_selection_args": ["-url", f"{_ALD_MAGNET_PREFIX}{magnet_id}"],
"_selection_action": ["download-file", "-provider", "alldebrid", "-url", f"{_ALD_MAGNET_PREFIX}{magnet_id}"],
},
)
)
@@ -1585,7 +1614,7 @@ try:
1. Explicit _selection_action (full command args)
2. Explicit _selection_args (URL-specific args)
3. Magic routing based on provider_view (files vs folders)
4. Magnet ID routing for folder-type rows
4. Magnet ID routing for folder-type rows (via alldebrid:magnet:<id>)
5. Direct URL for file rows
This ensures that selector overrides all pre-codes and gives users full power.
@@ -1612,7 +1641,7 @@ try:
# Folder rows: use magnet_id to fetch and download all files
magnet_id = metadata.get("magnet_id")
if magnet_id is not None:
return ["-magnet-id", str(magnet_id)]
return ["-url", f"{_ALD_MAGNET_PREFIX}{magnet_id}"]
# Fallback: try direct URL
if row.path:

View File

@@ -2,7 +2,7 @@
When a URL is passed through download-file, this provider displays available formats
in a table and routes format selection back to download-file with the chosen format
already specified via -format, skipping the format table on the second invocation.
already specified via -query "format:<id>", skipping the format table on the second invocation.
This keeps format selection logic in ytdlp and leaves add-file plug-and-play.
"""
@@ -31,8 +31,8 @@ class ytdlp(TableProviderMixin, Provider):
- User runs: download-file "https://example.com/video"
- If URL is ytdlp-supported and no format specified, displays format table
- User selects @N (e.g., @3 for format index 3)
- Selection args include -format <format_id>, re-invoking download-file
- Second download-file call sees -format and skips table, downloads directly
- Selection args include -query "format:<format_id>", re-invoking download-file
- Second download-file call sees the format query and skips the table, downloads directly
SEARCH USAGE:
- User runs: search-file -provider ytdlp "linux tutorial"
@@ -41,12 +41,12 @@ class ytdlp(TableProviderMixin, Provider):
- Selection args route to download-file for streaming download
SELECTION FLOW (Format):
1. download-file receives URL without -format
1. download-file receives URL without a format query
2. Calls ytdlp to list formats
3. Returns formats as ResultTable (from this provider)
4. User selects @N
5. Selection args: ["-format", "<format_id>"] route back to download-file
6. Second download-file invocation with -format skips table
5. Selection args: ["-query", "format:<format_id>"] route back to download-file
6. Second download-file invocation with format query skips table
SELECTION FLOW (Search):
1. search-file lists YouTube videos via yt_dlp
@@ -56,7 +56,7 @@ class ytdlp(TableProviderMixin, Provider):
5. download-file downloads the selected video
TABLE AUTO-STAGES:
- Format selection: ytdlp.formatlist -> download-file (with -format)
- Format selection: ytdlp.formatlist -> download-file (with -query format:<id>)
- Video search: ytdlp.search -> download-file (with -url)
SUPPORTED URLS:
@@ -106,7 +106,7 @@ class ytdlp(TableProviderMixin, Provider):
"ytdlp.formatlist": ["download-file"],
"ytdlp.search": ["download-file"],
}
# Forward selection args (including -format or -url) to the next stage
# Forward selection args (including -query format:... or -url) to the next stage
AUTO_STAGE_USE_SELECTION_ARGS = True
def search(
@@ -277,7 +277,7 @@ try:
"""Return selection args for format selection.
When user selects @N, these args are passed to download-file which sees
the -format specifier and skips the format table, downloading directly.
the format query and skips the format table, downloading directly.
"""
metadata = row.metadata or {}
@@ -291,7 +291,7 @@ try:
# Fallback: use format_id
format_id = metadata.get("format_id") or metadata.get("id")
if format_id:
result_args = ["-format", str(format_id)]
result_args = ["-query", f"format:{format_id}"]
debug(f"[ytdlp] Selection routed with format_id: {format_id}")
return result_args

View File

@@ -10,6 +10,7 @@ import shutil
import sys
import tempfile
from collections.abc import Iterable as IterableABC
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from SYS.logger import log, debug
from pathlib import Path
@@ -3074,6 +3075,19 @@ def check_url_exists_in_storage(
pass
return False
def _load_preflight_cache() -> Dict[str, Any]:
    """Fetch the pipeline-scoped "preflight" cache, defaulting to an empty dict.

    Any load failure (or a non-dict stored value) is treated as "no cache".
    """
    try:
        cached = pipeline_context.load_value("preflight", default=None)
    except Exception:
        cached = None
    if isinstance(cached, dict):
        return cached
    return {}
def _store_preflight_cache(cache: Dict[str, Any]) -> None:
    """Best-effort persist of *cache* under the "preflight" pipeline key.

    Persistence is advisory: failures are swallowed so they can never break
    the duplicate-check path.
    """
    try:
        pipeline_context.store_value("preflight", cache)
    except Exception:
        return
unique_urls: List[str] = []
for u in urls or []:
s = str(u or "").strip()
@@ -3093,6 +3107,56 @@ def check_url_exists_in_storage(
except Exception:
return False
def _expand_url_variants(value: str) -> List[str]:
    """Return normalized alternates of *value* for duplicate-detection lookups.

    Variants drop the URL fragment and/or strip timestamp and tracking query
    parameters (``t``, ``start``, ..., and ``utm_*``).  Only http(s) URLs are
    expanded; anything else yields an empty list.  The input *value* itself
    is never included in the result.
    """
    if not _httpish(value):
        return []
    try:
        parts = urlparse(value)
    except Exception:
        return []
    if parts.scheme.lower() not in {"http", "https"}:
        return []

    variants: List[str] = []

    def _add(candidate: str) -> None:
        # Skip empties, the original URL, and duplicates already collected.
        if candidate and candidate != value and candidate not in variants:
            variants.append(candidate)

    # Variant 1: fragment removed.
    if parts.fragment:
        _add(urlunparse(parts._replace(fragment="")))

    try:
        pairs = parse_qsl(parts.query, keep_blank_values=True)
    except Exception:
        pairs = []

    # Variant 2: time/tracking params removed (fragment dropped as well).
    if pairs or parts.fragment:
        time_keys = {"t", "start", "time_continue", "timestamp", "time", "begin"}
        kept = [
            (key, val)
            for key, val in pairs
            if str(key or "").lower() not in time_keys
            and not str(key or "").lower().startswith("utm_")
        ]
        if len(kept) != len(pairs):
            rebuilt = urlencode(kept, doseq=True) if kept else ""
            _add(urlunparse(parts._replace(query=rebuilt, fragment="")))

    return variants
url_needles: Dict[str, List[str]] = {}
for u in unique_urls:
needles: List[str] = []
@@ -3112,7 +3176,88 @@ def check_url_exists_in_storage(
continue
if n2 not in filtered:
filtered.append(n2)
url_needles[u] = filtered if filtered else [u]
expanded: List[str] = []
for n2 in filtered:
for extra in _expand_url_variants(n2):
if extra not in expanded and extra not in filtered:
expanded.append(extra)
combined = filtered + expanded
url_needles[u] = combined if combined else [u]
if in_pipeline:
preflight_cache = _load_preflight_cache()
url_dup_cache = preflight_cache.get("url_duplicates")
if not isinstance(url_dup_cache, dict):
url_dup_cache = {}
cached_urls = url_dup_cache.get("urls")
cached_set = {str(u) for u in cached_urls} if isinstance(cached_urls, list) else set()
if cached_set:
all_cached = True
for original_url, needles in url_needles.items():
if original_url in cached_set:
continue
if any(n in cached_set for n in (needles or [])):
continue
all_cached = False
break
if all_cached:
debug("Bulk URL preflight: cached for pipeline; skipping duplicate check")
return True
def _search_backend_url_hits(
    backend: Any,
    backend_name: str,
    original_url: str,
    needles: Sequence[str],
) -> Optional[Dict[str, Any]]:
    """Probe *backend* with up to three URL needles and build a display row.

    Returns None when no needle matches; otherwise a dict carrying both the
    flat fields (title/store/hash/ext/size/url) and a pre-built ``columns``
    list for table rendering.
    """
    backend_hits: List[Dict[str, Any]] = []
    # Cap at three needles per URL to bound the number of backend queries.
    for needle in (needles or [])[:3]:
        try:
            backend_hits = backend.search(f"url:{needle}", limit=1) or []
            if backend_hits:
                break
        except Exception:
            # A failing backend query is treated as "no hit for this needle".
            continue
    if not backend_hits:
        return None
    hit = backend_hits[0]
    # Best-effort title/hash extraction across heterogeneous backend schemas.
    title = hit.get("title") or hit.get("name") or hit.get("target") or hit.get("path") or "(exists)"
    file_hash = hit.get("hash") or hit.get("file_hash") or hit.get("sha256") or ""
    try:
        # Optional project helper; fall back to a bare dict if unavailable.
        from SYS.result_table import build_display_row
        extracted = build_display_row(hit, keys=["title", "store", "hash", "ext", "size"])
    except Exception:
        extracted = {}
    # Overwrite with the normalized values so the row is always consistent.
    extracted["title"] = str(title)
    extracted["store"] = str(hit.get("store") or backend_name)
    extracted["hash"] = str(file_hash or "")
    ext = extracted.get("ext")
    size_val = extracted.get("size")
    return {
        "title": str(title),
        "store": str(hit.get("store") or backend_name),
        "hash": str(file_hash or ""),
        "ext": str(ext or ""),
        "size": size_val,
        "url": original_url,
        "columns": [
            ("Title", str(title)),
            ("Store", str(hit.get("store") or backend_name)),
            ("Hash", str(file_hash or "")),
            ("Ext", str(ext or "")),
            ("Size", size_val),
            ("URL", original_url),
        ],
    }
backend_names: List[str] = []
try:
@@ -3167,12 +3312,11 @@ def check_url_exists_in_storage(
continue
if HydrusNetwork is not None and isinstance(backend, HydrusNetwork):
if not hydrus_available:
continue
client = getattr(backend, "_client", None)
if client is None:
continue
if not hydrus_available:
debug("Bulk URL preflight: hydrus availability check failed; attempting best-effort lookup")
for original_url, needles in url_needles.items():
if len(match_rows) >= max_rows:
@@ -3214,6 +3358,11 @@ def check_url_exists_in_storage(
continue
if not found:
fallback_row = _search_backend_url_hits(backend, str(backend_name), original_url, needles)
if fallback_row:
seen_pairs.add((original_url, str(backend_name)))
matched_urls.add(original_url)
match_rows.append(fallback_row)
continue
seen_pairs.add((original_url, str(backend_name)))
@@ -3239,57 +3388,33 @@ def check_url_exists_in_storage(
if (original_url, str(backend_name)) in seen_pairs:
continue
backend_hits: List[Dict[str, Any]] = []
for needle in (needles or [])[:3]:
try:
backend_hits = backend.search(f"url:{needle}", limit=1) or []
if backend_hits:
break
except Exception:
continue
if not backend_hits:
display_row = _search_backend_url_hits(backend, str(backend_name), original_url, needles)
if not display_row:
continue
seen_pairs.add((original_url, str(backend_name)))
matched_urls.add(original_url)
hit = backend_hits[0]
title = hit.get("title") or hit.get("name") or hit.get("target") or hit.get("path") or "(exists)"
file_hash = hit.get("hash") or hit.get("file_hash") or hit.get("sha256") or ""
try:
from SYS.result_table import build_display_row
extracted = build_display_row(hit, keys=["title", "store", "hash", "ext", "size"])
except Exception:
extracted = {}
extracted["title"] = str(title)
extracted["store"] = str(hit.get("store") or backend_name)
extracted["hash"] = str(file_hash or "")
ext = extracted.get("ext")
size_val = extracted.get("size")
display_row = {
"title": str(title),
"store": str(hit.get("store") or backend_name),
"hash": str(file_hash or ""),
"ext": str(ext or ""),
"size": size_val,
"url": original_url,
"columns": [
("Title", str(title)),
("Store", str(hit.get("store") or backend_name)),
("Hash", str(file_hash or "")),
("Ext", str(ext or "")),
("Size", size_val),
("URL", original_url),
],
}
match_rows.append(display_row)
if not match_rows:
debug("Bulk URL preflight: no matches")
if in_pipeline:
preflight_cache = _load_preflight_cache()
url_dup_cache = preflight_cache.get("url_duplicates")
if not isinstance(url_dup_cache, dict):
url_dup_cache = {}
cached_urls = url_dup_cache.get("urls")
cached_set = {str(u) for u in cached_urls} if isinstance(cached_urls, list) else set()
for original_url, needles in url_needles.items():
cached_set.add(original_url)
for needle in needles or []:
cached_set.add(str(needle))
url_dup_cache["urls"] = sorted(cached_set)
preflight_cache["url_duplicates"] = url_dup_cache
_store_preflight_cache(preflight_cache)
return True
table = ResultTable(f"URL already exists ({len(matched_urls)} url(s))", max_columns=10)
@@ -3333,6 +3458,13 @@ def check_url_exists_in_storage(
url_dup_cache = {}
url_dup_cache["command"] = str(current_cmd_text or "")
url_dup_cache["continue"] = bool(answered_yes)
cached_urls = url_dup_cache.get("urls")
cached_set = {str(u) for u in cached_urls} if isinstance(cached_urls, list) else set()
for original_url, needles in url_needles.items():
cached_set.add(original_url)
for needle in needles or []:
cached_set.add(str(needle))
url_dup_cache["urls"] = sorted(cached_set)
preflight_cache["url_duplicates"] = url_dup_cache
try:
pipeline_context.store_value("preflight", preflight_cache)

View File

@@ -81,23 +81,6 @@ class Download_File(Cmdlet):
alias="o",
description="(deprecated) Output directory (use -path instead)",
),
CmdletArg(
name="audio",
type="flag",
alias="a",
description="Download audio only (yt-dlp)",
),
CmdletArg(
name="-magnet-id",
type="string",
description="(internal) AllDebrid magnet id used by provider selection hooks",
),
CmdletArg(
name="format",
type="string",
alias="fmt",
description="Explicit yt-dlp format selector",
),
QueryArg(
"clip",
key="clip",
@@ -183,6 +166,42 @@ class Download_File(Cmdlet):
path_value: Optional[Any] = path
if isinstance(path, dict):
provider_action = str(
path.get("action")
or path.get("provider_action")
or ""
).strip().lower()
if provider_action == "download_items" or bool(path.get("download_items")):
request_metadata = path.get("metadata") or path.get("full_metadata") or {}
if not isinstance(request_metadata, dict):
request_metadata = {}
magnet_id = path.get("magnet_id") or request_metadata.get("magnet_id")
if magnet_id is not None:
request_metadata.setdefault("magnet_id", magnet_id)
if SearchResult is None:
debug("Provider download_items requested but SearchResult unavailable")
continue
sr = SearchResult(
table=str(provider_name),
title=str(path.get("title") or path.get("name") or f"{provider_name} item"),
path=str(path.get("path") or path.get("url") or url),
full_metadata=request_metadata,
)
downloaded_extra = self._download_provider_items(
provider=provider,
provider_name=str(provider_name),
search_result=sr,
output_dir=final_output_dir,
progress=progress,
quiet_mode=quiet_mode,
config=config,
)
if downloaded_extra:
downloaded_count += int(downloaded_extra)
continue
path_value = path.get("path") or path.get("file_path")
extra_meta = path.get("metadata") or path.get("full_metadata")
title_hint = path.get("title") or path.get("name")
@@ -451,6 +470,24 @@ class Download_File(Cmdlet):
provider_sr = sr
debug(f"[download-file] Provider download result: {downloaded_path}")
if downloaded_path is None:
try:
downloaded_extra = self._download_provider_items(
provider=provider_obj,
provider_name=str(provider_key),
search_result=sr,
output_dir=output_dir,
progress=progress,
quiet_mode=quiet_mode,
config=config,
)
except Exception:
downloaded_extra = 0
if downloaded_extra:
downloaded_count += int(downloaded_extra)
continue
# Fallback: if we have a direct HTTP URL and no provider successfully handled it
if (downloaded_path is None and not attempted_provider_download
and isinstance(target, str) and target.startswith("http")):
@@ -539,6 +576,68 @@ class Download_File(Cmdlet):
return downloaded_count, queued_magnet_submissions
def _download_provider_items(
    self,
    *,
    provider: Any,
    provider_name: str,
    search_result: Any,
    output_dir: Path,
    progress: PipelineProgress,
    quiet_mode: bool,
    config: Dict[str, Any],
) -> int:
    """Drive ``provider.download_items`` for *search_result*, emitting each file.

    Returns the number of downloaded items (0 on any failure or when the
    provider does not support item downloads).
    """
    if provider is None or not hasattr(provider, "download_items"):
        return 0

    def _on_emit(path: Path, file_url: str, relpath: str, metadata: Dict[str, Any]) -> None:
        # Callback invoked by the provider per downloaded file; forwards it
        # into the pipeline via _emit_local_file.
        title_hint = None
        try:
            title_hint = metadata.get("name") or relpath
        except Exception:
            # metadata may not be a dict; fall back to the relative path.
            title_hint = relpath
        title_hint = title_hint or (Path(path).name if path else "download")
        self._emit_local_file(
            downloaded_path=path,
            source=file_url,
            title_hint=title_hint,
            tags_hint=None,
            media_kind_hint="file",
            full_metadata=metadata if isinstance(metadata, dict) else None,
            progress=progress,
            config=config,
            provider_hint=provider_name,
        )

    try:
        downloaded_count = provider.download_items(
            search_result,
            output_dir,
            emit=_on_emit,
            progress=progress,
            quiet_mode=quiet_mode,
            path_from_result=coerce_to_path,
            config=config,
        )
    except TypeError:
        # Legacy providers do not accept the ``config`` kwarg; retry without it.
        # NOTE(review): a TypeError raised *inside* download_items also lands
        # here and triggers the retry — presumably acceptable; confirm.
        downloaded_count = provider.download_items(
            search_result,
            output_dir,
            emit=_on_emit,
            progress=progress,
            quiet_mode=quiet_mode,
            path_from_result=coerce_to_path,
        )
    except Exception as exc:
        log(f"Provider {provider_name} download_items error: {exc}", file=sys.stderr)
        return 0
    try:
        # Providers may return None or a non-int; normalize defensively.
        return int(downloaded_count or 0)
    except Exception:
        return 0
def _emit_local_file(
self,
*,
@@ -1221,11 +1320,27 @@ class Download_File(Cmdlet):
# Add base command for display
format_dict["cmd"] = base_cmd
def _merge_query_args(selection_args: List[str], query_value: str) -> List[str]:
if not query_value:
return selection_args
merged = list(selection_args or [])
if "-query" in merged:
idx_query = merged.index("-query")
if idx_query + 1 < len(merged):
existing = str(merged[idx_query + 1] or "").strip()
merged[idx_query + 1] = f"{existing},{query_value}" if existing else query_value
else:
merged.append(query_value)
else:
merged.extend(["-query", query_value])
return merged
# Append clip values to selection args if needed
selection_args: List[str] = format_dict["_selection_args"].copy()
selection_args: List[str] = list(format_dict.get("_selection_args") or [])
try:
if (not clip_spec) and clip_values:
selection_args.extend(["-query", f"clip:{','.join([v for v in clip_values if v])}"])
clip_query = f"clip:{','.join([v for v in clip_values if v])}"
selection_args = _merge_query_args(selection_args, clip_query)
except Exception:
pass
format_dict["_selection_args"] = selection_args
@@ -1253,7 +1368,9 @@ class Download_File(Cmdlet):
pipeline_context.set_last_result_table(table, results_list)
debug(f"[ytdlp.formatlist] Format table registered with {len(results_list)} formats")
debug(f"[ytdlp.formatlist] When user selects @N, will invoke: download-file {url} -format <format_id>")
debug(
f"[ytdlp.formatlist] When user selects @N, will invoke: download-file {url} -query 'format:<format_id>'"
)
log(f"", file=sys.stderr)
return 0
@@ -1518,7 +1635,7 @@ class Download_File(Cmdlet):
"url": url,
"item_selector": selection_format_id,
},
"_selection_args": ["-format", selection_format_id],
"_selection_args": ["-query", f"format:{selection_format_id}"],
}
results_list.append(format_dict)
@@ -1748,12 +1865,10 @@ class Download_File(Cmdlet):
except Exception:
query_wants_audio = False
audio_flag = bool(parsed.get("audio") is True)
wants_audio = audio_flag
if query_audio is not None:
wants_audio = wants_audio or bool(query_audio)
wants_audio = bool(query_audio)
else:
wants_audio = wants_audio or bool(query_wants_audio)
wants_audio = bool(query_wants_audio)
mode = "audio" if wants_audio else "video"
clip_ranges, clip_invalid, clip_values = self._parse_clip_ranges_and_apply_items(
@@ -1777,8 +1892,8 @@ class Download_File(Cmdlet):
formats_cache: Dict[str, Optional[List[Dict[str, Any]]]] = {}
playlist_items = str(parsed.get("item")) if parsed.get("item") else None
ytdl_format = parsed.get("format")
if not ytdl_format and query_format and not query_wants_audio:
ytdl_format = None
if query_format and not query_wants_audio:
try:
height_selector = self._format_selector_for_query_height(query_format)
except ValueError as e:
@@ -1825,7 +1940,7 @@ class Download_File(Cmdlet):
sample_pipeline = f'download-file "{candidate_url}"'
hint = (
"To select non-interactively, re-run with an explicit format: "
"e.g. mm \"{pipeline} -format {fmt} | add-file -store <store>\" or "
"e.g. mm \"{pipeline} -query 'format:{fmt}' | add-file -store <store>\" or "
"mm \"{pipeline} -query 'format:{index}' | add-file -store <store>\""
).format(
pipeline=sample_pipeline,
@@ -2735,18 +2850,6 @@ class Download_File(Cmdlet):
downloaded_count = 0
# Special-case: support selection-inserted magnet-id arg to drive provider downloads
magnet_ret = self._process_magnet_id(
parsed=parsed,
registry=registry,
config=config,
final_output_dir=final_output_dir,
progress=progress,
quiet_mode=quiet_mode
)
if magnet_ret is not None:
return magnet_ret
urls_downloaded, early_exit = self._process_explicit_urls(
raw_urls=raw_url,
final_output_dir=final_output_dir,
@@ -2800,104 +2903,6 @@ class Download_File(Cmdlet):
pass
progress.close_local_ui(force_complete=True)
def _process_magnet_id(
    self,
    *,
    parsed: Dict[str, Any],
    registry: Dict[str, Any],
    config: Dict[str, Any],
    final_output_dir: Path,
    progress: PipelineProgress,
    quiet_mode: bool
) -> Optional[int]:
    """Handle a selection-inserted ``-magnet-id`` arg by downloading via a provider.

    Returns None when no magnet-id was supplied (caller continues normal
    processing), 0 on a successful download, and 1 on any failure.
    """
    magnet_id_raw = parsed.get("magnet-id")
    if not magnet_id_raw:
        # Not a magnet-id invocation; let the caller fall through.
        return None
    try:
        magnet_id = int(str(magnet_id_raw).strip())
    except Exception:
        log(f"[download-file] invalid magnet-id: {magnet_id_raw}", file=sys.stderr)
        return 1
    # Resolve the provider (defaults to alldebrid) through the registry hook.
    get_provider = registry.get("get_provider")
    provider_name = str(parsed.get("provider") or "alldebrid").strip().lower()
    provider_obj = None
    if get_provider is not None:
        try:
            provider_obj = get_provider(provider_name, config)
        except Exception:
            provider_obj = None
    if provider_obj is None:
        log(f"[download-file] provider '{provider_name}' not available", file=sys.stderr)
        return 1
    # Build a synthetic SearchResult describing the magnet "files" view;
    # sr may end up None if the class is unavailable or construction fails.
    SearchResult = registry.get("SearchResult")
    try:
        if SearchResult is not None:
            sr = SearchResult(
                table=provider_name,
                title=f"magnet-{magnet_id}",
                path=f"alldebrid:magnet:{magnet_id}",
                full_metadata={
                    "magnet_id": magnet_id,
                    "provider": provider_name,
                    "provider_view": "files",
                },
            )
        else:
            sr = None
    except Exception:
        sr = None

    def _on_emit(path: Path, file_url: str, relpath: str, metadata: Dict[str, Any]) -> None:
        # Per-file callback: forward each downloaded file into the pipeline.
        title_hint = metadata.get("name") or relpath or f"magnet-{magnet_id}"
        self._emit_local_file(
            downloaded_path=path,
            source=file_url or f"alldebrid:magnet:{magnet_id}",
            title_hint=title_hint,
            tags_hint=None,
            media_kind_hint="file",
            full_metadata=metadata,
            progress=progress,
            config=config,
            provider_hint=provider_name,
        )

    try:
        downloaded_extra = provider_obj.download_items(
            sr,
            final_output_dir,
            emit=_on_emit,
            progress=progress,
            quiet_mode=quiet_mode,
            path_from_result=coerce_to_path,
            config=config,
        )
    except TypeError:
        # Legacy provider signature without the ``config`` kwarg; retry.
        downloaded_extra = provider_obj.download_items(
            sr,
            final_output_dir,
            emit=_on_emit,
            progress=progress,
            quiet_mode=quiet_mode,
            path_from_result=coerce_to_path,
        )
    except Exception as exc:
        log(f"[download-file] failed to download magnet {magnet_id}: {exc}", file=sys.stderr)
        return 1
    if downloaded_extra:
        debug(f"[download-file] AllDebrid magnet {magnet_id} emitted {downloaded_extra} files")
        return 0
    log(
        f"[download-file] AllDebrid magnet {magnet_id} produced no downloads",
        file=sys.stderr,
    )
    return 1
def _maybe_show_provider_picker(
self,
*,

View File

@@ -75,6 +75,17 @@ class Get_Url(Cmdlet):
return url.lower()
@staticmethod
def _looks_like_url_pattern(value: str) -> bool:
v = str(value or "").strip().lower()
if not v:
return False
if "://" in v:
return True
if v.startswith(("magnet:", "torrent:", "ytdl:", "tidal:", "ftp:", "sftp:", "file:")):
return True
return "." in v and "/" in v
@staticmethod
def _match_url_pattern(url: str, pattern: str) -> bool:
"""Match URL against pattern with wildcard support.
@@ -82,10 +93,14 @@ class Get_Url(Cmdlet):
Strips protocol/www from both URL and pattern before matching.
Supports * and ? wildcards.
"""
raw_pattern = str(pattern or "").strip()
normalized_url = Get_Url._normalize_url_for_search(url)
normalized_pattern = Get_Url._normalize_url_for_search(pattern)
normalized_pattern = Get_Url._normalize_url_for_search(raw_pattern)
has_wildcards = any(ch in normalized_pattern for ch in ("*", "?"))
looks_like_url = Get_Url._looks_like_url_pattern(raw_pattern)
has_wildcards = "*" in normalized_pattern or (
not looks_like_url and "?" in normalized_pattern
)
if has_wildcards:
return fnmatch(normalized_url, normalized_pattern)
@@ -324,25 +339,58 @@ class Get_Url(Cmdlet):
# This avoids the expensive/incorrect "search('*')" scan.
try:
raw_pattern = str(pattern or "").strip()
has_wildcards = any(ch in raw_pattern for ch in ("*", "?"))
looks_like_url = self._looks_like_url_pattern(raw_pattern)
has_wildcards = "*" in raw_pattern or (
not looks_like_url and "?" in raw_pattern
)
# If this is a Hydrus backend and the pattern is a single URL,
# normalize it through the official API. Skip for bare domains.
normalized_url = None
looks_like_url = (
"://" in raw_pattern or raw_pattern.startswith("magnet:")
)
if not has_wildcards and looks_like_url and hasattr(backend, "get_url_info"):
try:
info = backend.get_url_info(raw_pattern) # type: ignore[attr-defined]
if isinstance(info, dict):
norm = info.get("normalised_url") or info.get("normalized_url")
if isinstance(norm, str) and norm.strip():
normalized_url = norm.strip()
except Exception:
normalized_url = None
normalized_search_pattern = None
if not has_wildcards and looks_like_url:
normalized_search_pattern = self._normalize_url_for_search(
raw_pattern
)
if (
normalized_search_pattern
and normalized_search_pattern != raw_pattern
):
debug(
"get-url normalized raw pattern: %s -> %s",
raw_pattern,
normalized_search_pattern,
)
if hasattr(backend, "get_url_info"):
try:
info = backend.get_url_info(raw_pattern) # type: ignore[attr-defined]
if isinstance(info, dict):
norm = (
info.get("normalised_url")
or info.get("normalized_url")
)
if isinstance(norm, str) and norm.strip():
normalized_url = self._normalize_url_for_search(
norm.strip()
)
except Exception:
pass
if (
normalized_url
and normalized_url != normalized_search_pattern
and normalized_url != raw_pattern
):
debug(
"get-url normalized backend result: %s -> %s",
raw_pattern,
normalized_url,
)
target_pattern = normalized_url or raw_pattern
target_pattern = (
normalized_url
or normalized_search_pattern
or raw_pattern
)
if has_wildcards or not target_pattern:
search_query = "url:*"
else:

View File

@@ -324,7 +324,7 @@ def format_for_table_selection(
This helper formats a single format from list_formats() into the shape
expected by the ResultTable system, ready for user selection and routing
to download-file with -format argument.
to download-file with -query "format:<id>".
Args:
fmt: Format dict from yt-dlp
@@ -403,9 +403,9 @@ def format_for_table_selection(
"format_id": format_id,
"url": url,
"item_selector": selection_format_id,
"_selection_args": ["-format", selection_format_id],
"_selection_args": ["-query", f"format:{selection_format_id}"],
},
"_selection_args": ["-format", selection_format_id],
"_selection_args": ["-query", f"format:{selection_format_id}"],
}