Matthias Nott
10 days ago 6cbbea9b96db551e5c0ac26f0ace3d4c3d82a276
lib/services/mqtt_service.dart
....@@ -65,8 +65,7 @@
6565 final Set<String> _seenMsgIds = {};
6666 final List<String> _seenMsgIdOrder = [];
6767
68
- // Per-session explicit subscriptions (wildcards broken in aedes)
69
- final Set<String> _subscribedSessions = {};
68
+ // Per-session subscriptions were removed: all content messages now arrive on the single pailot/out topic.
7069 static const int _maxSeenIds = 500;
7170
7271 // Callbacks
....@@ -492,34 +491,14 @@
492491 _mqttLog('MQTT: _subscribe called but client is null');
493492 return;
494493 }
495
- // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes.
496
- // Per-session topics are added dynamically when the session list arrives.
497
- _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...');
494
+ // Single outbound topic — all messages carry sessionId in payload.
495
+ // Client routes messages to the correct session based on payload.
496
+ _mqttLog('MQTT: subscribing to topics...');
497
+ client.subscribe('pailot/out', MqttQos.atLeastOnce);
498498 client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
499499 client.subscribe('pailot/status', MqttQos.atLeastOnce);
500500 client.subscribe('pailot/projects', MqttQos.atLeastOnce);
501501 client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
502
- client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
503
- // Re-subscribe to all known per-session topics
504
- for (final sid in _subscribedSessions) {
505
- client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
506
- client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
507
- client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
508
- }
509
- }
510
-
511
- /// Subscribe to per-session topics when session list arrives.
512
- void _subscribeToSessions(List<String> sessionIds) {
513
- final client = _client;
514
- if (client == null) return;
515
- for (final sid in sessionIds) {
516
- if (_subscribedSessions.contains(sid)) continue;
517
- _subscribedSessions.add(sid);
518
- client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce);
519
- client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce);
520
- client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce);
521
- _mqttLog('MQTT: subscribed to session ${sid.substring(0, 8)}');
522
- }
523502 }
524503
525504 void _listenMessages() {
....@@ -576,20 +555,9 @@
576555 /// Translates MQTT topic structure into the flat message format
577556 /// that chat_screen expects.
578557 void _dispatchMessage(String topic, Map<String, dynamic> json) {
579
- final parts = topic.split('/');
580
-
581
- // pailot/sessions — also dynamically subscribe to per-session topics
558
+ // pailot/sessions
582559 if (topic == 'pailot/sessions') {
583560 json['type'] = 'sessions';
584
- final sessions = json['sessions'] as List<dynamic>?;
585
- if (sessions != null) {
586
- final ids = sessions
587
- .map((s) => (s as Map<String, dynamic>)['id'] as String?)
588
- .where((id) => id != null && id.isNotEmpty)
589
- .cast<String>()
590
- .toList();
591
- if (ids.isNotEmpty) _subscribeToSessions(ids);
592
- }
593561 onMessage?.call(json);
594562 return;
595563 }
....@@ -608,46 +576,25 @@
608576 return;
609577 }
610578
611
- // pailot/control/out — command responses (session_switched, session_renamed, error, unread)
579
+ // pailot/control/out — command responses
612580 if (topic == 'pailot/control/out') {
613581 onMessage?.call(json);
614582 return;
615583 }
616584
617
- // pailot/voice/transcript
618
- if (topic == 'pailot/voice/transcript') {
619
- json['type'] = 'transcript';
620
- onMessage?.call(json);
621
- return;
622
- }
623
-
624
- // pailot/<sessionId>/out — text, voice, image messages
625
- if (parts.length == 3 && parts[2] == 'out') {
626
- final sessionId = parts[1];
627
- json['sessionId'] ??= sessionId;
628
- onMessage?.call(json);
629
- return;
630
- }
631
-
632
- // pailot/<sessionId>/typing
633
- if (parts.length == 3 && parts[2] == 'typing') {
634
- final sessionId = parts[1];
635
- json['type'] = 'typing';
636
- json['sessionId'] ??= sessionId;
637
- // Map 'active' field to the 'typing'/'isTyping' fields chat_screen expects
638
- final active = json['active'] as bool? ?? true;
639
- json['typing'] = active;
640
- onMessage?.call(json);
641
- return;
642
- }
643
-
644
- // pailot/<sessionId>/screenshot
645
- if (parts.length == 3 && parts[2] == 'screenshot') {
646
- final sessionId = parts[1];
647
- json['type'] = 'screenshot';
648
- json['sessionId'] ??= sessionId;
649
- // Map imageBase64 to 'data' for compatibility with chat_screen handler
650
- json['data'] ??= json['imageBase64'];
585
+ // pailot/out — ALL content messages (text, voice, image, typing, screenshot, transcript)
586
+ // Each message carries its type and sessionId in the payload.
587
+ if (topic == 'pailot/out') {
588
+ final type = json['type'] as String?;
589
+ // Normalize typing for chat_screen: mirror the payload's 'active' flag into 'typing' (defaults to true when absent)
590
+ if (type == 'typing') {
591
+ final active = json['active'] as bool? ?? true;
592
+ json['typing'] = active;
593
+ }
594
+ // Normalize screenshot: fall back to 'imageBase64' when 'data' is not already set
595
+ if (type == 'screenshot') {
596
+ json['data'] ??= json['imageBase64'];
597
+ }
651598 onMessage?.call(json);
652599 return;
653600 }
....@@ -851,23 +798,23 @@
851798 switch (state) {
852799 case AppLifecycleState.resumed:
853800 if (_intentionalClose) break;
854
- _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}');
855
- final client = _client;
856
- if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
857
- // Clearly disconnected — fast reconnect to last known host
858
- _mqttLog('MQTT: disconnected on resume, reconnecting...');
859
- _client = null;
860
- _setStatus(ConnectionStatus.reconnecting);
861
- if (connectedHost != null) {
862
- _fastReconnect(connectedHost!);
863
- } else {
864
- connect();
865
- }
801
+ _mqttLog('MQTT: app resumed — reconnecting to last host');
802
+ // Kill old client completely (disable autoReconnect first to prevent
803
+ // the MQTT library from spawning its own reconnect attempt)
804
+ final oldClient = _client;
805
+ if (oldClient != null) {
806
+ oldClient.autoReconnect = false;
807
+ try { oldClient.disconnect(); } catch (_) {}
808
+ }
809
+ _updatesSub?.cancel();
810
+ _updatesSub = null;
811
+ _client = null;
812
+ _setStatus(ConnectionStatus.reconnecting);
813
+ onReconnecting?.call();
814
+ if (connectedHost != null) {
815
+ _fastReconnect(connectedHost!);
866816 } else {
867
- // Appears connected — trigger catch_up to fetch missed messages.
868
- // Don't disconnect! iOS may have buffered messages while suspended.
869
- _mqttLog('MQTT: appears connected on resume, triggering catch_up');
870
- onResume?.call();
817
+ connect();
871818 }
872819 case AppLifecycleState.paused:
873820 break;