1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
| | import json
| | from datetime import datetime, timezone
| | from typing import AsyncGenerator, Literal
| |
| | from fastapi import APIRouter, Depends, HTTPException, Query
| | from fastapi.responses import StreamingResponse
| |
| | from app.auth import verify_token
| | from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
| |
| | router = APIRouter()
| |
| | # Only adjacent-environment sync paths are allowed (data flows down)
| | _VALID_SYNC_PAIRS = {("prod", "int"), ("int", "dev"), ("int", "prod"), ("dev", "int")}
| |
| |
| | def _sse_line(payload: dict) -> str:
| | return f"data: {json.dumps(payload)}\n\n"
| |
| |
| | def _now() -> str:
| | return datetime.now(timezone.utc).isoformat()
| |
| |
| | async def _sync_generator(
| | project: str,
| | from_env: str,
| | to_env: str,
| | db_only: bool,
| | uploads_only: bool,
| | dry_run: bool = False,
| | skip_backup: bool = False,
| | ) -> AsyncGenerator[str, None]:
| | """Stream sync output via SSE."""
| | args = ["sync", project, "--from", from_env, "--to", to_env]
| | if db_only:
| | args.append("--db-only")
| | if uploads_only:
| | args.append("--uploads-only")
| | if dry_run:
| | args.append("--dry-run")
| | if skip_backup:
| | args.append("--skip-backup")
| |
| | mode = "db-only" if db_only else ("uploads-only" if uploads_only else "full")
| | yield _sse_line({
| | "line": f"Syncing {project}: {from_env} -> {to_env} ({mode})...",
| | "timestamp": _now(),
| | })
| |
| | success = True
| | async for line in stream_ops_host(args, timeout=_BACKUP_TIMEOUT):
| | yield _sse_line({"line": line, "timestamp": _now()})
| | if line.startswith("[error]") or "failed" in line.lower():
| | success = False
| |
| | yield _sse_line({"done": True, "success": success})
| |
| |
| | @router.get("/{project}", summary="Sync data with real-time output")
| | async def sync_data(
| | project: str,
| | from_env: str = Query(default="prod", alias="from"),
| | to_env: str = Query(default="int", alias="to"),
| | db_only: bool = Query(default=False),
| | uploads_only: bool = Query(default=False),
| | dry_run: bool = Query(default=False),
| | skip_backup: bool = Query(default=False),
| | _: str = Depends(verify_token),
| | ) -> StreamingResponse:
| | """Sync data backward (prod->int, int->dev) with SSE streaming."""
| | if (from_env, to_env) not in _VALID_SYNC_PAIRS:
| | raise HTTPException(
| | status_code=400,
| | detail=f"Invalid sync path '{from_env} -> {to_env}'. Only adjacent pairs are allowed: prod<->int, int<->dev.",
| | )
| | return StreamingResponse(
| | _sync_generator(project, from_env, to_env, db_only, uploads_only, dry_run, skip_backup),
| | media_type="text/event-stream",
| | headers={
| | "Cache-Control": "no-cache",
| | "X-Accel-Buffering": "no",
| | },
| | )
|
|