from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import sys
import tempfile
import re
import uuid
from urllib.parse import parse_qs, urlparse

from cmdlet._shared import Cmdlet, CmdletArg
from SYS.logger import log, debug
from SYS.result_table import ResultTable
from SYS import pipeline as ctx

# Keys used with ctx.store_value / ctx.load_value to carry state from the
# room-picker stage to the follow-up `-send` stage.
_MATRIX_PENDING_ITEMS_KEY = "matrix_pending_items"
_MATRIX_PENDING_TEXT_KEY = "matrix_pending_text"


def _has_flag(args: Sequence[str], flag: str) -> bool:
    """Return True when *flag* appears in *args*.

    Comparison is case-insensitive and ignores surrounding whitespace.
    Any failure while inspecting the arguments yields False.
    """
    try:
        want = str(flag or "").strip().lower()
        if not want:
            return False
        return any(str(a).strip().lower() == want for a in (args or []))
    except Exception:
        return False


def _get_matrix_provider_conf(config: Any) -> Optional[Dict[str, Any]]:
    """Return ``config["provider"]["matrix"]`` when every level is a dict, else None."""
    if not isinstance(config, dict):
        return None
    providers = config.get("provider")
    if not isinstance(providers, dict):
        return None
    matrix_conf = providers.get("matrix")
    return matrix_conf if isinstance(matrix_conf, dict) else None


def _first_present(conf: Dict[str, Any], keys: Sequence[str]) -> Any:
    """Return the value of the first key in *keys* that exists in *conf*, else None.

    Only key presence is tested: a present-but-empty value still wins over
    later keys.
    """
    for key in keys:
        if key in conf:
            return conf.get(key)
    return None


def _parse_config_room_filter_ids(config: Dict[str, Any]) -> List[str]:
    """Return the room IDs listed in the Matrix provider config.

    The value may be a list/tuple/set or a string; strings are split on
    commas and any whitespace. Returns an empty list when nothing usable is
    configured or on any error.
    """
    try:
        matrix_conf = _get_matrix_provider_conf(config)
        if matrix_conf is None:
            return []

        # Support a few common spellings; `room` is the documented key.
        raw = _first_present(matrix_conf, ("room", "room_id", "rooms", "room_ids"))
        if raw is None:
            return []

        # Allow either a string or a list-like value.
        if isinstance(raw, (list, tuple, set)):
            return [str(v).strip() for v in raw if str(v).strip()]

        text = str(raw or "").strip()
        if not text:
            return []

        # Comma-separated list of room IDs, but be tolerant of whitespace/newlines.
        return [p.strip() for p in re.split(r"[,\s]+", text) if p and p.strip()]
    except Exception:
        return []


def _get_matrix_size_limit_bytes(config: Dict[str, Any]) -> Optional[int]:
    """Return max allowed per-file size in bytes for Matrix uploads.

    Config:
        [provider=Matrix]
        size_limit=50  # MB

    The value may be a number or a string such as ``"50"``, ``"50mb"`` or
    ``"12.5 MiB"``. Returns None when no positive limit is configured or the
    value cannot be parsed.
    """
    try:
        matrix_conf = _get_matrix_provider_conf(config)
        if matrix_conf is None:
            return None

        raw = _first_present(matrix_conf, ("size_limit", "size_limit_mb", "max_mb"))
        if raw is None:
            return None

        # bool is a subclass of int; `size_limit = true` must not silently
        # become a 1 MiB limit (the string "true" is rejected below as well).
        if isinstance(raw, bool):
            return None

        mb: Optional[float] = None
        if isinstance(raw, (int, float)):
            mb = float(raw)
        else:
            text = str(raw or "").strip().lower()
            if not text:
                return None
            m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(mb|mib|m)?", text)
            if not m:
                return None
            mb = float(m.group(1))

        if mb is None or mb <= 0:
            return None

        # Use MiB semantics for predictable limits.
        return int(mb * 1024 * 1024)
    except Exception:
        return None


