66 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
			
		
		
	
	
			66 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			C#
		
	
	
	
	
	
| using System.Collections.Concurrent;
 | |
| using Microsoft.Extensions.Logging;
 | |
| 
 | |
| namespace DysonNetwork.Shared.Cache;
 | |
| 
 | |
| public interface IFlushHandler<T>
 | |
| {
 | |
|     Task FlushAsync(IReadOnlyList<T> items);
 | |
| }
 | |
| 
 | |
| public class FlushBufferService(ILogger<FlushBufferService> logger)
 | |
| {
 | |
|     private readonly Dictionary<Type, object> _buffers = new();
 | |
|     private readonly Lock _lockObject = new();
 | |
| 
 | |
|     private ConcurrentQueue<T> _GetOrCreateBuffer<T>()
 | |
|     {
 | |
|         var type = typeof(T);
 | |
|         lock (_lockObject)
 | |
|         {
 | |
|             if (_buffers.TryGetValue(type, out var buffer)) return (ConcurrentQueue<T>)buffer;
 | |
|             buffer = new ConcurrentQueue<T>();
 | |
|             _buffers[type] = buffer;
 | |
|             return (ConcurrentQueue<T>)buffer;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     public void Enqueue<T>(T item)
 | |
|     {
 | |
|         var buffer = _GetOrCreateBuffer<T>();
 | |
|         buffer.Enqueue(item);
 | |
|     }
 | |
| 
 | |
|     public async Task FlushAsync<T>(IFlushHandler<T> handler)
 | |
|     {
 | |
|         var buffer = _GetOrCreateBuffer<T>();
 | |
|         var workingQueue = new List<T>();
 | |
| 
 | |
|         while (buffer.TryDequeue(out var item))
 | |
|         {
 | |
|             workingQueue.Add(item);
 | |
|         }
 | |
| 
 | |
|         if (workingQueue.Count == 0)
 | |
|             return;
 | |
| 
 | |
|         try
 | |
|         {
 | |
|             await handler.FlushAsync(workingQueue);
 | |
|         }
 | |
|         catch (Exception ex)
 | |
|         {
 | |
|             logger.LogError(ex, "Error flushing {Count} items {ItemType}", workingQueue.Count, typeof(T));
 | |
|             // If flush fails, re-queue the items
 | |
|             foreach (var item in workingQueue)
 | |
|                 buffer.Enqueue(item);
 | |
|             throw;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     public int GetPendingCount<T>()
 | |
|     {
 | |
|         var buffer = _GetOrCreateBuffer<T>();
 | |
|         return buffer.Count;
 | |
|     }
 | |
| } |