Pusher service basis

This commit is contained in:
2025-07-12 22:15:18 +08:00
parent 33f56c4ef5
commit e1b47bc7d1
22 changed files with 1117 additions and 104 deletions

View File

@@ -0,0 +1,178 @@
using System.Linq.Expressions;
using System.Reflection;
using DysonNetwork.Pusher.Notification;
using DysonNetwork.Shared.Data;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Design;
using Microsoft.EntityFrameworkCore.Query;
using NodaTime;
using Quartz;
namespace DysonNetwork.Pusher;
public class AppDatabase(
DbContextOptions<AppDatabase> options,
IConfiguration configuration
) : DbContext(options)
{
public DbSet<Notification.Notification> Notifications { get; set; } = null!;
public DbSet<PushSubscription> PushSubscriptions { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseNpgsql(
configuration.GetConnectionString("App"),
opt => opt
.ConfigureDataSource(optSource => optSource.EnableDynamicJson())
.UseQuerySplittingBehavior(QuerySplittingBehavior.SplitQuery)
.UseNodaTime()
).UseSnakeCaseNamingConvention();
base.OnConfiguring(optionsBuilder);
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
base.OnModelCreating(modelBuilder);
// Automatically apply soft-delete filter to all entities inheriting BaseModel
foreach (var entityType in modelBuilder.Model.GetEntityTypes())
{
if (!typeof(ModelBase).IsAssignableFrom(entityType.ClrType)) continue;
var method = typeof(AppDatabase)
.GetMethod(nameof(SetSoftDeleteFilter),
BindingFlags.NonPublic | BindingFlags.Static)!
.MakeGenericMethod(entityType.ClrType);
method.Invoke(null, [modelBuilder]);
}
}
private static void SetSoftDeleteFilter<TEntity>(ModelBuilder modelBuilder)
where TEntity : ModelBase
{
modelBuilder.Entity<TEntity>().HasQueryFilter(e => e.DeletedAt == null);
}
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
var now = SystemClock.Instance.GetCurrentInstant();
foreach (var entry in ChangeTracker.Entries<ModelBase>())
{
switch (entry.State)
{
case EntityState.Added:
entry.Entity.CreatedAt = now;
entry.Entity.UpdatedAt = now;
break;
case EntityState.Modified:
entry.Entity.UpdatedAt = now;
break;
case EntityState.Deleted:
entry.State = EntityState.Modified;
entry.Entity.DeletedAt = now;
break;
case EntityState.Detached:
case EntityState.Unchanged:
default:
break;
}
}
return await base.SaveChangesAsync(cancellationToken);
}
}
public class AppDatabaseRecyclingJob(AppDatabase db, ILogger<AppDatabaseRecyclingJob> logger) : IJob
{
public async Task Execute(IJobExecutionContext context)
{
var now = SystemClock.Instance.GetCurrentInstant();
logger.LogInformation("Deleting soft-deleted records...");
var threshold = now - Duration.FromDays(7);
var entityTypes = db.Model.GetEntityTypes()
.Where(t => typeof(ModelBase).IsAssignableFrom(t.ClrType) && t.ClrType != typeof(ModelBase))
.Select(t => t.ClrType);
foreach (var entityType in entityTypes)
{
var set = (IQueryable)db.GetType().GetMethod(nameof(DbContext.Set), Type.EmptyTypes)!
.MakeGenericMethod(entityType).Invoke(db, null)!;
var parameter = Expression.Parameter(entityType, "e");
var property = Expression.Property(parameter, nameof(ModelBase.DeletedAt));
var condition = Expression.LessThan(property, Expression.Constant(threshold, typeof(Instant?)));
var notNull = Expression.NotEqual(property, Expression.Constant(null, typeof(Instant?)));
var finalCondition = Expression.AndAlso(notNull, condition);
var lambda = Expression.Lambda(finalCondition, parameter);
var queryable = set.Provider.CreateQuery(
Expression.Call(
typeof(Queryable),
"Where",
[entityType],
set.Expression,
Expression.Quote(lambda)
)
);
var toListAsync = typeof(EntityFrameworkQueryableExtensions)
.GetMethod(nameof(EntityFrameworkQueryableExtensions.ToListAsync))!
.MakeGenericMethod(entityType);
var items = await (dynamic)toListAsync.Invoke(null, [queryable, CancellationToken.None])!;
db.RemoveRange(items);
}
await db.SaveChangesAsync();
}
}
public class AppDatabaseFactory : IDesignTimeDbContextFactory<AppDatabase>
{
public AppDatabase CreateDbContext(string[] args)
{
var configuration = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json")
.Build();
var optionsBuilder = new DbContextOptionsBuilder<AppDatabase>();
return new AppDatabase(optionsBuilder.Options, configuration);
}
}
public static class OptionalQueryExtensions
{
public static IQueryable<T> If<T>(
this IQueryable<T> source,
bool condition,
Func<IQueryable<T>, IQueryable<T>> transform
)
{
return condition ? transform(source) : source;
}
public static IQueryable<T> If<T, TP>(
this IIncludableQueryable<T, TP> source,
bool condition,
Func<IIncludableQueryable<T, TP>, IQueryable<T>> transform
)
where T : class
{
return condition ? transform(source) : source;
}
public static IQueryable<T> If<T, TP>(
this IIncludableQueryable<T, IEnumerable<TP>> source,
bool condition,
Func<IIncludableQueryable<T, IEnumerable<TP>>, IQueryable<T>> transform
)
where T : class
{
return condition ? transform(source) : source;
}
}

