Matthias Nott
2026-02-25 fd03c16eca085423267c163137b28ccb60de8db0
app/routers/restore.py
....@@ -1,3 +1,4 @@
1
+import asyncio
12 import json
23 from datetime import datetime, timezone
34 from typing import AsyncGenerator, Literal
....@@ -6,7 +7,9 @@
67 from fastapi.responses import StreamingResponse
78
89 from app.auth import verify_token
9
-from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
10
+from app.ops_runner import _BACKUP_TIMEOUT, new_op_id, is_cancelled, clear_cancelled, stream_ops_host
11
+
12
+_KEEPALIVE_INTERVAL = 15 # seconds between SSE keepalive pings
1013
1114 router = APIRouter()
1215
....@@ -29,52 +32,101 @@
2932 Runs on the host via nsenter because ops restore delegates to project CLIs
3033 that use host Python venvs incompatible with the container's Python.
3134 """
32
- base_args = ["restore", project, env]
35
+ op_id = new_op_id()
36
+ yield _sse_line({"op_id": op_id})
3337
34
- # Pass the backup file path to avoid interactive selection prompt
35
- if name:
36
- backup_path = f"/opt/data/backups/{project}/{env}/{name}"
37
- base_args.append(backup_path)
38
+ try:
39
+ base_args = ["restore", project, env]
3840
39
- if dry_run:
40
- base_args.append("--dry-run")
41
+ # Pass the backup file path to avoid interactive selection prompt
42
+ if name:
43
+ backup_path = f"/opt/data/backups/{project}/{env}/{name}"
44
+ base_args.append(backup_path)
4145
42
- # Granular restore mode
43
- if mode == "db":
44
- base_args.append("--db-only")
45
- elif mode == "wp":
46
- base_args.append("--wp-only")
46
+ if dry_run:
47
+ base_args.append("--dry-run")
4748
48
- if source == "offsite":
49
- # ops offsite restore <project> <env>
50
- download_args = ["offsite", "restore", project, env]
51
- yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
49
+ # Granular restore mode
50
+ if mode == "db":
51
+ base_args.append("--db-only")
52
+ elif mode == "wp":
53
+ base_args.append("--wp-only")
5254
53
- download_ok = True
54
- async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT):
55
- yield _sse_line({"line": line, "timestamp": _now()})
56
- if line.startswith("[error]"):
57
- download_ok = False
55
+ if source == "offsite":
56
+ # ops offsite restore <project> <env>
57
+ download_args = ["offsite", "restore", project, env]
58
+ yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
5859
59
- if not download_ok:
60
- yield _sse_line({"done": True, "success": False})
61
- return
60
+ download_ok = True
61
+ downloaded_path = None
62
+ async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT, op_id=op_id):
63
+ yield _sse_line({"line": line, "timestamp": _now()})
64
+ if line.startswith("[error]"):
65
+ download_ok = False
66
+ # Capture downloaded file path from offsite.py output
67
+ if "Downloaded to" in line and "/tmp/" in line:
68
+ # Parse "Downloaded to: /tmp/filename.tar.gz" or similar
69
+ for part in line.split():
70
+ if part.startswith("/tmp/") and part.endswith(".tar.gz"):
71
+ downloaded_path = part
72
+ elif line.startswith(" ✓ Downloaded to "):
73
+ for part in line.split():
74
+ if part.startswith("/tmp/") and part.endswith(".tar.gz"):
75
+ downloaded_path = part
6276
63
- yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
77
+ if is_cancelled(op_id):
78
+ yield _sse_line({"done": True, "success": False, "cancelled": True})
79
+ return
6480
65
- success = True
66
- async for line in stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT):
67
- yield _sse_line({"line": line, "timestamp": _now()})
68
- if line.startswith("[error]"):
69
- success = False
81
+ if not download_ok:
82
+ yield _sse_line({"done": True, "success": False})
83
+ return
7084
71
- yield _sse_line({"done": True, "success": success})
85
+ # Use the downloaded offsite file for restore
86
+ if downloaded_path:
87
+ base_args.append(downloaded_path)
88
+ yield _sse_line({"line": f"Download complete. Restoring from {downloaded_path}...", "timestamp": _now()})
89
+ else:
90
+ yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
91
+
92
+ success = True
93
+ async for item in _stream_with_keepalive(stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT, op_id=op_id)):
94
+ if item is None:
95
+ # Keepalive ping — SSE comment to prevent idle timeout
96
+ yield ": keepalive\n\n"
97
+ else:
98
+ yield _sse_line({"line": item, "timestamp": _now()})
99
+ if item.startswith("[error]"):
100
+ success = False
101
+
102
+ if is_cancelled(op_id):
103
+ yield _sse_line({"done": True, "success": False, "cancelled": True})
104
+ else:
105
+ yield _sse_line({"done": True, "success": success})
106
+ finally:
107
+ clear_cancelled(op_id)
72108
73109
def _now() -> str:
    """Return the current time in UTC as an ISO 8601 string.

    The value is timezone-aware, so the rendered string carries an explicit
    ``+00:00`` offset.
    """
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
76112
77113
114
+async def _stream_with_keepalive(gen: AsyncGenerator[str, None]) -> AsyncGenerator[str | None, None]:
115
+ """Wrap an async generator to yield None as keepalive when no data arrives within the interval."""
116
+ aiter = gen.__aiter__()
117
+ pending = asyncio.ensure_future(aiter.__anext__())
118
+ while True:
119
+ done, _ = await asyncio.wait({pending}, timeout=_KEEPALIVE_INTERVAL)
120
+ if done:
121
+ try:
122
+ yield pending.result()
123
+ except StopAsyncIteration:
124
+ break
125
+ pending = asyncio.ensure_future(aiter.__anext__())
126
+ else:
127
+ yield None # keepalive — prevents Traefik idle timeout
128
+
129
+
78130 @router.get("/{project}/{env}", summary="Restore a backup with real-time output")
79131 async def restore_backup(
80132 project: str,