import asyncio import json from datetime import datetime, timezone from typing import AsyncGenerator, Literal from fastapi import APIRouter, Depends, Query from fastapi.responses import StreamingResponse from app.auth import verify_token from app.ops_runner import _BACKUP_TIMEOUT, new_op_id, is_cancelled, clear_cancelled, stream_ops_host _KEEPALIVE_INTERVAL = 15 # seconds between SSE keepalive pings router = APIRouter() def _sse_line(payload: dict) -> str: """Format a dict as a single SSE data line.""" return f"data: {json.dumps(payload)}\n\n" async def _restore_generator( project: str, env: str, source: str, dry_run: bool, name: str | None = None, mode: str = "full", ) -> AsyncGenerator[str, None]: """Async generator that drives the restore workflow and yields SSE events. Runs on the host via nsenter because ops restore delegates to project CLIs that use host Python venvs incompatible with the container's Python. """ op_id = new_op_id() yield _sse_line({"op_id": op_id}) try: base_args = ["restore", project, env] # Pass the backup file path to avoid interactive selection prompt if name: backup_path = f"/opt/data/backups/{project}/{env}/{name}" base_args.append(backup_path) if dry_run: base_args.append("--dry-run") # Granular restore mode if mode == "db": base_args.append("--db-only") elif mode == "wp": base_args.append("--wp-only") if source == "offsite": # ops offsite restore download_args = ["offsite", "restore", project, env] yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()}) download_ok = True downloaded_path = None async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT, op_id=op_id): yield _sse_line({"line": line, "timestamp": _now()}) if line.startswith("[error]"): download_ok = False # Capture downloaded file path from offsite.py output if "Downloaded to" in line and "/tmp/" in line: # Parse "Downloaded to: /tmp/filename.tar.gz" or similar for part in line.split(): if part.startswith("/tmp/") and part.endswith(".tar.gz"): downloaded_path = part elif line.startswith(" ✓ Downloaded to "): for part in line.split(): if part.startswith("/tmp/") and part.endswith(".tar.gz"): downloaded_path = part if is_cancelled(op_id): yield _sse_line({"done": True, "success": False, "cancelled": True}) return if not download_ok: yield _sse_line({"done": True, "success": False}) return # Use the downloaded offsite file for restore if downloaded_path: base_args.append(downloaded_path) yield _sse_line({"line": f"Download complete. Restoring from {downloaded_path}...", "timestamp": _now()}) else: yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()}) success = True async for item in _stream_with_keepalive(stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT, op_id=op_id)): if item is None: # Keepalive ping — SSE comment to prevent idle timeout yield ": keepalive\n\n" else: yield _sse_line({"line": item, "timestamp": _now()}) if item.startswith("[error]"): success = False if is_cancelled(op_id): yield _sse_line({"done": True, "success": False, "cancelled": True}) else: yield _sse_line({"done": True, "success": success}) finally: clear_cancelled(op_id) def _now() -> str: return datetime.now(timezone.utc).isoformat() async def _stream_with_keepalive(gen: AsyncGenerator[str, None]) -> AsyncGenerator[str | None, None]: """Wrap an async generator to yield None as keepalive when no data arrives within the interval.""" aiter = gen.__aiter__() pending = asyncio.ensure_future(aiter.__anext__()) while True: done, _ = await asyncio.wait({pending}, timeout=_KEEPALIVE_INTERVAL) if done: try: yield pending.result() except StopAsyncIteration: break pending = asyncio.ensure_future(aiter.__anext__()) else: yield None # keepalive — prevents Traefik idle timeout @router.get("/{project}/{env}", summary="Restore a backup with real-time output") async def restore_backup( project: str, env: str, source: Literal["local", "offsite"] = Query(default="local"), dry_run: bool = Query(default=False, alias="dry_run"), name: str | None = Query(default=None), mode: Literal["full", "db", "wp"] = Query(default="full"), _: str = Depends(verify_token), ) -> StreamingResponse: """ Restore a backup for the given project/env. Uses Server-Sent Events (SSE) to stream real-time progress. Runs on the host via nsenter for Python venv compatibility. Modes: full (default), db (database only), wp (wp-content only). """ return StreamingResponse( _restore_generator(project, env, source, dry_run, name, mode), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", }, )