From 6cbbea9b96db551e5c0ac26f0ace3d4c3d82a276 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Mon, 06 Apr 2026 15:02:33 +0200
Subject: [PATCH] fix: single pailot/out topic, per-session file locks, merge protection, resume reconnect
---
lib/services/message_store.dart | 27 ++++++
lib/providers/providers.dart | 20 +++-
lib/services/mqtt_service.dart | 125 +++++++++----------------------
lib/screens/chat_screen.dart | 33 ++++---
4 files changed, 94 insertions(+), 111 deletions(-)
diff --git a/lib/providers/providers.dart b/lib/providers/providers.dart
index ad4259a..6155189 100644
--- a/lib/providers/providers.dart
+++ b/lib/providers/providers.dart
@@ -100,28 +100,38 @@
/// Switch to a new session and load its messages.
Future<void> switchSession(String sessionId) async {
- // Log caller for debugging
final trace = StackTrace.current.toString().split('\n').take(4).join(' | ');
TraceService.instance.addTrace(
'switchSession',
'from=${_currentSessionId?.substring(0, 8) ?? "null"}(${state.length}) → ${sessionId.substring(0, 8)} | $trace',
);
- // Write current session DIRECTLY to disk (no debounce — prevents data loss)
+ // Write current session to disk
if (_currentSessionId != null && state.isNotEmpty) {
await MessageStore.writeDirect(_currentSessionId!, state);
}
+ // Skip reload if staying on the same session — messages are already in memory
+ if (_currentSessionId == sessionId) {
+ TraceService.instance.addTrace('switchSession SKIP', 'already on ${sessionId.substring(0, 8)}');
+ return;
+ }
+
_currentSessionId = sessionId;
final messages = await MessageStore.loadAll(sessionId);
- state = messages;
+ // Merge: if addMessage ran during loadAll and added messages for THIS session,
+ // they'll be in state but not in the loaded messages. Keep the longer list.
+ if (state.length > messages.length && _currentSessionId == sessionId) {
+ TraceService.instance.addTrace('switchSession MERGE', 'kept ${state.length} (loaded ${messages.length})');
+ } else {
+ state = messages;
+ }
}
/// Add a message to the current session.
void addMessage(Message message) {
state = [...state, message];
if (_currentSessionId != null) {
- // Write immediately (not debounced) to prevent race with switchSession's loadAll
- MessageStore.writeDirect(_currentSessionId!, state);
+ MessageStore.save(_currentSessionId!, state);
}
}
diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index e889469..f331ab9 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -216,12 +216,15 @@
_ws!.onOpen = () {
_sessionReady = false; // Gate messages until sessions arrive
_pendingMessages.clear();
- final activeId = ref.read(activeSessionIdProvider);
- _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
- // catch_up is sent after sessions arrive (in _handleSessions)
-
- // Re-register APNs token after reconnect so daemon always has a fresh token
- _push?.onMqttConnected();
+ // Delay sync slightly to let broker acknowledge our subscriptions first.
+ // Without this, the catch_up response arrives before pailot/control/out
+ // subscription is active, and the message is lost.
+ Future.delayed(const Duration(milliseconds: 500), () {
+ if (!mounted) return;
+ final activeId = ref.read(activeSessionIdProvider);
+ _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
+ _push?.onMqttConnected();
+ });
};
_ws!.onResume = () {
// App came back from background. The in-memory state already has
@@ -534,8 +537,10 @@
status: MessageStatus.sent,
);
- final activeId = ref.read(activeSessionIdProvider);
- if (sessionId != null && sessionId != activeId) {
+ // Use currentSessionId from notifier (what's actually loaded in the provider)
+ // not activeSessionIdProvider (can be stale after background resume)
+ final currentId = ref.read(messagesProvider.notifier).currentSessionId;
+ if (sessionId != null && sessionId != currentId) {
// Store message for the other session so it's there when user switches
TraceService.instance.addTrace(
'message stored for session',
@@ -608,9 +613,9 @@
duration: duration,
);
- final activeId = ref.read(activeSessionIdProvider);
- _chatLog('voice: sessionId=$sessionId activeId=$activeId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');
- if (sessionId != null && sessionId != activeId) {
+ final currentId = ref.read(messagesProvider.notifier).currentSessionId;
+ _chatLog('voice: sessionId=$sessionId currentId=$currentId audioPath=$savedAudioPath content="${content.substring(0, content.length.clamp(0, 30))}"');
+ if (sessionId != null && sessionId != currentId) {
_chatLog('voice: cross-session, storing for $sessionId');
await _storeForSession(sessionId, storedMessage);
_chatLog('voice: stored, incrementing unread');
@@ -679,9 +684,9 @@
status: MessageStatus.sent,
);
- // Cross-session routing: store for target session if not active
- final activeId = ref.read(activeSessionIdProvider);
- if (sessionId != null && sessionId != activeId) {
+ // Cross-session routing: store for target session if not currently loaded
+ final currentId = ref.read(messagesProvider.notifier).currentSessionId;
+ if (sessionId != null && sessionId != currentId) {
_storeForSession(sessionId, message);
_incrementUnread(sessionId);
return;
diff --git a/lib/services/message_store.dart b/lib/services/message_store.dart
index f45e0d0..1723b70 100644
--- a/lib/services/message_store.dart
+++ b/lib/services/message_store.dart
@@ -15,6 +15,8 @@
static Directory? _baseDir;
static Timer? _debounceTimer;
static final Map<String, List<Message>> _pendingSaves = {};
+ // Per-session lock to prevent concurrent read/write on the same file
+ static final Map<String, Completer<void>> _locks = {};
static const _backupChannel =
MethodChannel('com.mnsoft.pailot/backup');
@@ -60,10 +62,25 @@
/// Write directly to disk, bypassing debounce. For critical saves.
static Future<void> writeDirect(String sessionId, List<Message> messages) async {
- // Cancel ALL pending debounce to prevent race with frozen iOS timers
_debounceTimer?.cancel();
_pendingSaves.remove(sessionId);
- await _writeSession(sessionId, messages);
+ await _withLock(sessionId, () => _writeSession(sessionId, messages));
+ }
+
+ /// Acquire a per-session lock, run the operation, release.
+ static Future<T> _withLock<T>(String sessionId, Future<T> Function() fn) async {
+ // Wait for any existing operation on this session to finish
+ while (_locks.containsKey(sessionId)) {
+ await _locks[sessionId]!.future;
+ }
+ final completer = Completer<void>();
+ _locks[sessionId] = completer;
+ try {
+ return await fn();
+ } finally {
+ _locks.remove(sessionId);
+ completer.complete();
+ }
}
/// Immediately flush all pending saves.
@@ -77,7 +94,7 @@
_pendingSaves.clear();
for (final entry in entries.entries) {
- await _writeSession(entry.key, entry.value);
+ await _withLock(entry.key, () => _writeSession(entry.key, entry.value));
}
}
@@ -128,6 +145,10 @@
/// Load all messages for a session (no pagination).
static Future<List<Message>> loadAll(String sessionId) async {
+ return _withLock(sessionId, () => _loadAllImpl(sessionId));
+ }
+
+ static Future<List<Message>> _loadAllImpl(String sessionId) async {
try {
final dir = await _getBaseDir();
final file = File('${dir.path}/${_fileForSession(sessionId)}');
diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
index ba6056e..285c6bf 100644
--- a/lib/services/mqtt_service.dart
+++ b/lib/services/mqtt_service.dart
@@ -65,8 +65,7 @@
final Set<String> _seenMsgIds = {};
final List<String> _seenMsgIdOrder = [];
- // Per-session explicit subscriptions (wildcards broken in aedes)
- final Set<String> _subscribedSessions = {};
+ // (Per-session subscriptions removed — single pailot/out topic now)
static const int _maxSeenIds = 500;
// Callbacks
@@ -492,34 +491,14 @@
_mqttLog('MQTT: _subscribe called but client is null');
return;
}
- // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.
- // Per-session topics are added dynamically when the session list arrives.
- _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');
+ // Single outbound topic — all messages carry sessionId in payload.
+ // Client routes messages to the correct session based on payload.
+ _mqttLog('MQTT: subscribing to topics...');
+ client.subscribe('pailot/out', MqttQos.atLeastOnce);
client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
client.subscribe('pailot/status', MqttQos.atLeastOnce);
client.subscribe('pailot/projects', MqttQos.atLeastOnce);
client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
- client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
- // Re-subscribe to all known per-session topics
- for (final sid in _subscribedSessions) {
- client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
- client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
- client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
- }
- }
-
- /// Subscribe to per-session topics when session list arrives.
- void _subscribeToSessions(List<String> sessionIds) {
- final client = _client;
- if (client == null) return;
- for (final sid in sessionIds) {
- if (_subscribedSessions.contains(sid)) continue;
- _subscribedSessions.add(sid);
- client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
- client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
- client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
- _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');
- }
}
void _listenMessages() {
@@ -576,20 +555,9 @@
/// Translates MQTT topic structure into the flat message format
/// that chat_screen expects.
void _dispatchMessage(String topic, Map<String, dynamic> json) {
- final parts = topic.split('/');
-
- // pailot/sessions — also dynamically subscribe to per-session topics
+ // pailot/sessions
if (topic == 'pailot/sessions') {
json['type'] = 'sessions';
- final sessions = json['sessions'] as List<dynamic>?;
- if (sessions != null) {
- final ids = sessions
- .map((s) => (s as Map<String, dynamic>)['id'] as String?)
- .where((id) => id != null && id.isNotEmpty)
- .cast<String>()
- .toList();
- if (ids.isNotEmpty) _subscribeToSessions(ids);
- }
onMessage?.call(json);
return;
}
@@ -608,46 +576,25 @@
return;
}
- // pailot/control/out — command responses (session_switched, session_renamed, error, unread)
+ // pailot/control/out — command responses
if (topic == 'pailot/control/out') {
onMessage?.call(json);
return;
}
- // pailot/voice/transcript
- if (topic == 'pailot/voice/transcript') {
- json['type'] = 'transcript';
- onMessage?.call(json);
- return;
- }
-
- // pailot/<sessionId>/out — text, voice, image messages
- if (parts.length == 3 && parts[2] == 'out') {
- final sessionId = parts[1];
- json['sessionId'] ??= sessionId;
- onMessage?.call(json);
- return;
- }
-
- // pailot/<sessionId>/typing
- if (parts.length == 3 && parts[2] == 'typing') {
- final sessionId = parts[1];
- json['type'] = 'typing';
- json['sessionId'] ??= sessionId;
- // Map 'active' field to the 'typing'/'isTyping' fields chat_screen expects
- final active = json['active'] as bool? ?? true;
- json['typing'] = active;
- onMessage?.call(json);
- return;
- }
-
- // pailot/<sessionId>/screenshot
- if (parts.length == 3 && parts[2] == 'screenshot') {
- final sessionId = parts[1];
- json['type'] = 'screenshot';
- json['sessionId'] ??= sessionId;
- // Map imageBase64 to 'data' for compatibility with chat_screen handler
- json['data'] ??= json['imageBase64'];
+ // pailot/out — ALL content messages (text, voice, image, typing, screenshot, transcript)
+ // Each message carries its type and sessionId in the payload.
+ if (topic == 'pailot/out') {
+ final type = json['type'] as String?;
+ // Normalize typing fields for chat_screen
+ if (type == 'typing') {
+ final active = json['active'] as bool? ?? true;
+ json['typing'] = active;
+ }
+ // Normalize screenshot fields
+ if (type == 'screenshot') {
+ json['data'] ??= json['imageBase64'];
+ }
onMessage?.call(json);
return;
}
@@ -851,23 +798,23 @@
switch (state) {
case AppLifecycleState.resumed:
if (_intentionalClose) break;
- _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');
- final client = _client;
- if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
- // Clearly disconnected — fast reconnect to last known host
- _mqttLog('MQTT: disconnected on resume, reconnecting...');
- _client = null;
- _setStatus(ConnectionStatus.reconnecting);
- if (connectedHost != null) {
- _fastReconnect(connectedHost!);
- } else {
- connect();
- }
+ _mqttLog('MQTT: app resumed — reconnecting to last host');
+ // Kill old client completely (disable autoReconnect first to prevent
+ // the MQTT library from spawning its own reconnect attempt)
+ final oldClient = _client;
+ if (oldClient != null) {
+ oldClient.autoReconnect = false;
+ try { oldClient.disconnect(); } catch (_) {}
+ }
+ _updatesSub?.cancel();
+ _updatesSub = null;
+ _client = null;
+ _setStatus(ConnectionStatus.reconnecting);
+ onReconnecting?.call();
+ if (connectedHost != null) {
+ _fastReconnect(connectedHost!);
} else {
- // Appears connected — trigger catch_up to fetch missed messages.
- // Don't disconnect! iOS may have buffered messages while suspended.
- _mqttLog('MQTT: appears connected on resume, triggering catch_up');
- onResume?.call();
+ connect();
}
case AppLifecycleState.paused:
break;
--
Gitblit v1.3.1