Compare commits

...

2 Commits

Author SHA1 Message Date
f81e3dc9f4 ♻️ Move file analysis and upload into message queue 2025-09-21 19:38:40 +08:00
b2a0d25ffa Functional new upload method 2025-09-21 18:32:08 +08:00
11 changed files with 495 additions and 454 deletions
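In short, these two commits replace the fire-and-forget Task.Run background processing in FileService with a NATS JetStream work queue: ProcessNewFileAsync now persists the record and publishes a FileUploadedEvent, and BroadcastEventHandler consumes that event durably, doing the optimization and remote upload before acknowledging. Below is a minimal sketch of that pattern, assuming a local NATS server with JetStream enabled; the stream, subject, and consumer names mirror the ones in this diff, while everything else is illustrative (the real code wraps stream setup in a project helper, EnsureStreamCreated).

using System.Text.Json;
using NATS.Client.JetStream.Models;
using NATS.Net;

await using var nc = new NatsClient(); // connects to nats://localhost:4222 by default
var js = nc.CreateJetStreamContext();

// Ensure the stream exists (plain JetStream setup; the diff uses EnsureStreamCreated).
await js.CreateStreamAsync(new StreamConfig("file_events", ["file_uploaded"]));

// Producer side (ProcessNewFileAsync): persist first, then enqueue the heavy work.
await js.PublishAsync("file_uploaded", JsonSerializer.SerializeToUtf8Bytes(new { FileId = "demo" }));

// Consumer side (BroadcastEventHandler): a durable consumer survives restarts and
// redelivers anything that was NAKed or never acknowledged.
var consumer = await js.CreateOrUpdateConsumerAsync("file_events",
    new ConsumerConfig("drive_file_uploaded_handler"));

await foreach (var msg in consumer.ConsumeAsync<byte[]>())
{
    try
    {
        // ... optimize and upload the file here ...
        await msg.AckAsync();  // done: remove the message from the queue
    }
    catch
    {
        await msg.NakAsync(); // failed: ask JetStream to redeliver later
    }
}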

View File (Aspire AppHost entry point)

@@ -1,5 +1,3 @@
-using System.Net;
-using System.Net.Sockets;
 using Aspire.Hosting.Yarp.Transforms;
 using Microsoft.Extensions.Hosting;
@@ -62,7 +60,7 @@ for (var idx = 0; idx < services.Count; idx++)
 // Extra double-ended references
 ringService.WithReference(passService);

-builder.AddYarp("gateway")
+var gateway = builder.AddYarp("gateway")
     .WithConfiguration(yarp =>
     {
         var ringCluster = yarp.AddCluster(ringService.GetEndpoint("http"));
@@ -91,6 +89,8 @@ builder.AddYarp("gateway")
             .WithTransformPathPrefix("/api");
     });

+if (isDev) gateway.WithHostPort(5001);

 builder.AddDockerComposeEnvironment("docker-compose");

 builder.Build().Run();

View File (DysonNetwork.Drive.Startup.BroadcastEventHandler)

@@ -1,10 +1,16 @@
 using System.Text.Json;
-using DysonNetwork.Drive.Storage;
+using DysonNetwork.Drive.Storage.Model;
+using DysonNetwork.Shared.Proto;
 using DysonNetwork.Shared.Stream;
+using FFMpegCore;
 using Microsoft.EntityFrameworkCore;
 using NATS.Client.Core;
+using NATS.Client.JetStream;
 using NATS.Client.JetStream.Models;
 using NATS.Net;
+using NetVips;
+using NodaTime;
+using FileService = DysonNetwork.Drive.Storage.FileService;

 namespace DysonNetwork.Drive.Startup;
@@ -14,20 +20,72 @@ public class BroadcastEventHandler(
     IServiceProvider serviceProvider
 ) : BackgroundService
 {
+    private const string TempFileSuffix = "dypart";
+
+    private static readonly string[] AnimatedImageTypes =
+        ["image/gif", "image/apng", "image/avif"];
+
+    private static readonly string[] AnimatedImageExtensions =
+        [".gif", ".apng", ".avif"];
+
     protected override async Task ExecuteAsync(CancellationToken stoppingToken)
     {
         var js = nats.CreateJetStreamContext();

         await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]);
-        var consumer = await js.CreateOrUpdateConsumerAsync("account_events",
+        var accountEventConsumer = await js.CreateOrUpdateConsumerAsync("account_events",
             new ConsumerConfig("drive_account_deleted_handler"), cancellationToken: stoppingToken);

+        await js.EnsureStreamCreated("file_events", [FileUploadedEvent.Type]);
+        var fileUploadedConsumer = await js.CreateOrUpdateConsumerAsync("file_events",
+            new ConsumerConfig("drive_file_uploaded_handler"), cancellationToken: stoppingToken);
+
+        var accountDeletedTask = HandleAccountDeleted(accountEventConsumer, stoppingToken);
+        var fileUploadedTask = HandleFileUploaded(fileUploadedConsumer, stoppingToken);
+
+        await Task.WhenAll(accountDeletedTask, fileUploadedTask);
+    }
+
+    private async Task HandleFileUploaded(INatsJSConsumer consumer, CancellationToken stoppingToken)
+    {
+        await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
+        {
+            var payload = JsonSerializer.Deserialize<FileUploadedEventPayload>(msg.Data, GrpcTypeHelper.SerializerOptions);
+            if (payload == null)
+            {
+                await msg.AckAsync(cancellationToken: stoppingToken);
+                continue;
+            }
+
+            try
+            {
+                await ProcessAndUploadInBackgroundAsync(
+                    payload.FileId,
+                    payload.RemoteId,
+                    payload.StorageId,
+                    payload.ContentType,
+                    payload.ProcessingFilePath,
+                    payload.IsTempFile
+                );
+                await msg.AckAsync(cancellationToken: stoppingToken);
+            }
+            catch (Exception ex)
+            {
+                logger.LogError(ex, "Error processing FileUploadedEvent for file {FileId}", payload?.FileId);
+                await msg.NakAsync(cancellationToken: stoppingToken);
+            }
+        }
+    }
+
+    private async Task HandleAccountDeleted(INatsJSConsumer consumer, CancellationToken stoppingToken)
+    {
         await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
         {
             try
             {
-                var evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data);
+                var evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data, GrpcTypeHelper.SerializerOptions);
                 if (evt == null)
                 {
                     await msg.AckAsync(cancellationToken: stoppingToken);
@@ -69,4 +127,168 @@ public class BroadcastEventHandler(
             }
         }
     }
+
+    private async Task ProcessAndUploadInBackgroundAsync(
+        string fileId,
+        Guid remoteId,
+        string storageId,
+        string contentType,
+        string processingFilePath,
+        bool isTempFile
+    )
+    {
+        using var scope = serviceProvider.CreateScope();
+        var fs = scope.ServiceProvider.GetRequiredService<FileService>();
+        var scopedDb = scope.ServiceProvider.GetRequiredService<AppDatabase>();
+
+        var pool = await fs.GetPoolAsync(remoteId);
+        if (pool is null) return;
+
+        var uploads = new List<(string FilePath, string Suffix, string ContentType, bool SelfDestruct)>();
+        var newMimeType = contentType;
+        var hasCompression = false;
+        var hasThumbnail = false;
+
+        try
+        {
+            logger.LogInformation("Processing file {FileId} in background...", fileId);
+
+            var fileToUpdate = await scopedDb.Files.AsNoTracking().FirstAsync(f => f.Id == fileId);
+            if (fileToUpdate.IsEncrypted)
+            {
+                uploads.Add((processingFilePath, string.Empty, contentType, false));
+            }
+            else if (!pool.PolicyConfig.NoOptimization)
+            {
+                var fileExtension = Path.GetExtension(processingFilePath);
+                switch (contentType.Split('/')[0])
+                {
+                    case "image":
+                        if (AnimatedImageTypes.Contains(contentType) || AnimatedImageExtensions.Contains(fileExtension))
+                        {
+                            logger.LogInformation("Skip optimize file {FileId} due to it is animated...", fileId);
+                            uploads.Add((processingFilePath, string.Empty, contentType, false));
+                            break;
+                        }
+
+                        newMimeType = "image/webp";
+                        using (var vipsImage = Image.NewFromFile(processingFilePath))
+                        {
+                            var imageToWrite = vipsImage;
+
+                            if (vipsImage.Interpretation is Enums.Interpretation.Scrgb or Enums.Interpretation.Xyz)
+                            {
+                                imageToWrite = vipsImage.Colourspace(Enums.Interpretation.Srgb);
+                            }
+
+                            var webpPath = Path.Join(Path.GetTempPath(), $"{fileId}.{TempFileSuffix}.webp");
+                            imageToWrite.Autorot().WriteToFile(webpPath,
+                                new VOption { { "lossless", true }, { "strip", true } });
+                            uploads.Add((webpPath, string.Empty, newMimeType, true));
+
+                            if (imageToWrite.Width * imageToWrite.Height >= 1024 * 1024)
+                            {
+                                var scale = 1024.0 / Math.Max(imageToWrite.Width, imageToWrite.Height);
+                                var compressedPath =
+                                    Path.Join(Path.GetTempPath(), $"{fileId}.{TempFileSuffix}.compressed.webp");
+
+                                using var compressedImage = imageToWrite.Resize(scale);
+                                compressedImage.Autorot().WriteToFile(compressedPath,
+                                    new VOption { { "Q", 80 }, { "strip", true } });
+                                uploads.Add((compressedPath, ".compressed", newMimeType, true));
+                                hasCompression = true;
+                            }
+
+                            if (!ReferenceEquals(imageToWrite, vipsImage))
+                            {
+                                imageToWrite.Dispose();
+                            }
+                        }
+
+                        break;
+                    case "video":
+                        uploads.Add((processingFilePath, string.Empty, contentType, false));
+
+                        var thumbnailPath = Path.Join(Path.GetTempPath(), $"{fileId}.{TempFileSuffix}.thumbnail.jpg");
+                        try
+                        {
+                            await FFMpegArguments
+                                .FromFileInput(processingFilePath, verifyExists: true)
+                                .OutputToFile(thumbnailPath, overwrite: true, options => options
+                                    .Seek(TimeSpan.FromSeconds(0))
+                                    .WithFrameOutputCount(1)
+                                    .WithCustomArgument("-q:v 2")
+                                )
+                                .NotifyOnOutput(line => logger.LogInformation("[FFmpeg] {Line}", line))
+                                .NotifyOnError(line => logger.LogWarning("[FFmpeg] {Line}", line))
+                                .ProcessAsynchronously();
+
+                            if (File.Exists(thumbnailPath))
+                            {
+                                uploads.Add((thumbnailPath, ".thumbnail", "image/jpeg", true));
+                                hasThumbnail = true;
+                            }
+                            else
+                            {
+                                logger.LogWarning("FFMpeg did not produce thumbnail for video {FileId}", fileId);
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            logger.LogError(ex, "Failed to generate thumbnail for video {FileId}", fileId);
+                        }
+
+                        break;
+                    default:
+                        uploads.Add((processingFilePath, string.Empty, contentType, false));
+                        break;
+                }
+            }
+            else uploads.Add((processingFilePath, string.Empty, contentType, false));
+
+            logger.LogInformation("Optimized file {FileId}, now uploading...", fileId);
+
+            if (uploads.Count > 0)
+            {
+                var destPool = remoteId;
+                var uploadTasks = uploads.Select(item =>
+                    fs.UploadFileToRemoteAsync(
+                        storageId,
+                        destPool,
+                        item.FilePath,
+                        item.Suffix,
+                        item.ContentType,
+                        item.SelfDestruct
+                    )
+                ).ToList();
+
+                await Task.WhenAll(uploadTasks);
+
+                logger.LogInformation("Uploaded file {FileId} done!", fileId);
+
+                var now = SystemClock.Instance.GetCurrentInstant();
+                await scopedDb.Files.Where(f => f.Id == fileId).ExecuteUpdateAsync(setter => setter
+                    .SetProperty(f => f.UploadedAt, now)
+                    .SetProperty(f => f.PoolId, destPool)
+                    .SetProperty(f => f.MimeType, newMimeType)
+                    .SetProperty(f => f.HasCompression, hasCompression)
+                    .SetProperty(f => f.HasThumbnail, hasThumbnail)
+                );
+            }
+        }
+        catch (Exception err)
+        {
+            logger.LogError(err, "Failed to process and upload {FileId}", fileId);
+        }
+        finally
+        {
+            if (isTempFile)
+            {
+                File.Delete(processingFilePath);
+            }
+
+            await fs._PurgeCacheAsync(fileId);
+        }
+    }
 }

View File (FileController)

@@ -46,12 +46,36 @@ public class FileController(
         if (!string.IsNullOrWhiteSpace(file.StorageUrl)) return Redirect(file.StorageUrl);

+        if (file.UploadedAt is null)
+        {
+            // File is not yet uploaded to remote storage. Try to serve from local temp storage.
+            var tempFilePath = Path.Combine(Path.GetTempPath(), file.Id);
+            if (System.IO.File.Exists(tempFilePath))
+            {
+                if (file.IsEncrypted)
+                {
+                    return StatusCode(StatusCodes.Status403Forbidden, "Encrypted files cannot be accessed before they are processed and stored.");
+                }
+                return PhysicalFile(tempFilePath, file.MimeType ?? "application/octet-stream", file.Name, enableRangeProcessing: true);
+            }
+
+            // Fallback for tus uploads that are not processed yet.
+            var tusStorePath = configuration.GetValue<string>("Tus:StorePath");
+            if (!string.IsNullOrEmpty(tusStorePath))
+            {
+                var tusFilePath = Path.Combine(env.ContentRootPath, tusStorePath, file.Id);
+                if (System.IO.File.Exists(tusFilePath))
+                {
+                    return PhysicalFile(tusFilePath, file.MimeType ?? "application/octet-stream", file.Name, enableRangeProcessing: true);
+                }
+            }
+
+            return StatusCode(StatusCodes.Status503ServiceUnavailable, "File is being processed. Please try again later.");
+        }
+
         if (!file.PoolId.HasValue)
         {
-            var tusStorePath = configuration.GetValue<string>("Tus:StorePath")!;
-            var filePath = Path.Combine(env.ContentRootPath, tusStorePath, file.Id);
-            if (!System.IO.File.Exists(filePath)) return new NotFoundResult();
-            return PhysicalFile(filePath, file.MimeType ?? "application/octet-stream", file.Name);
+            return StatusCode(StatusCodes.Status500InternalServerError, "File is in an inconsistent state: uploaded but no pool ID.");
         }

         var pool = await fs.GetPoolAsync(file.PoolId.Value);

View File (DysonNetwork.Drive.Storage.FileReferenceServiceGrpc)

@@ -3,173 +3,172 @@ using Grpc.Core;
 using NodaTime;
 using Duration = NodaTime.Duration;

-namespace DysonNetwork.Drive.Storage
-{
-    public class FileReferenceServiceGrpc(FileReferenceService fileReferenceService)
-        : Shared.Proto.FileReferenceService.FileReferenceServiceBase
-    {
+namespace DysonNetwork.Drive.Storage;
+
+public class FileReferenceServiceGrpc(FileReferenceService fileReferenceService)
+    : Shared.Proto.FileReferenceService.FileReferenceServiceBase
+{
     public override async Task<Shared.Proto.CloudFileReference> CreateReference(CreateReferenceRequest request,
         ServerCallContext context)
     {
         Instant? expiredAt = null;
         if (request.ExpiredAt != null)
             expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds);
         else if (request.Duration != null)
             expiredAt = SystemClock.Instance.GetCurrentInstant() +
                         Duration.FromTimeSpan(request.Duration.ToTimeSpan());

         var reference = await fileReferenceService.CreateReferenceAsync(
             request.FileId,
             request.Usage,
             request.ResourceId,
             expiredAt
         );

         return reference.ToProtoValue();
     }

     public override async Task<CreateReferenceBatchResponse> CreateReferenceBatch(CreateReferenceBatchRequest request,
         ServerCallContext context)
     {
         Instant? expiredAt = null;
         if (request.ExpiredAt != null)
             expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds);
         else if (request.Duration != null)
             expiredAt = SystemClock.Instance.GetCurrentInstant() +
                         Duration.FromTimeSpan(request.Duration.ToTimeSpan());

         var references = await fileReferenceService.CreateReferencesAsync(
             request.FilesId.ToList(),
             request.Usage,
             request.ResourceId,
             expiredAt
         );

         var response = new CreateReferenceBatchResponse();
         response.References.AddRange(references.Select(r => r.ToProtoValue()));
         return response;
     }

     public override async Task<GetReferencesResponse> GetReferences(GetReferencesRequest request,
         ServerCallContext context)
     {
         var references = await fileReferenceService.GetReferencesAsync(request.FileId);
         var response = new GetReferencesResponse();
         response.References.AddRange(references.Select(r => r.ToProtoValue()));
         return response;
     }

     public override async Task<GetReferenceCountResponse> GetReferenceCount(GetReferenceCountRequest request,
         ServerCallContext context)
     {
         var count = await fileReferenceService.GetReferenceCountAsync(request.FileId);
         return new GetReferenceCountResponse { Count = count };
     }

     public override async Task<GetReferencesResponse> GetResourceReferences(GetResourceReferencesRequest request,
         ServerCallContext context)
     {
         var references = await fileReferenceService.GetResourceReferencesAsync(request.ResourceId, request.Usage);
         var response = new GetReferencesResponse();
         response.References.AddRange(references.Select(r => r.ToProtoValue()));
         return response;
     }

     public override async Task<GetResourceFilesResponse> GetResourceFiles(GetResourceFilesRequest request,
         ServerCallContext context)
     {
         var files = await fileReferenceService.GetResourceFilesAsync(request.ResourceId, request.Usage);
         var response = new GetResourceFilesResponse();
         response.Files.AddRange(files.Select(f => f.ToProtoValue()));
         return response;
     }

     public override async Task<DeleteResourceReferencesResponse> DeleteResourceReferences(
         DeleteResourceReferencesRequest request, ServerCallContext context)
     {
         int deletedCount;
         if (request.Usage is null)
             deletedCount = await fileReferenceService.DeleteResourceReferencesAsync(request.ResourceId);
         else
             deletedCount =
                 await fileReferenceService.DeleteResourceReferencesAsync(request.ResourceId, request.Usage!);
         return new DeleteResourceReferencesResponse { DeletedCount = deletedCount };
     }

     public override async Task<DeleteResourceReferencesResponse> DeleteResourceReferencesBatch(DeleteResourceReferencesBatchRequest request, ServerCallContext context)
     {
         var resourceIds = request.ResourceIds.ToList();
         int deletedCount;
         if (request.Usage is null)
             deletedCount = await fileReferenceService.DeleteResourceReferencesBatchAsync(resourceIds);
         else
             deletedCount =
                 await fileReferenceService.DeleteResourceReferencesBatchAsync(resourceIds, request.Usage!);
         return new DeleteResourceReferencesResponse { DeletedCount = deletedCount };
     }

     public override async Task<DeleteReferenceResponse> DeleteReference(DeleteReferenceRequest request,
         ServerCallContext context)
     {
         var success = await fileReferenceService.DeleteReferenceAsync(Guid.Parse(request.ReferenceId));
         return new DeleteReferenceResponse { Success = success };
     }

     public override async Task<UpdateResourceFilesResponse> UpdateResourceFiles(UpdateResourceFilesRequest request,
         ServerCallContext context)
     {
         Instant? expiredAt = null;
         if (request.ExpiredAt != null)
         {
             expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds);
         }
         else if (request.Duration != null)
         {
             expiredAt = SystemClock.Instance.GetCurrentInstant() +
                         Duration.FromTimeSpan(request.Duration.ToTimeSpan());
         }

         var references = await fileReferenceService.UpdateResourceFilesAsync(
             request.ResourceId,
             request.FileIds,
             request.Usage,
             expiredAt
         );

         var response = new UpdateResourceFilesResponse();
         response.References.AddRange(references.Select(r => r.ToProtoValue()));
         return response;
     }

     public override async Task<SetReferenceExpirationResponse> SetReferenceExpiration(
         SetReferenceExpirationRequest request, ServerCallContext context)
     {
         Instant? expiredAt = null;
         if (request.ExpiredAt != null)
         {
             expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds);
         }
         else if (request.Duration != null)
         {
             expiredAt = SystemClock.Instance.GetCurrentInstant() +
                         Duration.FromTimeSpan(request.Duration.ToTimeSpan());
         }

         var success =
             await fileReferenceService.SetReferenceExpirationAsync(Guid.Parse(request.ReferenceId), expiredAt);
         return new SetReferenceExpirationResponse { Success = success };
     }

     public override async Task<SetFileReferencesExpirationResponse> SetFileReferencesExpiration(
         SetFileReferencesExpirationRequest request, ServerCallContext context)
     {
         var expiredAt = Instant.FromUnixTimeSeconds(request.ExpiredAt.Seconds);
         var updatedCount = await fileReferenceService.SetFileReferencesExpirationAsync(request.FileId, expiredAt);
         return new SetFileReferencesExpirationResponse { UpdatedCount = updatedCount };
     }

     public override async Task<HasFileReferencesResponse> HasFileReferences(HasFileReferencesRequest request,
         ServerCallContext context)
     {
         var hasReferences = await fileReferenceService.HasFileReferencesAsync(request.FileId);
         return new HasFileReferencesResponse { HasReferences = hasReferences };
     }
-    }
 }

View File (DysonNetwork.Drive.Storage.FileService)

@@ -1,41 +1,33 @@
-using System.Drawing;
 using System.Globalization;
 using FFMpegCore;
 using System.Security.Cryptography;
+using DysonNetwork.Drive.Storage.Model;
 using DysonNetwork.Shared.Cache;
 using DysonNetwork.Shared.Proto;
 using Google.Protobuf.WellKnownTypes;
 using Microsoft.EntityFrameworkCore;
 using Minio;
 using Minio.DataModel.Args;
+using NATS.Client.Core;
 using NetVips;
 using NodaTime;
-using tusdotnet.Stores;
 using System.Linq.Expressions;
 using DysonNetwork.Shared.Data;
 using Microsoft.EntityFrameworkCore.Query;
+using NATS.Net;

 namespace DysonNetwork.Drive.Storage;

 public class FileService(
     AppDatabase db,
-    IConfiguration configuration,
     ILogger<FileService> logger,
-    IServiceScopeFactory scopeFactory,
-    ICacheService cache
+    ICacheService cache,
+    INatsConnection nats
 )
 {
     private const string CacheKeyPrefix = "file:";
     private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(15);

-    /// <summary>
-    /// The api for getting file meta with cache,
-    /// the best use case is for accessing the file data.
-    ///
-    /// <b>This function won't load uploader's information, only keep minimal file meta</b>
-    /// </summary>
-    /// <param name="fileId">The id of the cloud file requested</param>
-    /// <returns>The minimal file meta</returns>
     public async Task<CloudFile?> GetFileAsync(string fileId)
     {
         var cacheKey = $"{CacheKeyPrefix}{fileId}";
@@ -61,7 +53,6 @@ public class FileService(
         var cachedFiles = new Dictionary<string, CloudFile>();
         var uncachedIds = new List<string>();

-        // Check cache first
         foreach (var fileId in fileIds)
         {
             var cacheKey = $"{CacheKeyPrefix}{fileId}";
@@ -73,7 +64,6 @@ public class FileService(
             uncachedIds.Add(fileId);
         }

-        // Load uncached files from database
         if (uncachedIds.Count > 0)
         {
             var dbFiles = await db.Files
@@ -81,7 +71,6 @@ public class FileService(
                 .Include(f => f.Pool)
                 .ToListAsync();

-            // Add to cache
             foreach (var file in dbFiles)
             {
                 var cacheKey = $"{CacheKeyPrefix}{file.Id}";
@@ -90,7 +79,6 @@ public class FileService(
             }
         }

-        // Preserve original order
         return fileIds
             .Select(f => cachedFiles.GetValueOrDefault(f))
             .Where(f => f != null)
@@ -98,20 +86,12 @@ public class FileService(
             .ToList();
     }

-    private const string TempFilePrefix = "dyn-cloudfile";
-
-    private static readonly string[] AnimatedImageTypes =
-        ["image/gif", "image/apng", "image/avif"];
-
-    private static readonly string[] AnimatedImageExtensions =
-        [".gif", ".apng", ".avif"];
-
     public async Task<CloudFile> ProcessNewFileAsync(
         Account account,
         string fileId,
         string filePool,
         string? fileBundleId,
-        Stream stream,
+        string filePath,
         string fileName,
         string? contentType,
         string? encryptPassword,
@@ -143,57 +123,74 @@ public class FileService(
         if (bundle?.ExpiredAt != null)
             expiredAt = bundle.ExpiredAt.Value;

-        var ogFilePath = Path.GetFullPath(Path.Join(configuration.GetValue<string>("Tus:StorePath"), fileId));
-        var fileSize = stream.Length;
-        contentType ??= !fileName.Contains('.') ? "application/octet-stream" : MimeTypes.GetMimeType(fileName);
-
-        if (!string.IsNullOrWhiteSpace(encryptPassword))
-        {
-            if (!pool.PolicyConfig.AllowEncryption)
-                throw new InvalidOperationException("Encryption is not allowed in this pool");
-            var encryptedPath = Path.Combine(Path.GetTempPath(), $"{fileId}.encrypted");
-            FileEncryptor.EncryptFile(ogFilePath, encryptedPath, encryptPassword);
-            File.Delete(ogFilePath); // Delete original unencrypted
-            File.Move(encryptedPath, ogFilePath); // Replace the original one with encrypted
-            contentType = "application/octet-stream";
-        }
-
-        var hash = await HashFileAsync(ogFilePath);
+        var managedTempPath = Path.Combine(Path.GetTempPath(), fileId);
+        File.Copy(filePath, managedTempPath, true);
+
+        var fileInfo = new FileInfo(managedTempPath);
+        var fileSize = fileInfo.Length;
+        var finalContentType = contentType ??
+            (!fileName.Contains('.') ? "application/octet-stream" : MimeTypes.GetMimeType(fileName));

         var file = new CloudFile
         {
             Id = fileId,
             Name = fileName,
-            MimeType = contentType,
+            MimeType = finalContentType,
             Size = fileSize,
-            Hash = hash,
             ExpiredAt = expiredAt,
             BundleId = bundle?.Id,
             AccountId = Guid.Parse(account.Id),
-            IsEncrypted = !string.IsNullOrWhiteSpace(encryptPassword) && pool.PolicyConfig.AllowEncryption
         };

-        // Extract metadata on the current thread for a faster initial response
         if (!pool.PolicyConfig.NoMetadata)
-            await ExtractMetadataAsync(file, ogFilePath, stream);
+        {
+            await ExtractMetadataAsync(file, managedTempPath);
+        }
+
+        string processingPath = managedTempPath;
+        bool isTempFile = true;
+
+        if (!string.IsNullOrWhiteSpace(encryptPassword))
+        {
+            if (!pool.PolicyConfig.AllowEncryption)
+                throw new InvalidOperationException("Encryption is not allowed in this pool");
+
+            var encryptedPath = Path.Combine(Path.GetTempPath(), $"{fileId}.encrypted");
+            FileEncryptor.EncryptFile(managedTempPath, encryptedPath, encryptPassword);
+            File.Delete(managedTempPath);
+            processingPath = encryptedPath;
+            file.IsEncrypted = true;
+            file.MimeType = "application/octet-stream";
+            file.Size = new FileInfo(processingPath).Length;
+        }
+
+        file.Hash = await HashFileAsync(processingPath);

         db.Files.Add(file);
         await db.SaveChangesAsync();

         file.StorageId ??= file.Id;

-        // Offload optimization (image conversion, thumbnailing) and uploading to a background task
-        _ = Task.Run(() =>
-            ProcessAndUploadInBackgroundAsync(file.Id, filePool, file.StorageId, contentType, ogFilePath, stream));
+        var js = nats.CreateJetStreamContext();
+        await js.PublishAsync(
+            FileUploadedEvent.Type,
+            GrpcTypeHelper.ConvertObjectToByteString(new FileUploadedEventPayload(
+                file.Id,
+                pool.Id,
+                file.StorageId,
+                file.MimeType,
+                processingPath,
+                isTempFile)
+            ).ToByteArray()
+        );

         return file;
     }

-    /// <summary>
-    /// Extracts metadata from the file based on its content type.
-    /// This runs synchronously to ensure the initial database record has basic metadata.
-    /// </summary>
-    private async Task ExtractMetadataAsync(CloudFile file, string filePath, Stream stream)
+    private async Task ExtractMetadataAsync(CloudFile file, string filePath)
     {
@@ -201,6 +198,7 @@ public class FileService(
                 try
                 {
                     var blurhash = BlurHashSharp.SkiaSharp.BlurHashEncoder.Encode(3, 3, filePath);
+                    await using var stream = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);

                     stream.Position = 0;
                     using var vipsImage = Image.NewFromStream(stream);
@@ -265,7 +263,6 @@ public class FileService(
                     ["bit_rate"] = mediaInfo.Format.BitRate.ToString(CultureInfo.InvariantCulture),
                     ["tags"] = mediaInfo.Format.Tags ?? new Dictionary<string, string>(),
                     ["chapters"] = mediaInfo.Chapters,
-                    // Add detailed stream information
                     ["video_streams"] = mediaInfo.VideoStreams.Select(s => new
                     {
                         s.AvgFrameRate,
@@ -303,166 +300,6 @@ public class FileService(
         }
     }

-    /// <summary>
-    /// Handles file optimization (image compression, video thumbnail) and uploads to remote storage in the background.
-    /// </summary>
-    private async Task ProcessAndUploadInBackgroundAsync(
-        string fileId,
-        string remoteId,
-        string storageId,
-        string contentType,
-        string originalFilePath,
-        Stream stream
-    )
-    {
-        var pool = await GetPoolAsync(Guid.Parse(remoteId));
-        if (pool is null) return;
-
-        await using var bgStream = stream; // Ensure stream is disposed at the end of this task
-        using var scope = scopeFactory.CreateScope();
-        var nfs = scope.ServiceProvider.GetRequiredService<FileService>();
-        var scopedDb = scope.ServiceProvider.GetRequiredService<AppDatabase>();
-
-        var uploads = new List<(string FilePath, string Suffix, string ContentType, bool SelfDestruct)>();
-        var newMimeType = contentType;
-        var hasCompression = false;
-        var hasThumbnail = false;
-
-        try
-        {
-            logger.LogInformation("Processing file {FileId} in background...", fileId);
-
-            var fileExtension = Path.GetExtension(originalFilePath);
-
-            if (!pool.PolicyConfig.NoOptimization)
-                switch (contentType.Split('/')[0])
-                {
-                    case "image":
-                        if (AnimatedImageTypes.Contains(contentType) || AnimatedImageExtensions.Contains(fileExtension))
-                        {
-                            logger.LogInformation("Skip optimize file {FileId} due to it is animated...", fileId);
-                            uploads.Add((originalFilePath, string.Empty, contentType, false));
-                            break;
-                        }
-
-                        newMimeType = "image/webp";
-                        using (var vipsImage = Image.NewFromFile(originalFilePath))
-                        {
-                            var imageToWrite = vipsImage;
-
-                            if (vipsImage.Interpretation is Enums.Interpretation.Scrgb or Enums.Interpretation.Xyz)
-                            {
-                                imageToWrite = vipsImage.Colourspace(Enums.Interpretation.Srgb);
-                            }
-
-                            var webpPath = Path.Join(Path.GetTempPath(), $"{TempFilePrefix}#{fileId}.webp");
-                            imageToWrite.Autorot().WriteToFile(webpPath,
-                                new VOption { { "lossless", true }, { "strip", true } });
-                            uploads.Add((webpPath, string.Empty, newMimeType, true));
-
-                            if (imageToWrite.Width * imageToWrite.Height >= 1024 * 1024)
-                            {
-                                var scale = 1024.0 / Math.Max(imageToWrite.Width, imageToWrite.Height);
-                                var compressedPath =
-                                    Path.Join(Path.GetTempPath(), $"{TempFilePrefix}#{fileId}-compressed.webp");
-
-                                using var compressedImage = imageToWrite.Resize(scale);
-                                compressedImage.Autorot().WriteToFile(compressedPath,
-                                    new VOption { { "Q", 80 }, { "strip", true } });
-                                uploads.Add((compressedPath, ".compressed", newMimeType, true));
-                                hasCompression = true;
-                            }
-
-                            if (!ReferenceEquals(imageToWrite, vipsImage))
-                            {
-                                imageToWrite.Dispose(); // Clean up manually created colourspace-converted image
-                            }
-                        }
-
-                        break;
-                    case "video":
-                        uploads.Add((originalFilePath, string.Empty, contentType, false));
-
-                        var thumbnailPath = Path.Join(Path.GetTempPath(), $"{TempFilePrefix}#{fileId}.thumbnail.jpg");
-                        try
-                        {
-                            await FFMpegArguments
-                                .FromFileInput(originalFilePath, verifyExists: true)
-                                .OutputToFile(thumbnailPath, overwrite: true, options => options
-                                    .Seek(TimeSpan.FromSeconds(0))
-                                    .WithFrameOutputCount(1)
-                                    .WithCustomArgument("-q:v 2")
-                                )
-                                .NotifyOnOutput(line => logger.LogInformation("[FFmpeg] {Line}", line))
-                                .NotifyOnError(line => logger.LogWarning("[FFmpeg] {Line}", line))
-                                .ProcessAsynchronously();
-
-                            if (File.Exists(thumbnailPath))
-                            {
-                                uploads.Add((thumbnailPath, ".thumbnail", "image/jpeg", true));
-                                hasThumbnail = true;
-                            }
-                            else
-                            {
-                                logger.LogWarning("FFMpeg did not produce thumbnail for video {FileId}", fileId);
-                            }
-                        }
-                        catch (Exception ex)
-                        {
-                            logger.LogError(ex, "Failed to generate thumbnail for video {FileId}", fileId);
-                        }
-
-                        break;
-                    default:
-                        uploads.Add((originalFilePath, string.Empty, contentType, false));
-                        break;
-                }
-            else uploads.Add((originalFilePath, string.Empty, contentType, false));
-
-            logger.LogInformation("Optimized file {FileId}, now uploading...", fileId);
-
-            if (uploads.Count > 0)
-            {
-                var destPool = Guid.Parse(remoteId!);
-                var uploadTasks = uploads.Select(item =>
-                    nfs.UploadFileToRemoteAsync(
-                        storageId,
-                        destPool,
-                        item.FilePath,
-                        item.Suffix,
-                        item.ContentType,
-                        item.SelfDestruct
-                    )
-                ).ToList();
-
-                await Task.WhenAll(uploadTasks);
-
-                logger.LogInformation("Uploaded file {FileId} done!", fileId);
-
-                var fileToUpdate = await scopedDb.Files.FirstAsync(f => f.Id == fileId);
-                if (hasThumbnail) fileToUpdate.HasThumbnail = true;
-
-                var now = SystemClock.Instance.GetCurrentInstant();
-                await scopedDb.Files.Where(f => f.Id == fileId).ExecuteUpdateAsync(setter => setter
-                    .SetProperty(f => f.UploadedAt, now)
-                    .SetProperty(f => f.PoolId, destPool)
-                    .SetProperty(f => f.MimeType, newMimeType)
-                    .SetProperty(f => f.HasCompression, hasCompression)
-                    .SetProperty(f => f.HasThumbnail, hasThumbnail)
-                );
-            }
-        }
-        catch (Exception err)
-        {
-            logger.LogError(err, "Failed to process and upload {FileId}", fileId);
-        }
-        finally
-        {
-            await nfs._PurgeCacheAsync(fileId);
-        }
-    }
-
     private static async Task<string> HashFileAsync(string filePath, int chunkSize = 1024 * 1024)
     {
         var fileInfo = new FileInfo(filePath);
@@ -491,11 +328,11 @@ public class FileService(
         }

         var hash = MD5.HashData(buffer.AsSpan(0, bytesRead));
-        stream.Position = 0; // Reset stream position
+        stream.Position = 0;
         return Convert.ToHexString(hash).ToLowerInvariant();
     }

-    private async Task UploadFileToRemoteAsync(
+    public async Task UploadFileToRemoteAsync(
         string storageId,
         Guid targetRemote,
         string filePath,
@@ -509,7 +346,7 @@ public class FileService(
         if (selfDestruct) File.Delete(filePath);
     }

-    private async Task UploadFileToRemoteAsync(
+    public async Task UploadFileToRemoteAsync(
         string storageId,
         Guid targetRemote,
         Stream stream,
@@ -574,7 +411,6 @@ public class FileService(
         await db.Files.Where(f => f.Id == file.Id).ExecuteUpdateAsync(updatable.ToSetPropertyCalls());
         await _PurgeCacheAsync(file.Id);

-        // Re-fetch the file to return the updated state
         return await db.Files.AsNoTracking().FirstAsync(f => f.Id == file.Id);
     }
@@ -593,18 +429,15 @@ public class FileService(
         if (!force)
         {
-            // Check if any other file with the same storage ID is referenced
             var sameOriginFiles = await db.Files
                 .Where(f => f.StorageId == file.StorageId && f.Id != file.Id)
                 .Select(f => f.Id)
                 .ToListAsync();

-            // Check if any of these files are referenced
             if (sameOriginFiles.Count != 0)
                 return;
         }

-        // If any other file with the same storage ID is referenced, don't delete the actual file data
         var dest = await GetRemoteStorageConfig(file.PoolId.Value);
         if (dest is null) throw new InvalidOperationException($"No remote storage configured for pool {file.PoolId}");
         var client = CreateMinioClient(dest);
@@ -614,7 +447,7 @@ public class FileService(
         );

         var bucket = dest.Bucket;
-        var objectId = file.StorageId ?? file.Id; // Use StorageId if available, otherwise fall back to Id
+        var objectId = file.StorageId ?? file.Id;

         await client.RemoveObjectAsync(
             new RemoveObjectArgs().WithBucket(bucket).WithObject(objectId)
@@ -630,7 +463,6 @@ public class FileService(
             }
             catch
             {
-                // Ignore errors when deleting compressed version
                 logger.LogWarning("Failed to delete compressed version of file {fileId}", file.Id);
             }
         }
@@ -645,25 +477,17 @@ public class FileService(
             }
             catch
             {
-                // Ignore errors when deleting thumbnail
                 logger.LogWarning("Failed to delete thumbnail of file {fileId}", file.Id);
             }
         }
     }

-    /// <summary>
-    /// The most efficent way to delete file data (stored files) in batch.
-    /// But this DO NOT check the storage id, so use with caution!
-    /// </summary>
-    /// <param name="files">Files to delete</param>
-    /// <exception cref="InvalidOperationException">Something went wrong</exception>
     public async Task DeleteFileDataBatchAsync(List<CloudFile> files)
     {
         files = files.Where(f => f.PoolId.HasValue).ToList();

         foreach (var fileGroup in files.GroupBy(f => f.PoolId!.Value))
         {
-            // If any other file with the same storage ID is referenced, don't delete the actual file data
             var dest = await GetRemoteStorageConfig(fileGroup.Key);
             if (dest is null)
                 throw new InvalidOperationException($"No remote storage configured for pool {fileGroup.Key}");
@@ -733,15 +557,12 @@ public class FileService(
         return client.Build();
     }

-    // Helper method to purge the cache for a specific file
-    // Made internal to allow FileReferenceService to use it
     internal async Task _PurgeCacheAsync(string fileId)
     {
         var cacheKey = $"{CacheKeyPrefix}{fileId}";
         await cache.RemoveAsync(cacheKey);
     }

-    // Helper method to purge cache for multiple files
     internal async Task _PurgeCacheRangeAsync(IEnumerable<string> fileIds)
     {
         var tasks = fileIds.Select(_PurgeCacheAsync);
@@ -753,7 +574,6 @@ public class FileService(
         var cachedFiles = new Dictionary<string, CloudFile>();
         var uncachedIds = new List<string>();

-        // Check cache first
         foreach (var reference in references)
         {
             var cacheKey = $"{CacheKeyPrefix}{reference.Id}";
@@ -769,14 +589,12 @@ public class FileService(
             }
         }

-        // Load uncached files from database
         if (uncachedIds.Count > 0)
         {
             var dbFiles = await db.Files
                 .Where(f => uncachedIds.Contains(f.Id))
                 .ToListAsync();

-            // Add to cache
             foreach (var file in dbFiles)
             {
                 var cacheKey = $"{CacheKeyPrefix}{file.Id}";
@@ -785,18 +603,12 @@ public class FileService(
             }
         }

-        // Preserve original order
         return references
             .Select(r => cachedFiles.GetValueOrDefault(r.Id))
             .Where(f => f != null)
             .ToList();
     }

-    /// <summary>
-    /// Gets the number of references to a file based on CloudFileReference records
-    /// </summary>
-    /// <param name="fileId">The ID of the file</param>
-    /// <returns>The number of references to the file</returns>
     public async Task<int> GetReferenceCountAsync(string fileId)
     {
         return await db.FileReferences
@@ -804,11 +616,6 @@ public class FileService(
             .CountAsync();
     }

-    /// <summary>
-    /// Checks if a file is referenced by any resource
-    /// </summary>
-    /// <param name="fileId">The ID of the file to check</param>
-    /// <returns>True if the file is referenced, false otherwise</returns>
     public async Task<bool> IsReferencedAsync(string fileId)
     {
         return await db.FileReferences
@@ -816,12 +623,8 @@ public class FileService(
             .AnyAsync();
     }

-    /// <summary>
-    /// Checks if an EXIF field should be ignored (e.g., GPS data).
-    /// </summary>
     private static bool IsIgnoredField(string fieldName)
     {
-        // Common GPS EXIF field names
         var gpsFields = new[]
         {
             "gps-latitude", "gps-longitude", "gps-altitude", "gps-latitude-ref", "gps-longitude-ref",
@@ -904,9 +707,6 @@ public class FileService(
     }
 }

-/// <summary>
-/// A helper class to build an ExecuteUpdateAsync call for CloudFile.
-/// </summary>
 file class UpdatableCloudFile(CloudFile file)
 {
     public string Name { get; set; } = file.Name;
@@ -922,7 +722,7 @@ file class UpdatableCloudFile(CloudFile file)
             .SetProperty(f => f.Name, Name)
             .SetProperty(f => f.Description, Description)
             .SetProperty(f => f.FileMeta, FileMeta)
-            .SetProperty(f => f.UserMeta, userMeta!)
+            .SetProperty(f => f.UserMeta, userMeta)
             .SetProperty(f => f.IsMarkedRecycle, IsMarkedRecycle);
     }
 }

View File (FileUploadController)

@@ -23,7 +23,7 @@ public class FileUploadController(
     : ControllerBase
 {
     private readonly string _tempPath =
-        Path.Combine(configuration.GetValue<string>("Storage:Uploads") ?? Path.GetTempPath(), "multipart-uploads");
+        configuration.GetValue<string>("Storage:Uploads") ?? Path.Combine(Path.GetTempPath(), "multipart-uploads");

     private const long DefaultChunkSize = 1024 * 1024 * 5; // 5MB
@@ -42,12 +42,9 @@ public class FileUploadController(
             }
         }

-        if (!Guid.TryParse(request.PoolId, out var poolGuid))
-        {
-            return BadRequest("Invalid file pool id");
-        }
+        request.PoolId ??= Guid.Parse(configuration["Storage:PreferredRemote"]!);

-        var pool = await fileService.GetPoolAsync(poolGuid);
+        var pool = await fileService.GetPoolAsync(request.PoolId.Value);
         if (pool is null)
         {
             return BadRequest("Pool not found");
@@ -73,11 +70,6 @@ public class FileUploadController(
             }
         }

-        if (!string.IsNullOrEmpty(request.BundleId) && !Guid.TryParse(request.BundleId, out _))
-        {
-            return BadRequest("Invalid file bundle id");
-        }
-
         var policy = pool.PolicyConfig;
         if (!policy.AllowEncryption && !string.IsNullOrEmpty(request.EncryptPassword))
         {
@@ -160,7 +152,7 @@ public class FileUploadController(
             ContentType = request.ContentType,
             ChunkSize = chunkSize,
             ChunksCount = chunksCount,
-            PoolId = request.PoolId,
+            PoolId = request.PoolId.Value,
             BundleId = request.BundleId,
             EncryptPassword = request.EncryptPassword,
             ExpiredAt = request.ExpiredAt,
@@ -241,26 +233,22 @@ public class FileUploadController(
         var fileId = await Nanoid.GenerateAsync();

-        await using (var fileStream =
-                     new FileStream(mergedFilePath, FileMode.Open, FileAccess.Read, FileShare.Read))
-        {
-            var cloudFile = await fileService.ProcessNewFileAsync(
+        var cloudFile = await fileService.ProcessNewFileAsync(
             currentUser,
             fileId,
-            task.PoolId,
-            task.BundleId,
-            fileStream,
+            task.PoolId.ToString(),
+            task.BundleId?.ToString(),
+            mergedFilePath,
             task.FileName,
             task.ContentType,
             task.EncryptPassword,
             task.ExpiredAt
         );

         // Clean up
         Directory.Delete(taskPath, true);
         System.IO.File.Delete(mergedFilePath);

         return Ok(cloudFile);
-        }
     }
} }

View File (DysonNetwork.Drive.Storage.Model.FileUploadedEvent, new file)

@@ -0,0 +1,15 @@
+namespace DysonNetwork.Drive.Storage.Model;
+
+public static class FileUploadedEvent
+{
+    public const string Type = "file_uploaded";
+}
+
+public record FileUploadedEventPayload(
+    string FileId,
+    Guid RemoteId,
+    string StorageId,
+    string ContentType,
+    string ProcessingFilePath,
+    bool IsTempFile
+);

View File (DysonNetwork.Drive.Storage.Model: upload task models)

@@ -1,4 +1,3 @@
-using DysonNetwork.Drive.Storage;
 using NodaTime;

 namespace DysonNetwork.Drive.Storage.Model
@@ -9,8 +8,8 @@ namespace DysonNetwork.Drive.Storage.Model
         public string FileName { get; set; } = null!;
         public long FileSize { get; set; }
         public string ContentType { get; set; } = null!;
-        public string PoolId { get; set; } = null!;
-        public string? BundleId { get; set; }
+        public Guid? PoolId { get; set; } = null!;
+        public Guid? BundleId { get; set; }
         public string? EncryptPassword { get; set; }
         public Instant? ExpiredAt { get; set; }
         public long? ChunkSize { get; set; }
@@ -33,8 +32,8 @@ namespace DysonNetwork.Drive.Storage.Model
         public string ContentType { get; set; } = null!;
         public long ChunkSize { get; set; }
         public int ChunksCount { get; set; }
-        public string PoolId { get; set; } = null!;
-        public string? BundleId { get; set; }
+        public Guid PoolId { get; set; }
+        public Guid? BundleId { get; set; }
         public string? EncryptPassword { get; set; }
         public Instant? ExpiredAt { get; set; }
         public string Hash { get; set; } = null!;

View File (file upload API documentation)

@@ -16,7 +16,7 @@ To begin a file upload, you first need to create an upload task. This is done by
   "file_name": "string",
   "file_size": "long (in bytes)",
   "content_type": "string (e.g., 'image/jpeg')",
-  "pool_id": "string (GUID)",
+  "pool_id": "string (GUID, optional)",
   "bundle_id": "string (GUID, optional)",
   "encrypt_password": "string (optional)",
   "expired_at": "string (ISO 8601 format, optional)",

View File (TusService)

@@ -113,7 +113,7 @@ public abstract class TusService
                     : "uploaded_file";
                 var contentType = metadata.TryGetValue("content-type", out var ct) ? ct.GetString(Encoding.UTF8) : null;

-                var fileStream = await file.GetContentAsync(eventContext.CancellationToken);
+                var filePath = Path.Combine(configuration.GetValue<string>("Tus:StorePath")!, file.Id);

                 var filePool = httpContext.Request.Headers["X-FilePool"].FirstOrDefault();
                 var bundleId = eventContext.HttpContext.Request.Headers["X-FileBundle"].FirstOrDefault();
@@ -135,7 +135,7 @@ public abstract class TusService
                         file.Id,
                         filePool!,
                         bundleId,
-                        fileStream,
+                        filePath,
                         fileName,
                         contentType,
                         encryptPassword,
@@ -155,11 +155,6 @@ public abstract class TusService
                     await eventContext.HttpContext.Response.WriteAsync(ex.Message);
                     logger.LogError(ex, "Error handling file upload...");
                 }
-                finally
-                {
-                    // Dispose the stream after all processing is complete
-                    await fileStream.DisposeAsync();
-                }
             },
             OnBeforeCreateAsync = async eventContext =>
             {

View File (DysonNetwork.Shared.Http)

@@ -4,7 +4,6 @@ using Microsoft.AspNetCore.Builder;
 using Microsoft.AspNetCore.Hosting;
 using Microsoft.AspNetCore.Server.Kestrel.Core;
 using Microsoft.Extensions.Configuration;
-using Microsoft.Extensions.Hosting;

 namespace DysonNetwork.Shared.Http;