def _room_id_matches_filter(room_id: str, allowed_ids_canon: set[str]) -> bool:
    """Return True when *room_id* is in *allowed_ids_canon*.

    *allowed_ids_canon* is expected to hold already case-folded IDs; the
    room ID is case-folded here before comparison.
    """
    rid = str(room_id or "").strip()
    if not rid or not allowed_ids_canon:
        return False

    rid_canon = rid.casefold()
    if rid_canon in allowed_ids_canon:
        return True

    # Allow matching when config omits the homeserver part: "!abc" matches "!abc:server".
    base = rid.split(":", 1)[0].strip().casefold()
    return bool(base) and base in allowed_ids_canon


def _extract_text_arg(args: Sequence[str]) -> str:
    """Extract a `-text <value>` argument from a cmdlet args list.

    Returns the stripped value following the first `-text` token
    (case-insensitive), or an empty string when absent.
    """
    if not args:
        return ""
    try:
        tokens = list(args)
    except Exception:
        return ""

    for i, tok in enumerate(tokens):
        try:
            if str(tok).lower() == "-text" and i + 1 < len(tokens):
                return str(tokens[i + 1] or "").strip()
        except Exception:
            continue
    return ""


def _normalize_to_list(value: Any) -> List[Any]:
    """Wrap *value* in a list; None becomes ``[]`` and lists are returned as-is."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return [value]


def _extract_room_id(room_obj: Any) -> Optional[str]:
    """Return the stripped ``room_id`` from a room object, or None.

    Looks first in a dict-valued ``.extra`` attribute, then in the object
    itself when it is a dict.
    """
    try:
        # PipeObject stores unknown fields in .extra
        if hasattr(room_obj, "extra"):
            extra = getattr(room_obj, "extra")
            if isinstance(extra, dict):
                rid = extra.get("room_id")
                if isinstance(rid, str) and rid.strip():
                    return rid.strip()

        # Dict fallback
        if isinstance(room_obj, dict):
            rid = room_obj.get("room_id")
            if isinstance(rid, str) and rid.strip():
                return rid.strip()
    except Exception:
        pass
    return None


def _extract_file_path(item: Any) -> Optional[str]:
    """Best-effort local file path extraction.

    Returns a filesystem path string only if it exists.
    """

    def _maybe_local_path(value: Any) -> Optional[str]:
        """Return *value* as a path string when it names an existing path."""
        if value is None:
            return None
        if isinstance(value, Path):
            candidate_path = value
        else:
            text = str(value).strip()
            if not text:
                return None
            # Treat URLs as not-local.
            if text.startswith("http://") or text.startswith("https://"):
                return None
            candidate_path = Path(text).expanduser()
        try:
            if candidate_path.exists():
                return str(candidate_path)
        except Exception:
            return None
        return None

    try:
        if hasattr(item, "path"):
            found = _maybe_local_path(getattr(item, "path"))
            if found:
                return found
        if hasattr(item, "file_path"):
            found = _maybe_local_path(getattr(item, "file_path"))
            if found:
                return found
        if isinstance(item, dict):
            for key in ("path", "file_path", "target"):
                found = _maybe_local_path(item.get(key))
                if found:
                    return found
    except Exception:
        pass
    return None


def _extract_url(item: Any) -> Optional[str]:
    """Return the first usable URL found on *item*, or None.

    Attribute lookups (``url``, ``source_url``) accept any non-empty string;
    dict lookups only accept values starting with ``http://`` or
    ``https://``.
    """
    try:
        if hasattr(item, "url"):
            raw = getattr(item, "url")
            if isinstance(raw, str) and raw.strip():
                return raw.strip()
            if isinstance(raw, (list, tuple)):
                for v in raw:
                    if isinstance(v, str) and v.strip():
                        return v.strip()

        if hasattr(item, "source_url"):
            raw = getattr(item, "source_url")
            if isinstance(raw, str) and raw.strip():
                return raw.strip()

        if isinstance(item, dict):
            for key in ("url", "source_url", "path", "target"):
                raw = item.get(key)
                if (
                    isinstance(raw, str)
                    and raw.strip()
                    and raw.strip().startswith(("http://", "https://"))
                ):
                    return raw.strip()
    except Exception:
        pass
    return None


_SHA256_RE = re.compile(r"^[0-9a-fA-F]{64}$")


def _extract_sha256_hex(item: Any) -> Optional[str]:
    """Return the item's ``hash`` as lowercase hex when it is 64 hex digits, else None."""
    try:
        if hasattr(item, "hash"):
            h = getattr(item, "hash")
            if isinstance(h, str) and _SHA256_RE.fullmatch(h.strip()):
                return h.strip().lower()
        if isinstance(item, dict):
            h = item.get("hash")
            if isinstance(h, str) and _SHA256_RE.fullmatch(h.strip()):
                return h.strip().lower()
    except Exception:
        pass
    return None


