♻️ Splitted wallet service
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
using DysonNetwork.Shared.Auth;
|
||||
using DysonNetwork.Shared.Networking;
|
||||
using DysonNetwork.Wallet.Payment;
|
||||
|
||||
namespace DysonNetwork.Wallet.Startup;
|
||||
|
||||
@@ -25,6 +26,8 @@ public static class ApplicationConfiguration
|
||||
|
||||
public static WebApplication ConfigureGrpcServices(this WebApplication app)
|
||||
{
|
||||
app.MapGrpcService<WalletServiceGrpc>();
|
||||
app.MapGrpcService<PaymentServiceGrpc>();
|
||||
app.MapGrpcReflectionService();
|
||||
|
||||
return app;
|
||||
|
||||
@@ -1,4 +1,14 @@
|
||||
using System.Text.Json;
|
||||
using DysonNetwork.Shared.Cache;
|
||||
using DysonNetwork.Shared.Models;
|
||||
using DysonNetwork.Shared.Proto;
|
||||
using DysonNetwork.Shared.Queue;
|
||||
using DysonNetwork.Wallet.Payment;
|
||||
using Google.Protobuf;
|
||||
using NATS.Client.Core;
|
||||
using NATS.Client.JetStream.Models;
|
||||
using NATS.Net;
|
||||
using NodaTime;
|
||||
|
||||
namespace DysonNetwork.Wallet.Startup;
|
||||
|
||||
@@ -8,8 +18,103 @@ public class BroadcastEventHandler(
|
||||
IServiceProvider serviceProvider
|
||||
) : BackgroundService
|
||||
{
|
||||
protected override Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
var paymentTask = HandlePaymentEventsAsync(stoppingToken);
|
||||
|
||||
await Task.WhenAll(paymentTask);
|
||||
}
|
||||
|
||||
private async Task HandlePaymentEventsAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
var js = nats.CreateJetStreamContext();
|
||||
|
||||
await js.EnsureStreamCreated("payment_events", [PaymentOrderEventBase.Type]);
|
||||
|
||||
var consumer = await js.CreateOrUpdateConsumerAsync("payment_events",
|
||||
new ConsumerConfig("wallet_payment_handler"),
|
||||
cancellationToken: stoppingToken);
|
||||
|
||||
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
|
||||
{
|
||||
PaymentOrderEvent? evt = null;
|
||||
try
|
||||
{
|
||||
evt = JsonSerializer.Deserialize<PaymentOrderEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
|
||||
|
||||
logger.LogInformation(
|
||||
"Received order event: {ProductIdentifier} {OrderId}",
|
||||
evt?.ProductIdentifier,
|
||||
evt?.OrderId
|
||||
);
|
||||
|
||||
if (evt?.ProductIdentifier is null)
|
||||
continue;
|
||||
|
||||
// Handle subscription orders
|
||||
if (
|
||||
evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram) &&
|
||||
evt.Meta?.TryGetValue("gift_id", out var giftIdValue) == true
|
||||
)
|
||||
{
|
||||
logger.LogInformation("Handling gift order: {OrderId}", evt.OrderId);
|
||||
|
||||
await using var scope = serviceProvider.CreateAsyncScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||
var subscriptions = scope.ServiceProvider.GetRequiredService<Payment.SubscriptionService>();
|
||||
|
||||
var order = await db.PaymentOrders.FindAsync(
|
||||
[evt.OrderId],
|
||||
cancellationToken: stoppingToken
|
||||
);
|
||||
if (order is null)
|
||||
{
|
||||
logger.LogWarning("Order with ID {OrderId} not found. Redelivering.", evt.OrderId);
|
||||
await msg.NakAsync(cancellationToken: stoppingToken);
|
||||
continue;
|
||||
}
|
||||
|
||||
await subscriptions.HandleGiftOrder(order);
|
||||
|
||||
logger.LogInformation("Gift for order {OrderId} handled successfully.", evt.OrderId);
|
||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
else if (evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram))
|
||||
{
|
||||
logger.LogInformation("Handling stellar program order: {OrderId}", evt.OrderId);
|
||||
|
||||
await using var scope = serviceProvider.CreateAsyncScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||
var subscriptions = scope.ServiceProvider.GetRequiredService<Payment.SubscriptionService>();
|
||||
|
||||
var order = await db.PaymentOrders.FindAsync(
|
||||
[evt.OrderId],
|
||||
cancellationToken: stoppingToken
|
||||
);
|
||||
if (order is null)
|
||||
{
|
||||
logger.LogWarning("Order with ID {OrderId} not found. Redelivering.", evt.OrderId);
|
||||
await msg.NakAsync(cancellationToken: stoppingToken);
|
||||
continue;
|
||||
}
|
||||
|
||||
await subscriptions.HandleSubscriptionOrder(order);
|
||||
|
||||
logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId);
|
||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Not a subscription or gift order, skip
|
||||
continue;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error processing payment order event for order {OrderId}. Redelivering.",
|
||||
evt?.OrderId);
|
||||
await msg.NakAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using Quartz;
|
||||
using DysonNetwork.Wallet.Payment;
|
||||
|
||||
namespace DysonNetwork.Wallet.Startup;
|
||||
|
||||
@@ -13,6 +14,33 @@ public static class ScheduledJobsConfiguration
|
||||
.ForJob("AppDatabaseRecycling")
|
||||
.WithIdentity("AppDatabaseRecyclingTrigger")
|
||||
.WithCronSchedule("0 0 0 * * ?"));
|
||||
|
||||
q.AddJob<SubscriptionRenewalJob>(opts => opts.WithIdentity("SubscriptionRenewal"));
|
||||
q.AddTrigger(opts => opts
|
||||
.ForJob("SubscriptionRenewal")
|
||||
.WithIdentity("SubscriptionRenewalTrigger")
|
||||
.WithSimpleSchedule(o => o
|
||||
.WithIntervalInMinutes(30)
|
||||
.RepeatForever())
|
||||
);
|
||||
|
||||
q.AddJob<GiftCleanupJob>(opts => opts.WithIdentity("GiftCleanup"));
|
||||
q.AddTrigger(opts => opts
|
||||
.ForJob("GiftCleanup")
|
||||
.WithIdentity("GiftCleanupTrigger")
|
||||
.WithSimpleSchedule(o => o
|
||||
.WithIntervalInHours(1)
|
||||
.RepeatForever())
|
||||
);
|
||||
|
||||
q.AddJob<FundExpirationJob>(opts => opts.WithIdentity("FundExpiration"));
|
||||
q.AddTrigger(opts => opts
|
||||
.ForJob("FundExpiration")
|
||||
.WithIdentity("FundExpirationTrigger")
|
||||
.WithSimpleSchedule(o => o
|
||||
.WithIntervalInHours(1)
|
||||
.RepeatForever())
|
||||
);
|
||||
});
|
||||
services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);
|
||||
|
||||
|
||||
@@ -7,6 +7,9 @@ using System.Text.Json.Serialization;
|
||||
using DysonNetwork.Shared.Cache;
|
||||
using DysonNetwork.Shared.Geometry;
|
||||
using DysonNetwork.Shared.Registry;
|
||||
using DysonNetwork.Wallet.Localization;
|
||||
using DysonNetwork.Wallet.Payment;
|
||||
using DysonNetwork.Wallet.Payment.PaymentHandlers;
|
||||
|
||||
namespace DysonNetwork.Wallet.Startup;
|
||||
|
||||
@@ -39,6 +42,10 @@ public static class ServiceCollectionExtensions
|
||||
options.JsonSerializerOptions.DictionaryKeyPolicy = JsonNamingPolicy.SnakeCaseLower;
|
||||
|
||||
options.JsonSerializerOptions.ConfigureForNodaTime(DateTimeZoneProviders.Tzdb);
|
||||
}).AddDataAnnotationsLocalization(options =>
|
||||
{
|
||||
options.DataAnnotationLocalizerProvider = (type, factory) =>
|
||||
factory.Create(typeof(NotificationResource));
|
||||
});
|
||||
services.AddRazorPages();
|
||||
|
||||
@@ -82,6 +89,12 @@ public static class ServiceCollectionExtensions
|
||||
services.Configure<GeoOptions>(configuration.GetSection("GeoIP"));
|
||||
services.AddScoped<GeoService>();
|
||||
|
||||
// Register Wallet services
|
||||
services.AddScoped<WalletService>();
|
||||
services.AddScoped<PaymentService>();
|
||||
services.AddScoped<SubscriptionService>();
|
||||
services.AddScoped<AfdianPaymentHandler>();
|
||||
|
||||
services.AddHostedService<BroadcastEventHandler>();
|
||||
|
||||
return services;
|
||||
|
||||
Reference in New Issue
Block a user