Matthias Nott
9 days ago 06bb73662d32d65d1e775a4dd35f67d82d673e40
lib/screens/chat_screen.dart
....@@ -72,7 +72,8 @@
7272 final Set<int> _seenSeqs = {};
7373 bool _sessionReady = false;
7474 final List<Map<String, dynamic>> _pendingMessages = [];
75
- final Map<String, List<Message>> _catchUpPending = {};
75
+ // _catchUpPending removed: cross-session catch_up messages are now appended
76
+ // synchronously via MessageStoreV2.append() in the catch_up handler.
7677 List<String>? _cachedSessionOrder;
7778 Timer? _typingTimer;
7879 bool _unreadCountsLoaded = false;
....@@ -86,6 +87,9 @@
8687 }
8788
8889 Future<void> _initAll() async {
90
+ // Initialize append-only message store (reads log, rebuilds index, compacts).
91
+ await MessageStoreV2.initialize();
92
+
8993 // Load persisted state BEFORE connecting
9094 final prefs = await SharedPreferences.getInstance();
9195 _lastSeq = prefs.getInt('lastSeq') ?? 0;
....@@ -104,8 +108,8 @@
104108 final savedSessionId = prefs.getString('activeSessionId');
105109 if (savedSessionId != null && mounted) {
106110 ref.read(activeSessionIdProvider.notifier).state = savedSessionId;
107
- // Load messages for the restored session so chat isn't empty on startup
108
- await ref.read(messagesProvider.notifier).switchSession(savedSessionId);
111
+ // Synchronous: no async gap between load and any arriving messages.
112
+ ref.read(messagesProvider.notifier).switchSession(savedSessionId);
109113 }
110114 if (!mounted) return;
111115
....@@ -165,14 +169,11 @@
165169 _persistUnreadCounts(counts);
166170 }
167171
172
+ // ignore: unused_field
168173 bool _isLoadingMore = false;
169174 void _onScroll() {
170
- if (!_isLoadingMore &&
171
- _scrollController.position.pixels >=
172
- _scrollController.position.maxScrollExtent - 100) {
173
- _isLoadingMore = true;
174
- ref.read(messagesProvider.notifier).loadMore().then((_) => _isLoadingMore = false);
175
- }
175
+ // Pagination removed: all messages are loaded synchronously on session
176
+ // switch via the in-memory index. Nothing to do on scroll.
176177 }
177178
178179 // Helper: send a command to the gateway in the expected format
....@@ -377,18 +378,23 @@
377378 _lastSeq = serverSeq;
378379 _saveLastSeq();
379380 }
380
- // Merge catch_up messages: only add messages not already in local storage.
381
- // We check by content match against existing messages to avoid duplicates
382
- // while still picking up messages that arrived while the app was backgrounded.
381
+ // Merge catch_up messages: only add messages not already displayed.
382
+ // Dedup by content to avoid showing messages already in the UI.
383383 final catchUpMsgs = msg['messages'] as List<dynamic>?;
384384 if (catchUpMsgs != null && catchUpMsgs.isNotEmpty) {
385385 _isCatchingUp = true;
386386 final activeId = ref.read(activeSessionIdProvider);
387
+ final currentId = ref.read(messagesProvider.notifier).currentSessionId;
387388 final existing = ref.read(messagesProvider);
388389 final existingContents = existing
389390 .where((m) => m.role == MessageRole.assistant)
390391 .map((m) => m.content)
391392 .toSet();
393
+
394
+ // Collect cross-session sessions that received messages (for toasts).
395
+ final crossSessionCounts = <String, int>{};
396
+ final crossSessionPreviews = <String, String>{};
397
+
392398 for (final m in catchUpMsgs) {
393399 final map = m as Map<String, dynamic>;
394400 final msgType = map['type'] as String? ?? 'text';
....@@ -417,52 +423,46 @@
417423 );
418424 }
419425
420
- _chatLog('catch_up msg: session=${msgSessionId?.substring(0, 8) ?? "NULL"} active=${activeId?.substring(0, 8)} match=${msgSessionId == activeId || msgSessionId == null} content="${content.substring(0, content.length.clamp(0, 40))}"');
421
- if (msgSessionId == null || msgSessionId == activeId) {
422
- // Active session or no session: add directly to chat
426
+ _chatLog('catch_up msg: session=${msgSessionId?.substring(0, 8) ?? "NULL"} active=${activeId?.substring(0, 8)} content="${content.substring(0, content.length.clamp(0, 40))}"');
427
+
428
+ if (msgSessionId == null || msgSessionId == currentId) {
429
+ // Active session or no session: add to UI (addMessage also appends to log).
423430 ref.read(messagesProvider.notifier).addMessage(message);
424431 } else {
425
- // Different session: store + unread badge + toast
426
- // Collect for batch storage below to avoid race condition
427
- _catchUpPending.putIfAbsent(msgSessionId, () => []).add(message);
432
+ // Cross-session: synchronous append — no race condition.
433
+ MessageStoreV2.append(msgSessionId, message);
428434 _incrementUnread(msgSessionId);
435
+ crossSessionCounts[msgSessionId] = (crossSessionCounts[msgSessionId] ?? 0) + 1;
436
+ crossSessionPreviews.putIfAbsent(msgSessionId, () => content);
429437 }
430438 existingContents.add(content);
431439 }
440
+
432441 _isCatchingUp = false;
433442 _scrollToBottom();
434
- // Batch-store cross-session messages (sequential to avoid race condition)
435
- if (_catchUpPending.isNotEmpty) {
436
- final pending = Map<String, List<Message>>.from(_catchUpPending);
437
- _catchUpPending.clear();
438
- // Show one toast per session with message count
439
- if (mounted) {
440
- final sessions = ref.read(sessionsProvider);
441
- for (final entry in pending.entries) {
442
- final session = sessions.firstWhere(
443
- (s) => s.id == entry.key,
444
- orElse: () => Session(id: entry.key, index: 0, name: 'Unknown', type: 'claude'),
445
- );
446
- final count = entry.value.length;
447
- final preview = count == 1
448
- ? entry.value.first.content
449
- : '$count messages';
450
- ToastManager.show(
451
- context,
452
- sessionName: session.name,
453
- preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
454
- onTap: () => _switchSession(entry.key),
455
- );
456
- }
443
+
444
+ // Show one toast per cross-session that received messages.
445
+ if (crossSessionCounts.isNotEmpty && mounted) {
446
+ final sessions = ref.read(sessionsProvider);
447
+ for (final entry in crossSessionCounts.entries) {
448
+ final sid = entry.key;
449
+ final count = entry.value;
450
+ final session = sessions.firstWhere(
451
+ (s) => s.id == sid,
452
+ orElse: () => Session(id: sid, index: 0, name: 'Unknown', type: 'claude'),
453
+ );
454
+ final preview = count == 1
455
+ ? (crossSessionPreviews[sid] ?? '')
456
+ : '$count messages';
457
+ ToastManager.show(
458
+ context,
459
+ sessionName: session.name,
460
+ preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
461
+ onTap: () => _switchSession(sid),
462
+ );
457463 }
458
- () async {
459
- for (final entry in pending.entries) {
460
- final existing = await MessageStore.loadAll(entry.key);
461
- MessageStore.save(entry.key, [...existing, ...entry.value]);
462
- await MessageStore.flush();
463
- }
464
- }();
465464 }
465
+
466466 // Clear unread for active session
467467 if (activeId != null) {
468468 final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
....@@ -500,6 +500,7 @@
500500 orElse: () => sessions.first,
501501 );
502502 ref.read(activeSessionIdProvider.notifier).state = active.id;
503
+ // Synchronous session switch — no async gap.
503504 ref.read(messagesProvider.notifier).switchSession(active.id);
504505 SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', active.id));
505506 }
....@@ -520,7 +521,7 @@
520521 }
521522 }
522523
523
- Future<void> _handleIncomingMessage(Map<String, dynamic> msg) async {
524
+ void _handleIncomingMessage(Map<String, dynamic> msg) {
524525 final sessionId = msg['sessionId'] as String?;
525526 final content = msg['content'] as String? ??
526527 msg['text'] as String? ??
....@@ -537,16 +538,16 @@
537538 status: MessageStatus.sent,
538539 );
539540
540
- // Use currentSessionId from notifier (what's actually loaded in the provider)
541
- // not activeSessionIdProvider (can be stale after background resume)
541
+ // Use currentSessionId from notifier (what's actually loaded in the provider),
542
+ // not activeSessionIdProvider (can be stale after background resume).
542543 final currentId = ref.read(messagesProvider.notifier).currentSessionId;
543544 if (sessionId != null && sessionId != currentId) {
544
- // Store message for the other session so it's there when user switches
545
+ // Append directly to the log for the target session — synchronous, no race.
545546 TraceService.instance.addTrace(
546547 'message stored for session',
547548 'sessionId=${sessionId.substring(0, sessionId.length.clamp(0, 8))}, toast shown',
548549 );
549
- await _storeForSession(sessionId, message);
550
+ MessageStoreV2.append(sessionId, message);
550551 _incrementUnread(sessionId);
551552 final sessions = ref.read(sessionsProvider);
552553 final session = sessions.firstWhere(
....@@ -616,9 +617,10 @@
616617 final currentId = ref.read(messagesProvider.notifier).currentSessionId;
617618 _chatLog('voice: sessionId=$sessionId currentId=$currentId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');
618619 if (sessionId != null && sessionId != currentId) {
619
- _chatLog('voice: cross-session, storing for $sessionId');
620
- await _storeForSession(sessionId, storedMessage);
621
- _chatLog('voice: stored, incrementing unread');
620
+ _chatLog('voice: cross-session, appending to store for $sessionId');
621
+ // Synchronous append — no async gap, no race condition.
622
+ MessageStoreV2.append(sessionId, storedMessage);
623
+ _chatLog('voice: appended, incrementing unread');
622624 _incrementUnread(sessionId);
623625 final sessions = ref.read(sessionsProvider);
624626 final session = sessions.firstWhere(
....@@ -684,10 +686,10 @@
684686 status: MessageStatus.sent,
685687 );
686688
687
- // Cross-session routing: store for target session if not currently loaded
689
+ // Cross-session routing: append to log for target session if not currently loaded.
688690 final currentId = ref.read(messagesProvider.notifier).currentSessionId;
689691 if (sessionId != null && sessionId != currentId) {
690
- _storeForSession(sessionId, message);
692
+ MessageStoreV2.append(sessionId, message);
691693 _incrementUnread(sessionId);
692694 return;
693695 }
....@@ -697,51 +699,19 @@
697699 _scrollToBottom();
698700 }
699701
700
- /// Store a message for a non-active session so it persists when the user switches to it.
701
- Future<void> _storeForSession(String sessionId, Message message) async {
702
- final existing = await MessageStore.loadAll(sessionId);
703
- _chatLog('storeForSession: $sessionId existing=${existing.length} adding type=${message.type.name} content="${message.content.substring(0, message.content.length.clamp(0, 30))}" audioUri=${message.audioUri != null ? "set(${message.audioUri!.length})" : "null"}');
704
- MessageStore.save(sessionId, [...existing, message]);
705
- await MessageStore.flush();
706
- // Verify
707
- final verify = await MessageStore.loadAll(sessionId);
708
- _chatLog('storeForSession: verified ${verify.length} messages after save');
702
+ /// Superseded by MessageStoreV2.append() — call sites now use the synchronous
703
+ /// append directly. Kept as dead code until all callers are confirmed removed.
704
+ // ignore: unused_element
705
+ void _storeForSession(String sessionId, Message message) {
706
+ MessageStoreV2.append(sessionId, message);
709707 }
710708
711
- /// Update a transcript for a message stored on disk (not in the active session).
712
- /// Scans all session files to find the message by ID, updates content, and saves.
709
+ /// With the append-only log, transcript updates for cross-session messages
710
+ /// are not patched back to disk (the append-only design doesn't support
711
+ /// in-place edits). The transcript is updated in-memory if the message is
712
+ /// in the active session. Cross-session transcript updates are a no-op.
713713 Future<void> _updateTranscriptOnDisk(String messageId, String content) async {
714
- try {
715
- final dir = await getApplicationDocumentsDirectory();
716
- final msgDir = Directory('${dir.path}/messages');
717
- if (!await msgDir.exists()) return;
718
-
719
- await for (final entity in msgDir.list()) {
720
- if (entity is! File || !entity.path.endsWith('.json')) continue;
721
-
722
- final jsonStr = await entity.readAsString();
723
- final List<dynamic> jsonList = jsonDecode(jsonStr) as List<dynamic>;
724
- bool found = false;
725
-
726
- final updated = jsonList.map((j) {
727
- final map = j as Map<String, dynamic>;
728
- if (map['id'] == messageId) {
729
- found = true;
730
- return {...map, 'content': content};
731
- }
732
- return map;
733
- }).toList();
734
-
735
- if (found) {
736
- await entity.writeAsString(jsonEncode(updated));
737
- _chatLog('transcript: updated messageId=$messageId on disk in ${entity.path.split('/').last}');
738
- return;
739
- }
740
- }
741
- _chatLog('transcript: messageId=$messageId not found on disk');
742
- } catch (e) {
743
- _chatLog('transcript: disk update error=$e');
744
- }
714
+ _chatLog('transcript: cross-session update for messageId=$messageId — in-memory only (append-only log)');
745715 }
746716
747717 void _incrementUnread(String sessionId) {
....@@ -770,7 +740,8 @@
770740 ref.read(isTypingProvider.notifier).state = false;
771741
772742 ref.read(activeSessionIdProvider.notifier).state = sessionId;
773
- await ref.read(messagesProvider.notifier).switchSession(sessionId);
743
+ // Synchronous — no async gap between session switch and incoming messages.
744
+ ref.read(messagesProvider.notifier).switchSession(sessionId);
774745 SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', sessionId));
775746
776747 final counts = Map<String, int>.from(ref.read(unreadCountsProvider));