def _extract_hash_from_hydrus_file_url(url: str) -> Optional[str]:
    """Return the ``hash`` query parameter of a ``.../get_files/file`` URL.

    Returns None when the path does not end in ``/get_files/file`` or the
    parameter is not a 64-digit hex string.
    """
    try:
        parsed = urlparse(url)
        if not (parsed.path or "").endswith("/get_files/file"):
            return None
        qs = parse_qs(parsed.query or "")
        h = (qs.get("hash") or [None])[0]
        if isinstance(h, str) and _SHA256_RE.fullmatch(h.strip()):
            return h.strip().lower()
    except Exception:
        pass
    return None


def _maybe_download_hydrus_file(
    item: Any, config: Dict[str, Any], output_dir: Path
) -> Optional[str]:
    """If the item looks like a Hydrus file (hash + Hydrus URL), download it using Hydrus access key headers.

    This avoids 401 from Hydrus when the URL is /get_files/file?hash=... without headers.

    Returns the path of the downloaded file as a string, or None when the
    item is not recognised as Hydrus-backed, no credentials are configured,
    or the download fails (failures are reported via ``debug``).
    """
    try:
        from SYS.config import get_hydrus_access_key, get_hydrus_url
        from API.HydrusNetwork import HydrusNetwork as HydrusClient, download_hydrus_file

        # Prefer per-item Hydrus instance name when it matches a configured instance.
        if isinstance(item, dict):
            store_name = item.get("store")
        else:
            store_name = getattr(item, "store", None)
        store_name = str(store_name).strip() if store_name else ""

        # Try the store name as instance key first; fallback to "home".
        instance_candidates = [s for s in [store_name.lower(), "home"] if s]
        hydrus_url = None
        access_key = None
        for inst in instance_candidates:
            access_key = (get_hydrus_access_key(config, inst) or "").strip() or None
            hydrus_url = (get_hydrus_url(config, inst) or "").strip() or None
            if access_key and hydrus_url:
                break

        if not access_key or not hydrus_url:
            return None

        url = _extract_url(item)
        file_hash = _extract_sha256_hex(item)
        if url and not file_hash:
            file_hash = _extract_hash_from_hydrus_file_url(url)

        # If it doesn't look like a Hydrus file, skip.
        if not file_hash:
            return None

        # Only treat it as Hydrus when we have a matching /get_files/file URL OR the item store suggests it.
        is_hydrus_url = False
        if url:
            parsed = urlparse(url)
            is_hydrus_url = (parsed.path or "").endswith(
                "/get_files/file"
            ) and _extract_hash_from_hydrus_file_url(url) == file_hash

        hydrus_instances: set[str] = set()
        try:
            store_cfg = (config or {}).get("store") if isinstance(config, dict) else None
            if isinstance(store_cfg, dict):
                hydrus_cfg = store_cfg.get("hydrusnetwork")
                if isinstance(hydrus_cfg, dict):
                    hydrus_instances = {
                        str(k).strip().lower()
                        for k in hydrus_cfg.keys()
                        if str(k).strip()
                    }
        except Exception:
            hydrus_instances = set()

        store_hint = store_name.lower() in {"hydrus", "hydrusnetwork"} or (
            store_name.lower() in hydrus_instances
        )

        if not (is_hydrus_url or store_hint):
            return None

        client = HydrusClient(url=hydrus_url, access_key=access_key, timeout=30.0)
        file_url = url if (url and is_hydrus_url) else client.file_url(file_hash)

        # Best-effort extension from Hydrus metadata.
        suffix = ".hydrus"
        try:
            meta_response = client.fetch_file_metadata(
                hashes=[file_hash], include_mime=True
            )
            entries = (
                meta_response.get("metadata")
                if isinstance(meta_response, dict)
                else None
            )
            if isinstance(entries, list) and entries:
                entry = entries[0]
                if isinstance(entry, dict):
                    ext = entry.get("ext")
                    if isinstance(ext, str) and ext.strip():
                        cleaned = ext.strip()
                        if not cleaned.startswith("."):
                            cleaned = "." + cleaned
                        # The extension comes from a remote response and is
                        # spliced into a filename: refuse anything that could
                        # escape output_dir via a path separator.
                        has_separator = "/" in cleaned or "\\" in cleaned
                        if len(cleaned) <= 12 and not has_separator:
                            suffix = cleaned
        except Exception:
            pass

        output_dir.mkdir(parents=True, exist_ok=True)
        dest = output_dir / f"{file_hash}{suffix}"
        if dest.exists():
            # Avoid clobbering; pick a unique name.
            dest = output_dir / f"{file_hash}_{uuid.uuid4().hex[:10]}{suffix}"

        headers = {"Hydrus-Client-API-Access-Key": access_key}
        download_hydrus_file(file_url, headers, dest, timeout=30.0)
        if dest.exists():
            return str(dest)
    except Exception as exc:
        debug(f"[matrix] Hydrus export failed: {exc}")
    return None


