From 7b026eeae1348b2e3fbfaefc22cb89feb6bf6fd8 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Fri, 11 Jul 2025 21:07:38 +0800 Subject: [PATCH 1/4] :bricks: Add cloudflare realtime service --- DysonNetwork.Pass/Team/Team.cs | 82 +++++ .../Realtime/CloudflareRealtimeService.cs | 315 ++++++++++++++++++ .../Chat/Realtime/IRealtimeService.cs | 11 +- .../Chat/Realtime/LivekitService.cs | 289 +--------------- .../Chat/RealtimeCallController.cs | 39 +-- .../Startup/ServiceCollectionExtensions.cs | 22 +- DysonNetwork.Sphere/appsettings.json | 16 +- 7 files changed, 458 insertions(+), 316 deletions(-) create mode 100644 DysonNetwork.Pass/Team/Team.cs create mode 100644 DysonNetwork.Sphere/Chat/Realtime/CloudflareRealtimeService.cs diff --git a/DysonNetwork.Pass/Team/Team.cs b/DysonNetwork.Pass/Team/Team.cs new file mode 100644 index 0000000..2947bdf --- /dev/null +++ b/DysonNetwork.Pass/Team/Team.cs @@ -0,0 +1,82 @@ +using System.ComponentModel.DataAnnotations; +using System.ComponentModel.DataAnnotations.Schema; +using System.Text.Json.Serialization; +using Microsoft.EntityFrameworkCore; +using NodaTime; + +namespace DysonNetwork.Pass.Team; + +public enum TeamType +{ + Individual, + Organizational +} + +[Index(nameof(Name), IsUnique = true)] +public class Team : ModelBase, IIdentifiedResource +{ + public Guid Id { get; set; } + public TeamType Type { get; set; } + [MaxLength(256)] public string Name { get; set; } = string.Empty; + [MaxLength(256)] public string Nick { get; set; } = string.Empty; + [MaxLength(4096)] public string? Bio { get; set; } + + // Outdated fields, for backward compability + [MaxLength(32)] public string? PictureId { get; set; } + [MaxLength(32)] public string? BackgroundId { get; set; } + + [Column(TypeName = "jsonb")] public CloudFileReferenceObject? Picture { get; set; } + [Column(TypeName = "jsonb")] public CloudFileReferenceObject? Background { get; set; } + + [Column(TypeName = "jsonb")] public Account.VerificationMark? Verification { get; set; } + + [JsonIgnore] public ICollection Members { get; set; } = new List(); + [JsonIgnore] public ICollection Features { get; set; } = new List(); + + public Guid? AccountId { get; set; } + public Account.Account? Account { get; set; } + + public string ResourceIdentifier => $"publisher/{Id}"; +} + +public enum TeamMemberRole +{ + Owner = 100, + Manager = 75, + Editor = 50, + Viewer = 25 +} + +public class TeamMember : ModelBase +{ + public Guid TeamId { get; set; } + [JsonIgnore] public Team Team { get; set; } = null!; + public Guid AccountId { get; set; } + public Account.Account Account { get; set; } = null!; + + public TeamMemberRole Role { get; set; } = TeamMemberRole.Viewer; + public Instant? JoinedAt { get; set; } +} + +public enum TeamSubscriptionStatus +{ + Active, + Expired, + Cancelled +} + +public class TeamFeature : ModelBase +{ + public Guid Id { get; set; } + [MaxLength(1024)] public string Flag { get; set; } = null!; + public Instant? ExpiredAt { get; set; } + + public Guid TeamId { get; set; } + public Team Team { get; set; } = null!; +} + +public abstract class TeamFeatureFlag +{ + public static List AllFlags => [Develop]; + public static string Develop = "develop"; +} \ No newline at end of file diff --git a/DysonNetwork.Sphere/Chat/Realtime/CloudflareRealtimeService.cs b/DysonNetwork.Sphere/Chat/Realtime/CloudflareRealtimeService.cs new file mode 100644 index 0000000..b965c2e --- /dev/null +++ b/DysonNetwork.Sphere/Chat/Realtime/CloudflareRealtimeService.cs @@ -0,0 +1,315 @@ +using System.Security.Cryptography; +using System.Security.Cryptography.X509Certificates; +using System.Text; +using System.Text.Json; +using Microsoft.IdentityModel.Tokens; +using System.Text.Json.Serialization; + +namespace DysonNetwork.Sphere.Chat.Realtime; + +public class CloudflareRealtimeService : IRealtimeService +{ + private readonly HttpClient _httpClient; + private readonly IConfiguration _configuration; + private readonly string _apiSecret; + private readonly ChatRoomService _chatRoomService; + private RSA? _publicKey; + + public CloudflareRealtimeService(HttpClient httpClient, IConfiguration configuration, + ChatRoomService chatRoomService) + { + _httpClient = httpClient; + _configuration = configuration; + _chatRoomService = chatRoomService; + var apiKey = _configuration["Realtime:Cloudflare:ApiKey"]; + _apiSecret = _configuration["Realtime:Cloudflare:ApiSecret"]!; + var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes($"{apiKey}:{_apiSecret}")); + _httpClient.BaseAddress = new Uri("https://rtk.realtime.cloudflare.com/v2/"); + _httpClient.DefaultRequestHeaders.Authorization = + new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials); + } + + public string ProviderName => "Cloudflare"; + + public async Task CreateSessionAsync(Guid roomId, Dictionary metadata) + { + var roomName = $"Call_{roomId.ToString().Replace("-", "")}"; + var requestBody = new + { + title = $"Solar Room Call #{roomId}", + preferred_region = _configuration["Realtime:Cloudflare:PreferredRegion"], + data = metadata, + room_name = roomName + }; + + var content = new StringContent(JsonSerializer.Serialize(requestBody), Encoding.UTF8, "application/json"); + var response = await _httpClient.PostAsync("meetings", content); + + response.EnsureSuccessStatusCode(); + + var responseContent = await response.Content.ReadAsStringAsync(); + var meetingResponse = JsonSerializer.Deserialize(responseContent); + + return new RealtimeSessionConfig + { + SessionId = meetingResponse.Data.Id, + Parameters = new Dictionary + { + { "meetingId", meetingResponse.Data.Id } + } + }; + } + + public async Task EndSessionAsync(string sessionId, RealtimeSessionConfig config) + { + var requestBody = new + { + status = "INACTIVE" + }; + + var content = new StringContent(JsonSerializer.Serialize(requestBody), Encoding.UTF8, "application/json"); + var response = await _httpClient.PatchAsync($"sessions/{sessionId}", content); + + response.EnsureSuccessStatusCode(); + } + + public string GetUserToken(Account.Account account, string sessionId, bool isAdmin = false) + { + return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult(); + } + + public async Task GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false) + { + try + { + // First try to get the participant by their custom ID + var participantCheckResponse = await _httpClient + .GetAsync($"meetings/{sessionId}/participants/{account.Id}"); + + if (participantCheckResponse.IsSuccessStatusCode) + { + // Participant exists, get a new token + var tokenResponse = await _httpClient + .PostAsync($"meetings/{sessionId}/participants/{account.Id}/token", null); + tokenResponse.EnsureSuccessStatusCode(); + var tokenContent = await tokenResponse.Content.ReadAsStringAsync(); + var tokenData = JsonSerializer.Deserialize>(tokenContent); + if (tokenData == null || !tokenData.Success) + { + throw new Exception("Failed to get participant token"); + } + return tokenData.Data?.Token ?? throw new Exception("Token is null"); + } + + // Participant doesn't exist, create a new one + var requestBody = new + { + name = "@" + account.Name, + preset_name = isAdmin ? "group_call_host" : "group_call_participant", + custom_user_id = account.Id.ToString() + }; + + var content = new StringContent( + JsonSerializer.Serialize(requestBody), + Encoding.UTF8, + "application/json" + ); + + var createResponse = await _httpClient.PostAsync($"meetings/{sessionId}/participants", content); + createResponse.EnsureSuccessStatusCode(); + + var responseContent = await createResponse.Content.ReadAsStringAsync(); + var participantData = JsonSerializer.Deserialize>(responseContent); + if (participantData == null || !participantData.Success) + { + throw new Exception("Failed to create participant"); + } + return participantData.Data?.Token ?? throw new Exception("Token is null"); + } + catch (Exception ex) + { + // Log the error or handle it appropriately + throw new Exception($"Failed to get or create participant: {ex.Message}", ex); + } + } + + public async Task ReceiveWebhook(string body, string authHeader) + { + if (string.IsNullOrEmpty(authHeader)) + { + throw new ArgumentException("Auth header is missing"); + } + + if (_publicKey == null) + { + await GetPublicKeyAsync(); + } + + var signature = authHeader.Replace("Signature ", ""); + var bodyBytes = Encoding.UTF8.GetBytes(body); + var signatureBytes = Convert.FromBase64String(signature); + + if (!(_publicKey?.VerifyData(bodyBytes, signatureBytes, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1) ?? + false)) + { + throw new SecurityTokenException("Webhook signature validation failed"); + } + + // Process the webhook event + var webhookEvent = JsonSerializer.Deserialize(body); + + if (webhookEvent.Type == "participant.joined") + { + await _chatRoomService.SetRoomCallStatus( + Guid.Parse(webhookEvent.Event.Meeting.RoomName.Replace("Call_", "")), true); + } + else if (webhookEvent.Type == "room.ended") + { + await _chatRoomService.SetRoomCallStatus( + Guid.Parse(webhookEvent.Event.Meeting.RoomName.Replace("Call_", "")), false); + } + } + + private class WebhooksConfig + { + [JsonPropertyName("keys")] public List Keys { get; set; } = new List(); + } + + private class WebhookKey + { + [JsonPropertyName("publicKeyPem")] public string PublicKeyPem { get; set; } = string.Empty; + } + + private async Task GetPublicKeyAsync() + { + var response = await _httpClient.GetAsync("https://rtk.realtime.cloudflare.com/.well-known/webhooks.json"); + response.EnsureSuccessStatusCode(); + var content = await response.Content.ReadAsStringAsync(); + var webhooksConfig = JsonSerializer.Deserialize(content); + var publicKeyPem = webhooksConfig?.Keys.FirstOrDefault()?.PublicKeyPem; + + if (string.IsNullOrEmpty(publicKeyPem)) + { + throw new InvalidOperationException("Public key not found in webhooks configuration."); + } + + _publicKey = RSA.Create(); + _publicKey.ImportFromPem(publicKeyPem); + } + + private class DyteMeetingResponse + { + [JsonPropertyName("data")] + public DyteMeetingData Data { get; set; } = new(); + } + + private class DyteMeetingData + { + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + [JsonPropertyName("roomName")] + public string RoomName { get; set; } = string.Empty; + [JsonPropertyName("title")] + public string Title { get; set; } = string.Empty; + [JsonPropertyName("status")] + public string Status { get; set; } = string.Empty; + [JsonPropertyName("createdAt")] + public DateTime CreatedAt { get; set; } + [JsonPropertyName("updatedAt")] + public DateTime UpdatedAt { get; set; } + } + + private class DyteParticipant + { + public string Id { get; set; } = string.Empty; + public string Token { get; set; } = string.Empty; + public string CustomParticipantId { get; set; } = string.Empty; + } + + public class DyteResponse + { + [JsonPropertyName("success")] + public bool Success { get; set; } + + [JsonPropertyName("data")] + public T? Data { get; set; } + } + + public class DyteTokenResponse + { + [JsonPropertyName("token")] + public string Token { get; set; } = string.Empty; + } + + public class DyteParticipantResponse + { + [JsonPropertyName("id")] + public string Id { get; set; } = string.Empty; + + [JsonPropertyName("name")] + public string Name { get; set; } = string.Empty; + + [JsonPropertyName("customUserId")] + public string CustomUserId { get; set; } = string.Empty; + + [JsonPropertyName("presetName")] + public string PresetName { get; set; } = string.Empty; + + [JsonPropertyName("isActive")] + public bool IsActive { get; set; } + + [JsonPropertyName("token")] + public string Token { get; set; } = string.Empty; + } + + public class DyteWebhookEvent + { + [JsonPropertyName("id")] public string Id { get; set; } + + [JsonPropertyName("type")] public string Type { get; set; } + + [JsonPropertyName("webhookId")] public string WebhookId { get; set; } + + [JsonPropertyName("timestamp")] public DateTime Timestamp { get; set; } + + [JsonPropertyName("event")] public EventData Event { get; set; } + } + + public class EventData + { + [JsonPropertyName("meeting")] public MeetingData Meeting { get; set; } + + [JsonPropertyName("participant")] public ParticipantData Participant { get; set; } + } + + public class MeetingData + { + [JsonPropertyName("id")] public string Id { get; set; } + + [JsonPropertyName("roomName")] public string RoomName { get; set; } + + [JsonPropertyName("title")] public string Title { get; set; } + + [JsonPropertyName("status")] public string Status { get; set; } + + [JsonPropertyName("createdAt")] public DateTime CreatedAt { get; set; } + + [JsonPropertyName("updatedAt")] public DateTime UpdatedAt { get; set; } + } + + public class ParticipantData + { + [JsonPropertyName("id")] public string Id { get; set; } + + [JsonPropertyName("userId")] public string UserId { get; set; } + + [JsonPropertyName("customParticipantId")] + public string CustomParticipantId { get; set; } + + [JsonPropertyName("presetName")] public string PresetName { get; set; } + + [JsonPropertyName("joinedAt")] public DateTime JoinedAt { get; set; } + + [JsonPropertyName("leftAt")] public DateTime? LeftAt { get; set; } + } +} \ No newline at end of file diff --git a/DysonNetwork.Sphere/Chat/Realtime/IRealtimeService.cs b/DysonNetwork.Sphere/Chat/Realtime/IRealtimeService.cs index b7c7509..20dd520 100644 --- a/DysonNetwork.Sphere/Chat/Realtime/IRealtimeService.cs +++ b/DysonNetwork.Sphere/Chat/Realtime/IRealtimeService.cs @@ -30,7 +30,7 @@ public interface IRealtimeService Task EndSessionAsync(string sessionId, RealtimeSessionConfig config); /// - /// Gets a token for user to join the session + /// Gets a token for user to join the session (synchronous version for backward compatibility) /// /// The user identifier /// The session identifier @@ -38,6 +38,15 @@ public interface IRealtimeService /// User-specific token for the session string GetUserToken(Account.Account account, string sessionId, bool isAdmin = false); + /// + /// Gets a token for user to join the session asynchronously + /// + /// The user identifier + /// The session identifier + /// The user is the admin of session + /// Task that resolves to the user-specific token for the session + Task GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false); + /// /// Processes incoming webhook requests from the realtime service provider /// diff --git a/DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs b/DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs index 8e07d05..591f730 100644 --- a/DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs +++ b/DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs @@ -12,10 +12,6 @@ namespace DysonNetwork.Sphere.Chat.Realtime; /// public class LivekitRealtimeService : IRealtimeService { - private readonly AppDatabase _db; - private readonly ICacheService _cache; - private readonly WebSocketService _ws; - private readonly ILogger _logger; private readonly RoomServiceClient _roomService; private readonly AccessToken _accessToken; @@ -23,29 +19,21 @@ public class LivekitRealtimeService : IRealtimeService public LivekitRealtimeService( IConfiguration configuration, - ILogger logger, - AppDatabase db, - ICacheService cache, - WebSocketService ws - ) + ILogger logger) { _logger = logger; // Get LiveKit configuration from appsettings - var host = configuration["RealtimeChat:Endpoint"] ?? + var host = configuration["Livekit:Endpoint"] ?? throw new ArgumentNullException("Endpoint configuration is required"); - var apiKey = configuration["RealtimeChat:ApiKey"] ?? + var apiKey = configuration["Livekit:ApiKey"] ?? throw new ArgumentNullException("ApiKey configuration is required"); - var apiSecret = configuration["RealtimeChat:ApiSecret"] ?? + var apiSecret = configuration["Livekit:ApiSecret"] ?? throw new ArgumentNullException("ApiSecret configuration is required"); _roomService = new RoomServiceClient(host, apiKey, apiSecret); _accessToken = new AccessToken(apiKey, apiSecret); _webhookReceiver = new WebhookReceiver(apiKey, apiSecret); - - _db = db; - _cache = cache; - _ws = ws; } /// @@ -112,6 +100,11 @@ public class LivekitRealtimeService : IRealtimeService /// public string GetUserToken(Account.Account account, string sessionId, bool isAdmin = false) + { + return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult(); + } + + public Task GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false) { var token = _accessToken.WithIdentity(account.Name) .WithName(account.Nick) @@ -128,264 +121,16 @@ public class LivekitRealtimeService : IRealtimeService .WithMetadata(JsonSerializer.Serialize(new Dictionary { { "account_id", account.Id.ToString() } })) .WithTtl(TimeSpan.FromHours(1)); - return token.ToJwt(); + return Task.FromResult(token.ToJwt()); } - public async Task ReceiveWebhook(string body, string authHeader) + public Task ReceiveWebhook(string body, string authHeader) { var evt = _webhookReceiver.Receive(body, authHeader); - if (evt is null) return; - - switch (evt.Event) - { - case "room_finished": - var now = SystemClock.Instance.GetCurrentInstant(); - await _db.ChatRealtimeCall - .Where(c => c.SessionId == evt.Room.Name) - .ExecuteUpdateAsync(s => s.SetProperty(p => p.EndedAt, now) - ); - - // Also clean up participants list when the room is finished - await _cache.RemoveAsync(_GetParticipantsKey(evt.Room.Name)); - break; - - case "participant_joined": - if (evt.Participant != null) - { - // Add the participant to cache - await _AddParticipantToCache(evt.Room.Name, evt.Participant); - _logger.LogInformation( - "Participant joined room: {RoomName}, Participant: {ParticipantIdentity}", - evt.Room.Name, evt.Participant.Identity); - - // Broadcast participant list update to all participants - await _BroadcastParticipantUpdate(evt.Room.Name); - } - - break; - - case "participant_left": - if (evt.Participant != null) - { - // Remove the participant from cache - await _RemoveParticipantFromCache(evt.Room.Name, evt.Participant); - _logger.LogInformation( - "Participant left room: {RoomName}, Participant: {ParticipantIdentity}", - evt.Room.Name, evt.Participant.Identity); - - // Broadcast participant list update to all participants - await _BroadcastParticipantUpdate(evt.Room.Name); - } - - break; - } + if (evt is null) return Task.CompletedTask; + + // TODO: Handle webhook events + + return Task.CompletedTask; } - - private static string _GetParticipantsKey(string roomName) - => $"RoomParticipants_{roomName}"; - - private async Task _AddParticipantToCache(string roomName, ParticipantInfo participant) - { - var participantsKey = _GetParticipantsKey(roomName); - - // Try to acquire a lock to prevent race conditions when updating the participants list - await using var lockObj = await _cache.AcquireLockAsync( - $"{participantsKey}_lock", - TimeSpan.FromSeconds(10), - TimeSpan.FromSeconds(5)); - - if (lockObj == null) - { - _logger.LogWarning("Failed to acquire lock for updating participants list in room: {RoomName}", roomName); - return; - } - - // Get the current participants list - var participants = await _cache.GetAsync>(participantsKey) ?? - []; - - // Check if the participant already exists - var existingIndex = participants.FindIndex(p => p.Identity == participant.Identity); - if (existingIndex >= 0) - { - // Update existing participant - participants[existingIndex] = CreateParticipantCacheItem(participant); - } - else - { - // Add new participant - participants.Add(CreateParticipantCacheItem(participant)); - } - - // Update cache with new list - await _cache.SetAsync(participantsKey, participants, TimeSpan.FromHours(6)); - - // Also add to a room group in cache for easy cleanup - await _cache.AddToGroupAsync(participantsKey, $"Room_{roomName}"); - } - - private async Task _RemoveParticipantFromCache(string roomName, ParticipantInfo participant) - { - var participantsKey = _GetParticipantsKey(roomName); - - // Try to acquire a lock to prevent race conditions when updating the participants list - await using var lockObj = await _cache.AcquireLockAsync( - $"{participantsKey}_lock", - TimeSpan.FromSeconds(10), - TimeSpan.FromSeconds(5)); - - if (lockObj == null) - { - _logger.LogWarning("Failed to acquire lock for updating participants list in room: {RoomName}", roomName); - return; - } - - // Get current participants list - var participants = await _cache.GetAsync>(participantsKey); - if (participants == null || !participants.Any()) - return; - - // Remove participant - participants.RemoveAll(p => p.Identity == participant.Identity); - - // Update cache with new list - await _cache.SetAsync(participantsKey, participants, TimeSpan.FromHours(6)); - } - - // Helper method to get participants in a room - public async Task> GetRoomParticipantsAsync(string roomName) - { - var participantsKey = _GetParticipantsKey(roomName); - return await _cache.GetAsync>(participantsKey) ?? []; - } - - // Class to represent a participant in the cache - public class ParticipantCacheItem - { - public string Identity { get; set; } = null!; - public string Name { get; set; } = null!; - public Guid? AccountId { get; set; } - public ParticipantInfo.Types.State State { get; set; } - public Dictionary Metadata { get; set; } = new(); - public DateTime JoinedAt { get; set; } - } - - private ParticipantCacheItem CreateParticipantCacheItem(ParticipantInfo participant) - { - // Try to parse account ID from metadata - Guid? accountId = null; - var metadata = new Dictionary(); - - if (string.IsNullOrEmpty(participant.Metadata)) - return new ParticipantCacheItem - { - Identity = participant.Identity, - Name = participant.Name, - AccountId = accountId, - State = participant.State, - Metadata = metadata, - JoinedAt = DateTime.UtcNow - }; - try - { - metadata = JsonSerializer.Deserialize>(participant.Metadata) ?? - new Dictionary(); - - if (metadata.TryGetValue("account_id", out var accountIdStr)) - if (Guid.TryParse(accountIdStr, out var parsedId)) - accountId = parsedId; - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to parse participant metadata"); - } - - return new ParticipantCacheItem - { - Identity = participant.Identity, - Name = participant.Name, - AccountId = accountId, - State = participant.State, - Metadata = metadata, - JoinedAt = DateTime.UtcNow - }; - } - - // Broadcast participant update to all participants in a room - private async Task _BroadcastParticipantUpdate(string roomName) - { - try - { - // Get the room ID from the session name - var roomInfo = await _db.ChatRealtimeCall - .Where(c => c.SessionId == roomName && c.EndedAt == null) - .Select(c => new { c.RoomId, c.Id }) - .FirstOrDefaultAsync(); - - if (roomInfo == null) - { - _logger.LogWarning("Could not find room info for session: {SessionName}", roomName); - return; - } - - // Get current participants - var livekitParticipants = await GetRoomParticipantsAsync(roomName); - - // Get all room members who should receive this update - var roomMembers = await _db.ChatMembers - .Where(m => m.ChatRoomId == roomInfo.RoomId && m.LeaveAt == null) - .Select(m => m.AccountId) - .ToListAsync(); - - // Get member profiles for participants who have account IDs - var accountIds = livekitParticipants - .Where(p => p.AccountId.HasValue) - .Select(p => p.AccountId!.Value) - .ToList(); - - var memberProfiles = new Dictionary(); - if (accountIds.Count != 0) - { - memberProfiles = await _db.ChatMembers - .Where(m => m.ChatRoomId == roomInfo.RoomId && accountIds.Contains(m.AccountId)) - .Include(m => m.Account) - .ThenInclude(m => m.Profile) - .ToDictionaryAsync(m => m.AccountId, m => m); - } - - // Convert to CallParticipant objects - var participants = livekitParticipants.Select(p => new CallParticipant - { - Identity = p.Identity, - Name = p.Name, - AccountId = p.AccountId, - JoinedAt = p.JoinedAt, - Profile = p.AccountId.HasValue && memberProfiles.TryGetValue(p.AccountId.Value, out var profile) - ? profile - : null - }).ToList(); - - // Create the update packet with CallParticipant objects - var updatePacket = new WebSocketPacket - { - Type = WebSocketPacketType.CallParticipantsUpdate, - Data = new Dictionary - { - { "room_id", roomInfo.RoomId }, - { "call_id", roomInfo.Id }, - { "participants", participants } - } - }; - - // Send the update to all members - foreach (var accountId in roomMembers) - { - _ws.SendPacketToAccount(accountId, updatePacket); - } - } - catch (Exception ex) - { - _logger.LogError(ex, "Error broadcasting participant update for room {RoomName}", roomName); - } - } -} \ No newline at end of file +} diff --git a/DysonNetwork.Sphere/Chat/RealtimeCallController.cs b/DysonNetwork.Sphere/Chat/RealtimeCallController.cs index e26bcd9..53e702c 100644 --- a/DysonNetwork.Sphere/Chat/RealtimeCallController.cs +++ b/DysonNetwork.Sphere/Chat/RealtimeCallController.cs @@ -1,5 +1,4 @@ using DysonNetwork.Sphere.Chat.Realtime; -using Livekit.Server.Sdk.Dotnet; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using Microsoft.EntityFrameworkCore; @@ -97,34 +96,6 @@ public class RealtimeCallController( var endpoint = _config.Endpoint ?? throw new InvalidOperationException("LiveKit endpoint configuration is missing"); - // Inject the ChatRoomService - var chatRoomService = HttpContext.RequestServices.GetRequiredService(); - - // Get current participants from the LiveKit service - var participants = new List(); - if (realtime is LivekitRealtimeService livekitService) - { - var roomParticipants = await livekitService.GetRoomParticipantsAsync(ongoingCall.SessionId); - participants = []; - - foreach (var p in roomParticipants) - { - var participant = new CallParticipant - { - Identity = p.Identity, - Name = p.Name, - AccountId = p.AccountId, - JoinedAt = p.JoinedAt - }; - - // Fetch the ChatMember profile if we have an account ID - if (p.AccountId.HasValue) - participant.Profile = await chatRoomService.GetRoomMember(p.AccountId.Value, roomId); - - participants.Add(participant); - } - } - // Create the response model var response = new JoinCallResponse { @@ -133,8 +104,7 @@ public class RealtimeCallController( Token = userToken, CallId = ongoingCall.Id, RoomName = ongoingCall.SessionId, - IsAdmin = isAdmin, - Participants = participants + IsAdmin = isAdmin }; return Ok(response); @@ -215,11 +185,6 @@ public class JoinCallResponse /// Whether the user is the admin of the call /// public bool IsAdmin { get; set; } - - /// - /// Current participants in the call - /// - public List Participants { get; set; } = new(); } /// @@ -251,4 +216,4 @@ public class CallParticipant /// When the participant joined the call /// public DateTime JoinedAt { get; set; } -} \ No newline at end of file +} diff --git a/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs b/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs index 4db2772..9abb006 100644 --- a/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs +++ b/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs @@ -229,7 +229,7 @@ public static class ServiceCollectionExtensions services.AddScoped(); services.AddScoped(); services.AddScoped(); - services.AddScoped(); + services.AddRealtimeService(configuration); services.AddScoped(); services.AddScoped(); services.AddScoped(); @@ -242,4 +242,22 @@ public static class ServiceCollectionExtensions return services; } -} \ No newline at end of file + + private static IServiceCollection AddRealtimeService(this IServiceCollection services, IConfiguration configuration) + { + var provider = configuration["Realtime:Provider"]; + switch (provider) + { + case "Cloudflare": + services.AddHttpClient(); + break; + case "LiveKit": + services.AddScoped(); + break; + default: + throw new NotSupportedException($"Realtime provider '{provider}' is not supported."); + } + + return services; + } +} diff --git a/DysonNetwork.Sphere/appsettings.json b/DysonNetwork.Sphere/appsettings.json index 82c1088..0f21995 100644 --- a/DysonNetwork.Sphere/appsettings.json +++ b/DysonNetwork.Sphere/appsettings.json @@ -85,10 +85,18 @@ "FromName": "Alphabot", "SubjectPrefix": "Solar Network" }, - "RealtimeChat": { - "Endpoint": "https://solar-network-im44o8gq.livekit.cloud", - "ApiKey": "APIs6TiL8wj3A4j", - "ApiSecret": "SffxRneIwTnlHPtEf3zicmmv3LUEl7xXael4PvWZrEhE" + "Realtime": { + "Provider": "LiveKit", + "LiveKit": { + "Endpoint": "https://solar-network-im44o8gq.livekit.cloud", + "ApiKey": "APIs6TiL8wj3A4j", + "ApiSecret": "SffxRneIwTnlHPtEf3zicmmv3LUEl7xXael4PvWZrEhE" + }, + "Cloudflare": { + "ApiKey": "", + "ApiSecret": "", + "PreferredRegion": "us-east-1" + } }, "GeoIp": { "DatabasePath": "./Keys/GeoLite2-City.mmdb" From bec294365fe1dbbcb6899dd99d2a4a742943c32b Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Fri, 11 Jul 2025 23:07:32 +0800 Subject: [PATCH 2/4] :recycle: Refactored webhook receiver in realtime call --- ...ealtimeService.cs => CloudflareService.cs} | 144 ++++---- .../Chat/Realtime/LiveKitService.cs | 307 ++++++++++++++++++ .../Chat/Realtime/LivekitService.cs | 136 -------- .../Chat/Realtime/RealtimeStatusService.cs | 91 ++++++ .../Startup/ServiceCollectionExtensions.cs | 3 +- DysonNetwork.sln.DotSettings.user | 1 + 6 files changed, 468 insertions(+), 214 deletions(-) rename DysonNetwork.Sphere/Chat/Realtime/{CloudflareRealtimeService.cs => CloudflareService.cs} (71%) create mode 100644 DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs delete mode 100644 DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs create mode 100644 DysonNetwork.Sphere/Chat/Realtime/RealtimeStatusService.cs diff --git a/DysonNetwork.Sphere/Chat/Realtime/CloudflareRealtimeService.cs b/DysonNetwork.Sphere/Chat/Realtime/CloudflareService.cs similarity index 71% rename from DysonNetwork.Sphere/Chat/Realtime/CloudflareRealtimeService.cs rename to DysonNetwork.Sphere/Chat/Realtime/CloudflareService.cs index b965c2e..188b9de 100644 --- a/DysonNetwork.Sphere/Chat/Realtime/CloudflareRealtimeService.cs +++ b/DysonNetwork.Sphere/Chat/Realtime/CloudflareService.cs @@ -1,29 +1,32 @@ using System.Security.Cryptography; -using System.Security.Cryptography.X509Certificates; using System.Text; using System.Text.Json; using Microsoft.IdentityModel.Tokens; using System.Text.Json.Serialization; +using Microsoft.EntityFrameworkCore; +using NodaTime; namespace DysonNetwork.Sphere.Chat.Realtime; public class CloudflareRealtimeService : IRealtimeService { + private readonly AppDatabase _db; private readonly HttpClient _httpClient; private readonly IConfiguration _configuration; - private readonly string _apiSecret; - private readonly ChatRoomService _chatRoomService; private RSA? _publicKey; - public CloudflareRealtimeService(HttpClient httpClient, IConfiguration configuration, - ChatRoomService chatRoomService) + public CloudflareRealtimeService( + AppDatabase db, + HttpClient httpClient, + IConfiguration configuration + ) { + _db = db; _httpClient = httpClient; _configuration = configuration; - _chatRoomService = chatRoomService; var apiKey = _configuration["Realtime:Cloudflare:ApiKey"]; - _apiSecret = _configuration["Realtime:Cloudflare:ApiSecret"]!; - var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes($"{apiKey}:{_apiSecret}")); + var apiSecret = _configuration["Realtime:Cloudflare:ApiSecret"]!; + var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes($"{apiKey}:{apiSecret}")); _httpClient.BaseAddress = new Uri("https://rtk.realtime.cloudflare.com/v2/"); _httpClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials); @@ -33,13 +36,12 @@ public class CloudflareRealtimeService : IRealtimeService public async Task CreateSessionAsync(Guid roomId, Dictionary metadata) { - var roomName = $"Call_{roomId.ToString().Replace("-", "")}"; + var roomName = $"Room Call #{roomId.ToString().Replace("-", "")}"; var requestBody = new { - title = $"Solar Room Call #{roomId}", + title = roomName, preferred_region = _configuration["Realtime:Cloudflare:PreferredRegion"], - data = metadata, - room_name = roomName + data = metadata }; var content = new StringContent(JsonSerializer.Serialize(requestBody), Encoding.UTF8, "application/json"); @@ -48,7 +50,8 @@ public class CloudflareRealtimeService : IRealtimeService response.EnsureSuccessStatusCode(); var responseContent = await response.Content.ReadAsStringAsync(); - var meetingResponse = JsonSerializer.Deserialize(responseContent); + var meetingResponse = JsonSerializer.Deserialize(responseContent); + if (meetingResponse is null) throw new Exception("Failed to create meeting with cloudflare"); return new RealtimeSessionConfig { @@ -77,7 +80,7 @@ public class CloudflareRealtimeService : IRealtimeService { return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult(); } - + public async Task GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false) { try @@ -93,14 +96,15 @@ public class CloudflareRealtimeService : IRealtimeService .PostAsync($"meetings/{sessionId}/participants/{account.Id}/token", null); tokenResponse.EnsureSuccessStatusCode(); var tokenContent = await tokenResponse.Content.ReadAsStringAsync(); - var tokenData = JsonSerializer.Deserialize>(tokenContent); + var tokenData = JsonSerializer.Deserialize>(tokenContent); if (tokenData == null || !tokenData.Success) { throw new Exception("Failed to get participant token"); } + return tokenData.Data?.Token ?? throw new Exception("Token is null"); } - + // Participant doesn't exist, create a new one var requestBody = new { @@ -110,20 +114,21 @@ public class CloudflareRealtimeService : IRealtimeService }; var content = new StringContent( - JsonSerializer.Serialize(requestBody), - Encoding.UTF8, + JsonSerializer.Serialize(requestBody), + Encoding.UTF8, "application/json" ); - + var createResponse = await _httpClient.PostAsync($"meetings/{sessionId}/participants", content); createResponse.EnsureSuccessStatusCode(); var responseContent = await createResponse.Content.ReadAsStringAsync(); - var participantData = JsonSerializer.Deserialize>(responseContent); + var participantData = JsonSerializer.Deserialize>(responseContent); if (participantData == null || !participantData.Success) { throw new Exception("Failed to create participant"); } + return participantData.Data?.Token ?? throw new Exception("Token is null"); } catch (Exception ex) @@ -156,17 +161,18 @@ public class CloudflareRealtimeService : IRealtimeService } // Process the webhook event - var webhookEvent = JsonSerializer.Deserialize(body); + var evt = JsonSerializer.Deserialize(body); + if (evt is null) return; - if (webhookEvent.Type == "participant.joined") + switch (evt.Type) { - await _chatRoomService.SetRoomCallStatus( - Guid.Parse(webhookEvent.Event.Meeting.RoomName.Replace("Call_", "")), true); - } - else if (webhookEvent.Type == "room.ended") - { - await _chatRoomService.SetRoomCallStatus( - Guid.Parse(webhookEvent.Event.Meeting.RoomName.Replace("Call_", "")), false); + case "meeting.ended": + var now = SystemClock.Instance.GetCurrentInstant(); + await _db.ChatRealtimeCall + .Where(c => c.SessionId == evt.Event.Meeting.Id) + .ExecuteUpdateAsync(s => s.SetProperty(p => p.EndedAt, now) + ); + break; } } @@ -197,72 +203,56 @@ public class CloudflareRealtimeService : IRealtimeService _publicKey.ImportFromPem(publicKeyPem); } - private class DyteMeetingResponse + private class CfMeetingResponse { - [JsonPropertyName("data")] - public DyteMeetingData Data { get; set; } = new(); + [JsonPropertyName("data")] public CfMeetingData Data { get; set; } = new(); } - private class DyteMeetingData + private class CfMeetingData { - [JsonPropertyName("id")] - public string Id { get; set; } = string.Empty; - [JsonPropertyName("roomName")] - public string RoomName { get; set; } = string.Empty; - [JsonPropertyName("title")] - public string Title { get; set; } = string.Empty; - [JsonPropertyName("status")] - public string Status { get; set; } = string.Empty; - [JsonPropertyName("createdAt")] - public DateTime CreatedAt { get; set; } - [JsonPropertyName("updatedAt")] - public DateTime UpdatedAt { get; set; } + [JsonPropertyName("id")] public string Id { get; set; } = string.Empty; + [JsonPropertyName("roomName")] public string RoomName { get; set; } = string.Empty; + [JsonPropertyName("title")] public string Title { get; set; } = string.Empty; + [JsonPropertyName("status")] public string Status { get; set; } = string.Empty; + [JsonPropertyName("createdAt")] public DateTime CreatedAt { get; set; } + [JsonPropertyName("updatedAt")] public DateTime UpdatedAt { get; set; } } - private class DyteParticipant + private class CfParticipant { public string Id { get; set; } = string.Empty; public string Token { get; set; } = string.Empty; public string CustomParticipantId { get; set; } = string.Empty; } - public class DyteResponse + public class CfResponse { - [JsonPropertyName("success")] - public bool Success { get; set; } - - [JsonPropertyName("data")] - public T? Data { get; set; } + [JsonPropertyName("success")] public bool Success { get; set; } + + [JsonPropertyName("data")] public T? Data { get; set; } } - public class DyteTokenResponse + public class CfTokenResponse { - [JsonPropertyName("token")] - public string Token { get; set; } = string.Empty; - } - - public class DyteParticipantResponse - { - [JsonPropertyName("id")] - public string Id { get; set; } = string.Empty; - - [JsonPropertyName("name")] - public string Name { get; set; } = string.Empty; - - [JsonPropertyName("customUserId")] - public string CustomUserId { get; set; } = string.Empty; - - [JsonPropertyName("presetName")] - public string PresetName { get; set; } = string.Empty; - - [JsonPropertyName("isActive")] - public bool IsActive { get; set; } - - [JsonPropertyName("token")] - public string Token { get; set; } = string.Empty; + [JsonPropertyName("token")] public string Token { get; set; } = string.Empty; } - public class DyteWebhookEvent + public class CfParticipantResponse + { + [JsonPropertyName("id")] public string Id { get; set; } = string.Empty; + + [JsonPropertyName("name")] public string Name { get; set; } = string.Empty; + + [JsonPropertyName("customUserId")] public string CustomUserId { get; set; } = string.Empty; + + [JsonPropertyName("presetName")] public string PresetName { get; set; } = string.Empty; + + [JsonPropertyName("isActive")] public bool IsActive { get; set; } + + [JsonPropertyName("token")] public string Token { get; set; } = string.Empty; + } + + public class CfWebhookEvent { [JsonPropertyName("id")] public string Id { get; set; } diff --git a/DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs b/DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs new file mode 100644 index 0000000..ac5ae4b --- /dev/null +++ b/DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs @@ -0,0 +1,307 @@ +using DysonNetwork.Sphere.Connection; +using DysonNetwork.Sphere.Storage; +using Livekit.Server.Sdk.Dotnet; +using Microsoft.EntityFrameworkCore; +using NodaTime; +using System.Text.Json; + +namespace DysonNetwork.Sphere.Chat.Realtime; + +/// +/// LiveKit implementation of the real-time communication service +/// +public class LiveKitRealtimeService : IRealtimeService +{ + private readonly AppDatabase _db; + private readonly ICacheService _cache; + private readonly RealtimeStatusService _callStatus; + + private readonly ILogger _logger; + private readonly RoomServiceClient _roomService; + private readonly AccessToken _accessToken; + private readonly WebhookReceiver _webhookReceiver; + + public LiveKitRealtimeService( + IConfiguration configuration, + ILogger logger, + AppDatabase db, + ICacheService cache, + RealtimeStatusService callStatus + ) + { + _logger = logger; + + // Get LiveKit configuration from appsettings + var host = configuration["Realtime:LiveKit:Endpoint"] ?? + throw new ArgumentNullException("Endpoint configuration is required"); + var apiKey = configuration["Realtime:LiveKit:ApiKey"] ?? + throw new ArgumentNullException("ApiKey configuration is required"); + var apiSecret = configuration["Realtime:LiveKit:ApiSecret"] ?? + throw new ArgumentNullException("ApiSecret configuration is required"); + + _roomService = new RoomServiceClient(host, apiKey, apiSecret); + _accessToken = new AccessToken(apiKey, apiSecret); + _webhookReceiver = new WebhookReceiver(apiKey, apiSecret); + + _db = db; + _cache = cache; + _callStatus = callStatus; + } + + /// + public string ProviderName => "LiveKit"; + + /// + public async Task CreateSessionAsync(Guid roomId, Dictionary metadata) + { + try + { + var roomName = $"Call_{roomId.ToString().Replace("-", "")}"; + + // Convert metadata to a string dictionary for LiveKit + var roomMetadata = new Dictionary(); + foreach (var item in metadata) + { + roomMetadata[item.Key] = item.Value?.ToString() ?? string.Empty; + } + + // Create room in LiveKit + var room = await _roomService.CreateRoom(new CreateRoomRequest + { + Name = roomName, + EmptyTimeout = 300, // 5 minutes + Metadata = JsonSerializer.Serialize(roomMetadata) + }); + + // Return session config + return new RealtimeSessionConfig + { + SessionId = room.Name, + Parameters = new Dictionary + { + { "sid", room.Sid }, + { "emptyTimeout", room.EmptyTimeout }, + { "creationTime", room.CreationTime } + } + }; + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to create LiveKit room for roomId: {RoomId}", roomId); + throw; + } + } + + /// + public async Task EndSessionAsync(string sessionId, RealtimeSessionConfig config) + { + try + { + // Delete the room in LiveKit + await _roomService.DeleteRoom(new DeleteRoomRequest + { + Room = sessionId + }); + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to end LiveKit session: {SessionId}", sessionId); + throw; + } + } + + /// + public string GetUserToken(Account.Account account, string sessionId, bool isAdmin = false) + { + return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult(); + } + + public Task GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false) + { + var token = _accessToken.WithIdentity(account.Name) + .WithName(account.Nick) + .WithGrants(new VideoGrants + { + RoomJoin = true, + CanPublish = true, + CanPublishData = true, + CanSubscribe = true, + CanSubscribeMetrics = true, + RoomAdmin = isAdmin, + Room = sessionId + }) + .WithMetadata(JsonSerializer.Serialize(new Dictionary + { { "account_id", account.Id.ToString() } })) + .WithTtl(TimeSpan.FromHours(1)); + return Task.FromResult(token.ToJwt()); + } + + public async Task ReceiveWebhook(string body, string authHeader) + { + var evt = _webhookReceiver.Receive(body, authHeader); + if (evt is null) return; + + switch (evt.Event) + { + case "room_finished": + var now = SystemClock.Instance.GetCurrentInstant(); + await _db.ChatRealtimeCall + .Where(c => c.SessionId == evt.Room.Name) + .ExecuteUpdateAsync(s => s.SetProperty(p => p.EndedAt, now) + ); + + // Also clean up participants list when the room is finished + await _cache.RemoveAsync(_GetParticipantsKey(evt.Room.Name)); + break; + + case "participant_joined": + if (evt.Participant != null) + { + // Add the participant to cache + await _AddParticipantToCache(evt.Room.Name, evt.Participant); + _logger.LogInformation( + "Participant joined room: {RoomName}, Participant: {ParticipantIdentity}", + evt.Room.Name, evt.Participant.Identity); + + // Broadcast participant list update to all participants + var info = await GetRoomParticipantsAsync(evt.Room.Name); + await _callStatus.BroadcastParticipantUpdate(evt.Room.Name, info); + } + + break; + + case "participant_left": + if (evt.Participant != null) + { + // Remove the participant from cache + await _RemoveParticipantFromCache(evt.Room.Name, evt.Participant); + _logger.LogInformation( + "Participant left room: {RoomName}, Participant: {ParticipantIdentity}", + evt.Room.Name, evt.Participant.Identity); + + // Broadcast participant list update to all participants + var info = await GetRoomParticipantsAsync(evt.Room.Name); + await _callStatus.BroadcastParticipantUpdate(evt.Room.Name, info); + } + + break; + } + } + + private static string _GetParticipantsKey(string roomName) + => $"RoomParticipants_{roomName}"; + + private async Task _AddParticipantToCache(string roomName, ParticipantInfo participant) + { + var participantsKey = _GetParticipantsKey(roomName); + + // Try to acquire a lock to prevent race conditions when updating the participants list + await using var lockObj = await _cache.AcquireLockAsync( + $"{participantsKey}_lock", + TimeSpan.FromSeconds(10), + TimeSpan.FromSeconds(5)); + + if (lockObj == null) + { + _logger.LogWarning("Failed to acquire lock for updating participants list in room: {RoomName}", roomName); + return; + } + + // Get the current participants list + var participants = await _cache.GetAsync>(participantsKey) ?? + []; + + // Check if the participant already exists + var existingIndex = participants.FindIndex(p => p.Identity == participant.Identity); + if (existingIndex >= 0) + { + // Update existing participant + participants[existingIndex] = CreateParticipantCacheItem(participant); + } + else + { + // Add new participant + participants.Add(CreateParticipantCacheItem(participant)); + } + + // Update cache with new list + await _cache.SetAsync(participantsKey, participants, TimeSpan.FromHours(6)); + + // Also add to a room group in cache for easy cleanup + await _cache.AddToGroupAsync(participantsKey, $"Room_{roomName}"); + } + + private async Task _RemoveParticipantFromCache(string roomName, ParticipantInfo participant) + { + var participantsKey = _GetParticipantsKey(roomName); + + // Try to acquire a lock to prevent race conditions when updating the participants list + await using var lockObj = await _cache.AcquireLockAsync( + $"{participantsKey}_lock", + TimeSpan.FromSeconds(10), + TimeSpan.FromSeconds(5)); + + if (lockObj == null) + { + _logger.LogWarning("Failed to acquire lock for updating participants list in room: {RoomName}", roomName); + return; + } + + // Get current participants list + var participants = await _cache.GetAsync>(participantsKey); + if (participants == null || !participants.Any()) + return; + + // Remove participant + participants.RemoveAll(p => p.Identity == participant.Identity); + + // Update cache with new list + await _cache.SetAsync(participantsKey, participants, TimeSpan.FromHours(6)); + } + + // Helper method to get participants in a room + public async Task> GetRoomParticipantsAsync(string roomName) + { + var participantsKey = _GetParticipantsKey(roomName); + return await _cache.GetAsync>(participantsKey) ?? []; + } + + private ParticipantInfoItem CreateParticipantCacheItem(ParticipantInfo participant) + { + // Try to parse account ID from metadata + Guid? accountId = null; + var metadata = new Dictionary(); + + if (string.IsNullOrEmpty(participant.Metadata)) + return new ParticipantInfoItem + { + Identity = participant.Identity, + Name = participant.Name, + AccountId = accountId, + Metadata = metadata, + JoinedAt = DateTime.UtcNow + }; + try + { + metadata = JsonSerializer.Deserialize>(participant.Metadata) ?? + new Dictionary(); + + if (metadata.TryGetValue("account_id", out var accountIdStr)) + if (Guid.TryParse(accountIdStr, out var parsedId)) + accountId = parsedId; + } + catch (Exception ex) + { + _logger.LogError(ex, "Failed to parse participant metadata"); + } + + return new ParticipantInfoItem + { + Identity = participant.Identity, + Name = participant.Name, + AccountId = accountId, + Metadata = metadata, + JoinedAt = DateTime.UtcNow + }; + } +} diff --git a/DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs b/DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs deleted file mode 100644 index 591f730..0000000 --- a/DysonNetwork.Sphere/Chat/Realtime/LivekitService.cs +++ /dev/null @@ -1,136 +0,0 @@ -using DysonNetwork.Sphere.Connection; -using DysonNetwork.Sphere.Storage; -using Livekit.Server.Sdk.Dotnet; -using Microsoft.EntityFrameworkCore; -using NodaTime; -using System.Text.Json; - -namespace DysonNetwork.Sphere.Chat.Realtime; - -/// -/// LiveKit implementation of the real-time communication service -/// -public class LivekitRealtimeService : IRealtimeService -{ - private readonly ILogger _logger; - private readonly RoomServiceClient _roomService; - private readonly AccessToken _accessToken; - private readonly WebhookReceiver _webhookReceiver; - - public LivekitRealtimeService( - IConfiguration configuration, - ILogger logger) - { - _logger = logger; - - // Get LiveKit configuration from appsettings - var host = configuration["Livekit:Endpoint"] ?? - throw new ArgumentNullException("Endpoint configuration is required"); - var apiKey = configuration["Livekit:ApiKey"] ?? - throw new ArgumentNullException("ApiKey configuration is required"); - var apiSecret = configuration["Livekit:ApiSecret"] ?? - throw new ArgumentNullException("ApiSecret configuration is required"); - - _roomService = new RoomServiceClient(host, apiKey, apiSecret); - _accessToken = new AccessToken(apiKey, apiSecret); - _webhookReceiver = new WebhookReceiver(apiKey, apiSecret); - } - - /// - public string ProviderName => "LiveKit"; - - /// - public async Task CreateSessionAsync(Guid roomId, Dictionary metadata) - { - try - { - var roomName = $"Call_{roomId.ToString().Replace("-", "")}"; - - // Convert metadata to a string dictionary for LiveKit - var roomMetadata = new Dictionary(); - foreach (var item in metadata) - { - roomMetadata[item.Key] = item.Value?.ToString() ?? string.Empty; - } - - // Create room in LiveKit - var room = await _roomService.CreateRoom(new CreateRoomRequest - { - Name = roomName, - EmptyTimeout = 300, // 5 minutes - Metadata = JsonSerializer.Serialize(roomMetadata) - }); - - // Return session config - return new RealtimeSessionConfig - { - SessionId = room.Name, - Parameters = new Dictionary - { - { "sid", room.Sid }, - { "emptyTimeout", room.EmptyTimeout }, - { "creationTime", room.CreationTime } - } - }; - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to create LiveKit room for roomId: {RoomId}", roomId); - throw; - } - } - - /// - public async Task EndSessionAsync(string sessionId, RealtimeSessionConfig config) - { - try - { - // Delete the room in LiveKit - await _roomService.DeleteRoom(new DeleteRoomRequest - { - Room = sessionId - }); - } - catch (Exception ex) - { - _logger.LogError(ex, "Failed to end LiveKit session: {SessionId}", sessionId); - throw; - } - } - - /// - public string GetUserToken(Account.Account account, string sessionId, bool isAdmin = false) - { - return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult(); - } - - public Task GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false) - { - var token = _accessToken.WithIdentity(account.Name) - .WithName(account.Nick) - .WithGrants(new VideoGrants - { - RoomJoin = true, - CanPublish = true, - CanPublishData = true, - CanSubscribe = true, - CanSubscribeMetrics = true, - RoomAdmin = isAdmin, - Room = sessionId - }) - .WithMetadata(JsonSerializer.Serialize(new Dictionary - { { "account_id", account.Id.ToString() } })) - .WithTtl(TimeSpan.FromHours(1)); - return Task.FromResult(token.ToJwt()); - } - - public Task ReceiveWebhook(string body, string authHeader) - { - var evt = _webhookReceiver.Receive(body, authHeader); - if (evt is null) return Task.CompletedTask; - - // TODO: Handle webhook events - - return Task.CompletedTask; - } -} diff --git a/DysonNetwork.Sphere/Chat/Realtime/RealtimeStatusService.cs b/DysonNetwork.Sphere/Chat/Realtime/RealtimeStatusService.cs new file mode 100644 index 0000000..2fdd815 --- /dev/null +++ b/DysonNetwork.Sphere/Chat/Realtime/RealtimeStatusService.cs @@ -0,0 +1,91 @@ +using DysonNetwork.Sphere.Connection; +using Microsoft.EntityFrameworkCore; + +namespace DysonNetwork.Sphere.Chat.Realtime; + +public class ParticipantInfoItem +{ + public string Identity { get; set; } = null!; + public string Name { get; set; } = null!; + public Guid? AccountId { get; set; } + public Dictionary Metadata { get; set; } = new(); + public DateTime JoinedAt { get; set; } +} + +public class RealtimeStatusService(AppDatabase db, WebSocketService ws, ILogger logger) +{ + // Broadcast participant update to all participants in a room + public async Task BroadcastParticipantUpdate(string roomName, List participantsInfo) + { + try + { + // Get the room ID from the session name + var roomInfo = await db.ChatRealtimeCall + .Where(c => c.SessionId == roomName && c.EndedAt == null) + .Select(c => new { c.RoomId, c.Id }) + .FirstOrDefaultAsync(); + + if (roomInfo == null) + { + logger.LogWarning("Could not find room info for session: {SessionName}", roomName); + return; + } + + // Get all room members who should receive this update + var roomMembers = await db.ChatMembers + .Where(m => m.ChatRoomId == roomInfo.RoomId && m.LeaveAt == null) + .Select(m => m.AccountId) + .ToListAsync(); + + // Get member profiles for participants who have account IDs + var accountIds = participantsInfo + .Where(p => p.AccountId.HasValue) + .Select(p => p.AccountId!.Value) + .ToList(); + + var memberProfiles = new Dictionary(); + if (accountIds.Count != 0) + { + memberProfiles = await db.ChatMembers + .Where(m => m.ChatRoomId == roomInfo.RoomId && accountIds.Contains(m.AccountId)) + .Include(m => m.Account) + .ThenInclude(m => m.Profile) + .ToDictionaryAsync(m => m.AccountId, m => m); + } + + // Convert to CallParticipant objects + var participants = participantsInfo.Select(p => new CallParticipant + { + Identity = p.Identity, + Name = p.Name, + AccountId = p.AccountId, + JoinedAt = p.JoinedAt, + Profile = p.AccountId.HasValue && memberProfiles.TryGetValue(p.AccountId.Value, out var profile) + ? profile + : null + }).ToList(); + + // Create the update packet with CallParticipant objects + var updatePacket = new WebSocketPacket + { + Type = WebSocketPacketType.CallParticipantsUpdate, + Data = new Dictionary + { + { "room_id", roomInfo.RoomId }, + { "call_id", roomInfo.Id }, + { "participants", participants } + } + }; + + // Send the update to all members + foreach (var accountId in roomMembers) + { + ws.SendPacketToAccount(accountId, updatePacket); + } + } + catch (Exception ex) + { + logger.LogError(ex, "Error broadcasting participant update for room {RoomName}", roomName); + } + } +} \ No newline at end of file diff --git a/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs b/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs index 9abb006..4f6287f 100644 --- a/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs +++ b/DysonNetwork.Sphere/Startup/ServiceCollectionExtensions.cs @@ -229,6 +229,7 @@ public static class ServiceCollectionExtensions services.AddScoped(); services.AddScoped(); services.AddScoped(); + services.AddScoped(); services.AddRealtimeService(configuration); services.AddScoped(); services.AddScoped(); @@ -252,7 +253,7 @@ public static class ServiceCollectionExtensions services.AddHttpClient(); break; case "LiveKit": - services.AddScoped(); + services.AddScoped(); break; default: throw new NotSupportedException($"Realtime provider '{provider}' is not supported."); diff --git a/DysonNetwork.sln.DotSettings.user b/DysonNetwork.sln.DotSettings.user index 897f1de..c088690 100644 --- a/DysonNetwork.sln.DotSettings.user +++ b/DysonNetwork.sln.DotSettings.user @@ -56,6 +56,7 @@ ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded From da4ee81c951455fb4714017629208ebb5ef94b29 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Fri, 11 Jul 2025 23:41:39 +0800 Subject: [PATCH 3/4] :bug: Bug fixes on swagger ui --- DysonNetwork.Sphere/Connection/ClientTypeMiddleware.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/DysonNetwork.Sphere/Connection/ClientTypeMiddleware.cs b/DysonNetwork.Sphere/Connection/ClientTypeMiddleware.cs index 8a305aa..447d9ac 100644 --- a/DysonNetwork.Sphere/Connection/ClientTypeMiddleware.cs +++ b/DysonNetwork.Sphere/Connection/ClientTypeMiddleware.cs @@ -31,7 +31,10 @@ public class ClientTypeMiddleware(RequestDelegate next) context.Items["IsWebPage"] = isWebPage; - if (!isWebPage && context.Request.Path != "/ws" && !context.Request.Path.StartsWithSegments("/api")) + var redirectWhiteList = new[] { "/ws", "/.well-known", "/swagger" }; + if(redirectWhiteList.Any(w => context.Request.Path.StartsWithSegments(w))) + await next(context); + else if (!isWebPage && !context.Request.Path.StartsWithSegments("/api")) context.Response.Redirect( $"/api{context.Request.Path.Value}{context.Request.QueryString.Value}", permanent: false From b12e3315febe3834d0dd98d8480ce34d2f9310f0 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Sat, 12 Jul 2025 01:54:00 +0800 Subject: [PATCH 4/4] :bug: Fixes of bugs --- .../Chat/Realtime/CloudflareService.cs | 9 +++-- .../Chat/RealtimeCallController.cs | 36 +++++++++---------- DysonNetwork.Sphere/appsettings.json | 2 +- DysonNetwork.sln.DotSettings.user | 1 + 4 files changed, 25 insertions(+), 23 deletions(-) diff --git a/DysonNetwork.Sphere/Chat/Realtime/CloudflareService.cs b/DysonNetwork.Sphere/Chat/Realtime/CloudflareService.cs index 188b9de..edd9fda 100644 --- a/DysonNetwork.Sphere/Chat/Realtime/CloudflareService.cs +++ b/DysonNetwork.Sphere/Chat/Realtime/CloudflareService.cs @@ -40,8 +40,7 @@ public class CloudflareRealtimeService : IRealtimeService var requestBody = new { title = roomName, - preferred_region = _configuration["Realtime:Cloudflare:PreferredRegion"], - data = metadata + preferred_region = _configuration["Realtime:Cloudflare:PreferredRegion"] }; var content = new StringContent(JsonSerializer.Serialize(requestBody), Encoding.UTF8, "application/json"); @@ -106,11 +105,15 @@ public class CloudflareRealtimeService : IRealtimeService } // Participant doesn't exist, create a new one + var baseUrl = _configuration["BaseUrl"]; var requestBody = new { name = "@" + account.Name, + picture = account.Profile.Picture is not null + ? $"{baseUrl}/api/files/{account.Profile.Picture.Id}" + : null, preset_name = isAdmin ? "group_call_host" : "group_call_participant", - custom_user_id = account.Id.ToString() + custom_participant_id = account.Id.ToString() }; var content = new StringContent( diff --git a/DysonNetwork.Sphere/Chat/RealtimeCallController.cs b/DysonNetwork.Sphere/Chat/RealtimeCallController.cs index 53e702c..426b856 100644 --- a/DysonNetwork.Sphere/Chat/RealtimeCallController.cs +++ b/DysonNetwork.Sphere/Chat/RealtimeCallController.cs @@ -6,11 +6,6 @@ using Swashbuckle.AspNetCore.Annotations; namespace DysonNetwork.Sphere.Chat; -public class RealtimeChatConfiguration -{ - public string Endpoint { get; set; } = null!; -} - [ApiController] [Route("/api/chat/realtime")] public class RealtimeCallController( @@ -20,9 +15,6 @@ public class RealtimeCallController( IRealtimeService realtime ) : ControllerBase { - private readonly RealtimeChatConfiguration _config = - configuration.GetSection("RealtimeChat").Get()!; - /// /// This endpoint is especially designed for livekit webhooks, /// for update the call participates and more. @@ -35,9 +27,9 @@ public class RealtimeCallController( using var reader = new StreamReader(Request.Body); var postData = await reader.ReadToEndAsync(); var authHeader = Request.Headers.Authorization.ToString(); - + await realtime.ReceiveWebhook(postData, authHeader); - + return Ok(); } @@ -90,11 +82,17 @@ public class RealtimeCallController( return BadRequest("Call session is not properly configured."); var isAdmin = member.Role >= ChatMemberRole.Moderator; - var userToken = realtime.GetUserToken(currentUser, ongoingCall.SessionId, isAdmin); + var userToken = await realtime.GetUserTokenAsync(currentUser, ongoingCall.SessionId, isAdmin); // Get LiveKit endpoint from configuration - var endpoint = _config.Endpoint ?? - throw new InvalidOperationException("LiveKit endpoint configuration is missing"); + var endpoint = configuration[$"Realtime:{realtime.ProviderName}:Endpoint"] ?? realtime.ProviderName switch + { + // Unusable for sure, just for placeholder + "LiveKit" => "https://livekit.cloud", + "Cloudflare" => "https://rtk.realtime.cloudflare.com/v2", + // Unusable for sure, just for placeholder + _ => "https://example.com" + }; // Create the response model var response = new JoinCallResponse @@ -162,7 +160,7 @@ public class JoinCallResponse public string Provider { get; set; } = null!; /// - /// The LiveKit server endpoint + /// The provider server endpoint /// public string Endpoint { get; set; } = null!; @@ -196,24 +194,24 @@ public class CallParticipant /// The participant's identity (username) /// public string Identity { get; set; } = null!; - + /// /// The participant's display name /// public string Name { get; set; } = null!; - + /// /// The participant's account ID if available /// public Guid? AccountId { get; set; } - + /// /// The participant's profile in the chat /// public ChatMember? Profile { get; set; } - + /// /// When the participant joined the call /// public DateTime JoinedAt { get; set; } -} +} \ No newline at end of file diff --git a/DysonNetwork.Sphere/appsettings.json b/DysonNetwork.Sphere/appsettings.json index 0f21995..7d464e0 100644 --- a/DysonNetwork.Sphere/appsettings.json +++ b/DysonNetwork.Sphere/appsettings.json @@ -86,7 +86,7 @@ "SubjectPrefix": "Solar Network" }, "Realtime": { - "Provider": "LiveKit", + "Provider": "Cloudflare", "LiveKit": { "Endpoint": "https://solar-network-im44o8gq.livekit.cloud", "ApiKey": "APIs6TiL8wj3A4j", diff --git a/DysonNetwork.sln.DotSettings.user b/DysonNetwork.sln.DotSettings.user index c088690..2d8eb61 100644 --- a/DysonNetwork.sln.DotSettings.user +++ b/DysonNetwork.sln.DotSettings.user @@ -85,6 +85,7 @@ ForceIncluded ForceIncluded ForceIncluded + ForceIncluded ForceIncluded ForceIncluded ForceIncluded