From 461ed1fcdac262ec63e428c1f3093d06ae784df3 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Thu, 11 Sep 2025 00:56:26 +0800 Subject: [PATCH] :recycle: FFI windows rpc ipc implmentation --- lib/pods/activity_rpc.dart | 416 ++++++++++++++++++++++--------------- 1 file changed, 254 insertions(+), 162 deletions(-) diff --git a/lib/pods/activity_rpc.dart b/lib/pods/activity_rpc.dart index 9a405e2c..bccf6417 100644 --- a/lib/pods/activity_rpc.dart +++ b/lib/pods/activity_rpc.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:developer' as developer; +import 'dart:ffi'; import 'dart:io'; import 'package:flutter/foundation.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; @@ -10,6 +11,9 @@ import 'package:shelf/shelf_io.dart' as shelf_io; import 'package:shelf_web_socket/shelf_web_socket.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:path/path.dart' as path; +import 'package:win32/win32.dart'; +import 'package:win32/winsock2.dart' as winsock2; +import 'package:ffi/ffi.dart'; const String kRpcLogPrefix = 'arRPC.websocket'; const String kRpcIpcLogPrefix = 'arRPC.ipc'; @@ -43,12 +47,13 @@ class IpcErrorCodes { static const int invalidEncoding = 4005; } +// Reference https://github.com/OpenAsar/arrpc/blob/main/src/transports/ipc.js class ActivityRpcServer { static const List portRange = [6463, 6472]; // Ports 6463–6472 - Map - handlers; // {connection: (socket), message: (socket, data), close: (socket)} + Map handlers; // {connection: (socket), message: (socket, data), close: (socket)} HttpServer? _httpServer; ServerSocket? _ipcServer; + int? _pipeHandle; // For Windows named pipe final List _wsSockets = []; final List<_IpcSocketWrapper> _ipcSockets = []; @@ -79,10 +84,7 @@ class ActivityRpcServer { // Find available IPC socket path Future _findAvailableIpcPath() async { - if (Platform.isWindows) { - // Use TCP sockets on Windows for IPC (simpler and more compatible) - return _findAvailableTcpPort(); - } + if (Platform.isWindows) return r'\\.\pipe\discord-ipc'; // Build list of directories to try, with macOS-specific handling final baseDirs = []; @@ -103,11 +105,11 @@ class ActivityRpcServer { // Add other standard directories final otherDirs = [ - Platform.environment['XDG_RUNTIME_DIR'], // User runtime directory - Platform.environment['TMPDIR'], // App container temp (fallback) + Platform.environment['XDG_RUNTIME_DIR'], + Platform.environment['TMPDIR'], Platform.environment['TMP'], Platform.environment['TEMP'], - '/tmp', // System temp directory - most compatible + '/tmp', ]; baseDirs.addAll( @@ -123,7 +125,6 @@ class ActivityRpcServer { 0, ); socket.close(); - // Clean up the test socket try { await File(socketPath).delete(); } catch (_) {} @@ -133,9 +134,7 @@ class ActivityRpcServer { ); return socketPath; } catch (e) { - // Path not available, try next if (i == 0) { - // Log only for the first attempt per directory developer.log( 'IPC path $socketPath not available: $e', name: kRpcIpcLogPrefix, @@ -150,36 +149,7 @@ class ActivityRpcServer { ); } - // Find available TCP port for Windows IPC - Future _findAvailableTcpPort() async { - // Use ports in the range 6473-6482 (different from WebSocket server range 6463-6472) - for (int port = 6473; port <= 6482; port++) { - try { - final socket = await ServerSocket.bind( - InternetAddress.loopbackIPv4, - port, - ); - socket.close(); - developer.log( - 'IPC TCP socket will be created on port: $port', - name: kRpcIpcLogPrefix, - ); - return port.toString(); // Return as string to match existing interface - } catch (e) { - // Port not available, try next - if (port == 6473) { - developer.log( - 'IPC TCP port $port not available: $e', - name: kRpcIpcLogPrefix, - ); - } - continue; - } - } - throw Exception('No available IPC TCP ports found'); - } - - // Start the WebSocket server + // Start the server Future start() async { int port = portRange[0]; bool wsSuccess = false; @@ -188,11 +158,9 @@ class ActivityRpcServer { while (port <= portRange[1]) { developer.log('trying port $port', name: kRpcLogPrefix); try { - // Start HTTP server _httpServer = await HttpServer.bind(InternetAddress.loopbackIPv4, port); developer.log('listening on $port', name: kRpcLogPrefix); - // Handle WebSocket upgrades shelf_io.serveRequests(_httpServer!, (Request request) async { developer.log('new request', name: kRpcLogPrefix); if (request.headers['upgrade']?.toLowerCase() == 'websocket') { @@ -212,7 +180,6 @@ class ActivityRpcServer { break; } catch (e) { if (e is SocketException && e.osError?.errorCode == 98) { - // EADDRINUSE developer.log('$port in use!', name: kRpcLogPrefix); } else { developer.log('http error: $e', name: kRpcLogPrefix); @@ -227,47 +194,123 @@ class ActivityRpcServer { ); } - // Start IPC server (skip on macOS due to sandboxing) - final shouldStartIpc = !Platform.isMacOS; + // Start IPC server + final shouldStartIpc = !Platform.isMacOS && !kIsWeb; if (shouldStartIpc) { - try { - if (Platform.isWindows) { - // Use TCP socket on Windows - final ipcPortStr = await _findAvailableIpcPath(); - final ipcPort = int.parse(ipcPortStr); - _ipcServer = await ServerSocket.bind( - InternetAddress.loopbackIPv4, - ipcPort, - ); - developer.log( - 'IPC listening on TCP port $ipcPort', - name: kRpcIpcLogPrefix, - ); - } else { - // Use Unix socket on other platforms + if (Platform.isWindows) { + await _startWindowsIpcServer(); + } else { + try { final ipcPath = await _findAvailableIpcPath(); _ipcServer = await ServerSocket.bind( InternetAddress(ipcPath, type: InternetAddressType.unix), 0, ); developer.log('IPC listening at $ipcPath', name: kRpcIpcLogPrefix); - } - _ipcServer!.listen((Socket socket) { - _onIpcConnection(socket); - }); - } catch (e) { - developer.log('IPC server error: $e', name: kRpcIpcLogPrefix); - // Continue without IPC if it fails + _ipcServer!.listen((Socket socket) { + _onIpcConnection(socket); + }); + } catch (e) { + developer.log('IPC server error: $e', name: kRpcIpcLogPrefix); + } } } else { developer.log( - 'IPC server disabled on macOS due to sandboxing', + 'IPC server disabled on macOS or web in production mode', name: kRpcIpcLogPrefix, ); } } + // Start Windows-specific IPC server using Winsock2 named pipe + Future _startWindowsIpcServer() async { + final pipeName = r'\\.\pipe\discord-ipc'.toNativeUtf16(); + try { + _pipeHandle = CreateNamedPipe( + pipeName, + PIPE_ACCESS_DUPLEX, + PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT, + PIPE_UNLIMITED_INSTANCES, + 4096, // Output buffer size + 4096, // Input buffer size + 0, // Default timeout + nullptr, // Security attributes + ); + + if (_pipeHandle == INVALID_HANDLE_VALUE) { + final error = GetLastError(); + throw Exception('Failed to create named pipe: error code $error'); + } + + developer.log('IPC named pipe created at \\\\.\\pipe\\discord-ipc', name: kRpcIpcLogPrefix); + + // Start listening for connections in a separate isolate or async loop + _listenWindowsIpc(); + } finally { + free(pipeName); + } + } + + // Listen for Windows IPC connections + void _listenWindowsIpc() { + Timer.periodic(Duration(milliseconds: 100), (timer) async { + if (_pipeHandle == null || _pipeHandle == INVALID_HANDLE_VALUE) { + timer.cancel(); + return; + } + + final connected = ConnectNamedPipe(_pipeHandle!, nullptr); + if (connected != 0 || GetLastError() == ERROR_PIPE_CONNECTED) { + final socketWrapper = _IpcSocketWrapper(_pipeHandle!); + _ipcSockets.add(socketWrapper); + developer.log('New IPC connection on named pipe', name: kRpcIpcLogPrefix); + + // Handle data reading in a separate async function + _handleWindowsIpcData(socketWrapper); + + // Create a new pipe for the next connection + await _startWindowsIpcServer(); + } + }); + } + + // Handle Windows IPC data + void _handleWindowsIpcData(_IpcSocketWrapper socket) async { + final buffer = malloc.allocate(4096); + final bytesRead = malloc.allocate(sizeOf()); + try { + while (socket.pipeHandle != null) { + final success = ReadFile( + socket.pipeHandle!, + buffer.cast(), + 4096, + bytesRead, + nullptr, + ); + + if (success == FALSE && GetLastError() != ERROR_MORE_DATA) { + developer.log('IPC read error: ${GetLastError()}', name: kRpcIpcLogPrefix); + socket.close(); + break; + } + + final data = buffer.asTypedList(bytesRead.value); + socket.addData(data); + final packets = socket.readPackets(); + for (final packet in packets) { + _handleIpcPacket(socket, packet); + } + } + } catch (e) { + developer.log('IPC data error: $e', name: kRpcIpcLogPrefix); + socket.closeWithCode(IpcCloseCodes.closeUnsupported, e.toString()); + } finally { + malloc.free(buffer); + malloc.free(bytesRead); + } + } + // Stop the server Future stop() async { // Stop WebSocket server @@ -282,6 +325,10 @@ class ActivityRpcServer { socket.close(); } _ipcSockets.clear(); + if (Platform.isWindows && _pipeHandle != null) { + CloseHandle(_pipeHandle!); + _pipeHandle = null; + } await _ipcServer?.close(); developer.log('servers stopped', name: kRpcLogPrefix); @@ -289,7 +336,6 @@ class ActivityRpcServer { // Handle new WebSocket connection void _onWsConnection(WebSocketChannel socket, Request request) { - // Parse query parameters final uri = request.url; final params = uri.queryParameters; final ver = int.tryParse(params['v'] ?? '1') ?? 1; @@ -302,7 +348,6 @@ class ActivityRpcServer { name: kRpcLogPrefix, ); - // Validate origin if (origin.isNotEmpty && ![ 'https://discord.com', @@ -314,7 +359,6 @@ class ActivityRpcServer { return; } - // Validate encoding if (encoding != 'json') { developer.log( 'unsupported encoding requested: $encoding', @@ -324,17 +368,14 @@ class ActivityRpcServer { return; } - // Validate version if (ver != 1) { developer.log('unsupported version requested: $ver', name: kRpcLogPrefix); socket.sink.close(); return; } - // Store client info on socket final socketWithMeta = _WsSocketWrapper(socket, clientId, encoding); - // Set up event listeners socket.stream.listen( (data) => _onWsMessage(socketWithMeta, data), onError: (e) { @@ -347,7 +388,6 @@ class ActivityRpcServer { }, ); - // Notify handler of new connection handlers['connection']?.call(socketWithMeta); } @@ -355,10 +395,9 @@ class ActivityRpcServer { void _onIpcConnection(Socket socket) { developer.log('new IPC connection!', name: kRpcIpcLogPrefix); - final socketWrapper = _IpcSocketWrapper(socket); + final socketWrapper = _IpcSocketWrapper.fromSocket(socket); _ipcSockets.add(socketWrapper); - // Set up event listeners socket.listen( (data) => _onIpcData(socketWrapper, data), onError: (e) { @@ -408,7 +447,6 @@ class ActivityRpcServer { case IpcTypes.pong: developer.log('IPC pong received', name: kRpcIpcLogPrefix); - // Handle pong if needed break; case IpcTypes.handshake: @@ -443,7 +481,6 @@ class ActivityRpcServer { final ver = int.tryParse(params['v']?.toString() ?? '1') ?? 1; final clientId = params['client_id']?.toString() ?? ''; - // Validate version if (ver != 1) { developer.log( 'IPC unsupported version requested: $ver', @@ -453,7 +490,6 @@ class ActivityRpcServer { return; } - // Validate client ID if (clientId.isEmpty) { developer.log('IPC client ID required', name: kRpcIpcLogPrefix); socket.closeWithCode(IpcErrorCodes.invalidClientId); @@ -462,7 +498,6 @@ class ActivityRpcServer { socket.clientId = clientId; - // Notify handler of new connection handlers['connection']?.call(socket); } } @@ -483,12 +518,14 @@ class _WsSocketWrapper { // IPC wrapper class _IpcSocketWrapper { - final Socket socket; + final Socket? socket; + final int? pipeHandle; String clientId = ''; bool handshook = false; final List _buffer = []; - _IpcSocketWrapper(this.socket); + _IpcSocketWrapper(this.pipeHandle) : socket = null; + _IpcSocketWrapper.fromSocket(this.socket) : pipeHandle = null; void addData(List data) { _buffer.addAll(data); @@ -497,23 +534,82 @@ class _IpcSocketWrapper { void send(Map msg) { developer.log('IPC sending: $msg', name: kRpcIpcLogPrefix); final packet = ActivityRpcServer.encodeIpcPacket(IpcTypes.frame, msg); - socket.add(packet); + if (Platform.isWindows && pipeHandle != null) { + final buffer = malloc.allocate(packet.length); + buffer.asTypedList(packet.length).setAll(0, packet); + final bytesWritten = malloc.allocate(sizeOf()); + try { + WriteFile( + pipeHandle!, + buffer.cast(), + packet.length, + bytesWritten, + nullptr, + ); + } finally { + malloc.free(buffer); + malloc.free(bytesWritten); + } + } else { + socket?.add(packet); + } } void sendPong(dynamic data) { final packet = ActivityRpcServer.encodeIpcPacket(IpcTypes.pong, data ?? {}); - socket.add(packet); + if (Platform.isWindows && pipeHandle != null) { + final buffer = malloc.allocate(packet.length); + buffer.asTypedList(packet.length).setAll(0, packet); + final bytesWritten = malloc.allocate(sizeOf()); + try { + WriteFile( + pipeHandle!, + buffer.cast(), + packet.length, + bytesWritten, + nullptr, + ); + } finally { + malloc.free(buffer); + malloc.free(bytesWritten); + } + } else { + socket?.add(packet); + } } void close() { - socket.close(); + if (Platform.isWindows && pipeHandle != null) { + CloseHandle(pipeHandle!); + } else { + socket?.close(); + } } void closeWithCode(int code, [String message = '']) { final closeData = {'code': code, 'message': message}; final packet = ActivityRpcServer.encodeIpcPacket(IpcTypes.close, closeData); - socket.add(packet); - socket.close(); + if (Platform.isWindows && pipeHandle != null) { + final buffer = malloc.allocate(packet.length); + buffer.asTypedList(packet.length).setAll(0, packet); + final bytesWritten = malloc.allocate(sizeOf()); + try { + WriteFile( + pipeHandle!, + buffer.cast(), + packet.length, + bytesWritten, + nullptr, + ); + } finally { + malloc.free(buffer); + malloc.free(bytesWritten); + } + CloseHandle(pipeHandle!); + } else { + socket?.add(packet); + socket?.close(); + } } List<_IpcPacket> readPackets() { @@ -568,10 +664,9 @@ class ServerStateNotifier extends StateNotifier { final ActivityRpcServer server; ServerStateNotifier(this.server) - : super(ServerState(status: 'Server not started')); + : super(ServerState(status: 'Server not started')); Future start() async { - // Only start server on desktop platforms if (!Platform.isAndroid && !Platform.isIOS && !kIsWeb) { try { await server.start(); @@ -596,77 +691,74 @@ class ServerStateNotifier extends StateNotifier { // Providers final rpcServerStateProvider = StateNotifierProvider((ref) { - final server = ActivityRpcServer({}); - final notifier = ServerStateNotifier(server); - server.updateHandlers({ - 'connection': (socket) { - final clientId = - socket is _WsSocketWrapper - ? socket.clientId - : (socket as _IpcSocketWrapper).clientId; - notifier.updateStatus('Client connected (ID: $clientId)'); - // Send READY event - socket.send({ - 'cmd': 'DISPATCH', - 'data': { - 'v': 1, - 'config': { - 'cdn_host': 'fake.cdn', - 'api_endpoint': '//fake.api', - 'environment': 'dev', - }, - 'user': { - 'id': 'fake_user_id', - 'username': 'FakeUser', - 'discriminator': '0001', - 'avatar': null, - 'bot': false, - }, - }, - 'evt': 'READY', - 'nonce': '12345', // Should be dynamic - }); - }, - 'message': (socket, dynamic data) async { - if (data['cmd'] == 'SET_ACTIVITY') { - notifier.addActivity( - 'Activity: ${data['args']['activity']['details'] ?? 'Unknown'}', - ); - // Call setRemoteActivityStatus - final label = data['args']['activity']['details'] ?? 'Unknown'; - final appId = socket.clientId; - try { - await setRemoteActivityStatus(ref, label, appId); - } catch (e) { - developer.log( - 'Failed to set remote activity status: $e', - name: kRpcLogPrefix, - ); - } - // Echo back success - socket.send({ - 'cmd': 'SET_ACTIVITY', - 'data': data['args']['activity'], - 'evt': null, - 'nonce': data['nonce'], - }); - } - }, - 'close': (socket) async { - notifier.updateStatus('Client disconnected'); - final appId = socket.clientId; - try { - await unsetRemoteActivityStatus(ref, appId); - } catch (e) { - developer.log( - 'Failed to unset remote activity status: $e', - name: kRpcLogPrefix, - ); - } + final server = ActivityRpcServer({}); + final notifier = ServerStateNotifier(server); + server.updateHandlers({ + 'connection': (socket) { + final clientId = + socket is _WsSocketWrapper + ? socket.clientId + : (socket as _IpcSocketWrapper).clientId; + notifier.updateStatus('Client connected (ID: $clientId)'); + socket.send({ + 'cmd': 'DISPATCH', + 'data': { + 'v': 1, + 'config': { + 'cdn_host': 'fake.cdn', + 'api_endpoint': '//fake.api', + 'environment': 'dev', + }, + 'user': { + 'id': 'fake_user_id', + 'username': 'FakeUser', + 'discriminator': '0001', + 'avatar': null, + 'bot': false, + }, }, + 'evt': 'READY', + 'nonce': '12345', }); - return notifier; - }); + }, + 'message': (socket, dynamic data) async { + if (data['cmd'] == 'SET_ACTIVITY') { + notifier.addActivity( + 'Activity: ${data['args']['activity']['details'] ?? 'Unknown'}', + ); + final label = data['args']['activity']['details'] ?? 'Unknown'; + final appId = socket.clientId; + try { + await setRemoteActivityStatus(ref, label, appId); + } catch (e) { + developer.log( + 'Failed to set remote activity status: $e', + name: kRpcLogPrefix, + ); + } + socket.send({ + 'cmd': 'SET_ACTIVITY', + 'data': data['args']['activity'], + 'evt': null, + 'nonce': data['nonce'], + }); + } + }, + 'close': (socket) async { + notifier.updateStatus('Client disconnected'); + final appId = socket.clientId; + try { + await unsetRemoteActivityStatus(ref, appId); + } catch (e) { + developer.log( + 'Failed to unset remote activity status: $e', + name: kRpcLogPrefix, + ); + } + }, + }); + return notifier; +}); final rpcServerProvider = Provider((ref) { final notifier = ref.watch(rpcServerStateProvider.notifier); @@ -697,4 +789,4 @@ Future unsetRemoteActivityStatus(Ref ref, String appId) async { '/id/accounts/me/statuses', queryParameters: {'app': appId}, ); -} +} \ No newline at end of file