diff --git a/lib/pods/activity_rpc.dart b/lib/pods/activity_rpc.dart index 04c80299..e646c757 100644 --- a/lib/pods/activity_rpc.dart +++ b/lib/pods/activity_rpc.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:developer' as developer; import 'dart:ffi'; import 'dart:io'; +import 'dart:isolate'; import 'package:flutter/foundation.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; import 'package:island/pods/network.dart'; @@ -53,6 +54,7 @@ class ActivityRpcServer { HttpServer? _httpServer; ServerSocket? _ipcServer; int? _pipeHandle; // For Windows named pipe + Timer? _ipcTimer; // Store timer for cancellation final List _wsSockets = []; final List<_IpcSocketWrapper> _ipcSockets = []; @@ -155,13 +157,13 @@ class ActivityRpcServer { // Start WebSocket server while (port <= portRange[1]) { - developer.log('trying port $port', name: kRpcLogPrefix); + developer.log('Trying port $port', name: kRpcLogPrefix); try { _httpServer = await HttpServer.bind(InternetAddress.loopbackIPv4, port); - developer.log('listening on $port', name: kRpcLogPrefix); + developer.log('Listening on $port', name: kRpcLogPrefix); shelf_io.serveRequests(_httpServer!, (Request request) async { - developer.log('new request', name: kRpcLogPrefix); + developer.log('New request', name: kRpcLogPrefix); if (request.headers['upgrade']?.toLowerCase() == 'websocket') { final handler = webSocketHandler((WebSocketChannel channel, _) { _wsSockets.add(channel); @@ -170,7 +172,7 @@ class ActivityRpcServer { return handler(request); } developer.log( - 'new request disposed due to not websocket', + 'New request disposed due to not websocket', name: kRpcLogPrefix, ); return Response.notFound('Not a WebSocket request'); @@ -181,9 +183,10 @@ class ActivityRpcServer { if (e is SocketException && e.osError?.errorCode == 98) { developer.log('$port in use!', name: kRpcLogPrefix); } else { - developer.log('http error: $e', name: kRpcLogPrefix); + developer.log('HTTP error: $e', name: kRpcLogPrefix); } port++; + await Future.delayed(Duration(milliseconds: 100)); // Add delay } } @@ -244,42 +247,62 @@ class ActivityRpcServer { developer.log('IPC named pipe created at \\\\.\\pipe\\discord-ipc', name: kRpcIpcLogPrefix); - // Start listening for connections in a separate isolate or async loop + // Start listening for connections in a separate isolate _listenWindowsIpc(); } finally { free(pipeName); } } - // Listen for Windows IPC connections - void _listenWindowsIpc() { - Timer.periodic(Duration(milliseconds: 100), (timer) async { - if (_pipeHandle == null || _pipeHandle == INVALID_HANDLE_VALUE) { - timer.cancel(); - return; - } + // Listen for Windows IPC connections in an isolate + void _listenWindowsIpc() async { + final receivePort = ReceivePort(); + await Isolate.spawn(_windowsIpcIsolate, receivePort.sendPort); - final connected = ConnectNamedPipe(_pipeHandle!, nullptr); - if (connected != 0 || GetLastError() == ERROR_PIPE_CONNECTED) { - final socketWrapper = _IpcSocketWrapper(_pipeHandle!); + receivePort.listen((message) { + if (message is int) { + final socketWrapper = _IpcSocketWrapper(message); _ipcSockets.add(socketWrapper); developer.log('New IPC connection on named pipe', name: kRpcIpcLogPrefix); - - // Handle data reading in a separate async function _handleWindowsIpcData(socketWrapper); - - // Create a new pipe for the next connection - await _startWindowsIpcServer(); + _startWindowsIpcServer(); // Create new pipe for next connection } }); } + static void _windowsIpcIsolate(SendPort sendPort) { + while (true) { + final pipeHandle = CreateNamedPipe( + r'\\.\pipe\discord-ipc'.toNativeUtf16(), + PIPE_ACCESS_DUPLEX, + PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, + PIPE_UNLIMITED_INSTANCES, + 4096, + 4096, + 0, + nullptr, + ); + if (pipeHandle == INVALID_HANDLE_VALUE) { + developer.log('Failed to create named pipe: ${GetLastError()}', name: kRpcIpcLogPrefix); + break; + } + final connected = ConnectNamedPipe(pipeHandle, nullptr); + if (connected != 0 || GetLastError() == ERROR_PIPE_CONNECTED) { + sendPort.send(pipeHandle); + } + // Avoid tight loop + sleep(Duration(milliseconds: 100)); + } + } + // Handle Windows IPC data void _handleWindowsIpcData(_IpcSocketWrapper socket) async { + final startTime = DateTime.now(); final buffer = malloc.allocate(4096); final bytesRead = malloc.allocate(sizeOf()); try { while (socket.pipeHandle != null) { + final readStart = DateTime.now(); final success = ReadFile( socket.pipeHandle!, buffer.cast(), @@ -287,6 +310,8 @@ class ActivityRpcServer { bytesRead, nullptr, ); + final readDuration = DateTime.now().difference(readStart).inMicroseconds; + developer.log('ReadFile took $readDuration microseconds', name: kRpcIpcLogPrefix); if (success == FALSE && GetLastError() != ERROR_MORE_DATA) { developer.log('IPC read error: ${GetLastError()}', name: kRpcIpcLogPrefix); @@ -307,6 +332,8 @@ class ActivityRpcServer { } finally { malloc.free(buffer); malloc.free(bytesRead); + final totalDuration = DateTime.now().difference(startTime).inMicroseconds; + developer.log('handleWindowsIpcData took $totalDuration microseconds', name: kRpcIpcLogPrefix); } } @@ -314,23 +341,36 @@ class ActivityRpcServer { Future stop() async { // Stop WebSocket server for (var socket in _wsSockets) { - await socket.sink.close(); + try { + await socket.sink.close(); + } catch (e) { + developer.log('Error closing WebSocket: $e', name: kRpcLogPrefix); + } } _wsSockets.clear(); - await _httpServer?.close(); + await _httpServer?.close(force: true); // Stop IPC server for (var socket in _ipcSockets) { - socket.close(); + try { + socket.close(); + } catch (e) { + developer.log('Error closing IPC socket: $e', name: kRpcIpcLogPrefix); + } } _ipcSockets.clear(); if (Platform.isWindows && _pipeHandle != null) { - CloseHandle(_pipeHandle!); + try { + CloseHandle(_pipeHandle!); + } catch (e) { + developer.log('Error closing named pipe: $e', name: kRpcIpcLogPrefix); + } _pipeHandle = null; } + _ipcTimer?.cancel(); await _ipcServer?.close(); - developer.log('servers stopped', name: kRpcLogPrefix); + developer.log('Servers stopped', name: kRpcLogPrefix); } // Handle new WebSocket connection @@ -343,7 +383,7 @@ class ActivityRpcServer { final origin = request.headers['origin'] ?? ''; developer.log( - 'new WS connection! origin: $origin, params: $params', + 'New WS connection! origin: $origin, params: $params', name: kRpcLogPrefix, ); @@ -353,14 +393,14 @@ class ActivityRpcServer { 'https://ptb.discord.com', 'https://canary.discord.com', ].contains(origin)) { - developer.log('disallowed origin: $origin', name: kRpcLogPrefix); + developer.log('Disallowed origin: $origin', name: kRpcLogPrefix); socket.sink.close(); return; } if (encoding != 'json') { developer.log( - 'unsupported encoding requested: $encoding', + 'Unsupported encoding requested: $encoding', name: kRpcLogPrefix, ); socket.sink.close(); @@ -368,7 +408,7 @@ class ActivityRpcServer { } if (ver != 1) { - developer.log('unsupported version requested: $ver', name: kRpcLogPrefix); + developer.log('Unsupported version requested: $ver', name: kRpcLogPrefix); socket.sink.close(); return; } @@ -392,7 +432,7 @@ class ActivityRpcServer { // Handle new IPC connection void _onIpcConnection(Socket socket) { - developer.log('new IPC connection!', name: kRpcIpcLogPrefix); + developer.log('New IPC connection!', name: kRpcIpcLogPrefix); final socketWrapper = _IpcSocketWrapper.fromSocket(socket); _ipcSockets.add(socketWrapper); @@ -412,9 +452,17 @@ class ActivityRpcServer { } // Handle incoming WebSocket message - void _onWsMessage(_WsSocketWrapper socket, dynamic data) { + Future _onWsMessage(_WsSocketWrapper socket, dynamic data) async { + if (data is! String) { + developer.log('Invalid WebSocket message: not a string', name: kRpcLogPrefix); + return; + } try { - final jsonData = jsonDecode(data as String); + final jsonData = await compute(jsonDecode, data); + if (jsonData is! Map) { + developer.log('Invalid WebSocket message: not a JSON object', name: kRpcLogPrefix); + return; + } developer.log('WS message: $jsonData', name: kRpcLogPrefix); handlers['message']?.call(socket, jsonData); } catch (e) {