Better websocket reconnection and maintainer

This commit is contained in:
LittleSheep 2024-05-13 22:12:37 +08:00
parent b9461e5019
commit f43f9e91f6
7 changed files with 36 additions and 34 deletions

View File

@ -17,11 +17,11 @@ import 'package:solian/utils/services_url.dart';
import 'package:solian/widgets/chat/call/exts.dart'; import 'package:solian/widgets/chat/call/exts.dart';
import 'package:solian/widgets/exts.dart'; import 'package:solian/widgets/exts.dart';
import 'package:wakelock_plus/wakelock_plus.dart'; import 'package:wakelock_plus/wakelock_plus.dart';
import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/status.dart' as status;
import 'package:flutter_gen/gen_l10n/app_localizations.dart'; import 'package:flutter_gen/gen_l10n/app_localizations.dart';
class ChatProvider extends ChangeNotifier { class ChatProvider extends ChangeNotifier {
bool isOpened = false;
bool isCallShown = false; bool isCallShown = false;
Call? ongoingCall; Call? ongoingCall;
@ -31,12 +31,16 @@ class ChatProvider extends ChangeNotifier {
PagingController<int, Message>? historyPagingController; PagingController<int, Message>? historyPagingController;
WebSocketChannel? _channel; IOWebSocketChannel? _channel;
Future<WebSocketChannel?> connect(AuthProvider auth, {noRetry = false}) async { Future<IOWebSocketChannel?> connect(AuthProvider auth, {noRetry = false}) async {
if (auth.client == null) await auth.loadClient(); if (auth.client == null) await auth.loadClient();
if (!await auth.isAuthorized()) return null; if (!await auth.isAuthorized()) return null;
if (_channel != null && (_channel!.innerWebSocket?.readyState ?? 0) < 2) {
return _channel;
}
var ori = getRequestUri('messaging', '/api/ws'); var ori = getRequestUri('messaging', '/api/ws');
var uri = Uri( var uri = Uri(
scheme: ori.scheme.replaceFirst('http', 'ws'), scheme: ori.scheme.replaceFirst('http', 'ws'),
@ -46,10 +50,8 @@ class ChatProvider extends ChangeNotifier {
queryParameters: {'tk': Uri.encodeComponent(auth.client!.currentToken!)}, queryParameters: {'tk': Uri.encodeComponent(auth.client!.currentToken!)},
); );
isOpened = true;
try { try {
_channel = WebSocketChannel.connect(uri); _channel = IOWebSocketChannel.connect(uri);
await _channel!.ready; await _channel!.ready;
} catch (e) { } catch (e) {
if (!noRetry) { if (!noRetry) {
@ -108,8 +110,7 @@ class ChatProvider extends ChangeNotifier {
} }
void disconnect() { void disconnect() {
_channel = null; _channel?.sink.close(status.goingAway);
isOpened = false;
} }
Future<void> fetchMessages(int pageKey, BuildContext context) async { Future<void> fetchMessages(int pageKey, BuildContext context) async {

View File

@ -45,13 +45,11 @@ class KeypairProvider extends ChangeNotifier {
void receiveKeypair(Keypair kp) { void receiveKeypair(Keypair kp) {
keys[kp.id] = kp; keys[kp.id] = kp;
requestingKeys.remove(kp.id); requestingKeys.remove(kp.id);
saveKeys();
notifyListeners(); notifyListeners();
saveKeys();
} }
Keypair? provideKeypair(String id) { Keypair? provideKeypair(String id) {
print(id);
print(keys[id]);
return keys[id]; return keys[id];
} }
@ -79,9 +77,11 @@ class KeypairProvider extends ChangeNotifier {
saveKeys(); saveKeys();
} }
bool requestKey(String id, String algorithm, int uid) { void requestKey(String id, String algorithm, int uid) {
if (channel == null) return false; if (channel == null) return;
if (requestingKeys.contains(id)) return false; if (requestingKeys.contains(id)) return;
print('requested $id');
channel!.sink.add(jsonEncode( channel!.sink.add(jsonEncode(
NetworkPackage(method: 'kex.request', payload: { NetworkPackage(method: 'kex.request', payload: {
@ -95,7 +95,6 @@ class KeypairProvider extends ChangeNotifier {
requestingKeys.add(id); requestingKeys.add(id);
notifyListeners(); notifyListeners();
return true;
} }
String? encodeViaAESKey(String keypairId, String content) { String? encodeViaAESKey(String keypairId, String content) {

View File

@ -10,11 +10,11 @@ import 'package:solian/models/pagination.dart';
import 'package:solian/providers/auth.dart'; import 'package:solian/providers/auth.dart';
import 'package:solian/utils/services_url.dart'; import 'package:solian/utils/services_url.dart';
import 'package:solian/models/notification.dart' as model; import 'package:solian/models/notification.dart' as model;
import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/status.dart' as status;
import 'dart:math' as math; import 'dart:math' as math;
class NotifyProvider extends ChangeNotifier { class NotifyProvider extends ChangeNotifier {
bool isOpened = false;
int unreadAmount = 0; int unreadAmount = 0;
List<model.Notification> notifications = List.empty(growable: true); List<model.Notification> notifications = List.empty(growable: true);
@ -64,10 +64,9 @@ class NotifyProvider extends ChangeNotifier {
notifyListeners(); notifyListeners();
} }
WebSocketChannel? _channel; IOWebSocketChannel? _channel;
Future<WebSocketChannel?> connect( Future<IOWebSocketChannel?> connect(AuthProvider auth, {
AuthProvider auth, {
Keypair? Function(String id)? onKexRequest, Keypair? Function(String id)? onKexRequest,
Function(Keypair kp)? onKexProvide, Function(Keypair kp)? onKexProvide,
bool noRetry = false, bool noRetry = false,
@ -75,8 +74,9 @@ class NotifyProvider extends ChangeNotifier {
if (auth.client == null) await auth.loadClient(); if (auth.client == null) await auth.loadClient();
if (!await auth.isAuthorized()) return null; if (!await auth.isAuthorized()) return null;
await auth.client!.refreshToken(auth.client!.currentRefreshToken!); if (_channel != null && (_channel!.innerWebSocket?.readyState ?? 0) < 2) {
return _channel;
}
var ori = getRequestUri('passport', '/api/ws'); var ori = getRequestUri('passport', '/api/ws');
var uri = Uri( var uri = Uri(
scheme: ori.scheme.replaceFirst('http', 'ws'), scheme: ori.scheme.replaceFirst('http', 'ws'),
@ -86,10 +86,8 @@ class NotifyProvider extends ChangeNotifier {
queryParameters: {'tk': Uri.encodeComponent(auth.client!.currentToken!)}, queryParameters: {'tk': Uri.encodeComponent(auth.client!.currentToken!)},
); );
isOpened = true;
try { try {
_channel = WebSocketChannel.connect(uri); _channel = IOWebSocketChannel.connect(uri);
await _channel!.ready; await _channel!.ready;
} catch (e) { } catch (e) {
if (!noRetry) { if (!noRetry) {
@ -101,7 +99,7 @@ class NotifyProvider extends ChangeNotifier {
} }
_channel!.stream.listen( _channel!.stream.listen(
(event) { (event) {
final result = NetworkPackage.fromJson(jsonDecode(event)); final result = NetworkPackage.fromJson(jsonDecode(event));
switch (result.method) { switch (result.method) {
case 'notifications.new': case 'notifications.new':
@ -143,8 +141,7 @@ class NotifyProvider extends ChangeNotifier {
} }
void disconnect() { void disconnect() {
_channel = null; _channel?.sink.close(status.goingAway);
isOpened = false;
} }
void notifyMessage(String title, String body) { void notifyMessage(String title, String body) {

View File

@ -38,7 +38,6 @@ class _ChatMemberScreenState extends State<ChatMemberScreen> {
_selfId = prof['id']; _selfId = prof['id'];
var uri = getRequestUri('messaging', '/api/channels/${widget.realm}/${widget.channel.alias}/members'); var uri = getRequestUri('messaging', '/api/channels/${widget.realm}/${widget.channel.alias}/members');
print(uri);
var res = await auth.client!.get(uri); var res = await auth.client!.get(uri);
if (res.statusCode == 200) { if (res.statusCode == 200) {

View File

@ -162,7 +162,7 @@ class _ChatWidgetState extends State<ChatWidget> {
Future.delayed(Duration.zero, () async { Future.delayed(Duration.zero, () async {
final auth = context.read<AuthProvider>(); final auth = context.read<AuthProvider>();
if (!_chat.isOpened) await _chat.connect(auth); await _chat.connect(auth);
_chat.fetchOngoingCall(widget.alias, widget.realm); _chat.fetchOngoingCall(widget.alias, widget.realm);
_chat.fetchChannel(context, auth, widget.alias, widget.realm).then((result) { _chat.fetchChannel(context, auth, widget.alias, widget.realm).then((result) {

View File

@ -55,12 +55,11 @@ class _ChatMessageContentState extends State<ChatMessageContent> {
final keypair = context.watch<KeypairProvider>(); final keypair = context.watch<KeypairProvider>();
if (keypair.keys[widget.item.decodedContent['keypair_id']] == null) { if (keypair.keys[widget.item.decodedContent['keypair_id']] == null) {
WidgetsBinding.instance.addPostFrameCallback((_) { WidgetsBinding.instance.addPostFrameCallback((_) {
if (keypair.requestKey( keypair.requestKey(
widget.item.decodedContent['keypair_id'], widget.item.decodedContent['keypair_id'],
widget.item.decodedContent['algorithm'], widget.item.decodedContent['algorithm'],
widget.item.sender.account.externalId!, widget.item.sender.account.externalId!,
)) { );
}
}); });
} else { } else {
content = keypair.decodeViaAESKey( content = keypair.decodeViaAESKey(

View File

@ -1,3 +1,5 @@
import 'dart:async';
import 'package:flutter/material.dart'; import 'package:flutter/material.dart';
import 'package:provider/provider.dart'; import 'package:provider/provider.dart';
import 'package:solian/providers/auth.dart'; import 'package:solian/providers/auth.dart';
@ -43,6 +45,11 @@ class _ProviderInitializerState extends State<ProviderInitializer> {
onKexRequest: keypair.provideKeypair, onKexRequest: keypair.provideKeypair,
onKexProvide: keypair.receiveKeypair, onKexProvide: keypair.receiveKeypair,
); );
Timer.periodic(const Duration(seconds: 1), (timer) {
nty.connect(auth);
chat.connect(auth);
});
} }
} catch (e) { } catch (e) {
context.showErrorDialog(e); context.showErrorDialog(e);