From bec294365fe1dbbcb6899dd99d2a4a742943c32b Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Fri, 11 Jul 2025 23:07:32 +0800 Subject: [PATCH] :recycle: Refactored webhook receiver in realtime call --- ...ealtimeService.cs => CloudflareService.cs} | 144 ++++---- .../Chat/Realtime/LiveKitService.cs | 307 ++++++++++++++++++ .../Chat/Realtime/LivekitService.cs | 136 -------- .../Chat/Realtime/RealtimeStatusService.cs | 91 ++++++ .../Startup/ServiceCollectionExtensions.cs | 3 +- DysonNetwork.sln.DotSettings.user | 1 + 6 files changed, 468 insertions(+), 214 deletions(-) rename DysonNetwork.Sphere/Chat/Realtime/{CloudflareRealtimeService.cs => CloudflareService.cs} (71%) create mode 100644 DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs delete mode 100644 DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs create mode 100644 DysonNetwork.Sphere/Chat/Realtime/RealtimeStatusService.cs diff --git a/DysonNetwork.Sphere/Chat/Realtime/CloudflareRealtimeService.cs b/DysonNetwork.Sphere/Chat/Realtime/CloudflareService.cs similarity index 71% rename from DysonNetwork.Sphere/Chat/Realtime/CloudflareRealtimeService.cs rename to DysonNetwork.Sphere/Chat/Realtime/CloudflareService.cs index b965c2e..188b9de 100644 --- a/DysonNetwork.Sphere/Chat/Realtime/CloudflareRealtimeService.cs +++ b/DysonNetwork.Sphere/Chat/Realtime/CloudflareService.cs @@ -1,29 +1,32 @@ using System.Security.Cryptography; -using System.Security.Cryptography.X509Certificates; using System.Text; using System.Text.Json; using Microsoft.IdentityModel.Tokens; using System.Text.Json.Serialization; +using Microsoft.EntityFrameworkCore; +using NodaTime; namespace DysonNetwork.Sphere.Chat.Realtime; public class CloudflareRealtimeService : IRealtimeService { + private readonly AppDatabase _db; private readonly HttpClient _httpClient; private readonly IConfiguration _configuration; - private readonly string _apiSecret; - private readonly ChatRoomService _chatRoomService; private RSA? _publicKey; - public CloudflareRealtimeService(HttpClient httpClient, IConfiguration configuration, - ChatRoomService chatRoomService) + public CloudflareRealtimeService( + AppDatabase db, + HttpClient httpClient, + IConfiguration configuration + ) { + _db = db; _httpClient = httpClient; _configuration = configuration; - _chatRoomService = chatRoomService; var apiKey = _configuration["Realtime:Cloudflare:ApiKey"]; - _apiSecret = _configuration["Realtime:Cloudflare:ApiSecret"]!; - var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes($"{apiKey}:{_apiSecret}")); + var apiSecret = _configuration["Realtime:Cloudflare:ApiSecret"]!; + var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes($"{apiKey}:{apiSecret}")); _httpClient.BaseAddress = new Uri("https://rtk.realtime.cloudflare.com/v2/"); _httpClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials); @@ -33,13 +36,12 @@ public class CloudflareRealtimeService : IRealtimeService public async Task CreateSessionAsync(Guid roomId, Dictionary metadata) { - var roomName = $"Call_{roomId.ToString().Replace("-", "")}"; + var roomName = $"Room Call #{roomId.ToString().Replace("-", "")}"; var requestBody = new { - title = $"Solar Room Call #{roomId}", + title = roomName, preferred_region = _configuration["Realtime:Cloudflare:PreferredRegion"], - data = metadata, - room_name = roomName + data = metadata }; var content = new StringContent(JsonSerializer.Serialize(requestBody), Encoding.UTF8, "application/json"); @@ -48,7 +50,8 @@ public class CloudflareRealtimeService : IRealtimeService response.EnsureSuccessStatusCode(); var responseContent = await response.Content.ReadAsStringAsync(); - var meetingResponse = JsonSerializer.Deserialize(responseContent); + var meetingResponse = JsonSerializer.Deserialize(responseContent); + if (meetingResponse is null) throw new Exception("Failed to create meeting with cloudflare"); return new RealtimeSessionConfig { @@ -77,7 +80,7 @@ public class CloudflareRealtimeService : IRealtimeService { return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult(); } - + public async Task GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false) { try @@ -93,14 +96,15 @@ public class CloudflareRealtimeService : IRealtimeService .PostAsync($"meetings/{sessionId}/participants/{account.Id}/token", null); tokenResponse.EnsureSuccessStatusCode(); var tokenContent = await tokenResponse.Content.ReadAsStringAsync(); - var tokenData = JsonSerializer.Deserialize>(tokenContent); + var tokenData = JsonSerializer.Deserialize>(tokenContent); if (tokenData == null || !tokenData.Success) { throw new Exception("Failed to get participant token"); } + return tokenData.Data?.Token ?? throw new Exception("Token is null"); } - + // Participant doesn't exist, create a new one var requestBody = new { @@ -110,20 +114,21 @@ public class CloudflareRealtimeService : IRealtimeService }; var content = new StringContent( - JsonSerializer.Serialize(requestBody), - Encoding.UTF8, + JsonSerializer.Serialize(requestBody), + Encoding.UTF8, "application/json" ); - + var createResponse = await _httpClient.PostAsync($"meetings/{sessionId}/participants", content); createResponse.EnsureSuccessStatusCode(); var responseContent = await createResponse.Content.ReadAsStringAsync(); - var participantData = JsonSerializer.Deserialize>(responseContent); + var participantData = JsonSerializer.Deserialize>(responseContent); if (participantData == null || !participantData.Success) { throw new Exception("Failed to create participant"); } + return participantData.Data?.Token ?? throw new Exception("Token is null"); } catch (Exception ex) @@ -156,17 +161,18 @@ public class CloudflareRealtimeService : IRealtimeService } // Process the webhook event - var webhookEvent = JsonSerializer.Deserialize(body); + var evt = JsonSerializer.Deserialize(body); + if (evt is null) return; - if (webhookEvent.Type == "participant.joined") + switch (evt.Type) { - await _chatRoomService.SetRoomCallStatus( - Guid.Parse(webhookEvent.Event.Meeting.RoomName.Replace("Call_", "")), true); - } - else if (webhookEvent.Type == "room.ended") - { - await _chatRoomService.SetRoomCallStatus( - Guid.Parse(webhookEvent.Event.Meeting.RoomName.Replace("Call_", "")), false); + case "meeting.ended": + var now = SystemClock.Instance.GetCurrentInstant(); + await _db.ChatRealtimeCall + .Where(c => c.SessionId == evt.Event.Meeting.Id) + .ExecuteUpdateAsync(s => s.SetProperty(p => p.EndedAt, now) + ); + break; } } @@ -197,72 +203,56 @@ public class CloudflareRealtimeService : IRealtimeService _publicKey.ImportFromPem(publicKeyPem); } - private class DyteMeetingResponse + private class CfMeetingResponse { - [JsonPropertyName("data")] - public DyteMeetingData Data { get; set; } = new(); + [JsonPropertyName("data")] public CfMeetingData Data { get; set; } = new(); } - private class DyteMeetingData + private class CfMeetingData { - [JsonPropertyName("id")] - public string Id { get; set; } = string.Empty; - [JsonPropertyName("roomName")] - public string RoomName { get; set; } = string.Empty; - [JsonPropertyName("title")] - public string Title { get; set; } = string.Empty; - [JsonPropertyName("status")] - public string Status { get; set; } = string.Empty; - [JsonPropertyName("createdAt")] - public DateTime CreatedAt { get; set; } - [JsonPropertyName("updatedAt")] - public DateTime UpdatedAt { get; set; } + [JsonPropertyName("id")] public string Id { get; set; } = string.Empty; + [JsonPropertyName("roomName")] public string RoomName { get; set; } = string.Empty; + [JsonPropertyName("title")] public string Title { get; set; } = string.Empty; + [JsonPropertyName("status")] public string Status { get; set; } = string.Empty; + [JsonPropertyName("createdAt")] public DateTime CreatedAt { get; set; } + [JsonPropertyName("updatedAt")] public DateTime UpdatedAt { get; set; } } - private class DyteParticipant + private class CfParticipant { public string Id { get; set; } = string.Empty; public string Token { get; set; } = string.Empty; public string CustomParticipantId { get; set; } = string.Empty; } - public class DyteResponse + public class CfResponse { - [JsonPropertyName("success")] - public bool Success { get; set; } - - [JsonPropertyName("data")] - public T? Data { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } + + [JsonPropertyName("data")] public T? Data { get; set; } } - public class DyteTokenResponse + public class CfTokenResponse { - [JsonPropertyName("token")] - public string Token { get; set; } = string.Empty; - } - - public class DyteParticipantResponse - { - [JsonPropertyName("id")] - public string Id { get; set; } = string.Empty; - - [JsonPropertyName("name")] - public string Name { get; set; } = string.Empty; - - [JsonPropertyName("customUserId")] - public string CustomUserId { get; set; } = string.Empty; - - [JsonPropertyName("presetName")] - public string PresetName { get; set; } = string.Empty; - - [JsonPropertyName("isActive")] - public bool IsActive { get; set; } - - [JsonPropertyName("token")] - public string Token { get; set; } = string.Empty; + [JsonPropertyName("token")] public string Token { get; set; } = string.Empty; } - public class DyteWebhookEvent + public class CfParticipantResponse + { + [JsonPropertyName("id")] public string Id { get; set; } = string.Empty; + + [JsonPropertyName("name")] public string Name { get; set; } = string.Empty; + + [JsonPropertyName("customUserId")] public string CustomUserId { get; set; } = string.Empty; + + [JsonPropertyName("presetName")] public string PresetName { get; set; } = string.Empty; + + [JsonPropertyName("isActive")] public bool IsActive { get; set; } + + [JsonPropertyName("token")] public string Token { get; set; } = string.Empty; + } + + public class CfWebhookEvent { [JsonPropertyName("id")] public string Id { get; set; } diff --git a/DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs b/DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs new file mode 100644 index 0000000..ac5ae4b --- /dev/null +++ b/DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs @@ -0,0 +1,307 @@ +using DysonNetwork.Sphere.Connection; +using DysonNetwork.Sphere.Storage; +using Livekit.Server.Sdk.Dotnet; +using Microsoft.EntityFrameworkCore; +using NodaTime; +using System.Text.Json; + +namespace DysonNetwork.Sphere.Chat.Realtime; + +/// +/// LiveKit implementation of the real-time communication service +/// +public class LiveKitRealtimeService : IRealtimeService +{ + private readonly AppDatabase _db; + private readonly ICacheService _cache; + private readonly RealtimeStatusService _callStatus; + + private readonly ILogger _logger; + private readonly RoomServiceClient _roomService; + private readonly AccessToken _accessToken; + private readonly WebhookReceiver _webhookReceiver; + + public LiveKitRealtimeService( + IConfiguration configuration, + ILogger logger, + AppDatabase db, + ICacheService cache, + RealtimeStatusService callStatus + ) + { + _logger = logger; + + // Get LiveKit configuration from appsettings + var host = configuration["Realtime:LiveKit:Endpoint"] ?? + throw new ArgumentNullException("Endpoint configuration is required"); + var apiKey = configuration["Realtime:LiveKit:ApiKey"] ?? + throw new ArgumentNullException("ApiKey configuration is required"); + var apiSecret = configuration["Realtime:LiveKit:ApiSecret"] ?? + throw new ArgumentNullException("ApiSecret configuration is required"); + + _roomService = new RoomServiceClient(host, apiKey, apiSecret); + _accessToken = new AccessToken(apiKey, apiSecret); + _webhookReceiver = new WebhookReceiver(apiKey, apiSecret); + + _db = db; + _cache = cache; + _callStatus = callStatus; + } + + /// + public string ProviderName => "LiveKit"; + + /// + public async Task CreateSessionAsync(Guid roomId, Dictionary metadata) + { + try + { + var roomName = $"Call_{roomId.ToString().Replace("-", "")}"; + + // Convert metadata to a string dictionary for LiveKit + var roomMetadata = new Dictionary(); + foreach (var item in metadata) + { + roomMetadata[item.Key] = item.Value?.ToString() ?? string.Empty; + } + + // Create room in LiveKit + var room = await _roomService.CreateRoom(new CreateRoomRequest + { + Name = roomName, + EmptyTimeout = 300, // 5 minutes + Metadata = JsonSerializer.Serialize(roomMetadata) + }); + + // Return session config + return new RealtimeSessionConfig + { + SessionId = room.Name, + Parameters = new Dictionary + { + { "sid", room.Sid }, + { "emptyTimeout", room.EmptyTimeout }, + { "creationTime", room.CreationTime } + } + }; + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to create LiveKit room for roomId: {RoomId}", roomId); + throw; + } + } + + /// + public async Task EndSessionAsync(string sessionId, RealtimeSessionConfig config) + { + try + { + // Delete the room in LiveKit + await _roomService.DeleteRoom(new DeleteRoomRequest + { + Room = sessionId + }); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to end LiveKit session: {SessionId}", sessionId); + throw; + } + } + + /// + public string GetUserToken(Account.Account account, string sessionId, bool isAdmin = false) + { + return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult(); + } + + public Task GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false) + { + var token = _accessToken.WithIdentity(account.Name) + .WithName(account.Nick) + .WithGrants(new VideoGrants + { + RoomJoin = true, + CanPublish = true, + CanPublishData = true, + CanSubscribe = true, + CanSubscribeMetrics = true, + RoomAdmin = isAdmin, + Room = sessionId + }) + .WithMetadata(JsonSerializer.Serialize(new Dictionary + { { "account_id", account.Id.ToString() } })) + .WithTtl(TimeSpan.FromHours(1)); + return Task.FromResult(token.ToJwt()); + } + + public async Task ReceiveWebhook(string body, string authHeader) + { + var evt = _webhookReceiver.Receive(body, authHeader); + if (evt is null) return; + + switch (evt.Event) + { + case "room_finished": + var now = SystemClock.Instance.GetCurrentInstant(); + await _db.ChatRealtimeCall + .Where(c => c.SessionId == evt.Room.Name) + .ExecuteUpdateAsync(s => s.SetProperty(p => p.EndedAt, now) + ); + + // Also clean up participants list when the room is finished + await _cache.RemoveAsync(_GetParticipantsKey(evt.Room.Name)); + break; + + case "participant_joined": + if (evt.Participant != null) + { + // Add the participant to cache + await _AddParticipantToCache(evt.Room.Name, evt.Participant); + _logger.LogInformation( + "Participant joined room: {RoomName}, Participant: {ParticipantIdentity}", + evt.Room.Name, evt.Participant.Identity); + + // Broadcast participant list update to all participants + var info = await GetRoomParticipantsAsync(evt.Room.Name); + await _callStatus.BroadcastParticipantUpdate(evt.Room.Name, info); + } + + break; + + case "participant_left": + if (evt.Participant != null) + { + // Remove the participant from cache + await _RemoveParticipantFromCache(evt.Room.Name, evt.Participant); + _logger.LogInformation( + "Participant left room: {RoomName}, Participant: {ParticipantIdentity}", + evt.Room.Name, evt.Participant.Identity); + + // Broadcast participant list update to all participants + var info = await GetRoomParticipantsAsync(evt.Room.Name); + await _callStatus.BroadcastParticipantUpdate(evt.Room.Name, info); + } + + break; + } + } + + private static string _GetParticipantsKey(string roomName) + => $"RoomParticipants_{roomName}"; + + private async Task _AddParticipantToCache(string roomName, ParticipantInfo participant) + { + var participantsKey = _GetParticipantsKey(roomName); + + // Try to acquire a lock to prevent race conditions when updating the participants list + await using var lockObj = await _cache.AcquireLockAsync( + $"{participantsKey}_lock", + TimeSpan.FromSeconds(10), + TimeSpan.FromSeconds(5)); + + if (lockObj == null) + { + _logger.LogWarning("Failed to acquire lock for updating participants list in room: {RoomName}", roomName); + return; + } + + // Get the current participants list + var participants = await _cache.GetAsync>(participantsKey) ?? + []; + + // Check if the participant already exists + var existingIndex = participants.FindIndex(p => p.Identity == participant.Identity); + if (existingIndex >= 0) + { + // Update existing participant + participants[existingIndex] = CreateParticipantCacheItem(participant); + } + else + { + // Add new participant + participants.Add(CreateParticipantCacheItem(participant)); + } + + // Update cache with new list + await _cache.SetAsync(participantsKey, participants, TimeSpan.FromHours(6)); + + // Also add to a room group in cache for easy cleanup + await _cache.AddToGroupAsync(participantsKey, $"Room_{roomName}"); + } + + private async Task _RemoveParticipantFromCache(string roomName, ParticipantInfo participant) + { + var participantsKey = _GetParticipantsKey(roomName); + + // Try to acquire a lock to prevent race conditions when updating the participants list + await using var lockObj = await _cache.AcquireLockAsync( + $"{participantsKey}_lock", + TimeSpan.FromSeconds(10), + TimeSpan.FromSeconds(5)); + + if (lockObj == null) + { + _logger.LogWarning("Failed to acquire lock for updating participants list in room: {RoomName}", roomName); + return; + } + + // Get current participants list + var participants = await _cache.GetAsync>(participantsKey); + if (participants == null || !participants.Any()) + return; + + // Remove participant + participants.RemoveAll(p => p.Identity == participant.Identity); + + // Update cache with new list + await _cache.SetAsync(participantsKey, participants, TimeSpan.FromHours(6)); + } + + // Helper method to get participants in a room + public async Task> GetRoomParticipantsAsync(string roomName) + { + var participantsKey = _GetParticipantsKey(roomName); + return await _cache.GetAsync>(participantsKey) ?? []; + } + + private ParticipantInfoItem CreateParticipantCacheItem(ParticipantInfo participant) + { + // Try to parse account ID from metadata + Guid? accountId = null; + var metadata = new Dictionary(); + + if (string.IsNullOrEmpty(participant.Metadata)) + return new ParticipantInfoItem + { + Identity = participant.Identity, + Name = participant.Name, + AccountId = accountId, + Metadata = metadata, + JoinedAt = DateTime.UtcNow + }; + try + { + metadata = JsonSerializer.Deserialize>(participant.Metadata) ?? + new Dictionary(); + + if (metadata.TryGetValue("account_id", out var accountIdStr)) + if (Guid.TryParse(accountIdStr, out var parsedId)) + accountId = parsedId; + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to parse participant metadata"); + } + + return new ParticipantInfoItem + { + Identity = participant.Identity, + Name = participant.Name, + AccountId = accountId, + Metadata = metadata, + JoinedAt = DateTime.UtcNow + }; + } +} diff --git a/DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs b/DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs deleted file mode 100644 index 591f730..0000000 --- a/DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs +++ /dev/null @@ -1,136 +0,0 @@ -using DysonNetwork.Sphere.Connection; -using DysonNetwork.Sphere.Storage; -using Livekit.Server.Sdk.Dotnet; -using Microsoft.EntityFrameworkCore; -using NodaTime; -using System.Text.Json; - -namespace DysonNetwork.Sphere.Chat.Realtime; - -/// -/// LiveKit implementation of the real-time communication service -/// -public class LivekitRealtimeService : IRealtimeService -{ - private readonly ILogger _logger; - private readonly RoomServiceClient _roomService; - private readonly AccessToken _accessToken; - private readonly WebhookReceiver _webhookReceiver; - - public LivekitRealtimeService( - IConfiguration configuration, - ILogger logger) - { - _logger = logger; - - // Get LiveKit configuration from appsettings - var host = configuration["Livekit:Endpoint"] ?? - throw new ArgumentNullException("Endpoint configuration is required"); - var apiKey = configuration["Livekit:ApiKey"] ?? - throw new ArgumentNullException("ApiKey configuration is required"); - var apiSecret = configuration["Livekit:ApiSecret"] ?? - throw new ArgumentNullException("ApiSecret configuration is required"); - - _roomService = new RoomServiceClient(host, apiKey, apiSecret); - _accessToken = new AccessToken(apiKey, apiSecret); - _webhookReceiver = new WebhookReceiver(apiKey, apiSecret); - } - - /// - public string ProviderName => "LiveKit"; - - /// - public async Task CreateSessionAsync(Guid roomId, Dictionary metadata) - { - try - { - var roomName = $"Call_{roomId.ToString().Replace("-", "")}"; - - // Convert metadata to a string dictionary for LiveKit - var roomMetadata = new Dictionary(); - foreach (var item in metadata) - { - roomMetadata[item.Key] = item.Value?.ToString() ?? string.Empty; - } - - // Create room in LiveKit - var room = await _roomService.CreateRoom(new CreateRoomRequest - { - Name = roomName, - EmptyTimeout = 300, // 5 minutes - Metadata = JsonSerializer.Serialize(roomMetadata) - }); - - // Return session config - return new RealtimeSessionConfig - { - SessionId = room.Name, - Parameters = new Dictionary - { - { "sid", room.Sid }, - { "emptyTimeout", room.EmptyTimeout }, - { "creationTime", room.CreationTime } - } - }; - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to create LiveKit room for roomId: {RoomId}", roomId); - throw; - } - } - - /// - public async Task EndSessionAsync(string sessionId, RealtimeSessionConfig config) - { - try - { - // Delete the room in LiveKit - await _roomService.DeleteRoom(new DeleteRoomRequest - { - Room = sessionId - }); - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to end LiveKit session: {SessionId}", sessionId); - throw; - } - } - - /// - public string GetUserToken(Account.Account account, string sessionId, bool isAdmin = false) - { - return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult(); - } - - public Task GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false) - { - var token = _accessToken.WithIdentity(account.Name) - .WithName(account.Nick) - .WithGrants(new VideoGrants - { - RoomJoin = true, - CanPublish = true, - CanPublishData = true, - CanSubscribe = true, - CanSubscribeMetrics = true, - RoomAdmin = isAdmin, - Room = sessionId - }) - .WithMetadata(JsonSerializer.Serialize(new Dictionary - { { "account_id", account.Id.ToString() } })) - .WithTtl(TimeSpan.FromHours(1)); - return Task.FromResult(token.ToJwt()); - } - - public Task ReceiveWebhook(string body, string authHeader) - { - var evt = _webhookReceiver.Receive(body, authHeader); - if (evt is null) return Task.CompletedTask; - - // TODO: Handle webhook events - - return Task.CompletedTask; - } -} diff --git a/DysonNetwork.Sphere/Chat/Realtime/RealtimeStatusService.cs b/DysonNetwork.Sphere/Chat/Realtime/RealtimeStatusService.cs new file mode 100644 index 0000000..2fdd815 --- /dev/null +++ b/DysonNetwork.Sphere/Chat/Realtime/RealtimeStatusService.cs @@ -0,0 +1,91 @@ +using DysonNetwork.Sphere.Connection; +using Microsoft.EntityFrameworkCore; + +namespace DysonNetwork.Sphere.Chat.Realtime; + +public class ParticipantInfoItem +{ + public string Identity { get; set; } = null!; + public string Name { get; set; } = null!; + public Guid? AccountId { get; set; } + public Dictionary Metadata { get; set; } = new(); + public DateTime JoinedAt { get; set; } +} + +public class RealtimeStatusService(AppDatabase db, WebSocketService ws, ILogger logger) +{ + // Broadcast participant update to all participants in a room + public async Task BroadcastParticipantUpdate(string roomName, List participantsInfo) + { + try + { + // Get the room ID from the session name + var roomInfo = await db.ChatRealtimeCall + .Where(c => c.SessionId == roomName && c.EndedAt == null) + .Select(c => new { c.RoomId, c.Id }) + .FirstOrDefaultAsync(); + + if (roomInfo == null) + { + logger.LogWarning("Could not find room info for session: {SessionName}", roomName); + return; + } + + // Get all room members who should receive this update + var roomMembers = await db.ChatMembers + .Where(m => m.ChatRoomId == roomInfo.RoomId && m.LeaveAt == null) + .Select(m => m.AccountId) + .ToListAsync(); + + // Get member profiles for participants who have account IDs + var accountIds = participantsInfo + .Where(p => p.AccountId.HasValue) + .Select(p => p.AccountId!.Value) + .ToList(); + + var memberProfiles = new Dictionary(); + if (accountIds.Count != 0) + { + memberProfiles = await db.ChatMembers + .Where(m => m.ChatRoomId == roomInfo.RoomId && accountIds.Contains(m.AccountId)) + .Include(m => m.Account) + .ThenInclude(m => m.Profile) + .ToDictionaryAsync(m => m.AccountId, m => m); + } + + // Convert to CallParticipant objects + var participants = participantsInfo.Select(p => new CallParticipant + { + Identity = p.Identity, + Name = p.Name, + AccountId = p.AccountId, + JoinedAt = p.JoinedAt, + Profile = p.AccountId.HasValue && memberProfiles.TryGetValue(p.AccountId.Value, out var profile) + ? profile + : null + }).ToList(); + + // Create the update packet with CallParticipant objects + var updatePacket = new WebSocketPacket + { + Type = WebSocketPacketType.CallParticipantsUpdate, + Data = new Dictionary + { + { "room_id", roomInfo.RoomId }, + { "call_id", roomInfo.Id }, + { "participants", participants } + } + }; + + // Send the update to all members + foreach (var accountId in roomMembers) + { + ws.SendPacketToAccount(accountId, updatePacket); + } + } + catch (Exception ex) + { + logger.LogError(ex, "Error broadcasting participant update for room {RoomName}", roomName); + } + } +} \ No newline at end of file diff --git a/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs b/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs index 9abb006..4f6287f 100644 --- a/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs +++ b/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs @@ -229,6 +229,7 @@ public static class ServiceCollectionExtensions services.AddScoped(); services.AddScoped(); services.AddScoped(); + services.AddScoped(); services.AddRealtimeService(configuration); services.AddScoped(); services.AddScoped(); @@ -252,7 +253,7 @@ public static class ServiceCollectionExtensions services.AddHttpClient(); break; case "LiveKit": - services.AddScoped(); + services.AddScoped(); break; default: throw new NotSupportedException($"Realtime provider '{provider}' is not supported."); diff --git a/DysonNetwork.sln.DotSettings.user b/DysonNetwork.sln.DotSettings.user index 897f1de..c088690 100644 --- a/DysonNetwork.sln.DotSettings.user +++ b/DysonNetwork.sln.DotSettings.user @@ -56,6 +56,7 @@ ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded