Websocket handler

This commit is contained in:
2025-07-18 16:12:34 +08:00
parent 086a12f971
commit 57f85ec341
12 changed files with 276 additions and 15 deletions

View File

@@ -32,7 +32,7 @@ public class ActionLogServiceGrpc : Shared.Proto.ActionLogService.ActionLogServi
try
{
var meta = request.Meta
?.Select(x => new KeyValuePair<string, object?>(x.Key, GrpcTypeHelper.ConvertField(x.Value)))
?.Select(x => new KeyValuePair<string, object?>(x.Key, GrpcTypeHelper.ConvertValueToObject(x.Value)))
.ToDictionary() ?? new Dictionary<string, object?>();
_actionLogService.CreateActionLog(

View File

@@ -40,8 +40,8 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext>
if (!ws.TryAdd(connectionKey, webSocket, cts))
{
await webSocket.CloseAsync(
WebSocketCloseStatus.InternalServerError,
"Failed to establish connection.",
WebSocketCloseStatus.PolicyViolation,
"Too many connections from the same device and account.",
CancellationToken.None
);
return;

View File

@@ -1,4 +1,5 @@
using System.Text.Json;
using DysonNetwork.Shared.Proto;
using NodaTime;
using NodaTime.Serialization.SystemTextJson;
@@ -7,7 +8,8 @@ namespace DysonNetwork.Pusher.Connection;
public class WebSocketPacket
{
public string Type { get; set; } = null!;
public object Data { get; set; } = null!;
public object? Data { get; set; } = null!;
public string? Endpoint { get; set; }
public string? ErrorMessage { get; set; }
/// <summary>
@@ -62,4 +64,24 @@ public class WebSocketPacket
var json = JsonSerializer.Serialize(this, jsonOpts);
return System.Text.Encoding.UTF8.GetBytes(json);
}
public Shared.Proto.WebSocketPacket ToProtoValue()
{
return new Shared.Proto.WebSocketPacket
{
Type = Type,
Data = GrpcTypeHelper.ConvertClassToValue(Data),
ErrorMessage = ErrorMessage
};
}
public static WebSocketPacket FromProtoValue(Shared.Proto.WebSocketPacket packet)
{
return new WebSocketPacket
{
Type = packet.Type,
Data = GrpcTypeHelper.ConvertValueToObject(packet.Data),
ErrorMessage = packet.ErrorMessage
};
}
}

View File

@@ -1,16 +1,29 @@
using System.Collections.Concurrent;
using System.Net.WebSockets;
using dotnet_etcd.interfaces;
using DysonNetwork.Shared.Data;
using DysonNetwork.Shared.Proto;
using Grpc.Core;
namespace DysonNetwork.Pusher.Connection;
public class WebSocketService
{
private readonly IConfiguration _configuration;
private readonly ILogger<WebSocketService> _logger;
private readonly IEtcdClient _etcdClient;
private readonly IDictionary<string, IWebSocketPacketHandler> _handlerMap;
public WebSocketService(IEnumerable<IWebSocketPacketHandler> handlers)
public WebSocketService(
IEnumerable<IWebSocketPacketHandler> handlers,
IEtcdClient etcdClient,
ILogger<WebSocketService> logger,
IConfiguration configuration
)
{
_etcdClient = etcdClient;
_logger = logger;
_configuration = configuration;
_handlerMap = handlers.ToDictionary(h => h.PacketType);
}
@@ -44,7 +57,7 @@ public class WebSocketService
data.Cts.Cancel();
ActiveConnections.TryRemove(key, out _);
}
public bool GetDeviceIsConnected(string deviceId)
{
return ActiveConnections.Any(c => c.Key.DeviceId == deviceId);
@@ -102,6 +115,56 @@ public class WebSocketService
return;
}
if (packet.Endpoint is not null)
{
try
{
// Get the service URL from etcd for the specified endpoint
var serviceKey = $"/services/{packet.Endpoint}";
var response = await _etcdClient.GetAsync(serviceKey);
if (response.Kvs.Count > 0)
{
var serviceUrl = response.Kvs[0].Value.ToStringUtf8();
var clientCertPath = _configuration["Service:ClientCert"]!;
var clientKeyPath = _configuration["Service:ClientKey"]!;
var clientCertPassword = _configuration["Service:CertPassword"];
var callInvoker =
GrpcClientHelper.CreateCallInvoker(
serviceUrl,
clientCertPath,
clientKeyPath,
clientCertPassword
);
var client = new PusherHandlerService.PusherHandlerServiceClient(callInvoker);
try
{
await client.ReceiveWebSocketPacketAsync(new ReceiveWebSocketPacketRequest
{
Account = currentUser,
DeviceId = deviceId,
Packet = packet.ToProtoValue()
});
}
catch (RpcException ex)
{
_logger.LogError(ex, $"Error forwarding packet to endpoint: {packet.Endpoint}");
}
return;
}
_logger.LogWarning($"No service registered for endpoint: {packet.Endpoint}");
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error forwarding packet to endpoint: {packet.Endpoint}");
}
}
await socket.SendAsync(
new ArraySegment<byte>(new WebSocketPacket
{

View File

@@ -42,9 +42,9 @@ public class CloudFileReferenceObject : ModelBase, ICloudFile
Id = proto.Id,
Name = proto.Name,
FileMeta = proto.FileMeta
.ToDictionary(kvp => kvp.Key, kvp => GrpcTypeHelper.ConvertField(kvp.Value)),
.ToDictionary(kvp => kvp.Key, kvp => GrpcTypeHelper.ConvertValueToObject(kvp.Value)),
UserMeta = proto.UserMeta
.ToDictionary(kvp => kvp.Key, kvp => GrpcTypeHelper.ConvertField(kvp.Value)),
.ToDictionary(kvp => kvp.Key, kvp => GrpcTypeHelper.ConvertValueToObject(kvp.Value)),
MimeType = proto.MimeType,
Hash = proto.Hash,
Size = proto.Size,

View File

@@ -8,7 +8,7 @@ namespace DysonNetwork.Shared.Proto;
public static class GrpcClientHelper
{
private static CallInvoker CreateCallInvoker(
public static CallInvoker CreateCallInvoker(
string url,
string clientCertPath,
string clientKeyPath,

View File

@@ -1,6 +1,8 @@
using System.Text.Json;
using Google.Protobuf.Collections;
using Google.Protobuf.WellKnownTypes;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;
namespace DysonNetwork.Shared.Proto;
@@ -8,8 +10,10 @@ public abstract class GrpcTypeHelper
{
private static readonly JsonSerializerSettings SerializerSettings = new()
{
PreserveReferencesHandling = PreserveReferencesHandling.All,
ReferenceLoopHandling = ReferenceLoopHandling.Ignore
ContractResolver = new DefaultContractResolver { NamingStrategy = new SnakeCaseNamingStrategy() },
PreserveReferencesHandling = PreserveReferencesHandling.Objects,
NullValueHandling = NullValueHandling.Include,
DateParseHandling = DateParseHandling.None
};
public static MapField<string, Value> ConvertToValueMap(Dictionary<string, object> source)
@@ -44,7 +48,7 @@ public abstract class GrpcTypeHelper
try
{
// Try to parse as JSON object or primitive
result[kvp.Key] = JsonConvert.DeserializeObject(value.StringValue);
result[kvp.Key] = JsonConvert.DeserializeObject(value.StringValue, SerializerSettings);
}
catch
{
@@ -62,10 +66,10 @@ public abstract class GrpcTypeHelper
result[kvp.Key] = null;
break;
case Value.KindOneofCase.StructValue:
result[kvp.Key] = JsonConvert.DeserializeObject(JsonConvert.SerializeObject(value.StructValue.Fields.ToDictionary(f => f.Key, f => ConvertField(f.Value)), SerializerSettings));
result[kvp.Key] = JsonConvert.DeserializeObject(JsonConvert.SerializeObject(value.StructValue.Fields.ToDictionary(f => f.Key, f => ConvertValueToObject(f.Value)), SerializerSettings));
break;
case Value.KindOneofCase.ListValue:
result[kvp.Key] = JsonConvert.DeserializeObject(JsonConvert.SerializeObject(value.ListValue.Values.Select(ConvertField).ToList(), SerializerSettings));
result[kvp.Key] = JsonConvert.DeserializeObject(JsonConvert.SerializeObject(value.ListValue.Values.Select(ConvertValueToObject).ToList(), SerializerSettings), SerializerSettings);
break;
default:
result[kvp.Key] = null;
@@ -75,7 +79,7 @@ public abstract class GrpcTypeHelper
return result;
}
public static object? ConvertField(Value value)
public static object? ConvertValueToObject(Value value)
{
return value.KindCase switch
{
@@ -87,6 +91,15 @@ public abstract class GrpcTypeHelper
};
}
public static T? ConvertValueToClass<T>(Value value)
{
return value.KindCase switch
{
Value.KindOneofCase.StringValue => JsonConvert.DeserializeObject<T>(value.StringValue, SerializerSettings),
_ => JsonConvert.DeserializeObject<T>(JsonConvert.SerializeObject(value, SerializerSettings))
};
}
public static Value ConvertObjectToValue(object? obj)
{
return obj switch
@@ -101,4 +114,15 @@ public abstract class GrpcTypeHelper
_ => Value.ForString(JsonConvert.SerializeObject(obj, SerializerSettings)) // fallback to JSON string
};
}
public static Value ConvertClassToValue<T>(T obj)
{
if (obj is JsonElement element)
return Value.ForString(element.GetRawText());
return obj switch
{
null => Value.ForNull(),
_ => Value.ForString(JsonConvert.SerializeObject(obj, SerializerSettings))
};
}
}

View File

@@ -8,6 +8,8 @@ import "google/protobuf/struct.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
import "account.proto";
// PusherService provides methods to send various types of notifications.
service PusherService {
// Sends an email.
@@ -129,3 +131,14 @@ message GetWebsocketConnectionStatusRequest {
message GetWebsocketConnectionStatusResponse {
bool is_connected = 1;
}
service PusherHandlerService {
rpc ReceiveWebSocketPacket(ReceiveWebSocketPacketRequest) returns (google.protobuf.Empty) {}
}
message ReceiveWebSocketPacketRequest {
WebSocketPacket packet = 1;
Account account = 2;
string device_id = 3;
}

View File

@@ -0,0 +1,129 @@
using DysonNetwork.Shared.Cache;
using DysonNetwork.Shared.Proto;
using DysonNetwork.Sphere.Chat;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
namespace DysonNetwork.Sphere.Connection;
public class WebSocketHandlerGrpc(PusherService.PusherServiceClient pusher, ChatRoomService crs)
: PusherHandlerService.PusherHandlerServiceBase
{
public override async Task<Empty> ReceiveWebSocketPacket(
ReceiveWebSocketPacketRequest request,
ServerCallContext context
)
{
switch (request.Packet.Type)
{
case "messages.read":
await HandleMessageRead(request, context);
break;
case "messages.typing":
await HandleMessageTyping(request, context);
break;
}
return new Empty();
}
private async Task HandleMessageRead(ReceiveWebSocketPacketRequest request, ServerCallContext context)
{
var currentUser = request.Account;
var packet = request.Packet;
if (packet.Data == null)
{
await SendErrorResponse(request,
"Mark message as read requires you to provide the ChatRoomId and MessageId");
return;
}
var requestData = GrpcTypeHelper.ConvertValueToClass<ChatController.MarkMessageReadRequest>(packet.Data);
if (requestData == null)
{
await SendErrorResponse(request, "Invalid request data");
return;
}
var sender = await crs.GetRoomMember(
Guid.Parse(currentUser.Id),
requestData.ChatRoomId
);
if (sender == null)
{
await SendErrorResponse(request, "User is not a member of the chat room.");
return;
}
var readReceipt = new MessageReadReceipt { SenderId = sender.Id };
var bufferService = context.GetHttpContext().RequestServices.GetRequiredService<FlushBufferService>();
bufferService.Enqueue(readReceipt);
}
private async Task HandleMessageTyping(ReceiveWebSocketPacketRequest request, ServerCallContext context)
{
var currentUser = request.Account;
var packet = request.Packet;
if (packet.Data == null)
{
await SendErrorResponse(request, "messages.typing requires you to provide the ChatRoomId");
return;
}
var requestData = GrpcTypeHelper.ConvertValueToClass<ChatController.ChatRoomWsUniversalRequest>(packet.Data);
if (requestData == null)
{
await SendErrorResponse(request, "Invalid request data");
return;
}
var sender = await crs.GetRoomMember(
Guid.Parse(currentUser.Id),
requestData.ChatRoomId
);
if (sender == null)
{
await SendErrorResponse(request, "User is not a member of the chat room.");
return;
}
var responsePacket = new WebSocketPacket
{
Type = "messages.typing",
Data = GrpcTypeHelper.ConvertObjectToValue(new
{
room_id = sender.ChatRoomId,
sender_id = sender.Id,
sender = sender
})
};
// Broadcast typing indicator to other room members
var otherMembers = (await crs.ListRoomMembers(requestData.ChatRoomId))
.Where(m => m.AccountId != Guid.Parse(currentUser.Id))
.Select(m => m.AccountId.ToString())
.ToList();
var respRequest = new PushWebSocketPacketToUsersRequest() { Packet = responsePacket };
respRequest.UserIds.AddRange(otherMembers);
await pusher.PushWebSocketPacketToUsersAsync(respRequest);
}
private async Task SendErrorResponse(ReceiveWebSocketPacketRequest request, string message)
{
await pusher.PushWebSocketPacketToDeviceAsync(new PushWebSocketPacketToDeviceRequest
{
DeviceId = request.DeviceId,
Packet = new WebSocketPacket
{
Type = "error",
ErrorMessage = message
}
});
}
}

View File

@@ -1,5 +1,6 @@
using System.Net;
using DysonNetwork.Shared.Auth;
using DysonNetwork.Sphere.Connection;
using Microsoft.AspNetCore.HttpOverrides;
using Prometheus;
@@ -38,6 +39,9 @@ public static class ApplicationConfiguration
app.MapStaticAssets().RequireRateLimiting("fixed");
app.MapRazorPages().RequireRateLimiting("fixed");
// Map gRPC services
app.MapGrpcService<WebSocketHandlerGrpc>();
return app;
}

View File

@@ -53,6 +53,11 @@ public static class ServiceCollectionExtensions
factory.Create(typeof(SharedResource));
});
services.AddRazorPages();
services.AddGrpc(options =>
{
options.EnableDetailedErrors = true;
});
services.Configure<RequestLocalizationOptions>(options =>
{

View File

@@ -78,6 +78,7 @@
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ANpgsqlEntityTypeBuilderExtensions_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003Fccb1faacaea4420db96b09857fc56178a1600_003Fd9_003F9acf9507_003FNpgsqlEntityTypeBuilderExtensions_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ANullable_00601_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F3bef61b8a21d4c8e96872ecdd7782fa0e55000_003F79_003F4ab1c673_003FNullable_00601_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003ANullable_00601_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003Fb6f0571a6bc744b0b551fd4578292582e54c00_003F6a_003Fea17bf26_003FNullable_00601_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AObject_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003Fe6898c1ddf974e16b95b114722270029e55000_003F8e_003F98039498_003FObject_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AOk_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2024_002E3_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F01d30b32e2ff422cb80129ca2a441c4242600_003F3b_003F237bf104_003FOk_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AOptionsConfigurationServiceCollectionExtensions_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2024_002E3_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F6622dea924b14dc7aa3ee69d7c84e5735000_003Fe0_003F024ba0b7_003FOptionsConfigurationServiceCollectionExtensions_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003APath_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2024_002E3_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003Fb6f0571a6bc744b0b551fd4578292582e54c00_003Fd3_003F7b05b2bd_003FPath_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>