Files
Medios-Macina/cmdlet/search_file.py
2026-03-06 00:57:50 -08:00

2291 lines
90 KiB
Python

"""search-file cmdlet: Search for files in storage backends (Hydrus)."""
from __future__ import annotations
from typing import Any, Dict, Sequence, List, Optional
from collections import deque
import uuid
from pathlib import Path
import re
import json
import sys
import html
import time
from urllib.parse import urlparse, parse_qs, unquote, urljoin
from SYS.logger import log, debug
from ProviderCore.registry import get_search_provider, list_search_providers
from SYS.rich_display import (
show_provider_config_panel,
show_store_config_panel,
show_available_providers_panel,
)
from SYS.database import insert_worker, update_worker, append_worker_stdout
from ._shared import (
Cmdlet,
CmdletArg,
SharedArgs,
get_field,
should_show_help,
normalize_hash,
first_title_tag,
parse_hash_query,
)
from SYS import pipeline as ctx
class _WorkerLogger:
    """Best-effort facade over the worker DB helpers.

    Every operation swallows exceptions so that logging problems can never
    break a search.  Note: the method names shadow the imported module-level
    helpers (``insert_worker``, ``append_worker_stdout``); inside the method
    bodies the bare names still resolve to the module-level functions, so the
    calls are not recursive.
    """

    def __init__(self, worker_id: str) -> None:
        self.worker_id = worker_id

    def __enter__(self) -> "_WorkerLogger":
        # Nothing to acquire; context-manager form is purely for ergonomics.
        return self

    def __exit__(self, exc_type, exc, tb) -> None:  # type: ignore[override]
        # Returning None never suppresses exceptions from the managed block.
        return None

    def insert_worker(
        self,
        worker_id: str,
        worker_type: str,
        title: str = "",
        description: str = "",
        **kwargs: Any,
    ) -> None:
        """Record a new worker row; failures are silently ignored."""
        try:
            insert_worker(worker_id, worker_type, title=title, description=description)
        except Exception:
            pass

    def update_worker_status(self, worker_id: str, status: str) -> None:
        """Update worker status; terminal statuses also set ``result``."""
        try:
            fields: dict[str, str] = {"status": status}
            lowered = (status or "").lower()
            if lowered in {"completed", "error", "cancelled"}:
                fields["result"] = lowered
            update_worker(worker_id, **fields)
        except Exception:
            pass

    def append_worker_stdout(self, worker_id: str, content: str) -> None:
        """Append captured stdout to the worker row; failures are ignored."""
        try:
            append_worker_stdout(worker_id, content)
        except Exception:
            pass
class search_file(Cmdlet):
"""Class-based search-file cmdlet for searching storage backends."""
    def __init__(self) -> None:
        """Declare the ``search-file`` cmdlet (args, usage, help) and register it."""
        super().__init__(
            name="search-file",
            summary="Search storage backends (Hydrus) or external providers (via -provider).",
            usage="search-file [-query <query>] [-store BACKEND] [-limit N] [-provider NAME]",
            arg=[
                CmdletArg(
                    "limit",
                    type="integer",
                    description="Limit results (default: 100)"
                ),
                # Shared arguments reused across cmdlets for consistent parsing.
                SharedArgs.STORE,
                SharedArgs.QUERY,
                CmdletArg(
                    "provider",
                    type="string",
                    description="External provider name (e.g., tidal, youtube, soulseek, etc)",
                ),
                CmdletArg(
                    "open",
                    type="integer",
                    description="(alldebrid) Open folder/magnet by ID and list its files",
                ),
            ],
            # Free-form help lines shown by the cmdlet help system.
            detail=[
                "Search across storage backends: Hydrus instances",
                "Use -store to search a specific backend by name",
                "URL search: url:* (any URL) or url:<value> (URL substring)",
                "Extension search: ext:<value> (e.g., ext:png)",
                "Hydrus-style extension: system:filetype = png",
                "Results include hash for downstream commands (get-file, add-tag, etc.)",
                "Examples:",
                "search-file -query foo # Search all storage backends",
                "search-file -store home -query '*' # Search 'home' Hydrus instance",
                "search-file -store home -query 'video' # Search 'home' Hydrus instance",
                "search-file -query 'hash:deadbeef...' # Search by SHA256 hash",
                "search-file -query 'url:*' # Files that have any URL",
                "search-file -query 'url:youtube.com' # Files whose URL contains substring",
                "search-file -query 'ext:png' # Files whose metadata ext is png",
                "search-file -query 'system:filetype = png' # Hydrus: native",
                "search-file 'example.com/path' -query 'ext:pdf' # Web: site:example.com filetype:pdf",
                "search-file -query 'site:example.com filetype:epub history' # Web: site-scoped search",
                "",
                "Provider search (-provider):",
                "search-file -provider youtube 'tutorial' # Search YouTube provider",
                "search-file -provider alldebrid '*' # List AllDebrid magnets",
                "search-file -provider alldebrid -open 123 '*' # Show files for a magnet",
            ],
            exec=self.run,
        )
        # Make the cmdlet discoverable by the dispatcher immediately on construction.
        self.register()
# --- Helper methods -------------------------------------------------
@staticmethod
def _normalize_host(value: Any) -> str:
"""Normalize host names for matching/filtering."""
host = str(value or "").strip().lower()
if host.startswith("www."):
host = host[4:]
if ":" in host:
host = host.split(":", 1)[0]
return host
@classmethod
def _extract_site_host(cls, candidate: Any) -> Optional[str]:
"""Extract a host/domain from URL-like input."""
raw = str(candidate or "").strip().strip('"').strip("'")
if not raw:
return None
if raw.lower().startswith("site:"):
raw = raw.split(":", 1)[1].strip()
parsed = None
try:
parsed = urlparse(raw)
except Exception:
parsed = None
if parsed is None or not getattr(parsed, "hostname", None):
try:
parsed = urlparse(f"https://{raw}")
except Exception:
parsed = None
host = ""
try:
host = str(getattr(parsed, "hostname", "") or "").strip().lower()
except Exception:
host = ""
host = cls._normalize_host(host)
if not host or "." not in host:
return None
return host
@staticmethod
def _normalize_space(text: Any) -> str:
return re.sub(r"\s+", " ", str(text or "")).strip()
    @classmethod
    def _build_web_search_plan(
        cls,
        *,
        query: str,
        positional_args: List[str],
        storage_backend: Optional[str],
        store_filter: Optional[str],
        hash_query: List[str],
    ) -> Optional[Dict[str, Any]]:
        """Build web-search plan for URL + ext/filetype query syntax.

        Example input:
            search-file "example.com/foo" -query "ext:pdf"
        Produces:
            site:example.com filetype:pdf

        Returns None whenever the query should be handled as a regular
        storage-backend search instead of an external web search.
        """
        # Web mode never applies when the user explicitly targeted a store/hash.
        if storage_backend or store_filter or hash_query:
            return None
        text = cls._normalize_space(query)
        if not text:
            return None
        # Avoid hijacking explicit local search DSL (url:, tag:, hash:, etc.).
        local_markers = ("url:", "hash:", "tag:", "store:", "system:")
        if any(marker in text.lower() for marker in local_markers):
            return None
        site_host: Optional[str] = None
        site_from_positional = False
        site_token_to_strip = ""
        seed_url = ""
        # Priority 1: explicit "site:<value>" token inside the query text.
        site_match = re.search(r"(?:^|\s)site:([^\s,]+)", text, flags=re.IGNORECASE)
        if site_match:
            site_host = cls._extract_site_host(site_match.group(1))
            seed_url = str(site_match.group(1) or "").strip()
        # Priority 2: first positional argument that looks like a URL/domain.
        if not site_host and positional_args:
            site_host = cls._extract_site_host(positional_args[0])
            site_from_positional = bool(site_host)
            if site_from_positional:
                site_token_to_strip = str(positional_args[0] or "").strip()
                seed_url = site_token_to_strip
        # Priority 3: scan remaining tokens for anything domain-like.
        if not site_host:
            for token in text.split():
                candidate = str(token or "").strip().strip(",")
                if not candidate:
                    continue
                lower_candidate = candidate.lower()
                if lower_candidate.startswith(("ext:", "filetype:", "type:", "site:")):
                    continue
                # Skip other namespaced tokens ("foo:bar") that are not URLs.
                if re.match(r"^[a-z]+:", lower_candidate) and not lower_candidate.startswith(
                    ("http://", "https://")
                ):
                    continue
                guessed = cls._extract_site_host(candidate)
                if guessed:
                    site_host = guessed
                    site_token_to_strip = candidate
                    break
        if not site_host:
            return None
        # Optional extension filter: ext:/filetype:/type:<token>, dot optional.
        filetype_match = re.search(
            r"(?:^|\s)(?:ext|filetype|type):\.?([a-z0-9]{1,12})\b",
            text,
            flags=re.IGNORECASE,
        )
        filetype = cls._normalize_extension(filetype_match.group(1)) if filetype_match else ""
        # Feature gate: trigger this web-search mode when filetype is present
        # or user explicitly provided site: syntax.
        has_explicit_site = bool(site_match)
        if not filetype and not has_explicit_site:
            return None
        # Strip the recognized operators (and the site token itself) so the
        # residual free-text terms can be forwarded to the search engine.
        residual = text
        residual = re.sub(r"(?:^|\s)site:[^\s,]+", " ", residual, flags=re.IGNORECASE)
        residual = re.sub(
            r"(?:^|\s)(?:ext|filetype|type):\.?[a-z0-9]{1,12}\b",
            " ",
            residual,
            flags=re.IGNORECASE,
        )
        if site_from_positional and positional_args:
            first = str(positional_args[0] or "").strip()
            if first:
                residual = re.sub(rf"(?:^|\s){re.escape(first)}(?:\s|$)", " ", residual, count=1)
        elif site_token_to_strip:
            residual = re.sub(
                rf"(?:^|\s){re.escape(site_token_to_strip)}(?:\s|$)",
                " ",
                residual,
                count=1,
            )
        residual = cls._normalize_space(residual)
        # Assemble the final engine query: site filter, optional filetype,
        # then any remaining free-text terms.
        search_terms: List[str] = [f"site:{site_host}"]
        if filetype:
            search_terms.append(f"filetype:{filetype}")
        if residual:
            search_terms.append(residual)
        search_query = " ".join(search_terms).strip()
        if not search_query:
            return None
        normalized_seed_url = cls._normalize_seed_url(seed_url, site_host)
        return {
            "site_host": site_host,
            "filetype": filetype,
            "search_query": search_query,
            "residual": residual,
            "seed_url": normalized_seed_url,
        }
