import asyncio
import json
import os
import uuid
from typing import AsyncGenerator

# Executable / script locations; each can be overridden via the environment.
OPS_CLI = os.environ.get("OPS_CLI", "/opt/infrastructure/ops")
OFFSITE_PYTHON = os.environ.get("OFFSITE_PYTHON", "/opt/data/π/bin/python3")
OFFSITE_SCRIPT = os.environ.get("OFFSITE_SCRIPT", "/opt/data/scripts/offsite.py")

# Timeout budgets (presumably seconds — they are handed to asyncio timeouts).
_DEFAULT_TIMEOUT = 300
_BACKUP_TIMEOUT = 3600

# ---------------------------------------------------------------------------
# Operation registry — tracks running processes for cancel support
# ---------------------------------------------------------------------------

# op_id -> process currently running for that operation.
_active_ops: dict[str, asyncio.subprocess.Process] = {}
# op_ids for which cancel_op() was requested and not yet acknowledged.
_cancelled_ops: set[str] = set()


def new_op_id() -> str:
    """Return a fresh operation id: the first 12 hex digits of a UUID4."""
    token = uuid.uuid4().hex
    return token[:12]


def register_op(op_id: str, proc: asyncio.subprocess.Process) -> None:
    """Remember *proc* as the process running for *op_id*."""
    _active_ops[op_id] = proc


def deregister_op(op_id: str) -> None:
    """Forget the process for *op_id*; unknown ids are ignored.

    The cancelled flag is deliberately left alone: callers consult
    is_cancelled() after the stream has ended, and only then reset the
    flag through clear_cancelled().
    """
    if op_id in _active_ops:
        del _active_ops[op_id]


def clear_cancelled(op_id: str) -> None:
    """Drop the cancelled flag once the caller is done checking is_cancelled()."""
    if op_id in _cancelled_ops:
        _cancelled_ops.remove(op_id)


def cancel_op(op_id: str) -> bool:
    """Request termination of a running operation.

    Returns False when no process is registered under *op_id*; otherwise
    flags the operation as cancelled, sends it a terminate signal and
    returns True (also when the process has already gone away).
    """
    target = _active_ops.get(op_id)
    found = target is not None
    if found:
        _cancelled_ops.add(op_id)
        try:
            target.terminate()
        except ProcessLookupError:
            # The process exited on its own; nothing left to signal.
            pass
    return found


def is_cancelled(op_id: str) -> bool:
    """Tell whether cancel_op() was invoked for *op_id* and not yet cleared."""
    return op_id in _cancelled_ops


# nsenter via Docker: run commands on the host from inside the container.
# Required because ops backup/restore delegate to host Python venvs (3.12)
# that are incompatible with the container's Python (3.11).
_NSENTER_PREFIX = [
    "docker", "run", "--rm", "-i", "--privileged", "--pid=host", "--network=host",
    # Enter the namespaces (mount, UTS, IPC, network, PID) of host PID 1.
    "alpine", "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
]

# Seconds to wait after each stop signal (SIGTERM, then SIGKILL) before
# giving up on a child process.
_TERMINATE_GRACE = 5


# ---------------------------------------------------------------------------
# In-container execution (status, disk, health, docker commands)
# ---------------------------------------------------------------------------

async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Run the ops CLI inside the container.

    Returns the ``{"success", "output", "error"}`` dict built by _run_exec().
    """
    return await _run_exec([OPS_CLI] + args, timeout=timeout)


async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Run the ops CLI with --json and return parsed JSON.

    Returns ``{"success", "data", "error"}``; ``data`` is None on failure.
    """
    result = await run_ops(args + ["--json"], timeout=timeout)
    return _parse_json_result(result)


async def stream_ops(
    args: list[str],
    timeout: int = _DEFAULT_TIMEOUT,
    op_id: str | None = None,
) -> AsyncGenerator[str, None]:
    """Stream ops CLI output (in-container).

    When *op_id* is given the process is registered for the duration of the
    stream so that it can be cancelled, as the host variants already allow.
    """
    async for line in _stream_exec([OPS_CLI] + args, timeout=timeout, op_id=op_id):
        yield line


async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Generic command runner (in-container)."""
    return await _run_exec(args, timeout=timeout)


async def stream_command(
    args: list[str],
    timeout: int = _DEFAULT_TIMEOUT,
    op_id: str | None = None,
) -> AsyncGenerator[str, None]:
    """Stream generic command output (in-container).

    *op_id* is optional and enables cancel support for the running process.
    """
    async for line in _stream_exec(args, timeout=timeout, op_id=op_id):
        yield line


# ---------------------------------------------------------------------------
# Host execution (backup, restore — needs host Python venvs)
# ---------------------------------------------------------------------------

async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Run the ops CLI on the host via nsenter."""
    return await _run_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout)


async def run_ops_host_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Run the ops CLI on the host via nsenter with --json and return parsed JSON."""
    result = await run_ops_host(args + ["--json"], timeout=timeout)
    return _parse_json_result(result)


async def stream_ops_host(
    args: list[str],
    timeout: int = _DEFAULT_TIMEOUT,
    op_id: str | None = None,
) -> AsyncGenerator[str, None]:
    """Stream ops CLI output from the host via nsenter."""
    async for line in _stream_exec(
        _NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout, op_id=op_id
    ):
        yield line


async def run_command_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Run an arbitrary command on the host via nsenter."""
    return await _run_exec(_NSENTER_PREFIX + args, timeout=timeout)


