From 34b82ffb9b31866b528cd91a3d2ec360dbb8da5e Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Mon, 06 Apr 2026 13:05:50 +0200
Subject: [PATCH] fix: explicit per-session MQTT subscriptions, lifecycle observer, resume reconnect with fallback

---
 lib/services/mqtt_service.dart |   88 ++++++++++++++++++++++++++++++++++++++-----
 lib/screens/chat_screen.dart   |   25 +++++++++++-
 2 files changed, 99 insertions(+), 14 deletions(-)

diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index 9b8a551..22efed5 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -223,10 +223,27 @@
       _push?.onMqttConnected();
     };
     _ws!.onResume = () {
-      // App came back from background with connection still alive.
-      // Send catch_up to fetch any messages missed during suspend.
+      // App came back from background — connection may or may not be alive.
+      // Try catch_up first, but if no response comes, force reconnect.
       _chatLog('onResume: sending catch_up with lastSeq=$_lastSeq');
+      final seqBefore = _lastSeq;
       _sendCommand('catch_up', {'lastSeq': _lastSeq});
+      // Force UI rebuild for any buffered messages
+      Future.delayed(const Duration(milliseconds: 300), () {
+        if (mounted) {
+          setState(() {});
+          _scrollToBottom();
+        }
+      });
+      // If catch_up didn't produce a response in 2s, connection is dead — reconnect
+      Future.delayed(const Duration(seconds: 2), () {
+        if (!mounted) return;
+        if (_lastSeq == seqBefore) {
+          // No new messages arrived — connection likely dead
+          _chatLog('onResume: no catch_up response after 2s, forcing reconnect');
+          _ws?.forceReconnect();
+        }
+      });
     };
     _ws!.onError = (error) {
       debugPrint('MQTT error: $error');
@@ -247,7 +264,9 @@
     // sent immediately if already connected.
     _push = PushService(mqttService: _ws!);
     _push!.onNotificationTap = (data) {
-      // If notification carried a sessionId, switch to that session
+      // Switch to the session immediately, then request catch_up.
+      // The MQTT connection auto-reconnects on resume, and onResume
+      // already sends catch_up. We just need to be on the right session.
       final sessionId = data['sessionId'] as String?;
       if (sessionId != null && mounted) {
         _switchSession(sessionId);
diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
index 34ffc93..618a00e 100644
--- a/lib/services/mqtt_service.dart
+++ b/lib/services/mqtt_service.dart
@@ -46,7 +46,9 @@
 /// Subscribes to all pailot/ topics and dispatches messages
 /// through the onMessage callback interface.
 class MqttService with WidgetsBindingObserver {
-  MqttService({required this.config});
+  MqttService({required this.config}) {
+    WidgetsBinding.instance.addObserver(this);
+  }
 
   ServerConfig config;
   MqttServerClient? _client;
@@ -61,6 +63,9 @@
   // Message deduplication
   final Set<String> _seenMsgIds = {};
   final List<String> _seenMsgIdOrder = [];
+
+  // Per-session explicit subscriptions (wildcards broken in aedes)
+  final Set<String> _subscribedSessions = {};
   static const int _maxSeenIds = 500;
 
   // Callbacks
@@ -97,6 +102,34 @@
     }
     _clientId = id;
     return id;
+  }
+
+  /// Force reconnect — disconnect and reconnect to last known host.
+  void forceReconnect() {
+    _mqttLog('MQTT: force reconnect requested');
+    final lastHost = connectedHost;
+    _client?.disconnect();
+    _client = null;
+    _setStatus(ConnectionStatus.reconnecting);
+    onReconnecting?.call();
+    if (lastHost != null) {
+      _fastReconnect(lastHost);
+    } else {
+      connect();
+    }
+  }
+
+  /// Fast reconnect to a known host — skips discovery, short timeout.
+  Future<void> _fastReconnect(String host) async {
+    _mqttLog('MQTT: fast reconnect to $host');
+    final clientId = await _getClientId();
+    if (await _tryConnect(host, clientId, timeout: 2000)) {
+      connectedHost = host;
+      return;
+    }
+    // Fast path failed — fall back to full connect
+    _mqttLog('MQTT: fast reconnect failed, full connect...');
+    connect();
   }
 
   /// Connect to the MQTT broker.
@@ -458,15 +491,34 @@
       _mqttLog('MQTT: _subscribe called but client is null');
       return;
     }
-    _mqttLog('MQTT: subscribing to topics...');
+    // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.
+    // Per-session topics are added dynamically when the session list arrives.
+    _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');
     client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
     client.subscribe('pailot/status', MqttQos.atLeastOnce);
     client.subscribe('pailot/projects', MqttQos.atLeastOnce);
-    client.subscribe('pailot/+/out', MqttQos.atLeastOnce);
-    client.subscribe('pailot/+/typing', MqttQos.atMostOnce);
-    client.subscribe('pailot/+/screenshot', MqttQos.atLeastOnce);
     client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
     client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
+    // Re-subscribe to all known per-session topics
+    for (final sid in _subscribedSessions) {
+      client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
+      client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
+      client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
+    }
+  }
+
+  /// Subscribe to per-session topics when session list arrives.
+  void _subscribeToSessions(List<String> sessionIds) {
+    final client = _client;
+    if (client == null) return;
+    for (final sid in sessionIds) {
+      if (_subscribedSessions.contains(sid)) continue;
+      _subscribedSessions.add(sid);
+      client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
+      client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
+      client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
+      _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');
+    }
   }
 
   void _listenMessages() {
@@ -525,9 +577,18 @@
   void _dispatchMessage(String topic, Map<String, dynamic> json) {
     final parts = topic.split('/');
 
-    // pailot/sessions
+    // pailot/sessions — also dynamically subscribe to per-session topics
     if (topic == 'pailot/sessions') {
       json['type'] = 'sessions';
+      final sessions = json['sessions'] as List<dynamic>?;
+      if (sessions != null) {
+        final ids = sessions
+            .map((s) => (s as Map<String, dynamic>)['id'] as String?)
+            .where((id) => id != null && id.isNotEmpty)
+            .cast<String>()
+            .toList();
+        if (ids.isNotEmpty) _subscribeToSessions(ids);
+      }
       onMessage?.call(json);
       return;
     }
@@ -770,6 +831,7 @@
 
   /// Dispose all resources.
   void dispose() {
+    WidgetsBinding.instance.removeObserver(this);
     disconnect();
   }
 
@@ -782,14 +844,18 @@
         _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');
         final client = _client;
         if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
-          // Clearly disconnected — just reconnect
-          _mqttLog('MQTT: not connected on resume, reconnecting...');
+          // Clearly disconnected — fast reconnect to last known host
+          _mqttLog('MQTT: disconnected on resume, reconnecting...');
           _client = null;
           _setStatus(ConnectionStatus.reconnecting);
-          connect();
+          if (connectedHost != null) {
+            _fastReconnect(connectedHost!);
+          } else {
+            connect();
+          }
         } else {
-          // Appears connected — notify listener to fetch missed messages
-          // via catch_up. Don't call onOpen (it resets sessionReady and causes flicker).
+          // Appears connected — trigger catch_up to fetch missed messages.
+          // Don't disconnect! iOS may have buffered messages while suspended.
           _mqttLog('MQTT: appears connected on resume, triggering catch_up');
           onResume?.call();
         }

--
Gitblit v1.3.1