diff --git a/DysonNetwork.Develop/DysonNetwork.Develop.csproj b/DysonNetwork.Develop/DysonNetwork.Develop.csproj index d3c65ae..1577bf3 100644 --- a/DysonNetwork.Develop/DysonNetwork.Develop.csproj +++ b/DysonNetwork.Develop/DysonNetwork.Develop.csproj @@ -14,8 +14,6 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - diff --git a/DysonNetwork.Drive/DysonNetwork.Drive.csproj b/DysonNetwork.Drive/DysonNetwork.Drive.csproj index 1f45677..6e54599 100644 --- a/DysonNetwork.Drive/DysonNetwork.Drive.csproj +++ b/DysonNetwork.Drive/DysonNetwork.Drive.csproj @@ -22,8 +22,6 @@ runtime; build; native; contentfiles; analyzers; buildtransitive - - all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs b/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs index 9b0678a..9cd251d 100644 --- a/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs @@ -3,6 +3,8 @@ using DysonNetwork.Drive.Storage; using DysonNetwork.Shared.Stream; using Microsoft.EntityFrameworkCore; using NATS.Client.Core; +using NATS.Client.JetStream.Models; +using NATS.Net; namespace DysonNetwork.Drive.Startup; @@ -14,12 +16,23 @@ public class BroadcastEventHandler( { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await foreach (var msg in nats.SubscribeAsync(AccountDeletedEvent.Type, cancellationToken: stoppingToken)) + var js = nats.CreateJetStreamContext(); + + await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]); + + var consumer = await js.CreateOrUpdateConsumerAsync("account_events", + new ConsumerConfig("drive_account_deleted_handler"), cancellationToken: stoppingToken); + + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { try { var evt = JsonSerializer.Deserialize(msg.Data); - if (evt == null) continue; + if (evt == null) + { + await msg.AckAsync(cancellationToken: stoppingToken); + continue; + } logger.LogInformation("Account deleted: {AccountId}", evt.AccountId); @@ -46,11 +59,14 @@ public class BroadcastEventHandler( await transaction.RollbackAsync(cancellationToken: stoppingToken); throw; } + + await msg.AckAsync(cancellationToken: stoppingToken); } catch (Exception ex) { logger.LogError(ex, "Error processing AccountDeleted"); + await msg.NakAsync(cancellationToken: stoppingToken); } } } -} +} \ No newline at end of file diff --git a/DysonNetwork.Gateway/DysonNetwork.Gateway.csproj b/DysonNetwork.Gateway/DysonNetwork.Gateway.csproj index ca65c4c..7403f3c 100644 --- a/DysonNetwork.Gateway/DysonNetwork.Gateway.csproj +++ b/DysonNetwork.Gateway/DysonNetwork.Gateway.csproj @@ -9,8 +9,6 @@ - - all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/DysonNetwork.Pass/Account/AccountService.cs b/DysonNetwork.Pass/Account/AccountService.cs index 7889de5..a1b9fed 100644 --- a/DysonNetwork.Pass/Account/AccountService.cs +++ b/DysonNetwork.Pass/Account/AccountService.cs @@ -13,6 +13,8 @@ using EFCore.BulkExtensions; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Localization; using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Net; using NodaTime; using OtpNet; using AuthService = DysonNetwork.Pass.Auth.AuthService; @@ -189,7 +191,8 @@ public class AccountService( ); } - public async Task CreateBotAccount(Account account, Guid automatedId, string? pictureId, string? backgroundId) + public async Task CreateBotAccount(Account account, Guid automatedId, string? pictureId, + string? backgroundId) { var dupeAutomateCount = await db.Accounts.Where(a => a.AutomatedId == automatedId).CountAsync(); if (dupeAutomateCount > 0) @@ -230,7 +233,7 @@ public class AccountService( ); account.Profile.Background = CloudFileReferenceObject.FromProtoValue(file); } - + db.Accounts.Add(account); await db.SaveChangesAsync(); @@ -442,7 +445,7 @@ public class AccountService( if (contact is null) { logger.LogWarning( - "Unable to send factor code to #{FactorId} with, due to no contact method was found...", + "Unable to send factor code to #{FactorId} with, due to no contact method was found...", factor.Id ); return; @@ -740,10 +743,14 @@ public class AccountService( db.Accounts.Remove(account); await db.SaveChangesAsync(); - await nats.PublishAsync(AccountDeletedEvent.Type, JsonSerializer.SerializeToUtf8Bytes(new AccountDeletedEvent - { - AccountId = account.Id, - DeletedAt = SystemClock.Instance.GetCurrentInstant() - })); + var js = nats.CreateJetStreamContext(); + await js.PublishAsync( + AccountDeletedEvent.Type, + GrpcTypeHelper.ConvertObjectToByteString(new AccountDeletedEvent + { + AccountId = account.Id, + DeletedAt = SystemClock.Instance.GetCurrentInstant() + }).ToByteArray() + ); } } \ No newline at end of file diff --git a/DysonNetwork.Pass/DysonNetwork.Pass.csproj b/DysonNetwork.Pass/DysonNetwork.Pass.csproj index ad47d11..c5e9687 100644 --- a/DysonNetwork.Pass/DysonNetwork.Pass.csproj +++ b/DysonNetwork.Pass/DysonNetwork.Pass.csproj @@ -14,8 +14,6 @@ all - - runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs b/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs index 83017b5..c326ed1 100644 --- a/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Pass/Startup/BroadcastEventHandler.cs @@ -1,7 +1,10 @@ using System.Text.Json; using DysonNetwork.Pass.Wallet; +using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Stream; using NATS.Client.Core; +using NATS.Client.JetStream.Models; +using NATS.Net; namespace DysonNetwork.Pass.Startup; @@ -13,18 +16,30 @@ public class BroadcastEventHandler( { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await foreach (var msg in nats.SubscribeAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken)) + var js = nats.CreateJetStreamContext(); + + await js.EnsureStreamCreated("payment_events", [PaymentOrderEventBase.Type]); + + var consumer = await js.CreateOrUpdateConsumerAsync("payment_events", + new ConsumerConfig("pass_payment_handler"), + cancellationToken: stoppingToken); + + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { PaymentOrderEvent? evt = null; try { - evt = JsonSerializer.Deserialize(msg.Data); + evt = JsonSerializer.Deserialize(msg.Data, GrpcTypeHelper.SerializerOptions); + + logger.LogInformation( + "Received order event: {ProductIdentifier} {OrderId}", + evt?.ProductIdentifier, + evt?.OrderId + ); if (evt?.ProductIdentifier is null || !evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram)) - { continue; - } logger.LogInformation("Handling stellar program order: {OrderId}", evt.OrderId); @@ -38,19 +53,20 @@ public class BroadcastEventHandler( ); if (order is null) { - logger.LogWarning("Order with ID {OrderId} not found.", evt.OrderId); - await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); + logger.LogWarning("Order with ID {OrderId} not found. Redelivering.", evt.OrderId); + await msg.NakAsync(cancellationToken: stoppingToken); continue; } await subscriptions.HandleSubscriptionOrder(order); logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId); + await msg.AckAsync(cancellationToken: stoppingToken); } catch (Exception ex) { - logger.LogError(ex, "Error processing payment order event for order {OrderId}", evt?.OrderId); - await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); + logger.LogError(ex, "Error processing payment order event for order {OrderId}. Redelivering.", evt?.OrderId); + await msg.NakAsync(cancellationToken: stoppingToken); } } } diff --git a/DysonNetwork.Pass/Startup/ServiceCollectionExtensions.cs b/DysonNetwork.Pass/Startup/ServiceCollectionExtensions.cs index 712c03a..44bbd66 100644 --- a/DysonNetwork.Pass/Startup/ServiceCollectionExtensions.cs +++ b/DysonNetwork.Pass/Startup/ServiceCollectionExtensions.cs @@ -207,9 +207,11 @@ public static class ServiceCollectionExtensions services.AddScoped(); services.AddScoped(); services.AddScoped(); - + services.Configure(configuration.GetSection("OidcProvider")); services.AddScoped(); + + services.AddHostedService(); return services; } diff --git a/DysonNetwork.Pass/Wallet/PaymentService.cs b/DysonNetwork.Pass/Wallet/PaymentService.cs index a3b551c..389c9be 100644 --- a/DysonNetwork.Pass/Wallet/PaymentService.cs +++ b/DysonNetwork.Pass/Wallet/PaymentService.cs @@ -7,6 +7,8 @@ using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Localization; using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Net; using NodaTime; using AccountService = DysonNetwork.Pass.Account.AccountService; @@ -253,6 +255,27 @@ public class PaymentService( throw new InvalidOperationException("Order not found"); } + var js = nats.CreateJetStreamContext(); + + if (order.Status == OrderStatus.Paid) + { + await js.PublishAsync( + PaymentOrderEventBase.Type, + GrpcTypeHelper.ConvertObjectToByteString(new PaymentOrderEvent + { + OrderId = order.Id, + WalletId = payerWallet.Id, + AccountId = payerWallet.AccountId, + AppIdentifier = order.AppIdentifier, + ProductIdentifier = order.ProductIdentifier, + Meta = order.Meta ?? [], + Status = (int)order.Status, + }).ToByteArray() + ); + + return order; + } + if (order.Status != OrderStatus.Unpaid) { throw new InvalidOperationException($"Order is in invalid status: {order.Status}"); @@ -282,16 +305,19 @@ public class PaymentService( await NotifyOrderPaid(order, payerWallet, order.PayeeWallet); - await nats.PublishAsync(PaymentOrderEventBase.Type, JsonSerializer.SerializeToUtf8Bytes(new PaymentOrderEvent - { - OrderId = order.Id, - WalletId = payerWallet.Id, - AccountId = payerWallet.AccountId, - AppIdentifier = order.AppIdentifier, - ProductIdentifier = order.ProductIdentifier, - Meta = order.Meta ?? [], - Status = (int)order.Status, - })); + await js.PublishAsync( + PaymentOrderEventBase.Type, + GrpcTypeHelper.ConvertObjectToByteString(new PaymentOrderEvent + { + OrderId = order.Id, + WalletId = payerWallet.Id, + AccountId = payerWallet.AccountId, + AppIdentifier = order.AppIdentifier, + ProductIdentifier = order.ProductIdentifier, + Meta = order.Meta ?? [], + Status = (int)order.Status, + }).ToByteArray() + ); return order; } diff --git a/DysonNetwork.Pusher/DysonNetwork.Pusher.csproj b/DysonNetwork.Pusher/DysonNetwork.Pusher.csproj index 76416a5..49445bb 100644 --- a/DysonNetwork.Pusher/DysonNetwork.Pusher.csproj +++ b/DysonNetwork.Pusher/DysonNetwork.Pusher.csproj @@ -19,8 +19,6 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all - - all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/DysonNetwork.Pusher/Services/QueueBackgroundService.cs b/DysonNetwork.Pusher/Services/QueueBackgroundService.cs index 422d042..d39e444 100644 --- a/DysonNetwork.Pusher/Services/QueueBackgroundService.cs +++ b/DysonNetwork.Pusher/Services/QueueBackgroundService.cs @@ -3,8 +3,12 @@ using DysonNetwork.Pusher.Email; using DysonNetwork.Pusher.Notification; using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Registry; +using DysonNetwork.Shared.Stream; using Google.Protobuf; using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; +using NATS.Net; namespace DysonNetwork.Pusher.Services; @@ -16,8 +20,8 @@ public class QueueBackgroundService( ) : BackgroundService { - public const string QueueName = "pusher.queue"; - public const string QueueGroup = "pusher.workers"; + public const string QueueName = "pusher_queue"; + private const string QueueGroup = "pusher_workers"; private readonly int _consumerCount = configuration.GetValue("ConsumerCount") ?? Environment.ProcessorCount; private readonly List _consumerTasks = []; @@ -36,11 +40,16 @@ public class QueueBackgroundService( private async Task RunConsumerAsync(CancellationToken stoppingToken) { logger.LogInformation("Queue consumer started"); + var js = nats.CreateJetStreamContext(); - await foreach (var msg in nats.SubscribeAsync( - QueueName, - queueGroup: QueueGroup, - cancellationToken: stoppingToken)) + await js.EnsureStreamCreated("pusher_events", [QueueName]); + + var consumer = await js.CreateOrUpdateConsumerAsync( + "pusher_events", + new ConsumerConfig(QueueGroup), // durable consumer + cancellationToken: stoppingToken); + + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { try { @@ -48,10 +57,12 @@ public class QueueBackgroundService( if (message is not null) { await ProcessMessageAsync(msg, message, stoppingToken); + await msg.AckAsync(cancellationToken: stoppingToken); } else { logger.LogWarning($"Invalid message format for {msg.Subject}"); + await msg.AckAsync(cancellationToken: stoppingToken); // Acknowledge invalid messages to avoid redelivery } } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) @@ -62,41 +73,31 @@ public class QueueBackgroundService( catch (Exception ex) { logger.LogError(ex, "Error in queue consumer"); - // Add a small delay to prevent tight error loops - await Task.Delay(1000, stoppingToken); + await msg.NakAsync(cancellationToken: stoppingToken); } } } - private async ValueTask ProcessMessageAsync(NatsMsg rawMsg, QueueMessage message, + private async ValueTask ProcessMessageAsync(NatsJSMsg rawMsg, QueueMessage message, CancellationToken cancellationToken) { using var scope = serviceProvider.CreateScope(); logger.LogDebug("Processing message of type {MessageType}", message.Type); - try + switch (message.Type) { - switch (message.Type) - { - case QueueMessageType.Email: - await ProcessEmailMessageAsync(message, scope); - break; + case QueueMessageType.Email: + await ProcessEmailMessageAsync(message, scope); + break; - case QueueMessageType.PushNotification: - await ProcessPushNotificationMessageAsync(message, scope, cancellationToken); - break; + case QueueMessageType.PushNotification: + await ProcessPushNotificationMessageAsync(message, scope, cancellationToken); + break; - default: - logger.LogWarning("Unknown message type: {MessageType}", message.Type); - break; - } - } - catch (Exception ex) - { - logger.LogError(ex, "Error processing message of type {MessageType}", message.Type); - // Don't rethrow to prevent the message from being retried indefinitely - // In a production scenario, you might want to implement a dead-letter queue + default: + logger.LogWarning("Unknown message type: {MessageType}", message.Type); + break; } } @@ -126,16 +127,8 @@ public class QueueBackgroundService( return; } - try - { - logger.LogDebug("Processing push notification for account {AccountId}", notification.AccountId); - await pushService.DeliverPushNotification(notification, cancellationToken); - logger.LogDebug("Successfully processed push notification for account {AccountId}", notification.AccountId); - } - catch (Exception ex) - { - logger.LogError(ex, "Error processing push notification for account {AccountId}", notification.AccountId); - // Don't rethrow to prevent the message from being retried indefinitely - } + logger.LogDebug("Processing push notification for account {AccountId}", notification.AccountId); + await pushService.DeliverPushNotification(notification, cancellationToken); + logger.LogDebug("Successfully processed push notification for account {AccountId}", notification.AccountId); } } \ No newline at end of file diff --git a/DysonNetwork.Pusher/Services/QueueService.cs b/DysonNetwork.Pusher/Services/QueueService.cs index 8e1f69e..094d504 100644 --- a/DysonNetwork.Pusher/Services/QueueService.cs +++ b/DysonNetwork.Pusher/Services/QueueService.cs @@ -1,6 +1,8 @@ using System.Text.Json; using DysonNetwork.Shared.Proto; using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Net; namespace DysonNetwork.Pusher.Services; @@ -20,7 +22,8 @@ public class QueueService(INatsConnection nats) }) }; var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray(); - await nats.PublishAsync(QueueBackgroundService.QueueName, rawMessage); + var js = nats.CreateJetStreamContext(); + await js.PublishAsync(QueueBackgroundService.QueueName, rawMessage); } public async Task EnqueuePushNotification(Notification.Notification notification, Guid userId, bool isSavable = false) @@ -35,7 +38,8 @@ public class QueueService(INatsConnection nats) Data = JsonSerializer.Serialize(notification) }; var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray(); - await nats.PublishAsync(QueueBackgroundService.QueueName, rawMessage); + var js = nats.CreateJetStreamContext(); + await js.PublishAsync(QueueBackgroundService.QueueName, rawMessage); } } diff --git a/DysonNetwork.Shared/DysonNetwork.Shared.csproj b/DysonNetwork.Shared/DysonNetwork.Shared.csproj index cfaf3de..341d5bf 100644 --- a/DysonNetwork.Shared/DysonNetwork.Shared.csproj +++ b/DysonNetwork.Shared/DysonNetwork.Shared.csproj @@ -21,8 +21,7 @@ - - + diff --git a/DysonNetwork.Shared/Stream/Streamer.cs b/DysonNetwork.Shared/Stream/Streamer.cs new file mode 100644 index 0000000..8a2df5e --- /dev/null +++ b/DysonNetwork.Shared/Stream/Streamer.cs @@ -0,0 +1,23 @@ +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; + +namespace DysonNetwork.Shared.Stream; + +public static class Streamer +{ + public static async Task EnsureStreamCreated( + this INatsJSContext context, + string stream, + ICollection? subjects + ) + { + try + { + return await context.CreateStreamAsync(new StreamConfig(stream, subjects ?? [])); + } + catch (NatsJSException) + { + return await context.GetStreamAsync(stream); + } + } +} \ No newline at end of file diff --git a/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj b/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj index 69f5b71..cc01ed1 100644 --- a/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj +++ b/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj @@ -30,8 +30,6 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs b/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs index 6620fa5..97cd5f3 100644 --- a/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Sphere/Startup/BroadcastEventHandler.cs @@ -1,9 +1,12 @@ using System.Text.Json; using System.Text.Json.Serialization; +using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Stream; using DysonNetwork.Sphere.Post; using Microsoft.EntityFrameworkCore; using NATS.Client.Core; +using NATS.Client.JetStream.Models; +using NATS.Net; namespace DysonNetwork.Sphere.Startup; @@ -16,7 +19,7 @@ public class PaymentOrderAwardMeta { [JsonPropertyName("account_id")] public Guid AccountId { get; set; } [JsonPropertyName("post_id")] public Guid PostId { get; set; } - [JsonPropertyName("amount")] public string Amount { get; set; } + [JsonPropertyName("amount")] public string Amount { get; set; } = null!; [JsonPropertyName("attitude")] public PostReactionAttitude Attitude { get; set; } [JsonPropertyName("message")] public string? Message { get; set; } } @@ -29,14 +32,33 @@ public class BroadcastEventHandler( { protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await foreach (var msg in nats.SubscribeAsync(PaymentOrderEventBase.Type, cancellationToken: stoppingToken)) + var paymentTask = HandlePaymentOrders(stoppingToken); + var accountTask = HandleAccountDeletions(stoppingToken); + + await Task.WhenAll(paymentTask, accountTask); + } + + private async Task HandlePaymentOrders(CancellationToken stoppingToken) + { + var js = nats.CreateJetStreamContext(); + + await js.EnsureStreamCreated("payment_events", [PaymentOrderEventBase.Type]); + + var consumer = await js.CreateOrUpdateConsumerAsync("payment_events", + new ConsumerConfig("sphere_payment_handler"), cancellationToken: stoppingToken); + + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { PaymentOrderEvent? evt = null; try { - evt = JsonSerializer.Deserialize(msg.Data); - - // Every order goes into the MQ is already paid, so we skipped the status validation + evt = JsonSerializer.Deserialize(msg.Data, GrpcTypeHelper.SerializerOptions); + + logger.LogInformation( + "Received order event: {ProductIdentifier} {OrderId}", + evt?.ProductIdentifier, + evt?.OrderId + ); if (evt?.ProductIdentifier is null) continue; @@ -47,9 +69,9 @@ public class BroadcastEventHandler( { var awardEvt = JsonSerializer.Deserialize(msg.Data); if (awardEvt?.Meta == null) throw new ArgumentNullException(nameof(awardEvt)); - + var meta = awardEvt.Meta; - + logger.LogInformation("Handling post award order: {OrderId}", evt.OrderId); await using var scope = serviceProvider.CreateAsyncScope(); @@ -60,28 +82,41 @@ public class BroadcastEventHandler( await ps.AwardPost(meta.PostId, meta.AccountId, amountNum, meta.Attitude, meta.Message); logger.LogInformation("Post award for order {OrderId} handled successfully.", evt.OrderId); - + await msg.AckAsync(cancellationToken: stoppingToken); break; } default: - await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); + await msg.NakAsync(cancellationToken: stoppingToken); break; } } catch (Exception ex) { logger.LogError(ex, "Error processing payment order event for order {OrderId}", evt?.OrderId); - await nats.PublishAsync(PaymentOrderEventBase.Type, msg.Data, cancellationToken: stoppingToken); + await msg.NakAsync(cancellationToken: stoppingToken); } } + } - await foreach (var msg in nats.SubscribeAsync(AccountDeletedEvent.Type, - cancellationToken: stoppingToken)) + private async Task HandleAccountDeletions(CancellationToken stoppingToken) + { + var js = nats.CreateJetStreamContext(); + + await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]); + + var consumer = await js.CreateOrUpdateConsumerAsync("account_events", + new ConsumerConfig("sphere_account_deleted_handler"), cancellationToken: stoppingToken); + + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { try { - var evt = JsonSerializer.Deserialize(msg.Data); - if (evt == null) continue; + var evt = JsonSerializer.Deserialize(msg.Data, GrpcTypeHelper.SerializerOptions); + if (evt == null) + { + await msg.AckAsync(cancellationToken: stoppingToken); + continue; + } logger.LogInformation("Account deleted: {AccountId}", evt.AccountId); @@ -120,11 +155,14 @@ public class BroadcastEventHandler( await transaction.RollbackAsync(cancellationToken: stoppingToken); throw; } + + await msg.AckAsync(cancellationToken: stoppingToken); } catch (Exception ex) { logger.LogError(ex, "Error processing AccountDeleted"); + await msg.NakAsync(cancellationToken: stoppingToken); } } } -} +} \ No newline at end of file diff --git a/DysonNetwork.sln.DotSettings.user b/DysonNetwork.sln.DotSettings.user index 77593bf..6c84793 100644 --- a/DysonNetwork.sln.DotSettings.user +++ b/DysonNetwork.sln.DotSettings.user @@ -138,6 +138,7 @@ ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded