From 7300351bb9fb147f4de81a60423c4561a4924c21 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Sat, 21 Feb 2026 16:53:38 +0100
Subject: [PATCH] fix: Run backup/restore on host via nsenter for Python venv compatibility
---
app/app/ops_runner.py | 258 +++++++++--------------
app/routers/backups.py | 46 +--
app/app/routers/backups.py | 46 +--
 app/routers/restore.py | 16 +-
 app/app/routers/restore.py | 16 +-
app/ops_runner.py | 258 +++++++++--------------
6 files changed, 258 insertions(+), 382 deletions(-)
diff --git a/app/app/ops_runner.py b/app/app/ops_runner.py
index 7b05679..226fdaa 100644
--- a/app/app/ops_runner.py
+++ b/app/app/ops_runner.py
@@ -10,58 +10,31 @@
_DEFAULT_TIMEOUT = 300
_BACKUP_TIMEOUT = 3600
+# nsenter via Docker: run commands on the host from inside the container.
+# Required because ops backup/restore delegate to host Python venvs (3.12)
+# that are incompatible with the container's Python (3.11).
+_NSENTER_PREFIX = [
+ "docker", "run", "--rm", "-i",
+ "--privileged", "--pid=host", "--network=host",
+ "alpine",
+ "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
+]
+
+
+# ---------------------------------------------------------------------------
+# In-container execution (status, disk, health, docker commands)
+# ---------------------------------------------------------------------------
async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """
- Run the ops CLI with the given arguments and capture output.
- Returns {"success": bool, "output": str, "error": str}.
- """
- try:
- proc = await asyncio.create_subprocess_exec(
- OPS_CLI,
- *args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- try:
- stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
- except asyncio.TimeoutError:
- proc.kill()
- await proc.communicate()
- return {
- "success": False,
- "output": "",
- "error": f"Command timed out after {timeout}s",
- }
-
- return {
- "success": proc.returncode == 0,
- "output": stdout.decode("utf-8", errors="replace"),
- "error": stderr.decode("utf-8", errors="replace"),
- }
- except FileNotFoundError:
- return {
- "success": False,
- "output": "",
- "error": f"ops CLI not found at {OPS_CLI}",
- }
- except Exception as exc:
- return {
- "success": False,
- "output": "",
- "error": str(exc),
- }
+ """Run the ops CLI inside the container."""
+ return await _run_exec([OPS_CLI] + args, timeout=timeout)
async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """
- Run the ops CLI with --json appended and return the parsed JSON output.
- Returns {"success": bool, "data": ..., "error": str}.
- """
+ """Run the ops CLI with --json and return parsed JSON."""
result = await run_ops(args + ["--json"], timeout=timeout)
if not result["success"]:
return {"success": False, "data": None, "error": result["error"] or result["output"]}
-
try:
data = json.loads(result["output"])
return {"success": True, "data": data, "error": ""}
@@ -69,95 +42,48 @@
return {
"success": False,
"data": None,
- "error": f"Failed to parse JSON output: {exc}\nRaw output: {result['output'][:500]}",
+ "error": f"Failed to parse JSON: {exc}\nRaw: {result['output'][:500]}",
}
async def stream_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
- """
- Async generator that yields lines of stdout from the ops CLI.
- Also yields stderr lines prefixed with '[stderr] '.
- """
- try:
- proc = await asyncio.create_subprocess_exec(
- OPS_CLI,
- *args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- except FileNotFoundError:
- yield f"[error] ops CLI not found at {OPS_CLI}"
- return
- except Exception as exc:
- yield f"[error] Failed to start process: {exc}"
- return
-
- async def _read_stream(stream: asyncio.StreamReader, prefix: str = "") -> AsyncGenerator[str, None]:
- while True:
- try:
- line = await asyncio.wait_for(stream.readline(), timeout=timeout)
- except asyncio.TimeoutError:
- yield f"{prefix}[timeout] Command exceeded {timeout}s"
- break
- if not line:
- break
- yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
-
- # Interleave stdout and stderr
- stdout_gen = _read_stream(proc.stdout)
- stderr_gen = _read_stream(proc.stderr, prefix="[stderr] ")
-
- stdout_done = False
- stderr_done = False
-
- stdout_iter = stdout_gen.__aiter__()
- stderr_iter = stderr_gen.__aiter__()
-
- pending_stdout: asyncio.Task | None = None
- pending_stderr: asyncio.Task | None = None
-
- async def _next(it):
- try:
- return await it.__anext__()
- except StopAsyncIteration:
- return None
-
- pending_stdout = asyncio.create_task(_next(stdout_iter))
- pending_stderr = asyncio.create_task(_next(stderr_iter))
-
- while not (stdout_done and stderr_done):
- done, _ = await asyncio.wait(
- [t for t in [pending_stdout, pending_stderr] if t is not None],
- return_when=asyncio.FIRST_COMPLETED,
- )
-
- for task in done:
- val = task.result()
- if task is pending_stdout:
- if val is None:
- stdout_done = True
- pending_stdout = None
- else:
- yield val
- pending_stdout = asyncio.create_task(_next(stdout_iter))
- elif task is pending_stderr:
- if val is None:
- stderr_done = True
- pending_stderr = None
- else:
- yield val
- pending_stderr = asyncio.create_task(_next(stderr_iter))
-
- await proc.wait()
+ """Stream ops CLI output (in-container)."""
+ async for line in _stream_exec([OPS_CLI] + args, timeout=timeout):
+ yield line
-async def run_command(
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
-) -> dict:
- """
- Generic command runner (non-ops). Accepts a full argv list.
- Returns {"success": bool, "output": str, "error": str}.
- """
+async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+ """Generic command runner (in-container)."""
+ return await _run_exec(args, timeout=timeout)
+
+
+async def stream_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+ """Stream generic command output (in-container)."""
+ async for line in _stream_exec(args, timeout=timeout):
+ yield line
+
+
+# ---------------------------------------------------------------------------
+# Host execution (backup, restore — needs host Python venvs)
+# ---------------------------------------------------------------------------
+
+async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+ """Run the ops CLI on the host via nsenter."""
+ return await _run_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout)
+
+
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+ """Stream ops CLI output from the host via nsenter."""
+ async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
+ yield line
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+
+async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+ """Execute a command and capture output."""
try:
proc = await asyncio.create_subprocess_exec(
*args,
@@ -169,11 +95,7 @@
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
- return {
- "success": False,
- "output": "",
- "error": f"Command timed out after {timeout}s",
- }
+ return {"success": False, "output": "", "error": f"Command timed out after {timeout}s"}
return {
"success": proc.returncode == 0,
@@ -186,12 +108,8 @@
return {"success": False, "output": "", "error": str(exc)}
-async def stream_command(
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
-) -> AsyncGenerator[str, None]:
- """
- Async generator that yields lines of stdout for an arbitrary command.
- """
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+ """Execute a command and yield interleaved stdout/stderr lines."""
try:
proc = await asyncio.create_subprocess_exec(
*args,
@@ -202,24 +120,56 @@
yield f"[error] Executable not found: {exc}"
return
except Exception as exc:
- yield f"[error] {exc}"
+ yield f"[error] Failed to start process: {exc}"
return
- while True:
- try:
- line = await asyncio.wait_for(proc.stdout.readline(), timeout=timeout)
- except asyncio.TimeoutError:
- yield f"[timeout] Command exceeded {timeout}s"
- proc.kill()
- break
- if not line:
- break
- yield line.decode("utf-8", errors="replace").rstrip("\n")
+ async def _readline(stream, prefix=""):
+ while True:
+ try:
+ line = await asyncio.wait_for(stream.readline(), timeout=timeout)
+ except asyncio.TimeoutError:
+ yield f"{prefix}[timeout] Command exceeded {timeout}s"
+ break
+ if not line:
+ break
+ yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
- # Flush stderr as trailing info
- stderr_data = await proc.stderr.read()
- if stderr_data:
- for ln in stderr_data.decode("utf-8", errors="replace").splitlines():
- yield f"[stderr] {ln}"
+ stdout_gen = _readline(proc.stdout).__aiter__()
+ stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
+
+ stdout_done = stderr_done = False
+ pending_out = pending_err = None
+
+ async def _next(it):
+ try:
+ return await it.__anext__()
+ except StopAsyncIteration:
+ return None
+
+ pending_out = asyncio.create_task(_next(stdout_gen))
+ pending_err = asyncio.create_task(_next(stderr_gen))
+
+ while not (stdout_done and stderr_done):
+ tasks = [t for t in (pending_out, pending_err) if t is not None]
+ if not tasks:
+ break
+ done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+
+ for task in done:
+ val = task.result()
+ if task is pending_out:
+ if val is None:
+ stdout_done = True
+ pending_out = None
+ else:
+ yield val
+ pending_out = asyncio.create_task(_next(stdout_gen))
+ elif task is pending_err:
+ if val is None:
+ stderr_done = True
+ pending_err = None
+ else:
+ yield val
+ pending_err = asyncio.create_task(_next(stderr_gen))
await proc.wait()
diff --git a/app/app/routers/backups.py b/app/app/routers/backups.py
index d0a6f81..d1cf26e 100644
--- a/app/app/routers/backups.py
+++ b/app/app/routers/backups.py
@@ -3,7 +3,7 @@
from fastapi import APIRouter, Depends, HTTPException
from app.auth import verify_token
-from app.ops_runner import run_ops, run_ops_json, _BACKUP_TIMEOUT
+from app.ops_runner import run_ops, run_ops_json, run_ops_host, _BACKUP_TIMEOUT
router = APIRouter()
@@ -12,15 +12,10 @@
async def list_backups(
_: str = Depends(verify_token),
) -> list[dict[str, Any]]:
- """
- Returns a list of local backup records from `ops backups --json`.
- """
+ """Returns a list of local backup records from `ops backups --json`."""
result = await run_ops_json(["backups"])
if not result["success"]:
- raise HTTPException(
- status_code=500,
- detail=f"Failed to list backups: {result['error']}",
- )
+ raise HTTPException(status_code=500, detail=f"Failed to list backups: {result['error']}")
data = result["data"]
if isinstance(data, list):
@@ -37,9 +32,7 @@
async def list_offsite_backups(
_: str = Depends(verify_token),
) -> list[dict[str, Any]]:
- """
- Returns a list of offsite backup records, querying each project separately.
- """
+ """Returns a list of offsite backup records."""
all_backups = []
for project in ["mdf", "seriousletter"]:
result = await run_ops_json(["offsite", "list", project])
@@ -57,9 +50,12 @@
_: str = Depends(verify_token),
) -> dict[str, Any]:
"""
- Runs `ops backup {project} {env}` and returns the result.
+ Runs `ops backup {project} {env}` on the host.
+
+ Runs via nsenter because ops backup delegates to project CLIs
+ that use host Python venvs.
"""
- result = await run_ops(["backup", project, env], timeout=_BACKUP_TIMEOUT)
+ result = await run_ops_host(["backup", project, env], timeout=_BACKUP_TIMEOUT)
if not result["success"]:
raise HTTPException(
status_code=500,
@@ -79,10 +75,8 @@
env: str,
_: str = Depends(verify_token),
) -> dict[str, Any]:
- """
- Runs `ops offsite upload {project} {env}` and returns the result.
- """
- result = await run_ops(
+ """Runs `ops offsite upload {project} {env}` on the host."""
+ result = await run_ops_host(
["offsite", "upload", project, env], timeout=_BACKUP_TIMEOUT
)
if not result["success"]:
@@ -90,28 +84,18 @@
status_code=500,
detail=f"Offsite upload failed: {result['error'] or result['output']}",
)
- return {
- "success": True,
- "output": result["output"],
- "project": project,
- "env": env,
- }
+ return {"success": True, "output": result["output"], "project": project, "env": env}
@router.post("/offsite/retention", summary="Apply offsite retention policy")
async def apply_retention(
_: str = Depends(verify_token),
) -> dict[str, Any]:
- """
- Runs `ops offsite retention` and returns the result.
- """
- result = await run_ops(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
+ """Runs `ops offsite retention` on the host."""
+ result = await run_ops_host(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
if not result["success"]:
raise HTTPException(
status_code=500,
detail=f"Retention policy failed: {result['error'] or result['output']}",
)
- return {
- "success": True,
- "output": result["output"],
- }
+ return {"success": True, "output": result["output"]}
diff --git a/app/app/routers/restore.py b/app/app/routers/restore.py
index b487952..d03428e 100644
--- a/app/app/routers/restore.py
+++ b/app/app/routers/restore.py
@@ -6,7 +6,7 @@
from fastapi.responses import StreamingResponse
from app.auth import verify_token
-from app.ops_runner import _BACKUP_TIMEOUT, stream_ops
+from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
router = APIRouter()
@@ -22,18 +22,22 @@
source: str,
dry_run: bool,
) -> AsyncGenerator[str, None]:
- """Async generator that drives the restore workflow and yields SSE events."""
+ """Async generator that drives the restore workflow and yields SSE events.
+
+ Runs on the host via nsenter because ops restore delegates to project CLIs
+ that use host Python venvs incompatible with the container's Python.
+ """
base_args = ["restore", project, env]
if dry_run:
base_args.append("--dry-run")
if source == "offsite":
- # ops offsite restore <project> <env> — downloads from offsite storage
+ # ops offsite restore <project> <env>
download_args = ["offsite", "restore", project, env]
yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
download_ok = True
- async for line in stream_ops(download_args, timeout=_BACKUP_TIMEOUT):
+ async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT):
yield _sse_line({"line": line, "timestamp": _now()})
if line.startswith("[error]"):
download_ok = False
@@ -45,7 +49,7 @@
yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
success = True
- async for line in stream_ops(base_args, timeout=_BACKUP_TIMEOUT):
+ async for line in stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT):
yield _sse_line({"line": line, "timestamp": _now()})
if line.startswith("[error]"):
success = False
@@ -69,7 +73,7 @@
Restore a backup for the given project/env.
Uses Server-Sent Events (SSE) to stream real-time progress.
- Parameters are passed as query strings since EventSource only supports GET.
+ Runs on the host via nsenter for Python venv compatibility.
"""
return StreamingResponse(
_restore_generator(project, env, source, dry_run),
diff --git a/app/ops_runner.py b/app/ops_runner.py
index 7b05679..226fdaa 100644
--- a/app/ops_runner.py
+++ b/app/ops_runner.py
@@ -10,58 +10,31 @@
_DEFAULT_TIMEOUT = 300
_BACKUP_TIMEOUT = 3600
+# nsenter via Docker: run commands on the host from inside the container.
+# Required because ops backup/restore delegate to host Python venvs (3.12)
+# that are incompatible with the container's Python (3.11).
+_NSENTER_PREFIX = [
+ "docker", "run", "--rm", "-i",
+ "--privileged", "--pid=host", "--network=host",
+ "alpine",
+ "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
+]
+
+
+# ---------------------------------------------------------------------------
+# In-container execution (status, disk, health, docker commands)
+# ---------------------------------------------------------------------------
async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """
- Run the ops CLI with the given arguments and capture output.
- Returns {"success": bool, "output": str, "error": str}.
- """
- try:
- proc = await asyncio.create_subprocess_exec(
- OPS_CLI,
- *args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- try:
- stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
- except asyncio.TimeoutError:
- proc.kill()
- await proc.communicate()
- return {
- "success": False,
- "output": "",
- "error": f"Command timed out after {timeout}s",
- }
-
- return {
- "success": proc.returncode == 0,
- "output": stdout.decode("utf-8", errors="replace"),
- "error": stderr.decode("utf-8", errors="replace"),
- }
- except FileNotFoundError:
- return {
- "success": False,
- "output": "",
- "error": f"ops CLI not found at {OPS_CLI}",
- }
- except Exception as exc:
- return {
- "success": False,
- "output": "",
- "error": str(exc),
- }
+ """Run the ops CLI inside the container."""
+ return await _run_exec([OPS_CLI] + args, timeout=timeout)
async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """
- Run the ops CLI with --json appended and return the parsed JSON output.
- Returns {"success": bool, "data": ..., "error": str}.
- """
+ """Run the ops CLI with --json and return parsed JSON."""
result = await run_ops(args + ["--json"], timeout=timeout)
if not result["success"]:
return {"success": False, "data": None, "error": result["error"] or result["output"]}
-
try:
data = json.loads(result["output"])
return {"success": True, "data": data, "error": ""}
@@ -69,95 +42,48 @@
return {
"success": False,
"data": None,
- "error": f"Failed to parse JSON output: {exc}\nRaw output: {result['output'][:500]}",
+ "error": f"Failed to parse JSON: {exc}\nRaw: {result['output'][:500]}",
}
async def stream_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
- """
- Async generator that yields lines of stdout from the ops CLI.
- Also yields stderr lines prefixed with '[stderr] '.
- """
- try:
- proc = await asyncio.create_subprocess_exec(
- OPS_CLI,
- *args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- except FileNotFoundError:
- yield f"[error] ops CLI not found at {OPS_CLI}"
- return
- except Exception as exc:
- yield f"[error] Failed to start process: {exc}"
- return
-
- async def _read_stream(stream: asyncio.StreamReader, prefix: str = "") -> AsyncGenerator[str, None]:
- while True:
- try:
- line = await asyncio.wait_for(stream.readline(), timeout=timeout)
- except asyncio.TimeoutError:
- yield f"{prefix}[timeout] Command exceeded {timeout}s"
- break
- if not line:
- break
- yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
-
- # Interleave stdout and stderr
- stdout_gen = _read_stream(proc.stdout)
- stderr_gen = _read_stream(proc.stderr, prefix="[stderr] ")
-
- stdout_done = False
- stderr_done = False
-
- stdout_iter = stdout_gen.__aiter__()
- stderr_iter = stderr_gen.__aiter__()
-
- pending_stdout: asyncio.Task | None = None
- pending_stderr: asyncio.Task | None = None
-
- async def _next(it):
- try:
- return await it.__anext__()
- except StopAsyncIteration:
- return None
-
- pending_stdout = asyncio.create_task(_next(stdout_iter))
- pending_stderr = asyncio.create_task(_next(stderr_iter))
-
- while not (stdout_done and stderr_done):
- done, _ = await asyncio.wait(
- [t for t in [pending_stdout, pending_stderr] if t is not None],
- return_when=asyncio.FIRST_COMPLETED,
- )
-
- for task in done:
- val = task.result()
- if task is pending_stdout:
- if val is None:
- stdout_done = True
- pending_stdout = None
- else:
- yield val
- pending_stdout = asyncio.create_task(_next(stdout_iter))
- elif task is pending_stderr:
- if val is None:
- stderr_done = True
- pending_stderr = None
- else:
- yield val
- pending_stderr = asyncio.create_task(_next(stderr_iter))
-
- await proc.wait()
+ """Stream ops CLI output (in-container)."""
+ async for line in _stream_exec([OPS_CLI] + args, timeout=timeout):
+ yield line
-async def run_command(
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
-) -> dict:
- """
- Generic command runner (non-ops). Accepts a full argv list.
- Returns {"success": bool, "output": str, "error": str}.
- """
+async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+ """Generic command runner (in-container)."""
+ return await _run_exec(args, timeout=timeout)
+
+
+async def stream_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+ """Stream generic command output (in-container)."""
+ async for line in _stream_exec(args, timeout=timeout):
+ yield line
+
+
+# ---------------------------------------------------------------------------
+# Host execution (backup, restore — needs host Python venvs)
+# ---------------------------------------------------------------------------
+
+async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+ """Run the ops CLI on the host via nsenter."""
+ return await _run_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout)
+
+
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+ """Stream ops CLI output from the host via nsenter."""
+ async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
+ yield line
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+
+async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+ """Execute a command and capture output."""
try:
proc = await asyncio.create_subprocess_exec(
*args,
@@ -169,11 +95,7 @@
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
- return {
- "success": False,
- "output": "",
- "error": f"Command timed out after {timeout}s",
- }
+ return {"success": False, "output": "", "error": f"Command timed out after {timeout}s"}
return {
"success": proc.returncode == 0,
@@ -186,12 +108,8 @@
return {"success": False, "output": "", "error": str(exc)}
-async def stream_command(
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
-) -> AsyncGenerator[str, None]:
- """
- Async generator that yields lines of stdout for an arbitrary command.
- """
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+ """Execute a command and yield interleaved stdout/stderr lines."""
try:
proc = await asyncio.create_subprocess_exec(
*args,
@@ -202,24 +120,56 @@
yield f"[error] Executable not found: {exc}"
return
except Exception as exc:
- yield f"[error] {exc}"
+ yield f"[error] Failed to start process: {exc}"
return
- while True:
- try:
- line = await asyncio.wait_for(proc.stdout.readline(), timeout=timeout)
- except asyncio.TimeoutError:
- yield f"[timeout] Command exceeded {timeout}s"
- proc.kill()
- break
- if not line:
- break
- yield line.decode("utf-8", errors="replace").rstrip("\n")
+ async def _readline(stream, prefix=""):
+ while True:
+ try:
+ line = await asyncio.wait_for(stream.readline(), timeout=timeout)
+ except asyncio.TimeoutError:
+ yield f"{prefix}[timeout] Command exceeded {timeout}s"
+ break
+ if not line:
+ break
+ yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
- # Flush stderr as trailing info
- stderr_data = await proc.stderr.read()
- if stderr_data:
- for ln in stderr_data.decode("utf-8", errors="replace").splitlines():
- yield f"[stderr] {ln}"
+ stdout_gen = _readline(proc.stdout).__aiter__()
+ stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
+
+ stdout_done = stderr_done = False
+ pending_out = pending_err = None
+
+ async def _next(it):
+ try:
+ return await it.__anext__()
+ except StopAsyncIteration:
+ return None
+
+ pending_out = asyncio.create_task(_next(stdout_gen))
+ pending_err = asyncio.create_task(_next(stderr_gen))
+
+ while not (stdout_done and stderr_done):
+ tasks = [t for t in (pending_out, pending_err) if t is not None]
+ if not tasks:
+ break
+ done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+
+ for task in done:
+ val = task.result()
+ if task is pending_out:
+ if val is None:
+ stdout_done = True
+ pending_out = None
+ else:
+ yield val
+ pending_out = asyncio.create_task(_next(stdout_gen))
+ elif task is pending_err:
+ if val is None:
+ stderr_done = True
+ pending_err = None
+ else:
+ yield val
+ pending_err = asyncio.create_task(_next(stderr_gen))
await proc.wait()
diff --git a/app/routers/backups.py b/app/routers/backups.py
index d0a6f81..d1cf26e 100644
--- a/app/routers/backups.py
+++ b/app/routers/backups.py
@@ -3,7 +3,7 @@
from fastapi import APIRouter, Depends, HTTPException
from app.auth import verify_token
-from app.ops_runner import run_ops, run_ops_json, _BACKUP_TIMEOUT
+from app.ops_runner import run_ops, run_ops_json, run_ops_host, _BACKUP_TIMEOUT
router = APIRouter()
@@ -12,15 +12,10 @@
async def list_backups(
_: str = Depends(verify_token),
) -> list[dict[str, Any]]:
- """
- Returns a list of local backup records from `ops backups --json`.
- """
+ """Returns a list of local backup records from `ops backups --json`."""
result = await run_ops_json(["backups"])
if not result["success"]:
- raise HTTPException(
- status_code=500,
- detail=f"Failed to list backups: {result['error']}",
- )
+ raise HTTPException(status_code=500, detail=f"Failed to list backups: {result['error']}")
data = result["data"]
if isinstance(data, list):
@@ -37,9 +32,7 @@
async def list_offsite_backups(
_: str = Depends(verify_token),
) -> list[dict[str, Any]]:
- """
- Returns a list of offsite backup records, querying each project separately.
- """
+ """Returns a list of offsite backup records."""
all_backups = []
for project in ["mdf", "seriousletter"]:
result = await run_ops_json(["offsite", "list", project])
@@ -57,9 +50,12 @@
_: str = Depends(verify_token),
) -> dict[str, Any]:
"""
- Runs `ops backup {project} {env}` and returns the result.
+ Runs `ops backup {project} {env}` on the host.
+
+ Runs via nsenter because ops backup delegates to project CLIs
+ that use host Python venvs.
"""
- result = await run_ops(["backup", project, env], timeout=_BACKUP_TIMEOUT)
+ result = await run_ops_host(["backup", project, env], timeout=_BACKUP_TIMEOUT)
if not result["success"]:
raise HTTPException(
status_code=500,
@@ -79,10 +75,8 @@
env: str,
_: str = Depends(verify_token),
) -> dict[str, Any]:
- """
- Runs `ops offsite upload {project} {env}` and returns the result.
- """
- result = await run_ops(
+ """Runs `ops offsite upload {project} {env}` on the host."""
+ result = await run_ops_host(
["offsite", "upload", project, env], timeout=_BACKUP_TIMEOUT
)
if not result["success"]:
@@ -90,28 +84,18 @@
status_code=500,
detail=f"Offsite upload failed: {result['error'] or result['output']}",
)
- return {
- "success": True,
- "output": result["output"],
- "project": project,
- "env": env,
- }
+ return {"success": True, "output": result["output"], "project": project, "env": env}
@router.post("/offsite/retention", summary="Apply offsite retention policy")
async def apply_retention(
_: str = Depends(verify_token),
) -> dict[str, Any]:
- """
- Runs `ops offsite retention` and returns the result.
- """
- result = await run_ops(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
+ """Runs `ops offsite retention` on the host."""
+ result = await run_ops_host(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
if not result["success"]:
raise HTTPException(
status_code=500,
detail=f"Retention policy failed: {result['error'] or result['output']}",
)
- return {
- "success": True,
- "output": result["output"],
- }
+ return {"success": True, "output": result["output"]}
diff --git a/app/routers/restore.py b/app/routers/restore.py
index b487952..d03428e 100644
--- a/app/routers/restore.py
+++ b/app/routers/restore.py
@@ -6,7 +6,7 @@
from fastapi.responses import StreamingResponse
from app.auth import verify_token
-from app.ops_runner import _BACKUP_TIMEOUT, stream_ops
+from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
router = APIRouter()
@@ -22,18 +22,22 @@
source: str,
dry_run: bool,
) -> AsyncGenerator[str, None]:
- """Async generator that drives the restore workflow and yields SSE events."""
+ """Async generator that drives the restore workflow and yields SSE events.
+
+ Runs on the host via nsenter because ops restore delegates to project CLIs
+ that use host Python venvs incompatible with the container's Python.
+ """
base_args = ["restore", project, env]
if dry_run:
base_args.append("--dry-run")
if source == "offsite":
- # ops offsite restore <project> <env> — downloads from offsite storage
+ # ops offsite restore <project> <env>
download_args = ["offsite", "restore", project, env]
yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
download_ok = True
- async for line in stream_ops(download_args, timeout=_BACKUP_TIMEOUT):
+ async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT):
yield _sse_line({"line": line, "timestamp": _now()})
if line.startswith("[error]"):
download_ok = False
@@ -45,7 +49,7 @@
yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
success = True
- async for line in stream_ops(base_args, timeout=_BACKUP_TIMEOUT):
+ async for line in stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT):
yield _sse_line({"line": line, "timestamp": _now()})
if line.startswith("[error]"):
success = False
@@ -69,7 +73,7 @@
Restore a backup for the given project/env.
Uses Server-Sent Events (SSE) to stream real-time progress.
- Parameters are passed as query strings since EventSource only supports GET.
+ Runs on the host via nsenter for Python venv compatibility.
"""
return StreamingResponse(
_restore_generator(project, env, source, dry_run),
--
Gitblit v1.3.1