From cb470d33d2665fcc6f8448d2736777656cf0cbe7 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Tue, 24 Mar 2026 00:25:07 +0100
Subject: [PATCH] feat: MQTT migration, offline catch_up, clean session, image support

---
 lib/screens/settings_screen.dart |    2 
 /dev/null                        |  288 --------------------------------
 lib/providers/providers.dart     |    2 
 lib/models/server_config.dart    |   17 -
 lib/services/mqtt_service.dart   |   23 +
 lib/screens/navigate_screen.dart |   19 -
 lib/widgets/status_dot.dart      |    4 
 lib/screens/chat_screen.dart     |  127 ++++++++++++-
 8 files changed, 141 insertions(+), 341 deletions(-)

diff --git a/lib/models/server_config.dart b/lib/models/server_config.dart
index 7963c66..14fa6df 100644
--- a/lib/models/server_config.dart
+++ b/lib/models/server_config.dart
@@ -13,23 +13,6 @@
     this.mqttToken,
   });
 
-  /// Primary WebSocket URL (local network).
-  String get localUrl {
-    final h = localHost ?? host;
-    return 'ws://$h:$port';
-  }
-
-  /// Fallback WebSocket URL (remote / public).
-  String get remoteUrl => 'ws://$host:$port';
-
-  /// Returns [localUrl, remoteUrl] for dual-connect attempts.
-  List<String> get urls {
-    if (localHost != null && localHost!.isNotEmpty && localHost != host) {
-      return [localUrl, remoteUrl];
-    }
-    return [remoteUrl];
-  }
-
   Map<String, dynamic> toJson() {
     return {
       'host': host,
diff --git a/lib/providers/providers.dart b/lib/providers/providers.dart
index 046f38f..b477237 100644
--- a/lib/providers/providers.dart
+++ b/lib/providers/providers.dart
@@ -8,7 +8,7 @@
 import '../models/server_config.dart';
 import '../models/session.dart';
 import '../services/message_store.dart';
-import '../services/websocket_service.dart' show ConnectionStatus;
+import '../services/mqtt_service.dart' show ConnectionStatus;
 
 // --- Enums ---
 
diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index 4f43034..2b0c8ee 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -56,6 +56,9 @@
   bool _isCatchingUp = false;
   bool _screenshotForChat = false;
   final Set<int> _seenSeqs = {};
+  bool _sessionReady = false;
+  final List<Map<String, dynamic>> _pendingMessages = [];
+  final Map<String, List<Message>> _catchUpPending = {};
 
   @override
   void initState() {
@@ -66,9 +69,14 @@
   }
 
   Future<void> _initAll() async {
-    // Load lastSeq BEFORE connecting so catch_up sends the right value
+    // Load persisted state BEFORE connecting
     final prefs = await SharedPreferences.getInstance();
     _lastSeq = prefs.getInt('lastSeq') ?? 0;
+    // Restore last active session so catch_up routes to the right session
+    final savedSessionId = prefs.getString('activeSessionId');
+    if (savedSessionId != null && mounted) {
+      ref.read(activeSessionIdProvider.notifier).state = savedSessionId;
+    }
     if (!mounted) return;
 
     // Listen for playback state changes to reset play button UI
@@ -146,10 +154,11 @@
     };
     _ws!.onMessage = _handleMessage;
     _ws!.onOpen = () {
+      _sessionReady = false; // Gate messages until sessions arrive
+      _pendingMessages.clear();
       final activeId = ref.read(activeSessionIdProvider);
       _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
-      // catch_up is still available during the transition period
-      _sendCommand('catch_up', {'lastSeq': _lastSeq});
+      // catch_up is sent after sessions arrive (in _handleSessions)
     };
     _ws!.onError = (error) {
       debugPrint('MQTT error: $error');
@@ -168,6 +177,14 @@
   }
 
   void _handleMessage(Map<String, dynamic> msg) {
+    final type = msg['type'] as String?;
+    // Sessions and catch_up always process immediately
+    // Content messages (text, voice, image) wait until session is ready
+    if (!_sessionReady && type != 'sessions' && type != 'catch_up' && type != 'status' && type != 'typing') {
+      _pendingMessages.add(msg);
+      return;
+    }
+
     // Track sequence numbers for catch_up protocol
     final seq = msg['seq'] as int?;
     if (seq != null) {
@@ -184,8 +201,6 @@
         _saveLastSeq();
       }
     }
-
-    final type = msg['type'] as String?;
 
     switch (type) {
       case 'sessions':
@@ -231,7 +246,8 @@
         if (sessionId != null) _incrementUnread(sessionId);
       case 'catch_up':
         final serverSeq = msg['serverSeq'] as int?;
-        if (serverSeq != null && serverSeq > _lastSeq) {
+        if (serverSeq != null) {
+          // Always sync to server's seq — if server restarted, its seq may be lower
           _lastSeq = serverSeq;
           _saveLastSeq();
         }
@@ -241,19 +257,91 @@
         final catchUpMsgs = msg['messages'] as List<dynamic>?;
         if (catchUpMsgs != null && catchUpMsgs.isNotEmpty) {
           _isCatchingUp = true;
+          final activeId = ref.read(activeSessionIdProvider);
           final existing = ref.read(messagesProvider);
           final existingContents = existing
               .where((m) => m.role == MessageRole.assistant)
               .map((m) => m.content)
               .toSet();
           for (final m in catchUpMsgs) {
-            final content = (m as Map<String, dynamic>)['content'] as String? ?? '';
-            // Skip if we already have this message locally
-            if (content.isNotEmpty && existingContents.contains(content)) continue;
-            _handleMessage(m);
-            if (content.isNotEmpty) existingContents.add(content);
+            final map = m as Map<String, dynamic>;
+            final msgType = map['type'] as String? ?? 'text';
+            final content = map['content'] as String? ?? map['transcript'] as String? ?? map['caption'] as String? ?? '';
+            final msgSessionId = map['sessionId'] as String?;
+            final imageData = map['imageBase64'] as String?;
+
+            // Skip empty text messages (images with no caption are OK)
+            if (content.isEmpty && imageData == null) continue;
+            // Dedup by content (skip images from dedup — they have unique msgIds)
+            if (imageData == null && content.isNotEmpty && existingContents.contains(content)) continue;
+
+            final Message message;
+            if (msgType == 'image' && imageData != null) {
+              message = Message.image(
+                role: MessageRole.assistant,
+                imageBase64: imageData,
+                content: content,
+                status: MessageStatus.sent,
+              );
+            } else {
+              message = Message.text(
+                role: MessageRole.assistant,
+                content: content,
+                status: MessageStatus.sent,
+              );
+            }
+
+            if (msgSessionId == null || msgSessionId == activeId) {
+              // Active session or no session: add directly to chat
+              ref.read(messagesProvider.notifier).addMessage(message);
+            } else {
+              // Different session: store + unread badge + toast
+              // Collect for batch storage below to avoid race condition
+              _catchUpPending.putIfAbsent(msgSessionId, () => []).add(message);
+              _incrementUnread(msgSessionId);
+            }
+            existingContents.add(content);
           }
           _isCatchingUp = false;
+          _scrollToBottom();
+          // Batch-store cross-session messages (sequential to avoid race condition)
+          if (_catchUpPending.isNotEmpty) {
+            final pending = Map<String, List<Message>>.from(_catchUpPending);
+            _catchUpPending.clear();
+            // Show one toast per session with message count
+            if (mounted) {
+              final sessions = ref.read(sessionsProvider);
+              for (final entry in pending.entries) {
+                final session = sessions.firstWhere(
+                  (s) => s.id == entry.key,
+                  orElse: () => Session(id: entry.key, index: 0, name: 'Unknown', type: 'claude'),
+                );
+                final count = entry.value.length;
+                final preview = count == 1
+                    ? entry.value.first.content
+                    : '$count messages';
+                ToastManager.show(
+                  context,
+                  sessionName: session.name,
+                  preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
+                  onTap: () => _switchSession(entry.key),
+                );
+              }
+            }
+            () async {
+              for (final entry in pending.entries) {
+                final existing = await MessageStore.loadAll(entry.key);
+                MessageStore.save(entry.key, [...existing, ...entry.value]);
+                await MessageStore.flush();
+              }
+            }();
+          }
+          // Clear unread for active session
+          if (activeId != null) {
+            final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
+            counts.remove(activeId);
+            ref.read(unreadCountsProvider.notifier).state = counts;
+          }
         }
       case 'pong':
         break; // heartbeat response, ignore
@@ -284,6 +372,22 @@
       );
       ref.read(activeSessionIdProvider.notifier).state = active.id;
       ref.read(messagesProvider.notifier).switchSession(active.id);
+      SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', active.id));
+    }
+
+    // Session is ready — process any pending messages that arrived before sessions list
+    if (!_sessionReady) {
+      _sessionReady = true;
+      // Request catch_up now that session is set
+      _sendCommand('catch_up', {'lastSeq': _lastSeq});
+      // Drain messages that arrived before sessions list
+      if (_pendingMessages.isNotEmpty) {
+        final pending = List<Map<String, dynamic>>.from(_pendingMessages);
+        _pendingMessages.clear();
+        for (final m in pending) {
+          _handleMessage(m);
+        }
+      }
     }
   }
 
@@ -507,6 +611,7 @@
 
     ref.read(activeSessionIdProvider.notifier).state = sessionId;
     await ref.read(messagesProvider.notifier).switchSession(sessionId);
+    SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', sessionId));
 
     final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
     counts.remove(sessionId);
diff --git a/lib/screens/navigate_screen.dart b/lib/screens/navigate_screen.dart
index 9ffa4cf..b0cfc88 100644
--- a/lib/screens/navigate_screen.dart
+++ b/lib/screens/navigate_screen.dart
@@ -192,20 +192,11 @@
   void _sendKey(String key) {
     _haptic();
 
-    // Send via WebSocket - the chat screen's WS is in the provider
-    // We need to access the WS through the provider system
-    // For now, send a nav command message
+    // Send via MQTT - the chat screen's MQTT service is in the provider
     final activeSessionId = ref.read(activeSessionIdProvider);
 
-    // Build the navigate command
-    // This sends a key press to the AIBroker daemon
-    // which forwards it to the active terminal session
-    // The WS is managed by ChatScreen, so we'll use a message approach
-
-    // Since we can't directly access the WS from here,
-    // we send through the provider approach - the message will be picked up
-    // by the WS service in ChatScreen via a shared notification mechanism.
-    // For simplicity, we use a global event bus pattern.
+    // Send a key press to the AIBroker daemon via the MQTT service.
+    // NavigateNotifier bridges the navigate screen to the chat screen's MQTT service.
 
     NavigateNotifier.instance?.sendKey(key, activeSessionId);
 
@@ -228,8 +219,8 @@
   }
 }
 
