From ed26def7d76ac011075c11e8c1679ed1f7a08abc Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Sat, 21 Feb 2026 16:48:40 +0100
Subject: [PATCH] feat: Clickable stat tiles, view toggle, CPU/memory metrics, restore fix

---
 app/app/routers/system.py |  191 +++++++++++++++++++++++++++--------------------
 1 files changed, 108 insertions(+), 83 deletions(-)

diff --git a/app/app/routers/system.py b/app/app/routers/system.py
index 0dcf545..a9f15ce 100644
--- a/app/app/routers/system.py
+++ b/app/app/routers/system.py
@@ -1,3 +1,5 @@
+import asyncio
+import os
 import re
 from typing import Any
 
@@ -14,98 +16,116 @@
 # ---------------------------------------------------------------------------
 
 def _parse_disk_output(raw: str) -> list[dict[str, str]]:
-    """
-    Parse df-style output into a list of filesystem dicts.
-    Handles both the standard df header and custom ops output.
-    """
+    """Parse df-style output into a list of filesystem dicts."""
     filesystems: list[dict[str, str]] = []
     lines = raw.strip().splitlines()
-
     if not lines:
         return filesystems
 
-    # Skip header line if present
     data_lines = lines[1:] if re.match(r"(?i)filesystem", lines[0]) else lines
 
     for line in data_lines:
         parts = line.split()
         if len(parts) >= 5:
-            filesystems.append(
-                {
-                    "filesystem": parts[0],
-                    "size": parts[1],
-                    "used": parts[2],
-                    "available": parts[3],
-                    "use_percent": parts[4],
-                    "mount": parts[5] if len(parts) > 5 else "",
-                }
-            )
-
+            filesystems.append({
+                "filesystem": parts[0],
+                "size": parts[1],
+                "used": parts[2],
+                "available": parts[3],
+                "use_percent": parts[4],
+                "mount": parts[5] if len(parts) > 5 else "",
+            })
     return filesystems
 
 
 def _parse_health_output(raw: str) -> list[dict[str, str]]:
-    """
-    Parse health check output into a list of check result dicts.
-    Expects lines like: "[OK] Database connection" or "[FAIL] Disk space"
-    Also handles plain text lines.
-    """
+    """Parse health check output into check result dicts."""
     checks: list[dict[str, str]] = []
     for line in raw.strip().splitlines():
         line = line.strip()
         if not line:
             continue
-
         match = re.match(r"^\[(\w+)\]\s*(.+)$", line)
         if match:
             checks.append({"status": match.group(1), "check": match.group(2)})
         else:
-            # Treat unstructured lines as informational
             checks.append({"status": "INFO", "check": line})
-
     return checks
 
 
 def _parse_timers_output(raw: str) -> list[dict[str, str]]:
-    """
-    Parse `systemctl list-timers` output into structured timer dicts.
-    Header: NEXT LEFT LAST PASSED UNIT ACTIVATES
-    """
+    """Parse `systemctl list-timers` output into timer dicts."""
     timers: list[dict[str, str]] = []
     lines = raw.strip().splitlines()
-
     if not lines:
         return timers
 
-    # Skip header
     header_idx = 0
     for i, line in enumerate(lines):
         if re.match(r"(?i)next\s+left", line):
             header_idx = i
             break
 
-    for line in lines[header_idx + 1 :]:
+    for line in lines[header_idx + 1:]:
         line = line.strip()
         if not line or line.startswith("timers listed") or line.startswith("To show"):
             continue
-
-        # systemctl list-timers columns are variable-width; split carefully
         parts = re.split(r"\s{2,}", line)
         if len(parts) >= 5:
-            timers.append(
-                {
-                    "next": parts[0],
-                    "left": parts[1],
-                    "last": parts[2],
-                    "passed": parts[3],
-                    "unit": parts[4],
-                    "activates": parts[5] if len(parts) > 5 else "",
-                }
-            )
+            timers.append({
+                "next": parts[0], "left": parts[1], "last": parts[2],
+                "passed": parts[3], "unit": parts[4],
+                "activates": parts[5] if len(parts) > 5 else "",
+            })
         elif parts:
             timers.append({"unit": parts[0], "next": "", "left": "", "last": "", "passed": "", "activates": ""})
-
     return timers
+
+
+def _read_memory() -> dict[str, Any]:
+    """Read memory and swap from /proc/meminfo."""
+    info: dict[str, int] = {}
+    try:
+        with open("/proc/meminfo") as f:
+            for line in f:
+                parts = line.split()
+                if len(parts) >= 2:
+                    key = parts[0].rstrip(":")
+                    info[key] = int(parts[1]) * 1024  # kB → bytes
+    except Exception:
+        return {}
+
+    mem_total = info.get("MemTotal", 0)
+    mem_available = info.get("MemAvailable", 0)
+    mem_used = mem_total - mem_available
+    swap_total = info.get("SwapTotal", 0)
+    swap_free = info.get("SwapFree", 0)
+    swap_used = swap_total - swap_free
+
+    return {
+        "memory": {
+            "total": mem_total,
+            "used": mem_used,
+            "available": mem_available,
+            "percent": round(mem_used / mem_total * 100, 1) if mem_total else 0,
+        },
+        "swap": {
+            "total": swap_total,
+            "used": swap_used,
+            "free": swap_free,
+            "percent": round(swap_used / swap_total * 100, 1) if swap_total else 0,
+        },
+    }
+
+
+def _read_cpu_stat() -> tuple[int, int]:
+    """Read idle and total jiffies from /proc/stat."""
+    with open("/proc/stat") as f:
+        line = f.readline()
+    parts = line.split()
+    values = [int(x) for x in parts[1:]]
+    idle = values[3] + (values[4] if len(values) > 4 else 0)  # idle + iowait
+    return idle, sum(values)
 
 
 # ---------------------------------------------------------------------------
