Matthias Nott
9 days ago 6cbbea9b96db551e5c0ac26f0ace3d4c3d82a276
fix: single pailot/out topic, per-session file locks, merge protection, resume reconnect
4 files modified
changed files
lib/providers/providers.dart patch | view | blame | history
lib/screens/chat_screen.dart patch | view | blame | history
lib/services/message_store.dart patch | view | blame | history
lib/services/mqtt_service.dart patch | view | blame | history
lib/providers/providers.dart
....@@ -100,28 +100,38 @@
100100
101101 /// Switch to a new session and load its messages.
102102 Future<void> switchSession(String sessionId) async {
103
- // Log caller for debugging
104103 final trace = StackTrace.current.toString().split('\n').take(4).join(' | ');
105104 TraceService.instance.addTrace(
106105 'switchSession',
107106 'from=${_currentSessionId?.substring(0, 8) ?? "null"}(${state.length}) → ${sessionId.substring(0, 8)} | $trace',
108107 );
109
- // Write current session DIRECTLY to disk (no debounce — prevents data loss)
108
+ // Write current session to disk
110109 if (_currentSessionId != null && state.isNotEmpty) {
111110 await MessageStore.writeDirect(_currentSessionId!, state);
112111 }
113112
113
+ // Skip reload if staying on the same session — messages are already in memory
114
+ if (_currentSessionId == sessionId) {
115
+ TraceService.instance.addTrace('switchSession SKIP', 'already on ${sessionId.substring(0, 8)}');
116
+ return;
117
+ }
118
+
114119 _currentSessionId = sessionId;
115120 final messages = await MessageStore.loadAll(sessionId);
116
- state = messages;
121
+ // Merge: if addMessage ran during loadAll and added messages for THIS session,
122
+ // they'll be in state but not in the loaded messages. Keep the longer list.
123
+ if (state.length > messages.length && _currentSessionId == sessionId) {
124
+ TraceService.instance.addTrace('switchSession MERGE', 'kept ${state.length} (loaded ${messages.length})');
125
+ } else {
126
+ state = messages;
127
+ }
117128 }
118129
119130 /// Add a message to the current session.
120131 void addMessage(Message message) {
121132 state = [...state, message];
122133 if (_currentSessionId != null) {
123
- // Write immediately (not debounced) to prevent race with switchSession's loadAll
124
- MessageStore.writeDirect(_currentSessionId!, state);
134
+ MessageStore.save(_currentSessionId!, state);
125135 }
126136 }
127137
lib/screens/chat_screen.dart
....@@ -216,12 +216,15 @@
216216 _ws!.onOpen = () {
217217 _sessionReady = false; // Gate messages until sessions arrive
218218 _pendingMessages.clear();
219
- final activeId = ref.read(activeSessionIdProvider);
220
- _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
221
- // catch_up is sent after sessions arrive (in _handleSessions)
222
-
223
- // Re-register APNs token after reconnect so daemon always has a fresh token
224
- _push?.onMqttConnected();
219
+ // Delay sync slightly to let broker acknowledge our subscriptions first.
220
+ // Without this, the catch_up response arrives before pailot/control/out
221
+ // subscription is active, and the message is lost.
222
+ Future.delayed(const Duration(milliseconds: 500), () {
223
+ if (!mounted) return;
224
+ final activeId = ref.read(activeSessionIdProvider);
225
+ _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
226
+ _push?.onMqttConnected();
227
+ });
225228 };
226229 _ws!.onResume = () {
227230 // App came back from background. The in-memory state already has
....@@ -534,8 +537,10 @@
534537 status: MessageStatus.sent,
535538 );
536539
537
- final activeId = ref.read(activeSessionIdProvider);
538
- if (sessionId != null && sessionId != activeId) {
540
+ // Use currentSessionId from notifier (what's actually loaded in the provider)
541
+ // not activeSessionIdProvider (can be stale after background resume)
542
+ final currentId = ref.read(messagesProvider.notifier).currentSessionId;
543
+ if (sessionId != null && sessionId != currentId) {
539544 // Store message for the other session so it's there when user switches
540545 TraceService.instance.addTrace(
541546 'message stored for session',
....@@ -608,9 +613,9 @@
608613 duration: duration,
609614 );
610615
611
- final activeId = ref.read(activeSessionIdProvider);
612
- _chatLog('voice: sessionId=$sessionId activeId=$activeId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');
613
- if (sessionId != null && sessionId != activeId) {
616
+ final currentId = ref.read(messagesProvider.notifier).currentSessionId;
617
+ _chatLog('voice: sessionId=$sessionId currentId=$currentId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');
618
+ if (sessionId != null && sessionId != currentId) {
614619 _chatLog('voice: cross-session, storing for $sessionId');
615620 await _storeForSession(sessionId, storedMessage);
616621 _chatLog('voice: stored, incrementing unread');
....@@ -679,9 +684,9 @@
679684 status: MessageStatus.sent,
680685 );
681686
682
- // Cross-session routing: store for target session if not active
683
- final activeId = ref.read(activeSessionIdProvider);
684
- if (sessionId != null && sessionId != activeId) {
687
+ // Cross-session routing: store for target session if not currently loaded
688
+ final currentId = ref.read(messagesProvider.notifier).currentSessionId;
689
+ if (sessionId != null && sessionId != currentId) {
685690 _storeForSession(sessionId, message);
686691 _incrementUnread(sessionId);
687692 return;
lib/services/message_store.dart
....@@ -15,6 +15,8 @@
1515 static Directory? _baseDir;
1616 static Timer? _debounceTimer;
1717 static final Map<String, List<Message>> _pendingSaves = {};
18
+ // Per-session lock to prevent concurrent read/write on the same file
19
+ static final Map<String, Completer<void>> _locks = {};
1820
1921 static const _backupChannel =
2022 MethodChannel('com.mnsoft.pailot/backup');
....@@ -60,10 +62,25 @@
6062
6163 /// Write directly to disk, bypassing debounce. For critical saves.
6264 static Future<void> writeDirect(String sessionId, List<Message> messages) async {
63
- // Cancel ALL pending debounce to prevent race with frozen iOS timers
6465 _debounceTimer?.cancel();
6566 _pendingSaves.remove(sessionId);
66
- await _writeSession(sessionId, messages);
67
+ await _withLock(sessionId, () => _writeSession(sessionId, messages));
68
+ }
69
+
70
+ /// Acquire a per-session lock, run the operation, release.
71
+ static Future<T> _withLock<T>(String sessionId, Future<T> Function() fn) async {
72
+ // Wait for any existing operation on this session to finish
73
+ while (_locks.containsKey(sessionId)) {
74
+ await _locks[sessionId]!.future;
75
+ }
76
+ final completer = Completer<void>();
77
+ _locks[sessionId] = completer;
78
+ try {
79
+ return await fn();
80
+ } finally {
81
+ _locks.remove(sessionId);
82
+ completer.complete();
83
+ }
6784 }
6885
6986 /// Immediately flush all pending saves.
....@@ -77,7 +94,7 @@
7794 _pendingSaves.clear();
7895
7996 for (final entry in entries.entries) {
80
- await _writeSession(entry.key, entry.value);
97
+ await _withLock(entry.key, () => _writeSession(entry.key, entry.value));
8198 }
8299 }
83100
....@@ -128,6 +145,10 @@
128145
129146 /// Load all messages for a session (no pagination).
130147 static Future<List<Message>> loadAll(String sessionId) async {
148
+ return _withLock(sessionId, () => _loadAllImpl(sessionId));
149
+ }
150
+
151
+ static Future<List<Message>> _loadAllImpl(String sessionId) async {
131152 try {
132153 final dir = await _getBaseDir();
133154 final file = File('${dir.path}/${_fileForSession(sessionId)}');
lib/services/mqtt_service.dart
....@@ -65,8 +65,7 @@
6565 final Set<String> _seenMsgIds = {};
6666 final List<String> _seenMsgIdOrder = [];
6767
68
- // Per-session explicit subscriptions (wildcards broken in aedes)
69
- final Set<String> _subscribedSessions = {};
68
+ // (Per-session subscriptions removed — single pailot/out topic now)
7069 static const int _maxSeenIds = 500;
7170
7271 // Callbacks
....@@ -492,34 +491,14 @@
492491 _mqttLog('MQTT: _subscribe called but client is null');
493492 return;
494493 }
495
- // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.
496
- // Per-session topics are added dynamically when the session list arrives.
497
- _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');
494
+ // Single outbound topic — all messages carry sessionId in payload.
495
+ // Client routes messages to the correct session based on payload.
496
+ _mqttLog('MQTT: subscribing to topics...');
497
+ client.subscribe('pailot/out', MqttQos.atLeastOnce);
498498 client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
499499 client.subscribe('pailot/status', MqttQos.atLeastOnce);
500500 client.subscribe('pailot/projects', MqttQos.atLeastOnce);
501501 client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
502
- client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
503
- // Re-subscribe to all known per-session topics
504
- for (final sid in _subscribedSessions) {
505
- client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
506
- client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
507
- client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
508
- }
509
- }
510
-
511
- /// Subscribe to per-session topics when session list arrives.
512
- void _subscribeToSessions(List<String> sessionIds) {
513
- final client = _client;
514
- if (client == null) return;
515
- for (final sid in sessionIds) {
516
- if (_subscribedSessions.contains(sid)) continue;
517
- _subscribedSessions.add(sid);
518
- client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
519
- client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
520
- client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
521
- _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');
522
- }
523502 }
524503
525504 void _listenMessages() {
....@@ -576,20 +555,9 @@
576555 /// Translates MQTT topic structure into the flat message format
577556 /// that chat_screen expects.
578557 void _dispatchMessage(String topic, Map<String, dynamic> json) {
579
- final parts = topic.split('/');
580
-
581
- // pailot/sessions — also dynamically subscribe to per-session topics
558
+ // pailot/sessions
582559 if (topic == 'pailot/sessions') {
583560 json['type'] = 'sessions';
584
- final sessions = json['sessions'] as List<dynamic>?;
585
- if (sessions != null) {
586
- final ids = sessions
587
- .map((s) => (s as Map<String, dynamic>)['id'] as String?)
588
- .where((id) => id != null && id.isNotEmpty)
589
- .cast<String>()
590
- .toList();
591
- if (ids.isNotEmpty) _subscribeToSessions(ids);
592
- }
593561 onMessage?.call(json);
594562 return;
595563 }
....@@ -608,46 +576,25 @@
608576 return;
609577 }
610578
611
- // pailot/control/out — command responses (session_switched, session_renamed, error, unread)
579
+ // pailot/control/out — command responses
612580 if (topic == 'pailot/control/out') {
613581 onMessage?.call(json);
614582 return;
615583 }
616584
617
- // pailot/voice/transcript
618
- if (topic == 'pailot/voice/transcript') {
619
- json['type'] = 'transcript';
620
- onMessage?.call(json);
621
- return;
622
- }
623
-
624
- // pailot/<sessionId>/out — text, voice, image messages
625
- if (parts.length == 3 && parts[2] == 'out') {
626
- final sessionId = parts[1];
627
- json['sessionId'] ??= sessionId;
628
- onMessage?.call(json);
629
- return;
630
- }
631
-
632
- // pailot/<sessionId>/typing
633
- if (parts.length == 3 && parts[2] == 'typing') {
634
- final sessionId = parts[1];
635
- json['type'] = 'typing';
636
- json['sessionId'] ??= sessionId;
637
- // Map 'active' field to the 'typing'/'isTyping' fields chat_screen expects
638
- final active = json['active'] as bool? ?? true;
639
- json['typing'] = active;
640
- onMessage?.call(json);
641
- return;
642
- }
643
-
644
- // pailot/<sessionId>/screenshot
645
- if (parts.length == 3 && parts[2] == 'screenshot') {
646
- final sessionId = parts[1];
647
- json['type'] = 'screenshot';
648
- json['sessionId'] ??= sessionId;
649
- // Map imageBase64 to 'data' for compatibility with chat_screen handler
650
- json['data'] ??= json['imageBase64'];
585
+ // pailot/out — ALL content messages (text, voice, image, typing, screenshot, transcript)
586
+ // Each message carries its type and sessionId in the payload.
587
+ if (topic == 'pailot/out') {
588
+ final type = json['type'] as String?;
589
+ // Normalize typing fields for chat_screen
590
+ if (type == 'typing') {
591
+ final active = json['active'] as bool? ?? true;
592
+ json['typing'] = active;
593
+ }
594
+ // Normalize screenshot fields
595
+ if (type == 'screenshot') {
596
+ json['data'] ??= json['imageBase64'];
597
+ }
651598 onMessage?.call(json);
652599 return;
653600 }
....@@ -851,23 +798,23 @@
851798 switch (state) {
852799 case AppLifecycleState.resumed:
853800 if (_intentionalClose) break;
854
- _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');
855
- final client = _client;
856
- if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
857
- // Clearly disconnected — fast reconnect to last known host
858
- _mqttLog('MQTT: disconnected on resume, reconnecting...');
859
- _client = null;
860
- _setStatus(ConnectionStatus.reconnecting);
861
- if (connectedHost != null) {
862
- _fastReconnect(connectedHost!);
863
- } else {
864
- connect();
865
- }
801
+ _mqttLog('MQTT: app resumed — reconnecting to last host');
802
+ // Kill old client completely (disable autoReconnect first to prevent
803
+ // the MQTT library from spawning its own reconnect attempt)
804
+ final oldClient = _client;
805
+ if (oldClient != null) {
806
+ oldClient.autoReconnect = false;
807
+ try { oldClient.disconnect(); } catch (_) {}
808
+ }
809
+ _updatesSub?.cancel();
810
+ _updatesSub = null;
811
+ _client = null;
812
+ _setStatus(ConnectionStatus.reconnecting);
813
+ onReconnecting?.call();
814
+ if (connectedHost != null) {
815
+ _fastReconnect(connectedHost!);
866816 } else {
867
- // Appears connected — trigger catch_up to fetch missed messages.
868
- // Don't disconnect! iOS may have buffered messages while suspended.
869
- _mqttLog('MQTT: appears connected on resume, triggering catch_up');
870
- onResume?.call();
817
+ connect();
871818 }
872819 case AppLifecycleState.paused:
873820 break;