66 lines
1.5 KiB
C#
66 lines
1.5 KiB
C#
using System.Collections.Concurrent;
|
|
|
|
namespace DysonNetwork.Sphere.Storage;
|
|
|
|
public interface IFlushHandler<T>
|
|
{
|
|
Task FlushAsync(IReadOnlyList<T> items);
|
|
}
|
|
|
|
public class FlushBufferService
|
|
{
|
|
private readonly Dictionary<Type, object> _buffers = new();
|
|
private readonly Lock _lockObject = new();
|
|
|
|
private ConcurrentQueue<T> _GetOrCreateBuffer<T>()
|
|
{
|
|
var type = typeof(T);
|
|
lock (_lockObject)
|
|
{
|
|
if (!_buffers.TryGetValue(type, out var buffer))
|
|
{
|
|
buffer = new ConcurrentQueue<T>();
|
|
_buffers[type] = buffer;
|
|
}
|
|
return (ConcurrentQueue<T>)buffer;
|
|
}
|
|
}
|
|
|
|
public void Enqueue<T>(T item)
|
|
{
|
|
var buffer = _GetOrCreateBuffer<T>();
|
|
buffer.Enqueue(item);
|
|
}
|
|
|
|
public async Task FlushAsync<T>(IFlushHandler<T> handler)
|
|
{
|
|
var buffer = _GetOrCreateBuffer<T>();
|
|
var workingQueue = new List<T>();
|
|
|
|
while (buffer.TryDequeue(out var item))
|
|
{
|
|
workingQueue.Add(item);
|
|
}
|
|
|
|
if (workingQueue.Count == 0)
|
|
return;
|
|
|
|
try
|
|
{
|
|
await handler.FlushAsync(workingQueue);
|
|
}
|
|
catch (Exception)
|
|
{
|
|
// If flush fails, re-queue the items
|
|
foreach (var item in workingQueue)
|
|
buffer.Enqueue(item);
|
|
throw;
|
|
}
|
|
}
|
|
|
|
public int GetPendingCount<T>()
|
|
{
|
|
var buffer = _GetOrCreateBuffer<T>();
|
|
return buffer.Count;
|
|
}
|
|
} |