From 06bb73662d32d65d1e775a4dd35f67d82d673e40 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Mon, 06 Apr 2026 15:32:03 +0200
Subject: [PATCH] feat: rewrite message store - append-only log, sync routing, eliminates async race conditions
---
lib/services/message_store.dart | 376 ++++++++++++++++++++++++++++++++++-------------------
1 files changed, 238 insertions(+), 138 deletions(-)
diff --git a/lib/services/message_store.dart b/lib/services/message_store.dart
index 1723b70..71a7655 100644
--- a/lib/services/message_store.dart
+++ b/lib/services/message_store.dart
@@ -1,4 +1,3 @@
-import 'dart:async';
import 'dart:convert';
import 'dart:io';
@@ -8,30 +7,39 @@
import '../models/message.dart';
import 'trace_service.dart';
-/// Per-session JSON file persistence with debounced saves.
-class MessageStore {
- MessageStore._();
+/// Append-only log-based message persistence.
+///
+/// Layout:
+/// messages/log.jsonl — one JSON object per line, each a serialized Message
+/// messages/index.json — { "sessionId": [lineNumber, ...] }
+///
+/// All writes are synchronous (writeAsStringSync with FileMode.append) to
+/// prevent race conditions between concurrent addMessage / switchSession calls.
+class MessageStoreV2 {
+ MessageStoreV2._();
+
+ static const _backupChannel = MethodChannel('com.mnsoft.pailot/backup');
+
+ // In-memory index: sessionId -> list of 0-based line numbers in log.jsonl
+ static final Map<String, List<int>> _index = {};
+
+ // Number of lines currently in the log (= next line number to write)
+ static int _lineCount = 0;
+
+ // Flush the index to disk every N appends to amortise I/O
+ static const _indexFlushInterval = 20;
+ static int _appendsSinceFlush = 0;
static Directory? _baseDir;
- static Timer? _debounceTimer;
- static final Map<String, List<Message>> _pendingSaves = {};
- // Per-session lock to prevent concurrent read/write on the same file
- static final Map<String, Completer<void>> _locks = {};
- static const _backupChannel =
- MethodChannel('com.mnsoft.pailot/backup');
+ // ------------------------------------------------------------------ init --
- /// Initialize the base directory for message storage.
- /// On iOS, the directory is excluded from iCloud / iTunes backup so that
- /// large base64 image attachments do not bloat the user's cloud storage.
- /// Messages can be re-fetched from the server if needed.
static Future<Directory> _getBaseDir() async {
if (_baseDir != null) return _baseDir!;
final appDir = await getApplicationDocumentsDirectory();
_baseDir = Directory('${appDir.path}/messages');
- final created = !await _baseDir!.exists();
- if (created) {
- await _baseDir!.create(recursive: true);
+ if (!_baseDir!.existsSync()) {
+ _baseDir!.createSync(recursive: true);
}
// Exclude from iCloud / iTunes backup (best-effort, iOS only).
if (Platform.isIOS) {
@@ -40,144 +48,243 @@
'excludeFromBackup',
_baseDir!.path,
);
- } catch (_) {
- // Non-fatal: if the channel call fails, backup exclusion is skipped.
- }
+ } catch (_) {}
}
return _baseDir!;
}
- static String _fileForSession(String sessionId) {
- // Sanitize session ID for filename
- final safe = sessionId.replaceAll(RegExp(r'[^\w\-]'), '_');
- return 'session_$safe.json';
- }
+ static String _logPath(Directory dir) => '${dir.path}/log.jsonl';
+ static String _indexPath(Directory dir) => '${dir.path}/index.json';
- /// Save messages for a session with 1-second debounce.
- static void save(String sessionId, List<Message> messages) {
- _pendingSaves[sessionId] = messages;
- _debounceTimer?.cancel();
- _debounceTimer = Timer(const Duration(seconds: 1), _flushAll);
- }
-
- /// Write directly to disk, bypassing debounce. For critical saves.
- static Future<void> writeDirect(String sessionId, List<Message> messages) async {
- _debounceTimer?.cancel();
- _pendingSaves.remove(sessionId);
- await _withLock(sessionId, () => _writeSession(sessionId, messages));
- }
-
- /// Acquire a per-session lock, run the operation, release.
- static Future<T> _withLock<T>(String sessionId, Future<T> Function() fn) async {
- // Wait for any existing operation on this session to finish
- while (_locks.containsKey(sessionId)) {
- await _locks[sessionId]!.future;
- }
- final completer = Completer<void>();
- _locks[sessionId] = completer;
- try {
- return await fn();
- } finally {
- _locks.remove(sessionId);
- completer.complete();
- }
- }
-
- /// Immediately flush all pending saves.
- static Future<void> flush() async {
- _debounceTimer?.cancel();
- await _flushAll();
- }
-
- static Future<void> _flushAll() async {
- final entries = Map<String, List<Message>>.from(_pendingSaves);
- _pendingSaves.clear();
-
- for (final entry in entries.entries) {
- await _withLock(entry.key, () => _writeSession(entry.key, entry.value));
- }
- }
-
- static Future<void> _writeSession(
- String sessionId, List<Message> messages) async {
+ /// Called once at app startup. Reads log.jsonl and rebuilds the in-memory
+ /// index. Then calls compact() to trim old messages.
+ static Future<void> initialize() async {
try {
final dir = await _getBaseDir();
- final file = File('${dir.path}/${_fileForSession(sessionId)}');
- // Strip heavy fields for persistence
- final lightMessages = messages.map((m) => m.toJsonLight()).toList();
- final json = jsonEncode(lightMessages);
- await file.writeAsString(json);
- TraceService.instance.addTrace('MsgStore WRITE', '${sessionId.substring(0, 8)}: ${messages.length} msgs');
+ final logFile = File(_logPath(dir));
+ final indexFile = File(_indexPath(dir));
+
+ // Try loading saved index first (fast path).
+ if (indexFile.existsSync()) {
+ try {
+ final raw = indexFile.readAsStringSync();
+ final decoded = jsonDecode(raw) as Map<String, dynamic>;
+ for (final entry in decoded.entries) {
+ _index[entry.key] =
+ (entry.value as List<dynamic>).map((e) => e as int).toList();
+ }
+ } catch (_) {
+ _index.clear();
+ }
+ }
+
+ // Count actual lines in log to set _lineCount.
+ if (logFile.existsSync()) {
+ final content = logFile.readAsStringSync();
+ _lineCount = content.isEmpty
+ ? 0
+ : content.trimRight().split('\n').length;
+ } else {
+ _lineCount = 0;
+ }
+
+ // If the index was missing or corrupt, rebuild from log.
+ if (_index.isEmpty && _lineCount > 0) {
+ await _rebuildIndex(logFile);
+ }
+
+ TraceService.instance.addTrace(
+ 'MsgStoreV2 INIT', '$_lineCount lines, ${_index.length} sessions');
+
+ // Compact on startup (keeps last 200 per session).
+ await compact();
} catch (e) {
- TraceService.instance.addTrace('MsgStore WRITE ERROR', '${sessionId.substring(0, 8)}: $e');
+ TraceService.instance.addTrace('MsgStoreV2 INIT ERROR', '$e');
}
}
- /// Load messages for a session.
- /// [limit] controls how many recent messages to return (default: 50).
- /// [offset] is the number of messages to skip from the end (for pagination).
- static Future<List<Message>> load(
- String sessionId, {
- int limit = 50,
- int offset = 0,
- }) async {
+ static Future<void> _rebuildIndex(File logFile) async {
+ _index.clear();
+ final lines = logFile.readAsLinesSync();
+ for (var i = 0; i < lines.length; i++) {
+ final line = lines[i].trim();
+ if (line.isEmpty) continue;
+ try {
+ final map = jsonDecode(line) as Map<String, dynamic>;
+ final sessionId = map['sessionId'] as String?;
+ if (sessionId != null) {
+ _index.putIfAbsent(sessionId, () => []).add(i);
+ }
+ } catch (_) {}
+ }
+ }
+
+ // --------------------------------------------------------------- append --
+
+ /// Append a message to the log. SYNCHRONOUS — no async gap, no race.
+ ///
+ /// Each line written includes a 'sessionId' field so the index can be
+ /// rebuilt from the log alone if needed.
+ static void append(String sessionId, Message message) {
try {
- final dir = await _getBaseDir();
- final file = File('${dir.path}/${_fileForSession(sessionId)}');
- if (!await file.exists()) return [];
+ final dir = _baseDir;
+ if (dir == null) {
+ // initialize() hasn't been called yet — silently drop (shouldn't happen).
+ TraceService.instance
+ .addTrace('MsgStoreV2 APPEND WARN', 'baseDir null, dropping');
+ return;
+ }
+ final logFile = File(_logPath(dir));
+ final json = message.toJsonLight();
+ json['sessionId'] = sessionId;
+ final line = '${jsonEncode(json)}\n';
- final jsonStr = await file.readAsString();
- final List<dynamic> jsonList = jsonDecode(jsonStr) as List<dynamic>;
- final allMessages = jsonList
- .map((j) => _messageFromJson(j as Map<String, dynamic>))
- .where((m) => !m.isEmptyVoice && !m.isEmptyText)
- .toList();
+ // Synchronous append — atomic single write, no read-modify-write.
+ logFile.writeAsStringSync(line, mode: FileMode.append);
- // Paginate from the end (newest messages first in storage)
- if (offset >= allMessages.length) return [];
- final end = allMessages.length - offset;
- final start = (end - limit).clamp(0, end);
- return allMessages.sublist(start, end);
+ // Update in-memory index.
+ _index.putIfAbsent(sessionId, () => []).add(_lineCount);
+ _lineCount++;
+
+ // Periodically flush index to disk.
+ _appendsSinceFlush++;
+ if (_appendsSinceFlush >= _indexFlushInterval) {
+ _flushIndex(dir);
+ _appendsSinceFlush = 0;
+ }
} catch (e) {
+ TraceService.instance.addTrace('MsgStoreV2 APPEND ERROR', '$e');
+ }
+ }
+
+ // -------------------------------------------------------------- load --
+
+ /// Load messages for a session. SYNCHRONOUS — reads from the log using the
+ /// in-memory index. Safe to call from switchSession without async gaps.
+ static List<Message> loadSession(String sessionId) {
+ try {
+ final dir = _baseDir;
+ if (dir == null) return [];
+ final logFile = File(_logPath(dir));
+ if (!logFile.existsSync()) return [];
+
+ final lineNumbers = _index[sessionId];
+ if (lineNumbers == null || lineNumbers.isEmpty) return [];
+
+ // Read all lines at once then pick the ones we need.
+ final allLines = logFile.readAsLinesSync();
+ final messages = <Message>[];
+
+ for (final n in lineNumbers) {
+ if (n >= allLines.length) continue;
+ final line = allLines[n].trim();
+ if (line.isEmpty) continue;
+ try {
+ final map = jsonDecode(line) as Map<String, dynamic>;
+ // Remove synthetic sessionId field before deserialising.
+ map.remove('sessionId');
+ final msg = _messageFromJson(map);
+ if (!msg.isEmptyVoice && !msg.isEmptyText) {
+ messages.add(msg);
+ }
+ } catch (_) {}
+ }
+
+ TraceService.instance.addTrace(
+ 'MsgStoreV2 LOAD', '${sessionId.substring(0, 8)}: ${messages.length} msgs');
+ return messages;
+ } catch (e) {
+ TraceService.instance
+ .addTrace('MsgStoreV2 LOAD ERROR', '${sessionId.substring(0, 8)}: $e');
return [];
}
}
- /// Load all messages for a session (no pagination).
- static Future<List<Message>> loadAll(String sessionId) async {
- return _withLock(sessionId, () => _loadAllImpl(sessionId));
- }
+ // ------------------------------------------------------------- compact --
- static Future<List<Message>> _loadAllImpl(String sessionId) async {
+ /// Rewrite the log keeping at most [keepPerSession] messages per session.
+ /// Called once on startup after initialize(). NOT called during normal use.
+ static Future<void> compact({int keepPerSession = 200}) async {
try {
final dir = await _getBaseDir();
- final file = File('${dir.path}/${_fileForSession(sessionId)}');
- if (!await file.exists()) return [];
+ final logFile = File(_logPath(dir));
+ if (!logFile.existsSync()) return;
- final jsonStr = await file.readAsString();
- final List<dynamic> jsonList = jsonDecode(jsonStr) as List<dynamic>;
- final msgs = jsonList
- .map((j) => _messageFromJson(j as Map<String, dynamic>))
- .where((m) => !m.isEmptyVoice && !m.isEmptyText)
- .toList();
- TraceService.instance.addTrace('MsgStore LOAD', '${sessionId.substring(0, 8)}: ${msgs.length} msgs');
- return msgs;
+ final allLines = logFile.readAsLinesSync();
+ if (allLines.length < 500) return; // nothing worth compacting
+
+ // Build a set of line numbers to keep: last keepPerSession per session.
+ final keepLines = <int>{};
+ for (final entry in _index.entries) {
+ final lines = entry.value;
+ final start = lines.length > keepPerSession
+ ? lines.length - keepPerSession
+ : 0;
+ for (var i = start; i < lines.length; i++) {
+ keepLines.add(lines[i]);
+ }
+ }
+
+ if (keepLines.length == allLines.length) return; // nothing removed
+
+ // Rewrite the log with only the kept lines, rebuilding the index.
+ final newIndex = <String, List<int>>{};
+ final buffer = StringBuffer();
+ var newLine = 0;
+
+ for (var i = 0; i < allLines.length; i++) {
+ if (!keepLines.contains(i)) continue;
+ final line = allLines[i].trim();
+ if (line.isEmpty) continue;
+ buffer.write('$line\n');
+ // Extract sessionId for new index.
+ try {
+ final map = jsonDecode(line) as Map<String, dynamic>;
+ final sid = map['sessionId'] as String?;
+ if (sid != null) {
+ newIndex.putIfAbsent(sid, () => []).add(newLine);
+ }
+ } catch (_) {}
+ newLine++;
+ }
+
+ logFile.writeAsStringSync(buffer.toString());
+ _index
+ ..clear()
+ ..addAll(newIndex);
+ _lineCount = newLine;
+ _flushIndex(dir);
+
+ TraceService.instance.addTrace(
+ 'MsgStoreV2 COMPACT', '${allLines.length} → $newLine lines');
} catch (e) {
- TraceService.instance.addTrace('MsgStore LOAD ERROR', '${sessionId.substring(0, 8)}: $e');
- return [];
+ TraceService.instance.addTrace('MsgStoreV2 COMPACT ERROR', '$e');
}
}
- /// Deserialize a message from JSON, applying migration rules:
- /// - Voice messages without audioUri are downgraded to text (transcript only).
- /// This handles messages saved before a restart, where the temp audio file
- /// is no longer available. The transcript (content) is preserved.
+ // --------------------------------------------------------- index flush --
+
+ static void _flushIndex(Directory dir) {
+ try {
+ final indexMap = _index.map(
+ (k, v) => MapEntry(k, v));
+ File(_indexPath(dir))
+ .writeAsStringSync(jsonEncode(indexMap));
+ } catch (_) {}
+ }
+
+ /// Force-flush the index to disk (call on app suspend / session switch).
+ static void flushIndex() {
+ if (_baseDir != null) _flushIndex(_baseDir!);
+ }
+
+ // ------------------------------------------------- migration helper --
+
+ /// Deserialize a message, downgrading voice→text if audio is unavailable.
static Message _messageFromJson(Map<String, dynamic> json) {
final raw = Message.fromJson(json);
if (raw.type == MessageType.voice &&
(raw.audioUri == null || raw.audioUri!.isEmpty)) {
- // Downgrade to text so the bubble shows the transcript instead of a
- // broken play button.
return Message(
id: raw.id,
role: raw.role,
@@ -191,25 +298,18 @@
return raw;
}
- /// Delete stored messages for a session.
- static Future<void> delete(String sessionId) async {
- try {
- final dir = await _getBaseDir();
- final file = File('${dir.path}/${_fileForSession(sessionId)}');
- if (await file.exists()) {
- await file.delete();
- }
- } catch (_) {}
- }
+ // --------------------------------------------------------- clear all --
- /// Clear all stored messages.
+ /// Wipe everything (used from settings / debug).
static Future<void> clearAll() async {
try {
final dir = await _getBaseDir();
- if (await dir.exists()) {
- await dir.delete(recursive: true);
- await dir.create(recursive: true);
+ if (dir.existsSync()) {
+ dir.deleteSync(recursive: true);
+ dir.createSync(recursive: true);
}
+ _index.clear();
+ _lineCount = 0;
} catch (_) {}
}
}
--
Gitblit v1.3.1