♻️ Move file analysis and upload into message queue

2025-09-21 19:38:40 +08:00
parent b2a0d25ffa
commit f81e3dc9f4
6 changed files with 418 additions and 343 deletions
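
Summary: file analysis and optimization (WebP re-encoding via NetVips, video thumbnails via FFMpeg) previously ran inside FileService behind a fire-and-forget Task.Run. This commit publishes a FileUploadedEvent onto a NATS JetStream stream instead, and a new consumer in BroadcastEventHandler performs the processing, so pending work survives restarts and failures are retried. A minimal sketch of the publish/consume pattern adopted here, assuming EnsureStreamCreated is the project's own helper from DysonNetwork.Shared.Stream (it is not part of the NATS.Net API) and that the "file_events" stream covers the "file_uploaded" subject:

    using System.Text.Json;
    using NATS.Client.Core;
    using NATS.Client.JetStream.Models;
    using NATS.Net;

    await using var nats = new NatsConnection();
    var js = nats.CreateJetStreamContext();

    // Producer: serialize the payload and publish it onto the stream's subject.
    var data = JsonSerializer.SerializeToUtf8Bytes(new { FileId = "example" });
    await js.PublishAsync("file_uploaded", data);

    // Consumer: a durable consumer tracks its position across restarts.
    var consumer = await js.CreateOrUpdateConsumerAsync("file_events",
        new ConsumerConfig("drive_file_uploaded_handler"));
    await foreach (var msg in consumer.ConsumeAsync<byte[]>())
    {
        try
        {
            // ... deserialize msg.Data and do the heavy processing ...
            await msg.AckAsync(); // success: message is removed from the queue
        }
        catch
        {
            await msg.NakAsync(); // failure: JetStream redelivers the message
        }
    }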

View File

@@ -1,10 +1,16 @@
using System.Text.Json;
-using DysonNetwork.Drive.Storage;
+using DysonNetwork.Drive.Storage.Model;
using DysonNetwork.Shared.Proto;
using DysonNetwork.Shared.Stream;
using FFMpegCore;
using Microsoft.EntityFrameworkCore;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.Net;
using NetVips;
using NodaTime;
using FileService = DysonNetwork.Drive.Storage.FileService;

namespace DysonNetwork.Drive.Startup;
@@ -14,20 +20,72 @@ public class BroadcastEventHandler(
IServiceProvider serviceProvider
) : BackgroundService
{
private const string TempFileSuffix = "dypart";
private static readonly string[] AnimatedImageTypes =
["image/gif", "image/apng", "image/avif"];
private static readonly string[] AnimatedImageExtensions =
[".gif", ".apng", ".avif"];
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var js = nats.CreateJetStreamContext();
await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]);
-var consumer = await js.CreateOrUpdateConsumerAsync("account_events",
+var accountEventConsumer = await js.CreateOrUpdateConsumerAsync("account_events",
new ConsumerConfig("drive_account_deleted_handler"), cancellationToken: stoppingToken);
await js.EnsureStreamCreated("file_events", [FileUploadedEvent.Type]);
var fileUploadedConsumer = await js.CreateOrUpdateConsumerAsync("file_events",
new ConsumerConfig("drive_file_uploaded_handler"), cancellationToken: stoppingToken);
var accountDeletedTask = HandleAccountDeleted(accountEventConsumer, stoppingToken);
var fileUploadedTask = HandleFileUploaded(fileUploadedConsumer, stoppingToken);
await Task.WhenAll(accountDeletedTask, fileUploadedTask);
}
private async Task HandleFileUploaded(INatsJSConsumer consumer, CancellationToken stoppingToken)
{
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
{
var payload = JsonSerializer.Deserialize<FileUploadedEventPayload>(msg.Data, GrpcTypeHelper.SerializerOptions);
if (payload == null)
{
await msg.AckAsync(cancellationToken: stoppingToken);
continue;
}
try
{
await ProcessAndUploadInBackgroundAsync(
payload.FileId,
payload.RemoteId,
payload.StorageId,
payload.ContentType,
payload.ProcessingFilePath,
payload.IsTempFile
);
await msg.AckAsync(cancellationToken: stoppingToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Error processing FileUploadedEvent for file {FileId}", payload?.FileId);
await msg.NakAsync(cancellationToken: stoppingToken);
}
}
}
private async Task HandleAccountDeleted(INatsJSConsumer consumer, CancellationToken stoppingToken)
{
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
{
try
{
-var evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data);
+var evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
if (evt == null)
{
await msg.AckAsync(cancellationToken: stoppingToken);
@@ -69,4 +127,168 @@ public class BroadcastEventHandler(
}
}
}
private async Task ProcessAndUploadInBackgroundAsync(
string fileId,
Guid remoteId,
string storageId,
string contentType,
string processingFilePath,
bool isTempFile
)
{
using var scope = serviceProvider.CreateScope();
var fs = scope.ServiceProvider.GetRequiredService<FileService>();
var scopedDb = scope.ServiceProvider.GetRequiredService<AppDatabase>();
var pool = await fs.GetPoolAsync(remoteId);
if (pool is null) return;
var uploads = new List<(string FilePath, string Suffix, string ContentType, bool SelfDestruct)>();
var newMimeType = contentType;
var hasCompression = false;
var hasThumbnail = false;
try
{
logger.LogInformation("Processing file {FileId} in background...", fileId);
var fileToUpdate = await scopedDb.Files.AsNoTracking().FirstAsync(f => f.Id == fileId);
if (fileToUpdate.IsEncrypted)
{
uploads.Add((processingFilePath, string.Empty, contentType, false));
}
else if (!pool.PolicyConfig.NoOptimization)
{
var fileExtension = Path.GetExtension(processingFilePath);
switch (contentType.Split('/')[0])
{
case "image":
if (AnimatedImageTypes.Contains(contentType) || AnimatedImageExtensions.Contains(fileExtension))
{
logger.LogInformation("Skip optimize file {FileId} due to it is animated...", fileId);
uploads.Add((processingFilePath, string.Empty, contentType, false));
break;
}
newMimeType = "image/webp";
using (var vipsImage = Image.NewFromFile(processingFilePath))
{
var imageToWrite = vipsImage;
if (vipsImage.Interpretation is Enums.Interpretation.Scrgb or Enums.Interpretation.Xyz)
{
imageToWrite = vipsImage.Colourspace(Enums.Interpretation.Srgb);
}
var webpPath = Path.Join(Path.GetTempPath(), $"{fileId}.{TempFileSuffix}.webp");
imageToWrite.Autorot().WriteToFile(webpPath,
new VOption { { "lossless", true }, { "strip", true } });
uploads.Add((webpPath, string.Empty, newMimeType, true));
if (imageToWrite.Width * imageToWrite.Height >= 1024 * 1024)
{
var scale = 1024.0 / Math.Max(imageToWrite.Width, imageToWrite.Height);
var compressedPath =
Path.Join(Path.GetTempPath(), $"{fileId}.{TempFileSuffix}.compressed.webp");
using var compressedImage = imageToWrite.Resize(scale);
compressedImage.Autorot().WriteToFile(compressedPath,
new VOption { { "Q", 80 }, { "strip", true } });
uploads.Add((compressedPath, ".compressed", newMimeType, true));
hasCompression = true;
}
if (!ReferenceEquals(imageToWrite, vipsImage))
{
imageToWrite.Dispose();
}
}
break;
case "video":
uploads.Add((processingFilePath, string.Empty, contentType, false));
var thumbnailPath = Path.Join(Path.GetTempPath(), $"{fileId}.{TempFileSuffix}.thumbnail.jpg");
try
{
await FFMpegArguments
.FromFileInput(processingFilePath, verifyExists: true)
.OutputToFile(thumbnailPath, overwrite: true, options => options
.Seek(TimeSpan.FromSeconds(0))
.WithFrameOutputCount(1)
.WithCustomArgument("-q:v 2")
)
.NotifyOnOutput(line => logger.LogInformation("[FFmpeg] {Line}", line))
.NotifyOnError(line => logger.LogWarning("[FFmpeg] {Line}", line))
.ProcessAsynchronously();
if (File.Exists(thumbnailPath))
{
uploads.Add((thumbnailPath, ".thumbnail", "image/jpeg", true));
hasThumbnail = true;
}
else
{
logger.LogWarning("FFMpeg did not produce thumbnail for video {FileId}", fileId);
}
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to generate thumbnail for video {FileId}", fileId);
}
break;
default:
uploads.Add((processingFilePath, string.Empty, contentType, false));
break;
}
}
else uploads.Add((processingFilePath, string.Empty, contentType, false));
logger.LogInformation("Optimized file {FileId}, now uploading...", fileId);
if (uploads.Count > 0)
{
var destPool = remoteId;
var uploadTasks = uploads.Select(item =>
fs.UploadFileToRemoteAsync(
storageId,
destPool,
item.FilePath,
item.Suffix,
item.ContentType,
item.SelfDestruct
)
).ToList();
await Task.WhenAll(uploadTasks);
logger.LogInformation("Uploaded file {FileId} done!", fileId);
var now = SystemClock.Instance.GetCurrentInstant();
await scopedDb.Files.Where(f => f.Id == fileId).ExecuteUpdateAsync(setter => setter
.SetProperty(f => f.UploadedAt, now)
.SetProperty(f => f.PoolId, destPool)
.SetProperty(f => f.MimeType, newMimeType)
.SetProperty(f => f.HasCompression, hasCompression)
.SetProperty(f => f.HasThumbnail, hasThumbnail)
);
}
}
catch (Exception err)
{
logger.LogError(err, "Failed to process and upload {FileId}", fileId);
}
finally
{
if (isTempFile)
{
File.Delete(processingFilePath);
}
await fs._PurgeCacheAsync(fileId);
}
}
}
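
A note on the retry semantics above: NakAsync asks JetStream to redeliver, and the consumer config in this commit leaves the redelivery limit at its default, so a poison message (for example, a payload whose temp file was already deleted) can loop indefinitely. A hedged sketch of one way to bound that, using the standard MaxDeliver and AckWait consumer settings (not part of this commit):

    // Cap attempts so a permanently failing FileUploadedEvent is eventually
    // dropped rather than NAK'd and redelivered forever; lengthen AckWait to
    // cover slow NetVips/FFMpeg work before JetStream assumes the worker died.
    var fileUploadedConsumer = await js.CreateOrUpdateConsumerAsync("file_events",
        new ConsumerConfig("drive_file_uploaded_handler")
        {
            MaxDeliver = 5,
            AckWait = TimeSpan.FromMinutes(10),
        }, cancellationToken: stoppingToken);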

View File

@@ -3,8 +3,8 @@ using Grpc.Core;
using NodaTime;
using Duration = NodaTime.Duration;

-namespace DysonNetwork.Drive.Storage
-{
+namespace DysonNetwork.Drive.Storage;
public class FileReferenceServiceGrpc(FileReferenceService fileReferenceService)
: Shared.Proto.FileReferenceService.FileReferenceServiceBase
{
@@ -172,4 +172,3 @@ namespace DysonNetwork.Drive.Storage
return new HasFileReferencesResponse { HasReferences = hasReferences };
}
}
-}

View File

@@ -1,27 +1,28 @@
using System.Drawing;
using System.Globalization;
using FFMpegCore;
using System.Security.Cryptography;
using DysonNetwork.Drive.Storage.Model;
using DysonNetwork.Shared.Cache;
using DysonNetwork.Shared.Proto;
using Google.Protobuf.WellKnownTypes;
using Microsoft.EntityFrameworkCore;
using Minio;
using Minio.DataModel.Args;
using NATS.Client.Core;
using NetVips;
using NodaTime;
using tusdotnet.Stores;
using System.Linq.Expressions;
using DysonNetwork.Shared.Data;
using Microsoft.EntityFrameworkCore.Query;
using NATS.Net;

namespace DysonNetwork.Drive.Storage;

public class FileService(
AppDatabase db,
ILogger<FileService> logger,
-IServiceScopeFactory scopeFactory,
-ICacheService cache
+ICacheService cache,
+INatsConnection nats
)
{
private const string CacheKeyPrefix = "file:";
@@ -85,14 +86,6 @@ public class FileService(
.ToList();
}
private const string TempFilePrefix = "dyn-cloudfile";
private static readonly string[] AnimatedImageTypes =
["image/gif", "image/apng", "image/avif"];
private static readonly string[] AnimatedImageExtensions =
[".gif", ".apng", ".avif"];
public async Task<CloudFile> ProcessNewFileAsync( public async Task<CloudFile> ProcessNewFileAsync(
Account account, Account account,
string fileId, string fileId,
@@ -135,7 +128,8 @@ public class FileService(
var fileInfo = new FileInfo(managedTempPath);
var fileSize = fileInfo.Length;
-var finalContentType = contentType ?? (!fileName.Contains('.') ? "application/octet-stream" : MimeTypes.GetMimeType(fileName));
+var finalContentType = contentType ??
+    (!fileName.Contains('.') ? "application/octet-stream" : MimeTypes.GetMimeType(fileName));

var file = new CloudFile
{
@@ -180,8 +174,18 @@ public class FileService(
file.StorageId ??= file.Id;

-_ = Task.Run(() =>
-    ProcessAndUploadInBackgroundAsync(file.Id, filePool, file.StorageId, file.MimeType, processingPath, isTempFile));
+var js = nats.CreateJetStreamContext();
+await js.PublishAsync(
FileUploadedEvent.Type,
GrpcTypeHelper.ConvertObjectToByteString(new FileUploadedEventPayload(
file.Id,
pool.Id,
file.StorageId,
file.MimeType,
processingPath,
isTempFile)
).ToByteArray()
);

return file;
}
@@ -296,170 +300,6 @@ public class FileService(
}
}
private async Task ProcessAndUploadInBackgroundAsync(
string fileId,
string remoteId,
string storageId,
string contentType,
string processingFilePath,
bool isTempFile
)
{
var pool = await GetPoolAsync(Guid.Parse(remoteId));
if (pool is null) return;
using var scope = scopeFactory.CreateScope();
var nfs = scope.ServiceProvider.GetRequiredService<FileService>();
var scopedDb = scope.ServiceProvider.GetRequiredService<AppDatabase>();
var uploads = new List<(string FilePath, string Suffix, string ContentType, bool SelfDestruct)>();
var newMimeType = contentType;
var hasCompression = false;
var hasThumbnail = false;
try
{
logger.LogInformation("Processing file {FileId} in background...", fileId);
var fileToUpdate = await scopedDb.Files.AsNoTracking().FirstAsync(f => f.Id == fileId);
if (fileToUpdate.IsEncrypted)
{
uploads.Add((processingFilePath, string.Empty, contentType, false));
}
else if (!pool.PolicyConfig.NoOptimization)
{
var fileExtension = Path.GetExtension(processingFilePath);
switch (contentType.Split('/')[0])
{
case "image":
if (AnimatedImageTypes.Contains(contentType) || AnimatedImageExtensions.Contains(fileExtension))
{
logger.LogInformation("Skip optimize file {FileId} due to it is animated...", fileId);
uploads.Add((processingFilePath, string.Empty, contentType, false));
break;
}
newMimeType = "image/webp";
using (var vipsImage = Image.NewFromFile(processingFilePath))
{
var imageToWrite = vipsImage;
if (vipsImage.Interpretation is Enums.Interpretation.Scrgb or Enums.Interpretation.Xyz)
{
imageToWrite = vipsImage.Colourspace(Enums.Interpretation.Srgb);
}
var webpPath = Path.Join(Path.GetTempPath(), $"{TempFilePrefix}#{fileId}.webp");
imageToWrite.Autorot().WriteToFile(webpPath,
new VOption { { "lossless", true }, { "strip", true } });
uploads.Add((webpPath, string.Empty, newMimeType, true));
if (imageToWrite.Width * imageToWrite.Height >= 1024 * 1024)
{
var scale = 1024.0 / Math.Max(imageToWrite.Width, imageToWrite.Height);
var compressedPath =
Path.Join(Path.GetTempPath(), $"{TempFilePrefix}#{fileId}-compressed.webp");
using var compressedImage = imageToWrite.Resize(scale);
compressedImage.Autorot().WriteToFile(compressedPath,
new VOption { { "Q", 80 }, { "strip", true } });
uploads.Add((compressedPath, ".compressed", newMimeType, true));
hasCompression = true;
}
if (!ReferenceEquals(imageToWrite, vipsImage))
{
imageToWrite.Dispose();
}
}
break;
case "video":
uploads.Add((processingFilePath, string.Empty, contentType, false));
var thumbnailPath = Path.Join(Path.GetTempPath(), $"{TempFilePrefix}#{fileId}.thumbnail.jpg");
try
{
await FFMpegArguments
.FromFileInput(processingFilePath, verifyExists: true)
.OutputToFile(thumbnailPath, overwrite: true, options => options
.Seek(TimeSpan.FromSeconds(0))
.WithFrameOutputCount(1)
.WithCustomArgument("-q:v 2")
)
.NotifyOnOutput(line => logger.LogInformation("[FFmpeg] {Line}", line))
.NotifyOnError(line => logger.LogWarning("[FFmpeg] {Line}", line))
.ProcessAsynchronously();
if (File.Exists(thumbnailPath))
{
uploads.Add((thumbnailPath, ".thumbnail", "image/jpeg", true));
hasThumbnail = true;
}
else
{
logger.LogWarning("FFMpeg did not produce thumbnail for video {FileId}", fileId);
}
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to generate thumbnail for video {FileId}", fileId);
}
break;
default:
uploads.Add((processingFilePath, string.Empty, contentType, false));
break;
}
}
else uploads.Add((processingFilePath, string.Empty, contentType, false));
logger.LogInformation("Optimized file {FileId}, now uploading...", fileId);
if (uploads.Count > 0)
{
var destPool = Guid.Parse(remoteId!);
var uploadTasks = uploads.Select(item =>
nfs.UploadFileToRemoteAsync(
storageId,
destPool,
item.FilePath,
item.Suffix,
item.ContentType,
item.SelfDestruct
)
).ToList();
await Task.WhenAll(uploadTasks);
logger.LogInformation("Uploaded file {FileId} done!", fileId);
var now = SystemClock.Instance.GetCurrentInstant();
await scopedDb.Files.Where(f => f.Id == fileId).ExecuteUpdateAsync(setter => setter
.SetProperty(f => f.UploadedAt, now)
.SetProperty(f => f.PoolId, destPool)
.SetProperty(f => f.MimeType, newMimeType)
.SetProperty(f => f.HasCompression, hasCompression)
.SetProperty(f => f.HasThumbnail, hasThumbnail)
);
}
}
catch (Exception err)
{
logger.LogError(err, "Failed to process and upload {FileId}", fileId);
}
finally
{
if (isTempFile)
{
File.Delete(processingFilePath);
}
await nfs._PurgeCacheAsync(fileId);
}
}
private static async Task<string> HashFileAsync(string filePath, int chunkSize = 1024 * 1024) private static async Task<string> HashFileAsync(string filePath, int chunkSize = 1024 * 1024)
{ {
var fileInfo = new FileInfo(filePath); var fileInfo = new FileInfo(filePath);
@@ -492,7 +332,7 @@ public class FileService(
return Convert.ToHexString(hash).ToLowerInvariant();
}
-private async Task UploadFileToRemoteAsync(
+public async Task UploadFileToRemoteAsync(
string storageId,
Guid targetRemote,
string filePath,
@@ -506,7 +346,7 @@ public class FileService(
if (selfDestruct) File.Delete(filePath);
}
-private async Task UploadFileToRemoteAsync(
+public async Task UploadFileToRemoteAsync(
string storageId,
Guid targetRemote,
Stream stream,
@@ -882,7 +722,7 @@ file class UpdatableCloudFile(CloudFile file)
.SetProperty(f => f.Name, Name) .SetProperty(f => f.Name, Name)
.SetProperty(f => f.Description, Description) .SetProperty(f => f.Description, Description)
.SetProperty(f => f.FileMeta, FileMeta) .SetProperty(f => f.FileMeta, FileMeta)
.SetProperty(f => f.UserMeta, userMeta!) .SetProperty(f => f.UserMeta, userMeta)
.SetProperty(f => f.IsMarkedRecycle, IsMarkedRecycle); .SetProperty(f => f.IsMarkedRecycle, IsMarkedRecycle);
} }
} }

View File

@@ -0,0 +1,15 @@
namespace DysonNetwork.Drive.Storage.Model;
public static class FileUploadedEvent
{
public const string Type = "file_uploaded";
}
public record FileUploadedEventPayload(
string FileId,
Guid RemoteId,
string StorageId,
string ContentType,
string ProcessingFilePath,
bool IsTempFile
);
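
The record above is the contract between FileService (producer) and BroadcastEventHandler (consumer). A small sketch of the round-trip, assuming GrpcTypeHelper.ConvertObjectToByteString serializes with the same System.Text.Json options (GrpcTypeHelper.SerializerOptions) that the consumer deserializes with, as the two call sites in this commit imply:

    using System.Text.Json;
    using DysonNetwork.Shared.Proto;

    var payload = new FileUploadedEventPayload(
        FileId: "file-id",
        RemoteId: Guid.NewGuid(),
        StorageId: "storage-id",
        ContentType: "image/png",
        ProcessingFilePath: "/tmp/upload.part",
        IsTempFile: true);

    // Producer side (FileService.ProcessNewFileAsync):
    var bytes = GrpcTypeHelper.ConvertObjectToByteString(payload).ToByteArray();

    // Consumer side (BroadcastEventHandler.HandleFileUploaded):
    var decoded = JsonSerializer.Deserialize<FileUploadedEventPayload>(
        bytes, GrpcTypeHelper.SerializerOptions);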

View File

@@ -1,4 +1,3 @@
using DysonNetwork.Drive.Storage;
using NodaTime;

namespace DysonNetwork.Drive.Storage.Model

View File

@@ -16,7 +16,7 @@ To begin a file upload, you first need to create an upload task. This is done by
"file_name": "string", "file_name": "string",
"file_size": "long (in bytes)", "file_size": "long (in bytes)",
"content_type": "string (e.g., 'image/jpeg')", "content_type": "string (e.g., 'image/jpeg')",
"pool_id": "string (GUID)", "pool_id": "string (GUID, optional)",
"bundle_id": "string (GUID, optional)", "bundle_id": "string (GUID, optional)",
"encrypt_password": "string (optional)", "encrypt_password": "string (optional)",
"expired_at": "string (ISO 8601 format, optional)", "expired_at": "string (ISO 8601 format, optional)",