import asyncio import os import re from typing import Any from fastapi import APIRouter, Depends, HTTPException from app.auth import verify_token from app.ops_runner import run_command, run_ops router = APIRouter() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _parse_disk_output(raw: str) -> list[dict[str, str]]: """Parse df-style output into a list of filesystem dicts.""" filesystems: list[dict[str, str]] = [] lines = raw.strip().splitlines() if not lines: return filesystems data_lines = lines[1:] if re.match(r"(?i)filesystem", lines[0]) else lines for line in data_lines: parts = line.split() if len(parts) >= 5: filesystems.append({ "filesystem": parts[0], "size": parts[1], "used": parts[2], "available": parts[3], "use_percent": parts[4], "mount": parts[5] if len(parts) > 5 else "", }) return filesystems def _parse_health_output(raw: str) -> list[dict[str, str]]: """Parse health check output into check result dicts.""" checks: list[dict[str, str]] = [] for line in raw.strip().splitlines(): line = line.strip() if not line: continue match = re.match(r"^\[(\w+)\]\s*(.+)$", line) if match: checks.append({"status": match.group(1), "check": match.group(2)}) else: checks.append({"status": "INFO", "check": line}) return checks def _parse_timers_output(raw: str) -> list[dict[str, str]]: """Parse `systemctl list-timers` output into timer dicts.""" timers: list[dict[str, str]] = [] lines = raw.strip().splitlines() if not lines: return timers header_idx = 0 for i, line in enumerate(lines): if re.match(r"(?i)next\s+left", line): header_idx = i break for line in lines[header_idx + 1:]: line = line.strip() if not line or line.startswith("timers listed") or line.startswith("To show"): continue parts = re.split(r"\s{2,}", line) if len(parts) >= 5: timers.append({ "next": parts[0], "left": parts[1], "last": parts[2], "passed": parts[3], "unit": parts[4], "activates": parts[5] if len(parts) > 5 else "", }) elif parts: timers.append({"unit": parts[0], "next": "", "left": "", "last": "", "passed": "", "activates": ""}) return timers def _read_memory() -> dict[str, Any]: """Read memory and swap from /proc/meminfo.""" info: dict[str, int] = {} try: with open("/proc/meminfo") as f: for line in f: parts = line.split() if len(parts) >= 2: key = parts[0].rstrip(":") info[key] = int(parts[1]) * 1024 # kB → bytes except Exception: return {} mem_total = info.get("MemTotal", 0) mem_available = info.get("MemAvailable", 0) mem_used = mem_total - mem_available swap_total = info.get("SwapTotal", 0) swap_free = info.get("SwapFree", 0) swap_used = swap_total - swap_free return { "memory": { "total": mem_total, "used": mem_used, "available": mem_available, "percent": round(mem_used / mem_total * 100, 1) if mem_total else 0, }, "swap": { "total": swap_total, "used": swap_used, "free": swap_free, "percent": round(swap_used / swap_total * 100, 1) if swap_total else 0, }, } def _read_cpu_stat() -> tuple[int, int]: """Read idle and total jiffies from /proc/stat.""" with open("/proc/stat") as f: line = f.readline() parts = line.split() values = [int(x) for x in parts[1:]] idle = values[3] + (values[4] if len(values) > 4 else 0) # idle + iowait return idle, sum(values) # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @router.get("/disk", summary="Disk usage") async def disk_usage( _: str = Depends(verify_token), ) -> dict[str, Any]: """Returns disk usage via `ops disk` (fallback: `df -h`).""" result = await run_ops(["disk"]) raw = result["output"] if not result["success"] or not raw.strip(): fallback = await run_command(["df", "-h"]) raw = fallback["output"] if not fallback["success"]: raise HTTPException(status_code=500, detail=f"Failed to get disk usage: {result['error']}") return { "filesystems": _parse_disk_output(raw), "raw": raw, } @router.get("/health", summary="System health checks") async def health_check( _: str = Depends(verify_token), ) -> dict[str, Any]: """Returns health check results via `ops health`.""" result = await run_ops(["health"]) if not result["success"] and not result["output"].strip(): raise HTTPException(status_code=500, detail=f"Failed to run health checks: {result['error']}") return { "checks": _parse_health_output(result["output"]), "raw": result["output"], } @router.get("/timers", summary="Systemd timers") async def list_timers( _: str = Depends(verify_token), ) -> dict[str, Any]: """Lists systemd timers.""" result = await run_command(["systemctl", "list-timers", "--no-pager"]) if not result["success"] and not result["output"].strip(): raise HTTPException(status_code=500, detail=f"Failed to list timers: {result['error']}") return { "timers": _parse_timers_output(result["output"]), "raw": result["output"], } @router.get("/info", summary="System information with CPU/memory") async def system_info( _: str = Depends(verify_token), ) -> dict[str, Any]: """ Returns system uptime, load average, CPU usage, memory, and swap. CPU usage is measured over a 0.5s window from /proc/stat. Memory/swap are read from /proc/meminfo. """ uptime_str = "" load_str = "" # Uptime try: with open("/proc/uptime") as f: seconds_up = float(f.read().split()[0]) days = int(seconds_up // 86400) hours = int((seconds_up % 86400) // 3600) minutes = int((seconds_up % 3600) // 60) uptime_str = f"{days}d {hours}h {minutes}m" except Exception: pass # Load average try: with open("/proc/loadavg") as f: parts = f.read().split() load_str = f"{parts[0]}, {parts[1]}, {parts[2]}" except Exception: pass # CPU usage (two samples, 0.5s apart) cpu_info: dict[str, Any] = {} try: idle1, total1 = _read_cpu_stat() await asyncio.sleep(0.5) idle2, total2 = _read_cpu_stat() total_delta = total2 - total1 if total_delta > 0: usage = round((1 - (idle2 - idle1) / total_delta) * 100, 1) else: usage = 0.0 cpu_info = { "usage_percent": usage, "cores": os.cpu_count() or 1, } except Exception: pass # Memory + Swap mem_info = _read_memory() # Fallback for uptime/load if /proc wasn't available if not uptime_str or not load_str: result = await run_command(["uptime"]) if result["success"]: raw = result["output"].strip() up_match = re.search(r"up\s+(.+?),\s+\d+\s+user", raw) if up_match: uptime_str = uptime_str or up_match.group(1).strip() load_match = re.search(r"load average[s]?:\s*(.+)$", raw, re.IGNORECASE) if load_match: load_str = load_str or load_match.group(1).strip() return { "uptime": uptime_str or "unavailable", "load": load_str or "unavailable", "cpu": cpu_info or None, **mem_info, }