♻️ Refactored the message repository logic

This commit is contained in:
2025-08-16 23:07:21 +08:00
parent 67d130dc34
commit 007b46b080
4 changed files with 434 additions and 626 deletions

View File

@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:dio/dio.dart';
import 'package:easy_localization/easy_localization.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/material.dart';
@@ -10,14 +11,15 @@ import 'package:flutter_hooks/flutter_hooks.dart';
import 'package:gap/gap.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:image_picker/image_picker.dart';
import 'package:island/database/drift_db.dart';
import 'package:island/database/message.dart';
import 'package:island/database/message_repository.dart';
import 'package:island/models/chat.dart';
import 'package:island/models/file.dart';
import 'package:island/pods/config.dart';
import 'package:island/pods/database.dart';
import 'package:island/pods/network.dart';
import 'package:island/pods/websocket.dart';
import 'package:island/services/file.dart';
import 'package:island/services/responsive.dart';
import 'package:island/widgets/alert.dart';
import 'package:island/widgets/app_scaffold.dart';
@@ -39,17 +41,44 @@ import 'package:island/widgets/stickers/picker.dart';
part 'room.g.dart';
final messageRepositoryProvider =
FutureProvider.family<MessageRepository, String>((ref, roomId) async {
final room = await ref.watch(chatroomProvider(roomId).future);
final identity = await ref.watch(chatroomIdentityProvider(roomId).future);
final apiClient = ref.watch(apiClientProvider);
final database = ref.watch(databaseProvider);
return MessageRepository(room!, identity!, apiClient, database);
});
final appLifecycleStateProvider = StreamProvider<AppLifecycleState>((ref) {
final controller = StreamController<AppLifecycleState>();
final observer = _AppLifecycleObserver((state) {
if (controller.isClosed) return;
controller.add(state);
});
WidgetsBinding.instance.addObserver(observer);
ref.onDispose(() {
WidgetsBinding.instance.removeObserver(observer);
controller.close();
});
return controller.stream;
});
class _AppLifecycleObserver extends WidgetsBindingObserver {
final ValueChanged<AppLifecycleState> onChange;
_AppLifecycleObserver(this.onChange);
@override
void didChangeAppLifecycleState(AppLifecycleState state) {
onChange(state);
}
}
@riverpod
class MessagesNotifier extends _$MessagesNotifier {
late final Dio _apiClient;
late final AppDatabase _database;
late final SnChatRoom _room;
late final SnChatMember _identity;
final Map<String, LocalChatMessage> _pendingMessages = {};
final Map<String, Map<int, double>> _fileUploadProgress = {};
int? _totalCount;
late final String _roomId;
int _currentPage = 0;
static const int _pageSize = 20;
@@ -58,16 +87,183 @@ class MessagesNotifier extends _$MessagesNotifier {
@override
FutureOr<List<LocalChatMessage>> build(String roomId) async {
_roomId = roomId;
_apiClient = ref.watch(apiClientProvider);
_database = ref.watch(databaseProvider);
final room = await ref.watch(chatroomProvider(roomId).future);
final identity = await ref.watch(chatroomIdentityProvider(roomId).future);
if (room == null || identity == null) {
throw Exception('Room or identity not found');
}
_room = room;
_identity = identity;
ref.listen(appLifecycleStateProvider, (_, next) {
if (next.hasValue && next.value == AppLifecycleState.resumed) {
syncMessages();
}
});
return await loadInitial();
}
Future<List<LocalChatMessage>> _getCachedMessages({
int offset = 0,
int take = 20,
}) async {
final dbMessages = await _database.getMessagesForRoom(
_roomId,
offset: offset,
limit: take,
);
final dbLocalMessages =
dbMessages.map(_database.companionToMessage).toList();
if (offset == 0) {
final pendingForRoom =
_pendingMessages.values
.where((msg) => msg.roomId == _roomId)
.toList();
final allMessages = [...pendingForRoom, ...dbLocalMessages];
allMessages.sort((a, b) => b.createdAt.compareTo(a.createdAt));
final uniqueMessages = <LocalChatMessage>[];
final seenIds = <String>{};
for (final message in allMessages) {
if (seenIds.add(message.id)) {
uniqueMessages.add(message);
}
}
return uniqueMessages;
}
return dbLocalMessages;
}
Future<List<LocalChatMessage>> _fetchAndCacheMessages({
int offset = 0,
int take = 20,
}) async {
if (_totalCount == null) {
final response = await _apiClient.get(
'/sphere/chat/$_roomId/messages',
queryParameters: {'offset': 0, 'take': 1},
);
_totalCount = int.parse(response.headers['x-total']?.firstOrNull ?? '0');
}
if (offset >= _totalCount!) {
return [];
}
final response = await _apiClient.get(
'/sphere/chat/$_roomId/messages',
queryParameters: {'offset': offset, 'take': take},
);
final List<dynamic> data = response.data;
_totalCount = int.parse(response.headers['x-total']?.firstOrNull ?? '0');
final messages =
data.map((json) {
final remoteMessage = SnChatMessage.fromJson(json);
return LocalChatMessage.fromRemoteMessage(
remoteMessage,
MessageStatus.sent,
);
}).toList();
for (final message in messages) {
await _database.saveMessage(_database.messageToCompanion(message));
if (message.nonce != null) {
_pendingMessages.removeWhere(
(_, pendingMsg) => pendingMsg.nonce == message.nonce,
);
}
}
return messages;
}
Future<bool> syncMessages() async {
final dbMessages = await _database.getMessagesForRoom(
_room.id,
offset: 0,
limit: 1,
);
final lastMessage =
dbMessages.isEmpty
? null
: _database.companionToMessage(dbMessages.first);
if (lastMessage == null) return false;
try {
final resp = await _apiClient.post(
'/sphere/chat/${_room.id}/sync',
data: {
'last_sync_timestamp':
lastMessage.toRemoteMessage().updatedAt.millisecondsSinceEpoch,
},
);
final response = MessageSyncResponse.fromJson(resp.data);
for (final change in response.changes) {
switch (change.action) {
case MessageChangeAction.create:
await receiveMessage(change.message!);
break;
case MessageChangeAction.update:
await receiveMessageUpdate(change.message!);
break;
case MessageChangeAction.delete:
await receiveMessageDeletion(change.messageId.toString());
break;
}
}
} catch (err) {
showErrorAlert(err);
}
return true;
}
Future<List<LocalChatMessage>> listMessages({
int offset = 0,
int take = 20,
bool synced = false,
}) async {
try {
if (offset == 0 && !synced) {
_fetchAndCacheMessages(offset: 0, take: take).catchError((_) {
return <LocalChatMessage>[];
});
}
final localMessages = await _getCachedMessages(
offset: offset,
take: take,
);
if (localMessages.isNotEmpty) {
return localMessages;
}
return await _fetchAndCacheMessages(offset: offset, take: take);
} catch (e) {
final localMessages = await _getCachedMessages(
offset: offset,
take: take,
);
if (localMessages.isNotEmpty) {
return localMessages;
}
rethrow;
}
}
Future<List<LocalChatMessage>> loadInitial() async {
try {
final repository = await ref.read(
messageRepositoryProvider(_roomId).future,
);
final synced = await repository.syncMessages();
final messages = await repository.listMessages(
final synced = await syncMessages();
final messages = await listMessages(
offset: 0,
take: _pageSize,
synced: synced,
@@ -86,10 +282,7 @@ class MessagesNotifier extends _$MessagesNotifier {
try {
final currentMessages = state.value ?? [];
_currentPage++;
final repository = await ref.read(
messageRepositoryProvider(_roomId).future,
);
final newMessages = await repository.listMessages(
final newMessages = await listMessages(
offset: _currentPage * _pageSize,
take: _pageSize,
);
@@ -113,163 +306,247 @@ class MessagesNotifier extends _$MessagesNotifier {
SnChatMessage? replyingTo,
Function(String, Map<int, double>)? onProgress,
}) async {
final baseUrl = ref.read(serverUrlProvider);
final token = await getToken(ref.watch(tokenProvider));
if (token == null) throw ArgumentError('Access token is null');
final nonce = const Uuid().v4();
final mockMessage = SnChatMessage(
id: 'pending_$nonce',
chatRoomId: _roomId,
senderId: _identity.id,
content: content,
createdAt: DateTime.now(),
updatedAt: DateTime.now(),
nonce: nonce,
sender: _identity,
);
final localMessage = LocalChatMessage.fromRemoteMessage(
mockMessage,
MessageStatus.pending,
);
_pendingMessages[localMessage.id] = localMessage;
_fileUploadProgress[localMessage.id] = {};
await _database.saveMessage(_database.messageToCompanion(localMessage));
final currentMessages = state.value ?? [];
state = AsyncValue.data([localMessage, ...currentMessages]);
try {
final repository = await ref.read(
messageRepositoryProvider(_roomId).future,
);
final baseUrl = ref.read(serverUrlProvider);
final token = await getToken(ref.watch(tokenProvider));
if (token == null) throw ArgumentError('Access token is null');
var cloudAttachments = List.empty(growable: true);
for (var idx = 0; idx < attachments.length; idx++) {
final cloudFile =
await putMediaToCloud(
fileData: attachments[idx],
atk: token,
baseUrl: baseUrl,
filename: attachments[idx].data.name ?? 'Post media',
mimetype:
attachments[idx].data.mimeType ??
switch (attachments[idx].type) {
UniversalFileType.image => 'image/unknown',
UniversalFileType.video => 'video/unknown',
UniversalFileType.audio => 'audio/unknown',
UniversalFileType.file => 'application/octet-stream',
},
onProgress: (progress, _) {
_fileUploadProgress[localMessage.id]?[idx] = progress;
onProgress?.call(
localMessage.id,
_fileUploadProgress[localMessage.id] ?? {},
);
},
).future;
if (cloudFile == null) {
throw ArgumentError('Failed to upload the file...');
}
cloudAttachments.add(cloudFile);
}
final currentMessages = state.value ?? [];
await repository.sendMessage(
token,
baseUrl,
_roomId,
content,
const Uuid().v4(),
attachments: attachments,
editingTo: editingTo,
forwardingTo: forwardingTo,
replyingTo: replyingTo,
onPending: (pending) {
state = AsyncValue.data([pending, ...currentMessages]);
final response = await _apiClient.request(
editingTo == null
? '/sphere/chat/$_roomId/messages'
: '/sphere/chat/$_roomId/messages/${editingTo.id}',
data: {
'content': content,
'attachments_id': cloudAttachments.map((e) => e.id).toList(),
'replied_message_id': replyingTo?.id,
'forwarded_message_id': forwardingTo?.id,
'meta': {},
'nonce': nonce,
},
onProgress: onProgress,
options: Options(method: editingTo == null ? 'POST' : 'PATCH'),
);
// Refresh messages
final messages = await repository.listMessages(
offset: 0,
take: _pageSize,
final remoteMessage = SnChatMessage.fromJson(response.data);
final updatedMessage = LocalChatMessage.fromRemoteMessage(
remoteMessage,
MessageStatus.sent,
);
state = AsyncValue.data(messages);
_pendingMessages.remove(localMessage.id);
await _database.deleteMessage(localMessage.id);
await _database.saveMessage(_database.messageToCompanion(updatedMessage));
final newMessages =
(state.value ?? []).map((m) {
if (m.id == localMessage.id) {
return updatedMessage;
}
return m;
}).toList();
state = AsyncValue.data(newMessages);
} catch (err) {
localMessage.status = MessageStatus.failed;
_pendingMessages[localMessage.id] = localMessage;
await _database.updateMessageStatus(
localMessage.id,
MessageStatus.failed,
);
final newMessages =
(state.value ?? []).map((m) {
if (m.id == localMessage.id) {
return m..status = MessageStatus.failed;
}
return m;
}).toList();
state = AsyncValue.data(newMessages);
showErrorAlert(err);
}
}
Future<void> retryMessage(String pendingMessageId) async {
try {
final repository = await ref.read(
messageRepositoryProvider(_roomId).future,
);
final updatedMessage = await repository.retryMessage(pendingMessageId);
final message = await fetchMessageById(pendingMessageId);
if (message == null) {
throw Exception('Message not found');
}
// Update the message in the list
final currentMessages = state.value ?? [];
final index = currentMessages.indexWhere((m) => m.id == pendingMessageId);
if (index >= 0) {
final newList = [...currentMessages];
newList[index] = updatedMessage;
state = AsyncValue.data(newList);
}
message.status = MessageStatus.pending;
_pendingMessages[pendingMessageId] = message;
await _database.updateMessageStatus(
pendingMessageId,
MessageStatus.pending,
);
try {
var remoteMessage = message.toRemoteMessage();
final response = await _apiClient.post(
'/sphere/chat/${message.roomId}/messages',
data: {
'content': remoteMessage.content,
'attachments_id': remoteMessage.attachments,
'meta': remoteMessage.meta,
'nonce': message.nonce,
},
);
remoteMessage = SnChatMessage.fromJson(response.data);
final updatedMessage = LocalChatMessage.fromRemoteMessage(
remoteMessage,
MessageStatus.sent,
);
_pendingMessages.remove(pendingMessageId);
await _database.deleteMessage(pendingMessageId);
await _database.saveMessage(_database.messageToCompanion(updatedMessage));
final newMessages =
(state.value ?? []).map((m) {
if (m.id == pendingMessageId) {
return updatedMessage;
}
return m;
}).toList();
state = AsyncValue.data(newMessages);
} catch (err) {
message.status = MessageStatus.failed;
_pendingMessages[pendingMessageId] = message;
await _database.updateMessageStatus(
pendingMessageId,
MessageStatus.failed,
);
final newMessages =
(state.value ?? []).map((m) {
if (m.id == pendingMessageId) {
return m..status = MessageStatus.failed;
}
return m;
}).toList();
state = AsyncValue.data(newMessages);
showErrorAlert(err);
}
}
Future<void> receiveMessage(SnChatMessage remoteMessage) async {
try {
final repository = await ref.read(
messageRepositoryProvider(_roomId).future,
if (remoteMessage.chatRoomId != _roomId) return;
final localMessage = LocalChatMessage.fromRemoteMessage(
remoteMessage,
MessageStatus.sent,
);
if (remoteMessage.nonce != null) {
_pendingMessages.removeWhere(
(_, pendingMsg) => pendingMsg.nonce == remoteMessage.nonce,
);
}
// Skip if this message is not for this room
if (remoteMessage.chatRoomId != _roomId) return;
await _database.saveMessage(_database.messageToCompanion(localMessage));
final localMessage = await repository.receiveMessage(remoteMessage);
final currentMessages = state.value ?? [];
final existingIndex = currentMessages.indexWhere(
(m) =>
m.id == localMessage.id ||
(localMessage.nonce != null && m.nonce == localMessage.nonce),
);
// Add the new message to the state
final currentMessages = state.value ?? [];
// Check if the message already exists (by id or nonce)
final existingIndex = currentMessages.indexWhere(
(m) =>
m.id == localMessage.id ||
(localMessage.nonce != null && m.nonce == localMessage.nonce),
);
if (existingIndex >= 0) {
// Replace existing message
final newList = [...currentMessages];
newList[existingIndex] = localMessage;
state = AsyncValue.data(newList);
} else {
// Add new message at the beginning (newest first)
state = AsyncValue.data([localMessage, ...currentMessages]);
}
} catch (err) {
showErrorAlert(err);
if (existingIndex >= 0) {
final newList = [...currentMessages];
newList[existingIndex] = localMessage;
state = AsyncValue.data(newList);
} else {
state = AsyncValue.data([localMessage, ...currentMessages]);
}
}
Future<void> receiveMessageUpdate(SnChatMessage remoteMessage) async {
try {
final repository = await ref.read(
messageRepositoryProvider(_roomId).future,
);
if (remoteMessage.chatRoomId != _roomId) return;
// Skip if this message is not for this room
if (remoteMessage.chatRoomId != _roomId) return;
final updatedMessage = LocalChatMessage.fromRemoteMessage(
remoteMessage,
MessageStatus.sent,
);
await _database.updateMessage(_database.messageToCompanion(updatedMessage));
final updatedMessage = await repository.receiveMessageUpdate(
remoteMessage,
);
final currentMessages = state.value ?? [];
final index = currentMessages.indexWhere((m) => m.id == updatedMessage.id);
// Update the message in the list
final currentMessages = state.value ?? [];
final index = currentMessages.indexWhere(
(m) => m.id == updatedMessage.id,
);
if (index >= 0) {
final newList = [...currentMessages];
newList[index] = updatedMessage;
state = AsyncValue.data(newList);
}
} catch (err) {
showErrorAlert(err);
if (index >= 0) {
final newList = [...currentMessages];
newList[index] = updatedMessage;
state = AsyncValue.data(newList);
}
}
Future<void> receiveMessageDeletion(String messageId) async {
try {
final repository = await ref.read(
messageRepositoryProvider(_roomId).future,
);
_pendingMessages.remove(messageId);
await _database.deleteMessage(messageId);
await repository.receiveMessageDeletion(messageId);
final currentMessages = state.value ?? [];
final filteredMessages =
currentMessages.where((m) => m.id != messageId).toList();
// Remove the message from the list
final currentMessages = state.value ?? [];
final filteredMessages =
currentMessages.where((m) => m.id != messageId).toList();
if (filteredMessages.length != currentMessages.length) {
state = AsyncValue.data(filteredMessages);
}
} catch (err) {
showErrorAlert(err);
if (filteredMessages.length != currentMessages.length) {
state = AsyncValue.data(filteredMessages);
}
}
Future<void> deleteMessage(String messageId) async {
try {
final repository = await ref.read(
messageRepositoryProvider(_roomId).future,
);
await repository.deleteMessage(messageId);
// Remove the message from the list
final currentMessages = state.value ?? [];
final filteredMessages =
currentMessages.where((m) => m.id != messageId).toList();
if (filteredMessages.length != currentMessages.length) {
state = AsyncValue.data(filteredMessages);
}
await _apiClient.delete('/sphere/chat/$_roomId/messages/$messageId');
await receiveMessageDeletion(messageId);
} catch (err) {
showErrorAlert(err);
}
@@ -277,13 +554,27 @@ class MessagesNotifier extends _$MessagesNotifier {
Future<LocalChatMessage?> fetchMessageById(String messageId) async {
try {
final repository = await ref.read(
messageRepositoryProvider(_roomId).future,
final localMessage =
await (_database.select(_database.chatMessages)
..where((tbl) => tbl.id.equals(messageId))).getSingleOrNull();
if (localMessage != null) {
return _database.companionToMessage(localMessage);
}
final response = await _apiClient.get(
'/sphere/chat/$_roomId/messages/$messageId',
);
return await repository.getMessageById(messageId);
} catch (err) {
showErrorAlert(err);
return null;
final remoteMessage = SnChatMessage.fromJson(response.data);
final message = LocalChatMessage.fromRemoteMessage(
remoteMessage,
MessageStatus.sent,
);
await _database.saveMessage(_database.messageToCompanion(message));
return message;
} catch (e) {
if (e is DioException) return null;
rethrow;
}
}
}
@@ -746,6 +1037,7 @@ class ChatRoomScreen extends HookConsumerWidget {
skipError: true,
data:
(identity) => MessageItem(
key: ValueKey(message.id),
message: message,
isCurrentUser:
identity?.id == message.senderId,