This commit is contained in:
2026-03-25 22:39:30 -07:00
parent c31402c8f1
commit 562acd809c
46 changed files with 2367 additions and 1868 deletions

79
cmdnat/_parsing.py Normal file
View File

@@ -0,0 +1,79 @@
from __future__ import annotations
from typing import Any, Iterable, List, Optional, Sequence
VALUE_ARG_FLAGS = frozenset({"-value", "--value", "-set-value", "--set-value"})
def extract_piped_value(result: Any) -> Optional[str]:
    """Extract a usable string value from a piped result.

    Accepts a plain string, a number, or a dict carrying a "value" key.
    Returns the stripped text, or None when no non-empty value is present.

    Fix: the dict branch previously returned "" for a whitespace-only
    value while the string branch returned None; both now return None.
    """
    if isinstance(result, str):
        stripped = result.strip()
        return stripped or None
    if isinstance(result, (int, float)):
        return str(result)
    if isinstance(result, dict):
        value = result.get("value")
        if value is not None:
            # Mirror the string branch: an all-whitespace value counts as missing.
            text = str(value).strip()
            return text or None
    return None
def extract_arg_value(
    args: Sequence[str],
    *,
    flags: Iterable[str],
    allow_positional: bool = False,
) -> Optional[str]:
    """Find the value associated with any of *flags* in *args*.

    Supports both "--flag value" and "--flag=value" forms. Flag matching is
    case-insensitive; the returned value keeps its original case. With
    allow_positional=True, falls back to the first non-flag token.
    Returns None when nothing matches.

    Fix: the "--flag=value" form previously lowercased the returned value
    because the split was performed on the lowercased token.
    """
    if not args:
        return None
    tokens = [str(tok) for tok in args if tok is not None]
    normalized_flags = {
        str(flag).strip().lower() for flag in flags if str(flag).strip()
    }
    if not normalized_flags:
        return None
    for idx, tok in enumerate(tokens):
        text = tok.strip()
        if not text:
            continue
        if text.lower() in normalized_flags and idx + 1 < len(tokens):
            candidate = tokens[idx + 1].strip()
            if candidate:
                return candidate
        if "=" in text:
            # Split the original text so the value keeps its case; only the
            # flag name is compared case-insensitively.
            head, value = text.split("=", 1)
            if head.lower() in normalized_flags and value.strip():
                return value.strip()
    if not allow_positional:
        return None
    for tok in tokens:
        text = tok.strip()
        if text and not text.startswith("-"):
            return text
    return None
def extract_value_arg(args: Sequence[str]) -> Optional[str]:
    """Pull a value from the standard value flags, allowing a bare
    positional token as a fallback."""
    return extract_arg_value(
        args,
        flags=VALUE_ARG_FLAGS,
        allow_positional=True,
    )
def has_flag(args: Sequence[str], flag: str) -> bool:
    """Return True when *flag* appears in *args* (case-insensitive, trimmed)."""
    try:
        target = str(flag or "").strip().lower()
        if not target:
            return False
        for arg in (args or []):
            if str(arg).strip().lower() == target:
                return True
        return False
    except Exception:
        return False
def normalize_to_list(value: Any) -> List[Any]:
    """Wrap *value* in a list; None becomes [], lists pass through unchanged."""
    if isinstance(value, list):
        return value
    return [] if value is None else [value]

112
cmdnat/_status_shared.py Normal file
View File

@@ -0,0 +1,112 @@
from __future__ import annotations
from typing import Any
import httpx
from SYS.result_table import Table
def upper_text(value: Any) -> str:
    """Render *value* as an upper-cased string; None renders as ""."""
    if value is None:
        return ""
    return str(value).upper()
def add_startup_check(
    table: Table,
    status: str,
    name: str,
    *,
    provider: str = "",
    store: str = "",
    files: int | str | None = None,
    detail: str = "",
) -> None:
    """Append one startup-check row to *table*, upper-casing display text.

    FILES is rendered verbatim ("" when None); all other columns go
    through upper_text.
    """
    columns = [
        ("STATUS", upper_text(status)),
        ("NAME", upper_text(name)),
        ("PROVIDER", upper_text(provider or "")),
        ("STORE", upper_text(store or "")),
        ("FILES", "" if files is None else str(files)),
        ("DETAIL", upper_text(detail or "")),
    ]
    row = table.add_row()
    for label, text in columns:
        row.add_column(label, text)
