From cea208d3b99e49581dfefbeeefedfc4df6e5e881 Mon Sep 17 00:00:00 2001 From: Nose Date: Sun, 28 Dec 2025 16:52:47 -0800 Subject: [PATCH] klhj --- hh-zip-test4/README | 1 + scripts/hydrusnetwork.py | 397 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 369 insertions(+), 29 deletions(-) create mode 100644 hh-zip-test4/README diff --git a/hh-zip-test4/README b/hh-zip-test4/README new file mode 100644 index 0000000..980a0d5 --- /dev/null +++ b/hh-zip-test4/README @@ -0,0 +1 @@ +Hello World! diff --git a/scripts/hydrusnetwork.py b/scripts/hydrusnetwork.py index eaddc46..ed63301 100644 --- a/scripts/hydrusnetwork.py +++ b/scripts/hydrusnetwork.py @@ -7,6 +7,7 @@ Works on Linux and Windows. Behavior: - Use --update to run `git pull` (if it's a git repo). - Use --force to remove it and re-clone. - If `git` is not available, the script will fall back to downloading the repository ZIP and extracting it. +- By default the script will create a repository-local virtual environment `.//.venv` after cloning/extraction; use `--no-venv` to skip this. Use `--install-deps` to install requirements.txt into that venv. Examples: python scripts/hydrusnetwork.py @@ -17,6 +18,7 @@ Examples: from __future__ import annotations import argparse +import os import shutil import subprocess import sys @@ -63,13 +65,19 @@ def is_git_repo(path: Path) -> bool: def run_git_clone(git: str, repo: str, dest: Path, branch: Optional[str] = None, depth: Optional[int] = None) -> None: - cmd = [git, "clone", repo, str(dest)] - if branch: - cmd[1:1] = [git] # noop just to keep style clear (no-op) - cmd += ["--branch", branch] + # Build git clone with options before the repository argument. Support shallow clones + # via --depth when requested. + cmd = [git, "clone"] if depth is not None and depth > 0: cmd += ["--depth", str(int(depth))] - logging.info("Cloning: %s -> %s", repo, dest) + # For performance/clarity, when doing a shallow clone of a specific branch, + # prefer --single-branch to avoid fetching other refs. + if branch: + cmd += ["--single-branch"] + if branch: + cmd += ["--branch", branch] + cmd += [repo, str(dest)] + logging.info("Cloning: %s -> %s (depth=%s)", repo, dest, str(depth) if depth else "full") subprocess.run(cmd, check=True) @@ -83,6 +91,11 @@ def download_and_extract_zip(repo_url: str, dest: Path, branch_candidates: Tuple This avoids requiring git to be installed. """ + +# By default, if a project virtualenv is detected (".venv" or "venv" under +# the chosen --root, or $VIRTUAL_ENV), the script will re-exec itself under +# that venv's python interpreter so subsequent operations use the project +# environment. Use --no-project-venv to opt out of this behavior. # Parse owner/repo from URL like https://github.com/owner/repo try: from urllib.parse import urlparse @@ -140,6 +153,238 @@ def download_and_extract_zip(repo_url: str, dest: Path, branch_candidates: Tuple raise RuntimeError(f"Failed to download zip for {repo_url}; errors: {errors}") +# --- Project venv helpers ------------------------------------------------- + +def get_python_in_venv(venv_dir: Path) -> Optional[Path]: + """Return path to python executable inside a venv-like directory, or None.""" + try: + v = Path(venv_dir) + # Windows + win_python = v / "Scripts" / "python.exe" + if win_python.exists(): + return win_python + # Unix + unix_python = v / "bin" / "python" + if unix_python.exists(): + return unix_python + unix_py3 = v / "bin" / "python3" + if unix_py3.exists(): + return unix_py3 + except Exception: + pass + return None + + +def find_project_venv(root: Path) -> Optional[Path]: + """Find a project venv directory under the given root (or VIRTUAL_ENV). + + Checks, in order: $VIRTUAL_ENV, /.venv, /venv + Returns the Path to the venv dir if it looks valid, else None. + """ + candidates = [] + try: + venv_env = os.environ.get("VIRTUAL_ENV") + if venv_env: + candidates.append(Path(venv_env)) + except Exception: + pass + candidates.extend([root / ".venv", root / "venv"]) # order matters: prefer .venv + for c in candidates: + try: + if c and c.exists(): + py = get_python_in_venv(c) + if py is not None: + return c + except Exception: + continue + return None + + +def maybe_reexec_under_project_venv(root: Path, disable: bool = False) -> None: + """If a project venv exists and we are not already running under it, re-exec + the current script using that venv's python interpreter. + + This makes the script "use the project venv by default" when present. + """ + if disable: + return + # Avoid infinite re-exec loops + if os.environ.get("HYDRUSNETWORK_REEXEC") == "1": + return + + try: + venv_dir = find_project_venv(root) + if not venv_dir: + return + py = get_python_in_venv(venv_dir) + if not py: + return + + current = Path(sys.executable) + try: + # If current interpreter is the same as venv's python, skip. + if current.resolve() == py.resolve(): + return + except Exception: + pass + + logging.info("Re-executing under project venv: %s", py) + env = os.environ.copy() + env["HYDRUSNETWORK_REEXEC"] = "1" + # Use absolute script path to avoid any relative path quirks. + try: + script_path = Path(sys.argv[0]).resolve() + except Exception: + script_path = None + args = [str(py), str(script_path) if script_path is not None else sys.argv[0]] + sys.argv[1:] + logging.debug("Exec args: %s", args) + os.execvpe(str(py), args, env) + except Exception as exc: + logging.debug("Failed to re-exec under project venv: %s", exc) + return + + +# --- Permissions helpers ------------------------------------------------- + +def is_elevated() -> bool: + """Return True if the current process is elevated (Windows) or running as root (Unix).""" + try: + if os.name == "nt": + import ctypes + + try: + return bool(ctypes.windll.shell32.IsUserAnAdmin()) + except Exception: + return False + else: + try: + return os.geteuid() == 0 + except Exception: + return False + except Exception: + return False + + +def fix_permissions_windows(path: Path, user: Optional[str] = None) -> bool: + """Attempt to set owner and grant FullControl via icacls/takeown. + + Returns True if commands report success; otherwise False. May require elevation. + """ + import getpass + import subprocess + + try: + if not user: + try: + who = subprocess.check_output(["whoami"], text=True).strip() + user = who or getpass.getuser() + except Exception: + user = getpass.getuser() + + logging.info("Attempting Windows ownership/ACL fix for %s (owner=%s)", path, user) + + # Try to take ownership (best-effort) + try: + subprocess.run(["takeown", "/F", str(path), "/R", "/D", "Y"], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except Exception: + pass + + rc_setowner = 1 + rc_grant = 1 + try: + out = subprocess.run(["icacls", str(path), "/setowner", user, "/T", "/C"], check=False) + rc_setowner = int(out.returncode) + except Exception: + rc_setowner = 1 + + try: + out = subprocess.run(["icacls", str(path), "/grant", f"{user}:(OI)(CI)F", "/T", "/C"], check=False) + rc_grant = int(out.returncode) + except Exception: + rc_grant = 1 + + success = (rc_setowner == 0 or rc_grant == 0) + if success: + logging.info("Windows permission fix succeeded (owner/grant applied).") + else: + logging.warning("Windows permission fix did not fully succeed (setowner/grant may require elevation).") + return success + except Exception as exc: + logging.debug("Windows fix-permissions error: %s", exc) + return False + + +def fix_permissions_unix(path: Path, user: Optional[str] = None, group: Optional[str] = None) -> bool: + """Attempt to chown/chmod recursively for a Unix-like system. + + Returns True if operations were attempted; may still fail for some files if not root. + """ + import getpass + import pwd + import grp + import subprocess + + try: + if not user: + user = getpass.getuser() + + try: + pw = pwd.getpwnam(user) + uid = pw.pw_uid + gid = pw.pw_gid if not group else grp.getgrnam(group).gr_gid + except Exception: + logging.warning("Could not resolve user/group to uid/gid; skipping chown.") + return False + + logging.info("Attempting to chown recursively to %s:%s (may require root)...", user, group or pw.pw_gid) + + try: + subprocess.run(["chown", "-R", f"{user}:{group or pw.pw_gid}", str(path)], check=True) + except Exception: + # Best-effort fallback: chown/chmod individual entries + for root_dir, dirs, files in os.walk(path): + try: + os.chown(root_dir, uid, gid) + except Exception: + pass + for fn in files: + fpath = os.path.join(root_dir, fn) + try: + os.chown(fpath, uid, gid) + except Exception: + pass + + # Fix modes: directories 0o755, files 0o644 (best-effort) + for root_dir, dirs, files in os.walk(path): + for d in dirs: + try: + os.chmod(os.path.join(root_dir, d), 0o755) + except Exception: + pass + for f in files: + try: + os.chmod(os.path.join(root_dir, f), 0o644) + except Exception: + pass + + logging.info("Unix permission fix attempted (some changes may require root privilege).") + return True + except Exception as exc: + logging.debug("Unix fix-permissions error: %s", exc) + return False + + +def fix_permissions(path: Path, user: Optional[str] = None, group: Optional[str] = None) -> bool: + try: + if os.name == "nt": + return fix_permissions_windows(path, user=user) + else: + return fix_permissions_unix(path, user=user, group=group) + except Exception as exc: + logging.debug("General fix-permissions error: %s", exc) + return False + + def main(argv: Optional[list[str]] = None) -> int: parser = argparse.ArgumentParser(description="Clone Hydrus into a 'hydrusnetwork' directory.") parser.add_argument("--root", "-r", default=".", help="Root folder to create the hydrusnetwork directory in (default: current working directory)") @@ -148,8 +393,18 @@ def main(argv: Optional[list[str]] = None) -> int: parser.add_argument("--update", action="store_true", help="If dest exists and is a git repo, run git pull instead of cloning") parser.add_argument("--force", "-f", action="store_true", help="Remove existing destination directory before cloning") parser.add_argument("--branch", "-b", default=None, help="Branch to clone (passed to git clone --branch).") - parser.add_argument("--depth", type=int, default=None, help="If set, pass --depth to git clone (shallow clone)") - parser.add_argument("--no-fallback", action="store_true", help="If set, do not attempt to download ZIP when git is missing") + parser.add_argument("--depth", type=int, default=1, help="If set, pass --depth to git clone (default: 1 for a shallow clone). Use --full to perform a full clone instead.") + parser.add_argument("--full", action="store_true", help="Perform a full clone (no --depth passed to git clone)") + parser.add_argument("--git", action="store_true", help="Use git clone instead of fetching repository ZIP (opt-in). Default: fetch ZIP (smaller).") + parser.add_argument("--no-fallback", action="store_true", help="If set, do not attempt to download ZIP when git is missing (only relevant with --git)") + parser.add_argument("--fix-permissions", action="store_true", help="Fix ownership/permissions on the obtained repo (OS-aware). Requires elevated privileges for some actions.") + parser.add_argument("--fix-permissions-user", default=None, help="User to set as owner when fixing permissions (defaults to current user).") + parser.add_argument("--fix-permissions-group", default=None, help="Group to set when fixing permissions (Unix only).") + parser.add_argument("--no-venv", action="store_true", help="Do not create a venv inside the cloned repo (default: create a .venv folder)") + parser.add_argument("--venv-name", default=".venv", help="Name of the venv directory to create inside the repo (default: .venv)") + parser.add_argument("--recreate-venv", action="store_true", help="Remove existing venv and create a fresh one") + parser.add_argument("--install-deps", action="store_true", help="If present, install dependencies from requirements.txt into the created venv") + parser.add_argument("--no-project-venv", action="store_true", help="Do not attempt to re-exec the script under a project venv (if present)") parser.add_argument("--verbose", "-v", action="store_true", help="Verbose logging") args = parser.parse_args(argv) @@ -158,6 +413,12 @@ def main(argv: Optional[list[str]] = None) -> int: logging.getLogger().setLevel(logging.DEBUG) root = Path(args.root).expanduser().resolve() + # Re-exec under project venv by default when present (opt-out with --no-project-venv) + try: + maybe_reexec_under_project_venv(root, disable=bool(args.no_project_venv)) + except Exception: + pass + dest = root / args.dest_name try: @@ -193,29 +454,107 @@ def main(argv: Optional[list[str]] = None) -> int: # Ensure parent exists dest.parent.mkdir(parents=True, exist_ok=True) - # If git is available, use it - if git: - try: - # If dest exists and is empty, git clone into dest should work - run_git_clone(git, args.repo, dest, branch=args.branch, depth=args.depth) - logging.info("Repository cloned into %s", dest) - return 0 - except subprocess.CalledProcessError as e: - logging.error("git clone failed: %s", e) - if args.no_fallback: - return 5 - logging.info("Falling back to ZIP download...") + obtained = False + obtained_by: Optional[str] = None - # Fallback: download ZIP from GitHub (tries main then master) - if args.no_fallback: - logging.error("No git available and fallback disabled (--no-fallback). Aborting.") - return 6 - try: - download_and_extract_zip(args.repo, dest) - return 0 - except Exception as exc: - logging.error("Failed to obtain repository: %s", exc) - return 7 + # If user explicitly requested git, try that first + if args.git: + if git: + try: + # Default behavior when using git: shallow clone (depth=1) unless --full specified. + depth_to_use = None if getattr(args, "full", False) else args.depth + run_git_clone(git, args.repo, dest, branch=args.branch, depth=depth_to_use) + logging.info("Repository cloned into %s", dest) + obtained = True + obtained_by = "git" + except subprocess.CalledProcessError as e: + logging.error("git clone failed: %s", e) + if args.no_fallback: + return 5 + logging.info("Git clone failed; falling back to ZIP download...") + else: + logging.info("Git not found; falling back to ZIP download...") + + # If not obtained via git, try ZIP fetch (default behavior) + if not obtained: + try: + download_and_extract_zip(args.repo, dest) + logging.info("Repository downloaded and extracted into %s", dest) + obtained = True + obtained_by = "zip" + except Exception as exc: + logging.error("Failed to obtain repository (ZIP): %s", exc) + return 7 + + # Post-obtain setup: create repository-local venv (unless disabled) + if not getattr(args, "no_venv", False): + try: + venv_py = None + venv_dir = dest / str(getattr(args, "venv_name", ".venv")) + if venv_dir.exists(): + if getattr(args, "recreate_venv", False): + logging.info("Removing existing venv: %s", venv_dir) + shutil.rmtree(venv_dir) + else: + logging.info("Using existing venv at %s", venv_dir) + if not venv_dir.exists(): + logging.info("Creating venv at %s", venv_dir) + try: + subprocess.run([sys.executable, "-m", "venv", str(venv_dir)], check=True) + except subprocess.CalledProcessError as e: + logging.error("Failed to create venv: %s", e) + return 8 + try: + venv_py = get_python_in_venv(venv_dir) + except Exception: + venv_py = None + if not venv_py: + logging.error("Could not locate python in venv %s", venv_dir) + return 9 + + logging.info("Venv ready: %s", venv_py) + + # Optionally install requirements.txt + if getattr(args, "install_deps", False): + req = dest / "requirements.txt" + if req.exists(): + logging.info("Installing dependencies from %s into venv", req) + try: + subprocess.run([str(venv_py), "-m", "pip", "install", "--upgrade", "pip"], check=True) + subprocess.run([str(venv_py), "-m", "pip", "install", "-r", str(req)], cwd=str(dest), check=True) + logging.info("Dependencies installed successfully") + except subprocess.CalledProcessError as e: + logging.error("Failed to install dependencies: %s", e) + return 10 + else: + logging.info("No requirements.txt found; skipping dependency installation") + + except Exception as exc: + logging.exception("Unexpected error during venv setup: %s", exc) + return 99 + + # Optionally fix permissions + if getattr(args, "fix_permissions", False): + logging.info("Fixing ownership/permissions for %s", dest) + fp_user = getattr(args, "fix_permissions_user", None) + fp_group = getattr(args, "fix_permissions_group", None) + try: + ok_perm = fix_permissions(dest, user=fp_user, group=fp_group) + if not ok_perm: + logging.warning("Permission fix reported issues or lacked privileges; some files may remain inaccessible.") + except Exception as exc: + logging.exception("Failed to fix permissions: %s", exc) + + if getattr(args, "no_venv", False): + logging.info("Setup complete. Venv creation was skipped (use --venv-name/omit --no-venv to create one).") + else: + logging.info("Setup complete. To use the repository venv, activate it:") + if os.name == "nt": + logging.info(" %s\\Scripts\\activate", venv_dir) + else: + logging.info(" source %s/bin/activate", venv_dir) + + return 0 except Exception as exc: # pragma: no cover - defensive logging.exception("Unexpected error: %s", exc)