From cb470d33d2665fcc6f8448d2736777656cf0cbe7 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Tue, 24 Mar 2026 00:25:07 +0100
Subject: [PATCH] feat: MQTT migration, offline catch_up, clean session, image support

---
 lib/screens/chat_screen.dart |  127 ++++++++++++++++++++++++++++++++++++++---
 1 files changed, 116 insertions(+), 11 deletions(-)

diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index 4f43034..2b0c8ee 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -56,6 +56,9 @@
   bool _isCatchingUp = false;
   bool _screenshotForChat = false;
   final Set<int> _seenSeqs = {};
+  bool _sessionReady = false;
+  final List<Map<String, dynamic>> _pendingMessages = [];
+  final Map<String, List<Message>> _catchUpPending = {};
 
   @override
   void initState() {
@@ -66,9 +69,14 @@
   }
 
   Future<void> _initAll() async {
-    // Load lastSeq BEFORE connecting so catch_up sends the right value
+    // Load persisted state BEFORE connecting
     final prefs = await SharedPreferences.getInstance();
     _lastSeq = prefs.getInt('lastSeq') ?? 0;
+    // Restore last active session so catch_up routes to the right session
+    final savedSessionId = prefs.getString('activeSessionId');
+    if (savedSessionId != null && mounted) {
+      ref.read(activeSessionIdProvider.notifier).state = savedSessionId;
+    }
     if (!mounted) return;
 
     // Listen for playback state changes to reset play button UI
@@ -146,10 +154,11 @@
     };
     _ws!.onMessage = _handleMessage;
     _ws!.onOpen = () {
+      _sessionReady = false; // Gate messages until sessions arrive
+      _pendingMessages.clear();
       final activeId = ref.read(activeSessionIdProvider);
       _sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
-      // catch_up is still available during the transition period
-      _sendCommand('catch_up', {'lastSeq': _lastSeq});
+      // catch_up is sent after sessions arrive (in _handleSessions)
     };
     _ws!.onError = (error) {
       debugPrint('MQTT error: $error');
@@ -168,6 +177,14 @@
   }
 
   void _handleMessage(Map<String, dynamic> msg) {
+    final type = msg['type'] as String?;
+    // Sessions and catch_up always process immediately
+    // Content messages (text, voice, image) wait until session is ready
+    if (!_sessionReady && type != 'sessions' && type != 'catch_up' && type != 'status' && type != 'typing') {
+      _pendingMessages.add(msg);
+      return;
+    }
+
     // Track sequence numbers for catch_up protocol
     final seq = msg['seq'] as int?;
     if (seq != null) {
@@ -184,8 +201,6 @@
         _saveLastSeq();
       }
     }