-/// Global notifier to bridge navigate screen to WebSocket.
-/// Set by ChatScreen when WS is initialized.
+/// Global notifier to bridge navigate screen to MQTT service.
+/// Set by ChatScreen when MQTT is initialized.
 class NavigateNotifier {
   static NavigateNotifier? instance;
 
diff --git a/lib/screens/settings_screen.dart b/lib/screens/settings_screen.dart
index ed129f8..0cfb0fc 100644
--- a/lib/screens/settings_screen.dart
+++ b/lib/screens/settings_screen.dart
@@ -3,7 +3,7 @@
 
 import '../models/server_config.dart';
 import '../providers/providers.dart';
-import '../services/websocket_service.dart' show ConnectionStatus;
+import '../services/mqtt_service.dart' show ConnectionStatus;
 import '../services/wol_service.dart';
 import '../theme/app_theme.dart';
 import '../widgets/status_dot.dart';
diff --git a/lib/services/mqtt_service.dart b/lib/services/mqtt_service.dart
index 32e6fba..f7a51be 100644
--- a/lib/services/mqtt_service.dart
+++ b/lib/services/mqtt_service.dart
@@ -10,8 +10,15 @@
 import 'package:uuid/uuid.dart';
 
 import '../models/server_config.dart';
-import 'websocket_service.dart' show ConnectionStatus;
 import 'wol_service.dart';
+
+/// Connection status for the MQTT client.
+enum ConnectionStatus {
+  disconnected,
+  connecting,
+  connected,
+  reconnecting,
+}
 
 // Debug log to file (survives release builds)
 Future<void> _mqttLog(String msg) async {
@@ -23,11 +30,11 @@
   } catch (_) {}
 }
 
