| .. | .. |
|---|
| 1 | | -from typing import Any |
|---|
| 1 | +import json |
|---|
| 2 | +from datetime import datetime, timezone |
|---|
| 3 | +from typing import Any, AsyncGenerator |
|---|
| 2 | 4 | |
|---|
| 3 | 5 | from fastapi import APIRouter, Depends, HTTPException, Query |
|---|
| 6 | +from fastapi.responses import StreamingResponse |
|---|
| 4 | 7 | |
|---|
| 5 | 8 | from app.auth import verify_token |
|---|
| 6 | | -from app.ops_runner import run_ops, run_ops_json, run_ops_host, run_ops_host_json, run_command_host, _BACKUP_TIMEOUT |
|---|
| 9 | +from app.ops_runner import ( |
|---|
| 10 | + run_ops, run_ops_json, run_ops_host, run_ops_host_json, run_command_host, |
|---|
| 11 | + stream_ops_host, stream_command_host, new_op_id, is_cancelled, clear_cancelled, |
|---|
| 12 | + _BACKUP_TIMEOUT, OFFSITE_PYTHON, |
|---|
| 13 | +) |
|---|
| 7 | 14 | |
|---|
| 8 | 15 | router = APIRouter() |
|---|
| 16 | + |
|---|
| 17 | + |
|---|
| 18 | +def _sse(payload: dict) -> str: |
|---|
| 19 | + return f"data: {json.dumps(payload)}\n\n" |
|---|
| 20 | + |
|---|
| 21 | + |
|---|
| 22 | +def _now() -> str: |
|---|
| 23 | + return datetime.now(timezone.utc).isoformat() |
|---|
| 9 | 24 | |
|---|
| 10 | 25 | |
|---|
| 11 | 26 | @router.get("/", summary="List local backups") |
|---|
| .. | .. |
|---|
| 82 | 97 | } |
|---|
| 83 | 98 | |
|---|
| 84 | 99 | |
|---|
async def _backup_stream(project: str, env: str) -> AsyncGenerator[str, None]:
    """Stream backup creation progress via SSE.

    Emits, in order: an ``op_id`` event, one event per runner output line,
    and exactly one terminal ``done`` event.  The terminal event is emitted
    even when the underlying runner raises, so clients never hang waiting
    for completion.
    """
    op_id = new_op_id()
    yield _sse({"op_id": op_id})
    yield _sse({"line": f"Creating backup for {project}/{env}...", "timestamp": _now()})

    try:
        success = True
        try:
            async for line in stream_ops_host(
                ["backup", project, env], timeout=_BACKUP_TIMEOUT, op_id=op_id
            ):
                yield _sse({"line": line, "timestamp": _now()})
                # Heuristic: the runner reports failures inline rather than raising.
                if line.startswith("[error]") or line.startswith("ERROR"):
                    success = False
        except Exception as exc:  # boundary: surface the failure to the SSE client
            yield _sse({"line": f"[error] {exc}", "timestamp": _now()})
            yield _sse({"done": True, "success": False, "project": project, "env": env})
            return

        if is_cancelled(op_id):
            yield _sse({"done": True, "success": False, "cancelled": True})
        else:
            yield _sse({"done": True, "success": success, "project": project, "env": env})
    finally:
        # Always drop the cancellation flag so the op_id cannot leak state.
        clear_cancelled(op_id)
