import 'dart:convert';
import 'dart:io';

import 'package:flutter/services.dart';
import 'package:path_provider/path_provider.dart';

import '../models/message.dart';
import 'trace_service.dart';

/// Append-only, log-based message persistence.
///
/// Layout (inside the application documents directory):
///
///     messages/log.jsonl   one JSON object per line: a serialized Message
///                          plus a synthetic `sessionId` field
///     messages/index.json  { "<sessionId>": [lineNumber, ...] }
///
/// [append] and [loadSession] are fully synchronous (no `await` between
/// reading and updating the shared state), so they cannot interleave with
/// each other inside one isolate.
///
/// The on-disk index is only a cache: it is flushed every
/// [_indexFlushInterval] appends and is rebuilt from the log by
/// [initialize] whenever it does not match the log.
class MessageStoreV2 {
  MessageStoreV2._();

  static const _backupChannel = MethodChannel('com.mnsoft.pailot/backup');

  /// Flush the index to disk every N appends to amortise I/O.
  static const _indexFlushInterval = 20;

  /// Logs with fewer lines than this are never compacted.
  static const _compactMinLines = 500;

  /// In-memory index: sessionId -> 0-based line numbers in log.jsonl.
  static final Map<String, List<int>> _index = {};

  /// Number of lines currently in the log (= next line number to write).
  static int _lineCount = 0;

  /// Appends performed since the index was last written to disk.
  static int _appendsSinceFlush = 0;

  /// Cached `messages/` directory; null until [_getBaseDir] has run.
  static Directory? _baseDir;

  // ------------------------------------------------------------------ init --

  /// Returns the `messages/` directory, creating it on first use.
  ///
  /// On iOS the directory path is additionally passed to the
  /// `excludeFromBackup` method of the backup channel; a failure of that call
  /// is deliberately ignored (best-effort).
  static Future<Directory> _getBaseDir() async {
    final cached = _baseDir;
    if (cached != null) return cached;

    final appDir = await getApplicationDocumentsDirectory();
    final dir = Directory('${appDir.path}/messages');
    if (!dir.existsSync()) {
      dir.createSync(recursive: true);
    }
    _baseDir = dir;

    // Exclude from iCloud / iTunes backup (best-effort, iOS only).
    if (Platform.isIOS) {
      try {
        await _backupChannel.invokeMethod<void>('excludeFromBackup', dir.path);
      } catch (_) {
        // Best-effort: the store works without the exclusion.
      }
    }
    return dir;
  }

  static String _logPath(Directory dir) => '${dir.path}/log.jsonl';

  static String _indexPath(Directory dir) => '${dir.path}/index.json';

  /// Returns at most the first eight characters of [sessionId] for traces.
  ///
  /// Unlike a bare `substring(0, 8)` this never throws for short ids.
  static String _shortId(String sessionId) =>
      sessionId.length <= 8 ? sessionId : sessionId.substring(0, 8);

  /// Loads the saved index, validates it against the log, then compacts.
  ///
  /// Call once at app startup. The saved index is used only when it is
  /// consistent with the log (see [_indexMatchesLog]); otherwise the index is
  /// rebuilt from the log. Errors are traced, never thrown.
  static Future<void> initialize() async {
    try {
      final dir = await _getBaseDir();
      final logFile = File(_logPath(dir));
      final indexFile = File(_indexPath(dir));

      // Try loading the saved index first (fast path).
      _index.clear();
      if (indexFile.existsSync()) {
        try {
          final decoded =
              jsonDecode(indexFile.readAsStringSync()) as Map<String, dynamic>;
          for (final entry in decoded.entries) {
            _index[entry.key] = List<int>.from(entry.value as List<dynamic>);
          }
        } catch (_) {
          // Corrupt index: discard it, it is rebuilt from the log below.
          _index.clear();
        }
      }

      // Count lines the same way loadSession()/_rebuildIndex() number them.
      if (logFile.existsSync()) {
        final content = logFile.readAsStringSync();
        _lineCount = const LineSplitter().convert(content).length;
        // A torn final write leaves no trailing newline; terminate it so the
        // next append starts on its own line instead of merging two records.
        if (content.isNotEmpty && !content.endsWith('\n')) {
          logFile.writeAsStringSync('\n', mode: FileMode.append);
        }
      } else {
        _lineCount = 0;
      }

      // The index is flushed only every few appends, so after a crash it can
      // lag behind the log. Rebuild whenever it is missing, corrupt or stale;
      // otherwise compact() would drop the unindexed lines.
      if (!_indexMatchesLog()) {
        _rebuildIndex(logFile);
      }

      TraceService.instance.addTrace(
          'MsgStoreV2 INIT', '$_lineCount lines, ${_index.length} sessions');

      // Compact on startup (keeps last 200 per session).
      await compact();
    } catch (e) {
      TraceService.instance.addTrace('MsgStoreV2 INIT ERROR', '$e');
    }
  }

