Matthias Nott
8 days ago 1bf6e76e31383aef77e42943fc2caf350cf7e096
fix: resume message reload, direct session writes, MQTT trace pipe to server
5 files modified
changed files
lib/providers/providers.dart patch | view | blame | history
lib/screens/chat_screen.dart patch | view | blame | history
lib/services/message_store.dart patch | view | blame | history
lib/services/mqtt_service.dart patch | view | blame | history
lib/services/trace_service.dart patch | view | blame | history
lib/providers/providers.dart
....@@ -99,10 +99,9 @@
99 99
100 100 /// Switch to a new session and load its messages.
101 101 Future<void> switchSession(String sessionId) async {
102
- // Force-flush current session to disk before switching
102
+ // Write current session DIRECTLY to disk (no debounce — prevents data loss)
103 103 if (_currentSessionId != null && state.isNotEmpty) {
104
- MessageStore.save(_currentSessionId!, state);
105
- await MessageStore.flush();
104
+ await MessageStore.writeDirect(_currentSessionId!, state);
106 105 }
107 106
108 107 _currentSessionId = sessionId;
lib/screens/chat_screen.dart
....@@ -65,6 +65,7 @@
65 65 String? _playingMessageId;
66 66 int _lastSeq = 0;
67 67 bool _isCatchingUp = false;
68
+ bool _catchUpReceived = false;
68 69 bool _screenshotForChat = false;
69 70 // FIFO dedup queue: O(1) eviction by removing from front when over cap.
70 71 final List<int> _seenSeqsList = [];
....@@ -222,28 +223,20 @@
222 223 // Re-register APNs token after reconnect so daemon always has a fresh token
223 224 _push?.onMqttConnected();
224 225 };
225
- _ws!.onResume = () {
226
- // App came back from background — connection may or may not be alive.
227
- // Try catch_up first, but if no response comes, force reconnect.
228
- _chatLog('onResume: sending catch_up with lastSeq=$_lastSeq');
229
- final seqBefore = _lastSeq;
226
+ _ws!.onResume = () async {
227
+ // App came back from background — reload messages and catch up.
228
+ _chatLog('onResume: reloading messages and sending catch_up');
230 229 _sendCommand('catch_up', {'lastSeq': _lastSeq});
231
- // Force UI rebuild for any buffered messages
232
- Future.delayed(const Duration(milliseconds: 300), () {
233
- if (mounted) {
234
- setState(() {});
235
- _scrollToBottom();
236
- }
237
- });
238
- // If catch_up didn't produce a response in 2s, connection is dead — reconnect
239
- Future.delayed(const Duration(seconds: 2), () {
240
- if (!mounted) return;
241
- if (_lastSeq == seqBefore) {
242
- // No new messages arrived — connection likely dead
243
- _chatLog('onResume: no catch_up response after 2s, forcing reconnect');
244
- _ws?.forceReconnect();
245
- }
246
- });
230
+ // Force reload current session messages from provider (triggers rebuild)
231
+ final activeId = ref.read(activeSessionIdProvider);
232
+ if (activeId != null) {
233
+ // Re-save then re-load to ensure UI matches persisted state
234
+ await ref.read(messagesProvider.notifier).switchSession(activeId);
235
+ }
236
+ if (mounted) {
237
+ setState(() {});
238
+ _scrollToBottom();
239
+ }
247 240 };
248 241 _ws!.onError = (error) {
249 242 debugPrint('MQTT error: $error');
....@@ -259,6 +252,9 @@
259 252 );
260 253
261 254 await _ws!.connect();
255
+
256
+ // Attach MQTT to trace service for auto-publishing logs to server
257
+ TraceService.instance.attachMqtt(_ws!);
262 258
263 259 // Initialize push notifications after MQTT is set up so token can be
264 260 // sent immediately if already connected.
....@@ -371,6 +367,7 @@
371 367 final sessionId = msg['sessionId'] as String?;
372 368 if (sessionId != null) _incrementUnread(sessionId);
373 369 case 'catch_up':
370
+ _catchUpReceived = true;
374 371 final serverSeq = msg['serverSeq'] as int?;
375 372 if (serverSeq != null) {
376 373 // Always sync to server's seq — if server restarted, its seq may be lower
lib/services/message_store.dart
....@@ -57,6 +57,12 @@
57 57 _debounceTimer = Timer(const Duration(seconds: 1), _flushAll);
58 58 }
59 59
60
+ /// Write directly to disk, bypassing debounce. For critical saves.
61
+ static Future<void> writeDirect(String sessionId, List<Message> messages) async {
62
+ _pendingSaves.remove(sessionId);
63
+ await _writeSession(sessionId, messages);
64
+ }
65
+
60 66 /// Immediately flush all pending saves.
61 67 static Future<void> flush() async {
62 68 _debounceTimer?.cancel();
lib/services/mqtt_service.dart
....@@ -11,6 +11,7 @@
11 11 import 'package:path_provider/path_provider.dart' as pp;
12 12 import 'package:mqtt_client/mqtt_client.dart';
13 13 import 'package:mqtt_client/mqtt_server_client.dart';
14
+import 'package:typed_data/typed_data.dart';
14 15 import 'package:shared_preferences/shared_preferences.dart';
15 16 import 'package:uuid/uuid.dart';
16 17
....@@ -665,6 +666,15 @@
665 666 /// Current timestamp in milliseconds.
666 667 int _now() => DateTime.now().millisecondsSinceEpoch;
667 668
669
+ /// Publish raw bytes to a topic. Used by TraceService for log streaming.
670
+ void publishRaw(String topic, Uint8Buffer payload, MqttQos qos) {
671
+ final client = _client;
672
+ if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) return;
673
+ try {
674
+ client.publishMessage(topic, qos, payload);
675
+ } catch (_) {}
676
+ }
677
+
668 678 /// Publish a JSON payload to an MQTT topic.
669 679 void _publish(String topic, Map<String, dynamic> payload, MqttQos qos) {
670 680 final client = _client;
lib/services/trace_service.dart
....@@ -1,4 +1,10 @@
1
+import 'dart:convert';
2
+
1 3 import 'package:flutter/foundation.dart';
4
+import 'package:mqtt_client/mqtt_client.dart';
5
+import 'package:mqtt_client/mqtt_server_client.dart';
6
+
7
+import 'mqtt_service.dart';
2 8
3 9 /// A single trace entry capturing a message-handling event.
4 10 class TraceEntry {
....@@ -22,17 +28,28 @@
22 28 /// Captures message-handling events from MQTT, chat screen, and other
23 29 /// components. The buffer is capped at [maxEntries] (default 200).
24 30 /// Works in both debug and release builds.
31
+///
32
+/// When an MqttService is attached via [attachMqtt], trace entries are
33
+/// automatically published to the server on `pailot/control/in` so they
34
+/// can be read from the daemon log.
25 35 class TraceService {
26 36 TraceService._();
27 37 static final TraceService instance = TraceService._();
28 38
29 39 static const int maxEntries = 200;
30 40 final List<TraceEntry> _entries = [];
41
+ MqttService? _mqtt;
42
+
43
+ /// Attach an MQTT service for auto-publishing traces to the server.
44
+ void attachMqtt(MqttService mqtt) {
45
+ _mqtt = mqtt;
46
+ }
31 47
32 48 /// All entries, oldest first.
33 49 List<TraceEntry> get entries => List.unmodifiable(_entries);
34 50
35 51 /// Add a trace entry. Oldest entry is evicted once the buffer is full.
52
+ /// If MQTT is attached and connected, the entry is also published to the server.
36 53 void addTrace(String event, String details) {
37 54 _entries.add(TraceEntry(
38 55 timestamp: DateTime.now(),
....@@ -43,6 +60,28 @@
43 60 _entries.removeAt(0);
44 61 }
45 62 debugPrint('[TRACE] $event — $details');
63
+
64
+ // Auto-publish to server if MQTT is connected
65
+ _publishTrace(event, details);
66
+ }
67
+
68
+ void _publishTrace(String event, String details) {
69
+ final mqtt = _mqtt;
70
+ if (mqtt == null || !mqtt.isConnected) return;
71
+ try {
72
+ final payload = jsonEncode({
73
+ 'type': 'command',
74
+ 'command': 'app_trace',
75
+ 'event': event,
76
+ 'details': details,
77
+ 'ts': DateTime.now().millisecondsSinceEpoch,
78
+ });
79
+ final builder = MqttClientPayloadBuilder();
80
+ builder.addString(payload);
81
+ mqtt.publishRaw('pailot/control/in', builder.payload!, MqttQos.atMostOnce);
82
+ } catch (_) {
83
+ // Non-fatal — don't let trace logging break the app
84
+ }
46 85 }
47 86
48 87 /// Clear all entries.