105 lines
3.0 KiB
C#
105 lines
3.0 KiB
C#
using System.Collections.Concurrent;
|
|
using DysonNetwork.Pass.Features.Account;
|
|
using DysonNetwork.Common.Models;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using NodaTime;
|
|
|
|
namespace DysonNetwork.Pass.Storage.Handlers;
|
|
|
|
public class FlushBufferService
|
|
{
|
|
private readonly ConcurrentQueue<object> _buffer = new();
|
|
private readonly IServiceScopeFactory _scopeFactory;
|
|
private readonly ILogger<FlushBufferService> _logger;
|
|
private Timer? _timer;
|
|
|
|
public FlushBufferService(IServiceScopeFactory scopeFactory, ILogger<FlushBufferService> logger)
|
|
{
|
|
_scopeFactory = scopeFactory;
|
|
_logger = logger;
|
|
_timer = new Timer(FlushBuffer, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));
|
|
}
|
|
|
|
public void Enqueue(object item)
|
|
{
|
|
_buffer.Enqueue(item);
|
|
}
|
|
|
|
private async void FlushBuffer(object? state)
|
|
{
|
|
if (_buffer.IsEmpty) return;
|
|
|
|
using var scope = _scopeFactory.CreateScope();
|
|
var db = scope.ServiceProvider.GetRequiredService<Data.PassDatabase>();
|
|
|
|
var itemsToProcess = new List<object>();
|
|
while (_buffer.TryDequeue(out var item))
|
|
{
|
|
itemsToProcess.Add(item);
|
|
}
|
|
|
|
if (itemsToProcess.Count == 0) return;
|
|
|
|
_logger.LogInformation("Flushing {Count} items from buffer.", itemsToProcess.Count);
|
|
|
|
foreach (var item in itemsToProcess)
|
|
{
|
|
switch (item)
|
|
{
|
|
case LastActiveInfo lastActiveInfo:
|
|
await HandleLastActiveInfo(db, lastActiveInfo);
|
|
break;
|
|
case ActionLog actionLog:
|
|
await HandleActionLog(db, actionLog);
|
|
break;
|
|
default:
|
|
_logger.LogWarning("Unknown item type in buffer: {Type}", item.GetType().Name);
|
|
break;
|
|
}
|
|
}
|
|
|
|
try
|
|
{
|
|
await db.SaveChangesAsync();
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex, "Error saving changes during buffer flush.");
|
|
}
|
|
}
|
|
|
|
private async Task HandleLastActiveInfo(Data.PassDatabase db, LastActiveInfo info)
|
|
{
|
|
var profile = await db.AccountProfiles.FirstOrDefaultAsync(p => p.AccountId == info.Account.Id);
|
|
if (profile != null)
|
|
{
|
|
profile.LastSeenAt = info.SeenAt;
|
|
db.AccountProfiles.Update(profile);
|
|
}
|
|
|
|
var session = await db.AuthSessions.FirstOrDefaultAsync(s => s.Id == info.Session.Id);
|
|
if (session != null)
|
|
{
|
|
session.LastGrantedAt = info.SeenAt;
|
|
db.AuthSessions.Update(session);
|
|
}
|
|
}
|
|
|
|
private async Task HandleActionLog(Data.PassDatabase db, ActionLog log)
|
|
{
|
|
db.ActionLogs.Add(log);
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
_timer?.Dispose();
|
|
_timer = null;
|
|
}
|
|
}
|
|
|
|
public class LastActiveInfo
|
|
{
|
|
public Account Account { get; set; } = null!;
|
|
public AuthSession Session { get; set; } = null!;
|
|
public Instant SeenAt { get; set; }
|
|
} |