💥 ♻️ Refactor cloud files' references, and loading system
This commit is contained in:
@ -39,7 +39,10 @@ public class FileService(
|
||||
if (cachedFile is not null)
|
||||
return cachedFile;
|
||||
|
||||
var file = await db.Files.FirstOrDefaultAsync(f => f.Id == fileId);
|
||||
var file = await db.Files
|
||||
.Include(f => f.Account)
|
||||
.Where(f => f.Id == fileId)
|
||||
.FirstOrDefaultAsync();
|
||||
|
||||
if (file != null)
|
||||
await cache.SetAsync(cacheKey, file, CacheDuration);
|
||||
@ -208,8 +211,9 @@ public class FileService(
|
||||
if (result.Count > 0)
|
||||
{
|
||||
List<Task<CloudFile>> tasks = [];
|
||||
tasks.AddRange(result.Select(result =>
|
||||
nfs.UploadFileToRemoteAsync(file, result.filePath, null, result.suffix, true)));
|
||||
tasks.AddRange(result.Select(item =>
|
||||
nfs.UploadFileToRemoteAsync(file, item.filePath, null, item.suffix, true))
|
||||
);
|
||||
|
||||
await Task.WhenAll(tasks);
|
||||
file = await tasks.First();
|
||||
@ -326,10 +330,23 @@ public class FileService(
|
||||
if (file.StorageId is null) return;
|
||||
if (file.UploadedTo is null) return;
|
||||
|
||||
var repeatedStorageId = await db.Files
|
||||
.Where(f => f.StorageId == file.StorageId && f.Id != file.Id && f.UsedCount > 0)
|
||||
.AnyAsync();
|
||||
if (repeatedStorageId) return;
|
||||
// Check if any other file with the same storage ID is referenced
|
||||
var otherFilesWithSameStorageId = await db.Files
|
||||
.Where(f => f.StorageId == file.StorageId && f.Id != file.Id)
|
||||
.Select(f => f.Id)
|
||||
.ToListAsync();
|
||||
|
||||
// Check if any of these files are referenced
|
||||
var anyReferenced = false;
|
||||
if (otherFilesWithSameStorageId.Any())
|
||||
{
|
||||
anyReferenced = await db.FileReferences
|
||||
.Where(r => otherFilesWithSameStorageId.Contains(r.FileId))
|
||||
.AnyAsync();
|
||||
}
|
||||
|
||||
// If any other file with the same storage ID is referenced, don't delete the actual file data
|
||||
if (anyReferenced) return;
|
||||
|
||||
var dest = GetRemoteStorageConfig(file.UploadedTo);
|
||||
var client = CreateMinioClient(dest);
|
||||
@ -380,242 +397,88 @@ public class FileService(
|
||||
|
||||
return client.Build();
|
||||
}
|
||||
|
||||
public async Task MarkUsageAsync(CloudFile file, int delta)
|
||||
{
|
||||
await db.Files.Where(o => o.Id == file.Id)
|
||||
.ExecuteUpdateAsync(setter => setter.SetProperty(
|
||||
b => b.UsedCount,
|
||||
b => b.UsedCount + delta
|
||||
)
|
||||
);
|
||||
|
||||
await _PurgeCacheAsync(file.Id);
|
||||
}
|
||||
|
||||
public async Task MarkUsageRangeAsync(ICollection<CloudFile> files, int delta)
|
||||
{
|
||||
var ids = files.Select(f => f.Id).ToArray();
|
||||
await db.Files.Where(o => ids.Contains(o.Id))
|
||||
.ExecuteUpdateAsync(setter => setter.SetProperty(
|
||||
b => b.UsedCount,
|
||||
b => b.UsedCount + delta
|
||||
)
|
||||
);
|
||||
|
||||
await _PurgeCacheRangeAsync(files.Select(x => x.Id).ToList());
|
||||
}
|
||||
|
||||
|
||||
public async Task SetExpiresRangeAsync(ICollection<CloudFile> files, Duration? duration)
|
||||
{
|
||||
var ids = files.Select(f => f.Id).ToArray();
|
||||
await db.Files.Where(o => ids.Contains(o.Id))
|
||||
.ExecuteUpdateAsync(setter => setter.SetProperty(
|
||||
b => b.ExpiredAt,
|
||||
duration.HasValue
|
||||
? b => SystemClock.Instance.GetCurrentInstant() + duration.Value
|
||||
: _ => null
|
||||
)
|
||||
);
|
||||
|
||||
await _PurgeCacheRangeAsync(files.Select(x => x.Id).ToList());
|
||||
}
|
||||
|
||||
public async Task SetUsageAsync(CloudFile file, string? usage)
|
||||
{
|
||||
await db.Files.Where(o => o.Id == file.Id)
|
||||
.ExecuteUpdateAsync(setter => setter.SetProperty(
|
||||
b => b.Usage,
|
||||
_ => usage
|
||||
)
|
||||
);
|
||||
|
||||
await _PurgeCacheAsync(file.Id);
|
||||
}
|
||||
|
||||
public async Task SetUsageRangeAsync(ICollection<CloudFile> files, string? usage)
|
||||
{
|
||||
var ids = files.Select(f => f.Id).ToArray();
|
||||
await db.Files.Where(o => ids.Contains(o.Id))
|
||||
.ExecuteUpdateAsync(setter => setter.SetProperty(
|
||||
b => b.Usage,
|
||||
_ => usage
|
||||
)
|
||||
);
|
||||
|
||||
await _PurgeCacheRangeAsync(files.Select(x => x.Id).ToList());
|
||||
}
|
||||
|
||||
public async Task<(ICollection<CloudFile> current, ICollection<CloudFile> added, ICollection<CloudFile> removed)>
|
||||
DiffAndSetUsageAsync(
|
||||
ICollection<string>? newFileIds,
|
||||
string? usage,
|
||||
ICollection<CloudFile>? previousFiles = null
|
||||
)
|
||||
{
|
||||
if (newFileIds == null) return ([], [], previousFiles ?? []);
|
||||
|
||||
var records = await db.Files.Where(f => newFileIds.Contains(f.Id)).ToListAsync();
|
||||
var previous = previousFiles?.ToDictionary(f => f.Id) ?? new Dictionary<string, CloudFile>();
|
||||
var current = records.ToDictionary(f => f.Id);
|
||||
|
||||
var added = current.Keys.Except(previous.Keys).Select(id => current[id]).ToList();
|
||||
var removed = previous.Keys.Except(current.Keys).Select(id => previous[id]).ToList();
|
||||
|
||||
if (added.Count > 0) await SetUsageRangeAsync(added, usage);
|
||||
if (removed.Count > 0) await SetUsageRangeAsync(removed, null);
|
||||
|
||||
return (newFileIds.Select(id => current[id]).ToList(), added, removed);
|
||||
}
|
||||
|
||||
public async Task<(ICollection<CloudFile> current, ICollection<CloudFile> added, ICollection<CloudFile> removed)>
|
||||
DiffAndMarkFilesAsync(
|
||||
ICollection<string>? newFileIds,
|
||||
ICollection<CloudFile>? previousFiles = null
|
||||
)
|
||||
{
|
||||
if (newFileIds == null) return ([], [], previousFiles ?? []);
|
||||
|
||||
var records = await db.Files.Where(f => newFileIds.Contains(f.Id)).ToListAsync();
|
||||
var previous = previousFiles?.ToDictionary(f => f.Id) ?? new Dictionary<string, CloudFile>();
|
||||
var current = records.ToDictionary(f => f.Id);
|
||||
|
||||
var added = current.Keys.Except(previous.Keys).Select(id => current[id]).ToList();
|
||||
var removed = previous.Keys.Except(current.Keys).Select(id => previous[id]).ToList();
|
||||
|
||||
if (added.Count > 0) await MarkUsageRangeAsync(added, 1);
|
||||
if (removed.Count > 0) await MarkUsageRangeAsync(removed, -1);
|
||||
|
||||
return (newFileIds.Select(id => current[id]).ToList(), added, removed);
|
||||
}
|
||||
|
||||
public async Task<(ICollection<CloudFile> current, ICollection<CloudFile> added, ICollection<CloudFile> removed)>
|
||||
DiffAndSetExpiresAsync(
|
||||
ICollection<string>? newFileIds,
|
||||
Duration? duration,
|
||||
ICollection<CloudFile>? previousFiles = null
|
||||
)
|
||||
{
|
||||
if (newFileIds == null) return ([], [], previousFiles ?? []);
|
||||
|
||||
var records = await db.Files.Where(f => newFileIds.Contains(f.Id)).ToListAsync();
|
||||
var previous = previousFiles?.ToDictionary(f => f.Id) ?? new Dictionary<string, CloudFile>();
|
||||
var current = records.ToDictionary(f => f.Id);
|
||||
|
||||
var added = current.Keys.Except(previous.Keys).Select(id => current[id]).ToList();
|
||||
var removed = previous.Keys.Except(current.Keys).Select(id => previous[id]).ToList();
|
||||
|
||||
if (added.Count > 0) await SetExpiresRangeAsync(added, duration);
|
||||
if (removed.Count > 0) await SetExpiresRangeAsync(removed, null);
|
||||
|
||||
return (newFileIds.Select(id => current[id]).ToList(), added, removed);
|
||||
}
|
||||
|
||||
// Add this helper method to purge the cache for a specific file
|
||||
private async Task _PurgeCacheAsync(string fileId)
|
||||
|
||||
// Helper method to purge the cache for a specific file
|
||||
// Made internal to allow FileReferenceService to use it
|
||||
internal async Task _PurgeCacheAsync(string fileId)
|
||||
{
|
||||
var cacheKey = $"{CacheKeyPrefix}{fileId}";
|
||||
await cache.RemoveAsync(cacheKey);
|
||||
}
|
||||
|
||||
// Add this helper method to purge cache for multiple files
|
||||
private async Task _PurgeCacheRangeAsync(ICollection<string> fileIds)
|
||||
// Helper method to purge cache for multiple files
|
||||
internal async Task _PurgeCacheRangeAsync(IEnumerable<string> fileIds)
|
||||
{
|
||||
var tasks = fileIds.Select(_PurgeCacheAsync);
|
||||
await Task.WhenAll(tasks);
|
||||
}
|
||||
}
|
||||
|
||||
public class CloudFileUnusedRecyclingJob(AppDatabase db, FileService fs, ILogger<CloudFileUnusedRecyclingJob> logger)
|
||||
: IJob
|
||||
{
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
public async Task<List<CloudFile?>> LoadFromReference(List<CloudFileReferenceObject> references)
|
||||
{
|
||||
logger.LogInformation("Deleting unused cloud files...");
|
||||
var cachedFiles = new Dictionary<string, CloudFile>();
|
||||
var uncachedIds = new List<string>();
|
||||
|
||||
var cutoff = SystemClock.Instance.GetCurrentInstant() - Duration.FromHours(1);
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
|
||||
// Get files to delete along with their storage IDs
|
||||
var files = await db.Files
|
||||
.Where(f =>
|
||||
(f.ExpiredAt == null && f.UsedCount == 0 && f.CreatedAt < cutoff) ||
|
||||
(f.ExpiredAt != null && now >= f.ExpiredAt)
|
||||
)
|
||||
.ToListAsync();
|
||||
|
||||
if (files.Count == 0)
|
||||
// Check cache first
|
||||
foreach (var reference in references)
|
||||
{
|
||||
logger.LogInformation("No files to delete");
|
||||
return;
|
||||
var cacheKey = $"{CacheKeyPrefix}{reference.Id}";
|
||||
var cachedFile = await cache.GetAsync<CloudFile>(cacheKey);
|
||||
|
||||
if (cachedFile != null)
|
||||
{
|
||||
cachedFiles[reference.Id] = cachedFile;
|
||||
}
|
||||
else
|
||||
{
|
||||
uncachedIds.Add(reference.Id);
|
||||
}
|
||||
}
|
||||
|
||||
logger.LogInformation($"Found {files.Count} files to process...");
|
||||
// Load uncached files from database
|
||||
if (uncachedIds.Count > 0)
|
||||
{
|
||||
var dbFiles = await db.Files
|
||||
.Include(f => f.Account)
|
||||
.Where(f => uncachedIds.Contains(f.Id))
|
||||
.ToListAsync();
|
||||
|
||||
// Group files by StorageId and find which ones are safe to delete
|
||||
var storageIds = files.Where(f => f.StorageId != null)
|
||||
.Select(f => f.StorageId!)
|
||||
.Distinct()
|
||||
// Add to cache
|
||||
foreach (var file in dbFiles)
|
||||
{
|
||||
var cacheKey = $"{CacheKeyPrefix}{file.Id}";
|
||||
await cache.SetAsync(cacheKey, file, CacheDuration);
|
||||
cachedFiles[file.Id] = file;
|
||||
}
|
||||
}
|
||||
|
||||
// Preserve original order
|
||||
return references
|
||||
.Select(r => cachedFiles.GetValueOrDefault(r.Id))
|
||||
.Where(f => f != null)
|
||||
.ToList();
|
||||
|
||||
var usedStorageIds = await db.Files
|
||||
.Where(f => f.StorageId != null &&
|
||||
storageIds.Contains(f.StorageId) &&
|
||||
!files.Select(ff => ff.Id).Contains(f.Id))
|
||||
.Select(f => f.StorageId!)
|
||||
.Distinct()
|
||||
.ToListAsync();
|
||||
|
||||
// Group files for deletion
|
||||
var filesToDelete = files.Where(f => f.StorageId == null || !usedStorageIds.Contains(f.StorageId))
|
||||
.GroupBy(f => f.UploadedTo)
|
||||
.ToDictionary(grouping => grouping.Key!, grouping => grouping.ToList());
|
||||
|
||||
// Delete files by remote storage
|
||||
foreach (var group in filesToDelete.Where(group => !string.IsNullOrEmpty(group.Key)))
|
||||
{
|
||||
try
|
||||
{
|
||||
var dest = fs.GetRemoteStorageConfig(group.Key);
|
||||
var client = fs.CreateMinioClient(dest);
|
||||
if (client == null) continue;
|
||||
|
||||
// Create delete tasks for each file in the group
|
||||
var deleteTasks = group.Value.Select(file =>
|
||||
{
|
||||
var objectId = file.StorageId ?? file.Id;
|
||||
var tasks = new List<Task>
|
||||
{
|
||||
client.RemoveObjectAsync(new RemoveObjectArgs()
|
||||
.WithBucket(dest.Bucket)
|
||||
.WithObject(objectId))
|
||||
};
|
||||
|
||||
if (file.HasCompression)
|
||||
{
|
||||
tasks.Add(client.RemoveObjectAsync(new RemoveObjectArgs()
|
||||
.WithBucket(dest.Bucket)
|
||||
.WithObject(objectId + ".compressed")));
|
||||
}
|
||||
|
||||
return Task.WhenAll(tasks);
|
||||
});
|
||||
|
||||
await Task.WhenAll(deleteTasks);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error deleting files from remote storage {remote}", group.Key);
|
||||
}
|
||||
}
|
||||
|
||||
// Delete all file records from the database
|
||||
var fileIds = files.Select(f => f.Id).ToList();
|
||||
await db.Files
|
||||
.Where(f => fileIds.Contains(f.Id))
|
||||
.ExecuteDeleteAsync();
|
||||
|
||||
logger.LogInformation($"Completed deleting {files.Count} files");
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets the number of references to a file based on CloudFileReference records
|
||||
/// </summary>
|
||||
/// <param name="fileId">The ID of the file</param>
|
||||
/// <returns>The number of references to the file</returns>
|
||||
public async Task<int> GetReferenceCountAsync(string fileId)
|
||||
{
|
||||
return await db.FileReferences
|
||||
.Where(r => r.FileId == fileId)
|
||||
.CountAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks if a file is referenced by any resource
|
||||
/// </summary>
|
||||
/// <param name="fileId">The ID of the file to check</param>
|
||||
/// <returns>True if the file is referenced, false otherwise</returns>
|
||||
public async Task<bool> IsReferencedAsync(string fileId)
|
||||
{
|
||||
return await db.FileReferences
|
||||
.Where(r => r.FileId == fileId)
|
||||
.AnyAsync();
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user