Files
Swarm/DysonNetwork.Sphere/ActivityPub/ActivityPubDeliveryWorker.cs
2026-01-01 01:20:44 +08:00

216 lines
8.7 KiB
C#

using System.Net;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using DysonNetwork.Shared.Models;
using DysonNetwork.Shared.Proto;
using Google.Protobuf;
using Microsoft.Extensions.Options;
using NATS.Client.Core;
using NodaTime;
namespace DysonNetwork.Sphere.ActivityPub;
public class ActivityPubDeliveryWorker(
INatsConnection nats,
IServiceProvider serviceProvider,
IHttpClientFactory httpClientFactory,
IOptions<ActivityPubDeliveryOptions> options,
ILogger<ActivityPubDeliveryWorker> logger,
IClock clock
) : BackgroundService
{
public const string QueueName = "activitypub_delivery_queue";
private const string QueueGroup = "activitypub_delivery_workers";
private readonly List<Task> _consumerTasks = [];
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("Starting {ConsumerCount} ActivityPub delivery consumers", options.Value.ConsumerCount);
for (var i = 0; i < options.Value.ConsumerCount; i++)
_consumerTasks.Add(Task.Run(() => RunConsumerAsync(stoppingToken), stoppingToken));
await Task.WhenAll(_consumerTasks);
}
private async Task RunConsumerAsync(CancellationToken stoppingToken)
{
logger.LogInformation("ActivityPub delivery consumer started");
await foreach (var msg in nats.SubscribeAsync<byte[]>(QueueName, queueGroup: QueueGroup, cancellationToken: stoppingToken))
{
try
{
var message = GrpcTypeHelper.ConvertByteStringToObject<ActivityPubDeliveryMessage>(ByteString.CopyFrom(msg.Data));
if (message is not null)
{
await ProcessDeliveryAsync(message, stoppingToken);
}
else
{
logger.LogWarning("Invalid message format for ActivityPub delivery");
}
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
logger.LogError(ex, "Error in ActivityPub delivery consumer");
}
}
}
private async Task ProcessDeliveryAsync(ActivityPubDeliveryMessage message, CancellationToken cancellationToken)
{
using var scope = serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDatabase>();
var signatureService = scope.ServiceProvider.GetRequiredService<ActivityPubSignatureService>();
logger.LogDebug("Processing ActivityPub delivery {DeliveryId} to {Inbox}", message.DeliveryId, message.InboxUri);
var delivery = await db.ActivityPubDeliveries.FindAsync([message.DeliveryId], cancellationToken);
if (delivery == null)
{
logger.LogWarning("Delivery record not found: {DeliveryId}", message.DeliveryId);
return;
}
delivery.Status = DeliveryStatus.Processing;
delivery.LastAttemptAt = clock.GetCurrentInstant();
await db.SaveChangesAsync(cancellationToken);
try
{
var success = await SendActivityToInboxAsync(
message.Activity,
message.InboxUri,
message.ActorUri,
signatureService,
httpClientFactory,
logger,
cancellationToken
);
if (success.IsSuccessStatusCode)
{
delivery.Status = DeliveryStatus.Sent;
delivery.SentAt = clock.GetCurrentInstant();
delivery.ResponseStatusCode = success.StatusCode.ToString();
logger.LogInformation("Successfully delivered activity {ActivityId} to {Inbox}",
message.ActivityId, message.InboxUri);
}
else
{
var shouldRetry = ShouldRetry(success.StatusCode);
delivery.ResponseStatusCode = success.StatusCode.ToString();
delivery.ErrorMessage = await success.Content.ReadAsStringAsync(cancellationToken);
if (shouldRetry && delivery.RetryCount < options.Value.MaxRetries)
{
delivery.Status = DeliveryStatus.Failed;
delivery.RetryCount++;
delivery.NextRetryAt = CalculateNextRetryAt(delivery.RetryCount, clock);
logger.LogWarning("Failed to deliver activity {ActivityId} to {Inbox}. Status: {Status}. Retry {RetryCount}/{MaxRetries} at {NextRetry}",
message.ActivityId, message.InboxUri, success.StatusCode, delivery.RetryCount, options.Value.MaxRetries, delivery.NextRetryAt);
}
else
{
delivery.Status = DeliveryStatus.ExhaustedRetries;
logger.LogError("Exhausted retries for activity {ActivityId} to {Inbox}. Status: {Status}",
message.ActivityId, message.InboxUri, success.StatusCode);
}
}
}
catch (Exception ex)
{
delivery.Status = DeliveryStatus.Failed;
delivery.ErrorMessage = ex.Message;
if (delivery.RetryCount < options.Value.MaxRetries)
{
delivery.RetryCount++;
delivery.NextRetryAt = CalculateNextRetryAt(delivery.RetryCount, clock);
logger.LogError(ex, "Error delivering activity {ActivityId} to {Inbox}. Retry {RetryCount}/{MaxRetries} at {NextRetry}",
message.ActivityId, message.InboxUri, delivery.RetryCount, options.Value.MaxRetries, delivery.NextRetryAt);
}
else
{
delivery.Status = DeliveryStatus.ExhaustedRetries;
logger.LogError(ex, "Exhausted retries for activity {ActivityId} to {Inbox}",
message.ActivityId, message.InboxUri);
}
}
await db.SaveChangesAsync(cancellationToken);
}
private static async Task<HttpResponseMessage> SendActivityToInboxAsync(
Dictionary<string, object> activity,
string inboxUrl,
string actorUri,
ActivityPubSignatureService signatureService,
IHttpClientFactory httpClientFactory,
ILogger logger,
CancellationToken cancellationToken)
{
var client = httpClientFactory.CreateClient();
var json = JsonSerializer.Serialize(activity);
var request = new HttpRequestMessage(HttpMethod.Post, inboxUrl)
{
Content = new StringContent(json, Encoding.UTF8, "application/activity+json")
};
request.Headers.Date = DateTimeOffset.UtcNow;
var bodyBytes = Encoding.UTF8.GetBytes(json);
var hash = SHA256.HashData(bodyBytes);
var digest = $"SHA-256={Convert.ToBase64String(hash)}";
request.Headers.Add("Digest", digest);
request.Headers.Host = new Uri(inboxUrl).Host;
logger.LogDebug("Sending request to {Inbox}", inboxUrl);
var signatureHeaders = await signatureService.SignOutgoingRequest(request, actorUri);
var signatureString = $"keyId=\"{signatureHeaders["keyId"]}\"," +
$"algorithm=\"{signatureHeaders["algorithm"]}\"," +
$"headers=\"{signatureHeaders["headers"]}\"," +
$"signature=\"{signatureHeaders["signature"]}\"";
request.Headers.Add("Signature", signatureString);
var response = await client.SendAsync(request, cancellationToken);
logger.LogDebug("Response from {Inbox}. Status: {Status}", inboxUrl, response.StatusCode);
return response;
}
private static bool ShouldRetry(HttpStatusCode statusCode)
{
return statusCode == HttpStatusCode.InternalServerError ||
statusCode == HttpStatusCode.BadGateway ||
statusCode == HttpStatusCode.ServiceUnavailable ||
statusCode == HttpStatusCode.GatewayTimeout ||
statusCode == HttpStatusCode.RequestTimeout ||
statusCode == (HttpStatusCode)429; // Too Many Requests
}
private static Instant CalculateNextRetryAt(int retryCount, IClock clock)
{
var baseDelaySeconds = 1;
var maxDelaySeconds = 300;
var delaySeconds = Math.Min(maxDelaySeconds, baseDelaySeconds * (int)Math.Pow(2, retryCount - 1));
return clock.GetCurrentInstant() + Duration.FromSeconds(delaySeconds);
}
}
public class ActivityPubDeliveryOptions
{
public const string SectionName = "ActivityPubDelivery";
public int MaxRetries { get; set; } = 5;
public int ConsumerCount { get; set; } = 4;
}