Files
Medios-Macina/scripts/hydrusnetwork.py
Nose cea208d3b9
Some checks failed
smoke-mm / Install & smoke test mm --help (push) Has been cancelled
klhj
2025-12-28 16:52:47 -08:00

566 lines
23 KiB
Python

#!/usr/bin/env python3
"""Create a 'hydrusnetwork' directory and clone the Hydrus repository into it.
Works on Linux and Windows. Behavior:
- By default creates ./hydrusnetwork and clones https://github.com/hydrusnetwork/hydrus there.
- If the target directory already exists:
- Use --update to run `git pull` (if it's a git repo).
- Use --force to remove it and re-clone.
- If `git` is not available, the script will fall back to downloading the repository ZIP and extracting it.
- By default the script will create a repository-local virtual environment `./<dest>/.venv` after cloning/extraction; use `--no-venv` to skip this. Use `--install-deps` to install requirements.txt into that venv.
Examples:
python scripts/hydrusnetwork.py
python scripts/hydrusnetwork.py --root /opt --dest-name hydrusnetwork --force
python scripts/hydrusnetwork.py --update
"""
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import sys
import tempfile
import urllib.request
import zipfile
from pathlib import Path
from typing import Optional, Tuple
import logging
logging.basicConfig(level=logging.INFO, format="%(message)s")
def find_git_executable() -> Optional[str]:
"""Return the git executable path or None if not found."""
import shutil as _shutil
git = _shutil.which("git")
if not git:
return None
# Quick sanity check
try:
subprocess.run([git, "--version"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return git
except Exception:
return None
def is_git_repo(path: Path) -> bool:
"""Determine whether the given path is a git working tree."""
if not path.exists() or not path.is_dir():
return False
if (path / ".git").exists():
return True
git = find_git_executable()
if not git:
return False
try:
subprocess.run([git, "-C", str(path), "rev-parse", "--is-inside-work-tree"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return True
except Exception:
return False
def run_git_clone(git: str, repo: str, dest: Path, branch: Optional[str] = None, depth: Optional[int] = None) -> None:
# Build git clone with options before the repository argument. Support shallow clones
# via --depth when requested.
cmd = [git, "clone"]
if depth is not None and depth > 0:
cmd += ["--depth", str(int(depth))]
# For performance/clarity, when doing a shallow clone of a specific branch,
# prefer --single-branch to avoid fetching other refs.
if branch:
cmd += ["--single-branch"]
if branch:
cmd += ["--branch", branch]
cmd += [repo, str(dest)]
logging.info("Cloning: %s -> %s (depth=%s)", repo, dest, str(depth) if depth else "full")
subprocess.run(cmd, check=True)
def run_git_pull(git: str, dest: Path) -> None:
logging.info("Updating git repository in %s", dest)
subprocess.run([git, "-C", str(dest), "pull"], check=True)
def download_and_extract_zip(repo_url: str, dest: Path, branch_candidates: Tuple[str, ...] = ("main", "master")) -> None:
"""Download the GitHub repo zip and extract it into dest.
This avoids requiring git to be installed.
"""
# By default, if a project virtualenv is detected (".venv" or "venv" under
# the chosen --root, or $VIRTUAL_ENV), the script will re-exec itself under
# that venv's python interpreter so subsequent operations use the project
# environment. Use --no-project-venv to opt out of this behavior.
# Parse owner/repo from URL like https://github.com/owner/repo
try:
from urllib.parse import urlparse
p = urlparse(repo_url)
parts = [p for p in p.path.split("/") if p]
if len(parts) < 2:
raise ValueError("Cannot parse owner/repo from URL")
owner, repo = parts[0], parts[1]
except Exception:
raise RuntimeError(f"Invalid repo URL: {repo_url}")
errors = []
for branch in branch_candidates:
zip_url = f"https://github.com/{owner}/{repo}/archive/refs/heads/{branch}.zip"
logging.info("Attempting ZIP download: %s", zip_url)
try:
with urllib.request.urlopen(zip_url) as resp:
if resp.status != 200:
raise RuntimeError(f"HTTP {resp.status} while fetching {zip_url}")
with tempfile.TemporaryDirectory() as td:
tmpzip = Path(td) / "repo.zip"
with open(tmpzip, "wb") as fh:
fh.write(resp.read())
with zipfile.ZipFile(tmpzip, "r") as z:
z.extractall(td)
# Extracted content usually at repo-<branch>/
extracted_root = None
td_path = Path(td)
for child in td_path.iterdir():
if child.is_dir():
extracted_root = child
break
if not extracted_root:
raise RuntimeError("Broken ZIP: no extracted directory found")
# Move contents of extracted_root into dest
dest.mkdir(parents=True, exist_ok=True)
for entry in extracted_root.iterdir():
target = dest / entry.name
if target.exists():
# Try to remove before moving
if target.is_dir():
shutil.rmtree(target)
else:
target.unlink()
shutil.move(str(entry), str(dest))
logging.info("Downloaded and extracted %s (branch: %s) into %s", repo_url, branch, dest)
return
except Exception as exc:
errors.append(str(exc))
logging.debug("ZIP download failed for branch %s: %s", branch, exc)
continue
# If we failed for all branches
raise RuntimeError(f"Failed to download zip for {repo_url}; errors: {errors}")
# --- Project venv helpers -------------------------------------------------
def get_python_in_venv(venv_dir: Path) -> Optional[Path]:
"""Return path to python executable inside a venv-like directory, or None."""
try:
v = Path(venv_dir)
# Windows
win_python = v / "Scripts" / "python.exe"
if win_python.exists():
return win_python
# Unix
unix_python = v / "bin" / "python"
if unix_python.exists():
return unix_python
unix_py3 = v / "bin" / "python3"
if unix_py3.exists():
return unix_py3
except Exception:
pass
return None
def find_project_venv(root: Path) -> Optional[Path]:
"""Find a project venv directory under the given root (or VIRTUAL_ENV).
Checks, in order: $VIRTUAL_ENV, <root>/.venv, <root>/venv
Returns the Path to the venv dir if it looks valid, else None.
"""
candidates = []
try:
venv_env = os.environ.get("VIRTUAL_ENV")
if venv_env:
candidates.append(Path(venv_env))
except Exception:
pass
candidates.extend([root / ".venv", root / "venv"]) # order matters: prefer .venv
for c in candidates:
try:
if c and c.exists():
py = get_python_in_venv(c)
if py is not None:
return c
except Exception:
continue
return None
def maybe_reexec_under_project_venv(root: Path, disable: bool = False) -> None:
"""If a project venv exists and we are not already running under it, re-exec
the current script using that venv's python interpreter.
This makes the script "use the project venv by default" when present.
"""
if disable:
return
# Avoid infinite re-exec loops
if os.environ.get("HYDRUSNETWORK_REEXEC") == "1":
return
try:
venv_dir = find_project_venv(root)
if not venv_dir:
return
py = get_python_in_venv(venv_dir)
if not py:
return
current = Path(sys.executable)
try:
# If current interpreter is the same as venv's python, skip.
if current.resolve() == py.resolve():
return
except Exception:
pass
logging.info("Re-executing under project venv: %s", py)
env = os.environ.copy()
env["HYDRUSNETWORK_REEXEC"] = "1"
# Use absolute script path to avoid any relative path quirks.
try:
script_path = Path(sys.argv[0]).resolve()
except Exception:
script_path = None
args = [str(py), str(script_path) if script_path is not None else sys.argv[0]] + sys.argv[1:]
logging.debug("Exec args: %s", args)
os.execvpe(str(py), args, env)
except Exception as exc:
logging.debug("Failed to re-exec under project venv: %s", exc)
return
# --- Permissions helpers -------------------------------------------------
def is_elevated() -> bool:
"""Return True if the current process is elevated (Windows) or running as root (Unix)."""
try:
if os.name == "nt":
import ctypes
try:
return bool(ctypes.windll.shell32.IsUserAnAdmin())
except Exception:
return False
else:
try:
return os.geteuid() == 0
except Exception:
return False
except Exception:
return False
def fix_permissions_windows(path: Path, user: Optional[str] = None) -> bool:
"""Attempt to set owner and grant FullControl via icacls/takeown.
Returns True if commands report success; otherwise False. May require elevation.
"""
import getpass
import subprocess
try:
if not user:
try:
who = subprocess.check_output(["whoami"], text=True).strip()
user = who or getpass.getuser()
except Exception:
user = getpass.getuser()
logging.info("Attempting Windows ownership/ACL fix for %s (owner=%s)", path, user)
# Try to take ownership (best-effort)
try:
subprocess.run(["takeown", "/F", str(path), "/R", "/D", "Y"], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except Exception:
pass
rc_setowner = 1
rc_grant = 1
try:
out = subprocess.run(["icacls", str(path), "/setowner", user, "/T", "/C"], check=False)
rc_setowner = int(out.returncode)
except Exception:
rc_setowner = 1
try:
out = subprocess.run(["icacls", str(path), "/grant", f"{user}:(OI)(CI)F", "/T", "/C"], check=False)
rc_grant = int(out.returncode)
except Exception:
rc_grant = 1
success = (rc_setowner == 0 or rc_grant == 0)
if success:
logging.info("Windows permission fix succeeded (owner/grant applied).")
else:
logging.warning("Windows permission fix did not fully succeed (setowner/grant may require elevation).")
return success
except Exception as exc:
logging.debug("Windows fix-permissions error: %s", exc)
return False
def fix_permissions_unix(path: Path, user: Optional[str] = None, group: Optional[str] = None) -> bool:
"""Attempt to chown/chmod recursively for a Unix-like system.
Returns True if operations were attempted; may still fail for some files if not root.
"""
import getpass
import pwd
import grp
import subprocess
try:
if not user:
user = getpass.getuser()
try:
pw = pwd.getpwnam(user)
uid = pw.pw_uid
gid = pw.pw_gid if not group else grp.getgrnam(group).gr_gid
except Exception:
logging.warning("Could not resolve user/group to uid/gid; skipping chown.")
return False
logging.info("Attempting to chown recursively to %s:%s (may require root)...", user, group or pw.pw_gid)
try:
subprocess.run(["chown", "-R", f"{user}:{group or pw.pw_gid}", str(path)], check=True)
except Exception:
# Best-effort fallback: chown/chmod individual entries
for root_dir, dirs, files in os.walk(path):
try:
os.chown(root_dir, uid, gid)
except Exception:
pass
for fn in files:
fpath = os.path.join(root_dir, fn)
try:
os.chown(fpath, uid, gid)
except Exception:
pass
# Fix modes: directories 0o755, files 0o644 (best-effort)
for root_dir, dirs, files in os.walk(path):
for d in dirs:
try:
os.chmod(os.path.join(root_dir, d), 0o755)
except Exception:
pass
for f in files:
try:
os.chmod(os.path.join(root_dir, f), 0o644)
except Exception:
pass
logging.info("Unix permission fix attempted (some changes may require root privilege).")
return True
except Exception as exc:
logging.debug("Unix fix-permissions error: %s", exc)
return False
def fix_permissions(path: Path, user: Optional[str] = None, group: Optional[str] = None) -> bool:
try:
if os.name == "nt":
return fix_permissions_windows(path, user=user)
else:
return fix_permissions_unix(path, user=user, group=group)
except Exception as exc:
logging.debug("General fix-permissions error: %s", exc)
return False
def main(argv: Optional[list[str]] = None) -> int:
parser = argparse.ArgumentParser(description="Clone Hydrus into a 'hydrusnetwork' directory.")
parser.add_argument("--root", "-r", default=".", help="Root folder to create the hydrusnetwork directory in (default: current working directory)")
parser.add_argument("--dest-name", "-d", default="hydrusnetwork", help="Name of the destination folder (default: hydrusnetwork)")
parser.add_argument("--repo", default="https://github.com/hydrusnetwork/hydrus", help="Repository URL to clone")
parser.add_argument("--update", action="store_true", help="If dest exists and is a git repo, run git pull instead of cloning")
parser.add_argument("--force", "-f", action="store_true", help="Remove existing destination directory before cloning")
parser.add_argument("--branch", "-b", default=None, help="Branch to clone (passed to git clone --branch).")
parser.add_argument("--depth", type=int, default=1, help="If set, pass --depth to git clone (default: 1 for a shallow clone). Use --full to perform a full clone instead.")
parser.add_argument("--full", action="store_true", help="Perform a full clone (no --depth passed to git clone)")
parser.add_argument("--git", action="store_true", help="Use git clone instead of fetching repository ZIP (opt-in). Default: fetch ZIP (smaller).")
parser.add_argument("--no-fallback", action="store_true", help="If set, do not attempt to download ZIP when git is missing (only relevant with --git)")
parser.add_argument("--fix-permissions", action="store_true", help="Fix ownership/permissions on the obtained repo (OS-aware). Requires elevated privileges for some actions.")
parser.add_argument("--fix-permissions-user", default=None, help="User to set as owner when fixing permissions (defaults to current user).")
parser.add_argument("--fix-permissions-group", default=None, help="Group to set when fixing permissions (Unix only).")
parser.add_argument("--no-venv", action="store_true", help="Do not create a venv inside the cloned repo (default: create a .venv folder)")
parser.add_argument("--venv-name", default=".venv", help="Name of the venv directory to create inside the repo (default: .venv)")
parser.add_argument("--recreate-venv", action="store_true", help="Remove existing venv and create a fresh one")
parser.add_argument("--install-deps", action="store_true", help="If present, install dependencies from requirements.txt into the created venv")
parser.add_argument("--no-project-venv", action="store_true", help="Do not attempt to re-exec the script under a project venv (if present)")
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging")
args = parser.parse_args(argv)
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
root = Path(args.root).expanduser().resolve()
# Re-exec under project venv by default when present (opt-out with --no-project-venv)
try:
maybe_reexec_under_project_venv(root, disable=bool(args.no_project_venv))
except Exception:
pass
dest = root / args.dest_name
try:
git = find_git_executable()
if dest.exists():
if args.force:
logging.info("Removing existing directory: %s", dest)
shutil.rmtree(dest)
else:
# If it's a git repo and user asked to update, do a pull
if is_git_repo(dest):
if args.update:
if not git:
logging.error("Git not found; cannot --update without git")
return 2
try:
run_git_pull(git, dest)
logging.info("Updated repository in %s", dest)
return 0
except subprocess.CalledProcessError as e:
logging.error("git pull failed: %s", e)
return 3
else:
logging.info("Destination %s is already a git repository. Use --update to pull or --force to re-clone.", dest)
return 0
# If directory isn't a git repo and not empty, avoid overwriting
if any(dest.iterdir()):
logging.error("Destination %s already exists and is not empty. Use --force to overwrite.", dest)
return 4
# Empty directory: attempt clone into it
# Ensure parent exists
dest.parent.mkdir(parents=True, exist_ok=True)
obtained = False
obtained_by: Optional[str] = None
# If user explicitly requested git, try that first
if args.git:
if git:
try:
# Default behavior when using git: shallow clone (depth=1) unless --full specified.
depth_to_use = None if getattr(args, "full", False) else args.depth
run_git_clone(git, args.repo, dest, branch=args.branch, depth=depth_to_use)
logging.info("Repository cloned into %s", dest)
obtained = True
obtained_by = "git"
except subprocess.CalledProcessError as e:
logging.error("git clone failed: %s", e)
if args.no_fallback:
return 5
logging.info("Git clone failed; falling back to ZIP download...")
else:
logging.info("Git not found; falling back to ZIP download...")
# If not obtained via git, try ZIP fetch (default behavior)
if not obtained:
try:
download_and_extract_zip(args.repo, dest)
logging.info("Repository downloaded and extracted into %s", dest)
obtained = True
obtained_by = "zip"
except Exception as exc:
logging.error("Failed to obtain repository (ZIP): %s", exc)
return 7
# Post-obtain setup: create repository-local venv (unless disabled)
if not getattr(args, "no_venv", False):
try:
venv_py = None
venv_dir = dest / str(getattr(args, "venv_name", ".venv"))
if venv_dir.exists():
if getattr(args, "recreate_venv", False):
logging.info("Removing existing venv: %s", venv_dir)
shutil.rmtree(venv_dir)
else:
logging.info("Using existing venv at %s", venv_dir)
if not venv_dir.exists():
logging.info("Creating venv at %s", venv_dir)
try:
subprocess.run([sys.executable, "-m", "venv", str(venv_dir)], check=True)
except subprocess.CalledProcessError as e:
logging.error("Failed to create venv: %s", e)
return 8
try:
venv_py = get_python_in_venv(venv_dir)
except Exception:
venv_py = None
if not venv_py:
logging.error("Could not locate python in venv %s", venv_dir)
return 9
logging.info("Venv ready: %s", venv_py)
# Optionally install requirements.txt
if getattr(args, "install_deps", False):
req = dest / "requirements.txt"
if req.exists():
logging.info("Installing dependencies from %s into venv", req)
try:
subprocess.run([str(venv_py), "-m", "pip", "install", "--upgrade", "pip"], check=True)
subprocess.run([str(venv_py), "-m", "pip", "install", "-r", str(req)], cwd=str(dest), check=True)
logging.info("Dependencies installed successfully")
except subprocess.CalledProcessError as e:
logging.error("Failed to install dependencies: %s", e)
return 10
else:
logging.info("No requirements.txt found; skipping dependency installation")
except Exception as exc:
logging.exception("Unexpected error during venv setup: %s", exc)
return 99
# Optionally fix permissions
if getattr(args, "fix_permissions", False):
logging.info("Fixing ownership/permissions for %s", dest)
fp_user = getattr(args, "fix_permissions_user", None)
fp_group = getattr(args, "fix_permissions_group", None)
try:
ok_perm = fix_permissions(dest, user=fp_user, group=fp_group)
if not ok_perm:
logging.warning("Permission fix reported issues or lacked privileges; some files may remain inaccessible.")
except Exception as exc:
logging.exception("Failed to fix permissions: %s", exc)
if getattr(args, "no_venv", False):
logging.info("Setup complete. Venv creation was skipped (use --venv-name/omit --no-venv to create one).")
else:
logging.info("Setup complete. To use the repository venv, activate it:")
if os.name == "nt":
logging.info(" %s\\Scripts\\activate", venv_dir)
else:
logging.info(" source %s/bin/activate", venv_dir)
return 0
except Exception as exc: # pragma: no cover - defensive
logging.exception("Unexpected error: %s", exc)
return 99
if __name__ == "__main__":
raise SystemExit(main())