♻️ Refactored presence update logic

This commit is contained in:
2025-11-04 22:13:19 +08:00
parent f271681b5d
commit 58e79655e8
8 changed files with 223 additions and 42 deletions

View File

@@ -7,7 +7,6 @@ using DysonNetwork.Shared.Stream;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Localization;
using NATS.Client.Core;
using NATS.Net;
using NodaTime;
using NodaTime.Extensions;
@@ -28,7 +27,7 @@ public class AccountEventService(
private const string StatusCacheKey = "account:status:";
private const string ActivityCacheKey = "account:activities:";
private async Task<bool> GetAccountIsConnected(Guid userId)
public async Task<bool> GetAccountIsConnected(Guid userId)
{
var resp = await pusher.GetWebsocketConnectionStatusAsync(
new GetWebsocketConnectionStatusRequest { UserId = userId.ToString() }
@@ -36,6 +35,16 @@ public class AccountEventService(
return resp.IsConnected;
}
public async Task<Dictionary<string, bool>> GetAccountIsConnectedBatch(List<Guid> userIds)
{
var req = new GetWebsocketConnectionStatusBatchRequest();
req.UsersId.AddRange(userIds.Select(u => u.ToString()));
var resp = await pusher.GetWebsocketConnectionStatusBatchAsync(
req
);
return resp.IsConnected.ToDictionary();
}
public void PurgeStatusCache(Guid userId)
{
var cacheKey = $"{StatusCacheKey}{userId}";
@@ -531,9 +540,9 @@ public class AccountEventService(
)
{
var now = SystemClock.Instance.GetCurrentInstant();
var activity = await db.PresenceActivities.FirstOrDefaultAsync(
e => e.ManualId == manualId && e.AccountId == userId && e.LeaseExpiresAt > now && e.DeletedAt == null
);
var activity = await db.PresenceActivities.FirstOrDefaultAsync(e =>
e.ManualId == manualId && e.AccountId == userId && e.LeaseExpiresAt > now && e.DeletedAt == null
);
if (activity == null)
return null;
@@ -558,8 +567,8 @@ public class AccountEventService(
public async Task<bool> DeleteActivityByManualId(string manualId, Guid userId)
{
var now = SystemClock.Instance.GetCurrentInstant();
var activity = await db.PresenceActivities.FirstOrDefaultAsync(
e => e.ManualId == manualId && e.AccountId == userId && e.LeaseExpiresAt > now && e.DeletedAt == null
var activity = await db.PresenceActivities.FirstOrDefaultAsync(e =>
e.ManualId == manualId && e.AccountId == userId && e.LeaseExpiresAt > now && e.DeletedAt == null
);
if (activity == null) return false;
if (activity.LeaseExpiresAt <= now)
@@ -600,4 +609,16 @@ public class AccountEventService(
PurgeActivityCache(activity.AccountId);
return true;
}
/// <summary>
/// Gets all user IDs that have Spotify connections
/// </summary>
public async Task<List<Guid>> GetSpotifyConnectedUsersAsync()
{
return await db.AccountConnections
.Where(c => c.Provider == "spotify" && c.AccessToken != null && c.RefreshToken != null)
.Select(c => c.AccountId)
.Distinct()
.ToListAsync();
}
}

View File

@@ -0,0 +1,21 @@
using DysonNetwork.Shared.Models;
namespace DysonNetwork.Pass.Account.Presences;
/// <summary>
/// Interface for presence services that can update user presence activities
/// </summary>
public interface IPresenceService
{
/// <summary>
/// The unique identifier for this presence service (e.g., "spotify", "discord")
/// </summary>
string ServiceId { get; }
/// <summary>
/// Updates presence activities for the specified users
/// </summary>
/// <param name="userIds">The user IDs to update presence for</param>
/// <returns>A task representing the asynchronous operation</returns>
Task UpdatePresencesAsync(IEnumerable<Guid> userIds);
}

View File

@@ -0,0 +1,112 @@
using Quartz;
namespace DysonNetwork.Pass.Account.Presences;
public class PresenceUpdateJob(
IEnumerable<IPresenceService> presenceServices,
AccountEventService accountEventService,
ILogger<PresenceUpdateJob> logger
) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
// Get the stage parameter from the job data
var stageString = context.JobDetail.JobDataMap.GetString("stage");
if (!Enum.TryParse<PresenceUpdateStage>(stageString, out var stage))
{
logger.LogError("Invalid or missing stage parameter: {Stage}", stageString);
return;
}
logger.LogInformation("Starting presence updates for stage: {Stage}", stage);
try
{
// Get users to update based on the stage
var userIds = await GetUsersForStageAsync(stage);
if (userIds.Count == 0)
{
logger.LogInformation("No users found for stage {Stage}", stage);
return;
}
logger.LogInformation("Found {UserCount} users for stage {Stage}", userIds.Count, stage);
// Update presence for each service
foreach (var presenceService in presenceServices)
{
try
{
await presenceService.UpdatePresencesAsync(userIds);
logger.LogInformation("Updated {ServiceId} presences for {UserCount} users in stage {Stage}",
presenceService.ServiceId, userIds.Count, stage);
}
catch (Exception ex)
{
logger.LogError(ex, "Error updating {ServiceId} presences for stage {Stage}",
presenceService.ServiceId, stage);
}
}
logger.LogInformation("Presence updates completed for stage {Stage}", stage);
}
catch (Exception ex)
{
logger.LogError(ex, "Error occurred during presence updates for stage {Stage}", stage);
}
}
private async Task<List<Guid>> GetUsersForStageAsync(PresenceUpdateStage stage)
{
// Get all users with presence connections
var allUserIds = await GetAllUsersWithPresenceConnectionsAsync();
if (!allUserIds.Any())
{
return new List<Guid>();
}
// Batch fetch online status for all users
var onlineStatuses = await accountEventService.GetAccountIsConnectedBatch(allUserIds);
var filteredUserIds = new List<Guid>();
foreach (var userId in allUserIds)
{
var userIdString = userId.ToString();
var isOnline = onlineStatuses.GetValueOrDefault(userIdString, false);
var activeActivities = await accountEventService.GetActiveActivities(userId);
var hasActivePresence = activeActivities.Any();
var shouldInclude = stage switch
{
PresenceUpdateStage.Active => isOnline && hasActivePresence,
PresenceUpdateStage.Maybe => isOnline && !hasActivePresence,
PresenceUpdateStage.Cold => !isOnline,
_ => false
};
if (shouldInclude)
{
filteredUserIds.Add(userId);
}
}
return filteredUserIds;
}
private async Task<List<Guid>> GetAllUsersWithPresenceConnectionsAsync()
{
// This method should return all users who have connections to any presence service
// For now, we'll focus on Spotify users, but this should be extended to include all presence services
// In a more complete implementation, you might want to query all presence services
// to get users with connections to any of them
// For simplicity, we'll return users with Spotify connections
// This should be made more generic in the future
var spotifyUsers = await accountEventService.GetSpotifyConnectedUsersAsync();
return spotifyUsers;
}
}

View File

@@ -0,0 +1,19 @@
namespace DysonNetwork.Pass.Account.Presences;
public enum PresenceUpdateStage
{
/// <summary>
/// Active users - online and have active presence activities
/// </summary>
Active,
/// <summary>
/// Maybe active users - online but no active presence activities
/// </summary>
Maybe,
/// <summary>
/// Cold users - offline users
/// </summary>
Cold
}

View File

@@ -10,15 +10,17 @@ public class SpotifyPresenceService(
Auth.OpenId.SpotifyOidcService spotifyService,
AccountEventService accountEventService,
ILogger<SpotifyPresenceService> logger
)
) : IPresenceService
{
/// <summary>
/// Updates presence activities for users who have Spotify connections and are currently playing music
/// </summary>
public async Task UpdateAllSpotifyPresencesAsync()
/// <inheritdoc />
public string ServiceId => "spotify";
/// <inheritdoc />
public async Task UpdatePresencesAsync(IEnumerable<Guid> userIds)
{
var userIdList = userIds.ToList();
var userConnections = await db.AccountConnections
.Where(c => c.Provider == "spotify" && c.AccessToken != null && c.RefreshToken != null)
.Where(c => userIdList.Contains(c.AccountId) && c.Provider == "spotify" && c.AccessToken != null && c.RefreshToken != null)
.Include(c => c.Account)
.ToListAsync();

View File

@@ -1,21 +0,0 @@
using Quartz;
namespace DysonNetwork.Pass.Account.Presences;
public class SpotifyPresenceUpdateJob(SpotifyPresenceService spotifyPresenceService, ILogger<SpotifyPresenceUpdateJob> logger) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
logger.LogInformation("Starting Spotify presence updates...");
try
{
await spotifyPresenceService.UpdateAllSpotifyPresencesAsync();
logger.LogInformation("Spotify presence updates completed successfully.");
}
catch (Exception ex)
{
logger.LogError(ex, "Error occurred during Spotify presence updates.");
}
}
}

View File

@@ -84,14 +84,38 @@ public static class ScheduledJobsConfiguration
.WithIdentity("SocialCreditValidationTrigger")
.WithCronSchedule("0 0 0 * * ?"));
var spotifyPresenceUpdateJob = new JobKey("SpotifyPresenceUpdate");
q.AddJob<SpotifyPresenceUpdateJob>(opts => opts.WithIdentity(spotifyPresenceUpdateJob));
// Presence update jobs for different user stages
var activePresenceUpdateJob = new JobKey("ActivePresenceUpdate");
q.AddJob<PresenceUpdateJob>(opts => opts.WithIdentity(activePresenceUpdateJob));
q.AddTrigger(opts => opts
.ForJob(spotifyPresenceUpdateJob)
.WithIdentity("SpotifyPresenceUpdateTrigger")
.ForJob(activePresenceUpdateJob)
.WithIdentity("ActivePresenceUpdateTrigger")
.WithSimpleSchedule(o => o
.WithIntervalInMinutes(2)
.WithIntervalInMinutes(1)
.RepeatForever())
.UsingJobData("stage", nameof(PresenceUpdateStage.Active))
);
var maybePresenceUpdateJob = new JobKey("MaybePresenceUpdate");
q.AddJob<PresenceUpdateJob>(opts => opts.WithIdentity(maybePresenceUpdateJob));
q.AddTrigger(opts => opts
.ForJob(maybePresenceUpdateJob)
.WithIdentity("MaybePresenceUpdateTrigger")
.WithSimpleSchedule(o => o
.WithIntervalInMinutes(3)
.RepeatForever())
.UsingJobData("stage", nameof(PresenceUpdateStage.Maybe))
);
var coldPresenceUpdateJob = new JobKey("ColdPresenceUpdate");
q.AddJob<PresenceUpdateJob>(opts => opts.WithIdentity(coldPresenceUpdateJob));
q.AddTrigger(opts => opts
.ForJob(coldPresenceUpdateJob)
.WithIdentity("ColdPresenceUpdateTrigger")
.WithSimpleSchedule(o => o
.WithIntervalInMinutes(10)
.RepeatForever())
.UsingJobData("stage", nameof(PresenceUpdateStage.Cold))
);
});
services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);

