#!/usr/bin/env python3
"""Create a 'hydrusnetwork' directory and clone the Hydrus repository into it.

Works on Linux and Windows.

Behavior:
- By default creates ./hydrusnetwork and clones
  https://github.com/hydrusnetwork/hydrus there.
- If the target directory already exists:
  - Use --update to run `git pull` (if it's a git repo).
  - Use --force to remove it and re-clone.
- If `git` is not available, the script will fall back to downloading the
  repository ZIP and extracting it.

Examples:
  python scripts/hydrusnetwork.py
  python scripts/hydrusnetwork.py --root /opt --dest-name hydrusnetwork --force
  python scripts/hydrusnetwork.py --update
"""

from __future__ import annotations

import argparse
import logging
import shutil
import subprocess
import sys
import tempfile
import urllib.request
import zipfile
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urlparse

logging.basicConfig(level=logging.INFO, format="%(message)s")


def find_git_executable() -> Optional[str]:
    """Return the path of a working ``git`` executable, or None.

    The executable is located on PATH and then sanity-checked by running
    ``git --version``; a binary that cannot be started or exits non-zero is
    treated as missing.
    """
    git = shutil.which("git")
    if not git:
        return None
    try:
        subprocess.run(
            [git, "--version"],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except (OSError, subprocess.SubprocessError):
        return None
    return git


def is_git_repo(path: Path) -> bool:
    """Return True if *path* is the root of a git working tree.

    A ``.git`` entry directly inside *path* is accepted immediately.
    Otherwise git itself is asked for the top level of the working tree
    containing *path*, and the answer must be *path* itself. A directory that
    merely lives inside some other repository is not reported as a
    repository, so callers never run ``git pull`` against an enclosing
    checkout.
    """
    if not path.is_dir():
        return False
    if (path / ".git").exists():
        return True

    git = find_git_executable()
    if not git:
        return False
    try:
        result = subprocess.run(
            [git, "-C", str(path), "rev-parse", "--show-toplevel"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except (OSError, subprocess.SubprocessError):
        return False

    toplevel = result.stdout.strip()
    if not toplevel:
        return False
    try:
        # samefile() compares the underlying filesystem objects, so symlinks
        # and path-separator differences do not cause false negatives.
        return Path(toplevel).samefile(path)
    except OSError:
        return False


def build_clone_command(
    git: str,
    repo: str,
    dest: Path,
    branch: Optional[str] = None,
    depth: Optional[int] = None,
) -> list[str]:
    """Return the argument list for ``git clone``.

    Options are placed before the positional repository/destination
    arguments. ``--branch`` is added only when *branch* is non-empty and
    ``--depth`` only when *depth* is a positive integer.
    """
    cmd = [git, "clone"]
    if branch:
        cmd += ["--branch", branch]
    if depth is not None and depth > 0:
        cmd += ["--depth", str(int(depth))]
    cmd += [repo, str(dest)]
    return cmd


def run_git_clone(
    git: str,
    repo: str,
    dest: Path,
    branch: Optional[str] = None,
    depth: Optional[int] = None,
) -> None:
    """Clone *repo* into *dest* using the given git executable.

    Raises:
        subprocess.CalledProcessError: if ``git clone`` exits non-zero.
    """
    cmd = build_clone_command(git, repo, dest, branch=branch, depth=depth)
    logging.info("Cloning: %s -> %s", repo, dest)
    subprocess.run(cmd, check=True)


def run_git_pull(git: str, dest: Path) -> None:
    """Run ``git pull`` inside *dest*.

    Raises:
        subprocess.CalledProcessError: if ``git pull`` exits non-zero.
    """
    logging.info("Updating git repository in %s", dest)
    subprocess.run([git, "-C", str(dest), "pull"], check=True)


def parse_owner_repo(repo_url: str) -> Tuple[str, str]:
    """Extract ``(owner, repo)`` from a URL like https://github.com/owner/repo.

    A trailing ``.git`` on the repository name is stripped so that clone URLs
    can also be used to build archive download URLs.

    Raises:
        RuntimeError: if the URL does not contain an owner and a repo segment.
    """
    try:
        segments = [seg for seg in urlparse(repo_url).path.split("/") if seg]
    except ValueError as exc:
        raise RuntimeError(f"Invalid repo URL: {repo_url}") from exc
    if len(segments) < 2:
        raise RuntimeError(f"Invalid repo URL: {repo_url}")

    owner, repo = segments[0], segments[1]
    if repo.endswith(".git"):
        repo = repo[: -len(".git")]
    if not repo:
        raise RuntimeError(f"Invalid repo URL: {repo_url}")
    return owner, repo


def _download_file(url: str, target: Path, timeout: float) -> None:
    """Stream the body of *url* into *target* without buffering it in memory."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        if resp.status != 200:
            raise RuntimeError(f"HTTP {resp.status} while fetching {url}")
        with open(target, "wb") as fh:
            shutil.copyfileobj(resp, fh)


def _extract_archive_root(zip_path: Path, extract_dir: Path) -> Path:
    """Extract *zip_path* into *extract_dir* and return its top-level folder."""
    # Create the directory up front so an empty archive yields the explicit
    # "Broken ZIP" error below instead of a missing-directory error.
    extract_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(extract_dir)
    root = next((child for child in extract_dir.iterdir() if child.is_dir()), None)
    if root is None:
        raise RuntimeError("Broken ZIP: no extracted directory found")
    return root


def _move_contents(src_root: Path, dest: Path) -> None:
    """Move every entry of *src_root* into *dest*, replacing same-named entries."""
    dest.mkdir(parents=True, exist_ok=True)
    for entry in src_root.iterdir():
        target = dest / entry.name
        if target.is_dir() and not target.is_symlink():
            shutil.rmtree(target)
        elif target.exists() or target.is_symlink():
            target.unlink()
        shutil.move(str(entry), str(target))


def download_and_extract_zip(
    repo_url: str,
    dest: Path,
    branch_candidates: Tuple[str, ...] = ("main", "master"),
    timeout: float = 60.0,
) -> None:
    """Download the GitHub repo zip and extract it into dest.

    This avoids requiring git to be installed. Each branch name in
    *branch_candidates* is tried in order; the first archive that downloads
    and extracts successfully wins.

    Args:
        repo_url: URL of the form https://github.com/owner/repo (a trailing
            ``.git`` is accepted).
        dest: Directory that receives the repository contents; created if
            missing. Existing entries with the same names are replaced.
        branch_candidates: Branch names to try, in order.
        timeout: Socket timeout in seconds for the HTTP download.

    Raises:
        RuntimeError: if the URL cannot be parsed or every candidate fails.
    """
    owner, repo = parse_owner_repo(repo_url)

    errors: list[str] = []
    for branch in branch_candidates:
        zip_url = f"https://github.com/{owner}/{repo}/archive/refs/heads/{branch}.zip"
        logging.info("Attempting ZIP download: %s", zip_url)
        try:
            with tempfile.TemporaryDirectory() as td:
                workdir = Path(td)
                zip_path = workdir / "repo.zip"
                _download_file(zip_url, zip_path, timeout)
                extracted_root = _extract_archive_root(zip_path, workdir / "extracted")
                _move_contents(extracted_root, dest)
        except Exception as exc:
            # Best effort: remember the failure and try the next candidate.
            errors.append(str(exc))
            logging.debug("ZIP download failed for branch %s: %s", branch, exc)
            continue
        logging.info("Downloaded and extracted %s (branch: %s) into %s", repo_url, branch, dest)
        return

    # If we failed for all branches
    raise RuntimeError(f"Failed to download zip for {repo_url}; errors: {errors}")


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the script."""
    parser = argparse.ArgumentParser(description="Clone Hydrus into a 'hydrusnetwork' directory.")
    parser.add_argument("--root", "-r", default=".", help="Root folder to create the hydrusnetwork directory in (default: current working directory)")
    parser.add_argument("--dest-name", "-d", default="hydrusnetwork", help="Name of the destination folder (default: hydrusnetwork)")
    parser.add_argument("--repo", default="https://github.com/hydrusnetwork/hydrus", help="Repository URL to clone")
    parser.add_argument("--update", action="store_true", help="If dest exists and is a git repo, run git pull instead of cloning")
    parser.add_argument("--force", "-f", action="store_true", help="Remove existing destination directory before cloning")
    parser.add_argument("--branch", "-b", default=None, help="Branch to clone (passed to git clone --branch).")
    parser.add_argument("--depth", type=int, default=None, help="If set, pass --depth to git clone (shallow clone)")
    parser.add_argument("--no-fallback", action="store_true", help="If set, do not attempt to download ZIP when git is missing")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging")
    return parser


def _handle_existing_destination(
    args: argparse.Namespace, dest: Path, git: Optional[str]
) -> Optional[int]:
    """Deal with a destination that already exists.

    Returns an exit code when the run is finished (successfully or not), or
    None when the caller should go on to fetch the repository into *dest*.
    """
    if args.force:
        logging.info("Removing existing directory: %s", dest)
        if dest.is_dir() and not dest.is_symlink():
            shutil.rmtree(dest)
        else:
            # A plain file or symlink cannot be removed with rmtree.
            dest.unlink()
        return None

    if not dest.is_dir():
        logging.error("Destination %s already exists and is not a directory. Use --force to overwrite.", dest)
        return 4

    if is_git_repo(dest):
        if not args.update:
            logging.info("Destination %s is already a git repository. Use --update to pull or --force to re-clone.", dest)
            return 0
        if not git:
            logging.error("Git not found; cannot --update without git")
            return 2
        try:
            run_git_pull(git, dest)
        except subprocess.CalledProcessError as e:
            logging.error("git pull failed: %s", e)
            return 3
        logging.info("Updated repository in %s", dest)
        return 0

    # If directory isn't a git repo and not empty, avoid overwriting
    if any(dest.iterdir()):
        logging.error("Destination %s already exists and is not empty. Use --force to overwrite.", dest)
        return 4

    # Empty directory: attempt clone into it
    return None


def _obtain_repository(args: argparse.Namespace, dest: Path) -> int:
    """Clone (or download) the repository into *dest* and return an exit code."""
    git = find_git_executable()

    if dest.exists():
        exit_code = _handle_existing_destination(args, dest, git)
        if exit_code is not None:
            return exit_code

    # Ensure parent exists
    dest.parent.mkdir(parents=True, exist_ok=True)

    if git:
        try:
            # If dest exists and is empty, git clone into dest should work
            run_git_clone(git, args.repo, dest, branch=args.branch, depth=args.depth)
        except subprocess.CalledProcessError as e:
            logging.error("git clone failed: %s", e)
            if args.no_fallback:
                return 5
            logging.info("Falling back to ZIP download...")
        else:
            logging.info("Repository cloned into %s", dest)
            return 0

    if args.no_fallback:
        logging.error("No git available and fallback disabled (--no-fallback). Aborting.")
        return 6

    # Honor --branch in the fallback too; otherwise try main then master.
    branches: Tuple[str, ...] = (args.branch,) if args.branch else ("main", "master")
    try:
        download_and_extract_zip(args.repo, dest, branch_candidates=branches)
    except Exception as exc:
        logging.error("Failed to obtain repository: %s", exc)
        return 7
    return 0


def main(argv: Optional[list[str]] = None) -> int:
    """Run the command-line tool and return a process exit code.

    Exit codes: 0 success (or nothing to do), 2 ``--update`` without git,
    3 ``git pull`` failed, 4 destination exists and cannot be used without
    ``--force``, 5 ``git clone`` failed with ``--no-fallback``, 6 git missing
    with ``--no-fallback``, 7 ZIP fallback failed, 99 unexpected error.
    """
    args = _build_parser().parse_args(argv)

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    root = Path(args.root).expanduser().resolve()
    dest = root / args.dest_name

    try:
        return _obtain_repository(args, dest)
    except Exception as exc:  # pragma: no cover - defensive
        logging.exception("Unexpected error: %s", exc)
        return 99


if __name__ == "__main__":
    raise SystemExit(main())