✨ Multipart file upload
This commit is contained in:
@ -2,6 +2,9 @@ package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/spf13/viper"
|
||||
"gorm.io/datatypes"
|
||||
"math"
|
||||
"mime"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
@ -114,6 +117,38 @@ func NewAttachmentMetadata(tx *gorm.DB, user models.Account, file *multipart.Fil
|
||||
return attachment, nil
|
||||
}
|
||||
|
||||
func NewAttachmentPlaceholder(tx *gorm.DB, user models.Account, attachment models.Attachment) (models.Attachment, error) {
|
||||
attachment.Uuid = uuid.NewString()
|
||||
attachment.Rid = RandString(16)
|
||||
attachment.IsUploaded = false
|
||||
attachment.FileChunks = datatypes.JSONMap{}
|
||||
attachment.AccountID = user.ID
|
||||
|
||||
chunkSize := viper.GetInt64("performance.file_chunk_size")
|
||||
chunkCount := math.Ceil(float64(attachment.Size) / float64(chunkSize))
|
||||
for idx := 0; idx < int(chunkCount); idx++ {
|
||||
cid := RandString(8)
|
||||
attachment.FileChunks[cid] = idx
|
||||
}
|
||||
|
||||
// If the user didn't provide file mimetype manually, we have to detect it
|
||||
if len(attachment.MimeType) == 0 {
|
||||
if ext := filepath.Ext(attachment.Name); len(ext) > 0 {
|
||||
// Detect mimetype by file extensions
|
||||
attachment.MimeType = mime.TypeByExtension(ext)
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Save(&attachment).Error; err != nil {
|
||||
return attachment, fmt.Errorf("failed to save attachment record: %v", err)
|
||||
} else {
|
||||
MaintainAttachmentCache()
|
||||
CacheAttachment(attachment)
|
||||
}
|
||||
|
||||
return attachment, nil
|
||||
}
|
||||
|
||||
func TryLinkAttachment(tx *gorm.DB, og models.Attachment, hash string) (bool, error) {
|
||||
prev, err := GetAttachmentByHash(hash)
|
||||
if err != nil {
|
||||
|
48
pkg/internal/services/merger.go
Normal file
48
pkg/internal/services/merger.go
Normal file
@ -0,0 +1,48 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/database"
|
||||
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/models"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/spf13/viper"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachment, error) {
|
||||
destMap := viper.GetStringMap("destinations.temporary")
|
||||
|
||||
var dest models.LocalDestination
|
||||
rawDest, _ := jsoniter.Marshal(destMap)
|
||||
_ = jsoniter.Unmarshal(rawDest, &dest)
|
||||
|
||||
destPath := filepath.Join(dest.Path, meta.Uuid)
|
||||
destFile, err := os.Create(destPath)
|
||||
if err != nil {
|
||||
return meta, err
|
||||
}
|
||||
defer destFile.Close()
|
||||
|
||||
for _, chunk := range arrange {
|
||||
chunkPath := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, chunk))
|
||||
chunkFile, err := os.Open(chunkPath)
|
||||
if err != nil {
|
||||
return meta, err
|
||||
}
|
||||
|
||||
_, err = io.Copy(destFile, chunkFile)
|
||||
if err != nil {
|
||||
_ = chunkFile.Close()
|
||||
return meta, err
|
||||
}
|
||||
|
||||
_ = chunkFile.Close()
|
||||
}
|
||||
|
||||
meta.IsUploaded = true
|
||||
database.C.Save(&meta)
|
||||
|
||||
return meta, nil
|
||||
}
|
@ -4,7 +4,7 @@ import (
|
||||
"math/rand"
|
||||
)
|
||||
|
||||
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
|
||||
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
|
||||
|
||||
func RandString(length int) string {
|
||||
builder := make([]rune, length)
|
||||
|
@ -35,7 +35,7 @@ func StartConsumeDeletionTask() {
|
||||
}
|
||||
}
|
||||
|
||||
func RunMarkDeletionTask() {
|
||||
func RunMarkLifecycleDeletionTask() {
|
||||
var pools []models.AttachmentPool
|
||||
if err := database.C.Find(&pools).Error; err != nil {
|
||||
return
|
||||
@ -53,6 +53,7 @@ func RunMarkDeletionTask() {
|
||||
tx := database.C.
|
||||
Where("pool_id = ?", pool.ID).
|
||||
Where("created_at < ?", lifecycle).
|
||||
Where("cleaned_at IS NULL").
|
||||
Updates(&models.Attachment{CleanedAt: lo.ToPtr(time.Now())})
|
||||
log.Info().
|
||||
Str("pool", pool.Alias).
|
||||
@ -62,13 +63,26 @@ func RunMarkDeletionTask() {
|
||||
}
|
||||
}
|
||||
|
||||
func RunMarkMultipartDeletionTask() {
|
||||
lifecycle := time.Now().Add(-24 * time.Hour)
|
||||
tx := database.C.
|
||||
Where("created_at < ?", lifecycle).
|
||||
Where("is_uploaded = ?", false).
|
||||
Where("cleaned_at IS NULL").
|
||||
Updates(&models.Attachment{CleanedAt: lo.ToPtr(time.Now())})
|
||||
log.Info().
|
||||
Int64("count", tx.RowsAffected).
|
||||
Err(tx.Error).
|
||||
Msg("Marking attachments as clean needed due to multipart lifecycle...")
|
||||
}
|
||||
|
||||
func RunScheduleDeletionTask() {
|
||||
var attachments []models.Attachment
|
||||
if err := database.C.Where("cleaned_at IS NOT NULL").Find(&attachments).Error; err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for idx, attachment := range attachments {
|
||||
for _, attachment := range attachments {
|
||||
if attachment.RefID != nil {
|
||||
continue
|
||||
}
|
||||
@ -76,8 +90,6 @@ func RunScheduleDeletionTask() {
|
||||
log.Error().
|
||||
Uint("id", attachment.ID).
|
||||
Msg("An error occurred when deleting marked clean up attachments...")
|
||||
} else {
|
||||
attachments[idx].CleanedAt = lo.ToPtr(time.Now())
|
||||
}
|
||||
}
|
||||
|
||||
@ -85,6 +97,20 @@ func RunScheduleDeletionTask() {
|
||||
}
|
||||
|
||||
func DeleteFile(meta models.Attachment) error {
|
||||
if !meta.IsUploaded {
|
||||
destMap := viper.GetStringMap("destinations.temporary")
|
||||
var dest models.LocalDestination
|
||||
rawDest, _ := jsoniter.Marshal(destMap)
|
||||
_ = jsoniter.Unmarshal(rawDest, &dest)
|
||||
|
||||
for cid := range meta.FileChunks {
|
||||
path := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid))
|
||||
_ = os.Remove(path)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var destMap map[string]any
|
||||
if meta.Destination == models.AttachmentDstTemporary {
|
||||
destMap = viper.GetStringMap("destinations.temporary")
|
||||
|
@ -1,8 +1,8 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
@ -29,12 +29,44 @@ func UploadFileToTemporary(ctx *fiber.Ctx, file *multipart.FileHeader, meta mode
|
||||
case models.DestinationTypeLocal:
|
||||
var destConfigured models.LocalDestination
|
||||
_ = jsoniter.Unmarshal(rawDest, &destConfigured)
|
||||
return UploadFileToLocal(destConfigured, ctx, file, meta)
|
||||
return ctx.SaveFile(file, filepath.Join(destConfigured.Path, meta.Uuid))
|
||||
default:
|
||||
return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func UploadChunkToTemporary(ctx *fiber.Ctx, cid string, file *multipart.FileHeader, meta models.Attachment) error {
|
||||
destMap := viper.GetStringMap("destinations.temporary")
|
||||
|
||||
var dest models.BaseDestination
|
||||
rawDest, _ := jsoniter.Marshal(destMap)
|
||||
_ = jsoniter.Unmarshal(rawDest, &dest)
|
||||
|
||||
switch dest.Type {
|
||||
case models.DestinationTypeLocal:
|
||||
var destConfigured models.LocalDestination
|
||||
_ = jsoniter.Unmarshal(rawDest, &destConfigured)
|
||||
return ctx.SaveFile(file, filepath.Join(destConfigured.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid)))
|
||||
default:
|
||||
return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func CheckChunkExistsInTemporary(meta models.Attachment, cid string) bool {
|
||||
destMap := viper.GetStringMap("destinations.temporary")
|
||||
|
||||
var dest models.LocalDestination
|
||||
rawDest, _ := jsoniter.Marshal(destMap)
|
||||
_ = jsoniter.Unmarshal(rawDest, &dest)
|
||||
|
||||
path := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid))
|
||||
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
|
||||
return false
|
||||
} else {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func ReUploadFileToPermanent(meta models.Attachment) error {
|
||||
if meta.Destination != models.AttachmentDstTemporary {
|
||||
return fmt.Errorf("attachment isn't in temporary storage, unable to process")
|
||||
@ -50,8 +82,8 @@ func ReUploadFileToPermanent(meta models.Attachment) error {
|
||||
|
||||
prevDestMap := viper.GetStringMap("destinations.temporary")
|
||||
|
||||
// Currently the temporary destination only support the local
|
||||
// So we can do this
|
||||
// Currently, the temporary destination only supports the local.
|
||||
// So we can do this.
|
||||
var prevDest models.LocalDestination
|
||||
prevRawDest, _ := jsoniter.Marshal(prevDestMap)
|
||||
_ = jsoniter.Unmarshal(prevRawDest, &prevDest)
|
||||
@ -111,39 +143,3 @@ func ReUploadFileToPermanent(meta models.Attachment) error {
|
||||
return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func UploadFileToLocal(config models.LocalDestination, ctx *fiber.Ctx, file *multipart.FileHeader, meta models.Attachment) error {
|
||||
return ctx.SaveFile(file, filepath.Join(config.Path, meta.Uuid))
|
||||
}
|
||||
|
||||
func UploadFileToS3(config models.S3Destination, file *multipart.FileHeader, meta models.Attachment) error {
|
||||
header, err := file.Open()
|
||||
if err != nil {
|
||||
return fmt.Errorf("read upload file: %v", err)
|
||||
}
|
||||
defer header.Close()
|
||||
|
||||
buffer := bytes.NewBuffer(nil)
|
||||
if _, err := io.Copy(buffer, header); err != nil {
|
||||
return fmt.Errorf("create io reader for upload file: %v", err)
|
||||
}
|
||||
|
||||
client, err := minio.New(config.Endpoint, &minio.Options{
|
||||
Creds: credentials.NewStaticV4(config.SecretID, config.SecretKey, ""),
|
||||
Secure: config.EnableSSL,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to configure s3 client: %v", err)
|
||||
}
|
||||
|
||||
_, err = client.PutObject(context.Background(), config.Bucket, filepath.Join(config.Path, meta.Uuid), buffer, file.Size, minio.PutObjectOptions{
|
||||
ContentType: meta.MimeType,
|
||||
SendContentMd5: false,
|
||||
DisableContentSha256: true,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to upload file to s3: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user