@classmethod
def _normalize_seed_url(cls, seed_value: Any, site_host: str) -> str:
"""Build a safe crawl starting URL from user input and resolved host."""
raw = str(seed_value or "").strip().strip("'\"")
if not raw:
raw = str(site_host or "").strip()
if raw and not raw.startswith(("http://", "https://")):
raw = f"https://{raw}"
try:
parsed = urlparse(raw)
except Exception:
parsed = urlparse("")
target = cls._normalize_host(site_host)
host = cls._normalize_host(getattr(parsed, "hostname", "") or "")
if target and host and not (host == target or host.endswith(f".{target}")):
return f"https://{target}/"
scheme = str(getattr(parsed, "scheme", "") or "https").lower()
if scheme not in {"http", "https"}:
scheme = "https"
netloc = str(getattr(parsed, "netloc", "") or "").strip()
if not netloc:
netloc = target
path = str(getattr(parsed, "path", "") or "").strip()
if not path:
path = "/"
return f"{scheme}://{netloc}{path}"
@staticmethod
def _is_probable_html_path(path_value: str) -> bool:
"""Return True when URL path likely points to an HTML page."""
path = str(path_value or "").strip()
if not path:
return True
suffix = Path(path).suffix.lower()
if not suffix:
return True
return suffix in {".html", ".htm", ".php", ".asp", ".aspx", ".jsp", ".shtml", ".xhtml"}
@classmethod
def _extract_html_links(cls, *, html_text: str, base_url: str) -> List[str]:
"""Extract absolute links from an HTML document."""
links: List[str] = []
seen: set[str] = set()
def _add_link(raw_href: Any) -> None:
href = str(raw_href or "").strip()
if not href or href.startswith(("#", "javascript:", "mailto:")):
return
try:
absolute = urljoin(base_url, href)
parsed = urlparse(absolute)
except Exception:
return
if str(getattr(parsed, "scheme", "") or "").lower() not in {"http", "https"}:
return
clean = parsed._replace(fragment="").geturl()
if clean in seen:
return
seen.add(clean)
links.append(clean)
try:
from lxml import html as lxml_html
doc = lxml_html.fromstring(html_text or "")
for node in doc.xpath("//a[@href]"):
_add_link(node.get("href"))
except Exception:
href_pattern = re.compile(r'<a[^>]+href=["\']([^"\']+)["\']', flags=re.IGNORECASE)
for match in href_pattern.finditer(html_text or ""):
_add_link(match.group(1))
return links
    @classmethod
    def _crawl_site_for_extension(
        cls,
        *,
        seed_url: str,
        site_host: str,
        extension: str,
        limit: int,
        max_duration_seconds: float = 15.0,
    ) -> List[Dict[str, str]]:
        """Fallback crawler that discovers in-site file links by extension.

        Performs a bounded breadth-first crawl starting at ``seed_url``,
        following only same-site HTML-looking pages, and collects links
        whose path suffix matches ``extension``.  Bounded by page count,
        result count and a wall-clock deadline so a search can never hang.
        """
        from API.requests_client import get_requests_session
        normalized_ext = cls._normalize_extension(extension)
        if not normalized_ext:
            return []
        start_url = cls._normalize_seed_url(seed_url, site_host)
        if not start_url:
            return []
        session = get_requests_session()
        # Browser-like headers reduce the chance of trivial bot blocking.
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            "Accept-Language": "en-US,en;q=0.9",
        }
        # BFS frontier plus bookkeeping sets for page/file deduplication.
        queue: deque[str] = deque([start_url])
        queued: set[str] = {start_url}
        visited_pages: set[str] = set()
        seen_files: set[str] = set()
        rows: List[Dict[str, str]] = []
        # Clamp: 1..100 results, 8..64 pages, and at least a 5s time budget.
        normalized_limit = max(1, min(int(limit or 1), 100))
        max_pages = max(8, min(normalized_limit * 4, 64))
        crawl_deadline = time.monotonic() + max(5.0, float(max_duration_seconds or 0.0))
        while (
            queue
            and len(visited_pages) < max_pages
            and len(rows) < normalized_limit
            and time.monotonic() < crawl_deadline
        ):
            page_url = queue.popleft()
            queued.discard(page_url)
            if page_url in visited_pages:
                continue
            visited_pages.add(page_url)
            if time.monotonic() >= crawl_deadline:
                break
            try:
                response = session.get(page_url, timeout=(4, 8), headers=headers)
                response.raise_for_status()
            except Exception:
                # Unreachable page: skip silently, the crawl is best-effort.
                continue
            # Redirects may land off-site; always re-check the final URL.
            final_url = str(getattr(response, "url", "") or page_url)
            try:
                parsed_final = urlparse(final_url)
            except Exception:
                continue
            final_host = cls._normalize_host(getattr(parsed_final, "hostname", "") or "")
            if not cls._url_matches_site(final_url, site_host):
                continue
            final_path = str(getattr(parsed_final, "path", "") or "")
            direct_ext = cls._normalize_extension(Path(final_path).suffix)
            if direct_ext == normalized_ext:
                # The fetched URL itself is a matching file; record it once.
                file_url = parsed_final._replace(fragment="").geturl()
                if file_url not in seen_files:
                    seen_files.add(file_url)
                    title = Path(unquote(final_path)).name or file_url
                    rows.append(
                        {
                            "url": file_url,
                            "title": title,
                            "snippet": "Discovered via in-site crawl",
                        }
                    )
                continue
            # Only parse pages that declare an HTML-ish content type.
            content_type = str((response.headers or {}).get("content-type", "") or "").lower()
            if "html" not in content_type and "xhtml" not in content_type:
                continue
            html_text = str(getattr(response, "text", "") or "")
            if not html_text:
                continue
            if len(html_text) > 2_500_000:
                # Avoid parsing extremely large pages during fallback crawl mode.
                continue
            discovered_links = cls._extract_html_links(html_text=html_text, base_url=final_url)
            # Cap per-page link processing at 300 to bound work per page.
            for idx, target in enumerate(discovered_links):
                if len(rows) >= normalized_limit:
                    break
                if idx >= 300:
                    break
                if time.monotonic() >= crawl_deadline:
                    break
                try:
                    parsed_target = urlparse(target)
                except Exception:
                    continue
                target_host = cls._normalize_host(getattr(parsed_target, "hostname", "") or "")
                # Fast same-site check first; fall back to the canonical
                # _url_matches_site test before discarding the link.
                if not target_host or not (target_host == final_host or target_host.endswith(f".{site_host}")):
                    if not cls._url_matches_site(target, site_host):
                        continue
                target_clean = parsed_target._replace(fragment="").geturl()
                target_path = str(getattr(parsed_target, "path", "") or "")
                target_ext = cls._normalize_extension(Path(target_path).suffix)
                if target_ext == normalized_ext:
                    # Matching file link: record it once.
                    if target_clean in seen_files:
                        continue
                    seen_files.add(target_clean)
                    title = Path(unquote(target_path)).name or target_clean
                    rows.append(
                        {
                            "url": target_clean,
                            "title": title,
                            "snippet": f"Discovered via crawl from {final_path or '/'}",
                        }
                    )
                    continue
                # Page-like links are enqueued for further crawling.
                if cls._is_probable_html_path(target_path):
                    if target_clean not in visited_pages and target_clean not in queued:
                        queue.append(target_clean)
                        queued.add(target_clean)
        if time.monotonic() >= crawl_deadline:
            debug(
                "Web crawl fallback reached time budget",
                {
                    "site": site_host,
                    "visited_pages": len(visited_pages),
                    "queued_pages": len(queue),
                    "results": len(rows),
                    "time_budget_seconds": max_duration_seconds,
                },
            )
        return rows[:normalized_limit]
