diff --git a/lib/controllers/chat_message_controller.dart b/lib/controllers/chat_message_controller.dart index a4c4d3b..d093737 100644 --- a/lib/controllers/chat_message_controller.dart +++ b/lib/controllers/chat_message_controller.dart @@ -71,7 +71,7 @@ class ChatMessageController extends ChangeNotifier { resp.data as Map, ); - _wsSubscription = _ws.stream.stream.listen((event) { + _wsSubscription = _ws.pk.stream.listen((event) { switch (event.method) { case 'events.new': if (event.payload?['channel_id'] != channel?.id) break; diff --git a/lib/providers/notification.dart b/lib/providers/notification.dart index c931522..8a196d6 100644 --- a/lib/providers/notification.dart +++ b/lib/providers/notification.dart @@ -77,7 +77,7 @@ class NotificationProvider extends ChangeNotifier { List notifications = List.empty(growable: true); void listen() { - _ws.stream.stream.listen((event) { + _ws.pk.stream.listen((event) { if (event.method == 'notifications.new') { final notification = SnNotification.fromJson(event.payload!); if (showingCount < 0) showingCount = 0; diff --git a/lib/providers/websocket.dart b/lib/providers/websocket.dart index a31e18e..7758577 100644 --- a/lib/providers/websocket.dart +++ b/lib/providers/websocket.dart @@ -18,7 +18,8 @@ class WebSocketProvider extends ChangeNotifier { late final SnNetworkProvider _sn; late final UserProvider _ua; - StreamController stream = StreamController.broadcast(); + StreamController pk = StreamController.broadcast(); + Stream? _wsStream; WebSocketProvider(BuildContext context) { _sn = context.read(); @@ -36,7 +37,7 @@ class WebSocketProvider extends ChangeNotifier { Completer? _connectCompleter; Future connect({noRetry = false}) async { - if(_connectCompleter != null) { + if (_connectCompleter != null) { await _connectCompleter!.future; _connectCompleter = null; } @@ -59,6 +60,7 @@ class WebSocketProvider extends ChangeNotifier { try { conn = WebSocketChannel.connect(uri); await conn!.ready; + _wsStream = conn!.stream.asBroadcastStream(); listen(); log('[WebSocket] Connected to server!'); isConnected = true; @@ -73,7 +75,7 @@ class WebSocketProvider extends ChangeNotifier { log('Retry connecting to websocket in 3 seconds...'); return Future.delayed( const Duration(seconds: 3), - () => connect(noRetry: true), + () => connect(noRetry: true), ); } } finally { @@ -93,11 +95,12 @@ class WebSocketProvider extends ChangeNotifier { } void listen() { - conn?.stream.listen( + if (_wsStream == null) return; + _wsStream!.listen( (event) { final packet = WebSocketPackage.fromJson(jsonDecode(event)); log('Websocket incoming message: ${packet.method} ${packet.message}'); - stream.sink.add(packet); + pk.sink.add(packet); }, onDone: () { isConnected = false; diff --git a/lib/screens/chat/room.dart b/lib/screens/chat/room.dart index 39fa609..766d8b2 100644 --- a/lib/screens/chat/room.dart +++ b/lib/screens/chat/room.dart @@ -206,7 +206,7 @@ class _ChatRoomScreenState extends State { }); final ws = context.read(); - _wsSubscription = ws.stream.stream.listen((event) { + _wsSubscription = ws.pk.stream.listen((event) { switch (event.method) { case 'calls.new': final payload = SnChatCall.fromJson(event.payload!);