diff --git a/DysonNetwork.Pass/Handlers/ActionLogFlushHandler.cs b/DysonNetwork.Pass/Handlers/ActionLogFlushHandler.cs index 1534a64..97bcec9 100644 --- a/DysonNetwork.Pass/Handlers/ActionLogFlushHandler.cs +++ b/DysonNetwork.Pass/Handlers/ActionLogFlushHandler.cs @@ -12,7 +12,7 @@ public class ActionLogFlushHandler(IServiceProvider serviceProvider) : IFlushHan { using var scope = serviceProvider.CreateScope(); var db = scope.ServiceProvider.GetRequiredService(); - + await db.BulkInsertAsync(items.Select(x => { x.CreatedAt = SystemClock.Instance.GetCurrentInstant(); @@ -28,4 +28,4 @@ public class ActionLogFlushJob(FlushBufferService fbs, ActionLogFlushHandler hdl { await fbs.FlushAsync(hdl); } -} \ No newline at end of file +} diff --git a/DysonNetwork.Pusher/Notification/NotificationFlushHandler.cs b/DysonNetwork.Pusher/Notification/NotificationFlushHandler.cs new file mode 100644 index 0000000..0ede414 --- /dev/null +++ b/DysonNetwork.Pusher/Notification/NotificationFlushHandler.cs @@ -0,0 +1,28 @@ +using DysonNetwork.Shared.Cache; +using EFCore.BulkExtensions; +using NodaTime; +using Quartz; + +namespace DysonNetwork.Pusher.Notification; + +public class NotificationFlushHandler(AppDatabase db, Logger logger) : IFlushHandler +{ + public async Task FlushAsync(IReadOnlyList items) + { + await db.BulkInsertAsync(items.Select(x => + { + x.CreatedAt = SystemClock.Instance.GetCurrentInstant(); + x.UpdatedAt = x.CreatedAt; + return x; + }), config => config.ConflictOption = ConflictOption.Ignore); + logger.LogInformation("Stored {Count} notifications", items.Count); + } +} + +public class NotificationFlushJob(FlushBufferService fbs, NotificationFlushHandler hdl) : IJob +{ + public async Task Execute(IJobExecutionContext context) + { + await fbs.FlushAsync(hdl); + } +} diff --git a/DysonNetwork.Pusher/Notification/PushService.cs b/DysonNetwork.Pusher/Notification/PushService.cs index e546ad8..c277aca 100644 --- a/DysonNetwork.Pusher/Notification/PushService.cs +++ b/DysonNetwork.Pusher/Notification/PushService.cs @@ -1,6 +1,7 @@ using CorePush.Apple; using CorePush.Firebase; using DysonNetwork.Pusher.Connection; +using DysonNetwork.Shared.Cache; using DysonNetwork.Shared.Proto; using EFCore.BulkExtensions; using Microsoft.EntityFrameworkCore; @@ -11,6 +12,7 @@ namespace DysonNetwork.Pusher.Notification; public class PushService { private readonly AppDatabase _db; + private readonly FlushBufferService _fbs; private readonly WebSocketService _ws; private readonly ILogger _logger; private readonly FirebaseSender? _fcm; @@ -20,6 +22,7 @@ public class PushService public PushService( IConfiguration config, AppDatabase db, + FlushBufferService fbs, WebSocketService ws, IHttpClientFactory httpFactory, ILogger logger @@ -50,6 +53,7 @@ public class PushService } _db = db; + _fbs = fbs; _ws = ws; _logger = logger; } @@ -112,11 +116,12 @@ public class PushService string? title = null, string? subtitle = null, string? content = null, - Dictionary meta = null, + Dictionary? meta = null, string? actionUri = null, bool isSilent = false, bool save = true) { + meta ??= []; if (title is null && subtitle is null && content is null) throw new ArgumentException("Unable to send notification that completely empty."); @@ -134,10 +139,7 @@ public class PushService }; if (save) - { - _db.Add(notification); - await _db.SaveChangesAsync(); - } + _fbs.Enqueue(notification); if (!isSilent) _ = DeliveryNotification(notification); } @@ -174,8 +176,7 @@ public class PushService public async Task SendNotificationBatch(Notification notification, List accounts, bool save = false) { if (save) - { - var notifications = accounts.Select(x => + accounts.ForEach(x => { var newNotification = new Notification { @@ -187,10 +188,8 @@ public class PushService Priority = notification.Priority, AccountId = x }; - return newNotification; - }).ToList(); - await _db.BulkInsertAsync(notifications); - } + _fbs.Enqueue(newNotification); + }); _logger.LogInformation( "Delivering notification in batch: {NotificationTopic} #{NotificationId} with meta {NotificationMeta}", diff --git a/DysonNetwork.Pusher/Program.cs b/DysonNetwork.Pusher/Program.cs index 3c327df..d3134ca 100644 --- a/DysonNetwork.Pusher/Program.cs +++ b/DysonNetwork.Pusher/Program.cs @@ -24,7 +24,6 @@ builder.Services.AddAppFlushHandlers(); // Add business services builder.Services.AddAppBusinessServices(); -builder.Services.AddPushServices(builder.Configuration); // Add scheduled jobs builder.Services.AddAppScheduledJobs(); @@ -44,4 +43,4 @@ app.ConfigureAppMiddleware(builder.Configuration); // Configure gRPC app.ConfigureGrpcServices(); -app.Run(); \ No newline at end of file +app.Run(); diff --git a/DysonNetwork.Pusher/Startup/ScheduledJobsConfiguration.cs b/DysonNetwork.Pusher/Startup/ScheduledJobsConfiguration.cs index 6a9343a..239ada7 100644 --- a/DysonNetwork.Pusher/Startup/ScheduledJobsConfiguration.cs +++ b/DysonNetwork.Pusher/Startup/ScheduledJobsConfiguration.cs @@ -1,3 +1,4 @@ +using DysonNetwork.Pusher.Notification; using Quartz; namespace DysonNetwork.Pusher.Startup; @@ -14,6 +15,13 @@ public static class ScheduledJobsConfiguration .ForJob(appDatabaseRecyclingJob) .WithIdentity("AppDatabaseRecyclingTrigger") .WithCronSchedule("0 0 0 * * ?")); + + var notificationFlushJob = new JobKey("NotificationFlush"); + q.AddJob(opts => opts.WithIdentity(notificationFlushJob)); + q.AddTrigger(opts => opts + .ForJob(notificationFlushJob) + .WithIdentity("NotificationFlushTrigger") + .WithSimpleSchedule(a => a.WithIntervalInSeconds(60).RepeatForever())); }); services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true); diff --git a/DysonNetwork.Pusher/Startup/ServiceCollectionExtensions.cs b/DysonNetwork.Pusher/Startup/ServiceCollectionExtensions.cs index b24311f..8104d3e 100644 --- a/DysonNetwork.Pusher/Startup/ServiceCollectionExtensions.cs +++ b/DysonNetwork.Pusher/Startup/ServiceCollectionExtensions.cs @@ -38,7 +38,7 @@ public static class ServiceCollectionExtensions options.MaxReceiveMessageSize = 16 * 1024 * 1024; // 16MB options.MaxSendMessageSize = 16 * 1024 * 1024; // 16MB }); - + // Register gRPC reflection for service discovery services.AddGrpc(); @@ -127,6 +127,7 @@ public static class ServiceCollectionExtensions public static IServiceCollection AddAppFlushHandlers(this IServiceCollection services) { services.AddSingleton(); + services.AddScoped(); return services; } @@ -139,13 +140,4 @@ public static class ServiceCollectionExtensions return services; } - - public static void AddPushServices(this IServiceCollection services, IConfiguration configuration) - { - services.Configure(configuration.GetSection("PushNotify:Apple")); - services.AddHttpClient(); - - services.Configure(configuration.GetSection("PushNotify:Firebase")); - services.AddHttpClient(); - } -} \ No newline at end of file +}