From 1bf6e76e31383aef77e42943fc2caf350cf7e096 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Mon, 06 Apr 2026 13:35:05 +0200
Subject: [PATCH] fix: resume message reload, direct session writes, MQTT trace pipe to server
---
lib/services/message_store.dart | 6 +++
lib/providers/providers.dart | 5 +-
lib/services/trace_service.dart | 39 +++++++++++++++++++
lib/services/mqtt_service.dart | 10 +++++
lib/screens/chat_screen.dart | 39 +++++++++----------
5 files changed, 75 insertions(+), 24 deletions(-)
diff --git a/lib/providers/providers.dart b/lib/providers/providers.dart
index c93f40e..63cf5a4 100644
--- a/lib/providers/providers.dart
+++ b/lib/providers/providers.dart
@@ -99,10 +99,9 @@
/// Switch to a new session and load its messages.
Future<void> switchSession(String sessionId) async {
- // Force-flush current session to disk before switching
+ // Write current session DIRECTLY to disk (no debounce — prevents data loss)
if (_currentSessionId != null && state.isNotEmpty) {
- MessageStore.save(_currentSessionId!, state);
- await MessageStore.flush();
+ await MessageStore.writeDirect(_currentSessionId!, state);
}
_currentSessionId = sessionId;
diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index 22efed5..e470ce7 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -65,6 +65,7 @@
String? _playingMessageId;
int _lastSeq = 0;
bool _isCatchingUp = false;
+ bool _catchUpReceived = false;
bool _screenshotForChat = false;
// FIFO dedup queue: O(1) eviction by removing from front when over cap.
final List<int> _seenSeqsList = [];
@@ -222,28 +223,20 @@
// Re-register APNs token after reconnect so daemon always has a fresh token
_push?.onMqttConnected();
};
- _ws!.onResume = () {
- // App came back from background — connection may or may not be alive.
- // Try catch_up first, but if no response comes, force reconnect.
- _chatLog('onResume: sending catch_up with lastSeq=$_lastSeq');
- final seqBefore = _lastSeq;
+ _ws!.onResume = () async {
+ // App came back from background — reload messages and catch up.
+ _chatLog('onResume: reloading messages and sending catch_up');
_sendCommand('catch_up', {'lastSeq': _lastSeq});
- // Force UI rebuild for any buffered messages
- Future.delayed(const Duration(milliseconds: 300), () {
- if (mounted) {
- setState(() {});
- _scrollToBottom();
- }
- });
- // If catch_up didn't produce a response in 2s, connection is dead — reconnect
- Future.delayed(const Duration(seconds: 2), () {
- if (!mounted) return;
- if (_lastSeq == seqBefore) {
- // No new messages arrived — connection likely dead
- _chatLog('onResume: no catch_up response after 2s, forcing reconnect');
- _ws?.forceReconnect();
- }
- });
+ // Force reload current session messages from provider (triggers rebuild)
+ final activeId = ref.read(activeSessionIdProvider);
+ if (activeId != null) {
+ // Re-save then re-load to ensure UI matches persisted state
+ await ref.read(messagesProvider.notifier).switchSession(activeId);
+ }
+ if (mounted) {
+ setState(() {});
+ _scrollToBottom();
+ }
};
_ws!.onError = (error) {
debugPrint('MQTT error: $error');
@@ -259,6 +252,9 @@
);
await _ws!.connect();
+
+ // Attach MQTT to trace service for auto-publishing logs to server
+ TraceService.instance.attachMqtt(_ws!);
// Initialize push notifications after MQTT is set up so token can be
// sent immediately if already connected.
@@ -371,6 +367,7 @@
final sessionId = msg['sessionId'] as String?;
if (sessionId != null) _incrementUnread(sessionId);
case 'catch_up':
+ _catchUpReceived = true;
final serverSeq = msg['serverSeq'] as int?;
if (serverSeq != null) {
// Always sync to server's seq — if server restarted, its seq may be lower
diff --git a/lib/services/message_store.dart b/lib/services/message_store.dart
index faa45c4..2d682ec 100644
--- a/lib/services/message_store.dart
+++ b/lib/services/message_store.dart
@@ -57,6 +57,12 @@
_debounceTimer = Timer(const Duration(seconds: 1), _flushAll);
}
+ /// Write directly to disk, bypassing debounce. For critical saves.
+ static Future<void> writeDirect(String sessionId, List<Message> messages) async {
+ _pendingSaves.remove(sessionId);
+ await _writeSession(sessionId, messages);
+ }
+
/// Immediately flush all pending saves.
static Future<void> flush() async {
_debounceTimer?.cancel();
diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
index 618a00e..ba6056e 100644
--- a/lib/services/mqtt_service.dart
+++ b/lib/services/mqtt_service.dart
@@ -11,6 +11,7 @@
import 'package:path_provider/path_provider.dart' as pp;
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
+import 'package:typed_data/typed_data.dart';
import 'package:shared_preferences/shared_preferences.dart';
import 'package:uuid/uuid.dart';
@@ -665,6 +666,15 @@
/// Current timestamp in milliseconds.
int _now() => DateTime.now().millisecondsSinceEpoch;
+ /// Publish raw bytes to a topic. Used by TraceService for log streaming.
+ void publishRaw(String topic, Uint8Buffer payload, MqttQos qos) {
+ final client = _client;
+ if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) return;
+ try {
+ client.publishMessage(topic, qos, payload);
+ } catch (_) {}
+ }
+
/// Publish a JSON payload to an MQTT topic.
void _publish(String topic, Map<String, dynamic> payload, MqttQos qos) {
final client = _client;
diff --git a/lib/services/trace_service.dart b/lib/services/trace_service.dart
index baeef9f..5ef89b8 100644
--- a/lib/services/trace_service.dart
+++ b/lib/services/trace_service.dart
@@ -1,4 +1,10 @@
+import 'dart:convert';
+
import 'package:flutter/foundation.dart';
+import 'package:mqtt_client/mqtt_client.dart';
+import 'package:mqtt_client/mqtt_server_client.dart';
+
+import 'mqtt_service.dart';
/// A single trace entry capturing a message-handling event.
class TraceEntry {
@@ -22,17 +28,28 @@
/// Captures message-handling events from MQTT, chat screen, and other
/// components. The buffer is capped at [maxEntries] (default 200).
/// Works in both debug and release builds.
+///
+/// When an MqttService is attached via [attachMqtt], trace entries are
+/// automatically published to the server on `pailot/control/in` so they
+/// can be read from the daemon log.
class TraceService {
TraceService._();
static final TraceService instance = TraceService._();
static const int maxEntries = 200;
final List<TraceEntry> _entries = [];
+ MqttService? _mqtt;
+
+ /// Attach an MQTT service for auto-publishing traces to the server.
+ void attachMqtt(MqttService mqtt) {
+ _mqtt = mqtt;
+ }
/// All entries, oldest first.
List<TraceEntry> get entries => List.unmodifiable(_entries);
/// Add a trace entry. Oldest entry is evicted once the buffer is full.
+ /// If MQTT is attached and connected, the entry is also published to the server.
void addTrace(String event, String details) {
_entries.add(TraceEntry(
timestamp: DateTime.now(),
@@ -43,6 +60,28 @@
_entries.removeAt(0);
}
debugPrint('[TRACE] $event — $details');
+
+ // Auto-publish to server if MQTT is connected
+ _publishTrace(event, details);
+ }
+
+ void _publishTrace(String event, String details) {
+ final mqtt = _mqtt;
+ if (mqtt == null || !mqtt.isConnected) return;
+ try {
+ final payload = jsonEncode({
+ 'type': 'command',
+ 'command': 'app_trace',
+ 'event': event,
+ 'details': details,
+ 'ts': DateTime.now().millisecondsSinceEpoch,
+ });
+ final builder = MqttClientPayloadBuilder();
+ builder.addString(payload);
+ mqtt.publishRaw('pailot/control/in', builder.payload!, MqttQos.atMostOnce);
+ } catch (_) {
+ // Non-fatal — don't let trace logging break the app
+ }
}
/// Clear all entries.
--
Gitblit v1.3.1