This commit is contained in:
2026-01-20 16:42:49 -08:00
parent 1e2054189b
commit 922b649e17
9 changed files with 351 additions and 141 deletions

View File

@@ -27,7 +27,7 @@ from SYS.models import (
)
from SYS.pipeline_progress import PipelineProgress
from SYS.utils import ensure_directory, sha256_file
from SYS.metadata import extract_ytdlp_tags
from SYS.yt_metadata import extract_ytdlp_tags
_YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {}
@@ -44,6 +44,96 @@ else:
_EXTRACTOR_CACHE: List[Any] | None = None
# Patterns for domain extraction from yt-dlp regexes
# 1) Alternation group followed by \.tld e.g. (?:youtube|youtu|youtube-nocookie)\.com
ALT_GROUP_TLD = re.compile(r'\((?:\?:)?([^\)]+)\)\\\.(?P<tld>[A-Za-z0-9.+-]+)')
# 2) Literal domain pieces like youtube\.com or youtu\.be (not preceded by a group)
LITERAL_DOMAIN = re.compile(r'(?<!\()(?<!\|)(?<!:)([A-Za-z0-9][A-Za-z0-9_-]{0,})\\\.([A-Za-z0-9.+-]+)')
# 3) Partial domain tokens that appear alone (e.g., zhihu) — treat as zhihu.com fallback
PARTIAL_TOKEN = re.compile(r'(?<![A-Za-z0-9_-])([A-Za-z0-9][A-Za-z0-9_-]{1,})(?=(?:\\?[/\)\$]|\\\.|$))')
_SUPPORTED_DOMAINS: set[str] | None = None
def normalize_patterns(valid_url) -> List[str]:
if not valid_url:
return []
if isinstance(valid_url, str):
return [valid_url]
if isinstance(valid_url, (list, tuple)):
return [p for p in valid_url if isinstance(p, str)]
return []
def extract_from_pattern(pat: str) -> set[str]:
domains = set()
# 1) Alternation groups followed by .tld
for alt_group, tld in ALT_GROUP_TLD.findall(pat):
# alt_group like "youtube|youtu|youtube-nocookie"
for alt in alt_group.split('|'):
alt = alt.strip()
# remove any non-domain tokens like (?:www\.)? if present inside alt (rare)
alt = re.sub(r'\(\?:www\\\.\)\?', '', alt)
if alt:
domains.add(f"{alt}.{tld}".lower())
# 2) Literal domain matches (youtube\.com)
for name, tld in LITERAL_DOMAIN.findall(pat):
domains.add(f"{name}.{tld}".lower())
# 3) Partial tokens fallback (only if we didn't already capture domains)
# This helps when regexes contain plain tokens like 'zhihu' or 'vimeo' without .com
if not domains:
for token in PARTIAL_TOKEN.findall(pat):
# ignore common regex words that are not domains
if len(token) <= 2:
continue
# avoid tokens that are clearly regex constructs
if token.lower() in {"https", "http", "www", "com", "net", "org"}:
continue
domains.add(f"{token.lower()}.com")
return domains
def extract_domains(valid_url) -> set[str]:
    """Collect the domains named by one or more URL regexes.

    *valid_url* is first passed through ``normalize_patterns``; each resulting
    pattern is then handed to ``extract_from_pattern`` and the results are
    merged. Entries whose last two labels are identical (e.g. ``com.com``)
    are treated as junk and left out of the returned set.
    """
    collected: set[str] = set()
    for pattern in normalize_patterns(valid_url):
        collected.update(extract_from_pattern(pattern))

    def _repeats_last_label(domain: str) -> bool:
        # True for artefacts such as 'com.com', where the name equals the TLD.
        labels = domain.split('.')
        return len(labels) >= 2 and labels[-2] == labels[-1]

    return {domain for domain in collected if not _repeats_last_label(domain)}
def _build_supported_domains() -> set[str]:
    """Return the cached set of domains derived from the available extractors.

    The set is computed on the first call and stored in the module-level
    ``_SUPPORTED_DOMAINS``; later calls return that same object. When
    ``gen_extractors`` is ``None`` the cached result is an empty set. The
    extractor whose ``IE_NAME`` is ``"generic"`` is skipped.

    Collection is best-effort: an extractor that raises while being inspected
    is skipped on its own instead of aborting the scan for every extractor
    after it.
    """
    global _SUPPORTED_DOMAINS
    if _SUPPORTED_DOMAINS is not None:
        return _SUPPORTED_DOMAINS

    # Build into a local set so the global is only published once it is filled.
    collected: set[str] = set()
    if gen_extractors is not None:
        try:
            for extractor in gen_extractors():
                try:
                    name = str(getattr(extractor, "IE_NAME", "") or "").lower()
                    if name == "generic":
                        continue
                    regex = getattr(extractor, "_VALID_URL", None)
                    collected |= extract_domains(regex)
                except Exception:
                    # Deliberate best-effort: ignore just this extractor.
                    continue
        except Exception:
            # Enumerating extractors failed; keep whatever was gathered so far.
            pass

    _SUPPORTED_DOMAINS = collected
    return _SUPPORTED_DOMAINS
def _get_nested(config: Dict[str, Any], *path: str) -> Any:
cur: Any = config
@@ -122,17 +212,14 @@ def is_url_supported_by_ytdlp(url: str) -> bool:
return False
try:
for extractor in _get_extractors():
try:
if not extractor.suitable(url):
continue
except Exception:
continue
name = getattr(extractor, "IE_NAME", "").lower()
if name == "generic":
continue
return True
parsed = urlparse(url)
domain = parsed.netloc.lower()
if not domain:
return False
supported = _build_supported_domains()
for base in supported:
if domain == base or domain.endswith("." + base):
return True
except Exception:
return False
@@ -593,16 +680,22 @@ class YtDlpTool:
# Defaulting to 'chrome' as the most common path.
base_options["cookiesfrombrowser"] = "chrome"
# Special handling for format keywords
if opts.ytdl_format == "audio":
opts = opts._replace(mode="audio", ytdl_format=None)
elif opts.ytdl_format == "video":
opts = opts._replace(mode="video", ytdl_format=None)
if opts.no_playlist:
base_options["noplaylist"] = True
fmt = opts.ytdl_format or self.default_format(opts.mode)
base_options["format"] = fmt
# if opts.mode == "audio":
# base_options["postprocessors"] = [{
# "key": "FFmpegExtractAudio"
# }]
if opts.mode == "audio":
base_options["postprocessors"] = [{
"key": "FFmpegExtractAudio"
}]
if opts.mode != "audio":
format_sort = self.defaults.format_sort or [