🐛 Dozens of bug fixes to new task system

This commit is contained in:
2025-11-10 00:14:41 +08:00
parent db98fa240e
commit 2bfb50cc71
4 changed files with 192 additions and 76 deletions

View File

@@ -1,5 +1,7 @@
using System.Text.Json;
using DysonNetwork.Drive.Storage;
using DysonNetwork.Drive.Storage.Model;
using DysonNetwork.Shared.Models;
using DysonNetwork.Shared.Proto;
using DysonNetwork.Shared.Stream;
using FFMpegCore;
@@ -142,6 +144,7 @@ public class BroadcastEventHandler(
using var scope = serviceProvider.CreateScope();
var fs = scope.ServiceProvider.GetRequiredService<FileService>();
var scopedDb = scope.ServiceProvider.GetRequiredService<AppDatabase>();
var persistentTaskService = scope.ServiceProvider.GetRequiredService<PersistentTaskService>();
var pool = await fs.GetPoolAsync(remoteId);
if (pool is null) return;
@@ -155,6 +158,11 @@ public class BroadcastEventHandler(
var fileToUpdate = await scopedDb.Files.AsNoTracking().FirstAsync(f => f.Id == fileId);
// Find the upload task associated with this file
var uploadTask = await scopedDb.Tasks
.OfType<PersistentUploadTask>()
.FirstOrDefaultAsync(t => t.FileName == fileToUpdate.Name && t.FileSize == fileToUpdate.Size);
if (fileToUpdate.IsEncrypted)
{
uploads.Add((processingFilePath, string.Empty, contentType, false));
@@ -293,5 +301,52 @@ public class BroadcastEventHandler(
}
await fs._PurgeCacheAsync(fileId);
// Complete the upload task if found
if (uploadTask != null)
{
await persistentTaskService.MarkTaskCompletedAsync(uploadTask.TaskId, new Dictionary<string, object?>
{
{ "fileId", fileId },
{ "fileName", fileToUpdate.Name },
{ "fileSize", fileToUpdate.Size },
{ "mimeType", newMimeType },
{ "hasCompression", hasCompression },
{ "hasThumbnail", hasThumbnail }
});
// Send push notification for large files (>5MB) that took longer to process
if (fileToUpdate.Size > 5 * 1024 * 1024) // 5MB threshold
{
await SendLargeFileProcessingCompleteNotificationAsync(uploadTask, fileToUpdate);
}
}
}
private async Task SendLargeFileProcessingCompleteNotificationAsync(PersistentUploadTask task, SnCloudFile file)
{
try
{
var ringService = serviceProvider.GetRequiredService<RingService.RingServiceClient>();
var pushNotification = new PushNotification
{
Topic = "drive.tasks.upload",
Title = "File Processing Complete",
Subtitle = file.Name,
Body = $"Your file '{file.Name}' has finished processing and is now available.",
IsSavable = true
};
await ringService.SendPushNotificationToUserAsync(new SendPushNotificationToUserRequest
{
UserId = task.AccountId.ToString(),
Notification = pushNotification
});
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to send large file processing notification for task {TaskId}", task.TaskId);
}
}
}

View File

@@ -293,10 +293,10 @@ public class FileUploadController(
persistentTask.ExpiredAt
);
// Mark task as completed
await persistentTaskService.MarkTaskCompletedAsync(taskId);
// Update task status to "processing" - background processing is now happening
await persistentTaskService.UpdateTaskProgressAsync(taskId, 95, "Processing file in background...");
// Send completion notification
// Send upload completion notification (file is uploaded, but processing continues)
await persistentTaskService.SendUploadCompletedNotificationAsync(persistentTask, fileId);
return Ok(cloudFile);

View File

@@ -26,7 +26,7 @@ public class PersistentTaskService(
/// </summary>
public async Task<T> CreateTaskAsync<T>(T task) where T : PersistentTask
{
task.TaskId = NanoidDotNet.Nanoid.Generate();
task.TaskId = await Nanoid.GenerateAsync();
var now = SystemClock.Instance.GetCurrentInstant();
task.CreatedAt = now;
task.UpdatedAt = now;
@@ -45,7 +45,7 @@ public class PersistentTaskService(
/// <summary>
/// Gets a task by ID
/// </summary>
public async Task<T?> GetTaskAsync<T>(string taskId) where T : PersistentTask
private async Task<T?> GetTaskAsync<T>(string taskId) where T : PersistentTask
{
var cacheKey = $"{CacheKeyPrefix}{taskId}";
var cachedTask = await cache.GetAsync<T>(cacheKey);
@@ -55,13 +55,10 @@ public class PersistentTaskService(
var task = await db.Tasks
.FirstOrDefaultAsync(t => t.TaskId == taskId);
if (task is T typedTask)
{
await SetCacheAsync(typedTask);
return typedTask;
}
if (task is not T typedTask) return null;
await SetCacheAsync(typedTask);
return typedTask;
return null;
}
/// <summary>
@@ -344,7 +341,7 @@ public class PersistentTaskService(
TaskId = task.TaskId,
Name = task.Name,
Type = task.Type.ToString(),
CreatedAt = task.CreatedAt.ToString("%O", null)
CreatedAt = task.CreatedAt.ToString()
};
var packet = new WebSocketPacket
@@ -380,7 +377,7 @@ public class PersistentTaskService(
Type = task.Type.ToString(),
Progress = newProgress,
Status = task.Status.ToString(),
LastActivity = task.LastActivity.ToString("%O", null)
LastActivity = task.LastActivity.ToString()
};
var packet = new WebSocketPacket
@@ -410,7 +407,7 @@ public class PersistentTaskService(
TaskId = task.TaskId,
Name = task.Name,
Type = task.Type.ToString(),
CompletedAt = task.CompletedAt?.ToString("%O", null) ?? task.UpdatedAt.ToString("%O", null),
CompletedAt = task.CompletedAt?.ToString() ?? task.UpdatedAt.ToString(),
Results = task.Results
};
@@ -430,7 +427,7 @@ public class PersistentTaskService(
// Push notification
var pushNotification = new PushNotification
{
Topic = "task",
Topic = "drive.tasks",
Title = "Task Completed",
Subtitle = task.Name,
Body = $"Your {task.Type.ToString().ToLower()} task has completed successfully.",
@@ -458,7 +455,7 @@ public class PersistentTaskService(
TaskId = task.TaskId,
Name = task.Name,
Type = task.Type.ToString(),
FailedAt = task.UpdatedAt.ToString("%O", null),
FailedAt = task.UpdatedAt.ToString(),
ErrorMessage = task.ErrorMessage ?? "Task failed due to an unknown error"
};
@@ -478,7 +475,7 @@ public class PersistentTaskService(
// Push notification
var pushNotification = new PushNotification
{
Topic = "task",
Topic = "drive.tasks",
Title = "Task Failed",
Subtitle = task.Name,
Body = $"Your {task.Type.ToString().ToLower()} task has failed.",
@@ -727,17 +724,17 @@ public class PersistentTaskService(
? query.OrderByDescending(t => t.FileSize)
: query.OrderBy(t => t.FileSize);
break;
case "createdat":
case "created":
orderedQuery = sortDescending
? query.OrderByDescending(t => t.CreatedAt)
: query.OrderBy(t => t.CreatedAt);
break;
case "updatedat":
case "updated":
orderedQuery = sortDescending
? query.OrderByDescending(t => t.UpdatedAt)
: query.OrderBy(t => t.UpdatedAt);
break;
case "lastactivity":
case "activity":
default:
orderedQuery = sortDescending
? query.OrderByDescending(t => t.LastActivity)
@@ -854,7 +851,7 @@ public class PersistentTaskService(
FileId = fileId,
FileName = task.FileName,
FileSize = task.FileSize,
CompletedAt = SystemClock.Instance.GetCurrentInstant().ToString("%O", null)
CompletedAt = SystemClock.Instance.GetCurrentInstant().ToString()
};
// Send WebSocket notification
@@ -873,7 +870,7 @@ public class PersistentTaskService(
// Send push notification
var pushNotification = new PushNotification
{
Topic = "upload",
Topic = "drive.tasks.upload",
Title = "Upload Completed",
Subtitle = task.FileName,
Body = $"Your file '{task.FileName}' has been uploaded successfully.",
@@ -904,7 +901,7 @@ public class PersistentTaskService(
TaskId = task.TaskId,
FileName = task.FileName,
FileSize = task.FileSize,
FailedAt = SystemClock.Instance.GetCurrentInstant().ToString("%O", null),
FailedAt = SystemClock.Instance.GetCurrentInstant().ToString(),
ErrorMessage = errorMessage ?? "Upload failed due to an unknown error"
};
@@ -924,7 +921,7 @@ public class PersistentTaskService(
// Send push notification
var pushNotification = new PushNotification
{
Topic = "upload",
Topic = "drive.tasks.upload",
Title = "Upload Failed",
Subtitle = task.FileName,
Body = $"Your file '{task.FileName}' upload has failed. You can try again.",
@@ -963,7 +960,7 @@ public class PersistentTaskService(
ChunksTotal = task.ChunksCount,
Progress = newProgress,
Status = task.Status.ToString(),
LastActivity = task.LastActivity.ToString("%O", null)
LastActivity = task.LastActivity.ToString()
};
var packet = new WebSocketPacket

View File

@@ -1,33 +1,35 @@
# DysonNetwork Drive - Persistent/Resumable Upload System
# DysonNetwork Drive - Persistent Task System
A comprehensive, production-ready file upload system with resumable uploads, real-time progress tracking, and dynamic notifications powered by RingService.
A comprehensive, production-ready generic task system with support for file uploads, background operations, real-time progress tracking, and dynamic notifications powered by RingService.
When using with the Gateway, use the `/drive` to replace `/api`.
The realtime messages are from the websocket gateway.
## 🚀 Features
### Core Upload Features
### Core Task Features
- **Generic Task System**: Support for various background operations beyond file uploads
- **Resumable Uploads**: Pause and resume uploads across app restarts
- **Chunked Uploads**: Efficient large file handling with configurable chunk sizes
- **Progress Persistence**: Upload state survives server restarts and network interruptions
- **Progress Persistence**: Task state survives server restarts and network interruptions
- **Duplicate Detection**: Automatic detection of already uploaded files via hash checking
- **Quota Management**: Integration with user quota and billing systems
- **Pool-based Storage**: Support for multiple storage pools with different policies
### Real-Time Features
- **Live Progress Updates**: WebSocket-based real-time progress tracking
- **Completion Notifications**: Instant notifications when uploads complete
- **Failure Alerts**: Immediate notification of upload failures with error details
- **Live Progress Updates**: WebSocket-based real-time progress tracking for all task types
- **Task Lifecycle Notifications**: Instant notifications for task creation, progress, completion, and failure
- **Failure Alerts**: Immediate notification of task failures with error details
- **Push Notifications**: Cross-platform push notifications for mobile/desktop
- **Smart Throttling**: Optimized update frequency to prevent network spam
### Management Features
- **Task Listing**: Comprehensive API for listing and filtering upload tasks
- **Task Statistics**: Detailed analytics and usage statistics
- **Task Listing**: Comprehensive API for listing and filtering all task types
- **Task Statistics**: Detailed analytics and usage statistics for all operations
- **Cleanup Operations**: Automatic and manual cleanup of failed/stale tasks
- **Ownership Verification**: Secure access control for all operations
- **Detailed Task Info**: Rich metadata including speed calculations and ETAs
- **Detailed Task Info**: Rich metadata including progress, parameters, and results
- **Task Lifecycle Management**: Full control over task states (pause, resume, cancel)
## 📋 Table of Contents
@@ -93,18 +95,29 @@ Creates a new resumable upload task.
**Request Body:**
```json
{
"fileName": "string", // Required: Name of the file
"fileSize": "long", // Required: Size in bytes
"contentType": "string", // Required: MIME type
"poolId": "uuid", // Optional: Storage pool ID
"bundleId": "uuid", // Optional: File bundle ID
"chunkSize": "long", // Optional: Chunk size (default: 5MB)
"encryptPassword": "string", // Optional: Encryption password
"expiredAt": "datetime", // Optional: Expiration date
"hash": "string" // Required: File hash for deduplication
"fileName": "string",
"fileSize": "long",
"contentType": "string",
"poolId": "uuid",
"bundleId": "uuid",
"chunkSize": "long",
"encryptPassword": "string",
"expiredAt": "datetime",
"hash": "string"
}
```
**Field Descriptions:**
- `fileName`: Required - Name of the file
- `fileSize`: Required - Size in bytes
- `contentType`: Required - MIME type
- `poolId`: Optional - Storage pool ID
- `bundleId`: Optional - File bundle ID
- `chunkSize`: Optional - Chunk size (default: 5MB)
- `encryptPassword`: Optional - Encryption password
- `expiredAt`: Optional - Expiration date
- `hash`: Required - File hash for deduplication
**Response:**
```json
{
@@ -175,7 +188,7 @@ Gets upload statistics for the current user.
"expiredTasks": 1,
"totalUploadedBytes": 5368709120,
"averageProgress": 67.5,
"recentActivity": [...]
"recentActivity": []
}
```
@@ -187,56 +200,73 @@ Gets the most recent upload tasks.
## 🔌 WebSocket Events
The system sends real-time updates via WebSocket using RingService. Connect to the WebSocket endpoint and listen for upload-related events.
The system sends real-time updates via WebSocket using RingService. Connect to the WebSocket endpoint and listen for task-related events.
### Event Types
#### `upload.progress`
Sent when upload progress changes significantly (every 5% or major milestones).
#### `task.created`
Sent when a new task is created.
```json
{
"type": "upload.progress",
"type": "task.created",
"data": {
"taskId": "abc123def456",
"fileName": "document.pdf",
"fileSize": 10485760,
"chunksUploaded": 5,
"chunksTotal": 10,
"progress": 50.0,
"taskId": "task123",
"name": "Upload File",
"type": "FileUpload",
"createdAt": "2025-11-09T02:00:00Z"
}
}
```
#### `task.progress`
Sent when task progress changes significantly (every 5% or major milestones).
```json
{
"type": "task.progress",
"data": {
"taskId": "task123",
"name": "Upload File",
"type": "FileUpload",
"progress": 67.5,
"status": "InProgress",
"lastActivity": "2025-11-09T01:56:00.0000000Z"
"lastActivity": "2025-11-09T02:05:00Z"
}
}
```
#### `upload.completed`
Sent when an upload completes successfully.
#### `task.completed`
Sent when a task completes successfully.
```json
{
"type": "upload.completed",
"type": "task.completed",
"data": {
"taskId": "abc123def456",
"fileId": "file789xyz",
"fileName": "document.pdf",
"fileSize": 10485760,
"completedAt": "2025-11-09T01:57:00.0000000Z"
"taskId": "task123",
"name": "Upload File",
"type": "FileUpload",
"completedAt": "2025-11-09T02:10:00Z",
"results": {
"fileId": "file456",
"fileName": "document.pdf",
"fileSize": 10485760
}
}
}
```
#### `upload.failed`
Sent when an upload fails.
#### `task.failed`
Sent when a task fails.
```json
{
"type": "upload.failed",
"type": "task.failed",
"data": {
"taskId": "abc123def456",
"fileName": "document.pdf",
"fileSize": 10485760,
"failedAt": "2025-11-09T01:58:00.0000000Z",
"taskId": "task123",
"name": "Upload File",
"type": "FileUpload",
"failedAt": "2025-11-09T02:15:00Z",
"errorMessage": "File processing failed: invalid format"
}
}
@@ -256,18 +286,18 @@ ws.onopen = () => {
}));
};
// Handle upload events
// Handle task events
ws.onmessage = (event) => {
const packet = JSON.parse(event.data);
switch (packet.type) {
case 'upload.progress':
case 'task.progress':
updateProgressBar(packet.data);
break;
case 'upload.completed':
case 'task.completed':
showSuccessNotification(packet.data);
break;
case 'upload.failed':
case 'task.failed':
showErrorNotification(packet.data);
break;
}
@@ -282,6 +312,10 @@ function updateProgressBar(data) {
}
```
### Note on Upload-Specific Notifications
The system also includes upload-specific notifications (`upload.progress`, `upload.completed`, `upload.failed`) for backward compatibility. However, for new implementations, it's recommended to use the generic task notifications as they provide the same functionality with less object allocation overhead. Since users are typically in the foreground during upload operations, the generic task notifications provide sufficient progress visibility.
## 🗄️ Database Schema
### `upload_tasks` Table
@@ -348,7 +382,7 @@ UPLOAD_CACHE_DURATION_MINUTES=30
```csharp
// In Program.cs or Startup.cs
builder.Services.AddScoped<PersistentUploadService>();
builder.Services.AddScoped<PersistentTaskService>();
builder.Services.AddSingleton<RingService.RingServiceClient>(sp => {
// Configure gRPC client for RingService
var channel = GrpcChannel.ForAddress("https://ring-service:50051");
@@ -867,6 +901,36 @@ Tasks support multiple statuses:
- **Cancelled**: Manually cancelled
- **Expired**: Timed out or expired
### Available Service Methods
Based on the `PersistentTaskService` implementation, the following methods are available:
#### Core Task Operations
- `CreateTaskAsync<T>(T task)`: Creates any type of persistent task
- `GetTaskAsync<T>(string taskId)`: Retrieves a task by ID with caching
- `UpdateTaskProgressAsync(string taskId, double progress, string? statusMessage)`: Updates task progress with automatic notifications
- `MarkTaskCompletedAsync(string taskId, Dictionary<string, object?>? results)`: Marks task as completed with optional results
- `MarkTaskFailedAsync(string taskId, string? errorMessage)`: Marks task as failed with error message
- `PauseTaskAsync(string taskId)`: Pauses an in-progress task
- `ResumeTaskAsync(string taskId)`: Resumes a paused task
- `CancelTaskAsync(string taskId)`: Cancels a task
#### Task Querying & Statistics
- `GetUserTasksAsync()`: Gets tasks for a user with filtering and pagination
- `GetUserTaskStatsAsync(Guid accountId)`: Gets comprehensive task statistics
- `CleanupOldTasksAsync(Guid accountId, Duration maxAge)`: Cleans up old completed/failed tasks
#### Upload-Specific Operations
- `CreateUploadTaskAsync()`: Creates a new persistent upload task
- `GetUploadTaskAsync(string taskId)`: Gets an existing upload task
- `UpdateChunkProgressAsync(string taskId, int chunkIndex)`: Updates chunk upload progress
- `IsChunkUploadedAsync(string taskId, int chunkIndex)`: Checks if a chunk has been uploaded
- `GetUploadProgressAsync(string taskId)`: Gets upload progress as percentage
- `GetUserUploadTasksAsync()`: Gets user upload tasks with filtering
- `GetUserUploadStatsAsync(Guid accountId)`: Gets upload statistics for a user
- `CleanupUserFailedTasksAsync(Guid accountId)`: Cleans up failed upload tasks
- `GetRecentUserTasksAsync(Guid accountId, int limit)`: Gets recent upload tasks
### Priority System
Tasks can be assigned priorities (0-100, higher = more important) to control execution order in background processing.