From 34b82ffb9b31866b528cd91a3d2ec360dbb8da5e Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Mon, 06 Apr 2026 13:05:50 +0200
Subject: [PATCH] fix: explicit per-session MQTT subscriptions, lifecycle observer, resume reconnect with fallback

---
 lib/services/mqtt_service.dart |   88 ++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 77 insertions(+), 11 deletions(-)

diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
index 34ffc93..618a00e 100644
--- a/lib/services/mqtt_service.dart
+++ b/lib/services/mqtt_service.dart
@@ -46,7 +46,9 @@
 /// Subscribes to all pailot/ topics and dispatches messages
 /// through the onMessage callback interface.
 class MqttService with WidgetsBindingObserver {
-  MqttService({required this.config});
+  MqttService({required this.config}) {
+    WidgetsBinding.instance.addObserver(this);
+  }
 
   ServerConfig config;
   MqttServerClient? _client;
@@ -61,6 +63,9 @@
   // Message deduplication
   final Set<String> _seenMsgIds = {};
   final List<String> _seenMsgIdOrder = [];
+
+  // Per-session explicit subscriptions (wildcards broken in aedes)
+  final Set<String> _subscribedSessions = {};
   static const int _maxSeenIds = 500;
 
   // Callbacks
@@ -97,6 +102,34 @@
     }
     _clientId = id;
     return id;
+  }
+
+  /// Force reconnect — disconnect and reconnect to last known host.
+  void forceReconnect() {
+    _mqttLog('MQTT: force reconnect requested');
+    final lastHost = connectedHost;
+    _client?.disconnect();
+    _client = null;
+    _setStatus(ConnectionStatus.reconnecting);
+    onReconnecting?.call();
+    if (lastHost != null) {
+      _fastReconnect(lastHost);
+    } else {
+      connect();
+    }
+  }
+
+  /// Fast reconnect to a known host — skips discovery, short timeout.
+  Future<void> _fastReconnect(String host) async {
+    _mqttLog('MQTT: fast reconnect to $host');
+    final clientId = await _getClientId();
+    if (await _tryConnect(host, clientId, timeout: 2000)) {
+      connectedHost = host;
+      return;
+    }
+    // Fast path failed — fall back to full connect
+    _mqttLog('MQTT: fast reconnect failed, full connect...');
+    connect();
   }
 
   /// Connect to the MQTT broker.
@@ -458,15 +491,34 @@
       _mqttLog('MQTT: _subscribe called but client is null');
       return;
     }
-    _mqttLog('MQTT: subscribing to topics...');
+    // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.
+    // Per-session topics are added dynamically when the session list arrives.
+    _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');
     client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
     client.subscribe('pailot/status', MqttQos.atLeastOnce);
     client.subscribe('pailot/projects', MqttQos.atLeastOnce);
-    client.subscribe('pailot/+/out', MqttQos.atLeastOnce);
-    client.subscribe('pailot/+/typing', MqttQos.atMostOnce);
-    client.subscribe('pailot/+/screenshot', MqttQos.atLeastOnce);
     client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
     client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
+    // Re-subscribe to all known per-session topics
+    for (final sid in _subscribedSessions) {
+      client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
+      client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
+      client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
+    }
+  }
+
+  /// Subscribe to per-session topics when session list arrives.
+  void _subscribeToSessions(List<String> sessionIds) {
+    final client = _client;
+    if (client == null) return;
+    for (final sid in sessionIds) {
+      if (_subscribedSessions.contains(sid)) continue;
+      _subscribedSessions.add(sid);
+      client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
+      client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
+      client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
+      _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');
+    }
   }
 
   void _listenMessages() {
@@ -525,9 +577,18 @@
   void _dispatchMessage(String topic, Map<String, dynamic> json) {
     final parts = topic.split('/');
 
-    // pailot/sessions
+    // pailot/sessions — also dynamically subscribe to per-session topics
     if (topic == 'pailot/sessions') {
       json['type'] = 'sessions';
+      final sessions = json['sessions'] as List<dynamic>?;
+      if (sessions != null) {
+        final ids = sessions
+            .map((s) => (s as Map<String, dynamic>)['id'] as String?)
+            .where((id) => id != null && id.isNotEmpty)
+            .cast<String>()
+            .toList();
+        if (ids.isNotEmpty) _subscribeToSessions(ids);
+      }
       onMessage?.call(json);
       return;
     }
@@ -770,6 +831,7 @@
 
   /// Dispose all resources.
   void dispose() {
+    WidgetsBinding.instance.removeObserver(this);
     disconnect();
   }
 
@@ -782,14 +844,18 @@
         _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');
         final client = _client;
         if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
-          // Clearly disconnected — just reconnect
-          _mqttLog('MQTT: not connected on resume, reconnecting...');
+          // Clearly disconnected — fast reconnect to last known host
+          _mqttLog('MQTT: disconnected on resume, reconnecting...');
           _client = null;
           _setStatus(ConnectionStatus.reconnecting);
-          connect();
+          if (connectedHost != null) {
+            _fastReconnect(connectedHost!);
+          } else {
+            connect();
+          }
         } else {
-          // Appears connected — notify listener to fetch missed messages
-          // via catch_up. Don't call onOpen (it resets sessionReady and causes flicker).
+          // Appears connected — trigger catch_up to fetch missed messages.
+          // Don't disconnect! iOS may have buffered messages while suspended.
           _mqttLog('MQTT: appears connected on resume, triggering catch_up');
           onResume?.call();
         }

--
Gitblit v1.3.1