Optimize action log flushing

This commit is contained in:
LittleSheep 2025-05-18 12:00:05 +08:00
parent c597df3937
commit fdfdffa382
5 changed files with 95 additions and 37 deletions

View File

@ -1,14 +1,12 @@
using Quartz; using Quartz;
using System.Collections.Concurrent;
using DysonNetwork.Sphere.Connection; using DysonNetwork.Sphere.Connection;
using Microsoft.AspNetCore.Http; using DysonNetwork.Sphere.Storage;
using DysonNetwork.Sphere.Storage.Handlers;
namespace DysonNetwork.Sphere.Account; namespace DysonNetwork.Sphere.Account;
public class ActionLogService(AppDatabase db, GeoIpService geo) : IDisposable public class ActionLogService(AppDatabase db, GeoIpService geo, FlushBufferService fbs)
{ {
private readonly ConcurrentQueue<ActionLog> _creationQueue = new();
public void CreateActionLog(Guid accountId, string action, Dictionary<string, object> meta) public void CreateActionLog(Guid accountId, string action, Dictionary<string, object> meta)
{ {
var log = new ActionLog var log = new ActionLog
@ -18,7 +16,7 @@ public class ActionLogService(AppDatabase db, GeoIpService geo) : IDisposable
Meta = meta, Meta = meta,
}; };
_creationQueue.Enqueue(log); fbs.Enqueue(log);
} }
public void CreateActionLogFromRequest(string action, Dictionary<string, object> meta, HttpRequest request, public void CreateActionLogFromRequest(string action, Dictionary<string, object> meta, HttpRequest request,
@ -43,42 +41,14 @@ public class ActionLogService(AppDatabase db, GeoIpService geo) : IDisposable
if (request.HttpContext.Items["CurrentSession"] is Auth.Session currentSession) if (request.HttpContext.Items["CurrentSession"] is Auth.Session currentSession)
log.SessionId = currentSession.Id; log.SessionId = currentSession.Id;
_creationQueue.Enqueue(log); fbs.Enqueue(log);
}
public async Task FlushQueue()
{
var workingQueue = new List<ActionLog>();
while (_creationQueue.TryDequeue(out var log))
workingQueue.Add(log);
if (workingQueue.Count != 0)
{
try
{
await db.ActionLogs.AddRangeAsync(workingQueue);
await db.SaveChangesAsync();
}
catch (Exception ex)
{
foreach (var log in workingQueue)
_creationQueue.Enqueue(log);
throw;
}
}
}
public void Dispose()
{
FlushQueue().Wait();
GC.SuppressFinalize(this);
} }
} }
public class ActionLogFlushJob(ActionLogService als) : IJob public class ActionLogFlushJob(FlushBufferService fbs, ActionLogFlushHandler hdl) : IJob
{ {
public async Task Execute(IJobExecutionContext context) public async Task Execute(IJobExecutionContext context)
{ {
await als.FlushQueue(); await fbs.FlushAsync(hdl);
} }
} }

View File

@ -27,6 +27,8 @@
<PackageReference Include="BCrypt.Net-Next" Version="4.0.3" /> <PackageReference Include="BCrypt.Net-Next" Version="4.0.3" />
<PackageReference Include="Blurhash.ImageSharp" Version="4.0.0" /> <PackageReference Include="Blurhash.ImageSharp" Version="4.0.0" />
<PackageReference Include="CorePush" Version="4.3.0" /> <PackageReference Include="CorePush" Version="4.3.0" />
<PackageReference Include="EFCore.BulkExtensions" Version="9.0.1" />
<PackageReference Include="EFCore.BulkExtensions.PostgreSql" Version="9.0.1" />
<PackageReference Include="EFCore.NamingConventions" Version="9.0.0" /> <PackageReference Include="EFCore.NamingConventions" Version="9.0.0" />
<PackageReference Include="FFMpegCore" Version="5.2.0" /> <PackageReference Include="FFMpegCore" Version="5.2.0" />
<PackageReference Include="MailKit" Version="4.11.0" /> <PackageReference Include="MailKit" Version="4.11.0" />

View File

@ -18,6 +18,7 @@ using DysonNetwork.Sphere.Publisher;
using DysonNetwork.Sphere.Realm; using DysonNetwork.Sphere.Realm;
using DysonNetwork.Sphere.Sticker; using DysonNetwork.Sphere.Sticker;
using DysonNetwork.Sphere.Storage; using DysonNetwork.Sphere.Storage;
using DysonNetwork.Sphere.Storage.Handlers;
using DysonNetwork.Sphere.Wallet; using DysonNetwork.Sphere.Wallet;
using Microsoft.AspNetCore.HttpOverrides; using Microsoft.AspNetCore.HttpOverrides;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
@ -135,6 +136,10 @@ var tusDiskStore = new TusDiskStore(
); );
builder.Services.AddSingleton(tusDiskStore); builder.Services.AddSingleton(tusDiskStore);
builder.Services.AddSingleton<FlushBufferService>();
builder.Services.AddScoped<ActionLogFlushHandler>();
builder.Services.AddScoped<ActionLogService>();
// The handlers for websocket // The handlers for websocket
builder.Services.AddScoped<IWebSocketPacketHandler, MessageReadHandler>(); builder.Services.AddScoped<IWebSocketPacketHandler, MessageReadHandler>();
builder.Services.AddScoped<IWebSocketPacketHandler, MessageTypingHandler>(); builder.Services.AddScoped<IWebSocketPacketHandler, MessageTypingHandler>();

View File

@ -0,0 +1,66 @@
using System.Collections.Concurrent;
namespace DysonNetwork.Sphere.Storage;
public interface IFlushHandler<T>
{
Task FlushAsync(IReadOnlyList<T> items);
}
public class FlushBufferService
{
private readonly Dictionary<Type, object> _buffers = new();
private readonly Lock _lockObject = new();
private ConcurrentQueue<T> _GetOrCreateBuffer<T>()
{
var type = typeof(T);
lock (_lockObject)
{
if (!_buffers.TryGetValue(type, out var buffer))
{
buffer = new ConcurrentQueue<T>();
_buffers[type] = buffer;
}
return (ConcurrentQueue<T>)buffer;
}
}
public void Enqueue<T>(T item)
{
var buffer = _GetOrCreateBuffer<T>();
buffer.Enqueue(item);
}
public async Task FlushAsync<T>(IFlushHandler<T> handler)
{
var buffer = _GetOrCreateBuffer<T>();
var workingQueue = new List<T>();
while (buffer.TryDequeue(out var item))
{
workingQueue.Add(item);
}
if (workingQueue.Count == 0)
return;
try
{
await handler.FlushAsync(workingQueue);
}
catch (Exception)
{
// If flush fails, re-queue the items
foreach (var item in workingQueue)
buffer.Enqueue(item);
throw;
}
}
public int GetPendingCount<T>()
{
var buffer = _GetOrCreateBuffer<T>();
return buffer.Count;
}
}

View File

@ -0,0 +1,15 @@
using DysonNetwork.Sphere.Account;
using EFCore.BulkExtensions;
namespace DysonNetwork.Sphere.Storage.Handlers;
public class ActionLogFlushHandler(IServiceProvider serviceProvider) : IFlushHandler<ActionLog>
{
public async Task FlushAsync(IReadOnlyList<ActionLog> items)
{
using var scope = serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
await db.BulkInsertAsync(items);
}
}