def _maybe_unlock_alldebrid_url(url: str, config: Dict[str, Any]) -> str:
    """Return an unlocked link for ``alldebrid.com/f/...`` URLs, else *url* unchanged.

    Best-effort: when no API key is available or unlocking fails, the
    original URL is returned.
    """
    try:
        parsed = urlparse(url)
        host = (parsed.netloc or "").lower()
        if host != "alldebrid.com":
            return url
        if not (parsed.path or "").startswith("/f/"):
            return url

        try:
            from Provider.alldebrid import _get_debrid_api_key  # type: ignore

            api_key = _get_debrid_api_key(config or {})
        except Exception:
            api_key = None
        if not api_key:
            return url

        from API.alldebrid import AllDebridClient

        client = AllDebridClient(str(api_key))
        unlocked = client.unlock_link(url)
        if isinstance(unlocked, str) and unlocked.strip():
            return unlocked.strip()
    except Exception as exc:
        # Still best-effort, but leave a trace instead of failing silently.
        debug(f"[matrix] AllDebrid unlock failed: {exc}")
    return url


def _matrix_temp_root(config: Any) -> Path:
    """Return the directory used for Matrix temp downloads (not created here).

    Uses ``config["temp"]`` when set, otherwise
    ``<system temp>/Medios-Macina``; ``matrix`` is appended in both cases.
    """
    base_tmp = config.get("temp") if isinstance(config, dict) else None
    if base_tmp:
        base_dir = Path(str(base_tmp)).expanduser()
    else:
        base_dir = Path(tempfile.gettempdir()) / "Medios-Macina"
    return base_dir / "matrix"


def _resolve_upload_path(item: Any, config: Dict[str, Any]) -> Optional[str]:
    """Resolve a usable local file path for uploading.

    - Prefer existing local file paths.
    - Otherwise, if the item looks Hydrus-backed, download it with Hydrus headers.
    - Otherwise, if the item has an http(s) URL, download it to a temp directory.

    Returns None when no local file could be obtained.
    """
    local = _extract_file_path(item)
    if local:
        return local

    # If this is a Hydrus-backed item (e.g. /get_files/file?hash=...), download it with Hydrus headers.
    try:
        hydrus_dir = _matrix_temp_root(config) / "hydrus"
        hydrus_path = _maybe_download_hydrus_file(item, config, hydrus_dir)
        if hydrus_path:
            return hydrus_path
    except Exception as exc:
        debug(f"[matrix] Hydrus path resolution failed: {exc}")

    url = _extract_url(item)
    if not url:
        return None

    # Best-effort: unlock AllDebrid file links (they require auth and aren't directly downloadable).
    url = _maybe_unlock_alldebrid_url(url, config)

    try:
        from SYS.download import _download_direct_file

        output_dir = _matrix_temp_root(config)
        output_dir.mkdir(parents=True, exist_ok=True)
        result = _download_direct_file(url, output_dir, quiet=True)
        if (
            result
            and hasattr(result, "path")
            and isinstance(result.path, Path)
            and result.path.exists()
        ):
            return str(result.path)
    except Exception as exc:
        debug(f"[matrix] Failed to download URL for upload: {exc}")

    return None


