♻️ Use jetstream to handle events broadcast
This commit is contained in:
		| @@ -14,6 +14,8 @@ | ||||
|             <PrivateAssets>all</PrivateAssets> | ||||
|             <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||||
|         </PackageReference> | ||||
|         <PackageReference Include="NATS.Client.Core" Version="2.6.8" /> | ||||
|         <PackageReference Include="NATS.Client.JetStream" Version="2.6.8" /> | ||||
|         <PackageReference Include="NodaTime.Serialization.Protobuf" Version="2.0.2" /> | ||||
|         <PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.4"/> | ||||
|         <PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL.NodaTime" Version="9.0.4" /> | ||||
|   | ||||
| @@ -22,6 +22,8 @@ | ||||
|           <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||||
|         </PackageReference> | ||||
|         <PackageReference Include="Minio" Version="6.0.5" /> | ||||
|         <PackageReference Include="NATS.Client.Core" Version="2.6.8" /> | ||||
|         <PackageReference Include="NATS.Client.JetStream" Version="2.6.8" /> | ||||
|         <PackageReference Include="Nerdbank.GitVersioning" Version="3.7.115"> | ||||
|           <PrivateAssets>all</PrivateAssets> | ||||
|           <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||||
|   | ||||
| @@ -3,6 +3,8 @@ using DysonNetwork.Drive.Storage; | ||||
| using DysonNetwork.Shared.Stream; | ||||
| using Microsoft.EntityFrameworkCore; | ||||
| using NATS.Client.Core; | ||||
| using NATS.Client.JetStream; | ||||
| using NATS.Client.JetStream.Models; | ||||
|  | ||||
| namespace DysonNetwork.Drive.Startup; | ||||
|  | ||||
| @@ -14,14 +16,23 @@ public class BroadcastEventHandler( | ||||
| { | ||||
|     protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||||
|     { | ||||
|         await foreach (var msg in nats.SubscribeAsync<byte[]>(AccountDeletedEvent.Type, cancellationToken: stoppingToken)) | ||||
|         var js = new NatsJSContext(nats); | ||||
|         var stream = await js.GetStreamAsync(AccountDeletedEvent.Type, cancellationToken: stoppingToken); | ||||
|         var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("DysonNetwork_Drive_AccountDeleted"), cancellationToken: stoppingToken); | ||||
|  | ||||
|         await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken)) | ||||
|         { | ||||
|             AccountDeletedEvent? evt = null; | ||||
|             try | ||||
|             { | ||||
|                 var evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data); | ||||
|                 if (evt == null) continue; | ||||
|                 evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data); | ||||
|                 if (evt == null) | ||||
|                 { | ||||
|                     await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|                     continue; | ||||
|                 } | ||||
|  | ||||
|                 logger.LogInformation("Account deleted: {AccountId}", evt.AccountId); | ||||
|                 logger.LogInformation("Processing account deletion for: {AccountId}", evt.AccountId); | ||||
|  | ||||
|                 using var scope = serviceProvider.CreateScope(); | ||||
|                 var fs = scope.ServiceProvider.GetRequiredService<FileService>(); | ||||
| @@ -34,22 +45,30 @@ public class BroadcastEventHandler( | ||||
|                         .Where(p => p.AccountId == evt.AccountId) | ||||
|                         .ToListAsync(cancellationToken: stoppingToken); | ||||
|  | ||||
|                     await fs.DeleteFileDataBatchAsync(files); | ||||
|                     await db.Files | ||||
|                         .Where(p => p.AccountId == evt.AccountId) | ||||
|                         .ExecuteDeleteAsync(cancellationToken: stoppingToken); | ||||
|                     if (files.Any()) | ||||
|                     { | ||||
|                         await fs.DeleteFileDataBatchAsync(files); | ||||
|                         await db.Files | ||||
|                             .Where(p => p.AccountId == evt.AccountId) | ||||
|                             .ExecuteDeleteAsync(cancellationToken: stoppingToken); | ||||
|                     } | ||||
|  | ||||
|                     await transaction.CommitAsync(cancellationToken: stoppingToken); | ||||
|  | ||||
|                     await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|                     logger.LogInformation("Account deletion for {AccountId} processed successfully.", evt.AccountId); | ||||
|                 } | ||||
|                 catch (Exception) | ||||
|                 catch (Exception ex) | ||||
|                 { | ||||
|                     await transaction.RollbackAsync(cancellationToken: stoppingToken); | ||||
|                     throw; | ||||
|                     logger.LogError(ex, "Error during transaction for account deletion {AccountId}, rolling back.", evt.AccountId); | ||||
|                     await transaction.RollbackAsync(CancellationToken.None); | ||||
|                     throw; // Let outer catch handle Nak | ||||
|                 } | ||||
|             } | ||||
|             catch (Exception ex) | ||||
|             { | ||||
|                 logger.LogError(ex, "Error processing AccountDeleted"); | ||||
|                 logger.LogError(ex, "Failed to process account deletion for {AccountId}, will retry.", evt?.AccountId); | ||||
|                 await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|   | ||||
| @@ -9,6 +9,8 @@ | ||||
|   <ItemGroup> | ||||
|     <PackageReference Include="dotnet-etcd" Version="8.0.1" /> | ||||
|     <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.7" /> | ||||
|     <PackageReference Include="NATS.Client.Core" Version="2.6.8" /> | ||||
|     <PackageReference Include="NATS.Client.JetStream" Version="2.6.8" /> | ||||
|     <PackageReference Include="Nerdbank.GitVersioning" Version="3.7.115"> | ||||
|       <PrivateAssets>all</PrivateAssets> | ||||
|       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||||
|   | ||||
| @@ -14,7 +14,8 @@ | ||||
|             <PrivateAssets>all</PrivateAssets> | ||||
|         </PackageReference> | ||||
|         <PackageReference Include="Nager.Holiday" Version="1.0.1" /> | ||||
|         <PackageReference Include="NATS.Client.Core" Version="2.6.6" /> | ||||
|         <PackageReference Include="NATS.Client.Core" Version="2.6.8" /> | ||||
|         <PackageReference Include="NATS.Client.JetStream" Version="2.6.8" /> | ||||
|         <PackageReference Include="Nerdbank.GitVersioning" Version="3.7.115"> | ||||
|             <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||||
|             <PrivateAssets>all</PrivateAssets> | ||||
|   | ||||
| @@ -2,6 +2,8 @@ using System.Text.Json; | ||||
| using DysonNetwork.Pass.Wallet; | ||||
| using DysonNetwork.Shared.Stream; | ||||
| using NATS.Client.Core; | ||||
| using NATS.Client.JetStream; | ||||
| using NATS.Client.JetStream.Models; | ||||
|  | ||||
| namespace DysonNetwork.Pass.Startup; | ||||
|  | ||||
| @@ -13,7 +15,11 @@ public class BroadcastEventHandler( | ||||
| { | ||||
|     protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||||
|     { | ||||
|         await foreach (var msg in nats.SubscribeAsync<byte[]>(PaymentOrderEventBase.Type, cancellationToken: stoppingToken)) | ||||
|         var js = new NatsJSContext(nats); | ||||
|         var stream = await js.GetStreamAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken); | ||||
|         var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("DysonNetwork_Pass_Stellar"), cancellationToken: stoppingToken); | ||||
|  | ||||
|         await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken)) | ||||
|         { | ||||
|             PaymentOrderEvent? evt = null; | ||||
|             try | ||||
| @@ -23,6 +29,7 @@ public class BroadcastEventHandler( | ||||
|                 if (evt?.ProductIdentifier is null || | ||||
|                     !evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram)) | ||||
|                 { | ||||
|                     await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|                     continue; | ||||
|                 } | ||||
|  | ||||
| @@ -39,18 +46,19 @@ public class BroadcastEventHandler( | ||||
|                 if (order is null) | ||||
|                 { | ||||
|                     logger.LogWarning("Order with ID {OrderId} not found.", evt.OrderId); | ||||
|                     await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); | ||||
|                     await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); | ||||
|                     continue; | ||||
|                 } | ||||
|  | ||||
|                 await subscriptions.HandleSubscriptionOrder(order); | ||||
|  | ||||
|                 logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId); | ||||
|                 await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|             } | ||||
|             catch (Exception ex) | ||||
|             { | ||||
|                 logger.LogError(ex, "Error processing payment order event for order {OrderId}", evt?.OrderId); | ||||
|                 await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); | ||||
|                 await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|   | ||||
| @@ -19,6 +19,8 @@ | ||||
|           <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||||
|           <PrivateAssets>all</PrivateAssets> | ||||
|         </PackageReference> | ||||
|         <PackageReference Include="NATS.Client.Core" Version="2.6.8" /> | ||||
|         <PackageReference Include="NATS.Client.JetStream" Version="2.6.8" /> | ||||
|         <PackageReference Include="Nerdbank.GitVersioning" Version="3.7.115"> | ||||
|           <PrivateAssets>all</PrivateAssets> | ||||
|           <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||||
|   | ||||
| @@ -21,7 +21,8 @@ | ||||
|         <PackageReference Include="Microsoft.AspNetCore.Authentication" Version="2.3.0" /> | ||||
|         <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.7" /> | ||||
|         <PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.7" /> | ||||
|         <PackageReference Include="NATS.Client.Core" Version="2.6.6" /> | ||||
|         <PackageReference Include="NATS.Client.Core" Version="2.6.8" /> | ||||
|         <PackageReference Include="NATS.Client.JetStream" Version="2.6.8" /> | ||||
|         <PackageReference Include="NodaTime" Version="3.2.2" /> | ||||
|         <PackageReference Include="NodaTime.Serialization.JsonNet" Version="3.2.0" /> | ||||
|         <PackageReference Include="NodaTime.Serialization.Protobuf" Version="2.0.2" /> | ||||
|   | ||||
| @@ -30,7 +30,8 @@ | ||||
|             <PrivateAssets>all</PrivateAssets> | ||||
|             <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||||
|         </PackageReference> | ||||
|         <PackageReference Include="NATS.Client.Core" Version="2.6.6" /> | ||||
|         <PackageReference Include="NATS.Client.Core" Version="2.6.8" /> | ||||
|         <PackageReference Include="NATS.Client.JetStream" Version="2.6.8" /> | ||||
|         <PackageReference Include="Nerdbank.GitVersioning" Version="3.7.115"> | ||||
|             <PrivateAssets>all</PrivateAssets> | ||||
|             <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||||
|   | ||||
| @@ -4,6 +4,8 @@ using DysonNetwork.Shared.Stream; | ||||
| using DysonNetwork.Sphere.Post; | ||||
| using Microsoft.EntityFrameworkCore; | ||||
| using NATS.Client.Core; | ||||
| using NATS.Client.JetStream; | ||||
| using NATS.Client.JetStream.Models; | ||||
|  | ||||
| namespace DysonNetwork.Sphere.Startup; | ||||
|  | ||||
| @@ -29,17 +31,38 @@ public class BroadcastEventHandler( | ||||
| { | ||||
|     protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||||
|     { | ||||
|         await foreach (var msg in nats.SubscribeAsync<byte[]>(PaymentOrderEventBase.Type, cancellationToken: stoppingToken)) | ||||
|         try | ||||
|         { | ||||
|             var paymentTask = ProcessPaymentOrdersAsync(stoppingToken); | ||||
|             var accountTask = ProcessAccountDeletionsAsync(stoppingToken); | ||||
|  | ||||
|             await Task.WhenAll(paymentTask, accountTask); | ||||
|         } | ||||
|         catch (Exception ex) | ||||
|         { | ||||
|             logger.LogError(ex, "BroadcastEventHandler stopped due to an unhandled exception."); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     private async Task ProcessPaymentOrdersAsync(CancellationToken stoppingToken) | ||||
|     { | ||||
|         var js = new NatsJSContext(nats); | ||||
|         var stream = await js.GetStreamAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken); | ||||
|         var consumer = await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("DysonNetwork_Sphere_PaymentOrder"), | ||||
|             cancellationToken: stoppingToken); | ||||
|  | ||||
|         await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken)) | ||||
|         { | ||||
|             PaymentOrderEvent? evt = null; | ||||
|             try | ||||
|             { | ||||
|                 evt = JsonSerializer.Deserialize<PaymentOrderEvent>(msg.Data); | ||||
|  | ||||
|                 // Every order goes into the MQ is already paid, so we skipped the status validation | ||||
|  | ||||
|                 if (evt?.ProductIdentifier is null) | ||||
|                 { | ||||
|                     await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|                     continue; | ||||
|                 } | ||||
|  | ||||
|                 switch (evt.ProductIdentifier) | ||||
|                 { | ||||
| @@ -61,29 +84,45 @@ public class BroadcastEventHandler( | ||||
|  | ||||
|                         logger.LogInformation("Post award for order {OrderId} handled successfully.", evt.OrderId); | ||||
|  | ||||
|                         await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|                         break; | ||||
|                     } | ||||
|                     default: | ||||
|                         await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); | ||||
|                         // Not for us, acknowledge and ignore. | ||||
|                         await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|                         break; | ||||
|                 } | ||||
|             } | ||||
|             catch (Exception ex) | ||||
|             { | ||||
|                 logger.LogError(ex, "Error processing payment order event for order {OrderId}", evt?.OrderId); | ||||
|                 await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); | ||||
|                 logger.LogError(ex, "Error processing payment order event for order {OrderId}, will retry.", | ||||
|                     evt?.OrderId); | ||||
|                 await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|         await foreach (var msg in nats.SubscribeAsync<byte[]>(AccountDeletedEvent.Type, | ||||
|                            cancellationToken: stoppingToken)) | ||||
|     private async Task ProcessAccountDeletionsAsync(CancellationToken stoppingToken) | ||||
|     { | ||||
|         var js = new NatsJSContext(nats); | ||||
|         var stream = await js.GetStreamAsync(AccountDeletedEvent.Type, cancellationToken: stoppingToken); | ||||
|         var consumer = | ||||
|             await stream.CreateOrUpdateConsumerAsync(new ConsumerConfig("DysonNetwork_Sphere_AccountDeleted"), | ||||
|                 cancellationToken: stoppingToken); | ||||
|  | ||||
|         await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken)) | ||||
|         { | ||||
|             AccountDeletedEvent? evt = null; | ||||
|             try | ||||
|             { | ||||
|                 var evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data); | ||||
|                 if (evt == null) continue; | ||||
|                 evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data); | ||||
|                 if (evt == null) | ||||
|                 { | ||||
|                     await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|                     continue; | ||||
|                 } | ||||
|  | ||||
|                 logger.LogInformation("Account deleted: {AccountId}", evt.AccountId); | ||||
|                 logger.LogInformation("Processing account deletion for: {AccountId}", evt.AccountId); | ||||
|  | ||||
|                 using var scope = serviceProvider.CreateScope(); | ||||
|                 var db = scope.ServiceProvider.GetRequiredService<AppDatabase>(); | ||||
| @@ -103,27 +142,40 @@ public class BroadcastEventHandler( | ||||
|                         .Where(p => p.Members.All(m => m.AccountId == evt.AccountId)) | ||||
|                         .ToListAsync(cancellationToken: stoppingToken); | ||||
|  | ||||
|                     foreach (var publisher in publishers) | ||||
|                         await db.Posts | ||||
|                             .Where(p => p.PublisherId == publisher.Id) | ||||
|                             .ExecuteDeleteAsync(cancellationToken: stoppingToken); | ||||
|                     if (publishers.Any()) | ||||
|                     { | ||||
|                         foreach (var publisher in publishers) | ||||
|                         { | ||||
|                             await db.Posts | ||||
|                                 .Where(p => p.PublisherId == publisher.Id) | ||||
|                                 .ExecuteDeleteAsync(cancellationToken: stoppingToken); | ||||
|                         } | ||||
|  | ||||
|                     var publisherIds = publishers.Select(p => p.Id).ToList(); | ||||
|                     await db.Publishers | ||||
|                         .Where(p => publisherIds.Contains(p.Id)) | ||||
|                         .ExecuteDeleteAsync(cancellationToken: stoppingToken); | ||||
|                         var publisherIds = publishers.Select(p => p.Id).ToList(); | ||||
|                         await db.Publishers | ||||
|                             .Where(p => publisherIds.Contains(p.Id)) | ||||
|                             .ExecuteDeleteAsync(cancellationToken: stoppingToken); | ||||
|                     } | ||||
|  | ||||
|                     await transaction.CommitAsync(cancellationToken: stoppingToken); | ||||
|                     await msg.AckAsync(cancellationToken: stoppingToken); | ||||
|                     logger.LogInformation("Account deletion for {AccountId} processed successfully in Sphere.", | ||||
|                         evt.AccountId); | ||||
|                 } | ||||
|                 catch (Exception) | ||||
|                 catch (Exception ex) | ||||
|                 { | ||||
|                     await transaction.RollbackAsync(cancellationToken: stoppingToken); | ||||
|                     logger.LogError(ex, | ||||
|                         "Error during transaction for account deletion {AccountId} in Sphere, rolling back.", | ||||
|                         evt.AccountId); | ||||
|                     await transaction.RollbackAsync(CancellationToken.None); | ||||
|                     throw; | ||||
|                 } | ||||
|             } | ||||
|             catch (Exception ex) | ||||
|             { | ||||
|                 logger.LogError(ex, "Error processing AccountDeleted"); | ||||
|                 logger.LogError(ex, "Failed to process account deletion for {AccountId} in Sphere, will retry.", | ||||
|                     evt?.AccountId); | ||||
|                 await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: stoppingToken); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user