import 'dart:async'; import 'dart:convert'; import 'dart:io'; import 'dart:typed_data'; import 'package:crypto/crypto.dart'; import 'package:bonsoir/bonsoir.dart'; import 'package:flutter/widgets.dart'; import 'package:path_provider/path_provider.dart' as pp; import 'package:mqtt_client/mqtt_client.dart'; import 'package:mqtt_client/mqtt_server_client.dart'; import 'package:shared_preferences/shared_preferences.dart'; import 'package:uuid/uuid.dart'; import '../models/server_config.dart'; import 'wol_service.dart'; /// Connection status for the MQTT client. enum ConnectionStatus { disconnected, connecting, connected, reconnecting, } // Debug log to file (survives release builds) Future _mqttLog(String msg) async { try { final dir = await pp.getApplicationDocumentsDirectory(); final file = File('${dir.path}/mqtt_debug.log'); final ts = DateTime.now().toIso8601String().substring(11, 19); await file.writeAsString('[$ts] $msg\n', mode: FileMode.append); } catch (_) {} } /// MQTT client for PAILot. /// /// Connects to the AIBroker daemon's embedded aedes broker. /// Subscribes to all pailot/ topics and dispatches messages /// through the onMessage callback interface. class MqttService with WidgetsBindingObserver { MqttService({required this.config}); ServerConfig config; MqttServerClient? _client; ConnectionStatus _status = ConnectionStatus.disconnected; bool _intentionalClose = false; String? _clientId; String? _lastDiscoveredHost; StreamSubscription? _updatesSub; // Message deduplication final Set _seenMsgIds = {}; final List _seenMsgIdOrder = []; static const int _maxSeenIds = 500; // Callbacks void Function(ConnectionStatus status)? onStatusChanged; void Function(Map message)? onMessage; void Function()? onOpen; void Function()? onClose; void Function()? onReconnecting; void Function()? onResume; void Function(String error)? onError; ConnectionStatus get status => _status; bool get isConnected => _status == ConnectionStatus.connected; void _setStatus(ConnectionStatus newStatus) { if (_status == newStatus) return; _status = newStatus; onStatusChanged?.call(newStatus); } /// Get or create a persistent client ID for this device. Future _getClientId() async { if (_clientId != null) return _clientId!; final prefs = await SharedPreferences.getInstance(); var id = prefs.getString('mqtt_client_id'); // Regenerate if old format (too long for MQTT 3.1.1) if (id == null || id.length > 23) { // MQTT 3.1.1 client IDs: max 23 chars, alphanumeric id = 'pailot${const Uuid().v4().replaceAll('-', '').substring(0, 16)}'; await prefs.setString('mqtt_client_id', id); } _clientId = id; return id; } /// Connect to the MQTT broker. /// Tries local host first (2.5s timeout), then remote host. Future connect() async { if (_status == ConnectionStatus.connected || _status == ConnectionStatus.connecting) { return; } _intentionalClose = false; _setStatus(ConnectionStatus.connecting); // Load trusted cert fingerprint for TOFU verification if (_trustedFingerprint == null) await _loadTrustedFingerprint(); // Send Wake-on-LAN if MAC configured if (config.macAddress != null && config.macAddress!.isNotEmpty) { try { await WolService.wake(config.macAddress!, localHost: config.localHost); } catch (_) {} } final clientId = await _getClientId(); // Connection order: local → cached discovery → Bonjour/scan → VPN → remote final attempts = >[]; // host → timeout ms if (config.localHost != null && config.localHost!.isNotEmpty) { attempts.add(MapEntry(config.localHost!, 2500)); } // Try cached discovered host before scanning again if (_lastDiscoveredHost != null) { attempts.add(MapEntry(_lastDiscoveredHost!, 3000)); } if (config.vpnHost != null && config.vpnHost!.isNotEmpty) { attempts.add(MapEntry(config.vpnHost!, 3000)); } if (config.host.isNotEmpty) { attempts.add(MapEntry(config.host, 5000)); } _mqttLog('MQTT: attempts=${attempts.map((e) => e.key).join(", ")} port=${config.port}'); for (final attempt in attempts) { if (_intentionalClose) return; _mqttLog('MQTT: trying ${attempt.key}:${config.port}'); try { if (await _tryConnect(attempt.key, clientId, timeout: attempt.value)) return; } catch (e) { _mqttLog('MQTT: ${attempt.key} error=$e'); } } // All configured hosts failed — try Bonjour/subnet scan (only once, not on retry) if (_lastDiscoveredHost == null && !_intentionalClose) { _mqttLog('MQTT: trying Bonjour/subnet discovery...'); final discovered = await _discoverViaMdns(); if (discovered != null && !_intentionalClose) { _lastDiscoveredHost = discovered; _mqttLog('MQTT: discovered $discovered, connecting...'); try { if (await _tryConnect(discovered, clientId, timeout: 3000)) return; } catch (e) { _mqttLog('MQTT: discovered host $discovered error=$e'); } } else { _mqttLog('MQTT: discovery returned nothing'); } } // All hosts failed — retry after delay _mqttLog('MQTT: all attempts failed, retrying in 5s'); _setStatus(ConnectionStatus.reconnecting); Future.delayed(const Duration(seconds: 5), () { if (!_intentionalClose && _status != ConnectionStatus.connected) { connect(); } }); } /// Discover AIBroker on local network via Bonjour/mDNS. /// Falls back to subnet scan if Bonjour fails (iOS blocks mDNS on Personal Hotspot). /// Returns the IP address or null if not found within timeout. Future _discoverViaMdns({Duration timeout = const Duration(seconds: 3)}) async { // Try Bonjour first try { final discovery = BonsoirDiscovery(type: '_mqtt._tcp'); await discovery.initialize(); final completer = Completer(); StreamSubscription? sub; sub = discovery.eventStream?.listen((event) { switch (event) { case BonsoirDiscoveryServiceResolvedEvent(): final ip = event.service.host; _mqttLog('MQTT: Bonjour resolved: ${event.service.name} at $ip:${event.service.port}'); if (ip != null && ip.isNotEmpty && !completer.isCompleted) { completer.complete(ip); } case BonsoirDiscoveryServiceFoundEvent(): _mqttLog('MQTT: Bonjour found: ${event.service.name}'); default: break; } }); await discovery.start(); final ip = await completer.future.timeout(timeout, onTimeout: () => null); await sub?.cancel(); await discovery.stop(); if (ip != null) return ip; } catch (e) { _mqttLog('MQTT: Bonjour discovery error: $e'); } // Fallback: scan local subnet for MQTT port (handles Personal Hotspot) _mqttLog('MQTT: Bonjour failed, trying subnet scan...'); return _scanSubnetForMqtt(); } /// Scan the local subnet for an MQTT broker by probing the configured port. /// Useful when iOS Personal Hotspot blocks mDNS. Future _scanSubnetForMqtt() async { try { // Get device's own IP to determine the subnet final interfaces = await NetworkInterface.list(type: InternetAddressType.IPv4); for (final iface in interfaces) { for (final addr in iface.addresses) { final parts = addr.address.split('.'); if (parts.length != 4) continue; // Skip loopback if (parts[0] == '127') continue; // Only scan small subnets (hotspot = /28, max 14 hosts) final subnet = '${parts[0]}.${parts[1]}.${parts[2]}'; _mqttLog('MQTT: scanning $subnet.0/24 on ${iface.name}'); // Probe all hosts in parallel — 1s timeout each, runs concurrently final futures = >[]; for (int i = 1; i <= 254; i++) { final probe = '$subnet.$i'; if (probe == addr.address) continue; // skip self futures.add(_probeHost(probe, config.port)); } final results = await Future.wait(futures); final found = results.firstWhere((r) => r != null, orElse: () => null); if (found != null) { _mqttLog('MQTT: subnet scan found broker at $found'); return found; } } } } catch (e) { _mqttLog('MQTT: subnet scan error: $e'); } return null; } // --- TOFU (Trust On First Use) certificate pinning --- String? _trustedFingerprint; // Loaded from SharedPreferences at startup /// Load the trusted cert fingerprint from storage. Future _loadTrustedFingerprint() async { final prefs = await SharedPreferences.getInstance(); _trustedFingerprint = prefs.getString('trustedCertFingerprint'); if (_trustedFingerprint != null) { _mqttLog('TOFU: loaded trusted fingerprint: ${_trustedFingerprint!.substring(0, 16)}...'); } } /// Compute SHA-256 fingerprint of a certificate's DER bytes. String _certFingerprint(X509Certificate cert) { final der = cert.der; final digest = sha256.convert(der); return digest.toString(); } /// TOFU verification: accept on first use, reject if fingerprint changes. bool _verifyCertTofu(dynamic certificate) { if (certificate is! X509Certificate) return true; // Can't verify, accept final fingerprint = _certFingerprint(certificate); if (_trustedFingerprint == null) { // First connection — trust and save _trustedFingerprint = fingerprint; SharedPreferences.getInstance().then((prefs) { prefs.setString('trustedCertFingerprint', fingerprint); }); _mqttLog('TOFU: first connection, saved fingerprint: ${fingerprint.substring(0, 16)}...'); return true; } if (_trustedFingerprint == fingerprint) { return true; // Known cert, trusted } // Fingerprint mismatch — possible MITM or server reinstall _mqttLog('TOFU: CERT MISMATCH! Expected ${_trustedFingerprint!.substring(0, 16)}... got ${fingerprint.substring(0, 16)}...'); // Reject the connection. User must reset trust in settings. return false; } /// Reset the trusted cert fingerprint (e.g., after server reinstall). Future resetTrustedCert() async { _trustedFingerprint = null; final prefs = await SharedPreferences.getInstance(); await prefs.remove('trustedCertFingerprint'); _mqttLog('TOFU: trust reset'); } /// Probe a single host:port with a TLS connection attempt (1s timeout). /// Uses SecureSocket since the broker now requires TLS. Future _probeHost(String host, int port) async { try { final socket = await SecureSocket.connect( host, port, timeout: const Duration(seconds: 1), onBadCertificate: (_) => true, // Accept self-signed cert during scan ); await socket.close(); return host; } catch (_) { return null; } } Future _tryConnect(String host, String clientId, {int timeout = 5000}) async { try { final client = MqttServerClient.withPort(host, clientId, config.port); client.keepAlivePeriod = 30; client.autoReconnect = false; // Don't auto-reconnect during trial — enable after success client.connectTimeoutPeriod = timeout; // client.maxConnectionAttempts is final — can't set it client.logging(on: false); // TLS with TOFU (Trust On First Use) cert pinning. // First connection: accept cert, save its SHA-256 fingerprint. // Future connections: only accept certs matching the saved fingerprint. client.secure = true; client.securityContext = SecurityContext(withTrustedRoots: true); client.onBadCertificate = (dynamic certificate) { return _verifyCertTofu(certificate); }; client.onConnected = _onConnected; client.onDisconnected = _onDisconnected; client.onAutoReconnect = _onAutoReconnect; client.onAutoReconnected = _onAutoReconnected; // Clean session: we handle offline delivery ourselves via catch_up protocol. // Persistent sessions cause the broker to flood all queued QoS 1 messages // on reconnect, which overwhelms the client with large voice payloads. final connMessage = MqttConnectMessage() .withClientIdentifier(clientId) .startClean() .authenticateAs('pailot', config.mqttToken ?? ''); client.connectionMessage = connMessage; // Set _client BEFORE connect() so _onConnected can subscribe _client = client; _mqttLog('MQTT: connecting to $host:${config.port} as $clientId (timeout=${timeout}ms)'); final result = await client.connect().timeout( Duration(milliseconds: timeout + 1000), onTimeout: () { _mqttLog('MQTT: connect timed out for $host'); return null; }, ); _mqttLog('MQTT: connect result=${result?.state}'); if (result?.state == MqttConnectionState.connected) { client.autoReconnect = true; // Now enable auto-reconnect for the live connection return true; } _client = null; client.disconnect(); return false; } catch (e) { _mqttLog('MQTT: connect exception=$e'); return false; } } void _onConnected() { _mqttLog('MQTT: _onConnected fired'); _setStatus(ConnectionStatus.connected); _subscribe(); _listenMessages(); onOpen?.call(); } void _onDisconnected() { _updatesSub?.cancel(); _updatesSub = null; if (_intentionalClose) { _setStatus(ConnectionStatus.disconnected); onClose?.call(); } else { _setStatus(ConnectionStatus.reconnecting); onReconnecting?.call(); } } void _onAutoReconnect() { _setStatus(ConnectionStatus.reconnecting); onReconnecting?.call(); } void _onAutoReconnected() { _setStatus(ConnectionStatus.connected); _subscribe(); _listenMessages(); onOpen?.call(); } void _subscribe() { final client = _client; if (client == null) { _mqttLog('MQTT: _subscribe called but client is null'); return; } _mqttLog('MQTT: subscribing to topics...'); client.subscribe('pailot/sessions', MqttQos.atLeastOnce); client.subscribe('pailot/status', MqttQos.atLeastOnce); client.subscribe('pailot/projects', MqttQos.atLeastOnce); client.subscribe('pailot/+/out', MqttQos.atLeastOnce); client.subscribe('pailot/+/typing', MqttQos.atMostOnce); client.subscribe('pailot/+/screenshot', MqttQos.atLeastOnce); client.subscribe('pailot/control/out', MqttQos.atLeastOnce); client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce); } void _listenMessages() { _updatesSub?.cancel(); _updatesSub = _client?.updates?.listen(_onMqttMessage); } void _onMqttMessage(List> messages) { _mqttLog('MQTT: received ${messages.length} message(s)'); for (final msg in messages) { _mqttLog('MQTT: topic=${msg.topic}'); final pubMsg = msg.payload as MqttPublishMessage; final payload = MqttPublishPayload.bytesToStringAsString( pubMsg.payload.message, ); Map json; try { json = jsonDecode(payload) as Map; } catch (_) { continue; // Skip non-JSON } // Dedup by msgId final msgId = json['msgId'] as String?; if (msgId != null) { if (_seenMsgIds.contains(msgId)) continue; _seenMsgIds.add(msgId); _seenMsgIdOrder.add(msgId); _evictOldIds(); } // Dispatch: parse topic to enrich the message with routing info _dispatchMessage(msg.topic, json); } } /// Route incoming MQTT messages to the onMessage callback. /// Translates MQTT topic structure into the flat message format /// that chat_screen expects. void _dispatchMessage(String topic, Map json) { final parts = topic.split('/'); // pailot/sessions if (topic == 'pailot/sessions') { json['type'] = 'sessions'; onMessage?.call(json); return; } // pailot/status if (topic == 'pailot/status') { json['type'] = 'status'; onMessage?.call(json); return; } // pailot/projects if (topic == 'pailot/projects') { json['type'] = 'projects'; onMessage?.call(json); return; } // pailot/control/out — command responses (session_switched, session_renamed, error, unread) if (topic == 'pailot/control/out') { onMessage?.call(json); return; } // pailot/voice/transcript if (topic == 'pailot/voice/transcript') { json['type'] = 'transcript'; onMessage?.call(json); return; } // pailot//out — text, voice, image messages if (parts.length == 3 && parts[2] == 'out') { final sessionId = parts[1]; json['sessionId'] ??= sessionId; onMessage?.call(json); return; } // pailot//typing if (parts.length == 3 && parts[2] == 'typing') { final sessionId = parts[1]; json['type'] = 'typing'; json['sessionId'] ??= sessionId; // Map 'active' field to the 'typing'/'isTyping' fields chat_screen expects final active = json['active'] as bool? ?? true; json['typing'] = active; onMessage?.call(json); return; } // pailot//screenshot if (parts.length == 3 && parts[2] == 'screenshot') { final sessionId = parts[1]; json['type'] = 'screenshot'; json['sessionId'] ??= sessionId; // Map imageBase64 to 'data' for compatibility with chat_screen handler json['data'] ??= json['imageBase64']; onMessage?.call(json); return; } } void _evictOldIds() { while (_seenMsgIdOrder.length > _maxSeenIds) { final oldest = _seenMsgIdOrder.removeAt(0); _seenMsgIds.remove(oldest); } } /// Generate a UUID v4 for message IDs. String _uuid() => const Uuid().v4(); /// Current timestamp in milliseconds. int _now() => DateTime.now().millisecondsSinceEpoch; /// Publish a JSON payload to an MQTT topic. void _publish(String topic, Map payload, MqttQos qos) { final client = _client; if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) { onError?.call('Not connected'); return; } try { final builder = MqttClientPayloadBuilder(); builder.addString(jsonEncode(payload)); client.publishMessage(topic, qos, builder.payload!); } catch (e) { onError?.call('Send failed: $e'); } } /// Send a message — routes to the appropriate MQTT topic based on content. void send(Map message) { final type = message['type'] as String?; final sessionId = message['sessionId'] as String?; if (type == 'command' || (message.containsKey('command') && type == null)) { // Command messages go to pailot/control/in final command = message['command'] as String? ?? ''; final args = message['args'] as Map? ?? {}; final payload = { 'msgId': _uuid(), 'type': 'command', 'command': command, 'ts': _now(), ...args, }; _publish('pailot/control/in', payload, MqttQos.atLeastOnce); return; } if (type == 'voice' && sessionId != null) { // Voice message _publish('pailot/$sessionId/in', { 'msgId': _uuid(), 'type': 'voice', 'sessionId': sessionId, 'audioBase64': message['audioBase64'] ?? '', 'messageId': message['messageId'] ?? '', 'ts': _now(), }, MqttQos.atLeastOnce); return; } if (type == 'image' && sessionId != null) { _publish('pailot/$sessionId/in', { 'msgId': _uuid(), 'type': 'image', 'sessionId': sessionId, 'imageBase64': message['imageBase64'] ?? '', 'mimeType': message['mimeType'] ?? 'image/jpeg', 'caption': message['caption'] ?? '', 'ts': _now(), }, MqttQos.atLeastOnce); return; } if (type == 'bundle' && sessionId != null) { // Atomic multi-attachment message _publish('pailot/$sessionId/in', { 'msgId': _uuid(), 'type': 'bundle', 'sessionId': sessionId, 'caption': message['caption'] ?? '', if (message['audioBase64'] != null) 'audioBase64': message['audioBase64'], if (message['voiceMessageId'] != null) 'voiceMessageId': message['voiceMessageId'], 'attachments': message['attachments'] ?? [], 'ts': _now(), }, MqttQos.atLeastOnce); return; } if (type == 'file' && sessionId != null) { _publish('pailot/$sessionId/in', { 'msgId': _uuid(), 'type': 'file', 'sessionId': sessionId, 'fileBase64': message['fileBase64'] ?? '', 'fileName': message['fileName'] ?? 'file', 'mimeType': message['mimeType'] ?? 'application/octet-stream', 'fileSize': message['fileSize'] ?? 0, 'ts': _now(), }, MqttQos.atLeastOnce); return; } if (type == 'tts' && sessionId != null) { // TTS request — route as command _publish('pailot/control/in', { 'msgId': _uuid(), 'type': 'command', 'command': 'tts', 'text': message['text'] ?? '', 'sessionId': sessionId, 'ts': _now(), }, MqttQos.atLeastOnce); return; } // Default: plain text message (content + sessionId) if (sessionId != null) { final content = message['content'] as String? ?? ''; _publish('pailot/$sessionId/in', { 'msgId': _uuid(), 'type': 'text', 'sessionId': sessionId, 'content': content, 'ts': _now(), }, MqttQos.atLeastOnce); return; } onError?.call('Cannot send message: missing sessionId'); } /// Disconnect intentionally. void disconnect() { _intentionalClose = true; _updatesSub?.cancel(); _updatesSub = null; try { _client?.disconnect(); } catch (_) {} _client = null; _setStatus(ConnectionStatus.disconnected); onClose?.call(); } /// Update config and reconnect. Future updateConfig(ServerConfig newConfig) async { config = newConfig; disconnect(); await Future.delayed(const Duration(milliseconds: 100)); await connect(); } /// Dispose all resources. void dispose() { disconnect(); } // App lifecycle integration @override void didChangeAppLifecycleState(AppLifecycleState state) { switch (state) { case AppLifecycleState.resumed: if (_intentionalClose) break; _mqttLog('MQTT: app resumed, status=$_status client=${_client != null} mqttState=${_client?.connectionStatus?.state}'); final client = _client; if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) { // Clearly disconnected — just reconnect _mqttLog('MQTT: not connected on resume, reconnecting...'); _client = null; _setStatus(ConnectionStatus.reconnecting); connect(); } else { // Appears connected — notify listener to fetch missed messages // via catch_up. Don't call onOpen (it resets sessionReady and causes flicker). _mqttLog('MQTT: appears connected on resume, triggering catch_up'); onResume?.call(); } case AppLifecycleState.paused: break; default: break; } } }