Compare commits

..

3 Commits

Author SHA1 Message Date
30fd912281 Optimize queue usage 2025-10-03 16:38:10 +08:00
5bf58f0194 🐛 Fix subscription gift 2025-10-03 16:38:01 +08:00
8e3e3f09df Gateway config serving 2025-10-03 16:37:51 +08:00
9 changed files with 66 additions and 2318 deletions

5
.editorconfig Normal file
View File

@@ -0,0 +1,5 @@
root = true
[*]
indent_style = space
indent_size = 4

View File

@@ -0,0 +1,12 @@
using Microsoft.AspNetCore.Mvc;
[ApiController]
[Route("config")]
public class ConfigurationController(IConfiguration configuration) : ControllerBase
{
[HttpGet]
public IActionResult Get() => Ok(configuration.GetSection("Client").Get<Dictionary<string, object>>());
[HttpGet("site")]
public IActionResult GetSiteUrl() => Ok(configuration["SiteUrl"]);
}

View File

@@ -106,9 +106,11 @@ var clusters = serviceNames.Select(serviceName => new ClusterConfig
}).ToArray(); }).ToArray();
builder.Services builder.Services
.AddReverseProxy() .AddReverseProxy()
.LoadFromMemory(routes, clusters) .LoadFromMemory(routes, clusters)
.AddServiceDiscoveryDestinationResolver(); .AddServiceDiscoveryDestinationResolver();
builder.Services.AddControllers();
var app = builder.Build(); var app = builder.Build();
@@ -118,4 +120,6 @@ app.UseRateLimiter();
app.MapReverseProxy().RequireRateLimiting("fixed"); app.MapReverseProxy().RequireRateLimiting("fixed");
app.MapControllers();
app.Run(); app.Run();

View File

@@ -5,5 +5,9 @@
"Microsoft.AspNetCore": "Warning" "Microsoft.AspNetCore": "Warning"
} }
}, },
"AllowedHosts": "*" "AllowedHosts": "*",
"SiteUrl": "http://localhost:3000",
"Client": {
"SomeSetting": "SomeValue"
}
} }

View File

