From 2a94bb20f8d9f1c9a72400c0a6e10547cf262b50 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Sun, 28 Jul 2024 14:04:18 +0800 Subject: [PATCH 1/9] :memo: Update README for the future features --- README.md | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f09d89d..36476f1 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,20 @@ It contains file metadata compute, instant upload, calculating hashing, multi de ## Features +Paperclip stores and processes uploaded files using a pipeline flow. +When a user uploads a file, it is first stored locally for media processing. + +Then the server will publish a message into the message queue. +And the background consumer will start dealing with the uploaded files. + +The background consumer will hash the file and merge the files with same hashcode. +The background consumer will decode the image and generate ratio and read more info from image file too. + +After the processing done. The consumer will upload the file to the persistent storage like a s3 bucket and remove local cache. +While the processing, the file record in database will marked to the temporary and load file from the temporary storage. +When the processing done, the file record will be updated. 
+ ### Supported Destinations - Local filesystem -- S3 compilable bucket \ No newline at end of file +- S3 compatible bucket -- 2.45.2 From 10879bef1409cd71f885f6151e4c3382b7b07b54 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Sun, 28 Jul 2024 21:03:56 +0800 Subject: [PATCH 2/9] :sparkles: Basic queue for processing --- README.md | 2 +- go.mod | 2 +- pkg/internal/grpc/attachments.go | 76 ----- pkg/internal/grpc/server.go | 6 +- pkg/internal/models/attachments.go | 27 +- pkg/internal/server/api/attachments_api.go | 31 +- pkg/internal/services/analyzer.go | 68 ++++ pkg/internal/services/attachments.go | 10 +- pkg/internal/services/recycler.go | 19 +- pkg/internal/services/uploader.go | 33 +- pkg/proto/attachments.pb.go | 349 --------------------- pkg/proto/attachments_grpc.pb.go | 147 --------- settings.toml | 5 +- 13 files changed, 140 insertions(+), 635 deletions(-) delete mode 100644 pkg/internal/grpc/attachments.go create mode 100644 pkg/internal/services/analyzer.go delete mode 100644 pkg/proto/attachments.pb.go delete mode 100644 pkg/proto/attachments_grpc.pb.go diff --git a/README.md b/README.md index 36476f1..e0fca7d 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ And the background consumer will start dealing with the uploaded files. The background consumer will hash the file and merge the files with same hashcode. The background consumer will decode the image and generate ratio and read more info from image file too. -After the processing done. The consumer will upload the file to the persistent storage like a s3 bucket and remove local cache. +After the processing done. The consumer will upload the file to the permanent storage like a s3 bucket and remove local cache. While the processing, the file record in database will marked to the temporary and load file from the temporary storage. When the processing done, the file record will be updated. 
diff --git a/go.mod b/go.mod index bb5ef9e..60b76e5 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/samber/lo v1.39.0 github.com/spf13/viper v1.18.2 google.golang.org/grpc v1.64.0 - google.golang.org/protobuf v1.34.2 gorm.io/datatypes v1.2.0 gorm.io/driver/postgres v1.5.4 gorm.io/gorm v1.25.6 @@ -86,6 +85,7 @@ require ( golang.org/x/sys v0.21.0 // indirect golang.org/x/text v0.16.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gorm.io/driver/mysql v1.5.2 // indirect diff --git a/pkg/internal/grpc/attachments.go b/pkg/internal/grpc/attachments.go deleted file mode 100644 index 5712a7f..0000000 --- a/pkg/internal/grpc/attachments.go +++ /dev/null @@ -1,76 +0,0 @@ -package grpc - -import ( - "context" - "fmt" - - "git.solsynth.dev/hydrogen/paperclip/pkg/internal/database" - "git.solsynth.dev/hydrogen/paperclip/pkg/proto" - "google.golang.org/protobuf/types/known/emptypb" - - "git.solsynth.dev/hydrogen/paperclip/pkg/internal/models" - jsoniter "github.com/json-iterator/go" - "github.com/samber/lo" -) - -func (v *Server) GetAttachment(ctx context.Context, request *proto.AttachmentLookupRequest) (*proto.Attachment, error) { - var attachment models.Attachment - - tx := database.C.Model(&models.Attachment{}) - if request.Id != nil { - tx = tx.Where("id = ?", request.GetId()) - } - if request.Uuid != nil { - tx = tx.Where("uuid = ?", request.GetUuid()) - } - if request.Usage != nil { - tx = tx.Where("usage = ?", request.GetUsage()) - } - - if err := tx.First(&attachment).Error; err != nil { - return nil, err - } - - rawMetadata, _ := jsoniter.Marshal(attachment.Metadata) - - if attachment.AccountID == nil { - attachment.AccountID = lo.ToPtr[uint](0) - } - - return &proto.Attachment{ - Id: uint64(attachment.ID), - Uuid: attachment.Uuid, - Size: attachment.Size, - Name: 
attachment.Name, - Alt: attachment.Alternative, - Usage: attachment.Usage, - Mimetype: attachment.MimeType, - Hash: attachment.HashCode, - Destination: attachment.Destination, - Metadata: rawMetadata, - IsMature: attachment.IsMature, - AccountId: uint64(*attachment.AccountID), - }, nil -} - -func (v *Server) CheckAttachmentExists(ctx context.Context, request *proto.AttachmentLookupRequest) (*emptypb.Empty, error) { - tx := database.C.Model(&models.Attachment{}) - if request.Id != nil { - tx = tx.Where("id = ?", request.GetId()) - } - if request.Uuid != nil { - tx = tx.Where("uuid = ?", request.GetUuid()) - } - if request.Usage != nil { - tx = tx.Where("usage = ?", request.GetUsage()) - } - - var count int64 - if err := tx.Model(&models.Attachment{}).Count(&count).Error; err != nil { - return nil, err - } else if count == 0 { - return nil, fmt.Errorf("record not found") - } - - return &emptypb.Empty{}, nil -} diff --git a/pkg/internal/grpc/server.go b/pkg/internal/grpc/server.go index f0831ac..6cb38c3 100644 --- a/pkg/internal/grpc/server.go +++ b/pkg/internal/grpc/server.go @@ -1,16 +1,15 @@ package grpc import ( - "git.solsynth.dev/hydrogen/paperclip/pkg/proto" + "net" + "github.com/spf13/viper" "google.golang.org/grpc" health "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" - "net" ) type Server struct { - proto.UnimplementedAttachmentsServer } var S *grpc.Server @@ -18,7 +17,6 @@ var S *grpc.Server func NewGRPC() { S = grpc.NewServer() - proto.RegisterAttachmentsServer(S, &Server{}) health.RegisterHealthServer(S, &Server{}) reflection.Register(S) diff --git a/pkg/internal/models/attachments.go b/pkg/internal/models/attachments.go index b13a8c9..9009870 100644 --- a/pkg/internal/models/attachments.go +++ b/pkg/internal/models/attachments.go @@ -2,21 +2,28 @@ package models import "gorm.io/datatypes" +type AttachmentDst = int8 + +const ( + AttachmentDstTemporary = AttachmentDst(iota) + AttachmentDstPermanent +) + type Attachment 
struct { BaseModel - Uuid string `json:"uuid"` - Size int64 `json:"size"` - Name string `json:"name"` - Alternative string `json:"alt"` - Usage string `json:"usage"` - MimeType string `json:"mimetype"` - HashCode string `json:"hash"` - Destination string `json:"destination"` + Uuid string `json:"uuid"` + Size int64 `json:"size"` + Name string `json:"name"` + Alternative string `json:"alt"` + Usage string `json:"usage"` + MimeType string `json:"mimetype"` + HashCode string `json:"hash"` + Destination AttachmentDst `json:"destination"` Metadata datatypes.JSONMap `json:"metadata"` IsMature bool `json:"is_mature"` - Account *Account `json:"account"` - AccountID *uint `json:"account_id"` + Account Account `json:"account"` + AccountID uint `json:"account_id"` } diff --git a/pkg/internal/server/api/attachments_api.go b/pkg/internal/server/api/attachments_api.go index 9c945c2..eb0dc61 100644 --- a/pkg/internal/server/api/attachments_api.go +++ b/pkg/internal/server/api/attachments_api.go @@ -25,17 +25,18 @@ func openAttachment(c *fiber.Ctx) error { return fiber.NewError(fiber.StatusNotFound) } - destMap := viper.GetStringMap("destinations") - dest, destOk := destMap[metadata.Destination] - if !destOk { - return fiber.NewError(fiber.StatusInternalServerError, "invalid destination: destination configuration was not found") + var destMap map[string]any + if metadata.Destination == models.AttachmentDstTemporary { + destMap = viper.GetStringMap("destinations.temporary") + } else { + destMap = viper.GetStringMap("destinations.permanent") } - var destParsed models.BaseDestination - rawDest, _ := jsoniter.Marshal(dest) - _ = jsoniter.Unmarshal(rawDest, &destParsed) + var dest models.BaseDestination + rawDest, _ := jsoniter.Marshal(destMap) + _ = jsoniter.Unmarshal(rawDest, &dest) - switch destParsed.Type { + switch dest.Type { case models.DestinationTypeLocal: var destConfigured models.LocalDestination _ = jsoniter.Unmarshal(rawDest, &destConfigured) @@ -43,7 +44,6 @@ func 
openAttachment(c *fiber.Ctx) error { c.Set(fiber.HeaderContentType, metadata.MimeType) } return c.SendFile(filepath.Join(destConfigured.Path, metadata.Uuid), false) - case models.DestinationTypeS3: var destConfigured models.S3Destination _ = jsoniter.Unmarshal(rawDest, &destConfigured) @@ -54,10 +54,9 @@ func openAttachment(c *fiber.Ctx) error { destConfigured.Bucket, destConfigured.Endpoint, url.QueryEscape(filepath.Join(destConfigured.Path, metadata.Uuid)), - )) - + ), fiber.StatusMovedPermanently) default: - return fmt.Errorf("invalid destination: unsupported protocol %s", destParsed.Type) + return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) } } @@ -79,8 +78,6 @@ func createAttachment(c *fiber.Ctx) error { } user = lo.ToPtr(c.Locals("user").(models.Account)) - destName := c.Query("destination", viper.GetString("preferred_destination")) - hash := c.FormValue("hash") if len(hash) != 64 { return fiber.NewError(fiber.StatusBadRequest, "please provide a sha-256 hash code, length should be 64 characters") @@ -110,7 +107,7 @@ func createAttachment(c *fiber.Ctx) error { MimeType: c.FormValue("mimetype"), Metadata: usermeta, IsMature: len(c.FormValue("mature")) > 0, - Destination: destName, + Destination: models.AttachmentDstTemporary, }) if err != nil { tx.Rollback() @@ -118,7 +115,7 @@ func createAttachment(c *fiber.Ctx) error { } if !linked { - if err := services.UploadFile(destName, c, file, metadata); err != nil { + if err := services.UploadFileToTemporary(c, file, metadata); err != nil { tx.Rollback() return fiber.NewError(fiber.StatusBadRequest, err.Error()) } @@ -176,7 +173,7 @@ func deleteAttachment(c *fiber.Ctx) error { attachment, err := services.GetAttachmentByID(uint(id)) if err != nil { return fiber.NewError(fiber.StatusNotFound, err.Error()) - } else if attachment.AccountID == nil || *attachment.AccountID != user.ID { + } else if attachment.AccountID != user.ID { return fiber.NewError(fiber.StatusNotFound, "record not created by 
you")
 	}
 
diff --git a/pkg/internal/services/analyzer.go b/pkg/internal/services/analyzer.go
new file mode 100644
index 0000000..81e1614
--- /dev/null
+++ b/pkg/internal/services/analyzer.go
@@ -0,0 +1,91 @@
+package services
+
+import (
+	"fmt"
+	"image"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/database"
+	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/models"
+	jsoniter "github.com/json-iterator/go"
+	"github.com/spf13/viper"
+
+	_ "image/gif"
+	_ "image/jpeg"
+	_ "image/png"
+)
+
+// fileAnalyzeQueue buffers attachments waiting for analysis (capacity 256).
+var fileAnalyzeQueue = make(chan models.Attachment, 256)
+
+// PublishAnalyzeTask enqueues file for analysis. The channel send blocks while
+// the queue is full.
+func PublishAnalyzeTask(file models.Attachment) {
+	fileAnalyzeQueue <- file
+}
+
+// AnalyzeAttachment inspects an attachment held in temporary storage and saves
+// the record. For image mimetypes it decodes the file and replaces the record's
+// metadata with the image width, height and width/height ratio.
+//
+// It returns an error when the attachment is not in temporary storage, the file
+// is missing or cannot be inspected, an image cannot be opened or decoded, or
+// the save fails.
+func AnalyzeAttachment(file models.Attachment) error {
+	if file.Destination != models.AttachmentDstTemporary {
+		return fmt.Errorf("attachment isn't in temporary storage, unable to analyze")
+	}
+
+	destMap := viper.GetStringMap("destinations.temporary")
+
+	// Round-trip the config map through JSON to populate the typed destination.
+	var dest models.LocalDestination
+	rawDest, _ := jsoniter.Marshal(destMap)
+	_ = jsoniter.Unmarshal(rawDest, &dest)
+
+	dst := filepath.Join(dest.Path, file.Uuid)
+	// os.Stat returns a nil error when the file exists and os.IsExist(nil) is
+	// false, so test the error itself rather than negating os.IsExist.
+	if _, err := os.Stat(dst); err != nil {
+		if os.IsNotExist(err) {
+			return fmt.Errorf("attachment doesn't exists in temporary storage")
+		}
+		return fmt.Errorf("unable to stat attachment in temporary storage: %w", err)
+	}
+
+	if t := strings.SplitN(file.MimeType, "/", 2)[0]; t == "image" {
+		// Dealing with image
+		reader, err := os.Open(dst)
+		if err != nil {
+			return fmt.Errorf("unable to open file: %w", err)
+		}
+		defer reader.Close()
+		im, _, err := image.Decode(reader)
+		if err != nil {
+			return fmt.Errorf("unable to decode file as an image: %w", err)
+		}
+		width := im.Bounds().Dx()
+		height := im.Bounds().Dy()
+		// Divide as floats: integer division would truncate the ratio (16:9 -> 1,
+		// portrait -> 0) and panic on a zero height.
+		var ratio float64
+		if height > 0 {
+			ratio = float64(width) / float64(height)
+		}
+		// NOTE(review): this replaces any metadata already on the record — confirm
+		// that is intended.
+		file.Metadata = map[string]any{
+			"width":  width,
+			"height": height,
+			"ratio":  ratio,
+		}
+	}
+
+	if err := database.C.Save(&file).Error; err != nil {
+		return fmt.Errorf("unable to save file record: %w", err)
+	}
+
+	return nil
+}
diff --git a/pkg/internal/services/attachments.go b/pkg/internal/services/attachments.go
index
cebae19..0a6c313 100644 --- a/pkg/internal/services/attachments.go +++ b/pkg/internal/services/attachments.go @@ -58,19 +58,13 @@ func NewAttachmentMetadata(tx *gorm.DB, user *models.Account, file *multipart.Fi exists.Metadata = attachment.Metadata attachment = exists attachment.ID = 0 - - if user != nil { - attachment.AccountID = &user.ID - } + attachment.AccountID = user.ID } else { // Upload the new file attachment.Uuid = uuid.NewString() attachment.Size = file.Size attachment.Name = file.Filename - - if user != nil { - attachment.AccountID = &user.ID - } + attachment.AccountID = user.ID // If the user didn't provide file mimetype manually, we have to detect it if len(attachment.MimeType) == 0 { diff --git a/pkg/internal/services/recycler.go b/pkg/internal/services/recycler.go index 54e21e9..10ebbb8 100644 --- a/pkg/internal/services/recycler.go +++ b/pkg/internal/services/recycler.go @@ -14,17 +14,18 @@ import ( ) func DeleteFile(meta models.Attachment) error { - destMap := viper.GetStringMap("destinations") - dest, destOk := destMap[meta.Destination] - if !destOk { - return fmt.Errorf("invalid destination: destination configuration was not found") + var destMap map[string]any + if meta.Destination == models.AttachmentDstTemporary { + destMap = viper.GetStringMap("destinations.temporary") + } else { + destMap = viper.GetStringMap("destinations.permanent") } - var destParsed models.BaseDestination - rawDest, _ := jsoniter.Marshal(dest) - _ = jsoniter.Unmarshal(rawDest, &destParsed) + var dest models.BaseDestination + rawDest, _ := jsoniter.Marshal(destMap) + _ = jsoniter.Unmarshal(rawDest, &dest) - switch destParsed.Type { + switch dest.Type { case models.DestinationTypeLocal: var destConfigured models.LocalDestination _ = jsoniter.Unmarshal(rawDest, &destConfigured) @@ -34,7 +35,7 @@ func DeleteFile(meta models.Attachment) error { _ = jsoniter.Unmarshal(rawDest, &destConfigured) return DeleteFileFromS3(destConfigured, meta) default: - return fmt.Errorf("invalid 
destination: unsupported protocol %s", destParsed.Type) + return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) } } diff --git a/pkg/internal/services/uploader.go b/pkg/internal/services/uploader.go index 22c5806..ff4027c 100644 --- a/pkg/internal/services/uploader.go +++ b/pkg/internal/services/uploader.go @@ -16,18 +16,31 @@ import ( "github.com/spf13/viper" ) -func UploadFile(destName string, ctx *fiber.Ctx, file *multipart.FileHeader, meta models.Attachment) error { - destMap := viper.GetStringMap("destinations") - dest, destOk := destMap[destName] - if !destOk { - return fmt.Errorf("invalid destination: destination configuration was not found") +func UploadFileToTemporary(ctx *fiber.Ctx, file *multipart.FileHeader, meta models.Attachment) error { + destMap := viper.GetStringMap("destinations.temporary") + + var dest models.BaseDestination + rawDest, _ := jsoniter.Marshal(destMap) + _ = jsoniter.Unmarshal(rawDest, &dest) + + switch dest.Type { + case models.DestinationTypeLocal: + var destConfigured models.LocalDestination + _ = jsoniter.Unmarshal(rawDest, &destConfigured) + return UploadFileToLocal(destConfigured, ctx, file, meta) + default: + return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) } +} - var destParsed models.BaseDestination - rawDest, _ := jsoniter.Marshal(dest) - _ = jsoniter.Unmarshal(rawDest, &destParsed) +func UploadFileToPermanent(ctx *fiber.Ctx, file *multipart.FileHeader, meta models.Attachment) error { + destMap := viper.GetStringMap("destinations.permanent") - switch destParsed.Type { + var dest models.BaseDestination + rawDest, _ := jsoniter.Marshal(destMap) + _ = jsoniter.Unmarshal(rawDest, &dest) + + switch dest.Type { case models.DestinationTypeLocal: var destConfigured models.LocalDestination _ = jsoniter.Unmarshal(rawDest, &destConfigured) @@ -37,7 +50,7 @@ func UploadFile(destName string, ctx *fiber.Ctx, file *multipart.FileHeader, met _ = jsoniter.Unmarshal(rawDest, 
&destConfigured) return UploadFileToS3(destConfigured, file, meta) default: - return fmt.Errorf("invalid destination: unsupported protocol %s", destParsed.Type) + return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) } } diff --git a/pkg/proto/attachments.pb.go b/pkg/proto/attachments.pb.go deleted file mode 100644 index dff920b..0000000 --- a/pkg/proto/attachments.pb.go +++ /dev/null @@ -1,349 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.33.0 -// protoc v5.26.1 -// source: attachments.proto - -package proto - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. 
- _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type Attachment struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Id uint64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - Uuid string `protobuf:"bytes,2,opt,name=uuid,proto3" json:"uuid,omitempty"` - Size int64 `protobuf:"varint,3,opt,name=size,proto3" json:"size,omitempty"` - Name string `protobuf:"bytes,4,opt,name=name,proto3" json:"name,omitempty"` - Alt string `protobuf:"bytes,5,opt,name=alt,proto3" json:"alt,omitempty"` - Usage string `protobuf:"bytes,6,opt,name=usage,proto3" json:"usage,omitempty"` - Mimetype string `protobuf:"bytes,7,opt,name=mimetype,proto3" json:"mimetype,omitempty"` - Hash string `protobuf:"bytes,8,opt,name=hash,proto3" json:"hash,omitempty"` - Destination string `protobuf:"bytes,9,opt,name=destination,proto3" json:"destination,omitempty"` - Metadata []byte `protobuf:"bytes,10,opt,name=metadata,proto3" json:"metadata,omitempty"` - IsMature bool `protobuf:"varint,11,opt,name=is_mature,json=isMature,proto3" json:"is_mature,omitempty"` - AccountId uint64 `protobuf:"varint,12,opt,name=account_id,json=accountId,proto3" json:"account_id,omitempty"` -} - -func (x *Attachment) Reset() { - *x = Attachment{} - if protoimpl.UnsafeEnabled { - mi := &file_attachments_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *Attachment) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*Attachment) ProtoMessage() {} - -func (x *Attachment) ProtoReflect() protoreflect.Message { - mi := &file_attachments_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use Attachment.ProtoReflect.Descriptor instead. 
-func (*Attachment) Descriptor() ([]byte, []int) { - return file_attachments_proto_rawDescGZIP(), []int{0} -} - -func (x *Attachment) GetId() uint64 { - if x != nil { - return x.Id - } - return 0 -} - -func (x *Attachment) GetUuid() string { - if x != nil { - return x.Uuid - } - return "" -} - -func (x *Attachment) GetSize() int64 { - if x != nil { - return x.Size - } - return 0 -} - -func (x *Attachment) GetName() string { - if x != nil { - return x.Name - } - return "" -} - -func (x *Attachment) GetAlt() string { - if x != nil { - return x.Alt - } - return "" -} - -func (x *Attachment) GetUsage() string { - if x != nil { - return x.Usage - } - return "" -} - -func (x *Attachment) GetMimetype() string { - if x != nil { - return x.Mimetype - } - return "" -} - -func (x *Attachment) GetHash() string { - if x != nil { - return x.Hash - } - return "" -} - -func (x *Attachment) GetDestination() string { - if x != nil { - return x.Destination - } - return "" -} - -func (x *Attachment) GetMetadata() []byte { - if x != nil { - return x.Metadata - } - return nil -} - -func (x *Attachment) GetIsMature() bool { - if x != nil { - return x.IsMature - } - return false -} - -func (x *Attachment) GetAccountId() uint64 { - if x != nil { - return x.AccountId - } - return 0 -} - -type AttachmentLookupRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Id *uint64 `protobuf:"varint,1,opt,name=id,proto3,oneof" json:"id,omitempty"` - Uuid *string `protobuf:"bytes,2,opt,name=uuid,proto3,oneof" json:"uuid,omitempty"` - Usage *string `protobuf:"bytes,3,opt,name=usage,proto3,oneof" json:"usage,omitempty"` -} - -func (x *AttachmentLookupRequest) Reset() { - *x = AttachmentLookupRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_attachments_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *AttachmentLookupRequest) String() string { - return 
protoimpl.X.MessageStringOf(x) -} - -func (*AttachmentLookupRequest) ProtoMessage() {} - -func (x *AttachmentLookupRequest) ProtoReflect() protoreflect.Message { - mi := &file_attachments_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use AttachmentLookupRequest.ProtoReflect.Descriptor instead. -func (*AttachmentLookupRequest) Descriptor() ([]byte, []int) { - return file_attachments_proto_rawDescGZIP(), []int{1} -} - -func (x *AttachmentLookupRequest) GetId() uint64 { - if x != nil && x.Id != nil { - return *x.Id - } - return 0 -} - -func (x *AttachmentLookupRequest) GetUuid() string { - if x != nil && x.Uuid != nil { - return *x.Uuid - } - return "" -} - -func (x *AttachmentLookupRequest) GetUsage() string { - if x != nil && x.Usage != nil { - return *x.Usage - } - return "" -} - -var File_attachments_proto protoreflect.FileDescriptor - -var file_attachments_proto_rawDesc = []byte{ - 0x0a, 0x11, 0x61, 0x74, 0x74, 0x61, 0x63, 0x68, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, - 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xaa, 0x02, 0x0a, 0x0a, 0x41, 0x74, 0x74, 0x61, - 0x63, 0x68, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x04, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x75, 0x75, 0x69, 0x64, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x75, 0x75, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, - 0x7a, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x12, 0x12, - 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, - 0x6d, 0x65, 0x12, 
0x10, 0x0a, 0x03, 0x61, 0x6c, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x03, 0x61, 0x6c, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, 0x18, 0x06, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x6d, 0x69, - 0x6d, 0x65, 0x74, 0x79, 0x70, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6d, 0x69, - 0x6d, 0x65, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18, 0x08, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x12, 0x20, 0x0a, 0x0b, 0x64, 0x65, - 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0b, 0x64, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, - 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, - 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x73, 0x5f, 0x6d, - 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x69, 0x73, 0x4d, - 0x61, 0x74, 0x75, 0x72, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x63, 0x63, 0x6f, 0x75, 0x6e, 0x74, - 0x5f, 0x69, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x61, 0x63, 0x63, 0x6f, 0x75, - 0x6e, 0x74, 0x49, 0x64, 0x22, 0x7c, 0x0a, 0x17, 0x41, 0x74, 0x74, 0x61, 0x63, 0x68, 0x6d, 0x65, - 0x6e, 0x74, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x13, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x48, 0x00, 0x52, 0x02, 0x69, - 0x64, 0x88, 0x01, 0x01, 0x12, 0x17, 0x0a, 0x04, 0x75, 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x48, 0x01, 0x52, 0x04, 0x75, 0x75, 0x69, 0x64, 0x88, 0x01, 0x01, 0x12, 0x19, 0x0a, - 0x05, 0x75, 0x73, 0x61, 0x67, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x02, 0x52, 0x05, - 0x75, 0x73, 0x61, 0x67, 0x65, 0x88, 0x01, 0x01, 0x42, 0x05, 0x0a, 0x03, 0x5f, 0x69, 0x64, 0x42, - 0x07, 0x0a, 0x05, 0x5f, 0x75, 0x75, 0x69, 0x64, 0x42, 
0x08, 0x0a, 0x06, 0x5f, 0x75, 0x73, 0x61, - 0x67, 0x65, 0x32, 0xa6, 0x01, 0x0a, 0x0b, 0x41, 0x74, 0x74, 0x61, 0x63, 0x68, 0x6d, 0x65, 0x6e, - 0x74, 0x73, 0x12, 0x44, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x41, 0x74, 0x74, 0x61, 0x63, 0x68, 0x6d, - 0x65, 0x6e, 0x74, 0x12, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x74, 0x74, 0x61, - 0x63, 0x68, 0x6d, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x74, 0x74, 0x61, - 0x63, 0x68, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x12, 0x51, 0x0a, 0x15, 0x43, 0x68, 0x65, 0x63, - 0x6b, 0x41, 0x74, 0x74, 0x61, 0x63, 0x68, 0x6d, 0x65, 0x6e, 0x74, 0x45, 0x78, 0x69, 0x73, 0x74, - 0x73, 0x12, 0x1e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x74, 0x74, 0x61, 0x63, 0x68, - 0x6d, 0x65, 0x6e, 0x74, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x09, 0x5a, 0x07, 0x2e, - 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_attachments_proto_rawDescOnce sync.Once - file_attachments_proto_rawDescData = file_attachments_proto_rawDesc -) - -func file_attachments_proto_rawDescGZIP() []byte { - file_attachments_proto_rawDescOnce.Do(func() { - file_attachments_proto_rawDescData = protoimpl.X.CompressGZIP(file_attachments_proto_rawDescData) - }) - return file_attachments_proto_rawDescData -} - -var file_attachments_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_attachments_proto_goTypes = []interface{}{ - (*Attachment)(nil), // 0: proto.Attachment - (*AttachmentLookupRequest)(nil), // 1: proto.AttachmentLookupRequest - (*emptypb.Empty)(nil), // 2: google.protobuf.Empty -} -var file_attachments_proto_depIdxs = []int32{ - 1, // 0: proto.Attachments.GetAttachment:input_type -> 
proto.AttachmentLookupRequest - 1, // 1: proto.Attachments.CheckAttachmentExists:input_type -> proto.AttachmentLookupRequest - 0, // 2: proto.Attachments.GetAttachment:output_type -> proto.Attachment - 2, // 3: proto.Attachments.CheckAttachmentExists:output_type -> google.protobuf.Empty - 2, // [2:4] is the sub-list for method output_type - 0, // [0:2] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name -} - -func init() { file_attachments_proto_init() } -func file_attachments_proto_init() { - if File_attachments_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_attachments_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Attachment); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_attachments_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AttachmentLookupRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - file_attachments_proto_msgTypes[1].OneofWrappers = []interface{}{} - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_attachments_proto_rawDesc, - NumEnums: 0, - NumMessages: 2, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_attachments_proto_goTypes, - DependencyIndexes: file_attachments_proto_depIdxs, - MessageInfos: file_attachments_proto_msgTypes, - }.Build() - File_attachments_proto = out.File - file_attachments_proto_rawDesc = nil - file_attachments_proto_goTypes = nil - file_attachments_proto_depIdxs = nil -} diff --git a/pkg/proto/attachments_grpc.pb.go b/pkg/proto/attachments_grpc.pb.go deleted file mode 100644 index 
fa298ee..0000000 --- a/pkg/proto/attachments_grpc.pb.go +++ /dev/null @@ -1,147 +0,0 @@ -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. -// versions: -// - protoc-gen-go-grpc v1.3.0 -// - protoc v5.26.1 -// source: attachments.proto - -package proto - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - emptypb "google.golang.org/protobuf/types/known/emptypb" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -const ( - Attachments_GetAttachment_FullMethodName = "/proto.Attachments/GetAttachment" - Attachments_CheckAttachmentExists_FullMethodName = "/proto.Attachments/CheckAttachmentExists" -) - -// AttachmentsClient is the client API for Attachments service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type AttachmentsClient interface { - GetAttachment(ctx context.Context, in *AttachmentLookupRequest, opts ...grpc.CallOption) (*Attachment, error) - CheckAttachmentExists(ctx context.Context, in *AttachmentLookupRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) -} - -type attachmentsClient struct { - cc grpc.ClientConnInterface -} - -func NewAttachmentsClient(cc grpc.ClientConnInterface) AttachmentsClient { - return &attachmentsClient{cc} -} - -func (c *attachmentsClient) GetAttachment(ctx context.Context, in *AttachmentLookupRequest, opts ...grpc.CallOption) (*Attachment, error) { - out := new(Attachment) - err := c.cc.Invoke(ctx, Attachments_GetAttachment_FullMethodName, in, out, opts...) 
- if err != nil { - return nil, err - } - return out, nil -} - -func (c *attachmentsClient) CheckAttachmentExists(ctx context.Context, in *AttachmentLookupRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { - out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, Attachments_CheckAttachmentExists_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// AttachmentsServer is the server API for Attachments service. -// All implementations must embed UnimplementedAttachmentsServer -// for forward compatibility -type AttachmentsServer interface { - GetAttachment(context.Context, *AttachmentLookupRequest) (*Attachment, error) - CheckAttachmentExists(context.Context, *AttachmentLookupRequest) (*emptypb.Empty, error) - mustEmbedUnimplementedAttachmentsServer() -} - -// UnimplementedAttachmentsServer must be embedded to have forward compatible implementations. -type UnimplementedAttachmentsServer struct { -} - -func (UnimplementedAttachmentsServer) GetAttachment(context.Context, *AttachmentLookupRequest) (*Attachment, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetAttachment not implemented") -} -func (UnimplementedAttachmentsServer) CheckAttachmentExists(context.Context, *AttachmentLookupRequest) (*emptypb.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method CheckAttachmentExists not implemented") -} -func (UnimplementedAttachmentsServer) mustEmbedUnimplementedAttachmentsServer() {} - -// UnsafeAttachmentsServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to AttachmentsServer will -// result in compilation errors. 
-type UnsafeAttachmentsServer interface { - mustEmbedUnimplementedAttachmentsServer() -} - -func RegisterAttachmentsServer(s grpc.ServiceRegistrar, srv AttachmentsServer) { - s.RegisterService(&Attachments_ServiceDesc, srv) -} - -func _Attachments_GetAttachment_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(AttachmentLookupRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(AttachmentsServer).GetAttachment(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Attachments_GetAttachment_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(AttachmentsServer).GetAttachment(ctx, req.(*AttachmentLookupRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Attachments_CheckAttachmentExists_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(AttachmentLookupRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(AttachmentsServer).CheckAttachmentExists(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: Attachments_CheckAttachmentExists_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(AttachmentsServer).CheckAttachmentExists(ctx, req.(*AttachmentLookupRequest)) - } - return interceptor(ctx, in, info, handler) -} - -// Attachments_ServiceDesc is the grpc.ServiceDesc for Attachments service. 
-// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var Attachments_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "proto.Attachments", - HandlerType: (*AttachmentsServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "GetAttachment", - Handler: _Attachments_GetAttachment_Handler, - }, - { - MethodName: "CheckAttachmentExists", - Handler: _Attachments_CheckAttachmentExists_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "attachments.proto", -} diff --git a/settings.toml b/settings.toml index 46313dc..fc75ae9 100644 --- a/settings.toml +++ b/settings.toml @@ -5,7 +5,6 @@ grpc_bind = "0.0.0.0:7443" domain = "usercontent.solsynth.dev" secret = "LtTjzAGFLshwXhN4ZD4nG5KlMv1MWcsvfv03TSZYnT1VhiAnLIZFTnHUwR0XhGgi" -preferred_destination = "local" accepts_usage = ["p.avatar", "p.banner", "i.attachment", "m.attachment"] [debug] @@ -25,11 +24,11 @@ refresh_token_duration = 2592000 dsn = "host=localhost user=postgres password=password dbname=hy_paperclip port=5432 sslmode=disable" prefix = "paperclip_" -[destinations.local] +[destinations.temporary] type = "local" path = "uploads" -[destinations.s3] +[destinations.permanent] type = "s3" bucket = "bucket" endpoint = "s3.ap-east-1.amazonaws.com" -- 2.45.2 From 020e59234e08d5fc21a93967181afe5301dee963 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Sun, 28 Jul 2024 23:16:46 +0800 Subject: [PATCH 3/9] :sparkles: Reupload file to permanent storage --- pkg/internal/services/uploader.go | 56 +++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/pkg/internal/services/uploader.go b/pkg/internal/services/uploader.go index ff4027c..5f27dd4 100644 --- a/pkg/internal/services/uploader.go +++ b/pkg/internal/services/uploader.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "mime/multipart" + "os" "path/filepath" "git.solsynth.dev/hydrogen/paperclip/pkg/internal/models" @@ -33,22 +34,71 @@ func 
UploadFileToTemporary(ctx *fiber.Ctx, file *multipart.FileHeader, meta mode } } -func UploadFileToPermanent(ctx *fiber.Ctx, file *multipart.FileHeader, meta models.Attachment) error { +func ReUploadFileToPermanent(meta models.Attachment) error { + if meta.Destination != models.AttachmentDstTemporary { + return fmt.Errorf("attachment isn't in temporary storage, unable to process") + } + destMap := viper.GetStringMap("destinations.permanent") var dest models.BaseDestination rawDest, _ := jsoniter.Marshal(destMap) _ = jsoniter.Unmarshal(rawDest, &dest) + prevDestMap := viper.GetStringMap("destinations.temporary") + + // Currently the temporary destination only support the local + // So we can do this + var prevDest models.LocalDestination + prevRawDest, _ := jsoniter.Marshal(prevDestMap) + _ = jsoniter.Unmarshal(prevRawDest, &prevDest) + + in, err := os.Open(filepath.Join(prevDest.Path, meta.Uuid)) + if err != nil { + return fmt.Errorf("unable to open file in temporary storage: %v", err) + } + defer in.Close() + switch dest.Type { case models.DestinationTypeLocal: var destConfigured models.LocalDestination _ = jsoniter.Unmarshal(rawDest, &destConfigured) - return UploadFileToLocal(destConfigured, ctx, file, meta) + + out, err := os.Create(filepath.Join(destConfigured.Path, meta.Uuid)) + if err != nil { + return fmt.Errorf("unable to open dest file: %v", err) + } + defer out.Close() + + _, err = io.Copy(out, in) + if err != nil { + return fmt.Errorf("unable to copy data to dest file: %v", err) + } + return nil case models.DestinationTypeS3: var destConfigured models.S3Destination _ = jsoniter.Unmarshal(rawDest, &destConfigured) - return UploadFileToS3(destConfigured, file, meta) + + buffer := bytes.NewBuffer(nil) + if _, err := io.Copy(buffer, in); err != nil { + return fmt.Errorf("create io reader for upload file: %v", err) + } + + client, err := minio.New(destConfigured.Endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(destConfigured.SecretID, 
destConfigured.SecretKey, ""), + Secure: destConfigured.EnableSSL, + }) + if err != nil { + return fmt.Errorf("unable to configure s3 client: %v", err) + } + + _, err = client.PutObject(context.Background(), destConfigured.Bucket, filepath.Join(destConfigured.Path, meta.Uuid), buffer, -1, minio.PutObjectOptions{ + ContentType: meta.MimeType, + }) + if err != nil { + return fmt.Errorf("unable to upload file to s3: %v", err) + } + return nil default: return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) } -- 2.45.2 From 089a9ecd9df96c7e679f062097c61e0700843d77 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Mon, 29 Jul 2024 00:01:51 +0800 Subject: [PATCH 4/9] :sparkles: Processing files in background --- pkg/internal/models/attachments.go | 4 + pkg/internal/server/api/attachments_api.go | 17 ++- pkg/internal/services/analyzer.go | 66 +++++++++++- pkg/internal/services/attachments.go | 119 ++++++++++++--------- 4 files changed, 145 insertions(+), 61 deletions(-) diff --git a/pkg/internal/models/attachments.go b/pkg/internal/models/attachments.go index 9009870..bd5d5b0 100644 --- a/pkg/internal/models/attachments.go +++ b/pkg/internal/models/attachments.go @@ -20,10 +20,14 @@ type Attachment struct { MimeType string `json:"mimetype"` HashCode string `json:"hash"` Destination AttachmentDst `json:"destination"` + RefCount int `json:"ref_count"` Metadata datatypes.JSONMap `json:"metadata"` IsMature bool `json:"is_mature"` + Ref *Attachment `json:"ref"` + RefID *uint `json:"ref_id"` + Account Account `json:"account"` AccountID uint `json:"account_id"` } diff --git a/pkg/internal/server/api/attachments_api.go b/pkg/internal/server/api/attachments_api.go index eb0dc61..af5f15a 100644 --- a/pkg/internal/server/api/attachments_api.go +++ b/pkg/internal/server/api/attachments_api.go @@ -78,10 +78,6 @@ func createAttachment(c *fiber.Ctx) error { } user = lo.ToPtr(c.Locals("user").(models.Account)) - hash := c.FormValue("hash") - if len(hash) != 64 { - 
return fiber.NewError(fiber.StatusBadRequest, "please provide a sha-256 hash code, length should be 64 characters") - } usage := c.FormValue("usage") if !lo.Contains(viper.GetStringSlice("accepts_usage"), usage) { return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("disallowed usage: %s", usage)) @@ -100,9 +96,8 @@ func createAttachment(c *fiber.Ctx) error { _ = jsoniter.UnmarshalFromString(c.FormValue("metadata"), &usermeta) tx := database.C.Begin() - metadata, linked, err := services.NewAttachmentMetadata(tx, user, file, models.Attachment{ + metadata, err := services.NewAttachmentMetadata(tx, user, file, models.Attachment{ Usage: usage, - HashCode: hash, Alternative: c.FormValue("alt"), MimeType: c.FormValue("mimetype"), Metadata: usermeta, @@ -114,15 +109,15 @@ func createAttachment(c *fiber.Ctx) error { return fiber.NewError(fiber.StatusBadRequest, err.Error()) } - if !linked { - if err := services.UploadFileToTemporary(c, file, metadata); err != nil { - tx.Rollback() - return fiber.NewError(fiber.StatusBadRequest, err.Error()) - } + if err := services.UploadFileToTemporary(c, file, metadata); err != nil { + tx.Rollback() + return fiber.NewError(fiber.StatusBadRequest, err.Error()) } tx.Commit() + services.PublishAnalyzeTask(metadata) + return c.JSON(metadata) } diff --git a/pkg/internal/services/analyzer.go b/pkg/internal/services/analyzer.go index 81e1614..5a7cbfd 100644 --- a/pkg/internal/services/analyzer.go +++ b/pkg/internal/services/analyzer.go @@ -1,8 +1,11 @@ package services import ( + "crypto/sha256" + "encoding/hex" "fmt" "image" + "io" "os" "path/filepath" "strings" @@ -60,9 +63,68 @@ func AnalyzeAttachment(file models.Attachment) error { } } - if err := database.C.Save(&file).Error; err != nil { - return fmt.Errorf("unable to save file record: %v", err) + if hash, err := HashAttachment(file); err != nil { + return err + } else { + file.HashCode = hash } + tx := database.C.Begin() + + linked, err := TryLinkAttachment(tx, file, file.HashCode) 
+ if linked && err != nil { + return fmt.Errorf("unable to link file record: %v", err) + } else if !linked { + if err := tx.Save(&file); err != nil { + tx.Rollback() + return fmt.Errorf("unable to save file record: %v", err) + } + } + + if !linked { + if err := ReUploadFileToPermanent(file); err != nil { + tx.Rollback() + return fmt.Errorf("unable to move file to permanet storage: %v", err) + } + } + + tx.Commit() + return nil } + +func HashAttachment(file models.Attachment) (hash string, err error) { + if file.Destination != models.AttachmentDstTemporary { + err = fmt.Errorf("attachment isn't in temporary storage, unable to hash") + return + } + + destMap := viper.GetStringMap("destinations.temporary") + + var dest models.LocalDestination + rawDest, _ := jsoniter.Marshal(destMap) + _ = jsoniter.Unmarshal(rawDest, &dest) + + dst := filepath.Join(dest.Path, file.Uuid) + if _, err = os.Stat(dst); !os.IsExist(err) { + err = fmt.Errorf("attachment doesn't exists in temporary storage") + return + } + + var in *os.File + in, err = os.Open("file.txt") + if err != nil { + err = fmt.Errorf("unable to open file: %v", err) + return + } + defer in.Close() + + hasher := sha256.New() + if _, err = io.Copy(hasher, in); err != nil { + err = fmt.Errorf("unable to hash: %v", err) + return + } + + hash = hex.EncodeToString(hasher.Sum(nil)) + return +} diff --git a/pkg/internal/services/attachments.go b/pkg/internal/services/attachments.go index 0a6c313..0af79bf 100644 --- a/pkg/internal/services/attachments.go +++ b/pkg/internal/services/attachments.go @@ -48,50 +48,37 @@ func GetAttachmentByHash(hash string) (models.Attachment, error) { return attachment, nil } -func NewAttachmentMetadata(tx *gorm.DB, user *models.Account, file *multipart.FileHeader, attachment models.Attachment) (models.Attachment, bool, error) { - linked := false - exists, pickupErr := GetAttachmentByHash(attachment.HashCode) - if pickupErr == nil { - linked = true - exists.Alternative = attachment.Alternative - 
exists.Usage = attachment.Usage - exists.Metadata = attachment.Metadata - attachment = exists - attachment.ID = 0 - attachment.AccountID = user.ID - } else { - // Upload the new file - attachment.Uuid = uuid.NewString() - attachment.Size = file.Size - attachment.Name = file.Filename - attachment.AccountID = user.ID +func NewAttachmentMetadata(tx *gorm.DB, user *models.Account, file *multipart.FileHeader, attachment models.Attachment) (models.Attachment, error) { + attachment.Uuid = uuid.NewString() + attachment.Size = file.Size + attachment.Name = file.Filename + attachment.AccountID = user.ID - // If the user didn't provide file mimetype manually, we have to detect it - if len(attachment.MimeType) == 0 { - if ext := filepath.Ext(attachment.Name); len(ext) > 0 { - // Detect mimetype by file extensions - attachment.MimeType = mime.TypeByExtension(ext) - } else { - // Detect mimetype by file header - // This method as a fallback method, because this isn't pretty accurate - header, err := file.Open() - if err != nil { - return attachment, false, fmt.Errorf("failed to read file header: %v", err) - } - defer header.Close() - - fileHeader := make([]byte, 512) - _, err = header.Read(fileHeader) - if err != nil { - return attachment, false, err - } - attachment.MimeType = http.DetectContentType(fileHeader) + // If the user didn't provide file mimetype manually, we have to detect it + if len(attachment.MimeType) == 0 { + if ext := filepath.Ext(attachment.Name); len(ext) > 0 { + // Detect mimetype by file extensions + attachment.MimeType = mime.TypeByExtension(ext) + } else { + // Detect mimetype by file header + // This method as a fallback method, because this isn't pretty accurate + header, err := file.Open() + if err != nil { + return attachment, fmt.Errorf("failed to read file header: %v", err) } + defer header.Close() + + fileHeader := make([]byte, 512) + _, err = header.Read(fileHeader) + if err != nil { + return attachment, err + } + attachment.MimeType = 
http.DetectContentType(fileHeader) } } if err := tx.Save(&attachment).Error; err != nil { - return attachment, linked, fmt.Errorf("failed to save attachment record: %v", err) + return attachment, fmt.Errorf("failed to save attachment record: %v", err) } else { if len(metadataCache) > metadataCacheLimit { clear(metadataCache) @@ -99,7 +86,32 @@ func NewAttachmentMetadata(tx *gorm.DB, user *models.Account, file *multipart.Fi metadataCache[attachment.ID] = attachment } - return attachment, linked, nil + return attachment, nil +} + +func TryLinkAttachment(tx *gorm.DB, og models.Attachment, hash string) (bool, error) { + prev, err := GetAttachmentByHash(hash) + if err != nil { + return false, err + } + + prev.RefCount++ + og.RefID = &prev.ID + og.Uuid = prev.Uuid + og.Destination = prev.Destination + + if err := tx.Save(&og).Error; err != nil { + tx.Rollback() + return true, err + } else if err = tx.Save(&prev).Error; err != nil { + tx.Rollback() + return true, err + } + + metadataCache[prev.ID] = prev + metadataCache[og.ID] = og + + return true, nil } func UpdateAttachment(item models.Attachment) (models.Attachment, error) { @@ -116,22 +128,33 @@ func UpdateAttachment(item models.Attachment) (models.Attachment, error) { } func DeleteAttachment(item models.Attachment) error { - var dupeCount int64 - if err := database.C. - Where(&models.Attachment{HashCode: item.HashCode}). - Model(&models.Attachment{}). 
- Count(&dupeCount).Error; err != nil { - dupeCount = -1 - } + dat := item + tx := database.C.Begin() + + if item.RefID != nil { + var refTarget models.Attachment + if err := database.C.Where(models.Attachment{ + BaseModel: models.BaseModel{ID: *item.RefID}, + }).First(&refTarget).Error; err == nil { + refTarget.RefCount-- + if err := tx.Save(&refTarget).Error; err != nil { + tx.Rollback() + return fmt.Errorf("unable to update ref count: %v", err) + } + } + } if err := database.C.Delete(&item).Error; err != nil { + tx.Rollback() return err } else { delete(metadataCache, item.ID) } - if dupeCount != -1 && dupeCount <= 1 { - return DeleteFile(item) + tx.Commit() + + if dat.RefCount == 0 { + return DeleteFile(dat) } return nil -- 2.45.2 From 82cb45ec5331f883c3e3142d87f9fb18adc81f7c Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Mon, 29 Jul 2024 00:53:40 +0800 Subject: [PATCH 5/9] :sparkles: File operations queue --- pkg/internal/services/analyzer.go | 10 ++++++++++ pkg/internal/services/attachments.go | 2 +- pkg/internal/services/recycler.go | 16 ++++++++++++++++ pkg/main.go | 8 ++++++++ settings.toml | 4 ++++ 5 files changed, 39 insertions(+), 1 deletion(-) diff --git a/pkg/internal/services/analyzer.go b/pkg/internal/services/analyzer.go index 5a7cbfd..d1160e8 100644 --- a/pkg/internal/services/analyzer.go +++ b/pkg/internal/services/analyzer.go @@ -13,6 +13,7 @@ import ( "git.solsynth.dev/hydrogen/paperclip/pkg/internal/database" "git.solsynth.dev/hydrogen/paperclip/pkg/internal/models" jsoniter "github.com/json-iterator/go" + "github.com/rs/zerolog/log" "github.com/spf13/viper" _ "image/gif" @@ -26,6 +27,15 @@ func PublishAnalyzeTask(file models.Attachment) { fileAnalyzeQueue <- file } +func StartConsumeAnalyzeTask() { + for { + task := <-fileAnalyzeQueue + if err := AnalyzeAttachment(task); err != nil { + log.Error().Err(err).Any("task", task).Msg("A file analyze task failed...") + } + } +} + func AnalyzeAttachment(file models.Attachment) error { if 
file.Destination != models.AttachmentDstTemporary { return fmt.Errorf("attachment isn't in temporary storage, unable to analyze") diff --git a/pkg/internal/services/attachments.go b/pkg/internal/services/attachments.go index 0af79bf..51bd62c 100644 --- a/pkg/internal/services/attachments.go +++ b/pkg/internal/services/attachments.go @@ -154,7 +154,7 @@ func DeleteAttachment(item models.Attachment) error { tx.Commit() if dat.RefCount == 0 { - return DeleteFile(dat) + PublishDeleteFileTask(dat) } return nil diff --git a/pkg/internal/services/recycler.go b/pkg/internal/services/recycler.go index 10ebbb8..bc92e10 100644 --- a/pkg/internal/services/recycler.go +++ b/pkg/internal/services/recycler.go @@ -10,9 +10,25 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/rs/zerolog/log" "github.com/spf13/viper" ) +var fileDeletionQueue = make(chan models.Attachment, 256) + +func PublishDeleteFileTask(file models.Attachment) { + fileDeletionQueue <- file +} + +func StartConsumeDeletionTask() { + for { + task := <-fileDeletionQueue + if err := DeleteFile(task); err != nil { + log.Error().Err(err).Any("task", task).Msg("A file deletion task failed...") + } + } +} + func DeleteFile(meta models.Attachment) error { var destMap map[string]any if meta.Destination == models.AttachmentDstTemporary { diff --git a/pkg/main.go b/pkg/main.go index 61e61db..6b9bdd2 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -48,6 +48,14 @@ func main() { log.Error().Err(err).Msg("An error occurred when registering service to dealer...") } + // Setup some workers + for idx := 0; idx < viper.GetInt("workers.files_deletion"); idx++ { + go services.StartConsumeDeletionTask() + } + for idx := 0; idx < viper.GetInt("workers.files_analyze"); idx++ { + go services.StartConsumeAnalyzeTask() + } + // Configure timed tasks quartz := cron.New(cron.WithLogger(cron.VerbosePrintfLogger(&log.Logger))) quartz.AddFunc("@every 60m", 
services.DoAutoDatabaseCleanup) diff --git a/settings.toml b/settings.toml index fc75ae9..b46da6f 100644 --- a/settings.toml +++ b/settings.toml @@ -7,6 +7,10 @@ secret = "LtTjzAGFLshwXhN4ZD4nG5KlMv1MWcsvfv03TSZYnT1VhiAnLIZFTnHUwR0XhGgi" accepts_usage = ["p.avatar", "p.banner", "i.attachment", "m.attachment"] +[workers] +files_deletion = 4 +files_analyze = 4 + [debug] database = false print_routes = false -- 2.45.2 From 8f08d85fb15a33d8ce4588895e6770724ff40102 Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Mon, 29 Jul 2024 01:47:33 +0800 Subject: [PATCH 6/9] :rocket: Fix everything and ready to launch! --- .idea/workspace.xml | 11 +------- pkg/internal/server/api/attachments_api.go | 1 + pkg/internal/services/analyzer.go | 17 +++++++------ pkg/internal/services/recycler.go | 4 +++ pkg/internal/services/uploader.go | 29 +++++++++++----------- 5 files changed, 31 insertions(+), 31 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 9a9a347..d3a3de4 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -5,15 +5,6 @@ - - - - - - - - -