支援分片上传 #3

Merged
LittleSheep merged 3 commits from feature/multipart-upload into master 2024-08-20 14:57:06 +00:00
14 changed files with 433 additions and 153 deletions

View File

@ -4,10 +4,12 @@
<option name="autoReloadType" value="ALL" /> <option name="autoReloadType" value="ALL" />
</component> </component>
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="18dd0d68-b4b8-40db-9734-9119b5c848bd" name="更改" comment=":bug: Fix crash on maintain cache"> <list default="true" id="18dd0d68-b4b8-40db-9734-9119b5c848bd" name="更改" comment=":sparkles: Make it more implementable">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pkg/internal/models/pools.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/models/pools.go" afterDir="false" /> <change beforePath="$PROJECT_DIR$/pkg/internal/server/api/up_multipart_api.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/server/api/up_multipart_api.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pkg/internal/server/api/attachment_dir_api.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/server/api/index_api.go" afterDir="false" /> <change beforePath="$PROJECT_DIR$/pkg/internal/services/analyzer.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/services/analyzer.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pkg/internal/services/merger.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/services/merger.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pkg/internal/services/recycler.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/services/recycler.go" afterDir="false" />
</list> </list>
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" /> <option name="HIGHLIGHT_CONFLICTS" value="true" />
@ -23,10 +25,15 @@
</component> </component>
<component name="GOROOT" url="file:///opt/homebrew/opt/go/libexec" /> <component name="GOROOT" url="file:///opt/homebrew/opt/go/libexec" />
<component name="Git.Settings"> <component name="Git.Settings">
<option name="RECENT_BRANCH_BY_REPOSITORY">
<map>
<entry key="$PROJECT_DIR$" value="master" />
</map>
</option>
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" /> <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component> </component>
<component name="ProblemsViewState"> <component name="ProblemsViewState">
<option name="selectedTabId" value="ProjectErrors" /> <option name="selectedTabId" value="CurrentFile" />
</component> </component>
<component name="ProjectColorInfo">{ <component name="ProjectColorInfo">{
&quot;customColor&quot;: &quot;&quot;, &quot;customColor&quot;: &quot;&quot;,
@ -37,33 +44,33 @@
<option name="hideEmptyMiddlePackages" value="true" /> <option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" /> <option name="showLibraryContents" value="true" />
</component> </component>
<component name="PropertiesComponent">{ <component name="PropertiesComponent"><![CDATA[{
&quot;keyToString&quot;: { "keyToString": {
&quot;DefaultGoTemplateProperty&quot;: &quot;Go File&quot;, "DefaultGoTemplateProperty": "Go File",
&quot;Go Build.Backend.executor&quot;: &quot;Run&quot;, "Go Build.Backend.executor": "Run",
&quot;Go 构建.Backend.executor&quot;: &quot;Run&quot;, "Go 构建.Backend.executor": "Run",
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;, "RunOnceActivity.ShowReadmeOnStart": "true",
&quot;RunOnceActivity.go.formatter.settings.were.checked&quot;: &quot;true&quot;, "RunOnceActivity.go.formatter.settings.were.checked": "true",
&quot;RunOnceActivity.go.migrated.go.modules.settings&quot;: &quot;true&quot;, "RunOnceActivity.go.migrated.go.modules.settings": "true",
&quot;RunOnceActivity.go.modules.automatic.dependencies.download&quot;: &quot;true&quot;, "RunOnceActivity.go.modules.automatic.dependencies.download": "true",
&quot;RunOnceActivity.go.modules.go.list.on.any.changes.was.set&quot;: &quot;true&quot;, "RunOnceActivity.go.modules.go.list.on.any.changes.was.set": "true",
&quot;git-widget-placeholder&quot;: &quot;master&quot;, "git-widget-placeholder": "feature/multipart-upload",
&quot;go.import.settings.migrated&quot;: &quot;true&quot;, "go.import.settings.migrated": "true",
&quot;go.sdk.automatically.set&quot;: &quot;true&quot;, "go.sdk.automatically.set": "true",
&quot;last_opened_file_path&quot;: &quot;/Users/littlesheep/Documents/Projects/Hydrogen/Paperclip/pkg/internal/grpc&quot;, "last_opened_file_path": "/Users/littlesheep/Documents/Projects/Hydrogen/Paperclip/pkg/internal/grpc",
&quot;node.js.detected.package.eslint&quot;: &quot;true&quot;, "node.js.detected.package.eslint": "true",
&quot;node.js.selected.package.eslint&quot;: &quot;(autodetect)&quot;, "node.js.selected.package.eslint": "(autodetect)",
&quot;nodejs_package_manager_path&quot;: &quot;npm&quot;, "nodejs_package_manager_path": "npm",
&quot;run.code.analysis.last.selected.profile&quot;: &quot;pProject Default&quot;, "run.code.analysis.last.selected.profile": "pProject Default",
&quot;settings.editor.selected.configurable&quot;: &quot;preferences.pluginManager&quot;, "settings.editor.selected.configurable": "preferences.pluginManager",
&quot;vue.rearranger.settings.migration&quot;: &quot;true&quot; "vue.rearranger.settings.migration": "true"
}, },
&quot;keyToStringList&quot;: { "keyToStringList": {
&quot;DatabaseDriversLRU&quot;: [ "DatabaseDriversLRU": [
&quot;postgresql&quot; "postgresql"
] ]
} }
}</component> }]]></component>
<component name="RecentsManager"> <component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS"> <key name="CopyFile.RECENT_KEYS">
<recent name="$PROJECT_DIR$/pkg/internal/grpc" /> <recent name="$PROJECT_DIR$/pkg/internal/grpc" />
@ -113,9 +120,6 @@
</option> </option>
</component> </component>
<component name="VcsManagerConfiguration"> <component name="VcsManagerConfiguration">
<MESSAGE value=":sparkles: Provide a faster check attachment exists grpc method" />
<MESSAGE value=":truck: Update url mapping" />
<MESSAGE value=":bug: Fix uuid duplicate when link exists" />
<MESSAGE value="&#10;:sparkles: Add health check" /> <MESSAGE value="&#10;:sparkles: Add health check" />
<MESSAGE value=":arrow_up: Upgrade Passport and use Hyper SDK" /> <MESSAGE value=":arrow_up: Upgrade Passport and use Hyper SDK" />
<MESSAGE value=":arrow_up: Upgrade Passport to fix bug" /> <MESSAGE value=":arrow_up: Upgrade Passport to fix bug" />
@ -138,7 +142,10 @@
<MESSAGE value=":boom: Replace attachment id by rid when fetching" /> <MESSAGE value=":boom: Replace attachment id by rid when fetching" />
<MESSAGE value=":boom: Use attachment rid instead of primary key when create" /> <MESSAGE value=":boom: Use attachment rid instead of primary key when create" />
<MESSAGE value=":bug: Fix crash on maintain cache" /> <MESSAGE value=":bug: Fix crash on maintain cache" />
<option name="LAST_COMMIT_MESSAGE" value=":bug: Fix crash on maintain cache" /> <MESSAGE value=":sparkles: Un-public indexable &amp; select by pools" />
<MESSAGE value=":sparkles: Multipart file upload" />
<MESSAGE value=":sparkles: Make it more implementable" />
<option name="LAST_COMMIT_MESSAGE" value=":sparkles: Make it more implementable" />
</component> </component>
<component name="VgoProject"> <component name="VgoProject">
<settings-migrated>true</settings-migrated> <settings-migrated>true</settings-migrated>

View File

@ -28,11 +28,14 @@ type Attachment struct {
Destination AttachmentDst `json:"destination"` Destination AttachmentDst `json:"destination"`
RefCount int `json:"ref_count"` RefCount int `json:"ref_count"`
FileChunks datatypes.JSONMap `json:"file_chunks"`
CleanedAt *time.Time `json:"cleaned_at"` CleanedAt *time.Time `json:"cleaned_at"`
Metadata datatypes.JSONMap `json:"metadata"` Metadata datatypes.JSONMap `json:"metadata"`
IsMature bool `json:"is_mature"` IsMature bool `json:"is_mature"`
IsAnalyzed bool `json:"is_analyzed"` IsAnalyzed bool `json:"is_analyzed"`
IsUploaded bool `json:"is_uploaded"`
IsSelfRef bool `json:"is_self_ref"` IsSelfRef bool `json:"is_self_ref"`
Ref *Attachment `json:"ref"` Ref *Attachment `json:"ref"`

View File

@ -22,6 +22,8 @@ func openAttachment(c *fiber.Ctx) error {
metadata, err := services.GetAttachmentByRID(id) metadata, err := services.GetAttachmentByRID(id)
if err != nil { if err != nil {
return fiber.NewError(fiber.StatusNotFound) return fiber.NewError(fiber.StatusNotFound)
} else if !metadata.IsUploaded {
return fiber.NewError(fiber.StatusNotFound, "file is in uploading progress, please wait until all chunk uploaded")
} }
var destMap map[string]any var destMap map[string]any
@ -78,71 +80,6 @@ func getAttachmentMeta(c *fiber.Ctx) error {
return c.JSON(metadata) return c.JSON(metadata)
} }
func createAttachment(c *fiber.Ctx) error {
if err := gap.H.EnsureAuthenticated(c); err != nil {
return err
}
user := c.Locals("user").(models.Account)
poolAlias := c.FormValue("pool")
if len(poolAlias) == 0 {
poolAlias = c.FormValue("usage")
}
aliasingMap := viper.GetStringMapString("pools.aliases")
if val, ok := aliasingMap[poolAlias]; ok {
poolAlias = val
}
pool, err := services.GetAttachmentPoolByAlias(poolAlias)
if err != nil {
return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("unable to get attachment pool info: %v", err))
}
file, err := c.FormFile("file")
if err != nil {
return err
}
if err = gap.H.EnsureGrantedPerm(c, "CreateAttachments", file.Size); err != nil {
return err
} else if pool.Config.Data().MaxFileSize != nil && file.Size > *pool.Config.Data().MaxFileSize {
return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("attachment pool %s doesn't allow file larger than %d", pool.Alias, *pool.Config.Data().MaxFileSize))
}
usermeta := make(map[string]any)
_ = jsoniter.UnmarshalFromString(c.FormValue("metadata"), &usermeta)
tx := database.C.Begin()
metadata, err := services.NewAttachmentMetadata(tx, user, file, models.Attachment{
Alternative: c.FormValue("alt"),
MimeType: c.FormValue("mimetype"),
Metadata: usermeta,
IsMature: len(c.FormValue("mature")) > 0,
IsAnalyzed: false,
Destination: models.AttachmentDstTemporary,
Pool: &pool,
PoolID: &pool.ID,
})
if err != nil {
tx.Rollback()
return fiber.NewError(fiber.StatusBadRequest, err.Error())
}
if err := services.UploadFileToTemporary(c, file, metadata); err != nil {
tx.Rollback()
return fiber.NewError(fiber.StatusBadRequest, err.Error())
}
tx.Commit()
metadata.Account = user
metadata.Pool = &pool
services.PublishAnalyzeTask(metadata)
return c.JSON(metadata)
}
func updateAttachmentMeta(c *fiber.Ctx) error { func updateAttachmentMeta(c *fiber.Ctx) error {
id, _ := c.ParamsInt("id", 0) id, _ := c.ParamsInt("id", 0)

View File

@ -16,10 +16,13 @@ func MapAPIs(app *fiber.App, baseURL string) {
api.Get("/attachments", listAttachment) api.Get("/attachments", listAttachment)
api.Get("/attachments/:id/meta", getAttachmentMeta) api.Get("/attachments/:id/meta", getAttachmentMeta)
api.Get("/attachments/:id", openAttachment) api.Get("/attachments/:id", openAttachment)
api.Post("/attachments", createAttachment) api.Post("/attachments", createAttachmentDirectly)
api.Put("/attachments/:id", updateAttachmentMeta) api.Put("/attachments/:id", updateAttachmentMeta)
api.Delete("/attachments/:id", deleteAttachment) api.Delete("/attachments/:id", deleteAttachment)
api.Post("/attachments/multipart", createAttachmentMultipartPlaceholder)
api.Post("/attachments/multipart/:file/:chunk", uploadAttachmentMultipart)
api.Get("/stickers/manifest", listStickerManifest) api.Get("/stickers/manifest", listStickerManifest)
api.Get("/stickers/packs", listStickerPacks) api.Get("/stickers/packs", listStickerPacks)
api.Post("/stickers/packs", createStickerPack) api.Post("/stickers/packs", createStickerPack)

View File

@ -0,0 +1,75 @@
package api
import (
"fmt"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/database"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/gap"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/models"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/services"
"github.com/gofiber/fiber/v2"
jsoniter "github.com/json-iterator/go"
"github.com/spf13/viper"
)
func createAttachmentDirectly(c *fiber.Ctx) error {
if err := gap.H.EnsureAuthenticated(c); err != nil {
return err
}
user := c.Locals("user").(models.Account)
poolAlias := c.FormValue("pool")
aliasingMap := viper.GetStringMapString("pools.aliases")
if val, ok := aliasingMap[poolAlias]; ok {
poolAlias = val
}
pool, err := services.GetAttachmentPoolByAlias(poolAlias)
if err != nil {
return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("unable to get attachment pool info: %v", err))
}
file, err := c.FormFile("file")
if err != nil {
return err
}
if err = gap.H.EnsureGrantedPerm(c, "CreateAttachments", file.Size); err != nil {
return err
} else if pool.Config.Data().MaxFileSize != nil && file.Size > *pool.Config.Data().MaxFileSize {
return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("attachment pool %s doesn't allow file larger than %d", pool.Alias, *pool.Config.Data().MaxFileSize))
}
usermeta := make(map[string]any)
_ = jsoniter.UnmarshalFromString(c.FormValue("metadata"), &usermeta)
tx := database.C.Begin()
metadata, err := services.NewAttachmentMetadata(tx, user, file, models.Attachment{
Alternative: c.FormValue("alt"),
MimeType: c.FormValue("mimetype"),
Metadata: usermeta,
IsMature: len(c.FormValue("mature")) > 0,
IsAnalyzed: false,
Destination: models.AttachmentDstTemporary,
Pool: &pool,
PoolID: &pool.ID,
})
if err != nil {
tx.Rollback()
return fiber.NewError(fiber.StatusBadRequest, err.Error())
}
if err := services.UploadFileToTemporary(c, file, metadata); err != nil {
tx.Rollback()
return fiber.NewError(fiber.StatusBadRequest, err.Error())
}
tx.Commit()
metadata.Account = user
metadata.Pool = &pool
services.PublishAnalyzeTask(metadata)
return c.JSON(metadata)
}

View File

@ -0,0 +1,129 @@
package api
import (
"encoding/json"
"fmt"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/database"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/gap"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/models"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/server/exts"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/services"
"github.com/gofiber/fiber/v2"
"github.com/spf13/viper"
)
func createAttachmentMultipartPlaceholder(c *fiber.Ctx) error {
if err := gap.H.EnsureAuthenticated(c); err != nil {
return err
}
user := c.Locals("user").(models.Account)
var data struct {
Pool string `json:"pool" validate:"required"`
Size int64 `json:"size" validate:"required"`
FileName string `json:"name" validate:"required"`
Alternative string `json:"alt"`
MimeType string `json:"mimetype"`
Metadata map[string]any `json:"metadata"`
IsMature bool `json:"is_mature"`
}
if err := exts.BindAndValidate(c, &data); err != nil {
return err
}
aliasingMap := viper.GetStringMapString("pools.aliases")
if val, ok := aliasingMap[data.Pool]; ok {
data.Pool = val
}
pool, err := services.GetAttachmentPoolByAlias(data.Pool)
if err != nil {
return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("unable to get attachment pool info: %v", err))
}
if err = gap.H.EnsureGrantedPerm(c, "CreateAttachments", data.Size); err != nil {
return err
} else if pool.Config.Data().MaxFileSize != nil && *pool.Config.Data().MaxFileSize > data.Size {
return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("attachment pool %s doesn't allow file larger than %d", pool.Alias, *pool.Config.Data().MaxFileSize))
}
metadata, err := services.NewAttachmentPlaceholder(database.C, user, models.Attachment{
Name: data.FileName,
Size: data.Size,
Alternative: data.Alternative,
MimeType: data.MimeType,
Metadata: data.Metadata,
IsMature: data.IsMature,
IsAnalyzed: false,
Destination: models.AttachmentDstTemporary,
Pool: &pool,
PoolID: &pool.ID,
})
if err != nil {
return fiber.NewError(fiber.StatusBadRequest, err.Error())
}
return c.JSON(fiber.Map{
"chunk_size": viper.GetInt64("performance.file_chunk_size"),
"chunk_count": len(metadata.FileChunks),
"meta": metadata,
})
}
func uploadAttachmentMultipart(c *fiber.Ctx) error {
if err := gap.H.EnsureAuthenticated(c); err != nil {
return err
}
user := c.Locals("user").(models.Account)
rid := c.Params("file")
cid := c.Params("chunk")
file, err := c.FormFile("file")
if err != nil {
return err
} else if file.Size > viper.GetInt64("performance.file_chunk_size") {
return fiber.NewError(fiber.StatusBadRequest, "file is too large for one chunk")
}
meta, err := services.GetAttachmentByRID(rid)
if err != nil {
return fiber.NewError(fiber.StatusNotFound, fmt.Sprintf("attachment was not found: %v", err))
} else if user.ID != meta.AccountID {
return fiber.NewError(fiber.StatusForbidden, "you are not authorized to upload this attachment")
}
if _, ok := meta.FileChunks[cid]; !ok {
return fiber.NewError(fiber.StatusNotFound, fmt.Sprintf("chunk %s was not found", cid))
} else if services.CheckChunkExistsInTemporary(meta, cid) {
return fiber.NewError(fiber.StatusNotFound, fmt.Sprintf("chunk %s was uploaded", cid))
}
if err := services.UploadChunkToTemporary(c, cid, file, meta); err != nil {
return fiber.NewError(fiber.StatusBadRequest, err.Error())
}
chunkArrange := make([]string, len(meta.FileChunks))
isAllUploaded := true
for cid, idx := range meta.FileChunks {
if !services.CheckChunkExistsInTemporary(meta, cid) {
isAllUploaded = false
break
} else if val, ok := idx.(json.Number); ok {
data, _ := val.Int64()
chunkArrange[data] = cid
}
}
if !isAllUploaded {
database.C.Save(&meta)
return c.JSON(meta)
}
if meta, err = services.MergeFileChunks(meta, chunkArrange); err != nil {
return fiber.NewError(fiber.StatusInternalServerError, err.Error())
} else {
return c.JSON(meta)
}
}

