From 6cbbea9b96db551e5c0ac26f0ace3d4c3d82a276 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Mon, 06 Apr 2026 15:02:33 +0200
Subject: [PATCH] fix: single pailot/out topic, per-session file locks, merge protection, resume reconnect

---
 lib/services/mqtt_service.dart |  125 ++++++++++++-----------------------------
 1 files changed, 36 insertions(+), 89 deletions(-)

diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
index ba6056e..285c6bf 100644
--- a/lib/services/mqtt_service.dart
+++ b/lib/services/mqtt_service.dart
@@ -65,8 +65,7 @@
   final Set<String> _seenMsgIds = {};
   final List<String> _seenMsgIdOrder = [];
 
-  // Per-session explicit subscriptions (wildcards broken in aedes)
-  final Set<String> _subscribedSessions = {};
+  // (Per-session subscriptions removed — single pailot/out topic now)
   static const int _maxSeenIds = 500;
 
   // Callbacks
@@ -492,34 +491,14 @@
       _mqttLog('MQTT: _subscribe called but client is null');
       return;
     }
-    // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.
-    // Per-session topics are added dynamically when the session list arrives.
-    _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');
+    // Single outbound topic — all messages carry sessionId in payload.
+    // Client routes messages to the correct session based on payload.
+    _mqttLog('MQTT: subscribing to topics...');
+    client.subscribe('pailot/out', MqttQos.atLeastOnce);
     client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
     client.subscribe('pailot/status', MqttQos.atLeastOnce);
     client.subscribe('pailot/projects', MqttQos.atLeastOnce);
     client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
-    client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
-    // Re-subscribe to all known per-session topics
-    for (final sid in _subscribedSessions) {
-      client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
-      client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
-      client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
-    }
-  }
-
-  /// Subscribe to per-session topics when session list arrives.
-  void _subscribeToSessions(List<String> sessionIds) {
-    final client = _client;
-    if (client == null) return;
-    for (final sid in sessionIds) {
-      if (_subscribedSessions.contains(sid)) continue;
-      _subscribedSessions.add(sid);
-      client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
-      client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
-      client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
-      _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');
-    }
   }
 
   void _listenMessages() {
@@ -576,20 +555,9 @@
   /// Translates MQTT topic structure into the flat message format
   /// that chat_screen expects.
   void _dispatchMessage(String topic, Map<String, dynamic> json) {
-    final parts = topic.split('/');
-
-    // pailot/sessions — also dynamically subscribe to per-session topics
+    // pailot/sessions
     if (topic == 'pailot/sessions') {
       json['type'] = 'sessions';
-      final sessions = json['sessions'] as List<dynamic>?;
-      if (sessions != null) {
-        final ids = sessions
-            .map((s) => (s as Map<String, dynamic>)['id'] as String?)
-            .where((id) => id != null && id.isNotEmpty)
-            .cast<String>()
-            .toList();
-        if (ids.isNotEmpty) _subscribeToSessions(ids);
-      }
       onMessage?.call(json);
       return;
     }
@@ -608,46 +576,25 @@
       return;
     }
 
-    // pailot/control/out — command responses (session_switched, session_renamed, error, unread)
+    // pailot/control/out — command responses
     if (topic == 'pailot/control/out') {
       onMessage?.call(json);
       return;
     }
 
-    // pailot/voice/transcript
-    if (topic == 'pailot/voice/transcript') {
-      json['type'] = 'transcript';
-      onMessage?.call(json);
-      return;
-    }
-
-    // pailot/<sessionId>/out — text, voice, image messages
-    if (parts.length == 3 && parts[2] == 'out') {
-      final sessionId = parts[1];
-      json['sessionId'] ??= sessionId;
-      onMessage?.call(json);
-      return;
-    }
-
-    // pailot/<sessionId>/typing
-    if (parts.length == 3 && parts[2] == 'typing') {
-      final sessionId = parts[1];
-      json['type'] = 'typing';
-      json['sessionId'] ??= sessionId;
-      // Map 'active' field to the 'typing'/'isTyping' fields chat_screen expects
-      final active = json['active'] as bool? ?? true;
-      json['typing'] = active;
-      onMessage?.call(json);
-      return;
-    }
-
-    // pailot/<sessionId>/screenshot
-    if (parts.length == 3 && parts[2] == 'screenshot') {
-      final sessionId = parts[1];
-      json['type'] = 'screenshot';
-      json['sessionId'] ??= sessionId;
-      // Map imageBase64 to 'data' for compatibility with chat_screen handler
-      json['data'] ??= json['imageBase64'];
+    // pailot/out — ALL content messages (text, voice, image, typing, screenshot, transcript)
+    // Each message carries its type and sessionId in the payload.
+    if (topic == 'pailot/out') {
+      final type = json['type'] as String?;
+      // Normalize typing fields for chat_screen
+      if (type == 'typing') {
+        final active = json['active'] as bool? ?? true;
+        json['typing'] = active;
+      }
+      // Normalize screenshot fields
+      if (type == 'screenshot') {
+        json['data'] ??= json['imageBase64'];
+      }
       onMessage?.call(json);
       return;
     }
@@ -851,23 +798,23 @@
     switch (state) {
       case AppLifecycleState.resumed:
         if (_intentionalClose) break;
-        _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');
-        final client = _client;
-        if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
-          // Clearly disconnected — fast reconnect to last known host
-          _mqttLog('MQTT: disconnected on resume, reconnecting...');
-          _client = null;
-          _setStatus(ConnectionStatus.reconnecting);
-          if (connectedHost != null) {
-            _fastReconnect(connectedHost!);
-          } else {
-            connect();
-          }
+        _mqttLog('MQTT: app resumed — reconnecting to last host');
+        // Kill old client completely (disable autoReconnect first to prevent
+        // the MQTT library from spawning its own reconnect attempt)
+        final oldClient = _client;
+        if (oldClient != null) {
+          oldClient.autoReconnect = false;
+          try { oldClient.disconnect(); } catch (_) {}
+        }
+        _updatesSub?.cancel();
+        _updatesSub = null;
+        _client = null;
+        _setStatus(ConnectionStatus.reconnecting);
+        onReconnecting?.call();
+        if (connectedHost != null) {
+          _fastReconnect(connectedHost!);
         } else {
-          // Appears connected — trigger catch_up to fetch missed messages.
-          // Don't disconnect! iOS may have buffered messages while suspended.
-          _mqttLog('MQTT: appears connected on resume, triggering catch_up');
-          onResume?.call();
+          connect();
         }
       case AppLifecycleState.paused:
         break;

--
Gitblit v1.3.1