using System.Net; using System.Security.Cryptography; using System.Text; using System.Text.Json; using DysonNetwork.Shared.Models; using DysonNetwork.Shared.Proto; using Google.Protobuf; using Microsoft.Extensions.Options; using NATS.Client.Core; using NodaTime; namespace DysonNetwork.Sphere.ActivityPub; public class ActivityPubDeliveryWorker( INatsConnection nats, IServiceProvider serviceProvider, IHttpClientFactory httpClientFactory, IOptions options, ILogger logger, IClock clock ) : BackgroundService { public const string QueueName = "activitypub_delivery_queue"; private const string QueueGroup = "activitypub_delivery_workers"; private readonly List _consumerTasks = []; protected override async Task ExecuteAsync(CancellationToken stoppingToken) { logger.LogInformation("Starting {ConsumerCount} ActivityPub delivery consumers", options.Value.ConsumerCount); for (var i = 0; i < options.Value.ConsumerCount; i++) _consumerTasks.Add(Task.Run(() => RunConsumerAsync(stoppingToken), stoppingToken)); await Task.WhenAll(_consumerTasks); } private async Task RunConsumerAsync(CancellationToken stoppingToken) { logger.LogInformation("ActivityPub delivery consumer started"); await foreach (var msg in nats.SubscribeAsync(QueueName, queueGroup: QueueGroup, cancellationToken: stoppingToken)) { try { var message = GrpcTypeHelper.ConvertByteStringToObject(ByteString.CopyFrom(msg.Data)); if (message is not null) { await ProcessDeliveryAsync(message, stoppingToken); } else { logger.LogWarning("Invalid message format for ActivityPub delivery"); } } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { break; } catch (Exception ex) { logger.LogError(ex, "Error in ActivityPub delivery consumer"); } } } private async Task ProcessDeliveryAsync(ActivityPubDeliveryMessage message, CancellationToken cancellationToken) { using var scope = serviceProvider.CreateScope(); var db = scope.ServiceProvider.GetRequiredService(); var signatureService = scope.ServiceProvider.GetRequiredService(); logger.LogDebug("Processing ActivityPub delivery {DeliveryId} to {Inbox}", message.DeliveryId, message.InboxUri); var delivery = await db.ActivityPubDeliveries.FindAsync([message.DeliveryId], cancellationToken); if (delivery == null) { logger.LogWarning("Delivery record not found: {DeliveryId}", message.DeliveryId); return; } delivery.Status = DeliveryStatus.Processing; delivery.LastAttemptAt = clock.GetCurrentInstant(); await db.SaveChangesAsync(cancellationToken); try { var success = await SendActivityToInboxAsync( message.Activity, message.InboxUri, message.ActorUri, signatureService, httpClientFactory, logger, cancellationToken ); if (success.IsSuccessStatusCode) { delivery.Status = DeliveryStatus.Sent; delivery.SentAt = clock.GetCurrentInstant(); delivery.ResponseStatusCode = success.StatusCode.ToString(); logger.LogInformation("Successfully delivered activity {ActivityId} to {Inbox}", message.ActivityId, message.InboxUri); } else { var shouldRetry = ShouldRetry(success.StatusCode); delivery.ResponseStatusCode = success.StatusCode.ToString(); delivery.ErrorMessage = await success.Content.ReadAsStringAsync(cancellationToken); if (shouldRetry && delivery.RetryCount < options.Value.MaxRetries) { delivery.Status = DeliveryStatus.Failed; delivery.RetryCount++; delivery.NextRetryAt = CalculateNextRetryAt(delivery.RetryCount, clock); logger.LogWarning("Failed to deliver activity {ActivityId} to {Inbox}. Status: {Status}. Retry {RetryCount}/{MaxRetries} at {NextRetry}", message.ActivityId, message.InboxUri, success.StatusCode, delivery.RetryCount, options.Value.MaxRetries, delivery.NextRetryAt); } else { delivery.Status = DeliveryStatus.ExhaustedRetries; logger.LogError("Exhausted retries for activity {ActivityId} to {Inbox}. Status: {Status}", message.ActivityId, message.InboxUri, success.StatusCode); } } } catch (Exception ex) { delivery.Status = DeliveryStatus.Failed; delivery.ErrorMessage = ex.Message; if (delivery.RetryCount < options.Value.MaxRetries) { delivery.RetryCount++; delivery.NextRetryAt = CalculateNextRetryAt(delivery.RetryCount, clock); logger.LogError(ex, "Error delivering activity {ActivityId} to {Inbox}. Retry {RetryCount}/{MaxRetries} at {NextRetry}", message.ActivityId, message.InboxUri, delivery.RetryCount, options.Value.MaxRetries, delivery.NextRetryAt); } else { delivery.Status = DeliveryStatus.ExhaustedRetries; logger.LogError(ex, "Exhausted retries for activity {ActivityId} to {Inbox}", message.ActivityId, message.InboxUri); } } await db.SaveChangesAsync(cancellationToken); } private static async Task SendActivityToInboxAsync( Dictionary activity, string inboxUrl, string actorUri, ActivityPubSignatureService signatureService, IHttpClientFactory httpClientFactory, ILogger logger, CancellationToken cancellationToken) { var client = httpClientFactory.CreateClient(); var json = JsonSerializer.Serialize(activity); var request = new HttpRequestMessage(HttpMethod.Post, inboxUrl) { Content = new StringContent(json, Encoding.UTF8, "application/activity+json") }; request.Headers.Date = DateTimeOffset.UtcNow; var bodyBytes = Encoding.UTF8.GetBytes(json); var hash = SHA256.HashData(bodyBytes); var digest = $"SHA-256={Convert.ToBase64String(hash)}"; request.Headers.Add("Digest", digest); request.Headers.Host = new Uri(inboxUrl).Host; logger.LogDebug("Sending request to {Inbox}", inboxUrl); var signatureHeaders = await signatureService.SignOutgoingRequest(request, actorUri); var signatureString = $"keyId=\"{signatureHeaders["keyId"]}\"," + $"algorithm=\"{signatureHeaders["algorithm"]}\"," + $"headers=\"{signatureHeaders["headers"]}\"," + $"signature=\"{signatureHeaders["signature"]}\""; request.Headers.Add("Signature", signatureString); var response = await client.SendAsync(request, cancellationToken); logger.LogDebug("Response from {Inbox}. Status: {Status}", inboxUrl, response.StatusCode); return response; } private static bool ShouldRetry(HttpStatusCode statusCode) { return statusCode == HttpStatusCode.InternalServerError || statusCode == HttpStatusCode.BadGateway || statusCode == HttpStatusCode.ServiceUnavailable || statusCode == HttpStatusCode.GatewayTimeout || statusCode == HttpStatusCode.RequestTimeout || statusCode == (HttpStatusCode)429; // Too Many Requests } private static Instant CalculateNextRetryAt(int retryCount, IClock clock) { var baseDelaySeconds = 1; var maxDelaySeconds = 300; var delaySeconds = Math.Min(maxDelaySeconds, baseDelaySeconds * (int)Math.Pow(2, retryCount - 1)); return clock.GetCurrentInstant() + Duration.FromSeconds(delaySeconds); } } public class ActivityPubDeliveryOptions { public const string SectionName = "ActivityPubDelivery"; public int MaxRetries { get; set; } = 5; public int ConsumerCount { get; set; } = 4; }