From fdfdffa3827be5ee3bfa7f2962487742872be1e2 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Sun, 18 May 2025 12:00:05 +0800 Subject: [PATCH] :zap: Optimize action log flushing --- .../Account/ActionLogService.cs | 44 ++----------- .../DysonNetwork.Sphere.csproj | 2 + DysonNetwork.Sphere/Program.cs | 5 ++ .../Storage/FlushBufferService.cs | 66 +++++++++++++++++++ .../Storage/Handlers/ActionLogFlushHandler.cs | 15 +++++ 5 files changed, 95 insertions(+), 37 deletions(-) create mode 100644 DysonNetwork.Sphere/Storage/FlushBufferService.cs create mode 100644 DysonNetwork.Sphere/Storage/Handlers/ActionLogFlushHandler.cs diff --git a/DysonNetwork.Sphere/Account/ActionLogService.cs b/DysonNetwork.Sphere/Account/ActionLogService.cs index 1f47b41..cc39c68 100644 --- a/DysonNetwork.Sphere/Account/ActionLogService.cs +++ b/DysonNetwork.Sphere/Account/ActionLogService.cs @@ -1,14 +1,12 @@ using Quartz; -using System.Collections.Concurrent; using DysonNetwork.Sphere.Connection; -using Microsoft.AspNetCore.Http; +using DysonNetwork.Sphere.Storage; +using DysonNetwork.Sphere.Storage.Handlers; namespace DysonNetwork.Sphere.Account; -public class ActionLogService(AppDatabase db, GeoIpService geo) : IDisposable +public class ActionLogService(AppDatabase db, GeoIpService geo, FlushBufferService fbs) { - private readonly ConcurrentQueue _creationQueue = new(); - public void CreateActionLog(Guid accountId, string action, Dictionary meta) { var log = new ActionLog @@ -18,7 +16,7 @@ public class ActionLogService(AppDatabase db, GeoIpService geo) : IDisposable Meta = meta, }; - _creationQueue.Enqueue(log); + fbs.Enqueue(log); } public void CreateActionLogFromRequest(string action, Dictionary meta, HttpRequest request, @@ -43,42 +41,14 @@ public class ActionLogService(AppDatabase db, GeoIpService geo) : IDisposable if (request.HttpContext.Items["CurrentSession"] is Auth.Session currentSession) log.SessionId = currentSession.Id; - _creationQueue.Enqueue(log); - } - - public async Task FlushQueue() - { - var workingQueue = new List(); - while (_creationQueue.TryDequeue(out var log)) - workingQueue.Add(log); - - if (workingQueue.Count != 0) - { - try - { - await db.ActionLogs.AddRangeAsync(workingQueue); - await db.SaveChangesAsync(); - } - catch (Exception ex) - { - foreach (var log in workingQueue) - _creationQueue.Enqueue(log); - throw; - } - } - } - - public void Dispose() - { - FlushQueue().Wait(); - GC.SuppressFinalize(this); + fbs.Enqueue(log); } } -public class ActionLogFlushJob(ActionLogService als) : IJob +public class ActionLogFlushJob(FlushBufferService fbs, ActionLogFlushHandler hdl) : IJob { public async Task Execute(IJobExecutionContext context) { - await als.FlushQueue(); + await fbs.FlushAsync(hdl); } } \ No newline at end of file diff --git a/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj b/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj index 99c938f..670e461 100644 --- a/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj +++ b/DysonNetwork.Sphere/DysonNetwork.Sphere.csproj @@ -27,6 +27,8 @@ + + diff --git a/DysonNetwork.Sphere/Program.cs b/DysonNetwork.Sphere/Program.cs index d961ac6..92577d7 100644 --- a/DysonNetwork.Sphere/Program.cs +++ b/DysonNetwork.Sphere/Program.cs @@ -18,6 +18,7 @@ using DysonNetwork.Sphere.Publisher; using DysonNetwork.Sphere.Realm; using DysonNetwork.Sphere.Sticker; using DysonNetwork.Sphere.Storage; +using DysonNetwork.Sphere.Storage.Handlers; using DysonNetwork.Sphere.Wallet; using Microsoft.AspNetCore.HttpOverrides; using Microsoft.AspNetCore.Mvc; @@ -135,6 +136,10 @@ var tusDiskStore = new TusDiskStore( ); builder.Services.AddSingleton(tusDiskStore); +builder.Services.AddSingleton(); +builder.Services.AddScoped(); +builder.Services.AddScoped(); + // The handlers for websocket builder.Services.AddScoped(); builder.Services.AddScoped(); diff --git a/DysonNetwork.Sphere/Storage/FlushBufferService.cs b/DysonNetwork.Sphere/Storage/FlushBufferService.cs new file mode 100644 index 0000000..43dd6d8 --- /dev/null +++ b/DysonNetwork.Sphere/Storage/FlushBufferService.cs @@ -0,0 +1,66 @@ +using System.Collections.Concurrent; + +namespace DysonNetwork.Sphere.Storage; + +public interface IFlushHandler +{ + Task FlushAsync(IReadOnlyList items); +} + +public class FlushBufferService +{ + private readonly Dictionary _buffers = new(); + private readonly Lock _lockObject = new(); + + private ConcurrentQueue _GetOrCreateBuffer() + { + var type = typeof(T); + lock (_lockObject) + { + if (!_buffers.TryGetValue(type, out var buffer)) + { + buffer = new ConcurrentQueue(); + _buffers[type] = buffer; + } + return (ConcurrentQueue)buffer; + } + } + + public void Enqueue(T item) + { + var buffer = _GetOrCreateBuffer(); + buffer.Enqueue(item); + } + + public async Task FlushAsync(IFlushHandler handler) + { + var buffer = _GetOrCreateBuffer(); + var workingQueue = new List(); + + while (buffer.TryDequeue(out var item)) + { + workingQueue.Add(item); + } + + if (workingQueue.Count == 0) + return; + + try + { + await handler.FlushAsync(workingQueue); + } + catch (Exception) + { + // If flush fails, re-queue the items + foreach (var item in workingQueue) + buffer.Enqueue(item); + throw; + } + } + + public int GetPendingCount() + { + var buffer = _GetOrCreateBuffer(); + return buffer.Count; + } +} \ No newline at end of file diff --git a/DysonNetwork.Sphere/Storage/Handlers/ActionLogFlushHandler.cs b/DysonNetwork.Sphere/Storage/Handlers/ActionLogFlushHandler.cs new file mode 100644 index 0000000..3b66278 --- /dev/null +++ b/DysonNetwork.Sphere/Storage/Handlers/ActionLogFlushHandler.cs @@ -0,0 +1,15 @@ +using DysonNetwork.Sphere.Account; +using EFCore.BulkExtensions; + +namespace DysonNetwork.Sphere.Storage.Handlers; + +public class ActionLogFlushHandler(IServiceProvider serviceProvider) : IFlushHandler +{ + public async Task FlushAsync(IReadOnlyList items) + { + using var scope = serviceProvider.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + + await db.BulkInsertAsync(items); + } +} \ No newline at end of file