Compare commits
2 Commits
c597df3937
...
205ccd66b3
| Author | SHA1 | Date | |
|---|---|---|---|
| 205ccd66b3 | |||
| fdfdffa382 |
@@ -1,14 +1,12 @@
|
||||
using Quartz;
|
||||
using System.Collections.Concurrent;
|
||||
using DysonNetwork.Sphere.Connection;
|
||||
using Microsoft.AspNetCore.Http;
|
||||
using DysonNetwork.Sphere.Storage;
|
||||
using DysonNetwork.Sphere.Storage.Handlers;
|
||||
|
||||
namespace DysonNetwork.Sphere.Account;
|
||||
|
||||
public class ActionLogService(AppDatabase db, GeoIpService geo) : IDisposable
|
||||
public class ActionLogService(AppDatabase db, GeoIpService geo, FlushBufferService fbs)
|
||||
{
|
||||
private readonly ConcurrentQueue<ActionLog> _creationQueue = new();
|
||||
|
||||
public void CreateActionLog(Guid accountId, string action, Dictionary<string, object> meta)
|
||||
{
|
||||
var log = new ActionLog
|
||||
@@ -18,7 +16,7 @@ public class ActionLogService(AppDatabase db, GeoIpService geo) : IDisposable
|
||||
Meta = meta,
|
||||
};
|
||||
|
||||
_creationQueue.Enqueue(log);
|
||||
fbs.Enqueue(log);
|
||||
}
|
||||
|
||||
public void CreateActionLogFromRequest(string action, Dictionary<string, object> meta, HttpRequest request,
|
||||
@@ -43,42 +41,6 @@ public class ActionLogService(AppDatabase db, GeoIpService geo) : IDisposable
|
||||
if (request.HttpContext.Items["CurrentSession"] is Auth.Session currentSession)
|
||||
log.SessionId = currentSession.Id;
|
||||
|
||||
_creationQueue.Enqueue(log);
|
||||
}
|
||||
|
||||
public async Task FlushQueue()
|
||||
{
|
||||
var workingQueue = new List<ActionLog>();
|
||||
while (_creationQueue.TryDequeue(out var log))
|
||||
workingQueue.Add(log);
|
||||
|
||||
if (workingQueue.Count != 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
await db.ActionLogs.AddRangeAsync(workingQueue);
|
||||
await db.SaveChangesAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
foreach (var log in workingQueue)
|
||||
_creationQueue.Enqueue(log);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
FlushQueue().Wait();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
}
|
||||
|
||||
public class ActionLogFlushJob(ActionLogService als) : IJob
|
||||
{
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
await als.FlushQueue();
|
||||
fbs.Enqueue(log);
|
||||
}
|
||||
}
|
||||
@@ -63,7 +63,7 @@ public class AppDatabase(
|
||||
public DbSet<Chat.ChatMember> ChatMembers { get; set; }
|
||||
public DbSet<Chat.Message> ChatMessages { get; set; }
|
||||
public DbSet<Chat.RealtimeCall> ChatRealtimeCall { get; set; }
|
||||
public DbSet<Chat.MessageStatus> ChatStatuses { get; set; }
|
||||
public DbSet<Chat.MessageReadReceipt> ChatReadReceipts { get; set; }
|
||||
public DbSet<Chat.MessageReaction> ChatReactions { get; set; }
|
||||
|
||||
public DbSet<Sticker.Sticker> Stickers { get; set; }
|
||||
@@ -237,7 +237,7 @@ public class AppDatabase(
|
||||
.WithMany()
|
||||
.HasForeignKey(pm => pm.AccountId)
|
||||
.OnDelete(DeleteBehavior.Cascade);
|
||||
modelBuilder.Entity<Chat.MessageStatus>()
|
||||
modelBuilder.Entity<Chat.MessageReadReceipt>()
|
||||
.HasKey(e => new { e.MessageId, e.SenderId });
|
||||
modelBuilder.Entity<Chat.Message>()
|
||||
.HasOne(m => m.ForwardedMessage)
|
||||
|
||||
@@ -61,7 +61,7 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory)
|
||||
|
||||
public async Task MarkMessageAsReadAsync(Guid messageId, Guid roomId, Guid userId)
|
||||
{
|
||||
var existingStatus = await db.ChatStatuses
|
||||
var existingStatus = await db.ChatReadReceipts
|
||||
.FirstOrDefaultAsync(x => x.MessageId == messageId && x.Sender.AccountId == userId);
|
||||
var sender = await db.ChatMembers
|
||||
.Where(m => m.AccountId == userId && m.ChatRoomId == roomId)
|
||||
@@ -70,12 +70,12 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory)
|
||||
|
||||
if (existingStatus == null)
|
||||
{
|
||||
existingStatus = new MessageStatus
|
||||
existingStatus = new MessageReadReceipt
|
||||
{
|
||||
MessageId = messageId,
|
||||
SenderId = sender.Id,
|
||||
};
|
||||
db.ChatStatuses.Add(existingStatus);
|
||||
db.ChatReadReceipts.Add(existingStatus);
|
||||
}
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
@@ -83,14 +83,16 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory)
|
||||
|
||||
public async Task<bool> GetMessageReadStatus(Guid messageId, Guid userId)
|
||||
{
|
||||
return await db.ChatStatuses
|
||||
return await db.ChatReadReceipts
|
||||
.AnyAsync(x => x.MessageId == messageId && x.Sender.AccountId == userId);
|
||||
}
|
||||
|
||||
public async Task<int> CountUnreadMessage(Guid userId, Guid chatRoomId)
|
||||
{
|
||||
var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30));
|
||||
var messages = await db.ChatMessages
|
||||
.Where(m => m.ChatRoomId == chatRoomId)
|
||||
.Where(m => m.CreatedAt < cutoff)
|
||||
.Select(m => new MessageStatusResponse
|
||||
{
|
||||
MessageId = m.Id,
|
||||
@@ -103,12 +105,14 @@ public class ChatService(AppDatabase db, IServiceScopeFactory scopeFactory)
|
||||
|
||||
public async Task<Dictionary<Guid, int>> CountUnreadMessagesForJoinedRoomsAsync(Guid userId)
|
||||
{
|
||||
var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30));
|
||||
var userRooms = await db.ChatMembers
|
||||
.Where(m => m.AccountId == userId)
|
||||
.Select(m => m.ChatRoomId)
|
||||
.ToListAsync();
|
||||
|
||||
var messages = await db.ChatMessages
|
||||
.Where(m => m.CreatedAt < cutoff)
|
||||
.Where(m => userRooms.Contains(m.ChatRoomId))
|
||||
.Select(m => new
|
||||
{
|
||||
|
||||
@@ -19,7 +19,7 @@ public class Message : ModelBase
|
||||
|
||||
public ICollection<CloudFile> Attachments { get; set; } = new List<CloudFile>();
|
||||
public ICollection<MessageReaction> Reactions { get; set; } = new List<MessageReaction>();
|
||||
public ICollection<MessageStatus> Statuses { get; set; } = new List<MessageStatus>();
|
||||
public ICollection<MessageReadReceipt> Statuses { get; set; } = new List<MessageReadReceipt>();
|
||||
|
||||
public Guid? RepliedMessageId { get; set; }
|
||||
public Message? RepliedMessage { get; set; }
|
||||
@@ -43,7 +43,7 @@ public class Message : ModelBase
|
||||
EditedAt = EditedAt,
|
||||
Attachments = new List<CloudFile>(Attachments),
|
||||
Reactions = new List<MessageReaction>(Reactions),
|
||||
Statuses = new List<MessageStatus>(Statuses),
|
||||
Statuses = new List<MessageReadReceipt>(Statuses),
|
||||
RepliedMessageId = RepliedMessageId,
|
||||
RepliedMessage = RepliedMessage?.Clone() as Message,
|
||||
ForwardedMessageId = ForwardedMessageId,
|
||||
@@ -78,16 +78,14 @@ public class MessageReaction : ModelBase
|
||||
public MessageReactionAttitude Attitude { get; set; }
|
||||
}
|
||||
|
||||
/// If the status is exist, means the user has read the message.
|
||||
/// If the status exists, means the user has read the message.
|
||||
[Index(nameof(MessageId), nameof(SenderId), IsUnique = true)]
|
||||
public class MessageStatus : ModelBase
|
||||
public class MessageReadReceipt : ModelBase
|
||||
{
|
||||
public Guid MessageId { get; set; }
|
||||
public Message Message { get; set; } = null!;
|
||||
public Guid SenderId { get; set; }
|
||||
public ChatMember Sender { get; set; } = null!;
|
||||
|
||||
public Instant ReadAt { get; set; }
|
||||
}
|
||||
|
||||
[NotMapped]
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
using System.Net.WebSockets;
|
||||
using DysonNetwork.Sphere.Chat;
|
||||
using DysonNetwork.Sphere.Storage;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Caching.Memory;
|
||||
using Microsoft.Extensions.Internal;
|
||||
@@ -7,7 +8,13 @@ using SystemClock = NodaTime.SystemClock;
|
||||
|
||||
namespace DysonNetwork.Sphere.Connection.Handlers;
|
||||
|
||||
public class MessageReadHandler(AppDatabase db, IMemoryCache cache, ChatRoomService crs) : IWebSocketPacketHandler
|
||||
public class MessageReadHandler(
|
||||
AppDatabase db,
|
||||
IMemoryCache cache,
|
||||
ChatRoomService crs,
|
||||
FlushBufferService buffer
|
||||
)
|
||||
: IWebSocketPacketHandler
|
||||
{
|
||||
public string PacketType => "messages.read";
|
||||
|
||||
@@ -70,25 +77,16 @@ public class MessageReadHandler(AppDatabase db, IMemoryCache cache, ChatRoomServ
|
||||
return;
|
||||
}
|
||||
|
||||
db.ChatStatuses.Add(new MessageStatus
|
||||
var readReceipt = new MessageReadReceipt
|
||||
{
|
||||
MessageId = request.MessageId,
|
||||
SenderId = sender.Id,
|
||||
ReadAt = SystemClock.Instance.GetCurrentInstant(),
|
||||
});
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
// Broadcast read statuses
|
||||
var otherMembers = (await crs.ListRoomMembers(request.ChatRoomId)).Select(m => m.AccountId).ToList();
|
||||
foreach (var member in otherMembers)
|
||||
srv.SendPacketToAccount(member, packet);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// ignored
|
||||
}
|
||||
buffer.Enqueue(readReceipt);
|
||||
|
||||
var otherMembers = (await crs.ListRoomMembers(request.ChatRoomId)).Select(m => m.AccountId).ToList();
|
||||
foreach (var member in otherMembers)
|
||||
srv.SendPacketToAccount(member, packet);
|
||||
}
|
||||
}
|
||||
@@ -27,6 +27,8 @@
|
||||
<PackageReference Include="BCrypt.Net-Next" Version="4.0.3" />
|
||||
<PackageReference Include="Blurhash.ImageSharp" Version="4.0.0" />
|
||||
<PackageReference Include="CorePush" Version="4.3.0" />
|
||||
<PackageReference Include="EFCore.BulkExtensions" Version="9.0.1" />
|
||||
<PackageReference Include="EFCore.BulkExtensions.PostgreSql" Version="9.0.1" />
|
||||
<PackageReference Include="EFCore.NamingConventions" Version="9.0.0" />
|
||||
<PackageReference Include="FFMpegCore" Version="5.2.0" />
|
||||
<PackageReference Include="MailKit" Version="4.11.0" />
|
||||
|
||||
@@ -18,10 +18,10 @@ using DysonNetwork.Sphere.Publisher;
|
||||
using DysonNetwork.Sphere.Realm;
|
||||
using DysonNetwork.Sphere.Sticker;
|
||||
using DysonNetwork.Sphere.Storage;
|
||||
using DysonNetwork.Sphere.Storage.Handlers;
|
||||
using DysonNetwork.Sphere.Wallet;
|
||||
using Microsoft.AspNetCore.HttpOverrides;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.AspNetCore.Mvc.Razor;
|
||||
using Microsoft.AspNetCore.RateLimiting;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Options;
|
||||
@@ -135,6 +135,11 @@ var tusDiskStore = new TusDiskStore(
|
||||
);
|
||||
builder.Services.AddSingleton(tusDiskStore);
|
||||
|
||||
builder.Services.AddSingleton<FlushBufferService>();
|
||||
builder.Services.AddScoped<ActionLogFlushHandler>();
|
||||
builder.Services.AddScoped<MessageReadReceiptFlushHandler>();
|
||||
builder.Services.AddScoped<ActionLogService>();
|
||||
|
||||
// The handlers for websocket
|
||||
builder.Services.AddScoped<IWebSocketPacketHandler, MessageReadHandler>();
|
||||
builder.Services.AddScoped<IWebSocketPacketHandler, MessageTypingHandler>();
|
||||
@@ -194,6 +199,24 @@ builder.Services.AddQuartz(q =>
|
||||
.WithIntervalInMinutes(5)
|
||||
.RepeatForever())
|
||||
);
|
||||
|
||||
var readReceiptFlushJob = new JobKey("ReadReceiptFlush");
|
||||
q.AddJob<ReadReceiptFlushJob>(opts => opts.WithIdentity(readReceiptFlushJob));
|
||||
q.AddTrigger(opts => opts
|
||||
.ForJob(readReceiptFlushJob)
|
||||
.WithIdentity("ReadReceiptFlushTrigger")
|
||||
.WithSimpleSchedule(o => o
|
||||
.WithIntervalInSeconds(60)
|
||||
.RepeatForever())
|
||||
);
|
||||
|
||||
var readReceiptRecyclingJob = new JobKey("ReadReceiptRecycling");
|
||||
q.AddJob<ReadReceiptRecyclingJob>(opts => opts.WithIdentity(readReceiptRecyclingJob));
|
||||
q.AddTrigger(opts => opts
|
||||
.ForJob(readReceiptRecyclingJob)
|
||||
.WithIdentity("ReadReceiptRecyclingTrigger")
|
||||
.WithCronSchedule("0 0 0 * * ?")
|
||||
);
|
||||
});
|
||||
builder.Services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);
|
||||
|
||||
|
||||
66
DysonNetwork.Sphere/Storage/FlushBufferService.cs
Normal file
66
DysonNetwork.Sphere/Storage/FlushBufferService.cs
Normal file
@@ -0,0 +1,66 @@
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace DysonNetwork.Sphere.Storage;
|
||||
|
||||
public interface IFlushHandler<T>
|
||||
{
|
||||
Task FlushAsync(IReadOnlyList<T> items);
|
||||
}
|
||||
|
||||
public class FlushBufferService
|
||||
{
|
||||
private readonly Dictionary<Type, object> _buffers = new();
|
||||
private readonly Lock _lockObject = new();
|
||||
|
||||
private ConcurrentQueue<T> _GetOrCreateBuffer<T>()
|
||||
{
|
||||
var type = typeof(T);
|
||||
lock (_lockObject)
|
||||
{
|
||||
if (!_buffers.TryGetValue(type, out var buffer))
|
||||
{
|
||||
buffer = new ConcurrentQueue<T>();
|
||||
_buffers[type] = buffer;
|
||||
}
|
||||
return (ConcurrentQueue<T>)buffer;
|
||||
}
|
||||
}
|
||||
|
||||
public void Enqueue<T>(T item)
|
||||
{
|
||||
var buffer = _GetOrCreateBuffer<T>();
|
||||
buffer.Enqueue(item);
|
||||
}
|
||||
|
||||
public async Task FlushAsync<T>(IFlushHandler<T> handler)
|
||||
{
|
||||
var buffer = _GetOrCreateBuffer<T>();
|
||||
var workingQueue = new List<T>();
|
||||
|
||||
while (buffer.TryDequeue(out var item))
|
||||
{
|
||||
workingQueue.Add(item);
|
||||
}
|
||||
|
||||
if (workingQueue.Count == 0)
|
||||
return;
|
||||
|
||||
try
|
||||
{
|
||||
await handler.FlushAsync(workingQueue);
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// If flush fails, re-queue the items
|
||||
foreach (var item in workingQueue)
|
||||
buffer.Enqueue(item);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
public int GetPendingCount<T>()
|
||||
{
|
||||
var buffer = _GetOrCreateBuffer<T>();
|
||||
return buffer.Count;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
using DysonNetwork.Sphere.Account;
|
||||
using EFCore.BulkExtensions;
|
||||
using Quartz;
|
||||
|
||||
namespace DysonNetwork.Sphere.Storage.Handlers;
|
||||
|
||||
public class ActionLogFlushHandler(IServiceProvider serviceProvider) : IFlushHandler<ActionLog>
|
||||
{
|
||||
public async Task FlushAsync(IReadOnlyList<ActionLog> items)
|
||||
{
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||
|
||||
await db.BulkInsertAsync(items);
|
||||
}
|
||||
}
|
||||
|
||||
public class ActionLogFlushJob(FlushBufferService fbs, ActionLogFlushHandler hdl) : IJob
|
||||
{
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
await fbs.FlushAsync(hdl);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
using DysonNetwork.Sphere.Chat;
|
||||
using EFCore.BulkExtensions;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using NodaTime;
|
||||
using Quartz;
|
||||
|
||||
namespace DysonNetwork.Sphere.Storage.Handlers;
|
||||
|
||||
public class MessageReadReceiptFlushHandler(IServiceProvider serviceProvider) : IFlushHandler<MessageReadReceipt>
|
||||
{
|
||||
public async Task FlushAsync(IReadOnlyList<MessageReadReceipt> items)
|
||||
{
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||
|
||||
await db.BulkInsertAsync(items);
|
||||
}
|
||||
}
|
||||
|
||||
public class ReadReceiptFlushJob(FlushBufferService fbs, MessageReadReceiptFlushHandler hdl) : IJob
|
||||
{
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
await fbs.FlushAsync(hdl);
|
||||
}
|
||||
}
|
||||
|
||||
public class ReadReceiptRecyclingJob(AppDatabase db) : IJob
|
||||
{
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
var cutoff = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(30));
|
||||
await db.ChatReadReceipts
|
||||
.Where(r => r.CreatedAt < cutoff)
|
||||
.ExecuteDeleteAsync();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user