diff --git a/DysonNetwork.Sphere/Account/ActionLogService.cs b/DysonNetwork.Sphere/Account/ActionLogService.cs index cc39c68..e3a8cf5 100644 --- a/DysonNetwork.Sphere/Account/ActionLogService.cs +++ b/DysonNetwork.Sphere/Account/ActionLogService.cs @@ -43,12 +43,4 @@ public class ActionLogService(AppDatabase db, GeoIpService geo, FlushBufferServi fbs.Enqueue(log); } -} - -public class ActionLogFlushJob(FlushBufferService fbs, ActionLogFlushHandler hdl) : IJob -{ - public async Task Execute(IJobExecutionContext context) - { - await fbs.FlushAsync(hdl); - } } \ No newline at end of file diff --git a/DysonNetwork.Sphere/AppDatabase.cs b/DysonNetwork.Sphere/AppDatabase.cs index df81d9a..540d9d1 100644 --- a/DysonNetwork.Sphere/AppDatabase.cs +++ b/DysonNetwork.Sphere/AppDatabase.cs @@ -63,7 +63,7 @@ public class AppDatabase( public DbSet ChatMembers { get; set; } public DbSet ChatMessages { get; set; } public DbSet ChatRealtimeCall { get; set; } - public DbSet ChatStatuses { get; set; } + public DbSet ChatReadReceipts { get; set; } public DbSet ChatReactions { get; set; } public DbSet Stickers { get; set; } @@ -237,7 +237,7 @@ public class AppDatabase( .WithMany() .HasForeignKey(pm => pm.AccountId) .OnDelete(DeleteBehavior.Cascade); - modelBuilder.Entity() + modelBuilder.Entity() .HasKey(e => new { e.MessageId, e.SenderId }); modelBuilder.Entity() .HasOne(m => m.ForwardedMessage) diff --git a/DysonNetwork.Sphere/Chat/ChatService.cs b/DysonNetwork.Sphere/Chat/ChatService.cs index 7358dcd..1f34841 100644 --- a/DysonNetwork.Sphere/Chat/ChatService.cs +++ b/DysonNetwork.Sphere/Chat/ChatService.cs @@ -61,7 +61,7 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory) public async Task MarkMessageAsReadAsync(Guid messageId, Guid roomId, Guid userId) { - var existingStatus = await db.ChatStatuses + var existingStatus = await db.ChatReadReceipts .FirstOrDefaultAsync(x => x.MessageId == messageId && x.Sender.AccountId == userId); var sender = await db.ChatMembers .Where(m => m.AccountId == userId && m.ChatRoomId == roomId) @@ -70,12 +70,12 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory) if (existingStatus == null) { - existingStatus = new MessageStatus + existingStatus = new MessageReadReceipt { MessageId = messageId, SenderId = sender.Id, }; - db.ChatStatuses.Add(existingStatus); + db.ChatReadReceipts.Add(existingStatus); } await db.SaveChangesAsync(); @@ -83,14 +83,16 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory) public async Task GetMessageReadStatus(Guid messageId, Guid userId) { - return await db.ChatStatuses + return await db.ChatReadReceipts .AnyAsync(x => x.MessageId == messageId && x.Sender.AccountId == userId); } public async Task CountUnreadMessage(Guid userId, Guid chatRoomId) { + var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30)); var messages = await db.ChatMessages .Where(m => m.ChatRoomId == chatRoomId) + .Where(m => m.CreatedAt < cutoff) .Select(m => new MessageStatusResponse { MessageId = m.Id, @@ -103,12 +105,14 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory) public async Task> CountUnreadMessagesForJoinedRoomsAsync(Guid userId) { + var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30)); var userRooms = await db.ChatMembers .Where(m => m.AccountId == userId) .Select(m => m.ChatRoomId) .ToListAsync(); var messages = await db.ChatMessages + .Where(m => m.CreatedAt < cutoff) .Where(m => userRooms.Contains(m.ChatRoomId)) .Select(m => new { diff --git a/DysonNetwork.Sphere/Chat/Message.cs b/DysonNetwork.Sphere/Chat/Message.cs index 5e8288d..2ed95fb 100644 --- a/DysonNetwork.Sphere/Chat/Message.cs +++ b/DysonNetwork.Sphere/Chat/Message.cs @@ -19,7 +19,7 @@ public class Message : ModelBase public ICollection Attachments { get; set; } = new List(); public ICollection Reactions { get; set; } = new List(); - public ICollection Statuses { get; set; } = new List(); + public ICollection Statuses { get; set; } = new List(); public Guid? RepliedMessageId { get; set; } public Message? RepliedMessage { get; set; } @@ -43,7 +43,7 @@ public class Message : ModelBase EditedAt = EditedAt, Attachments = new List(Attachments), Reactions = new List(Reactions), - Statuses = new List(Statuses), + Statuses = new List(Statuses), RepliedMessageId = RepliedMessageId, RepliedMessage = RepliedMessage?.Clone() as Message, ForwardedMessageId = ForwardedMessageId, @@ -78,16 +78,14 @@ public class MessageReaction : ModelBase public MessageReactionAttitude Attitude { get; set; } } -/// If the status is exist, means the user has read the message. +/// If the status exists, means the user has read the message. [Index(nameof(MessageId), nameof(SenderId), IsUnique = true)] -public class MessageStatus : ModelBase +public class MessageReadReceipt : ModelBase { public Guid MessageId { get; set; } public Message Message { get; set; } = null!; public Guid SenderId { get; set; } public ChatMember Sender { get; set; } = null!; - - public Instant ReadAt { get; set; } } [NotMapped] diff --git a/DysonNetwork.Sphere/Connection/Handlers/MessageReadHandler.cs b/DysonNetwork.Sphere/Connection/Handlers/MessageReadHandler.cs index 2c17d67..240f498 100644 --- a/DysonNetwork.Sphere/Connection/Handlers/MessageReadHandler.cs +++ b/DysonNetwork.Sphere/Connection/Handlers/MessageReadHandler.cs @@ -1,5 +1,6 @@ using System.Net.WebSockets; using DysonNetwork.Sphere.Chat; +using DysonNetwork.Sphere.Storage; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Internal; @@ -7,7 +8,13 @@ using SystemClock = NodaTime.SystemClock; namespace DysonNetwork.Sphere.Connection.Handlers; -public class MessageReadHandler(AppDatabase db, IMemoryCache cache, ChatRoomService crs) : IWebSocketPacketHandler +public class MessageReadHandler( + AppDatabase db, + IMemoryCache cache, + ChatRoomService crs, + FlushBufferService buffer +) + : IWebSocketPacketHandler { public string PacketType => "messages.read"; @@ -70,25 +77,16 @@ public class MessageReadHandler(AppDatabase db, IMemoryCache cache, ChatRoomServ return; } - db.ChatStatuses.Add(new MessageStatus + var readReceipt = new MessageReadReceipt { MessageId = request.MessageId, SenderId = sender.Id, - ReadAt = SystemClock.Instance.GetCurrentInstant(), - }); + }; - try - { - await db.SaveChangesAsync(); - - // Broadcast read statuses - var otherMembers = (await crs.ListRoomMembers(request.ChatRoomId)).Select(m => m.AccountId).ToList(); - foreach (var member in otherMembers) - srv.SendPacketToAccount(member, packet); - } - catch - { - // ignored - } + buffer.Enqueue(readReceipt); + + var otherMembers = (await crs.ListRoomMembers(request.ChatRoomId)).Select(m => m.AccountId).ToList(); + foreach (var member in otherMembers) + srv.SendPacketToAccount(member, packet); } } \ No newline at end of file diff --git a/DysonNetwork.Sphere/Program.cs b/DysonNetwork.Sphere/Program.cs index 92577d7..1e41513 100644 --- a/DysonNetwork.Sphere/Program.cs +++ b/DysonNetwork.Sphere/Program.cs @@ -22,7 +22,6 @@ using DysonNetwork.Sphere.Storage.Handlers; using DysonNetwork.Sphere.Wallet; using Microsoft.AspNetCore.HttpOverrides; using Microsoft.AspNetCore.Mvc; -using Microsoft.AspNetCore.Mvc.Razor; using Microsoft.AspNetCore.RateLimiting; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; @@ -138,6 +137,7 @@ builder.Services.AddSingleton(tusDiskStore); builder.Services.AddSingleton(); builder.Services.AddScoped(); +builder.Services.AddScoped(); builder.Services.AddScoped(); // The handlers for websocket @@ -199,6 +199,24 @@ builder.Services.AddQuartz(q => .WithIntervalInMinutes(5) .RepeatForever()) ); + + var readReceiptFlushJob = new JobKey("ReadReceiptFlush"); + q.AddJob(opts => opts.WithIdentity(readReceiptFlushJob)); + q.AddTrigger(opts => opts + .ForJob(readReceiptFlushJob) + .WithIdentity("ReadReceiptFlushTrigger") + .WithSimpleSchedule(o => o + .WithIntervalInSeconds(60) + .RepeatForever()) + ); + + var readReceiptRecyclingJob = new JobKey("ReadReceiptRecycling"); + q.AddJob(opts => opts.WithIdentity(readReceiptRecyclingJob)); + q.AddTrigger(opts => opts + .ForJob(readReceiptRecyclingJob) + .WithIdentity("ReadReceiptRecyclingTrigger") + .WithCronSchedule("0 0 0 * * ?") + ); }); builder.Services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true); diff --git a/DysonNetwork.Sphere/Storage/Handlers/ActionLogFlushHandler.cs b/DysonNetwork.Sphere/Storage/Handlers/ActionLogFlushHandler.cs index 3b66278..8798ff7 100644 --- a/DysonNetwork.Sphere/Storage/Handlers/ActionLogFlushHandler.cs +++ b/DysonNetwork.Sphere/Storage/Handlers/ActionLogFlushHandler.cs @@ -1,5 +1,6 @@ using DysonNetwork.Sphere.Account; using EFCore.BulkExtensions; +using Quartz; namespace DysonNetwork.Sphere.Storage.Handlers; @@ -9,7 +10,15 @@ public class ActionLogFlushHandler(IServiceProvider serviceProvider) : IFlushHan { using var scope = serviceProvider.CreateScope(); var db = scope.ServiceProvider.GetRequiredService(); - + await db.BulkInsertAsync(items); } +} + +public class ActionLogFlushJob(FlushBufferService fbs, ActionLogFlushHandler hdl) : IJob +{ + public async Task Execute(IJobExecutionContext context) + { + await fbs.FlushAsync(hdl); + } } \ No newline at end of file diff --git a/DysonNetwork.Sphere/Storage/Handlers/MessageReadReceiptFlushHandler.cs b/DysonNetwork.Sphere/Storage/Handlers/MessageReadReceiptFlushHandler.cs new file mode 100644 index 0000000..6e26223 --- /dev/null +++ b/DysonNetwork.Sphere/Storage/Handlers/MessageReadReceiptFlushHandler.cs @@ -0,0 +1,37 @@ +using DysonNetwork.Sphere.Chat; +using EFCore.BulkExtensions; +using Microsoft.EntityFrameworkCore; +using NodaTime; +using Quartz; + +namespace DysonNetwork.Sphere.Storage.Handlers; + +public class MessageReadReceiptFlushHandler(IServiceProvider serviceProvider) : IFlushHandler +{ + public async Task FlushAsync(IReadOnlyList items) + { + using var scope = serviceProvider.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + + await db.BulkInsertAsync(items); + } +} + +public class ReadReceiptFlushJob(FlushBufferService fbs, MessageReadReceiptFlushHandler hdl) : IJob +{ + public async Task Execute(IJobExecutionContext context) + { + await fbs.FlushAsync(hdl); + } +} + +public class ReadReceiptRecyclingJob(AppDatabase db) : IJob +{ + public async Task Execute(IJobExecutionContext context) + { + var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30)); + await db.ChatReadReceipts + .Where(r => r.CreatedAt < cutoff) + .ExecuteDeleteAsync(); + } +} \ No newline at end of file