using DysonNetwork.Sphere.Connection;
using DysonNetwork.Sphere.Storage;
using Livekit.Server.Sdk.Dotnet;
using Microsoft.EntityFrameworkCore;
using NodaTime;
using System.Text.Json;
namespace DysonNetwork.Sphere.Chat.Realtime;
/// <summary>
/// LiveKit implementation of the real-time communication service
/// </summary>
public class LiveKitRealtimeService : IRealtimeService
{
    // Room empty-timeout passed to LiveKit when a room is created (5 minutes).
    private const uint RoomEmptyTimeoutSeconds = 300;

    // Lifetime of a join token issued by GetUserToken.
    private static readonly TimeSpan TokenTtl = TimeSpan.FromHours(1);

    // Expiration applied to the cached participant list each time it is written.
    private static readonly TimeSpan ParticipantsCacheTtl = TimeSpan.FromHours(6);

    private readonly AppDatabase _db;
    private readonly ICacheService _cache;
    private readonly RealtimeStatusService _callStatus;
    private readonly ILogger<LiveKitRealtimeService> _logger;
    private readonly RoomServiceClient _roomService;
    private readonly WebhookReceiver _webhookReceiver;

    // Credentials are kept so that every token request can start from its own
    // AccessToken builder instead of re-configuring one shared instance.
    private readonly string _apiKey;
    private readonly string _apiSecret;

    /// <summary>
    /// Creates the service and its LiveKit clients from the <c>Realtime:LiveKit</c> configuration section.
    /// </summary>
    /// <param name="configuration">Application configuration; must contain the Endpoint, ApiKey and ApiSecret keys.</param>
    /// <param name="logger">Logger used for diagnostics.</param>
    /// <param name="db">Database context used to mark calls as ended.</param>
    /// <param name="cache">Cache used to track the participants of each room.</param>
    /// <param name="callStatus">Service used to broadcast participant list updates.</param>
    /// <exception cref="ArgumentNullException">A required LiveKit configuration value is missing.</exception>
    public LiveKitRealtimeService(
        IConfiguration configuration,
        ILogger<LiveKitRealtimeService> logger,
        AppDatabase db,
        ICacheService cache,
        RealtimeStatusService callStatus
    )
    {
        _logger = logger;
        _db = db;
        _cache = cache;
        _callStatus = callStatus;

        // Get LiveKit configuration from appsettings
        var host = RequireSetting(configuration, "Realtime:LiveKit:Endpoint", "Endpoint configuration is required");
        _apiKey = RequireSetting(configuration, "Realtime:LiveKit:ApiKey", "ApiKey configuration is required");
        _apiSecret = RequireSetting(configuration, "Realtime:LiveKit:ApiSecret", "ApiSecret configuration is required");

        _roomService = new RoomServiceClient(host, _apiKey, _apiSecret);
        _webhookReceiver = new WebhookReceiver(_apiKey, _apiSecret);
    }

    /// <summary>Name of this real-time provider.</summary>
    public string ProviderName => "LiveKit";

    /// <summary>
    /// Creates a LiveKit room for the given chat room and returns its session configuration.
    /// </summary>
    /// <param name="roomId">Identifier used to derive the LiveKit room name (<c>Call_</c> + the id without dashes).</param>
    /// <param name="metadata">Arbitrary metadata; values are stringified and stored on the room as JSON.</param>
    /// <returns>A session config whose <c>SessionId</c> is the LiveKit room name.</returns>
    public async Task<RealtimeSessionConfig> CreateSessionAsync(Guid roomId, Dictionary<string, object> metadata)
    {
        try
        {
            // The "N" format yields the 32 hex digits without dashes.
            var roomName = $"Call_{roomId:N}";

            // Convert metadata to a string dictionary for LiveKit
            var roomMetadata = metadata.ToDictionary(
                entry => entry.Key,
                entry => entry.Value?.ToString() ?? string.Empty);

            // Create room in LiveKit
            var room = await _roomService.CreateRoom(new CreateRoomRequest
            {
                Name = roomName,
                EmptyTimeout = RoomEmptyTimeoutSeconds,
                Metadata = JsonSerializer.Serialize(roomMetadata)
            });

            return new RealtimeSessionConfig
            {
                SessionId = room.Name,
                Parameters = new Dictionary<string, object>
                {
                    { "sid", room.Sid },
                    { "emptyTimeout", room.EmptyTimeout },
                    { "creationTime", room.CreationTime }
                }
            };
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to create LiveKit room for roomId: {RoomId}", roomId);
            throw;
        }
    }

