♻️ Refactored webhook receiver in realtime call
This commit is contained in:
@ -1,29 +1,32 @@
|
||||
using System.Security.Cryptography;
|
||||
using System.Security.Cryptography.X509Certificates;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Microsoft.IdentityModel.Tokens;
|
||||
using System.Text.Json.Serialization;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using NodaTime;
|
||||
|
||||
namespace DysonNetwork.Sphere.Chat.Realtime;
|
||||
|
||||
public class CloudflareRealtimeService : IRealtimeService
|
||||
{
|
||||
private readonly AppDatabase _db;
|
||||
private readonly HttpClient _httpClient;
|
||||
private readonly IConfiguration _configuration;
|
||||
private readonly string _apiSecret;
|
||||
private readonly ChatRoomService _chatRoomService;
|
||||
private RSA? _publicKey;
|
||||
|
||||
public CloudflareRealtimeService(HttpClient httpClient, IConfiguration configuration,
|
||||
ChatRoomService chatRoomService)
|
||||
public CloudflareRealtimeService(
|
||||
AppDatabase db,
|
||||
HttpClient httpClient,
|
||||
IConfiguration configuration
|
||||
)
|
||||
{
|
||||
_db = db;
|
||||
_httpClient = httpClient;
|
||||
_configuration = configuration;
|
||||
_chatRoomService = chatRoomService;
|
||||
var apiKey = _configuration["Realtime:Cloudflare:ApiKey"];
|
||||
_apiSecret = _configuration["Realtime:Cloudflare:ApiSecret"]!;
|
||||
var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes($"{apiKey}:{_apiSecret}"));
|
||||
var apiSecret = _configuration["Realtime:Cloudflare:ApiSecret"]!;
|
||||
var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes($"{apiKey}:{apiSecret}"));
|
||||
_httpClient.BaseAddress = new Uri("https://rtk.realtime.cloudflare.com/v2/");
|
||||
_httpClient.DefaultRequestHeaders.Authorization =
|
||||
new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", credentials);
|
||||
@ -33,13 +36,12 @@ public class CloudflareRealtimeService : IRealtimeService
|
||||
|
||||
public async Task<RealtimeSessionConfig> CreateSessionAsync(Guid roomId, Dictionary<string, object> metadata)
|
||||
{
|
||||
var roomName = $"Call_{roomId.ToString().Replace("-", "")}";
|
||||
var roomName = $"Room Call #{roomId.ToString().Replace("-", "")}";
|
||||
var requestBody = new
|
||||
{
|
||||
title = $"Solar Room Call #{roomId}",
|
||||
title = roomName,
|
||||
preferred_region = _configuration["Realtime:Cloudflare:PreferredRegion"],
|
||||
data = metadata,
|
||||
room_name = roomName
|
||||
data = metadata
|
||||
};
|
||||
|
||||
var content = new StringContent(JsonSerializer.Serialize(requestBody), Encoding.UTF8, "application/json");
|
||||
@ -48,7 +50,8 @@ public class CloudflareRealtimeService : IRealtimeService
|
||||
response.EnsureSuccessStatusCode();
|
||||
|
||||
var responseContent = await response.Content.ReadAsStringAsync();
|
||||
var meetingResponse = JsonSerializer.Deserialize<DyteMeetingResponse>(responseContent);
|
||||
var meetingResponse = JsonSerializer.Deserialize<CfMeetingResponse>(responseContent);
|
||||
if (meetingResponse is null) throw new Exception("Failed to create meeting with cloudflare");
|
||||
|
||||
return new RealtimeSessionConfig
|
||||
{
|
||||
@ -77,7 +80,7 @@ public class CloudflareRealtimeService : IRealtimeService
|
||||
{
|
||||
return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
|
||||
public async Task<string> GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false)
|
||||
{
|
||||
try
|
||||
@ -93,14 +96,15 @@ public class CloudflareRealtimeService : IRealtimeService
|
||||
.PostAsync($"meetings/{sessionId}/participants/{account.Id}/token", null);
|
||||
tokenResponse.EnsureSuccessStatusCode();
|
||||
var tokenContent = await tokenResponse.Content.ReadAsStringAsync();
|
||||
var tokenData = JsonSerializer.Deserialize<DyteResponse<DyteTokenResponse>>(tokenContent);
|
||||
var tokenData = JsonSerializer.Deserialize<CfResponse<CfTokenResponse>>(tokenContent);
|
||||
if (tokenData == null || !tokenData.Success)
|
||||
{
|
||||
throw new Exception("Failed to get participant token");
|
||||
}
|
||||
|
||||
return tokenData.Data?.Token ?? throw new Exception("Token is null");
|
||||
}
|
||||
|
||||
|
||||
// Participant doesn't exist, create a new one
|
||||
var requestBody = new
|
||||
{
|
||||
@ -110,20 +114,21 @@ public class CloudflareRealtimeService : IRealtimeService
|
||||
};
|
||||
|
||||
var content = new StringContent(
|
||||
JsonSerializer.Serialize(requestBody),
|
||||
Encoding.UTF8,
|
||||
JsonSerializer.Serialize(requestBody),
|
||||
Encoding.UTF8,
|
||||
"application/json"
|
||||
);
|
||||
|
||||
|
||||
var createResponse = await _httpClient.PostAsync($"meetings/{sessionId}/participants", content);
|
||||
createResponse.EnsureSuccessStatusCode();
|
||||
|
||||
var responseContent = await createResponse.Content.ReadAsStringAsync();
|
||||
var participantData = JsonSerializer.Deserialize<DyteResponse<DyteParticipantResponse>>(responseContent);
|
||||
var participantData = JsonSerializer.Deserialize<CfResponse<CfParticipantResponse>>(responseContent);
|
||||
if (participantData == null || !participantData.Success)
|
||||
{
|
||||
throw new Exception("Failed to create participant");
|
||||
}
|
||||
|
||||
return participantData.Data?.Token ?? throw new Exception("Token is null");
|
||||
}
|
||||
catch (Exception ex)
|
||||
@ -156,17 +161,18 @@ public class CloudflareRealtimeService : IRealtimeService
|
||||
}
|
||||
|
||||
// Process the webhook event
|
||||
var webhookEvent = JsonSerializer.Deserialize<DyteWebhookEvent>(body);
|
||||
var evt = JsonSerializer.Deserialize<CfWebhookEvent>(body);
|
||||
if (evt is null) return;
|
||||
|
||||
if (webhookEvent.Type == "participant.joined")
|
||||
switch (evt.Type)
|
||||
{
|
||||
await _chatRoomService.SetRoomCallStatus(
|
||||
Guid.Parse(webhookEvent.Event.Meeting.RoomName.Replace("Call_", "")), true);
|
||||
}
|
||||
else if (webhookEvent.Type == "room.ended")
|
||||
{
|
||||
await _chatRoomService.SetRoomCallStatus(
|
||||
Guid.Parse(webhookEvent.Event.Meeting.RoomName.Replace("Call_", "")), false);
|
||||
case "meeting.ended":
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
await _db.ChatRealtimeCall
|
||||
.Where(c => c.SessionId == evt.Event.Meeting.Id)
|
||||
.ExecuteUpdateAsync(s => s.SetProperty(p => p.EndedAt, now)
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -197,72 +203,56 @@ public class CloudflareRealtimeService : IRealtimeService
|
||||
_publicKey.ImportFromPem(publicKeyPem);
|
||||
}
|
||||
|
||||
private class DyteMeetingResponse
|
||||
private class CfMeetingResponse
|
||||
{
|
||||
[JsonPropertyName("data")]
|
||||
public DyteMeetingData Data { get; set; } = new();
|
||||
[JsonPropertyName("data")] public CfMeetingData Data { get; set; } = new();
|
||||
}
|
||||
|
||||
private class DyteMeetingData
|
||||
private class CfMeetingData
|
||||
{
|
||||
[JsonPropertyName("id")]
|
||||
public string Id { get; set; } = string.Empty;
|
||||
[JsonPropertyName("roomName")]
|
||||
public string RoomName { get; set; } = string.Empty;
|
||||
[JsonPropertyName("title")]
|
||||
public string Title { get; set; } = string.Empty;
|
||||
[JsonPropertyName("status")]
|
||||
public string Status { get; set; } = string.Empty;
|
||||
[JsonPropertyName("createdAt")]
|
||||
public DateTime CreatedAt { get; set; }
|
||||
[JsonPropertyName("updatedAt")]
|
||||
public DateTime UpdatedAt { get; set; }
|
||||
[JsonPropertyName("id")] public string Id { get; set; } = string.Empty;
|
||||
[JsonPropertyName("roomName")] public string RoomName { get; set; } = string.Empty;
|
||||
[JsonPropertyName("title")] public string Title { get; set; } = string.Empty;
|
||||
[JsonPropertyName("status")] public string Status { get; set; } = string.Empty;
|
||||
[JsonPropertyName("createdAt")] public DateTime CreatedAt { get; set; }
|
||||
[JsonPropertyName("updatedAt")] public DateTime UpdatedAt { get; set; }
|
||||
}
|
||||
|
||||
private class DyteParticipant
|
||||
private class CfParticipant
|
||||
{
|
||||
public string Id { get; set; } = string.Empty;
|
||||
public string Token { get; set; } = string.Empty;
|
||||
public string CustomParticipantId { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
public class DyteResponse<T>
|
||||
public class CfResponse<T>
|
||||
{
|
||||
[JsonPropertyName("success")]
|
||||
public bool Success { get; set; }
|
||||
|
||||
[JsonPropertyName("data")]
|
||||
public T? Data { get; set; }
|
||||
[JsonPropertyName("success")] public bool Success { get; set; }
|
||||
|
||||
[JsonPropertyName("data")] public T? Data { get; set; }
|
||||
}
|
||||
|
||||
public class DyteTokenResponse
|
||||
public class CfTokenResponse
|
||||
{
|
||||
[JsonPropertyName("token")]
|
||||
public string Token { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
public class DyteParticipantResponse
|
||||
{
|
||||
[JsonPropertyName("id")]
|
||||
public string Id { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("name")]
|
||||
public string Name { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("customUserId")]
|
||||
public string CustomUserId { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("presetName")]
|
||||
public string PresetName { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("isActive")]
|
||||
public bool IsActive { get; set; }
|
||||
|
||||
[JsonPropertyName("token")]
|
||||
public string Token { get; set; } = string.Empty;
|
||||
[JsonPropertyName("token")] public string Token { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
public class DyteWebhookEvent
|
||||
public class CfParticipantResponse
|
||||
{
|
||||
[JsonPropertyName("id")] public string Id { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("name")] public string Name { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("customUserId")] public string CustomUserId { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("presetName")] public string PresetName { get; set; } = string.Empty;
|
||||
|
||||
[JsonPropertyName("isActive")] public bool IsActive { get; set; }
|
||||
|
||||
[JsonPropertyName("token")] public string Token { get; set; } = string.Empty;
|
||||
}
|
||||
|
||||
public class CfWebhookEvent
|
||||
{
|
||||
[JsonPropertyName("id")] public string Id { get; set; }
|
||||
|
307
DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs
Normal file
307
DysonNetwork.Sphere/Chat/Realtime/LiveKitService.cs
Normal file
@ -0,0 +1,307 @@
|
||||
using DysonNetwork.Sphere.Connection;
|
||||
using DysonNetwork.Sphere.Storage;
|
||||
using Livekit.Server.Sdk.Dotnet;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using NodaTime;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace DysonNetwork.Sphere.Chat.Realtime;
|
||||
|
||||
/// <summary>
|
||||
/// LiveKit implementation of the real-time communication service
|
||||
/// </summary>
|
||||
public class LiveKitRealtimeService : IRealtimeService
|
||||
{
|
||||
private readonly AppDatabase _db;
|
||||
private readonly ICacheService _cache;
|
||||
private readonly RealtimeStatusService _callStatus;
|
||||
|
||||
private readonly ILogger<LiveKitRealtimeService> _logger;
|
||||
private readonly RoomServiceClient _roomService;
|
||||
private readonly AccessToken _accessToken;
|
||||
private readonly WebhookReceiver _webhookReceiver;
|
||||
|
||||
public LiveKitRealtimeService(
|
||||
IConfiguration configuration,
|
||||
ILogger<LiveKitRealtimeService> logger,
|
||||
AppDatabase db,
|
||||
ICacheService cache,
|
||||
RealtimeStatusService callStatus
|
||||
)
|
||||
{
|
||||
_logger = logger;
|
||||
|
||||
// Get LiveKit configuration from appsettings
|
||||
var host = configuration["Realtime:LiveKit:Endpoint"] ??
|
||||
throw new ArgumentNullException("Endpoint configuration is required");
|
||||
var apiKey = configuration["Realtime:LiveKit:ApiKey"] ??
|
||||
throw new ArgumentNullException("ApiKey configuration is required");
|
||||
var apiSecret = configuration["Realtime:LiveKit:ApiSecret"] ??
|
||||
throw new ArgumentNullException("ApiSecret configuration is required");
|
||||
|
||||
_roomService = new RoomServiceClient(host, apiKey, apiSecret);
|
||||
_accessToken = new AccessToken(apiKey, apiSecret);
|
||||
_webhookReceiver = new WebhookReceiver(apiKey, apiSecret);
|
||||
|
||||
_db = db;
|
||||
_cache = cache;
|
||||
_callStatus = callStatus;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public string ProviderName => "LiveKit";
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<RealtimeSessionConfig> CreateSessionAsync(Guid roomId, Dictionary<string, object> metadata)
|
||||
{
|
||||
try
|
||||
{
|
||||
var roomName = $"Call_{roomId.ToString().Replace("-", "")}";
|
||||
|
||||
// Convert metadata to a string dictionary for LiveKit
|
||||
var roomMetadata = new Dictionary<string, string>();
|
||||
foreach (var item in metadata)
|
||||
{
|
||||
roomMetadata[item.Key] = item.Value?.ToString() ?? string.Empty;
|
||||
}
|
||||
|
||||
// Create room in LiveKit
|
||||
var room = await _roomService.CreateRoom(new CreateRoomRequest
|
||||
{
|
||||
Name = roomName,
|
||||
EmptyTimeout = 300, // 5 minutes
|
||||
Metadata = JsonSerializer.Serialize(roomMetadata)
|
||||
});
|
||||
|
||||
// Return session config
|
||||
return new RealtimeSessionConfig
|
||||
{
|
||||
SessionId = room.Name,
|
||||
Parameters = new Dictionary<string, object>
|
||||
{
|
||||
{ "sid", room.Sid },
|
||||
{ "emptyTimeout", room.EmptyTimeout },
|
||||
{ "creationTime", room.CreationTime }
|
||||
}
|
||||
};
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to create LiveKit room for roomId: {RoomId}", roomId);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task EndSessionAsync(string sessionId, RealtimeSessionConfig config)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Delete the room in LiveKit
|
||||
await _roomService.DeleteRoom(new DeleteRoomRequest
|
||||
{
|
||||
Room = sessionId
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to end LiveKit session: {SessionId}", sessionId);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public string GetUserToken(Account.Account account, string sessionId, bool isAdmin = false)
|
||||
{
|
||||
return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
public Task<string> GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false)
|
||||
{
|
||||
var token = _accessToken.WithIdentity(account.Name)
|
||||
.WithName(account.Nick)
|
||||
.WithGrants(new VideoGrants
|
||||
{
|
||||
RoomJoin = true,
|
||||
CanPublish = true,
|
||||
CanPublishData = true,
|
||||
CanSubscribe = true,
|
||||
CanSubscribeMetrics = true,
|
||||
RoomAdmin = isAdmin,
|
||||
Room = sessionId
|
||||
})
|
||||
.WithMetadata(JsonSerializer.Serialize(new Dictionary<string, string>
|
||||
{ { "account_id", account.Id.ToString() } }))
|
||||
.WithTtl(TimeSpan.FromHours(1));
|
||||
return Task.FromResult(token.ToJwt());
|
||||
}
|
||||
|
||||
public async Task ReceiveWebhook(string body, string authHeader)
|
||||
{
|
||||
var evt = _webhookReceiver.Receive(body, authHeader);
|
||||
if (evt is null) return;
|
||||
|
||||
switch (evt.Event)
|
||||
{
|
||||
case "room_finished":
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
await _db.ChatRealtimeCall
|
||||
.Where(c => c.SessionId == evt.Room.Name)
|
||||
.ExecuteUpdateAsync(s => s.SetProperty(p => p.EndedAt, now)
|
||||
);
|
||||
|
||||
// Also clean up participants list when the room is finished
|
||||
await _cache.RemoveAsync(_GetParticipantsKey(evt.Room.Name));
|
||||
break;
|
||||
|
||||
case "participant_joined":
|
||||
if (evt.Participant != null)
|
||||
{
|
||||
// Add the participant to cache
|
||||
await _AddParticipantToCache(evt.Room.Name, evt.Participant);
|
||||
_logger.LogInformation(
|
||||
"Participant joined room: {RoomName}, Participant: {ParticipantIdentity}",
|
||||
evt.Room.Name, evt.Participant.Identity);
|
||||
|
||||
// Broadcast participant list update to all participants
|
||||
var info = await GetRoomParticipantsAsync(evt.Room.Name);
|
||||
await _callStatus.BroadcastParticipantUpdate(evt.Room.Name, info);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case "participant_left":
|
||||
if (evt.Participant != null)
|
||||
{
|
||||
// Remove the participant from cache
|
||||
await _RemoveParticipantFromCache(evt.Room.Name, evt.Participant);
|
||||
_logger.LogInformation(
|
||||
"Participant left room: {RoomName}, Participant: {ParticipantIdentity}",
|
||||
evt.Room.Name, evt.Participant.Identity);
|
||||
|
||||
// Broadcast participant list update to all participants
|
||||
var info = await GetRoomParticipantsAsync(evt.Room.Name);
|
||||
await _callStatus.BroadcastParticipantUpdate(evt.Room.Name, info);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private static string _GetParticipantsKey(string roomName)
|
||||
=> $"RoomParticipants_{roomName}";
|
||||
|
||||
private async Task _AddParticipantToCache(string roomName, ParticipantInfo participant)
|
||||
{
|
||||
var participantsKey = _GetParticipantsKey(roomName);
|
||||
|
||||
// Try to acquire a lock to prevent race conditions when updating the participants list
|
||||
await using var lockObj = await _cache.AcquireLockAsync(
|
||||
$"{participantsKey}_lock",
|
||||
TimeSpan.FromSeconds(10),
|
||||
TimeSpan.FromSeconds(5));
|
||||
|
||||
if (lockObj == null)
|
||||
{
|
||||
_logger.LogWarning("Failed to acquire lock for updating participants list in room: {RoomName}", roomName);
|
||||
return;
|
||||
}
|
||||
|
||||
// Get the current participants list
|
||||
var participants = await _cache.GetAsync<List<ParticipantInfoItem>>(participantsKey) ??
|
||||
[];
|
||||
|
||||
// Check if the participant already exists
|
||||
var existingIndex = participants.FindIndex(p => p.Identity == participant.Identity);
|
||||
if (existingIndex >= 0)
|
||||
{
|
||||
// Update existing participant
|
||||
participants[existingIndex] = CreateParticipantCacheItem(participant);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Add new participant
|
||||
participants.Add(CreateParticipantCacheItem(participant));
|
||||
}
|
||||
|
||||
// Update cache with new list
|
||||
await _cache.SetAsync(participantsKey, participants, TimeSpan.FromHours(6));
|
||||
|
||||
// Also add to a room group in cache for easy cleanup
|
||||
await _cache.AddToGroupAsync(participantsKey, $"Room_{roomName}");
|
||||
}
|
||||
|
||||
private async Task _RemoveParticipantFromCache(string roomName, ParticipantInfo participant)
|
||||
{
|
||||
var participantsKey = _GetParticipantsKey(roomName);
|
||||
|
||||
// Try to acquire a lock to prevent race conditions when updating the participants list
|
||||
await using var lockObj = await _cache.AcquireLockAsync(
|
||||
$"{participantsKey}_lock",
|
||||
TimeSpan.FromSeconds(10),
|
||||
TimeSpan.FromSeconds(5));
|
||||
|
||||
if (lockObj == null)
|
||||
{
|
||||
_logger.LogWarning("Failed to acquire lock for updating participants list in room: {RoomName}", roomName);
|
||||
return;
|
||||
}
|
||||
|
||||
// Get current participants list
|
||||
var participants = await _cache.GetAsync<List<ParticipantInfoItem>>(participantsKey);
|
||||
if (participants == null || !participants.Any())
|
||||
return;
|
||||
|
||||
// Remove participant
|
||||
participants.RemoveAll(p => p.Identity == participant.Identity);
|
||||
|
||||
// Update cache with new list
|
||||
await _cache.SetAsync(participantsKey, participants, TimeSpan.FromHours(6));
|
||||
}
|
||||
|
||||
// Helper method to get participants in a room
|
||||
public async Task<List<ParticipantInfoItem>> GetRoomParticipantsAsync(string roomName)
|
||||
{
|
||||
var participantsKey = _GetParticipantsKey(roomName);
|
||||
return await _cache.GetAsync<List<ParticipantInfoItem>>(participantsKey) ?? [];
|
||||
}
|
||||
|
||||
private ParticipantInfoItem CreateParticipantCacheItem(ParticipantInfo participant)
|
||||
{
|
||||
// Try to parse account ID from metadata
|
||||
Guid? accountId = null;
|
||||
var metadata = new Dictionary<string, string>();
|
||||
|
||||
if (string.IsNullOrEmpty(participant.Metadata))
|
||||
return new ParticipantInfoItem
|
||||
{
|
||||
Identity = participant.Identity,
|
||||
Name = participant.Name,
|
||||
AccountId = accountId,
|
||||
Metadata = metadata,
|
||||
JoinedAt = DateTime.UtcNow
|
||||
};
|
||||
try
|
||||
{
|
||||
metadata = JsonSerializer.Deserialize<Dictionary<string, string>>(participant.Metadata) ??
|
||||
new Dictionary<string, string>();
|
||||
|
||||
if (metadata.TryGetValue("account_id", out var accountIdStr))
|
||||
if (Guid.TryParse(accountIdStr, out var parsedId))
|
||||
accountId = parsedId;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to parse participant metadata");
|
||||
}
|
||||
|
||||
return new ParticipantInfoItem
|
||||
{
|
||||
Identity = participant.Identity,
|
||||
Name = participant.Name,
|
||||
AccountId = accountId,
|
||||
Metadata = metadata,
|
||||
JoinedAt = DateTime.UtcNow
|
||||
};
|
||||
}
|
||||
}
|
@ -1,136 +0,0 @@
|
||||
using DysonNetwork.Sphere.Connection;
|
||||
using DysonNetwork.Sphere.Storage;
|
||||
using Livekit.Server.Sdk.Dotnet;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using NodaTime;
|
||||
using System.Text.Json;
|
||||
|
||||
namespace DysonNetwork.Sphere.Chat.Realtime;
|
||||
|
||||
/// <summary>
|
||||
/// LiveKit implementation of the real-time communication service
|
||||
/// </summary>
|
||||
public class LivekitRealtimeService : IRealtimeService
|
||||
{
|
||||
private readonly ILogger<LivekitRealtimeService> _logger;
|
||||
private readonly RoomServiceClient _roomService;
|
||||
private readonly AccessToken _accessToken;
|
||||
private readonly WebhookReceiver _webhookReceiver;
|
||||
|
||||
public LivekitRealtimeService(
|
||||
IConfiguration configuration,
|
||||
ILogger<LivekitRealtimeService> logger)
|
||||
{
|
||||
_logger = logger;
|
||||
|
||||
// Get LiveKit configuration from appsettings
|
||||
var host = configuration["Livekit:Endpoint"] ??
|
||||
throw new ArgumentNullException("Endpoint configuration is required");
|
||||
var apiKey = configuration["Livekit:ApiKey"] ??
|
||||
throw new ArgumentNullException("ApiKey configuration is required");
|
||||
var apiSecret = configuration["Livekit:ApiSecret"] ??
|
||||
throw new ArgumentNullException("ApiSecret configuration is required");
|
||||
|
||||
_roomService = new RoomServiceClient(host, apiKey, apiSecret);
|
||||
_accessToken = new AccessToken(apiKey, apiSecret);
|
||||
_webhookReceiver = new WebhookReceiver(apiKey, apiSecret);
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public string ProviderName => "LiveKit";
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<RealtimeSessionConfig> CreateSessionAsync(Guid roomId, Dictionary<string, object> metadata)
|
||||
{
|
||||
try
|
||||
{
|
||||
var roomName = $"Call_{roomId.ToString().Replace("-", "")}";
|
||||
|
||||
// Convert metadata to a string dictionary for LiveKit
|
||||
var roomMetadata = new Dictionary<string, string>();
|
||||
foreach (var item in metadata)
|
||||
{
|
||||
roomMetadata[item.Key] = item.Value?.ToString() ?? string.Empty;
|
||||
}
|
||||
|
||||
// Create room in LiveKit
|
||||
var room = await _roomService.CreateRoom(new CreateRoomRequest
|
||||
{
|
||||
Name = roomName,
|
||||
EmptyTimeout = 300, // 5 minutes
|
||||
Metadata = JsonSerializer.Serialize(roomMetadata)
|
||||
});
|
||||
|
||||
// Return session config
|
||||
return new RealtimeSessionConfig
|
||||
{
|
||||
SessionId = room.Name,
|
||||
Parameters = new Dictionary<string, object>
|
||||
{
|
||||
{ "sid", room.Sid },
|
||||
{ "emptyTimeout", room.EmptyTimeout },
|
||||
{ "creationTime", room.CreationTime }
|
||||
}
|
||||
};
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to create LiveKit room for roomId: {RoomId}", roomId);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task EndSessionAsync(string sessionId, RealtimeSessionConfig config)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Delete the room in LiveKit
|
||||
await _roomService.DeleteRoom(new DeleteRoomRequest
|
||||
{
|
||||
Room = sessionId
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to end LiveKit session: {SessionId}", sessionId);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public string GetUserToken(Account.Account account, string sessionId, bool isAdmin = false)
|
||||
{
|
||||
return GetUserTokenAsync(account, sessionId, isAdmin).GetAwaiter().GetResult();
|
||||
}
|
||||
|
||||
public Task<string> GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false)
|
||||
{
|
||||
var token = _accessToken.WithIdentity(account.Name)
|
||||
.WithName(account.Nick)
|
||||
.WithGrants(new VideoGrants
|
||||
{
|
||||
RoomJoin = true,
|
||||
CanPublish = true,
|
||||
CanPublishData = true,
|
||||
CanSubscribe = true,
|
||||
CanSubscribeMetrics = true,
|
||||
RoomAdmin = isAdmin,
|
||||
Room = sessionId
|
||||
})
|
||||
.WithMetadata(JsonSerializer.Serialize(new Dictionary<string, string>
|
||||
{ { "account_id", account.Id.ToString() } }))
|
||||
.WithTtl(TimeSpan.FromHours(1));
|
||||
return Task.FromResult(token.ToJwt());
|
||||
}
|
||||
|
||||
public Task ReceiveWebhook(string body, string authHeader)
|
||||
{
|
||||
var evt = _webhookReceiver.Receive(body, authHeader);
|
||||
if (evt is null) return Task.CompletedTask;
|
||||
|
||||
// TODO: Handle webhook events
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
91
DysonNetwork.Sphere/Chat/Realtime/RealtimeStatusService.cs
Normal file
91
DysonNetwork.Sphere/Chat/Realtime/RealtimeStatusService.cs
Normal file
@ -0,0 +1,91 @@
|
||||
using DysonNetwork.Sphere.Connection;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace DysonNetwork.Sphere.Chat.Realtime;
|
||||
|
||||
public class ParticipantInfoItem
|
||||
{
|
||||
public string Identity { get; set; } = null!;
|
||||
public string Name { get; set; } = null!;
|
||||
public Guid? AccountId { get; set; }
|
||||
public Dictionary<string, string> Metadata { get; set; } = new();
|
||||
public DateTime JoinedAt { get; set; }
|
||||
}
|
||||
|
||||
public class RealtimeStatusService(AppDatabase db, WebSocketService ws, ILogger<RealtimeStatusService> logger)
|
||||
{
|
||||
// Broadcast participant update to all participants in a room
|
||||
public async Task BroadcastParticipantUpdate(string roomName, List<ParticipantInfoItem> participantsInfo)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Get the room ID from the session name
|
||||
var roomInfo = await db.ChatRealtimeCall
|
||||
.Where(c => c.SessionId == roomName && c.EndedAt == null)
|
||||
.Select(c => new { c.RoomId, c.Id })
|
||||
.FirstOrDefaultAsync();
|
||||
|
||||
if (roomInfo == null)
|
||||
{
|
||||
logger.LogWarning("Could not find room info for session: {SessionName}", roomName);
|
||||
return;
|
||||
}
|
||||
|
||||
// Get all room members who should receive this update
|
||||
var roomMembers = await db.ChatMembers
|
||||
.Where(m => m.ChatRoomId == roomInfo.RoomId && m.LeaveAt == null)
|
||||
.Select(m => m.AccountId)
|
||||
.ToListAsync();
|
||||
|
||||
// Get member profiles for participants who have account IDs
|
||||
var accountIds = participantsInfo
|
||||
.Where(p => p.AccountId.HasValue)
|
||||
.Select(p => p.AccountId!.Value)
|
||||
.ToList();
|
||||
|
||||
var memberProfiles = new Dictionary<Guid, ChatMember>();
|
||||
if (accountIds.Count != 0)
|
||||
{
|
||||
memberProfiles = await db.ChatMembers
|
||||
.Where(m => m.ChatRoomId == roomInfo.RoomId && accountIds.Contains(m.AccountId))
|
||||
.Include(m => m.Account)
|
||||
.ThenInclude(m => m.Profile)
|
||||
.ToDictionaryAsync(m => m.AccountId, m => m);
|
||||
}
|
||||
|
||||
// Convert to CallParticipant objects
|
||||
var participants = participantsInfo.Select(p => new CallParticipant
|
||||
{
|
||||
Identity = p.Identity,
|
||||
Name = p.Name,
|
||||
AccountId = p.AccountId,
|
||||
JoinedAt = p.JoinedAt,
|
||||
Profile = p.AccountId.HasValue && memberProfiles.TryGetValue(p.AccountId.Value, out var profile)
|
||||
? profile
|
||||
: null
|
||||
}).ToList();
|
||||
|
||||
// Create the update packet with CallParticipant objects
|
||||
var updatePacket = new WebSocketPacket
|
||||
{
|
||||
Type = WebSocketPacketType.CallParticipantsUpdate,
|
||||
Data = new Dictionary<string, object>
|
||||
{
|
||||
{ "room_id", roomInfo.RoomId },
|
||||
{ "call_id", roomInfo.Id },
|
||||
{ "participants", participants }
|
||||
}
|
||||
};
|
||||
|
||||
// Send the update to all members
|
||||
foreach (var accountId in roomMembers)
|
||||
{
|
||||
ws.SendPacketToAccount(accountId, updatePacket);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error broadcasting participant update for room {RoomName}", roomName);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user