229 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			229 lines
		
	
	
		
			9.4 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| using System.Text.Json;
 | |
| using DysonNetwork.Pass.Account;
 | |
| using DysonNetwork.Pass.Wallet;
 | |
| using DysonNetwork.Shared.Models;
 | |
| using DysonNetwork.Shared.Proto;
 | |
| using DysonNetwork.Shared.Stream;
 | |
| using Google.Protobuf;
 | |
| using NATS.Client.Core;
 | |
| using NATS.Client.JetStream.Models;
 | |
| using NATS.Net;
 | |
| using NodaTime;
 | |
| 
 | |
| namespace DysonNetwork.Pass.Startup;
 | |
| 
 | |
| public class BroadcastEventHandler(
 | |
|     INatsConnection nats,
 | |
|     ILogger<BroadcastEventHandler> logger,
 | |
|     IServiceProvider serviceProvider
 | |
| ) : BackgroundService
 | |
| {
 | |
|     protected override async Task ExecuteAsync(CancellationToken stoppingToken)
 | |
|     {
 | |
|         var paymentTask = HandlePaymentEventsAsync(stoppingToken);
 | |
|         var webSocketTask = HandleWebSocketEventsAsync(stoppingToken);
 | |
| 
 | |
|         await Task.WhenAll(paymentTask, webSocketTask);
 | |
|     }
 | |
| 
 | |
|     private async Task HandlePaymentEventsAsync(CancellationToken stoppingToken)
 | |
|     {
 | |
|         var js = nats.CreateJetStreamContext();
 | |
| 
 | |
|         await js.EnsureStreamCreated("payment_events", [PaymentOrderEventBase.Type]);
 | |
| 
 | |
|         var consumer = await js.CreateOrUpdateConsumerAsync("payment_events",
 | |
|             new ConsumerConfig("pass_payment_handler"),
 | |
|             cancellationToken: stoppingToken);
 | |
| 
 | |
|         await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
 | |
|         {
 | |
|             PaymentOrderEvent? evt = null;
 | |
|             try
 | |
|             {
 | |
|                 evt = JsonSerializer.Deserialize<PaymentOrderEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
 | |
| 
 | |
|                 logger.LogInformation(
 | |
|                     "Received order event: {ProductIdentifier} {OrderId}",
 | |
|                     evt?.ProductIdentifier,
 | |
|                     evt?.OrderId
 | |
|                 );
 | |
| 
 | |
|                 if (evt?.ProductIdentifier is null)
 | |
|                     continue;
 | |
| 
 | |
|                 // Handle subscription orders
 | |
|                 if (
 | |
|                     evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram) &&
 | |
|                     evt.Meta?.TryGetValue("gift_id", out var giftIdValue) == true
 | |
|                 )
 | |
|                 {
 | |
|                     logger.LogInformation("Handling gift order: {OrderId}", evt.OrderId);
 | |
| 
 | |
|                     await using var scope = serviceProvider.CreateAsyncScope();
 | |
|                     var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
 | |
|                     var subscriptions = scope.ServiceProvider.GetRequiredService<SubscriptionService>();
 | |
| 
 | |
|                     var order = await db.PaymentOrders.FindAsync(
 | |
|                         [evt.OrderId],
 | |
|                         cancellationToken: stoppingToken
 | |
|                     );
 | |
|                     if (order is null)
 | |
|                     {
 | |
|                         logger.LogWarning("Order with ID {OrderId} not found. Redelivering.", evt.OrderId);
 | |
|                         await msg.NakAsync(cancellationToken: stoppingToken);
 | |
|                         continue;
 | |
|                     }
 | |
| 
 | |
|                     await subscriptions.HandleGiftOrder(order);
 | |
| 
 | |
|                     logger.LogInformation("Gift for order {OrderId} handled successfully.", evt.OrderId);
 | |
|                     await msg.AckAsync(cancellationToken: stoppingToken);
 | |
|                 }
 | |
|                 else if (evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram))
 | |
|                 {
 | |
|                     logger.LogInformation("Handling stellar program order: {OrderId}", evt.OrderId);
 | |
| 
 | |
|                     await using var scope = serviceProvider.CreateAsyncScope();
 | |
|                     var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
 | |
|                     var subscriptions = scope.ServiceProvider.GetRequiredService<SubscriptionService>();
 | |
| 
 | |
|                     var order = await db.PaymentOrders.FindAsync(
 | |
|                         [evt.OrderId],
 | |
|                         cancellationToken: stoppingToken
 | |
|                     );
 | |
|                     if (order is null)
 | |
|                     {
 | |
|                         logger.LogWarning("Order with ID {OrderId} not found. Redelivering.", evt.OrderId);
 | |
|                         await msg.NakAsync(cancellationToken: stoppingToken);
 | |
|                         continue;
 | |
|                     }
 | |
| 
 | |
|                     await subscriptions.HandleSubscriptionOrder(order);
 | |
| 
 | |
|                     logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId);
 | |
|                     await msg.AckAsync(cancellationToken: stoppingToken);
 | |
|                 }
 | |
|                 else if (evt.ProductIdentifier == "lottery")
 | |
|                 {
 | |
|                     logger.LogInformation("Handling lottery order: {OrderId}", evt.OrderId);
 | |
| 
 | |
|                     await using var scope = serviceProvider.CreateAsyncScope();
 | |
|                     var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
 | |
|                     var lotteries = scope.ServiceProvider.GetRequiredService<Lotteries.LotteryService>();
 | |
| 
 | |
|                     var order = await db.PaymentOrders.FindAsync(
 | |
|                         [evt.OrderId],
 | |
|                         cancellationToken: stoppingToken
 | |
|                     );
 | |
|                     if (order == null)
 | |
|                     {
 | |
|                         logger.LogWarning("Order with ID {OrderId} not found. Redelivering.", evt.OrderId);
 | |
|                         await msg.NakAsync(cancellationToken: stoppingToken);
 | |
|                         continue;
 | |
|                     }
 | |
| 
 | |
|                     await lotteries.HandleLotteryOrder(order);
 | |
| 
 | |
|                     logger.LogInformation("Lottery ticket for order {OrderId} created successfully.", evt.OrderId);
 | |
|                     await msg.AckAsync(cancellationToken: stoppingToken);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     // Not a subscription, gift, or lottery order, skip
 | |
|                     continue;
 | |
|                 }
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 logger.LogError(ex, "Error processing payment order event for order {OrderId}. Redelivering.",
 | |
|                     evt?.OrderId);
 | |
|                 await msg.NakAsync(cancellationToken: stoppingToken);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private async Task HandleWebSocketEventsAsync(CancellationToken stoppingToken)
 | |
|     {
 | |
|         var connectedTask = HandleConnectedEventsAsync(stoppingToken);
 | |
|         var disconnectedTask = HandleDisconnectedEventsAsync(stoppingToken);
 | |
| 
 | |
|         await Task.WhenAll(connectedTask, disconnectedTask);
 | |
|     }
 | |
| 
 | |
|     private async Task HandleConnectedEventsAsync(CancellationToken stoppingToken)
 | |
|     {
 | |
|         await foreach (var msg in nats.SubscribeAsync<byte[]>("websocket_connected", cancellationToken: stoppingToken))
 | |
|         {
 | |
|             try
 | |
|             {
 | |
|                 var evt =
 | |
|                     GrpcTypeHelper.ConvertByteStringToObject<WebSocketConnectedEvent>(ByteString.CopyFrom(msg.Data));
 | |
| 
 | |
|                 logger.LogInformation("Received WebSocket connected event for user {AccountId}, device {DeviceId}",
 | |
|                     evt.AccountId, evt.DeviceId);
 | |
| 
 | |
|                 await using var scope = serviceProvider.CreateAsyncScope();
 | |
|                 var accountEventService = scope.ServiceProvider.GetRequiredService<AccountEventService>();
 | |
| 
 | |
|                 var status = await accountEventService.GetStatus(evt.AccountId);
 | |
| 
 | |
|                 await nats.PublishAsync(
 | |
|                     AccountStatusUpdatedEvent.Type,
 | |
|                     GrpcTypeHelper.ConvertObjectToByteString(new AccountStatusUpdatedEvent
 | |
|                     {
 | |
|                         AccountId = evt.AccountId,
 | |
|                         Status = status,
 | |
|                         UpdatedAt = SystemClock.Instance.GetCurrentInstant()
 | |
|                     }).ToByteArray()
 | |
|                 );
 | |
| 
 | |
|                 logger.LogInformation("Broadcasted status update for user {AccountId}", evt.AccountId);
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 logger.LogError(ex, "Error processing WebSocket connected event");
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private async Task HandleDisconnectedEventsAsync(CancellationToken stoppingToken)
 | |
|     {
 | |
|         await foreach (var msg in nats.SubscribeAsync<byte[]>("websocket_disconnected",
 | |
|                            cancellationToken: stoppingToken))
 | |
|         {
 | |
|             try
 | |
|             {
 | |
|                 var evt =
 | |
|                     GrpcTypeHelper.ConvertByteStringToObject<WebSocketDisconnectedEvent>(ByteString.CopyFrom(msg.Data));
 | |
| 
 | |
|                 logger.LogInformation(
 | |
|                     "Received WebSocket disconnected event for user {AccountId}, device {DeviceId}, IsOffline: {IsOffline}",
 | |
|                     evt.AccountId, evt.DeviceId, evt.IsOffline
 | |
|                 );
 | |
| 
 | |
|                 await using var scope = serviceProvider.CreateAsyncScope();
 | |
|                 var accountEventService = scope.ServiceProvider.GetRequiredService<AccountEventService>();
 | |
| 
 | |
|                 var status = await accountEventService.GetStatus(evt.AccountId);
 | |
| 
 | |
|                 await nats.PublishAsync(
 | |
|                     AccountStatusUpdatedEvent.Type,
 | |
|                     GrpcTypeHelper.ConvertObjectToByteString(new AccountStatusUpdatedEvent
 | |
|                     {
 | |
|                         AccountId = evt.AccountId,
 | |
|                         Status = status,
 | |
|                         UpdatedAt = SystemClock.Instance.GetCurrentInstant()
 | |
|                     }).ToByteArray()
 | |
|                 );
 | |
| 
 | |
|                 logger.LogInformation("Broadcasted status update for user {AccountId}", evt.AccountId);
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 logger.LogError(ex, "Error processing WebSocket disconnected event");
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| }
 |