import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:bonsoir/bonsoir.dart';
import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:crypto/crypto.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/widgets.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:path_provider/path_provider.dart' as pp;
import 'package:shared_preferences/shared_preferences.dart';
import 'package:typed_data/typed_data.dart';
import 'package:uuid/uuid.dart';

import '../models/server_config.dart';
import 'trace_service.dart';
import 'wol_service.dart';

/// Connection status for the MQTT client.
enum ConnectionStatus {
  disconnected,
  connecting,
  connected,
  reconnecting,
}

/// Debug log.
///
/// Always prints via [debugPrint] and records a [TraceService] entry so the
/// line shows up in the trace log viewer. In debug builds the line is also
/// appended to `mqtt_debug.log` in the application documents directory.
Future<void> _mqttLog(String msg) async {
  debugPrint('[MQTT] $msg');
  TraceService.instance.addTrace('MQTT', msg);
  if (!kDebugMode) return;
  try {
    final dir = await pp.getApplicationDocumentsDirectory();
    final file = File('${dir.path}/mqtt_debug.log');
    // HH:MM:SS slice of the ISO-8601 timestamp.
    final ts = DateTime.now().toIso8601String().substring(11, 19);
    await file.writeAsString('[$ts] $msg\n', mode: FileMode.append);
  } catch (_) {
    // Best effort: file logging must never disturb the connection logic.
  }
}

/// MQTT client for PAILot.
///
/// Connects to the AIBroker daemon's embedded aedes broker over TLS,
/// subscribes to the `pailot/` topics and dispatches every decoded message
/// through [onMessage].
///
/// The service registers itself as a [WidgetsBindingObserver] so that it can
/// rebuild the connection when the app returns to the foreground; call
/// [dispose] to unregister.
class MqttService with WidgetsBindingObserver {
  MqttService({required this.config}) {
    WidgetsBinding.instance.addObserver(this);
  }

  /// Server settings (hosts, port, token, MAC address) used for connecting.
  ServerConfig config;

  MqttServerClient? _client;
  ConnectionStatus _status = ConnectionStatus.disconnected;

  /// True after [disconnect]; suppresses every automatic reconnect path.
  bool _intentionalClose = false;
  String? _clientId;
  String? _lastDiscoveredHost;

  // Left over from the (currently disabled) network-change detection.
  // [_connectivitySub] is still cancelled in [disconnect].
  StreamSubscription<dynamic>? _connectivitySub;
  List<dynamic>? _lastConnectivity;

  StreamSubscription<dynamic>? _updatesSub;

  // Message deduplication: a set for O(1) lookup plus insertion order for
  // eviction. (Per-session subscriptions removed — single pailot/out topic.)
  final Set<String> _seenMsgIds = <String>{};
  final List<String> _seenMsgIdOrder = <String>[];
  static const int _maxSeenIds = 500;

  // Callbacks

  /// Called whenever [status] changes.
  void Function(ConnectionStatus status)? onStatusChanged;

  /// Human-readable progress, e.g. "Connecting...", "Scanning network...".
  void Function(String detail)? onStatusDetail;

  /// The host of the current (or most recently chosen) connection.
  String? connectedHost;

  /// How [connectedHost] was chosen: "Local", "VPN", "Remote", "Discovered".
  String? connectedVia;

  /// Called with every decoded, de-duplicated inbound message.
  void Function(Map<String, dynamic> message)? onMessage;
  void Function()? onOpen;
  void Function()? onClose;
  void Function()? onReconnecting;
  void Function()? onResume;
  void Function(String error)? onError;

  ConnectionStatus get status => _status;

  /// Whether the client currently reports [ConnectionStatus.connected].
  bool get isConnected => _status == ConnectionStatus.connected;

  /// Updates [_status] and notifies [onStatusChanged] on actual changes only.
  void _setStatus(ConnectionStatus newStatus) {
    if (_status == newStatus) return;
    _status = newStatus;
    onStatusChanged?.call(newStatus);
  }

  /// Returns at most [max] leading characters of [value]; used for log lines
  /// so that unexpectedly short values cannot raise a [RangeError].
  static String _abbrev(String value, [int max = 16]) =>
      value.length <= max ? value : value.substring(0, max);

