♻️ Refactor the way to handle websocket

This commit is contained in:
2025-09-21 23:07:20 +08:00
parent e3657386cd
commit 204640a759
13 changed files with 196 additions and 219 deletions

View File

@@ -17,6 +17,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
</PackageReference> </PackageReference>
<PackageReference Include="MimeKit" Version="4.13.0" />
<PackageReference Include="MimeTypes" Version="2.5.2"> <PackageReference Include="MimeTypes" Version="2.5.2">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>

View File

@@ -1,5 +1,6 @@
using System.Net.WebSockets; using System.Net.WebSockets;
using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Proto;
using WebSocketPacket = DysonNetwork.Shared.Data.WebSocketPacket;
namespace DysonNetwork.Ring.Connection; namespace DysonNetwork.Ring.Connection;

View File

@@ -3,11 +3,15 @@ using DysonNetwork.Shared.Proto;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Swashbuckle.AspNetCore.Annotations; using Swashbuckle.AspNetCore.Annotations;
using WebSocketPacket = DysonNetwork.Shared.Data.WebSocketPacket;
namespace DysonNetwork.Ring.Connection; namespace DysonNetwork.Ring.Connection;
[ApiController] [ApiController]
public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext> logger) : ControllerBase public class WebSocketController(
WebSocketService ws,
ILogger<WebSocketContext> logger
) : ControllerBase
{ {
[Route("/ws")] [Route("/ws")]
[Authorize] [Authorize]
@@ -23,7 +27,7 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext>
return; return;
} }
var accountId = currentUser.Id!; var accountId = Guid.Parse(currentUser.Id!);
var deviceId = currentSession.Challenge?.DeviceId ?? Guid.NewGuid().ToString(); var deviceId = currentSession.Challenge?.DeviceId ?? Guid.NewGuid().ToString();
if (string.IsNullOrEmpty(deviceId)) if (string.IsNullOrEmpty(deviceId))
@@ -89,7 +93,7 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext>
CancellationToken cancellationToken CancellationToken cancellationToken
) )
{ {
var connectionKey = (AccountId: currentUser.Id, DeviceId: deviceId); var connectionKey = (AccountId: Guid.Parse(currentUser.Id), DeviceId: deviceId);
var buffer = new byte[1024 * 4]; var buffer = new byte[1024 * 4];
try try

View File

@@ -2,36 +2,38 @@ using System.Collections.Concurrent;
using System.Net.WebSockets; using System.Net.WebSockets;
using DysonNetwork.Shared.Data; using DysonNetwork.Shared.Data;
using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Proto;
using Grpc.Core; using DysonNetwork.Shared.Stream;
using NATS.Client.Core;
using WebSocketPacket = DysonNetwork.Shared.Data.WebSocketPacket;
namespace DysonNetwork.Ring.Connection; namespace DysonNetwork.Ring.Connection;
public class WebSocketService public class WebSocketService
{ {
private readonly IConfiguration _configuration; private readonly INatsConnection _nats;
private readonly ILogger<WebSocketService> _logger; private readonly ILogger<WebSocketService> _logger;
private readonly IDictionary<string, IWebSocketPacketHandler> _handlerMap; private readonly IDictionary<string, IWebSocketPacketHandler> _handlerMap;
public WebSocketService( public WebSocketService(
IEnumerable<IWebSocketPacketHandler> handlers, IEnumerable<IWebSocketPacketHandler> handlers,
ILogger<WebSocketService> logger, ILogger<WebSocketService> logger,
IConfiguration configuration INatsConnection nats
) )
{ {
_logger = logger; _logger = logger;
_configuration = configuration;
_handlerMap = handlers.ToDictionary(h => h.PacketType); _handlerMap = handlers.ToDictionary(h => h.PacketType);
_nats = nats;
} }
private static readonly ConcurrentDictionary< private static readonly ConcurrentDictionary<
(string AccountId, string DeviceId), (Guid AccountId, string DeviceId),
(WebSocket Socket, CancellationTokenSource Cts) (WebSocket Socket, CancellationTokenSource Cts)
> ActiveConnections = new(); > ActiveConnections = new();
private static readonly ConcurrentDictionary<string, string> ActiveSubscriptions = new(); // deviceId -> chatRoomId private static readonly ConcurrentDictionary<string, string> ActiveSubscriptions = new(); // deviceId -> chatRoomId
public bool TryAdd( public bool TryAdd(
(string AccountId, string DeviceId) key, (Guid AccountId, string DeviceId) key,
WebSocket socket, WebSocket socket,
CancellationTokenSource cts CancellationTokenSource cts
) )
@@ -42,7 +44,7 @@ public class WebSocketService
return ActiveConnections.TryAdd(key, (socket, cts)); return ActiveConnections.TryAdd(key, (socket, cts));
} }
public void Disconnect((string AccountId, string DeviceId) key, string? reason = null) public void Disconnect((Guid AccountId, string DeviceId) key, string? reason = null)
{ {
if (!ActiveConnections.TryGetValue(key, out var data)) return; if (!ActiveConnections.TryGetValue(key, out var data)) return;
try try
@@ -63,19 +65,19 @@ public class WebSocketService
ActiveConnections.TryRemove(key, out _); ActiveConnections.TryRemove(key, out _);
} }
public bool GetDeviceIsConnected(string deviceId) public static bool GetDeviceIsConnected(string deviceId)
{ {
return ActiveConnections.Any(c => c.Key.DeviceId == deviceId); return ActiveConnections.Any(c => c.Key.DeviceId == deviceId);
} }
public bool GetAccountIsConnected(string accountId) public static bool GetAccountIsConnected(Guid accountId)
{ {
return ActiveConnections.Any(c => c.Key.AccountId == accountId); return ActiveConnections.Any(c => c.Key.AccountId == accountId);
} }
public void SendPacketToAccount(string userId, WebSocketPacket packet) public static void SendPacketToAccount(Guid accountId, WebSocketPacket packet)
{ {
var connections = ActiveConnections.Where(c => c.Key.AccountId == userId); var connections = ActiveConnections.Where(c => c.Key.AccountId == accountId);
var packetBytes = packet.ToBytes(); var packetBytes = packet.ToBytes();
var segment = new ArraySegment<byte>(packetBytes); var segment = new ArraySegment<byte>(packetBytes);
@@ -139,28 +141,16 @@ public class WebSocketService
try try
{ {
var endpoint = packet.Endpoint.Replace("DysonNetwork.", "").ToLower(); var endpoint = packet.Endpoint.Replace("DysonNetwork.", "").ToLower();
var serviceUrl = "https://_grpc." + endpoint; await _nats.PublishAsync(WebSocketPacketEvent.SubjectPrefix + endpoint, new WebSocketPacketEvent
var callInvoker = GrpcClientHelper.CreateCallInvoker(serviceUrl);
var client = new RingHandlerService.RingHandlerServiceClient(callInvoker);
try
{ {
await client.ReceiveWebSocketPacketAsync(new ReceiveWebSocketPacketRequest AccountId = Guid.Parse(currentUser.Id),
{
Account = currentUser,
DeviceId = deviceId, DeviceId = deviceId,
Packet = packet.ToProtoValue() PacketBytes = packet.ToBytes()
}); });
} }
catch (RpcException ex)
{
_logger.LogError(ex, $"Error forwarding packet to endpoint: {packet.Endpoint} (${endpoint})");
}
}
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, $"Error forwarding packet to endpoint: {packet.Endpoint}"); _logger.LogError(ex, "Error forwarding packet to endpoint: {Endpoint}", packet.Endpoint);
} }
} }

