Optimize push notification saving by introducing the flush buffer

This commit is contained in:
2025-08-14 15:31:33 +08:00
parent b037ecad79
commit b04b17c8ae
6 changed files with 52 additions and 26 deletions

View File

@@ -12,7 +12,7 @@ public class ActionLogFlushHandler(IServiceProvider serviceProvider) : IFlushHan
{
using var scope = serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
await db.BulkInsertAsync(items.Select(x =>
{
x.CreatedAt = SystemClock.Instance.GetCurrentInstant();
@@ -28,4 +28,4 @@ public class ActionLogFlushJob(FlushBufferService fbs, ActionLogFlushHandler hdl
{
await fbs.FlushAsync(hdl);
}
}
}

View File

@@ -0,0 +1,28 @@
using DysonNetwork.Shared.Cache;
using EFCore.BulkExtensions;
using NodaTime;
using Quartz;
namespace DysonNetwork.Pusher.Notification;
public class NotificationFlushHandler(AppDatabase db, Logger<NotificationFlushHandler> logger) : IFlushHandler<Notification>
{
public async Task FlushAsync(IReadOnlyList<Notification> items)
{
await db.BulkInsertAsync(items.Select(x =>
{
x.CreatedAt = SystemClock.Instance.GetCurrentInstant();
x.UpdatedAt = x.CreatedAt;
return x;
}), config => config.ConflictOption = ConflictOption.Ignore);
logger.LogInformation("Stored {Count} notifications", items.Count);
}
}
public class NotificationFlushJob(FlushBufferService fbs, NotificationFlushHandler hdl) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
await fbs.FlushAsync(hdl);
}
}

View File

@@ -1,6 +1,7 @@
using CorePush.Apple;
using CorePush.Firebase;
using DysonNetwork.Pusher.Connection;
using DysonNetwork.Shared.Cache;
using DysonNetwork.Shared.Proto;
using EFCore.BulkExtensions;
using Microsoft.EntityFrameworkCore;
@@ -11,6 +12,7 @@ namespace DysonNetwork.Pusher.Notification;
public class PushService
{
private readonly AppDatabase _db;
private readonly FlushBufferService _fbs;
private readonly WebSocketService _ws;
private readonly ILogger<PushService> _logger;
private readonly FirebaseSender? _fcm;
@@ -20,6 +22,7 @@ public class PushService
public PushService(
IConfiguration config,
AppDatabase db,
FlushBufferService fbs,
WebSocketService ws,
IHttpClientFactory httpFactory,
ILogger<PushService> logger
@@ -50,6 +53,7 @@ public class PushService
}
_db = db;
_fbs = fbs;
_ws = ws;
_logger = logger;
}
@@ -112,11 +116,12 @@ public class PushService
string? title = null,
string? subtitle = null,
string? content = null,
Dictionary<string, object?> meta = null,
Dictionary<string, object?>? meta = null,
string? actionUri = null,
bool isSilent = false,
bool save = true)
{
meta ??= [];
if (title is null && subtitle is null && content is null)
throw new ArgumentException("Unable to send notification that completely empty.");
@@ -134,10 +139,7 @@ public class PushService
};
if (save)
{
_db.Add(notification);
await _db.SaveChangesAsync();
}
_fbs.Enqueue(notification);
if (!isSilent) _ = DeliveryNotification(notification);
}
@@ -174,8 +176,7 @@ public class PushService
public async Task SendNotificationBatch(Notification notification, List<Guid> accounts, bool save = false)
{
if (save)
{
var notifications = accounts.Select(x =>
accounts.ForEach(x =>
{
var newNotification = new Notification
{
@@ -187,10 +188,8 @@ public class PushService
Priority = notification.Priority,
AccountId = x
};
return newNotification;
}).ToList();
await _db.BulkInsertAsync(notifications);
}
_fbs.Enqueue(newNotification);
});
_logger.LogInformation(
"Delivering notification in batch: {NotificationTopic} #{NotificationId} with meta {NotificationMeta}",

View File

@@ -24,7 +24,6 @@ builder.Services.AddAppFlushHandlers();
// Add business services
builder.Services.AddAppBusinessServices();
builder.Services.AddPushServices(builder.Configuration);
// Add scheduled jobs
builder.Services.AddAppScheduledJobs();
@@ -44,4 +43,4 @@ app.ConfigureAppMiddleware(builder.Configuration);
// Configure gRPC
app.ConfigureGrpcServices();
app.Run();
app.Run();

View File

@@ -1,3 +1,4 @@
using DysonNetwork.Pusher.Notification;
using Quartz;
namespace DysonNetwork.Pusher.Startup;
@@ -14,6 +15,13 @@ public static class ScheduledJobsConfiguration
.ForJob(appDatabaseRecyclingJob)
.WithIdentity("AppDatabaseRecyclingTrigger")
.WithCronSchedule("0 0 0 * * ?"));
var notificationFlushJob = new JobKey("NotificationFlush");
q.AddJob<NotificationFlushJob>(opts => opts.WithIdentity(notificationFlushJob));
q.AddTrigger(opts => opts
.ForJob(notificationFlushJob)
.WithIdentity("NotificationFlushTrigger")
.WithSimpleSchedule(a => a.WithIntervalInSeconds(60).RepeatForever()));
});
services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);

View File

@@ -38,7 +38,7 @@ public static class ServiceCollectionExtensions
options.MaxReceiveMessageSize = 16 * 1024 * 1024; // 16MB
options.MaxSendMessageSize = 16 * 1024 * 1024; // 16MB
});
// Register gRPC reflection for service discovery
services.AddGrpc();
@@ -127,6 +127,7 @@ public static class ServiceCollectionExtensions
public static IServiceCollection AddAppFlushHandlers(this IServiceCollection services)
{
services.AddSingleton<FlushBufferService>();
services.AddScoped<NotificationFlushHandler>();
return services;
}
@@ -139,13 +140,4 @@ public static class ServiceCollectionExtensions
return services;
}
public static void AddPushServices(this IServiceCollection services, IConfiguration configuration)
{
services.Configure<ApnSettings>(configuration.GetSection("PushNotify:Apple"));
services.AddHttpClient<ApnSender>();
services.Configure<FirebaseSettings>(configuration.GetSection("PushNotify:Firebase"));
services.AddHttpClient<FirebaseSettings>();
}
}
}