From 7300351bb9fb147f4de81a60423c4561a4924c21 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Sat, 21 Feb 2026 16:53:38 +0100
Subject: [PATCH] fix: Run backup/restore on host via nsenter for Python venv compatibility

---
 app/app/ops_runner.py      |  258 +++++++++--------------
 app/routers/backups.py     |   46 +--
 app/app/routers/backups.py |   46 +--
 app/routers/restore.py     |   16 
 app/app/routers/restore.py |   16 
 app/ops_runner.py          |  258 +++++++++--------------
 6 files changed, 258 insertions(+), 382 deletions(-)

diff --git a/app/app/ops_runner.py b/app/app/ops_runner.py
index 7b05679..226fdaa 100644
--- a/app/app/ops_runner.py
+++ b/app/app/ops_runner.py
@@ -10,58 +10,31 @@
 _DEFAULT_TIMEOUT = 300
 _BACKUP_TIMEOUT = 3600
 
+# nsenter via Docker: run commands on the host from inside the container.
+# Required because ops backup/restore delegate to host Python venvs (3.12)
+# that are incompatible with the container's Python (3.11).
+_NSENTER_PREFIX = [
+    "docker", "run", "--rm", "-i",
+    "--privileged", "--pid=host", "--network=host",
+    "alpine",
+    "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
+]
+
+
+# ---------------------------------------------------------------------------
+# In-container execution (status, disk, health, docker commands)
+# ---------------------------------------------------------------------------
 
 async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
-    """
-    Run the ops CLI with the given arguments and capture output.
-    Returns {"success": bool, "output": str, "error": str}.
-    """
-    try:
-        proc = await asyncio.create_subprocess_exec(
-            OPS_CLI,
-            *args,
-            stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
-        )
-        try:
-            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
-        except asyncio.TimeoutError:
-            proc.kill()
-            await proc.communicate()
-            return {
-                "success": False,
-                "output": "",
-                "error": f"Command timed out after {timeout}s",
-            }
-
-        return {
-            "success": proc.returncode == 0,
-            "output": stdout.decode("utf-8", errors="replace"),
-            "error": stderr.decode("utf-8", errors="replace"),
-        }
-    except FileNotFoundError:
-        return {
-            "success": False,
-            "output": "",
-            "error": f"ops CLI not found at {OPS_CLI}",
-        }
-    except Exception as exc:
-        return {
-            "success": False,
-            "output": "",
-            "error": str(exc),
-        }
+    """Run the ops CLI inside the container."""
+    return await _run_exec([OPS_CLI] + args, timeout=timeout)
 
 
 async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
-    """
-    Run the ops CLI with --json appended and return the parsed JSON output.
-    Returns {"success": bool, "data": ..., "error": str}.
-    """
+    """Run the ops CLI with --json and return parsed JSON."""
     result = await run_ops(args + ["--json"], timeout=timeout)
     if not result["success"]:
         return {"success": False, "data": None, "error": result["error"] or result["output"]}
-
     try:
         data = json.loads(result["output"])
         return {"success": True, "data": data, "error": ""}
@@ -69,95 +42,48 @@
         return {
             "success": False,
             "data": None,
-            "error": f"Failed to parse JSON output: {exc}\nRaw output: {result['output'][:500]}",
+            "error": f"Failed to parse JSON: {exc}\nRaw: {result['output'][:500]}",
         }
 
 
 async def stream_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
