import json
import re
from datetime import datetime, timezone
from typing import Any, AsyncGenerator

from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse

from app.auth import verify_token
from app.ops_runner import (
    run_ops,
    run_ops_json,
    run_ops_host,
    run_ops_host_json,
    run_command_host,
    stream_ops_host,
    stream_command_host,
    new_op_id,
    is_cancelled,
    clear_cancelled,
    _BACKUP_TIMEOUT,
    OFFSITE_PYTHON,
)

router = APIRouter()

# Identifiers that get interpolated into host-side filesystem paths and
# `python -c` command strings must be plain slugs; anything else is rejected
# with a 400 before any host command is built.
_SLUG_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")


def _sse(payload: dict) -> str:
    """Format a payload as a single Server-Sent-Events `data:` frame."""
    return f"data: {json.dumps(payload)}\n\n"


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _validate_name(name: str) -> None:
    """Reject backup names that could traverse out of the backup directory.

    Raises:
        HTTPException: 400 if the name contains a path separator or `..`.
    """
    if "/" in name or "\\" in name or ".." in name:
        raise HTTPException(status_code=400, detail="Invalid backup name")


def _validate_slug(value: str, label: str) -> None:
    """Reject project/env values that are unsafe to interpolate into host
    command strings (see `_SLUG_RE`).

    Raises:
        HTTPException: 400 if the value is not a simple slug.
    """
    if not _SLUG_RE.match(value):
        raise HTTPException(status_code=400, detail=f"Invalid {label}")


def _is_error_line(line: str, match_failed: bool = False) -> bool:
    """Heuristic: does a streamed output line indicate a failure?

    Args:
        line: one line of streamed subprocess output.
        match_failed: also treat any line containing "failed" (case-insensitive)
            as an error — used by the download stream.
    """
    if line.startswith(("[error]", "ERROR")):
        return True
    return match_failed and "failed" in line.lower()


@router.get("/", summary="List local backups")
async def list_backups(
    _: str = Depends(verify_token),
) -> list[dict[str, Any]]:
    """Returns a list of local backup records from `ops backups --json`."""
    result = await run_ops_json(["backups"])
    if not result["success"]:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to list backups: {result['error']}",
        )
    data = result["data"]
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        # Some ops versions wrap the record list in a container key.
        for key in ("backups", "data", "items"):
            if key in data and isinstance(data[key], list):
                return data[key]
        return [data]
    return []


@router.get("/offsite", summary="List offsite backups")
async def list_offsite_backups(
    _: str = Depends(verify_token),
) -> list[dict[str, Any]]:
    """Returns a list of offsite backup records."""
    # Get project list from registry
    import yaml

    registry_path = "/opt/infrastructure/servers/hetzner-vps/registry.yaml"
    try:
        with open(registry_path) as f:
            registry = yaml.safe_load(f)
        projects = [
            name
            for name, cfg in registry.get("projects", {}).items()
            if cfg.get("backup_dir")
            and not cfg.get("infrastructure")
            and not cfg.get("static")
        ]
    except Exception:
        # Best-effort: if the registry is unreadable, fall back to the
        # known project set rather than failing the whole request.
        projects = ["mdf", "seriousletter"]  # Fallback

    all_backups: list[dict[str, Any]] = []
    for project in projects:
        result = await run_ops_host_json(["offsite", "list", project])
        if result["success"] and isinstance(result["data"], list):
            # Tag each record with its owning project before aggregating.
            for b in result["data"]:
                b["project"] = project
            all_backups.extend(result["data"])
    return all_backups