def has_store_subtype(cfg: dict, subtype: str) -> bool:
    """True when cfg["store"][subtype] holds at least one non-empty dict entry."""
    store_section = cfg.get("store")
    if not isinstance(store_section, dict):
        return False
    entries = store_section.get(subtype)
    if not isinstance(entries, dict):
        return False
    for entry in entries.values():
        if isinstance(entry, dict) and entry:
            return True
    return False
def has_provider(cfg: dict, name: str) -> bool:
    """True when cfg["provider"] has a non-empty dict entry for *name*.

    The lookup key is *name* trimmed and lower-cased.
    """
    providers = cfg.get("provider")
    if not isinstance(providers, dict):
        return False
    entry = providers.get(str(name).strip().lower())
    return bool(entry) and isinstance(entry, dict)
def has_tool(cfg: dict, name: str) -> bool:
    """True when cfg["tool"] has a non-empty dict entry for *name*.

    The lookup key is *name* trimmed and lower-cased.
    """
    tools = cfg.get("tool")
    if not isinstance(tools, dict):
        return False
    entry = tools.get(str(name).strip().lower())
    return bool(entry) and isinstance(entry, dict)
def ping_url(url: str, timeout: float = 3.0) -> tuple[bool, str]:
    """HTTP GET *url* and report (reachable, detail_string).

    Status codes 200-499 count as reachable (the host answered, even if
    the endpoint rejected the request). Timeouts and any other error
    report failure with a short reason appended to the URL.
    """
    try:
        from API.HTTP import HTTPClient

        with HTTPClient(timeout=timeout, retries=1) as client:
            reply = client.get(url, allow_redirects=True)
            code = int(getattr(reply, "status_code", 0) or 0)
            reachable = 200 <= code < 500
            return reachable, f"{url} (HTTP {code})"
    except httpx.TimeoutException:
        return False, f"{url} (timeout)"
    except Exception as exc:
        return False, f"{url} ({type(exc).__name__})"
def provider_display_name(key: str) -> str:
    """Map a provider key to its branded display name.

    Known providers get their canonical capitalization; any other
    non-empty key is capitalized on its first letter; empty or None
    input yields "Provider".
    """
    label = (key or "").strip()
    if not label:
        return "Provider"
    branded = {
        "openlibrary": "OpenLibrary",
        "alldebrid": "AllDebrid",
        "youtube": "YouTube",
    }.get(label.lower())
    if branded is not None:
        return branded
    return label[:1].upper() + label[1:]
def default_provider_ping_targets(provider_key: str) -> list[str]:
    """Return the default health-check URLs for a provider key.

    libgen expands its mirror list into /json.php endpoints; unknown
    providers (or a missing libgen module) yield an empty list.
    """
    provider = (provider_key or "").strip().lower()
    static_targets = {
        "openlibrary": ["https://openlibrary.org"],
        "youtube": ["https://www.youtube.com"],
        "bandcamp": ["https://bandcamp.com"],
    }
    if provider in static_targets:
        return static_targets[provider]
    if provider == "libgen":
        try:
            from Provider.libgen import MIRRORS
        except ImportError:
            return []
        return [
            str(url).rstrip("/") + "/json.php"
            for url in (MIRRORS or [])
            if str(url).strip()
        ]
    return []
def ping_first(urls: list[str]) -> tuple[bool, str]:
    """Ping *urls* in order and return the first successful result.

    When every target fails, report the failure detail of the first URL.
    An empty list reports (False, "No ping target").

    Fix: previously the all-fail path pinged urls[0] a second time just to
    recover its detail string — a wasted network round-trip (and possibly
    a second timeout wait). The first failure detail is now remembered.
    """
    first_failure = None
    for url in urls:
        ok, detail = ping_url(url)
        if ok:
            return True, detail
        if first_failure is None:
            first_failure = detail
    if first_failure is not None:
        return False, first_failure
    return False, "No ping target"

View File

