119 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			119 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| using System.Text.Json;
 | |
| using DysonNetwork.Ring.Email;
 | |
| using DysonNetwork.Ring.Notification;
 | |
| using DysonNetwork.Shared.Proto;
 | |
| using Google.Protobuf;
 | |
| using NATS.Client.Core;
 | |
| 
 | |
| namespace DysonNetwork.Ring.Services;
 | |
| 
 | |
| public class QueueBackgroundService(
 | |
|     INatsConnection nats,
 | |
|     IServiceProvider serviceProvider,
 | |
|     ILogger<QueueBackgroundService> logger,
 | |
|     IConfiguration configuration
 | |
| )
 | |
|     : BackgroundService
 | |
| {
 | |
|     public const string QueueName = "pusher_queue";
 | |
|     private const string QueueGroup = "pusher_workers";
 | |
|     private readonly int _consumerCount = configuration.GetValue<int?>("ConsumerCount") ?? Environment.ProcessorCount;
 | |
|     private readonly List<Task> _consumerTasks = [];
 | |
| 
 | |
|     protected override async Task ExecuteAsync(CancellationToken stoppingToken)
 | |
|     {
 | |
|         logger.LogInformation("Starting {ConsumerCount} queue consumers", _consumerCount);
 | |
| 
 | |
|         // Start multiple consumers
 | |
|         for (var i = 0; i < _consumerCount; i++)
 | |
|             _consumerTasks.Add(Task.Run(() => RunConsumerAsync(stoppingToken), stoppingToken));
 | |
| 
 | |
|         // Wait for all consumers to complete
 | |
|         await Task.WhenAll(_consumerTasks);
 | |
|     }
 | |
| 
 | |
|     private async Task RunConsumerAsync(CancellationToken stoppingToken)
 | |
|     {
 | |
|         logger.LogInformation("Queue consumer started");
 | |
| 
 | |
|         await foreach (var msg in nats.SubscribeAsync<byte[]>(QueueName, queueGroup: QueueGroup, cancellationToken: stoppingToken))
 | |
|         {
 | |
|             try
 | |
|             {
 | |
|                 var message = GrpcTypeHelper.ConvertByteStringToObject<QueueMessage>(ByteString.CopyFrom(msg.Data));
 | |
|                 if (message is not null)
 | |
|                 {
 | |
|                     await ProcessMessageAsync(message, stoppingToken);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     logger.LogWarning($"Invalid message format for {msg.Subject}");
 | |
|                 }
 | |
|             }
 | |
|             catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
 | |
|             {
 | |
|                 // Normal shutdown
 | |
|                 break;
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 logger.LogError(ex, "Error in queue consumer");
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private async ValueTask ProcessMessageAsync(QueueMessage message,
 | |
|         CancellationToken cancellationToken)
 | |
|     {
 | |
|         using var scope = serviceProvider.CreateScope();
 | |
| 
 | |
|         logger.LogDebug("Processing message of type {MessageType}", message.Type);
 | |
| 
 | |
|         switch (message.Type)
 | |
|         {
 | |
|             case QueueMessageType.Email:
 | |
|                 await ProcessEmailMessageAsync(message, scope);
 | |
|                 break;
 | |
| 
 | |
|             case QueueMessageType.PushNotification:
 | |
|                 await ProcessPushNotificationMessageAsync(message, scope, cancellationToken);
 | |
|                 break;
 | |
| 
 | |
|             default:
 | |
|                 logger.LogWarning("Unknown message type: {MessageType}", message.Type);
 | |
|                 break;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private static async Task ProcessEmailMessageAsync(QueueMessage message, IServiceScope scope)
 | |
|     {
 | |
|         var emailService = scope.ServiceProvider.GetRequiredService<EmailService>();
 | |
|         var emailMessage = JsonSerializer.Deserialize<EmailMessage>(message.Data)
 | |
|                            ?? throw new InvalidOperationException("Invalid email message format");
 | |
| 
 | |
|         await emailService.SendEmailAsync(
 | |
|             emailMessage.ToName,
 | |
|             emailMessage.ToAddress,
 | |
|             emailMessage.Subject,
 | |
|             emailMessage.Body);
 | |
|     }
 | |
| 
 | |
|     private static async Task ProcessPushNotificationMessageAsync(QueueMessage message, IServiceScope scope,
 | |
|         CancellationToken cancellationToken)
 | |
|     {
 | |
|         var pushService = scope.ServiceProvider.GetRequiredService<PushService>();
 | |
|         var logger = scope.ServiceProvider.GetRequiredService<ILogger<QueueBackgroundService>>();
 | |
| 
 | |
|         var notification = JsonSerializer.Deserialize<Shared.Models.SnNotification>(message.Data);
 | |
|         if (notification == null)
 | |
|         {
 | |
|             logger.LogError("Invalid push notification data format");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         logger.LogDebug("Processing push notification for account {AccountId}", notification.AccountId);
 | |
|         await pushService.DeliverPushNotification(notification, cancellationToken);
 | |
|         logger.LogDebug("Successfully processed push notification for account {AccountId}", notification.AccountId);
 | |
|     }
 | |
| }
 |