Matthias Nott
8 days ago 34b82ffb9b31866b528cd91a3d2ec360dbb8da5e
fix: explicit per-session MQTT subscriptions, lifecycle observer, resume reconnect with fallback
2 files modified
changed files
lib/screens/chat_screen.dart patch | view | blame | history
lib/services/mqtt_service.dart patch | view | blame | history
lib/screens/chat_screen.dart
....@@ -223,10 +223,27 @@
223223 _push?.onMqttConnected();
224224 };
225225 _ws!.onResume = () {
226
- // App came back from background with connection still alive.
227
- // Send catch_up to fetch any messages missed during suspend.
226
+ // App came back from background — connection may or may not be alive.
227
+ // Try catch_up first, but if no response comes, force reconnect.
228228 _chatLog('onResume: sending catch_up with lastSeq=$_lastSeq');
229
+ final seqBefore = _lastSeq;
229230 _sendCommand('catch_up', {'lastSeq': _lastSeq});
231
+ // Force UI rebuild for any buffered messages
232
+ Future.delayed(const Duration(milliseconds: 300), () {
233
+ if (mounted) {
234
+ setState(() {});
235
+ _scrollToBottom();
236
+ }
237
+ });
238
+ // If catch_up didn't produce a response in 2s, connection is dead — reconnect
239
+ Future.delayed(const Duration(seconds: 2), () {
240
+ if (!mounted) return;
241
+ if (_lastSeq == seqBefore) {
242
+ // No new messages arrived — connection likely dead
243
+ _chatLog('onResume: no catch_up response after 2s, forcing reconnect');
244
+ _ws?.forceReconnect();
245
+ }
246
+ });
230247 };
231248 _ws!.onError = (error) {
232249 debugPrint('MQTT error: $error');
....@@ -247,7 +264,9 @@
247264 // sent immediately if already connected.
248265 _push = PushService(mqttService: _ws!);
249266 _push!.onNotificationTap = (data) {
250
- // If notification carried a sessionId, switch to that session
267
+ // Switch to the session immediately, then request catch_up.
268
+ // The MQTT connection auto-reconnects on resume, and onResume
269
+ // already sends catch_up. We just need to be on the right session.
251270 final sessionId = data['sessionId'] as String?;
252271 if (sessionId != null && mounted) {
253272 _switchSession(sessionId);
lib/services/mqtt_service.dart
....@@ -46,7 +46,9 @@
4646 /// Subscribes to all pailot/ topics and dispatches messages
4747 /// through the onMessage callback interface.
4848 class MqttService with WidgetsBindingObserver {
49
- MqttService({required this.config});
49
+ MqttService({required this.config}) {
50
+ WidgetsBinding.instance.addObserver(this);
51
+ }
5052
5153 ServerConfig config;
5254 MqttServerClient? _client;
....@@ -61,6 +63,9 @@
6163 // Message deduplication
6264 final Set<String> _seenMsgIds = {};
6365 final List<String> _seenMsgIdOrder = [];
66
+
67
+ // Per-session explicit subscriptions (wildcards broken in aedes)
68
+ final Set<String> _subscribedSessions = {};
6469 static const int _maxSeenIds = 500;
6570
6671 // Callbacks
....@@ -97,6 +102,34 @@
97102 }
98103 _clientId = id;
99104 return id;
105
+ }
106
+
107
+ /// Force reconnect — disconnect and reconnect to last known host.
108
+ void forceReconnect() {
109
+ _mqttLog('MQTT: force reconnect requested');
110
+ final lastHost = connectedHost;
111
+ _client?.disconnect();
112
+ _client = null;
113
+ _setStatus(ConnectionStatus.reconnecting);
114
+ onReconnecting?.call();
115
+ if (lastHost != null) {
116
+ _fastReconnect(lastHost);
117
+ } else {
118
+ connect();
119
+ }
120
+ }
121
+
122
+ /// Fast reconnect to a known host — skips discovery, short timeout.
123
+ Future<void> _fastReconnect(String host) async {
124
+ _mqttLog('MQTT: fast reconnect to $host');
125
+ final clientId = await _getClientId();
126
+ if (await _tryConnect(host, clientId, timeout: 2000)) {
127
+ connectedHost = host;
128
+ return;
129
+ }
130
+ // Fast path failed — fall back to full connect
131
+ _mqttLog('MQTT: fast reconnect failed, full connect...');
132
+ connect();
100133 }
101134
102135 /// Connect to the MQTT broker.
....@@ -458,15 +491,34 @@
458491 _mqttLog('MQTT: _subscribe called but client is null');
459492 return;
460493 }
461
- _mqttLog('MQTT: subscribing to topics...');
494
+ // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.
495
+ // Per-session topics are added dynamically when the session list arrives.
496
+ _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');
462497 client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
463498 client.subscribe('pailot/status', MqttQos.atLeastOnce);
464499 client.subscribe('pailot/projects', MqttQos.atLeastOnce);
465
- client.subscribe('pailot/+/out', MqttQos.atLeastOnce);
466
- client.subscribe('pailot/+/typing', MqttQos.atMostOnce);
467
- client.subscribe('pailot/+/screenshot', MqttQos.atLeastOnce);
468500 client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
469501 client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
502
+ // Re-subscribe to all known per-session topics
503
+ for (final sid in _subscribedSessions) {
504
+ client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
505
+ client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
506
+ client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
507
+ }
508
+ }
509
+
510
+ /// Subscribe to per-session topics when session list arrives.
511
+ void _subscribeToSessions(List<String> sessionIds) {
512
+ final client = _client;
513
+ if (client == null) return;
514
+ for (final sid in sessionIds) {
515
+ if (_subscribedSessions.contains(sid)) continue;
516
+ _subscribedSessions.add(sid);
517
+ client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
518
+ client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
519
+ client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
520
+ _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');
521
+ }
470522 }
471523
472524 void _listenMessages() {
....@@ -525,9 +577,18 @@
525577 void _dispatchMessage(String topic, Map<String, dynamic> json) {
526578 final parts = topic.split('/');
527579
528
- // pailot/sessions
580
+ // pailot/sessions — also dynamically subscribe to per-session topics
529581 if (topic == 'pailot/sessions') {
530582 json['type'] = 'sessions';
583
+ final sessions = json['sessions'] as List<dynamic>?;
584
+ if (sessions != null) {
585
+ final ids = sessions
586
+ .map((s) => (s as Map<String, dynamic>)['id'] as String?)
587
+ .where((id) => id != null && id.isNotEmpty)
588
+ .cast<String>()
589
+ .toList();
590
+ if (ids.isNotEmpty) _subscribeToSessions(ids);
591
+ }
531592 onMessage?.call(json);
532593 return;
533594 }
....@@ -770,6 +831,7 @@
770831
771832 /// Dispose all resources.
772833 void dispose() {
834
+ WidgetsBinding.instance.removeObserver(this);
773835 disconnect();
774836 }
775837
....@@ -782,14 +844,18 @@
782844 _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');
783845 final client = _client;
784846 if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
785
- // Clearly disconnected — just reconnect
786
- _mqttLog('MQTT: not connected on resume, reconnecting...');
847
+ // Clearly disconnected — fast reconnect to last known host
848
+ _mqttLog('MQTT: disconnected on resume, reconnecting...');
787849 _client = null;
788850 _setStatus(ConnectionStatus.reconnecting);
789
- connect();
851
+ if (connectedHost != null) {
852
+ _fastReconnect(connectedHost!);
853
+ } else {
854
+ connect();
855
+ }
790856 } else {
791
- // Appears connected — notify listener to fetch missed messages
792
- // via catch_up. Don't call onOpen (it resets sessionReady and causes flicker).
857
+ // Appears connected — trigger catch_up to fetch missed messages.
858
+ // Don't disconnect! iOS may have buffered messages while suspended.
793859 _mqttLog('MQTT: appears connected on resume, triggering catch_up');
794860 onResume?.call();
795861 }