| .. | .. |
|---|
| 1 | +# PAILot Message System Rewrite Spec |
|---|
| 2 | + |
|---|
| 3 | +## Problem |
|---|
| 4 | + |
|---|
| 5 | +The current message handling has accumulated race conditions from incremental fixes: |
|---|
| 6 | +- `switchSession` is async — messages arriving during the async gap get overwritten by `loadAll` |
|---|
| 7 | +- iOS kills the MQTT socket in background but the client reports "connected" |
|---|
| 8 | +- File corruption from concurrent read/write on the same session file |
|---|
| 9 | +- Notification tap triggers `switchSession` which reloads from disk, losing in-memory messages |
|---|
| 10 | +- `addMessage` and `switchSession` compete on the same state and disk files |
|---|
| 11 | + |
|---|
| 12 | +## Architecture: Single Message Bus |
|---|
| 13 | + |
|---|
| 14 | +### 1. One MQTT topic for everything |
|---|
| 15 | + |
|---|
| 16 | +- **Server**: publish ALL outbound messages to `pailot/out` (DONE) |
|---|
| 17 | +- **App**: subscribe to `pailot/out` + `pailot/sessions` + `pailot/status` + `pailot/control/out` |
|---|
| 18 | +- Every message carries `type`, `sessionId`, `seq` in the payload |
|---|
| 19 | +- Client routes by `sessionId` — never by topic |
|---|
| 20 | + |
|---|
| 21 | +### 2. MessageStore redesign |
|---|
| 22 | + |
|---|
| 23 | +Replace per-session debounced saves with a single append-only log: |
|---|
| 24 | + |
|---|
| 25 | +``` |
|---|
| 26 | +~/.../messages/log.jsonl — append-only, one JSON line per message |
|---|
| 27 | +~/.../messages/index.json — { sessionId: [lineNumbers] } for fast lookup |
|---|
| 28 | +``` |
|---|
| 29 | + |
|---|
| 30 | +**Operations:** |
|---|
| 31 | +- `append(message)` — append one line to log.jsonl (sync, atomic, no race) |
|---|
| 32 | +- `loadSession(sessionId)` — read index, seek to lines, return messages |
|---|
| 33 | +- `compact()` — rewrite log removing old messages (run on app start, not during use) |
|---|
| 34 | + |
|---|
| 35 | +**Benefits:** |
|---|
| 36 | +- No per-session files — no file-level races |
|---|
| 37 | +- Append-only — no read-modify-write cycle |
|---|
| 38 | +- No debounce needed — each append is a single `writeAsStringSync` with `mode: FileMode.append` |
|---|
| 39 | + |
|---|
| 40 | +### 3. Connection state machine |
|---|
| 41 | + |
|---|
| 42 | +``` |
|---|
| 43 | +States: disconnected → connecting → connected → suspended → reconnecting → connected |
|---|
| 44 | + ↑ | |
|---|
| 45 | + └──────────────────────────┘ |
|---|
| 46 | + |
|---|
| 47 | +Transitions: |
|---|
| 48 | +- App launch: disconnected → connecting → connected |
|---|
| 49 | +- App background: connected → suspended (keep client, mark state) |
|---|
| 50 | +- App resume: suspended → reconnecting → connected (always force-reconnect) |
|---|
| 51 | +- Connection lost: connected → reconnecting → connected (autoReconnect) |
|---|
| 52 | +- User disconnect: any → disconnected |
|---|
| 53 | +``` |
|---|
| 54 | + |
|---|
| 55 | +**Key rule:** In `suspended` state, do NOT process buffered MQTT messages. Process them only after reconnect + catch_up completes. This prevents the race where a buffered message is added to state before `loadAll` overwrites it. |
|---|
| 56 | + |
|---|
| 57 | +### 4. Message routing (synchronous, no async gaps) |
|---|
| 58 | + |
|---|
| 59 | +```dart |
|---|
| 60 | +void _onMessage(Map<String, dynamic> json) { |
|---|
| 61 | + final type = json['type'] as String?; |
|---|
| 62 | + final sessionId = json['sessionId'] as String?; |
|---|
| 63 | + final currentId = _currentSessionId; |
|---|
| 64 | + |
|---|
| 65 | + if (type == 'text' || type == 'voice' || type == 'image') { |
|---|
| 66 | + // Append to log immediately (sync) |
|---|
| 67 | + MessageStore.append(Message.fromMqtt(json)); |
|---|
| 68 | + |
|---|
| 69 | + // Display only if for current session |
|---|
| 70 | + if (sessionId == currentId) { |
|---|
| 71 | + _messages.add(Message.fromMqtt(json)); |
|---|
| 72 | + notifyListeners(); // or setState |
|---|
| 73 | + } else { |
|---|
| 74 | + _incrementUnread(sessionId); |
|---|
| 75 | + } |
|---|
| 76 | + } |
|---|
| 77 | +} |
|---|
| 78 | +``` |
|---|
| 79 | + |
|---|
| 80 | +No async. No switchSession during message handling. No race. |
|---|
| 81 | + |
|---|
| 82 | +### 5. Session switching |
|---|
| 83 | + |
|---|
| 84 | +```dart |
|---|
| 85 | +void switchSession(String sessionId) { |
|---|
| 86 | + _currentSessionId = sessionId; |
|---|
| 87 | + _messages = MessageStore.loadSession(sessionId); // sync read from index |
|---|
| 88 | + notifyListeners(); |
|---|
| 89 | +} |
|---|
| 90 | +``` |
|---|
| 91 | + |
|---|
| 92 | +Synchronous. No async gap. No race with incoming messages. |
|---|
| 93 | + |
|---|
| 94 | +### 6. Resume flow |
|---|
| 95 | + |
|---|
| 96 | +```dart |
|---|
| 97 | +void onResume() { |
|---|
| 98 | + state = suspended; |
|---|
| 99 | + // Kill old client (disable autoReconnect first) |
|---|
| 100 | + _client?.autoReconnect = false; |
|---|
| 101 | + _client?.disconnect(); |
|---|
| 102 | + _client = null; |
|---|
| 103 | + |
|---|
| 104 | + // Fast reconnect to last host |
|---|
| 105 | + await _fastReconnect(connectedHost); |
|---|
| 106 | + |
|---|
| 107 | + // Now process: sync → sessions → catch_up |
|---|
| 108 | + // catch_up messages go through same _onMessage path (append + display if current) |
|---|
| 109 | + state = connected; |
|---|
| 110 | +} |
|---|
| 111 | +``` |
|---|
| 112 | + |
|---|
| 113 | +### 7. Notification tap |
|---|
| 114 | + |
|---|
| 115 | +```dart |
|---|
| 116 | +void onNotificationTap(String sessionId) { |
|---|
| 117 | + if (sessionId != _currentSessionId) { |
|---|
| 118 | + switchSession(sessionId); // sync, no async |
|---|
| 119 | + } |
|---|
| 120 | + // Message is already in the log from MQTT delivery or catch_up |
|---|
| 121 | + // switchSession loads it |
|---|
| 122 | +} |
|---|
| 123 | +``` |
|---|
| 124 | + |
|---|
| 125 | +## Migration path |
|---|
| 126 | + |
|---|
| 127 | +1. Create `MessageStoreV2` with append-only log |
|---|
| 128 | +2. Create `ConnectionStateMachine` with explicit states |
|---|
| 129 | +3. Rewrite `_handleIncomingMessage` to use sync append |
|---|
| 130 | +4. Rewrite `switchSession` to be sync |
|---|
| 131 | +5. Remove debounced saves, per-session file locks, merge protection |
|---|
| 132 | +6. Test each step before moving to next |
|---|
| 133 | + |
|---|
| 134 | +## What stays the same |
|---|
| 135 | + |
|---|
| 136 | +- MQTT transport (mqtt_client package) |
|---|
| 137 | +- aedes broker with loopback client on server |
|---|
| 138 | +- Single `pailot/out` topic |
|---|
| 139 | +- APNs push notifications |
|---|
| 140 | +- Splash screen |
|---|
| 141 | +- UI components (chat bubbles, drawer, settings) |
|---|