🐛 Fix windows rpc

This commit is contained in:
2025-09-11 01:15:52 +08:00
parent 1c058a4323
commit 4af816d931

View File

@@ -3,6 +3,7 @@ import 'dart:convert';
import 'dart:developer' as developer;
import 'dart:ffi';
import 'dart:io';
import 'dart:isolate';
import 'package:flutter/foundation.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:island/pods/network.dart';
@@ -53,6 +54,7 @@ class ActivityRpcServer {
HttpServer? _httpServer;
ServerSocket? _ipcServer;
int? _pipeHandle; // For Windows named pipe
Timer? _ipcTimer; // Store timer for cancellation
final List<WebSocketChannel> _wsSockets = [];
final List<_IpcSocketWrapper> _ipcSockets = [];
@@ -155,13 +157,13 @@ class ActivityRpcServer {
// Start WebSocket server
while (port <= portRange[1]) {
developer.log('trying port $port', name: kRpcLogPrefix);
developer.log('Trying port $port', name: kRpcLogPrefix);
try {
_httpServer = await HttpServer.bind(InternetAddress.loopbackIPv4, port);
developer.log('listening on $port', name: kRpcLogPrefix);
developer.log('Listening on $port', name: kRpcLogPrefix);
shelf_io.serveRequests(_httpServer!, (Request request) async {
developer.log('new request', name: kRpcLogPrefix);
developer.log('New request', name: kRpcLogPrefix);
if (request.headers['upgrade']?.toLowerCase() == 'websocket') {
final handler = webSocketHandler((WebSocketChannel channel, _) {
_wsSockets.add(channel);
@@ -170,7 +172,7 @@ class ActivityRpcServer {
return handler(request);
}
developer.log(
'new request disposed due to not websocket',
'New request disposed due to not websocket',
name: kRpcLogPrefix,
);
return Response.notFound('Not a WebSocket request');
@@ -181,9 +183,10 @@ class ActivityRpcServer {
if (e is SocketException && e.osError?.errorCode == 98) {
developer.log('$port in use!', name: kRpcLogPrefix);
} else {
developer.log('http error: $e', name: kRpcLogPrefix);
developer.log('HTTP error: $e', name: kRpcLogPrefix);
}
port++;
await Future.delayed(Duration(milliseconds: 100)); // Add delay
}
}
@@ -244,42 +247,62 @@ class ActivityRpcServer {
developer.log('IPC named pipe created at \\\\.\\pipe\\discord-ipc', name: kRpcIpcLogPrefix);
// Start listening for connections in a separate isolate or async loop
// Start listening for connections in a separate isolate
_listenWindowsIpc();
} finally {
free(pipeName);
}
}
// Listen for Windows IPC connections
void _listenWindowsIpc() {
Timer.periodic(Duration(milliseconds: 100), (timer) async {
if (_pipeHandle == null || _pipeHandle == INVALID_HANDLE_VALUE) {
timer.cancel();
return;
}
// Listen for Windows IPC connections in an isolate
void _listenWindowsIpc() async {
final receivePort = ReceivePort();
await Isolate.spawn(_windowsIpcIsolate, receivePort.sendPort);
final connected = ConnectNamedPipe(_pipeHandle!, nullptr);
if (connected != 0 || GetLastError() == ERROR_PIPE_CONNECTED) {
final socketWrapper = _IpcSocketWrapper(_pipeHandle!);
receivePort.listen((message) {
if (message is int) {
final socketWrapper = _IpcSocketWrapper(message);
_ipcSockets.add(socketWrapper);
developer.log('New IPC connection on named pipe', name: kRpcIpcLogPrefix);
// Handle data reading in a separate async function
_handleWindowsIpcData(socketWrapper);
// Create a new pipe for the next connection
await _startWindowsIpcServer();
_startWindowsIpcServer(); // Create new pipe for next connection
}
});
}
static void _windowsIpcIsolate(SendPort sendPort) {
while (true) {
final pipeHandle = CreateNamedPipe(
r'\\.\pipe\discord-ipc'.toNativeUtf16(),
PIPE_ACCESS_DUPLEX,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
4096,
4096,
0,
nullptr,
);
if (pipeHandle == INVALID_HANDLE_VALUE) {
developer.log('Failed to create named pipe: ${GetLastError()}', name: kRpcIpcLogPrefix);
break;
}
final connected = ConnectNamedPipe(pipeHandle, nullptr);
if (connected != 0 || GetLastError() == ERROR_PIPE_CONNECTED) {
sendPort.send(pipeHandle);
}
// Avoid tight loop
sleep(Duration(milliseconds: 100));
}
}
// Handle Windows IPC data
void _handleWindowsIpcData(_IpcSocketWrapper socket) async {
final startTime = DateTime.now();
final buffer = malloc.allocate<Uint8>(4096);
final bytesRead = malloc.allocate<Uint32>(sizeOf<Uint32>());
try {
while (socket.pipeHandle != null) {
final readStart = DateTime.now();
final success = ReadFile(
socket.pipeHandle!,
buffer.cast(),
@@ -287,6 +310,8 @@ class ActivityRpcServer {
bytesRead,
nullptr,
);
final readDuration = DateTime.now().difference(readStart).inMicroseconds;
developer.log('ReadFile took $readDuration microseconds', name: kRpcIpcLogPrefix);
if (success == FALSE && GetLastError() != ERROR_MORE_DATA) {
developer.log('IPC read error: ${GetLastError()}', name: kRpcIpcLogPrefix);
@@ -307,6 +332,8 @@ class ActivityRpcServer {
} finally {
malloc.free(buffer);
malloc.free(bytesRead);
final totalDuration = DateTime.now().difference(startTime).inMicroseconds;
developer.log('handleWindowsIpcData took $totalDuration microseconds', name: kRpcIpcLogPrefix);
}
}
@@ -314,23 +341,36 @@ class ActivityRpcServer {
Future<void> stop() async {
// Stop WebSocket server
for (var socket in _wsSockets) {
try {
await socket.sink.close();
} catch (e) {
developer.log('Error closing WebSocket: $e', name: kRpcLogPrefix);
}
}
_wsSockets.clear();
await _httpServer?.close();
await _httpServer?.close(force: true);
// Stop IPC server
for (var socket in _ipcSockets) {
try {
socket.close();
} catch (e) {
developer.log('Error closing IPC socket: $e', name: kRpcIpcLogPrefix);
}
}
_ipcSockets.clear();
if (Platform.isWindows && _pipeHandle != null) {
try {
CloseHandle(_pipeHandle!);
} catch (e) {
developer.log('Error closing named pipe: $e', name: kRpcIpcLogPrefix);
}
_pipeHandle = null;
}
_ipcTimer?.cancel();
await _ipcServer?.close();
developer.log('servers stopped', name: kRpcLogPrefix);
developer.log('Servers stopped', name: kRpcLogPrefix);
}
// Handle new WebSocket connection
@@ -343,7 +383,7 @@ class ActivityRpcServer {
final origin = request.headers['origin'] ?? '';
developer.log(
'new WS connection! origin: $origin, params: $params',
'New WS connection! origin: $origin, params: $params',
name: kRpcLogPrefix,
);
@@ -353,14 +393,14 @@ class ActivityRpcServer {
'https://ptb.discord.com',
'https://canary.discord.com',
].contains(origin)) {
developer.log('disallowed origin: $origin', name: kRpcLogPrefix);
developer.log('Disallowed origin: $origin', name: kRpcLogPrefix);
socket.sink.close();
return;
}
if (encoding != 'json') {
developer.log(
'unsupported encoding requested: $encoding',
'Unsupported encoding requested: $encoding',
name: kRpcLogPrefix,
);
socket.sink.close();
@@ -368,7 +408,7 @@ class ActivityRpcServer {
}
if (ver != 1) {
developer.log('unsupported version requested: $ver', name: kRpcLogPrefix);
developer.log('Unsupported version requested: $ver', name: kRpcLogPrefix);
socket.sink.close();
return;
}
@@ -392,7 +432,7 @@ class ActivityRpcServer {
// Handle new IPC connection
void _onIpcConnection(Socket socket) {
developer.log('new IPC connection!', name: kRpcIpcLogPrefix);
developer.log('New IPC connection!', name: kRpcIpcLogPrefix);
final socketWrapper = _IpcSocketWrapper.fromSocket(socket);
_ipcSockets.add(socketWrapper);
@@ -412,9 +452,17 @@ class ActivityRpcServer {
}
// Handle incoming WebSocket message
void _onWsMessage(_WsSocketWrapper socket, dynamic data) {
Future<void> _onWsMessage(_WsSocketWrapper socket, dynamic data) async {
if (data is! String) {
developer.log('Invalid WebSocket message: not a string', name: kRpcLogPrefix);
return;
}
try {
final jsonData = jsonDecode(data as String);
final jsonData = await compute(jsonDecode, data);
if (jsonData is! Map<String, dynamic>) {
developer.log('Invalid WebSocket message: not a JSON object', name: kRpcLogPrefix);
return;
}
developer.log('WS message: $jsonData', name: kRpcLogPrefix);
handlers['message']?.call(socket, jsonData);
} catch (e) {