From 6cbbea9b96db551e5c0ac26f0ace3d4c3d82a276 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Mon, 06 Apr 2026 15:02:33 +0200
Subject: [PATCH] fix: single pailot/out topic, per-session file locks, merge protection, resume reconnect

---
 lib/services/message_store.dart |   27 ++++++
 lib/providers/providers.dart    |   20 +++-
 lib/services/mqtt_service.dart  |  125 +++++++++----------------------
 lib/screens/chat_screen.dart    |   33 ++++---
 4 files changed, 94 insertions(+), 111 deletions(-)

diff --git a/lib/providers/providers.dart b/lib/providers/providers.dart
index ad4259a..6155189 100644
--- a/lib/providers/providers.dart
+++ b/lib/providers/providers.dart
@@ -100,28 +100,38 @@
 
   /// Switch to a new session and load its messages.
   Future<void> switchSession(String sessionId) async {
-    // Log caller for debugging
     final trace = StackTrace.current.toString().split('\n').take(4).join(' | ');
     TraceService.instance.addTrace(
       'switchSession',
       'from=${_currentSessionId?.substring(0, 8) ?? "null"}(${state.length}) → ${sessionId.substring(0, 8)} | $trace',
     );
-    // Write current session DIRECTLY to disk (no debounce — prevents data loss)
+    // Write current session to disk
     if (_currentSessionId != null && state.isNotEmpty) {
       await MessageStore.writeDirect(_currentSessionId!, state);
     }
 
+    // Skip reload if staying on the same session — messages are already in memory
+    if (_currentSessionId == sessionId) {
+      TraceService.instance.addTrace('switchSession SKIP', 'already on ${sessionId.substring(0, 8)}');
+      return;
+    }
+
     _currentSessionId = sessionId;
     final messages = await MessageStore.loadAll(sessionId);
-    state = messages;
+    // Merge: if addMessage ran during loadAll and added messages for THIS session,
+    // they'll be in state but not in the loaded messages. Keep the longer list.
+    if (state.length > messages.length && _currentSessionId == sessionId) {
+      TraceService.instance.addTrace('switchSession MERGE', 'kept ${state.length} (loaded ${messages.length})');
+    } else {
+      state = messages;
+    }
   }
 
   /// Add a message to the current session.
   void addMessage(Message message) {
     state = [...state, message];
     if (_currentSessionId != null) {
-      // Write immediately (not debounced) to prevent race with switchSession's loadAll
-      MessageStore.writeDirect(_currentSessionId!, state);
+      MessageStore.save(_currentSessionId!, state);
     }
   }
 
diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index e889469..f331ab9 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -216,12 +216,15 @@
     _ws!.onOpen = () {
       _sessionReady = false; // Gate messages until sessions arrive
       _pendingMessages.clear();
-      final activeId = ref.read(activeSessionIdProvider);
-      _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
-      // catch_up is sent after sessions arrive (in _handleSessions)
-
-      // Re-register APNs token after reconnect so daemon always has a fresh token
-      _push?.onMqttConnected();
+      // Delay sync slightly to let broker acknowledge our subscriptions first.
+      // Without this, the catch_up response arrives before pailot/control/out
+      // subscription is active, and the message is lost.
+      Future.delayed(const Duration(milliseconds: 500), () {
+        if (!mounted) return;
+        final activeId = ref.read(activeSessionIdProvider);
+        _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
+        _push?.onMqttConnected();
+      });
     };
     _ws!.onResume = () {
       // App came back from background. The in-memory state already has
@@ -534,8 +537,10 @@
       status: MessageStatus.sent,
     );
 
