415 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			415 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| using System.Text.Json;
 | |
| using System.Text.Json.Serialization;
 | |
| using DysonNetwork.Shared.Models;
 | |
| using DysonNetwork.Shared.Proto;
 | |
| using DysonNetwork.Shared.Stream;
 | |
| using DysonNetwork.Sphere.Chat;
 | |
| using DysonNetwork.Sphere.Post;
 | |
| using Google.Protobuf;
 | |
| using Microsoft.EntityFrameworkCore;
 | |
| using NATS.Client.Core;
 | |
| using NATS.Client.JetStream.Models;
 | |
| using NATS.Net;
 | |
| using WebSocketPacket = DysonNetwork.Shared.Models.WebSocketPacket;
 | |
| 
 | |
| namespace DysonNetwork.Sphere.Startup;
 | |
| 
 | |
| public class PaymentOrderAwardEvent : PaymentOrderEventBase
 | |
| {
 | |
|     public PaymentOrderAwardMeta Meta { get; set; } = null!;
 | |
| }
 | |
| 
 | |
| public class PaymentOrderAwardMeta
 | |
| {
 | |
|     [JsonPropertyName("account_id")] public Guid AccountId { get; set; }
 | |
|     [JsonPropertyName("post_id")] public Guid PostId { get; set; }
 | |
|     [JsonPropertyName("amount")] public string Amount { get; set; } = null!;
 | |
|     [JsonPropertyName("attitude")] public Shared.Models.PostReactionAttitude Attitude { get; set; }
 | |
|     [JsonPropertyName("message")] public string? Message { get; set; }
 | |
| }
 | |
| 
 | |
| public class BroadcastEventHandler(
 | |
|     IServiceProvider serviceProvider,
 | |
|     ILogger<BroadcastEventHandler> logger,
 | |
|     INatsConnection nats,
 | |
|     RingService.RingServiceClient pusher
 | |
| ) : BackgroundService
 | |
