From 900a37e210ec6370c40722c1f379b1aa16eb232c Mon Sep 17 00:00:00 2001
From: nose
Date: Fri, 19 Dec 2025 15:20:08 -0800
Subject: [PATCH] Add file.io provider, upload/copy progress bars, and Telegram
 session handling fixes

---
 .gitignore                          |   1 +
 API/HydrusNetwork.py                |  71 +++++++-
 Log/medeia_macina/telegram.session  | Bin 28672 -> 0 bytes
 Provider/fileio.py                  | 167 +++++++++++++++++++
 Provider/matrix.py                  |   4 +-
 Provider/telegram.py                | 251 ++++++++++++++++++++++++++++-
 Provider/zeroxzero.py               |   8 +-
 ProviderCore/registry.py            |   2 +
 Store/Folder.py                     |  45 +++++-
 cmdlet/download_file.py             |  17 ++
 cmdlet/download_media.py            |  22 ++-
 cmdlet/merge_file.py                |  76 +++++++--
 models.py                           |  97 +++++++++++
 13 files changed, 729 insertions(+), 32 deletions(-)
 delete mode 100644 Log/medeia_macina/telegram.session
 create mode 100644 Provider/fileio.py

diff --git a/.gitignore b/.gitignore
index 8545a64..1e441a5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -224,3 +224,4 @@ MPV/ffmpeg/*
 MPV/portable_config/*
 Log/
 Log/medeia_macina/telegram.session
+*.session
\ No newline at end of file
diff --git a/API/HydrusNetwork.py b/API/HydrusNetwork.py
index 6668e2e..c5c0e0a 100644
--- a/API/HydrusNetwork.py
+++ b/API/HydrusNetwork.py
@@ -151,10 +151,75 @@ class HydrusNetwork:
 
         logger.debug(f"{self._log_prefix()} Uploading file {file_path.name} ({file_size} bytes)")
 
+        # Stream upload body with a stderr progress bar (pipeline-safe).
+        try:
+            from models import ProgressBar
+        except Exception:
+            ProgressBar = None  # type: ignore[assignment]
+
+        bar = ProgressBar() if ProgressBar is not None else None
+        label = f"{self._log_prefix().strip('[]')} upload"
+        start_t = time.time()
+        last_render_t = [start_t]
+        last_log_t = [start_t]
+        sent = [0]
+        tty = bool(getattr(sys.stderr, "isatty", lambda: False)())
+
+        def _render_progress(final: bool = False) -> None:
+            if bar is None:
+                return
+            if file_size <= 0:
+                return
+            now = time.time()
+            if not final and (now - float(last_render_t[0])) < 0.25:
+                return
+            last_render_t[0] = now
+            elapsed = max(0.001, now - start_t)
+            speed = float(sent[0]) / elapsed
+            eta_s = (float(file_size) - float(sent[0])) / speed if speed > 0 else 0.0
+            minutes, seconds = divmod(int(max(0.0, eta_s)), 60)
+            hours, minutes = divmod(minutes, 60)
+            eta_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
+            speed_str = bar.format_bytes(speed) + "/s"
+
+            line = bar.format_progress(
+                percent_str=None,
+                downloaded=int(sent[0]),
+                total=int(file_size),
+                speed_str=speed_str,
+                eta_str=eta_str,
+            )
+
+            try:
+                if tty:
+                    sys.stderr.write("\r" + f"[{label}] " + line + " ")
+                    sys.stderr.flush()
+                else:
+                    # Non-interactive: throttle to an occasional log line.
+                    if final or (now - float(last_log_t[0])) >= 2.0:
+                        log(f"[{label}] {line}", file=sys.stderr)
+                        last_log_t[0] = now
+            except Exception:
+                pass
+
         def file_gen():
-            with file_path.open("rb") as handle:
-                while chunk := handle.read(65536):
-                    yield chunk
+            try:
+                with file_path.open("rb") as handle:
+                    while True:
+                        chunk = handle.read(256 * 1024)
+                        if not chunk:
+                            break
+                        sent[0] += len(chunk)
+                        _render_progress(final=False)
+                        yield chunk
+            finally:
+                _render_progress(final=True)
+                if tty:
+                    try:
+                        sys.stderr.write("\n")
+                        sys.stderr.flush()
+                    except Exception:
+                        pass
 
         response = client.request(
             spec.method,
diff --git a/Log/medeia_macina/telegram.session b/Log/medeia_macina/telegram.session
deleted file mode 100644
index 4e3915a9c1796e7ae1ca699c01ba624a2852fc6e..0000000000000000000000000000000000000000
Binary files a/Log/medeia_macina/telegram.session and /dev/null differ
diff --git a/Provider/fileio.py b/Provider/fileio.py
new file mode 100644
index 0000000..c0f638e
--- /dev/null
+++ b/Provider/fileio.py
@@ -0,0 +1,167 @@
+from __future__ import annotations
+
+import os
+import sys
+from typing import Any, Dict, Optional
+
+from ProviderCore.base import Provider
+from SYS.logger import log
+
+
+def _pick_provider_config(config: Any) -> Dict[str, Any]:
+    if not isinstance(config, dict):
+        return {}
+    provider = config.get("provider")
+    if not isinstance(provider, dict):
+        return {}
+    entry = provider.get("file.io")
+    if isinstance(entry, dict):
+        return entry
+    return {}
+
+
+def _extract_link(payload: Any) -> Optional[str]:
+    if isinstance(payload, dict):
+        for key in ("link", "url", "downloadLink", "download_url"):
+            val = payload.get(key)
+            if isinstance(val, str) and val.strip().startswith(("http://", "https://")):
+                return val.strip()
+        for nested_key in ("data", "file", "result"):
+            nested = payload.get(nested_key)
+            found = _extract_link(nested)
+            if found:
+                return found
+    return None
+
+
+def _extract_key(payload: Any) -> Optional[str]:
+    if isinstance(payload, dict):
+        for key in ("key", "id", "uuid"):
+            val = payload.get(key)
+            if isinstance(val, str) and val.strip():
+                return val.strip()
+        for nested_key in ("data", "file", "result"):
+            nested = payload.get(nested_key)
+            found = _extract_key(nested)
+            if found:
+                return found
+    return None
+
+
+class FileIO(Provider):
+    """File provider for file.io."""
+
+    def __init__(self, config: Optional[Dict[str, Any]] = None):
+        super().__init__(config)
+        conf = _pick_provider_config(self.config)
+        self._base_url = str(conf.get("base_url") or "https://file.io").strip().rstrip("/")
+        self._api_key = conf.get("api_key")
+        self._default_expires = conf.get("expires")
+        self._default_max_downloads = conf.get("maxDownloads")
+        if self._default_max_downloads is None:
+            self._default_max_downloads = conf.get("max_downloads")
+ self._default_auto_delete = conf.get("autoDelete") + if self._default_auto_delete is None: + self._default_auto_delete = conf.get("auto_delete") + + def validate(self) -> bool: + return True + + def upload(self, file_path: str, **kwargs: Any) -> str: + from API.HTTP import HTTPClient + from models import ProgressFileReader + + if not os.path.exists(file_path): + raise FileNotFoundError(f"File not found: {file_path}") + + data: Dict[str, Any] = {} + expires = kwargs.get("expires", self._default_expires) + max_downloads = kwargs.get("maxDownloads", kwargs.get("max_downloads", self._default_max_downloads)) + auto_delete = kwargs.get("autoDelete", kwargs.get("auto_delete", self._default_auto_delete)) + + if expires not in (None, ""): + data["expires"] = expires + if max_downloads not in (None, ""): + data["maxDownloads"] = max_downloads + if auto_delete not in (None, ""): + data["autoDelete"] = auto_delete + + headers: Dict[str, str] = {"User-Agent": "Medeia-Macina/1.0", "Accept": "application/json"} + if isinstance(self._api_key, str) and self._api_key.strip(): + # Some file.io plans use bearer tokens; keep optional. + headers["Authorization"] = f"Bearer {self._api_key.strip()}" + + try: + with HTTPClient(headers=headers) as client: + with open(file_path, "rb") as handle: + filename = os.path.basename(file_path) + try: + total = os.path.getsize(file_path) + except Exception: + total = None + wrapped = ProgressFileReader(handle, total_bytes=total, label="upload") + response = client.request( + "POST", + f"{self._base_url}/upload", + data=data or None, + files={"file": (filename, wrapped)}, + follow_redirects=True, + raise_for_status=False, + ) + + if response.status_code >= 400: + location = response.headers.get("location") or response.headers.get("Location") + ct = response.headers.get("content-type") or response.headers.get("Content-Type") + raise Exception(f"Upload failed: {response.status_code} (content-type={ct}, location={location}) - {response.text}") + + payload: Any + try: + payload = response.json() + except Exception: + payload = None + + # If the server ignored our Accept header and returned HTML, this is almost + # certainly the wrong endpoint or an upstream block. + ct = (response.headers.get("content-type") or response.headers.get("Content-Type") or "").lower() + if (payload is None) and ("text/html" in ct): + raise Exception("file.io returned HTML instead of JSON; expected API response from /upload") + + if isinstance(payload, dict) and payload.get("success") is False: + reason = payload.get("message") or payload.get("error") or payload.get("status") + raise Exception(str(reason or "Upload failed")) + + uploaded_url = _extract_link(payload) + if not uploaded_url: + # Some APIs may return the link as plain text. + text = str(response.text or "").strip() + if text.startswith(("http://", "https://")): + uploaded_url = text + + if not uploaded_url: + key = _extract_key(payload) + if key: + uploaded_url = f"{self._base_url}/{key.lstrip('/')}" + + if not uploaded_url: + try: + snippet = (response.text or "").strip() + if len(snippet) > 300: + snippet = snippet[:300] + "..." 
+ except Exception: + snippet = "" + raise Exception(f"Upload succeeded but response did not include a link (response: {snippet})") + + try: + pipe_obj = kwargs.get("pipe_obj") + if pipe_obj is not None: + from Store import Store + + Store(self.config, suppress_debug=True).try_add_url_for_pipe_object(pipe_obj, uploaded_url) + except Exception: + pass + + return uploaded_url + + except Exception as exc: + log(f"[file.io] Upload error: {exc}", file=sys.stderr) + raise diff --git a/Provider/matrix.py b/Provider/matrix.py index a95784a..8dc22b2 100644 --- a/Provider/matrix.py +++ b/Provider/matrix.py @@ -146,6 +146,7 @@ class Matrix(Provider): def upload_to_room(self, file_path: str, room_id: str, **kwargs: Any) -> str: """Upload a file and send it to a specific room.""" + from models import ProgressFileReader path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"File not found: {file_path}") @@ -167,7 +168,8 @@ class Matrix(Provider): # Upload media upload_url = f"{base}/_matrix/media/v3/upload" with open(path, "rb") as handle: - resp = requests.post(upload_url, headers=headers, data=handle, params={"filename": filename}) + wrapped = ProgressFileReader(handle, total_bytes=int(path.stat().st_size), label="upload") + resp = requests.post(upload_url, headers=headers, data=wrapped, params={"filename": filename}) if resp.status_code != 200: raise Exception(f"Matrix upload failed: {resp.text}") content_uri = (resp.json() or {}).get("content_uri") diff --git a/Provider/telegram.py b/Provider/telegram.py index 76ce6b3..ec0e094 100644 --- a/Provider/telegram.py +++ b/Provider/telegram.py @@ -2,6 +2,7 @@ from __future__ import annotations import asyncio import re +import shutil import sys import time from pathlib import Path @@ -79,6 +80,215 @@ class Telegram(Provider): telegram_conf = self.config.get("provider", {}).get("telegram", {}) if isinstance(self.config, dict) else {} self._app_id = telegram_conf.get("app_id") self._api_hash = telegram_conf.get("api_hash") + self._bot_token = telegram_conf.get("bot_token") + # Telethon downloads are chunked; larger parts mean fewer round-trips. + # Telethon typically expects 4..1024 KB and divisible by 4. + self._part_size_kb = telegram_conf.get("part_size_kb") + if self._part_size_kb is None: + self._part_size_kb = telegram_conf.get("chunk_kb") + if self._part_size_kb is None: + self._part_size_kb = telegram_conf.get("download_part_kb") + + # Avoid repeatedly prompting during startup where validate() may be called multiple times. + _startup_auth_attempted: bool = False + + def _legacy_session_base_path(self) -> Path: + # Older versions stored sessions under Log/medeia_macina. 
+ root = Path(__file__).resolve().parents[1] + return root / "Log" / "medeia_macina" / "telegram" + + def _migrate_legacy_session_if_needed(self) -> None: + """If a legacy Telethon session exists, copy it to the new root location.""" + try: + new_base = self._session_base_path() + new_session = Path(str(new_base) + ".session") + if new_session.is_file(): + return + + legacy_base = self._legacy_session_base_path() + legacy_session = Path(str(legacy_base) + ".session") + if not legacy_session.is_file(): + return + + for suffix in (".session", ".session-journal", ".session-wal", ".session-shm"): + src = Path(str(legacy_base) + suffix) + dst = Path(str(new_base) + suffix) + try: + if src.is_file() and not dst.exists(): + shutil.copy2(str(src), str(dst)) + except Exception: + continue + except Exception: + return + + def _session_file_path(self) -> Path: + self._migrate_legacy_session_if_needed() + base = self._session_base_path() + return Path(str(base) + ".session") + + def _has_session(self) -> bool: + self._migrate_legacy_session_if_needed() + try: + return self._session_file_path().is_file() + except Exception: + return False + + def _ensure_session_interactive(self) -> bool: + """Best-effort interactive auth to create a Telethon session file. + + Returns True if a session exists after the attempt. + """ + if self._has_session(): + return True + + # Never prompt in non-interactive contexts. + try: + if not bool(getattr(sys.stdin, "isatty", lambda: False)()): + return False + except Exception: + return False + + try: + from telethon.sync import TelegramClient + except Exception: + return False + + try: + app_id, api_hash = self._credentials() + except Exception: + return False + + self._ensure_event_loop() + loop = asyncio.get_event_loop() + if getattr(loop, "is_running", lambda: False)(): + # Can't safely prompt/auth while another loop is running. + return False + + def _resolve(value): + if asyncio.iscoroutine(value): + return loop.run_until_complete(value) + return value + + try: + sys.stderr.write("[telegram] No session found; login required.\n") + sys.stderr.write("[telegram] Choose login method: 1) phone 2) bot token\n") + sys.stderr.write("[telegram] Enter 1 or 2: ") + sys.stderr.flush() + choice = "" + try: + choice = str(input()).strip().lower() + except EOFError: + choice = "" + + use_bot = choice in {"2", "b", "bot", "token"} + bot_token = "" + if use_bot: + sys.stderr.write("[telegram] Bot token: ") + sys.stderr.flush() + try: + bot_token = str(input()).strip() + except EOFError: + bot_token = "" + if not bot_token: + return False + self._bot_token = bot_token + else: + sys.stderr.write("[telegram] Phone login selected (Telethon will prompt for phone + code).\n") + sys.stderr.flush() + + session_base = self._session_base_path() + client = TelegramClient(str(session_base), app_id, api_hash) + try: + if use_bot: + _resolve(client.start(bot_token=bot_token)) + else: + _resolve(client.start()) + finally: + try: + _resolve(client.disconnect()) + except Exception: + pass + finally: + try: + sys.stderr.write("\n") + sys.stderr.flush() + except Exception: + pass + + return self._has_session() + + def _ensure_session_with_bot_token(self, bot_token: str) -> bool: + """Create a Telethon session using a bot token without prompting. + + Returns True if a session exists after the attempt. 
+ """ + if self._has_session(): + return True + bot_token = str(bot_token or "").strip() + if not bot_token: + return False + try: + from telethon.sync import TelegramClient + except Exception: + return False + try: + app_id, api_hash = self._credentials() + except Exception: + return False + + self._ensure_event_loop() + loop = asyncio.get_event_loop() + if getattr(loop, "is_running", lambda: False)(): + return False + + def _resolve(value): + if asyncio.iscoroutine(value): + return loop.run_until_complete(value) + return value + + session_base = self._session_base_path() + client = TelegramClient(str(session_base), app_id, api_hash) + try: + _resolve(client.start(bot_token=bot_token)) + finally: + try: + _resolve(client.disconnect()) + except Exception: + pass + + return self._has_session() + + def _resolve_part_size_kb(self, file_size: Optional[int]) -> int: + # Default: bias to max throughput. + val = self._part_size_kb + try: + if val not in (None, ""): + ps = int(str(val).strip()) + else: + ps = 1024 + except Exception: + ps = 1024 + + # Clamp to Telethon-safe range. + if ps < 4: + ps = 4 + if ps > 1024: + ps = 1024 + # Must be divisible by 4. + ps = int(ps / 4) * 4 + if ps <= 0: + ps = 64 + + # For very small files, reduce overhead a bit (still divisible by 4). + try: + if file_size is not None and int(file_size) > 0: + if int(file_size) < 2 * 1024 * 1024: + ps = min(ps, 256) + elif int(file_size) < 10 * 1024 * 1024: + ps = min(ps, 512) + except Exception: + pass + return ps def validate(self) -> bool: try: @@ -91,16 +301,29 @@ class Telegram(Provider): except Exception: app_id = None api_hash = str(self._api_hash).strip() if self._api_hash not in (None, "") else "" - return bool(app_id and api_hash) + if not bool(app_id and api_hash): + return False + + # Consider Telegram "configured" only if a persisted session exists. + if self._has_session(): + return True + + # If a bot token is already configured, attempt a non-interactive login. + bot_token = str(self._bot_token or "").strip() + if bot_token: + return bool(self._ensure_session_with_bot_token(bot_token)) + + # Interactive startup prompt (only once per process). + if Telegram._startup_auth_attempted: + return False + Telegram._startup_auth_attempted = True + return bool(self._ensure_session_interactive()) def _session_base_path(self) -> Path: + # Store session alongside cookies.txt at repo root. + # Telethon uses this as base name and writes ".session". root = Path(__file__).resolve().parents[1] - session_dir = root / "Log" / "medeia_macina" - try: - session_dir.mkdir(parents=True, exist_ok=True) - except Exception: - pass - return session_dir / "telegram" + return root / "telegram" def _credentials(self) -> Tuple[int, str]: raw_app_id = self._app_id @@ -266,7 +489,19 @@ class Telegram(Provider): except Exception: return - downloaded = _resolve(client.download_media(message, file=str(output_dir), progress_callback=_progress)) + part_kb = self._resolve_part_size_kb(file_size) + try: + downloaded = _resolve( + client.download_media( + message, + file=str(output_dir), + progress_callback=_progress, + part_size_kb=part_kb, + ) + ) + except TypeError: + # Older/newer Telethon versions may not accept part_size_kb on download_media. 
+ downloaded = _resolve(client.download_media(message, file=str(output_dir), progress_callback=_progress)) try: sys.stderr.write("\n") sys.stderr.flush() diff --git a/Provider/zeroxzero.py b/Provider/zeroxzero.py index dc1251c..b1815d2 100644 --- a/Provider/zeroxzero.py +++ b/Provider/zeroxzero.py @@ -13,6 +13,7 @@ class ZeroXZero(Provider): def upload(self, file_path: str, **kwargs: Any) -> str: from API.HTTP import HTTPClient + from models import ProgressFileReader if not os.path.exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") @@ -21,7 +22,12 @@ class ZeroXZero(Provider): headers = {"User-Agent": "Medeia-Macina/1.0"} with HTTPClient(headers=headers) as client: with open(file_path, "rb") as handle: - response = client.post("https://0x0.st", files={"file": handle}) + try: + total = os.path.getsize(file_path) + except Exception: + total = None + wrapped = ProgressFileReader(handle, total_bytes=total, label="upload") + response = client.post("https://0x0.st", files={"file": wrapped}) if response.status_code == 200: uploaded_url = response.text.strip() diff --git a/ProviderCore/registry.py b/ProviderCore/registry.py index e70ae47..e324d59 100644 --- a/ProviderCore/registry.py +++ b/ProviderCore/registry.py @@ -20,6 +20,7 @@ from Provider.openlibrary import OpenLibrary from Provider.soulseek import Soulseek, download_soulseek_file from Provider.telegram import Telegram from Provider.youtube import YouTube +from Provider.fileio import FileIO from Provider.zeroxzero import ZeroXZero @@ -34,6 +35,7 @@ _PROVIDERS: Dict[str, Type[Provider]] = { "telegram": Telegram, # Upload-capable providers "0x0": ZeroXZero, + "file.io": FileIO, "matrix": Matrix, } diff --git a/Store/Folder.py b/Store/Folder.py index e55b9d8..f41d9f2 100644 --- a/Store/Folder.py +++ b/Store/Folder.py @@ -354,12 +354,49 @@ class Folder(Store): self.add_url(file_hash, url) return file_hash - # Move or copy file + # Move or copy file (with progress bar on actual byte transfer). + # Note: a same-volume move may be a fast rename and won't show progress. + def _copy_with_progress(src: Path, dst: Path, *, label: str) -> None: + from models import ProgressFileReader + + total_bytes = None + try: + total_bytes = int(src.stat().st_size) + except Exception: + total_bytes = None + + with src.open("rb") as r, dst.open("wb") as w: + reader = ProgressFileReader(r, total_bytes=total_bytes, label=label) + while True: + chunk = reader.read(1024 * 1024) + if not chunk: + break + w.write(chunk) + + # Preserve file metadata similar to shutil.copy2 + try: + shutil.copystat(str(src), str(dst)) + except Exception: + pass + if move_file: - shutil.move(str(file_path), str(save_file)) - debug(f"Local move: {save_file}", file=sys.stderr) + # Prefer native move; fall back to copy+delete with progress on failure. 
+ try: + shutil.move(str(file_path), str(save_file)) + debug(f"Local move: {save_file}", file=sys.stderr) + except Exception: + _copy_with_progress(file_path, save_file, label=f"folder:{self._name} move") + try: + file_path.unlink(missing_ok=True) # type: ignore[arg-type] + except Exception: + try: + if file_path.exists(): + file_path.unlink() + except Exception: + pass + debug(f"Local move (copy+delete): {save_file}", file=sys.stderr) else: - shutil.copy2(str(file_path), str(save_file)) + _copy_with_progress(file_path, save_file, label=f"folder:{self._name} copy") debug(f"Local copy: {save_file}", file=sys.stderr) # Best-effort: capture duration for media diff --git a/cmdlet/download_file.py b/cmdlet/download_file.py index edff42d..4881da2 100644 --- a/cmdlet/download_file.py +++ b/cmdlet/download_file.py @@ -68,6 +68,23 @@ class Download_File(Cmdlet): if isinstance(raw_url, str): raw_url = [raw_url] + # Allow comma-separated URLs in a single argument. + # Example: download-file "https://a.pdf,https://b.pdf" + expanded_urls: List[str] = [] + for u in (raw_url or []): + if u is None: + continue + s = str(u).strip() + if not s: + continue + if "," in s: + parts = [p.strip() for p in s.split(",")] + expanded_urls.extend([p for p in parts if p]) + else: + expanded_urls.append(s) + if expanded_urls: + raw_url = expanded_urls + # If no URL args were provided, fall back to piped results (provider items) piped_items: List[Any] = [] if not raw_url: diff --git a/cmdlet/download_media.py b/cmdlet/download_media.py index 2e01f34..b2fc013 100644 --- a/cmdlet/download_media.py +++ b/cmdlet/download_media.py @@ -450,9 +450,8 @@ def _build_ytdlp_options(opts: DownloadOptions) -> Dict[str, Any]: except Exception: pass - # Avoid writing progress bars when running in quiet/background mode (e.g. mpv detached pipelines). - if not getattr(opts, "quiet", False): - base_options["progress_hooks"] = [_progress_callback] + # Always show a progress bar. The hook prints to stderr so piped stdout stays clean. + base_options["progress_hooks"] = [_progress_callback] if opts.cookies_path and opts.cookies_path.is_file(): base_options["cookiefile"] = str(opts.cookies_path) @@ -1432,6 +1431,23 @@ class Download_Media(Cmdlet): raw_url = parsed.get("url", []) if isinstance(raw_url, str): raw_url = [raw_url] + + # Allow a single quoted argument containing multiple URLs separated by commas. 
+ # Example: download-media "https://a,https://b" -audio + expanded_urls: List[str] = [] + for u in (raw_url or []): + if u is None: + continue + s = str(u).strip() + if not s: + continue + if "," in s: + parts = [p.strip() for p in s.split(",")] + expanded_urls.extend([p for p in parts if p]) + else: + expanded_urls.append(s) + if expanded_urls: + raw_url = expanded_urls # If no url provided via args, try to extract from piped result if not raw_url and result: diff --git a/cmdlet/merge_file.py b/cmdlet/merge_file.py index b3894e6..9ec97ea 100644 --- a/cmdlet/merge_file.py +++ b/cmdlet/merge_file.py @@ -38,6 +38,7 @@ try: from metadata import ( read_tags_from_file, dedup_tags_by_namespace, + merge_multiple_tag_lists, ) HAS_METADATA_API = True except ImportError: @@ -58,6 +59,17 @@ except ImportError: def dedup_tags_by_namespace(tags: List[str]) -> List[str]: return tags + def merge_multiple_tag_lists(sources: List[List[str]], strategy: str = 'first') -> List[str]: + out: List[str] = [] + seen: set[str] = set() + for src in sources: + for t in (src or []): + s = str(t) + if s and s not in seen: + out.append(s) + seen.add(s) + return out + def write_metadata(*_args: Any, **_kwargs: Any) -> None: return None @@ -105,7 +117,8 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: source_files: List[Path] = [] source_hashes: List[str] = [] source_url: List[str] = [] - source_tags: List[str] = [] # NEW: collect tags from source files + source_tags: List[str] = [] # tags read from .tag sidecars + source_item_tag_lists: List[List[str]] = [] # tags carried in-memory on piped items for item in files_to_merge: raw_path = get_pipe_object_path(item) target_path = None @@ -119,6 +132,16 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: if target_path and target_path.exists(): source_files.append(target_path) + # Track tags carried in the piped items (e.g. add-tag stage) so they survive merge. + try: + raw_tags = get_field(item, 'tag', []) + if isinstance(raw_tags, str) and raw_tags.strip(): + source_item_tag_lists.append([raw_tags.strip()]) + elif isinstance(raw_tags, list): + source_item_tag_lists.append([str(t) for t in raw_tags if t is not None and str(t).strip()]) + except Exception: + pass + # Track tags from the .tag sidecar for this source (if present) tags_file = target_path.with_suffix(target_path.suffix + '.tag') if tags_file.exists() and HAS_METADATA_API: @@ -217,17 +240,46 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int: return 1 log(f"Merged {len(source_files)} files into: {output_path}", file=sys.stderr) - - merged_tags: List[str] = [f"title:{output_path.stem}"] - # Merge tags from source files into the emitted PipeObject only. - # Sidecar files (.tag/.metadata) are written only during explicit filesystem export (add-file to a path). 
-    if source_tags and HAS_METADATA_API:
-        merged_source_tags = dedup_tags_by_namespace(source_tags)
-        merged_tags.extend(merged_source_tags)
-        log(f"Merged {len(merged_source_tags)} unique tags from source files", file=sys.stderr)
-    elif source_tags:
-        merged_tags.extend(list(dict.fromkeys(source_tags)))  # Preserve order, remove duplicates
+    def _title_value_from_tags(tags: List[str]) -> Optional[str]:
+        for t in tags:
+            try:
+                s = str(t)
+            except Exception:
+                continue
+            if s.lower().startswith('title:'):
+                val = s.split(':', 1)[1].strip()
+                return val or None
+        return None
+
+    # Determine best title:
+    # - prefer a title tag shared across all inputs (typical when user did add-tag title:...)
+    # - otherwise fall back to first title tag encountered
+    shared_title: Optional[str] = None
+    try:
+        if source_item_tag_lists:
+            per_item_titles: List[Optional[str]] = [_title_value_from_tags(tl) for tl in source_item_tag_lists]
+            non_empty = [t for t in per_item_titles if t]
+            if non_empty:
+                # Either all inputs agree on this title, or we fall back to the
+                # first one seen; both cases reduce to the first non-empty value.
+                shared_title = non_empty[0]
+    except Exception:
+        shared_title = None
+
+    merged_title = shared_title or output_path.stem
+
+    # Merge tags from:
+    # - in-memory PipeObject tags (from add-tag etc)
+    # - .tag sidecars (if present)
+    # Keep all unique plain tags, and keep the first value for namespaced tags.
+    merged_tags = merge_multiple_tag_lists(source_item_tag_lists + ([source_tags] if source_tags else []), strategy='combine')
+
+    # Ensure we always have a title tag (and make sure it's the chosen title)
+    merged_tags = [t for t in merged_tags if not str(t).lower().startswith('title:')]
+    merged_tags.insert(0, f"title:{merged_title}")
 
     # Emit a PipeObject-compatible dict so the merged file can be piped to next command
     try:
@@ -238,7 +290,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
             identifier=output_path.name,
             file_path=str(output_path),
             cmdlet_name="merge-file",
-            title=output_path.stem,
+            title=merged_title,
             hash_value=merged_hash,
             tag=merged_tags,
             url=source_url,
diff --git a/models.py b/models.py
index 6c4869c..50dd484 100644
--- a/models.py
+++ b/models.py
@@ -7,6 +7,7 @@ import math
 import os
 import shutil
 import sys
+import time
 from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Any, Callable, Dict, List, Optional, Protocol, TextIO, Tuple
@@ -660,6 +661,102 @@ class ProgressBar:
         return " ".join(parts)
 
 
+class ProgressFileReader:
+    """File-like wrapper that prints a ProgressBar as bytes are read.
+
+    Intended for uploads: pass this wrapper as the file object to httpx/requests.
+    Progress is written to stderr (so pipelines remain clean).
+ """ + + def __init__(self, fileobj: Any, *, total_bytes: Optional[int], label: str = "upload", min_interval_s: float = 0.25): + self._f = fileobj + self._total = int(total_bytes) if total_bytes not in (None, 0, "") else 0 + self._label = str(label or "upload") + self._min_interval_s = max(0.05, float(min_interval_s)) + self._bar = ProgressBar() + self._start = time.time() + self._last = self._start + self._read = 0 + self._done = False + + def _render(self) -> None: + if self._done: + return + if self._total <= 0: + return + now = time.time() + if now - self._last < self._min_interval_s: + return + elapsed = max(0.001, now - self._start) + speed = float(self._read) / elapsed + eta_s = (float(self._total) - float(self._read)) / speed if speed > 0 else 0.0 + + minutes, seconds = divmod(int(max(0.0, eta_s)), 60) + hours, minutes = divmod(minutes, 60) + eta_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}" + speed_str = self._bar.format_bytes(speed) + "/s" + percent = (float(self._read) / float(self._total)) * 100.0 if self._total > 0 else 0.0 + + line = self._bar.format_progress( + percent_str=f"{percent:.1f}%", + downloaded=int(self._read), + total=int(self._total), + speed_str=speed_str, + eta_str=eta_str, + ) + sys.stderr.write("\r" + f"[{self._label}] " + line + " ") + sys.stderr.flush() + self._last = now + + def _finish(self) -> None: + if self._done: + return + self._done = True + sys.stderr.write("\r" + (" " * 180) + "\r") + sys.stderr.write("\n") + sys.stderr.flush() + + def read(self, size: int = -1) -> Any: + chunk = self._f.read(size) + try: + if chunk: + self._read += len(chunk) + self._render() + else: + # EOF + self._finish() + except Exception: + pass + return chunk + + def seek(self, offset: int, whence: int = 0) -> Any: + out = self._f.seek(offset, whence) + try: + pos = int(self._f.tell()) + if pos <= 0: + self._read = 0 + self._start = time.time() + self._last = self._start + else: + self._read = pos + except Exception: + pass + return out + + def tell(self) -> Any: + return self._f.tell() + + def close(self) -> None: + try: + self._finish() + except Exception: + pass + return self._f.close() + + def __getattr__(self, name: str) -> Any: + return getattr(self._f, name) + + # ============================================================================ # PIPELINE EXECUTION CONTEXT # Consolidated from pipeline_context.py