  /// Gets or creates a persistent client ID for this device.
  Future<String> _getClientId() async {
    final cached = _clientId;
    if (cached != null) return cached;

    final prefs = await SharedPreferences.getInstance();
    var id = prefs.getString('mqtt_client_id');
    // Regenerate if missing or in the old format (too long for MQTT 3.1.1,
    // which limits client IDs to 23 alphanumeric characters).
    if (id == null || id.length > 23) {
      id = 'pailot${const Uuid().v4().replaceAll('-', '').substring(0, 16)}';
      await prefs.setString('mqtt_client_id', id);
    }
    _clientId = id;
    return id;
  }

  /// Closes and forgets the active client without letting the library call
  /// back into [_onDisconnected].
  ///
  /// Callers report the resulting status themselves, so listeners receive
  /// each notification exactly once instead of once from the library
  /// callback and once from the caller.
  void _dropClient() {
    _updatesSub?.cancel();
    _updatesSub = null;

    final client = _client;
    _client = null;
    if (client == null) return;

    // Disable autoReconnect first so the library cannot spawn its own
    // reconnect attempt while we are tearing the client down.
    client.autoReconnect = false;
    client.onDisconnected = null;
    try {
      client.disconnect();
    } catch (e) {
      _mqttLog('MQTT: error while closing client: $e');
    }
  }

  /// Drops the current client and starts a new connection, preferring the
  /// last known host. Shared by [forceReconnect] and the resume handler.
  void _restartConnection() {
    final lastHost = connectedHost;
    _dropClient();
    _setStatus(ConnectionStatus.reconnecting);
    onReconnecting?.call();
    if (lastHost != null) {
      unawaited(_fastReconnect(lastHost));
    } else {
      unawaited(connect());
    }
  }

  /// Force reconnect — disconnect and reconnect to the last known host.
  void forceReconnect() {
    _mqttLog('MQTT: force reconnect requested');
    // An explicit reconnect request cancels an earlier intentional close;
    // otherwise a later connection drop would be treated as user-initiated
    // and never be recovered.
    _intentionalClose = false;
    _restartConnection();
  }

  /// Fast reconnect to a known host — skips discovery, short timeout.
  ///
  /// Falls back to a full [connect] when the fast path fails, unless
  /// [disconnect] was called in the meantime.
  Future<void> _fastReconnect(String host) async {
    _mqttLog('MQTT: fast reconnect to $host');
    final clientId = await _getClientId();
    if (await _tryConnect(host, clientId, timeout: 2000)) {
      connectedHost = host;
      return;
    }
    if (_intentionalClose) return;
    _mqttLog('MQTT: fast reconnect failed, full connect...');
    await connect();
  }

  /// Connects to the MQTT broker.
  ///
  /// 1. Sends Wake-on-LAN when a MAC address is configured.
  /// 2. Probes all configured hosts in parallel (first TLS probe to succeed
  ///    wins, 3 s overall budget).
  /// 3. If none answers, tries Bonjour and then a subnet scan.
  /// 4. Opens the MQTT session on the winning host.
  ///
  /// When everything fails the status becomes
  /// [ConnectionStatus.reconnecting] and another attempt is scheduled in 5 s.
  /// Does nothing while already connected or connecting.
  Future<void> connect() async {
    if (_status == ConnectionStatus.connected ||
        _status == ConnectionStatus.connecting) {
      return;
    }

    _intentionalClose = false;
    _setStatus(ConnectionStatus.connecting);

    // Network change detection disabled — was causing spurious reconnects.

    // Load trusted cert fingerprint for TOFU verification.
    if (_trustedFingerprint == null) await _loadTrustedFingerprint();

    // Send Wake-on-LAN if a MAC address is configured (best effort).
    final mac = config.macAddress;
    if (mac != null && mac.isNotEmpty) {
      try {
        await WolService.wake(mac, localHost: config.localHost);
      } catch (e) {
        _mqttLog('MQTT: Wake-on-LAN failed: $e');
      }
    }

    final clientId = await _getClientId();

    // Phase 1: race configured hosts (fast — just a TLS probe each).
    final hosts = _candidateHosts();
    _mqttLog(
        'MQTT: racing ${hosts.length} configured hosts: ${hosts.join(", ")}');
    onStatusDetail?.call('Connecting...');
    var winner = await _raceProbes(hosts);

    // Phase 2: if configured hosts failed, try Bonjour/subnet discovery.
    if (winner == null && !_intentionalClose) {
      _mqttLog('MQTT: configured hosts failed, trying discovery...');
      onStatusDetail?.call('Scanning network...');
      final discovered = await _discoverViaMdns();
      if (discovered != null) {
        _lastDiscoveredHost = discovered;
        winner = discovered;
      }
    }

    if (winner != null && !_intentionalClose) {
      connectedVia = _describeRoute(winner);
      connectedHost = winner;
      _mqttLog('MQTT: winner: $winner ($connectedVia), connecting...');
      onStatusDetail?.call('Connecting via $connectedVia...');

      if (await _tryConnect(winner, clientId, timeout: 5000)) return;

      // _tryConnect reports failure through its return value (it never
      // throws), so the stale host info has to be cleared here.
      _mqttLog('MQTT: connect to $winner failed');
      connectedHost = null;
      connectedVia = null;
    }

    // disconnect() was called while we were busy; it already reported the
    // final state, so do not flip the status back to "reconnecting".
    if (_intentionalClose) return;

    // All hosts failed — retry after delay.
    _mqttLog('MQTT: all attempts failed, retrying in 5s');
    onStatusDetail?.call('No server found, retrying...');
    _setStatus(ConnectionStatus.reconnecting);
    unawaited(Future<void>.delayed(const Duration(seconds: 5), () {
      if (!_intentionalClose && _status != ConnectionStatus.connected) {
        unawaited(connect());
      }
    }));
  }

