Pusher service

This commit is contained in:
2025-07-12 23:31:21 +08:00
parent e1b47bc7d1
commit 4a7f2e18b3
16 changed files with 563 additions and 46 deletions

View File

@ -1,5 +1,5 @@
using DysonNetwork.Shared.Cache;
using DysonNetwork.Shared.Geo;
using DysonNetwork.Shared.GeoIp;
namespace DysonNetwork.Pass.Account;

View File

@ -3,7 +3,7 @@ using Microsoft.AspNetCore.Mvc;
using NodaTime;
using Microsoft.EntityFrameworkCore;
using DysonNetwork.Pass.Account;
using DysonNetwork.Shared.Geo;
using DysonNetwork.Shared.GeoIp;
namespace DysonNetwork.Pass.Auth;

View File

@ -4,9 +4,9 @@ using Microsoft.EntityFrameworkCore;
namespace DysonNetwork.Pass.Auth;
public class AuthServiceGrpc(AuthService authService, AppDatabase db) : Shared.Proto.AuthService
public class AuthServiceGrpc(AuthService authService, AppDatabase db) : Shared.Proto.AuthService.AuthServiceBase
{
public async Task<Shared.Proto.AuthSession> Authenticate(AuthenticateRequest request, ServerCallContext context)
public override async Task<Shared.Proto.AuthSession> Authenticate(AuthenticateRequest request, ServerCallContext context)
{
if (!authService.ValidateToken(request.Token, out var sessionId))
{

View File

@ -1,5 +1,6 @@
using System.Net;
using DysonNetwork.Pass.Account;
using DysonNetwork.Pass.Auth;
using DysonNetwork.Pass.Permission;
using Microsoft.AspNetCore.HttpOverrides;
using Prometheus;
@ -68,6 +69,7 @@ public static class ApplicationConfiguration
public static WebApplication ConfigureGrpcServices(this WebApplication app)
{
app.MapGrpcService<AccountServiceGrpc>();
app.MapGrpcService<AuthServiceGrpc>();
return app;
}

View File

@ -18,7 +18,7 @@ using DysonNetwork.Pass.Auth.OidcProvider.Services;
using DysonNetwork.Pass.Handlers;
using DysonNetwork.Pass.Wallet.PaymentHandlers;
using DysonNetwork.Shared.Cache;
using DysonNetwork.Shared.Geo;
using DysonNetwork.Shared.GeoIp;
namespace DysonNetwork.Pass.Startup;
@ -53,6 +53,7 @@ public static class ServiceCollectionExtensions
// Register gRPC services
services.AddScoped<AccountServiceGrpc>();
services.AddScoped<AuthServiceGrpc>();
// Register OIDC services
services.AddScoped<OidcService, GoogleOidcService>();

View File

@ -1,12 +0,0 @@
namespace DysonNetwork.Pass;
public class WeatherForecast
{
public DateOnly Date { get; set; }
public int TemperatureC { get; set; }
public int TemperatureF => 32 + (int)(TemperatureC / 0.5556);
public string? Summary { get; set; }
}

View File

@ -19,6 +19,7 @@
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="9.0.4" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL.NodaTime" Version="9.0.4" />
<PackageReference Include="Quartz" Version="3.14.0" />
<PackageReference Include="Quartz.Extensions.Hosting" Version="3.14.0" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="9.0.3" />
<PackageReference Include="Swashbuckle.AspNetCore.SwaggerUI" Version="9.0.3" />
</ItemGroup>

View File

@ -66,8 +66,7 @@ public class PushService(IConfiguration config, AppDatabase db, IHttpClientFacto
return subscription;
}
public async Task<Pusher.Notification.Notification> SendNotification(
Account account,
public async Task SendNotification(Account account,
string topic,
string? title = null,
string? subtitle = null,
@ -75,8 +74,7 @@ public class PushService(IConfiguration config, AppDatabase db, IHttpClientFacto
Dictionary<string, object>? meta = null,
string? actionUri = null,
bool isSilent = false,
bool save = true
)
bool save = true)
{
if (title is null && subtitle is null && content is null)
throw new ArgumentException("Unable to send notification that completely empty.");
@ -102,8 +100,6 @@ public class PushService(IConfiguration config, AppDatabase db, IHttpClientFacto
}
if (!isSilent) _ = DeliveryNotification(notification);
return notification;
}
public async Task DeliveryNotification(Pusher.Notification.Notification notification)
@ -128,7 +124,7 @@ public class PushService(IConfiguration config, AppDatabase db, IHttpClientFacto
);
}
public async Task SendNotificationBatch(Notification notification, List<Account> accounts, bool save = false)
public async Task SendNotificationBatch(Notification notification, List<Guid> accounts, bool save = false)
{
if (save)
{
@ -142,23 +138,15 @@ public class PushService(IConfiguration config, AppDatabase db, IHttpClientFacto
Content = notification.Content,
Meta = notification.Meta,
Priority = notification.Priority,
Account = x,
AccountId = Guid.Parse(x.Id)
AccountId = x
};
return newNotification;
}).ToList();
await db.BulkInsertAsync(notifications);
}
foreach (var account in accounts)
{
notification.Account = account;
notification.AccountId = Guid.Parse(account.Id);
}
var accountsId = accounts.Select(x => Guid.Parse(x.Id)).ToList();
var subscribers = await db.PushSubscriptions
.Where(s => accountsId.Contains(s.AccountId))
.Where(s => accounts.Contains(s.AccountId))
.ToListAsync();
await _PushNotification(notification, subscribers);
}

View File

@ -1,23 +1,41 @@
using DysonNetwork.Pass.Startup;
using DysonNetwork.Pusher;
using DysonNetwork.Pusher.Startup;
using Microsoft.EntityFrameworkCore;
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
// Configure Kestrel and server options
builder.ConfigureAppKestrel();
builder.Services.AddControllers();
// Learn more about configuring OpenAPI at https://aka.ms/aspnet/openapi
builder.Services.AddOpenApi();
// Add application services
builder.Services.AddAppServices(builder.Configuration);
builder.Services.AddAppRateLimiting();
builder.Services.AddAppAuthentication();
builder.Services.AddAppSwagger();
// Add flush handlers and websocket handlers
builder.Services.AddAppFlushHandlers();
// Add business services
builder.Services.AddAppBusinessServices();
// Add scheduled jobs
builder.Services.AddAppScheduledJobs();
var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
// Run database migrations
using (var scope = app.Services.CreateScope())
{
app.MapOpenApi();
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
await db.Database.MigrateAsync();
}
app.UseHttpsRedirection();
// Configure application middleware pipeline
app.ConfigureAppMiddleware(builder.Configuration);
app.UseAuthorization();
app.MapControllers();
// Configure gRPC
app.ConfigureGrpcServices();
app.Run();

View File

@ -0,0 +1,162 @@
using DysonNetwork.Pusher.Connection;
using DysonNetwork.Pusher.Email;
using DysonNetwork.Pusher.Notification;
using DysonNetwork.Shared.Proto;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
namespace DysonNetwork.Pusher.Services;
public class PusherServiceGrpc(
EmailService emailService,
WebSocketService webSocketService,
PushService pushService
) : PusherService.PusherServiceBase
{
public override async Task<Empty> SendEmail(SendEmailRequest request, ServerCallContext context)
{
await emailService.SendEmailAsync(
request.Email.ToName,
request.Email.ToAddress,
request.Email.Subject,
request.Email.Body
);
return new Empty();
}
public override Task<Empty> PushWebSocketPacket(PushWebSocketPacketRequest request, ServerCallContext context)
{
var packet = new Connection.WebSocketPacket
{
Type = request.Packet.Type,
Data = request.Packet.Data,
ErrorMessage = request.Packet.ErrorMessage
};
webSocketService.SendPacketToAccount(request.UserId, packet);
return Task.FromResult(new Empty());
}
public override Task<Empty> PushWebSocketPacketToUsers(PushWebSocketPacketToUsersRequest request,
ServerCallContext context)
{
var packet = new Connection.WebSocketPacket
{
Type = request.Packet.Type,
Data = request.Packet.Data,
ErrorMessage = request.Packet.ErrorMessage
};
foreach (var userId in request.UserIds)
webSocketService.SendPacketToAccount(userId, packet);
return Task.FromResult(new Empty());
}
public override Task<Empty> PushWebSocketPacketToDevice(PushWebSocketPacketToDeviceRequest request,
ServerCallContext context)
{
var packet = new Connection.WebSocketPacket
{
Type = request.Packet.Type,
Data = request.Packet.Data,
ErrorMessage = request.Packet.ErrorMessage
};
webSocketService.SendPacketToDevice(request.DeviceId, packet);
return Task.FromResult(new Empty());
}
public override Task<Empty> PushWebSocketPacketToDevices(PushWebSocketPacketToDevicesRequest request,
ServerCallContext context)
{
var packet = new Connection.WebSocketPacket
{
Type = request.Packet.Type,
Data = request.Packet.Data,
ErrorMessage = request.Packet.ErrorMessage
};
foreach (var deviceId in request.DeviceIds)
webSocketService.SendPacketToDevice(deviceId, packet);
return Task.FromResult(new Empty());
}
public override async Task<Empty> SendPushNotification(SendPushNotificationRequest request,
ServerCallContext context)
{
// This is a placeholder implementation. In a real-world scenario, you would
// need to retrieve the account from the database based on the device token.
var account = new Account();
await pushService.SendNotification(
account,
request.Notification.Topic,
request.Notification.Title,
request.Notification.Subtitle,
request.Notification.Body,
GrpcTypeHelper.ConvertFromValueMap(request.Notification.Meta),
request.Notification.ActionUri,
request.Notification.IsSilent,
request.Notification.IsSavable
);
return new Empty();
}
public override async Task<Empty> SendPushNotificationToDevices(SendPushNotificationToDevicesRequest request,
ServerCallContext context)
{
// This is a placeholder implementation. In a real-world scenario, you would
// need to retrieve the accounts from the database based on the device tokens.
var account = new Account();
foreach (var deviceId in request.DeviceIds)
{
await pushService.SendNotification(
account,
request.Notification.Topic,
request.Notification.Title,
request.Notification.Subtitle,
request.Notification.Body,
GrpcTypeHelper.ConvertFromValueMap(request.Notification.Meta),
request.Notification.ActionUri,
request.Notification.IsSilent,
request.Notification.IsSavable
);
}
return new Empty();
}
public override async Task<Empty> SendPushNotificationToUser(SendPushNotificationToUserRequest request,
ServerCallContext context)
{
// This is a placeholder implementation. In a real-world scenario, you would
// need to retrieve the account from the database based on the user ID.
var account = new Account();
await pushService.SendNotification(
account,
request.Notification.Topic,
request.Notification.Title,
request.Notification.Subtitle,
request.Notification.Body,
GrpcTypeHelper.ConvertFromValueMap(request.Notification.Meta),
request.Notification.ActionUri,
request.Notification.IsSilent,
request.Notification.IsSavable
);
return new Empty();
}
public override async Task<Empty> SendPushNotificationToUsers(SendPushNotificationToUsersRequest request,
ServerCallContext context)
{
var notification = new Notification.Notification
{
Topic = request.Notification.Topic,
Title = request.Notification.Title,
Subtitle = request.Notification.Subtitle, Content = request.Notification.Body,
Meta = GrpcTypeHelper.ConvertFromValueMap(request.Notification.Meta),
};
if (request.Notification.ActionUri is not null)
notification.Meta["action_uri"] = request.Notification.ActionUri;
var accounts = request.UserIds.Select(Guid.Parse).ToList();
await pushService.SendNotificationBatch(notification, accounts, request.Notification.IsSavable);
return new Empty();
}
}

View File

@ -0,0 +1,70 @@
using System.Net;
using DysonNetwork.Pusher.Services;
using Microsoft.AspNetCore.HttpOverrides;
namespace DysonNetwork.Pusher.Startup;
public static class ApplicationConfiguration
{
public static WebApplication ConfigureAppMiddleware(this WebApplication app, IConfiguration configuration)
{
app.MapOpenApi();
app.UseSwagger();
app.UseSwaggerUI();
app.UseRequestLocalization();
ConfigureForwardedHeaders(app, configuration);
app.UseCors(opts =>
opts.SetIsOriginAllowed(_ => true)
.WithExposedHeaders("*")
.WithHeaders()
.AllowCredentials()
.AllowAnyHeader()
.AllowAnyMethod()
);
app.UseWebSockets();
app.UseRateLimiter();
app.UseHttpsRedirection();
app.UseAuthentication();
app.UseAuthorization();
app.MapControllers().RequireRateLimiting("fixed");
app.MapStaticAssets().RequireRateLimiting("fixed");
app.MapRazorPages().RequireRateLimiting("fixed");
return app;
}
private static void ConfigureForwardedHeaders(WebApplication app, IConfiguration configuration)
{
var knownProxiesSection = configuration.GetSection("KnownProxies");
var forwardedHeadersOptions = new ForwardedHeadersOptions { ForwardedHeaders = ForwardedHeaders.All };
if (knownProxiesSection.Exists())
{
var proxyAddresses = knownProxiesSection.Get<string[]>();
if (proxyAddresses != null)
foreach (var proxy in proxyAddresses)
if (IPAddress.TryParse(proxy, out var ipAddress))
forwardedHeadersOptions.KnownProxies.Add(ipAddress);
}
else
{
forwardedHeadersOptions.KnownProxies.Add(IPAddress.Any);
forwardedHeadersOptions.KnownProxies.Add(IPAddress.IPv6Any);
}
app.UseForwardedHeaders(forwardedHeadersOptions);
}
public static WebApplication ConfigureGrpcServices(this WebApplication app)
{
app.MapGrpcService<PusherServiceGrpc>();
return app;
}
}

View File

@ -0,0 +1,17 @@
namespace DysonNetwork.Pass.Startup;
public static class KestrelConfiguration
{
public static WebApplicationBuilder ConfigureAppKestrel(this WebApplicationBuilder builder)
{
builder.Host.UseContentRoot(Directory.GetCurrentDirectory());
builder.WebHost.ConfigureKestrel(options =>
{
options.Limits.MaxRequestBodySize = 50 * 1024 * 1024;
options.Limits.KeepAliveTimeout = TimeSpan.FromMinutes(2);
options.Limits.RequestHeadersTimeout = TimeSpan.FromSeconds(30);
});
return builder;
}
}

View File

@ -0,0 +1,22 @@
using Quartz;
namespace DysonNetwork.Pusher.Startup;
public static class ScheduledJobsConfiguration
{
public static IServiceCollection AddAppScheduledJobs(this IServiceCollection services)
{
services.AddQuartz(q =>
{
var appDatabaseRecyclingJob = new JobKey("AppDatabaseRecycling");
q.AddJob<AppDatabaseRecyclingJob>(opts => opts.WithIdentity(appDatabaseRecyclingJob));
q.AddTrigger(opts => opts
.ForJob(appDatabaseRecyclingJob)
.WithIdentity("AppDatabaseRecyclingTrigger")
.WithCronSchedule("0 0 0 * * ?"));
});
services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);
return services;
}
}

