From 3bd79350d164736894f22d9d68f821a387ad9598 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Mon, 8 Sep 2025 20:57:27 +0800 Subject: [PATCH] :bricks: Activity RPC server --- lib/pods/activity_rpc.dart | 572 +++++++++++++++++++++++++ lib/widgets/app_wrapper.dart | 4 + macos/Runner/DebugProfile.entitlements | 14 - pubspec.lock | 8 +- pubspec.yaml | 2 + 5 files changed, 582 insertions(+), 18 deletions(-) create mode 100644 lib/pods/activity_rpc.dart diff --git a/lib/pods/activity_rpc.dart b/lib/pods/activity_rpc.dart new file mode 100644 index 00000000..9cd76cb8 --- /dev/null +++ b/lib/pods/activity_rpc.dart @@ -0,0 +1,572 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:developer' as developer; +import 'dart:io'; +import 'dart:typed_data'; +import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:shelf/shelf.dart'; +import 'package:shelf/shelf_io.dart' as shelf_io; +import 'package:shelf_web_socket/shelf_web_socket.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; +import 'package:path/path.dart' as path; + +const String kRpcLogPrefix = 'arRPC.websocket'; +const String kRpcIpcLogPrefix = 'arRPC.ipc'; + +// IPC Constants +const String kIpcBasePath = 'discord-ipc'; + +// IPC Packet Types +class IpcTypes { + static const int handshake = 0; + static const int frame = 1; + static const int close = 2; + static const int ping = 3; + static const int pong = 4; +} + +// IPC Close Codes +class IpcCloseCodes { + static const int closeNormal = 1000; + static const int closeUnsupported = 1003; + static const int closeAbnormal = 1006; +} + +// IPC Error Codes +class IpcErrorCodes { + static const int invalidClientId = 4000; + static const int invalidOrigin = 4001; + static const int rateLimited = 4002; + static const int tokenRevoked = 4003; + static const int invalidVersion = 4004; + static const int invalidEncoding = 4005; +} + +class ActivityRpcServer { + static const List portRange = [6463, 6472]; // Ports 6463–6472 + Map + handlers; // {connection: (socket), message: (socket, data), close: (socket)} + HttpServer? _httpServer; + ServerSocket? _ipcServer; + final List _wsSockets = []; + final List<_IpcSocketWrapper> _ipcSockets = []; + + ActivityRpcServer(this.handlers); + + void updateHandlers(Map newHandlers) { + handlers = newHandlers; + } + + // Encode IPC packet + static Uint8List encodeIpcPacket(int type, Map data) { + final jsonData = jsonEncode(data); + final dataBytes = utf8.encode(jsonData); + final dataSize = dataBytes.length; + + final buffer = ByteData(8 + dataSize); + buffer.setInt32(0, type, Endian.little); + buffer.setInt32(4, dataSize, Endian.little); + buffer.buffer.asUint8List().setRange(8, 8 + dataSize, dataBytes); + + return buffer.buffer.asUint8List(); + } + + Future _getMacOsSystemTmpDir() async { + final result = await Process.run('getconf', ['DARWIN_USER_TEMP_DIR']); + return (result.stdout as String).trim(); + } + + // Find available IPC socket path + Future _findAvailableIpcPath() async { + final baseDirs = [ + if (Platform.isMacOS) await _getMacOsSystemTmpDir(), + Platform.environment['XDG_RUNTIME_DIR'], // User runtime directory + Platform.environment['TMPDIR'], // App container temp (fallback) + Platform.environment['TMP'], + Platform.environment['TEMP'], + '/temp', + '/tmp', // System temp directory - most compatible + ]; + + for (final baseDir in baseDirs) { + if (baseDir == null || baseDir.isEmpty) continue; + for (int i = 0; i < 10; i++) { + final socketPath = path.join(baseDir, '$kIpcBasePath-$i'); + try { + final socket = await ServerSocket.bind( + InternetAddress(socketPath, type: InternetAddressType.unix), + 0, + ); + socket.close(); + // Clean up the test socket + try { + await File(socketPath).delete(); + } catch (_) {} + developer.log( + 'IPC socket will be created at: $socketPath', + name: kRpcIpcLogPrefix, + ); + return socketPath; + } catch (e) { + // Path not available, try next + if (i == 0) { + // Log only for the first attempt per directory + developer.log( + 'IPC path $socketPath not available: $e', + name: kRpcIpcLogPrefix, + ); + } + continue; + } + } + } + throw Exception( + 'No available IPC socket paths found in any temp directory', + ); + } + + // Start the WebSocket server + Future start() async { + int port = portRange[0]; + bool wsSuccess = false; + + // Start WebSocket server + while (port <= portRange[1]) { + developer.log('trying port $port', name: kRpcLogPrefix); + try { + // Start HTTP server + _httpServer = await HttpServer.bind(InternetAddress.loopbackIPv4, port); + developer.log('listening on $port', name: kRpcLogPrefix); + + // Handle WebSocket upgrades + shelf_io.serveRequests(_httpServer!, (Request request) async { + developer.log('new request', name: kRpcLogPrefix); + if (request.headers['upgrade']?.toLowerCase() == 'websocket') { + final handler = webSocketHandler((WebSocketChannel channel) { + _wsSockets.add(channel); + _onWsConnection(channel, request); + }); + return handler(request); + } + developer.log( + 'new request disposed due to not websocket', + name: kRpcLogPrefix, + ); + return Response.notFound('Not a WebSocket request'); + }); + wsSuccess = true; + break; + } catch (e) { + if (e is SocketException && e.osError?.errorCode == 98) { + // EADDRINUSE + developer.log('$port in use!', name: kRpcLogPrefix); + } else { + developer.log('http error: $e', name: kRpcLogPrefix); + } + port++; + } + } + + if (!wsSuccess) { + throw Exception( + 'Failed to bind to any port in range ${portRange[0]}–${portRange[1]}', + ); + } + + // Start IPC server + try { + final ipcPath = await _findAvailableIpcPath(); + _ipcServer = await ServerSocket.bind( + InternetAddress(ipcPath, type: InternetAddressType.unix), + 0, + ); + developer.log('IPC listening at $ipcPath', name: kRpcIpcLogPrefix); + + _ipcServer!.listen((Socket socket) { + _onIpcConnection(socket); + }); + } catch (e) { + developer.log('IPC server error: $e', name: kRpcIpcLogPrefix); + // Continue without IPC if it fails + } + } + + // Stop the server + Future stop() async { + // Stop WebSocket server + for (var socket in _wsSockets) { + await socket.sink.close(); + } + _wsSockets.clear(); + await _httpServer?.close(); + + // Stop IPC server + for (var socket in _ipcSockets) { + socket.close(); + } + _ipcSockets.clear(); + await _ipcServer?.close(); + + developer.log('servers stopped', name: kRpcLogPrefix); + } + + // Handle new WebSocket connection + void _onWsConnection(WebSocketChannel socket, Request request) { + // Parse query parameters + final uri = request.url; + final params = uri.queryParameters; + final ver = int.tryParse(params['v'] ?? '1') ?? 1; + final encoding = params['encoding'] ?? 'json'; + final clientId = params['client_id'] ?? ''; + final origin = request.headers['origin'] ?? ''; + + developer.log( + 'new WS connection! origin: $origin, params: $params', + name: kRpcLogPrefix, + ); + + // Validate origin + if (origin.isNotEmpty && + ![ + 'https://discord.com', + 'https://ptb.discord.com', + 'https://canary.discord.com', + ].contains(origin)) { + developer.log('disallowed origin: $origin', name: kRpcLogPrefix); + socket.sink.close(); + return; + } + + // Validate encoding + if (encoding != 'json') { + developer.log( + 'unsupported encoding requested: $encoding', + name: kRpcLogPrefix, + ); + socket.sink.close(); + return; + } + + // Validate version + if (ver != 1) { + developer.log('unsupported version requested: $ver', name: kRpcLogPrefix); + socket.sink.close(); + return; + } + + // Store client info on socket + final socketWithMeta = _WsSocketWrapper(socket, clientId, encoding); + + // Set up event listeners + socket.stream.listen( + (data) => _onWsMessage(socketWithMeta, data), + onError: (e) { + developer.log('WS socket error: $e', name: kRpcLogPrefix); + }, + onDone: () { + developer.log('WS socket closed', name: kRpcLogPrefix); + handlers['close']?.call(socketWithMeta); + _wsSockets.remove(socket); + }, + ); + + // Notify handler of new connection + handlers['connection']?.call(socketWithMeta); + } + + // Handle new IPC connection + void _onIpcConnection(Socket socket) { + developer.log('new IPC connection!', name: kRpcIpcLogPrefix); + + final socketWrapper = _IpcSocketWrapper(socket); + _ipcSockets.add(socketWrapper); + + // Set up event listeners + socket.listen( + (data) => _onIpcData(socketWrapper, data), + onError: (e) { + developer.log('IPC socket error: $e', name: kRpcIpcLogPrefix); + socket.close(); + }, + onDone: () { + developer.log('IPC socket closed', name: kRpcIpcLogPrefix); + handlers['close']?.call(socketWrapper); + _ipcSockets.remove(socketWrapper); + }, + ); + } + + // Handle incoming WebSocket message + void _onWsMessage(_WsSocketWrapper socket, dynamic data) { + try { + final jsonData = jsonDecode(data as String); + developer.log('WS message: $jsonData', name: kRpcLogPrefix); + handlers['message']?.call(socket, jsonData); + } catch (e) { + developer.log('WS message parse error: $e', name: kRpcLogPrefix); + } + } + + // Handle incoming IPC data + void _onIpcData(_IpcSocketWrapper socket, List data) { + try { + socket.addData(data); + final packets = socket.readPackets(); + for (final packet in packets) { + _handleIpcPacket(socket, packet); + } + } catch (e) { + developer.log('IPC data error: $e', name: kRpcIpcLogPrefix); + socket.closeWithCode(IpcCloseCodes.closeUnsupported, e.toString()); + } + } + + // Handle IPC packet + void _handleIpcPacket(_IpcSocketWrapper socket, _IpcPacket packet) { + switch (packet.type) { + case IpcTypes.ping: + developer.log('IPC ping received', name: kRpcIpcLogPrefix); + socket.sendPong(packet.data); + break; + + case IpcTypes.pong: + developer.log('IPC pong received', name: kRpcIpcLogPrefix); + // Handle pong if needed + break; + + case IpcTypes.handshake: + if (socket.handshook) { + throw Exception('Already handshook'); + } + socket.handshook = true; + _onIpcHandshake(socket, packet.data); + break; + + case IpcTypes.frame: + if (!socket.handshook) { + throw Exception('Need to handshake first'); + } + developer.log('IPC frame: ${packet.data}', name: kRpcIpcLogPrefix); + handlers['message']?.call(socket, packet.data); + break; + + case IpcTypes.close: + socket.close(); + break; + + default: + throw Exception('Invalid packet type: ${packet.type}'); + } + } + + // Handle IPC handshake + void _onIpcHandshake(_IpcSocketWrapper socket, Map params) { + developer.log('IPC handshake: $params', name: kRpcIpcLogPrefix); + + final ver = int.tryParse(params['v']?.toString() ?? '1') ?? 1; + final clientId = params['client_id']?.toString() ?? ''; + + // Validate version + if (ver != 1) { + developer.log( + 'IPC unsupported version requested: $ver', + name: kRpcIpcLogPrefix, + ); + socket.closeWithCode(IpcErrorCodes.invalidVersion); + return; + } + + // Validate client ID + if (clientId.isEmpty) { + developer.log('IPC client ID required', name: kRpcIpcLogPrefix); + socket.closeWithCode(IpcErrorCodes.invalidClientId); + return; + } + + socket.clientId = clientId; + + // Notify handler of new connection + handlers['connection']?.call(socket); + } +} + +// WebSocket wrapper +class _WsSocketWrapper { + final WebSocketChannel channel; + final String clientId; + final String encoding; + + _WsSocketWrapper(this.channel, this.clientId, this.encoding); + + void send(Map msg) { + developer.log('WS sending: $msg', name: kRpcLogPrefix); + channel.sink.add(jsonEncode(msg)); + } +} + +// IPC wrapper +class _IpcSocketWrapper { + final Socket socket; + String clientId = ''; + bool handshook = false; + final List _buffer = []; + + _IpcSocketWrapper(this.socket); + + void addData(List data) { + _buffer.addAll(data); + } + + void send(Map msg) { + developer.log('IPC sending: $msg', name: kRpcIpcLogPrefix); + final packet = ActivityRpcServer.encodeIpcPacket(IpcTypes.frame, msg); + socket.add(packet); + } + + void sendPong(dynamic data) { + final packet = ActivityRpcServer.encodeIpcPacket(IpcTypes.pong, data ?? {}); + socket.add(packet); + } + + void close() { + socket.close(); + } + + void closeWithCode(int code, [String message = '']) { + final closeData = {'code': code, 'message': message}; + final packet = ActivityRpcServer.encodeIpcPacket(IpcTypes.close, closeData); + socket.add(packet); + socket.close(); + } + + List<_IpcPacket> readPackets() { + final packets = <_IpcPacket>[]; + + while (_buffer.length >= 8) { + final buffer = Uint8List.fromList(_buffer); + final byteData = ByteData.view(buffer.buffer); + + final type = byteData.getInt32(0, Endian.little); + final dataSize = byteData.getInt32(4, Endian.little); + + if (_buffer.length < 8 + dataSize) break; + + final dataBytes = _buffer.sublist(8, 8 + dataSize); + final jsonStr = utf8.decode(dataBytes); + final jsonData = jsonDecode(jsonStr); + + packets.add(_IpcPacket(type, jsonData)); + + _buffer.removeRange(0, 8 + dataSize); + } + + return packets; + } +} + +// IPC Packet structure +class _IpcPacket { + final int type; + final Map data; + + _IpcPacket(this.type, this.data); +} + +// State management for server status and activities +class ServerState { + final String status; + final List activities; + + ServerState({required this.status, this.activities = const []}); + + ServerState copyWith({String? status, List? activities}) { + return ServerState( + status: status ?? this.status, + activities: activities ?? this.activities, + ); + } +} + +class ServerStateNotifier extends StateNotifier { + final ActivityRpcServer server; + + ServerStateNotifier(this.server) + : super(ServerState(status: 'Server not started')); + + Future start() async { + try { + await server.start(); + state = state.copyWith(status: 'Server running'); + } catch (e) { + state = state.copyWith(status: 'Server failed: $e'); + } + } + + void updateStatus(String status) { + state = state.copyWith(status: status); + } + + void addActivity(String activity) { + state = state.copyWith(activities: [...state.activities, activity]); + } +} + +// Providers +final rpcServerStateProvider = + StateNotifierProvider((ref) { + final server = ActivityRpcServer({}); + final notifier = ServerStateNotifier(server); + server.updateHandlers({ + 'connection': (socket) { + final clientId = + socket is _WsSocketWrapper + ? socket.clientId + : (socket as _IpcSocketWrapper).clientId; + notifier.updateStatus('Client connected (ID: $clientId)'); + // Send READY event + socket.send({ + 'cmd': 'DISPATCH', + 'data': { + 'v': 1, + 'config': { + 'cdn_host': 'fake.cdn', + 'api_endpoint': '//fake.api', + 'environment': 'dev', + }, + 'user': { + 'id': 'fake_user_id', + 'username': 'FakeUser', + 'discriminator': '0001', + 'avatar': null, + 'bot': false, + }, + }, + 'evt': 'READY', + 'nonce': '12345', // Should be dynamic + }); + }, + 'message': (socket, dynamic data) { + if (data['cmd'] == 'SET_ACTIVITY') { + notifier.addActivity( + 'Activity: ${data['args']['activity']['details'] ?? 'Unknown'}', + ); + // Echo back success + socket.send({ + 'cmd': 'SET_ACTIVITY', + 'data': data['args']['activity'], + 'evt': null, + 'nonce': data['nonce'], + }); + } + }, + 'close': (socket) { + notifier.updateStatus('Client disconnected'); + }, + }); + return notifier; + }); + +final rpcServerProvider = Provider((ref) { + final notifier = ref.watch(rpcServerStateProvider.notifier); + return notifier.server; +}); diff --git a/lib/widgets/app_wrapper.dart b/lib/widgets/app_wrapper.dart index 9bc0d390..4e983907 100644 --- a/lib/widgets/app_wrapper.dart +++ b/lib/widgets/app_wrapper.dart @@ -3,6 +3,7 @@ import 'package:bitsdojo_window/bitsdojo_window.dart'; import 'package:flutter/material.dart'; import 'package:flutter_hooks/flutter_hooks.dart'; import 'package:hooks_riverpod/hooks_riverpod.dart'; +import 'package:island/pods/activity_rpc.dart'; import 'package:island/pods/websocket.dart'; import 'package:island/screens/tray_manager.dart'; import 'package:island/services/notify.dart'; @@ -31,7 +32,10 @@ class AppWrapper extends HookConsumerWidget with TrayListener { TrayService.instance.initialize(this); + ref.read(rpcServerStateProvider.notifier).start(); + return () { + ref.read(rpcServerProvider).stop(); TrayService.instance.dispose(this); sharingService.dispose(); ntySubs?.cancel(); diff --git a/macos/Runner/DebugProfile.entitlements b/macos/Runner/DebugProfile.entitlements index 07f0ebed..ad00adc7 100644 --- a/macos/Runner/DebugProfile.entitlements +++ b/macos/Runner/DebugProfile.entitlements @@ -8,21 +8,7 @@ com.apple.developer.device-information.user-assigned-device-name - com.apple.security.app-sandbox - com.apple.security.cs.allow-jit - com.apple.security.device.audio-input - - com.apple.security.device.camera - - com.apple.security.files.downloads.read-write - - com.apple.security.files.user-selected.read-only - - com.apple.security.network.client - - com.apple.security.network.server - diff --git a/pubspec.lock b/pubspec.lock index d6c0ae40..04dd706c 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -2206,7 +2206,7 @@ packages: source: hosted version: "2.4.1" shelf: - dependency: transitive + dependency: "direct main" description: name: shelf sha256: e7dd780a7ffb623c57850b33f43309312fc863fb6aa3d276a754bb299839ef12 @@ -2214,13 +2214,13 @@ packages: source: hosted version: "1.4.2" shelf_web_socket: - dependency: transitive + dependency: "direct main" description: name: shelf_web_socket - sha256: "3632775c8e90d6c9712f883e633716432a27758216dfb61bd86a8321c0580925" + sha256: cc36c297b52866d203dbf9332263c94becc2fe0ceaa9681d07b6ef9807023b67 url: "https://pub.dev" source: hosted - version: "3.0.0" + version: "2.0.1" shortid: dependency: transitive description: diff --git a/pubspec.yaml b/pubspec.yaml index 59702158..9dbaef30 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -145,6 +145,8 @@ dependencies: flutter_local_notifications: ^19.4.1 wakelock_plus: ^1.3.2 slide_countdown: ^2.0.2 + shelf: ^1.4.2 + shelf_web_socket: ^2.0.0 dev_dependencies: flutter_test: