♻️ Update the usage counting since the pool id logic changed
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using NodaTime;
|
||||
using DysonNetwork.Shared.Models;
|
||||
using Quartz;
|
||||
|
||||
namespace DysonNetwork.Drive.Storage;
|
||||
@@ -40,8 +41,10 @@ public class CloudFileUnusedRecyclingJob(
|
||||
var markedCount = 0;
|
||||
var totalFiles = await db.Files
|
||||
.Where(f => f.FileIndexes.Count == 0)
|
||||
.Where(f => f.PoolId.HasValue && recyclablePools.Contains(f.PoolId.Value))
|
||||
.Where(f => f.Object!.FileReplicas.Any(r => r.PoolId.HasValue && recyclablePools.Contains(r.PoolId.Value)))
|
||||
.Where(f => !f.IsMarkedRecycle)
|
||||
.Include(f => f.Object)
|
||||
.ThenInclude(o => o.FileReplicas)
|
||||
.CountAsync();
|
||||
|
||||
logger.LogInformation("Found {TotalFiles} files to check for unused status", totalFiles);
|
||||
@@ -56,17 +59,18 @@ public class CloudFileUnusedRecyclingJob(
|
||||
|
||||
while (hasMoreFiles)
|
||||
{
|
||||
// Query for the next batch of files using keyset pagination
|
||||
var filesQuery = db.Files
|
||||
.Where(f => f.PoolId.HasValue && recyclablePools.Contains(f.PoolId.Value))
|
||||
IQueryable<SnCloudFile> baseQuery = db.Files
|
||||
.Where(f => f.Object!.FileReplicas.Any(r => r.PoolId.HasValue && recyclablePools.Contains(r.PoolId.Value)))
|
||||
.Where(f => !f.IsMarkedRecycle)
|
||||
.Where(f => f.CreatedAt <= ageThreshold); // Only process older files first
|
||||
.Where(f => f.CreatedAt <= ageThreshold)
|
||||
.Include(f => f.Object)
|
||||
.ThenInclude(o => o.FileReplicas);
|
||||
|
||||
if (lastProcessedId != null)
|
||||
filesQuery = filesQuery.Where(f => string.Compare(f.Id, lastProcessedId) > 0);
|
||||
baseQuery = baseQuery.Where(f => string.Compare(f.Id, lastProcessedId) > 0);
|
||||
|
||||
var fileBatch = await filesQuery
|
||||
.OrderBy(f => f.Id) // Ensure consistent ordering for pagination
|
||||
var fileBatch = await baseQuery
|
||||
.OrderBy(f => f.Id)
|
||||
.Take(batchSize)
|
||||
.Select(f => f.Id)
|
||||
.ToListAsync();
|
||||
@@ -80,12 +84,11 @@ public class CloudFileUnusedRecyclingJob(
|
||||
processedCount += fileBatch.Count;
|
||||
lastProcessedId = fileBatch.Last();
|
||||
|
||||
// Optimized query: Find files that have no other cloud files sharing the same object
|
||||
// A file is considered "unused" if no other SnCloudFile shares its ObjectId
|
||||
// Optimized query: Find files that have no file object or no replicas
|
||||
// A file is considered "unused" if its file object has no replicas
|
||||
var filesToMark = await db.Files
|
||||
.Where(f => fileBatch.Contains(f.Id))
|
||||
.Where(f => f.ObjectId == null || // No file object at all
|
||||
!db.Files.Any(cf => cf.ObjectId == f.ObjectId && cf.Id != f.Id)) // Or no other files share this object
|
||||
.Where(f => f.Object == null || f.Object.FileReplicas.Count == 0)
|
||||
.Select(f => f.Id)
|
||||
.ToListAsync();
|
||||
|
||||
|
||||
@@ -253,11 +253,12 @@ public class FileController(
|
||||
string? overrideMimeType
|
||||
)
|
||||
{
|
||||
if (!file.PoolId.HasValue)
|
||||
var primaryReplica = file.Object?.FileReplicas.FirstOrDefault(r => r.IsPrimary);
|
||||
if (primaryReplica == null || primaryReplica.PoolId == null)
|
||||
return StatusCode(StatusCodes.Status500InternalServerError,
|
||||
"File is in an inconsistent state: uploaded but no pool ID.");
|
||||
|
||||
var pool = await fs.GetPoolAsync(file.PoolId.Value);
|
||||
var pool = await fs.GetPoolAsync(primaryReplica.PoolId.Value);
|
||||
if (pool is null)
|
||||
return StatusCode(StatusCodes.Status410Gone, "The pool of the file no longer exists or not accessible.");
|
||||
|
||||
@@ -461,11 +462,10 @@ public class FileController(
|
||||
var filesQuery = db.Files
|
||||
.Where(e => e.IsMarkedRecycle == recycled)
|
||||
.Where(e => e.AccountId == accountId)
|
||||
.Include(e => e.Pool)
|
||||
.Include(e => e.Object)
|
||||
.AsQueryable();
|
||||
|
||||
if (pool.HasValue) filesQuery = filesQuery.Where(e => e.PoolId == pool);
|
||||
if (pool.HasValue) filesQuery = filesQuery.Where(e => e.Object!.FileReplicas.Any(r => r.PoolId == pool.Value));
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(query))
|
||||
{
|
||||
|
||||
@@ -1,127 +0,0 @@
|
||||
using DysonNetwork.Shared.Models;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace DysonNetwork.Drive.Storage;
|
||||
|
||||
public class FileMigrationService(AppDatabase db, ILogger<FileMigrationService> logger)
|
||||
{
|
||||
public async Task MigrateCloudFilesAsync()
|
||||
{
|
||||
logger.LogInformation("Starting cloud file migration.");
|
||||
|
||||
var cloudFiles = await db.Files
|
||||
.Where(f =>
|
||||
f.ObjectId == null &&
|
||||
f.PoolId != null
|
||||
)
|
||||
.ToListAsync();
|
||||
|
||||
logger.LogDebug("Found {Count} cloud files to migrate.", cloudFiles.Count);
|
||||
|
||||
foreach (var cf in cloudFiles)
|
||||
{
|
||||
try
|
||||
{
|
||||
var ext = Path.GetExtension(cf.Name);
|
||||
var mimeType = ext != "" && MimeTypes.TryGetMimeType(ext, out var mime) ? mime : "application/octet-stream";
|
||||
|
||||
var fileObject = await db.FileObjects.FindAsync(cf.Id);
|
||||
|
||||
if (fileObject == null)
|
||||
{
|
||||
fileObject = new SnFileObject
|
||||
{
|
||||
Id = cf.Id,
|
||||
MimeType = mimeType,
|
||||
HasCompression = mimeType.StartsWith("image/"),
|
||||
HasThumbnail = mimeType.StartsWith("video/")
|
||||
};
|
||||
|
||||
db.FileObjects.Add(fileObject);
|
||||
}
|
||||
|
||||
var replicaExists = await db.FileReplicas.AnyAsync(r =>
|
||||
r.ObjectId == fileObject.Id &&
|
||||
r.PoolId == cf.PoolId!.Value);
|
||||
|
||||
if (!replicaExists)
|
||||
{
|
||||
var fileReplica = new SnFileReplica
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
ObjectId = fileObject.Id,
|
||||
PoolId = cf.PoolId!.Value,
|
||||
StorageId = cf.StorageId ?? cf.Id,
|
||||
Status = SnFileReplicaStatus.Available,
|
||||
IsPrimary = true
|
||||
};
|
||||
|
||||
fileObject.FileReplicas.Add(fileReplica);
|
||||
db.FileReplicas.Add(fileReplica);
|
||||
}
|
||||
|
||||
var permissionExists = await db.FilePermissions.AnyAsync(p => p.FileId == cf.Id);
|
||||
|
||||
if (!permissionExists)
|
||||
{
|
||||
var permission = new SnFilePermission
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
FileId = cf.Id,
|
||||
SubjectType = SnFilePermissionType.Anyone,
|
||||
SubjectId = string.Empty,
|
||||
Permission = SnFilePermissionLevel.Read
|
||||
};
|
||||
|
||||
db.FilePermissions.Add(permission);
|
||||
}
|
||||
|
||||
cf.ObjectId = fileObject.Id;
|
||||
cf.Object = fileObject;
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
logger.LogInformation("Migrated file {FileId} successfully.", cf.Id);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex,
|
||||
"Failed migrating file {FileId}. ObjectId={ObjectId}, PoolId={PoolId}, StorageId={StorageId}",
|
||||
cf.Id,
|
||||
cf.ObjectId,
|
||||
cf.PoolId,
|
||||
cf.StorageId);
|
||||
}
|
||||
}
|
||||
|
||||
logger.LogInformation("Cloud file migration completed.");
|
||||
}
|
||||
|
||||
public async Task MigratePermissionsAsync()
|
||||
{
|
||||
logger.LogInformation("Starting file permission migration.");
|
||||
|
||||
var filesWithoutPermission = await db.Files
|
||||
.Where(f => !db.FilePermissions.Any(p => p.FileId == f.Id))
|
||||
.ToListAsync();
|
||||
|
||||
logger.LogDebug("Found {Count} files without permissions.", filesWithoutPermission.Count);
|
||||
|
||||
foreach (var file in filesWithoutPermission)
|
||||
{
|
||||
var permission = new SnFilePermission
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
FileId = file.Id,
|
||||
SubjectType = SnFilePermissionType.Anyone,
|
||||
SubjectId = string.Empty,
|
||||
Permission = SnFilePermissionLevel.Read
|
||||
};
|
||||
|
||||
db.FilePermissions.Add(permission);
|
||||
}
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
logger.LogInformation("Permission migration completed. Created {Count} permissions.", filesWithoutPermission.Count);
|
||||
}
|
||||
}
|
||||
@@ -23,10 +23,9 @@ public class FileReanalysisService(
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
var deadline = now.Minus(Duration.FromMinutes(30));
|
||||
return await db.Files
|
||||
.Where(f => f.ObjectId != null && f.PoolId != null)
|
||||
.Where(f => f.ObjectId != null)
|
||||
.Include(f => f.Object)
|
||||
.ThenInclude(f => f.FileReplicas)
|
||||
.Include(f => f.Pool)
|
||||
.Where(f => f.Object != null && (f.Object.Meta == null || f.Object.Meta.Count == 0))
|
||||
.Where(f => f.Object!.FileReplicas.Count > 0)
|
||||
.Where(f => f.CreatedAt <= deadline)
|
||||
@@ -39,9 +38,9 @@ public class FileReanalysisService(
|
||||
{
|
||||
logger.LogInformation("Starting reanalysis for file {FileId}: {FileName}", file.Id, file.Name);
|
||||
|
||||
if (file.Object == null || file.Pool == null)
|
||||
if (file.Object == null)
|
||||
{
|
||||
logger.LogWarning("File {FileId} missing object or pool, skipping reanalysis", file.Id);
|
||||
logger.LogWarning("File {FileId} missing object, skipping reanalysis", file.Id);
|
||||
return true; // not a failure
|
||||
}
|
||||
|
||||
@@ -147,16 +146,22 @@ public class FileReanalysisService(
|
||||
|
||||
private async Task DownloadFileAsync(SnCloudFile file, SnFileReplica replica, string tempPath)
|
||||
{
|
||||
var dest = file.Pool!.StorageConfig;
|
||||
if (dest == null)
|
||||
if (replica.PoolId == null)
|
||||
{
|
||||
throw new InvalidOperationException($"No remote storage configured for pool {file.PoolId}");
|
||||
throw new InvalidOperationException($"Replica for file {file.Id} has no pool ID");
|
||||
}
|
||||
|
||||
var pool = await db.Pools.FindAsync(replica.PoolId.Value);
|
||||
if (pool == null)
|
||||
{
|
||||
throw new InvalidOperationException($"No remote storage configured for pool {replica.PoolId}");
|
||||
}
|
||||
var dest = pool.StorageConfig;
|
||||
|
||||
var client = CreateMinioClient(dest);
|
||||
if (client == null)
|
||||
{
|
||||
throw new InvalidOperationException($"Failed to create Minio client for pool {file.PoolId}");
|
||||
throw new InvalidOperationException($"Failed to create Minio client for pool {replica.PoolId}");
|
||||
}
|
||||
|
||||
await using var fileStream = File.Create(tempPath);
|
||||
|
||||
@@ -38,7 +38,6 @@ public class FileService(
|
||||
|
||||
var file = await db.Files
|
||||
.Where(f => f.Id == fileId)
|
||||
.Include(f => f.Pool)
|
||||
.Include(f => f.Bundle)
|
||||
.Include(f => f.Object)
|
||||
.ThenInclude(o => o.FileReplicas)
|
||||
@@ -70,7 +69,7 @@ public class FileService(
|
||||
{
|
||||
var dbFiles = await db.Files
|
||||
.Where(f => uncachedIds.Contains(f.Id))
|
||||
.Include(f => f.Pool)
|
||||
.Include(f => f.Bundle)
|
||||
.Include(f => f.Object)
|
||||
.ThenInclude(o => o.FileReplicas)
|
||||
.ToListAsync();
|
||||
@@ -124,7 +123,7 @@ public class FileService(
|
||||
|
||||
fileObject.Hash = await HashFileAsync(processingPath);
|
||||
|
||||
await SaveFileToDatabaseAsync(file, fileObject);
|
||||
await SaveFileToDatabaseAsync(file, fileObject, pool.Id);
|
||||
|
||||
await PublishFileUploadedEventAsync(file, pool, processingPath, isTempFile);
|
||||
|
||||
@@ -245,13 +244,13 @@ public class FileService(
|
||||
return Task.FromResult((encryptedPath, true));
|
||||
}
|
||||
|
||||
private async Task SaveFileToDatabaseAsync(SnCloudFile file, SnFileObject fileObject)
|
||||
private async Task SaveFileToDatabaseAsync(SnCloudFile file, SnFileObject fileObject, Guid poolId)
|
||||
{
|
||||
var replica = new SnFileReplica
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
ObjectId = file.Id,
|
||||
PoolId = file.PoolId,
|
||||
PoolId = poolId,
|
||||
StorageId = file.StorageId ?? file.Id,
|
||||
Status = SnFileReplicaStatus.Available,
|
||||
IsPrimary = true
|
||||
@@ -540,7 +539,30 @@ public class FileService(
|
||||
|
||||
public async Task DeleteFileDataAsync(SnCloudFile file, bool force = false)
|
||||
{
|
||||
if (!file.PoolId.HasValue || file.ObjectId == null) return;
|
||||
if (file.ObjectId == null) return;
|
||||
|
||||
var replicas = await db.FileReplicas
|
||||
.Where(r => r.ObjectId == file.ObjectId)
|
||||
.ToListAsync();
|
||||
|
||||
if (replicas.Count == 0)
|
||||
{
|
||||
logger.LogWarning("No replicas found for file object {ObjectId}", file.ObjectId);
|
||||
return;
|
||||
}
|
||||
|
||||
var primaryReplica = replicas.FirstOrDefault(r => r.IsPrimary);
|
||||
if (primaryReplica == null)
|
||||
{
|
||||
logger.LogWarning("No primary replica found for file object {ObjectId}", file.ObjectId);
|
||||
return;
|
||||
}
|
||||
|
||||
if (primaryReplica.PoolId == null)
|
||||
{
|
||||
logger.LogWarning("Primary replica has no pool ID for file object {ObjectId}", file.ObjectId);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!force)
|
||||
{
|
||||
@@ -553,23 +575,12 @@ public class FileService(
|
||||
return;
|
||||
}
|
||||
|
||||
var replicas = await db.FileReplicas
|
||||
.Where(r => r.ObjectId == file.ObjectId)
|
||||
.ToListAsync();
|
||||
|
||||
if (replicas.Count == 0)
|
||||
{
|
||||
logger.LogWarning("No replicas found for file object {ObjectId}", file.ObjectId);
|
||||
return;
|
||||
}
|
||||
|
||||
var primaryReplica = replicas.First(r => r.IsPrimary);
|
||||
var dest = await GetRemoteStorageConfig(file.PoolId.Value);
|
||||
if (dest is null) throw new InvalidOperationException($"No remote storage configured for pool {file.PoolId}");
|
||||
var dest = await GetRemoteStorageConfig(primaryReplica.PoolId.Value);
|
||||
if (dest is null) throw new InvalidOperationException($"No remote storage configured for pool {primaryReplica.PoolId}");
|
||||
var client = CreateMinioClient(dest);
|
||||
if (client is null)
|
||||
throw new InvalidOperationException(
|
||||
$"Failed to configure client for remote destination '{file.PoolId}'"
|
||||
$"Failed to configure client for remote destination '{primaryReplica.PoolId}'"
|
||||
);
|
||||
|
||||
var bucket = dest.Bucket;
|
||||
@@ -615,7 +626,7 @@ public class FileService(
|
||||
|
||||
public async Task DeleteFileDataBatchAsync(List<SnCloudFile> files)
|
||||
{
|
||||
files = files.Where(f => f.PoolId.HasValue && f.ObjectId != null).ToList();
|
||||
files = files.Where(f => f.ObjectId != null).ToList();
|
||||
|
||||
var objectIds = files.Select(f => f.ObjectId).Distinct().ToList();
|
||||
var replicas = await db.FileReplicas
|
||||
@@ -759,8 +770,14 @@ public class FileService(
|
||||
|
||||
public async Task<int> DeletePoolRecycledFilesAsync(Guid poolId)
|
||||
{
|
||||
var fileIdsWithReplicas = await db.FileReplicas
|
||||
.Where(r => r.PoolId == poolId)
|
||||
.Select(r => r.ObjectId)
|
||||
.Distinct()
|
||||
.ToListAsync();
|
||||
|
||||
var files = await db.Files
|
||||
.Where(f => f.PoolId == poolId && f.IsMarkedRecycle)
|
||||
.Where(f => fileIdsWithReplicas.Contains(f.Id) && f.IsMarkedRecycle)
|
||||
.ToListAsync();
|
||||
var count = files.Count;
|
||||
var fileIds = files.Select(f => f.Id).ToList();
|
||||
|
||||
Reference in New Issue
Block a user