Files
Medios-Macina/SYS/repl_queue.py

84 lines
2.4 KiB
Python
Raw Permalink Normal View History

2026-03-18 20:17:28 -07:00
from __future__ import annotations
import json
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional
def repl_queue_dir(root: Path) -> Path:
    """Return the queue directory path ``<root>/Log/repl_queue``.

    The path is only computed; nothing is created on disk.
    """
    return Path(root).joinpath("Log", "repl_queue")
2026-03-19 13:08:15 -07:00
def _legacy_repl_queue_glob(root: Path) -> list[Path]:
    """List legacy queue files stored directly under ``<root>/Log``.

    Legacy files match ``medeia-repl-queue-*.json``.  An empty list is
    returned when the ``Log`` directory does not exist.
    """
    legacy_parent = Path(root, "Log")
    if legacy_parent.exists():
        return [*legacy_parent.glob("medeia-repl-queue-*.json")]
    return []
2026-03-18 20:17:28 -07:00
def enqueue_repl_command(
    root: Path,
    command: str,
    *,
    source: str = "external",
    metadata: Optional[Dict[str, Any]] = None,
) -> Path:
    """Write *command* to the REPL queue as a JSON file and return its path.

    Args:
        root: Base directory; the queue lives in ``<root>/Log/repl_queue``
            and is created if missing.
        command: Command text; stored stripped (``None``/empty becomes "").
        source: Label for the producer; blank values fall back to
            ``"external"``.
        metadata: Optional extra data, stored under ``"metadata"`` only when
            it is a non-empty dict.

    Returns:
        Path of the queue file, named ``<ms timestamp>-<id prefix>.json``.
    """
    target_dir = repl_queue_dir(root)
    target_dir.mkdir(parents=True, exist_ok=True)

    entry_id = uuid.uuid4().hex
    cleaned_command = str(command or "").strip()
    cleaned_source = str(source or "external").strip()
    record: Dict[str, Any] = dict(
        id=entry_id,
        command=cleaned_command,
        source=cleaned_source if cleaned_source else "external",
        created_at=time.time(),
    )
    if isinstance(metadata, dict) and metadata:
        record["metadata"] = metadata

    millis = int(time.time() * 1000)
    destination = target_dir / f"{millis:013d}-{entry_id[:8]}.json"

    # Write to a ".tmp" sibling first, then rename into place, so the
    # "*.json" name only ever appears with complete content.
    staging = destination.with_suffix(".tmp")
    serialized = json.dumps(record, ensure_ascii=False)
    staging.write_text(serialized, encoding="utf-8")
    staging.replace(destination)
    return destination
def pop_repl_commands(root: Path, *, limit: int = 8) -> List[Dict[str, Any]]:
    """Remove and return up to *limit* queued REPL commands, oldest first.

    Candidates are the ``*.json`` files in the queue directory plus any
    legacy queue files under ``<root>/Log``.  They are ordered by file
    modification time, then by file name.  Each selected file is read,
    deleted, and its decoded payload returned.

    Args:
        root: Base directory that contains the ``Log`` folder.
        limit: Maximum number of files consumed per call.  Falsy values and
            values below 1 are treated as 1.

    Returns:
        The decoded payload dicts.  A file that cannot be read or parsed
        yields a placeholder with ``"source": "invalid"`` and an empty
        command.  A file that has vanished or cannot be deleted is skipped,
        and a payload that is not a dict is deleted but not returned.
    """
    queue_dir = repl_queue_dir(root)
    legacy_entries = _legacy_repl_queue_glob(root)
    queue_present = queue_dir.exists()
    if not queue_present and not legacy_entries:
        return []

    def _mtime(path: Path) -> float:
        # A file may disappear between listing and stat; treat that as
        # "oldest" instead of letting the error escape.
        try:
            return float(path.stat().st_mtime)
        except OSError:
            return 0.0

    entries: List[Path] = []
    if queue_present:
        entries.extend(queue_dir.glob("*.json"))
    entries.extend(legacy_entries)
    entries.sort(key=lambda path: (_mtime(path), path.name))

    items: List[Dict[str, Any]] = []
    for entry in entries[: max(1, int(limit or 1))]:
        try:
            payload = json.loads(entry.read_text(encoding="utf-8"))
        except FileNotFoundError:
            # Already gone (e.g. taken by another consumer): nothing to pop.
            continue
        except Exception:
            # Unreadable or malformed file: report a placeholder so the bad
            # entry is still consumed.  The mtime lookup is guarded so a
            # failure here cannot abort the call and discard payloads that
            # were already removed from disk.
            payload = {
                "id": entry.stem,
                "command": "",
                "source": "invalid",
                "created_at": _mtime(entry),
            }
        try:
            entry.unlink()
        except OSError:
            # Only return a command once its file is really gone, so it
            # cannot be handed out twice.
            continue
        if isinstance(payload, dict):
            items.append(payload)
    return items