This commit is contained in:
2026-02-14 15:54:31 -08:00
parent ce2f28cc50
commit ae4880b164
7 changed files with 215 additions and 22 deletions

View File

@@ -493,7 +493,7 @@
"mediafire\\.com/(\\?|download/|file/|download\\.php\\?)([0-9a-z]{15})"
],
"regexp": "mediafire\\.com/(\\?|download/|file/|download\\.php\\?)([0-9a-z]{15})",
"status": true
"status": false
},
"mixdrop": {
"name": "mixdrop",

View File

@@ -2476,6 +2476,43 @@ class PipelineExecutor:
stage_args = stage_tokens[1:]
if cmd_name == "@":
# Special-case get-tag tables: `@ | add-tag ...` should target the
# underlying file subject once, not each emitted TagItem row.
try:
next_cmd = None
if stage_index + 1 < len(stages) and stages[stage_index + 1]:
next_cmd = str(stages[stage_index + 1][0]).replace("_", "-").strip().lower()
current_table = None
try:
current_table = ctx.get_current_stage_table() or ctx.get_last_result_table()
except Exception:
current_table = None
source_cmd = str(getattr(current_table, "source_command", "") or "").replace("_", "-").strip().lower()
is_get_tag_table = source_cmd == "get-tag"
if is_get_tag_table and next_cmd in {"add-tag"}:
subject = ctx.get_last_result_subject()
if subject is not None:
piped_result = subject
try:
subject_items = subject if isinstance(subject, list) else [subject]
ctx.set_last_items(subject_items)
except Exception:
logger.exception("Failed to set last_items from get-tag subject during @ handling")
if pipeline_session and worker_manager:
try:
worker_manager.log_step(
pipeline_session.worker_id,
"@ used get-tag table subject for add-tag"
)
except Exception:
logger.exception("Failed to record pipeline log step for '@ used get-tag table subject for add-tag' (pipeline_session=%r)", getattr(pipeline_session, 'worker_id', None))
continue
except Exception:
logger.exception("Failed to evaluate get-tag @ subject special-case")
# Prefer piping the last emitted/visible items (e.g. add-file results)
# over the result-table subject. The subject can refer to older context
# (e.g. a playlist row) and may not contain store+hash.
@@ -2870,7 +2907,16 @@ class PipelineExecutor:
progress_ui.begin_pipe(pipe_idx, total_items=1)
# RUN THE CMDLET
cmd_fn(piped_result, stage_args, config)
ret_code = cmd_fn(piped_result, stage_args, config)
if ret_code is not None:
try:
normalized_ret = int(ret_code)
except Exception:
normalized_ret = 0
if normalized_ret != 0:
pipeline_status = "failed"
pipeline_error = f"Stage '{cmd_name}' failed with exit code {normalized_ret}"
return
# Pipeline overlay tables (e.g., get-url detail views) need to be
# rendered when running inside a pipeline because the CLI path

View File

@@ -70,6 +70,15 @@ def get_result_table_row_style(row_index: int) -> str:
return f"{text_color} on {bg_color}"
def apply_result_table_layout(table: Any) -> None:
    """Configure *table* for a compact, flush column layout.

    Applies one horizontal cell pad, then disables edge padding and
    enables padding collapse when the table object exposes those
    attributes (they vary across Rich versions).
    """
    # Padding is applied unconditionally; the optional attributes are
    # set only when present so non-Rich or older table objects survive.
    table.padding = (0, 1)
    for attr_name, attr_value in (("pad_edge", False), ("collapse_padding", True)):
        if hasattr(table, attr_name):
            setattr(table, attr_name, attr_value)
def _sanitize_cell_text(value: Any) -> str:
"""Coerce to a single-line, tab-free string suitable for terminal display."""
if value is None:
@@ -1374,6 +1383,9 @@ class Table:
empty,
title=Text(str(self.title), style=RESULT_TABLE_HEADER_STYLE),
border_style=RESULT_TABLE_BORDER_STYLE,
padding=(0, 0),
expand=False,
style="on #ffffff",
)
if self.title
else empty
@@ -1391,10 +1403,14 @@ class Table:
show_header=True,
header_style=RESULT_TABLE_HEADER_STYLE,
border_style=RESULT_TABLE_BORDER_STYLE,
box=SIMPLE,
expand=True,
box=None,
expand=False,
show_lines=False,
padding=(0, 1),
pad_edge=False,
collapse_padding=True,
)
apply_result_table_layout(table)
if not self.interactive:
table.add_column("#", justify="right", no_wrap=True)
@@ -1410,6 +1426,8 @@ class Table:
header = header_by_key.get(name, str(name).upper())
if name.lower() == "ext":
table.add_column(header, no_wrap=True)
elif name.lower() == "tag":
table.add_column(header, overflow="fold")
else:
table.add_column(header)
@@ -1430,6 +1448,9 @@ class Table:
renderable,
title=Text(str(self.title), style=RESULT_TABLE_HEADER_STYLE),
border_style=RESULT_TABLE_BORDER_STYLE,
padding=(0, 0),
expand=False,
style="on #ffffff",
)
if self.title
else renderable