@@ -53,32 +53,10 @@ public class BroadcastEventHandler(
continue; continue;
// Handle subscription orders // Handle subscription orders
if (evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram)) if (
{ evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram) &&
logger.LogInformation("Handling stellar program order: {OrderId}", evt.OrderId); evt.Meta?.TryGetValue("gift_id", out var giftIdValue) == true
)
await using var scope = serviceProvider.CreateAsyncScope();
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
var subscriptions = scope.ServiceProvider.GetRequiredService<SubscriptionService>();
var order = await db.PaymentOrders.FindAsync(
[evt.OrderId],
cancellationToken: stoppingToken
);
if (order is null)
{
logger.LogWarning("Order with ID {OrderId} not found. Redelivering.", evt.OrderId);
await msg.NakAsync(cancellationToken: stoppingToken);
continue;
}
await subscriptions.HandleSubscriptionOrder(order);
logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId);
await msg.AckAsync(cancellationToken: stoppingToken);
}
// Handle gift orders
else if (evt.Meta?.TryGetValue("gift_id", out var giftIdValue) == true)
{ {
logger.LogInformation("Handling gift order: {OrderId}", evt.OrderId); logger.LogInformation("Handling gift order: {OrderId}", evt.OrderId);
@@ -102,6 +80,30 @@ public class BroadcastEventHandler(
logger.LogInformation("Gift for order {OrderId} handled successfully.", evt.OrderId); logger.LogInformation("Gift for order {OrderId} handled successfully.", evt.OrderId);
await msg.AckAsync(cancellationToken: stoppingToken); await msg.AckAsync(cancellationToken: stoppingToken);
} }
else if (evt.ProductIdentifier.StartsWith(SubscriptionType.StellarProgram))
{
logger.LogInformation("Handling stellar program order: {OrderId}", evt.OrderId);
await using var scope = serviceProvider.CreateAsyncScope();
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
var subscriptions = scope.ServiceProvider.GetRequiredService<SubscriptionService>();
var order = await db.PaymentOrders.FindAsync(
[evt.OrderId],
cancellationToken: stoppingToken
);
if (order is null)
{
logger.LogWarning("Order with ID {OrderId} not found. Redelivering.", evt.OrderId);
await msg.NakAsync(cancellationToken: stoppingToken);
continue;
}
await subscriptions.HandleSubscriptionOrder(order);
logger.LogInformation("Subscription for order {OrderId} handled successfully.", evt.OrderId);
await msg.AckAsync(cancellationToken: stoppingToken);
}
else else
{ {
// Not a subscription or gift order, skip // Not a subscription or gift order, skip

View File

@@ -662,33 +662,7 @@ public class SubscriptionService(
db.WalletGifts.Add(gift); db.WalletGifts.Add(gift);
await db.SaveChangesAsync(); await db.SaveChangesAsync();
// Create order and process payment gift.Gifter = gifter;
var order = await payment.CreateOrderAsync(
null, // No specific payee wallet for gifts
subscriptionInfo.Currency,
finalPrice,
appIdentifier: "gift",
productIdentifier: subscriptionIdentifier,
meta: new Dictionary<string, object>
{
["gift_id"] = gift.Id.ToString()
}
);
// If payment method is in-app wallet, process payment immediately
if (paymentMethod == SubscriptionPaymentMethod.InAppWallet)
{
var gifterWallet = await db.Wallets.FirstOrDefaultAsync(w => w.AccountId == gifter.Id);
if (gifterWallet == null)
throw new InvalidOperationException("Gifter wallet not found.");
await payment.PayOrderAsync(order.Id, gifterWallet);
// Mark gift as sent after successful payment
gift.Status = DysonNetwork.Shared.Models.GiftStatus.Sent;
gift.UpdatedAt = SystemClock.Instance.GetCurrentInstant();
await db.SaveChangesAsync();
}
return gift; return gift;
} }

View File

@@ -5,8 +5,6 @@ using DysonNetwork.Shared.Proto;
using DysonNetwork.Shared.Stream; using DysonNetwork.Shared.Stream;
using Google.Protobuf; using Google.Protobuf;
using NATS.Client.Core; using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Net; using NATS.Net;
namespace DysonNetwork.Ring.Services; namespace DysonNetwork.Ring.Services;
@@ -39,29 +37,19 @@ public class QueueBackgroundService(
private async Task RunConsumerAsync(CancellationToken stoppingToken) private async Task RunConsumerAsync(CancellationToken stoppingToken)
{ {
logger.LogInformation("Queue consumer started"); logger.LogInformation("Queue consumer started");
var js = nats.CreateJetStreamContext();
await js.EnsureStreamCreated("pusher_events", [QueueName]); await foreach (var msg in nats.SubscribeAsync<byte[]>(QueueName, queueGroup: QueueGroup, cancellationToken: stoppingToken))
var consumer = await js.CreateOrUpdateConsumerAsync(
"pusher_events",
new ConsumerConfig(QueueGroup), // durable consumer
cancellationToken: stoppingToken);
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
{ {
try try
{ {
var message = GrpcTypeHelper.ConvertByteStringToObject<QueueMessage>(ByteString.CopyFrom(msg.Data)); var message = GrpcTypeHelper.ConvertByteStringToObject<QueueMessage>(ByteString.CopyFrom(msg.Data));
if (message is not null) if (message is not null)
{ {
await ProcessMessageAsync(msg, message, stoppingToken); await ProcessMessageAsync(message, stoppingToken);
await msg.AckAsync(cancellationToken: stoppingToken);
} }
else else
{ {
logger.LogWarning($"Invalid message format for {msg.Subject}"); logger.LogWarning($"Invalid message format for {msg.Subject}");
await msg.AckAsync(cancellationToken: stoppingToken); // Acknowledge invalid messages to avoid redelivery
} }
} }
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
@@ -72,12 +60,11 @@ public class QueueBackgroundService(
catch (Exception ex) catch (Exception ex)
{ {
logger.LogError(ex, "Error in queue consumer"); logger.LogError(ex, "Error in queue consumer");
await msg.NakAsync(cancellationToken: stoppingToken);
} }
} }
} }
private async ValueTask ProcessMessageAsync(NatsJSMsg<byte[]> rawMsg, QueueMessage message, private async ValueTask ProcessMessageAsync(QueueMessage message,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
using var scope = serviceProvider.CreateScope(); using var scope = serviceProvider.CreateScope();

View File

@@ -1,7 +1,6 @@
using System.Text.Json; using System.Text.Json;
using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Proto;
using NATS.Client.Core; using NATS.Client.Core;
using NATS.Net;
namespace DysonNetwork.Ring.Services; namespace DysonNetwork.Ring.Services;
@@ -21,8 +20,7 @@ public class QueueService(INatsConnection nats)
}) })
}; };
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray(); var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
var js = nats.CreateJetStreamContext(); await nats.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
await js.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
} }
public async Task EnqueuePushNotification(Shared.Models.SnNotification notification, Guid userId, bool isSavable = false) public async Task EnqueuePushNotification(Shared.Models.SnNotification notification, Guid userId, bool isSavable = false)
@@ -37,8 +35,7 @@ public class QueueService(INatsConnection nats)
Data = JsonSerializer.Serialize(notification) Data = JsonSerializer.Serialize(notification)
}; };
var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray(); var rawMessage = GrpcTypeHelper.ConvertObjectToByteString(message).ToByteArray();
var js = nats.CreateJetStreamContext(); await nats.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
await js.PublishAsync(QueueBackgroundService.QueueName, rawMessage);
} }
} }

2237
debug.txt

File diff suppressed because it is too large Load Diff