From 6cbbea9b96db551e5c0ac26f0ace3d4c3d82a276 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Mon, 06 Apr 2026 15:02:33 +0200
Subject: [PATCH] fix: single pailot/out topic, per-session file locks, merge protection, resume reconnect
---
lib/services/mqtt_service.dart | 125 ++++++++++++-----------------------------
1 files changed, 36 insertions(+), 89 deletions(-)
diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
index ba6056e..285c6bf 100644
--- a/lib/services/mqtt_service.dart
+++ b/lib/services/mqtt_service.dart
@@ -65,8 +65,7 @@
final Set<String> _seenMsgIds = {};
final List<String> _seenMsgIdOrder = [];
- // Per-session explicit subscriptions (wildcards broken in aedes)
- final Set<String> _subscribedSessions = {};
+ // (Per-session subscriptions removed — single pailot/out topic now)
static const int _maxSeenIds = 500;
// Callbacks
@@ -492,34 +491,14 @@
_mqttLog('MQTT: _subscribe called but client is null');
return;
}
- // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.
- // Per-session topics are added dynamically when the session list arrives.
- _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');
+ // Single outbound topic — all messages carry sessionId in payload.
+ // Client routes messages to the correct session based on payload.
+ _mqttLog('MQTT: subscribing to topics...');
+ client.subscribe('pailot/out', MqttQos.atLeastOnce);
client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
client.subscribe('pailot/status', MqttQos.atLeastOnce);
client.subscribe('pailot/projects', MqttQos.atLeastOnce);
client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
- client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
- // Re-subscribe to all known per-session topics
- for (final sid in _subscribedSessions) {
- client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
- client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
- client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
- }
- }
-
- /// Subscribe to per-session topics when session list arrives.
- void _subscribeToSessions(List<String> sessionIds) {
- final client = _client;
- if (client == null) return;
- for (final sid in sessionIds) {
- if (_subscribedSessions.contains(sid)) continue;
- _subscribedSessions.add(sid);
- client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
- client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
- client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
- _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');
- }
}
void _listenMessages() {
@@ -576,20 +555,9 @@
/// Translates MQTT topic structure into the flat message format
/// that chat_screen expects.
void _dispatchMessage(String topic, Map<String, dynamic> json) {
- final parts = topic.split('/');
-
- // pailot/sessions — also dynamically subscribe to per-session topics
+ // pailot/sessions
if (topic == 'pailot/sessions') {
json['type'] = 'sessions';
- final sessions = json['sessions'] as List<dynamic>?;
- if (sessions != null) {
- final ids = sessions
- .map((s) => (s as Map<String, dynamic>)['id'] as String?)
- .where((id) => id != null && id.isNotEmpty)
- .cast<String>()
- .toList();
- if (ids.isNotEmpty) _subscribeToSessions(ids);
- }
onMessage?.call(json);
return;
}
@@ -608,46 +576,25 @@
return;
}
- // pailot/control/out — command responses (session_switched, session_renamed, error, unread)
+ // pailot/control/out — command responses
if (topic == 'pailot/control/out') {
onMessage?.call(json);
return;
}
- // pailot/voice/transcript
- if (topic == 'pailot/voice/transcript') {
- json['type'] = 'transcript';
- onMessage?.call(json);
- return;
- }
-
- // pailot/<sessionId>/out — text, voice, image messages
- if (parts.length == 3 && parts[2] == 'out') {
- final sessionId = parts[1];
- json['sessionId'] ??= sessionId;
- onMessage?.call(json);
- return;
- }
-
- // pailot/<sessionId>/typing
- if (parts.length == 3 && parts[2] == 'typing') {
- final sessionId = parts[1];
- json['type'] = 'typing';
- json['sessionId'] ??= sessionId;
- // Map 'active' field to the 'typing'/'isTyping' fields chat_screen expects
- final active = json['active'] as bool? ?? true;
- json['typing'] = active;
- onMessage?.call(json);
- return;
- }
-
- // pailot/<sessionId>/screenshot
- if (parts.length == 3 && parts[2] == 'screenshot') {
- final sessionId = parts[1];
- json['type'] = 'screenshot';
- json['sessionId'] ??= sessionId;
- // Map imageBase64 to 'data' for compatibility with chat_screen handler
- json['data'] ??= json['imageBase64'];
+ // pailot/out — ALL content messages (text, voice, image, typing, screenshot, transcript)
+ // Each message carries its type and sessionId in the payload.
+ if (topic == 'pailot/out') {
+ final type = json['type'] as String?;
+ // Normalize typing fields for chat_screen
+ if (type == 'typing') {
+ final active = json['active'] as bool? ?? true;
+ json['typing'] = active;
+ }
+ // Normalize screenshot fields
+ if (type == 'screenshot') {
+ json['data'] ??= json['imageBase64'];
+ }
onMessage?.call(json);
return;
}
@@ -851,23 +798,23 @@
switch (state) {
case AppLifecycleState.resumed:
if (_intentionalClose) break;
- _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');
- final client = _client;
- if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
- // Clearly disconnected — fast reconnect to last known host
- _mqttLog('MQTT: disconnected on resume, reconnecting...');
- _client = null;
- _setStatus(ConnectionStatus.reconnecting);
- if (connectedHost != null) {
- _fastReconnect(connectedHost!);
- } else {
- connect();
- }
+ _mqttLog('MQTT: app resumed — reconnecting to last host');
+ // Kill old client completely (disable autoReconnect first to prevent
+ // the MQTT library from spawning its own reconnect attempt)
+ final oldClient = _client;
+ if (oldClient != null) {
+ oldClient.autoReconnect = false;
+ try { oldClient.disconnect(); } catch (_) {}
+ }
+ _updatesSub?.cancel();
+ _updatesSub = null;
+ _client = null;
+ _setStatus(ConnectionStatus.reconnecting);
+ onReconnecting?.call();
+ if (connectedHost != null) {
+ _fastReconnect(connectedHost!);
} else {
- // Appears connected — trigger catch_up to fetch missed messages.
- // Don't disconnect! iOS may have buffered messages while suspended.
- _mqttLog('MQTT: appears connected on resume, triggering catch_up');
- onResume?.call();
+ connect();
}
case AppLifecycleState.paused:
break;
--
Gitblit v1.3.1