import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:flutter/widgets.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:path_provider/path_provider.dart' as pp;
import 'package:shared_preferences/shared_preferences.dart';
import 'package:uuid/uuid.dart';

import '../models/server_config.dart';
import 'wol_service.dart';

/// Connection status for the MQTT client.
enum ConnectionStatus {
  disconnected,
  connecting,
  connected,
  reconnecting,
}

/// Cached handle to the debug log file, resolved on the first log call.
File? _mqttLogFile;

/// Appends a timestamped line to `mqtt_debug.log` in the app documents
/// directory (survives release builds).
///
/// Logging is best-effort: any failure is deliberately swallowed so that a
/// logging problem can never break the connection logic.
Future<void> _mqttLog(String msg) async {
  try {
    var file = _mqttLogFile;
    if (file == null) {
      final dir = await pp.getApplicationDocumentsDirectory();
      file = _mqttLogFile = File('${dir.path}/mqtt_debug.log');
    }
    final ts = DateTime.now().toIso8601String().substring(11, 19);
    await file.writeAsString('[$ts] $msg\n', mode: FileMode.append);
  } catch (_) {
    // Intentionally ignored — see doc comment.
  }
}

/// MQTT client for PAILot.
///
/// Connects to the AIBroker daemon's embedded aedes broker.
/// Subscribes to all pailot/ topics and dispatches messages
/// through the [onMessage] callback interface.
class MqttService with WidgetsBindingObserver {
  MqttService({required this.config});

  /// Maximum number of message IDs remembered for deduplication.
  static const int _maxSeenIds = 500;

  /// Delay before retrying after every host failed.
  static const Duration _retryDelay = Duration(seconds: 5);

  /// Topic filters subscribed on every (re)connect, with their QoS.
  static const Map<String, MqttQos> _subscriptions = {
    'pailot/sessions': MqttQos.atLeastOnce,
    'pailot/status': MqttQos.atLeastOnce,
    'pailot/projects': MqttQos.atLeastOnce,
    'pailot/+/out': MqttQos.atLeastOnce,
    'pailot/+/typing': MqttQos.atMostOnce,
    'pailot/+/screenshot': MqttQos.atLeastOnce,
    'pailot/control/out': MqttQos.atLeastOnce,
    'pailot/voice/transcript': MqttQos.atLeastOnce,
  };

  ServerConfig config;

  MqttServerClient? _client;
  ConnectionStatus _status = ConnectionStatus.disconnected;
  bool _intentionalClose = false;
  String? _clientId;
  StreamSubscription<dynamic>? _updatesSub;

  /// Pending "all hosts failed" retry, cancelled by [disconnect]/[connect].
  Timer? _retryTimer;

  /// Number of [connect] calls currently past their entry guard.
  int _activeConnects = 0;

  /// Set by [_onDisconnected] when it has already reported an intentional
  /// close, so [disconnect] does not fire [onClose] a second time.
  bool _closeNotified = false;

  // Message deduplication. A set literal is insertion-ordered, so the first
  // element is always the oldest ID.
  final Set<String> _seenMsgIds = <String>{};

  // Callbacks
  void Function(ConnectionStatus status)? onStatusChanged;
  void Function(Map<String, dynamic> message)? onMessage;
  void Function()? onOpen;
  void Function()? onClose;
  void Function()? onReconnecting;
  void Function()? onResume;
  void Function(String error)? onError;

  /// The current connection status.
  ConnectionStatus get status => _status;

  /// Whether the service currently reports [ConnectionStatus.connected].
  bool get isConnected => _status == ConnectionStatus.connected;

  /// Updates [_status] and notifies [onStatusChanged] on actual changes only.
  void _setStatus(ConnectionStatus newStatus) {
    if (_status == newStatus) return;
    _status = newStatus;
    onStatusChanged?.call(newStatus);
  }

  /// Gets or creates a persistent client ID for this device.
  Future<String> _getClientId() async {
    final cached = _clientId;
    if (cached != null) return cached;

    final prefs = await SharedPreferences.getInstance();
    var id = prefs.getString('mqtt_client_id');
    // Regenerate if old format (too long for MQTT 3.1.1).
    if (id == null || id.length > 23) {
      // MQTT 3.1.1 client IDs: max 23 chars, alphanumeric.
      final hex = const Uuid().v4().replaceAll('-', '').substring(0, 16);
      id = 'pailot$hex';
      await prefs.setString('mqtt_client_id', id);
    }
    _clientId = id;
    return id;
  }