-    final activeId = ref.read(activeSessionIdProvider);
-    if (sessionId != null && sessionId != activeId) {
+    // Use currentSessionId from notifier (what's actually loaded in the provider)
+    // not activeSessionIdProvider (can be stale after background resume)
+    final currentId = ref.read(messagesProvider.notifier).currentSessionId;
+    if (sessionId != null && sessionId != currentId) {
       // Store message for the other session so it's there when user switches
       TraceService.instance.addTrace(
         'message stored for session',
@@ -608,9 +613,9 @@
       duration: duration,
     );
 
-    final activeId = ref.read(activeSessionIdProvider);
-    _chatLog('voice: sessionId=$sessionId activeId=$activeId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');
-    if (sessionId != null && sessionId != activeId) {
+    final currentId = ref.read(messagesProvider.notifier).currentSessionId;
+    _chatLog('voice: sessionId=$sessionId currentId=$currentId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');
+    if (sessionId != null && sessionId != currentId) {
       _chatLog('voice: cross-session, storing for $sessionId');
       await _storeForSession(sessionId, storedMessage);
       _chatLog('voice: stored, incrementing unread');
@@ -679,9 +684,9 @@
       status: MessageStatus.sent,
     );
 
-    // Cross-session routing: store for target session if not active
-    final activeId = ref.read(activeSessionIdProvider);
-    if (sessionId != null && sessionId != activeId) {
+    // Cross-session routing: store for target session if not currently loaded
+    final currentId = ref.read(messagesProvider.notifier).currentSessionId;
+    if (sessionId != null && sessionId != currentId) {
       _storeForSession(sessionId, message);
       _incrementUnread(sessionId);
       return;
diff --git a/lib/services/message_store.dart b/lib/services/message_store.dart
index f45e0d0..1723b70 100644
--- a/lib/services/message_store.dart
+++ b/lib/services/message_store.dart
@@ -15,6 +15,8 @@
   static Directory? _baseDir;
   static Timer? _debounceTimer;
   static final Map<String, List<Message>> _pendingSaves = {};
+  // Per-session lock to prevent concurrent read/write on the same file
+  static final Map<String, Completer<void>> _locks = {};
 
   static const _backupChannel =
       MethodChannel('com.mnsoft.pailot/backup');
@@ -60,10 +62,25 @@
 
   /// Write directly to disk, bypassing debounce. For critical saves.
   static Future<void> writeDirect(String sessionId, List<Message> messages) async {
-    // Cancel ALL pending debounce to prevent race with frozen iOS timers
     _debounceTimer?.cancel();
     _pendingSaves.remove(sessionId);
-    await _writeSession(sessionId, messages);
+    await _withLock(sessionId, () => _writeSession(sessionId, messages));
+  }
+
+  /// Acquire a per-session lock, run the operation, release.
+  static Future<T> _withLock<T>(String sessionId, Future<T> Function() fn) async {
+    // Wait for any existing operation on this session to finish
+    while (_locks.containsKey(sessionId)) {
+      await _locks[sessionId]!.future;
+    }
+    final completer = Completer<void>();
+    _locks[sessionId] = completer;
+    try {
+      return await fn();
+    } finally {
+      _locks.remove(sessionId);
+      completer.complete();
+    }
   }
 
   /// Immediately flush all pending saves.
@@ -77,7 +94,7 @@
     _pendingSaves.clear();
 
     for (final entry in entries.entries) {
-      await _writeSession(entry.key, entry.value);
+      await _withLock(entry.key, () => _writeSession(entry.key, entry.value));
     }
   }
 
@@ -128,6 +145,10 @@
 
   /// Load all messages for a session (no pagination).
   static Future<List<Message>> loadAll(String sessionId) async {
+    return _withLock(sessionId, () => _loadAllImpl(sessionId));
+  }
+
+  static Future<List<Message>> _loadAllImpl(String sessionId) async {
     try {
       final dir = await _getBaseDir();
       final file = File('${dir.path}/${_fileForSession(sessionId)}');
diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
index ba6056e..285c6bf 100644
--- a/lib/services/mqtt_service.dart
+++ b/lib/services/mqtt_service.dart
@@ -65,8 +65,7 @@
   final Set<String> _seenMsgIds = {};
   final List<String> _seenMsgIdOrder = [];
 
-  // Per-session explicit subscriptions (wildcards broken in aedes)
-  final Set<String> _subscribedSessions = {};
+  // (Per-session subscriptions removed — single pailot/out topic now)
   static const int _maxSeenIds = 500;
 
   // Callbacks
@@ -492,34 +491,14 @@
       _mqttLog('MQTT: _subscribe called but client is null');
       return;
     }
-    // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.
-    // Per-session topics are added dynamically when the session list arrives.
-    _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');
+    // Single outbound topic — all messages carry sessionId in payload.
+    // Client routes messages to the correct session based on payload.
+    _mqttLog('MQTT: subscribing to topics...');
+    client.subscribe('pailot/out', MqttQos.atLeastOnce);
     client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
     client.subscribe('pailot/status', MqttQos.atLeastOnce);
     client.subscribe('pailot/projects', MqttQos.atLeastOnce);
     client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
-    client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
-    // Re-subscribe to all known per-session topics
-    for (final sid in _subscribedSessions) {
-      client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
-      client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
-      client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
-    }
-  }
-
-  /// Subscribe to per-session topics when session list arrives.
-  void _subscribeToSessions(List<String> sessionIds) {
-    final client = _client;
-    if (client == null) return;
-    for (final sid in sessionIds) {
-      if (_subscribedSessions.contains(sid)) continue;
-      _subscribedSessions.add(sid);
-      client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
-      client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
-      client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
-      _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');
-    }
   }
 
   void _listenMessages() {
@@ -576,20 +555,9 @@
   /// Translates MQTT topic structure into the flat message format
   /// that chat_screen expects.
   void _dispatchMessage(String topic, Map<String, dynamic> json) {
-    final parts = topic.split('/');
-
-    // pailot/sessions — also dynamically subscribe to per-session topics
+    // pailot/sessions
     if (topic == 'pailot/sessions') {
       json['type'] = 'sessions';
-      final sessions = json['sessions'] as List<dynamic>?;
-      if (sessions != null) {
-        final ids = sessions
-            .map((s) => (s as Map<String, dynamic>)['id'] as String?)
-            .where((id) => id != null && id.isNotEmpty)
-            .cast<String>()
-            .toList();
-        if (ids.isNotEmpty) _subscribeToSessions(ids);
-      }
       onMessage?.call(json);
       return;
     }
@@ -608,46 +576,25 @@
       return;
     }
 
-    // pailot/control/out — command responses (session_switched, session_renamed, error, unread)
+    // pailot/control/out — command responses
     if (topic == 'pailot/control/out') {
       onMessage?.call(json);
       return;
     }
 
-    // pailot/voice/transcript
-    if (topic == 'pailot/voice/transcript') {
-      json['type'] = 'transcript';
-      onMessage?.call(json);
-      return;
-    }
-
-    // pailot/<sessionId>/out — text, voice, image messages
-    if (parts.length == 3 && parts[2] == 'out') {
-      final sessionId = parts[1];
-      json['sessionId'] ??= sessionId;
-      onMessage?.call(json);
-      return;
-    }
-
-    // pailot/<sessionId>/typing
-    if (parts.length == 3 && parts[2] == 'typing') {
-      final sessionId = parts[1];
-      json['type'] = 'typing';
-      json['sessionId'] ??= sessionId;
-      // Map 'active' field to the 'typing'/'isTyping' fields chat_screen expects
-      final active = json['active'] as bool? ?? true;
-      json['typing'] = active;
-      onMessage?.call(json);
-      return;
-    }
-
-    // pailot/<sessionId>/screenshot
-    if (parts.length == 3 && parts[2] == 'screenshot') {
-      final sessionId = parts[1];
-      json['type'] = 'screenshot';
-      json['sessionId'] ??= sessionId;
-      // Map imageBase64 to 'data' for compatibility with chat_screen handler
-      json['data'] ??= json['imageBase64'];
+    // pailot/out — ALL content messages (text, voice, image, typing, screenshot, transcript)
+    // Each message carries its type and sessionId in the payload.
+    if (topic == 'pailot/out') {
+      final type = json['type'] as String?;
+      // Normalize typing fields for chat_screen
+      if (type == 'typing') {
+        final active = json['active'] as bool? ?? true;
+        json['typing'] = active;
+      }
+      // Normalize screenshot fields
+      if (type == 'screenshot') {
+        json['data'] ??= json['imageBase64'];
+      }
       onMessage?.call(json);
       return;
     }
@@ -851,23 +798,23 @@
     switch (state) {
       case AppLifecycleState.resumed:
         if (_intentionalClose) break;
-        _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');
-        final client = _client;
-        if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
-          // Clearly disconnected — fast reconnect to last known host
-          _mqttLog('MQTT: disconnected on resume, reconnecting...');
-          _client = null;
-          _setStatus(ConnectionStatus.reconnecting);
-          if (connectedHost != null) {
-            _fastReconnect(connectedHost!);
-          } else {
-            connect();
-          }
+        _mqttLog('MQTT: app resumed — reconnecting to last host');
+        // Kill old client completely (disable autoReconnect first to prevent
+        // the MQTT library from spawning its own reconnect attempt)
+        final oldClient = _client;
+        if (oldClient != null) {
+          oldClient.autoReconnect = false;
+          try { oldClient.disconnect(); } catch (_) {}
+        }
+        _updatesSub?.cancel();
+        _updatesSub = null;
+        _client = null;
+        _setStatus(ConnectionStatus.reconnecting);
+        onReconnecting?.call();
+        if (connectedHost != null) {
+          _fastReconnect(connectedHost!);
         } else {
-          // Appears connected — trigger catch_up to fetch missed messages.
-          // Don't disconnect! iOS may have buffered messages while suspended.
-          _mqttLog('MQTT: appears connected on resume, triggering catch_up');
-          onResume?.call();
+          connect();
         }
       case AppLifecycleState.paused:
         break;

--
Gitblit v1.3.1