Compare commits
2 Commits
db98fa240e
...
741ed18ce5
| Author | SHA1 | Date | |
|---|---|---|---|
|
741ed18ce5
|
|||
|
2bfb50cc71
|
@@ -1,5 +1,7 @@
|
||||
using System.Text.Json;
|
||||
using DysonNetwork.Drive.Storage;
|
||||
using DysonNetwork.Drive.Storage.Model;
|
||||
using DysonNetwork.Shared.Models;
|
||||
using DysonNetwork.Shared.Proto;
|
||||
using DysonNetwork.Shared.Stream;
|
||||
using FFMpegCore;
|
||||
@@ -29,16 +31,15 @@ public class BroadcastEventHandler(
|
||||
[".gif", ".apng", ".avif"];
|
||||
|
||||
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
var js = nats.CreateJetStreamContext();
|
||||
|
||||
await js.EnsureStreamCreated("account_events", [AccountDeletedEvent.Type]);
|
||||
|
||||
|
||||
var accountEventConsumer = await js.CreateOrUpdateConsumerAsync("account_events",
|
||||
new ConsumerConfig("drive_account_deleted_handler"), cancellationToken: stoppingToken);
|
||||
|
||||
|
||||
await js.EnsureStreamCreated("file_events", [FileUploadedEvent.Type]);
|
||||
var fileUploadedConsumer = await js.CreateOrUpdateConsumerAsync("file_events",
|
||||
new ConsumerConfig("drive_file_uploaded_handler") { MaxDeliver = 3 }, cancellationToken: stoppingToken);
|
||||
@@ -53,13 +54,14 @@ public class BroadcastEventHandler(
|
||||
{
|
||||
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
|
||||
{
|
||||
var payload = JsonSerializer.Deserialize<FileUploadedEventPayload>(msg.Data, GrpcTypeHelper.SerializerOptions);
|
||||
var payload =
|
||||
JsonSerializer.Deserialize<FileUploadedEventPayload>(msg.Data, GrpcTypeHelper.SerializerOptions);
|
||||
if (payload == null)
|
||||
{
|
||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
await ProcessAndUploadInBackgroundAsync(
|
||||
@@ -129,8 +131,8 @@ public class BroadcastEventHandler(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessAndUploadInBackgroundAsync(
|
||||
|
||||
private async Task ProcessAndUploadInBackgroundAsync(
|
||||
string fileId,
|
||||
Guid remoteId,
|
||||
string storageId,
|
||||
@@ -142,6 +144,7 @@ public class BroadcastEventHandler(
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
var fs = scope.ServiceProvider.GetRequiredService<FileService>();
|
||||
var scopedDb = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||
var persistentTaskService = scope.ServiceProvider.GetRequiredService<PersistentTaskService>();
|
||||
|
||||
var pool = await fs.GetPoolAsync(remoteId);
|
||||
if (pool is null) return;
|
||||
@@ -155,6 +158,11 @@ public class BroadcastEventHandler(
|
||||
|
||||
var fileToUpdate = await scopedDb.Files.AsNoTracking().FirstAsync(f => f.Id == fileId);
|
||||
|
||||
// Find the upload task associated with this file
|
||||
var uploadTask = await scopedDb.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.FirstOrDefaultAsync(t => t.FileName == fileToUpdate.Name && t.FileSize == fileToUpdate.Size);
|
||||
|
||||
if (fileToUpdate.IsEncrypted)
|
||||
{
|
||||
uploads.Add((processingFilePath, string.Empty, contentType, false));
|
||||
@@ -293,5 +301,51 @@ public class BroadcastEventHandler(
|
||||
}
|
||||
|
||||
await fs._PurgeCacheAsync(fileId);
|
||||
|
||||
// Complete the upload task if found
|
||||
if (uploadTask != null)
|
||||
{
|
||||
await persistentTaskService.MarkTaskCompletedAsync(uploadTask.TaskId, new Dictionary<string, object?>
|
||||
{
|
||||
{ "FileId", fileId },
|
||||
{ "FileName", fileToUpdate.Name },
|
||||
{ "FileInfo", fileToUpdate },
|
||||
{ "FileSize", fileToUpdate.Size },
|
||||
{ "MimeType", newMimeType },
|
||||
{ "HasCompression", hasCompression },
|
||||
{ "HasThumbnail", hasThumbnail }
|
||||
});
|
||||
|
||||
// Send push notification for large files (>5MB) that took longer to process
|
||||
if (fileToUpdate.Size > 5 * 1024 * 1024) // 5MB threshold
|
||||
await SendLargeFileProcessingCompleteNotificationAsync(uploadTask, fileToUpdate);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task SendLargeFileProcessingCompleteNotificationAsync(PersistentUploadTask task, SnCloudFile file)
|
||||
{
|
||||
try
|
||||
{
|
||||
var ringService = serviceProvider.GetRequiredService<RingService.RingServiceClient>();
|
||||
|
||||
var pushNotification = new PushNotification
|
||||
{
|
||||
Topic = "drive.tasks.upload",
|
||||
Title = "File Processing Complete",
|
||||
Subtitle = file.Name,
|
||||
Body = $"Your file '{file.Name}' has finished processing and is now available.",
|
||||
IsSavable = true
|
||||
};
|
||||
|
||||
await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest
|
||||
{
|
||||
UserId = task.AccountId.ToString(),
|
||||
Notification = pushNotification
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to send large file processing notification for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,4 @@
|
||||
using System.ComponentModel.DataAnnotations;
|
||||
using System.Text.Json;
|
||||
using DysonNetwork.Drive.Billing;
|
||||
using DysonNetwork.Drive.Storage.Model;
|
||||
using DysonNetwork.Shared.Auth;
|
||||
@@ -31,15 +30,13 @@ public class FileUploadController(
|
||||
{
|
||||
private readonly string _tempPath =
|
||||
configuration.GetValue<string>("Storage:Uploads") ?? Path.Combine(Path.GetTempPath(), "multipart-uploads");
|
||||
private readonly ILogger<FileUploadController> _logger = logger;
|
||||
|
||||
private const long DefaultChunkSize = 1024 * 1024 * 5; // 5MB
|
||||
|
||||
[HttpPost("create")]
|
||||
public async Task<IActionResult> CreateUploadTask([FromBody] CreateUploadTaskRequest request)
|
||||
{
|
||||
var currentUser = HttpContext.Items["CurrentUser"] as Account;
|
||||
if (currentUser is null)
|
||||
if (HttpContext.Items["CurrentUser"] is not Account currentUser)
|
||||
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
||||
|
||||
var permissionCheck = await ValidateUserPermissions(currentUser);
|
||||
@@ -99,25 +96,25 @@ public class FileUploadController(
|
||||
new ObjectResult(ApiError.Unauthorized(forbidden: true)) { StatusCode = 403 };
|
||||
}
|
||||
|
||||
private async Task<IActionResult?> ValidatePoolAccess(Account currentUser, FilePool pool, CreateUploadTaskRequest request)
|
||||
private Task<IActionResult?> ValidatePoolAccess(Account currentUser, FilePool pool, CreateUploadTaskRequest request)
|
||||
{
|
||||
if (pool.PolicyConfig.RequirePrivilege <= 0) return null;
|
||||
if (pool.PolicyConfig.RequirePrivilege <= 0) return Task.FromResult<IActionResult?>(null);
|
||||
|
||||
var privilege = currentUser.PerkSubscription is null ? 0 :
|
||||
PerkSubscriptionPrivilege.GetPrivilegeFromIdentifier(currentUser.PerkSubscription.Identifier);
|
||||
|
||||
if (privilege < pool.PolicyConfig.RequirePrivilege)
|
||||
{
|
||||
return new ObjectResult(ApiError.Unauthorized(
|
||||
$"You need Stellar Program tier {pool.PolicyConfig.RequirePrivilege} to use pool {pool.Name}, you are tier {privilege}",
|
||||
forbidden: true))
|
||||
{ StatusCode = 403 };
|
||||
return Task.FromResult<IActionResult?>(new ObjectResult(ApiError.Unauthorized(
|
||||
$"You need Stellar Program tier {pool.PolicyConfig.RequirePrivilege} to use pool {pool.Name}, you are tier {privilege}",
|
||||
forbidden: true))
|
||||
{ StatusCode = 403 });
|
||||
}
|
||||
|
||||
return null;
|
||||
return Task.FromResult<IActionResult?>(null);
|
||||
}
|
||||
|
||||
private IActionResult? ValidatePoolPolicy(PolicyConfig policy, CreateUploadTaskRequest request)
|
||||
private static IActionResult? ValidatePoolPolicy(PolicyConfig policy, CreateUploadTaskRequest request)
|
||||
{
|
||||
if (!policy.AllowEncryption && !string.IsNullOrEmpty(request.EncryptPassword))
|
||||
{
|
||||
@@ -138,13 +135,11 @@ public class FileUploadController(
|
||||
|
||||
var foundMatch = policy.AcceptTypes.Any(acceptType =>
|
||||
{
|
||||
if (acceptType.EndsWith("/*", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
var type = acceptType[..^2];
|
||||
return request.ContentType.StartsWith($"{type}/", StringComparison.OrdinalIgnoreCase);
|
||||
}
|
||||
if (!acceptType.EndsWith("/*", StringComparison.OrdinalIgnoreCase))
|
||||
return acceptType.Equals(request.ContentType, StringComparison.OrdinalIgnoreCase);
|
||||
var type = acceptType[..^2];
|
||||
return request.ContentType.StartsWith($"{type}/", StringComparison.OrdinalIgnoreCase);
|
||||
|
||||
return acceptType.Equals(request.ContentType, StringComparison.OrdinalIgnoreCase);
|
||||
});
|
||||
|
||||
if (!foundMatch)
|
||||
@@ -191,41 +186,13 @@ public class FileUploadController(
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<(string taskId, UploadTask task)> CreateUploadTaskInternal(CreateUploadTaskRequest request)
|
||||
{
|
||||
var taskId = await Nanoid.GenerateAsync();
|
||||
var taskPath = Path.Combine(_tempPath, taskId);
|
||||
Directory.CreateDirectory(taskPath);
|
||||
|
||||
var chunkSize = request.ChunkSize ?? DefaultChunkSize;
|
||||
var chunksCount = (int)Math.Ceiling((double)request.FileSize / chunkSize);
|
||||
|
||||
var task = new UploadTask
|
||||
{
|
||||
TaskId = taskId,
|
||||
FileName = request.FileName,
|
||||
FileSize = request.FileSize,
|
||||
ContentType = request.ContentType,
|
||||
ChunkSize = chunkSize,
|
||||
ChunksCount = chunksCount,
|
||||
PoolId = request.PoolId.Value,
|
||||
BundleId = request.BundleId,
|
||||
EncryptPassword = request.EncryptPassword,
|
||||
ExpiredAt = request.ExpiredAt,
|
||||
Hash = request.Hash,
|
||||
};
|
||||
|
||||
await System.IO.File.WriteAllTextAsync(Path.Combine(taskPath, "task.json"), JsonSerializer.Serialize(task));
|
||||
return (taskId, task);
|
||||
}
|
||||
|
||||
public class UploadChunkRequest
|
||||
{
|
||||
[Required]
|
||||
public IFormFile Chunk { get; set; } = null!;
|
||||
}
|
||||
|
||||
[HttpPost("chunk/{taskId}/{chunkIndex}")]
|
||||
[HttpPost("chunk/{taskId}/{chunkIndex:int}")]
|
||||
[RequestSizeLimit(DefaultChunkSize + 1024 * 1024)] // 6MB to be safe
|
||||
[RequestFormLimits(MultipartBodyLengthLimit = DefaultChunkSize + 1024 * 1024)]
|
||||
public async Task<IActionResult> UploadChunk(string taskId, int chunkIndex, [FromForm] UploadChunkRequest request)
|
||||
@@ -278,7 +245,7 @@ public class FileUploadController(
|
||||
|
||||
try
|
||||
{
|
||||
await MergeChunks(taskPath, mergedFilePath, persistentTask.ChunksCount);
|
||||
await MergeChunks(taskId, taskPath, mergedFilePath, persistentTask.ChunksCount, persistentTaskService);
|
||||
|
||||
var fileId = await Nanoid.GenerateAsync();
|
||||
var cloudFile = await fileService.ProcessNewFileAsync(
|
||||
@@ -293,10 +260,10 @@ public class FileUploadController(
|
||||
persistentTask.ExpiredAt
|
||||
);
|
||||
|
||||
// Mark task as completed
|
||||
await persistentTaskService.MarkTaskCompletedAsync(taskId);
|
||||
// Update task status to "processing" - background processing is now happening
|
||||
await persistentTaskService.UpdateTaskProgressAsync(taskId, 95, "Processing file in background...");
|
||||
|
||||
// Send completion notification
|
||||
// Send upload completion notification (file is uploaded, but processing continues)
|
||||
await persistentTaskService.SendUploadCompletedNotificationAsync(persistentTask, fileId);
|
||||
|
||||
return Ok(cloudFile);
|
||||
@@ -304,7 +271,7 @@ public class FileUploadController(
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Log the actual exception for debugging
|
||||
_logger.LogError(ex, "Failed to complete upload for task {TaskId}. Error: {ErrorMessage}", taskId, ex.Message);
|
||||
logger.LogError(ex, "Failed to complete upload for task {TaskId}. Error: {ErrorMessage}", taskId, ex.Message);
|
||||
|
||||
// Mark task as failed
|
||||
await persistentTaskService.MarkTaskFailedAsync(taskId);
|
||||
@@ -328,24 +295,41 @@ public class FileUploadController(
|
||||
}
|
||||
}
|
||||
|
||||
private async Task MergeChunks(string taskPath, string mergedFilePath, int chunksCount)
|
||||
private static async Task MergeChunks(
|
||||
string taskId,
|
||||
string taskPath,
|
||||
string mergedFilePath,
|
||||
int chunksCount,
|
||||
PersistentTaskService persistentTaskService)
|
||||
{
|
||||
await using var mergedStream = new FileStream(mergedFilePath, FileMode.Create);
|
||||
|
||||
const double baseProgress = 0.8; // Start from 80% (chunk upload is already at 95%)
|
||||
const double remainingProgress = 0.15; // Remaining 15% progress distributed across chunks
|
||||
var progressPerChunk = remainingProgress / chunksCount;
|
||||
|
||||
for (var i = 0; i < chunksCount; i++)
|
||||
{
|
||||
var chunkPath = Path.Combine(taskPath, $"{i}.chunk");
|
||||
var chunkPath = Path.Combine(taskPath, i + ".chunk");
|
||||
if (!System.IO.File.Exists(chunkPath))
|
||||
{
|
||||
throw new InvalidOperationException($"Chunk {i} is missing.");
|
||||
throw new InvalidOperationException("Chunk " + i + " is missing.");
|
||||
}
|
||||
|
||||
await using var chunkStream = new FileStream(chunkPath, FileMode.Open);
|
||||
await chunkStream.CopyToAsync(mergedStream);
|
||||
|
||||
// Update progress after each chunk is merged
|
||||
var currentProgress = baseProgress + (progressPerChunk * (i + 1));
|
||||
await persistentTaskService.UpdateTaskProgressAsync(
|
||||
taskId,
|
||||
currentProgress,
|
||||
"Merging chunks... (" + (i + 1) + "/" + chunksCount + ")"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CleanupTempFiles(string taskPath, string mergedFilePath)
|
||||
private static Task CleanupTempFiles(string taskPath, string mergedFilePath)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -359,6 +343,8 @@ public class FileUploadController(
|
||||
{
|
||||
// Ignore cleanup errors to avoid masking the original exception
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
// New endpoints for resumable uploads
|
||||
@@ -395,7 +381,7 @@ public class FileUploadController(
|
||||
t.LastActivity,
|
||||
t.CreatedAt,
|
||||
t.UpdatedAt,
|
||||
UploadedChunks = t.UploadedChunks,
|
||||
t.UploadedChunks,
|
||||
Pool = new { t.PoolId, Name = "Pool Name" }, // Could be expanded to include pool details
|
||||
Bundle = t.BundleId.HasValue ? new { t.BundleId } : null
|
||||
}));
|
||||
@@ -463,7 +449,7 @@ public class FileUploadController(
|
||||
task.ChunkSize,
|
||||
task.ChunksCount,
|
||||
task.ChunksUploaded,
|
||||
UploadedChunks = task.UploadedChunks,
|
||||
task.UploadedChunks,
|
||||
Progress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0
|
||||
});
|
||||
}
|
||||
@@ -505,14 +491,14 @@ public class FileUploadController(
|
||||
|
||||
return Ok(new
|
||||
{
|
||||
TotalTasks = stats.TotalTasks,
|
||||
InProgressTasks = stats.InProgressTasks,
|
||||
CompletedTasks = stats.CompletedTasks,
|
||||
FailedTasks = stats.FailedTasks,
|
||||
ExpiredTasks = stats.ExpiredTasks,
|
||||
TotalUploadedBytes = stats.TotalUploadedBytes,
|
||||
AverageProgress = stats.AverageProgress,
|
||||
RecentActivity = stats.RecentActivity
|
||||
stats.TotalTasks,
|
||||
stats.InProgressTasks,
|
||||
stats.CompletedTasks,
|
||||
stats.FailedTasks,
|
||||
stats.ExpiredTasks,
|
||||
stats.TotalUploadedBytes,
|
||||
stats.AverageProgress,
|
||||
stats.RecentActivity
|
||||
});
|
||||
}
|
||||
|
||||
@@ -591,7 +577,7 @@ public class FileUploadController(
|
||||
task.UpdatedAt,
|
||||
task.ExpiredAt,
|
||||
task.Hash,
|
||||
UploadedChunks = task.UploadedChunks
|
||||
task.UploadedChunks
|
||||
},
|
||||
Pool = pool != null ? new
|
||||
{
|
||||
@@ -610,9 +596,9 @@ public class FileUploadController(
|
||||
});
|
||||
}
|
||||
|
||||
private string? CalculateEstimatedTime(PersistentUploadTask task)
|
||||
private static string? CalculateEstimatedTime(PersistentUploadTask task)
|
||||
{
|
||||
if (task.Status != Model.TaskStatus.InProgress || task.ChunksUploaded == 0)
|
||||
if (task.Status != TaskStatus.InProgress || task.ChunksUploaded == 0)
|
||||
return null;
|
||||
|
||||
var elapsed = NodaTime.SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
|
||||
@@ -625,27 +611,29 @@ public class FileUploadController(
|
||||
|
||||
var remainingSeconds = remainingChunks / chunksPerSecond;
|
||||
|
||||
if (remainingSeconds < 60)
|
||||
return $"{remainingSeconds:F0} seconds";
|
||||
if (remainingSeconds < 3600)
|
||||
return $"{remainingSeconds / 60:F0} minutes";
|
||||
return $"{remainingSeconds / 3600:F1} hours";
|
||||
return remainingSeconds switch
|
||||
{
|
||||
< 60 => $"{remainingSeconds:F0} seconds",
|
||||
< 3600 => $"{remainingSeconds / 60:F0} minutes",
|
||||
_ => $"{remainingSeconds / 3600:F1} hours"
|
||||
};
|
||||
}
|
||||
|
||||
private string? CalculateUploadSpeed(PersistentUploadTask task)
|
||||
private static string? CalculateUploadSpeed(PersistentUploadTask task)
|
||||
{
|
||||
if (task.ChunksUploaded == 0)
|
||||
return null;
|
||||
|
||||
var elapsed = NodaTime.SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
|
||||
var elapsed = SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
|
||||
var elapsedSeconds = elapsed.TotalSeconds;
|
||||
var bytesUploaded = (long)task.ChunksUploaded * task.ChunkSize;
|
||||
var bytesUploaded = task.ChunksUploaded * task.ChunkSize;
|
||||
var bytesPerSecond = bytesUploaded / elapsedSeconds;
|
||||
|
||||
if (bytesPerSecond < 1024)
|
||||
return $"{bytesPerSecond:F0} B/s";
|
||||
if (bytesPerSecond < 1024 * 1024)
|
||||
return $"{bytesPerSecond / 1024:F0} KB/s";
|
||||
return $"{bytesPerSecond / (1024 * 1024):F1} MB/s";
|
||||
return bytesPerSecond switch
|
||||
{
|
||||
< 1024 => $"{bytesPerSecond:F0} B/s",
|
||||
< 1024 * 1024 => $"{bytesPerSecond / 1024:F0} KB/s",
|
||||
_ => $"{bytesPerSecond / (1024 * 1024):F1} MB/s"
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -157,24 +157,6 @@ public class PersistentTask : ModelBase
|
||||
|
||||
// Estimated duration in seconds
|
||||
public long? EstimatedDurationSeconds { get; set; }
|
||||
|
||||
// Helper methods for parameter management using GrpcTypeHelper
|
||||
public MapField<string, Google.Protobuf.WellKnownTypes.Value> GetParametersAsGrpcMap()
|
||||
{
|
||||
var nonNullableParameters = Parameters.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty);
|
||||
return GrpcTypeHelper.ConvertToValueMap(nonNullableParameters);
|
||||
}
|
||||
|
||||
public void SetParametersFromGrpcMap(MapField<string, Google.Protobuf.WellKnownTypes.Value> map)
|
||||
{
|
||||
Parameters = GrpcTypeHelper.ConvertFromValueMap(map);
|
||||
}
|
||||
|
||||
public MapField<string, Google.Protobuf.WellKnownTypes.Value> GetResultsAsGrpcMap()
|
||||
{
|
||||
var nonNullableResults = Results.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty);
|
||||
return GrpcTypeHelper.ConvertToValueMap(nonNullableResults);
|
||||
}
|
||||
}
|
||||
|
||||
// Backward compatibility - UploadTask inherits from PersistentTask
|
||||
|
||||
@@ -26,7 +26,7 @@ public class PersistentTaskService(
|
||||
/// </summary>
|
||||
public async Task<T> CreateTaskAsync<T>(T task) where T : PersistentTask
|
||||
{
|
||||
task.TaskId = NanoidDotNet.Nanoid.Generate();
|
||||
task.TaskId = await Nanoid.GenerateAsync();
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
task.CreatedAt = now;
|
||||
task.UpdatedAt = now;
|
||||
@@ -45,7 +45,7 @@ public class PersistentTaskService(
|
||||
/// <summary>
|
||||
/// Gets a task by ID
|
||||
/// </summary>
|
||||
public async Task<T?> GetTaskAsync<T>(string taskId) where T : PersistentTask
|
||||
private async Task<T?> GetTaskAsync<T>(string taskId) where T : PersistentTask
|
||||
{
|
||||
var cacheKey = $"{CacheKeyPrefix}{taskId}";
|
||||
var cachedTask = await cache.GetAsync<T>(cacheKey);
|
||||
@@ -55,13 +55,9 @@ public class PersistentTaskService(
|
||||
var task = await db.Tasks
|
||||
.FirstOrDefaultAsync(t => t.TaskId == taskId);
|
||||
|
||||
if (task is T typedTask)
|
||||
{
|
||||
await SetCacheAsync(typedTask);
|
||||
return typedTask;
|
||||
}
|
||||
|
||||
return null;
|
||||
if (task is not T typedTask) return null;
|
||||
await SetCacheAsync(typedTask);
|
||||
return typedTask;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -73,20 +69,35 @@ public class PersistentTaskService(
|
||||
if (task is null) return;
|
||||
|
||||
var previousProgress = task.Progress;
|
||||
task.Progress = Math.Clamp(progress, 0, 100);
|
||||
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
|
||||
task.UpdatedAt = task.LastActivity;
|
||||
var clampedProgress = Math.Clamp(progress, 0, 1.0);
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
|
||||
if (statusMessage is not null)
|
||||
// Use ExecuteUpdateAsync for better performance - update only the fields we need
|
||||
var updatedRows = await db.Tasks
|
||||
.Where(t => t.TaskId == taskId)
|
||||
.ExecuteUpdateAsync(setters => setters
|
||||
.SetProperty(t => t.Progress, clampedProgress)
|
||||
.SetProperty(t => t.LastActivity, now)
|
||||
.SetProperty(t => t.UpdatedAt, now)
|
||||
.SetProperty(t => t.Description, t => statusMessage ?? t.Description)
|
||||
);
|
||||
|
||||
if (updatedRows > 0)
|
||||
{
|
||||
task.Description = statusMessage;
|
||||
// Update the cached task
|
||||
task.Progress = clampedProgress;
|
||||
task.LastActivity = now;
|
||||
task.UpdatedAt = now;
|
||||
if (statusMessage is not null)
|
||||
{
|
||||
task.Description = statusMessage;
|
||||
}
|
||||
|
||||
await SetCacheAsync(task);
|
||||
|
||||
// Send progress update notification
|
||||
await SendTaskProgressUpdateAsync(task, task.Progress, previousProgress);
|
||||
}
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
await SetCacheAsync(task);
|
||||
|
||||
// Send progress update notification
|
||||
await SendTaskProgressUpdateAsync(task, task.Progress, previousProgress);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -98,24 +109,38 @@ public class PersistentTaskService(
|
||||
if (task is null) return;
|
||||
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
task.Status = TaskStatus.Completed;
|
||||
task.Progress = 100;
|
||||
task.CompletedAt = now;
|
||||
task.LastActivity = now;
|
||||
task.UpdatedAt = now;
|
||||
|
||||
if (results is not null)
|
||||
// Use ExecuteUpdateAsync for better performance - update only the fields we need
|
||||
var updatedRows = await db.Tasks
|
||||
.Where(t => t.TaskId == taskId)
|
||||
.ExecuteUpdateAsync(setters => setters
|
||||
.SetProperty(t => t.Status, TaskStatus.Completed)
|
||||
.SetProperty(t => t.Progress, 1.0)
|
||||
.SetProperty(t => t.CompletedAt, now)
|
||||
.SetProperty(t => t.LastActivity, now)
|
||||
.SetProperty(t => t.UpdatedAt, now)
|
||||
);
|
||||
|
||||
if (updatedRows > 0)
|
||||
{
|
||||
foreach (var (key, value) in results)
|
||||
// Update the cached task with results if provided
|
||||
task.Status = TaskStatus.Completed;
|
||||
task.Progress = 1.0;
|
||||
task.CompletedAt = now;
|
||||
task.LastActivity = now;
|
||||
task.UpdatedAt = now;
|
||||
|
||||
if (results is not null)
|
||||
{
|
||||
task.Results[key] = value;
|
||||
foreach (var (key, value) in results)
|
||||
{
|
||||
task.Results[key] = value;
|
||||
}
|
||||
}
|
||||
|
||||
await RemoveCacheAsync(taskId);
|
||||
await SendTaskCompletedNotificationAsync(task);
|
||||
}
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
await RemoveCacheAsync(taskId);
|
||||
|
||||
await SendTaskCompletedNotificationAsync(task);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -126,15 +151,30 @@ public class PersistentTaskService(
|
||||
var task = await GetTaskAsync<PersistentTask>(taskId);
|
||||
if (task is null) return;
|
||||
|
||||
task.Status = TaskStatus.Failed;
|
||||
task.ErrorMessage = errorMessage ?? "Task failed due to an unknown error";
|
||||
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
|
||||
task.UpdatedAt = task.LastActivity;
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
var errorMsg = errorMessage ?? "Task failed due to an unknown error";
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
await RemoveCacheAsync(taskId);
|
||||
// Use ExecuteUpdateAsync for better performance - update only the fields we need
|
||||
var updatedRows = await db.Tasks
|
||||
.Where(t => t.TaskId == taskId)
|
||||
.ExecuteUpdateAsync(setters => setters
|
||||
.SetProperty(t => t.Status, TaskStatus.Failed)
|
||||
.SetProperty(t => t.ErrorMessage, errorMsg)
|
||||
.SetProperty(t => t.LastActivity, now)
|
||||
.SetProperty(t => t.UpdatedAt, now)
|
||||
);
|
||||
|
||||
await SendTaskFailedNotificationAsync(task);
|
||||
if (updatedRows > 0)
|
||||
{
|
||||
// Update the cached task
|
||||
task.Status = TaskStatus.Failed;
|
||||
task.ErrorMessage = errorMsg;
|
||||
task.LastActivity = now;
|
||||
task.UpdatedAt = now;
|
||||
|
||||
await RemoveCacheAsync(taskId);
|
||||
await SendTaskFailedNotificationAsync(task);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -281,20 +321,20 @@ public class PersistentTaskService(
|
||||
ExpiredTasks = tasks.Count(t => t.Status == TaskStatus.Expired),
|
||||
AverageProgress = tasks.Any(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused)
|
||||
? tasks.Where(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused)
|
||||
.Average(t => t.Progress)
|
||||
.Average(t => t.Progress)
|
||||
: 0,
|
||||
RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
|
||||
.Take(10)
|
||||
.Select(t => new TaskActivity
|
||||
{
|
||||
TaskId = t.TaskId,
|
||||
Name = t.Name,
|
||||
Type = t.Type,
|
||||
Status = t.Status,
|
||||
Progress = t.Progress,
|
||||
LastActivity = t.LastActivity
|
||||
})
|
||||
.ToList()
|
||||
.Take(10)
|
||||
.Select(t => new TaskActivity
|
||||
{
|
||||
TaskId = t.TaskId,
|
||||
Name = t.Name,
|
||||
Type = t.Type,
|
||||
Status = t.Status,
|
||||
Progress = t.Progress,
|
||||
LastActivity = t.LastActivity
|
||||
})
|
||||
.ToList()
|
||||
};
|
||||
|
||||
return stats;
|
||||
@@ -314,11 +354,11 @@ public class PersistentTaskService(
|
||||
|
||||
var oldTasks = await db.Tasks
|
||||
.Where(t => t.AccountId == accountId &&
|
||||
(t.Status == TaskStatus.Completed ||
|
||||
t.Status == TaskStatus.Failed ||
|
||||
t.Status == TaskStatus.Cancelled ||
|
||||
t.Status == TaskStatus.Expired) &&
|
||||
t.UpdatedAt < cutoff)
|
||||
(t.Status == TaskStatus.Completed ||
|
||||
t.Status == TaskStatus.Failed ||
|
||||
t.Status == TaskStatus.Cancelled ||
|
||||
t.Status == TaskStatus.Expired) &&
|
||||
t.UpdatedAt < cutoff)
|
||||
.ToListAsync();
|
||||
|
||||
db.Tasks.RemoveRange(oldTasks);
|
||||
@@ -344,13 +384,13 @@ public class PersistentTaskService(
|
||||
TaskId = task.TaskId,
|
||||
Name = task.Name,
|
||||
Type = task.Type.ToString(),
|
||||
CreatedAt = task.CreatedAt.ToString("%O", null)
|
||||
CreatedAt = task.CreatedAt.ToString()
|
||||
};
|
||||
|
||||
var packet = new WebSocketPacket
|
||||
{
|
||||
Type = "task.created",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
|
||||
Data = GrpcTypeHelper.ConvertObjectToByteString(data)
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
@@ -380,13 +420,13 @@ public class PersistentTaskService(
|
||||
Type = task.Type.ToString(),
|
||||
Progress = newProgress,
|
||||
Status = task.Status.ToString(),
|
||||
LastActivity = task.LastActivity.ToString("%O", null)
|
||||
LastActivity = task.LastActivity.ToString()
|
||||
};
|
||||
|
||||
var packet = new WebSocketPacket
|
||||
{
|
||||
Type = "task.progress",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
|
||||
Data = GrpcTypeHelper.ConvertObjectToByteString(data)
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
@@ -410,7 +450,7 @@ public class PersistentTaskService(
|
||||
TaskId = task.TaskId,
|
||||
Name = task.Name,
|
||||
Type = task.Type.ToString(),
|
||||
CompletedAt = task.CompletedAt?.ToString("%O", null) ?? task.UpdatedAt.ToString("%O", null),
|
||||
CompletedAt = task.CompletedAt?.ToString() ?? task.UpdatedAt.ToString(),
|
||||
Results = task.Results
|
||||
};
|
||||
|
||||
@@ -418,7 +458,7 @@ public class PersistentTaskService(
|
||||
var wsPacket = new WebSocketPacket
|
||||
{
|
||||
Type = "task.completed",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
|
||||
Data = GrpcTypeHelper.ConvertObjectToByteString(data)
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
@@ -430,7 +470,7 @@ public class PersistentTaskService(
|
||||
// Push notification
|
||||
var pushNotification = new PushNotification
|
||||
{
|
||||
Topic = "task",
|
||||
Topic = "drive.tasks",
|
||||
Title = "Task Completed",
|
||||
Subtitle = task.Name,
|
||||
Body = $"Your {task.Type.ToString().ToLower()} task has completed successfully.",
|
||||
@@ -458,7 +498,7 @@ public class PersistentTaskService(
|
||||
TaskId = task.TaskId,
|
||||
Name = task.Name,
|
||||
Type = task.Type.ToString(),
|
||||
FailedAt = task.UpdatedAt.ToString("%O", null),
|
||||
FailedAt = task.UpdatedAt.ToString(),
|
||||
ErrorMessage = task.ErrorMessage ?? "Task failed due to an unknown error"
|
||||
};
|
||||
|
||||
@@ -466,7 +506,7 @@ public class PersistentTaskService(
|
||||
var wsPacket = new WebSocketPacket
|
||||
{
|
||||
Type = "task.failed",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
|
||||
Data = GrpcTypeHelper.ConvertObjectToByteString(data)
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
@@ -478,7 +518,7 @@ public class PersistentTaskService(
|
||||
// Push notification
|
||||
var pushNotification = new PushNotification
|
||||
{
|
||||
Topic = "task",
|
||||
Topic = "drive.tasks",
|
||||
Title = "Task Failed",
|
||||
Subtitle = task.Name,
|
||||
Body = $"Your {task.Type.ToString().ToLower()} task has failed.",
|
||||
@@ -504,7 +544,7 @@ public class PersistentTaskService(
|
||||
private async Task SetCacheAsync(PersistentTask task)
|
||||
{
|
||||
var cacheKey = $"{CacheKeyPrefix}{task.TaskId}";
|
||||
|
||||
|
||||
// Cache the entire task object directly - this includes all properties including Parameters dictionary
|
||||
await cache.SetAsync(cacheKey, task, CacheDuration);
|
||||
}
|
||||
@@ -537,7 +577,7 @@ public class PersistentTaskService(
|
||||
|
||||
// If no pools exist, create a default one
|
||||
logger.LogWarning("No pools found in database. Creating default pool...");
|
||||
|
||||
|
||||
var defaultPoolId = Guid.NewGuid();
|
||||
var defaultPool = new DysonNetwork.Shared.Models.FilePool
|
||||
{
|
||||
@@ -593,7 +633,7 @@ public class PersistentTaskService(
|
||||
{
|
||||
var chunkSize = request.ChunkSize ?? 1024 * 1024 * 5; // 5MB default
|
||||
var chunksCount = (int)Math.Ceiling((double)request.FileSize / chunkSize);
|
||||
|
||||
|
||||
// Use default pool if no pool is specified, or find first available pool
|
||||
var poolId = request.PoolId ?? await GetFirstAvailablePoolIdAsync();
|
||||
|
||||
@@ -657,16 +697,31 @@ public class PersistentTaskService(
|
||||
{
|
||||
var previousProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
||||
|
||||
task.UploadedChunks.Add(chunkIndex);
|
||||
task.ChunksUploaded = task.UploadedChunks.Count;
|
||||
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
|
||||
// Use ExecuteUpdateAsync for better performance - update only the fields we need
|
||||
var now = SystemClock.Instance.GetCurrentInstant();
|
||||
var updatedRows = await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.Where(t => t.TaskId == taskId)
|
||||
.ExecuteUpdateAsync(setters => setters
|
||||
.SetProperty(t => t.UploadedChunks, t => t.UploadedChunks.Append(chunkIndex).Distinct().ToList())
|
||||
.SetProperty(t => t.ChunksUploaded, t => t.UploadedChunks.Count)
|
||||
.SetProperty(t => t.LastActivity, now)
|
||||
.SetProperty(t => t.UpdatedAt, now)
|
||||
);
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
await SetCacheAsync(task);
|
||||
if (updatedRows > 0)
|
||||
{
|
||||
// Update the cached task
|
||||
task.UploadedChunks.Add(chunkIndex);
|
||||
task.ChunksUploaded = task.UploadedChunks.Count;
|
||||
task.LastActivity = now;
|
||||
task.UpdatedAt = now;
|
||||
await SetCacheAsync(task);
|
||||
|
||||
// Send real-time progress update
|
||||
var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
||||
await SendUploadProgressUpdateAsync(task, newProgress, previousProgress);
|
||||
// Send real-time progress update
|
||||
var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
||||
await SendUploadProgressUpdateAsync(task, newProgress, previousProgress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -727,17 +782,17 @@ public class PersistentTaskService(
|
||||
? query.OrderByDescending(t => t.FileSize)
|
||||
: query.OrderBy(t => t.FileSize);
|
||||
break;
|
||||
case "createdat":
|
||||
case "created":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.CreatedAt)
|
||||
: query.OrderBy(t => t.CreatedAt);
|
||||
break;
|
||||
case "updatedat":
|
||||
case "updated":
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.UpdatedAt)
|
||||
: query.OrderBy(t => t.UpdatedAt);
|
||||
break;
|
||||
case "lastactivity":
|
||||
case "activity":
|
||||
default:
|
||||
orderedQuery = sortDescending
|
||||
? query.OrderByDescending(t => t.LastActivity)
|
||||
@@ -774,19 +829,19 @@ public class PersistentTaskService(
|
||||
TotalUploadedBytes = tasks.Sum(t => (long)t.ChunksUploaded * t.ChunkSize),
|
||||
AverageProgress = tasks.Any(t => t.Status == Model.TaskStatus.InProgress)
|
||||
? tasks.Where(t => t.Status == Model.TaskStatus.InProgress)
|
||||
.Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0)
|
||||
.Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0)
|
||||
: 0,
|
||||
RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
|
||||
.Take(5)
|
||||
.Select(t => new RecentActivity
|
||||
{
|
||||
TaskId = t.TaskId,
|
||||
FileName = t.FileName,
|
||||
Status = (UploadTaskStatus)t.Status,
|
||||
LastActivity = t.LastActivity,
|
||||
Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0
|
||||
})
|
||||
.ToList()
|
||||
.Take(5)
|
||||
.Select(t => new RecentActivity
|
||||
{
|
||||
TaskId = t.TaskId,
|
||||
FileName = t.FileName,
|
||||
Status = (UploadTaskStatus)t.Status,
|
||||
LastActivity = t.LastActivity,
|
||||
Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0
|
||||
})
|
||||
.ToList()
|
||||
};
|
||||
|
||||
return stats;
|
||||
@@ -800,7 +855,7 @@ public class PersistentTaskService(
|
||||
var failedTasks = await db.Tasks
|
||||
.OfType<PersistentUploadTask>()
|
||||
.Where(t => t.AccountId == accountId &&
|
||||
(t.Status == Model.TaskStatus.Failed || t.Status == Model.TaskStatus.Expired))
|
||||
(t.Status == TaskStatus.Failed || t.Status == TaskStatus.Expired))
|
||||
.ToListAsync();
|
||||
|
||||
foreach (var task in failedTasks)
|
||||
@@ -809,16 +864,14 @@ public class PersistentTaskService(
|
||||
|
||||
// Clean up temp files
|
||||
var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId);
|
||||
if (Directory.Exists(taskPath))
|
||||
if (!Directory.Exists(taskPath)) continue;
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
Directory.Delete(taskPath, true);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
|
||||
}
|
||||
Directory.Delete(taskPath, true);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -854,14 +907,14 @@ public class PersistentTaskService(
|
||||
FileId = fileId,
|
||||
FileName = task.FileName,
|
||||
FileSize = task.FileSize,
|
||||
CompletedAt = SystemClock.Instance.GetCurrentInstant().ToString("%O", null)
|
||||
CompletedAt = SystemClock.Instance.GetCurrentInstant().ToString()
|
||||
};
|
||||
|
||||
// Send WebSocket notification
|
||||
var wsPacket = new WebSocketPacket
|
||||
{
|
||||
Type = "upload.completed",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(completionData))
|
||||
Data = GrpcTypeHelper.ConvertObjectToByteString(completionData)
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
@@ -873,7 +926,7 @@ public class PersistentTaskService(
|
||||
// Send push notification
|
||||
var pushNotification = new PushNotification
|
||||
{
|
||||
Topic = "upload",
|
||||
Topic = "drive.tasks.upload",
|
||||
Title = "Upload Completed",
|
||||
Subtitle = task.FileName,
|
||||
Body = $"Your file '{task.FileName}' has been uploaded successfully.",
|
||||
@@ -904,7 +957,7 @@ public class PersistentTaskService(
|
||||
TaskId = task.TaskId,
|
||||
FileName = task.FileName,
|
||||
FileSize = task.FileSize,
|
||||
FailedAt = SystemClock.Instance.GetCurrentInstant().ToString("%O", null),
|
||||
FailedAt = SystemClock.Instance.GetCurrentInstant().ToString(),
|
||||
ErrorMessage = errorMessage ?? "Upload failed due to an unknown error"
|
||||
};
|
||||
|
||||
@@ -912,7 +965,7 @@ public class PersistentTaskService(
|
||||
var wsPacket = new WebSocketPacket
|
||||
{
|
||||
Type = "upload.failed",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(failureData))
|
||||
Data = GrpcTypeHelper.ConvertObjectToByteString(failureData)
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
@@ -924,7 +977,7 @@ public class PersistentTaskService(
|
||||
// Send push notification
|
||||
var pushNotification = new PushNotification
|
||||
{
|
||||
Topic = "upload",
|
||||
Topic = "drive.tasks.upload",
|
||||
Title = "Upload Failed",
|
||||
Subtitle = task.FileName,
|
||||
Body = $"Your file '{task.FileName}' upload has failed. You can try again.",
|
||||
@@ -946,7 +999,8 @@ public class PersistentTaskService(
|
||||
/// <summary>
|
||||
/// Sends real-time upload progress update via WebSocket
|
||||
/// </summary>
|
||||
private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress, double previousProgress)
|
||||
private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress,
|
||||
double previousProgress)
|
||||
{
|
||||
try
|
||||
{
|
||||
@@ -963,13 +1017,13 @@ public class PersistentTaskService(
|
||||
ChunksTotal = task.ChunksCount,
|
||||
Progress = newProgress,
|
||||
Status = task.Status.ToString(),
|
||||
LastActivity = task.LastActivity.ToString("%O", null)
|
||||
LastActivity = task.LastActivity.ToString()
|
||||
};
|
||||
|
||||
var packet = new WebSocketPacket
|
||||
{
|
||||
Type = "upload.progress",
|
||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(progressData))
|
||||
Data = GrpcTypeHelper.ConvertObjectToByteString(progressData)
|
||||
};
|
||||
|
||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||
@@ -1104,4 +1158,4 @@ public class RecentActivity
|
||||
public double Progress { get; set; }
|
||||
}
|
||||
|
||||
#endregion
|
||||
#endregion
|
||||
@@ -1,33 +1,35 @@
|
||||
# DysonNetwork Drive - Persistent/Resumable Upload System
|
||||
# DysonNetwork Drive - Persistent Task System
|
||||
|
||||
A comprehensive, production-ready file upload system with resumable uploads, real-time progress tracking, and dynamic notifications powered by RingService.
|
||||
A comprehensive, production-ready generic task system with support for file uploads, background operations, real-time progress tracking, and dynamic notifications powered by RingService.
|
||||
|
||||
When using with the Gateway, use the `/drive` to replace `/api`.
|
||||
The realtime messages are from the websocket gateway.
|
||||
|
||||
## 🚀 Features
|
||||
|
||||
### Core Upload Features
|
||||
### Core Task Features
|
||||
- **Generic Task System**: Support for various background operations beyond file uploads
|
||||
- **Resumable Uploads**: Pause and resume uploads across app restarts
|
||||
- **Chunked Uploads**: Efficient large file handling with configurable chunk sizes
|
||||
- **Progress Persistence**: Upload state survives server restarts and network interruptions
|
||||
- **Progress Persistence**: Task state survives server restarts and network interruptions
|
||||
- **Duplicate Detection**: Automatic detection of already uploaded files via hash checking
|
||||
- **Quota Management**: Integration with user quota and billing systems
|
||||
- **Pool-based Storage**: Support for multiple storage pools with different policies
|
||||
|
||||
### Real-Time Features
|
||||
- **Live Progress Updates**: WebSocket-based real-time progress tracking
|
||||
- **Completion Notifications**: Instant notifications when uploads complete
|
||||
- **Failure Alerts**: Immediate notification of upload failures with error details
|
||||
- **Live Progress Updates**: WebSocket-based real-time progress tracking for all task types
|
||||
- **Task Lifecycle Notifications**: Instant notifications for task creation, progress, completion, and failure
|
||||
- **Failure Alerts**: Immediate notification of task failures with error details
|
||||
- **Push Notifications**: Cross-platform push notifications for mobile/desktop
|
||||
- **Smart Throttling**: Optimized update frequency to prevent network spam
|
||||
|
||||
### Management Features
|
||||
- **Task Listing**: Comprehensive API for listing and filtering upload tasks
|
||||
- **Task Statistics**: Detailed analytics and usage statistics
|
||||
- **Task Listing**: Comprehensive API for listing and filtering all task types
|
||||
- **Task Statistics**: Detailed analytics and usage statistics for all operations
|
||||
- **Cleanup Operations**: Automatic and manual cleanup of failed/stale tasks
|
||||
- **Ownership Verification**: Secure access control for all operations
|
||||
- **Detailed Task Info**: Rich metadata including speed calculations and ETAs
|
||||
- **Detailed Task Info**: Rich metadata including progress, parameters, and results
|
||||
- **Task Lifecycle Management**: Full control over task states (pause, resume, cancel)
|
||||
|
||||
## 📋 Table of Contents
|
||||
|
||||
@@ -93,18 +95,29 @@ Creates a new resumable upload task.
|
||||
**Request Body:**
|
||||
```json
|
||||
{
|
||||
"fileName": "string", // Required: Name of the file
|
||||
"fileSize": "long", // Required: Size in bytes
|
||||
"contentType": "string", // Required: MIME type
|
||||
"poolId": "uuid", // Optional: Storage pool ID
|
||||
"bundleId": "uuid", // Optional: File bundle ID
|
||||
"chunkSize": "long", // Optional: Chunk size (default: 5MB)
|
||||
"encryptPassword": "string", // Optional: Encryption password
|
||||
"expiredAt": "datetime", // Optional: Expiration date
|
||||
"hash": "string" // Required: File hash for deduplication
|
||||
"fileName": "string",
|
||||
"fileSize": "long",
|
||||
"contentType": "string",
|
||||
"poolId": "uuid",
|
||||
"bundleId": "uuid",
|
||||
"chunkSize": "long",
|
||||
"encryptPassword": "string",
|
||||
"expiredAt": "datetime",
|
||||
"hash": "string"
|
||||
}
|
||||
```
|
||||
|
||||
**Field Descriptions:**
|
||||
- `fileName`: Required - Name of the file
|
||||
- `fileSize`: Required - Size in bytes
|
||||
- `contentType`: Required - MIME type
|
||||
- `poolId`: Optional - Storage pool ID
|
||||
- `bundleId`: Optional - File bundle ID
|
||||
- `chunkSize`: Optional - Chunk size (default: 5MB)
|
||||
- `encryptPassword`: Optional - Encryption password
|
||||
- `expiredAt`: Optional - Expiration date
|
||||
- `hash`: Required - File hash for deduplication
|
||||
|
||||
**Response:**
|
||||
```json
|
||||
{
|
||||
@@ -175,7 +188,7 @@ Gets upload statistics for the current user.
|
||||
"expiredTasks": 1,
|
||||
"totalUploadedBytes": 5368709120,
|
||||
"averageProgress": 67.5,
|
||||
"recentActivity": [...]
|
||||
"recentActivity": []
|
||||
}
|
||||
```
|
||||
|
||||
@@ -187,56 +200,73 @@ Gets the most recent upload tasks.
|
||||
|
||||
## 🔌 WebSocket Events
|
||||
|
||||
The system sends real-time updates via WebSocket using RingService. Connect to the WebSocket endpoint and listen for upload-related events.
|
||||
The system sends real-time updates via WebSocket using RingService. Connect to the WebSocket endpoint and listen for task-related events.
|
||||
|
||||
### Event Types
|
||||
|
||||
#### `upload.progress`
|
||||
Sent when upload progress changes significantly (every 5% or major milestones).
|
||||
#### `task.created`
|
||||
Sent when a new task is created.
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "upload.progress",
|
||||
"type": "task.created",
|
||||
"data": {
|
||||
"taskId": "abc123def456",
|
||||
"fileName": "document.pdf",
|
||||
"fileSize": 10485760,
|
||||
"chunksUploaded": 5,
|
||||
"chunksTotal": 10,
|
||||
"progress": 50.0,
|
||||
"taskId": "task123",
|
||||
"name": "Upload File",
|
||||
"type": "FileUpload",
|
||||
"createdAt": "2025-11-09T02:00:00Z"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### `task.progress`
|
||||
Sent when task progress changes significantly (every 5% or major milestones).
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "task.progress",
|
||||
"data": {
|
||||
"taskId": "task123",
|
||||
"name": "Upload File",
|
||||
"type": "FileUpload",
|
||||
"progress": 67.5,
|
||||
"status": "InProgress",
|
||||
"lastActivity": "2025-11-09T01:56:00.0000000Z"
|
||||
"lastActivity": "2025-11-09T02:05:00Z"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### `upload.completed`
|
||||
Sent when an upload completes successfully.
|
||||
#### `task.completed`
|
||||
Sent when a task completes successfully.
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "upload.completed",
|
||||
"type": "task.completed",
|
||||
"data": {
|
||||
"taskId": "abc123def456",
|
||||
"fileId": "file789xyz",
|
||||
"fileName": "document.pdf",
|
||||
"fileSize": 10485760,
|
||||
"completedAt": "2025-11-09T01:57:00.0000000Z"
|
||||
"taskId": "task123",
|
||||
"name": "Upload File",
|
||||
"type": "FileUpload",
|
||||
"completedAt": "2025-11-09T02:10:00Z",
|
||||
"results": {
|
||||
"fileId": "file456",
|
||||
"fileName": "document.pdf",
|
||||
"fileSize": 10485760
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### `upload.failed`
|
||||
Sent when an upload fails.
|
||||
#### `task.failed`
|
||||
Sent when a task fails.
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "upload.failed",
|
||||
"type": "task.failed",
|
||||
"data": {
|
||||
"taskId": "abc123def456",
|
||||
"fileName": "document.pdf",
|
||||
"fileSize": 10485760,
|
||||
"failedAt": "2025-11-09T01:58:00.0000000Z",
|
||||
"taskId": "task123",
|
||||
"name": "Upload File",
|
||||
"type": "FileUpload",
|
||||
"failedAt": "2025-11-09T02:15:00Z",
|
||||
"errorMessage": "File processing failed: invalid format"
|
||||
}
|
||||
}
|
||||
@@ -256,18 +286,18 @@ ws.onopen = () => {
|
||||
}));
|
||||
};
|
||||
|
||||
// Handle upload events
|
||||
// Handle task events
|
||||
ws.onmessage = (event) => {
|
||||
const packet = JSON.parse(event.data);
|
||||
|
||||
switch (packet.type) {
|
||||
case 'upload.progress':
|
||||
case 'task.progress':
|
||||
updateProgressBar(packet.data);
|
||||
break;
|
||||
case 'upload.completed':
|
||||
case 'task.completed':
|
||||
showSuccessNotification(packet.data);
|
||||
break;
|
||||
case 'upload.failed':
|
||||
case 'task.failed':
|
||||
showErrorNotification(packet.data);
|
||||
break;
|
||||
}
|
||||
@@ -282,6 +312,10 @@ function updateProgressBar(data) {
|
||||
}
|
||||
```
|
||||
|
||||
### Note on Upload-Specific Notifications
|
||||
|
||||
The system also includes upload-specific notifications (`upload.progress`, `upload.completed`, `upload.failed`) for backward compatibility. However, for new implementations, it's recommended to use the generic task notifications as they provide the same functionality with less object allocation overhead. Since users are typically in the foreground during upload operations, the generic task notifications provide sufficient progress visibility.
|
||||
|
||||
## 🗄️ Database Schema
|
||||
|
||||
### `upload_tasks` Table
|
||||
@@ -348,7 +382,7 @@ UPLOAD_CACHE_DURATION_MINUTES=30
|
||||
|
||||
```csharp
|
||||
// In Program.cs or Startup.cs
|
||||
builder.Services.AddScoped<PersistentUploadService>();
|
||||
builder.Services.AddScoped<PersistentTaskService>();
|
||||
builder.Services.AddSingleton<RingService.RingServiceClient>(sp => {
|
||||
// Configure gRPC client for RingService
|
||||
var channel = GrpcChannel.ForAddress("https://ring-service:50051");
|
||||
@@ -754,7 +788,7 @@ public class PersistentTaskService(
|
||||
|
||||
### Real-Time Task Notifications
|
||||
|
||||
All task operations send WebSocket notifications via RingService:
|
||||
All task operations send WebSocket notifications via RingService using the shared `GrpcTypeHelper` for consistent JSON serialization:
|
||||
|
||||
#### Task Created
|
||||
```json
|
||||
@@ -867,6 +901,36 @@ Tasks support multiple statuses:
|
||||
- **Cancelled**: Manually cancelled
|
||||
- **Expired**: Timed out or expired
|
||||
|
||||
### Available Service Methods
|
||||
|
||||
Based on the `PersistentTaskService` implementation, the following methods are available:
|
||||
|
||||
#### Core Task Operations
|
||||
- `CreateTaskAsync<T>(T task)`: Creates any type of persistent task
|
||||
- `GetTaskAsync<T>(string taskId)`: Retrieves a task by ID with caching
|
||||
- `UpdateTaskProgressAsync(string taskId, double progress, string? statusMessage)`: Updates task progress with automatic notifications
|
||||
- `MarkTaskCompletedAsync(string taskId, Dictionary<string, object?>? results)`: Marks task as completed with optional results
|
||||
- `MarkTaskFailedAsync(string taskId, string? errorMessage)`: Marks task as failed with error message
|
||||
- `PauseTaskAsync(string taskId)`: Pauses an in-progress task
|
||||
- `ResumeTaskAsync(string taskId)`: Resumes a paused task
|
||||
- `CancelTaskAsync(string taskId)`: Cancels a task
|
||||
|
||||
#### Task Querying & Statistics
|
||||
- `GetUserTasksAsync()`: Gets tasks for a user with filtering and pagination
|
||||
- `GetUserTaskStatsAsync(Guid accountId)`: Gets comprehensive task statistics
|
||||
- `CleanupOldTasksAsync(Guid accountId, Duration maxAge)`: Cleans up old completed/failed tasks
|
||||
|
||||
#### Upload-Specific Operations
|
||||
- `CreateUploadTaskAsync()`: Creates a new persistent upload task
|
||||
- `GetUploadTaskAsync(string taskId)`: Gets an existing upload task
|
||||
- `UpdateChunkProgressAsync(string taskId, int chunkIndex)`: Updates chunk upload progress
|
||||
- `IsChunkUploadedAsync(string taskId, int chunkIndex)`: Checks if a chunk has been uploaded
|
||||
- `GetUploadProgressAsync(string taskId)`: Gets upload progress as percentage
|
||||
- `GetUserUploadTasksAsync()`: Gets user upload tasks with filtering
|
||||
- `GetUserUploadStatsAsync(Guid accountId)`: Gets upload statistics for a user
|
||||
- `CleanupUserFailedTasksAsync(Guid accountId)`: Cleans up failed upload tasks
|
||||
- `GetRecentUserTasksAsync(Guid accountId, int limit)`: Gets recent upload tasks
|
||||
|
||||
### Priority System
|
||||
|
||||
Tasks can be assigned priorities (0-100, higher = more important) to control execution order in background processing.
|
||||
|
||||
Reference in New Issue
Block a user