import asyncio import json import os from typing import AsyncGenerator OPS_CLI = os.environ.get("OPS_CLI", "/opt/infrastructure/ops") OFFSITE_PYTHON = os.environ.get("OFFSITE_PYTHON", "/opt/data/π/bin/python3") OFFSITE_SCRIPT = os.environ.get("OFFSITE_SCRIPT", "/opt/data/scripts/offsite.py") _DEFAULT_TIMEOUT = 300 _BACKUP_TIMEOUT = 3600 async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict: """ Run the ops CLI with the given arguments and capture output. Returns {"success": bool, "output": str, "error": str}. """ try: proc = await asyncio.create_subprocess_exec( OPS_CLI, *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.communicate() return { "success": False, "output": "", "error": f"Command timed out after {timeout}s", } return { "success": proc.returncode == 0, "output": stdout.decode("utf-8", errors="replace"), "error": stderr.decode("utf-8", errors="replace"), } except FileNotFoundError: return { "success": False, "output": "", "error": f"ops CLI not found at {OPS_CLI}", } except Exception as exc: return { "success": False, "output": "", "error": str(exc), } async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict: """ Run the ops CLI with --json appended and return the parsed JSON output. Returns {"success": bool, "data": ..., "error": str}. """ result = await run_ops(args + ["--json"], timeout=timeout) if not result["success"]: return {"success": False, "data": None, "error": result["error"] or result["output"]} try: data = json.loads(result["output"]) return {"success": True, "data": data, "error": ""} except json.JSONDecodeError as exc: return { "success": False, "data": None, "error": f"Failed to parse JSON output: {exc}\nRaw output: {result['output'][:500]}", } async def stream_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]: """ Async generator that yields lines of stdout from the ops CLI. Also yields stderr lines prefixed with '[stderr] '. """ try: proc = await asyncio.create_subprocess_exec( OPS_CLI, *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) except FileNotFoundError: yield f"[error] ops CLI not found at {OPS_CLI}" return except Exception as exc: yield f"[error] Failed to start process: {exc}" return async def _read_stream(stream: asyncio.StreamReader, prefix: str = "") -> AsyncGenerator[str, None]: while True: try: line = await asyncio.wait_for(stream.readline(), timeout=timeout) except asyncio.TimeoutError: yield f"{prefix}[timeout] Command exceeded {timeout}s" break if not line: break yield prefix + line.decode("utf-8", errors="replace").rstrip("\n") # Interleave stdout and stderr stdout_gen = _read_stream(proc.stdout) stderr_gen = _read_stream(proc.stderr, prefix="[stderr] ") stdout_done = False stderr_done = False stdout_iter = stdout_gen.__aiter__() stderr_iter = stderr_gen.__aiter__() pending_stdout: asyncio.Task | None = None pending_stderr: asyncio.Task | None = None async def _next(it): try: return await it.__anext__() except StopAsyncIteration: return None pending_stdout = asyncio.create_task(_next(stdout_iter)) pending_stderr = asyncio.create_task(_next(stderr_iter)) while not (stdout_done and stderr_done): done, _ = await asyncio.wait( [t for t in [pending_stdout, pending_stderr] if t is not None], return_when=asyncio.FIRST_COMPLETED, ) for task in done: val = task.result() if task is pending_stdout: if val is None: stdout_done = True pending_stdout = None else: yield val pending_stdout = asyncio.create_task(_next(stdout_iter)) elif task is pending_stderr: if val is None: stderr_done = True pending_stderr = None else: yield val pending_stderr = asyncio.create_task(_next(stderr_iter)) await proc.wait() async def run_command( args: list[str], timeout: int = _DEFAULT_TIMEOUT ) -> dict: """ Generic command runner (non-ops). Accepts a full argv list. Returns {"success": bool, "output": str, "error": str}. """ try: proc = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: proc.kill() await proc.communicate() return { "success": False, "output": "", "error": f"Command timed out after {timeout}s", } return { "success": proc.returncode == 0, "output": stdout.decode("utf-8", errors="replace"), "error": stderr.decode("utf-8", errors="replace"), } except FileNotFoundError as exc: return {"success": False, "output": "", "error": f"Executable not found: {exc}"} except Exception as exc: return {"success": False, "output": "", "error": str(exc)} async def stream_command( args: list[str], timeout: int = _DEFAULT_TIMEOUT ) -> AsyncGenerator[str, None]: """ Async generator that yields lines of stdout for an arbitrary command. """ try: proc = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) except FileNotFoundError as exc: yield f"[error] Executable not found: {exc}" return except Exception as exc: yield f"[error] {exc}" return while True: try: line = await asyncio.wait_for(proc.stdout.readline(), timeout=timeout) except asyncio.TimeoutError: yield f"[timeout] Command exceeded {timeout}s" proc.kill() break if not line: break yield line.decode("utf-8", errors="replace").rstrip("\n") # Flush stderr as trailing info stderr_data = await proc.stderr.read() if stderr_data: for ln in stderr_data.decode("utf-8", errors="replace").splitlines(): yield f"[stderr] {ln}" await proc.wait()