🐛 Fixes for drive task tracking

This commit is contained in:
2025-11-10 01:53:58 +08:00
parent 2bfb50cc71
commit 741ed18ce5
5 changed files with 231 additions and 205 deletions

View File

@@ -31,7 +31,6 @@ public class BroadcastEventHandler(
[".gif", ".apng", ".avif"]; [".gif", ".apng", ".avif"];
protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{ {
var js = nats.CreateJetStreamContext(); var js = nats.CreateJetStreamContext();
@@ -55,7 +54,8 @@ public class BroadcastEventHandler(
{ {
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken)) await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
{ {
var payload = JsonSerializer.Deserialize<FileUploadedEventPayload>(msg.Data, GrpcTypeHelper.SerializerOptions); var payload =
JsonSerializer.Deserialize<FileUploadedEventPayload>(msg.Data, GrpcTypeHelper.SerializerOptions);
if (payload == null) if (payload == null)
{ {
await msg.AckAsync(cancellationToken: stoppingToken); await msg.AckAsync(cancellationToken: stoppingToken);
@@ -132,7 +132,7 @@ public class BroadcastEventHandler(
} }
} }
private async Task ProcessAndUploadInBackgroundAsync( private async Task ProcessAndUploadInBackgroundAsync(
string fileId, string fileId,
Guid remoteId, Guid remoteId,
string storageId, string storageId,
@@ -307,19 +307,18 @@ public class BroadcastEventHandler(
{ {
await persistentTaskService.MarkTaskCompletedAsync(uploadTask.TaskId, new Dictionary<string, object?> await persistentTaskService.MarkTaskCompletedAsync(uploadTask.TaskId, new Dictionary<string, object?>
{ {
{ "fileId", fileId }, { "FileId", fileId },
{ "fileName", fileToUpdate.Name }, { "FileName", fileToUpdate.Name },
{ "fileSize", fileToUpdate.Size }, { "FileInfo", fileToUpdate },
{ "mimeType", newMimeType }, { "FileSize", fileToUpdate.Size },
{ "hasCompression", hasCompression }, { "MimeType", newMimeType },
{ "hasThumbnail", hasThumbnail } { "HasCompression", hasCompression },
{ "HasThumbnail", hasThumbnail }
}); });
// Send push notification for large files (>5MB) that took longer to process // Send push notification for large files (>5MB) that took longer to process
if (fileToUpdate.Size > 5 * 1024 * 1024) // 5MB threshold if (fileToUpdate.Size > 5 * 1024 * 1024) // 5MB threshold
{
await SendLargeFileProcessingCompleteNotificationAsync(uploadTask, fileToUpdate); await SendLargeFileProcessingCompleteNotificationAsync(uploadTask, fileToUpdate);
}
} }
} }

View File

@@ -1,5 +1,4 @@
using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations;
using System.Text.Json;
using DysonNetwork.Drive.Billing; using DysonNetwork.Drive.Billing;
using DysonNetwork.Drive.Storage.Model; using DysonNetwork.Drive.Storage.Model;
using DysonNetwork.Shared.Auth; using DysonNetwork.Shared.Auth;
@@ -31,15 +30,13 @@ public class FileUploadController(
{ {
private readonly string _tempPath = private readonly string _tempPath =
configuration.GetValue<string>("Storage:Uploads") ?? Path.Combine(Path.GetTempPath(), "multipart-uploads"); configuration.GetValue<string>("Storage:Uploads") ?? Path.Combine(Path.GetTempPath(), "multipart-uploads");
private readonly ILogger<FileUploadController> _logger = logger;
private const long DefaultChunkSize = 1024 * 1024 * 5; // 5MB private const long DefaultChunkSize = 1024 * 1024 * 5; // 5MB
[HttpPost("create")] [HttpPost("create")]
public async Task<IActionResult> CreateUploadTask([FromBody] CreateUploadTaskRequest request) public async Task<IActionResult> CreateUploadTask([FromBody] CreateUploadTaskRequest request)
{ {
var currentUser = HttpContext.Items["CurrentUser"] as Account; if (HttpContext.Items["CurrentUser"] is not Account currentUser)
if (currentUser is null)
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 }; return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
var permissionCheck = await ValidateUserPermissions(currentUser); var permissionCheck = await ValidateUserPermissions(currentUser);
@@ -99,25 +96,25 @@ public class FileUploadController(
new ObjectResult(ApiError.Unauthorized(forbidden: true)) { StatusCode = 403 }; new ObjectResult(ApiError.Unauthorized(forbidden: true)) { StatusCode = 403 };
} }
private async Task<IActionResult?> ValidatePoolAccess(Account currentUser, FilePool pool, CreateUploadTaskRequest request) private Task<IActionResult?> ValidatePoolAccess(Account currentUser, FilePool pool, CreateUploadTaskRequest request)
{ {
if (pool.PolicyConfig.RequirePrivilege <= 0) return null; if (pool.PolicyConfig.RequirePrivilege <= 0) return Task.FromResult<IActionResult?>(null);
var privilege = currentUser.PerkSubscription is null ? 0 : var privilege = currentUser.PerkSubscription is null ? 0 :
PerkSubscriptionPrivilege.GetPrivilegeFromIdentifier(currentUser.PerkSubscription.Identifier); PerkSubscriptionPrivilege.GetPrivilegeFromIdentifier(currentUser.PerkSubscription.Identifier);
if (privilege < pool.PolicyConfig.RequirePrivilege) if (privilege < pool.PolicyConfig.RequirePrivilege)
{ {
return new ObjectResult(ApiError.Unauthorized( return Task.FromResult<IActionResult?>(new ObjectResult(ApiError.Unauthorized(
$"You need Stellar Program tier {pool.PolicyConfig.RequirePrivilege} to use pool {pool.Name}, you are tier {privilege}", $"You need Stellar Program tier {pool.PolicyConfig.RequirePrivilege} to use pool {pool.Name}, you are tier {privilege}",
forbidden: true)) forbidden: true))
{ StatusCode = 403 }; { StatusCode = 403 });
} }
return null; return Task.FromResult<IActionResult?>(null);
} }
private IActionResult? ValidatePoolPolicy(PolicyConfig policy, CreateUploadTaskRequest request) private static IActionResult? ValidatePoolPolicy(PolicyConfig policy, CreateUploadTaskRequest request)
{ {
if (!policy.AllowEncryption && !string.IsNullOrEmpty(request.EncryptPassword)) if (!policy.AllowEncryption && !string.IsNullOrEmpty(request.EncryptPassword))
{ {
@@ -138,13 +135,11 @@ public class FileUploadController(
var foundMatch = policy.AcceptTypes.Any(acceptType => var foundMatch = policy.AcceptTypes.Any(acceptType =>
{ {
if (acceptType.EndsWith("/*", StringComparison.OrdinalIgnoreCase)) if (!acceptType.EndsWith("/*", StringComparison.OrdinalIgnoreCase))
{ return acceptType.Equals(request.ContentType, StringComparison.OrdinalIgnoreCase);
var type = acceptType[..^2]; var type = acceptType[..^2];
return request.ContentType.StartsWith($"{type}/", StringComparison.OrdinalIgnoreCase); return request.ContentType.StartsWith($"{type}/", StringComparison.OrdinalIgnoreCase);
}
return acceptType.Equals(request.ContentType, StringComparison.OrdinalIgnoreCase);
}); });
if (!foundMatch) if (!foundMatch)
@@ -191,41 +186,13 @@ public class FileUploadController(
} }
} }
private async Task<(string taskId, UploadTask task)> CreateUploadTaskInternal(CreateUploadTaskRequest request)
{
var taskId = await Nanoid.GenerateAsync();
var taskPath = Path.Combine(_tempPath, taskId);
Directory.CreateDirectory(taskPath);
var chunkSize = request.ChunkSize ?? DefaultChunkSize;
var chunksCount = (int)Math.Ceiling((double)request.FileSize / chunkSize);
var task = new UploadTask
{
TaskId = taskId,
FileName = request.FileName,
FileSize = request.FileSize,
ContentType = request.ContentType,
ChunkSize = chunkSize,
ChunksCount = chunksCount,
PoolId = request.PoolId.Value,
BundleId = request.BundleId,
EncryptPassword = request.EncryptPassword,
ExpiredAt = request.ExpiredAt,
Hash = request.Hash,
};
await System.IO.File.WriteAllTextAsync(Path.Combine(taskPath, "task.json"), JsonSerializer.Serialize(task));
return (taskId, task);
}
public class UploadChunkRequest public class UploadChunkRequest
{ {
[Required] [Required]
public IFormFile Chunk { get; set; } = null!; public IFormFile Chunk { get; set; } = null!;
} }
[HttpPost("chunk/{taskId}/{chunkIndex}")] [HttpPost("chunk/{taskId}/{chunkIndex:int}")]
[RequestSizeLimit(DefaultChunkSize + 1024 * 1024)] // 6MB to be safe [RequestSizeLimit(DefaultChunkSize + 1024 * 1024)] // 6MB to be safe
[RequestFormLimits(MultipartBodyLengthLimit = DefaultChunkSize + 1024 * 1024)] [RequestFormLimits(MultipartBodyLengthLimit = DefaultChunkSize + 1024 * 1024)]
public async Task<IActionResult> UploadChunk(string taskId, int chunkIndex, [FromForm] UploadChunkRequest request) public async Task<IActionResult> UploadChunk(string taskId, int chunkIndex, [FromForm] UploadChunkRequest request)
@@ -278,7 +245,7 @@ public class FileUploadController(
try try
{ {
await MergeChunks(taskPath, mergedFilePath, persistentTask.ChunksCount); await MergeChunks(taskId, taskPath, mergedFilePath, persistentTask.ChunksCount, persistentTaskService);
var fileId = await Nanoid.GenerateAsync(); var fileId = await Nanoid.GenerateAsync();
var cloudFile = await fileService.ProcessNewFileAsync( var cloudFile = await fileService.ProcessNewFileAsync(
@@ -304,7 +271,7 @@ public class FileUploadController(
catch (Exception ex) catch (Exception ex)
{ {
// Log the actual exception for debugging // Log the actual exception for debugging
_logger.LogError(ex, "Failed to complete upload for task {TaskId}. Error: {ErrorMessage}", taskId, ex.Message); logger.LogError(ex, "Failed to complete upload for task {TaskId}. Error: {ErrorMessage}", taskId, ex.Message);
// Mark task as failed // Mark task as failed
await persistentTaskService.MarkTaskFailedAsync(taskId); await persistentTaskService.MarkTaskFailedAsync(taskId);
@@ -328,24 +295,41 @@ public class FileUploadController(
} }
} }
private async Task MergeChunks(string taskPath, string mergedFilePath, int chunksCount) private static async Task MergeChunks(
string taskId,
string taskPath,
string mergedFilePath,
int chunksCount,
PersistentTaskService persistentTaskService)
{ {
await using var mergedStream = new FileStream(mergedFilePath, FileMode.Create); await using var mergedStream = new FileStream(mergedFilePath, FileMode.Create);
const double baseProgress = 0.8; // Start from 80% (chunk upload is already at 95%)
const double remainingProgress = 0.15; // Remaining 15% progress distributed across chunks
var progressPerChunk = remainingProgress / chunksCount;
for (var i = 0; i < chunksCount; i++) for (var i = 0; i < chunksCount; i++)
{ {
var chunkPath = Path.Combine(taskPath, $"{i}.chunk"); var chunkPath = Path.Combine(taskPath, i + ".chunk");
if (!System.IO.File.Exists(chunkPath)) if (!System.IO.File.Exists(chunkPath))
{ {
throw new InvalidOperationException($"Chunk {i} is missing."); throw new InvalidOperationException("Chunk " + i + " is missing.");
} }
await using var chunkStream = new FileStream(chunkPath, FileMode.Open); await using var chunkStream = new FileStream(chunkPath, FileMode.Open);
await chunkStream.CopyToAsync(mergedStream); await chunkStream.CopyToAsync(mergedStream);
// Update progress after each chunk is merged
var currentProgress = baseProgress + (progressPerChunk * (i + 1));
await persistentTaskService.UpdateTaskProgressAsync(
taskId,
currentProgress,
"Merging chunks... (" + (i + 1) + "/" + chunksCount + ")"
);
} }
} }
private async Task CleanupTempFiles(string taskPath, string mergedFilePath) private static Task CleanupTempFiles(string taskPath, string mergedFilePath)
{ {
try try
{ {
@@ -359,6 +343,8 @@ public class FileUploadController(
{ {
// Ignore cleanup errors to avoid masking the original exception // Ignore cleanup errors to avoid masking the original exception
} }
return Task.CompletedTask;
} }
// New endpoints for resumable uploads // New endpoints for resumable uploads
@@ -395,7 +381,7 @@ public class FileUploadController(
t.LastActivity, t.LastActivity,
t.CreatedAt, t.CreatedAt,
t.UpdatedAt, t.UpdatedAt,
UploadedChunks = t.UploadedChunks, t.UploadedChunks,
Pool = new { t.PoolId, Name = "Pool Name" }, // Could be expanded to include pool details Pool = new { t.PoolId, Name = "Pool Name" }, // Could be expanded to include pool details
Bundle = t.BundleId.HasValue ? new { t.BundleId } : null Bundle = t.BundleId.HasValue ? new { t.BundleId } : null
})); }));
@@ -463,7 +449,7 @@ public class FileUploadController(
task.ChunkSize, task.ChunkSize,
task.ChunksCount, task.ChunksCount,
task.ChunksUploaded, task.ChunksUploaded,
UploadedChunks = task.UploadedChunks, task.UploadedChunks,
Progress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0 Progress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0
}); });
} }
@@ -505,14 +491,14 @@ public class FileUploadController(
return Ok(new return Ok(new
{ {
TotalTasks = stats.TotalTasks, stats.TotalTasks,
InProgressTasks = stats.InProgressTasks, stats.InProgressTasks,
CompletedTasks = stats.CompletedTasks, stats.CompletedTasks,
FailedTasks = stats.FailedTasks, stats.FailedTasks,
ExpiredTasks = stats.ExpiredTasks, stats.ExpiredTasks,
TotalUploadedBytes = stats.TotalUploadedBytes, stats.TotalUploadedBytes,
AverageProgress = stats.AverageProgress, stats.AverageProgress,
RecentActivity = stats.RecentActivity stats.RecentActivity
}); });
} }
@@ -591,7 +577,7 @@ public class FileUploadController(
task.UpdatedAt, task.UpdatedAt,
task.ExpiredAt, task.ExpiredAt,
task.Hash, task.Hash,
UploadedChunks = task.UploadedChunks task.UploadedChunks
}, },
Pool = pool != null ? new Pool = pool != null ? new
{ {
@@ -610,9 +596,9 @@ public class FileUploadController(
}); });
} }
private string? CalculateEstimatedTime(PersistentUploadTask task) private static string? CalculateEstimatedTime(PersistentUploadTask task)
{ {
if (task.Status != Model.TaskStatus.InProgress || task.ChunksUploaded == 0) if (task.Status != TaskStatus.InProgress || task.ChunksUploaded == 0)
return null; return null;
var elapsed = NodaTime.SystemClock.Instance.GetCurrentInstant() - task.CreatedAt; var elapsed = NodaTime.SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
@@ -625,27 +611,29 @@ public class FileUploadController(
var remainingSeconds = remainingChunks / chunksPerSecond; var remainingSeconds = remainingChunks / chunksPerSecond;
if (remainingSeconds < 60) return remainingSeconds switch
return $"{remainingSeconds:F0} seconds"; {
if (remainingSeconds < 3600) < 60 => $"{remainingSeconds:F0} seconds",
return $"{remainingSeconds / 60:F0} minutes"; < 3600 => $"{remainingSeconds / 60:F0} minutes",
return $"{remainingSeconds / 3600:F1} hours"; _ => $"{remainingSeconds / 3600:F1} hours"
};
} }
private string? CalculateUploadSpeed(PersistentUploadTask task) private static string? CalculateUploadSpeed(PersistentUploadTask task)
{ {
if (task.ChunksUploaded == 0) if (task.ChunksUploaded == 0)
return null; return null;
var elapsed = NodaTime.SystemClock.Instance.GetCurrentInstant() - task.CreatedAt; var elapsed = SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
var elapsedSeconds = elapsed.TotalSeconds; var elapsedSeconds = elapsed.TotalSeconds;
var bytesUploaded = (long)task.ChunksUploaded * task.ChunkSize; var bytesUploaded = task.ChunksUploaded * task.ChunkSize;
var bytesPerSecond = bytesUploaded / elapsedSeconds; var bytesPerSecond = bytesUploaded / elapsedSeconds;
if (bytesPerSecond < 1024) return bytesPerSecond switch
return $"{bytesPerSecond:F0} B/s"; {
if (bytesPerSecond < 1024 * 1024) < 1024 => $"{bytesPerSecond:F0} B/s",
return $"{bytesPerSecond / 1024:F0} KB/s"; < 1024 * 1024 => $"{bytesPerSecond / 1024:F0} KB/s",
return $"{bytesPerSecond / (1024 * 1024):F1} MB/s"; _ => $"{bytesPerSecond / (1024 * 1024):F1} MB/s"
};
} }
} }

View File

@@ -157,24 +157,6 @@ public class PersistentTask : ModelBase
// Estimated duration in seconds // Estimated duration in seconds
public long? EstimatedDurationSeconds { get; set; } public long? EstimatedDurationSeconds { get; set; }
// Helper methods for parameter management using GrpcTypeHelper
public MapField<string, Google.Protobuf.WellKnownTypes.Value> GetParametersAsGrpcMap()
{
var nonNullableParameters = Parameters.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty);
return GrpcTypeHelper.ConvertToValueMap(nonNullableParameters);
}
public void SetParametersFromGrpcMap(MapField<string, Google.Protobuf.WellKnownTypes.Value> map)
{
Parameters = GrpcTypeHelper.ConvertFromValueMap(map);
}
public MapField<string, Google.Protobuf.WellKnownTypes.Value> GetResultsAsGrpcMap()
{
var nonNullableResults = Results.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty);
return GrpcTypeHelper.ConvertToValueMap(nonNullableResults);
}
} }
// Backward compatibility - UploadTask inherits from PersistentTask // Backward compatibility - UploadTask inherits from PersistentTask

