Compare commits
3 Commits
fa24f14c05
...
30fd912281
Author | SHA1 | Date | |
---|---|---|---|
30fd912281
|
|||
5bf58f0194
|
|||
8e3e3f09df
|
5
.editorconfig
Normal file
5
.editorconfig
Normal file
@@ -0,0 +1,5 @@
|
||||
root = true
|
||||
|
||||
[*]
|
||||
indent_style = space
|
||||
indent_size = 4
|
@@ -0,0 +1,12 @@
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
|
||||
[ApiController]
|
||||
[Route("config")]
|
||||
public class ConfigurationController(IConfiguration configuration) : ControllerBase
|
||||
{
|
||||
[HttpGet]
|
||||
public IActionResult Get() => Ok(configuration.GetSection("Client").Get<Dictionary<string, object>>());
|
||||
|
||||
[HttpGet("site")]
|
||||
public IActionResult GetSiteUrl() => Ok(configuration["SiteUrl"]);
|
||||
}
|
@@ -87,7 +87,7 @@ var swaggerRoutes = serviceNames.Select(serviceName => new RouteConfig
|
||||
RouteId = $"{serviceName}-swagger",
|
||||
ClusterId = serviceName,
|
||||
Match = new RouteMatch { Path = $"/swagger/{serviceName}/{{**catch-all}}" },
|
||||
Transforms =
|
||||
Transforms =
|
||||
[
|
||||
new Dictionary<string, string> { { "PathRemovePrefix", $"/swagger/{serviceName}" } },
|
||||
new Dictionary<string, string> { { "PathPrefix", "/swagger" } }
|
||||
@@ -106,9 +106,11 @@ var clusters = serviceNames.Select(serviceName => new ClusterConfig
|
||||
}).ToArray();
|
||||
|
||||
builder.Services
|
||||
.AddReverseProxy()
|
||||
.LoadFromMemory(routes, clusters)
|
||||
.AddServiceDiscoveryDestinationResolver();
|
||||
.AddReverseProxy()
|
||||
.LoadFromMemory(routes, clusters)
|
||||
.AddServiceDiscoveryDestinationResolver();
|
||||
|
||||
builder.Services.AddControllers();
|
||||
|
||||
var app = builder.Build();
|
||||
|
||||
@@ -118,4 +120,6 @@ app.UseRateLimiter();
|
||||
|
||||
app.MapReverseProxy().RequireRateLimiting("fixed");
|
||||
|
||||
app.MapControllers();
|
||||
|
||||
app.Run();
|
||||
|
@@ -5,5 +5,9 @@
|
||||
"Microsoft.AspNetCore": "Warning"
|
||||
}
|
||||
},
|
||||
"AllowedHosts": "*"
|
||||
"AllowedHosts": "*",
|
||||
"SiteUrl": "http://localhost:3000",
|
||||
"Client": {
|
||||
"SomeSetting": "SomeValue"
|
||||
}
|
||||
}
|
@@ -53,32 +53,10 @@ public class BroadcastEventHandler(
|
||||
continue;
|
||||
|
||||
// Handle subscription orders
|
||||
if (evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram))
|
||||
{
|
||||
logger.LogInformation("Handling stellar program order: {OrderId}", evt.OrderId);
|
||||
|
||||
await using var scope = serviceProvider.CreateAsyncScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||
var subscriptions = scope.ServiceProvider.GetRequiredService<SubscriptionService>();
|
||||
|
||||
var order = await db.PaymentOrders.FindAsync(
|
||||
[evt.OrderId],
|
||||
cancellationToken: stoppingToken
|
||||
);
|
||||
if (order is null)
|
||||
{
|
||||
logger.LogWarning("Order with ID {OrderId} not found. Redelivering.", evt.OrderId);
|
||||
await msg.NakAsync(cancellationToken: stoppingToken);
|
||||
continue;
|
||||
}
|
||||
|
||||
await subscriptions.HandleSubscriptionOrder(order);
|
||||
|
||||
logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId);
|
||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
// Handle gift orders
|
||||
else if (evt.Meta?.TryGetValue("gift_id", out var giftIdValue) == true)
|
||||
if (
|
||||
evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram) &&
|
||||
evt.Meta?.TryGetValue("gift_id", out var giftIdValue) == true
|
||||
)
|
||||
{
|
||||
logger.LogInformation("Handling gift order: {OrderId}", evt.OrderId);
|
||||
|
||||
@@ -102,6 +80,30 @@ public class BroadcastEventHandler(
|
||||
logger.LogInformation("Gift for order {OrderId} handled successfully.", evt.OrderId);
|
||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
else if (evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram))
|
||||
{
|
||||
logger.LogInformation("Handling stellar program order: {OrderId}", evt.OrderId);
|
||||
|
||||
await using var scope = serviceProvider.CreateAsyncScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||
var subscriptions = scope.ServiceProvider.GetRequiredService<SubscriptionService>();
|
||||
|
||||
var order = await db.PaymentOrders.FindAsync(
|
||||
[evt.OrderId],
|
||||
cancellationToken: stoppingToken
|
||||
);
|
||||
if (order is null)
|
||||
{
|
||||
logger.LogWarning("Order with ID {OrderId} not found. Redelivering.", evt.OrderId);
|
||||
await msg.NakAsync(cancellationToken: stoppingToken);
|
||||
continue;
|
||||
}
|
||||
|
||||
await subscriptions.HandleSubscriptionOrder(order);
|
||||
|
||||
logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId);
|
||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Not a subscription or gift order, skip
|
||||
|
@@ -662,33 +662,7 @@ public class SubscriptionService(
|
||||
db.WalletGifts.Add(gift);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
// Create order and process payment
|
||||
var order = await payment.CreateOrderAsync(
|
||||
null, // No specific payee wallet for gifts
|
||||
subscriptionInfo.Currency,
|
||||
finalPrice,
|
||||
appIdentifier: "gift",
|
||||
productIdentifier: subscriptionIdentifier,
|
||||
meta: new Dictionary<string, object>
|
||||
{
|
||||
["gift_id"] = gift.Id.ToString()
|
||||
}
|
||||
);
|
||||
|
||||
// If payment method is in-app wallet, process payment immediately
|
||||
if (paymentMethod == SubscriptionPaymentMethod.InAppWallet)
|
||||
{
|
||||
var gifterWallet = await db.Wallets.FirstOrDefaultAsync(w => w.AccountId == gifter.Id);
|
||||
if (gifterWallet == null)
|
||||
throw new InvalidOperationException("Gifter wallet not found.");
|
||||
|
||||
await payment.PayOrderAsync(order.Id, gifterWallet);
|
||||
|
||||
// Mark gift as sent after successful payment
|
||||
gift.Status = DysonNetwork.Shared.Models.GiftStatus.Sent;
|
||||
gift.UpdatedAt = SystemClock.Instance.GetCurrentInstant();
|
||||
await db.SaveChangesAsync();
|
||||
}
|
||||
gift.Gifter = gifter;
|
||||
|
||||
return gift;
|
||||
}
|
||||
|
@@ -5,8 +5,6 @@ using DysonNetwork.Shared.Proto;
|
||||
using DysonNetwork.Shared.Stream;
|
||||
using Google.Protobuf;
|
||||
using NATS.Client.Core;
|
||||
using NATS.Client.JetStream;
|
||||
using NATS.Client.JetStream.Models;
|
||||
using NATS.Net;
|
||||
|
||||
namespace DysonNetwork.Ring.Services;
|
||||
@@ -39,29 +37,19 @@ public class QueueBackgroundService(
|
||||
private async Task RunConsumerAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
logger.LogInformation("Queue consumer started");
|
||||
var js = nats.CreateJetStreamContext();
|
||||
|
||||
await js.EnsureStreamCreated("pusher_events", [QueueName]);
|
||||
|
||||
var consumer = await js.CreateOrUpdateConsumerAsync(
|
||||
"pusher_events",
|
||||
new ConsumerConfig(QueueGroup), // durable consumer
|
||||
cancellationToken: stoppingToken);
|
||||
|
||||
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
|
||||
await foreach (var msg in nats.SubscribeAsync<byte[]>(QueueName, queueGroup: QueueGroup, cancellationToken: stoppingToken))
|
||||
{
|
||||
try
|
||||
{
|
||||
var message = GrpcTypeHelper.ConvertByteStringToObject<QueueMessage>(ByteString.CopyFrom(msg.Data));
|
||||
if (message is not null)
|
||||
{
|
||||
await ProcessMessageAsync(msg, message, stoppingToken);
|
||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||
await ProcessMessageAsync(message, stoppingToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogWarning($"Invalid message format for {msg.Subject}");
|
||||
await msg.AckAsync(cancellationToken: stoppingToken); // Acknowledge invalid messages to avoid redelivery
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
@@ -72,12 +60,11 @@ public class QueueBackgroundService(
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error in queue consumer");
|
||||
await msg.NakAsync(cancellationToken: stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async ValueTask ProcessMessageAsync(NatsJSMsg<byte[]> rawMsg, QueueMessage message,
|
||||
private async ValueTask ProcessMessageAsync(QueueMessage message,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
|
@@ -1,7 +1,6 @@
|
||||
using System.Text.Json;
|
||||
using DysonNetwork.Shared.Proto;
|
||||
using NATS.Client.Core;
|
||||
using NATS.Net;
|
||||
|
||||
namespace DysonNetwork.Ring.Services;
|
||||
|
||||
@@ -21,15 +20,14 @@ public class QueueService(INatsConnection nats)
|
||||
})
|
||||
};
|
||||
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
|
||||
var js = nats.CreateJetStreamContext();
|
||||
await js.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
|
||||
await nats.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
|
||||
}
|
||||
|
||||
public async Task EnqueuePushNotification(Shared.Models.SnNotification notification, Guid userId, bool isSavable = false)
|
||||
{
|
||||
// Update the account ID in case it wasn't set
|
||||
notification.AccountId = userId;
|
||||
|
||||
|
||||
var message = new QueueMessage
|
||||
{
|
||||
Type = QueueMessageType.PushNotification,
|
||||
@@ -37,8 +35,7 @@ public class QueueService(INatsConnection nats)
|
||||
Data = JsonSerializer.Serialize(notification)
|
||||
};
|
||||
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
|
||||
var js = nats.CreateJetStreamContext();
|
||||
await js.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
|
||||
await nats.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,4 +58,4 @@ public class EmailMessage
|
||||
public string ToAddress { get; set; } = string.Empty;
|
||||
public string Subject { get; set; } = string.Empty;
|
||||
public string Body { get; set; } = string.Empty;
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user