View File

@ -0,0 +1,138 @@
using System.Text.Json;
using System.Threading.RateLimiting;
using DysonNetwork.Pusher.Email;
using DysonNetwork.Pusher.Notification;
using DysonNetwork.Pusher.Services;
using DysonNetwork.Shared.Cache;
using Microsoft.AspNetCore.RateLimiting;
using Microsoft.OpenApi.Models;
using NodaTime;
using NodaTime.Serialization.SystemTextJson;
using StackExchange.Redis;
namespace DysonNetwork.Pusher.Startup;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddAppServices(this IServiceCollection services, IConfiguration configuration)
{
services.AddDbContext<AppDatabase>();
services.AddSingleton<IConnectionMultiplexer>(_ =>
{
var connection = configuration.GetConnectionString("FastRetrieve")!;
return ConnectionMultiplexer.Connect(connection);
});
services.AddSingleton<IClock>(SystemClock.Instance);
services.AddHttpContextAccessor();
services.AddSingleton<ICacheService, CacheServiceRedis>();
services.AddHttpClient();
// Register gRPC services
services.AddGrpc(options =>
{
options.EnableDetailedErrors = true; // Will be adjusted in Program.cs
options.MaxReceiveMessageSize = 16 * 1024 * 1024; // 16MB
options.MaxSendMessageSize = 16 * 1024 * 1024; // 16MB
});
// Register gRPC reflection for service discovery
services.AddGrpc();
// Register gRPC services
services.AddScoped<PusherServiceGrpc>();
// Register OIDC services
services.AddControllers().AddJsonOptions(options =>
{
options.JsonSerializerOptions.PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower;
options.JsonSerializerOptions.DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower;
options.JsonSerializerOptions.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
});
return services;
}
public static IServiceCollection AddAppRateLimiting(this IServiceCollection services)
{
services.AddRateLimiter(o => o.AddFixedWindowLimiter(policyName: "fixed", opts =>
{
opts.Window = TimeSpan.FromMinutes(1);
opts.PermitLimit = 120;
opts.QueueLimit = 2;
opts.QueueProcessingOrder = QueueProcessingOrder.OldestFirst;
}));
return services;
}
public static IServiceCollection AddAppAuthentication(this IServiceCollection services)
{
services.AddCors();
services.AddAuthorization();
return services;
}
public static IServiceCollection AddAppSwagger(this IServiceCollection services)
{
services.AddEndpointsApiExplorer();
services.AddSwaggerGen(options =>
{
options.SwaggerDoc("v1", new OpenApiInfo
{
Version = "v1",
Title = "Solar Network API",
Description = "An open-source social network",
TermsOfService = new Uri("https://solsynth.dev/terms"),
License = new OpenApiLicense
{
Name = "APGLv3",
Url = new Uri("https://www.gnu.org/licenses/agpl-3.0.html")
}
});
options.AddSecurityDefinition("Bearer", new OpenApiSecurityScheme
{
In = ParameterLocation.Header,
Description = "Please enter a valid token",
Name = "Authorization",
Type = SecuritySchemeType.Http,
BearerFormat = "JWT",
Scheme = "Bearer"
});
options.AddSecurityRequirement(new OpenApiSecurityRequirement
{
{
new OpenApiSecurityScheme
{
Reference = new OpenApiReference
{
Type = ReferenceType.SecurityScheme,
Id = "Bearer"
}
},
[]
}
});
});
services.AddOpenApi();
return services;
}
public static IServiceCollection AddAppFlushHandlers(this IServiceCollection services)
{
services.AddSingleton<FlushBufferService>();
return services;
}
public static IServiceCollection AddAppBusinessServices(this IServiceCollection services)
{
services.AddScoped<EmailService>();
services.AddScoped<PushService>();
return services;
}
}

