| .. | .. |
|---|
| 1 | +import 'dart:async'; |
|---|
| 2 | +import 'dart:convert'; |
|---|
| 3 | + |
|---|
| 4 | +import 'package:flutter/widgets.dart'; |
|---|
| 5 | +import 'package:mqtt_client/mqtt_client.dart'; |
|---|
| 6 | +import 'package:mqtt_client/mqtt_server_client.dart'; |
|---|
| 7 | +import 'package:shared_preferences/shared_preferences.dart'; |
|---|
| 8 | +import 'package:uuid/uuid.dart'; |
|---|
| 9 | + |
|---|
| 10 | +import '../models/server_config.dart'; |
|---|
| 11 | +import 'websocket_service.dart' show ConnectionStatus; |
|---|
| 12 | +import 'wol_service.dart'; |
|---|
| 13 | + |
|---|
| 14 | +/// MQTT port — standard unencrypted MQTT. |
|---|
| 15 | +const int mqttPort = 1883; |
|---|
| 16 | + |
|---|
/// MQTT client for PAILot, replacing WebSocketService.
///
/// Connects to the AIBroker daemon's embedded aedes broker, subscribes to the
/// `pailot/` topics and dispatches incoming messages through the same
/// callback interface as WebSocketService.
class MqttService with WidgetsBindingObserver {
  MqttService({required this.config});

  /// Topic filters subscribed after every (re)connect, with their QoS.
  static const Map<String, MqttQos> _subscriptions = {
    'pailot/sessions': MqttQos.atLeastOnce,
    'pailot/status': MqttQos.atLeastOnce,
    'pailot/projects': MqttQos.atLeastOnce,
    'pailot/+/out': MqttQos.atLeastOnce,
    'pailot/+/typing': MqttQos.atMostOnce,
    'pailot/+/screenshot': MqttQos.atLeastOnce,
    'pailot/control/out': MqttQos.atLeastOnce,
    'pailot/voice/transcript': MqttQos.atLeastOnce,
  };

  /// Fixed topics whose messages are tagged with a `type` before dispatch.
  static const Map<String, String> _fixedTopicTypes = {
    'pailot/sessions': 'sessions',
    'pailot/status': 'status',
    'pailot/projects': 'projects',
    'pailot/voice/transcript': 'transcript',
  };

  /// Connection settings; replaced by [updateConfig].
  ServerConfig config;

  MqttServerClient? _client;
  ConnectionStatus _status = ConnectionStatus.disconnected;
  bool _intentionalClose = false;
  String? _clientId;
  StreamSubscription? _updatesSub;

  /// Bumped by every [connect] and [disconnect] so that an in-flight connect
  /// loop can tell it has been superseded and must stop.
  int _connectEpoch = 0;

  // Message deduplication. A set literal is insertion-ordered, so the first
  // element is always the oldest id and no separate order list is needed.
  final Set<String> _seenMsgIds = <String>{};
  static const int _maxSeenIds = 500;

  // Callbacks — same interface as WebSocketService
  void Function(ConnectionStatus status)? onStatusChanged;
  void Function(Map<String, dynamic> message)? onMessage;
  void Function()? onOpen;
  void Function()? onClose;
  void Function()? onReconnecting;
  void Function(String error)? onError;

  /// The current connection status.
  ConnectionStatus get status => _status;

  /// Whether the service currently reports [ConnectionStatus.connected].
  bool get isConnected => _status == ConnectionStatus.connected;

  /// Stores [newStatus] and notifies [onStatusChanged], but only on change.
  void _setStatus(ConnectionStatus newStatus) {
    if (_status == newStatus) return;
    _status = newStatus;
    onStatusChanged?.call(newStatus);
  }

  /// Returns the persistent client ID for this device, creating and storing
  /// one in shared preferences on first use.
  Future<String> _getClientId() async {
    final cached = _clientId;
    if (cached != null) return cached;

    final prefs = await SharedPreferences.getInstance();
    var id = prefs.getString('mqtt_client_id');
    if (id == null) {
      id = 'pailot-${const Uuid().v4()}';
      await prefs.setString('mqtt_client_id', id);
    }
    _clientId = id;
    return id;
  }

