diff --git a/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs b/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs index 6b22c87..9b0678a 100644 --- a/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs @@ -3,8 +3,6 @@ using DysonNetwork.Drive.Storage; using DysonNetwork.Shared.Stream; using Microsoft.EntityFrameworkCore; using NATS.Client.Core; -using NATS.Client.JetStream; -using NATS.Client.JetStream.Models; namespace DysonNetwork.Drive.Startup; @@ -16,23 +14,14 @@ public class BroadcastEventHandler( { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - var js = new NatsJSContext(nats); - var stream = await js.GetStreamAsync(AccountDeletedEvent.Type, cancellationToken: stoppingToken); - var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("Dy_Drive_AccountDeleted"), cancellationToken: stoppingToken); - - await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) + await foreach (var msg in nats.SubscribeAsync(AccountDeletedEvent.Type, cancellationToken: stoppingToken)) { - AccountDeletedEvent? evt = null; try { - evt = JsonSerializer.Deserialize(msg.Data); - if (evt == null) - { - await msg.AckAsync(cancellationToken: stoppingToken); - continue; - } + var evt = JsonSerializer.Deserialize(msg.Data); + if (evt == null) continue; - logger.LogInformation("Processing account deletion for: {AccountId}", evt.AccountId); + logger.LogInformation("Account deleted: {AccountId}", evt.AccountId); using var scope = serviceProvider.CreateScope(); var fs = scope.ServiceProvider.GetRequiredService(); @@ -45,30 +34,22 @@ public class BroadcastEventHandler( .Where(p => p.AccountId == evt.AccountId) .ToListAsync(cancellationToken: stoppingToken); - if (files.Any()) - { - await fs.DeleteFileDataBatchAsync(files); - await db.Files - .Where(p => p.AccountId == evt.AccountId) - .ExecuteDeleteAsync(cancellationToken: stoppingToken); - } + await fs.DeleteFileDataBatchAsync(files); + await db.Files + .Where(p => p.AccountId == evt.AccountId) + .ExecuteDeleteAsync(cancellationToken: stoppingToken); await transaction.CommitAsync(cancellationToken: stoppingToken); - - await msg.AckAsync(cancellationToken: stoppingToken); - logger.LogInformation("Account deletion for {AccountId} processed successfully.", evt.AccountId); } - catch (Exception ex) + catch (Exception) { - logger.LogError(ex, "Error during transaction for account deletion {AccountId}, rolling back.", evt.AccountId); - await transaction.RollbackAsync(CancellationToken.None); - throw; // Let outer catch handle Nak + await transaction.RollbackAsync(cancellationToken: stoppingToken); + throw; } } catch (Exception ex) { - logger.LogError(ex, "Failed to process account deletion for {AccountId}, will retry.", evt?.AccountId); - await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); + logger.LogError(ex, "Error processing AccountDeleted"); } } } diff --git a/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs b/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs index 0bd97ca..83017b5 100644 --- a/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs @@ -2,8 +2,6 @@ using System.Text.Json; using DysonNetwork.Pass.Wallet; using DysonNetwork.Shared.Stream; using NATS.Client.Core; -using NATS.Client.JetStream; -using NATS.Client.JetStream.Models; namespace DysonNetwork.Pass.Startup; @@ -15,11 +13,7 @@ public class BroadcastEventHandler( { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - var js = new NatsJSContext(nats); - var stream = await js.GetStreamAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken); - var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("Dy_Pass_Stellar"), cancellationToken: stoppingToken); - - await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) + await foreach (var msg in nats.SubscribeAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken)) { PaymentOrderEvent? evt = null; try @@ -29,7 +23,6 @@ public class BroadcastEventHandler( if (evt?.ProductIdentifier is null || !evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram)) { - await msg.AckAsync(cancellationToken: stoppingToken); continue; } @@ -46,19 +39,18 @@ public class BroadcastEventHandler( if (order is null) { logger.LogWarning("Order with ID {OrderId} not found.", evt.OrderId); - await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); + await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); continue; } await subscriptions.HandleSubscriptionOrder(order); logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId); - await msg.AckAsync(cancellationToken: stoppingToken); } catch (Exception ex) { logger.LogError(ex, "Error processing payment order event for order {OrderId}", evt?.OrderId); - await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); + await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); } } } diff --git a/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs b/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs index 6d97f52..6620fa5 100644 --- a/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs @@ -4,8 +4,6 @@ using DysonNetwork.Shared.Stream; using DysonNetwork.Sphere.Post; using Microsoft.EntityFrameworkCore; using NATS.Client.Core; -using NATS.Client.JetStream; -using NATS.Client.JetStream.Models; namespace DysonNetwork.Sphere.Startup; @@ -31,38 +29,17 @@ public class BroadcastEventHandler( { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - try - { - var paymentTask = ProcessPaymentOrdersAsync(stoppingToken); - var accountTask = ProcessAccountDeletionsAsync(stoppingToken); - - await Task.WhenAll(paymentTask, accountTask); - } - catch (Exception ex) - { - logger.LogError(ex, "BroadcastEventHandler stopped due to an unhandled exception."); - } - } - - private async Task ProcessPaymentOrdersAsync(CancellationToken stoppingToken) - { - var js = new NatsJSContext(nats); - var stream = await js.GetStreamAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken); - var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("Dy_Sphere_PaymentOrder"), - cancellationToken: stoppingToken); - - await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) + await foreach (var msg in nats.SubscribeAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken)) { PaymentOrderEvent? evt = null; try { evt = JsonSerializer.Deserialize(msg.Data); + // Every order goes into the MQ is already paid, so we skipped the status validation + if (evt?.ProductIdentifier is null) - { - await msg.AckAsync(cancellationToken: stoppingToken); continue; - } switch (evt.ProductIdentifier) { @@ -70,9 +47,9 @@ public class BroadcastEventHandler( { var awardEvt = JsonSerializer.Deserialize(msg.Data); if (awardEvt?.Meta == null) throw new ArgumentNullException(nameof(awardEvt)); - + var meta = awardEvt.Meta; - + logger.LogInformation("Handling post award order: {OrderId}", evt.OrderId); await using var scope = serviceProvider.CreateAsyncScope(); @@ -84,45 +61,29 @@ public class BroadcastEventHandler( logger.LogInformation("Post award for order {OrderId} handled successfully.", evt.OrderId); - await msg.AckAsync(cancellationToken: stoppingToken); break; } default: - // Not for us, acknowledge and ignore. - await msg.AckAsync(cancellationToken: stoppingToken); + await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); break; } } catch (Exception ex) { - logger.LogError(ex, "Error processing payment order event for order {OrderId}, will retry.", - evt?.OrderId); - await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); + logger.LogError(ex, "Error processing payment order event for order {OrderId}", evt?.OrderId); + await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); } } - } - private async Task ProcessAccountDeletionsAsync(CancellationToken stoppingToken) - { - var js = new NatsJSContext(nats); - var stream = await js.GetStreamAsync(AccountDeletedEvent.Type, cancellationToken: stoppingToken); - var consumer = - await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("Dy_Sphere_AccountDeleted"), - cancellationToken: stoppingToken); - - await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) + await foreach (var msg in nats.SubscribeAsync(AccountDeletedEvent.Type, + cancellationToken: stoppingToken)) { - AccountDeletedEvent? evt = null; try { - evt = JsonSerializer.Deserialize(msg.Data); - if (evt == null) - { - await msg.AckAsync(cancellationToken: stoppingToken); - continue; - } + var evt = JsonSerializer.Deserialize(msg.Data); + if (evt == null) continue; - logger.LogInformation("Processing account deletion for: {AccountId}", evt.AccountId); + logger.LogInformation("Account deleted: {AccountId}", evt.AccountId); using var scope = serviceProvider.CreateScope(); var db = scope.ServiceProvider.GetRequiredService(); @@ -142,41 +103,28 @@ public class BroadcastEventHandler( .Where(p => p.Members.All(m => m.AccountId == evt.AccountId)) .ToListAsync(cancellationToken: stoppingToken); - if (publishers.Any()) - { - foreach (var publisher in publishers) - { - await db.Posts - .Where(p => p.PublisherId == publisher.Id) - .ExecuteDeleteAsync(cancellationToken: stoppingToken); - } - - var publisherIds = publishers.Select(p => p.Id).ToList(); - await db.Publishers - .Where(p => publisherIds.Contains(p.Id)) + foreach (var publisher in publishers) + await db.Posts + .Where(p => p.PublisherId == publisher.Id) .ExecuteDeleteAsync(cancellationToken: stoppingToken); - } + + var publisherIds = publishers.Select(p => p.Id).ToList(); + await db.Publishers + .Where(p => publisherIds.Contains(p.Id)) + .ExecuteDeleteAsync(cancellationToken: stoppingToken); await transaction.CommitAsync(cancellationToken: stoppingToken); - await msg.AckAsync(cancellationToken: stoppingToken); - logger.LogInformation("Account deletion for {AccountId} processed successfully in Sphere.", - evt.AccountId); } - catch (Exception ex) + catch (Exception) { - logger.LogError(ex, - "Error during transaction for account deletion {AccountId} in Sphere, rolling back.", - evt.AccountId); - await transaction.RollbackAsync(CancellationToken.None); + await transaction.RollbackAsync(cancellationToken: stoppingToken); throw; } } catch (Exception ex) { - logger.LogError(ex, "Failed to process account deletion for {AccountId} in Sphere, will retry.", - evt?.AccountId); - await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); + logger.LogError(ex, "Error processing AccountDeleted"); } } } -} \ No newline at end of file +}