Files
.github
.idx
DysonNetwork.Sphere
Account
Activity
Auth
Chat
Connection
Developer
Email
Localization
Migrations
Pages
Permission
Post
Properties
Publisher
Realm
Resources
Sticker
Storage
Handlers
CacheService.cs
CloudFile.cs
CloudFileUnusedRecyclingJob.cs
FileController.cs
FileExpirationJob.cs
FileReferenceService.cs
FileService.ReferenceMigration.cs
FileService.cs
FlushBufferService.cs
ICloudFile.cs
TextSanitizer.cs
TusService.cs
Wallet
wwwroot
.DS_Store
.gitignore
AppDatabase.cs
Dockerfile
DysonNetwork.Sphere.csproj
DysonNetwork.Sphere.csproj.DotSettings.user
DysonNetwork.Sphere.http
Program.cs
appsettings.json
package.json
postcss.config.js
tailwind.config.js
.dockerignore
.gitignore
DysonNetwork.sln
DysonNetwork.sln.DotSettings.user
compose.yaml
Swarm/DysonNetwork.Sphere/Storage/FlushBufferService.cs
2025-05-18 12:00:05 +08:00

66 lines
1.5 KiB
C#

using System.Collections.Concurrent;
namespace DysonNetwork.Sphere.Storage;
/// <summary>
/// Callback contract used by <see cref="FlushBufferService"/> to process a batch of
/// buffered items of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Type of the buffered items this handler processes.</typeparam>
public interface IFlushHandler<T>
{
/// <summary>
/// Processes one batch of items that were drained from the buffer.
/// </summary>
/// <param name="items">The drained items, in the order they were dequeued.</param>
/// <returns>A task that completes when the batch has been processed.</returns>
/// <remarks>
/// When this method throws, <see cref="FlushBufferService.FlushAsync{T}"/> puts the whole
/// batch back into the buffer and rethrows the exception.
/// </remarks>
Task FlushAsync(IReadOnlyList<T> items);
}
/// <summary>
/// In-memory staging area that keeps one queue of pending items per item type and
/// hands the queued items to an <see cref="IFlushHandler{T}"/> in batches.
/// </summary>
/// <remarks>
/// Buffers live in a <see cref="ConcurrentDictionary{TKey,TValue}"/> of
/// <see cref="ConcurrentQueue{T}"/> instances, so enqueueing and flushing may be called
/// from multiple threads without additional locking.
/// </remarks>
public class FlushBufferService
{
    // One ConcurrentQueue<T> per item type. The value is typed as object because the
    // queue's element type differs from entry to entry; it is cast back on retrieval.
    private readonly ConcurrentDictionary<Type, object> _buffers = new();

    /// <summary>
    /// Appends <paramref name="item"/> to the pending queue for <typeparamref name="T"/>,
    /// creating that queue on first use.
    /// </summary>
    /// <typeparam name="T">Item type; selects which buffer receives the item.</typeparam>
    /// <param name="item">The item to buffer.</param>
    public void Enqueue<T>(T item)
    {
        GetOrCreateBuffer<T>().Enqueue(item);
    }

    /// <summary>
    /// Drains every item currently queued for <typeparamref name="T"/> and passes them to
    /// <paramref name="handler"/> as a single batch. Does nothing when the queue is empty.
    /// </summary>
    /// <typeparam name="T">Item type; selects which buffer is flushed.</typeparam>
    /// <param name="handler">Handler that receives the drained batch.</param>
    /// <returns>A task that completes when the handler has finished.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
    /// <remarks>
    /// If the handler throws, the whole batch is put back into the queue and the exception
    /// is rethrown. Re-queued items are appended at the tail, so they end up behind anything
    /// enqueued while the flush was running, and a handler that had already processed part
    /// of the batch before failing will receive those items again on the next flush.
    /// </remarks>
    public async Task FlushAsync<T>(IFlushHandler<T> handler)
    {
        // Validate before draining so a null handler never disturbs the queued items.
        ArgumentNullException.ThrowIfNull(handler);

        var buffer = GetOrCreateBuffer<T>();

        var batch = new List<T>();
        while (buffer.TryDequeue(out var item))
        {
            batch.Add(item);
        }

        if (batch.Count == 0)
        {
            return;
        }

        try
        {
            await handler.FlushAsync(batch);
        }
        catch (Exception)
        {
            // The flush failed: return the items to the buffer so a later flush can retry.
            foreach (var item in batch)
            {
                buffer.Enqueue(item);
            }

            throw;
        }
    }

    /// <summary>
    /// Returns the number of items currently queued for <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Item type; selects which buffer is inspected.</typeparam>
    /// <returns>The pending item count, or 0 when nothing of this type was ever enqueued.</returns>
    public int GetPendingCount<T>()
    {
        // Read-only lookup: asking for a count must not allocate a queue for an unused type.
        return _buffers.TryGetValue(typeof(T), out var buffer)
            ? ((ConcurrentQueue<T>)buffer).Count
            : 0;
    }

    // Returns the queue for T, creating and registering it on first use. GetOrAdd always
    // returns the instance that ended up stored, so every caller shares the same queue.
    private ConcurrentQueue<T> GetOrCreateBuffer<T>()
    {
        return (ConcurrentQueue<T>)_buffers.GetOrAdd(typeof(T), static _ => new ConcurrentQueue<T>());
    }
}