  /// Connects to the MQTT broker.
  ///
  /// Tries the local host first (2.5s timeout) when one is configured, then
  /// the remote host. If every host fails, a retry is scheduled after
  /// [_retryDelay]. Does nothing when already connected or connecting.
  Future<void> connect() async {
    if (_status == ConnectionStatus.connected ||
        _status == ConnectionStatus.connecting) {
      return;
    }

    _intentionalClose = false;
    _retryTimer?.cancel();
    _retryTimer = null;
    _setStatus(ConnectionStatus.connecting);

    _activeConnects++;
    try {
      // Send Wake-on-LAN if MAC configured (best-effort).
      final mac = config.macAddress;
      if (mac != null && mac.isNotEmpty) {
        try {
          await WolService.wake(mac, localHost: config.localHost);
        } catch (e) {
          _mqttLog('MQTT: wake-on-LAN failed: $e');
        }
      }

      final clientId = await _getClientId();
      final hosts = _getHosts();
      _mqttLog('MQTT: hosts=${hosts.join(", ")} port=${config.port}');

      for (final host in hosts) {
        if (_intentionalClose) return;

        _mqttLog('MQTT: trying $host:${config.port}');
        // Only the first of several hosts gets the short timeout.
        final isFirstOfMany = host == hosts.first && hosts.length > 1;
        try {
          final connected = await _tryConnect(
            host,
            clientId,
            timeout: isFirstOfMany ? 2500 : 5000,
          );
          _mqttLog('MQTT: $host result=$connected');
          if (connected) return;
        } catch (e) {
          _mqttLog('MQTT: $host error=$e');
        }
      }

      // disconnect() was called during the last attempt: do not overwrite the
      // "disconnected" status or schedule a pointless retry.
      if (_intentionalClose) return;

      // All hosts failed — retry after delay.
      _mqttLog('MQTT: all hosts failed, retrying in 5s');
      _setStatus(ConnectionStatus.reconnecting);
      _retryTimer?.cancel();
      _retryTimer = Timer(_retryDelay, () {
        _retryTimer = null;
        if (!_intentionalClose && _status != ConnectionStatus.connected) {
          unawaited(connect());
        }
      });
    } finally {
      _activeConnects--;
    }
  }

  /// Returns `[localHost, remoteHost]` for dual-connect attempts, or just the
  /// remote host when no distinct local host is configured.
  List<String> _getHosts() {
    final local = config.localHost;
    if (local != null && local.isNotEmpty && local != config.host) {
      return [local, config.host];
    }
    return [config.host];
  }

  /// Attempts a single connection to [host] and returns whether it succeeded.
  ///
  /// [timeout] is in milliseconds. On failure the trial client is
  /// disconnected and never left behind in [_client].
  Future<bool> _tryConnect(
    String host,
    String clientId, {
    int timeout = 5000,
  }) async {
    final client = MqttServerClient.withPort(host, clientId, config.port);
    try {
      client.keepAlivePeriod = 30;
      // Don't auto-reconnect during trial — enable after success.
      client.autoReconnect = false;
      client.connectTimeoutPeriod = timeout;
      // client.maxConnectionAttempts is final — can't set it.
      client.logging(on: false);

      client.onConnected = _onConnected;
      client.onDisconnected = _onDisconnected;
      client.onAutoReconnect = _onAutoReconnect;
      client.onAutoReconnected = _onAutoReconnected;

      // Clean session: we handle offline delivery ourselves via catch_up
      // protocol. Persistent sessions cause the broker to flood all queued
      // QoS 1 messages on reconnect, which overwhelms the client with large
      // voice payloads.
      client.connectionMessage = MqttConnectMessage()
          .withClientIdentifier(clientId)
          .startClean()
          .authenticateAs('pailot', config.mqttToken ?? '');

      // Set _client BEFORE connect() so _onConnected can subscribe.
      _client = client;

      _mqttLog(
        'MQTT: connecting to $host:${config.port} as $clientId '
        '(timeout=${timeout}ms)',
      );
      final result = await client.connect().timeout(
        Duration(milliseconds: timeout + 1000),
        onTimeout: () {
          _mqttLog('MQTT: connect timed out for $host');
          return null;
        },
      );
      _mqttLog('MQTT: connect result=${result?.state}');

      final connected = result?.state == MqttConnectionState.connected;
      if (connected && !_intentionalClose) {
        // Now enable auto-reconnect for the live connection.
        client.autoReconnect = true;
        return true;
      }

      // Failed, or disconnect() was requested while we were connecting.
      _abandonClient(client);
      return false;
    } catch (e) {
      _mqttLog('MQTT: connect exception=$e');
      _abandonClient(client);
      return false;
    }
  }

