"""
Container lifecycle operations, streamed to the client as Server-Sent Events.

Three operations:

  restart  – ``docker restart {containers}`` (no Coolify, no image pruning)
  rebuild  – ``ops rebuild {project} {env}`` on the host (compose down →
             image build → compose up); data is kept
  recreate – compose down → wipe the data directory → ``ops rebuild`` →
             client shows a "Go to Backups" banner (DESTRUCTIVE — DR only)

The Coolify API helpers below are not used by these three operations.
"""
import asyncio
import json
import os
import posixpath
import re
import shlex
import urllib.error
import urllib.request
from datetime import datetime, timezone
from typing import AsyncGenerator, Callable

import yaml
from fastapi import APIRouter, Depends, Query
from fastapi.responses import StreamingResponse

from app.auth import verify_token
from app.ops_runner import (
    OPS_CLI,
    _BACKUP_TIMEOUT,
    run_command,
    run_command_host,
    stream_command_host,
)

router = APIRouter()

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

_REGISTRY_PATH = os.environ.get(
    "REGISTRY_PATH",
    "/opt/infrastructure/servers/hetzner-vps/registry.yaml",
)
_COOLIFY_BASE = os.environ.get(
    "COOLIFY_BASE_URL",
    "https://cockpit.tekmidian.com/api/v1",
)
# The API token is a secret: it must be supplied via the environment and is
# never given a literal default in source. When it is empty, every Coolify API
# call fails fast with a clear error (see _coolify_request).
_COOLIFY_TOKEN = os.environ.get("COOLIFY_API_TOKEN", "")
_COOLIFY_TIMEOUT = 30  # seconds for API calls
_POLL_INTERVAL = 5  # seconds between container status polls
_POLL_MAX_WAIT = 180  # max seconds to wait for containers to stop/start

# Allowed shape for the ``env`` path parameter. ``env`` is substituted into
# filesystem paths, docker name filters, and shell commands, so slashes,
# whitespace, shell metacharacters and leading dots are rejected up front.
_ENV_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9_.-]*$")

# Printed by the host shell when ``docker ps`` fails, so a failed listing is
# never mistaken for "no containers running" right before a data wipe.
_PS_FAILED_MARKER = "__DOCKER_PS_FAILED__"


# ---------------------------------------------------------------------------
# Registry helpers
# ---------------------------------------------------------------------------

def _load_registry() -> dict:
    """Parse the registry YAML file; an empty file yields ``{}``."""
    with open(_REGISTRY_PATH, encoding="utf-8") as f:
        return yaml.safe_load(f) or {}


def _project_cfg(project: str) -> dict:
    """Return the registry entry for *project*.

    Raises:
        ValueError: if *project* is not listed under ``projects``.
    """
    projects = _load_registry().get("projects") or {}
    if project not in projects:
        raise ValueError(f"Unknown project '{project}'")
    return projects[project] or {}


def _coolify_uuid(project: str, env: str) -> str:
    """Return the Coolify service UUID configured for *project*/*env*.

    Raises:
        ValueError: if the project is unknown or has no UUID for *env*.
    """
    uuids = _project_cfg(project).get("coolify_uuids") or {}
    uuid = uuids.get(env)
    if not uuid:
        raise ValueError(
            f"No coolify_uuid configured for {project}/{env} in registry.yaml"
        )
    return uuid


def _data_dir(project: str, env: str) -> str:
    """Return the project's data directory with ``{env}`` substituted.

    Raises:
        ValueError: if the project is unknown or has no ``data_dir``.
    """
    template = _project_cfg(project).get("data_dir", "")
    if not template:
        raise ValueError(f"No data_dir configured for {project} in registry.yaml")
    return template.replace("{env}", env)


