From 2daf8f5d77facc4512c5e7c997126c5b3ab21830 Mon Sep 17 00:00:00 2001
From: LittleSheep
Date: Wed, 14 Jan 2026 19:43:43 +0800
Subject: [PATCH] :zap: Much larger batch in validation files

---
Review notes (text between the "---" line and the first "diff --git" line is not applied):
- This patch text was recovered from a whitespace-collapsed copy in which everything inside
  angle brackets had been stripped. Generic type arguments in the C# hunks
  (List<SnCloudFile>, HashSet<string>, Task<bool>) and all leading indentation are
  reconstructed. The element type SnCloudFile in particular is an assumption - confirm
  against the repository before applying.
- The DysonNetwork.sln.DotSettings.user hunk lost its XML element content in the same way
  and cannot be applied as shown. It is a per-user IDE settings file and is probably better
  left out of the commit.
- ValidateBatchCompressionAndThumbnailAsync differs from the originally committed version:
  * bucket listings still run concurrently (at most 20 at a time), but tracked entities are
    now modified and saved on a single flow after all listings finish, because a DbContext
    instance must not be used by several operations at once;
  * files without an object, a primary replica or a pool are skipped up front instead of
    making First(...) throw before any validation happens;
  * the ContinueWith/Unwrap chain is replaced by an async lambda and the SemaphoreSlim is
    disposed.
- Hunk offsets and the diffstat were adjusted for the new line count; the blob ids on the
  "index" lines are still those of the original commit.

 .../Storage/FileReanalysisService.cs          | 132 ++++++++++++++++--
 DysonNetwork.sln.DotSettings.user             |   1 +
 2 files changed, 121 insertions(+), 12 deletions(-)

diff --git a/DysonNetwork.Drive/Storage/FileReanalysisService.cs b/DysonNetwork.Drive/Storage/FileReanalysisService.cs
index 1d7f829b..110da79e 100644
--- a/DysonNetwork.Drive/Storage/FileReanalysisService.cs
+++ b/DysonNetwork.Drive/Storage/FileReanalysisService.cs
@@ -239,6 +239,116 @@ public class FileReanalysisService(
         }
     }
 
