style: apply ruff auto-fixes
This commit is contained in:
@@ -1,9 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import atexit
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@@ -4,13 +4,10 @@ import subprocess
|
||||
import sys
|
||||
import shutil
|
||||
from SYS.logger import log, debug
|
||||
from urllib.parse import urlsplit, urlunsplit, unquote
|
||||
from collections import deque
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple
|
||||
|
||||
from API.HydrusNetwork import apply_hydrus_tag_mutation, fetch_hydrus_metadata, fetch_hydrus_metadata_by_url
|
||||
from SYS.models import FileRelationshipTracker
|
||||
|
||||
try: # Optional; used when available for richer metadata fetches
|
||||
import yt_dlp
|
||||
@@ -2585,7 +2582,7 @@ def scrape_url_metadata(
|
||||
)
|
||||
except json_module.JSONDecodeError:
|
||||
pass
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
pass # Silently ignore if we can't get playlist entries
|
||||
|
||||
# Fallback: if still no tags detected, get from first item
|
||||
|
||||
@@ -4,7 +4,7 @@ import importlib
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
from typing import Any, Dict, List, Tuple
|
||||
|
||||
from SYS.logger import log
|
||||
from SYS.rich_display import stdout_console
|
||||
|
||||
1691
SYS/pipeline.py
1691
SYS/pipeline.py
File diff suppressed because it is too large
Load Diff
@@ -18,8 +18,7 @@ so authors don't have to install pandas/bs4 unless they want to.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
from urllib.parse import quote_plus
|
||||
from typing import List, Optional
|
||||
|
||||
from API.HTTP import HTTPClient
|
||||
from ProviderCore.base import SearchResult
|
||||
|
||||
@@ -16,7 +16,6 @@ from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional, Callable, Set
|
||||
from pathlib import Path
|
||||
import json
|
||||
import shutil
|
||||
|
||||
from rich.box import SIMPLE
|
||||
from rich.console import Group
|
||||
@@ -1678,7 +1677,7 @@ class Table:
|
||||
try:
|
||||
int(value)
|
||||
except ValueError:
|
||||
print(f"Must be an integer")
|
||||
print("Must be an integer")
|
||||
continue
|
||||
|
||||
return value
|
||||
|
||||
@@ -11,7 +11,7 @@ from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import sys
|
||||
from typing import Any, Iterator, Sequence, TextIO
|
||||
from typing import Any, Iterator, TextIO
|
||||
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
@@ -81,7 +81,6 @@ def show_provider_config_panel(
|
||||
) -> None:
|
||||
"""Show a Rich panel explaining how to configure providers."""
|
||||
from rich.table import Table as RichTable
|
||||
from rich.text import Text
|
||||
from rich.console import Group
|
||||
|
||||
if isinstance(provider_names, str):
|
||||
@@ -117,7 +116,6 @@ def show_store_config_panel(
|
||||
) -> None:
|
||||
"""Show a Rich panel explaining how to configure storage backends."""
|
||||
from rich.table import Table as RichTable
|
||||
from rich.text import Text
|
||||
from rich.console import Group
|
||||
|
||||
if isinstance(store_names, str):
|
||||
@@ -152,7 +150,6 @@ def show_available_providers_panel(provider_names: List[str]) -> None:
|
||||
"""Show a Rich panel listing available/configured providers."""
|
||||
from rich.columns import Columns
|
||||
from rich.console import Group
|
||||
from rich.text import Text
|
||||
|
||||
if not provider_names:
|
||||
return
|
||||
|
||||
@@ -14,9 +14,8 @@ except Exception:
|
||||
import os
|
||||
import base64
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Iterable, Optional
|
||||
from typing import Any, Iterable
|
||||
from datetime import datetime
|
||||
from dataclasses import dataclass, field
|
||||
from fnmatch import fnmatch
|
||||
|
||||
349
SYS/worker.py
Normal file
349
SYS/worker.py
Normal file
@@ -0,0 +1,349 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import atexit
|
||||
import io
|
||||
import sys
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional, Sequence, Set, TextIO
|
||||
|
||||
from SYS.config import get_local_storage_path
|
||||
from SYS.worker_manager import WorkerManager
|
||||
|
||||
|
||||
class WorkerOutputMirror(io.TextIOBase):
|
||||
"""Mirror stdout/stderr to worker manager while preserving console output."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
original: TextIO,
|
||||
manager: WorkerManager,
|
||||
worker_id: str,
|
||||
channel: str,
|
||||
):
|
||||
self._original = original
|
||||
self._manager = manager
|
||||
self._worker_id = worker_id
|
||||
self._channel = channel
|
||||
self._pending: str = ""
|
||||
|
||||
def write(self, data: str) -> int: # type: ignore[override]
|
||||
if not data:
|
||||
return 0
|
||||
self._original.write(data)
|
||||
self._buffer_text(data)
|
||||
return len(data)
|
||||
|
||||
def flush(self) -> None: # type: ignore[override]
|
||||
self._original.flush()
|
||||
self._flush_pending(force=True)
|
||||
|
||||
def isatty(self) -> bool: # pragma: no cover
|
||||
return bool(getattr(self._original, "isatty", lambda: False)())
|
||||
|
||||
def _buffer_text(self, data: str) -> None:
|
||||
combined = self._pending + data
|
||||
lines = combined.splitlines(keepends=True)
|
||||
if not lines:
|
||||
self._pending = combined
|
||||
return
|
||||
|
||||
if lines[-1].endswith(("\n", "\r")):
|
||||
complete = lines
|
||||
self._pending = ""
|
||||
else:
|
||||
complete = lines[:-1]
|
||||
self._pending = lines[-1]
|
||||
|
||||
for chunk in complete:
|
||||
self._emit(chunk)
|
||||
|
||||
def _flush_pending(self, *, force: bool = False) -> None:
|
||||
if self._pending and force:
|
||||
self._emit(self._pending)
|
||||
self._pending = ""
|
||||
|
||||
def _emit(self, text: str) -> None:
|
||||
if not text:
|
||||
return
|
||||
try:
|
||||
self._manager.append_stdout(self._worker_id, text, channel=self._channel)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@property
|
||||
def encoding(self) -> str: # type: ignore[override]
|
||||
return getattr(self._original, "encoding", "utf-8")
|
||||
|
||||
|
||||
class WorkerStageSession:
    """Lifecycle helper for wrapping a CLI cmdlet execution in a worker record.

    Holds the original stdout/stderr plus the mirroring proxies installed in
    their place, and on ``close`` restores the streams and reports the final
    status to the worker manager.  ``close`` is idempotent.
    """

    def __init__(
        self,
        *,
        manager: WorkerManager,
        worker_id: str,
        orig_stdout: TextIO,
        orig_stderr: TextIO,
        stdout_proxy: WorkerOutputMirror,
        stderr_proxy: WorkerOutputMirror,
        config: Optional[Dict[str, Any]],
        logging_enabled: bool,
        completion_label: str,
        error_label: str,
    ) -> None:
        self.manager = manager
        self.worker_id = worker_id
        self.orig_stdout = orig_stdout
        self.orig_stderr = orig_stderr
        self.stdout_proxy = stdout_proxy
        self.stderr_proxy = stderr_proxy
        self.config = config
        self.logging_enabled = logging_enabled
        self.closed = False
        self._completion_label = completion_label
        self._error_label = error_label

    def close(self, *, status: str = "completed", error_msg: str = "") -> None:
        """Restore the streams, record the outcome, and finish the worker."""
        if self.closed:
            return

        self._drain_proxies()

        # Put the real streams back before any further reporting.
        sys.stdout = self.orig_stdout
        sys.stderr = self.orig_stderr

        self._teardown_logging()
        self._record_outcome(status, error_msg)
        self._finish(status, error_msg)

        if self.config and self.config.get("_current_worker_id") == self.worker_id:
            self.config.pop("_current_worker_id", None)
        self.closed = True

    def _drain_proxies(self) -> None:
        # Push any partial trailing lines out of both mirrors.
        try:
            self.stdout_proxy.flush()
            self.stderr_proxy.flush()
        except Exception:
            pass

    def _teardown_logging(self) -> None:
        if not self.logging_enabled:
            return
        try:
            self.manager.disable_logging_for_worker(self.worker_id)
        except Exception:
            pass

    def _record_outcome(self, status: str, error_msg: str) -> None:
        # Log a human-readable completion/error step; best-effort only.
        try:
            if status == "completed":
                self.manager.log_step(self.worker_id, self._completion_label)
            else:
                self.manager.log_step(
                    self.worker_id, f"{self._error_label}: {error_msg or status}"
                )
        except Exception:
            pass

    def _finish(self, status: str, error_msg: str) -> None:
        try:
            self.manager.finish_worker(
                self.worker_id, result=status or "completed", error_msg=error_msg or ""
            )
        except Exception:
            pass
|
||||
|
||||
|
||||
class WorkerManagerRegistry:
    """Process-wide WorkerManager cache keyed by library_root.

    ``ensure`` lazily creates (or reuses) a single WorkerManager for the
    configured library root, caches it in ``config["_worker_manager"]``,
    expires orphaned CLI workers once per process, and registers ``close``
    with atexit so the manager is shut down on interpreter exit.
    """

    _manager: Optional[WorkerManager] = None
    _manager_root: Optional[Path] = None
    _orphan_cleanup_done: bool = False
    _registered: bool = False

    @classmethod
    def ensure(cls, config: Dict[str, Any]) -> Optional[WorkerManager]:
        """Return the shared WorkerManager for *config*, creating it if needed.

        Returns None when *config* is not a dict, no library root is
        configured, or manager construction fails (the failure is printed
        to stderr rather than raised).
        """
        if not isinstance(config, dict):
            return None

        existing = config.get("_worker_manager")
        if isinstance(existing, WorkerManager):
            return existing

        library_root = get_local_storage_path(config)
        if not library_root:
            return None

        try:
            resolved_root = Path(library_root).resolve()
        except Exception:
            # resolve() can fail on odd paths; fall back to the raw path.
            resolved_root = Path(library_root)

        try:
            # Rebuild the cached manager when the library root changes.
            if cls._manager is None or cls._manager_root != resolved_root:
                if cls._manager is not None:
                    try:
                        cls._manager.close()
                    except Exception:
                        pass
                cls._manager = WorkerManager(resolved_root, auto_refresh_interval=0.5)
                cls._manager_root = resolved_root

            manager = cls._manager
            config["_worker_manager"] = manager

            # One-shot cleanup of workers orphaned by a previous CLI session.
            if manager is not None and not cls._orphan_cleanup_done:
                try:
                    manager.expire_running_workers(
                        older_than_seconds=120,
                        worker_id_prefix="cli_%",
                        # BUG FIX: a trailing comma inside the parentheses
                        # previously turned this reason into a 1-tuple.
                        reason="CLI session ended unexpectedly; marking worker as failed",
                    )
                except Exception:
                    pass
                else:
                    # Only mark done on success so a later call can retry.
                    cls._orphan_cleanup_done = True

            if not cls._registered:
                atexit.register(cls.close)
                cls._registered = True

            return manager
        except Exception as exc:
            print(f"[worker] Could not initialize worker manager: {exc}", file=sys.stderr)
            return None

    @classmethod
    def close(cls) -> None:
        """Close and forget the cached manager; safe to call repeatedly.

        ``_registered`` is deliberately left True: the atexit hook stays
        installed and must not be registered twice.
        """
        if cls._manager is None:
            return
        try:
            cls._manager.close()
        except Exception:
            pass
        cls._manager = None
        cls._manager_root = None
        cls._orphan_cleanup_done = False
|
||||
|
||||
|
||||
class WorkerStages:
    """Factory methods for stage/pipeline worker sessions.

    NOTE: ``begin_stage`` annotates ``stage_tokens: Sequence[str]``; the
    module must import ``Sequence`` from ``typing`` (previously it did not,
    and only the lazy annotations from ``from __future__ import annotations``
    hid the NameError from runtime).
    """

    @staticmethod
    def _start_worker_session(
        worker_manager: Optional[WorkerManager],
        *,
        worker_type: str,
        title: str,
        description: str,
        pipe_text: str,
        config: Optional[Dict[str, Any]],
        completion_label: str,
        error_label: str,
        skip_logging_for: Optional[Set[str]] = None,
        session_worker_ids: Optional[Set[str]] = None,
    ) -> Optional[WorkerStageSession]:
        """Track a new worker and swap stdout/stderr for mirroring proxies.

        Returns a WorkerStageSession whose ``close`` undoes everything, or
        None (with streams untouched) when no manager is available, the
        worker type is excluded via *skip_logging_for*, or tracking fails.
        """
        if worker_manager is None:
            return None
        if skip_logging_for and worker_type in skip_logging_for:
            return None

        safe_type = worker_type or "cmd"
        worker_id = f"cli_{safe_type[:8]}_{uuid.uuid4().hex[:6]}"

        try:
            tracked = worker_manager.track_worker(
                worker_id,
                worker_type=worker_type,
                title=title,
                description=description or "(no args)",
                pipe=pipe_text,
            )
            if not tracked:
                return None
        except Exception as exc:
            print(f"[worker] Failed to track {worker_type}: {exc}", file=sys.stderr)
            return None

        if session_worker_ids is not None:
            session_worker_ids.add(worker_id)

        # Per-worker file logging is optional; attach best-effort.
        logging_enabled = False
        try:
            handler = worker_manager.enable_logging_for_worker(worker_id)
            logging_enabled = handler is not None
        except Exception:
            logging_enabled = False

        # Swap the real streams for mirrors only after tracking succeeded.
        orig_stdout = sys.stdout
        orig_stderr = sys.stderr
        stdout_proxy = WorkerOutputMirror(orig_stdout, worker_manager, worker_id, "stdout")
        stderr_proxy = WorkerOutputMirror(orig_stderr, worker_manager, worker_id, "stderr")
        sys.stdout = stdout_proxy
        sys.stderr = stderr_proxy
        if isinstance(config, dict):
            config["_current_worker_id"] = worker_id

        try:
            worker_manager.log_step(worker_id, f"Started {worker_type}")
        except Exception:
            pass

        return WorkerStageSession(
            manager=worker_manager,
            worker_id=worker_id,
            orig_stdout=orig_stdout,
            orig_stderr=orig_stderr,
            stdout_proxy=stdout_proxy,
            stderr_proxy=stderr_proxy,
            config=config,
            logging_enabled=logging_enabled,
            completion_label=completion_label,
            error_label=error_label,
        )

    @classmethod
    def begin_stage(
        cls,
        worker_manager: Optional[WorkerManager],
        *,
        cmd_name: str,
        stage_tokens: Sequence[str],
        config: Optional[Dict[str, Any]],
        command_text: str,
    ) -> Optional[WorkerStageSession]:
        """Start a worker session for a single pipeline stage.

        Worker-administration commands are excluded so they don't record
        workers about themselves.
        """
        description = " ".join(stage_tokens[1:]) if len(stage_tokens) > 1 else "(no args)"
        session_worker_ids = None
        if isinstance(config, dict):
            session_worker_ids = config.get("_session_worker_ids")

        return cls._start_worker_session(
            worker_manager,
            worker_type=cmd_name,
            title=f"{cmd_name} stage",
            description=description,
            pipe_text=command_text,
            config=config,
            completion_label="Stage completed",
            error_label="Stage error",
            skip_logging_for={".worker", "worker", "workers"},
            session_worker_ids=session_worker_ids,
        )

    @classmethod
    def begin_pipeline(
        cls,
        worker_manager: Optional[WorkerManager],
        *,
        pipeline_text: str,
        config: Optional[Dict[str, Any]],
    ) -> Optional[WorkerStageSession]:
        """Start a worker session covering a whole pipeline run.

        Seeds ``config["_session_worker_ids"]`` so per-stage sessions can
        register their worker ids under this pipeline.
        """
        session_worker_ids: Set[str] = set()
        if isinstance(config, dict):
            config["_session_worker_ids"] = session_worker_ids

        return cls._start_worker_session(
            worker_manager,
            worker_type="pipeline",
            title="Pipeline run",
            description=pipeline_text,
            pipe_text=pipeline_text,
            config=config,
            completion_label="Pipeline completed",
            error_label="Pipeline error",
            session_worker_ids=session_worker_ids,
        )
|
||||
Reference in New Issue
Block a user