From 7ddf0065d1bba53febf9cdfeb9d51875539426ca Mon Sep 17 00:00:00 2001 From: Nose Date: Mon, 19 Jan 2026 06:24:09 -0800 Subject: [PATCH] f --- .gitignore | 4 +- API/HTTP.py | 8 +- API/HydrusNetwork.py | 23 +- API/__init__.py | 3 + API/folder.py | 17 +- CLI.py | 117 +--------- Provider/HIFI.py | 2 +- Provider/Tidal.py | 2 +- Provider/alldebrid.py | 4 +- Provider/fileio.py | 4 +- Provider/internetarchive.py | 2 +- Provider/matrix.py | 2 +- Provider/openlibrary.py | 2 +- Provider/soulseek.py | 13 +- Provider/telegram.py | 6 +- ProviderCore/base.py | 6 +- SYS/__init__.py | 0 SYS/cli_parsing.py | 22 +- SYS/config.py | 6 +- SYS/html_table.py | 12 +- SYS/json_table.py | 10 +- SYS/logger.py | 3 +- SYS/metadata.py | 379 +++++++++++++++++++++++++++----- SYS/models.py | 8 +- SYS/pipeline.py | 73 +++--- SYS/result_table.py | 59 ++--- SYS/rich_display.py | 17 +- SYS/utils.py | 6 +- SYS/utils_constant.py | 4 +- SYS/worker.py | 6 +- SYS/worker_manager.py | 32 +-- Store/Folder.py | 16 +- Store/HydrusNetwork.py | 20 +- Store/ZeroTier.py | 2 +- Store/_base.py | 2 +- Store/registry.py | 6 +- TUI/modalscreen/config_modal.py | 14 +- TUI/modalscreen/search.py | 3 + cmdlet/delete_url.py | 16 +- cmdlet/get_url.py | 14 +- cmdlet/search_file.py | 13 +- cmdnat/__init__.py | 2 +- scripts/bootstrap.py | 8 +- scripts/hydrusnetwork.py | 33 +-- tool/ytdlp.py | 37 ++-- 45 files changed, 627 insertions(+), 411 deletions(-) create mode 100644 API/__init__.py create mode 100644 SYS/__init__.py diff --git a/.gitignore b/.gitignore index 9ba7870..2a577bf 100644 --- a/.gitignore +++ b/.gitignore @@ -239,4 +239,6 @@ scripts/mm tmp_* *.secret # Ignore local ZeroTier auth tokens (project-local copy) -authtoken.secret \ No newline at end of file +authtoken.secret + +mypy.ini \ No newline at end of file diff --git a/API/HTTP.py b/API/HTTP.py index fda2a89..4592797 100644 --- a/API/HTTP.py +++ b/API/HTTP.py @@ -452,7 +452,7 @@ class HTTPClient: else: kwargs["headers"] = self._get_headers() - last_exception = None + last_exception: Exception | None = None for attempt in range(self.retries): self._debug_panel( @@ -875,7 +875,7 @@ def download_direct_file( pass tags: List[str] = [] - if extract_ytdlp_tags: + if extract_ytdlp_tags is not None: try: tags = extract_ytdlp_tags(info) except Exception as exc: @@ -884,7 +884,7 @@ def download_direct_file( if not any(str(t).startswith("title:") for t in tags): info["title"] = str(filename) tags = [] - if extract_ytdlp_tags: + if extract_ytdlp_tags is not None: try: tags = extract_ytdlp_tags(info) except Exception as exc: @@ -1135,7 +1135,7 @@ class AsyncHTTPClient: else: kwargs["headers"] = self._get_headers() - last_exception = None + last_exception: Exception | None = None for attempt in range(self.retries): try: diff --git a/API/HydrusNetwork.py b/API/HydrusNetwork.py index 0073953..4a139b8 100644 --- a/API/HydrusNetwork.py +++ b/API/HydrusNetwork.py @@ -2066,9 +2066,9 @@ def _derive_title( "original_display_filename", "original_filename", ): - value = entry.get(key) - if isinstance(value, str): - cleaned = value.strip() + raw_val = entry.get(key) + if isinstance(raw_val, str): + cleaned = raw_val.strip() if cleaned: return cleaned return None @@ -2444,7 +2444,7 @@ def fetch_hydrus_metadata_by_url(payload: Dict[str, Any]) -> Dict[str, Any]: matched_url = None normalized_reported = None seen: Set[str] = set() - queue = deque() + queue: deque[str] = deque() for variant in _generate_hydrus_url_variants(url): queue.append(variant) if not queue: @@ -2486,11 +2486,11 @@ def fetch_hydrus_metadata_by_url(payload: Dict[str, Any]) -> Dict[str, Any]: if isinstance(raw_hashes, list): for item in raw_hashes: try: - normalized = _normalize_hash(item) + norm_hash = _normalize_hash(item) except ValueError: continue - if normalized: - response_hashes_list.append(normalized) + if norm_hash: + response_hashes_list.append(norm_hash) raw_ids = response.get("file_ids") or response.get("file_id") if isinstance(raw_ids, list): for item in raw_ids: @@ -2510,12 +2510,13 @@ def fetch_hydrus_metadata_by_url(payload: Dict[str, Any]) -> Dict[str, Any]: continue status_hash = entry.get("hash") or entry.get("file_hash") if status_hash: + norm_status: Optional[str] = None try: - normalized = _normalize_hash(status_hash) + norm_status = _normalize_hash(status_hash) except ValueError: - normalized = None - if normalized: - response_hashes_list.append(normalized) + pass + if norm_status: + response_hashes_list.append(norm_status) status_id = entry.get("file_id") or entry.get("fileid") if status_id is not None: try: diff --git a/API/__init__.py b/API/__init__.py new file mode 100644 index 0000000..fc521a9 --- /dev/null +++ b/API/__init__.py @@ -0,0 +1,3 @@ +"""Medeia API helpers that power external integrations.""" + +__all__ = [] diff --git a/API/folder.py b/API/folder.py index 5030601..90985ad 100644 --- a/API/folder.py +++ b/API/folder.py @@ -55,6 +55,7 @@ def _db_retry(max_attempts: int = 6, base_sleep: float = 0.1): return _decorator # Try to import optional dependencies +mutagen: Any try: import mutagen except ImportError: @@ -72,12 +73,12 @@ try: METADATA_AVAILABLE = True except ImportError: - _read_sidecar_metadata = None - _derive_sidecar_path = None - write_tags = None - write_tags_to_file = None - embed_metadata_in_file = None - read_tags_from_file = None + _read_sidecar_metadata = None # type: ignore + _derive_sidecar_path = None # type: ignore + write_tags = None # type: ignore + write_tags_to_file = None # type: ignore + embed_metadata_in_file = None # type: ignore + read_tags_from_file = None # type: ignore METADATA_AVAILABLE = False # Media extensions to index @@ -219,7 +220,7 @@ class API_folder_store: """ self.library_root = expand_path(library_root).resolve() self.db_path = self.library_root / self.DB_NAME - self.connection: Optional[sqlite3.Connection] = None + self.connection: sqlite3.Connection = None # type: ignore # Use the shared lock self._db_lock = self._shared_db_lock mm_debug(f"[folder-db] init: root={self.library_root} db={self.db_path}") @@ -3818,7 +3819,7 @@ def migrate_all(library_root: Path, db), } finally: - if should_close: + if should_close and db is not None: db.close() diff --git a/CLI.py b/CLI.py index 7cd58f0..798f876 100644 --- a/CLI.py +++ b/CLI.py @@ -419,16 +419,16 @@ class CmdletCompleter(Completer): return arg_names = CmdletIntrospection.cmdlet_args(cmd_name, config) - logical_seen: Set[str] = set() + seen_logicals: Set[str] = set() for arg in arg_names: arg_low = arg.lower() if arg_low.startswith("--"): continue logical = arg.lstrip("-").lower() - if logical in logical_seen: + if logical in seen_logicals: continue yield Completion(arg, start_position=0) - logical_seen.add(logical) + seen_logicals.add(logical) yield Completion("-help", start_position=0) return @@ -541,117 +541,6 @@ class CmdletCompleter(Completer): yield Completion("-help", start_position=-len(current_token)) -# Lexer implementation removed; use `MedeiaLexer` from `SYS.cli_parsing` instead. - - line = document.lines[lineno] - tokens: List[tuple[str, str]] = [] - - pattern = re.compile( - r""" - (\s+) | # 1. Whitespace - (\|) | # 2. Pipe - ("(?:[^"\\]|\\.)*"|'(?:[^'\\]|\\.)*') | # 3. Quoted string - ([^\s\|]+) # 4. Word - """, - re.VERBOSE, - ) - - is_cmdlet = True - - def _emit_keyed_value(word: str) -> bool: - """Emit `key:` prefixes (comma-separated) as argument tokens. - - Designed for values like: - clip:3m4s-3m14s,1h22m-1h33m,item:2-3 - - Avoids special-casing URLs (://) and Windows drive paths (C:\\...). - Returns True if it handled the token. - """ - if not word or ":" not in word: - return False - # Avoid URLs and common scheme patterns. - if "://" in word: - return False - # Avoid Windows drive paths (e.g., C:\foo or D:/bar) - if re.match(r"^[A-Za-z]:[\\/]", word): - return False - - key_prefix = re.compile(r"^([A-Za-z_][A-Za-z0-9_-]*:)(.*)$") - parts = word.split(",") - handled_any = False - for i, part in enumerate(parts): - if i > 0: - tokens.append(("class:value", ",")) - if part == "": - continue - m = key_prefix.match(part) - if m: - tokens.append(("class:argument", m.group(1))) - if m.group(2): - tokens.append(("class:value", m.group(2))) - handled_any = True - else: - tokens.append(("class:value", part)) - handled_any = True - - return handled_any - - for match in pattern.finditer(line): - ws, pipe, quote, word = match.groups() - if ws: - tokens.append(("", ws)) - continue - if pipe: - tokens.append(("class:pipe", pipe)) - is_cmdlet = True - continue - if quote: - # If the quoted token contains a keyed spec (clip:/item:/hash:), - # highlight the `key:` portion in argument-blue even inside quotes. - if len(quote) >= 2 and quote[0] == quote[-1] and quote[0] in ('"', - "'"): - q = quote[0] - inner = quote[1:-1] - start_index = len(tokens) - if _emit_keyed_value(inner): - # _emit_keyed_value already appended tokens for inner; insert opening quote - # before that chunk, then add the closing quote. - tokens.insert(start_index, ("class:string", q)) - tokens.append(("class:string", q)) - is_cmdlet = False - continue - - tokens.append(("class:string", quote)) - is_cmdlet = False - continue - if not word: - continue - - if word.startswith("@"): # selection tokens - rest = word[1:] - if rest and re.fullmatch(r"[0-9\-\*,]+", rest): - tokens.append(("class:selection_at", "@")) - tokens.append(("class:selection_range", rest)) - is_cmdlet = False - continue - if rest == "": - tokens.append(("class:selection_at", "@")) - is_cmdlet = False - continue - - if is_cmdlet: - tokens.append(("class:cmdlet", word)) - is_cmdlet = False - elif word.startswith("-"): - tokens.append(("class:argument", word)) - else: - if not _emit_keyed_value(word): - tokens.append(("class:value", word)) - - return tokens - - return get_line - class ConfigLoader: diff --git a/Provider/HIFI.py b/Provider/HIFI.py index 597af42..22415ef 100644 --- a/Provider/HIFI.py +++ b/Provider/HIFI.py @@ -1278,7 +1278,7 @@ class HIFI(Provider): ) return materialized - def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]: + def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path | Dict[str, Any]]]: view, identifier = self._parse_tidal_url(url) if not view: return False, None diff --git a/Provider/Tidal.py b/Provider/Tidal.py index afd2cf1..3d828c0 100644 --- a/Provider/Tidal.py +++ b/Provider/Tidal.py @@ -1265,7 +1265,7 @@ class Tidal(Provider): ) return materialized - def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]: + def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path | Dict[str, Any]]]: view, identifier = self._parse_tidal_url(url) if not view: return False, None diff --git a/Provider/alldebrid.py b/Provider/alldebrid.py index 70fc38b..6ace5d7 100644 --- a/Provider/alldebrid.py +++ b/Provider/alldebrid.py @@ -585,7 +585,7 @@ class AllDebrid(TableProviderMixin, Provider): URL_DOMAINS = () @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: return [ { "key": "api_key", @@ -646,7 +646,7 @@ class AllDebrid(TableProviderMixin, Provider): return spec return resolve_magnet_spec(str(target)) if isinstance(target, str) else None - def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]: + def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path | Dict[str, Any]]]: magnet_id = _parse_alldebrid_magnet_id(url) if magnet_id is not None: return True, { diff --git a/Provider/fileio.py b/Provider/fileio.py index 113eaff..0477b04 100644 --- a/Provider/fileio.py +++ b/Provider/fileio.py @@ -2,7 +2,7 @@ from __future__ import annotations import os import sys -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from ProviderCore.base import Provider from SYS.logger import log @@ -53,7 +53,7 @@ class FileIO(Provider): PROVIDER_NAME = "file.io" @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: return [ { "key": "api_key", diff --git a/Provider/internetarchive.py b/Provider/internetarchive.py index c22959a..18b6f7c 100644 --- a/Provider/internetarchive.py +++ b/Provider/internetarchive.py @@ -468,7 +468,7 @@ class InternetArchive(Provider): URL = ("archive.org",) @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: return [ { "key": "access_key", diff --git a/Provider/matrix.py b/Provider/matrix.py index 4199b33..9f5eee8 100644 --- a/Provider/matrix.py +++ b/Provider/matrix.py @@ -235,7 +235,7 @@ class Matrix(TableProviderMixin, Provider): """ @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: return [ { "key": "homeserver", diff --git a/Provider/openlibrary.py b/Provider/openlibrary.py index 3e9a6cb..5647032 100644 --- a/Provider/openlibrary.py +++ b/Provider/openlibrary.py @@ -287,7 +287,7 @@ class OpenLibrary(Provider): } @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: return [ { "key": "email", diff --git a/Provider/soulseek.py b/Provider/soulseek.py index 054cfc9..f5dd358 100644 --- a/Provider/soulseek.py +++ b/Provider/soulseek.py @@ -245,7 +245,7 @@ class Soulseek(Provider): return False @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: return [ { "key": "username", @@ -325,6 +325,10 @@ class Soulseek(Provider): ) return None + # Cast to str for Mypy + username = str(username) + filename = str(filename) + # Use tempfile directory as default if generic path elements were passed or None. if output_dir is None: import tempfile @@ -363,10 +367,13 @@ class Soulseek(Provider): target_dir = Path(tempfile.gettempdir()) / "Medios" / "Soulseek" asyncio.set_event_loop(loop) + # Cast to str for Mypy + username_str = str(username) + filename_str = str(filename) return loop.run_until_complete( download_soulseek_file( - username=username, - filename=filename, + username=username_str, + filename=filename_str, output_dir=target_dir, timeout=self.MAX_WAIT_TRANSFER, ) diff --git a/Provider/telegram.py b/Provider/telegram.py index 23509e7..2ce1359 100644 --- a/Provider/telegram.py +++ b/Provider/telegram.py @@ -7,7 +7,7 @@ import sys import time import threading from pathlib import Path -from typing import Any, Dict, Optional, Sequence, Tuple +from typing import Any, Dict, List, Optional, Sequence, Tuple from urllib.parse import urlparse from ProviderCore.base import Provider, SearchResult @@ -150,7 +150,7 @@ class Telegram(Provider): URL = ("t.me", "telegram.me") @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: return [ { "key": "app_id", @@ -1175,7 +1175,7 @@ class Telegram(Provider): raise ValueError("Not a Telegram URL") return self._download_message_media_sync(url=url, output_dir=output_dir) - def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]: + def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path | Dict[str, Any]]]: """Optional provider override to parse and act on URLs.""" if not _looks_like_telegram_message_url(url): return False, None diff --git a/ProviderCore/base.py b/ProviderCore/base.py index 79b0720..38b5180 100644 --- a/ProviderCore/base.py +++ b/ProviderCore/base.py @@ -24,7 +24,7 @@ class SearchResult: size_bytes: Optional[int] = None tag: set[str] = field(default_factory=set) # Searchable tag values columns: List[Tuple[str, str]] = field(default_factory=list) # Display columns - selection_action: Optional[Dict[str, Any]] = None + selection_action: Optional[List[str]] = None selection_args: Optional[List[str]] = None full_metadata: Dict[str, Any] = field(default_factory=dict) # Extra metadata @@ -150,7 +150,7 @@ class Provider(ABC): ).lower() @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: """Return configuration schema for this provider. Returns a list of dicts, each defining a field: @@ -228,7 +228,7 @@ class Provider(ABC): _ = config return 0 - def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]: + def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path | Dict[str, Any]]]: """Optional provider override to parse and act on URLs.""" _ = url diff --git a/SYS/__init__.py b/SYS/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/SYS/cli_parsing.py b/SYS/cli_parsing.py index 4d2bb14..db97879 100644 --- a/SYS/cli_parsing.py +++ b/SYS/cli_parsing.py @@ -13,12 +13,14 @@ from typing import Any, Dict, List, Optional, Set, Tuple # stubs if prompt_toolkit is not available so imports remain safe for testing. try: from prompt_toolkit.document import Document - from prompt_toolkit.lexers import Lexer + from prompt_toolkit.lexers import Lexer as _PTK_Lexer except Exception: # pragma: no cover - optional dependency Document = object # type: ignore + # Fallback to a simple object when prompt_toolkit is not available + _PTK_Lexer = object # type: ignore - class Lexer: # simple fallback base - pass +# Expose a stable name used by the rest of the module +Lexer = _PTK_Lexer class SelectionSyntax: @@ -216,19 +218,19 @@ class SelectionFilterSyntax: if ":" in s: parts = [p.strip() for p in s.split(":")] if len(parts) == 2 and all(p.isdigit() for p in parts): - m, sec = parts - return max(0, int(m) * 60 + int(sec)) + m_str, sec_str = parts + return max(0, int(m_str) * 60 + int(sec_str)) if len(parts) == 3 and all(p.isdigit() for p in parts): - h, m, sec = parts - return max(0, int(h) * 3600 + int(m) * 60 + int(sec)) + h_str, m_str, sec_str = parts + return max(0, int(h_str) * 3600 + int(m_str) * 60 + int(sec_str)) # token format: 1h2m3s (tokens can appear in any combination) total = 0 found = False - for m in SelectionFilterSyntax._DUR_TOKEN_RE.finditer(s): + for match in SelectionFilterSyntax._DUR_TOKEN_RE.finditer(s): found = True - n = int(m.group(1)) - unit = m.group(2).lower() + n = int(match.group(1)) + unit = match.group(2).lower() if unit == "h": total += n * 3600 elif unit == "m": diff --git a/SYS/config.py b/SYS/config.py index 5bb02af..e9d9501 100644 --- a/SYS/config.py +++ b/SYS/config.py @@ -5,7 +5,7 @@ from __future__ import annotations import re import tempfile from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, List from SYS.logger import log from SYS.utils import expand_path @@ -722,10 +722,6 @@ def reload_config( return load_config(config_dir=config_dir, filename=filename) -def clear_config_cache() -> None: - _CONFIG_CACHE.clear() - - def _validate_config_safety(config: Dict[str, Any]) -> None: """Check for dangerous configurations, like folder stores in non-empty dirs.""" store = config.get("store") diff --git a/SYS/html_table.py b/SYS/html_table.py index 9d8facb..1d0cc40 100644 --- a/SYS/html_table.py +++ b/SYS/html_table.py @@ -220,11 +220,11 @@ def extract_records(doc_or_html: Any, base_url: Optional[str] = None, xpaths: Op records: List[Dict[str, str]] = [] for row in rows: - nr: Dict[str, str] = {} + row_norm: Dict[str, str] = {} for k, v in (row or {}).items(): nk = normalize_header(str(k or "")) - nr[nk] = (str(v).strip() if v is not None else "") - records.append(nr) + row_norm[nk] = (str(v).strip() if v is not None else "") + records.append(row_norm) # Attempt to recover hrefs by matching anchor text -> href try: @@ -265,11 +265,11 @@ def extract_records(doc_or_html: Any, base_url: Optional[str] = None, xpaths: Op # Normalize keys (map platform->system etc) normed: List[Dict[str, str]] = [] for r in records: - nr: Dict[str, str] = {} + norm_row: Dict[str, str] = {} for k, v in (r or {}).items(): nk = normalize_header(k) - nr[nk] = v - normed.append(nr) + norm_row[nk] = v + normed.append(norm_row) return normed, chosen diff --git a/SYS/json_table.py b/SYS/json_table.py index c45a2e4..36e4af4 100644 --- a/SYS/json_table.py +++ b/SYS/json_table.py @@ -24,16 +24,16 @@ def _coerce_value(value: Any) -> str: if isinstance(value, bool): return "true" if value else "false" if isinstance(value, (list, tuple, set)): - parts = [_coerce_value(v) for v in value] - cleaned = [part for part in parts if part] + parts_list = [_coerce_value(v) for v in value] + cleaned = [part for part in parts_list if part] return ", ".join(cleaned) if isinstance(value, dict): - parts: List[str] = [] + dict_parts: List[str] = [] for subkey, subvalue in value.items(): part = _coerce_value(subvalue) if part: - parts.append(f"{subkey}:{part}") - return ", ".join(parts) + dict_parts.append(f"{subkey}:{part}") + return ", ".join(dict_parts) try: return str(value).strip() except Exception: diff --git a/SYS/logger.py b/SYS/logger.py index 0ca8c89..1baa943 100644 --- a/SYS/logger.py +++ b/SYS/logger.py @@ -140,7 +140,7 @@ def debug_inspect( value=value, max_string=100_000, max_length=100_000, - ) + ) # type: ignore[call-arg] except TypeError: rich_inspect( obj, @@ -155,7 +155,6 @@ def debug_inspect( value=value, ) - def log(*args, **kwargs) -> None: """Print with automatic file.function prefix. diff --git a/SYS/metadata.py b/SYS/metadata.py index 3aeed40..a91c6e2 100644 --- a/SYS/metadata.py +++ b/SYS/metadata.py @@ -17,6 +17,14 @@ try: # Optional; used for IMDb lookup without API key from imdbinfo.services import search_title # type: ignore except Exception: # pragma: no cover - optional dependency search_title = None # type: ignore[assignment] +try: + import mutagen +except ImportError: + mutagen = None +try: + import musicbrainzngs +except ImportError: + musicbrainzngs = None def value_normalize(value: Any) -> str: @@ -93,6 +101,52 @@ def _sanitize_url(value: Optional[str]) -> Optional[str]: return cleaned +def sanitize_metadata_value(value: Any) -> str: + if value is None: + return "" + if isinstance(value, (list, tuple)): + value = ", ".join(str(v) for v in value if v) + return str(value).strip().replace("\n", " ").replace("\r", " ") + + +def unique_preserve_order(items: Iterable[Any]) -> list[Any]: + seen = set() + result = [] + for item in items: + if item not in seen: + seen.add(item) + result.append(item) + return result + + +def fetch_musicbrainz_tags(mbid: str, entity: str = "release") -> Dict[str, Any]: + if not musicbrainzngs: + return {"tag": []} + + musicbrainzngs.set_useragent("Medeia-Macina", "0.1") + tags: list[str] = [] + try: + if entity == "release": + res = musicbrainzngs.get_release_by_id(mbid, includes=["tags"]) + tags_list = res.get("release", {}).get("tag-list", []) + elif entity == "recording": + res = musicbrainzngs.get_recording_by_id(mbid, includes=["tags"]) + tags_list = res.get("recording", {}).get("tag-list", []) + elif entity == "artist": + res = musicbrainzngs.get_artist_by_id(mbid, includes=["tags"]) + tags_list = res.get("artist", {}).get("tag-list", []) + else: + return {"tag": []} + + for t in tags_list: + if isinstance(t, dict) and "name" in t: + tags.append(t["name"]) + except Exception as exc: + debug(f"MusicBrainz lookup failed: {exc}") + + return {"tag": tags} + + def _clean_existing_tags(existing: Any) -> List[str]: tags: List[str] = [] seen: Set[str] = set() @@ -601,7 +655,7 @@ def write_tags( # Write via consolidated function try: - lines = [] + lines: List[str] = [] lines.extend(str(tag).strip().lower() for tag in tag_list if str(tag).strip()) if lines: @@ -2415,11 +2469,6 @@ def scrape_url_metadata( try: import json as json_module - try: - from SYS.metadata import extract_ytdlp_tags - except ImportError: - extract_ytdlp_tags = None - # Build yt-dlp command with playlist support # IMPORTANT: Do NOT use --flat-playlist! It strips metadata like artist, album, uploader, genre # Without it, yt-dlp gives us full metadata in an 'entries' array within a single JSON object @@ -2462,14 +2511,13 @@ def scrape_url_metadata( # is_playlist = 'entries' in data and isinstance(data.get('entries'), list) # Extract tags and playlist items - tags = [] - playlist_items = [] + tags: List[str] = [] + playlist_items: List[Dict[str, Any]] = [] # IMPORTANT: Extract album/playlist-level tags FIRST (before processing entries) # This ensures we get metadata about the collection, not just individual tracks - if extract_ytdlp_tags: - album_tags = extract_ytdlp_tags(data) - tags.extend(album_tags) + album_tags = extract_ytdlp_tags(data) + tags.extend(album_tags) # Case 1: Entries are nested in the main object (standard playlist structure) if "entries" in data and isinstance(data.get("entries"), list): @@ -2493,41 +2541,40 @@ def scrape_url_metadata( # Extract tags from each entry and merge (but don't duplicate album-level tags) # Only merge entry tags that are multi-value prefixes (not single-value like title:, artist:, etc.) - if extract_ytdlp_tags: - entry_tags = extract_ytdlp_tags(entry) + entry_tags = extract_ytdlp_tags(entry) - # Single-value namespaces that should not be duplicated from entries - single_value_namespaces = { - "title", - "artist", - "album", - "creator", - "channel", - "release_date", - "upload_date", - "license", - "location", - } + # Single-value namespaces that should not be duplicated from entries + single_value_namespaces = { + "title", + "artist", + "album", + "creator", + "channel", + "release_date", + "upload_date", + "license", + "location", + } - for tag in entry_tags: - # Extract the namespace (part before the colon) - tag_namespace = tag.split(":", - 1)[0].lower( - ) if ":" in tag else None + for tag in entry_tags: + # Extract the namespace (part before the colon) + tag_namespace = tag.split(":", + 1)[0].lower( + ) if ":" in tag else None - # Skip if this namespace already exists in tags (from album level) - if tag_namespace and tag_namespace in single_value_namespaces: - # Check if any tag with this namespace already exists in tags - already_has_namespace = any( - t.split(":", - 1)[0].lower() == tag_namespace for t in tags - if ":" in t - ) - if already_has_namespace: - continue # Skip this tag, keep the album-level one + # Skip if this namespace already exists in tags (from album level) + if tag_namespace and tag_namespace in single_value_namespaces: + # Check if any tag with this namespace already exists in tags + already_has_namespace = any( + t.split(":", + 1)[0].lower() == tag_namespace for t in tags + if ":" in t + ) + if already_has_namespace: + continue # Skip this tag, keep the album-level one - if tag not in tags: # Avoid exact duplicates - tags.append(tag) + if tag not in tags: # Avoid exact duplicates + tags.append(tag) # Case 2: Playlist detected by playlist_count field (BandCamp albums, etc.) # These need a separate call with --flat-playlist to get the actual entries @@ -2586,7 +2633,7 @@ def scrape_url_metadata( pass # Silently ignore if we can't get playlist entries # Fallback: if still no tags detected, get from first item - if not tags and extract_ytdlp_tags: + if not tags: tags = extract_ytdlp_tags(data) # Extract formats from the main data object @@ -2595,11 +2642,7 @@ def scrape_url_metadata( formats = extract_url_formats(data.get("formats", [])) # Deduplicate tags by namespace to prevent duplicate title:, artist:, etc. - try: - if dedup_tags_by_namespace: - tags = dedup_tags_by_namespace(tags, keep_first=True) - except Exception: - pass # If dedup fails, return tags as-is + tags = dedup_tags_by_namespace(tags, keep_first=True) return title, tags, formats, playlist_items @@ -2617,8 +2660,8 @@ def extract_url_formats(formats: list) -> List[Tuple[str, str]]: Returns list of (display_label, format_id) tuples. """ try: - video_formats = {} # {resolution: format_data} - audio_formats = {} # {quality_label: format_data} + video_formats: Dict[str, Dict[str, Any]] = {} # {resolution: format_data} + audio_formats: Dict[str, Dict[str, Any]] = {} # {quality_label: format_data} for fmt in formats: vcodec = fmt.get("vcodec", "none") @@ -2655,7 +2698,7 @@ def extract_url_formats(formats: list) -> List[Tuple[str, str]]: "abr": abr, } - result = [] + result: List[Tuple[str, str]] = [] # Add video formats in descending resolution order for res in sorted(video_formats.keys(), @@ -2674,3 +2717,237 @@ def extract_url_formats(formats: list) -> List[Tuple[str, str]]: except Exception as e: log(f"Error extracting formats: {e}", file=sys.stderr) return [] + +def prepare_ffmpeg_metadata(payload: Optional[dict[str, Any]]) -> dict[str, str]: + if not isinstance(payload, dict): + return {} + metadata: dict[str, str] = {} + + def set_field(key: str, raw: Any, limit: int = 2000) -> None: + sanitized = sanitize_metadata_value(raw) + if not sanitized: + return + if len(sanitized) > limit: + sanitized = sanitized[:limit] + metadata[key] = sanitized + + set_field("title", payload.get("title")) + set_field("artist", payload.get("artist"), 512) + set_field("album", payload.get("album"), 512) + set_field("date", payload.get("year") or payload.get("date"), 20) + comment = payload.get("comment") + tags_value = payload.get("tags") + tag_strings: list[str] = [] + artists_from_tags: list[str] = [] + albums_from_tags: list[str] = [] + genres_from_tags: list[str] = [] + if isinstance(tags_value, list): + for raw_tag in tags_value: + if raw_tag is None: + continue + if not isinstance(raw_tag, str): + raw_tag = str(raw_tag) + tag = raw_tag.strip() + if not tag: + continue + tag_strings.append(tag) + namespace, sep, value = tag.partition(":") + if sep and value: + ns = namespace.strip().lower() + value = value.strip() + if ns in {"artist", "creator", "author", "performer"}: + artists_from_tags.append(value) + elif ns in {"album", "series", "collection", "group"}: + albums_from_tags.append(value) + elif ns in {"genre", "rating"}: + genres_from_tags.append(value) + elif ns in {"comment", "description"} and not comment: + comment = value + elif ns in {"year", "date"} and not (payload.get("year") or payload.get("date")): + set_field("date", value, 20) + else: + genres_from_tags.append(tag) + if "artist" not in metadata and artists_from_tags: + set_field("artist", ", ".join(unique_preserve_order(artists_from_tags)[:3]), 512) + if "album" not in metadata and albums_from_tags: + set_field("album", unique_preserve_order(albums_from_tags)[0], 512) + if genres_from_tags: + set_field("genre", ", ".join(unique_preserve_order(genres_from_tags)[:5]), 256) + if tag_strings: + joined_tags = ", ".join(tag_strings[:50]) + set_field("keywords", joined_tags, 2000) + if not comment: + comment = joined_tags + if comment: + set_field("comment", str(comment), 2000) + set_field("description", str(comment), 2000) + return metadata + + +def apply_mutagen_metadata(path: Path, metadata: dict[str, str], fmt: str) -> None: + if fmt != "audio": + return + if not metadata: + return + if mutagen is None: + return + try: + audio = mutagen.File(path, easy=True) # type: ignore[attr-defined] + except Exception as exc: # pragma: no cover - best effort only + log(f"mutagen load failed: {exc}", file=sys.stderr) + return + if audio is None: + return + field_map = { + "title": "title", + "artist": "artist", + "album": "album", + "genre": "genre", + "comment": "comment", + "description": "comment", + "date": "date", + } + changed = False + for source_key, target_key in field_map.items(): + value = metadata.get(source_key) + if not value: + continue + try: + audio[target_key] = [value] + changed = True + except Exception: # pragma: no cover - best effort only + continue + if not changed: + return + try: + audio.save() + except Exception as exc: # pragma: no cover - best effort only + log(f"mutagen save failed: {exc}", file=sys.stderr) + + +def build_ffmpeg_command( + ffmpeg_path: str, + input_path: Path, + output_path: Path, + fmt: str, + max_width: int, + metadata: Optional[dict[str, str]] = None, +) -> list[str]: + cmd = [ffmpeg_path, "-y", "-i", str(input_path)] + if fmt in {"mp4", "webm"} and max_width and max_width > 0: + cmd.extend(["-vf", f"scale='min({max_width},iw)':-2"]) + if metadata: + for key, value in metadata.items(): + cmd.extend(["-metadata", f"{key}={value}"]) + + # Video formats + if fmt == "mp4": + cmd.extend([ + "-c:v", + "libx265", + "-preset", + "medium", + "-crf", + "26", + "-tag:v", + "hvc1", + "-pix_fmt", + "yuv420p", + "-c:a", + "aac", + "-b:a", + "192k", + "-movflags", + "+faststart", + ]) + elif fmt == "webm": + cmd.extend([ + "-c:v", + "libvpx-vp9", + "-b:v", + "0", + "-crf", + "32", + "-c:a", + "libopus", + "-b:a", + "160k", + ]) + cmd.extend(["-f", "webm"]) + + # Audio formats + elif fmt == "mp3": + cmd.extend([ + "-vn", + "-c:a", + "libmp3lame", + "-b:a", + "192k", + ]) + cmd.extend(["-f", "mp3"]) + elif fmt == "flac": + cmd.extend([ + "-vn", + "-c:a", + "flac", + ]) + cmd.extend(["-f", "flac"]) + elif fmt == "wav": + cmd.extend([ + "-vn", + "-c:a", + "pcm_s16le", + ]) + cmd.extend(["-f", "wav"]) + elif fmt == "aac": + cmd.extend([ + "-vn", + "-c:a", + "aac", + "-b:a", + "192k", + ]) + cmd.extend(["-f", "adts"]) + elif fmt == "m4a": + cmd.extend([ + "-vn", + "-c:a", + "aac", + "-b:a", + "192k", + ]) + cmd.extend(["-f", "ipod"]) + elif fmt == "ogg": + cmd.extend([ + "-vn", + "-c:a", + "libvorbis", + "-b:a", + "192k", + ]) + cmd.extend(["-f", "ogg"]) + elif fmt == "opus": + cmd.extend([ + "-vn", + "-c:a", + "libopus", + "-b:a", + "192k", + ]) + cmd.extend(["-f", "opus"]) + elif fmt == "audio": + # Legacy format name for mp3 + cmd.extend([ + "-vn", + "-c:a", + "libmp3lame", + "-b:a", + "192k", + ]) + cmd.extend(["-f", "mp3"]) + elif fmt != "copy": + raise ValueError(f"Unsupported format: {fmt}") + + cmd.append(str(output_path)) + return cmd + diff --git a/SYS/models.py b/SYS/models.py index 615d91b..9c3f2ca 100644 --- a/SYS/models.py +++ b/SYS/models.py @@ -633,7 +633,13 @@ class ProgressFileReader: min_interval_s: float = 0.25, ): self._f = fileobj - self._total = int(total_bytes) if total_bytes not in (None, 0, "") else 0 + if total_bytes is None: + self._total = 0 + else: + try: + self._total = int(total_bytes) + except Exception: + self._total = 0 self._label = str(label or "upload") self._min_interval_s = max(0.05, float(min_interval_s)) self._bar = ProgressBar() diff --git a/SYS/pipeline.py b/SYS/pipeline.py index be01148..4b5e620 100644 --- a/SYS/pipeline.py +++ b/SYS/pipeline.py @@ -7,7 +7,7 @@ import sys from contextlib import contextmanager from dataclasses import dataclass, field from contextvars import ContextVar -from typing import Any, Dict, List, Optional, Sequence +from typing import Any, Dict, List, Optional, Sequence, Callable from SYS.models import PipelineStageContext from SYS.logger import log, debug, is_debug_enabled from SYS.worker import WorkerManagerRegistry, WorkerStages @@ -15,6 +15,9 @@ from SYS.cli_parsing import SelectionSyntax, SelectionFilterSyntax from SYS.rich_display import stdout_console from SYS.background_notifier import ensure_background_notifier from SYS.result_table import Table +import re +from datetime import datetime +from SYS.cmdlet_catalog import import_cmd_module HELP_EXAMPLE_SOURCE_COMMANDS = { ".help-example", @@ -946,8 +949,9 @@ def get_last_result_table_source_command() -> Optional[str]: Command name (e.g., 'download-file') or None if not set """ state = _get_pipeline_state() - if _is_selectable_table(state.last_result_table) and hasattr(state.last_result_table, "source_command"): - return state.last_result_table.source_command + table = state.last_result_table + if table is not None and _is_selectable_table(table) and hasattr(table, "source_command"): + return getattr(table, "source_command") return None @@ -958,8 +962,9 @@ def get_last_result_table_source_args() -> List[str]: List of arguments (e.g., ['https://example.com']) or empty list """ state = _get_pipeline_state() - if _is_selectable_table(state.last_result_table) and hasattr(state.last_result_table, "source_args"): - return state.last_result_table.source_args or [] + table = state.last_result_table + if table is not None and _is_selectable_table(table) and hasattr(table, "source_args"): + return getattr(table, "source_args") or [] return [] @@ -973,22 +978,26 @@ def get_last_result_table_row_selection_args(row_index: int) -> Optional[List[st Selection arguments (e.g., ['-item', '3']) or None """ state = _get_pipeline_state() - if _is_selectable_table(state.last_result_table) and hasattr(state.last_result_table, "rows"): - if 0 <= row_index < len(state.last_result_table.rows): - row = state.last_result_table.rows[row_index] + table = state.last_result_table + if table is not None and _is_selectable_table(table) and hasattr(table, "rows"): + rows = table.rows + if 0 <= row_index < len(rows): + row = rows[row_index] if hasattr(row, "selection_args"): - return row.selection_args + return getattr(row, "selection_args") return None def get_last_result_table_row_selection_action(row_index: int) -> Optional[List[str]]: """Get the expanded stage tokens for a row in the last result table.""" state = _get_pipeline_state() - if _is_selectable_table(state.last_result_table) and hasattr(state.last_result_table, "rows"): - if 0 <= row_index < len(state.last_result_table.rows): - row = state.last_result_table.rows[row_index] + table = state.last_result_table + if table is not None and _is_selectable_table(table) and hasattr(table, "rows"): + rows = table.rows + if 0 <= row_index < len(rows): + row = rows[row_index] if hasattr(row, "selection_action"): - return row.selection_action + return getattr(row, "selection_action") return None def set_current_stage_table(result_table: Optional[Any]) -> None: @@ -1019,8 +1028,9 @@ def get_current_stage_table_source_command() -> Optional[str]: Command name (e.g., 'download-file') or None """ state = _get_pipeline_state() - if _is_selectable_table(state.current_stage_table) and hasattr(state.current_stage_table, "source_command"): - return state.current_stage_table.source_command + table = state.current_stage_table + if table is not None and _is_selectable_table(table) and hasattr(table, "source_command"): + return getattr(table, "source_command") return None @@ -1031,8 +1041,9 @@ def get_current_stage_table_source_args() -> List[str]: List of arguments or empty list """ state = _get_pipeline_state() - if _is_selectable_table(state.current_stage_table) and hasattr(state.current_stage_table, "source_args"): - return state.current_stage_table.source_args or [] + table = state.current_stage_table + if table is not None and _is_selectable_table(table) and hasattr(table, "source_args"): + return getattr(table, "source_args") or [] return [] @@ -1046,22 +1057,26 @@ def get_current_stage_table_row_selection_args(row_index: int) -> Optional[List[ Selection arguments or None """ state = _get_pipeline_state() - if _is_selectable_table(state.current_stage_table) and hasattr(state.current_stage_table, "rows"): - if 0 <= row_index < len(state.current_stage_table.rows): - row = state.current_stage_table.rows[row_index] + table = state.current_stage_table + if table is not None and _is_selectable_table(table) and hasattr(table, "rows"): + rows = table.rows + if 0 <= row_index < len(rows): + row = rows[row_index] if hasattr(row, "selection_args"): - return row.selection_args + return getattr(row, "selection_args") return None def get_current_stage_table_row_selection_action(row_index: int) -> Optional[List[str]]: """Get the expanded stage tokens for a row in the current stage table.""" state = _get_pipeline_state() - if _is_selectable_table(state.current_stage_table) and hasattr(state.current_stage_table, "rows"): - if 0 <= row_index < len(state.current_stage_table.rows): - row = state.current_stage_table.rows[row_index] + table = state.current_stage_table + if table is not None and _is_selectable_table(table) and hasattr(table, "rows"): + rows = table.rows + if 0 <= row_index < len(rows): + row = rows[row_index] if hasattr(row, "selection_action"): - return row.selection_action + return getattr(row, "selection_action") return None @@ -1072,9 +1087,11 @@ def get_current_stage_table_row_source_index(row_index: int) -> Optional[int]: back to the original item order (e.g., playlist or provider order). """ state = _get_pipeline_state() - if _is_selectable_table(state.current_stage_table) and hasattr(state.current_stage_table, "rows"): - if 0 <= row_index < len(state.current_stage_table.rows): - row = state.current_stage_table.rows[row_index] + table = state.current_stage_table + if table is not None and _is_selectable_table(table) and hasattr(table, "rows"): + rows = table.rows + if 0 <= row_index < len(rows): + row = rows[row_index] return getattr(row, "source_index", None) return None diff --git a/SYS/result_table.py b/SYS/result_table.py index 916bfd5..c52db2e 100644 --- a/SYS/result_table.py +++ b/SYS/result_table.py @@ -33,12 +33,15 @@ except ImportError: TEXTUAL_AVAILABLE = False -# Import ResultModel from the API for unification -try: - from SYS.result_table_api import ResultModel -except ImportError: - # Fallback if not available yet in directory structure (unlikely) - ResultModel = None +# Import ResultModel from the API for typing; avoid runtime redefinition issues +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from SYS.result_table_api import ResultModel # type: ignore +else: + ResultModel = None # type: ignore[assignment] + +# Reuse the existing format_bytes helper under a clearer alias +from SYS.utils import format_bytes as format_mb def _sanitize_cell_text(value: Any) -> str: @@ -158,6 +161,8 @@ def extract_hash_value(item: Any) -> str: def extract_title_value(item: Any) -> str: data = _as_dict(item) or {} + if not isinstance(data, dict): + data = {} title = _get_first_dict_value(data, ["title", "name", "filename"]) if not title: title = _get_first_dict_value( @@ -171,9 +176,11 @@ def extract_title_value(item: Any) -> str: def extract_ext_value(item: Any) -> str: data = _as_dict(item) or {} + if not isinstance(data, dict): + data = {} - meta = data.get("metadata") if isinstance(data.get("metadata"), - dict) else {} + _md = data.get("metadata") + meta: Dict[str, Any] = _md if isinstance(_md, dict) else {} raw_path = data.get("path") or data.get("target") or data.get( "filename" ) or data.get("title") @@ -206,8 +213,10 @@ def extract_ext_value(item: Any) -> str: def extract_size_bytes_value(item: Any) -> Optional[int]: data = _as_dict(item) or {} - meta = data.get("metadata") if isinstance(data.get("metadata"), - dict) else {} + if not isinstance(data, dict): + data = {} + _md = data.get("metadata") + meta: Dict[str, Any] = _md if isinstance(_md, dict) else {} size_val = _get_first_dict_value( data, @@ -749,7 +758,7 @@ class Table: row.payload = result # Handle ResultModel from the new strict API (SYS/result_table_api.py) - if ResultModel and isinstance(result, ResultModel): + if ResultModel is not None and isinstance(result, ResultModel): self._add_result_model(row, result) # Handle TagItem from get_tag.py (tag display with index) elif hasattr(result, "__class__") and result.__class__.__name__ == "TagItem": @@ -1573,7 +1582,7 @@ class Table: return None # Remaining parts are cmdlet arguments - cmdlet_args = {} + cmdlet_args: dict[str, Any] = {} i = 1 while i < len(parts): part = parts[i] @@ -1906,7 +1915,7 @@ def extract_item_metadata(item: Any) -> Dict[str, Any]: out = {} # Handle ResultModel specifically for better detail display - if ResultModel and isinstance(item, ResultModel): + if ResultModel is not None and isinstance(item, ResultModel): if item.title: out["Title"] = item.title if item.path: out["Path"] = item.path if item.ext: out["Ext"] = item.ext @@ -1964,34 +1973,30 @@ def extract_item_metadata(item: Any) -> Dict[str, Any]: if e: out["Ext"] = e size = extract_size_bytes_value(item) - if size: - out["Size"] = size + if size is not None: + out["Size"] = format_mb(size) else: s = data.get("size") or data.get("size_bytes") - if s: out["Size"] = s - + if s is not None: + out["Size"] = str(s) + # Duration dur = _get_first_dict_value(data, ["duration_seconds", "duration"]) if dur: out["Duration"] = _format_duration_hms(dur) - + # URL url = _get_first_dict_value(data, ["url", "URL"]) - if url: - out["Url"] = url - else: - out["Url"] = None # Explicitly None for display + out["Url"] = str(url) if url else "" # Relationships rels = _get_first_dict_value(data, ["relationships", "rel"]) - if rels: - out["Relations"] = rels - else: - out["Relations"] = None + out["Relations"] = str(rels) if rels else "" # Tags Summary tags = _get_first_dict_value(data, ["tags", "tag"]) - if tags: out["Tags"] = tags + if tags: + out["Tags"] = ", ".join([str(t) for t in (tags if isinstance(tags, (list, tuple)) else [tags])]) return out diff --git a/SYS/rich_display.py b/SYS/rich_display.py index 6af8d33..7aa19cc 100644 --- a/SYS/rich_display.py +++ b/SYS/rich_display.py @@ -11,7 +11,7 @@ from __future__ import annotations import contextlib import sys -from typing import Any, Iterator, TextIO +from typing import Any, Iterator, TextIO, List, Dict, Optional, Tuple, cast from rich.console import Console from rich.panel import Panel @@ -200,8 +200,8 @@ def render_image_to_console(image_path: str | Path, max_width: int | None = None if not path.exists() or not path.is_file(): return - with Image.open(path) as img: - img = img.convert("RGB") + with Image.open(path) as opened_img: + img = opened_img.convert("RGB") orig_w, orig_h = img.size # Determine target dimensions @@ -235,14 +235,21 @@ def render_image_to_console(image_path: str | Path, max_width: int | None = None img = img.resize((target_w, target_h), Image.Resampling.BILINEAR) pixels = img.load() + if pixels is None: + return # Render using upper half block (U+2580) # Each character row in terminal represents 2 pixel rows in image. for y in range(0, target_h - 1, 2): line = Text() for x in range(target_w): - r1, g1, b1 = pixels[x, y] - r2, g2, b2 = pixels[x, y + 1] + rgb1 = cast(tuple, pixels[x, y]) + rgb2 = cast(tuple, pixels[x, y + 1]) + try: + r1, g1, b1 = int(rgb1[0]), int(rgb1[1]), int(rgb1[2]) + r2, g2, b2 = int(rgb2[0]), int(rgb2[1]), int(rgb2[2]) + except Exception: + r1 = g1 = b1 = r2 = g2 = b2 = 0 # Foreground is top pixel, background is bottom pixel line.append( "▀", diff --git a/SYS/utils.py b/SYS/utils.py index f8d1461..229a69d 100644 --- a/SYS/utils.py +++ b/SYS/utils.py @@ -21,7 +21,7 @@ from dataclasses import dataclass, field from fnmatch import fnmatch from urllib.parse import urlparse -import SYS.utils_constant +from SYS.utils_constant import mime_maps try: import cbor2 @@ -140,7 +140,7 @@ def create_metadata_sidecar(file_path: Path, metadata: dict) -> None: metadata["hash"] = sha256_file(file_path) metadata["size"] = Path(file_path).stat().st_size format_found = False - for mime_type, ext_map in SYS.utils_constant.mime_maps.items(): + for mime_type, ext_map in mime_maps.items(): for key, info in ext_map.items(): if info.get("ext") == file_ext: metadata["type"] = mime_type @@ -516,7 +516,7 @@ def get_api_key(config: dict[str, Any], service: str, key_path: str) -> str | No """ try: parts = key_path.split(".") - value = config + value: Any = config for part in parts: if isinstance(value, dict): value = value.get(part) diff --git a/SYS/utils_constant.py b/SYS/utils_constant.py index 3b04e45..db87ed5 100644 --- a/SYS/utils_constant.py +++ b/SYS/utils_constant.py @@ -1,4 +1,6 @@ -mime_maps = { +from typing import Any, Dict + +mime_maps: Dict[str, Dict[str, Dict[str, Any]]] = { "image": { "jpg": { "ext": ".jpg", diff --git a/SYS/worker.py b/SYS/worker.py index 5accfbc..200e10b 100644 --- a/SYS/worker.py +++ b/SYS/worker.py @@ -5,7 +5,7 @@ import io import sys import uuid from pathlib import Path -from typing import Any, Dict, Optional, Set, TextIO +from typing import Any, Dict, Optional, Set, TextIO, Sequence from SYS.config import get_local_storage_path from SYS.worker_manager import WorkerManager @@ -189,9 +189,7 @@ class WorkerManagerRegistry: manager.expire_running_workers( older_than_seconds=120, worker_id_prefix="cli_%", - reason=( - "CLI session ended unexpectedly; marking worker as failed", - ), + reason="CLI session ended unexpectedly; marking worker as failed", ) except Exception: pass diff --git a/SYS/worker_manager.py b/SYS/worker_manager.py index d14c2d5..ab2510a 100644 --- a/SYS/worker_manager.py +++ b/SYS/worker_manager.py @@ -47,8 +47,8 @@ class Worker: self.details = "" self.error_message = "" self.result = "pending" - self._stdout_buffer = [] - self._steps_buffer = [] + self._stdout_buffer: list[str] = [] + self._steps_buffer: list[str] = [] def log_step(self, step_text: str) -> None: """Log a step for this worker. @@ -108,18 +108,26 @@ class Worker: logger.error(f"Error getting steps for worker {self.id}: {e}") return "" - def update_progress(self, progress: str = "", details: str = "") -> None: + def update_progress(self, progress: float | str = 0.0, details: str = "") -> None: """Update worker progress. Args: - progress: Progress string (e.g., "50%") + progress: Progress value (float) or textual like "50%"; will be coerced to float details: Additional details """ - self.progress = progress + self.progress = str(progress) self.details = details try: if self.manager: - self.manager.update_worker(self.id, progress, details) + # Normalize to a float value for the manager API (0-100) + try: + if isinstance(progress, str) and progress.endswith('%'): + progress_value = float(progress.rstrip('%')) + else: + progress_value = float(progress) + except Exception: + progress_value = 0.0 + self.manager.update_worker(self.id, progress_value, details) except Exception as e: logger.error(f"Error updating worker {self.id}: {e}") @@ -165,7 +173,7 @@ class WorkerLoggingHandler(logging.StreamHandler): self.db = db self.manager = manager self.buffer_size = buffer_size - self.buffer = [] + self.buffer: list[str] = [] self._lock = Lock() # Set a format that includes timestamp and level @@ -278,14 +286,6 @@ class WorkerManager: self._stdout_flush_bytes = 4096 self._stdout_flush_interval = 0.75 - def close(self) -> None: - """Close the database connection.""" - if self.db: - try: - with self._db_lock: - self.db.close() - except Exception: - pass def __enter__(self): """Context manager entry.""" @@ -478,7 +478,7 @@ class WorkerManager: True if update was successful """ try: - kwargs = {} + kwargs: dict[str, Any] = {} if progress > 0: kwargs["progress"] = progress if current_step: diff --git a/Store/Folder.py b/Store/Folder.py index fb912eb..f75080c 100644 --- a/Store/Folder.py +++ b/Store/Folder.py @@ -10,6 +10,7 @@ from typing import Any, Dict, List, Optional, Tuple from SYS.logger import debug, log from SYS.utils import sha256_file, expand_path +from SYS.config import get_local_storage_path from Store._base import Store @@ -56,7 +57,7 @@ class Folder(Store): """""" # Track which locations have already been migrated to avoid repeated migrations - _migrated_locations = set() + _migrated_locations: set[str] = set() # Cache scan results to avoid repeated full scans across repeated instantiations _scan_cache: Dict[str, Tuple[bool, @@ -65,7 +66,7 @@ class Folder(Store): int]]] = {} @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: return [ { "key": "NAME", @@ -1498,11 +1499,12 @@ class Folder(Store): debug(f"Failed to get file for hash {file_hash}: {exc}") return None - def get_metadata(self, file_hash: str) -> Optional[Dict[str, Any]]: + def get_metadata(self, file_hash: str, **kwargs: Any) -> Optional[Dict[str, Any]]: """Get metadata for a file from the database by hash. Args: file_hash: SHA256 hash of the file (64-char hex string) + **kwargs: Additional options Returns: Dict with metadata fields (ext, size, hash, duration, etc.) or None if not found @@ -1613,7 +1615,7 @@ class Folder(Store): debug(f"get_tags failed for local file: {exc}") return [], "unknown" - def add_tag(self, hash: str, tag: List[str], **kwargs: Any) -> bool: + def add_tag(self, file_identifier: str, tags: List[str], **kwargs: Any) -> bool: """Add tags to a local file by hash (via API_folder_store). Handles namespace collapsing: when adding namespace:value, removes existing namespace:* tags. @@ -1628,14 +1630,14 @@ class Folder(Store): try: with API_folder_store(Path(self._location)) as db: existing_tags = [ - t for t in (db.get_tags(hash) or []) + t for t in (db.get_tags(file_identifier) or []) if isinstance(t, str) and t.strip() ] from SYS.metadata import compute_namespaced_tag_overwrite _to_remove, _to_add, merged = compute_namespaced_tag_overwrite( - existing_tags, tag or [] + existing_tags, tags or [] ) if not _to_remove and not _to_add: return True @@ -1644,7 +1646,7 @@ class Folder(Store): # To enforce lowercase-only tags and namespace overwrites, rewrite the full tag set. cursor = db.connection.cursor() cursor.execute("DELETE FROM tag WHERE hash = ?", - (hash, + (file_identifier, )) for t in merged: t = str(t).strip().lower() diff --git a/Store/HydrusNetwork.py b/Store/HydrusNetwork.py index c61786a..756f5fe 100644 --- a/Store/HydrusNetwork.py +++ b/Store/HydrusNetwork.py @@ -30,7 +30,7 @@ class HydrusNetwork(Store): """ @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: return [ { "key": "NAME", @@ -723,6 +723,10 @@ class HydrusNetwork(Store): if text: pattern_hints.append(text) pattern_hint = pattern_hints[0] if pattern_hints else "" + + hashes: list[str] = [] + file_ids: list[int] = [] + if ":" in query_lower and not query_lower.startswith(":"): namespace, pattern = query_lower.split(":", 1) namespace = namespace.strip().lower() @@ -765,8 +769,8 @@ class HydrusNetwork(Store): response = client._perform_request( spec ) # type: ignore[attr-defined] - hashes: list[str] = [] - file_ids: list[int] = [] + hashes = [] + file_ids = [] if isinstance(response, dict): raw_hashes = response.get("hashes") or response.get( "file_hashes" @@ -870,11 +874,11 @@ class HydrusNetwork(Store): freeform_predicates = [f"{query_lower}*"] # Search files with the tags (unless url: search already produced metadata) - results = [] + results: list[dict[str, Any]] = [] if metadata_list is None: - file_ids: list[int] = [] - hashes: list[str] = [] + file_ids = [] + hashes = [] if freeform_union_search: if not title_predicates and not freeform_predicates: @@ -929,7 +933,7 @@ class HydrusNetwork(Store): # Fast path: ext-only search. Avoid fetching metadata for an unbounded # system:everything result set; fetch in chunks until we have enough. if ext_only and ext_filter: - results: list[dict[str, Any]] = [] + results = [] if not file_ids and not hashes: debug(f"{prefix} 0 result(s)") return [] @@ -1930,7 +1934,7 @@ class HydrusNetwork(Store): try: if service_key: # Mutate tags for many hashes in a single request - client.mutate_tags_by_key(hashes=hashes, service_key=service_key, add_tags=list(tag_tuple)) + client.mutate_tags_by_key(hash=hashes, service_key=service_key, add_tags=list(tag_tuple)) any_success = True continue except Exception as exc: diff --git a/Store/ZeroTier.py b/Store/ZeroTier.py index fdabd36..f90f890 100644 --- a/Store/ZeroTier.py +++ b/Store/ZeroTier.py @@ -30,7 +30,7 @@ from Store._base import Store class ZeroTier(Store): @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: return [ {"key": "NAME", "label": "Store Name", "default": "", "required": True}, {"key": "NETWORK_ID", "label": "ZeroTier Network ID", "default": "", "required": True}, diff --git a/Store/_base.py b/Store/_base.py index d99703f..c12e9d9 100644 --- a/Store/_base.py +++ b/Store/_base.py @@ -13,7 +13,7 @@ from typing import Any, Dict, List, Optional, Tuple class Store(ABC): @classmethod - def config(cls) -> List[Dict[str, Any]]: + def config_schema(cls) -> List[Dict[str, Any]]: """Return configuration schema for this store. Returns a list of dicts: diff --git a/Store/registry.py b/Store/registry.py index d58ead1..fd08f30 100644 --- a/Store/registry.py +++ b/Store/registry.py @@ -91,10 +91,10 @@ def _discover_store_classes() -> Dict[str, Type[BaseStore]]: def _required_keys_for(store_cls: Type[BaseStore]) -> list[str]: - # Support new config() schema - if hasattr(store_cls, "config") and callable(store_cls.config): + # Support new config_schema() schema + if hasattr(store_cls, "config_schema") and callable(store_cls.config_schema): try: - schema = store_cls.config() + schema = store_cls.config_schema() keys = [] if isinstance(schema, list): for field in schema: diff --git a/TUI/modalscreen/config_modal.py b/TUI/modalscreen/config_modal.py index d00efad..5e831cb 100644 --- a/TUI/modalscreen/config_modal.py +++ b/TUI/modalscreen/config_modal.py @@ -380,7 +380,7 @@ class ConfigModal(ModalScreen): if stype in classes: cls = classes[stype] if hasattr(cls, "config") and callable(cls.config): - for field_def in cls.config(): + for field_def in cls.config_schema(): k = field_def.get("key") if k: provider_schema_map[k.upper()] = field_def @@ -395,7 +395,7 @@ class ConfigModal(ModalScreen): try: pcls = get_provider_class(item_name) if pcls and hasattr(pcls, "config") and callable(pcls.config): - for field_def in pcls.config(): + for field_def in pcls.config_schema(): k = field_def.get("key") if k: provider_schema_map[k.upper()] = field_def @@ -667,7 +667,7 @@ class ConfigModal(ModalScreen): for stype, cls in all_classes.items(): if hasattr(cls, "config") and callable(cls.config): try: - if cls.config(): + if cls.config_schema(): options.append(stype) except Exception: pass @@ -680,7 +680,7 @@ class ConfigModal(ModalScreen): pcls = get_provider_class(ptype) if pcls and hasattr(pcls, "config") and callable(pcls.config): try: - if pcls.config(): + if pcls.config_schema(): options.append(ptype) except Exception: pass @@ -856,7 +856,7 @@ class ConfigModal(ModalScreen): cls = classes[stype] # Use schema for defaults if present if hasattr(cls, "config") and callable(cls.config): - for field_def in cls.config(): + for field_def in cls.config_schema(): key = field_def.get("key") if key: val = field_def.get("default", "") @@ -890,7 +890,7 @@ class ConfigModal(ModalScreen): if pcls: # Use schema for defaults if hasattr(pcls, "config") and callable(pcls.config): - for field_def in pcls.config(): + for field_def in pcls.config_schema(): key = field_def.get("key") if key: new_config[key] = field_def.get("default", "") @@ -988,7 +988,7 @@ class ConfigModal(ModalScreen): if pcls: # Collect required keys from schema if hasattr(pcls, "config") and callable(pcls.config): - for field_def in pcls.config(): + for field_def in pcls.config_schema(): if field_def.get("required"): k = field_def.get("key") if k and k not in required_keys: diff --git a/TUI/modalscreen/search.py b/TUI/modalscreen/search.py index 4676e35..770033a 100644 --- a/TUI/modalscreen/search.py +++ b/TUI/modalscreen/search.py @@ -153,6 +153,9 @@ class SearchModal(ModalScreen): return source = self.source_select.value + if not source or not isinstance(source, str): + logger.warning("[search-modal] No source selected") + return # Clear existing results self.results_table.clear(columns=True) diff --git a/cmdlet/delete_url.py b/cmdlet/delete_url.py index f506f3c..44cfda1 100644 --- a/cmdlet/delete_url.py +++ b/cmdlet/delete_url.py @@ -4,15 +4,13 @@ from typing import Any, Dict, List, Sequence, Tuple import sys from SYS import pipeline as ctx -from . import _shared as sh - -Cmdlet, CmdletArg, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = ( - sh.Cmdlet, - sh.CmdletArg, - sh.SharedArgs, - sh.parse_cmdlet_args, - sh.get_field, - sh.normalize_hash, +from ._shared import ( + Cmdlet, + CmdletArg, + SharedArgs, + parse_cmdlet_args, + get_field, + normalize_hash, ) from SYS.logger import log from Store import Store diff --git a/cmdlet/get_url.py b/cmdlet/get_url.py index d3512e5..48e6d57 100644 --- a/cmdlet/get_url.py +++ b/cmdlet/get_url.py @@ -8,14 +8,12 @@ import sys import re from fnmatch import fnmatch from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse -from . import _shared as sh - -Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = ( - sh.Cmdlet, - sh.SharedArgs, - sh.parse_cmdlet_args, - sh.get_field, - sh.normalize_hash, +from ._shared import ( + Cmdlet, + SharedArgs, + parse_cmdlet_args, + get_field, + normalize_hash, ) from SYS.logger import log from SYS.result_table import Table diff --git a/cmdlet/search_file.py b/cmdlet/search_file.py index 2e5bcf8..7077027 100644 --- a/cmdlet/search_file.py +++ b/cmdlet/search_file.py @@ -18,9 +18,7 @@ from SYS.rich_display import ( show_available_providers_panel, ) -from . import _shared as sh - -( +from ._shared import ( Cmdlet, CmdletArg, SharedArgs, @@ -29,15 +27,6 @@ from . import _shared as sh normalize_hash, first_title_tag, parse_hash_query, -) = ( - sh.Cmdlet, - sh.CmdletArg, - sh.SharedArgs, - sh.get_field, - sh.should_show_help, - sh.normalize_hash, - sh.first_title_tag, - sh.parse_hash_query, ) from SYS import pipeline as ctx diff --git a/cmdnat/__init__.py b/cmdnat/__init__.py index 499cae2..63a8dab 100644 --- a/cmdnat/__init__.py +++ b/cmdnat/__init__.py @@ -16,7 +16,7 @@ def _register_cmdlet_object(cmdlet_obj, registry: Dict[str, CmdletFn]) -> None: registry[cmdlet_obj.name.replace("_", "-").lower()] = run_fn # Cmdlet uses 'alias' (List[str]). Some older objects may use 'aliases'. - aliases = [] + aliases: list[str] = [] if hasattr(cmdlet_obj, "alias") and getattr(cmdlet_obj, "alias"): aliases.extend(getattr(cmdlet_obj, "alias") or []) if hasattr(cmdlet_obj, "aliases") and getattr(cmdlet_obj, "aliases"): diff --git a/scripts/bootstrap.py b/scripts/bootstrap.py index 06351e5..d01311c 100644 --- a/scripts/bootstrap.py +++ b/scripts/bootstrap.py @@ -55,11 +55,13 @@ from __future__ import annotations import argparse import os import platform +import re from pathlib import Path import shutil import subprocess import sys import time +from typing import Optional def run(cmd: list[str], quiet: bool = False, debug: bool = False, cwd: Optional[Path] = None) -> None: @@ -1088,7 +1090,7 @@ def main() -> int: # 7. CLI Verification pb.update("Verifying CLI configuration...") try: - rc = subprocess.run( + cli_verify_result = subprocess.run( [ str(venv_python), "-c", @@ -1098,7 +1100,7 @@ def main() -> int: stderr=subprocess.DEVNULL, check=False, ) - if rc.returncode != 0: + if cli_verify_result.returncode != 0: cmd = [ str(venv_python), "-c", @@ -1335,7 +1337,7 @@ if (Test-Path (Join-Path $repo 'CLI.py')) { else: # POSIX # If running as root (id 0), prefer /usr/bin or /usr/local/bin which are standard on PATH - if os.getuid() == 0: + if hasattr(os, "getuid") and os.getuid() == 0: user_bin = Path("/usr/local/bin") if not os.access(user_bin, os.W_OK): user_bin = Path("/usr/bin") diff --git a/scripts/hydrusnetwork.py b/scripts/hydrusnetwork.py index fa6421f..604dd14 100644 --- a/scripts/hydrusnetwork.py +++ b/scripts/hydrusnetwork.py @@ -369,7 +369,11 @@ def is_elevated() -> bool: return False else: try: - return os.geteuid() == 0 + # Use getattr for platform-specific os methods to satisfy Mypy + geteuid = getattr(os, "geteuid", None) + if geteuid: + return bool(geteuid() == 0) + return False except Exception: return False except Exception: @@ -476,9 +480,9 @@ def fix_permissions_unix( user = getpass.getuser() try: - pw = pwd.getpwnam(user) + pw = pwd.getpwnam(user) # type: ignore[attr-defined] uid = pw.pw_uid - gid = pw.pw_gid if not group else grp.getgrnam(group).gr_gid + gid = pw.pw_gid if not group else grp.getgrnam(group).gr_gid # type: ignore[attr-defined] except Exception: logging.warning("Could not resolve user/group to uid/gid; skipping chown.") return False @@ -500,16 +504,18 @@ def fix_permissions_unix( except Exception: # Best-effort fallback: chown/chmod individual entries for root_dir, dirs, files in os.walk(path): - try: - os.chown(root_dir, uid, gid) - except Exception: - pass - for fn in files: - fpath = os.path.join(root_dir, fn) + if hasattr(os, "chown"): try: - os.chown(fpath, uid, gid) + os.chown(root_dir, uid, gid) except Exception: pass + for fn in files: + fpath = os.path.join(root_dir, fn) + if hasattr(os, "chown"): + try: + os.chown(fpath, uid, gid) + except Exception: + pass # Fix modes: directories 0o755, files 0o644 (best-effort) for root_dir, dirs, files in os.walk(path): @@ -1454,11 +1460,12 @@ def main(argv: Optional[list[str]] = None) -> int: if p.exists(): client_found = p break + + run_client_script = None if client_found: # Prefer run_client helper located in the cloned repo; if missing, fall back to top-level scripts folder helper. script_dir = Path(__file__).resolve().parent helper_candidates = [dest / "run_client.py", script_dir / "run_client.py"] - run_client_script = None for cand in helper_candidates: if cand.exists(): run_client_script = cand @@ -1477,7 +1484,7 @@ def main(argv: Optional[list[str]] = None) -> int: ) else: if getattr(args, "install_service", False): - if run_client_script.exists(): + if run_client_script and run_client_script.exists(): cmd = [ str(venv_py), str(run_client_script), @@ -1513,7 +1520,7 @@ def main(argv: Optional[list[str]] = None) -> int: dest / "run_client.py", ) if getattr(args, "uninstall_service", False): - if run_client_script.exists(): + if run_client_script and run_client_script.exists(): cmd = [ str(venv_py), str(run_client_script), diff --git a/tool/ytdlp.py b/tool/ytdlp.py index 4e0f538..19f2d4a 100644 --- a/tool/ytdlp.py +++ b/tool/ytdlp.py @@ -27,6 +27,7 @@ from SYS.models import ( ) from SYS.pipeline_progress import PipelineProgress from SYS.utils import ensure_directory, sha256_file +from SYS.metadata import extract_ytdlp_tags _YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {} @@ -37,7 +38,7 @@ try: except Exception as exc: # pragma: no cover - handled at runtime yt_dlp = None # type: ignore gen_extractors = None # type: ignore - YTDLP_IMPORT_ERROR = exc + YTDLP_IMPORT_ERROR: Optional[Exception] = exc else: YTDLP_IMPORT_ERROR = None @@ -739,16 +740,16 @@ class YtDlpTool: # Progress + utility helpers for yt-dlp driven downloads (previously in cmdlet/download_media). _YTDLP_PROGRESS_BAR = ProgressBar() -_YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {} -_YTDLP_PROGRESS_ACTIVITY_LOCK = threading.Lock() -_YTDLP_PROGRESS_LAST_ACTIVITY = 0.0 _YTDLP_PROGRESS_ACTIVITY_LOCK = threading.Lock() _YTDLP_PROGRESS_LAST_ACTIVITY = 0.0 _SUBTITLE_EXTS = (".vtt", ".srt", ".ass", ".ssa", ".lrc") -def _progress_label(status: Dict[str, Any]) -> str: - info_dict = status.get("info_dict") if isinstance(status.get("info_dict"), dict) else {} +def _progress_label(status: Optional[Dict[str, Any]]) -> str: + if not status: + return "unknown" + raw_info = status.get("info_dict") + info_dict = raw_info if isinstance(raw_info, dict) else {} candidates = [ status.get("filename"), @@ -1244,7 +1245,7 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] debug( f"Skipping probe for playlist (item selection: {opts.playlist_items}), proceeding with download" ) - probe_result = {"url": opts.url} + probe_result: Optional[Dict[str, Any]] = {"url": opts.url} else: probe_cookiefile = None try: @@ -1286,7 +1287,7 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] debug(f"[yt-dlp] force_keyframes_at_cuts: {ytdl_options.get('force_keyframes_at_cuts', False)}") session_id = None - first_section_info = {} + first_section_info: Dict[str, Any] = {} if ytdl_options.get("download_sections"): live_ui, _ = PipelineProgress(pipeline_context).ui_and_pipe_index() quiet_sections = bool(opts.quiet) or (live_ui is not None) @@ -1447,20 +1448,20 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] raise DownloadError(str(exc)) from exc file_hash = sha256_file(media_path) - tags = [] + section_tags: List[str] = [] title = "" if first_section_info: title = first_section_info.get("title", "") if title: - tags.append(f"title:{title}") + section_tags.append(f"title:{title}") debug(f"Added title tag for section download: {title}") if first_section_info: - info_dict = first_section_info + info_dict_sec = first_section_info else: - info_dict = {"id": media_path.stem, "title": title or media_path.stem, "ext": media_path.suffix.lstrip(".")} + info_dict_sec = {"id": media_path.stem, "title": title or media_path.stem, "ext": media_path.suffix.lstrip(".")} - return DownloadMediaResult(path=media_path, info=info_dict, tag=tags, source_url=opts.url, hash_value=file_hash, paths=media_paths) + return DownloadMediaResult(path=media_path, info=info_dict_sec, tag=section_tags, source_url=opts.url, hash_value=file_hash, paths=media_paths) if not isinstance(info, dict): log(f"Unexpected yt-dlp response: {type(info)}", file=sys.stderr) @@ -1483,7 +1484,7 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] hash_value = None tags: List[str] = [] - if extract_ytdlp_tags: + if extract_ytdlp_tags is not None: try: tags = extract_ytdlp_tags(entry) except Exception as exc: @@ -1524,10 +1525,10 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] if debug_logger is not None: debug_logger.write_record("hash-error", {"path": str(media_path), "error": str(exc)}) - tags = [] - if extract_ytdlp_tags: + tags_res: List[str] = [] + if extract_ytdlp_tags is not None: try: - tags = extract_ytdlp_tags(entry) + tags_res = extract_ytdlp_tags(entry) except Exception as exc: log(f"Error extracting tags: {exc}", file=sys.stderr) @@ -1546,7 +1547,7 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger] }, ) - return DownloadMediaResult(path=media_path, info=entry, tag=tags, source_url=source_url, hash_value=hash_value) + return DownloadMediaResult(path=media_path, info=entry, tag=tags_res, source_url=source_url, hash_value=hash_value) def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300) -> Any: