diff --git a/DysonNetwork.Drive/DysonNetwork.Drive.csproj b/DysonNetwork.Drive/DysonNetwork.Drive.csproj
index 9f2d1cf..8a03ea2 100644
--- a/DysonNetwork.Drive/DysonNetwork.Drive.csproj
+++ b/DysonNetwork.Drive/DysonNetwork.Drive.csproj
@@ -17,6 +17,7 @@
runtime; build; native; contentfiles; analyzers; buildtransitive
all
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/DysonNetwork.Ring/Connection/IWebSocketPacketHandler.cs b/DysonNetwork.Ring/Connection/IWebSocketPacketHandler.cs
index 0613398..b7108e4 100644
--- a/DysonNetwork.Ring/Connection/IWebSocketPacketHandler.cs
+++ b/DysonNetwork.Ring/Connection/IWebSocketPacketHandler.cs
@@ -1,5 +1,6 @@
using System.Net.WebSockets;
using DysonNetwork.Shared.Proto;
+using WebSocketPacket = DysonNetwork.Shared.Data.WebSocketPacket;
namespace DysonNetwork.Ring.Connection;
diff --git a/DysonNetwork.Ring/Connection/WebSocketController.cs b/DysonNetwork.Ring/Connection/WebSocketController.cs
index 6a8e241..f1b5668 100644
--- a/DysonNetwork.Ring/Connection/WebSocketController.cs
+++ b/DysonNetwork.Ring/Connection/WebSocketController.cs
@@ -3,11 +3,15 @@ using DysonNetwork.Shared.Proto;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Swashbuckle.AspNetCore.Annotations;
+using WebSocketPacket = DysonNetwork.Shared.Data.WebSocketPacket;
namespace DysonNetwork.Ring.Connection;
[ApiController]
-public class WebSocketController(WebSocketService ws, ILogger logger) : ControllerBase
+public class WebSocketController(
+ WebSocketService ws,
+ ILogger logger
+) : ControllerBase
{
[Route("/ws")]
[Authorize]
@@ -23,7 +27,7 @@ public class WebSocketController(WebSocketService ws, ILogger
return;
}
- var accountId = currentUser.Id!;
+ var accountId = Guid.Parse(currentUser.Id!);
var deviceId = currentSession.Challenge?.DeviceId ?? Guid.NewGuid().ToString();
if (string.IsNullOrEmpty(deviceId))
@@ -89,7 +93,7 @@ public class WebSocketController(WebSocketService ws, ILogger
CancellationToken cancellationToken
)
{
- var connectionKey = (AccountId: currentUser.Id, DeviceId: deviceId);
+ var connectionKey = (AccountId: Guid.Parse(currentUser.Id), DeviceId: deviceId);
var buffer = new byte[1024 * 4];
try
diff --git a/DysonNetwork.Ring/Connection/WebSocketService.cs b/DysonNetwork.Ring/Connection/WebSocketService.cs
index 31eef44..62c8233 100644
--- a/DysonNetwork.Ring/Connection/WebSocketService.cs
+++ b/DysonNetwork.Ring/Connection/WebSocketService.cs
@@ -2,36 +2,38 @@ using System.Collections.Concurrent;
using System.Net.WebSockets;
using DysonNetwork.Shared.Data;
using DysonNetwork.Shared.Proto;
-using Grpc.Core;
+using DysonNetwork.Shared.Stream;
+using NATS.Client.Core;
+using WebSocketPacket = DysonNetwork.Shared.Data.WebSocketPacket;
namespace DysonNetwork.Ring.Connection;
public class WebSocketService
{
- private readonly IConfiguration _configuration;
+ private readonly INatsConnection _nats;
private readonly ILogger _logger;
private readonly IDictionary _handlerMap;
public WebSocketService(
IEnumerable handlers,
ILogger logger,
- IConfiguration configuration
+ INatsConnection nats
)
{
_logger = logger;
- _configuration = configuration;
_handlerMap = handlers.ToDictionary(h => h.PacketType);
+ _nats = nats;
}
private static readonly ConcurrentDictionary<
- (string AccountId, string DeviceId),
+ (Guid AccountId, string DeviceId),
(WebSocket Socket, CancellationTokenSource Cts)
> ActiveConnections = new();
private static readonly ConcurrentDictionary ActiveSubscriptions = new(); // deviceId -> chatRoomId
public bool TryAdd(
- (string AccountId, string DeviceId) key,
+ (Guid AccountId, string DeviceId) key,
WebSocket socket,
CancellationTokenSource cts
)
@@ -42,7 +44,7 @@ public class WebSocketService
return ActiveConnections.TryAdd(key, (socket, cts));
}
- public void Disconnect((string AccountId, string DeviceId) key, string? reason = null)
+ public void Disconnect((Guid AccountId, string DeviceId) key, string? reason = null)
{
if (!ActiveConnections.TryGetValue(key, out var data)) return;
try
@@ -63,19 +65,19 @@ public class WebSocketService
ActiveConnections.TryRemove(key, out _);
}
- public bool GetDeviceIsConnected(string deviceId)
+ public static bool GetDeviceIsConnected(string deviceId)
{
return ActiveConnections.Any(c => c.Key.DeviceId == deviceId);
}
- public bool GetAccountIsConnected(string accountId)
+ public static bool GetAccountIsConnected(Guid accountId)
{
return ActiveConnections.Any(c => c.Key.AccountId == accountId);
}
- public void SendPacketToAccount(string userId, WebSocketPacket packet)
+ public static void SendPacketToAccount(Guid accountId, WebSocketPacket packet)
{
- var connections = ActiveConnections.Where(c => c.Key.AccountId == userId);
+ var connections = ActiveConnections.Where(c => c.Key.AccountId == accountId);
var packetBytes = packet.ToBytes();
var segment = new ArraySegment(packetBytes);
@@ -139,28 +141,16 @@ public class WebSocketService
try
{
var endpoint = packet.Endpoint.Replace("DysonNetwork.", "").ToLower();
- var serviceUrl = "https://_grpc." + endpoint;
-
- var callInvoker = GrpcClientHelper.CreateCallInvoker(serviceUrl);
- var client = new RingHandlerService.RingHandlerServiceClient(callInvoker);
-
- try
+ await _nats.PublishAsync(WebSocketPacketEvent.SubjectPrefix + endpoint, new WebSocketPacketEvent
{
- await client.ReceiveWebSocketPacketAsync(new ReceiveWebSocketPacketRequest
- {
- Account = currentUser,
- DeviceId = deviceId,
- Packet = packet.ToProtoValue()
- });
- }
- catch (RpcException ex)
- {
- _logger.LogError(ex, $"Error forwarding packet to endpoint: {packet.Endpoint} (${endpoint})");
- }
+ AccountId = Guid.Parse(currentUser.Id),
+ DeviceId = deviceId,
+ PacketBytes = packet.ToBytes()
+ });
}
catch (Exception ex)
{
- _logger.LogError(ex, $"Error forwarding packet to endpoint: {packet.Endpoint}");
+ _logger.LogError(ex, "Error forwarding packet to endpoint: {Endpoint}", packet.Endpoint);
}
}
@@ -175,4 +165,4 @@ public class WebSocketService
CancellationToken.None
);
}
-}
\ No newline at end of file
+}
diff --git a/DysonNetwork.Ring/Notification/PushService.cs b/DysonNetwork.Ring/Notification/PushService.cs
index 29b5eb8..c9d5074 100644
--- a/DysonNetwork.Ring/Notification/PushService.cs
+++ b/DysonNetwork.Ring/Notification/PushService.cs
@@ -5,14 +5,13 @@ using DysonNetwork.Ring.Services;
using DysonNetwork.Shared.Proto;
using Microsoft.EntityFrameworkCore;
using NodaTime;
-using WebSocketPacket = DysonNetwork.Ring.Connection.WebSocketPacket;
+using WebSocketPacket = DysonNetwork.Shared.Data.WebSocketPacket;
namespace DysonNetwork.Ring.Notification;
public class PushService
{
private readonly AppDatabase _db;
- private readonly WebSocketService _ws;
private readonly QueueService _queueService;
private readonly ILogger _logger;
private readonly FirebaseSender? _fcm;
@@ -22,7 +21,6 @@ public class PushService
public PushService(
IConfiguration config,
AppDatabase db,
- WebSocketService ws,
QueueService queueService,
IHttpClientFactory httpFactory,
ILogger logger
@@ -53,7 +51,6 @@ public class PushService
}
_db = db;
- _ws = ws;
_queueService = queueService;
_logger = logger;
}
@@ -73,9 +70,9 @@ public class PushService
)
{
var now = SystemClock.Instance.GetCurrentInstant();
- var accountId = Guid.Parse(account.Id!);
+ var accountId = Guid.Parse(account.Id);
- // Check for existing subscription with same device ID or token
+ // Check for existing subscription with the same device ID or token
var existingSubscription = await _db.PushSubscriptions
.Where(s => s.AccountId == accountId)
.Where(s => s.DeviceId == deviceId || s.DeviceToken == deviceToken)
@@ -125,7 +122,7 @@ public class PushService
if (actionUri is not null) meta["action_uri"] = actionUri;
- var accountId = Guid.Parse(account.Id!);
+ var accountId = account.Id;
var notification = new Notification
{
Topic = topic,
@@ -133,7 +130,7 @@ public class PushService
Subtitle = subtitle,
Content = content,
Meta = meta,
- AccountId = accountId,
+ AccountId = Guid.Parse(accountId),
};
if (save)
@@ -143,12 +140,12 @@ public class PushService
}
if (!isSilent)
- _ = _queueService.EnqueuePushNotification(notification, accountId, save);
+ _ = _queueService.EnqueuePushNotification(notification, Guid.Parse(accountId), save);
}
public async Task DeliverPushNotification(Notification notification, CancellationToken cancellationToken = default)
{
- _ws.SendPacketToAccount(notification.AccountId.ToString(), new WebSocketPacket()
+ WebSocketService.SendPacketToAccount(notification.AccountId, new WebSocketPacket()
{
Type = "notifications.new",
Data = notification,
@@ -251,8 +248,8 @@ public class PushService
// WS first
foreach (var account in accounts)
{
- notification.AccountId = account; // keep original behavior
- _ws.SendPacketToAccount(account.ToString(), new Connection.WebSocketPacket
+ notification.AccountId = account;
+ WebSocketService.SendPacketToAccount(account, new WebSocketPacket
{
Type = "notifications.new",
Data = notification
diff --git a/DysonNetwork.Ring/Services/PusherServiceGrpc.cs b/DysonNetwork.Ring/Services/PusherServiceGrpc.cs
index 80a3e08..6957e7b 100644
--- a/DysonNetwork.Ring/Services/PusherServiceGrpc.cs
+++ b/DysonNetwork.Ring/Services/PusherServiceGrpc.cs
@@ -1,10 +1,8 @@
using DysonNetwork.Ring.Connection;
-using DysonNetwork.Ring.Email;
using DysonNetwork.Ring.Notification;
using DysonNetwork.Shared.Proto;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
-using System.Text.Json;
namespace DysonNetwork.Ring.Services;
@@ -27,30 +25,28 @@ public class RingServiceGrpc(
public override Task PushWebSocketPacket(PushWebSocketPacketRequest request, ServerCallContext context)
{
- var packet = new Connection.WebSocketPacket
+ var packet = new Shared.Data.WebSocketPacket
{
Type = request.Packet.Type,
Data = GrpcTypeHelper.ConvertByteStringToObject>(request.Packet.Data),
ErrorMessage = request.Packet.ErrorMessage
};
- websocket.SendPacketToAccount(request.UserId, packet);
+ WebSocketService.SendPacketToAccount(Guid.Parse(request.UserId), packet);
return Task.FromResult(new Empty());
}
public override Task PushWebSocketPacketToUsers(PushWebSocketPacketToUsersRequest request,
ServerCallContext context)
{
- var packet = new Connection.WebSocketPacket
+ var packet = new Shared.Data.WebSocketPacket
{
Type = request.Packet.Type,
Data = GrpcTypeHelper.ConvertByteStringToObject>(request.Packet.Data),
ErrorMessage = request.Packet.ErrorMessage
};
- foreach (var userId in request.UserIds)
- {
- websocket.SendPacketToAccount(userId, packet);
- }
+ foreach (var accountId in request.UserIds)
+ WebSocketService.SendPacketToAccount(Guid.Parse(accountId), packet);
return Task.FromResult(new Empty());
}
@@ -58,7 +54,7 @@ public class RingServiceGrpc(
public override Task PushWebSocketPacketToDevice(PushWebSocketPacketToDeviceRequest request,
ServerCallContext context)
{
- var packet = new Connection.WebSocketPacket
+ var packet = new Shared.Data.WebSocketPacket
{
Type = request.Packet.Type,
Data = GrpcTypeHelper.ConvertByteStringToObject>(request.Packet.Data),
@@ -71,7 +67,7 @@ public override Task PushWebSocketPacketToDevice(PushWebSocketPacketToDev
public override Task PushWebSocketPacketToDevices(PushWebSocketPacketToDevicesRequest request,
ServerCallContext context)
{
- var packet = new Connection.WebSocketPacket
+ var packet = new Shared.Data.WebSocketPacket
{
Type = request.Packet.Type,
Data = GrpcTypeHelper.ConvertByteStringToObject>(request.Packet.Data),
@@ -161,8 +157,8 @@ public override Task PushWebSocketPacketToDevice(PushWebSocketPacketToDev
var isConnected = request.IdCase switch
{
GetWebsocketConnectionStatusRequest.IdOneofCase.DeviceId =>
- websocket.GetDeviceIsConnected(request.DeviceId),
- GetWebsocketConnectionStatusRequest.IdOneofCase.UserId => websocket.GetAccountIsConnected(request.UserId),
+ WebSocketService.GetDeviceIsConnected(request.DeviceId),
+ GetWebsocketConnectionStatusRequest.IdOneofCase.UserId => WebSocketService.GetAccountIsConnected(Guid.Parse(request.UserId)),
_ => false
};
diff --git a/DysonNetwork.Ring/Connection/WebSocketPacket.cs b/DysonNetwork.Shared/Data/WebSocketPacket.cs
similarity index 67%
rename from DysonNetwork.Ring/Connection/WebSocketPacket.cs
rename to DysonNetwork.Shared/Data/WebSocketPacket.cs
index 3050de0..260337b 100644
--- a/DysonNetwork.Ring/Connection/WebSocketPacket.cs
+++ b/DysonNetwork.Shared/Data/WebSocketPacket.cs
@@ -4,7 +4,7 @@ using DysonNetwork.Shared.Proto;
using NodaTime;
using NodaTime.Serialization.SystemTextJson;
-namespace DysonNetwork.Ring.Connection;
+namespace DysonNetwork.Shared.Data;
public class WebSocketPacket
{
@@ -27,13 +27,7 @@ public class WebSocketPacket
public static WebSocketPacket FromBytes(byte[] bytes)
{
var json = System.Text.Encoding.UTF8.GetString(bytes);
- var jsonOpts = new JsonSerializerOptions
- {
- NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals,
- PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
- DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower,
- };
- return JsonSerializer.Deserialize(json, jsonOpts) ??
+ return JsonSerializer.Deserialize(json, GrpcTypeHelper.SerializerOptions) ??
throw new JsonException("Failed to deserialize WebSocketPacket");
}
@@ -47,15 +41,9 @@ public class WebSocketPacket
if (Data is T typedData)
return typedData;
- var jsonOpts = new JsonSerializerOptions
- {
- NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals,
- PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
- DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower,
- };
return JsonSerializer.Deserialize(
- JsonSerializer.Serialize(Data, jsonOpts),
- jsonOpts
+ JsonSerializer.Serialize(Data, GrpcTypeHelper.SerializerOptions),
+ GrpcTypeHelper.SerializerOptions
);
}
@@ -65,16 +53,10 @@ public class WebSocketPacket
/// Byte array representation of the packet
public byte[] ToBytes()
{
- var jsonOpts = new JsonSerializerOptions
- {
- NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals,
- PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
- DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower,
- }.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
- var json = JsonSerializer.Serialize(this, jsonOpts);
+ var json = JsonSerializer.Serialize(this, GrpcTypeHelper.SerializerOptions);
return System.Text.Encoding.UTF8.GetBytes(json);
}
-
+
public Shared.Proto.WebSocketPacket ToProtoValue()
{
return new Shared.Proto.WebSocketPacket
@@ -94,4 +76,4 @@ public class WebSocketPacket
ErrorMessage = packet.ErrorMessage
};
}
-}
+}
\ No newline at end of file
diff --git a/DysonNetwork.Shared/Stream/WebSocketPacketEvent.cs b/DysonNetwork.Shared/Stream/WebSocketPacketEvent.cs
new file mode 100644
index 0000000..0fa82ba
--- /dev/null
+++ b/DysonNetwork.Shared/Stream/WebSocketPacketEvent.cs
@@ -0,0 +1,12 @@
+namespace DysonNetwork.Shared.Stream;
+
+public class WebSocketPacketEvent
+{
+ public static string Type => "websocket_msg";
+
+ public static string SubjectPrefix = "websocket_";
+
+ public Guid AccountId { get; set; }
+ public string DeviceId { get; set; } = null!;
+ public byte[] PacketBytes { get; set; } = null!;
+}
diff --git a/DysonNetwork.Sphere/Chat/ChatService.cs b/DysonNetwork.Sphere/Chat/ChatService.cs
index b1c6e1f..cf043f8 100644
--- a/DysonNetwork.Sphere/Chat/ChatService.cs
+++ b/DysonNetwork.Sphere/Chat/ChatService.cs
@@ -5,6 +5,7 @@ using DysonNetwork.Sphere.Chat.Realtime;
using DysonNetwork.Sphere.WebReader;
using Microsoft.EntityFrameworkCore;
using NodaTime;
+using WebSocketPacket = DysonNetwork.Shared.Proto.WebSocketPacket;
namespace DysonNetwork.Sphere.Chat;
diff --git a/DysonNetwork.Sphere/Connection/WebsocketHandler.cs b/DysonNetwork.Sphere/Connection/WebsocketHandler.cs
deleted file mode 100644
index 4dbf760..0000000
--- a/DysonNetwork.Sphere/Connection/WebsocketHandler.cs
+++ /dev/null
@@ -1,128 +0,0 @@
-using DysonNetwork.Shared.Cache;
-using DysonNetwork.Shared.Proto;
-using DysonNetwork.Sphere.Chat;
-using Google.Protobuf.WellKnownTypes;
-using Grpc.Core;
-
-namespace DysonNetwork.Sphere.Connection;
-
-public class WebSocketHandlerGrpc(RingService.RingServiceClient pusher, ChatRoomService crs, ChatService cs)
- : RingHandlerService.RingHandlerServiceBase
-{
- public override async Task ReceiveWebSocketPacket(
- ReceiveWebSocketPacketRequest request,
- ServerCallContext context
- )
- {
- switch (request.Packet.Type)
- {
- case "messages.read":
- await HandleMessageRead(request, context);
- break;
- case "messages.typing":
- await HandleMessageTyping(request, context);
- break;
- }
-
- return new Empty();
- }
-
- private async Task HandleMessageRead(ReceiveWebSocketPacketRequest request, ServerCallContext context)
- {
- var currentUser = request.Account;
- var packet = request.Packet;
-
- if (packet.Data == null)
- {
- await SendErrorResponse(
- request,
- "Mark message as read requires you to provide the ChatRoomId"
- );
- return;
- }
-
- var requestData = GrpcTypeHelper.ConvertByteStringToObject(packet.Data);
- if (requestData == null)
- {
- await SendErrorResponse(request, "Invalid request data");
- return;
- }
-
- var sender = await crs.GetRoomMember(
- Guid.Parse(currentUser.Id),
- requestData.ChatRoomId
- );
-
- if (sender == null)
- {
- await SendErrorResponse(request, "User is not a member of the chat room.");
- return;
- }
-
- await cs.ReadChatRoomAsync(requestData.ChatRoomId, Guid.Parse(currentUser.Id));
- }
-
- private async Task HandleMessageTyping(ReceiveWebSocketPacketRequest request, ServerCallContext context)
- {
- var currentUser = request.Account;
- var packet = request.Packet;
-
- if (packet.Data == null)
- {
- await SendErrorResponse(request, "messages.typing requires you to provide the ChatRoomId");
- return;
- }
-
- var requestData = GrpcTypeHelper.ConvertByteStringToObject(packet.Data);
- if (requestData == null)
- {
- await SendErrorResponse(request, "Invalid request data");
- return;
- }
-
- var sender = await crs.GetRoomMember(
- Guid.Parse(currentUser.Id),
- requestData.ChatRoomId
- );
- if (sender == null)
- {
- await SendErrorResponse(request, "User is not a member of the chat room.");
- return;
- }
-
- var responsePacket = new WebSocketPacket
- {
- Type = "messages.typing",
- Data = GrpcTypeHelper.ConvertObjectToByteString(new
- {
- room_id = sender.ChatRoomId,
- sender_id = sender.Id,
- sender = sender
- })
- };
-
- // Broadcast typing indicator to other room members
- var otherMembers = (await crs.ListRoomMembers(requestData.ChatRoomId))
- .Where(m => m.AccountId != Guid.Parse(currentUser.Id))
- .Select(m => m.AccountId.ToString())
- .ToList();
-
- var respRequest = new PushWebSocketPacketToUsersRequest() { Packet = responsePacket };
- respRequest.UserIds.AddRange(otherMembers);
-
- await pusher.PushWebSocketPacketToUsersAsync(respRequest);
- }
-
- private async Task SendErrorResponse(ReceiveWebSocketPacketRequest request, string message)
- {
- await pusher.PushWebSocketPacketToDeviceAsync(new PushWebSocketPacketToDeviceRequest
- {
- DeviceId = request.DeviceId,
- Packet = new WebSocketPacket
- {
- Type = "error",
- ErrorMessage = message
- }
- });
- }
-}
\ No newline at end of file
diff --git a/DysonNetwork.Sphere/Startup/ApplicationConfiguration.cs b/DysonNetwork.Sphere/Startup/ApplicationConfiguration.cs
index 80d69db..81bfa88 100644
--- a/DysonNetwork.Sphere/Startup/ApplicationConfiguration.cs
+++ b/DysonNetwork.Sphere/Startup/ApplicationConfiguration.cs
@@ -1,6 +1,5 @@
using DysonNetwork.Shared.Auth;
using DysonNetwork.Shared.Http;
-using DysonNetwork.Sphere.Connection;
using DysonNetwork.Sphere.Publisher;
using Prometheus;
@@ -28,7 +27,6 @@ public static class ApplicationConfiguration
app.MapControllers();
// Map gRPC services
- app.MapGrpcService();
app.MapGrpcService();
return app;
diff --git a/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs b/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs
index 97cd5f3..3bafb9e 100644
--- a/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs
+++ b/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs
@@ -2,11 +2,14 @@ using System.Text.Json;
using System.Text.Json.Serialization;
using DysonNetwork.Shared.Proto;
using DysonNetwork.Shared.Stream;
+using DysonNetwork.Sphere.Chat;
using DysonNetwork.Sphere.Post;
+using DysonNetwork.Sphere.Realm;
using Microsoft.EntityFrameworkCore;
using NATS.Client.Core;
using NATS.Client.JetStream.Models;
using NATS.Net;
+using WebSocketPacket = DysonNetwork.Shared.Data.WebSocketPacket;
namespace DysonNetwork.Sphere.Startup;
@@ -25,17 +28,19 @@ public class PaymentOrderAwardMeta
}
public class BroadcastEventHandler(
- INatsConnection nats,
+ IServiceProvider serviceProvider,
ILogger logger,
- IServiceProvider serviceProvider
+ INatsConnection nats,
+ RingService.RingServiceClient pusher
) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var paymentTask = HandlePaymentOrders(stoppingToken);
var accountTask = HandleAccountDeletions(stoppingToken);
+ var websocketTask = HandleWebSocketPackets(stoppingToken);
- await Task.WhenAll(paymentTask, accountTask);
+ await Task.WhenAll(paymentTask, accountTask, websocketTask);
}
private async Task HandlePaymentOrders(CancellationToken stoppingToken)
@@ -53,7 +58,7 @@ public class BroadcastEventHandler(
try
{
evt = JsonSerializer.Deserialize(msg.Data, GrpcTypeHelper.SerializerOptions);
-
+
logger.LogInformation(
"Received order event: {ProductIdentifier} {OrderId}",
evt?.ProductIdentifier,
@@ -101,9 +106,9 @@ public class BroadcastEventHandler(
private async Task HandleAccountDeletions(CancellationToken stoppingToken)
{
var js = nats.CreateJetStreamContext();
-
+
await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]);
-
+
var consumer = await js.CreateOrUpdateConsumerAsync("account_events",
new ConsumerConfig("sphere_account_deleted_handler"), cancellationToken: stoppingToken);
@@ -165,4 +170,120 @@ public class BroadcastEventHandler(
}
}
}
+
+ private async Task HandleWebSocketPackets(CancellationToken stoppingToken)
+ {
+ await foreach (var msg in nats.SubscribeAsync(
+ WebSocketPacketEvent.SubjectPrefix + "sphere", cancellationToken: stoppingToken))
+ {
+ try
+ {
+ var evt = msg.Data;
+ var packet = WebSocketPacket.FromBytes(evt.PacketBytes);
+ switch (packet.Type)
+ {
+ case "messages.read":
+ await HandleMessageRead(evt, packet);
+ break;
+ case "messages.typing":
+ await HandleMessageTyping(evt, packet);
+ break;
+ }
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Error processing websocket packet");
+ }
+ }
+ }
+
+ private async Task HandleMessageRead(WebSocketPacketEvent evt, WebSocketPacket packet)
+ {
+ using var scope = serviceProvider.CreateScope();
+ var cs = scope.ServiceProvider.GetRequiredService();
+ var crs = scope.ServiceProvider.GetRequiredService();
+
+ if (packet.Data == null)
+ {
+ await SendErrorResponse(evt, "Mark message as read requires you to provide the ChatRoomId");
+ return;
+ }
+
+ var requestData = packet.GetData();
+ if (requestData == null)
+ {
+ await SendErrorResponse(evt, "Invalid request data");
+ return;
+ }
+
+ var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
+ if (sender == null)
+ {
+ await SendErrorResponse(evt, "User is not a member of the chat room.");
+ return;
+ }
+
+ await cs.ReadChatRoomAsync(requestData.ChatRoomId, evt.AccountId);
+ }
+
+ private async Task HandleMessageTyping(WebSocketPacketEvent evt, WebSocketPacket packet)
+ {
+ using var scope = serviceProvider.CreateScope();
+ var crs = scope.ServiceProvider.GetRequiredService();
+
+ if (packet.Data == null)
+ {
+ await SendErrorResponse(evt, "messages.typing requires you to provide the ChatRoomId");
+ return;
+ }
+
+ var requestData = packet.GetData();
+ if (requestData == null)
+ {
+ await SendErrorResponse(evt, "Invalid request data");
+ return;
+ }
+
+ var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
+ if (sender == null)
+ {
+ await SendErrorResponse(evt, "User is not a member of the chat room.");
+ return;
+ }
+
+ var responsePacket = new WebSocketPacket
+ {
+ Type = "messages.typing",
+ Data = GrpcTypeHelper.ConvertObjectToByteString(new
+ {
+ room_id = sender.ChatRoomId,
+ sender_id = sender.Id,
+ sender = sender
+ })
+ };
+
+ // Broadcast typing indicator to other room members
+ var otherMembers = (await crs.ListRoomMembers(requestData.ChatRoomId))
+ .Where(m => m.AccountId != evt.AccountId)
+ .Select(m => m.AccountId.ToString())
+ .ToList();
+
+ var respRequest = new PushWebSocketPacketToUsersRequest() { Packet = responsePacket.ToProtoValue() };
+ respRequest.UserIds.AddRange(otherMembers);
+
+ await pusher.PushWebSocketPacketToUsersAsync(respRequest);
+ }
+
+ private async Task SendErrorResponse(WebSocketPacketEvent evt, string message)
+ {
+ await pusher.PushWebSocketPacketToDeviceAsync(new PushWebSocketPacketToDeviceRequest
+ {
+ DeviceId = evt.DeviceId,
+ Packet = new WebSocketPacket
+ {
+ Type = "error",
+ ErrorMessage = message
+ }.ToProtoValue()
+ });
+ }
}
\ No newline at end of file
diff --git a/DysonNetwork.sln.DotSettings.user b/DysonNetwork.sln.DotSettings.user
index fbceea4..90d3763 100644
--- a/DysonNetwork.sln.DotSettings.user
+++ b/DysonNetwork.sln.DotSettings.user
@@ -21,6 +21,7 @@
ForceIncluded
ForceIncluded
ForceIncluded
+ ForceIncluded
ForceIncluded
ForceIncluded
ForceIncluded
@@ -162,6 +163,7 @@
ForceIncluded
<AssemblyExplorer>
<Assembly Path="/opt/homebrew/Cellar/dotnet/9.0.6/libexec/packs/Microsoft.AspNetCore.App.Ref/9.0.6/ref/net9.0/Microsoft.AspNetCore.RateLimiting.dll" />
+ <Assembly Path="/Users/littlesheep/.nuget/packages/nodatime/3.2.2/lib/net8.0/NodaTime.dll" />
</AssemblyExplorer>
False