Merge branch 'style/ruff-fixes'

This commit is contained in:
2026-01-19 06:36:16 -08:00
93 changed files with 3097 additions and 3275 deletions
+3 -1
View File
@@ -239,4 +239,6 @@ scripts/mm
tmp_*
*.secret
# Ignore local ZeroTier auth tokens (project-local copy)
authtoken.secret
authtoken.secret
mypy.ini
+5 -5
View File
@@ -15,7 +15,7 @@ import time
import traceback
import re
import os
from typing import Optional, Dict, Any, Callable, BinaryIO, List, Iterable, Set, Union
from typing import Optional, Dict, Any, Callable, List, Union
from pathlib import Path
from urllib.parse import unquote, urlparse, parse_qs
import logging
@@ -452,7 +452,7 @@ class HTTPClient:
else:
kwargs["headers"] = self._get_headers()
last_exception = None
last_exception: Exception | None = None
for attempt in range(self.retries):
self._debug_panel(
@@ -875,7 +875,7 @@ def download_direct_file(
pass
tags: List[str] = []
if extract_ytdlp_tags:
if extract_ytdlp_tags is not None:
try:
tags = extract_ytdlp_tags(info)
except Exception as exc:
@@ -884,7 +884,7 @@ def download_direct_file(
if not any(str(t).startswith("title:") for t in tags):
info["title"] = str(filename)
tags = []
if extract_ytdlp_tags:
if extract_ytdlp_tags is not None:
try:
tags = extract_ytdlp_tags(info)
except Exception as exc:
@@ -1135,7 +1135,7 @@ class AsyncHTTPClient:
else:
kwargs["headers"] = self._get_headers()
last_exception = None
last_exception: Exception | None = None
for attempt in range(self.retries):
try:
+12 -11
View File
@@ -2066,9 +2066,9 @@ def _derive_title(
"original_display_filename",
"original_filename",
):
value = entry.get(key)
if isinstance(value, str):
cleaned = value.strip()
raw_val = entry.get(key)
if isinstance(raw_val, str):
cleaned = raw_val.strip()
if cleaned:
return cleaned
return None
@@ -2444,7 +2444,7 @@ def fetch_hydrus_metadata_by_url(payload: Dict[str, Any]) -> Dict[str, Any]:
matched_url = None
normalized_reported = None
seen: Set[str] = set()
queue = deque()
queue: deque[str] = deque()
for variant in _generate_hydrus_url_variants(url):
queue.append(variant)
if not queue:
@@ -2486,11 +2486,11 @@ def fetch_hydrus_metadata_by_url(payload: Dict[str, Any]) -> Dict[str, Any]:
if isinstance(raw_hashes, list):
for item in raw_hashes:
try:
normalized = _normalize_hash(item)
norm_hash = _normalize_hash(item)
except ValueError:
continue
if normalized:
response_hashes_list.append(normalized)
if norm_hash:
response_hashes_list.append(norm_hash)
raw_ids = response.get("file_ids") or response.get("file_id")
if isinstance(raw_ids, list):
for item in raw_ids:
@@ -2510,12 +2510,13 @@ def fetch_hydrus_metadata_by_url(payload: Dict[str, Any]) -> Dict[str, Any]:
continue
status_hash = entry.get("hash") or entry.get("file_hash")
if status_hash:
norm_status: Optional[str] = None
try:
normalized = _normalize_hash(status_hash)
norm_status = _normalize_hash(status_hash)
except ValueError:
normalized = None
if normalized:
response_hashes_list.append(normalized)
pass
if norm_status:
response_hashes_list.append(norm_status)
status_id = entry.get("file_id") or entry.get("fileid")
if status_id is not None:
try:
+3
View File
@@ -0,0 +1,3 @@
"""Medeia API helpers that power external integrations."""
__all__ = []
+1 -2
View File
@@ -12,7 +12,6 @@ import sys
import time
from typing import Any, Dict, Optional, Set, List, Sequence, Tuple
import time
from urllib.parse import urlparse
from SYS.logger import log, debug
@@ -1124,7 +1123,7 @@ def unlock_link_cmdlet(result: Any, args: Sequence[str], config: Dict[str, Any])
# Note: The cmdlet wrapper will handle emitting to pipeline
return 0
else:
log(f"❌ Failed to unlock link or already unrestricted", file=sys.stderr)
log("❌ Failed to unlock link or already unrestricted", file=sys.stderr)
return 1
-1
View File
@@ -1,6 +1,5 @@
from __future__ import annotations
import json
from typing import Any, Dict, Optional
from .HTTP import HTTPClient
+3 -3
View File
@@ -92,7 +92,7 @@
"(hitfile\\.net/[a-z0-9A-Z]{4,9})"
],
"regexp": "(hitf\\.(to|cc)/([a-z0-9A-Z]{4,9}))|(htfl\\.(net|to|cc)/([a-z0-9A-Z]{4,9}))|(hitfile\\.(net)/download/free/([a-z0-9A-Z]{4,9}))|((hitfile\\.net/[a-z0-9A-Z]{4,9}))",
"status": false
"status": true
},
"mega": {
"name": "mega",
@@ -353,7 +353,7 @@
"filedot\\.(xyz|to|top)/([0-9a-zA-Z]{12})"
],
"regexp": "filedot\\.(xyz|to|top)/([0-9a-zA-Z]{12})",
"status": true
"status": false
},
"filefactory": {
"name": "filefactory",
@@ -622,7 +622,7 @@
"(simfileshare\\.net/download/[0-9]+/)"
],
"regexp": "(simfileshare\\.net/download/[0-9]+/)",
"status": true
"status": false
},
"streamtape": {
"name": "streamtape",
+24 -12
View File
@@ -13,8 +13,6 @@ from __future__ import annotations
import sqlite3
import json
import logging
import subprocess
import shutil
import time
import os
from contextlib import contextmanager
@@ -57,6 +55,7 @@ def _db_retry(max_attempts: int = 6, base_sleep: float = 0.1):
return _decorator
# Try to import optional dependencies
mutagen: Any
try:
import mutagen
except ImportError:
@@ -74,12 +73,12 @@ try:
METADATA_AVAILABLE = True
except ImportError:
_read_sidecar_metadata = None
_derive_sidecar_path = None
write_tags = None
write_tags_to_file = None
embed_metadata_in_file = None
read_tags_from_file = None
_read_sidecar_metadata = None # type: ignore
_derive_sidecar_path = None # type: ignore
write_tags = None # type: ignore
write_tags_to_file = None # type: ignore
embed_metadata_in_file = None # type: ignore
read_tags_from_file = None # type: ignore
METADATA_AVAILABLE = False
# Media extensions to index
@@ -221,7 +220,7 @@ class API_folder_store:
"""
self.library_root = expand_path(library_root).resolve()
self.db_path = self.library_root / self.DB_NAME
self.connection: Optional[sqlite3.Connection] = None
self.connection: sqlite3.Connection = None # type: ignore
# Use the shared lock
self._db_lock = self._shared_db_lock
mm_debug(f"[folder-db] init: root={self.library_root} db={self.db_path}")
@@ -303,8 +302,21 @@ class API_folder_store:
if should_check_empty:
# Check if there are any files or directories in the library root (excluding the DB itself if it was just created)
# We use a generator and next() for efficiency.
existing_items = [item for item in self.library_root.iterdir() if item.name != self.DB_NAME]
# Allow an empty 'incoming' directory created by upload flow to exist
# (this prevents a false-positive safety check when an upload endpoint
# creates the incoming dir before DB initialization).
if existing_items:
if len(existing_items) == 1 and existing_items[0].name == "incoming" and existing_items[0].is_dir():
try:
# If the incoming directory is empty, treat it as harmless.
if not any(existing_items[0].iterdir()):
existing_items = []
except Exception:
# If we can't inspect it safely, leave the original items in place
pass
if existing_items:
# Log the items found for debugging
item_names = [i.name for i in existing_items[:5]]
@@ -1378,7 +1390,7 @@ class API_folder_store:
(file_hash,
existing_title[0]),
)
logger.debug(f"[save_tags] Preserved existing title tag")
logger.debug("[save_tags] Preserved existing title tag")
elif not existing_title and not new_title_provided:
filename_without_ext = abs_path.stem
if filename_without_ext:
@@ -3807,7 +3819,7 @@ def migrate_all(library_root: Path,
db),
}
finally:
if should_close:
if should_close and db is not None:
db.close()
-1
View File
@@ -12,7 +12,6 @@ The LoC JSON API does not require an API key.
from __future__ import annotations
import json
from typing import Any, Dict, Optional
from .base import API, ApiError
-1
View File
@@ -12,7 +12,6 @@ Authentication headers required for most endpoints:
from __future__ import annotations
import hashlib
import json
import time
from typing import Any, Dict, List, Optional
+1 -1
View File
@@ -32,7 +32,7 @@ from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from SYS.logger import debug, log
from SYS.logger import debug
# Optional Python ZeroTier bindings - prefer them when available
_HAVE_PY_ZEROTIER = False
+14 -2739
View File
File diff suppressed because it is too large Load Diff
+3 -3
View File
@@ -351,10 +351,10 @@ class MPV:
pipeline += f" | add-file -path {_q(path or '')}"
try:
from TUI.pipeline_runner import PipelineExecutor # noqa: WPS433
from TUI.pipeline_runner import PipelineRunner # noqa: WPS433
executor = PipelineExecutor()
result = executor.run_pipeline(pipeline)
runner = PipelineRunner()
result = runner.run_pipeline(pipeline)
return {
"success": bool(getattr(result,
"success",
+4 -4
View File
@@ -74,7 +74,7 @@ OBS_ID_REQUEST = 1001
def _run_pipeline(pipeline_text: str, *, seeds: Any = None) -> Dict[str, Any]:
# Import after sys.path fix.
from TUI.pipeline_runner import PipelineExecutor # noqa: WPS433
from TUI.pipeline_runner import PipelineRunner # noqa: WPS433
def _table_to_payload(table: Any) -> Optional[Dict[str, Any]]:
if table is None:
@@ -133,8 +133,8 @@ def _run_pipeline(pipeline_text: str, *, seeds: Any = None) -> Dict[str, Any]:
"rows": rows_payload
}
executor = PipelineExecutor()
result = executor.run_pipeline(pipeline_text, seeds=seeds)
runner = PipelineRunner()
result = runner.run_pipeline(pipeline_text, seeds=seeds)
table_payload = None
try:
@@ -905,7 +905,7 @@ def main(argv: Optional[list[str]] = None) -> int:
]
)
_append_helper_log(
f"[helper] published store-choices to user-data/medeia-store-choices-cached"
"[helper] published store-choices to user-data/medeia-store-choices-cached"
)
except Exception as exc:
_append_helper_log(
+2 -6
View File
@@ -1,15 +1,12 @@
from __future__ import annotations
import os
import random
import re
import shutil
import string
import subprocess
import time
import sys
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse
from API.Tidal import (
@@ -20,7 +17,6 @@ from API.Tidal import (
stringify,
)
from ProviderCore.base import Provider, SearchResult, parse_inline_query_arguments
from ProviderCore.inline_utils import collect_choice
from cmdlet._shared import get_field
from SYS import pipeline as pipeline_context
from SYS.logger import debug, log
@@ -1282,7 +1278,7 @@ class HIFI(Provider):
)
return materialized
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]:
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path | Dict[str, Any]]]:
view, identifier = self._parse_tidal_url(url)
if not view:
return False, None
+2 -5
View File
@@ -1,15 +1,12 @@
from __future__ import annotations
import os
import random
import re
import shutil
import string
import subprocess
import time
import sys
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse
from API.Tidal import (
@@ -1268,7 +1265,7 @@ class Tidal(Provider):
)
return materialized
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]:
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path | Dict[str, Any]]]:
view, identifier = self._parse_tidal_url(url)
if not view:
return False, None
+2 -2
View File
@@ -585,7 +585,7 @@ class AllDebrid(TableProviderMixin, Provider):
URL_DOMAINS = ()
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
return [
{
"key": "api_key",
@@ -646,7 +646,7 @@ class AllDebrid(TableProviderMixin, Provider):
return spec
return resolve_magnet_spec(str(target)) if isinstance(target, str) else None
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]:
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path | Dict[str, Any]]]:
magnet_id = _parse_alldebrid_magnet_id(url)
if magnet_id is not None:
return True, {
+2 -2
View File
@@ -2,7 +2,7 @@ from __future__ import annotations
import os
import sys
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from ProviderCore.base import Provider
from SYS.logger import log
@@ -53,7 +53,7 @@ class FileIO(Provider):
PROVIDER_NAME = "file.io"
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
return [
{
"key": "api_key",
+1 -1
View File
@@ -468,7 +468,7 @@ class InternetArchive(Provider):
URL = ("archive.org",)
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
return [
{
"key": "access_key",
+1 -1
View File
@@ -1265,7 +1265,7 @@ class LibgenSearch:
_call(log_info, f"[libgen] Using mirror: {mirror}")
return results
else:
_call(log_info, f"[libgen] Mirror returned 0 results; stopping mirror fallback")
_call(log_info, "[libgen] Mirror returned 0 results; stopping mirror fallback")
break
except requests.exceptions.Timeout:
_call(log_info, f"[libgen] Mirror timed out: {mirror}")
+1 -1
View File
@@ -235,7 +235,7 @@ class Matrix(TableProviderMixin, Provider):
"""
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
return [
{
"key": "homeserver",
+3 -3
View File
@@ -11,7 +11,7 @@ import sys
import tempfile
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Tuple
from urllib.parse import urlparse
import requests
@@ -20,7 +20,7 @@ from API.HTTP import HTTPClient, get_requests_verify_value
from ProviderCore.base import Provider, SearchResult
from SYS.utils import sanitize_filename
from SYS.cli_syntax import get_field, get_free_text, parse_query
from SYS.logger import debug, log
from SYS.logger import log
from Provider.metadata_provider import (
archive_item_metadata_to_tags,
fetch_archive_item_metadata,
@@ -287,7 +287,7 @@ class OpenLibrary(Provider):
}
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
return [
{
"key": "email",
+10 -3
View File
@@ -245,7 +245,7 @@ class Soulseek(Provider):
return False
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
return [
{
"key": "username",
@@ -325,6 +325,10 @@ class Soulseek(Provider):
)
return None
# Cast to str for Mypy
username = str(username)
filename = str(filename)
# Use tempfile directory as default if generic path elements were passed or None.
if output_dir is None:
import tempfile
@@ -363,10 +367,13 @@ class Soulseek(Provider):
target_dir = Path(tempfile.gettempdir()) / "Medios" / "Soulseek"
asyncio.set_event_loop(loop)
# Cast to str for Mypy
username_str = str(username)
filename_str = str(filename)
return loop.run_until_complete(
download_soulseek_file(
username=username,
filename=filename,
username=username_str,
filename=filename_str,
output_dir=target_dir,
timeout=self.MAX_WAIT_TRANSFER,
)
+3 -3
View File
@@ -7,7 +7,7 @@ import sys
import time
import threading
from pathlib import Path
from typing import Any, Dict, Optional, Sequence, Tuple
from typing import Any, Dict, List, Optional, Sequence, Tuple
from urllib.parse import urlparse
from ProviderCore.base import Provider, SearchResult
@@ -150,7 +150,7 @@ class Telegram(Provider):
URL = ("t.me", "telegram.me")
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
return [
{
"key": "app_id",
@@ -1175,7 +1175,7 @@ class Telegram(Provider):
raise ValueError("Not a Telegram URL")
return self._download_message_media_sync(url=url, output_dir=output_dir)
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]:
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path | Dict[str, Any]]]:
"""Optional provider override to parse and act on URLs."""
if not _looks_like_telegram_message_url(url):
return False, None
-1
View File
@@ -109,7 +109,6 @@ class YouTube(TableProviderMixin, Provider):
def validate(self) -> bool:
try:
import yt_dlp # type: ignore
return True
except Exception:
+2 -5
View File
@@ -9,13 +9,11 @@ This keeps format selection logic in ytdlp and leaves add-file plug-and-play.
from __future__ import annotations
import sys
from typing import Any, Dict, Iterable, List, Optional, Tuple
from ProviderCore.base import Provider, SearchResult
from SYS.provider_helpers import TableProviderMixin
from SYS.logger import log, debug
from tool.ytdlp import list_formats, is_url_supported_by_ytdlp
from SYS.logger import debug
class ytdlp(TableProviderMixin, Provider):
@@ -196,7 +194,6 @@ class ytdlp(TableProviderMixin, Provider):
def validate(self) -> bool:
"""Validate yt-dlp availability."""
try:
import yt_dlp # type: ignore
return True
except Exception:
return False
@@ -295,7 +292,7 @@ try:
debug(f"[ytdlp] Selection routed with format_id: {format_id}")
return result_args
debug(f"[ytdlp] Warning: No selection args or format_id found in row")
debug("[ytdlp] Warning: No selection args or format_id found in row")
return []
register_provider(
+4 -4
View File
@@ -2,7 +2,7 @@ from __future__ import annotations
import re
from abc import ABC, abstractmethod
from abc import ABC
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple, Callable
@@ -24,7 +24,7 @@ class SearchResult:
size_bytes: Optional[int] = None
tag: set[str] = field(default_factory=set) # Searchable tag values
columns: List[Tuple[str, str]] = field(default_factory=list) # Display columns
selection_action: Optional[Dict[str, Any]] = None
selection_action: Optional[List[str]] = None
selection_args: Optional[List[str]] = None
full_metadata: Dict[str, Any] = field(default_factory=dict) # Extra metadata
@@ -150,7 +150,7 @@ class Provider(ABC):
).lower()
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
"""Return configuration schema for this provider.
Returns a list of dicts, each defining a field:
@@ -228,7 +228,7 @@ class Provider(ABC):
_ = config
return 0
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path]]:
def handle_url(self, url: str, *, output_dir: Optional[Path] = None) -> Tuple[bool, Optional[Path | Dict[str, Any]]]:
"""Optional provider override to parse and act on URLs."""
_ = url
View File
-2
View File
@@ -1,9 +1,7 @@
from __future__ import annotations
import os
import sys
import subprocess
import atexit
from pathlib import Path
from typing import Optional
+12 -10
View File
@@ -13,12 +13,14 @@ from typing import Any, Dict, List, Optional, Set, Tuple
# stubs if prompt_toolkit is not available so imports remain safe for testing.
try:
from prompt_toolkit.document import Document
from prompt_toolkit.lexers import Lexer
from prompt_toolkit.lexers import Lexer as _PTK_Lexer
except Exception: # pragma: no cover - optional dependency
Document = object # type: ignore
# Fallback to a simple object when prompt_toolkit is not available
_PTK_Lexer = object # type: ignore
class Lexer: # simple fallback base
pass
# Expose a stable name used by the rest of the module
Lexer = _PTK_Lexer
class SelectionSyntax:
@@ -216,19 +218,19 @@ class SelectionFilterSyntax:
if ":" in s:
parts = [p.strip() for p in s.split(":")]
if len(parts) == 2 and all(p.isdigit() for p in parts):
m, sec = parts
return max(0, int(m) * 60 + int(sec))
m_str, sec_str = parts
return max(0, int(m_str) * 60 + int(sec_str))
if len(parts) == 3 and all(p.isdigit() for p in parts):
h, m, sec = parts
return max(0, int(h) * 3600 + int(m) * 60 + int(sec))
h_str, m_str, sec_str = parts
return max(0, int(h_str) * 3600 + int(m_str) * 60 + int(sec_str))
# token format: 1h2m3s (tokens can appear in any combination)
total = 0
found = False
for m in SelectionFilterSyntax._DUR_TOKEN_RE.finditer(s):
for match in SelectionFilterSyntax._DUR_TOKEN_RE.finditer(s):
found = True
n = int(m.group(1))
unit = m.group(2).lower()
n = int(match.group(1))
unit = match.group(2).lower()
if unit == "h":
total += n * 3600
elif unit == "m":
+1 -5
View File
@@ -5,7 +5,7 @@ from __future__ import annotations
import re
import tempfile
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List
from SYS.logger import log
from SYS.utils import expand_path
@@ -722,10 +722,6 @@ def reload_config(
return load_config(config_dir=config_dir, filename=filename)
def clear_config_cache() -> None:
_CONFIG_CACHE.clear()
def _validate_config_safety(config: Dict[str, Any]) -> None:
"""Check for dangerous configurations, like folder stores in non-empty dirs."""
store = config.get("store")
+6 -6
View File
@@ -220,11 +220,11 @@ def extract_records(doc_or_html: Any, base_url: Optional[str] = None, xpaths: Op
records: List[Dict[str, str]] = []
for row in rows:
nr: Dict[str, str] = {}
row_norm: Dict[str, str] = {}
for k, v in (row or {}).items():
nk = normalize_header(str(k or ""))
nr[nk] = (str(v).strip() if v is not None else "")
records.append(nr)
row_norm[nk] = (str(v).strip() if v is not None else "")
records.append(row_norm)
# Attempt to recover hrefs by matching anchor text -> href
try:
@@ -265,11 +265,11 @@ def extract_records(doc_or_html: Any, base_url: Optional[str] = None, xpaths: Op
# Normalize keys (map platform->system etc)
normed: List[Dict[str, str]] = []
for r in records:
nr: Dict[str, str] = {}
norm_row: Dict[str, str] = {}
for k, v in (r or {}).items():
nk = normalize_header(k)
nr[nk] = v
normed.append(nr)
norm_row[nk] = v
normed.append(norm_row)
return normed, chosen
+5 -5
View File
@@ -24,16 +24,16 @@ def _coerce_value(value: Any) -> str:
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, (list, tuple, set)):
parts = [_coerce_value(v) for v in value]
cleaned = [part for part in parts if part]
parts_list = [_coerce_value(v) for v in value]
cleaned = [part for part in parts_list if part]
return ", ".join(cleaned)
if isinstance(value, dict):
parts: List[str] = []
dict_parts: List[str] = []
for subkey, subvalue in value.items():
part = _coerce_value(subvalue)
if part:
parts.append(f"{subkey}:{part}")
return ", ".join(parts)
dict_parts.append(f"{subkey}:{part}")
return ", ".join(dict_parts)
try:
return str(value).strip()
except Exception:
+1 -2
View File
@@ -140,7 +140,7 @@ def debug_inspect(
value=value,
max_string=100_000,
max_length=100_000,
)
) # type: ignore[call-arg]
except TypeError:
rich_inspect(
obj,
@@ -155,7 +155,6 @@ def debug_inspect(
value=value,
)
def log(*args, **kwargs) -> None:
"""Print with automatic file.function prefix.
+329 -55
View File
@@ -4,13 +4,10 @@ import subprocess
import sys
import shutil
from SYS.logger import log, debug
from urllib.parse import urlsplit, urlunsplit, unquote
from collections import deque
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple
from API.HydrusNetwork import apply_hydrus_tag_mutation, fetch_hydrus_metadata, fetch_hydrus_metadata_by_url
from SYS.models import FileRelationshipTracker
try: # Optional; used when available for richer metadata fetches
import yt_dlp
@@ -20,6 +17,14 @@ try: # Optional; used for IMDb lookup without API key
from imdbinfo.services import search_title # type: ignore
except Exception: # pragma: no cover - optional dependency
search_title = None # type: ignore[assignment]
try:
import mutagen
except ImportError:
mutagen = None
try:
import musicbrainzngs
except ImportError:
musicbrainzngs = None
def value_normalize(value: Any) -> str:
@@ -96,6 +101,52 @@ def _sanitize_url(value: Optional[str]) -> Optional[str]:
return cleaned
def sanitize_metadata_value(value: Any) -> str:
if value is None:
return ""
if isinstance(value, (list, tuple)):
value = ", ".join(str(v) for v in value if v)
return str(value).strip().replace("\n", " ").replace("\r", " ")
def unique_preserve_order(items: Iterable[Any]) -> list[Any]:
seen = set()
result = []
for item in items:
if item not in seen:
seen.add(item)
result.append(item)
return result
def fetch_musicbrainz_tags(mbid: str, entity: str = "release") -> Dict[str, Any]:
if not musicbrainzngs:
return {"tag": []}
musicbrainzngs.set_useragent("Medeia-Macina", "0.1")
tags: list[str] = []
try:
if entity == "release":
res = musicbrainzngs.get_release_by_id(mbid, includes=["tags"])
tags_list = res.get("release", {}).get("tag-list", [])
elif entity == "recording":
res = musicbrainzngs.get_recording_by_id(mbid, includes=["tags"])
tags_list = res.get("recording", {}).get("tag-list", [])
elif entity == "artist":
res = musicbrainzngs.get_artist_by_id(mbid, includes=["tags"])
tags_list = res.get("artist", {}).get("tag-list", [])
else:
return {"tag": []}
for t in tags_list:
if isinstance(t, dict) and "name" in t:
tags.append(t["name"])
except Exception as exc:
debug(f"MusicBrainz lookup failed: {exc}")
return {"tag": tags}
def _clean_existing_tags(existing: Any) -> List[str]:
tags: List[str] = []
seen: Set[str] = set()
@@ -604,7 +655,7 @@ def write_tags(
# Write via consolidated function
try:
lines = []
lines: List[str] = []
lines.extend(str(tag).strip().lower() for tag in tag_list if str(tag).strip())
if lines:
@@ -2418,11 +2469,6 @@ def scrape_url_metadata(
try:
import json as json_module
try:
from SYS.metadata import extract_ytdlp_tags
except ImportError:
extract_ytdlp_tags = None
# Build yt-dlp command with playlist support
# IMPORTANT: Do NOT use --flat-playlist! It strips metadata like artist, album, uploader, genre
# Without it, yt-dlp gives us full metadata in an 'entries' array within a single JSON object
@@ -2465,14 +2511,13 @@ def scrape_url_metadata(
# is_playlist = 'entries' in data and isinstance(data.get('entries'), list)
# Extract tags and playlist items
tags = []
playlist_items = []
tags: List[str] = []
playlist_items: List[Dict[str, Any]] = []
# IMPORTANT: Extract album/playlist-level tags FIRST (before processing entries)
# This ensures we get metadata about the collection, not just individual tracks
if extract_ytdlp_tags:
album_tags = extract_ytdlp_tags(data)
tags.extend(album_tags)
album_tags = extract_ytdlp_tags(data)
tags.extend(album_tags)
# Case 1: Entries are nested in the main object (standard playlist structure)
if "entries" in data and isinstance(data.get("entries"), list):
@@ -2496,41 +2541,40 @@ def scrape_url_metadata(
# Extract tags from each entry and merge (but don't duplicate album-level tags)
# Only merge entry tags that are multi-value prefixes (not single-value like title:, artist:, etc.)
if extract_ytdlp_tags:
entry_tags = extract_ytdlp_tags(entry)
entry_tags = extract_ytdlp_tags(entry)
# Single-value namespaces that should not be duplicated from entries
single_value_namespaces = {
"title",
"artist",
"album",
"creator",
"channel",
"release_date",
"upload_date",
"license",
"location",
}
# Single-value namespaces that should not be duplicated from entries
single_value_namespaces = {
"title",
"artist",
"album",
"creator",
"channel",
"release_date",
"upload_date",
"license",
"location",
}
for tag in entry_tags:
# Extract the namespace (part before the colon)
tag_namespace = tag.split(":",
1)[0].lower(
) if ":" in tag else None
for tag in entry_tags:
# Extract the namespace (part before the colon)
tag_namespace = tag.split(":",
1)[0].lower(
) if ":" in tag else None
# Skip if this namespace already exists in tags (from album level)
if tag_namespace and tag_namespace in single_value_namespaces:
# Check if any tag with this namespace already exists in tags
already_has_namespace = any(
t.split(":",
1)[0].lower() == tag_namespace for t in tags
if ":" in t
)
if already_has_namespace:
continue # Skip this tag, keep the album-level one
# Skip if this namespace already exists in tags (from album level)
if tag_namespace and tag_namespace in single_value_namespaces:
# Check if any tag with this namespace already exists in tags
already_has_namespace = any(
t.split(":",
1)[0].lower() == tag_namespace for t in tags
if ":" in t
)
if already_has_namespace:
continue # Skip this tag, keep the album-level one
if tag not in tags: # Avoid exact duplicates
tags.append(tag)
if tag not in tags: # Avoid exact duplicates
tags.append(tag)
# Case 2: Playlist detected by playlist_count field (BandCamp albums, etc.)
# These need a separate call with --flat-playlist to get the actual entries
@@ -2585,11 +2629,11 @@ def scrape_url_metadata(
)
except json_module.JSONDecodeError:
pass
except Exception as e:
except Exception:
pass # Silently ignore if we can't get playlist entries
# Fallback: if still no tags detected, get from first item
if not tags and extract_ytdlp_tags:
if not tags:
tags = extract_ytdlp_tags(data)
# Extract formats from the main data object
@@ -2598,11 +2642,7 @@ def scrape_url_metadata(
formats = extract_url_formats(data.get("formats", []))
# Deduplicate tags by namespace to prevent duplicate title:, artist:, etc.
try:
if dedup_tags_by_namespace:
tags = dedup_tags_by_namespace(tags, keep_first=True)
except Exception:
pass # If dedup fails, return tags as-is
tags = dedup_tags_by_namespace(tags, keep_first=True)
return title, tags, formats, playlist_items
@@ -2620,8 +2660,8 @@ def extract_url_formats(formats: list) -> List[Tuple[str, str]]:
Returns list of (display_label, format_id) tuples.
"""
try:
video_formats = {} # {resolution: format_data}
audio_formats = {} # {quality_label: format_data}
video_formats: Dict[str, Dict[str, Any]] = {} # {resolution: format_data}
audio_formats: Dict[str, Dict[str, Any]] = {} # {quality_label: format_data}
for fmt in formats:
vcodec = fmt.get("vcodec", "none")
@@ -2658,7 +2698,7 @@ def extract_url_formats(formats: list) -> List[Tuple[str, str]]:
"abr": abr,
}
result = []
result: List[Tuple[str, str]] = []
# Add video formats in descending resolution order
for res in sorted(video_formats.keys(),
@@ -2677,3 +2717,237 @@ def extract_url_formats(formats: list) -> List[Tuple[str, str]]:
except Exception as e:
log(f"Error extracting formats: {e}", file=sys.stderr)
return []
def prepare_ffmpeg_metadata(payload: Optional[dict[str, Any]]) -> dict[str, str]:
if not isinstance(payload, dict):
return {}
metadata: dict[str, str] = {}
def set_field(key: str, raw: Any, limit: int = 2000) -> None:
sanitized = sanitize_metadata_value(raw)
if not sanitized:
return
if len(sanitized) > limit:
sanitized = sanitized[:limit]
metadata[key] = sanitized
set_field("title", payload.get("title"))
set_field("artist", payload.get("artist"), 512)
set_field("album", payload.get("album"), 512)
set_field("date", payload.get("year") or payload.get("date"), 20)
comment = payload.get("comment")
tags_value = payload.get("tags")
tag_strings: list[str] = []
artists_from_tags: list[str] = []
albums_from_tags: list[str] = []
genres_from_tags: list[str] = []
if isinstance(tags_value, list):
for raw_tag in tags_value:
if raw_tag is None:
continue
if not isinstance(raw_tag, str):
raw_tag = str(raw_tag)
tag = raw_tag.strip()
if not tag:
continue
tag_strings.append(tag)
namespace, sep, value = tag.partition(":")
if sep and value:
ns = namespace.strip().lower()
value = value.strip()
if ns in {"artist", "creator", "author", "performer"}:
artists_from_tags.append(value)
elif ns in {"album", "series", "collection", "group"}:
albums_from_tags.append(value)
elif ns in {"genre", "rating"}:
genres_from_tags.append(value)
elif ns in {"comment", "description"} and not comment:
comment = value
elif ns in {"year", "date"} and not (payload.get("year") or payload.get("date")):
set_field("date", value, 20)
else:
genres_from_tags.append(tag)
if "artist" not in metadata and artists_from_tags:
set_field("artist", ", ".join(unique_preserve_order(artists_from_tags)[:3]), 512)
if "album" not in metadata and albums_from_tags:
set_field("album", unique_preserve_order(albums_from_tags)[0], 512)
if genres_from_tags:
set_field("genre", ", ".join(unique_preserve_order(genres_from_tags)[:5]), 256)
if tag_strings:
joined_tags = ", ".join(tag_strings[:50])
set_field("keywords", joined_tags, 2000)
if not comment:
comment = joined_tags
if comment:
set_field("comment", str(comment), 2000)
set_field("description", str(comment), 2000)
return metadata
def apply_mutagen_metadata(path: Path, metadata: dict[str, str], fmt: str) -> None:
if fmt != "audio":
return
if not metadata:
return
if mutagen is None:
return
try:
audio = mutagen.File(path, easy=True) # type: ignore[attr-defined]
except Exception as exc: # pragma: no cover - best effort only
log(f"mutagen load failed: {exc}", file=sys.stderr)
return
if audio is None:
return
field_map = {
"title": "title",
"artist": "artist",
"album": "album",
"genre": "genre",
"comment": "comment",
"description": "comment",
"date": "date",
}
changed = False
for source_key, target_key in field_map.items():
value = metadata.get(source_key)
if not value:
continue
try:
audio[target_key] = [value]
changed = True
except Exception: # pragma: no cover - best effort only
continue
if not changed:
return
try:
audio.save()
except Exception as exc: # pragma: no cover - best effort only
log(f"mutagen save failed: {exc}", file=sys.stderr)
def build_ffmpeg_command(
ffmpeg_path: str,
input_path: Path,
output_path: Path,
fmt: str,
max_width: int,
metadata: Optional[dict[str, str]] = None,
) -> list[str]:
cmd = [ffmpeg_path, "-y", "-i", str(input_path)]
if fmt in {"mp4", "webm"} and max_width and max_width > 0:
cmd.extend(["-vf", f"scale='min({max_width},iw)':-2"])
if metadata:
for key, value in metadata.items():
cmd.extend(["-metadata", f"{key}={value}"])
# Video formats
if fmt == "mp4":
cmd.extend([
"-c:v",
"libx265",
"-preset",
"medium",
"-crf",
"26",
"-tag:v",
"hvc1",
"-pix_fmt",
"yuv420p",
"-c:a",
"aac",
"-b:a",
"192k",
"-movflags",
"+faststart",
])
elif fmt == "webm":
cmd.extend([
"-c:v",
"libvpx-vp9",
"-b:v",
"0",
"-crf",
"32",
"-c:a",
"libopus",
"-b:a",
"160k",
])
cmd.extend(["-f", "webm"])
# Audio formats
elif fmt == "mp3":
cmd.extend([
"-vn",
"-c:a",
"libmp3lame",
"-b:a",
"192k",
])
cmd.extend(["-f", "mp3"])
elif fmt == "flac":
cmd.extend([
"-vn",
"-c:a",
"flac",
])
cmd.extend(["-f", "flac"])
elif fmt == "wav":
cmd.extend([
"-vn",
"-c:a",
"pcm_s16le",
])
cmd.extend(["-f", "wav"])
elif fmt == "aac":
cmd.extend([
"-vn",
"-c:a",
"aac",
"-b:a",
"192k",
])
cmd.extend(["-f", "adts"])
elif fmt == "m4a":
cmd.extend([
"-vn",
"-c:a",
"aac",
"-b:a",
"192k",
])
cmd.extend(["-f", "ipod"])
elif fmt == "ogg":
cmd.extend([
"-vn",
"-c:a",
"libvorbis",
"-b:a",
"192k",
])
cmd.extend(["-f", "ogg"])
elif fmt == "opus":
cmd.extend([
"-vn",
"-c:a",
"libopus",
"-b:a",
"192k",
])
cmd.extend(["-f", "opus"])
elif fmt == "audio":
# Legacy format name for mp3
cmd.extend([
"-vn",
"-c:a",
"libmp3lame",
"-b:a",
"192k",
])
cmd.extend(["-f", "mp3"])
elif fmt != "copy":
raise ValueError(f"Unsupported format: {fmt}")
cmd.append(str(output_path))
return cmd
+7 -1
View File
@@ -633,7 +633,13 @@ class ProgressFileReader:
min_interval_s: float = 0.25,
):
self._f = fileobj
self._total = int(total_bytes) if total_bytes not in (None, 0, "") else 0
if total_bytes is None:
self._total = 0
else:
try:
self._total = int(total_bytes)
except Exception:
self._total = 0
self._label = str(label or "upload")
self._min_interval_s = max(0.05, float(min_interval_s))
self._bar = ProgressBar()
+1 -1
View File
@@ -4,7 +4,7 @@ import importlib
import os
import subprocess
import sys
from typing import Any, Dict, Iterable, List, Optional, Tuple
from typing import Any, Dict, List, Tuple
from SYS.logger import log
from SYS.rich_display import stdout_console
+1735 -29
View File
File diff suppressed because it is too large Load Diff
+1 -2
View File
@@ -18,8 +18,7 @@ so authors don't have to install pandas/bs4 unless they want to.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from urllib.parse import quote_plus
from typing import List, Optional
from API.HTTP import HTTPClient
from ProviderCore.base import SearchResult
+33 -29
View File
@@ -16,7 +16,6 @@ from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Callable, Set
from pathlib import Path
import json
import shutil
from rich.box import SIMPLE
from rich.console import Group
@@ -34,12 +33,15 @@ except ImportError:
TEXTUAL_AVAILABLE = False
# Import ResultModel from the API for unification
try:
from SYS.result_table_api import ResultModel
except ImportError:
# Fallback if not available yet in directory structure (unlikely)
ResultModel = None
# Import ResultModel from the API for typing; avoid runtime redefinition issues
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from SYS.result_table_api import ResultModel # type: ignore
else:
ResultModel = None # type: ignore[assignment]
# Reuse the existing format_bytes helper under a clearer alias
from SYS.utils import format_bytes as format_mb
def _sanitize_cell_text(value: Any) -> str:
@@ -159,6 +161,8 @@ def extract_hash_value(item: Any) -> str:
def extract_title_value(item: Any) -> str:
data = _as_dict(item) or {}
if not isinstance(data, dict):
data = {}
title = _get_first_dict_value(data, ["title", "name", "filename"])
if not title:
title = _get_first_dict_value(
@@ -172,9 +176,11 @@ def extract_title_value(item: Any) -> str:
def extract_ext_value(item: Any) -> str:
data = _as_dict(item) or {}
if not isinstance(data, dict):
data = {}
meta = data.get("metadata") if isinstance(data.get("metadata"),
dict) else {}
_md = data.get("metadata")
meta: Dict[str, Any] = _md if isinstance(_md, dict) else {}
raw_path = data.get("path") or data.get("target") or data.get(
"filename"
) or data.get("title")
@@ -207,8 +213,10 @@ def extract_ext_value(item: Any) -> str:
def extract_size_bytes_value(item: Any) -> Optional[int]:
data = _as_dict(item) or {}
meta = data.get("metadata") if isinstance(data.get("metadata"),
dict) else {}
if not isinstance(data, dict):
data = {}
_md = data.get("metadata")
meta: Dict[str, Any] = _md if isinstance(_md, dict) else {}
size_val = _get_first_dict_value(
data,
@@ -750,7 +758,7 @@ class Table:
row.payload = result
# Handle ResultModel from the new strict API (SYS/result_table_api.py)
if ResultModel and isinstance(result, ResultModel):
if ResultModel is not None and isinstance(result, ResultModel):
self._add_result_model(row, result)
# Handle TagItem from get_tag.py (tag display with index)
elif hasattr(result, "__class__") and result.__class__.__name__ == "TagItem":
@@ -1574,7 +1582,7 @@ class Table:
return None
# Remaining parts are cmdlet arguments
cmdlet_args = {}
cmdlet_args: dict[str, Any] = {}
i = 1
while i < len(parts):
part = parts[i]
@@ -1678,7 +1686,7 @@ class Table:
try:
int(value)
except ValueError:
print(f"Must be an integer")
print("Must be an integer")
continue
return value
@@ -1907,7 +1915,7 @@ def extract_item_metadata(item: Any) -> Dict[str, Any]:
out = {}
# Handle ResultModel specifically for better detail display
if ResultModel and isinstance(item, ResultModel):
if ResultModel is not None and isinstance(item, ResultModel):
if item.title: out["Title"] = item.title
if item.path: out["Path"] = item.path
if item.ext: out["Ext"] = item.ext
@@ -1965,34 +1973,30 @@ def extract_item_metadata(item: Any) -> Dict[str, Any]:
if e: out["Ext"] = e
size = extract_size_bytes_value(item)
if size:
out["Size"] = size
if size is not None:
out["Size"] = format_mb(size)
else:
s = data.get("size") or data.get("size_bytes")
if s: out["Size"] = s
if s is not None:
out["Size"] = str(s)
# Duration
dur = _get_first_dict_value(data, ["duration_seconds", "duration"])
if dur:
out["Duration"] = _format_duration_hms(dur)
# URL
url = _get_first_dict_value(data, ["url", "URL"])
if url:
out["Url"] = url
else:
out["Url"] = None # Explicitly None for <null> display
out["Url"] = str(url) if url else ""
# Relationships
rels = _get_first_dict_value(data, ["relationships", "rel"])
if rels:
out["Relations"] = rels
else:
out["Relations"] = None
out["Relations"] = str(rels) if rels else ""
# Tags Summary
tags = _get_first_dict_value(data, ["tags", "tag"])
if tags: out["Tags"] = tags
if tags:
out["Tags"] = ", ".join([str(t) for t in (tags if isinstance(tags, (list, tuple)) else [tags])])
return out
+12 -8
View File
@@ -11,7 +11,7 @@ from __future__ import annotations
import contextlib
import sys
from typing import Any, Iterator, Sequence, TextIO
from typing import Any, Iterator, TextIO, List, Dict, Optional, Tuple, cast
from rich.console import Console
from rich.panel import Panel
@@ -81,7 +81,6 @@ def show_provider_config_panel(
) -> None:
"""Show a Rich panel explaining how to configure providers."""
from rich.table import Table as RichTable
from rich.text import Text
from rich.console import Group
if isinstance(provider_names, str):
@@ -117,7 +116,6 @@ def show_store_config_panel(
) -> None:
"""Show a Rich panel explaining how to configure storage backends."""
from rich.table import Table as RichTable
from rich.text import Text
from rich.console import Group
if isinstance(store_names, str):
@@ -152,7 +150,6 @@ def show_available_providers_panel(provider_names: List[str]) -> None:
"""Show a Rich panel listing available/configured providers."""
from rich.columns import Columns
from rich.console import Group
from rich.text import Text
if not provider_names:
return
@@ -203,8 +200,8 @@ def render_image_to_console(image_path: str | Path, max_width: int | None = None
if not path.exists() or not path.is_file():
return
with Image.open(path) as img:
img = img.convert("RGB")
with Image.open(path) as opened_img:
img = opened_img.convert("RGB")
orig_w, orig_h = img.size
# Determine target dimensions
@@ -238,14 +235,21 @@ def render_image_to_console(image_path: str | Path, max_width: int | None = None
img = img.resize((target_w, target_h), Image.Resampling.BILINEAR)
pixels = img.load()
if pixels is None:
return
# Render using upper half block (U+2580)
# Each character row in terminal represents 2 pixel rows in image.
for y in range(0, target_h - 1, 2):
line = Text()
for x in range(target_w):
r1, g1, b1 = pixels[x, y]
r2, g2, b2 = pixels[x, y + 1]
rgb1 = cast(tuple, pixels[x, y])
rgb2 = cast(tuple, pixels[x, y + 1])
try:
r1, g1, b1 = int(rgb1[0]), int(rgb1[1]), int(rgb1[2])
r2, g2, b2 = int(rgb2[0]), int(rgb2[1]), int(rgb2[2])
except Exception:
r1 = g1 = b1 = r2 = g2 = b2 = 0
# Foreground is top pixel, background is bottom pixel
line.append(
"",
+4 -5
View File
@@ -14,15 +14,14 @@ except Exception:
import os
import base64
import logging
import time
from pathlib import Path
from typing import Any, Iterable, Optional
from typing import Any, Iterable
from datetime import datetime
from dataclasses import dataclass, field
from fnmatch import fnmatch
from urllib.parse import urlparse
import SYS.utils_constant
from SYS.utils_constant import mime_maps
try:
import cbor2
@@ -141,7 +140,7 @@ def create_metadata_sidecar(file_path: Path, metadata: dict) -> None:
metadata["hash"] = sha256_file(file_path)
metadata["size"] = Path(file_path).stat().st_size
format_found = False
for mime_type, ext_map in SYS.utils_constant.mime_maps.items():
for mime_type, ext_map in mime_maps.items():
for key, info in ext_map.items():
if info.get("ext") == file_ext:
metadata["type"] = mime_type
@@ -517,7 +516,7 @@ def get_api_key(config: dict[str, Any], service: str, key_path: str) -> str | No
"""
try:
parts = key_path.split(".")
value = config
value: Any = config
for part in parts:
if isinstance(value, dict):
value = value.get(part)
+3 -1
View File
@@ -1,4 +1,6 @@
mime_maps = {
from typing import Any, Dict
mime_maps: Dict[str, Dict[str, Dict[str, Any]]] = {
"image": {
"jpg": {
"ext": ".jpg",
+347
View File
@@ -0,0 +1,347 @@
from __future__ import annotations
import atexit
import io
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, Optional, Set, TextIO, Sequence
from SYS.config import get_local_storage_path
from SYS.worker_manager import WorkerManager
class WorkerOutputMirror(io.TextIOBase):
"""Mirror stdout/stderr to worker manager while preserving console output."""
def __init__(
self,
original: TextIO,
manager: WorkerManager,
worker_id: str,
channel: str,
):
self._original = original
self._manager = manager
self._worker_id = worker_id
self._channel = channel
self._pending: str = ""
def write(self, data: str) -> int: # type: ignore[override]
if not data:
return 0
self._original.write(data)
self._buffer_text(data)
return len(data)
def flush(self) -> None: # type: ignore[override]
self._original.flush()
self._flush_pending(force=True)
def isatty(self) -> bool: # pragma: no cover
return bool(getattr(self._original, "isatty", lambda: False)())
def _buffer_text(self, data: str) -> None:
combined = self._pending + data
lines = combined.splitlines(keepends=True)
if not lines:
self._pending = combined
return
if lines[-1].endswith(("\n", "\r")):
complete = lines
self._pending = ""
else:
complete = lines[:-1]
self._pending = lines[-1]
for chunk in complete:
self._emit(chunk)
def _flush_pending(self, *, force: bool = False) -> None:
if self._pending and force:
self._emit(self._pending)
self._pending = ""
def _emit(self, text: str) -> None:
if not text:
return
try:
self._manager.append_stdout(self._worker_id, text, channel=self._channel)
except Exception:
pass
@property
def encoding(self) -> str: # type: ignore[override]
return getattr(self._original, "encoding", "utf-8")
class WorkerStageSession:
"""Lifecycle helper for wrapping a CLI cmdlet execution in a worker record."""
def __init__(
self,
*,
manager: WorkerManager,
worker_id: str,
orig_stdout: TextIO,
orig_stderr: TextIO,
stdout_proxy: WorkerOutputMirror,
stderr_proxy: WorkerOutputMirror,
config: Optional[Dict[str, Any]],
logging_enabled: bool,
completion_label: str,
error_label: str,
) -> None:
self.manager = manager
self.worker_id = worker_id
self.orig_stdout = orig_stdout
self.orig_stderr = orig_stderr
self.stdout_proxy = stdout_proxy
self.stderr_proxy = stderr_proxy
self.config = config
self.logging_enabled = logging_enabled
self.closed = False
self._completion_label = completion_label
self._error_label = error_label
def close(self, *, status: str = "completed", error_msg: str = "") -> None:
if self.closed:
return
try:
self.stdout_proxy.flush()
self.stderr_proxy.flush()
except Exception:
pass
sys.stdout = self.orig_stdout
sys.stderr = self.orig_stderr
if self.logging_enabled:
try:
self.manager.disable_logging_for_worker(self.worker_id)
except Exception:
pass
try:
if status == "completed":
self.manager.log_step(self.worker_id, self._completion_label)
else:
self.manager.log_step(
self.worker_id, f"{self._error_label}: {error_msg or status}"
)
except Exception:
pass
try:
self.manager.finish_worker(
self.worker_id, result=status or "completed", error_msg=error_msg or ""
)
except Exception:
pass
if self.config and self.config.get("_current_worker_id") == self.worker_id:
self.config.pop("_current_worker_id", None)
self.closed = True
class WorkerManagerRegistry:
"""Process-wide WorkerManager cache keyed by library_root."""
_manager: Optional[WorkerManager] = None
_manager_root: Optional[Path] = None
_orphan_cleanup_done: bool = False
_registered: bool = False
@classmethod
def ensure(cls, config: Dict[str, Any]) -> Optional[WorkerManager]:
if not isinstance(config, dict):
return None
existing = config.get("_worker_manager")
if isinstance(existing, WorkerManager):
return existing
library_root = get_local_storage_path(config)
if not library_root:
return None
try:
resolved_root = Path(library_root).resolve()
except Exception:
resolved_root = Path(library_root)
try:
if cls._manager is None or cls._manager_root != resolved_root:
if cls._manager is not None:
try:
cls._manager.close()
except Exception:
pass
cls._manager = WorkerManager(resolved_root, auto_refresh_interval=0.5)
cls._manager_root = resolved_root
manager = cls._manager
config["_worker_manager"] = manager
if manager is not None and not cls._orphan_cleanup_done:
try:
manager.expire_running_workers(
older_than_seconds=120,
worker_id_prefix="cli_%",
reason="CLI session ended unexpectedly; marking worker as failed",
)
except Exception:
pass
else:
cls._orphan_cleanup_done = True
if not cls._registered:
atexit.register(cls.close)
cls._registered = True
return manager
except Exception as exc:
print(f"[worker] Could not initialize worker manager: {exc}", file=sys.stderr)
return None
@classmethod
def close(cls) -> None:
if cls._manager is None:
return
try:
cls._manager.close()
except Exception:
pass
cls._manager = None
cls._manager_root = None
cls._orphan_cleanup_done = False
class WorkerStages:
"""Factory methods for stage/pipeline worker sessions."""
@staticmethod
def _start_worker_session(
worker_manager: Optional[WorkerManager],
*,
worker_type: str,
title: str,
description: str,
pipe_text: str,
config: Optional[Dict[str, Any]],
completion_label: str,
error_label: str,
skip_logging_for: Optional[Set[str]] = None,
session_worker_ids: Optional[Set[str]] = None,
) -> Optional[WorkerStageSession]:
if worker_manager is None:
return None
if skip_logging_for and worker_type in skip_logging_for:
return None
safe_type = worker_type or "cmd"
worker_id = f"cli_{safe_type[:8]}_{uuid.uuid4().hex[:6]}"
try:
tracked = worker_manager.track_worker(
worker_id,
worker_type=worker_type,
title=title,
description=description or "(no args)",
pipe=pipe_text,
)
if not tracked:
return None
except Exception as exc:
print(f"[worker] Failed to track {worker_type}: {exc}", file=sys.stderr)
return None
if session_worker_ids is not None:
session_worker_ids.add(worker_id)
logging_enabled = False
try:
handler = worker_manager.enable_logging_for_worker(worker_id)
logging_enabled = handler is not None
except Exception:
logging_enabled = False
orig_stdout = sys.stdout
orig_stderr = sys.stderr
stdout_proxy = WorkerOutputMirror(orig_stdout, worker_manager, worker_id, "stdout")
stderr_proxy = WorkerOutputMirror(orig_stderr, worker_manager, worker_id, "stderr")
sys.stdout = stdout_proxy
sys.stderr = stderr_proxy
if isinstance(config, dict):
config["_current_worker_id"] = worker_id
try:
worker_manager.log_step(worker_id, f"Started {worker_type}")
except Exception:
pass
return WorkerStageSession(
manager=worker_manager,
worker_id=worker_id,
orig_stdout=orig_stdout,
orig_stderr=orig_stderr,
stdout_proxy=stdout_proxy,
stderr_proxy=stderr_proxy,
config=config,
logging_enabled=logging_enabled,
completion_label=completion_label,
error_label=error_label,
)
@classmethod
def begin_stage(
cls,
worker_manager: Optional[WorkerManager],
*,
cmd_name: str,
stage_tokens: Sequence[str],
config: Optional[Dict[str, Any]],
command_text: str,
) -> Optional[WorkerStageSession]:
description = " ".join(stage_tokens[1:]) if len(stage_tokens) > 1 else "(no args)"
session_worker_ids = None
if isinstance(config, dict):
session_worker_ids = config.get("_session_worker_ids")
return cls._start_worker_session(
worker_manager,
worker_type=cmd_name,
title=f"{cmd_name} stage",
description=description,
pipe_text=command_text,
config=config,
completion_label="Stage completed",
error_label="Stage error",
skip_logging_for={".worker", "worker", "workers"},
session_worker_ids=session_worker_ids,
)
@classmethod
def begin_pipeline(
cls,
worker_manager: Optional[WorkerManager],
*,
pipeline_text: str,
config: Optional[Dict[str, Any]],
) -> Optional[WorkerStageSession]:
session_worker_ids: Set[str] = set()
if isinstance(config, dict):
config["_session_worker_ids"] = session_worker_ids
return cls._start_worker_session(
worker_manager,
worker_type="pipeline",
title="Pipeline run",
description=pipeline_text,
pipe_text=pipeline_text,
config=config,
completion_label="Pipeline completed",
error_label="Pipeline error",
session_worker_ids=session_worker_ids,
)
+16 -16
View File
@@ -47,8 +47,8 @@ class Worker:
self.details = ""
self.error_message = ""
self.result = "pending"
self._stdout_buffer = []
self._steps_buffer = []
self._stdout_buffer: list[str] = []
self._steps_buffer: list[str] = []
def log_step(self, step_text: str) -> None:
"""Log a step for this worker.
@@ -108,18 +108,26 @@ class Worker:
logger.error(f"Error getting steps for worker {self.id}: {e}")
return ""
def update_progress(self, progress: str = "", details: str = "") -> None:
def update_progress(self, progress: float | str = 0.0, details: str = "") -> None:
"""Update worker progress.
Args:
progress: Progress string (e.g., "50%")
progress: Progress value (float) or textual like "50%"; will be coerced to float
details: Additional details
"""
self.progress = progress
self.progress = str(progress)
self.details = details
try:
if self.manager:
self.manager.update_worker(self.id, progress, details)
# Normalize to a float value for the manager API (0-100)
try:
if isinstance(progress, str) and progress.endswith('%'):
progress_value = float(progress.rstrip('%'))
else:
progress_value = float(progress)
except Exception:
progress_value = 0.0
self.manager.update_worker(self.id, progress_value, details)
except Exception as e:
logger.error(f"Error updating worker {self.id}: {e}")
@@ -165,7 +173,7 @@ class WorkerLoggingHandler(logging.StreamHandler):
self.db = db
self.manager = manager
self.buffer_size = buffer_size
self.buffer = []
self.buffer: list[str] = []
self._lock = Lock()
# Set a format that includes timestamp and level
@@ -278,14 +286,6 @@ class WorkerManager:
self._stdout_flush_bytes = 4096
self._stdout_flush_interval = 0.75
def close(self) -> None:
"""Close the database connection."""
if self.db:
try:
with self._db_lock:
self.db.close()
except Exception:
pass
def __enter__(self):
"""Context manager entry."""
@@ -478,7 +478,7 @@ class WorkerManager:
True if update was successful
"""
try:
kwargs = {}
kwargs: dict[str, Any] = {}
if progress > 0:
kwargs["progress"] = progress
if current_step:
+11 -9
View File
@@ -4,12 +4,13 @@ import json
import re
import shutil
import sys
from fnmatch import fnmatch, translate
from fnmatch import fnmatch
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from SYS.logger import debug, log
from SYS.utils import sha256_file, expand_path
from SYS.config import get_local_storage_path
from Store._base import Store
@@ -56,7 +57,7 @@ class Folder(Store):
""""""
# Track which locations have already been migrated to avoid repeated migrations
_migrated_locations = set()
_migrated_locations: set[str] = set()
# Cache scan results to avoid repeated full scans across repeated instantiations
_scan_cache: Dict[str,
Tuple[bool,
@@ -65,7 +66,7 @@ class Folder(Store):
int]]] = {}
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
return [
{
"key": "NAME",
@@ -177,7 +178,7 @@ class Folder(Store):
Checks for sidecars (.metadata, .tag) and imports them before renaming.
Also ensures all files have a title: tag.
"""
from API.folder import API_folder_store, read_sidecar, write_sidecar, find_sidecar
from API.folder import API_folder_store, read_sidecar, find_sidecar
try:
with API_folder_store(location_path) as db:
@@ -1498,11 +1499,12 @@ class Folder(Store):
debug(f"Failed to get file for hash {file_hash}: {exc}")
return None
def get_metadata(self, file_hash: str) -> Optional[Dict[str, Any]]:
def get_metadata(self, file_hash: str, **kwargs: Any) -> Optional[Dict[str, Any]]:
"""Get metadata for a file from the database by hash.
Args:
file_hash: SHA256 hash of the file (64-char hex string)
**kwargs: Additional options
Returns:
Dict with metadata fields (ext, size, hash, duration, etc.) or None if not found
@@ -1613,7 +1615,7 @@ class Folder(Store):
debug(f"get_tags failed for local file: {exc}")
return [], "unknown"
def add_tag(self, hash: str, tag: List[str], **kwargs: Any) -> bool:
def add_tag(self, file_identifier: str, tags: List[str], **kwargs: Any) -> bool:
"""Add tags to a local file by hash (via API_folder_store).
Handles namespace collapsing: when adding namespace:value, removes existing namespace:* tags.
@@ -1628,14 +1630,14 @@ class Folder(Store):
try:
with API_folder_store(Path(self._location)) as db:
existing_tags = [
t for t in (db.get_tags(hash) or [])
t for t in (db.get_tags(file_identifier) or [])
if isinstance(t, str) and t.strip()
]
from SYS.metadata import compute_namespaced_tag_overwrite
_to_remove, _to_add, merged = compute_namespaced_tag_overwrite(
existing_tags, tag or []
existing_tags, tags or []
)
if not _to_remove and not _to_add:
return True
@@ -1644,7 +1646,7 @@ class Folder(Store):
# To enforce lowercase-only tags and namespace overwrites, rewrite the full tag set.
cursor = db.connection.cursor()
cursor.execute("DELETE FROM tag WHERE hash = ?",
(hash,
(file_identifier,
))
for t in merged:
t = str(t).strip().lower()
+66 -7
View File
@@ -30,7 +30,7 @@ class HydrusNetwork(Store):
"""
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
return [
{
"key": "NAME",
@@ -723,6 +723,10 @@ class HydrusNetwork(Store):
if text:
pattern_hints.append(text)
pattern_hint = pattern_hints[0] if pattern_hints else ""
hashes: list[str] = []
file_ids: list[int] = []
if ":" in query_lower and not query_lower.startswith(":"):
namespace, pattern = query_lower.split(":", 1)
namespace = namespace.strip().lower()
@@ -765,8 +769,8 @@ class HydrusNetwork(Store):
response = client._perform_request(
spec
) # type: ignore[attr-defined]
hashes: list[str] = []
file_ids: list[int] = []
hashes = []
file_ids = []
if isinstance(response, dict):
raw_hashes = response.get("hashes") or response.get(
"file_hashes"
@@ -870,11 +874,11 @@ class HydrusNetwork(Store):
freeform_predicates = [f"{query_lower}*"]
# Search files with the tags (unless url: search already produced metadata)
results = []
results: list[dict[str, Any]] = []
if metadata_list is None:
file_ids: list[int] = []
hashes: list[str] = []
file_ids = []
hashes = []
if freeform_union_search:
if not title_predicates and not freeform_predicates:
@@ -929,7 +933,7 @@ class HydrusNetwork(Store):
# Fast path: ext-only search. Avoid fetching metadata for an unbounded
# system:everything result set; fetch in chunks until we have enough.
if ext_only and ext_filter:
results: list[dict[str, Any]] = []
results = []
if not file_ids and not hashes:
debug(f"{prefix} 0 result(s)")
return []
@@ -1894,6 +1898,61 @@ class HydrusNetwork(Store):
debug(f"{self._log_prefix()} add_url_bulk failed: {exc}")
return False
def add_tags_bulk(self, items: List[tuple[str, List[str]]], *, service_name: str | None = None) -> bool:
"""Bulk add tags to multiple Hydrus files.
Groups files by identical tag-sets and uses the Hydrus `mutate_tags_by_key`
call (when a service key is available) to reduce the number of API calls.
Falls back to per-hash `add_tag` calls if necessary.
"""
try:
client = self._client
if client is None:
debug(f"{self._log_prefix()} add_tags_bulk: client unavailable")
return False
# Group by canonical tag set (sorted tuple) to batch identical additions
buckets: dict[tuple[str, ...], list[str]] = {}
for file_identifier, tags in items or []:
h = str(file_identifier or "").strip().lower()
if len(h) != 64:
continue
tlist = [str(t).strip().lower() for t in (tags or []) if isinstance(t, str) and str(t).strip()]
if not tlist:
continue
key = tuple(sorted(tlist))
buckets.setdefault(key, []).append(h)
if not buckets:
return False
svc = service_name or "my tags"
service_key = self._get_service_key(svc)
any_success = False
for tag_tuple, hashes in buckets.items():
try:
if service_key:
# Mutate tags for many hashes in a single request
client.mutate_tags_by_key(hash=hashes, service_key=service_key, add_tags=list(tag_tuple))
any_success = True
continue
except Exception as exc:
debug(f"{self._log_prefix()} add_tags_bulk mutate failed for tags {tag_tuple}: {exc}")
# Fallback: apply per-hash add_tag
for h in hashes:
try:
client.add_tag(h, list(tag_tuple), svc)
any_success = True
except Exception:
continue
return any_success
except Exception as exc:
debug(f"{self._log_prefix()} add_tags_bulk failed: {exc}")
return False
def delete_url(self, file_identifier: str, url: List[str], **kwargs: Any) -> bool:
"""Delete one or more url from a Hydrus file."""
try:
+53 -14
View File
@@ -20,9 +20,6 @@ Notes:
from __future__ import annotations
import json
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
@@ -33,7 +30,7 @@ from Store._base import Store
class ZeroTier(Store):
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
return [
{"key": "NAME", "label": "Store Name", "default": "", "required": True},
{"key": "NETWORK_ID", "label": "ZeroTier Network ID", "default": "", "required": True},
@@ -355,7 +352,6 @@ class ZeroTier(Store):
Returns the file hash on success, or None on failure.
"""
from SYS.utils import sha256_file
p = Path(file_path)
if not p.exists():
@@ -404,17 +400,60 @@ class ZeroTier(Store):
data.append(("url", u))
files = {"file": (p.name, fh, "application/octet-stream")}
resp = httpx.post(url, headers=headers, files=files, data=data, timeout=self._timeout)
resp.raise_for_status()
if resp.status_code in (200, 201):
# Prefer `requests` for local testing / WSGI servers which may not accept
# chunked uploads reliably with httpx/httpcore. Fall back to httpx otherwise.
try:
try:
payload = resp.json()
file_hash = payload.get("hash") or payload.get("file_hash")
return file_hash
except Exception:
import requests
# Convert data list-of-tuples to dict for requests (acceptable for repeated fields)
data_dict = {}
for k, v in data:
if k in data_dict:
existing = data_dict[k]
if not isinstance(existing, list):
data_dict[k] = [existing]
data_dict[k].append(v)
else:
data_dict[k] = v
r = requests.post(url, headers=headers, files=files, data=data_dict or None, timeout=self._timeout)
if r.status_code in (200, 201):
try:
payload = r.json()
file_hash = payload.get("hash") or payload.get("file_hash")
return file_hash
except Exception:
return None
try:
debug(f"[zerotier-debug] upload failed (requests) status={r.status_code} body={r.text}")
except Exception:
pass
debug(f"ZeroTier add_file failed (requests): status {r.status_code} body={getattr(r, 'text', '')}")
return None
debug(f"ZeroTier add_file failed: status {resp.status_code}")
return None
except Exception:
import httpx
resp = httpx.post(url, headers=headers, files=files, data=data, timeout=self._timeout)
# Note: some environments may not create request.files correctly; capture body for debugging
try:
if resp.status_code in (200, 201):
try:
payload = resp.json()
file_hash = payload.get("hash") or payload.get("file_hash")
return file_hash
except Exception:
return None
# Debug output to help tests capture server response
try:
debug(f"[zerotier-debug] upload failed status={resp.status_code} body={resp.text}")
except Exception:
pass
debug(f"ZeroTier add_file failed: status {resp.status_code} body={getattr(resp, 'text', '')}")
return None
except Exception as exc:
debug(f"ZeroTier add_file exception: {exc}")
return None
except Exception as exc:
debug(f"ZeroTier add_file exception: {exc}")
return None
except Exception as exc:
debug(f"ZeroTier add_file exception: {exc}")
return None
+1 -1
View File
@@ -13,7 +13,7 @@ from typing import Any, Dict, List, Optional, Tuple
class Store(ABC):
@classmethod
def config(cls) -> List[Dict[str, Any]]:
def config_schema(cls) -> List[Dict[str, Any]]:
"""Return configuration schema for this store.
Returns a list of dicts:
+4 -5
View File
@@ -15,8 +15,7 @@ import importlib
import inspect
import pkgutil
import re
from pathlib import Path
from typing import Any, Dict, Iterable, Optional, Type
from typing import Any, Dict, Optional, Type
from SYS.logger import debug
from SYS.utils import expand_path
@@ -92,10 +91,10 @@ def _discover_store_classes() -> Dict[str, Type[BaseStore]]:
def _required_keys_for(store_cls: Type[BaseStore]) -> list[str]:
# Support new config() schema
if hasattr(store_cls, "config") and callable(store_cls.config):
# Support new config_schema() schema
if hasattr(store_cls, "config_schema") and callable(store_cls.config_schema):
try:
schema = store_cls.config()
schema = store_cls.config_schema()
keys = []
if isinstance(schema, list):
for field in schema:
-1
View File
@@ -6,7 +6,6 @@ import json
import re
import sys
import subprocess
import asyncio
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
+1 -1
View File
@@ -5,7 +5,7 @@ from __future__ import annotations
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence
from typing import Dict, Iterable, List, Sequence
BASE_DIR = Path(__file__).resolve().parent
ROOT_DIR = BASE_DIR.parent
+9 -12
View File
@@ -1,12 +1,9 @@
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.containers import Container, Horizontal, Vertical, ScrollableContainer
from textual.widgets import Static, Button, Input, Label, ListView, ListItem, Rule, OptionList, Footer, Select
from textual.widgets import Static, Button, Input, Label, ListView, ListItem, Rule, Select
from textual import on, work
from textual.message import Message
from typing import Dict, Any, List, Optional
import os
import json
from typing import Any
from pathlib import Path
from SYS.config import load_config, save_config, global_config
@@ -383,7 +380,7 @@ class ConfigModal(ModalScreen):
if stype in classes:
cls = classes[stype]
if hasattr(cls, "config") and callable(cls.config):
for field_def in cls.config():
for field_def in cls.config_schema():
k = field_def.get("key")
if k:
provider_schema_map[k.upper()] = field_def
@@ -398,7 +395,7 @@ class ConfigModal(ModalScreen):
try:
pcls = get_provider_class(item_name)
if pcls and hasattr(pcls, "config") and callable(pcls.config):
for field_def in pcls.config():
for field_def in pcls.config_schema():
k = field_def.get("key")
if k:
provider_schema_map[k.upper()] = field_def
@@ -670,7 +667,7 @@ class ConfigModal(ModalScreen):
for stype, cls in all_classes.items():
if hasattr(cls, "config") and callable(cls.config):
try:
if cls.config():
if cls.config_schema():
options.append(stype)
except Exception:
pass
@@ -683,7 +680,7 @@ class ConfigModal(ModalScreen):
pcls = get_provider_class(ptype)
if pcls and hasattr(pcls, "config") and callable(pcls.config):
try:
if pcls.config():
if pcls.config_schema():
options.append(ptype)
except Exception:
pass
@@ -859,7 +856,7 @@ class ConfigModal(ModalScreen):
cls = classes[stype]
# Use schema for defaults if present
if hasattr(cls, "config") and callable(cls.config):
for field_def in cls.config():
for field_def in cls.config_schema():
key = field_def.get("key")
if key:
val = field_def.get("default", "")
@@ -893,7 +890,7 @@ class ConfigModal(ModalScreen):
if pcls:
# Use schema for defaults
if hasattr(pcls, "config") and callable(pcls.config):
for field_def in pcls.config():
for field_def in pcls.config_schema():
key = field_def.get("key")
if key:
new_config[key] = field_def.get("default", "")
@@ -991,7 +988,7 @@ class ConfigModal(ModalScreen):
if pcls:
# Collect required keys from schema
if hasattr(pcls, "config") and callable(pcls.config):
for field_def in pcls.config():
for field_def in pcls.config_schema():
if field_def.get("required"):
k = field_def.get("key")
if k and k not in required_keys:
+5 -8
View File
@@ -9,11 +9,10 @@ This modal allows users to specify:
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.containers import Container, Horizontal, Vertical, ScrollableContainer
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import (
Static,
Button,
Label,
Select,
Checkbox,
TextArea,
@@ -448,8 +447,6 @@ class DownloadModal(ModalScreen):
try:
# Capture output from the cmdlet using temp files (more reliable than redirect)
import tempfile
import subprocess
# Try normal redirect first
import io
@@ -461,7 +458,7 @@ class DownloadModal(ModalScreen):
# Always capture output
try:
with redirect_stdout(stdout_buf), redirect_stderr(stderr_buf):
logger.info(f"Calling download_cmdlet...")
logger.info("Calling download_cmdlet...")
cmd_config = (
dict(self.config)
if isinstance(self.config,
@@ -637,7 +634,7 @@ class DownloadModal(ModalScreen):
# Also append detailed error info to worker stdout for visibility
if worker:
worker.append_stdout(f"\n❌ DOWNLOAD FAILED\n")
worker.append_stdout("\n❌ DOWNLOAD FAILED\n")
worker.append_stdout(f"Reason: {error_reason}\n")
if stderr_text and stderr_text.strip():
worker.append_stdout(
@@ -1169,7 +1166,7 @@ class DownloadModal(ModalScreen):
url.endswith(".pdf") or "pdf" in url.lower() for url in url
)
if all_pdfs:
logger.info(f"All url are PDFs - creating pseudo-playlist")
logger.info("All url are PDFs - creating pseudo-playlist")
self._handle_pdf_playlist(url)
return
@@ -1646,7 +1643,7 @@ class DownloadModal(ModalScreen):
break
if not json_line:
logger.error(f"No JSON found in get-tag output")
logger.error("No JSON found in get-tag output")
logger.debug(f"Raw output: {output}")
try:
self.app.call_from_thread(
+4 -8
View File
@@ -3,20 +3,16 @@
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Static, Button, Input, TextArea, Tree, Select
from textual.widgets import Static, Button, Input, TextArea, Select
from textual.binding import Binding
import logging
from typing import Optional, Any
from typing import Optional
from pathlib import Path
import json
import sys
import subprocess
from datetime import datetime
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from SYS.utils import format_metadata_value
from SYS.config import load_config
logger = logging.getLogger(__name__)
@@ -147,7 +143,7 @@ class ExportModal(ModalScreen):
if not metadata:
logger.info(
f"_get_metadata_text - No metadata found, returning 'No metadata available'"
"_get_metadata_text - No metadata found, returning 'No metadata available'"
)
return "No metadata available"
@@ -184,7 +180,7 @@ class ExportModal(ModalScreen):
)
return "\n".join(lines)
else:
logger.info(f"_get_metadata_text - No matching fields found in metadata")
logger.info("_get_metadata_text - No matching fields found in metadata")
return "No metadata available"
def compose(self) -> ComposeResult:
+5 -2
View File
@@ -2,7 +2,7 @@
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.containers import Container, Horizontal, Vertical
from textual.containers import Horizontal, Vertical
from textual.widgets import Static, Button, Input, Select, DataTable, TextArea
from textual.binding import Binding
from textual.message import Message
@@ -153,6 +153,9 @@ class SearchModal(ModalScreen):
return
source = self.source_select.value
if not source or not isinstance(source, str):
logger.warning("[search-modal] No source selected")
return
# Clear existing results
self.results_table.clear(columns=True)
@@ -363,7 +366,7 @@ class SearchModal(ModalScreen):
tags_text = "\n".join(tags)
self.tags_textarea.text = tags_text
logger.info(f"[search-modal] Populated tags textarea from result")
logger.info("[search-modal] Populated tags textarea from result")
async def _download_book(self, result: Any) -> None:
"""Download a book from OpenLibrary using the provider."""
+2 -2
View File
@@ -1,8 +1,8 @@
from textual.app import ComposeResult
from textual.screen import ModalScreen
from textual.containers import Container, ScrollableContainer
from textual.widgets import Static, Button, Label
from typing import List, Callable
from textual.widgets import Static, Button
from typing import List
class SelectionModal(ModalScreen[str]):
"""A modal for selecting a type from a list of strings."""
+5 -5
View File
@@ -238,7 +238,7 @@ class WorkersModal(ModalScreen):
"---",
"No workers running"
)
logger.debug(f"[workers-modal] No running workers to display")
logger.debug("[workers-modal] No running workers to display")
return
logger.debug(
@@ -319,7 +319,7 @@ class WorkersModal(ModalScreen):
"---",
"No finished workers"
)
logger.debug(f"[workers-modal] No finished workers to display")
logger.debug("[workers-modal] No finished workers to display")
return
logger.info(
@@ -399,7 +399,7 @@ class WorkersModal(ModalScreen):
workers_list = None
if event.control == self.running_table:
workers_list = self.running_workers
logger.debug(f"[workers-modal] Highlighted in running table")
logger.debug("[workers-modal] Highlighted in running table")
elif event.control == self.finished_table:
workers_list = self.finished_workers
logger.debug(
@@ -442,7 +442,7 @@ class WorkersModal(ModalScreen):
workers_list = None
if event.data_table == self.running_table:
workers_list = self.running_workers
logger.debug(f"[workers-modal] Cell highlighted in running table")
logger.debug("[workers-modal] Cell highlighted in running table")
elif event.data_table == self.finished_table:
workers_list = self.finished_workers
logger.debug(
@@ -502,7 +502,7 @@ class WorkersModal(ModalScreen):
self.stdout_display.cursor_location = (len(combined_text) - 1, 0)
except Exception:
pass
logger.info(f"[workers-modal] Updated stdout display successfully")
logger.info("[workers-modal] Updated stdout display successfully")
except Exception as e:
logger.error(
f"[workers-modal] Error updating stdout display: {e}",
+4 -8
View File
@@ -22,13 +22,9 @@ for path in (ROOT_DIR, BASE_DIR):
sys.path.insert(0, str_path)
from SYS import pipeline as ctx
# Lazily import CLI dependencies to avoid import-time failures in test environments
try:
from CLI import ConfigLoader, PipelineExecutor as CLIPipelineExecutor, WorkerManagerRegistry
except Exception:
ConfigLoader = None
CLIPipelineExecutor = None
WorkerManagerRegistry = None
from CLI import ConfigLoader
from SYS.pipeline import PipelineExecutor
from SYS.worker import WorkerManagerRegistry
from SYS.logger import set_debug
from SYS.rich_display import capture_rich_output
from SYS.result_table import Table
@@ -89,7 +85,7 @@ class PipelineRunner:
if executor is not None:
self._executor = executor
else:
self._executor = CLIPipelineExecutor(config_loader=self._config_loader) if CLIPipelineExecutor else None
self._executor = PipelineExecutor(config_loader=self._config_loader)
self._worker_manager = None
@property
+77 -9
View File
@@ -499,6 +499,9 @@ class Add_File(Cmdlet):
pending_url_associations: Dict[str,
List[tuple[str,
List[str]]]] = {}
pending_tag_associations: Dict[str,
List[tuple[str,
List[str]]]] = {}
successes = 0
failures = 0
@@ -612,6 +615,8 @@ class Add_File(Cmdlet):
collect_relationship_pairs=pending_relationship_pairs,
defer_url_association=defer_url_association,
pending_url_associations=pending_url_associations,
defer_tag_association=defer_url_association,
pending_tag_associations=pending_tag_associations,
suppress_last_stage_overlay=want_final_search_file,
auto_search_file=auto_search_file_after_add,
store_instance=storage_registry,
@@ -664,6 +669,17 @@ class Add_File(Cmdlet):
except Exception:
pass
# Apply deferred tag associations (bulk) if collected
if pending_tag_associations:
try:
Add_File._apply_pending_tag_associations(
pending_tag_associations,
config,
store_instance=storage_registry
)
except Exception:
pass
# Always end add-file -store (when last stage) by showing item detail panels.
# Legacy search-file refresh is no longer used for final display.
if want_final_search_file and collected_payloads:
@@ -1854,6 +1870,10 @@ class Add_File(Cmdlet):
pending_url_associations: Optional[Dict[str,
List[tuple[str,
List[str]]]]] = None,
defer_tag_association: bool = False,
pending_tag_associations: Optional[Dict[str,
List[tuple[str,
List[str]]]]] = None,
suppress_last_stage_overlay: bool = False,
auto_search_file: bool = True,
store_instance: Optional[Store] = None,
@@ -2072,15 +2092,22 @@ class Add_File(Cmdlet):
resolved_hash = chosen_hash
if hydrus_like_backend and tags:
try:
adder = getattr(backend, "add_tag", None)
if callable(adder):
debug(
f"[add-file] Applying {len(tags)} tag(s) post-upload to Hydrus"
)
adder(resolved_hash, list(tags))
except Exception as exc:
log(f"[add-file] Hydrus post-upload tagging failed: {exc}", file=sys.stderr)
# Support deferring tag application for batching bulk operations
if defer_tag_association and pending_tag_associations is not None:
try:
pending_tag_associations.setdefault(str(backend_name), []).append((str(resolved_hash), list(tags)))
except Exception:
pass
else:
try:
adder = getattr(backend, "add_tag", None)
if callable(adder):
debug(
f"[add-file] Applying {len(tags)} tag(s) post-upload to Hydrus"
)
adder(resolved_hash, list(tags))
except Exception as exc:
log(f"[add-file] Hydrus post-upload tagging failed: {exc}", file=sys.stderr)
# If we have url(s), ensure they get associated with the destination file.
# This mirrors `add-url` behavior but avoids emitting extra pipeline noise.
@@ -2322,6 +2349,47 @@ class Add_File(Cmdlet):
except Exception:
continue
@staticmethod
def _apply_pending_tag_associations(
pending: Dict[str,
List[tuple[str,
List[str]]]],
config: Dict[str,
Any],
store_instance: Optional[Store] = None,
) -> None:
"""Apply deferred tag associations in bulk, grouped per backend."""
try:
store = store_instance if store_instance is not None else Store(config)
except Exception:
return
for backend_name, pairs in (pending or {}).items():
if not pairs:
continue
try:
backend = store[backend_name]
except Exception:
continue
# Try bulk variant first
bulk = getattr(backend, "add_tags_bulk", None)
if callable(bulk):
try:
bulk([(h, t) for h, t in pairs])
continue
except Exception:
pass
single = getattr(backend, "add_tag", None)
if callable(single):
for h, t in pairs:
try:
single(h, t)
except Exception:
continue
@staticmethod
def _load_sidecar_bundle(
media_path: Path,
+2 -2
View File
@@ -1097,7 +1097,7 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
]
if not relationship_tags:
log(f"No relationship tags found in sidecar", file=sys.stderr)
log("No relationship tags found in sidecar", file=sys.stderr)
return 0 # Not an error, just nothing to do
# Get the file hash from result (should have been set by add-file)
@@ -1166,7 +1166,7 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
)
return 0
elif error_count == 0:
log(f"No relationships to set", file=sys.stderr)
log("No relationships to set", file=sys.stderr)
return 0 # Success with nothing to do
else:
log(f"Failed with {error_count} error(s)", file=sys.stderr)
+1 -1
View File
@@ -1,6 +1,6 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional, Sequence, Tuple
from typing import Any, Dict, List, Sequence, Tuple
import sys
from SYS import pipeline as ctx
-1
View File
@@ -7,7 +7,6 @@ import sys
from pathlib import Path
from SYS.logger import debug, log
from SYS.utils import format_bytes
from Store.Folder import Folder
from Store import Store
from . import _shared as sh
-2
View File
@@ -2,10 +2,8 @@ from __future__ import annotations
from typing import Any, Dict, Sequence
from pathlib import Path
import json
import sys
from SYS import models
from SYS import pipeline as ctx
from . import _shared as sh
+8 -10
View File
@@ -1,18 +1,16 @@
from __future__ import annotations
from typing import Any, Dict, List, Optional, Sequence, Tuple
from typing import Any, Dict, List, Sequence, Tuple
import sys
from SYS import pipeline as ctx
from . import _shared as sh
Cmdlet, CmdletArg, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = (
sh.Cmdlet,
sh.CmdletArg,
sh.SharedArgs,
sh.parse_cmdlet_args,
sh.get_field,
sh.normalize_hash,
from ._shared import (
Cmdlet,
CmdletArg,
SharedArgs,
parse_cmdlet_args,
get_field,
normalize_hash,
)
from SYS.logger import log
from Store import Store
+5 -7
View File
@@ -15,7 +15,6 @@ from typing import Any, Dict, List, Optional, Sequence
from urllib.parse import urlparse
from contextlib import AbstractContextManager, nullcontext
import requests
from API.HTTP import _download_direct_file
from SYS.models import DownloadError, DownloadOptions, DownloadMediaResult
@@ -26,7 +25,6 @@ from SYS.rich_display import stderr_console as get_stderr_console
from SYS import pipeline as pipeline_context
from SYS.utils import sha256_file
from SYS.metadata import normalize_urls as normalize_url_list
from rich.prompt import Confirm
from tool.ytdlp import (
YtDlpTool,
@@ -948,7 +946,7 @@ class Download_File(Cmdlet):
from Store import Store
from API.HydrusNetwork import is_hydrus_available
debug(f"[download-file] Initializing storage interface...")
debug("[download-file] Initializing storage interface...")
storage = Store(config=config or {}, suppress_debug=True)
hydrus_available = bool(is_hydrus_available(config or {}))
@@ -1338,7 +1336,7 @@ class Download_File(Cmdlet):
table.set_source_command("download-file", [url])
debug(f"[ytdlp.formatlist] Displaying format selection table for {url}")
debug(f"[ytdlp.formatlist] Provider: ytdlp (routing to download-file via TABLE_AUTO_STAGES)")
debug("[ytdlp.formatlist] Provider: ytdlp (routing to download-file via TABLE_AUTO_STAGES)")
results_list: List[Dict[str, Any]] = []
for idx, fmt in enumerate(filtered_formats, 1):
@@ -1420,7 +1418,7 @@ class Download_File(Cmdlet):
f"[ytdlp.formatlist] When user selects @N, will invoke: download-file {url} -query 'format:<format_id>'"
)
log(f"", file=sys.stderr)
log("", file=sys.stderr)
return 0
return None
@@ -2054,7 +2052,7 @@ class Download_File(Cmdlet):
forced_single_format_id = None
forced_single_format_for_batch = False
debug(f"[download-file] Checking if format table should be shown...")
debug("[download-file] Checking if format table should be shown...")
early_ret = self._maybe_show_format_table_for_single_url(
mode=mode,
clip_spec=clip_spec,
@@ -2763,7 +2761,7 @@ class Download_File(Cmdlet):
debug(f"[download-file] Processing {total_selection} selected item(s) from table...")
for idx, run_args in enumerate(selection_runs, 1):
debug(f"[download-file] Item {idx}/{total_selection}: {run_args}")
debug(f"[download-file] Re-invoking download-file for selected item...")
debug("[download-file] Re-invoking download-file for selected item...")
exit_code = self._run_impl(None, run_args, config)
if exit_code == 0:
successes += 1
+2 -2
View File
@@ -92,7 +92,7 @@ class Get_File(sh.Cmdlet):
debug(f"[get-file] Backend retrieved: {type(backend).__name__}")
# Get file metadata to determine name and extension
debug(f"[get-file] Getting metadata for hash...")
debug("[get-file] Getting metadata for hash...")
metadata = backend.get_metadata(file_hash)
if not metadata:
log(f"Error: File metadata not found for hash {file_hash}")
@@ -228,7 +228,7 @@ class Get_File(sh.Cmdlet):
}
)
debug(f"[get-file] Completed successfully")
debug("[get-file] Completed successfully")
return 0
def _open_file_default(self, path: Path) -> None:
-1
View File
@@ -5,7 +5,6 @@ import json
import sys
from SYS.logger import log
from pathlib import Path
from . import _shared as sh
-1
View File
@@ -7,7 +7,6 @@ import sys
from SYS.logger import log
from SYS import pipeline as ctx
from SYS.result_table import Table
from . import _shared as sh
Cmdlet = sh.Cmdlet
+2 -6
View File
@@ -1,13 +1,11 @@
from __future__ import annotations
from typing import Any, Dict, Sequence, List, Optional
import json
from typing import Any, Dict, Sequence, Optional
import sys
from pathlib import Path
from SYS.logger import log
from SYS import models
from SYS import pipeline as ctx
from API import HydrusNetwork as hydrus_wrapper
from . import _shared as sh
@@ -22,8 +20,6 @@ fetch_hydrus_metadata = sh.fetch_hydrus_metadata
should_show_help = sh.should_show_help
get_field = sh.get_field
from API.folder import API_folder_store
from SYS.config import get_local_storage_path
from SYS.result_table import Table
from Store import Store
CMDLET = Cmdlet(
@@ -512,7 +508,7 @@ def _run(result: Any, _args: Sequence[str], config: Dict[str, Any]) -> int:
if source_title and source_title != "Unknown":
metadata["Title"] = source_title
table = ItemDetailView(f"Relationships", item_metadata=metadata
table = ItemDetailView("Relationships", item_metadata=metadata
).init_command("get-relationship",
[])
+2 -4
View File
@@ -25,8 +25,7 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
from SYS import pipeline as ctx
from API import HydrusNetwork
from API.folder import read_sidecar, write_sidecar, find_sidecar, API_folder_store
from API.folder import read_sidecar, write_sidecar
from . import _shared as sh
normalize_hash = sh.normalize_hash
@@ -36,7 +35,6 @@ CmdletArg = sh.CmdletArg
SharedArgs = sh.SharedArgs
parse_cmdlet_args = sh.parse_cmdlet_args
get_field = sh.get_field
from SYS.config import get_local_storage_path
try:
from SYS.metadata import extract_title
@@ -944,7 +942,7 @@ def _scrape_url_metadata(
)
except json_module.JSONDecodeError:
pass
except Exception as e:
except Exception:
pass # Silently ignore if we can't get playlist entries
# Fallback: if still no tags detected, get from first item
+6 -8
View File
@@ -8,14 +8,12 @@ import sys
import re
from fnmatch import fnmatch
from urllib.parse import urlparse, parse_qsl, urlencode, urlunparse
from . import _shared as sh
Cmdlet, SharedArgs, parse_cmdlet_args, get_field, normalize_hash = (
sh.Cmdlet,
sh.SharedArgs,
sh.parse_cmdlet_args,
sh.get_field,
sh.normalize_hash,
from ._shared import (
Cmdlet,
SharedArgs,
parse_cmdlet_args,
get_field,
normalize_hash,
)
from SYS.logger import log
from SYS.result_table import Table
+21 -21
View File
@@ -320,7 +320,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
f"Mixed file types detected: {', '.join(sorted(file_types))}",
file=sys.stderr
)
log(f"Can only merge files of the same type", file=sys.stderr)
log("Can only merge files of the same type", file=sys.stderr)
return 1
file_kind = list(file_types)[0] if file_types else "other"
@@ -524,7 +524,7 @@ def _merge_audio(files: List[Path], output: Path, output_format: str) -> bool:
current_time_ms = 0
log(f"Analyzing {len(files)} files for chapter information...", file=sys.stderr)
logger.info(f"[merge-file] Analyzing files for chapters")
logger.info("[merge-file] Analyzing files for chapters")
for file_path in files:
# Get duration using ffprobe
@@ -767,14 +767,14 @@ def _merge_audio(files: List[Path], output: Path, output_format: str) -> bool:
logger.exception(f"[merge-file] ffmpeg process error: {e}")
raise
log(f"Merge successful, adding chapters metadata...", file=sys.stderr)
log("Merge successful, adding chapters metadata...", file=sys.stderr)
# Step 5: Embed chapters into container (MKA, MP4/M4A, or note limitation)
if output_format == "mka" or output.suffix.lower() == ".mka":
# MKA/MKV format has native chapter support via FFMetadata
# Re-mux the file with chapters embedded (copy streams, no re-encode)
log(f"Embedding chapters into Matroska container...", file=sys.stderr)
logger.info(f"[merge-file] Adding chapters to MKA file via FFMetadata")
log("Embedding chapters into Matroska container...", file=sys.stderr)
logger.info("[merge-file] Adding chapters to MKA file via FFMetadata")
temp_output = output.parent / f".temp_{output.stem}.mka"
@@ -783,7 +783,7 @@ def _merge_audio(files: List[Path], output: Path, output_format: str) -> bool:
if mkvmerge_path:
# mkvmerge is the best tool for embedding chapters in Matroska files
log(f"Using mkvmerge for optimal chapter embedding...", file=sys.stderr)
log("Using mkvmerge for optimal chapter embedding...", file=sys.stderr)
cmd2 = [
mkvmerge_path,
"-o",
@@ -795,7 +795,7 @@ def _merge_audio(files: List[Path], output: Path, output_format: str) -> bool:
else:
# Fallback to ffmpeg with proper chapter embedding for Matroska
log(
f"Using ffmpeg for chapter embedding (install mkvtoolnix for better quality)...",
"Using ffmpeg for chapter embedding (install mkvtoolnix for better quality)...",
file=sys.stderr,
)
# For Matroska files, the metadata must be provided via -f ffmetadata input
@@ -838,12 +838,12 @@ def _merge_audio(files: List[Path], output: Path, output_format: str) -> bool:
if output.exists():
output.unlink()
shutil.move(str(temp_output), str(output))
log(f"✓ Chapters successfully embedded!", file=sys.stderr)
logger.info(f"[merge-file] Chapters embedded successfully")
log("✓ Chapters successfully embedded!", file=sys.stderr)
logger.info("[merge-file] Chapters embedded successfully")
except Exception as e:
logger.warning(f"[merge-file] Could not replace file: {e}")
log(
f"Warning: Could not embed chapters, using merge without chapters",
"Warning: Could not embed chapters, using merge without chapters",
file=sys.stderr,
)
try:
@@ -852,12 +852,12 @@ def _merge_audio(files: List[Path], output: Path, output_format: str) -> bool:
pass
else:
logger.warning(
f"[merge-file] Chapter embedding did not create output"
"[merge-file] Chapter embedding did not create output"
)
except Exception as e:
logger.exception(f"[merge-file] Chapter embedding failed: {e}")
log(
f"Warning: Chapter embedding failed, using merge without chapters",
"Warning: Chapter embedding failed, using merge without chapters",
file=sys.stderr,
)
elif output_format in {"m4a",
@@ -865,15 +865,15 @@ def _merge_audio(files: List[Path], output: Path, output_format: str) -> bool:
".m4b",
".mp4"]:
# MP4/M4A format has native chapter support via iTunes metadata atoms
log(f"Embedding chapters into MP4 container...", file=sys.stderr)
log("Embedding chapters into MP4 container...", file=sys.stderr)
logger.info(
f"[merge-file] Adding chapters to M4A/MP4 file via iTunes metadata"
"[merge-file] Adding chapters to M4A/MP4 file via iTunes metadata"
)
temp_output = output.parent / f".temp_{output.stem}{output.suffix}"
# ffmpeg embeds chapters in MP4 using -map_metadata and -map_chapters
log(f"Using ffmpeg for MP4 chapter embedding...", file=sys.stderr)
log("Using ffmpeg for MP4 chapter embedding...", file=sys.stderr)
cmd2 = [
ffmpeg_path,
"-y",
@@ -916,14 +916,14 @@ def _merge_audio(files: List[Path], output: Path, output_format: str) -> bool:
output.unlink()
shutil.move(str(temp_output), str(output))
log(
f"✓ Chapters successfully embedded in MP4!",
"✓ Chapters successfully embedded in MP4!",
file=sys.stderr
)
logger.info(f"[merge-file] MP4 chapters embedded successfully")
logger.info("[merge-file] MP4 chapters embedded successfully")
except Exception as e:
logger.warning(f"[merge-file] Could not replace file: {e}")
log(
f"Warning: Could not embed chapters, using merge without chapters",
"Warning: Could not embed chapters, using merge without chapters",
file=sys.stderr,
)
try:
@@ -932,12 +932,12 @@ def _merge_audio(files: List[Path], output: Path, output_format: str) -> bool:
pass
else:
logger.warning(
f"[merge-file] MP4 chapter embedding did not create output"
"[merge-file] MP4 chapter embedding did not create output"
)
except Exception as e:
logger.exception(f"[merge-file] MP4 chapter embedding failed: {e}")
log(
f"Warning: MP4 chapter embedding failed, using merge without chapters",
"Warning: MP4 chapter embedding failed, using merge without chapters",
file=sys.stderr,
)
else:
@@ -945,7 +945,7 @@ def _merge_audio(files: List[Path], output: Path, output_format: str) -> bool:
logger.info(
f"[merge-file] Format {output_format} does not have native chapter support"
)
log(f"Note: For chapter support, use MKA or M4A format", file=sys.stderr)
log("Note: For chapter support, use MKA or M4A format", file=sys.stderr)
# Clean up temp files
try:
+2 -2
View File
@@ -4,7 +4,7 @@ import sys
from typing import Any, Dict, Iterable, Sequence
from . import _shared as sh
from SYS.logger import log, debug
from SYS.logger import log
from SYS import pipeline as ctx
from SYS.result_table_adapters import get_provider
@@ -43,7 +43,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
try:
provider = get_provider(provider_name)
except Exception as exc:
except Exception:
log(f"Unknown provider: {provider_name}", file=sys.stderr)
return 1
+3 -3
View File
@@ -656,7 +656,7 @@ def _capture(
# Attempt platform-specific target capture if requested (and not PDF)
element_captured = False
if options.prefer_platform_target and format_name != "pdf":
debug(f"[_capture] Target capture enabled")
debug("[_capture] Target capture enabled")
debug("Attempting platform-specific content capture...")
progress.step("capturing locating target")
try:
@@ -913,7 +913,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
url_to_process.append((str(url), item))
if not url_to_process:
log(f"No url to process for screen-shot cmdlet", file=sys.stderr)
log("No url to process for screen-shot cmdlet", file=sys.stderr)
return 1
debug(f"[_run] url to process: {[u for u, _ in url_to_process]}")
@@ -1157,7 +1157,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
progress.close_local_ui(force_complete=True)
if not all_emitted:
log(f"No screenshots were successfully captured", file=sys.stderr)
log("No screenshots were successfully captured", file=sys.stderr)
return 1
# Log completion message (keep this as normal output)
+1 -13
View File
@@ -3,7 +3,6 @@
from __future__ import annotations
from typing import Any, Dict, Sequence, List, Optional
import importlib
import uuid
from pathlib import Path
import re
@@ -19,9 +18,7 @@ from SYS.rich_display import (
show_available_providers_panel,
)
from . import _shared as sh
(
from ._shared import (
Cmdlet,
CmdletArg,
SharedArgs,
@@ -30,15 +27,6 @@ from . import _shared as sh
normalize_hash,
first_title_tag,
parse_hash_query,
) = (
sh.Cmdlet,
sh.CmdletArg,
sh.SharedArgs,
sh.get_field,
sh.should_show_help,
sh.normalize_hash,
sh.first_title_tag,
sh.parse_hash_query,
)
from SYS import pipeline as ctx
+1 -1
View File
@@ -16,7 +16,7 @@ def _register_cmdlet_object(cmdlet_obj, registry: Dict[str, CmdletFn]) -> None:
registry[cmdlet_obj.name.replace("_", "-").lower()] = run_fn
# Cmdlet uses 'alias' (List[str]). Some older objects may use 'aliases'.
aliases = []
aliases: list[str] = []
if hasattr(cmdlet_obj, "alias") and getattr(cmdlet_obj, "alias"):
aliases.extend(getattr(cmdlet_obj, "alias") or [])
if hasattr(cmdlet_obj, "aliases") and getattr(cmdlet_obj, "aliases"):
+2 -2
View File
@@ -1,8 +1,8 @@
import json
import os
import sys
from typing import List, Dict, Any, Optional, Sequence
from cmdlet._shared import Cmdlet, CmdletArg, parse_cmdlet_args
from typing import List, Dict, Any, Sequence
from cmdlet._shared import Cmdlet, CmdletArg
from SYS.logger import log
from SYS.result_table import Table
from SYS import pipeline as ctx
+1 -1
View File
@@ -213,7 +213,7 @@ def _run(piped_result: Any, args: List[str], config: Dict[str, Any]) -> int:
# Check if we're in an interactive terminal and can launch a Textual modal
if sys.stdin.isatty() and not piped_result:
try:
from textual.app import App, ComposeResult
from textual.app import App
from TUI.modalscreen.config_modal import ConfigModal
class ConfigApp(App):
-1
View File
@@ -4,7 +4,6 @@ import sys
import json
import socket
import re
import subprocess
from urllib.parse import urlparse, parse_qs
from pathlib import Path
from cmdlet._shared import Cmdlet, CmdletArg, parse_cmdlet_args, resolve_tidal_manifest_path
+3 -6
View File
@@ -1,15 +1,12 @@
from __future__ import annotations
import sys
import shutil
from typing import Any, Dict, List, Optional, Sequence, Tuple
from datetime import datetime
from typing import Any, Dict, List
from cmdlet._shared import Cmdlet, CmdletArg
from cmdlet._shared import Cmdlet
from SYS import pipeline as ctx
from SYS.result_table import Table
from SYS.logger import log, set_debug, debug
from SYS.rich_display import stdout_console
from SYS.logger import set_debug, debug
CMDLET = Cmdlet(
name=".status",
+2 -3
View File
@@ -1,15 +1,14 @@
import os
import sys
import requests
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence
from typing import Any, Dict, Sequence
# Add project root to sys.path
root = Path(__file__).resolve().parent.parent
if str(root) not in sys.path:
sys.path.insert(0, str(root))
from cmdlet._shared import Cmdlet, CmdletArg
from cmdlet._shared import Cmdlet
from SYS.config import load_config
from SYS.result_table import Table
from API import zerotier as zt
+28 -27
View File
@@ -55,11 +55,13 @@ from __future__ import annotations
import argparse
import os
import platform
import re
from pathlib import Path
import shutil
import subprocess
import sys
import time
from typing import Optional
def run(cmd: list[str], quiet: bool = False, debug: bool = False, cwd: Optional[Path] = None) -> None:
@@ -203,7 +205,6 @@ def run_platform_bootstrap(repo_root: Path) -> int:
def playwright_package_installed() -> bool:
try:
import playwright # type: ignore
return True
except Exception:
@@ -751,7 +752,7 @@ def main() -> int:
user_bin = Path(os.environ.get("USERPROFILE", str(home))) / "bin"
mm_bat = user_bin / "mm.bat"
print(f"Checking for shim files:")
print("Checking for shim files:")
print(f" mm.bat: {'' if mm_bat.exists() else ''} ({mm_bat})")
print()
@@ -760,14 +761,14 @@ def main() -> int:
if "REPO=" in bat_content or "ENTRY=" in bat_content:
print(f" mm.bat content looks valid ({len(bat_content)} bytes)")
else:
print(f" ⚠️ mm.bat content may be corrupted")
print(" ⚠️ mm.bat content may be corrupted")
print()
# Check PATH
path = os.environ.get("PATH", "")
user_bin_str = str(user_bin)
in_path = user_bin_str in path
print(f"Checking PATH environment variable:")
print("Checking PATH environment variable:")
print(f" {user_bin_str} in current session PATH: {'' if in_path else ''}")
# Check registry
@@ -792,7 +793,7 @@ def main() -> int:
try:
result = subprocess.run(["mm", "--help"], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
print(f"'mm --help' works!")
print("'mm --help' works!")
print(f" Output (first line): {result.stdout.split(chr(10))[0]}")
else:
print(f"'mm --help' failed with exit code {result.returncode}")
@@ -800,8 +801,8 @@ def main() -> int:
print(f" Error: {result.stderr.strip()}")
except FileNotFoundError:
# mm not found via PATH, try calling the .ps1 directly
print(f"'mm' command not found in PATH")
print(f" Shims exist but command is not accessible via PATH")
print("'mm' command not found in PATH")
print(" Shims exist but command is not accessible via PATH")
print()
print("Attempting to call shim directly...")
try:
@@ -810,23 +811,23 @@ def main() -> int:
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
print(f" ✓ Direct shim call works!")
print(f" The shim files are valid and functional.")
print(" ✓ Direct shim call works!")
print(" The shim files are valid and functional.")
print()
print("⚠️ 'mm' is not in PATH, but the shims are working correctly.")
print()
print("Possible causes and fixes:")
print(f" 1. Terminal needs restart: Close and reopen your terminal/PowerShell")
print(f" 2. PATH reload: Run: $env:Path = [Environment]::GetEnvironmentVariable('PATH', 'User') + ';' + [Environment]::GetEnvironmentVariable('PATH', 'Machine')")
print(" 1. Terminal needs restart: Close and reopen your terminal/PowerShell")
print(" 2. PATH reload: Run: $env:Path = [Environment]::GetEnvironmentVariable('PATH', 'User') + ';' + [Environment]::GetEnvironmentVariable('PATH', 'Machine')")
print(f" 3. Manual PATH: Add {user_bin} to your system PATH manually")
else:
print(f" ✗ Direct shim call failed")
print(" ✗ Direct shim call failed")
if result.stderr:
print(f" Error: {result.stderr.strip()}")
except Exception as e:
print(f" ✗ Could not test direct shim: {e}")
except subprocess.TimeoutExpired:
print(f"'mm' command timed out")
print("'mm' command timed out")
except Exception as e:
print(f" ✗ Error testing 'mm': {e}")
else:
@@ -835,7 +836,7 @@ def main() -> int:
locations = [home / ".local" / "bin" / "mm", Path("/usr/local/bin/mm"), Path("/usr/bin/mm")]
found_shims = [p for p in locations if p.exists()]
print(f"Checking for shim files:")
print("Checking for shim files:")
for p in locations:
if p.exists():
print(f" mm: ✓ ({p})")
@@ -844,23 +845,23 @@ def main() -> int:
print(f" mm: ✗ ({p})")
if not found_shims:
print(f" mm: ✗ (No shim found in standard locations)")
print(" mm: ✗ (No shim found in standard locations)")
print()
path = os.environ.get("PATH", "")
# Find which 'mm' is actually being run
actual_mm = shutil.which("mm")
print(f"Checking PATH environment variable:")
print("Checking PATH environment variable:")
if actual_mm:
print(f" 'mm' resolved to: {actual_mm}")
# Check if it's in a directory on the PATH
if any(str(Path(actual_mm).parent) in p for p in path.split(os.pathsep)):
print(f" Command is accessible via current session PATH: ✓")
print(" Command is accessible via current session PATH: ✓")
else:
print(f" Command is found but directory may not be in current PATH: ⚠️")
print(" Command is found but directory may not be in current PATH: ⚠️")
else:
print(f" 'mm' not found in current session PATH: ✗")
print(" 'mm' not found in current session PATH: ✗")
print()
# Test if mm command works
@@ -868,14 +869,14 @@ def main() -> int:
try:
result = subprocess.run(["mm", "--help"], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
print(f"'mm --help' works!")
print("'mm --help' works!")
print(f" Output (first line): {result.stdout.split(chr(10))[0]}")
else:
print(f"'mm --help' failed with exit code {result.returncode}")
if result.stderr:
print(f" Error: {result.stderr.strip()}")
except FileNotFoundError:
print(f"'mm' command not found in PATH")
print("'mm' command not found in PATH")
except Exception as e:
print(f" ✗ Error testing 'mm': {e}")
@@ -1002,7 +1003,7 @@ def main() -> int:
try:
_run_cmd([str(python_path), "-m", "ensurepip", "--upgrade"])
except subprocess.CalledProcessError as exc:
except subprocess.CalledProcessError:
print(
"Failed to install pip inside the local virtualenv via ensurepip; ensure your Python build includes ensurepip and retry.",
file=sys.stderr,
@@ -1089,7 +1090,7 @@ def main() -> int:
# 7. CLI Verification
pb.update("Verifying CLI configuration...")
try:
rc = subprocess.run(
cli_verify_result = subprocess.run(
[
str(venv_python),
"-c",
@@ -1099,7 +1100,7 @@ def main() -> int:
stderr=subprocess.DEVNULL,
check=False,
)
if rc.returncode != 0:
if cli_verify_result.returncode != 0:
cmd = [
str(venv_python),
"-c",
@@ -1326,17 +1327,17 @@ if (Test-Path (Join-Path $repo 'CLI.py')) {
if not args.quiet:
print(f"Installed global launcher to: {user_bin}")
print(f"✓ mm.bat (Command Prompt and PowerShell)")
print("✓ mm.bat (Command Prompt and PowerShell)")
print()
print("You can now run 'mm' from any terminal window.")
print(f"If 'mm' is not found, restart your terminal or reload PATH:")
print("If 'mm' is not found, restart your terminal or reload PATH:")
print(" PowerShell: $env:PATH = [Environment]::GetEnvironmentVariable('PATH','User') + ';' + [Environment]::GetEnvironmentVariable('PATH','Machine')")
print(" CMD: path %PATH%")
else:
# POSIX
# If running as root (id 0), prefer /usr/bin or /usr/local/bin which are standard on PATH
if os.getuid() == 0:
if hasattr(os, "getuid") and os.getuid() == 0:
user_bin = Path("/usr/local/bin")
if not os.access(user_bin, os.W_OK):
user_bin = Path("/usr/bin")
+1 -1
View File
@@ -5,6 +5,6 @@ import traceback
try:
importlib.import_module("CLI")
print("CLI imported OK")
except Exception as e:
except Exception:
traceback.print_exc()
sys.exit(1)
+19
View File
@@ -0,0 +1,19 @@
import re
from pathlib import Path
p = Path(r'c:\Forgejo\Medios-Macina\CLI.py')
s = p.read_text(encoding='utf-8')
pattern = re.compile(r'(?s)if False:\s*class _OldPipelineExecutor:.*?from rich\\.markdown import Markdown\\s*')
m = pattern.search(s)
print('found', bool(m))
if m:
print('start', m.start(), 'end', m.end())
print('snippet:', s[m.start():m.start()+120])
else:
# print a slice around the if False for debugging
i = s.find('if False:')
print('if False index', i)
print('around if False:', s[max(0,i-50):i+200])
j = s.find('from rich.markdown import Markdown', i)
print('next from rich index after if False', j)
if j!=-1:
print('around that:', s[j-50:j+80])
+35
View File
@@ -0,0 +1,35 @@
from pathlib import Path
p=Path('SYS/pipeline.py')
s=p.read_text(encoding='utf-8')
lines=s.splitlines()
stack=[]
for i,l in enumerate(lines,1):
stripped=l.strip()
# Skip commented lines
if stripped.startswith('#'):
continue
# compute indent as leading spaces (tabs are converted)
indent = len(l) - len(l.lstrip(' '))
if stripped.startswith('try:'):
stack.append((indent, i))
if stripped.startswith('except ') or stripped=='except:' or stripped.startswith('finally:'):
# find the most recent try with same indent
for idx in range(len(stack)-1, -1, -1):
if stack[idx][0] == indent:
stack.pop(idx)
break
else:
# no matching try at same indent
print(f"Found {stripped.split()[0]} at line {i} with no matching try at same indent")
print('Unmatched try count', len(stack))
if stack:
print('Unmatched try positions (indent, line):', stack)
for indent, lineno in stack:
start = max(1, lineno - 10)
end = min(len(lines), lineno + 10)
print(f"Context around line {lineno}:")
for i in range(start, end + 1):
print(f"{i:5d}: {lines[i-1]}")
else:
print("All try statements appear matched")
+2 -1
View File
@@ -1,4 +1,5 @@
import importlib, traceback
import importlib
import traceback
try:
m = importlib.import_module('Provider.vimm')
+21 -15
View File
@@ -28,7 +28,6 @@ import sys
import tempfile
import urllib.request
import zipfile
import shlex
import re
from pathlib import Path
from typing import Optional, Tuple
@@ -370,7 +369,11 @@ def is_elevated() -> bool:
return False
else:
try:
return os.geteuid() == 0
# Use getattr for platform-specific os methods to satisfy Mypy
geteuid = getattr(os, "geteuid", None)
if geteuid:
return bool(geteuid() == 0)
return False
except Exception:
return False
except Exception:
@@ -477,9 +480,9 @@ def fix_permissions_unix(
user = getpass.getuser()
try:
pw = pwd.getpwnam(user)
pw = pwd.getpwnam(user) # type: ignore[attr-defined]
uid = pw.pw_uid
gid = pw.pw_gid if not group else grp.getgrnam(group).gr_gid
gid = pw.pw_gid if not group else grp.getgrnam(group).gr_gid # type: ignore[attr-defined]
except Exception:
logging.warning("Could not resolve user/group to uid/gid; skipping chown.")
return False
@@ -501,16 +504,18 @@ def fix_permissions_unix(
except Exception:
# Best-effort fallback: chown/chmod individual entries
for root_dir, dirs, files in os.walk(path):
try:
os.chown(root_dir, uid, gid)
except Exception:
pass
for fn in files:
fpath = os.path.join(root_dir, fn)
if hasattr(os, "chown"):
try:
os.chown(fpath, uid, gid)
os.chown(root_dir, uid, gid)
except Exception:
pass
for fn in files:
fpath = os.path.join(root_dir, fn)
if hasattr(os, "chown"):
try:
os.chown(fpath, uid, gid)
except Exception:
pass
# Fix modes: directories 0o755, files 0o644 (best-effort)
for root_dir, dirs, files in os.walk(path):
@@ -870,7 +875,7 @@ def main(argv: Optional[list[str]] = None) -> int:
args.root = str(default_root)
# Ask for destination folder name
dest_input = input(f"Enter folder name for Hydrus [default: hydrusnetwork]: ").strip()
dest_input = input("Enter folder name for Hydrus [default: hydrusnetwork]: ").strip()
if dest_input:
args.dest_name = dest_input
except (EOFError, KeyboardInterrupt):
@@ -1455,11 +1460,12 @@ def main(argv: Optional[list[str]] = None) -> int:
if p.exists():
client_found = p
break
run_client_script = None
if client_found:
# Prefer run_client helper located in the cloned repo; if missing, fall back to top-level scripts folder helper.
script_dir = Path(__file__).resolve().parent
helper_candidates = [dest / "run_client.py", script_dir / "run_client.py"]
run_client_script = None
for cand in helper_candidates:
if cand.exists():
run_client_script = cand
@@ -1478,7 +1484,7 @@ def main(argv: Optional[list[str]] = None) -> int:
)
else:
if getattr(args, "install_service", False):
if run_client_script.exists():
if run_client_script and run_client_script.exists():
cmd = [
str(venv_py),
str(run_client_script),
@@ -1514,7 +1520,7 @@ def main(argv: Optional[list[str]] = None) -> int:
dest / "run_client.py",
)
if getattr(args, "uninstall_service", False):
if run_client_script.exists():
if run_client_script and run_client_script.exists():
cmd = [
str(venv_py),
str(run_client_script),
+22 -21
View File
@@ -41,7 +41,6 @@ from __future__ import annotations
import os
import sys
import json
import argparse
import logging
import threading
@@ -54,7 +53,6 @@ from functools import wraps
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from SYS.logger import log
# ============================================================================
# CONFIGURATION
@@ -419,29 +417,32 @@ def create_app():
filename = sanitize_filename(file_storage.filename or "upload")
incoming_dir = STORAGE_PATH / "incoming"
ensure_directory(incoming_dir)
target_path = incoming_dir / filename
target_path = unique_path(target_path)
try:
# Save uploaded file to storage
file_storage.save(str(target_path))
# Extract optional metadata
tags = []
if 'tag' in request.form:
# Support repeated form fields or comma-separated list
tags = request.form.getlist('tag') or []
if not tags and request.form.get('tag'):
tags = [t.strip() for t in str(request.form.get('tag') or "").split(",") if t.strip()]
urls = []
if 'url' in request.form:
urls = request.form.getlist('url') or []
if not urls and request.form.get('url'):
urls = [u.strip() for u in str(request.form.get('url') or "").split(",") if u.strip()]
# Initialize the DB first (run safety checks) before creating any files.
with API_folder_store(STORAGE_PATH) as db:
# Ensure the incoming directory exists only after DB safety checks pass.
ensure_directory(incoming_dir)
# Save uploaded file to storage
file_storage.save(str(target_path))
# Extract optional metadata
tags = []
if 'tag' in request.form:
# Support repeated form fields or comma-separated list
tags = request.form.getlist('tag') or []
if not tags and request.form.get('tag'):
tags = [t.strip() for t in str(request.form.get('tag') or "").split(",") if t.strip()]
urls = []
if 'url' in request.form:
urls = request.form.getlist('url') or []
if not urls and request.form.get('url'):
urls = [u.strip() for u in str(request.form.get('url') or "").split(",") if u.strip()]
db.get_or_create_file_entry(target_path)
if tags:
@@ -723,7 +724,7 @@ def main():
local_ip = "127.0.0.1"
print(f"\n{'='*70}")
print(f"Remote Storage Server - Medios-Macina")
print("Remote Storage Server - Medios-Macina")
print(f"{'='*70}")
print(f"Storage Path: {STORAGE_PATH}")
print(f"Local IP: {local_ip}")
+18
View File
@@ -0,0 +1,18 @@
#!/usr/bin/env python3
from pathlib import Path
p = Path(r"c:\Forgejo\Medios-Macina\CLI.py")
s = p.read_text(encoding='utf-8')
start = s.find('\nif False:')
if start == -1:
print('No if False found')
else:
after = s[start+1:]
idx = after.find('\nfrom rich.markdown import Markdown')
if idx == -1:
print('No subsequent import found')
else:
before = s[:start]
rest = after[idx+1:]
new = before + '\nfrom rich.markdown import Markdown\n' + rest
p.write_text(new, encoding='utf-8')
print('Removed legacy block')
+1 -2
View File
@@ -14,10 +14,9 @@ from __future__ import annotations
import argparse
import json
import sys
from typing import Any
from pathlib import Path
from SYS.logger import log, debug
from SYS.logger import log
try:
from API import zerotier
+1 -1
View File
@@ -2,7 +2,7 @@ from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple
from typing import Any, Dict, List, Optional, Sequence, Tuple
from SYS.logger import debug
+19 -19
View File
@@ -11,7 +11,6 @@ import sys
import threading
import time
import traceback
from contextlib import AbstractContextManager, nullcontext
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence, cast
@@ -28,6 +27,7 @@ from SYS.models import (
)
from SYS.pipeline_progress import PipelineProgress
from SYS.utils import ensure_directory, sha256_file
from SYS.metadata import extract_ytdlp_tags
_YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {}
@@ -38,7 +38,7 @@ try:
except Exception as exc: # pragma: no cover - handled at runtime
yt_dlp = None # type: ignore
gen_extractors = None # type: ignore
YTDLP_IMPORT_ERROR = exc
YTDLP_IMPORT_ERROR: Optional[Exception] = exc
else:
YTDLP_IMPORT_ERROR = None
@@ -740,16 +740,16 @@ class YtDlpTool:
# Progress + utility helpers for yt-dlp driven downloads (previously in cmdlet/download_media).
_YTDLP_PROGRESS_BAR = ProgressBar()
_YTDLP_TRANSFER_STATE: Dict[str, Dict[str, Any]] = {}
_YTDLP_PROGRESS_ACTIVITY_LOCK = threading.Lock()
_YTDLP_PROGRESS_LAST_ACTIVITY = 0.0
_YTDLP_PROGRESS_ACTIVITY_LOCK = threading.Lock()
_YTDLP_PROGRESS_LAST_ACTIVITY = 0.0
_SUBTITLE_EXTS = (".vtt", ".srt", ".ass", ".ssa", ".lrc")
def _progress_label(status: Dict[str, Any]) -> str:
info_dict = status.get("info_dict") if isinstance(status.get("info_dict"), dict) else {}
def _progress_label(status: Optional[Dict[str, Any]]) -> str:
if not status:
return "unknown"
raw_info = status.get("info_dict")
info_dict = raw_info if isinstance(raw_info, dict) else {}
candidates = [
status.get("filename"),
@@ -1245,7 +1245,7 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger]
debug(
f"Skipping probe for playlist (item selection: {opts.playlist_items}), proceeding with download"
)
probe_result = {"url": opts.url}
probe_result: Optional[Dict[str, Any]] = {"url": opts.url}
else:
probe_cookiefile = None
try:
@@ -1287,7 +1287,7 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger]
debug(f"[yt-dlp] force_keyframes_at_cuts: {ytdl_options.get('force_keyframes_at_cuts', False)}")
session_id = None
first_section_info = {}
first_section_info: Dict[str, Any] = {}
if ytdl_options.get("download_sections"):
live_ui, _ = PipelineProgress(pipeline_context).ui_and_pipe_index()
quiet_sections = bool(opts.quiet) or (live_ui is not None)
@@ -1448,20 +1448,20 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger]
raise DownloadError(str(exc)) from exc
file_hash = sha256_file(media_path)
tags = []
section_tags: List[str] = []
title = ""
if first_section_info:
title = first_section_info.get("title", "")
if title:
tags.append(f"title:{title}")
section_tags.append(f"title:{title}")
debug(f"Added title tag for section download: {title}")
if first_section_info:
info_dict = first_section_info
info_dict_sec = first_section_info
else:
info_dict = {"id": media_path.stem, "title": title or media_path.stem, "ext": media_path.suffix.lstrip(".")}
info_dict_sec = {"id": media_path.stem, "title": title or media_path.stem, "ext": media_path.suffix.lstrip(".")}
return DownloadMediaResult(path=media_path, info=info_dict, tag=tags, source_url=opts.url, hash_value=file_hash, paths=media_paths)
return DownloadMediaResult(path=media_path, info=info_dict_sec, tag=section_tags, source_url=opts.url, hash_value=file_hash, paths=media_paths)
if not isinstance(info, dict):
log(f"Unexpected yt-dlp response: {type(info)}", file=sys.stderr)
@@ -1484,7 +1484,7 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger]
hash_value = None
tags: List[str] = []
if extract_ytdlp_tags:
if extract_ytdlp_tags is not None:
try:
tags = extract_ytdlp_tags(entry)
except Exception as exc:
@@ -1525,10 +1525,10 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger]
if debug_logger is not None:
debug_logger.write_record("hash-error", {"path": str(media_path), "error": str(exc)})
tags = []
if extract_ytdlp_tags:
tags_res: List[str] = []
if extract_ytdlp_tags is not None:
try:
tags = extract_ytdlp_tags(entry)
tags_res = extract_ytdlp_tags(entry)
except Exception as exc:
log(f"Error extracting tags: {exc}", file=sys.stderr)
@@ -1547,7 +1547,7 @@ def download_media(opts: DownloadOptions, *, debug_logger: Optional[DebugLogger]
},
)
return DownloadMediaResult(path=media_path, info=entry, tag=tags, source_url=source_url, hash_value=hash_value)
return DownloadMediaResult(path=media_path, info=entry, tag=tags_res, source_url=source_url, hash_value=hash_value)
def _download_with_timeout(opts: DownloadOptions, timeout_seconds: int = 300) -> Any: