Matthias Nott
2026-02-26 5d0247159b125bf035285d56c2b9bb58d6bb3029
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import AsyncGenerator, Literal

from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse

from app.auth import verify_token
from app.ops_runner import _BACKUP_TIMEOUT, new_op_id, is_cancelled, clear_cancelled, stream_ops_host
# Seconds without output from an ops command before an SSE keepalive comment
# is emitted, so proxies in front of the API (e.g. Traefik) do not treat the
# stream as idle and close it.
_KEEPALIVE_INTERVAL = 15  # seconds between SSE keepalive pings
# Router holding the restore endpoint; presumably included by the main app
# (possibly under a prefix) — confirm where it is mounted.
router = APIRouter()
def _sse_line(payload: dict) -> str:
    """Format a dict as a single SSE data line."""
    return f"data: {json.dumps(payload)}\n\n"
def _is_safe_path_component(value: str) -> bool:
    """Return True if *value* is usable as a single filesystem path segment.

    Rejects empty strings, ``.``/``..``, path separators and NUL bytes so that
    request-supplied values cannot point the restore outside
    ``/opt/data/backups/<project>/<env>/``.
    """
    if not value or value in (".", ".."):
        return False
    return not any(ch in value for ch in ("/", "\\", "\x00"))


def _parse_downloaded_path(line: str) -> str | None:
    """Return the ``/tmp/...tar.gz`` path announced by an offsite "Downloaded to" line.

    Returns None when the line is not a "Downloaded to" line or names no
    ``/tmp/*.tar.gz`` token. If several tokens match, the last one wins.
    """
    if "Downloaded to" not in line:
        return None
    found = None
    for token in line.split():
        # Tolerate quoting / trailing punctuation around the path,
        # e.g. "Downloaded to: '/tmp/x.tar.gz'."
        token = token.strip("\"'`()[],;:.")
        if token.startswith("/tmp/") and token.endswith(".tar.gz"):
            found = token
    return found


async def _restore_generator(
    project: str,
    env: str,
    source: str,
    dry_run: bool,
    name: str | None = None,
    mode: str = "full",
) -> AsyncGenerator[str, None]:
    """Drive the restore workflow and yield its progress as SSE events.

    Runs on the host via nsenter because ops restore delegates to project CLIs
    that use host Python venvs incompatible with the container's Python.

    Event sequence: first ``{"op_id": ...}``; then ``{"line", "timestamp"}``
    events for every output line (plus ``: keepalive`` SSE comments while a
    command is silent); finally one ``{"done": True, "success": ...}`` event,
    carrying ``"cancelled": True`` when the operation was cancelled.

    Args:
        project: Project name passed to ``ops restore``.
        env: Environment name passed to ``ops restore``.
        source: ``"local"`` or ``"offsite"``; offsite first runs
            ``ops offsite restore <project> <env>`` to download an archive.
        dry_run: Append ``--dry-run`` to the restore command.
        name: Optional backup file name under
            ``/opt/data/backups/<project>/<env>/``; must be a plain file name.
        mode: ``"full"``, ``"db"`` (``--db-only``) or ``"wp"`` (``--wp-only``).
    """
    op_id = new_op_id()
    yield _sse_line({"op_id": op_id})
    try:
        base_args = ["restore", project, env]
        if name:
            # name/project/env come from the request and are joined into a host
            # path read by a privileged process: refuse anything that is not a
            # single plain path segment (no "..", no separators).
            if not all(_is_safe_path_component(part) for part in (project, env, name)):
                yield _sse_line({"line": "[error] Invalid backup name or path component", "timestamp": _now()})
                yield _sse_line({"done": True, "success": False})
                return
            # Pass the backup file path to avoid interactive selection prompt
            base_args.append(f"/opt/data/backups/{project}/{env}/{name}")
        if dry_run:
            base_args.append("--dry-run")
        # Granular restore mode
        if mode == "db":
            base_args.append("--db-only")
        elif mode == "wp":
            base_args.append("--wp-only")
        if source == "offsite":
            # NOTE(review): if `name` was also given, both the local backup path
            # above and the downloaded path below end up in base_args — confirm
            # that this is what `ops restore` expects.
            # ops offsite restore <project> <env>
            download_args = ["offsite", "restore", project, env]
            yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
            download_ok = True
            downloaded_path = None
            # Downloads can be silent for long stretches; keep the SSE stream
            # alive exactly like the restore step below does.
            async for item in _stream_with_keepalive(
                stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT, op_id=op_id)
            ):
                if item is None:
                    yield ": keepalive\n\n"
                    continue
                yield _sse_line({"line": item, "timestamp": _now()})
                if item.startswith("[error]"):
                    download_ok = False
                # Capture the downloaded file path from offsite.py output
                parsed = _parse_downloaded_path(item)
                if parsed:
                    downloaded_path = parsed
            if is_cancelled(op_id):
                yield _sse_line({"done": True, "success": False, "cancelled": True})
                return
            if not download_ok:
                yield _sse_line({"done": True, "success": False})
                return
            # Use the downloaded offsite file for restore
            if downloaded_path:
                base_args.append(downloaded_path)
                yield _sse_line({"line": f"Download complete. Restoring from {downloaded_path}...", "timestamp": _now()})
            else:
                yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
        success = True
        async for item in _stream_with_keepalive(stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT, op_id=op_id)):
            if item is None:
                # Keepalive ping — SSE comment to prevent idle timeout
                yield ": keepalive\n\n"
                continue
            yield _sse_line({"line": item, "timestamp": _now()})
            if item.startswith("[error]"):
                success = False
        if is_cancelled(op_id):
            yield _sse_line({"done": True, "success": False, "cancelled": True})
        else:
            yield _sse_line({"done": True, "success": success})
    except Exception as exc:
        # Stream boundary: log server-side and still emit a terminal "done"
        # event so the client is not left waiting on a half-finished stream.
        # (CancelledError / GeneratorExit are BaseException and pass through.)
        logging.getLogger(__name__).exception("Restore of %s/%s failed", project, env)
        yield _sse_line({"line": f"[error] Restore failed: {exc}", "timestamp": _now()})
        yield _sse_line({"done": True, "success": False})
    finally:
        clear_cancelled(op_id)
def _now() -> str:
    return datetime.now(timezone.utc).isoformat()
async def _stream_with_keepalive(gen: AsyncGenerator[str, None]) -> AsyncGenerator[str | None, None]:
    """Wrap an async generator to yield None as keepalive when no data arrives within the interval."""
    aiter = gen.__aiter__()
    pending = asyncio.ensure_future(aiter.__anext__())
    while True:
        done, _ = await asyncio.wait({pending}, timeout=_KEEPALIVE_INTERVAL)
        if done:
            try:
                yield pending.result()
            except StopAsyncIteration:
                break
            pending = asyncio.ensure_future(aiter.__anext__())
        else:
            yield None  # keepalive — prevents Traefik idle timeout
@router.get("/{project}/{env}", summary="Restore a backup with real-time output")
async def restore_backup(
    project: str,
    env: str,
    source: Literal["local", "offsite"] = Query(default="local"),
    dry_run: bool = Query(default=False, alias="dry_run"),
    name: str | None = Query(default=None),
    mode: Literal["full", "db", "wp"] = Query(default="full"),
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """
    Restore a backup for the given project/env.
    Uses Server-Sent Events (SSE) to stream real-time progress.
    Runs on the host via nsenter for Python venv compatibility.
    Modes: full (default), db (database only), wp (wp-content only).
    """
    # Disable client-side caching, and ask proxies that honour
    # X-Accel-Buffering (nginx-style) not to buffer, so events flush as produced.
    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    # Creating the async generator runs nothing yet; work starts once
    # StreamingResponse begins iterating it.
    event_stream = _restore_generator(project, env, source, dry_run, name, mode)
    return StreamingResponse(event_stream, media_type="text/event-stream", headers=sse_headers)