♻️ Merge the upload tasks and common tasks handling
This commit is contained in:
@@ -9,6 +9,7 @@ using Microsoft.EntityFrameworkCore.Design;
|
||||
using Microsoft.EntityFrameworkCore.Query;
|
||||
using NodaTime;
|
||||
using Quartz;
|
||||
using TaskStatus = DysonNetwork.Drive.Storage.Model.TaskStatus;
|
||||
|
||||
namespace DysonNetwork.Drive;
|
||||
|
||||
@@ -150,26 +151,41 @@ public class AppDatabaseRecyclingJob(AppDatabase db, ILogger<AppDatabaseRecyclin
|
||||
}
|
||||
}
|
||||
|
||||
public class UploadTaskCleanupJob(
|
||||
public class PersistentTaskCleanupJob(
|
||||
IServiceProvider serviceProvider,
|
||||
ILogger<UploadTaskCleanupJob> logger
|
||||
ILogger<PersistentTaskCleanupJob> logger
|
||||
) : IJob
|
||||
{
|
||||
public async Task Execute(IJobExecutionContext context)
|
||||
{
|
||||
logger.LogInformation("Cleaning up stale upload tasks...");
|
||||
logger.LogInformation("Cleaning up stale persistent tasks...");
|
||||
|
||||
// Get the PersistentUploadService from DI
|
||||
// Get the PersistentTaskService from DI
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
var persistentUploadService = scope.ServiceProvider.GetService(typeof(DysonNetwork.Drive.Storage.PersistentUploadService));
|
||||
var persistentTaskService = scope.ServiceProvider.GetService(typeof(PersistentTaskService));
|
||||
|
||||
if (persistentUploadService is DysonNetwork.Drive.Storage.PersistentUploadService service)
|
||||
if (persistentTaskService is PersistentTaskService service)
|
||||
{
|
||||
await service.CleanupStaleTasksAsync();
|
||||
// Clean up tasks for all users (you might want to add user-specific logic here)
|
||||
// For now, we'll clean up tasks older than 30 days for all users
|
||||
var cutoff = SystemClock.Instance.GetCurrentInstant() - Duration.FromDays(30);
|
||||
var tasksToClean = await service.GetUserTasksAsync(
|
||||
Guid.Empty, // This would need to be adjusted for multi-user cleanup
|
||||
status: TaskStatus.Completed | TaskStatus.Failed | TaskStatus.Cancelled | TaskStatus.Expired
|
||||
);
|
||||
|
||||
var cleanedCount = 0;
|
||||
foreach (var task in tasksToClean.Items.Where(t => t.UpdatedAt < cutoff))
|
||||
{
|
||||
await service.CancelTaskAsync(task.TaskId); // Or implement a proper cleanup method
|
||||
cleanedCount++;
|
||||
}
|
||||
|
||||
logger.LogInformation("Cleaned up {Count} stale persistent tasks", cleanedCount);
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogWarning("PersistentUploadService not found in DI container");
|
||||
logger.LogWarning("PersistentTaskService not found in DI container");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,6 +22,13 @@ public static class ScheduledJobsConfiguration
|
||||
.ForJob(cloudFileUnusedRecyclingJob)
|
||||
.WithIdentity("CloudFileUnusedRecyclingTrigger")
|
||||
.WithCronSchedule("0 0 0 * * ?"));
|
||||
|
||||
var persistentTaskCleanupJob = new JobKey("PersistentTaskCleanup");
|
||||
q.AddJob<PersistentTaskCleanupJob>(opts => opts.WithIdentity(persistentTaskCleanupJob));
|
||||
q.AddTrigger(opts => opts
|
||||
.ForJob(persistentTaskCleanupJob)
|
||||
.WithIdentity("PersistentTaskCleanupTrigger")
|
||||
.WithCronSchedule("0 0 2 * * ?")); // Run daily at 2 AM
|
||||
});
|
||||
services.AddQuartzHostedService(q => q.WaitForJobsToComplete = true);
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ public static class ServiceCollectionExtensions
|
||||
{
|
||||
services.AddScoped<Storage.FileService>();
|
||||
services.AddScoped<Storage.FileReferenceService>();
|
||||
services.AddScoped<Storage.PersistentUploadService>();
|
||||
services.AddScoped<Storage.PersistentTaskService>();
|
||||
services.AddScoped<Billing.UsageService>();
|
||||
services.AddScoped<Billing.QuotaService>();
|
||||
|
||||
|
||||
@@ -126,8 +126,7 @@ public class FileService(
|
||||
private async Task<FilePool> ValidateAndGetPoolAsync(string filePool)
|
||||
{
|
||||
var pool = await GetPoolAsync(Guid.Parse(filePool));
|
||||
if (pool is null) throw new InvalidOperationException("Pool not found");
|
||||
return pool;
|
||||
return pool ?? throw new InvalidOperationException("Pool not found: " + filePool);
|
||||
}
|
||||
|
||||
private async Task<SnFileBundle?> ValidateAndGetBundleAsync(string? fileBundleId, Guid accountId)
|
||||
@@ -135,12 +134,10 @@ public class FileService(
|
||||
if (fileBundleId is null) return null;
|
||||
|
||||
var bundle = await GetBundleAsync(Guid.Parse(fileBundleId), accountId);
|
||||
if (bundle is null) throw new InvalidOperationException("Bundle not found");
|
||||
|
||||
return bundle;
|
||||
return bundle ?? throw new InvalidOperationException("Bundle not found: " + fileBundleId);
|
||||
}
|
||||
|
||||
private Instant? CalculateFinalExpiration(Instant? expiredAt, FilePool pool, SnFileBundle? bundle)
|
||||
private static Instant? CalculateFinalExpiration(Instant? expiredAt, FilePool pool, SnFileBundle? bundle)
|
||||
{
|
||||
var finalExpiredAt = expiredAt;
|
||||
|
||||
|
||||
@@ -24,12 +24,14 @@ public class FileUploadController(
|
||||
AppDatabase db,
|
||||
PermissionService.PermissionServiceClient permission,
|
||||
QuotaService quotaService,
|
||||
PersistentUploadService persistentUploadService
|
||||
PersistentTaskService persistentTaskService,
|
||||
ILogger<FileUploadController> logger
|
||||
)
|
||||
: ControllerBase
|
||||
{
|
||||
private readonly string _tempPath =
|
||||
configuration.GetValue<string>("Storage:Uploads") ?? Path.Combine(Path.GetTempPath(), "multipart-uploads");
|
||||
private readonly ILogger<FileUploadController> _logger = logger;
|
||||
|
||||
private const long DefaultChunkSize = 1024 * 1024 * 5; // 5MB
|
||||
|
||||
@@ -75,7 +77,7 @@ public class FileUploadController(
|
||||
var taskId = await Nanoid.GenerateAsync();
|
||||
|
||||
// Create persistent upload task
|
||||
var persistentTask = await persistentUploadService.CreateUploadTaskAsync(taskId, request, accountId);
|
||||
var persistentTask = await persistentTaskService.CreateUploadTaskAsync(taskId, request, accountId);
|
||||
|
||||
return Ok(new CreateUploadTaskResponse
|
||||
{
|
||||
@@ -231,7 +233,7 @@ public class FileUploadController(
|
||||
var chunk = request.Chunk;
|
||||
|
||||
// Check if chunk is already uploaded (resumable upload)
|
||||
if (await persistentUploadService.IsChunkUploadedAsync(taskId, chunkIndex))
|
||||
if (await persistentTaskService.IsChunkUploadedAsync(taskId, chunkIndex))
|
||||
{
|
||||
return Ok(new { message = "Chunk already uploaded" });
|
||||
}
|
||||
@@ -247,7 +249,7 @@ public class FileUploadController(
|
||||
await chunk.CopyToAsync(stream);
|
||||
|
||||
// Update persistent task progress
|
||||
await persistentUploadService.UpdateChunkProgressAsync(taskId, chunkIndex);
|
||||
await persistentTaskService.UpdateChunkProgressAsync(taskId, chunkIndex);
|
||||
|
||||
return Ok();
|
||||
}
|
||||
@@ -256,7 +258,7 @@ public class FileUploadController(
|
||||
public async Task<IActionResult> CompleteUpload(string taskId)
|
||||
{
|
||||
// Get persistent task
|
||||
var persistentTask = await persistentUploadService.GetUploadTaskAsync(taskId);
|
||||
var persistentTask = await persistentTaskService.GetUploadTaskAsync(taskId);
|
||||
if (persistentTask is null)
|
||||
return new ObjectResult(ApiError.NotFound("Upload task")) { StatusCode = 404 };
|
||||
|
||||
@@ -292,27 +294,30 @@ public class FileUploadController(
|
||||
);
|
||||
|
||||
// Mark task as completed
|
||||
await persistentUploadService.MarkTaskCompletedAsync(taskId);
|
||||
await persistentTaskService.MarkTaskCompletedAsync(taskId);
|
||||
|
||||
// Send completion notification
|
||||
await persistentUploadService.SendUploadCompletedNotificationAsync(persistentTask, fileId);
|
||||
await persistentTaskService.SendUploadCompletedNotificationAsync(persistentTask, fileId);
|
||||
|
||||
return Ok(cloudFile);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Log the actual exception for debugging
|
||||
_logger.LogError(ex, "Failed to complete upload for task {TaskId}. Error: {ErrorMessage}", taskId, ex.Message);
|
||||
|
||||
// Mark task as failed
|
||||
await persistentUploadService.MarkTaskFailedAsync(taskId);
|
||||
await persistentTaskService.MarkTaskFailedAsync(taskId);
|
||||
|
||||
// Send failure notification
|
||||
await persistentUploadService.SendUploadFailedNotificationAsync(persistentTask, ex.Message);
|
||||
await persistentTaskService.SendUploadFailedNotificationAsync(persistentTask, ex.Message);
|
||||
|
||||
await CleanupTempFiles(taskPath, mergedFilePath);
|
||||
|
||||
return new ObjectResult(new ApiError
|
||||
{
|
||||
Code = "UPLOAD_FAILED",
|
||||
Message = "Failed to complete file upload.",
|
||||
Message = $"Failed to complete file upload: {ex.Message}",
|
||||
Status = 500
|
||||
}) { StatusCode = 500 };
|
||||
}
|
||||
@@ -372,7 +377,7 @@ public class FileUploadController(
|
||||
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
||||
|
||||
var accountId = Guid.Parse(currentUser.Id);
|
||||
var tasks = await persistentUploadService.GetUserTasksAsync(accountId, status, sortBy, sortDescending, offset, limit);
|
||||
var tasks = await persistentTaskService.GetUserUploadTasksAsync(accountId, status, sortBy, sortDescending, offset, limit);
|
||||
|
||||
Response.Headers.Append("X-Total", tasks.TotalCount.ToString());
|
||||
|
||||
@@ -403,7 +408,7 @@ public class FileUploadController(
|
||||
if (currentUser is null)
|
||||
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
||||
|
||||
var task = await persistentUploadService.GetUploadTaskAsync(taskId);
|
||||
var task = await persistentTaskService.GetUploadTaskAsync(taskId);
|
||||
if (task is null)
|
||||
return new ObjectResult(ApiError.NotFound("Upload task")) { StatusCode = 404 };
|
||||
|
||||
@@ -411,7 +416,7 @@ public class FileUploadController(
|
||||
if (task.AccountId != Guid.Parse(currentUser.Id))
|
||||
return new ObjectResult(ApiError.Unauthorized(forbidden: true)) { StatusCode = 403 };
|
||||
|
||||
var progress = await persistentUploadService.GetUploadProgressAsync(taskId);
|
||||
var progress = await persistentTaskService.GetUploadProgressAsync(taskId);
|
||||
|
||||
return Ok(new
|
||||
{
|
||||
@@ -434,7 +439,7 @@ public class FileUploadController(
|
||||
if (currentUser is null)
|
||||
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
||||
|
||||
var task = await persistentUploadService.GetUploadTaskAsync(taskId);
|
||||
var task = await persistentTaskService.GetUploadTaskAsync(taskId);
|
||||
if (task is null)
|
||||
return new ObjectResult(ApiError.NotFound("Upload task")) { StatusCode = 404 };
|
||||
|
||||
@@ -470,7 +475,7 @@ public class FileUploadController(
|
||||
if (currentUser is null)
|
||||
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
||||
|
||||
var task = await persistentUploadService.GetUploadTaskAsync(taskId);
|
||||
var task = await persistentTaskService.GetUploadTaskAsync(taskId);
|
||||
if (task is null)
|
||||
return new ObjectResult(ApiError.NotFound("Upload task")) { StatusCode = 404 };
|
||||
|
||||
@@ -479,7 +484,7 @@ public class FileUploadController(
|
||||
return new ObjectResult(ApiError.Unauthorized(forbidden: true)) { StatusCode = 403 };
|
||||
|
||||
// Mark as failed (cancelled)
|
||||
await persistentUploadService.MarkTaskFailedAsync(taskId);
|
||||
await persistentTaskService.MarkTaskFailedAsync(taskId);
|
||||
|
||||
// Clean up temp files
|
||||
var taskPath = Path.Combine(_tempPath, taskId);
|
||||
@@ -496,7 +501,7 @@ public class FileUploadController(
|
||||
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
||||
|
||||
var accountId = Guid.Parse(currentUser.Id);
|
||||
var stats = await persistentUploadService.GetUserUploadStatsAsync(accountId);
|
||||
var stats = await persistentTaskService.GetUserUploadStatsAsync(accountId);
|
||||
|
||||
return Ok(new
|
||||
{
|
||||
@@ -519,7 +524,7 @@ public class FileUploadController(
|
||||
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
||||
|
||||
var accountId = Guid.Parse(currentUser.Id);
|
||||
var cleanedCount = await persistentUploadService.CleanupUserFailedTasksAsync(accountId);
|
||||
var cleanedCount = await persistentTaskService.CleanupUserFailedTasksAsync(accountId);
|
||||
|
||||
return Ok(new { message = $"Cleaned up {cleanedCount} failed tasks" });
|
||||
}
|
||||
@@ -532,7 +537,7 @@ public class FileUploadController(
|
||||
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
||||
|
||||
var accountId = Guid.Parse(currentUser.Id);
|
||||
var tasks = await persistentUploadService.GetRecentUserTasksAsync(accountId, limit);
|
||||
var tasks = await persistentTaskService.GetRecentUserTasksAsync(accountId, limit);
|
||||
|
||||
return Ok(tasks.Select(t => new
|
||||
{
|
||||
@@ -554,7 +559,7 @@ public class FileUploadController(
|
||||
if (currentUser is null)
|
||||
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
||||
|
||||
var task = await persistentUploadService.GetUploadTaskAsync(taskId);
|
||||
var task = await persistentTaskService.GetUploadTaskAsync(taskId);
|
||||
if (task is null)
|
||||
return new ObjectResult(ApiError.NotFound("Upload task")) { StatusCode = 404 };
|
||||
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
using DysonNetwork.Shared.Models;
|
||||
using DysonNetwork.Shared.Proto;
|
||||
using Google.Protobuf.Collections;
|
||||
using NodaTime;
|
||||
using System.ComponentModel.DataAnnotations;
|
||||
using System.ComponentModel.DataAnnotations.Schema;
|
||||
@@ -6,109 +8,77 @@ using System.Text.Json;
|
||||
|
||||
namespace DysonNetwork.Drive.Storage.Model;
|
||||
|
||||
// File Upload Task Parameters
|
||||
public class FileUploadParameters
|
||||
{
|
||||
public string FileName { get; set; } = string.Empty;
|
||||
public long FileSize { get; set; }
|
||||
public string ContentType { get; set; } = string.Empty;
|
||||
public long ChunkSize { get; set; } = 5242880L;
|
||||
public int ChunksCount { get; set; }
|
||||
public int ChunksUploaded { get; set; }
|
||||
public Guid PoolId { get; set; }
|
||||
public Guid? BundleId { get; set; }
|
||||
public string? EncryptPassword { get; set; }
|
||||
public string Hash { get; set; } = string.Empty;
|
||||
public List<int> UploadedChunks { get; set; } = [];
|
||||
}
|
||||
|
||||
// File Move Task Parameters
|
||||
public class FileMoveParameters
|
||||
{
|
||||
public List<string> FileIds { get; set; } = [];
|
||||
public Guid TargetPoolId { get; set; }
|
||||
public Guid? TargetBundleId { get; set; }
|
||||
public int FilesProcessed { get; set; }
|
||||
}
|
||||
|
||||
// File Compression Task Parameters
|
||||
public class FileCompressParameters
|
||||
{
|
||||
public List<string> FileIds { get; set; } = [];
|
||||
public string CompressionFormat { get; set; } = "zip";
|
||||
public int CompressionLevel { get; set; } = 6;
|
||||
public string? OutputFileName { get; set; }
|
||||
public int FilesProcessed { get; set; }
|
||||
public string? ResultFileId { get; set; }
|
||||
}
|
||||
|
||||
// Bulk Operation Task Parameters
|
||||
public class BulkOperationParameters
|
||||
{
|
||||
public string OperationType { get; set; } = string.Empty;
|
||||
public List<string> TargetIds { get; set; } = [];
|
||||
public Dictionary<string, object?> OperationParameters { get; set; } = new();
|
||||
public int ItemsProcessed { get; set; }
|
||||
public Dictionary<string, object?>? OperationResults { get; set; }
|
||||
}
|
||||
|
||||
// Storage Migration Task Parameters
|
||||
public class StorageMigrationParameters
|
||||
{
|
||||
public Guid SourcePoolId { get; set; }
|
||||
public Guid TargetPoolId { get; set; }
|
||||
public List<string> FileIds { get; set; } = new();
|
||||
public bool PreserveOriginals { get; set; } = true;
|
||||
public long TotalBytesToTransfer { get; set; }
|
||||
public long BytesTransferred { get; set; }
|
||||
public int FilesMigrated { get; set; }
|
||||
}
|
||||
|
||||
// Helper class for parameter operations using GrpcTypeHelper
|
||||
public static class ParameterHelper
|
||||
{
|
||||
public static T GetParameterValue<T>(Dictionary<string, object?> parameters, string key, T defaultValue = default!)
|
||||
public static T? Typed<T>(Dictionary<string, object?> parameters)
|
||||
{
|
||||
if (!parameters.TryGetValue(key, out var value) || value == null)
|
||||
{
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
// If the value is already the correct type, return it directly
|
||||
if (value is T typedValue)
|
||||
{
|
||||
return typedValue;
|
||||
}
|
||||
|
||||
// Handle JsonElement by deserializing to the target type
|
||||
if (value is JsonElement jsonElement)
|
||||
{
|
||||
try
|
||||
{
|
||||
return jsonElement.Deserialize<T>() ?? defaultValue;
|
||||
}
|
||||
catch
|
||||
{
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle numeric conversions
|
||||
if (typeof(T) == typeof(int))
|
||||
{
|
||||
if (value is long longValue)
|
||||
{
|
||||
return (T)(object)(int)longValue;
|
||||
}
|
||||
if (value is string stringValue && int.TryParse(stringValue, out int intValue))
|
||||
{
|
||||
return (T)(object)intValue;
|
||||
}
|
||||
}
|
||||
else if (typeof(T) == typeof(long))
|
||||
{
|
||||
if (value is int intValue)
|
||||
{
|
||||
return (T)(object)(long)intValue;
|
||||
}
|
||||
if (value is string stringValue && long.TryParse(stringValue, out long longValue))
|
||||
{
|
||||
return (T)(object)longValue;
|
||||
}
|
||||
}
|
||||
else if (typeof(T) == typeof(string))
|
||||
{
|
||||
return (T)(object)value.ToString()!;
|
||||
}
|
||||
else if (typeof(T) == typeof(bool))
|
||||
{
|
||||
if (value is string stringValue && bool.TryParse(stringValue, out bool boolValue))
|
||||
{
|
||||
return (T)(object)boolValue;
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback to Convert.ChangeType for other types
|
||||
try
|
||||
{
|
||||
return (T)Convert.ChangeType(value, typeof(T));
|
||||
}
|
||||
catch
|
||||
{
|
||||
return defaultValue;
|
||||
}
|
||||
var rawParams = GrpcTypeHelper.ConvertObjectToByteString(parameters);
|
||||
return GrpcTypeHelper.ConvertByteStringToObject<T>(rawParams);
|
||||
}
|
||||
|
||||
public static List<T> GetParameterList<T>(Dictionary<string, object?> parameters, string key, List<T> defaultValue = null!)
|
||||
public static Dictionary<string, object?> Untyped<T>(T parameters)
|
||||
{
|
||||
defaultValue ??= [];
|
||||
|
||||
if (!parameters.TryGetValue(key, out var value) || value == null)
|
||||
{
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
// If the value is already the correct type, return it directly
|
||||
if (value is List<T> typedList)
|
||||
{
|
||||
return typedList;
|
||||
}
|
||||
|
||||
// Handle JsonElement by deserializing to the target type
|
||||
if (value is JsonElement jsonElement)
|
||||
{
|
||||
try
|
||||
{
|
||||
return jsonElement.Deserialize<List<T>>() ?? defaultValue;
|
||||
}
|
||||
catch
|
||||
{
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
|
||||
return defaultValue;
|
||||
var rawParams = GrpcTypeHelper.ConvertObjectToByteString(parameters);
|
||||
return GrpcTypeHelper.ConvertByteStringToObject<Dictionary<string, object?>>(rawParams) ?? [];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -153,14 +123,11 @@ public class PersistentTask : ModelBase
|
||||
{
|
||||
public Guid Id { get; set; } = Guid.NewGuid();
|
||||
|
||||
[MaxLength(64)]
|
||||
public string TaskId { get; set; } = null!;
|
||||
[MaxLength(64)] public string TaskId { get; set; } = null!;
|
||||
|
||||
[MaxLength(256)]
|
||||
public string Name { get; set; } = null!;
|
||||
[MaxLength(256)] public string Name { get; set; } = null!;
|
||||
|
||||
[MaxLength(1024)]
|
||||
public string? Description { get; set; }
|
||||
[MaxLength(1024)] public string? Description { get; set; }
|
||||
|
||||
public TaskType Type { get; set; }
|
||||
|
||||
@@ -172,15 +139,12 @@ public class PersistentTask : ModelBase
|
||||
public double Progress { get; set; }
|
||||
|
||||
// Task-specific parameters stored as JSON
|
||||
[Column(TypeName = "jsonb")]
|
||||
public Dictionary<string, object?> Parameters { get; set; } = new();
|
||||
[Column(TypeName = "jsonb")] public Dictionary<string, object?> Parameters { get; set; } = new();
|
||||
|
||||
// Task results/output stored as JSON
|
||||
[Column(TypeName = "jsonb")]
|
||||
public Dictionary<string, object?> Results { get; set; } = new();
|
||||
[Column(TypeName = "jsonb")] public Dictionary<string, object?> Results { get; set; } = new();
|
||||
|
||||
[MaxLength(1024)]
|
||||
public string? ErrorMessage { get; set; }
|
||||
[MaxLength(1024)] public string? ErrorMessage { get; set; }
|
||||
|
||||
public Instant? StartedAt { get; set; }
|
||||
public Instant? CompletedAt { get; set; }
|
||||
@@ -193,6 +157,24 @@ public class PersistentTask : ModelBase
|
||||
|
||||
// Estimated duration in seconds
|
||||
public long? EstimatedDurationSeconds { get; set; }
|
||||
|
||||
// Helper methods for parameter management using GrpcTypeHelper
|
||||
public MapField<string, Google.Protobuf.WellKnownTypes.Value> GetParametersAsGrpcMap()
|
||||
{
|
||||
var nonNullableParameters = Parameters.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty);
|
||||
return GrpcTypeHelper.ConvertToValueMap(nonNullableParameters);
|
||||
}
|
||||
|
||||
public void SetParametersFromGrpcMap(MapField<string, Google.Protobuf.WellKnownTypes.Value> map)
|
||||
{
|
||||
Parameters = GrpcTypeHelper.ConvertFromValueMap(map);
|
||||
}
|
||||
|
||||
public MapField<string, Google.Protobuf.WellKnownTypes.Value> GetResultsAsGrpcMap()
|
||||
{
|
||||
var nonNullableResults = Results.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty);
|
||||
return GrpcTypeHelper.ConvertToValueMap(nonNullableResults);
|
||||
}
|
||||
}
|
||||
|
||||
// Backward compatibility - UploadTask inherits from PersistentTask
|
||||
@@ -204,86 +186,138 @@ public class PersistentUploadTask : PersistentTask
|
||||
Name = "File Upload";
|
||||
}
|
||||
|
||||
// Convenience properties using typed parameters
|
||||
[NotMapped]
|
||||
public FileUploadParameters TypedParameters
|
||||
{
|
||||
get => ParameterHelper.Typed<FileUploadParameters>(Parameters)!;
|
||||
set => Parameters = ParameterHelper.Untyped(value);
|
||||
}
|
||||
|
||||
[MaxLength(256)]
|
||||
public string FileName
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "file_name", string.Empty);
|
||||
set => Parameters["file_name"] = value;
|
||||
get => TypedParameters.FileName;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.FileName = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public long FileSize
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "file_size", 0L);
|
||||
set => Parameters["file_size"] = value;
|
||||
get => TypedParameters.FileSize;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.FileSize = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
[MaxLength(128)]
|
||||
public string ContentType
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "content_type", string.Empty);
|
||||
set => Parameters["content_type"] = value;
|
||||
get => TypedParameters.ContentType;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.ContentType = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public long ChunkSize
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "chunk_size", 5242880L);
|
||||
set => Parameters["chunk_size"] = value;
|
||||
get => TypedParameters.ChunkSize;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.ChunkSize = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public int ChunksCount
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "chunks_count", 0);
|
||||
set => Parameters["chunks_count"] = value;
|
||||
get => TypedParameters.ChunksCount;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.ChunksCount = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public int ChunksUploaded
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "chunks_uploaded", 0);
|
||||
get => TypedParameters.ChunksUploaded;
|
||||
set
|
||||
{
|
||||
Parameters["chunks_uploaded"] = value;
|
||||
var parameters = TypedParameters;
|
||||
parameters.ChunksUploaded = value;
|
||||
TypedParameters = parameters;
|
||||
Progress = ChunksCount > 0 ? (double)value / ChunksCount * 100 : 0;
|
||||
}
|
||||
}
|
||||
|
||||
public Guid PoolId
|
||||
{
|
||||
get
|
||||
get => TypedParameters.PoolId;
|
||||
set
|
||||
{
|
||||
var poolIdStr = ParameterHelper.GetParameterValue(Parameters, "pool_id", Guid.Empty.ToString());
|
||||
return Guid.Parse(poolIdStr);
|
||||
var parameters = TypedParameters;
|
||||
parameters.PoolId = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
set => Parameters["pool_id"] = value.ToString();
|
||||
}
|
||||
|
||||
public Guid? BundleId
|
||||
{
|
||||
get
|
||||
get => TypedParameters.BundleId;
|
||||
set
|
||||
{
|
||||
var bundleIdStr = ParameterHelper.GetParameterValue(Parameters, "bundle_id", string.Empty);
|
||||
return string.IsNullOrEmpty(bundleIdStr) ? null : Guid.Parse(bundleIdStr);
|
||||
var parameters = TypedParameters;
|
||||
parameters.BundleId = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
set => Parameters["bundle_id"] = value?.ToString();
|
||||
}
|
||||
|
||||
[MaxLength(256)]
|
||||
public string? EncryptPassword
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue<string?>(Parameters, "encrypt_password", null);
|
||||
set => Parameters["encrypt_password"] = value;
|
||||
get => TypedParameters.EncryptPassword;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.EncryptPassword = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public string Hash
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "hash", string.Empty);
|
||||
set => Parameters["hash"] = value;
|
||||
get => TypedParameters.Hash;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.Hash = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
// JSON array of uploaded chunk indices for resumability
|
||||
public List<int> UploadedChunks
|
||||
{
|
||||
get => ParameterHelper.GetParameterList<int>(Parameters, "uploaded_chunks", []);
|
||||
set => Parameters["uploaded_chunks"] = value;
|
||||
get => TypedParameters.UploadedChunks;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.UploadedChunks = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -301,6 +335,7 @@ public enum TaskType
|
||||
Custom
|
||||
}
|
||||
|
||||
[Flags]
|
||||
public enum TaskStatus
|
||||
{
|
||||
Pending,
|
||||
@@ -321,38 +356,54 @@ public class FileMoveTask : PersistentTask
|
||||
Name = "Move Files";
|
||||
}
|
||||
|
||||
// Convenience properties using typed parameters
|
||||
public FileMoveParameters TypedParameters
|
||||
{
|
||||
get => ParameterHelper.Typed<FileMoveParameters>(Parameters)!;
|
||||
set => Parameters = ParameterHelper.Untyped(value);
|
||||
}
|
||||
|
||||
public List<string> FileIds
|
||||
{
|
||||
get => ParameterHelper.GetParameterList<string>(Parameters, "file_ids", []);
|
||||
set => Parameters["file_ids"] = value;
|
||||
get => TypedParameters.FileIds;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.FileIds = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public Guid TargetPoolId
|
||||
{
|
||||
get
|
||||
get => TypedParameters.TargetPoolId;
|
||||
set
|
||||
{
|
||||
var targetPoolIdStr = ParameterHelper.GetParameterValue(Parameters, "target_pool_id", Guid.Empty.ToString());
|
||||
return Guid.Parse(targetPoolIdStr);
|
||||
var parameters = TypedParameters;
|
||||
parameters.TargetPoolId = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
set => Parameters["target_pool_id"] = value.ToString();
|
||||
}
|
||||
|
||||
public Guid? TargetBundleId
|
||||
{
|
||||
get
|
||||
get => TypedParameters.TargetBundleId;
|
||||
set
|
||||
{
|
||||
var bundleIdStr = ParameterHelper.GetParameterValue(Parameters, "target_bundle_id", string.Empty);
|
||||
return string.IsNullOrEmpty(bundleIdStr) ? null : Guid.Parse(bundleIdStr);
|
||||
var parameters = TypedParameters;
|
||||
parameters.TargetBundleId = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
set => Parameters["target_bundle_id"] = value?.ToString();
|
||||
}
|
||||
|
||||
public int FilesProcessed
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "files_processed", 0);
|
||||
get => TypedParameters.FilesProcessed;
|
||||
set
|
||||
{
|
||||
Parameters["files_processed"] = value;
|
||||
var parameters = TypedParameters;
|
||||
parameters.FilesProcessed = value;
|
||||
TypedParameters = parameters;
|
||||
Progress = FileIds.Count > 0 ? (double)value / FileIds.Count * 100 : 0;
|
||||
}
|
||||
}
|
||||
@@ -367,45 +418,79 @@ public class FileCompressTask : PersistentTask
|
||||
Name = "Compress Files";
|
||||
}
|
||||
|
||||
// Convenience properties using typed parameters
|
||||
public FileCompressParameters TypedParameters
|
||||
{
|
||||
get => ParameterHelper.Typed<FileCompressParameters>(Parameters)!;
|
||||
set => Parameters = ParameterHelper.Untyped(value);
|
||||
}
|
||||
|
||||
public List<string> FileIds
|
||||
{
|
||||
get => ParameterHelper.GetParameterList<string>(Parameters, "file_ids", []);
|
||||
set => Parameters["file_ids"] = value;
|
||||
get => TypedParameters.FileIds;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.FileIds = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
[MaxLength(32)]
|
||||
public string CompressionFormat
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "compression_format", "zip");
|
||||
set => Parameters["compression_format"] = value;
|
||||
get => TypedParameters.CompressionFormat;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.CompressionFormat = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public int CompressionLevel
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "compression_level", 6);
|
||||
set => Parameters["compression_level"] = value;
|
||||
get => TypedParameters.CompressionLevel;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.CompressionLevel = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public string? OutputFileName
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue<string?>(Parameters, "output_file_name", null);
|
||||
set => Parameters["output_file_name"] = value;
|
||||
get => TypedParameters.OutputFileName;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.OutputFileName = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public int FilesProcessed
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "files_processed", 0);
|
||||
get => TypedParameters.FilesProcessed;
|
||||
set
|
||||
{
|
||||
Parameters["files_processed"] = value;
|
||||
var parameters = TypedParameters;
|
||||
parameters.FilesProcessed = value;
|
||||
TypedParameters = parameters;
|
||||
Progress = FileIds.Count > 0 ? (double)value / FileIds.Count * 100 : 0;
|
||||
}
|
||||
}
|
||||
|
||||
public string? ResultFileId
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue<string?>(Results, "result_file_id", null);
|
||||
set => Results["result_file_id"] = value;
|
||||
get => TypedParameters.ResultFileId;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.ResultFileId = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -418,41 +503,70 @@ public class BulkOperationTask : PersistentTask
|
||||
Name = "Bulk Operation";
|
||||
}
|
||||
|
||||
// Convenience properties using typed parameters
|
||||
public BulkOperationParameters TypedParameters
|
||||
{
|
||||
get => ParameterHelper.Typed<BulkOperationParameters>(Parameters)!;
|
||||
set => Parameters = ParameterHelper.Untyped(value);
|
||||
}
|
||||
|
||||
[MaxLength(128)]
|
||||
public string OperationType
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "operation_type", string.Empty);
|
||||
set => Parameters["operation_type"] = value;
|
||||
get => TypedParameters.OperationType;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.OperationType = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public List<string> TargetIds
|
||||
{
|
||||
get => ParameterHelper.GetParameterList<string>(Parameters, "target_ids", []);
|
||||
set => Parameters["target_ids"] = value;
|
||||
get => TypedParameters.TargetIds;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.TargetIds = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
[Column(TypeName = "jsonb")]
|
||||
public Dictionary<string, object?> OperationParameters
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "operation_parameters", new Dictionary<string, object?>());
|
||||
set => Parameters["operation_parameters"] = value;
|
||||
get => TypedParameters.OperationParameters;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.OperationParameters = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public int ItemsProcessed
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "items_processed", 0);
|
||||
get => TypedParameters.ItemsProcessed;
|
||||
set
|
||||
{
|
||||
Parameters["items_processed"] = value;
|
||||
var parameters = TypedParameters;
|
||||
parameters.ItemsProcessed = value;
|
||||
TypedParameters = parameters;
|
||||
Progress = TargetIds.Count > 0 ? (double)value / TargetIds.Count * 100 : 0;
|
||||
}
|
||||
}
|
||||
|
||||
[Column(TypeName = "jsonb")]
|
||||
public Dictionary<string, object?> OperationResults
|
||||
public Dictionary<string, object?>? OperationResults
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Results, "operation_results", new Dictionary<string, object?>());
|
||||
set => Results["operation_results"] = value;
|
||||
get => TypedParameters.OperationResults;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.OperationResults = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -465,58 +579,89 @@ public class StorageMigrationTask : PersistentTask
|
||||
Name = "Storage Migration";
|
||||
}
|
||||
|
||||
// Convenience properties using typed parameters
|
||||
public StorageMigrationParameters TypedParameters
|
||||
{
|
||||
get => ParameterHelper.Typed<StorageMigrationParameters>(Parameters)!;
|
||||
set => Parameters = ParameterHelper.Untyped(value);
|
||||
}
|
||||
|
||||
public Guid SourcePoolId
|
||||
{
|
||||
get
|
||||
get => TypedParameters.SourcePoolId;
|
||||
set
|
||||
{
|
||||
var sourcePoolIdStr = ParameterHelper.GetParameterValue(Parameters, "source_pool_id", Guid.Empty.ToString());
|
||||
return Guid.Parse(sourcePoolIdStr);
|
||||
var parameters = TypedParameters;
|
||||
parameters.SourcePoolId = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
set => Parameters["source_pool_id"] = value.ToString();
|
||||
}
|
||||
|
||||
public Guid TargetPoolId
|
||||
{
|
||||
get
|
||||
get => TypedParameters.TargetPoolId;
|
||||
set
|
||||
{
|
||||
var targetPoolIdStr = ParameterHelper.GetParameterValue(Parameters, "target_pool_id", Guid.Empty.ToString());
|
||||
return Guid.Parse(targetPoolIdStr);
|
||||
var parameters = TypedParameters;
|
||||
parameters.TargetPoolId = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
set => Parameters["target_pool_id"] = value.ToString();
|
||||
}
|
||||
|
||||
public List<string> FileIds
|
||||
{
|
||||
get => ParameterHelper.GetParameterList<string>(Parameters, "file_ids", []);
|
||||
set => Parameters["file_ids"] = value;
|
||||
get => TypedParameters.FileIds;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.FileIds = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public bool PreserveOriginals
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "preserve_originals", true);
|
||||
set => Parameters["preserve_originals"] = value;
|
||||
get => TypedParameters.PreserveOriginals;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.PreserveOriginals = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public long TotalBytesToTransfer
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "total_bytes_to_transfer", 0L);
|
||||
set => Parameters["total_bytes_to_transfer"] = value;
|
||||
get => TypedParameters.TotalBytesToTransfer;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.TotalBytesToTransfer = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
|
||||
public long BytesTransferred
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "bytes_transferred", 0L);
|
||||
get => TypedParameters.BytesTransferred;
|
||||
set
|
||||
{
|
||||
Parameters["bytes_transferred"] = value;
|
||||
var parameters = TypedParameters;
|
||||
parameters.BytesTransferred = value;
|
||||
TypedParameters = parameters;
|
||||
Progress = TotalBytesToTransfer > 0 ? (double)value / TotalBytesToTransfer * 100 : 0;
|
||||
}
|
||||
}
|
||||
|
||||
public int FilesMigrated
|
||||
{
|
||||
get => ParameterHelper.GetParameterValue(Parameters, "files_migrated", 0);
|
||||
set => Parameters["files_migrated"] = value;
|
||||
get => TypedParameters.FilesMigrated;
|
||||
set
|
||||
{
|
||||
var parameters = TypedParameters;
|
||||
parameters.FilesMigrated = value;
|
||||
TypedParameters = parameters;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -527,4 +672,4 @@ public enum UploadTaskStatus
|
||||
Completed = TaskStatus.Completed,
|
||||
Failed = TaskStatus.Failed,
|
||||
Expired = TaskStatus.Expired
|
||||
}
|
||||
}
|
||||
@@ -233,17 +233,17 @@ public class PersistentTaskService(
|
||||
? query.OrderByDescending(t => t.Progress)
|
||||
: query.OrderBy(t => t.Progress);
|
||||
break;
|
||||
case "createdat":
|
||||
case "created":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.CreatedAt)
|
||||
: query.OrderBy(t => t.CreatedAt);
|
||||
break;
|
||||
case "updatedat":
|
||||
case "updated":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.UpdatedAt)
|
||||
: query.OrderBy(t => t.UpdatedAt);
|
||||
break;
|
||||
case "lastactivity":
|
||||
case "activity":
|
||||
default:
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.LastActivity)
|
||||
@@ -344,7 +344,7 @@ public class PersistentTaskService(
|
||||
TaskId = task.TaskId,
|
||||
Name = task.Name,
|
||||
Type = task.Type.ToString(),
|
||||
CreatedAt = task.CreatedAt.ToString("O", null)
|
||||
CreatedAt = task.CreatedAt.ToString("%O", null)
|
||||
};
|
||||
|
||||
var packet = new WebSocketPacket
|
||||
@@ -380,7 +380,7 @@ public class PersistentTaskService(
|
||||
Type = task.Type.ToString(),
|
||||
Progress = newProgress,
|
||||
Status = task.Status.ToString(),
|
||||
LastActivity = task.LastActivity.ToString("O", null)
|
||||
LastActivity = task.LastActivity.ToString("%O", null)
|
||||
};
|
||||
|
||||
var packet = new WebSocketPacket
|
||||
@@ -410,7 +410,7 @@ public class PersistentTaskService(
|
||||
TaskId = task.TaskId,
|
||||
Name = task.Name,
|
||||
Type = task.Type.ToString(),
|
||||
CompletedAt = task.CompletedAt?.ToString("O", null) ?? task.UpdatedAt.ToString("O", null),
|
||||
CompletedAt = task.CompletedAt?.ToString("%O", null) ?? task.UpdatedAt.ToString("%O", null),
|
||||
Results = task.Results
|
||||
};
|
||||
|
||||
@@ -458,7 +458,7 @@ public class PersistentTaskService(
|
||||
TaskId = task.TaskId,
|
||||
Name = task.Name,
|
||||
Type = task.Type.ToString(),
|
||||
FailedAt = task.UpdatedAt.ToString("O", null),
|
||||
FailedAt = task.UpdatedAt.ToString("%O", null),
|
||||
ErrorMessage = task.ErrorMessage ?? "Task failed due to an unknown error"
|
||||
};
|
||||
|
||||
@@ -504,6 +504,8 @@ public class PersistentTaskService(
|
||||
private async Task SetCacheAsync(PersistentTask task)
|
||||
{
|
||||
var cacheKey = $"{CacheKeyPrefix}{task.TaskId}";
|
||||
|
||||
// Cache the entire task object directly - this includes all properties including Parameters dictionary
|
||||
await cache.SetAsync(cacheKey, task, CacheDuration);
|
||||
}
|
||||
|
||||
@@ -514,6 +516,475 @@ public class PersistentTaskService(
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Upload-Specific Methods
|
||||
|
||||
/// <summary>
|
||||
/// Gets the first available pool ID, or creates a default one if none exist
|
||||
/// </summary>
|
||||
private async Task<Guid> GetFirstAvailablePoolIdAsync()
|
||||
{
|
||||
// Try to get the first available pool
|
||||
var firstPool = await db.Pools
|
||||
.Where(p => p.PolicyConfig.PublicUsable)
|
||||
.OrderBy(p => p.CreatedAt)
|
||||
.FirstOrDefaultAsync();
|
||||
|
||||
if (firstPool != null)
|
||||
{
|
||||
return firstPool.Id;
|
||||
}
|
||||
|
||||
// If no pools exist, create a default one
|
||||
logger.LogWarning("No pools found in database. Creating default pool...");
|
||||
|
||||
var defaultPoolId = Guid.NewGuid();
|
||||
var defaultPool = new DysonNetwork.Shared.Models.FilePool
|
||||
{
|
||||
Id = defaultPoolId,
|
||||
Name = "Default Storage Pool",
|
||||
Description = "Automatically created default storage pool",
|
||||
StorageConfig = new DysonNetwork.Shared.Models.RemoteStorageConfig
|
||||
{
|
||||
Region = "auto",
|
||||
Bucket = "solar-network-development",
|
||||
Endpoint = "localhost:9000",
|
||||
SecretId = "littlesheep",
|
||||
SecretKey = "password",
|
||||
EnableSigned = true,
|
||||
EnableSsl = false
|
||||
},
|
||||
BillingConfig = new DysonNetwork.Shared.Models.BillingConfig
|
||||
{
|
||||
CostMultiplier = 1.0
|
||||
},
|
||||
PolicyConfig = new DysonNetwork.Shared.Models.PolicyConfig
|
||||
{
|
||||
EnableFastUpload = true,
|
||||
EnableRecycle = true,
|
||||
PublicUsable = true,
|
||||
AllowEncryption = true,
|
||||
AllowAnonymous = true,
|
||||
AcceptTypes = new List<string> { "*/*" },
|
||||
MaxFileSize = 1024L * 1024 * 1024 * 10, // 10GB
|
||||
RequirePrivilege = 0
|
||||
},
|
||||
IsHidden = false,
|
||||
AccountId = null,
|
||||
CreatedAt = SystemClock.Instance.GetCurrentInstant(),
|
||||
UpdatedAt = SystemClock.Instance.GetCurrentInstant()
|
||||
};
|
||||
|
||||
db.Pools.Add(defaultPool);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
logger.LogInformation("Created default pool with ID: {PoolId}", defaultPoolId);
|
||||
return defaultPoolId;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new persistent upload task
|
||||
/// </summary>
|
||||
public async Task<PersistentUploadTask> CreateUploadTaskAsync(
|
||||
string taskId,
|
||||
CreateUploadTaskRequest request,
|
||||
Guid accountId
|
||||
)
|
||||
{
|
||||
var chunkSize = request.ChunkSize ?? 1024 * 1024 * 5; // 5MB default
|
||||
var chunksCount = (int)Math.Ceiling((double)request.FileSize / chunkSize);
|
||||
|
||||
// Use default pool if no pool is specified, or find first available pool
|
||||
var poolId = request.PoolId ?? await GetFirstAvailablePoolIdAsync();
|
||||
|
||||
var uploadTask = new PersistentUploadTask
|
||||
{
|
||||
TaskId = taskId,
|
||||
FileName = request.FileName,
|
||||
FileSize = request.FileSize,
|
||||
ContentType = request.ContentType,
|
||||
ChunkSize = chunkSize,
|
||||
ChunksCount = chunksCount,
|
||||
ChunksUploaded = 0,
|
||||
PoolId = poolId,
|
||||
BundleId = request.BundleId,
|
||||
EncryptPassword = request.EncryptPassword,
|
||||
ExpiredAt = request.ExpiredAt,
|
||||
Hash = request.Hash,
|
||||
AccountId = accountId,
|
||||
Status = TaskStatus.InProgress,
|
||||
UploadedChunks = [],
|
||||
LastActivity = SystemClock.Instance.GetCurrentInstant()
|
||||
};
|
||||
|
||||
db.Tasks.Add(uploadTask);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
await SetCacheAsync(uploadTask);
|
||||
await SendTaskCreatedNotificationAsync(uploadTask);
|
||||
return uploadTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets an existing upload task by ID
|
||||
/// </summary>
|
||||
public async Task<PersistentUploadTask?> GetUploadTaskAsync(string taskId)
|
||||
{
|
||||
var cacheKey = $"{CacheKeyPrefix}{taskId}";
|
||||
var cachedTask = await cache.GetAsync<PersistentUploadTask>(cacheKey);
|
||||
if (cachedTask is not null)
|
||||
return cachedTask;
|
||||
|
||||
var task = await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.FirstOrDefaultAsync(t => t.TaskId == taskId && t.Status == TaskStatus.InProgress);
|
||||
|
||||
if (task is not null)
|
||||
await SetCacheAsync(task);
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Updates chunk upload progress
|
||||
/// </summary>
|
||||
public async Task UpdateChunkProgressAsync(string taskId, int chunkIndex)
|
||||
{
|
||||
var task = await GetUploadTaskAsync(taskId);
|
||||
if (task is null) return;
|
||||
|
||||
if (!task.UploadedChunks.Contains(chunkIndex))
|
||||
{
|
||||
var previousProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
||||
|
||||
task.UploadedChunks.Add(chunkIndex);
|
||||
task.ChunksUploaded = task.UploadedChunks.Count;
|
||||
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
await SetCacheAsync(task);
|
||||
|
||||
// Send real-time progress update
|
||||
var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
||||
await SendUploadProgressUpdateAsync(task, newProgress, previousProgress);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks if a chunk has already been uploaded
|
||||
/// </summary>
|
||||
public async Task<bool> IsChunkUploadedAsync(string taskId, int chunkIndex)
|
||||
{
|
||||
var task = await GetUploadTaskAsync(taskId);
|
||||
return task?.UploadedChunks.Contains(chunkIndex) ?? false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets upload progress as percentage
|
||||
/// </summary>
|
||||
public async Task<double> GetUploadProgressAsync(string taskId)
|
||||
{
|
||||
var task = await GetUploadTaskAsync(taskId);
|
||||
if (task is null || task.ChunksCount == 0) return 0;
|
||||
|
||||
return (double)task.ChunksUploaded / task.ChunksCount * 100;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets user upload tasks with filtering and pagination
|
||||
/// </summary>
|
||||
public async Task<(List<PersistentUploadTask> Items, int TotalCount)> GetUserUploadTasksAsync(
|
||||
Guid accountId,
|
||||
UploadTaskStatus? status = null,
|
||||
string? sortBy = "lastActivity",
|
||||
bool sortDescending = true,
|
||||
int offset = 0,
|
||||
int limit = 50
|
||||
)
|
||||
{
|
||||
var query = db.Tasks.OfType<PersistentUploadTask>().Where(t => t.AccountId == accountId);
|
||||
|
||||
// Apply status filter
|
||||
if (status.HasValue)
|
||||
{
|
||||
query = query.Where(t => t.Status == (TaskStatus)status.Value);
|
||||
}
|
||||
|
||||
// Get total count
|
||||
var totalCount = await query.CountAsync();
|
||||
|
||||
// Apply sorting
|
||||
IOrderedQueryable<PersistentUploadTask> orderedQuery;
|
||||
switch (sortBy?.ToLower())
|
||||
{
|
||||
case "filename":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.FileName)
|
||||
: query.OrderBy(t => t.FileName);
|
||||
break;
|
||||
case "filesize":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.FileSize)
|
||||
: query.OrderBy(t => t.FileSize);
|
||||
break;
|
||||
case "createdat":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.CreatedAt)
|
||||
: query.OrderBy(t => t.CreatedAt);
|
||||
break;
|
||||
case "updatedat":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.UpdatedAt)
|
||||
: query.OrderBy(t => t.UpdatedAt);
|
||||
break;
|
||||
case "lastactivity":
|
||||
default:
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.LastActivity)
|
||||
: query.OrderBy(t => t.LastActivity);
|
||||
break;
|
||||
}
|
||||
|
||||
// Apply pagination
|
||||
var items = await orderedQuery
|
||||
.Skip(offset)
|
||||
.Take(limit)
|
||||
.ToListAsync();
|
||||
|
||||
return (items, totalCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets upload statistics for a user
|
||||
/// </summary>
|
||||
public async Task<UserUploadStats> GetUserUploadStatsAsync(Guid accountId)
|
||||
{
|
||||
var tasks = await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.Where(t => t.AccountId == accountId)
|
||||
.ToListAsync();
|
||||
|
||||
var stats = new UserUploadStats
|
||||
{
|
||||
TotalTasks = tasks.Count,
|
||||
InProgressTasks = tasks.Count(t => t.Status == Model.TaskStatus.InProgress),
|
||||
CompletedTasks = tasks.Count(t => t.Status == Model.TaskStatus.Completed),
|
||||
FailedTasks = tasks.Count(t => t.Status == Model.TaskStatus.Failed),
|
||||
ExpiredTasks = tasks.Count(t => t.Status == Model.TaskStatus.Expired),
|
||||
TotalUploadedBytes = tasks.Sum(t => (long)t.ChunksUploaded * t.ChunkSize),
|
||||
AverageProgress = tasks.Any(t => t.Status == Model.TaskStatus.InProgress)
|
||||
? tasks.Where(t => t.Status == Model.TaskStatus.InProgress)
|
||||
.Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0)
|
||||
: 0,
|
||||
RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
|
||||
.Take(5)
|
||||
.Select(t => new RecentActivity
|
||||
{
|
||||
TaskId = t.TaskId,
|
||||
FileName = t.FileName,
|
||||
Status = (UploadTaskStatus)t.Status,
|
||||
LastActivity = t.LastActivity,
|
||||
Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0
|
||||
})
|
||||
.ToList()
|
||||
};
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Cleans up failed tasks for a user
|
||||
/// </summary>
|
||||
public async Task<int> CleanupUserFailedTasksAsync(Guid accountId)
|
||||
{
|
||||
var failedTasks = await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.Where(t => t.AccountId == accountId &&
|
||||
(t.Status == Model.TaskStatus.Failed || t.Status == Model.TaskStatus.Expired))
|
||||
.ToListAsync();
|
||||
|
||||
foreach (var task in failedTasks)
|
||||
{
|
||||
await RemoveCacheAsync(task.TaskId);
|
||||
|
||||
// Clean up temp files
|
||||
var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId);
|
||||
if (Directory.Exists(taskPath))
|
||||
{
|
||||
try
|
||||
{
|
||||
Directory.Delete(taskPath, true);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
db.Tasks.RemoveRange(failedTasks);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
return failedTasks.Count;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets recent tasks for a user
|
||||
/// </summary>
|
||||
public async Task<List<PersistentUploadTask>> GetRecentUserTasksAsync(Guid accountId, int limit = 10)
|
||||
{
|
||||
return await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.Where(t => t.AccountId == accountId)
|
||||
.OrderByDescending(t => t.LastActivity)
|
||||
.Take(limit)
|
||||
.ToListAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends upload completion notification
|
||||
/// </summary>
|
||||
public async Task SendUploadCompletedNotificationAsync(PersistentUploadTask task, string fileId)
|
||||
{
|
||||
try
|
||||
{
|
||||
var completionData = new UploadCompletionData
|
||||
{
|
||||
TaskId = task.TaskId,
|
||||
FileId = fileId,
|
||||
FileName = task.FileName,
|
||||
FileSize = task.FileSize,
|
||||
CompletedAt = SystemClock.Instance.GetCurrentInstant().ToString("%O", null)
|
||||
};
|
||||
|
||||
// Send WebSocket notification
|
||||
var wsPacket = new WebSocketPacket
|
||||
{
|
||||
Type = "upload.completed",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(completionData))
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Packet = wsPacket
|
||||
});
|
||||
|
||||
// Send push notification
|
||||
var pushNotification = new PushNotification
|
||||
{
|
||||
Topic = "upload",
|
||||
Title = "Upload Completed",
|
||||
Subtitle = task.FileName,
|
||||
Body = $"Your file '{task.FileName}' has been uploaded successfully.",
|
||||
IsSavable = true
|
||||
};
|
||||
|
||||
await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Notification = pushNotification
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to send upload completion notification for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends upload failure notification
|
||||
/// </summary>
|
||||
public async Task SendUploadFailedNotificationAsync(PersistentUploadTask task, string? errorMessage = null)
|
||||
{
|
||||
try
|
||||
{
|
||||
var failureData = new UploadFailureData
|
||||
{
|
||||
TaskId = task.TaskId,
|
||||
FileName = task.FileName,
|
||||
FileSize = task.FileSize,
|
||||
FailedAt = SystemClock.Instance.GetCurrentInstant().ToString("%O", null),
|
||||
ErrorMessage = errorMessage ?? "Upload failed due to an unknown error"
|
||||
};
|
||||
|
||||
// Send WebSocket notification
|
||||
var wsPacket = new WebSocketPacket
|
||||
{
|
||||
Type = "upload.failed",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(failureData))
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Packet = wsPacket
|
||||
});
|
||||
|
||||
// Send push notification
|
||||
var pushNotification = new PushNotification
|
||||
{
|
||||
Topic = "upload",
|
||||
Title = "Upload Failed",
|
||||
Subtitle = task.FileName,
|
||||
Body = $"Your file '{task.FileName}' upload has failed. You can try again.",
|
||||
IsSavable = true
|
||||
};
|
||||
|
||||
await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Notification = pushNotification
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to send upload failure notification for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends real-time upload progress update via WebSocket
|
||||
/// </summary>
|
||||
private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress, double previousProgress)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Only send significant progress updates (every 5% or major milestones)
|
||||
if (Math.Abs(newProgress - previousProgress) < 5 && newProgress < 100)
|
||||
return;
|
||||
|
||||
var progressData = new UploadProgressData
|
||||
{
|
||||
TaskId = task.TaskId,
|
||||
FileName = task.FileName,
|
||||
FileSize = task.FileSize,
|
||||
ChunksUploaded = task.ChunksUploaded,
|
||||
ChunksTotal = task.ChunksCount,
|
||||
Progress = newProgress,
|
||||
Status = task.Status.ToString(),
|
||||
LastActivity = task.LastActivity.ToString("%O", null)
|
||||
};
|
||||
|
||||
var packet = new WebSocketPacket
|
||||
{
|
||||
Type = "upload.progress",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(progressData))
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Packet = packet
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to send upload progress update for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
|
||||
#region Data Transfer Objects
|
||||
@@ -579,3 +1050,58 @@ public class TaskActivity
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
#region Upload-Specific Data Transfer Objects
|
||||
|
||||
public class UploadProgressData
|
||||
{
|
||||
public string TaskId { get; set; } = null!;
|
||||
public string FileName { get; set; } = null!;
|
||||
public long FileSize { get; set; }
|
||||
public int ChunksUploaded { get; set; }
|
||||
public int ChunksTotal { get; set; }
|
||||
public double Progress { get; set; }
|
||||
public string Status { get; set; } = null!;
|
||||
public string LastActivity { get; set; } = null!;
|
||||
}
|
||||
|
||||
public class UploadCompletionData
|
||||
{
|
||||
public string TaskId { get; set; } = null!;
|
||||
public string FileId { get; set; } = null!;
|
||||
public string FileName { get; set; } = null!;
|
||||
public long FileSize { get; set; }
|
||||
public string CompletedAt { get; set; } = null!;
|
||||
}
|
||||
|
||||
public class UploadFailureData
|
||||
{
|
||||
public string TaskId { get; set; } = null!;
|
||||
public string FileName { get; set; } = null!;
|
||||
public long FileSize { get; set; }
|
||||
public string FailedAt { get; set; } = null!;
|
||||
public string ErrorMessage { get; set; } = null!;
|
||||
}
|
||||
|
||||
public class UserUploadStats
|
||||
{
|
||||
public int TotalTasks { get; set; }
|
||||
public int InProgressTasks { get; set; }
|
||||
public int CompletedTasks { get; set; }
|
||||
public int FailedTasks { get; set; }
|
||||
public int ExpiredTasks { get; set; }
|
||||
public long TotalUploadedBytes { get; set; }
|
||||
public double AverageProgress { get; set; }
|
||||
public List<RecentActivity> RecentActivity { get; set; } = new();
|
||||
}
|
||||
|
||||
public class RecentActivity
|
||||
{
|
||||
public string TaskId { get; set; } = null!;
|
||||
public string FileName { get; set; } = null!;
|
||||
public UploadTaskStatus Status { get; set; }
|
||||
public Instant LastActivity { get; set; }
|
||||
public double Progress { get; set; }
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
@@ -1,567 +0,0 @@
|
||||
using DysonNetwork.Drive.Storage.Model;
|
||||
using DysonNetwork.Shared.Cache;
|
||||
using DysonNetwork.Shared.Proto;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using NodaTime;
|
||||
using System.Text.Json;
|
||||
using TaskStatus = DysonNetwork.Drive.Storage.Model.TaskStatus;
|
||||
|
||||
namespace DysonNetwork.Drive.Storage;
|
||||
|
||||
public class PersistentUploadService(
|
||||
AppDatabase db,
|
||||
ICacheService cache,
|
||||
ILogger<PersistentUploadService> logger,
|
||||
RingService.RingServiceClient ringService
|
||||
)
|
||||
{
|
||||
private const string CacheKeyPrefix = "upload:task:";
|
||||
private static readonly TimeSpan CacheDuration = TimeSpan.FromMinutes(30);
|
||||
|
||||
/// <summary>
|
||||
/// Creates a new persistent upload task
|
||||
/// </summary>
|
||||
public async Task<PersistentUploadTask> CreateUploadTaskAsync(
|
||||
string taskId,
|
||||
CreateUploadTaskRequest request,
|
||||
Guid accountId
|
||||
)
|
||||
{
|
||||
var chunkSize = request.ChunkSize ?? 1024 * 1024 * 5; // 5MB default
|
||||
var chunksCount = (int)Math.Ceiling((double)request.FileSize / chunkSize);
|
||||
|
||||
var uploadTask = new PersistentUploadTask
|
||||
{
|
||||
TaskId = taskId,
|
||||
FileName = request.FileName,
|
||||
FileSize = request.FileSize,
|
||||
ContentType = request.ContentType,
|
||||
ChunkSize = chunkSize,
|
||||
ChunksCount = chunksCount,
|
||||
ChunksUploaded = 0,
|
||||
PoolId = request.PoolId.Value,
|
||||
BundleId = request.BundleId,
|
||||
EncryptPassword = request.EncryptPassword,
|
||||
ExpiredAt = request.ExpiredAt,
|
||||
Hash = request.Hash,
|
||||
AccountId = accountId,
|
||||
Status = Model.TaskStatus.InProgress,
|
||||
UploadedChunks = new List<int>(),
|
||||
LastActivity = SystemClock.Instance.GetCurrentInstant()
|
||||
};
|
||||
|
||||
db.UploadTasks.Add(uploadTask);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
await SetCacheAsync(uploadTask);
|
||||
return uploadTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets an existing upload task by ID
|
||||
/// </summary>
|
||||
public async Task<PersistentUploadTask?> GetUploadTaskAsync(string taskId)
|
||||
{
|
||||
var cacheKey = $"{CacheKeyPrefix}{taskId}";
|
||||
var cachedTask = await cache.GetAsync<PersistentUploadTask>(cacheKey);
|
||||
if (cachedTask is not null)
|
||||
return cachedTask;
|
||||
|
||||
var task = await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.FirstOrDefaultAsync(t => t.TaskId == taskId && t.Status == TaskStatus.InProgress);
|
||||
|
||||
if (task is not null)
|
||||
await SetCacheAsync(task);
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Updates chunk upload progress
|
||||
/// </summary>
|
||||
public async Task UpdateChunkProgressAsync(string taskId, int chunkIndex)
|
||||
{
|
||||
var task = await GetUploadTaskAsync(taskId);
|
||||
if (task is null) return;
|
||||
|
||||
if (!task.UploadedChunks.Contains(chunkIndex))
|
||||
{
|
||||
var previousProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
||||
|
||||
task.UploadedChunks.Add(chunkIndex);
|
||||
task.ChunksUploaded = task.UploadedChunks.Count;
|
||||
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
await SetCacheAsync(task);
|
||||
|
||||
// Send real-time progress update
|
||||
var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
||||
await SendUploadProgressUpdateAsync(task, newProgress, previousProgress);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Marks an upload task as completed
|
||||
/// </summary>
|
||||
public async Task MarkTaskCompletedAsync(string taskId)
|
||||
{
|
||||
var task = await GetUploadTaskAsync(taskId);
|
||||
if (task is null) return;
|
||||
|
||||
task.Status = Model.TaskStatus.Completed;
|
||||
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
await RemoveCacheAsync(taskId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Marks an upload task as failed
|
||||
/// </summary>
|
||||
public async Task MarkTaskFailedAsync(string taskId)
|
||||
{
|
||||
var task = await GetUploadTaskAsync(taskId);
|
||||
if (task is null) return;
|
||||
|
||||
task.Status = Model.TaskStatus.Failed;
|
||||
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
await RemoveCacheAsync(taskId);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets all resumable tasks for an account
|
||||
/// </summary>
|
||||
public async Task<List<PersistentUploadTask>> GetResumableTasksAsync(Guid accountId)
|
||||
{
|
||||
return await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.Where(t => t.AccountId == accountId &&
|
||||
t.Status == Model.TaskStatus.InProgress &&
|
||||
t.LastActivity > SystemClock.Instance.GetCurrentInstant() - Duration.FromHours(24))
|
||||
.OrderByDescending(t => t.LastActivity)
|
||||
.ToListAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets user tasks with filtering and pagination
|
||||
/// </summary>
|
||||
public async Task<(List<PersistentUploadTask> Items, int TotalCount)> GetUserTasksAsync(
|
||||
Guid accountId,
|
||||
UploadTaskStatus? status = null,
|
||||
string? sortBy = "lastActivity",
|
||||
bool sortDescending = true,
|
||||
int offset = 0,
|
||||
int limit = 50
|
||||
)
|
||||
{
|
||||
var query = db.Tasks.OfType<PersistentUploadTask>().Where(t => t.AccountId == accountId);
|
||||
|
||||
// Apply status filter
|
||||
if (status.HasValue)
|
||||
{
|
||||
query = query.Where(t => t.Status == (TaskStatus)status.Value);
|
||||
}
|
||||
|
||||
// Get total count
|
||||
var totalCount = await query.CountAsync();
|
||||
|
||||
// Apply sorting
|
||||
IOrderedQueryable<PersistentUploadTask> orderedQuery;
|
||||
switch (sortBy?.ToLower())
|
||||
{
|
||||
case "filename":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.FileName)
|
||||
: query.OrderBy(t => t.FileName);
|
||||
break;
|
||||
case "filesize":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.FileSize)
|
||||
: query.OrderBy(t => t.FileSize);
|
||||
break;
|
||||
case "createdat":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.CreatedAt)
|
||||
: query.OrderBy(t => t.CreatedAt);
|
||||
break;
|
||||
case "updatedat":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.UpdatedAt)
|
||||
: query.OrderBy(t => t.UpdatedAt);
|
||||
break;
|
||||
case "lastactivity":
|
||||
default:
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.LastActivity)
|
||||
: query.OrderBy(t => t.LastActivity);
|
||||
break;
|
||||
}
|
||||
|
||||
// Apply pagination
|
||||
var items = await orderedQuery
|
||||
.Skip(offset)
|
||||
.Take(limit)
|
||||
.ToListAsync();
|
||||
|
||||
return (items, totalCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Checks if a chunk has already been uploaded
|
||||
/// </summary>
|
||||
public async Task<bool> IsChunkUploadedAsync(string taskId, int chunkIndex)
|
||||
{
|
||||
var task = await GetUploadTaskAsync(taskId);
|
||||
return task?.UploadedChunks.Contains(chunkIndex) ?? false;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Cleans up expired/stale upload tasks
|
||||
/// </summary>
|
||||
public async Task CleanupStaleTasksAsync()
|
||||
{
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
var staleThreshold = now - Duration.FromHours(24); // 24 hours
|
||||
|
||||
var staleTasks = await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.Where(t => t.Status == Model.TaskStatus.InProgress &&
|
||||
t.LastActivity < staleThreshold)
|
||||
.ToListAsync();
|
||||
|
||||
foreach (var task in staleTasks)
|
||||
{
|
||||
task.Status = Model.TaskStatus.Expired;
|
||||
await RemoveCacheAsync(task.TaskId);
|
||||
|
||||
// Clean up temp files
|
||||
var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId);
|
||||
if (Directory.Exists(taskPath))
|
||||
{
|
||||
try
|
||||
{
|
||||
Directory.Delete(taskPath, true);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
if (staleTasks.Any())
|
||||
{
|
||||
logger.LogInformation("Cleaned up {Count} stale upload tasks", staleTasks.Count);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets upload progress as percentage
|
||||
/// </summary>
|
||||
public async Task<double> GetUploadProgressAsync(string taskId)
|
||||
{
|
||||
var task = await GetUploadTaskAsync(taskId);
|
||||
if (task is null || task.ChunksCount == 0) return 0;
|
||||
|
||||
return (double)task.ChunksUploaded / task.ChunksCount * 100;
|
||||
}
|
||||
|
||||
private async Task SetCacheAsync(PersistentUploadTask task)
|
||||
{
|
||||
var cacheKey = $"{CacheKeyPrefix}{task.TaskId}";
|
||||
await cache.SetAsync(cacheKey, task, CacheDuration);
|
||||
}
|
||||
|
||||
private async Task RemoveCacheAsync(string taskId)
|
||||
{
|
||||
var cacheKey = $"{CacheKeyPrefix}{taskId}";
|
||||
await cache.RemoveAsync(cacheKey);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets upload statistics for a user
|
||||
/// </summary>
|
||||
public async Task<UserUploadStats> GetUserUploadStatsAsync(Guid accountId)
|
||||
{
|
||||
var tasks = await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.Where(t => t.AccountId == accountId)
|
||||
.ToListAsync();
|
||||
|
||||
var stats = new UserUploadStats
|
||||
{
|
||||
TotalTasks = tasks.Count,
|
||||
InProgressTasks = tasks.Count(t => t.Status == Model.TaskStatus.InProgress),
|
||||
CompletedTasks = tasks.Count(t => t.Status == Model.TaskStatus.Completed),
|
||||
FailedTasks = tasks.Count(t => t.Status == Model.TaskStatus.Failed),
|
||||
ExpiredTasks = tasks.Count(t => t.Status == Model.TaskStatus.Expired),
|
||||
TotalUploadedBytes = tasks.Sum(t => (long)t.ChunksUploaded * t.ChunkSize),
|
||||
AverageProgress = tasks.Any(t => t.Status == Model.TaskStatus.InProgress)
|
||||
? tasks.Where(t => t.Status == Model.TaskStatus.InProgress)
|
||||
.Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0)
|
||||
: 0,
|
||||
RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
|
||||
.Take(5)
|
||||
.Select(t => new RecentActivity
|
||||
{
|
||||
TaskId = t.TaskId,
|
||||
FileName = t.FileName,
|
||||
Status = (UploadTaskStatus)t.Status,
|
||||
LastActivity = t.LastActivity,
|
||||
Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0
|
||||
})
|
||||
.ToList()
|
||||
};
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Cleans up failed tasks for a user
|
||||
/// </summary>
|
||||
public async Task<int> CleanupUserFailedTasksAsync(Guid accountId)
|
||||
{
|
||||
var failedTasks = await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.Where(t => t.AccountId == accountId &&
|
||||
(t.Status == Model.TaskStatus.Failed || t.Status == Model.TaskStatus.Expired))
|
||||
.ToListAsync();
|
||||
|
||||
foreach (var task in failedTasks)
|
||||
{
|
||||
await RemoveCacheAsync(task.TaskId);
|
||||
|
||||
// Clean up temp files
|
||||
var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId);
|
||||
if (Directory.Exists(taskPath))
|
||||
{
|
||||
try
|
||||
{
|
||||
Directory.Delete(taskPath, true);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
db.Tasks.RemoveRange(failedTasks);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
return failedTasks.Count;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets recent tasks for a user
|
||||
/// </summary>
|
||||
public async Task<List<PersistentUploadTask>> GetRecentUserTasksAsync(Guid accountId, int limit = 10)
|
||||
{
|
||||
return await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.Where(t => t.AccountId == accountId)
|
||||
.OrderByDescending(t => t.LastActivity)
|
||||
.Take(limit)
|
||||
.ToListAsync();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends real-time upload progress update via WebSocket
|
||||
/// </summary>
|
||||
private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress, double previousProgress)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Only send significant progress updates (every 5% or major milestones)
|
||||
if (Math.Abs(newProgress - previousProgress) < 5 && newProgress < 100)
|
||||
return;
|
||||
|
||||
var progressData = new UploadProgressData
|
||||
{
|
||||
TaskId = task.TaskId,
|
||||
FileName = task.FileName,
|
||||
FileSize = task.FileSize,
|
||||
ChunksUploaded = task.ChunksUploaded,
|
||||
ChunksTotal = task.ChunksCount,
|
||||
Progress = newProgress,
|
||||
Status = task.Status.ToString(),
|
||||
LastActivity = task.LastActivity.ToString("O", null)
|
||||
};
|
||||
|
||||
var packet = new WebSocketPacket
|
||||
{
|
||||
Type = "upload.progress",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(progressData))
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Packet = packet
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to send upload progress update for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends upload completion notification
|
||||
/// </summary>
|
||||
public async Task SendUploadCompletedNotificationAsync(PersistentUploadTask task, string fileId)
|
||||
{
|
||||
try
|
||||
{
|
||||
var completionData = new UploadCompletionData
|
||||
{
|
||||
TaskId = task.TaskId,
|
||||
FileId = fileId,
|
||||
FileName = task.FileName,
|
||||
FileSize = task.FileSize,
|
||||
CompletedAt = SystemClock.Instance.GetCurrentInstant().ToString("O", null)
|
||||
};
|
||||
|
||||
// Send WebSocket notification
|
||||
var wsPacket = new WebSocketPacket
|
||||
{
|
||||
Type = "upload.completed",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(completionData))
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Packet = wsPacket
|
||||
});
|
||||
|
||||
// Send push notification
|
||||
var pushNotification = new PushNotification
|
||||
{
|
||||
Topic = "upload",
|
||||
Title = "Upload Completed",
|
||||
Subtitle = task.FileName,
|
||||
Body = $"Your file '{task.FileName}' has been uploaded successfully.",
|
||||
IsSavable = true
|
||||
};
|
||||
|
||||
await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Notification = pushNotification
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to send upload completion notification for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Sends upload failure notification
|
||||
/// </summary>
|
||||
public async Task SendUploadFailedNotificationAsync(PersistentUploadTask task, string? errorMessage = null)
|
||||
{
|
||||
try
|
||||
{
|
||||
var failureData = new UploadFailureData
|
||||
{
|
||||
TaskId = task.TaskId,
|
||||
FileName = task.FileName,
|
||||
FileSize = task.FileSize,
|
||||
FailedAt = SystemClock.Instance.GetCurrentInstant().ToString("O", null),
|
||||
ErrorMessage = errorMessage ?? "Upload failed due to an unknown error"
|
||||
};
|
||||
|
||||
// Send WebSocket notification
|
||||
var wsPacket = new WebSocketPacket
|
||||
{
|
||||
Type = "upload.failed",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(failureData))
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Packet = wsPacket
|
||||
});
|
||||
|
||||
// Send push notification
|
||||
var pushNotification = new PushNotification
|
||||
{
|
||||
Topic = "upload",
|
||||
Title = "Upload Failed",
|
||||
Subtitle = task.FileName,
|
||||
Body = $"Your file '{task.FileName}' upload has failed. You can try again.",
|
||||
IsSavable = true
|
||||
};
|
||||
|
||||
await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Notification = pushNotification
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to send upload failure notification for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class UploadProgressData
|
||||
{
|
||||
public string TaskId { get; set; } = null!;
|
||||
public string FileName { get; set; } = null!;
|
||||
public long FileSize { get; set; }
|
||||
public int ChunksUploaded { get; set; }
|
||||
public int ChunksTotal { get; set; }
|
||||
public double Progress { get; set; }
|
||||
public string Status { get; set; } = null!;
|
||||
public string LastActivity { get; set; } = null!;
|
||||
}
|
||||
|
||||
public class UploadCompletionData
|
||||
{
|
||||
public string TaskId { get; set; } = null!;
|
||||
public string FileId { get; set; } = null!;
|
||||
public string FileName { get; set; } = null!;
|
||||
public long FileSize { get; set; }
|
||||
public string CompletedAt { get; set; } = null!;
|
||||
}
|
||||
|
||||
public class UploadFailureData
|
||||
{
|
||||
public string TaskId { get; set; } = null!;
|
||||
public string FileName { get; set; } = null!;
|
||||
public long FileSize { get; set; }
|
||||
public string FailedAt { get; set; } = null!;
|
||||
public string ErrorMessage { get; set; } = null!;
|
||||
}
|
||||
|
||||
public class UserUploadStats
|
||||
{
|
||||
public int TotalTasks { get; set; }
|
||||
public int InProgressTasks { get; set; }
|
||||
public int CompletedTasks { get; set; }
|
||||
public int FailedTasks { get; set; }
|
||||
public int ExpiredTasks { get; set; }
|
||||
public long TotalUploadedBytes { get; set; }
|
||||
public double AverageProgress { get; set; }
|
||||
public List<RecentActivity> RecentActivity { get; set; } = new();
|
||||
}
|
||||
|
||||
public class RecentActivity
|
||||
{
|
||||
public string TaskId { get; set; } = null!;
|
||||
public string FileName { get; set; } = null!;
|
||||
public UploadTaskStatus Status { get; set; }
|
||||
public Instant LastActivity { get; set; }
|
||||
public double Progress { get; set; }
|
||||
}
|
||||
@@ -32,7 +32,7 @@
|
||||
},
|
||||
"Storage": {
|
||||
"Uploads": "Uploads",
|
||||
"PreferredRemote": "2adceae3-981a-4564-9b8d-5d71a211c873",
|
||||
"PreferredRemote": "c53136a6-9152-4ecb-9f88-43c41438c23e",
|
||||
"Remote": [
|
||||
{
|
||||
"Id": "minio",
|
||||
|
||||
Reference in New Issue
Block a user