Matthias Nott
9 days ago 66e5a4ae432d67335d6ea36607e98d3c14959f3d
docs: message system rewrite spec - append-only log, sync routing, state machine
1 files added
Notes/SPEC-message-rewrite.md
....@@ -0,0 +1,141 @@
# PAILot Message System Rewrite Spec

## Problem

The current message handling has accumulated race conditions from incremental fixes:
- `switchSession` is async, so messages arriving during the async gap get overwritten by `loadAll`
- iOS kills the MQTT socket in the background, but the client still reports "connected"
- File corruption from concurrent read/write on the same session file
- Notification tap triggers `switchSession`, which reloads from disk and loses in-memory messages
- `addMessage` and `switchSession` compete on the same state and disk files

## Architecture: Single Message Bus

### 1. One MQTT topic for everything

- **Server**: publish ALL outbound messages to `pailot/out` (DONE)
- **App**: subscribe to `pailot/out` + `pailot/sessions` + `pailot/status` + `pailot/control/out`
- Every message carries `type`, `sessionId`, `seq` in the payload
- Client routes by `sessionId`, never by topic

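As a rough illustration of the payload rule above, the sketch below decodes one `pailot/out` payload and extracts the three routing fields. The `Envelope` class and `tryParse` are hypothetical names, and treating `seq` as an integer is an assumption; the spec only says that `type`, `sessionId`, and `seq` are present.

```dart
import 'dart:convert';

/// Hypothetical envelope for a `pailot/out` payload. Only the field names
/// `type`, `sessionId` and `seq` come from the spec; the rest is assumed.
class Envelope {
  const Envelope(this.type, this.sessionId, this.seq, this.raw);

  final String type;
  final String sessionId;
  final int seq; // Assumption: seq is an integer.
  final Map<String, dynamic> raw;

  /// Returns null when the payload is not a JSON object or a routing
  /// field is missing or has an unexpected type.
  static Envelope? tryParse(String payload) {
    Object? decoded;
    try {
      decoded = jsonDecode(payload);
    } on FormatException {
      return null;
    }
    if (decoded is! Map<String, dynamic>) return null;
    final type = decoded['type'];
    final sessionId = decoded['sessionId'];
    final seq = decoded['seq'];
    if (type is! String || sessionId is! String || seq is! int) return null;
    return Envelope(type, sessionId, seq, decoded);
  }
}
```

A client could call `Envelope.tryParse` on every payload and dispatch on `sessionId` alone, dropping anything that returns null.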
### 2. MessageStore redesign

Replace per-session debounced saves with a single append-only log:

```
~/.../messages/log.jsonl     # append-only, one JSON line per message
~/.../messages/index.json    # { sessionId: [lineNumbers] } for fast lookup
```

**Operations:**
- `append(message)`: append one line to `log.jsonl` (sync, atomic, no race)
- `loadSession(sessionId)`: read index, seek to lines, return messages
- `compact()`: rewrite log removing old messages (run on app start, not during use)

**Benefits:**
- No per-session files, so no file-level races
- Append-only, so no read-modify-write cycle
- No debounce needed: each append is a single `writeAsStringSync` with `mode: FileMode.append`

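The operations above could be sketched roughly as follows. This is a minimal illustration, not the planned `MessageStoreV2`: the class name `MessageLog`, its constructor argument, and `saveIndex` are hypothetical, and `loadSession` reads the whole file rather than seeking, purely to keep the example short. `compact()` is omitted.

```dart
import 'dart:convert';
import 'dart:io';

/// Sketch of the append-only store. Names are hypothetical; only
/// `log.jsonl`, `index.json` and `FileMode.append` come from the spec.
class MessageLog {
  MessageLog(String dir)
      : _log = File('$dir/log.jsonl'),
        _indexFile = File('$dir/index.json') {
    Directory(dir).createSync(recursive: true);
    _rebuildIndex();
  }

  final File _log;
  final File _indexFile;
  final Map<String, List<int>> _index = {}; // sessionId -> line numbers
  int _lineCount = 0;

  /// Appends one JSON line synchronously and records its line number.
  void append(Map<String, dynamic> message) {
    final sessionId = message['sessionId'] as String;
    _log.writeAsStringSync('${jsonEncode(message)}\n',
        mode: FileMode.append, flush: true);
    _index.putIfAbsent(sessionId, () => <int>[]).add(_lineCount++);
  }

  /// Returns the session's messages in log order.
  /// Assumption: reads the whole file instead of seeking, for brevity.
  List<Map<String, dynamic>> loadSession(String sessionId) {
    final wanted = _index[sessionId];
    if (wanted == null) return [];
    final lines = _log.readAsLinesSync();
    return [
      for (final n in wanted) jsonDecode(lines[n]) as Map<String, dynamic>,
    ];
  }

  /// Writes the index in the `{ sessionId: [lineNumbers] }` shape.
  /// This sketch rebuilds from the log on start instead of reading it back.
  void saveIndex() => _indexFile.writeAsStringSync(jsonEncode(_index));

  /// Rebuilds the index by scanning the log; unparseable lines are skipped.
  void _rebuildIndex() {
    if (!_log.existsSync()) return;
    final lines = _log.readAsLinesSync();
    for (var n = 0; n < lines.length; n++) {
      try {
        final sessionId = (jsonDecode(lines[n]) as Map)['sessionId'] as String;
        _index.putIfAbsent(sessionId, () => <int>[]).add(n);
      } catch (_) {
        // Line is not a valid message object: leave it unindexed.
      }
    }
    _lineCount = lines.length;
  }
}
```

Rebuilding the index from the log keeps `log.jsonl` as the single source of truth, at the cost of one full scan at startup; whether that cost is acceptable depends on how large the log is allowed to grow between compactions.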
### 3. Connection state machine

```
States: disconnected → connecting → connected → suspended → reconnecting → connected
 ↑ |
 └──────────────────────────┘

Transitions:
- App launch: disconnected → connecting → connected
- App background: connected → suspended (keep client, mark state)
- App resume: suspended → reconnecting → connected (always force-reconnect)
- Connection lost: connected → reconnecting → connected (autoReconnect)
- User disconnect: any → disconnected
```

**Key rule:** In `suspended` state, do NOT process buffered MQTT messages. Process them only after reconnect + catch_up completes. This prevents the race where a buffered message is added to state before `loadAll` overwrites it.

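One way to encode the transitions and the key rule is a small table-driven state machine, sketched below. The names `ConnState`, `ConnectionStateMachine`, `transition`, and `onRawMessage` are hypothetical. The sketch assumes that entering `connected` happens only after catch_up has finished, and that held messages are delivered at that point rather than discarded.

```dart
enum ConnState { disconnected, connecting, connected, suspended, reconnecting }

/// Legal forward transitions, transcribed from the list above.
/// "any → disconnected" is handled separately in [transition].
const Map<ConnState, Set<ConnState>> _allowed = {
  ConnState.disconnected: {ConnState.connecting},
  ConnState.connecting: {ConnState.connected},
  ConnState.connected: {ConnState.suspended, ConnState.reconnecting},
  ConnState.suspended: {ConnState.reconnecting},
  ConnState.reconnecting: {ConnState.connected},
};

class ConnectionStateMachine {
  ConnectionStateMachine(this._deliver);

  final void Function(Map<String, dynamic>) _deliver;
  final List<Map<String, dynamic>> _held = [];
  ConnState _state = ConnState.disconnected;

  ConnState get state => _state;

  /// Moves to [next] if the table allows it; returns whether it did.
  bool transition(ConnState next) {
    final legal = next == ConnState.disconnected ||
        (_allowed[_state]?.contains(next) ?? false);
    if (!legal) return false;
    _state = next;
    // Assumption: reaching `connected` means catch_up is complete.
    if (next == ConnState.connected) _flush();
    return true;
  }

  /// Delivers immediately only while connected; otherwise holds the message.
  void onRawMessage(Map<String, dynamic> message) {
    if (_state == ConnState.connected) {
      _deliver(message);
    } else {
      _held.add(message);
    }
  }

  void _flush() {
    final pending = List.of(_held);
    _held.clear();
    pending.forEach(_deliver);
  }
}
```

Gating on `connected` rather than on `suspended` alone also covers messages that arrive while the client is in `connecting` or `reconnecting`.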
### 4. Message routing (synchronous, no async gaps)

```dart
void _onMessage(Map<String, dynamic> json) {
  final type = json['type'] as String?;
  final sessionId = json['sessionId'] as String?;
  final currentId = _currentSessionId;

  if (type == 'text' || type == 'voice' || type == 'image') {
    // Parse once so the log and the UI share the same Message
    final message = Message.fromMqtt(json);

    // Append to log immediately (sync)
    MessageStore.append(message);

    // Display only if for current session
    if (sessionId == currentId) {
      _messages.add(message);
      notifyListeners(); // or setState
    } else {
      _incrementUnread(sessionId);
    }
  }
}
```

No async. No switchSession during message handling. No race. Because the handler contains no `await`, it runs to completion before `switchSession` or another message handler can interleave.

### 5. Session switching

```dart
void switchSession(String sessionId) {
  _currentSessionId = sessionId;
  _messages = MessageStore.loadSession(sessionId); // sync read from index
  notifyListeners();
}
```

Synchronous. No async gap. No race with incoming messages.

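To see routing and switching work together without the app's state class, the following self-contained sketch uses an in-memory list in place of the log. `InMemoryLog` and `ChatState` are hypothetical stand-ins, messages are plain maps instead of `Message` objects, and clearing the unread count on switch is an assumption the spec does not state.

```dart
/// Hypothetical in-memory stand-in for the append-only log.
class InMemoryLog {
  final List<Map<String, dynamic>> _entries = [];

  void append(Map<String, dynamic> message) => _entries.add(message);

  List<Map<String, dynamic>> loadSession(String sessionId) => [
        for (final m in _entries)
          if (m['sessionId'] == sessionId) m,
      ];
}

/// Minimal model of the routing and switching rules; both methods are
/// synchronous, so neither can observe the other half-finished.
class ChatState {
  ChatState(this._log, this.currentSessionId);

  final InMemoryLog _log;
  String currentSessionId;
  List<Map<String, dynamic>> messages = [];
  final Map<String, int> unread = {};

  void onMessage(Map<String, dynamic> json) {
    final sessionId = json['sessionId'] as String?;
    if (sessionId == null) return;
    _log.append(json); // Always logged, whichever session is visible.
    if (sessionId == currentSessionId) {
      messages.add(json);
    } else {
      unread[sessionId] = (unread[sessionId] ?? 0) + 1;
    }
  }

  void switchSession(String sessionId) {
    currentSessionId = sessionId;
    messages = _log.loadSession(sessionId);
    unread.remove(sessionId); // Assumption: switching marks the session read.
  }
}
```

A message that arrives for a background session is already in the log by the time `switchSession` runs, so the switch picks it up from there instead of depending on in-memory state.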
### 6. Resume flow

```dart
// Declared async because the reconnect is awaited
Future<void> onResume() async {
  state = suspended;
  // Kill old client (disable autoReconnect first)
  _client?.autoReconnect = false;
  _client?.disconnect();
  _client = null;

  // Fast reconnect to last host (suspended → reconnecting)
  state = reconnecting;
  await _fastReconnect(connectedHost);

  // Now process: sync → sessions → catch_up
  // catch_up messages go through same _onMessage path (append + display if current)
  state = connected;
}
```

### 7. Notification tap

```dart
void onNotificationTap(String sessionId) {
  if (sessionId != _currentSessionId) {
    switchSession(sessionId); // sync, no async
  }
  // Message is already in the log from MQTT delivery or catch_up
  // switchSession loads it
}
```

## Migration path

1. Create `MessageStoreV2` with append-only log
2. Create `ConnectionStateMachine` with explicit states
3. Rewrite `_handleIncomingMessage` to use sync append
4. Rewrite `switchSession` to be sync
5. Remove debounced saves, per-session file locks, merge protection
6. Test each step before moving to the next

## What stays the same

- MQTT transport (mqtt_client package)
- aedes broker with loopback client on server
- Single `pailot/out` topic
- APNs push notifications
- Splash screen
- UI components (chat bubbles, drawer, settings)