commit 900a37e210
parent d3edd6420c
Author: nose
Date: 2025-12-19 15:20:08 -08:00
13 changed files with 729 additions and 32 deletions

.gitignore (vendored): 1 change

@@ -224,3 +224,4 @@ MPV/ffmpeg/*
MPV/portable_config/*
Log/
Log/medeia_macina/telegram.session
*.session


@@ -151,10 +151,75 @@ class HydrusNetwork:
logger.debug(f"{self._log_prefix()} Uploading file {file_path.name} ({file_size} bytes)")
# Stream upload body with a stderr progress bar (pipeline-safe).
try:
from models import ProgressBar
except Exception:
ProgressBar = None # type: ignore[assignment]
bar = ProgressBar() if ProgressBar is not None else None
label = f"{self._log_prefix().strip('[]')} upload"
start_t = time.time()
last_render_t = [start_t]
last_log_t = [start_t]
sent = [0]
tty = bool(getattr(sys.stderr, "isatty", lambda: False)())
def _render_progress(final: bool = False) -> None:
if bar is None:
return
if file_size <= 0:
return
now = time.time()
if not final and (now - float(last_render_t[0])) < 0.25:
return
last_render_t[0] = now
elapsed = max(0.001, now - start_t)
speed = float(sent[0]) / elapsed
eta_s = (float(file_size) - float(sent[0])) / speed if speed > 0 else 0.0
minutes, seconds = divmod(int(max(0.0, eta_s)), 60)
hours, minutes = divmod(minutes, 60)
eta_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
speed_str = bar.format_bytes(speed) + "/s"
line = bar.format_progress(
percent_str=None,
downloaded=int(sent[0]),
total=int(file_size),
speed_str=speed_str,
eta_str=eta_str,
)
try:
if tty:
sys.stderr.write("\r" + f"[{label}] " + line + " ")
sys.stderr.flush()
else:
# Non-interactive: keep it quiet-ish.
if final or (now - float(last_log_t[0])) >= 2.0:
log(f"[{label}] {line}", file=sys.stderr)
last_log_t[0] = now
except Exception:
pass
def file_gen():
- with file_path.open("rb") as handle:
- while chunk := handle.read(65536):
- yield chunk
+ try:
+ with file_path.open("rb") as handle:
+ while True:
+ chunk = handle.read(256 * 1024)
+ if not chunk:
+ break
+ sent[0] += len(chunk)
+ _render_progress(final=False)
+ yield chunk
+ finally:
+ _render_progress(final=True)
+ if tty:
+ try:
+ sys.stderr.write("\n")
+ sys.stderr.flush()
+ except Exception:
+ pass
response = client.request(
spec.method,
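The generator above interleaves chunked reads with throttled progress rendering: redraws are rate-limited to once per 0.25 s on a TTY and downgraded to occasional log lines otherwise, so piped stdout stays clean. A minimal self-contained sketch of the same pattern (names here are illustrative, not the repo's API):

import sys
import time

def stream_with_progress(path: str, total: int, chunk_size: int = 256 * 1024):
    """Yield file chunks while drawing a throttled progress line on stderr."""
    sent = 0
    last = time.time()
    tty = sys.stderr.isatty()
    with open(path, "rb") as handle:
        while chunk := handle.read(chunk_size):
            sent += len(chunk)
            now = time.time()
            if tty and (now - last) >= 0.25:  # throttle redraws
                pct = 100.0 * sent / total if total else 0.0
                sys.stderr.write(f"\r[upload] {pct:5.1f}% ({sent}/{total} bytes)")
                sys.stderr.flush()
                last = now
            yield chunk
    if tty:
        sys.stderr.write("\n")  # leave the final bar on its own line

Passing such a generator as the request body lets httpx stream the upload without buffering the whole file in memory.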

Binary file not shown.

Provider/fileio.py (new file): 167 additions

@@ -0,0 +1,167 @@
from __future__ import annotations
import os
import sys
from typing import Any, Dict, Optional
from ProviderCore.base import Provider
from SYS.logger import log
def _pick_provider_config(config: Any) -> Dict[str, Any]:
if not isinstance(config, dict):
return {}
provider = config.get("provider")
if not isinstance(provider, dict):
return {}
entry = provider.get("file.io")
if isinstance(entry, dict):
return entry
return {}
def _extract_link(payload: Any) -> Optional[str]:
if isinstance(payload, dict):
for key in ("link", "url", "downloadLink", "download_url"):
val = payload.get(key)
if isinstance(val, str) and val.strip().startswith(("http://", "https://")):
return val.strip()
for nested_key in ("data", "file", "result"):
nested = payload.get(nested_key)
found = _extract_link(nested)
if found:
return found
return None
def _extract_key(payload: Any) -> Optional[str]:
if isinstance(payload, dict):
for key in ("key", "id", "uuid"):
val = payload.get(key)
if isinstance(val, str) and val.strip():
return val.strip()
for nested_key in ("data", "file", "result"):
nested = payload.get(nested_key)
found = _extract_key(nested)
if found:
return found
return None
class FileIO(Provider):
"""File provider for file.io."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
super().__init__(config)
conf = _pick_provider_config(self.config)
self._base_url = str(conf.get("base_url") or "https://file.io").strip().rstrip("/")
self._api_key = conf.get("api_key")
self._default_expires = conf.get("expires")
self._default_max_downloads = conf.get("maxDownloads")
if self._default_max_downloads is None:
self._default_max_downloads = conf.get("max_downloads")
self._default_auto_delete = conf.get("autoDelete")
if self._default_auto_delete is None:
self._default_auto_delete = conf.get("auto_delete")
def validate(self) -> bool:
return True
def upload(self, file_path: str, **kwargs: Any) -> str:
from API.HTTP import HTTPClient
from models import ProgressFileReader
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
data: Dict[str, Any] = {}
expires = kwargs.get("expires", self._default_expires)
max_downloads = kwargs.get("maxDownloads", kwargs.get("max_downloads", self._default_max_downloads))
auto_delete = kwargs.get("autoDelete", kwargs.get("auto_delete", self._default_auto_delete))
if expires not in (None, ""):
data["expires"] = expires
if max_downloads not in (None, ""):
data["maxDownloads"] = max_downloads
if auto_delete not in (None, ""):
data["autoDelete"] = auto_delete
headers: Dict[str, str] = {"User-Agent": "Medeia-Macina/1.0", "Accept": "application/json"}
if isinstance(self._api_key, str) and self._api_key.strip():
# Some file.io plans use bearer tokens; keep optional.
headers["Authorization"] = f"Bearer {self._api_key.strip()}"
try:
with HTTPClient(headers=headers) as client:
with open(file_path, "rb") as handle:
filename = os.path.basename(file_path)
try:
total = os.path.getsize(file_path)
except Exception:
total = None
wrapped = ProgressFileReader(handle, total_bytes=total, label="upload")
response = client.request(
"POST",
f"{self._base_url}/upload",
data=data or None,
files={"file": (filename, wrapped)},
follow_redirects=True,
raise_for_status=False,
)
if response.status_code >= 400:
location = response.headers.get("location") or response.headers.get("Location")
ct = response.headers.get("content-type") or response.headers.get("Content-Type")
raise Exception(f"Upload failed: {response.status_code} (content-type={ct}, location={location}) - {response.text}")
payload: Any
try:
payload = response.json()
except Exception:
payload = None
# If the server ignored our Accept header and returned HTML, this is almost
# certainly the wrong endpoint or an upstream block.
ct = (response.headers.get("content-type") or response.headers.get("Content-Type") or "").lower()
if (payload is None) and ("text/html" in ct):
raise Exception("file.io returned HTML instead of JSON; expected API response from /upload")
if isinstance(payload, dict) and payload.get("success") is False:
reason = payload.get("message") or payload.get("error") or payload.get("status")
raise Exception(str(reason or "Upload failed"))
uploaded_url = _extract_link(payload)
if not uploaded_url:
# Some APIs may return the link as plain text.
text = str(response.text or "").strip()
if text.startswith(("http://", "https://")):
uploaded_url = text
if not uploaded_url:
key = _extract_key(payload)
if key:
uploaded_url = f"{self._base_url}/{key.lstrip('/')}"
if not uploaded_url:
try:
snippet = (response.text or "").strip()
if len(snippet) > 300:
snippet = snippet[:300] + "..."
except Exception:
snippet = "<unreadable response>"
raise Exception(f"Upload succeeded but response did not include a link (response: {snippet})")
try:
pipe_obj = kwargs.get("pipe_obj")
if pipe_obj is not None:
from Store import Store
Store(self.config, suppress_debug=True).try_add_url_for_pipe_object(pipe_obj, uploaded_url)
except Exception:
pass
return uploaded_url
except Exception as exc:
log(f"[file.io] Upload error: {exc}", file=sys.stderr)
raise
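For reference, a hedged sketch of driving the new provider directly; the config keys mirror what _pick_provider_config() reads, but the concrete values are assumptions:

from Provider.fileio import FileIO

config = {
    "provider": {
        "file.io": {
            "expires": "14d",      # assumed: upload lifetime accepted by file.io
            "maxDownloads": 1,     # assumed: optional download cap
            "autoDelete": True,    # assumed: delete after last download
        }
    }
}

link = FileIO(config).upload("report.pdf")
print(link)  # e.g. https://file.io/<key>

Per-call kwargs (expires, maxDownloads/max_downloads, autoDelete/auto_delete) override these config defaults, as the upload() body shows.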


@@ -146,6 +146,7 @@ class Matrix(Provider):
def upload_to_room(self, file_path: str, room_id: str, **kwargs: Any) -> str:
"""Upload a file and send it to a specific room."""
from models import ProgressFileReader
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
@@ -167,7 +168,8 @@ class Matrix(Provider):
# Upload media
upload_url = f"{base}/_matrix/media/v3/upload"
with open(path, "rb") as handle:
- resp = requests.post(upload_url, headers=headers, data=handle, params={"filename": filename})
+ wrapped = ProgressFileReader(handle, total_bytes=int(path.stat().st_size), label="upload")
+ resp = requests.post(upload_url, headers=headers, data=wrapped, params={"filename": filename})
if resp.status_code != 200:
raise Exception(f"Matrix upload failed: {resp.text}")
content_uri = (resp.json() or {}).get("content_uri")
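Because requests streams any file-like object passed as data, swapping the raw handle for the wrapper is the whole change; nothing else about the request differs. A sketch with hypothetical homeserver and token values:

import os
import requests
from models import ProgressFileReader

path = "video.mp4"
upload_url = "https://matrix.example.org/_matrix/media/v3/upload"  # hypothetical homeserver
headers = {"Authorization": "Bearer <access_token>"}               # hypothetical token

with open(path, "rb") as handle:
    wrapped = ProgressFileReader(handle, total_bytes=os.path.getsize(path), label="upload")
    # requests reads the wrapper chunk by chunk while streaming the body,
    # so the stderr bar advances as the upload proceeds.
    resp = requests.post(upload_url, headers=headers, data=wrapped,
                         params={"filename": os.path.basename(path)})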


@@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
import re
import shutil
import sys
import time
from pathlib import Path
@@ -79,6 +80,215 @@ class Telegram(Provider):
telegram_conf = self.config.get("provider", {}).get("telegram", {}) if isinstance(self.config, dict) else {}
self._app_id = telegram_conf.get("app_id")
self._api_hash = telegram_conf.get("api_hash")
self._bot_token = telegram_conf.get("bot_token")
# Telethon downloads are chunked; larger parts mean fewer round-trips.
# Telethon typically expects 4..1024 KB and divisible by 4.
self._part_size_kb = telegram_conf.get("part_size_kb")
if self._part_size_kb is None:
self._part_size_kb = telegram_conf.get("chunk_kb")
if self._part_size_kb is None:
self._part_size_kb = telegram_conf.get("download_part_kb")
# Avoid repeatedly prompting during startup where validate() may be called multiple times.
_startup_auth_attempted: bool = False
def _legacy_session_base_path(self) -> Path:
# Older versions stored sessions under Log/medeia_macina.
root = Path(__file__).resolve().parents[1]
return root / "Log" / "medeia_macina" / "telegram"
def _migrate_legacy_session_if_needed(self) -> None:
"""If a legacy Telethon session exists, copy it to the new root location."""
try:
new_base = self._session_base_path()
new_session = Path(str(new_base) + ".session")
if new_session.is_file():
return
legacy_base = self._legacy_session_base_path()
legacy_session = Path(str(legacy_base) + ".session")
if not legacy_session.is_file():
return
for suffix in (".session", ".session-journal", ".session-wal", ".session-shm"):
src = Path(str(legacy_base) + suffix)
dst = Path(str(new_base) + suffix)
try:
if src.is_file() and not dst.exists():
shutil.copy2(str(src), str(dst))
except Exception:
continue
except Exception:
return
def _session_file_path(self) -> Path:
self._migrate_legacy_session_if_needed()
base = self._session_base_path()
return Path(str(base) + ".session")
def _has_session(self) -> bool:
self._migrate_legacy_session_if_needed()
try:
return self._session_file_path().is_file()
except Exception:
return False
def _ensure_session_interactive(self) -> bool:
"""Best-effort interactive auth to create a Telethon session file.
Returns True if a session exists after the attempt.
"""
if self._has_session():
return True
# Never prompt in non-interactive contexts.
try:
if not bool(getattr(sys.stdin, "isatty", lambda: False)()):
return False
except Exception:
return False
try:
from telethon.sync import TelegramClient
except Exception:
return False
try:
app_id, api_hash = self._credentials()
except Exception:
return False
self._ensure_event_loop()
loop = asyncio.get_event_loop()
if getattr(loop, "is_running", lambda: False)():
# Can't safely prompt/auth while another loop is running.
return False
def _resolve(value):
if asyncio.iscoroutine(value):
return loop.run_until_complete(value)
return value
try:
sys.stderr.write("[telegram] No session found; login required.\n")
sys.stderr.write("[telegram] Choose login method: 1) phone 2) bot token\n")
sys.stderr.write("[telegram] Enter 1 or 2: ")
sys.stderr.flush()
choice = ""
try:
choice = str(input()).strip().lower()
except EOFError:
choice = ""
use_bot = choice in {"2", "b", "bot", "token"}
bot_token = ""
if use_bot:
sys.stderr.write("[telegram] Bot token: ")
sys.stderr.flush()
try:
bot_token = str(input()).strip()
except EOFError:
bot_token = ""
if not bot_token:
return False
self._bot_token = bot_token
else:
sys.stderr.write("[telegram] Phone login selected (Telethon will prompt for phone + code).\n")
sys.stderr.flush()
session_base = self._session_base_path()
client = TelegramClient(str(session_base), app_id, api_hash)
try:
if use_bot:
_resolve(client.start(bot_token=bot_token))
else:
_resolve(client.start())
finally:
try:
_resolve(client.disconnect())
except Exception:
pass
finally:
try:
sys.stderr.write("\n")
sys.stderr.flush()
except Exception:
pass
return self._has_session()
def _ensure_session_with_bot_token(self, bot_token: str) -> bool:
"""Create a Telethon session using a bot token without prompting.
Returns True if a session exists after the attempt.
"""
if self._has_session():
return True
bot_token = str(bot_token or "").strip()
if not bot_token:
return False
try:
from telethon.sync import TelegramClient
except Exception:
return False
try:
app_id, api_hash = self._credentials()
except Exception:
return False
self._ensure_event_loop()
loop = asyncio.get_event_loop()
if getattr(loop, "is_running", lambda: False)():
return False
def _resolve(value):
if asyncio.iscoroutine(value):
return loop.run_until_complete(value)
return value
session_base = self._session_base_path()
client = TelegramClient(str(session_base), app_id, api_hash)
try:
_resolve(client.start(bot_token=bot_token))
finally:
try:
_resolve(client.disconnect())
except Exception:
pass
return self._has_session()
def _resolve_part_size_kb(self, file_size: Optional[int]) -> int:
# Default: bias to max throughput.
val = self._part_size_kb
try:
if val not in (None, ""):
ps = int(str(val).strip())
else:
ps = 1024
except Exception:
ps = 1024
# Clamp to Telethon-safe range.
if ps < 4:
ps = 4
if ps > 1024:
ps = 1024
# Must be divisible by 4.
ps = int(ps / 4) * 4
if ps <= 0:
ps = 64
# For very small files, reduce overhead a bit (still divisible by 4).
try:
if file_size is not None and int(file_size) > 0:
if int(file_size) < 2 * 1024 * 1024:
ps = min(ps, 256)
elif int(file_size) < 10 * 1024 * 1024:
ps = min(ps, 512)
except Exception:
pass
return ps
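The clamping rules are mechanical (default 1024 KB, clamp to 4..1024, round down to a multiple of 4, shrink for small files) and can be checked in isolation; a condensed equivalent, not copied verbatim from the method:

def clamp_part_size_kb(requested, file_size=None):
    """Clamp a Telethon part size to [4, 1024] KB, divisible by 4."""
    try:
        ps = int(str(requested).strip()) if requested not in (None, "") else 1024
    except (TypeError, ValueError):
        ps = 1024
    ps = max(4, min(1024, ps))
    ps = (ps // 4) * 4            # keep divisible by 4 (already >= 4 here)
    if file_size and file_size < 2 * 1024 * 1024:
        ps = min(ps, 256)         # small files: fewer bytes per round-trip
    elif file_size and file_size < 10 * 1024 * 1024:
        ps = min(ps, 512)
    return ps

assert clamp_part_size_kb(None) == 1024
assert clamp_part_size_kb("90") == 88              # rounded down to a multiple of 4
assert clamp_part_size_kb(2048, 1_000_000) == 256  # oversized request, small file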
def validate(self) -> bool:
try:
@@ -91,16 +301,29 @@ class Telegram(Provider):
except Exception:
app_id = None
api_hash = str(self._api_hash).strip() if self._api_hash not in (None, "") else ""
- return bool(app_id and api_hash)
+ if not bool(app_id and api_hash):
return False
# Consider Telegram "configured" only if a persisted session exists.
if self._has_session():
return True
# If a bot token is already configured, attempt a non-interactive login.
bot_token = str(self._bot_token or "").strip()
if bot_token:
return bool(self._ensure_session_with_bot_token(bot_token))
# Interactive startup prompt (only once per process).
if Telegram._startup_auth_attempted:
return False
Telegram._startup_auth_attempted = True
return bool(self._ensure_session_interactive())
def _session_base_path(self) -> Path:
+ # Store session alongside cookies.txt at repo root.
+ # Telethon uses this as base name and writes "<base>.session".
root = Path(__file__).resolve().parents[1]
- session_dir = root / "Log" / "medeia_macina"
- try:
- session_dir.mkdir(parents=True, exist_ok=True)
- except Exception:
- pass
- return session_dir / "telegram"
+ return root / "telegram"
def _credentials(self) -> Tuple[int, str]:
raw_app_id = self._app_id
@@ -266,7 +489,19 @@ class Telegram(Provider):
except Exception:
return
- downloaded = _resolve(client.download_media(message, file=str(output_dir), progress_callback=_progress))
+ part_kb = self._resolve_part_size_kb(file_size)
try:
downloaded = _resolve(
client.download_media(
message,
file=str(output_dir),
progress_callback=_progress,
part_size_kb=part_kb,
)
)
except TypeError:
# Older/newer Telethon versions may not accept part_size_kb on download_media.
downloaded = _resolve(client.download_media(message, file=str(output_dir), progress_callback=_progress))
try:
sys.stderr.write("\n")
sys.stderr.flush()
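The try/except TypeError around download_media is a keyword-capability probe: attempt the call with the newer part_size_kb argument and retry without it if the installed Telethon rejects the keyword. The same pattern in isolation (with the caveat that a TypeError raised inside the callee is indistinguishable from a signature mismatch):

def call_with_optional_kwarg(func, *args, opt_name=None, opt_value=None, **kwargs):
    """Call func with one extra keyword, retrying without it on TypeError."""
    if opt_name is not None:
        try:
            return func(*args, **kwargs, **{opt_name: opt_value})
        except TypeError:
            pass  # assume the installed version doesn't accept this keyword
    return func(*args, **kwargs)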


@@ -13,6 +13,7 @@ class ZeroXZero(Provider):
def upload(self, file_path: str, **kwargs: Any) -> str:
from API.HTTP import HTTPClient
from models import ProgressFileReader
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
@@ -21,7 +22,12 @@ class ZeroXZero(Provider):
headers = {"User-Agent": "Medeia-Macina/1.0"}
with HTTPClient(headers=headers) as client:
with open(file_path, "rb") as handle:
- response = client.post("https://0x0.st", files={"file": handle})
+ try:
total = os.path.getsize(file_path)
except Exception:
total = None
wrapped = ProgressFileReader(handle, total_bytes=total, label="upload")
response = client.post("https://0x0.st", files={"file": wrapped})
if response.status_code == 200:
uploaded_url = response.text.strip()


@@ -20,6 +20,7 @@ from Provider.openlibrary import OpenLibrary
from Provider.soulseek import Soulseek, download_soulseek_file
from Provider.telegram import Telegram
from Provider.youtube import YouTube
from Provider.fileio import FileIO
from Provider.zeroxzero import ZeroXZero
@@ -34,6 +35,7 @@ _PROVIDERS: Dict[str, Type[Provider]] = {
"telegram": Telegram,
# Upload-capable providers
"0x0": ZeroXZero,
"file.io": FileIO,
"matrix": Matrix,
}
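With the registry entry in place, file.io uploads are reachable through the same dict lookup as every other provider; a sketch of the dispatch (the empty dict stands in for the app's real config):

provider_cls = _PROVIDERS.get("file.io")     # _PROVIDERS: the registry above
if provider_cls is None:
    raise ValueError("unknown provider: file.io")
provider = provider_cls({})                  # real callers pass the app config dict
if provider.validate():
    link = provider.upload("notes.txt")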


@@ -354,12 +354,49 @@ class Folder(Store):
self.add_url(file_hash, url)
return file_hash
- # Move or copy file
+ # Move or copy file (with progress bar on actual byte transfer).
# Note: a same-volume move may be a fast rename and won't show progress.
def _copy_with_progress(src: Path, dst: Path, *, label: str) -> None:
from models import ProgressFileReader
total_bytes = None
try:
total_bytes = int(src.stat().st_size)
except Exception:
total_bytes = None
with src.open("rb") as r, dst.open("wb") as w:
reader = ProgressFileReader(r, total_bytes=total_bytes, label=label)
while True:
chunk = reader.read(1024 * 1024)
if not chunk:
break
w.write(chunk)
# Preserve file metadata similar to shutil.copy2
try:
shutil.copystat(str(src), str(dst))
except Exception:
pass
if move_file:
- shutil.move(str(file_path), str(save_file))
- debug(f"Local move: {save_file}", file=sys.stderr)
+ # Prefer native move; fall back to copy+delete with progress on failure.
+ try:
shutil.move(str(file_path), str(save_file))
debug(f"Local move: {save_file}", file=sys.stderr)
except Exception:
_copy_with_progress(file_path, save_file, label=f"folder:{self._name} move")
try:
file_path.unlink(missing_ok=True) # type: ignore[arg-type]
except Exception:
try:
if file_path.exists():
file_path.unlink()
except Exception:
pass
debug(f"Local move (copy+delete): {save_file}", file=sys.stderr)
else:
- shutil.copy2(str(file_path), str(save_file))
+ _copy_with_progress(file_path, save_file, label=f"folder:{self._name} copy")
debug(f"Local copy: {save_file}", file=sys.stderr)
# Best-effort: capture duration for media
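The fallback path re-implements shutil.copy2 as a read loop plus copystat so that every chunk passes through the progress wrapper. The same idea as a standalone helper:

import shutil
from pathlib import Path

def copy_with_callback(src: Path, dst: Path, on_chunk, chunk_size: int = 1024 * 1024) -> None:
    """Copy src to dst in chunks, invoking on_chunk(n_bytes) after each write."""
    with src.open("rb") as r, dst.open("wb") as w:
        while chunk := r.read(chunk_size):
            w.write(chunk)
            on_chunk(len(chunk))
    shutil.copystat(src, dst)  # preserve mtime/permissions, like shutil.copy2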


@@ -68,6 +68,23 @@ class Download_File(Cmdlet):
if isinstance(raw_url, str):
raw_url = [raw_url]
# Allow comma-separated URLs in a single argument.
# Example: download-file "https://a.pdf,https://b.pdf"
expanded_urls: List[str] = []
for u in (raw_url or []):
if u is None:
continue
s = str(u).strip()
if not s:
continue
if "," in s:
parts = [p.strip() for p in s.split(",")]
expanded_urls.extend([p for p in parts if p])
else:
expanded_urls.append(s)
if expanded_urls:
raw_url = expanded_urls
# If no URL args were provided, fall back to piped results (provider items)
piped_items: List[Any] = []
if not raw_url:
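The expansion is plain string handling: each argument is stripped, split on commas, and empty pieces are discarded. Note that it splits on every comma, so a URL containing a literal comma must be passed as a separate argument. Condensed into a checkable helper:

def expand_comma_urls(raw):
    """Split comma-separated URL arguments into a flat list."""
    out = []
    for u in raw or []:
        s = str(u).strip() if u is not None else ""
        out.extend(p.strip() for p in s.split(",") if p.strip())
    return out

assert expand_comma_urls(["https://a.pdf,https://b.pdf"]) == ["https://a.pdf", "https://b.pdf"]
assert expand_comma_urls([" https://c "]) == ["https://c"]
assert expand_comma_urls([None, ""]) == []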


@@ -450,9 +450,8 @@ def _build_ytdlp_options(opts: DownloadOptions) -> Dict[str, Any]:
except Exception:
pass
- # Avoid writing progress bars when running in quiet/background mode (e.g. mpv detached pipelines).
- if not getattr(opts, "quiet", False):
- base_options["progress_hooks"] = [_progress_callback]
+ # Always show a progress bar. The hook prints to stderr so piped stdout stays clean.
+ base_options["progress_hooks"] = [_progress_callback]
if opts.cookies_path and opts.cookies_path.is_file():
base_options["cookiefile"] = str(opts.cookies_path)
@@ -1432,6 +1431,23 @@ class Download_Media(Cmdlet):
raw_url = parsed.get("url", [])
if isinstance(raw_url, str):
raw_url = [raw_url]
# Allow a single quoted argument containing multiple URLs separated by commas.
# Example: download-media "https://a,https://b" -audio
expanded_urls: List[str] = []
for u in (raw_url or []):
if u is None:
continue
s = str(u).strip()
if not s:
continue
if "," in s:
parts = [p.strip() for p in s.split(",")]
expanded_urls.extend([p for p in parts if p])
else:
expanded_urls.append(s)
if expanded_urls:
raw_url = expanded_urls
# If no url provided via args, try to extract from piped result
if not raw_url and result:


@@ -38,6 +38,7 @@ try:
from metadata import (
read_tags_from_file,
dedup_tags_by_namespace,
merge_multiple_tag_lists,
)
HAS_METADATA_API = True
except ImportError:
@@ -58,6 +59,17 @@ except ImportError:
def dedup_tags_by_namespace(tags: List[str]) -> List[str]:
return tags
def merge_multiple_tag_lists(sources: List[List[str]], strategy: str = 'first') -> List[str]:
out: List[str] = []
seen: set[str] = set()
for src in sources:
for t in (src or []):
s = str(t)
if s and s not in seen:
out.append(s)
seen.add(s)
return out
def write_metadata(*_args: Any, **_kwargs: Any) -> None:
return None
@@ -105,7 +117,8 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
source_files: List[Path] = []
source_hashes: List[str] = []
source_url: List[str] = []
- source_tags: List[str] = []  # NEW: collect tags from source files
+ source_tags: List[str] = []  # tags read from .tag sidecars
source_item_tag_lists: List[List[str]] = [] # tags carried in-memory on piped items
for item in files_to_merge:
raw_path = get_pipe_object_path(item)
target_path = None
@@ -119,6 +132,16 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
if target_path and target_path.exists():
source_files.append(target_path)
# Track tags carried in the piped items (e.g. add-tag stage) so they survive merge.
try:
raw_tags = get_field(item, 'tag', [])
if isinstance(raw_tags, str) and raw_tags.strip():
source_item_tag_lists.append([raw_tags.strip()])
elif isinstance(raw_tags, list):
source_item_tag_lists.append([str(t) for t in raw_tags if t is not None and str(t).strip()])
except Exception:
pass
# Track tags from the .tag sidecar for this source (if present)
tags_file = target_path.with_suffix(target_path.suffix + '.tag')
if tags_file.exists() and HAS_METADATA_API:
@@ -217,17 +240,46 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
return 1
log(f"Merged {len(source_files)} files into: {output_path}", file=sys.stderr)
- merged_tags: List[str] = [f"title:{output_path.stem}"]
- # Merge tags from source files into the emitted PipeObject only.
- # Sidecar files (.tag/.metadata) are written only during explicit filesystem export (add-file to a path).
- if source_tags and HAS_METADATA_API:
- merged_source_tags = dedup_tags_by_namespace(source_tags)
- merged_tags.extend(merged_source_tags)
- log(f"Merged {len(merged_source_tags)} unique tags from source files", file=sys.stderr)
- elif source_tags:
- merged_tags.extend(list(dict.fromkeys(source_tags)))  # Preserve order, remove duplicates
+ def _title_value_from_tags(tags: List[str]) -> Optional[str]:
+ for t in tags:
+ try:
+ s = str(t)
+ except Exception:
+ continue
+ if s.lower().startswith('title:'):
+ val = s.split(':', 1)[1].strip()
+ return val or None
+ return None
# Determine best title:
# - prefer a title tag shared across all inputs (typical when user did add-tag title:...)
# - otherwise fall back to first title tag encountered
shared_title: Optional[str] = None
try:
if source_item_tag_lists:
per_item_titles: List[Optional[str]] = [_title_value_from_tags(tl) for tl in source_item_tag_lists]
non_empty = [t for t in per_item_titles if t]
if non_empty:
candidate = non_empty[0]
if candidate and all((t == candidate) for t in non_empty):
shared_title = candidate
else:
shared_title = non_empty[0]
except Exception:
shared_title = None
merged_title = shared_title or output_path.stem
# Merge tags from:
# - in-memory PipeObject tags (from add-tag etc)
# - .tag sidecars (if present)
# Keep all unique plain tags, and keep the first value for namespaced tags.
merged_tags = merge_multiple_tag_lists(source_item_tag_lists + ([source_tags] if source_tags else []), strategy='combine')
# Ensure we always have a title tag (and make sure it's the chosen title)
merged_tags = [t for t in merged_tags if not str(t).lower().startswith('title:')]
merged_tags.insert(0, f"title:{merged_title}")
# Emit a PipeObject-compatible dict so the merged file can be piped to next command
try:
@@ -238,7 +290,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
identifier=output_path.name,
file_path=str(output_path),
cmdlet_name="merge-file",
- title=output_path.stem,
+ title=merged_title,
hash_value=merged_hash,
tag=merged_tags,
url=source_url,
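The title selection boils down to: take the first non-empty title: tag carried by the piped items, otherwise fall back to the output stem, then rebuild the tag list with that title first. (The shared-title check in the diff compares all per-item titles but resolves to the first non-empty one in either branch, so this describes the net effect.) A small worked example:

def title_from_tags(tags):
    """Return the value of the first 'title:' tag, or None."""
    for t in tags:
        s = str(t)
        if s.lower().startswith("title:"):
            return s.split(":", 1)[1].strip() or None
    return None

item_tag_lists = [["title:Lecture 3", "genre:math"], ["title:Lecture 3"]]
titles = [t for t in map(title_from_tags, item_tag_lists) if t]
merged_title = titles[0] if titles else "merged-output"  # fallback: output stem
merged_tags = [f"title:{merged_title}"] + [
    t for tl in item_tag_lists for t in tl if not t.lower().startswith("title:")
]
# merged_tags == ['title:Lecture 3', 'genre:math']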


@@ -7,6 +7,7 @@ import math
import os
import shutil
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Protocol, TextIO, Tuple
@@ -660,6 +661,102 @@ class ProgressBar:
return " ".join(parts)
class ProgressFileReader:
"""File-like wrapper that prints a ProgressBar as bytes are read.
Intended for uploads: pass this wrapper as the file object to httpx/requests.
Progress is written to stderr (so pipelines remain clean).
"""
def __init__(self, fileobj: Any, *, total_bytes: Optional[int], label: str = "upload", min_interval_s: float = 0.25):
self._f = fileobj
self._total = int(total_bytes) if total_bytes not in (None, 0, "") else 0
self._label = str(label or "upload")
self._min_interval_s = max(0.05, float(min_interval_s))
self._bar = ProgressBar()
self._start = time.time()
self._last = self._start
self._read = 0
self._done = False
def _render(self) -> None:
if self._done:
return
if self._total <= 0:
return
now = time.time()
if now - self._last < self._min_interval_s:
return
elapsed = max(0.001, now - self._start)
speed = float(self._read) / elapsed
eta_s = (float(self._total) - float(self._read)) / speed if speed > 0 else 0.0
minutes, seconds = divmod(int(max(0.0, eta_s)), 60)
hours, minutes = divmod(minutes, 60)
eta_str = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
speed_str = self._bar.format_bytes(speed) + "/s"
percent = (float(self._read) / float(self._total)) * 100.0 if self._total > 0 else 0.0
line = self._bar.format_progress(
percent_str=f"{percent:.1f}%",
downloaded=int(self._read),
total=int(self._total),
speed_str=speed_str,
eta_str=eta_str,
)
sys.stderr.write("\r" + f"[{self._label}] " + line + " ")
sys.stderr.flush()
self._last = now
def _finish(self) -> None:
if self._done:
return
self._done = True
sys.stderr.write("\r" + (" " * 180) + "\r")
sys.stderr.write("\n")
sys.stderr.flush()
def read(self, size: int = -1) -> Any:
chunk = self._f.read(size)
try:
if chunk:
self._read += len(chunk)
self._render()
else:
# EOF
self._finish()
except Exception:
pass
return chunk
def seek(self, offset: int, whence: int = 0) -> Any:
out = self._f.seek(offset, whence)
try:
pos = int(self._f.tell())
if pos <= 0:
self._read = 0
self._start = time.time()
self._last = self._start
else:
self._read = pos
except Exception:
pass
return out
def tell(self) -> Any:
return self._f.tell()
def close(self) -> None:
try:
self._finish()
except Exception:
pass
return self._f.close()
def __getattr__(self, name: str) -> Any:
return getattr(self._f, name)
# ============================================================================
# PIPELINE EXECUTION CONTEXT
# Consolidated from pipeline_context.py
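ProgressFileReader stays transparent to HTTP clients because it only intercepts read/seek/tell and delegates everything else through __getattr__. The seek() override matters for retries: when a client rewinds the body to position 0, the byte counter and timer reset so the bar restarts instead of over-counting. A tiny demonstration:

import io
from models import ProgressFileReader

buf = io.BytesIO(b"x" * 1024)
reader = ProgressFileReader(buf, total_bytes=1024, label="upload")
reader.read(512)   # 512/1024 bytes counted (rendering is throttled to 0.25 s)
reader.seek(0)     # simulated retry: byte counter and start time reset
reader.read(-1)    # re-reads all 1024 bytes
reader.read()      # empty read at EOF triggers _finish() and the newline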