From fd03c16eca085423267c163137b28ccb60de8db0 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Wed, 25 Feb 2026 00:45:13 +0100
Subject: [PATCH] feat: multi-compose rebuild (Seafile), cancel endpoint, schedule router, project descriptor
---
app/routers/backups.py | 149 +++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 147 insertions(+), 2 deletions(-)
diff --git a/app/routers/backups.py b/app/routers/backups.py
index de5a15c..badf0bf 100644
--- a/app/routers/backups.py
+++ b/app/routers/backups.py
@@ -1,11 +1,26 @@
-from typing import Any
+import json
+from datetime import datetime, timezone
+from typing import Any, AsyncGenerator
from fastapi import APIRouter, Depends, HTTPException, Query
+from fastapi.responses import StreamingResponse
from app.auth import verify_token
-from app.ops_runner import run_ops, run_ops_json, run_ops_host, run_ops_host_json, run_command_host, _BACKUP_TIMEOUT
+from app.ops_runner import (
+ run_ops, run_ops_json, run_ops_host, run_ops_host_json, run_command_host,
+ stream_ops_host, stream_command_host, new_op_id, is_cancelled, clear_cancelled,
+ _BACKUP_TIMEOUT, OFFSITE_PYTHON,
+)
router = APIRouter()
+
+
+def _sse(payload: dict) -> str:
+    """Serialize *payload* as one Server-Sent Events data frame (``data: <json>\\n\\n``)."""
+    return f"data: {json.dumps(payload)}\n\n"
+
+
+def _now() -> str:
+    """Return the current UTC time as an ISO-8601 string, used as the SSE timestamp field."""
+    return datetime.now(timezone.utc).isoformat()
@router.get("/", summary="List local backups")
@@ -82,6 +97,43 @@
}
+async def _backup_stream(project: str, env: str) -> AsyncGenerator[str, None]:
+    """Stream backup creation progress via SSE.
+
+    Yields SSE frames: first ``{"op_id": ...}`` so the client can target the
+    cancel endpoint, then one ``{"line", "timestamp"}`` frame per output line,
+    and finally a ``{"done": True, ...}`` frame with the outcome.
+    """
+    op_id = new_op_id()
+    # Emit op_id first so the client can cancel this operation while it runs.
+    yield _sse({"op_id": op_id})
+    yield _sse({"line": f"Creating backup for {project}/{env}...", "timestamp": _now()})
+
+    try:
+        success = True
+        async for line in stream_ops_host(
+            ["backup", project, env], timeout=_BACKUP_TIMEOUT, op_id=op_id
+        ):
+            yield _sse({"line": line, "timestamp": _now()})
+            # Heuristic failure detection by scanning output lines; the stream
+            # helper does not surface the exit code here — TODO confirm.
+            if line.startswith("[error]") or line.startswith("ERROR"):
+                success = False
+
+        if is_cancelled(op_id):
+            yield _sse({"done": True, "success": False, "cancelled": True})
+        else:
+            yield _sse({"done": True, "success": success, "project": project, "env": env})
+    finally:
+        # Always drop the cancel flag, even if the client disconnects mid-stream.
+        clear_cancelled(op_id)
+
+
+@router.get("/stream/{project}/{env}", summary="Create backup with streaming output")
+async def create_backup_stream(
+    project: str,
+    env: str,
+    _: str = Depends(verify_token),
+) -> StreamingResponse:
+    """Create a backup with real-time SSE progress output.
+
+    NOTE(review): project/env are raw path params passed straight to the ops
+    runner — confirm they are validated/sanitized downstream.
+    """
+    return StreamingResponse(
+        _backup_stream(project, env),
+        media_type="text/event-stream",
+        # X-Accel-Buffering: no disables proxy (nginx) buffering so each SSE
+        # frame is flushed to the client immediately.
+        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
+    )
+
+
@router.post("/offsite/upload/{project}/{env}", summary="Upload backup to offsite")
async def upload_offsite(
project: str,
@@ -100,6 +152,99 @@
return {"success": True, "output": result["output"], "project": project, "env": env}
+async def _upload_stream(project: str, env: str, name: str | None = None) -> AsyncGenerator[str, None]:
+    """Stream offsite upload progress via SSE.
+
+    When *name* is None the ops script uploads the latest backup; the SSE frame
+    protocol matches ``_backup_stream`` (op_id first, then lines, then done).
+    """
+    op_id = new_op_id()
+    # Emit op_id first so the client can cancel this operation while it runs.
+    yield _sse({"op_id": op_id})
+    label = f"{project}/{env}/{name}" if name else f"{project}/{env} (latest)"
+    yield _sse({"line": f"Uploading {label} to offsite storage...", "timestamp": _now()})
+
+    cmd = ["offsite", "upload", project, env]
+    if name:
+        cmd.append(name)
+
+    try:
+        success = True
+        async for line in stream_ops_host(
+            cmd, timeout=_BACKUP_TIMEOUT, op_id=op_id
+        ):
+            yield _sse({"line": line, "timestamp": _now()})
+            # Heuristic failure detection by scanning output lines; exit code
+            # is not surfaced through the stream — TODO confirm.
+            if line.startswith("[error]") or line.startswith("ERROR"):
+                success = False
+
+        if is_cancelled(op_id):
+            yield _sse({"done": True, "success": False, "cancelled": True})
+        else:
+            yield _sse({"done": True, "success": success, "project": project, "env": env})
+    finally:
+        # Always drop the cancel flag, even if the client disconnects mid-stream.
+        clear_cancelled(op_id)
+
+
+@router.get("/offsite/stream/{project}/{env}", summary="Upload to offsite with streaming output")
+async def upload_offsite_stream(
+    project: str,
+    env: str,
+    name: str | None = Query(None),
+    _: str = Depends(verify_token),
+) -> StreamingResponse:
+    """Upload backup to offsite with real-time SSE progress output.
+
+    NOTE(review): unlike the download endpoint, *name* is not validated here —
+    confirm the ops runner rejects path separators / ".." in backup names.
+    """
+    return StreamingResponse(
+        _upload_stream(project, env, name),
+        media_type="text/event-stream",
+        # Disable proxy buffering so SSE frames flush immediately.
+        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
+    )
+
+
+async def _download_stream(project: str, env: str, name: str) -> AsyncGenerator[str, None]:
+    """Stream offsite-to-local download progress via SSE.
+
+    Runs a one-off host-side Python snippet that imports the offsite module and
+    downloads *name* into the local backup tree so it shows up in the list.
+    """
+    op_id = new_op_id()
+    # Emit op_id first so the client can cancel this operation while it runs.
+    yield _sse({"op_id": op_id})
+    yield _sse({"line": f"Downloading {name} from offsite to local storage...", "timestamp": _now()})
+
+    # Download to the local backup directory so it appears in the backup list
+    local_path = f"/opt/data/backups/{project}/{env}/{name}"
+    # NOTE(review): project/env/name are f-string-interpolated into Python
+    # source executed via ``-c`` on the host. ``name`` is validated by the
+    # endpoint, but project/env are raw path params — confirm they cannot
+    # contain quotes, slashes, or ".." (code/path injection risk otherwise).
+    cmd = [
+        OFFSITE_PYTHON, "-c",
+        f"import sys; sys.stdout.reconfigure(line_buffering=True); "
+        f"sys.path.insert(0, '/opt/data/scripts'); "
+        f"from offsite import download; from pathlib import Path; "
+        f"import os; os.makedirs('/opt/data/backups/{project}/{env}', exist_ok=True); "
+        f"ok = download('{name}', Path('{local_path}'), '{project}', '{env}'); "
+        f"sys.exit(0 if ok else 1)"
+    ]
+
+    try:
+        success = True
+        async for line in stream_command_host(cmd, timeout=_BACKUP_TIMEOUT, op_id=op_id):
+            yield _sse({"line": line, "timestamp": _now()})
+            # Broader heuristic than the other streams: also treat any line
+            # containing "failed" as a failure — TODO confirm no false positives.
+            if line.startswith("[error]") or line.startswith("ERROR") or "failed" in line.lower():
+                success = False
+
+        if is_cancelled(op_id):
+            yield _sse({"done": True, "success": False, "cancelled": True})
+        else:
+            yield _sse({"done": True, "success": success, "project": project, "env": env, "name": name})
+    finally:
+        # Always drop the cancel flag, even if the client disconnects mid-stream.
+        clear_cancelled(op_id)
+
+
+@router.get("/offsite/download/stream/{project}/{env}", summary="Download offsite backup to local storage with streaming output")
+async def download_offsite_stream(
+    project: str,
+    env: str,
+    name: str = Query(...),
+    _: str = Depends(verify_token),
+) -> StreamingResponse:
+    """Download an offsite backup to local storage with real-time SSE progress output.
+
+    Raises:
+        HTTPException: 400 if *name* contains path separators or "..".
+    """
+    # Reject traversal sequences in the backup name; note project/env are not
+    # checked here — NOTE(review): confirm they are constrained upstream.
+    if "/" in name or "\\" in name or ".." in name:
+        raise HTTPException(status_code=400, detail="Invalid backup name")
+    return StreamingResponse(
+        _download_stream(project, env, name),
+        media_type="text/event-stream",
+        # Disable proxy buffering so SSE frames flush immediately.
+        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
+    )
+
+
@router.post("/offsite/retention", summary="Apply offsite retention policy")
async def apply_retention(
_: str = Depends(verify_token),
--
Gitblit v1.3.1