♻️ Replace the impl of the reanalysis service

This commit is contained in:
2026-01-14 23:50:15 +08:00
parent ca21acbff6
commit c3304e0663

View File

@@ -171,7 +171,6 @@ public class FileReanalysisService(
bool validateThumbnail bool validateThumbnail
) )
{ {
// Collect unique pool IDs and fetch all pools in one query
var poolIds = files.Select(f => f.Object!.FileReplicas.First(r => r.IsPrimary).PoolId) var poolIds = files.Select(f => f.Object!.FileReplicas.First(r => r.IsPrimary).PoolId)
.Where(pid => pid.HasValue) .Where(pid => pid.HasValue)
.Select(pid => pid!.Value) .Select(pid => pid!.Value)
@@ -181,93 +180,101 @@ public class FileReanalysisService(
var groupedByPool = files.GroupBy(f => f.Object!.FileReplicas.First(r => r.IsPrimary).PoolId); var groupedByPool = files.GroupBy(f => f.Object!.FileReplicas.First(r => r.IsPrimary).PoolId);
var tasks = new List<Task>();
foreach (var group in groupedByPool) foreach (var group in groupedByPool)
{ {
if (!group.Key.HasValue) continue; if (!group.Key.HasValue) continue;
var poolId = group.Key.Value; var poolId = group.Key.Value;
var poolFiles = group.ToList(); var poolFiles = group.ToList();
tasks.Add(Task.Run(async () =>
{
try
{
if (!pools.TryGetValue(poolId, out var pool)) if (!pools.TryGetValue(poolId, out var pool))
{ {
logger.LogWarning("No pool found for pool {PoolId}, skipping batch validation", poolId); logger.LogWarning("No pool found for pool {PoolId}, skipping batch validation", poolId);
return; continue;
} }
var dest = pool.StorageConfig; var dest = pool.StorageConfig;
var client = CreateMinioClient(dest); var client = CreateMinioClient(dest);
if (client == null) if (client == null)
{ {
logger.LogWarning("Failed to create Minio client for pool {PoolId}, skipping batch validation", logger.LogWarning("Failed to create Minio client for pool {PoolId}, skipping batch validation", poolId);
poolId); continue;
return;
} }
// Get or fetch cached list of relevant objects in the bucket var updatedFiles = new List<SnCloudFile>();
if (!_bucketObjectCache.TryGetValue(dest.Bucket, out var objectNames))
{
var listArgs = new ListObjectsArgs().WithBucket(dest.Bucket);
objectNames = [];
await foreach (var item in client.ListObjectsEnumAsync(listArgs))
{
if (item.Key.EndsWith(".compressed") || item.Key.EndsWith(".thumbnail"))
objectNames.Add(item.Key);
}
_bucketObjectCache[dest.Bucket] = objectNames;
}
// Check existence for each file in the group
foreach (var file in poolFiles) foreach (var file in poolFiles)
{ {
if (file.Object == null) continue; if (file.Object == null) continue;
var primaryReplica = file.Object.FileReplicas.FirstOrDefault(r => r.IsPrimary); var primaryReplica = file.Object.FileReplicas.FirstOrDefault(r => r.IsPrimary);
if (primaryReplica == null) continue; if (primaryReplica == null) continue;
var updated = false; var fileUpdated = false;
var baseStorageId = primaryReplica.StorageId;
if (validateCompression && file.Object.HasCompression) if (validateCompression && file.Object.HasCompression)
{ {
if (!objectNames.Contains(primaryReplica.StorageId + ".compressed")) try
{
var statArgs = new StatObjectArgs()
.WithBucket(dest.Bucket)
.WithObject(baseStorageId + ".compressed");
await client.StatObjectAsync(statArgs);
}
catch (ObjectNotFoundException)
{ {
logger.LogInformation( logger.LogInformation(
"File {FileId} has compression flag but compressed version not found, setting HasCompression to false", "File {FileId} has compression flag but compressed version not found, setting HasCompression to false",
file.Id); file.Id);
file.Object.HasCompression = false; file.Object.HasCompression = false;
updated = true; fileUpdated = true;
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to stat compressed version for file {FileId}", file.Id);
} }
} }
if (validateThumbnail && file.Object.HasThumbnail) if (validateThumbnail && file.Object.HasThumbnail)
{ {
if (!objectNames.Contains(primaryReplica.StorageId + ".thumbnail")) try
{
var statArgs = new StatObjectArgs()
.WithBucket(dest.Bucket)
.WithObject(baseStorageId + ".thumbnail");
await client.StatObjectAsync(statArgs);
}
catch (ObjectNotFoundException)
{ {
logger.LogInformation( logger.LogInformation(
"File {FileId} has thumbnail flag but thumbnail not found, setting HasThumbnail to false", "File {FileId} has thumbnail flag but thumbnail not found, setting HasThumbnail to false",
file.Id); file.Id);
file.Object.HasThumbnail = false; file.Object.HasThumbnail = false;
updated = true; fileUpdated = true;
}
}
if (!updated) continue;
logger.LogInformation("Updated compression/thumbnail status for file {FileId}", file.Id);
db.Update(file.Object);
}
// Save changes for the group
await db.SaveChangesAsync();
} }
catch (Exception ex) catch (Exception ex)
{ {
logger.LogError(ex, "Failed to batch validate compression/thumbnail for pool {PoolId}", poolId); logger.LogWarning(ex, "Failed to stat thumbnail for file {FileId}", file.Id);
} }
}));
} }
await Task.WhenAll(tasks); if (fileUpdated)
{
updatedFiles.Add(file);
}
}
if (updatedFiles.Count > 0)
{
foreach (var file in updatedFiles)
{
db.Update(file.Object);
}
await db.SaveChangesAsync();
logger.LogInformation("Updated compression/thumbnail status for {Count} files in pool {PoolId}",
updatedFiles.Count, poolId);
}
}
} }
public async Task ProcessNextFileAsync() public async Task ProcessNextFileAsync()