From 58a44e8af46bcc9119dbcc9b0e4952de87731bdf Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Sat, 27 Sep 2025 19:25:10 +0800 Subject: [PATCH] :sparkles: Chat subscribe fixes and status update --- .../Account/AccountEventService.cs | 22 +++- .../Startup/BroadcastEventHandler.cs | 107 +++++++++++++++++- .../Connection/WebSocketController.cs | 31 ++++- DysonNetwork.Shared/Stream/AccountEvent.cs | 14 ++- .../Stream/WebSocketPacketEvent.cs | 22 ++++ .../Chat/ChatRoomController.cs | 30 +++++ DysonNetwork.Sphere/Chat/ChatRoomService.cs | 16 ++- .../Startup/BroadcastEventHandler.cs | 65 ++++++++++- 8 files changed, 292 insertions(+), 15 deletions(-) diff --git a/DysonNetwork.Pass/Account/AccountEventService.cs b/DysonNetwork.Pass/Account/AccountEventService.cs index fe3a762..493cd41 100644 --- a/DysonNetwork.Pass/Account/AccountEventService.cs +++ b/DysonNetwork.Pass/Account/AccountEventService.cs @@ -3,8 +3,11 @@ using DysonNetwork.Pass.Wallet; using DysonNetwork.Shared.Cache; using DysonNetwork.Shared.Models; using DysonNetwork.Shared.Proto; +using DysonNetwork.Shared.Stream; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Localization; +using NATS.Client.Core; +using NATS.Net; using NodaTime; using NodaTime.Extensions; @@ -17,7 +20,8 @@ public class AccountEventService( IStringLocalizer localizer, RingService.RingServiceClient pusher, SubscriptionService subscriptions, - Pass.Leveling.ExperienceService experienceService + Pass.Leveling.ExperienceService experienceService, + INatsConnection nats ) { private static readonly Random Random = new(); @@ -37,6 +41,19 @@ public class AccountEventService( cache.RemoveAsync(cacheKey); } + private async Task BroadcastStatusUpdate(SnAccountStatus status) + { + await nats.PublishAsync( + AccountStatusUpdatedEvent.Type, + GrpcTypeHelper.ConvertObjectToByteString(new AccountStatusUpdatedEvent + { + AccountId = status.AccountId, + Status = status, + UpdatedAt = SystemClock.Instance.GetCurrentInstant() + }).ToByteArray() + ); + } + public async Task GetStatus(Guid userId) { var cacheKey = $"{StatusCacheKey}{userId}"; @@ -158,6 +175,8 @@ public class AccountEventService( db.AccountStatuses.Add(status); await db.SaveChangesAsync(); + await BroadcastStatusUpdate(status); + return status; } @@ -167,6 +186,7 @@ public class AccountEventService( db.Update(status); await db.SaveChangesAsync(); PurgeStatusCache(user.Id); + await BroadcastStatusUpdate(status); } private const int FortuneTipCount = 14; // This will be the max index for each type (positive/negative) diff --git a/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs b/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs index efead54..682e792 100644 --- a/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs @@ -1,11 +1,14 @@ using System.Text.Json; +using DysonNetwork.Pass.Account; using DysonNetwork.Pass.Wallet; using DysonNetwork.Shared.Models; using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Stream; +using Google.Protobuf; using NATS.Client.Core; using NATS.Client.JetStream.Models; using NATS.Net; +using NodaTime; namespace DysonNetwork.Pass.Startup; @@ -16,22 +19,30 @@ public class BroadcastEventHandler( ) : BackgroundService { protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var paymentTask = HandlePaymentEventsAsync(stoppingToken); + var webSocketTask = HandleWebSocketEventsAsync(stoppingToken); + + await Task.WhenAll(paymentTask, webSocketTask); + } + + private async Task HandlePaymentEventsAsync(CancellationToken stoppingToken) { var js = nats.CreateJetStreamContext(); await js.EnsureStreamCreated("payment_events", [PaymentOrderEventBase.Type]); - - var consumer = await js.CreateOrUpdateConsumerAsync("payment_events", - new ConsumerConfig("pass_payment_handler"), + + var consumer = await js.CreateOrUpdateConsumerAsync("payment_events", + new ConsumerConfig("pass_payment_handler"), cancellationToken: stoppingToken); - + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { PaymentOrderEvent? evt = null; try { evt = JsonSerializer.Deserialize(msg.Data, GrpcTypeHelper.SerializerOptions); - + logger.LogInformation( "Received order event: {ProductIdentifier} {OrderId}", evt?.ProductIdentifier, @@ -66,9 +77,93 @@ public class BroadcastEventHandler( } catch (Exception ex) { - logger.LogError(ex, "Error processing payment order event for order {OrderId}. Redelivering.", evt?.OrderId); + logger.LogError(ex, "Error processing payment order event for order {OrderId}. Redelivering.", + evt?.OrderId); await msg.NakAsync(cancellationToken: stoppingToken); } } } + + private async Task HandleWebSocketEventsAsync(CancellationToken stoppingToken) + { + var connectedTask = HandleConnectedEventsAsync(stoppingToken); + var disconnectedTask = HandleDisconnectedEventsAsync(stoppingToken); + + await Task.WhenAll(connectedTask, disconnectedTask); + } + + private async Task HandleConnectedEventsAsync(CancellationToken stoppingToken) + { + await foreach (var msg in nats.SubscribeAsync("websocket_connected", cancellationToken: stoppingToken)) + { + try + { + var evt = + GrpcTypeHelper.ConvertByteStringToObject(ByteString.CopyFrom(msg.Data)); + + logger.LogInformation("Received WebSocket connected event for user {AccountId}, device {DeviceId}", + evt.AccountId, evt.DeviceId); + + await using var scope = serviceProvider.CreateAsyncScope(); + var accountEventService = scope.ServiceProvider.GetRequiredService(); + + var status = await accountEventService.GetStatus(evt.AccountId); + + await nats.PublishAsync( + AccountStatusUpdatedEvent.Type, + GrpcTypeHelper.ConvertObjectToByteString(new AccountStatusUpdatedEvent + { + AccountId = evt.AccountId, + Status = status, + UpdatedAt = SystemClock.Instance.GetCurrentInstant() + }).ToByteArray() + ); + + logger.LogInformation("Broadcasted status update for user {AccountId}", evt.AccountId); + } + catch (Exception ex) + { + logger.LogError(ex, "Error processing WebSocket connected event"); + } + } + } + + private async Task HandleDisconnectedEventsAsync(CancellationToken stoppingToken) + { + await foreach (var msg in nats.SubscribeAsync("websocket_disconnected", + cancellationToken: stoppingToken)) + { + try + { + var evt = + GrpcTypeHelper.ConvertByteStringToObject(ByteString.CopyFrom(msg.Data)); + + logger.LogInformation( + "Received WebSocket disconnected event for user {AccountId}, device {DeviceId}, IsOffline: {IsOffline}", + evt.AccountId, evt.DeviceId, evt.IsOffline + ); + + await using var scope = serviceProvider.CreateAsyncScope(); + var accountEventService = scope.ServiceProvider.GetRequiredService(); + + var status = await accountEventService.GetStatus(evt.AccountId); + + await nats.PublishAsync( + AccountStatusUpdatedEvent.Type, + GrpcTypeHelper.ConvertObjectToByteString(new AccountStatusUpdatedEvent + { + AccountId = evt.AccountId, + Status = status, + UpdatedAt = SystemClock.Instance.GetCurrentInstant() + }).ToByteArray() + ); + + logger.LogInformation("Broadcasted status update for user {AccountId}", evt.AccountId); + } + catch (Exception ex) + { + logger.LogError(ex, "Error processing WebSocket disconnected event"); + } + } + } } diff --git a/DysonNetwork.Ring/Connection/WebSocketController.cs b/DysonNetwork.Ring/Connection/WebSocketController.cs index e5027f1..4e1eeac 100644 --- a/DysonNetwork.Ring/Connection/WebSocketController.cs +++ b/DysonNetwork.Ring/Connection/WebSocketController.cs @@ -1,7 +1,10 @@ using System.Net.WebSockets; using DysonNetwork.Shared.Proto; +using DysonNetwork.Shared.Stream; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; +using NATS.Client.Core; +using NATS.Net; using Swashbuckle.AspNetCore.Annotations; using WebSocketPacket = DysonNetwork.Shared.Models.WebSocketPacket; @@ -10,7 +13,8 @@ namespace DysonNetwork.Ring.Connection; [ApiController] public class WebSocketController( WebSocketService ws, - ILogger logger + ILogger logger, + INatsConnection nats ) : ControllerBase { [Route("/ws")] @@ -64,6 +68,18 @@ public class WebSocketController( logger.LogDebug( $"Connection established with user @{currentUser.Name}#{currentUser.Id} and device #{deviceId}"); + // Broadcast WebSocket connected event + await nats.PublishAsync( + WebSocketConnectedEvent.Type, + GrpcTypeHelper.ConvertObjectToByteString(new WebSocketConnectedEvent + { + AccountId = accountId, + DeviceId = deviceId, + IsOffline = false + }).ToByteArray(), + cancellationToken: cts.Token + ); + try { await _ConnectionEventLoop(deviceId, currentUser, webSocket, cts.Token); @@ -80,6 +96,19 @@ public class WebSocketController( finally { ws.Disconnect(connectionKey); + + // Broadcast WebSocket disconnected event + await nats.PublishAsync( + WebSocketDisconnectedEvent.Type, + GrpcTypeHelper.ConvertObjectToByteString(new WebSocketDisconnectedEvent + { + AccountId = accountId, + DeviceId = deviceId, + IsOffline = !WebSocketService.GetAccountIsConnected(accountId) + }).ToByteArray(), + cancellationToken: cts.Token + ); + logger.LogDebug( $"Connection disconnected with user @{currentUser.Name}#{currentUser.Id} and device #{deviceId}" ); diff --git a/DysonNetwork.Shared/Stream/AccountEvent.cs b/DysonNetwork.Shared/Stream/AccountEvent.cs index f1d12f7..310258c 100644 --- a/DysonNetwork.Shared/Stream/AccountEvent.cs +++ b/DysonNetwork.Shared/Stream/AccountEvent.cs @@ -1,3 +1,4 @@ +using DysonNetwork.Shared.Models; using NodaTime; namespace DysonNetwork.Shared.Stream; @@ -5,7 +6,16 @@ namespace DysonNetwork.Shared.Stream; public class AccountDeletedEvent { public static string Type => "account_deleted"; - + public Guid AccountId { get; set; } = Guid.NewGuid(); public Instant DeletedAt { get; set; } = SystemClock.Instance.GetCurrentInstant(); -} \ No newline at end of file +} + +public class AccountStatusUpdatedEvent +{ + public static string Type => "account_status_updated"; + + public Guid AccountId { get; set; } + public SnAccountStatus Status { get; set; } = new(); + public Instant UpdatedAt { get; set; } = SystemClock.Instance.GetCurrentInstant(); +} diff --git a/DysonNetwork.Shared/Stream/WebSocketPacketEvent.cs b/DysonNetwork.Shared/Stream/WebSocketPacketEvent.cs index 6fc79d4..4707c6d 100644 --- a/DysonNetwork.Shared/Stream/WebSocketPacketEvent.cs +++ b/DysonNetwork.Shared/Stream/WebSocketPacketEvent.cs @@ -1,3 +1,5 @@ +using NodaTime; + namespace DysonNetwork.Shared.Stream; public class WebSocketPacketEvent @@ -10,3 +12,23 @@ public class WebSocketPacketEvent public string DeviceId { get; set; } = null!; public byte[] PacketBytes { get; set; } = null!; } + +public class WebSocketConnectedEvent +{ + public static string Type => "websocket_connected"; + + public Guid AccountId { get; set; } + public string DeviceId { get; set; } = null!; + public Instant ConnectedAt { get; set; } = SystemClock.Instance.GetCurrentInstant(); + public bool IsOffline { get; set; } = false; +} + +public class WebSocketDisconnectedEvent +{ + public static string Type => "websocket_disconnected"; + + public Guid AccountId { get; set; } + public string DeviceId { get; set; } = null!; + public Instant DisconnectedAt { get; set; } = SystemClock.Instance.GetCurrentInstant(); + public bool IsOffline { get; set; } +} diff --git a/DysonNetwork.Sphere/Chat/ChatRoomController.cs b/DysonNetwork.Sphere/Chat/ChatRoomController.cs index 4e69267..54c5ec0 100644 --- a/DysonNetwork.Sphere/Chat/ChatRoomController.cs +++ b/DysonNetwork.Sphere/Chat/ChatRoomController.cs @@ -483,6 +483,36 @@ public class ChatRoomController( return Ok(await crs.LoadMemberAccount(member)); } + [HttpGet("{roomId:guid}/members/online")] + public async Task> GetOnlineUsersCount(Guid roomId) + { + var currentUser = HttpContext.Items["CurrentUser"] as Account; + + var room = await db.ChatRooms + .FirstOrDefaultAsync(r => r.Id == roomId); + if (room is null) return NotFound(); + + if (!room.IsPublic) + { + if (currentUser is null) return Unauthorized(); + var member = await db.ChatMembers + .FirstOrDefaultAsync(m => m.ChatRoomId == roomId && m.AccountId == Guid.Parse(currentUser.Id)); + if (member is null) return StatusCode(403, "You need to be a member to see online count of private chat room."); + } + + var members = await db.ChatMembers + .Where(m => m.ChatRoomId == roomId) + .Where(m => m.LeaveAt == null) + .Select(m => m.AccountId) + .ToListAsync(); + + var memberStatuses = await accountsHelper.GetAccountStatusBatch(members); + + var onlineCount = memberStatuses.Count(s => s.Value.IsOnline); + + return Ok(onlineCount); + } + [HttpGet("{roomId:guid}/members")] public async Task>> ListMembers(Guid roomId, [FromQuery] int take = 20, diff --git a/DysonNetwork.Sphere/Chat/ChatRoomService.cs b/DysonNetwork.Sphere/Chat/ChatRoomService.cs index d0bb770..8bf644e 100644 --- a/DysonNetwork.Sphere/Chat/ChatRoomService.cs +++ b/DysonNetwork.Sphere/Chat/ChatRoomService.cs @@ -167,20 +167,28 @@ public class ChatRoomService( public async Task SubscribeChatRoom(SnChatMember member) { - var cacheKey = ChatRoomSubscribeKeyPrefix + member.Id; + var cacheKey = $"{ChatRoomSubscribeKeyPrefix}{member.ChatRoomId}:{member.Id}"; await cache.SetAsync(cacheKey, true, TimeSpan.FromHours(1)); + await cache.AddToGroupAsync(cacheKey, $"chatroom:subscribers:{member.ChatRoomId}"); } public async Task UnsubscribeChatRoom(SnChatMember member) { - var cacheKey = ChatRoomSubscribeKeyPrefix + member.Id; + var cacheKey = $"{ChatRoomSubscribeKeyPrefix}{member.ChatRoomId}:{member.Id}"; await cache.RemoveAsync(cacheKey); } - public async Task IsSubscribedChatRoom(Guid memberId) + public async Task IsSubscribedChatRoom(Guid roomId, Guid memberId) { - var cacheKey = ChatRoomSubscribeKeyPrefix + memberId; + var cacheKey = $"{ChatRoomSubscribeKeyPrefix}{roomId}:{memberId}"; var result = await cache.GetAsync(cacheKey); return result ?? false; } + + public async Task> GetSubscribedMembers(Guid roomId) + { + var group = $"chatroom:subscribers:{roomId}"; + var keys = await cache.GetGroupKeysAsync(group); + return keys.Select(k => Guid.Parse(k.Split(':').Last())).ToList(); + } } diff --git a/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs b/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs index b8fb0f3..729e66e 100644 --- a/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs @@ -5,6 +5,7 @@ using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Stream; using DysonNetwork.Sphere.Chat; using DysonNetwork.Sphere.Post; +using Google.Protobuf; using Microsoft.EntityFrameworkCore; using NATS.Client.Core; using NATS.Client.JetStream.Models; @@ -39,8 +40,9 @@ public class BroadcastEventHandler( var paymentTask = HandlePaymentOrders(stoppingToken); var accountTask = HandleAccountDeletions(stoppingToken); var websocketTask = HandleWebSocketPackets(stoppingToken); + var accountStatusTask = HandleAccountStatusUpdates(stoppingToken); - await Task.WhenAll(paymentTask, accountTask, websocketTask); + await Task.WhenAll(paymentTask, accountTask, websocketTask, accountStatusTask); } private async Task HandlePaymentOrders(CancellationToken stoppingToken) @@ -340,6 +342,67 @@ public class BroadcastEventHandler( await crs.UnsubscribeChatRoom(sender); } + private async Task HandleAccountStatusUpdates(CancellationToken stoppingToken) + { + await foreach (var msg in nats.SubscribeAsync(AccountStatusUpdatedEvent.Type, cancellationToken: stoppingToken)) + { + try + { + var evt = GrpcTypeHelper.ConvertByteStringToObject(ByteString.CopyFrom(msg.Data)); + if (evt == null) + continue; + + logger.LogInformation("Account status updated: {AccountId}", evt.AccountId); + + await using var scope = serviceProvider.CreateAsyncScope(); + var db = scope.ServiceProvider.GetRequiredService(); + var chatRoomService = scope.ServiceProvider.GetRequiredService(); + + // Get user's joined chat rooms + var userRooms = await db.ChatMembers + .Where(m => m.AccountId == evt.AccountId && m.LeaveAt == null) + .Select(m => m.ChatRoomId) + .ToListAsync(cancellationToken: stoppingToken); + + // Send WebSocket packet to subscribed users per room + foreach (var roomId in userRooms) + { + var members = await chatRoomService.ListRoomMembers(roomId); + var subscribedMemberIds = await chatRoomService.GetSubscribedMembers(roomId); + var subscribedUsers = members + .Where(m => subscribedMemberIds.Contains(m.Id)) + .Select(m => m.AccountId.ToString()) + .ToList(); + + if (subscribedUsers.Count == 0) continue; + var packet = new WebSocketPacket + { + Type = "accounts.status.update", + Data = new Dictionary + { + ["status"] = evt.Status, + ["chat_room_id"] = roomId + } + }; + + var request = new PushWebSocketPacketToUsersRequest + { + Packet = packet.ToProtoValue() + }; + request.UserIds.AddRange(subscribedUsers); + + await pusher.PushWebSocketPacketToUsersAsync(request, cancellationToken: stoppingToken); + + logger.LogInformation("Sent status update for room {roomId} to {count} subscribed users", roomId, subscribedUsers.Count); + } + } + catch (Exception ex) + { + logger.LogError(ex, "Error processing AccountStatusUpdated"); + } + } + } + private async Task SendErrorResponse(WebSocketPacketEvent evt, string message) { await pusher.PushWebSocketPacketToDeviceAsync(new PushWebSocketPacketToDeviceRequest