♻️ Refactored the queue

This commit is contained in:
2025-08-21 17:41:48 +08:00
parent 83c052ec4e
commit 4d1972bc99
9 changed files with 325 additions and 154 deletions

View File

@@ -19,7 +19,7 @@ using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
namespace DysonNetwork.Pass.Migrations namespace DysonNetwork.Pass.Migrations
{ {
[DbContext(typeof(AppDatabase))] [DbContext(typeof(AppDatabase))]
[Migration("20250820170714_AddLevelingBonusMultiplier")] [Migration("20250821093930_AddLevelingBonusMultiplier")]
partial class AddLevelingBonusMultiplier partial class AddLevelingBonusMultiplier
{ {
/// <inheritdoc /> /// <inheritdoc />
@@ -1108,8 +1108,8 @@ namespace DysonNetwork.Pass.Migrations
.HasColumnType("uuid") .HasColumnType("uuid")
.HasColumnName("account_id"); .HasColumnName("account_id");
b.Property<int>("BonusMultiplier") b.Property<double>("BonusMultiplier")
.HasColumnType("integer") .HasColumnType("double precision")
.HasColumnName("bonus_multiplier"); .HasColumnName("bonus_multiplier");
b.Property<Instant>("CreatedAt") b.Property<Instant>("CreatedAt")

View File

@@ -10,12 +10,12 @@ namespace DysonNetwork.Pass.Migrations
/// <inheritdoc /> /// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder) protected override void Up(MigrationBuilder migrationBuilder)
{ {
migrationBuilder.AddColumn<int>( migrationBuilder.AddColumn<double>(
name: "bonus_multiplier", name: "bonus_multiplier",
table: "experience_records", table: "experience_records",
type: "integer", type: "double precision",
nullable: false, nullable: false,
defaultValue: 0); defaultValue: 0.0);
} }
/// <inheritdoc /> /// <inheritdoc />

View File

@@ -1105,8 +1105,8 @@ namespace DysonNetwork.Pass.Migrations
.HasColumnType("uuid") .HasColumnType("uuid")
.HasColumnName("account_id"); .HasColumnName("account_id");
b.Property<int>("BonusMultiplier") b.Property<double>("BonusMultiplier")
.HasColumnType("integer") .HasColumnType("double precision")
.HasColumnName("bonus_multiplier"); .HasColumnName("bonus_multiplier");
b.Property<Instant>("CreatedAt") b.Property<Instant>("CreatedAt")

View File

@@ -1,34 +1,29 @@
using CorePush.Apple; using CorePush.Apple;
using CorePush.Firebase; using CorePush.Firebase;
using DysonNetwork.Pusher.Connection; using DysonNetwork.Pusher.Connection;
using DysonNetwork.Pusher.Services;
using DysonNetwork.Shared.Cache; using DysonNetwork.Shared.Cache;
using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Proto;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using NodaTime; using NodaTime;
using System.Threading.Channels;
namespace DysonNetwork.Pusher.Notification; namespace DysonNetwork.Pusher.Notification;
public class PushService : IDisposable public class PushService
{ {
private readonly AppDatabase _db; private readonly AppDatabase _db;
private readonly FlushBufferService _fbs;
private readonly WebSocketService _ws; private readonly WebSocketService _ws;
private readonly QueueService _queueService;
private readonly ILogger<PushService> _logger; private readonly ILogger<PushService> _logger;
private readonly FirebaseSender? _fcm; private readonly FirebaseSender? _fcm;
private readonly ApnSender? _apns; private readonly ApnSender? _apns;
private readonly string? _apnsTopic; private readonly string? _apnsTopic;
private readonly Channel<PushWorkItem> _channel;
private readonly int _maxConcurrency;
private readonly CancellationTokenSource _cts = new();
private readonly List<Task> _workers = new();
public PushService( public PushService(
IConfiguration config, IConfiguration config,
AppDatabase db, AppDatabase db,
FlushBufferService fbs,
WebSocketService ws, WebSocketService ws,
QueueService queueService,
IHttpClientFactory httpFactory, IHttpClientFactory httpFactory,
ILogger<PushService> logger ILogger<PushService> logger
) )
@@ -58,48 +53,11 @@ public class PushService : IDisposable
} }
_db = db; _db = db;
_fbs = fbs;
_ws = ws; _ws = ws;
_queueService = queueService;
_logger = logger; _logger = logger;
// --- Concurrency & channel config --- _logger.LogInformation("PushService initialized");
// Defaults: 8 workers, bounded capacity 2000 items.
_maxConcurrency = Math.Max(1, cfgSection.GetValue<int?>("MaxConcurrency") ?? 8);
var capacity = Math.Max(1, cfgSection.GetValue<int?>("ChannelCapacity") ?? 2000);
_channel = Channel.CreateBounded<PushWorkItem>(new BoundedChannelOptions(capacity)
{
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait, // apply backpressure instead of dropping
AllowSynchronousContinuations = false
});
// Start background consumers
for (int i = 0; i < _maxConcurrency; i++)
{
_workers.Add(Task.Run(() => WorkerLoop(_cts.Token)));
}
_logger.LogInformation("PushService initialized with {Workers} workers and capacity {Capacity}", _maxConcurrency, capacity);
}
public void Dispose()
{
try
{
_channel.Writer.TryComplete();
_cts.Cancel();
}
catch { /* ignore */ }
try
{
Task.WhenAll(_workers).Wait(TimeSpan.FromSeconds(5));
}
catch { /* ignore */ }
_cts.Dispose();
} }
public async Task UnsubscribeDevice(string deviceId) public async Task UnsubscribeDevice(string deviceId)
@@ -165,7 +123,7 @@ public class PushService : IDisposable
{ {
meta ??= []; meta ??= [];
if (title is null && subtitle is null && content is null) if (title is null && subtitle is null && content is null)
throw new ArgumentException("Unable to send notification that completely empty."); throw new ArgumentException("Unable to send notification that is completely empty.");
if (actionUri is not null) meta["action_uri"] = actionUri; if (actionUri is not null) meta["action_uri"] = actionUri;
@@ -181,35 +139,57 @@ public class PushService : IDisposable
}; };
if (save) if (save)
_fbs.Enqueue(notification); {
_db.Notifications.Add(notification);
await _db.SaveChangesAsync();
}
if (!isSilent) if (!isSilent)
await DeliveryNotification(notification); // returns quickly (does NOT wait for APNS/FCM) _ = _queueService.EnqueuePushNotification(notification, accountId, save);
} }
private async Task DeliveryNotification(Notification notification) public async Task DeliverPushNotification(Notification notification, CancellationToken cancellationToken = default)
{ {
_logger.LogInformation( try
"Delivering notification: {NotificationTopic} #{NotificationId} with meta {NotificationMeta}",
notification.Topic,
notification.Id,
notification.Meta
);
// WS send: still immediate (fire-and-forget from caller perspective)
_ws.SendPacketToAccount(notification.AccountId.ToString(), new Connection.WebSocketPacket
{ {
Type = "notifications.new", _logger.LogInformation(
Data = notification "Delivering push notification: {NotificationTopic} with meta {NotificationMeta}",
}); notification.Topic,
notification.Meta
);
// Query subscribers and enqueue push work (non-blocking to the HTTP request) // Get all push subscriptions for the account
var subscribers = await _db.PushSubscriptions var subscriptions = await _db.PushSubscriptions
.Where(s => s.AccountId == notification.AccountId) .Where(s => s.AccountId == notification.AccountId)
.AsNoTracking() .ToListAsync(cancellationToken);
.ToListAsync();
await EnqueuePushWork(notification, subscribers); if (!subscriptions.Any())
{
_logger.LogInformation("No push subscriptions found for account {AccountId}", notification.AccountId);
return;
}
// Send push notifications
var tasks = new List<Task>();
foreach (var subscription in subscriptions)
{
try
{
tasks.Add(SendPushNotificationAsync(subscription, notification, cancellationToken));
}
catch (Exception ex)
{
_logger.LogError(ex, "Error sending push notification to {DeviceId}", subscription.DeviceId);
}
}
await Task.WhenAll(tasks);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in DeliverPushNotification");
throw;
}
} }
public async Task MarkNotificationsViewed(ICollection<Notification> notifications) public async Task MarkNotificationsViewed(ICollection<Notification> notifications)
@@ -235,20 +215,25 @@ public class PushService : IDisposable
{ {
if (save) if (save)
{ {
accounts.ForEach(x => var now = SystemClock.Instance.GetCurrentInstant();
var notifications = accounts.Select(accountId => new Notification
{ {
var newNotification = new Notification Topic = notification.Topic,
{ Title = notification.Title,
Topic = notification.Topic, Subtitle = notification.Subtitle,
Title = notification.Title, Content = notification.Content,
Subtitle = notification.Subtitle, Meta = notification.Meta,
Content = notification.Content, Priority = notification.Priority,
Meta = notification.Meta, AccountId = accountId,
Priority = notification.Priority, CreatedAt = now,
AccountId = x UpdatedAt = now
}; }).ToList();
_fbs.Enqueue(newNotification);
}); if (notifications.Count != 0)
{
await _db.Notifications.AddRangeAsync(notifications);
await _db.SaveChangesAsync();
}
} }
_logger.LogInformation( _logger.LogInformation(
@@ -270,55 +255,15 @@ public class PushService : IDisposable
} }
// Fetch all subscribers once and enqueue to workers // Fetch all subscribers once and enqueue to workers
var subscribers = await _db.PushSubscriptions var subscriptions = await _db.PushSubscriptions
.Where(s => accounts.Contains(s.AccountId)) .Where(s => accounts.Contains(s.AccountId))
.AsNoTracking() .AsNoTracking()
.ToListAsync(); .ToListAsync();
await EnqueuePushWork(notification, subscribers); await DeliverPushNotification(notification);
} }
private async Task EnqueuePushWork(Notification notification, IEnumerable<PushSubscription> subscriptions) private async Task SendPushNotificationAsync(PushSubscription subscription, Notification notification, CancellationToken cancellationToken)
{
foreach (var sub in subscriptions)
{
// Use the current notification reference (no mutation of content after this point).
var item = new PushWorkItem(notification, sub);
// Respect backpressure if channel is full.
await _channel.Writer.WriteAsync(item, _cts.Token);
}
}
private async Task WorkerLoop(CancellationToken ct)
{
try
{
await foreach (var item in _channel.Reader.ReadAllAsync(ct))
{
try
{
await _PushSingleNotification(item.Notification, item.Subscription);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogDebug(ex, "Worker handled exception for notification #{Id}", item.Notification.Id);
}
}
}
catch (OperationCanceledException)
{
// normal shutdown
}
}
private readonly record struct PushWorkItem(Notification Notification, PushSubscription Subscription);
private async Task _PushSingleNotification(Notification notification, PushSubscription subscription)
{ {
try try
{ {

View File

@@ -3,6 +3,7 @@ using DysonNetwork.Pusher.Startup;
using DysonNetwork.Shared.Auth; using DysonNetwork.Shared.Auth;
using DysonNetwork.Shared.Http; using DysonNetwork.Shared.Http;
using DysonNetwork.Shared.Registry; using DysonNetwork.Shared.Registry;
using DysonNetwork.Shared.Stream;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
var builder = WebApplication.CreateBuilder(args); var builder = WebApplication.CreateBuilder(args);
@@ -12,6 +13,7 @@ builder.ConfigureAppKestrel(builder.Configuration);
// Add application services // Add application services
builder.Services.AddRegistryService(builder.Configuration); builder.Services.AddRegistryService(builder.Configuration);
builder.Services.AddStreamConnection(builder.Configuration);
builder.Services.AddAppServices(builder.Configuration); builder.Services.AddAppServices(builder.Configuration);
builder.Services.AddAppRateLimiting(); builder.Services.AddAppRateLimiting();
builder.Services.AddAppAuthentication(); builder.Services.AddAppAuthentication();

View File

@@ -5,19 +5,21 @@ using DysonNetwork.Shared.Proto;
using DysonNetwork.Shared.Registry; using DysonNetwork.Shared.Registry;
using Google.Protobuf.WellKnownTypes; using Google.Protobuf.WellKnownTypes;
using Grpc.Core; using Grpc.Core;
using System.Text.Json;
namespace DysonNetwork.Pusher.Services; namespace DysonNetwork.Pusher.Services;
public class PusherServiceGrpc( public class PusherServiceGrpc(
EmailService emailService, QueueService queueService,
WebSocketService websocket, WebSocketService websocket,
PushService pushService, PushService pushService,
AccountClientHelper accountsHelper AccountClientHelper accountsHelper,
EmailService emailService
) : PusherService.PusherServiceBase ) : PusherService.PusherServiceBase
{ {
public override async Task<Empty> SendEmail(SendEmailRequest request, ServerCallContext context) public override async Task<Empty> SendEmail(SendEmailRequest request, ServerCallContext context)
{ {
await emailService.SendEmailAsync( await queueService.EnqueueEmail(
request.Email.ToName, request.Email.ToName,
request.Email.ToAddress, request.Email.ToAddress,
request.Email.Subject, request.Email.Subject,
@@ -47,13 +49,16 @@ public class PusherServiceGrpc(
Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data), Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data),
ErrorMessage = request.Packet.ErrorMessage ErrorMessage = request.Packet.ErrorMessage
}; };
foreach (var userId in request.UserIds) foreach (var userId in request.UserIds)
{
websocket.SendPacketToAccount(userId, packet); websocket.SendPacketToAccount(userId, packet);
}
return Task.FromResult(new Empty()); return Task.FromResult(new Empty());
} }
public override Task<Empty> PushWebSocketPacketToDevice(PushWebSocketPacketToDeviceRequest request, public override Task<Empty> PushWebSocketPacketToDevice(PushWebSocketPacketToDeviceRequest request,
ServerCallContext context) ServerCallContext context)
{ {
var packet = new Connection.WebSocketPacket var packet = new Connection.WebSocketPacket
@@ -75,29 +80,38 @@ public class PusherServiceGrpc(
Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data), Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data),
ErrorMessage = request.Packet.ErrorMessage ErrorMessage = request.Packet.ErrorMessage
}; };
foreach (var deviceId in request.DeviceIds) foreach (var deviceId in request.DeviceIds)
{
websocket.SendPacketToDevice(deviceId, packet); websocket.SendPacketToDevice(deviceId, packet);
}
return Task.FromResult(new Empty()); return Task.FromResult(new Empty());
} }
public override async Task<Empty> SendPushNotificationToUser(SendPushNotificationToUserRequest request, public override async Task<Empty> SendPushNotificationToUser(SendPushNotificationToUserRequest request,
ServerCallContext context) ServerCallContext context)
{ {
var account = await accountsHelper.GetAccount(Guid.Parse(request.UserId)); var notification = new Notification.Notification
await pushService.SendNotification( {
account, Topic = request.Notification.Topic,
request.Notification.Topic, Title = request.Notification.Title,
request.Notification.Title, Subtitle = request.Notification.Subtitle,
request.Notification.Subtitle, Content = request.Notification.Body,
request.Notification.Body, Meta = request.Notification.HasMeta
request.Notification.HasMeta
? GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Notification.Meta) ?? [] ? GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Notification.Meta) ?? []
: [], : []
request.Notification.ActionUri, };
request.Notification.IsSilent,
if (request.Notification.ActionUri is not null)
notification.Meta["action_uri"] = request.Notification.ActionUri;
await queueService.EnqueuePushNotification(
notification,
Guid.Parse(request.UserId),
request.Notification.IsSavable request.Notification.IsSavable
); );
return new Empty(); return new Empty();
} }
@@ -114,10 +128,18 @@ public class PusherServiceGrpc(
? GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Notification.Meta) ?? [] ? GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Notification.Meta) ?? []
: [], : [],
}; };
if (request.Notification.ActionUri is not null) if (request.Notification.ActionUri is not null)
notification.Meta["action_uri"] = request.Notification.ActionUri; notification.Meta["action_uri"] = request.Notification.ActionUri;
var accounts = request.UserIds.Select(Guid.Parse).ToList();
await pushService.SendNotificationBatch(notification, accounts, request.Notification.IsSavable); var tasks = request.UserIds
.Select(userId => queueService.EnqueuePushNotification(
notification,
Guid.Parse(userId),
request.Notification.IsSavable
));
await Task.WhenAll(tasks);
return new Empty(); return new Empty();
} }

View File

@@ -0,0 +1,135 @@
using System.Text.Json;
using DysonNetwork.Pusher.Email;
using DysonNetwork.Pusher.Notification;
using DysonNetwork.Shared.Proto;
using DysonNetwork.Shared.Registry;
using NATS.Client.Core;
namespace DysonNetwork.Pusher.Services;
public class QueueBackgroundService(
INatsConnection nats,
IServiceProvider serviceProvider,
ILogger<QueueBackgroundService> logger,
IConfiguration configuration
)
: BackgroundService
{
private const string QueueName = "pusher.queue";
private const string QueueGroup = "pusher.workers";
private readonly int _consumerCount = configuration.GetValue<int?>("ConsumerCount") ?? Environment.ProcessorCount;
private readonly List<Task> _consumerTasks = [];
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("Starting {ConsumerCount} queue consumers", _consumerCount);
// Start multiple consumers
for (var i = 0; i < _consumerCount; i++)
_consumerTasks.Add(Task.Run(() => RunConsumerAsync(stoppingToken), stoppingToken));
// Wait for all consumers to complete
await Task.WhenAll(_consumerTasks);
}
private async Task RunConsumerAsync(CancellationToken stoppingToken)
{
logger.LogInformation("Queue consumer started");
await foreach (var msg in nats.SubscribeAsync<QueueMessage>(
QueueName,
queueGroup: QueueGroup,
cancellationToken: stoppingToken))
{
try
{
await ProcessMessageAsync(msg, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// Normal shutdown
break;
}
catch (Exception ex)
{
logger.LogError(ex, "Error in queue consumer");
// Add a small delay to prevent tight error loops
await Task.Delay(1000, stoppingToken);
}
}
}
private async ValueTask ProcessMessageAsync(NatsMsg<QueueMessage> msg, CancellationToken cancellationToken)
{
using var scope = serviceProvider.CreateScope();
var message = msg.Data;
logger.LogDebug("Processing message of type {MessageType}", message.Type);
try
{
switch (message.Type)
{
case QueueMessageType.Email:
await ProcessEmailMessageAsync(message, scope, cancellationToken);
break;
case QueueMessageType.PushNotification:
await ProcessPushNotificationMessageAsync(message, scope, cancellationToken);
break;
default:
logger.LogWarning("Unknown message type: {MessageType}", message.Type);
break;
}
await msg.ReplyAsync();
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing message of type {MessageType}", message.Type);
// Don't rethrow to prevent the message from being retried indefinitely
// In a production scenario, you might want to implement a dead-letter queue
}
}
private static async Task ProcessEmailMessageAsync(QueueMessage message, IServiceScope scope,
CancellationToken cancellationToken)
{
var emailService = scope.ServiceProvider.GetRequiredService<EmailService>();
var emailMessage = JsonSerializer.Deserialize<EmailMessage>(message.Data)
?? throw new InvalidOperationException("Invalid email message format");
await emailService.SendEmailAsync(
emailMessage.ToName,
emailMessage.ToAddress,
emailMessage.Subject,
emailMessage.Body);
}
private static async Task ProcessPushNotificationMessageAsync(QueueMessage message, IServiceScope scope,
CancellationToken cancellationToken)
{
var pushService = scope.ServiceProvider.GetRequiredService<PushService>();
var logger = scope.ServiceProvider.GetRequiredService<ILogger<QueueBackgroundService>>();
var notification = JsonSerializer.Deserialize<Notification.Notification>(message.Data);
if (notification == null)
{
logger.LogError("Invalid push notification data format");
return;
}
try
{
logger.LogDebug("Processing push notification for account {AccountId}", notification.AccountId);
await pushService.DeliverPushNotification(notification, cancellationToken);
logger.LogDebug("Successfully processed push notification for account {AccountId}", notification.AccountId);
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing push notification for account {AccountId}", notification.AccountId);
// Don't rethrow to prevent the message from being retried indefinitely
}
}
}

View File

@@ -0,0 +1,61 @@
using System.Text.Json;
using NATS.Client.Core;
namespace DysonNetwork.Pusher.Services;
public class QueueService(INatsConnection nats)
{
private const string QueueName = "pusher_queue";
public async Task EnqueueEmail(string toName, string toAddress, string subject, string body)
{
var message = new QueueMessage
{
Type = QueueMessageType.Email,
Data = JsonSerializer.Serialize(new EmailMessage
{
ToName = toName,
ToAddress = toAddress,
Subject = subject,
Body = body
})
};
await nats.PublishAsync(QueueName, message);
}
public async Task EnqueuePushNotification(Notification.Notification notification, Guid userId, bool isSavable = false)
{
// Update the account ID in case it wasn't set
notification.AccountId = userId;
var message = new QueueMessage
{
Type = QueueMessageType.PushNotification,
TargetId = userId.ToString(),
Data = JsonSerializer.Serialize(notification)
};
await nats.PublishAsync(QueueName, message);
}
}
public class QueueMessage
{
public QueueMessageType Type { get; set; }
public string? TargetId { get; set; }
public string Data { get; set; } = string.Empty;
}
public enum QueueMessageType
{
Email,
PushNotification
}
public class EmailMessage
{
public string ToName { get; set; } = string.Empty;
public string ToAddress { get; set; } = string.Empty;
public string Subject { get; set; } = string.Empty;
public string Body { get; set; } = string.Empty;
}

View File

@@ -137,6 +137,12 @@ public static class ServiceCollectionExtensions
services.AddScoped<WebSocketService>(); services.AddScoped<WebSocketService>();
services.AddScoped<EmailService>(); services.AddScoped<EmailService>();
services.AddScoped<PushService>(); services.AddScoped<PushService>();
// Register QueueService as a singleton since it's thread-safe
services.AddSingleton<QueueService>();
// Register the background service
services.AddHostedService<QueueBackgroundService>();
return services; return services;
} }