  /// Whether the in-memory index is consistent with [_lineCount].
  ///
  /// Every indexed line number must lie inside the log, and the highest one
  /// must be the last line: [append] always indexes the line it writes, so an
  /// index that stops earlier is stale.
  static bool _indexMatchesLog() {
    var highest = -1;
    for (final lineNumbers in _index.values) {
      for (final n in lineNumbers) {
        if (n < 0 || n >= _lineCount) return false;
        if (n > highest) highest = n;
      }
    }
    return highest == _lineCount - 1;
  }

  /// Rebuilds [_index] from the `sessionId` field of every log line.
  ///
  /// Blank lines, undecodable lines and lines without a `sessionId` are
  /// skipped but still count towards line numbering.
  static void _rebuildIndex(File logFile) {
    _index.clear();
    if (!logFile.existsSync()) return;

    final lines = logFile.readAsLinesSync();
    for (var i = 0; i < lines.length; i++) {
      final line = lines[i].trim();
      if (line.isEmpty) continue;
      try {
        final map = jsonDecode(line) as Map<String, dynamic>;
        final sessionId = map['sessionId'] as String?;
        if (sessionId != null) {
          _index.putIfAbsent(sessionId, () => []).add(i);
        }
      } catch (_) {
        // Undecodable line: leave it unindexed.
      }
    }
  }

  // --------------------------------------------------------------- append --

  /// Appends [message] to the log under [sessionId]. SYNCHRONOUS.
  ///
  /// Each line written includes a `sessionId` field so the index can be
  /// rebuilt from the log alone. The message is dropped (with a trace) when
  /// the store has no base directory yet. Errors are traced, never thrown.
  static void append(String sessionId, Message message) {
    try {
      final dir = _baseDir;
      if (dir == null) {
        // initialize() hasn't been called yet — drop (shouldn't happen).
        TraceService.instance
            .addTrace('MsgStoreV2 APPEND WARN', 'baseDir null, dropping');
        return;
      }

      final json = message.toJsonLight();
      json['sessionId'] = sessionId;

      // Single synchronous append — no read-modify-write of the log.
      File(_logPath(dir))
          .writeAsStringSync('${jsonEncode(json)}\n', mode: FileMode.append);

      // Update the in-memory index only after the write succeeded.
      _index.putIfAbsent(sessionId, () => []).add(_lineCount);
      _lineCount++;

      // Periodically flush the index to disk.
      _appendsSinceFlush++;
      if (_appendsSinceFlush >= _indexFlushInterval) {
        _flushIndex(dir);
      }
    } catch (e) {
      TraceService.instance.addTrace('MsgStoreV2 APPEND ERROR', '$e');
    }
  }

  // ----------------------------------------------------------------- load --

  /// Loads the messages of [sessionId] using the in-memory index.
  ///
  /// SYNCHRONOUS. Returns an empty list when the store is uninitialised, the
  /// log is missing, the session is unknown, or reading fails. Lines that
  /// cannot be decoded and messages reporting `isEmptyVoice` or `isEmptyText`
  /// are skipped.
  static List<Message> loadSession(String sessionId) {
    try {
      final dir = _baseDir;
      if (dir == null) return [];

      final logFile = File(_logPath(dir));
      if (!logFile.existsSync()) return [];

      final lineNumbers = _index[sessionId];
      if (lineNumbers == null || lineNumbers.isEmpty) return [];

      // Read all lines at once, then pick the ones we need.
      final allLines = logFile.readAsLinesSync();
      final messages = <Message>[];

      for (final n in lineNumbers) {
        if (n < 0 || n >= allLines.length) continue;
        final line = allLines[n].trim();
        if (line.isEmpty) continue;
        try {
          final map = jsonDecode(line) as Map<String, dynamic>;
          // Remove the synthetic sessionId field before deserialising.
          map.remove('sessionId');
          final msg = _messageFromJson(map);
          if (!msg.isEmptyVoice && !msg.isEmptyText) {
            messages.add(msg);
          }
        } catch (_) {
          // Skip lines that cannot be decoded into a Message.
        }
      }

      TraceService.instance.addTrace(
          'MsgStoreV2 LOAD', '${_shortId(sessionId)}: ${messages.length} msgs');
      return messages;
    } catch (e) {
      TraceService.instance
          .addTrace('MsgStoreV2 LOAD ERROR', '${_shortId(sessionId)}: $e');
      return [];
    }
  }