View File

@@ -1,9 +1,17 @@
using System.Net.WebSockets;
using DysonNetwork.Shared.Proto;
namespace DysonNetwork.Pusher.Connection;
public interface IWebSocketPacketHandler
{
string PacketType { get; }
Task HandleAsync(Account currentUser, string deviceId, WebSocketPacket packet, WebSocket socket, WebSocketService srv);
Task HandleAsync(
Account currentUser,
string deviceId,
WebSocketPacket packet,
WebSocket socket,
WebSocketService srv
);
}

View File

@@ -1,8 +1,7 @@
using System.Collections.Concurrent;
using System.Net.WebSockets;
using DysonNetwork.Shared.Proto;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore.Metadata.Internal;
using Swashbuckle.AspNetCore.Annotations;
namespace DysonNetwork.Pusher.Connection;
@@ -18,15 +17,15 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext>
{
HttpContext.Items.TryGetValue("CurrentUser", out var currentUserValue);
HttpContext.Items.TryGetValue("CurrentSession", out var currentSessionValue);
if (currentUserValue is not Account.Account currentUser ||
currentSessionValue is not Auth.Session currentSession)
if (currentUserValue is not Account currentUser ||
currentSessionValue is not AuthSession currentSession)
{
HttpContext.Response.StatusCode = StatusCodes.Status401Unauthorized;
return;
}
var accountId = currentUser.Id;
var deviceId = currentSession.Challenge.DeviceId;
var accountId = currentUser.Id!;
var deviceId = currentSession.Challenge.DeviceId!;
if (string.IsNullOrEmpty(deviceId))
{
@@ -69,7 +68,7 @@ public class WebSocketController(WebSocketService ws, ILogger<WebSocketContext>
private async Task _ConnectionEventLoop(
string deviceId,
Account.Account currentUser,
Account currentUser,
WebSocket webSocket,
CancellationToken cancellationToken
)

View File

@@ -2,7 +2,9 @@ using System.Text.Json;
using NodaTime;
using NodaTime.Serialization.SystemTextJson;
public class WebSocketPacketType
namespace DysonNetwork.Pusher.Connection;
public abstract class WebSocketPacketType
{
public const string Error = "error";
public const string MessageNew = "messages.new";
@@ -31,7 +33,7 @@ public class WebSocketPacket
DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower,
};
return JsonSerializer.Deserialize<WebSocketPacket>(json, jsonOpts) ??
throw new JsonException("Failed to deserialize WebSocketPacket");
throw new JsonException("Failed to deserialize WebSocketPacket");
}
/// <summary>

View File

@@ -1,5 +1,6 @@
using System.Collections.Concurrent;
using System.Net.WebSockets;
using DysonNetwork.Shared.Proto;
namespace DysonNetwork.Pusher.Connection;
@@ -13,7 +14,7 @@ public class WebSocketService
}
private static readonly ConcurrentDictionary<
(Guid AccountId, string DeviceId),
(string AccountId, string DeviceId),
(WebSocket Socket, CancellationTokenSource Cts)
> ActiveConnections = new();
@@ -29,21 +30,23 @@ public class WebSocketService
ActiveSubscriptions.TryRemove(deviceId, out _);
}
public bool IsUserSubscribedToChatRoom(Guid accountId, string chatRoomId)
public bool IsUserSubscribedToChatRoom(string accountId, string chatRoomId)
{
var userDeviceIds = ActiveConnections.Keys.Where(k => k.AccountId == accountId).Select(k => k.DeviceId);
foreach (var deviceId in userDeviceIds)
{
if (ActiveSubscriptions.TryGetValue(deviceId, out var subscribedChatRoomId) && subscribedChatRoomId == chatRoomId)
if (ActiveSubscriptions.TryGetValue(deviceId, out var subscribedChatRoomId) &&
subscribedChatRoomId == chatRoomId)
{
return true;
}
}
return false;
}
public bool TryAdd(
(Guid AccountId, string DeviceId) key,
(string AccountId, string DeviceId) key,
WebSocket socket,
CancellationTokenSource cts
)
@@ -54,7 +57,7 @@ public class WebSocketService
return ActiveConnections.TryAdd(key, (socket, cts));
}
public void Disconnect((Guid AccountId, string DeviceId) key, string? reason = null)
public void Disconnect((string AccountId, string DeviceId) key, string? reason = null)
{
if (!ActiveConnections.TryGetValue(key, out var data)) return;
data.Socket.CloseAsync(
@@ -67,12 +70,12 @@ public class WebSocketService
UnsubscribeFromChatRoom(key.DeviceId);
}
public bool GetAccountIsConnected(Guid accountId)
public bool GetAccountIsConnected(string accountId)
{
return ActiveConnections.Any(c => c.Key.AccountId == accountId);
}
public void SendPacketToAccount(Guid userId, WebSocketPacket packet)
public void SendPacketToAccount(string userId, WebSocketPacket packet)
{
var connections = ActiveConnections.Where(c => c.Key.AccountId == userId);
var packetBytes = packet.ToBytes();
@@ -106,8 +109,12 @@ public class WebSocketService
}
}
public async Task HandlePacket(Account.Account currentUser, string deviceId, WebSocketPacket packet,
WebSocket socket)
public async Task HandlePacket(
Account currentUser,
string deviceId,
WebSocketPacket packet,
WebSocket socket
)
{
if (_handlerMap.TryGetValue(packet.Type, out var handler))
{

View File

@@ -8,8 +8,19 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="EFCore.BulkExtensions" Version="9.0.1" />
<PackageReference Include="EFCore.BulkExtensions.PostgreSql" Version="9.0.1" />
<PackageReference Include="EFCore.NamingConventions" Version="9.0.0" />
<PackageReference Include="Grpc.AspNetCore.Server" Version="2.71.0" />
<PackageReference Include="MailKit" Version="4.13.0" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="9.0.7"/>
<PackageReference Include="NodaTime" Version="3.2.2" />
<PackageReference Include="NodaTime.Serialization.SystemTextJson" Version="1.3.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.4" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL.NodaTime" Version="9.0.4" />
<PackageReference Include="Quartz" Version="3.14.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="9.0.3" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="9.0.3" />
</ItemGroup>
<ItemGroup>
@@ -18,4 +29,8 @@
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\DysonNetwork.Shared\DysonNetwork.Shared.csproj" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,86 @@
using MailKit.Net.Smtp;
using MimeKit;
namespace DysonNetwork.Pusher.Email;
public class EmailServiceConfiguration
{
public string Server { get; set; } = null!;
public int Port { get; set; }
public bool UseSsl { get; set; }
public string Username { get; set; } = null!;
public string Password { get; set; } = null!;
public string FromAddress { get; set; } = null!;
public string FromName { get; set; } = null!;
public string SubjectPrefix { get; set; } = null!;
}
public class EmailService
{
private readonly EmailServiceConfiguration _configuration;
private readonly ILogger<EmailService> _logger;
public EmailService(IConfiguration configuration, ILogger<EmailService> logger)
{
var cfg = configuration.GetSection("Email").Get<EmailServiceConfiguration>();
_configuration = cfg ?? throw new ArgumentException("Email service was not configured.");
_logger = logger;
}
public async Task SendEmailAsync(string? recipientName, string recipientEmail, string subject, string textBody)
{
await SendEmailAsync(recipientName, recipientEmail, subject, textBody, null);
}
public async Task SendEmailAsync(string? recipientName, string recipientEmail, string subject, string textBody,
string? htmlBody)
{
subject = $"[{_configuration.SubjectPrefix}] {subject}";
var emailMessage = new MimeMessage();
emailMessage.From.Add(new MailboxAddress(_configuration.FromName, _configuration.FromAddress));
emailMessage.To.Add(new MailboxAddress(recipientName, recipientEmail));
emailMessage.Subject = subject;
var bodyBuilder = new BodyBuilder
{
TextBody = textBody
};
if (!string.IsNullOrEmpty(htmlBody))
bodyBuilder.HtmlBody = htmlBody;
emailMessage.Body = bodyBuilder.ToMessageBody();
using var client = new SmtpClient();
await client.ConnectAsync(_configuration.Server, _configuration.Port, _configuration.UseSsl);
await client.AuthenticateAsync(_configuration.Username, _configuration.Password);
await client.SendAsync(emailMessage);
await client.DisconnectAsync(true);
}
private static string _ConvertHtmlToPlainText(string html)
{
// Remove style tags and their contents
html = System.Text.RegularExpressions.Regex.Replace(html, "<style[^>]*>.*?</style>", "",
System.Text.RegularExpressions.RegexOptions.Singleline);
// Replace header tags with text + newlines
html = System.Text.RegularExpressions.Regex.Replace(html, "<h[1-6][^>]*>(.*?)</h[1-6]>", "$1\n\n",
System.Text.RegularExpressions.RegexOptions.IgnoreCase);
// Replace line breaks
html = html.Replace("<br>", "\n").Replace("<br/>", "\n").Replace("<br />", "\n");
// Remove all remaining HTML tags
html = System.Text.RegularExpressions.Regex.Replace(html, "<[^>]+>", "");
// Decode HTML entities
html = System.Net.WebUtility.HtmlDecode(html);
// Remove excess whitespace
html = System.Text.RegularExpressions.Regex.Replace(html, @"\s+", " ").Trim();
return html;
}
}

View File

@@ -0,0 +1,24 @@
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using System.Text.Json.Serialization;
using DysonNetwork.Shared.Data;
using DysonNetwork.Shared.Proto;
using NodaTime;
namespace DysonNetwork.Pusher.Notification;
public class Notification : ModelBase
{
public Guid Id { get; set; } = Guid.NewGuid();
[MaxLength(1024)] public string Topic { get; set; } = null!;
[MaxLength(1024)] public string? Title { get; set; }
[MaxLength(2048)] public string? Subtitle { get; set; }
[MaxLength(4096)] public string? Content { get; set; }
[Column(TypeName = "jsonb")] public Dictionary<string, object>? Meta { get; set; }
public int Priority { get; set; } = 10;
public Instant? ViewedAt { get; set; }
public Guid AccountId { get; set; }
[JsonIgnore] public Account Account { get; set; } = null!;
}

View File

@@ -0,0 +1,258 @@
using System.Text;
using System.Text.Json;
using DysonNetwork.Shared.Proto;
using EFCore.BulkExtensions;
using Microsoft.EntityFrameworkCore;
using NodaTime;
namespace DysonNetwork.Pusher.Notification;
public class PushService(IConfiguration config, AppDatabase db, IHttpClientFactory httpFactory)
{
private readonly string _notifyTopic = config["Notifications:Topic"]!;
private readonly Uri _notifyEndpoint = new(config["Notifications:Endpoint"]!);
public async Task UnsubscribePushNotifications(string deviceId)
{
await db.PushSubscriptions
.Where(s => s.DeviceId == deviceId)
.ExecuteDeleteAsync();
}
public async Task<PushSubscription> SubscribePushNotification(
string deviceId,
string deviceToken,
PushProvider provider,
Account account
)
{
var now = SystemClock.Instance.GetCurrentInstant();
// First check if a matching subscription exists
var accountId = Guid.Parse(account.Id!);
var existingSubscription = await db.PushSubscriptions
.Where(s => s.AccountId == accountId)
.Where(s => s.DeviceId == deviceId || s.DeviceToken == deviceToken)
.FirstOrDefaultAsync();
if (existingSubscription is not null)
{
// Update the existing subscription directly in the database
await db.PushSubscriptions
.Where(s => s.Id == existingSubscription.Id)
.ExecuteUpdateAsync(setters => setters
.SetProperty(s => s.DeviceId, deviceId)
.SetProperty(s => s.DeviceToken, deviceToken)
.SetProperty(s => s.UpdatedAt, now));
// Return the updated subscription
existingSubscription.DeviceId = deviceId;
existingSubscription.DeviceToken = deviceToken;
existingSubscription.UpdatedAt = now;
return existingSubscription;
}
var subscription = new PushSubscription
{
DeviceId = deviceId,
DeviceToken = deviceToken,
Provider = provider,
AccountId = accountId,
};
db.PushSubscriptions.Add(subscription);
await db.SaveChangesAsync();
return subscription;
}
public async Task<Pusher.Notification.Notification> SendNotification(
Account account,
string topic,
string? title = null,
string? subtitle = null,
string? content = null,
Dictionary<string, object>? meta = null,
string? actionUri = null,
bool isSilent = false,
bool save = true
)
{
if (title is null && subtitle is null && content is null)
throw new ArgumentException("Unable to send notification that completely empty.");
meta ??= new Dictionary<string, object>();
if (actionUri is not null) meta["action_uri"] = actionUri;
var accountId = Guid.Parse(account.Id!);
var notification = new Notification
{
Topic = topic,
Title = title,
Subtitle = subtitle,
Content = content,
Meta = meta,
AccountId = accountId,
};
if (save)
{
db.Add(notification);
await db.SaveChangesAsync();
}
if (!isSilent) _ = DeliveryNotification(notification);
return notification;
}
public async Task DeliveryNotification(Pusher.Notification.Notification notification)
{
// Pushing the notification
var subscribers = await db.PushSubscriptions
.Where(s => s.AccountId == notification.AccountId)
.ToListAsync();
await _PushNotification(notification, subscribers);
}
public async Task MarkNotificationsViewed(ICollection<Notification> notifications)
{
var now = SystemClock.Instance.GetCurrentInstant();
var id = notifications.Where(n => n.ViewedAt == null).Select(n => n.Id).ToList();
if (id.Count == 0) return;
await db.Notifications
.Where(n => id.Contains(n.Id))
.ExecuteUpdateAsync(s => s.SetProperty(n => n.ViewedAt, now)
);
}
public async Task SendNotificationBatch(Notification notification, List<Account> accounts, bool save = false)
{
if (save)
{
var notifications = accounts.Select(x =>
{
var newNotification = new Notification
{
Topic = notification.Topic,
Title = notification.Title,
Subtitle = notification.Subtitle,
Content = notification.Content,
Meta = notification.Meta,
Priority = notification.Priority,
Account = x,
AccountId = Guid.Parse(x.Id)
};
return newNotification;
}).ToList();
await db.BulkInsertAsync(notifications);
}
foreach (var account in accounts)
{
notification.Account = account;
notification.AccountId = Guid.Parse(account.Id);
}
var accountsId = accounts.Select(x => Guid.Parse(x.Id)).ToList();
var subscribers = await db.PushSubscriptions
.Where(s => accountsId.Contains(s.AccountId))
.ToListAsync();
await _PushNotification(notification, subscribers);
}
private List<Dictionary<string, object>> _BuildNotificationPayload(Notification notification,
IEnumerable<PushSubscription> subscriptions)
{
var subDict = subscriptions
.GroupBy(x => x.Provider)
.ToDictionary(x => x.Key, x => x.ToList());
var notifications = subDict.Select(value =>
{
var platformCode = value.Key switch
{
PushProvider.Apple => 1,
PushProvider.Google => 2,
_ => throw new InvalidOperationException($"Unknown push provider: {value.Key}")
};
var tokens = value.Value.Select(x => x.DeviceToken).ToList();
return _BuildNotificationPayload(notification, platformCode, tokens);
}).ToList();
return notifications.ToList();
}
private Dictionary<string, object> _BuildNotificationPayload(Pusher.Notification.Notification notification,
int platformCode,
IEnumerable<string> deviceTokens)
{
var alertDict = new Dictionary<string, object>();
var dict = new Dictionary<string, object>
{
["notif_id"] = notification.Id.ToString(),
["apns_id"] = notification.Id.ToString(),
["topic"] = _notifyTopic,
["tokens"] = deviceTokens,
["data"] = new Dictionary<string, object>
{
["type"] = notification.Topic,
["meta"] = notification.Meta ?? new Dictionary<string, object>(),
},
["mutable_content"] = true,
["priority"] = notification.Priority >= 5 ? "high" : "normal",
};
if (!string.IsNullOrWhiteSpace(notification.Title))
{
dict["title"] = notification.Title;
alertDict["title"] = notification.Title;
}
if (!string.IsNullOrWhiteSpace(notification.Content))
{
dict["message"] = notification.Content;
alertDict["body"] = notification.Content;
}
if (!string.IsNullOrWhiteSpace(notification.Subtitle))
{
dict["message"] = $"{notification.Subtitle}\n{dict["message"]}";
alertDict["subtitle"] = notification.Subtitle;
}
if (notification.Priority >= 5)
dict["name"] = "default";
dict["platform"] = platformCode;
dict["alert"] = alertDict;
return dict;
}
private async Task _PushNotification(
Notification notification,
IEnumerable<PushSubscription> subscriptions
)
{
var subList = subscriptions.ToList();
if (subList.Count == 0) return;
var requestDict = new Dictionary<string, object>
{
["notifications"] = _BuildNotificationPayload(notification, subList)
};
var client = httpFactory.CreateClient();
client.BaseAddress = _notifyEndpoint;
var request = await client.PostAsync("/push", new StringContent(
JsonSerializer.Serialize(requestDict),
Encoding.UTF8,
"application/json"
));
request.EnsureSuccessStatusCode();
}
}

View File

@@ -0,0 +1,25 @@
using System.ComponentModel.DataAnnotations;
using DysonNetwork.Shared.Data;
using Microsoft.EntityFrameworkCore;
using NodaTime;
namespace DysonNetwork.Pusher.Notification;
public enum PushProvider
{
Apple,
Google
}
[Index(nameof(AccountId), nameof(DeviceId), nameof(DeletedAt), IsUnique = true)]
public class PushSubscription : ModelBase
{
public Guid Id { get; set; } = Guid.NewGuid();
public Guid AccountId { get; set; }
[MaxLength(8192)] public string DeviceId { get; set; } = null!;
[MaxLength(8192)] public string DeviceToken { get; set; } = null!;
public PushProvider Provider { get; set; }
public int CountDelivered { get; set; }
public Instant? LastUsedAt { get; set; }
}