-/// MQTT client for PAILot, replacing WebSocketService.
+/// MQTT client for PAILot.
 ///
 /// Connects to the AIBroker daemon's embedded aedes broker.
 /// Subscribes to all pailot/ topics and dispatches messages
-/// through the same callback interface as WebSocketService.
+/// through the onMessage callback interface.
 class MqttService with WidgetsBindingObserver {
   MqttService({required this.config});
 
@@ -43,7 +50,7 @@
   final List<String> _seenMsgIdOrder = [];
   static const int _maxSeenIds = 500;
 
-  // Callbacks — same interface as WebSocketService
+  // Callbacks
   void Function(ConnectionStatus status)? onStatusChanged;
   void Function(Map<String, dynamic> message)? onMessage;
   void Function()? onOpen;
@@ -149,9 +156,12 @@
       client.onAutoReconnect = _onAutoReconnect;
       client.onAutoReconnected = _onAutoReconnected;
 
-      // Persistent session: broker queues QoS 1 messages while client is offline
+      // Clean session: we handle offline delivery ourselves via catch_up protocol.
+      // Persistent sessions cause the broker to flood all queued QoS 1 messages
+      // on reconnect, which overwhelms the client with large voice payloads.
       final connMessage = MqttConnectMessage()
           .withClientIdentifier(clientId)
+          .startClean()
           .authenticateAs('pailot', config.mqttToken ?? '');
 
       client.connectionMessage = connMessage;
@@ -268,7 +278,7 @@
 
   /// Route incoming MQTT messages to the onMessage callback.
   /// Translates MQTT topic structure into the flat message format
-  /// that chat_screen expects (same as WebSocket messages).
+  /// that chat_screen expects.
   void _dispatchMessage(String topic, Map<String, dynamic> json) {
     final parts = topic.split('/');
 
@@ -369,7 +379,6 @@
   }
 
   /// Send a message — routes to the appropriate MQTT topic based on content.
-  /// Accepts the same message format as WebSocketService.send().
   void send(Map<String, dynamic> message) {
     final type = message['type'] as String?;
     final sessionId = message['sessionId'] as String?;
diff --git a/lib/services/websocket_service.dart b/lib/services/websocket_service.dart
deleted file mode 100644
index 96e21fa..0000000
--- a/lib/services/websocket_service.dart
+++ /dev/null
@@ -1,288 +0,0 @@
-import 'dart:async';
-import 'dart:convert';
-
-import 'package:flutter/widgets.dart';
-import 'package:web_socket_channel/web_socket_channel.dart';
-
-import '../models/server_config.dart';
-import 'wol_service.dart';
-
-enum ConnectionStatus {
-  disconnected,
-  connecting,
-  connected,
-  reconnecting,
-}
-
-/// WebSocket client with dual-URL fallback, heartbeat, and auto-reconnect.
-class WebSocketService with WidgetsBindingObserver {
-  WebSocketService({required this.config});
-
-  ServerConfig config;
-  WebSocketChannel? _channel;
-  ConnectionStatus _status = ConnectionStatus.disconnected;
-  Timer? _heartbeatTimer;
-  Timer? _zombieTimer;
-  Timer? _reconnectTimer;
-  int _reconnectAttempt = 0;
-  bool _intentionalClose = false;
-  DateTime? _lastPong;
-  StreamSubscription? _subscription;
-
-  // Callbacks
-  void Function()? onOpen;
-  void Function()? onClose;
-  void Function()? onReconnecting;
-  void Function(Map<String, dynamic> message)? onMessage;
-  void Function(String error)? onError;
-  void Function(ConnectionStatus status)? onStatusChanged;
-
-  ConnectionStatus get status => _status;
-  bool get isConnected => _status == ConnectionStatus.connected;
-
-  void _setStatus(ConnectionStatus newStatus) {
-    if (_status == newStatus) return;
-    _status = newStatus;
-    onStatusChanged?.call(newStatus);
-  }
-
-  /// Connect to the WebSocket server.
-  /// Tries local URL first (2.5s timeout), then remote URL.
-  Future<void> connect() async {
-    if (_status == ConnectionStatus.connected ||
-        _status == ConnectionStatus.connecting) {
-      return;
-    }
-
-    _intentionalClose = false;
-    _setStatus(ConnectionStatus.connecting);
-
-    // Send Wake-on-LAN if MAC configured
-    if (config.macAddress != null && config.macAddress!.isNotEmpty) {
-      try {
-        await WolService.wake(config.macAddress!, localHost: config.localHost);
-      } catch (_) {}
-    }
-
-    final urls = config.urls;
-
-    for (final url in urls) {
-      if (_intentionalClose) return;
-
-      try {
-        final connected = await _tryConnect(url,
-            timeout: url == urls.first && urls.length > 1
-                ? const Duration(milliseconds: 2500)
-                : const Duration(seconds: 5));
-        if (connected) return;
-      } catch (_) {
-        continue;
-      }
-    }
-
-    // All URLs failed
-    _setStatus(ConnectionStatus.disconnected);
-    onError?.call('Failed to connect to server');
-    _scheduleReconnect();
-  }
-
-  Future<bool> _tryConnect(String url, {Duration? timeout}) async {
-    try {
-      final uri = Uri.parse(url);
-      final channel = WebSocketChannel.connect(uri);
-
-      // Wait for connection with timeout
-      await channel.ready.timeout(
-        timeout ?? const Duration(seconds: 5),
-        onTimeout: () {
-          channel.sink.close();
-          throw TimeoutException('Connection timeout');
-        },
-      );
-
-      _channel = channel;
-      _reconnectAttempt = 0;
-      _setStatus(ConnectionStatus.connected);
-      _startHeartbeat();
-      _listenMessages();
-      onOpen?.call();
-      return true;
-    } catch (e) {
-      return false;
-    }
-  }
-
-  void _listenMessages() {
-    _subscription?.cancel();
-    _subscription = _channel?.stream.listen(
-      (data) {
-        _lastPong = DateTime.now();
-
-        if (data is String) {
-          // Handle pong
-          if (data == 'pong') return;
-
-          try {
-            final json = jsonDecode(data) as Map<String, dynamic>;
-            onMessage?.call(json);
-          } catch (_) {
-            // Non-JSON message, ignore
-          }
-        }
-      },
-      onError: (error) {
-        onError?.call(error.toString());
-        _handleDisconnect();
-      },
-      onDone: () {
-        _handleDisconnect();
-      },
-    );
-  }
-
-  void _startHeartbeat() {
-    _heartbeatTimer?.cancel();
-    _zombieTimer?.cancel();
-    _lastPong = DateTime.now();
-
-    // Send ping every 30 seconds
-    _heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (_) {
-      if (_channel != null && _status == ConnectionStatus.connected) {
-        try {
-          _channel!.sink.add(jsonEncode({'type': 'ping'}));
-        } catch (_) {
-          _handleDisconnect();
-        }
-      }
-    });
-
-    // Check for zombie connection every 15 seconds
-    _zombieTimer = Timer.periodic(const Duration(seconds: 15), (_) {
-      if (_lastPong != null) {
-        final elapsed = DateTime.now().difference(_lastPong!);
-        if (elapsed.inSeconds > 60) {
-          _handleDisconnect();
-        }
-      }
-    });
-  }
-
-  void _handleDisconnect() {
-    _stopHeartbeat();
-    _subscription?.cancel();
-
-    final wasConnected = _status == ConnectionStatus.connected;
-
-    try {
-      _channel?.sink.close();
-    } catch (_) {}
-    _channel = null;
-
-    if (_intentionalClose) {
-      _setStatus(ConnectionStatus.disconnected);
-      onClose?.call();
-    } else if (wasConnected) {
-      _setStatus(ConnectionStatus.reconnecting);
-      onReconnecting?.call();
-      _scheduleReconnect();
-    }
-  }
-
-  void _stopHeartbeat() {
-    _heartbeatTimer?.cancel();
-    _zombieTimer?.cancel();
-    _heartbeatTimer = null;
-    _zombieTimer = null;
-  }
-
-  void _scheduleReconnect() {
-    if (_intentionalClose) return;
-
-    _reconnectTimer?.cancel();
-
-    // Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s max
-    final delay = Duration(
-      milliseconds: (1000 * (1 << _reconnectAttempt.clamp(0, 4)))
-          .clamp(1000, 30000),
-    );
-
-    _reconnectAttempt++;
-
-    _reconnectTimer = Timer(delay, () {
-      if (!_intentionalClose) {
-        _setStatus(ConnectionStatus.reconnecting);
-        onReconnecting?.call();
-        connect();
-      }
-    });
-  }
-
-  /// Send a JSON message.
-  void send(Map<String, dynamic> message) {
-    if (_channel == null || _status != ConnectionStatus.connected) {
-      onError?.call('Not connected');
-      return;
-    }
-
-    try {
-      _channel!.sink.add(jsonEncode(message));
-    } catch (e) {
-      onError?.call('Send failed: $e');
-    }
-  }
-
-  /// Send a raw string.
-  void sendRaw(String data) {
-    if (_channel == null || _status != ConnectionStatus.connected) return;
-    try {
-      _channel!.sink.add(data);
-    } catch (_) {}
-  }
-
-  /// Disconnect intentionally.
-  void disconnect() {
-    _intentionalClose = true;
-    _reconnectTimer?.cancel();
-    _stopHeartbeat();
-    _subscription?.cancel();
-
-    try {
-      _channel?.sink.close();
-    } catch (_) {}
-    _channel = null;
-
-    _setStatus(ConnectionStatus.disconnected);
-    onClose?.call();
-  }
-
-  /// Update config and reconnect.
-  Future<void> updateConfig(ServerConfig newConfig) async {
-    config = newConfig;
-    disconnect();
-    await Future.delayed(const Duration(milliseconds: 100));
-    await connect();
-  }
-
-  /// Dispose all resources.
-  void dispose() {
-    disconnect();
-    _reconnectTimer?.cancel();
-  }
-
-  // App lifecycle integration
-  @override
-  void didChangeAppLifecycleState(AppLifecycleState state) {
-    switch (state) {
-      case AppLifecycleState.resumed:
-        if (_status != ConnectionStatus.connected && !_intentionalClose) {
-          _reconnectAttempt = 0;
-          connect();
-        }
-      case AppLifecycleState.paused:
-        // Keep connection alive but don't reconnect aggressively
-        break;
-      default:
-        break;
-    }
-  }
-}
diff --git a/lib/widgets/status_dot.dart b/lib/widgets/status_dot.dart
index bb4f631..94023e0 100644
--- a/lib/widgets/status_dot.dart
+++ b/lib/widgets/status_dot.dart
@@ -1,9 +1,9 @@
 import 'package:flutter/material.dart';
 
-import '../services/websocket_service.dart';
+import '../services/mqtt_service.dart';
 import '../theme/app_theme.dart';
 
-/// 10px circle indicating WebSocket connection status.
+/// 10px circle indicating MQTT connection status.
 class StatusDot extends StatelessWidget {
   final ConnectionStatus status;
 

--
Gitblit v1.3.1