View File

@@ -15,6 +15,7 @@ logger = logging.getLogger(__name__)
from SYS.result_table import (
RESULT_TABLE_BORDER_STYLE,
RESULT_TABLE_HEADER_STYLE,
apply_result_table_layout,
get_result_table_row_style,
)
from SYS.result_table_api import ColumnSpec, ResultModel, ResultTable, Renderer
@@ -39,9 +40,18 @@ class RichRenderer(Renderer):
show_header=True,
header_style=RESULT_TABLE_HEADER_STYLE,
border_style=RESULT_TABLE_BORDER_STYLE,
box=None,
padding=(0, 1),
pad_edge=False,
collapse_padding=True,
expand=False,
)
apply_result_table_layout(table)
cols = list(columns)
for col in cols:
if str(col.header or "").strip().lower() == "tag":
table.add_column(col.header, overflow="fold")
else:
table.add_column(col.header)
for row_idx, r in enumerate(rows):

View File

@@ -2082,8 +2082,9 @@ class Download_File(Cmdlet):
except Exception:
height_selector = None
if query_wants_audio:
# Explicit audio request should map to the configured audio selector (usually '251/140/bestaudio')
ytdl_format = ytdlp_tool.default_format("audio")
# Explicit `format:audio` must always force bestaudio fallback chain
# and avoid format-list/selector ambiguity.
ytdl_format = "bestaudio/best"
elif height_selector:
ytdl_format = height_selector
elif query_format:

View File

