Compare commits
	
		
			3 Commits
		
	
	
		
			162967e68b
			...
			51697c31cb
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 51697c31cb | |||
| 409c83b030 | |||
| acb293ec8f | 
| @@ -362,9 +362,12 @@ public class SubscriptionService( | ||||
|             Topic = "subscriptions.begun", | ||||
|             Title = localizer["SubscriptionAppliedTitle", humanReadableName], | ||||
|             Body = localizer["SubscriptionAppliedBody", duration, humanReadableName], | ||||
|             Meta = GrpcTypeHelper.ConvertObjectToByteString(new Dictionary<string, object> | ||||
|             { | ||||
|                 ["subscription_id"] = subscription.Id.ToString() | ||||
|             }), | ||||
|             IsSavable = true | ||||
|         }; | ||||
|         notification.Meta.Add("subscription_id", Value.ForString(subscription.Id.ToString())); | ||||
|         await pusher.SendPushNotificationToUserAsync( | ||||
|             new SendPushNotificationToUserRequest | ||||
|             { | ||||
|   | ||||
| @@ -91,7 +91,7 @@ public class PusherServiceGrpc( | ||||
|             request.Notification.Title, | ||||
|             request.Notification.Subtitle, | ||||
|             request.Notification.Body, | ||||
|             GrpcTypeHelper.ConvertFromValueMap(request.Notification.Meta), | ||||
|             GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Notification.Meta) ?? [], | ||||
|             request.Notification.ActionUri, | ||||
|             request.Notification.IsSilent, | ||||
|             request.Notification.IsSavable | ||||
| @@ -108,7 +108,8 @@ public class PusherServiceGrpc( | ||||
|             Title = request.Notification.Title, | ||||
|             Subtitle = request.Notification.Subtitle, | ||||
|             Content = request.Notification.Body, | ||||
|             Meta = GrpcTypeHelper.ConvertFromValueMap(request.Notification.Meta) | ||||
|             Meta = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Notification.Meta) ?? | ||||
|                    [] | ||||
|         }; | ||||
|         if (request.Notification.ActionUri is not null) | ||||
|             notification.Meta["action_uri"] = request.Notification.ActionUri; | ||||
|   | ||||
| @@ -85,7 +85,7 @@ message PushNotification { | ||||
|   string title = 2; | ||||
|   string subtitle = 3; | ||||
|   string body = 4; | ||||
|   map<string, google.protobuf.Value> meta = 5; | ||||
|   bytes meta = 5; | ||||
|   optional string action_uri = 6; | ||||
|   bool is_silent = 7; | ||||
|   bool is_savable = 8; | ||||
|   | ||||
| @@ -241,10 +241,10 @@ public partial class ChatService( | ||||
|             Body = !string.IsNullOrEmpty(message.Content) | ||||
|                 ? message.Content[..Math.Min(message.Content.Length, 100)] | ||||
|                 : "<no content>", | ||||
|             Meta = GrpcTypeHelper.ConvertObjectToByteString(metaDict), | ||||
|             ActionUri = $"/chat/{room.Id}", | ||||
|             IsSavable = false, | ||||
|         }; | ||||
|         notification.Meta.Add(GrpcTypeHelper.ConvertToValueMap(metaDict)); | ||||
|  | ||||
|         List<Account> accountsToNotify = []; | ||||
|         foreach (var member in members) | ||||
| @@ -254,7 +254,7 @@ public partial class ChatService( | ||||
|                 Packet = new WebSocketPacket | ||||
|                 { | ||||
|                     Type = type, | ||||
|                     Data = GrpcTypeHelper.ConvertObjectToByteString(metaDict), | ||||
|                     Data = GrpcTypeHelper.ConvertObjectToByteString(message), | ||||
|                 }, | ||||
|             }); | ||||
|  | ||||
|   | ||||
							
								
								
									
										52
									
								
								DysonNetwork.Sphere/Post/PostViewFlushHandler.cs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										52
									
								
								DysonNetwork.Sphere/Post/PostViewFlushHandler.cs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,52 @@ | ||||
| using DysonNetwork.Shared.Cache; | ||||
| using Microsoft.EntityFrameworkCore; | ||||
| using Quartz; | ||||
|  | ||||
| namespace DysonNetwork.Sphere.Post; | ||||
|  | ||||
| public class PostViewFlushHandler(IServiceProvider serviceProvider) : IFlushHandler<PostViewInfo> | ||||
| { | ||||
|     public async Task FlushAsync(IReadOnlyList<Sphere.Post.PostViewInfo> items) | ||||
|     { | ||||
|         using var scope = serviceProvider.CreateScope(); | ||||
|         var db = scope.ServiceProvider.GetRequiredService<AppDatabase>(); | ||||
|         var cache = scope.ServiceProvider.GetRequiredService<ICacheService>(); | ||||
|  | ||||
|         // Group views by post | ||||
|         var postViews = items | ||||
|             .GroupBy(x => x.PostId) | ||||
|             .ToDictionary(g => g.Key, g => g.ToList()); | ||||
|  | ||||
|         // Calculate total views and unique views per post | ||||
|         foreach (var postId in postViews.Keys) | ||||
|         { | ||||
|             // Calculate unique views by distinct viewer IDs (not null) | ||||
|             var uniqueViews = postViews[postId] | ||||
|                 .Where(v => !string.IsNullOrEmpty(v.ViewerId)) | ||||
|                 .Select(v => v.ViewerId) | ||||
|                 .Distinct() | ||||
|                 .Count(); | ||||
|  | ||||
|             // Total views is just the count of all items for this post | ||||
|             var totalViews = postViews[postId].Count; | ||||
|  | ||||
|             // Update the post in the database | ||||
|             await db.Posts | ||||
|                 .Where(p => p.Id == postId) | ||||
|                 .ExecuteUpdateAsync(p => p | ||||
|                     .SetProperty(x => x.ViewsTotal, x => x.ViewsTotal + totalViews) | ||||
|                     .SetProperty(x => x.ViewsUnique, x => x.ViewsUnique + uniqueViews)); | ||||
|  | ||||
|             // Invalidate any cache entries for this post | ||||
|             await cache.RemoveAsync($"post:{postId}"); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| public class PostViewFlushJob(FlushBufferService fbs, PostViewFlushHandler hdl) : IJob | ||||
| { | ||||
|     public async Task Execute(IJobExecutionContext context) | ||||
|     { | ||||
|         await fbs.FlushAsync(hdl); | ||||
|     } | ||||
| } | ||||
| @@ -78,10 +78,10 @@ public class PublisherSubscriptionService( | ||||
|             Topic = "posts.new", | ||||
|             Title = localizer["PostSubscriptionTitle", post.Publisher.Name, title], | ||||
|             Body = message, | ||||
|             Meta = GrpcTypeHelper.ConvertObjectToByteString(data), | ||||
|             IsSavable = true, | ||||
|             ActionUri = $"/posts/{post.Id}" | ||||
|         }; | ||||
|         notification.Meta.Add(GrpcTypeHelper.ConvertToValueMap(data)); | ||||
|  | ||||
|         // Notify each subscriber | ||||
|         var notifiedCount = 0; | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
| using DysonNetwork.Sphere.Post; | ||||
| using DysonNetwork.Sphere.WebReader; | ||||
| using Quartz; | ||||
|  | ||||
| @@ -16,15 +17,15 @@ public static class ScheduledJobsConfiguration | ||||
|                 .WithIdentity("AppDatabaseRecyclingTrigger") | ||||
|                 .WithCronSchedule("0 0 0 * * ?")); | ||||
|  | ||||
|             // var postViewFlushJob = new JobKey("PostViewFlush"); | ||||
|             // q.AddJob<PostViewFlushJob>(opts => opts.WithIdentity(postViewFlushJob)); | ||||
|             // q.AddTrigger(opts => opts | ||||
|             //     .ForJob(postViewFlushJob) | ||||
|             //     .WithIdentity("PostViewFlushTrigger") | ||||
|             //     .WithSimpleSchedule(o => o | ||||
|             //         .WithIntervalInMinutes(1) | ||||
|             //         .RepeatForever()) | ||||
|             // ); | ||||
|             var postViewFlushJob = new JobKey("PostViewFlush"); | ||||
|             q.AddJob<PostViewFlushJob>(opts => opts.WithIdentity(postViewFlushJob)); | ||||
|             q.AddTrigger(opts => opts | ||||
|                 .ForJob(postViewFlushJob) | ||||
|                 .WithIdentity("PostViewFlushTrigger") | ||||
|                 .WithSimpleSchedule(o => o | ||||
|                     .WithIntervalInMinutes(1) | ||||
|                     .RepeatForever()) | ||||
|             ); | ||||
|  | ||||
|             var webFeedScraperJob = new JobKey("WebFeedScraper"); | ||||
|             q.AddJob<WebFeedScraperJob>(opts => opts.WithIdentity(webFeedScraperJob)); | ||||
|   | ||||
| @@ -143,6 +143,7 @@ public static class ServiceCollectionExtensions | ||||
|     public static IServiceCollection AddAppFlushHandlers(this IServiceCollection services) | ||||
|     { | ||||
|         services.AddSingleton<FlushBufferService>(); | ||||
|         services.AddScoped<PostViewFlushHandler>(); | ||||
|  | ||||
|         return services; | ||||
|     } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user