Matthias Nott
2026-02-21 7300351bb9fb147f4de81a60423c4561a4924c21
app/app/ops_runner.py
....@@ -10,58 +10,31 @@
1010 _DEFAULT_TIMEOUT = 300
1111 _BACKUP_TIMEOUT = 3600
1212
13
+# nsenter via Docker: run commands on the host from inside the container.
14
+# Required because ops backup/restore delegate to host Python venvs (3.12)
15
+# that are incompatible with the container's Python (3.11).
16
+_NSENTER_PREFIX = [
17
+ "docker", "run", "--rm", "-i",
18
+ "--privileged", "--pid=host", "--network=host",
19
+ "alpine",
20
+ "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
21
+]
22
+
23
+
24
+# ---------------------------------------------------------------------------
25
+# In-container execution (status, disk, health, docker commands)
26
+# ---------------------------------------------------------------------------
1327
1428 async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
15
- """
16
- Run the ops CLI with the given arguments and capture output.
17
- Returns {"success": bool, "output": str, "error": str}.
18
- """
19
- try:
20
- proc = await asyncio.create_subprocess_exec(
21
- OPS_CLI,
22
- *args,
23
- stdout=asyncio.subprocess.PIPE,
24
- stderr=asyncio.subprocess.PIPE,
25
- )
26
- try:
27
- stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
28
- except asyncio.TimeoutError:
29
- proc.kill()
30
- await proc.communicate()
31
- return {
32
- "success": False,
33
- "output": "",
34
- "error": f"Command timed out after {timeout}s",
35
- }
36
-
37
- return {
38
- "success": proc.returncode == 0,
39
- "output": stdout.decode("utf-8", errors="replace"),
40
- "error": stderr.decode("utf-8", errors="replace"),
41
- }
42
- except FileNotFoundError:
43
- return {
44
- "success": False,
45
- "output": "",
46
- "error": f"ops CLI not found at {OPS_CLI}",
47
- }
48
- except Exception as exc:
49
- return {
50
- "success": False,
51
- "output": "",
52
- "error": str(exc),
53
- }
29
+ """Run the ops CLI inside the container."""
30
+ return await _run_exec([OPS_CLI] + args, timeout=timeout)
5431
5532
5633 async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
57
- """
58
- Run the ops CLI with --json appended and return the parsed JSON output.
59
- Returns {"success": bool, "data": ..., "error": str}.
60
- """
34
+ """Run the ops CLI with --json and return parsed JSON."""
6135 result = await run_ops(args + ["--json"], timeout=timeout)
6236 if not result["success"]:
6337 return {"success": False, "data": None, "error": result["error"] or result["output"]}
64
-
6538 try:
6639 data = json.loads(result["output"])
6740 return {"success": True, "data": data, "error": ""}
....@@ -69,95 +42,48 @@
6942 return {
7043 "success": False,
7144 "data": None,
72
- "error": f"Failed to parse JSON output: {exc}\nRaw output: {result['output'][:500]}",
45
+ "error": f"Failed to parse JSON: {exc}\nRaw: {result['output'][:500]}",
7346 }
7447
7548
7649 async def stream_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
77
- """
78
- Async generator that yields lines of stdout from the ops CLI.
79
- Also yields stderr lines prefixed with '[stderr] '.
80
- """
81
- try:
82
- proc = await asyncio.create_subprocess_exec(
83
- OPS_CLI,
84
- *args,
85
- stdout=asyncio.subprocess.PIPE,
86
- stderr=asyncio.subprocess.PIPE,
87
- )
88
- except FileNotFoundError:
89
- yield f"[error] ops CLI not found at {OPS_CLI}"
90
- return
91
- except Exception as exc:
92
- yield f"[error] Failed to start process: {exc}"
93
- return
94
-
95
- async def _read_stream(stream: asyncio.StreamReader, prefix: str = "") -> AsyncGenerator[str, None]:
96
- while True:
97
- try:
98
- line = await asyncio.wait_for(stream.readline(), timeout=timeout)
99
- except asyncio.TimeoutError:
100
- yield f"{prefix}[timeout] Command exceeded {timeout}s"
101
- break
102
- if not line:
103
- break
104
- yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
105
-
106
- # Interleave stdout and stderr
107
- stdout_gen = _read_stream(proc.stdout)
108
- stderr_gen = _read_stream(proc.stderr, prefix="[stderr] ")
109
-
110
- stdout_done = False
111
- stderr_done = False
112
-
113
- stdout_iter = stdout_gen.__aiter__()
114
- stderr_iter = stderr_gen.__aiter__()
115
-
116
- pending_stdout: asyncio.Task | None = None
117
- pending_stderr: asyncio.Task | None = None
118
-
119
- async def _next(it):
120
- try:
121
- return await it.__anext__()
122
- except StopAsyncIteration:
123
- return None
124
-
125
- pending_stdout = asyncio.create_task(_next(stdout_iter))
126
- pending_stderr = asyncio.create_task(_next(stderr_iter))
127
-
128
- while not (stdout_done and stderr_done):
129
- done, _ = await asyncio.wait(
130
- [t for t in [pending_stdout, pending_stderr] if t is not None],
131
- return_when=asyncio.FIRST_COMPLETED,
132
- )
133
-
134
- for task in done:
135
- val = task.result()
136
- if task is pending_stdout:
137
- if val is None:
138
- stdout_done = True
139
- pending_stdout = None
140
- else:
141
- yield val
142
- pending_stdout = asyncio.create_task(_next(stdout_iter))
143
- elif task is pending_stderr:
144
- if val is None:
145
- stderr_done = True
146
- pending_stderr = None
147
- else:
148
- yield val
149
- pending_stderr = asyncio.create_task(_next(stderr_iter))
150
-
151
- await proc.wait()
50
+ """Stream ops CLI output (in-container)."""
51
+ async for line in _stream_exec([OPS_CLI] + args, timeout=timeout):
52
+ yield line
15253
15354
154
-async def run_command(
155
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
156
-) -> dict:
157
- """
158
- Generic command runner (non-ops). Accepts a full argv list.
159
- Returns {"success": bool, "output": str, "error": str}.
160
- """
55
+async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
56
+ """Generic command runner (in-container)."""
57
+ return await _run_exec(args, timeout=timeout)
58
+
59
+
60
+async def stream_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
61
+ """Stream generic command output (in-container)."""
62
+ async for line in _stream_exec(args, timeout=timeout):
63
+ yield line
64
+
65
+
66
+# ---------------------------------------------------------------------------
67
+# Host execution (backup, restore — needs host Python venvs)
68
+# ---------------------------------------------------------------------------
69
+
70
+async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
71
+ """Run the ops CLI on the host via nsenter."""
72
+ return await _run_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout)
73
+
74
+
75
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
76
+ """Stream ops CLI output from the host via nsenter."""
77
+ async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
78
+ yield line
79
+
80
+
81
+# ---------------------------------------------------------------------------
82
+# Internal helpers
83
+# ---------------------------------------------------------------------------
84
+
85
+async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
86
+ """Execute a command and capture output."""
16187 try:
16288 proc = await asyncio.create_subprocess_exec(
16389 *args,
....@@ -169,11 +95,7 @@
16995 except asyncio.TimeoutError:
17096 proc.kill()
17197 await proc.communicate()
172
- return {
173
- "success": False,
174
- "output": "",
175
- "error": f"Command timed out after {timeout}s",
176
- }
98
+ return {"success": False, "output": "", "error": f"Command timed out after {timeout}s"}
17799
178100 return {
179101 "success": proc.returncode == 0,
....@@ -186,12 +108,8 @@
186108 return {"success": False, "output": "", "error": str(exc)}
187109
188110
189
-async def stream_command(
190
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
191
-) -> AsyncGenerator[str, None]:
192
- """
193
- Async generator that yields lines of stdout for an arbitrary command.
194
- """
111
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
112
+ """Execute a command and yield interleaved stdout/stderr lines."""
195113 try:
196114 proc = await asyncio.create_subprocess_exec(
197115 *args,
....@@ -202,24 +120,56 @@
202120 yield f"[error] Executable not found: {exc}"
203121 return
204122 except Exception as exc:
205
- yield f"[error] {exc}"
123
+ yield f"[error] Failed to start process: {exc}"
206124 return
207125
208
- while True:
209
- try:
210
- line = await asyncio.wait_for(proc.stdout.readline(), timeout=timeout)
211
- except asyncio.TimeoutError:
212
- yield f"[timeout] Command exceeded {timeout}s"
213
- proc.kill()
214
- break
215
- if not line:
216
- break
217
- yield line.decode("utf-8", errors="replace").rstrip("\n")
126
+ async def _readline(stream, prefix=""):
127
+ while True:
128
+ try:
129
+ line = await asyncio.wait_for(stream.readline(), timeout=timeout)
130
+ except asyncio.TimeoutError:
131
+ yield f"{prefix}[timeout] Command exceeded {timeout}s"
132
+ break
133
+ if not line:
134
+ break
135
+ yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
218136
219
- # Flush stderr as trailing info
220
- stderr_data = await proc.stderr.read()
221
- if stderr_data:
222
- for ln in stderr_data.decode("utf-8", errors="replace").splitlines():
223
- yield f"[stderr] {ln}"
137
+ stdout_gen = _readline(proc.stdout).__aiter__()
138
+ stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
139
+
140
+ stdout_done = stderr_done = False
141
+ pending_out = pending_err = None
142
+
143
+ async def _next(it):
144
+ try:
145
+ return await it.__anext__()
146
+ except StopAsyncIteration:
147
+ return None
148
+
149
+ pending_out = asyncio.create_task(_next(stdout_gen))
150
+ pending_err = asyncio.create_task(_next(stderr_gen))
151
+
152
+ while not (stdout_done and stderr_done):
153
+ tasks = [t for t in (pending_out, pending_err) if t is not None]
154
+ if not tasks:
155
+ break
156
+ done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
157
+
158
+ for task in done:
159
+ val = task.result()
160
+ if task is pending_out:
161
+ if val is None:
162
+ stdout_done = True
163
+ pending_out = None
164
+ else:
165
+ yield val
166
+ pending_out = asyncio.create_task(_next(stdout_gen))
167
+ elif task is pending_err:
168
+ if val is None:
169
+ stderr_done = True
170
+ pending_err = None
171
+ else:
172
+ yield val
173
+ pending_err = asyncio.create_task(_next(stderr_gen))
224174
225175 await proc.wait()