diff --git a/DysonNetwork.Pass/Migrations/20250820170714_AddLevelingBonusMultiplier.Designer.cs b/DysonNetwork.Pass/Migrations/20250821093930_AddLevelingBonusMultiplier.Designer.cs similarity index 99% rename from DysonNetwork.Pass/Migrations/20250820170714_AddLevelingBonusMultiplier.Designer.cs rename to DysonNetwork.Pass/Migrations/20250821093930_AddLevelingBonusMultiplier.Designer.cs index 46ea391..a0657e2 100644 --- a/DysonNetwork.Pass/Migrations/20250820170714_AddLevelingBonusMultiplier.Designer.cs +++ b/DysonNetwork.Pass/Migrations/20250821093930_AddLevelingBonusMultiplier.Designer.cs @@ -19,7 +19,7 @@ using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; namespace DysonNetwork.Pass.Migrations { [DbContext(typeof(AppDatabase))] - [Migration("20250820170714_AddLevelingBonusMultiplier")] + [Migration("20250821093930_AddLevelingBonusMultiplier")] partial class AddLevelingBonusMultiplier { /// @@ -1108,8 +1108,8 @@ namespace DysonNetwork.Pass.Migrations .HasColumnType("uuid") .HasColumnName("account_id"); - b.Property("BonusMultiplier") - .HasColumnType("integer") + b.Property("BonusMultiplier") + .HasColumnType("double precision") .HasColumnName("bonus_multiplier"); b.Property("CreatedAt") diff --git a/DysonNetwork.Pass/Migrations/20250820170714_AddLevelingBonusMultiplier.cs b/DysonNetwork.Pass/Migrations/20250821093930_AddLevelingBonusMultiplier.cs similarity index 84% rename from DysonNetwork.Pass/Migrations/20250820170714_AddLevelingBonusMultiplier.cs rename to DysonNetwork.Pass/Migrations/20250821093930_AddLevelingBonusMultiplier.cs index 2813232..487dff6 100644 --- a/DysonNetwork.Pass/Migrations/20250820170714_AddLevelingBonusMultiplier.cs +++ b/DysonNetwork.Pass/Migrations/20250821093930_AddLevelingBonusMultiplier.cs @@ -10,12 +10,12 @@ namespace DysonNetwork.Pass.Migrations /// protected override void Up(MigrationBuilder migrationBuilder) { - migrationBuilder.AddColumn( + migrationBuilder.AddColumn( name: "bonus_multiplier", table: "experience_records", - type: "integer", + type: "double precision", nullable: false, - defaultValue: 0); + defaultValue: 0.0); } /// diff --git a/DysonNetwork.Pass/Migrations/AppDatabaseModelSnapshot.cs b/DysonNetwork.Pass/Migrations/AppDatabaseModelSnapshot.cs index 0b66a4a..a909b3d 100644 --- a/DysonNetwork.Pass/Migrations/AppDatabaseModelSnapshot.cs +++ b/DysonNetwork.Pass/Migrations/AppDatabaseModelSnapshot.cs @@ -1105,8 +1105,8 @@ namespace DysonNetwork.Pass.Migrations .HasColumnType("uuid") .HasColumnName("account_id"); - b.Property("BonusMultiplier") - .HasColumnType("integer") + b.Property("BonusMultiplier") + .HasColumnType("double precision") .HasColumnName("bonus_multiplier"); b.Property("CreatedAt") diff --git a/DysonNetwork.Pusher/Notification/PushService.cs b/DysonNetwork.Pusher/Notification/PushService.cs index b28103b..6986840 100644 --- a/DysonNetwork.Pusher/Notification/PushService.cs +++ b/DysonNetwork.Pusher/Notification/PushService.cs @@ -1,34 +1,29 @@ using CorePush.Apple; using CorePush.Firebase; using DysonNetwork.Pusher.Connection; +using DysonNetwork.Pusher.Services; using DysonNetwork.Shared.Cache; using DysonNetwork.Shared.Proto; using Microsoft.EntityFrameworkCore; using NodaTime; -using System.Threading.Channels; namespace DysonNetwork.Pusher.Notification; -public class PushService : IDisposable +public class PushService { private readonly AppDatabase _db; - private readonly FlushBufferService _fbs; private readonly WebSocketService _ws; + private readonly QueueService _queueService; private readonly ILogger _logger; private readonly FirebaseSender? _fcm; private readonly ApnSender? _apns; private readonly string? _apnsTopic; - private readonly Channel _channel; - private readonly int _maxConcurrency; - private readonly CancellationTokenSource _cts = new(); - private readonly List _workers = new(); - public PushService( IConfiguration config, AppDatabase db, - FlushBufferService fbs, WebSocketService ws, + QueueService queueService, IHttpClientFactory httpFactory, ILogger logger ) @@ -58,48 +53,11 @@ public class PushService : IDisposable } _db = db; - _fbs = fbs; _ws = ws; + _queueService = queueService; _logger = logger; - - // --- Concurrency & channel config --- - // Defaults: 8 workers, bounded capacity 2000 items. - _maxConcurrency = Math.Max(1, cfgSection.GetValue("MaxConcurrency") ?? 8); - var capacity = Math.Max(1, cfgSection.GetValue("ChannelCapacity") ?? 2000); - - _channel = Channel.CreateBounded(new BoundedChannelOptions(capacity) - { - SingleWriter = false, - SingleReader = false, - FullMode = BoundedChannelFullMode.Wait, // apply backpressure instead of dropping - AllowSynchronousContinuations = false - }); - - // Start background consumers - for (int i = 0; i < _maxConcurrency; i++) - { - _workers.Add(Task.Run(() => WorkerLoop(_cts.Token))); - } - - _logger.LogInformation("PushService initialized with {Workers} workers and capacity {Capacity}", _maxConcurrency, capacity); - } - - public void Dispose() - { - try - { - _channel.Writer.TryComplete(); - _cts.Cancel(); - } - catch { /* ignore */ } - - try - { - Task.WhenAll(_workers).Wait(TimeSpan.FromSeconds(5)); - } - catch { /* ignore */ } - - _cts.Dispose(); + + _logger.LogInformation("PushService initialized"); } public async Task UnsubscribeDevice(string deviceId) @@ -165,7 +123,7 @@ public class PushService : IDisposable { meta ??= []; if (title is null && subtitle is null && content is null) - throw new ArgumentException("Unable to send notification that completely empty."); + throw new ArgumentException("Unable to send notification that is completely empty."); if (actionUri is not null) meta["action_uri"] = actionUri; @@ -181,35 +139,57 @@ public class PushService : IDisposable }; if (save) - _fbs.Enqueue(notification); + { + _db.Notifications.Add(notification); + await _db.SaveChangesAsync(); + } if (!isSilent) - await DeliveryNotification(notification); // returns quickly (does NOT wait for APNS/FCM) + _ = _queueService.EnqueuePushNotification(notification, accountId, save); } - private async Task DeliveryNotification(Notification notification) + public async Task DeliverPushNotification(Notification notification, CancellationToken cancellationToken = default) { - _logger.LogInformation( - "Delivering notification: {NotificationTopic} #{NotificationId} with meta {NotificationMeta}", - notification.Topic, - notification.Id, - notification.Meta - ); - - // WS send: still immediate (fire-and-forget from caller perspective) - _ws.SendPacketToAccount(notification.AccountId.ToString(), new Connection.WebSocketPacket + try { - Type = "notifications.new", - Data = notification - }); + _logger.LogInformation( + "Delivering push notification: {NotificationTopic} with meta {NotificationMeta}", + notification.Topic, + notification.Meta + ); - // Query subscribers and enqueue push work (non-blocking to the HTTP request) - var subscribers = await _db.PushSubscriptions - .Where(s => s.AccountId == notification.AccountId) - .AsNoTracking() - .ToListAsync(); + // Get all push subscriptions for the account + var subscriptions = await _db.PushSubscriptions + .Where(s => s.AccountId == notification.AccountId) + .ToListAsync(cancellationToken); - await EnqueuePushWork(notification, subscribers); + if (!subscriptions.Any()) + { + _logger.LogInformation("No push subscriptions found for account {AccountId}", notification.AccountId); + return; + } + + // Send push notifications + var tasks = new List(); + foreach (var subscription in subscriptions) + { + try + { + tasks.Add(SendPushNotificationAsync(subscription, notification, cancellationToken)); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error sending push notification to {DeviceId}", subscription.DeviceId); + } + } + + await Task.WhenAll(tasks); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in DeliverPushNotification"); + throw; + } } public async Task MarkNotificationsViewed(ICollection notifications) @@ -235,20 +215,25 @@ public class PushService : IDisposable { if (save) { - accounts.ForEach(x => + var now = SystemClock.Instance.GetCurrentInstant(); + var notifications = accounts.Select(accountId => new Notification { - var newNotification = new Notification - { - Topic = notification.Topic, - Title = notification.Title, - Subtitle = notification.Subtitle, - Content = notification.Content, - Meta = notification.Meta, - Priority = notification.Priority, - AccountId = x - }; - _fbs.Enqueue(newNotification); - }); + Topic = notification.Topic, + Title = notification.Title, + Subtitle = notification.Subtitle, + Content = notification.Content, + Meta = notification.Meta, + Priority = notification.Priority, + AccountId = accountId, + CreatedAt = now, + UpdatedAt = now + }).ToList(); + + if (notifications.Count != 0) + { + await _db.Notifications.AddRangeAsync(notifications); + await _db.SaveChangesAsync(); + } } _logger.LogInformation( @@ -270,55 +255,15 @@ public class PushService : IDisposable } // Fetch all subscribers once and enqueue to workers - var subscribers = await _db.PushSubscriptions + var subscriptions = await _db.PushSubscriptions .Where(s => accounts.Contains(s.AccountId)) .AsNoTracking() .ToListAsync(); - await EnqueuePushWork(notification, subscribers); + await DeliverPushNotification(notification); } - private async Task EnqueuePushWork(Notification notification, IEnumerable subscriptions) - { - foreach (var sub in subscriptions) - { - // Use the current notification reference (no mutation of content after this point). - var item = new PushWorkItem(notification, sub); - - // Respect backpressure if channel is full. - await _channel.Writer.WriteAsync(item, _cts.Token); - } - } - - private async Task WorkerLoop(CancellationToken ct) - { - try - { - await foreach (var item in _channel.Reader.ReadAllAsync(ct)) - { - try - { - await _PushSingleNotification(item.Notification, item.Subscription); - } - catch (OperationCanceledException) when (ct.IsCancellationRequested) - { - break; - } - catch (Exception ex) - { - _logger.LogDebug(ex, "Worker handled exception for notification #{Id}", item.Notification.Id); - } - } - } - catch (OperationCanceledException) - { - // normal shutdown - } - } - - private readonly record struct PushWorkItem(Notification Notification, PushSubscription Subscription); - - private async Task _PushSingleNotification(Notification notification, PushSubscription subscription) + private async Task SendPushNotificationAsync(PushSubscription subscription, Notification notification, CancellationToken cancellationToken) { try { diff --git a/DysonNetwork.Pusher/Program.cs b/DysonNetwork.Pusher/Program.cs index d3134ca..7bfb0ba 100644 --- a/DysonNetwork.Pusher/Program.cs +++ b/DysonNetwork.Pusher/Program.cs @@ -3,6 +3,7 @@ using DysonNetwork.Pusher.Startup; using DysonNetwork.Shared.Auth; using DysonNetwork.Shared.Http; using DysonNetwork.Shared.Registry; +using DysonNetwork.Shared.Stream; using Microsoft.EntityFrameworkCore; var builder = WebApplication.CreateBuilder(args); @@ -12,6 +13,7 @@ builder.ConfigureAppKestrel(builder.Configuration); // Add application services builder.Services.AddRegistryService(builder.Configuration); +builder.Services.AddStreamConnection(builder.Configuration); builder.Services.AddAppServices(builder.Configuration); builder.Services.AddAppRateLimiting(); builder.Services.AddAppAuthentication(); diff --git a/DysonNetwork.Pusher/Services/PusherServiceGrpc.cs b/DysonNetwork.Pusher/Services/PusherServiceGrpc.cs index 64fe847..bf24b15 100644 --- a/DysonNetwork.Pusher/Services/PusherServiceGrpc.cs +++ b/DysonNetwork.Pusher/Services/PusherServiceGrpc.cs @@ -5,19 +5,21 @@ using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Registry; using Google.Protobuf.WellKnownTypes; using Grpc.Core; +using System.Text.Json; namespace DysonNetwork.Pusher.Services; public class PusherServiceGrpc( - EmailService emailService, + QueueService queueService, WebSocketService websocket, PushService pushService, - AccountClientHelper accountsHelper + AccountClientHelper accountsHelper, + EmailService emailService ) : PusherService.PusherServiceBase { public override async Task SendEmail(SendEmailRequest request, ServerCallContext context) { - await emailService.SendEmailAsync( + await queueService.EnqueueEmail( request.Email.ToName, request.Email.ToAddress, request.Email.Subject, @@ -47,13 +49,16 @@ public class PusherServiceGrpc( Data = GrpcTypeHelper.ConvertByteStringToObject>(request.Packet.Data), ErrorMessage = request.Packet.ErrorMessage }; + foreach (var userId in request.UserIds) + { websocket.SendPacketToAccount(userId, packet); - + } + return Task.FromResult(new Empty()); } - public override Task PushWebSocketPacketToDevice(PushWebSocketPacketToDeviceRequest request, +public override Task PushWebSocketPacketToDevice(PushWebSocketPacketToDeviceRequest request, ServerCallContext context) { var packet = new Connection.WebSocketPacket @@ -75,29 +80,38 @@ public class PusherServiceGrpc( Data = GrpcTypeHelper.ConvertByteStringToObject>(request.Packet.Data), ErrorMessage = request.Packet.ErrorMessage }; + foreach (var deviceId in request.DeviceIds) + { websocket.SendPacketToDevice(deviceId, packet); - + } + return Task.FromResult(new Empty()); } public override async Task SendPushNotificationToUser(SendPushNotificationToUserRequest request, ServerCallContext context) { - var account = await accountsHelper.GetAccount(Guid.Parse(request.UserId)); - await pushService.SendNotification( - account, - request.Notification.Topic, - request.Notification.Title, - request.Notification.Subtitle, - request.Notification.Body, - request.Notification.HasMeta + var notification = new Notification.Notification + { + Topic = request.Notification.Topic, + Title = request.Notification.Title, + Subtitle = request.Notification.Subtitle, + Content = request.Notification.Body, + Meta = request.Notification.HasMeta ? GrpcTypeHelper.ConvertByteStringToObject>(request.Notification.Meta) ?? [] - : [], - request.Notification.ActionUri, - request.Notification.IsSilent, + : [] + }; + + if (request.Notification.ActionUri is not null) + notification.Meta["action_uri"] = request.Notification.ActionUri; + + await queueService.EnqueuePushNotification( + notification, + Guid.Parse(request.UserId), request.Notification.IsSavable ); + return new Empty(); } @@ -114,10 +128,18 @@ public class PusherServiceGrpc( ? GrpcTypeHelper.ConvertByteStringToObject>(request.Notification.Meta) ?? [] : [], }; + if (request.Notification.ActionUri is not null) notification.Meta["action_uri"] = request.Notification.ActionUri; - var accounts = request.UserIds.Select(Guid.Parse).ToList(); - await pushService.SendNotificationBatch(notification, accounts, request.Notification.IsSavable); + + var tasks = request.UserIds + .Select(userId => queueService.EnqueuePushNotification( + notification, + Guid.Parse(userId), + request.Notification.IsSavable + )); + + await Task.WhenAll(tasks); return new Empty(); } diff --git a/DysonNetwork.Pusher/Services/QueueBackgroundService.cs b/DysonNetwork.Pusher/Services/QueueBackgroundService.cs new file mode 100644 index 0000000..5593b65 --- /dev/null +++ b/DysonNetwork.Pusher/Services/QueueBackgroundService.cs @@ -0,0 +1,135 @@ +using System.Text.Json; +using DysonNetwork.Pusher.Email; +using DysonNetwork.Pusher.Notification; +using DysonNetwork.Shared.Proto; +using DysonNetwork.Shared.Registry; +using NATS.Client.Core; + +namespace DysonNetwork.Pusher.Services; + +public class QueueBackgroundService( + INatsConnection nats, + IServiceProvider serviceProvider, + ILogger logger, + IConfiguration configuration +) + : BackgroundService +{ + private const string QueueName = "pusher.queue"; + private const string QueueGroup = "pusher.workers"; + private readonly int _consumerCount = configuration.GetValue("ConsumerCount") ?? Environment.ProcessorCount; + private readonly List _consumerTasks = []; + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + logger.LogInformation("Starting {ConsumerCount} queue consumers", _consumerCount); + + // Start multiple consumers + for (var i = 0; i < _consumerCount; i++) + _consumerTasks.Add(Task.Run(() => RunConsumerAsync(stoppingToken), stoppingToken)); + + // Wait for all consumers to complete + await Task.WhenAll(_consumerTasks); + } + + private async Task RunConsumerAsync(CancellationToken stoppingToken) + { + logger.LogInformation("Queue consumer started"); + + await foreach (var msg in nats.SubscribeAsync( + QueueName, + queueGroup: QueueGroup, + cancellationToken: stoppingToken)) + { + try + { + await ProcessMessageAsync(msg, stoppingToken); + } + catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) + { + // Normal shutdown + break; + } + catch (Exception ex) + { + logger.LogError(ex, "Error in queue consumer"); + // Add a small delay to prevent tight error loops + await Task.Delay(1000, stoppingToken); + } + } + } + + private async ValueTask ProcessMessageAsync(NatsMsg msg, CancellationToken cancellationToken) + { + using var scope = serviceProvider.CreateScope(); + var message = msg.Data; + + logger.LogDebug("Processing message of type {MessageType}", message.Type); + + try + { + switch (message.Type) + { + case QueueMessageType.Email: + await ProcessEmailMessageAsync(message, scope, cancellationToken); + break; + + case QueueMessageType.PushNotification: + await ProcessPushNotificationMessageAsync(message, scope, cancellationToken); + break; + + default: + logger.LogWarning("Unknown message type: {MessageType}", message.Type); + break; + } + + await msg.ReplyAsync(); + } + catch (Exception ex) + { + logger.LogError(ex, "Error processing message of type {MessageType}", message.Type); + // Don't rethrow to prevent the message from being retried indefinitely + // In a production scenario, you might want to implement a dead-letter queue + } + } + + private static async Task ProcessEmailMessageAsync(QueueMessage message, IServiceScope scope, + CancellationToken cancellationToken) + { + var emailService = scope.ServiceProvider.GetRequiredService(); + var emailMessage = JsonSerializer.Deserialize(message.Data) + ?? throw new InvalidOperationException("Invalid email message format"); + + await emailService.SendEmailAsync( + emailMessage.ToName, + emailMessage.ToAddress, + emailMessage.Subject, + emailMessage.Body); + } + + private static async Task ProcessPushNotificationMessageAsync(QueueMessage message, IServiceScope scope, + CancellationToken cancellationToken) + { + var pushService = scope.ServiceProvider.GetRequiredService(); + var logger = scope.ServiceProvider.GetRequiredService>(); + + var notification = JsonSerializer.Deserialize(message.Data); + if (notification == null) + { + logger.LogError("Invalid push notification data format"); + return; + } + + try + { + logger.LogDebug("Processing push notification for account {AccountId}", notification.AccountId); + await pushService.DeliverPushNotification(notification, cancellationToken); + logger.LogDebug("Successfully processed push notification for account {AccountId}", notification.AccountId); + } + catch (Exception ex) + { + logger.LogError(ex, "Error processing push notification for account {AccountId}", notification.AccountId); + // Don't rethrow to prevent the message from being retried indefinitely + } + } +} \ No newline at end of file diff --git a/DysonNetwork.Pusher/Services/QueueService.cs b/DysonNetwork.Pusher/Services/QueueService.cs new file mode 100644 index 0000000..95c21d9 --- /dev/null +++ b/DysonNetwork.Pusher/Services/QueueService.cs @@ -0,0 +1,61 @@ +using System.Text.Json; +using NATS.Client.Core; + +namespace DysonNetwork.Pusher.Services; + +public class QueueService(INatsConnection nats) +{ + private const string QueueName = "pusher_queue"; + + public async Task EnqueueEmail(string toName, string toAddress, string subject, string body) + { + var message = new QueueMessage + { + Type = QueueMessageType.Email, + Data = JsonSerializer.Serialize(new EmailMessage + { + ToName = toName, + ToAddress = toAddress, + Subject = subject, + Body = body + }) + }; + await nats.PublishAsync(QueueName, message); + } + + public async Task EnqueuePushNotification(Notification.Notification notification, Guid userId, bool isSavable = false) + { + // Update the account ID in case it wasn't set + notification.AccountId = userId; + + var message = new QueueMessage + { + Type = QueueMessageType.PushNotification, + TargetId = userId.ToString(), + Data = JsonSerializer.Serialize(notification) + }; + + await nats.PublishAsync(QueueName, message); + } +} + +public class QueueMessage +{ + public QueueMessageType Type { get; set; } + public string? TargetId { get; set; } + public string Data { get; set; } = string.Empty; +} + +public enum QueueMessageType +{ + Email, + PushNotification +} + +public class EmailMessage +{ + public string ToName { get; set; } = string.Empty; + public string ToAddress { get; set; } = string.Empty; + public string Subject { get; set; } = string.Empty; + public string Body { get; set; } = string.Empty; +} \ No newline at end of file diff --git a/DysonNetwork.Pusher/Startup/ServiceCollectionExtensions.cs b/DysonNetwork.Pusher/Startup/ServiceCollectionExtensions.cs index 8104d3e..ffefb67 100644 --- a/DysonNetwork.Pusher/Startup/ServiceCollectionExtensions.cs +++ b/DysonNetwork.Pusher/Startup/ServiceCollectionExtensions.cs @@ -137,6 +137,12 @@ public static class ServiceCollectionExtensions services.AddScoped(); services.AddScoped(); services.AddScoped(); + + // Register QueueService as a singleton since it's thread-safe + services.AddSingleton(); + + // Register the background service + services.AddHostedService(); return services; }