async def stream_command_host(
    args: list[str],
    timeout: int = _DEFAULT_TIMEOUT,
    op_id: str | None = None,
) -> AsyncGenerator[str, None]:
    """Stream arbitrary command output from the host via nsenter."""
    async for line in _stream_exec(_NSENTER_PREFIX + args, timeout=timeout, op_id=op_id):
        yield line


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _parse_json_result(result: dict) -> dict:
    """Turn a _run_exec() result into a ``{"success", "data", "error"}`` dict.

    A failed command is reported with its stderr (or stdout when stderr is
    empty); output that is not valid JSON is reported together with the
    first 500 characters of the raw output.
    """
    if not result["success"]:
        return {
            "success": False,
            "data": None,
            "error": result["error"] or result["output"],
        }
    try:
        data = json.loads(result["output"])
    except json.JSONDecodeError as exc:
        raw = result["output"][:500]
        return {
            "success": False,
            "data": None,
            "error": f"Failed to parse JSON: {exc}\nRaw: {raw}",
        }
    return {"success": True, "data": data, "error": ""}


async def _stop_process(
    proc: asyncio.subprocess.Process,
    grace: float = _TERMINATE_GRACE,
) -> None:
    """Stop *proc* if it is still running: terminate first, then kill.

    Each signal is followed by a wait of at most *grace* seconds, so this
    helper never blocks indefinitely, even if the child ignores SIGTERM.
    """
    if proc.returncode is not None:
        return
    for send_signal in (proc.terminate, proc.kill):
        try:
            send_signal()
        except ProcessLookupError:
            # Already gone; the wait below just collects the exit status.
            pass
        try:
            await asyncio.wait_for(proc.wait(), timeout=grace)
        except asyncio.TimeoutError:
            continue
        return


async def _read_line(stream: asyncio.StreamReader, prefix: str, timeout: int) -> tuple[str, str]:
    """Read one line from *stream*, returning ``(kind, text)``.

    ``kind`` is ``"line"`` (text is the prefixed, decoded line without its
    trailing newline), ``"eof"`` (text is empty) or ``"timeout"`` (text is
    the prefixed timeout notice; no data arrived within *timeout* seconds).
    """
    try:
        raw = await asyncio.wait_for(stream.readline(), timeout=timeout)
    except asyncio.TimeoutError:
        return "timeout", f"{prefix}[timeout] Command exceeded {timeout}s"
    if not raw:
        return "eof", ""
    return "line", prefix + raw.decode("utf-8", errors="replace").rstrip("\n")


async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Execute a command and capture output.

    Returns ``{"success": bool, "output": str, "error": str}`` where
    ``success`` reflects a zero exit code. Start-up failures and timeouts are
    reported through the same dict rather than raised. If the awaiting task
    is cancelled, the child is killed and the cancellation propagates.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            proc.kill()
            await proc.communicate()
            return {
                "success": False,
                "output": "",
                "error": f"Command timed out after {timeout}s",
            }
        except asyncio.CancelledError:
            # Do not leave an orphaned child behind when the caller is cancelled.
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            raise
        return {
            "success": proc.returncode == 0,
            "output": stdout.decode("utf-8", errors="replace"),
            "error": stderr.decode("utf-8", errors="replace"),
        }
    except FileNotFoundError as exc:
        return {"success": False, "output": "", "error": f"Executable not found: {exc}"}
    except Exception as exc:
        return {"success": False, "output": "", "error": str(exc)}


async def _stream_exec(
    args: list[str],
    timeout: int = _DEFAULT_TIMEOUT,
    op_id: str | None = None,
) -> AsyncGenerator[str, None]:
    """Execute a command and yield interleaved stdout/stderr lines.

    stdout lines are yielded as-is, stderr lines carry a ``[stderr] `` prefix.
    *timeout* is an idle timeout per stream: if a stream produces no line
    for that long, a ``[timeout]`` notice is yielded for it and the process
    is stopped. Failure to start the process is reported as a single
    ``[error]`` line.

    When *op_id* is given, the process is registered for the lifetime of the
    stream and deregistered afterwards. On every exit path (normal end,
    consumer disconnect, task cancellation, unexpected error) outstanding
    reader tasks are cancelled and a still-running process is stopped.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError as exc:
        yield f"[error] Executable not found: {exc}"
        return
    except Exception as exc:
        yield f"[error] Failed to start process: {exc}"
        return

    if op_id:
        register_op(op_id, proc)

    # In-flight readline task -> (stream, prefix) it is reading for.
    readers: dict[asyncio.Task, tuple[asyncio.StreamReader, str]] = {}
    try:
        for stream, prefix in ((proc.stdout, ""), (proc.stderr, "[stderr] ")):
            task = asyncio.create_task(_read_line(stream, prefix, timeout))
            readers[task] = (stream, prefix)

        while readers:
            done, _ = await asyncio.wait(readers, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                stream, prefix = readers.pop(task)
                kind, text = task.result()
                if kind == "eof":
                    continue
                if kind == "timeout":
                    # Stop the hung process first so the final wait below
                    # cannot block forever, then report the timeout.
                    await _stop_process(proc)
                    yield text
                    continue
                yield text
                next_task = asyncio.create_task(_read_line(stream, prefix, timeout))
                readers[next_task] = (stream, prefix)

        await proc.wait()
    finally:
        # Reached on normal completion as well as on browser disconnect
        # (GeneratorExit), task cancellation, or any unexpected error.
        try:
            for task in readers:
                task.cancel()
            await _stop_process(proc)
        finally:
            if op_id:
                deregister_op(op_id)