Sped up and reduce storage usage of read receipt

This commit is contained in:
LittleSheep 2025-05-18 12:14:23 +08:00
parent fdfdffa382
commit 205ccd66b3
8 changed files with 95 additions and 39 deletions

View File

@ -44,11 +44,3 @@ public class ActionLogService(AppDatabase db, GeoIpService geo, FlushBufferServi
fbs.Enqueue(log);
}
}
public class ActionLogFlushJob(FlushBufferService fbs, ActionLogFlushHandler hdl) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
await fbs.FlushAsync(hdl);
}
}

View File

@ -63,7 +63,7 @@ public class AppDatabase(
public DbSet<Chat.ChatMember> ChatMembers { get; set; }
public DbSet<Chat.Message> ChatMessages { get; set; }
public DbSet<Chat.RealtimeCall> ChatRealtimeCall { get; set; }
public DbSet<Chat.MessageStatus> ChatStatuses { get; set; }
public DbSet<Chat.MessageReadReceipt> ChatReadReceipts { get; set; }
public DbSet<Chat.MessageReaction> ChatReactions { get; set; }
public DbSet<Sticker.Sticker> Stickers { get; set; }
@ -237,7 +237,7 @@ public class AppDatabase(
.WithMany()
.HasForeignKey(pm => pm.AccountId)
.OnDelete(DeleteBehavior.Cascade);
modelBuilder.Entity<Chat.MessageStatus>()
modelBuilder.Entity<Chat.MessageReadReceipt>()
.HasKey(e => new { e.MessageId, e.SenderId });
modelBuilder.Entity<Chat.Message>()
.HasOne(m => m.ForwardedMessage)

View File

@ -61,7 +61,7 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory)
public async Task MarkMessageAsReadAsync(Guid messageId, Guid roomId, Guid userId)
{
var existingStatus = await db.ChatStatuses
var existingStatus = await db.ChatReadReceipts
.FirstOrDefaultAsync(x => x.MessageId == messageId && x.Sender.AccountId == userId);
var sender = await db.ChatMembers
.Where(m => m.AccountId == userId && m.ChatRoomId == roomId)
@ -70,12 +70,12 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory)
if (existingStatus == null)
{
existingStatus = new MessageStatus
existingStatus = new MessageReadReceipt
{
MessageId = messageId,
SenderId = sender.Id,
};
db.ChatStatuses.Add(existingStatus);
db.ChatReadReceipts.Add(existingStatus);
}
await db.SaveChangesAsync();
@ -83,14 +83,16 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory)
public async Task<bool> GetMessageReadStatus(Guid messageId, Guid userId)
{
return await db.ChatStatuses
return await db.ChatReadReceipts
.AnyAsync(x => x.MessageId == messageId && x.Sender.AccountId == userId);
}
public async Task<int> CountUnreadMessage(Guid userId, Guid chatRoomId)
{
var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30));
var messages = await db.ChatMessages
.Where(m => m.ChatRoomId == chatRoomId)
.Where(m => m.CreatedAt < cutoff)
.Select(m => new MessageStatusResponse
{
MessageId = m.Id,
@ -103,12 +105,14 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory)
public async Task<Dictionary<Guid, int>> CountUnreadMessagesForJoinedRoomsAsync(Guid userId)
{
var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30));
var userRooms = await db.ChatMembers
.Where(m => m.AccountId == userId)
.Select(m => m.ChatRoomId)
.ToListAsync();
var messages = await db.ChatMessages
.Where(m => m.CreatedAt < cutoff)
.Where(m => userRooms.Contains(m.ChatRoomId))
.Select(m => new
{

View File

@ -19,7 +19,7 @@ public class Message : ModelBase
public ICollection<CloudFile> Attachments { get; set; } = new List<CloudFile>();
public ICollection<MessageReaction> Reactions { get; set; } = new List<MessageReaction>();
public ICollection<MessageStatus> Statuses { get; set; } = new List<MessageStatus>();
public ICollection<MessageReadReceipt> Statuses { get; set; } = new List<MessageReadReceipt>();
public Guid? RepliedMessageId { get; set; }
public Message? RepliedMessage { get; set; }
@ -43,7 +43,7 @@ public class Message : ModelBase
EditedAt = EditedAt,
Attachments = new List<CloudFile>(Attachments),
Reactions = new List<MessageReaction>(Reactions),
Statuses = new List<MessageStatus>(Statuses),
Statuses = new List<MessageReadReceipt>(Statuses),
RepliedMessageId = RepliedMessageId,
RepliedMessage = RepliedMessage?.Clone() as Message,
ForwardedMessageId = ForwardedMessageId,
@ -78,16 +78,14 @@ public class MessageReaction : ModelBase
public MessageReactionAttitude Attitude { get; set; }
}
/// If the status is exist, means the user has read the message.
/// If the status exists, means the user has read the message.
[Index(nameof(MessageId), nameof(SenderId), IsUnique = true)]
public class MessageStatus : ModelBase
public class MessageReadReceipt : ModelBase
{
public Guid MessageId { get; set; }
public Message Message { get; set; } = null!;
public Guid SenderId { get; set; }
public ChatMember Sender { get; set; } = null!;
public Instant ReadAt { get; set; }
}
[NotMapped]

View File

@ -1,5 +1,6 @@
using System.Net.WebSockets;
using DysonNetwork.Sphere.Chat;
using DysonNetwork.Sphere.Storage;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Internal;
@ -7,7 +8,13 @@ using SystemClock = NodaTime.SystemClock;
namespace DysonNetwork.Sphere.Connection.Handlers;
public class MessageReadHandler(AppDatabase db, IMemoryCache cache, ChatRoomService crs) : IWebSocketPacketHandler
public class MessageReadHandler(
AppDatabase db,
IMemoryCache cache,
ChatRoomService crs,
FlushBufferService buffer
)
: IWebSocketPacketHandler
{
public string PacketType => "messages.read";
@ -70,25 +77,16 @@ public class MessageReadHandler(AppDatabase db, IMemoryCache cache, ChatRoomServ
return;
}
db.ChatStatuses.Add(new MessageStatus
var readReceipt = new MessageReadReceipt
{
MessageId = request.MessageId,
SenderId = sender.Id,
ReadAt = SystemClock.Instance.GetCurrentInstant(),
});
};
try
{
await db.SaveChangesAsync();
buffer.Enqueue(readReceipt);
// Broadcast read statuses
var otherMembers = (await crs.ListRoomMembers(request.ChatRoomId)).Select(m => m.AccountId).ToList();
foreach (var member in otherMembers)
srv.SendPacketToAccount(member, packet);
}
catch
{
// ignored
}
var otherMembers = (await crs.ListRoomMembers(request.ChatRoomId)).Select(m => m.AccountId).ToList();
foreach (var member in otherMembers)
srv.SendPacketToAccount(member, packet);
}
}

