Compare commits

...

3 Commits

Author SHA1 Message Date
51697c31cb ♻️ Refactor notification meta 2025-07-29 23:23:40 +08:00
409c83b030 Putting back the view mark flush handler 2025-07-29 23:15:11 +08:00
acb293ec8f 🐛 Fix chat websocket packet 2025-07-29 23:08:41 +08:00
8 changed files with 74 additions and 16 deletions

View File

@@ -362,9 +362,12 @@ public class SubscriptionService(
Topic = "subscriptions.begun", Topic = "subscriptions.begun",
Title = localizer["SubscriptionAppliedTitle", humanReadableName], Title = localizer["SubscriptionAppliedTitle", humanReadableName],
Body = localizer["SubscriptionAppliedBody", duration, humanReadableName], Body = localizer["SubscriptionAppliedBody", duration, humanReadableName],
Meta = GrpcTypeHelper.ConvertObjectToByteString(new Dictionary<string, object>
{
["subscription_id"] = subscription.Id.ToString()
}),
IsSavable = true IsSavable = true
}; };
notification.Meta.Add("subscription_id", Value.ForString(subscription.Id.ToString()));
await pusher.SendPushNotificationToUserAsync( await pusher.SendPushNotificationToUserAsync(
new SendPushNotificationToUserRequest new SendPushNotificationToUserRequest
{ {

View File

@@ -91,7 +91,7 @@ public class PusherServiceGrpc(
request.Notification.Title, request.Notification.Title,
request.Notification.Subtitle, request.Notification.Subtitle,
request.Notification.Body, request.Notification.Body,
GrpcTypeHelper.ConvertFromValueMap(request.Notification.Meta), GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Notification.Meta) ?? [],
request.Notification.ActionUri, request.Notification.ActionUri,
request.Notification.IsSilent, request.Notification.IsSilent,
request.Notification.IsSavable request.Notification.IsSavable
@@ -108,7 +108,8 @@ public class PusherServiceGrpc(
Title = request.Notification.Title, Title = request.Notification.Title,
Subtitle = request.Notification.Subtitle, Subtitle = request.Notification.Subtitle,
Content = request.Notification.Body, Content = request.Notification.Body,
Meta = GrpcTypeHelper.ConvertFromValueMap(request.Notification.Meta) Meta = GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(request.Notification.Meta) ??
[]
}; };
if (request.Notification.ActionUri is not null) if (request.Notification.ActionUri is not null)
notification.Meta["action_uri"] = request.Notification.ActionUri; notification.Meta["action_uri"] = request.Notification.ActionUri;

View File

@@ -85,7 +85,7 @@ message PushNotification {
string title = 2; string title = 2;
string subtitle = 3; string subtitle = 3;
string body = 4; string body = 4;
map<string, google.protobuf.Value> meta = 5; bytes meta = 5;
optional string action_uri = 6; optional string action_uri = 6;
bool is_silent = 7; bool is_silent = 7;
bool is_savable = 8; bool is_savable = 8;

View File

@@ -241,10 +241,10 @@ public partial class ChatService(
Body = !string.IsNullOrEmpty(message.Content) Body = !string.IsNullOrEmpty(message.Content)
? message.Content[..Math.Min(message.Content.Length, 100)] ? message.Content[..Math.Min(message.Content.Length, 100)]
: "<no content>", : "<no content>",
Meta = GrpcTypeHelper.ConvertObjectToByteString(metaDict),
ActionUri = $"/chat/{room.Id}", ActionUri = $"/chat/{room.Id}",
IsSavable = false, IsSavable = false,
}; };
notification.Meta.Add(GrpcTypeHelper.ConvertToValueMap(metaDict));
List<Account> accountsToNotify = []; List<Account> accountsToNotify = [];
foreach (var member in members) foreach (var member in members)
@@ -254,7 +254,7 @@ public partial class ChatService(
Packet = new WebSocketPacket Packet = new WebSocketPacket
{ {
Type = type, Type = type,
Data = GrpcTypeHelper.ConvertObjectToByteString(metaDict), Data = GrpcTypeHelper.ConvertObjectToByteString(message),
}, },
}); });

View File

