huy
This commit is contained in:
@@ -527,15 +527,72 @@ def get_config_all() -> Dict[str, Any]:
|
||||
|
||||
# Worker Management Methods for medios.db
|
||||
|
||||
def _worker_db_connect(timeout: float = 0.75) -> sqlite3.Connection:
|
||||
conn = sqlite3.connect(
|
||||
str(DB_PATH),
|
||||
timeout=timeout,
|
||||
check_same_thread=False,
|
||||
)
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
busy_ms = max(1, int(timeout * 1000))
|
||||
conn.execute(f"PRAGMA busy_timeout = {busy_ms}")
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA synchronous=NORMAL")
|
||||
except sqlite3.Error:
|
||||
pass
|
||||
return conn
|
||||
|
||||
|
||||
def _worker_db_execute(
|
||||
query: str,
|
||||
params: tuple = (),
|
||||
*,
|
||||
fetch: Optional[str] = None,
|
||||
timeout: float = 0.75,
|
||||
retries: int = 1,
|
||||
) -> Any:
|
||||
attempts = 0
|
||||
while True:
|
||||
conn: Optional[sqlite3.Connection] = None
|
||||
try:
|
||||
conn = _worker_db_connect(timeout=timeout)
|
||||
cursor = conn.cursor()
|
||||
try:
|
||||
cursor.execute(query, params)
|
||||
if fetch == "one":
|
||||
result = cursor.fetchone()
|
||||
elif fetch == "all":
|
||||
result = cursor.fetchall()
|
||||
else:
|
||||
result = cursor.rowcount
|
||||
conn.commit()
|
||||
return result
|
||||
finally:
|
||||
cursor.close()
|
||||
except sqlite3.OperationalError as exc:
|
||||
msg = str(exc).lower()
|
||||
if "locked" in msg and attempts < retries:
|
||||
attempts += 1
|
||||
time.sleep(0.05 * attempts)
|
||||
continue
|
||||
raise
|
||||
finally:
|
||||
if conn is not None:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def insert_worker(worker_id: str, worker_type: str, title: str = "", description: str = "") -> bool:
|
||||
try:
|
||||
db.execute(
|
||||
_worker_db_execute(
|
||||
"INSERT INTO workers (id, type, title, description, status) VALUES (?, ?, ?, ?, 'running')",
|
||||
(worker_id, worker_type, title, description)
|
||||
(worker_id, worker_type, title, description),
|
||||
)
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.exception("Failed to insert worker %s: %s", worker_id, exc)
|
||||
logger.warning("Failed to insert worker %s: %s", worker_id, exc)
|
||||
return False
|
||||
|
||||
def update_worker(worker_id: str, **kwargs) -> bool:
|
||||
@@ -559,20 +616,20 @@ def update_worker(worker_id: str, **kwargs) -> bool:
|
||||
vals.append(worker_id)
|
||||
|
||||
try:
|
||||
db.execute(query, tuple(vals))
|
||||
_worker_db_execute(query, tuple(vals))
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.exception("Failed to update worker %s: %s", worker_id, exc)
|
||||
logger.warning("Failed to update worker %s: %s", worker_id, exc)
|
||||
return False
|
||||
|
||||
def append_worker_stdout(worker_id: str, content: str, channel: str = 'stdout'):
|
||||
try:
|
||||
db.execute(
|
||||
_worker_db_execute(
|
||||
"INSERT INTO worker_stdout (worker_id, channel, content) VALUES (?, ?, ?)",
|
||||
(worker_id, channel, content)
|
||||
(worker_id, channel, content),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception("Failed to append worker stdout for %s", worker_id)
|
||||
logger.warning("Failed to append worker stdout for %s: %s", worker_id, exc)
|
||||
|
||||
def get_worker_stdout(worker_id: str, channel: Optional[str] = None) -> str:
|
||||
query = "SELECT content FROM worker_stdout WHERE worker_id = ?"
|
||||
@@ -582,20 +639,30 @@ def get_worker_stdout(worker_id: str, channel: Optional[str] = None) -> str:
|
||||
params.append(channel)
|
||||
query += " ORDER BY timestamp ASC"
|
||||
|
||||
rows = db.fetchall(query, tuple(params))
|
||||
rows = _worker_db_execute(query, tuple(params), fetch="all") or []
|
||||
return "\n".join(row['content'] for row in rows)
|
||||
|
||||
def get_active_workers() -> List[Dict[str, Any]]:
|
||||
rows = db.fetchall("SELECT * FROM workers WHERE status = 'running' ORDER BY created_at DESC")
|
||||
rows = _worker_db_execute(
|
||||
"SELECT * FROM workers WHERE status = 'running' ORDER BY created_at DESC",
|
||||
fetch="all",
|
||||
) or []
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
def get_worker(worker_id: str) -> Optional[Dict[str, Any]]:
|
||||
row = db.fetchone("SELECT * FROM workers WHERE id = ?", (worker_id,))
|
||||
row = _worker_db_execute(
|
||||
"SELECT * FROM workers WHERE id = ?",
|
||||
(worker_id,),
|
||||
fetch="one",
|
||||
)
|
||||
return dict(row) if row else None
|
||||
|
||||
def expire_running_workers(older_than_seconds: int = 300, status: str = 'error', reason: str = 'timeout') -> int:
|
||||
# SQLITE doesn't have a simple way to do DATETIME - INTERVAL, so we'll use strftime/unixepoch if available
|
||||
# or just do regular update for all running ones for now as a simple fallback
|
||||
query = f"UPDATE workers SET status = ?, error_message = ? WHERE status = 'running'"
|
||||
db.execute(query, (status, reason))
|
||||
try:
|
||||
_worker_db_execute(query, (status, reason), timeout=0.5, retries=0)
|
||||
except Exception:
|
||||
return 0
|
||||
return 0 # We don't easily get the rowcount from db.execute right now
|
||||
|
||||
Reference in New Issue
Block a user