Implement realtime chat

This commit is contained in:
2025-05-25 05:51:13 +08:00
parent 59bc9edd4b
commit 9e7ba820c4
13 changed files with 3849 additions and 33 deletions

View File

@ -1,4 +1,5 @@
using DysonNetwork.Sphere.Account;
using DysonNetwork.Sphere.Chat.Realtime;
using DysonNetwork.Sphere.Connection;
using DysonNetwork.Sphere.Storage;
using Microsoft.EntityFrameworkCore;
@ -6,7 +7,12 @@ using NodaTime;
namespace DysonNetwork.Sphere.Chat;
public class ChatService(AppDatabase db, FileService fs, IServiceScopeFactory scopeFactory)
public class ChatService(
AppDatabase db,
FileService fs,
IServiceScopeFactory scopeFactory,
IRealtimeService realtime
)
{
public async Task<Message> SendMessageAsync(Message message, ChatMember sender, ChatRoom room)
{
@ -105,10 +111,10 @@ public class ChatService(AppDatabase db, FileService fs, IServiceScopeFactory sc
.Where(m => m.AccountId == userId)
.Select(m => new { m.ChatRoomId, m.LastReadAt })
.ToListAsync();
var lastReadAt = members.ToDictionary(m => m.ChatRoomId, m => m.LastReadAt);
var roomsId = lastReadAt.Keys.ToList();
return await db.ChatMessages
.Where(m => roomsId.Contains(m.ChatRoomId))
.GroupBy(m => m.ChatRoomId)
@ -124,7 +130,7 @@ public class ChatService(AppDatabase db, FileService fs, IServiceScopeFactory sc
.Where(m => m.AccountId == userId)
.Select(m => m.ChatRoomId)
.ToListAsync();
var messages = await db.ChatMessages
.IgnoreQueryFilters()
.Include(m => m.Sender)
@ -137,7 +143,7 @@ public class ChatService(AppDatabase db, FileService fs, IServiceScopeFactory sc
m => m!.ChatRoomId,
m => m
);
return messages;
}
@ -147,13 +153,33 @@ public class ChatService(AppDatabase db, FileService fs, IServiceScopeFactory sc
{
RoomId = room.Id,
SenderId = sender.Id,
ProviderName = realtime.ProviderName
};
try
{
var sessionConfig = await realtime.CreateSessionAsync(room.Id, new Dictionary<string, object>
{
{ "room_id", room.Id },
{ "user_id", sender.AccountId },
});
// Store session details
call.SessionId = sessionConfig.SessionId;
call.UpstreamConfig = sessionConfig.Parameters;
}
catch (Exception ex)
{
// Log the exception but continue with call creation
throw new InvalidOperationException($"Failed to create {realtime.ProviderName} session: {ex.Message}");
}
db.ChatRealtimeCall.Add(call);
await db.SaveChangesAsync();
await SendMessageAsync(new Message
{
Type = "realtime.start",
Type = "call.start",
ChatRoomId = room.Id,
SenderId = sender.Id,
Meta = new Dictionary<string, object>
@ -169,14 +195,34 @@ public class ChatService(AppDatabase db, FileService fs, IServiceScopeFactory sc
{
var call = await GetCallOngoingAsync(roomId);
if (call is null) throw new InvalidOperationException("No ongoing call was not found.");
// End the realtime session if it exists
if (!string.IsNullOrEmpty(call.SessionId) && !string.IsNullOrEmpty(call.ProviderName))
{
try
{
var config = new RealtimeSessionConfig
{
SessionId = call.SessionId,
Parameters = call.UpstreamConfig
};
await realtime.EndSessionAsync(call.SessionId, config);
}
catch (Exception ex)
{
// Log the exception but continue with call ending
throw new InvalidOperationException($"Failed to end {call.ProviderName} session: {ex.Message}");
}
}
call.EndedAt = SystemClock.Instance.GetCurrentInstant();
db.ChatRealtimeCall.Update(call);
await db.SaveChangesAsync();
await SendMessageAsync(new Message
{
Type = "realtime.ended",
Type = "call.ended",
ChatRoomId = call.RoomId,
SenderId = call.SenderId,
Meta = new Dictionary<string, object>
@ -185,7 +231,7 @@ public class ChatService(AppDatabase db, FileService fs, IServiceScopeFactory sc
}
}, call.Sender, call.Room);
}
public async Task<RealtimeCall?> GetCallOngoingAsync(Guid roomId)
{
return await db.ChatRealtimeCall
@ -195,7 +241,7 @@ public class ChatService(AppDatabase db, FileService fs, IServiceScopeFactory sc
.Include(c => c.Sender)
.FirstOrDefaultAsync();
}
public async Task<SyncResponse> GetSyncDataAsync(Guid roomId, long lastSyncTimestamp)
{
var timestamp = Instant.FromUnixTimeMilliseconds(lastSyncTimestamp);