lib/providers/providers.dart
.. .. @@ -100,28 +100,38 @@ 100 100 101 101 /// Switch to a new session and load its messages. 102 102 Future<void> switchSession(String sessionId) async { 103 - // Log caller for debugging104 103 final trace = StackTrace.current.toString().split('\n').take(4).join(' | '); 105 104 TraceService.instance.addTrace( 106 105 'switchSession', 107 106 'from=${_currentSessionId?.substring(0, 8) ?? "null"}(${state.length}) → ${sessionId.substring(0, 8)} | $trace', 108 107 ); 109 - // Write current session DIRECTLY to disk (no debounce — prevents data loss)108 + // Write current session to disk110 109 if (_currentSessionId != null && state.isNotEmpty) { 111 110 await MessageStore.writeDirect(_currentSessionId!, state); 112 111 } 113 112 113 + // Skip reload if staying on the same session — messages are already in memory114 + if (_currentSessionId == sessionId) {115 + TraceService.instance.addTrace('switchSession SKIP', 'already on ${sessionId.substring(0, 8)}');116 + return;117 + }118 +114 119 _currentSessionId = sessionId; 115 120 final messages = await MessageStore.loadAll(sessionId); 116 - state = messages;121 + // Merge: if addMessage ran during loadAll and added messages for THIS session,122 + // they'll be in state but not in the loaded messages. Keep the longer list.123 + if (state.length > messages.length && _currentSessionId == sessionId) {124 + TraceService.instance.addTrace('switchSession MERGE', 'kept ${state.length} (loaded ${messages.length})');125 + } else {126 + state = messages;127 + }117 128 } 118 129 119 130 /// Add a message to the current session. 120 131 void addMessage(Message message) { 121 132 state = [...state, message]; 122 133 if (_currentSessionId != null) { 123 - // Write immediately (not debounced) to prevent race with switchSession's loadAll124 - MessageStore.writeDirect(_currentSessionId!, state);134 + MessageStore.save(_currentSessionId!, state);125 135 } 126 136 } 127 137 lib/screens/chat_screen.dart
.. .. @@ -216,12 +216,15 @@ 216 216 _ws!.onOpen = () { 217 217 _sessionReady = false; // Gate messages until sessions arrive 218 218 _pendingMessages.clear(); 219 - final activeId = ref.read(activeSessionIdProvider);220 - _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);221 - // catch_up is sent after sessions arrive (in _handleSessions)222 -223 - // Re-register APNs token after reconnect so daemon always has a fresh token224 - _push?.onMqttConnected();219 + // Delay sync slightly to let broker acknowledge our subscriptions first.220 + // Without this, the catch_up response arrives before pailot/control/out221 + // subscription is active, and the message is lost.222 + Future.delayed(const Duration(milliseconds: 500), () {223 + if (!mounted) return;224 + final activeId = ref.read(activeSessionIdProvider);225 + _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);226 + _push?.onMqttConnected();227 + });225 228 }; 226 229 _ws!.onResume = () { 227 230 // App came back from background. The in-memory state already has .. .. @@ -534,8 +537,10 @@ 534 537 status: MessageStatus.sent, 535 538 ); 536 539 537 - final activeId = ref.read(activeSessionIdProvider);538 - if (sessionId != null && sessionId != activeId) {540 + // Use currentSessionId from notifier (what's actually loaded in the provider)541 + // not activeSessionIdProvider (can be stale after background resume)542 + final currentId = ref.read(messagesProvider.notifier).currentSessionId;543 + if (sessionId != null && sessionId != currentId) {539 544 // Store message for the other session so it's there when user switches 540 545 TraceService.instance.addTrace( 541 546 'message stored for session', .. .. @@ -608,9 +613,9 @@ 608 613 duration: duration, 609 614 ); 610 615 611 - final activeId = ref.read(activeSessionIdProvider);612 - _chatLog('voice: sessionId=$sessionId activeId=$activeId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');613 - if (sessionId != null && sessionId != activeId) {616 + final currentId = ref.read(messagesProvider.notifier).currentSessionId;617 + _chatLog('voice: sessionId=$sessionId currentId=$currentId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');618 + if (sessionId != null && sessionId != currentId) {614 619 _chatLog('voice: cross-session, storing for $sessionId'); 615 620 await _storeForSession(sessionId, storedMessage); 616 621 _chatLog('voice: stored, incrementing unread'); .. .. @@ -679,9 +684,9 @@ 679 684 status: MessageStatus.sent, 680 685 ); 681 686 682 - // Cross-session routing: store for target session if not active683 - final activeId = ref.read(activeSessionIdProvider);684 - if (sessionId != null && sessionId != activeId) {687 + // Cross-session routing: store for target session if not currently loaded688 + final currentId = ref.read(messagesProvider.notifier).currentSessionId;689 + if (sessionId != null && sessionId != currentId) {685 690 _storeForSession(sessionId, message); 686 691 _incrementUnread(sessionId); 687 692 return; lib/services/message_store.dart
.. .. @@ -15,6 +15,8 @@ 15 15 static Directory? _baseDir; 16 16 static Timer? _debounceTimer; 17 17 static final Map<String, List<Message>> _pendingSaves = {}; 18 + // Per-session lock to prevent concurrent read/write on the same file19 + static final Map<String, Completer<void>> _locks = {};18 20 19 21 static const _backupChannel = 20 22 MethodChannel('com.mnsoft.pailot/backup'); .. .. @@ -60,10 +62,25 @@ 60 62 61 63 /// Write directly to disk, bypassing debounce. For critical saves. 62 64 static Future<void> writeDirect(String sessionId, List<Message> messages) async { 63 - // Cancel ALL pending debounce to prevent race with frozen iOS timers64 65 _debounceTimer?.cancel(); 65 66 _pendingSaves.remove(sessionId); 66 - await _writeSession(sessionId, messages);67 + await _withLock(sessionId, () => _writeSession(sessionId, messages));68 + }69 +70 + /// Acquire a per-session lock, run the operation, release.71 + static Future<T> _withLock<T>(String sessionId, Future<T> Function() fn) async {72 + // Wait for any existing operation on this session to finish73 + while (_locks.containsKey(sessionId)) {74 + await _locks[sessionId]!.future;75 + }76 + final completer = Completer<void>();77 + _locks[sessionId] = completer;78 + try {79 + return await fn();80 + } finally {81 + _locks.remove(sessionId);82 + completer.complete();83 + }67 84 } 68 85 69 86 /// Immediately flush all pending saves. .. .. @@ -77,7 +94,7 @@ 77 94 _pendingSaves.clear(); 78 95 79 96 for (final entry in entries.entries) { 80 - await _writeSession(entry.key, entry.value);97 + await _withLock(entry.key, () => _writeSession(entry.key, entry.value));81 98 } 82 99 } 83 100 .. .. @@ -128,6 +145,10 @@ 128 145 129 146 /// Load all messages for a session (no pagination). 130 147 static Future<List<Message>> loadAll(String sessionId) async { 148 + return _withLock(sessionId, () => _loadAllImpl(sessionId));149 + }150 +151 + static Future<List<Message>> _loadAllImpl(String sessionId) async {131 152 try { 132 153 final dir = await _getBaseDir(); 133 154 final file = File('${dir.path}/${_fileForSession(sessionId)}'); lib/services/mqtt_service.dart
.. .. @@ -65,8 +65,7 @@ 65 65 final Set<String> _seenMsgIds = {}; 66 66 final List<String> _seenMsgIdOrder = []; 67 67 68 - // Per-session explicit subscriptions (wildcards broken in aedes)69 - final Set<String> _subscribedSessions = {};68 + // (Per-session subscriptions removed — single pailot/out topic now)70 69 static const int _maxSeenIds = 500; 71 70 72 71 // Callbacks .. .. @@ -492,34 +491,14 @@ 492 491 _mqttLog('MQTT: _subscribe called but client is null'); 493 492 return; 494 493 } 495 - // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.496 - // Per-session topics are added dynamically when the session list arrives.497 - _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');494 + // Single outbound topic — all messages carry sessionId in payload.495 + // Client routes messages to the correct session based on payload.496 + _mqttLog('MQTT: subscribing to topics...');497 + client.subscribe('pailot/out', MqttQos.atLeastOnce);498 498 client.subscribe('pailot/sessions', MqttQos.atLeastOnce); 499 499 client.subscribe('pailot/status', MqttQos.atLeastOnce); 500 500 client.subscribe('pailot/projects', MqttQos.atLeastOnce); 501 501 client.subscribe('pailot/control/out', MqttQos.atLeastOnce); 502 - client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);503 - // Re-subscribe to all known per-session topics504 - for (final sid in _subscribedSessions) {505 - client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);506 - client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);507 - client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);508 - }509 - }510 -511 - /// Subscribe to per-session topics when session list arrives.512 - void _subscribeToSessions(List<String> sessionIds) {513 - final client = _client;514 - if (client == null) return;515 - for (final sid in sessionIds) {516 - if (_subscribedSessions.contains(sid)) continue;517 - _subscribedSessions.add(sid);518 - client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);519 - client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);520 - client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);521 - _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');522 - }523 502 } 524 503 525 504 void _listenMessages() { .. .. @@ -576,20 +555,9 @@ 576 555 /// Translates MQTT topic structure into the flat message format 577 556 /// that chat_screen expects. 578 557 void _dispatchMessage(String topic, Map<String, dynamic> json) { 579 - final parts = topic.split('/');580 -581 - // pailot/sessions — also dynamically subscribe to per-session topics558 + // pailot/sessions582 559 if (topic == 'pailot/sessions') { 583 560 json['type'] = 'sessions'; 584 - final sessions = json['sessions'] as List<dynamic>?;585 - if (sessions != null) {586 - final ids = sessions587 - .map((s) => (s as Map<String, dynamic>)['id'] as String?)588 - .where((id) => id != null && id.isNotEmpty)589 - .cast<String>()590 - .toList();591 - if (ids.isNotEmpty) _subscribeToSessions(ids);592 - }593 561 onMessage?.call(json); 594 562 return; 595 563 } .. .. @@ -608,46 +576,25 @@ 608 576 return; 609 577 } 610 578 611 - // pailot/control/out — command responses (session_switched, session_renamed, error, unread)579 + // pailot/control/out — command responses612 580 if (topic == 'pailot/control/out') { 613 581 onMessage?.call(json); 614 582 return; 615 583 } 616 584 617 - // pailot/voice/transcript618 - if (topic == 'pailot/voice/transcript') {619 - json['type'] = 'transcript';620 - onMessage?.call(json);621 - return;622 - }623 -624 - // pailot/<sessionId>/out — text, voice, image messages625 - if (parts.length == 3 && parts[2] == 'out') {626 - final sessionId = parts[1];627 - json['sessionId'] ??= sessionId;628 - onMessage?.call(json);629 - return;630 - }631 -632 - // pailot/<sessionId>/typing633 - if (parts.length == 3 && parts[2] == 'typing') {634 - final sessionId = parts[1];635 - json['type'] = 'typing';636 - json['sessionId'] ??= sessionId;637 - // Map 'active' field to the 'typing'/'isTyping' fields chat_screen expects638 - final active = json['active'] as bool? ?? true;639 - json['typing'] = active;640 - onMessage?.call(json);641 - return;642 - }643 -644 - // pailot/<sessionId>/screenshot645 - if (parts.length == 3 && parts[2] == 'screenshot') {646 - final sessionId = parts[1];647 - json['type'] = 'screenshot';648 - json['sessionId'] ??= sessionId;649 - // Map imageBase64 to 'data' for compatibility with chat_screen handler650 - json['data'] ??= json['imageBase64'];585 + // pailot/out — ALL content messages (text, voice, image, typing, screenshot, transcript)586 + // Each message carries its type and sessionId in the payload.587 + if (topic == 'pailot/out') {588 + final type = json['type'] as String?;589 + // Normalize typing fields for chat_screen590 + if (type == 'typing') {591 + final active = json['active'] as bool? ?? true;592 + json['typing'] = active;593 + }594 + // Normalize screenshot fields595 + if (type == 'screenshot') {596 + json['data'] ??= json['imageBase64'];597 + }651 598 onMessage?.call(json); 652 599 return; 653 600 } .. .. @@ -851,23 +798,23 @@ 851 798 switch (state) { 852 799 case AppLifecycleState.resumed: 853 800 if (_intentionalClose) break; 854 - _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');855 - final client = _client;856 - if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {857 - // Clearly disconnected — fast reconnect to last known host858 - _mqttLog('MQTT: disconnected on resume, reconnecting...');859 - _client = null;860 - _setStatus(ConnectionStatus.reconnecting);861 - if (connectedHost != null) {862 - _fastReconnect(connectedHost!);863 - } else {864 - connect();865 - }801 + _mqttLog('MQTT: app resumed — reconnecting to last host');802 + // Kill old client completely (disable autoReconnect first to prevent803 + // the MQTT library from spawning its own reconnect attempt)804 + final oldClient = _client;805 + if (oldClient != null) {806 + oldClient.autoReconnect = false;807 + try { oldClient.disconnect(); } catch (_) {}808 + }809 + _updatesSub?.cancel();810 + _updatesSub = null;811 + _client = null;812 + _setStatus(ConnectionStatus.reconnecting);813 + onReconnecting?.call();814 + if (connectedHost != null) {815 + _fastReconnect(connectedHost!);866 816 } else { 867 - // Appears connected — trigger catch_up to fetch missed messages.868 - // Don't disconnect! iOS may have buffered messages while suspended.869 - _mqttLog('MQTT: appears connected on resume, triggering catch_up');870 - onResume?.call();817 + connect();871 818 } 872 819 case AppLifecycleState.paused: 873 820 break;