@router.post("/{project}/{env}", summary="Create a local backup")
async def create_backup(
    project: str,
    env: str,
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """
    Runs `ops backup {project} {env}` on the host.
    Runs via nsenter because ops backup delegates to project CLIs
    that use host Python venvs.
    """
    result = await run_ops_host(["backup", project, env], timeout=_BACKUP_TIMEOUT)
    if not result["success"]:
        raise HTTPException(
            status_code=500,
            detail=f"Backup failed: {result['error'] or result['output']}",
        )
    return {
        "success": True,
        "output": result["output"],
        "project": project,
        "env": env,
    }


async def _backup_stream(project: str, env: str) -> AsyncGenerator[str, None]:
    """Stream backup creation progress via SSE."""
    op_id = new_op_id()
    # First frame carries the op_id so the client can cancel the operation.
    yield _sse({"op_id": op_id})
    yield _sse({"line": f"Creating backup for {project}/{env}...", "timestamp": _now()})
    try:
        success = True
        async for line in stream_ops_host(
            ["backup", project, env], timeout=_BACKUP_TIMEOUT, op_id=op_id
        ):
            yield _sse({"line": line, "timestamp": _now()})
            if _is_error_line(line):
                success = False
        if is_cancelled(op_id):
            yield _sse({"done": True, "success": False, "cancelled": True})
        else:
            yield _sse({"done": True, "success": success, "project": project, "env": env})
    finally:
        clear_cancelled(op_id)


@router.get("/stream/{project}/{env}", summary="Create backup with streaming output")
async def create_backup_stream(
    project: str,
    env: str,
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """Create a backup with real-time SSE progress output."""
    return StreamingResponse(
        _backup_stream(project, env),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


@router.post("/offsite/upload/{project}/{env}", summary="Upload backup to offsite")
async def upload_offsite(
    project: str,
    env: str,
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """Runs `ops offsite upload {project} {env}` on the host."""
    result = await run_ops_host(
        ["offsite", "upload", project, env], timeout=_BACKUP_TIMEOUT
    )
    if not result["success"]:
        raise HTTPException(
            status_code=500,
            detail=f"Offsite upload failed: {result['error'] or result['output']}",
        )
    return {"success": True, "output": result["output"], "project": project, "env": env}


async def _upload_stream(
    project: str, env: str, name: str | None = None
) -> AsyncGenerator[str, None]:
    """Stream offsite upload progress via SSE."""
    op_id = new_op_id()
    yield _sse({"op_id": op_id})
    label = f"{project}/{env}/{name}" if name else f"{project}/{env} (latest)"
    yield _sse({"line": f"Uploading {label} to offsite storage...", "timestamp": _now()})
    cmd = ["offsite", "upload", project, env]
    if name:
        cmd.append(name)
    try:
        success = True
        async for line in stream_ops_host(cmd, timeout=_BACKUP_TIMEOUT, op_id=op_id):
            yield _sse({"line": line, "timestamp": _now()})
            if _is_error_line(line):
                success = False
        if is_cancelled(op_id):
            yield _sse({"done": True, "success": False, "cancelled": True})
        else:
            yield _sse({"done": True, "success": success, "project": project, "env": env})
    finally:
        clear_cancelled(op_id)


@router.get("/offsite/stream/{project}/{env}", summary="Upload to offsite with streaming output")
async def upload_offsite_stream(
    project: str,
    env: str,
    name: str | None = Query(None),
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """Upload backup to offsite with real-time SSE progress output."""
    return StreamingResponse(
        _upload_stream(project, env, name),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


async def _download_stream(project: str, env: str, name: str) -> AsyncGenerator[str, None]:
    """Stream offsite-to-local download progress via SSE.

    Callers must validate project/env/name (see `download_offsite_stream`)
    because all three are interpolated into the `python -c` source below.
    """
    op_id = new_op_id()
    yield _sse({"op_id": op_id})
    yield _sse({"line": f"Downloading {name} from offsite to local storage...", "timestamp": _now()})
    # Download to the local backup directory so it appears in the backup list
    local_path = f"/opt/data/backups/{project}/{env}/{name}"
    cmd = [
        OFFSITE_PYTHON,
        "-c",
        f"import sys; sys.stdout.reconfigure(line_buffering=True); "
        f"sys.path.insert(0, '/opt/data/scripts'); "
        f"from offsite import download; from pathlib import Path; "
        f"import os; os.makedirs('/opt/data/backups/{project}/{env}', exist_ok=True); "
        f"ok = download('{name}', Path('{local_path}'), '{project}', '{env}'); "
        f"sys.exit(0 if ok else 1)"
    ]
    try:
        success = True
        async for line in stream_command_host(cmd, timeout=_BACKUP_TIMEOUT, op_id=op_id):
            yield _sse({"line": line, "timestamp": _now()})
            if _is_error_line(line, match_failed=True):
                success = False
        if is_cancelled(op_id):
            yield _sse({"done": True, "success": False, "cancelled": True})
        else:
            yield _sse({"done": True, "success": success, "project": project, "env": env, "name": name})
    finally:
        clear_cancelled(op_id)


@router.get(
    "/offsite/download/stream/{project}/{env}",
    summary="Download offsite backup to local storage with streaming output",
)
async def download_offsite_stream(
    project: str,
    env: str,
    name: str = Query(...),
    _: str = Depends(verify_token),
) -> StreamingResponse:
    """Download an offsite backup to local storage with real-time SSE progress output."""
    _validate_name(name)
    # project/env are interpolated into a host-side `python -c` command string
    # in _download_stream, so they must be plain slugs (injection guard).
    _validate_slug(project, "project")
    _validate_slug(env, "env")
    return StreamingResponse(
        _download_stream(project, env, name),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )


@router.post("/offsite/retention", summary="Apply offsite retention policy")
async def apply_retention(
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """Runs `ops offsite retention` on the host."""
    result = await run_ops_host(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
    if not result["success"]:
        raise HTTPException(
            status_code=500,
            detail=f"Retention policy failed: {result['error'] or result['output']}",
        )
    return {"success": True, "output": result["output"]}


@router.delete("/{project}/{env}/{name}", summary="Delete a backup")
async def delete_backup(
    project: str,
    env: str,
    name: str,
    target: str = Query("local", regex="^(local|offsite|both)$"),
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """
    Delete a backup from local storage, offsite, or both.
    Query param `target`: local | offsite | both (default: local).
    """
    _validate_name(name)
    # project/env/name are interpolated into a host-side `python -c` command
    # string below, so they must be plain slugs (injection guard).
    _validate_slug(project, "project")
    _validate_slug(env, "env")
    results: dict[str, str | None] = {"local": None, "offsite": None}

    # Delete local
    if target in ("local", "both"):
        backup_path = f"/opt/data/backups/{project}/{env}/{name}"
        check = await run_command_host(["test", "-f", backup_path])
        if check["success"]:
            result = await run_command_host(["rm", backup_path])
            results["local"] = "ok" if result["success"] else "failed"
        else:
            results["local"] = "not_found"

    # Delete offsite
    if target in ("offsite", "both"):
        # Use the shared offsite interpreter constant (same one the download
        # path uses) instead of a hard-coded venv path.
        result = await run_command_host([
            OFFSITE_PYTHON,
            "-c",
            f"import sys; sys.path.insert(0, '/opt/data/scripts'); "
            f"from offsite import delete; "
            f"ok = delete('{name}', '{project}', '{env}', quiet=True); "
            f"sys.exit(0 if ok else 1)",
        ])
        results["offsite"] = "ok" if result["success"] else "failed"

    # Check if anything succeeded
    any_ok = "ok" in results.values()
    if not any_ok:
        raise HTTPException(status_code=500, detail=f"Delete failed: {results}")
    return {"success": True, "project": project, "env": env, "name": name, "results": results}