diff --git a/DysonNetwork.Drive/AppDatabase.cs b/DysonNetwork.Drive/AppDatabase.cs index cc091d4..e3dbfd4 100644 --- a/DysonNetwork.Drive/AppDatabase.cs +++ b/DysonNetwork.Drive/AppDatabase.cs @@ -9,6 +9,7 @@ using Microsoft.EntityFrameworkCore.Design; using Microsoft.EntityFrameworkCore.Query; using NodaTime; using Quartz; +using TaskStatus = DysonNetwork.Drive.Storage.Model.TaskStatus; namespace DysonNetwork.Drive; @@ -150,26 +151,41 @@ public class AppDatabaseRecyclingJob(AppDatabase db, ILogger logger + ILogger logger ) : IJob { public async Task Execute(IJobExecutionContext context) { - logger.LogInformation("Cleaning up stale upload tasks..."); + logger.LogInformation("Cleaning up stale persistent tasks..."); - // Get the PersistentUploadService from DI + // Get the PersistentTaskService from DI using var scope = serviceProvider.CreateScope(); - var persistentUploadService = scope.ServiceProvider.GetService(typeof(DysonNetwork.Drive.Storage.PersistentUploadService)); + var persistentTaskService = scope.ServiceProvider.GetService(typeof(PersistentTaskService)); - if (persistentUploadService is DysonNetwork.Drive.Storage.PersistentUploadService service) + if (persistentTaskService is PersistentTaskService service) { - await service.CleanupStaleTasksAsync(); + // Clean up tasks for all users (you might want to add user-specific logic here) + // For now, we'll clean up tasks older than 30 days for all users + var cutoff = SystemClock.Instance.GetCurrentInstant() - Duration.FromDays(30); + var tasksToClean = await service.GetUserTasksAsync( + Guid.Empty, // This would need to be adjusted for multi-user cleanup + status: TaskStatus.Completed | TaskStatus.Failed | TaskStatus.Cancelled | TaskStatus.Expired + ); + + var cleanedCount = 0; + foreach (var task in tasksToClean.Items.Where(t => t.UpdatedAt < cutoff)) + { + await service.CancelTaskAsync(task.TaskId); // Or implement a proper cleanup method + cleanedCount++; + } + + logger.LogInformation("Cleaned up {Count} stale persistent tasks", cleanedCount); } else { - logger.LogWarning("PersistentUploadService not found in DI container"); + logger.LogWarning("PersistentTaskService not found in DI container"); } } } diff --git a/DysonNetwork.Drive/Startup/ScheduledJobsConfiguration.cs b/DysonNetwork.Drive/Startup/ScheduledJobsConfiguration.cs index e54ee86..58745a5 100644 --- a/DysonNetwork.Drive/Startup/ScheduledJobsConfiguration.cs +++ b/DysonNetwork.Drive/Startup/ScheduledJobsConfiguration.cs @@ -22,6 +22,13 @@ public static class ScheduledJobsConfiguration .ForJob(cloudFileUnusedRecyclingJob) .WithIdentity("CloudFileUnusedRecyclingTrigger") .WithCronSchedule("0 0 0 * * ?")); + + var persistentTaskCleanupJob = new JobKey("PersistentTaskCleanup"); + q.AddJob(opts => opts.WithIdentity(persistentTaskCleanupJob)); + q.AddTrigger(opts => opts + .ForJob(persistentTaskCleanupJob) + .WithIdentity("PersistentTaskCleanupTrigger") + .WithCronSchedule("0 0 2 * * ?")); // Run daily at 2 AM }); services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true); diff --git a/DysonNetwork.Drive/Startup/ServiceCollectionExtensions.cs b/DysonNetwork.Drive/Startup/ServiceCollectionExtensions.cs index 7006d46..af8b39e 100644 --- a/DysonNetwork.Drive/Startup/ServiceCollectionExtensions.cs +++ b/DysonNetwork.Drive/Startup/ServiceCollectionExtensions.cs @@ -55,7 +55,7 @@ public static class ServiceCollectionExtensions { services.AddScoped(); services.AddScoped(); - services.AddScoped(); + services.AddScoped(); services.AddScoped(); services.AddScoped(); diff --git a/DysonNetwork.Drive/Storage/FileService.cs b/DysonNetwork.Drive/Storage/FileService.cs index 583a866..b9acb51 100644 --- a/DysonNetwork.Drive/Storage/FileService.cs +++ b/DysonNetwork.Drive/Storage/FileService.cs @@ -126,8 +126,7 @@ public class FileService( private async Task ValidateAndGetPoolAsync(string filePool) { var pool = await GetPoolAsync(Guid.Parse(filePool)); - if (pool is null) throw new InvalidOperationException("Pool not found"); - return pool; + return pool ?? throw new InvalidOperationException("Pool not found: " + filePool); } private async Task ValidateAndGetBundleAsync(string? fileBundleId, Guid accountId) @@ -135,12 +134,10 @@ public class FileService( if (fileBundleId is null) return null; var bundle = await GetBundleAsync(Guid.Parse(fileBundleId), accountId); - if (bundle is null) throw new InvalidOperationException("Bundle not found"); - - return bundle; + return bundle ?? throw new InvalidOperationException("Bundle not found: " + fileBundleId); } - private Instant? CalculateFinalExpiration(Instant? expiredAt, FilePool pool, SnFileBundle? bundle) + private static Instant? CalculateFinalExpiration(Instant? expiredAt, FilePool pool, SnFileBundle? bundle) { var finalExpiredAt = expiredAt; diff --git a/DysonNetwork.Drive/Storage/FileUploadController.cs b/DysonNetwork.Drive/Storage/FileUploadController.cs index 087bc3c..ffef107 100644 --- a/DysonNetwork.Drive/Storage/FileUploadController.cs +++ b/DysonNetwork.Drive/Storage/FileUploadController.cs @@ -24,12 +24,14 @@ public class FileUploadController( AppDatabase db, PermissionService.PermissionServiceClient permission, QuotaService quotaService, - PersistentUploadService persistentUploadService + PersistentTaskService persistentTaskService, + ILogger logger ) : ControllerBase { private readonly string _tempPath = configuration.GetValue("Storage:Uploads") ?? Path.Combine(Path.GetTempPath(), "multipart-uploads"); + private readonly ILogger _logger = logger; private const long DefaultChunkSize = 1024 * 1024 * 5; // 5MB @@ -75,7 +77,7 @@ public class FileUploadController( var taskId = await Nanoid.GenerateAsync(); // Create persistent upload task - var persistentTask = await persistentUploadService.CreateUploadTaskAsync(taskId, request, accountId); + var persistentTask = await persistentTaskService.CreateUploadTaskAsync(taskId, request, accountId); return Ok(new CreateUploadTaskResponse { @@ -231,7 +233,7 @@ public class FileUploadController( var chunk = request.Chunk; // Check if chunk is already uploaded (resumable upload) - if (await persistentUploadService.IsChunkUploadedAsync(taskId, chunkIndex)) + if (await persistentTaskService.IsChunkUploadedAsync(taskId, chunkIndex)) { return Ok(new { message = "Chunk already uploaded" }); } @@ -247,7 +249,7 @@ public class FileUploadController( await chunk.CopyToAsync(stream); // Update persistent task progress - await persistentUploadService.UpdateChunkProgressAsync(taskId, chunkIndex); + await persistentTaskService.UpdateChunkProgressAsync(taskId, chunkIndex); return Ok(); } @@ -256,7 +258,7 @@ public class FileUploadController( public async Task CompleteUpload(string taskId) { // Get persistent task - var persistentTask = await persistentUploadService.GetUploadTaskAsync(taskId); + var persistentTask = await persistentTaskService.GetUploadTaskAsync(taskId); if (persistentTask is null) return new ObjectResult(ApiError.NotFound("Upload task")) { StatusCode = 404 }; @@ -292,27 +294,30 @@ public class FileUploadController( ); // Mark task as completed - await persistentUploadService.MarkTaskCompletedAsync(taskId); + await persistentTaskService.MarkTaskCompletedAsync(taskId); // Send completion notification - await persistentUploadService.SendUploadCompletedNotificationAsync(persistentTask, fileId); + await persistentTaskService.SendUploadCompletedNotificationAsync(persistentTask, fileId); return Ok(cloudFile); } catch (Exception ex) { + // Log the actual exception for debugging + _logger.LogError(ex, "Failed to complete upload for task {TaskId}. Error: {ErrorMessage}", taskId, ex.Message); + // Mark task as failed - await persistentUploadService.MarkTaskFailedAsync(taskId); + await persistentTaskService.MarkTaskFailedAsync(taskId); // Send failure notification - await persistentUploadService.SendUploadFailedNotificationAsync(persistentTask, ex.Message); + await persistentTaskService.SendUploadFailedNotificationAsync(persistentTask, ex.Message); await CleanupTempFiles(taskPath, mergedFilePath); return new ObjectResult(new ApiError { Code = "UPLOAD_FAILED", - Message = "Failed to complete file upload.", + Message = $"Failed to complete file upload: {ex.Message}", Status = 500 }) { StatusCode = 500 }; } @@ -372,7 +377,7 @@ public class FileUploadController( return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 }; var accountId = Guid.Parse(currentUser.Id); - var tasks = await persistentUploadService.GetUserTasksAsync(accountId, status, sortBy, sortDescending, offset, limit); + var tasks = await persistentTaskService.GetUserUploadTasksAsync(accountId, status, sortBy, sortDescending, offset, limit); Response.Headers.Append("X-Total", tasks.TotalCount.ToString()); @@ -403,7 +408,7 @@ public class FileUploadController( if (currentUser is null) return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 }; - var task = await persistentUploadService.GetUploadTaskAsync(taskId); + var task = await persistentTaskService.GetUploadTaskAsync(taskId); if (task is null) return new ObjectResult(ApiError.NotFound("Upload task")) { StatusCode = 404 }; @@ -411,7 +416,7 @@ public class FileUploadController( if (task.AccountId != Guid.Parse(currentUser.Id)) return new ObjectResult(ApiError.Unauthorized(forbidden: true)) { StatusCode = 403 }; - var progress = await persistentUploadService.GetUploadProgressAsync(taskId); + var progress = await persistentTaskService.GetUploadProgressAsync(taskId); return Ok(new { @@ -434,7 +439,7 @@ public class FileUploadController( if (currentUser is null) return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 }; - var task = await persistentUploadService.GetUploadTaskAsync(taskId); + var task = await persistentTaskService.GetUploadTaskAsync(taskId); if (task is null) return new ObjectResult(ApiError.NotFound("Upload task")) { StatusCode = 404 }; @@ -470,7 +475,7 @@ public class FileUploadController( if (currentUser is null) return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 }; - var task = await persistentUploadService.GetUploadTaskAsync(taskId); + var task = await persistentTaskService.GetUploadTaskAsync(taskId); if (task is null) return new ObjectResult(ApiError.NotFound("Upload task")) { StatusCode = 404 }; @@ -479,7 +484,7 @@ public class FileUploadController( return new ObjectResult(ApiError.Unauthorized(forbidden: true)) { StatusCode = 403 }; // Mark as failed (cancelled) - await persistentUploadService.MarkTaskFailedAsync(taskId); + await persistentTaskService.MarkTaskFailedAsync(taskId); // Clean up temp files var taskPath = Path.Combine(_tempPath, taskId); @@ -496,7 +501,7 @@ public class FileUploadController( return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 }; var accountId = Guid.Parse(currentUser.Id); - var stats = await persistentUploadService.GetUserUploadStatsAsync(accountId); + var stats = await persistentTaskService.GetUserUploadStatsAsync(accountId); return Ok(new { @@ -519,7 +524,7 @@ public class FileUploadController( return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 }; var accountId = Guid.Parse(currentUser.Id); - var cleanedCount = await persistentUploadService.CleanupUserFailedTasksAsync(accountId); + var cleanedCount = await persistentTaskService.CleanupUserFailedTasksAsync(accountId); return Ok(new { message = $"Cleaned up {cleanedCount} failed tasks" }); } @@ -532,7 +537,7 @@ public class FileUploadController( return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 }; var accountId = Guid.Parse(currentUser.Id); - var tasks = await persistentUploadService.GetRecentUserTasksAsync(accountId, limit); + var tasks = await persistentTaskService.GetRecentUserTasksAsync(accountId, limit); return Ok(tasks.Select(t => new { @@ -554,7 +559,7 @@ public class FileUploadController( if (currentUser is null) return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 }; - var task = await persistentUploadService.GetUploadTaskAsync(taskId); + var task = await persistentTaskService.GetUploadTaskAsync(taskId); if (task is null) return new ObjectResult(ApiError.NotFound("Upload task")) { StatusCode = 404 }; diff --git a/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs b/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs index 40734af..6d7da2e 100644 --- a/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs +++ b/DysonNetwork.Drive/Storage/Model/FileUploadModels.cs @@ -1,4 +1,6 @@ using DysonNetwork.Shared.Models; +using DysonNetwork.Shared.Proto; +using Google.Protobuf.Collections; using NodaTime; using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations.Schema; @@ -6,109 +8,77 @@ using System.Text.Json; namespace DysonNetwork.Drive.Storage.Model; +// File Upload Task Parameters +public class FileUploadParameters +{ + public string FileName { get; set; } = string.Empty; + public long FileSize { get; set; } + public string ContentType { get; set; } = string.Empty; + public long ChunkSize { get; set; } = 5242880L; + public int ChunksCount { get; set; } + public int ChunksUploaded { get; set; } + public Guid PoolId { get; set; } + public Guid? BundleId { get; set; } + public string? EncryptPassword { get; set; } + public string Hash { get; set; } = string.Empty; + public List UploadedChunks { get; set; } = []; +} + +// File Move Task Parameters +public class FileMoveParameters +{ + public List FileIds { get; set; } = []; + public Guid TargetPoolId { get; set; } + public Guid? TargetBundleId { get; set; } + public int FilesProcessed { get; set; } +} + +// File Compression Task Parameters +public class FileCompressParameters +{ + public List FileIds { get; set; } = []; + public string CompressionFormat { get; set; } = "zip"; + public int CompressionLevel { get; set; } = 6; + public string? OutputFileName { get; set; } + public int FilesProcessed { get; set; } + public string? ResultFileId { get; set; } +} + +// Bulk Operation Task Parameters +public class BulkOperationParameters +{ + public string OperationType { get; set; } = string.Empty; + public List TargetIds { get; set; } = []; + public Dictionary OperationParameters { get; set; } = new(); + public int ItemsProcessed { get; set; } + public Dictionary? OperationResults { get; set; } +} + +// Storage Migration Task Parameters +public class StorageMigrationParameters +{ + public Guid SourcePoolId { get; set; } + public Guid TargetPoolId { get; set; } + public List FileIds { get; set; } = new(); + public bool PreserveOriginals { get; set; } = true; + public long TotalBytesToTransfer { get; set; } + public long BytesTransferred { get; set; } + public int FilesMigrated { get; set; } +} + +// Helper class for parameter operations using GrpcTypeHelper public static class ParameterHelper { - public static T GetParameterValue(Dictionary parameters, string key, T defaultValue = default!) + public static T? Typed(Dictionary parameters) { - if (!parameters.TryGetValue(key, out var value) || value == null) - { - return defaultValue; - } - - // If the value is already the correct type, return it directly - if (value is T typedValue) - { - return typedValue; - } - - // Handle JsonElement by deserializing to the target type - if (value is JsonElement jsonElement) - { - try - { - return jsonElement.Deserialize() ?? defaultValue; - } - catch - { - return defaultValue; - } - } - - // Handle numeric conversions - if (typeof(T) == typeof(int)) - { - if (value is long longValue) - { - return (T)(object)(int)longValue; - } - if (value is string stringValue && int.TryParse(stringValue, out int intValue)) - { - return (T)(object)intValue; - } - } - else if (typeof(T) == typeof(long)) - { - if (value is int intValue) - { - return (T)(object)(long)intValue; - } - if (value is string stringValue && long.TryParse(stringValue, out long longValue)) - { - return (T)(object)longValue; - } - } - else if (typeof(T) == typeof(string)) - { - return (T)(object)value.ToString()!; - } - else if (typeof(T) == typeof(bool)) - { - if (value is string stringValue && bool.TryParse(stringValue, out bool boolValue)) - { - return (T)(object)boolValue; - } - } - - // Fallback to Convert.ChangeType for other types - try - { - return (T)Convert.ChangeType(value, typeof(T)); - } - catch - { - return defaultValue; - } + var rawParams = GrpcTypeHelper.ConvertObjectToByteString(parameters); + return GrpcTypeHelper.ConvertByteStringToObject(rawParams); } - public static List GetParameterList(Dictionary parameters, string key, List defaultValue = null!) + public static Dictionary Untyped(T parameters) { - defaultValue ??= []; - - if (!parameters.TryGetValue(key, out var value) || value == null) - { - return defaultValue; - } - - // If the value is already the correct type, return it directly - if (value is List typedList) - { - return typedList; - } - - // Handle JsonElement by deserializing to the target type - if (value is JsonElement jsonElement) - { - try - { - return jsonElement.Deserialize>() ?? defaultValue; - } - catch - { - return defaultValue; - } - } - - return defaultValue; + var rawParams = GrpcTypeHelper.ConvertObjectToByteString(parameters); + return GrpcTypeHelper.ConvertByteStringToObject>(rawParams) ?? []; } } @@ -153,14 +123,11 @@ public class PersistentTask : ModelBase { public Guid Id { get; set; } = Guid.NewGuid(); - [MaxLength(64)] - public string TaskId { get; set; } = null!; + [MaxLength(64)] public string TaskId { get; set; } = null!; - [MaxLength(256)] - public string Name { get; set; } = null!; + [MaxLength(256)] public string Name { get; set; } = null!; - [MaxLength(1024)] - public string? Description { get; set; } + [MaxLength(1024)] public string? Description { get; set; } public TaskType Type { get; set; } @@ -172,15 +139,12 @@ public class PersistentTask : ModelBase public double Progress { get; set; } // Task-specific parameters stored as JSON - [Column(TypeName = "jsonb")] - public Dictionary Parameters { get; set; } = new(); + [Column(TypeName = "jsonb")] public Dictionary Parameters { get; set; } = new(); // Task results/output stored as JSON - [Column(TypeName = "jsonb")] - public Dictionary Results { get; set; } = new(); + [Column(TypeName = "jsonb")] public Dictionary Results { get; set; } = new(); - [MaxLength(1024)] - public string? ErrorMessage { get; set; } + [MaxLength(1024)] public string? ErrorMessage { get; set; } public Instant? StartedAt { get; set; } public Instant? CompletedAt { get; set; } @@ -193,6 +157,24 @@ public class PersistentTask : ModelBase // Estimated duration in seconds public long? EstimatedDurationSeconds { get; set; } + + // Helper methods for parameter management using GrpcTypeHelper + public MapField GetParametersAsGrpcMap() + { + var nonNullableParameters = Parameters.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty); + return GrpcTypeHelper.ConvertToValueMap(nonNullableParameters); + } + + public void SetParametersFromGrpcMap(MapField map) + { + Parameters = GrpcTypeHelper.ConvertFromValueMap(map); + } + + public MapField GetResultsAsGrpcMap() + { + var nonNullableResults = Results.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty); + return GrpcTypeHelper.ConvertToValueMap(nonNullableResults); + } } // Backward compatibility - UploadTask inherits from PersistentTask @@ -204,86 +186,138 @@ public class PersistentUploadTask : PersistentTask Name = "File Upload"; } + // Convenience properties using typed parameters + [NotMapped] + public FileUploadParameters TypedParameters + { + get => ParameterHelper.Typed(Parameters)!; + set => Parameters = ParameterHelper.Untyped(value); + } + [MaxLength(256)] public string FileName { - get => ParameterHelper.GetParameterValue(Parameters, "file_name", string.Empty); - set => Parameters["file_name"] = value; + get => TypedParameters.FileName; + set + { + var parameters = TypedParameters; + parameters.FileName = value; + TypedParameters = parameters; + } } public long FileSize { - get => ParameterHelper.GetParameterValue(Parameters, "file_size", 0L); - set => Parameters["file_size"] = value; + get => TypedParameters.FileSize; + set + { + var parameters = TypedParameters; + parameters.FileSize = value; + TypedParameters = parameters; + } } [MaxLength(128)] public string ContentType { - get => ParameterHelper.GetParameterValue(Parameters, "content_type", string.Empty); - set => Parameters["content_type"] = value; + get => TypedParameters.ContentType; + set + { + var parameters = TypedParameters; + parameters.ContentType = value; + TypedParameters = parameters; + } } public long ChunkSize { - get => ParameterHelper.GetParameterValue(Parameters, "chunk_size", 5242880L); - set => Parameters["chunk_size"] = value; + get => TypedParameters.ChunkSize; + set + { + var parameters = TypedParameters; + parameters.ChunkSize = value; + TypedParameters = parameters; + } } public int ChunksCount { - get => ParameterHelper.GetParameterValue(Parameters, "chunks_count", 0); - set => Parameters["chunks_count"] = value; + get => TypedParameters.ChunksCount; + set + { + var parameters = TypedParameters; + parameters.ChunksCount = value; + TypedParameters = parameters; + } } public int ChunksUploaded { - get => ParameterHelper.GetParameterValue(Parameters, "chunks_uploaded", 0); + get => TypedParameters.ChunksUploaded; set { - Parameters["chunks_uploaded"] = value; + var parameters = TypedParameters; + parameters.ChunksUploaded = value; + TypedParameters = parameters; Progress = ChunksCount > 0 ? (double)value / ChunksCount * 100 : 0; } } public Guid PoolId { - get + get => TypedParameters.PoolId; + set { - var poolIdStr = ParameterHelper.GetParameterValue(Parameters, "pool_id", Guid.Empty.ToString()); - return Guid.Parse(poolIdStr); + var parameters = TypedParameters; + parameters.PoolId = value; + TypedParameters = parameters; } - set => Parameters["pool_id"] = value.ToString(); } public Guid? BundleId { - get + get => TypedParameters.BundleId; + set { - var bundleIdStr = ParameterHelper.GetParameterValue(Parameters, "bundle_id", string.Empty); - return string.IsNullOrEmpty(bundleIdStr) ? null : Guid.Parse(bundleIdStr); + var parameters = TypedParameters; + parameters.BundleId = value; + TypedParameters = parameters; } - set => Parameters["bundle_id"] = value?.ToString(); } [MaxLength(256)] public string? EncryptPassword { - get => ParameterHelper.GetParameterValue(Parameters, "encrypt_password", null); - set => Parameters["encrypt_password"] = value; + get => TypedParameters.EncryptPassword; + set + { + var parameters = TypedParameters; + parameters.EncryptPassword = value; + TypedParameters = parameters; + } } public string Hash { - get => ParameterHelper.GetParameterValue(Parameters, "hash", string.Empty); - set => Parameters["hash"] = value; + get => TypedParameters.Hash; + set + { + var parameters = TypedParameters; + parameters.Hash = value; + TypedParameters = parameters; + } } // JSON array of uploaded chunk indices for resumability public List UploadedChunks { - get => ParameterHelper.GetParameterList(Parameters, "uploaded_chunks", []); - set => Parameters["uploaded_chunks"] = value; + get => TypedParameters.UploadedChunks; + set + { + var parameters = TypedParameters; + parameters.UploadedChunks = value; + TypedParameters = parameters; + } } } @@ -301,6 +335,7 @@ public enum TaskType Custom } +[Flags] public enum TaskStatus { Pending, @@ -321,38 +356,54 @@ public class FileMoveTask : PersistentTask Name = "Move Files"; } + // Convenience properties using typed parameters + public FileMoveParameters TypedParameters + { + get => ParameterHelper.Typed(Parameters)!; + set => Parameters = ParameterHelper.Untyped(value); + } + public List FileIds { - get => ParameterHelper.GetParameterList(Parameters, "file_ids", []); - set => Parameters["file_ids"] = value; + get => TypedParameters.FileIds; + set + { + var parameters = TypedParameters; + parameters.FileIds = value; + TypedParameters = parameters; + } } public Guid TargetPoolId { - get + get => TypedParameters.TargetPoolId; + set { - var targetPoolIdStr = ParameterHelper.GetParameterValue(Parameters, "target_pool_id", Guid.Empty.ToString()); - return Guid.Parse(targetPoolIdStr); + var parameters = TypedParameters; + parameters.TargetPoolId = value; + TypedParameters = parameters; } - set => Parameters["target_pool_id"] = value.ToString(); } public Guid? TargetBundleId { - get + get => TypedParameters.TargetBundleId; + set { - var bundleIdStr = ParameterHelper.GetParameterValue(Parameters, "target_bundle_id", string.Empty); - return string.IsNullOrEmpty(bundleIdStr) ? null : Guid.Parse(bundleIdStr); + var parameters = TypedParameters; + parameters.TargetBundleId = value; + TypedParameters = parameters; } - set => Parameters["target_bundle_id"] = value?.ToString(); } public int FilesProcessed { - get => ParameterHelper.GetParameterValue(Parameters, "files_processed", 0); + get => TypedParameters.FilesProcessed; set { - Parameters["files_processed"] = value; + var parameters = TypedParameters; + parameters.FilesProcessed = value; + TypedParameters = parameters; Progress = FileIds.Count > 0 ? (double)value / FileIds.Count * 100 : 0; } } @@ -367,45 +418,79 @@ public class FileCompressTask : PersistentTask Name = "Compress Files"; } + // Convenience properties using typed parameters + public FileCompressParameters TypedParameters + { + get => ParameterHelper.Typed(Parameters)!; + set => Parameters = ParameterHelper.Untyped(value); + } + public List FileIds { - get => ParameterHelper.GetParameterList(Parameters, "file_ids", []); - set => Parameters["file_ids"] = value; + get => TypedParameters.FileIds; + set + { + var parameters = TypedParameters; + parameters.FileIds = value; + TypedParameters = parameters; + } } [MaxLength(32)] public string CompressionFormat { - get => ParameterHelper.GetParameterValue(Parameters, "compression_format", "zip"); - set => Parameters["compression_format"] = value; + get => TypedParameters.CompressionFormat; + set + { + var parameters = TypedParameters; + parameters.CompressionFormat = value; + TypedParameters = parameters; + } } public int CompressionLevel { - get => ParameterHelper.GetParameterValue(Parameters, "compression_level", 6); - set => Parameters["compression_level"] = value; + get => TypedParameters.CompressionLevel; + set + { + var parameters = TypedParameters; + parameters.CompressionLevel = value; + TypedParameters = parameters; + } } public string? OutputFileName { - get => ParameterHelper.GetParameterValue(Parameters, "output_file_name", null); - set => Parameters["output_file_name"] = value; + get => TypedParameters.OutputFileName; + set + { + var parameters = TypedParameters; + parameters.OutputFileName = value; + TypedParameters = parameters; + } } public int FilesProcessed { - get => ParameterHelper.GetParameterValue(Parameters, "files_processed", 0); + get => TypedParameters.FilesProcessed; set { - Parameters["files_processed"] = value; + var parameters = TypedParameters; + parameters.FilesProcessed = value; + TypedParameters = parameters; Progress = FileIds.Count > 0 ? (double)value / FileIds.Count * 100 : 0; } } public string? ResultFileId { - get => ParameterHelper.GetParameterValue(Results, "result_file_id", null); - set => Results["result_file_id"] = value; + get => TypedParameters.ResultFileId; + set + { + var parameters = TypedParameters; + parameters.ResultFileId = value; + TypedParameters = parameters; + } } } @@ -418,41 +503,70 @@ public class BulkOperationTask : PersistentTask Name = "Bulk Operation"; } + // Convenience properties using typed parameters + public BulkOperationParameters TypedParameters + { + get => ParameterHelper.Typed(Parameters)!; + set => Parameters = ParameterHelper.Untyped(value); + } + [MaxLength(128)] public string OperationType { - get => ParameterHelper.GetParameterValue(Parameters, "operation_type", string.Empty); - set => Parameters["operation_type"] = value; + get => TypedParameters.OperationType; + set + { + var parameters = TypedParameters; + parameters.OperationType = value; + TypedParameters = parameters; + } } public List TargetIds { - get => ParameterHelper.GetParameterList(Parameters, "target_ids", []); - set => Parameters["target_ids"] = value; + get => TypedParameters.TargetIds; + set + { + var parameters = TypedParameters; + parameters.TargetIds = value; + TypedParameters = parameters; + } } [Column(TypeName = "jsonb")] public Dictionary OperationParameters { - get => ParameterHelper.GetParameterValue(Parameters, "operation_parameters", new Dictionary()); - set => Parameters["operation_parameters"] = value; + get => TypedParameters.OperationParameters; + set + { + var parameters = TypedParameters; + parameters.OperationParameters = value; + TypedParameters = parameters; + } } public int ItemsProcessed { - get => ParameterHelper.GetParameterValue(Parameters, "items_processed", 0); + get => TypedParameters.ItemsProcessed; set { - Parameters["items_processed"] = value; + var parameters = TypedParameters; + parameters.ItemsProcessed = value; + TypedParameters = parameters; Progress = TargetIds.Count > 0 ? (double)value / TargetIds.Count * 100 : 0; } } [Column(TypeName = "jsonb")] - public Dictionary OperationResults + public Dictionary? OperationResults { - get => ParameterHelper.GetParameterValue(Results, "operation_results", new Dictionary()); - set => Results["operation_results"] = value; + get => TypedParameters.OperationResults; + set + { + var parameters = TypedParameters; + parameters.OperationResults = value; + TypedParameters = parameters; + } } } @@ -465,58 +579,89 @@ public class StorageMigrationTask : PersistentTask Name = "Storage Migration"; } + // Convenience properties using typed parameters + public StorageMigrationParameters TypedParameters + { + get => ParameterHelper.Typed(Parameters)!; + set => Parameters = ParameterHelper.Untyped(value); + } + public Guid SourcePoolId { - get + get => TypedParameters.SourcePoolId; + set { - var sourcePoolIdStr = ParameterHelper.GetParameterValue(Parameters, "source_pool_id", Guid.Empty.ToString()); - return Guid.Parse(sourcePoolIdStr); + var parameters = TypedParameters; + parameters.SourcePoolId = value; + TypedParameters = parameters; } - set => Parameters["source_pool_id"] = value.ToString(); } public Guid TargetPoolId { - get + get => TypedParameters.TargetPoolId; + set { - var targetPoolIdStr = ParameterHelper.GetParameterValue(Parameters, "target_pool_id", Guid.Empty.ToString()); - return Guid.Parse(targetPoolIdStr); + var parameters = TypedParameters; + parameters.TargetPoolId = value; + TypedParameters = parameters; } - set => Parameters["target_pool_id"] = value.ToString(); } public List FileIds { - get => ParameterHelper.GetParameterList(Parameters, "file_ids", []); - set => Parameters["file_ids"] = value; + get => TypedParameters.FileIds; + set + { + var parameters = TypedParameters; + parameters.FileIds = value; + TypedParameters = parameters; + } } public bool PreserveOriginals { - get => ParameterHelper.GetParameterValue(Parameters, "preserve_originals", true); - set => Parameters["preserve_originals"] = value; + get => TypedParameters.PreserveOriginals; + set + { + var parameters = TypedParameters; + parameters.PreserveOriginals = value; + TypedParameters = parameters; + } } public long TotalBytesToTransfer { - get => ParameterHelper.GetParameterValue(Parameters, "total_bytes_to_transfer", 0L); - set => Parameters["total_bytes_to_transfer"] = value; + get => TypedParameters.TotalBytesToTransfer; + set + { + var parameters = TypedParameters; + parameters.TotalBytesToTransfer = value; + TypedParameters = parameters; + } } public long BytesTransferred { - get => ParameterHelper.GetParameterValue(Parameters, "bytes_transferred", 0L); + get => TypedParameters.BytesTransferred; set { - Parameters["bytes_transferred"] = value; + var parameters = TypedParameters; + parameters.BytesTransferred = value; + TypedParameters = parameters; Progress = TotalBytesToTransfer > 0 ? (double)value / TotalBytesToTransfer * 100 : 0; } } public int FilesMigrated { - get => ParameterHelper.GetParameterValue(Parameters, "files_migrated", 0); - set => Parameters["files_migrated"] = value; + get => TypedParameters.FilesMigrated; + set + { + var parameters = TypedParameters; + parameters.FilesMigrated = value; + TypedParameters = parameters; + } } } @@ -527,4 +672,4 @@ public enum UploadTaskStatus Completed = TaskStatus.Completed, Failed = TaskStatus.Failed, Expired = TaskStatus.Expired -} +} \ No newline at end of file diff --git a/DysonNetwork.Drive/Storage/PersistentTaskService.cs b/DysonNetwork.Drive/Storage/PersistentTaskService.cs index b901cbc..7483b4e 100644 --- a/DysonNetwork.Drive/Storage/PersistentTaskService.cs +++ b/DysonNetwork.Drive/Storage/PersistentTaskService.cs @@ -233,17 +233,17 @@ public class PersistentTaskService( ? query.OrderByDescending(t => t.Progress) : query.OrderBy(t => t.Progress); break; - case "createdat": + case "created": orderedQuery = sortDescending ? query.OrderByDescending(t => t.CreatedAt) : query.OrderBy(t => t.CreatedAt); break; - case "updatedat": + case "updated": orderedQuery = sortDescending ? query.OrderByDescending(t => t.UpdatedAt) : query.OrderBy(t => t.UpdatedAt); break; - case "lastactivity": + case "activity": default: orderedQuery = sortDescending ? query.OrderByDescending(t => t.LastActivity) @@ -344,7 +344,7 @@ public class PersistentTaskService( TaskId = task.TaskId, Name = task.Name, Type = task.Type.ToString(), - CreatedAt = task.CreatedAt.ToString("O", null) + CreatedAt = task.CreatedAt.ToString("%O", null) }; var packet = new WebSocketPacket @@ -380,7 +380,7 @@ public class PersistentTaskService( Type = task.Type.ToString(), Progress = newProgress, Status = task.Status.ToString(), - LastActivity = task.LastActivity.ToString("O", null) + LastActivity = task.LastActivity.ToString("%O", null) }; var packet = new WebSocketPacket @@ -410,7 +410,7 @@ public class PersistentTaskService( TaskId = task.TaskId, Name = task.Name, Type = task.Type.ToString(), - CompletedAt = task.CompletedAt?.ToString("O", null) ?? task.UpdatedAt.ToString("O", null), + CompletedAt = task.CompletedAt?.ToString("%O", null) ?? task.UpdatedAt.ToString("%O", null), Results = task.Results }; @@ -458,7 +458,7 @@ public class PersistentTaskService( TaskId = task.TaskId, Name = task.Name, Type = task.Type.ToString(), - FailedAt = task.UpdatedAt.ToString("O", null), + FailedAt = task.UpdatedAt.ToString("%O", null), ErrorMessage = task.ErrorMessage ?? "Task failed due to an unknown error" }; @@ -504,6 +504,8 @@ public class PersistentTaskService( private async Task SetCacheAsync(PersistentTask task) { var cacheKey = $"{CacheKeyPrefix}{task.TaskId}"; + + // Cache the entire task object directly - this includes all properties including Parameters dictionary await cache.SetAsync(cacheKey, task, CacheDuration); } @@ -514,6 +516,475 @@ public class PersistentTaskService( } #endregion + + #region Upload-Specific Methods + + /// + /// Gets the first available pool ID, or creates a default one if none exist + /// + private async Task GetFirstAvailablePoolIdAsync() + { + // Try to get the first available pool + var firstPool = await db.Pools + .Where(p => p.PolicyConfig.PublicUsable) + .OrderBy(p => p.CreatedAt) + .FirstOrDefaultAsync(); + + if (firstPool != null) + { + return firstPool.Id; + } + + // If no pools exist, create a default one + logger.LogWarning("No pools found in database. Creating default pool..."); + + var defaultPoolId = Guid.NewGuid(); + var defaultPool = new DysonNetwork.Shared.Models.FilePool + { + Id = defaultPoolId, + Name = "Default Storage Pool", + Description = "Automatically created default storage pool", + StorageConfig = new DysonNetwork.Shared.Models.RemoteStorageConfig + { + Region = "auto", + Bucket = "solar-network-development", + Endpoint = "localhost:9000", + SecretId = "littlesheep", + SecretKey = "password", + EnableSigned = true, + EnableSsl = false + }, + BillingConfig = new DysonNetwork.Shared.Models.BillingConfig + { + CostMultiplier = 1.0 + }, + PolicyConfig = new DysonNetwork.Shared.Models.PolicyConfig + { + EnableFastUpload = true, + EnableRecycle = true, + PublicUsable = true, + AllowEncryption = true, + AllowAnonymous = true, + AcceptTypes = new List { "*/*" }, + MaxFileSize = 1024L * 1024 * 1024 * 10, // 10GB + RequirePrivilege = 0 + }, + IsHidden = false, + AccountId = null, + CreatedAt = SystemClock.Instance.GetCurrentInstant(), + UpdatedAt = SystemClock.Instance.GetCurrentInstant() + }; + + db.Pools.Add(defaultPool); + await db.SaveChangesAsync(); + + logger.LogInformation("Created default pool with ID: {PoolId}", defaultPoolId); + return defaultPoolId; + } + + /// + /// Creates a new persistent upload task + /// + public async Task CreateUploadTaskAsync( + string taskId, + CreateUploadTaskRequest request, + Guid accountId + ) + { + var chunkSize = request.ChunkSize ?? 1024 * 1024 * 5; // 5MB default + var chunksCount = (int)Math.Ceiling((double)request.FileSize / chunkSize); + + // Use default pool if no pool is specified, or find first available pool + var poolId = request.PoolId ?? await GetFirstAvailablePoolIdAsync(); + + var uploadTask = new PersistentUploadTask + { + TaskId = taskId, + FileName = request.FileName, + FileSize = request.FileSize, + ContentType = request.ContentType, + ChunkSize = chunkSize, + ChunksCount = chunksCount, + ChunksUploaded = 0, + PoolId = poolId, + BundleId = request.BundleId, + EncryptPassword = request.EncryptPassword, + ExpiredAt = request.ExpiredAt, + Hash = request.Hash, + AccountId = accountId, + Status = TaskStatus.InProgress, + UploadedChunks = [], + LastActivity = SystemClock.Instance.GetCurrentInstant() + }; + + db.Tasks.Add(uploadTask); + await db.SaveChangesAsync(); + + await SetCacheAsync(uploadTask); + await SendTaskCreatedNotificationAsync(uploadTask); + return uploadTask; + } + + /// + /// Gets an existing upload task by ID + /// + public async Task GetUploadTaskAsync(string taskId) + { + var cacheKey = $"{CacheKeyPrefix}{taskId}"; + var cachedTask = await cache.GetAsync(cacheKey); + if (cachedTask is not null) + return cachedTask; + + var task = await db.Tasks + .OfType() + .FirstOrDefaultAsync(t => t.TaskId == taskId && t.Status == TaskStatus.InProgress); + + if (task is not null) + await SetCacheAsync(task); + + return task; + } + + /// + /// Updates chunk upload progress + /// + public async Task UpdateChunkProgressAsync(string taskId, int chunkIndex) + { + var task = await GetUploadTaskAsync(taskId); + if (task is null) return; + + if (!task.UploadedChunks.Contains(chunkIndex)) + { + var previousProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0; + + task.UploadedChunks.Add(chunkIndex); + task.ChunksUploaded = task.UploadedChunks.Count; + task.LastActivity = SystemClock.Instance.GetCurrentInstant(); + + await db.SaveChangesAsync(); + await SetCacheAsync(task); + + // Send real-time progress update + var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0; + await SendUploadProgressUpdateAsync(task, newProgress, previousProgress); + } + } + + /// + /// Checks if a chunk has already been uploaded + /// + public async Task IsChunkUploadedAsync(string taskId, int chunkIndex) + { + var task = await GetUploadTaskAsync(taskId); + return task?.UploadedChunks.Contains(chunkIndex) ?? false; + } + + /// + /// Gets upload progress as percentage + /// + public async Task GetUploadProgressAsync(string taskId) + { + var task = await GetUploadTaskAsync(taskId); + if (task is null || task.ChunksCount == 0) return 0; + + return (double)task.ChunksUploaded / task.ChunksCount * 100; + } + + /// + /// Gets user upload tasks with filtering and pagination + /// + public async Task<(List Items, int TotalCount)> GetUserUploadTasksAsync( + Guid accountId, + UploadTaskStatus? status = null, + string? sortBy = "lastActivity", + bool sortDescending = true, + int offset = 0, + int limit = 50 + ) + { + var query = db.Tasks.OfType().Where(t => t.AccountId == accountId); + + // Apply status filter + if (status.HasValue) + { + query = query.Where(t => t.Status == (TaskStatus)status.Value); + } + + // Get total count + var totalCount = await query.CountAsync(); + + // Apply sorting + IOrderedQueryable orderedQuery; + switch (sortBy?.ToLower()) + { + case "filename": + orderedQuery = sortDescending + ? query.OrderByDescending(t => t.FileName) + : query.OrderBy(t => t.FileName); + break; + case "filesize": + orderedQuery = sortDescending + ? query.OrderByDescending(t => t.FileSize) + : query.OrderBy(t => t.FileSize); + break; + case "createdat": + orderedQuery = sortDescending + ? query.OrderByDescending(t => t.CreatedAt) + : query.OrderBy(t => t.CreatedAt); + break; + case "updatedat": + orderedQuery = sortDescending + ? query.OrderByDescending(t => t.UpdatedAt) + : query.OrderBy(t => t.UpdatedAt); + break; + case "lastactivity": + default: + orderedQuery = sortDescending + ? query.OrderByDescending(t => t.LastActivity) + : query.OrderBy(t => t.LastActivity); + break; + } + + // Apply pagination + var items = await orderedQuery + .Skip(offset) + .Take(limit) + .ToListAsync(); + + return (items, totalCount); + } + + /// + /// Gets upload statistics for a user + /// + public async Task GetUserUploadStatsAsync(Guid accountId) + { + var tasks = await db.Tasks + .OfType() + .Where(t => t.AccountId == accountId) + .ToListAsync(); + + var stats = new UserUploadStats + { + TotalTasks = tasks.Count, + InProgressTasks = tasks.Count(t => t.Status == Model.TaskStatus.InProgress), + CompletedTasks = tasks.Count(t => t.Status == Model.TaskStatus.Completed), + FailedTasks = tasks.Count(t => t.Status == Model.TaskStatus.Failed), + ExpiredTasks = tasks.Count(t => t.Status == Model.TaskStatus.Expired), + TotalUploadedBytes = tasks.Sum(t => (long)t.ChunksUploaded * t.ChunkSize), + AverageProgress = tasks.Any(t => t.Status == Model.TaskStatus.InProgress) + ? tasks.Where(t => t.Status == Model.TaskStatus.InProgress) + .Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0) + : 0, + RecentActivity = tasks.OrderByDescending(t => t.LastActivity) + .Take(5) + .Select(t => new RecentActivity + { + TaskId = t.TaskId, + FileName = t.FileName, + Status = (UploadTaskStatus)t.Status, + LastActivity = t.LastActivity, + Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0 + }) + .ToList() + }; + + return stats; + } + + /// + /// Cleans up failed tasks for a user + /// + public async Task CleanupUserFailedTasksAsync(Guid accountId) + { + var failedTasks = await db.Tasks + .OfType() + .Where(t => t.AccountId == accountId && + (t.Status == Model.TaskStatus.Failed || t.Status == Model.TaskStatus.Expired)) + .ToListAsync(); + + foreach (var task in failedTasks) + { + await RemoveCacheAsync(task.TaskId); + + // Clean up temp files + var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId); + if (Directory.Exists(taskPath)) + { + try + { + Directory.Delete(taskPath, true); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId); + } + } + } + + db.Tasks.RemoveRange(failedTasks); + await db.SaveChangesAsync(); + + return failedTasks.Count; + } + + /// + /// Gets recent tasks for a user + /// + public async Task> GetRecentUserTasksAsync(Guid accountId, int limit = 10) + { + return await db.Tasks + .OfType() + .Where(t => t.AccountId == accountId) + .OrderByDescending(t => t.LastActivity) + .Take(limit) + .ToListAsync(); + } + + /// + /// Sends upload completion notification + /// + public async Task SendUploadCompletedNotificationAsync(PersistentUploadTask task, string fileId) + { + try + { + var completionData = new UploadCompletionData + { + TaskId = task.TaskId, + FileId = fileId, + FileName = task.FileName, + FileSize = task.FileSize, + CompletedAt = SystemClock.Instance.GetCurrentInstant().ToString("%O", null) + }; + + // Send WebSocket notification + var wsPacket = new WebSocketPacket + { + Type = "upload.completed", + Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(completionData)) + }; + + await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest + { + UserId = task.AccountId.ToString(), + Packet = wsPacket + }); + + // Send push notification + var pushNotification = new PushNotification + { + Topic = "upload", + Title = "Upload Completed", + Subtitle = task.FileName, + Body = $"Your file '{task.FileName}' has been uploaded successfully.", + IsSavable = true + }; + + await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest + { + UserId = task.AccountId.ToString(), + Notification = pushNotification + }); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to send upload completion notification for task {TaskId}", task.TaskId); + } + } + + /// + /// Sends upload failure notification + /// + public async Task SendUploadFailedNotificationAsync(PersistentUploadTask task, string? errorMessage = null) + { + try + { + var failureData = new UploadFailureData + { + TaskId = task.TaskId, + FileName = task.FileName, + FileSize = task.FileSize, + FailedAt = SystemClock.Instance.GetCurrentInstant().ToString("%O", null), + ErrorMessage = errorMessage ?? "Upload failed due to an unknown error" + }; + + // Send WebSocket notification + var wsPacket = new WebSocketPacket + { + Type = "upload.failed", + Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(failureData)) + }; + + await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest + { + UserId = task.AccountId.ToString(), + Packet = wsPacket + }); + + // Send push notification + var pushNotification = new PushNotification + { + Topic = "upload", + Title = "Upload Failed", + Subtitle = task.FileName, + Body = $"Your file '{task.FileName}' upload has failed. You can try again.", + IsSavable = true + }; + + await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest + { + UserId = task.AccountId.ToString(), + Notification = pushNotification + }); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to send upload failure notification for task {TaskId}", task.TaskId); + } + } + + /// + /// Sends real-time upload progress update via WebSocket + /// + private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress, double previousProgress) + { + try + { + // Only send significant progress updates (every 5% or major milestones) + if (Math.Abs(newProgress - previousProgress) < 5 && newProgress < 100) + return; + + var progressData = new UploadProgressData + { + TaskId = task.TaskId, + FileName = task.FileName, + FileSize = task.FileSize, + ChunksUploaded = task.ChunksUploaded, + ChunksTotal = task.ChunksCount, + Progress = newProgress, + Status = task.Status.ToString(), + LastActivity = task.LastActivity.ToString("%O", null) + }; + + var packet = new WebSocketPacket + { + Type = "upload.progress", + Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(progressData)) + }; + + await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest + { + UserId = task.AccountId.ToString(), + Packet = packet + }); + } + catch (Exception ex) + { + logger.LogWarning(ex, "Failed to send upload progress update for task {TaskId}", task.TaskId); + } + } + + #endregion } #region Data Transfer Objects @@ -579,3 +1050,58 @@ public class TaskActivity } #endregion + +#region Upload-Specific Data Transfer Objects + +public class UploadProgressData +{ + public string TaskId { get; set; } = null!; + public string FileName { get; set; } = null!; + public long FileSize { get; set; } + public int ChunksUploaded { get; set; } + public int ChunksTotal { get; set; } + public double Progress { get; set; } + public string Status { get; set; } = null!; + public string LastActivity { get; set; } = null!; +} + +public class UploadCompletionData +{ + public string TaskId { get; set; } = null!; + public string FileId { get; set; } = null!; + public string FileName { get; set; } = null!; + public long FileSize { get; set; } + public string CompletedAt { get; set; } = null!; +} + +public class UploadFailureData +{ + public string TaskId { get; set; } = null!; + public string FileName { get; set; } = null!; + public long FileSize { get; set; } + public string FailedAt { get; set; } = null!; + public string ErrorMessage { get; set; } = null!; +} + +public class UserUploadStats +{ + public int TotalTasks { get; set; } + public int InProgressTasks { get; set; } + public int CompletedTasks { get; set; } + public int FailedTasks { get; set; } + public int ExpiredTasks { get; set; } + public long TotalUploadedBytes { get; set; } + public double AverageProgress { get; set; } + public List RecentActivity { get; set; } = new(); +} + +public class RecentActivity +{ + public string TaskId { get; set; } = null!; + public string FileName { get; set; } = null!; + public UploadTaskStatus Status { get; set; } + public Instant LastActivity { get; set; } + public double Progress { get; set; } +} + +#endregion diff --git a/DysonNetwork.Drive/Storage/PersistentUploadService.cs b/DysonNetwork.Drive/Storage/PersistentUploadService.cs deleted file mode 100644 index f2c40d2..0000000 --- a/DysonNetwork.Drive/Storage/PersistentUploadService.cs +++ /dev/null @@ -1,567 +0,0 @@ -using DysonNetwork.Drive.Storage.Model; -using DysonNetwork.Shared.Cache; -using DysonNetwork.Shared.Proto; -using Microsoft.EntityFrameworkCore; -using NodaTime; -using System.Text.Json; -using TaskStatus = DysonNetwork.Drive.Storage.Model.TaskStatus; - -namespace DysonNetwork.Drive.Storage; - -public class PersistentUploadService( - AppDatabase db, - ICacheService cache, - ILogger logger, - RingService.RingServiceClient ringService -) -{ - private const string CacheKeyPrefix = "upload:task:"; - private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(30); - - /// - /// Creates a new persistent upload task - /// - public async Task CreateUploadTaskAsync( - string taskId, - CreateUploadTaskRequest request, - Guid accountId - ) - { - var chunkSize = request.ChunkSize ?? 1024 * 1024 * 5; // 5MB default - var chunksCount = (int)Math.Ceiling((double)request.FileSize / chunkSize); - - var uploadTask = new PersistentUploadTask - { - TaskId = taskId, - FileName = request.FileName, - FileSize = request.FileSize, - ContentType = request.ContentType, - ChunkSize = chunkSize, - ChunksCount = chunksCount, - ChunksUploaded = 0, - PoolId = request.PoolId.Value, - BundleId = request.BundleId, - EncryptPassword = request.EncryptPassword, - ExpiredAt = request.ExpiredAt, - Hash = request.Hash, - AccountId = accountId, - Status = Model.TaskStatus.InProgress, - UploadedChunks = new List(), - LastActivity = SystemClock.Instance.GetCurrentInstant() - }; - - db.UploadTasks.Add(uploadTask); - await db.SaveChangesAsync(); - - await SetCacheAsync(uploadTask); - return uploadTask; - } - - /// - /// Gets an existing upload task by ID - /// - public async Task GetUploadTaskAsync(string taskId) - { - var cacheKey = $"{CacheKeyPrefix}{taskId}"; - var cachedTask = await cache.GetAsync(cacheKey); - if (cachedTask is not null) - return cachedTask; - - var task = await db.Tasks - .OfType() - .FirstOrDefaultAsync(t => t.TaskId == taskId && t.Status == TaskStatus.InProgress); - - if (task is not null) - await SetCacheAsync(task); - - return task; - } - - /// - /// Updates chunk upload progress - /// - public async Task UpdateChunkProgressAsync(string taskId, int chunkIndex) - { - var task = await GetUploadTaskAsync(taskId); - if (task is null) return; - - if (!task.UploadedChunks.Contains(chunkIndex)) - { - var previousProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0; - - task.UploadedChunks.Add(chunkIndex); - task.ChunksUploaded = task.UploadedChunks.Count; - task.LastActivity = SystemClock.Instance.GetCurrentInstant(); - - await db.SaveChangesAsync(); - await SetCacheAsync(task); - - // Send real-time progress update - var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0; - await SendUploadProgressUpdateAsync(task, newProgress, previousProgress); - } - } - - /// - /// Marks an upload task as completed - /// - public async Task MarkTaskCompletedAsync(string taskId) - { - var task = await GetUploadTaskAsync(taskId); - if (task is null) return; - - task.Status = Model.TaskStatus.Completed; - task.LastActivity = SystemClock.Instance.GetCurrentInstant(); - - await db.SaveChangesAsync(); - await RemoveCacheAsync(taskId); - } - - /// - /// Marks an upload task as failed - /// - public async Task MarkTaskFailedAsync(string taskId) - { - var task = await GetUploadTaskAsync(taskId); - if (task is null) return; - - task.Status = Model.TaskStatus.Failed; - task.LastActivity = SystemClock.Instance.GetCurrentInstant(); - - await db.SaveChangesAsync(); - await RemoveCacheAsync(taskId); - } - - /// - /// Gets all resumable tasks for an account - /// - public async Task> GetResumableTasksAsync(Guid accountId) - { - return await db.Tasks - .OfType() - .Where(t => t.AccountId == accountId && - t.Status == Model.TaskStatus.InProgress && - t.LastActivity > SystemClock.Instance.GetCurrentInstant() - Duration.FromHours(24)) - .OrderByDescending(t => t.LastActivity) - .ToListAsync(); - } - - /// - /// Gets user tasks with filtering and pagination - /// - public async Task<(List Items, int TotalCount)> GetUserTasksAsync( - Guid accountId, - UploadTaskStatus? status = null, - string? sortBy = "lastActivity", - bool sortDescending = true, - int offset = 0, - int limit = 50 - ) - { - var query = db.Tasks.OfType().Where(t => t.AccountId == accountId); - - // Apply status filter - if (status.HasValue) - { - query = query.Where(t => t.Status == (TaskStatus)status.Value); - } - - // Get total count - var totalCount = await query.CountAsync(); - - // Apply sorting - IOrderedQueryable orderedQuery; - switch (sortBy?.ToLower()) - { - case "filename": - orderedQuery = sortDescending - ? query.OrderByDescending(t => t.FileName) - : query.OrderBy(t => t.FileName); - break; - case "filesize": - orderedQuery = sortDescending - ? query.OrderByDescending(t => t.FileSize) - : query.OrderBy(t => t.FileSize); - break; - case "createdat": - orderedQuery = sortDescending - ? query.OrderByDescending(t => t.CreatedAt) - : query.OrderBy(t => t.CreatedAt); - break; - case "updatedat": - orderedQuery = sortDescending - ? query.OrderByDescending(t => t.UpdatedAt) - : query.OrderBy(t => t.UpdatedAt); - break; - case "lastactivity": - default: - orderedQuery = sortDescending - ? query.OrderByDescending(t => t.LastActivity) - : query.OrderBy(t => t.LastActivity); - break; - } - - // Apply pagination - var items = await orderedQuery - .Skip(offset) - .Take(limit) - .ToListAsync(); - - return (items, totalCount); - } - - /// - /// Checks if a chunk has already been uploaded - /// - public async Task IsChunkUploadedAsync(string taskId, int chunkIndex) - { - var task = await GetUploadTaskAsync(taskId); - return task?.UploadedChunks.Contains(chunkIndex) ?? false; - } - - /// - /// Cleans up expired/stale upload tasks - /// - public async Task CleanupStaleTasksAsync() - { - var now = SystemClock.Instance.GetCurrentInstant(); - var staleThreshold = now - Duration.FromHours(24); // 24 hours - - var staleTasks = await db.Tasks - .OfType() - .Where(t => t.Status == Model.TaskStatus.InProgress && - t.LastActivity < staleThreshold) - .ToListAsync(); - - foreach (var task in staleTasks) - { - task.Status = Model.TaskStatus.Expired; - await RemoveCacheAsync(task.TaskId); - - // Clean up temp files - var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId); - if (Directory.Exists(taskPath)) - { - try - { - Directory.Delete(taskPath, true); - } - catch (Exception ex) - { - logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId); - } - } - } - - await db.SaveChangesAsync(); - - if (staleTasks.Any()) - { - logger.LogInformation("Cleaned up {Count} stale upload tasks", staleTasks.Count); - } - } - - /// - /// Gets upload progress as percentage - /// - public async Task GetUploadProgressAsync(string taskId) - { - var task = await GetUploadTaskAsync(taskId); - if (task is null || task.ChunksCount == 0) return 0; - - return (double)task.ChunksUploaded / task.ChunksCount * 100; - } - - private async Task SetCacheAsync(PersistentUploadTask task) - { - var cacheKey = $"{CacheKeyPrefix}{task.TaskId}"; - await cache.SetAsync(cacheKey, task, CacheDuration); - } - - private async Task RemoveCacheAsync(string taskId) - { - var cacheKey = $"{CacheKeyPrefix}{taskId}"; - await cache.RemoveAsync(cacheKey); - } - - /// - /// Gets upload statistics for a user - /// - public async Task GetUserUploadStatsAsync(Guid accountId) - { - var tasks = await db.Tasks - .OfType() - .Where(t => t.AccountId == accountId) - .ToListAsync(); - - var stats = new UserUploadStats - { - TotalTasks = tasks.Count, - InProgressTasks = tasks.Count(t => t.Status == Model.TaskStatus.InProgress), - CompletedTasks = tasks.Count(t => t.Status == Model.TaskStatus.Completed), - FailedTasks = tasks.Count(t => t.Status == Model.TaskStatus.Failed), - ExpiredTasks = tasks.Count(t => t.Status == Model.TaskStatus.Expired), - TotalUploadedBytes = tasks.Sum(t => (long)t.ChunksUploaded * t.ChunkSize), - AverageProgress = tasks.Any(t => t.Status == Model.TaskStatus.InProgress) - ? tasks.Where(t => t.Status == Model.TaskStatus.InProgress) - .Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0) - : 0, - RecentActivity = tasks.OrderByDescending(t => t.LastActivity) - .Take(5) - .Select(t => new RecentActivity - { - TaskId = t.TaskId, - FileName = t.FileName, - Status = (UploadTaskStatus)t.Status, - LastActivity = t.LastActivity, - Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0 - }) - .ToList() - }; - - return stats; - } - - /// - /// Cleans up failed tasks for a user - /// - public async Task CleanupUserFailedTasksAsync(Guid accountId) - { - var failedTasks = await db.Tasks - .OfType() - .Where(t => t.AccountId == accountId && - (t.Status == Model.TaskStatus.Failed || t.Status == Model.TaskStatus.Expired)) - .ToListAsync(); - - foreach (var task in failedTasks) - { - await RemoveCacheAsync(task.TaskId); - - // Clean up temp files - var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId); - if (Directory.Exists(taskPath)) - { - try - { - Directory.Delete(taskPath, true); - } - catch (Exception ex) - { - logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId); - } - } - } - - db.Tasks.RemoveRange(failedTasks); - await db.SaveChangesAsync(); - - return failedTasks.Count; - } - - /// - /// Gets recent tasks for a user - /// - public async Task> GetRecentUserTasksAsync(Guid accountId, int limit = 10) - { - return await db.Tasks - .OfType() - .Where(t => t.AccountId == accountId) - .OrderByDescending(t => t.LastActivity) - .Take(limit) - .ToListAsync(); - } - - /// - /// Sends real-time upload progress update via WebSocket - /// - private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress, double previousProgress) - { - try - { - // Only send significant progress updates (every 5% or major milestones) - if (Math.Abs(newProgress - previousProgress) < 5 && newProgress < 100) - return; - - var progressData = new UploadProgressData - { - TaskId = task.TaskId, - FileName = task.FileName, - FileSize = task.FileSize, - ChunksUploaded = task.ChunksUploaded, - ChunksTotal = task.ChunksCount, - Progress = newProgress, - Status = task.Status.ToString(), - LastActivity = task.LastActivity.ToString("O", null) - }; - - var packet = new WebSocketPacket - { - Type = "upload.progress", - Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(progressData)) - }; - - await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest - { - UserId = task.AccountId.ToString(), - Packet = packet - }); - } - catch (Exception ex) - { - logger.LogWarning(ex, "Failed to send upload progress update for task {TaskId}", task.TaskId); - } - } - - /// - /// Sends upload completion notification - /// - public async Task SendUploadCompletedNotificationAsync(PersistentUploadTask task, string fileId) - { - try - { - var completionData = new UploadCompletionData - { - TaskId = task.TaskId, - FileId = fileId, - FileName = task.FileName, - FileSize = task.FileSize, - CompletedAt = SystemClock.Instance.GetCurrentInstant().ToString("O", null) - }; - - // Send WebSocket notification - var wsPacket = new WebSocketPacket - { - Type = "upload.completed", - Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(completionData)) - }; - - await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest - { - UserId = task.AccountId.ToString(), - Packet = wsPacket - }); - - // Send push notification - var pushNotification = new PushNotification - { - Topic = "upload", - Title = "Upload Completed", - Subtitle = task.FileName, - Body = $"Your file '{task.FileName}' has been uploaded successfully.", - IsSavable = true - }; - - await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest - { - UserId = task.AccountId.ToString(), - Notification = pushNotification - }); - } - catch (Exception ex) - { - logger.LogWarning(ex, "Failed to send upload completion notification for task {TaskId}", task.TaskId); - } - } - - /// - /// Sends upload failure notification - /// - public async Task SendUploadFailedNotificationAsync(PersistentUploadTask task, string? errorMessage = null) - { - try - { - var failureData = new UploadFailureData - { - TaskId = task.TaskId, - FileName = task.FileName, - FileSize = task.FileSize, - FailedAt = SystemClock.Instance.GetCurrentInstant().ToString("O", null), - ErrorMessage = errorMessage ?? "Upload failed due to an unknown error" - }; - - // Send WebSocket notification - var wsPacket = new WebSocketPacket - { - Type = "upload.failed", - Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(failureData)) - }; - - await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest - { - UserId = task.AccountId.ToString(), - Packet = wsPacket - }); - - // Send push notification - var pushNotification = new PushNotification - { - Topic = "upload", - Title = "Upload Failed", - Subtitle = task.FileName, - Body = $"Your file '{task.FileName}' upload has failed. You can try again.", - IsSavable = true - }; - - await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest - { - UserId = task.AccountId.ToString(), - Notification = pushNotification - }); - } - catch (Exception ex) - { - logger.LogWarning(ex, "Failed to send upload failure notification for task {TaskId}", task.TaskId); - } - } -} - -public class UploadProgressData -{ - public string TaskId { get; set; } = null!; - public string FileName { get; set; } = null!; - public long FileSize { get; set; } - public int ChunksUploaded { get; set; } - public int ChunksTotal { get; set; } - public double Progress { get; set; } - public string Status { get; set; } = null!; - public string LastActivity { get; set; } = null!; -} - -public class UploadCompletionData -{ - public string TaskId { get; set; } = null!; - public string FileId { get; set; } = null!; - public string FileName { get; set; } = null!; - public long FileSize { get; set; } - public string CompletedAt { get; set; } = null!; -} - -public class UploadFailureData -{ - public string TaskId { get; set; } = null!; - public string FileName { get; set; } = null!; - public long FileSize { get; set; } - public string FailedAt { get; set; } = null!; - public string ErrorMessage { get; set; } = null!; -} - -public class UserUploadStats -{ - public int TotalTasks { get; set; } - public int InProgressTasks { get; set; } - public int CompletedTasks { get; set; } - public int FailedTasks { get; set; } - public int ExpiredTasks { get; set; } - public long TotalUploadedBytes { get; set; } - public double AverageProgress { get; set; } - public List RecentActivity { get; set; } = new(); -} - -public class RecentActivity -{ - public string TaskId { get; set; } = null!; - public string FileName { get; set; } = null!; - public UploadTaskStatus Status { get; set; } - public Instant LastActivity { get; set; } - public double Progress { get; set; } -} diff --git a/DysonNetwork.Drive/appsettings.json b/DysonNetwork.Drive/appsettings.json index 1186138..ca65a37 100644 --- a/DysonNetwork.Drive/appsettings.json +++ b/DysonNetwork.Drive/appsettings.json @@ -32,7 +32,7 @@ }, "Storage": { "Uploads": "Uploads", - "PreferredRemote": "2adceae3-981a-4564-9b8d-5d71a211c873", + "PreferredRemote": "c53136a6-9152-4ecb-9f88-43c41438c23e", "Remote": [ { "Id": "minio",