Matthias Nott
9 days ago 06bb73662d32d65d1e775a4dd35f67d82d673e40
lib/services/message_store.dart
....@@ -1,4 +1,3 @@
1
-import 'dart:async';
21 import 'dart:convert';
32 import 'dart:io';
43
....@@ -8,30 +7,39 @@
87 import '../models/message.dart';
98 import 'trace_service.dart';
109
11
-/// Per-session JSON file persistence with debounced saves.
12
-class MessageStore {
13
- MessageStore._();
10
+/// Append-only log-based message persistence.
11
+///
12
+/// Layout:
13
+/// messages/log.jsonl — one JSON object per line, each a serialized Message
14
+/// messages/index.json — { "sessionId": [lineNumber, ...] }
15
+///
16
+/// All writes are synchronous (writeAsStringSync with FileMode.append) to
17
+/// prevent race conditions between concurrent addMessage / switchSession calls.
18
+class MessageStoreV2 {
19
+ MessageStoreV2._();
20
+
21
+ static const _backupChannel = MethodChannel('com.mnsoft.pailot/backup');
22
+
23
+ // In-memory index: sessionId -> list of 0-based line numbers in log.jsonl
24
+ static final Map<String, List<int>> _index = {};
25
+
26
+ // Number of lines currently in the log (= next line number to write)
27
+ static int _lineCount = 0;
28
+
29
+ // Flush the index to disk every N appends to amortise I/O
30
+ static const _indexFlushInterval = 20;
31
+ static int _appendsSinceFlush = 0;
1432
1533 static Directory? _baseDir;
16
- static Timer? _debounceTimer;
17
- static final Map<String, List<Message>> _pendingSaves = {};
18
- // Per-session lock to prevent concurrent read/write on the same file
19
- static final Map<String, Completer<void>> _locks = {};
2034
21
- static const _backupChannel =
22
- MethodChannel('com.mnsoft.pailot/backup');
35
+ // ------------------------------------------------------------------ init --
2336
24
- /// Initialize the base directory for message storage.
25
- /// On iOS, the directory is excluded from iCloud / iTunes backup so that
26
- /// large base64 image attachments do not bloat the user's cloud storage.
27
- /// Messages can be re-fetched from the server if needed.
2837 static Future<Directory> _getBaseDir() async {
2938 if (_baseDir != null) return _baseDir!;
3039 final appDir = await getApplicationDocumentsDirectory();
3140 _baseDir = Directory('${appDir.path}/messages');
32
- final created = !await _baseDir!.exists();
33
- if (created) {
34
- await _baseDir!.create(recursive: true);
41
+ if (!_baseDir!.existsSync()) {
42
+ _baseDir!.createSync(recursive: true);
3543 }
3644 // Exclude from iCloud / iTunes backup (best-effort, iOS only).
3745 if (Platform.isIOS) {
....@@ -40,144 +48,243 @@
4048 'excludeFromBackup',
4149 _baseDir!.path,
4250 );
43
- } catch (_) {
44
- // Non-fatal: if the channel call fails, backup exclusion is skipped.
45
- }
51
+ } catch (_) {}
4652 }
4753 return _baseDir!;
4854 }
4955
50
- static String _fileForSession(String sessionId) {
51
- // Sanitize session ID for filename
52
- final safe = sessionId.replaceAll(RegExp(r'[^\w\-]'), '_');
53
- return 'session_$safe.json';
54
- }
56
/// Returns the path of the append-only message log (`log.jsonl`) in [dir].
static String _logPath(Directory dir) {
  return '${dir.path}/log.jsonl';
}
57
/// Returns the path of the persisted session index (`index.json`) in [dir].
static String _indexPath(Directory dir) {
  return '${dir.path}/index.json';
}
5558
56
- /// Save messages for a session with 1-second debounce.
57
- static void save(String sessionId, List<Message> messages) {
58
- _pendingSaves[sessionId] = messages;
59
- _debounceTimer?.cancel();
60
- _debounceTimer = Timer(const Duration(seconds: 1), _flushAll);
61
- }
62
-
63
- /// Write directly to disk, bypassing debounce. For critical saves.
64
- static Future<void> writeDirect(String sessionId, List<Message> messages) async {
65
- _debounceTimer?.cancel();
66
- _pendingSaves.remove(sessionId);
67
- await _withLock(sessionId, () => _writeSession(sessionId, messages));
68
- }
69
-
70
- /// Acquire a per-session lock, run the operation, release.
71
- static Future<T> _withLock<T>(String sessionId, Future<T> Function() fn) async {
72
- // Wait for any existing operation on this session to finish
73
- while (_locks.containsKey(sessionId)) {
74
- await _locks[sessionId]!.future;
75
- }
76
- final completer = Completer<void>();
77
- _locks[sessionId] = completer;
78
- try {
79
- return await fn();
80
- } finally {
81
- _locks.remove(sessionId);
82
- completer.complete();
83
- }
84
- }
85
-
86
- /// Immediately flush all pending saves.
87
- static Future<void> flush() async {
88
- _debounceTimer?.cancel();
89
- await _flushAll();
90
- }
91
-
92
- static Future<void> _flushAll() async {
93
- final entries = Map<String, List<Message>>.from(_pendingSaves);
94
- _pendingSaves.clear();
95
-
96
- for (final entry in entries.entries) {
97
- await _withLock(entry.key, () => _writeSession(entry.key, entry.value));
98
- }
99
- }
100
-
101
- static Future<void> _writeSession(
102
- String sessionId, List<Message> messages) async {
59
/// Loads the persisted log state. Call once at app startup, before [append]
/// or [loadSession] are used.
///
/// Steps, in order:
///  1. Resolve the storage directory.
///  2. Count the lines in `log.jsonl`, first terminating a dangling final
///     line so the next [append] starts on a line of its own.
///  3. Load `index.json` (fast path) and verify it against the log.
///  4. If the index is missing, corrupt, or does not reference every log
///     line exactly once, rebuild it from the log and persist it.
///  5. Run [compact].
///
/// The index is only flushed periodically, so after an abrupt termination
/// it can lag the log; step 4 is what recovers those trailing messages. A
/// log containing blank or undecodable lines never matches its index and
/// therefore always takes the rebuild path.
///
/// Failures are reported through [TraceService] and never thrown.
static Future<void> initialize() async {
  try {
    final dir = await _getBaseDir();
    final logFile = File(_logPath(dir));
    final indexFile = File(_indexPath(dir));

    // Start clean so a repeated call cannot merge entries from a prior run.
    _index.clear();
    _lineCount = 0;

    if (logFile.existsSync()) {
      final content = logFile.readAsStringSync();
      if (content.isNotEmpty && !content.endsWith('\n')) {
        // An interrupted append can leave the last line unterminated. Close
        // it now; otherwise the next append would be glued onto it and every
        // later line number would be off by one.
        logFile.writeAsStringSync('\n', mode: FileMode.append);
      }
      // Same splitting rules as readAsLinesSync(), which the other methods
      // use to address lines by number.
      _lineCount = const LineSplitter().convert(content).length;
    }

    // Fast path: load the saved index.
    if (indexFile.existsSync()) {
      try {
        final decoded =
            jsonDecode(indexFile.readAsStringSync()) as Map<String, dynamic>;
        for (final entry in decoded.entries) {
          _index[entry.key] = List<int>.from(entry.value as List<dynamic>);
        }
      } catch (_) {
        // Unreadable or malformed index: fall through to the rebuild below.
        _index.clear();
      }
    }

    if (!_indexMatchesLog()) {
      if (_lineCount > 0) {
        await _rebuildIndex(logFile);
      } else {
        // No log lines exist, so any loaded entry is stale.
        _index.clear();
      }
      _flushIndex(dir);
    }

    TraceService.instance.addTrace(
        'MsgStoreV2 INIT', '$_lineCount lines, ${_index.length} sessions');

    // Compact on startup (keeps last 200 per session).
    await compact();
  } catch (e) {
    TraceService.instance.addTrace('MsgStoreV2 INIT ERROR', '$e');
  }
}

/// Whether [_index] references each of the [_lineCount] log lines exactly
/// once (no duplicates, no out-of-range numbers, no unlisted lines).
static bool _indexMatchesLog() {
  final seen = <int>{};
  for (final lineNumbers in _index.values) {
    for (final n in lineNumbers) {
      if (n < 0 || n >= _lineCount || !seen.add(n)) return false;
    }
  }
  return seen.length == _lineCount;
}
115105
116
- /// Load messages for a session.
117
- /// [limit] controls how many recent messages to return (default: 50).
118
- /// [offset] is the number of messages to skip from the end (for pagination).
119
- static Future<List<Message>> load(
120
- String sessionId, {
121
- int limit = 50,
122
- int offset = 0,
123
- }) async {
106
/// Repopulates [_index] by scanning every line of [logFile].
///
/// Each line is decoded as a JSON object and its 0-based line number is
/// recorded under the object's `sessionId`. Blank lines, lines that fail to
/// decode, and objects without a `sessionId` are left unindexed.
static Future<void> _rebuildIndex(File logFile) async {
  _index.clear();
  var lineNo = -1;
  for (final rawLine in logFile.readAsLinesSync()) {
    lineNo++;
    final text = rawLine.trim();
    if (text.isEmpty) continue;
    try {
      final decoded = jsonDecode(text) as Map<String, dynamic>;
      final owner = decoded['sessionId'] as String?;
      if (owner == null) continue;
      (_index[owner] ??= []).add(lineNo);
    } catch (_) {
      // Undecodable line: skip it and keep scanning.
    }
  }
}
121
+
122
+ // --------------------------------------------------------------- append --
123
+
124
/// Writes [message] as a single JSON line at the end of the log and records
/// its line number under [sessionId] in the in-memory index.
///
/// The call is fully synchronous. The written object carries a `sessionId`
/// field so that the index can be reconstructed from the log alone. If the
/// base directory has not been resolved yet the message is dropped and a
/// warning is traced; any other failure is traced and swallowed.
static void append(String sessionId, Message message) {
  try {
    final dir = _baseDir;
    if (dir == null) {
      // The store was used before its directory was resolved.
      TraceService.instance
          .addTrace('MsgStoreV2 APPEND WARN', 'baseDir null, dropping');
      return;
    }

    final target = File(_logPath(dir));
    final payload = message.toJsonLight();
    payload['sessionId'] = sessionId;

    // One append-mode write per message; nothing is read back or rewritten.
    target.writeAsStringSync('${jsonEncode(payload)}\n',
        mode: FileMode.append);

    // The line just written sits at the current line count.
    (_index[sessionId] ??= []).add(_lineCount++);

    // Persist the index only once enough appends have accumulated.
    if (++_appendsSinceFlush < _indexFlushInterval) return;
    _flushIndex(dir);
    _appendsSinceFlush = 0;
  } catch (e) {
    TraceService.instance.addTrace('MsgStoreV2 APPEND ERROR', '$e');
  }
}
159
+
160
+ // -------------------------------------------------------------- load --
161
+
162
/// Returns the stored messages of [sessionId], in log order.
///
/// Fully synchronous: the whole log is read once and the lines listed for
/// the session in the in-memory index are decoded. Lines that are blank, out
/// of range, or fail to decode are skipped, as are messages for which
/// `isEmptyVoice` or `isEmptyText` is true.
///
/// Returns an empty list when the base directory is not resolved, the log
/// does not exist, the session is unknown, or an error occurs (the error is
/// traced, never thrown).
static List<Message> loadSession(String sessionId) {
  // Short label for traces. Guarded because substring(0, 8) throws for IDs
  // shorter than eight characters, which previously escaped from the catch
  // handler below.
  final tag = sessionId.length > 8 ? sessionId.substring(0, 8) : sessionId;
  try {
    final dir = _baseDir;
    if (dir == null) return [];
    final logFile = File(_logPath(dir));
    if (!logFile.existsSync()) return [];

    final lineNumbers = _index[sessionId];
    if (lineNumbers == null || lineNumbers.isEmpty) return [];

    // Read all lines at once, then pick the ones we need.
    final allLines = logFile.readAsLinesSync();
    final messages = <Message>[];

    for (final n in lineNumbers) {
      // Ignore entries that do not address an existing line.
      if (n < 0 || n >= allLines.length) continue;
      final line = allLines[n].trim();
      if (line.isEmpty) continue;
      try {
        final map = jsonDecode(line) as Map<String, dynamic>;
        // Remove the synthetic sessionId field before deserialising.
        map.remove('sessionId');
        final msg = _messageFromJson(map);
        if (!msg.isEmptyVoice && !msg.isEmptyText) {
          messages.add(msg);
        }
      } catch (_) {
        // One bad line must not hide the rest of the session.
      }
    }

    TraceService.instance
        .addTrace('MsgStoreV2 LOAD', '$tag: ${messages.length} msgs');
    return messages;
  } catch (e) {
    TraceService.instance.addTrace('MsgStoreV2 LOAD ERROR', '$tag: $e');
    return [];
  }
}
145202
146
- /// Load all messages for a session (no pagination).
147
- static Future<List<Message>> loadAll(String sessionId) async {
148
- return _withLock(sessionId, () => _loadAllImpl(sessionId));
149
- }
203
+ // ------------------------------------------------------------- compact --
150204
151
- static Future<List<Message>> _loadAllImpl(String sessionId) async {
205
/// Rewrites the log keeping at most [keepPerSession] messages per session
/// (the most recent ones), then replaces the in-memory and on-disk index.
///
/// Does nothing when the log is missing, has fewer than 500 lines, or would
/// not shrink. A negative [keepPerSession] is treated as 0.
///
/// Session membership is read from the log lines themselves rather than
/// from the in-memory index, so lines the index does not list are not
/// discarded. Blank lines, undecodable lines, and objects without a string
/// `sessionId` belong to no session and are dropped by the rewrite.
///
/// The new log is written to a temporary file and renamed over the old one,
/// so an interruption during the write leaves the previous log intact.
/// Failures are traced, never thrown.
static Future<void> compact({int keepPerSession = 200}) async {
  try {
    final dir = await _getBaseDir();
    final logFile = File(_logPath(dir));
    if (!logFile.existsSync()) return;

    final allLines = logFile.readAsLinesSync();
    if (allLines.length < 500) return; // nothing worth compacting

    // Pass 1: find the owning session of every line.
    final sessionOfLine = List<String?>.filled(allLines.length, null);
    final linesBySession = <String, List<int>>{};
    for (var i = 0; i < allLines.length; i++) {
      final line = allLines[i].trim();
      if (line.isEmpty) continue;
      try {
        final decoded = jsonDecode(line);
        final sid = decoded is Map ? decoded['sessionId'] : null;
        if (sid is String) {
          sessionOfLine[i] = sid;
          linesBySession.putIfAbsent(sid, () => []).add(i);
        }
      } on FormatException {
        // Not valid JSON: the line belongs to no session.
      }
    }

    // Pass 2: select the last `keep` lines of each session.
    final keep = keepPerSession < 0 ? 0 : keepPerSession;
    final keepLines = <int>{};
    for (final lines in linesBySession.values) {
      final start = lines.length > keep ? lines.length - keep : 0;
      keepLines.addAll(lines.skip(start));
    }

    if (keepLines.length == allLines.length) return; // nothing removed

    // Pass 3: emit the kept lines in their original order and renumber.
    final newIndex = <String, List<int>>{};
    final buffer = StringBuffer();
    var newLine = 0;
    for (var i = 0; i < allLines.length; i++) {
      final sid = sessionOfLine[i];
      if (sid == null || !keepLines.contains(i)) continue;
      buffer.writeln(allLines[i].trim());
      newIndex.putIfAbsent(sid, () => []).add(newLine);
      newLine++;
    }

    // Write next to the log, then swap it in with a rename. There is no
    // await between reading the log above and this point.
    final tmpFile = File('${logFile.path}.tmp');
    tmpFile.writeAsStringSync(buffer.toString(), flush: true);
    tmpFile.renameSync(logFile.path);

    _index
      ..clear()
      ..addAll(newIndex);
    _lineCount = newLine;
    _flushIndex(dir);

    TraceService.instance.addTrace(
        'MsgStoreV2 COMPACT', '${allLines.length} → $newLine lines');
  } catch (e) {
    TraceService.instance.addTrace('MsgStoreV2 COMPACT ERROR', '$e');
  }
}
170264
171
- /// Deserialize a message from JSON, applying migration rules:
172
- /// - Voice messages without audioUri are downgraded to text (transcript only).
173
- /// This handles messages saved before a restart, where the temp audio file
174
- /// is no longer available. The transcript (content) is preserved.
265
+ // --------------------------------------------------------- index flush --
266
+
267
/// Serializes the in-memory index to `index.json` inside [dir].
///
/// Best-effort: a failed write is traced and otherwise ignored, leaving
/// whatever index file was on disk before.
static void _flushIndex(Directory dir) {
  try {
    // The index is already a map of JSON-encodable values, so it can be
    // encoded directly without copying.
    File(_indexPath(dir)).writeAsStringSync(jsonEncode(_index));
  } catch (e) {
    TraceService.instance.addTrace('MsgStoreV2 INDEX FLUSH ERROR', '$e');
  }
}
275
+
276
/// Writes the in-memory index to disk immediately, regardless of how many
/// appends have happened since the last flush.
///
/// Intended for moments such as app suspend or a session switch. Does
/// nothing when the base directory has not been resolved yet.
static void flushIndex() {
  final dir = _baseDir;
  if (dir == null) return;
  _flushIndex(dir);
}
280
+
281
+ // ------------------------------------------------- migration helper --
282
+
283
+ /// Deserialize a message, downgrading voice→text if audio is unavailable.
175284 static Message _messageFromJson(Map<String, dynamic> json) {
176285 final raw = Message.fromJson(json);
177286 if (raw.type == MessageType.voice &&
178287 (raw.audioUri == null || raw.audioUri!.isEmpty)) {
179
- // Downgrade to text so the bubble shows the transcript instead of a
180
- // broken play button.
181288 return Message(
182289 id: raw.id,
183290 role: raw.role,
....@@ -191,25 +298,18 @@
191298 return raw;
192299 }
193300
194
- /// Delete stored messages for a session.
195
- static Future<void> delete(String sessionId) async {
196
- try {
197
- final dir = await _getBaseDir();
198
- final file = File('${dir.path}/${_fileForSession(sessionId)}');
199
- if (await file.exists()) {
200
- await file.delete();
201
- }
202
- } catch (_) {}
203
- }
301
+ // --------------------------------------------------------- clear all --
204302
205
- /// Clear all stored messages.
303
/// Deletes every stored message and resets the in-memory state (used from
/// settings / debug).
///
/// The contents of the storage directory are removed, but the directory
/// itself is kept, so there is no moment at which it is missing.
/// NOTE(review): keeping the directory presumably also preserves the iOS
/// backup-exclusion flag, which `_getBaseDir` applies only on its first
/// call — confirm against the native side.
///
/// In-memory state is reset only after the deletion succeeded. Failures are
/// traced, never thrown.
static Future<void> clearAll() async {
  try {
    final dir = await _getBaseDir();
    if (dir.existsSync()) {
      for (final entity in dir.listSync()) {
        entity.deleteSync(recursive: true);
      }
    }
    _index.clear();
    _lineCount = 0;
    // Nothing is pending any more, so restart the flush countdown.
    _appendsSinceFlush = 0;
  } catch (e) {
    TraceService.instance.addTrace('MsgStoreV2 CLEAR ERROR', '$e');
  }
}
215315 }