♻️ Better event broadcast status changes
This commit is contained in:
@@ -25,9 +25,10 @@ public class AccountEventService(
|
|||||||
{
|
{
|
||||||
private static readonly Random Random = new();
|
private static readonly Random Random = new();
|
||||||
private const string StatusCacheKey = "account:status:";
|
private const string StatusCacheKey = "account:status:";
|
||||||
|
private const string PreviousStatusCacheKey = "account:status:prev:";
|
||||||
private const string ActivityCacheKey = "account:activities:";
|
private const string ActivityCacheKey = "account:activities:";
|
||||||
|
|
||||||
public async Task<bool> GetAccountIsConnected(Guid userId)
|
private async Task<bool> GetAccountIsConnected(Guid userId)
|
||||||
{
|
{
|
||||||
var resp = await pusher.GetWebsocketConnectionStatusAsync(
|
var resp = await pusher.GetWebsocketConnectionStatusAsync(
|
||||||
new GetWebsocketConnectionStatusRequest { UserId = userId.ToString() }
|
new GetWebsocketConnectionStatusRequest { UserId = userId.ToString() }
|
||||||
@@ -49,6 +50,8 @@ public class AccountEventService(
|
|||||||
{
|
{
|
||||||
var cacheKey = $"{StatusCacheKey}{userId}";
|
var cacheKey = $"{StatusCacheKey}{userId}";
|
||||||
cache.RemoveAsync(cacheKey);
|
cache.RemoveAsync(cacheKey);
|
||||||
|
var prevCacheKey = $"{PreviousStatusCacheKey}{userId}";
|
||||||
|
cache.RemoveAsync(prevCacheKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void PurgeActivityCache(Guid userId)
|
public void PurgeActivityCache(Guid userId)
|
||||||
@@ -70,18 +73,30 @@ public class AccountEventService(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static bool StatusesEqual(SnAccountStatus a, SnAccountStatus b)
|
||||||
|
{
|
||||||
|
return a.Attitude == b.Attitude &&
|
||||||
|
a.IsOnline == b.IsOnline &&
|
||||||
|
a.IsCustomized == b.IsCustomized &&
|
||||||
|
a.Label == b.Label &&
|
||||||
|
a.IsInvisible == b.IsInvisible &&
|
||||||
|
a.IsNotDisturb == b.IsNotDisturb;
|
||||||
|
}
|
||||||
|
|
||||||
public async Task<SnAccountStatus> GetStatus(Guid userId)
|
public async Task<SnAccountStatus> GetStatus(Guid userId)
|
||||||
{
|
{
|
||||||
var cacheKey = $"{StatusCacheKey}{userId}";
|
var cacheKey = $"{StatusCacheKey}{userId}";
|
||||||
var cachedStatus = await cache.GetAsync<SnAccountStatus>(cacheKey);
|
var cachedStatus = await cache.GetAsync<SnAccountStatus>(cacheKey);
|
||||||
|
SnAccountStatus status;
|
||||||
if (cachedStatus is not null)
|
if (cachedStatus is not null)
|
||||||
{
|
{
|
||||||
cachedStatus!.IsOnline = !cachedStatus.IsInvisible && await GetAccountIsConnected(userId);
|
cachedStatus!.IsOnline = !cachedStatus.IsInvisible && await GetAccountIsConnected(userId);
|
||||||
return cachedStatus;
|
status = cachedStatus;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
var now = SystemClock.Instance.GetCurrentInstant();
|
var now = SystemClock.Instance.GetCurrentInstant();
|
||||||
var status = await db.AccountStatuses
|
status = await db.AccountStatuses
|
||||||
.Where(e => e.AccountId == userId)
|
.Where(e => e.AccountId == userId)
|
||||||
.Where(e => e.ClearedAt == null || e.ClearedAt > now)
|
.Where(e => e.ClearedAt == null || e.ClearedAt > now)
|
||||||
.OrderByDescending(e => e.CreatedAt)
|
.OrderByDescending(e => e.CreatedAt)
|
||||||
@@ -92,12 +107,12 @@ public class AccountEventService(
|
|||||||
status.IsOnline = !status.IsInvisible && isOnline;
|
status.IsOnline = !status.IsInvisible && isOnline;
|
||||||
await cache.SetWithGroupsAsync(cacheKey, status, [$"{AccountService.AccountCachePrefix}{status.AccountId}"],
|
await cache.SetWithGroupsAsync(cacheKey, status, [$"{AccountService.AccountCachePrefix}{status.AccountId}"],
|
||||||
TimeSpan.FromMinutes(5));
|
TimeSpan.FromMinutes(5));
|
||||||
return status;
|
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
if (isOnline)
|
if (isOnline)
|
||||||
{
|
{
|
||||||
return new SnAccountStatus
|
status = new SnAccountStatus
|
||||||
{
|
{
|
||||||
Attitude = Shared.Models.StatusAttitude.Neutral,
|
Attitude = Shared.Models.StatusAttitude.Neutral,
|
||||||
IsOnline = true,
|
IsOnline = true,
|
||||||
@@ -106,8 +121,9 @@ public class AccountEventService(
|
|||||||
AccountId = userId,
|
AccountId = userId,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
else
|
||||||
return new SnAccountStatus
|
{
|
||||||
|
status = new SnAccountStatus
|
||||||
{
|
{
|
||||||
Attitude = Shared.Models.StatusAttitude.Neutral,
|
Attitude = Shared.Models.StatusAttitude.Neutral,
|
||||||
IsOnline = false,
|
IsOnline = false,
|
||||||
@@ -116,6 +132,13 @@ public class AccountEventService(
|
|||||||
AccountId = userId,
|
AccountId = userId,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await cache.SetAsync($"{PreviousStatusCacheKey}{userId}", status, TimeSpan.FromMinutes(5));
|
||||||
|
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
public async Task<Dictionary<Guid, SnAccountStatus>> GetStatuses(List<Guid> userIds)
|
public async Task<Dictionary<Guid, SnAccountStatus>> GetStatuses(List<Guid> userIds)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using DysonNetwork.Pass.Account;
|
using DysonNetwork.Pass.Account;
|
||||||
using DysonNetwork.Pass.Wallet;
|
using DysonNetwork.Pass.Wallet;
|
||||||
|
using DysonNetwork.Shared.Cache;
|
||||||
using DysonNetwork.Shared.Models;
|
using DysonNetwork.Shared.Models;
|
||||||
using DysonNetwork.Shared.Proto;
|
using DysonNetwork.Shared.Proto;
|
||||||
using DysonNetwork.Shared.Queue;
|
using DysonNetwork.Shared.Queue;
|
||||||
@@ -18,6 +19,16 @@ public class BroadcastEventHandler(
|
|||||||
IServiceProvider serviceProvider
|
IServiceProvider serviceProvider
|
||||||
) : BackgroundService
|
) : BackgroundService
|
||||||
{
|
{
|
||||||
|
private static bool StatusesEqual(SnAccountStatus a, SnAccountStatus b)
|
||||||
|
{
|
||||||
|
return a.Attitude == b.Attitude &&
|
||||||
|
a.IsOnline == b.IsOnline &&
|
||||||
|
a.IsCustomized == b.IsCustomized &&
|
||||||
|
a.Label == b.Label &&
|
||||||
|
a.IsInvisible == b.IsInvisible &&
|
||||||
|
a.IsNotDisturb == b.IsNotDisturb;
|
||||||
|
}
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
{
|
{
|
||||||
var paymentTask = HandlePaymentEventsAsync(stoppingToken);
|
var paymentTask = HandlePaymentEventsAsync(stoppingToken);
|
||||||
@@ -159,15 +170,20 @@ public class BroadcastEventHandler(
|
|||||||
{
|
{
|
||||||
var evt =
|
var evt =
|
||||||
GrpcTypeHelper.ConvertByteStringToObject<WebSocketConnectedEvent>(ByteString.CopyFrom(msg.Data));
|
GrpcTypeHelper.ConvertByteStringToObject<WebSocketConnectedEvent>(ByteString.CopyFrom(msg.Data));
|
||||||
|
if (evt is null) continue;
|
||||||
|
|
||||||
logger.LogInformation("Received WebSocket connected event for user {AccountId}, device {DeviceId}",
|
logger.LogInformation("Received WebSocket connected event for user {AccountId}, device {DeviceId}",
|
||||||
evt.AccountId, evt.DeviceId);
|
evt.AccountId, evt.DeviceId);
|
||||||
|
|
||||||
await using var scope = serviceProvider.CreateAsyncScope();
|
await using var scope = serviceProvider.CreateAsyncScope();
|
||||||
var accountEventService = scope.ServiceProvider.GetRequiredService<AccountEventService>();
|
var accountEventService = scope.ServiceProvider.GetRequiredService<AccountEventService>();
|
||||||
|
var cache = scope.ServiceProvider.GetRequiredService<ICacheService>();
|
||||||
|
|
||||||
|
var previous = await cache.GetAsync<SnAccountStatus>($"account:status:prev:{evt.AccountId}");
|
||||||
var status = await accountEventService.GetStatus(evt.AccountId);
|
var status = await accountEventService.GetStatus(evt.AccountId);
|
||||||
|
|
||||||
|
if (previous != null && !StatusesEqual(previous, status))
|
||||||
|
{
|
||||||
await nats.PublishAsync(
|
await nats.PublishAsync(
|
||||||
AccountStatusUpdatedEvent.Type,
|
AccountStatusUpdatedEvent.Type,
|
||||||
ByteString.CopyFromUtf8(JsonSerializer.Serialize(new AccountStatusUpdatedEvent
|
ByteString.CopyFromUtf8(JsonSerializer.Serialize(new AccountStatusUpdatedEvent
|
||||||
@@ -175,10 +191,12 @@ public class BroadcastEventHandler(
|
|||||||
AccountId = evt.AccountId,
|
AccountId = evt.AccountId,
|
||||||
Status = status,
|
Status = status,
|
||||||
UpdatedAt = SystemClock.Instance.GetCurrentInstant()
|
UpdatedAt = SystemClock.Instance.GetCurrentInstant()
|
||||||
}, GrpcTypeHelper.SerializerOptionsWithoutIgnore)).ToByteArray()
|
}, GrpcTypeHelper.SerializerOptionsWithoutIgnore)).ToByteArray(),
|
||||||
|
cancellationToken: stoppingToken
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
logger.LogInformation("Broadcasted status update for user {AccountId}", evt.AccountId);
|
logger.LogInformation("Handled status update for user {AccountId} on connect", evt.AccountId);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -204,9 +222,13 @@ public class BroadcastEventHandler(
|
|||||||
|
|
||||||
await using var scope = serviceProvider.CreateAsyncScope();
|
await using var scope = serviceProvider.CreateAsyncScope();
|
||||||
var accountEventService = scope.ServiceProvider.GetRequiredService<AccountEventService>();
|
var accountEventService = scope.ServiceProvider.GetRequiredService<AccountEventService>();
|
||||||
|
var cache = scope.ServiceProvider.GetRequiredService<ICacheService>();
|
||||||
|
|
||||||
|
var previous = await cache.GetAsync<SnAccountStatus>($"account:status:prev:{evt.AccountId}");
|
||||||
var status = await accountEventService.GetStatus(evt.AccountId);
|
var status = await accountEventService.GetStatus(evt.AccountId);
|
||||||
|
|
||||||
|
if (previous != null && !StatusesEqual(previous, status))
|
||||||
|
{
|
||||||
await nats.PublishAsync(
|
await nats.PublishAsync(
|
||||||
AccountStatusUpdatedEvent.Type,
|
AccountStatusUpdatedEvent.Type,
|
||||||
ByteString.CopyFromUtf8(JsonSerializer.Serialize(new AccountStatusUpdatedEvent
|
ByteString.CopyFromUtf8(JsonSerializer.Serialize(new AccountStatusUpdatedEvent
|
||||||
@@ -216,8 +238,9 @@ public class BroadcastEventHandler(
|
|||||||
UpdatedAt = SystemClock.Instance.GetCurrentInstant()
|
UpdatedAt = SystemClock.Instance.GetCurrentInstant()
|
||||||
}, GrpcTypeHelper.SerializerOptionsWithoutIgnore)).ToByteArray()
|
}, GrpcTypeHelper.SerializerOptionsWithoutIgnore)).ToByteArray()
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
logger.LogInformation("Broadcasted status update for user {AccountId}", evt.AccountId);
|
logger.LogInformation("Handled status update for user {AccountId} on disconnect", evt.AccountId);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user