def _build_cfg(project: str, env: str) -> dict | None:
    """Return build config or None if the project uses registry-only images."""
    build = _project_cfg(project).get("build") or {}
    if build.get("no_local_image"):
        return None
    ctx_template = build.get("build_context", "")
    if not ctx_template:
        return None
    return {
        "build_context": ctx_template.replace("{env}", env),
        "image_name": build.get("image_name", project),
        "env": env,
    }


def _validate_env(env: str) -> None:
    """Reject ``env`` values that are unsafe to embed in paths or commands.

    Raises:
        ValueError: if *env* does not match ``_ENV_RE``.
    """
    if not _ENV_RE.match(env):
        raise ValueError(
            f"Invalid env '{env}': only letters, digits, '_', '.', '-' are allowed"
        )


def _container_prefix(project: str, env: str) -> str:
    """Return the ``{env}-{name_prefix}-`` prefix of the project's container names."""
    prefix = _project_cfg(project).get("name_prefix", project)
    return f"{env}-{prefix}-"


# ---------------------------------------------------------------------------
# SSE helpers
# ---------------------------------------------------------------------------

def _sse(payload: dict) -> str:
    return f"data: {json.dumps(payload)}\n\n"


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()


def _line(text: str) -> str:
    return _sse({"line": text, "timestamp": _now()})


def _done(success: bool, project: str, env: str, action: str) -> str:
    return _sse({
        "done": True,
        "success": success,
        "project": project,
        "env": env,
        "action": action,
    })


# ---------------------------------------------------------------------------
# Coolify API (synchronous — called from async context via run_in_executor)
# ---------------------------------------------------------------------------