-
-    final type = msg['type'] as String?;
 
     switch (type) {
       case 'sessions':
@@ -231,7 +246,8 @@
         if (sessionId != null) _incrementUnread(sessionId);
       case 'catch_up':
         final serverSeq = msg['serverSeq'] as int?;
-        if (serverSeq != null && serverSeq > _lastSeq) {
+        if (serverSeq != null) {
+          // Always sync to server's seq — if server restarted, its seq may be lower
           _lastSeq = serverSeq;
           _saveLastSeq();
         }
@@ -241,19 +257,91 @@
         final catchUpMsgs = msg['messages'] as List<dynamic>?;
         if (catchUpMsgs != null && catchUpMsgs.isNotEmpty) {
           _isCatchingUp = true;
+          final activeId = ref.read(activeSessionIdProvider);
           final existing = ref.read(messagesProvider);
           final existingContents = existing
               .where((m) => m.role == MessageRole.assistant)
               .map((m) => m.content)
               .toSet();
           for (final m in catchUpMsgs) {
-            final content = (m as Map<String, dynamic>)['content'] as String? ?? '';
-            // Skip if we already have this message locally
-            if (content.isNotEmpty && existingContents.contains(content)) continue;
-            _handleMessage(m);
-            if (content.isNotEmpty) existingContents.add(content);
+            final map = m as Map<String, dynamic>;
+            final msgType = map['type'] as String? ?? 'text';
+            final content = map['content'] as String? ?? map['transcript'] as String? ?? map['caption'] as String? ?? '';
+            final msgSessionId = map['sessionId'] as String?;
+            final imageData = map['imageBase64'] as String?;
+
+            // Skip empty text messages (images with no caption are OK)
+            if (content.isEmpty && imageData == null) continue;
+            // Dedup by content (skip images from dedup — they have unique msgIds)
+            if (imageData == null && content.isNotEmpty && existingContents.contains(content)) continue;
+
+            final Message message;
+            if (msgType == 'image' && imageData != null) {
+              message = Message.image(
+                role: MessageRole.assistant,
+                imageBase64: imageData,
+                content: content,
+                status: MessageStatus.sent,
+              );
+            } else {
+              message = Message.text(
+                role: MessageRole.assistant,
+                content: content,
+                status: MessageStatus.sent,
+              );
+            }
+
+            if (msgSessionId == null || msgSessionId == activeId) {
+              // Active session or no session: add directly to chat
+              ref.read(messagesProvider.notifier).addMessage(message);
+            } else {
+              // Different session: store + unread badge + toast
+              // Collect for batch storage below to avoid race condition
+              _catchUpPending.putIfAbsent(msgSessionId, () => []).add(message);
+              _incrementUnread(msgSessionId);
+            }
+            existingContents.add(content);
           }
           _isCatchingUp = false;
+          _scrollToBottom();
+          // Batch-store cross-session messages (sequential to avoid race condition)
+          if (_catchUpPending.isNotEmpty) {
+            final pending = Map<String, List<Message>>.from(_catchUpPending);
+            _catchUpPending.clear();
+            // Show one toast per session with message count
+            if (mounted) {
+              final sessions = ref.read(sessionsProvider);
+              for (final entry in pending.entries) {
+                final session = sessions.firstWhere(
+                  (s) => s.id == entry.key,
+                  orElse: () => Session(id: entry.key, index: 0, name: 'Unknown', type: 'claude'),
+                );
+                final count = entry.value.length;
+                final preview = count == 1
+                    ? entry.value.first.content
+                    : '$count messages';
+                ToastManager.show(
+                  context,
+                  sessionName: session.name,
+                  preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
+                  onTap: () => _switchSession(entry.key),
+                );
+              }
+            }
+            () async {
+              for (final entry in pending.entries) {
+                final existing = await MessageStore.loadAll(entry.key);
+                MessageStore.save(entry.key, [...existing, ...entry.value]);
+                await MessageStore.flush();
+              }
+            }();
+          }
+          // Clear unread for active session
+          if (activeId != null) {
+            final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
+            counts.remove(activeId);
+            ref.read(unreadCountsProvider.notifier).state = counts;
+          }
         }
       case 'pong':
         break; // heartbeat response, ignore
@@ -284,6 +372,22 @@
       );
       ref.read(activeSessionIdProvider.notifier).state = active.id;
       ref.read(messagesProvider.notifier).switchSession(active.id);
+      SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', active.id));
+    }
+
+    // Session is ready — process any pending messages that arrived before sessions list
+    if (!_sessionReady) {
+      _sessionReady = true;
+      // Request catch_up now that session is set
+      _sendCommand('catch_up', {'lastSeq': _lastSeq});
+      // Drain messages that arrived before sessions list
+      if (_pendingMessages.isNotEmpty) {
+        final pending = List<Map<String, dynamic>>.from(_pendingMessages);
+        _pendingMessages.clear();
+        for (final m in pending) {
+          _handleMessage(m);
+        }
+      }
     }
   }
 
@@ -507,6 +611,7 @@
 
     ref.read(activeSessionIdProvider.notifier).state = sessionId;
     await ref.read(messagesProvider.notifier).switchSession(sessionId);
+    SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', sessionId));
 
     final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
     counts.remove(sessionId);

--
Gitblit v1.3.1