View File

@ -53,7 +53,10 @@ func ScanUnanalyzedFileFromDatabase() {
} }
var attachments []models.Attachment var attachments []models.Attachment
if err := database.C.Where("destination = ? OR is_analyzed = ?", models.AttachmentDstTemporary, false).Find(&attachments).Error; err != nil { if err := database.C.
Where("is_uploaded = ?", true).
Where("destination = ? OR is_analyzed = ?", models.AttachmentDstTemporary, false).
Find(&attachments).Error; err != nil {
log.Error().Err(err).Msg("Scan unanalyzed files from database failed...") log.Error().Err(err).Msg("Scan unanalyzed files from database failed...")
return return
} }
@ -93,12 +96,22 @@ func ScanUnanalyzedFileFromDatabase() {
} }
func AnalyzeAttachment(file models.Attachment) error { func AnalyzeAttachment(file models.Attachment) error {
if file.Destination != models.AttachmentDstTemporary { if !file.IsUploaded {
return fmt.Errorf("file isn't finish multipart upload")
} else if file.Destination != models.AttachmentDstTemporary {
return fmt.Errorf("attachment isn't in temporary storage, unable to analyze") return fmt.Errorf("attachment isn't in temporary storage, unable to analyze")
} }
var start time.Time var start time.Time
if len(file.HashCode) == 0 {
if hash, err := HashAttachment(file); err != nil {
return err
} else {
file.HashCode = hash
}
}
// Do analyze jobs // Do analyze jobs
if !file.IsAnalyzed || len(file.HashCode) == 0 { if !file.IsAnalyzed || len(file.HashCode) == 0 {
destMap := viper.GetStringMap("destinations.temporary") destMap := viper.GetStringMap("destinations.temporary")
@ -158,12 +171,6 @@ func AnalyzeAttachment(file models.Attachment) error {
"color_space": stream.ColorSpace, "color_space": stream.ColorSpace,
} }
} }
if hash, err := HashAttachment(file); err != nil {
return err
} else {
file.HashCode = hash
}
} }
tx := database.C.Begin() tx := database.C.Begin()

