From c4ce6380fbfa55f22e9c20bb2ccffe4456ed9683 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Sun, 22 Mar 2026 17:37:55 +0100
Subject: [PATCH] feat: MQTT client replaces WebSocket (Phase 2)

---
 lib/screens/settings_screen.dart |   22 ++
 lib/providers/providers.dart     |   10 
 lib/models/server_config.dart    |    6 
 lib/services/mqtt_service.dart   |  455 ++++++++++++++++++++++++++++++++++++++++++++++++++
 pubspec.lock                     |   16 +
 lib/screens/chat_screen.dart     |    9 
 pubspec.yaml                     |    1 
 7 files changed, 507 insertions(+), 12 deletions(-)

diff --git a/lib/models/server_config.dart b/lib/models/server_config.dart
index 7aecc9c..7963c66 100644
--- a/lib/models/server_config.dart
+++ b/lib/models/server_config.dart
@@ -3,12 +3,14 @@
   final int port;
   final String? localHost;
   final String? macAddress;
+  final String? mqttToken;
 
   const ServerConfig({
     required this.host,
     this.port = 8765,
     this.localHost,
     this.macAddress,
+    this.mqttToken,
   });
 
   /// Primary WebSocket URL (local network).
@@ -34,6 +36,7 @@
       'port': port,
       if (localHost != null) 'localHost': localHost,
       if (macAddress != null) 'macAddress': macAddress,
+      if (mqttToken != null) 'mqttToken': mqttToken,
     };
   }
 
@@ -43,6 +46,7 @@
       port: json['port'] as int? ?? 8765,
       localHost: json['localHost'] as String?,
       macAddress: json['macAddress'] as String?,
+      mqttToken: json['mqttToken'] as String?,
     );
   }
 
@@ -51,12 +55,14 @@
     int? port,
     String? localHost,
     String? macAddress,
+    String? mqttToken,
   }) {
     return ServerConfig(
       host: host ?? this.host,
       port: port ?? this.port,
       localHost: localHost ?? this.localHost,
       macAddress: macAddress ?? this.macAddress,
+      mqttToken: mqttToken ?? this.mqttToken,
     );
   }
 }
diff --git a/lib/providers/providers.dart b/lib/providers/providers.dart
index d4654dd..046f38f 100644
--- a/lib/providers/providers.dart
+++ b/lib/providers/providers.dart
@@ -8,7 +8,7 @@
 import '../models/server_config.dart';
 import '../models/session.dart';
 import '../services/message_store.dart';
-import '../services/websocket_service.dart';
+import '../services/websocket_service.dart' show ConnectionStatus;
 
 // --- Enums ---
 
@@ -197,9 +197,5 @@
 
 final inputModeProvider = StateProvider<InputMode>((ref) => InputMode.voice);
 
-// --- WebSocket Service (singleton) ---
-
-final webSocketServiceProvider = Provider<WebSocketService?>((ref) {
-  // This is managed manually in the chat screen
-  return null;
-});
+// --- MQTT Service (singleton) ---
+// The MqttService is managed manually in the chat screen.
diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index 5f3e38c..b933ba2 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -14,7 +14,7 @@
 import '../providers/providers.dart';
 import '../services/audio_service.dart';
 import '../services/message_store.dart';
-import '../services/websocket_service.dart';
+import '../services/mqtt_service.dart';
 import '../theme/app_theme.dart';
 import '../widgets/command_bar.dart';
 import '../widgets/input_bar.dart';
@@ -34,7 +34,7 @@
 
 class _ChatScreenState extends ConsumerState<ChatScreen>
     with WidgetsBindingObserver {
-  WebSocketService? _ws;
+  MqttService? _ws;
   final TextEditingController _textController = TextEditingController();
   final ScrollController _scrollController = ScrollController();
   final GlobalKey<ScaffoldState> _scaffoldKey = GlobalKey<ScaffoldState>();
@@ -125,7 +125,7 @@
       if (config == null) return;
     }
 