  /// Configured hosts in probe order, without empty values or duplicates.
  List<String> _candidateHosts() {
    final hosts = <String>[];
    void add(String? host) {
      if (host != null && host.isNotEmpty && !hosts.contains(host)) {
        hosts.add(host);
      }
    }

    add(config.localHost);
    add(_lastDiscoveredHost);
    add(config.vpnHost);
    add(config.host);
    return hosts;
  }

  /// Probes all [hosts] concurrently and completes with the first one that
  /// answers, or with null when all probes fail or 3 s elapse.
  Future<String?> _raceProbes(List<String> hosts) {
    if (hosts.isEmpty) return Future<String?>.value();

    final winner = Completer<String?>();
    var pending = hosts.length;
    for (final host in hosts) {
      // _probeHost never throws, so the chained future cannot fail.
      unawaited(_probeHost(host, config.port).then((reachable) {
        pending--;
        if (winner.isCompleted) return;
        if (reachable != null) {
          winner.complete(reachable);
        } else if (pending <= 0) {
          winner.complete(null); // All failed.
        }
      }));
    }
    return winner.future.timeout(
      const Duration(seconds: 3),
      onTimeout: () => null,
    );
  }

  /// Label describing how [host] was chosen, shown to the user.
  String _describeRoute(String host) {
    if (host == config.localHost) return 'Local';
    if (host == config.vpnHost) return 'VPN';
    if (host == config.host) return 'Remote';
    if (host == _lastDiscoveredHost) return 'Discovered';
    return host;
  }

  /// Discovers AIBroker on the local network via Bonjour/mDNS.
  ///
  /// Falls back to a subnet scan if Bonjour yields nothing (iOS blocks mDNS
  /// on Personal Hotspot). Returns the host or null if nothing was found.
  /// [timeout] bounds the Bonjour phase only.
  Future<String?> _discoverViaMdns({
    Duration timeout = const Duration(seconds: 3),
  }) async {
    BonsoirDiscovery? discovery;
    StreamSubscription<Object?>? sub;
    try {
      discovery = BonsoirDiscovery(type: '_mqtt._tcp');
      await discovery.initialize();

      final resolved = Completer<String?>();
      sub = discovery.eventStream?.listen((event) {
        switch (event) {
          case BonsoirDiscoveryServiceResolvedEvent():
            final ip = event.service.host;
            _mqttLog(
                'MQTT: Bonjour resolved: ${event.service.name} at $ip:${event.service.port}');
            if (ip != null && ip.isNotEmpty && !resolved.isCompleted) {
              resolved.complete(ip);
            }
          case BonsoirDiscoveryServiceFoundEvent():
            _mqttLog('MQTT: Bonjour found: ${event.service.name}');
          default:
            break;
        }
      });

      await discovery.start();
      final ip = await resolved.future.timeout(timeout, onTimeout: () => null);
      if (ip != null) return ip;
    } catch (e) {
      _mqttLog('MQTT: Bonjour discovery error: $e');
    } finally {
      // Always release the subscription and the native discovery, even when
      // start() or the event stream failed. Cleanup errors are logged only,
      // so they can neither mask a found host nor escape this method.
      try {
        await sub?.cancel();
        await discovery?.stop();
      } catch (e) {
        _mqttLog('MQTT: Bonjour cleanup error: $e');
      }
    }

    // Fallback: scan local subnet for MQTT port (handles Personal Hotspot).
    _mqttLog('MQTT: Bonjour failed, trying subnet scan...');
    onStatusDetail?.call('Scanning local network...');
    return _scanSubnetForMqtt();
  }