+    /// <summary>
+    /// Verifies, for a batch of files, that the compressed and/or thumbnail variants flagged in
+    /// the database really exist in each file's primary storage pool, and clears stale flags.
+    /// </summary>
+    /// <param name="files">Files to validate; files without an object, a primary replica or a pool are skipped.</param>
+    /// <param name="validateCompression">Whether to verify the HasCompression flag.</param>
+    /// <param name="validateThumbnail">Whether to verify the HasThumbnail flag.</param>
+    private async Task ValidateBatchCompressionAndThumbnailAsync(List<SnCloudFile> files, bool validateCompression, bool validateThumbnail)
+    {
+        // Resolve each file's primary replica once and group the files by pool. Files without an
+        // object, a primary replica or a pool are skipped instead of throwing.
+        var filesByPool = files
+            .Select(f => new { File = f, Replica = f.Object?.FileReplicas.FirstOrDefault(r => r.IsPrimary) })
+            .Where(x => x.Replica?.PoolId is not null)
+            .GroupBy(x => x.Replica!.PoolId!.Value)
+            .ToList();
+        if (filesByPool.Count == 0) return;
+
+        // Fetch all referenced pools in one query
+        var poolIds = filesByPool.Select(g => g.Key).ToList();
+        var pools = await db.Pools.Where(p => poolIds.Contains(p.Id)).ToDictionaryAsync(p => p.Id);
+
+        // Only the bucket listings run concurrently. The DbContext is not thread-safe, so tracked
+        // entities are updated and saved only after every listing has finished.
+        using var semaphore = new SemaphoreSlim(20);
+        var listings = await Task.WhenAll(filesByPool.Select(async group =>
+        {
+            var poolId = group.Key;
+            await semaphore.WaitAsync();
+            try
+            {
+                if (!pools.TryGetValue(poolId, out var pool))
+                {
+                    logger.LogWarning("No pool found for pool {PoolId}, skipping batch validation", poolId);
+                    return null;
+                }
+
+                var dest = pool.StorageConfig;
+                var client = CreateMinioClient(dest);
+                if (client == null)
+                {
+                    logger.LogWarning("Failed to create Minio client for pool {PoolId}, skipping batch validation", poolId);
+                    return null;
+                }
+
+                // List all objects in the bucket to merge existence checks
+                var listArgs = new ListObjectsArgs().WithBucket(dest.Bucket);
+                var objectNames = new HashSet<string>();
+                await foreach (var item in client.ListObjectsEnumAsync(listArgs))
+                    objectNames.Add(item.Key);
+                return objectNames;
+            }
+            catch (Exception ex)
+            {
+                logger.LogError(ex, "Failed to batch validate compression/thumbnail for pool {PoolId}", poolId);
+                return null;
+            }
+            finally
+            {
+                semaphore.Release();
+            }
+        }));
+
+        // Apply the results sequentially; listings[i] belongs to filesByPool[i]
+        var anyPoolListed = false;
+        for (var i = 0; i < filesByPool.Count; i++)
+        {
+            var existingObjects = listings[i];
+            if (existingObjects is null) continue;
+            anyPoolListed = true;
+
+            foreach (var entry in filesByPool[i])
+            {
+                var file = entry.File;
+                var fileObject = file.Object!;
+                var storageId = entry.Replica!.StorageId;
+                var updated = false;
+
+                if (validateCompression && fileObject.HasCompression && !existingObjects.Contains(storageId + ".compressed"))
+                {
+                    logger.LogInformation("File {FileId} has compression flag but compressed version not found, setting HasCompression to false", file.Id);
+                    fileObject.HasCompression = false;
+                    updated = true;
+                }
+
+                if (validateThumbnail && fileObject.HasThumbnail && !existingObjects.Contains(storageId + ".thumbnail"))
+                {
+                    logger.LogInformation("File {FileId} has thumbnail flag but thumbnail not found, setting HasThumbnail to false", file.Id);
+                    fileObject.HasThumbnail = false;
+                    updated = true;
+                }
+
+                if (updated)
+                    logger.LogInformation("Updated compression/thumbnail status for file {FileId}", file.Id);
+            }
+        }
+
+        if (!anyPoolListed) return;
+
+        try
+        {
+            // Persist every flag change in a single SaveChanges call
+            await db.SaveChangesAsync();
+        }
+        catch (Exception ex)
+        {
+            logger.LogError(ex, "Failed to save batch compression/thumbnail validation results");
+        }
+    }
+
     private static async Task<bool> ObjectExistsAsync(IMinioClient client, string bucket, string objectName)
     {
         try
@@ -290,28 +400,26 @@ public class FileReanalysisService(
 
         if (_options.ValidateCompression)
         {
-            var compressionFiles = await GetFilesNeedingCompressionValidationAsync(5);
+            var compressionFiles = await GetFilesNeedingCompressionValidationAsync();
             if (compressionFiles.Count > 0)
             {
-                var file = compressionFiles[0];
-                await ValidateCompressionAndThumbnailAsync(file);
-                _validationProcessed++;
-                _totalProcessed++;
-                logger.LogInformation("Validation progress: {ValidationProcessed} processed", _validationProcessed);
+                await ValidateBatchCompressionAndThumbnailAsync(compressionFiles, true, false);
+                _validationProcessed += compressionFiles.Count;
+                _totalProcessed += compressionFiles.Count;
+                logger.LogInformation("Batch compression validation progress: {ValidationProcessed} processed", _validationProcessed);
                 return;
             }
         }
 
         if (_options.ValidateThumbnails)
         {
-            var thumbnailFiles = await GetFilesNeedingThumbnailValidationAsync(5);
+            var thumbnailFiles = await GetFilesNeedingThumbnailValidationAsync();
             if (thumbnailFiles.Count > 0)
             {
-                var file = thumbnailFiles[0];
-                await ValidateCompressionAndThumbnailAsync(file);
-                _validationProcessed++;
-                _totalProcessed++;
-                logger.LogInformation("Validation progress: {ValidationProcessed} processed", _validationProcessed);
+                await ValidateBatchCompressionAndThumbnailAsync(thumbnailFiles, false, true);
+                _validationProcessed += thumbnailFiles.Count;
+                _totalProcessed += thumbnailFiles.Count;
+                logger.LogInformation("Batch thumbnail validation progress: {ValidationProcessed} processed", _validationProcessed);
                 return;
             }
         }
diff --git a/DysonNetwork.sln.DotSettings.user b/DysonNetwork.sln.DotSettings.user
index 38ab4e31..dbba0646 100644
--- a/DysonNetwork.sln.DotSettings.user
+++ b/DysonNetwork.sln.DotSettings.user
@@ -76,6 +76,7 @@
 	ForceIncluded
 	ForceIncluded
 	ForceIncluded
+	ForceIncluded
 	ForceIncluded
 	ForceIncluded
 	ForceIncluded