@@ -144,6 +144,105 @@ class search_file(Cmdlet):
raw = str(value or "").strip().lower()
return "".join(ch for ch in raw if ch.isalnum())
@staticmethod
def _extract_namespace_tags(payload: Dict[str, Any]) -> List[str]:
"""Return deduplicated namespace tags from payload, excluding title:* tags."""
candidates: List[str] = []
def _add_candidate(value: Any) -> None:
if isinstance(value, str):
text = value.strip()
if text:
parts = re.split(r"[,;\n\r]+", text)
for part in parts:
token = part.strip().strip("[](){}\"'#")
if token:
candidates.append(token)
elif isinstance(value, dict):
for nested in value.values():
_add_candidate(nested)
elif isinstance(value, (list, tuple, set)):
for item in value:
_add_candidate(item)
_add_candidate(payload.get("tag"))
_add_candidate(payload.get("tags"))
_add_candidate(payload.get("tag_summary"))
metadata = payload.get("metadata")
if isinstance(metadata, dict):
_add_candidate(metadata.get("tag"))
_add_candidate(metadata.get("tags"))
meta_tags = metadata.get("tags")
if isinstance(meta_tags, dict):
for service_data in meta_tags.values():
if not isinstance(service_data, dict):
continue
display_tags = service_data.get("display_tags")
if isinstance(display_tags, dict):
for ns_name, tag_list in display_tags.items():
if isinstance(tag_list, list):
ns_text = str(ns_name or "").strip()
for tag_item in tag_list:
item_text = str(tag_item or "").strip()
if not item_text:
continue
if ":" in item_text:
candidates.append(item_text)
continue
if ns_text:
candidates.append(f"{ns_text}:{item_text}")
else:
candidates.append(item_text)
else:
_add_candidate(tag_list)
namespace_tags: List[str] = []
seen: set[str] = set()
for raw in candidates:
candidate = str(raw or "").strip()
if not candidate or ":" not in candidate:
continue
ns, value = candidate.split(":", 1)
ns_norm = ns.strip().lower()
value_norm = value.strip()
if not value_norm:
continue
if ns_norm == "title":
continue
normalized = f"{ns_norm}:{value_norm}"
key = normalized.lower()
if key in seen:
continue
seen.add(key)
namespace_tags.append(normalized)
return namespace_tags
def _set_storage_display_columns(self, payload: Dict[str, Any]) -> None:
"""Set explicit display columns for store search results."""
title_text = str(payload.get("title") or payload.get("name") or payload.get("filename") or "Result")
namespace_tags = self._extract_namespace_tags(payload)
tag_text = ", ".join(namespace_tags)
store_text = str(payload.get("store") or payload.get("table") or payload.get("source") or "")
size_raw = payload.get("size_bytes")
if size_raw is None:
size_raw = payload.get("size")
ext_text = str(payload.get("ext") or "")
payload["columns"] = [
("Title", title_text),
("Tag", tag_text),
("Store", store_text),
("Size", size_raw),
("Ext", ext_text),
]
def _ensure_storage_columns(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Ensure storage results have the necessary fields for result_table display."""
@@ -166,8 +265,8 @@ class search_file(Cmdlet):
# Ensure size_bytes is present for display (already set by search_file())
# result_table will handle formatting it
# Don't create manual columns - let result_table handle display
# This allows the table to respect max_columns and apply consistent formatting
# Store search uses explicit columns so TAG can appear right after TITLE.
self._set_storage_display_columns(payload)
return payload
def _run_provider_search(
@@ -706,19 +805,35 @@ class search_file(Cmdlet):
# First try to extract from metadata tags dict
metadata_tags = meta_obj.get("tags")
if isinstance(metadata_tags, dict):
collected_tags: List[str] = []
for service_data in metadata_tags.values():
if isinstance(service_data, dict):
display_tags = service_data.get("display_tags", {})
if isinstance(display_tags, dict):
for tag_list in display_tags.values():
if isinstance(tag_list, list):
tags_list = [
str(t).strip() for t in tag_list
if isinstance(t, str) and str(t).strip()
]
break
if tags_list:
break
for ns_name, tag_list in display_tags.items():
if not isinstance(tag_list, list):
continue
ns_text = str(ns_name or "").strip()
for tag_item in tag_list:
tag_text = str(tag_item or "").strip()
if not tag_text:
continue
if ":" in tag_text:
collected_tags.append(tag_text)
elif ns_text:
collected_tags.append(f"{ns_text}:{tag_text}")
else:
collected_tags.append(tag_text)
if collected_tags:
dedup: List[str] = []
seen_tags: set[str] = set()
for tag_text in collected_tags:
key = tag_text.lower()
if key in seen_tags:
continue
seen_tags.add(key)
dedup.append(tag_text)
tags_list = dedup
# Fallback: if metadata didn't include tags, call get_tag() separately
# (This maintains compatibility with backends that don't include tags in metadata)
@@ -788,6 +903,8 @@ class search_file(Cmdlet):
"url": meta_obj.get("url") or [],
}
self._set_storage_display_columns(payload)
table.add_result(payload)
results_list.append(payload)
ctx.emit(payload)

View File

@@ -1861,12 +1861,10 @@ def download_media(opts: DownloadOptions, *, config: Optional[Dict[str, Any]] =
probe_result = probe_url(opts.url, no_playlist=opts.no_playlist, timeout_seconds=15, cookiefile=probe_cookiefile)
if probe_result is None:
msg = "yt-dlp could not detect media for this URL; use download-file for direct downloads"
if not opts.quiet:
log(msg)
debug("yt-dlp probe returned no metadata; continuing with direct download attempt")
if debug_logger is not None:
debug_logger.write_record("ytdlp-skip-no-media", {"url": opts.url})
raise DownloadError(msg)
debug_logger.write_record("ytdlp-probe-miss-continue", {"url": opts.url})
ensure_yt_dlp_ready()