From cb470d33d2665fcc6f8448d2736777656cf0cbe7 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Tue, 24 Mar 2026 00:25:07 +0100
Subject: [PATCH] feat: MQTT migration, offline catch_up, clean session, image support
---
lib/screens/settings_screen.dart | 2
/dev/null | 288 --------------------------------
lib/providers/providers.dart | 2
lib/models/server_config.dart | 17 -
lib/services/mqtt_service.dart | 23 +
lib/screens/navigate_screen.dart | 19 -
lib/widgets/status_dot.dart | 4
lib/screens/chat_screen.dart | 127 ++++++++++++-
8 files changed, 141 insertions(+), 341 deletions(-)
diff --git a/lib/models/server_config.dart b/lib/models/server_config.dart
index 7963c66..14fa6df 100644
--- a/lib/models/server_config.dart
+++ b/lib/models/server_config.dart
@@ -13,23 +13,6 @@
this.mqttToken,
});
- /// Primary WebSocket URL (local network).
- String get localUrl {
- final h = localHost ?? host;
- return 'ws://$h:$port';
- }
-
- /// Fallback WebSocket URL (remote / public).
- String get remoteUrl => 'ws://$host:$port';
-
- /// Returns [localUrl, remoteUrl] for dual-connect attempts.
- List<String> get urls {
- if (localHost != null && localHost!.isNotEmpty && localHost != host) {
- return [localUrl, remoteUrl];
- }
- return [remoteUrl];
- }
-
Map<String, dynamic> toJson() {
return {
'host': host,
diff --git a/lib/providers/providers.dart b/lib/providers/providers.dart
index 046f38f..b477237 100644
--- a/lib/providers/providers.dart
+++ b/lib/providers/providers.dart
@@ -8,7 +8,7 @@
import '../models/server_config.dart';
import '../models/session.dart';
import '../services/message_store.dart';
-import '../services/websocket_service.dart' show ConnectionStatus;
+import '../services/mqtt_service.dart' show ConnectionStatus;
// --- Enums ---
diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index 4f43034..2b0c8ee 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -56,6 +56,9 @@
bool _isCatchingUp = false;
bool _screenshotForChat = false;
final Set<int> _seenSeqs = {};
+ bool _sessionReady = false;
+ final List<Map<String, dynamic>> _pendingMessages = [];
+ final Map<String, List<Message>> _catchUpPending = {};
@override
void initState() {
@@ -66,9 +69,14 @@
}
Future<void> _initAll() async {
- // Load lastSeq BEFORE connecting so catch_up sends the right value
+ // Load persisted state BEFORE connecting
final prefs = await SharedPreferences.getInstance();
_lastSeq = prefs.getInt('lastSeq') ?? 0;
+ // Restore last active session so catch_up routes to the right session
+ final savedSessionId = prefs.getString('activeSessionId');
+ if (savedSessionId != null && mounted) {
+ ref.read(activeSessionIdProvider.notifier).state = savedSessionId;
+ }
if (!mounted) return;
// Listen for playback state changes to reset play button UI
@@ -146,10 +154,11 @@
};
_ws!.onMessage = _handleMessage;
_ws!.onOpen = () {
+ _sessionReady = false; // Gate messages until sessions arrive
+ _pendingMessages.clear();
final activeId = ref.read(activeSessionIdProvider);
_sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
- // catch_up is still available during the transition period
- _sendCommand('catch_up', {'lastSeq': _lastSeq});
+ // catch_up is sent after sessions arrive (in _handleSessions)
};
_ws!.onError = (error) {
debugPrint('MQTT error: $error');
@@ -168,6 +177,14 @@
}
void _handleMessage(Map<String, dynamic> msg) {
+ final type = msg['type'] as String?;
+ // Sessions and catch_up always process immediately
+ // Content messages (text, voice, image) wait until session is ready
+ if (!_sessionReady && type != 'sessions' && type != 'catch_up' && type != 'status' && type != 'typing') {
+ _pendingMessages.add(msg);
+ return;
+ }
+
// Track sequence numbers for catch_up protocol
final seq = msg['seq'] as int?;
if (seq != null) {
@@ -184,8 +201,6 @@
_saveLastSeq();
}
}
-
- final type = msg['type'] as String?;
switch (type) {
case 'sessions':
@@ -231,7 +246,8 @@
if (sessionId != null) _incrementUnread(sessionId);
case 'catch_up':
final serverSeq = msg['serverSeq'] as int?;
- if (serverSeq != null && serverSeq > _lastSeq) {
+ if (serverSeq != null) {
+ // Always sync to server's seq — if server restarted, its seq may be lower
_lastSeq = serverSeq;
_saveLastSeq();
}
@@ -241,19 +257,91 @@
final catchUpMsgs = msg['messages'] as List<dynamic>?;
if (catchUpMsgs != null && catchUpMsgs.isNotEmpty) {
_isCatchingUp = true;
+ final activeId = ref.read(activeSessionIdProvider);
final existing = ref.read(messagesProvider);
final existingContents = existing
.where((m) => m.role == MessageRole.assistant)
.map((m) => m.content)
.toSet();
for (final m in catchUpMsgs) {
- final content = (m as Map<String, dynamic>)['content'] as String? ?? '';
- // Skip if we already have this message locally
- if (content.isNotEmpty && existingContents.contains(content)) continue;
- _handleMessage(m);
- if (content.isNotEmpty) existingContents.add(content);
+ final map = m as Map<String, dynamic>;
+ final msgType = map['type'] as String? ?? 'text';
+ final content = map['content'] as String? ?? map['transcript'] as String? ?? map['caption'] as String? ?? '';
+ final msgSessionId = map['sessionId'] as String?;
+ final imageData = map['imageBase64'] as String?;
+
+ // Skip empty text messages (images with no caption are OK)
+ if (content.isEmpty && imageData == null) continue;
+ // Dedup by content (skip images from dedup — they have unique msgIds)
+ if (imageData == null && content.isNotEmpty && existingContents.contains(content)) continue;
+
+ final Message message;
+ if (msgType == 'image' && imageData != null) {
+ message = Message.image(
+ role: MessageRole.assistant,
+ imageBase64: imageData,
+ content: content,
+ status: MessageStatus.sent,
+ );
+ } else {
+ message = Message.text(
+ role: MessageRole.assistant,
+ content: content,
+ status: MessageStatus.sent,
+ );
+ }
+
+ if (msgSessionId == null || msgSessionId == activeId) {
+ // Active session or no session: add directly to chat
+ ref.read(messagesProvider.notifier).addMessage(message);
+ } else {
+ // Different session: store + unread badge + toast
+ // Collect for batch storage below to avoid race condition
+ _catchUpPending.putIfAbsent(msgSessionId, () => []).add(message);
+ _incrementUnread(msgSessionId);
+ }
+ existingContents.add(content);
}
_isCatchingUp = false;
+ _scrollToBottom();
+ // Batch-store cross-session messages (sequential to avoid race condition)
+ if (_catchUpPending.isNotEmpty) {
+ final pending = Map<String, List<Message>>.from(_catchUpPending);
+ _catchUpPending.clear();
+ // Show one toast per session with message count
+ if (mounted) {
+ final sessions = ref.read(sessionsProvider);
+ for (final entry in pending.entries) {
+ final session = sessions.firstWhere(
+ (s) => s.id == entry.key,
+ orElse: () => Session(id: entry.key, index: 0, name: 'Unknown', type: 'claude'),
+ );
+ final count = entry.value.length;
+ final preview = count == 1
+ ? entry.value.first.content
+ : '$count messages';
+ ToastManager.show(
+ context,
+ sessionName: session.name,
+ preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
+ onTap: () => _switchSession(entry.key),
+ );
+ }
+ }
+ () async {
+ for (final entry in pending.entries) {
+ final existing = await MessageStore.loadAll(entry.key);
+ MessageStore.save(entry.key, [...existing, ...entry.value]);
+ await MessageStore.flush();
+ }
+ }();
+ }
+ // Clear unread for active session
+ if (activeId != null) {
+ final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
+ counts.remove(activeId);
+ ref.read(unreadCountsProvider.notifier).state = counts;
+ }
}
case 'pong':
break; // heartbeat response, ignore
@@ -284,6 +372,22 @@
);
ref.read(activeSessionIdProvider.notifier).state = active.id;
ref.read(messagesProvider.notifier).switchSession(active.id);
+ SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', active.id));
+ }
+
+ // Session is ready — process any pending messages that arrived before sessions list
+ if (!_sessionReady) {
+ _sessionReady = true;
+ // Request catch_up now that session is set
+ _sendCommand('catch_up', {'lastSeq': _lastSeq});
+ // Drain messages that arrived before sessions list
+ if (_pendingMessages.isNotEmpty) {
+ final pending = List<Map<String, dynamic>>.from(_pendingMessages);
+ _pendingMessages.clear();
+ for (final m in pending) {
+ _handleMessage(m);
+ }
+ }
}
}
@@ -507,6 +611,7 @@
ref.read(activeSessionIdProvider.notifier).state = sessionId;
await ref.read(messagesProvider.notifier).switchSession(sessionId);
+ SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', sessionId));
final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
counts.remove(sessionId);
diff --git a/lib/screens/navigate_screen.dart b/lib/screens/navigate_screen.dart
index 9ffa4cf..b0cfc88 100644
--- a/lib/screens/navigate_screen.dart
+++ b/lib/screens/navigate_screen.dart
@@ -192,20 +192,11 @@
void _sendKey(String key) {
_haptic();
- // Send via WebSocket - the chat screen's WS is in the provider
- // We need to access the WS through the provider system
- // For now, send a nav command message
+ // Send via MQTT - the chat screen's MQTT service is in the provider
final activeSessionId = ref.read(activeSessionIdProvider);
- // Build the navigate command
- // This sends a key press to the AIBroker daemon
- // which forwards it to the active terminal session
- // The WS is managed by ChatScreen, so we'll use a message approach
-
- // Since we can't directly access the WS from here,
- // we send through the provider approach - the message will be picked up
- // by the WS service in ChatScreen via a shared notification mechanism.
- // For simplicity, we use a global event bus pattern.
+ // Send a key press to the AIBroker daemon via the MQTT service.
+ // NavigateNotifier bridges the navigate screen to the chat screen's MQTT service.
NavigateNotifier.instance?.sendKey(key, activeSessionId);
@@ -228,8 +219,8 @@
}
}
-/// Global notifier to bridge navigate screen to WebSocket.
-/// Set by ChatScreen when WS is initialized.
+/// Global notifier to bridge navigate screen to MQTT service.
+/// Set by ChatScreen when MQTT is initialized.
class NavigateNotifier {
static NavigateNotifier? instance;
diff --git a/lib/screens/settings_screen.dart b/lib/screens/settings_screen.dart
index ed129f8..0cfb0fc 100644
--- a/lib/screens/settings_screen.dart
+++ b/lib/screens/settings_screen.dart
@@ -3,7 +3,7 @@
import '../models/server_config.dart';
import '../providers/providers.dart';
-import '../services/websocket_service.dart' show ConnectionStatus;
+import '../services/mqtt_service.dart' show ConnectionStatus;
import '../services/wol_service.dart';
import '../theme/app_theme.dart';
import '../widgets/status_dot.dart';
diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
index 32e6fba..f7a51be 100644
--- a/lib/services/mqtt_service.dart
+++ b/lib/services/mqtt_service.dart
@@ -10,8 +10,15 @@
import 'package:uuid/uuid.dart';
import '../models/server_config.dart';
-import 'websocket_service.dart' show ConnectionStatus;
import 'wol_service.dart';
+
+/// Connection status for the MQTT client.
+enum ConnectionStatus {
+ disconnected,
+ connecting,
+ connected,
+ reconnecting,
+}
// Debug log to file (survives release builds)
Future<void> _mqttLog(String msg) async {
@@ -23,11 +30,11 @@
} catch (_) {}
}
-/// MQTT client for PAILot, replacing WebSocketService.
+/// MQTT client for PAILot.
///
/// Connects to the AIBroker daemon's embedded aedes broker.
/// Subscribes to all pailot/ topics and dispatches messages
-/// through the same callback interface as WebSocketService.
+/// through the onMessage callback interface.
class MqttService with WidgetsBindingObserver {
MqttService({required this.config});
@@ -43,7 +50,7 @@
final List<String> _seenMsgIdOrder = [];
static const int _maxSeenIds = 500;
- // Callbacks — same interface as WebSocketService
+ // Callbacks
void Function(ConnectionStatus status)? onStatusChanged;
void Function(Map<String, dynamic> message)? onMessage;
void Function()? onOpen;
@@ -149,9 +156,12 @@
client.onAutoReconnect = _onAutoReconnect;
client.onAutoReconnected = _onAutoReconnected;
- // Persistent session: broker queues QoS 1 messages while client is offline
+ // Clean session: we handle offline delivery ourselves via catch_up protocol.
+ // Persistent sessions cause the broker to flood all queued QoS 1 messages
+ // on reconnect, which overwhelms the client with large voice payloads.
final connMessage = MqttConnectMessage()
.withClientIdentifier(clientId)
+ .startClean()
.authenticateAs('pailot', config.mqttToken ?? '');
client.connectionMessage = connMessage;
@@ -268,7 +278,7 @@
/// Route incoming MQTT messages to the onMessage callback.
/// Translates MQTT topic structure into the flat message format
- /// that chat_screen expects (same as WebSocket messages).
+ /// that chat_screen expects.
void _dispatchMessage(String topic, Map<String, dynamic> json) {
final parts = topic.split('/');
@@ -369,7 +379,6 @@
}
/// Send a message — routes to the appropriate MQTT topic based on content.
- /// Accepts the same message format as WebSocketService.send().
void send(Map<String, dynamic> message) {
final type = message['type'] as String?;
final sessionId = message['sessionId'] as String?;
diff --git a/lib/services/websocket_service.dart b/lib/services/websocket_service.dart
deleted file mode 100644
index 96e21fa..0000000
--- a/lib/services/websocket_service.dart
+++ /dev/null
@@ -1,288 +0,0 @@
-import 'dart:async';
-import 'dart:convert';
-
-import 'package:flutter/widgets.dart';
-import 'package:web_socket_channel/web_socket_channel.dart';
-
-import '../models/server_config.dart';
-import 'wol_service.dart';
-
-enum ConnectionStatus {
- disconnected,
- connecting,
- connected,
- reconnecting,
-}
-
-/// WebSocket client with dual-URL fallback, heartbeat, and auto-reconnect.
-class WebSocketService with WidgetsBindingObserver {
- WebSocketService({required this.config});
-
- ServerConfig config;
- WebSocketChannel? _channel;
- ConnectionStatus _status = ConnectionStatus.disconnected;
- Timer? _heartbeatTimer;
- Timer? _zombieTimer;
- Timer? _reconnectTimer;
- int _reconnectAttempt = 0;
- bool _intentionalClose = false;
- DateTime? _lastPong;
- StreamSubscription? _subscription;
-
- // Callbacks
- void Function()? onOpen;
- void Function()? onClose;
- void Function()? onReconnecting;
- void Function(Map<String, dynamic> message)? onMessage;
- void Function(String error)? onError;
- void Function(ConnectionStatus status)? onStatusChanged;
-
- ConnectionStatus get status => _status;
- bool get isConnected => _status == ConnectionStatus.connected;
-
- void _setStatus(ConnectionStatus newStatus) {
- if (_status == newStatus) return;
- _status = newStatus;
- onStatusChanged?.call(newStatus);
- }
-
- /// Connect to the WebSocket server.
- /// Tries local URL first (2.5s timeout), then remote URL.
- Future<void> connect() async {
- if (_status == ConnectionStatus.connected ||
- _status == ConnectionStatus.connecting) {
- return;
- }
-
- _intentionalClose = false;
- _setStatus(ConnectionStatus.connecting);
-
- // Send Wake-on-LAN if MAC configured
- if (config.macAddress != null && config.macAddress!.isNotEmpty) {
- try {
- await WolService.wake(config.macAddress!, localHost: config.localHost);
- } catch (_) {}
- }
-
- final urls = config.urls;
-
- for (final url in urls) {
- if (_intentionalClose) return;
-
- try {
- final connected = await _tryConnect(url,
- timeout: url == urls.first && urls.length > 1
- ? const Duration(milliseconds: 2500)
- : const Duration(seconds: 5));
- if (connected) return;
- } catch (_) {
- continue;
- }
- }
-
- // All URLs failed
- _setStatus(ConnectionStatus.disconnected);
- onError?.call('Failed to connect to server');
- _scheduleReconnect();
- }
-
- Future<bool> _tryConnect(String url, {Duration? timeout}) async {
- try {
- final uri = Uri.parse(url);
- final channel = WebSocketChannel.connect(uri);
-
- // Wait for connection with timeout
- await channel.ready.timeout(
- timeout ?? const Duration(seconds: 5),
- onTimeout: () {
- channel.sink.close();
- throw TimeoutException('Connection timeout');
- },
- );
-
- _channel = channel;
- _reconnectAttempt = 0;
- _setStatus(ConnectionStatus.connected);
- _startHeartbeat();
- _listenMessages();
- onOpen?.call();
- return true;
- } catch (e) {
- return false;
- }
- }
-
- void _listenMessages() {
- _subscription?.cancel();
- _subscription = _channel?.stream.listen(
- (data) {
- _lastPong = DateTime.now();
-
- if (data is String) {
- // Handle pong
- if (data == 'pong') return;
-
- try {
- final json = jsonDecode(data) as Map<String, dynamic>;
- onMessage?.call(json);
- } catch (_) {
- // Non-JSON message, ignore
- }
- }
- },
- onError: (error) {
- onError?.call(error.toString());
- _handleDisconnect();
- },
- onDone: () {
- _handleDisconnect();
- },
- );
- }
-
- void _startHeartbeat() {
- _heartbeatTimer?.cancel();
- _zombieTimer?.cancel();
- _lastPong = DateTime.now();
-
- // Send ping every 30 seconds
- _heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (_) {
- if (_channel != null && _status == ConnectionStatus.connected) {
- try {
- _channel!.sink.add(jsonEncode({'type': 'ping'}));
- } catch (_) {
- _handleDisconnect();
- }
- }
- });
-
- // Check for zombie connection every 15 seconds
- _zombieTimer = Timer.periodic(const Duration(seconds: 15), (_) {
- if (_lastPong != null) {
- final elapsed = DateTime.now().difference(_lastPong!);
- if (elapsed.inSeconds > 60) {
- _handleDisconnect();
- }
- }
- });
- }
-
- void _handleDisconnect() {
- _stopHeartbeat();
- _subscription?.cancel();
-
- final wasConnected = _status == ConnectionStatus.connected;
-
- try {
- _channel?.sink.close();
- } catch (_) {}
- _channel = null;
-
- if (_intentionalClose) {
- _setStatus(ConnectionStatus.disconnected);
- onClose?.call();
- } else if (wasConnected) {
- _setStatus(ConnectionStatus.reconnecting);
- onReconnecting?.call();
- _scheduleReconnect();
- }
- }
-
- void _stopHeartbeat() {
- _heartbeatTimer?.cancel();
- _zombieTimer?.cancel();
- _heartbeatTimer = null;
- _zombieTimer = null;
- }
-
- void _scheduleReconnect() {
- if (_intentionalClose) return;
-
- _reconnectTimer?.cancel();
-
- // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s max
- final delay = Duration(
- milliseconds: (1000 * (1 << _reconnectAttempt.clamp(0, 4)))
- .clamp(1000, 30000),
- );
-
- _reconnectAttempt++;
-
- _reconnectTimer = Timer(delay, () {
- if (!_intentionalClose) {
- _setStatus(ConnectionStatus.reconnecting);
- onReconnecting?.call();
- connect();
- }
- });
- }
-
- /// Send a JSON message.
- void send(Map<String, dynamic> message) {
- if (_channel == null || _status != ConnectionStatus.connected) {
- onError?.call('Not connected');
- return;
- }
-
- try {
- _channel!.sink.add(jsonEncode(message));
- } catch (e) {
- onError?.call('Send failed: $e');
- }
- }
-
- /// Send a raw string.
- void sendRaw(String data) {
- if (_channel == null || _status != ConnectionStatus.connected) return;
- try {
- _channel!.sink.add(data);
- } catch (_) {}
- }
-
- /// Disconnect intentionally.
- void disconnect() {
- _intentionalClose = true;
- _reconnectTimer?.cancel();
- _stopHeartbeat();
- _subscription?.cancel();
-
- try {
- _channel?.sink.close();
- } catch (_) {}
- _channel = null;
-
- _setStatus(ConnectionStatus.disconnected);
- onClose?.call();
- }
-
- /// Update config and reconnect.
- Future<void> updateConfig(ServerConfig newConfig) async {
- config = newConfig;
- disconnect();
- await Future.delayed(const Duration(milliseconds: 100));
- await connect();
- }
-
- /// Dispose all resources.
- void dispose() {
- disconnect();
- _reconnectTimer?.cancel();
- }
-
- // App lifecycle integration
- @override
- void didChangeAppLifecycleState(AppLifecycleState state) {
- switch (state) {
- case AppLifecycleState.resumed:
- if (_status != ConnectionStatus.connected && !_intentionalClose) {
- _reconnectAttempt = 0;
- connect();
- }
- case AppLifecycleState.paused:
- // Keep connection alive but don't reconnect aggressively
- break;
- default:
- break;
- }
- }
-}
diff --git a/lib/widgets/status_dot.dart b/lib/widgets/status_dot.dart
index bb4f631..94023e0 100644
--- a/lib/widgets/status_dot.dart
+++ b/lib/widgets/status_dot.dart
@@ -1,9 +1,9 @@
import 'package:flutter/material.dart';
-import '../services/websocket_service.dart';
+import '../services/mqtt_service.dart';
import '../theme/app_theme.dart';
-/// 10px circle indicating WebSocket connection status.
+/// 10px circle indicating MQTT connection status.
class StatusDot extends StatelessWidget {
final ConnectionStatus status;
--
Gitblit v1.3.1