@staticmethod
def _extract_duckduckgo_target_url(href: Any) -> str:
"""Extract direct target URL from DuckDuckGo result links."""
raw_href = str(href or "").strip()
if not raw_href:
return ""
if raw_href.startswith("//"):
raw_href = f"https:{raw_href}"
if raw_href.startswith("/"):
raw_href = f"https://duckduckgo.com{raw_href}"
parsed = None
try:
parsed = urlparse(raw_href)
except Exception:
parsed = None
try:
host = str(getattr(parsed, "hostname", "") or "").strip().lower()
except Exception:
host = ""
if host.endswith("duckduckgo.com"):
try:
query = parse_qs(str(getattr(parsed, "query", "") or ""))
candidate = (query.get("uddg") or [""])[0]
if candidate:
return str(unquote(candidate)).strip()
except Exception:
pass
return raw_href
@staticmethod
def _extract_yahoo_target_url(href: Any) -> str:
"""Extract direct target URL from Yahoo redirect links."""
raw_href = str(href or "").strip()
if not raw_href:
return ""
# Yahoo result links often look like:
# https://r.search.yahoo.com/.../RU=<url-encoded-target>/RK=...
ru_match = re.search(r"/RU=([^/]+)/RK=", raw_href, flags=re.IGNORECASE)
if ru_match:
try:
return str(unquote(ru_match.group(1))).strip()
except Exception:
pass
# Fallback for query-string variants.
try:
parsed = urlparse(raw_href)
query = parse_qs(str(getattr(parsed, "query", "") or ""))
candidate = (query.get("RU") or query.get("ru") or [""])[0]
if candidate:
return str(unquote(candidate)).strip()
except Exception:
pass
return raw_href
@classmethod
def _url_matches_site(cls, url: str, site_host: str) -> bool:
"""Return True when URL host is the requested site/subdomain."""
try:
parsed = urlparse(str(url or ""))
host = cls._normalize_host(getattr(parsed, "hostname", "") or "")
except Exception:
return False
target = cls._normalize_host(site_host)
if not host or not target:
return False
return host == target or host.endswith(f".{target}")
    @classmethod
    def _parse_duckduckgo_results(
        cls,
        *,
        html_text: str,
        site_host: str,
        limit: int,
    ) -> List[Dict[str, str]]:
        """Parse DuckDuckGo HTML results into normalized rows.

        Each row is ``{"url", "title", "snippet"}``; only http(s) links on
        ``site_host`` are kept, deduplicated by URL.  Tries lxml first; the
        regex fallback also runs when the lxml pass yields zero rows.
        """
        items: List[Dict[str, str]] = []
        seen_urls: set[str] = set()
        def _add_item(url_text: str, title_text: str, snippet_text: str) -> None:
            # Validate, scope-check and dedupe before appending a row.
            url_clean = str(url_text or "").strip()
            if not url_clean:
                return
            if not url_clean.startswith(("http://", "https://")):
                return
            if not cls._url_matches_site(url_clean, site_host):
                return
            if url_clean in seen_urls:
                return
            seen_urls.add(url_clean)
            title_clean = cls._normalize_space(title_text)
            snippet_clean = cls._normalize_space(snippet_text)
            items.append(
                {
                    "url": url_clean,
                    "title": title_clean or url_clean,
                    "snippet": snippet_clean,
                }
            )
        # Preferred parser path (lxml is already a project dependency).
        try:
            from lxml import html as lxml_html
            doc = lxml_html.fromstring(html_text or "")
            result_nodes = doc.xpath("//div[contains(@class, 'result')]")
            for node in result_nodes:
                links = node.xpath(".//a[contains(@class, 'result__a')]")
                if not links:
                    continue
                link = links[0]
                # DDG wraps targets in redirect links; unwrap them first.
                href = cls._extract_duckduckgo_target_url(link.get("href"))
                title = " ".join([str(t).strip() for t in link.itertext() if str(t).strip()])
                snippet_nodes = node.xpath(".//*[contains(@class, 'result__snippet')]")
                snippet = ""
                if snippet_nodes:
                    snippet = " ".join(
                        [str(t).strip() for t in snippet_nodes[0].itertext() if str(t).strip()]
                    )
                _add_item(href, title, snippet)
                if len(items) >= limit:
                    break
        except Exception:
            # Fallback to regex parser below.
            pass
        if items:
            return items[:limit]
        # Regex fallback for environments where HTML parsing fails.
        anchor_pattern = re.compile(
            r'<a[^>]+class="[^"]*result__a[^"]*"[^>]+href="([^"]+)"[^>]*>(.*?)</a>',
            flags=re.IGNORECASE | re.DOTALL,
        )
        for match in anchor_pattern.finditer(html_text or ""):
            href = cls._extract_duckduckgo_target_url(match.group(1))
            title_html = match.group(2)
            title = re.sub(r"<[^>]+>", " ", str(title_html or ""))
            title = html.unescape(title)
            _add_item(href, title, "")
            if len(items) >= limit:
                break
        return items[:limit]
