From ed26def7d76ac011075c11e8c1679ed1f7a08abc Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Sat, 21 Feb 2026 16:48:40 +0100
Subject: [PATCH] feat: Clickable stat tiles, view toggle, CPU/memory metrics, restore fix
---
app/app/routers/system.py | 191 +++++++++++++++++++++++++++--------------------
1 files changed, 108 insertions(+), 83 deletions(-)
diff --git a/app/app/routers/system.py b/app/app/routers/system.py
index 0dcf545..a9f15ce 100644
--- a/app/app/routers/system.py
+++ b/app/app/routers/system.py
@@ -1,3 +1,5 @@
+import asyncio
+import os
import re
from typing import Any
@@ -14,98 +16,116 @@
# ---------------------------------------------------------------------------
def _parse_disk_output(raw: str) -> list[dict[str, str]]:
- """
- Parse df-style output into a list of filesystem dicts.
- Handles both the standard df header and custom ops output.
- """
+ """Parse df-style output into a list of filesystem dicts."""
filesystems: list[dict[str, str]] = []
lines = raw.strip().splitlines()
-
if not lines:
return filesystems
- # Skip header line if present
data_lines = lines[1:] if re.match(r"(?i)filesystem", lines[0]) else lines
for line in data_lines:
parts = line.split()
if len(parts) >= 5:
- filesystems.append(
- {
- "filesystem": parts[0],
- "size": parts[1],
- "used": parts[2],
- "available": parts[3],
- "use_percent": parts[4],
- "mount": parts[5] if len(parts) > 5 else "",
- }
- )
-
+ filesystems.append({
+ "filesystem": parts[0],
+ "size": parts[1],
+ "used": parts[2],
+ "available": parts[3],
+ "use_percent": parts[4],
+ "mount": parts[5] if len(parts) > 5 else "",
+ })
return filesystems
def _parse_health_output(raw: str) -> list[dict[str, str]]:
- """
- Parse health check output into a list of check result dicts.
- Expects lines like: "[OK] Database connection" or "[FAIL] Disk space"
- Also handles plain text lines.
- """
+ """Parse health check output into check result dicts."""
checks: list[dict[str, str]] = []
for line in raw.strip().splitlines():
line = line.strip()
if not line:
continue
-
match = re.match(r"^\[(\w+)\]\s*(.+)$", line)
if match:
checks.append({"status": match.group(1), "check": match.group(2)})
else:
- # Treat unstructured lines as informational
checks.append({"status": "INFO", "check": line})
-
return checks
def _parse_timers_output(raw: str) -> list[dict[str, str]]:
- """
- Parse `systemctl list-timers` output into structured timer dicts.
- Header: NEXT LEFT LAST PASSED UNIT ACTIVATES
- """
+ """Parse `systemctl list-timers` output into timer dicts."""
timers: list[dict[str, str]] = []
lines = raw.strip().splitlines()
-
if not lines:
return timers
- # Skip header
header_idx = 0
for i, line in enumerate(lines):
if re.match(r"(?i)next\s+left", line):
header_idx = i
break
- for line in lines[header_idx + 1 :]:
+ for line in lines[header_idx + 1:]:
line = line.strip()
if not line or line.startswith("timers listed") or line.startswith("To show"):
continue
-
- # systemctl list-timers columns are variable-width; split carefully
parts = re.split(r"\s{2,}", line)
if len(parts) >= 5:
- timers.append(
- {
- "next": parts[0],
- "left": parts[1],
- "last": parts[2],
- "passed": parts[3],
- "unit": parts[4],
- "activates": parts[5] if len(parts) > 5 else "",
- }
- )
+ timers.append({
+ "next": parts[0], "left": parts[1], "last": parts[2],
+ "passed": parts[3], "unit": parts[4],
+ "activates": parts[5] if len(parts) > 5 else "",
+ })
elif parts:
timers.append({"unit": parts[0], "next": "", "left": "", "last": "", "passed": "", "activates": ""})
-
return timers
+
+
+def _read_memory() -> dict[str, Any]:
+ """Read memory and swap from /proc/meminfo."""
+ info: dict[str, int] = {}
+ try:
+ with open("/proc/meminfo") as f:
+ for line in f:
+ parts = line.split()
+ if len(parts) >= 2:
+ key = parts[0].rstrip(":")
+ info[key] = int(parts[1]) * 1024 # kB → bytes
+ except Exception:
+ return {}
+
+ mem_total = info.get("MemTotal", 0)
+ mem_available = info.get("MemAvailable", 0)
+ mem_used = mem_total - mem_available
+ swap_total = info.get("SwapTotal", 0)
+ swap_free = info.get("SwapFree", 0)
+ swap_used = swap_total - swap_free
+
+ return {
+ "memory": {
+ "total": mem_total,
+ "used": mem_used,
+ "available": mem_available,
+ "percent": round(mem_used / mem_total * 100, 1) if mem_total else 0,
+ },
+ "swap": {
+ "total": swap_total,
+ "used": swap_used,
+ "free": swap_free,
+ "percent": round(swap_used / swap_total * 100, 1) if swap_total else 0,
+ },
+ }
+
+
+def _read_cpu_stat() -> tuple[int, int]:
+ """Read idle and total jiffies from /proc/stat."""
+ with open("/proc/stat") as f:
+ line = f.readline()
+ parts = line.split()
+ values = [int(x) for x in parts[1:]]
+ idle = values[3] + (values[4] if len(values) > 4 else 0) # idle + iowait
+ return idle, sum(values)
# ---------------------------------------------------------------------------
@@ -116,21 +136,15 @@
async def disk_usage(
_: str = Depends(verify_token),
) -> dict[str, Any]:
- """
- Returns disk usage via `ops disk` (or falls back to `df -h`).
- """
+ """Returns disk usage via `ops disk` (fallback: `df -h`)."""
result = await run_ops(["disk"])
raw = result["output"]
if not result["success"] or not raw.strip():
- # Fallback to df
fallback = await run_command(["df", "-h"])
raw = fallback["output"]
if not fallback["success"]:
- raise HTTPException(
- status_code=500,
- detail=f"Failed to get disk usage: {result['error']}",
- )
+ raise HTTPException(status_code=500, detail=f"Failed to get disk usage: {result['error']}")
return {
"filesystems": _parse_disk_output(raw),
@@ -142,20 +156,13 @@
async def health_check(
_: str = Depends(verify_token),
) -> dict[str, Any]:
- """
- Returns health check results via `ops health`.
- """
+ """Returns health check results via `ops health`."""
result = await run_ops(["health"])
if not result["success"] and not result["output"].strip():
- raise HTTPException(
- status_code=500,
- detail=f"Failed to run health checks: {result['error']}",
- )
-
- raw = result["output"]
+ raise HTTPException(status_code=500, detail=f"Failed to run health checks: {result['error']}")
return {
- "checks": _parse_health_output(raw),
- "raw": raw,
+ "checks": _parse_health_output(result["output"]),
+ "raw": result["output"],
}
@@ -163,35 +170,30 @@
async def list_timers(
_: str = Depends(verify_token),
) -> dict[str, Any]:
- """
- Lists systemd timers via `systemctl list-timers --no-pager`.
- """
+ """Lists systemd timers."""
result = await run_command(["systemctl", "list-timers", "--no-pager"])
if not result["success"] and not result["output"].strip():
- raise HTTPException(
- status_code=500,
- detail=f"Failed to list timers: {result['error']}",
- )
-
- raw = result["output"]
+ raise HTTPException(status_code=500, detail=f"Failed to list timers: {result['error']}")
return {
- "timers": _parse_timers_output(raw),
- "raw": raw,
+ "timers": _parse_timers_output(result["output"]),
+ "raw": result["output"],
}
-@router.get("/info", summary="Basic system information")
+@router.get("/info", summary="System information with CPU/memory")
async def system_info(
_: str = Depends(verify_token),
) -> dict[str, Any]:
"""
- Returns system uptime and load average.
- Reads from /proc/uptime and /proc/loadavg; falls back to `uptime` command.
+ Returns system uptime, load average, CPU usage, memory, and swap.
+
+ CPU usage is measured over a 0.5s window from /proc/stat.
+ Memory/swap are read from /proc/meminfo.
"""
uptime_str = ""
load_str = ""
- # Try /proc first (Linux)
+ # Uptime
try:
with open("/proc/uptime") as f:
seconds_up = float(f.read().split()[0])
@@ -199,22 +201,43 @@
hours = int((seconds_up % 86400) // 3600)
minutes = int((seconds_up % 3600) // 60)
uptime_str = f"{days}d {hours}h {minutes}m"
- except (FileNotFoundError, ValueError):
+ except Exception:
pass
+ # Load average
try:
with open("/proc/loadavg") as f:
parts = f.read().split()
load_str = f"{parts[0]}, {parts[1]}, {parts[2]}"
- except (FileNotFoundError, ValueError):
+ except Exception:
pass
- # Fallback: use `uptime` command (works on macOS too)
+ # CPU usage (two samples, 0.5s apart)
+ cpu_info: dict[str, Any] = {}
+ try:
+ idle1, total1 = _read_cpu_stat()
+ await asyncio.sleep(0.5)
+ idle2, total2 = _read_cpu_stat()
+ total_delta = total2 - total1
+ if total_delta > 0:
+ usage = round((1 - (idle2 - idle1) / total_delta) * 100, 1)
+ else:
+ usage = 0.0
+ cpu_info = {
+ "usage_percent": usage,
+ "cores": os.cpu_count() or 1,
+ }
+ except Exception:
+ pass
+
+ # Memory + Swap
+ mem_info = _read_memory()
+
+ # Fallback for uptime/load if /proc wasn't available
if not uptime_str or not load_str:
result = await run_command(["uptime"])
if result["success"]:
raw = result["output"].strip()
- # Parse "up X days, Y:Z, N users, load average: a, b, c"
up_match = re.search(r"up\s+(.+?),\s+\d+\s+user", raw)
if up_match:
uptime_str = uptime_str or up_match.group(1).strip()
@@ -225,4 +248,6 @@
return {
"uptime": uptime_str or "unavailable",
"load": load_str or "unavailable",
+ "cpu": cpu_info or None,
+ **mem_info,
}
--
Gitblit v1.3.1