♻️ Refactor the notification service to use gorush as push service

This commit is contained in:
LittleSheep 2025-06-01 01:04:20 +08:00
parent 7fa0dfdcad
commit a78e92a23a
14 changed files with 121 additions and 191 deletions

View File

@ -5,7 +5,7 @@ using DysonNetwork.Sphere.Storage.Handlers;
namespace DysonNetwork.Sphere.Account; namespace DysonNetwork.Sphere.Account;
public class ActionLogService(AppDatabase db, GeoIpService geo, FlushBufferService fbs) public class ActionLogService(GeoIpService geo, FlushBufferService fbs)
{ {
public void CreateActionLog(Guid accountId, string action, Dictionary<string, object> meta) public void CreateActionLog(Guid accountId, string action, Dictionary<string, object> meta)
{ {

View File

@ -23,7 +23,7 @@ public class MagicSpell : ModelBase
public MagicSpellType Type { get; set; } public MagicSpellType Type { get; set; }
public Instant? ExpiresAt { get; set; } public Instant? ExpiresAt { get; set; }
public Instant? AffectedAt { get; set; } public Instant? AffectedAt { get; set; }
[Column(TypeName = "jsonb")] public Dictionary<string, object> Meta { get; set; } [Column(TypeName = "jsonb")] public Dictionary<string, object> Meta { get; set; } = new();
public Guid? AccountId { get; set; } public Guid? AccountId { get; set; }
public Account? Account { get; set; } public Account? Account { get; set; }

View File

@ -1,5 +1,5 @@
using CorePush.Apple; using System.Text;
using CorePush.Firebase; using System.Text.Json;
using DysonNetwork.Sphere.Connection; using DysonNetwork.Sphere.Connection;
using EFCore.BulkExtensions; using EFCore.BulkExtensions;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
@ -7,46 +7,15 @@ using NodaTime;
namespace DysonNetwork.Sphere.Account; namespace DysonNetwork.Sphere.Account;
public class NotificationService public class NotificationService(
AppDatabase db,
WebSocketService ws,
ILogger<NotificationService> logger,
IHttpClientFactory httpFactory,
IConfiguration config)
{ {
private readonly AppDatabase _db; private readonly string _notifyTopic = config["Notifications:Topic"]!;
private readonly WebSocketService _ws; private readonly Uri _notifyEndpoint = new(config["Notifications:Endpoint"]!);
private readonly ILogger<NotificationService> _logger;
private readonly FirebaseSender? _fcm;
private readonly ApnSender? _apns;
public NotificationService(
AppDatabase db,
WebSocketService ws,
IConfiguration cfg,
IHttpClientFactory clientFactory,
ILogger<NotificationService> logger
)
{
_db = db;
_ws = ws;
_logger = logger;
var cfgSection = cfg.GetSection("Notifications:Push");
// Set up the firebase push notification
var fcmConfig = cfgSection.GetValue<string>("Google");
if (fcmConfig != null)
_fcm = new FirebaseSender(File.ReadAllText(fcmConfig), clientFactory.CreateClient());
// Set up the apple push notification service
var apnsCert = cfgSection.GetValue<string>("Apple:PrivateKey");
if (apnsCert != null)
_apns = new ApnSender(new ApnSettings
{
P8PrivateKey = File.ReadAllText(apnsCert),
P8PrivateKeyId = cfgSection.GetValue<string>("Apple:PrivateKeyId"),
TeamId = cfgSection.GetValue<string>("Apple:TeamId"),
AppBundleIdentifier = cfgSection.GetValue<string>("Apple:BundleIdentifier"),
ServerType = cfgSection.GetValue<bool>("Production")
? ApnServerType.Production
: ApnServerType.Development
}, clientFactory.CreateClient());
}
// TODO remove all push notification with this device id when this device is logged out // TODO remove all push notification with this device id when this device is logged out
@ -57,7 +26,7 @@ public class NotificationService
string deviceToken string deviceToken
) )
{ {
var existingSubscription = await _db.NotificationPushSubscriptions var existingSubscription = await db.NotificationPushSubscriptions
.Where(s => s.AccountId == account.Id) .Where(s => s.AccountId == account.Id)
.Where(s => s.DeviceId == deviceId || s.DeviceToken == deviceToken) .Where(s => s.DeviceId == deviceId || s.DeviceToken == deviceToken)
.FirstOrDefaultAsync(); .FirstOrDefaultAsync();
@ -67,8 +36,8 @@ public class NotificationService
// Reset these audit fields to renew the lifecycle of this device token // Reset these audit fields to renew the lifecycle of this device token
existingSubscription.CreatedAt = Instant.FromDateTimeUtc(DateTime.UtcNow); existingSubscription.CreatedAt = Instant.FromDateTimeUtc(DateTime.UtcNow);
existingSubscription.UpdatedAt = Instant.FromDateTimeUtc(DateTime.UtcNow); existingSubscription.UpdatedAt = Instant.FromDateTimeUtc(DateTime.UtcNow);
_db.Update(existingSubscription); db.Update(existingSubscription);
await _db.SaveChangesAsync(); await db.SaveChangesAsync();
return existingSubscription; return existingSubscription;
} }
@ -80,8 +49,8 @@ public class NotificationService
AccountId = account.Id, AccountId = account.Id,
}; };
_db.NotificationPushSubscriptions.Add(subscription); db.NotificationPushSubscriptions.Add(subscription);
await _db.SaveChangesAsync(); await db.SaveChangesAsync();
return subscription; return subscription;
} }
@ -111,8 +80,8 @@ public class NotificationService
AccountId = account.Id, AccountId = account.Id,
}; };
_db.Add(notification); db.Add(notification);
await _db.SaveChangesAsync(); await db.SaveChangesAsync();
if (!isSilent) _ = DeliveryNotification(notification); if (!isSilent) _ = DeliveryNotification(notification);
@ -121,22 +90,18 @@ public class NotificationService
public async Task DeliveryNotification(Notification notification) public async Task DeliveryNotification(Notification notification)
{ {
_ws.SendPacketToAccount(notification.AccountId, new WebSocketPacket ws.SendPacketToAccount(notification.AccountId, new WebSocketPacket
{ {
Type = "notifications.new", Type = "notifications.new",
Data = notification Data = notification
}); });
// Pushing the notification // Pushing the notification
var subscribers = await _db.NotificationPushSubscriptions var subscribers = await db.NotificationPushSubscriptions
.Where(s => s.AccountId == notification.AccountId) .Where(s => s.AccountId == notification.AccountId)
.ToListAsync(); .ToListAsync();
var tasks = subscribers await _PushNotification(notification, subscribers);
.Select(subscriber => _PushSingleNotification(notification, subscriber))
.ToList();
await Task.WhenAll(tasks);
} }
public async Task MarkNotificationsViewed(ICollection<Notification> notifications) public async Task MarkNotificationsViewed(ICollection<Notification> notifications)
@ -145,7 +110,7 @@ public class NotificationService
var id = notifications.Where(n => n.ViewedAt == null).Select(n => n.Id).ToList(); var id = notifications.Where(n => n.ViewedAt == null).Select(n => n.Id).ToList();
if (id.Count == 0) return; if (id.Count == 0) return;
await _db.Notifications await db.Notifications
.Where(n => id.Contains(n.Id)) .Where(n => id.Contains(n.Id))
.ExecuteUpdateAsync(s => s.SetProperty(n => n.ViewedAt, now) .ExecuteUpdateAsync(s => s.SetProperty(n => n.ViewedAt, now)
); );
@ -155,7 +120,7 @@ public class NotificationService
{ {
if (save) if (save)
{ {
var accounts = await _db.Accounts.ToListAsync(); var accounts = await db.Accounts.ToListAsync();
var notifications = accounts.Select(x => var notifications = accounts.Select(x =>
{ {
var newNotification = new Notification var newNotification = new Notification
@ -171,19 +136,12 @@ public class NotificationService
}; };
return newNotification; return newNotification;
}).ToList(); }).ToList();
await _db.BulkInsertAsync(notifications); await db.BulkInsertAsync(notifications);
} }
var subscribers = await _db.NotificationPushSubscriptions var subscribers = await db.NotificationPushSubscriptions
.ToListAsync(); .ToListAsync();
var tasks = new List<Task>(); await _PushNotification(notification, subscribers);
foreach (var subscriber in subscribers)
{
notification.AccountId = subscriber.AccountId;
tasks.Add(_PushSingleNotification(notification, subscriber));
}
await Task.WhenAll(tasks);
} }
public async Task SendNotificationBatch(Notification notification, List<Account> accounts, bool save = false) public async Task SendNotificationBatch(Notification notification, List<Account> accounts, bool save = false)
@ -205,113 +163,103 @@ public class NotificationService
}; };
return newNotification; return newNotification;
}).ToList(); }).ToList();
await _db.BulkInsertAsync(notifications); await db.BulkInsertAsync(notifications);
} }
var accountsId = accounts.Select(x => x.Id).ToList(); var accountsId = accounts.Select(x => x.Id).ToList();
var subscribers = await _db.NotificationPushSubscriptions var subscribers = await db.NotificationPushSubscriptions
.Where(s => accountsId.Contains(s.AccountId)) .Where(s => accountsId.Contains(s.AccountId))
.ToListAsync(); .ToListAsync();
var tasks = new List<Task>(); await _PushNotification(notification, subscribers);
foreach (var subscriber in subscribers)
{
notification.AccountId = subscriber.AccountId;
tasks.Add(_PushSingleNotification(notification, subscriber));
}
await Task.WhenAll(tasks);
} }
private async Task _PushSingleNotification(Notification notification, NotificationPushSubscription subscription) private List<Dictionary<string, object>> _BuildNotificationPayload(Notification notification,
IEnumerable<NotificationPushSubscription> subscriptions)
{ {
try var subDict = subscriptions
.GroupBy(x => x.Provider)
.ToDictionary(x => x.Key, x => x.ToList());
var notifications = subDict.Select(value =>
{ {
_logger.LogDebug( int platformCode = value.Key switch
$"Pushing notification {notification.Topic} #{notification.Id} to device #{subscription.DeviceId}");
var body = string.Empty;
switch (subscription.Provider)
{ {
case NotificationPushProvider.Google: NotificationPushProvider.Google => 1,
if (_fcm == null) NotificationPushProvider.Apple => 2,
throw new InvalidOperationException("The firebase cloud messaging is not initialized."); _ => throw new InvalidOperationException($"Unknown push provider: {value.Key}")
};
if (!string.IsNullOrEmpty(notification.Subtitle) || !string.IsNullOrEmpty(notification.Content)) var tokens = value.Value.Select(x => x.DeviceToken).ToList();
{ return _BuildNotificationPayload(notification, platformCode, tokens);
body = string.Join("\n", }).ToList();
notification.Subtitle ?? string.Empty,
notification.Content ?? string.Empty).Trim();
}
await _fcm.SendAsync(new Dictionary<string, object> return notifications.ToList();
{ }
["message"] = new Dictionary<string, object>
{
["token"] = subscription.DeviceToken,
["notification"] = new Dictionary<string, object>
{
["title"] = notification.Title ?? string.Empty,
["body"] = body
},
["data"] = new Dictionary<string, object>
{
["id"] = notification.Id,
["topic"] = notification.Topic,
["meta"] = notification.Meta ?? new Dictionary<string, object>()
}
}
});
break;
case NotificationPushProvider.Apple: private Dictionary<string, object> _BuildNotificationPayload(Notification notification, int platformCode,
if (_apns == null) IEnumerable<string> deviceTokens)
throw new InvalidOperationException("The apple notification push service is not initialized."); {
var alertDict = new Dictionary<string, object>();
var alertDict = new Dictionary<string, object>(); var dict = new Dictionary<string, object>
if (!string.IsNullOrEmpty(notification.Title))
alertDict["title"] = notification.Title;
if (!string.IsNullOrEmpty(notification.Subtitle))
alertDict["subtitle"] = notification.Subtitle;
if (!string.IsNullOrEmpty(notification.Content))
alertDict["body"] = notification.Content;
var payload = new Dictionary<string, object>
{
["topic"] = notification.Topic,
["aps"] = new Dictionary<string, object>
{
["alert"] = alertDict
},
["sound"] = (notification.Priority > 0 ? "default" : null) ?? string.Empty,
["mutable-content"] = 1,
["meta"] = notification.Meta ?? new Dictionary<string, object>()
};
await _apns.SendAsync(
payload,
deviceToken: subscription.DeviceToken,
apnsId: notification.Id.ToString(),
apnsPriority: notification.Priority,
apnPushType: ApnPushType.Alert
);
break;
default:
throw new InvalidOperationException($"Provider not supported: {subscription.Provider}");
}
}
catch (Exception ex)
{ {
// Log the exception ["notif_id"] = notification.Id.ToString(),
// Consider implementing a retry mechanism ["apns_id"] = notification.Id.ToString(),
// Rethrow or handle as needed ["topic"] = _notifyTopic,
_logger.LogError( ["category"] = notification.Topic,
$"Failed to push notification #{notification.Id} to device... {ex.Message} {ex.StackTrace}"); ["tokens"] = deviceTokens,
throw new Exception($"Failed to send notification to {subscription.Provider}: {ex.Message}", ex); ["alert"] = new Dictionary<string, object>(),
["data"] = new Dictionary<string, object>
{
["d_topic"] = notification.Topic,
["meta"] = notification.Meta ?? new Dictionary<string, object>(),
},
["mutable_content"] = true,
};
if (!string.IsNullOrWhiteSpace(notification.Title))
{
dict["title"] = notification.Title;
alertDict["title"] = notification.Title;
} }
_logger.LogInformation( if (!string.IsNullOrWhiteSpace(notification.Content))
$"Pushed notification #{notification.Id} to device #{subscription.DeviceId}"); {
dict["message"] = notification.Content;
alertDict["body"] = notification.Content;
}
if (!string.IsNullOrWhiteSpace(notification.Subtitle))
{
dict["message"] = $"{notification.Subtitle}\n{dict["message"]}";
alertDict["subtitle"] = notification.Subtitle;
}
if (notification.Priority >= 5)
{
dict["sound"] = new Dictionary<string, object> { ["name"] = "default" };
}
dict["platform"] = platformCode;
dict["alert"] = alertDict;
return dict;
}
private async Task _PushNotification(Notification notification,
IEnumerable<NotificationPushSubscription> subscriptions)
{
var requestDict = new Dictionary<string, object>
{
["notifications"] = _BuildNotificationPayload(notification, subscriptions)
};
var client = httpFactory.CreateClient();
client.BaseAddress = _notifyEndpoint;
var request = await client.PostAsync("/api/push", new StringContent(
JsonSerializer.Serialize(requestDict),
Encoding.UTF8,
"application/json"
));
request.EnsureSuccessStatusCode();
} }
} }