def _coolify_request(method: str, path: str) -> dict:
    """Make a Coolify API request and return the parsed JSON body.

    An empty response body yields ``{}``.

    Raises:
        RuntimeError: if no API token is configured, the server answers with
            an HTTP error, or the call fails for any other reason.
    """
    if not _COOLIFY_TOKEN:
        raise RuntimeError("COOLIFY_API_TOKEN is not set; cannot call the Coolify API")
    url = f"{_COOLIFY_BASE}{path}"
    req = urllib.request.Request(
        url,
        method=method,
        headers={
            "Authorization": f"Bearer {_COOLIFY_TOKEN}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=_COOLIFY_TIMEOUT) as resp:
            body = resp.read()
            return json.loads(body) if body else {}
    except urllib.error.HTTPError as exc:
        body = exc.read()
        raise RuntimeError(
            f"Coolify API {method} {path} returned HTTP {exc.code}: "
            f"{body.decode(errors='replace')[:500]}"
        ) from exc
    except Exception as exc:
        raise RuntimeError(f"Coolify API call failed: {exc}") from exc


async def _coolify_action(action: str, uuid: str) -> dict:
    """Call a Coolify service action endpoint (stop/start/restart)."""
    # urllib blocks, so run it in the default executor off the event loop.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, _coolify_request, "POST", f"/services/{uuid}/{action}"
    )


# ---------------------------------------------------------------------------
# Container polling helpers
# ---------------------------------------------------------------------------

async def _list_running(name_pattern: str) -> list[str] | None:
    """List running container names that start with *name_pattern*.

    Returns None when ``docker ps`` fails, so callers can tell "no
    containers" apart from "could not ask Docker".
    """
    result = await run_command(
        ["docker", "ps", "--filter", f"name={name_pattern}", "--format", "{{.Names}}"],
        timeout=15,
    )
    if not result["success"]:
        return None
    # Docker's name filter matches anywhere in the name, so re-check the prefix.
    names = (line.strip() for line in result["output"].splitlines())
    return [name for name in names if name.startswith(name_pattern)]


async def _find_containers_for_service(project: str, env: str) -> list[str]:
    """
    Find all running Docker containers belonging to a project/env.

    Uses the registry name_prefix and matches the {env}-{prefix}-* pattern.
    Returns an empty list if none are running or ``docker ps`` fails.
    """
    return await _list_running(_container_prefix(project, env)) or []


async def _poll_until_stopped(
    project: str,
    env: str,
    max_wait: int = _POLL_MAX_WAIT,
) -> bool:
    """Poll until no containers for project/env are running. Returns True if stopped.

    A failed ``docker ps`` is not treated as "stopped"; polling continues.
    """
    name_pattern = _container_prefix(project, env)
    waited = 0
    while waited < max_wait:
        running = await _list_running(name_pattern)
        if running is not None and not running:
            return True
        await asyncio.sleep(_POLL_INTERVAL)
        waited += _POLL_INTERVAL
    return False


async def _poll_until_running(
    project: str,
    env: str,
    max_wait: int = _POLL_MAX_WAIT,
) -> bool:
    """Poll until at least one container for project/env is running. Returns True if up."""
    name_pattern = _container_prefix(project, env)
    waited = 0
    while waited < max_wait:
        if await _list_running(name_pattern):
            return True
        await asyncio.sleep(_POLL_INTERVAL)
        waited += _POLL_INTERVAL
    return False


# ---------------------------------------------------------------------------
# Shared: ops rebuild streaming
# ---------------------------------------------------------------------------

async def _stream_ops_rebuild(
    project: str, env: str, tag: str
) -> AsyncGenerator[tuple[str, bool], None]:
    """Run ``ops rebuild`` on the host and yield ``(text, is_error)`` per line.

    Lines starting with ``ERROR`` or ``[error]`` are flagged as errors, and
    other output is prefixed with ``[tag]``. A run that produces no output at
    all is also reported as an error.
    """
    had_output = False
    async for raw in stream_command_host(
        [OPS_CLI, "rebuild", project, env],
        timeout=_BACKUP_TIMEOUT,
    ):
        had_output = True
        if raw.startswith("[stderr] "):
            yield raw, False
        elif raw.startswith("[error]"):
            yield raw, True
        elif raw.startswith("ERROR"):
            yield f"[error] {raw}", True
        else:
            yield f"[{tag}] {raw}", False
    if not had_output:
        yield (
            f"[error] ops rebuild produced no output — check registry config for {project}",
            True,
        )


# ---------------------------------------------------------------------------
# Operation: Restart
# ---------------------------------------------------------------------------

async def _op_restart(project: str, env: str) -> AsyncGenerator[str, None]:
    """
    Restart: ``docker restart {containers}``.

    No Coolify involvement — avoids the image-pruning stop/start cycle.
    """
    action = "restart"
    yield _line(f"[restart] Finding containers for {project}/{env}...")
    try:
        containers = await _list_running(_container_prefix(project, env))
    except Exception as exc:
        yield _line(f"[error] Registry lookup failed: {exc}")
        yield _done(False, project, env, action)
        return

    if containers is None:
        yield _line(f"[error] docker ps failed while looking up {project}/{env}")
        yield _done(False, project, env, action)
        return
    if not containers:
        yield _line(f"[error] No running containers found for {project}/{env}")
        yield _done(False, project, env, action)
        return

    yield _line(f"[restart] Restarting {len(containers)} container(s): {', '.join(containers)}")
    result = await run_command(["docker", "restart", *containers], timeout=120)

    for line in result["output"].strip().splitlines():
        yield _line(line)
    for line in result["error"].strip().splitlines():
        yield _line(f"[stderr] {line}")

    if result["success"]:
        yield _line("[restart] All containers restarted successfully.")
        yield _done(True, project, env, action)
    else:
        yield _line("[error] docker restart failed (exit code non-zero)")
        yield _done(False, project, env, action)


# ---------------------------------------------------------------------------
# Operation: Rebuild
# ---------------------------------------------------------------------------

async def _op_rebuild(project: str, env: str) -> AsyncGenerator[str, None]:
    """
    Rebuild: docker compose down → build image → docker compose up.

    Uses ``ops rebuild`` on the host, which handles env files, profiles and
    the working directory. No data loss; intended for code/Dockerfile changes.
    """
    action = "rebuild"
    try:
        _project_cfg(project)  # fail fast on an unknown project / bad registry
    except Exception as exc:
        yield _line(f"[error] Registry lookup failed: {exc}")
        yield _done(False, project, env, action)
        return

    yield _line(f"[rebuild] Rebuilding {project}/{env} via ops CLI...")
    ok = True
    async for text, is_error in _stream_ops_rebuild(project, env, action):
        yield _line(text)
        if is_error:
            ok = False

    if not ok:
        yield _done(False, project, env, action)
        return

    containers = await _find_containers_for_service(project, env)
    if containers:
        yield _line(f"[rebuild] {len(containers)} container(s) running: {', '.join(containers)}")
        yield _done(True, project, env, action)
    else:
        yield _line("[warn] No containers found after rebuild — check docker compose logs")
        yield _done(False, project, env, action)


# ---------------------------------------------------------------------------
# Operation: Recreate (Disaster Recovery)
# ---------------------------------------------------------------------------

async def _op_recreate(project: str, env: str) -> AsyncGenerator[str, None]:
    """
    Recreate: compose down → wipe data → ``ops rebuild`` → verify containers.

    DESTRUCTIVE — wipes the project's data directory. Every value placed in a
    shell command is quoted with ``shlex.quote``. The wipe is refused unless
    all containers are confirmed stopped and the data directory is an
    absolute path other than ``/``. The final ``done`` event reports success
    (the client then shows the "Go to Backups" banner) only when containers
    are running again afterwards.
    """
    action = "recreate"
    try:
        _validate_env(env)
        data_dir = _data_dir(project, env)
        cfg = _project_cfg(project)
    except Exception as exc:  # unknown project, missing keys, unreadable registry
        yield _line(f"[error] Config error: {exc}")
        yield _done(False, project, env, action)
        return

    base_path = cfg.get("path", "")
    if not base_path:
        yield _line(f"[error] Config error: No path configured for {project} in registry.yaml")
        yield _done(False, project, env, action)
        return

    normalized = posixpath.normpath(data_dir)
    if not posixpath.isabs(normalized) or normalized in ("/", "//"):
        yield _line(f"[error] Refusing to wipe unsafe data directory: {data_dir!r}")
        yield _done(False, project, env, action)
        return

    name_prefix = cfg.get("name_prefix", project)
    name_pattern = f"{env}-{name_prefix}-"
    code_dir = f"{base_path}/{env}/code"

    # Step 1: stop containers via docker compose. `|| true` because step 2 is
    # the authoritative check that nothing is still running.
    yield _line(f"[recreate] Stopping {project}/{env} containers...")
    stop_cmd = (
        f"cd {shlex.quote(code_dir)} && "
        f"docker compose -p {shlex.quote(f'{env}-{name_prefix}')} "
        f"--profile {shlex.quote(env)} down 2>&1 || true"
    )
    stop_result = await run_command_host(["sh", "-c", stop_cmd], timeout=120)
    for line in stop_result["output"].strip().splitlines():
        yield _line(line)
    for line in stop_result["error"].strip().splitlines():
        yield _line(f"[stderr] {line}")

    # Step 2: verify containers are stopped. A failed `docker ps` must abort,
    # never be read as "nothing running", because step 3 deletes data.
    verify = await run_command_host(
        ["sh", "-c", "docker ps --format '{{.Names}}' || echo " + _PS_FAILED_MARKER],
        timeout=30,
    )
    ps_output = verify["output"]
    if _PS_FAILED_MARKER in ps_output:
        yield _line(f"[error] Could not list running containers for {project}/{env}; not wiping data.")
        for line in verify["error"].strip().splitlines():
            yield _line(f"[stderr] {line}")
        yield _done(False, project, env, action)
        return

    names = (line.strip() for line in ps_output.splitlines())
    still_running = [name for name in names if name.startswith(name_pattern)]
    if still_running:
        yield _line(f"[error] Containers still running for {project}/{env}:")
        for name in still_running:
            yield _line(f"  {name}")
        yield _done(False, project, env, action)
        return
    yield _line("[recreate] All containers stopped.")

    # Step 3: wipe the data directory's contents. `rm -f` keeps an already
    # empty directory from failing (the unmatched glob stays literal); a
    # missing directory is reported as a failure. Top-level dotfiles are not
    # matched by `*` and are therefore left in place.
    yield _line(f"[recreate] WARNING: Wiping data directory: {data_dir}")
    quoted_dir = shlex.quote(data_dir)
    wipe_cmd = (
        f"if [ -d {quoted_dir} ]; then "
        f"rm -rf -- {quoted_dir}/* 2>&1; echo EXIT_CODE=$?; "
        f"else echo Data directory not found: {quoted_dir}; echo EXIT_CODE=1; fi"
    )
    wipe_result = await run_command_host(["sh", "-c", wipe_cmd], timeout=120)
    combined = (wipe_result["output"].strip() + "\n" + wipe_result["error"].strip()).strip()
    for line in combined.splitlines():
        if line:
            yield _line(line)

    if "EXIT_CODE=0" not in wipe_result["output"]:
        yield _line("[error] Wipe may have failed — check output above.")
        yield _done(False, project, env, action)
        return
    yield _line("[recreate] Data directory wiped.")

    # Step 4: rebuild via ops CLI (handles image build + compose up).
    yield _line("[recreate] Rebuilding containers...")
    ok = True
    async for text, is_error in _stream_ops_rebuild(project, env, action):
        yield _line(text)
        if is_error:
            ok = False
    if not ok:
        yield _line("[error] Rebuild failed after wipe — data directory is empty; fix and re-run rebuild.")
        yield _done(False, project, env, action)
        return

    # Step 5: verify containers came up.
    containers = await _find_containers_for_service(project, env)
    if containers:
        yield _line(
            f"[recreate] {len(containers)} container(s) running. "
            "Restore a backup to complete recovery."
        )
        yield _done(True, project, env, action)
    else:
        yield _line("[warn] No containers found after recreate — check docker compose logs")
        yield _done(False, project, env, action)


# ---------------------------------------------------------------------------
# Dispatch wrapper
# ---------------------------------------------------------------------------

_OPERATIONS: dict[str, Callable[[str, str], AsyncGenerator[str, None]]] = {
    "restart": _op_restart,
    "rebuild": _op_rebuild,
    "recreate": _op_recreate,
}


async def _op_generator(
    project: str,
    env: str,
    action: str,
) -> AsyncGenerator[str, None]:
    """Validate the request and route to the matching operation generator.

    Always ends the stream with a ``done`` event, including for an unknown
    action or an invalid ``env``.
    """
    operation = _OPERATIONS.get(action)
    if operation is None:
        yield _line(f"[error] Unknown action '{action}'. Valid: restart, rebuild, recreate")
        yield _done(False, project, env, action)
        return

    try:
        _validate_env(env)
    except ValueError as exc:
        yield _line(f"[error] {exc}")
        yield _done(False, project, env, action)
        return

    async for chunk in operation(project, env):
        yield chunk


# ---------------------------------------------------------------------------
# Endpoint
# ---------------------------------------------------------------------------

@router.get(
    "/{project}/{env}",
    summary="Container lifecycle operation with real-time SSE output",
)
async def lifecycle_op(
    project: str,
    env: str,
    action: str = Query(
        default="restart",
        description="Operation: restart | rebuild | recreate",
    ),
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """
    Stream a container lifecycle operation via SSE.

    - restart:  docker restart the running containers (safe, fast)
    - rebuild:  ``ops rebuild`` on the host — compose down, rebuild image,
                compose up; data is kept
    - recreate: compose down, wipe data, ``ops rebuild`` (destructive — DR only)
    """
    return StreamingResponse(
        _op_generator(project, env, action),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )