Matthias Nott
2026-02-25 fd03c16eca085423267c163137b28ccb60de8db0
app/routers/backups.py
....@@ -1,11 +1,26 @@
1
-from typing import Any
1
+import json
2
+from datetime import datetime, timezone
3
+from typing import Any, AsyncGenerator
24
35 from fastapi import APIRouter, Depends, HTTPException, Query
6
+from fastapi.responses import StreamingResponse
47
58 from app.auth import verify_token
6
-from app.ops_runner import run_ops, run_ops_json, run_ops_host, run_ops_host_json, run_command_host, _BACKUP_TIMEOUT
9
+from app.ops_runner import (
10
+ run_ops, run_ops_json, run_ops_host, run_ops_host_json, run_command_host,
11
+ stream_ops_host, stream_command_host, new_op_id, is_cancelled, clear_cancelled,
12
+ _BACKUP_TIMEOUT, OFFSITE_PYTHON,
13
+)
714
815 router = APIRouter()
16
+
17
+
18
+def _sse(payload: dict) -> str:
19
+ return f"data: {json.dumps(payload)}\n\n"
20
+
21
+
22
+def _now() -> str:
23
+ return datetime.now(timezone.utc).isoformat()
924
1025
1126 @router.get("/", summary="List local backups")
....@@ -82,6 +97,43 @@
8297 }
8398
8499
100
+async def _backup_stream(project: str, env: str) -> AsyncGenerator[str, None]:
101
+ """Stream backup creation progress via SSE."""
102
+ op_id = new_op_id()
103
+ yield _sse({"op_id": op_id})
104
+ yield _sse({"line": f"Creating backup for {project}/{env}...", "timestamp": _now()})
105
+
106
+ try:
107
+ success = True
108
+ async for line in stream_ops_host(
109
+ ["backup", project, env], timeout=_BACKUP_TIMEOUT, op_id=op_id
110
+ ):
111
+ yield _sse({"line": line, "timestamp": _now()})
112
+ if line.startswith("[error]") or line.startswith("ERROR"):
113
+ success = False
114
+
115
+ if is_cancelled(op_id):
116
+ yield _sse({"done": True, "success": False, "cancelled": True})
117
+ else:
118
+ yield _sse({"done": True, "success": success, "project": project, "env": env})
119
+ finally:
120
+ clear_cancelled(op_id)
121
+
122
+
123
+@router.get("/stream/{project}/{env}", summary="Create backup with streaming output")
124
+async def create_backup_stream(
125
+ project: str,
126
+ env: str,
127
+ _: str = Depends(verify_token),
128
+) -> StreamingResponse:
129
+ """Create a backup with real-time SSE progress output."""
130
+ return StreamingResponse(
131
+ _backup_stream(project, env),
132
+ media_type="text/event-stream",
133
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
134
+ )
135
+
136
+
85137 @router.post("/offsite/upload/{project}/{env}", summary="Upload backup to offsite")
86138 async def upload_offsite(
87139 project: str,
....@@ -100,6 +152,99 @@
100152 return {"success": True, "output": result["output"], "project": project, "env": env}
101153
102154
155
+async def _upload_stream(project: str, env: str, name: str | None = None) -> AsyncGenerator[str, None]:
156
+ """Stream offsite upload progress via SSE."""
157
+ op_id = new_op_id()
158
+ yield _sse({"op_id": op_id})
159
+ label = f"{project}/{env}/{name}" if name else f"{project}/{env} (latest)"
160
+ yield _sse({"line": f"Uploading {label} to offsite storage...", "timestamp": _now()})
161
+
162
+ cmd = ["offsite", "upload", project, env]
163
+ if name:
164
+ cmd.append(name)
165
+
166
+ try:
167
+ success = True
168
+ async for line in stream_ops_host(
169
+ cmd, timeout=_BACKUP_TIMEOUT, op_id=op_id
170
+ ):
171
+ yield _sse({"line": line, "timestamp": _now()})
172
+ if line.startswith("[error]") or line.startswith("ERROR"):
173
+ success = False
174
+
175
+ if is_cancelled(op_id):
176
+ yield _sse({"done": True, "success": False, "cancelled": True})
177
+ else:
178
+ yield _sse({"done": True, "success": success, "project": project, "env": env})
179
+ finally:
180
+ clear_cancelled(op_id)
181
+
182
+
183
+@router.get("/offsite/stream/{project}/{env}", summary="Upload to offsite with streaming output")
184
+async def upload_offsite_stream(
185
+ project: str,
186
+ env: str,
187
+ name: str | None = Query(None),
188
+ _: str = Depends(verify_token),
189
+) -> StreamingResponse:
190
+ """Upload backup to offsite with real-time SSE progress output."""
191
+ return StreamingResponse(
192
+ _upload_stream(project, env, name),
193
+ media_type="text/event-stream",
194
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
195
+ )
196
+
197
+
198
+async def _download_stream(project: str, env: str, name: str) -> AsyncGenerator[str, None]:
199
+ """Stream offsite-to-local download progress via SSE."""
200
+ op_id = new_op_id()
201
+ yield _sse({"op_id": op_id})
202
+ yield _sse({"line": f"Downloading {name} from offsite to local storage...", "timestamp": _now()})
203
+
204
+ # Download to the local backup directory so it appears in the backup list
205
+ local_path = f"/opt/data/backups/{project}/{env}/{name}"
206
+ cmd = [
207
+ OFFSITE_PYTHON, "-c",
208
+ f"import sys; sys.stdout.reconfigure(line_buffering=True); "
209
+ f"sys.path.insert(0, '/opt/data/scripts'); "
210
+ f"from offsite import download; from pathlib import Path; "
211
+ f"import os; os.makedirs('/opt/data/backups/{project}/{env}', exist_ok=True); "
212
+ f"ok = download('{name}', Path('{local_path}'), '{project}', '{env}'); "
213
+ f"sys.exit(0 if ok else 1)"
214
+ ]
215
+
216
+ try:
217
+ success = True
218
+ async for line in stream_command_host(cmd, timeout=_BACKUP_TIMEOUT, op_id=op_id):
219
+ yield _sse({"line": line, "timestamp": _now()})
220
+ if line.startswith("[error]") or line.startswith("ERROR") or "failed" in line.lower():
221
+ success = False
222
+
223
+ if is_cancelled(op_id):
224
+ yield _sse({"done": True, "success": False, "cancelled": True})
225
+ else:
226
+ yield _sse({"done": True, "success": success, "project": project, "env": env, "name": name})
227
+ finally:
228
+ clear_cancelled(op_id)
229
+
230
+
231
+@router.get("/offsite/download/stream/{project}/{env}", summary="Download offsite backup to local storage with streaming output")
232
+async def download_offsite_stream(
233
+ project: str,
234
+ env: str,
235
+ name: str = Query(...),
236
+ _: str = Depends(verify_token),
237
+) -> StreamingResponse:
238
+ """Download an offsite backup to local storage with real-time SSE progress output."""
239
+ if "/" in name or "\\" in name or ".." in name:
240
+ raise HTTPException(status_code=400, detail="Invalid backup name")
241
+ return StreamingResponse(
242
+ _download_stream(project, env, name),
243
+ media_type="text/event-stream",
244
+ headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
245
+ )
246
+
247
+
103248 @router.post("/offsite/retention", summary="Apply offsite retention policy")
104249 async def apply_retention(
105250 _: str = Depends(verify_token),