Improve the file validation speed

2026-01-14 23:09:17 +08:00
parent 68d0881e34
commit 9513a460d0

@@ -20,12 +20,13 @@ public class FileReanalysisService(
 {
     private readonly FileReanalysisOptions _options = options.Value;
     private readonly HashSet<string> _failedFileIds = [];
+    private readonly Dictionary<string, HashSet<string>> _bucketObjectCache = new();
     private int _totalProcessed = 0;
     private int _reanalysisSuccess = 0;
     private int _reanalysisFailure = 0;
     private int _validationProcessed = 0;

-    private async Task<List<SnCloudFile>> GetFilesNeedingReanalysisAsync(int limit = 100)
+    private async Task<List<SnCloudFile>> GetFilesNeedingReanalysisAsync(int limit = 1000)
     {
         var now = SystemClock.Instance.GetCurrentInstant();
         var deadline = now.Minus(Duration.FromMinutes(30));
@@ -44,30 +45,68 @@ public class FileReanalysisService(
             .ToListAsync();
     }

-    private async Task<List<SnCloudFile>> GetFilesNeedingCompressionValidationAsync(int limit = 100)
+    private async Task<List<SnCloudFile>> GetFilesNeedingCompressionValidationAsync(int limit = 1000)
     {
-        return await db.Files
+        var files = await db.Files
             .Where(f => f.ObjectId != null)
-            .Include(f => f.Object)
-            .ThenInclude(f => f.FileReplicas)
+            .Select(f => new SnCloudFile
+            {
+                Id = f.Id,
+                ObjectId = f.ObjectId,
+                Object = new SnFileObject
+                {
+                    Id = f.Object.Id,
+                    HasCompression = f.Object.HasCompression,
+                    FileReplicas = f.Object.FileReplicas.Where(r => r.IsPrimary).ToList()
+                }
+            })
             .Where(f => f.Object != null && f.Object.HasCompression)
             .Where(f => f.Object!.FileReplicas.Count > 0)
-            .OrderBy(f => f.Object!.UpdatedAt)
+            .OrderBy(f => f.Object!.Id)
             .Take(limit)
             .ToListAsync();
+
+        // Load objects for tracking updates
+        var objectIds = files.Select(f => f.ObjectId).ToList();
+        var objects = await db.FileObjects.Where(o => objectIds.Contains(o.Id)).ToDictionaryAsync(o => o.Id);
+        foreach (var file in files)
+        {
+            file.Object = objects[file.ObjectId!];
+        }
+
+        return files;
     }

-    private async Task<List<SnCloudFile>> GetFilesNeedingThumbnailValidationAsync(int limit = 100)
+    private async Task<List<SnCloudFile>> GetFilesNeedingThumbnailValidationAsync(int limit = 1000)
     {
-        return await db.Files
+        var files = await db.Files
             .Where(f => f.ObjectId != null)
-            .Include(f => f.Object)
-            .ThenInclude(f => f.FileReplicas)
+            .Select(f => new SnCloudFile
+            {
+                Id = f.Id,
+                ObjectId = f.ObjectId,
+                Object = new SnFileObject
+                {
+                    Id = f.Object.Id,
+                    HasThumbnail = f.Object.HasThumbnail,
+                    FileReplicas = f.Object.FileReplicas.Where(r => r.IsPrimary).ToList()
+                }
+            })
             .Where(f => f.Object != null && f.Object.HasThumbnail)
             .Where(f => f.Object!.FileReplicas.Count > 0)
-            .OrderBy(f => f.Object!.UpdatedAt)
+            .OrderBy(f => f.Object!.Id)
             .Take(limit)
             .ToListAsync();
+
+        // Load objects for tracking updates
+        var objectIds = files.Select(f => f.ObjectId).ToList();
+        var objects = await db.FileObjects.Where(o => objectIds.Contains(o.Id)).ToDictionaryAsync(o => o.Id);
+        foreach (var file in files)
+        {
+            file.Object = objects[file.ObjectId!];
+        }
+
+        return files;
     }

     private async Task<bool> ReanalyzeFileAsync(SnCloudFile file)
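Note on the query rewrite above: replacing the eager Include/ThenInclude with a Select projection lets EF Core fetch only the columns the validator reads (Id, ObjectId, the HasCompression/HasThumbnail flag) and only the primary replicas, instead of materializing the full entity graph for every replica. The follow-up "Load objects for tracking updates" step exists because entities constructed inside a projection are not change-tracked. A minimal sketch of that project-then-reattach pattern in isolation, assuming standard EF Core tracking semantics:

    // Projected instances are detached, so mutating their flags would not be
    // picked up by SaveChangesAsync. Re-load the rows as tracked entities and
    // swap them in before any updates:
    var objectIds = files.Select(f => f.ObjectId).ToList();
    var tracked = await db.FileObjects
        .Where(o => objectIds.Contains(o.Id))
        .ToDictionaryAsync(o => o.Id);
    foreach (var file in files)
        file.Object = tracked[file.ObjectId!];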
@@ -166,93 +205,20 @@ public class FileReanalysisService(
         }
     }

-    public async Task ValidateCompressionAndThumbnailAsync(SnCloudFile file)
-    {
-        if (file.Object == null)
-        {
-            logger.LogWarning("File {FileId} missing object, skipping validation", file.Id);
-            return;
-        }
-
-        var primaryReplica = file.Object.FileReplicas.FirstOrDefault(r => r.IsPrimary);
-        if (primaryReplica == null)
-        {
-            logger.LogWarning("File {FileId} has no primary replica, skipping validation", file.Id);
-            return;
-        }
-
-        try
-        {
-            var pool = await db.Pools.FindAsync(primaryReplica.PoolId.Value);
-            if (pool == null)
-            {
-                logger.LogWarning("No pool found for replica {ReplicaId}, skipping validation", primaryReplica.Id);
-                return;
-            }
-
-            var dest = pool.StorageConfig;
-            var client = CreateMinioClient(dest);
-            if (client == null)
-            {
-                logger.LogWarning("Failed to create Minio client for pool {PoolId}, skipping validation",
-                    primaryReplica.PoolId);
-                return;
-            }
-
-            bool updated = false;
-
-            if (_options.ValidateCompression && file.Object.HasCompression)
-            {
-                var compressedExists =
-                    await ObjectExistsAsync(client, dest.Bucket, primaryReplica.StorageId + ".compressed");
-                if (!compressedExists)
-                {
-                    logger.LogInformation(
-                        "File {FileId} has compression flag but compressed version not found, setting HasCompression to false",
-                        file.Id);
-                    file.Object.HasCompression = false;
-                    updated = true;
-                }
-            }
-
-            if (_options.ValidateThumbnails && file.Object.HasThumbnail)
-            {
-                var thumbnailExists =
-                    await ObjectExistsAsync(client, dest.Bucket, primaryReplica.StorageId + ".thumbnail");
-                if (!thumbnailExists)
-                {
-                    logger.LogInformation(
-                        "File {FileId} has thumbnail flag but thumbnail not found, setting HasThumbnail to false",
-                        file.Id);
-                    file.Object.HasThumbnail = false;
-                    updated = true;
-                }
-            }
-
-            if (updated)
-            {
-                await db.SaveChangesAsync();
-                logger.LogInformation("Updated compression/thumbnail status for file {FileId}", file.Id);
-            }
-        }
-        catch (Exception ex)
-        {
-            logger.LogError(ex, "Failed to validate compression/thumbnail for file {FileId}", file.Id);
-        }
-    }
-
-    private async Task ValidateBatchCompressionAndThumbnailAsync(List<SnCloudFile> files, bool validateCompression,
-        bool validateThumbnail)
+    private async Task ValidateBatchCompressionAndThumbnailAsync(
+        List<SnCloudFile> files,
+        bool validateCompression,
+        bool validateThumbnail
+    )
     {
         // Collect unique pool IDs and fetch all pools in one query
         var poolIds = files.Select(f => f.Object!.FileReplicas.First(r => r.IsPrimary).PoolId)
             .Where(pid => pid.HasValue)
-            .Select(pid => pid.Value)
+            .Select(pid => pid!.Value)
             .Distinct()
             .ToList();
         var pools = await db.Pools.Where(p => poolIds.Contains(p.Id)).ToDictionaryAsync(p => p.Id);

-        var semaphore = new SemaphoreSlim(20);
         var groupedByPool = files.GroupBy(f => f.Object!.FileReplicas.First(r => r.IsPrimary).PoolId);
         var tasks = new List<Task>();
@@ -261,7 +227,7 @@ public class FileReanalysisService(
             if (!group.Key.HasValue) continue;
             var poolId = group.Key.Value;
             var poolFiles = group.ToList();
-            var task = semaphore.WaitAsync().ContinueWith(async _ =>
+            tasks.Add(Task.Run(async () =>
             {
                 try
                 {
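Note on the concurrency change above: the old code throttled pool tasks through a SemaphoreSlim(20) and a ContinueWith(...).Unwrap() chain; the new code starts one Task.Run per pool group, which is unbounded but reasonable while the number of storage pools stays small. If pool counts grew, a bounded equivalent (an alternative sketch, not what this commit does) could use the .NET 6+ Parallel.ForEachAsync API:

    // Runs at most 20 pool groups concurrently.
    await Parallel.ForEachAsync(groupedByPool,
        new ParallelOptions { MaxDegreeOfParallelism = 20 },
        async (group, ct) =>
        {
            // same per-pool validation body as in the diff
        });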
@@ -280,11 +246,18 @@ public class FileReanalysisService(
                         return;
                     }

-                    // List all objects in the bucket to merge existence checks
-                    var listArgs = new ListObjectsArgs().WithBucket(dest.Bucket);
-                    var objectNames = new HashSet<string>();
-                    await foreach (var item in client.ListObjectsEnumAsync(listArgs))
-                        objectNames.Add(item.Key);
+                    // Get or fetch cached list of relevant objects in the bucket
+                    if (!_bucketObjectCache.TryGetValue(dest.Bucket, out var objectNames))
+                    {
+                        var listArgs = new ListObjectsArgs().WithBucket(dest.Bucket);
+                        objectNames = new HashSet<string>();
+                        await foreach (var item in client.ListObjectsEnumAsync(listArgs))
+                        {
+                            if (item.Key.EndsWith(".compressed") || item.Key.EndsWith(".thumbnail"))
+                                objectNames.Add(item.Key);
+                        }
+                        _bucketObjectCache[dest.Bucket] = objectNames;
+                    }

                     // Check existence for each file in the group
                     foreach (var file in poolFiles)
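One caveat with the cache introduced above: _bucketObjectCache is a plain Dictionary, yet it is read and written from the per-pool Task.Run lambdas, which run concurrently. If two pools ever point at the same bucket, the TryGetValue/indexer pair can race. A thread-safe sketch (an assumption about possible hardening, not part of this commit), using Lazy<Task<...>> so each bucket is listed at most once even under contention:

    private readonly ConcurrentDictionary<string, Lazy<Task<HashSet<string>>>> _bucketObjectCache = new();

    private Task<HashSet<string>> GetBucketObjectsAsync(IMinioClient client, string bucket) =>
        _bucketObjectCache.GetOrAdd(bucket, b => new Lazy<Task<HashSet<string>>>(async () =>
        {
            // Same filtered listing as in the diff, but only ever executed once per bucket.
            var names = new HashSet<string>();
            await foreach (var item in client.ListObjectsEnumAsync(new ListObjectsArgs().WithBucket(b)))
                if (item.Key.EndsWith(".compressed") || item.Key.EndsWith(".thumbnail"))
                    names.Add(item.Key);
            return names;
        })).Value;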
@@ -332,33 +305,12 @@ public class FileReanalysisService(
                 {
                     logger.LogError(ex, "Failed to batch validate compression/thumbnail for pool {PoolId}", poolId);
                 }
-                finally
-                {
-                    semaphore.Release();
-                }
-            }).Unwrap();
-            tasks.Add(task);
+            }));
         }

         await Task.WhenAll(tasks);
     }

-    private static async Task<bool> ObjectExistsAsync(IMinioClient client, string bucket, string objectName)
-    {
-        try
-        {
-            var statArgs = new StatObjectArgs()
-                .WithBucket(bucket)
-                .WithObject(objectName);
-            await client.StatObjectAsync(statArgs);
-            return true;
-        }
-        catch (ObjectNotFoundException)
-        {
-            return false;
-        }
-    }
-
     public async Task ProcessNextFileAsync()
     {
         List<SnCloudFile> reanalysisFiles = [];
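Net effect of removing ObjectExistsAsync: the old path issued one StatObjectAsync round trip per flag per file, so a batch of 1000 files with both flags set could cost up to 2000 StatObject requests; the new path pays one paginated ListObjects per bucket (then served from _bucketObjectCache for the lifetime of the service) and each existence check becomes an O(1) HashSet lookup. The per-file check loop itself is unchanged and not visible in this diff; presumably it now looks roughly like:

    // Hypothetical shape of the unchanged per-file check (not shown in this diff):
    var primary = file.Object!.FileReplicas.First(r => r.IsPrimary);
    if (validateCompression && file.Object.HasCompression &&
        !objectNames.Contains(primary.StorageId + ".compressed"))
    {
        file.Object.HasCompression = false; // persisted later via SaveChangesAsync
    }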