diff --git a/API/data/alldebrid.json b/API/data/alldebrid.json index c9142ac..3c92ea5 100644 --- a/API/data/alldebrid.json +++ b/API/data/alldebrid.json @@ -22,7 +22,7 @@ "((1fichier\\.com|megadl\\.fr|alterupload\\.com|cjoint\\.net|desfichiers\\.com|dfichiers\\.com|mesfichiers\\.org|piecejointe\\.net|pjointe\\.com|tenvoi\\.com|dl4free\\.com)/\\?[a-zA-Z0-9]{5,30}(&pw=[^&]+)?)" ], "regexp": "((1fichier\\.com|megadl\\.fr|alterupload\\.com|cjoint\\.net|desfichiers\\.com|dfichiers\\.com|mesfichiers\\.org|piecejointe\\.net|pjointe\\.com|tenvoi\\.com|dl4free\\.com)/\\?[a-zA-Z0-9]{5,30}(&pw=[^&]+)?)", - "status": false + "status": true }, "rapidgator": { "name": "rapidgator", @@ -398,7 +398,7 @@ "(gigapeta\\.com/dl/[0-9a-zA-Z]{13,15})" ], "regexp": "(gigapeta\\.com/dl/[0-9a-zA-Z]{13,15})", - "status": false + "status": true }, "google": { "name": "google", @@ -425,7 +425,7 @@ "(hexupload\\.net|hexload\\.com)/([a-zA-Z0-9]{12})" ], "regexp": "(hexupload\\.net|hexload\\.com)/([a-zA-Z0-9]{12})", - "status": false + "status": true }, "hot4share": { "name": "hot4share", @@ -494,7 +494,7 @@ "mediafire\\.com/(\\?|download/|file/|download\\.php\\?)([0-9a-z]{15})" ], "regexp": "mediafire\\.com/(\\?|download/|file/|download\\.php\\?)([0-9a-z]{15})", - "status": false + "status": true }, "mixdrop": { "name": "mixdrop", @@ -595,7 +595,7 @@ "(simfileshare\\.net/download/[0-9]+/)" ], "regexp": "(simfileshare\\.net/download/[0-9]+/)", - "status": false + "status": true }, "streamtape": { "name": "streamtape", diff --git a/cmdlet/convert_file.py b/cmdlet/convert_file.py index c121a66..fac8608 100644 --- a/cmdlet/convert_file.py +++ b/cmdlet/convert_file.py @@ -83,11 +83,13 @@ def _detect_kind(ext: str) -> str: return "unknown" -def _allowed(source_kind: str, target_kind: str) -> bool: +def _allowed(source_kind: str, target_kind: str, target_ext: str = "") -> bool: if source_kind == target_kind: return True if source_kind == "video" and target_kind == "audio": return True + if source_kind == "video" and target_kind == "image" and target_ext.lower().lstrip(".") == "gif": + return True return False @@ -173,7 +175,7 @@ CMDLET = Cmdlet( SharedArgs.DELETE, ], detail=[ - "Allows video↔video, audio↔audio, image↔image, doc↔doc, and video→audio conversions.", + "Allows video↔video, audio↔audio, image↔image, doc↔doc, video→audio, and video→gif conversions.", "Disallows incompatible conversions (e.g., video→pdf).", "Uses ffmpeg for media and pypandoc-binary (bundled pandoc) for document formats (mobi/epub→pdf/txt/etc); PDF output uses the tectonic LaTeX engine when available.", ], @@ -250,7 +252,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: source_ext = input_path.suffix.lower().lstrip(".") source_kind = _detect_kind(source_ext) - if not _allowed(source_kind, target_kind): + if not _allowed(source_kind, target_kind, target_fmt): log( f"Conversion from {source_kind or 'unknown'} to {target_kind} is not allowed", file=sys.stderr, diff --git a/cmdlet/download_file.py b/cmdlet/download_file.py index 1e6c49d..8b4e9ff 100644 --- a/cmdlet/download_file.py +++ b/cmdlet/download_file.py @@ -109,6 +109,14 @@ class Download_File(Cmdlet): debug(f"[download-file] run invoked with args: {list(args)}") return self._run_impl(result, args, config) + @staticmethod + def _path_from_download_result(result_obj: Any) -> Path: + """Normalize downloader return values to a concrete filesystem path.""" + resolved = coerce_to_path(result_obj) + if resolved is None: + raise DownloadError("Could not determine downloaded file path") + return resolved + def _process_explicit_urls( self, *, diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py index b29c1fa..f8d989d 100644 --- a/cmdlet/search_file.py +++ b/cmdlet/search_file.py @@ -3,11 +3,15 @@ from __future__ import annotations from typing import Any, Dict, Sequence, List, Optional +from collections import deque import uuid from pathlib import Path import re import json import sys +import html +import time +from urllib.parse import urlparse, parse_qs, unquote, urljoin from SYS.logger import log, debug from ProviderCore.registry import get_search_provider, list_search_providers @@ -114,6 +118,8 @@ class search_file(Cmdlet): "search-file -query 'url:youtube.com' # Files whose URL contains substring", "search-file -query 'ext:png' # Files whose metadata ext is png", "search-file -query 'system:filetype = png' # Hydrus: native", + "search-file 'example.com/path' -query 'ext:pdf' # Web: site:example.com filetype:pdf", + "search-file -query 'site:example.com filetype:epub history' # Web: site-scoped search", "", "Provider search (-provider):", "search-file -provider youtube 'tutorial' # Search YouTube provider", @@ -125,6 +131,1085 @@ class search_file(Cmdlet): self.register() # --- Helper methods ------------------------------------------------- + @staticmethod + def _normalize_host(value: Any) -> str: + """Normalize host names for matching/filtering.""" + host = str(value or "").strip().lower() + if host.startswith("www."): + host = host[4:] + if ":" in host: + host = host.split(":", 1)[0] + return host + + @classmethod + def _extract_site_host(cls, candidate: Any) -> Optional[str]: + """Extract a host/domain from URL-like input.""" + raw = str(candidate or "").strip().strip('"').strip("'") + if not raw: + return None + + if raw.lower().startswith("site:"): + raw = raw.split(":", 1)[1].strip() + + parsed = None + try: + parsed = urlparse(raw) + except Exception: + parsed = None + + if parsed is None or not getattr(parsed, "hostname", None): + try: + parsed = urlparse(f"https://{raw}") + except Exception: + parsed = None + + host = "" + try: + host = str(getattr(parsed, "hostname", "") or "").strip().lower() + except Exception: + host = "" + + host = cls._normalize_host(host) + if not host or "." not in host: + return None + return host + + @staticmethod + def _normalize_space(text: Any) -> str: + return re.sub(r"\s+", " ", str(text or "")).strip() + + @classmethod + def _build_web_search_plan( + cls, + *, + query: str, + positional_args: List[str], + storage_backend: Optional[str], + store_filter: Optional[str], + hash_query: List[str], + ) -> Optional[Dict[str, Any]]: + """Build web-search plan for URL + ext/filetype query syntax. + + Example input: + search-file "example.com/foo" -query "ext:pdf" + Produces: + site:example.com filetype:pdf + """ + if storage_backend or store_filter or hash_query: + return None + + text = cls._normalize_space(query) + if not text: + return None + + # Avoid hijacking explicit local search DSL (url:, tag:, hash:, etc.). + local_markers = ("url:", "hash:", "tag:", "store:", "system:") + if any(marker in text.lower() for marker in local_markers): + return None + + site_host: Optional[str] = None + site_from_positional = False + site_token_to_strip = "" + seed_url = "" + + site_match = re.search(r"(?:^|\s)site:([^\s,]+)", text, flags=re.IGNORECASE) + if site_match: + site_host = cls._extract_site_host(site_match.group(1)) + seed_url = str(site_match.group(1) or "").strip() + + if not site_host and positional_args: + site_host = cls._extract_site_host(positional_args[0]) + site_from_positional = bool(site_host) + if site_from_positional: + site_token_to_strip = str(positional_args[0] or "").strip() + seed_url = site_token_to_strip + + if not site_host: + for token in text.split(): + candidate = str(token or "").strip().strip(",") + if not candidate: + continue + lower_candidate = candidate.lower() + if lower_candidate.startswith(("ext:", "filetype:", "type:", "site:")): + continue + if re.match(r"^[a-z]+:", lower_candidate) and not lower_candidate.startswith( + ("http://", "https://") + ): + continue + guessed = cls._extract_site_host(candidate) + if guessed: + site_host = guessed + site_token_to_strip = candidate + break + + if not site_host: + return None + + filetype_match = re.search( + r"(?:^|\s)(?:ext|filetype|type):\.?([a-z0-9]{1,12})\b", + text, + flags=re.IGNORECASE, + ) + filetype = cls._normalize_extension(filetype_match.group(1)) if filetype_match else "" + + # Feature gate: trigger this web-search mode when filetype is present + # or user explicitly provided site: syntax. + has_explicit_site = bool(site_match) + if not filetype and not has_explicit_site: + return None + + residual = text + residual = re.sub(r"(?:^|\s)site:[^\s,]+", " ", residual, flags=re.IGNORECASE) + residual = re.sub( + r"(?:^|\s)(?:ext|filetype|type):\.?[a-z0-9]{1,12}\b", + " ", + residual, + flags=re.IGNORECASE, + ) + + if site_from_positional and positional_args: + first = str(positional_args[0] or "").strip() + if first: + residual = re.sub(rf"(?:^|\s){re.escape(first)}(?:\s|$)", " ", residual, count=1) + elif site_token_to_strip: + residual = re.sub( + rf"(?:^|\s){re.escape(site_token_to_strip)}(?:\s|$)", + " ", + residual, + count=1, + ) + + residual = cls._normalize_space(residual) + + search_terms: List[str] = [f"site:{site_host}"] + if filetype: + search_terms.append(f"filetype:{filetype}") + if residual: + search_terms.append(residual) + + search_query = " ".join(search_terms).strip() + if not search_query: + return None + + normalized_seed_url = cls._normalize_seed_url(seed_url, site_host) + + return { + "site_host": site_host, + "filetype": filetype, + "search_query": search_query, + "residual": residual, + "seed_url": normalized_seed_url, + } + + @classmethod + def _normalize_seed_url(cls, seed_value: Any, site_host: str) -> str: + """Build a safe crawl starting URL from user input and resolved host.""" + raw = str(seed_value or "").strip().strip("'\"") + if not raw: + raw = str(site_host or "").strip() + + if raw and not raw.startswith(("http://", "https://")): + raw = f"https://{raw}" + + try: + parsed = urlparse(raw) + except Exception: + parsed = urlparse("") + + target = cls._normalize_host(site_host) + host = cls._normalize_host(getattr(parsed, "hostname", "") or "") + if target and host and not (host == target or host.endswith(f".{target}")): + return f"https://{target}/" + + scheme = str(getattr(parsed, "scheme", "") or "https").lower() + if scheme not in {"http", "https"}: + scheme = "https" + + netloc = str(getattr(parsed, "netloc", "") or "").strip() + if not netloc: + netloc = target + path = str(getattr(parsed, "path", "") or "").strip() + if not path: + path = "/" + + return f"{scheme}://{netloc}{path}" + + @staticmethod + def _is_probable_html_path(path_value: str) -> bool: + """Return True when URL path likely points to an HTML page.""" + path = str(path_value or "").strip() + if not path: + return True + suffix = Path(path).suffix.lower() + if not suffix: + return True + return suffix in {".html", ".htm", ".php", ".asp", ".aspx", ".jsp", ".shtml", ".xhtml"} + + @classmethod + def _extract_html_links(cls, *, html_text: str, base_url: str) -> List[str]: + """Extract absolute links from an HTML document.""" + links: List[str] = [] + seen: set[str] = set() + + def _add_link(raw_href: Any) -> None: + href = str(raw_href or "").strip() + if not href or href.startswith(("#", "javascript:", "mailto:")): + return + try: + absolute = urljoin(base_url, href) + parsed = urlparse(absolute) + except Exception: + return + if str(getattr(parsed, "scheme", "") or "").lower() not in {"http", "https"}: + return + clean = parsed._replace(fragment="").geturl() + if clean in seen: + return + seen.add(clean) + links.append(clean) + + try: + from lxml import html as lxml_html + + doc = lxml_html.fromstring(html_text or "") + for node in doc.xpath("//a[@href]"): + _add_link(node.get("href")) + except Exception: + href_pattern = re.compile(r']+href=["\']([^"\']+)["\']', flags=re.IGNORECASE) + for match in href_pattern.finditer(html_text or ""): + _add_link(match.group(1)) + + return links + + @classmethod + def _crawl_site_for_extension( + cls, + *, + seed_url: str, + site_host: str, + extension: str, + limit: int, + max_duration_seconds: float = 15.0, + ) -> List[Dict[str, str]]: + """Fallback crawler that discovers in-site file links by extension.""" + from API.requests_client import get_requests_session + + normalized_ext = cls._normalize_extension(extension) + if not normalized_ext: + return [] + + start_url = cls._normalize_seed_url(seed_url, site_host) + if not start_url: + return [] + + session = get_requests_session() + headers = { + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/124.0.0.0 Safari/537.36" + ), + "Accept-Language": "en-US,en;q=0.9", + } + + queue: deque[str] = deque([start_url]) + queued: set[str] = {start_url} + visited_pages: set[str] = set() + seen_files: set[str] = set() + rows: List[Dict[str, str]] = [] + normalized_limit = max(1, min(int(limit or 1), 100)) + max_pages = max(8, min(normalized_limit * 4, 64)) + crawl_deadline = time.monotonic() + max(5.0, float(max_duration_seconds or 0.0)) + + while ( + queue + and len(visited_pages) < max_pages + and len(rows) < normalized_limit + and time.monotonic() < crawl_deadline + ): + page_url = queue.popleft() + queued.discard(page_url) + if page_url in visited_pages: + continue + visited_pages.add(page_url) + + if time.monotonic() >= crawl_deadline: + break + + try: + response = session.get(page_url, timeout=(4, 8), headers=headers) + response.raise_for_status() + except Exception: + continue + + final_url = str(getattr(response, "url", "") or page_url) + try: + parsed_final = urlparse(final_url) + except Exception: + continue + + final_host = cls._normalize_host(getattr(parsed_final, "hostname", "") or "") + if not cls._url_matches_site(final_url, site_host): + continue + + final_path = str(getattr(parsed_final, "path", "") or "") + direct_ext = cls._normalize_extension(Path(final_path).suffix) + if direct_ext == normalized_ext: + file_url = parsed_final._replace(fragment="").geturl() + if file_url not in seen_files: + seen_files.add(file_url) + title = Path(unquote(final_path)).name or file_url + rows.append( + { + "url": file_url, + "title": title, + "snippet": "Discovered via in-site crawl", + } + ) + continue + + content_type = str((response.headers or {}).get("content-type", "") or "").lower() + if "html" not in content_type and "xhtml" not in content_type: + continue + + html_text = str(getattr(response, "text", "") or "") + if not html_text: + continue + if len(html_text) > 2_500_000: + # Avoid parsing extremely large pages during fallback crawl mode. + continue + + discovered_links = cls._extract_html_links(html_text=html_text, base_url=final_url) + for idx, target in enumerate(discovered_links): + if len(rows) >= normalized_limit: + break + if idx >= 300: + break + if time.monotonic() >= crawl_deadline: + break + try: + parsed_target = urlparse(target) + except Exception: + continue + target_host = cls._normalize_host(getattr(parsed_target, "hostname", "") or "") + if not target_host or not (target_host == final_host or target_host.endswith(f".{site_host}")): + if not cls._url_matches_site(target, site_host): + continue + + target_clean = parsed_target._replace(fragment="").geturl() + target_path = str(getattr(parsed_target, "path", "") or "") + target_ext = cls._normalize_extension(Path(target_path).suffix) + + if target_ext == normalized_ext: + if target_clean in seen_files: + continue + seen_files.add(target_clean) + title = Path(unquote(target_path)).name or target_clean + rows.append( + { + "url": target_clean, + "title": title, + "snippet": f"Discovered via crawl from {final_path or '/'}", + } + ) + continue + + if cls._is_probable_html_path(target_path): + if target_clean not in visited_pages and target_clean not in queued: + queue.append(target_clean) + queued.add(target_clean) + + if time.monotonic() >= crawl_deadline: + debug( + "Web crawl fallback reached time budget", + { + "site": site_host, + "visited_pages": len(visited_pages), + "queued_pages": len(queue), + "results": len(rows), + "time_budget_seconds": max_duration_seconds, + }, + ) + + return rows[:normalized_limit] + + @staticmethod + def _extract_duckduckgo_target_url(href: Any) -> str: + """Extract direct target URL from DuckDuckGo result links.""" + raw_href = str(href or "").strip() + if not raw_href: + return "" + + if raw_href.startswith("//"): + raw_href = f"https:{raw_href}" + + if raw_href.startswith("/"): + raw_href = f"https://duckduckgo.com{raw_href}" + + parsed = None + try: + parsed = urlparse(raw_href) + except Exception: + parsed = None + + try: + host = str(getattr(parsed, "hostname", "") or "").strip().lower() + except Exception: + host = "" + + if host.endswith("duckduckgo.com"): + try: + query = parse_qs(str(getattr(parsed, "query", "") or "")) + candidate = (query.get("uddg") or [""])[0] + if candidate: + return str(unquote(candidate)).strip() + except Exception: + pass + + return raw_href + + @staticmethod + def _extract_yahoo_target_url(href: Any) -> str: + """Extract direct target URL from Yahoo redirect links.""" + raw_href = str(href or "").strip() + if not raw_href: + return "" + + # Yahoo result links often look like: + # https://r.search.yahoo.com/.../RU=/RK=... + ru_match = re.search(r"/RU=([^/]+)/RK=", raw_href, flags=re.IGNORECASE) + if ru_match: + try: + return str(unquote(ru_match.group(1))).strip() + except Exception: + pass + + # Fallback for query-string variants. + try: + parsed = urlparse(raw_href) + query = parse_qs(str(getattr(parsed, "query", "") or "")) + candidate = (query.get("RU") or query.get("ru") or [""])[0] + if candidate: + return str(unquote(candidate)).strip() + except Exception: + pass + + return raw_href + + @classmethod + def _url_matches_site(cls, url: str, site_host: str) -> bool: + """Return True when URL host is the requested site/subdomain.""" + try: + parsed = urlparse(str(url or "")) + host = cls._normalize_host(getattr(parsed, "hostname", "") or "") + except Exception: + return False + + target = cls._normalize_host(site_host) + if not host or not target: + return False + return host == target or host.endswith(f".{target}") + + @classmethod + def _parse_duckduckgo_results( + cls, + *, + html_text: str, + site_host: str, + limit: int, + ) -> List[Dict[str, str]]: + """Parse DuckDuckGo HTML results into normalized rows.""" + items: List[Dict[str, str]] = [] + seen_urls: set[str] = set() + + def _add_item(url_text: str, title_text: str, snippet_text: str) -> None: + url_clean = str(url_text or "").strip() + if not url_clean: + return + if not url_clean.startswith(("http://", "https://")): + return + if not cls._url_matches_site(url_clean, site_host): + return + if url_clean in seen_urls: + return + + seen_urls.add(url_clean) + title_clean = cls._normalize_space(title_text) + snippet_clean = cls._normalize_space(snippet_text) + items.append( + { + "url": url_clean, + "title": title_clean or url_clean, + "snippet": snippet_clean, + } + ) + + # Preferred parser path (lxml is already a project dependency). + try: + from lxml import html as lxml_html + + doc = lxml_html.fromstring(html_text or "") + result_nodes = doc.xpath("//div[contains(@class, 'result')]") + + for node in result_nodes: + links = node.xpath(".//a[contains(@class, 'result__a')]") + if not links: + continue + + link = links[0] + href = cls._extract_duckduckgo_target_url(link.get("href")) + title = " ".join([str(t).strip() for t in link.itertext() if str(t).strip()]) + + snippet_nodes = node.xpath(".//*[contains(@class, 'result__snippet')]") + snippet = "" + if snippet_nodes: + snippet = " ".join( + [str(t).strip() for t in snippet_nodes[0].itertext() if str(t).strip()] + ) + + _add_item(href, title, snippet) + if len(items) >= limit: + break + except Exception: + # Fallback to regex parser below. + pass + + if items: + return items[:limit] + + # Regex fallback for environments where HTML parsing fails. + anchor_pattern = re.compile( + r']+class="[^"]*result__a[^"]*"[^>]+href="([^"]+)"[^>]*>(.*?)', + flags=re.IGNORECASE | re.DOTALL, + ) + for match in anchor_pattern.finditer(html_text or ""): + href = cls._extract_duckduckgo_target_url(match.group(1)) + title_html = match.group(2) + title = re.sub(r"<[^>]+>", " ", str(title_html or "")) + title = html.unescape(title) + _add_item(href, title, "") + if len(items) >= limit: + break + + return items[:limit] + + @classmethod + def _parse_yahoo_results( + cls, + *, + html_text: str, + site_host: str, + limit: int, + ) -> List[Dict[str, str]]: + """Parse Yahoo HTML search results into normalized rows.""" + items: List[Dict[str, str]] = [] + seen_urls: set[str] = set() + + def _add_item(url_text: str, title_text: str, snippet_text: str) -> None: + url_clean = str(url_text or "").strip() + if not url_clean or not url_clean.startswith(("http://", "https://")): + return + if not cls._url_matches_site(url_clean, site_host): + return + if url_clean in seen_urls: + return + seen_urls.add(url_clean) + items.append( + { + "url": url_clean, + "title": cls._normalize_space(title_text) or url_clean, + "snippet": cls._normalize_space(snippet_text), + } + ) + + try: + from lxml import html as lxml_html + + doc = lxml_html.fromstring(html_text or "") + for node in doc.xpath("//a[@href]"): + href = cls._extract_yahoo_target_url(node.get("href")) + title = " ".join([str(t).strip() for t in node.itertext() if str(t).strip()]) + _add_item(href, title, "") + if len(items) >= limit: + break + except Exception: + anchor_pattern = re.compile( + r']+href=["\']([^"\']+)["\'][^>]*>(.*?)', + flags=re.IGNORECASE | re.DOTALL, + ) + for match in anchor_pattern.finditer(html_text or ""): + href = cls._extract_yahoo_target_url(match.group(1)) + title_html = match.group(2) + title = re.sub(r"<[^>]+>", " ", str(title_html or "")) + title = html.unescape(title) + _add_item(href, title, "") + if len(items) >= limit: + break + + return items[:limit] + + @classmethod + def _query_yahoo( + cls, + *, + search_query: str, + site_host: str, + limit: int, + session: Any, + deadline: Optional[float] = None, + ) -> List[Dict[str, str]]: + """Fetch results from Yahoo search (robust fallback in bot-protected envs).""" + all_rows: List[Dict[str, str]] = [] + seen_urls: set[str] = set() + + max_pages = max(1, min((max(1, int(limit or 1)) + 9) // 10, 3)) + for page_idx in range(max_pages): + if deadline is not None and time.monotonic() >= deadline: + break + + params = { + "p": search_query, + "n": "10", + "b": str((page_idx * 10) + 1), + } + try: + read_timeout = 10.0 + if deadline is not None: + remaining = max(0.0, float(deadline - time.monotonic())) + if remaining <= 0.0: + break + read_timeout = max(3.0, min(10.0, remaining)) + + response = session.get( + "https://search.yahoo.com/search", + params=params, + timeout=(3, read_timeout), + headers={ + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/124.0.0.0 Safari/537.36" + ), + "Accept-Language": "en-US,en;q=0.9", + }, + ) + response.raise_for_status() + except Exception: + break + + page_rows = cls._parse_yahoo_results( + html_text=response.text, + site_host=site_host, + limit=max(1, limit - len(all_rows)), + ) + new_rows = 0 + for row in page_rows: + url_value = str(row.get("url") or "").strip() + if not url_value or url_value in seen_urls: + continue + seen_urls.add(url_value) + all_rows.append(row) + new_rows += 1 + if len(all_rows) >= limit: + break + + if len(all_rows) >= limit or new_rows == 0: + break + + return all_rows[:limit] + + @classmethod + def _parse_bing_results( + cls, + *, + html_text: str, + site_host: str, + limit: int, + ) -> List[Dict[str, str]]: + """Parse Bing HTML search results into normalized rows.""" + items: List[Dict[str, str]] = [] + seen_urls: set[str] = set() + + def _add_item(url_text: str, title_text: str, snippet_text: str) -> None: + url_clean = str(url_text or "").strip() + if not url_clean or not url_clean.startswith(("http://", "https://")): + return + if not cls._url_matches_site(url_clean, site_host): + return + if url_clean in seen_urls: + return + seen_urls.add(url_clean) + items.append( + { + "url": url_clean, + "title": cls._normalize_space(title_text) or url_clean, + "snippet": cls._normalize_space(snippet_text), + } + ) + + try: + from lxml import html as lxml_html + + doc = lxml_html.fromstring(html_text or "") + result_nodes = doc.xpath("//li[contains(@class, 'b_algo')]") + + for node in result_nodes: + links = node.xpath(".//h2/a") + if not links: + continue + link = links[0] + href = str(link.get("href") or "").strip() + title = " ".join([str(t).strip() for t in link.itertext() if str(t).strip()]) + + snippet = "" + for sel in ( + ".//*[contains(@class,'b_caption')]//p", + ".//*[contains(@class,'b_snippet')]", + ".//p", + ): + snip_nodes = node.xpath(sel) + if snip_nodes: + snippet = " ".join( + [str(t).strip() for t in snip_nodes[0].itertext() if str(t).strip()] + ) + break + + _add_item(href, title, snippet) + if len(items) >= limit: + break + except Exception: + anchor_pattern = re.compile( + r"]*>\s*]+href=\"([^\"]+)\"[^>]*>(.*?)", + flags=re.IGNORECASE | re.DOTALL, + ) + for match in anchor_pattern.finditer(html_text or ""): + href = match.group(1) + title = re.sub(r"<[^>]+>", " ", str(match.group(2) or "")) + title = html.unescape(title) + _add_item(href, title, "") + if len(items) >= limit: + break + + return items[:limit] + + @classmethod + def _query_web_search( + cls, + *, + search_query: str, + site_host: str, + limit: int, + ) -> List[Dict[str, str]]: + """Execute web search and return parsed result rows. + + Uses Yahoo first (works in environments where Bing/DDG HTML endpoints + are challenge-gated), then Bing, then DuckDuckGo. + """ + from API.requests_client import get_requests_session + + session = get_requests_session() + normalized_limit = max(1, min(int(limit or 1), 100)) + engine_deadline = time.monotonic() + 12.0 + + # Yahoo often remains parseable where other engines challenge bots. + all_rows = cls._query_yahoo( + search_query=search_query, + site_host=site_host, + limit=normalized_limit, + session=session, + deadline=engine_deadline, + ) + if all_rows: + return all_rows[:normalized_limit] + + # Bing reliably supports filetype: and site: operators when not challenged. + all_rows = cls._query_bing( + search_query=search_query, + site_host=site_host, + limit=normalized_limit, + session=session, + deadline=engine_deadline, + ) + if all_rows: + return all_rows[:normalized_limit] + + # DDG fallback. + all_rows_ddg: List[Dict[str, str]] = [] + seen_urls: set[str] = set() + endpoints = [ + "https://html.duckduckgo.com/html/", + "https://duckduckgo.com/html/", + ] + for endpoint in endpoints: + if time.monotonic() >= engine_deadline: + break + max_offsets = min(3, max(1, (normalized_limit + 29) // 30)) + for page_idx in range(max_offsets): + if time.monotonic() >= engine_deadline: + break + offset = page_idx * 30 + params = {"q": search_query, "s": str(offset)} + remaining = max(0.0, float(engine_deadline - time.monotonic())) + if remaining <= 0.0: + break + read_timeout = max(3.0, min(10.0, remaining)) + response = session.get( + endpoint, + params=params, + timeout=(3, read_timeout), + headers={"Referer": "https://duckduckgo.com/"}, + ) + response.raise_for_status() + page_rows = cls._parse_duckduckgo_results( + html_text=response.text, + site_host=site_host, + limit=max(1, normalized_limit - len(all_rows_ddg)), + ) + new_rows = 0 + for row in page_rows: + url_value = str(row.get("url") or "").strip() + if not url_value or url_value in seen_urls: + continue + seen_urls.add(url_value) + all_rows_ddg.append(row) + new_rows += 1 + if len(all_rows_ddg) >= normalized_limit: + break + if len(all_rows_ddg) >= normalized_limit or new_rows == 0: + break + if all_rows_ddg: + break + + return all_rows_ddg[:normalized_limit] + + @classmethod + def _query_bing( + cls, + *, + search_query: str, + site_host: str, + limit: int, + session: Any, + deadline: Optional[float] = None, + ) -> List[Dict[str, str]]: + """Fetch results from Bing (supports filetype: and site: natively).""" + all_rows: List[Dict[str, str]] = [] + seen_urls: set[str] = set() + + page_start = 1 + pages_checked = 0 + max_pages = max(1, min((max(1, int(limit or 1)) + 49) // 50, 3)) + while len(all_rows) < limit and pages_checked < max_pages: + if deadline is not None and time.monotonic() >= deadline: + break + + params = {"q": search_query, "first": str(page_start), "count": "50"} + try: + read_timeout = 10.0 + if deadline is not None: + remaining = max(0.0, float(deadline - time.monotonic())) + if remaining <= 0.0: + break + read_timeout = max(3.0, min(10.0, remaining)) + + response = session.get( + "https://www.bing.com/search", + params=params, + timeout=(3, read_timeout), + headers={ + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/124.0.0.0 Safari/537.36" + ), + "Accept-Language": "en-US,en;q=0.9", + }, + ) + response.raise_for_status() + except Exception: + break + + page_rows = cls._parse_bing_results( + html_text=response.text, + site_host=site_host, + limit=max(1, limit - len(all_rows)), + ) + new_rows = 0 + for row in page_rows: + url_value = str(row.get("url") or "").strip() + if not url_value or url_value in seen_urls: + continue + seen_urls.add(url_value) + all_rows.append(row) + new_rows += 1 + if len(all_rows) >= limit: + break + + if new_rows == 0 or len(all_rows) >= limit: + break + page_start += 50 + pages_checked += 1 + + return all_rows + + def _run_web_search( + self, + *, + web_plan: Dict[str, Any], + limit: int, + args_list: List[str], + refresh_mode: bool, + command_title: str, + ) -> int: + """Execute URL-scoped web search and emit downloadable table rows.""" + site_host = str(web_plan.get("site_host") or "").strip().lower() + search_query = str(web_plan.get("search_query") or "").strip() + requested_type = self._normalize_extension(web_plan.get("filetype") or "") + seed_url = str(web_plan.get("seed_url") or "").strip() + + if not site_host or not search_query: + log("Error: invalid website search request", file=sys.stderr) + return 1 + + worker_id = str(uuid.uuid4()) + try: + insert_worker( + worker_id, + "search-file", + title=f"Web Search: {search_query}", + description=f"Site: {site_host}", + ) + except Exception: + pass + + try: + from SYS.result_table import Table + + rows = self._query_web_search( + search_query=search_query, + site_host=site_host, + limit=limit, + ) + + if not rows and requested_type: + debug( + "Web search returned 0 rows; falling back to in-site crawl", + {"site": site_host, "ext": requested_type, "seed_url": seed_url}, + ) + rows = self._crawl_site_for_extension( + seed_url=seed_url or f"https://{site_host}/", + site_host=site_host, + extension=requested_type, + limit=limit, + max_duration_seconds=10.0, + ) + + table = Table(command_title) + table.set_table("web.search") + table.set_source_command("search-file", list(args_list)) + try: + table.set_table_metadata( + { + "provider": "web", + "site": site_host, + "query": search_query, + "filetype": requested_type, + } + ) + except Exception: + pass + + if not rows: + log(f"No web results found for query: {search_query}", file=sys.stderr) + if refresh_mode: + try: + ctx.set_last_result_table_preserve_history(table, []) + except Exception: + pass + try: + append_worker_stdout(worker_id, json.dumps([], indent=2)) + update_worker(worker_id, status="completed") + except Exception: + pass + return 0 + + results_list: List[Dict[str, Any]] = [] + for row in rows: + target_url = str(row.get("url") or "").strip() + if not target_url: + continue + + source_title = str(row.get("title") or "").strip() + title = source_title or target_url + snippet = self._normalize_space(row.get("snippet") or "") + if len(snippet) > 120: + snippet = f"{snippet[:117].rstrip()}..." + + detected_ext = requested_type + file_name = "" + if not detected_ext: + try: + parsed_path = Path(urlparse(target_url).path) + file_name = Path(unquote(str(parsed_path))).name + detected_ext = self._normalize_extension(parsed_path.suffix) + except Exception: + detected_ext = "" + else: + try: + file_name = Path(unquote(urlparse(target_url).path)).name + except Exception: + file_name = "" + + # For filetype-based web searches, prefer a concise filename title. + if file_name: + title = file_name + + payload: Dict[str, Any] = { + "title": title, + "path": target_url, + "url": target_url, + "source": "web", + "store": "web", + "table": "web.search", + "ext": detected_ext, + "detail": snippet, + "tag": [f"site:{site_host}"] + ([f"type:{detected_ext}"] if detected_ext else []), + "columns": [ + ("Title", title), + ("Type", detected_ext), + ("URL", target_url), + ], + "_selection_args": ["-url", target_url], + "_selection_action": ["download-file", "-url", target_url], + } + + table.add_result(payload) + results_list.append(payload) + ctx.emit(payload) + + if refresh_mode: + ctx.set_last_result_table_preserve_history(table, results_list) + else: + ctx.set_last_result_table(table, results_list) + + ctx.set_current_stage_table(table) + + try: + append_worker_stdout(worker_id, json.dumps(results_list, indent=2)) + update_worker(worker_id, status="completed") + except Exception: + pass + + return 0 + + except Exception as exc: + log(f"Web search failed: {exc}", file=sys.stderr) + try: + update_worker(worker_id, status="error") + except Exception: + pass + return 1 + @staticmethod def _normalize_extension(ext_value: Any) -> str: """Sanitize extension strings to alphanumerics and cap at 5 chars.""" @@ -566,6 +1651,7 @@ class search_file(Cmdlet): limit = 100 limit_set = False searched_backends: List[str] = [] + positional_args: List[str] = [] i = 0 while i < len(args_list): @@ -602,6 +1688,7 @@ class search_file(Cmdlet): limit = 100 i += 2 elif not arg.startswith("-"): + positional_args.append(arg) query = f"{query} {arg}".strip() if query else arg i += 1 else: @@ -685,6 +1772,22 @@ class search_file(Cmdlet): hash_query = parse_hash_query(query) + web_plan = self._build_web_search_plan( + query=query, + positional_args=positional_args, + storage_backend=storage_backend, + store_filter=store_filter, + hash_query=hash_query, + ) + if web_plan is not None: + return self._run_web_search( + web_plan=web_plan, + limit=limit, + args_list=args_list, + refresh_mode=refresh_mode, + command_title=command_title, + ) + if not query: log("Provide a search query", file=sys.stderr) return 1