diff --git a/DysonNetwork.Pusher/Services/QueueBackgroundService.cs b/DysonNetwork.Pusher/Services/QueueBackgroundService.cs index 5593b65..214459d 100644 --- a/DysonNetwork.Pusher/Services/QueueBackgroundService.cs +++ b/DysonNetwork.Pusher/Services/QueueBackgroundService.cs @@ -3,6 +3,7 @@ using DysonNetwork.Pusher.Email; using DysonNetwork.Pusher.Notification; using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Registry; +using Google.Protobuf; using NATS.Client.Core; namespace DysonNetwork.Pusher.Services; @@ -36,14 +37,23 @@ public class QueueBackgroundService( { logger.LogInformation("Queue consumer started"); - await foreach (var msg in nats.SubscribeAsync( + await foreach (var msg in nats.SubscribeAsync( QueueName, queueGroup: QueueGroup, cancellationToken: stoppingToken)) { try { - await ProcessMessageAsync(msg, stoppingToken); + var message = GrpcTypeHelper.ConvertByteStringToObject(ByteString.CopyFrom(msg.Data)); + if (message is not null) + { + await ProcessMessageAsync(msg, message, stoppingToken); + } + else + { + await msg.ReplyAsync(cancellationToken: stoppingToken); + logger.LogWarning($"Invalid message format for {msg.Subject}"); + } } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { @@ -59,10 +69,10 @@ public class QueueBackgroundService( } } - private async ValueTask ProcessMessageAsync(NatsMsg msg, CancellationToken cancellationToken) + private async ValueTask ProcessMessageAsync(NatsMsg rawMsg, QueueMessage message, + CancellationToken cancellationToken) { using var scope = serviceProvider.CreateScope(); - var message = msg.Data; logger.LogDebug("Processing message of type {MessageType}", message.Type); @@ -83,7 +93,7 @@ public class QueueBackgroundService( break; } - await msg.ReplyAsync(); + await rawMsg.ReplyAsync(cancellationToken: cancellationToken); } catch (Exception ex) { diff --git a/DysonNetwork.Pusher/Services/QueueService.cs b/DysonNetwork.Pusher/Services/QueueService.cs index 95c21d9..3d1ced9 100644 --- a/DysonNetwork.Pusher/Services/QueueService.cs +++ b/DysonNetwork.Pusher/Services/QueueService.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using DysonNetwork.Shared.Proto; using NATS.Client.Core; namespace DysonNetwork.Pusher.Services; @@ -20,7 +21,8 @@ public class QueueService(INatsConnection nats) Body = body }) }; - await nats.PublishAsync(QueueName, message); + var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray(); + await nats.PublishAsync(QueueName, rawMessage); } public async Task EnqueuePushNotification(Notification.Notification notification, Guid userId, bool isSavable = false) @@ -34,8 +36,8 @@ public class QueueService(INatsConnection nats) TargetId = userId.ToString(), Data = JsonSerializer.Serialize(notification) }; - - await nats.PublishAsync(QueueName, message); + var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray(); + await nats.PublishAsync(QueueName, rawMessage); } }