✨ Drive resource recycler, delete files in batch
This commit is contained in:
@@ -5,6 +5,7 @@ using DysonNetwork.Shared.Auth;
|
|||||||
using DysonNetwork.Shared.Http;
|
using DysonNetwork.Shared.Http;
|
||||||
using DysonNetwork.Shared.PageData;
|
using DysonNetwork.Shared.PageData;
|
||||||
using DysonNetwork.Shared.Registry;
|
using DysonNetwork.Shared.Registry;
|
||||||
|
using DysonNetwork.Shared.Stream;
|
||||||
using Microsoft.EntityFrameworkCore;
|
using Microsoft.EntityFrameworkCore;
|
||||||
using tusdotnet.Stores;
|
using tusdotnet.Stores;
|
||||||
|
|
||||||
@@ -15,6 +16,7 @@ builder.ConfigureAppKestrel(builder.Configuration, maxRequestBodySize: long.MaxV
|
|||||||
|
|
||||||
// Add application services
|
// Add application services
|
||||||
builder.Services.AddRegistryService(builder.Configuration);
|
builder.Services.AddRegistryService(builder.Configuration);
|
||||||
|
builder.Services.AddStreamConnection(builder.Configuration);
|
||||||
builder.Services.AddAppServices(builder.Configuration);
|
builder.Services.AddAppServices(builder.Configuration);
|
||||||
builder.Services.AddAppRateLimiting();
|
builder.Services.AddAppRateLimiting();
|
||||||
builder.Services.AddAppAuthentication();
|
builder.Services.AddAppAuthentication();
|
||||||
|
53
DysonNetwork.Drive/Startup/BroadcastEventHandler.cs
Normal file
53
DysonNetwork.Drive/Startup/BroadcastEventHandler.cs
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
using System.Text.Json;
|
||||||
|
using DysonNetwork.Drive.Storage;
|
||||||
|
using DysonNetwork.Shared.Stream;
|
||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using NATS.Client.Core;
|
||||||
|
|
||||||
|
namespace DysonNetwork.Drive.Startup;
|
||||||
|
|
||||||
|
public class BroadcastEventHandler(
|
||||||
|
INatsConnection nats,
|
||||||
|
ILogger<BroadcastEventHandler> logger,
|
||||||
|
FileService fs,
|
||||||
|
AppDatabase db
|
||||||
|
) : BackgroundService
|
||||||
|
{
|
||||||
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
await foreach (var msg in nats.SubscribeAsync<byte[]>("accounts.deleted", cancellationToken: stoppingToken))
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var evt = JsonSerializer.Deserialize<AccountDeletedEvent>(msg.Data);
|
||||||
|
if (evt == null) continue;
|
||||||
|
|
||||||
|
logger.LogInformation("Account deleted: {AccountId}", evt.AccountId);
|
||||||
|
|
||||||
|
await using var transaction = await db.Database.BeginTransactionAsync(cancellationToken: stoppingToken);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var files = await db.Files
|
||||||
|
.Where(p => p.AccountId == evt.AccountId)
|
||||||
|
.ToListAsync(cancellationToken: stoppingToken);
|
||||||
|
|
||||||
|
await fs.DeleteFileDataBatchAsync(files);
|
||||||
|
await db.Files
|
||||||
|
.Where(p => p.AccountId == evt.AccountId)
|
||||||
|
.ExecuteDeleteAsync(cancellationToken: stoppingToken);
|
||||||
|
|
||||||
|
await transaction.CommitAsync(cancellationToken: stoppingToken);
|
||||||
|
}
|
||||||
|
catch (Exception)
|
||||||
|
{
|
||||||
|
await transaction.RollbackAsync(cancellationToken: stoppingToken);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
logger.LogError(ex, "Error processing AccountDeleted");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -141,6 +141,8 @@ public static class ServiceCollectionExtensions
|
|||||||
services.AddScoped<Billing.UsageService>();
|
services.AddScoped<Billing.UsageService>();
|
||||||
services.AddScoped<Billing.QuotaService>();
|
services.AddScoped<Billing.QuotaService>();
|
||||||
|
|
||||||
|
services.AddHostedService<BroadcastEventHandler>();
|
||||||
|
|
||||||
return services;
|
return services;
|
||||||
}
|
}
|
||||||
}
|
}
|
@@ -33,10 +33,6 @@ public class CloudFile : ModelBase, ICloudFile, IIdentifiedResource
|
|||||||
[JsonIgnore] public FileBundle? Bundle { get; set; }
|
[JsonIgnore] public FileBundle? Bundle { get; set; }
|
||||||
public Guid? BundleId { get; set; }
|
public Guid? BundleId { get; set; }
|
||||||
|
|
||||||
[Obsolete("Deprecated, use PoolId instead. For database migration only.")]
|
|
||||||
[MaxLength(128)]
|
|
||||||
public string? UploadedTo { get; set; }
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The field is set to true if the recycling job plans to delete the file.
|
/// The field is set to true if the recycling job plans to delete the file.
|
||||||
/// Due to the unstable of the recycling job, this doesn't really delete the file until a human verifies it.
|
/// Due to the unstable of the recycling job, this doesn't really delete the file until a human verifies it.
|
||||||
@@ -61,6 +57,8 @@ public class CloudFile : ModelBase, ICloudFile, IIdentifiedResource
|
|||||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||||
public string? FastUploadLink { get; set; }
|
public string? FastUploadLink { get; set; }
|
||||||
|
|
||||||
|
public ICollection<CloudFileReference> References { get; set; } = new List<CloudFileReference>();
|
||||||
|
|
||||||
public Guid AccountId { get; set; }
|
public Guid AccountId { get; set; }
|
||||||
|
|
||||||
public CloudFileReferenceObject ToReferenceObject()
|
public CloudFileReferenceObject ToReferenceObject()
|
||||||
|
@@ -102,6 +102,7 @@ public class FileService(
|
|||||||
|
|
||||||
private static readonly string[] AnimatedImageTypes =
|
private static readonly string[] AnimatedImageTypes =
|
||||||
["image/gif", "image/apng", "image/avif"];
|
["image/gif", "image/apng", "image/avif"];
|
||||||
|
|
||||||
private static readonly string[] AnimatedImageExtensions =
|
private static readonly string[] AnimatedImageExtensions =
|
||||||
[".gif", ".apng", ".avif"];
|
[".gif", ".apng", ".avif"];
|
||||||
|
|
||||||
@@ -336,7 +337,8 @@ public class FileService(
|
|||||||
if (!pool.PolicyConfig.NoOptimization)
|
if (!pool.PolicyConfig.NoOptimization)
|
||||||
switch (contentType.Split('/')[0])
|
switch (contentType.Split('/')[0])
|
||||||
{
|
{
|
||||||
case "image" when !AnimatedImageTypes.Contains(contentType) && !AnimatedImageExtensions.Contains(fileExtension):
|
case "image" when !AnimatedImageTypes.Contains(contentType) &&
|
||||||
|
!AnimatedImageExtensions.Contains(fileExtension):
|
||||||
newMimeType = "image/webp";
|
newMimeType = "image/webp";
|
||||||
using (var vipsImage = Image.NewFromFile(originalFilePath))
|
using (var vipsImage = Image.NewFromFile(originalFilePath))
|
||||||
{
|
{
|
||||||
@@ -643,7 +645,44 @@ public class FileService(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task<FileBundle?> GetBundleAsync(Guid id, Guid accountId)
|
/// <summary>
|
||||||
|
/// The most efficent way to delete file data (stored files) in batch.
|
||||||
|
/// But this DO NOT check the storage id, so use with caution!
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="files">Files to delete</param>
|
||||||
|
/// <exception cref="InvalidOperationException">Something went wrong</exception>
|
||||||
|
public async Task DeleteFileDataBatchAsync(List<CloudFile> files)
|
||||||
|
{
|
||||||
|
files = files.Where(f => f.PoolId.HasValue).ToList();
|
||||||
|
|
||||||
|
foreach (var fileGroup in files.GroupBy(f => f.PoolId!.Value))
|
||||||
|
{
|
||||||
|
// If any other file with the same storage ID is referenced, don't delete the actual file data
|
||||||
|
var dest = await GetRemoteStorageConfig(fileGroup.Key);
|
||||||
|
if (dest is null)
|
||||||
|
throw new InvalidOperationException($"No remote storage configured for pool {fileGroup.Key}");
|
||||||
|
var client = CreateMinioClient(dest);
|
||||||
|
if (client is null)
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
$"Failed to configure client for remote destination '{fileGroup.Key}'"
|
||||||
|
);
|
||||||
|
|
||||||
|
List<string> objectsToDelete = [];
|
||||||
|
|
||||||
|
foreach (var file in fileGroup)
|
||||||
|
{
|
||||||
|
objectsToDelete.Add(file.StorageId ?? file.Id);
|
||||||
|
if(file.HasCompression) objectsToDelete.Add(file.StorageId ?? file.Id + ".compressed");
|
||||||
|
if(file.HasThumbnail) objectsToDelete.Add(file.StorageId ?? file.Id + ".thumbnail");
|
||||||
|
}
|
||||||
|
|
||||||
|
await client.RemoveObjectsAsync(
|
||||||
|
new RemoveObjectsArgs().WithBucket(dest.Bucket).WithObjects(objectsToDelete)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<FileBundle?> GetBundleAsync(Guid id, Guid accountId)
|
||||||
{
|
{
|
||||||
var bundle = await db.Bundles
|
var bundle = await db.Bundles
|
||||||
.Where(e => e.Id == id)
|
.Where(e => e.Id == id)
|
||||||
|
@@ -21,7 +21,6 @@ public static class GrpcClientHelper
|
|||||||
? X509Certificate2.CreateFromPemFile(clientCertPath, clientKeyPath)
|
? X509Certificate2.CreateFromPemFile(clientCertPath, clientKeyPath)
|
||||||
: X509Certificate2.CreateFromEncryptedPemFile(clientCertPath, clientCertPassword, clientKeyPath)
|
: X509Certificate2.CreateFromEncryptedPemFile(clientCertPath, clientCertPassword, clientKeyPath)
|
||||||
);
|
);
|
||||||
// TODO: Verify the ca in the future
|
|
||||||
handler.ServerCertificateCustomValidationCallback = (message, cert, chain, errors) => true;
|
handler.ServerCertificateCustomValidationCallback = (message, cert, chain, errors) => true;
|
||||||
var httpClient = new HttpClient(handler);
|
var httpClient = new HttpClient(handler);
|
||||||
httpClient.DefaultRequestVersion = HttpVersion.Version20;
|
httpClient.DefaultRequestVersion = HttpVersion.Version20;
|
||||||
|
@@ -22,6 +22,15 @@ public class BroadcastEventHandler(
|
|||||||
|
|
||||||
logger.LogInformation("Account deleted: {AccountId}", evt.AccountId);
|
logger.LogInformation("Account deleted: {AccountId}", evt.AccountId);
|
||||||
|
|
||||||
|
// TODO: Add empty realm, chat recycler in the db recycle
|
||||||
|
await db.ChatMembers
|
||||||
|
.Where(m => m.AccountId == evt.AccountId)
|
||||||
|
.ExecuteDeleteAsync(cancellationToken: stoppingToken);
|
||||||
|
|
||||||
|
await db.RealmMembers
|
||||||
|
.Where(m => m.AccountId == evt.AccountId)
|
||||||
|
.ExecuteDeleteAsync(cancellationToken: stoppingToken);
|
||||||
|
|
||||||
await using var transaction = await db.Database.BeginTransactionAsync(cancellationToken: stoppingToken);
|
await using var transaction = await db.Database.BeginTransactionAsync(cancellationToken: stoppingToken);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -34,6 +43,11 @@ public class BroadcastEventHandler(
|
|||||||
.Where(p => p.PublisherId == publisher.Id)
|
.Where(p => p.PublisherId == publisher.Id)
|
||||||
.ExecuteDeleteAsync(cancellationToken: stoppingToken);
|
.ExecuteDeleteAsync(cancellationToken: stoppingToken);
|
||||||
|
|
||||||
|
var publisherIds = publishers.Select(p => p.Id).ToList();
|
||||||
|
await db.Publishers
|
||||||
|
.Where(p => publisherIds.Contains(p.Id))
|
||||||
|
.ExecuteDeleteAsync(cancellationToken: stoppingToken);
|
||||||
|
|
||||||
await transaction.CommitAsync(cancellationToken: stoppingToken);
|
await transaction.CommitAsync(cancellationToken: stoppingToken);
|
||||||
}
|
}
|
||||||
catch (Exception)
|
catch (Exception)
|
||||||
|
Reference in New Issue
Block a user