⚡ Optimize queue usage
This commit is contained in:
5
.editorconfig
Normal file
5
.editorconfig
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
root = true
|
||||||
|
|
||||||
|
[*]
|
||||||
|
indent_style = space
|
||||||
|
indent_size = 4
|
@@ -5,8 +5,6 @@ using DysonNetwork.Shared.Proto;
|
|||||||
using DysonNetwork.Shared.Stream;
|
using DysonNetwork.Shared.Stream;
|
||||||
using Google.Protobuf;
|
using Google.Protobuf;
|
||||||
using NATS.Client.Core;
|
using NATS.Client.Core;
|
||||||
using NATS.Client.JetStream;
|
|
||||||
using NATS.Client.JetStream.Models;
|
|
||||||
using NATS.Net;
|
using NATS.Net;
|
||||||
|
|
||||||
namespace DysonNetwork.Ring.Services;
|
namespace DysonNetwork.Ring.Services;
|
||||||
@@ -39,29 +37,19 @@ public class QueueBackgroundService(
|
|||||||
private async Task RunConsumerAsync(CancellationToken stoppingToken)
|
private async Task RunConsumerAsync(CancellationToken stoppingToken)
|
||||||
{
|
{
|
||||||
logger.LogInformation("Queue consumer started");
|
logger.LogInformation("Queue consumer started");
|
||||||
var js = nats.CreateJetStreamContext();
|
|
||||||
|
|
||||||
await js.EnsureStreamCreated("pusher_events", [QueueName]);
|
await foreach (var msg in nats.SubscribeAsync<byte[]>(QueueName, queueGroup: QueueGroup, cancellationToken: stoppingToken))
|
||||||
|
|
||||||
var consumer = await js.CreateOrUpdateConsumerAsync(
|
|
||||||
"pusher_events",
|
|
||||||
new ConsumerConfig(QueueGroup), // durable consumer
|
|
||||||
cancellationToken: stoppingToken);
|
|
||||||
|
|
||||||
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
|
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var message = GrpcTypeHelper.ConvertByteStringToObject<QueueMessage>(ByteString.CopyFrom(msg.Data));
|
var message = GrpcTypeHelper.ConvertByteStringToObject<QueueMessage>(ByteString.CopyFrom(msg.Data));
|
||||||
if (message is not null)
|
if (message is not null)
|
||||||
{
|
{
|
||||||
await ProcessMessageAsync(msg, message, stoppingToken);
|
await ProcessMessageAsync(message, stoppingToken);
|
||||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
logger.LogWarning($"Invalid message format for {msg.Subject}");
|
logger.LogWarning($"Invalid message format for {msg.Subject}");
|
||||||
await msg.AckAsync(cancellationToken: stoppingToken); // Acknowledge invalid messages to avoid redelivery
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||||
@@ -72,12 +60,11 @@ public class QueueBackgroundService(
|
|||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
logger.LogError(ex, "Error in queue consumer");
|
logger.LogError(ex, "Error in queue consumer");
|
||||||
await msg.NakAsync(cancellationToken: stoppingToken);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async ValueTask ProcessMessageAsync(NatsJSMsg<byte[]> rawMsg, QueueMessage message,
|
private async ValueTask ProcessMessageAsync(QueueMessage message,
|
||||||
CancellationToken cancellationToken)
|
CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
using var scope = serviceProvider.CreateScope();
|
using var scope = serviceProvider.CreateScope();
|
||||||
|
@@ -1,7 +1,6 @@
|
|||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
using DysonNetwork.Shared.Proto;
|
using DysonNetwork.Shared.Proto;
|
||||||
using NATS.Client.Core;
|
using NATS.Client.Core;
|
||||||
using NATS.Net;
|
|
||||||
|
|
||||||
namespace DysonNetwork.Ring.Services;
|
namespace DysonNetwork.Ring.Services;
|
||||||
|
|
||||||
@@ -21,8 +20,7 @@ public class QueueService(INatsConnection nats)
|
|||||||
})
|
})
|
||||||
};
|
};
|
||||||
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
|
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
|
||||||
var js = nats.CreateJetStreamContext();
|
await nats.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
|
||||||
await js.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task EnqueuePushNotification(Shared.Models.SnNotification notification, Guid userId, bool isSavable = false)
|
public async Task EnqueuePushNotification(Shared.Models.SnNotification notification, Guid userId, bool isSavable = false)
|
||||||
@@ -37,8 +35,7 @@ public class QueueService(INatsConnection nats)
|
|||||||
Data = JsonSerializer.Serialize(notification)
|
Data = JsonSerializer.Serialize(notification)
|
||||||
};
|
};
|
||||||
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
|
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
|
||||||
var js = nats.CreateJetStreamContext();
|
await nats.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
|
||||||
await js.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user