View File

@@ -5,14 +5,13 @@ using DysonNetwork.Ring.Services;
using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Proto;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using NodaTime; using NodaTime;
using WebSocketPacket = DysonNetwork.Ring.Connection.WebSocketPacket; using WebSocketPacket = DysonNetwork.Shared.Data.WebSocketPacket;
namespace DysonNetwork.Ring.Notification; namespace DysonNetwork.Ring.Notification;
public class PushService public class PushService
{ {
private readonly AppDatabase _db; private readonly AppDatabase _db;
private readonly WebSocketService _ws;
private readonly QueueService _queueService; private readonly QueueService _queueService;
private readonly ILogger<PushService> _logger; private readonly ILogger<PushService> _logger;
private readonly FirebaseSender? _fcm; private readonly FirebaseSender? _fcm;
@@ -22,7 +21,6 @@ public class PushService
public PushService( public PushService(
IConfiguration config, IConfiguration config,
AppDatabase db, AppDatabase db,
WebSocketService ws,
QueueService queueService, QueueService queueService,
IHttpClientFactory httpFactory, IHttpClientFactory httpFactory,
ILogger<PushService> logger ILogger<PushService> logger
@@ -53,7 +51,6 @@ public class PushService
} }
_db = db; _db = db;
_ws = ws;
_queueService = queueService; _queueService = queueService;
_logger = logger; _logger = logger;
} }
@@ -73,9 +70,9 @@ public class PushService
) )
{ {
var now = SystemClock.Instance.GetCurrentInstant(); var now = SystemClock.Instance.GetCurrentInstant();
var accountId = Guid.Parse(account.Id!); var accountId = Guid.Parse(account.Id);
// Check for existing subscription with same device ID or token // Check for existing subscription with the same device ID or token
var existingSubscription = await _db.PushSubscriptions var existingSubscription = await _db.PushSubscriptions
.Where(s => s.AccountId == accountId) .Where(s => s.AccountId == accountId)
.Where(s => s.DeviceId == deviceId || s.DeviceToken == deviceToken) .Where(s => s.DeviceId == deviceId || s.DeviceToken == deviceToken)
@@ -125,7 +122,7 @@ public class PushService
if (actionUri is not null) meta["action_uri"] = actionUri; if (actionUri is not null) meta["action_uri"] = actionUri;
var accountId = Guid.Parse(account.Id!); var accountId = account.Id;
var notification = new Notification var notification = new Notification
{ {
Topic = topic, Topic = topic,
@@ -133,7 +130,7 @@ public class PushService
Subtitle = subtitle, Subtitle = subtitle,
Content = content, Content = content,
Meta = meta, Meta = meta,
AccountId = accountId, AccountId = Guid.Parse(accountId),
}; };
if (save) if (save)
@@ -143,12 +140,12 @@ public class PushService
} }
if (!isSilent) if (!isSilent)
_ = _queueService.EnqueuePushNotification(notification, accountId, save); _ = _queueService.EnqueuePushNotification(notification, Guid.Parse(accountId), save);
} }
public async Task DeliverPushNotification(Notification notification, CancellationToken cancellationToken = default) public async Task DeliverPushNotification(Notification notification, CancellationToken cancellationToken = default)
{ {
_ws.SendPacketToAccount(notification.AccountId.ToString(), new WebSocketPacket() WebSocketService.SendPacketToAccount(notification.AccountId, new WebSocketPacket()
{ {
Type = "notifications.new", Type = "notifications.new",
Data = notification, Data = notification,
@@ -251,8 +248,8 @@ public class PushService
// WS first // WS first
foreach (var account in accounts) foreach (var account in accounts)
{ {
notification.AccountId = account; // keep original behavior notification.AccountId = account;
_ws.SendPacketToAccount(account.ToString(), new Connection.WebSocketPacket WebSocketService.SendPacketToAccount(account, new WebSocketPacket
{ {
Type = "notifications.new", Type = "notifications.new",
Data = notification Data = notification

View File

@@ -1,10 +1,8 @@
using DysonNetwork.Ring.Connection; using DysonNetwork.Ring.Connection;
using DysonNetwork.Ring.Email;
using DysonNetwork.Ring.Notification; using DysonNetwork.Ring.Notification;
using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Proto;
using Google.Protobuf.WellKnownTypes; using Google.Protobuf.WellKnownTypes;
using Grpc.Core; using Grpc.Core;
using System.Text.Json;
namespace DysonNetwork.Ring.Services; namespace DysonNetwork.Ring.Services;
@@ -27,30 +25,28 @@ public class RingServiceGrpc(
public override Task<Empty> PushWebSocketPacket(PushWebSocketPacketRequest request, ServerCallContext context) public override Task<Empty> PushWebSocketPacket(PushWebSocketPacketRequest request, ServerCallContext context)
{ {
var packet = new Connection.WebSocketPacket var packet = new Shared.Data.WebSocketPacket
{ {
Type = request.Packet.Type, Type = request.Packet.Type,
Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data), Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data),
ErrorMessage = request.Packet.ErrorMessage ErrorMessage = request.Packet.ErrorMessage
}; };
websocket.SendPacketToAccount(request.UserId, packet); WebSocketService.SendPacketToAccount(Guid.Parse(request.UserId), packet);
return Task.FromResult(new Empty()); return Task.FromResult(new Empty());
} }
public override Task<Empty> PushWebSocketPacketToUsers(PushWebSocketPacketToUsersRequest request, public override Task<Empty> PushWebSocketPacketToUsers(PushWebSocketPacketToUsersRequest request,
ServerCallContext context) ServerCallContext context)
{ {
var packet = new Connection.WebSocketPacket var packet = new Shared.Data.WebSocketPacket
{ {
Type = request.Packet.Type, Type = request.Packet.Type,
Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data), Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data),
ErrorMessage = request.Packet.ErrorMessage ErrorMessage = request.Packet.ErrorMessage
}; };
foreach (var userId in request.UserIds) foreach (var accountId in request.UserIds)
{ WebSocketService.SendPacketToAccount(Guid.Parse(accountId), packet);
websocket.SendPacketToAccount(userId, packet);
}
return Task.FromResult(new Empty()); return Task.FromResult(new Empty());
} }
@@ -58,7 +54,7 @@ public class RingServiceGrpc(
public override Task<Empty> PushWebSocketPacketToDevice(PushWebSocketPacketToDeviceRequest request, public override Task<Empty> PushWebSocketPacketToDevice(PushWebSocketPacketToDeviceRequest request,
ServerCallContext context) ServerCallContext context)
{ {
var packet = new Connection.WebSocketPacket var packet = new Shared.Data.WebSocketPacket
{ {
Type = request.Packet.Type, Type = request.Packet.Type,
Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data), Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data),
@@ -71,7 +67,7 @@ public override Task<Empty> PushWebSocketPacketToDevice(PushWebSocketPacketToDev
public override Task<Empty> PushWebSocketPacketToDevices(PushWebSocketPacketToDevicesRequest request, public override Task<Empty> PushWebSocketPacketToDevices(PushWebSocketPacketToDevicesRequest request,
ServerCallContext context) ServerCallContext context)
{ {
var packet = new Connection.WebSocketPacket var packet = new Shared.Data.WebSocketPacket
{ {
Type = request.Packet.Type, Type = request.Packet.Type,
Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data), Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data),
@@ -161,8 +157,8 @@ public override Task<Empty> PushWebSocketPacketToDevice(PushWebSocketPacketToDev
var isConnected = request.IdCase switch var isConnected = request.IdCase switch
{ {
GetWebsocketConnectionStatusRequest.IdOneofCase.DeviceId => GetWebsocketConnectionStatusRequest.IdOneofCase.DeviceId =>
websocket.GetDeviceIsConnected(request.DeviceId), WebSocketService.GetDeviceIsConnected(request.DeviceId),
GetWebsocketConnectionStatusRequest.IdOneofCase.UserId => websocket.GetAccountIsConnected(request.UserId), GetWebsocketConnectionStatusRequest.IdOneofCase.UserId => WebSocketService.GetAccountIsConnected(Guid.Parse(request.UserId)),
_ => false _ => false
}; };

View File

@@ -4,7 +4,7 @@ using DysonNetwork.Shared.Proto;
using NodaTime; using NodaTime;
using NodaTime.Serialization.SystemTextJson; using NodaTime.Serialization.SystemTextJson;
namespace DysonNetwork.Ring.Connection; namespace DysonNetwork.Shared.Data;
public class WebSocketPacket public class WebSocketPacket
{ {
@@ -27,13 +27,7 @@ public class WebSocketPacket
public static WebSocketPacket FromBytes(byte[] bytes) public static WebSocketPacket FromBytes(byte[] bytes)
{ {
var json = System.Text.Encoding.UTF8.GetString(bytes); var json = System.Text.Encoding.UTF8.GetString(bytes);
var jsonOpts = new JsonSerializerOptions return JsonSerializer.Deserialize<WebSocketPacket>(json, GrpcTypeHelper.SerializerOptions) ??
{
NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals,
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower,
};
return JsonSerializer.Deserialize<WebSocketPacket>(json, jsonOpts) ??
throw new JsonException("Failed to deserialize WebSocketPacket"); throw new JsonException("Failed to deserialize WebSocketPacket");
} }
@@ -47,15 +41,9 @@ public class WebSocketPacket
if (Data is T typedData) if (Data is T typedData)
return typedData; return typedData;
var jsonOpts = new JsonSerializerOptions
{
NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals,
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower,
};
return JsonSerializer.Deserialize<T>( return JsonSerializer.Deserialize<T>(
JsonSerializer.Serialize(Data, jsonOpts), JsonSerializer.Serialize(Data, GrpcTypeHelper.SerializerOptions),
jsonOpts GrpcTypeHelper.SerializerOptions
); );
} }
@@ -65,13 +53,7 @@ public class WebSocketPacket
/// <returns>Byte array representation of the packet</returns> /// <returns>Byte array representation of the packet</returns>
public byte[] ToBytes() public byte[] ToBytes()
{ {
var jsonOpts = new JsonSerializerOptions var json = JsonSerializer.Serialize(this, GrpcTypeHelper.SerializerOptions);
{
NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals,
PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower,
}.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
var json = JsonSerializer.Serialize(this, jsonOpts);
return System.Text.Encoding.UTF8.GetBytes(json); return System.Text.Encoding.UTF8.GetBytes(json);
} }

View File

@@ -0,0 +1,12 @@
namespace DysonNetwork.Shared.Stream;
public class WebSocketPacketEvent
{
public static string Type => "websocket_msg";
public static string SubjectPrefix = "websocket_";
public Guid AccountId { get; set; }
public string DeviceId { get; set; } = null!;
public byte[] PacketBytes { get; set; } = null!;
}

View File

@@ -5,6 +5,7 @@ using DysonNetwork.Sphere.Chat.Realtime;
using DysonNetwork.Sphere.WebReader; using DysonNetwork.Sphere.WebReader;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using NodaTime; using NodaTime;
using WebSocketPacket = DysonNetwork.Shared.Proto.WebSocketPacket;
namespace DysonNetwork.Sphere.Chat; namespace DysonNetwork.Sphere.Chat;

View File

@@ -1,128 +0,0 @@
using DysonNetwork.Shared.Cache;
using DysonNetwork.Shared.Proto;
using DysonNetwork.Sphere.Chat;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
namespace DysonNetwork.Sphere.Connection;
public class WebSocketHandlerGrpc(RingService.RingServiceClient pusher, ChatRoomService crs, ChatService cs)
: RingHandlerService.RingHandlerServiceBase
{
public override async Task<Empty> ReceiveWebSocketPacket(
ReceiveWebSocketPacketRequest request,
ServerCallContext context
)
{
switch (request.Packet.Type)
{
case "messages.read":
await HandleMessageRead(request, context);
break;
case "messages.typing":
await HandleMessageTyping(request, context);
break;
}
return new Empty();
}
private async Task HandleMessageRead(ReceiveWebSocketPacketRequest request, ServerCallContext context)
{
var currentUser = request.Account;
var packet = request.Packet;
if (packet.Data == null)
{
await SendErrorResponse(
request,
"Mark message as read requires you to provide the ChatRoomId"
);
return;
}
var requestData = GrpcTypeHelper.ConvertByteStringToObject<ChatController.MarkMessageReadRequest>(packet.Data);
if (requestData == null)
{
await SendErrorResponse(request, "Invalid request data");
return;
}
var sender = await crs.GetRoomMember(
Guid.Parse(currentUser.Id),
requestData.ChatRoomId
);
if (sender == null)
{
await SendErrorResponse(request, "User is not a member of the chat room.");
return;
}
await cs.ReadChatRoomAsync(requestData.ChatRoomId, Guid.Parse(currentUser.Id));
}
private async Task HandleMessageTyping(ReceiveWebSocketPacketRequest request, ServerCallContext context)
{
var currentUser = request.Account;
var packet = request.Packet;
if (packet.Data == null)
{
await SendErrorResponse(request, "messages.typing requires you to provide the ChatRoomId");
return;
}
var requestData = GrpcTypeHelper.ConvertByteStringToObject<ChatController.ChatRoomWsUniversalRequest>(packet.Data);
if (requestData == null)
{
await SendErrorResponse(request, "Invalid request data");
return;
}
var sender = await crs.GetRoomMember(
Guid.Parse(currentUser.Id),
requestData.ChatRoomId
);
if (sender == null)
{
await SendErrorResponse(request, "User is not a member of the chat room.");
return;
}
var responsePacket = new WebSocketPacket
{
Type = "messages.typing",
Data = GrpcTypeHelper.ConvertObjectToByteString(new
{
room_id = sender.ChatRoomId,
sender_id = sender.Id,
sender = sender
})
};
// Broadcast typing indicator to other room members
var otherMembers = (await crs.ListRoomMembers(requestData.ChatRoomId))
.Where(m => m.AccountId != Guid.Parse(currentUser.Id))
.Select(m => m.AccountId.ToString())
.ToList();
var respRequest = new PushWebSocketPacketToUsersRequest() { Packet = responsePacket };
respRequest.UserIds.AddRange(otherMembers);
await pusher.PushWebSocketPacketToUsersAsync(respRequest);
}
private async Task SendErrorResponse(ReceiveWebSocketPacketRequest request, string message)
{
await pusher.PushWebSocketPacketToDeviceAsync(new PushWebSocketPacketToDeviceRequest
{
DeviceId = request.DeviceId,
Packet = new WebSocketPacket
{
Type = "error",
ErrorMessage = message
}
});
}
}

View File

@@ -1,6 +1,5 @@
using DysonNetwork.Shared.Auth; using DysonNetwork.Shared.Auth;
using DysonNetwork.Shared.Http; using DysonNetwork.Shared.Http;
using DysonNetwork.Sphere.Connection;
using DysonNetwork.Sphere.Publisher; using DysonNetwork.Sphere.Publisher;
using Prometheus; using Prometheus;
@@ -28,7 +27,6 @@ public static class ApplicationConfiguration
app.MapControllers(); app.MapControllers();
// Map gRPC services // Map gRPC services
app.MapGrpcService<WebSocketHandlerGrpc>();
app.MapGrpcService<PublisherServiceGrpc>(); app.MapGrpcService<PublisherServiceGrpc>();
return app; return app;

View File

@@ -2,11 +2,14 @@ using System.Text.Json;
using System.Text.Json.Serialization; using System.Text.Json.Serialization;
using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Proto;
using DysonNetwork.Shared.Stream; using DysonNetwork.Shared.Stream;
using DysonNetwork.Sphere.Chat;
using DysonNetwork.Sphere.Post; using DysonNetwork.Sphere.Post;
using DysonNetwork.Sphere.Realm;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using NATS.Client.Core; using NATS.Client.Core;
using NATS.Client.JetStream.Models; using NATS.Client.JetStream.Models;
using NATS.Net; using NATS.Net;
using WebSocketPacket = DysonNetwork.Shared.Data.WebSocketPacket;
namespace DysonNetwork.Sphere.Startup; namespace DysonNetwork.Sphere.Startup;
@@ -25,17 +28,19 @@ public class PaymentOrderAwardMeta
} }
public class BroadcastEventHandler( public class BroadcastEventHandler(
INatsConnection nats, IServiceProvider serviceProvider,
ILogger<BroadcastEventHandler> logger, ILogger<BroadcastEventHandler> logger,
IServiceProvider serviceProvider INatsConnection nats,
RingService.RingServiceClient pusher
) : BackgroundService ) : BackgroundService
{ {
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
var paymentTask = HandlePaymentOrders(stoppingToken); var paymentTask = HandlePaymentOrders(stoppingToken);
var accountTask = HandleAccountDeletions(stoppingToken); var accountTask = HandleAccountDeletions(stoppingToken);
var websocketTask = HandleWebSocketPackets(stoppingToken);
await Task.WhenAll(paymentTask, accountTask); await Task.WhenAll(paymentTask, accountTask, websocketTask);
} }
private async Task HandlePaymentOrders(CancellationToken stoppingToken) private async Task HandlePaymentOrders(CancellationToken stoppingToken)
@@ -165,4 +170,120 @@ public class BroadcastEventHandler(
} }
} }
} }
private async Task HandleWebSocketPackets(CancellationToken stoppingToken)
{
await foreach (var msg in nats.SubscribeAsync<WebSocketPacketEvent>(
WebSocketPacketEvent.SubjectPrefix + "sphere", cancellationToken: stoppingToken))
{
try
{
var evt = msg.Data;
var packet = WebSocketPacket.FromBytes(evt.PacketBytes);
switch (packet.Type)
{
case "messages.read":
await HandleMessageRead(evt, packet);
break;
case "messages.typing":
await HandleMessageTyping(evt, packet);
break;
}
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing websocket packet");
}
}
}
private async Task HandleMessageRead(WebSocketPacketEvent evt, WebSocketPacket packet)
{
using var scope = serviceProvider.CreateScope();
var cs = scope.ServiceProvider.GetRequiredService<ChatService>();
var crs = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
if (packet.Data == null)
{
await SendErrorResponse(evt, "Mark message as read requires you to provide the ChatRoomId");
return;
}
var requestData = packet.GetData<ChatController.MarkMessageReadRequest>();
if (requestData == null)
{
await SendErrorResponse(evt, "Invalid request data");
return;
}
var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
if (sender == null)
{
await SendErrorResponse(evt, "User is not a member of the chat room.");
return;
}
await cs.ReadChatRoomAsync(requestData.ChatRoomId, evt.AccountId);
}
private async Task HandleMessageTyping(WebSocketPacketEvent evt, WebSocketPacket packet)
{
using var scope = serviceProvider.CreateScope();
var crs = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
if (packet.Data == null)
{
await SendErrorResponse(evt, "messages.typing requires you to provide the ChatRoomId");
return;
}
var requestData = packet.GetData<ChatController.ChatRoomWsUniversalRequest>();
if (requestData == null)
{
await SendErrorResponse(evt, "Invalid request data");
return;
}
var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
if (sender == null)
{
await SendErrorResponse(evt, "User is not a member of the chat room.");
return;
}
var responsePacket = new WebSocketPacket
{
Type = "messages.typing",
Data = GrpcTypeHelper.ConvertObjectToByteString(new
{
room_id = sender.ChatRoomId,
sender_id = sender.Id,
sender = sender
})
};
// Broadcast typing indicator to other room members
var otherMembers = (await crs.ListRoomMembers(requestData.ChatRoomId))
.Where(m => m.AccountId != evt.AccountId)
.Select(m => m.AccountId.ToString())
.ToList();
var respRequest = new PushWebSocketPacketToUsersRequest() { Packet = responsePacket.ToProtoValue() };
respRequest.UserIds.AddRange(otherMembers);
await pusher.PushWebSocketPacketToUsersAsync(respRequest);
}
private async Task SendErrorResponse(WebSocketPacketEvent evt, string message)
{
await pusher.PushWebSocketPacketToDeviceAsync(new PushWebSocketPacketToDeviceRequest
{
DeviceId = evt.DeviceId,
Packet = new WebSocketPacket
{
Type = "error",
ErrorMessage = message
}.ToProtoValue()
});
}
} }

View File

@@ -21,6 +21,7 @@
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AChapterData_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2024_002E3_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003Ffef366b36a224d469ff150d30f9a866d23c00_003Fe6_003F64a6c0f7_003FChapterData_002Ecs/@EntryIndexedValue">ForceIncluded</s:String> <s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AChapterData_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2024_002E3_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003Ffef366b36a224d469ff150d30f9a866d23c00_003Fe6_003F64a6c0f7_003FChapterData_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AClaim_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2024_002E3_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003Fa7fdc52b6e574ae7b9822133be91162a15800_003Ff7_003Feebffd8d_003FClaim_002Ecs/@EntryIndexedValue">ForceIncluded</s:String> <s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AClaim_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2024_002E3_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003Fa7fdc52b6e574ae7b9822133be91162a15800_003Ff7_003Feebffd8d_003FClaim_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AClusterConfig_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003Fbf3f51607a3e4e76b5b91640cd7409195c430_003F3f_003F87f581ed_003FClusterConfig_002Ecs/@EntryIndexedValue">ForceIncluded</s:String> <s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AClusterConfig_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003Fbf3f51607a3e4e76b5b91640cd7409195c430_003F3f_003F87f581ed_003FClusterConfig_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AConcurrentDictionary_00602_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E2_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F1f443143201742669eeb211a435e32ae4c600_003F24_003F59c4e69f_003FConcurrentDictionary_00602_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AConnectionMultiplexer_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FSourcesCache_003F2ed0e2f073b1d77b98dadb822da09ee8a9dfb91bf29bf2bbaecb8750d7e74cc9_003FConnectionMultiplexer_002Ecs/@EntryIndexedValue">ForceIncluded</s:String> <s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AConnectionMultiplexer_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FSourcesCache_003F2ed0e2f073b1d77b98dadb822da09ee8a9dfb91bf29bf2bbaecb8750d7e74cc9_003FConnectionMultiplexer_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AContainerResourceBuilderExtensions_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E2_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F19256b6d2a8a458692f07fe8d98d79e9161628_003Fd7_003F266d041b_003FContainerResourceBuilderExtensions_002Ecs/@EntryIndexedValue">ForceIncluded</s:String> <s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AContainerResourceBuilderExtensions_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E2_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F19256b6d2a8a458692f07fe8d98d79e9161628_003Fd7_003F266d041b_003FContainerResourceBuilderExtensions_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AControllerBase_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F0b5acdd962e549369896cece0026e556214600_003Ff6_003Fdf150bb3_003FControllerBase_002Ecs/@EntryIndexedValue">ForceIncluded</s:String> <s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AControllerBase_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F0b5acdd962e549369896cece0026e556214600_003Ff6_003Fdf150bb3_003FControllerBase_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
@@ -162,6 +163,7 @@
<s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AWebSocketCloseStatus_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F82dcad099d814e3facee3a7c5e19928a3ae00_003F67_003F9e63fab4_003FWebSocketCloseStatus_002Ecs/@EntryIndexedValue">ForceIncluded</s:String> <s:String x:Key="/Default/CodeInspection/ExcludedFiles/FilesAndFoldersToSkip2/=7020124F_002D9FFC_002D4AC3_002D8F3D_002DAAB8E0240759_002Ff_003AWebSocketCloseStatus_002Ecs_002Fl_003A_002E_002E_003F_002E_002E_003F_002E_002E_003FLibrary_003FApplication_0020Support_003FJetBrains_003FRider2025_002E1_003Fresharper_002Dhost_003FDecompilerCache_003Fdecompiler_003F82dcad099d814e3facee3a7c5e19928a3ae00_003F67_003F9e63fab4_003FWebSocketCloseStatus_002Ecs/@EntryIndexedValue">ForceIncluded</s:String>
<s:String x:Key="/Default/Environment/AssemblyExplorer/XmlDocument/@EntryValue">&lt;AssemblyExplorer&gt; <s:String x:Key="/Default/Environment/AssemblyExplorer/XmlDocument/@EntryValue">&lt;AssemblyExplorer&gt;
&lt;Assembly Path="/opt/homebrew/Cellar/dotnet/9.0.6/libexec/packs/Microsoft.AspNetCore.App.Ref/9.0.6/ref/net9.0/Microsoft.AspNetCore.RateLimiting.dll" /&gt; &lt;Assembly Path="/opt/homebrew/Cellar/dotnet/9.0.6/libexec/packs/Microsoft.AspNetCore.App.Ref/9.0.6/ref/net9.0/Microsoft.AspNetCore.RateLimiting.dll" /&gt;
&lt;Assembly Path="/Users/littlesheep/.nuget/packages/nodatime/3.2.2/lib/net8.0/NodaTime.dll" /&gt;
&lt;/AssemblyExplorer&gt;</s:String> &lt;/AssemblyExplorer&gt;</s:String>
<s:Boolean x:Key="/Default/ResxEditorPersonal/CheckedGroups/=DysonNetwork_002EPass_002FResources_002FLocalization_002FAccountEventResource/@EntryIndexedValue">False</s:Boolean> <s:Boolean x:Key="/Default/ResxEditorPersonal/CheckedGroups/=DysonNetwork_002EPass_002FResources_002FLocalization_002FAccountEventResource/@EntryIndexedValue">False</s:Boolean>