| .. | .. |
|---|
| 46 | 46 | /// Subscribes to all pailot/ topics and dispatches messages |
|---|
| 47 | 47 | /// through the onMessage callback interface. |
|---|
| 48 | 48 | class MqttService with WidgetsBindingObserver { |
|---|
| 49 | | - MqttService({required this.config}); |
|---|
| 49 | + MqttService({required this.config}) { |
|---|
| 50 | + WidgetsBinding.instance.addObserver(this); |
|---|
| 51 | + } |
|---|
| 50 | 52 | |
|---|
| 51 | 53 | ServerConfig config; |
|---|
| 52 | 54 | MqttServerClient? _client; |
|---|
| .. | .. |
|---|
| 61 | 63 | // Message deduplication |
|---|
| 62 | 64 | final Set<String> _seenMsgIds = {}; |
|---|
| 63 | 65 | final List<String> _seenMsgIdOrder = []; |
|---|
| 66 | + |
|---|
| 67 | + // Per-session explicit subscriptions (wildcards broken in aedes) |
|---|
| 68 | + final Set<String> _subscribedSessions = {}; |
|---|
| 64 | 69 | static const int _maxSeenIds = 500; |
|---|
| 65 | 70 | |
|---|
| 66 | 71 | // Callbacks |
|---|
| .. | .. |
|---|
| 97 | 102 | } |
|---|
| 98 | 103 | _clientId = id; |
|---|
| 99 | 104 | return id; |
|---|
| 105 | + } |
|---|
| 106 | + |
|---|
| 107 | + /// Force reconnect — drops the current client, marks the service as reconnecting, fires onReconnecting, then retries the last known host (or runs a full connect() when no host is recorded). |
|---|
| 108 | + void forceReconnect() { |
|---|
| 109 | + _mqttLog('MQTT: force reconnect requested'); |
|---|
| 110 | + final lastHost = connectedHost; // read before disconnect(); presumably teardown may clear it — confirm |
|---|
| 111 | + _client?.disconnect(); |
|---|
| 112 | + _client = null; |
|---|
| 113 | + _setStatus(ConnectionStatus.reconnecting); |
|---|
| 114 | + onReconnecting?.call(); |
|---|
| 115 | + if (lastHost != null) { |
|---|
| 116 | + _fastReconnect(lastHost); |
|---|
| 117 | + } else { |
|---|
| 118 | + connect(); |
|---|
| 119 | + } |
|---|
| 120 | + } |
|---|
| 121 | + |
|---|
| 122 | + /// Fast reconnect to a known [host] — skips discovery: makes a single _tryConnect attempt with timeout: 2000 and records the host on success; otherwise falls back to a full connect(). |
|---|
| 123 | + Future<void> _fastReconnect(String host) async { |
|---|
| 124 | + _mqttLog('MQTT: fast reconnect to $host'); |
|---|
| 125 | + final clientId = await _getClientId(); |
|---|
| 126 | + if (await _tryConnect(host, clientId, timeout: 2000)) { |
|---|
| 127 | + connectedHost = host; |
|---|
| 128 | + return; |
|---|
| 129 | + } |
|---|
| 130 | + // Fast path failed — fall back to full connect |
|---|
| 131 | + _mqttLog('MQTT: fast reconnect failed, full connect...'); |
|---|
| 132 | + connect(); |
|---|
| 100 | 133 | } |
|---|
| 101 | 134 | |
|---|
| 102 | 135 | /// Connect to the MQTT broker. |
|---|
| .. | .. |
|---|
| 458 | 491 | _mqttLog('MQTT: _subscribe called but client is null'); |
|---|
| 459 | 492 | return; |
|---|
| 460 | 493 | } |
|---|
| 461 | | - _mqttLog('MQTT: subscribing to topics...'); |
|---|
| 494 | + // Subscribe to exact topics only — wildcard pailot/+/out is broken in aedes. |
|---|
| 495 | + // Per-session topics are added dynamically when the session list arrives. |
|---|
| 496 | + _mqttLog('MQTT: subscribing to base + ${_subscribedSessions.length} session topics...'); |
|---|
| 462 | 497 | client.subscribe('pailot/sessions', MqttQos.atLeastOnce); |
|---|
| 463 | 498 | client.subscribe('pailot/status', MqttQos.atLeastOnce); |
|---|
| 464 | 499 | client.subscribe('pailot/projects', MqttQos.atLeastOnce); |
|---|
| 465 | | - client.subscribe('pailot/+/out', MqttQos.atLeastOnce); |
|---|
| 466 | | - client.subscribe('pailot/+/typing', MqttQos.atMostOnce); |
|---|
| 467 | | - client.subscribe('pailot/+/screenshot', MqttQos.atLeastOnce); |
|---|
| 468 | 500 | client.subscribe('pailot/control/out', MqttQos.atLeastOnce); |
|---|
| 469 | 501 | client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce); |
|---|
| 502 | + // Re-subscribe to all known per-session topics |
|---|
| 503 | + for (final sid in _subscribedSessions) { |
|---|
| 504 | + client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce); |
|---|
| 505 | + client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce); |
|---|
| 506 | + client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce); |
|---|
| 507 | + } |
|---|
| 508 | + } |
|---|
| 509 | + |
|---|
| 510 | + /// Subscribes to the out/typing/screenshot topics of every id in [sessionIds] that is not yet in _subscribedSessions, recording each new id; does nothing while _client is null. |
|---|
| 511 | + void _subscribeToSessions(List<String> sessionIds) { |
|---|
| 512 | + final client = _client; |
|---|
| 513 | + if (client == null) return; |
|---|
| 514 | + for (final sid in sessionIds) { |
|---|
| 515 | + if (_subscribedSessions.contains(sid)) continue; |
|---|
| 516 | + _subscribedSessions.add(sid); |
|---|
| 517 | + client.subscribe('pailot/$sid/out', MqttQos.atLeastOnce); |
|---|
| 518 | + client.subscribe('pailot/$sid/typing', MqttQos.atMostOnce); |
|---|
| 519 | + client.subscribe('pailot/$sid/screenshot', MqttQos.atLeastOnce); |
|---|
| 520 | + _mqttLog('MQTT: subscribed to session ${sid.length > 8 ? sid.substring(0, 8) : sid}'); // ids shorter than 8 chars would make substring(0, 8) throw RangeError |
|---|
| 521 | + } |
|---|
| 470 | 522 | } |
|---|
| 471 | 523 | |
|---|
| 472 | 524 | void _listenMessages() { |
|---|
| .. | .. |
|---|
| 525 | 577 | void _dispatchMessage(String topic, Map<String, dynamic> json) { |
|---|
| 526 | 578 | final parts = topic.split('/'); |
|---|
| 527 | 579 | |
|---|
| 528 | | - // pailot/sessions |
|---|
| 580 | + // pailot/sessions — also dynamically subscribe to per-session topics |
|---|
| 529 | 581 | if (topic == 'pailot/sessions') { |
|---|
| 530 | 582 | json['type'] = 'sessions'; |
|---|
| 583 | + final sessions = json['sessions'] as List<dynamic>?; |
|---|
| 584 | + if (sessions != null) { |
|---|
| 585 | + final ids = sessions |
|---|
| 586 | + .map((s) => (s as Map<String, dynamic>)['id'] as String?) |
|---|
| 587 | + .where((id) => id != null && id.isNotEmpty) |
|---|
| 588 | + .cast<String>() |
|---|
| 589 | + .toList(); |
|---|
| 590 | + if (ids.isNotEmpty) _subscribeToSessions(ids); |
|---|
| 591 | + } |
|---|
| 531 | 592 | onMessage?.call(json); |
|---|
| 532 | 593 | return; |
|---|
| 533 | 594 | } |
|---|
| .. | .. |
|---|
| 770 | 831 | |
|---|
| 771 | 832 | /// Dispose all resources: unregisters this object as a WidgetsBinding observer, then disconnects. |
|---|
| 772 | 833 | void dispose() { |
|---|
| 834 | + WidgetsBinding.instance.removeObserver(this); |
|---|
| 773 | 835 | disconnect(); |
|---|
| 774 | 836 | } |
|---|
| 775 | 837 | |
|---|
| .. | .. |
|---|
| 782 | 844 | _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}'); |
|---|
| 783 | 845 | final client = _client; |
|---|
| 784 | 846 | if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) { |
|---|
| 785 | | - // Clearly disconnected — just reconnect |
|---|
| 786 | | - _mqttLog('MQTT: not connected on resume, reconnecting...'); |
|---|
| 847 | + // Clearly disconnected — fast reconnect to last known host |
|---|
| 848 | + _mqttLog('MQTT: disconnected on resume, reconnecting...'); |
|---|
| 787 | 849 | _client = null; |
|---|
| 788 | 850 | _setStatus(ConnectionStatus.reconnecting); |
|---|
| 789 | | - connect(); |
|---|
| 851 | + if (connectedHost != null) { |
|---|
| 852 | + _fastReconnect(connectedHost!); |
|---|
| 853 | + } else { |
|---|
| 854 | + connect(); |
|---|
| 855 | + } |
|---|
| 790 | 856 | } else { |
|---|
| 791 | | - // Appears connected — notify listener to fetch missed messages |
|---|
| 792 | | - // via catch_up. Don't call onOpen (it resets sessionReady and causes flicker). |
|---|
| 857 | + // Appears connected — trigger catch_up to fetch missed messages. |
|---|
| 858 | + // Don't disconnect! iOS may have buffered messages while suspended. |
|---|
| 793 | 859 | _mqttLog('MQTT: appears connected on resume, triggering catch_up'); |
|---|
| 794 | 860 | onResume?.call(); |
|---|
| 795 | 861 | } |
|---|