From fd03c16eca085423267c163137b28ccb60de8db0 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Wed, 25 Feb 2026 00:45:13 +0100
Subject: [PATCH] feat: multi-compose rebuild (Seafile), cancel endpoint, schedule router, project descriptor

---
 app/routers/backups.py |  149 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 147 insertions(+), 2 deletions(-)

diff --git a/app/routers/backups.py b/app/routers/backups.py
index de5a15c..badf0bf 100644
--- a/app/routers/backups.py
+++ b/app/routers/backups.py
@@ -1,11 +1,26 @@
-from typing import Any
+import json
+from datetime import datetime, timezone
+from typing import Any, AsyncGenerator
 
 from fastapi import APIRouter, Depends, HTTPException, Query
+from fastapi.responses import StreamingResponse
 
 from app.auth import verify_token
-from app.ops_runner import run_ops, run_ops_json, run_ops_host, run_ops_host_json, run_command_host, _BACKUP_TIMEOUT
+from app.ops_runner import (
+    run_ops, run_ops_json, run_ops_host, run_ops_host_json, run_command_host,
+    stream_ops_host, stream_command_host, new_op_id, is_cancelled, clear_cancelled,
+    _BACKUP_TIMEOUT, OFFSITE_PYTHON,
+)
 
 router = APIRouter()
+
+
+def _sse(payload: dict) -> str:
+    return f"data: {json.dumps(payload)}\n\n"
+
+
+def _now() -> str:
+    return datetime.now(timezone.utc).isoformat()
 
 
 @router.get("/", summary="List local backups")
@@ -82,6 +97,43 @@
     }
 
 
+async def _backup_stream(project: str, env: str) -> AsyncGenerator[str, None]:
+    """Stream backup creation progress via SSE."""
+    op_id = new_op_id()
+    yield _sse({"op_id": op_id})
+    yield _sse({"line": f"Creating backup for {project}/{env}...", "timestamp": _now()})
+
+    try:
+        success = True
+        async for line in stream_ops_host(
+            ["backup", project, env], timeout=_BACKUP_TIMEOUT, op_id=op_id
+        ):
+            yield _sse({"line": line, "timestamp": _now()})
+            if line.startswith("[error]") or line.startswith("ERROR"):
+                success = False
+
+        if is_cancelled(op_id):
+            yield _sse({"done": True, "success": False, "cancelled": True})
+        else:
+            yield _sse({"done": True, "success": success, "project": project, "env": env})
+    finally:
+        clear_cancelled(op_id)
+
+
+@router.get("/stream/{project}/{env}", summary="Create backup with streaming output")
+async def create_backup_stream(
+    project: str,
+    env: str,
+    _: str = Depends(verify_token),
+) -> StreamingResponse:
+    """Create a backup with real-time SSE progress output."""
+    return StreamingResponse(
+        _backup_stream(project, env),
+        media_type="text/event-stream",
+        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
+    )
+
+
 @router.post("/offsite/upload/{project}/{env}", summary="Upload backup to offsite")
 async def upload_offsite(
     project: str,
@@ -100,6 +152,99 @@
     return {"success": True, "output": result["output"], "project": project, "env": env}
 
 
+async def _upload_stream(project: str, env: str, name: str | None = None) -> AsyncGenerator[str, None]:
+    """Stream offsite upload progress via SSE."""
+    op_id = new_op_id()
+    yield _sse({"op_id": op_id})
+    label = f"{project}/{env}/{name}" if name else f"{project}/{env} (latest)"
+    yield _sse({"line": f"Uploading {label} to offsite storage...", "timestamp": _now()})
+
+    cmd = ["offsite", "upload", project, env]
+    if name:
+        cmd.append(name)
+
+    try:
+        success = True
+        async for line in stream_ops_host(
+            cmd, timeout=_BACKUP_TIMEOUT, op_id=op_id
+        ):
+            yield _sse({"line": line, "timestamp": _now()})
+            if line.startswith("[error]") or line.startswith("ERROR"):
+                success = False
+
+        if is_cancelled(op_id):
+            yield _sse({"done": True, "success": False, "cancelled": True})
+        else:
+            yield _sse({"done": True, "success": success, "project": project, "env": env})
+    finally:
+        clear_cancelled(op_id)
+
+
+@router.get("/offsite/stream/{project}/{env}", summary="Upload to offsite with streaming output")
+async def upload_offsite_stream(
+    project: str,
+    env: str,
+    name: str | None = Query(None),
+    _: str = Depends(verify_token),
+) -> StreamingResponse:
+    """Upload backup to offsite with real-time SSE progress output."""
+    return StreamingResponse(
+        _upload_stream(project, env, name),
+        media_type="text/event-stream",
+        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
+    )
+
+
+async def _download_stream(project: str, env: str, name: str) -> AsyncGenerator[str, None]:
+    """Stream offsite-to-local download progress via SSE."""
+    op_id = new_op_id()
+    yield _sse({"op_id": op_id})
+    yield _sse({"line": f"Downloading {name} from offsite to local storage...", "timestamp": _now()})
+
+    # Download to the local backup directory so it appears in the backup list
+    local_path = f"/opt/data/backups/{project}/{env}/{name}"
+    cmd = [
+        OFFSITE_PYTHON, "-c",
+        f"import sys; sys.stdout.reconfigure(line_buffering=True); "
+        f"sys.path.insert(0, '/opt/data/scripts'); "
+        f"from offsite import download; from pathlib import Path; "
+        f"import os; os.makedirs('/opt/data/backups/{project}/{env}', exist_ok=True); "
+        f"ok = download('{name}', Path('{local_path}'), '{project}', '{env}'); "
+        f"sys.exit(0 if ok else 1)"
+    ]
+
+    try:
+        success = True
+        async for line in stream_command_host(cmd, timeout=_BACKUP_TIMEOUT, op_id=op_id):
+            yield _sse({"line": line, "timestamp": _now()})
+            if line.startswith("[error]") or line.startswith("ERROR") or "failed" in line.lower():
+                success = False
+
+        if is_cancelled(op_id):
+            yield _sse({"done": True, "success": False, "cancelled": True})
+        else:
+            yield _sse({"done": True, "success": success, "project": project, "env": env, "name": name})
+    finally:
+        clear_cancelled(op_id)
+
+
+@router.get("/offsite/download/stream/{project}/{env}", summary="Download offsite backup to local storage with streaming output")
+async def download_offsite_stream(
+    project: str,
+    env: str,
+    name: str = Query(...),
+    _: str = Depends(verify_token),
+) -> StreamingResponse:
+    """Download an offsite backup to local storage with real-time SSE progress output."""
+    if "/" in name or "\\" in name or ".." in name:
+        raise HTTPException(status_code=400, detail="Invalid backup name")
+    return StreamingResponse(
+        _download_stream(project, env, name),
+        media_type="text/event-stream",
+        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
+    )
+
+
 @router.post("/offsite/retention", summary="Apply offsite retention policy")
 async def apply_retention(
     _: str = Depends(verify_token),

--
Gitblit v1.3.1