import json from datetime import datetime, timezone from typing import AsyncGenerator, Literal from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import StreamingResponse from app.auth import verify_token from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host router = APIRouter() # Only adjacent-environment sync paths are allowed (data flows down) _VALID_SYNC_PAIRS = {("prod", "int"), ("int", "dev"), ("int", "prod"), ("dev", "int")} def _sse_line(payload: dict) -> str: return f"data: {json.dumps(payload)}\n\n" def _now() -> str: return datetime.now(timezone.utc).isoformat() async def _sync_generator( project: str, from_env: str, to_env: str, db_only: bool, uploads_only: bool, dry_run: bool = False, skip_backup: bool = False, ) -> AsyncGenerator[str, None]: """Stream sync output via SSE.""" args = ["sync", project, "--from", from_env, "--to", to_env] if db_only: args.append("--db-only") if uploads_only: args.append("--uploads-only") if dry_run: args.append("--dry-run") if skip_backup: args.append("--skip-backup") mode = "db-only" if db_only else ("uploads-only" if uploads_only else "full") yield _sse_line({ "line": f"Syncing {project}: {from_env} -> {to_env} ({mode})...", "timestamp": _now(), }) success = True async for line in stream_ops_host(args, timeout=_BACKUP_TIMEOUT): yield _sse_line({"line": line, "timestamp": _now()}) if line.startswith("[error]") or "failed" in line.lower(): success = False yield _sse_line({"done": True, "success": success}) @router.get("/{project}", summary="Sync data with real-time output") async def sync_data( project: str, from_env: str = Query(default="prod", alias="from"), to_env: str = Query(default="int", alias="to"), db_only: bool = Query(default=False), uploads_only: bool = Query(default=False), dry_run: bool = Query(default=False), skip_backup: bool = Query(default=False), _: str = Depends(verify_token), ) -> StreamingResponse: """Sync data backward (prod->int, int->dev) with SSE streaming.""" if (from_env, to_env) not in _VALID_SYNC_PAIRS: raise HTTPException( status_code=400, detail=f"Invalid sync path '{from_env} -> {to_env}'. Only adjacent pairs are allowed: prod<->int, int<->dev.", ) return StreamingResponse( _sync_generator(project, from_env, to_env, db_only, uploads_only, dry_run, skip_backup), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", }, )