|---|
| 121 | + |
|---|
| 122 | + |
|---|
@router.get("/stream/{project}/{env}", summary="Create backup with streaming output")
async def create_backup_stream(
    project: str,
    env: str,
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """Create a backup with real-time SSE progress output."""
    headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    generator = _backup_stream(project, env)
    return StreamingResponse(generator, media_type="text/event-stream", headers=headers)
|---|
| 135 | + |
|---|
| 136 | + |
|---|
| 85 | 137 | @router.post("/offsite/upload/{project}/{env}", summary="Upload backup to offsite") |
|---|
| 86 | 138 | async def upload_offsite( |
|---|
| 87 | 139 | project: str, |
|---|
| .. | .. |
|---|
| 100 | 152 | return {"success": True, "output": result["output"], "project": project, "env": env} |
|---|
| 101 | 153 | |
|---|
| 102 | 154 | |
|---|
async def _upload_stream(project: str, env: str, name: str | None = None) -> AsyncGenerator[str, None]:
    """Stream offsite upload progress via SSE.

    When *name* is None the latest backup for ``project/env`` is uploaded.
    Emits an ``op_id`` event, one event per runner output line, and exactly
    one terminal ``done`` event — even when the underlying runner raises,
    so clients never hang waiting for completion.
    """
    op_id = new_op_id()
    yield _sse({"op_id": op_id})
    label = f"{project}/{env}/{name}" if name else f"{project}/{env} (latest)"
    yield _sse({"line": f"Uploading {label} to offsite storage...", "timestamp": _now()})

    cmd = ["offsite", "upload", project, env]
    if name:
        cmd.append(name)

    try:
        success = True
        try:
            async for line in stream_ops_host(cmd, timeout=_BACKUP_TIMEOUT, op_id=op_id):
                yield _sse({"line": line, "timestamp": _now()})
                # Heuristic: the runner reports failures inline rather than raising.
                if line.startswith("[error]") or line.startswith("ERROR"):
                    success = False
        except Exception as exc:  # boundary: surface the failure to the SSE client
            yield _sse({"line": f"[error] {exc}", "timestamp": _now()})
            yield _sse({"done": True, "success": False, "project": project, "env": env})
            return

        if is_cancelled(op_id):
            yield _sse({"done": True, "success": False, "cancelled": True})
        else:
            yield _sse({"done": True, "success": success, "project": project, "env": env})
    finally:
        # Always drop the cancellation flag so the op_id cannot leak state.
        clear_cancelled(op_id)
|---|
| 181 | + |
|---|
| 182 | + |
|---|
@router.get("/offsite/stream/{project}/{env}", summary="Upload to offsite with streaming output")
async def upload_offsite_stream(
    project: str,
    env: str,
    name: str | None = Query(None),
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """Upload backup to offsite with real-time SSE progress output.

    Raises 400 when *name* contains path separators or a traversal
    sequence — the same validation the download endpoint performs —
    since *name* is forwarded into a host-side command.
    """
    if name is not None and ("/" in name or "\\" in name or ".." in name):
        raise HTTPException(status_code=400, detail="Invalid backup name")
    return StreamingResponse(
        _upload_stream(project, env, name),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
|---|
| 196 | + |
|---|
| 197 | + |
|---|
async def _download_stream(project: str, env: str, name: str) -> AsyncGenerator[str, None]:
    """Stream offsite-to-local download progress via SSE.

    Emits an ``op_id`` event, one event per output line, and exactly one
    terminal ``done`` event — even when the underlying runner raises.
    """
    op_id = new_op_id()
    yield _sse({"op_id": op_id})
    yield _sse({"line": f"Downloading {name} from offsite to local storage...", "timestamp": _now()})

    # Download to the local backup directory so it appears in the backup list.
    local_path = f"/opt/data/backups/{project}/{env}/{name}"
    # SECURITY: pass untrusted values via argv instead of interpolating them
    # into the -c source — a quote/backslash in project/env/name could
    # otherwise break out of the string literals and execute arbitrary code.
    script = (
        "import os, sys; sys.stdout.reconfigure(line_buffering=True); "
        "sys.path.insert(0, '/opt/data/scripts'); "
        "from pathlib import Path; from offsite import download; "
        "name, dest, proj, env = sys.argv[1:5]; "
        "os.makedirs(os.path.dirname(dest), exist_ok=True); "
        "ok = download(name, Path(dest), proj, env); "
        "sys.exit(0 if ok else 1)"
    )
    cmd = [OFFSITE_PYTHON, "-c", script, name, local_path, project, env]

    try:
        success = True
        try:
            async for line in stream_command_host(cmd, timeout=_BACKUP_TIMEOUT, op_id=op_id):
                yield _sse({"line": line, "timestamp": _now()})
                # Heuristic failure detection on the runner's output text.
                if line.startswith("[error]") or line.startswith("ERROR") or "failed" in line.lower():
                    success = False
        except Exception as exc:  # boundary: surface the failure to the SSE client
            yield _sse({"line": f"[error] {exc}", "timestamp": _now()})
            yield _sse({"done": True, "success": False, "project": project, "env": env, "name": name})
            return

        if is_cancelled(op_id):
            yield _sse({"done": True, "success": False, "cancelled": True})
        else:
            yield _sse({"done": True, "success": success, "project": project, "env": env, "name": name})
    finally:
        # Always drop the cancellation flag so the op_id cannot leak state.
        clear_cancelled(op_id)
|---|
| 229 | + |
|---|
| 230 | + |
|---|
@router.get("/offsite/download/stream/{project}/{env}", summary="Download offsite backup to local storage with streaming output")
async def download_offsite_stream(
    project: str,
    env: str,
    name: str = Query(...),
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """Download an offsite backup to local storage with real-time SSE progress output.

    Raises 400 when *name* — or, defense-in-depth, *project*/*env* —
    contains path separators or a traversal sequence, since all three
    values are used to build a filesystem path and a host-side command.
    """
    if "/" in name or "\\" in name or ".." in name:
        raise HTTPException(status_code=400, detail="Invalid backup name")
    for value in (project, env):
        if "/" in value or "\\" in value or ".." in value:
            raise HTTPException(status_code=400, detail="Invalid project or environment")
    return StreamingResponse(
        _download_stream(project, env, name),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
|---|
| 246 | + |
|---|
| 247 | + |
|---|
| 103 | 248 | @router.post("/offsite/retention", summary="Apply offsite retention policy") |
|---|
| 104 | 249 | async def apply_retention( |
|---|
| 105 | 250 | _: str = Depends(verify_token), |
|---|