From 73b39a4fb5d0b79bdb29379f66222af9c28e3f18 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Tue, 20 Aug 2024 17:08:40 +0800 Subject: [PATCH 1/3] :sparkles: Multipart file upload --- .idea/workspace.xml | 29 +++-- pkg/internal/models/attachments.go | 4 + pkg/internal/server/api/attachments_api.go | 67 +---------- pkg/internal/server/api/index.go | 5 +- pkg/internal/server/api/up_direct_api.go | 75 +++++++++++++ pkg/internal/server/api/up_multipart_api.go | 117 ++++++++++++++++++++ pkg/internal/services/attachments.go | 35 ++++++ pkg/internal/services/merger.go | 48 ++++++++ pkg/internal/services/random_id.go | 2 +- pkg/internal/services/recycler.go | 34 +++++- pkg/internal/services/uploader.go | 76 ++++++------- pkg/main.go | 5 +- settings.toml | 3 + 13 files changed, 380 insertions(+), 120 deletions(-) create mode 100644 pkg/internal/server/api/up_direct_api.go create mode 100644 pkg/internal/server/api/up_multipart_api.go create mode 100644 pkg/internal/services/merger.go diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 27937a4..1a3ab33 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -4,10 +4,20 @@ - @@ -138,7 +152,8 @@ - true diff --git a/pkg/internal/models/attachments.go b/pkg/internal/models/attachments.go index 223816b..d62eee5 100644 --- a/pkg/internal/models/attachments.go +++ b/pkg/internal/models/attachments.go @@ -25,14 +25,18 @@ type Attachment struct { Alternative string `json:"alt"` MimeType string `json:"mimetype"` HashCode string `json:"hash"` + UserHash *string `json:"user_hash"` Destination AttachmentDst `json:"destination"` RefCount int `json:"ref_count"` + FileChunks datatypes.JSONMap `json:"file_chunks"` + CleanedAt *time.Time `json:"cleaned_at"` Metadata datatypes.JSONMap `json:"metadata"` IsMature bool `json:"is_mature"` IsAnalyzed bool `json:"is_analyzed"` + IsUploaded bool `json:"is_uploaded"` IsSelfRef bool `json:"is_self_ref"` Ref *Attachment `json:"ref"` diff --git a/pkg/internal/server/api/attachments_api.go b/pkg/internal/server/api/attachments_api.go index 04c2107..0e63d51 100644 --- a/pkg/internal/server/api/attachments_api.go +++ b/pkg/internal/server/api/attachments_api.go @@ -22,6 +22,8 @@ func openAttachment(c *fiber.Ctx) error { metadata, err := services.GetAttachmentByRID(id) if err != nil { return fiber.NewError(fiber.StatusNotFound) + } else if !metadata.IsUploaded { + return fiber.NewError(fiber.StatusNotFound, "file is in uploading progress, please wait until all chunk uploaded") } var destMap map[string]any @@ -78,71 +80,6 @@ func getAttachmentMeta(c *fiber.Ctx) error { return c.JSON(metadata) } -func createAttachment(c *fiber.Ctx) error { - if err := gap.H.EnsureAuthenticated(c); err != nil { - return err - } - user := c.Locals("user").(models.Account) - - poolAlias := c.FormValue("pool") - if len(poolAlias) == 0 { - poolAlias = c.FormValue("usage") - } - aliasingMap := viper.GetStringMapString("pools.aliases") - if val, ok := aliasingMap[poolAlias]; ok { - poolAlias = val - } - - pool, err := services.GetAttachmentPoolByAlias(poolAlias) - if err != nil { - return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("unable to get attachment pool info: %v", err)) - } - - file, err := c.FormFile("file") - if err != nil { - return err - } - - if err = gap.H.EnsureGrantedPerm(c, "CreateAttachments", file.Size); err != nil { - return err - } else if pool.Config.Data().MaxFileSize != nil && file.Size > *pool.Config.Data().MaxFileSize { - return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("attachment pool %s doesn't allow file larger than %d", pool.Alias, *pool.Config.Data().MaxFileSize)) - } - - usermeta := make(map[string]any) - _ = jsoniter.UnmarshalFromString(c.FormValue("metadata"), &usermeta) - - tx := database.C.Begin() - - metadata, err := services.NewAttachmentMetadata(tx, user, file, models.Attachment{ - Alternative: c.FormValue("alt"), - MimeType: c.FormValue("mimetype"), - Metadata: usermeta, - IsMature: len(c.FormValue("mature")) > 0, - IsAnalyzed: false, - Destination: models.AttachmentDstTemporary, - Pool: &pool, - PoolID: &pool.ID, - }) - if err != nil { - tx.Rollback() - return fiber.NewError(fiber.StatusBadRequest, err.Error()) - } - - if err := services.UploadFileToTemporary(c, file, metadata); err != nil { - tx.Rollback() - return fiber.NewError(fiber.StatusBadRequest, err.Error()) - } - - tx.Commit() - - metadata.Account = user - metadata.Pool = &pool - services.PublishAnalyzeTask(metadata) - - return c.JSON(metadata) -} - func updateAttachmentMeta(c *fiber.Ctx) error { id, _ := c.ParamsInt("id", 0) diff --git a/pkg/internal/server/api/index.go b/pkg/internal/server/api/index.go index e0254be..5f6a307 100644 --- a/pkg/internal/server/api/index.go +++ b/pkg/internal/server/api/index.go @@ -16,10 +16,13 @@ func MapAPIs(app *fiber.App, baseURL string) { api.Get("/attachments", listAttachment) api.Get("/attachments/:id/meta", getAttachmentMeta) api.Get("/attachments/:id", openAttachment) - api.Post("/attachments", createAttachment) + api.Post("/attachments", createAttachmentDirectly) api.Put("/attachments/:id", updateAttachmentMeta) api.Delete("/attachments/:id", deleteAttachment) + api.Post("/attachments/multipart", createAttachmentMultipartPlaceholder) + api.Post("/attachments/multipart/:file/:chunk", uploadAttachmentMultipart) + api.Get("/stickers/manifest", listStickerManifest) api.Get("/stickers/packs", listStickerPacks) api.Post("/stickers/packs", createStickerPack) diff --git a/pkg/internal/server/api/up_direct_api.go b/pkg/internal/server/api/up_direct_api.go new file mode 100644 index 0000000..cb98b38 --- /dev/null +++ b/pkg/internal/server/api/up_direct_api.go @@ -0,0 +1,75 @@ +package api + +import ( + "fmt" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/database" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/gap" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/models" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/services" + "github.com/gofiber/fiber/v2" + jsoniter "github.com/json-iterator/go" + "github.com/spf13/viper" +) + +func createAttachmentDirectly(c *fiber.Ctx) error { + if err := gap.H.EnsureAuthenticated(c); err != nil { + return err + } + user := c.Locals("user").(models.Account) + + poolAlias := c.FormValue("pool") + + aliasingMap := viper.GetStringMapString("pools.aliases") + if val, ok := aliasingMap[poolAlias]; ok { + poolAlias = val + } + + pool, err := services.GetAttachmentPoolByAlias(poolAlias) + if err != nil { + return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("unable to get attachment pool info: %v", err)) + } + + file, err := c.FormFile("file") + if err != nil { + return err + } + + if err = gap.H.EnsureGrantedPerm(c, "CreateAttachments", file.Size); err != nil { + return err + } else if pool.Config.Data().MaxFileSize != nil && file.Size > *pool.Config.Data().MaxFileSize { + return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("attachment pool %s doesn't allow file larger than %d", pool.Alias, *pool.Config.Data().MaxFileSize)) + } + + usermeta := make(map[string]any) + _ = jsoniter.UnmarshalFromString(c.FormValue("metadata"), &usermeta) + + tx := database.C.Begin() + + metadata, err := services.NewAttachmentMetadata(tx, user, file, models.Attachment{ + Alternative: c.FormValue("alt"), + MimeType: c.FormValue("mimetype"), + Metadata: usermeta, + IsMature: len(c.FormValue("mature")) > 0, + IsAnalyzed: false, + Destination: models.AttachmentDstTemporary, + Pool: &pool, + PoolID: &pool.ID, + }) + if err != nil { + tx.Rollback() + return fiber.NewError(fiber.StatusBadRequest, err.Error()) + } + + if err := services.UploadFileToTemporary(c, file, metadata); err != nil { + tx.Rollback() + return fiber.NewError(fiber.StatusBadRequest, err.Error()) + } + + tx.Commit() + + metadata.Account = user + metadata.Pool = &pool + services.PublishAnalyzeTask(metadata) + + return c.JSON(metadata) +} diff --git a/pkg/internal/server/api/up_multipart_api.go b/pkg/internal/server/api/up_multipart_api.go new file mode 100644 index 0000000..991b7bb --- /dev/null +++ b/pkg/internal/server/api/up_multipart_api.go @@ -0,0 +1,117 @@ +package api + +import ( + "fmt" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/database" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/gap" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/models" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/services" + "github.com/gofiber/fiber/v2" + "github.com/spf13/viper" +) + +func createAttachmentMultipartPlaceholder(c *fiber.Ctx) error { + if err := gap.H.EnsureAuthenticated(c); err != nil { + return err + } + user := c.Locals("user").(models.Account) + + var data struct { + Pool string `json:"pool" validate:"required"` + Size int64 `json:"size" validate:"required"` + Hash string `json:"hash" validate:"required"` + Alternative string `json:"alt"` + MimeType string `json:"mimetype"` + Metadata map[string]any `json:"metadata"` + IsMature bool `json:"is_mature"` + } + + aliasingMap := viper.GetStringMapString("pools.aliases") + if val, ok := aliasingMap[data.Pool]; ok { + data.Pool = val + } + + pool, err := services.GetAttachmentPoolByAlias(data.Pool) + if err != nil { + return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("unable to get attachment pool info: %v", err)) + } + + if err = gap.H.EnsureGrantedPerm(c, "CreateAttachments", data.Size); err != nil { + return err + } else if pool.Config.Data().MaxFileSize != nil && *pool.Config.Data().MaxFileSize > data.Size { + return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("attachment pool %s doesn't allow file larger than %d", pool.Alias, *pool.Config.Data().MaxFileSize)) + } + + metadata, err := services.NewAttachmentPlaceholder(database.C, user, models.Attachment{ + UserHash: &data.Hash, + Alternative: data.Alternative, + MimeType: data.MimeType, + Metadata: data.Metadata, + IsMature: data.IsMature, + IsAnalyzed: false, + Destination: models.AttachmentDstTemporary, + Pool: &pool, + PoolID: &pool.ID, + }) + if err != nil { + return fiber.NewError(fiber.StatusBadRequest, err.Error()) + } + + return c.JSON(metadata) +} + +func uploadAttachmentMultipart(c *fiber.Ctx) error { + if err := gap.H.EnsureAuthenticated(c); err != nil { + return err + } + user := c.Locals("user").(models.Account) + + rid := c.Params("file") + cid := c.Params("chunk") + + file, err := c.FormFile("file") + if err != nil { + return err + } else if file.Size > viper.GetInt64("performance.file_chunk_size") { + return fiber.NewError(fiber.StatusBadRequest, "file is too large for one chunk") + } + + meta, err := services.GetAttachmentByRID(rid) + if err != nil { + return fiber.NewError(fiber.StatusNotFound, fmt.Sprintf("attachment was not found: %v", err)) + } else if user.ID != meta.AccountID { + return fiber.NewError(fiber.StatusForbidden, "you are not authorized to upload this attachment") + } + + if _, ok := meta.FileChunks[cid]; !ok { + return fiber.NewError(fiber.StatusNotFound, fmt.Sprintf("chunk %s was not found", cid)) + } else if services.CheckChunkExistsInTemporary(meta, cid) { + return fiber.NewError(fiber.StatusNotFound, fmt.Sprintf("chunk %s was uploaded", cid)) + } + + if err := services.UploadChunkToTemporary(c, cid, file, meta); err != nil { + return fiber.NewError(fiber.StatusBadRequest, err.Error()) + } + + chunkArrange := make([]string, len(meta.FileChunks)) + isAllUploaded := true + for cid, idx := range meta.FileChunks { + if !services.CheckChunkExistsInTemporary(meta, cid) { + isAllUploaded = false + break + } else if val, ok := idx.(int); ok { + chunkArrange[val] = cid + } + } + + if !isAllUploaded { + database.C.Save(&meta) + return c.JSON(meta) + } + + if meta, err = services.MergeFileChunks(meta, chunkArrange); err != nil { + return fiber.NewError(fiber.StatusInternalServerError, err.Error()) + } else { + return c.JSON(meta) + } +} diff --git a/pkg/internal/services/attachments.go b/pkg/internal/services/attachments.go index d18f932..c3f2cac 100644 --- a/pkg/internal/services/attachments.go +++ b/pkg/internal/services/attachments.go @@ -2,6 +2,9 @@ package services import ( "fmt" + "github.com/spf13/viper" + "gorm.io/datatypes" + "math" "mime" "mime/multipart" "net/http" @@ -114,6 +117,38 @@ func NewAttachmentMetadata(tx *gorm.DB, user models.Account, file *multipart.Fil return attachment, nil } +func NewAttachmentPlaceholder(tx *gorm.DB, user models.Account, attachment models.Attachment) (models.Attachment, error) { + attachment.Uuid = uuid.NewString() + attachment.Rid = RandString(16) + attachment.IsUploaded = false + attachment.FileChunks = datatypes.JSONMap{} + attachment.AccountID = user.ID + + chunkSize := viper.GetInt64("performance.file_chunk_size") + chunkCount := math.Ceil(float64(attachment.Size) / float64(chunkSize)) + for idx := 0; idx < int(chunkCount); idx++ { + cid := RandString(8) + attachment.FileChunks[cid] = idx + } + + // If the user didn't provide file mimetype manually, we have to detect it + if len(attachment.MimeType) == 0 { + if ext := filepath.Ext(attachment.Name); len(ext) > 0 { + // Detect mimetype by file extensions + attachment.MimeType = mime.TypeByExtension(ext) + } + } + + if err := tx.Save(&attachment).Error; err != nil { + return attachment, fmt.Errorf("failed to save attachment record: %v", err) + } else { + MaintainAttachmentCache() + CacheAttachment(attachment) + } + + return attachment, nil +} + func TryLinkAttachment(tx *gorm.DB, og models.Attachment, hash string) (bool, error) { prev, err := GetAttachmentByHash(hash) if err != nil { diff --git a/pkg/internal/services/merger.go b/pkg/internal/services/merger.go new file mode 100644 index 0000000..24eefc2 --- /dev/null +++ b/pkg/internal/services/merger.go @@ -0,0 +1,48 @@ +package services + +import ( + "fmt" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/database" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/models" + jsoniter "github.com/json-iterator/go" + "github.com/spf13/viper" + "io" + "os" + "path/filepath" +) + +func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachment, error) { + destMap := viper.GetStringMap("destinations.temporary") + + var dest models.LocalDestination + rawDest, _ := jsoniter.Marshal(destMap) + _ = jsoniter.Unmarshal(rawDest, &dest) + + destPath := filepath.Join(dest.Path, meta.Uuid) + destFile, err := os.Create(destPath) + if err != nil { + return meta, err + } + defer destFile.Close() + + for _, chunk := range arrange { + chunkPath := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, chunk)) + chunkFile, err := os.Open(chunkPath) + if err != nil { + return meta, err + } + + _, err = io.Copy(destFile, chunkFile) + if err != nil { + _ = chunkFile.Close() + return meta, err + } + + _ = chunkFile.Close() + } + + meta.IsUploaded = true + database.C.Save(&meta) + + return meta, nil +} diff --git a/pkg/internal/services/random_id.go b/pkg/internal/services/random_id.go index 5c44a4d..16676de 100644 --- a/pkg/internal/services/random_id.go +++ b/pkg/internal/services/random_id.go @@ -4,7 +4,7 @@ import ( "math/rand" ) -var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") +var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") func RandString(length int) string { builder := make([]rune, length) diff --git a/pkg/internal/services/recycler.go b/pkg/internal/services/recycler.go index f22112d..87ce3f5 100644 --- a/pkg/internal/services/recycler.go +++ b/pkg/internal/services/recycler.go @@ -35,7 +35,7 @@ func StartConsumeDeletionTask() { } } -func RunMarkDeletionTask() { +func RunMarkLifecycleDeletionTask() { var pools []models.AttachmentPool if err := database.C.Find(&pools).Error; err != nil { return @@ -53,6 +53,7 @@ func RunMarkDeletionTask() { tx := database.C. Where("pool_id = ?", pool.ID). Where("created_at < ?", lifecycle). + Where("cleaned_at IS NULL"). Updates(&models.Attachment{CleanedAt: lo.ToPtr(time.Now())}) log.Info(). Str("pool", pool.Alias). @@ -62,13 +63,26 @@ func RunMarkDeletionTask() { } } +func RunMarkMultipartDeletionTask() { + lifecycle := time.Now().Add(-24 * time.Hour) + tx := database.C. + Where("created_at < ?", lifecycle). + Where("is_uploaded = ?", false). + Where("cleaned_at IS NULL"). + Updates(&models.Attachment{CleanedAt: lo.ToPtr(time.Now())}) + log.Info(). + Int64("count", tx.RowsAffected). + Err(tx.Error). + Msg("Marking attachments as clean needed due to multipart lifecycle...") +} + func RunScheduleDeletionTask() { var attachments []models.Attachment if err := database.C.Where("cleaned_at IS NOT NULL").Find(&attachments).Error; err != nil { return } - for idx, attachment := range attachments { + for _, attachment := range attachments { if attachment.RefID != nil { continue } @@ -76,8 +90,6 @@ func RunScheduleDeletionTask() { log.Error(). Uint("id", attachment.ID). Msg("An error occurred when deleting marked clean up attachments...") - } else { - attachments[idx].CleanedAt = lo.ToPtr(time.Now()) } } @@ -85,6 +97,20 @@ func RunScheduleDeletionTask() { } func DeleteFile(meta models.Attachment) error { + if !meta.IsUploaded { + destMap := viper.GetStringMap("destinations.temporary") + var dest models.LocalDestination + rawDest, _ := jsoniter.Marshal(destMap) + _ = jsoniter.Unmarshal(rawDest, &dest) + + for cid := range meta.FileChunks { + path := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid)) + _ = os.Remove(path) + } + + return nil + } + var destMap map[string]any if meta.Destination == models.AttachmentDstTemporary { destMap = viper.GetStringMap("destinations.temporary") diff --git a/pkg/internal/services/uploader.go b/pkg/internal/services/uploader.go index cda4362..bbbb8bf 100644 --- a/pkg/internal/services/uploader.go +++ b/pkg/internal/services/uploader.go @@ -1,8 +1,8 @@ package services import ( - "bytes" "context" + "errors" "fmt" "io" "mime/multipart" @@ -29,12 +29,44 @@ func UploadFileToTemporary(ctx *fiber.Ctx, file *multipart.FileHeader, meta mode case models.DestinationTypeLocal: var destConfigured models.LocalDestination _ = jsoniter.Unmarshal(rawDest, &destConfigured) - return UploadFileToLocal(destConfigured, ctx, file, meta) + return ctx.SaveFile(file, filepath.Join(destConfigured.Path, meta.Uuid)) default: return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) } } +func UploadChunkToTemporary(ctx *fiber.Ctx, cid string, file *multipart.FileHeader, meta models.Attachment) error { + destMap := viper.GetStringMap("destinations.temporary") + + var dest models.BaseDestination + rawDest, _ := jsoniter.Marshal(destMap) + _ = jsoniter.Unmarshal(rawDest, &dest) + + switch dest.Type { + case models.DestinationTypeLocal: + var destConfigured models.LocalDestination + _ = jsoniter.Unmarshal(rawDest, &destConfigured) + return ctx.SaveFile(file, filepath.Join(destConfigured.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid))) + default: + return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) + } +} + +func CheckChunkExistsInTemporary(meta models.Attachment, cid string) bool { + destMap := viper.GetStringMap("destinations.temporary") + + var dest models.LocalDestination + rawDest, _ := jsoniter.Marshal(destMap) + _ = jsoniter.Unmarshal(rawDest, &dest) + + path := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid)) + if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { + return false + } else { + return true + } +} + func ReUploadFileToPermanent(meta models.Attachment) error { if meta.Destination != models.AttachmentDstTemporary { return fmt.Errorf("attachment isn't in temporary storage, unable to process") @@ -50,8 +82,8 @@ func ReUploadFileToPermanent(meta models.Attachment) error { prevDestMap := viper.GetStringMap("destinations.temporary") - // Currently the temporary destination only support the local - // So we can do this + // Currently, the temporary destination only supports the local. + // So we can do this. var prevDest models.LocalDestination prevRawDest, _ := jsoniter.Marshal(prevDestMap) _ = jsoniter.Unmarshal(prevRawDest, &prevDest) @@ -111,39 +143,3 @@ func ReUploadFileToPermanent(meta models.Attachment) error { return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) } } - -func UploadFileToLocal(config models.LocalDestination, ctx *fiber.Ctx, file *multipart.FileHeader, meta models.Attachment) error { - return ctx.SaveFile(file, filepath.Join(config.Path, meta.Uuid)) -} - -func UploadFileToS3(config models.S3Destination, file *multipart.FileHeader, meta models.Attachment) error { - header, err := file.Open() - if err != nil { - return fmt.Errorf("read upload file: %v", err) - } - defer header.Close() - - buffer := bytes.NewBuffer(nil) - if _, err := io.Copy(buffer, header); err != nil { - return fmt.Errorf("create io reader for upload file: %v", err) - } - - client, err := minio.New(config.Endpoint, &minio.Options{ - Creds: credentials.NewStaticV4(config.SecretID, config.SecretKey, ""), - Secure: config.EnableSSL, - }) - if err != nil { - return fmt.Errorf("unable to configure s3 client: %v", err) - } - - _, err = client.PutObject(context.Background(), config.Bucket, filepath.Join(config.Path, meta.Uuid), buffer, file.Size, minio.PutObjectOptions{ - ContentType: meta.MimeType, - SendContentMd5: false, - DisableContentSha256: true, - }) - if err != nil { - return fmt.Errorf("unable to upload file to s3: %v", err) - } - - return nil -} diff --git a/pkg/main.go b/pkg/main.go index 3987dc7..de0f98d 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -59,7 +59,8 @@ func main() { // Configure timed tasks quartz := cron.New(cron.WithLogger(cron.VerbosePrintfLogger(&log.Logger))) quartz.AddFunc("@every 60m", services.DoAutoDatabaseCleanup) - quartz.AddFunc("@every 60m", services.RunMarkDeletionTask) + quartz.AddFunc("@every 60m", services.RunMarkLifecycleDeletionTask) + quartz.AddFunc("@every 60m", services.RunMarkMultipartDeletionTask) quartz.AddFunc("@midnight", services.RunScheduleDeletionTask) quartz.Start() @@ -75,7 +76,7 @@ func main() { log.Info().Msgf("Paperclip v%s is started...", pkg.AppVersion) services.ScanUnanalyzedFileFromDatabase() - services.RunMarkDeletionTask() + services.RunMarkLifecycleDeletionTask() quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) diff --git a/settings.toml b/settings.toml index afdd5fe..3c82f1f 100644 --- a/settings.toml +++ b/settings.toml @@ -29,6 +29,9 @@ cookie_samesite = "Lax" access_token_duration = 300 refresh_token_duration = 2592000 +[performance] +file_chunk_size = 5242880 + [database] dsn = "host=localhost user=postgres password=password dbname=hy_paperclip port=5432 sslmode=disable" prefix = "paperclip_" From 37c47f983957c1defccbcf1b34b8c11bed7987c8 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Tue, 20 Aug 2024 19:17:43 +0800 Subject: [PATCH 2/3] :sparkles: Make it more implementable --- .idea/workspace.xml | 19 ++++++------------- pkg/internal/models/attachments.go | 1 - pkg/internal/server/api/up_multipart_api.go | 7 ++++--- pkg/internal/services/analyzer.go | 14 ++++++++------ pkg/internal/services/merger.go | 2 ++ settings.toml | 2 +- 6 files changed, 21 insertions(+), 24 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 1a3ab33..41a8899 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -4,19 +4,12 @@ - - - - + - - - - - - - + + + - @@ -153,7 +145,8 @@ - true diff --git a/pkg/internal/models/attachments.go b/pkg/internal/models/attachments.go index d62eee5..ca5c960 100644 --- a/pkg/internal/models/attachments.go +++ b/pkg/internal/models/attachments.go @@ -25,7 +25,6 @@ type Attachment struct { Alternative string `json:"alt"` MimeType string `json:"mimetype"` HashCode string `json:"hash"` - UserHash *string `json:"user_hash"` Destination AttachmentDst `json:"destination"` RefCount int `json:"ref_count"` diff --git a/pkg/internal/server/api/up_multipart_api.go b/pkg/internal/server/api/up_multipart_api.go index 991b7bb..8b17573 100644 --- a/pkg/internal/server/api/up_multipart_api.go +++ b/pkg/internal/server/api/up_multipart_api.go @@ -19,7 +19,6 @@ func createAttachmentMultipartPlaceholder(c *fiber.Ctx) error { var data struct { Pool string `json:"pool" validate:"required"` Size int64 `json:"size" validate:"required"` - Hash string `json:"hash" validate:"required"` Alternative string `json:"alt"` MimeType string `json:"mimetype"` Metadata map[string]any `json:"metadata"` @@ -43,7 +42,6 @@ func createAttachmentMultipartPlaceholder(c *fiber.Ctx) error { } metadata, err := services.NewAttachmentPlaceholder(database.C, user, models.Attachment{ - UserHash: &data.Hash, Alternative: data.Alternative, MimeType: data.MimeType, Metadata: data.Metadata, @@ -57,7 +55,10 @@ func createAttachmentMultipartPlaceholder(c *fiber.Ctx) error { return fiber.NewError(fiber.StatusBadRequest, err.Error()) } - return c.JSON(metadata) + return c.JSON(fiber.Map{ + "chunk_size": viper.GetInt64("performance.file_chunk_size"), + "meta": metadata, + }) } func uploadAttachmentMultipart(c *fiber.Ctx) error { diff --git a/pkg/internal/services/analyzer.go b/pkg/internal/services/analyzer.go index caa28ca..adebacd 100644 --- a/pkg/internal/services/analyzer.go +++ b/pkg/internal/services/analyzer.go @@ -99,6 +99,14 @@ func AnalyzeAttachment(file models.Attachment) error { var start time.Time + if len(file.HashCode) == 0 { + if hash, err := HashAttachment(file); err != nil { + return err + } else { + file.HashCode = hash + } + } + // Do analyze jobs if !file.IsAnalyzed || len(file.HashCode) == 0 { destMap := viper.GetStringMap("destinations.temporary") @@ -158,12 +166,6 @@ func AnalyzeAttachment(file models.Attachment) error { "color_space": stream.ColorSpace, } } - - if hash, err := HashAttachment(file); err != nil { - return err - } else { - file.HashCode = hash - } } tx := database.C.Begin() diff --git a/pkg/internal/services/merger.go b/pkg/internal/services/merger.go index 24eefc2..a6821b3 100644 --- a/pkg/internal/services/merger.go +++ b/pkg/internal/services/merger.go @@ -44,5 +44,7 @@ func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachmen meta.IsUploaded = true database.C.Save(&meta) + PublishAnalyzeTask(meta) + return meta, nil } diff --git a/settings.toml b/settings.toml index 3c82f1f..35ebb20 100644 --- a/settings.toml +++ b/settings.toml @@ -30,7 +30,7 @@ access_token_duration = 300 refresh_token_duration = 2592000 [performance] -file_chunk_size = 5242880 +file_chunk_size = 26214400 [database] dsn = "host=localhost user=postgres password=password dbname=hy_paperclip port=5432 sslmode=disable" From 7a8fa116d324fbd238cda8733d77dd56340f1fe3 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Tue, 20 Aug 2024 22:55:58 +0800 Subject: [PATCH 3/3] :bug: Bug fixes and optimization --- .idea/workspace.xml | 57 ++++++++++----------- pkg/internal/server/api/up_multipart_api.go | 19 +++++-- pkg/internal/services/analyzer.go | 9 +++- pkg/internal/services/merger.go | 8 +++ pkg/internal/services/recycler.go | 2 +- 5 files changed, 59 insertions(+), 36 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 41a8899..fcee6f4 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -4,13 +4,12 @@ - + - - + - { - "keyToString": { - "DefaultGoTemplateProperty": "Go File", - "Go Build.Backend.executor": "Run", - "Go 构建.Backend.executor": "Run", - "RunOnceActivity.ShowReadmeOnStart": "true", - "RunOnceActivity.go.formatter.settings.were.checked": "true", - "RunOnceActivity.go.migrated.go.modules.settings": "true", - "RunOnceActivity.go.modules.automatic.dependencies.download": "true", - "RunOnceActivity.go.modules.go.list.on.any.changes.was.set": "true", - "git-widget-placeholder": "feature/multipart-upload", - "go.import.settings.migrated": "true", - "go.sdk.automatically.set": "true", - "last_opened_file_path": "/Users/littlesheep/Documents/Projects/Hydrogen/Paperclip/pkg/internal/grpc", - "node.js.detected.package.eslint": "true", - "node.js.selected.package.eslint": "(autodetect)", - "nodejs_package_manager_path": "npm", - "run.code.analysis.last.selected.profile": "pProject Default", - "settings.editor.selected.configurable": "preferences.pluginManager", - "vue.rearranger.settings.migration": "true" + +}]]> @@ -121,7 +120,6 @@ - @@ -146,7 +144,8 @@ - true diff --git a/pkg/internal/server/api/up_multipart_api.go b/pkg/internal/server/api/up_multipart_api.go index 8b17573..46e9985 100644 --- a/pkg/internal/server/api/up_multipart_api.go +++ b/pkg/internal/server/api/up_multipart_api.go @@ -1,10 +1,12 @@ package api import ( + "encoding/json" "fmt" "git.solsynth.dev/hydrogen/paperclip/pkg/internal/database" "git.solsynth.dev/hydrogen/paperclip/pkg/internal/gap" "git.solsynth.dev/hydrogen/paperclip/pkg/internal/models" + "git.solsynth.dev/hydrogen/paperclip/pkg/internal/server/exts" "git.solsynth.dev/hydrogen/paperclip/pkg/internal/services" "github.com/gofiber/fiber/v2" "github.com/spf13/viper" @@ -19,12 +21,17 @@ func createAttachmentMultipartPlaceholder(c *fiber.Ctx) error { var data struct { Pool string `json:"pool" validate:"required"` Size int64 `json:"size" validate:"required"` + FileName string `json:"name" validate:"required"` Alternative string `json:"alt"` MimeType string `json:"mimetype"` Metadata map[string]any `json:"metadata"` IsMature bool `json:"is_mature"` } + if err := exts.BindAndValidate(c, &data); err != nil { + return err + } + aliasingMap := viper.GetStringMapString("pools.aliases") if val, ok := aliasingMap[data.Pool]; ok { data.Pool = val @@ -42,6 +49,8 @@ func createAttachmentMultipartPlaceholder(c *fiber.Ctx) error { } metadata, err := services.NewAttachmentPlaceholder(database.C, user, models.Attachment{ + Name: data.FileName, + Size: data.Size, Alternative: data.Alternative, MimeType: data.MimeType, Metadata: data.Metadata, @@ -56,8 +65,9 @@ func createAttachmentMultipartPlaceholder(c *fiber.Ctx) error { } return c.JSON(fiber.Map{ - "chunk_size": viper.GetInt64("performance.file_chunk_size"), - "meta": metadata, + "chunk_size": viper.GetInt64("performance.file_chunk_size"), + "chunk_count": len(metadata.FileChunks), + "meta": metadata, }) } @@ -100,8 +110,9 @@ func uploadAttachmentMultipart(c *fiber.Ctx) error { if !services.CheckChunkExistsInTemporary(meta, cid) { isAllUploaded = false break - } else if val, ok := idx.(int); ok { - chunkArrange[val] = cid + } else if val, ok := idx.(json.Number); ok { + data, _ := val.Int64() + chunkArrange[data] = cid } } diff --git a/pkg/internal/services/analyzer.go b/pkg/internal/services/analyzer.go index adebacd..b1962f8 100644 --- a/pkg/internal/services/analyzer.go +++ b/pkg/internal/services/analyzer.go @@ -53,7 +53,10 @@ func ScanUnanalyzedFileFromDatabase() { } var attachments []models.Attachment - if err := database.C.Where("destination = ? OR is_analyzed = ?", models.AttachmentDstTemporary, false).Find(&attachments).Error; err != nil { + if err := database.C. + Where("is_uploaded = ?", true). + Where("destination = ? OR is_analyzed = ?", models.AttachmentDstTemporary, false). + Find(&attachments).Error; err != nil { log.Error().Err(err).Msg("Scan unanalyzed files from database failed...") return } @@ -93,7 +96,9 @@ func ScanUnanalyzedFileFromDatabase() { } func AnalyzeAttachment(file models.Attachment) error { - if file.Destination != models.AttachmentDstTemporary { + if !file.IsUploaded { + return fmt.Errorf("file isn't finish multipart upload") + } else if file.Destination != models.AttachmentDstTemporary { return fmt.Errorf("attachment isn't in temporary storage, unable to analyze") } diff --git a/pkg/internal/services/merger.go b/pkg/internal/services/merger.go index a6821b3..4111eb7 100644 --- a/pkg/internal/services/merger.go +++ b/pkg/internal/services/merger.go @@ -25,6 +25,7 @@ func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachmen } defer destFile.Close() + // Merge files for _, chunk := range arrange { chunkPath := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, chunk)) chunkFile, err := os.Open(chunkPath) @@ -41,10 +42,17 @@ func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachmen _ = chunkFile.Close() } + // Do post-upload tasks meta.IsUploaded = true database.C.Save(&meta) PublishAnalyzeTask(meta) + // Clean up + for _, chunk := range arrange { + chunkPath := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, chunk)) + _ = os.Remove(chunkPath) + } + return meta, nil } diff --git a/pkg/internal/services/recycler.go b/pkg/internal/services/recycler.go index 87ce3f5..2cd396c 100644 --- a/pkg/internal/services/recycler.go +++ b/pkg/internal/services/recycler.go @@ -64,7 +64,7 @@ func RunMarkLifecycleDeletionTask() { } func RunMarkMultipartDeletionTask() { - lifecycle := time.Now().Add(-24 * time.Hour) + lifecycle := time.Now().Add(-60 * time.Minute) tx := database.C. Where("created_at < ?", lifecycle). Where("is_uploaded = ?", false).