-    """
-    Async generator that yields lines of stdout from the ops CLI.
-    Also yields stderr lines prefixed with '[stderr] '.
-    """
-    try:
-        proc = await asyncio.create_subprocess_exec(
-            OPS_CLI,
-            *args,
-            stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
-        )
-    except FileNotFoundError:
-        yield f"[error] ops CLI not found at {OPS_CLI}"
-        return
-    except Exception as exc:
-        yield f"[error] Failed to start process: {exc}"
-        return
-
-    async def _read_stream(stream: asyncio.StreamReader, prefix: str = "") -> AsyncGenerator[str, None]:
-        while True:
-            try:
-                line = await asyncio.wait_for(stream.readline(), timeout=timeout)
-            except asyncio.TimeoutError:
-                yield f"{prefix}[timeout] Command exceeded {timeout}s"
-                break
-            if not line:
-                break
-            yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
-
-    # Interleave stdout and stderr
-    stdout_gen = _read_stream(proc.stdout)
-    stderr_gen = _read_stream(proc.stderr, prefix="[stderr] ")
-
-    stdout_done = False
-    stderr_done = False
-
-    stdout_iter = stdout_gen.__aiter__()
-    stderr_iter = stderr_gen.__aiter__()
-
-    pending_stdout: asyncio.Task | None = None
-    pending_stderr: asyncio.Task | None = None
-
-    async def _next(it):
-        try:
-            return await it.__anext__()
-        except StopAsyncIteration:
-            return None
-
-    pending_stdout = asyncio.create_task(_next(stdout_iter))
-    pending_stderr = asyncio.create_task(_next(stderr_iter))
-
-    while not (stdout_done and stderr_done):
-        done, _ = await asyncio.wait(
-            [t for t in [pending_stdout, pending_stderr] if t is not None],
-            return_when=asyncio.FIRST_COMPLETED,
-        )
-
-        for task in done:
-            val = task.result()
-            if task is pending_stdout:
-                if val is None:
-                    stdout_done = True
-                    pending_stdout = None
-                else:
-                    yield val
-                    pending_stdout = asyncio.create_task(_next(stdout_iter))
-            elif task is pending_stderr:
-                if val is None:
-                    stderr_done = True
-                    pending_stderr = None
-                else:
-                    yield val
-                    pending_stderr = asyncio.create_task(_next(stderr_iter))
-
-    await proc.wait()
+    """Stream ops CLI output (in-container)."""
+    async for line in _stream_exec([OPS_CLI] + args, timeout=timeout):
+        yield line
 
 
-async def run_command(
-    args: list[str], timeout: int = _DEFAULT_TIMEOUT
-) -> dict:
-    """
-    Generic command runner (non-ops). Accepts a full argv list.
-    Returns {"success": bool, "output": str, "error": str}.
-    """
+async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+    """Generic command runner (in-container)."""
+    return await _run_exec(args, timeout=timeout)
+
+
+async def stream_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+    """Stream generic command output (in-container)."""
+    async for line in _stream_exec(args, timeout=timeout):
+        yield line
+
+
+# ---------------------------------------------------------------------------
+# Host execution (backup, restore — needs host Python venvs)
+# ---------------------------------------------------------------------------
+
+async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+    """Run the ops CLI on the host via nsenter."""
+    return await _run_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout)
+
+
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+    """Stream ops CLI output from the host via nsenter."""
+    async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
+        yield line
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+
+async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+    """Execute a command and capture output."""
     try:
         proc = await asyncio.create_subprocess_exec(
             *args,
@@ -169,11 +95,7 @@
         except asyncio.TimeoutError:
             proc.kill()
             await proc.communicate()
-            return {
-                "success": False,
-                "output": "",
-                "error": f"Command timed out after {timeout}s",
-            }
+            return {"success": False, "output": "", "error": f"Command timed out after {timeout}s"}
 
         return {
             "success": proc.returncode == 0,
@@ -186,12 +108,8 @@
         return {"success": False, "output": "", "error": str(exc)}
 
 
-async def stream_command(
-    args: list[str], timeout: int = _DEFAULT_TIMEOUT
-) -> AsyncGenerator[str, None]:
-    """
-    Async generator that yields lines of stdout for an arbitrary command.
-    """
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+    """Execute a command and yield interleaved stdout/stderr lines."""
     try:
         proc = await asyncio.create_subprocess_exec(
             *args,
@@ -202,24 +120,56 @@
         yield f"[error] Executable not found: {exc}"
         return
     except Exception as exc:
-        yield f"[error] {exc}"
+        yield f"[error] Failed to start process: {exc}"
         return
 
-    while True:
-        try:
-            line = await asyncio.wait_for(proc.stdout.readline(), timeout=timeout)
-        except asyncio.TimeoutError:
-            yield f"[timeout] Command exceeded {timeout}s"
-            proc.kill()
-            break
-        if not line:
-            break
-        yield line.decode("utf-8", errors="replace").rstrip("\n")
+    async def _readline(stream, prefix=""):
+        while True:
+            try:
+                line = await asyncio.wait_for(stream.readline(), timeout=timeout)
+            except asyncio.TimeoutError:
+                yield f"{prefix}[timeout] Command exceeded {timeout}s"
+                break
+            if not line:
+                break
+            yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
 
-    # Flush stderr as trailing info
-    stderr_data = await proc.stderr.read()
-    if stderr_data:
-        for ln in stderr_data.decode("utf-8", errors="replace").splitlines():
-            yield f"[stderr] {ln}"
+    stdout_gen = _readline(proc.stdout).__aiter__()
+    stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
+
+    stdout_done = stderr_done = False
+    pending_out = pending_err = None
+
+    async def _next(it):
+        try:
+            return await it.__anext__()
+        except StopAsyncIteration:
+            return None
+
+    pending_out = asyncio.create_task(_next(stdout_gen))
+    pending_err = asyncio.create_task(_next(stderr_gen))
+
+    while not (stdout_done and stderr_done):
+        tasks = [t for t in (pending_out, pending_err) if t is not None]
+        if not tasks:
+            break
+        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+
+        for task in done:
+            val = task.result()
+            if task is pending_out:
+                if val is None:
+                    stdout_done = True
+                    pending_out = None
+                else:
+                    yield val
+                    pending_out = asyncio.create_task(_next(stdout_gen))
+            elif task is pending_err:
+                if val is None:
+                    stderr_done = True
+                    pending_err = None
+                else:
+                    yield val
+                    pending_err = asyncio.create_task(_next(stderr_gen))
 
     await proc.wait()
diff --git a/app/app/routers/backups.py b/app/app/routers/backups.py
index d0a6f81..d1cf26e 100644
--- a/app/app/routers/backups.py
+++ b/app/app/routers/backups.py
@@ -3,7 +3,7 @@
 from fastapi import APIRouter, Depends, HTTPException
 
 from app.auth import verify_token
-from app.ops_runner import run_ops, run_ops_json, _BACKUP_TIMEOUT
+from app.ops_runner import run_ops, run_ops_json, run_ops_host, _BACKUP_TIMEOUT
 
 router = APIRouter()
 
@@ -12,15 +12,10 @@
 async def list_backups(
     _: str = Depends(verify_token),
 ) -> list[dict[str, Any]]:
-    """
-    Returns a list of local backup records from `ops backups --json`.
-    """
+    """Returns a list of local backup records from `ops backups --json`."""
     result = await run_ops_json(["backups"])
     if not result["success"]:
-        raise HTTPException(
-            status_code=500,
-            detail=f"Failed to list backups: {result['error']}",
-        )
+        raise HTTPException(status_code=500, detail=f"Failed to list backups: {result['error']}")
 
     data = result["data"]
     if isinstance(data, list):
@@ -37,9 +32,7 @@
 async def list_offsite_backups(
     _: str = Depends(verify_token),
 ) -> list[dict[str, Any]]:
-    """
-    Returns a list of offsite backup records, querying each project separately.
-    """
+    """Returns a list of offsite backup records."""
     all_backups = []
     for project in ["mdf", "seriousletter"]:
         result = await run_ops_json(["offsite", "list", project])
@@ -57,9 +50,12 @@
     _: str = Depends(verify_token),
 ) -> dict[str, Any]:
     """
-    Runs `ops backup {project} {env}` and returns the result.
+    Runs `ops backup {project} {env}` on the host.
+
+    Runs via nsenter because ops backup delegates to project CLIs
+    that use host Python venvs.
     """
-    result = await run_ops(["backup", project, env], timeout=_BACKUP_TIMEOUT)
+    result = await run_ops_host(["backup", project, env], timeout=_BACKUP_TIMEOUT)
     if not result["success"]:
         raise HTTPException(
             status_code=500,
@@ -79,10 +75,8 @@
     env: str,
     _: str = Depends(verify_token),
 ) -> dict[str, Any]:
-    """
-    Runs `ops offsite upload {project} {env}` and returns the result.
-    """
-    result = await run_ops(
+    """Runs `ops offsite upload {project} {env}` on the host."""
+    result = await run_ops_host(
         ["offsite", "upload", project, env], timeout=_BACKUP_TIMEOUT
     )
     if not result["success"]:
@@ -90,28 +84,18 @@
             status_code=500,
             detail=f"Offsite upload failed: {result['error'] or result['output']}",
         )
-    return {
-        "success": True,
-        "output": result["output"],
-        "project": project,
-        "env": env,
-    }
+    return {"success": True, "output": result["output"], "project": project, "env": env}
 
 
 @router.post("/offsite/retention", summary="Apply offsite retention policy")
 async def apply_retention(
     _: str = Depends(verify_token),
 ) -> dict[str, Any]:
-    """
-    Runs `ops offsite retention` and returns the result.
-    """
-    result = await run_ops(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
+    """Runs `ops offsite retention` on the host."""
+    result = await run_ops_host(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
     if not result["success"]:
         raise HTTPException(
             status_code=500,
             detail=f"Retention policy failed: {result['error'] or result['output']}",
         )
-    return {
-        "success": True,
-        "output": result["output"],
-    }
+    return {"success": True, "output": result["output"]}
diff --git a/app/app/routers/restore.py b/app/app/routers/restore.py
index b487952..d03428e 100644
--- a/app/app/routers/restore.py
+++ b/app/app/routers/restore.py
@@ -6,7 +6,7 @@
 from fastapi.responses import StreamingResponse
 
 from app.auth import verify_token
-from app.ops_runner import _BACKUP_TIMEOUT, stream_ops
+from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
 
 router = APIRouter()
 
@@ -22,18 +22,22 @@
     source: str,
     dry_run: bool,
 ) -> AsyncGenerator[str, None]:
-    """Async generator that drives the restore workflow and yields SSE events."""
+    """Async generator that drives the restore workflow and yields SSE events.
+
+    Runs on the host via nsenter because ops restore delegates to project CLIs
+    that use host Python venvs incompatible with the container's Python.
+    """
     base_args = ["restore", project, env]
     if dry_run:
         base_args.append("--dry-run")
 
     if source == "offsite":
-        # ops offsite restore <project> <env> — downloads from offsite storage
+        # ops offsite restore <project> <env>
         download_args = ["offsite", "restore", project, env]
         yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
 
         download_ok = True
-        async for line in stream_ops(download_args, timeout=_BACKUP_TIMEOUT):
+        async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT):
             yield _sse_line({"line": line, "timestamp": _now()})
             if line.startswith("[error]"):
                 download_ok = False
@@ -45,7 +49,7 @@
         yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
 
     success = True
-    async for line in stream_ops(base_args, timeout=_BACKUP_TIMEOUT):
+    async for line in stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT):
         yield _sse_line({"line": line, "timestamp": _now()})
         if line.startswith("[error]"):
             success = False
@@ -69,7 +73,7 @@
     Restore a backup for the given project/env.
 
     Uses Server-Sent Events (SSE) to stream real-time progress.
-    Parameters are passed as query strings since EventSource only supports GET.
+    Runs on the host via nsenter for Python venv compatibility.
     """
     return StreamingResponse(
         _restore_generator(project, env, source, dry_run),
diff --git a/app/ops_runner.py b/app/ops_runner.py
index 7b05679..226fdaa 100644
--- a/app/ops_runner.py
+++ b/app/ops_runner.py
@@ -10,58 +10,31 @@
 _DEFAULT_TIMEOUT = 300
 _BACKUP_TIMEOUT = 3600
 
+# nsenter via Docker: run commands on the host from inside the container.
+# Required because ops backup/restore delegate to host Python venvs (3.12)
+# that are incompatible with the container's Python (3.11).
+_NSENTER_PREFIX = [
+    "docker", "run", "--rm", "-i",
+    "--privileged", "--pid=host", "--network=host",
+    "alpine",
+    "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
+]
+
+
+# ---------------------------------------------------------------------------
+# In-container execution (status, disk, health, docker commands)
+# ---------------------------------------------------------------------------
 
 async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
-    """
-    Run the ops CLI with the given arguments and capture output.
-    Returns {"success": bool, "output": str, "error": str}.
-    """
-    try:
-        proc = await asyncio.create_subprocess_exec(
-            OPS_CLI,
-            *args,
-            stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
-        )
-        try:
-            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
-        except asyncio.TimeoutError:
-            proc.kill()
-            await proc.communicate()
-            return {
-                "success": False,
-                "output": "",
-                "error": f"Command timed out after {timeout}s",
-            }
-
-        return {
-            "success": proc.returncode == 0,
-            "output": stdout.decode("utf-8", errors="replace"),
-            "error": stderr.decode("utf-8", errors="replace"),
-        }
-    except FileNotFoundError:
-        return {
-            "success": False,
-            "output": "",
-            "error": f"ops CLI not found at {OPS_CLI}",
-        }
-    except Exception as exc:
-        return {
-            "success": False,
-            "output": "",
-            "error": str(exc),
-        }
+    """Run the ops CLI inside the container."""
+    return await _run_exec([OPS_CLI] + args, timeout=timeout)
 
 
 async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
-    """
-    Run the ops CLI with --json appended and return the parsed JSON output.
-    Returns {"success": bool, "data": ..., "error": str}.
-    """
+    """Run the ops CLI with --json and return parsed JSON."""
     result = await run_ops(args + ["--json"], timeout=timeout)
     if not result["success"]:
         return {"success": False, "data": None, "error": result["error"] or result["output"]}
-
     try:
         data = json.loads(result["output"])
         return {"success": True, "data": data, "error": ""}
@@ -69,95 +42,48 @@
         return {
             "success": False,
             "data": None,
-            "error": f"Failed to parse JSON output: {exc}\nRaw output: {result['output'][:500]}",
+            "error": f"Failed to parse JSON: {exc}\nRaw: {result['output'][:500]}",
         }
 
 
 async def stream_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
-    """
-    Async generator that yields lines of stdout from the ops CLI.
-    Also yields stderr lines prefixed with '[stderr] '.
-    """
-    try:
-        proc = await asyncio.create_subprocess_exec(
-            OPS_CLI,
-            *args,
-            stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
-        )
-    except FileNotFoundError:
-        yield f"[error] ops CLI not found at {OPS_CLI}"
-        return
-    except Exception as exc:
-        yield f"[error] Failed to start process: {exc}"
-        return
-
-    async def _read_stream(stream: asyncio.StreamReader, prefix: str = "") -> AsyncGenerator[str, None]:
-        while True:
-            try:
-                line = await asyncio.wait_for(stream.readline(), timeout=timeout)
-            except asyncio.TimeoutError:
-                yield f"{prefix}[timeout] Command exceeded {timeout}s"
-                break
-            if not line:
-                break
-            yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
-
-    # Interleave stdout and stderr
-    stdout_gen = _read_stream(proc.stdout)
-    stderr_gen = _read_stream(proc.stderr, prefix="[stderr] ")
-
-    stdout_done = False
-    stderr_done = False
-
-    stdout_iter = stdout_gen.__aiter__()
-    stderr_iter = stderr_gen.__aiter__()
-
-    pending_stdout: asyncio.Task | None = None
-    pending_stderr: asyncio.Task | None = None
-
-    async def _next(it):
-        try:
-            return await it.__anext__()
-        except StopAsyncIteration:
-            return None
-
-    pending_stdout = asyncio.create_task(_next(stdout_iter))
-    pending_stderr = asyncio.create_task(_next(stderr_iter))
-
-    while not (stdout_done and stderr_done):
-        done, _ = await asyncio.wait(
-            [t for t in [pending_stdout, pending_stderr] if t is not None],
-            return_when=asyncio.FIRST_COMPLETED,
-        )
-
-        for task in done:
-            val = task.result()
-            if task is pending_stdout:
-                if val is None:
-                    stdout_done = True
-                    pending_stdout = None
-                else:
-                    yield val
-                    pending_stdout = asyncio.create_task(_next(stdout_iter))
-            elif task is pending_stderr:
-                if val is None:
-                    stderr_done = True
-                    pending_stderr = None
-                else:
-                    yield val
-                    pending_stderr = asyncio.create_task(_next(stderr_iter))
-
-    await proc.wait()
+    """Stream ops CLI output (in-container)."""
+    async for line in _stream_exec([OPS_CLI] + args, timeout=timeout):
+        yield line
 
 
-async def run_command(
-    args: list[str], timeout: int = _DEFAULT_TIMEOUT
-) -> dict:
-    """
-    Generic command runner (non-ops). Accepts a full argv list.
-    Returns {"success": bool, "output": str, "error": str}.
-    """
+async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+    """Generic command runner (in-container)."""
+    return await _run_exec(args, timeout=timeout)
+
+
+async def stream_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+    """Stream generic command output (in-container)."""
+    async for line in _stream_exec(args, timeout=timeout):
+        yield line
+
+
+# ---------------------------------------------------------------------------
+# Host execution (backup, restore — needs host Python venvs)
+# ---------------------------------------------------------------------------
+
+async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+    """Run the ops CLI on the host via nsenter."""
+    return await _run_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout)
+
+
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+    """Stream ops CLI output from the host via nsenter."""
+    async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
+        yield line
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+
+async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+    """Execute a command and capture output."""
     try:
         proc = await asyncio.create_subprocess_exec(
             *args,
@@ -169,11 +95,7 @@
         except asyncio.TimeoutError:
             proc.kill()
             await proc.communicate()
-            return {
-                "success": False,
-                "output": "",
-                "error": f"Command timed out after {timeout}s",
-            }
+            return {"success": False, "output": "", "error": f"Command timed out after {timeout}s"}
 
         return {
             "success": proc.returncode == 0,
@@ -186,12 +108,8 @@
         return {"success": False, "output": "", "error": str(exc)}
 
 
-async def stream_command(
-    args: list[str], timeout: int = _DEFAULT_TIMEOUT
-) -> AsyncGenerator[str, None]:
-    """
-    Async generator that yields lines of stdout for an arbitrary command.
-    """
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+    """Execute a command and yield interleaved stdout/stderr lines."""
     try:
         proc = await asyncio.create_subprocess_exec(
             *args,
@@ -202,24 +120,56 @@
         yield f"[error] Executable not found: {exc}"
         return
     except Exception as exc:
-        yield f"[error] {exc}"
+        yield f"[error] Failed to start process: {exc}"
         return
 
-    while True:
-        try:
-            line = await asyncio.wait_for(proc.stdout.readline(), timeout=timeout)
-        except asyncio.TimeoutError:
-            yield f"[timeout] Command exceeded {timeout}s"
-            proc.kill()
-            break
-        if not line:
-            break
-        yield line.decode("utf-8", errors="replace").rstrip("\n")
+    async def _readline(stream, prefix=""):
+        while True:
+            try:
+                line = await asyncio.wait_for(stream.readline(), timeout=timeout)
+            except asyncio.TimeoutError:
+                yield f"{prefix}[timeout] Command exceeded {timeout}s"
+                break
+            if not line:
+                break
+            yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
 
-    # Flush stderr as trailing info
-    stderr_data = await proc.stderr.read()
-    if stderr_data:
-        for ln in stderr_data.decode("utf-8", errors="replace").splitlines():
-            yield f"[stderr] {ln}"
+    stdout_gen = _readline(proc.stdout).__aiter__()
+    stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
+
+    stdout_done = stderr_done = False
+    pending_out = pending_err = None
+
+    async def _next(it):
+        try:
+            return await it.__anext__()
+        except StopAsyncIteration:
+            return None
+
+    pending_out = asyncio.create_task(_next(stdout_gen))
+    pending_err = asyncio.create_task(_next(stderr_gen))
+
+    while not (stdout_done and stderr_done):
+        tasks = [t for t in (pending_out, pending_err) if t is not None]
+        if not tasks:
+            break
+        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+
+        for task in done:
+            val = task.result()
+            if task is pending_out:
+                if val is None:
+                    stdout_done = True
+                    pending_out = None
+                else:
+                    yield val
+                    pending_out = asyncio.create_task(_next(stdout_gen))
+            elif task is pending_err:
+                if val is None:
+                    stderr_done = True
+                    pending_err = None
+                else:
+                    yield val
+                    pending_err = asyncio.create_task(_next(stderr_gen))
 
     await proc.wait()
diff --git a/app/routers/backups.py b/app/routers/backups.py
index d0a6f81..d1cf26e 100644
--- a/app/routers/backups.py
+++ b/app/routers/backups.py
@@ -3,7 +3,7 @@
 from fastapi import APIRouter, Depends, HTTPException
 
 from app.auth import verify_token
-from app.ops_runner import run_ops, run_ops_json, _BACKUP_TIMEOUT
+from app.ops_runner import run_ops, run_ops_json, run_ops_host, _BACKUP_TIMEOUT
 
 router = APIRouter()
 
@@ -12,15 +12,10 @@
 async def list_backups(
     _: str = Depends(verify_token),
 ) -> list[dict[str, Any]]:
-    """
-    Returns a list of local backup records from `ops backups --json`.
-    """
+    """Returns a list of local backup records from `ops backups --json`."""
     result = await run_ops_json(["backups"])
     if not result["success"]:
-        raise HTTPException(
-            status_code=500,
-            detail=f"Failed to list backups: {result['error']}",
-        )
+        raise HTTPException(status_code=500, detail=f"Failed to list backups: {result['error']}")
 
     data = result["data"]
     if isinstance(data, list):
@@ -37,9 +32,7 @@
 async def list_offsite_backups(
     _: str = Depends(verify_token),
 ) -> list[dict[str, Any]]:
-    """
-    Returns a list of offsite backup records, querying each project separately.
-    """
+    """Returns a list of offsite backup records."""
     all_backups = []
     for project in ["mdf", "seriousletter"]:
         result = await run_ops_json(["offsite", "list", project])
@@ -57,9 +50,12 @@
     _: str = Depends(verify_token),
 ) -> dict[str, Any]:
     """
-    Runs `ops backup {project} {env}` and returns the result.
+    Runs `ops backup {project} {env}` on the host.
+
+    Runs via nsenter because ops backup delegates to project CLIs
+    that use host Python venvs.
     """
-    result = await run_ops(["backup", project, env], timeout=_BACKUP_TIMEOUT)
+    result = await run_ops_host(["backup", project, env], timeout=_BACKUP_TIMEOUT)
     if not result["success"]:
         raise HTTPException(
             status_code=500,
@@ -79,10 +75,8 @@
     env: str,
     _: str = Depends(verify_token),
 ) -> dict[str, Any]:
-    """
-    Runs `ops offsite upload {project} {env}` and returns the result.
-    """
-    result = await run_ops(
+    """Runs `ops offsite upload {project} {env}` on the host."""
+    result = await run_ops_host(
         ["offsite", "upload", project, env], timeout=_BACKUP_TIMEOUT
     )
     if not result["success"]:
@@ -90,28 +84,18 @@
             status_code=500,
             detail=f"Offsite upload failed: {result['error'] or result['output']}",
         )
-    return {
-        "success": True,
-        "output": result["output"],
-        "project": project,
-        "env": env,
-    }
+    return {"success": True, "output": result["output"], "project": project, "env": env}
 
 
 @router.post("/offsite/retention", summary="Apply offsite retention policy")
 async def apply_retention(
     _: str = Depends(verify_token),
 ) -> dict[str, Any]:
-    """
-    Runs `ops offsite retention` and returns the result.
-    """
-    result = await run_ops(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
+    """Runs `ops offsite retention` on the host."""
+    result = await run_ops_host(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
     if not result["success"]:
         raise HTTPException(
             status_code=500,
             detail=f"Retention policy failed: {result['error'] or result['output']}",
         )
-    return {
-        "success": True,
-        "output": result["output"],
-    }
+    return {"success": True, "output": result["output"]}
diff --git a/app/routers/restore.py b/app/routers/restore.py
index b487952..d03428e 100644
--- a/app/routers/restore.py
+++ b/app/routers/restore.py
@@ -6,7 +6,7 @@
 from fastapi.responses import StreamingResponse
 
 from app.auth import verify_token
-from app.ops_runner import _BACKUP_TIMEOUT, stream_ops
+from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
 
 router = APIRouter()
 
@@ -22,18 +22,22 @@
     source: str,
     dry_run: bool,
 ) -> AsyncGenerator[str, None]:
-    """Async generator that drives the restore workflow and yields SSE events."""
+    """Async generator that drives the restore workflow and yields SSE events.
+
+    Runs on the host via nsenter because ops restore delegates to project CLIs
+    that use host Python venvs incompatible with the container's Python.
+    """
     base_args = ["restore", project, env]
     if dry_run:
         base_args.append("--dry-run")
 
     if source == "offsite":
-        # ops offsite restore <project> <env> — downloads from offsite storage
+        # ops offsite restore <project> <env>
         download_args = ["offsite", "restore", project, env]
         yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
 
         download_ok = True
-        async for line in stream_ops(download_args, timeout=_BACKUP_TIMEOUT):
+        async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT):
             yield _sse_line({"line": line, "timestamp": _now()})
             if line.startswith("[error]"):
                 download_ok = False
@@ -45,7 +49,7 @@
         yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
 
     success = True
-    async for line in stream_ops(base_args, timeout=_BACKUP_TIMEOUT):
+    async for line in stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT):
         yield _sse_line({"line": line, "timestamp": _now()})
         if line.startswith("[error]"):
             success = False
@@ -69,7 +73,7 @@
     Restore a backup for the given project/env.
 
     Uses Server-Sent Events (SSE) to stream real-time progress.
-    Parameters are passed as query strings since EventSource only supports GET.
+    Runs on the host via nsenter for Python venv compatibility.
     """
     return StreamingResponse(
         _restore_generator(project, env, source, dry_run),

--
Gitblit v1.3.1