From dddc7c245462846e1e09d6cfb2102934aa1f4e8e Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Sat, 21 Feb 2026 23:30:48 +0100
Subject: [PATCH] refactor: remove duplicate app/app/ phantom directory
---
 app/app/__init__.py         |   0
 app/app/auth.py             |  33 ----
 app/app/main.py             |  60 ------
 app/app/ops_runner.py       | 175 ----------
 app/app/routers/__init__.py |   0
 app/app/routers/backups.py  | 101 -------
 app/app/routers/restore.py  |  85 ------
 app/app/routers/services.py | 177 ----------
 app/app/routers/status.py   |  37 ----
 app/app/routers/system.py   | 253 ----------------
 10 files changed, 0 insertions(+), 921 deletions(-)
diff --git a/app/app/__init__.py b/app/app/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/app/app/__init__.py
+++ /dev/null
diff --git a/app/app/auth.py b/app/app/auth.py
deleted file mode 100644
index 2265d08..0000000
--- a/app/app/auth.py
+++ /dev/null
@@ -1,33 +0,0 @@
-import os
-from fastapi import HTTPException, Security, Query
-from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
-from typing import Optional
-
-_AUTH_TOKEN = os.environ.get("AUTH_TOKEN", "changeme")
-
-_bearer_scheme = HTTPBearer(auto_error=False)
-
-
-async def verify_token(
- credentials: Optional[HTTPAuthorizationCredentials] = Security(_bearer_scheme),
- token: Optional[str] = Query(default=None),
-) -> str:
- """
- Verify the bearer token from Authorization header or ?token= query param.
- Raises 401 if missing or invalid.
- """
- provided: Optional[str] = None
-
- if credentials is not None:
- provided = credentials.credentials
- elif token is not None:
- provided = token
-
- if provided is None or provided != _AUTH_TOKEN:
- raise HTTPException(
- status_code=401,
- detail="Invalid or missing authentication token",
- headers={"WWW-Authenticate": "Bearer"},
- )
-
- return provided
diff --git a/app/app/main.py b/app/app/main.py
deleted file mode 100644
index 7088a4b..0000000
--- a/app/app/main.py
+++ /dev/null
@@ -1,60 +0,0 @@
-import logging
-from contextlib import asynccontextmanager
-from pathlib import Path
-
-from fastapi import FastAPI
-from fastapi.middleware.cors import CORSMiddleware
-from fastapi.staticfiles import StaticFiles
-
-from app.routers import backups, restore, services, status, system
-
-logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s %(levelname)s %(name)s: %(message)s",
-)
-logger = logging.getLogger(__name__)
-
-_STATIC_DIR = Path(__file__).parent.parent / "static"
-
-
-@asynccontextmanager
-async def lifespan(app: FastAPI):
- logger.info("Ops WebUI server is running")
- yield
-
-
-app = FastAPI(
- title="Ops WebUI API",
- description="Backend API for the ops web dashboard",
- version="1.0.0",
- lifespan=lifespan,
-)
-
-# ---------------------------------------------------------------------------
-# CORS – open for development; restrict in production via env/reverse proxy
-# ---------------------------------------------------------------------------
-app.add_middleware(
- CORSMiddleware,
- allow_origins=["*"],
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
-)
-
-# ---------------------------------------------------------------------------
-# API routers
-# ---------------------------------------------------------------------------
-app.include_router(status.router, prefix="/api/status", tags=["status"])
-app.include_router(backups.router, prefix="/api/backups", tags=["backups"])
-app.include_router(restore.router, prefix="/api/restore", tags=["restore"])
-app.include_router(services.router, prefix="/api/services", tags=["services"])
-app.include_router(system.router, prefix="/api/system", tags=["system"])
-
-# ---------------------------------------------------------------------------
-# Static files – serve the frontend SPA at /
-# Mount last so API routes take precedence
-# ---------------------------------------------------------------------------
-if _STATIC_DIR.exists():
- app.mount("/", StaticFiles(directory=str(_STATIC_DIR), html=True), name="static")
-else:
- logger.warning("Static directory not found at %s – frontend will not be served", _STATIC_DIR)
diff --git a/app/app/ops_runner.py b/app/app/ops_runner.py
deleted file mode 100644
index 226fdaa..0000000
--- a/app/app/ops_runner.py
+++ /dev/null
@@ -1,175 +0,0 @@
-import asyncio
-import json
-import os
-from typing import AsyncGenerator
-
-OPS_CLI = os.environ.get("OPS_CLI", "/opt/infrastructure/ops")
-OFFSITE_PYTHON = os.environ.get("OFFSITE_PYTHON", "/opt/data/π/bin/python3")
-OFFSITE_SCRIPT = os.environ.get("OFFSITE_SCRIPT", "/opt/data/scripts/offsite.py")
-
-_DEFAULT_TIMEOUT = 300
-_BACKUP_TIMEOUT = 3600
-
-# nsenter via Docker: run commands on the host from inside the container.
-# Required because ops backup/restore delegate to host Python venvs (3.12)
-# that are incompatible with the container's Python (3.11).
-_NSENTER_PREFIX = [
- "docker", "run", "--rm", "-i",
- "--privileged", "--pid=host", "--network=host",
- "alpine",
- "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
-]
-
-
-# ---------------------------------------------------------------------------
-# In-container execution (status, disk, health, docker commands)
-# ---------------------------------------------------------------------------
-
-async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """Run the ops CLI inside the container."""
- return await _run_exec([OPS_CLI] + args, timeout=timeout)
-
-
-async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """Run the ops CLI with --json and return parsed JSON."""
- result = await run_ops(args + ["--json"], timeout=timeout)
- if not result["success"]:
- return {"success": False, "data": None, "error": result["error"] or result["output"]}
- try:
- data = json.loads(result["output"])
- return {"success": True, "data": data, "error": ""}
- except json.JSONDecodeError as exc:
- return {
- "success": False,
- "data": None,
- "error": f"Failed to parse JSON: {exc}\nRaw: {result['output'][:500]}",
- }
-
-
-async def stream_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
- """Stream ops CLI output (in-container)."""
- async for line in _stream_exec([OPS_CLI] + args, timeout=timeout):
- yield line
-
-
-async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """Generic command runner (in-container)."""
- return await _run_exec(args, timeout=timeout)
-
-
-async def stream_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
- """Stream generic command output (in-container)."""
- async for line in _stream_exec(args, timeout=timeout):
- yield line
-
-
-# ---------------------------------------------------------------------------
-# Host execution (backup, restore — needs host Python venvs)
-# ---------------------------------------------------------------------------
-
-async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """Run the ops CLI on the host via nsenter."""
- return await _run_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout)
-
-
-async def stream_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
- """Stream ops CLI output from the host via nsenter."""
- async for line in _stream_exec(_NSENTER_PREFIX + [OPS_CLI] + args, timeout=timeout):
- yield line
-
-
-# ---------------------------------------------------------------------------
-# Internal helpers
-# ---------------------------------------------------------------------------
-
-async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
- """Execute a command and capture output."""
- try:
- proc = await asyncio.create_subprocess_exec(
- *args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- try:
- stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
- except asyncio.TimeoutError:
- proc.kill()
- await proc.communicate()
- return {"success": False, "output": "", "error": f"Command timed out after {timeout}s"}
-
- return {
- "success": proc.returncode == 0,
- "output": stdout.decode("utf-8", errors="replace"),
- "error": stderr.decode("utf-8", errors="replace"),
- }
- except FileNotFoundError as exc:
- return {"success": False, "output": "", "error": f"Executable not found: {exc}"}
- except Exception as exc:
- return {"success": False, "output": "", "error": str(exc)}
-
-
-async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> AsyncGenerator[str, None]:
- """Execute a command and yield interleaved stdout/stderr lines."""
- try:
- proc = await asyncio.create_subprocess_exec(
- *args,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
- except FileNotFoundError as exc:
- yield f"[error] Executable not found: {exc}"
- return
- except Exception as exc:
- yield f"[error] Failed to start process: {exc}"
- return
-
- async def _readline(stream, prefix=""):
- while True:
- try:
- line = await asyncio.wait_for(stream.readline(), timeout=timeout)
- except asyncio.TimeoutError:
- yield f"{prefix}[timeout] Command exceeded {timeout}s"
- break
- if not line:
- break
- yield prefix + line.decode("utf-8", errors="replace").rstrip("\n")
-
- stdout_gen = _readline(proc.stdout).__aiter__()
- stderr_gen = _readline(proc.stderr, "[stderr] ").__aiter__()
-
- stdout_done = stderr_done = False
- pending_out = pending_err = None
-
- async def _next(it):
- try:
- return await it.__anext__()
- except StopAsyncIteration:
- return None
-
- pending_out = asyncio.create_task(_next(stdout_gen))
- pending_err = asyncio.create_task(_next(stderr_gen))
-
- while not (stdout_done and stderr_done):
- tasks = [t for t in (pending_out, pending_err) if t is not None]
- if not tasks:
- break
- done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
-
- for task in done:
- val = task.result()
- if task is pending_out:
- if val is None:
- stdout_done = True
- pending_out = None
- else:
- yield val
- pending_out = asyncio.create_task(_next(stdout_gen))
- elif task is pending_err:
- if val is None:
- stderr_done = True
- pending_err = None
- else:
- yield val
- pending_err = asyncio.create_task(_next(stderr_gen))
-
- await proc.wait()
diff --git a/app/app/routers/__init__.py b/app/app/routers/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/app/app/routers/__init__.py
+++ /dev/null
diff --git a/app/app/routers/backups.py b/app/app/routers/backups.py
deleted file mode 100644
index d1cf26e..0000000
--- a/app/app/routers/backups.py
+++ /dev/null
@@ -1,101 +0,0 @@
-from typing import Any
-
-from fastapi import APIRouter, Depends, HTTPException
-
-from app.auth import verify_token
-from app.ops_runner import run_ops, run_ops_json, run_ops_host, _BACKUP_TIMEOUT
-
-router = APIRouter()
-
-
-@router.get("/", summary="List local backups")
-async def list_backups(
- _: str = Depends(verify_token),
-) -> list[dict[str, Any]]:
- """Returns a list of local backup records from `ops backups --json`."""
- result = await run_ops_json(["backups"])
- if not result["success"]:
- raise HTTPException(status_code=500, detail=f"Failed to list backups: {result['error']}")
-
- data = result["data"]
- if isinstance(data, list):
- return data
- if isinstance(data, dict):
- for key in ("backups", "data", "items"):
- if key in data and isinstance(data[key], list):
- return data[key]
- return [data]
- return []
-
-
-@router.get("/offsite", summary="List offsite backups")
-async def list_offsite_backups(
- _: str = Depends(verify_token),
-) -> list[dict[str, Any]]:
- """Returns a list of offsite backup records."""
- all_backups = []
- for project in ["mdf", "seriousletter"]:
- result = await run_ops_json(["offsite", "list", project])
- if result["success"] and isinstance(result["data"], list):
- for b in result["data"]:
- b["project"] = project
- all_backups.extend(result["data"])
- return all_backups
-
-
-@router.post("/{project}/{env}", summary="Create a local backup")
-async def create_backup(
- project: str,
- env: str,
- _: str = Depends(verify_token),
-) -> dict[str, Any]:
- """
- Runs `ops backup {project} {env}` on the host.
-
- Runs via nsenter because ops backup delegates to project CLIs
- that use host Python venvs.
- """
- result = await run_ops_host(["backup", project, env], timeout=_BACKUP_TIMEOUT)
- if not result["success"]:
- raise HTTPException(
- status_code=500,
- detail=f"Backup failed: {result['error'] or result['output']}",
- )
- return {
- "success": True,
- "output": result["output"],
- "project": project,
- "env": env,
- }
-
-
-@router.post("/offsite/upload/{project}/{env}", summary="Upload backup to offsite")
-async def upload_offsite(
- project: str,
- env: str,
- _: str = Depends(verify_token),
-) -> dict[str, Any]:
- """Runs `ops offsite upload {project} {env}` on the host."""
- result = await run_ops_host(
- ["offsite", "upload", project, env], timeout=_BACKUP_TIMEOUT
- )
- if not result["success"]:
- raise HTTPException(
- status_code=500,
- detail=f"Offsite upload failed: {result['error'] or result['output']}",
- )
- return {"success": True, "output": result["output"], "project": project, "env": env}
-
-
-@router.post("/offsite/retention", summary="Apply offsite retention policy")
-async def apply_retention(
- _: str = Depends(verify_token),
-) -> dict[str, Any]:
- """Runs `ops offsite retention` on the host."""
- result = await run_ops_host(["offsite", "retention"], timeout=_BACKUP_TIMEOUT)
- if not result["success"]:
- raise HTTPException(
- status_code=500,
- detail=f"Retention policy failed: {result['error'] or result['output']}",
- )
- return {"success": True, "output": result["output"]}
diff --git a/app/app/routers/restore.py b/app/app/routers/restore.py
deleted file mode 100644
index d03428e..0000000
--- a/app/app/routers/restore.py
+++ /dev/null
@@ -1,85 +0,0 @@
-import json
-from datetime import datetime, timezone
-from typing import AsyncGenerator, Literal
-
-from fastapi import APIRouter, Depends, Query
-from fastapi.responses import StreamingResponse
-
-from app.auth import verify_token
-from app.ops_runner import _BACKUP_TIMEOUT, stream_ops_host
-
-router = APIRouter()
-
-
-def _sse_line(payload: dict) -> str:
- """Format a dict as a single SSE data line."""
- return f"data: {json.dumps(payload)}\n\n"
-
-
-async def _restore_generator(
- project: str,
- env: str,
- source: str,
- dry_run: bool,
-) -> AsyncGenerator[str, None]:
- """Async generator that drives the restore workflow and yields SSE events.
-
- Runs on the host via nsenter because ops restore delegates to project CLIs
- that use host Python venvs incompatible with the container's Python.
- """
- base_args = ["restore", project, env]
- if dry_run:
- base_args.append("--dry-run")
-
- if source == "offsite":
- # ops offsite restore <project> <env>
- download_args = ["offsite", "restore", project, env]
- yield _sse_line({"line": f"Downloading {project}/{env} from offsite...", "timestamp": _now()})
-
- download_ok = True
- async for line in stream_ops_host(download_args, timeout=_BACKUP_TIMEOUT):
- yield _sse_line({"line": line, "timestamp": _now()})
- if line.startswith("[error]"):
- download_ok = False
-
- if not download_ok:
- yield _sse_line({"done": True, "success": False})
- return
-
- yield _sse_line({"line": "Download complete. Starting restore...", "timestamp": _now()})
-
- success = True
- async for line in stream_ops_host(base_args, timeout=_BACKUP_TIMEOUT):
- yield _sse_line({"line": line, "timestamp": _now()})
- if line.startswith("[error]"):
- success = False
-
- yield _sse_line({"done": True, "success": success})
-
-
-def _now() -> str:
- return datetime.now(timezone.utc).isoformat()
-
-
-@router.get("/{project}/{env}", summary="Restore a backup with real-time output")
-async def restore_backup(
- project: str,
- env: str,
- source: Literal["local", "offsite"] = Query(default="local"),
- dry_run: bool = Query(default=False, alias="dry_run"),
- _: str = Depends(verify_token),
-) -> StreamingResponse:
- """
- Restore a backup for the given project/env.
-
- Uses Server-Sent Events (SSE) to stream real-time progress.
- Runs on the host via nsenter for Python venv compatibility.
- """
- return StreamingResponse(
- _restore_generator(project, env, source, dry_run),
- media_type="text/event-stream",
- headers={
- "Cache-Control": "no-cache",
- "X-Accel-Buffering": "no",
- },
- )
diff --git a/app/app/routers/services.py b/app/app/routers/services.py
deleted file mode 100644
index 7cdad19..0000000
--- a/app/app/routers/services.py
+++ /dev/null
@@ -1,177 +0,0 @@
-import os
-from typing import Any
-
-import yaml
-from fastapi import APIRouter, Depends, HTTPException, Query
-
-from app.auth import verify_token
-from app.ops_runner import run_command
-
-router = APIRouter()
-
-_DOCKER = "docker"
-_REGISTRY_PATH = os.environ.get(
- "REGISTRY_PATH",
- "/opt/infrastructure/servers/hetzner-vps/registry.yaml",
-)
-
-# ---------------------------------------------------------------------------
-# Registry-based name prefix lookup (cached)
-# ---------------------------------------------------------------------------
-_prefix_cache: dict[str, str] | None = None
-
-
-def _load_prefixes() -> dict[str, str]:
- """Load project -> name_prefix mapping from the ops registry."""
- global _prefix_cache
- if _prefix_cache is not None:
- return _prefix_cache
-
- try:
- with open(_REGISTRY_PATH) as f:
- data = yaml.safe_load(f)
- _prefix_cache = {}
- for proj_name, cfg in data.get("projects", {}).items():
- _prefix_cache[proj_name] = cfg.get("name_prefix", proj_name)
- return _prefix_cache
- except Exception:
- return {}
-
-
-# ---------------------------------------------------------------------------
-# Container name resolution
-# ---------------------------------------------------------------------------
-
-
-async def _find_by_prefix(pattern: str) -> str | None:
- """Find first running container whose name starts with `pattern`."""
- result = await run_command(
- [_DOCKER, "ps", "--filter", f"name={pattern}", "--format", "{{.Names}}"],
- timeout=10,
- )
- if not result["success"]:
- return None
- for name in result["output"].strip().splitlines():
- name = name.strip()
- if name and name.startswith(pattern):
- return name
- return None
-
-
-async def _find_exact(name: str) -> str | None:
- """Find a running container with exactly this name."""
- result = await run_command(
- [_DOCKER, "ps", "--filter", f"name={name}", "--format", "{{.Names}}"],
- timeout=10,
- )
- if not result["success"]:
- return None
- for n in result["output"].strip().splitlines():
- if n.strip() == name:
- return name
- return None
-
-
-async def _resolve_container(project: str, env: str, service: str) -> str:
- """
- Resolve the actual Docker container name from project/env/service.
-
- Uses the ops registry name_prefix mapping and tries patterns in order:
- 1. {env}-{prefix}-{service} (mdf, seriousletter: dev-mdf-mysql-UUID)
- 2. {prefix}-{service} (ringsaday: ringsaday-website-UUID, coolify: coolify-db)
- 3. {prefix}-{env} (ringsaday: ringsaday-dev-UUID)
- 4. exact {prefix} (coolify infra: coolify)
- """
- prefixes = _load_prefixes()
- prefix = prefixes.get(project, project)
-
- # Pattern 1: {env}-{prefix}-{service}
- hit = await _find_by_prefix(f"{env}-{prefix}-{service}")
- if hit:
- return hit
-
- # Pattern 2: {prefix}-{service}
- hit = await _find_by_prefix(f"{prefix}-{service}")
- if hit:
- return hit
-
- # Pattern 3: {prefix}-{env}
- hit = await _find_by_prefix(f"{prefix}-{env}")
- if hit:
- return hit
-
- # Pattern 4: exact match when service == prefix (e.g., coolify)
- if service == prefix:
- hit = await _find_exact(prefix)
- if hit:
- return hit
-
- raise HTTPException(
- status_code=404,
- detail=f"Container not found for {project}/{env}/{service}",
- )
-
-
-# ---------------------------------------------------------------------------
-# Endpoints
-# ---------------------------------------------------------------------------
-
-
-@router.get("/logs/{project}/{env}/{service}", summary="Get container logs")
-async def get_logs(
- project: str,
- env: str,
- service: str,
- lines: int = Query(
- default=100, ge=1, le=10000, description="Number of log lines to return"
- ),
- _: str = Depends(verify_token),
-) -> dict[str, Any]:
- """Fetch the last N lines of logs from a container."""
- container = await _resolve_container(project, env, service)
- result = await run_command(
- [_DOCKER, "logs", "--tail", str(lines), container],
- timeout=30,
- )
-
- # docker logs writes to stderr by default; combine both streams
- combined = result["output"] + result["error"]
-
- if not result["success"] and not combined.strip():
- raise HTTPException(
- status_code=500,
- detail=f"Failed to retrieve logs for container '{container}'",
- )
-
- return {
- "container": container,
- "lines": lines,
- "logs": combined,
- }
-
-
-@router.post("/restart/{project}/{env}/{service}", summary="Restart a container")
-async def restart_service(
- project: str,
- env: str,
- service: str,
- _: str = Depends(verify_token),
-) -> dict[str, Any]:
- """Restart a Docker container."""
- container = await _resolve_container(project, env, service)
- result = await run_command(
- [_DOCKER, "restart", container],
- timeout=60,
- )
-
- if not result["success"]:
- raise HTTPException(
- status_code=500,
- detail=f"Failed to restart container '{container}': {result['error'] or result['output']}",
- )
-
- return {
- "success": True,
- "container": container,
- "message": f"Container '{container}' restarted successfully",
- }
diff --git a/app/app/routers/status.py b/app/app/routers/status.py
deleted file mode 100644
index 78918d4..0000000
--- a/app/app/routers/status.py
+++ /dev/null
@@ -1,37 +0,0 @@
-from typing import Any
-
-from fastapi import APIRouter, Depends, HTTPException
-
-from app.auth import verify_token
-from app.ops_runner import run_ops_json
-
-router = APIRouter()
-
-
-@router.get("/", summary="Get all container statuses")
-async def get_status(
- _: str = Depends(verify_token),
-) -> list[dict[str, Any]]:
- """
- Returns a list of container status objects from `ops status --json`.
-
- Each item contains: project, service, status, health, uptime.
- """
- result = await run_ops_json(["status"])
- if not result["success"]:
- raise HTTPException(
- status_code=500,
- detail=f"Failed to retrieve status: {result['error']}",
- )
-
- data = result["data"]
- # Normalise to list regardless of what ops returns
- if isinstance(data, list):
- return data
- if isinstance(data, dict):
- # Some ops implementations wrap the list in a key
- for key in ("services", "containers", "status", "data"):
- if key in data and isinstance(data[key], list):
- return data[key]
- return [data]
- return []
diff --git a/app/app/routers/system.py b/app/app/routers/system.py
deleted file mode 100644
index a9f15ce..0000000
--- a/app/app/routers/system.py
+++ /dev/null
@@ -1,253 +0,0 @@
-import asyncio
-import os
-import re
-from typing import Any
-
-from fastapi import APIRouter, Depends, HTTPException
-
-from app.auth import verify_token
-from app.ops_runner import run_command, run_ops
-
-router = APIRouter()
-
-
-# ---------------------------------------------------------------------------
-# Helpers
-# ---------------------------------------------------------------------------
-
-def _parse_disk_output(raw: str) -> list[dict[str, str]]:
- """Parse df-style output into a list of filesystem dicts."""
- filesystems: list[dict[str, str]] = []
- lines = raw.strip().splitlines()
- if not lines:
- return filesystems
-
- data_lines = lines[1:] if re.match(r"(?i)filesystem", lines[0]) else lines
-
- for line in data_lines:
- parts = line.split()
- if len(parts) >= 5:
- filesystems.append({
- "filesystem": parts[0],
- "size": parts[1],
- "used": parts[2],
- "available": parts[3],
- "use_percent": parts[4],
- "mount": parts[5] if len(parts) > 5 else "",
- })
- return filesystems
-
-
-def _parse_health_output(raw: str) -> list[dict[str, str]]:
- """Parse health check output into check result dicts."""
- checks: list[dict[str, str]] = []
- for line in raw.strip().splitlines():
- line = line.strip()
- if not line:
- continue
- match = re.match(r"^\[(\w+)\]\s*(.+)$", line)
- if match:
- checks.append({"status": match.group(1), "check": match.group(2)})
- else:
- checks.append({"status": "INFO", "check": line})
- return checks
-
-
-def _parse_timers_output(raw: str) -> list[dict[str, str]]:
- """Parse `systemctl list-timers` output into timer dicts."""
- timers: list[dict[str, str]] = []
- lines = raw.strip().splitlines()
- if not lines:
- return timers
-
- header_idx = 0
- for i, line in enumerate(lines):
- if re.match(r"(?i)next\s+left", line):
- header_idx = i
- break
-
- for line in lines[header_idx + 1:]:
- line = line.strip()
- if not line or line.startswith("timers listed") or line.startswith("To show"):
- continue
- parts = re.split(r"\s{2,}", line)
- if len(parts) >= 5:
- timers.append({
- "next": parts[0], "left": parts[1], "last": parts[2],
- "passed": parts[3], "unit": parts[4],
- "activates": parts[5] if len(parts) > 5 else "",
- })
- elif parts:
- timers.append({"unit": parts[0], "next": "", "left": "", "last": "", "passed": "", "activates": ""})
- return timers
-
-
-def _read_memory() -> dict[str, Any]:
- """Read memory and swap from /proc/meminfo."""
- info: dict[str, int] = {}
- try:
- with open("/proc/meminfo") as f:
- for line in f:
- parts = line.split()
- if len(parts) >= 2:
- key = parts[0].rstrip(":")
- info[key] = int(parts[1]) * 1024 # kB → bytes
- except Exception:
- return {}
-
- mem_total = info.get("MemTotal", 0)
- mem_available = info.get("MemAvailable", 0)
- mem_used = mem_total - mem_available
- swap_total = info.get("SwapTotal", 0)
- swap_free = info.get("SwapFree", 0)
- swap_used = swap_total - swap_free
-
- return {
- "memory": {
- "total": mem_total,
- "used": mem_used,
- "available": mem_available,
- "percent": round(mem_used / mem_total * 100, 1) if mem_total else 0,
- },
- "swap": {
- "total": swap_total,
- "used": swap_used,
- "free": swap_free,
- "percent": round(swap_used / swap_total * 100, 1) if swap_total else 0,
- },
- }
-
-
-def _read_cpu_stat() -> tuple[int, int]:
- """Read idle and total jiffies from /proc/stat."""
- with open("/proc/stat") as f:
- line = f.readline()
- parts = line.split()
- values = [int(x) for x in parts[1:]]
- idle = values[3] + (values[4] if len(values) > 4 else 0) # idle + iowait
- return idle, sum(values)
-
-
-# ---------------------------------------------------------------------------
-# Endpoints
-# ---------------------------------------------------------------------------
-
-@router.get("/disk", summary="Disk usage")
-async def disk_usage(
- _: str = Depends(verify_token),
-) -> dict[str, Any]:
- """Returns disk usage via `ops disk` (fallback: `df -h`)."""
- result = await run_ops(["disk"])
- raw = result["output"]
-
- if not result["success"] or not raw.strip():
- fallback = await run_command(["df", "-h"])
- raw = fallback["output"]
- if not fallback["success"]:
- raise HTTPException(status_code=500, detail=f"Failed to get disk usage: {result['error']}")
-
- return {
- "filesystems": _parse_disk_output(raw),
- "raw": raw,
- }
-
-
-@router.get("/health", summary="System health checks")
-async def health_check(
- _: str = Depends(verify_token),
-) -> dict[str, Any]:
- """Returns health check results via `ops health`."""
- result = await run_ops(["health"])
- if not result["success"] and not result["output"].strip():
- raise HTTPException(status_code=500, detail=f"Failed to run health checks: {result['error']}")
- return {
- "checks": _parse_health_output(result["output"]),
- "raw": result["output"],
- }
-
-
-@router.get("/timers", summary="Systemd timers")
-async def list_timers(
- _: str = Depends(verify_token),
-) -> dict[str, Any]:
- """Lists systemd timers."""
- result = await run_command(["systemctl", "list-timers", "--no-pager"])
- if not result["success"] and not result["output"].strip():
- raise HTTPException(status_code=500, detail=f"Failed to list timers: {result['error']}")
- return {
- "timers": _parse_timers_output(result["output"]),
- "raw": result["output"],
- }
-
-
-@router.get("/info", summary="System information with CPU/memory")
-async def system_info(
- _: str = Depends(verify_token),
-) -> dict[str, Any]:
- """
- Returns system uptime, load average, CPU usage, memory, and swap.
-
- CPU usage is measured over a 0.5s window from /proc/stat.
- Memory/swap are read from /proc/meminfo.
- """
- uptime_str = ""
- load_str = ""
-
- # Uptime
- try:
- with open("/proc/uptime") as f:
- seconds_up = float(f.read().split()[0])
- days = int(seconds_up // 86400)
- hours = int((seconds_up % 86400) // 3600)
- minutes = int((seconds_up % 3600) // 60)
- uptime_str = f"{days}d {hours}h {minutes}m"
- except Exception:
- pass
-
- # Load average
- try:
- with open("/proc/loadavg") as f:
- parts = f.read().split()
- load_str = f"{parts[0]}, {parts[1]}, {parts[2]}"
- except Exception:
- pass
-
- # CPU usage (two samples, 0.5s apart)
- cpu_info: dict[str, Any] = {}
- try:
- idle1, total1 = _read_cpu_stat()
- await asyncio.sleep(0.5)
- idle2, total2 = _read_cpu_stat()
- total_delta = total2 - total1
- if total_delta > 0:
- usage = round((1 - (idle2 - idle1) / total_delta) * 100, 1)
- else:
- usage = 0.0
- cpu_info = {
- "usage_percent": usage,
- "cores": os.cpu_count() or 1,
- }
- except Exception:
- pass
-
- # Memory + Swap
- mem_info = _read_memory()
-
- # Fallback for uptime/load if /proc wasn't available
- if not uptime_str or not load_str:
- result = await run_command(["uptime"])
- if result["success"]:
- raw = result["output"].strip()
- up_match = re.search(r"up\s+(.+?),\s+\d+\s+user", raw)
- if up_match:
- uptime_str = uptime_str or up_match.group(1).strip()
- load_match = re.search(r"load average[s]?:\s*(.+)$", raw, re.IGNORECASE)
- if load_match:
- load_str = load_str or load_match.group(1).strip()
-
- return {
- "uptime": uptime_str or "unavailable",
- "load": load_str or "unavailable",
- "cpu": cpu_info or None,
- **mem_info,
- }
--
Gitblit v1.3.1