Matthias Nott
2026-02-25 fd03c16eca085423267c163137b28ccb60de8db0
app/ops_runner.py
....@@ -1,6 +1,7 @@
11 import asyncio
22 import json
33 import os
4
+import uuid
45 from typing import AsyncGenerator
56
67 OPS_CLI = os.environ.get("OPS_CLI", "/opt/infrastructure/ops")
....@@ -9,6 +10,48 @@
910
1011 _DEFAULT_TIMEOUT = 300
1112 _BACKUP_TIMEOUT = 3600
13
+
14
+# ---------------------------------------------------------------------------
15
+# Operation registry — tracks running processes for cancel support
16
+# ---------------------------------------------------------------------------
17
+_active_ops: dict[str, asyncio.subprocess.Process] = {}
18
+_cancelled_ops: set[str] = set()
19
+
20
+
21
+def new_op_id() -> str:
22
+ return uuid.uuid4().hex[:12]
23
+
24
+
25
+def register_op(op_id: str, proc: asyncio.subprocess.Process) -> None:
26
+ _active_ops[op_id] = proc
27
+
28
+
29
+def deregister_op(op_id: str) -> None:
30
+ _active_ops.pop(op_id, None)
31
+ # NOTE: do NOT clear _cancelled_ops here — callers check is_cancelled()
32
+ # after the stream ends. The flag is cleared by clear_cancelled() instead.
33
+
34
+
35
+def clear_cancelled(op_id: str) -> None:
36
+ """Call after the generator has finished checking is_cancelled()."""
37
+ _cancelled_ops.discard(op_id)
38
+
39
+
40
+def cancel_op(op_id: str) -> bool:
41
+ """Terminate a running operation. Returns True if found and killed."""
42
+ proc = _active_ops.get(op_id)
43
+ if proc is None:
44
+ return False
45
+ _cancelled_ops.add(op_id)
46
+ try:
47
+ proc.terminate()
48
+ except ProcessLookupError:
49
+ pass
50
+ return True
51
+
52
+
53
+def is_cancelled(op_id: str) -> bool:
54
+ return op_id in _cancelled_ops
1255
1356 # nsenter via Docker: run commands on the host from inside the container.
1457 # Required because ops backup/restore delegate to host Python venvs (3.12)
....@@ -90,9 +133,9 @@
90133 }
91134
92135
93
-async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
136
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT, op_id: str | None = None) -> AsyncGenerator[str, None]:
94137 """Stream ops CLI output from the host via nsenter."""
95
- async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
138
+ async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout, op_id=op_id):
96139 yield line
97140
98141
....@@ -101,9 +144,9 @@
101144 return await _run_exec(_NSENTER_PREFIX + args, timeout=timeout)
102145
103146
104
-async def stream_command_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
147
+async def stream_command_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT, op_id: str | None = None) -> AsyncGenerator[str, None]:
105148 """Stream arbitrary command output from the host via nsenter."""
106
- async for line in _stream_exec(_NSENTER_PREFIX + args, timeout=timeout):
149
+ async for line in _stream_exec(_NSENTER_PREFIX + args, timeout=timeout, op_id=op_id):
107150 yield line
108151
109152
....@@ -137,7 +180,7 @@
137180 return {"success": False, "output": "", "error": str(exc)}
138181
139182
140
-async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
183
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT, op_id: str | None = None) -> AsyncGenerator[str, None]:
141184 """Execute a command and yield interleaved stdout/stderr lines."""
142185 try:
143186 proc = await asyncio.create_subprocess_exec(
....@@ -152,53 +195,68 @@
152195 yield f"[error] Failed to start process: {exc}"
153196 return
154197
155
- async def _readline(stream, prefix=""):
156
- while True:
198
+ if op_id:
199
+ register_op(op_id, proc)
200
+
201
+ try:
202
+ async def _readline(stream, prefix=""):
203
+ while True:
204
+ try:
205
+ line = await asyncio.wait_for(stream.readline(), timeout=timeout)
206
+ except asyncio.TimeoutError:
207
+ yield f"{prefix}[timeout] Command exceeded {timeout}s"
208
+ break
209
+ if not line:
210
+ break
211
+ yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
212
+
213
+ stdout_gen = _readline(proc.stdout).__aiter__()
214
+ stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
215
+
216
+ stdout_done = stderr_done = False
217
+ pending_out = pending_err = None
218
+
219
+ async def _next(it):
157220 try:
158
- line = await asyncio.wait_for(stream.readline(), timeout=timeout)
159
- except asyncio.TimeoutError:
160
- yield f"{prefix}[timeout] Command exceeded {timeout}s"
221
+ return await it.__anext__()
222
+ except StopAsyncIteration:
223
+ return None
224
+
225
+ pending_out = asyncio.create_task(_next(stdout_gen))
226
+ pending_err = asyncio.create_task(_next(stderr_gen))
227
+
228
+ while not (stdout_done and stderr_done):
229
+ tasks = [t for t in (pending_out, pending_err) if t is not None]
230
+ if not tasks:
161231 break
162
- if not line:
163
- break
164
- yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
232
+ done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
165233
166
- stdout_gen = _readline(proc.stdout).__aiter__()
167
- stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
234
+ for task in done:
235
+ val = task.result()
236
+ if task is pending_out:
237
+ if val is None:
238
+ stdout_done = True
239
+ pending_out = None
240
+ else:
241
+ yield val
242
+ pending_out = asyncio.create_task(_next(stdout_gen))
243
+ elif task is pending_err:
244
+ if val is None:
245
+ stderr_done = True
246
+ pending_err = None
247
+ else:
248
+ yield val
249
+ pending_err = asyncio.create_task(_next(stderr_gen))
168250
169
- stdout_done = stderr_done = False
170
- pending_out = pending_err = None
171
-
172
- async def _next(it):
251
+ await proc.wait()
252
+ except (asyncio.CancelledError, GeneratorExit):
253
+ # Browser disconnected or generator closed — kill the process
173254 try:
174
- return await it.__anext__()
175
- except StopAsyncIteration:
176
- return None
177
-
178
- pending_out = asyncio.create_task(_next(stdout_gen))
179
- pending_err = asyncio.create_task(_next(stderr_gen))
180
-
181
- while not (stdout_done and stderr_done):
182
- tasks = [t for t in (pending_out, pending_err) if t is not None]
183
- if not tasks:
184
- break
185
- done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
186
-
187
- for task in done:
188
- val = task.result()
189
- if task is pending_out:
190
- if val is None:
191
- stdout_done = True
192
- pending_out = None
193
- else:
194
- yield val
195
- pending_out = asyncio.create_task(_next(stdout_gen))
196
- elif task is pending_err:
197
- if val is None:
198
- stderr_done = True
199
- pending_err = None
200
- else:
201
- yield val
202
- pending_err = asyncio.create_task(_next(stderr_gen))
203
-
204
- await proc.wait()
255
+ proc.terminate()
256
+ except ProcessLookupError:
257
+ pass
258
+ await proc.wait()
259
+ raise
260
+ finally:
261
+ if op_id:
262
+ deregister_op(op_id)