    /// <summary>
    /// Deletes the LiveKit room identified by <paramref name="sessionId"/>.
    /// </summary>
    /// <param name="sessionId">LiveKit room name.</param>
    /// <param name="config">Session configuration; not used by this provider.</param>
    public async Task EndSessionAsync(string sessionId, RealtimeSessionConfig config)
    {
        try
        {
            await _roomService.DeleteRoom(new DeleteRoomRequest { Room = sessionId });
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to end LiveKit session: {SessionId}", sessionId);
            throw;
        }
    }

    /// <summary>
    /// Builds a signed LiveKit join token (JWT) for <paramref name="account"/> in the given room.
    /// </summary>
    /// <param name="account">Account joining the call; its name is the identity and its id is embedded as metadata.</param>
    /// <param name="sessionId">LiveKit room name the token grants access to.</param>
    /// <param name="isAdmin">Whether the token carries the room-admin grant.</param>
    /// <returns>The serialized JWT.</returns>
    public string GetUserToken(Account.Account account, string sessionId, bool isAdmin = false)
    {
        var metadata = JsonSerializer.Serialize(new Dictionary<string, string>
        {
            { "account_id", account.Id.ToString() }
        });

        // A new builder per call: no identity, grant or metadata set for one
        // caller can ever be observed while building another caller's token.
        return new AccessToken(_apiKey, _apiSecret)
            .WithIdentity(account.Name)
            .WithName(account.Nick)
            .WithGrants(new VideoGrants
            {
                RoomJoin = true,
                CanPublish = true,
                CanPublishData = true,
                CanSubscribe = true,
                CanSubscribeMetrics = true,
                RoomAdmin = isAdmin,
                Room = sessionId
            })
            .WithMetadata(metadata)
            .WithTtl(TokenTtl)
            .ToJwt();
    }

    /// <summary>
    /// Task-returning wrapper around <see cref="GetUserToken"/>; the token is produced synchronously.
    /// </summary>
    public Task<string> GetUserTokenAsync(Account.Account account, string sessionId, bool isAdmin = false)
        => Task.FromResult(GetUserToken(account, sessionId, isAdmin));

    /// <summary>
    /// Validates and processes a LiveKit webhook call.
    /// </summary>
    /// <remarks>
    /// Handles <c>room_finished</c> (marks the call ended and drops the cached participant list) and
    /// <c>participant_joined</c> / <c>participant_left</c> (updates the cached list and broadcasts it).
    /// Other events are ignored.
    /// </remarks>
    /// <param name="body">Raw webhook request body.</param>
    /// <param name="authHeader">Value of the webhook authorization header.</param>
    public async Task ReceiveWebhook(string body, string authHeader)
    {
        var evt = _webhookReceiver.Receive(body, authHeader);
        if (evt is null) return;

        // Every event handled below is keyed by room name; without one there is nothing to do.
        var roomName = evt.Room?.Name;
        if (string.IsNullOrEmpty(roomName)) return;

        switch (evt.Event)
        {
            case "room_finished":
                var now = SystemClock.Instance.GetCurrentInstant();
                await _db.ChatRealtimeCall
                    .Where(c => c.SessionId == roomName)
                    .ExecuteUpdateAsync(s => s.SetProperty(p => p.EndedAt, now));

                // Also clean up participants list when the room is finished
                await _cache.RemoveAsync(_GetParticipantsKey(roomName));
                break;

            case "participant_joined" when evt.Participant is not null:
                await _AddParticipantToCache(roomName, evt.Participant);
                _logger.LogInformation(
                    "Participant joined room: {RoomName}, Participant: {ParticipantIdentity}",
                    roomName, evt.Participant.Identity);
                await BroadcastParticipantsAsync(roomName);
                break;

            case "participant_left" when evt.Participant is not null:
                await _RemoveParticipantFromCache(roomName, evt.Participant);
                _logger.LogInformation(
                    "Participant left room: {RoomName}, Participant: {ParticipantIdentity}",
                    roomName, evt.Participant.Identity);
                await BroadcastParticipantsAsync(roomName);
                break;
        }
    }