-    _ws = WebSocketService(config: config);
+    _ws = MqttService(config: config);
     _ws!.onStatusChanged = (status) {
       if (mounted) {
         ref.read(wsStatusProvider.notifier).state = status;
@@ -135,10 +135,11 @@
     _ws!.onOpen = () {
       final activeId = ref.read(activeSessionIdProvider);
       _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
+      // catch_up is still available during the transition period
       _sendCommand('catch_up', {'lastSeq': _lastSeq});
     };
     _ws!.onError = (error) {
-      debugPrint('WS error: $error');
+      debugPrint('MQTT error: $error');
     };
 
     NavigateNotifier.instance = NavigateNotifier(
diff --git a/lib/screens/settings_screen.dart b/lib/screens/settings_screen.dart
index 6cb89e7..ed129f8 100644
--- a/lib/screens/settings_screen.dart
+++ b/lib/screens/settings_screen.dart
@@ -3,7 +3,7 @@
 
 import '../models/server_config.dart';
 import '../providers/providers.dart';
-import '../services/websocket_service.dart';
+import '../services/websocket_service.dart' show ConnectionStatus;
 import '../services/wol_service.dart';
 import '../theme/app_theme.dart';
 import '../widgets/status_dot.dart';
@@ -21,6 +21,7 @@
   late final TextEditingController _remoteHostController;
   late final TextEditingController _portController;
   late final TextEditingController _macController;
+  late final TextEditingController _mqttTokenController;
   bool _isWaking = false;
 
   @override
@@ -35,6 +36,8 @@
         TextEditingController(text: '${config?.port ?? 8765}');
     _macController =
         TextEditingController(text: config?.macAddress ?? '');
+    _mqttTokenController =
+        TextEditingController(text: config?.mqttToken ?? '');
   }
 
   @override
@@ -43,6 +46,7 @@
     _remoteHostController.dispose();
     _portController.dispose();
     _macController.dispose();
+    _mqttTokenController.dispose();
     super.dispose();
   }
 
@@ -58,6 +62,9 @@
       macAddress: _macController.text.trim().isEmpty
           ? null
           : _macController.text.trim(),
+      mqttToken: _mqttTokenController.text.trim().isEmpty
+          ? null
+          : _mqttTokenController.text.trim(),
     );
 
     await ref.read(serverConfigProvider.notifier).save(config);
@@ -183,6 +190,19 @@
                   hintText: 'AA:BB:CC:DD:EE:FF',
                 ),
               ),
+              const SizedBox(height: 16),
+
+              // MQTT Token
+              Text('MQTT Token',
+                  style: Theme.of(context).textTheme.bodyMedium),
+              const SizedBox(height: 4),
+              TextFormField(
+                controller: _mqttTokenController,
+                decoration: const InputDecoration(
+                  hintText: 'Shared secret for MQTT auth',
+                ),
+                obscureText: true,
+              ),
               const SizedBox(height: 24),
 
               // Save button
diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
new file mode 100644
index 0000000..3f3afcb
--- /dev/null
+++ b/lib/services/mqtt_service.dart
@@ -0,0 +1,455 @@
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:flutter/widgets.dart';
+import 'package:mqtt_client/mqtt_client.dart';
+import 'package:mqtt_client/mqtt_server_client.dart';
+import 'package:shared_preferences/shared_preferences.dart';
+import 'package:uuid/uuid.dart';
+
+import '../models/server_config.dart';
+import 'websocket_service.dart' show ConnectionStatus;
+import 'wol_service.dart';
+
+/// MQTT port — standard unencrypted MQTT.
+const int mqttPort = 1883;
+
+/// MQTT client for PAILot, replacing WebSocketService.
+///
+/// Connects to the AIBroker daemon's embedded aedes broker.
+/// Subscribes to all pailot/ topics and dispatches messages
+/// through the same callback interface as WebSocketService.
+class MqttService with WidgetsBindingObserver {
+  MqttService({required this.config});
+
+  ServerConfig config;
+  MqttServerClient? _client;
+  ConnectionStatus _status = ConnectionStatus.disconnected;
+  bool _intentionalClose = false;
+  String? _clientId;
+  StreamSubscription? _updatesSub;
+
+  // Message deduplication
+  final Set<String> _seenMsgIds = {};
+  final List<String> _seenMsgIdOrder = [];
+  static const int _maxSeenIds = 500;
+
+  // Callbacks — same interface as WebSocketService
+  void Function(ConnectionStatus status)? onStatusChanged;
+  void Function(Map<String, dynamic> message)? onMessage;
+  void Function()? onOpen;
+  void Function()? onClose;
+  void Function()? onReconnecting;
+  void Function(String error)? onError;
+
+  ConnectionStatus get status => _status;
+  bool get isConnected => _status == ConnectionStatus.connected;
+
+  void _setStatus(ConnectionStatus newStatus) {
+    if (_status == newStatus) return;
+    _status = newStatus;
+    onStatusChanged?.call(newStatus);
+  }
+
+  /// Get or create a persistent client ID for this device.
+  Future<String> _getClientId() async {
+    if (_clientId != null) return _clientId!;
+    final prefs = await SharedPreferences.getInstance();
+    var id = prefs.getString('mqtt_client_id');
+    if (id == null) {
+      id = 'pailot-${const Uuid().v4()}';
+      await prefs.setString('mqtt_client_id', id);
+    }
+    _clientId = id;
+    return id;
+  }
+
+  /// Connect to the MQTT broker.
+  /// Tries local host first (2.5s timeout), then remote host.
+  Future<void> connect() async {
+    if (_status == ConnectionStatus.connected ||
+        _status == ConnectionStatus.connecting) {
+      return;
+    }
+
+    _intentionalClose = false;
+    _setStatus(ConnectionStatus.connecting);
+
+    // Send Wake-on-LAN if MAC configured
+    if (config.macAddress != null && config.macAddress!.isNotEmpty) {
+      try {
+        await WolService.wake(config.macAddress!, localHost: config.localHost);
+      } catch (_) {}
+    }
+
+    final clientId = await _getClientId();
+    final hosts = _getHosts();
+
+    for (final host in hosts) {
+      if (_intentionalClose) return;
+
+      try {
+        final connected = await _tryConnect(
+          host,
+          clientId,
+          timeout: host == hosts.first && hosts.length > 1 ? 2500 : 5000,
+        );
+        if (connected) return;
+      } catch (_) {
+        continue;
+      }
+    }
+
+    // All hosts failed
+    _setStatus(ConnectionStatus.disconnected);
+    onError?.call('Failed to connect to MQTT broker');
+  }
+
+  /// Returns [localHost, remoteHost] for dual-connect attempts.
+  List<String> _getHosts() {
+    if (config.localHost != null &&
+        config.localHost!.isNotEmpty &&
+        config.localHost != config.host) {
+      return [config.localHost!, config.host];
+    }
+    return [config.host];
+  }
+
+  Future<bool> _tryConnect(String host, String clientId, {int timeout = 5000}) async {
+    try {
+      final client = MqttServerClient.withPort(host, clientId, mqttPort);
+      client.keepAlivePeriod = 30;
+      client.autoReconnect = true;
+      client.connectTimeoutPeriod = timeout;
+      client.logging(on: false);
+
+      client.onConnected = _onConnected;
+      client.onDisconnected = _onDisconnected;
+      client.onAutoReconnect = _onAutoReconnect;
+      client.onAutoReconnected = _onAutoReconnected;
+
+      // Persistent session (cleanSession = false) for offline message queuing
+      final connMessage = MqttConnectMessage()
+          .withClientIdentifier(clientId)
+          .authenticateAs('pailot', config.mqttToken ?? '')
+          .startClean(); // Use clean session for now; persistent sessions require broker support
+
+      // For persistent sessions, replace startClean() with:
+      // .withWillQos(MqttQos.atLeastOnce);
+      // and remove startClean()
+
+      client.connectionMessage = connMessage;
+
+      final result = await client.connect();
+      if (result?.state == MqttConnectionState.connected) {
+        _client = client;
+        return true;
+      }
+      client.disconnect();
+      return false;
+    } catch (e) {
+      return false;
+    }
+  }
+
+  void _onConnected() {
+    _setStatus(ConnectionStatus.connected);
+    _subscribe();
+    _listenMessages();
+    onOpen?.call();
+  }
+
+  void _onDisconnected() {
+    _updatesSub?.cancel();
+    _updatesSub = null;
+
+    if (_intentionalClose) {
+      _setStatus(ConnectionStatus.disconnected);
+      onClose?.call();
+    } else {
+      _setStatus(ConnectionStatus.reconnecting);
+      onReconnecting?.call();
+    }
+  }
+
+  void _onAutoReconnect() {
+    _setStatus(ConnectionStatus.reconnecting);
+    onReconnecting?.call();
+  }
+
+  void _onAutoReconnected() {
+    _setStatus(ConnectionStatus.connected);
+    _subscribe();
+    _listenMessages();
+    onOpen?.call();
+  }
+
+  void _subscribe() {
+    final client = _client;
+    if (client == null) return;
+
+    client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
+    client.subscribe('pailot/status', MqttQos.atLeastOnce);
+    client.subscribe('pailot/projects', MqttQos.atLeastOnce);
+    client.subscribe('pailot/+/out', MqttQos.atLeastOnce);
+    client.subscribe('pailot/+/typing', MqttQos.atMostOnce);
+    client.subscribe('pailot/+/screenshot', MqttQos.atLeastOnce);
+    client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
+    client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
+  }
+
+  void _listenMessages() {
+    _updatesSub?.cancel();
+    _updatesSub = _client?.updates?.listen(_onMqttMessage);
+  }
+
+  void _onMqttMessage(List<MqttReceivedMessage<MqttMessage>> messages) {
+    for (final msg in messages) {
+      final pubMsg = msg.payload as MqttPublishMessage;
+      final payload = MqttPublishPayload.bytesToStringAsString(
+        pubMsg.payload.message,
+      );
+
+      Map<String, dynamic> json;
+      try {
+        json = jsonDecode(payload) as Map<String, dynamic>;
+      } catch (_) {
+        continue; // Skip non-JSON
+      }
+
+      // Dedup by msgId
+      final msgId = json['msgId'] as String?;
+      if (msgId != null) {
+        if (_seenMsgIds.contains(msgId)) continue;
+        _seenMsgIds.add(msgId);
+        _seenMsgIdOrder.add(msgId);
+        _evictOldIds();
+      }
+
+      // Dispatch: parse topic to enrich the message with routing info
+      _dispatchMessage(msg.topic, json);
+    }
+  }
+
+  /// Route incoming MQTT messages to the onMessage callback.
+  /// Translates MQTT topic structure into the flat message format
+  /// that chat_screen expects (same as WebSocket messages).
+  void _dispatchMessage(String topic, Map<String, dynamic> json) {
+    final parts = topic.split('/');
+
+    // pailot/sessions
+    if (topic == 'pailot/sessions') {
+      json['type'] = 'sessions';
+      onMessage?.call(json);
+      return;
+    }
+
+    // pailot/status
+    if (topic == 'pailot/status') {
+      json['type'] = 'status';
+      onMessage?.call(json);
+      return;
+    }
+
+    // pailot/projects
+    if (topic == 'pailot/projects') {
+      json['type'] = 'projects';
+      onMessage?.call(json);
+      return;
+    }
+
+    // pailot/control/out — command responses (session_switched, session_renamed, error, unread)
+    if (topic == 'pailot/control/out') {
+      onMessage?.call(json);
+      return;
+    }
+
+    // pailot/voice/transcript
+    if (topic == 'pailot/voice/transcript') {
+      json['type'] = 'transcript';
+      onMessage?.call(json);
+      return;
+    }
+
+    // pailot/<sessionId>/out — text, voice, image messages
+    if (parts.length == 3 && parts[2] == 'out') {
+      final sessionId = parts[1];
+      json['sessionId'] ??= sessionId;
+      onMessage?.call(json);
+      return;
+    }
+
+    // pailot/<sessionId>/typing
+    if (parts.length == 3 && parts[2] == 'typing') {
+      final sessionId = parts[1];
+      json['type'] = 'typing';
+      json['sessionId'] ??= sessionId;
+      // Map 'active' field to the 'typing'/'isTyping' fields chat_screen expects
+      final active = json['active'] as bool? ?? true;
+      json['typing'] = active;
+      onMessage?.call(json);
+      return;
+    }
+
+    // pailot/<sessionId>/screenshot
+    if (parts.length == 3 && parts[2] == 'screenshot') {
+      final sessionId = parts[1];
+      json['type'] = 'screenshot';
+      json['sessionId'] ??= sessionId;
+      // Map imageBase64 to 'data' for compatibility with chat_screen handler
+      json['data'] ??= json['imageBase64'];
+      onMessage?.call(json);
+      return;
+    }
+  }
+
+  void _evictOldIds() {
+    while (_seenMsgIdOrder.length > _maxSeenIds) {
+      final oldest = _seenMsgIdOrder.removeAt(0);
+      _seenMsgIds.remove(oldest);
+    }
+  }
+
+  /// Generate a UUID v4 for message IDs.
+  String _uuid() => const Uuid().v4();
+
+  /// Current timestamp in milliseconds.
+  int _now() => DateTime.now().millisecondsSinceEpoch;
+
+  /// Publish a JSON payload to an MQTT topic.
+  void _publish(String topic, Map<String, dynamic> payload, MqttQos qos) {
+    final client = _client;
+    if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
+      onError?.call('Not connected');
+      return;
+    }
+
+    try {
+      final builder = MqttClientPayloadBuilder();
+      builder.addString(jsonEncode(payload));
+      client.publishMessage(topic, qos, builder.payload!);
+    } catch (e) {
+      onError?.call('Send failed: $e');
+    }
+  }
+
+  /// Send a message — routes to the appropriate MQTT topic based on content.
+  /// Accepts the same message format as WebSocketService.send().
+  void send(Map<String, dynamic> message) {
+    final type = message['type'] as String?;
+    final sessionId = message['sessionId'] as String?;
+
+    if (type == 'command' || (message.containsKey('command') && type == null)) {
+      // Command messages go to pailot/control/in
+      final command = message['command'] as String? ?? '';
+      final args = message['args'] as Map<String, dynamic>? ?? {};
+      final payload = <String, dynamic>{
+        'msgId': _uuid(),
+        'type': 'command',
+        'command': command,
+        'ts': _now(),
+        ...args,
+      };
+      _publish('pailot/control/in', payload, MqttQos.atLeastOnce);
+      return;
+    }
+
+    if (type == 'voice' && sessionId != null) {
+      // Voice message
+      _publish('pailot/$sessionId/in', {
+        'msgId': _uuid(),
+        'type': 'voice',
+        'sessionId': sessionId,
+        'audioBase64': message['audioBase64'] ?? '',
+        'ts': _now(),
+      }, MqttQos.atLeastOnce);
+      return;
+    }
+
+    if (type == 'image' && sessionId != null) {
+      // Image message
+      _publish('pailot/$sessionId/in', {
+        'msgId': _uuid(),
+        'type': 'image',
+        'sessionId': sessionId,
+        'imageBase64': message['imageBase64'] ?? '',
+        'mimeType': message['mimeType'] ?? 'image/jpeg',
+        'caption': message['caption'] ?? '',
+        'ts': _now(),
+      }, MqttQos.atLeastOnce);
+      return;
+    }
+
+    if (type == 'tts' && sessionId != null) {
+      // TTS request — route as command
+      _publish('pailot/control/in', {
+        'msgId': _uuid(),
+        'type': 'command',
+        'command': 'tts',
+        'text': message['text'] ?? '',
+        'sessionId': sessionId,
+        'ts': _now(),
+      }, MqttQos.atLeastOnce);
+      return;
+    }
+
+    // Default: plain text message (content + sessionId)
+    if (sessionId != null) {
+      final content = message['content'] as String? ?? '';
+      _publish('pailot/$sessionId/in', {
+        'msgId': _uuid(),
+        'type': 'text',
+        'sessionId': sessionId,
+        'content': content,
+        'ts': _now(),
+      }, MqttQos.atLeastOnce);
+      return;
+    }
+
+    onError?.call('Cannot send message: missing sessionId');
+  }
+
+  /// Disconnect intentionally.
+  void disconnect() {
+    _intentionalClose = true;
+    _updatesSub?.cancel();
+    _updatesSub = null;
+
+    try {
+      _client?.disconnect();
+    } catch (_) {}
+    _client = null;
+
+    _setStatus(ConnectionStatus.disconnected);
+    onClose?.call();
+  }
+
+  /// Update config and reconnect.
+  Future<void> updateConfig(ServerConfig newConfig) async {
+    config = newConfig;
+    disconnect();
+    await Future.delayed(const Duration(milliseconds: 100));
+    await connect();
+  }
+
+  /// Dispose all resources.
+  void dispose() {
+    disconnect();
+  }
+
+  // App lifecycle integration
+  @override
+  void didChangeAppLifecycleState(AppLifecycleState state) {
+    switch (state) {
+      case AppLifecycleState.resumed:
+        if (_status != ConnectionStatus.connected && !_intentionalClose) {
+          connect();
+        }
+      case AppLifecycleState.paused:
+        // Keep connection alive — MQTT handles keepalive natively
+        break;
+      default:
+        break;
+    }
+  }
+}
diff --git a/pubspec.lock b/pubspec.lock
index aa51aec..61cba3b 100644
--- a/pubspec.lock
+++ b/pubspec.lock
@@ -161,6 +161,14 @@
       url: "https://pub.dev"
     source: hosted
     version: "7.0.3"
+  event_bus:
+    dependency: transitive
+    description:
+      name: event_bus
+      sha256: "1a55e97923769c286d295240048fc180e7b0768902c3c2e869fe059aafa15304"
+      url: "https://pub.dev"
+    source: hosted
+    version: "2.0.1"
   fake_async:
     dependency: transitive
     description:
@@ -504,6 +512,14 @@
       url: "https://pub.dev"
     source: hosted
     version: "2.0.0"
+  mqtt_client:
+    dependency: "direct main"
+    description:
+      name: mqtt_client
+      sha256: fd22ea00a4c7b5623e01000a91a256d62a8bacba38e9812170458070c52affed
+      url: "https://pub.dev"
+    source: hosted
+    version: "10.11.9"
   native_toolchain_c:
     dependency: transitive
     description:
diff --git a/pubspec.yaml b/pubspec.yaml
index 3795dfa..7ad8705 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -26,6 +26,7 @@
   share_plus: ^12.0.1
   udp: ^5.0.3
   intl: ^0.20.2
+  mqtt_client: ^10.6.0
   uuid: ^4.5.1
   collection: ^1.19.1
 

--
Gitblit v1.3.1