import json from datetime import datetime, timezone from typing import AsyncGenerator, Literal from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import StreamingResponse from app.auth import verify_token from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host router = APIRouter() # Only adjacent-environment promote paths are allowed (code flows up) _VALID_PROMOTE_PAIRS = {("dev", "int"), ("int", "prod")} def _sse_line(payload: dict) -> str: return f"data: {json.dumps(payload)}\n\n" def _now() -> str: return datetime.now(timezone.utc).isoformat() async def _promote_generator( project: str, from_env: str, to_env: str, dry_run: bool, ) -> AsyncGenerator[str, None]: """Stream promote output via SSE.""" args = ["promote", project, from_env, to_env] if dry_run: args.append("--dry-run") args.append("--yes") label = f"Promoting {project}: {from_env} -> {to_env}" if dry_run: label += " (dry run)" yield _sse_line({"line": label, "timestamp": _now()}) success = True async for line in stream_ops_host(args, timeout=_BACKUP_TIMEOUT): yield _sse_line({"line": line, "timestamp": _now()}) if line.startswith("[error]") or "failed" in line.lower(): success = False yield _sse_line({"done": True, "success": success}) @router.get("/{project}/{from_env}/{to_env}", summary="Promote code with real-time output") async def promote_code( project: str, from_env: str, to_env: str, dry_run: bool = Query(default=False, alias="dry_run"), _: str = Depends(verify_token), ) -> StreamingResponse: """Promote code forward (dev->int, int->prod) with SSE streaming.""" if (from_env, to_env) not in _VALID_PROMOTE_PAIRS: raise HTTPException( status_code=400, detail=f"Invalid promote path '{from_env} -> {to_env}'. Only adjacent pairs are allowed: dev->int, int->prod.", ) return StreamingResponse( _promote_generator(project, from_env, to_env, dry_run), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no", }, )