From fd03c16eca085423267c163137b28ccb60de8db0 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Wed, 25 Feb 2026 00:45:13 +0100
Subject: [PATCH] feat: multi-compose rebuild (Seafile), cancel endpoint, schedule router, project descriptor
---
app/ops_runner.py | 158 ++++++++++++++++++++++++++++++++++++----------------
1 files changed, 108 insertions(+), 50 deletions(-)
diff --git a/app/ops_runner.py b/app/ops_runner.py
index d9a460a..460fe80 100644
--- a/app/ops_runner.py
+++ b/app/ops_runner.py
@@ -1,6 +1,7 @@
import asyncio
import json
import os
+import uuid
from typing import AsyncGenerator
OPS_CLI = os.environ.get("OPS_CLI", "/opt/infrastructure/ops")
@@ -9,6 +10,48 @@
_DEFAULT_TIMEOUT = 300
_BACKUP_TIMEOUT = 3600
+
+# ---------------------------------------------------------------------------
+# Operation registry — tracks running processes for cancel support
+# ---------------------------------------------------------------------------
+_active_ops: dict[str, asyncio.subprocess.Process] = {}
+_cancelled_ops: set[str] = set()
+
+
+def new_op_id() -> str:
+ return uuid.uuid4().hex[:12]
+
+
+def register_op(op_id: str, proc: asyncio.subprocess.Process) -> None:
+ _active_ops[op_id] = proc
+
+
+def deregister_op(op_id: str) -> None:
+ _active_ops.pop(op_id, None)
+ # NOTE: do NOT clear _cancelled_ops here — callers check is_cancelled()
+ # after the stream ends. The flag is cleared by clear_cancelled() instead.
+
+
+def clear_cancelled(op_id: str) -> None:
+ """Call after the generator has finished checking is_cancelled()."""
+ _cancelled_ops.discard(op_id)
+
+
+def cancel_op(op_id: str) -> bool:
+ """Terminate a running operation. Returns True if found and killed."""
+ proc = _active_ops.get(op_id)
+ if proc is None:
+ return False
+ _cancelled_ops.add(op_id)
+ try:
+ proc.terminate()
+ except ProcessLookupError:
+ pass
+ return True
+
+
+def is_cancelled(op_id: str) -> bool:
+ return op_id in _cancelled_ops
# nsenter via Docker: run commands on the host from inside the container.
# Required because ops backup/restore delegate to host Python venvs (3.12)
@@ -90,9 +133,9 @@
}
-async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT, op_id: str | None = None) -> AsyncGenerator[str, None]:
"""Stream ops CLI output from the host via nsenter."""
- async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
+ async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout, op_id=op_id):
yield line
@@ -101,9 +144,9 @@
return await _run_exec(_NSENTER_PREFIX + args, timeout=timeout)
-async def stream_command_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+async def stream_command_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT, op_id: str | None = None) -> AsyncGenerator[str, None]:
"""Stream arbitrary command output from the host via nsenter."""
- async for line in _stream_exec(_NSENTER_PREFIX + args, timeout=timeout):
+ async for line in _stream_exec(_NSENTER_PREFIX + args, timeout=timeout, op_id=op_id):
yield line
@@ -137,7 +180,7 @@
return {"success": False, "output": "", "error": str(exc)}
-async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT, op_id: str | None = None) -> AsyncGenerator[str, None]:
"""Execute a command and yield interleaved stdout/stderr lines."""
try:
proc = await asyncio.create_subprocess_exec(
@@ -152,53 +195,68 @@
yield f"[error] Failed to start process: {exc}"
return
- async def _readline(stream, prefix=""):
- while True:
+ if op_id:
+ register_op(op_id, proc)
+
+ try:
+ async def _readline(stream, prefix=""):
+ while True:
+ try:
+ line = await asyncio.wait_for(stream.readline(), timeout=timeout)
+ except asyncio.TimeoutError:
+ yield f"{prefix}[timeout] Command exceeded {timeout}s"
+ break
+ if not line:
+ break
+ yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
+
+ stdout_gen = _readline(proc.stdout).__aiter__()
+ stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
+
+ stdout_done = stderr_done = False
+ pending_out = pending_err = None
+
+ async def _next(it):
try:
- line = await asyncio.wait_for(stream.readline(), timeout=timeout)
- except asyncio.TimeoutError:
- yield f"{prefix}[timeout] Command exceeded {timeout}s"
+ return await it.__anext__()
+ except StopAsyncIteration:
+ return None
+
+ pending_out = asyncio.create_task(_next(stdout_gen))
+ pending_err = asyncio.create_task(_next(stderr_gen))
+
+ while not (stdout_done and stderr_done):
+ tasks = [t for t in (pending_out, pending_err) if t is not None]
+ if not tasks:
break
- if not line:
- break
- yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
+ done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
- stdout_gen = _readline(proc.stdout).__aiter__()
- stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
+ for task in done:
+ val = task.result()
+ if task is pending_out:
+ if val is None:
+ stdout_done = True
+ pending_out = None
+ else:
+ yield val
+ pending_out = asyncio.create_task(_next(stdout_gen))
+ elif task is pending_err:
+ if val is None:
+ stderr_done = True
+ pending_err = None
+ else:
+ yield val
+ pending_err = asyncio.create_task(_next(stderr_gen))
- stdout_done = stderr_done = False
- pending_out = pending_err = None
-
- async def _next(it):
+ await proc.wait()
+ except (asyncio.CancelledError, GeneratorExit):
+ # Browser disconnected or generator closed — kill the process
try:
- return await it.__anext__()
- except StopAsyncIteration:
- return None
-
- pending_out = asyncio.create_task(_next(stdout_gen))
- pending_err = asyncio.create_task(_next(stderr_gen))
-
- while not (stdout_done and stderr_done):
- tasks = [t for t in (pending_out, pending_err) if t is not None]
- if not tasks:
- break
- done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
-
- for task in done:
- val = task.result()
- if task is pending_out:
- if val is None:
- stdout_done = True
- pending_out = None
- else:
- yield val
- pending_out = asyncio.create_task(_next(stdout_gen))
- elif task is pending_err:
- if val is None:
- stderr_done = True
- pending_err = None
- else:
- yield val
- pending_err = asyncio.create_task(_next(stderr_gen))
-
- await proc.wait()
+ proc.terminate()
+ except ProcessLookupError:
+ pass
+ await proc.wait()
+ raise
+ finally:
+ if op_id:
+ deregister_op(op_id)
--
Gitblit v1.3.1