♻️ FFI windows rpc ipc implmentation

This commit is contained in:
2025-09-11 00:56:26 +08:00
parent 5363afa558
commit 461ed1fcda

View File

@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:convert';
import 'dart:developer' as developer;
import 'dart:ffi';
import 'dart:io';
import 'package:flutter/foundation.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
@@ -10,6 +11,9 @@ import 'package:shelf/shelf_io.dart' as shelf_io;
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:path/path.dart' as path;
import 'package:win32/win32.dart';
import 'package:win32/winsock2.dart' as winsock2;
import 'package:ffi/ffi.dart';
const String kRpcLogPrefix = 'arRPC.websocket';
const String kRpcIpcLogPrefix = 'arRPC.ipc';
@@ -43,12 +47,13 @@ class IpcErrorCodes {
static const int invalidEncoding = 4005;
}
// Reference https://github.com/OpenAsar/arrpc/blob/main/src/transports/ipc.js
class ActivityRpcServer {
static const List<int> portRange = [6463, 6472]; // Ports 64636472
Map<String, Function>
handlers; // {connection: (socket), message: (socket, data), close: (socket)}
Map<String, Function> handlers; // {connection: (socket), message: (socket, data), close: (socket)}
HttpServer? _httpServer;
ServerSocket? _ipcServer;
int? _pipeHandle; // For Windows named pipe
final List<WebSocketChannel> _wsSockets = [];
final List<_IpcSocketWrapper> _ipcSockets = [];
@@ -79,10 +84,7 @@ class ActivityRpcServer {
// Find available IPC socket path
Future<String> _findAvailableIpcPath() async {
if (Platform.isWindows) {
// Use TCP sockets on Windows for IPC (simpler and more compatible)
return _findAvailableTcpPort();
}
if (Platform.isWindows) return r'\\.\pipe\discord-ipc';
// Build list of directories to try, with macOS-specific handling
final baseDirs = <String>[];
@@ -103,11 +105,11 @@ class ActivityRpcServer {
// Add other standard directories
final otherDirs = [
Platform.environment['XDG_RUNTIME_DIR'], // User runtime directory
Platform.environment['TMPDIR'], // App container temp (fallback)
Platform.environment['XDG_RUNTIME_DIR'],
Platform.environment['TMPDIR'],
Platform.environment['TMP'],
Platform.environment['TEMP'],
'/tmp', // System temp directory - most compatible
'/tmp',
];
baseDirs.addAll(
@@ -123,7 +125,6 @@ class ActivityRpcServer {
0,
);
socket.close();
// Clean up the test socket
try {
await File(socketPath).delete();
} catch (_) {}
@@ -133,9 +134,7 @@ class ActivityRpcServer {
);
return socketPath;
} catch (e) {
// Path not available, try next
if (i == 0) {
// Log only for the first attempt per directory
developer.log(
'IPC path $socketPath not available: $e',
name: kRpcIpcLogPrefix,
@@ -150,36 +149,7 @@ class ActivityRpcServer {
);
}
// Find available TCP port for Windows IPC
Future<String> _findAvailableTcpPort() async {
// Use ports in the range 6473-6482 (different from WebSocket server range 6463-6472)
for (int port = 6473; port <= 6482; port++) {
try {
final socket = await ServerSocket.bind(
InternetAddress.loopbackIPv4,
port,
);
socket.close();
developer.log(
'IPC TCP socket will be created on port: $port',
name: kRpcIpcLogPrefix,
);
return port.toString(); // Return as string to match existing interface
} catch (e) {
// Port not available, try next
if (port == 6473) {
developer.log(
'IPC TCP port $port not available: $e',
name: kRpcIpcLogPrefix,
);
}
continue;
}
}
throw Exception('No available IPC TCP ports found');
}
// Start the WebSocket server
// Start the server
Future<void> start() async {
int port = portRange[0];
bool wsSuccess = false;
@@ -188,11 +158,9 @@ class ActivityRpcServer {
while (port <= portRange[1]) {
developer.log('trying port $port', name: kRpcLogPrefix);
try {
// Start HTTP server
_httpServer = await HttpServer.bind(InternetAddress.loopbackIPv4, port);
developer.log('listening on $port', name: kRpcLogPrefix);
// Handle WebSocket upgrades
shelf_io.serveRequests(_httpServer!, (Request request) async {
developer.log('new request', name: kRpcLogPrefix);
if (request.headers['upgrade']?.toLowerCase() == 'websocket') {
@@ -212,7 +180,6 @@ class ActivityRpcServer {
break;
} catch (e) {
if (e is SocketException && e.osError?.errorCode == 98) {
// EADDRINUSE
developer.log('$port in use!', name: kRpcLogPrefix);
} else {
developer.log('http error: $e', name: kRpcLogPrefix);
@@ -227,47 +194,123 @@ class ActivityRpcServer {
);
}
// Start IPC server (skip on macOS due to sandboxing)
final shouldStartIpc = !Platform.isMacOS;
// Start IPC server
final shouldStartIpc = !Platform.isMacOS && !kIsWeb;
if (shouldStartIpc) {
try {
if (Platform.isWindows) {
// Use TCP socket on Windows
final ipcPortStr = await _findAvailableIpcPath();
final ipcPort = int.parse(ipcPortStr);
_ipcServer = await ServerSocket.bind(
InternetAddress.loopbackIPv4,
ipcPort,
);
developer.log(
'IPC listening on TCP port $ipcPort',
name: kRpcIpcLogPrefix,
);
} else {
// Use Unix socket on other platforms
if (Platform.isWindows) {
await _startWindowsIpcServer();
} else {
try {
final ipcPath = await _findAvailableIpcPath();
_ipcServer = await ServerSocket.bind(
InternetAddress(ipcPath, type: InternetAddressType.unix),
0,
);
developer.log('IPC listening at $ipcPath', name: kRpcIpcLogPrefix);
}
_ipcServer!.listen((Socket socket) {
_onIpcConnection(socket);
});
} catch (e) {
developer.log('IPC server error: $e', name: kRpcIpcLogPrefix);
// Continue without IPC if it fails
_ipcServer!.listen((Socket socket) {
_onIpcConnection(socket);
});
} catch (e) {
developer.log('IPC server error: $e', name: kRpcIpcLogPrefix);
}
}
} else {
developer.log(
'IPC server disabled on macOS due to sandboxing',
'IPC server disabled on macOS or web in production mode',
name: kRpcIpcLogPrefix,
);
}
}
// Start Windows-specific IPC server using Winsock2 named pipe
Future<void> _startWindowsIpcServer() async {
final pipeName = r'\\.\pipe\discord-ipc'.toNativeUtf16();
try {
_pipeHandle = CreateNamedPipe(
pipeName,
PIPE_ACCESS_DUPLEX,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
4096, // Output buffer size
4096, // Input buffer size
0, // Default timeout
nullptr, // Security attributes
);
if (_pipeHandle == INVALID_HANDLE_VALUE) {
final error = GetLastError();
throw Exception('Failed to create named pipe: error code $error');
}
developer.log('IPC named pipe created at \\\\.\\pipe\\discord-ipc', name: kRpcIpcLogPrefix);
// Start listening for connections in a separate isolate or async loop
_listenWindowsIpc();
} finally {
free(pipeName);
}
}
// Listen for Windows IPC connections
void _listenWindowsIpc() {
Timer.periodic(Duration(milliseconds: 100), (timer) async {
if (_pipeHandle == null || _pipeHandle == INVALID_HANDLE_VALUE) {
timer.cancel();
return;
}
final connected = ConnectNamedPipe(_pipeHandle!, nullptr);
if (connected != 0 || GetLastError() == ERROR_PIPE_CONNECTED) {
final socketWrapper = _IpcSocketWrapper(_pipeHandle!);
_ipcSockets.add(socketWrapper);
developer.log('New IPC connection on named pipe', name: kRpcIpcLogPrefix);
// Handle data reading in a separate async function
_handleWindowsIpcData(socketWrapper);
// Create a new pipe for the next connection
await _startWindowsIpcServer();
}
});
}
// Handle Windows IPC data
void _handleWindowsIpcData(_IpcSocketWrapper socket) async {
final buffer = malloc.allocate<Uint8>(4096);
final bytesRead = malloc.allocate<Uint32>(sizeOf<Uint32>());
try {
while (socket.pipeHandle != null) {
final success = ReadFile(
socket.pipeHandle!,
buffer.cast(),
4096,
bytesRead,
nullptr,
);
if (success == FALSE && GetLastError() != ERROR_MORE_DATA) {
developer.log('IPC read error: ${GetLastError()}', name: kRpcIpcLogPrefix);
socket.close();
break;
}
final data = buffer.asTypedList(bytesRead.value);
socket.addData(data);
final packets = socket.readPackets();
for (final packet in packets) {
_handleIpcPacket(socket, packet);
}
}
} catch (e) {
developer.log('IPC data error: $e', name: kRpcIpcLogPrefix);
socket.closeWithCode(IpcCloseCodes.closeUnsupported, e.toString());
} finally {
malloc.free(buffer);
malloc.free(bytesRead);
}
}
// Stop the server
Future<void> stop() async {
// Stop WebSocket server
@@ -282,6 +325,10 @@ class ActivityRpcServer {
socket.close();
}
_ipcSockets.clear();
if (Platform.isWindows && _pipeHandle != null) {
CloseHandle(_pipeHandle!);
_pipeHandle = null;
}
await _ipcServer?.close();
developer.log('servers stopped', name: kRpcLogPrefix);
@@ -289,7 +336,6 @@ class ActivityRpcServer {
// Handle new WebSocket connection
void _onWsConnection(WebSocketChannel socket, Request request) {
// Parse query parameters
final uri = request.url;
final params = uri.queryParameters;
final ver = int.tryParse(params['v'] ?? '1') ?? 1;
@@ -302,7 +348,6 @@ class ActivityRpcServer {
name: kRpcLogPrefix,
);
// Validate origin
if (origin.isNotEmpty &&
![
'https://discord.com',
@@ -314,7 +359,6 @@ class ActivityRpcServer {
return;
}
// Validate encoding
if (encoding != 'json') {
developer.log(
'unsupported encoding requested: $encoding',
@@ -324,17 +368,14 @@ class ActivityRpcServer {
return;
}
// Validate version
if (ver != 1) {
developer.log('unsupported version requested: $ver', name: kRpcLogPrefix);
socket.sink.close();
return;
}
// Store client info on socket
final socketWithMeta = _WsSocketWrapper(socket, clientId, encoding);
// Set up event listeners
socket.stream.listen(
(data) => _onWsMessage(socketWithMeta, data),
onError: (e) {
@@ -347,7 +388,6 @@ class ActivityRpcServer {
},
);
// Notify handler of new connection
handlers['connection']?.call(socketWithMeta);
}
@@ -355,10 +395,9 @@ class ActivityRpcServer {
void _onIpcConnection(Socket socket) {
developer.log('new IPC connection!', name: kRpcIpcLogPrefix);
final socketWrapper = _IpcSocketWrapper(socket);
final socketWrapper = _IpcSocketWrapper.fromSocket(socket);
_ipcSockets.add(socketWrapper);
// Set up event listeners
socket.listen(
(data) => _onIpcData(socketWrapper, data),
onError: (e) {
@@ -408,7 +447,6 @@ class ActivityRpcServer {
case IpcTypes.pong:
developer.log('IPC pong received', name: kRpcIpcLogPrefix);
// Handle pong if needed
break;
case IpcTypes.handshake:
@@ -443,7 +481,6 @@ class ActivityRpcServer {
final ver = int.tryParse(params['v']?.toString() ?? '1') ?? 1;
final clientId = params['client_id']?.toString() ?? '';
// Validate version
if (ver != 1) {
developer.log(
'IPC unsupported version requested: $ver',
@@ -453,7 +490,6 @@ class ActivityRpcServer {
return;
}
// Validate client ID
if (clientId.isEmpty) {
developer.log('IPC client ID required', name: kRpcIpcLogPrefix);
socket.closeWithCode(IpcErrorCodes.invalidClientId);
@@ -462,7 +498,6 @@ class ActivityRpcServer {
socket.clientId = clientId;
// Notify handler of new connection
handlers['connection']?.call(socket);
}
}
@@ -483,12 +518,14 @@ class _WsSocketWrapper {
// IPC wrapper
class _IpcSocketWrapper {
final Socket socket;
final Socket? socket;
final int? pipeHandle;
String clientId = '';
bool handshook = false;
final List<int> _buffer = [];
_IpcSocketWrapper(this.socket);
_IpcSocketWrapper(this.pipeHandle) : socket = null;
_IpcSocketWrapper.fromSocket(this.socket) : pipeHandle = null;
void addData(List<int> data) {
_buffer.addAll(data);
@@ -497,23 +534,82 @@ class _IpcSocketWrapper {
void send(Map<String, dynamic> msg) {
developer.log('IPC sending: $msg', name: kRpcIpcLogPrefix);
final packet = ActivityRpcServer.encodeIpcPacket(IpcTypes.frame, msg);
socket.add(packet);
if (Platform.isWindows && pipeHandle != null) {
final buffer = malloc.allocate<Uint8>(packet.length);
buffer.asTypedList(packet.length).setAll(0, packet);
final bytesWritten = malloc.allocate<Uint32>(sizeOf<Uint32>());
try {
WriteFile(
pipeHandle!,
buffer.cast(),
packet.length,
bytesWritten,
nullptr,
);
} finally {
malloc.free(buffer);
malloc.free(bytesWritten);
}
} else {
socket?.add(packet);
}
}
void sendPong(dynamic data) {
final packet = ActivityRpcServer.encodeIpcPacket(IpcTypes.pong, data ?? {});
socket.add(packet);
if (Platform.isWindows && pipeHandle != null) {
final buffer = malloc.allocate<Uint8>(packet.length);
buffer.asTypedList(packet.length).setAll(0, packet);
final bytesWritten = malloc.allocate<Uint32>(sizeOf<Uint32>());
try {
WriteFile(
pipeHandle!,
buffer.cast(),
packet.length,
bytesWritten,
nullptr,
);
} finally {
malloc.free(buffer);
malloc.free(bytesWritten);
}
} else {
socket?.add(packet);
}
}
void close() {
socket.close();
if (Platform.isWindows && pipeHandle != null) {
CloseHandle(pipeHandle!);
} else {
socket?.close();
}
}
void closeWithCode(int code, [String message = '']) {
final closeData = {'code': code, 'message': message};
final packet = ActivityRpcServer.encodeIpcPacket(IpcTypes.close, closeData);
socket.add(packet);
socket.close();
if (Platform.isWindows && pipeHandle != null) {
final buffer = malloc.allocate<Uint8>(packet.length);
buffer.asTypedList(packet.length).setAll(0, packet);
final bytesWritten = malloc.allocate<Uint32>(sizeOf<Uint32>());
try {
WriteFile(
pipeHandle!,
buffer.cast(),
packet.length,
bytesWritten,
nullptr,
);
} finally {
malloc.free(buffer);
malloc.free(bytesWritten);
}
CloseHandle(pipeHandle!);
} else {
socket?.add(packet);
socket?.close();
}
}
List<_IpcPacket> readPackets() {
@@ -568,10 +664,9 @@ class ServerStateNotifier extends StateNotifier<ServerState> {
final ActivityRpcServer server;
ServerStateNotifier(this.server)
: super(ServerState(status: 'Server not started'));
: super(ServerState(status: 'Server not started'));
Future<void> start() async {
// Only start server on desktop platforms
if (!Platform.isAndroid && !Platform.isIOS && !kIsWeb) {
try {
await server.start();
@@ -596,77 +691,74 @@ class ServerStateNotifier extends StateNotifier<ServerState> {
// Providers
final rpcServerStateProvider =
StateNotifierProvider<ServerStateNotifier, ServerState>((ref) {
final server = ActivityRpcServer({});
final notifier = ServerStateNotifier(server);
server.updateHandlers({
'connection': (socket) {
final clientId =
socket is _WsSocketWrapper
? socket.clientId
: (socket as _IpcSocketWrapper).clientId;
notifier.updateStatus('Client connected (ID: $clientId)');
// Send READY event
socket.send({
'cmd': 'DISPATCH',
'data': {
'v': 1,
'config': {
'cdn_host': 'fake.cdn',
'api_endpoint': '//fake.api',
'environment': 'dev',
},
'user': {
'id': 'fake_user_id',
'username': 'FakeUser',
'discriminator': '0001',
'avatar': null,
'bot': false,
},
},
'evt': 'READY',
'nonce': '12345', // Should be dynamic
});
},
'message': (socket, dynamic data) async {
if (data['cmd'] == 'SET_ACTIVITY') {
notifier.addActivity(
'Activity: ${data['args']['activity']['details'] ?? 'Unknown'}',
);
// Call setRemoteActivityStatus
final label = data['args']['activity']['details'] ?? 'Unknown';
final appId = socket.clientId;
try {
await setRemoteActivityStatus(ref, label, appId);
} catch (e) {
developer.log(
'Failed to set remote activity status: $e',
name: kRpcLogPrefix,
);
}
// Echo back success
socket.send({
'cmd': 'SET_ACTIVITY',
'data': data['args']['activity'],
'evt': null,
'nonce': data['nonce'],
});
}
},
'close': (socket) async {
notifier.updateStatus('Client disconnected');
final appId = socket.clientId;
try {
await unsetRemoteActivityStatus(ref, appId);
} catch (e) {
developer.log(
'Failed to unset remote activity status: $e',
name: kRpcLogPrefix,
);
}
final server = ActivityRpcServer({});
final notifier = ServerStateNotifier(server);
server.updateHandlers({
'connection': (socket) {
final clientId =
socket is _WsSocketWrapper
? socket.clientId
: (socket as _IpcSocketWrapper).clientId;
notifier.updateStatus('Client connected (ID: $clientId)');
socket.send({
'cmd': 'DISPATCH',
'data': {
'v': 1,
'config': {
'cdn_host': 'fake.cdn',
'api_endpoint': '//fake.api',
'environment': 'dev',
},
'user': {
'id': 'fake_user_id',
'username': 'FakeUser',
'discriminator': '0001',
'avatar': null,
'bot': false,
},
},
'evt': 'READY',
'nonce': '12345',
});
return notifier;
});
},
'message': (socket, dynamic data) async {
if (data['cmd'] == 'SET_ACTIVITY') {
notifier.addActivity(
'Activity: ${data['args']['activity']['details'] ?? 'Unknown'}',
);
final label = data['args']['activity']['details'] ?? 'Unknown';
final appId = socket.clientId;
try {
await setRemoteActivityStatus(ref, label, appId);
} catch (e) {
developer.log(
'Failed to set remote activity status: $e',
name: kRpcLogPrefix,
);
}
socket.send({
'cmd': 'SET_ACTIVITY',
'data': data['args']['activity'],
'evt': null,
'nonce': data['nonce'],
});
}
},
'close': (socket) async {
notifier.updateStatus('Client disconnected');
final appId = socket.clientId;
try {
await unsetRemoteActivityStatus(ref, appId);
} catch (e) {
developer.log(
'Failed to unset remote activity status: $e',
name: kRpcLogPrefix,
);
}
},
});
return notifier;
});
final rpcServerProvider = Provider<ActivityRpcServer>((ref) {
final notifier = ref.watch(rpcServerStateProvider.notifier);
@@ -697,4 +789,4 @@ Future<void> unsetRemoteActivityStatus(Ref ref, String appId) async {
'/id/accounts/me/statuses',
queryParameters: {'app': appId},
);
}
}