| {
 | |
|     protected override async Task ExecuteAsync(CancellationToken stoppingToken)
 | |
|     {
 | |
|         var paymentTask = HandlePaymentOrders(stoppingToken);
 | |
|         var accountTask = HandleAccountDeletions(stoppingToken);
 | |
|         var websocketTask = HandleWebSocketPackets(stoppingToken);
 | |
|         var accountStatusTask = HandleAccountStatusUpdates(stoppingToken);
 | |
| 
 | |
|         await Task.WhenAll(paymentTask, accountTask, websocketTask, accountStatusTask);
 | |
|     }
 | |
| 
 | |
|     private async Task HandlePaymentOrders(CancellationToken stoppingToken)
 | |
|     {
 | |
|         var js = nats.CreateJetStreamContext();
 | |
| 
 | |
|         await js.EnsureStreamCreated("payment_events", [PaymentOrderEventBase.Type]);
 | |
| 
 | |
|         var consumer = await js.CreateOrUpdateConsumerAsync("payment_events",
 | |
|             new ConsumerConfig("sphere_payment_handler"), cancellationToken: stoppingToken);
 | |
| 
 | |
|         await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
 | |
|         {
 | |
|             PaymentOrderEvent? evt = null;
 | |
|             try
 | |
|             {
 | |
|                 evt = JsonSerializer.Deserialize<PaymentOrderEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
 | |
| 
 | |
|                 logger.LogInformation(
 | |
|                     "Received order event: {ProductIdentifier} {OrderId}",
 | |
|                     evt?.ProductIdentifier,
 | |
|                     evt?.OrderId
 | |
|                 );
 | |
| 
 | |
|                 if (evt?.ProductIdentifier is null)
 | |
|                     continue;
 | |
| 
 | |
|                 switch (evt.ProductIdentifier)
 | |
|                 {
 | |
|                     case "posts.award":
 | |
|                         {
 | |
|                             var awardEvt = JsonSerializer.Deserialize<PaymentOrderAwardEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
 | |
|                             if (awardEvt?.Meta == null) throw new ArgumentNullException(nameof(awardEvt));
 | |
| 
 | |
|                             var meta = awardEvt.Meta;
 | |
| 
 | |
|                             logger.LogInformation("Handling post award order: {OrderId}", evt.OrderId);
 | |
| 
 | |
|                             await using var scope = serviceProvider.CreateAsyncScope();
 | |
|                             var ps = scope.ServiceProvider.GetRequiredService<Post.PostService>();
 | |
| 
 | |
|                             var amountNum = decimal.Parse(meta.Amount);
 | |
| 
 | |
|                             await ps.AwardPost(meta.PostId, meta.AccountId, amountNum, meta.Attitude, meta.Message);
 | |
| 
 | |
|                             logger.LogInformation("Post award for order {OrderId} handled successfully.", evt.OrderId);
 | |
|                             await msg.AckAsync(cancellationToken: stoppingToken);
 | |
|                             break;
 | |
|                         }
 | |
|                     default:
 | |
|                         // ignore
 | |
|                         break;
 | |
|                 }
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 logger.LogError(ex, "Error processing payment order event for order {OrderId}", evt?.OrderId);
 | |
|                 await msg.NakAsync(cancellationToken: stoppingToken);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private async Task HandleAccountDeletions(CancellationToken stoppingToken)
 | |
|     {
 | |
|         var js = nats.CreateJetStreamContext();
 | |
| 
 | |
|         await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]);
 | |
| 
 | |
|         var consumer = await js.CreateOrUpdateConsumerAsync("account_events",
 | |
|             new ConsumerConfig("sphere_account_deleted_handler"), cancellationToken: stoppingToken);
 | |
| 
 | |
|         await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
 | |
|         {
 | |
|             try
 | |
|             {
 | |
|                 var evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
 | |
|                 if (evt == null)
 | |
|                 {
 | |
|                     await msg.AckAsync(cancellationToken: stoppingToken);
 | |
|                     continue;
 | |
|                 }
 | |
| 
 | |
|                 logger.LogInformation("Account deleted: {AccountId}", evt.AccountId);
 | |
| 
 | |
|                 using var scope = serviceProvider.CreateScope();
 | |
|                 var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
 | |
| 
 | |
|                 await db.ChatMembers
 | |
|                     .Where(m => m.AccountId == evt.AccountId)
 | |
|                     .ExecuteDeleteAsync(cancellationToken: stoppingToken);
 | |
| 
 | |
|                 await using var transaction = await db.Database.BeginTransactionAsync(cancellationToken: stoppingToken);
 | |
|                 try
 | |
|                 {
 | |
|                     var publishers = await db.Publishers
 | |
|                         .Where(p => p.Members.All(m => m.AccountId == evt.AccountId))
 | |
|                         .ToListAsync(cancellationToken: stoppingToken);
 | |
| 
 | |
|                     foreach (var publisher in publishers)
 | |
|                         await db.Posts
 | |
|                             .Where(p => p.PublisherId == publisher.Id)
 | |
|                             .ExecuteDeleteAsync(cancellationToken: stoppingToken);
 | |
| 
 | |
|                     var publisherIds = publishers.Select(p => p.Id).ToList();
 | |
|                     await db.Publishers
 | |
|                         .Where(p => publisherIds.Contains(p.Id))
 | |
|                         .ExecuteDeleteAsync(cancellationToken: stoppingToken);
 | |
| 
 | |
|                     await transaction.CommitAsync(cancellationToken: stoppingToken);
 | |
|                 }
 | |
|                 catch (Exception)
 | |
|                 {
 | |
|                     await transaction.RollbackAsync(cancellationToken: stoppingToken);
 | |
|                     throw;
 | |
|                 }
 | |
| 
 | |
|                 await msg.AckAsync(cancellationToken: stoppingToken);
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 logger.LogError(ex, "Error processing AccountDeleted");
 | |
|                 await msg.NakAsync(cancellationToken: stoppingToken);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private async Task HandleWebSocketPackets(CancellationToken stoppingToken)
 | |
|     {
 | |
|         await foreach (var msg in nats.SubscribeAsync<byte[]>(
 | |
|                            WebSocketPacketEvent.SubjectPrefix + "sphere", cancellationToken: stoppingToken))
 | |
|         {
 | |
|             logger.LogDebug("Handling websocket packet...");
 | |
| 
 | |
|             try
 | |
|             {
 | |
|                 var evt = JsonSerializer.Deserialize<WebSocketPacketEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
 | |
|                 if (evt == null) throw new ArgumentNullException(nameof(evt));
 | |
|                 var packet = WebSocketPacket.FromBytes(evt.PacketBytes);
 | |
|                 logger.LogInformation("Handling websocket packet... {Type}", packet.Type);
 | |
|                 switch (packet.Type)
 | |
|                 {
 | |
|                     case "messages.read":
 | |
|                         await HandleMessageRead(evt, packet);
 | |
|                         break;
 | |
|                     case "messages.typing":
 | |
|                         await HandleMessageTyping(evt, packet);
 | |
|                         break;
 | |
|                     case "messages.subscribe":
 | |
|                         await HandleMessageSubscribe(evt, packet);
 | |
|                         break;
 | |
|                     case "messages.unsubscribe":
 | |
|                         await HandleMessageUnsubscribe(evt, packet);
 | |
|                         break;
 | |
|                 }
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 logger.LogError(ex, "Error processing websocket packet");
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private async Task HandleMessageRead(WebSocketPacketEvent evt, WebSocketPacket packet)
 | |
|     {
 | |
|         using var scope = serviceProvider.CreateScope();
 | |
|         var cs = scope.ServiceProvider.GetRequiredService<ChatService>();
 | |
|         var crs = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
 | |
| 
 | |
|         if (packet.Data == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "Mark message as read requires you to provide the ChatRoomId");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var requestData = packet.GetData<ChatController.MarkMessageReadRequest>();
 | |
|         if (requestData == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "Invalid request data");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
 | |
|         if (sender == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "User is not a member of the chat room.");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         await cs.ReadChatRoomAsync(requestData.ChatRoomId, evt.AccountId);
 | |
|     }
 | |
| 
 | |
|     private async Task HandleMessageTyping(WebSocketPacketEvent evt, WebSocketPacket packet)
 | |
|     {
 | |
|         using var scope = serviceProvider.CreateScope();
 | |
|         var crs = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
 | |
| 
 | |
|         if (packet.Data == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "messages.typing requires you to provide the ChatRoomId");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var requestData = packet.GetData<ChatController.ChatRoomWsUniversalRequest>();
 | |
|         if (requestData == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "Invalid request data");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
 | |
|         if (sender == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "User is not a member of the chat room.");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var responsePacket = new WebSocketPacket
 | |
|         {
 | |
|             Type = "messages.typing",
 | |
|             Data = new
 | |
|             {
 | |
|                 room_id = sender.ChatRoomId,
 | |
|                 sender_id = sender.Id,
 | |
|                 sender = sender
 | |
|             }
 | |
|         };
 | |
| 
 | |
|         // Broadcast typing indicator to other room members
 | |
|         var otherMembers = (await crs.ListRoomMembers(requestData.ChatRoomId))
 | |
|             .Where(m => m.AccountId != evt.AccountId)
 | |
|             .Select(m => m.AccountId.ToString())
 | |
|             .ToList();
 | |
| 
 | |
|         var respRequest = new PushWebSocketPacketToUsersRequest() { Packet = responsePacket.ToProtoValue() };
 | |
|         respRequest.UserIds.AddRange(otherMembers);
 | |
| 
 | |
|         await pusher.PushWebSocketPacketToUsersAsync(respRequest);
 | |
|     }
 | |
| 
 | |
|     private async Task HandleMessageSubscribe(WebSocketPacketEvent evt, WebSocketPacket packet)
 | |
|     {
 | |
|         using var scope = serviceProvider.CreateScope();
 | |
|         var crs = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
 | |
| 
 | |
|         if (packet.Data == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "messages.subscribe requires you to provide the ChatRoomId");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var requestData = packet.GetData<ChatController.ChatRoomWsUniversalRequest>();
 | |
|         if (requestData == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "Invalid request data");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
 | |
|         if (sender == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "User is not a member of the chat room.");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         await crs.SubscribeChatRoom(sender);
 | |
|     }
 | |
| 
 | |
|     private async Task HandleMessageUnsubscribe(WebSocketPacketEvent evt, WebSocketPacket packet)
 | |
|     {
 | |
|         using var scope = serviceProvider.CreateScope();
 | |
|         var crs = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
 | |
| 
 | |
|         if (packet.Data == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "messages.unsubscribe requires you to provide the ChatRoomId");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var requestData = packet.GetData<ChatController.ChatRoomWsUniversalRequest>();
 | |
|         if (requestData == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "Invalid request data");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         var sender = await crs.GetRoomMember(evt.AccountId, requestData.ChatRoomId);
 | |
|         if (sender == null)
 | |
|         {
 | |
|             await SendErrorResponse(evt, "User is not a member of the chat room.");
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         await crs.UnsubscribeChatRoom(sender);
 | |
|     }
 | |
| 
 | |
|     private async Task HandleAccountStatusUpdates(CancellationToken stoppingToken)
 | |
|     {
 | |
|         await foreach (var msg in nats.SubscribeAsync<byte[]>(AccountStatusUpdatedEvent.Type, cancellationToken: stoppingToken))
 | |
|         {
 | |
|             try
 | |
|             {
 | |
|                 var evt = GrpcTypeHelper.ConvertByteStringToObject<AccountStatusUpdatedEvent>(ByteString.CopyFrom(msg.Data));
 | |
|                 if (evt == null)
 | |
|                     continue;
 | |
| 
 | |
|                 logger.LogInformation("Account status updated: {AccountId}", evt.AccountId);
 | |
| 
 | |
|                 await using var scope = serviceProvider.CreateAsyncScope();
 | |
|                 var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
 | |
|                 var chatRoomService = scope.ServiceProvider.GetRequiredService<ChatRoomService>();
 | |
| 
 | |
|                 // Get user's joined chat rooms
 | |
|                 var userRooms = await db.ChatMembers
 | |
|                     .Where(m => m.AccountId == evt.AccountId && m.JoinedAt != null && m.LeaveAt == null)
 | |
|                     .Select(m => m.ChatRoomId)
 | |
|                     .ToListAsync(cancellationToken: stoppingToken);
 | |
| 
 | |
|                 // Send WebSocket packet to subscribed users per room
 | |
|                 foreach (var roomId in userRooms)
 | |
|                 {
 | |
|                     var members = await chatRoomService.ListRoomMembers(roomId);
 | |
|                     var subscribedMemberIds = await chatRoomService.GetSubscribedMembers(roomId);
 | |
|                     var subscribedUsers = members
 | |
|                         .Where(m => subscribedMemberIds.Contains(m.Id))
 | |
|                         .Select(m => m.AccountId.ToString())
 | |
|                         .ToList();
 | |
| 
 | |
|                     if (subscribedUsers.Count == 0) continue;
 | |
|                     var packet = new WebSocketPacket
 | |
|                     {
 | |
|                         Type = "accounts.status.update",
 | |
|                         Data = new Dictionary<string, object>
 | |
|                         {
 | |
|                             ["status"] = evt.Status,
 | |
|                             ["chat_room_id"] = roomId
 | |
|                         }
 | |
|                     };
 | |
| 
 | |
|                     var request = new PushWebSocketPacketToUsersRequest
 | |
|                     {
 | |
|                         Packet = packet.ToProtoValue()
 | |
|                     };
 | |
|                     request.UserIds.AddRange(subscribedUsers);
 | |
| 
 | |
|                     await pusher.PushWebSocketPacketToUsersAsync(request, cancellationToken: stoppingToken);
 | |
| 
 | |
|                     logger.LogInformation("Sent status update for room {roomId} to {count} subscribed users", roomId, subscribedUsers.Count);
 | |
|                 }
 | |
|             }
 | |
|             catch (Exception ex)
 | |
|             {
 | |
|                 logger.LogError(ex, "Error processing AccountStatusUpdated");
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private async Task SendErrorResponse(WebSocketPacketEvent evt, string message)
 | |
|     {
 | |
|         await pusher.PushWebSocketPacketToDeviceAsync(new PushWebSocketPacketToDeviceRequest
 | |
|         {
 | |
|             DeviceId = evt.DeviceId,
 | |
|             Packet = new WebSocketPacket
 | |
|             {
 | |
|                 Type = "error",
 | |
|                 ErrorMessage = message
 | |
|             }.ToProtoValue()
 | |
|         });
 | |
|     }
 | |
| }
 |