Matthias Nott
9 days ago 489419f3b133c4725d1ffdf5fb320898a55e9544
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
import 'dart:convert';
import 'dart:io';
import 'package:flutter/services.dart';
import 'package:path_provider/path_provider.dart';
import '../models/message.dart';
import 'trace_service.dart';
/// Append-only log-based message persistence.
///
/// Layout:
///   messages/log.jsonl   — one JSON object per line, each a serialized Message
///   messages/index.json  — { "sessionId": [lineNumber, ...] }
///
/// All writes are synchronous (writeAsStringSync with FileMode.append) to
/// prevent race conditions between concurrent addMessage / switchSession calls.
class MessageStoreV2 {
  MessageStoreV2._();
  static const _backupChannel = MethodChannel('com.mnsoft.pailot/backup');
  // In-memory index: sessionId -> list of 0-based line numbers in log.jsonl
  static final Map<String, List<int>> _index = {};
  // Number of lines currently in the log (= next line number to write)
  static int _lineCount = 0;
  // Flush the index to disk every N appends to amortise I/O
  static const _indexFlushInterval = 20;
  static int _appendsSinceFlush = 0;
  static Directory? _baseDir;
  // ------------------------------------------------------------------ init --
  static Future<Directory> _getBaseDir() async {
    if (_baseDir != null) return _baseDir!;
    final appDir = await getApplicationDocumentsDirectory();
    _baseDir = Directory('${appDir.path}/messages');
    if (!_baseDir!.existsSync()) {
      _baseDir!.createSync(recursive: true);
    }
    // Exclude from iCloud / iTunes backup (best-effort, iOS only).
    if (Platform.isIOS) {
      try {
        await _backupChannel.invokeMethod<void>(
          'excludeFromBackup',
          _baseDir!.path,
        );
      } catch (_) {}
    }
    return _baseDir!;
  }
  static String _logPath(Directory dir) => '${dir.path}/log.jsonl';
  static String _indexPath(Directory dir) => '${dir.path}/index.json';
  /// Called once at app startup. Reads log.jsonl and rebuilds the in-memory
  /// index. Then calls compact() to trim old messages.
  static Future<void> initialize() async {
    try {
      final dir = await _getBaseDir();
      final logFile = File(_logPath(dir));
      final indexFile = File(_indexPath(dir));
      // Always rebuild index from log (the saved index.json may be stale
      // if the app was killed before a flush).
      if (logFile.existsSync()) {
        final content = logFile.readAsStringSync();
        _lineCount = content.isEmpty
            ? 0
            : content.trimRight().split('\n').length;
        if (_lineCount > 0) {
          await _rebuildIndex(logFile);
        }
      } else {
        _lineCount = 0;
      }
      TraceService.instance.addTrace(
          'MsgStoreV2 INIT', '$_lineCount lines, ${_index.length} sessions');
      // Compact on startup (keeps last 200 per session).
      await compact();
    } catch (e) {
      TraceService.instance.addTrace('MsgStoreV2 INIT ERROR', '$e');
    }
  }
  static Future<void> _rebuildIndex(File logFile) async {
    _index.clear();
    final lines = logFile.readAsLinesSync();
    for (var i = 0; i < lines.length; i++) {
      final line = lines[i].trim();
      if (line.isEmpty) continue;
      try {
        final map = jsonDecode(line) as Map<String, dynamic>;
        final sessionId = map['sessionId'] as String?;
        if (sessionId != null) {
          _index.putIfAbsent(sessionId, () => []).add(i);
        }
      } catch (_) {}
    }
  }
  // --------------------------------------------------------------- append --
  /// Append a message to the log. SYNCHRONOUS — no async gap, no race.
  ///
  /// Each line written includes a 'sessionId' field so the index can be
  /// rebuilt from the log alone if needed.
  static void append(String sessionId, Message message) {
    try {
      final dir = _baseDir;
      if (dir == null) {
        // initialize() hasn't been called yet — silently drop (shouldn't happen).
        TraceService.instance
            .addTrace('MsgStoreV2 APPEND WARN', 'baseDir null, dropping');
        return;
      }
      final logFile = File(_logPath(dir));
      final json = message.toJsonLight();
      json['sessionId'] = sessionId;
      final line = '${jsonEncode(json)}\n';
      // Synchronous append — atomic single write, no read-modify-write.
      logFile.writeAsStringSync(line, mode: FileMode.append);
      // Update in-memory index.
      final lineNum = _lineCount;
      _index.putIfAbsent(sessionId, () => []).add(lineNum);
      _lineCount++;
      TraceService.instance.addTrace('MsgStoreV2 APPEND',
          '${sessionId.substring(0, 8)} line=$lineNum total=$_lineCount idx=${_index[sessionId]?.length ?? 0}');
      // Flush index after every append to prevent data loss on app kill.
      _flushIndex(dir);
    } catch (e) {
      TraceService.instance.addTrace('MsgStoreV2 APPEND ERROR', '$e');
    }
  }
  // -------------------------------------------------------------- load --
  /// Load messages for a session. SYNCHRONOUS — reads from the log using the
  /// in-memory index. Safe to call from switchSession without async gaps.
  static List<Message> loadSession(String sessionId) {
    try {
      final dir = _baseDir;
      if (dir == null) return [];
      final logFile = File(_logPath(dir));
      if (!logFile.existsSync()) return [];
      final lineNumbers = _index[sessionId];
      if (lineNumbers == null || lineNumbers.isEmpty) return [];
      // Read all lines at once then pick the ones we need.
      final allLines = logFile.readAsLinesSync();
      TraceService.instance.addTrace('MsgStoreV2 LOAD detail',
          '${sessionId.substring(0, 8)}: fileLines=${allLines.length} indexEntries=${lineNumbers.length} lineCount=$_lineCount');
      final messages = <Message>[];
      for (final n in lineNumbers) {
        if (n >= allLines.length) continue;
        final line = allLines[n].trim();
        if (line.isEmpty) continue;
        try {
          final map = jsonDecode(line) as Map<String, dynamic>;
          // Remove synthetic sessionId field before deserialising.
          map.remove('sessionId');
          final msg = _messageFromJson(map);
          if (!msg.isEmptyVoice && !msg.isEmptyText) {
            messages.add(msg);
          }
        } catch (_) {}
      }
      TraceService.instance.addTrace(
          'MsgStoreV2 LOAD', '${sessionId.substring(0, 8)}: ${messages.length} msgs');
      return messages;
    } catch (e) {
      TraceService.instance
          .addTrace('MsgStoreV2 LOAD ERROR', '${sessionId.substring(0, 8)}: $e');
      return [];
    }
  }
  // ------------------------------------------------------------- compact --
  /// Rewrite the log keeping at most [keepPerSession] messages per session.
  /// Called once on startup after initialize(). NOT called during normal use.
  static Future<void> compact({int keepPerSession = 200}) async {
    try {
      final dir = await _getBaseDir();
      final logFile = File(_logPath(dir));
      if (!logFile.existsSync()) return;
      final allLines = logFile.readAsLinesSync();
      if (allLines.length < 500) return; // nothing worth compacting
      // Build a set of line numbers to keep: last keepPerSession per session.
      final keepLines = <int>{};
      for (final entry in _index.entries) {
        final lines = entry.value;
        final start = lines.length > keepPerSession
            ? lines.length - keepPerSession
            : 0;
        for (var i = start; i < lines.length; i++) {
          keepLines.add(lines[i]);
        }
      }
      if (keepLines.length == allLines.length) return; // nothing removed
      // Rewrite the log with only the kept lines, rebuilding the index.
      final newIndex = <String, List<int>>{};
      final buffer = StringBuffer();
      var newLine = 0;
      for (var i = 0; i < allLines.length; i++) {
        if (!keepLines.contains(i)) continue;
        final line = allLines[i].trim();
        if (line.isEmpty) continue;
        buffer.write('$line\n');
        // Extract sessionId for new index.
        try {
          final map = jsonDecode(line) as Map<String, dynamic>;
          final sid = map['sessionId'] as String?;
          if (sid != null) {
            newIndex.putIfAbsent(sid, () => []).add(newLine);
          }
        } catch (_) {}
        newLine++;
      }
      logFile.writeAsStringSync(buffer.toString());
      _index
        ..clear()
        ..addAll(newIndex);
      _lineCount = newLine;
      _flushIndex(dir);
      TraceService.instance.addTrace(
          'MsgStoreV2 COMPACT', '${allLines.length} → $newLine lines');
    } catch (e) {
      TraceService.instance.addTrace('MsgStoreV2 COMPACT ERROR', '$e');
    }
  }
  // --------------------------------------------------------- index flush --
  static void _flushIndex(Directory dir) {
    try {
      final indexMap = _index.map(
          (k, v) => MapEntry(k, v));
      File(_indexPath(dir))
          .writeAsStringSync(jsonEncode(indexMap));
    } catch (_) {}
  }
  /// Force-flush the index to disk (call on app suspend / session switch).
  static void flushIndex() {
    if (_baseDir != null) _flushIndex(_baseDir!);
  }
  // ------------------------------------------------- migration helper --
  /// Deserialize a message, downgrading voice→text if audio is unavailable.
  static Message _messageFromJson(Map<String, dynamic> json) {
    final raw = Message.fromJson(json);
    if (raw.type == MessageType.voice &&
        (raw.audioUri == null || raw.audioUri!.isEmpty)) {
      return Message(
        id: raw.id,
        role: raw.role,
        type: MessageType.text,
        content: raw.content,
        timestamp: raw.timestamp,
        status: raw.status,
        duration: raw.duration,
      );
    }
    return raw;
  }
  // --------------------------------------------------------- clear all --
  /// Wipe everything (used from settings / debug).
  static Future<void> clearAll() async {
    try {
      final dir = await _getBaseDir();
      if (dir.existsSync()) {
        dir.deleteSync(recursive: true);
        dir.createSync(recursive: true);
      }
      _index.clear();
      _lineCount = 0;
    } catch (_) {}
  }
}