♻️ 使用 SQLITE 来存储本地消息记录 #1

Merged
LittleSheep merged 6 commits from features/local-message-history into master 2024-06-23 11:13:42 +00:00
5 changed files with 167 additions and 78 deletions
Showing only changes of commit 52e5dd6860 - Show all commits

View File

@ -47,7 +47,8 @@ class Message {
attachments: json["attachments"] != null attachments: json["attachments"] != null
? List<int>.from(json["attachments"]) ? List<int>.from(json["attachments"])
: null, : null,
channel: Channel.fromJson(json['channel']), channel:
json['channel'] != null ? Channel.fromJson(json['channel']) : null,
sender: Sender.fromJson(json['sender']), sender: Sender.fromJson(json['sender']),
replyId: json['reply_id'], replyId: json['reply_id'],
replyTo: json['reply_to'] != null replyTo: json['reply_to'] != null

View File

@ -20,28 +20,75 @@ extension MessageHistoryHelper on MessageHistoryDb {
)); ));
} }
syncMessages(Channel channel, {String? scope}) async { replaceMessage(Message remote) async {
await localMessages.update(LocalMessage(
remote.id,
remote,
remote.channelId,
));
}
burnMessage(int id) async {
await localMessages.delete(id);
}
syncMessages(Channel channel, {String scope = 'global'}) async {
final lastOne = await localMessages.findLastByChannel(channel.id);
final data = await _getRemoteMessages(
channel,
scope,
remainBreath: 5,
onBrake: (items) {
return items.any((x) => x.id == lastOne?.id);
},
);
await localMessages.insertBulk(
data.map((x) => LocalMessage(x.id, x, x.channelId)).toList(),
);
}
Future<List<Message>> _getRemoteMessages(
Channel channel,
String scope, {
required int remainBreath,
bool Function(List<Message> items)? onBrake,
take = 10,
offset = 0,
}) async {
if (remainBreath <= 0) {
return List.empty();
}
final AuthProvider auth = Get.find(); final AuthProvider auth = Get.find();
if (!await auth.isAuthorized) return; if (!await auth.isAuthorized) return List.empty();
final client = auth.configureClient('messaging'); final client = auth.configureClient('messaging');
final resp = await client final resp = await client.get(
.get('/api/channels/$scope/${channel.alias}/messages?take=10&offset=0'); '/api/channels/$scope/${channel.alias}/messages?take=$take&offset=$offset');
if (resp.statusCode != 200) { if (resp.statusCode != 200) {
throw Exception(resp.bodyString); throw Exception(resp.bodyString);
} }
// TODO Continue sync until the last message / the message exists / sync limitation final PaginationResult response = PaginationResult.fromJson(resp.body);
final result =
response.data?.map((e) => Message.fromJson(e)).toList() ?? List.empty();
final PaginationResult result = PaginationResult.fromJson(resp.body); if (onBrake != null && onBrake(result)) {
final parsed = result.data?.map((e) => Message.fromJson(e)).toList(); return result;
if (parsed != null) {
await localMessages.insertBulk(
parsed.map((x) => LocalMessage(x.id, x, x.channelId)).toList(),
);
} }
final expandResult = await _getRemoteMessages(
channel,
scope,
remainBreath: remainBreath - 1,
take: take,
offset: offset + result.length,
);
return [...result, ...expandResult];
} }
Future<List<LocalMessage>> listMessages(Channel channel) async { Future<List<LocalMessage>> listMessages(Channel channel) async {

View File

@ -31,16 +31,31 @@ class RemoteMessageConverter extends TypeConverter<Message, String> {
@dao @dao
abstract class LocalMessageDao { abstract class LocalMessageDao {
@Query('SELECT * FROM LocalMessage WHERE channelId = :channelId') @Query('SELECT COUNT(id) FROM LocalMessage WHERE channelId = :channelId')
Future<int?> countByChannel(int channelId);
@Query('SELECT * FROM LocalMessage WHERE channelId = :channelId ORDER BY id DESC')
Future<List<LocalMessage>> findAllByChannel(int channelId); Future<List<LocalMessage>> findAllByChannel(int channelId);
@Query('SELECT * FROM LocalMessage WHERE channelId = :channelId ORDER BY id DESC LIMIT 1')
Future<LocalMessage?> findLastByChannel(int channelId);
@Insert(onConflict: OnConflictStrategy.replace) @Insert(onConflict: OnConflictStrategy.replace)
Future<void> insert(LocalMessage m); Future<void> insert(LocalMessage m);
@Insert(onConflict: OnConflictStrategy.replace) @Insert(onConflict: OnConflictStrategy.replace)
Future<void> insertBulk(List<LocalMessage> m); Future<void> insertBulk(List<LocalMessage> m);
@Query('DELETE * FROM LocalMessage') @Update(onConflict: OnConflictStrategy.replace)
Future<void> update(LocalMessage person);
@Query('DELETE FROM LocalMessage WHERE id = :id')
Future<void> delete(int id);
@Query('DELETE FROM LocalMessage WHERE channelId = :channelId')
Future<List<LocalMessage>> deleteByChannel(int channelId);
@Query('DELETE FROM LocalMessage')
Future<void> wipeLocalMessages(); Future<void> wipeLocalMessages();
} }

View File

@ -119,6 +119,15 @@ class _$LocalMessageDao extends LocalMessageDao {
_localMessageInsertionAdapter = InsertionAdapter( _localMessageInsertionAdapter = InsertionAdapter(
database, database,
'LocalMessage', 'LocalMessage',
(LocalMessage item) => <String, Object?>{
'id': item.id,
'data': _remoteMessageConverter.encode(item.data),
'channelId': item.channelId
}),
_localMessageUpdateAdapter = UpdateAdapter(
database,
'LocalMessage',
['id'],
(LocalMessage item) => <String, Object?>{ (LocalMessage item) => <String, Object?>{
'id': item.id, 'id': item.id,
'data': _remoteMessageConverter.encode(item.data), 'data': _remoteMessageConverter.encode(item.data),
@ -133,10 +142,45 @@ class _$LocalMessageDao extends LocalMessageDao {
final InsertionAdapter<LocalMessage> _localMessageInsertionAdapter; final InsertionAdapter<LocalMessage> _localMessageInsertionAdapter;
final UpdateAdapter<LocalMessage> _localMessageUpdateAdapter;
@override
Future<int?> countByChannel(int channelId) async {
return _queryAdapter.query(
'SELECT COUNT(id) FROM LocalMessage WHERE channelId = ?1',
mapper: (Map<String, Object?> row) => row.values.first as int,
arguments: [channelId]);
}
@override @override
Future<List<LocalMessage>> findAllByChannel(int channelId) async { Future<List<LocalMessage>> findAllByChannel(int channelId) async {
return _queryAdapter.queryList( return _queryAdapter.queryList(
'SELECT * FROM LocalMessage WHERE channelId = ?1', 'SELECT * FROM LocalMessage WHERE channelId = ?1 ORDER BY id DESC',
mapper: (Map<String, Object?> row) => LocalMessage(
row['id'] as int,
_remoteMessageConverter.decode(row['data'] as String),
row['channelId'] as int),
arguments: [channelId]);
}
@override
Future<LocalMessage?> findLastByChannel(int channelId) async {
return _queryAdapter.query(
'SELECT * FROM LocalMessage WHERE channelId = ?1 ORDER BY id DESC LIMIT 1',
mapper: (Map<String, Object?> row) => LocalMessage(row['id'] as int, _remoteMessageConverter.decode(row['data'] as String), row['channelId'] as int),
arguments: [channelId]);
}
@override
Future<void> delete(int id) async {
await _queryAdapter.queryNoReturn('DELETE FROM LocalMessage WHERE id = ?1',
arguments: [id]);
}
@override
Future<List<LocalMessage>> deleteByChannel(int channelId) async {
return _queryAdapter.queryList(
'DELETE FROM LocalMessage WHERE channelId = ?1',
mapper: (Map<String, Object?> row) => LocalMessage( mapper: (Map<String, Object?> row) => LocalMessage(
row['id'] as int, row['id'] as int,
_remoteMessageConverter.decode(row['data'] as String), _remoteMessageConverter.decode(row['data'] as String),
@ -146,7 +190,7 @@ class _$LocalMessageDao extends LocalMessageDao {
@override @override
Future<void> wipeLocalMessages() async { Future<void> wipeLocalMessages() async {
await _queryAdapter.queryNoReturn('DELETE * FROM LocalMessage'); await _queryAdapter.queryNoReturn('DELETE FROM LocalMessage');
} }
@override @override
@ -159,6 +203,11 @@ class _$LocalMessageDao extends LocalMessageDao {
await _localMessageInsertionAdapter.insertList( await _localMessageInsertionAdapter.insertList(
m, OnConflictStrategy.replace); m, OnConflictStrategy.replace);
} }
@override
Future<void> update(LocalMessage person) async {
await _localMessageUpdateAdapter.update(person, OnConflictStrategy.replace);
}
} }
// ignore_for_file: unused_element // ignore_for_file: unused_element

View File

@ -3,17 +3,17 @@ import 'dart:ui';
import 'package:flutter/material.dart'; import 'package:flutter/material.dart';
import 'package:get/get.dart'; import 'package:get/get.dart';
import 'package:infinite_scroll_pagination/infinite_scroll_pagination.dart';
import 'package:solian/exts.dart'; import 'package:solian/exts.dart';
import 'package:solian/models/call.dart'; import 'package:solian/models/call.dart';
import 'package:solian/models/channel.dart'; import 'package:solian/models/channel.dart';
import 'package:solian/models/message.dart'; import 'package:solian/models/message.dart';
import 'package:solian/models/packet.dart'; import 'package:solian/models/packet.dart';
import 'package:solian/models/pagination.dart';
import 'package:solian/providers/auth.dart'; import 'package:solian/providers/auth.dart';
import 'package:solian/providers/chat.dart'; import 'package:solian/providers/chat.dart';
import 'package:solian/providers/content/call.dart'; import 'package:solian/providers/content/call.dart';
import 'package:solian/providers/content/channel.dart'; import 'package:solian/providers/content/channel.dart';
import 'package:solian/providers/message/helper.dart';
import 'package:solian/providers/message/history.dart';
import 'package:solian/router.dart'; import 'package:solian/router.dart';
import 'package:solian/screens/channel/channel_detail.dart'; import 'package:solian/screens/channel/channel_detail.dart';
import 'package:solian/theme.dart'; import 'package:solian/theme.dart';
@ -50,8 +50,8 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
Call? _ongoingCall; Call? _ongoingCall;
StreamSubscription<NetworkPackage>? _subscription; StreamSubscription<NetworkPackage>? _subscription;
final PagingController<int, Message> _pagingController = MessageHistoryDb? _db;
PagingController(firstPageKey: 0); List<LocalMessage> _currentHistory = List.empty();
getProfile() async { getProfile() async {
final AuthProvider auth = Get.find(); final AuthProvider auth = Get.find();
@ -106,29 +106,14 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
setState(() => _isBusy = false); setState(() => _isBusy = false);
} }
Future<void> getMessages(int pageKey) async { Future<void> getMessages() async {
final AuthProvider auth = Get.find(); await _db!.syncMessages(_channel!, scope: widget.realm);
if (!await auth.isAuthorized) return; await syncHistory();
}
final client = auth.configureClient('messaging'); Future<void> syncHistory() async {
_currentHistory = await _db!.localMessages.findAllByChannel(_channel!.id);
final resp = await client.get( setState(() {});
'/api/channels/${widget.realm}/${widget.alias}/messages?take=10&offset=$pageKey');
if (resp.statusCode == 200) {
final PaginationResult result = PaginationResult.fromJson(resp.body);
final parsed = result.data?.map((e) => Message.fromJson(e)).toList();
if (parsed != null && parsed.length >= 10) {
_pagingController.appendPage(parsed, pageKey + parsed.length);
} else if (parsed != null) {
_pagingController.appendLastPage(parsed);
}
} else if (resp.statusCode == 403) {
_pagingController.appendLastPage([]);
} else {
_pagingController.error = resp.bodyString;
}
} }
void listenMessages() { void listenMessages() {
@ -138,33 +123,19 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
case 'messages.new': case 'messages.new':
final payload = Message.fromJson(event.payload!); final payload = Message.fromJson(event.payload!);
if (payload.channelId == _channel?.id) { if (payload.channelId == _channel?.id) {
final idx = _pagingController.itemList _db?.receiveMessage(payload);
?.indexWhere((e) => e.uuid == payload.uuid);
if ((idx ?? -1) >= 0) {
_pagingController.itemList?[idx!] = payload;
} else {
_pagingController.itemList?.insert(0, payload);
}
} }
break; break;
case 'messages.update': case 'messages.update':
final payload = Message.fromJson(event.payload!); final payload = Message.fromJson(event.payload!);
if (payload.channelId == _channel?.id) { if (payload.channelId == _channel?.id) {
final idx = _pagingController.itemList _db?.replaceMessage(payload);
?.indexWhere((x) => x.uuid == payload.uuid);
if (idx != null) {
_pagingController.itemList?[idx] = payload;
}
} }
break; break;
case 'messages.burnt': case 'messages.burnt':
final payload = Message.fromJson(event.payload!); final payload = Message.fromJson(event.payload!);
if (payload.channelId == _channel?.id) { if (payload.channelId == _channel?.id) {
final idx = _pagingController.itemList _db?.burnMessage(payload.id);
?.indexWhere((x) => x.uuid != payload.uuid);
if (idx != null) {
_pagingController.itemList?.removeAt(idx - 1);
}
} }
break; break;
case 'calls.new': case 'calls.new':
@ -175,7 +146,7 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
_ongoingCall = null; _ongoingCall = null;
break; break;
} }
setState(() {}); syncHistory();
}); });
} }
@ -200,21 +171,23 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
Message? _messageToReplying; Message? _messageToReplying;
Message? _messageToEditing; Message? _messageToEditing;
Widget buildHistory(context, Message item, index) { Widget buildHistory(context, index) {
bool isMerged = false, hasMerged = false; bool isMerged = false, hasMerged = false;
if (index > 0) { if (index > 0) {
hasMerged = checkMessageMergeable( hasMerged = checkMessageMergeable(
_pagingController.itemList?[index - 1], _currentHistory[index - 1].data,
item, _currentHistory[index].data,
); );
} }
if (index + 1 < (_pagingController.itemList?.length ?? 0)) { if (index + 1 < _currentHistory.length) {
isMerged = checkMessageMergeable( isMerged = checkMessageMergeable(
item, _currentHistory[index].data,
_pagingController.itemList?[index + 1], _currentHistory[index + 1].data,
); );
} }
final item = _currentHistory[index].data;
Widget content; Widget content;
if (item.replyTo != null) { if (item.replyTo != null) {
content = Column( content = Column(
@ -268,14 +241,20 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
@override @override
void initState() { void initState() {
super.initState(); createHistoryDb().then((db) async {
_db = db;
await getChannel();
await syncHistory();
getProfile();
getOngoingCall();
getMessages();
getProfile();
getChannel().then((_) {
listenMessages(); listenMessages();
_pagingController.addPageRequestListener(getMessages);
}); });
getOngoingCall();
super.initState();
} }
@override @override
@ -352,14 +331,11 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
Column( Column(
children: [ children: [
Expanded( Expanded(
child: PagedListView<int, Message>( child: ListView.builder(
itemCount: _currentHistory.length,
clipBehavior: Clip.none, clipBehavior: Clip.none,
reverse: true, reverse: true,
pagingController: _pagingController, itemBuilder: buildHistory,
builderDelegate: PagedChildBuilderDelegate<Message>(
itemBuilder: buildHistory,
noItemsFoundIndicatorBuilder: (_) => Container(),
),
).paddingOnly(bottom: 56), ).paddingOnly(bottom: 56),
), ),
], ],
@ -380,7 +356,8 @@ class _ChannelChatScreenState extends State<ChannelChatScreen> {
channel: _channel!, channel: _channel!,
onSent: (Message item) { onSent: (Message item) {
setState(() { setState(() {
_pagingController.itemList?.insert(0, item); _db?.receiveMessage(item);
syncHistory();
}); });
}, },
onReset: () { onReset: () {