| lib/models/server_config.dart | patch | view | blame | history | |
| lib/providers/providers.dart | patch | view | blame | history | |
| lib/screens/chat_screen.dart | patch | view | blame | history | |
| lib/screens/navigate_screen.dart | patch | view | blame | history | |
| lib/screens/settings_screen.dart | patch | view | blame | history | |
| lib/services/mqtt_service.dart | patch | view | blame | history | |
| lib/services/websocket_service.dart | patch | view | blame | history | |
| lib/widgets/status_dot.dart | patch | view | blame | history |
lib/models/server_config.dart
.. .. @@ -13,23 +13,6 @@ 13 13 this.mqttToken, 14 14 }); 15 15 16 - /// Primary WebSocket URL (local network).17 - String get localUrl {18 - final h = localHost ?? host;19 - return 'ws://$h:$port';20 - }21 -22 - /// Fallback WebSocket URL (remote / public).23 - String get remoteUrl => 'ws://$host:$port';24 -25 - /// Returns [localUrl, remoteUrl] for dual-connect attempts.26 - List<String> get urls {27 - if (localHost != null && localHost!.isNotEmpty && localHost != host) {28 - return [localUrl, remoteUrl];29 - }30 - return [remoteUrl];31 - }32 -33 16 Map<String, dynamic> toJson() { 34 17 return { 35 18 'host': host, lib/providers/providers.dart
.. .. @@ -8,7 +8,7 @@ 8 8 import '../models/server_config.dart'; 9 9 import '../models/session.dart'; 10 10 import '../services/message_store.dart'; 11 -import '../services/websocket_service.dart' show ConnectionStatus;11 +import '../services/mqtt_service.dart' show ConnectionStatus;12 12 13 13 // --- Enums --- 14 14 lib/screens/chat_screen.dart
.. .. @@ -56,6 +56,9 @@ 56 56 bool _isCatchingUp = false; 57 57 bool _screenshotForChat = false; 58 58 final Set<int> _seenSeqs = {}; 59 + bool _sessionReady = false;60 + final List<Map<String, dynamic>> _pendingMessages = [];61 + final Map<String, List<Message>> _catchUpPending = {};59 62 60 63 @override 61 64 void initState() { .. .. @@ -66,9 +69,14 @@ 66 69 } 67 70 68 71 Future<void> _initAll() async { 69 - // Load lastSeq BEFORE connecting so catch_up sends the right value72 + // Load persisted state BEFORE connecting70 73 final prefs = await SharedPreferences.getInstance(); 71 74 _lastSeq = prefs.getInt('lastSeq') ?? 0; 75 + // Restore last active session so catch_up routes to the right session76 + final savedSessionId = prefs.getString('activeSessionId');77 + if (savedSessionId != null && mounted) {78 + ref.read(activeSessionIdProvider.notifier).state = savedSessionId;79 + }72 80 if (!mounted) return; 73 81 74 82 // Listen for playback state changes to reset play button UI .. .. @@ -146,10 +154,11 @@ 146 154 }; 147 155 _ws!.onMessage = _handleMessage; 148 156 _ws!.onOpen = () { 157 + _sessionReady = false; // Gate messages until sessions arrive158 + _pendingMessages.clear();149 159 final activeId = ref.read(activeSessionIdProvider); 150 160 _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null); 151 - // catch_up is still available during the transition period152 - _sendCommand('catch_up', {'lastSeq': _lastSeq});161 + // catch_up is sent after sessions arrive (in _handleSessions)153 162 }; 154 163 _ws!.onError = (error) { 155 164 debugPrint('MQTT error: $error'); .. .. @@ -168,6 +177,14 @@ 168 177 } 169 178 170 179 void _handleMessage(Map<String, dynamic> msg) { 180 + final type = msg['type'] as String?;181 + // Sessions and catch_up always process immediately182 + // Content messages (text, voice, image) wait until session is ready183 + if (!_sessionReady && type != 'sessions' && type != 'catch_up' && type != 'status' && type != 'typing') {184 + _pendingMessages.add(msg);185 + return;186 + }187 +171 188 // Track sequence numbers for catch_up protocol 172 189 final seq = msg['seq'] as int?; 173 190 if (seq != null) { .. .. @@ -184,8 +201,6 @@ 184 201 _saveLastSeq(); 185 202 } 186 203 } 187 -188 - final type = msg['type'] as String?;189 204 190 205 switch (type) { 191 206 case 'sessions': .. .. @@ -231,7 +246,8 @@ 231 246 if (sessionId != null) _incrementUnread(sessionId); 232 247 case 'catch_up': 233 248 final serverSeq = msg['serverSeq'] as int?; 234 - if (serverSeq != null && serverSeq > _lastSeq) {249 + if (serverSeq != null) {250 + // Always sync to server's seq — if server restarted, its seq may be lower235 251 _lastSeq = serverSeq; 236 252 _saveLastSeq(); 237 253 } .. .. @@ -241,19 +257,91 @@ 241 257 final catchUpMsgs = msg['messages'] as List<dynamic>?; 242 258 if (catchUpMsgs != null && catchUpMsgs.isNotEmpty) { 243 259 _isCatchingUp = true; 260 + final activeId = ref.read(activeSessionIdProvider);244 261 final existing = ref.read(messagesProvider); 245 262 final existingContents = existing 246 263 .where((m) => m.role == MessageRole.assistant) 247 264 .map((m) => m.content) 248 265 .toSet(); 249 266 for (final m in catchUpMsgs) { 250 - final content = (m as Map<String, dynamic>)['content'] as String? ?? '';251 - // Skip if we already have this message locally252 - if (content.isNotEmpty && existingContents.contains(content)) continue;253 - _handleMessage(m);254 - if (content.isNotEmpty) existingContents.add(content);267 + final map = m as Map<String, dynamic>;268 + final msgType = map['type'] as String? ?? 'text';269 + final content = map['content'] as String? ?? map['transcript'] as String? ?? map['caption'] as String? ?? '';270 + final msgSessionId = map['sessionId'] as String?;271 + final imageData = map['imageBase64'] as String?;272 +273 + // Skip empty text messages (images with no caption are OK)274 + if (content.isEmpty && imageData == null) continue;275 + // Dedup by content (skip images from dedup — they have unique msgIds)276 + if (imageData == null && content.isNotEmpty && existingContents.contains(content)) continue;277 +278 + final Message message;279 + if (msgType == 'image' && imageData != null) {280 + message = Message.image(281 + role: MessageRole.assistant,282 + imageBase64: imageData,283 + content: content,284 + status: MessageStatus.sent,285 + );286 + } else {287 + message = Message.text(288 + role: MessageRole.assistant,289 + content: content,290 + status: MessageStatus.sent,291 + );292 + }293 +294 + if (msgSessionId == null || msgSessionId == activeId) {295 + // Active session or no session: add directly to chat296 + ref.read(messagesProvider.notifier).addMessage(message);297 + } else {298 + // Different session: store + unread badge + toast299 + // Collect for batch storage below to avoid race condition300 + _catchUpPending.putIfAbsent(msgSessionId, () => []).add(message);301 + _incrementUnread(msgSessionId);302 + }303 + existingContents.add(content);255 304 } 256 305 _isCatchingUp = false; 306 + _scrollToBottom();307 + // Batch-store cross-session messages (sequential to avoid race condition)308 + if (_catchUpPending.isNotEmpty) {309 + final pending = Map<String, List<Message>>.from(_catchUpPending);310 + _catchUpPending.clear();311 + // Show one toast per session with message count312 + if (mounted) {313 + final sessions = ref.read(sessionsProvider);314 + for (final entry in pending.entries) {315 + final session = sessions.firstWhere(316 + (s) => s.id == entry.key,317 + orElse: () => Session(id: entry.key, index: 0, name: 'Unknown', type: 'claude'),318 + );319 + final count = entry.value.length;320 + final preview = count == 1321 + ? entry.value.first.content322 + : '$count messages';323 + ToastManager.show(324 + context,325 + sessionName: session.name,326 + preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,327 + onTap: () => _switchSession(entry.key),328 + );329 + }330 + }331 + () async {332 + for (final entry in pending.entries) {333 + final existing = await MessageStore.loadAll(entry.key);334 + MessageStore.save(entry.key, [...existing, ...entry.value]);335 + await MessageStore.flush();336 + }337 + }();338 + }339 + // Clear unread for active session340 + if (activeId != null) {341 + final counts = Map<String, int>.from(ref.read(unreadCountsProvider));342 + counts.remove(activeId);343 + ref.read(unreadCountsProvider.notifier).state = counts;344 + }257 345 } 258 346 case 'pong': 259 347 break; // heartbeat response, ignore .. .. @@ -284,6 +372,22 @@ 284 372 ); 285 373 ref.read(activeSessionIdProvider.notifier).state = active.id; 286 374 ref.read(messagesProvider.notifier).switchSession(active.id); 375 + SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', active.id));376 + }377 +378 + // Session is ready — process any pending messages that arrived before sessions list379 + if (!_sessionReady) {380 + _sessionReady = true;381 + // Request catch_up now that session is set382 + _sendCommand('catch_up', {'lastSeq': _lastSeq});383 + // Drain messages that arrived before sessions list384 + if (_pendingMessages.isNotEmpty) {385 + final pending = List<Map<String, dynamic>>.from(_pendingMessages);386 + _pendingMessages.clear();387 + for (final m in pending) {388 + _handleMessage(m);389 + }390 + }287 391 } 288 392 } 289 393 .. .. @@ -507,6 +611,7 @@ 507 611 508 612 ref.read(activeSessionIdProvider.notifier).state = sessionId; 509 613 await ref.read(messagesProvider.notifier).switchSession(sessionId); 614 + SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', sessionId));510 615 511 616 final counts = Map<String, int>.from(ref.read(unreadCountsProvider)); 512 617 counts.remove(sessionId);
.. .. @@ -192,20 +192,11 @@ 192 192 void _sendKey(String key) { 193 193 _haptic(); 194 194 195 - // Send via WebSocket - the chat screen's WS is in the provider196 - // We need to access the WS through the provider system197 - // For now, send a nav command message195 + // Send via MQTT - the chat screen's MQTT service is in the provider198 196 final activeSessionId = ref.read(activeSessionIdProvider); 199 197 200 - // Build the navigate command201 - // This sends a key press to the AIBroker daemon202 - // which forwards it to the active terminal session203 - // The WS is managed by ChatScreen, so we'll use a message approach204 -205 - // Since we can't directly access the WS from here,206 - // we send through the provider approach - the message will be picked up207 - // by the WS service in ChatScreen via a shared notification mechanism.208 - // For simplicity, we use a global event bus pattern.198 + // Send a key press to the AIBroker daemon via the MQTT service.199 + // NavigateNotifier bridges the navigate screen to the chat screen's MQTT service.209 200 210 201 NavigateNotifier.instance?.sendKey(key, activeSessionId); 211 202 .. .. @@ -228,8 +219,8 @@ 228 219 } 229 220 } 230 221 231 -/// Global notifier to bridge navigate screen to WebSocket.232 -/// Set by ChatScreen when WS is initialized.222 +/// Global notifier to bridge navigate screen to MQTT service.223 +/// Set by ChatScreen when MQTT is initialized.233 224 class NavigateNotifier { 234 225 static NavigateNotifier? instance; 235 226 lib/screens/settings_screen.dart
.. .. @@ -3,7 +3,7 @@ 3 3 4 4 import '../models/server_config.dart'; 5 5 import '../providers/providers.dart'; 6 -import '../services/websocket_service.dart' show ConnectionStatus;6 +import '../services/mqtt_service.dart' show ConnectionStatus;7 7 import '../services/wol_service.dart'; 8 8 import '../theme/app_theme.dart'; 9 9 import '../widgets/status_dot.dart'; lib/services/mqtt_service.dart
.. .. @@ -10,8 +10,15 @@ 10 10 import 'package:uuid/uuid.dart'; 11 11 12 12 import '../models/server_config.dart'; 13 -import 'websocket_service.dart' show ConnectionStatus;14 13 import 'wol_service.dart'; 14 +15 +/// Connection status for the MQTT client.16 +enum ConnectionStatus {17 + disconnected,18 + connecting,19 + connected,20 + reconnecting,21 +}15 22 16 23 // Debug log to file (survives release builds) 17 24 Future<void> _mqttLog(String msg) async { .. .. @@ -23,11 +30,11 @@ 23 30 } catch (_) {} 24 31 } 25 32 26 -/// MQTT client for PAILot, replacing WebSocketService.33 +/// MQTT client for PAILot.27 34 /// 28 35 /// Connects to the AIBroker daemon's embedded aedes broker. 29 36 /// Subscribes to all pailot/ topics and dispatches messages 30 -/// through the same callback interface as WebSocketService.37 +/// through the onMessage callback interface.31 38 class MqttService with WidgetsBindingObserver { 32 39 MqttService({required this.config}); 33 40 .. .. @@ -43,7 +50,7 @@ 43 50 final List<String> _seenMsgIdOrder = []; 44 51 static const int _maxSeenIds = 500; 45 52 46 - // Callbacks — same interface as WebSocketService53 + // Callbacks47 54 void Function(ConnectionStatus status)? onStatusChanged; 48 55 void Function(Map<String, dynamic> message)? onMessage; 49 56 void Function()? onOpen; .. .. @@ -149,9 +156,12 @@ 149 156 client.onAutoReconnect = _onAutoReconnect; 150 157 client.onAutoReconnected = _onAutoReconnected; 151 158 152 - // Persistent session: broker queues QoS 1 messages while client is offline159 + // Clean session: we handle offline delivery ourselves via catch_up protocol.160 + // Persistent sessions cause the broker to flood all queued QoS 1 messages161 + // on reconnect, which overwhelms the client with large voice payloads.153 162 final connMessage = MqttConnectMessage() 154 163 .withClientIdentifier(clientId) 164 + .startClean()155 165 .authenticateAs('pailot', config.mqttToken ?? ''); 156 166 157 167 client.connectionMessage = connMessage; .. .. @@ -268,7 +278,7 @@ 268 278 269 279 /// Route incoming MQTT messages to the onMessage callback. 270 280 /// Translates MQTT topic structure into the flat message format 271 - /// that chat_screen expects (same as WebSocket messages).281 + /// that chat_screen expects.272 282 void _dispatchMessage(String topic, Map<String, dynamic> json) { 273 283 final parts = topic.split('/'); 274 284 .. .. @@ -369,7 +379,6 @@ 369 379 } 370 380 371 381 /// Send a message — routes to the appropriate MQTT topic based on content. 372 - /// Accepts the same message format as WebSocketService.send().373 382 void send(Map<String, dynamic> message) { 374 383 final type = message['type'] as String?; 375 384 final sessionId = message['sessionId'] as String?; lib/services/websocket_service.dartdeleted file mode 100644
.. .. @@ -1,288 +0,0 @@ 1 -import 'dart:async';2 -import 'dart:convert';3 -4 -import 'package:flutter/widgets.dart';5 -import 'package:web_socket_channel/web_socket_channel.dart';6 -7 -import '../models/server_config.dart';8 -import 'wol_service.dart';9 -10 -enum ConnectionStatus {11 - disconnected,12 - connecting,13 - connected,14 - reconnecting,15 -}16 -17 -/// WebSocket client with dual-URL fallback, heartbeat, and auto-reconnect.18 -class WebSocketService with WidgetsBindingObserver {19 - WebSocketService({required this.config});20 -21 - ServerConfig config;22 - WebSocketChannel? _channel;23 - ConnectionStatus _status = ConnectionStatus.disconnected;24 - Timer? _heartbeatTimer;25 - Timer? _zombieTimer;26 - Timer? _reconnectTimer;27 - int _reconnectAttempt = 0;28 - bool _intentionalClose = false;29 - DateTime? _lastPong;30 - StreamSubscription? _subscription;31 -32 - // Callbacks33 - void Function()? onOpen;34 - void Function()? onClose;35 - void Function()? onReconnecting;36 - void Function(Map<String, dynamic> message)? onMessage;37 - void Function(String error)? onError;38 - void Function(ConnectionStatus status)? onStatusChanged;39 -40 - ConnectionStatus get status => _status;41 - bool get isConnected => _status == ConnectionStatus.connected;42 -43 - void _setStatus(ConnectionStatus newStatus) {44 - if (_status == newStatus) return;45 - _status = newStatus;46 - onStatusChanged?.call(newStatus);47 - }48 -49 - /// Connect to the WebSocket server.50 - /// Tries local URL first (2.5s timeout), then remote URL.51 - Future<void> connect() async {52 - if (_status == ConnectionStatus.connected ||53 - _status == ConnectionStatus.connecting) {54 - return;55 - }56 -57 - _intentionalClose = false;58 - _setStatus(ConnectionStatus.connecting);59 -60 - // Send Wake-on-LAN if MAC configured61 - if (config.macAddress != null && config.macAddress!.isNotEmpty) {62 - try {63 - await WolService.wake(config.macAddress!, localHost: config.localHost);64 - } catch (_) {}65 - }66 -67 - final urls = config.urls;68 -69 - for (final url in urls) {70 - if (_intentionalClose) return;71 -72 - try {73 - final connected = await _tryConnect(url,74 - timeout: url == urls.first && urls.length > 175 - ? const Duration(milliseconds: 2500)76 - : const Duration(seconds: 5));77 - if (connected) return;78 - } catch (_) {79 - continue;80 - }81 - }82 -83 - // All URLs failed84 - _setStatus(ConnectionStatus.disconnected);85 - onError?.call('Failed to connect to server');86 - _scheduleReconnect();87 - }88 -89 - Future<bool> _tryConnect(String url, {Duration? timeout}) async {90 - try {91 - final uri = Uri.parse(url);92 - final channel = WebSocketChannel.connect(uri);93 -94 - // Wait for connection with timeout95 - await channel.ready.timeout(96 - timeout ?? const Duration(seconds: 5),97 - onTimeout: () {98 - channel.sink.close();99 - throw TimeoutException('Connection timeout');100 - },101 - );102 -103 - _channel = channel;104 - _reconnectAttempt = 0;105 - _setStatus(ConnectionStatus.connected);106 - _startHeartbeat();107 - _listenMessages();108 - onOpen?.call();109 - return true;110 - } catch (e) {111 - return false;112 - }113 - }114 -115 - void _listenMessages() {116 - _subscription?.cancel();117 - _subscription = _channel?.stream.listen(118 - (data) {119 - _lastPong = DateTime.now();120 -121 - if (data is String) {122 - // Handle pong123 - if (data == 'pong') return;124 -125 - try {126 - final json = jsonDecode(data) as Map<String, dynamic>;127 - onMessage?.call(json);128 - } catch (_) {129 - // Non-JSON message, ignore130 - }131 - }132 - },133 - onError: (error) {134 - onError?.call(error.toString());135 - _handleDisconnect();136 - },137 - onDone: () {138 - _handleDisconnect();139 - },140 - );141 - }142 -143 - void _startHeartbeat() {144 - _heartbeatTimer?.cancel();145 - _zombieTimer?.cancel();146 - _lastPong = DateTime.now();147 -148 - // Send ping every 30 seconds149 - _heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (_) {150 - if (_channel != null && _status == ConnectionStatus.connected) {151 - try {152 - _channel!.sink.add(jsonEncode({'type': 'ping'}));153 - } catch (_) {154 - _handleDisconnect();155 - }156 - }157 - });158 -159 - // Check for zombie connection every 15 seconds160 - _zombieTimer = Timer.periodic(const Duration(seconds: 15), (_) {161 - if (_lastPong != null) {162 - final elapsed = DateTime.now().difference(_lastPong!);163 - if (elapsed.inSeconds > 60) {164 - _handleDisconnect();165 - }166 - }167 - });168 - }169 -170 - void _handleDisconnect() {171 - _stopHeartbeat();172 - _subscription?.cancel();173 -174 - final wasConnected = _status == ConnectionStatus.connected;175 -176 - try {177 - _channel?.sink.close();178 - } catch (_) {}179 - _channel = null;180 -181 - if (_intentionalClose) {182 - _setStatus(ConnectionStatus.disconnected);183 - onClose?.call();184 - } else if (wasConnected) {185 - _setStatus(ConnectionStatus.reconnecting);186 - onReconnecting?.call();187 - _scheduleReconnect();188 - }189 - }190 -191 - void _stopHeartbeat() {192 - _heartbeatTimer?.cancel();193 - _zombieTimer?.cancel();194 - _heartbeatTimer = null;195 - _zombieTimer = null;196 - }197 -198 - void _scheduleReconnect() {199 - if (_intentionalClose) return;200 -201 - _reconnectTimer?.cancel();202 -203 - // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s max204 - final delay = Duration(205 - milliseconds: (1000 * (1 << _reconnectAttempt.clamp(0, 4)))206 - .clamp(1000, 30000),207 - );208 -209 - _reconnectAttempt++;210 -211 - _reconnectTimer = Timer(delay, () {212 - if (!_intentionalClose) {213 - _setStatus(ConnectionStatus.reconnecting);214 - onReconnecting?.call();215 - connect();216 - }217 - });218 - }219 -220 - /// Send a JSON message.221 - void send(Map<String, dynamic> message) {222 - if (_channel == null || _status != ConnectionStatus.connected) {223 - onError?.call('Not connected');224 - return;225 - }226 -227 - try {228 - _channel!.sink.add(jsonEncode(message));229 - } catch (e) {230 - onError?.call('Send failed: $e');231 - }232 - }233 -234 - /// Send a raw string.235 - void sendRaw(String data) {236 - if (_channel == null || _status != ConnectionStatus.connected) return;237 - try {238 - _channel!.sink.add(data);239 - } catch (_) {}240 - }241 -242 - /// Disconnect intentionally.243 - void disconnect() {244 - _intentionalClose = true;245 - _reconnectTimer?.cancel();246 - _stopHeartbeat();247 - _subscription?.cancel();248 -249 - try {250 - _channel?.sink.close();251 - } catch (_) {}252 - _channel = null;253 -254 - _setStatus(ConnectionStatus.disconnected);255 - onClose?.call();256 - }257 -258 - /// Update config and reconnect.259 - Future<void> updateConfig(ServerConfig newConfig) async {260 - config = newConfig;261 - disconnect();262 - await Future.delayed(const Duration(milliseconds: 100));263 - await connect();264 - }265 -266 - /// Dispose all resources.267 - void dispose() {268 - disconnect();269 - _reconnectTimer?.cancel();270 - }271 -272 - // App lifecycle integration273 - @override274 - void didChangeAppLifecycleState(AppLifecycleState state) {275 - switch (state) {276 - case AppLifecycleState.resumed:277 - if (_status != ConnectionStatus.connected && !_intentionalClose) {278 - _reconnectAttempt = 0;279 - connect();280 - }281 - case AppLifecycleState.paused:282 - // Keep connection alive but don't reconnect aggressively283 - break;284 - default:285 - break;286 - }287 - }288 -}lib/widgets/status_dot.dart
.. .. @@ -1,9 +1,9 @@ 1 1 import 'package:flutter/material.dart'; 2 2 3 -import '../services/websocket_service.dart';3 +import '../services/mqtt_service.dart';4 4 import '../theme/app_theme.dart'; 5 5 6 -/// 10px circle indicating WebSocket connection status.6 +/// 10px circle indicating MQTT connection status.7 7 class StatusDot extends StatelessWidget { 8 8 final ConnectionStatus status; 9 9