diff --git a/DysonNetwork.Develop/DysonNetwork.Develop.csproj b/DysonNetwork.Develop/DysonNetwork.Develop.csproj index 1577bf3..d3c65ae 100644 --- a/DysonNetwork.Develop/DysonNetwork.Develop.csproj +++ b/DysonNetwork.Develop/DysonNetwork.Develop.csproj @@ -14,6 +14,8 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + + diff --git a/DysonNetwork.Drive/DysonNetwork.Drive.csproj b/DysonNetwork.Drive/DysonNetwork.Drive.csproj index 6e54599..1f45677 100644 --- a/DysonNetwork.Drive/DysonNetwork.Drive.csproj +++ b/DysonNetwork.Drive/DysonNetwork.Drive.csproj @@ -22,6 +22,8 @@ runtime; build; native; contentfiles; analyzers; buildtransitive + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs b/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs index 265f2e7..98667d6 100644 --- a/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs @@ -3,6 +3,8 @@ using DysonNetwork.Drive.Storage; using DysonNetwork.Shared.Stream; using Microsoft.EntityFrameworkCore; using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; namespace DysonNetwork.Drive.Startup; @@ -14,14 +16,23 @@ public class BroadcastEventHandler( { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await foreach (var msg in nats.SubscribeAsync(AccountDeletedEvent.Type, cancellationToken: stoppingToken)) + var js = new NatsJSContext(nats); + var stream = await js.GetStreamAsync(AccountDeletedEvent.Type, cancellationToken: stoppingToken); + var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("DysonNetwork_Drive_AccountDeleted"), cancellationToken: stoppingToken); + + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { + AccountDeletedEvent? evt = null; try { - var evt = JsonSerializer.Deserialize(msg.Data); - if (evt == null) continue; + evt = JsonSerializer.Deserialize(msg.Data); + if (evt == null) + { + await msg.AckAsync(cancellationToken: stoppingToken); + continue; + } - logger.LogInformation("Account deleted: {AccountId}", evt.AccountId); + logger.LogInformation("Processing account deletion for: {AccountId}", evt.AccountId); using var scope = serviceProvider.CreateScope(); var fs = scope.ServiceProvider.GetRequiredService(); @@ -34,23 +45,31 @@ public class BroadcastEventHandler( .Where(p => p.AccountId == evt.AccountId) .ToListAsync(cancellationToken: stoppingToken); - await fs.DeleteFileDataBatchAsync(files); - await db.Files - .Where(p => p.AccountId == evt.AccountId) - .ExecuteDeleteAsync(cancellationToken: stoppingToken); + if (files.Any()) + { + await fs.DeleteFileDataBatchAsync(files); + await db.Files + .Where(p => p.AccountId == evt.AccountId) + .ExecuteDeleteAsync(cancellationToken: stoppingToken); + } await transaction.CommitAsync(cancellationToken: stoppingToken); + + await msg.AckAsync(cancellationToken: stoppingToken); + logger.LogInformation("Account deletion for {AccountId} processed successfully.", evt.AccountId); } - catch (Exception) + catch (Exception ex) { - await transaction.RollbackAsync(cancellationToken: stoppingToken); - throw; + logger.LogError(ex, "Error during transaction for account deletion {AccountId}, rolling back.", evt.AccountId); + await transaction.RollbackAsync(CancellationToken.None); + throw; // Let outer catch handle Nak } } catch (Exception ex) { - logger.LogError(ex, "Error processing AccountDeleted"); + logger.LogError(ex, "Failed to process account deletion for {AccountId}, will retry.", evt?.AccountId); + await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); } } } -} \ No newline at end of file +} diff --git a/DysonNetwork.Gateway/DysonNetwork.Gateway.csproj b/DysonNetwork.Gateway/DysonNetwork.Gateway.csproj index 7403f3c..ca65c4c 100644 --- a/DysonNetwork.Gateway/DysonNetwork.Gateway.csproj +++ b/DysonNetwork.Gateway/DysonNetwork.Gateway.csproj @@ -9,6 +9,8 @@ + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/DysonNetwork.Pass/DysonNetwork.Pass.csproj b/DysonNetwork.Pass/DysonNetwork.Pass.csproj index ca2a376..ad47d11 100644 --- a/DysonNetwork.Pass/DysonNetwork.Pass.csproj +++ b/DysonNetwork.Pass/DysonNetwork.Pass.csproj @@ -14,7 +14,8 @@ all - + + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs b/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs index a885809..060c9f8 100644 --- a/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs @@ -2,6 +2,8 @@ using System.Text.Json; using DysonNetwork.Pass.Wallet; using DysonNetwork.Shared.Stream; using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; namespace DysonNetwork.Pass.Startup; @@ -13,7 +15,11 @@ public class BroadcastEventHandler( { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await foreach (var msg in nats.SubscribeAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken)) + var js = new NatsJSContext(nats); + var stream = await js.GetStreamAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken); + var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("DysonNetwork_Pass_Stellar"), cancellationToken: stoppingToken); + + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { PaymentOrderEvent? evt = null; try @@ -23,6 +29,7 @@ public class BroadcastEventHandler( if (evt?.ProductIdentifier is null || !evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram)) { + await msg.AckAsync(cancellationToken: stoppingToken); continue; } @@ -39,19 +46,20 @@ public class BroadcastEventHandler( if (order is null) { logger.LogWarning("Order with ID {OrderId} not found.", evt.OrderId); - await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); + await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); continue; } await subscriptions.HandleSubscriptionOrder(order); logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId); + await msg.AckAsync(cancellationToken: stoppingToken); } catch (Exception ex) { logger.LogError(ex, "Error processing payment order event for order {OrderId}", evt?.OrderId); - await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); + await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); } } } -} \ No newline at end of file +} diff --git a/DysonNetwork.Pusher/DysonNetwork.Pusher.csproj b/DysonNetwork.Pusher/DysonNetwork.Pusher.csproj index 49445bb..76416a5 100644 --- a/DysonNetwork.Pusher/DysonNetwork.Pusher.csproj +++ b/DysonNetwork.Pusher/DysonNetwork.Pusher.csproj @@ -19,6 +19,8 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/DysonNetwork.Shared/DysonNetwork.Shared.csproj b/DysonNetwork.Shared/DysonNetwork.Shared.csproj index f84626e..cfaf3de 100644 --- a/DysonNetwork.Shared/DysonNetwork.Shared.csproj +++ b/DysonNetwork.Shared/DysonNetwork.Shared.csproj @@ -21,7 +21,8 @@ - + + diff --git a/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj b/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj index b30ee12..69f5b71 100644 --- a/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj +++ b/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj @@ -30,7 +30,8 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs b/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs index a74e11f..be3eb20 100644 --- a/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs @@ -4,6 +4,8 @@ using DysonNetwork.Shared.Stream; using DysonNetwork.Sphere.Post; using Microsoft.EntityFrameworkCore; using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; namespace DysonNetwork.Sphere.Startup; @@ -29,17 +31,38 @@ public class BroadcastEventHandler( { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await foreach (var msg in nats.SubscribeAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken)) + try + { + var paymentTask = ProcessPaymentOrdersAsync(stoppingToken); + var accountTask = ProcessAccountDeletionsAsync(stoppingToken); + + await Task.WhenAll(paymentTask, accountTask); + } + catch (Exception ex) + { + logger.LogError(ex, "BroadcastEventHandler stopped due to an unhandled exception."); + } + } + + private async Task ProcessPaymentOrdersAsync(CancellationToken stoppingToken) + { + var js = new NatsJSContext(nats); + var stream = await js.GetStreamAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken); + var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("DysonNetwork_Sphere_PaymentOrder"), + cancellationToken: stoppingToken); + + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { PaymentOrderEvent? evt = null; try { evt = JsonSerializer.Deserialize(msg.Data); - // Every order goes into the MQ is already paid, so we skipped the status validation - if (evt?.ProductIdentifier is null) + { + await msg.AckAsync(cancellationToken: stoppingToken); continue; + } switch (evt.ProductIdentifier) { @@ -47,9 +70,9 @@ public class BroadcastEventHandler( { var awardEvt = JsonSerializer.Deserialize(msg.Data); if (awardEvt?.Meta == null) throw new ArgumentNullException(nameof(awardEvt)); - + var meta = awardEvt.Meta; - + logger.LogInformation("Handling post award order: {OrderId}", evt.OrderId); await using var scope = serviceProvider.CreateAsyncScope(); @@ -61,29 +84,45 @@ public class BroadcastEventHandler( logger.LogInformation("Post award for order {OrderId} handled successfully.", evt.OrderId); + await msg.AckAsync(cancellationToken: stoppingToken); break; } default: - await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); + // Not for us, acknowledge and ignore. + await msg.AckAsync(cancellationToken: stoppingToken); break; } } catch (Exception ex) { - logger.LogError(ex, "Error processing payment order event for order {OrderId}", evt?.OrderId); - await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); + logger.LogError(ex, "Error processing payment order event for order {OrderId}, will retry.", + evt?.OrderId); + await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); } } + } - await foreach (var msg in nats.SubscribeAsync(AccountDeletedEvent.Type, - cancellationToken: stoppingToken)) + private async Task ProcessAccountDeletionsAsync(CancellationToken stoppingToken) + { + var js = new NatsJSContext(nats); + var stream = await js.GetStreamAsync(AccountDeletedEvent.Type, cancellationToken: stoppingToken); + var consumer = + await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("DysonNetwork_Sphere_AccountDeleted"), + cancellationToken: stoppingToken); + + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { + AccountDeletedEvent? evt = null; try { - var evt = JsonSerializer.Deserialize(msg.Data); - if (evt == null) continue; + evt = JsonSerializer.Deserialize(msg.Data); + if (evt == null) + { + await msg.AckAsync(cancellationToken: stoppingToken); + continue; + } - logger.LogInformation("Account deleted: {AccountId}", evt.AccountId); + logger.LogInformation("Processing account deletion for: {AccountId}", evt.AccountId); using var scope = serviceProvider.CreateScope(); var db = scope.ServiceProvider.GetRequiredService(); @@ -103,27 +142,40 @@ public class BroadcastEventHandler( .Where(p => p.Members.All(m => m.AccountId == evt.AccountId)) .ToListAsync(cancellationToken: stoppingToken); - foreach (var publisher in publishers) - await db.Posts - .Where(p => p.PublisherId == publisher.Id) - .ExecuteDeleteAsync(cancellationToken: stoppingToken); + if (publishers.Any()) + { + foreach (var publisher in publishers) + { + await db.Posts + .Where(p => p.PublisherId == publisher.Id) + .ExecuteDeleteAsync(cancellationToken: stoppingToken); + } - var publisherIds = publishers.Select(p => p.Id).ToList(); - await db.Publishers - .Where(p => publisherIds.Contains(p.Id)) - .ExecuteDeleteAsync(cancellationToken: stoppingToken); + var publisherIds = publishers.Select(p => p.Id).ToList(); + await db.Publishers + .Where(p => publisherIds.Contains(p.Id)) + .ExecuteDeleteAsync(cancellationToken: stoppingToken); + } await transaction.CommitAsync(cancellationToken: stoppingToken); + await msg.AckAsync(cancellationToken: stoppingToken); + logger.LogInformation("Account deletion for {AccountId} processed successfully in Sphere.", + evt.AccountId); } - catch (Exception) + catch (Exception ex) { - await transaction.RollbackAsync(cancellationToken: stoppingToken); + logger.LogError(ex, + "Error during transaction for account deletion {AccountId} in Sphere, rolling back.", + evt.AccountId); + await transaction.RollbackAsync(CancellationToken.None); throw; } } catch (Exception ex) { - logger.LogError(ex, "Error processing AccountDeleted"); + logger.LogError(ex, "Failed to process account deletion for {AccountId} in Sphere, will retry.", + evt?.AccountId); + await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); } } }