@@ -0,0 +1,52 @@
using DysonNetwork.Shared.Cache;
using Microsoft.EntityFrameworkCore;
using Quartz;
namespace DysonNetwork.Sphere.Post;
public class PostViewFlushHandler(IServiceProvider serviceProvider) : IFlushHandler<PostViewInfo>
{
public async Task FlushAsync(IReadOnlyList<Sphere.Post.PostViewInfo> items)
{
using var scope = serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
var cache = scope.ServiceProvider.GetRequiredService<ICacheService>();
// Group views by post
var postViews = items
.GroupBy(x => x.PostId)
.ToDictionary(g => g.Key, g => g.ToList());
// Calculate total views and unique views per post
foreach (var postId in postViews.Keys)
{
// Calculate unique views by distinct viewer IDs (not null)
var uniqueViews = postViews[postId]
.Where(v => !string.IsNullOrEmpty(v.ViewerId))
.Select(v => v.ViewerId)
.Distinct()
.Count();
// Total views is just the count of all items for this post
var totalViews = postViews[postId].Count;
// Update the post in the database
await db.Posts
.Where(p => p.Id == postId)
.ExecuteUpdateAsync(p => p
.SetProperty(x => x.ViewsTotal, x => x.ViewsTotal + totalViews)
.SetProperty(x => x.ViewsUnique, x => x.ViewsUnique + uniqueViews));
// Invalidate any cache entries for this post
await cache.RemoveAsync($"post:{postId}");
}
}
}
public class PostViewFlushJob(FlushBufferService fbs, PostViewFlushHandler hdl) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
await fbs.FlushAsync(hdl);
}
}

View File

@@ -78,10 +78,10 @@ public class PublisherSubscriptionService(
Topic = "posts.new", Topic = "posts.new",
Title = localizer["PostSubscriptionTitle", post.Publisher.Name, title], Title = localizer["PostSubscriptionTitle", post.Publisher.Name, title],
Body = message, Body = message,
Meta = GrpcTypeHelper.ConvertObjectToByteString(data),
IsSavable = true, IsSavable = true,
ActionUri = $"/posts/{post.Id}" ActionUri = $"/posts/{post.Id}"
}; };
notification.Meta.Add(GrpcTypeHelper.ConvertToValueMap(data));
// Notify each subscriber // Notify each subscriber
var notifiedCount = 0; var notifiedCount = 0;

View File

@@ -1,3 +1,4 @@
using DysonNetwork.Sphere.Post;
using DysonNetwork.Sphere.WebReader; using DysonNetwork.Sphere.WebReader;
using Quartz; using Quartz;
@@ -16,15 +17,15 @@ public static class ScheduledJobsConfiguration
.WithIdentity("AppDatabaseRecyclingTrigger") .WithIdentity("AppDatabaseRecyclingTrigger")
.WithCronSchedule("0 0 0 * * ?")); .WithCronSchedule("0 0 0 * * ?"));
// var postViewFlushJob = new JobKey("PostViewFlush"); var postViewFlushJob = new JobKey("PostViewFlush");
// q.AddJob<PostViewFlushJob>(opts => opts.WithIdentity(postViewFlushJob)); q.AddJob<PostViewFlushJob>(opts => opts.WithIdentity(postViewFlushJob));
// q.AddTrigger(opts => opts q.AddTrigger(opts => opts
// .ForJob(postViewFlushJob) .ForJob(postViewFlushJob)
// .WithIdentity("PostViewFlushTrigger") .WithIdentity("PostViewFlushTrigger")
// .WithSimpleSchedule(o => o .WithSimpleSchedule(o => o
// .WithIntervalInMinutes(1) .WithIntervalInMinutes(1)
// .RepeatForever()) .RepeatForever())
// ); );
var webFeedScraperJob = new JobKey("WebFeedScraper"); var webFeedScraperJob = new JobKey("WebFeedScraper");
q.AddJob<WebFeedScraperJob>(opts => opts.WithIdentity(webFeedScraperJob)); q.AddJob<WebFeedScraperJob>(opts => opts.WithIdentity(webFeedScraperJob));

View File

@@ -143,6 +143,7 @@ public static class ServiceCollectionExtensions
public static IServiceCollection AddAppFlushHandlers(this IServiceCollection services) public static IServiceCollection AddAppFlushHandlers(this IServiceCollection services)
{ {
services.AddSingleton<FlushBufferService>(); services.AddSingleton<FlushBufferService>();
services.AddScoped<PostViewFlushHandler>();
return services; return services;
} }