♻️ Replace normal streams with JetStream

🐛 Fix pass order didn't handled successfully
This commit is contained in:
2025-09-14 19:25:53 +08:00
parent 19bf17200d
commit 4ee387ab76
17 changed files with 213 additions and 100 deletions

View File

@@ -3,8 +3,12 @@ using DysonNetwork.Pusher.Email;
using DysonNetwork.Pusher.Notification;
using DysonNetwork.Shared.Proto;
using DysonNetwork.Shared.Registry;
using DysonNetwork.Shared.Stream;
using Google.Protobuf;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Net;
namespace DysonNetwork.Pusher.Services;
@@ -16,8 +20,8 @@ public class QueueBackgroundService(
)
: BackgroundService
{
public const string QueueName = "pusher.queue";
public const string QueueGroup = "pusher.workers";
public const string QueueName = "pusher_queue";
private const string QueueGroup = "pusher_workers";
private readonly int _consumerCount = configuration.GetValue<int?>("ConsumerCount") ?? Environment.ProcessorCount;
private readonly List<Task> _consumerTasks = [];
@@ -36,11 +40,16 @@ public class QueueBackgroundService(
private async Task RunConsumerAsync(CancellationToken stoppingToken)
{
logger.LogInformation("Queue consumer started");
var js = nats.CreateJetStreamContext();
await foreach (var msg in nats.SubscribeAsync<byte[]>(
QueueName,
queueGroup: QueueGroup,
cancellationToken: stoppingToken))
await js.EnsureStreamCreated("pusher_events", [QueueName]);
var consumer = await js.CreateOrUpdateConsumerAsync(
"pusher_events",
new ConsumerConfig(QueueGroup), // durable consumer
cancellationToken: stoppingToken);
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
{
try
{
@@ -48,10 +57,12 @@ public class QueueBackgroundService(
if (message is not null)
{
await ProcessMessageAsync(msg, message, stoppingToken);
await msg.AckAsync(cancellationToken: stoppingToken);
}
else
{
logger.LogWarning($"Invalid message format for {msg.Subject}");
await msg.AckAsync(cancellationToken: stoppingToken); // Acknowledge invalid messages to avoid redelivery
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
@@ -62,41 +73,31 @@ public class QueueBackgroundService(
catch (Exception ex)
{
logger.LogError(ex, "Error in queue consumer");
// Add a small delay to prevent tight error loops
await Task.Delay(1000, stoppingToken);
await msg.NakAsync(cancellationToken: stoppingToken);
}
}
}
private async ValueTask ProcessMessageAsync(NatsMsg<byte[]> rawMsg, QueueMessage message,
private async ValueTask ProcessMessageAsync(NatsJSMsg<byte[]> rawMsg, QueueMessage message,
CancellationToken cancellationToken)
{
using var scope = serviceProvider.CreateScope();
logger.LogDebug("Processing message of type {MessageType}", message.Type);
try
switch (message.Type)
{
switch (message.Type)
{
case QueueMessageType.Email:
await ProcessEmailMessageAsync(message, scope);
break;
case QueueMessageType.Email:
await ProcessEmailMessageAsync(message, scope);
break;
case QueueMessageType.PushNotification:
await ProcessPushNotificationMessageAsync(message, scope, cancellationToken);
break;
case QueueMessageType.PushNotification:
await ProcessPushNotificationMessageAsync(message, scope, cancellationToken);
break;
default:
logger.LogWarning("Unknown message type: {MessageType}", message.Type);
break;
}
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing message of type {MessageType}", message.Type);
// Don't rethrow to prevent the message from being retried indefinitely
// In a production scenario, you might want to implement a dead-letter queue
default:
logger.LogWarning("Unknown message type: {MessageType}", message.Type);
break;
}
}
@@ -126,16 +127,8 @@ public class QueueBackgroundService(
return;
}
try
{
logger.LogDebug("Processing push notification for account {AccountId}", notification.AccountId);
await pushService.DeliverPushNotification(notification, cancellationToken);
logger.LogDebug("Successfully processed push notification for account {AccountId}", notification.AccountId);
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing push notification for account {AccountId}", notification.AccountId);
// Don't rethrow to prevent the message from being retried indefinitely
}
logger.LogDebug("Processing push notification for account {AccountId}", notification.AccountId);
await pushService.DeliverPushNotification(notification, cancellationToken);
logger.LogDebug("Successfully processed push notification for account {AccountId}", notification.AccountId);
}
}