1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
| | import asyncio
| | import json
| | from datetime import datetime, timezone
| | from typing import AsyncGenerator, Literal
| |
| | from fastapi import APIRouter, Depends, Query
| | from fastapi.responses import StreamingResponse
| |
| | from app.auth import verify_token
| | from app.ops_runner import _BACKUP_TIMEOUT, new_op_id, is_cancelled, clear_cancelled, stream_ops_host
| |
_KEEPALIVE_INTERVAL = 15  # seconds between SSE keepalive pings

# FastAPI router for the restore endpoints defined in this module.
router = APIRouter()
| |
| |
| | def _sse_line(payload: dict) -> str:
| | """Format a dict as a single SSE data line."""
| | return f"data: {json.dumps(payload)}\n\n"
| |
| |
async def _restore_generator(
    project: str,
    env: str,
    source: str,
    dry_run: bool,
    name: str | None = None,
    mode: str = "full",
) -> AsyncGenerator[str, None]:
    """Drive the restore workflow and yield SSE-formatted progress events.

    Runs on the host via nsenter because ops restore delegates to project CLIs
    that use host Python venvs incompatible with the container's Python.

    Args:
        project: Project slug, used for backup paths and ops CLI arguments.
        env: Environment name within the project.
        source: "local" restores an on-disk backup; "offsite" downloads the
            archive first via ``ops offsite restore``.
        dry_run: When True, pass ``--dry-run`` through to the ops CLI.
        name: Optional backup file name; when given, the full backup path is
            passed to avoid the CLI's interactive selection prompt.
        mode: "full" (default), "db" (``--db-only``) or "wp" (``--wp-only``).

    Yields:
        SSE ``data:`` lines (and ``: keepalive`` comments) for streaming.
    """
    op_id = new_op_id()
    # Emit the op id first so the client can target a cancel request.
    yield _sse_line({"op_id": op_id})

    try:
        restore_args = _build_restore_args(project, env, name, dry_run, mode)

        if source == "offsite":
            # Download the archive first: ops offsite restore <project> <env>.
            yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})

            download_ok = True
            downloaded_path = None
            # Keepalive-wrapped like the restore loop below — offsite
            # downloads can also be long enough to trip proxy idle timeouts.
            async for item in _stream_with_keepalive(
                stream_ops_host(["offsite", "restore", project, env], timeout=_BACKUP_TIMEOUT, op_id=op_id)
            ):
                if item is None:
                    yield ": keepalive\n\n"
                    continue
                yield _sse_line({"line": item, "timestamp": _now()})
                if item.startswith("[error]"):
                    download_ok = False
                # Capture the downloaded archive path reported by offsite.py
                # so the restore step can consume it directly.
                found = _extract_download_path(item)
                if found:
                    downloaded_path = found

            if is_cancelled(op_id):
                yield _sse_line({"done": True, "success": False, "cancelled": True})
                return

            if not download_ok:
                yield _sse_line({"done": True, "success": False})
                return

            if downloaded_path:
                # Use the downloaded offsite file for the restore.
                restore_args.append(downloaded_path)
                yield _sse_line({"line": f"Download complete. Restoring from {downloaded_path}...", "timestamp": _now()})
            else:
                yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})

        success = True
        async for item in _stream_with_keepalive(stream_ops_host(restore_args, timeout=_BACKUP_TIMEOUT, op_id=op_id)):
            if item is None:
                # Keepalive ping — SSE comment to prevent idle timeout.
                yield ": keepalive\n\n"
            else:
                yield _sse_line({"line": item, "timestamp": _now()})
                if item.startswith("[error]"):
                    success = False

        if is_cancelled(op_id):
            yield _sse_line({"done": True, "success": False, "cancelled": True})
        else:
            yield _sse_line({"done": True, "success": success})
    finally:
        # Always drop any cancellation flag so op state does not accumulate.
        clear_cancelled(op_id)


def _build_restore_args(project: str, env: str, name: str | None, dry_run: bool, mode: str) -> list[str]:
    """Build the ``ops restore`` argv from the request parameters."""
    args = ["restore", project, env]
    # Pass the backup file path to avoid the interactive selection prompt.
    if name:
        args.append(f"/opt/data/backups/{project}/{env}/{name}")
    if dry_run:
        args.append("--dry-run")
    # Granular restore mode.
    if mode == "db":
        args.append("--db-only")
    elif mode == "wp":
        args.append("--wp-only")
    return args


def _extract_download_path(line: str) -> str | None:
    """Return the ``/tmp/*.tar.gz`` path from an offsite "Downloaded to" line.

    offsite.py prints e.g. " ✓ Downloaded to: /tmp/name.tar.gz"; any line
    without a "Downloaded to" marker (or without such a path) yields None.
    """
    if "Downloaded to" not in line:
        return None
    for part in line.split():
        if part.startswith("/tmp/") and part.endswith(".tar.gz"):
            return part
    return None
| |
| |
| | def _now() -> str:
| | return datetime.now(timezone.utc).isoformat()
| |
| |
| | async def _stream_with_keepalive(gen: AsyncGenerator[str, None]) -> AsyncGenerator[str | None, None]:
| | """Wrap an async generator to yield None as keepalive when no data arrives within the interval."""
| | aiter = gen.__aiter__()
| | pending = asyncio.ensure_future(aiter.__anext__())
| | while True:
| | done, _ = await asyncio.wait({pending}, timeout=_KEEPALIVE_INTERVAL)
| | if done:
| | try:
| | yield pending.result()
| | except StopAsyncIteration:
| | break
| | pending = asyncio.ensure_future(aiter.__anext__())
| | else:
| | yield None # keepalive — prevents Traefik idle timeout
| |
| |
@router.get("/{project}/{env}", summary="Restore a backup with real-time output")
async def restore_backup(
    project: str,
    env: str,
    source: Literal["local", "offsite"] = Query(default="local"),
    dry_run: bool = Query(default=False, alias="dry_run"),
    name: str | None = Query(default=None),
    mode: Literal["full", "db", "wp"] = Query(default="full"),
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """
    Restore a backup for the given project/env.

    Streams real-time progress to the client via Server-Sent Events (SSE).
    The work runs on the host via nsenter for Python venv compatibility.

    Modes: full (default), db (database only), wp (wp-content only).
    """
    event_stream = _restore_generator(project, env, source, dry_run, name, mode)
    # Disable caching and reverse-proxy buffering so events flush immediately.
    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(event_stream, media_type="text/event-stream", headers=sse_headers)
|
|