import asyncio
import os
import re
from typing import Any

from fastapi import APIRouter, Depends, HTTPException

from app.auth import verify_token
from app.ops_runner import run_command, run_command_host, run_ops, run_ops_host

router = APIRouter()

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# One timestamp column of `systemctl list-timers`: "Day YYYY-MM-DD HH:MM:SS TZ",
# or a placeholder when the timer has no such time ("-"; "n/a" is accepted as
# well because some systemd versions appear to print that instead).
_TIMER_TS = r"(?:\w{3} \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \w+|-|n/a)"

# Columns in order: NEXT LEFT LAST PASSED UNIT ACTIVATES.
# The two timestamp columns and the ".timer" suffix act as anchors so that the
# free-form relative columns ("5h left", "18h ago") can contain spaces.
_TIMER_RE = re.compile(
    rf"^(?P<next>{_TIMER_TS})\s+(?P<left>.+?)\s+"
    rf"(?P<last>{_TIMER_TS})\s+(?P<passed>.+?)\s+"
    r"(?P<unit>\S+\.timer)\s+(?P<activates>\S+)"
)


def _parse_disk_output(raw: str) -> list[dict[str, str]]:
    """Parse df-style output into a list of filesystem dicts.

    Args:
        raw: Whitespace-separated table, optionally starting with a header
            row whose first word is "Filesystem" (case-insensitive).

    Returns:
        One dict per data row with at least five columns, keyed by
        ``filesystem``, ``size``, ``used``, ``available``, ``use_percent``
        and ``mount`` (empty string when the row has no sixth column).
        Rows with fewer than five columns are skipped.
    """
    filesystems: list[dict[str, str]] = []
    lines = raw.strip().splitlines()
    if not lines:
        return filesystems

    # Drop the header row only when it is actually present.
    data_lines = lines[1:] if re.match(r"(?i)filesystem", lines[0]) else lines
    for line in data_lines:
        parts = line.split()
        if len(parts) < 5:
            continue
        filesystems.append({
            "filesystem": parts[0],
            "size": parts[1],
            "used": parts[2],
            "available": parts[3],
            "use_percent": parts[4],
            "mount": parts[5] if len(parts) > 5 else "",
        })
    return filesystems


def _parse_health_output(raw: str) -> list[dict[str, str]]:
    """Parse health check output into check result dicts.

    Args:
        raw: Multi-line text where a line may look like ``[STATUS] message``.

    Returns:
        One ``{"status", "check"}`` dict per non-blank line. Lines without a
        leading ``[STATUS]`` tag are reported with status ``"INFO"`` and the
        whole (stripped) line as the check text.
    """
    checks: list[dict[str, str]] = []
    for line in raw.strip().splitlines():
        line = line.strip()
        if not line:
            continue
        match = re.match(r"^\[(\w+)\]\s*(.+)$", line)
        if match:
            checks.append({"status": match.group(1), "check": match.group(2)})
        else:
            checks.append({"status": "INFO", "check": line})
    return checks


def _parse_timers_output(raw: str) -> list[dict[str, str]]:
    """Parse `systemctl list-timers` by anchoring on timestamp and .timer patterns.

    Args:
        raw: Text output of ``systemctl list-timers``.

    Returns:
        One dict per timer row with the keys ``next``, ``left``, ``last``,
        ``passed``, ``unit`` and ``activates``, each value stripped of
        surrounding whitespace. Lines that do not match the row pattern
        (header, blank lines, the trailing summary) are ignored.
    """
    timers: list[dict[str, str]] = []
    for line in raw.strip().splitlines():
        m = _TIMER_RE.match(line)
        if m:
            timers.append({k: v.strip() for k, v in m.groupdict().items()})
    return timers


def _read_memory() -> dict[str, Any]:
    """Read memory and swap from /proc/meminfo.

    Returns:
        A dict with ``memory`` (total/used/available/percent) and ``swap``
        (total/used/free/percent) sub-dicts, sizes in bytes and percentages
        rounded to one decimal. Returns an empty dict when the file cannot
        be read or parsed, so callers can splat the result unconditionally.
    """
    info: dict[str, int] = {}
    try:
        with open("/proc/meminfo") as f:
            for line in f:
                parts = line.split()
                if len(parts) >= 2:
                    key = parts[0].rstrip(":")
                    info[key] = int(parts[1]) * 1024  # kB → bytes
    except (OSError, ValueError):
        # Best effort: missing/unreadable file or a non-numeric field.
        return {}

    mem_total = info.get("MemTotal", 0)
    mem_available = info.get("MemAvailable", 0)
    mem_used = mem_total - mem_available

    swap_total = info.get("SwapTotal", 0)
    swap_free = info.get("SwapFree", 0)
    swap_used = swap_total - swap_free

    return {
        "memory": {
            "total": mem_total,
            "used": mem_used,
            "available": mem_available,
            # Guard against division by zero when the total is unknown.
            "percent": round(mem_used / mem_total * 100, 1) if mem_total else 0,
        },
        "swap": {
            "total": swap_total,
            "used": swap_used,
            "free": swap_free,
            "percent": round(swap_used / swap_total * 100, 1) if swap_total else 0,
        },
    }