  /// Scans the local subnet for an MQTT broker by probing the configured
  /// port. Useful when iOS Personal Hotspot blocks mDNS.
  ///
  /// For every non-loopback IPv4 address of this device the surrounding
  /// x.y.z.1–254 range is probed (the real netmask is not consulted).
  /// Returns the first host that accepts a TLS connection, or null.
  Future<String?> _scanSubnetForMqtt() async {
    try {
      // The device's own addresses determine which subnets to scan.
      final interfaces =
          await NetworkInterface.list(type: InternetAddressType.IPv4);
      for (final iface in interfaces) {
        for (final addr in iface.addresses) {
          final parts = addr.address.split('.');
          if (parts.length != 4) continue;
          if (parts[0] == '127') continue; // Skip loopback.

          final subnet = '${parts[0]}.${parts[1]}.${parts[2]}';
          _mqttLog('MQTT: scanning $subnet.0/24 on ${iface.name}');

          // Probe in batches of 20 to avoid flooding the network.
          // Early exit on the first batch that contains a hit.
          for (var first = 1; first <= 254; first += 20) {
            final last = first + 19 > 254 ? 254 : first + 19;
            final probes = <Future<String?>>[];
            for (var i = first; i <= last; i++) {
              final candidate = '$subnet.$i';
              if (candidate == addr.address) continue; // Not ourselves.
              probes.add(_probeHost(candidate, config.port));
            }
            for (final hit in await Future.wait(probes)) {
              if (hit != null) {
                _mqttLog('MQTT: subnet scan found broker at $hit');
                return hit;
              }
            }
          }
        }
      }
    } catch (e) {
      _mqttLog('MQTT: subnet scan error: $e');
    }
    return null;
  }

  // --- TOFU (Trust On First Use) certificate pinning ---

  /// Pinned SHA-256 certificate fingerprint, loaded from SharedPreferences.
  String? _trustedFingerprint;

  /// Loads the trusted cert fingerprint from storage.
  Future<void> _loadTrustedFingerprint() async {
    final prefs = await SharedPreferences.getInstance();
    final stored = prefs.getString('trustedCertFingerprint');
    _trustedFingerprint = stored;
    if (stored != null) {
      _mqttLog('TOFU: loaded trusted fingerprint: ${_abbrev(stored)}...');
    }
  }

  /// Computes the SHA-256 fingerprint (hex) of a certificate's DER bytes.
  String _certFingerprint(X509Certificate cert) =>
      sha256.convert(cert.der).toString();

  /// Pins [fingerprint] in memory and persists it in the background.
  void _pinFingerprint(String fingerprint) {
    _trustedFingerprint = fingerprint;
    _mqttLog(
        'TOFU: first connection, saved fingerprint: ${_abbrev(fingerprint)}...');
    unawaited(() async {
      try {
        final prefs = await SharedPreferences.getInstance();
        await prefs.setString('trustedCertFingerprint', fingerprint);
      } catch (e) {
        _mqttLog('TOFU: failed to persist fingerprint: $e');
      }
    }());
  }

