Matthias Nott
2026-02-22 7d94ec0d18b46893e23680cf8438109a34cc2a10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import json
from datetime import datetime, timezone
from typing import AsyncGenerator, Literal
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from app.auth import verify_token
from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
router = APIRouter()
# Only adjacent-environment sync paths are allowed (data flows down)
_VALID_SYNC_PAIRS = {("prod", "int"), ("int", "dev")}
def _sse_line(payload: dict) -> str:
    return f"data: {json.dumps(payload)}\n\n"
def _now() -> str:
    return datetime.now(timezone.utc).isoformat()
async def _sync_generator(
    project: str,
    from_env: str,
    to_env: str,
    db_only: bool,
    uploads_only: bool,
) -> AsyncGenerator[str, None]:
    """Stream sync output via SSE."""
    args = ["sync", project, "--from", from_env, "--to", to_env, "--yes"]
    if db_only:
        args.append("--db-only")
    if uploads_only:
        args.append("--uploads-only")
    mode = "db-only" if db_only else ("uploads-only" if uploads_only else "full")
    yield _sse_line({
        "line": f"Syncing {project}: {from_env} -> {to_env} ({mode})...",
        "timestamp": _now(),
    })
    success = True
    async for line in stream_ops_host(args, timeout=_BACKUP_TIMEOUT):
        yield _sse_line({"line": line, "timestamp": _now()})
        if line.startswith("[error]") or "failed" in line.lower():
            success = False
    yield _sse_line({"done": True, "success": success})
@router.get("/{project}", summary="Sync data with real-time output")
async def sync_data(
    project: str,
    from_env: str = Query(default="prod", alias="from"),
    to_env: str = Query(default="int", alias="to"),
    db_only: bool = Query(default=False),
    uploads_only: bool = Query(default=False),
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """Sync data backward (prod->int, int->dev) with SSE streaming."""
    if (from_env, to_env) not in _VALID_SYNC_PAIRS:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid sync path '{from_env} -> {to_env}'. Only adjacent pairs are allowed: prod->int, int->dev.",
        )
    return StreamingResponse(
        _sync_generator(project, from_env, to_env, db_only, uploads_only),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )