Matthias Nott
2026-02-21 68b89251bd42af5eea293b9302b78df0ed87a86f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import json
from datetime import datetime, timezone
from typing import AsyncGenerator, Literal
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse
from app.auth import verify_token
from app.ops_runner import _BACKUP_TIMEOUT, stream_ops
router = APIRouter()
def _sse_line(payload: dict) -> str:
    """Format a dict as a single SSE data line."""
    return f"data: {json.dumps(payload)}\n\n"
async def _restore_generator(
    project: str,
    env: str,
    source: str,
    dry_run: bool,
) -> AsyncGenerator[str, None]:
    """
    Async generator that drives the restore workflow and yields SSE events.
    """
    base_args = ["restore", project, env]
    if dry_run:
        base_args.append("--dry-run")
    if source == "offsite":
        download_args = ["offsite", "download", project, env]
        yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
        download_ok = True
        async for line in stream_ops(download_args, timeout=_BACKUP_TIMEOUT):
            yield _sse_line({"line": line, "timestamp": _now()})
            if line.startswith("[error]"):
                download_ok = False
        if not download_ok:
            yield _sse_line({"done": True, "success": False})
            return
        yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
    success = True
    async for line in stream_ops(base_args, timeout=_BACKUP_TIMEOUT):
        yield _sse_line({"line": line, "timestamp": _now()})
        if line.startswith("[error]"):
            success = False
    yield _sse_line({"done": True, "success": success})
def _now() -> str:
    return datetime.now(timezone.utc).isoformat()
@router.get("/{project}/{env}", summary="Restore a backup with real-time output")
async def restore_backup(
    project: str,
    env: str,
    source: Literal["local", "offsite"] = Query(default="local"),
    dry_run: bool = Query(default=False, alias="dry_run"),
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """
    Restore a backup for the given project/env.
    Uses Server-Sent Events (SSE) to stream real-time progress to the client.
    Parameters are passed as query strings since EventSource only supports GET.
    """
    return StreamingResponse(
        _restore_generator(project, env, source, dry_run),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )