From 7300351bb9fb147f4de81a60423c4561a4924c21 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Sat, 21 Feb 2026 16:53:38 +0100
Subject: [PATCH] fix: Run backup/restore on host via nsenter for Python venv compatibility
---
app/app/ops_runner.py | 258 ++++++++++++++++++++------------------------------
1 files changed, 104 insertions(+), 154 deletions(-)
diff --git a/app/app/ops_runner.py b/app/app/ops_runner.py
index 7b05679..226fdaa 100644
--- a/app/app/ops_runner.py
+++ b/app/app/ops_runner.py
@@ -10,58 +10,31 @@
_DEFAULT_TIMEOUT = 300
_BACKUP_TIMEOUT = 3600
+# nsenter via Docker: run commands on the host from inside the container.
+# Required because ops backup/restore delegate to host Python venvs (3.12)
+# that are incompatible with the container's Python (3.11).
+_NSENTER_PREFIX = [
+ "docker", "run", "--rm", "-i",
+ "--privileged", "--pid=host", "--network=host",
+ "alpine",
+ "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
+]
+
+
+# ---------------------------------------------------------------------------
+# In-container execution (status, disk, health, docker commands)
+# ---------------------------------------------------------------------------
async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """
- Run the ops CLI with the given arguments and capture output.
- Returns {"success": bool, "output": str, "error": str}.
- """
- try:
- proc = await asyncio.create_subprocess_exec(
- OPS_CLI,
- *args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- try:
- stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
- except asyncio.TimeoutError:
- proc.kill()
- await proc.communicate()
- return {
- "success": False,
- "output": "",
- "error": f"Command timed out after {timeout}s",
- }
-
- return {
- "success": proc.returncode == 0,
- "output": stdout.decode("utf-8", errors="replace"),
- "error": stderr.decode("utf-8", errors="replace"),
- }
- except FileNotFoundError:
- return {
- "success": False,
- "output": "",
- "error": f"ops CLI not found at {OPS_CLI}",
- }
- except Exception as exc:
- return {
- "success": False,
- "output": "",
- "error": str(exc),
- }
+ """Run the ops CLI inside the container."""
+ return await _run_exec([OPS_CLI] + args, timeout=timeout)
async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """
- Run the ops CLI with --json appended and return the parsed JSON output.
- Returns {"success": bool, "data": ..., "error": str}.
- """
+ """Run the ops CLI with --json and return parsed JSON."""
result = await run_ops(args + ["--json"], timeout=timeout)
if not result["success"]:
return {"success": False, "data": None, "error": result["error"] or result["output"]}
-
try:
data = json.loads(result["output"])
return {"success": True, "data": data, "error": ""}
@@ -69,95 +42,48 @@
return {
"success": False,
"data": None,
- "error": f"Failed to parse JSON output: {exc}\nRaw output: {result['output'][:500]}",
+ "error": f"Failed to parse JSON: {exc}\nRaw: {result['output'][:500]}",
}
async def stream_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
- """
- Async generator that yields lines of stdout from the ops CLI.
- Also yields stderr lines prefixed with '[stderr] '.
- """
- try:
- proc = await asyncio.create_subprocess_exec(
- OPS_CLI,
- *args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- except FileNotFoundError:
- yield f"[error] ops CLI not found at {OPS_CLI}"
- return
- except Exception as exc:
- yield f"[error] Failed to start process: {exc}"
- return
-
- async def _read_stream(stream: asyncio.StreamReader, prefix: str = "") -> AsyncGenerator[str, None]:
- while True:
- try:
- line = await asyncio.wait_for(stream.readline(), timeout=timeout)
- except asyncio.TimeoutError:
- yield f"{prefix}[timeout] Command exceeded {timeout}s"
- break
- if not line:
- break
- yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
-
- # Interleave stdout and stderr
- stdout_gen = _read_stream(proc.stdout)
- stderr_gen = _read_stream(proc.stderr, prefix="[stderr] ")
-
- stdout_done = False
- stderr_done = False
-
- stdout_iter = stdout_gen.__aiter__()
- stderr_iter = stderr_gen.__aiter__()
-
- pending_stdout: asyncio.Task | None = None
- pending_stderr: asyncio.Task | None = None
-
- async def _next(it):
- try:
- return await it.__anext__()
- except StopAsyncIteration:
- return None
-
- pending_stdout = asyncio.create_task(_next(stdout_iter))
- pending_stderr = asyncio.create_task(_next(stderr_iter))
-
- while not (stdout_done and stderr_done):
- done, _ = await asyncio.wait(
- [t for t in [pending_stdout, pending_stderr] if t is not None],
- return_when=asyncio.FIRST_COMPLETED,
- )
-
- for task in done:
- val = task.result()
- if task is pending_stdout:
- if val is None:
- stdout_done = True
- pending_stdout = None
- else:
- yield val
- pending_stdout = asyncio.create_task(_next(stdout_iter))
- elif task is pending_stderr:
- if val is None:
- stderr_done = True
- pending_stderr = None
- else:
- yield val
- pending_stderr = asyncio.create_task(_next(stderr_iter))
-
- await proc.wait()
+ """Stream ops CLI output (in-container)."""
+ async for line in _stream_exec([OPS_CLI] + args, timeout=timeout):
+ yield line
-async def run_command(
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
-) -> dict:
- """
- Generic command runner (non-ops). Accepts a full argv list.
- Returns {"success": bool, "output": str, "error": str}.
- """
+async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+ """Generic command runner (in-container)."""
+ return await _run_exec(args, timeout=timeout)
+
+
+async def stream_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+ """Stream generic command output (in-container)."""
+ async for line in _stream_exec(args, timeout=timeout):
+ yield line
+
+
+# ---------------------------------------------------------------------------
+# Host execution (backup, restore — needs host Python venvs)
+# ---------------------------------------------------------------------------
+
+async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+ """Run the ops CLI on the host via nsenter."""
+ return await _run_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout)
+
+
+async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+ """Stream ops CLI output from the host via nsenter."""
+ async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
+ yield line
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+
+async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
+ """Execute a command and capture output."""
try:
proc = await asyncio.create_subprocess_exec(
*args,
@@ -169,11 +95,7 @@
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
- return {
- "success": False,
- "output": "",
- "error": f"Command timed out after {timeout}s",
- }
+ return {"success": False, "output": "", "error": f"Command timed out after {timeout}s"}
return {
"success": proc.returncode == 0,
@@ -186,12 +108,8 @@
return {"success": False, "output": "", "error": str(exc)}
-async def stream_command(
- args: list[str], timeout: int = _DEFAULT_TIMEOUT
-) -> AsyncGenerator[str, None]:
- """
- Async generator that yields lines of stdout for an arbitrary command.
- """
+async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
+ """Execute a command and yield interleaved stdout/stderr lines."""
try:
proc = await asyncio.create_subprocess_exec(
*args,
@@ -202,24 +120,56 @@
yield f"[error] Executable not found: {exc}"
return
except Exception as exc:
- yield f"[error] {exc}"
+ yield f"[error] Failed to start process: {exc}"
return
- while True:
- try:
- line = await asyncio.wait_for(proc.stdout.readline(), timeout=timeout)
- except asyncio.TimeoutError:
- yield f"[timeout] Command exceeded {timeout}s"
- proc.kill()
- break
- if not line:
- break
- yield line.decode("utf-8", errors="replace").rstrip("\n")
+ async def _readline(stream, prefix=""):
+ while True:
+ try:
+ line = await asyncio.wait_for(stream.readline(), timeout=timeout)
+ except asyncio.TimeoutError:
+ yield f"{prefix}[timeout] Command exceeded {timeout}s"
+ break
+ if not line:
+ break
+ yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
- # Flush stderr as trailing info
- stderr_data = await proc.stderr.read()
- if stderr_data:
- for ln in stderr_data.decode("utf-8", errors="replace").splitlines():
- yield f"[stderr] {ln}"
+ stdout_gen = _readline(proc.stdout).__aiter__()
+ stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
+
+ stdout_done = stderr_done = False
+ pending_out = pending_err = None
+
+ async def _next(it):
+ try:
+ return await it.__anext__()
+ except StopAsyncIteration:
+ return None
+
+ pending_out = asyncio.create_task(_next(stdout_gen))
+ pending_err = asyncio.create_task(_next(stderr_gen))
+
+ while not (stdout_done and stderr_done):
+ tasks = [t for t in (pending_out, pending_err) if t is not None]
+ if not tasks:
+ break
+ done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+
+ for task in done:
+ val = task.result()
+ if task is pending_out:
+ if val is None:
+ stdout_done = True
+ pending_out = None
+ else:
+ yield val
+ pending_out = asyncio.create_task(_next(stdout_gen))
+ elif task is pending_err:
+ if val is None:
+ stderr_done = True
+ pending_err = None
+ else:
+ yield val
+ pending_err = asyncio.create_task(_next(stderr_gen))
await proc.wait()
--
Gitblit v1.3.1