1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
| | import asyncio
| | import json
| | import os
| | import uuid
| | from typing import AsyncGenerator
| |
# Executable and script locations. Each default can be overridden through the
# environment variable of the same name.
OPS_CLI = os.getenv("OPS_CLI", "/opt/infrastructure/ops")
# NOTE(review): the "π" path segment looks like an extraction artifact
# (possibly a virtualenv directory name) — confirm the intended default.
OFFSITE_PYTHON = os.getenv("OFFSITE_PYTHON", "/opt/data/π/bin/python3")
OFFSITE_SCRIPT = os.getenv("OFFSITE_SCRIPT", "/opt/data/scripts/offsite.py")

# Command timeouts, in seconds.
_DEFAULT_TIMEOUT = 300
# Not referenced in this part of the file; presumably for long-running backups.
_BACKUP_TIMEOUT = 3600

# ---------------------------------------------------------------------------
# Operation registry — tracks running processes for cancel support
# ---------------------------------------------------------------------------
# op_id -> subprocess currently registered for that operation.
_active_ops: dict[str, asyncio.subprocess.Process] = {}
# op_ids flagged by cancel_op(); entries are removed by clear_cancelled().
_cancelled_ops: set[str] = set()
| |
| |
def new_op_id() -> str:
    """Return a fresh operation id: the first 12 hex digits of a random UUID4."""
    token = uuid.uuid4().hex
    return token[:12]
| |
| |
def register_op(op_id: str, proc: asyncio.subprocess.Process) -> None:
    """Remember *proc* under *op_id* so cancel_op() can find it later.

    An existing entry for the same id is overwritten.
    """
    _active_ops.update({op_id: proc})
| |
| |
def deregister_op(op_id: str) -> None:
    """Drop the process registered under *op_id*; unknown ids are ignored.

    The cancelled flag is intentionally left untouched: callers check
    is_cancelled() after the stream ends, and clear_cancelled() is what
    resets it.
    """
    if op_id in _active_ops:
        del _active_ops[op_id]
| |
| |
def clear_cancelled(op_id: str) -> None:
    """Reset the cancelled flag for *op_id*.

    Meant to be called once the generator has finished checking
    is_cancelled(). Clearing an id that was never flagged is a no-op.
    """
    if op_id in _cancelled_ops:
        _cancelled_ops.remove(op_id)
| |
| |
def cancel_op(op_id: str) -> bool:
    """Flag *op_id* as cancelled and ask its process to terminate.

    Returns:
        False when no process is registered under *op_id*; True otherwise,
        including when the process had already exited by the time the
        terminate request was sent.
    """
    if (target := _active_ops.get(op_id)) is None:
        return False

    # Set the flag before signalling so is_cancelled() is already true when
    # the stream consumer observes the process ending.
    _cancelled_ops.add(op_id)
    try:
        target.terminate()
    except ProcessLookupError:
        # The process is already gone; there is nothing left to signal.
        pass
    return True
| |
| |
def is_cancelled(op_id: str) -> bool:
    """Tell whether cancel_op() flagged *op_id* and the flag is still set."""
    flagged = op_id in _cancelled_ops
    return flagged
