♻️ Refactor NATS message handling

This commit is contained in:
2025-08-21 18:47:23 +08:00
parent 4d1972bc99
commit d92220b4bc
2 changed files with 20 additions and 8 deletions

View File

@@ -3,6 +3,7 @@ using DysonNetwork.Pusher.Email;
using DysonNetwork.Pusher.Notification; using DysonNetwork.Pusher.Notification;
using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Proto;
using DysonNetwork.Shared.Registry; using DysonNetwork.Shared.Registry;
using Google.Protobuf;
using NATS.Client.Core; using NATS.Client.Core;
namespace DysonNetwork.Pusher.Services; namespace DysonNetwork.Pusher.Services;
@@ -36,14 +37,23 @@ public class QueueBackgroundService(
{ {
logger.LogInformation("Queue consumer started"); logger.LogInformation("Queue consumer started");
await foreach (var msg in nats.SubscribeAsync<QueueMessage>( await foreach (var msg in nats.SubscribeAsync<byte[]>(
QueueName, QueueName,
queueGroup: QueueGroup, queueGroup: QueueGroup,
cancellationToken: stoppingToken)) cancellationToken: stoppingToken))
{ {
try try
{ {
await ProcessMessageAsync(msg, stoppingToken); var message = GrpcTypeHelper.ConvertByteStringToObject<QueueMessage>(ByteString.CopyFrom(msg.Data));
if (message is not null)
{
await ProcessMessageAsync(msg, message, stoppingToken);
}
else
{
await msg.ReplyAsync(cancellationToken: stoppingToken);
logger.LogWarning($"Invalid message format for {msg.Subject}");
}
} }
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{ {
@@ -59,10 +69,10 @@ public class QueueBackgroundService(
} }
} }
private async ValueTask ProcessMessageAsync(NatsMsg<QueueMessage> msg, CancellationToken cancellationToken) private async ValueTask ProcessMessageAsync(NatsMsg<byte[]> rawMsg, QueueMessage message,
CancellationToken cancellationToken)
{ {
using var scope = serviceProvider.CreateScope(); using var scope = serviceProvider.CreateScope();
var message = msg.Data;
logger.LogDebug("Processing message of type {MessageType}", message.Type); logger.LogDebug("Processing message of type {MessageType}", message.Type);
@@ -83,7 +93,7 @@ public class QueueBackgroundService(
break; break;
} }
await msg.ReplyAsync(); await rawMsg.ReplyAsync(cancellationToken: cancellationToken);
} }
catch (Exception ex) catch (Exception ex)
{ {

View File

@@ -1,4 +1,5 @@
using System.Text.Json; using System.Text.Json;
using DysonNetwork.Shared.Proto;
using NATS.Client.Core; using NATS.Client.Core;
namespace DysonNetwork.Pusher.Services; namespace DysonNetwork.Pusher.Services;
@@ -20,7 +21,8 @@ public class QueueService(INatsConnection nats)
Body = body Body = body
}) })
}; };
await nats.PublishAsync(QueueName, message); var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
await nats.PublishAsync(QueueName, rawMessage);
} }
public async Task EnqueuePushNotification(Notification.Notification notification, Guid userId, bool isSavable = false) public async Task EnqueuePushNotification(Notification.Notification notification, Guid userId, bool isSavable = false)
@@ -34,8 +36,8 @@ public class QueueService(INatsConnection nats)
TargetId = userId.ToString(), TargetId = userId.ToString(),
Data = JsonSerializer.Serialize(notification) Data = JsonSerializer.Serialize(notification)
}; };
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
await nats.PublishAsync(QueueName, message); await nats.PublishAsync(QueueName, rawMessage);
} }
} }