View File

@@ -58,7 +58,6 @@ public class PersistentTaskService(
if (task is not T typedTask) return null; if (task is not T typedTask) return null;
await SetCacheAsync(typedTask); await SetCacheAsync(typedTask);
return typedTask; return typedTask;
} }
/// <summary> /// <summary>
@@ -70,20 +69,35 @@ public class PersistentTaskService(
if (task is null) return; if (task is null) return;
var previousProgress = task.Progress; var previousProgress = task.Progress;
task.Progress = Math.Clamp(progress, 0, 100); var clampedProgress = Math.Clamp(progress, 0, 1.0);
task.LastActivity = SystemClock.Instance.GetCurrentInstant(); var now = SystemClock.Instance.GetCurrentInstant();
task.UpdatedAt = task.LastActivity;
if (statusMessage is not null) // Use ExecuteUpdateAsync for better performance - update only the fields we need
var updatedRows = await db.Tasks
.Where(t => t.TaskId == taskId)
.ExecuteUpdateAsync(setters => setters
.SetProperty(t => t.Progress, clampedProgress)
.SetProperty(t => t.LastActivity, now)
.SetProperty(t => t.UpdatedAt, now)
.SetProperty(t => t.Description, t => statusMessage ?? t.Description)
);
if (updatedRows > 0)
{ {
task.Description = statusMessage; // Update the cached task
task.Progress = clampedProgress;
task.LastActivity = now;
task.UpdatedAt = now;
if (statusMessage is not null)
{
task.Description = statusMessage;
}
await SetCacheAsync(task);
// Send progress update notification
await SendTaskProgressUpdateAsync(task, task.Progress, previousProgress);
} }
await db.SaveChangesAsync();
await SetCacheAsync(task);
// Send progress update notification
await SendTaskProgressUpdateAsync(task, task.Progress, previousProgress);
} }
/// <summary> /// <summary>
@@ -95,24 +109,38 @@ public class PersistentTaskService(
if (task is null) return; if (task is null) return;
var now = SystemClock.Instance.GetCurrentInstant(); var now = SystemClock.Instance.GetCurrentInstant();
task.Status = TaskStatus.Completed;
task.Progress = 100;
task.CompletedAt = now;
task.LastActivity = now;
task.UpdatedAt = now;
if (results is not null) // Use ExecuteUpdateAsync for better performance - update only the fields we need
var updatedRows = await db.Tasks
.Where(t => t.TaskId == taskId)
.ExecuteUpdateAsync(setters => setters
.SetProperty(t => t.Status, TaskStatus.Completed)
.SetProperty(t => t.Progress, 1.0)
.SetProperty(t => t.CompletedAt, now)
.SetProperty(t => t.LastActivity, now)
.SetProperty(t => t.UpdatedAt, now)
);
if (updatedRows > 0)
{ {
foreach (var (key, value) in results) // Update the cached task with results if provided
task.Status = TaskStatus.Completed;
task.Progress = 1.0;
task.CompletedAt = now;
task.LastActivity = now;
task.UpdatedAt = now;
if (results is not null)
{ {
task.Results[key] = value; foreach (var (key, value) in results)
{
task.Results[key] = value;
}
} }
await RemoveCacheAsync(taskId);
await SendTaskCompletedNotificationAsync(task);
} }
await db.SaveChangesAsync();
await RemoveCacheAsync(taskId);
await SendTaskCompletedNotificationAsync(task);
} }
/// <summary> /// <summary>
@@ -123,15 +151,30 @@ public class PersistentTaskService(
var task = await GetTaskAsync<PersistentTask>(taskId); var task = await GetTaskAsync<PersistentTask>(taskId);
if (task is null) return; if (task is null) return;
task.Status = TaskStatus.Failed; var now = SystemClock.Instance.GetCurrentInstant();
task.ErrorMessage = errorMessage ?? "Task failed due to an unknown error"; var errorMsg = errorMessage ?? "Task failed due to an unknown error";
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
task.UpdatedAt = task.LastActivity;
await db.SaveChangesAsync(); // Use ExecuteUpdateAsync for better performance - update only the fields we need
await RemoveCacheAsync(taskId); var updatedRows = await db.Tasks
.Where(t => t.TaskId == taskId)
.ExecuteUpdateAsync(setters => setters
.SetProperty(t => t.Status, TaskStatus.Failed)
.SetProperty(t => t.ErrorMessage, errorMsg)
.SetProperty(t => t.LastActivity, now)
.SetProperty(t => t.UpdatedAt, now)
);
await SendTaskFailedNotificationAsync(task); if (updatedRows > 0)
{
// Update the cached task
task.Status = TaskStatus.Failed;
task.ErrorMessage = errorMsg;
task.LastActivity = now;
task.UpdatedAt = now;
await RemoveCacheAsync(taskId);
await SendTaskFailedNotificationAsync(task);
}
} }
/// <summary> /// <summary>
@@ -278,20 +321,20 @@ public class PersistentTaskService(
ExpiredTasks = tasks.Count(t => t.Status == TaskStatus.Expired), ExpiredTasks = tasks.Count(t => t.Status == TaskStatus.Expired),
AverageProgress = tasks.Any(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused) AverageProgress = tasks.Any(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused)
? tasks.Where(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused) ? tasks.Where(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused)
.Average(t => t.Progress) .Average(t => t.Progress)
: 0, : 0,
RecentActivity = tasks.OrderByDescending(t => t.LastActivity) RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
.Take(10) .Take(10)
.Select(t => new TaskActivity .Select(t => new TaskActivity
{ {
TaskId = t.TaskId, TaskId = t.TaskId,
Name = t.Name, Name = t.Name,
Type = t.Type, Type = t.Type,
Status = t.Status, Status = t.Status,
Progress = t.Progress, Progress = t.Progress,
LastActivity = t.LastActivity LastActivity = t.LastActivity
}) })
.ToList() .ToList()
}; };
return stats; return stats;
@@ -311,11 +354,11 @@ public class PersistentTaskService(
var oldTasks = await db.Tasks var oldTasks = await db.Tasks
.Where(t => t.AccountId == accountId && .Where(t => t.AccountId == accountId &&
(t.Status == TaskStatus.Completed || (t.Status == TaskStatus.Completed ||
t.Status == TaskStatus.Failed || t.Status == TaskStatus.Failed ||
t.Status == TaskStatus.Cancelled || t.Status == TaskStatus.Cancelled ||
t.Status == TaskStatus.Expired) && t.Status == TaskStatus.Expired) &&
t.UpdatedAt < cutoff) t.UpdatedAt < cutoff)
.ToListAsync(); .ToListAsync();
db.Tasks.RemoveRange(oldTasks); db.Tasks.RemoveRange(oldTasks);
@@ -347,7 +390,7 @@ public class PersistentTaskService(
var packet = new WebSocketPacket var packet = new WebSocketPacket
{ {
Type = "task.created", Type = "task.created",
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data)) Data = GrpcTypeHelper.ConvertObjectToByteString(data)
}; };
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -383,7 +426,7 @@ public class PersistentTaskService(
var packet = new WebSocketPacket var packet = new WebSocketPacket
{ {
Type = "task.progress", Type = "task.progress",
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data)) Data = GrpcTypeHelper.ConvertObjectToByteString(data)
}; };
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -415,7 +458,7 @@ public class PersistentTaskService(
var wsPacket = new WebSocketPacket var wsPacket = new WebSocketPacket
{ {
Type = "task.completed", Type = "task.completed",
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data)) Data = GrpcTypeHelper.ConvertObjectToByteString(data)
}; };
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -463,7 +506,7 @@ public class PersistentTaskService(
var wsPacket = new WebSocketPacket var wsPacket = new WebSocketPacket
{ {
Type = "task.failed", Type = "task.failed",
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data)) Data = GrpcTypeHelper.ConvertObjectToByteString(data)
}; };
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -654,16 +697,31 @@ public class PersistentTaskService(
{ {
var previousProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0; var previousProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
task.UploadedChunks.Add(chunkIndex); // Use ExecuteUpdateAsync for better performance - update only the fields we need
task.ChunksUploaded = task.UploadedChunks.Count; var now = SystemClock.Instance.GetCurrentInstant();
task.LastActivity = SystemClock.Instance.GetCurrentInstant(); var updatedRows = await db.Tasks
.OfType<PersistentUploadTask>()
.Where(t => t.TaskId == taskId)
.ExecuteUpdateAsync(setters => setters
.SetProperty(t => t.UploadedChunks, t => t.UploadedChunks.Append(chunkIndex).Distinct().ToList())
.SetProperty(t => t.ChunksUploaded, t => t.UploadedChunks.Count)
.SetProperty(t => t.LastActivity, now)
.SetProperty(t => t.UpdatedAt, now)
);
await db.SaveChangesAsync(); if (updatedRows > 0)
await SetCacheAsync(task); {
// Update the cached task
task.UploadedChunks.Add(chunkIndex);
task.ChunksUploaded = task.UploadedChunks.Count;
task.LastActivity = now;
task.UpdatedAt = now;
await SetCacheAsync(task);
// Send real-time progress update // Send real-time progress update
var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0; var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
await SendUploadProgressUpdateAsync(task, newProgress, previousProgress); await SendUploadProgressUpdateAsync(task, newProgress, previousProgress);
}
} }
} }
@@ -771,19 +829,19 @@ public class PersistentTaskService(
TotalUploadedBytes = tasks.Sum(t => (long)t.ChunksUploaded * t.ChunkSize), TotalUploadedBytes = tasks.Sum(t => (long)t.ChunksUploaded * t.ChunkSize),
AverageProgress = tasks.Any(t => t.Status == Model.TaskStatus.InProgress) AverageProgress = tasks.Any(t => t.Status == Model.TaskStatus.InProgress)
? tasks.Where(t => t.Status == Model.TaskStatus.InProgress) ? tasks.Where(t => t.Status == Model.TaskStatus.InProgress)
.Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0) .Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0)
: 0, : 0,
RecentActivity = tasks.OrderByDescending(t => t.LastActivity) RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
.Take(5) .Take(5)
.Select(t => new RecentActivity .Select(t => new RecentActivity
{ {
TaskId = t.TaskId, TaskId = t.TaskId,
FileName = t.FileName, FileName = t.FileName,
Status = (UploadTaskStatus)t.Status, Status = (UploadTaskStatus)t.Status,
LastActivity = t.LastActivity, LastActivity = t.LastActivity,
Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0 Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0
}) })
.ToList() .ToList()
}; };
return stats; return stats;
@@ -797,7 +855,7 @@ public class PersistentTaskService(
var failedTasks = await db.Tasks var failedTasks = await db.Tasks
.OfType<PersistentUploadTask>() .OfType<PersistentUploadTask>()
.Where(t => t.AccountId == accountId && .Where(t => t.AccountId == accountId &&
(t.Status == Model.TaskStatus.Failed || t.Status == Model.TaskStatus.Expired)) (t.Status == TaskStatus.Failed || t.Status == TaskStatus.Expired))
.ToListAsync(); .ToListAsync();
foreach (var task in failedTasks) foreach (var task in failedTasks)
@@ -806,16 +864,14 @@ public class PersistentTaskService(
// Clean up temp files // Clean up temp files
var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId); var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId);
if (Directory.Exists(taskPath)) if (!Directory.Exists(taskPath)) continue;
try
{ {
try Directory.Delete(taskPath, true);
{ }
Directory.Delete(taskPath, true); catch (Exception ex)
} {
catch (Exception ex) logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
{
logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
}
} }
} }
@@ -858,7 +914,7 @@ public class PersistentTaskService(
var wsPacket = new WebSocketPacket var wsPacket = new WebSocketPacket
{ {
Type = "upload.completed", Type = "upload.completed",
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(completionData)) Data = GrpcTypeHelper.ConvertObjectToByteString(completionData)
}; };
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -909,7 +965,7 @@ public class PersistentTaskService(
var wsPacket = new WebSocketPacket var wsPacket = new WebSocketPacket
{ {
Type = "upload.failed", Type = "upload.failed",
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(failureData)) Data = GrpcTypeHelper.ConvertObjectToByteString(failureData)
}; };
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -943,7 +999,8 @@ public class PersistentTaskService(
/// <summary> /// <summary>
/// Sends real-time upload progress update via WebSocket /// Sends real-time upload progress update via WebSocket
/// </summary> /// </summary>
private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress, double previousProgress) private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress,
double previousProgress)
{ {
try try
{ {
@@ -966,7 +1023,7 @@ public class PersistentTaskService(
var packet = new WebSocketPacket var packet = new WebSocketPacket
{ {
Type = "upload.progress", Type = "upload.progress",
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(progressData)) Data = GrpcTypeHelper.ConvertObjectToByteString(progressData)
}; };
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest

View File

@@ -788,7 +788,7 @@ public class PersistentTaskService(
### Real-Time Task Notifications ### Real-Time Task Notifications
All task operations send WebSocket notifications via RingService: All task operations send WebSocket notifications via RingService using the shared `GrpcTypeHelper` for consistent JSON serialization:
#### Task Created #### Task Created
```json ```json