diff --git a/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs b/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs
index 3f6094a..d1f83f7 100644
--- a/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs
+++ b/DysonNetwork.Drive/Startup/BroadcastEventHandler.cs
@@ -31,16 +31,15 @@ public class BroadcastEventHandler(
         [".gif", ".apng", ".avif"];
-
     protected override async Task ExecuteAsync(CancellationToken stoppingToken)
     {
         var js = nats.CreateJetStreamContext();
         await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]);
-
+
        var accountEventConsumer = await js.CreateOrUpdateConsumerAsync("account_events",
            new ConsumerConfig("drive_account_deleted_handler"), cancellationToken: stoppingToken);
-
+
        await js.EnsureStreamCreated("file_events", [FileUploadedEvent.Type]);
        var fileUploadedConsumer = await js.CreateOrUpdateConsumerAsync("file_events",
            new ConsumerConfig("drive_file_uploaded_handler") { MaxDeliver = 3 }, cancellationToken: stoppingToken);
@@ -55,13 +54,14 @@ public class BroadcastEventHandler(
    {
        await foreach (var msg in consumer.ConsumeAsync(cancellationToken: stoppingToken))
        {
-            var payload = JsonSerializer.Deserialize(msg.Data, GrpcTypeHelper.SerializerOptions);
+            var payload =
+                JsonSerializer.Deserialize(msg.Data, GrpcTypeHelper.SerializerOptions);
            if (payload == null)
            {
                await msg.AckAsync(cancellationToken: stoppingToken);
                continue;
            }
-
+
            try
            {
                await ProcessAndUploadInBackgroundAsync(
@@ -131,8 +131,8 @@ public class BroadcastEventHandler(
            }
        }
    }
-
-    private async Task ProcessAndUploadInBackgroundAsync(
+
+    private async Task ProcessAndUploadInBackgroundAsync(
        string fileId,
        Guid remoteId,
        string storageId,
@@ -307,19 +307,18 @@ public class BroadcastEventHandler(
        {
            await persistentTaskService.MarkTaskCompletedAsync(uploadTask.TaskId, new Dictionary
            {
-                { "fileId", fileId },
-                { "fileName", fileToUpdate.Name },
-                { "fileSize", fileToUpdate.Size },
-                { "mimeType", newMimeType },
-                { "hasCompression", hasCompression },
-                { "hasThumbnail", hasThumbnail }
+                { "FileId", fileId },
+                { "FileName", fileToUpdate.Name },
+                { "FileInfo", fileToUpdate },
+                { "FileSize", fileToUpdate.Size },
+                { "MimeType", newMimeType },
+                { "HasCompression", hasCompression },
+                { "HasThumbnail", hasThumbnail }
            });

            // Send push notification for large files (>5MB) that took longer to process
            if (fileToUpdate.Size > 5 * 1024 * 1024) // 5MB threshold
-            {
                await SendLargeFileProcessingCompleteNotificationAsync(uploadTask, fileToUpdate);
-            }
        }
    }

@@ -328,7 +327,7 @@ public class BroadcastEventHandler(
        try
        {
            var ringService = serviceProvider.GetRequiredService();
-
+
            var pushNotification = new PushNotification
            {
                Topic = "drive.tasks.upload",
@@ -349,4 +348,4 @@ public class BroadcastEventHandler(
            logger.LogWarning(ex, "Failed to send large file processing notification for task {TaskId}", task.TaskId);
        }
    }
-}
+}
\ No newline at end of file
diff --git a/DysonNetwork.Drive/Storage/FileUploadController.cs b/DysonNetwork.Drive/Storage/FileUploadController.cs
index 469c09a..2c5c10c 100644
--- a/DysonNetwork.Drive/Storage/FileUploadController.cs
+++ b/DysonNetwork.Drive/Storage/FileUploadController.cs
@@ -1,5 +1,4 @@
 using System.ComponentModel.DataAnnotations;
-using System.Text.Json;
 using DysonNetwork.Drive.Billing;
 using DysonNetwork.Drive.Storage.Model;
 using DysonNetwork.Shared.Auth;
@@ -31,15 +30,13 @@ public class FileUploadController(
 {
    private readonly string _tempPath = configuration.GetValue("Storage:Uploads") ??
        Path.Combine(Path.GetTempPath(), "multipart-uploads");

-    private readonly ILogger _logger = logger;
    private const long DefaultChunkSize = 1024 * 1024 * 5; // 5MB

    [HttpPost("create")]
    public async Task CreateUploadTask([FromBody] CreateUploadTaskRequest request)
    {
-        var currentUser = HttpContext.Items["CurrentUser"] as Account;
-        if (currentUser is null)
+        if (HttpContext.Items["CurrentUser"] is not Account currentUser)
            return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };

        var permissionCheck = await ValidateUserPermissions(currentUser);
@@ -99,25 +96,25 @@ public class FileUploadController(
            new ObjectResult(ApiError.Unauthorized(forbidden: true)) { StatusCode = 403 };
    }

-    private async Task ValidatePoolAccess(Account currentUser, FilePool pool, CreateUploadTaskRequest request)
+    private Task ValidatePoolAccess(Account currentUser, FilePool pool, CreateUploadTaskRequest request)
    {
-        if (pool.PolicyConfig.RequirePrivilege <= 0) return null;
+        if (pool.PolicyConfig.RequirePrivilege <= 0) return Task.FromResult(null);

        var privilege = currentUser.PerkSubscription is null
            ? 0
            : PerkSubscriptionPrivilege.GetPrivilegeFromIdentifier(currentUser.PerkSubscription.Identifier);
        if (privilege < pool.PolicyConfig.RequirePrivilege)
        {
-            return new ObjectResult(ApiError.Unauthorized(
-                $"You need Stellar Program tier {pool.PolicyConfig.RequirePrivilege} to use pool {pool.Name}, you are tier {privilege}",
-                forbidden: true))
-            { StatusCode = 403 };
+            return Task.FromResult(new ObjectResult(ApiError.Unauthorized(
+                $"You need Stellar Program tier {pool.PolicyConfig.RequirePrivilege} to use pool {pool.Name}, you are tier {privilege}",
+                forbidden: true))
+            { StatusCode = 403 });
        }

-        return null;
+        return Task.FromResult(null);
    }

-    private IActionResult? ValidatePoolPolicy(PolicyConfig policy, CreateUploadTaskRequest request)
+    private static IActionResult? ValidatePoolPolicy(PolicyConfig policy, CreateUploadTaskRequest request)
    {
        if (!policy.AllowEncryption && !string.IsNullOrEmpty(request.EncryptPassword))
        {
@@ -138,13 +135,11 @@ public class FileUploadController(
        var foundMatch = policy.AcceptTypes.Any(acceptType =>
        {
-            if (acceptType.EndsWith("/*", StringComparison.OrdinalIgnoreCase))
-            {
-                var type = acceptType[..^2];
-                return request.ContentType.StartsWith($"{type}/", StringComparison.OrdinalIgnoreCase);
-            }
+            if (!acceptType.EndsWith("/*", StringComparison.OrdinalIgnoreCase))
+                return acceptType.Equals(request.ContentType, StringComparison.OrdinalIgnoreCase);
+            var type = acceptType[..^2];
+            return request.ContentType.StartsWith($"{type}/", StringComparison.OrdinalIgnoreCase);
-
-            return acceptType.Equals(request.ContentType, StringComparison.OrdinalIgnoreCase);
        });

        if (!foundMatch)
@@ -191,41 +186,13 @@ public class FileUploadController(
        }
    }

-    private async Task<(string taskId, UploadTask task)> CreateUploadTaskInternal(CreateUploadTaskRequest request)
-    {
-        var taskId = await Nanoid.GenerateAsync();
-        var taskPath = Path.Combine(_tempPath, taskId);
-        Directory.CreateDirectory(taskPath);
-
-        var chunkSize = request.ChunkSize ??
-            DefaultChunkSize;
-        var chunksCount = (int)Math.Ceiling((double)request.FileSize / chunkSize);
-
-        var task = new UploadTask
-        {
-            TaskId = taskId,
-            FileName = request.FileName,
-            FileSize = request.FileSize,
-            ContentType = request.ContentType,
-            ChunkSize = chunkSize,
-            ChunksCount = chunksCount,
-            PoolId = request.PoolId.Value,
-            BundleId = request.BundleId,
-            EncryptPassword = request.EncryptPassword,
-            ExpiredAt = request.ExpiredAt,
-            Hash = request.Hash,
-        };
-
-        await System.IO.File.WriteAllTextAsync(Path.Combine(taskPath, "task.json"), JsonSerializer.Serialize(task));
-        return (taskId, task);
-    }
-
    public class UploadChunkRequest
    {
        [Required] public IFormFile Chunk { get; set; } = null!;
    }

-    [HttpPost("chunk/{taskId}/{chunkIndex}")]
+    [HttpPost("chunk/{taskId}/{chunkIndex:int}")]
    [RequestSizeLimit(DefaultChunkSize + 1024 * 1024)] // 6MB to be safe
    [RequestFormLimits(MultipartBodyLengthLimit = DefaultChunkSize + 1024 * 1024)]
    public async Task UploadChunk(string taskId, int chunkIndex, [FromForm] UploadChunkRequest request)
@@ -278,7 +245,7 @@ public class FileUploadController(
        try
        {
-            await MergeChunks(taskPath, mergedFilePath, persistentTask.ChunksCount);
+            await MergeChunks(taskId, taskPath, mergedFilePath, persistentTask.ChunksCount, persistentTaskService);

            var fileId = await Nanoid.GenerateAsync();
            var cloudFile = await fileService.ProcessNewFileAsync(
@@ -304,7 +271,7 @@ public class FileUploadController(
        catch (Exception ex)
        {
            // Log the actual exception for debugging
-            _logger.LogError(ex, "Failed to complete upload for task {TaskId}. Error: {ErrorMessage}", taskId, ex.Message);
+            logger.LogError(ex, "Failed to complete upload for task {TaskId}. Error: {ErrorMessage}", taskId, ex.Message);

            // Mark task as failed
            await persistentTaskService.MarkTaskFailedAsync(taskId);
@@ -328,24 +295,41 @@ public class FileUploadController(
        }
    }

-    private async Task MergeChunks(string taskPath, string mergedFilePath, int chunksCount)
+    private static async Task MergeChunks(
+        string taskId,
+        string taskPath,
+        string mergedFilePath,
+        int chunksCount,
+        PersistentTaskService persistentTaskService)
    {
        await using var mergedStream = new FileStream(mergedFilePath, FileMode.Create);

+        const double baseProgress = 0.8; // Start from 80% (chunk upload is already at 95%)
+        const double remainingProgress = 0.15; // Remaining 15% progress distributed across chunks
+        var progressPerChunk = remainingProgress / chunksCount;
+
        for (var i = 0; i < chunksCount; i++)
        {
-            var chunkPath = Path.Combine(taskPath, $"{i}.chunk");
+            var chunkPath = Path.Combine(taskPath, i + ".chunk");
            if (!System.IO.File.Exists(chunkPath))
            {
-                throw new InvalidOperationException($"Chunk {i} is missing.");
+                throw new InvalidOperationException("Chunk " + i + " is missing.");
            }

            await using var chunkStream = new FileStream(chunkPath, FileMode.Open);
            await chunkStream.CopyToAsync(mergedStream);
+
+            // Update progress after each chunk is merged
+            var currentProgress = baseProgress + (progressPerChunk * (i + 1));
+            await persistentTaskService.UpdateTaskProgressAsync(
+                taskId,
+                currentProgress,
+                "Merging chunks... (" + (i + 1) + "/" + chunksCount + ")"
+            );
        }
    }

-    private async Task CleanupTempFiles(string taskPath, string mergedFilePath)
+    private static Task CleanupTempFiles(string taskPath, string mergedFilePath)
    {
        try
        {
@@ -359,6 +343,8 @@ public class FileUploadController(
        {
            // Ignore cleanup errors to avoid masking the original exception
        }
+
+        return Task.CompletedTask;
    }

    // New endpoints for resumable uploads
@@ -395,7 +381,7 @@ public class FileUploadController(
                t.LastActivity,
                t.CreatedAt,
                t.UpdatedAt,
-                UploadedChunks = t.UploadedChunks,
+                t.UploadedChunks,
                Pool = new { t.PoolId, Name = "Pool Name" }, // Could be expanded to include pool details
                Bundle = t.BundleId.HasValue ? new { t.BundleId } : null
            }));
@@ -463,7 +449,7 @@ public class FileUploadController(
            task.ChunkSize,
            task.ChunksCount,
            task.ChunksUploaded,
-            UploadedChunks = task.UploadedChunks,
+            task.UploadedChunks,
            Progress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0
        });
    }
@@ -505,14 +491,14 @@ public class FileUploadController(
        return Ok(new
        {
-            TotalTasks = stats.TotalTasks,
-            InProgressTasks = stats.InProgressTasks,
-            CompletedTasks = stats.CompletedTasks,
-            FailedTasks = stats.FailedTasks,
-            ExpiredTasks = stats.ExpiredTasks,
-            TotalUploadedBytes = stats.TotalUploadedBytes,
-            AverageProgress = stats.AverageProgress,
-            RecentActivity = stats.RecentActivity
+            stats.TotalTasks,
+            stats.InProgressTasks,
+            stats.CompletedTasks,
+            stats.FailedTasks,
+            stats.ExpiredTasks,
+            stats.TotalUploadedBytes,
+            stats.AverageProgress,
+            stats.RecentActivity
        });
    }
@@ -591,7 +577,7 @@ public class FileUploadController(
                task.UpdatedAt,
                task.ExpiredAt,
                task.Hash,
-                UploadedChunks = task.UploadedChunks
+                task.UploadedChunks
            },
            Pool = pool != null ? new
            {
@@ -610,9 +596,9 @@ public class FileUploadController(
        });
    }

-    private string? CalculateEstimatedTime(PersistentUploadTask task)
+    private static string? CalculateEstimatedTime(PersistentUploadTask task)
    {
-        if (task.Status != Model.TaskStatus.InProgress || task.ChunksUploaded == 0)
+        if (task.Status != TaskStatus.InProgress || task.ChunksUploaded == 0)
            return null;

        var elapsed = NodaTime.SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
@@ -625,27 +611,29 @@ public class FileUploadController(
        var remainingSeconds = remainingChunks / chunksPerSecond;

-        if (remainingSeconds < 60)
-            return $"{remainingSeconds:F0} seconds";
-        if (remainingSeconds < 3600)
-            return $"{remainingSeconds / 60:F0} minutes";
-        return $"{remainingSeconds / 3600:F1} hours";
+        return remainingSeconds switch
+        {
+            < 60 => $"{remainingSeconds:F0} seconds",
+            < 3600 => $"{remainingSeconds / 60:F0} minutes",
+            _ => $"{remainingSeconds / 3600:F1} hours"
+        };
    }

-    private string? CalculateUploadSpeed(PersistentUploadTask task)
+    private static string? CalculateUploadSpeed(PersistentUploadTask task)
    {
        if (task.ChunksUploaded == 0) return null;

-        var elapsed = NodaTime.SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
+        var elapsed = SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
        var elapsedSeconds = elapsed.TotalSeconds;
-        var bytesUploaded = (long)task.ChunksUploaded * task.ChunkSize;
+        var bytesUploaded = task.ChunksUploaded * task.ChunkSize;
        var bytesPerSecond = bytesUploaded / elapsedSeconds;

-        if (bytesPerSecond < 1024)
-            return $"{bytesPerSecond:F0} B/s";
-        if (bytesPerSecond < 1024 * 1024)
-            return $"{bytesPerSecond / 1024:F0} KB/s";
-        return $"{bytesPerSecond / (1024 * 1024):F1} MB/s";
+        return bytesPerSecond switch
+        {
+            < 1024 => $"{bytesPerSecond:F0} B/s",
+            < 1024 * 1024 => $"{bytesPerSecond / 1024:F0} KB/s",
+            _ => $"{bytesPerSecond / (1024 * 1024):F1} MB/s"
+        };
    }
 }
diff --git a/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs b/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs
index 6d7da2e..d3c932e 100644
--- a/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs
+++ b/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs
@@ -157,24 +157,6 @@ public class PersistentTask : ModelBase
    // Estimated duration in seconds
    public long? EstimatedDurationSeconds { get; set; }
-
-    // Helper methods for parameter management using GrpcTypeHelper
-    public MapField GetParametersAsGrpcMap()
-    {
-        var nonNullableParameters = Parameters.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty);
-        return GrpcTypeHelper.ConvertToValueMap(nonNullableParameters);
-    }
-
-    public void SetParametersFromGrpcMap(MapField map)
-    {
-        Parameters = GrpcTypeHelper.ConvertFromValueMap(map);
-    }
-
-    public MapField GetResultsAsGrpcMap()
-    {
-        var nonNullableResults = Results.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty);
-        return GrpcTypeHelper.ConvertToValueMap(nonNullableResults);
-    }
 }

 // Backward compatibility - UploadTask inherits from PersistentTask
diff --git a/DysonNetwork.Drive/Storage/PersistentTaskService.cs b/DysonNetwork.Drive/Storage/PersistentTaskService.cs
index 4dcd22c..1e9a349 100644
--- a/DysonNetwork.Drive/Storage/PersistentTaskService.cs
+++ b/DysonNetwork.Drive/Storage/PersistentTaskService.cs
@@ -58,7 +58,6 @@ public class PersistentTaskService(
        if (task is not T typedTask) return null;
        await SetCacheAsync(typedTask);
        return typedTask;
-
    }

    ///
@@ -70,20 +69,35 @@ public class PersistentTaskService(
        if (task is null) return;

        var previousProgress = task.Progress;
-        task.Progress = Math.Clamp(progress, 0, 100);
-        task.LastActivity = SystemClock.Instance.GetCurrentInstant();
-        task.UpdatedAt = task.LastActivity;
+        var clampedProgress = Math.Clamp(progress, 0, 1.0);
+        var now = SystemClock.Instance.GetCurrentInstant();

-        if (statusMessage is not null)
+        // Use ExecuteUpdateAsync for better performance - update only the fields we need
+        var updatedRows = await db.Tasks
+            .Where(t => t.TaskId == taskId)
+            .ExecuteUpdateAsync(setters => setters
+                .SetProperty(t => t.Progress, clampedProgress)
+                .SetProperty(t => t.LastActivity, now)
+                .SetProperty(t => t.UpdatedAt, now)
+                .SetProperty(t => t.Description, t => statusMessage ?? t.Description)
+            );
+
+        if (updatedRows > 0)
        {
-            task.Description = statusMessage;
+            // Update the cached task
+            task.Progress = clampedProgress;
+            task.LastActivity = now;
+            task.UpdatedAt = now;
+            if (statusMessage is not null)
+            {
+                task.Description = statusMessage;
+            }
+
+            await SetCacheAsync(task);
+
+            // Send progress update notification
+            await SendTaskProgressUpdateAsync(task, task.Progress, previousProgress);
        }
-
-        await db.SaveChangesAsync();
-        await SetCacheAsync(task);
-
-        // Send progress update notification
-        await SendTaskProgressUpdateAsync(task, task.Progress, previousProgress);
    }

    ///
@@ -95,24 +109,38 @@ public class PersistentTaskService(
        if (task is null) return;

        var now = SystemClock.Instance.GetCurrentInstant();
-        task.Status = TaskStatus.Completed;
-        task.Progress = 100;
-        task.CompletedAt = now;
-        task.LastActivity = now;
-        task.UpdatedAt = now;

-        if (results is not null)
+        // Use ExecuteUpdateAsync for better performance - update only the fields we need
+        var updatedRows = await db.Tasks
+            .Where(t => t.TaskId == taskId)
+            .ExecuteUpdateAsync(setters => setters
+                .SetProperty(t => t.Status, TaskStatus.Completed)
+                .SetProperty(t => t.Progress, 1.0)
+                .SetProperty(t => t.CompletedAt, now)
+                .SetProperty(t => t.LastActivity, now)
+                .SetProperty(t => t.UpdatedAt, now)
+            );
+
+        if (updatedRows > 0)
        {
-            foreach (var (key, value) in results)
+            // Update the cached task with results if provided
+            task.Status = TaskStatus.Completed;
+            task.Progress = 1.0;
+            task.CompletedAt = now;
+            task.LastActivity = now;
+            task.UpdatedAt = now;
+
+            if (results is not null)
            {
-                task.Results[key] = value;
+                foreach (var (key, value) in results)
+                {
+                    task.Results[key] = value;
+                }
            }
+
+            await RemoveCacheAsync(taskId);
+            await SendTaskCompletedNotificationAsync(task);
        }
-
-        await db.SaveChangesAsync();
-        await RemoveCacheAsync(taskId);
-
-        await SendTaskCompletedNotificationAsync(task);
    }

    ///
@@ -123,15 +151,30 @@ public class PersistentTaskService(
        var task = await GetTaskAsync(taskId);
        if (task is null) return;

-        task.Status = TaskStatus.Failed;
-        task.ErrorMessage = errorMessage ?? "Task failed due to an unknown error";
-        task.LastActivity = SystemClock.Instance.GetCurrentInstant();
-        task.UpdatedAt = task.LastActivity;
+        var now = SystemClock.Instance.GetCurrentInstant();
+        var errorMsg = errorMessage ?? "Task failed due to an unknown error";

-        await db.SaveChangesAsync();
-        await RemoveCacheAsync(taskId);
+        // Use ExecuteUpdateAsync for better performance - update only the fields we need
+        var updatedRows = await db.Tasks
+            .Where(t => t.TaskId == taskId)
+            .ExecuteUpdateAsync(setters => setters
+                .SetProperty(t => t.Status, TaskStatus.Failed)
+                .SetProperty(t => t.ErrorMessage, errorMsg)
+                .SetProperty(t => t.LastActivity, now)
+                .SetProperty(t => t.UpdatedAt, now)
+            );

-        await SendTaskFailedNotificationAsync(task);
+        if (updatedRows > 0)
+        {
+            // Update the cached task
+            task.Status = TaskStatus.Failed;
+            task.ErrorMessage = errorMsg;
+            task.LastActivity = now;
+            task.UpdatedAt = now;
+
+            await RemoveCacheAsync(taskId);
+            await SendTaskFailedNotificationAsync(task);
+        }
    }

    ///
@@ -278,20 +321,20 @@ public class PersistentTaskService(
            ExpiredTasks = tasks.Count(t => t.Status == TaskStatus.Expired),
            AverageProgress = tasks.Any(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused) ?
                tasks.Where(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused)
-                .Average(t => t.Progress)
+                    .Average(t => t.Progress)
                : 0,
            RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
-                .Take(10)
-                .Select(t => new TaskActivity
-                {
-                    TaskId = t.TaskId,
-                    Name = t.Name,
-                    Type = t.Type,
-                    Status = t.Status,
-                    Progress = t.Progress,
-                    LastActivity = t.LastActivity
-                })
-                .ToList()
+                    .Take(10)
+                    .Select(t => new TaskActivity
+                    {
+                        TaskId = t.TaskId,
+                        Name = t.Name,
+                        Type = t.Type,
+                        Status = t.Status,
+                        Progress = t.Progress,
+                        LastActivity = t.LastActivity
+                    })
+                    .ToList()
        };

        return stats;
@@ -311,11 +354,11 @@ public class PersistentTaskService(
        var oldTasks = await db.Tasks
            .Where(t => t.AccountId == accountId &&
-                       (t.Status == TaskStatus.Completed ||
-                        t.Status == TaskStatus.Failed ||
-                        t.Status == TaskStatus.Cancelled ||
-                        t.Status == TaskStatus.Expired) &&
-                       t.UpdatedAt < cutoff)
+                        (t.Status == TaskStatus.Completed ||
+                         t.Status == TaskStatus.Failed ||
+                         t.Status == TaskStatus.Cancelled ||
+                         t.Status == TaskStatus.Expired) &&
+                        t.UpdatedAt < cutoff)
            .ToListAsync();

        db.Tasks.RemoveRange(oldTasks);
@@ -347,7 +390,7 @@ public class PersistentTaskService(
        var packet = new WebSocketPacket
        {
            Type = "task.created",
-            Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
+            Data = GrpcTypeHelper.ConvertObjectToByteString(data)
        };

        await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -383,7 +426,7 @@ public class PersistentTaskService(
        var packet = new WebSocketPacket
        {
            Type = "task.progress",
-            Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
+            Data = GrpcTypeHelper.ConvertObjectToByteString(data)
        };

        await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -415,7 +458,7 @@ public class PersistentTaskService(
        var wsPacket = new WebSocketPacket
        {
            Type = "task.completed",
-            Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
+            Data = GrpcTypeHelper.ConvertObjectToByteString(data)
        };

        await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -463,7 +506,7 @@ public class PersistentTaskService(
        var wsPacket = new WebSocketPacket
        {
            Type = "task.failed",
-            Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
+            Data = GrpcTypeHelper.ConvertObjectToByteString(data)
        };

        await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -501,7 +544,7 @@ public class PersistentTaskService(
    private async Task SetCacheAsync(PersistentTask task)
    {
        var cacheKey = $"{CacheKeyPrefix}{task.TaskId}";
-
+
        // Cache the entire task object directly - this includes all properties including Parameters dictionary
        await cache.SetAsync(cacheKey, task, CacheDuration);
    }
@@ -534,7 +577,7 @@ public class PersistentTaskService(
        // If no pools exist, create a default one
        logger.LogWarning("No pools found in database. Creating default pool...");
-
+
        var defaultPoolId = Guid.NewGuid();
        var defaultPool = new DysonNetwork.Shared.Models.FilePool
        {
@@ -590,7 +633,7 @@ public class PersistentTaskService(
    {
        var chunkSize = request.ChunkSize ?? 1024 * 1024 * 5; // 5MB default
        var chunksCount = (int)Math.Ceiling((double)request.FileSize / chunkSize);
-
+
        // Use default pool if no pool is specified, or find first available pool
        var poolId = request.PoolId ?? await GetFirstAvailablePoolIdAsync();
@@ -654,16 +697,31 @@ public class PersistentTaskService(
        {
            var previousProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;

-            task.UploadedChunks.Add(chunkIndex);
-            task.ChunksUploaded = task.UploadedChunks.Count;
-            task.LastActivity = SystemClock.Instance.GetCurrentInstant();
+            // Use ExecuteUpdateAsync for better performance - update only the fields we need
+            var now = SystemClock.Instance.GetCurrentInstant();
+            var updatedRows = await db.Tasks
+                .OfType()
+                .Where(t => t.TaskId == taskId)
+                .ExecuteUpdateAsync(setters => setters
+                    .SetProperty(t => t.UploadedChunks, t => t.UploadedChunks.Append(chunkIndex).Distinct().ToList())
+                    .SetProperty(t => t.ChunksUploaded, t => t.UploadedChunks.Count)
+                    .SetProperty(t => t.LastActivity, now)
+                    .SetProperty(t => t.UpdatedAt, now)
+                );

-            await db.SaveChangesAsync();
-            await SetCacheAsync(task);
+            if (updatedRows > 0)
+            {
+                // Update the cached task
+                task.UploadedChunks.Add(chunkIndex);
+                task.ChunksUploaded = task.UploadedChunks.Count;
+                task.LastActivity = now;
+                task.UpdatedAt = now;
+                await SetCacheAsync(task);

-            // Send real-time progress update
-            var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
-            await SendUploadProgressUpdateAsync(task, newProgress, previousProgress);
+                // Send real-time progress update
+                var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
+                await SendUploadProgressUpdateAsync(task, newProgress, previousProgress);
+            }
        }
    }
@@ -771,19 +829,19 @@ public class PersistentTaskService(
            TotalUploadedBytes = tasks.Sum(t => (long)t.ChunksUploaded * t.ChunkSize),
            AverageProgress = tasks.Any(t => t.Status == Model.TaskStatus.InProgress) ?
                tasks.Where(t => t.Status == Model.TaskStatus.InProgress)
-                .Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0)
+                    .Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0)
                : 0,
            RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
-                .Take(5)
-                .Select(t => new RecentActivity
-                {
-                    TaskId = t.TaskId,
-                    FileName = t.FileName,
-                    Status = (UploadTaskStatus)t.Status,
-                    LastActivity = t.LastActivity,
-                    Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0
-                })
-                .ToList()
+                    .Take(5)
+                    .Select(t => new RecentActivity
+                    {
+                        TaskId = t.TaskId,
+                        FileName = t.FileName,
+                        Status = (UploadTaskStatus)t.Status,
+                        LastActivity = t.LastActivity,
+                        Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0
+                    })
+                    .ToList()
        };

        return stats;
@@ -797,7 +855,7 @@ public class PersistentTaskService(
        var failedTasks = await db.Tasks
            .OfType()
            .Where(t => t.AccountId == accountId &&
-                        (t.Status == Model.TaskStatus.Failed || t.Status == Model.TaskStatus.Expired))
+                        (t.Status == TaskStatus.Failed || t.Status == TaskStatus.Expired))
            .ToListAsync();

        foreach (var task in failedTasks)
        {
@@ -806,16 +864,14 @@ public class PersistentTaskService(
            // Clean up temp files
            var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId);
-            if (Directory.Exists(taskPath))
+            if (!Directory.Exists(taskPath)) continue;
+            try
            {
-                try
-                {
-                    Directory.Delete(taskPath, true);
-                }
-                catch (Exception ex)
-                {
-                    logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
-                }
+                Directory.Delete(taskPath, true);
+            }
+            catch (Exception ex)
+            {
+                logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
            }
        }
@@ -858,7 +914,7 @@ public class PersistentTaskService(
        var wsPacket = new WebSocketPacket
        {
            Type = "upload.completed",
-            Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(completionData))
+            Data = GrpcTypeHelper.ConvertObjectToByteString(completionData)
        };

        await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -909,7 +965,7 @@ public class PersistentTaskService(
        var wsPacket = new WebSocketPacket
        {
            Type = "upload.failed",
-            Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(failureData))
+            Data = GrpcTypeHelper.ConvertObjectToByteString(failureData)
        };

        await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -943,7 +999,8 @@ public class PersistentTaskService(
    ///
    /// Sends real-time upload progress update via WebSocket
    ///
-    private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress, double previousProgress)
+    private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress,
+        double previousProgress)
    {
        try
        {
@@ -966,7 +1023,7 @@ public class PersistentTaskService(
        var packet = new WebSocketPacket
        {
            Type = "upload.progress",
-            Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(progressData))
+            Data = GrpcTypeHelper.ConvertObjectToByteString(progressData)
        };

        await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
@@ -1101,4 +1158,4 @@ public class RecentActivity
    public double Progress { get; set; }
 }

-#endregion
+#endregion
\ No newline at end of file
diff --git a/DysonNetwork.Drive/Storage/README.md b/DysonNetwork.Drive/Storage/README.md
index a36421d..79140a5 100644
--- a/DysonNetwork.Drive/Storage/README.md
+++ b/DysonNetwork.Drive/Storage/README.md
@@ -788,7 +788,7 @@ public class PersistentTaskService(

 ### Real-Time Task Notifications

-All task operations send WebSocket notifications via RingService:
+All task operations send WebSocket notifications via RingService using the shared `GrpcTypeHelper` for consistent JSON serialization:

 #### Task Created
 ```json
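For readers skimming the patch, the recurring change in `PersistentTaskService` is the move from a load–mutate–`SaveChangesAsync` flow to a single set-based `ExecuteUpdateAsync` write, with the cache refresh and WebSocket notification gated on `updatedRows > 0`. The sketch below isolates that pattern; it is a simplified, self-contained illustration only — `TaskRow`, `TasksDb`, and `UpdateProgressAsync` are placeholder names rather than the project's real models or helpers, and it assumes EF Core 7+ with a relational provider.

```csharp
// Minimal sketch of the set-based update pattern adopted in this diff.
// TaskRow/TasksDb are illustrative stand-ins for PersistentTask and the Drive DbContext.
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

public class TaskRow
{
    public int Id { get; set; }
    public string TaskId { get; set; } = "";
    public double Progress { get; set; }      // 0.0 .. 1.0, matching the new progress scale
    public DateTime UpdatedAt { get; set; }
}

public class TasksDb : DbContext
{
    public TasksDb(DbContextOptions<TasksDb> options) : base(options) { }
    public DbSet<TaskRow> Tasks => Set<TaskRow>();
}

public static class ProgressUpdater
{
    public static async Task<bool> UpdateProgressAsync(TasksDb db, string taskId, double progress)
    {
        var clamped = Math.Clamp(progress, 0, 1.0);
        var now = DateTime.UtcNow;

        // Issues a single UPDATE for the listed columns; nothing is loaded or tracked.
        var updatedRows = await db.Tasks
            .Where(t => t.TaskId == taskId)
            .ExecuteUpdateAsync(s => s
                .SetProperty(t => t.Progress, clamped)
                .SetProperty(t => t.UpdatedAt, now));

        // Cache refresh and the "task.progress" WebSocket push only make sense when the
        // row actually existed, which is what the diff's `updatedRows > 0` guard checks.
        return updatedRows > 0;
    }
}
```

Compared with the previous flow, no entity enters the change tracker and only the touched columns are written, which is also why the diff refreshes the cached task object by hand after a successful update.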