♻️ Replace the LiveKit with built-in webrtc

This commit is contained in:
2025-10-19 17:30:51 +08:00
parent 46ebd92dc1
commit 4626529eb5
6 changed files with 925 additions and 127 deletions

View File

@@ -1,10 +1,12 @@
using DysonNetwork.Shared.Models;
using DysonNetwork.Shared.Proto;
using DysonNetwork.Sphere.Chat.Realtime;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Swashbuckle.AspNetCore.Annotations;
using System.Collections.Concurrent;
using System.Net.WebSockets;
using DysonNetwork.Shared.Proto;
using WebSocketPacket = DysonNetwork.Shared.Models.WebSocketPacket;
namespace DysonNetwork.Sphere.Chat;
@@ -13,6 +15,14 @@ public class RealtimeChatConfiguration
public string Endpoint { get; set; } = null!;
}
public class SignalingMessage
{
public string Type { get; set; } = null!;
public object? Data { get; set; }
public string? AccountId { get; set; }
public SnAccount? Account { get; set; }
}
[ApiController]
[Route("/api/chat/realtime")]
public class RealtimeCallController(
@@ -20,31 +30,36 @@ public class RealtimeCallController(
AppDatabase db,
ChatService cs,
ChatRoomService crs,
IRealtimeService realtime
ILogger<RealtimeCallController> logger
) : ControllerBase
{
private readonly RealtimeChatConfiguration _config =
configuration.GetSection("RealtimeChat").Get<RealtimeChatConfiguration>()!;
// A thread-safe collection to hold connected WebSocket clients per chat room.
private static readonly
ConcurrentDictionary<string, ConcurrentDictionary<Guid, (WebSocket Socket, string
AccountId, int Role)>> RoomClients = new();
// A thread-safe collection to hold participants in each room.
private static readonly
ConcurrentDictionary<string, ConcurrentDictionary<string, (Account Account, DateTime JoinedAt)>>
RoomParticipants = new();
/// <summary>
/// This endpoint is especially designed for livekit webhooks,
/// for update the call participates and more.
/// Learn more at: https://docs.livekit.io/home/server/webhooks/
/// This endpoint is for WebRTC signaling webhooks if needed in the future.
/// Currently built-in WebRTC signaling doesn't require external webhooks.
/// </summary>
[HttpPost("webhook")]
[SwaggerIgnore]
public async Task<IActionResult> WebhookReceiver()
public Task<IActionResult> WebhookReceiver()
{
using var reader = new StreamReader(Request.Body);
var postData = await reader.ReadToEndAsync();
var authHeader = Request.Headers.Authorization.ToString();
await realtime.ReceiveWebhook(postData, authHeader);
return Ok();
// Built-in WebRTC signaling doesn't require webhooks
// Return success to indicate endpoint exists for potential future use
return Task.FromResult<IActionResult>(Ok("Webhook received - built-in WebRTC signaling active"));
}
[HttpGet("{roomId:guid}")]
[HttpGet("{roomId:guid}/status")]
[Authorize]
public async Task<ActionResult<SnRealtimeCall>> GetOngoingCall(Guid roomId)
{
@@ -94,46 +109,32 @@ public class RealtimeCallController(
return BadRequest("Call session is not properly configured.");
var isAdmin = member.Role >= ChatMemberRole.Moderator;
var userToken = realtime.GetUserToken(currentUser, ongoingCall.SessionId, isAdmin);
// Get LiveKit endpoint from configuration
// Get WebRTC signaling server endpoint from configuration
var endpoint = _config.Endpoint ??
throw new InvalidOperationException("LiveKit endpoint configuration is missing");
throw new InvalidOperationException("WebRTC signaling endpoint configuration is missing");
// Inject the ChatRoomService
var chatRoomService = HttpContext.RequestServices.GetRequiredService<ChatRoomService>();
// Get current participants from the LiveKit service
// Get current participants from the participant list
var participants = new List<CallParticipant>();
if (realtime is LiveKitRealtimeService livekitService)
var roomKey = ongoingCall.RoomId.ToString();
if (RoomParticipants.TryGetValue(roomKey, out var partsDict))
{
var roomParticipants = await livekitService.GetRoomParticipantsAsync(ongoingCall.SessionId);
participants = [];
foreach (var p in roomParticipants)
{
var participant = new CallParticipant
participants.AddRange(from part in partsDict.Values
select new CallParticipant
{
Identity = p.Identity,
Name = p.Name,
AccountId = p.AccountId,
JoinedAt = p.JoinedAt
};
// Fetch the ChatMember profile if we have an account ID
if (p.AccountId.HasValue)
participant.Profile = await chatRoomService.GetRoomMember(p.AccountId.Value, roomId);
participants.Add(participant);
}
Identity = part.Account.Id,
Name = part.Account.Name,
AccountId = Guid.Parse(part.Account.Id),
JoinedAt = part.JoinedAt
});
}
// Create the response model
// Create the response model for built-in WebRTC signaling
var response = new JoinCallResponse
{
Provider = realtime.ProviderName,
Provider = "Built-in WebRTC Signaling",
Endpoint = endpoint,
Token = userToken,
Token = "", // No external token needed for built-in signaling
CallId = ongoingCall.Id,
RoomName = ongoingCall.SessionId,
IsAdmin = isAdmin,
@@ -186,6 +187,205 @@ public class RealtimeCallController(
return BadRequest(exception.Message);
}
}
/// <summary>
/// WebSocket signaling endpoint for WebRTC calls in a specific chat room.
/// Path: /api/chat/realtime/{chatId}
/// Requires JWT authentication (handled by middleware).
/// </summary>
[HttpGet("{chatId:guid}")]
public async Task SignalingWebSocket(Guid chatId)
{
if (HttpContext.Items["CurrentUser"] is not Account currentUser)
{
HttpContext.Response.StatusCode = 401;
await HttpContext.Response.WriteAsync("Unauthorized");
return;
}
// Verify the user is a member of the chat room
var accountId = Guid.Parse(currentUser.Id);
var member = await db.ChatMembers
.Where(m => m.AccountId == accountId && m.ChatRoomId == chatId && m.JoinedAt != null && m.LeaveAt == null)
.FirstOrDefaultAsync();
if (member == null || member.Role < ChatMemberRole.Member)
{
HttpContext.Response.StatusCode = 403;
await HttpContext.Response.WriteAsync("Forbidden: Not a member of this chat room");
return;
}
if (!HttpContext.WebSockets.IsWebSocketRequest)
{
HttpContext.Response.StatusCode = 400;
await HttpContext.Response.WriteAsync("Bad Request: WebSocket connection expected");
return;
}
var webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
var clientId = Guid.NewGuid();
// Add a client to the room-specific clients dictionary
var roomKey = chatId.ToString();
var roomDict = RoomClients.GetOrAdd(roomKey,
_ => new ConcurrentDictionary<Guid, (WebSocket, string, int)>());
roomDict.TryAdd(clientId, (webSocket, currentUser.Id, member.Role));
// Add to the participant list
var participantsDict = RoomParticipants.GetOrAdd(roomKey,
_ => new ConcurrentDictionary<string, (Account Account, DateTime JoinedAt)>());
var wasAdded = participantsDict.TryAdd(currentUser.Id, (currentUser, DateTime.UtcNow));
logger.LogInformation(
"WebRTC signaling client connected: {ClientId} ({UserId}) in room {RoomId}. Total clients in room: {Count}",
clientId, currentUser.Id, chatId, roomDict.Count);
// Get other participants as CallParticipant objects
var otherParticipants = participantsDict.Values
.Where(p => p.Account.Id != currentUser.Id)
.Select(p => new CallParticipant
{
Identity = p.Account.Id,
Name = p.Account.Name,
AccountId = Guid.Parse(p.Account.Id),
Account = SnAccount.FromProtoValue(p.Account),
JoinedAt = p.JoinedAt
})
.ToList();
var welcomePacket = new WebSocketPacket
{
Type = "webrtc",
Data = new
{
userId = currentUser.Id,
roomId = chatId,
message = $"Connected to call of #{chatId}.",
timestamp = DateTime.UtcNow.ToString("o"),
participants = otherParticipants
}
};
var responseBytes = welcomePacket.ToBytes();
await webSocket.SendAsync(new ArraySegment<byte>(responseBytes), WebSocketMessageType.Text, true,
CancellationToken.None);
// Broadcast user-joined to existing clients if this is the first connection for this user in the room
if (wasAdded)
{
var joinPacket = new WebSocketPacket
{
Type = "webrtc.signal",
Data = new SignalingMessage
{
Type = "user-joined",
AccountId = currentUser.Id,
Account = SnAccount.FromProtoValue(currentUser),
Data = new { }
}
};
await BroadcastMessageToRoom(chatId, clientId, joinPacket);
}
try
{
// Use a MemoryStream to build the full message from potentially multiple chunks.
using var ms = new MemoryStream();
// A larger buffer can be more efficient, but the loop is what handles correctness.
var buffer = new byte[1024 * 8];
while (webSocket.State == WebSocketState.Open)
{
ms.SetLength(0); // Clear the stream for the new message.
WebSocketReceiveResult result;
do
{
result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
if (result.MessageType == WebSocketMessageType.Close)
{
break;
}
ms.Write(buffer, 0, result.Count);
} while (!result.EndOfMessage);
if (result.MessageType == WebSocketMessageType.Close)
break;
var packet = WebSocketPacket.FromBytes(ms.ToArray());
var signalingMessage = packet.GetData<SignalingMessage>();
if (signalingMessage is null)
{
logger.LogWarning("Signaling message could not be parsed, dismissed...");
continue;
}
signalingMessage.AccountId = currentUser.Id;
signalingMessage.Account = SnAccount.FromProtoValue(currentUser);
var broadcastPacket = new WebSocketPacket
{
Type = "webrtc.signal",
Data = signalingMessage
};
logger.LogDebug("Message received from {ClientId} ({UserId}): Type={MessageType}", clientId, currentUser.Id, signalingMessage.Type);
await BroadcastMessageToRoom(chatId, clientId, broadcastPacket);
}
}
catch (WebSocketException wsex) when (wsex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
{
// This is an expected exception when a client closes the browser tab.
logger.LogDebug("WebRTC signaling client connection was closed prematurely for user {UserId}",
currentUser.Id);
}
catch (Exception ex)
{
logger.LogError(ex, "Error with WebRTC signaling client connection for user {UserId}", currentUser.Id);
}
finally
{
// Remove the client from the room
if (roomDict.TryRemove(clientId, out _))
{
logger.LogInformation(
"WebRTC signaling client disconnected: {ClientId} ({UserId}). Total clients in room: {Count}",
clientId, currentUser.Id, roomDict.Count);
// If no more connections from this account, remove from participants
if (roomDict.Values.All(v => v.AccountId != currentUser.Id))
{
var tempParticipantsDict = RoomParticipants.GetOrAdd(roomKey,
_ => new ConcurrentDictionary<string, (Account Account, DateTime JoinedAt)>());
if (tempParticipantsDict.TryRemove(currentUser.Id, out _))
{
logger.LogInformation("Participant {UserId} removed from room {RoomId}", currentUser.Id,
chatId);
}
}
}
webSocket.Dispose();
}
}
private async Task BroadcastMessageToRoom(Guid roomId, Guid senderId, WebSocketPacket packet)
{
var roomKey = roomId.ToString();
if (!RoomClients.TryGetValue(roomKey, out var roomDict))
return;
var messageBytes = packet.ToBytes();
var segment = new ArraySegment<byte>(messageBytes);
foreach (var pair in roomDict)
{
if (pair.Key == senderId) continue;
if (pair.Value.Socket.State != WebSocketState.Open) continue;
await pair.Value.Socket.SendAsync(segment, WebSocketMessageType.Text, true, CancellationToken.None);
logger.LogDebug("Message broadcasted to {ClientId} in room {RoomId}", pair.Key, roomId);
}
}
}
// Response model for joining a call
@@ -220,7 +420,7 @@ public class JoinCallResponse
/// Whether the user is the admin of the call
/// </summary>
public bool IsAdmin { get; set; }
/// <summary>
/// Current participants in the call
/// </summary>
@@ -236,22 +436,22 @@ public class CallParticipant
/// The participant's identity (username)
/// </summary>
public string Identity { get; set; } = null!;
/// <summary>
/// The participant's display name
/// </summary>
public string Name { get; set; } = null!;
/// <summary>
/// The participant's account ID if available
/// </summary>
public Guid? AccountId { get; set; }
/// <summary>
/// The participant's profile in the chat
/// </summary>
public SnChatMember? Profile { get; set; }
public SnAccount? Account { get; set; }
/// <summary>
/// When the participant joined the call
/// </summary>