Compare commits
	
		
			2 Commits
		
	
	
		
			7ec3f25d43
			...
			b04b17c8ae
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| b04b17c8ae | |||
| b037ecad79 | 
| @@ -12,7 +12,7 @@ public class ActionLogFlushHandler(IServiceProvider serviceProvider) : IFlushHan | |||||||
|     { |     { | ||||||
|         using var scope = serviceProvider.CreateScope(); |         using var scope = serviceProvider.CreateScope(); | ||||||
|         var db = scope.ServiceProvider.GetRequiredService<AppDatabase>(); |         var db = scope.ServiceProvider.GetRequiredService<AppDatabase>(); | ||||||
|          |  | ||||||
|         await db.BulkInsertAsync(items.Select(x => |         await db.BulkInsertAsync(items.Select(x => | ||||||
|         { |         { | ||||||
|             x.CreatedAt = SystemClock.Instance.GetCurrentInstant(); |             x.CreatedAt = SystemClock.Instance.GetCurrentInstant(); | ||||||
| @@ -28,4 +28,4 @@ public class ActionLogFlushJob(FlushBufferService fbs, ActionLogFlushHandler hdl | |||||||
|     { |     { | ||||||
|         await fbs.FlushAsync(hdl); |         await fbs.FlushAsync(hdl); | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -57,7 +57,7 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext> | |||||||
|             return; |             return; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         logger.LogInformation( |         logger.LogDebug( | ||||||
|             $"Connection established with user @{currentUser.Name}#{currentUser.Id} and device #{deviceId}"); |             $"Connection established with user @{currentUser.Name}#{currentUser.Id} and device #{deviceId}"); | ||||||
|  |  | ||||||
|         try |         try | ||||||
| @@ -66,12 +66,13 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext> | |||||||
|         } |         } | ||||||
|         catch (Exception ex) |         catch (Exception ex) | ||||||
|         { |         { | ||||||
|             Console.WriteLine($"WebSocket Error: {ex.Message}"); |             if (ex is not WebSocketException) | ||||||
|  |                 logger.LogError(ex, "WebSocket disconnected with user @{UserName}#{UserId} and device #{DeviceId} unexpectedly"); | ||||||
|         } |         } | ||||||
|         finally |         finally | ||||||
|         { |         { | ||||||
|             ws.Disconnect(connectionKey); |             ws.Disconnect(connectionKey); | ||||||
|             logger.LogInformation( |             logger.LogDebug( | ||||||
|                 $"Connection disconnected with user @{currentUser.Name}#{currentUser.Id} and device #{deviceId}" |                 $"Connection disconnected with user @{currentUser.Name}#{currentUser.Id} and device #{deviceId}" | ||||||
|             ); |             ); | ||||||
|         } |         } | ||||||
| @@ -114,4 +115,4 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext> | |||||||
|             } |             } | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										28
									
								
								DysonNetwork.Pusher/Notification/NotificationFlushHandler.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								DysonNetwork.Pusher/Notification/NotificationFlushHandler.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,28 @@ | |||||||
|  | using DysonNetwork.Shared.Cache; | ||||||
|  | using EFCore.BulkExtensions; | ||||||
|  | using NodaTime; | ||||||
|  | using Quartz; | ||||||
|  |  | ||||||
|  | namespace DysonNetwork.Pusher.Notification; | ||||||
|  |  | ||||||
|  | public class NotificationFlushHandler(AppDatabase db, Logger<NotificationFlushHandler> logger) : IFlushHandler<Notification> | ||||||
|  | { | ||||||
|  |     public async Task FlushAsync(IReadOnlyList<Notification> items) | ||||||
|  |     { | ||||||
|  |         await db.BulkInsertAsync(items.Select(x => | ||||||
|  |         { | ||||||
|  |             x.CreatedAt = SystemClock.Instance.GetCurrentInstant(); | ||||||
|  |             x.UpdatedAt = x.CreatedAt; | ||||||
|  |             return x; | ||||||
|  |         }), config => config.ConflictOption = ConflictOption.Ignore); | ||||||
|  |         logger.LogInformation("Stored {Count} notifications", items.Count); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | public class NotificationFlushJob(FlushBufferService fbs, NotificationFlushHandler hdl) : IJob | ||||||
|  | { | ||||||
|  |     public async Task Execute(IJobExecutionContext context) | ||||||
|  |     { | ||||||
|  |         await fbs.FlushAsync(hdl); | ||||||
|  |     } | ||||||
|  | } | ||||||
| @@ -1,6 +1,7 @@ | |||||||
| using CorePush.Apple; | using CorePush.Apple; | ||||||
| using CorePush.Firebase; | using CorePush.Firebase; | ||||||
| using DysonNetwork.Pusher.Connection; | using DysonNetwork.Pusher.Connection; | ||||||
|  | using DysonNetwork.Shared.Cache; | ||||||
| using DysonNetwork.Shared.Proto; | using DysonNetwork.Shared.Proto; | ||||||
| using EFCore.BulkExtensions; | using EFCore.BulkExtensions; | ||||||
| using Microsoft.EntityFrameworkCore; | using Microsoft.EntityFrameworkCore; | ||||||
| @@ -11,6 +12,7 @@ namespace DysonNetwork.Pusher.Notification; | |||||||
| public class PushService | public class PushService | ||||||
| { | { | ||||||
|     private readonly AppDatabase _db; |     private readonly AppDatabase _db; | ||||||
|  |     private readonly FlushBufferService _fbs; | ||||||
|     private readonly WebSocketService _ws; |     private readonly WebSocketService _ws; | ||||||
|     private readonly ILogger<PushService> _logger; |     private readonly ILogger<PushService> _logger; | ||||||
|     private readonly FirebaseSender? _fcm; |     private readonly FirebaseSender? _fcm; | ||||||
| @@ -20,6 +22,7 @@ public class PushService | |||||||
|     public PushService( |     public PushService( | ||||||
|         IConfiguration config, |         IConfiguration config, | ||||||
|         AppDatabase db, |         AppDatabase db, | ||||||
|  |         FlushBufferService fbs, | ||||||
|         WebSocketService ws, |         WebSocketService ws, | ||||||
|         IHttpClientFactory httpFactory, |         IHttpClientFactory httpFactory, | ||||||
|         ILogger<PushService> logger |         ILogger<PushService> logger | ||||||
| @@ -50,6 +53,7 @@ public class PushService | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         _db = db; |         _db = db; | ||||||
|  |         _fbs = fbs; | ||||||
|         _ws = ws; |         _ws = ws; | ||||||
|         _logger = logger; |         _logger = logger; | ||||||
|     } |     } | ||||||
| @@ -112,11 +116,12 @@ public class PushService | |||||||
|         string? title = null, |         string? title = null, | ||||||
|         string? subtitle = null, |         string? subtitle = null, | ||||||
|         string? content = null, |         string? content = null, | ||||||
|         Dictionary<string, object?> meta = null, |         Dictionary<string, object?>? meta = null, | ||||||
|         string? actionUri = null, |         string? actionUri = null, | ||||||
|         bool isSilent = false, |         bool isSilent = false, | ||||||
|         bool save = true) |         bool save = true) | ||||||
|     { |     { | ||||||
|  |         meta ??= []; | ||||||
|         if (title is null && subtitle is null && content is null) |         if (title is null && subtitle is null && content is null) | ||||||
|             throw new ArgumentException("Unable to send notification that completely empty."); |             throw new ArgumentException("Unable to send notification that completely empty."); | ||||||
|  |  | ||||||
| @@ -134,10 +139,7 @@ public class PushService | |||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         if (save) |         if (save) | ||||||
|         { |             _fbs.Enqueue(notification); | ||||||
|             _db.Add(notification); |  | ||||||
|             await _db.SaveChangesAsync(); |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         if (!isSilent) _ = DeliveryNotification(notification); |         if (!isSilent) _ = DeliveryNotification(notification); | ||||||
|     } |     } | ||||||
| @@ -174,8 +176,7 @@ public class PushService | |||||||
|     public async Task SendNotificationBatch(Notification notification, List<Guid> accounts, bool save = false) |     public async Task SendNotificationBatch(Notification notification, List<Guid> accounts, bool save = false) | ||||||
|     { |     { | ||||||
|         if (save) |         if (save) | ||||||
|         { |             accounts.ForEach(x => | ||||||
|             var notifications = accounts.Select(x => |  | ||||||
|             { |             { | ||||||
|                 var newNotification = new Notification |                 var newNotification = new Notification | ||||||
|                 { |                 { | ||||||
| @@ -187,11 +188,9 @@ public class PushService | |||||||
|                     Priority = notification.Priority, |                     Priority = notification.Priority, | ||||||
|                     AccountId = x |                     AccountId = x | ||||||
|                 }; |                 }; | ||||||
|                 return newNotification; |                 _fbs.Enqueue(newNotification); | ||||||
|             }).ToList(); |             }); | ||||||
|             await _db.BulkInsertAsync(notifications); |  | ||||||
|         } |  | ||||||
|          |  | ||||||
|         _logger.LogInformation( |         _logger.LogInformation( | ||||||
|             "Delivering notification in batch: {NotificationTopic} #{NotificationId} with meta {NotificationMeta}", |             "Delivering notification in batch: {NotificationTopic} #{NotificationId} with meta {NotificationMeta}", | ||||||
|             notification.Topic, |             notification.Topic, | ||||||
| @@ -304,6 +303,6 @@ public class PushService | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         _logger.LogInformation( |         _logger.LogInformation( | ||||||
|             $"Successfully pushed notification #{notification.Id} to device {subscription.DeviceId}"); |             $"Successfully pushed notification #{notification.Id} to device {subscription.DeviceId} provider {subscription.Provider}"); | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -24,7 +24,6 @@ builder.Services.AddAppFlushHandlers(); | |||||||
|  |  | ||||||
| // Add business services | // Add business services | ||||||
| builder.Services.AddAppBusinessServices(); | builder.Services.AddAppBusinessServices(); | ||||||
| builder.Services.AddPushServices(builder.Configuration); |  | ||||||
|  |  | ||||||
| // Add scheduled jobs | // Add scheduled jobs | ||||||
| builder.Services.AddAppScheduledJobs(); | builder.Services.AddAppScheduledJobs(); | ||||||
| @@ -44,4 +43,4 @@ app.ConfigureAppMiddleware(builder.Configuration); | |||||||
| // Configure gRPC | // Configure gRPC | ||||||
| app.ConfigureGrpcServices(); | app.ConfigureGrpcServices(); | ||||||
|  |  | ||||||
| app.Run(); | app.Run(); | ||||||
|   | |||||||
| @@ -1,3 +1,4 @@ | |||||||
|  | using DysonNetwork.Pusher.Notification; | ||||||
| using Quartz; | using Quartz; | ||||||
|  |  | ||||||
| namespace DysonNetwork.Pusher.Startup; | namespace DysonNetwork.Pusher.Startup; | ||||||
| @@ -14,6 +15,13 @@ public static class ScheduledJobsConfiguration | |||||||
|                 .ForJob(appDatabaseRecyclingJob) |                 .ForJob(appDatabaseRecyclingJob) | ||||||
|                 .WithIdentity("AppDatabaseRecyclingTrigger") |                 .WithIdentity("AppDatabaseRecyclingTrigger") | ||||||
|                 .WithCronSchedule("0 0 0 * * ?")); |                 .WithCronSchedule("0 0 0 * * ?")); | ||||||
|  |  | ||||||
|  |             var notificationFlushJob = new JobKey("NotificationFlush"); | ||||||
|  |             q.AddJob<NotificationFlushJob>(opts => opts.WithIdentity(notificationFlushJob)); | ||||||
|  |             q.AddTrigger(opts => opts | ||||||
|  |                 .ForJob(notificationFlushJob) | ||||||
|  |                 .WithIdentity("NotificationFlushTrigger") | ||||||
|  |                 .WithSimpleSchedule(a => a.WithIntervalInSeconds(60).RepeatForever())); | ||||||
|         }); |         }); | ||||||
|         services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true); |         services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true); | ||||||
|  |  | ||||||
|   | |||||||
| @@ -38,7 +38,7 @@ public static class ServiceCollectionExtensions | |||||||
|             options.MaxReceiveMessageSize = 16 * 1024 * 1024; // 16MB |             options.MaxReceiveMessageSize = 16 * 1024 * 1024; // 16MB | ||||||
|             options.MaxSendMessageSize = 16 * 1024 * 1024; // 16MB |             options.MaxSendMessageSize = 16 * 1024 * 1024; // 16MB | ||||||
|         }); |         }); | ||||||
|          |  | ||||||
|         // Register gRPC reflection for service discovery |         // Register gRPC reflection for service discovery | ||||||
|         services.AddGrpc(); |         services.AddGrpc(); | ||||||
|  |  | ||||||
| @@ -127,6 +127,7 @@ public static class ServiceCollectionExtensions | |||||||
|     public static IServiceCollection AddAppFlushHandlers(this IServiceCollection services) |     public static IServiceCollection AddAppFlushHandlers(this IServiceCollection services) | ||||||
|     { |     { | ||||||
|         services.AddSingleton<FlushBufferService>(); |         services.AddSingleton<FlushBufferService>(); | ||||||
|  |         services.AddScoped<NotificationFlushHandler>(); | ||||||
|  |  | ||||||
|         return services; |         return services; | ||||||
|     } |     } | ||||||
| @@ -139,13 +140,4 @@ public static class ServiceCollectionExtensions | |||||||
|  |  | ||||||
|         return services; |         return services; | ||||||
|     } |     } | ||||||
|      | } | ||||||
|     public static void AddPushServices(this IServiceCollection services, IConfiguration configuration) |  | ||||||
|     { |  | ||||||
|         services.Configure<ApnSettings>(configuration.GetSection("PushNotify:Apple")); |  | ||||||
|         services.AddHttpClient<ApnSender>(); |  | ||||||
|  |  | ||||||
|         services.Configure<FirebaseSettings>(configuration.GetSection("PushNotify:Firebase")); |  | ||||||
|         services.AddHttpClient<FirebaseSettings>(); |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user