345 lines
13 KiB
C#
345 lines
13 KiB
C#
using System.Text.Json;
|
|
using DysonNetwork.Shared.Proto;
|
|
using DysonNetwork.Shared.Queue;
|
|
using DysonNetwork.Messager.Chat;
|
|
using Google.Protobuf;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using NATS.Client.Core;
|
|
using NATS.Client.JetStream.Models;
|
|
using NATS.Net;
|
|
using NodaTime;
|
|
using WebSocketPacket = DysonNetwork.Shared.Models.WebSocketPacket;
|
|
|
|
namespace DysonNetwork.Messager.Startup;
|
|
|
|
public class BroadcastEventHandler(
|
|
IServiceProvider serviceProvider,
|
|
ILogger<BroadcastEventHandler> logger,
|
|
INatsConnection nats,
|
|
RingService.RingServiceClient pusher
|
|
) : BackgroundService
|
|
{
|
|
/// <summary>
/// Starts the account-deletion, WebSocket-packet and account-status loops and
/// completes once all three of them have finished.
/// </summary>
/// <param name="stoppingToken">Cancellation token forwarded to every loop.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // The loops are started in this order; each one then runs until its own
    // task completes, and WhenAll waits for the whole set.
    Task[] loops =
    [
        HandleAccountDeletions(stoppingToken),
        HandleWebSocketPackets(stoppingToken),
        HandleAccountStatusUpdates(stoppingToken)
    ];

    await Task.WhenAll(loops);
}
|
|
|
|
/// <summary>
/// Consumes account-deletion events from the "account_events" JetStream stream and
/// cleans up the chat data of each deleted account: its messages and reactions get
/// their DeletedAt set, then its chat member rows are deleted.
/// </summary>
/// <remarks>
/// A message is acknowledged after successful processing (or when its payload
/// deserializes to null) and negatively acknowledged when processing fails.
/// All database changes for one event run inside a single transaction.
/// </remarks>
/// <param name="stoppingToken">Cancellation token used for the consumer, the database calls and the acks.</param>
private async Task HandleAccountDeletions(CancellationToken stoppingToken)
{
    var js = nats.CreateJetStreamContext();

    await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]);

    var consumer = await js.CreateOrUpdateConsumerAsync("account_events",
        new ConsumerConfig("messager_account_deleted_handler"), cancellationToken: stoppingToken);

    await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
    {
        try
        {
            var evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
            if (evt == null)
            {
                // Nothing to process; ack so the message is not redelivered.
                await msg.AckAsync(cancellationToken: stoppingToken);
                continue;
            }

            logger.LogInformation("Account deleted: {AccountId}", evt.AccountId);

            await using var scope = serviceProvider.CreateAsyncScope();
            var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();

            // If anything below throws, the transaction is disposed without a commit,
            // which rolls it back; no explicit rollback (with a possibly cancelled
            // token) is needed.
            await using var transaction =
                await db.Database.BeginTransactionAsync(cancellationToken: stoppingToken);

            var now = SystemClock.Instance.GetCurrentInstant();

            // Messages and reactions are located through their sender's member row,
            // so they must be updated while those member rows still exist.
            await db.ChatMessages
                .Where(m => m.Sender.AccountId == evt.AccountId)
                .ExecuteUpdateAsync(c => c.SetProperty(p => p.DeletedAt, now), stoppingToken);

            await db.ChatReactions
                .Where(r => r.Sender.AccountId == evt.AccountId)
                .ExecuteUpdateAsync(c => c.SetProperty(p => p.DeletedAt, now), stoppingToken);

            // Remove the member rows last. Previously this ran first (and outside the
            // transaction), which left the two updates above with nothing to match.
            // NOTE(review): the old code also set DeletedAt on ChatMembers after
            // deleting them, which could never affect a row. If member rows are meant
            // to be soft-deleted instead, replace this delete with that update.
            await db.ChatMembers
                .Where(m => m.AccountId == evt.AccountId)
                .ExecuteDeleteAsync(cancellationToken: stoppingToken);

            await transaction.CommitAsync(cancellationToken: stoppingToken);

            await msg.AckAsync(cancellationToken: stoppingToken);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown was requested: not a processing error, and a Nak issued with
            // the cancelled token would only throw again.
            throw;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Error processing AccountDeleted");
            await msg.NakAsync(cancellationToken: stoppingToken);
        }
    }
}
|
|
|
|
/// <summary>
/// Subscribes to the WebSocket packet subject for "sphere" and routes every received
/// packet to the handler that matches its type. Packets of any other type are ignored.
/// </summary>
/// <remarks>
/// Failures while handling a single packet are logged and do not stop the loop.
/// </remarks>
/// <param name="stoppingToken">Cancellation token passed to the subscription.</param>
private async Task HandleWebSocketPackets(CancellationToken stoppingToken)
{
    var subject = WebSocketPacketEvent.SubjectPrefix + "sphere";
    var incoming = nats.SubscribeAsync<byte[]>(subject, cancellationToken: stoppingToken);

    await foreach (var msg in incoming)
    {
        logger.LogDebug("Handling websocket packet...");

        try
        {
            var evt = JsonSerializer.Deserialize<WebSocketPacketEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
            if (evt == null)
            {
                throw new ArgumentNullException(nameof(evt));
            }

            var packet = WebSocketPacket.FromBytes(evt.PacketBytes);
            logger.LogInformation("Handling websocket packet... {Type}", packet.Type);

            // Pick the handler by packet type; unknown types resolve to a no-op.
            var handling = packet.Type switch
            {
                "messages.read" => HandleMessageRead(evt, packet),
                "messages.typing" => HandleMessageTyping(evt, packet),
                "messages.subscribe" => HandleMessageSubscribe(evt, packet),
                "messages.unsubscribe" => HandleMessageUnsubscribe(evt, packet),
                _ => Task.CompletedTask
            };

            await handling;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Error processing websocket packet");
        }
    }
}
|
|
|
|
/// <summary>
/// Handles a "messages.read" packet: after checking that the packet carries valid
/// request data and that the account is a member of the requested chat room, marks
/// that room as read for the account.
/// </summary>
/// <remarks>
/// Each failed check sends an error packet back to the originating device and stops.
/// </remarks>
/// <param name="evt">Event carrying the account and device the packet came from.</param>
/// <param name="packet">The received WebSocket packet.</param>
private async Task HandleMessageRead(WebSocketPacketEvent evt, WebSocketPacket packet)
{
    using var scope = serviceProvider.CreateScope();
    var services = scope.ServiceProvider;
    var chatService = services.GetRequiredService<ChatService>();
    var roomService = services.GetRequiredService<ChatRoomService>();

    if (packet.Data is null)
    {
        await SendErrorResponse(evt, "Mark message as read requires you to provide the ChatRoomId");
        return;
    }

    var request = packet.GetData<Chat.ChatController.MarkMessageReadRequest>();
    if (request is null)
    {
        await SendErrorResponse(evt, "Invalid request data");
        return;
    }

    // The member record is only needed to prove membership in the room.
    var member = await roomService.GetRoomMember(evt.AccountId, request.ChatRoomId);
    if (member is null)
    {
        await SendErrorResponse(evt, "User is not a member of the chat room.");
        return;
    }

    await chatService.ReadChatRoomAsync(request.ChatRoomId, evt.AccountId);
}
|
|
|
|
/// <summary>
/// Handles a "messages.typing" packet: validates the request and the sender's room
/// membership, then pushes a "messages.typing" packet to the room members that are
/// subscribed to the room, excluding the sending account.
/// </summary>
/// <remarks>
/// Each failed check sends an error packet back to the originating device and stops.
/// Nothing is pushed when no other subscribed member exists.
/// </remarks>
/// <param name="evt">Event carrying the account and device the packet came from.</param>
/// <param name="packet">The received WebSocket packet.</param>
private async Task HandleMessageTyping(WebSocketPacketEvent evt, WebSocketPacket packet)
{
    using var scope = serviceProvider.CreateScope();
    var roomService = scope.ServiceProvider.GetRequiredService<ChatRoomService>();

    if (packet.Data is null)
    {
        await SendErrorResponse(evt, "messages.typing requires you to provide the ChatRoomId");
        return;
    }

    var request = packet.GetData<Chat.ChatController.ChatRoomWsUniversalRequest>();
    if (request is null)
    {
        await SendErrorResponse(evt, "Invalid request data");
        return;
    }

    var member = await roomService.GetRoomMember(evt.AccountId, request.ChatRoomId);
    if (member is null)
    {
        await SendErrorResponse(evt, "User is not a member of the chat room.");
        return;
    }

    // Recipients: members of the room that are subscribed to it, minus the sender's account.
    var subscribedIds = await roomService.GetSubscribedMembers(request.ChatRoomId);
    var allMembers = await roomService.ListRoomMembers(request.ChatRoomId);

    var recipients = (
        from m in allMembers
        where subscribedIds.Contains(m.Id) && m.AccountId != evt.AccountId
        select m.AccountId.ToString()
    ).ToList();

    if (recipients.Count <= 0)
    {
        return;
    }

    var typingPacket = new WebSocketPacket
    {
        Type = "messages.typing",
        Data = new
        {
            room_id = member.ChatRoomId,
            sender_id = member.Id,
            sender = member
        }
    };

    var push = new PushWebSocketPacketToUsersRequest { Packet = typingPacket.ToProtoValue() };
    push.UserIds.AddRange(recipients);
    await pusher.PushWebSocketPacketToUsersAsync(push);
}
|
|
|
|
/// <summary>
/// Handles a "messages.subscribe" packet: validates the request and the sender's
/// room membership, then subscribes that member to the chat room.
/// </summary>
/// <remarks>
/// Each failed check sends an error packet back to the originating device and stops.
/// </remarks>
/// <param name="evt">Event carrying the account and device the packet came from.</param>
/// <param name="packet">The received WebSocket packet.</param>
private async Task HandleMessageSubscribe(WebSocketPacketEvent evt, WebSocketPacket packet)
{
    using var scope = serviceProvider.CreateScope();
    var roomService = scope.ServiceProvider.GetRequiredService<ChatRoomService>();

    if (packet.Data is null)
    {
        await SendErrorResponse(evt, "messages.subscribe requires you to provide the ChatRoomId");
        return;
    }

    var request = packet.GetData<Chat.ChatController.ChatRoomWsUniversalRequest>();
    if (request is null)
    {
        await SendErrorResponse(evt, "Invalid request data");
        return;
    }

    var member = await roomService.GetRoomMember(evt.AccountId, request.ChatRoomId);
    if (member is null)
    {
        await SendErrorResponse(evt, "User is not a member of the chat room.");
        return;
    }

    await roomService.SubscribeChatRoom(member);
}
|
|
|
|
/// <summary>
/// Handles a "messages.unsubscribe" packet: validates the request and the sender's
/// room membership, then unsubscribes that member from the chat room.
/// </summary>
/// <remarks>
/// Each failed check sends an error packet back to the originating device and stops.
/// </remarks>
/// <param name="evt">Event carrying the account and device the packet came from.</param>
/// <param name="packet">The received WebSocket packet.</param>
private async Task HandleMessageUnsubscribe(WebSocketPacketEvent evt, WebSocketPacket packet)
{
    using var scope = serviceProvider.CreateScope();
    var roomService = scope.ServiceProvider.GetRequiredService<ChatRoomService>();

    if (packet.Data is null)
    {
        await SendErrorResponse(evt, "messages.unsubscribe requires you to provide the ChatRoomId");
        return;
    }

    var request = packet.GetData<Chat.ChatController.ChatRoomWsUniversalRequest>();
    if (request is null)
    {
        await SendErrorResponse(evt, "Invalid request data");
        return;
    }

    var member = await roomService.GetRoomMember(evt.AccountId, request.ChatRoomId);
    if (member is null)
    {
        await SendErrorResponse(evt, "User is not a member of the chat room.");
        return;
    }

    await roomService.UnsubscribeChatRoom(member);
}
|
|
|
|
/// <summary>
/// Subscribes to account status update events and, for every chat room the account
/// has joined and not left, pushes an "accounts.status.update" packet to the room
/// members that are subscribed to that room.
/// </summary>
/// <remarks>
/// Events that convert to null are skipped. A failure while processing one event is
/// logged and does not stop the loop.
/// </remarks>
/// <param name="stoppingToken">Cancellation token passed to the subscription, the room query and the push calls.</param>
private async Task HandleAccountStatusUpdates(CancellationToken stoppingToken)
{
    var updates = nats.SubscribeAsync<byte[]>(AccountStatusUpdatedEvent.Type,
        cancellationToken: stoppingToken);

    await foreach (var msg in updates)
    {
        try
        {
            var payload = ByteString.CopyFrom(msg.Data);
            var evt = GrpcTypeHelper.ConvertByteStringToObject<AccountStatusUpdatedEvent>(payload);
            if (evt is null)
            {
                continue;
            }

            logger.LogInformation("Account status updated: {AccountId}", evt.AccountId);

            await using var scope = serviceProvider.CreateAsyncScope();
            var services = scope.ServiceProvider;
            var db = services.GetRequiredService<AppDatabase>();
            var roomService = services.GetRequiredService<ChatRoomService>();

            // Rooms where the account has a join time and no leave time.
            var joinedRoomIds = await db.ChatMembers
                .Where(m => m.AccountId == evt.AccountId && m.JoinedAt != null && m.LeaveAt == null)
                .Select(m => m.ChatRoomId)
                .ToListAsync(cancellationToken: stoppingToken);

            foreach (var roomId in joinedRoomIds)
            {
                var roomMembers = await roomService.ListRoomMembers(roomId);
                var subscribedIds = await roomService.GetSubscribedMembers(roomId);

                // Only members subscribed to this room receive the update.
                var recipients = (
                    from m in roomMembers
                    where subscribedIds.Contains(m.Id)
                    select m.AccountId.ToString()
                ).ToList();

                if (recipients.Count == 0)
                {
                    continue;
                }

                var statusPacket = new WebSocketPacket
                {
                    Type = "accounts.status.update",
                    Data = new Dictionary<string, object>
                    {
                        { "status", evt.Status },
                        { "chat_room_id", roomId }
                    }
                };

                var push = new PushWebSocketPacketToUsersRequest { Packet = statusPacket.ToProtoValue() };
                push.UserIds.AddRange(recipients);

                await pusher.PushWebSocketPacketToUsersAsync(push, cancellationToken: stoppingToken);

                logger.LogInformation("Sent status update for room {roomId} to {count} subscribed users", roomId,
                    recipients.Count);
            }
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Error processing AccountStatusUpdated");
        }
    }
}
|
|
|
|
/// <summary>
/// Pushes a WebSocket packet of type "error" carrying <paramref name="message"/>
/// to the device identified by the event.
/// </summary>
/// <param name="evt">Event whose DeviceId selects the target device.</param>
/// <param name="message">Error text placed in the packet's ErrorMessage.</param>
private async Task SendErrorResponse(WebSocketPacketEvent evt, string message)
{
    var errorPacket = new WebSocketPacket
    {
        Type = "error",
        ErrorMessage = message
    };

    var request = new PushWebSocketPacketToDeviceRequest
    {
        DeviceId = evt.DeviceId,
        Packet = errorPacket.ToProtoValue()
    };

    await pusher.PushWebSocketPacketToDeviceAsync(request);
}
|
|
} |