def _read_cpu_stat() -> tuple[int, int]:
    """Read idle and total jiffies from /proc/stat.

    Only the first line of the file is used.

    Returns:
        ``(idle, total)`` where ``idle`` is the fourth counter plus the fifth
        (idle + iowait, when a fifth counter exists) and ``total`` is the sum
        of all counters on the line.

    Raises:
        OSError: If /proc/stat cannot be opened.
        ValueError: If a counter is not an integer.
        IndexError: If the line has fewer than four counters.
    """
    with open("/proc/stat") as f:
        line = f.readline()
    parts = line.split()
    values = [int(x) for x in parts[1:]]  # parts[0] is the row label
    idle = values[3] + (values[4] if len(values) > 4 else 0)  # idle + iowait
    return idle, sum(values)


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@router.get("/disk", summary="Disk usage")
async def disk_usage(
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """Returns disk usage via `ops disk` (fallback: `df -h`)."""
    result = await run_ops(["disk"])
    raw = result["output"]

    # Fall back when the primary command failed or produced nothing usable.
    if not result["success"] or not raw.strip():
        fallback = await run_command(["df", "-h"])
        if not fallback["success"]:
            # The primary error is empty when `ops disk` "succeeded" with no
            # output, so report the fallback's error in that case.
            error = result["error"] or fallback.get("error")
            raise HTTPException(
                status_code=500,
                detail=f"Failed to get disk usage: {error}",
            )
        raw = fallback["output"]

    return {
        "filesystems": _parse_disk_output(raw),
        "raw": raw,
    }


@router.get("/health", summary="System health checks")
async def health_check(
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """Returns health check results via `ops health` on the host."""
    result = await run_ops_host(["health"])
    # A failing exit status with output is still reported to the caller;
    # only a failure with nothing to show is treated as an error.
    if not result["success"] and not result["output"].strip():
        raise HTTPException(
            status_code=500,
            detail=f"Failed to run health checks: {result['error']}",
        )
    return {
        "checks": _parse_health_output(result["output"]),
        "raw": result["output"],
    }


@router.get("/timers", summary="Systemd timers")
async def list_timers(
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """Lists systemd timers via nsenter on the host."""
    result = await run_command_host(["systemctl", "list-timers", "--no-pager"])
    # Same policy as /health: error out only when there is no output at all.
    if not result["success"] and not result["output"].strip():
        raise HTTPException(
            status_code=500,
            detail=f"Failed to list timers: {result['error']}",
        )
    return {
        "timers": _parse_timers_output(result["output"]),
        "raw": result["output"],
    }


@router.get("/info", summary="System information with CPU/memory")
async def system_info(
    _: str = Depends(verify_token),
) -> dict[str, Any]:
    """
    Returns system uptime, CPU usage, memory, and swap.

    CPU usage is measured over a 0.5s window from /proc/stat.
    Memory/swap are read from /proc/meminfo.
    """
    # Every section below is best effort: a failing probe leaves its default
    # value in place instead of failing the whole request.
    uptime_str = ""

    # Uptime
    try:
        with open("/proc/uptime") as f:
            seconds_up = float(f.read().split()[0])
        days = int(seconds_up // 86400)
        hours = int((seconds_up % 86400) // 3600)
        minutes = int((seconds_up % 3600) // 60)
        uptime_str = f"{days}d {hours}h {minutes}m"
    except (OSError, ValueError, IndexError):
        pass

    # CPU usage (two samples, 0.5s apart)
    cpu_info: dict[str, Any] = {}
    try:
        idle1, total1 = _read_cpu_stat()
        await asyncio.sleep(0.5)
        idle2, total2 = _read_cpu_stat()
        total_delta = total2 - total1
        if total_delta > 0:
            usage = round((1 - (idle2 - idle1) / total_delta) * 100, 1)
        else:
            # No counter progress between samples; avoid dividing by zero.
            usage = 0.0
        cpu_info = {
            "usage_percent": usage,
            "cores": os.cpu_count() or 1,
        }
    except (OSError, ValueError, IndexError):
        pass

    # Memory + Swap
    mem_info = _read_memory()

    # Container count
    containers_str = ""
    try:
        result = await run_command(["docker", "ps", "--format", "{{.State}}"])
        if result["success"]:
            states = [s for s in result["output"].strip().splitlines() if s]
            running = sum(1 for s in states if s == "running")
            containers_str = f"{running}/{len(states)}"
    except Exception:
        # run_command is opaque from here, so keep the broad catch to
        # preserve the best-effort behaviour.
        pass

    # Process count
    processes = 0
    try:
        with open("/proc/loadavg") as f:
            parts = f.read().split()
        if len(parts) >= 4:
            # /proc/loadavg field 4 is "running/total" processes
            processes = int(parts[3].split("/")[1])
    except (OSError, ValueError, IndexError):
        pass

    # Fallback for uptime if /proc wasn't available
    if not uptime_str:
        result = await run_command(["uptime"])
        if result["success"]:
            raw = result["output"].strip()
            up_match = re.search(r"up\s+(.+?),\s+\d+\s+user", raw)
            if up_match:
                uptime_str = up_match.group(1).strip()

    return {
        "uptime": uptime_str or "unavailable",
        "cpu": cpu_info or None,
        "containers": containers_str or "n/a",
        "processes": processes,
        **mem_info,
    }