♻️ Refactor to replace Hive with Sqlite
This commit is contained in:
@ -2,11 +2,12 @@ import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:math' as math;
|
||||
|
||||
import 'package:collection/collection.dart';
|
||||
import 'package:dio/dio.dart';
|
||||
import 'package:drift/drift.dart';
|
||||
import 'package:flutter/material.dart';
|
||||
import 'package:hive/hive.dart';
|
||||
import 'package:provider/provider.dart';
|
||||
import 'package:surface/database/database.dart';
|
||||
import 'package:surface/providers/database.dart';
|
||||
import 'package:surface/providers/sn_attachment.dart';
|
||||
import 'package:surface/providers/sn_network.dart';
|
||||
import 'package:surface/providers/user_directory.dart';
|
||||
@ -16,13 +17,13 @@ import 'package:surface/types/websocket.dart';
|
||||
import 'package:uuid/uuid.dart';
|
||||
|
||||
class ChatMessageController extends ChangeNotifier {
|
||||
static const kChatMessageBoxPrefix = 'nex_chat_messages_';
|
||||
static const kSingleBatchLoadLimit = 100;
|
||||
|
||||
late final SnNetworkProvider _sn;
|
||||
late final UserDirectoryProvider _ud;
|
||||
late final WebSocketProvider _ws;
|
||||
late final SnAttachmentProvider _attach;
|
||||
late final DatabaseProvider _dt;
|
||||
|
||||
StreamSubscription? _wsSubscription;
|
||||
|
||||
@ -31,6 +32,7 @@ class ChatMessageController extends ChangeNotifier {
|
||||
_ud = context.read<UserDirectoryProvider>();
|
||||
_ws = context.read<WebSocketProvider>();
|
||||
_attach = context.read<SnAttachmentProvider>();
|
||||
_dt = context.read<DatabaseProvider>();
|
||||
}
|
||||
|
||||
bool isPending = true;
|
||||
@ -38,9 +40,9 @@ class ChatMessageController extends ChangeNotifier {
|
||||
|
||||
int? messageTotal;
|
||||
|
||||
bool get isAllLoaded => messageTotal != null && messages.length >= messageTotal!;
|
||||
bool get isAllLoaded =>
|
||||
messageTotal != null && messages.length >= messageTotal!;
|
||||
|
||||
String? _boxKey;
|
||||
SnChannel? channel;
|
||||
SnChannelMember? profile;
|
||||
|
||||
@ -51,25 +53,17 @@ class ChatMessageController extends ChangeNotifier {
|
||||
/// Stored as a list of nonce to provide the loading state
|
||||
final List<String> unconfirmedMessages = List.empty(growable: true);
|
||||
|
||||
Box<SnChatMessage>? get _box => (_boxKey == null || isPending) ? null : Hive.box<SnChatMessage>(_boxKey!);
|
||||
|
||||
final List<SnChannelMember> typingMembers = List.empty(growable: true);
|
||||
final Map<int, Timer> typingInactiveTimer = {};
|
||||
|
||||
Future<void> initialize(SnChannel chan) async {
|
||||
channel = chan;
|
||||
|
||||
// Initialize local data
|
||||
_boxKey = '$kChatMessageBoxPrefix${chan.id}';
|
||||
await Hive.openBox<SnChatMessage>(_boxKey!);
|
||||
|
||||
// Fetch channel profile
|
||||
final resp = await _sn.client.get(
|
||||
'/cgi/im/channels/${chan.keyPath}/me',
|
||||
);
|
||||
profile = SnChannelMember.fromJson(
|
||||
resp.data as Map<String, dynamic>,
|
||||
);
|
||||
profile = SnChannelMember.fromJson(resp.data);
|
||||
|
||||
_wsSubscription = _ws.pk.stream.listen((event) {
|
||||
switch (event.method) {
|
||||
@ -87,7 +81,8 @@ class ChatMessageController extends ChangeNotifier {
|
||||
notifyListeners();
|
||||
}
|
||||
typingInactiveTimer[member.id]?.cancel();
|
||||
typingInactiveTimer[member.id] = Timer(const Duration(seconds: 3), () {
|
||||
typingInactiveTimer[member.id] =
|
||||
Timer(const Duration(seconds: 3), () {
|
||||
typingMembers.removeWhere((x) => x.id == member.id);
|
||||
typingInactiveTimer.remove(member.id);
|
||||
notifyListeners();
|
||||
@ -129,10 +124,16 @@ class ChatMessageController extends ChangeNotifier {
|
||||
}
|
||||
|
||||
Future<void> _saveMessageToLocal(Iterable<SnChatMessage> messages) async {
|
||||
if (_box == null) return;
|
||||
await _box!.putAll({
|
||||
for (final message in messages) message.id: message,
|
||||
});
|
||||
await _dt.db.snLocalChatMessage.insertAll(
|
||||
messages.map(
|
||||
(ele) => SnLocalChatMessageCompanion.insert(
|
||||
id: Value(ele.id),
|
||||
content: ele,
|
||||
channelId: channel!.id,
|
||||
createdAt: Value(ele.createdAt),
|
||||
),
|
||||
),
|
||||
onConflict: DoNothing());
|
||||
}
|
||||
|
||||
Future<void> _addUnconfirmedMessage(SnChatMessage message) async {
|
||||
@ -184,8 +185,21 @@ class ChatMessageController extends ChangeNotifier {
|
||||
await _applyMessage(message);
|
||||
notifyListeners();
|
||||
|
||||
if (_box == null) return;
|
||||
await _box!.put(message.id, message);
|
||||
if (isCheckedUpdate) {
|
||||
await _dt.db.snLocalChatMessage.insertOne(
|
||||
SnLocalChatMessageCompanion.insert(
|
||||
id: Value(message.id),
|
||||
content: message,
|
||||
channelId: channel!.id,
|
||||
createdAt: Value(message.createdAt),
|
||||
),
|
||||
onConflict: DoUpdate((_) => SnLocalChatMessageCompanion.custom(
|
||||
content: Constant(jsonEncode(message.toJson())),
|
||||
)),
|
||||
);
|
||||
} else {
|
||||
incomeStrandedQueue.add(message);
|
||||
}
|
||||
}
|
||||
|
||||
Future<void> _applyMessage(SnChatMessage message) async {
|
||||
@ -194,7 +208,8 @@ class ChatMessageController extends ChangeNotifier {
|
||||
switch (message.type) {
|
||||
case 'messages.edit':
|
||||
if (message.relatedEventId != null) {
|
||||
final idx = messages.indexWhere((x) => x.id == message.relatedEventId);
|
||||
final idx =
|
||||
messages.indexWhere((x) => x.id == message.relatedEventId);
|
||||
if (idx != -1) {
|
||||
final newBody = message.body;
|
||||
newBody.remove('related_event');
|
||||
@ -202,16 +217,24 @@ class ChatMessageController extends ChangeNotifier {
|
||||
body: newBody,
|
||||
updatedAt: message.updatedAt,
|
||||
);
|
||||
if (_box!.containsKey(message.relatedEventId)) {
|
||||
await _box!.put(message.relatedEventId, messages[idx]);
|
||||
if (message.relatedEventId != null) {
|
||||
await (_dt.db.snLocalChatMessage.update()
|
||||
..where((e) => e.id.equals(message.relatedEventId!)))
|
||||
.write(
|
||||
SnLocalChatMessageCompanion.custom(
|
||||
content: Constant(jsonEncode(messages[idx].toJson())),
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
case 'messages.delete':
|
||||
if (message.relatedEventId != null) {
|
||||
messages.removeWhere((x) => x.id == message.relatedEventId);
|
||||
if (_box!.containsKey(message.relatedEventId)) {
|
||||
await _box!.delete(message.relatedEventId);
|
||||
if (message.relatedEventId != null) {
|
||||
await (_dt.db.snLocalChatMessage.delete()
|
||||
..where((e) => e.id.equals(message.relatedEventId!)))
|
||||
.go();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -233,7 +256,8 @@ class ChatMessageController extends ChangeNotifier {
|
||||
'algorithm': 'plain',
|
||||
if (quoteId != null) 'quote_event': quoteId,
|
||||
if (relatedId != null) 'related_event': relatedId,
|
||||
if (attachments != null && attachments.isNotEmpty) 'attachments': attachments,
|
||||
if (attachments != null && attachments.isNotEmpty)
|
||||
'attachments': attachments,
|
||||
};
|
||||
|
||||
// Mock the message locally
|
||||
@ -287,20 +311,34 @@ class ChatMessageController extends ChangeNotifier {
|
||||
}
|
||||
}
|
||||
|
||||
bool isCheckedUpdate = false;
|
||||
List<SnChatMessage> incomeStrandedQueue = List.empty(growable: true);
|
||||
|
||||
/// Check the local storage is up to date with the server.
|
||||
/// If the local storage is not up to date, it will be updated.
|
||||
Future<void> checkUpdate() async {
|
||||
if (_box == null) return;
|
||||
if (_box!.isEmpty) return;
|
||||
|
||||
isLoading = true;
|
||||
notifyListeners();
|
||||
|
||||
final mostRecentMessage = await (_dt.db.snLocalChatMessage.select()
|
||||
..limit(1)
|
||||
..orderBy([
|
||||
(e) =>
|
||||
OrderingTerm(expression: e.createdAt, mode: OrderingMode.desc)
|
||||
]))
|
||||
.getSingleOrNull();
|
||||
if (mostRecentMessage == null) {
|
||||
// Initial load
|
||||
await loadMessages(take: 20);
|
||||
isCheckedUpdate = true;
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
final resp = await _sn.client.get(
|
||||
'/cgi/im/channels/${channel!.keyPath}/events/update',
|
||||
queryParameters: {
|
||||
'pivot': _box!.values.last.id,
|
||||
'pivot': mostRecentMessage.content.id,
|
||||
},
|
||||
);
|
||||
if (resp.data['up_to_date'] == true) return;
|
||||
@ -316,6 +354,12 @@ class ChatMessageController extends ChangeNotifier {
|
||||
} finally {
|
||||
await loadMessages();
|
||||
isLoading = false;
|
||||
|
||||
isCheckedUpdate = true;
|
||||
_saveMessageToLocal(incomeStrandedQueue).then((_) {
|
||||
incomeStrandedQueue.clear();
|
||||
});
|
||||
|
||||
notifyListeners();
|
||||
}
|
||||
}
|
||||
@ -324,13 +368,18 @@ class ChatMessageController extends ChangeNotifier {
|
||||
/// If it was not found in local storage we will look it up in remote
|
||||
Future<SnChatMessage?> getMessage(int id) async {
|
||||
SnChatMessage? out;
|
||||
if (_box != null && _box!.containsKey(id)) {
|
||||
out = _box!.get(id);
|
||||
final local = await (_dt.db.snLocalChatMessage.select()
|
||||
..limit(1)
|
||||
..where((e) => e.id.equals(id)))
|
||||
.getSingleOrNull();
|
||||
if (local != null) {
|
||||
out = local.content;
|
||||
}
|
||||
|
||||
if (out == null) {
|
||||
try {
|
||||
final resp = await _sn.client.get('/cgi/im/channels/${channel!.keyPath}/events/$id');
|
||||
final resp = await _sn.client
|
||||
.get('/cgi/im/channels/${channel!.keyPath}/events/$id');
|
||||
out = SnChatMessage.fromJson(resp.data);
|
||||
_saveMessageToLocal([out]);
|
||||
} catch (_) {
|
||||
@ -364,16 +413,21 @@ class ChatMessageController extends ChangeNotifier {
|
||||
bool forceLocal = false,
|
||||
bool forceRemote = false,
|
||||
}) async {
|
||||
final localTotal = await _dt.db.snLocalChatMessage
|
||||
.count(where: (e) => e.channelId.equals(channel!.id))
|
||||
.getSingle();
|
||||
|
||||
late List<SnChatMessage> out;
|
||||
if (_box != null && (_box!.length >= take + offset || forceLocal) && !forceRemote) {
|
||||
out = _box!.keys
|
||||
.toList()
|
||||
.cast<int>()
|
||||
.sorted((a, b) => b.compareTo(a))
|
||||
.skip(offset)
|
||||
.take(take)
|
||||
.map((key) => _box!.get(key)!)
|
||||
.toList();
|
||||
if ((localTotal >= take + offset || forceLocal) && !forceRemote) {
|
||||
final result = await (_dt.db.snLocalChatMessage.select()
|
||||
..where((e) => e.channelId.equals(channel!.id))
|
||||
..orderBy([
|
||||
(e) =>
|
||||
OrderingTerm(expression: e.createdAt, mode: OrderingMode.desc)
|
||||
])
|
||||
..limit(take, offset: offset))
|
||||
.get();
|
||||
out = result.map((e) => e.content).toList();
|
||||
} else {
|
||||
final resp = await _sn.client.get(
|
||||
'/cgi/im/channels/${channel!.keyPath}/events',
|
||||
@ -408,7 +462,8 @@ class ChatMessageController extends ChangeNotifier {
|
||||
quoteEvent: quoteEvent,
|
||||
attachments: attachments
|
||||
.where(
|
||||
(ele) => out[i].body['attachments']?.contains(ele?.rid) ?? false,
|
||||
(ele) =>
|
||||
out[i].body['attachments']?.contains(ele?.rid) ?? false,
|
||||
)
|
||||
.toList(),
|
||||
),
|
||||
@ -416,7 +471,10 @@ class ChatMessageController extends ChangeNotifier {
|
||||
}
|
||||
|
||||
// Preload sender accounts
|
||||
final accountId = out.where((ele) => ele.sender.accountId >= 0).map((ele) => ele.sender.accountId).toSet();
|
||||
final accountId = out
|
||||
.where((ele) => ele.sender.accountId >= 0)
|
||||
.map((ele) => ele.sender.accountId)
|
||||
.toSet();
|
||||
await _ud.listAccount(accountId);
|
||||
|
||||
return out;
|
||||
@ -443,7 +501,6 @@ class ChatMessageController extends ChangeNotifier {
|
||||
|
||||
@override
|
||||
void dispose() {
|
||||
_box?.close();
|
||||
_wsSubscription?.cancel();
|
||||
super.dispose();
|
||||
}
|
||||
|
Reference in New Issue
Block a user