Bring the cloud file recycling job back online with data safety.

This commit is contained in:
LittleSheep 2025-06-12 00:58:16 +08:00
parent 2e09e63022
commit ffb3f83b96
3 changed files with 73 additions and 100 deletions

View File

@@ -72,6 +72,7 @@ public class Profile : ModelBase
[Column(TypeName = "jsonb")] public VerificationMark? Verification { get; set; }
[Column(TypeName = "jsonb")] public BadgeReferenceObject? ActiveBadge { get; set; }
[Column(TypeName = "jsonb")] public SubscriptionReferenceObject? StellarMembership { get; set; }
public int Experience { get; set; } = 0;
[NotMapped] public int Level => Leveling.ExperiencePerLevel.Count(xp => Experience >= xp) - 1;
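For context on the Level expression above: it counts how many experience thresholds the current Experience value has reached. A minimal worked sketch, assuming Leveling.ExperiencePerLevel is an ascending list of cumulative XP thresholds (its actual values are not shown in this diff):

using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical threshold table; the real Leveling.ExperiencePerLevel is defined elsewhere in the codebase.
var experiencePerLevel = new List<int> { 0, 100, 250, 500, 1000 };

// Mirrors the Level expression: count the thresholds already reached, minus one.
int LevelFor(int experience) => experiencePerLevel.Count(xp => experience >= xp) - 1;

Console.WriteLine(LevelFor(0));   // 0
Console.WriteLine(LevelFor(120)); // 1
Console.WriteLine(LevelFor(999)); // 3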

View File

@@ -53,6 +53,12 @@ public class CloudFile : ModelBase, ICloudFile, IIdentifiedResource
public Instant? UploadedAt { get; set; }
[MaxLength(128)] public string? UploadedTo { get; set; }
public bool HasCompression { get; set; } = false;
/// <summary>
/// This field is set to true when the recycling job plans to delete the file.
/// Because the recycling job is still unstable, the file is not actually deleted until a human verifies it.
/// </summary>
public bool IsMarkedRecycle { get; set; } = false;
/// The object name that is stored remotely;
/// multiple cloud files may share the same storage id to indicate they are the same file.
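The IsMarkedRecycle flag added above only stages files for deletion; actual removal is deferred until a human has verified the plan. A minimal sketch of what such a follow-up pass could look like, assuming a hypothetical database context type named AppDatabase with the Files set shown here and an operator-supplied list of verified ids (this pass is not part of this commit):

// Hypothetical follow-up pass, run only after an operator has reviewed the marked files.
public async Task<int> DeleteVerifiedRecycledFilesAsync(AppDatabase db, IReadOnlyCollection<string> verifiedIds)
{
    // Only delete rows that were both marked by the recycling job and explicitly verified by an operator.
    return await db.Files
        .Where(f => f.IsMarkedRecycle && verifiedIds.Contains(f.Id))
        .ExecuteDeleteAsync();
}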

View File

@@ -14,115 +14,81 @@ public class CloudFileUnusedRecyclingJob(
{
public async Task Execute(IJobExecutionContext context)
{
return;
logger.LogInformation("Deleting unused cloud files...");
logger.LogInformation("Marking unused cloud files...");
var cutoff = SystemClock.Instance.GetCurrentInstant() - Duration.FromHours(1);
var now = SystemClock.Instance.GetCurrentInstant();
const int batchSize = 1000; // Process larger batches for efficiency
var processedCount = 0;
var markedCount = 0;
var totalFiles = await db.Files.Where(f => !f.IsMarkedRecycle).CountAsync();
// Get files that are either expired or created more than an hour ago
var fileIds = await db.Files
.Select(f => f.Id)
.ToListAsync();
logger.LogInformation("Found {TotalFiles} files to check for unused status", totalFiles);
// Filter to only include files that have no references or all references have expired
var deletionPlan = new List<string>();
foreach (var batch in fileIds.Chunk(100)) // Process in batches to avoid excessive query size
// Define a timestamp to limit the age of files we're processing in this run
// This spreads the processing across multiple job runs for very large databases
var ageThreshold = now - Duration.FromDays(30); // Process files up to 30 days old in this run
// Instead of loading all files at once, use pagination
var hasMoreFiles = true;
string? lastProcessedId = null;
while (hasMoreFiles)
{
var references = await fileRefService.GetReferencesAsync(batch);
deletionPlan.AddRange(from refer in references
where refer.Value.Count == 0 || refer.Value.All(r => r.ExpiredAt != null && now >= r.ExpiredAt)
select refer.Key);
}
// Query for the next batch of files using keyset pagination
var filesQuery = db.Files
.Where(f => !f.IsMarkedRecycle)
.Where(f => f.CreatedAt <= ageThreshold); // Only process older files first
if (deletionPlan.Count == 0)
{
logger.LogInformation("No files to delete");
return;
}
// Get the actual file objects for the files to be deleted
var files = await db.Files
.Where(f => deletionPlan.Contains(f.Id))
.ToListAsync();
logger.LogInformation($"Found {files.Count} files to delete...");
// Group files by StorageId and find which ones are safe to delete
var storageIds = files.Where(f => f.StorageId != null)
.Select(f => f.StorageId!)
.Distinct()
.ToList();
// Check if any other files with the same storage IDs are referenced
var usedStorageIds = new List<string>();
var filesWithSameStorageId = await db.Files
.Where(f => f.StorageId != null &&
storageIds.Contains(f.StorageId) &&
!files.Select(ff => ff.Id).Contains(f.Id))
.ToListAsync();
foreach (var file in filesWithSameStorageId)
{
// Get all references for the file
var references = await fileRefService.GetReferencesAsync(file.Id);
// Check if file has active references (non-expired)
if (references.Any(r => r.ExpiredAt == null || r.ExpiredAt > now) && file.StorageId != null)
if (lastProcessedId != null)
{
usedStorageIds.Add(file.StorageId);
filesQuery = filesQuery.Where(f => string.Compare(f.Id, lastProcessedId) > 0);
}
var fileBatch = await filesQuery
.OrderBy(f => f.Id) // Ensure consistent ordering for pagination
.Take(batchSize)
.Select(f => f.Id)
.ToListAsync();
if (fileBatch.Count == 0)
{
hasMoreFiles = false;
continue;
}
processedCount += fileBatch.Count;
lastProcessedId = fileBatch.Last();
// Get all relevant file references for this batch
var fileReferences = await fileRefService.GetReferencesAsync(fileBatch);
// Filter to find files that have no references or all expired references
var filesToMark = fileBatch.Where(fileId =>
!fileReferences.TryGetValue(fileId, out var references) ||
references.Count == 0 ||
references.All(r => r.ExpiredAt.HasValue && r.ExpiredAt.Value <= now)
).ToList();
if (filesToMark.Count > 0)
{
// Use a bulk update for better performance - mark all qualifying files at once
var updateCount = await db.Files
.Where(f => filesToMark.Contains(f.Id))
.ExecuteUpdateAsync(setter => setter
.SetProperty(f => f.IsMarkedRecycle, true));
markedCount += updateCount;
}
// Log progress periodically
if (processedCount % 10000 == 0 || !hasMoreFiles)
{
logger.LogInformation(
"Progress: processed {ProcessedCount}/{TotalFiles} files, marked {MarkedCount} for recycling",
processedCount, totalFiles, markedCount);
}
}
// Group files for deletion
var filesToDelete = files.Where(f => f.StorageId == null || !usedStorageIds.Contains(f.StorageId))
.GroupBy(f => f.UploadedTo)
.ToDictionary(grouping => grouping.Key!, grouping => grouping.ToList());
// Delete files by remote storage
foreach (var group in filesToDelete.Where(group => !string.IsNullOrEmpty(group.Key)))
{
try
{
var dest = fs.GetRemoteStorageConfig(group.Key);
var client = fs.CreateMinioClient(dest);
if (client == null) continue;
// Create delete tasks for each file in the group
// var deleteTasks = group.Value.Select(file =>
// {
// var objectId = file.StorageId ?? file.Id;
// var tasks = new List<Task>
// {
// client.RemoveObjectAsync(new Minio.DataModel.Args.RemoveObjectArgs()
// .WithBucket(dest.Bucket)
// .WithObject(objectId))
// };
//
// if (file.HasCompression)
// {
// tasks.Add(client.RemoveObjectAsync(new Minio.DataModel.Args.RemoveObjectArgs()
// .WithBucket(dest.Bucket)
// .WithObject(objectId + ".compressed")));
// }
//
// return Task.WhenAll(tasks);
// });
//
// await Task.WhenAll(deleteTasks);
}
catch (Exception ex)
{
logger.LogError(ex, "Error deleting files from remote storage {remote}", group.Key);
}
}
// Delete all file records from the database
var fileIdsToDelete = files.Select(f => f.Id).ToList();
await db.Files
.Where(f => fileIdsToDelete.Contains(f.Id))
.ExecuteDeleteAsync();
logger.LogInformation($"Completed deleting {files.Count} files");
logger.LogInformation("Completed marking {MarkedCount} files for recycling", markedCount);
}
}
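The remote-storage cleanup removed above (and still visible as commented-out code) would presumably return in the later, verified deletion step. A rough sketch of removing one file's stored objects with the MinIO client, following the objectId and ".compressed" naming from the commented-out block; the client creation and bucket configuration (fs.CreateMinioClient, dest.Bucket) come from the surrounding codebase, not this diff, and the exact client type is an assumption:

// Hypothetical helper: removes a file's stored object and its compressed variant, if any.
async Task RemoveRemoteObjectsAsync(Minio.IMinioClient client, string bucket, CloudFile file)
{
    var objectId = file.StorageId ?? file.Id;

    await client.RemoveObjectAsync(new Minio.DataModel.Args.RemoveObjectArgs()
        .WithBucket(bucket)
        .WithObject(objectId));

    if (file.HasCompression)
    {
        // Compressed variants use the "<objectId>.compressed" naming seen in the commented-out code.
        await client.RemoveObjectAsync(new Minio.DataModel.Args.RemoveObjectArgs()
            .WithBucket(bucket)
            .WithObject(objectId + ".compressed"));
    }
}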