From eeb583d78dddbb9a1ce3be3e737141ab2ebdaa8c Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Fri, 2 Jan 2026 15:08:02 +0800 Subject: [PATCH] :recycle: Better event broadcast status changes --- .../Account/AccountEventService.cs | 91 ++++++++++++------- .../Startup/BroadcastEventHandler.cs | 63 +++++++++---- 2 files changed, 100 insertions(+), 54 deletions(-) diff --git a/DysonNetwork.Pass/Account/AccountEventService.cs b/DysonNetwork.Pass/Account/AccountEventService.cs index 3e94324..05a2c95 100644 --- a/DysonNetwork.Pass/Account/AccountEventService.cs +++ b/DysonNetwork.Pass/Account/AccountEventService.cs @@ -25,9 +25,10 @@ public class AccountEventService( { private static readonly Random Random = new(); private const string StatusCacheKey = "account:status:"; + private const string PreviousStatusCacheKey = "account:status:prev:"; private const string ActivityCacheKey = "account:activities:"; - public async Task GetAccountIsConnected(Guid userId) + private async Task GetAccountIsConnected(Guid userId) { var resp = await pusher.GetWebsocketConnectionStatusAsync( new GetWebsocketConnectionStatusRequest { UserId = userId.ToString() } @@ -49,6 +50,8 @@ public class AccountEventService( { var cacheKey = $"{StatusCacheKey}{userId}"; cache.RemoveAsync(cacheKey); + var prevCacheKey = $"{PreviousStatusCacheKey}{userId}"; + cache.RemoveAsync(prevCacheKey); } public void PurgeActivityCache(Guid userId) @@ -70,51 +73,71 @@ public class AccountEventService( ); } + private static bool StatusesEqual(SnAccountStatus a, SnAccountStatus b) + { + return a.Attitude == b.Attitude && + a.IsOnline == b.IsOnline && + a.IsCustomized == b.IsCustomized && + a.Label == b.Label && + a.IsInvisible == b.IsInvisible && + a.IsNotDisturb == b.IsNotDisturb; + } + public async Task GetStatus(Guid userId) { var cacheKey = $"{StatusCacheKey}{userId}"; var cachedStatus = await cache.GetAsync(cacheKey); + SnAccountStatus status; if (cachedStatus is not null) { cachedStatus!.IsOnline = !cachedStatus.IsInvisible && await GetAccountIsConnected(userId); - return cachedStatus; + status = cachedStatus; } - - var now = SystemClock.Instance.GetCurrentInstant(); - var status = await db.AccountStatuses - .Where(e => e.AccountId == userId) - .Where(e => e.ClearedAt == null || e.ClearedAt > now) - .OrderByDescending(e => e.CreatedAt) - .FirstOrDefaultAsync(); - var isOnline = await GetAccountIsConnected(userId); - if (status is not null) + else { - status.IsOnline = !status.IsInvisible && isOnline; - await cache.SetWithGroupsAsync(cacheKey, status, [$"{AccountService.AccountCachePrefix}{status.AccountId}"], - TimeSpan.FromMinutes(5)); - return status; - } - - if (isOnline) - { - return new SnAccountStatus + var now = SystemClock.Instance.GetCurrentInstant(); + status = await db.AccountStatuses + .Where(e => e.AccountId == userId) + .Where(e => e.ClearedAt == null || e.ClearedAt > now) + .OrderByDescending(e => e.CreatedAt) + .FirstOrDefaultAsync(); + var isOnline = await GetAccountIsConnected(userId); + if (status is not null) { - Attitude = Shared.Models.StatusAttitude.Neutral, - IsOnline = true, - IsCustomized = false, - Label = "Online", - AccountId = userId, - }; + status.IsOnline = !status.IsInvisible && isOnline; + await cache.SetWithGroupsAsync(cacheKey, status, [$"{AccountService.AccountCachePrefix}{status.AccountId}"], + TimeSpan.FromMinutes(5)); + } + else + { + if (isOnline) + { + status = new SnAccountStatus + { + Attitude = Shared.Models.StatusAttitude.Neutral, + IsOnline = true, + IsCustomized = false, + Label = "Online", + AccountId = userId, + }; + } + else + { + status = new SnAccountStatus + { + Attitude = Shared.Models.StatusAttitude.Neutral, + IsOnline = false, + IsCustomized = false, + Label = "Offline", + AccountId = userId, + }; + } + } } - return new SnAccountStatus - { - Attitude = Shared.Models.StatusAttitude.Neutral, - IsOnline = false, - IsCustomized = false, - Label = "Offline", - AccountId = userId, - }; + await cache.SetAsync($"{PreviousStatusCacheKey}{userId}", status, TimeSpan.FromMinutes(5)); + + return status; } public async Task> GetStatuses(List userIds) diff --git a/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs b/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs index ef48aef..847c9ca 100644 --- a/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs @@ -1,6 +1,7 @@ using System.Text.Json; using DysonNetwork.Pass.Account; using DysonNetwork.Pass.Wallet; +using DysonNetwork.Shared.Cache; using DysonNetwork.Shared.Models; using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Queue; @@ -18,6 +19,16 @@ public class BroadcastEventHandler( IServiceProvider serviceProvider ) : BackgroundService { + private static bool StatusesEqual(SnAccountStatus a, SnAccountStatus b) + { + return a.Attitude == b.Attitude && + a.IsOnline == b.IsOnline && + a.IsCustomized == b.IsCustomized && + a.Label == b.Label && + a.IsInvisible == b.IsInvisible && + a.IsNotDisturb == b.IsNotDisturb; + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var paymentTask = HandlePaymentEventsAsync(stoppingToken); @@ -159,26 +170,33 @@ public class BroadcastEventHandler( { var evt = GrpcTypeHelper.ConvertByteStringToObject(ByteString.CopyFrom(msg.Data)); + if (evt is null) continue; logger.LogInformation("Received WebSocket connected event for user {AccountId}, device {DeviceId}", evt.AccountId, evt.DeviceId); await using var scope = serviceProvider.CreateAsyncScope(); var accountEventService = scope.ServiceProvider.GetRequiredService(); + var cache = scope.ServiceProvider.GetRequiredService(); + var previous = await cache.GetAsync($"account:status:prev:{evt.AccountId}"); var status = await accountEventService.GetStatus(evt.AccountId); - await nats.PublishAsync( - AccountStatusUpdatedEvent.Type, - ByteString.CopyFromUtf8(JsonSerializer.Serialize(new AccountStatusUpdatedEvent - { - AccountId = evt.AccountId, - Status = status, - UpdatedAt = SystemClock.Instance.GetCurrentInstant() - }, GrpcTypeHelper.SerializerOptionsWithoutIgnore)).ToByteArray() - ); + if (previous != null && !StatusesEqual(previous, status)) + { + await nats.PublishAsync( + AccountStatusUpdatedEvent.Type, + ByteString.CopyFromUtf8(JsonSerializer.Serialize(new AccountStatusUpdatedEvent + { + AccountId = evt.AccountId, + Status = status, + UpdatedAt = SystemClock.Instance.GetCurrentInstant() + }, GrpcTypeHelper.SerializerOptionsWithoutIgnore)).ToByteArray(), + cancellationToken: stoppingToken + ); + } - logger.LogInformation("Broadcasted status update for user {AccountId}", evt.AccountId); + logger.LogInformation("Handled status update for user {AccountId} on connect", evt.AccountId); } catch (Exception ex) { @@ -204,20 +222,25 @@ public class BroadcastEventHandler( await using var scope = serviceProvider.CreateAsyncScope(); var accountEventService = scope.ServiceProvider.GetRequiredService(); + var cache = scope.ServiceProvider.GetRequiredService(); + var previous = await cache.GetAsync($"account:status:prev:{evt.AccountId}"); var status = await accountEventService.GetStatus(evt.AccountId); - await nats.PublishAsync( - AccountStatusUpdatedEvent.Type, - ByteString.CopyFromUtf8(JsonSerializer.Serialize(new AccountStatusUpdatedEvent - { - AccountId = evt.AccountId, - Status = status, - UpdatedAt = SystemClock.Instance.GetCurrentInstant() - }, GrpcTypeHelper.SerializerOptionsWithoutIgnore)).ToByteArray() - ); + if (previous != null && !StatusesEqual(previous, status)) + { + await nats.PublishAsync( + AccountStatusUpdatedEvent.Type, + ByteString.CopyFromUtf8(JsonSerializer.Serialize(new AccountStatusUpdatedEvent + { + AccountId = evt.AccountId, + Status = status, + UpdatedAt = SystemClock.Instance.GetCurrentInstant() + }, GrpcTypeHelper.SerializerOptionsWithoutIgnore)).ToByteArray() + ); + } - logger.LogInformation("Broadcasted status update for user {AccountId}", evt.AccountId); + logger.LogInformation("Handled status update for user {AccountId} on disconnect", evt.AccountId); } catch (Exception ex) {