  /// Shuts down a client that is no longer wanted.
  ///
  /// Auto-reconnect is switched off first so the client cannot come back in
  /// the background and compete with its replacement (same client ID).
  void _abandonClient(MqttServerClient client) {
    if (identical(_client, client)) _client = null;
    try {
      client.autoReconnect = false;
      client.disconnect();
    } catch (e) {
      _mqttLog('MQTT: error while discarding client: $e');
    }
  }

  /// Marks the connection as up, (re)subscribes, and notifies [onOpen].
  void _onConnectionEstablished() {
    _setStatus(ConnectionStatus.connected);
    _subscribe();
    _listenMessages();
    onOpen?.call();
  }

  void _onConnected() {
    _mqttLog('MQTT: _onConnected fired');
    _onConnectionEstablished();
  }

  void _onDisconnected() {
    _updatesSub?.cancel();
    _updatesSub = null;

    if (_intentionalClose) {
      _closeNotified = true;
      _setStatus(ConnectionStatus.disconnected);
      onClose?.call();
    } else {
      _setStatus(ConnectionStatus.reconnecting);
      onReconnecting?.call();
    }
  }

  void _onAutoReconnect() {
    _setStatus(ConnectionStatus.reconnecting);
    onReconnecting?.call();
  }

  void _onAutoReconnected() => _onConnectionEstablished();

  /// Subscribes the current client to every entry in [_subscriptions].
  void _subscribe() {
    final client = _client;
    if (client == null) {
      _mqttLog('MQTT: _subscribe called but client is null');
      return;
    }
    _mqttLog('MQTT: subscribing to topics...');
    for (final entry in _subscriptions.entries) {
      client.subscribe(entry.key, entry.value);
    }
  }

  /// Replaces any existing updates subscription with one on the current
  /// client.
  void _listenMessages() {
    _updatesSub?.cancel();
    _updatesSub = _client?.updates?.listen(_onMqttMessage);
  }

  /// Decodes, deduplicates, and dispatches a batch of received messages.
  ///
  /// Messages that are not publish messages, are not valid UTF-8, or do not
  /// decode to a JSON object are skipped without affecting the rest of the
  /// batch.
  void _onMqttMessage(List<MqttReceivedMessage<MqttMessage>> messages) {
    _mqttLog('MQTT: received ${messages.length} message(s)');
    for (final msg in messages) {
      _mqttLog('MQTT: topic=${msg.topic}');

      final received = msg.payload;
      if (received is! MqttPublishMessage) continue;

      final Map<String, dynamic> json;
      try {
        // Decode as UTF-8: bytesToStringAsString maps each byte to one code
        // unit and therefore mangles any non-ASCII text.
        final decoded = jsonDecode(utf8.decode(received.payload.message));
        if (decoded is! Map<String, dynamic>) continue;
        json = decoded;
      } on FormatException {
        continue; // Skip non-UTF-8 / non-JSON
      }

      // Dedup by msgId. Set.add returns false when the ID was already seen.
      final msgId = json['msgId'];
      if (msgId is String) {
        if (!_seenMsgIds.add(msgId)) continue;
        _evictOldIds();
      }

      // Dispatch: parse topic to enrich the message with routing info.
      _dispatchMessage(msg.topic, json);
    }
  }