@classmethod
def _parse_yahoo_results(
cls,
*,
html_text: str,
site_host: str,
limit: int,
) -> List[Dict[str, str]]:
"""Parse Yahoo HTML search results into normalized rows."""
items: List[Dict[str, str]] = []
seen_urls: set[str] = set()
def _add_item(url_text: str, title_text: str, snippet_text: str) -> None:
url_clean = str(url_text or "").strip()
if not url_clean or not url_clean.startswith(("http://", "https://")):
return
if not cls._url_matches_site(url_clean, site_host):
return
if url_clean in seen_urls:
return
seen_urls.add(url_clean)
items.append(
{
"url": url_clean,
"title": cls._normalize_space(title_text) or url_clean,
"snippet": cls._normalize_space(snippet_text),
}
)
try:
from lxml import html as lxml_html
doc = lxml_html.fromstring(html_text or "")
for node in doc.xpath("//a[@href]"):
href = cls._extract_yahoo_target_url(node.get("href"))
title = " ".join([str(t).strip() for t in node.itertext() if str(t).strip()])
_add_item(href, title, "")
if len(items) >= limit:
break
except Exception:
anchor_pattern = re.compile(
r'<a[^>]+href=["\']([^"\']+)["\'][^>]*>(.*?)</a>',
flags=re.IGNORECASE | re.DOTALL,
)
for match in anchor_pattern.finditer(html_text or ""):
href = cls._extract_yahoo_target_url(match.group(1))
title_html = match.group(2)
title = re.sub(r"<[^>]+>", " ", str(title_html or ""))
title = html.unescape(title)
_add_item(href, title, "")
if len(items) >= limit:
break
return items[:limit]
    @classmethod
    def _query_yahoo(
        cls,
        *,
        search_query: str,
        site_host: str,
        limit: int,
        session: Any,
        deadline: Optional[float] = None,
    ) -> List[Dict[str, str]]:
        """Fetch results from Yahoo search (robust fallback in bot-protected envs).

        Pages through up to 3 result pages of 10 hits, deduplicating by URL
        and stopping early at ``limit``, on an empty page, on any request
        failure, or when ``deadline`` (a ``time.monotonic`` timestamp) passes.
        """
        all_rows: List[Dict[str, str]] = []
        seen_urls: set[str] = set()
        # One page per 10 requested results, capped at 3 pages.
        max_pages = max(1, min((max(1, int(limit or 1)) + 9) // 10, 3))
        for page_idx in range(max_pages):
            if deadline is not None and time.monotonic() >= deadline:
                break
            # Yahoo paging: p=query, n=page size, b=1-based result offset.
            params = {
                "p": search_query,
                "n": "10",
                "b": str((page_idx * 10) + 1),
            }
            try:
                # Shrink the read timeout to whatever time budget remains.
                read_timeout = 10.0
                if deadline is not None:
                    remaining = max(0.0, float(deadline - time.monotonic()))
                    if remaining <= 0.0:
                        break
                    read_timeout = max(3.0, min(10.0, remaining))
                response = session.get(
                    "https://search.yahoo.com/search",
                    params=params,
                    timeout=(3, read_timeout),
                    headers={
                        "User-Agent": (
                            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                            "AppleWebKit/537.36 (KHTML, like Gecko) "
                            "Chrome/124.0.0.0 Safari/537.36"
                        ),
                        "Accept-Language": "en-US,en;q=0.9",
                    },
                )
                response.raise_for_status()
            except Exception:
                break
            page_rows = cls._parse_yahoo_results(
                html_text=response.text,
                site_host=site_host,
                limit=max(1, limit - len(all_rows)),
            )
            new_rows = 0
            for row in page_rows:
                url_value = str(row.get("url") or "").strip()
                if not url_value or url_value in seen_urls:
                    continue
                seen_urls.add(url_value)
                all_rows.append(row)
                new_rows += 1
                if len(all_rows) >= limit:
                    break
            # A page that adds nothing new means no more results are coming.
            if len(all_rows) >= limit or new_rows == 0:
                break
        return all_rows[:limit]
    @classmethod
    def _parse_bing_results(
        cls,
        *,
        html_text: str,
        site_host: str,
        limit: int,
    ) -> List[Dict[str, str]]:
        """Parse Bing HTML search results into normalized rows.

        Rows are ``{"url", "title", "snippet"}``; only http(s) links on
        ``site_host`` are kept, deduplicated by URL.  Uses lxml when
        available; the ``<h2><a>`` regex fallback runs only when the lxml
        pass raises (unlike the DuckDuckGo parser's zero-row fallback).
        """
        items: List[Dict[str, str]] = []
        seen_urls: set[str] = set()
        def _add_item(url_text: str, title_text: str, snippet_text: str) -> None:
            # Validate, scope-check and dedupe before appending a row.
            url_clean = str(url_text or "").strip()
            if not url_clean or not url_clean.startswith(("http://", "https://")):
                return
            if not cls._url_matches_site(url_clean, site_host):
                return
            if url_clean in seen_urls:
                return
            seen_urls.add(url_clean)
            items.append(
                {
                    "url": url_clean,
                    "title": cls._normalize_space(title_text) or url_clean,
                    "snippet": cls._normalize_space(snippet_text),
                }
            )
        try:
            from lxml import html as lxml_html
            doc = lxml_html.fromstring(html_text or "")
            # Organic results are <li class="b_algo"> with an <h2><a> link.
            result_nodes = doc.xpath("//li[contains(@class, 'b_algo')]")
            for node in result_nodes:
                links = node.xpath(".//h2/a")
                if not links:
                    continue
                link = links[0]
                href = str(link.get("href") or "").strip()
                title = " ".join([str(t).strip() for t in link.itertext() if str(t).strip()])
                snippet = ""
                # Try progressively looser selectors for the snippet text.
                for sel in (
                    ".//*[contains(@class,'b_caption')]//p",
                    ".//*[contains(@class,'b_snippet')]",
                    ".//p",
                ):
                    snip_nodes = node.xpath(sel)
                    if snip_nodes:
                        snippet = " ".join(
                            [str(t).strip() for t in snip_nodes[0].itertext() if str(t).strip()]
                        )
                        break
                _add_item(href, title, snippet)
                if len(items) >= limit:
                    break
        except Exception:
            # Regex fallback when lxml is unavailable or parsing fails.
            anchor_pattern = re.compile(
                r"<h2[^>]*>\s*<a[^>]+href=\"([^\"]+)\"[^>]*>(.*?)</a>",
                flags=re.IGNORECASE | re.DOTALL,
            )
            for match in anchor_pattern.finditer(html_text or ""):
                href = match.group(1)
                title = re.sub(r"<[^>]+>", " ", str(match.group(2) or ""))
                title = html.unescape(title)
                _add_item(href, title, "")
                if len(items) >= limit:
                    break
        return items[:limit]
    @classmethod
    def _query_web_search(
        cls,
        *,
        search_query: str,
        site_host: str,
        limit: int,
    ) -> List[Dict[str, str]]:
        """Execute web search and return parsed result rows.

        Uses Yahoo first (works in environments where Bing/DDG HTML endpoints
        are challenge-gated), then Bing, then DuckDuckGo.  All engines share
        a single ~12s wall-clock budget.
        """
        from API.requests_client import get_requests_session
        session = get_requests_session()
        normalized_limit = max(1, min(int(limit or 1), 100))
        engine_deadline = time.monotonic() + 12.0
        # Yahoo often remains parseable where other engines challenge bots.
        all_rows = cls._query_yahoo(
            search_query=search_query,
            site_host=site_host,
            limit=normalized_limit,
            session=session,
            deadline=engine_deadline,
        )
        if all_rows:
            return all_rows[:normalized_limit]
        # Bing reliably supports filetype: and site: operators when not challenged.
        all_rows = cls._query_bing(
            search_query=search_query,
            site_host=site_host,
            limit=normalized_limit,
            session=session,
            deadline=engine_deadline,
        )
        if all_rows:
            return all_rows[:normalized_limit]
        # DDG fallback.
        all_rows_ddg: List[Dict[str, str]] = []
        seen_urls: set[str] = set()
        endpoints = [
            "https://html.duckduckgo.com/html/",
            "https://duckduckgo.com/html/",
        ]
        for endpoint in endpoints:
            if time.monotonic() >= engine_deadline:
                break
            # Up to 3 pages of 30 results each per endpoint.
            max_offsets = min(3, max(1, (normalized_limit + 29) // 30))
            for page_idx in range(max_offsets):
                if time.monotonic() >= engine_deadline:
                    break
                offset = page_idx * 30
                params = {"q": search_query, "s": str(offset)}
                remaining = max(0.0, float(engine_deadline - time.monotonic()))
                if remaining <= 0.0:
                    break
                read_timeout = max(3.0, min(10.0, remaining))
                # NOTE(review): unlike the Yahoo/Bing helpers, this request is
                # not wrapped in try/except, so an HTTP failure here raises to
                # the caller instead of returning [] — confirm this is intended.
                response = session.get(
                    endpoint,
                    params=params,
                    timeout=(3, read_timeout),
                    headers={"Referer": "https://duckduckgo.com/"},
                )
                response.raise_for_status()
                page_rows = cls._parse_duckduckgo_results(
                    html_text=response.text,
                    site_host=site_host,
                    limit=max(1, normalized_limit - len(all_rows_ddg)),
                )
                new_rows = 0
                for row in page_rows:
                    url_value = str(row.get("url") or "").strip()
                    if not url_value or url_value in seen_urls:
                        continue
                    seen_urls.add(url_value)
                    all_rows_ddg.append(row)
                    new_rows += 1
                    if len(all_rows_ddg) >= normalized_limit:
                        break
                if len(all_rows_ddg) >= normalized_limit or new_rows == 0:
                    break
            if all_rows_ddg:
                break
        return all_rows_ddg[:normalized_limit]
    @classmethod
    def _query_bing(
        cls,
        *,
        search_query: str,
        site_host: str,
        limit: int,
        session: Any,
        deadline: Optional[float] = None,
    ) -> List[Dict[str, str]]:
        """Fetch results from Bing (supports filetype: and site: natively).

        Pages through up to 3 pages of 50 results, deduplicating by URL and
        stopping at ``limit``, on an empty page, on any request failure, or
        when ``deadline`` (a ``time.monotonic`` timestamp) passes.
        """
        all_rows: List[Dict[str, str]] = []
        seen_urls: set[str] = set()
        page_start = 1
        pages_checked = 0
        # One page per 50 requested results, capped at 3 pages.
        max_pages = max(1, min((max(1, int(limit or 1)) + 49) // 50, 3))
        while len(all_rows) < limit and pages_checked < max_pages:
            if deadline is not None and time.monotonic() >= deadline:
                break
            # Bing paging: "first" is the 1-based index of the first result.
            params = {"q": search_query, "first": str(page_start), "count": "50"}
            try:
                # Shrink the read timeout to whatever time budget remains.
                read_timeout = 10.0
                if deadline is not None:
                    remaining = max(0.0, float(deadline - time.monotonic()))
                    if remaining <= 0.0:
                        break
                    read_timeout = max(3.0, min(10.0, remaining))
                response = session.get(
                    "https://www.bing.com/search",
                    params=params,
                    timeout=(3, read_timeout),
                    headers={
                        "User-Agent": (
                            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                            "AppleWebKit/537.36 (KHTML, like Gecko) "
                            "Chrome/124.0.0.0 Safari/537.36"
                        ),
                        "Accept-Language": "en-US,en;q=0.9",
                    },
                )
                response.raise_for_status()
            except Exception:
                break
            page_rows = cls._parse_bing_results(
                html_text=response.text,
                site_host=site_host,
                limit=max(1, limit - len(all_rows)),
            )
            new_rows = 0
            for row in page_rows:
                url_value = str(row.get("url") or "").strip()
                if not url_value or url_value in seen_urls:
                    continue
                seen_urls.add(url_value)
                all_rows.append(row)
                new_rows += 1
                if len(all_rows) >= limit:
                    break
            # A page that adds nothing new means no more results are coming.
            if new_rows == 0 or len(all_rows) >= limit:
                break
            page_start += 50
            pages_checked += 1
        return all_rows
    def _run_web_search(
        self,
        *,
        web_plan: Dict[str, Any],
        limit: int,
        args_list: List[str],
        refresh_mode: bool,
        command_title: str,
    ) -> int:
        """Execute URL-scoped web search and emit downloadable table rows.

        Runs the engine chain from ``_query_web_search`` (falling back to an
        in-site crawl when a filetype was requested), builds a result table,
        emits each row to the pipeline, and records worker progress.
        Returns 0 on success (including zero results) and 1 on failure.
        """
        site_host = str(web_plan.get("site_host") or "").strip().lower()
        search_query = str(web_plan.get("search_query") or "").strip()
        requested_type = self._normalize_extension(web_plan.get("filetype") or "")
        seed_url = str(web_plan.get("seed_url") or "").strip()
        if not site_host or not search_query:
            log("Error: invalid website search request", file=sys.stderr)
            return 1
        # Track this search as a worker row (best-effort; failures ignored).
        worker_id = str(uuid.uuid4())
        try:
            insert_worker(
                worker_id,
                "search-file",
                title=f"Web Search: {search_query}",
                description=f"Site: {site_host}",
            )
        except Exception:
            pass
        try:
            from SYS.result_table import Table
            rows = self._query_web_search(
                search_query=search_query,
                site_host=site_host,
                limit=limit,
            )
            if not rows and requested_type:
                # Engines found nothing: crawl the site directly for files.
                debug(
                    "Web search returned 0 rows; falling back to in-site crawl",
                    {"site": site_host, "ext": requested_type, "seed_url": seed_url},
                )
                rows = self._crawl_site_for_extension(
                    seed_url=seed_url or f"https://{site_host}/",
                    site_host=site_host,
                    extension=requested_type,
                    limit=limit,
                    max_duration_seconds=10.0,
                )
            table = Table(command_title)
            table.set_table("web.search")
            table.set_source_command("search-file", list(args_list))
            try:
                table.set_table_metadata(
                    {
                        "provider": "web",
                        "site": site_host,
                        "query": search_query,
                        "filetype": requested_type,
                    }
                )
            except Exception:
                pass
            if not rows:
                log(f"No web results found for query: {search_query}", file=sys.stderr)
                if refresh_mode:
                    try:
                        ctx.set_last_result_table_preserve_history(table, [])
                    except Exception:
                        pass
                try:
                    append_worker_stdout(worker_id, json.dumps([], indent=2))
                    update_worker(worker_id, status="completed")
                except Exception:
                    pass
                # Zero results is still a successful search.
                return 0
            results_list: List[Dict[str, Any]] = []
            for row in rows:
                target_url = str(row.get("url") or "").strip()
                if not target_url:
                    continue
                source_title = str(row.get("title") or "").strip()
                title = source_title or target_url
                snippet = self._normalize_space(row.get("snippet") or "")
                if len(snippet) > 120:
                    # Keep the detail column short for table display.
                    snippet = f"{snippet[:117].rstrip()}..."
                detected_ext = requested_type
                file_name = ""
                if not detected_ext:
                    # Infer the extension (and filename) from the URL path.
                    try:
                        parsed_path = Path(urlparse(target_url).path)
                        file_name = Path(unquote(str(parsed_path))).name
                        detected_ext = self._normalize_extension(parsed_path.suffix)
                    except Exception:
                        detected_ext = ""
                else:
                    try:
                        file_name = Path(unquote(urlparse(target_url).path)).name
                    except Exception:
                        file_name = ""
                # For filetype-based web searches, prefer a concise filename title.
                if file_name:
                    title = file_name
                payload: Dict[str, Any] = {
                    "title": title,
                    "path": target_url,
                    "url": target_url,
                    "source": "web",
                    "store": "web",
                    "table": "web.search",
                    "ext": detected_ext,
                    "detail": snippet,
                    "tag": [f"site:{site_host}"] + ([f"type:{detected_ext}"] if detected_ext else []),
                    "columns": [
                        ("Title", title),
                        ("Type", detected_ext),
                        ("URL", target_url),
                    ],
                    # Selecting a row feeds its URL straight into download-file.
                    "_selection_args": ["-url", target_url],
                    "_selection_action": ["download-file", "-url", target_url],
                }
                table.add_result(payload)
                results_list.append(payload)
                ctx.emit(payload)
            # Publish the table: refresh mode preserves selection history.
            if refresh_mode:
                ctx.set_last_result_table_preserve_history(table, results_list)
            else:
                ctx.set_last_result_table(table, results_list)
            ctx.set_current_stage_table(table)
            try:
                append_worker_stdout(worker_id, json.dumps(results_list, indent=2))
                update_worker(worker_id, status="completed")
            except Exception:
                pass
            return 0
        except Exception as exc:
            log(f"Web search failed: {exc}", file=sys.stderr)
            try:
                update_worker(worker_id, status="error")
            except Exception:
                pass
            return 1
@staticmethod
def _normalize_extension(ext_value: Any) -> str:
"""Sanitize extension strings to alphanumerics and cap at 5 chars."""
ext = str(ext_value or "").strip().lstrip(".")
for sep in (" ", "|", "(", "[", "{", ",", ";"):
if sep in ext:
ext = ext.split(sep, 1)[0]
break
if "." in ext:
ext = ext.split(".")[-1]
ext = "".join(ch for ch in ext if ch.isalnum())
return ext[:5]
@staticmethod
def _normalize_lookup_target(value: Optional[str]) -> str:
"""Normalize candidate names for store/provider matching."""
raw = str(value or "").strip().lower()
return "".join(ch for ch in raw if ch.isalnum())
    @staticmethod
    def _extract_namespace_tags(payload: Dict[str, Any]) -> List[str]:
        """Return deduplicated namespace tags from payload, excluding title:* tags.

        Collects candidate strings from ``tag``/``tags``/``tag_summary`` and
        nested ``metadata`` structures (including Hydrus-style
        ``display_tags`` maps), then keeps only ``namespace:value`` entries,
        lowercasing the namespace and dropping duplicates case-insensitively
        while preserving first-seen order.
        """
        candidates: List[str] = []
        def _add_candidate(value: Any) -> None:
            # Recursively flatten strings / dicts / iterables into candidates.
            if isinstance(value, str):
                text = value.strip()
                if text:
                    # Strings may carry several tags separated by , ; or newlines.
                    parts = re.split(r"[,;\n\r]+", text)
                    for part in parts:
                        token = part.strip().strip("[](){}\"'#")
                        if token:
                            candidates.append(token)
            elif isinstance(value, dict):
                for nested in value.values():
                    _add_candidate(nested)
            elif isinstance(value, (list, tuple, set)):
                for item in value:
                    _add_candidate(item)
        _add_candidate(payload.get("tag"))
        _add_candidate(payload.get("tags"))
        _add_candidate(payload.get("tag_summary"))
        metadata = payload.get("metadata")
        if isinstance(metadata, dict):
            _add_candidate(metadata.get("tag"))
            _add_candidate(metadata.get("tags"))
            meta_tags = metadata.get("tags")
            if isinstance(meta_tags, dict):
                # Hydrus-style: {service: {"display_tags": {namespace: [values]}}}
                for service_data in meta_tags.values():
                    if not isinstance(service_data, dict):
                        continue
                    display_tags = service_data.get("display_tags")
                    if isinstance(display_tags, dict):
                        for ns_name, tag_list in display_tags.items():
                            if isinstance(tag_list, list):
                                ns_text = str(ns_name or "").strip()
                                for tag_item in tag_list:
                                    item_text = str(tag_item or "").strip()
                                    if not item_text:
                                        continue
                                    if ":" in item_text:
                                        # Already namespaced; keep verbatim.
                                        candidates.append(item_text)
                                        continue
                                    if ns_text:
                                        candidates.append(f"{ns_text}:{item_text}")
                                    else:
                                        candidates.append(item_text)
                            else:
                                _add_candidate(tag_list)
        # Second pass: keep only namespace:value entries, normalized + deduped.
        namespace_tags: List[str] = []
        seen: set[str] = set()
        for raw in candidates:
            candidate = str(raw or "").strip()
            if not candidate or ":" not in candidate:
                continue
            ns, value = candidate.split(":", 1)
            ns_norm = ns.strip().lower()
            value_norm = value.strip()
            if not value_norm:
                continue
            if ns_norm == "title":
                # title:* is rendered separately; exclude it from tag output.
                continue
            normalized = f"{ns_norm}:{value_norm}"
            key = normalized.lower()
            if key in seen:
                continue
            seen.add(key)
            namespace_tags.append(normalized)
        return namespace_tags
def _set_storage_display_columns(self, payload: Dict[str, Any]) -> None:
"""Set explicit display columns for store search results."""
title_text = str(payload.get("title") or payload.get("name") or payload.get("filename") or "Result")
namespace_tags = self._extract_namespace_tags(payload)
tag_text = ", ".join(namespace_tags)
store_text = str(payload.get("store") or payload.get("table") or payload.get("source") or "")
size_raw = payload.get("size_bytes")
if size_raw is None:
size_raw = payload.get("size")
ext_text = str(payload.get("ext") or "")
payload["columns"] = [
("Title", title_text),
("Tag", tag_text),
("Store", store_text),
("Size", size_raw),
("Ext", ext_text),
]
def _ensure_storage_columns(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Ensure storage results have the necessary fields for result_table display."""
# Ensure we have title field
if "title" not in payload:
payload["title"] = (
payload.get("name") or payload.get("target") or payload.get("path")
or "Result"
)
# Ensure we have ext field
if ("ext" not in payload) or (not str(payload.get("ext") or "").strip()):
title = str(payload.get("title", ""))
path_obj = Path(title)
if path_obj.suffix:
payload["ext"] = self._normalize_extension(path_obj.suffix.lstrip("."))
else:
payload["ext"] = payload.get("ext", "")
# Ensure size_bytes is present for display (already set by search_file())
# result_table will handle formatting it
# Store search uses explicit columns so TAG can appear right after TITLE.
self._set_storage_display_columns(payload)
return payload
    def _run_provider_search(
        self,
        *,
        provider_name: str,
        query: str,
        limit: int,
        limit_set: bool,
        open_id: Optional[int],
        args_list: List[str],
        refresh_mode: bool,
        config: Dict[str, Any],
    ) -> int:
        """Execute external provider search.

        Resolves *provider_name* via the provider registry, runs its search
        for *query*, builds a result table (title/type/metadata supplied by
        the provider itself), emits each row downstream, and records the run
        in the worker DB.

        Args:
            provider_name: Registry name of the search provider.
            query: Raw user query (the provider may extract filters from it).
            limit: Maximum number of results; overridden to 50 when the user
                did not pass -limit (limit_set is False).
            limit_set: Whether the user explicitly set -limit.
            open_id: Accepted for signature parity with run(); not used here.
            args_list: Original argv, forwarded to the provider for the
                table's source-command metadata.
            refresh_mode: When True, result tables preserve selection history.
            config: Application configuration dict.

        Returns:
            0 on success (including zero results), 1 on error.
        """
        if not provider_name or not query:
            # Stop any live progress spinner before printing panels/usage.
            from SYS import pipeline as ctx_mod
            progress = None
            if hasattr(ctx_mod, "get_pipeline_state"):
                progress = ctx_mod.get_pipeline_state().live_progress
            if progress:
                try:
                    progress.stop()
                except Exception:
                    pass
            log("Error: search-file -provider requires both provider and query", file=sys.stderr)
            log(f"Usage: {self.usage}", file=sys.stderr)
            # Show which providers are configured vs. missing configuration.
            providers_map = list_search_providers(config)
            available = [n for n, a in providers_map.items() if a]
            unconfigured = [n for n, a in providers_map.items() if not a]
            if unconfigured:
                show_provider_config_panel(unconfigured)
            if available:
                show_available_providers_panel(available)
            return 1
        # Align with provider default when user did not set -limit.
        if not limit_set:
            limit = 50
        from SYS import pipeline as ctx_mod
        progress = None
        if hasattr(ctx_mod, "get_pipeline_state"):
            progress = ctx_mod.get_pipeline_state().live_progress
        provider = get_search_provider(provider_name, config)
        if not provider:
            # Provider unknown or unconfigured: stop progress and show panels.
            if progress:
                try:
                    progress.stop()
                except Exception:
                    pass
            show_provider_config_panel([provider_name])
            providers_map = list_search_providers(config)
            available = [n for n, a in providers_map.items() if a]
            if available:
                show_available_providers_panel(available)
            return 1
        # Track the search as a worker record; failures here are non-fatal.
        worker_id = str(uuid.uuid4())
        try:
            insert_worker(
                worker_id,
                "search-file",
                title=f"Search: {query}",
                description=f"Provider: {provider_name}, Query: {query}",
            )
        except Exception:
            pass
        try:
            results_list: List[Dict[str, Any]] = []
            from SYS.result_table import Table
            provider_text = str(provider_name or "").strip()
            # NOTE(review): provider_lower is unused in this method.
            provider_lower = provider_text.lower()
            # Dynamic query/filter extraction via provider
            normalized_query = str(query or "").strip()
            provider_filters: Dict[str, Any] = {}
            try:
                normalized_query, provider_filters = provider.extract_query_arguments(query)
            except Exception:
                provider_filters = {}
            normalized_query = (normalized_query or "").strip()
            # An empty query after filter extraction becomes a match-all "*".
            query = normalized_query or "*"
            search_filters = dict(provider_filters or {})
            # Dynamic table generation via provider
            table_title = provider.get_table_title(query, search_filters).strip().rstrip(":")
            table_type = provider.get_table_type(query, search_filters)
            table_meta = provider.get_table_metadata(query, search_filters)
            preserve_order = provider.preserve_order
            table = Table(table_title)._perseverance(preserve_order)
            table.set_table(table_type)
            try:
                table.set_table_metadata(table_meta)
            except Exception:
                pass
            # Dynamic source command via provider
            source_cmd, source_args = provider.get_source_command(args_list)
            table.set_source_command(source_cmd, source_args)
            debug(f"[search-file] Calling {provider_name}.search(filters={search_filters})")
            results = provider.search(query, limit=limit, filters=search_filters or None)
            debug(f"[search-file] {provider_name} -> {len(results or [])} result(s)")
            # Allow providers to apply provider-specific UX transforms (e.g. auto-expansion)
            try:
                post = getattr(provider, "postprocess_search_results", None)
                if callable(post) and isinstance(results, list):
                    results, table_type_override, table_meta_override = post(
                        query=query,
                        results=results,
                        filters=search_filters or None,
                        limit=int(limit or 0),
                        table_type=str(table_type or ""),
                        table_meta=dict(table_meta) if isinstance(table_meta, dict) else None,
                    )
                    if table_type_override:
                        table_type = str(table_type_override)
                        table.set_table(table_type)
                    if isinstance(table_meta_override, dict) and table_meta_override:
                        table_meta = dict(table_meta_override)
                        try:
                            table.set_table_metadata(table_meta)
                        except Exception:
                            pass
            except Exception:
                pass
            if not results:
                # Empty result set is still a successful, completed run.
                log(f"No results found for query: {query}", file=sys.stderr)
                try:
                    append_worker_stdout(worker_id, json.dumps([], indent=2))
                    update_worker(worker_id, status="completed")
                except Exception:
                    pass
                return 0
            for search_result in results:
                # Normalise provider results to plain dicts for emission.
                item_dict = (
                    search_result.to_dict()
                    if hasattr(search_result, "to_dict")
                    else dict(search_result)
                    if isinstance(search_result, dict)
                    else {"title": str(search_result)}
                )
                if "table" not in item_dict:
                    item_dict["table"] = table_type
                # Ensure provider source is present so downstream cmdlets (select) can resolve provider
                if "source" not in item_dict:
                    item_dict["source"] = provider_name
                # NOTE(review): row_index is unused after assignment.
                row_index = len(table.rows)
                # The table receives the original result object; the pipeline
                # and worker log receive the dict form.
                table.add_result(search_result)
                results_list.append(item_dict)
                ctx.emit(item_dict)
            if refresh_mode:
                ctx.set_last_result_table_preserve_history(table, results_list)
            else:
                ctx.set_last_result_table(table, results_list)
            ctx.set_current_stage_table(table)
            try:
                append_worker_stdout(worker_id, json.dumps(results_list, indent=2))
                update_worker(worker_id, status="completed")
            except Exception:
                pass
            return 0
        except Exception as exc:
            log(f"Error searching provider '{provider_name}': {exc}", file=sys.stderr)
            import traceback
            debug(traceback.format_exc())
            try:
                update_worker(worker_id, status="error")
            except Exception:
                pass
            return 1
# --- Execution ------------------------------------------------------
    def run(self, result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
        """Search storage backends for files by various criteria.

        Supports searching by:
        - Hash (-query "hash:...")
        - Title (-query "title:...")
        - Tag (-query "tag:...")
        - URL (-query "url:...")
        - Other backend-specific fields

        Optimizations:
        - Extracts tags from metadata response (avoids duplicate API calls)
        - Only calls get_tag() separately for backends that don't include tags

        Args:
            result: Piped input (typically empty for new search)
            args: Search criteria and options
            config: Application configuration

        Returns:
            0 on success, 1 on error
        """
        if should_show_help(args):
            log(f"Cmdlet: {self.name}\nSummary: {self.summary}\nUsage: {self.usage}")
            return 0
        args_list = [str(arg) for arg in (args or [])]
        # Refresh invocations re-render existing results; they alter how result
        # tables are stored (history preserved / overlay) later in this method.
        refresh_mode = any(
            str(a).strip().lower() in {"--refresh", "-refresh", "-internal-refresh"}
            for a in args_list
        )

        def _format_command_title(command: str, raw_args: List[str]) -> str:
            # Rebuild a shell-like "search-file ..." string for the table title.
            def _quote(value: str) -> str:
                text = str(value)
                if not text:
                    return '""'
                needs_quotes = any(ch.isspace() for ch in text) or '"' in text
                if not needs_quotes:
                    return text
                return '"' + text.replace('"', '\\"') + '"'

            # Internal refresh flags never appear in the displayed title.
            cleaned = [
                str(a) for a in (raw_args or [])
                if str(a).strip().lower() not in {"--refresh", "-refresh", "-internal-refresh"}
            ]
            if not cleaned:
                return command
            return " ".join([command, *[_quote(a) for a in cleaned]])

        raw_title = None
        try:
            # Prefer the pipeline's recorded stage text for the title if available.
            raw_title = (
                ctx.get_current_stage_text("")
                if hasattr(ctx, "get_current_stage_text") else None
            )
        except Exception:
            raw_title = None
        command_title = (
            str(raw_title).strip() if raw_title else ""
        ) or _format_command_title("search-file", list(args_list))
        # Build dynamic flag variants from cmdlet arg definitions.
        # This avoids hardcoding flag spellings in parsing loops.
        flag_registry = self.build_flag_registry()
        query_flags = {
            f.lower()
            for f in (flag_registry.get("query") or {"-query", "--query"})
        }
        store_flags = {
            f.lower()
            for f in (flag_registry.get("store") or {"-store", "--store"})
        }
        limit_flags = {
            f.lower()
            for f in (flag_registry.get("limit") or {"-limit", "--limit"})
        }
        provider_flags = {
            f.lower()
            for f in (flag_registry.get("provider") or {"-provider", "--provider"})
        }
        open_flags = {
            f.lower()
            for f in (flag_registry.get("open") or {"-open", "--open"})
        }
        # Parse arguments
        query = ""
        storage_backend: Optional[str] = None
        provider_name: Optional[str] = None
        open_id: Optional[int] = None
        limit = 100
        limit_set = False
        searched_backends: List[str] = []
        positional_args: List[str] = []
        i = 0
        while i < len(args_list):
            arg = args_list[i]
            low = arg.lower()
            if low in query_flags and i + 1 < len(args_list):
                # Repeated -query flags accumulate, space-joined, into one query.
                chunk = args_list[i + 1]
                query = f"{query} {chunk}".strip() if query else chunk
                i += 2
                continue
            if low in provider_flags and i + 1 < len(args_list):
                provider_name = args_list[i + 1]
                i += 2
                continue
            if low in open_flags and i + 1 < len(args_list):
                try:
                    open_id = int(args_list[i + 1])
                except ValueError:
                    log(
                        f"Warning: Invalid open value '{args_list[i + 1]}', ignoring",
                        file=sys.stderr,
                    )
                    open_id = None
                i += 2
                continue
            if low in store_flags and i + 1 < len(args_list):
                storage_backend = args_list[i + 1]
                i += 2
            elif low in limit_flags and i + 1 < len(args_list):
                limit_set = True
                try:
                    limit = int(args_list[i + 1])
                except ValueError:
                    limit = 100
                i += 2
            elif not arg.startswith("-"):
                # Bare words are treated as query text (and remembered separately).
                positional_args.append(arg)
                query = f"{query} {arg}".strip() if query else arg
                i += 1
            else:
                # Unknown flags are skipped silently.
                i += 1
        query = query.strip()
        if provider_name:
            # Explicit -provider bypasses storage backends entirely.
            return self._run_provider_search(
                provider_name=provider_name,
                query=query,
                limit=limit,
                limit_set=limit_set,
                open_id=open_id,
                args_list=args_list,
                refresh_mode=refresh_mode,
                config=config,
            )
        # Lift inline "store:<name>" filters out of the query text.
        store_filter: Optional[str] = None
        if query:
            match = re.search(r"\bstore:([^\s,]+)", query, flags=re.IGNORECASE)
            if match:
                store_filter = match.group(1).strip() or None
                query = re.sub(r"\s*[,]?\s*store:[^\s,]+", " ", query, flags=re.IGNORECASE)
                query = re.sub(r"\s{2,}", " ", query)
                query = query.strip().strip(",")
        if store_filter and not storage_backend:
            storage_backend = store_filter
        # If the user accidentally used `-store <provider>` or `store:<provider>`,
        # prefer to treat it as a provider search (providers like 'alldebrid' are not store backends).
        try:
            from Store.registry import list_configured_backend_names
            providers_map = list_search_providers(config)
            configured = list_configured_backend_names(config or {})
            if storage_backend:
                matched = None
                storage_hint = self._normalize_lookup_target(storage_backend)
                if storage_hint:
                    for p in (providers_map or {}):
                        if self._normalize_lookup_target(p) == storage_hint:
                            matched = p
                            break
                # Only redirect when the name is a provider and NOT a configured store.
                if matched and str(storage_backend) not in configured:
                    log(f"Note: Treating '-store {storage_backend}' as provider search for '{matched}'", file=sys.stderr)
                    return self._run_provider_search(
                        provider_name=matched,
                        query=query,
                        limit=limit,
                        limit_set=limit_set,
                        open_id=open_id,
                        args_list=args_list,
                        refresh_mode=refresh_mode,
                        config=config,
                    )
            elif store_filter:
                matched = None
                store_hint = self._normalize_lookup_target(store_filter)
                if store_hint:
                    for p in (providers_map or {}):
                        if self._normalize_lookup_target(p) == store_hint:
                            matched = p
                            break
                if matched and str(store_filter) not in configured:
                    log(f"Note: Treating 'store:{store_filter}' as provider search for '{matched}'", file=sys.stderr)
                    return self._run_provider_search(
                        provider_name=matched,
                        query=query,
                        limit=limit,
                        limit_set=limit_set,
                        open_id=open_id,
                        args_list=args_list,
                        refresh_mode=refresh_mode,
                        config=config,
                    )
        except Exception:
            # Be conservative: if provider detection fails, fall back to store behaviour
            pass
        hash_query = parse_hash_query(query)
        # Web-search style queries short-circuit before any store lookup.
        web_plan = self._build_web_search_plan(
            query=query,
            positional_args=positional_args,
            storage_backend=storage_backend,
            store_filter=store_filter,
            hash_query=hash_query,
        )
        if web_plan is not None:
            return self._run_web_search(
                web_plan=web_plan,
                limit=limit,
                args_list=args_list,
                refresh_mode=refresh_mode,
                command_title=command_title,
            )
        if not query:
            log("Provide a search query", file=sys.stderr)
            return 1
        worker_id = str(uuid.uuid4())
        from Store import Store
        storage_registry = Store(config=config or {})
        if not storage_registry.list_backends():
            # Internal refreshes should not trigger config panels or stop progress.
            if "-internal-refresh" in args_list:
                return 1
            from SYS import pipeline as ctx_mod
            progress = None
            if hasattr(ctx_mod, "get_pipeline_state"):
                progress = ctx_mod.get_pipeline_state().live_progress
            if progress:
                try:
                    progress.stop()
                except Exception:
                    pass
            show_store_config_panel(["Hydrus Network"])
            return 1
        # Use a lightweight worker logger to track search results in the central DB
        with _WorkerLogger(worker_id) as db:
            try:
                if "-internal-refresh" not in args_list:
                    db.insert_worker(
                        worker_id,
                        "search-file",
                        title=f"Search: {query}",
                        description=f"Query: {query}",
                        pipe=ctx.get_current_command_text(),
                    )
                results_list = []
                from SYS.result_table import Table
                table = Table(command_title)
                try:
                    table.set_source_command("search-file", list(args_list))
                except Exception:
                    pass
                if hash_query:
                    try:
                        # Preserve row order for explicit hash lists.
                        table._perseverance(True)
                    except Exception:
                        pass
                from Store.registry import list_configured_backend_names, get_backend_instance
                from Store._base import Store as BaseStore
                backend_to_search = storage_backend or None
                if hash_query:
                    # Explicit hash list search: build rows from backend metadata.
                    backends_to_try: List[str] = []
                    if backend_to_search:
                        backends_to_try = [backend_to_search]
                    else:
                        backends_to_try = list_configured_backend_names(config or {})
                    found_any = False
                    for h in hash_query:
                        # Probe each backend until one yields metadata for this hash.
                        resolved_backend_name: Optional[str] = None
                        resolved_backend = None
                        for backend_name in backends_to_try:
                            backend = None
                            try:
                                backend = get_backend_instance(config, backend_name, suppress_debug=True)
                                if backend is None:
                                    # Last-resort: instantiate full registry for this backend only
                                    from Store import Store as _Store
                                    _store = _Store(config=config, suppress_debug=True)
                                    if _store.is_available(backend_name):
                                        backend = _store[backend_name]
                            except Exception:
                                backend = None
                            if backend is None:
                                continue
                            try:
                                # If get_metadata works, consider it a hit; get_file can be optional (e.g. remote URL).
                                meta = backend.get_metadata(h)
                                if meta is None:
                                    continue
                                resolved_backend_name = backend_name
                                resolved_backend = backend
                                break
                            except Exception:
                                continue
                        if resolved_backend_name is None or resolved_backend is None:
                            continue
                        found_any = True
                        searched_backends.append(resolved_backend_name)
                        # Resolve a path/URL string if possible
                        # NOTE(review): path_str is never reassigned below, so all
                        # path-based fallbacks in this branch are currently inert.
                        path_str: Optional[str] = None
                        # Avoid calling get_file() for remote backends during search/refresh.
                        meta_obj: Dict[str, Any] = {}
                        try:
                            meta_obj = resolved_backend.get_metadata(h) or {}
                        except Exception:
                            meta_obj = {}
                        # Extract tags from metadata response instead of separate get_tag() call
                        # Metadata already includes tags if fetched with include_service_keys_to_tags=True
                        tags_list: List[str] = []
                        # First try to extract from metadata tags dict
                        metadata_tags = meta_obj.get("tags")
                        if isinstance(metadata_tags, dict):
                            collected_tags: List[str] = []
                            for service_data in metadata_tags.values():
                                if isinstance(service_data, dict):
                                    display_tags = service_data.get("display_tags", {})
                                    if isinstance(display_tags, dict):
                                        for ns_name, tag_list in display_tags.items():
                                            if not isinstance(tag_list, list):
                                                continue
                                            ns_text = str(ns_name or "").strip()
                                            for tag_item in tag_list:
                                                tag_text = str(tag_item or "").strip()
                                                if not tag_text:
                                                    continue
                                                if ":" in tag_text:
                                                    collected_tags.append(tag_text)
                                                elif ns_text:
                                                    collected_tags.append(f"{ns_text}:{tag_text}")
                                                else:
                                                    collected_tags.append(tag_text)
                            if collected_tags:
                                # De-duplicate case-insensitively, keeping first spelling.
                                dedup: List[str] = []
                                seen_tags: set[str] = set()
                                for tag_text in collected_tags:
                                    key = tag_text.lower()
                                    if key in seen_tags:
                                        continue
                                    seen_tags.add(key)
                                    dedup.append(tag_text)
                                tags_list = dedup
                        # Fallback: if metadata didn't include tags, call get_tag() separately
                        # (This maintains compatibility with backends that don't include tags in metadata)
                        if not tags_list:
                            try:
                                tag_result = resolved_backend.get_tag(h)
                                if isinstance(tag_result, tuple) and tag_result:
                                    maybe_tags = tag_result[0]
                                else:
                                    maybe_tags = tag_result
                                if isinstance(maybe_tags, list):
                                    tags_list = [
                                        str(t).strip() for t in maybe_tags
                                        if isinstance(t, str) and str(t).strip()
                                    ]
                            except Exception:
                                tags_list = []
                        # Prefer the title:* tag over metadata title/name fields.
                        title_from_tag: Optional[str] = None
                        try:
                            title_tag = first_title_tag(tags_list)
                            if title_tag and ":" in title_tag:
                                title_from_tag = title_tag.split(":", 1)[1].strip()
                        except Exception:
                            title_from_tag = None
                        title = title_from_tag or meta_obj.get("title") or meta_obj.get(
                            "name"
                        )
                        if not title and path_str:
                            try:
                                title = Path(path_str).stem
                            except Exception:
                                title = path_str
                        ext_val = meta_obj.get("ext") or meta_obj.get("extension")
                        if not ext_val and path_str:
                            try:
                                ext_val = Path(path_str).suffix
                            except Exception:
                                ext_val = None
                        if not ext_val and title:
                            try:
                                ext_val = Path(str(title)).suffix
                            except Exception:
                                ext_val = None
                        size_bytes = meta_obj.get("size")
                        if size_bytes is None:
                            size_bytes = meta_obj.get("size_bytes")
                        try:
                            size_bytes_int: Optional[int] = (
                                int(size_bytes) if size_bytes is not None else None
                            )
                        except Exception:
                            size_bytes_int = None
                        payload: Dict[str, Any] = {
                            "title": str(title or h),
                            "hash": h,
                            "store": resolved_backend_name,
                            "path": path_str,
                            "ext": self._normalize_extension(ext_val),
                            "size_bytes": size_bytes_int,
                            "tag": tags_list,
                            "url": meta_obj.get("url") or [],
                        }
                        self._set_storage_display_columns(payload)
                        table.add_result(payload)
                        results_list.append(payload)
                        ctx.emit(payload)
                    if found_any:
                        table.title = command_title
                        # Add-file refresh quality-of-life: if exactly 1 item is being refreshed,
                        # show the detailed item panel instead of a single-row table.
                        if refresh_mode and len(results_list) == 1:
                            try:
                                from SYS.rich_display import render_item_details_panel
                                render_item_details_panel(results_list[0])
                                table._rendered_by_cmdlet = True
                            except Exception:
                                pass
                        if refresh_mode:
                            ctx.set_last_result_table_preserve_history(
                                table,
                                results_list
                            )
                        else:
                            ctx.set_last_result_table(table, results_list)
                        db.append_worker_stdout(
                            worker_id,
                            json.dumps(results_list, indent=2)
                        )
                        db.update_worker_status(worker_id, "completed")
                        return 0
                    # No hash resolved against any backend.
                    log("No results found", file=sys.stderr)
                    if refresh_mode:
                        try:
                            table.title = command_title
                            ctx.set_last_result_table_preserve_history(table, [])
                        except Exception:
                            pass
                    db.append_worker_stdout(worker_id, json.dumps([], indent=2))
                    db.update_worker_status(worker_id, "completed")
                    return 0
                if backend_to_search:
                    # Single-backend text search.
                    searched_backends.append(backend_to_search)
                    try:
                        target_backend = get_backend_instance(config, backend_to_search, suppress_debug=True)
                        if target_backend is None:
                            from Store import Store as _Store
                            _store = _Store(config=config, suppress_debug=True)
                            if _store.is_available(backend_to_search):
                                target_backend = _store[backend_to_search]
                            else:
                                debug(f"[search-file] Requested backend '{backend_to_search}' not found")
                                return 1
                    except Exception as exc:
                        log(f"Backend '{backend_to_search}' not found: {exc}", file=sys.stderr)
                        db.update_worker_status(worker_id, "error")
                        return 1
                    # Backends that never override the base search() can't be queried.
                    if type(target_backend).search is BaseStore.search:
                        log(
                            f"Backend '{backend_to_search}' does not support searching",
                            file=sys.stderr,
                        )
                        db.update_worker_status(worker_id, "error")
                        return 1
                    debug(f"[search-file] Searching '{backend_to_search}'")
                    results = target_backend.search(query, limit=limit)
                    debug(
                        f"[search-file] '{backend_to_search}' -> {len(results or [])} result(s)"
                    )
                else:
                    # Fan out across every configured backend until limit fills.
                    all_results = []
                    for backend_name in list_configured_backend_names(config or {}):
                        try:
                            backend = get_backend_instance(config, backend_name, suppress_debug=True)
                            if backend is None:
                                from Store import Store as _Store
                                _store = _Store(config=config, suppress_debug=True)
                                if _store.is_available(backend_name):
                                    backend = _store[backend_name]
                                else:
                                    # Configured backend name exists but has no registered implementation or failed to load.
                                    # (e.g. 'all-debrid' being treated as a store but having no store provider).
                                    continue
                            searched_backends.append(backend_name)
                            if type(backend).search is BaseStore.search:
                                continue
                            debug(f"[search-file] Searching '{backend_name}'")
                            backend_results = backend.search(
                                query,
                                limit=limit - len(all_results)
                            )
                            debug(
                                f"[search-file] '{backend_name}' -> {len(backend_results or [])} result(s)"
                            )
                            if backend_results:
                                all_results.extend(backend_results)
                            if len(all_results) >= limit:
                                break
                        except Exception as exc:
                            # One failing backend must not abort the whole search.
                            log(
                                f"Backend {backend_name} search failed: {exc}",
                                file=sys.stderr
                            )
                    results = all_results[:limit]
                if results:
                    for item in results:
                        # Coerce arbitrary result objects into plain dicts.
                        def _as_dict(obj: Any) -> Dict[str, Any]:
                            if isinstance(obj, dict):
                                return dict(obj)
                            if hasattr(obj, "to_dict") and callable(getattr(obj, "to_dict")):
                                return obj.to_dict()  # type: ignore[arg-type]
                            return {
                                "title": str(obj)
                            }
                        item_dict = _as_dict(item)
                        if store_filter:
                            store_val = str(item_dict.get("store") or "").lower()
                            if store_filter != store_val:
                                continue
                        # Normalize storage results (ensure title, ext, etc.)
                        normalized = self._ensure_storage_columns(item_dict)
                        # If normalize skipped it due to STORAGE_ORIGINS, do it manually
                        if "title" not in normalized:
                            normalized["title"] = (
                                item_dict.get("title") or item_dict.get("name") or
                                item_dict.get("path") or item_dict.get("target") or "Result"
                            )
                        if "ext" not in normalized:
                            t = str(normalized.get("title", ""))
                            if "." in t:
                                normalized["ext"] = t.split(".")[-1].lower()[:5]
                        # Make hash/store available for downstream cmdlet without rerunning search
                        hash_val = normalized.get("hash")
                        store_val = normalized.get("store") or item_dict.get("store") or backend_to_search
                        # NOTE(review): hash_val was just read from normalized, so
                        # this condition can never be true — dead branch.
                        if hash_val and not normalized.get("hash"):
                            normalized["hash"] = hash_val
                        if store_val and not normalized.get("store"):
                            normalized["store"] = store_val
                        # Populate default selection args for interactive @N selection/hash/url handling
                        try:
                            sel_args: Optional[List[str]] = None
                            sel_action: Optional[List[str]] = None
                            # Prefer explicit path when available
                            p_val = normalized.get("path") or normalized.get("target") or normalized.get("url")
                            if p_val:
                                p_str = str(p_val or "").strip()
                                if p_str:
                                    if p_str.startswith(("http://", "https://", "magnet:", "torrent:")):
                                        h = normalized.get("hash") or normalized.get("file_hash") or normalized.get("hash_hex")
                                        s_val = normalized.get("store")
                                        if h and s_val and "/view_file" in p_str:
                                            try:
                                                h_norm = normalize_hash(h)
                                            except Exception:
                                                h_norm = str(h)
                                            sel_args = ["-query", f"hash:{h_norm}", "-store", str(s_val)]
                                            sel_action = ["get-metadata", "-query", f"hash:{h_norm}", "-store", str(s_val)]
                                        else:
                                            sel_args = ["-url", p_str]
                                            sel_action = ["download-file", "-url", p_str]
                                    else:
                                        try:
                                            from SYS.utils import expand_path
                                            full_path = expand_path(p_str)
                                            # Prefer showing metadata details when we have a hash+store context
                                            h = normalized.get("hash") or normalized.get("file_hash") or normalized.get("hash_hex")
                                            s_val = normalized.get("store")
                                            if h and s_val:
                                                try:
                                                    h_norm = normalize_hash(h)
                                                except Exception:
                                                    h_norm = str(h)
                                                sel_args = ["-query", f"hash:{h_norm}", "-store", str(s_val)]
                                                sel_action = ["get-metadata", "-query", f"hash:{h_norm}", "-store", str(s_val)]
                                            else:
                                                sel_args = ["-path", str(full_path)]
                                                # Default action for local paths: get-file to fetch or operate on the path
                                                sel_action = ["get-file", "-path", str(full_path)]
                                        except Exception:
                                            sel_args = ["-path", p_str]
                                            sel_action = ["get-file", "-path", p_str]
                            # Fallback: use hash+store when available
                            if sel_args is None:
                                h = normalized.get("hash") or normalized.get("file_hash") or normalized.get("hash_hex")
                                s_val = normalized.get("store")
                                if h and s_val:
                                    try:
                                        h_norm = normalize_hash(h)
                                    except Exception:
                                        h_norm = str(h)
                                    sel_args = ["-query", f"hash:{h_norm}", "-store", str(s_val)]
                                    # Show metadata details by default for store/hash selections
                                    sel_action = ["get-metadata", "-query", f"hash:{h_norm}", "-store", str(s_val)]
                            if sel_args:
                                normalized["_selection_args"] = [str(x) for x in sel_args]
                            if sel_action:
                                normalized["_selection_action"] = [str(x) for x in sel_action]
                        except Exception:
                            pass
                        table.add_result(normalized)
                        results_list.append(normalized)
                        ctx.emit(normalized)
                    table.title = command_title
                    # If exactly 1 item is being refreshed, show the detailed item panel.
                    if refresh_mode and len(results_list) == 1:
                        try:
                            from SYS.rich_display import render_item_details_panel
                            render_item_details_panel(results_list[0])
                            table._rendered_by_cmdlet = True
                        except Exception:
                            pass
                    if refresh_mode:
                        # For internal refresh, use overlay mode to avoid adding to history
                        try:
                            # Parse out the store/hash context if possible
                            subject_context = None
                            if "hash:" in query:
                                subject_hash = query.split("hash:")[1].split(",")[0].strip()
                                subject_context = {"store": backend_to_search, "hash": subject_hash}
                            ctx.set_last_result_table_overlay(table, results_list, subject=subject_context)
                        except Exception:
                            ctx.set_last_result_table_preserve_history(table, results_list)
                    else:
                        ctx.set_last_result_table(table, results_list)
                    db.append_worker_stdout(
                        worker_id,
                        json.dumps(results_list, indent=2)
                    )
                else:
                    log("No results found", file=sys.stderr)
                    if refresh_mode:
                        try:
                            table.title = command_title
                            ctx.set_last_result_table_preserve_history(table, [])
                        except Exception:
                            pass
                    db.append_worker_stdout(worker_id, json.dumps([], indent=2))
                db.update_worker_status(worker_id, "completed")
                return 0
            except Exception as exc:
                log(f"Search failed: {exc}", file=sys.stderr)
                import traceback
                traceback.print_exc(file=sys.stderr)
                try:
                    db.update_worker_status(worker_id, "error")
                except Exception:
                    pass
                return 1
# Module-level instance; CMDLET appears to be the export name scanned by the
# cmdlet loader — confirm against the registry convention.
CMDLET = search_file()