From c4ce6380fbfa55f22e9c20bb2ccffe4456ed9683 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Sun, 22 Mar 2026 17:37:55 +0100
Subject: [PATCH] feat: MQTT client replaces WebSocket (Phase 2)
---
lib/screens/settings_screen.dart | 22 ++
lib/providers/providers.dart | 10
lib/models/server_config.dart | 6
lib/services/mqtt_service.dart | 455 ++++++++++++++++++++++++++++++++++++++++++++++++++
pubspec.lock | 16 +
lib/screens/chat_screen.dart | 9
pubspec.yaml | 1
7 files changed, 507 insertions(+), 12 deletions(-)
diff --git a/lib/models/server_config.dart b/lib/models/server_config.dart
index 7aecc9c..7963c66 100644
--- a/lib/models/server_config.dart
+++ b/lib/models/server_config.dart
@@ -3,12 +3,14 @@
final int port;
final String? localHost;
final String? macAddress;
+ final String? mqttToken;
const ServerConfig({
required this.host,
this.port = 8765,
this.localHost,
this.macAddress,
+ this.mqttToken,
});
/// Primary WebSocket URL (local network).
@@ -34,6 +36,7 @@
'port': port,
if (localHost != null) 'localHost': localHost,
if (macAddress != null) 'macAddress': macAddress,
+ if (mqttToken != null) 'mqttToken': mqttToken,
};
}
@@ -43,6 +46,7 @@
port: json['port'] as int? ?? 8765,
localHost: json['localHost'] as String?,
macAddress: json['macAddress'] as String?,
+ mqttToken: json['mqttToken'] as String?,
);
}
@@ -51,12 +55,14 @@
int? port,
String? localHost,
String? macAddress,
+ String? mqttToken,
}) {
return ServerConfig(
host: host ?? this.host,
port: port ?? this.port,
localHost: localHost ?? this.localHost,
macAddress: macAddress ?? this.macAddress,
+ mqttToken: mqttToken ?? this.mqttToken,
);
}
}
diff --git a/lib/providers/providers.dart b/lib/providers/providers.dart
index d4654dd..046f38f 100644
--- a/lib/providers/providers.dart
+++ b/lib/providers/providers.dart
@@ -8,7 +8,7 @@
import '../models/server_config.dart';
import '../models/session.dart';
import '../services/message_store.dart';
-import '../services/websocket_service.dart';
+import '../services/websocket_service.dart' show ConnectionStatus;
// --- Enums ---
@@ -197,9 +197,5 @@
final inputModeProvider = StateProvider<InputMode>((ref) => InputMode.voice);
-// --- WebSocket Service (singleton) ---
-
-final webSocketServiceProvider = Provider<WebSocketService?>((ref) {
- // This is managed manually in the chat screen
- return null;
-});
+// --- MQTT Service (singleton) ---
+// The MqttService is managed manually in the chat screen.
diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index 5f3e38c..b933ba2 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -14,7 +14,7 @@
import '../providers/providers.dart';
import '../services/audio_service.dart';
import '../services/message_store.dart';
-import '../services/websocket_service.dart';
+import '../services/mqtt_service.dart';
import '../theme/app_theme.dart';
import '../widgets/command_bar.dart';
import '../widgets/input_bar.dart';
@@ -34,7 +34,7 @@
class _ChatScreenState extends ConsumerState<ChatScreen>
with WidgetsBindingObserver {
- WebSocketService? _ws;
+ MqttService? _ws;
final TextEditingController _textController = TextEditingController();
final ScrollController _scrollController = ScrollController();
final GlobalKey<ScaffoldState> _scaffoldKey = GlobalKey<ScaffoldState>();
@@ -125,7 +125,7 @@
if (config == null) return;
}
- _ws = WebSocketService(config: config);
+ _ws = MqttService(config: config);
_ws!.onStatusChanged = (status) {
if (mounted) {
ref.read(wsStatusProvider.notifier).state = status;
@@ -135,10 +135,11 @@
_ws!.onOpen = () {
final activeId = ref.read(activeSessionIdProvider);
_sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
+ // catch_up is still available during the transition period
_sendCommand('catch_up', {'lastSeq': _lastSeq});
};
_ws!.onError = (error) {
- debugPrint('WS error: $error');
+ debugPrint('MQTT error: $error');
};
NavigateNotifier.instance = NavigateNotifier(
diff --git a/lib/screens/settings_screen.dart b/lib/screens/settings_screen.dart
index 6cb89e7..ed129f8 100644
--- a/lib/screens/settings_screen.dart
+++ b/lib/screens/settings_screen.dart
@@ -3,7 +3,7 @@
import '../models/server_config.dart';
import '../providers/providers.dart';
-import '../services/websocket_service.dart';
+import '../services/websocket_service.dart' show ConnectionStatus;
import '../services/wol_service.dart';
import '../theme/app_theme.dart';
import '../widgets/status_dot.dart';
@@ -21,6 +21,7 @@
late final TextEditingController _remoteHostController;
late final TextEditingController _portController;
late final TextEditingController _macController;
+ late final TextEditingController _mqttTokenController;
bool _isWaking = false;
@override
@@ -35,6 +36,8 @@
TextEditingController(text: '${config?.port ?? 8765}');
_macController =
TextEditingController(text: config?.macAddress ?? '');
+ _mqttTokenController =
+ TextEditingController(text: config?.mqttToken ?? '');
}
@override
@@ -43,6 +46,7 @@
_remoteHostController.dispose();
_portController.dispose();
_macController.dispose();
+ _mqttTokenController.dispose();
super.dispose();
}
@@ -58,6 +62,9 @@
macAddress: _macController.text.trim().isEmpty
? null
: _macController.text.trim(),
+ mqttToken: _mqttTokenController.text.trim().isEmpty
+ ? null
+ : _mqttTokenController.text.trim(),
);
await ref.read(serverConfigProvider.notifier).save(config);
@@ -183,6 +190,19 @@
hintText: 'AA:BB:CC:DD:EE:FF',
),
),
+ const SizedBox(height: 16),
+
+ // MQTT Token
+ Text('MQTT Token',
+ style: Theme.of(context).textTheme.bodyMedium),
+ const SizedBox(height: 4),
+ TextFormField(
+ controller: _mqttTokenController,
+ decoration: const InputDecoration(
+ hintText: 'Shared secret for MQTT auth',
+ ),
+ obscureText: true,
+ ),
const SizedBox(height: 24),
// Save button
diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
new file mode 100644
index 0000000..3f3afcb
--- /dev/null
+++ b/lib/services/mqtt_service.dart
@@ -0,0 +1,455 @@
+import 'dart:async';
+import 'dart:convert';
+
+import 'package:flutter/widgets.dart';
+import 'package:mqtt_client/mqtt_client.dart';
+import 'package:mqtt_client/mqtt_server_client.dart';
+import 'package:shared_preferences/shared_preferences.dart';
+import 'package:uuid/uuid.dart';
+
+import '../models/server_config.dart';
+import 'websocket_service.dart' show ConnectionStatus;
+import 'wol_service.dart';
+
+/// MQTT port — standard unencrypted MQTT.
+const int mqttPort = 1883;
+
+/// MQTT client for PAILot, replacing WebSocketService.
+///
+/// Connects to the AIBroker daemon's embedded aedes broker.
+/// Subscribes to all pailot/ topics and dispatches messages
+/// through the same callback interface as WebSocketService.
+class MqttService with WidgetsBindingObserver {
+ MqttService({required this.config});
+
+ ServerConfig config;
+ MqttServerClient? _client;
+ ConnectionStatus _status = ConnectionStatus.disconnected;
+ bool _intentionalClose = false;
+ String? _clientId;
+ StreamSubscription? _updatesSub;
+
+ // Message deduplication
+ final Set<String> _seenMsgIds = {};
+ final List<String> _seenMsgIdOrder = [];
+ static const int _maxSeenIds = 500;
+
+ // Callbacks — same interface as WebSocketService
+ void Function(ConnectionStatus status)? onStatusChanged;
+ void Function(Map<String, dynamic> message)? onMessage;
+ void Function()? onOpen;
+ void Function()? onClose;
+ void Function()? onReconnecting;
+ void Function(String error)? onError;
+
+ ConnectionStatus get status => _status;
+ bool get isConnected => _status == ConnectionStatus.connected;
+
+ void _setStatus(ConnectionStatus newStatus) {
+ if (_status == newStatus) return;
+ _status = newStatus;
+ onStatusChanged?.call(newStatus);
+ }
+
+ /// Get or create a persistent client ID for this device.
+ Future<String> _getClientId() async {
+ if (_clientId != null) return _clientId!;
+ final prefs = await SharedPreferences.getInstance();
+ var id = prefs.getString('mqtt_client_id');
+ if (id == null) {
+ id = 'pailot-${const Uuid().v4()}';
+ await prefs.setString('mqtt_client_id', id);
+ }
+ _clientId = id;
+ return id;
+ }
+
+ /// Connect to the MQTT broker.
+ /// Tries local host first (2.5s timeout), then remote host.
+ Future<void> connect() async {
+ if (_status == ConnectionStatus.connected ||
+ _status == ConnectionStatus.connecting) {
+ return;
+ }
+
+ _intentionalClose = false;
+ _setStatus(ConnectionStatus.connecting);
+
+ // Send Wake-on-LAN if MAC configured
+ if (config.macAddress != null && config.macAddress!.isNotEmpty) {
+ try {
+ await WolService.wake(config.macAddress!, localHost: config.localHost);
+ } catch (_) {}
+ }
+
+ final clientId = await _getClientId();
+ final hosts = _getHosts();
+
+ for (final host in hosts) {
+ if (_intentionalClose) return;
+
+ try {
+ final connected = await _tryConnect(
+ host,
+ clientId,
+ timeout: host == hosts.first && hosts.length > 1 ? 2500 : 5000,
+ );
+ if (connected) return;
+ } catch (_) {
+ continue;
+ }
+ }
+
+ // All hosts failed
+ _setStatus(ConnectionStatus.disconnected);
+ onError?.call('Failed to connect to MQTT broker');
+ }
+
+ /// Returns [localHost, remoteHost] for dual-connect attempts.
+ List<String> _getHosts() {
+ if (config.localHost != null &&
+ config.localHost!.isNotEmpty &&
+ config.localHost != config.host) {
+ return [config.localHost!, config.host];
+ }
+ return [config.host];
+ }
+
+ Future<bool> _tryConnect(String host, String clientId, {int timeout = 5000}) async {
+ try {
+ final client = MqttServerClient.withPort(host, clientId, mqttPort);
+ client.keepAlivePeriod = 30;
+ client.autoReconnect = true;
+ client.connectTimeoutPeriod = timeout;
+ client.logging(on: false);
+
+ client.onConnected = _onConnected;
+ client.onDisconnected = _onDisconnected;
+ client.onAutoReconnect = _onAutoReconnect;
+ client.onAutoReconnected = _onAutoReconnected;
+
+ // Persistent session (cleanSession = false) for offline message queuing
+ final connMessage = MqttConnectMessage()
+ .withClientIdentifier(clientId)
+ .authenticateAs('pailot', config.mqttToken ?? '')
+ .startClean(); // Use clean session for now; persistent sessions require broker support
+
+ // For persistent sessions, replace startClean() with:
+ // .withWillQos(MqttQos.atLeastOnce);
+ // and remove startClean()
+
+ client.connectionMessage = connMessage;
+
+ final result = await client.connect();
+ if (result?.state == MqttConnectionState.connected) {
+ _client = client;
+ return true;
+ }
+ client.disconnect();
+ return false;
+ } catch (e) {
+ return false;
+ }
+ }
+
+ void _onConnected() {
+ _setStatus(ConnectionStatus.connected);
+ _subscribe();
+ _listenMessages();
+ onOpen?.call();
+ }
+
+ void _onDisconnected() {
+ _updatesSub?.cancel();
+ _updatesSub = null;
+
+ if (_intentionalClose) {
+ _setStatus(ConnectionStatus.disconnected);
+ onClose?.call();
+ } else {
+ _setStatus(ConnectionStatus.reconnecting);
+ onReconnecting?.call();
+ }
+ }
+
+ void _onAutoReconnect() {
+ _setStatus(ConnectionStatus.reconnecting);
+ onReconnecting?.call();
+ }
+
+ void _onAutoReconnected() {
+ _setStatus(ConnectionStatus.connected);
+ _subscribe();
+ _listenMessages();
+ onOpen?.call();
+ }
+
+ void _subscribe() {
+ final client = _client;
+ if (client == null) return;
+
+ client.subscribe('pailot/sessions', MqttQos.atLeastOnce);
+ client.subscribe('pailot/status', MqttQos.atLeastOnce);
+ client.subscribe('pailot/projects', MqttQos.atLeastOnce);
+ client.subscribe('pailot/+/out', MqttQos.atLeastOnce);
+ client.subscribe('pailot/+/typing', MqttQos.atMostOnce);
+ client.subscribe('pailot/+/screenshot', MqttQos.atLeastOnce);
+ client.subscribe('pailot/control/out', MqttQos.atLeastOnce);
+ client.subscribe('pailot/voice/transcript', MqttQos.atLeastOnce);
+ }
+
+ void _listenMessages() {
+ _updatesSub?.cancel();
+ _updatesSub = _client?.updates?.listen(_onMqttMessage);
+ }
+
+ void _onMqttMessage(List<MqttReceivedMessage<MqttMessage>> messages) {
+ for (final msg in messages) {
+ final pubMsg = msg.payload as MqttPublishMessage;
+ final payload = MqttPublishPayload.bytesToStringAsString(
+ pubMsg.payload.message,
+ );
+
+ Map<String, dynamic> json;
+ try {
+ json = jsonDecode(payload) as Map<String, dynamic>;
+ } catch (_) {
+ continue; // Skip non-JSON
+ }
+
+ // Dedup by msgId
+ final msgId = json['msgId'] as String?;
+ if (msgId != null) {
+ if (_seenMsgIds.contains(msgId)) continue;
+ _seenMsgIds.add(msgId);
+ _seenMsgIdOrder.add(msgId);
+ _evictOldIds();
+ }
+
+ // Dispatch: parse topic to enrich the message with routing info
+ _dispatchMessage(msg.topic, json);
+ }
+ }
+
+ /// Route incoming MQTT messages to the onMessage callback.
+ /// Translates MQTT topic structure into the flat message format
+ /// that chat_screen expects (same as WebSocket messages).
+ void _dispatchMessage(String topic, Map<String, dynamic> json) {
+ final parts = topic.split('/');
+
+ // pailot/sessions
+ if (topic == 'pailot/sessions') {
+ json['type'] = 'sessions';
+ onMessage?.call(json);
+ return;
+ }
+
+ // pailot/status
+ if (topic == 'pailot/status') {
+ json['type'] = 'status';
+ onMessage?.call(json);
+ return;
+ }
+
+ // pailot/projects
+ if (topic == 'pailot/projects') {
+ json['type'] = 'projects';
+ onMessage?.call(json);
+ return;
+ }
+
+ // pailot/control/out — command responses (session_switched, session_renamed, error, unread)
+ if (topic == 'pailot/control/out') {
+ onMessage?.call(json);
+ return;
+ }
+
+ // pailot/voice/transcript
+ if (topic == 'pailot/voice/transcript') {
+ json['type'] = 'transcript';
+ onMessage?.call(json);
+ return;
+ }
+
+ // pailot/<sessionId>/out — text, voice, image messages
+ if (parts.length == 3 && parts[2] == 'out') {
+ final sessionId = parts[1];
+ json['sessionId'] ??= sessionId;
+ onMessage?.call(json);
+ return;
+ }
+
+ // pailot/<sessionId>/typing
+ if (parts.length == 3 && parts[2] == 'typing') {
+ final sessionId = parts[1];
+ json['type'] = 'typing';
+ json['sessionId'] ??= sessionId;
+ // Map 'active' field to the 'typing'/'isTyping' fields chat_screen expects
+ final active = json['active'] as bool? ?? true;
+ json['typing'] = active;
+ onMessage?.call(json);
+ return;
+ }
+
+ // pailot/<sessionId>/screenshot
+ if (parts.length == 3 && parts[2] == 'screenshot') {
+ final sessionId = parts[1];
+ json['type'] = 'screenshot';
+ json['sessionId'] ??= sessionId;
+ // Map imageBase64 to 'data' for compatibility with chat_screen handler
+ json['data'] ??= json['imageBase64'];
+ onMessage?.call(json);
+ return;
+ }
+ }
+
+ void _evictOldIds() {
+ while (_seenMsgIdOrder.length > _maxSeenIds) {
+ final oldest = _seenMsgIdOrder.removeAt(0);
+ _seenMsgIds.remove(oldest);
+ }
+ }
+
+ /// Generate a UUID v4 for message IDs.
+ String _uuid() => const Uuid().v4();
+
+ /// Current timestamp in milliseconds.
+ int _now() => DateTime.now().millisecondsSinceEpoch;
+
+ /// Publish a JSON payload to an MQTT topic.
+ void _publish(String topic, Map<String, dynamic> payload, MqttQos qos) {
+ final client = _client;
+ if (client == null || client.connectionStatus?.state != MqttConnectionState.connected) {
+ onError?.call('Not connected');
+ return;
+ }
+
+ try {
+ final builder = MqttClientPayloadBuilder();
+ builder.addString(jsonEncode(payload));
+ client.publishMessage(topic, qos, builder.payload!);
+ } catch (e) {
+ onError?.call('Send failed: $e');
+ }
+ }
+
+ /// Send a message — routes to the appropriate MQTT topic based on content.
+ /// Accepts the same message format as WebSocketService.send().
+ void send(Map<String, dynamic> message) {
+ final type = message['type'] as String?;
+ final sessionId = message['sessionId'] as String?;
+
+ if (type == 'command' || (message.containsKey('command') && type == null)) {
+ // Command messages go to pailot/control/in
+ final command = message['command'] as String? ?? '';
+ final args = message['args'] as Map<String, dynamic>? ?? {};
+ final payload = <String, dynamic>{
+ 'msgId': _uuid(),
+ 'type': 'command',
+ 'command': command,
+ 'ts': _now(),
+ ...args,
+ };
+ _publish('pailot/control/in', payload, MqttQos.atLeastOnce);
+ return;
+ }
+
+ if (type == 'voice' && sessionId != null) {
+ // Voice message
+ _publish('pailot/$sessionId/in', {
+ 'msgId': _uuid(),
+ 'type': 'voice',
+ 'sessionId': sessionId,
+ 'audioBase64': message['audioBase64'] ?? '',
+ 'ts': _now(),
+ }, MqttQos.atLeastOnce);
+ return;
+ }
+
+ if (type == 'image' && sessionId != null) {
+ // Image message
+ _publish('pailot/$sessionId/in', {
+ 'msgId': _uuid(),
+ 'type': 'image',
+ 'sessionId': sessionId,
+ 'imageBase64': message['imageBase64'] ?? '',
+ 'mimeType': message['mimeType'] ?? 'image/jpeg',
+ 'caption': message['caption'] ?? '',
+ 'ts': _now(),
+ }, MqttQos.atLeastOnce);
+ return;
+ }
+
+ if (type == 'tts' && sessionId != null) {
+ // TTS request — route as command
+ _publish('pailot/control/in', {
+ 'msgId': _uuid(),
+ 'type': 'command',
+ 'command': 'tts',
+ 'text': message['text'] ?? '',
+ 'sessionId': sessionId,
+ 'ts': _now(),
+ }, MqttQos.atLeastOnce);
+ return;
+ }
+
+ // Default: plain text message (content + sessionId)
+ if (sessionId != null) {
+ final content = message['content'] as String? ?? '';
+ _publish('pailot/$sessionId/in', {
+ 'msgId': _uuid(),
+ 'type': 'text',
+ 'sessionId': sessionId,
+ 'content': content,
+ 'ts': _now(),
+ }, MqttQos.atLeastOnce);
+ return;
+ }
+
+ onError?.call('Cannot send message: missing sessionId');
+ }
+
+ /// Disconnect intentionally.
+ void disconnect() {
+ _intentionalClose = true;
+ _updatesSub?.cancel();
+ _updatesSub = null;
+
+ try {
+ _client?.disconnect();
+ } catch (_) {}
+ _client = null;
+
+ _setStatus(ConnectionStatus.disconnected);
+ onClose?.call();
+ }
+
+ /// Update config and reconnect.
+ Future<void> updateConfig(ServerConfig newConfig) async {
+ config = newConfig;
+ disconnect();
+ await Future.delayed(const Duration(milliseconds: 100));
+ await connect();
+ }
+
+ /// Dispose all resources.
+ void dispose() {
+ disconnect();
+ }
+
+ // App lifecycle integration
+ @override
+ void didChangeAppLifecycleState(AppLifecycleState state) {
+ switch (state) {
+ case AppLifecycleState.resumed:
+ if (_status != ConnectionStatus.connected && !_intentionalClose) {
+ connect();
+ }
+ case AppLifecycleState.paused:
+ // Keep connection alive — MQTT handles keepalive natively
+ break;
+ default:
+ break;
+ }
+ }
+}
diff --git a/pubspec.lock b/pubspec.lock
index aa51aec..61cba3b 100644
--- a/pubspec.lock
+++ b/pubspec.lock
@@ -161,6 +161,14 @@
url: "https://pub.dev"
source: hosted
version: "7.0.3"
+ event_bus:
+ dependency: transitive
+ description:
+ name: event_bus
+ sha256: "1a55e97923769c286d295240048fc180e7b0768902c3c2e869fe059aafa15304"
+ url: "https://pub.dev"
+ source: hosted
+ version: "2.0.1"
fake_async:
dependency: transitive
description:
@@ -504,6 +512,14 @@
url: "https://pub.dev"
source: hosted
version: "2.0.0"
+ mqtt_client:
+ dependency: "direct main"
+ description:
+ name: mqtt_client
+ sha256: fd22ea00a4c7b5623e01000a91a256d62a8bacba38e9812170458070c52affed
+ url: "https://pub.dev"
+ source: hosted
+ version: "10.11.9"
native_toolchain_c:
dependency: transitive
description:
diff --git a/pubspec.yaml b/pubspec.yaml
index 3795dfa..7ad8705 100644
--- a/pubspec.yaml
+++ b/pubspec.yaml
@@ -26,6 +26,7 @@
share_plus: ^12.0.1
udp: ^5.0.3
intl: ^0.20.2
+ mqtt_client: ^10.6.0
uuid: ^4.5.1
collection: ^1.19.1
--
Gitblit v1.3.1