@@ -1,7 +1,7 @@
import json
import os
import sys
from typing import List, Dict, Any, Sequence
from typing import List, Dict, Any, Sequence, Optional
from SYS.cmdlet_spec import Cmdlet, CmdletArg
from SYS.logger import log
from SYS.result_table import Table
@@ -12,22 +12,45 @@ ADJECTIVE_FILE = os.path.join(
"cmdnat",
"adjective.json"
)
_ADJECTIVE_CACHE: Optional[Dict[str, List[str]]] = None
_ADJECTIVE_CACHE_MTIME_NS: Optional[int] = None
def _load_adjectives() -> Dict[str, List[str]]:
global _ADJECTIVE_CACHE, _ADJECTIVE_CACHE_MTIME_NS
try:
if os.path.exists(ADJECTIVE_FILE):
with open(ADJECTIVE_FILE, "r", encoding="utf-8") as f:
return json.load(f)
if not os.path.exists(ADJECTIVE_FILE):
_ADJECTIVE_CACHE = {}
_ADJECTIVE_CACHE_MTIME_NS = None
return {}
current_mtime_ns = os.stat(ADJECTIVE_FILE).st_mtime_ns
if (_ADJECTIVE_CACHE is not None and
_ADJECTIVE_CACHE_MTIME_NS == current_mtime_ns):
return _ADJECTIVE_CACHE
with open(ADJECTIVE_FILE, "r", encoding="utf-8") as f:
loaded = json.load(f)
if not isinstance(loaded, dict):
loaded = {}
_ADJECTIVE_CACHE = loaded
_ADJECTIVE_CACHE_MTIME_NS = current_mtime_ns
return _ADJECTIVE_CACHE
except Exception as e:
log(f"Error loading adjectives: {e}", file=sys.stderr)
_ADJECTIVE_CACHE = {}
_ADJECTIVE_CACHE_MTIME_NS = None
return {}
def _save_adjectives(data: Dict[str, List[str]]) -> bool:
global _ADJECTIVE_CACHE, _ADJECTIVE_CACHE_MTIME_NS
try:
with open(ADJECTIVE_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
_ADJECTIVE_CACHE = data
_ADJECTIVE_CACHE_MTIME_NS = os.stat(ADJECTIVE_FILE).st_mtime_ns
return True
except Exception as e:
log(f"Error saving adjectives: {e}", file=sys.stderr)

View File

@@ -1,9 +1,18 @@
from typing import List, Dict, Any, Optional, Sequence
from SYS.cmdlet_spec import Cmdlet, CmdletArg
from SYS.config import load_config, save_config, save_config_and_verify
from SYS.config import (
load_config,
save_config,
save_config_and_verify,
set_nested_config_value,
)
from SYS import pipeline as ctx
from SYS.result_table import Table
from cmdnat._parsing import (
extract_piped_value as _extract_piped_value,
extract_value_arg as _extract_value_arg,
)
CMDLET = Cmdlet(
name=".config",
@@ -43,91 +52,7 @@ def flatten_config(config: Dict[str, Any], parent_key: str = "", sep: str = ".")
def set_nested_config(config: Dict[str, Any], key: str, value: str) -> bool:
keys = key.split(".")
d = config
# Navigate to the parent dict
for k in keys[:-1]:
if k not in d or not isinstance(d[k], dict):
d[k] = {}
d = d[k]
last_key = keys[-1]
# Try to preserve type if key exists
if last_key in d:
current_val = d[last_key]
if isinstance(current_val, bool):
if value.lower() in ("true", "yes", "1", "on"):
d[last_key] = True
elif value.lower() in ("false", "no", "0", "off"):
d[last_key] = False
else:
# Fallback to boolean conversion of string (usually True for non-empty)
# But for config, explicit is better.
print(f"Warning: Could not convert '{value}' to boolean. Using string.")
d[last_key] = value
elif isinstance(current_val, int):
try:
d[last_key] = int(value)
except ValueError:
print(f"Warning: Could not convert '{value}' to int. Using string.")
d[last_key] = value
elif isinstance(current_val, float):
try:
d[last_key] = float(value)
except ValueError:
print(f"Warning: Could not convert '{value}' to float. Using string.")
d[last_key] = value
else:
d[last_key] = value
else:
# New key, try to infer type
if value.lower() in ("true", "false"):
d[last_key] = value.lower() == "true"
elif value.isdigit():
d[last_key] = int(value)
else:
d[last_key] = value
return True
def _extract_piped_value(result: Any) -> Optional[str]:
if isinstance(result, str):
return result.strip() if result.strip() else None
if isinstance(result, (int, float)):
return str(result)
if isinstance(result, dict):
val = result.get("value")
if val is not None:
return str(val).strip()
return None
def _extract_value_arg(args: Sequence[str]) -> Optional[str]:
if not args:
return None
tokens = [str(tok) for tok in args if tok is not None]
flags = {"-value", "--value", "-set-value", "--set-value"}
for idx, tok in enumerate(tokens):
text = tok.strip()
if not text:
continue
low = text.lower()
if low in flags and idx + 1 < len(tokens):
candidate = str(tokens[idx + 1]).strip()
if candidate:
return candidate
if "=" in low:
head, val = low.split("=", 1)
if head in flags and val:
return val.strip()
for tok in tokens:
text = str(tok).strip()
if text and not text.startswith("-"):
return text
return None
return set_nested_config_value(config, key, value, on_error=print)
def _get_selected_config_key() -> Optional[str]:

View File

@@ -12,8 +12,16 @@ from SYS.cmdlet_spec import Cmdlet, CmdletArg
from SYS.config import load_config, save_config
from SYS.logger import log, debug
from SYS.result_table import Table
from SYS.item_accessors import get_sha256_hex
from SYS.utils import extract_hydrus_hash_from_url
from SYS import pipeline as ctx
from cmdnat._parsing import (
extract_arg_value,
extract_piped_value as _extract_piped_value,
extract_value_arg as _extract_value_arg,
has_flag as _has_flag,
normalize_to_list as _normalize_to_list,
)
_MATRIX_PENDING_ITEMS_KEY = "matrix_pending_items"
_MATRIX_PENDING_TEXT_KEY = "matrix_pending_text"
@@ -21,62 +29,9 @@ _MATRIX_MENU_STATE_KEY = "matrix_menu_state"
_MATRIX_SELECTED_SETTING_KEY_KEY = "matrix_selected_setting_key"
def _extract_piped_value(result: Any) -> Optional[str]:
"""Extract the piped value from result (string, number, or dict with 'value' key)."""
if isinstance(result, str):
return result.strip() if result.strip() else None
if isinstance(result, (int, float)):
return str(result)
if isinstance(result, dict):
# Fallback to value field if it's a dict
val = result.get("value")
if val is not None:
return str(val).strip()
return None
def _extract_value_arg(args: Sequence[str]) -> Optional[str]:
"""Extract a fallback value from command-line args (value flag or positional)."""
if not args:
return None
tokens = [str(tok) for tok in args if tok is not None]
value_flags = {"-value", "--value", "-set-value", "--set-value"}
for idx, tok in enumerate(tokens):
low = tok.strip()
if not low:
continue
low_lower = low.lower()
if low_lower in value_flags and idx + 1 < len(tokens):
candidate = str(tokens[idx + 1]).strip()
if candidate:
return candidate
if "=" in low_lower:
head, val = low_lower.split("=", 1)
if head in value_flags and val:
return val.strip()
# Fallback to first non-flag token
for tok in tokens:
text = str(tok).strip()
if text and not text.startswith("-"):
return text
return None
def _extract_set_value_arg(args: Sequence[str]) -> Optional[str]:
"""Extract the value from -set-value flag."""
if not args:
return None
try:
tokens = list(args)
except Exception:
return None
for i, tok in enumerate(tokens):
try:
if str(tok).lower() == "-set-value" and i + 1 < len(tokens):
return str(tokens[i + 1]).strip()
except Exception:
continue
return None
return extract_arg_value(args, flags={"-set-value"})
def _update_matrix_config(config: Dict[str, Any], key: str, value: Any) -> bool:
@@ -122,16 +77,6 @@ def _update_matrix_config(config: Dict[str, Any], key: str, value: Any) -> bool:
return False
def _has_flag(args: Sequence[str], flag: str) -> bool:
try:
want = str(flag or "").strip().lower()
if not want:
return False
return any(str(a).strip().lower() == want for a in (args or []))
except Exception:
return False
def _parse_config_room_filter_ids(config: Dict[str, Any]) -> List[str]:
try:
if not isinstance(config, dict):
@@ -426,14 +371,6 @@ def _extract_text_arg(args: Sequence[str]) -> str:
return ""
def _normalize_to_list(value: Any) -> List[Any]:
if value is None:
return []
if isinstance(value, list):
return value
return [value]
def _extract_room_id(room_obj: Any) -> Optional[str]:
try:
# PipeObject stores unknown fields in .extra
@@ -525,22 +462,8 @@ def _extract_url(item: Any) -> Optional[str]:
return None
_SHA256_RE = re.compile(r"^[0-9a-fA-F]{64}$")
def _extract_sha256_hex(item: Any) -> Optional[str]:
try:
if hasattr(item, "hash"):
h = getattr(item, "hash")
if isinstance(h, str) and _SHA256_RE.fullmatch(h.strip()):
return h.strip().lower()
if isinstance(item, dict):
h = item.get("hash")
if isinstance(h, str) and _SHA256_RE.fullmatch(h.strip()):
return h.strip().lower()
except Exception:
pass
return None
return get_sha256_hex(item, "hash")
def _extract_hash_from_hydrus_file_url(url: str) -> Optional[str]:

View File

@@ -39,6 +39,7 @@ _WINDOWS_RESERVED_NAMES = {
*(f"com{i}" for i in range(1, 10)),
*(f"lpt{i}" for i in range(1, 10)),
}
_ILLEGAL_FILENAME_CHARS_RE = re.compile(r'[<>:"/\\|?*]')
def _sanitize_filename_base(text: str) -> str:
@@ -48,7 +49,7 @@ def _sanitize_filename_base(text: str) -> str:
return "table"
# Replace characters illegal on Windows (and generally unsafe cross-platform).
s = re.sub(r'[<>:"/\\|?*]', " ", s)
s = _ILLEGAL_FILENAME_CHARS_RE.sub(" ", s)
# Drop control characters.
s = "".join(ch for ch in s if ch.isprintable())

View File

@@ -23,6 +23,15 @@ _ALLDEBRID_UNLOCK_CACHE: Dict[str,
str] = {}
_NOTES_PREFETCH_INFLIGHT: set[str] = set()
_NOTES_PREFETCH_LOCK = threading.Lock()
_PLAYLIST_STORE_CACHE: Optional[Dict[str, Any]] = None
_PLAYLIST_STORE_MTIME_NS: Optional[int] = None
_SHA256_RE = re.compile(r"[0-9a-f]{64}")
_SHA256_FULL_RE = re.compile(r"^[0-9a-f]{64}$")
_EXTINF_TITLE_RE = re.compile(r"#EXTINF:-1,(.*?)(?:\n|\r|$)")
_WINDOWS_PATH_RE = re.compile(r"^[a-z]:[\\/]", flags=re.IGNORECASE)
_HASH_QUERY_RE = re.compile(r"hash=([0-9a-f]{64})")
_IPV4_RE = re.compile(r"^\d+\.\d+\.\d+\.\d+$")
_MPD_PATH_RE = re.compile(r"\.mpd($|\?)")
def _repo_root() -> Path:
@@ -36,26 +45,56 @@ def _playlist_store_path() -> Path:
return _repo_root() / "mpv_playlists.json"
def _load_playlist_store(path: Path) -> Dict[str, Any]:
if not path.exists():
return {"next_id": 1, "playlists": []}
def _new_playlist_store() -> Dict[str, Any]:
return {"next_id": 1, "playlists": []}
def _normalize_playlist_store(data: Any) -> Dict[str, Any]:
if not isinstance(data, dict):
return _new_playlist_store()
normalized = dict(data)
try:
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
return {"next_id": 1, "playlists": []}
data.setdefault("next_id", 1)
data.setdefault("playlists", [])
if not isinstance(data["playlists"], list):
data["playlists"] = []
next_id = int(normalized.get("next_id") or 1)
except Exception:
next_id = 1
normalized["next_id"] = max(next_id, 1)
playlists = normalized.get("playlists")
normalized["playlists"] = playlists if isinstance(playlists, list) else []
return normalized
def _load_playlist_store(path: Path) -> Dict[str, Any]:
global _PLAYLIST_STORE_CACHE, _PLAYLIST_STORE_MTIME_NS
if not path.exists():
_PLAYLIST_STORE_CACHE = _new_playlist_store()
_PLAYLIST_STORE_MTIME_NS = None
return _PLAYLIST_STORE_CACHE
try:
current_mtime_ns = path.stat().st_mtime_ns
if (_PLAYLIST_STORE_CACHE is not None and
_PLAYLIST_STORE_MTIME_NS == current_mtime_ns):
return _PLAYLIST_STORE_CACHE
data = _normalize_playlist_store(json.loads(path.read_text(encoding="utf-8")))
_PLAYLIST_STORE_CACHE = data
_PLAYLIST_STORE_MTIME_NS = current_mtime_ns
return data
except Exception:
return {"next_id": 1, "playlists": []}
_PLAYLIST_STORE_CACHE = _new_playlist_store()
_PLAYLIST_STORE_MTIME_NS = None
return _PLAYLIST_STORE_CACHE
def _save_playlist_store(path: Path, data: Dict[str, Any]) -> bool:
global _PLAYLIST_STORE_CACHE, _PLAYLIST_STORE_MTIME_NS
try:
normalized = _normalize_playlist_store(data)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2), encoding="utf-8")
path.write_text(json.dumps(normalized, indent=2), encoding="utf-8")
_PLAYLIST_STORE_CACHE = normalized
_PLAYLIST_STORE_MTIME_NS = path.stat().st_mtime_ns
return True
except Exception:
return False
@@ -559,7 +598,7 @@ def _extract_store_and_hash(item: Any) -> tuple[Optional[str], Optional[str]]:
else:
text = getattr(item, "path", None) or getattr(item, "url", None)
if text:
m = re.search(r"[0-9a-f]{64}", str(text).lower())
m = _SHA256_RE.search(str(text).lower())
if m:
file_hash = m.group(0)
except Exception:
@@ -707,7 +746,7 @@ def _extract_title_from_item(item: Dict[str, Any]) -> str:
try:
# Extract title from #EXTINF:-1,Title
# Use regex to find title between #EXTINF:-1, and newline
match = re.search(r"#EXTINF:-1,(.*?)(?:\n|\r|$)", filename)
match = _EXTINF_TITLE_RE.search(filename)
if match:
extracted_title = match.group(1).strip()
if not title or title == "memory://":
@@ -817,7 +856,7 @@ def _normalize_playlist_path(text: Optional[str]) -> Optional[str]:
return None
# If it's already a bare hydrus hash, use it directly
lower_real = real.lower()
if re.fullmatch(r"[0-9a-f]{64}", lower_real):
if _SHA256_FULL_RE.fullmatch(lower_real):
return lower_real
# If it's a hydrus file URL, normalize to the hash for dedupe
@@ -829,7 +868,7 @@ def _normalize_playlist_path(text: Optional[str]) -> Optional[str]:
if parsed.path.endswith("/get_files/file"):
qs = parse_qs(parsed.query)
h = qs.get("hash", [None])[0]
if h and re.fullmatch(r"[0-9a-f]{64}", h.lower()):
if h and _SHA256_FULL_RE.fullmatch(h.lower()):
return h.lower()
except Exception:
pass
@@ -862,7 +901,7 @@ def _infer_store_from_playlist_item(
target = memory_target
# Hydrus hashes: bare 64-hex entries
if re.fullmatch(r"[0-9a-f]{64}", target.lower()):
if _SHA256_FULL_RE.fullmatch(target.lower()):
# If we have file_storage, query each Hydrus instance to find which one has this hash
if file_storage:
hash_str = target.lower()
@@ -877,7 +916,7 @@ def _infer_store_from_playlist_item(
if lower.startswith("hydrus://"):
# Extract hash from hydrus:// URL if possible
if file_storage:
hash_match = re.search(r"[0-9a-f]{64}", target.lower())
hash_match = _SHA256_RE.search(target.lower())
if hash_match:
hash_str = hash_match.group(0)
hydrus_instance = _find_hydrus_instance_for_hash(hash_str, file_storage)
@@ -886,9 +925,7 @@ def _infer_store_from_playlist_item(
return "hydrus"
# Windows / UNC paths
if re.match(r"^[a-z]:[\\/]",
target,
flags=re.IGNORECASE) or target.startswith("\\\\"):
if _WINDOWS_PATH_RE.match(target) or target.startswith("\\\\"):
return "local"
# file:// url
@@ -918,7 +955,7 @@ def _infer_store_from_playlist_item(
# Hydrus API URL - try to extract hash and find instance
if file_storage:
# Try to extract hash from URL parameters
hash_match = re.search(r"hash=([0-9a-f]{64})", target.lower())
hash_match = _HASH_QUERY_RE.search(target.lower())
if hash_match:
hash_str = hash_match.group(1)
hydrus_instance = _find_hydrus_instance_for_hash(hash_str, file_storage)
@@ -929,10 +966,10 @@ def _infer_store_from_playlist_item(
if hydrus_instance:
return hydrus_instance
return "hydrus"
if re.match(r"^\d+\.\d+\.\d+\.\d+$", host_stripped) and "get_files" in path:
if _IPV4_RE.match(host_stripped) and "get_files" in path:
# IP-based Hydrus URL
if file_storage:
hash_match = re.search(r"hash=([0-9a-f]{64})", target.lower())
hash_match = _HASH_QUERY_RE.search(target.lower())
if hash_match:
hash_str = hash_match.group(1)
hydrus_instance = _find_hydrus_instance_for_hash(hash_str, file_storage)
@@ -1002,7 +1039,7 @@ def _is_hydrus_path(path: str, hydrus_url: Optional[str]) -> bool:
pass
if "get_files" in path_part or "file?hash=" in path_part:
return True
if re.match(r"^\d+\.\d+\.\d+\.\d+$", host) and "get_files" in path_part:
if _IPV4_RE.match(host) and "get_files" in path_part:
return True
return False
@@ -1493,7 +1530,7 @@ def _queue_items(
# Set it via IPC before loadfile so the currently running MPV can play the manifest.
try:
target_str = str(target or "")
if re.search(r"\.mpd($|\?)", target_str.lower()):
if _MPD_PATH_RE.search(target_str.lower()):
_send_ipc_command(
{
"command": [
@@ -1556,8 +1593,9 @@ def _queue_items(
if target:
# If we just have a hydrus hash, build a direct file URL for MPV
if re.fullmatch(r"[0-9a-f]{64}",
str(target).strip().lower()) and effective_hydrus_url:
if _SHA256_FULL_RE.fullmatch(
str(target).strip().lower()
) and effective_hydrus_url:
target = (
f"{effective_hydrus_url.rstrip('/')}/get_files/file?hash={str(target).strip()}"
)
@@ -2337,7 +2375,7 @@ def _run(result: Any, args: Sequence[str], config: Dict[str, Any]) -> int:
# Check if it's a Hydrus URL
if "get_files/file" in real_path or "hash=" in real_path:
# Extract hash from Hydrus URL
hash_match = re.search(r"hash=([0-9a-f]{64})", real_path.lower())
hash_match = _HASH_QUERY_RE.search(real_path.lower())
if hash_match:
file_hash = hash_match.group(1)
# Try to find which Hydrus instance has this file
@@ -2576,7 +2614,7 @@ def _start_mpv(
candidate = it.get("path") or it.get("url")
else:
candidate = getattr(it, "path", None) or getattr(it, "url", None)
if candidate and re.search(r"\.mpd($|\?)", str(candidate).lower()):
if candidate and _MPD_PATH_RE.search(str(candidate).lower()):
needs_mpd_whitelist = True
break
if needs_mpd_whitelist:

View File

@@ -7,6 +7,16 @@ from SYS.cmdlet_spec import Cmdlet
from SYS import pipeline as ctx
from SYS.result_table import Table
from SYS.logger import set_debug, debug
from cmdnat._status_shared import (
add_startup_check as _add_startup_check,
default_provider_ping_targets as _default_provider_ping_targets,
has_provider as _has_provider,
has_store_subtype as _has_store_subtype,
has_tool as _has_tool,
ping_first as _ping_first,
ping_url as _ping_url,
provider_display_name as _provider_display_name,
)
CMDLET = Cmdlet(
name=".status",
@@ -15,91 +25,6 @@ CMDLET = Cmdlet(
arg=[],
)
def _upper(value: Any) -> str:
text = "" if value is None else str(value)
return text.upper()
def _add_startup_check(
table: Table,
status: str,
name: str,
*,
provider: str = "",
store: str = "",
files: int | str | None = None,
detail: str = "",
) -> None:
row = table.add_row()
row.add_column("STATUS", _upper(status))
row.add_column("NAME", _upper(name))
row.add_column("PROVIDER", _upper(provider or ""))
row.add_column("STORE", _upper(store or ""))
row.add_column("FILES", "" if files is None else str(files))
row.add_column("DETAIL", _upper(detail or ""))
def _has_store_subtype(cfg: dict, subtype: str) -> bool:
store_cfg = cfg.get("store")
if not isinstance(store_cfg, dict):
return False
bucket = store_cfg.get(subtype)
if not isinstance(bucket, dict):
return False
return any(isinstance(v, dict) and bool(v) for v in bucket.values())
def _has_provider(cfg: dict, name: str) -> bool:
provider_cfg = cfg.get("provider")
if not isinstance(provider_cfg, dict):
return False
block = provider_cfg.get(str(name).strip().lower())
return isinstance(block, dict) and bool(block)
def _has_tool(cfg: dict, name: str) -> bool:
tool_cfg = cfg.get("tool")
if not isinstance(tool_cfg, dict):
return False
block = tool_cfg.get(str(name).strip().lower())
return isinstance(block, dict) and bool(block)
def _ping_url(url: str, timeout: float = 3.0) -> tuple[bool, str]:
try:
from API.HTTP import HTTPClient
with HTTPClient(timeout=timeout, retries=1) as client:
resp = client.get(url, allow_redirects=True)
code = int(getattr(resp, "status_code", 0) or 0)
ok = 200 <= code < 500
return ok, f"{url} (HTTP {code})"
except Exception as exc:
return False, f"{url} ({type(exc).__name__})"
def _provider_display_name(key: str) -> str:
k = (key or "").strip()
low = k.lower()
if low == "openlibrary": return "OpenLibrary"
if low == "alldebrid": return "AllDebrid"
if low == "youtube": return "YouTube"
return k[:1].upper() + k[1:] if k else "Provider"
def _default_provider_ping_targets(provider_key: str) -> list[str]:
prov = (provider_key or "").strip().lower()
if prov == "openlibrary": return ["https://openlibrary.org"]
if prov == "youtube": return ["https://www.youtube.com"]
if prov == "bandcamp": return ["https://bandcamp.com"]
if prov == "libgen":
try:
from Provider.libgen import MIRRORS
return [str(x).rstrip("/") + "/json.php" for x in (MIRRORS or []) if str(x).strip()]
except ImportError: return []
return []
def _ping_first(urls: list[str]) -> tuple[bool, str]:
for u in urls:
ok, detail = _ping_url(u)
if ok: return True, detail
if urls:
ok, detail = _ping_url(urls[0])
return ok, detail
return False, "No ping target"
def _run(result: Any, args: List[str], config: Dict[str, Any]) -> int:
startup_table = Table(
"*********<IGNITIO>*********<NOUSEMPEH>*********<RUGRAPOG>*********<OMEGHAU>*********"

View File

@@ -8,28 +8,10 @@ from SYS.cmdlet_spec import Cmdlet, CmdletArg
from SYS.logger import log
from SYS.result_table import Table
from SYS import pipeline as ctx
from cmdnat._parsing import has_flag as _has_flag, normalize_to_list as _normalize_to_list
_TELEGRAM_PENDING_ITEMS_KEY = "telegram_pending_items"
def _has_flag(args: Sequence[str], flag: str) -> bool:
try:
want = str(flag or "").strip().lower()
if not want:
return False
return any(str(a).strip().lower() == want for a in (args or []))
except Exception:
return False
def _normalize_to_list(value: Any) -> List[Any]:
if value is None:
return []
if isinstance(value, list):
return value
return [value]
def _extract_chat_id(chat_obj: Any) -> Optional[int]:
try:
if isinstance(chat_obj, dict):