From 06bb73662d32d65d1e775a4dd35f67d82d673e40 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Mon, 06 Apr 2026 15:32:03 +0200
Subject: [PATCH] feat: rewrite message store - append-only log, sync routing, eliminates async race conditions
---
lib/screens/chat_screen.dart | 173 ++++++++++++++++++++++++---------------------------------
1 files changed, 72 insertions(+), 101 deletions(-)
diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index f331ab9..ee0f68b 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -72,7 +72,8 @@
final Set<int> _seenSeqs = {};
bool _sessionReady = false;
final List<Map<String, dynamic>> _pendingMessages = [];
- final Map<String, List<Message>> _catchUpPending = {};
+ // _catchUpPending removed: cross-session catch_up messages are now appended
+ // synchronously via MessageStoreV2.append() in the catch_up handler.
List<String>? _cachedSessionOrder;
Timer? _typingTimer;
bool _unreadCountsLoaded = false;
@@ -86,6 +87,9 @@
}
Future<void> _initAll() async {
+ // Initialize append-only message store (reads log, rebuilds index, compacts).
+ await MessageStoreV2.initialize();
+
// Load persisted state BEFORE connecting
final prefs = await SharedPreferences.getInstance();
_lastSeq = prefs.getInt('lastSeq') ?? 0;
@@ -104,8 +108,8 @@
final savedSessionId = prefs.getString('activeSessionId');
if (savedSessionId != null && mounted) {
ref.read(activeSessionIdProvider.notifier).state = savedSessionId;
- // Load messages for the restored session so chat isn't empty on startup
- await ref.read(messagesProvider.notifier).switchSession(savedSessionId);
+ // Synchronous: no async gap between load and any arriving messages.
+ ref.read(messagesProvider.notifier).switchSession(savedSessionId);
}
if (!mounted) return;
@@ -165,14 +169,11 @@
_persistUnreadCounts(counts);
}
+ // ignore: unused_field
bool _isLoadingMore = false;
void _onScroll() {
- if (!_isLoadingMore &&
- _scrollController.position.pixels >=
- _scrollController.position.maxScrollExtent - 100) {
- _isLoadingMore = true;
- ref.read(messagesProvider.notifier).loadMore().then((_) => _isLoadingMore = false);
- }
+ // Pagination removed: all messages are loaded synchronously on session
+ // switch via the in-memory index. Nothing to do on scroll.
}
// Helper: send a command to the gateway in the expected format
@@ -377,18 +378,23 @@
_lastSeq = serverSeq;
_saveLastSeq();
}
- // Merge catch_up messages: only add messages not already in local storage.
- // We check by content match against existing messages to avoid duplicates
- // while still picking up messages that arrived while the app was backgrounded.
+ // Merge catch_up messages: only add messages not already displayed.
+ // Dedup by content to avoid showing messages already in the UI.
final catchUpMsgs = msg['messages'] as List<dynamic>?;
if (catchUpMsgs != null && catchUpMsgs.isNotEmpty) {
_isCatchingUp = true;
final activeId = ref.read(activeSessionIdProvider);
+ final currentId = ref.read(messagesProvider.notifier).currentSessionId;
final existing = ref.read(messagesProvider);
final existingContents = existing
.where((m) => m.role == MessageRole.assistant)
.map((m) => m.content)
.toSet();
+
+ // Collect cross-session sessions that received messages (for toasts).
+ final crossSessionCounts = <String, int>{};
+ final crossSessionPreviews = <String, String>{};
+
for (final m in catchUpMsgs) {
final map = m as Map<String, dynamic>;
final msgType = map['type'] as String? ?? 'text';
@@ -417,52 +423,46 @@
);
}
- _chatLog('catch_up msg: session=${msgSessionId?.substring(0, 8) ?? "NULL"} active=${activeId?.substring(0, 8)} match=${msgSessionId == activeId || msgSessionId == null} content="${content.substring(0, content.length.clamp(0, 40))}"');
- if (msgSessionId == null || msgSessionId == activeId) {
- // Active session or no session: add directly to chat
+ _chatLog('catch_up msg: session=${msgSessionId?.substring(0, 8) ?? "NULL"} active=${activeId?.substring(0, 8)} content="${content.substring(0, content.length.clamp(0, 40))}"');
+
+ if (msgSessionId == null || msgSessionId == currentId) {
+ // Active session or no session: add to UI (addMessage also appends to log).
ref.read(messagesProvider.notifier).addMessage(message);
} else {
- // Different session: store + unread badge + toast
- // Collect for batch storage below to avoid race condition
- _catchUpPending.putIfAbsent(msgSessionId, () => []).add(message);
+ // Cross-session: synchronous append — no race condition.
+ MessageStoreV2.append(msgSessionId, message);
_incrementUnread(msgSessionId);
+ crossSessionCounts[msgSessionId] = (crossSessionCounts[msgSessionId] ?? 0) + 1;
+ crossSessionPreviews.putIfAbsent(msgSessionId, () => content);
}
existingContents.add(content);
}
+
_isCatchingUp = false;
_scrollToBottom();
- // Batch-store cross-session messages (sequential to avoid race condition)
- if (_catchUpPending.isNotEmpty) {
- final pending = Map<String, List<Message>>.from(_catchUpPending);
- _catchUpPending.clear();
- // Show one toast per session with message count
- if (mounted) {
- final sessions = ref.read(sessionsProvider);
- for (final entry in pending.entries) {
- final session = sessions.firstWhere(
- (s) => s.id == entry.key,
- orElse: () => Session(id: entry.key, index: 0, name: 'Unknown', type: 'claude'),
- );
- final count = entry.value.length;
- final preview = count == 1
- ? entry.value.first.content
- : '$count messages';
- ToastManager.show(
- context,
- sessionName: session.name,
- preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
- onTap: () => _switchSession(entry.key),
- );
- }
+
+ // Show one toast per cross-session that received messages.
+ if (crossSessionCounts.isNotEmpty && mounted) {
+ final sessions = ref.read(sessionsProvider);
+ for (final entry in crossSessionCounts.entries) {
+ final sid = entry.key;
+ final count = entry.value;
+ final session = sessions.firstWhere(
+ (s) => s.id == sid,
+ orElse: () => Session(id: sid, index: 0, name: 'Unknown', type: 'claude'),
+ );
+ final preview = count == 1
+ ? (crossSessionPreviews[sid] ?? '')
+ : '$count messages';
+ ToastManager.show(
+ context,
+ sessionName: session.name,
+ preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
+ onTap: () => _switchSession(sid),
+ );
}
- () async {
- for (final entry in pending.entries) {
- final existing = await MessageStore.loadAll(entry.key);
- MessageStore.save(entry.key, [...existing, ...entry.value]);
- await MessageStore.flush();
- }
- }();
}
+
// Clear unread for active session
if (activeId != null) {
final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
@@ -500,6 +500,7 @@
orElse: () => sessions.first,
);
ref.read(activeSessionIdProvider.notifier).state = active.id;
+ // Synchronous session switch — no async gap.
ref.read(messagesProvider.notifier).switchSession(active.id);
SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', active.id));
}
@@ -520,7 +521,7 @@
}
}
- Future<void> _handleIncomingMessage(Map<String, dynamic> msg) async {
+ void _handleIncomingMessage(Map<String, dynamic> msg) {
final sessionId = msg['sessionId'] as String?;
final content = msg['content'] as String? ??
msg['text'] as String? ??
@@ -537,16 +538,16 @@
status: MessageStatus.sent,
);
- // Use currentSessionId from notifier (what's actually loaded in the provider)
- // not activeSessionIdProvider (can be stale after background resume)
+ // Use currentSessionId from notifier (what's actually loaded in the provider),
+ // not activeSessionIdProvider (can be stale after background resume).
final currentId = ref.read(messagesProvider.notifier).currentSessionId;
if (sessionId != null && sessionId != currentId) {
- // Store message for the other session so it's there when user switches
+ // Append directly to the log for the target session — synchronous, no race.
TraceService.instance.addTrace(
'message stored for session',
'sessionId=${sessionId.substring(0, sessionId.length.clamp(0, 8))}, toast shown',
);
- await _storeForSession(sessionId, message);
+ MessageStoreV2.append(sessionId, message);
_incrementUnread(sessionId);
final sessions = ref.read(sessionsProvider);
final session = sessions.firstWhere(
@@ -616,9 +617,10 @@
final currentId = ref.read(messagesProvider.notifier).currentSessionId;
_chatLog('voice: sessionId=$sessionId currentId=$currentId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');
if (sessionId != null && sessionId != currentId) {
- _chatLog('voice: cross-session, storing for $sessionId');
- await _storeForSession(sessionId, storedMessage);
- _chatLog('voice: stored, incrementing unread');
+ _chatLog('voice: cross-session, appending to store for $sessionId');
+ // Synchronous append — no async gap, no race condition.
+ MessageStoreV2.append(sessionId, storedMessage);
+ _chatLog('voice: appended, incrementing unread');
_incrementUnread(sessionId);
final sessions = ref.read(sessionsProvider);
final session = sessions.firstWhere(
@@ -684,10 +686,10 @@
status: MessageStatus.sent,
);
- // Cross-session routing: store for target session if not currently loaded
+ // Cross-session routing: append to log for target session if not currently loaded.
final currentId = ref.read(messagesProvider.notifier).currentSessionId;
if (sessionId != null && sessionId != currentId) {
- _storeForSession(sessionId, message);
+ MessageStoreV2.append(sessionId, message);
_incrementUnread(sessionId);
return;
}
@@ -697,51 +699,19 @@
_scrollToBottom();
}
- /// Store a message for a non-active session so it persists when the user switches to it.
- Future<void> _storeForSession(String sessionId, Message message) async {
- final existing = await MessageStore.loadAll(sessionId);
- _chatLog('storeForSession: $sessionId existing=${existing.length} adding type=${message.type.name} content="${message.content.substring(0, message.content.length.clamp(0, 30))}" audioUri=${message.audioUri != null ? "set(${message.audioUri!.length})" : "null"}');
- MessageStore.save(sessionId, [...existing, message]);
- await MessageStore.flush();
- // Verify
- final verify = await MessageStore.loadAll(sessionId);
- _chatLog('storeForSession: verified ${verify.length} messages after save');
+ /// Superseded by MessageStoreV2.append() — call sites now use the synchronous
+ /// append directly. Kept as dead code until all callers are confirmed removed.
+ // ignore: unused_element
+ void _storeForSession(String sessionId, Message message) {
+ MessageStoreV2.append(sessionId, message);
}
- /// Update a transcript for a message stored on disk (not in the active session).
- /// Scans all session files to find the message by ID, updates content, and saves.
+ /// With the append-only log, transcript updates for cross-session messages
+ /// are not patched back to disk (the append-only design doesn't support
+ /// in-place edits). The transcript is updated in-memory if the message is
+ /// in the active session. Cross-session transcript updates are a no-op.
Future<void> _updateTranscriptOnDisk(String messageId, String content) async {
- try {
- final dir = await getApplicationDocumentsDirectory();
- final msgDir = Directory('${dir.path}/messages');
- if (!await msgDir.exists()) return;
-
- await for (final entity in msgDir.list()) {
- if (entity is! File || !entity.path.endsWith('.json')) continue;
-
- final jsonStr = await entity.readAsString();
- final List<dynamic> jsonList = jsonDecode(jsonStr) as List<dynamic>;
- bool found = false;
-
- final updated = jsonList.map((j) {
- final map = j as Map<String, dynamic>;
- if (map['id'] == messageId) {
- found = true;
- return {...map, 'content': content};
- }
- return map;
- }).toList();
-
- if (found) {
- await entity.writeAsString(jsonEncode(updated));
- _chatLog('transcript: updated messageId=$messageId on disk in ${entity.path.split('/').last}');
- return;
- }
- }
- _chatLog('transcript: messageId=$messageId not found on disk');
- } catch (e) {
- _chatLog('transcript: disk update error=$e');
- }
+ _chatLog('transcript: cross-session update for messageId=$messageId — in-memory only (append-only log)');
}
void _incrementUnread(String sessionId) {
@@ -770,7 +740,8 @@
ref.read(isTypingProvider.notifier).state = false;
ref.read(activeSessionIdProvider.notifier).state = sessionId;
- await ref.read(messagesProvider.notifier).switchSession(sessionId);
+ // Synchronous — no async gap between session switch and incoming messages.
+ ref.read(messagesProvider.notifier).switchSession(sessionId);
SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', sessionId));
final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
--
Gitblit v1.3.1