View File

@@ -10,12 +10,13 @@ using NodaTime;
using NodaTime.Serialization.SystemTextJson;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.RateLimiting;
using DysonNetwork.Pass.Account.Presences;
using DysonNetwork.Pass.Auth.OidcProvider.Options;
using DysonNetwork.Pass.Auth.OidcProvider.Services;
using DysonNetwork.Pass.Credit;
using DysonNetwork.Pass.Handlers;
using DysonNetwork.Pass.Leveling;
using DysonNetwork.Pass.Lotteries;
using DysonNetwork.Pass.Mailer;
using DysonNetwork.Pass.Realm;
using DysonNetwork.Pass.Safety;
@@ -144,7 +145,6 @@ public static class ServiceCollectionExtensions
services.AddScoped<ActionLogService>();
services.AddScoped<RelationshipService>();
services.AddScoped<MagicSpellService>();
services.AddScoped<DysonNetwork.Pass.Account.Presences.SpotifyPresenceService>();
services.AddScoped<AuthService>();
services.AddScoped<TokenAuthService>();
services.AddScoped<AccountUsernameService>();
@@ -156,7 +156,10 @@ public static class ServiceCollectionExtensions
services.AddScoped<SocialCreditService>();
services.AddScoped<ExperienceService>();
services.AddScoped<RealmService>();
services.AddScoped<Lotteries.LotteryService>();
services.AddScoped<LotteryService>();
services.AddScoped<SpotifyPresenceService>();
services.AddScoped<IPresenceService, SpotifyPresenceService>();
services.Configure<OidcProviderOptions>(configuration.GetSection("OidcProvider"));
services.AddScoped<OidcProviderService>();