Matthias Nott
2026-02-21 7300351bb9fb147f4de81a60423c4561a4924c21
fix: Run backup/restore on host via nsenter for Python venv compatibility

The host's Python 3.12 venvs cannot be used from inside the container, which runs Python 3.11.
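Added run_ops_host/stream_ops_host in ops_runner.py, which wrap the ops CLI in
`docker run --rm -i --privileged --pid=host --network=host alpine nsenter -t 1 -m -u -i -n -p --`
so that ops commands execute in the host's namespaces (those of host PID 1).
Applied to the backup, offsite upload, offsite retention, offsite restore and restore flows.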
6 files modified
changed files
app/app/ops_runner.py patch | view | blame | history
app/app/routers/backups.py patch | view | blame | history
app/app/routers/restore.py patch | view | blame | history
app/ops_runner.py patch | view | blame | history
app/routers/backups.py patch | view | blame | history
app/routers/restore.py patch | view | blame | history
app/app/ops_runner.py
....@@ -10,58 +10,31 @@
1010 _DEFAULT_TIMEOUT = 300
1111 _BACKUP_TIMEOUT = 3600
1212
13
+# nsenter via Docker: run commands on the host from inside the container.
14
+# Required because ops backup/restore delegate to host Python venvs (3.12)
15
+# that are incompatible with the container's Python (3.11).
16
+_NSENTER_PREFIX = [
17
+ "docker", "run", "--rm", "-i",
18
+ "--privileged", "--pid=host", "--network=host",
19
+ "alpine",
20
+ "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
21
+]
22
+
23
+
24
+# ---------------------------------------------------------------------------
25
+# In-container execution (status, disk, health, docker commands)
26
+# ---------------------------------------------------------------------------
1327
1428 async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
15
- """
16
- Run the ops CLI with the given arguments and capture output.
17
- Returns {"success": bool, "output": str, "error": str}.
18
- """
19
- try:
20
- proc = await asyncio.create_subprocess_exec(
21
- OPS_CLI,
22
- *args,
23
- stdout=asyncio.subprocess.PIPE,
24
- stderr=asyncio.subprocess.PIPE,
25
- )
26
- try:
27
- stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
28
- except asyncio.TimeoutError:
29
- proc.kill()
30
- await proc.communicate()
31
- return {
32
- "success": False,
33
- "output": "",
34
- "error": f"Command timed out after {timeout}s",
35
- }
36
-
37
- return {
38
- "success": proc.returncode == 0,
39
- "output": stdout.decode("utf-8", errors="replace"),
40
- "error": stderr.decode("utf-8", errors="replace"),
41
- }
42
- except FileNotFoundError:
43
- return {
44
- "success": False,
45
- "output": "",
46
- "error": f"ops CLI not found at {OPS_CLI}",
47
- }
48
- except Exception as exc:
49
- return {
50
- "success": False,
51
- "output": "",
52
- "error": str(exc),
53
- }
29
+ """Run the ops CLI inside the container."""
30
+ return await _run_exec([OPS_CLI] + args, timeout=timeout)
5431
5532
5633 async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
57
- """
58
- Run the ops CLI with --json appended and return the parsed JSON output.
59
- Returns {"success": bool, "data": ..., "error": str}.
60
- """
34
+ """Run the ops CLI with --json and return parsed JSON."""
6135 result = await run_ops(args + ["--json"], timeout=timeout)
6236 if not result["success"]:
6337 return {"success": False, "data": None, "error": result["error"] or result["output"]}
64
-
6538 try:
6639 data = json.loads(result["output"])
6740 return {"success": True, "data": data, "error": ""}
....@@ -69,95 +42,48 @@
6942 return {
7043 "success": False,
7144 "data": None,
72
- "error": f"Failed to parse JSON output: {exc}\nRaw output: {result['output'][:500]}",
45
+ "error": f"Failed to parse JSON: {exc}\nRaw: {result['output'][:500]}",
7346 }
7447
7548
7649 async def stream_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
77
- """
78
- Async generator that yields lines of stdout from the ops CLI.
79
- Also yields stderr lines prefixed with '[stderr] '.
80
- """
81
- try:
82
- proc = await asyncio.create_subprocess_exec(
83
- OPS_CLI,
84
- *args,
85
- stdout=asyncio.subprocess.PIPE,
86
- stderr=asyncio.subprocess.PIPE,
87
- )
88
- except FileNotFoundError:
89
- yield f"[error] ops CLI not found at {OPS_CLI}"
90
- return
91
- except Exception as exc:
92
- yield f"[error] Failed to start process: {exc}"
93
- return
94
-
95
- async def _read_stream(stream: asyncio.StreamReader, prefix: str = "") -> AsyncGenerator[str, None]:
96
- while True:
97
- try:
98
- line = await asyncio.wait_for(stream.readline(), timeout=timeout)
99
- except asyncio.TimeoutError:
100
- yield f"{prefix}[timeout] Command exceeded {timeout}s"
101
- break
102
- if not line:
103
- break
104
- yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
105
-
106
- # Interleave stdout and stderr
107
- stdout_gen = _read_stream(proc.stdout)
108
- stderr_gen = _read_stream(proc.stderr, prefix="[stderr] ")
109
-
110
- stdout_done = False
111
- stderr_done = False
112
-
113
- stdout_iter = stdout_gen.__aiter__()
114
- stderr_iter = stderr_gen.__aiter__()
115
-
116
- pending_stdout: asyncio.Task | None = None
117
- pending_stderr: asyncio.Task | None = None
118
-
119
- async def _next(it):
120
- try:
121
- return await it.__anext__()
122
- except StopAsyncIteration:
123
- return None
124
-
125
- pending_stdout = asyncio.create_task(_next(stdout_iter))
126
- pending_stderr = asyncio.create_task(_next(stderr_iter))
127
-
128
- while not (stdout_done and stderr_done):
129
- done, _ = await asyncio.wait(
130
- [t for t in [pending_stdout, pending_stderr] if t is not None],
131
- return_when=asyncio.FIRST_COMPLETED,
132
- )
133
-
134
- for task in done:
135
- val = task.result()
136
- if task is pending_stdout:
137
- if val is None:
138
- stdout_done = True
139
- pending_stdout = None
140
- else:
141
- yield val
142
- pending_stdout = asyncio.create_task(_next(stdout_iter))
143
- elif task is pending_stderr:
144
- if val is None:
145
- stderr_done = True
146
- pending_stderr = None
147
- else:
148
- yield val
149
- pending_stderr = asyncio.create_task(_next(stderr_iter))
150
-
151
- await proc.wait()
50
+ """Stream ops CLI output (in-container)."""
51
+ async for line in _stream_exec([OPS_CLI] + args, timeout=timeout):
52
+ yield line
15253
15354
154
-async def run_command(
155
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
156
-) -> dict:
157
- """
158
- Generic command runner (non-ops). Accepts a full argv list.
159
- Returns {"success": bool, "output": str, "error": str}.
160
- """
55
+async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
56
+ """Generic command runner (in-container)."""
57
+ return await _run_exec(args, timeout=timeout)
58
+
59
+
60
+async def stream_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
61
+ """Stream generic command output (in-container)."""
62
+ async for line in _stream_exec(args, timeout=timeout):
63
+ yield line
64
+
65
+
66
+# ---------------------------------------------------------------------------
67
+# Host execution (backup, restore — needs host Python venvs)
68
+# ---------------------------------------------------------------------------
69
+
70
+async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
71
+ """Run the ops CLI on the host via nsenter."""
72
+ return await _run_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout)
73
+
74
+
75
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
76
+ """Stream ops CLI output from the host via nsenter."""
77
+ async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
78
+ yield line
79
+
80
+
81
+# ---------------------------------------------------------------------------
82
+# Internal helpers
83
+# ---------------------------------------------------------------------------
84
+
85
+async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
86
+ """Execute a command and capture output."""
16187 try:
16288 proc = await asyncio.create_subprocess_exec(
16389 *args,
....@@ -169,11 +95,7 @@
16995 except asyncio.TimeoutError:
17096 proc.kill()
17197 await proc.communicate()
172
- return {
173
- "success": False,
174
- "output": "",
175
- "error": f"Command timed out after {timeout}s",
176
- }
98
+ return {"success": False, "output": "", "error": f"Command timed out after {timeout}s"}
17799
178100 return {
179101 "success": proc.returncode == 0,
....@@ -186,12 +108,8 @@
186108 return {"success": False, "output": "", "error": str(exc)}
187109
188110
189
-async def stream_command(
190
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
191
-) -> AsyncGenerator[str, None]:
192
- """
193
- Async generator that yields lines of stdout for an arbitrary command.
194
- """
111
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
112
+ """Execute a command and yield interleaved stdout/stderr lines."""
195113 try:
196114 proc = await asyncio.create_subprocess_exec(
197115 *args,
....@@ -202,24 +120,56 @@
202120 yield f"[error] Executable not found: {exc}"
203121 return
204122 except Exception as exc:
205
- yield f"[error] {exc}"
123
+ yield f"[error] Failed to start process: {exc}"
206124 return
207125
208
- while True:
209
- try:
210
- line = await asyncio.wait_for(proc.stdout.readline(), timeout=timeout)
211
- except asyncio.TimeoutError:
212
- yield f"[timeout] Command exceeded {timeout}s"
213
- proc.kill()
214
- break
215
- if not line:
216
- break
217
- yield line.decode("utf-8", errors="replace").rstrip("\n")
126
+ async def _readline(stream, prefix=""):
127
+ while True:
128
+ try:
129
+ line = await asyncio.wait_for(stream.readline(), timeout=timeout)
130
+ except asyncio.TimeoutError:
131
+ yield f"{prefix}[timeout] Command exceeded {timeout}s"
132
+ break
133
+ if not line:
134
+ break
135
+ yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
218136
219
- # Flush stderr as trailing info
220
- stderr_data = await proc.stderr.read()
221
- if stderr_data:
222
- for ln in stderr_data.decode("utf-8", errors="replace").splitlines():
223
- yield f"[stderr] {ln}"
137
+ stdout_gen = _readline(proc.stdout).__aiter__()
138
+ stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
139
+
140
+ stdout_done = stderr_done = False
141
+ pending_out = pending_err = None
142
+
143
+ async def _next(it):
144
+ try:
145
+ return await it.__anext__()
146
+ except StopAsyncIteration:
147
+ return None
148
+
149
+ pending_out = asyncio.create_task(_next(stdout_gen))
150
+ pending_err = asyncio.create_task(_next(stderr_gen))
151
+
152
+ while not (stdout_done and stderr_done):
153
+ tasks = [t for t in (pending_out, pending_err) if t is not None]
154
+ if not tasks:
155
+ break
156
+ done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
157
+
158
+ for task in done:
159
+ val = task.result()
160
+ if task is pending_out:
161
+ if val is None:
162
+ stdout_done = True
163
+ pending_out = None
164
+ else:
165
+ yield val
166
+ pending_out = asyncio.create_task(_next(stdout_gen))
167
+ elif task is pending_err:
168
+ if val is None:
169
+ stderr_done = True
170
+ pending_err = None
171
+ else:
172
+ yield val
173
+ pending_err = asyncio.create_task(_next(stderr_gen))
224174
225175 await proc.wait()
app/app/routers/backups.py
....@@ -3,7 +3,7 @@
33 from fastapi import APIRouter, Depends, HTTPException
44
55 from app.auth import verify_token
6
-from app.ops_runner import run_ops, run_ops_json, _BACKUP_TIMEOUT
6
+from app.ops_runner import run_ops, run_ops_json, run_ops_host, _BACKUP_TIMEOUT
77
88 router = APIRouter()
99
....@@ -12,15 +12,10 @@
1212 async def list_backups(
1313 _: str = Depends(verify_token),
1414 ) -> list[dict[str, Any]]:
15
- """
16
- Returns a list of local backup records from `ops backups --json`.
17
- """
15
+ """Returns a list of local backup records from `ops backups --json`."""
1816 result = await run_ops_json(["backups"])
1917 if not result["success"]:
20
- raise HTTPException(
21
- status_code=500,
22
- detail=f"Failed to list backups: {result['error']}",
23
- )
18
+ raise HTTPException(status_code=500, detail=f"Failed to list backups: {result['error']}")
2419
2520 data = result["data"]
2621 if isinstance(data, list):
....@@ -37,9 +32,7 @@
3732 async def list_offsite_backups(
3833 _: str = Depends(verify_token),
3934 ) -> list[dict[str, Any]]:
40
- """
41
- Returns a list of offsite backup records, querying each project separately.
42
- """
35
+ """Returns a list of offsite backup records."""
4336 all_backups = []
4437 for project in ["mdf", "seriousletter"]:
4538 result = await run_ops_json(["offsite", "list", project])
....@@ -57,9 +50,12 @@
5750 _: str = Depends(verify_token),
5851 ) -> dict[str, Any]:
5952 """
60
- Runs `ops backup {project} {env}` and returns the result.
53
+ Runs `ops backup {project} {env}` on the host.
54
+
55
+ Runs via nsenter because ops backup delegates to project CLIs
56
+ that use host Python venvs.
6157 """
62
- result = await run_ops(["backup", project, env], timeout=_BACKUP_TIMEOUT)
58
+ result = await run_ops_host(["backup", project, env], timeout=_BACKUP_TIMEOUT)
6359 if not result["success"]:
6460 raise HTTPException(
6561 status_code=500,
....@@ -79,10 +75,8 @@
7975 env: str,
8076 _: str = Depends(verify_token),
8177 ) -> dict[str, Any]:
82
- """
83
- Runs `ops offsite upload {project} {env}` and returns the result.
84
- """
85
- result = await run_ops(
78
+ """Runs `ops offsite upload {project} {env}` on the host."""
79
+ result = await run_ops_host(
8680 ["offsite", "upload", project, env], timeout=_BACKUP_TIMEOUT
8781 )
8882 if not result["success"]:
....@@ -90,28 +84,18 @@
9084 status_code=500,
9185 detail=f"Offsite upload failed: {result['error'] or result['output']}",
9286 )
93
- return {
94
- "success": True,
95
- "output": result["output"],
96
- "project": project,
97
- "env": env,
98
- }
87
+ return {"success": True, "output": result["output"], "project": project, "env": env}
9988
10089
10190 @router.post("/offsite/retention", summary="Apply offsite retention policy")
10291 async def apply_retention(
10392 _: str = Depends(verify_token),
10493 ) -> dict[str, Any]:
105
- """
106
- Runs `ops offsite retention` and returns the result.
107
- """
108
- result = await run_ops(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
94
+ """Runs `ops offsite retention` on the host."""
95
+ result = await run_ops_host(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
10996 if not result["success"]:
11097 raise HTTPException(
11198 status_code=500,
11299 detail=f"Retention policy failed: {result['error'] or result['output']}",
113100 )
114
- return {
115
- "success": True,
116
- "output": result["output"],
117
- }
101
+ return {"success": True, "output": result["output"]}
app/app/routers/restore.py
....@@ -6,7 +6,7 @@
66 from fastapi.responses import StreamingResponse
77
88 from app.auth import verify_token
9
-from app.ops_runner import _BACKUP_TIMEOUT, stream_ops
9
+from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
1010
1111 router = APIRouter()
1212
....@@ -22,18 +22,22 @@
2222 source: str,
2323 dry_run: bool,
2424 ) -> AsyncGenerator[str, None]:
25
- """Async generator that drives the restore workflow and yields SSE events."""
25
+ """Async generator that drives the restore workflow and yields SSE events.
26
+
27
+ Runs on the host via nsenter because ops restore delegates to project CLIs
28
+ that use host Python venvs incompatible with the container's Python.
29
+ """
2630 base_args = ["restore", project, env]
2731 if dry_run:
2832 base_args.append("--dry-run")
2933
3034 if source == "offsite":
31
- # ops offsite restore <project> <env> — downloads from offsite storage
35
+ # ops offsite restore <project> <env>
3236 download_args = ["offsite", "restore", project, env]
3337 yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
3438
3539 download_ok = True
36
- async for line in stream_ops(download_args, timeout=_BACKUP_TIMEOUT):
40
+ async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT):
3741 yield _sse_line({"line": line, "timestamp": _now()})
3842 if line.startswith("[error]"):
3943 download_ok = False
....@@ -45,7 +49,7 @@
4549 yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
4650
4751 success = True
48
- async for line in stream_ops(base_args, timeout=_BACKUP_TIMEOUT):
52
+ async for line in stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT):
4953 yield _sse_line({"line": line, "timestamp": _now()})
5054 if line.startswith("[error]"):
5155 success = False
....@@ -69,7 +73,7 @@
6973 Restore a backup for the given project/env.
7074
7175 Uses Server-Sent Events (SSE) to stream real-time progress.
72
- Parameters are passed as query strings since EventSource only supports GET.
76
+ Runs on the host via nsenter for Python venv compatibility.
7377 """
7478 return StreamingResponse(
7579 _restore_generator(project, env, source, dry_run),
app/ops_runner.py
....@@ -10,58 +10,31 @@
1010 _DEFAULT_TIMEOUT = 300
1111 _BACKUP_TIMEOUT = 3600
1212
13
+# nsenter via Docker: run commands on the host from inside the container.
14
+# Required because ops backup/restore delegate to host Python venvs (3.12)
15
+# that are incompatible with the container's Python (3.11).
16
+_NSENTER_PREFIX = [
17
+ "docker", "run", "--rm", "-i",
18
+ "--privileged", "--pid=host", "--network=host",
19
+ "alpine",
20
+ "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
21
+]
22
+
23
+
24
+# ---------------------------------------------------------------------------
25
+# In-container execution (status, disk, health, docker commands)
26
+# ---------------------------------------------------------------------------
1327
1428 async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
15
- """
16
- Run the ops CLI with the given arguments and capture output.
17
- Returns {"success": bool, "output": str, "error": str}.
18
- """
19
- try:
20
- proc = await asyncio.create_subprocess_exec(
21
- OPS_CLI,
22
- *args,
23
- stdout=asyncio.subprocess.PIPE,
24
- stderr=asyncio.subprocess.PIPE,
25
- )
26
- try:
27
- stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
28
- except asyncio.TimeoutError:
29
- proc.kill()
30
- await proc.communicate()
31
- return {
32
- "success": False,
33
- "output": "",
34
- "error": f"Command timed out after {timeout}s",
35
- }
36
-
37
- return {
38
- "success": proc.returncode == 0,
39
- "output": stdout.decode("utf-8", errors="replace"),
40
- "error": stderr.decode("utf-8", errors="replace"),
41
- }
42
- except FileNotFoundError:
43
- return {
44
- "success": False,
45
- "output": "",
46
- "error": f"ops CLI not found at {OPS_CLI}",
47
- }
48
- except Exception as exc:
49
- return {
50
- "success": False,
51
- "output": "",
52
- "error": str(exc),
53
- }
29
+ """Run the ops CLI inside the container."""
30
+ return await _run_exec([OPS_CLI] + args, timeout=timeout)
5431
5532
5633 async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
57
- """
58
- Run the ops CLI with --json appended and return the parsed JSON output.
59
- Returns {"success": bool, "data": ..., "error": str}.
60
- """
34
+ """Run the ops CLI with --json and return parsed JSON."""
6135 result = await run_ops(args + ["--json"], timeout=timeout)
6236 if not result["success"]:
6337 return {"success": False, "data": None, "error": result["error"] or result["output"]}
64
-
6538 try:
6639 data = json.loads(result["output"])
6740 return {"success": True, "data": data, "error": ""}
....@@ -69,95 +42,48 @@
6942 return {
7043 "success": False,
7144 "data": None,
72
- "error": f"Failed to parse JSON output: {exc}\nRaw output: {result['output'][:500]}",
45
+ "error": f"Failed to parse JSON: {exc}\nRaw: {result['output'][:500]}",
7346 }
7447
7548
7649 async def stream_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
77
- """
78
- Async generator that yields lines of stdout from the ops CLI.
79
- Also yields stderr lines prefixed with '[stderr] '.
80
- """
81
- try:
82
- proc = await asyncio.create_subprocess_exec(
83
- OPS_CLI,
84
- *args,
85
- stdout=asyncio.subprocess.PIPE,
86
- stderr=asyncio.subprocess.PIPE,
87
- )
88
- except FileNotFoundError:
89
- yield f"[error] ops CLI not found at {OPS_CLI}"
90
- return
91
- except Exception as exc:
92
- yield f"[error] Failed to start process: {exc}"
93
- return
94
-
95
- async def _read_stream(stream: asyncio.StreamReader, prefix: str = "") -> AsyncGenerator[str, None]:
96
- while True:
97
- try:
98
- line = await asyncio.wait_for(stream.readline(), timeout=timeout)
99
- except asyncio.TimeoutError:
100
- yield f"{prefix}[timeout] Command exceeded {timeout}s"
101
- break
102
- if not line:
103
- break
104
- yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
105
-
106
- # Interleave stdout and stderr
107
- stdout_gen = _read_stream(proc.stdout)
108
- stderr_gen = _read_stream(proc.stderr, prefix="[stderr] ")
109
-
110
- stdout_done = False
111
- stderr_done = False
112
-
113
- stdout_iter = stdout_gen.__aiter__()
114
- stderr_iter = stderr_gen.__aiter__()
115
-
116
- pending_stdout: asyncio.Task | None = None
117
- pending_stderr: asyncio.Task | None = None
118
-
119
- async def _next(it):
120
- try:
121
- return await it.__anext__()
122
- except StopAsyncIteration:
123
- return None
124
-
125
- pending_stdout = asyncio.create_task(_next(stdout_iter))
126
- pending_stderr = asyncio.create_task(_next(stderr_iter))
127
-
128
- while not (stdout_done and stderr_done):
129
- done, _ = await asyncio.wait(
130
- [t for t in [pending_stdout, pending_stderr] if t is not None],
131
- return_when=asyncio.FIRST_COMPLETED,
132
- )
133
-
134
- for task in done:
135
- val = task.result()
136
- if task is pending_stdout:
137
- if val is None:
138
- stdout_done = True
139
- pending_stdout = None
140
- else:
141
- yield val
142
- pending_stdout = asyncio.create_task(_next(stdout_iter))
143
- elif task is pending_stderr:
144
- if val is None:
145
- stderr_done = True
146
- pending_stderr = None
147
- else:
148
- yield val
149
- pending_stderr = asyncio.create_task(_next(stderr_iter))
150
-
151
- await proc.wait()
50
+ """Stream ops CLI output (in-container)."""
51
+ async for line in _stream_exec([OPS_CLI] + args, timeout=timeout):
52
+ yield line
15253
15354
154
-async def run_command(
155
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
156
-) -> dict:
157
- """
158
- Generic command runner (non-ops). Accepts a full argv list.
159
- Returns {"success": bool, "output": str, "error": str}.
160
- """
55
+async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
56
+ """Generic command runner (in-container)."""
57
+ return await _run_exec(args, timeout=timeout)
58
+
59
+
60
+async def stream_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
61
+ """Stream generic command output (in-container)."""
62
+ async for line in _stream_exec(args, timeout=timeout):
63
+ yield line
64
+
65
+
66
+# ---------------------------------------------------------------------------
67
+# Host execution (backup, restore — needs host Python venvs)
68
+# ---------------------------------------------------------------------------
69
+
70
+async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
71
+ """Run the ops CLI on the host via nsenter."""
72
+ return await _run_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout)
73
+
74
+
75
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
76
+ """Stream ops CLI output from the host via nsenter."""
77
+ async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
78
+ yield line
79
+
80
+
81
+# ---------------------------------------------------------------------------
82
+# Internal helpers
83
+# ---------------------------------------------------------------------------
84
+
85
+async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
86
+ """Execute a command and capture output."""
16187 try:
16288 proc = await asyncio.create_subprocess_exec(
16389 *args,
....@@ -169,11 +95,7 @@
16995 except asyncio.TimeoutError:
17096 proc.kill()
17197 await proc.communicate()
172
- return {
173
- "success": False,
174
- "output": "",
175
- "error": f"Command timed out after {timeout}s",
176
- }
98
+ return {"success": False, "output": "", "error": f"Command timed out after {timeout}s"}
17799
178100 return {
179101 "success": proc.returncode == 0,
....@@ -186,12 +108,8 @@
186108 return {"success": False, "output": "", "error": str(exc)}
187109
188110
189
-async def stream_command(
190
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
191
-) -> AsyncGenerator[str, None]:
192
- """
193
- Async generator that yields lines of stdout for an arbitrary command.
194
- """
111
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
112
+ """Execute a command and yield interleaved stdout/stderr lines."""
195113 try:
196114 proc = await asyncio.create_subprocess_exec(
197115 *args,
....@@ -202,24 +120,56 @@
202120 yield f"[error] Executable not found: {exc}"
203121 return
204122 except Exception as exc:
205
- yield f"[error] {exc}"
123
+ yield f"[error] Failed to start process: {exc}"
206124 return
207125
208
- while True:
209
- try:
210
- line = await asyncio.wait_for(proc.stdout.readline(), timeout=timeout)
211
- except asyncio.TimeoutError:
212
- yield f"[timeout] Command exceeded {timeout}s"
213
- proc.kill()
214
- break
215
- if not line:
216
- break
217
- yield line.decode("utf-8", errors="replace").rstrip("\n")
126
+ async def _readline(stream, prefix=""):
127
+ while True:
128
+ try:
129
+ line = await asyncio.wait_for(stream.readline(), timeout=timeout)
130
+ except asyncio.TimeoutError:
131
+ yield f"{prefix}[timeout] Command exceeded {timeout}s"
132
+ break
133
+ if not line:
134
+ break
135
+ yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
218136
219
- # Flush stderr as trailing info
220
- stderr_data = await proc.stderr.read()
221
- if stderr_data:
222
- for ln in stderr_data.decode("utf-8", errors="replace").splitlines():
223
- yield f"[stderr] {ln}"
137
+ stdout_gen = _readline(proc.stdout).__aiter__()
138
+ stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
139
+
140
+ stdout_done = stderr_done = False
141
+ pending_out = pending_err = None
142
+
143
+ async def _next(it):
144
+ try:
145
+ return await it.__anext__()
146
+ except StopAsyncIteration:
147
+ return None
148
+
149
+ pending_out = asyncio.create_task(_next(stdout_gen))
150
+ pending_err = asyncio.create_task(_next(stderr_gen))
151
+
152
+ while not (stdout_done and stderr_done):
153
+ tasks = [t for t in (pending_out, pending_err) if t is not None]
154
+ if not tasks:
155
+ break
156
+ done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
157
+
158
+ for task in done:
159
+ val = task.result()
160
+ if task is pending_out:
161
+ if val is None:
162
+ stdout_done = True
163
+ pending_out = None
164
+ else:
165
+ yield val
166
+ pending_out = asyncio.create_task(_next(stdout_gen))
167
+ elif task is pending_err:
168
+ if val is None:
169
+ stderr_done = True
170
+ pending_err = None
171
+ else:
172
+ yield val
173
+ pending_err = asyncio.create_task(_next(stderr_gen))
224174
225175 await proc.wait()
app/routers/backups.py
....@@ -3,7 +3,7 @@
33 from fastapi import APIRouter, Depends, HTTPException
44
55 from app.auth import verify_token
6
-from app.ops_runner import run_ops, run_ops_json, _BACKUP_TIMEOUT
6
+from app.ops_runner import run_ops, run_ops_json, run_ops_host, _BACKUP_TIMEOUT
77
88 router = APIRouter()
99
....@@ -12,15 +12,10 @@
1212 async def list_backups(
1313 _: str = Depends(verify_token),
1414 ) -> list[dict[str, Any]]:
15
- """
16
- Returns a list of local backup records from `ops backups --json`.
17
- """
15
+ """Returns a list of local backup records from `ops backups --json`."""
1816 result = await run_ops_json(["backups"])
1917 if not result["success"]:
20
- raise HTTPException(
21
- status_code=500,
22
- detail=f"Failed to list backups: {result['error']}",
23
- )
18
+ raise HTTPException(status_code=500, detail=f"Failed to list backups: {result['error']}")
2419
2520 data = result["data"]
2621 if isinstance(data, list):
....@@ -37,9 +32,7 @@
3732 async def list_offsite_backups(
3833 _: str = Depends(verify_token),
3934 ) -> list[dict[str, Any]]:
40
- """
41
- Returns a list of offsite backup records, querying each project separately.
42
- """
35
+ """Returns a list of offsite backup records."""
4336 all_backups = []
4437 for project in ["mdf", "seriousletter"]:
4538 result = await run_ops_json(["offsite", "list", project])
....@@ -57,9 +50,12 @@
5750 _: str = Depends(verify_token),
5851 ) -> dict[str, Any]:
5952 """
60
- Runs `ops backup {project} {env}` and returns the result.
53
+ Runs `ops backup {project} {env}` on the host.
54
+
55
+ Runs via nsenter because ops backup delegates to project CLIs
56
+ that use host Python venvs.
6157 """
62
- result = await run_ops(["backup", project, env], timeout=_BACKUP_TIMEOUT)
58
+ result = await run_ops_host(["backup", project, env], timeout=_BACKUP_TIMEOUT)
6359 if not result["success"]:
6460 raise HTTPException(
6561 status_code=500,
....@@ -79,10 +75,8 @@
7975 env: str,
8076 _: str = Depends(verify_token),
8177 ) -> dict[str, Any]:
82
- """
83
- Runs `ops offsite upload {project} {env}` and returns the result.
84
- """
85
- result = await run_ops(
78
+ """Runs `ops offsite upload {project} {env}` on the host."""
79
+ result = await run_ops_host(
8680 ["offsite", "upload", project, env], timeout=_BACKUP_TIMEOUT
8781 )
8882 if not result["success"]:
....@@ -90,28 +84,18 @@
9084 status_code=500,
9185 detail=f"Offsite upload failed: {result['error'] or result['output']}",
9286 )
93
- return {
94
- "success": True,
95
- "output": result["output"],
96
- "project": project,
97
- "env": env,
98
- }
87
+ return {"success": True, "output": result["output"], "project": project, "env": env}
9988
10089
10190 @router.post("/offsite/retention", summary="Apply offsite retention policy")
10291 async def apply_retention(
10392 _: str = Depends(verify_token),
10493 ) -> dict[str, Any]:
105
- """
106
- Runs `ops offsite retention` and returns the result.
107
- """
108
- result = await run_ops(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
94
+ """Runs `ops offsite retention` on the host."""
95
+ result = await run_ops_host(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
10996 if not result["success"]:
11097 raise HTTPException(
11198 status_code=500,
11299 detail=f"Retention policy failed: {result['error'] or result['output']}",
113100 )
114
- return {
115
- "success": True,
116
- "output": result["output"],
117
- }
101
+ return {"success": True, "output": result["output"]}
app/routers/restore.py
....@@ -6,7 +6,7 @@
66 from fastapi.responses import StreamingResponse
77
88 from app.auth import verify_token
9
-from app.ops_runner import _BACKUP_TIMEOUT, stream_ops
9
+from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
1010
1111 router = APIRouter()
1212
....@@ -22,18 +22,22 @@
2222 source: str,
2323 dry_run: bool,
2424 ) -> AsyncGenerator[str, None]:
25
- """Async generator that drives the restore workflow and yields SSE events."""
25
+ """Async generator that drives the restore workflow and yields SSE events.
26
+
27
+ Runs on the host via nsenter because ops restore delegates to project CLIs
28
+ that use host Python venvs incompatible with the container's Python.
29
+ """
2630 base_args = ["restore", project, env]
2731 if dry_run:
2832 base_args.append("--dry-run")
2933
3034 if source == "offsite":
31
- # ops offsite restore <project> <env> — downloads from offsite storage
35
+ # ops offsite restore <project> <env>
3236 download_args = ["offsite", "restore", project, env]
3337 yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
3438
3539 download_ok = True
36
- async for line in stream_ops(download_args, timeout=_BACKUP_TIMEOUT):
40
+ async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT):
3741 yield _sse_line({"line": line, "timestamp": _now()})
3842 if line.startswith("[error]"):
3943 download_ok = False
....@@ -45,7 +49,7 @@
4549 yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
4650
4751 success = True
48
- async for line in stream_ops(base_args, timeout=_BACKUP_TIMEOUT):
52
+ async for line in stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT):
4953 yield _sse_line({"line": line, "timestamp": _now()})
5054 if line.startswith("[error]"):
5155 success = False
....@@ -69,7 +73,7 @@
6973 Restore a backup for the given project/env.
7074
7175 Uses Server-Sent Events (SSE) to stream real-time progress.
72
- Parameters are passed as query strings since EventSource only supports GET.
76
+ Runs on the host via nsenter for Python venv compatibility.
7377 """
7478 return StreamingResponse(
7579 _restore_generator(project, env, source, dry_run),