diff --git a/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs b/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs index 9cd251d..5654a16 100644 --- a/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs +++ b/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs @@ -1,10 +1,16 @@ using System.Text.Json; -using DysonNetwork.Drive.Storage; +using DysonNetwork.Drive.Storage.Model; +using DysonNetwork.Shared.Proto; using DysonNetwork.Shared.Stream; +using FFMpegCore; using Microsoft.EntityFrameworkCore; using NATS.Client.Core; +using NATS.Client.JetStream; using NATS.Client.JetStream.Models; using NATS.Net; +using NetVips; +using NodaTime; +using FileService = DysonNetwork.Drive.Storage.FileService; namespace DysonNetwork.Drive.Startup; @@ -14,20 +20,72 @@ public class BroadcastEventHandler( IServiceProvider serviceProvider ) : BackgroundService { + private const string TempFileSuffix = "dypart"; + + private static readonly string[] AnimatedImageTypes = + ["image/gif", "image/apng", "image/avif"]; + + private static readonly string[] AnimatedImageExtensions = + [".gif", ".apng", ".avif"]; + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var js = nats.CreateJetStreamContext(); await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]); - var consumer = await js.CreateOrUpdateConsumerAsync("account_events", + var accountEventConsumer = await js.CreateOrUpdateConsumerAsync("account_events", new ConsumerConfig("drive_account_deleted_handler"), cancellationToken: stoppingToken); + + await js.EnsureStreamCreated("file_events", [FileUploadedEvent.Type]); + var fileUploadedConsumer = await js.CreateOrUpdateConsumerAsync("file_events", + new ConsumerConfig("drive_file_uploaded_handler"), cancellationToken: stoppingToken); + var accountDeletedTask = HandleAccountDeleted(accountEventConsumer, stoppingToken); + var fileUploadedTask = HandleFileUploaded(fileUploadedConsumer, stoppingToken); + + await Task.WhenAll(accountDeletedTask, fileUploadedTask); + } + + private async Task HandleFileUploaded(INatsJSConsumer consumer, CancellationToken stoppingToken) + { + await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) + { + var payload = JsonSerializer.Deserialize(msg.Data, GrpcTypeHelper.SerializerOptions); + if (payload == null) + { + await msg.AckAsync(cancellationToken: stoppingToken); + continue; + } + + try + { + await ProcessAndUploadInBackgroundAsync( + payload.FileId, + payload.RemoteId, + payload.StorageId, + payload.ContentType, + payload.ProcessingFilePath, + payload.IsTempFile + ); + + await msg.AckAsync(cancellationToken: stoppingToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Error processing FileUploadedEvent for file {FileId}", payload?.FileId); + await msg.NakAsync(cancellationToken: stoppingToken); + } + } + } + + private async Task HandleAccountDeleted(INatsJSConsumer consumer, CancellationToken stoppingToken) + { await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken)) { try { - var evt = JsonSerializer.Deserialize(msg.Data); + var evt = JsonSerializer.Deserialize(msg.Data, GrpcTypeHelper.SerializerOptions); if (evt == null) { await msg.AckAsync(cancellationToken: stoppingToken); @@ -69,4 +127,168 @@ public class BroadcastEventHandler( } } } + + private async Task ProcessAndUploadInBackgroundAsync( + string fileId, + Guid remoteId, + string storageId, + string contentType, + string processingFilePath, + bool isTempFile + ) + { + using var scope = serviceProvider.CreateScope(); + var fs = scope.ServiceProvider.GetRequiredService(); + var scopedDb = scope.ServiceProvider.GetRequiredService(); + + var pool = await fs.GetPoolAsync(remoteId); + if (pool is null) return; + + var uploads = new List<(string FilePath, string Suffix, string ContentType, bool SelfDestruct)>(); + var newMimeType = contentType; + var hasCompression = false; + var hasThumbnail = false; + + try + { + logger.LogInformation("Processing file {FileId} in background...", fileId); + + var fileToUpdate = await scopedDb.Files.AsNoTracking().FirstAsync(f => f.Id == fileId); + + if (fileToUpdate.IsEncrypted) + { + uploads.Add((processingFilePath, string.Empty, contentType, false)); + } + else if (!pool.PolicyConfig.NoOptimization) + { + var fileExtension = Path.GetExtension(processingFilePath); + switch (contentType.Split('/')[0]) + { + case "image": + if (AnimatedImageTypes.Contains(contentType) || AnimatedImageExtensions.Contains(fileExtension)) + { + logger.LogInformation("Skip optimize file {FileId} due to it is animated...", fileId); + uploads.Add((processingFilePath, string.Empty, contentType, false)); + break; + } + + newMimeType = "image/webp"; + using (var vipsImage = Image.NewFromFile(processingFilePath)) + { + var imageToWrite = vipsImage; + + if (vipsImage.Interpretation is Enums.Interpretation.Scrgb or Enums.Interpretation.Xyz) + { + imageToWrite = vipsImage.Colourspace(Enums.Interpretation.Srgb); + } + + var webpPath = Path.Join(Path.GetTempPath(), $"{fileId}.{TempFileSuffix}.webp"); + imageToWrite.Autorot().WriteToFile(webpPath, + new VOption { { "lossless", true }, { "strip", true } }); + uploads.Add((webpPath, string.Empty, newMimeType, true)); + + if (imageToWrite.Width * imageToWrite.Height >= 1024 * 1024) + { + var scale = 1024.0 / Math.Max(imageToWrite.Width, imageToWrite.Height); + var compressedPath = + Path.Join(Path.GetTempPath(), $"{fileId}.{TempFileSuffix}.compressed.webp"); + using var compressedImage = imageToWrite.Resize(scale); + compressedImage.Autorot().WriteToFile(compressedPath, + new VOption { { "Q", 80 }, { "strip", true } }); + uploads.Add((compressedPath, ".compressed", newMimeType, true)); + hasCompression = true; + } + + if (!ReferenceEquals(imageToWrite, vipsImage)) + { + imageToWrite.Dispose(); + } + } + + break; + + case "video": + uploads.Add((processingFilePath, string.Empty, contentType, false)); + + var thumbnailPath = Path.Join(Path.GetTempPath(), $"{fileId}.{TempFileSuffix}.thumbnail.jpg"); + try + { + await FFMpegArguments + .FromFileInput(processingFilePath, verifyExists: true) + .OutputToFile(thumbnailPath, overwrite: true, options => options + .Seek(TimeSpan.FromSeconds(0)) + .WithFrameOutputCount(1) + .WithCustomArgument("-q:v 2") + ) + .NotifyOnOutput(line => logger.LogInformation("[FFmpeg] {Line}", line)) + .NotifyOnError(line => logger.LogWarning("[FFmpeg] {Line}", line)) + .ProcessAsynchronously(); + + if (File.Exists(thumbnailPath)) + { + uploads.Add((thumbnailPath, ".thumbnail", "image/jpeg", true)); + hasThumbnail = true; + } + else + { + logger.LogWarning("FFMpeg did not produce thumbnail for video {FileId}", fileId); + } + } + catch (Exception ex) + { + logger.LogError(ex, "Failed to generate thumbnail for video {FileId}", fileId); + } + + break; + + default: + uploads.Add((processingFilePath, string.Empty, contentType, false)); + break; + } + } + else uploads.Add((processingFilePath, string.Empty, contentType, false)); + + logger.LogInformation("Optimized file {FileId}, now uploading...", fileId); + + if (uploads.Count > 0) + { + var destPool = remoteId; + var uploadTasks = uploads.Select(item => + fs.UploadFileToRemoteAsync( + storageId, + destPool, + item.FilePath, + item.Suffix, + item.ContentType, + item.SelfDestruct + ) + ).ToList(); + + await Task.WhenAll(uploadTasks); + + logger.LogInformation("Uploaded file {FileId} done!", fileId); + + var now = SystemClock.Instance.GetCurrentInstant(); + await scopedDb.Files.Where(f => f.Id == fileId).ExecuteUpdateAsync(setter => setter + .SetProperty(f => f.UploadedAt, now) + .SetProperty(f => f.PoolId, destPool) + .SetProperty(f => f.MimeType, newMimeType) + .SetProperty(f => f.HasCompression, hasCompression) + .SetProperty(f => f.HasThumbnail, hasThumbnail) + ); + } + } + catch (Exception err) + { + logger.LogError(err, "Failed to process and upload {FileId}", fileId); + } + finally + { + if (isTempFile) + { + File.Delete(processingFilePath); + } + await fs._PurgeCacheAsync(fileId); + } + } } \ No newline at end of file diff --git a/DysonNetwork.Drive/Storage/FileReferenceServiceGrpc.cs b/DysonNetwork.Drive/Storage/FileReferenceServiceGrpc.cs index c1547d6..b0e62d1 100644 --- a/DysonNetwork.Drive/Storage/FileReferenceServiceGrpc.cs +++ b/DysonNetwork.Drive/Storage/FileReferenceServiceGrpc.cs @@ -3,173 +3,172 @@ using Grpc.Core; using NodaTime; using Duration = NodaTime.Duration; -namespace DysonNetwork.Drive.Storage +namespace DysonNetwork.Drive.Storage; + +public class FileReferenceServiceGrpc(FileReferenceService fileReferenceService) + : Shared.Proto.FileReferenceService.FileReferenceServiceBase { - public class FileReferenceServiceGrpc(FileReferenceService fileReferenceService) - : Shared.Proto.FileReferenceService.FileReferenceServiceBase + public override async Task CreateReference(CreateReferenceRequest request, + ServerCallContext context) { - public override async Task CreateReference(CreateReferenceRequest request, - ServerCallContext context) - { - Instant? expiredAt = null; - if (request.ExpiredAt != null) - expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds); - else if (request.Duration != null) - expiredAt = SystemClock.Instance.GetCurrentInstant() + - Duration.FromTimeSpan(request.Duration.ToTimeSpan()); + Instant? expiredAt = null; + if (request.ExpiredAt != null) + expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds); + else if (request.Duration != null) + expiredAt = SystemClock.Instance.GetCurrentInstant() + + Duration.FromTimeSpan(request.Duration.ToTimeSpan()); - var reference = await fileReferenceService.CreateReferenceAsync( - request.FileId, - request.Usage, - request.ResourceId, - expiredAt - ); - return reference.ToProtoValue(); - } + var reference = await fileReferenceService.CreateReferenceAsync( + request.FileId, + request.Usage, + request.ResourceId, + expiredAt + ); + return reference.ToProtoValue(); + } - public override async Task CreateReferenceBatch(CreateReferenceBatchRequest request, - ServerCallContext context) - { - Instant? expiredAt = null; - if (request.ExpiredAt != null) - expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds); - else if (request.Duration != null) - expiredAt = SystemClock.Instance.GetCurrentInstant() + - Duration.FromTimeSpan(request.Duration.ToTimeSpan()); + public override async Task CreateReferenceBatch(CreateReferenceBatchRequest request, + ServerCallContext context) + { + Instant? expiredAt = null; + if (request.ExpiredAt != null) + expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds); + else if (request.Duration != null) + expiredAt = SystemClock.Instance.GetCurrentInstant() + + Duration.FromTimeSpan(request.Duration.ToTimeSpan()); - var references = await fileReferenceService.CreateReferencesAsync( - request.FilesId.ToList(), - request.Usage, - request.ResourceId, - expiredAt - ); - var response = new CreateReferenceBatchResponse(); - response.References.AddRange(references.Select(r => r.ToProtoValue())); - return response; - } + var references = await fileReferenceService.CreateReferencesAsync( + request.FilesId.ToList(), + request.Usage, + request.ResourceId, + expiredAt + ); + var response = new CreateReferenceBatchResponse(); + response.References.AddRange(references.Select(r => r.ToProtoValue())); + return response; + } - public override async Task GetReferences(GetReferencesRequest request, - ServerCallContext context) - { - var references = await fileReferenceService.GetReferencesAsync(request.FileId); - var response = new GetReferencesResponse(); - response.References.AddRange(references.Select(r => r.ToProtoValue())); - return response; - } + public override async Task GetReferences(GetReferencesRequest request, + ServerCallContext context) + { + var references = await fileReferenceService.GetReferencesAsync(request.FileId); + var response = new GetReferencesResponse(); + response.References.AddRange(references.Select(r => r.ToProtoValue())); + return response; + } - public override async Task GetReferenceCount(GetReferenceCountRequest request, - ServerCallContext context) - { - var count = await fileReferenceService.GetReferenceCountAsync(request.FileId); - return new GetReferenceCountResponse { Count = count }; - } + public override async Task GetReferenceCount(GetReferenceCountRequest request, + ServerCallContext context) + { + var count = await fileReferenceService.GetReferenceCountAsync(request.FileId); + return new GetReferenceCountResponse { Count = count }; + } - public override async Task GetResourceReferences(GetResourceReferencesRequest request, - ServerCallContext context) - { - var references = await fileReferenceService.GetResourceReferencesAsync(request.ResourceId, request.Usage); - var response = new GetReferencesResponse(); - response.References.AddRange(references.Select(r => r.ToProtoValue())); - return response; - } + public override async Task GetResourceReferences(GetResourceReferencesRequest request, + ServerCallContext context) + { + var references = await fileReferenceService.GetResourceReferencesAsync(request.ResourceId, request.Usage); + var response = new GetReferencesResponse(); + response.References.AddRange(references.Select(r => r.ToProtoValue())); + return response; + } - public override async Task GetResourceFiles(GetResourceFilesRequest request, - ServerCallContext context) - { - var files = await fileReferenceService.GetResourceFilesAsync(request.ResourceId, request.Usage); - var response = new GetResourceFilesResponse(); - response.Files.AddRange(files.Select(f => f.ToProtoValue())); - return response; - } + public override async Task GetResourceFiles(GetResourceFilesRequest request, + ServerCallContext context) + { + var files = await fileReferenceService.GetResourceFilesAsync(request.ResourceId, request.Usage); + var response = new GetResourceFilesResponse(); + response.Files.AddRange(files.Select(f => f.ToProtoValue())); + return response; + } - public override async Task DeleteResourceReferences( - DeleteResourceReferencesRequest request, ServerCallContext context) - { - int deletedCount; - if (request.Usage is null) - deletedCount = await fileReferenceService.DeleteResourceReferencesAsync(request.ResourceId); - else - deletedCount = - await fileReferenceService.DeleteResourceReferencesAsync(request.ResourceId, request.Usage!); - return new DeleteResourceReferencesResponse { DeletedCount = deletedCount }; - } + public override async Task DeleteResourceReferences( + DeleteResourceReferencesRequest request, ServerCallContext context) + { + int deletedCount; + if (request.Usage is null) + deletedCount = await fileReferenceService.DeleteResourceReferencesAsync(request.ResourceId); + else + deletedCount = + await fileReferenceService.DeleteResourceReferencesAsync(request.ResourceId, request.Usage!); + return new DeleteResourceReferencesResponse { DeletedCount = deletedCount }; + } - public override async Task DeleteResourceReferencesBatch(DeleteResourceReferencesBatchRequest request, ServerCallContext context) + public override async Task DeleteResourceReferencesBatch(DeleteResourceReferencesBatchRequest request, ServerCallContext context) + { + var resourceIds = request.ResourceIds.ToList(); + int deletedCount; + if (request.Usage is null) + deletedCount = await fileReferenceService.DeleteResourceReferencesBatchAsync(resourceIds); + else + deletedCount = + await fileReferenceService.DeleteResourceReferencesBatchAsync(resourceIds, request.Usage!); + return new DeleteResourceReferencesResponse { DeletedCount = deletedCount }; + } + + public override async Task DeleteReference(DeleteReferenceRequest request, + ServerCallContext context) + { + var success = await fileReferenceService.DeleteReferenceAsync(Guid.Parse(request.ReferenceId)); + return new DeleteReferenceResponse { Success = success }; + } + + public override async Task UpdateResourceFiles(UpdateResourceFilesRequest request, + ServerCallContext context) + { + Instant? expiredAt = null; + if (request.ExpiredAt != null) { - var resourceIds = request.ResourceIds.ToList(); - int deletedCount; - if (request.Usage is null) - deletedCount = await fileReferenceService.DeleteResourceReferencesBatchAsync(resourceIds); - else - deletedCount = - await fileReferenceService.DeleteResourceReferencesBatchAsync(resourceIds, request.Usage!); - return new DeleteResourceReferencesResponse { DeletedCount = deletedCount }; + expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds); + } + else if (request.Duration != null) + { + expiredAt = SystemClock.Instance.GetCurrentInstant() + + Duration.FromTimeSpan(request.Duration.ToTimeSpan()); } - public override async Task DeleteReference(DeleteReferenceRequest request, - ServerCallContext context) + var references = await fileReferenceService.UpdateResourceFilesAsync( + request.ResourceId, + request.FileIds, + request.Usage, + expiredAt + ); + var response = new UpdateResourceFilesResponse(); + response.References.AddRange(references.Select(r => r.ToProtoValue())); + return response; + } + + public override async Task SetReferenceExpiration( + SetReferenceExpirationRequest request, ServerCallContext context) + { + Instant? expiredAt = null; + if (request.ExpiredAt != null) { - var success = await fileReferenceService.DeleteReferenceAsync(Guid.Parse(request.ReferenceId)); - return new DeleteReferenceResponse { Success = success }; + expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds); + } + else if (request.Duration != null) + { + expiredAt = SystemClock.Instance.GetCurrentInstant() + + Duration.FromTimeSpan(request.Duration.ToTimeSpan()); } - public override async Task UpdateResourceFiles(UpdateResourceFilesRequest request, - ServerCallContext context) - { - Instant? expiredAt = null; - if (request.ExpiredAt != null) - { - expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds); - } - else if (request.Duration != null) - { - expiredAt = SystemClock.Instance.GetCurrentInstant() + - Duration.FromTimeSpan(request.Duration.ToTimeSpan()); - } + var success = + await fileReferenceService.SetReferenceExpirationAsync(Guid.Parse(request.ReferenceId), expiredAt); + return new SetReferenceExpirationResponse { Success = success }; + } - var references = await fileReferenceService.UpdateResourceFilesAsync( - request.ResourceId, - request.FileIds, - request.Usage, - expiredAt - ); - var response = new UpdateResourceFilesResponse(); - response.References.AddRange(references.Select(r => r.ToProtoValue())); - return response; - } + public override async Task SetFileReferencesExpiration( + SetFileReferencesExpirationRequest request, ServerCallContext context) + { + var expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds); + var updatedCount = await fileReferenceService.SetFileReferencesExpirationAsync(request.FileId, expiredAt); + return new SetFileReferencesExpirationResponse { UpdatedCount = updatedCount }; + } - public override async Task SetReferenceExpiration( - SetReferenceExpirationRequest request, ServerCallContext context) - { - Instant? expiredAt = null; - if (request.ExpiredAt != null) - { - expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds); - } - else if (request.Duration != null) - { - expiredAt = SystemClock.Instance.GetCurrentInstant() + - Duration.FromTimeSpan(request.Duration.ToTimeSpan()); - } - - var success = - await fileReferenceService.SetReferenceExpirationAsync(Guid.Parse(request.ReferenceId), expiredAt); - return new SetReferenceExpirationResponse { Success = success }; - } - - public override async Task SetFileReferencesExpiration( - SetFileReferencesExpirationRequest request, ServerCallContext context) - { - var expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds); - var updatedCount = await fileReferenceService.SetFileReferencesExpirationAsync(request.FileId, expiredAt); - return new SetFileReferencesExpirationResponse { UpdatedCount = updatedCount }; - } - - public override async Task HasFileReferences(HasFileReferencesRequest request, - ServerCallContext context) - { - var hasReferences = await fileReferenceService.HasFileReferencesAsync(request.FileId); - return new HasFileReferencesResponse { HasReferences = hasReferences }; - } + public override async Task HasFileReferences(HasFileReferencesRequest request, + ServerCallContext context) + { + var hasReferences = await fileReferenceService.HasFileReferencesAsync(request.FileId); + return new HasFileReferencesResponse { HasReferences = hasReferences }; } } \ No newline at end of file diff --git a/DysonNetwork.Drive/Storage/FileService.cs b/DysonNetwork.Drive/Storage/FileService.cs index b532b73..3890556 100644 --- a/DysonNetwork.Drive/Storage/FileService.cs +++ b/DysonNetwork.Drive/Storage/FileService.cs @@ -1,27 +1,28 @@ -using System.Drawing; using System.Globalization; using FFMpegCore; using System.Security.Cryptography; +using DysonNetwork.Drive.Storage.Model; using DysonNetwork.Shared.Cache; using DysonNetwork.Shared.Proto; using Google.Protobuf.WellKnownTypes; using Microsoft.EntityFrameworkCore; using Minio; using Minio.DataModel.Args; +using NATS.Client.Core; using NetVips; using NodaTime; -using tusdotnet.Stores; using System.Linq.Expressions; using DysonNetwork.Shared.Data; using Microsoft.EntityFrameworkCore.Query; +using NATS.Net; namespace DysonNetwork.Drive.Storage; public class FileService( AppDatabase db, ILogger logger, - IServiceScopeFactory scopeFactory, - ICacheService cache + ICacheService cache, + INatsConnection nats ) { private const string CacheKeyPrefix = "file:"; @@ -85,14 +86,6 @@ public class FileService( .ToList(); } - private const string TempFilePrefix = "dyn-cloudfile"; - - private static readonly string[] AnimatedImageTypes = - ["image/gif", "image/apng", "image/avif"]; - - private static readonly string[] AnimatedImageExtensions = - [".gif", ".apng", ".avif"]; - public async Task ProcessNewFileAsync( Account account, string fileId, @@ -129,13 +122,14 @@ public class FileService( if (bundle?.ExpiredAt != null) expiredAt = bundle.ExpiredAt.Value; - + var managedTempPath = Path.Combine(Path.GetTempPath(), fileId); File.Copy(filePath, managedTempPath, true); var fileInfo = new FileInfo(managedTempPath); var fileSize = fileInfo.Length; - var finalContentType = contentType ?? (!fileName.Contains('.') ? "application/octet-stream" : MimeTypes.GetMimeType(fileName)); + var finalContentType = contentType ?? + (!fileName.Contains('.') ? "application/octet-stream" : MimeTypes.GetMimeType(fileName)); var file = new CloudFile { @@ -154,20 +148,20 @@ public class FileService( } string processingPath = managedTempPath; - bool isTempFile = true; + bool isTempFile = true; if (!string.IsNullOrWhiteSpace(encryptPassword)) { if (!pool.PolicyConfig.AllowEncryption) throw new InvalidOperationException("Encryption is not allowed in this pool"); - + var encryptedPath = Path.Combine(Path.GetTempPath(), $"{fileId}.encrypted"); FileEncryptor.EncryptFile(managedTempPath, encryptedPath, encryptPassword); - + File.Delete(managedTempPath); - + processingPath = encryptedPath; - + file.IsEncrypted = true; file.MimeType = "application/octet-stream"; file.Size = new FileInfo(processingPath).Length; @@ -180,8 +174,18 @@ public class FileService( file.StorageId ??= file.Id; - _ = Task.Run(() => - ProcessAndUploadInBackgroundAsync(file.Id, filePool, file.StorageId, file.MimeType, processingPath, isTempFile)); + var js = nats.CreateJetStreamContext(); + await js.PublishAsync( + FileUploadedEvent.Type, + GrpcTypeHelper.ConvertObjectToByteString(new FileUploadedEventPayload( + file.Id, + pool.Id, + file.StorageId, + file.MimeType, + processingPath, + isTempFile) + ).ToByteArray() + ); return file; } @@ -296,170 +300,6 @@ public class FileService( } } - private async Task ProcessAndUploadInBackgroundAsync( - string fileId, - string remoteId, - string storageId, - string contentType, - string processingFilePath, - bool isTempFile - ) - { - var pool = await GetPoolAsync(Guid.Parse(remoteId)); - if (pool is null) return; - - using var scope = scopeFactory.CreateScope(); - var nfs = scope.ServiceProvider.GetRequiredService(); - var scopedDb = scope.ServiceProvider.GetRequiredService(); - - var uploads = new List<(string FilePath, string Suffix, string ContentType, bool SelfDestruct)>(); - var newMimeType = contentType; - var hasCompression = false; - var hasThumbnail = false; - - try - { - logger.LogInformation("Processing file {FileId} in background...", fileId); - - var fileToUpdate = await scopedDb.Files.AsNoTracking().FirstAsync(f => f.Id == fileId); - - if (fileToUpdate.IsEncrypted) - { - uploads.Add((processingFilePath, string.Empty, contentType, false)); - } - else if (!pool.PolicyConfig.NoOptimization) - { - var fileExtension = Path.GetExtension(processingFilePath); - switch (contentType.Split('/')[0]) - { - case "image": - if (AnimatedImageTypes.Contains(contentType) || AnimatedImageExtensions.Contains(fileExtension)) - { - logger.LogInformation("Skip optimize file {FileId} due to it is animated...", fileId); - uploads.Add((processingFilePath, string.Empty, contentType, false)); - break; - } - - newMimeType = "image/webp"; - using (var vipsImage = Image.NewFromFile(processingFilePath)) - { - var imageToWrite = vipsImage; - - if (vipsImage.Interpretation is Enums.Interpretation.Scrgb or Enums.Interpretation.Xyz) - { - imageToWrite = vipsImage.Colourspace(Enums.Interpretation.Srgb); - } - - var webpPath = Path.Join(Path.GetTempPath(), $"{TempFilePrefix}#{fileId}.webp"); - imageToWrite.Autorot().WriteToFile(webpPath, - new VOption { { "lossless", true }, { "strip", true } }); - uploads.Add((webpPath, string.Empty, newMimeType, true)); - - if (imageToWrite.Width * imageToWrite.Height >= 1024 * 1024) - { - var scale = 1024.0 / Math.Max(imageToWrite.Width, imageToWrite.Height); - var compressedPath = - Path.Join(Path.GetTempPath(), $"{TempFilePrefix}#{fileId}-compressed.webp"); - using var compressedImage = imageToWrite.Resize(scale); - compressedImage.Autorot().WriteToFile(compressedPath, - new VOption { { "Q", 80 }, { "strip", true } }); - uploads.Add((compressedPath, ".compressed", newMimeType, true)); - hasCompression = true; - } - - if (!ReferenceEquals(imageToWrite, vipsImage)) - { - imageToWrite.Dispose(); - } - } - - break; - - case "video": - uploads.Add((processingFilePath, string.Empty, contentType, false)); - - var thumbnailPath = Path.Join(Path.GetTempPath(), $"{TempFilePrefix}#{fileId}.thumbnail.jpg"); - try - { - await FFMpegArguments - .FromFileInput(processingFilePath, verifyExists: true) - .OutputToFile(thumbnailPath, overwrite: true, options => options - .Seek(TimeSpan.FromSeconds(0)) - .WithFrameOutputCount(1) - .WithCustomArgument("-q:v 2") - ) - .NotifyOnOutput(line => logger.LogInformation("[FFmpeg] {Line}", line)) - .NotifyOnError(line => logger.LogWarning("[FFmpeg] {Line}", line)) - .ProcessAsynchronously(); - - if (File.Exists(thumbnailPath)) - { - uploads.Add((thumbnailPath, ".thumbnail", "image/jpeg", true)); - hasThumbnail = true; - } - else - { - logger.LogWarning("FFMpeg did not produce thumbnail for video {FileId}", fileId); - } - } - catch (Exception ex) - { - logger.LogError(ex, "Failed to generate thumbnail for video {FileId}", fileId); - } - - break; - - default: - uploads.Add((processingFilePath, string.Empty, contentType, false)); - break; - } - } - else uploads.Add((processingFilePath, string.Empty, contentType, false)); - - logger.LogInformation("Optimized file {FileId}, now uploading...", fileId); - - if (uploads.Count > 0) - { - var destPool = Guid.Parse(remoteId!); - var uploadTasks = uploads.Select(item => - nfs.UploadFileToRemoteAsync( - storageId, - destPool, - item.FilePath, - item.Suffix, - item.ContentType, - item.SelfDestruct - ) - ).ToList(); - - await Task.WhenAll(uploadTasks); - - logger.LogInformation("Uploaded file {FileId} done!", fileId); - - var now = SystemClock.Instance.GetCurrentInstant(); - await scopedDb.Files.Where(f => f.Id == fileId).ExecuteUpdateAsync(setter => setter - .SetProperty(f => f.UploadedAt, now) - .SetProperty(f => f.PoolId, destPool) - .SetProperty(f => f.MimeType, newMimeType) - .SetProperty(f => f.HasCompression, hasCompression) - .SetProperty(f => f.HasThumbnail, hasThumbnail) - ); - } - } - catch (Exception err) - { - logger.LogError(err, "Failed to process and upload {FileId}", fileId); - } - finally - { - if (isTempFile) - { - File.Delete(processingFilePath); - } - await nfs._PurgeCacheAsync(fileId); - } - } - private static async Task HashFileAsync(string filePath, int chunkSize = 1024 * 1024) { var fileInfo = new FileInfo(filePath); @@ -492,7 +332,7 @@ public class FileService( return Convert.ToHexString(hash).ToLowerInvariant(); } - private async Task UploadFileToRemoteAsync( + public async Task UploadFileToRemoteAsync( string storageId, Guid targetRemote, string filePath, @@ -506,7 +346,7 @@ public class FileService( if (selfDestruct) File.Delete(filePath); } - private async Task UploadFileToRemoteAsync( + public async Task UploadFileToRemoteAsync( string storageId, Guid targetRemote, Stream stream, @@ -882,7 +722,7 @@ file class UpdatableCloudFile(CloudFile file) .SetProperty(f => f.Name, Name) .SetProperty(f => f.Description, Description) .SetProperty(f => f.FileMeta, FileMeta) - .SetProperty(f => f.UserMeta, userMeta!) + .SetProperty(f => f.UserMeta, userMeta) .SetProperty(f => f.IsMarkedRecycle, IsMarkedRecycle); } -} +} \ No newline at end of file diff --git a/DysonNetwork.Drive/Storage/Model/Events.cs b/DysonNetwork.Drive/Storage/Model/Events.cs new file mode 100644 index 0000000..ab6320a --- /dev/null +++ b/DysonNetwork.Drive/Storage/Model/Events.cs @@ -0,0 +1,15 @@ +namespace DysonNetwork.Drive.Storage.Model; + +public static class FileUploadedEvent +{ + public const string Type = "file_uploaded"; +} + +public record FileUploadedEventPayload( + string FileId, + Guid RemoteId, + string StorageId, + string ContentType, + string ProcessingFilePath, + bool IsTempFile +); diff --git a/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs b/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs index 2b92614..b2d73f5 100644 --- a/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs +++ b/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs @@ -1,4 +1,3 @@ -using DysonNetwork.Drive.Storage; using NodaTime; namespace DysonNetwork.Drive.Storage.Model diff --git a/DysonNetwork.Drive/Storage/README.md b/DysonNetwork.Drive/Storage/README.md index 2aeba4a..dc705e4 100644 --- a/DysonNetwork.Drive/Storage/README.md +++ b/DysonNetwork.Drive/Storage/README.md @@ -16,7 +16,7 @@ To begin a file upload, you first need to create an upload task. This is done by "file_name": "string", "file_size": "long (in bytes)", "content_type": "string (e.g., 'image/jpeg')", - "pool_id": "string (GUID)", + "pool_id": "string (GUID, optional)", "bundle_id": "string (GUID, optional)", "encrypt_password": "string (optional)", "expired_at": "string (ISO 8601 format, optional)",