💥 Rename Pusher to Ring
This commit is contained in:
172
DysonNetwork.Ring/Services/PusherServiceGrpc.cs
Normal file
172
DysonNetwork.Ring/Services/PusherServiceGrpc.cs
Normal file
@@ -0,0 +1,172 @@
|
||||
using DysonNetwork.Ring.Connection;
|
||||
using DysonNetwork.Ring.Email;
|
||||
using DysonNetwork.Ring.Notification;
|
||||
using DysonNetwork.Shared.Proto;
|
||||
using DysonNetwork.Shared.Registry;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Grpc.Core;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace DysonNetwork.Ring.Services;
|
||||
|
||||
public class RingServiceGrpc(
|
||||
QueueService queueService,
|
||||
WebSocketService websocket,
|
||||
PushService pushService
|
||||
) : RingService.RingServiceBase
|
||||
{
|
||||
public override async Task<Empty> SendEmail(SendEmailRequest request, ServerCallContext context)
|
||||
{
|
||||
await queueService.EnqueueEmail(
|
||||
request.Email.ToName,
|
||||
request.Email.ToAddress,
|
||||
request.Email.Subject,
|
||||
request.Email.Body
|
||||
);
|
||||
return new Empty();
|
||||
}
|
||||
|
||||
public override Task<Empty> PushWebSocketPacket(PushWebSocketPacketRequest request, ServerCallContext context)
|
||||
{
|
||||
var packet = new Connection.WebSocketPacket
|
||||
{
|
||||
Type = request.Packet.Type,
|
||||
Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data),
|
||||
ErrorMessage = request.Packet.ErrorMessage
|
||||
};
|
||||
websocket.SendPacketToAccount(request.UserId, packet);
|
||||
return Task.FromResult(new Empty());
|
||||
}
|
||||
|
||||
public override Task<Empty> PushWebSocketPacketToUsers(PushWebSocketPacketToUsersRequest request,
|
||||
ServerCallContext context)
|
||||
{
|
||||
var packet = new Connection.WebSocketPacket
|
||||
{
|
||||
Type = request.Packet.Type,
|
||||
Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data),
|
||||
ErrorMessage = request.Packet.ErrorMessage
|
||||
};
|
||||
|
||||
foreach (var userId in request.UserIds)
|
||||
{
|
||||
websocket.SendPacketToAccount(userId, packet);
|
||||
}
|
||||
|
||||
return Task.FromResult(new Empty());
|
||||
}
|
||||
|
||||
public override Task<Empty> PushWebSocketPacketToDevice(PushWebSocketPacketToDeviceRequest request,
|
||||
ServerCallContext context)
|
||||
{
|
||||
var packet = new Connection.WebSocketPacket
|
||||
{
|
||||
Type = request.Packet.Type,
|
||||
Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data),
|
||||
ErrorMessage = request.Packet.ErrorMessage
|
||||
};
|
||||
websocket.SendPacketToDevice(request.DeviceId, packet);
|
||||
return Task.FromResult(new Empty());
|
||||
}
|
||||
|
||||
public override Task<Empty> PushWebSocketPacketToDevices(PushWebSocketPacketToDevicesRequest request,
|
||||
ServerCallContext context)
|
||||
{
|
||||
var packet = new Connection.WebSocketPacket
|
||||
{
|
||||
Type = request.Packet.Type,
|
||||
Data = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Packet.Data),
|
||||
ErrorMessage = request.Packet.ErrorMessage
|
||||
};
|
||||
|
||||
foreach (var deviceId in request.DeviceIds)
|
||||
{
|
||||
websocket.SendPacketToDevice(deviceId, packet);
|
||||
}
|
||||
|
||||
return Task.FromResult(new Empty());
|
||||
}
|
||||
|
||||
public override async Task<Empty> SendPushNotificationToUser(SendPushNotificationToUserRequest request,
|
||||
ServerCallContext context)
|
||||
{
|
||||
var notification = new Notification.Notification
|
||||
{
|
||||
Topic = request.Notification.Topic,
|
||||
Title = request.Notification.Title,
|
||||
Subtitle = request.Notification.Subtitle,
|
||||
Content = request.Notification.Body,
|
||||
Meta = request.Notification.HasMeta
|
||||
? GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Notification.Meta) ?? []
|
||||
: [],
|
||||
AccountId = Guid.Parse(request.UserId),
|
||||
};
|
||||
|
||||
if (request.Notification.ActionUri is not null)
|
||||
notification.Meta["action_uri"] = request.Notification.ActionUri;
|
||||
|
||||
if (request.Notification.IsSavable)
|
||||
await pushService.SaveNotification(notification);
|
||||
|
||||
await queueService.EnqueuePushNotification(
|
||||
notification,
|
||||
Guid.Parse(request.UserId),
|
||||
request.Notification.IsSavable
|
||||
);
|
||||
|
||||
return new Empty();
|
||||
}
|
||||
|
||||
public override async Task<Empty> SendPushNotificationToUsers(SendPushNotificationToUsersRequest request,
|
||||
ServerCallContext context)
|
||||
{
|
||||
var notification = new Notification.Notification
|
||||
{
|
||||
Topic = request.Notification.Topic,
|
||||
Title = request.Notification.Title,
|
||||
Subtitle = request.Notification.Subtitle,
|
||||
Content = request.Notification.Body,
|
||||
Meta = request.Notification.HasMeta
|
||||
? GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Notification.Meta) ?? []
|
||||
: [],
|
||||
};
|
||||
|
||||
if (request.Notification.ActionUri is not null)
|
||||
notification.Meta["action_uri"] = request.Notification.ActionUri;
|
||||
|
||||
var userIds = request.UserIds.Select(Guid.Parse).ToList();
|
||||
if (request.Notification.IsSavable)
|
||||
await pushService.SaveNotification(notification, userIds);
|
||||
|
||||
var tasks = userIds
|
||||
.Select(userId => queueService.EnqueuePushNotification(
|
||||
notification,
|
||||
userId,
|
||||
request.Notification.IsSavable
|
||||
));
|
||||
|
||||
await Task.WhenAll(tasks);
|
||||
return new Empty();
|
||||
}
|
||||
|
||||
public override async Task<Empty> UnsubscribePushNotifications(UnsubscribePushNotificationsRequest request,
|
||||
ServerCallContext context)
|
||||
{
|
||||
await pushService.UnsubscribeDevice(request.DeviceId);
|
||||
return new Empty();
|
||||
}
|
||||
|
||||
public override Task<GetWebsocketConnectionStatusResponse> GetWebsocketConnectionStatus(
|
||||
GetWebsocketConnectionStatusRequest request, ServerCallContext context)
|
||||
{
|
||||
var isConnected = request.IdCase switch
|
||||
{
|
||||
GetWebsocketConnectionStatusRequest.IdOneofCase.DeviceId =>
|
||||
websocket.GetDeviceIsConnected(request.DeviceId),
|
||||
GetWebsocketConnectionStatusRequest.IdOneofCase.UserId => websocket.GetAccountIsConnected(request.UserId),
|
||||
_ => false
|
||||
};
|
||||
|
||||
return Task.FromResult(new GetWebsocketConnectionStatusResponse { IsConnected = isConnected });
|
||||
}
|
||||
}
|
134
DysonNetwork.Ring/Services/QueueBackgroundService.cs
Normal file
134
DysonNetwork.Ring/Services/QueueBackgroundService.cs
Normal file
@@ -0,0 +1,134 @@
|
||||
using System.Text.Json;
|
||||
using DysonNetwork.Ring.Email;
|
||||
using DysonNetwork.Ring.Notification;
|
||||
using DysonNetwork.Shared.Proto;
|
||||
using DysonNetwork.Shared.Registry;
|
||||
using DysonNetwork.Shared.Stream;
|
||||
using Google.Protobuf;
|
||||
using NATS.Client.Core;
|
||||
using NATS.Client.JetStream;
|
||||
using NATS.Client.JetStream.Models;
|
||||
using NATS.Net;
|
||||
|
||||
namespace DysonNetwork.Ring.Services;
|
||||
|
||||
public class QueueBackgroundService(
|
||||
INatsConnection nats,
|
||||
IServiceProvider serviceProvider,
|
||||
ILogger<QueueBackgroundService> logger,
|
||||
IConfiguration configuration
|
||||
)
|
||||
: BackgroundService
|
||||
{
|
||||
public const string QueueName = "pusher_queue";
|
||||
private const string QueueGroup = "pusher_workers";
|
||||
private readonly int _consumerCount = configuration.GetValue<int?>("ConsumerCount") ?? Environment.ProcessorCount;
|
||||
private readonly List<Task> _consumerTasks = [];
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
logger.LogInformation("Starting {ConsumerCount} queue consumers", _consumerCount);
|
||||
|
||||
// Start multiple consumers
|
||||
for (var i = 0; i < _consumerCount; i++)
|
||||
_consumerTasks.Add(Task.Run(() => RunConsumerAsync(stoppingToken), stoppingToken));
|
||||
|
||||
// Wait for all consumers to complete
|
||||
await Task.WhenAll(_consumerTasks);
|
||||
}
|
||||
|
||||
private async Task RunConsumerAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
logger.LogInformation("Queue consumer started");
|
||||
var js = nats.CreateJetStreamContext();
|
||||
|
||||
await js.EnsureStreamCreated("pusher_events", [QueueName]);
|
||||
|
||||
var consumer = await js.CreateOrUpdateConsumerAsync(
|
||||
"pusher_events",
|
||||
new ConsumerConfig(QueueGroup), // durable consumer
|
||||
cancellationToken: stoppingToken);
|
||||
|
||||
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
|
||||
{
|
||||
try
|
||||
{
|
||||
var message = GrpcTypeHelper.ConvertByteStringToObject<QueueMessage>(ByteString.CopyFrom(msg.Data));
|
||||
if (message is not null)
|
||||
{
|
||||
await ProcessMessageAsync(msg, message, stoppingToken);
|
||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogWarning($"Invalid message format for {msg.Subject}");
|
||||
await msg.AckAsync(cancellationToken: stoppingToken); // Acknowledge invalid messages to avoid redelivery
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
// Normal shutdown
|
||||
break;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error in queue consumer");
|
||||
await msg.NakAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async ValueTask ProcessMessageAsync(NatsJSMsg<byte[]> rawMsg, QueueMessage message,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
|
||||
logger.LogDebug("Processing message of type {MessageType}", message.Type);
|
||||
|
||||
switch (message.Type)
|
||||
{
|
||||
case QueueMessageType.Email:
|
||||
await ProcessEmailMessageAsync(message, scope);
|
||||
break;
|
||||
|
||||
case QueueMessageType.PushNotification:
|
||||
await ProcessPushNotificationMessageAsync(message, scope, cancellationToken);
|
||||
break;
|
||||
|
||||
default:
|
||||
logger.LogWarning("Unknown message type: {MessageType}", message.Type);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private static async Task ProcessEmailMessageAsync(QueueMessage message, IServiceScope scope)
|
||||
{
|
||||
var emailService = scope.ServiceProvider.GetRequiredService<EmailService>();
|
||||
var emailMessage = JsonSerializer.Deserialize<EmailMessage>(message.Data)
|
||||
?? throw new InvalidOperationException("Invalid email message format");
|
||||
|
||||
await emailService.SendEmailAsync(
|
||||
emailMessage.ToName,
|
||||
emailMessage.ToAddress,
|
||||
emailMessage.Subject,
|
||||
emailMessage.Body);
|
||||
}
|
||||
|
||||
private static async Task ProcessPushNotificationMessageAsync(QueueMessage message, IServiceScope scope,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var pushService = scope.ServiceProvider.GetRequiredService<PushService>();
|
||||
var logger = scope.ServiceProvider.GetRequiredService<ILogger<QueueBackgroundService>>();
|
||||
|
||||
var notification = JsonSerializer.Deserialize<Notification.Notification>(message.Data);
|
||||
if (notification == null)
|
||||
{
|
||||
logger.LogError("Invalid push notification data format");
|
||||
return;
|
||||
}
|
||||
|
||||
logger.LogDebug("Processing push notification for account {AccountId}", notification.AccountId);
|
||||
await pushService.DeliverPushNotification(notification, cancellationToken);
|
||||
logger.LogDebug("Successfully processed push notification for account {AccountId}", notification.AccountId);
|
||||
}
|
||||
}
|
65
DysonNetwork.Ring/Services/QueueService.cs
Normal file
65
DysonNetwork.Ring/Services/QueueService.cs
Normal file
@@ -0,0 +1,65 @@
|
||||
using System.Text.Json;
|
||||
using DysonNetwork.Shared.Proto;
|
||||
using NATS.Client.Core;
|
||||
using NATS.Client.JetStream;
|
||||
using NATS.Net;
|
||||
|
||||
namespace DysonNetwork.Ring.Services;
|
||||
|
||||
public class QueueService(INatsConnection nats)
|
||||
{
|
||||
public async Task EnqueueEmail(string toName, string toAddress, string subject, string body)
|
||||
{
|
||||
var message = new QueueMessage
|
||||
{
|
||||
Type = QueueMessageType.Email,
|
||||
Data = JsonSerializer.Serialize(new EmailMessage
|
||||
{
|
||||
ToName = toName,
|
||||
ToAddress = toAddress,
|
||||
Subject = subject,
|
||||
Body = body
|
||||
})
|
||||
};
|
||||
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
|
||||
var js = nats.CreateJetStreamContext();
|
||||
await js.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
|
||||
}
|
||||
|
||||
public async Task EnqueuePushNotification(Notification.Notification notification, Guid userId, bool isSavable = false)
|
||||
{
|
||||
// Update the account ID in case it wasn't set
|
||||
notification.AccountId = userId;
|
||||
|
||||
var message = new QueueMessage
|
||||
{
|
||||
Type = QueueMessageType.PushNotification,
|
||||
TargetId = userId.ToString(),
|
||||
Data = JsonSerializer.Serialize(notification)
|
||||
};
|
||||
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
|
||||
var js = nats.CreateJetStreamContext();
|
||||
await js.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
|
||||
}
|
||||
}
|
||||
|
||||
public class QueueMessage
|
||||
{
|
||||
public QueueMessageType Type { get; set; }
|
||||
public string? TargetId { get; set; }
|
||||
public string Data { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
public enum QueueMessageType
|
||||
{
|
||||
Email,
|
||||
PushNotification
|
||||
}
|
||||
|
||||
public class EmailMessage
|
||||
{
|
||||
public string ToName { get; set; } = string.Empty;
|
||||
public string ToAddress { get; set; } = string.Empty;
|
||||
public string Subject { get; set; } = string.Empty;
|
||||
public string Body { get; set; } = string.Empty;
|
||||
}
|
Reference in New Issue
Block a user