From 34b82ffb9b31866b528cd91a3d2ec360dbb8da5e Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Mon, 06 Apr 2026 13:05:50 +0200
Subject: [PATCH] fix: explicit per-session MQTT subscriptions, lifecycle observer, resume reconnect with fallback
---
lib/services/mqtt_service.dart | 88 ++++++++++++++++++++++++++++++++++++++-----
lib/screens/chat_screen.dart | 25 +++++++++++-
2 files changed, 99 insertions(+), 14 deletions(-)
diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index 9b8a551..22efed5 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -223,10 +223,27 @@
_push?.onMqttConnected();
};
_ws!.onResume = () {
- // App came back from background with connection still alive.
- // Send catch_up to fetch any messages missed during suspend.
+ // App came back from background — connection may or may not be alive.
+ // Try catch_up first, but if no response comes, force reconnect.
_chatLog('onResume: sending catch_up with lastSeq=$_lastSeq');
+ final seqBefore = _lastSeq;
_sendCommand('catch_up', {'lastSeq': _lastSeq});
+ // Force UI rebuild for any buffered messages
+ Future.delayed(const Duration(milliseconds: 300), () {
+ if (mounted) {
+ setState(() {});
+ _scrollToBottom();
+ }
+ });
+ // If catch_up didn't produce a response in 2s, connection is dead — reconnect
+ Future.delayed(const Duration(seconds: 2), () {
+ if (!mounted) return;
+ if (_lastSeq == seqBefore) {
+ // No new messages arrived — connection likely dead
+ _chatLog('onResume: no catch_up response after 2s, forcing reconnect');
+ _ws?.forceReconnect();
+ }
+ });
};
_ws!.onError = (error) {
debugPrint('MQTT error: $error');
@@ -247,7 +264,9 @@
// sent immediately if already connected.
_push = PushService(mqttService: _ws!);
_push!.onNotificationTap = (data) {
- // If notification carried a sessionId, switch to that session
+ // Switch to the session immediately, then request catch_up.
+ // The MQTT connection auto-reconnects on resume, and onResume
+ // already sends catch_up. We just need to be on the right session.
final sessionId = data['sessionId'] as String?;
if (sessionId != null && mounted) {
_switchSession(sessionId);
diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
index 34ffc93..618a00e 100644
--- a/lib/services/mqtt_service.dart
+++ b/lib/services/mqtt_service.dart
@@ -46,7 +46,9 @@
/// Subscribes to all pailot/ topics and dispatches messages
/// through the onMessage callback interface.
class MqttService with WidgetsBindingObserver {
- MqttService({required this.config});
+ MqttService({required this.config}) {
+ WidgetsBinding.instance.addObserver(this);
+ }
ServerConfig config;
MqttServerClient? _client;
@@ -61,6 +63,9 @@
// Message deduplication
final Set<String> _seenMsgIds = {};
final List<String> _seenMsgIdOrder = [];
+
+ // Per-session explicit subscriptions (wildcards broken in aedes)
+ final Set<String> _subscribedSessions = {};
static const int _maxSeenIds = 500;
// Callbacks
@@ -97,6 +102,34 @@
}
_clientId = id;
return id;
+ }
+
+ /// Force reconnect — disconnect and reconnect to last known host.
+ void forceReconnect() {
+ _mqttLog('MQTT: force reconnect requested');
+ final lastHost = connectedHost;
+ _client?.disconnect();
+ _client = null;
+ _setStatus(ConnectionStatus.reconnecting);
+ onReconnecting?.call();
+ if (lastHost != null) {
+ _fastReconnect(lastHost);
+ } else {
+ connect();
+ }
+ }
+
+ /// Fast reconnect to a known host — skips discovery, short timeout.
+ Future<void> _fastReconnect(String host) async {
+ _mqttLog('MQTT: fast reconnect to $host');
+ final clientId = await _getClientId();
+ if (await _tryConnect(host, clientId, timeout: 2000)) {
+ connectedHost = host;
+ return;
+ }
+ // Fast path failed — fall back to full connect
+ _mqttLog('MQTT: fast reconnect failed, full connect...');
+ connect();
}
/// Connect to the MQTT broker.
@@ -458,15 +491,34 @@
_mqttLog('MQTT: _subscribe called but client is null');
return;
}
- _mqttLog('MQTT: subscribing to topics...');
+ // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.
+ // Per-session topics are added dynamically when the session list arrives.
+ _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');
client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
client.subscribe('pailot/status', MqttQos.atLeastOnce);
client.subscribe('pailot/projects', MqttQos.atLeastOnce);
- client.subscribe('pailot/+/out', MqttQos.atLeastOnce);
- client.subscribe('pailot/+/typing', MqttQos.atMostOnce);
- client.subscribe('pailot/+/screenshot', MqttQos.atLeastOnce);
client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
+ // Re-subscribe to all known per-session topics
+ for (final sid in _subscribedSessions) {
+ client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
+ client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
+ client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
+ }
+ }
+
+ /// Subscribe to per-session topics when session list arrives.
+ void _subscribeToSessions(List<String> sessionIds) {
+ final client = _client;
+ if (client == null) return;
+ for (final sid in sessionIds) {
+ if (_subscribedSessions.contains(sid)) continue;
+ _subscribedSessions.add(sid);
+ client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
+ client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
+ client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
+ _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');
+ }
}
void _listenMessages() {
@@ -525,9 +577,18 @@
void _dispatchMessage(String topic, Map<String, dynamic> json) {
final parts = topic.split('/');
- // pailot/sessions
+ // pailot/sessions — also dynamically subscribe to per-session topics
if (topic == 'pailot/sessions') {
json['type'] = 'sessions';
+ final sessions = json['sessions'] as List<dynamic>?;
+ if (sessions != null) {
+ final ids = sessions
+ .map((s) => (s as Map<String, dynamic>)['id'] as String?)
+ .where((id) => id != null && id.isNotEmpty)
+ .cast<String>()
+ .toList();
+ if (ids.isNotEmpty) _subscribeToSessions(ids);
+ }
onMessage?.call(json);
return;
}
@@ -770,6 +831,7 @@
/// Dispose all resources.
void dispose() {
+ WidgetsBinding.instance.removeObserver(this);
disconnect();
}
@@ -782,14 +844,18 @@
_mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');
final client = _client;
if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
- // Clearly disconnected — just reconnect
- _mqttLog('MQTT: not connected on resume, reconnecting...');
+ // Clearly disconnected — fast reconnect to last known host
+ _mqttLog('MQTT: disconnected on resume, reconnecting...');
_client = null;
_setStatus(ConnectionStatus.reconnecting);
- connect();
+ if (connectedHost != null) {
+ _fastReconnect(connectedHost!);
+ } else {
+ connect();
+ }
} else {
- // Appears connected — notify listener to fetch missed messages
- // via catch_up. Don't call onOpen (it resets sessionReady and causes flicker).
+ // Appears connected — trigger catch_up to fetch missed messages.
+ // Don't disconnect! iOS may have buffered messages while suspended.
_mqttLog('MQTT: appears connected on resume, triggering catch_up');
onResume?.call();
}
--
Gitblit v1.3.1