Matthias Nott
9 days ago 06bb73662d32d65d1e775a4dd35f67d82d673e40
feat: rewrite message store - append-only log, sync routing, eliminates async race conditions
3 files modified
changed files
lib/providers/providers.dart patch | view | blame | history
lib/screens/chat_screen.dart patch | view | blame | history
lib/services/message_store.dart patch | view | blame | history
lib/providers/providers.dart
....@@ -98,73 +98,48 @@
9898
9999 String? get currentSessionId => _currentSessionId;
100100
101
- /// Switch to a new session and load its messages.
102
- Future<void> switchSession(String sessionId) async {
103
- final trace = StackTrace.current.toString().split('\n').take(4).join(' | ');
104
- TraceService.instance.addTrace(
105
- 'switchSession',
106
- 'from=${_currentSessionId?.substring(0, 8) ?? "null"}(${state.length}) → ${sessionId.substring(0, 8)} | $trace',
107
- );
108
- // Write current session to disk
109
- if (_currentSessionId != null && state.isNotEmpty) {
110
- await MessageStore.writeDirect(_currentSessionId!, state);
111
- }
112
-
113
- // Skip reload if staying on the same session — messages are already in memory
101
+ /// Switch to a session. SYNCHRONOUS — no async gap, no race with incoming
102
+ /// messages. MessageStoreV2.loadSession reads the log synchronously, using the in-memory index to pick lines.
103
+ void switchSession(String sessionId) {
114104 if (_currentSessionId == sessionId) {
115
- TraceService.instance.addTrace('switchSession SKIP', 'already on ${sessionId.substring(0, 8)}');
105
+ TraceService.instance.addTrace(
106
+ 'switchSession SKIP', 'already on ${sessionId.substring(0, 8)}');
116107 return;
117108 }
118
-
109
+ TraceService.instance.addTrace(
110
+ 'switchSession',
111
+ 'from=${_currentSessionId?.substring(0, 8) ?? "null"}(${state.length}) → ${sessionId.substring(0, 8)}',
112
+ );
119113 _currentSessionId = sessionId;
120
- final messages = await MessageStore.loadAll(sessionId);
121
- // Merge: if addMessage ran during loadAll and added messages for THIS session,
122
- // they'll be in state but not in the loaded messages. Keep the longer list.
123
- if (state.length > messages.length && _currentSessionId == sessionId) {
124
- TraceService.instance.addTrace('switchSession MERGE', 'kept ${state.length} (loaded ${messages.length})');
125
- } else {
126
- state = messages;
127
- }
114
+ state = MessageStoreV2.loadSession(sessionId);
128115 }
129116
130
- /// Add a message to the current session.
117
+ /// Add a message to the current session (display + append-only persist).
131118 void addMessage(Message message) {
132119 state = [...state, message];
133120 if (_currentSessionId != null) {
134
- MessageStore.save(_currentSessionId!, state);
121
+ MessageStoreV2.append(_currentSessionId!, message);
135122 }
136123 }
137124
138
- /// Update a message by ID.
125
+ /// Update a message by ID (in-memory only — patch is not persisted to log).
139126 void updateMessage(String id, Message Function(Message) updater) {
140127 state = state.map((m) => m.id == id ? updater(m) : m).toList();
141
- if (_currentSessionId != null) {
142
- MessageStore.save(_currentSessionId!, state);
143
- }
144128 }
145129
146
- /// Remove a message by ID.
130
+ /// Remove a message by ID (in-memory only).
147131 void removeMessage(String id) {
148132 state = state.where((m) => m.id != id).toList();
149
- if (_currentSessionId != null) {
150
- MessageStore.save(_currentSessionId!, state);
151
- }
152133 }
153134
154
- /// Remove all messages matching a predicate.
135
+ /// Remove all messages matching a predicate (in-memory only).
155136 void removeWhere(bool Function(Message) test) {
156137 state = state.where((m) => !test(m)).toList();
157
- if (_currentSessionId != null) {
158
- MessageStore.save(_currentSessionId!, state);
159
- }
160138 }
161139
162
- /// Clear all messages for the current session.
140
+ /// Clear all messages for the current session (in-memory only).
163141 void clearMessages() {
164142 state = [];
165
- if (_currentSessionId != null) {
166
- MessageStore.save(_currentSessionId!, state);
167
- }
168143 }
169144
170145 void updateContent(String messageId, String content) {
....@@ -185,22 +160,6 @@
185160 else
186161 m,
187162 ];
188
- if (_currentSessionId != null) {
189
- MessageStore.save(_currentSessionId!, state);
190
- }
191
- }
192
-
193
- /// Load more (older) messages for pagination.
194
- Future<void> loadMore() async {
195
- if (_currentSessionId == null) return;
196
- final older = await MessageStore.load(
197
- _currentSessionId!,
198
- offset: state.length,
199
- limit: 50,
200
- );
201
- if (older.isNotEmpty) {
202
- state = [...older, ...state];
203
- }
204163 }
205164 }
206165
lib/screens/chat_screen.dart
....@@ -72,7 +72,8 @@
7272 final Set<int> _seenSeqs = {};
7373 bool _sessionReady = false;
7474 final List<Map<String, dynamic>> _pendingMessages = [];
75
- final Map<String, List<Message>> _catchUpPending = {};
75
+ // _catchUpPending removed: cross-session catch_up messages are now appended
76
+ // synchronously via MessageStoreV2.append() in the catch_up handler.
7677 List<String>? _cachedSessionOrder;
7778 Timer? _typingTimer;
7879 bool _unreadCountsLoaded = false;
....@@ -86,6 +87,9 @@
8687 }
8788
8889 Future<void> _initAll() async {
90
+ // Initialize append-only message store (reads log, rebuilds index, compacts).
91
+ await MessageStoreV2.initialize();
92
+
8993 // Load persisted state BEFORE connecting
9094 final prefs = await SharedPreferences.getInstance();
9195 _lastSeq = prefs.getInt('lastSeq') ?? 0;
....@@ -104,8 +108,8 @@
104108 final savedSessionId = prefs.getString('activeSessionId');
105109 if (savedSessionId != null && mounted) {
106110 ref.read(activeSessionIdProvider.notifier).state = savedSessionId;
107
- // Load messages for the restored session so chat isn't empty on startup
108
- await ref.read(messagesProvider.notifier).switchSession(savedSessionId);
111
+ // Synchronous: no async gap between load and any arriving messages.
112
+ ref.read(messagesProvider.notifier).switchSession(savedSessionId);
109113 }
110114 if (!mounted) return;
111115
....@@ -165,14 +169,11 @@
165169 _persistUnreadCounts(counts);
166170 }
167171
172
+ // ignore: unused_field
168173 bool _isLoadingMore = false;
169174 void _onScroll() {
170
- if (!_isLoadingMore &&
171
- _scrollController.position.pixels >=
172
- _scrollController.position.maxScrollExtent - 100) {
173
- _isLoadingMore = true;
174
- ref.read(messagesProvider.notifier).loadMore().then((_) => _isLoadingMore = false);
175
- }
175
+ // Pagination removed: all messages are loaded synchronously on session
176
+ // switch via the in-memory index. Nothing to do on scroll.
176177 }
177178
178179 // Helper: send a command to the gateway in the expected format
....@@ -377,18 +378,23 @@
377378 _lastSeq = serverSeq;
378379 _saveLastSeq();
379380 }
380
- // Merge catch_up messages: only add messages not already in local storage.
381
- // We check by content match against existing messages to avoid duplicates
382
- // while still picking up messages that arrived while the app was backgrounded.
381
+ // Merge catch_up messages: only add messages not already displayed.
382
+ // Dedup by content to avoid showing messages already in the UI.
383383 final catchUpMsgs = msg['messages'] as List<dynamic>?;
384384 if (catchUpMsgs != null && catchUpMsgs.isNotEmpty) {
385385 _isCatchingUp = true;
386386 final activeId = ref.read(activeSessionIdProvider);
387
+ final currentId = ref.read(messagesProvider.notifier).currentSessionId;
387388 final existing = ref.read(messagesProvider);
388389 final existingContents = existing
389390 .where((m) => m.role == MessageRole.assistant)
390391 .map((m) => m.content)
391392 .toSet();
393
+
394
+ // Track message counts and first-message previews for other sessions (for toasts).
395
+ final crossSessionCounts = <String, int>{};
396
+ final crossSessionPreviews = <String, String>{};
397
+
392398 for (final m in catchUpMsgs) {
393399 final map = m as Map<String, dynamic>;
394400 final msgType = map['type'] as String? ?? 'text';
....@@ -417,52 +423,46 @@
417423 );
418424 }
419425
420
- _chatLog('catch_up msg: session=${msgSessionId?.substring(0, 8) ?? "NULL"} active=${activeId?.substring(0, 8)} match=${msgSessionId == activeId || msgSessionId == null} content="${content.substring(0, content.length.clamp(0, 40))}"');
421
- if (msgSessionId == null || msgSessionId == activeId) {
422
- // Active session or no session: add directly to chat
426
+ _chatLog('catch_up msg: session=${msgSessionId?.substring(0, 8) ?? "NULL"} active=${activeId?.substring(0, 8)} content="${content.substring(0, content.length.clamp(0, 40))}"');
427
+
428
+ if (msgSessionId == null || msgSessionId == currentId) {
429
+ // Active session or no session: add to UI (addMessage also appends to log).
423430 ref.read(messagesProvider.notifier).addMessage(message);
424431 } else {
425
- // Different session: store + unread badge + toast
426
- // Collect for batch storage below to avoid race condition
427
- _catchUpPending.putIfAbsent(msgSessionId, () => []).add(message);
432
+ // Cross-session: synchronous append — no race condition.
433
+ MessageStoreV2.append(msgSessionId, message);
428434 _incrementUnread(msgSessionId);
435
+ crossSessionCounts[msgSessionId] = (crossSessionCounts[msgSessionId] ?? 0) + 1;
436
+ crossSessionPreviews.putIfAbsent(msgSessionId, () => content);
429437 }
430438 existingContents.add(content);
431439 }
440
+
432441 _isCatchingUp = false;
433442 _scrollToBottom();
434
- // Batch-store cross-session messages (sequential to avoid race condition)
435
- if (_catchUpPending.isNotEmpty) {
436
- final pending = Map<String, List<Message>>.from(_catchUpPending);
437
- _catchUpPending.clear();
438
- // Show one toast per session with message count
439
- if (mounted) {
440
- final sessions = ref.read(sessionsProvider);
441
- for (final entry in pending.entries) {
442
- final session = sessions.firstWhere(
443
- (s) => s.id == entry.key,
444
- orElse: () => Session(id: entry.key, index: 0, name: 'Unknown', type: 'claude'),
445
- );
446
- final count = entry.value.length;
447
- final preview = count == 1
448
- ? entry.value.first.content
449
- : '$count messages';
450
- ToastManager.show(
451
- context,
452
- sessionName: session.name,
453
- preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
454
- onTap: () => _switchSession(entry.key),
455
- );
456
- }
443
+
444
+ // Show one toast per other session that received messages.
445
+ if (crossSessionCounts.isNotEmpty && mounted) {
446
+ final sessions = ref.read(sessionsProvider);
447
+ for (final entry in crossSessionCounts.entries) {
448
+ final sid = entry.key;
449
+ final count = entry.value;
450
+ final session = sessions.firstWhere(
451
+ (s) => s.id == sid,
452
+ orElse: () => Session(id: sid, index: 0, name: 'Unknown', type: 'claude'),
453
+ );
454
+ final preview = count == 1
455
+ ? (crossSessionPreviews[sid] ?? '')
456
+ : '$count messages';
457
+ ToastManager.show(
458
+ context,
459
+ sessionName: session.name,
460
+ preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
461
+ onTap: () => _switchSession(sid),
462
+ );
457463 }
458
- () async {
459
- for (final entry in pending.entries) {
460
- final existing = await MessageStore.loadAll(entry.key);
461
- MessageStore.save(entry.key, [...existing, ...entry.value]);
462
- await MessageStore.flush();
463
- }
464
- }();
465464 }
465
+
466466 // Clear unread for active session
467467 if (activeId != null) {
468468 final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
....@@ -500,6 +500,7 @@
500500 orElse: () => sessions.first,
501501 );
502502 ref.read(activeSessionIdProvider.notifier).state = active.id;
503
+ // Synchronous session switch — no async gap.
503504 ref.read(messagesProvider.notifier).switchSession(active.id);
504505 SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', active.id));
505506 }
....@@ -520,7 +521,7 @@
520521 }
521522 }
522523
523
- Future<void> _handleIncomingMessage(Map<String, dynamic> msg) async {
524
+ void _handleIncomingMessage(Map<String, dynamic> msg) {
524525 final sessionId = msg['sessionId'] as String?;
525526 final content = msg['content'] as String? ??
526527 msg['text'] as String? ??
....@@ -537,16 +538,16 @@
537538 status: MessageStatus.sent,
538539 );
539540
540
- // Use currentSessionId from notifier (what's actually loaded in the provider)
541
- // not activeSessionIdProvider (can be stale after background resume)
541
+ // Use currentSessionId from notifier (what's actually loaded in the provider),
542
+ // not activeSessionIdProvider (can be stale after background resume).
542543 final currentId = ref.read(messagesProvider.notifier).currentSessionId;
543544 if (sessionId != null && sessionId != currentId) {
544
- // Store message for the other session so it's there when user switches
545
+ // Append directly to the log for the target session — synchronous, no race.
545546 TraceService.instance.addTrace(
546547 'message stored for session',
547548 'sessionId=${sessionId.substring(0, sessionId.length.clamp(0, 8))}, toast shown',
548549 );
549
- await _storeForSession(sessionId, message);
550
+ MessageStoreV2.append(sessionId, message);
550551 _incrementUnread(sessionId);
551552 final sessions = ref.read(sessionsProvider);
552553 final session = sessions.firstWhere(
....@@ -616,9 +617,10 @@
616617 final currentId = ref.read(messagesProvider.notifier).currentSessionId;
617618 _chatLog('voice: sessionId=$sessionId currentId=$currentId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');
618619 if (sessionId != null && sessionId != currentId) {
619
- _chatLog('voice: cross-session, storing for $sessionId');
620
- await _storeForSession(sessionId, storedMessage);
621
- _chatLog('voice: stored, incrementing unread');
620
+ _chatLog('voice: cross-session, appending to store for $sessionId');
621
+ // Synchronous append — no async gap, no race condition.
622
+ MessageStoreV2.append(sessionId, storedMessage);
623
+ _chatLog('voice: appended, incrementing unread');
622624 _incrementUnread(sessionId);
623625 final sessions = ref.read(sessionsProvider);
624626 final session = sessions.firstWhere(
....@@ -684,10 +686,10 @@
684686 status: MessageStatus.sent,
685687 );
686688
687
- // Cross-session routing: store for target session if not currently loaded
689
+ // Cross-session routing: append to log for target session if not currently loaded.
688690 final currentId = ref.read(messagesProvider.notifier).currentSessionId;
689691 if (sessionId != null && sessionId != currentId) {
690
- _storeForSession(sessionId, message);
692
+ MessageStoreV2.append(sessionId, message);
691693 _incrementUnread(sessionId);
692694 return;
693695 }
....@@ -697,51 +699,19 @@
697699 _scrollToBottom();
698700 }
699701
700
- /// Store a message for a non-active session so it persists when the user switches to it.
701
- Future<void> _storeForSession(String sessionId, Message message) async {
702
- final existing = await MessageStore.loadAll(sessionId);
703
- _chatLog('storeForSession: $sessionId existing=${existing.length} adding type=${message.type.name} content="${message.content.substring(0, message.content.length.clamp(0, 30))}" audioUri=${message.audioUri != null ? "set(${message.audioUri!.length})" : "null"}');
704
- MessageStore.save(sessionId, [...existing, message]);
705
- await MessageStore.flush();
706
- // Verify
707
- final verify = await MessageStore.loadAll(sessionId);
708
- _chatLog('storeForSession: verified ${verify.length} messages after save');
702
+ /// Superseded by MessageStoreV2.append() — call sites now use the synchronous
703
+ /// append directly. Kept as dead code until all callers are confirmed removed.
704
+ // ignore: unused_element
705
+ void _storeForSession(String sessionId, Message message) {
706
+ MessageStoreV2.append(sessionId, message);
709707 }
710708
711
- /// Update a transcript for a message stored on disk (not in the active session).
712
- /// Scans all session files to find the message by ID, updates content, and saves.
709
+ /// With the append-only log, transcript updates for cross-session messages
710
+ /// are not patched back to disk (the append-only design doesn't support
711
+ /// in-place edits). The transcript is updated in-memory if the message is
712
+ /// in the active session. Cross-session transcript updates are a no-op.
713713 Future<void> _updateTranscriptOnDisk(String messageId, String content) async {
714
- try {
715
- final dir = await getApplicationDocumentsDirectory();
716
- final msgDir = Directory('${dir.path}/messages');
717
- if (!await msgDir.exists()) return;
718
-
719
- await for (final entity in msgDir.list()) {
720
- if (entity is! File || !entity.path.endsWith('.json')) continue;
721
-
722
- final jsonStr = await entity.readAsString();
723
- final List<dynamic> jsonList = jsonDecode(jsonStr) as List<dynamic>;
724
- bool found = false;
725
-
726
- final updated = jsonList.map((j) {
727
- final map = j as Map<String, dynamic>;
728
- if (map['id'] == messageId) {
729
- found = true;
730
- return {...map, 'content': content};
731
- }
732
- return map;
733
- }).toList();
734
-
735
- if (found) {
736
- await entity.writeAsString(jsonEncode(updated));
737
- _chatLog('transcript: updated messageId=$messageId on disk in ${entity.path.split('/').last}');
738
- return;
739
- }
740
- }
741
- _chatLog('transcript: messageId=$messageId not found on disk');
742
- } catch (e) {
743
- _chatLog('transcript: disk update error=$e');
744
- }
714
+ _chatLog('transcript: cross-session update for messageId=$messageId — in-memory only (append-only log)');
745715 }
746716
747717 void _incrementUnread(String sessionId) {
....@@ -770,7 +740,8 @@
770740 ref.read(isTypingProvider.notifier).state = false;
771741
772742 ref.read(activeSessionIdProvider.notifier).state = sessionId;
773
- await ref.read(messagesProvider.notifier).switchSession(sessionId);
743
+ // Synchronous — no async gap between session switch and incoming messages.
744
+ ref.read(messagesProvider.notifier).switchSession(sessionId);
774745 SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', sessionId));
775746
776747 final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
lib/services/message_store.dart
....@@ -1,4 +1,3 @@
1
-import 'dart:async';
21 import 'dart:convert';
32 import 'dart:io';
43
....@@ -8,30 +7,39 @@
87 import '../models/message.dart';
98 import 'trace_service.dart';
109
11
-/// Per-session JSON file persistence with debounced saves.
12
-class MessageStore {
13
- MessageStore._();
10
+/// Append-only log-based message persistence.
11
+///
12
+/// Layout:
13
+/// messages/log.jsonl — one JSON object per line, each a serialized Message
14
+/// messages/index.json — { "sessionId": [lineNumber, ...] }
15
+///
16
+/// Message appends are synchronous (writeAsStringSync with FileMode.append) to
17
+/// prevent race conditions between concurrent addMessage / switchSession calls.
18
+class MessageStoreV2 {
19
+ MessageStoreV2._();
20
+
21
+ static const _backupChannel = MethodChannel('com.mnsoft.pailot/backup');
22
+
23
+ // In-memory index: sessionId -> list of 0-based line numbers in log.jsonl
24
+ static final Map<String, List<int>> _index = {};
25
+
26
+ // Number of lines currently in the log (= next line number to write)
27
+ static int _lineCount = 0;
28
+
29
+ // Flush the index to disk every N appends to amortise I/O
30
+ static const _indexFlushInterval = 20;
31
+ static int _appendsSinceFlush = 0;
1432
1533 static Directory? _baseDir;
16
- static Timer? _debounceTimer;
17
- static final Map<String, List<Message>> _pendingSaves = {};
18
- // Per-session lock to prevent concurrent read/write on the same file
19
- static final Map<String, Completer<void>> _locks = {};
2034
21
- static const _backupChannel =
22
- MethodChannel('com.mnsoft.pailot/backup');
35
+ // ------------------------------------------------------------------ init --
2336
24
- /// Initialize the base directory for message storage.
25
- /// On iOS, the directory is excluded from iCloud / iTunes backup so that
26
- /// large base64 image attachments do not bloat the user's cloud storage.
27
- /// Messages can be re-fetched from the server if needed.
2837 static Future<Directory> _getBaseDir() async {
2938 if (_baseDir != null) return _baseDir!;
3039 final appDir = await getApplicationDocumentsDirectory();
3140 _baseDir = Directory('${appDir.path}/messages');
32
- final created = !await _baseDir!.exists();
33
- if (created) {
34
- await _baseDir!.create(recursive: true);
41
+ if (!_baseDir!.existsSync()) {
42
+ _baseDir!.createSync(recursive: true);
3543 }
3644 // Exclude from iCloud / iTunes backup (best-effort, iOS only).
3745 if (Platform.isIOS) {
....@@ -40,144 +48,243 @@
4048 'excludeFromBackup',
4149 _baseDir!.path,
4250 );
43
- } catch (_) {
44
- // Non-fatal: if the channel call fails, backup exclusion is skipped.
45
- }
51
+ } catch (_) {}
4652 }
4753 return _baseDir!;
4854 }
4955
50
- static String _fileForSession(String sessionId) {
51
- // Sanitize session ID for filename
52
- final safe = sessionId.replaceAll(RegExp(r'[^\w\-]'), '_');
53
- return 'session_$safe.json';
54
- }
56
+ static String _logPath(Directory dir) => '${dir.path}/log.jsonl';
57
+ static String _indexPath(Directory dir) => '${dir.path}/index.json';
5558
56
- /// Save messages for a session with 1-second debounce.
57
- static void save(String sessionId, List<Message> messages) {
58
- _pendingSaves[sessionId] = messages;
59
- _debounceTimer?.cancel();
60
- _debounceTimer = Timer(const Duration(seconds: 1), _flushAll);
61
- }
62
-
63
- /// Write directly to disk, bypassing debounce. For critical saves.
64
- static Future<void> writeDirect(String sessionId, List<Message> messages) async {
65
- _debounceTimer?.cancel();
66
- _pendingSaves.remove(sessionId);
67
- await _withLock(sessionId, () => _writeSession(sessionId, messages));
68
- }
69
-
70
- /// Acquire a per-session lock, run the operation, release.
71
- static Future<T> _withLock<T>(String sessionId, Future<T> Function() fn) async {
72
- // Wait for any existing operation on this session to finish
73
- while (_locks.containsKey(sessionId)) {
74
- await _locks[sessionId]!.future;
75
- }
76
- final completer = Completer<void>();
77
- _locks[sessionId] = completer;
78
- try {
79
- return await fn();
80
- } finally {
81
- _locks.remove(sessionId);
82
- completer.complete();
83
- }
84
- }
85
-
86
- /// Immediately flush all pending saves.
87
- static Future<void> flush() async {
88
- _debounceTimer?.cancel();
89
- await _flushAll();
90
- }
91
-
92
- static Future<void> _flushAll() async {
93
- final entries = Map<String, List<Message>>.from(_pendingSaves);
94
- _pendingSaves.clear();
95
-
96
- for (final entry in entries.entries) {
97
- await _withLock(entry.key, () => _writeSession(entry.key, entry.value));
98
- }
99
- }
100
-
101
- static Future<void> _writeSession(
102
- String sessionId, List<Message> messages) async {
59
+ /// Called once at app startup. Reads log.jsonl and rebuilds the in-memory
60
+ /// index. Then calls compact() to trim old messages.
61
+ static Future<void> initialize() async {
10362 try {
10463 final dir = await _getBaseDir();
105
- final file = File('${dir.path}/${_fileForSession(sessionId)}');
106
- // Strip heavy fields for persistence
107
- final lightMessages = messages.map((m) => m.toJsonLight()).toList();
108
- final json = jsonEncode(lightMessages);
109
- await file.writeAsString(json);
110
- TraceService.instance.addTrace('MsgStore WRITE', '${sessionId.substring(0, 8)}: ${messages.length} msgs');
64
+ final logFile = File(_logPath(dir));
65
+ final indexFile = File(_indexPath(dir));
66
+
67
+ // Try loading saved index first (fast path).
68
+ if (indexFile.existsSync()) {
69
+ try {
70
+ final raw = indexFile.readAsStringSync();
71
+ final decoded = jsonDecode(raw) as Map<String, dynamic>;
72
+ for (final entry in decoded.entries) {
73
+ _index[entry.key] =
74
+ (entry.value as List<dynamic>).map((e) => e as int).toList();
75
+ }
76
+ } catch (_) {
77
+ _index.clear();
78
+ }
79
+ }
80
+
81
+ // Count actual lines in log to set _lineCount.
82
+ if (logFile.existsSync()) {
83
+ final content = logFile.readAsStringSync();
84
+ _lineCount = content.isEmpty
85
+ ? 0
86
+ : content.trimRight().split('\n').length;
87
+ } else {
88
+ _lineCount = 0;
89
+ }
90
+
91
+ // If the index was missing or corrupt, rebuild from log.
92
+ if (_index.isEmpty && _lineCount > 0) {
93
+ await _rebuildIndex(logFile);
94
+ }
95
+
96
+ TraceService.instance.addTrace(
97
+ 'MsgStoreV2 INIT', '$_lineCount lines, ${_index.length} sessions');
98
+
99
+ // Compact on startup (keeps last 200 per session).
100
+ await compact();
111101 } catch (e) {
112
- TraceService.instance.addTrace('MsgStore WRITE ERROR', '${sessionId.substring(0, 8)}: $e');
102
+ TraceService.instance.addTrace('MsgStoreV2 INIT ERROR', '$e');
113103 }
114104 }
115105
116
- /// Load messages for a session.
117
- /// [limit] controls how many recent messages to return (default: 50).
118
- /// [offset] is the number of messages to skip from the end (for pagination).
119
- static Future<List<Message>> load(
120
- String sessionId, {
121
- int limit = 50,
122
- int offset = 0,
123
- }) async {
106
+ static Future<void> _rebuildIndex(File logFile) async {
107
+ _index.clear();
108
+ final lines = logFile.readAsLinesSync();
109
+ for (var i = 0; i < lines.length; i++) {
110
+ final line = lines[i].trim();
111
+ if (line.isEmpty) continue;
112
+ try {
113
+ final map = jsonDecode(line) as Map<String, dynamic>;
114
+ final sessionId = map['sessionId'] as String?;
115
+ if (sessionId != null) {
116
+ _index.putIfAbsent(sessionId, () => []).add(i);
117
+ }
118
+ } catch (_) {}
119
+ }
120
+ }
121
+
122
+ // --------------------------------------------------------------- append --
123
+
124
+ /// Append a message to the log. SYNCHRONOUS — no async gap, no race.
125
+ ///
126
+ /// Each line written includes a 'sessionId' field so the index can be
127
+ /// rebuilt from the log alone if needed.
128
+ static void append(String sessionId, Message message) {
124129 try {
125
- final dir = await _getBaseDir();
126
- final file = File('${dir.path}/${_fileForSession(sessionId)}');
127
- if (!await file.exists()) return [];
130
+ final dir = _baseDir;
131
+ if (dir == null) {
132
+ // initialize() hasn't set the base directory yet — drop the message and trace a warning (shouldn't happen).
133
+ TraceService.instance
134
+ .addTrace('MsgStoreV2 APPEND WARN', 'baseDir null, dropping');
135
+ return;
136
+ }
137
+ final logFile = File(_logPath(dir));
138
+ final json = message.toJsonLight();
139
+ json['sessionId'] = sessionId;
140
+ final line = '${jsonEncode(json)}\n';
128141
129
- final jsonStr = await file.readAsString();
130
- final List<dynamic> jsonList = jsonDecode(jsonStr) as List<dynamic>;
131
- final allMessages = jsonList
132
- .map((j) => _messageFromJson(j as Map<String, dynamic>))
133
- .where((m) => !m.isEmptyVoice && !m.isEmptyText)
134
- .toList();
142
+ // Synchronous append — a single write call, no read-modify-write.
143
+ logFile.writeAsStringSync(line, mode: FileMode.append);
135144
136
- // Paginate from the end (newest messages first in storage)
137
- if (offset >= allMessages.length) return [];
138
- final end = allMessages.length - offset;
139
- final start = (end - limit).clamp(0, end);
140
- return allMessages.sublist(start, end);
145
+ // Update in-memory index.
146
+ _index.putIfAbsent(sessionId, () => []).add(_lineCount);
147
+ _lineCount++;
148
+
149
+ // Periodically flush index to disk.
150
+ _appendsSinceFlush++;
151
+ if (_appendsSinceFlush >= _indexFlushInterval) {
152
+ _flushIndex(dir);
153
+ _appendsSinceFlush = 0;
154
+ }
141155 } catch (e) {
156
+ TraceService.instance.addTrace('MsgStoreV2 APPEND ERROR', '$e');
157
+ }
158
+ }
159
+
160
+ // -------------------------------------------------------------- load --
161
+
162
+ /// Load messages for a session. SYNCHRONOUS — reads from the log using the
163
+ /// in-memory index. Safe to call from switchSession without async gaps.
164
+ static List<Message> loadSession(String sessionId) {
165
+ try {
166
+ final dir = _baseDir;
167
+ if (dir == null) return [];
168
+ final logFile = File(_logPath(dir));
169
+ if (!logFile.existsSync()) return [];
170
+
171
+ final lineNumbers = _index[sessionId];
172
+ if (lineNumbers == null || lineNumbers.isEmpty) return [];
173
+
174
+ // Read all lines at once then pick the ones we need.
175
+ final allLines = logFile.readAsLinesSync();
176
+ final messages = <Message>[];
177
+
178
+ for (final n in lineNumbers) {
179
+ if (n >= allLines.length) continue;
180
+ final line = allLines[n].trim();
181
+ if (line.isEmpty) continue;
182
+ try {
183
+ final map = jsonDecode(line) as Map<String, dynamic>;
184
+ // Remove synthetic sessionId field before deserialising.
185
+ map.remove('sessionId');
186
+ final msg = _messageFromJson(map);
187
+ if (!msg.isEmptyVoice && !msg.isEmptyText) {
188
+ messages.add(msg);
189
+ }
190
+ } catch (_) {}
191
+ }
192
+
193
+ TraceService.instance.addTrace(
194
+ 'MsgStoreV2 LOAD', '${sessionId.substring(0, 8)}: ${messages.length} msgs');
195
+ return messages;
196
+ } catch (e) {
197
+ TraceService.instance
198
+ .addTrace('MsgStoreV2 LOAD ERROR', '${sessionId.substring(0, 8)}: $e');
142199 return [];
143200 }
144201 }
145202
146
- /// Load all messages for a session (no pagination).
147
- static Future<List<Message>> loadAll(String sessionId) async {
148
- return _withLock(sessionId, () => _loadAllImpl(sessionId));
149
- }
203
+ // ------------------------------------------------------------- compact --
150204
151
- static Future<List<Message>> _loadAllImpl(String sessionId) async {
205
+ /// Rewrite the log keeping at most [keepPerSession] messages per session.
206
+ /// Called once on startup, at the end of initialize(). NOT called during normal use.
207
+ static Future<void> compact({int keepPerSession = 200}) async {
152208 try {
153209 final dir = await _getBaseDir();
154
- final file = File('${dir.path}/${_fileForSession(sessionId)}');
155
- if (!await file.exists()) return [];
210
+ final logFile = File(_logPath(dir));
211
+ if (!logFile.existsSync()) return;
156212
157
- final jsonStr = await file.readAsString();
158
- final List<dynamic> jsonList = jsonDecode(jsonStr) as List<dynamic>;
159
- final msgs = jsonList
160
- .map((j) => _messageFromJson(j as Map<String, dynamic>))
161
- .where((m) => !m.isEmptyVoice && !m.isEmptyText)
162
- .toList();
163
- TraceService.instance.addTrace('MsgStore LOAD', '${sessionId.substring(0, 8)}: ${msgs.length} msgs');
164
- return msgs;
213
+ final allLines = logFile.readAsLinesSync();
214
+ if (allLines.length < 500) return; // nothing worth compacting
215
+
216
+ // Build a set of line numbers to keep: last keepPerSession per session.
217
+ final keepLines = <int>{};
218
+ for (final entry in _index.entries) {
219
+ final lines = entry.value;
220
+ final start = lines.length > keepPerSession
221
+ ? lines.length - keepPerSession
222
+ : 0;
223
+ for (var i = start; i < lines.length; i++) {
224
+ keepLines.add(lines[i]);
225
+ }
226
+ }
227
+
228
+ if (keepLines.length == allLines.length) return; // nothing removed
229
+
230
+ // Rewrite the log with only the kept lines, rebuilding the index.
231
+ final newIndex = <String, List<int>>{};
232
+ final buffer = StringBuffer();
233
+ var newLine = 0;
234
+
235
+ for (var i = 0; i < allLines.length; i++) {
236
+ if (!keepLines.contains(i)) continue;
237
+ final line = allLines[i].trim();
238
+ if (line.isEmpty) continue;
239
+ buffer.write('$line\n');
240
+ // Extract sessionId for new index.
241
+ try {
242
+ final map = jsonDecode(line) as Map<String, dynamic>;
243
+ final sid = map['sessionId'] as String?;
244
+ if (sid != null) {
245
+ newIndex.putIfAbsent(sid, () => []).add(newLine);
246
+ }
247
+ } catch (_) {}
248
+ newLine++;
249
+ }
250
+
251
+ logFile.writeAsStringSync(buffer.toString());
252
+ _index
253
+ ..clear()
254
+ ..addAll(newIndex);
255
+ _lineCount = newLine;
256
+ _flushIndex(dir);
257
+
258
+ TraceService.instance.addTrace(
259
+ 'MsgStoreV2 COMPACT', '${allLines.length} → $newLine lines');
165260 } catch (e) {
166
- TraceService.instance.addTrace('MsgStore LOAD ERROR', '${sessionId.substring(0, 8)}: $e');
167
- return [];
261
+ TraceService.instance.addTrace('MsgStoreV2 COMPACT ERROR', '$e');
168262 }
169263 }
170264
171
- /// Deserialize a message from JSON, applying migration rules:
172
- /// - Voice messages without audioUri are downgraded to text (transcript only).
173
- /// This handles messages saved before a restart, where the temp audio file
174
- /// is no longer available. The transcript (content) is preserved.
265
+ // --------------------------------------------------------- index flush --
266
+
267
+ static void _flushIndex(Directory dir) {
268
+ try {
269
+ final indexMap = _index.map(
270
+ (k, v) => MapEntry(k, v));
271
+ File(_indexPath(dir))
272
+ .writeAsStringSync(jsonEncode(indexMap));
273
+ } catch (_) {}
274
+ }
275
+
276
+ /// Force-flush the index to disk (call on app suspend / session switch).
277
+ static void flushIndex() {
278
+ if (_baseDir != null) _flushIndex(_baseDir!);
279
+ }
280
+
281
+ // ------------------------------------------------- migration helper --
282
+
283
+ /// Deserialize a message, downgrading voice→text if audio is unavailable.
175284 static Message _messageFromJson(Map<String, dynamic> json) {
176285 final raw = Message.fromJson(json);
177286 if (raw.type == MessageType.voice &&
178287 (raw.audioUri == null || raw.audioUri!.isEmpty)) {
179
- // Downgrade to text so the bubble shows the transcript instead of a
180
- // broken play button.
181288 return Message(
182289 id: raw.id,
183290 role: raw.role,
....@@ -191,25 +298,18 @@
191298 return raw;
192299 }
193300
194
- /// Delete stored messages for a session.
195
- static Future<void> delete(String sessionId) async {
196
- try {
197
- final dir = await _getBaseDir();
198
- final file = File('${dir.path}/${_fileForSession(sessionId)}');
199
- if (await file.exists()) {
200
- await file.delete();
201
- }
202
- } catch (_) {}
203
- }
301
+ // --------------------------------------------------------- clear all --
204302
205
- /// Clear all stored messages.
303
+ /// Wipe everything (used from settings / debug).
206304 static Future<void> clearAll() async {
207305 try {
208306 final dir = await _getBaseDir();
209
- if (await dir.exists()) {
210
- await dir.delete(recursive: true);
211
- await dir.create(recursive: true);
307
+ if (dir.existsSync()) {
308
+ dir.deleteSync(recursive: true);
309
+ dir.createSync(recursive: true);
212310 }
311
+ _index.clear();
312
+ _lineCount = 0;
213313 } catch (_) {}
214314 }
215315 }