| lib/providers/providers.dart | patch | view | blame | history | |
| lib/screens/chat_screen.dart | patch | view | blame | history | |
| lib/services/message_store.dart | patch | view | blame | history |
lib/providers/providers.dart
.. .. @@ -98,73 +98,48 @@ 98 98 99 99 String? get currentSessionId => _currentSessionId; 100 100 101 - /// Switch to a new session and load its messages.102 - Future<void> switchSession(String sessionId) async {103 - final trace = StackTrace.current.toString().split('\n').take(4).join(' | ');104 - TraceService.instance.addTrace(105 - 'switchSession',106 - 'from=${_currentSessionId?.substring(0, 8) ?? "null"}(${state.length}) → ${sessionId.substring(0, 8)} | $trace',107 - );108 - // Write current session to disk109 - if (_currentSessionId != null && state.isNotEmpty) {110 - await MessageStore.writeDirect(_currentSessionId!, state);111 - }112 -113 - // Skip reload if staying on the same session — messages are already in memory101 + /// Switch to a session. SYNCHRONOUS — no async gap, no race with incoming102 + /// messages. MessageStoreV2.loadSession reads from the in-memory index.103 + void switchSession(String sessionId) {114 104 if (_currentSessionId == sessionId) { 115 - TraceService.instance.addTrace('switchSession SKIP', 'already on ${sessionId.substring(0, 8)}');105 + TraceService.instance.addTrace(106 + 'switchSession SKIP', 'already on ${sessionId.substring(0, 8)}');116 107 return; 117 108 } 118 -109 + TraceService.instance.addTrace(110 + 'switchSession',111 + 'from=${_currentSessionId?.substring(0, 8) ?? "null"}(${state.length}) → ${sessionId.substring(0, 8)}',112 + );119 113 _currentSessionId = sessionId; 120 - final messages = await MessageStore.loadAll(sessionId);121 - // Merge: if addMessage ran during loadAll and added messages for THIS session,122 - // they'll be in state but not in the loaded messages. Keep the longer list.123 - if (state.length > messages.length && _currentSessionId == sessionId) {124 - TraceService.instance.addTrace('switchSession MERGE', 'kept ${state.length} (loaded ${messages.length})');125 - } else {126 - state = messages;127 - }114 + state = MessageStoreV2.loadSession(sessionId);128 115 } 129 116 130 - /// Add a message to the current session.117 + /// Add a message to the current session (display + append-only persist).131 118 void addMessage(Message message) { 132 119 state = [...state, message]; 133 120 if (_currentSessionId != null) { 134 - MessageStore.save(_currentSessionId!, state);121 + MessageStoreV2.append(_currentSessionId!, message);135 122 } 136 123 } 137 124 138 - /// Update a message by ID.125 + /// Update a message by ID (in-memory only — patch is not persisted to log).139 126 void updateMessage(String id, Message Function(Message) updater) { 140 127 state = state.map((m) => m.id == id ? updater(m) : m).toList(); 141 - if (_currentSessionId != null) {142 - MessageStore.save(_currentSessionId!, state);143 - }144 128 } 145 129 146 - /// Remove a message by ID.130 + /// Remove a message by ID (in-memory only).147 131 void removeMessage(String id) { 148 132 state = state.where((m) => m.id != id).toList(); 149 - if (_currentSessionId != null) {150 - MessageStore.save(_currentSessionId!, state);151 - }152 133 } 153 134 154 - /// Remove all messages matching a predicate.135 + /// Remove all messages matching a predicate (in-memory only).155 136 void removeWhere(bool Function(Message) test) { 156 137 state = state.where((m) => !test(m)).toList(); 157 - if (_currentSessionId != null) {158 - MessageStore.save(_currentSessionId!, state);159 - }160 138 } 161 139 162 - /// Clear all messages for the current session.140 + /// Clear all messages for the current session (in-memory only).163 141 void clearMessages() { 164 142 state = []; 165 - if (_currentSessionId != null) {166 - MessageStore.save(_currentSessionId!, state);167 - }168 143 } 169 144 170 145 void updateContent(String messageId, String content) { .. .. @@ -185,22 +160,6 @@ 185 160 else 186 161 m, 187 162 ]; 188 - if (_currentSessionId != null) {189 - MessageStore.save(_currentSessionId!, state);190 - }191 - }192 -193 - /// Load more (older) messages for pagination.194 - Future<void> loadMore() async {195 - if (_currentSessionId == null) return;196 - final older = await MessageStore.load(197 - _currentSessionId!,198 - offset: state.length,199 - limit: 50,200 - );201 - if (older.isNotEmpty) {202 - state = [...older, ...state];203 - }204 163 } 205 164 } 206 165 lib/screens/chat_screen.dart
.. .. @@ -72,7 +72,8 @@ 72 72 final Set<int> _seenSeqs = {}; 73 73 bool _sessionReady = false; 74 74 final List<Map<String, dynamic>> _pendingMessages = []; 75 - final Map<String, List<Message>> _catchUpPending = {};75 + // _catchUpPending removed: cross-session catch_up messages are now appended76 + // synchronously via MessageStoreV2.append() in the catch_up handler.76 77 List<String>? _cachedSessionOrder; 77 78 Timer? _typingTimer; 78 79 bool _unreadCountsLoaded = false; .. .. @@ -86,6 +87,9 @@ 86 87 } 87 88 88 89 Future<void> _initAll() async { 90 + // Initialize append-only message store (reads log, rebuilds index, compacts).91 + await MessageStoreV2.initialize();92 +89 93 // Load persisted state BEFORE connecting 90 94 final prefs = await SharedPreferences.getInstance(); 91 95 _lastSeq = prefs.getInt('lastSeq') ?? 0; .. .. @@ -104,8 +108,8 @@ 104 108 final savedSessionId = prefs.getString('activeSessionId'); 105 109 if (savedSessionId != null && mounted) { 106 110 ref.read(activeSessionIdProvider.notifier).state = savedSessionId; 107 - // Load messages for the restored session so chat isn't empty on startup108 - await ref.read(messagesProvider.notifier).switchSession(savedSessionId);111 + // Synchronous: no async gap between load and any arriving messages.112 + ref.read(messagesProvider.notifier).switchSession(savedSessionId);109 113 } 110 114 if (!mounted) return; 111 115 .. .. @@ -165,14 +169,11 @@ 165 169 _persistUnreadCounts(counts); 166 170 } 167 171 172 + // ignore: unused_field168 173 bool _isLoadingMore = false; 169 174 void _onScroll() { 170 - if (!_isLoadingMore &&171 - _scrollController.position.pixels >=172 - _scrollController.position.maxScrollExtent - 100) {173 - _isLoadingMore = true;174 - ref.read(messagesProvider.notifier).loadMore().then((_) => _isLoadingMore = false);175 - }175 + // Pagination removed: all messages are loaded synchronously on session176 + // switch via the in-memory index. Nothing to do on scroll.176 177 } 177 178 178 179 // Helper: send a command to the gateway in the expected format .. .. @@ -377,18 +378,23 @@ 377 378 _lastSeq = serverSeq; 378 379 _saveLastSeq(); 379 380 } 380 - // Merge catch_up messages: only add messages not already in local storage.381 - // We check by content match against existing messages to avoid duplicates382 - // while still picking up messages that arrived while the app was backgrounded.381 + // Merge catch_up messages: only add messages not already displayed.382 + // Dedup by content to avoid showing messages already in the UI.383 383 final catchUpMsgs = msg['messages'] as List<dynamic>?; 384 384 if (catchUpMsgs != null && catchUpMsgs.isNotEmpty) { 385 385 _isCatchingUp = true; 386 386 final activeId = ref.read(activeSessionIdProvider); 387 + final currentId = ref.read(messagesProvider.notifier).currentSessionId;387 388 final existing = ref.read(messagesProvider); 388 389 final existingContents = existing 389 390 .where((m) => m.role == MessageRole.assistant) 390 391 .map((m) => m.content) 391 392 .toSet(); 393 +394 + // Collect cross-session sessions that received messages (for toasts).395 + final crossSessionCounts = <String, int>{};396 + final crossSessionPreviews = <String, String>{};397 +392 398 for (final m in catchUpMsgs) { 393 399 final map = m as Map<String, dynamic>; 394 400 final msgType = map['type'] as String? ?? 'text'; .. .. @@ -417,52 +423,46 @@ 417 423 ); 418 424 } 419 425 420 - _chatLog('catch_up msg: session=${msgSessionId?.substring(0, 8) ?? "NULL"} active=${activeId?.substring(0, 8)} match=${msgSessionId == activeId || msgSessionId == null} content="${content.substring(0, content.length.clamp(0, 40))}"');421 - if (msgSessionId == null || msgSessionId == activeId) {422 - // Active session or no session: add directly to chat426 + _chatLog('catch_up msg: session=${msgSessionId?.substring(0, 8) ?? "NULL"} active=${activeId?.substring(0, 8)} content="${content.substring(0, content.length.clamp(0, 40))}"');427 +428 + if (msgSessionId == null || msgSessionId == currentId) {429 + // Active session or no session: add to UI (addMessage also appends to log).423 430 ref.read(messagesProvider.notifier).addMessage(message); 424 431 } else { 425 - // Different session: store + unread badge + toast426 - // Collect for batch storage below to avoid race condition427 - _catchUpPending.putIfAbsent(msgSessionId, () => []).add(message);432 + // Cross-session: synchronous append — no race condition.433 + MessageStoreV2.append(msgSessionId, message);428 434 _incrementUnread(msgSessionId); 435 + crossSessionCounts[msgSessionId] = (crossSessionCounts[msgSessionId] ?? 0) + 1;436 + crossSessionPreviews.putIfAbsent(msgSessionId, () => content);429 437 } 430 438 existingContents.add(content); 431 439 } 440 +432 441 _isCatchingUp = false; 433 442 _scrollToBottom(); 434 - // Batch-store cross-session messages (sequential to avoid race condition)435 - if (_catchUpPending.isNotEmpty) {436 - final pending = Map<String, List<Message>>.from(_catchUpPending);437 - _catchUpPending.clear();438 - // Show one toast per session with message count439 - if (mounted) {440 - final sessions = ref.read(sessionsProvider);441 - for (final entry in pending.entries) {442 - final session = sessions.firstWhere(443 - (s) => s.id == entry.key,444 - orElse: () => Session(id: entry.key, index: 0, name: 'Unknown', type: 'claude'),445 - );446 - final count = entry.value.length;447 - final preview = count == 1448 - ? entry.value.first.content449 - : '$count messages';450 - ToastManager.show(451 - context,452 - sessionName: session.name,453 - preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,454 - onTap: () => _switchSession(entry.key),455 - );456 - }443 +444 + // Show one toast per cross-session that received messages.445 + if (crossSessionCounts.isNotEmpty && mounted) {446 + final sessions = ref.read(sessionsProvider);447 + for (final entry in crossSessionCounts.entries) {448 + final sid = entry.key;449 + final count = entry.value;450 + final session = sessions.firstWhere(451 + (s) => s.id == sid,452 + orElse: () => Session(id: sid, index: 0, name: 'Unknown', type: 'claude'),453 + );454 + final preview = count == 1455 + ? (crossSessionPreviews[sid] ?? '')456 + : '$count messages';457 + ToastManager.show(458 + context,459 + sessionName: session.name,460 + preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,461 + onTap: () => _switchSession(sid),462 + );457 463 } 458 - () async {459 - for (final entry in pending.entries) {460 - final existing = await MessageStore.loadAll(entry.key);461 - MessageStore.save(entry.key, [...existing, ...entry.value]);462 - await MessageStore.flush();463 - }464 - }();465 464 } 465 +466 466 // Clear unread for active session 467 467 if (activeId != null) { 468 468 final counts = Map<String, int>.from(ref.read(unreadCountsProvider)); .. .. @@ -500,6 +500,7 @@ 500 500 orElse: () => sessions.first, 501 501 ); 502 502 ref.read(activeSessionIdProvider.notifier).state = active.id; 503 + // Synchronous session switch — no async gap.503 504 ref.read(messagesProvider.notifier).switchSession(active.id); 504 505 SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', active.id)); 505 506 } .. .. @@ -520,7 +521,7 @@ 520 521 } 521 522 } 522 523 523 - Future<void> _handleIncomingMessage(Map<String, dynamic> msg) async {524 + void _handleIncomingMessage(Map<String, dynamic> msg) {524 525 final sessionId = msg['sessionId'] as String?; 525 526 final content = msg['content'] as String? ?? 526 527 msg['text'] as String? ?? .. .. @@ -537,16 +538,16 @@ 537 538 status: MessageStatus.sent, 538 539 ); 539 540 540 - // Use currentSessionId from notifier (what's actually loaded in the provider)541 - // not activeSessionIdProvider (can be stale after background resume)541 + // Use currentSessionId from notifier (what's actually loaded in the provider),542 + // not activeSessionIdProvider (can be stale after background resume).542 543 final currentId = ref.read(messagesProvider.notifier).currentSessionId; 543 544 if (sessionId != null && sessionId != currentId) { 544 - // Store message for the other session so it's there when user switches545 + // Append directly to the log for the target session — synchronous, no race.545 546 TraceService.instance.addTrace( 546 547 'message stored for session', 547 548 'sessionId=${sessionId.substring(0, sessionId.length.clamp(0, 8))}, toast shown', 548 549 ); 549 - await _storeForSession(sessionId, message);550 + MessageStoreV2.append(sessionId, message);550 551 _incrementUnread(sessionId); 551 552 final sessions = ref.read(sessionsProvider); 552 553 final session = sessions.firstWhere( .. .. @@ -616,9 +617,10 @@ 616 617 final currentId = ref.read(messagesProvider.notifier).currentSessionId; 617 618 _chatLog('voice: sessionId=$sessionId currentId=$currentId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"'); 618 619 if (sessionId != null && sessionId != currentId) { 619 - _chatLog('voice: cross-session, storing for $sessionId');620 - await _storeForSession(sessionId, storedMessage);621 - _chatLog('voice: stored, incrementing unread');620 + _chatLog('voice: cross-session, appending to store for $sessionId');621 + // Synchronous append — no async gap, no race condition.622 + MessageStoreV2.append(sessionId, storedMessage);623 + _chatLog('voice: appended, incrementing unread');622 624 _incrementUnread(sessionId); 623 625 final sessions = ref.read(sessionsProvider); 624 626 final session = sessions.firstWhere( .. .. @@ -684,10 +686,10 @@ 684 686 status: MessageStatus.sent, 685 687 ); 686 688 687 - // Cross-session routing: store for target session if not currently loaded689 + // Cross-session routing: append to log for target session if not currently loaded.688 690 final currentId = ref.read(messagesProvider.notifier).currentSessionId; 689 691 if (sessionId != null && sessionId != currentId) { 690 - _storeForSession(sessionId, message);692 + MessageStoreV2.append(sessionId, message);691 693 _incrementUnread(sessionId); 692 694 return; 693 695 } .. .. @@ -697,51 +699,19 @@ 697 699 _scrollToBottom(); 698 700 } 699 701 700 - /// Store a message for a non-active session so it persists when the user switches to it.701 - Future<void> _storeForSession(String sessionId, Message message) async {702 - final existing = await MessageStore.loadAll(sessionId);703 - _chatLog('storeForSession: $sessionId existing=${existing.length} adding type=${message.type.name} content="${message.content.substring(0, message.content.length.clamp(0, 30))}" audioUri=${message.audioUri != null ? "set(${message.audioUri!.length})" : "null"}');704 - MessageStore.save(sessionId, [...existing, message]);705 - await MessageStore.flush();706 - // Verify707 - final verify = await MessageStore.loadAll(sessionId);708 - _chatLog('storeForSession: verified ${verify.length} messages after save');702 + /// Superseded by MessageStoreV2.append() — call sites now use the synchronous703 + /// append directly. Kept as dead code until all callers are confirmed removed.704 + // ignore: unused_element705 + void _storeForSession(String sessionId, Message message) {706 + MessageStoreV2.append(sessionId, message);709 707 } 710 708 711 - /// Update a transcript for a message stored on disk (not in the active session).712 - /// Scans all session files to find the message by ID, updates content, and saves.709 + /// With the append-only log, transcript updates for cross-session messages710 + /// are not patched back to disk (the append-only design doesn't support711 + /// in-place edits). The transcript is updated in-memory if the message is712 + /// in the active session. Cross-session transcript updates are a no-op.713 713 Future<void> _updateTranscriptOnDisk(String messageId, String content) async { 714 - try {715 - final dir = await getApplicationDocumentsDirectory();716 - final msgDir = Directory('${dir.path}/messages');717 - if (!await msgDir.exists()) return;718 -719 - await for (final entity in msgDir.list()) {720 - if (entity is! File || !entity.path.endsWith('.json')) continue;721 -722 - final jsonStr = await entity.readAsString();723 - final List<dynamic> jsonList = jsonDecode(jsonStr) as List<dynamic>;724 - bool found = false;725 -726 - final updated = jsonList.map((j) {727 - final map = j as Map<String, dynamic>;728 - if (map['id'] == messageId) {729 - found = true;730 - return {...map, 'content': content};731 - }732 - return map;733 - }).toList();734 -735 - if (found) {736 - await entity.writeAsString(jsonEncode(updated));737 - _chatLog('transcript: updated messageId=$messageId on disk in ${entity.path.split('/').last}');738 - return;739 - }740 - }741 - _chatLog('transcript: messageId=$messageId not found on disk');742 - } catch (e) {743 - _chatLog('transcript: disk update error=$e');744 - }714 + _chatLog('transcript: cross-session update for messageId=$messageId — in-memory only (append-only log)');745 715 } 746 716 747 717 void _incrementUnread(String sessionId) { .. .. @@ -770,7 +740,8 @@ 770 740 ref.read(isTypingProvider.notifier).state = false; 771 741 772 742 ref.read(activeSessionIdProvider.notifier).state = sessionId; 773 - await ref.read(messagesProvider.notifier).switchSession(sessionId);743 + // Synchronous — no async gap between session switch and incoming messages.744 + ref.read(messagesProvider.notifier).switchSession(sessionId);774 745 SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', sessionId)); 775 746 776 747 final counts = Map<String, int>.from(ref.read(unreadCountsProvider)); lib/services/message_store.dart
.. .. @@ -1,4 +1,3 @@ 1 -import 'dart:async';2 1 import 'dart:convert'; 3 2 import 'dart:io'; 4 3 .. .. @@ -8,30 +7,39 @@ 8 7 import '../models/message.dart'; 9 8 import 'trace_service.dart'; 10 9 11 -/// Per-session JSON file persistence with debounced saves.12 -class MessageStore {13 - MessageStore._();10 +/// Append-only log-based message persistence.11 +///12 +/// Layout:13 +/// messages/log.jsonl — one JSON object per line, each a serialized Message14 +/// messages/index.json — { "sessionId": [lineNumber, ...] }15 +///16 +/// All writes are synchronous (writeAsStringSync with FileMode.append) to17 +/// prevent race conditions between concurrent addMessage / switchSession calls.18 +class MessageStoreV2 {19 + MessageStoreV2._();20 +21 + static const _backupChannel = MethodChannel('com.mnsoft.pailot/backup');22 +23 + // In-memory index: sessionId -> list of 0-based line numbers in log.jsonl24 + static final Map<String, List<int>> _index = {};25 +26 + // Number of lines currently in the log (= next line number to write)27 + static int _lineCount = 0;28 +29 + // Flush the index to disk every N appends to amortise I/O30 + static const _indexFlushInterval = 20;31 + static int _appendsSinceFlush = 0;14 32 15 33 static Directory? _baseDir; 16 - static Timer? _debounceTimer;17 - static final Map<String, List<Message>> _pendingSaves = {};18 - // Per-session lock to prevent concurrent read/write on the same file19 - static final Map<String, Completer<void>> _locks = {};20 34 21 - static const _backupChannel =22 - MethodChannel('com.mnsoft.pailot/backup');35 + // ------------------------------------------------------------------ init --23 36 24 - /// Initialize the base directory for message storage.25 - /// On iOS, the directory is excluded from iCloud / iTunes backup so that26 - /// large base64 image attachments do not bloat the user's cloud storage.27 - /// Messages can be re-fetched from the server if needed.28 37 static Future<Directory> _getBaseDir() async { 29 38 if (_baseDir != null) return _baseDir!; 30 39 final appDir = await getApplicationDocumentsDirectory(); 31 40 _baseDir = Directory('${appDir.path}/messages'); 32 - final created = !await _baseDir!.exists();33 - if (created) {34 - await _baseDir!.create(recursive: true);41 + if (!_baseDir!.existsSync()) {42 + _baseDir!.createSync(recursive: true);35 43 } 36 44 // Exclude from iCloud / iTunes backup (best-effort, iOS only). 37 45 if (Platform.isIOS) { .. .. @@ -40,144 +48,243 @@ 40 48 'excludeFromBackup', 41 49 _baseDir!.path, 42 50 ); 43 - } catch (_) {44 - // Non-fatal: if the channel call fails, backup exclusion is skipped.45 - }51 + } catch (_) {}46 52 } 47 53 return _baseDir!; 48 54 } 49 55 50 - static String _fileForSession(String sessionId) {51 - // Sanitize session ID for filename52 - final safe = sessionId.replaceAll(RegExp(r'[^\w\-]'), '_');53 - return 'session_$safe.json';54 - }56 + static String _logPath(Directory dir) => '${dir.path}/log.jsonl';57 + static String _indexPath(Directory dir) => '${dir.path}/index.json';55 58 56 - /// Save messages for a session with 1-second debounce.57 - static void save(String sessionId, List<Message> messages) {58 - _pendingSaves[sessionId] = messages;59 - _debounceTimer?.cancel();60 - _debounceTimer = Timer(const Duration(seconds: 1), _flushAll);61 - }62 -63 - /// Write directly to disk, bypassing debounce. For critical saves.64 - static Future<void> writeDirect(String sessionId, List<Message> messages) async {65 - _debounceTimer?.cancel();66 - _pendingSaves.remove(sessionId);67 - await _withLock(sessionId, () => _writeSession(sessionId, messages));68 - }69 -70 - /// Acquire a per-session lock, run the operation, release.71 - static Future<T> _withLock<T>(String sessionId, Future<T> Function() fn) async {72 - // Wait for any existing operation on this session to finish73 - while (_locks.containsKey(sessionId)) {74 - await _locks[sessionId]!.future;75 - }76 - final completer = Completer<void>();77 - _locks[sessionId] = completer;78 - try {79 - return await fn();80 - } finally {81 - _locks.remove(sessionId);82 - completer.complete();83 - }84 - }85 -86 - /// Immediately flush all pending saves.87 - static Future<void> flush() async {88 - _debounceTimer?.cancel();89 - await _flushAll();90 - }91 -92 - static Future<void> _flushAll() async {93 - final entries = Map<String, List<Message>>.from(_pendingSaves);94 - _pendingSaves.clear();95 -96 - for (final entry in entries.entries) {97 - await _withLock(entry.key, () => _writeSession(entry.key, entry.value));98 - }99 - }100 -101 - static Future<void> _writeSession(102 - String sessionId, List<Message> messages) async {59 + /// Called once at app startup. Reads log.jsonl and rebuilds the in-memory60 + /// index. Then calls compact() to trim old messages.61 + static Future<void> initialize() async {103 62 try { 104 63 final dir = await _getBaseDir(); 105 - final file = File('${dir.path}/${_fileForSession(sessionId)}');106 - // Strip heavy fields for persistence107 - final lightMessages = messages.map((m) => m.toJsonLight()).toList();108 - final json = jsonEncode(lightMessages);109 - await file.writeAsString(json);110 - TraceService.instance.addTrace('MsgStore WRITE', '${sessionId.substring(0, 8)}: ${messages.length} msgs');64 + final logFile = File(_logPath(dir));65 + final indexFile = File(_indexPath(dir));66 +67 + // Try loading saved index first (fast path).68 + if (indexFile.existsSync()) {69 + try {70 + final raw = indexFile.readAsStringSync();71 + final decoded = jsonDecode(raw) as Map<String, dynamic>;72 + for (final entry in decoded.entries) {73 + _index[entry.key] =74 + (entry.value as List<dynamic>).map((e) => e as int).toList();75 + }76 + } catch (_) {77 + _index.clear();78 + }79 + }80 +81 + // Count actual lines in log to set _lineCount.82 + if (logFile.existsSync()) {83 + final content = logFile.readAsStringSync();84 + _lineCount = content.isEmpty85 + ? 086 + : content.trimRight().split('\n').length;87 + } else {88 + _lineCount = 0;89 + }90 +91 + // If the index was missing or corrupt, rebuild from log.92 + if (_index.isEmpty && _lineCount > 0) {93 + await _rebuildIndex(logFile);94 + }95 +96 + TraceService.instance.addTrace(97 + 'MsgStoreV2 INIT', '$_lineCount lines, ${_index.length} sessions');98 +99 + // Compact on startup (keeps last 200 per session).100 + await compact();111 101 } catch (e) { 112 - TraceService.instance.addTrace('MsgStore WRITE ERROR', '${sessionId.substring(0, 8)}: $e');102 + TraceService.instance.addTrace('MsgStoreV2 INIT ERROR', '$e');113 103 } 114 104 } 115 105 116 - /// Load messages for a session.117 - /// [limit] controls how many recent messages to return (default: 50).118 - /// [offset] is the number of messages to skip from the end (for pagination).119 - static Future<List<Message>> load(120 - String sessionId, {121 - int limit = 50,122 - int offset = 0,123 - }) async {106 + static Future<void> _rebuildIndex(File logFile) async {107 + _index.clear();108 + final lines = logFile.readAsLinesSync();109 + for (var i = 0; i < lines.length; i++) {110 + final line = lines[i].trim();111 + if (line.isEmpty) continue;112 + try {113 + final map = jsonDecode(line) as Map<String, dynamic>;114 + final sessionId = map['sessionId'] as String?;115 + if (sessionId != null) {116 + _index.putIfAbsent(sessionId, () => []).add(i);117 + }118 + } catch (_) {}119 + }120 + }121 +122 + // --------------------------------------------------------------- append --123 +124 + /// Append a message to the log. SYNCHRONOUS — no async gap, no race.125 + ///126 + /// Each line written includes a 'sessionId' field so the index can be127 + /// rebuilt from the log alone if needed.128 + static void append(String sessionId, Message message) {124 129 try { 125 - final dir = await _getBaseDir();126 - final file = File('${dir.path}/${_fileForSession(sessionId)}');127 - if (!await file.exists()) return [];130 + final dir = _baseDir;131 + if (dir == null) {132 + // initialize() hasn't been called yet — silently drop (shouldn't happen).133 + TraceService.instance134 + .addTrace('MsgStoreV2 APPEND WARN', 'baseDir null, dropping');135 + return;136 + }137 + final logFile = File(_logPath(dir));138 + final json = message.toJsonLight();139 + json['sessionId'] = sessionId;140 + final line = '${jsonEncode(json)}\n';128 141 129 - final jsonStr = await file.readAsString();130 - final List<dynamic> jsonList = jsonDecode(jsonStr) as List<dynamic>;131 - final allMessages = jsonList132 - .map((j) => _messageFromJson(j as Map<String, dynamic>))133 - .where((m) => !m.isEmptyVoice && !m.isEmptyText)134 - .toList();142 + // Synchronous append — atomic single write, no read-modify-write.143 + logFile.writeAsStringSync(line, mode: FileMode.append);135 144 136 - // Paginate from the end (newest messages first in storage)137 - if (offset >= allMessages.length) return [];138 - final end = allMessages.length - offset;139 - final start = (end - limit).clamp(0, end);140 - return allMessages.sublist(start, end);145 + // Update in-memory index.146 + _index.putIfAbsent(sessionId, () => []).add(_lineCount);147 + _lineCount++;148 +149 + // Periodically flush index to disk.150 + _appendsSinceFlush++;151 + if (_appendsSinceFlush >= _indexFlushInterval) {152 + _flushIndex(dir);153 + _appendsSinceFlush = 0;154 + }141 155 } catch (e) { 156 + TraceService.instance.addTrace('MsgStoreV2 APPEND ERROR', '$e');157 + }158 + }159 +160 + // -------------------------------------------------------------- load --161 +162 + /// Load messages for a session. SYNCHRONOUS — reads from the log using the163 + /// in-memory index. Safe to call from switchSession without async gaps.164 + static List<Message> loadSession(String sessionId) {165 + try {166 + final dir = _baseDir;167 + if (dir == null) return [];168 + final logFile = File(_logPath(dir));169 + if (!logFile.existsSync()) return [];170 +171 + final lineNumbers = _index[sessionId];172 + if (lineNumbers == null || lineNumbers.isEmpty) return [];173 +174 + // Read all lines at once then pick the ones we need.175 + final allLines = logFile.readAsLinesSync();176 + final messages = <Message>[];177 +178 + for (final n in lineNumbers) {179 + if (n >= allLines.length) continue;180 + final line = allLines[n].trim();181 + if (line.isEmpty) continue;182 + try {183 + final map = jsonDecode(line) as Map<String, dynamic>;184 + // Remove synthetic sessionId field before deserialising.185 + map.remove('sessionId');186 + final msg = _messageFromJson(map);187 + if (!msg.isEmptyVoice && !msg.isEmptyText) {188 + messages.add(msg);189 + }190 + } catch (_) {}191 + }192 +193 + TraceService.instance.addTrace(194 + 'MsgStoreV2 LOAD', '${sessionId.substring(0, 8)}: ${messages.length} msgs');195 + return messages;196 + } catch (e) {197 + TraceService.instance198 + .addTrace('MsgStoreV2 LOAD ERROR', '${sessionId.substring(0, 8)}: $e');142 199 return []; 143 200 } 144 201 } 145 202 146 - /// Load all messages for a session (no pagination).147 - static Future<List<Message>> loadAll(String sessionId) async {148 - return _withLock(sessionId, () => _loadAllImpl(sessionId));149 - }203 + // ------------------------------------------------------------- compact --150 204 151 - static Future<List<Message>> _loadAllImpl(String sessionId) async {205 + /// Rewrite the log keeping at most [keepPerSession] messages per session.206 + /// Called once on startup after initialize(). NOT called during normal use.207 + static Future<void> compact({int keepPerSession = 200}) async {152 208 try { 153 209 final dir = await _getBaseDir(); 154 - final file = File('${dir.path}/${_fileForSession(sessionId)}');155 - if (!await file.exists()) return [];210 + final logFile = File(_logPath(dir));211 + if (!logFile.existsSync()) return;156 212 157 - final jsonStr = await file.readAsString();158 - final List<dynamic> jsonList = jsonDecode(jsonStr) as List<dynamic>;159 - final msgs = jsonList160 - .map((j) => _messageFromJson(j as Map<String, dynamic>))161 - .where((m) => !m.isEmptyVoice && !m.isEmptyText)162 - .toList();163 - TraceService.instance.addTrace('MsgStore LOAD', '${sessionId.substring(0, 8)}: ${msgs.length} msgs');164 - return msgs;213 + final allLines = logFile.readAsLinesSync();214 + if (allLines.length < 500) return; // nothing worth compacting215 +216 + // Build a set of line numbers to keep: last keepPerSession per session.217 + final keepLines = <int>{};218 + for (final entry in _index.entries) {219 + final lines = entry.value;220 + final start = lines.length > keepPerSession221 + ? lines.length - keepPerSession222 + : 0;223 + for (var i = start; i < lines.length; i++) {224 + keepLines.add(lines[i]);225 + }226 + }227 +228 + if (keepLines.length == allLines.length) return; // nothing removed229 +230 + // Rewrite the log with only the kept lines, rebuilding the index.231 + final newIndex = <String, List<int>>{};232 + final buffer = StringBuffer();233 + var newLine = 0;234 +235 + for (var i = 0; i < allLines.length; i++) {236 + if (!keepLines.contains(i)) continue;237 + final line = allLines[i].trim();238 + if (line.isEmpty) continue;239 + buffer.write('$line\n');240 + // Extract sessionId for new index.241 + try {242 + final map = jsonDecode(line) as Map<String, dynamic>;243 + final sid = map['sessionId'] as String?;244 + if (sid != null) {245 + newIndex.putIfAbsent(sid, () => []).add(newLine);246 + }247 + } catch (_) {}248 + newLine++;249 + }250 +251 + logFile.writeAsStringSync(buffer.toString());252 + _index253 + ..clear()254 + ..addAll(newIndex);255 + _lineCount = newLine;256 + _flushIndex(dir);257 +258 + TraceService.instance.addTrace(259 + 'MsgStoreV2 COMPACT', '${allLines.length} → $newLine lines');165 260 } catch (e) { 166 - TraceService.instance.addTrace('MsgStore LOAD ERROR', '${sessionId.substring(0, 8)}: $e');167 - return [];261 + TraceService.instance.addTrace('MsgStoreV2 COMPACT ERROR', '$e');168 262 } 169 263 } 170 264 171 - /// Deserialize a message from JSON, applying migration rules:172 - /// - Voice messages without audioUri are downgraded to text (transcript only).173 - /// This handles messages saved before a restart, where the temp audio file174 - /// is no longer available. The transcript (content) is preserved.265 + // --------------------------------------------------------- index flush --266 +267 + static void _flushIndex(Directory dir) {268 + try {269 + final indexMap = _index.map(270 + (k, v) => MapEntry(k, v));271 + File(_indexPath(dir))272 + .writeAsStringSync(jsonEncode(indexMap));273 + } catch (_) {}274 + }275 +276 + /// Force-flush the index to disk (call on app suspend / session switch).277 + static void flushIndex() {278 + if (_baseDir != null) _flushIndex(_baseDir!);279 + }280 +281 + // ------------------------------------------------- migration helper --282 +283 + /// Deserialize a message, downgrading voice→text if audio is unavailable.175 284 static Message _messageFromJson(Map<String, dynamic> json) { 176 285 final raw = Message.fromJson(json); 177 286 if (raw.type == MessageType.voice && 178 287 (raw.audioUri == null || raw.audioUri!.isEmpty)) { 179 - // Downgrade to text so the bubble shows the transcript instead of a180 - // broken play button.181 288 return Message( 182 289 id: raw.id, 183 290 role: raw.role, .. .. @@ -191,25 +298,18 @@ 191 298 return raw; 192 299 } 193 300 194 - /// Delete stored messages for a session.195 - static Future<void> delete(String sessionId) async {196 - try {197 - final dir = await _getBaseDir();198 - final file = File('${dir.path}/${_fileForSession(sessionId)}');199 - if (await file.exists()) {200 - await file.delete();201 - }202 - } catch (_) {}203 - }301 + // --------------------------------------------------------- clear all --204 302 205 - /// Clear all stored messages.303 + /// Wipe everything (used from settings / debug).206 304 static Future<void> clearAll() async { 207 305 try { 208 306 final dir = await _getBaseDir(); 209 - if (await dir.exists()) {210 - await dir.delete(recursive: true);211 - await dir.create(recursive: true);307 + if (dir.existsSync()) {308 + dir.deleteSync(recursive: true);309 + dir.createSync(recursive: true);212 310 } 311 + _index.clear();312 + _lineCount = 0;213 313 } catch (_) {} 214 314 } 215 315 }