⏪ Remove jetstream
This commit is contained in:
		| @@ -2,8 +2,6 @@ using System.Text.Json; | ||||
| using DysonNetwork.Pass.Wallet; | ||||
| using DysonNetwork.Shared.Stream; | ||||
| using NATS.Client.Core; | ||||
| using NATS.Client.JetStream; | ||||
| using NATS.Client.JetStream.Models; | ||||
|  | ||||
| namespace DysonNetwork.Pass.Startup; | ||||
|  | ||||
| @@ -15,11 +13,7 @@ public class BroadcastEventHandler( | ||||
| { | ||||
|     protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||||
|     { | ||||
|         var js = new NatsJSContext(nats); | ||||
|         var stream = await js.GetStreamAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken); | ||||
|         var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("Dy_Pass_Stellar"), cancellationToken: stoppingToken); | ||||
|  | ||||
|         await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken)) | ||||
|         await foreach (var msg in nats.SubscribeAsync<byte[]>(PaymentOrderEventBase.Type, cancellationToken: stoppingToken)) | ||||
|         { | ||||
|             PaymentOrderEvent? evt = null; | ||||
|             try | ||||
| @@ -29,7 +23,6 @@ public class BroadcastEventHandler( | ||||
|                 if (evt?.ProductIdentifier is null || | ||||
|                     !evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram)) | ||||
|                 { | ||||
|                     await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|                     continue; | ||||
|                 } | ||||
|  | ||||
| @@ -46,19 +39,18 @@ public class BroadcastEventHandler( | ||||
|                 if (order is null) | ||||
|                 { | ||||
|                     logger.LogWarning("Order with ID {OrderId} not found.", evt.OrderId); | ||||
|                     await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); | ||||
|                     await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); | ||||
|                     continue; | ||||
|                 } | ||||
|  | ||||
|                 await subscriptions.HandleSubscriptionOrder(order); | ||||
|  | ||||
|                 logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId); | ||||
|                 await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|             } | ||||
|             catch (Exception ex) | ||||
|             { | ||||
|                 logger.LogError(ex, "Error processing payment order event for order {OrderId}", evt?.OrderId); | ||||
|                 await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); | ||||
|                 await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user