    /// <summary>
    /// Returns the cached participant list for a room, or an empty list when nothing is cached.
    /// </summary>
    /// <param name="roomName">LiveKit room name.</param>
    public async Task<List<ParticipantInfoItem>> GetRoomParticipantsAsync(string roomName)
    {
        var participantsKey = _GetParticipantsKey(roomName);
        return await _cache.GetAsync<List<ParticipantInfoItem>>(participantsKey) ?? [];
    }

    // Reads a required configuration value, failing with a descriptive ArgumentNullException when absent.
    private static string RequireSetting(IConfiguration configuration, string key, string message)
        => configuration[key] ?? throw new ArgumentNullException(key, message);

    // Cache key under which a room's participant list is stored.
    private static string _GetParticipantsKey(string roomName)
        => $"RoomParticipants_{roomName}";

    // Sends the currently cached participant list of the room to the status service.
    private async Task BroadcastParticipantsAsync(string roomName)
    {
        var participants = await GetRoomParticipantsAsync(roomName);
        await _callStatus.BroadcastParticipantUpdate(roomName, participants);
    }

    // Inserts the participant into the room's cached list, or replaces the entry with the same identity.
    private async Task _AddParticipantToCache(string roomName, ParticipantInfo participant)
    {
        var participantsKey = _GetParticipantsKey(roomName);

        // Try to acquire a lock to prevent race conditions when updating the participants list
        await using var lockObj = await _cache.AcquireLockAsync(
            $"{participantsKey}_lock",
            TimeSpan.FromSeconds(10),
            TimeSpan.FromSeconds(5));
        if (lockObj == null)
        {
            _logger.LogWarning("Failed to acquire lock for updating participants list in room: {RoomName}", roomName);
            return;
        }

        var participants = await _cache.GetAsync<List<ParticipantInfoItem>>(participantsKey) ?? [];
        var item = CreateParticipantCacheItem(participant);

        var existingIndex = participants.FindIndex(p => p.Identity == participant.Identity);
        if (existingIndex >= 0)
            participants[existingIndex] = item;
        else
            participants.Add(item);

        await _cache.SetAsync(participantsKey, participants, ParticipantsCacheTtl);

        // Also add to a room group in cache for easy cleanup
        await _cache.AddToGroupAsync(participantsKey, $"Room_{roomName}");
    }

    // Removes every cached entry with the participant's identity from the room's list.
    private async Task _RemoveParticipantFromCache(string roomName, ParticipantInfo participant)
    {
        var participantsKey = _GetParticipantsKey(roomName);

        // Try to acquire a lock to prevent race conditions when updating the participants list
        await using var lockObj = await _cache.AcquireLockAsync(
            $"{participantsKey}_lock",
            TimeSpan.FromSeconds(10),
            TimeSpan.FromSeconds(5));
        if (lockObj == null)
        {
            _logger.LogWarning("Failed to acquire lock for updating participants list in room: {RoomName}", roomName);
            return;
        }

        var participants = await _cache.GetAsync<List<ParticipantInfoItem>>(participantsKey);
        if (participants is null || participants.Count == 0)
            return;

        participants.RemoveAll(p => p.Identity == participant.Identity);
        await _cache.SetAsync(participantsKey, participants, ParticipantsCacheTtl);
    }

    // Maps a LiveKit participant to the cached representation; the account id is taken
    // from the "account_id" metadata entry when it is present and a valid GUID.
    private ParticipantInfoItem CreateParticipantCacheItem(ParticipantInfo participant)
    {
        var metadata = ParseParticipantMetadata(participant.Metadata);

        Guid? accountId = null;
        if (metadata.TryGetValue("account_id", out var accountIdStr) &&
            Guid.TryParse(accountIdStr, out var parsedId))
        {
            accountId = parsedId;
        }

        return new ParticipantInfoItem
        {
            Identity = participant.Identity,
            Name = participant.Name,
            AccountId = accountId,
            Metadata = metadata,
            JoinedAt = DateTime.UtcNow
        };
    }

    // Best-effort parse of the participant's JSON metadata; any failure is logged and yields an empty dictionary.
    private Dictionary<string, string> ParseParticipantMetadata(string? rawMetadata)
    {
        if (string.IsNullOrEmpty(rawMetadata))
            return new Dictionary<string, string>();

        try
        {
            return JsonSerializer.Deserialize<Dictionary<string, string>>(rawMetadata) ??
                   new Dictionary<string, string>();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to parse participant metadata");
            return new Dictionary<string, string>();
        }
    }
}