♻️ Move the chat part of the Sphere service to the Messager service
This commit is contained in:
345
DysonNetwork.Messager/Startup/BroadcastEventHandler.cs
Normal file
345
DysonNetwork.Messager/Startup/BroadcastEventHandler.cs
Normal file
@@ -0,0 +1,345 @@
|
||||
using System.Text.Json;
|
||||
using DysonNetwork.Shared.Proto;
|
||||
using DysonNetwork.Shared.Queue;
|
||||
using DysonNetwork.Messager.Chat;
|
||||
using Google.Protobuf;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using NATS.Client.Core;
|
||||
using NATS.Client.JetStream.Models;
|
||||
using NATS.Net;
|
||||
using NodaTime;
|
||||
using WebSocketPacket = DysonNetwork.Shared.Models.WebSocketPacket;
|
||||
|
||||
namespace DysonNetwork.Messager.Startup;
|
||||
|
||||
public class BroadcastEventHandler(
|
||||
IServiceProvider serviceProvider,
|
||||
ILogger<BroadcastEventHandler> logger,
|
||||
INatsConnection nats,
|
||||
RingService.RingServiceClient pusher
|
||||
) : BackgroundService
|
||||
{
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
var accountTask = HandleAccountDeletions(stoppingToken);
|
||||
var websocketTask = HandleWebSocketPackets(stoppingToken);
|
||||
var accountStatusTask = HandleAccountStatusUpdates(stoppingToken);
|
||||
|
||||
await Task.WhenAll(accountTask, websocketTask, accountStatusTask);
|
||||
}
|
||||
|
||||
private async Task HandleAccountDeletions(CancellationToken stoppingToken)
|
||||
{
|
||||
var js = nats.CreateJetStreamContext();
|
||||
|
||||
await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]);
|
||||
|
||||
var consumer = await js.CreateOrUpdateConsumerAsync("account_events",
|
||||
new ConsumerConfig("messager_account_deleted_handler"), cancellationToken: stoppingToken);
|
||||
|
||||
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
|
||||
{
|
||||
try
|
||||
{
|
||||
var evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
|
||||
if (evt == null)
|
||||
{
|
||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||
continue;
|
||||
}
|
||||
|
||||
logger.LogInformation("Account deleted: {AccountId}", evt.AccountId);
|
||||
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||
|
||||
await db.ChatMembers
|
||||
.Where(m => m.AccountId == evt.AccountId)
|
||||
.ExecuteDeleteAsync(cancellationToken: stoppingToken);
|
||||
|
||||
await using var transaction = await db.Database.BeginTransactionAsync(cancellationToken: stoppingToken);
|
||||
try
|
||||
{
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
await db.ChatMessages
|
||||
.Where(m => m.Sender.AccountId == evt.AccountId)
|
||||
.ExecuteUpdateAsync(c => c.SetProperty(p => p.DeletedAt, now), stoppingToken);
|
||||
|
||||
await db.ChatReactions
|
||||
.Where(r => r.Sender.AccountId == evt.AccountId)
|
||||
.ExecuteUpdateAsync(c => c.SetProperty(p => p.DeletedAt, now), stoppingToken);
|
||||
|
||||
await db.ChatMembers
|
||||
.Where(m => m.AccountId == evt.AccountId)
|
||||
.ExecuteUpdateAsync(c => c.SetProperty(p => p.DeletedAt, now), stoppingToken);
|
||||
|
||||
await transaction.CommitAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
await transaction.RollbackAsync(cancellationToken: stoppingToken);
|
||||
throw;
|
||||
}
|
||||
|
||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error processing AccountDeleted");
|
||||
await msg.NakAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleWebSocketPackets(CancellationToken stoppingToken)
|
||||
{
|
||||
await foreach (var msg in nats.SubscribeAsync<byte[]>(
|
||||
WebSocketPacketEvent.SubjectPrefix + "sphere", cancellationToken: stoppingToken))
|
||||
{
|
||||
logger.LogDebug("Handling websocket packet...");
|
||||
|
||||
try
|
||||
{
|
||||
var evt = JsonSerializer.Deserialize<WebSocketPacketEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
|
||||
if (evt == null) throw new ArgumentNullException(nameof(evt));
|
||||
var packet = WebSocketPacket.FromBytes(evt.PacketBytes);
|
||||
logger.LogInformation("Handling websocket packet... {Type}", packet.Type);
|
||||
switch (packet.Type)
|
||||
{
|
||||
case "messages.read":
|
||||
await HandleMessageRead(evt, packet);
|
||||
break;
|
||||
case "messages.typing":
|
||||
await HandleMessageTyping(evt, packet);
|
||||
break;
|
||||
case "messages.subscribe":
|
||||
await HandleMessageSubscribe(evt, packet);
|
||||
break;
|
||||
case "messages.unsubscribe":
|
||||
await HandleMessageUnsubscribe(evt, packet);
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error processing websocket packet");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleMessageRead(WebSocketPacketEvent evt, WebSocketPacket packet)
|
||||
{
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
var cs = scope.ServiceProvider.GetRequiredService<ChatService>();
|
||||
var crs = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
|
||||
|
||||
if (packet.Data == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "Mark message as read requires you to provide the ChatRoomId");
|
||||
return;
|
||||
}
|
||||
|
||||
var requestData = packet.GetData<Chat.ChatController.MarkMessageReadRequest>();
|
||||
if (requestData == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "Invalid request data");
|
||||
return;
|
||||
}
|
||||
|
||||
var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
|
||||
if (sender == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "User is not a member of the chat room.");
|
||||
return;
|
||||
}
|
||||
|
||||
await cs.ReadChatRoomAsync(requestData.ChatRoomId, evt.AccountId);
|
||||
}
|
||||
|
||||
private async Task HandleMessageTyping(WebSocketPacketEvent evt, WebSocketPacket packet)
|
||||
{
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
var crs = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
|
||||
|
||||
if (packet.Data == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "messages.typing requires you to provide the ChatRoomId");
|
||||
return;
|
||||
}
|
||||
|
||||
var requestData = packet.GetData<Chat.ChatController.ChatRoomWsUniversalRequest>();
|
||||
if (requestData == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "Invalid request data");
|
||||
return;
|
||||
}
|
||||
|
||||
var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
|
||||
if (sender == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "User is not a member of the chat room.");
|
||||
return;
|
||||
}
|
||||
|
||||
var responsePacket = new WebSocketPacket
|
||||
{
|
||||
Type = "messages.typing",
|
||||
Data = new
|
||||
{
|
||||
room_id = sender.ChatRoomId,
|
||||
sender_id = sender.Id,
|
||||
sender
|
||||
}
|
||||
};
|
||||
|
||||
// Broadcast typing indicator to subscribed room members only
|
||||
var subscribedMemberIds = await crs.GetSubscribedMembers(requestData.ChatRoomId);
|
||||
var roomMembers = await crs.ListRoomMembers(requestData.ChatRoomId);
|
||||
|
||||
// Filter to subscribed members excluding the current user
|
||||
var subscribedMembers = roomMembers
|
||||
.Where(m => subscribedMemberIds.Contains(m.Id) && m.AccountId != evt.AccountId)
|
||||
.Select(m => m.AccountId.ToString())
|
||||
.ToList();
|
||||
|
||||
if (subscribedMembers.Count > 0)
|
||||
{
|
||||
var respRequest = new PushWebSocketPacketToUsersRequest { Packet = responsePacket.ToProtoValue() };
|
||||
respRequest.UserIds.AddRange(subscribedMembers);
|
||||
await pusher.PushWebSocketPacketToUsersAsync(respRequest);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleMessageSubscribe(WebSocketPacketEvent evt, WebSocketPacket packet)
|
||||
{
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
var crs = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
|
||||
|
||||
if (packet.Data == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "messages.subscribe requires you to provide the ChatRoomId");
|
||||
return;
|
||||
}
|
||||
|
||||
var requestData = packet.GetData<Chat.ChatController.ChatRoomWsUniversalRequest>();
|
||||
if (requestData == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "Invalid request data");
|
||||
return;
|
||||
}
|
||||
|
||||
var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
|
||||
if (sender == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "User is not a member of the chat room.");
|
||||
return;
|
||||
}
|
||||
|
||||
await crs.SubscribeChatRoom(sender);
|
||||
}
|
||||
|
||||
private async Task HandleMessageUnsubscribe(WebSocketPacketEvent evt, WebSocketPacket packet)
|
||||
{
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
var crs = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
|
||||
|
||||
if (packet.Data == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "messages.unsubscribe requires you to provide the ChatRoomId");
|
||||
return;
|
||||
}
|
||||
|
||||
var requestData = packet.GetData<Chat.ChatController.ChatRoomWsUniversalRequest>();
|
||||
if (requestData == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "Invalid request data");
|
||||
return;
|
||||
}
|
||||
|
||||
var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
|
||||
if (sender == null)
|
||||
{
|
||||
await SendErrorResponse(evt, "User is not a member of the chat room.");
|
||||
return;
|
||||
}
|
||||
|
||||
await crs.UnsubscribeChatRoom(sender);
|
||||
}
|
||||
|
||||
private async Task HandleAccountStatusUpdates(CancellationToken stoppingToken)
|
||||
{
|
||||
await foreach (var msg in nats.SubscribeAsync<byte[]>(AccountStatusUpdatedEvent.Type,
|
||||
cancellationToken: stoppingToken))
|
||||
{
|
||||
try
|
||||
{
|
||||
var evt =
|
||||
GrpcTypeHelper.ConvertByteStringToObject<AccountStatusUpdatedEvent>(ByteString.CopyFrom(msg.Data));
|
||||
if (evt == null)
|
||||
continue;
|
||||
|
||||
logger.LogInformation("Account status updated: {AccountId}", evt.AccountId);
|
||||
|
||||
await using var scope = serviceProvider.CreateAsyncScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||
var chatRoomService = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
|
||||
|
||||
// Get user's joined chat rooms
|
||||
var userRooms = await db.ChatMembers
|
||||
.Where(m => m.AccountId == evt.AccountId && m.JoinedAt != null && m.LeaveAt == null)
|
||||
.Select(m => m.ChatRoomId)
|
||||
.ToListAsync(cancellationToken: stoppingToken);
|
||||
|
||||
// Send WebSocket packet to subscribed users per room
|
||||
foreach (var roomId in userRooms)
|
||||
{
|
||||
var members = await chatRoomService.ListRoomMembers(roomId);
|
||||
var subscribedMemberIds = await chatRoomService.GetSubscribedMembers(roomId);
|
||||
var subscribedUsers = members
|
||||
.Where(m => subscribedMemberIds.Contains(m.Id))
|
||||
.Select(m => m.AccountId.ToString())
|
||||
.ToList();
|
||||
|
||||
if (subscribedUsers.Count == 0) continue;
|
||||
var packet = new WebSocketPacket
|
||||
{
|
||||
Type = "accounts.status.update",
|
||||
Data = new Dictionary<string, object>
|
||||
{
|
||||
["status"] = evt.Status,
|
||||
["chat_room_id"] = roomId
|
||||
}
|
||||
};
|
||||
|
||||
var request = new PushWebSocketPacketToUsersRequest
|
||||
{
|
||||
Packet = packet.ToProtoValue()
|
||||
};
|
||||
request.UserIds.AddRange(subscribedUsers);
|
||||
|
||||
await pusher.PushWebSocketPacketToUsersAsync(request, cancellationToken: stoppingToken);
|
||||
|
||||
logger.LogInformation("Sent status update for room {roomId} to {count} subscribed users", roomId,
|
||||
subscribedUsers.Count);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error processing AccountStatusUpdated");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task SendErrorResponse(WebSocketPacketEvent evt, string message)
|
||||
{
|
||||
await pusher.PushWebSocketPacketToDeviceAsync(new PushWebSocketPacketToDeviceRequest
|
||||
{
|
||||
DeviceId = evt.DeviceId,
|
||||
Packet = new WebSocketPacket
|
||||
{
|
||||
Type = "error",
|
||||
ErrorMessage = message
|
||||
}.ToProtoValue()
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
using System.Globalization;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Serialization;
|
||||
using DysonNetwork.Messager.Chat;
|
||||
using DysonNetwork.Messager.Chat.Realtime;
|
||||
using NodaTime;
|
||||
using NodaTime.Serialization.SystemTextJson;
|
||||
|
||||
@@ -8,45 +10,52 @@ namespace DysonNetwork.Messager.Startup;
|
||||
|
||||
public static class ServiceCollectionExtensions
|
||||
{
|
||||
public static IServiceCollection AddAppServices(this IServiceCollection services)
|
||||
extension(IServiceCollection services)
|
||||
{
|
||||
services.AddDbContext<AppDatabase>();
|
||||
services.AddHttpContextAccessor();
|
||||
|
||||
services.AddHttpClient();
|
||||
|
||||
services
|
||||
.AddControllers()
|
||||
.AddJsonOptions(options =>
|
||||
{
|
||||
options.JsonSerializerOptions.NumberHandling =
|
||||
JsonNumberHandling.AllowNamedFloatingPointLiterals;
|
||||
options.JsonSerializerOptions.PropertyNamingPolicy =
|
||||
JsonNamingPolicy.SnakeCaseLower;
|
||||
|
||||
options.JsonSerializerOptions.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
|
||||
});
|
||||
|
||||
services.AddGrpc(options =>
|
||||
public IServiceCollection AddAppServices()
|
||||
{
|
||||
options.EnableDetailedErrors = true;
|
||||
});
|
||||
services.AddGrpcReflection();
|
||||
services.AddDbContext<AppDatabase>();
|
||||
services.AddHttpContextAccessor();
|
||||
|
||||
return services;
|
||||
}
|
||||
services.AddHttpClient();
|
||||
|
||||
public static IServiceCollection AddAppAuthentication(this IServiceCollection services)
|
||||
{
|
||||
services.AddAuthorization();
|
||||
return services;
|
||||
}
|
||||
services
|
||||
.AddControllers()
|
||||
.AddJsonOptions(options =>
|
||||
{
|
||||
options.JsonSerializerOptions.NumberHandling =
|
||||
JsonNumberHandling.AllowNamedFloatingPointLiterals;
|
||||
options.JsonSerializerOptions.PropertyNamingPolicy =
|
||||
JsonNamingPolicy.SnakeCaseLower;
|
||||
|
||||
public static IServiceCollection AddAppBusinessServices(
|
||||
this IServiceCollection services,
|
||||
IConfiguration configuration
|
||||
)
|
||||
{
|
||||
return services;
|
||||
options.JsonSerializerOptions.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
|
||||
});
|
||||
|
||||
services.AddGrpc(options =>
|
||||
{
|
||||
options.EnableDetailedErrors = true;
|
||||
});
|
||||
services.AddGrpcReflection();
|
||||
|
||||
return services;
|
||||
}
|
||||
|
||||
public IServiceCollection AddAppAuthentication()
|
||||
{
|
||||
services.AddAuthorization();
|
||||
return services;
|
||||
}
|
||||
|
||||
public IServiceCollection AddAppBusinessServices(IConfiguration configuration
|
||||
)
|
||||
{
|
||||
services.AddScoped<ChatRoomService>();
|
||||
services.AddScoped<ChatService>();
|
||||
services.AddScoped<IRealtimeService, LiveKitRealtimeService>();
|
||||
|
||||
services.AddHostedService<BroadcastEventHandler>();
|
||||
|
||||
return services;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user