  /// Connects to the MQTT broker.
  ///
  /// Tries the local host first (2.5s timeout) when one is configured, then
  /// the remote host (5s timeout). Does nothing when already connected or
  /// connecting. Failures are reported through [onError]; this method does
  /// not throw.
  Future<void> connect() async {
    if (_status == ConnectionStatus.connected ||
        _status == ConnectionStatus.connecting) {
      return;
    }

    _intentionalClose = false;
    final epoch = ++_connectEpoch;
    _setStatus(ConnectionStatus.connecting);

    try {
      // Send Wake-on-LAN if a MAC address is configured.
      final mac = config.macAddress;
      if (mac != null && mac.isNotEmpty) {
        try {
          await WolService.wake(mac, localHost: config.localHost);
        } catch (_) {
          // Best-effort: a failed wake packet must not block connecting.
        }
      }

      final clientId = await _getClientId();
      final hosts = _getHosts();

      for (final host in hosts) {
        // A disconnect() or a newer connect() supersedes this attempt.
        if (epoch != _connectEpoch) return;

        final isLocalProbe = hosts.length > 1 && host == hosts.first;
        final connected = await _tryConnect(
          host,
          clientId,
          timeout: isLocalProbe ? 2500 : 5000,
        );
        if (connected) return;
      }
    } catch (e) {
      // Without this the status would stay `connecting` forever and every
      // later connect() call would return immediately.
      if (epoch != _connectEpoch) return;
      _setStatus(ConnectionStatus.disconnected);
      onError?.call('Failed to connect to MQTT broker: $e');
      return;
    }

    // All hosts failed (unless the attempt was superseded meanwhile).
    if (epoch != _connectEpoch) return;
    _setStatus(ConnectionStatus.disconnected);
    onError?.call('Failed to connect to MQTT broker');
  }

  /// Returns the hosts to try in order: `[localHost, host]` when a distinct
  /// local host is configured, otherwise just `[host]`.
  List<String> _getHosts() {
    final local = config.localHost;
    if (local != null && local.isNotEmpty && local != config.host) {
      return [local, config.host];
    }
    return [config.host];
  }

  /// Attempts a single connection to [host] with a [timeout] in milliseconds.
  ///
  /// Returns `true` when the broker accepted the connection and this client
  /// is still the active one; otherwise releases the client and returns
  /// `false`. Never throws for connection failures.
  Future<bool> _tryConnect(String host, String clientId,
      {int timeout = 5000}) async {
    // Two live clients sharing one client id would keep evicting each other,
    // so stop any previous client (e.g. one still auto-reconnecting) first.
    _releaseClient();

    final client = MqttServerClient.withPort(host, clientId, mqttPort)
      ..keepAlivePeriod = 30
      ..autoReconnect = true
      ..connectTimeoutPeriod = timeout
      ..logging(on: false)
      ..onConnected = _onConnected
      ..onDisconnected = _onDisconnected
      ..onAutoReconnect = _onAutoReconnect
      ..onAutoReconnected = _onAutoReconnected
      // Clean session for now; persistent sessions (offline message
      // queuing) require broker support. To enable them later, drop
      // startClean() from this chain.
      ..connectionMessage = MqttConnectMessage()
          .withClientIdentifier(clientId)
          .authenticateAs('pailot', config.mqttToken ?? '')
          .startClean();

    // Assign before connecting: onConnected fires while connect() is still
    // pending, and _subscribe()/_listenMessages() need _client by then.
    _client = client;

    try {
      final result = await client.connect();
      if (identical(_client, client) &&
          result?.state == MqttConnectionState.connected) {
        return true;
      }
    } catch (_) {
      // Treated like any other failed attempt; cleanup happens below.
    }

    if (identical(_client, client)) {
      _releaseClient();
    } else {
      // disconnect() or a newer attempt replaced us while we were waiting.
      _shutDown(client);
    }
    return false;
  }

  /// Cancels the updates subscription and shuts down the active client.
  void _releaseClient() {
    _updatesSub?.cancel();
    _updatesSub = null;

    final client = _client;
    _client = null;
    if (client != null) _shutDown(client);
  }

  /// Detaches every callback from [client], disables its auto-reconnect and
  /// disconnects it, so a discarded client can no longer touch our state.
  void _shutDown(MqttServerClient client) {
    client
      ..autoReconnect = false
      ..onConnected = null
      ..onDisconnected = null
      ..onAutoReconnect = null
      ..onAutoReconnected = null;
    try {
      client.disconnect();
    } catch (_) {
      // The client may already be closed; nothing more to release.
    }
  }

  void _onConnected() {
    _setStatus(ConnectionStatus.connected);
    _subscribe();
    _listenMessages();
    onOpen?.call();
  }

  void _onDisconnected() {
    _updatesSub?.cancel();
    _updatesSub = null;

    // A failed initial attempt: connect() is still running and will report
    // the final outcome, so do not flash a `reconnecting` status.
    if (_status == ConnectionStatus.connecting) return;

    if (_intentionalClose) {
      _setStatus(ConnectionStatus.disconnected);
      onClose?.call();
    } else {
      _setStatus(ConnectionStatus.reconnecting);
      onReconnecting?.call();
    }
  }

  void _onAutoReconnect() {
    _setStatus(ConnectionStatus.reconnecting);
    onReconnecting?.call();
  }

  void _onAutoReconnected() {
    _setStatus(ConnectionStatus.connected);
    _subscribe();
    _listenMessages();
    onOpen?.call();
  }

  /// Subscribes the active client to every filter in [_subscriptions].
  void _subscribe() {
    final client = _client;
    if (client == null) return;

    for (final entry in _subscriptions.entries) {
      client.subscribe(entry.key, entry.value);
    }
  }

  /// (Re)attaches the listener for incoming publishes on the active client.
  void _listenMessages() {
    _updatesSub?.cancel();
    _updatesSub = _client?.updates?.listen(_onMqttMessage);
  }

  /// Decodes, de-duplicates and dispatches a batch of received messages.
  void _onMqttMessage(List<MqttReceivedMessage<MqttMessage>> messages) {
    for (final msg in messages) {
      final pubMsg = msg.payload;
      if (pubMsg is! MqttPublishMessage) continue;

      final json = _decodePayload(pubMsg);
      if (json == null) continue; // Skip non-JSON

      // Dedup by msgId. Set.add returns false when the id was already seen.
      final msgId = json['msgId'];
      if (msgId is String) {
        if (!_seenMsgIds.add(msgId)) continue;
        _evictOldIds();
      }

      // Dispatch: parse topic to enrich the message with routing info
      _dispatchMessage(msg.topic, json);
    }
  }

  /// Decodes a publish payload as UTF-8 JSON.
  ///
  /// Returns `null` when the bytes are not valid UTF-8, not valid JSON, or
  /// not a JSON object. The bytes are decoded with [utf8] because a plain
  /// byte-to-code-unit conversion garbles every non-ASCII character.
  Map<String, dynamic>? _decodePayload(MqttPublishMessage message) {
    try {
      final decoded = jsonDecode(utf8.decode(message.payload.message));
      return decoded is Map<String, dynamic> ? decoded : null;
    } on FormatException {
      return null;
    }
  }

  /// Routes an incoming MQTT message to the [onMessage] callback.
  ///
  /// Translates the MQTT topic structure into the flat message format that
  /// chat_screen expects (same as WebSocket messages). [json] is enriched in
  /// place. Messages on unrecognised topics are dropped.
  void _dispatchMessage(String topic, Map<String, dynamic> json) {
    // pailot/control/out — command responses (session_switched,
    // session_renamed, error, unread); forwarded untouched. Checked first
    // because it would otherwise match the pailot/<sessionId>/out shape.
    if (topic == 'pailot/control/out') {
      onMessage?.call(json);
      return;
    }

    // pailot/sessions, pailot/status, pailot/projects, pailot/voice/transcript
    final fixedType = _fixedTopicTypes[topic];
    if (fixedType != null) {
      json['type'] = fixedType;
      onMessage?.call(json);
      return;
    }

    final parts = topic.split('/');
    if (parts.length != 3) return;

    switch (parts[2]) {
      case 'out':
        // pailot/<sessionId>/out — text, voice, image messages
        break;
      case 'typing':
        // pailot/<sessionId>/typing — map 'active' to the 'typing' field
        // chat_screen expects; a missing or non-bool value counts as typing.
        json['type'] = 'typing';
        final active = json['active'];
        json['typing'] = active is bool ? active : true;
      case 'screenshot':
        // pailot/<sessionId>/screenshot — map imageBase64 to 'data' for
        // compatibility with the chat_screen handler.
        json['type'] = 'screenshot';
        json['data'] ??= json['imageBase64'];
      default:
        return;
    }

    json['sessionId'] ??= parts[1];
    onMessage?.call(json);
  }

  /// Drops the oldest remembered message ids beyond [_maxSeenIds].
  void _evictOldIds() {
    while (_seenMsgIds.length > _maxSeenIds) {
      _seenMsgIds.remove(_seenMsgIds.first);
    }
  }

  /// Generate a UUID v4 for message IDs.
  String _uuid() => const Uuid().v4();