View File

@ -2,6 +2,9 @@ package services
import ( import (
"fmt" "fmt"
"github.com/spf13/viper"
"gorm.io/datatypes"
"math"
"mime" "mime"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
@ -114,6 +117,38 @@ func NewAttachmentMetadata(tx *gorm.DB, user models.Account, file *multipart.Fil
return attachment, nil return attachment, nil
} }
func NewAttachmentPlaceholder(tx *gorm.DB, user models.Account, attachment models.Attachment) (models.Attachment, error) {
attachment.Uuid = uuid.NewString()
attachment.Rid = RandString(16)
attachment.IsUploaded = false
attachment.FileChunks = datatypes.JSONMap{}
attachment.AccountID = user.ID
chunkSize := viper.GetInt64("performance.file_chunk_size")
chunkCount := math.Ceil(float64(attachment.Size) / float64(chunkSize))
for idx := 0; idx < int(chunkCount); idx++ {
cid := RandString(8)
attachment.FileChunks[cid] = idx
}
// If the user didn't provide file mimetype manually, we have to detect it
if len(attachment.MimeType) == 0 {
if ext := filepath.Ext(attachment.Name); len(ext) > 0 {
// Detect mimetype by file extensions
attachment.MimeType = mime.TypeByExtension(ext)
}
}
if err := tx.Save(&attachment).Error; err != nil {
return attachment, fmt.Errorf("failed to save attachment record: %v", err)
} else {
MaintainAttachmentCache()
CacheAttachment(attachment)
}
return attachment, nil
}
func TryLinkAttachment(tx *gorm.DB, og models.Attachment, hash string) (bool, error) { func TryLinkAttachment(tx *gorm.DB, og models.Attachment, hash string) (bool, error) {
prev, err := GetAttachmentByHash(hash) prev, err := GetAttachmentByHash(hash)
if err != nil { if err != nil {

View File

@ -0,0 +1,58 @@
package services
import (
"fmt"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/database"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/models"
jsoniter "github.com/json-iterator/go"
"github.com/spf13/viper"
"io"
"os"
"path/filepath"
)
func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachment, error) {
destMap := viper.GetStringMap("destinations.temporary")
var dest models.LocalDestination
rawDest, _ := jsoniter.Marshal(destMap)
_ = jsoniter.Unmarshal(rawDest, &dest)
destPath := filepath.Join(dest.Path, meta.Uuid)
destFile, err := os.Create(destPath)
if err != nil {
return meta, err
}
defer destFile.Close()
// Merge files
for _, chunk := range arrange {
chunkPath := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, chunk))
chunkFile, err := os.Open(chunkPath)
if err != nil {
return meta, err
}
_, err = io.Copy(destFile, chunkFile)
if err != nil {
_ = chunkFile.Close()
return meta, err
}
_ = chunkFile.Close()
}
// Do post-upload tasks
meta.IsUploaded = true
database.C.Save(&meta)
PublishAnalyzeTask(meta)
// Clean up
for _, chunk := range arrange {
chunkPath := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, chunk))
_ = os.Remove(chunkPath)
}
return meta, nil
}