  /// Routes incoming MQTT messages to the [onMessage] callback.
  ///
  /// Translates MQTT topic structure into the flat message format
  /// that chat_screen expects. Messages on unrecognised topics are dropped.
  void _dispatchMessage(String topic, Map<String, dynamic> json) {
    // Fixed topics first, so that pailot/control/out is not mistaken for a
    // session topic below.
    switch (topic) {
      case 'pailot/sessions':
        json['type'] = 'sessions';
        onMessage?.call(json);
        return;
      case 'pailot/status':
        json['type'] = 'status';
        onMessage?.call(json);
        return;
      case 'pailot/projects':
        json['type'] = 'projects';
        onMessage?.call(json);
        return;
      case 'pailot/control/out':
        // Command responses (session_switched, session_renamed, error,
        // unread) already carry their own type.
        onMessage?.call(json);
        return;
      case 'pailot/voice/transcript':
        json['type'] = 'transcript';
        onMessage?.call(json);
        return;
    }

    final parts = topic.split('/');
    if (parts.length != 3) return;
    final sessionId = parts[1];

    switch (parts[2]) {
      case 'out':
        // pailot/<sessionId>/out — text, voice, image messages
        json['sessionId'] ??= sessionId;
      case 'typing':
        // pailot/<sessionId>/typing
        json['type'] = 'typing';
        json['sessionId'] ??= sessionId;
        // Map 'active' field to the 'typing' field chat_screen expects;
        // a missing or non-bool value counts as active.
        final active = json['active'];
        json['typing'] = active is bool ? active : true;
      case 'screenshot':
        // pailot/<sessionId>/screenshot
        json['type'] = 'screenshot';
        json['sessionId'] ??= sessionId;
        // Map imageBase64 to 'data' for compatibility with chat_screen
        // handler.
        json['data'] ??= json['imageBase64'];
      default:
        return;
    }
    onMessage?.call(json);
  }

  /// Drops the oldest remembered IDs until at most [_maxSeenIds] remain.
  void _evictOldIds() {
    while (_seenMsgIds.length > _maxSeenIds) {
      _seenMsgIds.remove(_seenMsgIds.first);
    }
  }

  /// Generates a UUID v4 for message IDs.
  String _uuid() => const Uuid().v4();

  /// Current timestamp in milliseconds since the epoch.
  int _now() => DateTime.now().millisecondsSinceEpoch;

  /// Publishes a JSON payload to an MQTT topic.
  ///
  /// Reports through [onError] instead of throwing when not connected or
  /// when publishing fails.
  void _publish(String topic, Map<String, dynamic> payload, MqttQos qos) {
    final client = _client;
    if (client == null ||
        client.connectionStatus?.state != MqttConnectionState.connected) {
      onError?.call('Not connected');
      return;
    }

    try {
      // addUTF8String, not addString: the latter does not UTF-8 encode, so
      // non-ASCII text would reach the broker as invalid JSON bytes.
      final builder = MqttClientPayloadBuilder()
        ..addUTF8String(jsonEncode(payload));
      client.publishMessage(topic, qos, builder.payload!);
    } catch (e) {
      onError?.call('Send failed: $e');
    }
  }

