Surface/lib/pods/websocket.dart
LittleSheep a2053aa772 🐛 Fixes and optimzation on post
♻️ Replace freezed abstract classes with sealed
2025-05-24 22:38:07 +08:00

186 lines
5.4 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'dart:developer';
import 'package:flutter/foundation.dart';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:freezed_annotation/freezed_annotation.dart';
import 'package:island/pods/config.dart';
import 'package:island/pods/network.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
part 'websocket.freezed.dart';
part 'websocket.g.dart';
@freezed
sealed class WebSocketState with _$WebSocketState {
const factory WebSocketState.connected() = _Connected;
const factory WebSocketState.connecting() = _Connecting;
const factory WebSocketState.disconnected() = _Disconnected;
const factory WebSocketState.error(String message) = _Error;
}
@freezed
sealed class WebSocketPacket with _$WebSocketPacket {
const factory WebSocketPacket({
required String type,
required Map<String, dynamic>? data,
String? errorMessage,
}) = _WebSocketPacket;
factory WebSocketPacket.fromJson(Map<String, dynamic> json) =>
_$WebSocketPacketFromJson(json);
}
final websocketProvider = Provider<WebSocketService>((ref) {
return WebSocketService();
});
class WebSocketService {
late Ref _ref;
WebSocketChannel? _channel;
final StreamController<WebSocketPacket> _streamController =
StreamController<WebSocketPacket>.broadcast();
final StreamController<WebSocketState> _statusStreamController =
StreamController<WebSocketState>.broadcast();
Timer? _reconnectTimer;
Stream<WebSocketPacket> get dataStream => _streamController.stream;
Stream<WebSocketState> get statusStream => _statusStreamController.stream;
Future<void> connect(Ref ref) async {
_ref = ref;
final baseUrl = ref.watch(serverUrlProvider);
final atk = await getFreshAtk(
ref.watch(tokenPairProvider),
baseUrl,
onRefreshed: (atk, rtk) {
setTokenPair(ref.watch(sharedPreferencesProvider), atk, rtk);
ref.invalidate(tokenPairProvider);
},
);
final url = '$baseUrl/ws'.replaceFirst('http', 'ws');
log('[WebSocket] Trying connecting to $url');
try {
if (kIsWeb) {
_channel = WebSocketChannel.connect(Uri.parse('$url?tk=$atk'));
} else {
_channel = IOWebSocketChannel.connect(
Uri.parse(url),
headers: {'Authorization': 'Bearer $atk'},
);
}
await _channel!.ready;
_statusStreamController.sink.add(WebSocketState.connected());
_channel!.stream.listen(
(data) {
final dataStr =
data is Uint8List ? utf8.decode(data) : data.toString();
final packet = WebSocketPacket.fromJson(jsonDecode(dataStr));
_streamController.sink.add(packet);
log(
"[WebSocket] Received packet: ${packet.type} ${packet.errorMessage}",
);
},
onDone: () {
log('[WebSocket] Connection closed, attempting to reconnect...');
_scheduleReconnect();
_statusStreamController.sink.add(WebSocketState.disconnected());
},
onError: (error) {
log('[WebSocket] Error occurred: $error, attempting to reconnect...');
_scheduleReconnect();
_statusStreamController.sink.add(
WebSocketState.error(error.toString()),
);
},
);
} catch (err) {
log('[WebSocket] Failed to connect: $err');
_scheduleReconnect();
}
}
void _scheduleReconnect() {
_reconnectTimer?.cancel();
_reconnectTimer = Timer(const Duration(milliseconds: 500), () {
_statusStreamController.sink.add(WebSocketState.connecting());
connect(_ref);
});
}
WebSocketChannel? get ws => _channel;
void sendMessage(String message) {
_channel!.sink.add(message);
}
void close() {
_reconnectTimer?.cancel();
_channel?.sink.close();
}
}
final websocketStateProvider =
StateNotifierProvider<WebSocketStateNotifier, WebSocketState>(
(ref) => WebSocketStateNotifier(ref),
);
class WebSocketStateNotifier extends StateNotifier<WebSocketState> {
final Ref ref;
Timer? _reconnectTimer;
WebSocketStateNotifier(this.ref) : super(const WebSocketState.disconnected());
Future<void> connect() async {
state = const WebSocketState.connecting();
try {
final service = ref.read(websocketProvider);
final baseUrl = ref.watch(serverUrlProvider);
final atk = await getFreshAtk(
ref.watch(tokenPairProvider),
baseUrl,
onRefreshed: (atk, rtk) {
setTokenPair(ref.watch(sharedPreferencesProvider), atk, rtk);
ref.invalidate(tokenPairProvider);
},
);
if (atk == null) {
state = const WebSocketState.error('Unauthorized');
return;
}
await service.connect(ref);
state = const WebSocketState.connected();
service.statusStream.listen((event) {
state = event;
});
} catch (err) {
state = WebSocketState.error('Failed to connect: $err');
_scheduleReconnect();
}
}
void _scheduleReconnect() {
_reconnectTimer?.cancel();
_reconnectTimer = Timer(const Duration(milliseconds: 500), () {
connect();
});
}
void sendMessage(String message) {
final service = ref.read(websocketProvider);
service.sendMessage(message);
}
void close() {
final service = ref.read(websocketProvider);
service.close();
_reconnectTimer?.cancel();
state = const WebSocketState.disconnected();
}
}