Much larger batch in validation files

This commit is contained in:
2026-01-14 19:43:43 +08:00
parent 10e680ed07
commit 2daf8f5d77
2 changed files with 107 additions and 12 deletions

View File

@@ -239,6 +239,102 @@ public class FileReanalysisService(
}
}
/// <summary>
/// Validates the <c>HasCompression</c> / <c>HasThumbnail</c> flags of a batch of files against
/// the objects actually present in each storage pool's bucket, clearing any flag whose
/// corresponding ".compressed" / ".thumbnail" object is missing.
/// </summary>
/// <param name="files">Files to validate. Files without an object, a primary replica, or a pool ID are skipped.</param>
/// <param name="validateCompression">When true, verify that the ".compressed" object exists for files flagged as compressed.</param>
/// <param name="validateThumbnail">When true, verify that the ".thumbnail" object exists for files flagged as having a thumbnail.</param>
/// <remarks>
/// Bucket listings for different pools run concurrently (at most 20 at a time). Failures for a
/// single pool are logged and do not abort the other pools. All flag changes are persisted with a
/// single save after every pool has been processed, because the shared database context must not
/// be used by several operations at once.
/// </remarks>
private async Task ValidateBatchCompressionAndThumbnailAsync(List<SnCloudFile> files, bool validateCompression, bool validateThumbnail)
{
    // Resolve each file's primary replica once and group by pool. Using FirstOrDefault (rather
    // than First) means a file with no object or no primary replica is skipped instead of
    // throwing and aborting the whole batch.
    var groupedByPool = files
        .Select(f => new { File = f, Replica = f.Object?.FileReplicas.FirstOrDefault(r => r.IsPrimary) })
        .Where(x => x.Replica is not null && x.Replica.PoolId.HasValue)
        .GroupBy(x => x.Replica!.PoolId!.Value)
        .ToList();
    if (groupedByPool.Count == 0) return;

    // Fetch all referenced pools in one query.
    var poolIds = groupedByPool.Select(g => g.Key).ToList();
    var pools = await db.Pools.Where(p => poolIds.Contains(p.Id)).ToDictionaryAsync(p => p.Id);

    // Caps the number of bucket listings in flight at the same time.
    using var semaphore = new SemaphoreSlim(20);
    var tasks = new List<Task>();
    foreach (var group in groupedByPool)
    {
        var poolId = group.Key;
        var poolEntries = group.ToList();
        tasks.Add(ValidatePoolAsync());

        // Validates every file of one pool; never throws (errors are logged) and never touches db.
        async Task ValidatePoolAsync()
        {
            await semaphore.WaitAsync();
            try
            {
                if (!pools.TryGetValue(poolId, out var pool))
                {
                    logger.LogWarning("No pool found for pool {PoolId}, skipping batch validation", poolId);
                    return;
                }
                var dest = pool.StorageConfig;
                var client = CreateMinioClient(dest);
                if (client == null)
                {
                    logger.LogWarning("Failed to create Minio client for pool {PoolId}, skipping batch validation", poolId);
                    return;
                }

                // List the bucket once so every existence check below is an in-memory lookup.
                // NOTE(review): the listing uses the default ListObjectsArgs options; if storage
                // IDs can contain '/', confirm whether a recursive listing is required.
                var listArgs = new ListObjectsArgs().WithBucket(dest.Bucket);
                var objectNames = new HashSet<string>();
                await foreach (var item in client.ListObjectsEnumAsync(listArgs))
                    objectNames.Add(item.Key);

                foreach (var entry in poolEntries)
                {
                    var file = entry.File;
                    var fileObject = file.Object;
                    var primaryReplica = entry.Replica;
                    if (fileObject == null || primaryReplica == null) continue;

                    var updated = false;
                    if (validateCompression && fileObject.HasCompression
                        && !objectNames.Contains(primaryReplica.StorageId + ".compressed"))
                    {
                        logger.LogInformation("File {FileId} has compression flag but compressed version not found, setting HasCompression to false", file.Id);
                        fileObject.HasCompression = false;
                        updated = true;
                    }
                    if (validateThumbnail && fileObject.HasThumbnail
                        && !objectNames.Contains(primaryReplica.StorageId + ".thumbnail"))
                    {
                        logger.LogInformation("File {FileId} has thumbnail flag but thumbnail not found, setting HasThumbnail to false", file.Id);
                        fileObject.HasThumbnail = false;
                        updated = true;
                    }
                    if (updated)
                    {
                        logger.LogInformation("Updated compression/thumbnail status for file {FileId}", file.Id);
                    }
                }
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Failed to batch validate compression/thumbnail for pool {PoolId}", poolId);
            }
            finally
            {
                semaphore.Release();
            }
        }
    }
    await Task.WhenAll(tasks);

    // Persist all flag changes in one go, after the concurrent phase has finished: calling
    // SaveChangesAsync from the parallel pool tasks would start overlapping operations on the
    // same context. Kept best-effort (logged, not rethrown) like the per-pool error handling.
    try
    {
        await db.SaveChangesAsync();
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Failed to save batch compression/thumbnail validation results");
    }
}
private static async Task<bool> ObjectExistsAsync(IMinioClient client, string bucket, string objectName)
{
try
@@ -290,28 +386,26 @@ public class FileReanalysisService(
if (_options.ValidateCompression)
{
var compressionFiles = await GetFilesNeedingCompressionValidationAsync(5);
var compressionFiles = await GetFilesNeedingCompressionValidationAsync();
if (compressionFiles.Count > 0)
{
var file = compressionFiles[0];
await ValidateCompressionAndThumbnailAsync(file);
_validationProcessed++;
_totalProcessed++;
logger.LogInformation("Validation progress: {ValidationProcessed} processed", _validationProcessed);
await ValidateBatchCompressionAndThumbnailAsync(compressionFiles, true, false);
_validationProcessed += compressionFiles.Count;
_totalProcessed += compressionFiles.Count;
logger.LogInformation("Batch compression validation progress: {ValidationProcessed} processed", _validationProcessed);
return;
}
}
if (_options.ValidateThumbnails)
{
var thumbnailFiles = await GetFilesNeedingThumbnailValidationAsync(5);
var thumbnailFiles = await GetFilesNeedingThumbnailValidationAsync();
if (thumbnailFiles.Count > 0)
{
var file = thumbnailFiles[0];
await ValidateCompressionAndThumbnailAsync(file);
_validationProcessed++;
_totalProcessed++;
logger.LogInformation("Validation progress: {ValidationProcessed} processed", _validationProcessed);
await ValidateBatchCompressionAndThumbnailAsync(thumbnailFiles, false, true);
_validationProcessed += thumbnailFiles.Count;
_totalProcessed += thumbnailFiles.Count;
logger.LogInformation("Batch thumbnail validation progress: {ValidationProcessed} processed", _validationProcessed);
return;
}
}