This commit is contained in:
2026-01-22 04:08:07 -08:00
parent 88a537d9f7
commit cd7f03e592
4 changed files with 238 additions and 487 deletions

14
CLI.py
View File

@@ -28,20 +28,6 @@ if not os.environ.get("MM_DEBUG"):
val = str(row[0]).strip().lower()
if val in ("1", "true", "yes", "on"):
os.environ["MM_DEBUG"] = "1"
# Fallback to legacy config.conf if not set by DB
if not os.environ.get("MM_DEBUG"):
conf_path = Path(__file__).resolve().parent / "config.conf"
if conf_path.exists():
for ln in conf_path.read_text(encoding="utf-8").splitlines():
ln_strip = ln.strip()
if ln_strip.startswith("debug"):
parts = ln_strip.split("=", 1)
if len(parts) >= 2:
val = parts[1].strip().strip('"').strip("'").strip().lower()
if val in ("1", "true", "yes", "on"):
os.environ["MM_DEBUG"] = "1"
break
except Exception:
pass

View File

@@ -39,321 +39,78 @@ def clear_config_cache() -> None:
_CONFIG_CACHE.clear()
def _strip_inline_comment(line: str) -> str:
# Strip comments in a way that's friendly to common .conf usage:
# - Full-line comments starting with '#' or ';'
# - Inline comments starting with '#' or ';' *outside quotes*
# (e.g. dtype="float16" # optional)
stripped = line.strip()
if not stripped:
return ""
if stripped.startswith("#") or stripped.startswith(";"):
return ""
in_single = False
in_double = False
for i, ch in enumerate(line):
if ch == "'" and not in_double:
in_single = not in_single
continue
if ch == '"' and not in_single:
in_double = not in_double
continue
if in_single or in_double:
continue
if ch in {"#", ";"}:
# Treat as a comment start only when preceded by whitespace.
# This keeps values like paths or tokens containing '#' working
# when quoted, and reduces surprises for unquoted values.
if i == 0 or line[i - 1].isspace():
return line[:i].rstrip()
return line
def reload_config(
config_dir: Optional[Path] = None, filename: str = "medios.db"
) -> Dict[str, Any]:
_CONFIG_CACHE.pop("db_config", None)
return load_config(config_dir=config_dir, filename=filename)
def _parse_scalar(value: str) -> Any:
v = value.strip()
if not v:
return ""
def load_config(
config_dir: Optional[Path] = None, filename: str = "medios.db"
) -> Dict[str, Any]:
# We no longer use config_dir or filename for the config file itself,
# but we keep them in the signature for backward compatibility.
cache_key = "db_config"
if cache_key in _CONFIG_CACHE:
return _CONFIG_CACHE[cache_key]
if (v.startswith('"') and v.endswith('"')) or (v.startswith("'") and v.endswith("'")):
return v[1:-1]
# Load from database
try:
from SYS.database import get_config_all
db_config = get_config_all()
if db_config:
_CONFIG_CACHE[cache_key] = db_config
return db_config
except Exception:
pass
low = v.lower()
if low in {"true", "yes", "on", "1"}:
return True
if low in {"false", "no", "off", "0"}:
return False
if re.fullmatch(r"-?\d+", v):
try:
return int(v)
except Exception:
return v
if re.fullmatch(r"-?\d+\.\d+", v):
try:
return float(v)
except Exception:
return v
return v
return {}
def _set_nested(d: Dict[str, Any], dotted_key: str, value: Any) -> None:
parts = [p for p in dotted_key.split(".") if p]
if not parts:
return
cur: Dict[str, Any] = d
for p in parts[:-1]:
nxt = cur.get(p)
if not isinstance(nxt, dict):
nxt = {}
cur[p] = nxt
cur = nxt
cur[parts[-1]] = value
def _merge_dict_inplace(base: Dict[str, Any], patch: Dict[str, Any]) -> Dict[str, Any]:
for k, v in patch.items():
if isinstance(v, dict) and isinstance(base.get(k), dict):
_merge_dict_inplace(base[k], v) # type: ignore[index]
else:
base[k] = v
return base
def _apply_conf_block(
config: Dict[str, Any], kind: str, subtype: str, block: Dict[str, Any]
def save_config(
config: Dict[str, Any],
config_dir: Optional[Path] = None,
filename: str = "medios.db",
) -> None:
kind_l = str(kind).strip().lower()
subtype_l = str(subtype).strip().lower()
if kind_l == "store":
# Store instances are keyed by NAME (preferred). If a block uses `name=...`,
# normalize it into NAME to keep a single canonical key.
name = block.get("NAME")
if not name:
name = block.get("name")
if name:
block = dict(block)
block.pop("name", None)
block["NAME"] = name
if not name:
return
name_l = str(name).strip().lower()
payload = dict(block)
store = config.setdefault("store", {})
if not isinstance(store, dict):
config["store"] = {}
store = config["store"]
bucket = store.setdefault(subtype_l, {})
if not isinstance(bucket, dict):
store[subtype_l] = {}
bucket = store[subtype_l]
existing = bucket.get(name_l)
if isinstance(existing, dict):
_merge_dict_inplace(existing, payload)
else:
bucket[name_l] = payload
return
if kind_l == "provider":
provider_name = str(subtype).strip().lower()
provider = config.setdefault("provider", {})
if not isinstance(provider, dict):
config["provider"] = {}
provider = config["provider"]
existing = provider.get(provider_name)
if isinstance(existing, dict):
_merge_dict_inplace(existing, block)
else:
provider[provider_name] = dict(block)
return
if kind_l == "tool":
tool_name = str(subtype).strip().lower()
if not tool_name:
return
tool = config.setdefault("tool", {})
if not isinstance(tool, dict):
config["tool"] = {}
tool = config["tool"]
existing = tool.get(tool_name)
if isinstance(existing, dict):
_merge_dict_inplace(existing, block)
else:
tool[tool_name] = dict(block)
return
return
def parse_conf_text(text: str, *, base: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Parse a lightweight .conf format into the app's config dict.
Supported patterns:
- Top-level key/value: temp="./temp"
- Sections: [store=hydrusnetwork] + name/access key/url lines
- Sections: [provider=OpenLibrary] + email/password lines
- Dotted keys: store.hydrusnetwork.<name>.url="http://..." (optional)
"""
config: Dict[str, Any] = dict(base or {})
current_kind: Optional[str] = None
current_subtype: Optional[str] = None
current_block: Dict[str, Any] = {}
def flush() -> None:
nonlocal current_kind, current_subtype, current_block
if current_kind and current_subtype and current_block:
_apply_conf_block(config, current_kind, current_subtype, current_block)
current_kind = None
current_subtype = None
current_block = {}
for raw_line in text.splitlines():
line = _strip_inline_comment(raw_line)
if not line.strip():
continue
stripped = line.strip()
if stripped.startswith("[") and stripped.endswith("]"):
flush()
header = stripped[1:-1].strip()
if "=" in header:
k, v = header.split("=", 1)
current_kind = k.strip()
current_subtype = v.strip()
"""Persist configuration to the database."""
try:
from SYS.database import save_config_value
for key, value in config.items():
if key in ('store', 'provider', 'tool'):
if isinstance(value, dict):
for subtype, instances in value.items():
if isinstance(instances, dict):
# provider/tool are usually config[cat][subtype][key]
# but store is config['store'][subtype][name][key]
if key == 'store':
for name, settings in instances.items():
if isinstance(settings, dict):
for k, v in settings.items():
save_config_value(key, subtype, name, k, v)
else:
for k, v in instances.items():
save_config_value(key, subtype, "default", k, v)
else:
# Unknown header style; ignore block
current_kind = None
current_subtype = None
continue
# global settings
if not key.startswith("_"):
save_config_value("global", "none", "none", key, value)
except Exception as e:
log(f"Failed to save config to database: {e}")
if "=" not in stripped:
continue
key, value = stripped.split("=", 1)
key = key.strip()
parsed_val = _parse_scalar(value)
if current_kind and current_subtype:
current_block[key] = parsed_val
else:
if "." in key:
_set_nested(config, key, parsed_val)
else:
config[key] = parsed_val
flush()
return config
_CONFIG_CACHE["db_config"] = config
def _load_conf_config(base_dir: Path, config_path: Path) -> Dict[str, Any]:
config: Dict[str, Any] = {}
raw = config_path.read_text(encoding="utf-8")
config = parse_conf_text(raw, base=config)
conf_dir = base_dir / "config.d"
if conf_dir.exists() and conf_dir.is_dir():
for frag in sorted(conf_dir.glob("*.conf")):
try:
frag_raw = frag.read_text(encoding="utf-8")
config = parse_conf_text(frag_raw, base=config)
except OSError as exc:
log(f"Failed to read {frag}: {exc}")
return config
def load() -> Dict[str, Any]:
"""Return the parsed configuration from database."""
return load_config()
def _format_conf_value(val: Any) -> str:
if isinstance(val, bool):
return "true" if val else "false"
if isinstance(val, (int, float)):
return str(val)
if val is None:
return '""'
s = str(val)
s = s.replace('"', '\\"')
return f'"{s}"'
def _serialize_conf(config: Dict[str, Any]) -> str:
lines: list[str] = []
# Top-level scalars first
for key in sorted(config.keys()):
if key in {"store", "provider", "tool", "networking"}:
continue
value = config.get(key)
if isinstance(value, dict):
continue
lines.append(f"{key}={_format_conf_value(value)}")
# Store blocks
store = config.get("store")
if isinstance(store, dict):
for subtype in sorted(store.keys()):
bucket = store.get(subtype)
if not isinstance(bucket, dict):
continue
for name in sorted(bucket.keys()):
block = bucket.get(name)
if not isinstance(block, dict):
continue
lines.append("")
lines.append(f"[store={subtype}]")
lines.append(f"name={_format_conf_value(name)}")
# Deduplicate keys case-insensitively and skip "name"
seen_keys = {"NAME", "name"}
for k in sorted(block.keys()):
k_upper = k.upper()
if k_upper in seen_keys:
continue
seen_keys.add(k_upper)
lines.append(f"{k}={_format_conf_value(block.get(k))}")
# Provider blocks
provider = config.get("provider")
if isinstance(provider, dict):
for prov in sorted(provider.keys()):
block = provider.get(prov)
if not isinstance(block, dict):
continue
lines.append("")
lines.append(f"[provider={prov}]")
seen_keys = set()
for k in sorted(block.keys()):
k_upper = k.upper()
if k_upper in seen_keys:
continue
seen_keys.add(k_upper)
lines.append(f"{k}={_format_conf_value(block.get(k))}")
# Tool blocks
tool = config.get("tool")
if isinstance(tool, dict):
for name in sorted(tool.keys()):
block = tool.get(name)
if not isinstance(block, dict):
continue
lines.append("")
lines.append(f"[tool={name}]")
seen_keys = set()
for k in sorted(block.keys()):
k_upper = k.upper()
if k_upper in seen_keys:
continue
seen_keys.add(k_upper)
lines.append(f"{k}={_format_conf_value(block.get(k))}")
return "\n".join(lines).rstrip() + "\n"
def save(config: Dict[str, Any]) -> None:
"""Persist *config* back to database."""
save_config(config)
def _make_cache_key(config_dir: Optional[Path], filename: str, actual_path: Optional[Path]) -> str:

View File

@@ -603,8 +603,8 @@ def main() -> int:
# If the current interpreter is the one inside the local venv, try to
# run the uninstall via a Python outside the venv so files (including
# the interpreter binary) can be removed on Windows.
current_exe = Path(sys.executable).resolve()
try:
current_exe = Path(sys.executable).resolve()
in_venv = str(current_exe).lower().startswith(str(vdir.resolve()).lower())
except Exception:
in_venv = False
@@ -751,7 +751,6 @@ def main() -> int:
if db_path.exists():
try:
import sqlite3
import json
with sqlite3.connect(str(db_path)) as conn:
# We want to set store.hydrusnetwork.hydrus.<key>
cur = conn.cursor()
@@ -940,7 +939,7 @@ def main() -> int:
try:
# When piped, script_path is None. We don't want to use the detected repo_root
# because that's just CWD.
if script_path is not None:
if script_path is not None and repo_root is not None:
default_install = repo_root
else:
# When piped, default to home folder on POSIX, CWD on Windows
@@ -964,6 +963,11 @@ def main() -> int:
# Resolve while expanding user paths (~) and environment variables ($HOME)
expanded = os.path.expandvars(os.path.expanduser(install_dir_raw))
install_path = Path(expanded).resolve()
if install_path is None:
print("Error: Could not determine installation path.", file=sys.stderr)
return False
except (EOFError, KeyboardInterrupt):
return False
@@ -1143,153 +1147,166 @@ def main() -> int:
# If no specific action flag is passed and we're in a terminal (or we're being piped), show the menu
if (sys.stdin.isatty() or sys.stdout.isatty() or script_path is None) and not args.quiet:
sel = _interactive_menu()
if sel == "install":
if not _ensure_repo_available():
return 1
args.skip_deps = False
args.install_editable = True
args.no_playwright = False
elif sel == "extras_hydrus":
install_location = _prompt_hydrus_install_location()
if install_location is None:
return 0
install_root, install_dest = install_location
# Choice 2 is for installing HydrusNetwork standalone/independently.
# We preferentially use the local script if already in a repo.
hydrus_script = None
temp_installer_path: Path | None = None
temp_hydrus_repo: Path | None = None
if is_in_repo and repo_root:
hydrus_script = repo_root / "scripts" / "hydrusnetwork.py"
if not hydrus_script or not hydrus_script.exists():
print("Downloading the Hydrus installation helper...")
try:
fd, path = tempfile.mkstemp(prefix="mm_hydrus_", suffix=".py")
os.close(fd)
helper_path = Path(path)
if _download_hydrus_installer(helper_path):
hydrus_script = helper_path
temp_installer_path = helper_path
else:
helper_path.unlink(missing_ok=True)
while True:
sel = _interactive_menu()
if sel == "install":
if not _ensure_repo_available():
return 1
args.skip_deps = False
args.install_editable = True
args.no_playwright = False
# Break the loop to proceed with the main installation steps below
break
elif sel == "extras_hydrus":
install_location = _prompt_hydrus_install_location()
if install_location is None:
continue
install_root, install_dest = install_location
# Choice 2 is for installing HydrusNetwork standalone/independently.
# We preferentially use the local script if already in a repo.
hydrus_script = None
temp_installer_path: Path | None = None
temp_hydrus_repo: Path | None = None
if is_in_repo and repo_root:
hydrus_script = repo_root / "scripts" / "hydrusnetwork.py"
if not hydrus_script or not hydrus_script.exists():
print("Downloading the Hydrus installation helper...")
try:
fd, path = tempfile.mkstemp(prefix="mm_hydrus_", suffix=".py")
os.close(fd)
helper_path = Path(path)
if _download_hydrus_installer(helper_path):
hydrus_script = helper_path
temp_installer_path = helper_path
else:
helper_path.unlink(missing_ok=True)
hydrus_script = None
except Exception as e:
print(f"Error setting up temporary installer: {e}")
hydrus_script = None
except Exception as e:
print(f"Error setting up temporary installer: {e}")
hydrus_script = None
if (not hydrus_script or not hydrus_script.exists()) and temp_hydrus_repo is None:
print("Falling back to clone the Medios-Macina repository to obtain the helper script...")
try:
temp_mm_repo_dir = Path(tempfile.mkdtemp(prefix="mm_repo_"))
if _clone_repo(REPO_URL, temp_mm_repo_dir, depth=1):
hydrus_script = temp_mm_repo_dir / "scripts" / "hydrusnetwork.py"
temp_hydrus_repo = temp_mm_repo_dir
else:
shutil.rmtree(temp_mm_repo_dir, ignore_errors=True)
if (not hydrus_script or not hydrus_script.exists()) and temp_hydrus_repo is None:
print("Falling back to clone the Medios-Macina repository to obtain the helper script...")
try:
temp_mm_repo_dir = Path(tempfile.mkdtemp(prefix="mm_repo_"))
if _clone_repo(REPO_URL, temp_mm_repo_dir, depth=1):
hydrus_script = temp_mm_repo_dir / "scripts" / "hydrusnetwork.py"
temp_hydrus_repo = temp_mm_repo_dir
else:
shutil.rmtree(temp_mm_repo_dir, ignore_errors=True)
hydrus_script = None
except Exception as e:
print(f"Error cloning Medios-Macina repo: {e}")
hydrus_script = None
except Exception as e:
print(f"Error cloning Medios-Macina repo: {e}")
hydrus_script = None
if hydrus_script and hydrus_script.exists():
try:
# Clear out project-venv related env vars to prevent auto-reexec
env = os.environ.copy()
env.pop("VIRTUAL_ENV", None)
env.pop("PYTHONHOME", None)
env.pop("PYTHONPATH", None)
# We use sys.executable (the one running bootstrap.py) to run hydrusnetwork.py
# This ensures it uses the same environment that started the bootstrap.
# Pass sys.stdin to ensure the subprocess can talk to the terminal.
subprocess.check_call(
[
sys.executable,
str(hydrus_script),
"--no-project-venv",
"--root",
str(install_root),
"--dest-name",
install_dest,
],
env=env,
stdin=sys.stdin
)
except subprocess.CalledProcessError:
print("\nHydrusNetwork setup exited with an error.")
except Exception as e:
print(f"\nFailed to run HydrusNetwork setup: {e}")
finally:
if temp_installer_path:
temp_installer_path.unlink(missing_ok=True)
if temp_hydrus_repo is not None:
shutil.rmtree(temp_hydrus_repo, ignore_errors=True)
else:
print(f"\nError: {hydrus_script} not found.")
return 0
elif sel == "install_service":
# Direct path input for the target repository
print("\n[ SYSTEM SERVICE INSTALLATION ]")
print("Enter the root directory of the Hydrus repository you want to run as a service.")
print("This is the folder containing 'hydrus_client.py'.")
# Default to repo_root/hydrusnetwork if available, otherwise CWD
default_path = repo_root / "hydrusnetwork" if repo_root else Path.cwd()
sys.stdout.write(f"Repository Root [{default_path}]: ")
sys.stdout.flush()
path_raw = sys.stdin.readline().strip()
target_repo = Path(path_raw).resolve() if path_raw else default_path
if hydrus_script and hydrus_script.exists():
try:
# Clear out project-venv related env vars to prevent auto-reexec
env = os.environ.copy()
env.pop("VIRTUAL_ENV", None)
env.pop("PYTHONHOME", None)
env.pop("PYTHONPATH", None)
# We use sys.executable (the one running bootstrap.py) to run hydrusnetwork.py
# This ensures it uses the same environment that started the bootstrap.
# Pass sys.stdin to ensure the subprocess can talk to the terminal.
subprocess.check_call(
[
sys.executable,
str(hydrus_script),
"--no-project-venv",
"--root",
str(install_root),
"--dest-name",
install_dest,
],
env=env,
stdin=sys.stdin
)
# Update the main project's config with the new Hydrus path
if is_in_repo and repo_root:
_update_config_value(repo_root, "gitclone", str(Path(install_root) / install_dest))
except subprocess.CalledProcessError:
print("\nHydrusNetwork setup exited with an error.")
except Exception as e:
print(f"\nFailed to run HydrusNetwork setup: {e}")
finally:
if temp_installer_path:
temp_installer_path.unlink(missing_ok=True)
if temp_hydrus_repo is not None:
shutil.rmtree(temp_hydrus_repo, ignore_errors=True)
else:
print(f"\nError: {hydrus_script} not found.")
print("\nHydrus installation task finished.")
sys.stdout.write("Press Enter to return to menu...")
sys.stdout.flush()
sys.stdin.readline()
continue
elif sel == "install_service":
# Direct path input for the target repository
print("\n[ SYSTEM SERVICE INSTALLATION ]")
print("Enter the root directory of the Hydrus repository you want to run as a service.")
print("This is the folder containing 'hydrus_client.py'.")
# Default to repo_root/hydrusnetwork if available, otherwise CWD
default_path = repo_root / "hydrusnetwork" if repo_root else Path.cwd()
sys.stdout.write(f"Repository Root [{default_path}]: ")
sys.stdout.flush()
path_raw = sys.stdin.readline().strip()
target_repo = Path(path_raw).resolve() if path_raw else default_path
if not (target_repo / "hydrus_client.py").exists():
print(f"\n[!] Error: 'hydrus_client.py' not found in: {target_repo}")
print(" Please ensure you've entered the correct repository root.")
if not (target_repo / "hydrus_client.py").exists():
print(f"\n[!] Error: 'hydrus_client.py' not found in: {target_repo}")
print(" Please ensure you've entered the correct repository root.")
sys.stdout.write("\nPress Enter to return to menu...")
sys.stdout.flush()
sys.stdin.readline()
continue
run_client_script = repo_root / "scripts" / "run_client.py" if repo_root else Path(__file__).parent / "run_client.py"
if run_client_script.exists():
try:
# We pass --repo-root explicitly to the target_repo provided by the user
subprocess.check_call(
[
sys.executable,
str(run_client_script),
"--install-service",
"--service-name", "hydrus-client",
"--repo-root", str(target_repo),
"--headless",
"--pull"
],
stdin=sys.stdin
)
print("\nHydrus System service installed successfully.")
except subprocess.CalledProcessError:
print("\nService installation failed.")
except Exception as e:
print(f"\nError installing service: {e}")
else:
print(f"\nError: {run_client_script} not found.")
sys.stdout.write("\nPress Enter to return to menu...")
sys.stdout.flush()
sys.stdin.readline()
return "menu"
run_client_script = repo_root / "scripts" / "run_client.py" if repo_root else Path(__file__).parent / "run_client.py"
if run_client_script.exists():
try:
# We pass --repo-root explicitly to the target_repo provided by the user
subprocess.check_call(
[
sys.executable,
str(run_client_script),
"--install-service",
"--service-name", "hydrus-client",
"--repo-root", str(target_repo),
"--headless",
"--pull"
],
stdin=sys.stdin
)
print("\nHydrus System service installed successfully.")
except subprocess.CalledProcessError:
print("\nService installation failed.")
except Exception as e:
print(f"\nError installing service: {e}")
else:
print(f"\nError: {run_client_script} not found.")
sys.stdout.write("\nPress Enter to continue...")
sys.stdout.flush()
sys.stdin.readline()
return "menu"
elif sel == "uninstall":
return _do_uninstall()
elif sel == "delegate":
rc = run_platform_bootstrap(repo_root)
if rc != 0:
return rc
if not args.quiet:
print("Platform bootstrap completed successfully.")
return 0
elif sel == 0:
return 0
continue
elif sel == "uninstall":
return _do_uninstall()
elif sel == "delegate":
rc = run_platform_bootstrap(repo_root)
if rc != 0:
return rc
if not args.quiet:
print("Platform bootstrap completed successfully.")
return 0
elif sel == 0:
return 0
elif sel == "menu":
continue
elif not args.no_delegate and script_path is not None:
# Default non-interactive behavior: delegate to platform script
rc = run_platform_bootstrap(repo_root)
@@ -1742,7 +1759,14 @@ if (Test-Path (Join-Path $repo 'CLI.py')) {
else:
# POSIX
# If running as root (id 0), prefer /usr/bin or /usr/local/bin which are standard on PATH
if hasattr(os, "getuid") and os.getuid() == 0:
is_root = False
try:
if platform.system().lower() != "windows" and os.getuid() == 0:
is_root = True
except (AttributeError, Exception):
pass
if is_root:
user_bin = Path("/usr/local/bin")
if not os.access(user_bin, os.W_OK):
user_bin = Path("/usr/bin")

View File

@@ -192,34 +192,6 @@ def update_medios_config(hydrus_path: Path) -> bool:
except Exception as e:
logging.error("Failed to update medios.db: %s", e)
# Fallback to config.conf
if not config_path.exists():
logging.debug("MM config.conf not found at %s; skipping legacy auto-link.", config_path)
return False
try:
content = config_path.read_text(encoding="utf-8")
key = "gitclone"
value = hydrus_abs_path
# Pattern to replace existing gitclone in the hydrusnetwork section
pattern = rf'^(\s*{re.escape(key)}\s*=\s*)(.*)$'
if re.search(pattern, content, flags=re.MULTILINE):
new_content = re.sub(pattern, rf'\1"{value}"', content, flags=re.MULTILINE)
else:
section_pattern = r'\[store=hydrusnetwork\]'
if re.search(section_pattern, content):
new_content = re.sub(section_pattern, f'[store=hydrusnetwork]\n{key}="{value}"', content, count=1)
else:
new_content = content + f'\n\n[store=hydrusnetwork]\nname="hydrus"\n{key}="{value}"'
if new_content != content:
config_path.write_text(new_content, encoding="utf-8")
logging.info("✅ Linked Hydrus installation in Medios-Macina config (gitclone=\"%s\")", value)
return True
except Exception as e:
logging.error("Failed to update config.conf: %s", e)
return False
return False
@@ -1124,6 +1096,7 @@ def main(argv: Optional[list[str]] = None) -> int:
"show-in-file-manager": "showinfm",
"opencv-python-headless": "cv2",
"mpv": "mpv",
"python-mpv": "mpv",
"pyside6": "PySide6",
"pyside6-essentials": "PySide6",
"pyside6-addons": "PySide6",
@@ -1140,6 +1113,11 @@ def main(argv: Optional[list[str]] = None) -> int:
stderr=subprocess.DEVNULL,
)
except Exception:
if mod == "mpv":
# python-mpv requires system libmpv; failure is common on server/headless envs
logging.info("Package '%s' is installed, but 'import %s' failed (likely missing system libmpv). This is usually non-critical.", pkg, mod)
continue
logging.warning(
"Package '%s' not importable inside venv (module %s)",
pkg,
@@ -1439,6 +1417,7 @@ def main(argv: Optional[list[str]] = None) -> int:
"show-in-file-manager": "showinfm",
"opencv-python-headless": "cv2",
"mpv": "mpv",
"python-mpv": "mpv",
"pyside6": "PySide6",
"pyside6-essentials": "PySide6",
"pyside6-addons": "PySide6",
@@ -1476,6 +1455,11 @@ def main(argv: Optional[list[str]] = None) -> int:
stderr=subprocess.DEVNULL,
)
except subprocess.CalledProcessError:
if import_name == "mpv":
# python-mpv requires system libmpv; failure is common on server/headless envs
logging.info("Package '%s' is installed, but 'import %s' failed (likely missing system libmpv). This is usually non-critical.", pkg, import_name)
continue
logging.warning(
"Package '%s' appears installed but 'import %s' failed inside venv.",
pkg,