  /// TOFU verification: accept on first use, reject if fingerprint changes.
  ///
  /// When no fingerprint is pinned yet the certificate is accepted and its
  /// fingerprint is handed to [onFirstUse], so the caller can pin it once the
  /// MQTT session is actually established. Without [onFirstUse] it is pinned
  /// immediately.
  bool _verifyCertTofu(
    dynamic certificate, {
    void Function(String fingerprint)? onFirstUse,
  }) {
    // NOTE(review): fails open for anything that is not an X509Certificate.
    // Kept as-is to avoid locking out connections; confirm that no platform
    // passes a different type here.
    if (certificate is! X509Certificate) return true;

    final fingerprint = _certFingerprint(certificate);
    final trusted = _trustedFingerprint;
    if (trusted == null) {
      if (onFirstUse != null) {
        _mqttLog(
            'TOFU: first connection, accepting fingerprint: ${_abbrev(fingerprint)}...');
        onFirstUse(fingerprint);
      } else {
        _pinFingerprint(fingerprint);
      }
      return true;
    }
    if (trusted == fingerprint) return true; // Known cert, trusted.

    // Fingerprint mismatch — possible MITM or server reinstall.
    // Reject the connection. User must reset trust in settings.
    _mqttLog(
        'TOFU: CERT MISMATCH! Expected ${_abbrev(trusted)}... got ${_abbrev(fingerprint)}...');
    return false;
  }

  /// Resets the trusted cert fingerprint (e.g., after server reinstall).
  Future<void> resetTrustedCert() async {
    _trustedFingerprint = null;
    final prefs = await SharedPreferences.getInstance();
    await prefs.remove('trustedCertFingerprint');
    _mqttLog('TOFU: trust reset');
  }

  /// Probes a single host:port with a TLS connection attempt (500 ms
  /// connect timeout). Uses [SecureSocket] since the broker requires TLS.
  ///
  /// Returns [host] when the handshake succeeds, otherwise null; never
  /// throws. Any certificate is accepted because no data is sent — the real
  /// connection in [_tryConnect] applies TOFU pinning.
  Future<String?> _probeHost(String host, int port) async {
    try {
      final socket = await SecureSocket.connect(
        host,
        port,
        timeout: const Duration(milliseconds: 500),
        onBadCertificate: (_) => true,
      );
      await socket.close();
      return host;
    } catch (_) {
      return null;
    }
  }

  /// Attempts one MQTT connection to [host] and returns whether it succeeded.
  ///
  /// Never throws. On success the client stays in [_client]; on failure the
  /// client is closed and [_client] is cleared, but only if it still refers
  /// to this attempt's client, so a newer attempt is never clobbered.
  Future<bool> _tryConnect(
    String host,
    String clientId, {
    int timeout = 5000,
  }) async {
    MqttServerClient? client;
    String? firstUseFingerprint;
    try {
      client = MqttServerClient.withPort(host, clientId, config.port);
      client.keepAlivePeriod = 30;
      // We handle reconnection ourselves (see didChangeAppLifecycleState);
      // the library's autoReconnect competes with it and causes flickering.
      client.autoReconnect = false;
      client.connectTimeoutPeriod = timeout;
      // client.maxConnectionAttempts is final — can't set it.
      client.logging(on: false);

      // TLS with TOFU (Trust On First Use) cert pinning.
      // First connection: accept the cert and remember its SHA-256
      // fingerprint; it is pinned below only after the broker accepted the
      // MQTT session, so an unrelated TLS server found by the subnet scan
      // cannot become the pinned identity.
      // Later connections: only accept certs matching the pinned value.
      client.secure = true;
      client.securityContext = SecurityContext(withTrustedRoots: true);
      client.onBadCertificate = (dynamic certificate) {
        return _verifyCertTofu(
          certificate,
          onFirstUse: (fingerprint) => firstUseFingerprint = fingerprint,
        );
      };

      client.onConnected = _onConnected;
      client.onDisconnected = _onDisconnected;
      client.onAutoReconnect = _onAutoReconnect;
      client.onAutoReconnected = _onAutoReconnected;

      // Clean session: we handle offline delivery ourselves via the catch_up
      // protocol. Persistent sessions make the broker flood all queued QoS 1
      // messages on reconnect, which overwhelms the client with large voice
      // payloads.
      client.connectionMessage = MqttConnectMessage()
          .withClientIdentifier(clientId)
          .startClean()
          .authenticateAs('pailot', config.mqttToken ?? '');

      // Set _client BEFORE connect() so _onConnected can subscribe.
      _client = client;

      _mqttLog(
          'MQTT: connecting to $host:${config.port} as $clientId (timeout=${timeout}ms)');
      final result = await client.connect().timeout(
        Duration(milliseconds: timeout + 1000),
        onTimeout: () {
          _mqttLog('MQTT: connect timed out for $host');
          return null;
        },
      );
      _mqttLog('MQTT: connect result=${result?.state}');

      if (result?.state == MqttConnectionState.connected) {
        final fingerprint = firstUseFingerprint;
        if (fingerprint != null && _trustedFingerprint == null) {
          _pinFingerprint(fingerprint);
        }
        return true;
      }
    } catch (e) {
      _mqttLog('MQTT: connect exception=$e');
      // A thrown connect never reached _onDisconnected before; keep the
      // callback behaviour unchanged while still releasing the client below.
      client?.onDisconnected = null;
    }

    if (client != null) {
      if (identical(_client, client)) {
        _client = null;
      } else {
        // A newer attempt replaced this client; closing it must not fire
        // _onDisconnected, which would cancel the newer subscription and
        // overwrite the newer status.
        client.onDisconnected = null;
      }
      try {
        client.disconnect();
      } catch (e) {
        _mqttLog('MQTT: error while closing failed client: $e');
      }
    }
    return false;
  }

