diff --git a/DysonNetwork.Drive/Storage/FileReanalysisService.cs b/DysonNetwork.Drive/Storage/FileReanalysisService.cs index 95f1278d..2490d60c 100644 --- a/DysonNetwork.Drive/Storage/FileReanalysisService.cs +++ b/DysonNetwork.Drive/Storage/FileReanalysisService.cs @@ -20,12 +20,13 @@ public class FileReanalysisService( { private readonly FileReanalysisOptions _options = options.Value; private readonly HashSet _failedFileIds = []; + private readonly Dictionary> _bucketObjectCache = new(); private int _totalProcessed = 0; private int _reanalysisSuccess = 0; private int _reanalysisFailure = 0; private int _validationProcessed = 0; - private async Task> GetFilesNeedingReanalysisAsync(int limit = 100) + private async Task> GetFilesNeedingReanalysisAsync(int limit = 1000) { var now = SystemClock.Instance.GetCurrentInstant(); var deadline = now.Minus(Duration.FromMinutes(30)); @@ -44,30 +45,68 @@ public class FileReanalysisService( .ToListAsync(); } - private async Task> GetFilesNeedingCompressionValidationAsync(int limit = 100) + private async Task> GetFilesNeedingCompressionValidationAsync(int limit = 1000) { - return await db.Files + var files = await db.Files .Where(f => f.ObjectId != null) - .Include(f => f.Object) - .ThenInclude(f => f.FileReplicas) + .Select(f => new SnCloudFile + { + Id = f.Id, + ObjectId = f.ObjectId, + Object = new SnFileObject + { + Id = f.Object.Id, + HasCompression = f.Object.HasCompression, + FileReplicas = f.Object.FileReplicas.Where(r => r.IsPrimary).ToList() + } + }) .Where(f => f.Object != null && f.Object.HasCompression) .Where(f => f.Object!.FileReplicas.Count > 0) - .OrderBy(f => f.Object!.UpdatedAt) + .OrderBy(f => f.Object!.Id) .Take(limit) .ToListAsync(); + + // Load objects for tracking updates + var objectIds = files.Select(f => f.ObjectId).ToList(); + var objects = await db.FileObjects.Where(o => objectIds.Contains(o.Id)).ToDictionaryAsync(o => o.Id); + foreach (var file in files) + { + file.Object = objects[file.ObjectId!]; + } + + return files; } - private async Task> GetFilesNeedingThumbnailValidationAsync(int limit = 100) + private async Task> GetFilesNeedingThumbnailValidationAsync(int limit = 1000) { - return await db.Files + var files = await db.Files .Where(f => f.ObjectId != null) - .Include(f => f.Object) - .ThenInclude(f => f.FileReplicas) + .Select(f => new SnCloudFile + { + Id = f.Id, + ObjectId = f.ObjectId, + Object = new SnFileObject + { + Id = f.Object.Id, + HasThumbnail = f.Object.HasThumbnail, + FileReplicas = f.Object.FileReplicas.Where(r => r.IsPrimary).ToList() + } + }) .Where(f => f.Object != null && f.Object.HasThumbnail) .Where(f => f.Object!.FileReplicas.Count > 0) - .OrderBy(f => f.Object!.UpdatedAt) + .OrderBy(f => f.Object!.Id) .Take(limit) .ToListAsync(); + + // Load objects for tracking updates + var objectIds = files.Select(f => f.ObjectId).ToList(); + var objects = await db.FileObjects.Where(o => objectIds.Contains(o.Id)).ToDictionaryAsync(o => o.Id); + foreach (var file in files) + { + file.Object = objects[file.ObjectId!]; + } + + return files; } private async Task ReanalyzeFileAsync(SnCloudFile file) @@ -166,93 +205,20 @@ public class FileReanalysisService( } } - public async Task ValidateCompressionAndThumbnailAsync(SnCloudFile file) - { - if (file.Object == null) - { - logger.LogWarning("File {FileId} missing object, skipping validation", file.Id); - return; - } - - var primaryReplica = file.Object.FileReplicas.FirstOrDefault(r => r.IsPrimary); - if (primaryReplica == null) - { - logger.LogWarning("File {FileId} has no primary replica, skipping validation", file.Id); - return; - } - - try - { - var pool = await db.Pools.FindAsync(primaryReplica.PoolId.Value); - if (pool == null) - { - logger.LogWarning("No pool found for replica {ReplicaId}, skipping validation", primaryReplica.Id); - return; - } - - var dest = pool.StorageConfig; - var client = CreateMinioClient(dest); - if (client == null) - { - logger.LogWarning("Failed to create Minio client for pool {PoolId}, skipping validation", - primaryReplica.PoolId); - return; - } - - bool updated = false; - - if (_options.ValidateCompression && file.Object.HasCompression) - { - var compressedExists = - await ObjectExistsAsync(client, dest.Bucket, primaryReplica.StorageId + ".compressed"); - if (!compressedExists) - { - logger.LogInformation( - "File {FileId} has compression flag but compressed version not found, setting HasCompression to false", - file.Id); - file.Object.HasCompression = false; - updated = true; - } - } - - if (_options.ValidateThumbnails && file.Object.HasThumbnail) - { - var thumbnailExists = - await ObjectExistsAsync(client, dest.Bucket, primaryReplica.StorageId + ".thumbnail"); - if (!thumbnailExists) - { - logger.LogInformation( - "File {FileId} has thumbnail flag but thumbnail not found, setting HasThumbnail to false", - file.Id); - file.Object.HasThumbnail = false; - updated = true; - } - } - - if (updated) - { - await db.SaveChangesAsync(); - logger.LogInformation("Updated compression/thumbnail status for file {FileId}", file.Id); - } - } - catch (Exception ex) - { - logger.LogError(ex, "Failed to validate compression/thumbnail for file {FileId}", file.Id); - } - } - - private async Task ValidateBatchCompressionAndThumbnailAsync(List files, bool validateCompression, - bool validateThumbnail) + private async Task ValidateBatchCompressionAndThumbnailAsync( + List files, + bool validateCompression, + bool validateThumbnail + ) { // Collect unique pool IDs and fetch all pools in one query var poolIds = files.Select(f => f.Object!.FileReplicas.First(r => r.IsPrimary).PoolId) .Where(pid => pid.HasValue) - .Select(pid => pid.Value) + .Select(pid => pid!.Value) .Distinct() .ToList(); var pools = await db.Pools.Where(p => poolIds.Contains(p.Id)).ToDictionaryAsync(p => p.Id); - var semaphore = new SemaphoreSlim(20); var groupedByPool = files.GroupBy(f => f.Object!.FileReplicas.First(r => r.IsPrimary).PoolId); var tasks = new List(); @@ -261,7 +227,7 @@ public class FileReanalysisService( if (!group.Key.HasValue) continue; var poolId = group.Key.Value; var poolFiles = group.ToList(); - var task = semaphore.WaitAsync().ContinueWith(async _ => + tasks.Add(Task.Run(async () => { try { @@ -280,11 +246,18 @@ public class FileReanalysisService( return; } - // List all objects in the bucket to merge existence checks - var listArgs = new ListObjectsArgs().WithBucket(dest.Bucket); - var objectNames = new HashSet(); - await foreach (var item in client.ListObjectsEnumAsync(listArgs)) - objectNames.Add(item.Key); + // Get or fetch cached list of relevant objects in the bucket + if (!_bucketObjectCache.TryGetValue(dest.Bucket, out var objectNames)) + { + var listArgs = new ListObjectsArgs().WithBucket(dest.Bucket); + objectNames = new HashSet(); + await foreach (var item in client.ListObjectsEnumAsync(listArgs)) + { + if (item.Key.EndsWith(".compressed") || item.Key.EndsWith(".thumbnail")) + objectNames.Add(item.Key); + } + _bucketObjectCache[dest.Bucket] = objectNames; + } // Check existence for each file in the group foreach (var file in poolFiles) @@ -332,33 +305,12 @@ public class FileReanalysisService( { logger.LogError(ex, "Failed to batch validate compression/thumbnail for pool {PoolId}", poolId); } - finally - { - semaphore.Release(); - } - }).Unwrap(); - tasks.Add(task); + })); } await Task.WhenAll(tasks); } - private static async Task ObjectExistsAsync(IMinioClient client, string bucket, string objectName) - { - try - { - var statArgs = new StatObjectArgs() - .WithBucket(bucket) - .WithObject(objectName); - await client.StatObjectAsync(statArgs); - return true; - } - catch (ObjectNotFoundException) - { - return false; - } - } - public async Task ProcessNextFileAsync() { List reanalysisFiles = [];