diff --git a/DysonNetwork.Pusher/Notification/PushService.cs b/DysonNetwork.Pusher/Notification/PushService.cs index e011e4e..b28103b 100644 --- a/DysonNetwork.Pusher/Notification/PushService.cs +++ b/DysonNetwork.Pusher/Notification/PushService.cs @@ -5,10 +5,11 @@ using DysonNetwork.Shared.Cache; using DysonNetwork.Shared.Proto; using Microsoft.EntityFrameworkCore; using NodaTime; +using System.Threading.Channels; namespace DysonNetwork.Pusher.Notification; -public class PushService +public class PushService : IDisposable { private readonly AppDatabase _db; private readonly FlushBufferService _fbs; @@ -18,6 +19,11 @@ public class PushService private readonly ApnSender? _apns; private readonly string? _apnsTopic; + private readonly Channel _channel; + private readonly int _maxConcurrency; + private readonly CancellationTokenSource _cts = new(); + private readonly List _workers = new(); + public PushService( IConfiguration config, AppDatabase db, @@ -55,6 +61,45 @@ public class PushService _fbs = fbs; _ws = ws; _logger = logger; + + // --- Concurrency & channel config --- + // Defaults: 8 workers, bounded capacity 2000 items. + _maxConcurrency = Math.Max(1, cfgSection.GetValue("MaxConcurrency") ?? 8); + var capacity = Math.Max(1, cfgSection.GetValue("ChannelCapacity") ?? 2000); + + _channel = Channel.CreateBounded(new BoundedChannelOptions(capacity) + { + SingleWriter = false, + SingleReader = false, + FullMode = BoundedChannelFullMode.Wait, // apply backpressure instead of dropping + AllowSynchronousContinuations = false + }); + + // Start background consumers + for (int i = 0; i < _maxConcurrency; i++) + { + _workers.Add(Task.Run(() => WorkerLoop(_cts.Token))); + } + + _logger.LogInformation("PushService initialized with {Workers} workers and capacity {Capacity}", _maxConcurrency, capacity); + } + + public void Dispose() + { + try + { + _channel.Writer.TryComplete(); + _cts.Cancel(); + } + catch { /* ignore */ } + + try + { + Task.WhenAll(_workers).Wait(TimeSpan.FromSeconds(5)); + } + catch { /* ignore */ } + + _cts.Dispose(); } public async Task UnsubscribeDevice(string deviceId) @@ -82,7 +127,6 @@ public class PushService if (existingSubscription != null) { - // Update existing subscription existingSubscription.DeviceId = deviceId; existingSubscription.DeviceToken = deviceToken; existingSubscription.Provider = provider; @@ -93,7 +137,6 @@ public class PushService return existingSubscription; } - // Create new subscription var subscription = new PushSubscription { DeviceId = deviceId, @@ -140,7 +183,8 @@ public class PushService if (save) _fbs.Enqueue(notification); - if (!isSilent) await DeliveryNotification(notification); + if (!isSilent) + await DeliveryNotification(notification); // returns quickly (does NOT wait for APNS/FCM) } private async Task DeliveryNotification(Notification notification) @@ -152,18 +196,20 @@ public class PushService notification.Meta ); + // WS send: still immediate (fire-and-forget from caller perspective) _ws.SendPacketToAccount(notification.AccountId.ToString(), new Connection.WebSocketPacket { Type = "notifications.new", Data = notification }); - // Pushing the notification + // Query subscribers and enqueue push work (non-blocking to the HTTP request) var subscribers = await _db.PushSubscriptions .Where(s => s.AccountId == notification.AccountId) .AsNoTracking() .ToListAsync(); - await _PushNotification(notification, subscribers); + + await EnqueuePushWork(notification, subscribers); } public async Task MarkNotificationsViewed(ICollection notifications) @@ -174,8 +220,7 @@ public class PushService await _db.Notifications .Where(n => id.Contains(n.Id)) - .ExecuteUpdateAsync(s => s.SetProperty(n => n.ViewedAt, now) - ); + .ExecuteUpdateAsync(s => s.SetProperty(n => n.ViewedAt, now)); } public async Task MarkAllNotificationsViewed(Guid accountId) @@ -189,6 +234,7 @@ public class PushService public async Task SendNotificationBatch(Notification notification, List accounts, bool save = false) { if (save) + { accounts.ForEach(x => { var newNotification = new Notification @@ -203,6 +249,7 @@ public class PushService }; _fbs.Enqueue(newNotification); }); + } _logger.LogInformation( "Delivering notification in batch: {NotificationTopic} #{NotificationId} with meta {NotificationMeta}", @@ -211,9 +258,10 @@ public class PushService notification.Meta ); + // WS first foreach (var account in accounts) { - notification.AccountId = account; + notification.AccountId = account; // keep original behavior _ws.SendPacketToAccount(account.ToString(), new Connection.WebSocketPacket { Type = "notifications.new", @@ -221,25 +269,55 @@ public class PushService }); } + // Fetch all subscribers once and enqueue to workers var subscribers = await _db.PushSubscriptions .Where(s => accounts.Contains(s.AccountId)) .AsNoTracking() .ToListAsync(); - await _PushNotification(notification, subscribers); + + await EnqueuePushWork(notification, subscribers); } - private async Task _PushNotification( - Notification notification, - IEnumerable subscriptions - ) + private async Task EnqueuePushWork(Notification notification, IEnumerable subscriptions) { - var tasks = subscriptions - .Select(subscription => _PushSingleNotification(notification, subscription)) - .ToList(); + foreach (var sub in subscriptions) + { + // Use the current notification reference (no mutation of content after this point). + var item = new PushWorkItem(notification, sub); - await Task.WhenAll(tasks); + // Respect backpressure if channel is full. + await _channel.Writer.WriteAsync(item, _cts.Token); + } } + private async Task WorkerLoop(CancellationToken ct) + { + try + { + await foreach (var item in _channel.Reader.ReadAllAsync(ct)) + { + try + { + await _PushSingleNotification(item.Notification, item.Subscription); + } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + break; + } + catch (Exception ex) + { + _logger.LogDebug(ex, "Worker handled exception for notification #{Id}", item.Notification.Id); + } + } + } + catch (OperationCanceledException) + { + // normal shutdown + } + } + + private readonly record struct PushWorkItem(Notification Notification, PushSubscription Subscription); + private async Task _PushSingleNotification(Notification notification, PushSubscription subscription) { try @@ -272,6 +350,7 @@ public class PushService ["title"] = notification.Title ?? string.Empty, ["body"] = body }, + // You can re-enable data payloads if needed. // ["data"] = new Dictionary // { // ["Id"] = notification.Id, @@ -330,7 +409,7 @@ public class PushService { _logger.LogError(ex, $"Failed to push notification #{notification.Id} to device {subscription.DeviceId}. {ex.Message}"); - throw new Exception($"Failed to send notification to {subscription.Provider}: {ex.Message}", ex); + // Swallow here to keep worker alive; upstream is fire-and-forget. } _logger.LogInformation(