@@ -116,21 +136,15 @@
 async def disk_usage(
     _: str = Depends(verify_token),
 ) -> dict[str, Any]:
-    """
-    Returns disk usage via `ops disk` (or falls back to `df -h`).
-    """
+    """Returns disk usage via `ops disk` (fallback: `df -h`)."""
     result = await run_ops(["disk"])
     raw = result["output"]
 
     if not result["success"] or not raw.strip():
-        # Fallback to df
         fallback = await run_command(["df", "-h"])
         raw = fallback["output"]
         if not fallback["success"]:
-            raise HTTPException(
-                status_code=500,
-                detail=f"Failed to get disk usage: {result['error']}",
-            )
+            raise HTTPException(status_code=500, detail=f"Failed to get disk usage: {result['error']}")
 
     return {
         "filesystems": _parse_disk_output(raw),
@@ -142,20 +156,13 @@
 async def health_check(
     _: str = Depends(verify_token),
 ) -> dict[str, Any]:
-    """
-    Returns health check results via `ops health`.
-    """
+    """Returns health check results via `ops health`."""
     result = await run_ops(["health"])
     if not result["success"] and not result["output"].strip():
-        raise HTTPException(
-            status_code=500,
-            detail=f"Failed to run health checks: {result['error']}",
-        )
-
-    raw = result["output"]
+        raise HTTPException(status_code=500, detail=f"Failed to run health checks: {result['error']}")
     return {
-        "checks": _parse_health_output(raw),
-        "raw": raw,
+        "checks": _parse_health_output(result["output"]),
+        "raw": result["output"],
     }
 
 
@@ -163,35 +170,30 @@
 async def list_timers(
     _: str = Depends(verify_token),
 ) -> dict[str, Any]:
-    """
-    Lists systemd timers via `systemctl list-timers --no-pager`.
-    """
+    """Lists systemd timers."""
     result = await run_command(["systemctl", "list-timers", "--no-pager"])
     if not result["success"] and not result["output"].strip():
-        raise HTTPException(
-            status_code=500,
-            detail=f"Failed to list timers: {result['error']}",
-        )
-
-    raw = result["output"]
+        raise HTTPException(status_code=500, detail=f"Failed to list timers: {result['error']}")
     return {
-        "timers": _parse_timers_output(raw),
-        "raw": raw,
+        "timers": _parse_timers_output(result["output"]),
+        "raw": result["output"],
     }
 
 
-@router.get("/info", summary="Basic system information")
+@router.get("/info", summary="System information with CPU/memory")
 async def system_info(
     _: str = Depends(verify_token),
 ) -> dict[str, Any]:
     """
-    Returns system uptime and load average.
-    Reads from /proc/uptime and /proc/loadavg; falls back to `uptime` command.
+    Returns system uptime, load average, CPU usage, memory, and swap.
+
+    CPU usage is measured over a 0.5s window from /proc/stat.
+    Memory/swap are read from /proc/meminfo.
     """
     uptime_str = ""
     load_str = ""
 
-    # Try /proc first (Linux)
+    # Uptime
     try:
         with open("/proc/uptime") as f:
             seconds_up = float(f.read().split()[0])
@@ -199,22 +201,43 @@
             hours = int((seconds_up % 86400) // 3600)
             minutes = int((seconds_up % 3600) // 60)
             uptime_str = f"{days}d {hours}h {minutes}m"
-    except (FileNotFoundError, ValueError):
+    except Exception:
         pass
 
+    # Load average
     try:
         with open("/proc/loadavg") as f:
             parts = f.read().split()
             load_str = f"{parts[0]}, {parts[1]}, {parts[2]}"
-    except (FileNotFoundError, ValueError):
+    except Exception:
         pass
 
-    # Fallback: use `uptime` command (works on macOS too)
+    # CPU usage (two samples, 0.5s apart)
+    cpu_info: dict[str, Any] = {}
+    try:
+        idle1, total1 = _read_cpu_stat()
+        await asyncio.sleep(0.5)
+        idle2, total2 = _read_cpu_stat()
+        total_delta = total2 - total1
+        if total_delta > 0:
+            usage = round((1 - (idle2 - idle1) / total_delta) * 100, 1)
+        else:
+            usage = 0.0
+        cpu_info = {
+            "usage_percent": usage,
+            "cores": os.cpu_count() or 1,
+        }
+    except Exception:
+        pass
+
+    # Memory + Swap
+    mem_info = _read_memory()
+
+    # Fallback for uptime/load if /proc wasn't available
     if not uptime_str or not load_str:
         result = await run_command(["uptime"])
         if result["success"]:
             raw = result["output"].strip()
-            # Parse "up X days, Y:Z,  N users,  load average: a, b, c"
             up_match = re.search(r"up\s+(.+?),\s+\d+\s+user", raw)
             if up_match:
                 uptime_str = uptime_str or up_match.group(1).strip()
@@ -225,4 +248,6 @@
     return {
         "uptime": uptime_str or "unavailable",
         "load": load_str or "unavailable",
+        "cpu": cpu_info or None,
+        **mem_info,
     }

--
Gitblit v1.3.1