  void _onConnected() {
    _mqttLog('MQTT: _onConnected fired');
    _setStatus(ConnectionStatus.connected);
    _subscribe();
    _listenMessages();
    onOpen?.call();
  }

  void _onDisconnected() {
    _updatesSub?.cancel();
    _updatesSub = null;

    if (_intentionalClose) {
      _setStatus(ConnectionStatus.disconnected);
      onClose?.call();
    } else {
      _setStatus(ConnectionStatus.reconnecting);
      onReconnecting?.call();
    }
  }

  void _onAutoReconnect() {
    _setStatus(ConnectionStatus.reconnecting);
    onReconnecting?.call();
  }

  void _onAutoReconnected() {
    _setStatus(ConnectionStatus.connected);
    _subscribe();
    _listenMessages();
    onOpen?.call();
  }

  /// Subscribes the current client to all inbound topics (QoS 1).
  void _subscribe() {
    final client = _client;
    if (client == null) {
      _mqttLog('MQTT: _subscribe called but client is null');
      return;
    }

    // Single outbound topic — all messages carry sessionId in payload.
    // Client routes messages to the correct session based on payload.
    _mqttLog('MQTT: subscribing to topics...');
    const topics = [
      'pailot/out',
      'pailot/sessions',
      'pailot/status',
      'pailot/projects',
      'pailot/control/out',
    ];
    for (final topic in topics) {
      client.subscribe(topic, MqttQos.atLeastOnce);
    }
  }

  /// (Re)attaches [_onMqttMessage] to the current client's update stream.
  void _listenMessages() {
    _updatesSub?.cancel();
    _updatesSub = _client?.updates?.listen(_onMqttMessage);
  }

  /// Decodes, de-duplicates and dispatches a batch of inbound messages.
  ///
  /// Payloads that are not a JSON object are skipped. A malformed message
  /// never prevents the remaining messages of the batch from being handled.
  void _onMqttMessage(List<MqttReceivedMessage<MqttMessage?>> messages) {
    _mqttLog('MQTT: received ${messages.length} message(s)');
    for (final msg in messages) {
      _mqttLog('MQTT: topic=${msg.topic}');
      final pubMsg = msg.payload;
      if (pubMsg is! MqttPublishMessage) continue;

      final Map<String, dynamic> json;
      try {
        // Payloads are UTF-8 JSON. Decoding byte-per-character would corrupt
        // every non-ASCII character (umlauts, emoji, ...).
        final text = utf8.decode(pubMsg.payload.message, allowMalformed: true);
        final decoded = jsonDecode(text);
        if (decoded is! Map<String, dynamic>) continue; // Not a JSON object.
        json = decoded;
      } on FormatException {
        continue; // Skip non-JSON.
      }

      final seq = json['seq'];
      final typeField = json['type'];
      final type = typeField is String ? typeField : '?';

      // Dedup by msgId.
      final msgId = json['msgId'];
      if (msgId is String) {
        if (!_seenMsgIds.add(msgId)) {
          TraceService.instance.addTrace(
            'MQTT deduped',
            'msgId=${_abbrev(msgId, 8)} type=$type seq=$seq topic=${msg.topic}',
          );
          continue;
        }
        _seenMsgIdOrder.add(msgId);
        _evictOldIds();
      }

      TraceService.instance.addTrace(
        'MQTT received',
        'seq=$seq type=$type on ${msg.topic}',
      );

      // Dispatch: parse topic to enrich the message with routing info.
      _dispatchMessage(msg.topic, json);
    }
  }