| |
# nsenter via Docker: run commands on the host from inside the container.
# Needed because ops backup/restore delegate to host Python venvs (3.12)
# that are incompatible with the container's Python (3.11).
_NSENTER_PREFIX = [
    # Throwaway, interactive, privileged helper container that shares the
    # host's PID and network namespaces.
    "docker", "run", "--rm", "-i",
    "--privileged", "--pid=host", "--network=host",
    "alpine",
    # Enter the namespaces of PID 1 (mount, UTS, IPC, network, PID); the
    # command to run is appended after "--".
    "nsenter", "-t", "1", "-m", "-u", "-i", "-n", "-p", "--",
]
| |
| |
# ---------------------------------------------------------------------------
# In-container execution (status, disk, health, docker commands)
# ---------------------------------------------------------------------------
| |
async def run_ops(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Invoke the ops CLI inside the container and capture its result.

    Args:
        args: Arguments placed after the ``OPS_CLI`` executable.
        timeout: Seconds to wait before the command is abandoned.

    Returns:
        The ``success`` / ``output`` / ``error`` dict built by ``_run_exec``.
    """
    command = [OPS_CLI] + args
    return await _run_exec(command, timeout=timeout)
| |
| |
async def run_ops_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Invoke the ops CLI with ``--json`` appended and decode its stdout.

    Returns:
        ``{"success", "data", "error"}``. On command failure ``error`` carries
        stderr, or stdout when stderr is empty. On undecodable output it
        carries the parser message plus the first 500 characters of stdout.
    """
    outcome = await run_ops(args + ["--json"], timeout=timeout)
    if not outcome["success"]:
        reason = outcome["error"] or outcome["output"]
        return {"success": False, "data": None, "error": reason}

    text = outcome["output"]
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as exc:
        raw = text[:500]
        return {
            "success": False,
            "data": None,
            "error": f"Failed to parse JSON: {exc}\nRaw: {raw}",
        }
    return {"success": True, "data": parsed, "error": ""}
| |
| |
async def stream_ops(
    args: list[str],
    timeout: int = _DEFAULT_TIMEOUT,
) -> AsyncGenerator[str, None]:
    """Yield the ops CLI's output line by line (in-container)."""
    command = [OPS_CLI] + args
    async for chunk in _stream_exec(command, timeout=timeout):
        yield chunk
| |
| |
async def run_command(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Run an arbitrary command inside the container and capture its result.

    *args* is the full argv (executable first). The return value is the
    ``success`` / ``output`` / ``error`` dict built by ``_run_exec``.
    """
    outcome = await _run_exec(args, timeout=timeout)
    return outcome
| |
| |
async def stream_command(
    args: list[str],
    timeout: int = _DEFAULT_TIMEOUT,
) -> AsyncGenerator[str, None]:
    """Yield an arbitrary command's output line by line (in-container)."""
    async for chunk in _stream_exec(args, timeout=timeout):
        yield chunk
| |
| |
# ---------------------------------------------------------------------------
# Host execution (backup, restore — needs host Python venvs)
# ---------------------------------------------------------------------------
| |
async def run_ops_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Invoke the ops CLI on the host (through the nsenter prefix).

    Args:
        args: Arguments placed after the ``OPS_CLI`` executable.
        timeout: Seconds to wait before the command is abandoned.

    Returns:
        The ``success`` / ``output`` / ``error`` dict built by ``_run_exec``.
    """
    command = _NSENTER_PREFIX + [OPS_CLI] + args
    return await _run_exec(command, timeout=timeout)
| |
| |
async def run_ops_host_json(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Invoke the ops CLI on the host with ``--json`` and decode its stdout.

    Returns:
        ``{"success", "data", "error"}``. On command failure ``error`` carries
        stderr, or stdout when stderr is empty. On undecodable output it
        carries the parser message plus the first 500 characters of stdout.
    """
    outcome = await run_ops_host(args + ["--json"], timeout=timeout)
    if not outcome["success"]:
        reason = outcome["error"] or outcome["output"]
        return {"success": False, "data": None, "error": reason}

    text = outcome["output"]
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as exc:
        raw = text[:500]
        return {
            "success": False,
            "data": None,
            "error": f"Failed to parse JSON: {exc}\nRaw: {raw}",
        }
    return {"success": True, "data": parsed, "error": ""}
| |
| |
async def stream_ops_host(
    args: list[str],
    timeout: int = _DEFAULT_TIMEOUT,
    op_id: str | None = None,
) -> AsyncGenerator[str, None]:
    """Yield the ops CLI's output line by line, run on the host via nsenter.

    *op_id* is passed through to ``_stream_exec``.
    """
    command = _NSENTER_PREFIX + [OPS_CLI] + args
    async for chunk in _stream_exec(command, timeout=timeout, op_id=op_id):
        yield chunk
| |
| |
async def run_command_host(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Run an arbitrary command on the host (through the nsenter prefix).

    *args* is the argv to execute on the host. The return value is the
    ``success`` / ``output`` / ``error`` dict built by ``_run_exec``.
    """
    command = _NSENTER_PREFIX + args
    return await _run_exec(command, timeout=timeout)
| |
| |
async def stream_command_host(
    args: list[str],
    timeout: int = _DEFAULT_TIMEOUT,
    op_id: str | None = None,
) -> AsyncGenerator[str, None]:
    """Yield an arbitrary host command's output line by line via nsenter.

    *op_id* is passed through to ``_stream_exec``.
    """
    command = _NSENTER_PREFIX + args
    async for chunk in _stream_exec(command, timeout=timeout, op_id=op_id):
        yield chunk
| |
| |
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
| |
async def _run_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Run a command to completion and capture its output.

    Args:
        args: Executable followed by its arguments (no shell is involved).
        timeout: Maximum number of seconds to wait for the command to finish.

    Returns:
        A dict with ``success`` (True only when the exit code is 0),
        ``output`` (decoded stdout) and ``error`` (decoded stderr, or a
        description of why the command could not be run or was killed).

    Raises:
        asyncio.CancelledError: Propagated when the awaiting task is
            cancelled; the child process is killed first so it is not leaked.
    """
    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                # The process exited between the timeout firing and the kill;
                # still report the timeout rather than an empty error string.
                pass
            # Reap the process and drain its pipes.
            await proc.communicate()
            return {"success": False, "output": "", "error": f"Command timed out after {timeout}s"}

        return {
            "success": proc.returncode == 0,
            "output": stdout.decode("utf-8", errors="replace"),
            "error": stderr.decode("utf-8", errors="replace"),
        }
    except FileNotFoundError as exc:
        return {"success": False, "output": "", "error": f"Executable not found: {exc}"}
    except Exception as exc:
        return {"success": False, "output": "", "error": str(exc)}
    finally:
        # Reached with a live process only on abnormal exits — most notably
        # task cancellation, which is a BaseException and therefore bypasses
        # the handlers above. Kill the child so it does not outlive the call.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
| |
| |
async def _stream_exec(args: list[str], timeout: int = _DEFAULT_TIMEOUT, op_id: str | None = None) -> AsyncGenerator[str, None]:
    """Execute a command and yield interleaved stdout/stderr lines.

    Args:
        args: Executable followed by its arguments (no shell is involved).
        timeout: Idle timeout in seconds, applied to each line read on each
            stream independently.
        op_id: When given, the process is registered under this id for the
            duration of the stream so that cancel_op() can terminate it.

    Yields:
        stdout lines as-is and stderr lines prefixed with ``[stderr] ``, both
        without the trailing newline. A stream that stays silent for
        *timeout* seconds yields one ``[timeout] ...`` line and is not read
        further. A process that cannot be started yields a single
        ``[error] ...`` line.

    If the consumer stops early (task cancellation or generator close), or a
    read timed out and the process is still alive once both streams are
    finished, the process is terminated and, if it ignores that, killed.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError as exc:
        yield f"[error] Executable not found: {exc}"
        return
    except Exception as exc:
        yield f"[error] Failed to start process: {exc}"
        return

    if op_id:
        register_op(op_id, proc)

    # Seconds granted to the process to exit after terminate() before kill().
    grace = 10

    async def _read_line(stream):
        """Return the next raw line, b"" at EOF, or None on idle timeout."""
        try:
            return await asyncio.wait_for(stream.readline(), timeout=timeout)
        except asyncio.TimeoutError:
            return None

    async def _stop_process():
        """Terminate the process, escalating to kill after the grace period."""
        try:
            proc.terminate()
        except ProcessLookupError:
            pass
        try:
            await asyncio.wait_for(proc.wait(), timeout=grace)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            await proc.wait()

    # In-flight read task -> (stream it reads from, prefix for its lines).
    readers = {}
    timed_out = False
    try:
        readers[asyncio.create_task(_read_line(proc.stdout))] = (proc.stdout, "")
        readers[asyncio.create_task(_read_line(proc.stderr))] = (proc.stderr, "[stderr] ")

        while readers:
            done, _ = await asyncio.wait(list(readers), return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                stream, prefix = readers.pop(task)
                raw = task.result()
                if raw is None:
                    # Idle timeout: report it and stop reading this stream.
                    timed_out = True
                    yield f"{prefix}[timeout] Command exceeded {timeout}s"
                elif raw:
                    yield prefix + raw.decode("utf-8", errors="replace").rstrip("\n")
                    # Re-arm the reader only after the consumer took the line.
                    readers[asyncio.create_task(_read_line(stream))] = (stream, prefix)
                # b"" means EOF: the stream is simply not re-armed.

        if timed_out and proc.returncode is None:
            # Nothing reads the pipes any more, so a process that is still
            # alive may never exit on its own; waiting would block forever.
            await _stop_process()
        await proc.wait()
    finally:
        try:
            # Reached with pending readers or a live process only on early
            # exit: browser disconnected, generator closed, or a read failed.
            for task in readers:
                task.cancel()
            if proc.returncode is None:
                await _stop_process()
        finally:
            if op_id:
                deregister_op(op_id)
|
|