♻️ Refactor the last read at system of chat
This commit is contained in:
parent
1b2ca34aad
commit
213d81a5ca
@ -63,7 +63,6 @@ public class AppDatabase(
|
|||||||
public DbSet<Chat.ChatMember> ChatMembers { get; set; }
|
public DbSet<Chat.ChatMember> ChatMembers { get; set; }
|
||||||
public DbSet<Chat.Message> ChatMessages { get; set; }
|
public DbSet<Chat.Message> ChatMessages { get; set; }
|
||||||
public DbSet<Chat.RealtimeCall> ChatRealtimeCall { get; set; }
|
public DbSet<Chat.RealtimeCall> ChatRealtimeCall { get; set; }
|
||||||
public DbSet<Chat.MessageReadReceipt> ChatReadReceipts { get; set; }
|
|
||||||
public DbSet<Chat.MessageReaction> ChatReactions { get; set; }
|
public DbSet<Chat.MessageReaction> ChatReactions { get; set; }
|
||||||
|
|
||||||
public DbSet<Sticker.Sticker> Stickers { get; set; }
|
public DbSet<Sticker.Sticker> Stickers { get; set; }
|
||||||
@ -232,8 +231,6 @@ public class AppDatabase(
|
|||||||
.WithMany()
|
.WithMany()
|
||||||
.HasForeignKey(pm => pm.AccountId)
|
.HasForeignKey(pm => pm.AccountId)
|
||||||
.OnDelete(DeleteBehavior.Cascade);
|
.OnDelete(DeleteBehavior.Cascade);
|
||||||
modelBuilder.Entity<Chat.MessageReadReceipt>()
|
|
||||||
.HasKey(e => new { e.MessageId, e.SenderId });
|
|
||||||
modelBuilder.Entity<Chat.Message>()
|
modelBuilder.Entity<Chat.Message>()
|
||||||
.HasOne(m => m.ForwardedMessage)
|
.HasOne(m => m.ForwardedMessage)
|
||||||
.WithMany()
|
.WithMany()
|
||||||
|
@ -15,7 +15,6 @@ public partial class ChatController(AppDatabase db, ChatService cs) : Controller
|
|||||||
{
|
{
|
||||||
public class MarkMessageReadRequest
|
public class MarkMessageReadRequest
|
||||||
{
|
{
|
||||||
public Guid MessageId { get; set; }
|
|
||||||
public Guid ChatRoomId { get; set; }
|
public Guid ChatRoomId { get; set; }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,6 +63,7 @@ public class ChatMember : ModelBase
|
|||||||
|
|
||||||
public ChatMemberRole Role { get; set; } = ChatMemberRole.Member;
|
public ChatMemberRole Role { get; set; } = ChatMemberRole.Member;
|
||||||
public ChatMemberNotify Notify { get; set; } = ChatMemberNotify.All;
|
public ChatMemberNotify Notify { get; set; } = ChatMemberNotify.All;
|
||||||
|
public Instant? LastReadAt { get; set; }
|
||||||
public Instant? JoinedAt { get; set; }
|
public Instant? JoinedAt { get; set; }
|
||||||
public Instant? LeaveAt { get; set; }
|
public Instant? LeaveAt { get; set; }
|
||||||
public bool IsBot { get; set; } = false;
|
public bool IsBot { get; set; } = false;
|
||||||
|
@ -63,73 +63,54 @@ public class ChatService(AppDatabase db, FileService fs, IServiceScopeFactory sc
|
|||||||
await Task.WhenAll(tasks);
|
await Task.WhenAll(tasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task MarkMessageAsReadAsync(Guid messageId, Guid roomId, Guid userId)
|
/// <summary>
|
||||||
|
/// This method will instant update the LastReadAt field for chat member,
|
||||||
|
/// for better performance, using the flush buffer one instead
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="roomId">The user chat room</param>
|
||||||
|
/// <param name="userId">The user id</param>
|
||||||
|
/// <exception cref="ArgumentException"></exception>
|
||||||
|
public async Task ReadChatRoomAsync(Guid roomId, Guid userId)
|
||||||
{
|
{
|
||||||
var existingStatus = await db.ChatReadReceipts
|
|
||||||
.FirstOrDefaultAsync(x => x.MessageId == messageId && x.Sender.AccountId == userId);
|
|
||||||
var sender = await db.ChatMembers
|
var sender = await db.ChatMembers
|
||||||
.Where(m => m.AccountId == userId && m.ChatRoomId == roomId)
|
.Where(m => m.AccountId == userId && m.ChatRoomId == roomId)
|
||||||
.FirstOrDefaultAsync();
|
.FirstOrDefaultAsync();
|
||||||
if (sender is null) throw new ArgumentException("User is not a member of the chat room.");
|
if (sender is null) throw new ArgumentException("User is not a member of the chat room.");
|
||||||
|
|
||||||
if (existingStatus == null)
|
sender.LastReadAt = SystemClock.Instance.GetCurrentInstant();
|
||||||
{
|
|
||||||
existingStatus = new MessageReadReceipt
|
|
||||||
{
|
|
||||||
MessageId = messageId,
|
|
||||||
SenderId = sender.Id,
|
|
||||||
};
|
|
||||||
db.ChatReadReceipts.Add(existingStatus);
|
|
||||||
}
|
|
||||||
|
|
||||||
await db.SaveChangesAsync();
|
await db.SaveChangesAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<bool> GetMessageReadStatus(Guid messageId, Guid userId)
|
|
||||||
{
|
|
||||||
return await db.ChatReadReceipts
|
|
||||||
.AnyAsync(x => x.MessageId == messageId && x.Sender.AccountId == userId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task<int> CountUnreadMessage(Guid userId, Guid chatRoomId)
|
public async Task<int> CountUnreadMessage(Guid userId, Guid chatRoomId)
|
||||||
{
|
{
|
||||||
var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30));
|
var sender = await db.ChatMembers
|
||||||
var messages = await db.ChatMessages
|
.Where(m => m.AccountId == userId && m.ChatRoomId == chatRoomId)
|
||||||
.Where(m => m.ChatRoomId == chatRoomId)
|
.Select(m => new { m.LastReadAt })
|
||||||
.Where(m => m.CreatedAt < cutoff)
|
.FirstOrDefaultAsync();
|
||||||
.Select(m => new MessageStatusResponse
|
if (sender?.LastReadAt is null) return 0;
|
||||||
{
|
|
||||||
MessageId = m.Id,
|
|
||||||
IsRead = m.Statuses.Any(rs => rs.Sender.AccountId == userId)
|
|
||||||
})
|
|
||||||
.ToListAsync();
|
|
||||||
|
|
||||||
return messages.Count(m => !m.IsRead);
|
return await db.ChatMessages
|
||||||
|
.Where(m => m.ChatRoomId == chatRoomId)
|
||||||
|
.Where(m => m.CreatedAt > sender.LastReadAt)
|
||||||
|
.CountAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<Dictionary<Guid, int>> CountUnreadMessageForUser(Guid userId)
|
public async Task<Dictionary<Guid, int>> CountUnreadMessageForUser(Guid userId)
|
||||||
{
|
{
|
||||||
var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30));
|
var members = await db.ChatMembers
|
||||||
var userRooms = await db.ChatMembers
|
|
||||||
.Where(m => m.AccountId == userId)
|
.Where(m => m.AccountId == userId)
|
||||||
.Select(m => m.ChatRoomId)
|
.Select(m => new { m.ChatRoomId, m.LastReadAt })
|
||||||
.ToListAsync();
|
.ToListAsync();
|
||||||
|
|
||||||
var messages = await db.ChatMessages
|
var lastReadAt = members.ToDictionary(m => m.ChatRoomId, m => m.LastReadAt);
|
||||||
.Where(m => m.CreatedAt < cutoff)
|
var roomsId = lastReadAt.Keys.ToList();
|
||||||
.Where(m => userRooms.Contains(m.ChatRoomId))
|
|
||||||
.Select(m => new
|
return await db.ChatMessages
|
||||||
{
|
.Where(m => roomsId.Contains(m.ChatRoomId))
|
||||||
m.ChatRoomId,
|
|
||||||
IsRead = m.Statuses.Any(rs => rs.Sender.AccountId == userId)
|
|
||||||
})
|
|
||||||
.ToListAsync();
|
|
||||||
|
|
||||||
return messages
|
|
||||||
.GroupBy(m => m.ChatRoomId)
|
.GroupBy(m => m.ChatRoomId)
|
||||||
.ToDictionary(
|
.ToDictionaryAsync(
|
||||||
g => g.Key,
|
g => g.Key,
|
||||||
g => g.Count(m => !m.IsRead)
|
g => g.Count(m => lastReadAt[g.Key] == null || m.CreatedAt > lastReadAt[g.Key])
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ namespace DysonNetwork.Sphere.Chat;
|
|||||||
public class Message : ModelBase
|
public class Message : ModelBase
|
||||||
{
|
{
|
||||||
public Guid Id { get; set; } = Guid.NewGuid();
|
public Guid Id { get; set; } = Guid.NewGuid();
|
||||||
public string Type { get; set; } = null!;
|
[MaxLength(1024)] public string Type { get; set; } = null!;
|
||||||
[MaxLength(4096)] public string? Content { get; set; }
|
[MaxLength(4096)] public string? Content { get; set; }
|
||||||
[Column(TypeName = "jsonb")] public Dictionary<string, object>? Meta { get; set; }
|
[Column(TypeName = "jsonb")] public Dictionary<string, object>? Meta { get; set; }
|
||||||
[Column(TypeName = "jsonb")] public List<Guid>? MembersMentioned { get; set; }
|
[Column(TypeName = "jsonb")] public List<Guid>? MembersMentioned { get; set; }
|
||||||
@ -19,7 +19,6 @@ public class Message : ModelBase
|
|||||||
|
|
||||||
public ICollection<CloudFile> Attachments { get; set; } = new List<CloudFile>();
|
public ICollection<CloudFile> Attachments { get; set; } = new List<CloudFile>();
|
||||||
public ICollection<MessageReaction> Reactions { get; set; } = new List<MessageReaction>();
|
public ICollection<MessageReaction> Reactions { get; set; } = new List<MessageReaction>();
|
||||||
public ICollection<MessageReadReceipt> Statuses { get; set; } = new List<MessageReadReceipt>();
|
|
||||||
|
|
||||||
public Guid? RepliedMessageId { get; set; }
|
public Guid? RepliedMessageId { get; set; }
|
||||||
public Message? RepliedMessage { get; set; }
|
public Message? RepliedMessage { get; set; }
|
||||||
@ -43,7 +42,6 @@ public class Message : ModelBase
|
|||||||
EditedAt = EditedAt,
|
EditedAt = EditedAt,
|
||||||
Attachments = new List<CloudFile>(Attachments),
|
Attachments = new List<CloudFile>(Attachments),
|
||||||
Reactions = new List<MessageReaction>(Reactions),
|
Reactions = new List<MessageReaction>(Reactions),
|
||||||
Statuses = new List<MessageReadReceipt>(Statuses),
|
|
||||||
RepliedMessageId = RepliedMessageId,
|
RepliedMessageId = RepliedMessageId,
|
||||||
RepliedMessage = RepliedMessage?.Clone() as Message,
|
RepliedMessage = RepliedMessage?.Clone() as Message,
|
||||||
ForwardedMessageId = ForwardedMessageId,
|
ForwardedMessageId = ForwardedMessageId,
|
||||||
@ -78,19 +76,13 @@ public class MessageReaction : ModelBase
|
|||||||
public MessageReactionAttitude Attitude { get; set; }
|
public MessageReactionAttitude Attitude { get; set; }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If the status exists, means the user has read the message.
|
/// <summary>
|
||||||
[Index(nameof(MessageId), nameof(SenderId), IsUnique = true)]
|
/// The data model for updating the last read at field for chat member,
|
||||||
public class MessageReadReceipt : ModelBase
|
/// after the refactor of the unread system, this no longer stored in the database.
|
||||||
{
|
/// Not only used for the data transmission object
|
||||||
public Guid MessageId { get; set; }
|
/// </summary>
|
||||||
public Message Message { get; set; } = null!;
|
|
||||||
public Guid SenderId { get; set; }
|
|
||||||
public ChatMember Sender { get; set; } = null!;
|
|
||||||
}
|
|
||||||
|
|
||||||
[NotMapped]
|
[NotMapped]
|
||||||
public class MessageStatusResponse
|
public class MessageReadReceipt
|
||||||
{
|
{
|
||||||
public Guid MessageId { get; set; }
|
public Guid SenderId { get; set; }
|
||||||
public bool IsRead { get; set; }
|
|
||||||
}
|
}
|
@ -79,7 +79,6 @@ public class MessageReadHandler(
|
|||||||
|
|
||||||
var readReceipt = new MessageReadReceipt
|
var readReceipt = new MessageReadReceipt
|
||||||
{
|
{
|
||||||
MessageId = request.MessageId,
|
|
||||||
SenderId = sender.Id,
|
SenderId = sender.Id,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -210,14 +210,6 @@ builder.Services.AddQuartz(q =>
|
|||||||
.WithIntervalInSeconds(60)
|
.WithIntervalInSeconds(60)
|
||||||
.RepeatForever())
|
.RepeatForever())
|
||||||
);
|
);
|
||||||
|
|
||||||
var readReceiptRecyclingJob = new JobKey("ReadReceiptRecycling");
|
|
||||||
q.AddJob<ReadReceiptRecyclingJob>(opts => opts.WithIdentity(readReceiptRecyclingJob));
|
|
||||||
q.AddTrigger(opts => opts
|
|
||||||
.ForJob(readReceiptRecyclingJob)
|
|
||||||
.WithIdentity("ReadReceiptRecyclingTrigger")
|
|
||||||
.WithCronSchedule("0 0 0 * * ?")
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
builder.Services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);
|
builder.Services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);
|
||||||
|
|
||||||
|
@ -10,22 +10,17 @@ public class MessageReadReceiptFlushHandler(IServiceProvider serviceProvider) :
|
|||||||
{
|
{
|
||||||
public async Task FlushAsync(IReadOnlyList<MessageReadReceipt> items)
|
public async Task FlushAsync(IReadOnlyList<MessageReadReceipt> items)
|
||||||
{
|
{
|
||||||
var distinctItems = items
|
var now = SystemClock.Instance.GetCurrentInstant();
|
||||||
.DistinctBy(x => new { x.MessageId, x.SenderId })
|
var distinctId = items
|
||||||
.Select(x =>
|
.DistinctBy(x => x.SenderId)
|
||||||
{
|
.Select(x => x.SenderId)
|
||||||
x.CreatedAt = SystemClock.Instance.GetCurrentInstant();
|
|
||||||
x.UpdatedAt = x.CreatedAt;
|
|
||||||
return x;
|
|
||||||
})
|
|
||||||
.ToList();
|
.ToList();
|
||||||
|
|
||||||
using var scope = serviceProvider.CreateScope();
|
using var scope = serviceProvider.CreateScope();
|
||||||
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||||
await db.BulkInsertAsync(distinctItems, config => {
|
await db.ChatMembers.Where(r => distinctId.Contains(r.Id))
|
||||||
config.ConflictOption = ConflictOption.Ignore;
|
.ExecuteUpdateAsync(s => s.SetProperty(m => m.LastReadAt, now)
|
||||||
config.UpdateByProperties = [nameof(MessageReadReceipt.MessageId), nameof(MessageReadReceipt.SenderId)];
|
);
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,14 +31,3 @@ public class ReadReceiptFlushJob(FlushBufferService fbs, MessageReadReceiptFlush
|
|||||||
await fbs.FlushAsync(hdl);
|
await fbs.FlushAsync(hdl);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public class ReadReceiptRecyclingJob(AppDatabase db) : IJob
|
|
||||||
{
|
|
||||||
public async Task Execute(IJobExecutionContext context)
|
|
||||||
{
|
|
||||||
var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30));
|
|
||||||
await db.ChatReadReceipts
|
|
||||||
.Where(r => r.CreatedAt < cutoff)
|
|
||||||
.ExecuteDeleteAsync();
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
x
Reference in New Issue
Block a user