  /// Routes incoming MQTT messages to the [onMessage] callback.
  ///
  /// Translates the MQTT topic structure into the flat message format that
  /// chat_screen expects. Messages on unknown topics are ignored.
  void _dispatchMessage(String topic, Map<String, dynamic> json) {
    switch (topic) {
      case 'pailot/sessions':
        json['type'] = 'sessions';
      case 'pailot/status':
        json['type'] = 'status';
      case 'pailot/projects':
        json['type'] = 'projects';
      case 'pailot/control/out':
        // Command responses are forwarded unchanged.
        break;
      case 'pailot/out':
        // ALL content messages (text, voice, image, typing, screenshot,
        // transcript). Each carries its type and sessionId in the payload.
        final type = json['type'];
        if (type == 'typing') {
          // Normalize typing fields for chat_screen; default to "active".
          final active = json['active'];
          json['typing'] = active is bool ? active : true;
        } else if (type == 'screenshot') {
          // Normalize screenshot fields.
          json['data'] ??= json['imageBase64'];
        }
      default:
        return;
    }
    onMessage?.call(json);
  }

  /// Drops the oldest message IDs once more than [_maxSeenIds] are tracked.
  void _evictOldIds() {
    while (_seenMsgIdOrder.length > _maxSeenIds) {
      final oldest = _seenMsgIdOrder.removeAt(0);
      _seenMsgIds.remove(oldest);
    }
  }

  /// Generates a UUID v4 for message IDs.
  String _uuid() => const Uuid().v4();

  /// Current timestamp in milliseconds since the epoch.
  int _now() => DateTime.now().millisecondsSinceEpoch;

  /// Whether [client] exists and reports an established connection.
  bool _isLive(MqttServerClient? client) =>
      client?.connectionStatus?.state == MqttConnectionState.connected;

  /// Encodes [payload] as UTF-8 JSON bytes ready for publishing.
  Uint8Buffer _encodeJson(Map<String, dynamic> payload) =>
      Uint8Buffer()..addAll(utf8.encode(jsonEncode(payload)));

  /// Publishes raw bytes to a topic. Used by TraceService for log streaming.
  ///
  /// Silently does nothing when not connected or when publishing fails.
  void publishRaw(String topic, Uint8Buffer payload, MqttQos qos) {
    final client = _client;
    if (client == null || !_isLive(client)) return;
    try {
      client.publishMessage(topic, qos, payload);
    } catch (_) {
      // Deliberately silent: logging here would create new trace entries,
      // which are themselves streamed through this method.
    }
  }

  /// Publishes a JSON payload to an MQTT topic.
  ///
  /// Reports failures through [onError] instead of throwing.
  void _publish(String topic, Map<String, dynamic> payload, MqttQos qos) {
    final client = _client;
    if (client == null || !_isLive(client)) {
      onError?.call('Not connected');
      return;
    }
    try {
      client.publishMessage(topic, qos, _encodeJson(payload));
    } catch (e) {
      onError?.call('Send failed: $e');
    }
  }

