From 57f85ec3419d76c07680b2f05b298e1b9f9d72f7 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Fri, 18 Jul 2025 16:12:34 +0800 Subject: [PATCH] :sparkles: Websocket handler --- .../Account/ActionLogServiceGrpc.cs | 2 +- .../Connection/WebSocketController.cs | 4 +- .../Connection/WebSocketPacket.cs | 24 +++- .../Connection/WebSocketService.cs | 67 ++++++++- .../Data/CloudFileReferenceObject.cs | 4 +- DysonNetwork.Shared/Proto/GrpcClientHelper.cs | 2 +- DysonNetwork.Shared/Proto/GrpcTypeHelper.cs | 36 ++++- DysonNetwork.Shared/Proto/pusher.proto | 13 ++ .../Connection/WebsocketHandler.cs | 129 ++++++++++++++++++ .../Startup/ApplicationConfiguration.cs | 4 + .../Startup/ServiceCollectionExtensions.cs | 5 + DysonNetwork.sln.DotSettings.user | 1 + 12 files changed, 276 insertions(+), 15 deletions(-) create mode 100644 DysonNetwork.Sphere/Connection/WebsocketHandler.cs diff --git a/DysonNetwork.Pass/Account/ActionLogServiceGrpc.cs b/DysonNetwork.Pass/Account/ActionLogServiceGrpc.cs index e81f2b6..186519b 100644 --- a/DysonNetwork.Pass/Account/ActionLogServiceGrpc.cs +++ b/DysonNetwork.Pass/Account/ActionLogServiceGrpc.cs @@ -32,7 +32,7 @@ public class ActionLogServiceGrpc : Shared.Proto.ActionLogService.ActionLogServi try { var meta = request.Meta - ?.Select(x => new KeyValuePair(x.Key, GrpcTypeHelper.ConvertField(x.Value))) + ?.Select(x => new KeyValuePair(x.Key, GrpcTypeHelper.ConvertValueToObject(x.Value))) .ToDictionary() ?? new Dictionary(); _actionLogService.CreateActionLog( diff --git a/DysonNetwork.Pusher/Connection/WebSocketController.cs b/DysonNetwork.Pusher/Connection/WebSocketController.cs index a7135d8..8acca46 100644 --- a/DysonNetwork.Pusher/Connection/WebSocketController.cs +++ b/DysonNetwork.Pusher/Connection/WebSocketController.cs @@ -40,8 +40,8 @@ public class WebSocketController(WebSocketService ws, ILogger if (!ws.TryAdd(connectionKey, webSocket, cts)) { await webSocket.CloseAsync( - WebSocketCloseStatus.InternalServerError, - "Failed to establish connection.", + WebSocketCloseStatus.PolicyViolation, + "Too many connections from the same device and account.", CancellationToken.None ); return; diff --git a/DysonNetwork.Pusher/Connection/WebSocketPacket.cs b/DysonNetwork.Pusher/Connection/WebSocketPacket.cs index cbb2f50..59a3733 100644 --- a/DysonNetwork.Pusher/Connection/WebSocketPacket.cs +++ b/DysonNetwork.Pusher/Connection/WebSocketPacket.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using DysonNetwork.Shared.Proto; using NodaTime; using NodaTime.Serialization.SystemTextJson; @@ -7,7 +8,8 @@ namespace DysonNetwork.Pusher.Connection; public class WebSocketPacket { public string Type { get; set; } = null!; - public object Data { get; set; } = null!; + public object? Data { get; set; } = null!; + public string? Endpoint { get; set; } public string? ErrorMessage { get; set; } /// @@ -62,4 +64,24 @@ public class WebSocketPacket var json = JsonSerializer.Serialize(this, jsonOpts); return System.Text.Encoding.UTF8.GetBytes(json); } + + public Shared.Proto.WebSocketPacket ToProtoValue() + { + return new Shared.Proto.WebSocketPacket + { + Type = Type, + Data = GrpcTypeHelper.ConvertClassToValue(Data), + ErrorMessage = ErrorMessage + }; + } + + public static WebSocketPacket FromProtoValue(Shared.Proto.WebSocketPacket packet) + { + return new WebSocketPacket + { + Type = packet.Type, + Data = GrpcTypeHelper.ConvertValueToObject(packet.Data), + ErrorMessage = packet.ErrorMessage + }; + } } \ No newline at end of file diff --git a/DysonNetwork.Pusher/Connection/WebSocketService.cs b/DysonNetwork.Pusher/Connection/WebSocketService.cs index 94eeaaf..992527b 100644 --- a/DysonNetwork.Pusher/Connection/WebSocketService.cs +++ b/DysonNetwork.Pusher/Connection/WebSocketService.cs @@ -1,16 +1,29 @@ using System.Collections.Concurrent; using System.Net.WebSockets; +using dotnet_etcd.interfaces; using DysonNetwork.Shared.Data; using DysonNetwork.Shared.Proto; +using Grpc.Core; namespace DysonNetwork.Pusher.Connection; public class WebSocketService { + private readonly IConfiguration _configuration; + private readonly ILogger _logger; + private readonly IEtcdClient _etcdClient; private readonly IDictionary _handlerMap; - public WebSocketService(IEnumerable handlers) + public WebSocketService( + IEnumerable handlers, + IEtcdClient etcdClient, + ILogger logger, + IConfiguration configuration + ) { + _etcdClient = etcdClient; + _logger = logger; + _configuration = configuration; _handlerMap = handlers.ToDictionary(h => h.PacketType); } @@ -44,7 +57,7 @@ public class WebSocketService data.Cts.Cancel(); ActiveConnections.TryRemove(key, out _); } - + public bool GetDeviceIsConnected(string deviceId) { return ActiveConnections.Any(c => c.Key.DeviceId == deviceId); @@ -102,6 +115,56 @@ public class WebSocketService return; } + if (packet.Endpoint is not null) + { + try + { + // Get the service URL from etcd for the specified endpoint + var serviceKey = $"/services/{packet.Endpoint}"; + var response = await _etcdClient.GetAsync(serviceKey); + + if (response.Kvs.Count > 0) + { + var serviceUrl = response.Kvs[0].Value.ToStringUtf8(); + + var clientCertPath = _configuration["Service:ClientCert"]!; + var clientKeyPath = _configuration["Service:ClientKey"]!; + var clientCertPassword = _configuration["Service:CertPassword"]; + + var callInvoker = + GrpcClientHelper.CreateCallInvoker( + serviceUrl, + clientCertPath, + clientKeyPath, + clientCertPassword + ); + var client = new PusherHandlerService.PusherHandlerServiceClient(callInvoker); + + try + { + await client.ReceiveWebSocketPacketAsync(new ReceiveWebSocketPacketRequest + { + Account = currentUser, + DeviceId = deviceId, + Packet = packet.ToProtoValue() + }); + } + catch (RpcException ex) + { + _logger.LogError(ex, $"Error forwarding packet to endpoint: {packet.Endpoint}"); + } + + return; + } + + _logger.LogWarning($"No service registered for endpoint: {packet.Endpoint}"); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error forwarding packet to endpoint: {packet.Endpoint}"); + } + } + await socket.SendAsync( new ArraySegment(new WebSocketPacket { diff --git a/DysonNetwork.Shared/Data/CloudFileReferenceObject.cs b/DysonNetwork.Shared/Data/CloudFileReferenceObject.cs index ec69d00..ee24982 100644 --- a/DysonNetwork.Shared/Data/CloudFileReferenceObject.cs +++ b/DysonNetwork.Shared/Data/CloudFileReferenceObject.cs @@ -42,9 +42,9 @@ public class CloudFileReferenceObject : ModelBase, ICloudFile Id = proto.Id, Name = proto.Name, FileMeta = proto.FileMeta - .ToDictionary(kvp => kvp.Key, kvp => GrpcTypeHelper.ConvertField(kvp.Value)), + .ToDictionary(kvp => kvp.Key, kvp => GrpcTypeHelper.ConvertValueToObject(kvp.Value)), UserMeta = proto.UserMeta - .ToDictionary(kvp => kvp.Key, kvp => GrpcTypeHelper.ConvertField(kvp.Value)), + .ToDictionary(kvp => kvp.Key, kvp => GrpcTypeHelper.ConvertValueToObject(kvp.Value)), MimeType = proto.MimeType, Hash = proto.Hash, Size = proto.Size, diff --git a/DysonNetwork.Shared/Proto/GrpcClientHelper.cs b/DysonNetwork.Shared/Proto/GrpcClientHelper.cs index e62970d..a865319 100644 --- a/DysonNetwork.Shared/Proto/GrpcClientHelper.cs +++ b/DysonNetwork.Shared/Proto/GrpcClientHelper.cs @@ -8,7 +8,7 @@ namespace DysonNetwork.Shared.Proto; public static class GrpcClientHelper { - private static CallInvoker CreateCallInvoker( + public static CallInvoker CreateCallInvoker( string url, string clientCertPath, string clientKeyPath, diff --git a/DysonNetwork.Shared/Proto/GrpcTypeHelper.cs b/DysonNetwork.Shared/Proto/GrpcTypeHelper.cs index 7ed044c..7eb90b8 100644 --- a/DysonNetwork.Shared/Proto/GrpcTypeHelper.cs +++ b/DysonNetwork.Shared/Proto/GrpcTypeHelper.cs @@ -1,6 +1,8 @@ +using System.Text.Json; using Google.Protobuf.Collections; using Google.Protobuf.WellKnownTypes; using Newtonsoft.Json; +using Newtonsoft.Json.Serialization; namespace DysonNetwork.Shared.Proto; @@ -8,8 +10,10 @@ public abstract class GrpcTypeHelper { private static readonly JsonSerializerSettings SerializerSettings = new() { - PreserveReferencesHandling = PreserveReferencesHandling.All, - ReferenceLoopHandling = ReferenceLoopHandling.Ignore + ContractResolver = new DefaultContractResolver { NamingStrategy = new SnakeCaseNamingStrategy() }, + PreserveReferencesHandling = PreserveReferencesHandling.Objects, + NullValueHandling = NullValueHandling.Include, + DateParseHandling = DateParseHandling.None }; public static MapField ConvertToValueMap(Dictionary source) @@ -44,7 +48,7 @@ public abstract class GrpcTypeHelper try { // Try to parse as JSON object or primitive - result[kvp.Key] = JsonConvert.DeserializeObject(value.StringValue); + result[kvp.Key] = JsonConvert.DeserializeObject(value.StringValue, SerializerSettings); } catch { @@ -62,10 +66,10 @@ public abstract class GrpcTypeHelper result[kvp.Key] = null; break; case Value.KindOneofCase.StructValue: - result[kvp.Key] = JsonConvert.DeserializeObject(JsonConvert.SerializeObject(value.StructValue.Fields.ToDictionary(f => f.Key, f => ConvertField(f.Value)), SerializerSettings)); + result[kvp.Key] = JsonConvert.DeserializeObject(JsonConvert.SerializeObject(value.StructValue.Fields.ToDictionary(f => f.Key, f => ConvertValueToObject(f.Value)), SerializerSettings)); break; case Value.KindOneofCase.ListValue: - result[kvp.Key] = JsonConvert.DeserializeObject(JsonConvert.SerializeObject(value.ListValue.Values.Select(ConvertField).ToList(), SerializerSettings)); + result[kvp.Key] = JsonConvert.DeserializeObject(JsonConvert.SerializeObject(value.ListValue.Values.Select(ConvertValueToObject).ToList(), SerializerSettings), SerializerSettings); break; default: result[kvp.Key] = null; @@ -75,7 +79,7 @@ public abstract class GrpcTypeHelper return result; } - public static object? ConvertField(Value value) + public static object? ConvertValueToObject(Value value) { return value.KindCase switch { @@ -87,6 +91,15 @@ public abstract class GrpcTypeHelper }; } + public static T? ConvertValueToClass(Value value) + { + return value.KindCase switch + { + Value.KindOneofCase.StringValue => JsonConvert.DeserializeObject(value.StringValue, SerializerSettings), + _ => JsonConvert.DeserializeObject(JsonConvert.SerializeObject(value, SerializerSettings)) + }; + } + public static Value ConvertObjectToValue(object? obj) { return obj switch @@ -101,4 +114,15 @@ public abstract class GrpcTypeHelper _ => Value.ForString(JsonConvert.SerializeObject(obj, SerializerSettings)) // fallback to JSON string }; } + + public static Value ConvertClassToValue(T obj) + { + if (obj is JsonElement element) + return Value.ForString(element.GetRawText()); + return obj switch + { + null => Value.ForNull(), + _ => Value.ForString(JsonConvert.SerializeObject(obj, SerializerSettings)) + }; + } } \ No newline at end of file diff --git a/DysonNetwork.Shared/Proto/pusher.proto b/DysonNetwork.Shared/Proto/pusher.proto index 65d71a3..bed1e02 100644 --- a/DysonNetwork.Shared/Proto/pusher.proto +++ b/DysonNetwork.Shared/Proto/pusher.proto @@ -8,6 +8,8 @@ import "google/protobuf/struct.proto"; import "google/protobuf/empty.proto"; import "google/protobuf/wrappers.proto"; +import "account.proto"; + // PusherService provides methods to send various types of notifications. service PusherService { // Sends an email. @@ -129,3 +131,14 @@ message GetWebsocketConnectionStatusRequest { message GetWebsocketConnectionStatusResponse { bool is_connected = 1; } + + +service PusherHandlerService { + rpc ReceiveWebSocketPacket(ReceiveWebSocketPacketRequest) returns (google.protobuf.Empty) {} +} + +message ReceiveWebSocketPacketRequest { + WebSocketPacket packet = 1; + Account account = 2; + string device_id = 3; +} \ No newline at end of file diff --git a/DysonNetwork.Sphere/Connection/WebsocketHandler.cs b/DysonNetwork.Sphere/Connection/WebsocketHandler.cs new file mode 100644 index 0000000..5a2c84b --- /dev/null +++ b/DysonNetwork.Sphere/Connection/WebsocketHandler.cs @@ -0,0 +1,129 @@ +using DysonNetwork.Shared.Cache; +using DysonNetwork.Shared.Proto; +using DysonNetwork.Sphere.Chat; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; + +namespace DysonNetwork.Sphere.Connection; + +public class WebSocketHandlerGrpc(PusherService.PusherServiceClient pusher, ChatRoomService crs) + : PusherHandlerService.PusherHandlerServiceBase +{ + public override async Task ReceiveWebSocketPacket( + ReceiveWebSocketPacketRequest request, + ServerCallContext context + ) + { + switch (request.Packet.Type) + { + case "messages.read": + await HandleMessageRead(request, context); + break; + case "messages.typing": + await HandleMessageTyping(request, context); + break; + } + + return new Empty(); + } + + private async Task HandleMessageRead(ReceiveWebSocketPacketRequest request, ServerCallContext context) + { + var currentUser = request.Account; + var packet = request.Packet; + + if (packet.Data == null) + { + await SendErrorResponse(request, + "Mark message as read requires you to provide the ChatRoomId and MessageId"); + return; + } + + var requestData = GrpcTypeHelper.ConvertValueToClass(packet.Data); + if (requestData == null) + { + await SendErrorResponse(request, "Invalid request data"); + return; + } + + var sender = await crs.GetRoomMember( + Guid.Parse(currentUser.Id), + requestData.ChatRoomId + ); + + if (sender == null) + { + await SendErrorResponse(request, "User is not a member of the chat room."); + return; + } + + var readReceipt = new MessageReadReceipt { SenderId = sender.Id }; + + var bufferService = context.GetHttpContext().RequestServices.GetRequiredService(); + bufferService.Enqueue(readReceipt); + } + + private async Task HandleMessageTyping(ReceiveWebSocketPacketRequest request, ServerCallContext context) + { + var currentUser = request.Account; + var packet = request.Packet; + + if (packet.Data == null) + { + await SendErrorResponse(request, "messages.typing requires you to provide the ChatRoomId"); + return; + } + + var requestData = GrpcTypeHelper.ConvertValueToClass(packet.Data); + if (requestData == null) + { + await SendErrorResponse(request, "Invalid request data"); + return; + } + + var sender = await crs.GetRoomMember( + Guid.Parse(currentUser.Id), + requestData.ChatRoomId + ); + if (sender == null) + { + await SendErrorResponse(request, "User is not a member of the chat room."); + return; + } + + var responsePacket = new WebSocketPacket + { + Type = "messages.typing", + Data = GrpcTypeHelper.ConvertObjectToValue(new + { + room_id = sender.ChatRoomId, + sender_id = sender.Id, + sender = sender + }) + }; + + // Broadcast typing indicator to other room members + var otherMembers = (await crs.ListRoomMembers(requestData.ChatRoomId)) + .Where(m => m.AccountId != Guid.Parse(currentUser.Id)) + .Select(m => m.AccountId.ToString()) + .ToList(); + + var respRequest = new PushWebSocketPacketToUsersRequest() { Packet = responsePacket }; + respRequest.UserIds.AddRange(otherMembers); + + await pusher.PushWebSocketPacketToUsersAsync(respRequest); + } + + private async Task SendErrorResponse(ReceiveWebSocketPacketRequest request, string message) + { + await pusher.PushWebSocketPacketToDeviceAsync(new PushWebSocketPacketToDeviceRequest + { + DeviceId = request.DeviceId, + Packet = new WebSocketPacket + { + Type = "error", + ErrorMessage = message + } + }); + } +} \ No newline at end of file diff --git a/DysonNetwork.Sphere/Startup/ApplicationConfiguration.cs b/DysonNetwork.Sphere/Startup/ApplicationConfiguration.cs index b7ca2c8..80c276a 100644 --- a/DysonNetwork.Sphere/Startup/ApplicationConfiguration.cs +++ b/DysonNetwork.Sphere/Startup/ApplicationConfiguration.cs @@ -1,5 +1,6 @@ using System.Net; using DysonNetwork.Shared.Auth; +using DysonNetwork.Sphere.Connection; using Microsoft.AspNetCore.HttpOverrides; using Prometheus; @@ -38,6 +39,9 @@ public static class ApplicationConfiguration app.MapStaticAssets().RequireRateLimiting("fixed"); app.MapRazorPages().RequireRateLimiting("fixed"); + // Map gRPC services + app.MapGrpcService(); + return app; } diff --git a/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs b/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs index 4d1a858..98b1966 100644 --- a/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs +++ b/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs @@ -53,6 +53,11 @@ public static class ServiceCollectionExtensions factory.Create(typeof(SharedResource)); }); services.AddRazorPages(); + + services.AddGrpc(options => + { + options.EnableDetailedErrors = true; + }); services.Configure(options => { diff --git a/DysonNetwork.sln.DotSettings.user b/DysonNetwork.sln.DotSettings.user index 5967b91..6f4a05e 100644 --- a/DysonNetwork.sln.DotSettings.user +++ b/DysonNetwork.sln.DotSettings.user @@ -78,6 +78,7 @@ ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded