using System.Text.Json; using DysonNetwork.Pusher.Email; using DysonNetwork.Pusher.Notification; using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Registry; using Google.Protobuf; using NATS.Client.Core; namespace DysonNetwork.Pusher.Services; public class QueueBackgroundService( INatsConnection nats, IServiceProvider serviceProvider, ILogger logger, IConfiguration configuration ) : BackgroundService { public const string QueueName = "pusher.queue"; public const string QueueGroup = "pusher.workers"; private readonly int _consumerCount = configuration.GetValue("ConsumerCount") ?? Environment.ProcessorCount; private readonly List _consumerTasks = []; protected override async Task ExecuteAsync(CancellationToken stoppingToken) { logger.LogInformation("Starting {ConsumerCount} queue consumers", _consumerCount); // Start multiple consumers for (var i = 0; i < _consumerCount; i++) _consumerTasks.Add(Task.Run(() => RunConsumerAsync(stoppingToken), stoppingToken)); // Wait for all consumers to complete await Task.WhenAll(_consumerTasks); } private async Task RunConsumerAsync(CancellationToken stoppingToken) { logger.LogInformation("Queue consumer started"); await foreach (var msg in nats.SubscribeAsync( QueueName, queueGroup: QueueGroup, cancellationToken: stoppingToken)) { try { var message = GrpcTypeHelper.ConvertByteStringToObject(ByteString.CopyFrom(msg.Data)); if (message is not null) { await ProcessMessageAsync(msg, message, stoppingToken); } else { await msg.ReplyAsync(cancellationToken: stoppingToken); logger.LogWarning($"Invalid message format for {msg.Subject}"); } } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { // Normal shutdown break; } catch (Exception ex) { logger.LogError(ex, "Error in queue consumer"); // Add a small delay to prevent tight error loops await Task.Delay(1000, stoppingToken); } } } private async ValueTask ProcessMessageAsync(NatsMsg rawMsg, QueueMessage message, CancellationToken cancellationToken) { using var scope = serviceProvider.CreateScope(); logger.LogDebug("Processing message of type {MessageType}", message.Type); try { switch (message.Type) { case QueueMessageType.Email: await ProcessEmailMessageAsync(message, scope, cancellationToken); break; case QueueMessageType.PushNotification: await ProcessPushNotificationMessageAsync(message, scope, cancellationToken); break; default: logger.LogWarning("Unknown message type: {MessageType}", message.Type); break; } await rawMsg.ReplyAsync(cancellationToken: cancellationToken); } catch (Exception ex) { logger.LogError(ex, "Error processing message of type {MessageType}", message.Type); // Don't rethrow to prevent the message from being retried indefinitely // In a production scenario, you might want to implement a dead-letter queue } } private static async Task ProcessEmailMessageAsync(QueueMessage message, IServiceScope scope, CancellationToken cancellationToken) { var emailService = scope.ServiceProvider.GetRequiredService(); var emailMessage = JsonSerializer.Deserialize(message.Data) ?? throw new InvalidOperationException("Invalid email message format"); await emailService.SendEmailAsync( emailMessage.ToName, emailMessage.ToAddress, emailMessage.Subject, emailMessage.Body); } private static async Task ProcessPushNotificationMessageAsync(QueueMessage message, IServiceScope scope, CancellationToken cancellationToken) { var pushService = scope.ServiceProvider.GetRequiredService(); var logger = scope.ServiceProvider.GetRequiredService>(); var notification = JsonSerializer.Deserialize(message.Data); if (notification == null) { logger.LogError("Invalid push notification data format"); return; } try { logger.LogDebug("Processing push notification for account {AccountId}", notification.AccountId); await pushService.DeliverPushNotification(notification, cancellationToken); logger.LogDebug("Successfully processed push notification for account {AccountId}", notification.AccountId); } catch (Exception ex) { logger.LogError(ex, "Error processing push notification for account {AccountId}", notification.AccountId); // Don't rethrow to prevent the message from being retried indefinitely } } }