View File

@ -178,7 +178,7 @@ public class DysonTokenAuthHandler(
{ {
return new TokenInfo return new TokenInfo
{ {
Token = queryToken, Token = queryToken.ToString(),
Type = TokenType.AuthKey Type = TokenType.AuthKey
}; };
} }

View File

@ -12,7 +12,7 @@ namespace DysonNetwork.Sphere.Chat;
[ApiController] [ApiController]
[Route("/chat")] [Route("/chat")]
public partial class ChatController(AppDatabase db, ChatService cs, FileService fs) : ControllerBase public partial class ChatController(AppDatabase db, ChatService cs) : ControllerBase
{ {
public class MarkMessageReadRequest public class MarkMessageReadRequest
{ {

View File

@ -484,7 +484,7 @@ public class ChatRoomController(
member.JoinedAt = NodaTime.Instant.FromDateTimeUtc(DateTime.UtcNow); member.JoinedAt = NodaTime.Instant.FromDateTimeUtc(DateTime.UtcNow);
db.Update(member); db.Update(member);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
crs.PurgeRoomMembersCache(roomId); _ = crs.PurgeRoomMembersCache(roomId);
als.CreateActionLogFromRequest( als.CreateActionLogFromRequest(
ActionLogType.ChatroomJoin, ActionLogType.ChatroomJoin,
@ -607,7 +607,7 @@ public class ChatRoomController(
member.LeaveAt = SystemClock.Instance.GetCurrentInstant(); member.LeaveAt = SystemClock.Instance.GetCurrentInstant();
await db.SaveChangesAsync(); await db.SaveChangesAsync();
crs.PurgeRoomMembersCache(roomId); _ = crs.PurgeRoomMembersCache(roomId);
als.CreateActionLogFromRequest( als.CreateActionLogFromRequest(
ActionLogType.ChatroomKick, ActionLogType.ChatroomKick,
@ -649,7 +649,7 @@ public class ChatRoomController(
db.ChatMembers.Add(newMember); db.ChatMembers.Add(newMember);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
crs.PurgeRoomMembersCache(roomId); _ = crs.PurgeRoomMembersCache(roomId);
als.CreateActionLogFromRequest( als.CreateActionLogFromRequest(
ActionLogType.ChatroomJoin, ActionLogType.ChatroomJoin,

View File

@ -9,8 +9,6 @@ using SystemClock = NodaTime.SystemClock;
namespace DysonNetwork.Sphere.Connection.Handlers; namespace DysonNetwork.Sphere.Connection.Handlers;
public class MessageReadHandler( public class MessageReadHandler(
AppDatabase db,
ICacheService cache,
ChatRoomService crs, ChatRoomService crs,
FlushBufferService buffer FlushBufferService buffer
) )

View File

@ -5,7 +5,7 @@ using Microsoft.EntityFrameworkCore;
namespace DysonNetwork.Sphere.Connection.Handlers; namespace DysonNetwork.Sphere.Connection.Handlers;
public class MessageTypingHandler(AppDatabase db, ChatRoomService crs, ICacheService cache) : IWebSocketPacketHandler public class MessageTypingHandler(ChatRoomService crs) : IWebSocketPacketHandler
{ {
public string PacketType => "messages.typing"; public string PacketType => "messages.typing";

View File

@ -91,8 +91,7 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext>
); );
var packet = WebSocketPacket.FromBytes(buffer[..receiveResult.Count]); var packet = WebSocketPacket.FromBytes(buffer[..receiveResult.Count]);
if (packet is null) continue; _ = ws.HandlePacket(currentUser, connectionKey.DeviceId, packet, webSocket);
ws.HandlePacket(currentUser, connectionKey.DeviceId, packet, webSocket);
} }
} }
catch (OperationCanceledException) catch (OperationCanceledException)

View File

@ -14,7 +14,7 @@ public class WebSocketPacketType
public class WebSocketPacket public class WebSocketPacket
{ {
public string Type { get; set; } = null!; public string Type { get; set; } = null!;
public object Data { get; set; } public object Data { get; set; } = null!;
public string? ErrorMessage { get; set; } public string? ErrorMessage { get; set; }
/// <summary> /// <summary>

View File

@ -18,7 +18,6 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="BCrypt.Net-Next" Version="4.0.3"/> <PackageReference Include="BCrypt.Net-Next" Version="4.0.3"/>
<PackageReference Include="BlurHashSharp.SkiaSharp" Version="1.3.4"/> <PackageReference Include="BlurHashSharp.SkiaSharp" Version="1.3.4"/>
<PackageReference Include="CorePush" Version="4.3.0"/>
<PackageReference Include="EFCore.BulkExtensions" Version="9.0.1"/> <PackageReference Include="EFCore.BulkExtensions" Version="9.0.1"/>
<PackageReference Include="EFCore.BulkExtensions.PostgreSql" Version="9.0.1"/> <PackageReference Include="EFCore.BulkExtensions.PostgreSql" Version="9.0.1"/>
<PackageReference Include="EFCore.NamingConventions" Version="9.0.0"/> <PackageReference Include="EFCore.NamingConventions" Version="9.0.0"/>

View File

@ -7,8 +7,8 @@ namespace DysonNetwork.Sphere.Permission;
public class PermissionService( public class PermissionService(
AppDatabase db, AppDatabase db,
ICacheService cache, ICacheService cache
ILogger<PermissionService> logger) )
{ {
private static readonly TimeSpan CacheExpiration = TimeSpan.FromMinutes(1); private static readonly TimeSpan CacheExpiration = TimeSpan.FromMinutes(1);

View File

@ -1,10 +1,5 @@
using DysonNetwork.Sphere.Storage; using DysonNetwork.Sphere.Storage;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Caching.Memory;
using System;
using System.Linq;
using System.Threading.Tasks;
namespace DysonNetwork.Sphere.Sticker; namespace DysonNetwork.Sphere.Sticker;

View File

@ -31,7 +31,6 @@
"StorePath": "Uploads" "StorePath": "Uploads"
}, },
"Storage": { "Storage": {
"BaseUrl": "http://localhost:5071",
"PreferredRemote": "cloudflare", "PreferredRemote": "cloudflare",
"Remote": [ "Remote": [
{ {
@ -53,16 +52,8 @@
"ApiSecret": "0x4AAAAAABCDUWABiJQweqlB7tYq-IqIm8U" "ApiSecret": "0x4AAAAAABCDUWABiJQweqlB7tYq-IqIm8U"
}, },
"Notifications": { "Notifications": {
"Push": { "Topic": "dev.solsynth.solian",
"Production": true, "Endpoint": "http://localhost:8088"
"Google": "./Keys/Solian.json",
"Apple": {
"PrivateKey": "./Keys/Solian.p8",
"PrivateKeyId": "4US4KSX4W6",
"TeamId": "W7HPZ53V6B",
"BundleIdentifier": "dev.solsynth.solian"
}
}
}, },
"Email": { "Email": {
"Server": "smtp4dev.orb.local", "Server": "smtp4dev.orb.local",