  /// Current timestamp in milliseconds.
  int _now() => DateTime.now().millisecondsSinceEpoch;

  /// Publishes [payload] as UTF-8 encoded JSON to [topic].
  ///
  /// Reports through [onError] when not connected or when encoding or
  /// publishing fails.
  void _publish(String topic, Map<String, dynamic> payload, MqttQos qos) {
    final client = _client;
    if (client == null ||
        client.connectionStatus?.state != MqttConnectionState.connected) {
      onError?.call('Not connected');
      return;
    }

    try {
      // addUTF8String, not addString: the latter writes raw code units and
      // corrupts any non-ASCII text for a broker that expects UTF-8.
      final builder = MqttClientPayloadBuilder()
        ..addUTF8String(jsonEncode(payload));
      client.publishMessage(topic, qos, builder.payload!);
    } catch (e) {
      onError?.call('Send failed: $e');
    }
  }

  /// Sends a message, routing it to the MQTT topic matching its content.
  ///
  /// Accepts the same message format as WebSocketService.send(). Commands
  /// (and TTS requests) go to `pailot/control/in`; voice, image and text
  /// messages go to `pailot/<sessionId>/in`. Anything other than a command
  /// requires a `sessionId`, otherwise [onError] is invoked.
  void send(Map<String, dynamic> message) {
    final type = message['type'] as String?;
    final sessionId = message['sessionId'] as String?;

    if (type == 'command' ||
        (type == null && message.containsKey('command'))) {
      final args = message['args'];
      _publish('pailot/control/in', {
        'msgId': _uuid(),
        'type': 'command',
        'command': message['command'] as String? ?? '',
        'ts': _now(),
        // Flattened last, as before, so command arguments sit at top level.
        if (args is Map<String, dynamic>) ...args,
      }, MqttQos.atLeastOnce);
      return;
    }

    if (sessionId == null) {
      onError?.call('Cannot send message: missing sessionId');
      return;
    }

    switch (type) {
      case 'voice':
        _publish('pailot/$sessionId/in', {
          'msgId': _uuid(),
          'type': 'voice',
          'sessionId': sessionId,
          'audioBase64': message['audioBase64'] ?? '',
          'ts': _now(),
        }, MqttQos.atLeastOnce);
      case 'image':
        _publish('pailot/$sessionId/in', {
          'msgId': _uuid(),
          'type': 'image',
          'sessionId': sessionId,
          'imageBase64': message['imageBase64'] ?? '',
          'mimeType': message['mimeType'] ?? 'image/jpeg',
          'caption': message['caption'] ?? '',
          'ts': _now(),
        }, MqttQos.atLeastOnce);
      case 'tts':
        // TTS request — route as command
        _publish('pailot/control/in', {
          'msgId': _uuid(),
          'type': 'command',
          'command': 'tts',
          'text': message['text'] ?? '',
          'sessionId': sessionId,
          'ts': _now(),
        }, MqttQos.atLeastOnce);
      default:
        // Plain text message (content + sessionId)
        _publish('pailot/$sessionId/in', {
          'msgId': _uuid(),
          'type': 'text',
          'sessionId': sessionId,
          'content': message['content'] as String? ?? '',
          'ts': _now(),
        }, MqttQos.atLeastOnce);
    }
  }

  /// Disconnects intentionally and notifies [onClose] exactly once.
  void disconnect() {
    _intentionalClose = true;
    _connectEpoch++; // Stops any connect() loop that is still in flight.

    // Callbacks are detached before the client is closed, so the status
    // change and onClose below are the only notifications emitted.
    _releaseClient();

    _setStatus(ConnectionStatus.disconnected);
    onClose?.call();
  }

  /// Replaces [config] with [newConfig] and reconnects.
  Future<void> updateConfig(ServerConfig newConfig) async {
    config = newConfig;
    disconnect();
    await Future<void>.delayed(const Duration(milliseconds: 100));
    await connect();
  }

  /// Dispose all resources.
  void dispose() {
    disconnect();
  }

  // App lifecycle integration
  @override
  void didChangeAppLifecycleState(AppLifecycleState state) {
    // Only a resume needs action; while paused the connection is kept alive
    // because MQTT handles keepalive natively.
    if (state != AppLifecycleState.resumed) return;
    if (_intentionalClose || _status == ConnectionStatus.connected) return;

    // connect() reports failures through onError and does not throw.
    unawaited(connect());
  }
}
|---|