Matthias Nott
2026-02-21 7300351bb9fb147f4de81a60423c4561a4924c21
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import json
from datetime import datetime, timezone
from typing import AsyncGenerator, Literal
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from app.auth import verify_token
from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
router = APIRouter()
def _sse_line(payload: dict) -> str:
    """Format a dict as a single SSE data line."""
    return f"data: {json.dumps(payload)}\n\n"
async def _restore_generator(
    project: str,
    env: str,
    source: str,
    dry_run: bool,
) -> AsyncGenerator[str, None]:
    """Async generator that drives the restore workflow and yields SSE events.
    Runs on the host via nsenter because ops restore delegates to project CLIs
    that use host Python venvs incompatible with the container's Python.
    """
    base_args = ["restore", project, env]
    if dry_run:
        base_args.append("--dry-run")
    if source == "offsite":
        # ops offsite restore <project> <env>
        download_args = ["offsite", "restore", project, env]
        yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
        download_ok = True
        async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT):
            yield _sse_line({"line": line, "timestamp": _now()})
            if line.startswith("[error]"):
                download_ok = False
        if not download_ok:
            yield _sse_line({"done": True, "success": False})
            return
        yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
    success = True
    async for line in stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT):
        yield _sse_line({"line": line, "timestamp": _now()})
        if line.startswith("[error]"):
            success = False
    yield _sse_line({"done": True, "success": success})
def _now() -> str:
    return datetime.now(timezone.utc).isoformat()
@router.get("/{project}/{env}", summary="Restore a backup with real-time output")
async def restore_backup(
    project: str,
    env: str,
    source: Literal["local", "offsite"] = Query(default="local"),
    dry_run: bool = Query(default=False, alias="dry_run"),
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """
    Restore a backup for the given project/env.
    Uses Server-Sent Events (SSE) to stream real-time progress.
    Runs on the host via nsenter for Python venv compatibility.
    """
    return StreamingResponse(
        _restore_generator(project, env, source, dry_run),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )