using CorePush.Apple;
using CorePush.Firebase;
using DysonNetwork.Pusher.Connection;
using DysonNetwork.Shared.Cache;
using DysonNetwork.Shared.Proto;
using Microsoft.EntityFrameworkCore;
using NodaTime;
using System.Threading.Channels;

namespace DysonNetwork.Pusher.Notification;

/// <summary>
/// Manages device push subscriptions and delivers notifications: every notification is sent
/// over WebSocket immediately, while provider pushes (FCM / APNs) are queued on a bounded
/// channel and drained by a fixed set of background worker tasks.
/// </summary>
public class PushService : IDisposable
{
    private readonly AppDatabase _db;
    private readonly FlushBufferService _fbs;
    private readonly WebSocketService _ws;
    private readonly ILogger<PushService> _logger;

    // Provider senders stay null when their credential file is not configured or missing.
    private readonly FirebaseSender? _fcm;
    private readonly ApnSender? _apns;
    private readonly string? _apnsTopic;

    // Work queue shared by all producers (request paths) and consumers (workers).
    private readonly Channel<PushWorkItem> _channel;
    private readonly int _maxConcurrency;
    private readonly CancellationTokenSource _cts = new();
    private readonly List<Task> _workers = new();

    /// <summary>
    /// Reads the <c>Notifications:Push</c> configuration section, creates the FCM / APNs senders
    /// when their credential files exist, and starts the background push workers.
    /// </summary>
    /// <param name="config">Application configuration containing the <c>Notifications:Push</c> section.</param>
    /// <param name="db">Database context used for subscriptions and notifications.</param>
    /// <param name="fbs">Buffer that notifications are enqueued to when they should be saved.</param>
    /// <param name="ws">WebSocket service used for immediate in-app delivery.</param>
    /// <param name="httpFactory">Factory for the HTTP clients handed to the provider senders.</param>
    /// <param name="logger">Logger for this service.</param>
    public PushService(
        IConfiguration config,
        AppDatabase db,
        FlushBufferService fbs,
        WebSocketService ws,
        IHttpClientFactory httpFactory,
        ILogger<PushService> logger
    )
    {
        var cfgSection = config.GetSection("Notifications:Push");

        // Set up Firebase Cloud Messaging
        var fcmConfig = cfgSection.GetValue<string>("Google");
        if (fcmConfig != null && File.Exists(fcmConfig))
            _fcm = new FirebaseSender(File.ReadAllText(fcmConfig), httpFactory.CreateClient());

        // Set up Apple Push Notification Service
        var apnsKeyPath = cfgSection.GetValue<string>("Apple:PrivateKey");
        if (apnsKeyPath != null && File.Exists(apnsKeyPath))
        {
            _apns = new ApnSender(new ApnSettings
            {
                P8PrivateKey = File.ReadAllText(apnsKeyPath),
                P8PrivateKeyId = cfgSection.GetValue<string>("Apple:PrivateKeyId"),
                TeamId = cfgSection.GetValue<string>("Apple:TeamId"),
                AppBundleIdentifier = cfgSection.GetValue<string>("Apple:BundleIdentifier"),
                ServerType = cfgSection.GetValue<bool>("Production")
                    ? ApnServerType.Production
                    : ApnServerType.Development
            }, httpFactory.CreateClient());
            _apnsTopic = cfgSection.GetValue<string>("Apple:BundleIdentifier");
        }

        _db = db;
        _fbs = fbs;
        _ws = ws;
        _logger = logger;

        // --- Concurrency & channel config ---
        // Defaults: 8 workers, bounded capacity 2000 items.
        _maxConcurrency = Math.Max(1, cfgSection.GetValue<int?>("MaxConcurrency") ?? 8);
        var capacity = Math.Max(1, cfgSection.GetValue<int?>("ChannelCapacity") ?? 2000);

        _channel = Channel.CreateBounded<PushWorkItem>(new BoundedChannelOptions(capacity)
        {
            SingleWriter = false,
            SingleReader = false,
            FullMode = BoundedChannelFullMode.Wait, // apply backpressure instead of dropping
            AllowSynchronousContinuations = false
        });

        // Start background consumers
        for (int i = 0; i < _maxConcurrency; i++)
        {
            _workers.Add(Task.Run(() => WorkerLoop(_cts.Token)));
        }

        _logger.LogInformation("PushService initialized with {Workers} workers and capacity {Capacity}",
            _maxConcurrency, capacity);
    }

    /// <summary>
    /// Stops accepting work, cancels the workers, waits up to five seconds for them to finish and
    /// releases the cancellation source. Cancellation is requested right after the channel is
    /// completed, so items still queued at that moment may be left unsent.
    /// </summary>
    public void Dispose()
    {
        try
        {
            _channel.Writer.TryComplete();
            _cts.Cancel();
        }
        catch
        {
            /* ignore: Dispose must not throw (e.g. when called a second time) */
        }

        try
        {
            Task.WhenAll(_workers).Wait(TimeSpan.FromSeconds(5));
        }
        catch
        {
            /* ignore: worker faults/cancellation are irrelevant during shutdown */
        }

        _cts.Dispose();
    }

    /// <summary>
    /// Deletes every push subscription registered for the given device id.
    /// </summary>
    /// <param name="deviceId">Identifier of the device to unsubscribe.</param>
    public async Task UnsubscribeDevice(string deviceId)
    {
        await _db.PushSubscriptions
            .Where(s => s.DeviceId == deviceId)
            .ExecuteDeleteAsync();
    }

    /// <summary>
    /// Creates a push subscription for the account, or updates the account's existing one when a
    /// subscription with the same device id or device token is already stored.
    /// </summary>
    /// <param name="deviceId">Identifier of the device.</param>
    /// <param name="deviceToken">Provider-issued token used to address the device.</param>
    /// <param name="provider">Push provider the token belongs to.</param>
    /// <param name="account">Owner of the subscription; its <c>Id</c> must be a parseable GUID.</param>
    /// <returns>The updated or newly created subscription.</returns>
    public async Task<PushSubscription> SubscribeDevice(
        string deviceId,
        string deviceToken,
        PushProvider provider,
        Account account
    )
    {
        var now = SystemClock.Instance.GetCurrentInstant();
        var accountId = Guid.Parse(account.Id!);

        // Check for existing subscription with same device ID or token
        var existingSubscription = await _db.PushSubscriptions
            .Where(s => s.AccountId == accountId)
            .Where(s => s.DeviceId == deviceId || s.DeviceToken == deviceToken)
            .FirstOrDefaultAsync();

        if (existingSubscription != null)
        {
            existingSubscription.DeviceId = deviceId;
            existingSubscription.DeviceToken = deviceToken;
            existingSubscription.Provider = provider;
            existingSubscription.UpdatedAt = now;

            _db.Update(existingSubscription);
            await _db.SaveChangesAsync();
            return existingSubscription;
        }

        var subscription = new PushSubscription
        {
            DeviceId = deviceId,
            DeviceToken = deviceToken,
            Provider = provider,
            AccountId = accountId,
            CreatedAt = now,
            UpdatedAt = now
        };

        _db.PushSubscriptions.Add(subscription);
        await _db.SaveChangesAsync();

        return subscription;
    }

    /// <summary>
    /// Builds a notification for a single account, optionally enqueues it to the flush buffer and,
    /// unless silent, delivers it over WebSocket and queues the device pushes.
    /// </summary>
    /// <param name="account">Recipient; its <c>Id</c> must be a parseable GUID.</param>
    /// <param name="topic">Notification topic.</param>
    /// <param name="title">Optional title.</param>
    /// <param name="subtitle">Optional subtitle.</param>
    /// <param name="content">Optional body text.</param>
    /// <param name="meta">Optional metadata; <paramref name="actionUri"/> is stored in it under <c>action_uri</c>.</param>
    /// <param name="actionUri">Optional action URI attached to the metadata.</param>
    /// <param name="isSilent">When true, nothing is delivered (neither WebSocket nor push).</param>
    /// <param name="save">When true, the notification is enqueued to the flush buffer.</param>
    /// <exception cref="ArgumentException">Title, subtitle and content are all null.</exception>
    public async Task SendNotification(
        Account account,
        string topic,
        string? title = null,
        string? subtitle = null,
        string? content = null,
        Dictionary<string, object?>? meta = null,
        string? actionUri = null,
        bool isSilent = false,
        bool save = true)
    {
        meta ??= [];
        if (title is null && subtitle is null && content is null)
            throw new ArgumentException("Unable to send notification that completely empty.");

        if (actionUri is not null) meta["action_uri"] = actionUri;

        var accountId = Guid.Parse(account.Id!);
        var notification = new Notification
        {
            Topic = topic,
            Title = title,
            Subtitle = subtitle,
            Content = content,
            Meta = meta,
            AccountId = accountId,
        };

        if (save)
            _fbs.Enqueue(notification);

        if (!isSilent)
            await DeliveryNotification(notification); // returns quickly (does NOT wait for APNS/FCM)
    }

    /// <summary>
    /// Sends the notification over WebSocket, then loads the recipient's subscriptions and
    /// queues one push work item per subscription.
    /// </summary>
    private async Task DeliveryNotification(Notification notification)
    {
        _logger.LogInformation(
            "Delivering notification: {NotificationTopic} #{NotificationId} with meta {NotificationMeta}",
            notification.Topic,
            notification.Id,
            notification.Meta
        );

        // WS send: still immediate (fire-and-forget from caller perspective)
        _ws.SendPacketToAccount(notification.AccountId.ToString(), new Connection.WebSocketPacket
        {
            Type = "notifications.new",
            Data = notification
        });

        // Query subscribers and enqueue push work (non-blocking to the HTTP request)
        var subscribers = await _db.PushSubscriptions
            .Where(s => s.AccountId == notification.AccountId)
            .AsNoTracking()
            .ToListAsync();

        await EnqueuePushWork(notification, subscribers);
    }

    /// <summary>
    /// Marks the given notifications as viewed at the current instant; entries that already have
    /// a <c>ViewedAt</c> value are skipped.
    /// </summary>
    /// <param name="notifications">Notifications to mark.</param>
    public async Task MarkNotificationsViewed(ICollection<Notification> notifications)
    {
        var now = SystemClock.Instance.GetCurrentInstant();
        var id = notifications.Where(n => n.ViewedAt == null).Select(n => n.Id).ToList();
        if (id.Count == 0) return;

        await _db.Notifications
            .Where(n => id.Contains(n.Id))
            .ExecuteUpdateAsync(s => s.SetProperty(n => n.ViewedAt, now));
    }

    /// <summary>
    /// Sets <c>ViewedAt</c> to the current instant on every notification of the account,
    /// including ones that were already marked as viewed.
    /// </summary>
    /// <param name="accountId">Account whose notifications are updated.</param>
    public async Task MarkAllNotificationsViewed(Guid accountId)
    {
        var now = SystemClock.Instance.GetCurrentInstant();
        await _db.Notifications
            .Where(n => n.AccountId == accountId)
            .ExecuteUpdateAsync(s => s.SetProperty(n => n.ViewedAt, now));
    }

    /// <summary>
    /// Delivers one notification to many accounts: optionally enqueues a per-account copy to the
    /// flush buffer, sends it over WebSocket to each account, then queues pushes for all of their
    /// subscriptions.
    /// </summary>
    /// <param name="notification">
    /// Template notification. Its <c>AccountId</c> is overwritten for each recipient, so after the
    /// call it holds the last account in <paramref name="accounts"/>.
    /// </param>
    /// <param name="accounts">Recipient account ids.</param>
    /// <param name="save">When true, a copy per account is enqueued to the flush buffer.</param>
    public async Task SendNotificationBatch(Notification notification, List<Guid> accounts, bool save = false)
    {
        if (save)
        {
            accounts.ForEach(x =>
            {
                var newNotification = new Notification
                {
                    Topic = notification.Topic,
                    Title = notification.Title,
                    Subtitle = notification.Subtitle,
                    Content = notification.Content,
                    Meta = notification.Meta,
                    Priority = notification.Priority,
                    AccountId = x
                };
                _fbs.Enqueue(newNotification);
            });
        }

        _logger.LogInformation(
            "Delivering notification in batch: {NotificationTopic} #{NotificationId} with meta {NotificationMeta}",
            notification.Topic,
            notification.Id,
            notification.Meta
        );

        // WS first
        foreach (var account in accounts)
        {
            notification.AccountId = account; // keep original behavior
            _ws.SendPacketToAccount(account.ToString(), new Connection.WebSocketPacket
            {
                Type = "notifications.new",
                Data = notification
            });
        }

        // Fetch all subscribers once and enqueue to workers
        var subscribers = await _db.PushSubscriptions
            .Where(s => accounts.Contains(s.AccountId))
            .AsNoTracking()
            .ToListAsync();

        await EnqueuePushWork(notification, subscribers);
    }

    /// <summary>
    /// Writes one work item per subscription to the channel, waiting whenever the channel is full.
    /// </summary>
    private async Task EnqueuePushWork(Notification notification, IEnumerable<PushSubscription> subscriptions)
    {
        foreach (var sub in subscriptions)
        {
            // Use the current notification reference (no mutation of content after this point).
            var item = new PushWorkItem(notification, sub);

            // Respect backpressure if channel is full.
            await _channel.Writer.WriteAsync(item, _cts.Token);
        }
    }

    /// <summary>
    /// Consumer loop run by each worker: reads work items until the channel completes or the
    /// token is cancelled, pushing each one to its device.
    /// </summary>
    private async Task WorkerLoop(CancellationToken ct)
    {
        try
        {
            await foreach (var item in _channel.Reader.ReadAllAsync(ct))
            {
                try
                {
                    await _PushSingleNotification(item.Notification, item.Subscription);
                }
                catch (OperationCanceledException) when (ct.IsCancellationRequested)
                {
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogDebug(ex, "Worker handled exception for notification #{Id}", item.Notification.Id);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // normal shutdown
        }
    }

    /// <summary>A notification paired with the subscription (device) it should be pushed to.</summary>
    private readonly record struct PushWorkItem(Notification Notification, PushSubscription Subscription);

    /// <summary>
    /// Pushes one notification to one device through the subscription's provider. Failures are
    /// logged and swallowed so the calling worker keeps running; the success message is logged
    /// only when the provider call finished without reporting an error.
    /// </summary>
    private async Task _PushSingleNotification(Notification notification, PushSubscription subscription)
    {
        try
        {
            _logger.LogDebug(
                $"Pushing notification {notification.Topic} #{notification.Id} to device #{subscription.DeviceId}");

            switch (subscription.Provider)
            {
                case PushProvider.Google:
                    if (_fcm == null)
                        throw new InvalidOperationException("Firebase Cloud Messaging is not initialized.");

                    // FCM has no subtitle field: fold subtitle and content into the body.
                    var body = string.Empty;
                    if (!string.IsNullOrEmpty(notification.Subtitle) || !string.IsNullOrEmpty(notification.Content))
                    {
                        body = string.Join("\n",
                            notification.Subtitle ?? string.Empty,
                            notification.Content ?? string.Empty
                        ).Trim();
                    }

                    var fcmResult = await _fcm.SendAsync(new Dictionary<string, object>
                    {
                        ["message"] = new Dictionary<string, object>
                        {
                            ["token"] = subscription.DeviceToken,
                            ["notification"] = new Dictionary<string, object>
                            {
                                ["title"] = notification.Title ?? string.Empty,
                                ["body"] = body
                            },
                            // You can re-enable data payloads if needed.
                            // ["data"] = new Dictionary<string, object>
                            // {
                            //     ["Id"] = notification.Id,
                            //     ["Topic"] = notification.Topic,
                            //     ["Meta"] = notification.Meta
                            // }
                        }
                    });

                    if (fcmResult.Error != null)
                        throw new Exception($"Notification pushed failed ({fcmResult.StatusCode}) {fcmResult.Error}");
                    break;

                case PushProvider.Apple:
                    if (_apns == null)
                        throw new InvalidOperationException("Apple Push Notification Service is not initialized.");

                    // Only non-empty fields are included in the alert dictionary.
                    var alertDict = new Dictionary<string, object>();
                    if (!string.IsNullOrEmpty(notification.Title))
                        alertDict["title"] = notification.Title;
                    if (!string.IsNullOrEmpty(notification.Subtitle))
                        alertDict["subtitle"] = notification.Subtitle;
                    if (!string.IsNullOrEmpty(notification.Content))
                        alertDict["body"] = notification.Content;

                    var payload = new Dictionary<string, object?>
                    {
                        ["topic"] = _apnsTopic,
                        ["type"] = notification.Topic,
                        ["aps"] = new Dictionary<string, object?>
                        {
                            ["alert"] = alertDict,
                            ["sound"] = notification.Priority >= 5 ? "default" : null,
                            ["mutable-content"] = 1
                        },
                        ["meta"] = notification.Meta
                    };

                    var apnResult = await _apns.SendAsync(
                        payload,
                        deviceToken: subscription.DeviceToken,
                        apnsId: notification.Id.ToString(),
                        apnsPriority: notification.Priority,
                        apnPushType: ApnPushType.Alert
                    );

                    if (apnResult.Error != null)
                        throw new Exception($"Notification pushed failed ({apnResult.StatusCode}) {apnResult.Error}");
                    break;

                default:
                    throw new InvalidOperationException($"Push provider not supported: {subscription.Provider}");
            }

            // Logged inside the try block so a failed push is never also reported as a success.
            _logger.LogInformation(
                $"Successfully pushed notification #{notification.Id} to device {subscription.DeviceId} provider {subscription.Provider}");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                $"Failed to push notification #{notification.Id} to device {subscription.DeviceId}. {ex.Message}");
            // Swallow here to keep worker alive; upstream is fire-and-forget.
        }
    }
}