  // -------------------------------------------------------------- compact --

  /// Rewrites the log keeping at most [keepPerSession] messages per session.
  ///
  /// Called by [initialize]. Does nothing when the log has fewer than
  /// [_compactMinLines] lines or when no line would be removed. Lines that
  /// are not referenced by the index are dropped. The new log is written to a
  /// temporary file and renamed over the old one, so an interrupted
  /// compaction does not leave a truncated log behind. Errors are traced,
  /// never thrown.
  static Future<void> compact({int keepPerSession = 200}) async {
    try {
      final dir = await _getBaseDir();
      final logFile = File(_logPath(dir));
      if (!logFile.existsSync()) return;

      final allLines = logFile.readAsLinesSync();
      if (allLines.length < _compactMinLines) return; // not worth compacting

      // Line numbers to keep: the last keepPerSession of every session.
      final keepLines = <int>{};
      for (final lines in _index.values) {
        final start =
            lines.length > keepPerSession ? lines.length - keepPerSession : 0;
        for (var i = start; i < lines.length; i++) {
          keepLines.add(lines[i]);
        }
      }

      if (keepLines.length == allLines.length) return; // nothing removed

      // Rewrite the log with only the kept lines, rebuilding the index.
      final newIndex = <String, List<int>>{};
      final buffer = StringBuffer();
      var newLine = 0;

      for (var i = 0; i < allLines.length; i++) {
        if (!keepLines.contains(i)) continue;
        final line = allLines[i].trim();
        if (line.isEmpty) continue;
        buffer.write('$line\n');
        // Extract sessionId for the new index.
        try {
          final map = jsonDecode(line) as Map<String, dynamic>;
          final sid = map['sessionId'] as String?;
          if (sid != null) {
            newIndex.putIfAbsent(sid, () => []).add(newLine);
          }
        } catch (_) {
          // Kept but unindexed, exactly like an undecodable line on rebuild.
        }
        newLine++;
      }

      // Write-then-rename: the live log is replaced only once the new
      // contents are completely on disk.
      final tmpFile = File('${logFile.path}.tmp')
        ..writeAsStringSync(buffer.toString(), flush: true);
      tmpFile.renameSync(logFile.path);

      _index
        ..clear()
        ..addAll(newIndex);
      _lineCount = newLine;
      _flushIndex(dir);

      TraceService.instance.addTrace(
          'MsgStoreV2 COMPACT', '${allLines.length} → $newLine lines');
    } catch (e) {
      TraceService.instance.addTrace('MsgStoreV2 COMPACT ERROR', '$e');
    }
  }

  // ---------------------------------------------------------- index flush --

  /// Writes the in-memory index to `index.json` and resets the append counter.
  ///
  /// Failures are traced but not thrown: the index can always be rebuilt
  /// from the log.
  static void _flushIndex(Directory dir) {
    _appendsSinceFlush = 0;
    try {
      File(_indexPath(dir)).writeAsStringSync(jsonEncode(_index));
    } catch (e) {
      TraceService.instance.addTrace('MsgStoreV2 FLUSH ERROR', '$e');
    }
  }

  /// Force-flushes the index to disk (call on app suspend / session switch).
  ///
  /// Does nothing when the store has no base directory yet.
  static void flushIndex() {
    final dir = _baseDir;
    if (dir != null) _flushIndex(dir);
  }

  // ----------------------------------------------------- migration helper --

  /// Deserializes a message, downgrading voice→text if audio is unavailable.
  ///
  /// A voice message whose `audioUri` is null or empty is returned as a text
  /// message carrying the same id, role, content, timestamp, status and
  /// duration.
  static Message _messageFromJson(Map<String, dynamic> json) {
    final raw = Message.fromJson(json);
    final hasNoAudio = raw.audioUri?.isEmpty ?? true;
    if (raw.type == MessageType.voice && hasNoAudio) {
      return Message(
        id: raw.id,
        role: raw.role,
        type: MessageType.text,
        content: raw.content,
        timestamp: raw.timestamp,
        status: raw.status,
        duration: raw.duration,
      );
    }
    return raw;
  }

  // ------------------------------------------------------------ clear all --

  /// Wipes the log, the index file and all in-memory state.
  ///
  /// Used from settings / debug. Errors are traced, never thrown.
  static Future<void> clearAll() async {
    try {
      final dir = await _getBaseDir();
      if (dir.existsSync()) {
        dir.deleteSync(recursive: true);
        dir.createSync(recursive: true);
      }
      _index.clear();
      _lineCount = 0;
      _appendsSinceFlush = 0;
    } catch (e) {
      TraceService.instance.addTrace('MsgStoreV2 CLEAR ERROR', '$e');
    }
  }
}