From fd03c16eca085423267c163137b28ccb60de8db0 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Wed, 25 Feb 2026 00:45:13 +0100
Subject: [PATCH] feat: multi-compose rebuild (Seafile), cancel endpoint, schedule router, project descriptor

---
 app/ops_runner.py |  158 ++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 108 insertions(+), 50 deletions(-)

diff --git a/app/ops_runner.py b/app/ops_runner.py
index d9a460a..460fe80 100644
--- a/app/ops_runner.py
+++ b/app/ops_runner.py
@@ -1,6 +1,7 @@
 import asyncio
 import json
 import os
+import uuid
 from typing import AsyncGenerator
 
 OPS_CLI = os.environ.get("OPS_CLI", "/opt/infrastructure/ops")
@@ -9,6 +10,48 @@
 
 _DEFAULT_TIMEOUT = 300
 _BACKUP_TIMEOUT = 3600
+
+# ---------------------------------------------------------------------------
+# Operation registry — tracks running processes for cancel support
+# ---------------------------------------------------------------------------
+_active_ops: dict[str, asyncio.subprocess.Process] = {}
+_cancelled_ops: set[str] = set()
+
+
+def new_op_id() -> str:
+    return uuid.uuid4().hex[:12]
+
+
+def register_op(op_id: str, proc: asyncio.subprocess.Process) -> None:
+    _active_ops[op_id] = proc
+
+
+def deregister_op(op_id: str) -> None:
+    _active_ops.pop(op_id, None)
+    # NOTE: do NOT clear _cancelled_ops here — callers check is_cancelled()
+    # after the stream ends. The flag is cleared by clear_cancelled() instead.
+
+
+def clear_cancelled(op_id: str) -> None:
+    """Call after the generator has finished checking is_cancelled()."""
+    _cancelled_ops.discard(op_id)
+
+
+def cancel_op(op_id: str) -> bool:
+    """Terminate a running operation. Returns True if found and killed."""
+    proc = _active_ops.get(op_id)
+    if proc is None:
+        return False
+    _cancelled_ops.add(op_id)
+    try:
+        proc.terminate()
+    except ProcessLookupError:
+        pass
+    return True
+
+
+def is_cancelled(op_id: str) -> bool:
+    return op_id in _cancelled_ops
 
 # nsenter via Docker: run commands on the host from inside the container.
 # Required because ops backup/restore delegate to host Python venvs (3.12)
@@ -90,9 +133,9 @@
         }
 
 
-async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT, op_id: str | None = None) -> AsyncGenerator[str, None]:
     """Stream ops CLI output from the host via nsenter."""
-    async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
+    async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout, op_id=op_id):
         yield line
 
 
@@ -101,9 +144,9 @@
     return await _run_exec(_NSENTER_PREFIX + args, timeout=timeout)
 
 
-async def stream_command_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+async def stream_command_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT, op_id: str | None = None) -> AsyncGenerator[str, None]:
     """Stream arbitrary command output from the host via nsenter."""
-    async for line in _stream_exec(_NSENTER_PREFIX + args, timeout=timeout):
+    async for line in _stream_exec(_NSENTER_PREFIX + args, timeout=timeout, op_id=op_id):
         yield line
 
 
@@ -137,7 +180,7 @@
         return {"success": False, "output": "", "error": str(exc)}
 
 
-async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT, op_id: str | None = None) -> AsyncGenerator[str, None]:
     """Execute a command and yield interleaved stdout/stderr lines."""
     try:
         proc = await asyncio.create_subprocess_exec(
@@ -152,53 +195,68 @@
         yield f"[error] Failed to start process: {exc}"
         return
 
-    async def _readline(stream, prefix=""):
-        while True:
+    if op_id:
+        register_op(op_id, proc)
+
+    try:
+        async def _readline(stream, prefix=""):
+            while True:
+                try:
+                    line = await asyncio.wait_for(stream.readline(), timeout=timeout)
+                except asyncio.TimeoutError:
+                    yield f"{prefix}[timeout] Command exceeded {timeout}s"
+                    break
+                if not line:
+                    break
+                yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
+
+        stdout_gen = _readline(proc.stdout).__aiter__()
+        stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
+
+        stdout_done = stderr_done = False
+        pending_out = pending_err = None
+
+        async def _next(it):
             try:
-                line = await asyncio.wait_for(stream.readline(), timeout=timeout)
-            except asyncio.TimeoutError:
-                yield f"{prefix}[timeout] Command exceeded {timeout}s"
+                return await it.__anext__()
+            except StopAsyncIteration:
+                return None
+
+        pending_out = asyncio.create_task(_next(stdout_gen))
+        pending_err = asyncio.create_task(_next(stderr_gen))
+
+        while not (stdout_done and stderr_done):
+            tasks = [t for t in (pending_out, pending_err) if t is not None]
+            if not tasks:
                 break
-            if not line:
-                break
-            yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
+            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
 
-    stdout_gen = _readline(proc.stdout).__aiter__()
-    stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
+            for task in done:
+                val = task.result()
+                if task is pending_out:
+                    if val is None:
+                        stdout_done = True
+                        pending_out = None
+                    else:
+                        yield val
+                        pending_out = asyncio.create_task(_next(stdout_gen))
+                elif task is pending_err:
+                    if val is None:
+                        stderr_done = True
+                        pending_err = None
+                    else:
+                        yield val
+                        pending_err = asyncio.create_task(_next(stderr_gen))
 
-    stdout_done = stderr_done = False
-    pending_out = pending_err = None
-
-    async def _next(it):
+        await proc.wait()
+    except (asyncio.CancelledError, GeneratorExit):
+        # Browser disconnected or generator closed — kill the process
         try:
-            return await it.__anext__()
-        except StopAsyncIteration:
-            return None
-
-    pending_out = asyncio.create_task(_next(stdout_gen))
-    pending_err = asyncio.create_task(_next(stderr_gen))
-
-    while not (stdout_done and stderr_done):
-        tasks = [t for t in (pending_out, pending_err) if t is not None]
-        if not tasks:
-            break
-        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
-
-        for task in done:
-            val = task.result()
-            if task is pending_out:
-                if val is None:
-                    stdout_done = True
-                    pending_out = None
-                else:
-                    yield val
-                    pending_out = asyncio.create_task(_next(stdout_gen))
-            elif task is pending_err:
-                if val is None:
-                    stderr_done = True
-                    pending_err = None
-                else:
-                    yield val
-                    pending_err = asyncio.create_task(_next(stderr_gen))
-
-    await proc.wait()
+            proc.terminate()
+        except ProcessLookupError:
+            pass
+        await proc.wait()
+        raise
+    finally:
+        if op_id:
+            deregister_op(op_id)

--
Gitblit v1.3.1