View File

@ -3,7 +3,7 @@ using Microsoft.Extensions.Options;
using NetTopologySuite.Geometries;
using Point = NetTopologySuite.Geometries.Point;
namespace DysonNetwork.Shared.Geo;
namespace DysonNetwork.Shared.GeoIp;
public class GeoIpOptions
{

View File

@ -0,0 +1,110 @@
syntax = "proto3";
package proto;
option csharp_namespace = "DysonNetwork.Shared.Proto";
import "google/protobuf/struct.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
// PusherService provides methods to send various types of notifications.
service PusherService {
// Sends an email.
rpc SendEmail(SendEmailRequest) returns (google.protobuf.Empty) {}
// Pushes a packet to a user via WebSocket.
rpc PushWebSocketPacket(PushWebSocketPacketRequest) returns (google.protobuf.Empty) {}
// Pushes a packet to a list of users via WebSocket.
rpc PushWebSocketPacketToUsers(PushWebSocketPacketToUsersRequest) returns (google.protobuf.Empty) {}
// Pushes a packet to a device via WebSocket.
rpc PushWebSocketPacketToDevice(PushWebSocketPacketToDeviceRequest) returns (google.protobuf.Empty) {}
// Pushes a packet to a list of devices via WebSocket.
rpc PushWebSocketPacketToDevices(PushWebSocketPacketToDevicesRequest) returns (google.protobuf.Empty) {}
// Sends a push notification to a device.
rpc SendPushNotification(SendPushNotificationRequest) returns (google.protobuf.Empty) {}
// Sends a push notification to a list of devices.
rpc SendPushNotificationToDevices(SendPushNotificationToDevicesRequest) returns (google.protobuf.Empty) {}
// Sends a push notification to a user.
rpc SendPushNotificationToUser(SendPushNotificationToUserRequest) returns (google.protobuf.Empty) {}
// Sends a push notification to a list of users.
rpc SendPushNotificationToUsers(SendPushNotificationToUsersRequest) returns (google.protobuf.Empty) {}
}
// Represents an email message.
message EmailMessage {
string to_name = 1;
string to_address = 2;
string subject = 3;
string body = 4;
}
message SendEmailRequest {
EmailMessage email = 1;
}
// Represents a WebSocket packet.
message WebSocketPacket {
string type = 1;
google.protobuf.Value data = 2;
google.protobuf.StringValue error_message = 3;
}
message PushWebSocketPacketRequest {
string user_id = 1;
WebSocketPacket packet = 2;
}
message PushWebSocketPacketToUsersRequest {
repeated string user_ids = 1;
WebSocketPacket packet = 2;
}
message PushWebSocketPacketToDeviceRequest {
string device_id = 1;
WebSocketPacket packet = 2;
}
message PushWebSocketPacketToDevicesRequest {
repeated string device_ids = 1;
WebSocketPacket packet = 2;
}
// Represents a push notification.
message PushNotification {
string topic = 1;
string title = 2;
string subtitle = 3;
string body = 4;
map<string, google.protobuf.Value> meta = 5;
optional string action_uri = 6;
bool is_silent = 7;
bool is_savable = 8;
}
message SendPushNotificationRequest {
string device_id = 1;
PushNotification notification = 2;
}
message SendPushNotificationToDevicesRequest {
repeated string device_ids = 1;
PushNotification notification = 2;
}
message SendPushNotificationToUserRequest {
string user_id = 1;
PushNotification notification = 2;
}
message SendPushNotificationToUsersRequest {
repeated string user_ids = 1;
PushNotification notification = 2;
}