def _prepare_upload_jobs(
    items: List[Any], config: Dict[str, Any]
) -> Tuple[List[Dict[str, Any]], bool]:
    """Resolve each item to a local file and apply the configured size limit.

    Returns ``(jobs, any_failed)`` where each job is
    ``{"path": <str>, "pipe_obj": <item>}`` and *any_failed* is True when at
    least one item was skipped.
    """
    size_limit_bytes = _get_matrix_size_limit_bytes(config)
    upload_jobs: List[Dict[str, Any]] = []
    any_failed = False

    for item in items:
        file_path = _resolve_upload_path(item, config)
        if not file_path:
            any_failed = True
            log(
                "Matrix upload requires a local file (path) or a direct URL on the selected item",
                file=sys.stderr,
            )
            continue

        media_path = Path(file_path)
        if not media_path.exists():
            any_failed = True
            log(f"Matrix upload file missing: {file_path}", file=sys.stderr)
            continue

        if size_limit_bytes is not None:
            try:
                byte_size = int(media_path.stat().st_size)
            except Exception:
                # Unknown size: -1 never exceeds the (positive) limit, so
                # the file is let through.
                byte_size = -1
            if byte_size > size_limit_bytes:
                any_failed = True
                actual_mb = byte_size / (1024 * 1024)
                lim = size_limit_bytes / (1024 * 1024)
                log(
                    f"ERROR: file is too big, skipping: {media_path.name} ({actual_mb:.1f} MB > {lim:.1f} MB)",
                    file=sys.stderr,
                )
                continue

        upload_jobs.append({"path": str(media_path), "pipe_obj": item})

    return upload_jobs, any_failed


