From cb470d33d2665fcc6f8448d2736777656cf0cbe7 Mon Sep 17 00:00:00 2001
From: Matthias Nott <mnott@mnsoft.org>
Date: Tue, 24 Mar 2026 00:25:07 +0100
Subject: [PATCH] feat: MQTT migration, offline catch_up, clean session, image support
---
lib/screens/chat_screen.dart | 127 ++++++++++++++++++++++++++++++++++++++---
1 files changed, 116 insertions(+), 11 deletions(-)
diff --git a/lib/screens/chat_screen.dart b/lib/screens/chat_screen.dart
index 4f43034..2b0c8ee 100644
--- a/lib/screens/chat_screen.dart
+++ b/lib/screens/chat_screen.dart
@@ -56,6 +56,9 @@
bool _isCatchingUp = false;
bool _screenshotForChat = false;
final Set<int> _seenSeqs = {};
+ bool _sessionReady = false;
+ final List<Map<String, dynamic>> _pendingMessages = [];
+ final Map<String, List<Message>> _catchUpPending = {};
@override
void initState() {
@@ -66,9 +69,14 @@
}
Future<void> _initAll() async {
- // Load lastSeq BEFORE connecting so catch_up sends the right value
+ // Load persisted state BEFORE connecting
final prefs = await SharedPreferences.getInstance();
_lastSeq = prefs.getInt('lastSeq') ?? 0;
+ // Restore last active session so catch_up routes to the right session
+ final savedSessionId = prefs.getString('activeSessionId');
+ if (savedSessionId != null && mounted) {
+ ref.read(activeSessionIdProvider.notifier).state = savedSessionId;
+ }
if (!mounted) return;
// Listen for playback state changes to reset play button UI
@@ -146,10 +154,11 @@
};
_ws!.onMessage = _handleMessage;
_ws!.onOpen = () {
+ _sessionReady = false; // Gate messages until sessions arrive
+ _pendingMessages.clear();
final activeId = ref.read(activeSessionIdProvider);
_sendCommand('sync', activeId != null ? {'activeSessionId': activeId} : null);
- // catch_up is still available during the transition period
- _sendCommand('catch_up', {'lastSeq': _lastSeq});
+ // catch_up is sent after sessions arrive (in _handleSessions)
};
_ws!.onError = (error) {
debugPrint('MQTT error: $error');
@@ -168,6 +177,14 @@
}
void _handleMessage(Map<String, dynamic> msg) {
+ final type = msg['type'] as String?;
+ // Sessions and catch_up always process immediately
+ // Content messages (text, voice, image) wait until session is ready
+ if (!_sessionReady && type != 'sessions' && type != 'catch_up' && type != 'status' && type != 'typing') {
+ _pendingMessages.add(msg);
+ return;
+ }
+
// Track sequence numbers for catch_up protocol
final seq = msg['seq'] as int?;
if (seq != null) {
@@ -184,8 +201,6 @@
_saveLastSeq();
}
}
-
- final type = msg['type'] as String?;
switch (type) {
case 'sessions':
@@ -231,7 +246,8 @@
if (sessionId != null) _incrementUnread(sessionId);
case 'catch_up':
final serverSeq = msg['serverSeq'] as int?;
- if (serverSeq != null && serverSeq > _lastSeq) {
+ if (serverSeq != null) {
+ // Always sync to server's seq — if server restarted, its seq may be lower
_lastSeq = serverSeq;
_saveLastSeq();
}
@@ -241,19 +257,91 @@
final catchUpMsgs = msg['messages'] as List<dynamic>?;
if (catchUpMsgs != null && catchUpMsgs.isNotEmpty) {
_isCatchingUp = true;
+ final activeId = ref.read(activeSessionIdProvider);
final existing = ref.read(messagesProvider);
final existingContents = existing
.where((m) => m.role == MessageRole.assistant)
.map((m) => m.content)
.toSet();
for (final m in catchUpMsgs) {
- final content = (m as Map<String, dynamic>)['content'] as String? ?? '';
- // Skip if we already have this message locally
- if (content.isNotEmpty && existingContents.contains(content)) continue;
- _handleMessage(m);
- if (content.isNotEmpty) existingContents.add(content);
+ final map = m as Map<String, dynamic>;
+ final msgType = map['type'] as String? ?? 'text';
+ final content = map['content'] as String? ?? map['transcript'] as String? ?? map['caption'] as String? ?? '';
+ final msgSessionId = map['sessionId'] as String?;
+ final imageData = map['imageBase64'] as String?;
+
+ // Skip empty text messages (images with no caption are OK)
+ if (content.isEmpty && imageData == null) continue;
+ // Dedup by content (skip images from dedup — they have unique msgIds)
+ if (imageData == null && content.isNotEmpty && existingContents.contains(content)) continue;
+
+ final Message message;
+ if (msgType == 'image' && imageData != null) {
+ message = Message.image(
+ role: MessageRole.assistant,
+ imageBase64: imageData,
+ content: content,
+ status: MessageStatus.sent,
+ );
+ } else {
+ message = Message.text(
+ role: MessageRole.assistant,
+ content: content,
+ status: MessageStatus.sent,
+ );
+ }
+
+ if (msgSessionId == null || msgSessionId == activeId) {
+ // Active session or no session: add directly to chat
+ ref.read(messagesProvider.notifier).addMessage(message);
+ } else {
+ // Different session: store + unread badge + toast
+ // Collect for batch storage below to avoid race condition
+ _catchUpPending.putIfAbsent(msgSessionId, () => []).add(message);
+ _incrementUnread(msgSessionId);
+ }
+ existingContents.add(content);
}
_isCatchingUp = false;
+ _scrollToBottom();
+ // Batch-store cross-session messages (sequential to avoid race condition)
+ if (_catchUpPending.isNotEmpty) {
+ final pending = Map<String, List<Message>>.from(_catchUpPending);
+ _catchUpPending.clear();
+ // Show one toast per session with message count
+ if (mounted) {
+ final sessions = ref.read(sessionsProvider);
+ for (final entry in pending.entries) {
+ final session = sessions.firstWhere(
+ (s) => s.id == entry.key,
+ orElse: () => Session(id: entry.key, index: 0, name: 'Unknown', type: 'claude'),
+ );
+ final count = entry.value.length;
+ final preview = count == 1
+ ? entry.value.first.content
+ : '$count messages';
+ ToastManager.show(
+ context,
+ sessionName: session.name,
+ preview: preview.length > 100 ? '${preview.substring(0, 100)}...' : preview,
+ onTap: () => _switchSession(entry.key),
+ );
+ }
+ }
+ () async {
+ for (final entry in pending.entries) {
+ final existing = await MessageStore.loadAll(entry.key);
+ MessageStore.save(entry.key, [...existing, ...entry.value]);
+ await MessageStore.flush();
+ }
+ }();
+ }
+ // Clear unread for active session
+ if (activeId != null) {
+ final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
+ counts.remove(activeId);
+ ref.read(unreadCountsProvider.notifier).state = counts;
+ }
}
case 'pong':
break; // heartbeat response, ignore
@@ -284,6 +372,22 @@
);
ref.read(activeSessionIdProvider.notifier).state = active.id;
ref.read(messagesProvider.notifier).switchSession(active.id);
+ SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', active.id));
+ }
+
+ // Session is ready — process any pending messages that arrived before sessions list
+ if (!_sessionReady) {
+ _sessionReady = true;
+ // Request catch_up now that session is set
+ _sendCommand('catch_up', {'lastSeq': _lastSeq});
+ // Drain messages that arrived before sessions list
+ if (_pendingMessages.isNotEmpty) {
+ final pending = List<Map<String, dynamic>>.from(_pendingMessages);
+ _pendingMessages.clear();
+ for (final m in pending) {
+ _handleMessage(m);
+ }
+ }
}
}
@@ -507,6 +611,7 @@
ref.read(activeSessionIdProvider.notifier).state = sessionId;
await ref.read(messagesProvider.notifier).switchSession(sessionId);
+ SharedPreferences.getInstance().then((p) => p.setString('activeSessionId', sessionId));
final counts = Map<String, int>.from(ref.read(unreadCountsProvider));
counts.remove(sessionId);
--
Gitblit v1.3.1