This commit is contained in:
2026-01-17 03:37:11 -08:00
parent c6fd6b4224
commit 5e76c44155
2 changed files with 294 additions and 22 deletions

View File

@@ -3060,20 +3060,17 @@ def check_url_exists_in_storage(
in_pipeline = bool(stage_ctx is not None or ("|" in str(current_cmd_text or "")))
if in_pipeline:
try:
cached_cmd = pipeline_context.load_value("preflight.url_duplicates.command", default="")
cached_decision = pipeline_context.load_value("preflight.url_duplicates.continue", default=None)
already_checked = bool(
pipeline_context.load_value(
"preflight.url_duplicates.checked", default=False
)
)
except Exception:
cached_cmd = ""
cached_decision = None
already_checked = False
if cached_decision is not None and str(cached_cmd or "") == str(current_cmd_text or ""):
if bool(cached_decision):
return True
try:
pipeline_context.request_pipeline_stop(reason="duplicate-url declined", exit_code=0)
except Exception:
pass
return False
if already_checked:
debug("Bulk URL preflight: already checked in pipeline; skipping duplicate check")
return True
def _load_preflight_cache() -> Dict[str, Any]:
try:
@@ -3088,6 +3085,40 @@ def check_url_exists_in_storage(
except Exception:
pass
def _mark_preflight_checked() -> None:
    """Record that the duplicate-URL preflight has run for this pipeline.

    Does nothing outside a pipeline.  Inside one, the flag is written both
    to the pipeline context (``preflight.url_duplicates.checked``) and to
    the preflight cache (top-level ``url_duplicates_checked`` plus
    ``url_duplicates["checked"]``).
    """
    if not in_pipeline:
        return

    # Best effort: a failing context store must not abort the preflight.
    try:
        pipeline_context.store_value("preflight.url_duplicates.checked", True)
    except Exception:
        pass

    cache = _load_preflight_cache()
    cache["url_duplicates_checked"] = True

    # Reuse the existing per-feature entry when it is a dict, otherwise
    # start a fresh one.
    dup_entry = cache.get("url_duplicates")
    dup_entry = dup_entry if isinstance(dup_entry, dict) else {}
    dup_entry["checked"] = True

    cache["url_duplicates"] = dup_entry
    _store_preflight_cache(cache)
# Replay a decision stored earlier in this pipeline instead of checking again.
# This only applies inside a pipeline, and only when the stored decision was
# recorded for the same command text as the current one.
if in_pipeline:
try:
cached_cmd = pipeline_context.load_value("preflight.url_duplicates.command", default="")
cached_decision = pipeline_context.load_value("preflight.url_duplicates.continue", default=None)
except Exception:
# Treat an unreadable pipeline context as "no decision stored".
cached_cmd = ""
cached_decision = None
if cached_decision is not None and str(cached_cmd or "") == str(current_cmd_text or ""):
# Mark the preflight as done before returning either way.
_mark_preflight_checked()
if bool(cached_decision):
return True
# The stored decision was a decline: request a pipeline stop (exit code 0)
# and report False to the caller.
try:
pipeline_context.request_pipeline_stop(reason="duplicate-url declined", exit_code=0)
except Exception:
pass
return False
unique_urls: List[str] = []
for u in urls or []:
s = str(u or "").strip()
@@ -3107,6 +3138,46 @@ def check_url_exists_in_storage(
except Exception:
return False
def _normalize_url_for_search(value: str) -> str:
url = str(value or "").strip()
# Strip fragment (e.g., #t=10) before matching
url = url.split("#", 1)[0]
# Strip common time/tracking query params for matching
try:
parsed = urlparse(url)
except Exception:
parsed = None
if parsed is not None and parsed.query:
time_keys = {"t", "start", "time_continue", "timestamp", "time", "begin"}
tracking_prefixes = ("utm_",)
try:
pairs = parse_qsl(parsed.query, keep_blank_values=True)
filtered = []
for key, val in pairs:
key_norm = str(key or "").lower()
if key_norm in time_keys:
continue
if key_norm.startswith(tracking_prefixes):
continue
filtered.append((key, val))
if filtered:
url = urlunparse(parsed._replace(query=urlencode(filtered, doseq=True)))
else:
url = urlunparse(parsed._replace(query=""))
except Exception:
pass
# Remove protocol (http://, https://, ftp://, etc.)
url = re.sub(r"^[a-z][a-z0-9+.-]*://", "", url, flags=re.IGNORECASE)
# Remove www. prefix (case-insensitive)
url = re.sub(r"^www\.", "", url, flags=re.IGNORECASE)
return url.lower()
def _expand_url_variants(value: str) -> List[str]:
if not _httpish(value):
return []
@@ -3121,6 +3192,51 @@ def check_url_exists_in_storage(
out: List[str] = []
def _add_variant(candidate: str) -> None:
    """Register *candidate* and its case/``www.`` variants via ``_maybe_add``.

    Up to four strings are offered to ``_maybe_add``: the candidate itself,
    its lower-cased form, a form whose host is lower-cased with a leading
    ``www.`` removed, and the lower-cased version of that form.  The port
    and any userinfo are carried over into the rebuilt network location.

    Args:
        candidate: URL text to derive variants from.
    """
    _maybe_add(candidate)

    try:
        lower = str(candidate or "").lower()
    except Exception:
        lower = ""
    if lower and lower != candidate:
        _maybe_add(lower)

    try:
        parsed_candidate = urlparse(candidate)
    except Exception:
        # Unparseable input: only the plain/lower-cased forms are offered.
        return

    host = (parsed_candidate.hostname or "").strip().lower()
    if host.startswith("www."):
        host = host[4:]
    if not host:
        return

    # ``hostname`` strips the brackets of an IPv6 literal; put them back so
    # the rebuilt netloc stays valid (e.g. "[::1]:8080", not "::1:8080").
    netloc = f"[{host}]" if ":" in host else host

    try:
        # ``port`` raises ValueError on a malformed port; skip it then.
        if parsed_candidate.port:
            netloc = f"{netloc}:{parsed_candidate.port}"
    except Exception:
        pass

    try:
        if parsed_candidate.username or parsed_candidate.password:
            userinfo = parsed_candidate.username or ""
            if parsed_candidate.password:
                userinfo = f"{userinfo}:{parsed_candidate.password}"
            if userinfo:
                netloc = f"{userinfo}@{netloc}"
    except Exception:
        pass

    alt = urlunparse(parsed_candidate._replace(netloc=netloc))
    _maybe_add(alt)

    try:
        lower_alt = alt.lower()
    except Exception:
        lower_alt = ""
    if lower_alt and lower_alt != alt:
        _maybe_add(lower_alt)
def _maybe_add(candidate: str) -> None:
if not candidate or candidate == value:
return
@@ -3128,7 +3244,7 @@ def check_url_exists_in_storage(
out.append(candidate)
if parsed.fragment:
_maybe_add(urlunparse(parsed._replace(fragment="")))
_add_variant(urlunparse(parsed._replace(fragment="")))
time_keys = {"t", "start", "time_continue", "timestamp", "time", "begin"}
tracking_prefixes = ("utm_",)
@@ -3153,7 +3269,7 @@ def check_url_exists_in_storage(
if removed:
new_query = urlencode(filtered_pairs, doseq=True) if filtered_pairs else ""
_maybe_add(urlunparse(parsed._replace(query=new_query, fragment="")))
_add_variant(urlunparse(parsed._replace(query=new_query, fragment="")))
return out
@@ -3176,13 +3292,35 @@ def check_url_exists_in_storage(
continue
if n2 not in filtered:
filtered.append(n2)
lowered: List[str] = []
for n2 in filtered:
try:
lower = n2.lower()
except Exception:
lower = ""
if lower and lower != n2 and lower not in filtered and lower not in lowered:
lowered.append(lower)
normalized: List[str] = []
for n2 in filtered:
norm = _normalize_url_for_search(n2)
if norm and norm not in normalized and norm not in filtered:
normalized.append(norm)
expanded: List[str] = []
for n2 in filtered:
for extra in _expand_url_variants(n2):
if extra not in expanded and extra not in filtered:
if extra not in expanded and extra not in filtered and extra not in lowered:
expanded.append(extra)
norm_extra = _normalize_url_for_search(extra)
if (
norm_extra
and norm_extra not in normalized
and norm_extra not in filtered
and norm_extra not in expanded
and norm_extra not in lowered
):
normalized.append(norm_extra)
combined = filtered + expanded
combined = filtered + expanded + lowered + normalized
url_needles[u] = combined if combined else [u]
if in_pipeline:
@@ -3196,15 +3334,28 @@ def check_url_exists_in_storage(
if cached_set:
all_cached = True
for original_url, needles in url_needles.items():
if original_url in cached_set:
continue
if any(n in cached_set for n in (needles or [])):
original_cached = str(original_url or "") in cached_set
needles_cached = True
if original_cached:
for needle in (needles or []):
needle_text = str(needle or "")
if not needle_text:
continue
if needle_text not in cached_set:
needles_cached = False
break
else:
needles_cached = False
if original_cached and needles_cached:
continue
all_cached = False
break
if all_cached:
debug("Bulk URL preflight: cached for pipeline; skipping duplicate check")
_mark_preflight_checked()
return True
def _search_backend_url_hits(
@@ -3215,14 +3366,103 @@ def check_url_exists_in_storage(
) -> Optional[Dict[str, Any]]:
backend_hits: List[Dict[str, Any]] = []
for needle in (needles or [])[:3]:
needle_text = str(needle or "").strip()
if not needle_text:
continue
search_needle = _normalize_url_for_search(needle_text) or needle_text
query = f"url:*{search_needle}*"
try:
backend_hits = backend.search(f"url:{needle}", limit=1) or []
backend_hits = backend.search(query, limit=1) or []
if backend_hits:
break
except Exception:
continue
if not backend_hits:
def _match_normalized_url(pattern_text: str, candidate_url: str) -> bool:
    """Tell whether the normalized pattern occurs in the normalized candidate.

    Both arguments go through ``_normalize_url_for_search`` first.  The
    result is False when either normalized value is empty; otherwise it is
    True when the pattern equals, or is a substring of, the candidate.
    """
    needle = _normalize_url_for_search(pattern_text)
    haystack = _normalize_url_for_search(candidate_url)
    # Equality is just the degenerate case of substring containment.
    return bool(needle) and bool(haystack) and needle in haystack
fallback_hits: List[Dict[str, Any]] = []
try:
fallback_hits = backend.search("url:*", limit=200) or []
except Exception:
fallback_hits = []
for hit in fallback_hits:
url_values: List[str] = []
try:
raw_urls = get_field(hit, "known_urls") or get_field(hit, "urls") or get_field(hit, "url")
if isinstance(raw_urls, str) and raw_urls.strip():
url_values.append(raw_urls.strip())
elif isinstance(raw_urls, (list, tuple, set)):
for item in raw_urls:
if isinstance(item, str) and item.strip():
url_values.append(item.strip())
except Exception:
url_values = []
if not url_values:
try:
file_hash = hit.get("hash") if isinstance(hit, dict) else None
if file_hash:
fetched = backend.get_url(str(file_hash))
if isinstance(fetched, str) and fetched.strip():
url_values.append(fetched.strip())
elif isinstance(fetched, (list, tuple, set)):
for item in fetched:
if isinstance(item, str) and item.strip():
url_values.append(item.strip())
except Exception:
pass
if not url_values:
continue
matched = False
for url_value in url_values:
for needle in (needles or []):
if _match_normalized_url(str(needle or ""), str(url_value or "")):
matched = True
break
if matched:
break
if not matched:
continue
title = "(exists)"
try:
title = hit.get("title") or hit.get("name") or hit.get("target") or hit.get("path") or "(exists)"
except Exception:
title = "(exists)"
file_hash = ""
try:
file_hash = hit.get("hash") or hit.get("file_hash") or hit.get("sha256") or ""
except Exception:
file_hash = ""
return {
"title": str(title),
"store": str(hit.get("store") or backend_name),
"hash": str(file_hash or ""),
"ext": "",
"size": None,
"url": original_url,
"columns": [
("Title", str(title)),
("Store", str(hit.get("store") or backend_name)),
("Hash", str(file_hash or "")),
("URL", original_url),
],
}
return None
hit = backend_hits[0]
@@ -3326,7 +3566,7 @@ def check_url_exists_in_storage(
found_hash: Optional[str] = None
found = False
for needle in (needles or [])[:3]:
for needle in (needles or [])[:6]:
if not _httpish(needle):
continue
try:
@@ -3415,6 +3655,7 @@ def check_url_exists_in_storage(
url_dup_cache["urls"] = sorted(cached_set)
preflight_cache["url_duplicates"] = url_dup_cache
_store_preflight_cache(preflight_cache)
_mark_preflight_checked()
return True
table = ResultTable(f"URL already exists ({len(matched_urls)} url(s))", max_columns=10)
@@ -3477,6 +3718,8 @@ def check_url_exists_in_storage(
pipeline_context.request_pipeline_stop(reason="duplicate-url declined", exit_code=0)
except Exception:
pass
_mark_preflight_checked()
return False
_mark_preflight_checked()
return True

View File

@@ -7,7 +7,7 @@ from typing import Any, Dict, List, Sequence, Optional, Set, Tuple
import sys
import re
from fnmatch import fnmatch
from urllib.parse import urlparse
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
from . import _shared as sh
Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = (
@@ -67,6 +67,35 @@ class Get_Url(Cmdlet):
"""
url = str(url or "").strip()
# Strip fragment (e.g., #t=10) before matching
url = url.split("#", 1)[0]
# Strip common time/tracking query params for matching
try:
parsed = urlparse(url)
except Exception:
parsed = None
if parsed is not None and parsed.query:
time_keys = {"t", "start", "time_continue", "timestamp", "time", "begin"}
tracking_prefixes = ("utm_",)
try:
pairs = parse_qsl(parsed.query, keep_blank_values=True)
filtered = []
for key, val in pairs:
key_norm = str(key or "").lower()
if key_norm in time_keys:
continue
if key_norm.startswith(tracking_prefixes):
continue
filtered.append((key, val))
if filtered:
url = urlunparse(parsed._replace(query=urlencode(filtered, doseq=True)))
else:
url = urlunparse(parsed._replace(query=""))
except Exception:
pass
# Remove protocol (http://, https://, ftp://, etc.)
url = re.sub(r"^[a-z][a-z0-9+.-]*://", "", url, flags=re.IGNORECASE)