View File

@ -22,7 +22,6 @@ using DysonNetwork.Sphere.Storage.Handlers;
using DysonNetwork.Sphere.Wallet;
using Microsoft.AspNetCore.HttpOverrides;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.Razor;
using Microsoft.AspNetCore.RateLimiting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Options;
@ -138,6 +137,7 @@ builder.Services.AddSingleton(tusDiskStore);
builder.Services.AddSingleton<FlushBufferService>();
builder.Services.AddScoped<ActionLogFlushHandler>();
builder.Services.AddScoped<MessageReadReceiptFlushHandler>();
builder.Services.AddScoped<ActionLogService>();
// The handlers for websocket
@ -199,6 +199,24 @@ builder.Services.AddQuartz(q =>
.WithIntervalInMinutes(5)
.RepeatForever())
);
var readReceiptFlushJob = new JobKey("ReadReceiptFlush");
q.AddJob<ReadReceiptFlushJob>(opts => opts.WithIdentity(readReceiptFlushJob));
q.AddTrigger(opts => opts
.ForJob(readReceiptFlushJob)
.WithIdentity("ReadReceiptFlushTrigger")
.WithSimpleSchedule(o => o
.WithIntervalInSeconds(60)
.RepeatForever())
);
var readReceiptRecyclingJob = new JobKey("ReadReceiptRecycling");
q.AddJob<ReadReceiptRecyclingJob>(opts => opts.WithIdentity(readReceiptRecyclingJob));
q.AddTrigger(opts => opts
.ForJob(readReceiptRecyclingJob)
.WithIdentity("ReadReceiptRecyclingTrigger")
.WithCronSchedule("0 0 0 * * ?")
);
});
builder.Services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);

View File

@ -1,5 +1,6 @@
using DysonNetwork.Sphere.Account;
using EFCore.BulkExtensions;
using Quartz;
namespace DysonNetwork.Sphere.Storage.Handlers;
@ -13,3 +14,11 @@ public class ActionLogFlushHandler(IServiceProvider serviceProvider) : IFlushHan
await db.BulkInsertAsync(items);
}
}
public class ActionLogFlushJob(FlushBufferService fbs, ActionLogFlushHandler hdl) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
await fbs.FlushAsync(hdl);
}
}

View File

@ -0,0 +1,37 @@
using DysonNetwork.Sphere.Chat;
using EFCore.BulkExtensions;
using Microsoft.EntityFrameworkCore;
using NodaTime;
using Quartz;
namespace DysonNetwork.Sphere.Storage.Handlers;
public class MessageReadReceiptFlushHandler(IServiceProvider serviceProvider) : IFlushHandler<MessageReadReceipt>
{
public async Task FlushAsync(IReadOnlyList<MessageReadReceipt> items)
{
using var scope = serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
await db.BulkInsertAsync(items);
}
}
public class ReadReceiptFlushJob(FlushBufferService fbs, MessageReadReceiptFlushHandler hdl) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
await fbs.FlushAsync(hdl);
}
}
public class ReadReceiptRecyclingJob(AppDatabase db) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30));
await db.ChatReadReceipts
.Where(r => r.CreatedAt < cutoff)
.ExecuteDeleteAsync();
}
}