View File

@ -4,7 +4,7 @@ import (
"math/rand" "math/rand"
) )
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
func RandString(length int) string { func RandString(length int) string {
builder := make([]rune, length) builder := make([]rune, length)

View File

@ -35,7 +35,7 @@ func StartConsumeDeletionTask() {
} }
} }
func RunMarkDeletionTask() { func RunMarkLifecycleDeletionTask() {
var pools []models.AttachmentPool var pools []models.AttachmentPool
if err := database.C.Find(&pools).Error; err != nil { if err := database.C.Find(&pools).Error; err != nil {
return return
@ -53,6 +53,7 @@ func RunMarkDeletionTask() {
tx := database.C. tx := database.C.
Where("pool_id = ?", pool.ID). Where("pool_id = ?", pool.ID).
Where("created_at < ?", lifecycle). Where("created_at < ?", lifecycle).
Where("cleaned_at IS NULL").
Updates(&models.Attachment{CleanedAt: lo.ToPtr(time.Now())}) Updates(&models.Attachment{CleanedAt: lo.ToPtr(time.Now())})
log.Info(). log.Info().
Str("pool", pool.Alias). Str("pool", pool.Alias).
@ -62,13 +63,26 @@ func RunMarkDeletionTask() {
} }
} }
func RunMarkMultipartDeletionTask() {
lifecycle := time.Now().Add(-60 * time.Minute)
tx := database.C.
Where("created_at < ?", lifecycle).
Where("is_uploaded = ?", false).
Where("cleaned_at IS NULL").
Updates(&models.Attachment{CleanedAt: lo.ToPtr(time.Now())})
log.Info().
Int64("count", tx.RowsAffected).
Err(tx.Error).
Msg("Marking attachments as clean needed due to multipart lifecycle...")
}
func RunScheduleDeletionTask() { func RunScheduleDeletionTask() {
var attachments []models.Attachment var attachments []models.Attachment
if err := database.C.Where("cleaned_at IS NOT NULL").Find(&attachments).Error; err != nil { if err := database.C.Where("cleaned_at IS NOT NULL").Find(&attachments).Error; err != nil {
return return
} }
for idx, attachment := range attachments { for _, attachment := range attachments {
if attachment.RefID != nil { if attachment.RefID != nil {
continue continue
} }
@ -76,8 +90,6 @@ func RunScheduleDeletionTask() {
log.Error(). log.Error().
Uint("id", attachment.ID). Uint("id", attachment.ID).
Msg("An error occurred when deleting marked clean up attachments...") Msg("An error occurred when deleting marked clean up attachments...")
} else {
attachments[idx].CleanedAt = lo.ToPtr(time.Now())
} }
} }
@ -85,6 +97,20 @@ func RunScheduleDeletionTask() {
} }
func DeleteFile(meta models.Attachment) error { func DeleteFile(meta models.Attachment) error {
if !meta.IsUploaded {
destMap := viper.GetStringMap("destinations.temporary")
var dest models.LocalDestination
rawDest, _ := jsoniter.Marshal(destMap)
_ = jsoniter.Unmarshal(rawDest, &dest)
for cid := range meta.FileChunks {
path := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid))
_ = os.Remove(path)
}
return nil
}
var destMap map[string]any var destMap map[string]any
if meta.Destination == models.AttachmentDstTemporary { if meta.Destination == models.AttachmentDstTemporary {
destMap = viper.GetStringMap("destinations.temporary") destMap = viper.GetStringMap("destinations.temporary")

View File

@ -1,8 +1,8 @@
package services package services
import ( import (
"bytes"
"context" "context"
"errors"
"fmt" "fmt"
"io" "io"
"mime/multipart" "mime/multipart"
@ -29,12 +29,44 @@ func UploadFileToTemporary(ctx *fiber.Ctx, file *multipart.FileHeader, meta mode
case models.DestinationTypeLocal: case models.DestinationTypeLocal:
var destConfigured models.LocalDestination var destConfigured models.LocalDestination
_ = jsoniter.Unmarshal(rawDest, &destConfigured) _ = jsoniter.Unmarshal(rawDest, &destConfigured)
return UploadFileToLocal(destConfigured, ctx, file, meta) return ctx.SaveFile(file, filepath.Join(destConfigured.Path, meta.Uuid))
default: default:
return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type)
} }
} }
func UploadChunkToTemporary(ctx *fiber.Ctx, cid string, file *multipart.FileHeader, meta models.Attachment) error {
destMap := viper.GetStringMap("destinations.temporary")
var dest models.BaseDestination
rawDest, _ := jsoniter.Marshal(destMap)
_ = jsoniter.Unmarshal(rawDest, &dest)
switch dest.Type {
case models.DestinationTypeLocal:
var destConfigured models.LocalDestination
_ = jsoniter.Unmarshal(rawDest, &destConfigured)
return ctx.SaveFile(file, filepath.Join(destConfigured.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid)))
default:
return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type)
}
}
func CheckChunkExistsInTemporary(meta models.Attachment, cid string) bool {
destMap := viper.GetStringMap("destinations.temporary")
var dest models.LocalDestination
rawDest, _ := jsoniter.Marshal(destMap)
_ = jsoniter.Unmarshal(rawDest, &dest)
path := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid))
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
return false
} else {
return true
}
}
func ReUploadFileToPermanent(meta models.Attachment) error { func ReUploadFileToPermanent(meta models.Attachment) error {
if meta.Destination != models.AttachmentDstTemporary { if meta.Destination != models.AttachmentDstTemporary {
return fmt.Errorf("attachment isn't in temporary storage, unable to process") return fmt.Errorf("attachment isn't in temporary storage, unable to process")
@ -50,8 +82,8 @@ func ReUploadFileToPermanent(meta models.Attachment) error {
prevDestMap := viper.GetStringMap("destinations.temporary") prevDestMap := viper.GetStringMap("destinations.temporary")
// Currently the temporary destination only support the local // Currently, the temporary destination only supports the local.
// So we can do this // So we can do this.
var prevDest models.LocalDestination var prevDest models.LocalDestination
prevRawDest, _ := jsoniter.Marshal(prevDestMap) prevRawDest, _ := jsoniter.Marshal(prevDestMap)
_ = jsoniter.Unmarshal(prevRawDest, &prevDest) _ = jsoniter.Unmarshal(prevRawDest, &prevDest)
@ -111,39 +143,3 @@ func ReUploadFileToPermanent(meta models.Attachment) error {
return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type)
} }
} }
func UploadFileToLocal(config models.LocalDestination, ctx *fiber.Ctx, file *multipart.FileHeader, meta models.Attachment) error {
return ctx.SaveFile(file, filepath.Join(config.Path, meta.Uuid))
}
func UploadFileToS3(config models.S3Destination, file *multipart.FileHeader, meta models.Attachment) error {
header, err := file.Open()
if err != nil {
return fmt.Errorf("read upload file: %v", err)
}
defer header.Close()
buffer := bytes.NewBuffer(nil)
if _, err := io.Copy(buffer, header); err != nil {
return fmt.Errorf("create io reader for upload file: %v", err)
}
client, err := minio.New(config.Endpoint, &minio.Options{
Creds: credentials.NewStaticV4(config.SecretID, config.SecretKey, ""),
Secure: config.EnableSSL,
})
if err != nil {
return fmt.Errorf("unable to configure s3 client: %v", err)
}
_, err = client.PutObject(context.Background(), config.Bucket, filepath.Join(config.Path, meta.Uuid), buffer, file.Size, minio.PutObjectOptions{
ContentType: meta.MimeType,
SendContentMd5: false,
DisableContentSha256: true,
})
if err != nil {
return fmt.Errorf("unable to upload file to s3: %v", err)
}
return nil
}

View File

@ -59,7 +59,8 @@ func main() {
// Configure timed tasks // Configure timed tasks
quartz := cron.New(cron.WithLogger(cron.VerbosePrintfLogger(&log.Logger))) quartz := cron.New(cron.WithLogger(cron.VerbosePrintfLogger(&log.Logger)))
quartz.AddFunc("@every 60m", services.DoAutoDatabaseCleanup) quartz.AddFunc("@every 60m", services.DoAutoDatabaseCleanup)
quartz.AddFunc("@every 60m", services.RunMarkDeletionTask) quartz.AddFunc("@every 60m", services.RunMarkLifecycleDeletionTask)
quartz.AddFunc("@every 60m", services.RunMarkMultipartDeletionTask)
quartz.AddFunc("@midnight", services.RunScheduleDeletionTask) quartz.AddFunc("@midnight", services.RunScheduleDeletionTask)
quartz.Start() quartz.Start()
@ -75,7 +76,7 @@ func main() {
log.Info().Msgf("Paperclip v%s is started...", pkg.AppVersion) log.Info().Msgf("Paperclip v%s is started...", pkg.AppVersion)
services.ScanUnanalyzedFileFromDatabase() services.ScanUnanalyzedFileFromDatabase()
services.RunMarkDeletionTask() services.RunMarkLifecycleDeletionTask()
quit := make(chan os.Signal, 1) quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)

View File

@ -29,6 +29,9 @@ cookie_samesite = "Lax"
access_token_duration = 300 access_token_duration = 300
refresh_token_duration = 2592000 refresh_token_duration = 2592000
[performance]
file_chunk_size = 26214400
[database] [database]
dsn = "host=localhost user=postgres password=password dbname=hy_paperclip port=5432 sslmode=disable" dsn = "host=localhost user=postgres password=password dbname=hy_paperclip port=5432 sslmode=disable"
prefix = "paperclip_" prefix = "paperclip_"