Compare commits
2 Commits
db98fa240e
...
741ed18ce5
| Author | SHA1 | Date | |
|---|---|---|---|
|
741ed18ce5
|
|||
|
2bfb50cc71
|
@@ -1,5 +1,7 @@
|
|||||||
using System.Text.Json;
|
using System.Text.Json;
|
||||||
|
using DysonNetwork.Drive.Storage;
|
||||||
using DysonNetwork.Drive.Storage.Model;
|
using DysonNetwork.Drive.Storage.Model;
|
||||||
|
using DysonNetwork.Shared.Models;
|
||||||
using DysonNetwork.Shared.Proto;
|
using DysonNetwork.Shared.Proto;
|
||||||
using DysonNetwork.Shared.Stream;
|
using DysonNetwork.Shared.Stream;
|
||||||
using FFMpegCore;
|
using FFMpegCore;
|
||||||
@@ -29,7 +31,6 @@ public class BroadcastEventHandler(
|
|||||||
[".gif", ".apng", ".avif"];
|
[".gif", ".apng", ".avif"];
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
{
|
{
|
||||||
var js = nats.CreateJetStreamContext();
|
var js = nats.CreateJetStreamContext();
|
||||||
@@ -53,7 +54,8 @@ public class BroadcastEventHandler(
|
|||||||
{
|
{
|
||||||
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
|
await foreach (var msg in consumer.ConsumeAsync<byte[]>(cancellationToken: stoppingToken))
|
||||||
{
|
{
|
||||||
var payload = JsonSerializer.Deserialize<FileUploadedEventPayload>(msg.Data, GrpcTypeHelper.SerializerOptions);
|
var payload =
|
||||||
|
JsonSerializer.Deserialize<FileUploadedEventPayload>(msg.Data, GrpcTypeHelper.SerializerOptions);
|
||||||
if (payload == null)
|
if (payload == null)
|
||||||
{
|
{
|
||||||
await msg.AckAsync(cancellationToken: stoppingToken);
|
await msg.AckAsync(cancellationToken: stoppingToken);
|
||||||
@@ -130,7 +132,7 @@ public class BroadcastEventHandler(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task ProcessAndUploadInBackgroundAsync(
|
private async Task ProcessAndUploadInBackgroundAsync(
|
||||||
string fileId,
|
string fileId,
|
||||||
Guid remoteId,
|
Guid remoteId,
|
||||||
string storageId,
|
string storageId,
|
||||||
@@ -142,6 +144,7 @@ public class BroadcastEventHandler(
|
|||||||
using var scope = serviceProvider.CreateScope();
|
using var scope = serviceProvider.CreateScope();
|
||||||
var fs = scope.ServiceProvider.GetRequiredService<FileService>();
|
var fs = scope.ServiceProvider.GetRequiredService<FileService>();
|
||||||
var scopedDb = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
var scopedDb = scope.ServiceProvider.GetRequiredService<AppDatabase>();
|
||||||
|
var persistentTaskService = scope.ServiceProvider.GetRequiredService<PersistentTaskService>();
|
||||||
|
|
||||||
var pool = await fs.GetPoolAsync(remoteId);
|
var pool = await fs.GetPoolAsync(remoteId);
|
||||||
if (pool is null) return;
|
if (pool is null) return;
|
||||||
@@ -155,6 +158,11 @@ public class BroadcastEventHandler(
|
|||||||
|
|
||||||
var fileToUpdate = await scopedDb.Files.AsNoTracking().FirstAsync(f => f.Id == fileId);
|
var fileToUpdate = await scopedDb.Files.AsNoTracking().FirstAsync(f => f.Id == fileId);
|
||||||
|
|
||||||
|
// Find the upload task associated with this file
|
||||||
|
var uploadTask = await scopedDb.Tasks
|
||||||
|
.OfType<PersistentUploadTask>()
|
||||||
|
.FirstOrDefaultAsync(t => t.FileName == fileToUpdate.Name && t.FileSize == fileToUpdate.Size);
|
||||||
|
|
||||||
if (fileToUpdate.IsEncrypted)
|
if (fileToUpdate.IsEncrypted)
|
||||||
{
|
{
|
||||||
uploads.Add((processingFilePath, string.Empty, contentType, false));
|
uploads.Add((processingFilePath, string.Empty, contentType, false));
|
||||||
@@ -293,5 +301,51 @@ public class BroadcastEventHandler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
await fs._PurgeCacheAsync(fileId);
|
await fs._PurgeCacheAsync(fileId);
|
||||||
|
|
||||||
|
// Complete the upload task if found
|
||||||
|
if (uploadTask != null)
|
||||||
|
{
|
||||||
|
await persistentTaskService.MarkTaskCompletedAsync(uploadTask.TaskId, new Dictionary<string, object?>
|
||||||
|
{
|
||||||
|
{ "FileId", fileId },
|
||||||
|
{ "FileName", fileToUpdate.Name },
|
||||||
|
{ "FileInfo", fileToUpdate },
|
||||||
|
{ "FileSize", fileToUpdate.Size },
|
||||||
|
{ "MimeType", newMimeType },
|
||||||
|
{ "HasCompression", hasCompression },
|
||||||
|
{ "HasThumbnail", hasThumbnail }
|
||||||
|
});
|
||||||
|
|
||||||
|
// Send push notification for large files (>5MB) that took longer to process
|
||||||
|
if (fileToUpdate.Size > 5 * 1024 * 1024) // 5MB threshold
|
||||||
|
await SendLargeFileProcessingCompleteNotificationAsync(uploadTask, fileToUpdate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task SendLargeFileProcessingCompleteNotificationAsync(PersistentUploadTask task, SnCloudFile file)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var ringService = serviceProvider.GetRequiredService<RingService.RingServiceClient>();
|
||||||
|
|
||||||
|
var pushNotification = new PushNotification
|
||||||
|
{
|
||||||
|
Topic = "drive.tasks.upload",
|
||||||
|
Title = "File Processing Complete",
|
||||||
|
Subtitle = file.Name,
|
||||||
|
Body = $"Your file '{file.Name}' has finished processing and is now available.",
|
||||||
|
IsSavable = true
|
||||||
|
};
|
||||||
|
|
||||||
|
await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest
|
||||||
|
{
|
||||||
|
UserId = task.AccountId.ToString(),
|
||||||
|
Notification = pushNotification
|
||||||
|
});
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
logger.LogWarning(ex, "Failed to send large file processing notification for task {TaskId}", task.TaskId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1,5 +1,4 @@
|
|||||||
using System.ComponentModel.DataAnnotations;
|
using System.ComponentModel.DataAnnotations;
|
||||||
using System.Text.Json;
|
|
||||||
using DysonNetwork.Drive.Billing;
|
using DysonNetwork.Drive.Billing;
|
||||||
using DysonNetwork.Drive.Storage.Model;
|
using DysonNetwork.Drive.Storage.Model;
|
||||||
using DysonNetwork.Shared.Auth;
|
using DysonNetwork.Shared.Auth;
|
||||||
@@ -31,15 +30,13 @@ public class FileUploadController(
|
|||||||
{
|
{
|
||||||
private readonly string _tempPath =
|
private readonly string _tempPath =
|
||||||
configuration.GetValue<string>("Storage:Uploads") ?? Path.Combine(Path.GetTempPath(), "multipart-uploads");
|
configuration.GetValue<string>("Storage:Uploads") ?? Path.Combine(Path.GetTempPath(), "multipart-uploads");
|
||||||
private readonly ILogger<FileUploadController> _logger = logger;
|
|
||||||
|
|
||||||
private const long DefaultChunkSize = 1024 * 1024 * 5; // 5MB
|
private const long DefaultChunkSize = 1024 * 1024 * 5; // 5MB
|
||||||
|
|
||||||
[HttpPost("create")]
|
[HttpPost("create")]
|
||||||
public async Task<IActionResult> CreateUploadTask([FromBody] CreateUploadTaskRequest request)
|
public async Task<IActionResult> CreateUploadTask([FromBody] CreateUploadTaskRequest request)
|
||||||
{
|
{
|
||||||
var currentUser = HttpContext.Items["CurrentUser"] as Account;
|
if (HttpContext.Items["CurrentUser"] is not Account currentUser)
|
||||||
if (currentUser is null)
|
|
||||||
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
return new ObjectResult(ApiError.Unauthorized()) { StatusCode = 401 };
|
||||||
|
|
||||||
var permissionCheck = await ValidateUserPermissions(currentUser);
|
var permissionCheck = await ValidateUserPermissions(currentUser);
|
||||||
@@ -99,25 +96,25 @@ public class FileUploadController(
|
|||||||
new ObjectResult(ApiError.Unauthorized(forbidden: true)) { StatusCode = 403 };
|
new ObjectResult(ApiError.Unauthorized(forbidden: true)) { StatusCode = 403 };
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<IActionResult?> ValidatePoolAccess(Account currentUser, FilePool pool, CreateUploadTaskRequest request)
|
private Task<IActionResult?> ValidatePoolAccess(Account currentUser, FilePool pool, CreateUploadTaskRequest request)
|
||||||
{
|
{
|
||||||
if (pool.PolicyConfig.RequirePrivilege <= 0) return null;
|
if (pool.PolicyConfig.RequirePrivilege <= 0) return Task.FromResult<IActionResult?>(null);
|
||||||
|
|
||||||
var privilege = currentUser.PerkSubscription is null ? 0 :
|
var privilege = currentUser.PerkSubscription is null ? 0 :
|
||||||
PerkSubscriptionPrivilege.GetPrivilegeFromIdentifier(currentUser.PerkSubscription.Identifier);
|
PerkSubscriptionPrivilege.GetPrivilegeFromIdentifier(currentUser.PerkSubscription.Identifier);
|
||||||
|
|
||||||
if (privilege < pool.PolicyConfig.RequirePrivilege)
|
if (privilege < pool.PolicyConfig.RequirePrivilege)
|
||||||
{
|
{
|
||||||
return new ObjectResult(ApiError.Unauthorized(
|
return Task.FromResult<IActionResult?>(new ObjectResult(ApiError.Unauthorized(
|
||||||
$"You need Stellar Program tier {pool.PolicyConfig.RequirePrivilege} to use pool {pool.Name}, you are tier {privilege}",
|
$"You need Stellar Program tier {pool.PolicyConfig.RequirePrivilege} to use pool {pool.Name}, you are tier {privilege}",
|
||||||
forbidden: true))
|
forbidden: true))
|
||||||
{ StatusCode = 403 };
|
{ StatusCode = 403 });
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return Task.FromResult<IActionResult?>(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private IActionResult? ValidatePoolPolicy(PolicyConfig policy, CreateUploadTaskRequest request)
|
private static IActionResult? ValidatePoolPolicy(PolicyConfig policy, CreateUploadTaskRequest request)
|
||||||
{
|
{
|
||||||
if (!policy.AllowEncryption && !string.IsNullOrEmpty(request.EncryptPassword))
|
if (!policy.AllowEncryption && !string.IsNullOrEmpty(request.EncryptPassword))
|
||||||
{
|
{
|
||||||
@@ -138,13 +135,11 @@ public class FileUploadController(
|
|||||||
|
|
||||||
var foundMatch = policy.AcceptTypes.Any(acceptType =>
|
var foundMatch = policy.AcceptTypes.Any(acceptType =>
|
||||||
{
|
{
|
||||||
if (acceptType.EndsWith("/*", StringComparison.OrdinalIgnoreCase))
|
if (!acceptType.EndsWith("/*", StringComparison.OrdinalIgnoreCase))
|
||||||
{
|
return acceptType.Equals(request.ContentType, StringComparison.OrdinalIgnoreCase);
|
||||||
var type = acceptType[..^2];
|
var type = acceptType[..^2];
|
||||||
return request.ContentType.StartsWith($"{type}/", StringComparison.OrdinalIgnoreCase);
|
return request.ContentType.StartsWith($"{type}/", StringComparison.OrdinalIgnoreCase);
|
||||||
}
|
|
||||||
|
|
||||||
return acceptType.Equals(request.ContentType, StringComparison.OrdinalIgnoreCase);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!foundMatch)
|
if (!foundMatch)
|
||||||
@@ -191,41 +186,13 @@ public class FileUploadController(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<(string taskId, UploadTask task)> CreateUploadTaskInternal(CreateUploadTaskRequest request)
|
|
||||||
{
|
|
||||||
var taskId = await Nanoid.GenerateAsync();
|
|
||||||
var taskPath = Path.Combine(_tempPath, taskId);
|
|
||||||
Directory.CreateDirectory(taskPath);
|
|
||||||
|
|
||||||
var chunkSize = request.ChunkSize ?? DefaultChunkSize;
|
|
||||||
var chunksCount = (int)Math.Ceiling((double)request.FileSize / chunkSize);
|
|
||||||
|
|
||||||
var task = new UploadTask
|
|
||||||
{
|
|
||||||
TaskId = taskId,
|
|
||||||
FileName = request.FileName,
|
|
||||||
FileSize = request.FileSize,
|
|
||||||
ContentType = request.ContentType,
|
|
||||||
ChunkSize = chunkSize,
|
|
||||||
ChunksCount = chunksCount,
|
|
||||||
PoolId = request.PoolId.Value,
|
|
||||||
BundleId = request.BundleId,
|
|
||||||
EncryptPassword = request.EncryptPassword,
|
|
||||||
ExpiredAt = request.ExpiredAt,
|
|
||||||
Hash = request.Hash,
|
|
||||||
};
|
|
||||||
|
|
||||||
await System.IO.File.WriteAllTextAsync(Path.Combine(taskPath, "task.json"), JsonSerializer.Serialize(task));
|
|
||||||
return (taskId, task);
|
|
||||||
}
|
|
||||||
|
|
||||||
public class UploadChunkRequest
|
public class UploadChunkRequest
|
||||||
{
|
{
|
||||||
[Required]
|
[Required]
|
||||||
public IFormFile Chunk { get; set; } = null!;
|
public IFormFile Chunk { get; set; } = null!;
|
||||||
}
|
}
|
||||||
|
|
||||||
[HttpPost("chunk/{taskId}/{chunkIndex}")]
|
[HttpPost("chunk/{taskId}/{chunkIndex:int}")]
|
||||||
[RequestSizeLimit(DefaultChunkSize + 1024 * 1024)] // 6MB to be safe
|
[RequestSizeLimit(DefaultChunkSize + 1024 * 1024)] // 6MB to be safe
|
||||||
[RequestFormLimits(MultipartBodyLengthLimit = DefaultChunkSize + 1024 * 1024)]
|
[RequestFormLimits(MultipartBodyLengthLimit = DefaultChunkSize + 1024 * 1024)]
|
||||||
public async Task<IActionResult> UploadChunk(string taskId, int chunkIndex, [FromForm] UploadChunkRequest request)
|
public async Task<IActionResult> UploadChunk(string taskId, int chunkIndex, [FromForm] UploadChunkRequest request)
|
||||||
@@ -278,7 +245,7 @@ public class FileUploadController(
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await MergeChunks(taskPath, mergedFilePath, persistentTask.ChunksCount);
|
await MergeChunks(taskId, taskPath, mergedFilePath, persistentTask.ChunksCount, persistentTaskService);
|
||||||
|
|
||||||
var fileId = await Nanoid.GenerateAsync();
|
var fileId = await Nanoid.GenerateAsync();
|
||||||
var cloudFile = await fileService.ProcessNewFileAsync(
|
var cloudFile = await fileService.ProcessNewFileAsync(
|
||||||
@@ -293,10 +260,10 @@ public class FileUploadController(
|
|||||||
persistentTask.ExpiredAt
|
persistentTask.ExpiredAt
|
||||||
);
|
);
|
||||||
|
|
||||||
// Mark task as completed
|
// Update task status to "processing" - background processing is now happening
|
||||||
await persistentTaskService.MarkTaskCompletedAsync(taskId);
|
await persistentTaskService.UpdateTaskProgressAsync(taskId, 95, "Processing file in background...");
|
||||||
|
|
||||||
// Send completion notification
|
// Send upload completion notification (file is uploaded, but processing continues)
|
||||||
await persistentTaskService.SendUploadCompletedNotificationAsync(persistentTask, fileId);
|
await persistentTaskService.SendUploadCompletedNotificationAsync(persistentTask, fileId);
|
||||||
|
|
||||||
return Ok(cloudFile);
|
return Ok(cloudFile);
|
||||||
@@ -304,7 +271,7 @@ public class FileUploadController(
|
|||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// Log the actual exception for debugging
|
// Log the actual exception for debugging
|
||||||
_logger.LogError(ex, "Failed to complete upload for task {TaskId}. Error: {ErrorMessage}", taskId, ex.Message);
|
logger.LogError(ex, "Failed to complete upload for task {TaskId}. Error: {ErrorMessage}", taskId, ex.Message);
|
||||||
|
|
||||||
// Mark task as failed
|
// Mark task as failed
|
||||||
await persistentTaskService.MarkTaskFailedAsync(taskId);
|
await persistentTaskService.MarkTaskFailedAsync(taskId);
|
||||||
@@ -328,24 +295,41 @@ public class FileUploadController(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task MergeChunks(string taskPath, string mergedFilePath, int chunksCount)
|
private static async Task MergeChunks(
|
||||||
|
string taskId,
|
||||||
|
string taskPath,
|
||||||
|
string mergedFilePath,
|
||||||
|
int chunksCount,
|
||||||
|
PersistentTaskService persistentTaskService)
|
||||||
{
|
{
|
||||||
await using var mergedStream = new FileStream(mergedFilePath, FileMode.Create);
|
await using var mergedStream = new FileStream(mergedFilePath, FileMode.Create);
|
||||||
|
|
||||||
|
const double baseProgress = 0.8; // Start from 80% (chunk upload is already at 95%)
|
||||||
|
const double remainingProgress = 0.15; // Remaining 15% progress distributed across chunks
|
||||||
|
var progressPerChunk = remainingProgress / chunksCount;
|
||||||
|
|
||||||
for (var i = 0; i < chunksCount; i++)
|
for (var i = 0; i < chunksCount; i++)
|
||||||
{
|
{
|
||||||
var chunkPath = Path.Combine(taskPath, $"{i}.chunk");
|
var chunkPath = Path.Combine(taskPath, i + ".chunk");
|
||||||
if (!System.IO.File.Exists(chunkPath))
|
if (!System.IO.File.Exists(chunkPath))
|
||||||
{
|
{
|
||||||
throw new InvalidOperationException($"Chunk {i} is missing.");
|
throw new InvalidOperationException("Chunk " + i + " is missing.");
|
||||||
}
|
}
|
||||||
|
|
||||||
await using var chunkStream = new FileStream(chunkPath, FileMode.Open);
|
await using var chunkStream = new FileStream(chunkPath, FileMode.Open);
|
||||||
await chunkStream.CopyToAsync(mergedStream);
|
await chunkStream.CopyToAsync(mergedStream);
|
||||||
|
|
||||||
|
// Update progress after each chunk is merged
|
||||||
|
var currentProgress = baseProgress + (progressPerChunk * (i + 1));
|
||||||
|
await persistentTaskService.UpdateTaskProgressAsync(
|
||||||
|
taskId,
|
||||||
|
currentProgress,
|
||||||
|
"Merging chunks... (" + (i + 1) + "/" + chunksCount + ")"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task CleanupTempFiles(string taskPath, string mergedFilePath)
|
private static Task CleanupTempFiles(string taskPath, string mergedFilePath)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -359,6 +343,8 @@ public class FileUploadController(
|
|||||||
{
|
{
|
||||||
// Ignore cleanup errors to avoid masking the original exception
|
// Ignore cleanup errors to avoid masking the original exception
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
// New endpoints for resumable uploads
|
// New endpoints for resumable uploads
|
||||||
@@ -395,7 +381,7 @@ public class FileUploadController(
|
|||||||
t.LastActivity,
|
t.LastActivity,
|
||||||
t.CreatedAt,
|
t.CreatedAt,
|
||||||
t.UpdatedAt,
|
t.UpdatedAt,
|
||||||
UploadedChunks = t.UploadedChunks,
|
t.UploadedChunks,
|
||||||
Pool = new { t.PoolId, Name = "Pool Name" }, // Could be expanded to include pool details
|
Pool = new { t.PoolId, Name = "Pool Name" }, // Could be expanded to include pool details
|
||||||
Bundle = t.BundleId.HasValue ? new { t.BundleId } : null
|
Bundle = t.BundleId.HasValue ? new { t.BundleId } : null
|
||||||
}));
|
}));
|
||||||
@@ -463,7 +449,7 @@ public class FileUploadController(
|
|||||||
task.ChunkSize,
|
task.ChunkSize,
|
||||||
task.ChunksCount,
|
task.ChunksCount,
|
||||||
task.ChunksUploaded,
|
task.ChunksUploaded,
|
||||||
UploadedChunks = task.UploadedChunks,
|
task.UploadedChunks,
|
||||||
Progress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0
|
Progress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -505,14 +491,14 @@ public class FileUploadController(
|
|||||||
|
|
||||||
return Ok(new
|
return Ok(new
|
||||||
{
|
{
|
||||||
TotalTasks = stats.TotalTasks,
|
stats.TotalTasks,
|
||||||
InProgressTasks = stats.InProgressTasks,
|
stats.InProgressTasks,
|
||||||
CompletedTasks = stats.CompletedTasks,
|
stats.CompletedTasks,
|
||||||
FailedTasks = stats.FailedTasks,
|
stats.FailedTasks,
|
||||||
ExpiredTasks = stats.ExpiredTasks,
|
stats.ExpiredTasks,
|
||||||
TotalUploadedBytes = stats.TotalUploadedBytes,
|
stats.TotalUploadedBytes,
|
||||||
AverageProgress = stats.AverageProgress,
|
stats.AverageProgress,
|
||||||
RecentActivity = stats.RecentActivity
|
stats.RecentActivity
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -591,7 +577,7 @@ public class FileUploadController(
|
|||||||
task.UpdatedAt,
|
task.UpdatedAt,
|
||||||
task.ExpiredAt,
|
task.ExpiredAt,
|
||||||
task.Hash,
|
task.Hash,
|
||||||
UploadedChunks = task.UploadedChunks
|
task.UploadedChunks
|
||||||
},
|
},
|
||||||
Pool = pool != null ? new
|
Pool = pool != null ? new
|
||||||
{
|
{
|
||||||
@@ -610,9 +596,9 @@ public class FileUploadController(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private string? CalculateEstimatedTime(PersistentUploadTask task)
|
private static string? CalculateEstimatedTime(PersistentUploadTask task)
|
||||||
{
|
{
|
||||||
if (task.Status != Model.TaskStatus.InProgress || task.ChunksUploaded == 0)
|
if (task.Status != TaskStatus.InProgress || task.ChunksUploaded == 0)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
var elapsed = NodaTime.SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
|
var elapsed = NodaTime.SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
|
||||||
@@ -625,27 +611,29 @@ public class FileUploadController(
|
|||||||
|
|
||||||
var remainingSeconds = remainingChunks / chunksPerSecond;
|
var remainingSeconds = remainingChunks / chunksPerSecond;
|
||||||
|
|
||||||
if (remainingSeconds < 60)
|
return remainingSeconds switch
|
||||||
return $"{remainingSeconds:F0} seconds";
|
{
|
||||||
if (remainingSeconds < 3600)
|
< 60 => $"{remainingSeconds:F0} seconds",
|
||||||
return $"{remainingSeconds / 60:F0} minutes";
|
< 3600 => $"{remainingSeconds / 60:F0} minutes",
|
||||||
return $"{remainingSeconds / 3600:F1} hours";
|
_ => $"{remainingSeconds / 3600:F1} hours"
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private string? CalculateUploadSpeed(PersistentUploadTask task)
|
private static string? CalculateUploadSpeed(PersistentUploadTask task)
|
||||||
{
|
{
|
||||||
if (task.ChunksUploaded == 0)
|
if (task.ChunksUploaded == 0)
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
var elapsed = NodaTime.SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
|
var elapsed = SystemClock.Instance.GetCurrentInstant() - task.CreatedAt;
|
||||||
var elapsedSeconds = elapsed.TotalSeconds;
|
var elapsedSeconds = elapsed.TotalSeconds;
|
||||||
var bytesUploaded = (long)task.ChunksUploaded * task.ChunkSize;
|
var bytesUploaded = task.ChunksUploaded * task.ChunkSize;
|
||||||
var bytesPerSecond = bytesUploaded / elapsedSeconds;
|
var bytesPerSecond = bytesUploaded / elapsedSeconds;
|
||||||
|
|
||||||
if (bytesPerSecond < 1024)
|
return bytesPerSecond switch
|
||||||
return $"{bytesPerSecond:F0} B/s";
|
{
|
||||||
if (bytesPerSecond < 1024 * 1024)
|
< 1024 => $"{bytesPerSecond:F0} B/s",
|
||||||
return $"{bytesPerSecond / 1024:F0} KB/s";
|
< 1024 * 1024 => $"{bytesPerSecond / 1024:F0} KB/s",
|
||||||
return $"{bytesPerSecond / (1024 * 1024):F1} MB/s";
|
_ => $"{bytesPerSecond / (1024 * 1024):F1} MB/s"
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -157,24 +157,6 @@ public class PersistentTask : ModelBase
|
|||||||
|
|
||||||
// Estimated duration in seconds
|
// Estimated duration in seconds
|
||||||
public long? EstimatedDurationSeconds { get; set; }
|
public long? EstimatedDurationSeconds { get; set; }
|
||||||
|
|
||||||
// Helper methods for parameter management using GrpcTypeHelper
|
|
||||||
public MapField<string, Google.Protobuf.WellKnownTypes.Value> GetParametersAsGrpcMap()
|
|
||||||
{
|
|
||||||
var nonNullableParameters = Parameters.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty);
|
|
||||||
return GrpcTypeHelper.ConvertToValueMap(nonNullableParameters);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void SetParametersFromGrpcMap(MapField<string, Google.Protobuf.WellKnownTypes.Value> map)
|
|
||||||
{
|
|
||||||
Parameters = GrpcTypeHelper.ConvertFromValueMap(map);
|
|
||||||
}
|
|
||||||
|
|
||||||
public MapField<string, Google.Protobuf.WellKnownTypes.Value> GetResultsAsGrpcMap()
|
|
||||||
{
|
|
||||||
var nonNullableResults = Results.ToDictionary(kvp => kvp.Key, kvp => kvp.Value ?? string.Empty);
|
|
||||||
return GrpcTypeHelper.ConvertToValueMap(nonNullableResults);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backward compatibility - UploadTask inherits from PersistentTask
|
// Backward compatibility - UploadTask inherits from PersistentTask
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ public class PersistentTaskService(
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public async Task<T> CreateTaskAsync<T>(T task) where T : PersistentTask
|
public async Task<T> CreateTaskAsync<T>(T task) where T : PersistentTask
|
||||||
{
|
{
|
||||||
task.TaskId = NanoidDotNet.Nanoid.Generate();
|
task.TaskId = await Nanoid.GenerateAsync();
|
||||||
var now = SystemClock.Instance.GetCurrentInstant();
|
var now = SystemClock.Instance.GetCurrentInstant();
|
||||||
task.CreatedAt = now;
|
task.CreatedAt = now;
|
||||||
task.UpdatedAt = now;
|
task.UpdatedAt = now;
|
||||||
@@ -45,7 +45,7 @@ public class PersistentTaskService(
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Gets a task by ID
|
/// Gets a task by ID
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public async Task<T?> GetTaskAsync<T>(string taskId) where T : PersistentTask
|
private async Task<T?> GetTaskAsync<T>(string taskId) where T : PersistentTask
|
||||||
{
|
{
|
||||||
var cacheKey = $"{CacheKeyPrefix}{taskId}";
|
var cacheKey = $"{CacheKeyPrefix}{taskId}";
|
||||||
var cachedTask = await cache.GetAsync<T>(cacheKey);
|
var cachedTask = await cache.GetAsync<T>(cacheKey);
|
||||||
@@ -55,13 +55,9 @@ public class PersistentTaskService(
|
|||||||
var task = await db.Tasks
|
var task = await db.Tasks
|
||||||
.FirstOrDefaultAsync(t => t.TaskId == taskId);
|
.FirstOrDefaultAsync(t => t.TaskId == taskId);
|
||||||
|
|
||||||
if (task is T typedTask)
|
if (task is not T typedTask) return null;
|
||||||
{
|
await SetCacheAsync(typedTask);
|
||||||
await SetCacheAsync(typedTask);
|
return typedTask;
|
||||||
return typedTask;
|
|
||||||
}
|
|
||||||
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -73,20 +69,35 @@ public class PersistentTaskService(
|
|||||||
if (task is null) return;
|
if (task is null) return;
|
||||||
|
|
||||||
var previousProgress = task.Progress;
|
var previousProgress = task.Progress;
|
||||||
task.Progress = Math.Clamp(progress, 0, 100);
|
var clampedProgress = Math.Clamp(progress, 0, 1.0);
|
||||||
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
|
var now = SystemClock.Instance.GetCurrentInstant();
|
||||||
task.UpdatedAt = task.LastActivity;
|
|
||||||
|
|
||||||
if (statusMessage is not null)
|
// Use ExecuteUpdateAsync for better performance - update only the fields we need
|
||||||
|
var updatedRows = await db.Tasks
|
||||||
|
.Where(t => t.TaskId == taskId)
|
||||||
|
.ExecuteUpdateAsync(setters => setters
|
||||||
|
.SetProperty(t => t.Progress, clampedProgress)
|
||||||
|
.SetProperty(t => t.LastActivity, now)
|
||||||
|
.SetProperty(t => t.UpdatedAt, now)
|
||||||
|
.SetProperty(t => t.Description, t => statusMessage ?? t.Description)
|
||||||
|
);
|
||||||
|
|
||||||
|
if (updatedRows > 0)
|
||||||
{
|
{
|
||||||
task.Description = statusMessage;
|
// Update the cached task
|
||||||
|
task.Progress = clampedProgress;
|
||||||
|
task.LastActivity = now;
|
||||||
|
task.UpdatedAt = now;
|
||||||
|
if (statusMessage is not null)
|
||||||
|
{
|
||||||
|
task.Description = statusMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
await SetCacheAsync(task);
|
||||||
|
|
||||||
|
// Send progress update notification
|
||||||
|
await SendTaskProgressUpdateAsync(task, task.Progress, previousProgress);
|
||||||
}
|
}
|
||||||
|
|
||||||
await db.SaveChangesAsync();
|
|
||||||
await SetCacheAsync(task);
|
|
||||||
|
|
||||||
// Send progress update notification
|
|
||||||
await SendTaskProgressUpdateAsync(task, task.Progress, previousProgress);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -98,24 +109,38 @@ public class PersistentTaskService(
|
|||||||
if (task is null) return;
|
if (task is null) return;
|
||||||
|
|
||||||
var now = SystemClock.Instance.GetCurrentInstant();
|
var now = SystemClock.Instance.GetCurrentInstant();
|
||||||
task.Status = TaskStatus.Completed;
|
|
||||||
task.Progress = 100;
|
|
||||||
task.CompletedAt = now;
|
|
||||||
task.LastActivity = now;
|
|
||||||
task.UpdatedAt = now;
|
|
||||||
|
|
||||||
if (results is not null)
|
// Use ExecuteUpdateAsync for better performance - update only the fields we need
|
||||||
|
var updatedRows = await db.Tasks
|
||||||
|
.Where(t => t.TaskId == taskId)
|
||||||
|
.ExecuteUpdateAsync(setters => setters
|
||||||
|
.SetProperty(t => t.Status, TaskStatus.Completed)
|
||||||
|
.SetProperty(t => t.Progress, 1.0)
|
||||||
|
.SetProperty(t => t.CompletedAt, now)
|
||||||
|
.SetProperty(t => t.LastActivity, now)
|
||||||
|
.SetProperty(t => t.UpdatedAt, now)
|
||||||
|
);
|
||||||
|
|
||||||
|
if (updatedRows > 0)
|
||||||
{
|
{
|
||||||
foreach (var (key, value) in results)
|
// Update the cached task with results if provided
|
||||||
|
task.Status = TaskStatus.Completed;
|
||||||
|
task.Progress = 1.0;
|
||||||
|
task.CompletedAt = now;
|
||||||
|
task.LastActivity = now;
|
||||||
|
task.UpdatedAt = now;
|
||||||
|
|
||||||
|
if (results is not null)
|
||||||
{
|
{
|
||||||
task.Results[key] = value;
|
foreach (var (key, value) in results)
|
||||||
|
{
|
||||||
|
task.Results[key] = value;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await RemoveCacheAsync(taskId);
|
||||||
|
await SendTaskCompletedNotificationAsync(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
await db.SaveChangesAsync();
|
|
||||||
await RemoveCacheAsync(taskId);
|
|
||||||
|
|
||||||
await SendTaskCompletedNotificationAsync(task);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -126,15 +151,30 @@ public class PersistentTaskService(
|
|||||||
var task = await GetTaskAsync<PersistentTask>(taskId);
|
var task = await GetTaskAsync<PersistentTask>(taskId);
|
||||||
if (task is null) return;
|
if (task is null) return;
|
||||||
|
|
||||||
task.Status = TaskStatus.Failed;
|
var now = SystemClock.Instance.GetCurrentInstant();
|
||||||
task.ErrorMessage = errorMessage ?? "Task failed due to an unknown error";
|
var errorMsg = errorMessage ?? "Task failed due to an unknown error";
|
||||||
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
|
|
||||||
task.UpdatedAt = task.LastActivity;
|
|
||||||
|
|
||||||
await db.SaveChangesAsync();
|
// Use ExecuteUpdateAsync for better performance - update only the fields we need
|
||||||
await RemoveCacheAsync(taskId);
|
var updatedRows = await db.Tasks
|
||||||
|
.Where(t => t.TaskId == taskId)
|
||||||
|
.ExecuteUpdateAsync(setters => setters
|
||||||
|
.SetProperty(t => t.Status, TaskStatus.Failed)
|
||||||
|
.SetProperty(t => t.ErrorMessage, errorMsg)
|
||||||
|
.SetProperty(t => t.LastActivity, now)
|
||||||
|
.SetProperty(t => t.UpdatedAt, now)
|
||||||
|
);
|
||||||
|
|
||||||
await SendTaskFailedNotificationAsync(task);
|
if (updatedRows > 0)
|
||||||
|
{
|
||||||
|
// Update the cached task
|
||||||
|
task.Status = TaskStatus.Failed;
|
||||||
|
task.ErrorMessage = errorMsg;
|
||||||
|
task.LastActivity = now;
|
||||||
|
task.UpdatedAt = now;
|
||||||
|
|
||||||
|
await RemoveCacheAsync(taskId);
|
||||||
|
await SendTaskFailedNotificationAsync(task);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -281,20 +321,20 @@ public class PersistentTaskService(
|
|||||||
ExpiredTasks = tasks.Count(t => t.Status == TaskStatus.Expired),
|
ExpiredTasks = tasks.Count(t => t.Status == TaskStatus.Expired),
|
||||||
AverageProgress = tasks.Any(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused)
|
AverageProgress = tasks.Any(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused)
|
||||||
? tasks.Where(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused)
|
? tasks.Where(t => t.Status == TaskStatus.InProgress || t.Status == TaskStatus.Paused)
|
||||||
.Average(t => t.Progress)
|
.Average(t => t.Progress)
|
||||||
: 0,
|
: 0,
|
||||||
RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
|
RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
|
||||||
.Take(10)
|
.Take(10)
|
||||||
.Select(t => new TaskActivity
|
.Select(t => new TaskActivity
|
||||||
{
|
{
|
||||||
TaskId = t.TaskId,
|
TaskId = t.TaskId,
|
||||||
Name = t.Name,
|
Name = t.Name,
|
||||||
Type = t.Type,
|
Type = t.Type,
|
||||||
Status = t.Status,
|
Status = t.Status,
|
||||||
Progress = t.Progress,
|
Progress = t.Progress,
|
||||||
LastActivity = t.LastActivity
|
LastActivity = t.LastActivity
|
||||||
})
|
})
|
||||||
.ToList()
|
.ToList()
|
||||||
};
|
};
|
||||||
|
|
||||||
return stats;
|
return stats;
|
||||||
@@ -314,11 +354,11 @@ public class PersistentTaskService(
|
|||||||
|
|
||||||
var oldTasks = await db.Tasks
|
var oldTasks = await db.Tasks
|
||||||
.Where(t => t.AccountId == accountId &&
|
.Where(t => t.AccountId == accountId &&
|
||||||
(t.Status == TaskStatus.Completed ||
|
(t.Status == TaskStatus.Completed ||
|
||||||
t.Status == TaskStatus.Failed ||
|
t.Status == TaskStatus.Failed ||
|
||||||
t.Status == TaskStatus.Cancelled ||
|
t.Status == TaskStatus.Cancelled ||
|
||||||
t.Status == TaskStatus.Expired) &&
|
t.Status == TaskStatus.Expired) &&
|
||||||
t.UpdatedAt < cutoff)
|
t.UpdatedAt < cutoff)
|
||||||
.ToListAsync();
|
.ToListAsync();
|
||||||
|
|
||||||
db.Tasks.RemoveRange(oldTasks);
|
db.Tasks.RemoveRange(oldTasks);
|
||||||
@@ -344,13 +384,13 @@ public class PersistentTaskService(
|
|||||||
TaskId = task.TaskId,
|
TaskId = task.TaskId,
|
||||||
Name = task.Name,
|
Name = task.Name,
|
||||||
Type = task.Type.ToString(),
|
Type = task.Type.ToString(),
|
||||||
CreatedAt = task.CreatedAt.ToString("%O", null)
|
CreatedAt = task.CreatedAt.ToString()
|
||||||
};
|
};
|
||||||
|
|
||||||
var packet = new WebSocketPacket
|
var packet = new WebSocketPacket
|
||||||
{
|
{
|
||||||
Type = "task.created",
|
Type = "task.created",
|
||||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
|
Data = GrpcTypeHelper.ConvertObjectToByteString(data)
|
||||||
};
|
};
|
||||||
|
|
||||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||||
@@ -380,13 +420,13 @@ public class PersistentTaskService(
|
|||||||
Type = task.Type.ToString(),
|
Type = task.Type.ToString(),
|
||||||
Progress = newProgress,
|
Progress = newProgress,
|
||||||
Status = task.Status.ToString(),
|
Status = task.Status.ToString(),
|
||||||
LastActivity = task.LastActivity.ToString("%O", null)
|
LastActivity = task.LastActivity.ToString()
|
||||||
};
|
};
|
||||||
|
|
||||||
var packet = new WebSocketPacket
|
var packet = new WebSocketPacket
|
||||||
{
|
{
|
||||||
Type = "task.progress",
|
Type = "task.progress",
|
||||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
|
Data = GrpcTypeHelper.ConvertObjectToByteString(data)
|
||||||
};
|
};
|
||||||
|
|
||||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||||
@@ -410,7 +450,7 @@ public class PersistentTaskService(
|
|||||||
TaskId = task.TaskId,
|
TaskId = task.TaskId,
|
||||||
Name = task.Name,
|
Name = task.Name,
|
||||||
Type = task.Type.ToString(),
|
Type = task.Type.ToString(),
|
||||||
CompletedAt = task.CompletedAt?.ToString("%O", null) ?? task.UpdatedAt.ToString("%O", null),
|
CompletedAt = task.CompletedAt?.ToString() ?? task.UpdatedAt.ToString(),
|
||||||
Results = task.Results
|
Results = task.Results
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -418,7 +458,7 @@ public class PersistentTaskService(
|
|||||||
var wsPacket = new WebSocketPacket
|
var wsPacket = new WebSocketPacket
|
||||||
{
|
{
|
||||||
Type = "task.completed",
|
Type = "task.completed",
|
||||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
|
Data = GrpcTypeHelper.ConvertObjectToByteString(data)
|
||||||
};
|
};
|
||||||
|
|
||||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||||
@@ -430,7 +470,7 @@ public class PersistentTaskService(
|
|||||||
// Push notification
|
// Push notification
|
||||||
var pushNotification = new PushNotification
|
var pushNotification = new PushNotification
|
||||||
{
|
{
|
||||||
Topic = "task",
|
Topic = "drive.tasks",
|
||||||
Title = "Task Completed",
|
Title = "Task Completed",
|
||||||
Subtitle = task.Name,
|
Subtitle = task.Name,
|
||||||
Body = $"Your {task.Type.ToString().ToLower()} task has completed successfully.",
|
Body = $"Your {task.Type.ToString().ToLower()} task has completed successfully.",
|
||||||
@@ -458,7 +498,7 @@ public class PersistentTaskService(
|
|||||||
TaskId = task.TaskId,
|
TaskId = task.TaskId,
|
||||||
Name = task.Name,
|
Name = task.Name,
|
||||||
Type = task.Type.ToString(),
|
Type = task.Type.ToString(),
|
||||||
FailedAt = task.UpdatedAt.ToString("%O", null),
|
FailedAt = task.UpdatedAt.ToString(),
|
||||||
ErrorMessage = task.ErrorMessage ?? "Task failed due to an unknown error"
|
ErrorMessage = task.ErrorMessage ?? "Task failed due to an unknown error"
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -466,7 +506,7 @@ public class PersistentTaskService(
|
|||||||
var wsPacket = new WebSocketPacket
|
var wsPacket = new WebSocketPacket
|
||||||
{
|
{
|
||||||
Type = "task.failed",
|
Type = "task.failed",
|
||||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(data))
|
Data = GrpcTypeHelper.ConvertObjectToByteString(data)
|
||||||
};
|
};
|
||||||
|
|
||||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||||
@@ -478,7 +518,7 @@ public class PersistentTaskService(
|
|||||||
// Push notification
|
// Push notification
|
||||||
var pushNotification = new PushNotification
|
var pushNotification = new PushNotification
|
||||||
{
|
{
|
||||||
Topic = "task",
|
Topic = "drive.tasks",
|
||||||
Title = "Task Failed",
|
Title = "Task Failed",
|
||||||
Subtitle = task.Name,
|
Subtitle = task.Name,
|
||||||
Body = $"Your {task.Type.ToString().ToLower()} task has failed.",
|
Body = $"Your {task.Type.ToString().ToLower()} task has failed.",
|
||||||
@@ -657,16 +697,31 @@ public class PersistentTaskService(
|
|||||||
{
|
{
|
||||||
var previousProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
var previousProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
||||||
|
|
||||||
task.UploadedChunks.Add(chunkIndex);
|
// Use ExecuteUpdateAsync for better performance - update only the fields we need
|
||||||
task.ChunksUploaded = task.UploadedChunks.Count;
|
var now = SystemClock.Instance.GetCurrentInstant();
|
||||||
task.LastActivity = SystemClock.Instance.GetCurrentInstant();
|
var updatedRows = await db.Tasks
|
||||||
|
.OfType<PersistentUploadTask>()
|
||||||
|
.Where(t => t.TaskId == taskId)
|
||||||
|
.ExecuteUpdateAsync(setters => setters
|
||||||
|
.SetProperty(t => t.UploadedChunks, t => t.UploadedChunks.Append(chunkIndex).Distinct().ToList())
|
||||||
|
.SetProperty(t => t.ChunksUploaded, t => t.UploadedChunks.Count)
|
||||||
|
.SetProperty(t => t.LastActivity, now)
|
||||||
|
.SetProperty(t => t.UpdatedAt, now)
|
||||||
|
);
|
||||||
|
|
||||||
await db.SaveChangesAsync();
|
if (updatedRows > 0)
|
||||||
await SetCacheAsync(task);
|
{
|
||||||
|
// Update the cached task
|
||||||
|
task.UploadedChunks.Add(chunkIndex);
|
||||||
|
task.ChunksUploaded = task.UploadedChunks.Count;
|
||||||
|
task.LastActivity = now;
|
||||||
|
task.UpdatedAt = now;
|
||||||
|
await SetCacheAsync(task);
|
||||||
|
|
||||||
// Send real-time progress update
|
// Send real-time progress update
|
||||||
var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
var newProgress = task.ChunksCount > 0 ? (double)task.ChunksUploaded / task.ChunksCount * 100 : 0;
|
||||||
await SendUploadProgressUpdateAsync(task, newProgress, previousProgress);
|
await SendUploadProgressUpdateAsync(task, newProgress, previousProgress);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -727,17 +782,17 @@ public class PersistentTaskService(
|
|||||||
? query.OrderByDescending(t => t.FileSize)
|
? query.OrderByDescending(t => t.FileSize)
|
||||||
: query.OrderBy(t => t.FileSize);
|
: query.OrderBy(t => t.FileSize);
|
||||||
break;
|
break;
|
||||||
case "createdat":
|
case "created":
|
||||||
orderedQuery = sortDescending
|
orderedQuery = sortDescending
|
||||||
? query.OrderByDescending(t => t.CreatedAt)
|
? query.OrderByDescending(t => t.CreatedAt)
|
||||||
: query.OrderBy(t => t.CreatedAt);
|
: query.OrderBy(t => t.CreatedAt);
|
||||||
break;
|
break;
|
||||||
case "updatedat":
|
case "updated":
|
||||||
orderedQuery = sortDescending
|
orderedQuery = sortDescending
|
||||||
? query.OrderByDescending(t => t.UpdatedAt)
|
? query.OrderByDescending(t => t.UpdatedAt)
|
||||||
: query.OrderBy(t => t.UpdatedAt);
|
: query.OrderBy(t => t.UpdatedAt);
|
||||||
break;
|
break;
|
||||||
case "lastactivity":
|
case "activity":
|
||||||
default:
|
default:
|
||||||
orderedQuery = sortDescending
|
orderedQuery = sortDescending
|
||||||
? query.OrderByDescending(t => t.LastActivity)
|
? query.OrderByDescending(t => t.LastActivity)
|
||||||
@@ -774,19 +829,19 @@ public class PersistentTaskService(
|
|||||||
TotalUploadedBytes = tasks.Sum(t => (long)t.ChunksUploaded * t.ChunkSize),
|
TotalUploadedBytes = tasks.Sum(t => (long)t.ChunksUploaded * t.ChunkSize),
|
||||||
AverageProgress = tasks.Any(t => t.Status == Model.TaskStatus.InProgress)
|
AverageProgress = tasks.Any(t => t.Status == Model.TaskStatus.InProgress)
|
||||||
? tasks.Where(t => t.Status == Model.TaskStatus.InProgress)
|
? tasks.Where(t => t.Status == Model.TaskStatus.InProgress)
|
||||||
.Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0)
|
.Average(t => t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0)
|
||||||
: 0,
|
: 0,
|
||||||
RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
|
RecentActivity = tasks.OrderByDescending(t => t.LastActivity)
|
||||||
.Take(5)
|
.Take(5)
|
||||||
.Select(t => new RecentActivity
|
.Select(t => new RecentActivity
|
||||||
{
|
{
|
||||||
TaskId = t.TaskId,
|
TaskId = t.TaskId,
|
||||||
FileName = t.FileName,
|
FileName = t.FileName,
|
||||||
Status = (UploadTaskStatus)t.Status,
|
Status = (UploadTaskStatus)t.Status,
|
||||||
LastActivity = t.LastActivity,
|
LastActivity = t.LastActivity,
|
||||||
Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0
|
Progress = t.ChunksCount > 0 ? (double)t.ChunksUploaded / t.ChunksCount * 100 : 0
|
||||||
})
|
})
|
||||||
.ToList()
|
.ToList()
|
||||||
};
|
};
|
||||||
|
|
||||||
return stats;
|
return stats;
|
||||||
@@ -800,7 +855,7 @@ public class PersistentTaskService(
|
|||||||
var failedTasks = await db.Tasks
|
var failedTasks = await db.Tasks
|
||||||
.OfType<PersistentUploadTask>()
|
.OfType<PersistentUploadTask>()
|
||||||
.Where(t => t.AccountId == accountId &&
|
.Where(t => t.AccountId == accountId &&
|
||||||
(t.Status == Model.TaskStatus.Failed || t.Status == Model.TaskStatus.Expired))
|
(t.Status == TaskStatus.Failed || t.Status == TaskStatus.Expired))
|
||||||
.ToListAsync();
|
.ToListAsync();
|
||||||
|
|
||||||
foreach (var task in failedTasks)
|
foreach (var task in failedTasks)
|
||||||
@@ -809,16 +864,14 @@ public class PersistentTaskService(
|
|||||||
|
|
||||||
// Clean up temp files
|
// Clean up temp files
|
||||||
var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId);
|
var taskPath = Path.Combine(Path.GetTempPath(), "multipart-uploads", task.TaskId);
|
||||||
if (Directory.Exists(taskPath))
|
if (!Directory.Exists(taskPath)) continue;
|
||||||
|
try
|
||||||
{
|
{
|
||||||
try
|
Directory.Delete(taskPath, true);
|
||||||
{
|
}
|
||||||
Directory.Delete(taskPath, true);
|
catch (Exception ex)
|
||||||
}
|
{
|
||||||
catch (Exception ex)
|
logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
|
||||||
{
|
|
||||||
logger.LogWarning(ex, "Failed to cleanup temp files for task {TaskId}", task.TaskId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -854,14 +907,14 @@ public class PersistentTaskService(
|
|||||||
FileId = fileId,
|
FileId = fileId,
|
||||||
FileName = task.FileName,
|
FileName = task.FileName,
|
||||||
FileSize = task.FileSize,
|
FileSize = task.FileSize,
|
||||||
CompletedAt = SystemClock.Instance.GetCurrentInstant().ToString("%O", null)
|
CompletedAt = SystemClock.Instance.GetCurrentInstant().ToString()
|
||||||
};
|
};
|
||||||
|
|
||||||
// Send WebSocket notification
|
// Send WebSocket notification
|
||||||
var wsPacket = new WebSocketPacket
|
var wsPacket = new WebSocketPacket
|
||||||
{
|
{
|
||||||
Type = "upload.completed",
|
Type = "upload.completed",
|
||||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(completionData))
|
Data = GrpcTypeHelper.ConvertObjectToByteString(completionData)
|
||||||
};
|
};
|
||||||
|
|
||||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||||
@@ -873,7 +926,7 @@ public class PersistentTaskService(
|
|||||||
// Send push notification
|
// Send push notification
|
||||||
var pushNotification = new PushNotification
|
var pushNotification = new PushNotification
|
||||||
{
|
{
|
||||||
Topic = "upload",
|
Topic = "drive.tasks.upload",
|
||||||
Title = "Upload Completed",
|
Title = "Upload Completed",
|
||||||
Subtitle = task.FileName,
|
Subtitle = task.FileName,
|
||||||
Body = $"Your file '{task.FileName}' has been uploaded successfully.",
|
Body = $"Your file '{task.FileName}' has been uploaded successfully.",
|
||||||
@@ -904,7 +957,7 @@ public class PersistentTaskService(
|
|||||||
TaskId = task.TaskId,
|
TaskId = task.TaskId,
|
||||||
FileName = task.FileName,
|
FileName = task.FileName,
|
||||||
FileSize = task.FileSize,
|
FileSize = task.FileSize,
|
||||||
FailedAt = SystemClock.Instance.GetCurrentInstant().ToString("%O", null),
|
FailedAt = SystemClock.Instance.GetCurrentInstant().ToString(),
|
||||||
ErrorMessage = errorMessage ?? "Upload failed due to an unknown error"
|
ErrorMessage = errorMessage ?? "Upload failed due to an unknown error"
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -912,7 +965,7 @@ public class PersistentTaskService(
|
|||||||
var wsPacket = new WebSocketPacket
|
var wsPacket = new WebSocketPacket
|
||||||
{
|
{
|
||||||
Type = "upload.failed",
|
Type = "upload.failed",
|
||||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(failureData))
|
Data = GrpcTypeHelper.ConvertObjectToByteString(failureData)
|
||||||
};
|
};
|
||||||
|
|
||||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||||
@@ -924,7 +977,7 @@ public class PersistentTaskService(
|
|||||||
// Send push notification
|
// Send push notification
|
||||||
var pushNotification = new PushNotification
|
var pushNotification = new PushNotification
|
||||||
{
|
{
|
||||||
Topic = "upload",
|
Topic = "drive.tasks.upload",
|
||||||
Title = "Upload Failed",
|
Title = "Upload Failed",
|
||||||
Subtitle = task.FileName,
|
Subtitle = task.FileName,
|
||||||
Body = $"Your file '{task.FileName}' upload has failed. You can try again.",
|
Body = $"Your file '{task.FileName}' upload has failed. You can try again.",
|
||||||
@@ -946,7 +999,8 @@ public class PersistentTaskService(
|
|||||||
/// <summary>
|
/// <summary>
|
||||||
/// Sends real-time upload progress update via WebSocket
|
/// Sends real-time upload progress update via WebSocket
|
||||||
/// </summary>
|
/// </summary>
|
||||||
private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress, double previousProgress)
|
private async Task SendUploadProgressUpdateAsync(PersistentUploadTask task, double newProgress,
|
||||||
|
double previousProgress)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -963,13 +1017,13 @@ public class PersistentTaskService(
|
|||||||
ChunksTotal = task.ChunksCount,
|
ChunksTotal = task.ChunksCount,
|
||||||
Progress = newProgress,
|
Progress = newProgress,
|
||||||
Status = task.Status.ToString(),
|
Status = task.Status.ToString(),
|
||||||
LastActivity = task.LastActivity.ToString("%O", null)
|
LastActivity = task.LastActivity.ToString()
|
||||||
};
|
};
|
||||||
|
|
||||||
var packet = new WebSocketPacket
|
var packet = new WebSocketPacket
|
||||||
{
|
{
|
||||||
Type = "upload.progress",
|
Type = "upload.progress",
|
||||||
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(progressData))
|
Data = GrpcTypeHelper.ConvertObjectToByteString(progressData)
|
||||||
};
|
};
|
||||||
|
|
||||||
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
await ringService.PushWebSocketPacketAsync(new PushWebSocketPacketRequest
|
||||||
|
|||||||
@@ -1,33 +1,35 @@
|
|||||||
# DysonNetwork Drive - Persistent/Resumable Upload System
|
# DysonNetwork Drive - Persistent Task System
|
||||||
|
|
||||||
A comprehensive, production-ready file upload system with resumable uploads, real-time progress tracking, and dynamic notifications powered by RingService.
|
A comprehensive, production-ready generic task system with support for file uploads, background operations, real-time progress tracking, and dynamic notifications powered by RingService.
|
||||||
|
|
||||||
When using with the Gateway, use the `/drive` to replace `/api`.
|
When using with the Gateway, use the `/drive` to replace `/api`.
|
||||||
The realtime messages are from the websocket gateway.
|
The realtime messages are from the websocket gateway.
|
||||||
|
|
||||||
## 🚀 Features
|
## 🚀 Features
|
||||||
|
|
||||||
### Core Upload Features
|
### Core Task Features
|
||||||
|
- **Generic Task System**: Support for various background operations beyond file uploads
|
||||||
- **Resumable Uploads**: Pause and resume uploads across app restarts
|
- **Resumable Uploads**: Pause and resume uploads across app restarts
|
||||||
- **Chunked Uploads**: Efficient large file handling with configurable chunk sizes
|
- **Chunked Uploads**: Efficient large file handling with configurable chunk sizes
|
||||||
- **Progress Persistence**: Upload state survives server restarts and network interruptions
|
- **Progress Persistence**: Task state survives server restarts and network interruptions
|
||||||
- **Duplicate Detection**: Automatic detection of already uploaded files via hash checking
|
- **Duplicate Detection**: Automatic detection of already uploaded files via hash checking
|
||||||
- **Quota Management**: Integration with user quota and billing systems
|
- **Quota Management**: Integration with user quota and billing systems
|
||||||
- **Pool-based Storage**: Support for multiple storage pools with different policies
|
- **Pool-based Storage**: Support for multiple storage pools with different policies
|
||||||
|
|
||||||
### Real-Time Features
|
### Real-Time Features
|
||||||
- **Live Progress Updates**: WebSocket-based real-time progress tracking
|
- **Live Progress Updates**: WebSocket-based real-time progress tracking for all task types
|
||||||
- **Completion Notifications**: Instant notifications when uploads complete
|
- **Task Lifecycle Notifications**: Instant notifications for task creation, progress, completion, and failure
|
||||||
- **Failure Alerts**: Immediate notification of upload failures with error details
|
- **Failure Alerts**: Immediate notification of task failures with error details
|
||||||
- **Push Notifications**: Cross-platform push notifications for mobile/desktop
|
- **Push Notifications**: Cross-platform push notifications for mobile/desktop
|
||||||
- **Smart Throttling**: Optimized update frequency to prevent network spam
|
- **Smart Throttling**: Optimized update frequency to prevent network spam
|
||||||
|
|
||||||
### Management Features
|
### Management Features
|
||||||
- **Task Listing**: Comprehensive API for listing and filtering upload tasks
|
- **Task Listing**: Comprehensive API for listing and filtering all task types
|
||||||
- **Task Statistics**: Detailed analytics and usage statistics
|
- **Task Statistics**: Detailed analytics and usage statistics for all operations
|
||||||
- **Cleanup Operations**: Automatic and manual cleanup of failed/stale tasks
|
- **Cleanup Operations**: Automatic and manual cleanup of failed/stale tasks
|
||||||
- **Ownership Verification**: Secure access control for all operations
|
- **Ownership Verification**: Secure access control for all operations
|
||||||
- **Detailed Task Info**: Rich metadata including speed calculations and ETAs
|
- **Detailed Task Info**: Rich metadata including progress, parameters, and results
|
||||||
|
- **Task Lifecycle Management**: Full control over task states (pause, resume, cancel)
|
||||||
|
|
||||||
## 📋 Table of Contents
|
## 📋 Table of Contents
|
||||||
|
|
||||||
@@ -93,18 +95,29 @@ Creates a new resumable upload task.
|
|||||||
**Request Body:**
|
**Request Body:**
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"fileName": "string", // Required: Name of the file
|
"fileName": "string",
|
||||||
"fileSize": "long", // Required: Size in bytes
|
"fileSize": "long",
|
||||||
"contentType": "string", // Required: MIME type
|
"contentType": "string",
|
||||||
"poolId": "uuid", // Optional: Storage pool ID
|
"poolId": "uuid",
|
||||||
"bundleId": "uuid", // Optional: File bundle ID
|
"bundleId": "uuid",
|
||||||
"chunkSize": "long", // Optional: Chunk size (default: 5MB)
|
"chunkSize": "long",
|
||||||
"encryptPassword": "string", // Optional: Encryption password
|
"encryptPassword": "string",
|
||||||
"expiredAt": "datetime", // Optional: Expiration date
|
"expiredAt": "datetime",
|
||||||
"hash": "string" // Required: File hash for deduplication
|
"hash": "string"
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
**Field Descriptions:**
|
||||||
|
- `fileName`: Required - Name of the file
|
||||||
|
- `fileSize`: Required - Size in bytes
|
||||||
|
- `contentType`: Required - MIME type
|
||||||
|
- `poolId`: Optional - Storage pool ID
|
||||||
|
- `bundleId`: Optional - File bundle ID
|
||||||
|
- `chunkSize`: Optional - Chunk size (default: 5MB)
|
||||||
|
- `encryptPassword`: Optional - Encryption password
|
||||||
|
- `expiredAt`: Optional - Expiration date
|
||||||
|
- `hash`: Required - File hash for deduplication
|
||||||
|
|
||||||
**Response:**
|
**Response:**
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
@@ -175,7 +188,7 @@ Gets upload statistics for the current user.
|
|||||||
"expiredTasks": 1,
|
"expiredTasks": 1,
|
||||||
"totalUploadedBytes": 5368709120,
|
"totalUploadedBytes": 5368709120,
|
||||||
"averageProgress": 67.5,
|
"averageProgress": 67.5,
|
||||||
"recentActivity": [...]
|
"recentActivity": []
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -187,56 +200,73 @@ Gets the most recent upload tasks.
|
|||||||
|
|
||||||
## 🔌 WebSocket Events
|
## 🔌 WebSocket Events
|
||||||
|
|
||||||
The system sends real-time updates via WebSocket using RingService. Connect to the WebSocket endpoint and listen for upload-related events.
|
The system sends real-time updates via WebSocket using RingService. Connect to the WebSocket endpoint and listen for task-related events.
|
||||||
|
|
||||||
### Event Types
|
### Event Types
|
||||||
|
|
||||||
#### `upload.progress`
|
#### `task.created`
|
||||||
Sent when upload progress changes significantly (every 5% or major milestones).
|
Sent when a new task is created.
|
||||||
|
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "upload.progress",
|
"type": "task.created",
|
||||||
"data": {
|
"data": {
|
||||||
"taskId": "abc123def456",
|
"taskId": "task123",
|
||||||
"fileName": "document.pdf",
|
"name": "Upload File",
|
||||||
"fileSize": 10485760,
|
"type": "FileUpload",
|
||||||
"chunksUploaded": 5,
|
"createdAt": "2025-11-09T02:00:00Z"
|
||||||
"chunksTotal": 10,
|
}
|
||||||
"progress": 50.0,
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### `task.progress`
|
||||||
|
Sent when task progress changes significantly (every 5% or major milestones).
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"type": "task.progress",
|
||||||
|
"data": {
|
||||||
|
"taskId": "task123",
|
||||||
|
"name": "Upload File",
|
||||||
|
"type": "FileUpload",
|
||||||
|
"progress": 67.5,
|
||||||
"status": "InProgress",
|
"status": "InProgress",
|
||||||
"lastActivity": "2025-11-09T01:56:00.0000000Z"
|
"lastActivity": "2025-11-09T02:05:00Z"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
#### `upload.completed`
|
#### `task.completed`
|
||||||
Sent when an upload completes successfully.
|
Sent when a task completes successfully.
|
||||||
|
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "upload.completed",
|
"type": "task.completed",
|
||||||
"data": {
|
"data": {
|
||||||
"taskId": "abc123def456",
|
"taskId": "task123",
|
||||||
"fileId": "file789xyz",
|
"name": "Upload File",
|
||||||
"fileName": "document.pdf",
|
"type": "FileUpload",
|
||||||
"fileSize": 10485760,
|
"completedAt": "2025-11-09T02:10:00Z",
|
||||||
"completedAt": "2025-11-09T01:57:00.0000000Z"
|
"results": {
|
||||||
|
"fileId": "file456",
|
||||||
|
"fileName": "document.pdf",
|
||||||
|
"fileSize": 10485760
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
#### `upload.failed`
|
#### `task.failed`
|
||||||
Sent when an upload fails.
|
Sent when a task fails.
|
||||||
|
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "upload.failed",
|
"type": "task.failed",
|
||||||
"data": {
|
"data": {
|
||||||
"taskId": "abc123def456",
|
"taskId": "task123",
|
||||||
"fileName": "document.pdf",
|
"name": "Upload File",
|
||||||
"fileSize": 10485760,
|
"type": "FileUpload",
|
||||||
"failedAt": "2025-11-09T01:58:00.0000000Z",
|
"failedAt": "2025-11-09T02:15:00Z",
|
||||||
"errorMessage": "File processing failed: invalid format"
|
"errorMessage": "File processing failed: invalid format"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -256,18 +286,18 @@ ws.onopen = () => {
|
|||||||
}));
|
}));
|
||||||
};
|
};
|
||||||
|
|
||||||
// Handle upload events
|
// Handle task events
|
||||||
ws.onmessage = (event) => {
|
ws.onmessage = (event) => {
|
||||||
const packet = JSON.parse(event.data);
|
const packet = JSON.parse(event.data);
|
||||||
|
|
||||||
switch (packet.type) {
|
switch (packet.type) {
|
||||||
case 'upload.progress':
|
case 'task.progress':
|
||||||
updateProgressBar(packet.data);
|
updateProgressBar(packet.data);
|
||||||
break;
|
break;
|
||||||
case 'upload.completed':
|
case 'task.completed':
|
||||||
showSuccessNotification(packet.data);
|
showSuccessNotification(packet.data);
|
||||||
break;
|
break;
|
||||||
case 'upload.failed':
|
case 'task.failed':
|
||||||
showErrorNotification(packet.data);
|
showErrorNotification(packet.data);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -282,6 +312,10 @@ function updateProgressBar(data) {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Note on Upload-Specific Notifications
|
||||||
|
|
||||||
|
The system also includes upload-specific notifications (`upload.progress`, `upload.completed`, `upload.failed`) for backward compatibility. However, for new implementations, it's recommended to use the generic task notifications as they provide the same functionality with less object allocation overhead. Since users are typically in the foreground during upload operations, the generic task notifications provide sufficient progress visibility.
|
||||||
|
|
||||||
## 🗄️ Database Schema
|
## 🗄️ Database Schema
|
||||||
|
|
||||||
### `upload_tasks` Table
|
### `upload_tasks` Table
|
||||||
@@ -348,7 +382,7 @@ UPLOAD_CACHE_DURATION_MINUTES=30
|
|||||||
|
|
||||||
```csharp
|
```csharp
|
||||||
// In Program.cs or Startup.cs
|
// In Program.cs or Startup.cs
|
||||||
builder.Services.AddScoped<PersistentUploadService>();
|
builder.Services.AddScoped<PersistentTaskService>();
|
||||||
builder.Services.AddSingleton<RingService.RingServiceClient>(sp => {
|
builder.Services.AddSingleton<RingService.RingServiceClient>(sp => {
|
||||||
// Configure gRPC client for RingService
|
// Configure gRPC client for RingService
|
||||||
var channel = GrpcChannel.ForAddress("https://ring-service:50051");
|
var channel = GrpcChannel.ForAddress("https://ring-service:50051");
|
||||||
@@ -754,7 +788,7 @@ public class PersistentTaskService(
|
|||||||
|
|
||||||
### Real-Time Task Notifications
|
### Real-Time Task Notifications
|
||||||
|
|
||||||
All task operations send WebSocket notifications via RingService:
|
All task operations send WebSocket notifications via RingService using the shared `GrpcTypeHelper` for consistent JSON serialization:
|
||||||
|
|
||||||
#### Task Created
|
#### Task Created
|
||||||
```json
|
```json
|
||||||
@@ -867,6 +901,36 @@ Tasks support multiple statuses:
|
|||||||
- **Cancelled**: Manually cancelled
|
- **Cancelled**: Manually cancelled
|
||||||
- **Expired**: Timed out or expired
|
- **Expired**: Timed out or expired
|
||||||
|
|
||||||
|
### Available Service Methods
|
||||||
|
|
||||||
|
Based on the `PersistentTaskService` implementation, the following methods are available:
|
||||||
|
|
||||||
|
#### Core Task Operations
|
||||||
|
- `CreateTaskAsync<T>(T task)`: Creates any type of persistent task
|
||||||
|
- `GetTaskAsync<T>(string taskId)`: Retrieves a task by ID with caching
|
||||||
|
- `UpdateTaskProgressAsync(string taskId, double progress, string? statusMessage)`: Updates task progress with automatic notifications
|
||||||
|
- `MarkTaskCompletedAsync(string taskId, Dictionary<string, object?>? results)`: Marks task as completed with optional results
|
||||||
|
- `MarkTaskFailedAsync(string taskId, string? errorMessage)`: Marks task as failed with error message
|
||||||
|
- `PauseTaskAsync(string taskId)`: Pauses an in-progress task
|
||||||
|
- `ResumeTaskAsync(string taskId)`: Resumes a paused task
|
||||||
|
- `CancelTaskAsync(string taskId)`: Cancels a task
|
||||||
|
|
||||||
|
#### Task Querying & Statistics
|
||||||
|
- `GetUserTasksAsync()`: Gets tasks for a user with filtering and pagination
|
||||||
|
- `GetUserTaskStatsAsync(Guid accountId)`: Gets comprehensive task statistics
|
||||||
|
- `CleanupOldTasksAsync(Guid accountId, Duration maxAge)`: Cleans up old completed/failed tasks
|
||||||
|
|
||||||
|
#### Upload-Specific Operations
|
||||||
|
- `CreateUploadTaskAsync()`: Creates a new persistent upload task
|
||||||
|
- `GetUploadTaskAsync(string taskId)`: Gets an existing upload task
|
||||||
|
- `UpdateChunkProgressAsync(string taskId, int chunkIndex)`: Updates chunk upload progress
|
||||||
|
- `IsChunkUploadedAsync(string taskId, int chunkIndex)`: Checks if a chunk has been uploaded
|
||||||
|
- `GetUploadProgressAsync(string taskId)`: Gets upload progress as percentage
|
||||||
|
- `GetUserUploadTasksAsync()`: Gets user upload tasks with filtering
|
||||||
|
- `GetUserUploadStatsAsync(Guid accountId)`: Gets upload statistics for a user
|
||||||
|
- `CleanupUserFailedTasksAsync(Guid accountId)`: Cleans up failed upload tasks
|
||||||
|
- `GetRecentUserTasksAsync(Guid accountId, int limit)`: Gets recent upload tasks
|
||||||
|
|
||||||
### Priority System
|
### Priority System
|
||||||
|
|
||||||
Tasks can be assigned priorities (0-100, higher = more important) to control execution order in background processing.
|
Tasks can be assigned priorities (0-100, higher = more important) to control execution order in background processing.
|
||||||
|
|||||||
Reference in New Issue
Block a user