def _send_pending_items(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Send stage: upload the stored pending items to the room(s) in *result*.

    Returns 0 when everything was sent, 1 on any failure or missing input.
    """
    # Ensure we don't re-print the rooms picker table on the send stage.
    try:
        if hasattr(ctx, "set_last_result_table_overlay"):
            ctx.set_last_result_table_overlay(None, None, None)
    except Exception:
        pass
    try:
        if hasattr(ctx, "set_current_stage_table"):
            ctx.set_current_stage_table(None)
    except Exception:
        pass

    room_ids: List[str] = []
    for room in _normalize_to_list(result):
        rid = _extract_room_id(room)
        if rid:
            room_ids.append(rid)
    if not room_ids:
        log("No Matrix room selected (use @N on the rooms table)", file=sys.stderr)
        return 1

    pending_items = ctx.load_value(_MATRIX_PENDING_ITEMS_KEY, default=[])
    items = _normalize_to_list(pending_items)
    if not items:
        log("No pending items to upload (use: @N | .matrix)", file=sys.stderr)
        return 1

    from Provider.matrix import Matrix

    try:
        provider = Matrix(config)
    except Exception as exc:
        log(f"Matrix not available: {exc}", file=sys.stderr)
        return 1

    # An explicit -text on this stage wins; otherwise reuse the text stored
    # by the room-picker stage.
    text_value = _extract_text_arg(args)
    if not text_value:
        try:
            text_value = str(
                ctx.load_value(_MATRIX_PENDING_TEXT_KEY, default="") or ""
            ).strip()
        except Exception:
            text_value = ""

    # Resolve upload paths once (also avoids repeated downloads when sending to multiple rooms).
    upload_jobs, any_failed = _prepare_upload_jobs(items, config)

    for rid in room_ids:
        sent_any_for_room = False
        for job in upload_jobs:
            file_path = str(job.get("path") or "")
            if not file_path:
                continue
            try:
                link = provider.upload_to_room(
                    file_path, rid, pipe_obj=job.get("pipe_obj")
                )
                debug(f"✓ Sent {Path(file_path).name} -> {rid}")
                if link:
                    log(link)
                sent_any_for_room = True
            except Exception as exc:
                any_failed = True
                log(
                    f"Matrix send failed for {Path(file_path).name}: {exc}",
                    file=sys.stderr,
                )

        # Optional caption-like follow-up message (sent once per room).
        if text_value and sent_any_for_room:
            try:
                provider.send_text_to_room(text_value, rid)
            except Exception as exc:
                any_failed = True
                log(f"Matrix text send failed: {exc}", file=sys.stderr)

    # Clear pending items once we've attempted to send.
    ctx.store_value(_MATRIX_PENDING_ITEMS_KEY, [])
    try:
        ctx.store_value(_MATRIX_PENDING_TEXT_KEY, "")
    except Exception:
        pass

    return 1 if any_failed else 0


def _show_room_picker(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Default stage: stash the selected items and show the rooms table.

    Returns 1 on usage or provider errors, 0 otherwise (including when no
    rooms are found).
    """
    selected_items = _normalize_to_list(result)
    if not selected_items:
        log(
            "Usage: @N | .matrix (select items first, then pick a room)",
            file=sys.stderr,
        )
        return 1

    ctx.store_value(_MATRIX_PENDING_ITEMS_KEY, selected_items)
    try:
        ctx.store_value(_MATRIX_PENDING_TEXT_KEY, _extract_text_arg(args))
    except Exception:
        pass

    from Provider.matrix import Matrix

    try:
        provider = Matrix(config)
    except Exception as exc:
        log(f"Matrix not available: {exc}", file=sys.stderr)
        return 1

    # Compute the effective room filter once; `-all` disables it entirely.
    filter_ids: List[str] = []
    if not _has_flag(args, "-all"):
        filter_ids = [
            str(v).strip()
            for v in _parse_config_room_filter_ids(config)
            if str(v).strip()
        ]

    try:
        rooms = provider.list_rooms(room_ids=filter_ids or None)
    except Exception as exc:
        log(f"Failed to list Matrix rooms: {exc}", file=sys.stderr)
        return 1

    if not rooms:
        if filter_ids:
            # Diagnostics if a configured filter yields no rows (provider filtered before name lookups for speed).
            try:
                joined_ids = provider.list_joined_room_ids()
                debug(f"[matrix] Configured room filter IDs: {filter_ids}")
                debug(f"[matrix] Joined room IDs (from Matrix): {joined_ids}")
            except Exception:
                pass
            log(
                "No joined rooms matched the configured Matrix room filter (use: .matrix -all)",
                file=sys.stderr,
            )
        else:
            log("No joined rooms found.", file=sys.stderr)
        return 0

    table = ResultTable("Matrix Rooms (select with @N)")
    table.set_table("matrix")
    table.set_source_command(".matrix", [])

    # Make selection results clearer: stash a friendly title/store on the backing items.
    # This avoids confusion when the selection handler prints PipeObject debug info.
    room_items: List[Dict[str, Any]] = []
    for room in rooms:
        is_dict = isinstance(room, dict)
        name = str(room.get("name") or "").strip() if is_dict else ""
        room_id = str(room.get("room_id") or "").strip() if is_dict else ""

        # Every room gets a table row; only dict rooms get a backing item.
        row = table.add_row()
        row.add_column("Name", name)
        row.add_column("Room", room_id)

        if is_dict:
            room_items.append(
                {
                    **room,
                    "store": "matrix",
                    "title": name or room_id or "Matrix Room",
                }
            )

    # Overlay table: user selects @N, then we resume with `.matrix -send`.
    ctx.set_last_result_table_overlay(table, room_items)
    ctx.set_current_stage_table(table)
    ctx.set_pending_pipeline_tail([[".matrix", "-send"]], ".matrix")
    return 0


def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
    """Entry point for the `.matrix` cmdlet.

    With `-send` (internal stage), *result* holds the selected room(s) and
    the previously stored pending items are uploaded to them. Without it,
    *result* holds the selected items: they are stored as pending and a
    rooms table is shown for `@N` selection.

    Returns a process-style exit code (0 on success, 1 on failure).
    """
    # Internal stage: send previously selected items to selected rooms.
    if _has_flag(args, "-send"):
        return _send_pending_items(result, args, config)

    # Default stage: show rooms, then wait for @N selection to resume sending.
    return _show_room_picker(result, args, config)


CMDLET = Cmdlet(
    name=".matrix",
    alias=["matrix", "rooms"],
    summary="Send selected items to a Matrix room",
    usage="@N | .matrix",
    arg=[
        CmdletArg(
            name="send",
            type="bool",
            description="(internal) Send to selected room(s)",
            required=False,
        ),
        CmdletArg(
            name="all",
            type="bool",
            description="Ignore config room filter and show all joined rooms",
            required=False,
        ),
        CmdletArg(
            name="text",
            type="string",
            description="Send a follow-up text message after each upload (caption-like)",
            required=False,
        ),
    ],
    exec=_run,
)