♻️ Fragment based multipart upload API
This commit is contained in:
@ -238,22 +238,24 @@ func AnalyzeAttachment(file models.Attachment) error {
|
||||
|
||||
log.Info().Dur("elapsed", time.Since(start)).Uint("id", file.ID).Msg("A file analyze task was finished, starting uploading...")
|
||||
|
||||
start = time.Now()
|
||||
|
||||
// Move temporary to permanent
|
||||
if !linked {
|
||||
if err := ReUploadFileToPermanent(file, 1); err != nil {
|
||||
return fmt.Errorf("unable to move file to permanet storage: %v", err)
|
||||
}
|
||||
go func() {
|
||||
start = time.Now()
|
||||
if err := ReUploadFileToPermanent(file, 1); err != nil {
|
||||
log.Warn().Any("file", file).Err(err).Msg("Unable to move file to permanet storage...")
|
||||
} else {
|
||||
// Recycle the temporary file
|
||||
file.Destination = models.AttachmentDstTemporary
|
||||
go DeleteFile(file)
|
||||
// Finish
|
||||
log.Info().Dur("elapsed", time.Since(start)).Uint("id", file.ID).Msg("A file post-analyze upload task was finished.")
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
log.Info().Uint("id", file.ID).Msg("File is linked to exists one, skipping uploading...")
|
||||
}
|
||||
|
||||
// Recycle the temporary file
|
||||
file.Destination = models.AttachmentDstTemporary
|
||||
go DeleteFile(file)
|
||||
|
||||
// Finish
|
||||
log.Info().Dur("elapsed", time.Since(start)).Uint("id", file.ID).Bool("linked", linked).Msg("A file post-analyze upload task was finished.")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -3,7 +3,6 @@ package services
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"mime"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
@ -15,8 +14,6 @@ import (
|
||||
"github.com/eko/gocache/lib/v4/cache"
|
||||
"github.com/eko/gocache/lib/v4/marshaler"
|
||||
"github.com/eko/gocache/lib/v4/store"
|
||||
"github.com/spf13/viper"
|
||||
"gorm.io/datatypes"
|
||||
|
||||
localCache "git.solsynth.dev/hypernet/paperclip/pkg/internal/cache"
|
||||
"git.solsynth.dev/hypernet/paperclip/pkg/internal/database"
|
||||
@ -54,7 +51,7 @@ func GetAttachmentByRID(rid string) (models.Attachment, error) {
|
||||
GetAttachmentCacheKey(rid),
|
||||
new(models.Attachment),
|
||||
); err == nil {
|
||||
return val.(models.Attachment), nil
|
||||
return *val.(*models.Attachment), nil
|
||||
}
|
||||
|
||||
var attachment models.Attachment
|
||||
@ -147,37 +144,6 @@ func NewAttachmentMetadata(tx *gorm.DB, user *sec.UserInfo, file *multipart.File
|
||||
return attachment, nil
|
||||
}
|
||||
|
||||
func NewAttachmentPlaceholder(tx *gorm.DB, user *sec.UserInfo, attachment models.Attachment) (models.Attachment, error) {
|
||||
attachment.Uuid = uuid.NewString()
|
||||
attachment.Rid = RandString(16)
|
||||
attachment.IsUploaded = false
|
||||
attachment.FileChunks = datatypes.JSONMap{}
|
||||
attachment.AccountID = user.ID
|
||||
|
||||
chunkSize := viper.GetInt64("performance.file_chunk_size")
|
||||
chunkCount := math.Ceil(float64(attachment.Size) / float64(chunkSize))
|
||||
for idx := 0; idx < int(chunkCount); idx++ {
|
||||
cid := RandString(8)
|
||||
attachment.FileChunks[cid] = idx
|
||||
}
|
||||
|
||||
// If the user didn't provide file mimetype manually, we have to detect it
|
||||
if len(attachment.MimeType) == 0 {
|
||||
if ext := filepath.Ext(attachment.Name); len(ext) > 0 {
|
||||
// Detect mimetype by file extensions
|
||||
attachment.MimeType = mime.TypeByExtension(ext)
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Save(&attachment).Error; err != nil {
|
||||
return attachment, fmt.Errorf("failed to save attachment record: %v", err)
|
||||
} else {
|
||||
CacheAttachment(attachment)
|
||||
}
|
||||
|
||||
return attachment, nil
|
||||
}
|
||||
|
||||
func TryLinkAttachment(tx *gorm.DB, og models.Attachment, hash string) (bool, error) {
|
||||
prev, err := GetAttachmentByHash(hash)
|
||||
if err != nil {
|
||||
|
153
pkg/internal/services/fragments.go
Normal file
153
pkg/internal/services/fragments.go
Normal file
@ -0,0 +1,153 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"mime"
|
||||
"mime/multipart"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/nex/sec"
|
||||
localCache "git.solsynth.dev/hypernet/paperclip/pkg/internal/cache"
|
||||
"git.solsynth.dev/hypernet/paperclip/pkg/internal/database"
|
||||
"git.solsynth.dev/hypernet/paperclip/pkg/internal/models"
|
||||
"github.com/eko/gocache/lib/v4/cache"
|
||||
"github.com/eko/gocache/lib/v4/marshaler"
|
||||
"github.com/eko/gocache/lib/v4/store"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/google/uuid"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/spf13/viper"
|
||||
"gorm.io/datatypes"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func GetAttachmentFragmentCacheKey(rid string) any {
|
||||
return fmt.Sprintf("attachment-fragment#%s", rid)
|
||||
}
|
||||
|
||||
func NewAttachmentFragment(tx *gorm.DB, user *sec.UserInfo, fragment models.AttachmentFragment) (models.AttachmentFragment, error) {
|
||||
if fragment.Fingerprint != nil {
|
||||
var existsFragment models.AttachmentFragment
|
||||
if err := database.C.Where(models.AttachmentFragment{
|
||||
Fingerprint: fragment.Fingerprint,
|
||||
AccountID: user.ID,
|
||||
}).First(&existsFragment).Error; err == nil {
|
||||
return existsFragment, nil
|
||||
}
|
||||
}
|
||||
|
||||
fragment.Uuid = uuid.NewString()
|
||||
fragment.Rid = RandString(16)
|
||||
fragment.FileChunks = datatypes.JSONMap{}
|
||||
fragment.AccountID = user.ID
|
||||
|
||||
chunkSize := viper.GetInt64("performance.file_chunk_size")
|
||||
chunkCount := math.Ceil(float64(fragment.Size) / float64(chunkSize))
|
||||
for idx := 0; idx < int(chunkCount); idx++ {
|
||||
cid := RandString(8)
|
||||
fragment.FileChunks[cid] = idx
|
||||
}
|
||||
|
||||
// If the user didn't provide file mimetype manually, we have to detect it
|
||||
if len(fragment.MimeType) == 0 {
|
||||
if ext := filepath.Ext(fragment.Name); len(ext) > 0 {
|
||||
// Detect mimetype by file extensions
|
||||
fragment.MimeType = mime.TypeByExtension(ext)
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Save(&fragment).Error; err != nil {
|
||||
return fragment, fmt.Errorf("failed to save attachment record: %v", err)
|
||||
}
|
||||
|
||||
return fragment, nil
|
||||
}
|
||||
|
||||
func GetFragmentByRID(rid string) (models.AttachmentFragment, error) {
|
||||
cacheManager := cache.New[any](localCache.S)
|
||||
marshal := marshaler.New(cacheManager)
|
||||
contx := context.Background()
|
||||
|
||||
if val, err := marshal.Get(
|
||||
contx,
|
||||
GetAttachmentFragmentCacheKey(rid),
|
||||
new(models.AttachmentFragment),
|
||||
); err == nil {
|
||||
return *val.(*models.AttachmentFragment), nil
|
||||
}
|
||||
|
||||
var attachment models.AttachmentFragment
|
||||
if err := database.C.Where(models.AttachmentFragment{
|
||||
Rid: rid,
|
||||
}).Preload("Pool").First(&attachment).Error; err != nil {
|
||||
return attachment, err
|
||||
} else {
|
||||
CacheAttachmentFragment(attachment)
|
||||
}
|
||||
|
||||
return attachment, nil
|
||||
}
|
||||
|
||||
func CacheAttachmentFragment(item models.AttachmentFragment) {
|
||||
cacheManager := cache.New[any](localCache.S)
|
||||
marshal := marshaler.New(cacheManager)
|
||||
contx := context.Background()
|
||||
|
||||
_ = marshal.Set(
|
||||
contx,
|
||||
GetAttachmentFragmentCacheKey(item.Rid),
|
||||
item,
|
||||
store.WithExpiration(60*time.Minute),
|
||||
store.WithTags([]string{"attachment-fragment", fmt.Sprintf("user#%d", item.AccountID)}),
|
||||
)
|
||||
}
|
||||
|
||||
func UploadFragmentChunk(ctx *fiber.Ctx, cid string, file *multipart.FileHeader, meta models.AttachmentFragment) error {
|
||||
destMap := viper.GetStringMap("destinations.0")
|
||||
|
||||
var dest models.LocalDestination
|
||||
rawDest, _ := jsoniter.Marshal(destMap)
|
||||
_ = jsoniter.Unmarshal(rawDest, &dest)
|
||||
|
||||
tempPath := filepath.Join(dest.Path, fmt.Sprintf("%s.part%s.partial", meta.Uuid, cid))
|
||||
destPath := filepath.Join(dest.Path, fmt.Sprintf("%s.part%s", meta.Uuid, cid))
|
||||
if err := ctx.SaveFile(file, tempPath); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, destPath)
|
||||
}
|
||||
|
||||
func UploadFragmentChunkBytes(ctx *fiber.Ctx, cid string, raw []byte, meta models.AttachmentFragment) error {
|
||||
destMap := viper.GetStringMap("destinations.0")
|
||||
|
||||
var dest models.LocalDestination
|
||||
rawDest, _ := jsoniter.Marshal(destMap)
|
||||
_ = jsoniter.Unmarshal(rawDest, &dest)
|
||||
|
||||
tempPath := filepath.Join(dest.Path, fmt.Sprintf("%s.part%s.partial", meta.Uuid, cid))
|
||||
destPath := filepath.Join(dest.Path, fmt.Sprintf("%s.part%s", meta.Uuid, cid))
|
||||
if err := os.WriteFile(tempPath, raw, 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, destPath)
|
||||
}
|
||||
|
||||
func CheckFragmentChunkExists(meta models.AttachmentFragment, cid string) bool {
|
||||
destMap := viper.GetStringMap("destinations.0")
|
||||
|
||||
var dest models.LocalDestination
|
||||
rawDest, _ := jsoniter.Marshal(destMap)
|
||||
_ = jsoniter.Unmarshal(rawDest, &dest)
|
||||
|
||||
path := filepath.Join(dest.Path, fmt.Sprintf("%s.part%s", meta.Uuid, cid))
|
||||
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
||||
return false
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
@ -11,7 +11,9 @@ import (
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachment, error) {
|
||||
func MergeFileChunks(meta models.AttachmentFragment, arrange []string) (models.Attachment, error) {
|
||||
attachment := meta.ToAttachment()
|
||||
|
||||
// Fetch destination from config
|
||||
destMap := viper.GetStringMapString("destinations.0")
|
||||
|
||||
@ -22,7 +24,7 @@ func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachmen
|
||||
destPath := filepath.Join(dest.Path, meta.Uuid)
|
||||
destFile, err := os.Create(destPath)
|
||||
if err != nil {
|
||||
return meta, err
|
||||
return attachment, err
|
||||
}
|
||||
defer destFile.Close()
|
||||
|
||||
@ -34,7 +36,7 @@ func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachmen
|
||||
chunkPath := filepath.Join(dest.Path, fmt.Sprintf("%s.part%s", meta.Uuid, chunk))
|
||||
chunkFile, err := os.Open(chunkPath)
|
||||
if err != nil {
|
||||
return meta, err
|
||||
return attachment, err
|
||||
}
|
||||
|
||||
defer chunkFile.Close() // Ensure the file is closed after reading
|
||||
@ -42,35 +44,36 @@ func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachmen
|
||||
for {
|
||||
n, err := chunkFile.Read(buf)
|
||||
if err != nil && err != io.EOF {
|
||||
return meta, err
|
||||
return attachment, err
|
||||
}
|
||||
if n == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
if _, err := destFile.Write(buf[:n]); err != nil {
|
||||
return meta, err
|
||||
return attachment, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Post-upload tasks
|
||||
meta.IsUploaded = true
|
||||
meta.FileChunks = nil
|
||||
if err := database.C.Save(&meta).Error; err != nil {
|
||||
return meta, err
|
||||
if err := database.C.Save(&attachment).Error; err != nil {
|
||||
return attachment, err
|
||||
}
|
||||
|
||||
CacheAttachment(meta)
|
||||
PublishAnalyzeTask(meta)
|
||||
CacheAttachment(attachment)
|
||||
PublishAnalyzeTask(attachment)
|
||||
|
||||
// Clean up: remove chunk files
|
||||
for _, chunk := range arrange {
|
||||
chunkPath := filepath.Join(dest.Path, fmt.Sprintf("%s.part%s", meta.Uuid, chunk))
|
||||
if err := os.Remove(chunkPath); err != nil {
|
||||
return meta, err
|
||||
return attachment, err
|
||||
}
|
||||
}
|
||||
|
||||
return meta, nil
|
||||
// Clean up: remove fragment record
|
||||
database.C.Delete(&meta)
|
||||
|
||||
return attachment, nil
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
@ -35,65 +34,6 @@ func UploadFileToTemporary(ctx *fiber.Ctx, file *multipart.FileHeader, meta mode
|
||||
}
|
||||
}
|
||||
|
||||
func UploadChunkToTemporary(ctx *fiber.Ctx, cid string, file *multipart.FileHeader, meta models.Attachment) error {
|
||||
destMap := viper.GetStringMap("destinations.0")
|
||||
|
||||
var dest models.BaseDestination
|
||||
rawDest, _ := jsoniter.Marshal(destMap)
|
||||
_ = jsoniter.Unmarshal(rawDest, &dest)
|
||||
|
||||
switch dest.Type {
|
||||
case models.DestinationTypeLocal:
|
||||
var destConfigured models.LocalDestination
|
||||
_ = jsoniter.Unmarshal(rawDest, &destConfigured)
|
||||
tempPath := filepath.Join(destConfigured.Path, fmt.Sprintf("%s.part%s.partial", meta.Uuid, cid))
|
||||
destPath := filepath.Join(destConfigured.Path, fmt.Sprintf("%s.part%s", meta.Uuid, cid))
|
||||
if err := ctx.SaveFile(file, tempPath); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, destPath)
|
||||
default:
|
||||
return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func UploadChunkToTemporaryWithRaw(ctx *fiber.Ctx, cid string, raw []byte, meta models.Attachment) error {
|
||||
destMap := viper.GetStringMap("destinations.0")
|
||||
|
||||
var dest models.BaseDestination
|
||||
rawDest, _ := jsoniter.Marshal(destMap)
|
||||
_ = jsoniter.Unmarshal(rawDest, &dest)
|
||||
|
||||
switch dest.Type {
|
||||
case models.DestinationTypeLocal:
|
||||
var destConfigured models.LocalDestination
|
||||
_ = jsoniter.Unmarshal(rawDest, &destConfigured)
|
||||
tempPath := filepath.Join(destConfigured.Path, fmt.Sprintf("%s.part%s.partial", meta.Uuid, cid))
|
||||
destPath := filepath.Join(destConfigured.Path, fmt.Sprintf("%s.part%s", meta.Uuid, cid))
|
||||
if err := os.WriteFile(tempPath, raw, 0644); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.Rename(tempPath, destPath)
|
||||
default:
|
||||
return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func CheckChunkExistsInTemporary(meta models.Attachment, cid string) bool {
|
||||
destMap := viper.GetStringMap("destinations.0")
|
||||
|
||||
var dest models.LocalDestination
|
||||
rawDest, _ := jsoniter.Marshal(destMap)
|
||||
_ = jsoniter.Unmarshal(rawDest, &dest)
|
||||
|
||||
path := filepath.Join(dest.Path, fmt.Sprintf("%s.part%s", meta.Uuid, cid))
|
||||
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
||||
return false
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func ReUploadFileToPermanent(meta models.Attachment, dst int) error {
|
||||
if dst == models.AttachmentDstTemporary || meta.Destination == dst {
|
||||
return fmt.Errorf("destnation cannot be reversed temporary or the same as the original")
|
||||
|
Reference in New Issue
Block a user