Compare commits
	
		
			2 Commits
		
	
	
		
			7ec3f25d43
			...
			b04b17c8ae
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| b04b17c8ae | |||
| b037ecad79 | 
| @@ -57,7 +57,7 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext> | ||||
|             return; | ||||
|         } | ||||
|  | ||||
|         logger.LogInformation( | ||||
|         logger.LogDebug( | ||||
|             $"Connection established with user @{currentUser.Name}#{currentUser.Id} and device #{deviceId}"); | ||||
|  | ||||
|         try | ||||
| @@ -66,12 +66,13 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext> | ||||
|         } | ||||
|         catch (Exception ex) | ||||
|         { | ||||
|             Console.WriteLine($"WebSocket Error: {ex.Message}"); | ||||
|             if (ex is not WebSocketException) | ||||
|                 logger.LogError(ex, "WebSocket disconnected with user @{UserName}#{UserId} and device #{DeviceId} unexpectedly"); | ||||
|         } | ||||
|         finally | ||||
|         { | ||||
|             ws.Disconnect(connectionKey); | ||||
|             logger.LogInformation( | ||||
|             logger.LogDebug( | ||||
|                 $"Connection disconnected with user @{currentUser.Name}#{currentUser.Id} and device #{deviceId}" | ||||
|             ); | ||||
|         } | ||||
|   | ||||
							
								
								
									
										28
									
								
								DysonNetwork.Pusher/Notification/NotificationFlushHandler.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								DysonNetwork.Pusher/Notification/NotificationFlushHandler.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,28 @@ | ||||
| using DysonNetwork.Shared.Cache; | ||||
| using EFCore.BulkExtensions; | ||||
| using NodaTime; | ||||
| using Quartz; | ||||
|  | ||||
| namespace DysonNetwork.Pusher.Notification; | ||||
|  | ||||
| public class NotificationFlushHandler(AppDatabase db, Logger<NotificationFlushHandler> logger) : IFlushHandler<Notification> | ||||
| { | ||||
|     public async Task FlushAsync(IReadOnlyList<Notification> items) | ||||
|     { | ||||
|         await db.BulkInsertAsync(items.Select(x => | ||||
|         { | ||||
|             x.CreatedAt = SystemClock.Instance.GetCurrentInstant(); | ||||
|             x.UpdatedAt = x.CreatedAt; | ||||
|             return x; | ||||
|         }), config => config.ConflictOption = ConflictOption.Ignore); | ||||
|         logger.LogInformation("Stored {Count} notifications", items.Count); | ||||
|     } | ||||
| } | ||||
|  | ||||
| public class NotificationFlushJob(FlushBufferService fbs, NotificationFlushHandler hdl) : IJob | ||||
| { | ||||
|     public async Task Execute(IJobExecutionContext context) | ||||
|     { | ||||
|         await fbs.FlushAsync(hdl); | ||||
|     } | ||||
| } | ||||
| @@ -1,6 +1,7 @@ | ||||
| using CorePush.Apple; | ||||
| using CorePush.Firebase; | ||||
| using DysonNetwork.Pusher.Connection; | ||||
| using DysonNetwork.Shared.Cache; | ||||
| using DysonNetwork.Shared.Proto; | ||||
| using EFCore.BulkExtensions; | ||||
| using Microsoft.EntityFrameworkCore; | ||||
| @@ -11,6 +12,7 @@ namespace DysonNetwork.Pusher.Notification; | ||||
| public class PushService | ||||
| { | ||||
|     private readonly AppDatabase _db; | ||||
|     private readonly FlushBufferService _fbs; | ||||
|     private readonly WebSocketService _ws; | ||||
|     private readonly ILogger<PushService> _logger; | ||||
|     private readonly FirebaseSender? _fcm; | ||||
| @@ -20,6 +22,7 @@ public class PushService | ||||
|     public PushService( | ||||
|         IConfiguration config, | ||||
|         AppDatabase db, | ||||
|         FlushBufferService fbs, | ||||
|         WebSocketService ws, | ||||
|         IHttpClientFactory httpFactory, | ||||
|         ILogger<PushService> logger | ||||
| @@ -50,6 +53,7 @@ public class PushService | ||||
|         } | ||||
|  | ||||
|         _db = db; | ||||
|         _fbs = fbs; | ||||
|         _ws = ws; | ||||
|         _logger = logger; | ||||
|     } | ||||
| @@ -112,11 +116,12 @@ public class PushService | ||||
|         string? title = null, | ||||
|         string? subtitle = null, | ||||
|         string? content = null, | ||||
|         Dictionary<string, object?> meta = null, | ||||
|         Dictionary<string, object?>? meta = null, | ||||
|         string? actionUri = null, | ||||
|         bool isSilent = false, | ||||
|         bool save = true) | ||||
|     { | ||||
|         meta ??= []; | ||||
|         if (title is null && subtitle is null && content is null) | ||||
|             throw new ArgumentException("Unable to send notification that completely empty."); | ||||
|  | ||||
| @@ -134,10 +139,7 @@ public class PushService | ||||
|         }; | ||||
|  | ||||
|         if (save) | ||||
|         { | ||||
|             _db.Add(notification); | ||||
|             await _db.SaveChangesAsync(); | ||||
|         } | ||||
|             _fbs.Enqueue(notification); | ||||
|  | ||||
|         if (!isSilent) _ = DeliveryNotification(notification); | ||||
|     } | ||||
| @@ -174,8 +176,7 @@ public class PushService | ||||
|     public async Task SendNotificationBatch(Notification notification, List<Guid> accounts, bool save = false) | ||||
|     { | ||||
|         if (save) | ||||
|         { | ||||
|             var notifications = accounts.Select(x => | ||||
|             accounts.ForEach(x => | ||||
|             { | ||||
|                 var newNotification = new Notification | ||||
|                 { | ||||
| @@ -187,10 +188,8 @@ public class PushService | ||||
|                     Priority = notification.Priority, | ||||
|                     AccountId = x | ||||
|                 }; | ||||
|                 return newNotification; | ||||
|             }).ToList(); | ||||
|             await _db.BulkInsertAsync(notifications); | ||||
|         } | ||||
|                 _fbs.Enqueue(newNotification); | ||||
|             }); | ||||
|  | ||||
|         _logger.LogInformation( | ||||
|             "Delivering notification in batch: {NotificationTopic} #{NotificationId} with meta {NotificationMeta}", | ||||
| @@ -304,6 +303,6 @@ public class PushService | ||||
|         } | ||||
|  | ||||
|         _logger.LogInformation( | ||||
|             $"Successfully pushed notification #{notification.Id} to device {subscription.DeviceId}"); | ||||
|             $"Successfully pushed notification #{notification.Id} to device {subscription.DeviceId} provider {subscription.Provider}"); | ||||
|     } | ||||
| } | ||||
| @@ -24,7 +24,6 @@ builder.Services.AddAppFlushHandlers(); | ||||
|  | ||||
| // Add business services | ||||
| builder.Services.AddAppBusinessServices(); | ||||
| builder.Services.AddPushServices(builder.Configuration); | ||||
|  | ||||
| // Add scheduled jobs | ||||
| builder.Services.AddAppScheduledJobs(); | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
| using DysonNetwork.Pusher.Notification; | ||||
| using Quartz; | ||||
|  | ||||
| namespace DysonNetwork.Pusher.Startup; | ||||
| @@ -14,6 +15,13 @@ public static class ScheduledJobsConfiguration | ||||
|                 .ForJob(appDatabaseRecyclingJob) | ||||
|                 .WithIdentity("AppDatabaseRecyclingTrigger") | ||||
|                 .WithCronSchedule("0 0 0 * * ?")); | ||||
|  | ||||
|             var notificationFlushJob = new JobKey("NotificationFlush"); | ||||
|             q.AddJob<NotificationFlushJob>(opts => opts.WithIdentity(notificationFlushJob)); | ||||
|             q.AddTrigger(opts => opts | ||||
|                 .ForJob(notificationFlushJob) | ||||
|                 .WithIdentity("NotificationFlushTrigger") | ||||
|                 .WithSimpleSchedule(a => a.WithIntervalInSeconds(60).RepeatForever())); | ||||
|         }); | ||||
|         services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true); | ||||
|  | ||||
|   | ||||
| @@ -127,6 +127,7 @@ public static class ServiceCollectionExtensions | ||||
|     public static IServiceCollection AddAppFlushHandlers(this IServiceCollection services) | ||||
|     { | ||||
|         services.AddSingleton<FlushBufferService>(); | ||||
|         services.AddScoped<NotificationFlushHandler>(); | ||||
|  | ||||
|         return services; | ||||
|     } | ||||
| @@ -139,13 +140,4 @@ public static class ServiceCollectionExtensions | ||||
|  | ||||
|         return services; | ||||
|     } | ||||
|      | ||||
|     public static void AddPushServices(this IServiceCollection services, IConfiguration configuration) | ||||
|     { | ||||
|         services.Configure<ApnSettings>(configuration.GetSection("PushNotify:Apple")); | ||||
|         services.AddHttpClient<ApnSender>(); | ||||
|  | ||||
|         services.Configure<FirebaseSettings>(configuration.GetSection("PushNotify:Firebase")); | ||||
|         services.AddHttpClient<FirebaseSettings>(); | ||||
|     } | ||||
| } | ||||
		Reference in New Issue
	
	Block a user