  /// Sends a message — routes to the appropriate MQTT topic based on content.
  ///
  /// * `type == 'command'` (or a `command` key without a type) goes to
  ///   `pailot/control/in`; `args` entries are merged into the payload.
  /// * `tts` goes to `pailot/control/in` as a `tts` command.
  /// * `voice`, `image`, `bundle`, `file` and everything else (sent as
  ///   `text`) go to `pailot/<sessionId>/in`.
  ///
  /// Every non-command message needs a `sessionId`; otherwise [onError] is
  /// called and nothing is sent.
  void send(Map<String, dynamic> message) {
    final type = message['type'] as String?;
    final sessionId = message['sessionId'] as String?;

    if (type == 'command' || (type == null && message.containsKey('command'))) {
      final args = message['args'] as Map<String, dynamic>? ??
          const <String, dynamic>{};
      _publish(
        'pailot/control/in',
        <String, dynamic>{
          'msgId': _uuid(),
          'type': 'command',
          'command': message['command'] as String? ?? '',
          'ts': _now(),
          ...args,
        },
        MqttQos.atLeastOnce,
      );
      return;
    }

    if (sessionId == null) {
      onError?.call('Cannot send message: missing sessionId');
      return;
    }

    if (type == 'tts') {
      // TTS request — routed as a command.
      _publish(
        'pailot/control/in',
        <String, dynamic>{
          'msgId': _uuid(),
          'type': 'command',
          'command': 'tts',
          'text': message['text'] ?? '',
          'sessionId': sessionId,
          'ts': _now(),
        },
        MqttQos.atLeastOnce,
      );
      return;
    }

    // Common envelope for everything published to the session topic.
    Map<String, dynamic> envelope(String kind, Map<String, dynamic> fields) =>
        <String, dynamic>{
          'msgId': _uuid(),
          'type': kind,
          'sessionId': sessionId,
          ...fields,
          'ts': _now(),
        };

    final payload = switch (type) {
      'voice' => envelope('voice', {
          'audioBase64': message['audioBase64'] ?? '',
          'messageId': message['messageId'] ?? '',
        }),
      'image' => envelope('image', {
          'imageBase64': message['imageBase64'] ?? '',
          'mimeType': message['mimeType'] ?? 'image/jpeg',
          'caption': message['caption'] ?? '',
        }),
      // Atomic multi-attachment message.
      'bundle' => envelope('bundle', {
          'caption': message['caption'] ?? '',
          if (message['audioBase64'] != null)
            'audioBase64': message['audioBase64'],
          if (message['voiceMessageId'] != null)
            'voiceMessageId': message['voiceMessageId'],
          'attachments': message['attachments'] ?? [],
        }),
      'file' => envelope('file', {
          'fileBase64': message['fileBase64'] ?? '',
          'fileName': message['fileName'] ?? 'file',
          'mimeType': message['mimeType'] ?? 'application/octet-stream',
          'fileSize': message['fileSize'] ?? 0,
        }),
      // Default: plain text message (content + sessionId).
      _ => envelope('text', {
          'content': message['content'] as String? ?? '',
        }),
    };
    _publish('pailot/$sessionId/in', payload, MqttQos.atLeastOnce);
  }

  /// Publishes the APNs device token to the daemon for push notification
  /// delivery.
  ///
  /// The daemon stores it in ~/.aibroker/apns-tokens.json and uses it when
  /// no MQTT clients are connected (app is backgrounded or offline).
  /// Silently does nothing when not connected.
  void sendDeviceToken(String token) {
    final client = _client;
    if (client == null || !_isLive(client)) return;
    try {
      // jsonEncode (rather than string interpolation) keeps the payload
      // valid JSON whatever characters the token contains.
      final payload = _encodeJson(<String, dynamic>{
        'token': token,
        'ts': _now(),
      });
      client.publishMessage('pailot/device/token', MqttQos.atLeastOnce, payload);
      _mqttLog('Push: device token published to pailot/device/token');
    } catch (e) {
      _mqttLog('Push: failed to publish device token: $e');
    }
  }

  /// Disconnects intentionally; no automatic reconnect will follow.
  ///
  /// [onClose] is called exactly once per call.
  void disconnect() {
    _intentionalClose = true;
    _connectivitySub?.cancel();
    _connectivitySub = null;
    _dropClient();
    _setStatus(ConnectionStatus.disconnected);
    onClose?.call();
  }

  /// Updates the config and reconnects.
  Future<void> updateConfig(ServerConfig newConfig) async {
    config = newConfig;
    disconnect();
    await Future<void>.delayed(const Duration(milliseconds: 100));
    await connect();
  }

  /// Disposes all resources and unregisters the lifecycle observer.
  void dispose() {
    WidgetsBinding.instance.removeObserver(this);
    disconnect();
  }

  // App lifecycle integration

  /// Rebuilds the connection whenever the app returns to the foreground,
  /// unless the user disconnected on purpose. Other states are ignored.
  @override
  void didChangeAppLifecycleState(AppLifecycleState state) {
    if (state != AppLifecycleState.resumed || _intentionalClose) return;
    _mqttLog('MQTT: app resumed — reconnecting to last host');
    // Kill the old client completely, then reconnect (fast path first).
    _restartConnection();
  }
}