Matthias Nott
2026-03-24 cb470d33d2665fcc6f8448d2736777656cf0cbe7
feat: MQTT migration, offline catch_up, clean session, image support
7 files modified
1 file deleted
changed files
lib/models/server_config.dart patch | view | blame | history
lib/providers/providers.dart patch | view | blame | history
lib/screens/chat_screen.dart patch | view | blame | history
lib/screens/navigate_screen.dart patch | view | blame | history
lib/screens/settings_screen.dart patch | view | blame | history
lib/services/mqtt_service.dart patch | view | blame | history
lib/services/websocket_service.dart patch | view | blame | history
lib/widgets/status_dot.dart patch | view | blame | history
lib/models/server_config.dart
....@@ -13,23 +13,6 @@
1313 this.mqttToken,
1414 });
1515
16
- /// Primary WebSocket URL (local network).
17
- String get localUrl {
18
- final h = localHost ?? host;
19
- return 'ws://$h:$port';
20
- }
21
-
22
- /// Fallback WebSocket URL (remote / public).
23
- String get remoteUrl => 'ws://$host:$port';
24
-
25
- /// Returns [localUrl, remoteUrl] for dual-connect attempts.
26
- List<String> get urls {
27
- if (localHost != null && localHost!.isNotEmpty && localHost != host) {
28
- return [localUrl, remoteUrl];
29
- }
30
- return [remoteUrl];
31
- }
32
-
3316 Map<String, dynamic> toJson() {
3417 return {
3518 'host': host,
lib/providers/providers.dart
....@@ -8,7 +8,7 @@
88 import '../models/server_config.dart';
99 import '../models/session.dart';
1010 import '../services/message_store.dart';
11
-import '../services/websocket_service.dart' show ConnectionStatus;
11
+import '../services/mqtt_service.dart' show ConnectionStatus;
1212
1313 // --- Enums ---
1414
lib/screens/chat_screen.dart
....@@ -56,6 +56,9 @@
5656 bool _isCatchingUp = false;
5757 bool _screenshotForChat = false;
5858 final Set<int> _seenSeqs = {};
59
+ bool _sessionReady = false;
60
+ final List<Map<String, dynamic>> _pendingMessages = [];
61
+ final Map<String, List<Message>> _catchUpPending = {};
5962
6063 @override
6164 void initState() {
....@@ -66,9 +69,14 @@
6669 }
6770
6871 Future<void> _initAll() async {
69
- // Load lastSeq BEFORE connecting so catch_up sends the right value
72
+ // Load persisted state BEFORE connecting
7073 final prefs = await SharedPreferences.getInstance();
7174 _lastSeq = prefs.getInt('lastSeq') ?? 0;
75
+ // Restore last active session so catch_up routes to the right session
76
+ final savedSessionId = prefs.getString('activeSessionId');
77
+ if (savedSessionId != null && mounted) {
78
+ ref.read(activeSessionIdProvider.notifier).state = savedSessionId;
79
+ }
7280 if (!mounted) return;
7381
7482 // Listen for playback state changes to reset play button UI
....@@ -146,10 +154,11 @@
146154 };
147155 _ws!.onMessage = _handleMessage;
148156 _ws!.onOpen = () {
157
+ _sessionReady = false; // Gate messages until sessions arrive
158
+ _pendingMessages.clear();
149159 final activeId = ref.read(activeSessionIdProvider);
150160 _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
151
- // catch_up is still available during the transition period
152
- _sendCommand('catch_up', {'lastSeq': _lastSeq});
161
+ // catch_up is sent after sessions arrive (in _handleSessions)
153162 };
154163 _ws!.onError = (error) {
155164 debugPrint('MQTT error: $error');
....@@ -168,6 +177,14 @@
168177 }
169178
170179 void _handleMessage(Map<String, dynamic> msg) {
180
+ final type = msg['type'] as String?;
181
+ // Sessions and catch_up always process immediately
182
+ // Content messages (text, voice, image) wait until session is ready
183
+ if (!_sessionReady && type != 'sessions' && type != 'catch_up' && type != 'status' && type != 'typing') {
184
+ _pendingMessages.add(msg);
185
+ return;
186
+ }
187
+
171188 // Track sequence numbers for catch_up protocol
172189 final seq = msg['seq'] as int?;
173190 if (seq != null) {
....@@ -184,8 +201,6 @@
184201 _saveLastSeq();
185202 }
186203 }
187
-
188
- final type = msg['type'] as String?;
189204
190205 switch (type) {
191206 case 'sessions':
....@@ -231,7 +246,8 @@
231246 if (sessionId != null) _incrementUnread(sessionId);
232247 case 'catch_up':
233248 final serverSeq = msg['serverSeq'] as int?;
234
- if (serverSeq != null && serverSeq > _lastSeq) {
249
+ if (serverSeq != null) {
250
+ // Always sync to server's seq — if server restarted, its seq may be lower
235251 _lastSeq = serverSeq;
236252 _saveLastSeq();
237253 }
....@@ -241,19 +257,91 @@
241257 final catchUpMsgs = msg['messages'] as List<dynamic>?;
242258 if (catchUpMsgs != null && catchUpMsgs.isNotEmpty) {
243259 _isCatchingUp = true;
260
+ final activeId = ref.read(activeSessionIdProvider);
244261 final existing = ref.read(messagesProvider);
245262 final existingContents = existing
246263 .where((m) => m.role == MessageRole.assistant)
247264 .map((m) => m.content)
248265 .toSet();
249266 for (final m in catchUpMsgs) {
250
- final content = (m as Map<String, dynamic>)['content'] as String? ?? '';
251
- // Skip if we already have this message locally
252
- if (content.isNotEmpty && existingContents.contains(content)) continue;
253
- _handleMessage(m);
254
- if (content.isNotEmpty) existingContents.add(content);
267
+ final map = m as Map<String, dynamic>;
268
+ final msgType = map['type'] as String? ?? 'text';
269
+ final content = map['content'] as String? ?? map['transcript'] as String? ?? map['caption'] as String? ?? '';
270
+ final msgSessionId = map['sessionId'] as String?;
271
+ final imageData = map['imageBase64'] as String?;
272
+
273
+ // Skip empty text messages (images with no caption are OK)
274
+ if (content.isEmpty && imageData == null) continue;
275
+ // Dedup by content (skip images from dedup — they have unique msgIds)
276
+ if (imageData == null && content.isNotEmpty && existingContents.contains(content)) continue;
277
+
278
+ final Message message;
279
+ if (msgType == 'image' && imageData != null) {
280
+ message = Message.image(
281
+ role: MessageRole.assistant,
282
+ imageBase64: imageData,
283
+ content: content,
284
+ status: MessageStatus.sent,
285
+ );
286
+ } else {
287
+ message = Message.text(
288
+ role: MessageRole.assistant,
289
+ content: content,
290
+ status: MessageStatus.sent,
291
+ );
292
+ }
293
+
294
+ if (msgSessionId == null || msgSessionId == activeId) {
295
+ // Active session or no session: add directly to chat
296
+ ref.read(messagesProvider.notifier).addMessage(message);
297
+ } else {
298
+ // Different session: store + unread badge + toast
299
+ // Collect for batch storage below to avoid race condition
300
+ _catchUpPending.putIfAbsent(msgSessionId, () => []).add(message);
301
+ _incrementUnread(msgSessionId);
302
+ }
303
+ existingContents.add(content);
255304 }
256305 _isCatchingUp = false;
306
+ _scrollToBottom();
307
+ // Batch-store cross-session messages (sequential to avoid race condition)
308
+ if (_catchUpPending.isNotEmpty) {
309
+ final pending = Map<String, List<Message>>.from(_catchUpPending);
310
+ _catchUpPending.clear();
311
+ // Show one toast per session with message count
312
+ if (mounted) {
313
+ final sessions = ref.read(sessionsProvider);
314
+ for (final entry in pending.entries) {
315
+ final session = sessions.firstWhere(
316
+ (s) => s.id == entry.key,
317
+ orElse: () => Session(id: entry.key, index: 0, name: 'Unknown', type: 'claude'),
318
+ );
319
+ final count = entry.value.length;
320
+ final preview = count == 1
321
+ ? entry.value.first.content
322
+ : '$count messages';
323
+ ToastManager.show(
324
+ context,
325
+ sessionName: session.name,
326
+ preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
327
+ onTap: () => _switchSession(entry.key),
328
+ );
329
+ }
330
+ }
331
+ () async {
332
+ for (final entry in pending.entries) {
333
+ final existing = await MessageStore.loadAll(entry.key);
334
+ MessageStore.save(entry.key, [...existing, ...entry.value]);
335
+ await MessageStore.flush();
336
+ }
337
+ }();
338
+ }
339
+ // Clear unread for active session
340
+ if (activeId != null) {
341
+ final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
342
+ counts.remove(activeId);
343
+ ref.read(unreadCountsProvider.notifier).state = counts;
344
+ }
257345 }
258346 case 'pong':
259347 break; // heartbeat response, ignore
....@@ -284,6 +372,22 @@
284372 );
285373 ref.read(activeSessionIdProvider.notifier).state = active.id;
286374 ref.read(messagesProvider.notifier).switchSession(active.id);
375
+ SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', active.id));
376
+ }
377
+
378
+ // Session is ready — process any pending messages that arrived before sessions list
379
+ if (!_sessionReady) {
380
+ _sessionReady = true;
381
+ // Request catch_up now that session is set
382
+ _sendCommand('catch_up', {'lastSeq': _lastSeq});
383
+ // Drain messages that arrived before sessions list
384
+ if (_pendingMessages.isNotEmpty) {
385
+ final pending = List<Map<String, dynamic>>.from(_pendingMessages);
386
+ _pendingMessages.clear();
387
+ for (final m in pending) {
388
+ _handleMessage(m);
389
+ }
390
+ }
287391 }
288392 }
289393
....@@ -507,6 +611,7 @@
507611
508612 ref.read(activeSessionIdProvider.notifier).state = sessionId;
509613 await ref.read(messagesProvider.notifier).switchSession(sessionId);
614
+ SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', sessionId));
510615
511616 final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
512617 counts.remove(sessionId);
lib/screens/navigate_screen.dart
....@@ -192,20 +192,11 @@
192192 void _sendKey(String key) {
193193 _haptic();
194194
195
- // Send via WebSocket - the chat screen's WS is in the provider
196
- // We need to access the WS through the provider system
197
- // For now, send a nav command message
195
+ // Send via MQTT - the chat screen's MQTT service is in the provider
198196 final activeSessionId = ref.read(activeSessionIdProvider);
199197
200
- // Build the navigate command
201
- // This sends a key press to the AIBroker daemon
202
- // which forwards it to the active terminal session
203
- // The WS is managed by ChatScreen, so we'll use a message approach
204
-
205
- // Since we can't directly access the WS from here,
206
- // we send through the provider approach - the message will be picked up
207
- // by the WS service in ChatScreen via a shared notification mechanism.
208
- // For simplicity, we use a global event bus pattern.
198
+ // Send a key press to the AIBroker daemon via the MQTT service.
199
+ // NavigateNotifier bridges the navigate screen to the chat screen's MQTT service.
209200
210201 NavigateNotifier.instance?.sendKey(key, activeSessionId);
211202
....@@ -228,8 +219,8 @@
228219 }
229220 }
230221
231
-/// Global notifier to bridge navigate screen to WebSocket.
232
-/// Set by ChatScreen when WS is initialized.
222
+/// Global notifier to bridge navigate screen to MQTT service.
223
+/// Set by ChatScreen when MQTT is initialized.
233224 class NavigateNotifier {
234225 static NavigateNotifier? instance;
235226
lib/screens/settings_screen.dart
....@@ -3,7 +3,7 @@
33
44 import '../models/server_config.dart';
55 import '../providers/providers.dart';
6
-import '../services/websocket_service.dart' show ConnectionStatus;
6
+import '../services/mqtt_service.dart' show ConnectionStatus;
77 import '../services/wol_service.dart';
88 import '../theme/app_theme.dart';
99 import '../widgets/status_dot.dart';
lib/services/mqtt_service.dart
....@@ -10,8 +10,15 @@
1010 import 'package:uuid/uuid.dart';
1111
1212 import '../models/server_config.dart';
13
-import 'websocket_service.dart' show ConnectionStatus;
1413 import 'wol_service.dart';
14
+
15
+/// Connection status for the MQTT client.
16
+enum ConnectionStatus {
17
+ disconnected,
18
+ connecting,
19
+ connected,
20
+ reconnecting,
21
+}
1522
1623 // Debug log to file (survives release builds)
1724 Future<void> _mqttLog(String msg) async {
....@@ -23,11 +30,11 @@
2330 } catch (_) {}
2431 }
2532
26
-/// MQTT client for PAILot, replacing WebSocketService.
33
+/// MQTT client for PAILot.
2734 ///
2835 /// Connects to the AIBroker daemon's embedded aedes broker.
2936 /// Subscribes to all pailot/ topics and dispatches messages
30
-/// through the same callback interface as WebSocketService.
37
+/// through the onMessage callback interface.
3138 class MqttService with WidgetsBindingObserver {
3239 MqttService({required this.config});
3340
....@@ -43,7 +50,7 @@
4350 final List<String> _seenMsgIdOrder = [];
4451 static const int _maxSeenIds = 500;
4552
46
- // Callbacks — same interface as WebSocketService
53
+ // Callbacks
4754 void Function(ConnectionStatus status)? onStatusChanged;
4855 void Function(Map<String, dynamic> message)? onMessage;
4956 void Function()? onOpen;
....@@ -149,9 +156,12 @@
149156 client.onAutoReconnect = _onAutoReconnect;
150157 client.onAutoReconnected = _onAutoReconnected;
151158
152
- // Persistent session: broker queues QoS 1 messages while client is offline
159
+ // Clean session: we handle offline delivery ourselves via catch_up protocol.
160
+ // Persistent sessions cause the broker to flood all queued QoS 1 messages
161
+ // on reconnect, which overwhelms the client with large voice payloads.
153162 final connMessage = MqttConnectMessage()
154163 .withClientIdentifier(clientId)
164
+ .startClean()
155165 .authenticateAs('pailot', config.mqttToken ?? '');
156166
157167 client.connectionMessage = connMessage;
....@@ -268,7 +278,7 @@
268278
269279 /// Route incoming MQTT messages to the onMessage callback.
270280 /// Translates MQTT topic structure into the flat message format
271
- /// that chat_screen expects (same as WebSocket messages).
281
+ /// that chat_screen expects.
272282 void _dispatchMessage(String topic, Map<String, dynamic> json) {
273283 final parts = topic.split('/');
274284
....@@ -369,7 +379,6 @@
369379 }
370380
371381 /// Send a message — routes to the appropriate MQTT topic based on content.
372
- /// Accepts the same message format as WebSocketService.send().
373382 void send(Map<String, dynamic> message) {
374383 final type = message['type'] as String?;
375384 final sessionId = message['sessionId'] as String?;
lib/services/websocket_service.dart
deleted file mode 100644
....@@ -1,288 +0,0 @@
1
-import 'dart:async';
2
-import 'dart:convert';
3
-
4
-import 'package:flutter/widgets.dart';
5
-import 'package:web_socket_channel/web_socket_channel.dart';
6
-
7
-import '../models/server_config.dart';
8
-import 'wol_service.dart';
9
-
10
-enum ConnectionStatus {
11
- disconnected,
12
- connecting,
13
- connected,
14
- reconnecting,
15
-}
16
-
17
-/// WebSocket client with dual-URL fallback, heartbeat, and auto-reconnect.
18
-class WebSocketService with WidgetsBindingObserver {
19
- WebSocketService({required this.config});
20
-
21
- ServerConfig config;
22
- WebSocketChannel? _channel;
23
- ConnectionStatus _status = ConnectionStatus.disconnected;
24
- Timer? _heartbeatTimer;
25
- Timer? _zombieTimer;
26
- Timer? _reconnectTimer;
27
- int _reconnectAttempt = 0;
28
- bool _intentionalClose = false;
29
- DateTime? _lastPong;
30
- StreamSubscription? _subscription;
31
-
32
- // Callbacks
33
- void Function()? onOpen;
34
- void Function()? onClose;
35
- void Function()? onReconnecting;
36
- void Function(Map<String, dynamic> message)? onMessage;
37
- void Function(String error)? onError;
38
- void Function(ConnectionStatus status)? onStatusChanged;
39
-
40
- ConnectionStatus get status => _status;
41
- bool get isConnected => _status == ConnectionStatus.connected;
42
-
43
- void _setStatus(ConnectionStatus newStatus) {
44
- if (_status == newStatus) return;
45
- _status = newStatus;
46
- onStatusChanged?.call(newStatus);
47
- }
48
-
49
- /// Connect to the WebSocket server.
50
- /// Tries local URL first (2.5s timeout), then remote URL.
51
- Future<void> connect() async {
52
- if (_status == ConnectionStatus.connected ||
53
- _status == ConnectionStatus.connecting) {
54
- return;
55
- }
56
-
57
- _intentionalClose = false;
58
- _setStatus(ConnectionStatus.connecting);
59
-
60
- // Send Wake-on-LAN if MAC configured
61
- if (config.macAddress != null && config.macAddress!.isNotEmpty) {
62
- try {
63
- await WolService.wake(config.macAddress!, localHost: config.localHost);
64
- } catch (_) {}
65
- }
66
-
67
- final urls = config.urls;
68
-
69
- for (final url in urls) {
70
- if (_intentionalClose) return;
71
-
72
- try {
73
- final connected = await _tryConnect(url,
74
- timeout: url == urls.first && urls.length > 1
75
- ? const Duration(milliseconds: 2500)
76
- : const Duration(seconds: 5));
77
- if (connected) return;
78
- } catch (_) {
79
- continue;
80
- }
81
- }
82
-
83
- // All URLs failed
84
- _setStatus(ConnectionStatus.disconnected);
85
- onError?.call('Failed to connect to server');
86
- _scheduleReconnect();
87
- }
88
-
89
- Future<bool> _tryConnect(String url, {Duration? timeout}) async {
90
- try {
91
- final uri = Uri.parse(url);
92
- final channel = WebSocketChannel.connect(uri);
93
-
94
- // Wait for connection with timeout
95
- await channel.ready.timeout(
96
- timeout ?? const Duration(seconds: 5),
97
- onTimeout: () {
98
- channel.sink.close();
99
- throw TimeoutException('Connection timeout');
100
- },
101
- );
102
-
103
- _channel = channel;
104
- _reconnectAttempt = 0;
105
- _setStatus(ConnectionStatus.connected);
106
- _startHeartbeat();
107
- _listenMessages();
108
- onOpen?.call();
109
- return true;
110
- } catch (e) {
111
- return false;
112
- }
113
- }
114
-
115
- void _listenMessages() {
116
- _subscription?.cancel();
117
- _subscription = _channel?.stream.listen(
118
- (data) {
119
- _lastPong = DateTime.now();
120
-
121
- if (data is String) {
122
- // Handle pong
123
- if (data == 'pong') return;
124
-
125
- try {
126
- final json = jsonDecode(data) as Map<String, dynamic>;
127
- onMessage?.call(json);
128
- } catch (_) {
129
- // Non-JSON message, ignore
130
- }
131
- }
132
- },
133
- onError: (error) {
134
- onError?.call(error.toString());
135
- _handleDisconnect();
136
- },
137
- onDone: () {
138
- _handleDisconnect();
139
- },
140
- );
141
- }
142
-
143
- void _startHeartbeat() {
144
- _heartbeatTimer?.cancel();
145
- _zombieTimer?.cancel();
146
- _lastPong = DateTime.now();
147
-
148
- // Send ping every 30 seconds
149
- _heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (_) {
150
- if (_channel != null && _status == ConnectionStatus.connected) {
151
- try {
152
- _channel!.sink.add(jsonEncode({'type': 'ping'}));
153
- } catch (_) {
154
- _handleDisconnect();
155
- }
156
- }
157
- });
158
-
159
- // Check for zombie connection every 15 seconds
160
- _zombieTimer = Timer.periodic(const Duration(seconds: 15), (_) {
161
- if (_lastPong != null) {
162
- final elapsed = DateTime.now().difference(_lastPong!);
163
- if (elapsed.inSeconds > 60) {
164
- _handleDisconnect();
165
- }
166
- }
167
- });
168
- }
169
-
170
- void _handleDisconnect() {
171
- _stopHeartbeat();
172
- _subscription?.cancel();
173
-
174
- final wasConnected = _status == ConnectionStatus.connected;
175
-
176
- try {
177
- _channel?.sink.close();
178
- } catch (_) {}
179
- _channel = null;
180
-
181
- if (_intentionalClose) {
182
- _setStatus(ConnectionStatus.disconnected);
183
- onClose?.call();
184
- } else if (wasConnected) {
185
- _setStatus(ConnectionStatus.reconnecting);
186
- onReconnecting?.call();
187
- _scheduleReconnect();
188
- }
189
- }
190
-
191
- void _stopHeartbeat() {
192
- _heartbeatTimer?.cancel();
193
- _zombieTimer?.cancel();
194
- _heartbeatTimer = null;
195
- _zombieTimer = null;
196
- }
197
-
198
- void _scheduleReconnect() {
199
- if (_intentionalClose) return;
200
-
201
- _reconnectTimer?.cancel();
202
-
203
- // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s max
204
- final delay = Duration(
205
- milliseconds: (1000 * (1 << _reconnectAttempt.clamp(0, 4)))
206
- .clamp(1000, 30000),
207
- );
208
-
209
- _reconnectAttempt++;
210
-
211
- _reconnectTimer = Timer(delay, () {
212
- if (!_intentionalClose) {
213
- _setStatus(ConnectionStatus.reconnecting);
214
- onReconnecting?.call();
215
- connect();
216
- }
217
- });
218
- }
219
-
220
- /// Send a JSON message.
221
- void send(Map<String, dynamic> message) {
222
- if (_channel == null || _status != ConnectionStatus.connected) {
223
- onError?.call('Not connected');
224
- return;
225
- }
226
-
227
- try {
228
- _channel!.sink.add(jsonEncode(message));
229
- } catch (e) {
230
- onError?.call('Send failed: $e');
231
- }
232
- }
233
-
234
- /// Send a raw string.
235
- void sendRaw(String data) {
236
- if (_channel == null || _status != ConnectionStatus.connected) return;
237
- try {
238
- _channel!.sink.add(data);
239
- } catch (_) {}
240
- }
241
-
242
- /// Disconnect intentionally.
243
- void disconnect() {
244
- _intentionalClose = true;
245
- _reconnectTimer?.cancel();
246
- _stopHeartbeat();
247
- _subscription?.cancel();
248
-
249
- try {
250
- _channel?.sink.close();
251
- } catch (_) {}
252
- _channel = null;
253
-
254
- _setStatus(ConnectionStatus.disconnected);
255
- onClose?.call();
256
- }
257
-
258
- /// Update config and reconnect.
259
- Future<void> updateConfig(ServerConfig newConfig) async {
260
- config = newConfig;
261
- disconnect();
262
- await Future.delayed(const Duration(milliseconds: 100));
263
- await connect();
264
- }
265
-
266
- /// Dispose all resources.
267
- void dispose() {
268
- disconnect();
269
- _reconnectTimer?.cancel();
270
- }
271
-
272
- // App lifecycle integration
273
- @override
274
- void didChangeAppLifecycleState(AppLifecycleState state) {
275
- switch (state) {
276
- case AppLifecycleState.resumed:
277
- if (_status != ConnectionStatus.connected && !_intentionalClose) {
278
- _reconnectAttempt = 0;
279
- connect();
280
- }
281
- case AppLifecycleState.paused:
282
- // Keep connection alive but don't reconnect aggressively
283
- break;
284
- default:
285
- break;
286
- }
287
- }
288
-}
lib/widgets/status_dot.dart
....@@ -1,9 +1,9 @@
11 import 'package:flutter/material.dart';
22
3
-import '../services/websocket_service.dart';
3
+import '../services/mqtt_service.dart';
44 import '../theme/app_theme.dart';
55
6
-/// 10px circle indicating WebSocket connection status.
6
+/// 10px circle indicating MQTT connection status.
77 class StatusDot extends StatelessWidget {
88 final ConnectionStatus status;
99