import re from typing import Any from fastapi import APIRouter, Depends, HTTPException from app.auth import verify_token from app.ops_runner import run_command, run_ops router = APIRouter() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _parse_disk_output(raw: str) -> list[dict[str, str]]: """ Parse df-style output into a list of filesystem dicts. Handles both the standard df header and custom ops output. """ filesystems: list[dict[str, str]] = [] lines = raw.strip().splitlines() if not lines: return filesystems # Skip header line if present data_lines = lines[1:] if re.match(r"(?i)filesystem", lines[0]) else lines for line in data_lines: parts = line.split() if len(parts) >= 5: filesystems.append( { "filesystem": parts[0], "size": parts[1], "used": parts[2], "available": parts[3], "use_percent": parts[4], "mount": parts[5] if len(parts) > 5 else "", } ) return filesystems def _parse_health_output(raw: str) -> list[dict[str, str]]: """ Parse health check output into a list of check result dicts. Expects lines like: "[OK] Database connection" or "[FAIL] Disk space" Also handles plain text lines. """ checks: list[dict[str, str]] = [] for line in raw.strip().splitlines(): line = line.strip() if not line: continue match = re.match(r"^\[(\w+)\]\s*(.+)$", line) if match: checks.append({"status": match.group(1), "check": match.group(2)}) else: # Treat unstructured lines as informational checks.append({"status": "INFO", "check": line}) return checks def _parse_timers_output(raw: str) -> list[dict[str, str]]: """ Parse `systemctl list-timers` output into structured timer dicts. Header: NEXT LEFT LAST PASSED UNIT ACTIVATES """ timers: list[dict[str, str]] = [] lines = raw.strip().splitlines() if not lines: return timers # Skip header header_idx = 0 for i, line in enumerate(lines): if re.match(r"(?i)next\s+left", line): header_idx = i break for line in lines[header_idx + 1 :]: line = line.strip() if not line or line.startswith("timers listed") or line.startswith("To show"): continue # systemctl list-timers columns are variable-width; split carefully parts = re.split(r"\s{2,}", line) if len(parts) >= 5: timers.append( { "next": parts[0], "left": parts[1], "last": parts[2], "passed": parts[3], "unit": parts[4], "activates": parts[5] if len(parts) > 5 else "", } ) elif parts: timers.append({"unit": parts[0], "next": "", "left": "", "last": "", "passed": "", "activates": ""}) return timers # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @router.get("/disk", summary="Disk usage") async def disk_usage( _: str = Depends(verify_token), ) -> dict[str, Any]: """ Returns disk usage via `ops disk` (or falls back to `df -h`). """ result = await run_ops(["disk"]) raw = result["output"] if not result["success"] or not raw.strip(): # Fallback to df fallback = await run_command(["df", "-h"]) raw = fallback["output"] if not fallback["success"]: raise HTTPException( status_code=500, detail=f"Failed to get disk usage: {result['error']}", ) return { "filesystems": _parse_disk_output(raw), "raw": raw, } @router.get("/health", summary="System health checks") async def health_check( _: str = Depends(verify_token), ) -> dict[str, Any]: """ Returns health check results via `ops health`. """ result = await run_ops(["health"]) if not result["success"] and not result["output"].strip(): raise HTTPException( status_code=500, detail=f"Failed to run health checks: {result['error']}", ) raw = result["output"] return { "checks": _parse_health_output(raw), "raw": raw, } @router.get("/timers", summary="Systemd timers") async def list_timers( _: str = Depends(verify_token), ) -> dict[str, Any]: """ Lists systemd timers via `systemctl list-timers --no-pager`. """ result = await run_command(["systemctl", "list-timers", "--no-pager"]) if not result["success"] and not result["output"].strip(): raise HTTPException( status_code=500, detail=f"Failed to list timers: {result['error']}", ) raw = result["output"] return { "timers": _parse_timers_output(raw), "raw": raw, } @router.get("/info", summary="Basic system information") async def system_info( _: str = Depends(verify_token), ) -> dict[str, Any]: """ Returns system uptime and load average. Reads from /proc/uptime and /proc/loadavg; falls back to `uptime` command. """ uptime_str = "" load_str = "" # Try /proc first (Linux) try: with open("/proc/uptime") as f: seconds_up = float(f.read().split()[0]) days = int(seconds_up // 86400) hours = int((seconds_up % 86400) // 3600) minutes = int((seconds_up % 3600) // 60) uptime_str = f"{days}d {hours}h {minutes}m" except (FileNotFoundError, ValueError): pass try: with open("/proc/loadavg") as f: parts = f.read().split() load_str = f"{parts[0]}, {parts[1]}, {parts[2]}" except (FileNotFoundError, ValueError): pass # Fallback: use `uptime` command (works on macOS too) if not uptime_str or not load_str: result = await run_command(["uptime"]) if result["success"]: raw = result["output"].strip() # Parse "up X days, Y:Z, N users, load average: a, b, c" up_match = re.search(r"up\s+(.+?),\s+\d+\s+user", raw) if up_match: uptime_str = uptime_str or up_match.group(1).strip() load_match = re.search(r"load average[s]?:\s*(.+)$", raw, re.IGNORECASE) if load_match: load_str = load_str or load_match.group(1).strip() return { "uptime": uptime_str or "unavailable", "load": load_str or "unavailable", }