  /// Sends a message — routes to the appropriate MQTT topic based on content.
  ///
  /// Commands (and TTS requests) go to `pailot/control/in`; everything else
  /// goes to `pailot/<sessionId>/in`. Reports through [onError] when a
  /// non-command message has no `sessionId`.
  void send(Map<String, dynamic> message) {
    final type = message['type'] as String?;
    final sessionId = message['sessionId'] as String?;

    if (type == 'command' || (type == null && message.containsKey('command'))) {
      // Command messages go to pailot/control/in.
      final command = message['command'] as String? ?? '';
      final args = message['args'] as Map<String, dynamic>? ??
          const <String, dynamic>{};
      _publish('pailot/control/in', {
        'msgId': _uuid(),
        'type': 'command',
        'command': command,
        'ts': _now(),
        ...args,
      }, MqttQos.atLeastOnce);
      return;
    }

    if (sessionId == null) {
      onError?.call('Cannot send message: missing sessionId');
      return;
    }

    final sessionTopic = 'pailot/$sessionId/in';
    switch (type) {
      case 'voice':
        _publish(sessionTopic, {
          'msgId': _uuid(),
          'type': 'voice',
          'sessionId': sessionId,
          'audioBase64': message['audioBase64'] ?? '',
          'messageId': message['messageId'] ?? '',
          'ts': _now(),
        }, MqttQos.atLeastOnce);
      case 'image':
        _publish(sessionTopic, {
          'msgId': _uuid(),
          'type': 'image',
          'sessionId': sessionId,
          'imageBase64': message['imageBase64'] ?? '',
          'mimeType': message['mimeType'] ?? 'image/jpeg',
          'caption': message['caption'] ?? '',
          'ts': _now(),
        }, MqttQos.atLeastOnce);
      case 'bundle':
        // Atomic multi-attachment message.
        _publish(sessionTopic, {
          'msgId': _uuid(),
          'type': 'bundle',
          'sessionId': sessionId,
          'caption': message['caption'] ?? '',
          if (message['audioBase64'] != null)
            'audioBase64': message['audioBase64'],
          if (message['voiceMessageId'] != null)
            'voiceMessageId': message['voiceMessageId'],
          'attachments': message['attachments'] ?? [],
          'ts': _now(),
        }, MqttQos.atLeastOnce);
      case 'file':
        _publish(sessionTopic, {
          'msgId': _uuid(),
          'type': 'file',
          'sessionId': sessionId,
          'fileBase64': message['fileBase64'] ?? '',
          'fileName': message['fileName'] ?? 'file',
          'mimeType': message['mimeType'] ?? 'application/octet-stream',
          'fileSize': message['fileSize'] ?? 0,
          'ts': _now(),
        }, MqttQos.atLeastOnce);
      case 'tts':
        // TTS request — route as command.
        _publish('pailot/control/in', {
          'msgId': _uuid(),
          'type': 'command',
          'command': 'tts',
          'text': message['text'] ?? '',
          'sessionId': sessionId,
          'ts': _now(),
        }, MqttQos.atLeastOnce);
      default:
        // Plain text message (content + sessionId).
        _publish(sessionTopic, {
          'msgId': _uuid(),
          'type': 'text',
          'sessionId': sessionId,
          'content': message['content'] as String? ?? '',
          'ts': _now(),
        }, MqttQos.atLeastOnce);
    }
  }

  /// Disconnects intentionally.
  ///
  /// Cancels any pending retry, tears down the client, and reports
  /// [ConnectionStatus.disconnected] and [onClose] exactly once.
  void disconnect() {
    _intentionalClose = true;
    _retryTimer?.cancel();
    _retryTimer = null;
    _updatesSub?.cancel();
    _updatesSub = null;

    final client = _client;
    _client = null;
    _closeNotified = false;
    try {
      // Usually triggers _onDisconnected, which reports the close itself.
      client?.disconnect();
    } catch (e) {
      _mqttLog('MQTT: error during disconnect: $e');
    }

    if (!_closeNotified) {
      _setStatus(ConnectionStatus.disconnected);
      onClose?.call();
    }
  }

  /// Updates the config and reconnects.
  Future<void> updateConfig(ServerConfig newConfig) async {
    config = newConfig;
    disconnect();
    await Future<void>.delayed(const Duration(milliseconds: 100));
    await connect();
  }

  /// Disposes all resources.
  void dispose() {
    disconnect();
  }

  // App lifecycle integration

  /// On resume, reconnects when the connection is down, or fires [onResume]
  /// when it still appears to be up. Other lifecycle states are ignored.
  @override
  void didChangeAppLifecycleState(AppLifecycleState state) {
    if (state != AppLifecycleState.resumed || _intentionalClose) return;

    final client = _client;
    final mqttState = client?.connectionStatus?.state;
    _mqttLog(
      'MQTT: app resumed, status=$_status client=${client != null} '
      'mqttState=$mqttState',
    );

    if (client != null && mqttState == MqttConnectionState.connected) {
      // Appears connected — notify listener to fetch missed messages
      // via catch_up. Don't call onOpen (it resets sessionReady and causes
      // flicker).
      _mqttLog('MQTT: appears connected on resume, triggering catch_up');
      onResume?.call();
      return;
    }

    if (_activeConnects > 0) {
      // A connect attempt is already running; starting another would create
      // a second client with the same client ID.
      _mqttLog('MQTT: connect already in progress on resume, skipping');
      return;
    }

    // Clearly disconnected — just reconnect.
    _mqttLog('MQTT: not connected on resume, reconnecting...');
    _client = null;
    // Stop the old client first so it cannot auto-reconnect alongside the
    // replacement.
    if (client != null) _abandonClient(client);
    _setStatus(ConnectionStatus.reconnecting);
    unawaited(connect());
  }
}