🔀 Merge pull request '✨ 支援分片上传' (#3) from feature/multipart-upload into master
Reviewed-on: Hydrogen/Paperclip#3
This commit is contained in:
		
							
								
								
									
										71
									
								
								.idea/workspace.xml
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										71
									
								
								.idea/workspace.xml
									
									
									
										generated
									
									
									
								
							| @@ -4,10 +4,12 @@ | ||||
|     <option name="autoReloadType" value="ALL" /> | ||||
|   </component> | ||||
|   <component name="ChangeListManager"> | ||||
|     <list default="true" id="18dd0d68-b4b8-40db-9734-9119b5c848bd" name="更改" comment=":bug: Fix crash on maintain cache"> | ||||
|     <list default="true" id="18dd0d68-b4b8-40db-9734-9119b5c848bd" name="更改" comment=":sparkles: Make it more implementable"> | ||||
|       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> | ||||
|       <change beforePath="$PROJECT_DIR$/pkg/internal/models/pools.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/models/pools.go" afterDir="false" /> | ||||
|       <change beforePath="$PROJECT_DIR$/pkg/internal/server/api/attachment_dir_api.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/server/api/index_api.go" afterDir="false" /> | ||||
|       <change beforePath="$PROJECT_DIR$/pkg/internal/server/api/up_multipart_api.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/server/api/up_multipart_api.go" afterDir="false" /> | ||||
|       <change beforePath="$PROJECT_DIR$/pkg/internal/services/analyzer.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/services/analyzer.go" afterDir="false" /> | ||||
|       <change beforePath="$PROJECT_DIR$/pkg/internal/services/merger.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/services/merger.go" afterDir="false" /> | ||||
|       <change beforePath="$PROJECT_DIR$/pkg/internal/services/recycler.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/internal/services/recycler.go" afterDir="false" /> | ||||
|     </list> | ||||
|     <option name="SHOW_DIALOG" value="false" /> | ||||
|     <option name="HIGHLIGHT_CONFLICTS" value="true" /> | ||||
| @@ -23,10 +25,15 @@ | ||||
|   </component> | ||||
|   <component name="GOROOT" url="file:///opt/homebrew/opt/go/libexec" /> | ||||
|   <component name="Git.Settings"> | ||||
|     <option name="RECENT_BRANCH_BY_REPOSITORY"> | ||||
|       <map> | ||||
|         <entry key="$PROJECT_DIR$" value="master" /> | ||||
|       </map> | ||||
|     </option> | ||||
|     <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" /> | ||||
|   </component> | ||||
|   <component name="ProblemsViewState"> | ||||
|     <option name="selectedTabId" value="ProjectErrors" /> | ||||
|     <option name="selectedTabId" value="CurrentFile" /> | ||||
|   </component> | ||||
|   <component name="ProjectColorInfo">{ | ||||
|   "customColor": "", | ||||
| @@ -37,33 +44,33 @@ | ||||
|     <option name="hideEmptyMiddlePackages" value="true" /> | ||||
|     <option name="showLibraryContents" value="true" /> | ||||
|   </component> | ||||
|   <component name="PropertiesComponent">{ | ||||
|   "keyToString": { | ||||
|     "DefaultGoTemplateProperty": "Go File", | ||||
|     "Go Build.Backend.executor": "Run", | ||||
|     "Go 构建.Backend.executor": "Run", | ||||
|     "RunOnceActivity.ShowReadmeOnStart": "true", | ||||
|     "RunOnceActivity.go.formatter.settings.were.checked": "true", | ||||
|     "RunOnceActivity.go.migrated.go.modules.settings": "true", | ||||
|     "RunOnceActivity.go.modules.automatic.dependencies.download": "true", | ||||
|     "RunOnceActivity.go.modules.go.list.on.any.changes.was.set": "true", | ||||
|     "git-widget-placeholder": "master", | ||||
|     "go.import.settings.migrated": "true", | ||||
|     "go.sdk.automatically.set": "true", | ||||
|     "last_opened_file_path": "/Users/littlesheep/Documents/Projects/Hydrogen/Paperclip/pkg/internal/grpc", | ||||
|     "node.js.detected.package.eslint": "true", | ||||
|     "node.js.selected.package.eslint": "(autodetect)", | ||||
|     "nodejs_package_manager_path": "npm", | ||||
|     "run.code.analysis.last.selected.profile": "pProject Default", | ||||
|     "settings.editor.selected.configurable": "preferences.pluginManager", | ||||
|     "vue.rearranger.settings.migration": "true" | ||||
|   <component name="PropertiesComponent"><![CDATA[{ | ||||
|   "keyToString": { | ||||
|     "DefaultGoTemplateProperty": "Go File", | ||||
|     "Go Build.Backend.executor": "Run", | ||||
|     "Go 构建.Backend.executor": "Run", | ||||
|     "RunOnceActivity.ShowReadmeOnStart": "true", | ||||
|     "RunOnceActivity.go.formatter.settings.were.checked": "true", | ||||
|     "RunOnceActivity.go.migrated.go.modules.settings": "true", | ||||
|     "RunOnceActivity.go.modules.automatic.dependencies.download": "true", | ||||
|     "RunOnceActivity.go.modules.go.list.on.any.changes.was.set": "true", | ||||
|     "git-widget-placeholder": "feature/multipart-upload", | ||||
|     "go.import.settings.migrated": "true", | ||||
|     "go.sdk.automatically.set": "true", | ||||
|     "last_opened_file_path": "/Users/littlesheep/Documents/Projects/Hydrogen/Paperclip/pkg/internal/grpc", | ||||
|     "node.js.detected.package.eslint": "true", | ||||
|     "node.js.selected.package.eslint": "(autodetect)", | ||||
|     "nodejs_package_manager_path": "npm", | ||||
|     "run.code.analysis.last.selected.profile": "pProject Default", | ||||
|     "settings.editor.selected.configurable": "preferences.pluginManager", | ||||
|     "vue.rearranger.settings.migration": "true" | ||||
|   }, | ||||
|   "keyToStringList": { | ||||
|     "DatabaseDriversLRU": [ | ||||
|       "postgresql" | ||||
|   "keyToStringList": { | ||||
|     "DatabaseDriversLRU": [ | ||||
|       "postgresql" | ||||
|     ] | ||||
|   } | ||||
| }</component> | ||||
| }]]></component> | ||||
|   <component name="RecentsManager"> | ||||
|     <key name="CopyFile.RECENT_KEYS"> | ||||
|       <recent name="$PROJECT_DIR$/pkg/internal/grpc" /> | ||||
| @@ -113,9 +120,6 @@ | ||||
|     </option> | ||||
|   </component> | ||||
|   <component name="VcsManagerConfiguration"> | ||||
|     <MESSAGE value=":sparkles: Provide a faster check attachment exists grpc method" /> | ||||
|     <MESSAGE value=":truck: Update url mapping" /> | ||||
|     <MESSAGE value=":bug: Fix uuid duplicate when link exists" /> | ||||
|     <MESSAGE value="
:sparkles: Add health check" /> | ||||
|     <MESSAGE value=":arrow_up: Upgrade Passport and use Hyper SDK" /> | ||||
|     <MESSAGE value=":arrow_up: Upgrade Passport to fix bug" /> | ||||
| @@ -138,7 +142,10 @@ | ||||
|     <MESSAGE value=":boom: Replace attachment id by rid when fetching" /> | ||||
|     <MESSAGE value=":boom: Use attachment rid instead of primary key when create" /> | ||||
|     <MESSAGE value=":bug: Fix crash on maintain cache" /> | ||||
|     <option name="LAST_COMMIT_MESSAGE" value=":bug: Fix crash on maintain cache" /> | ||||
|     <MESSAGE value=":sparkles: Un-public indexable & select by pools" /> | ||||
|     <MESSAGE value=":sparkles: Multipart file upload" /> | ||||
|     <MESSAGE value=":sparkles: Make it more implementable" /> | ||||
|     <option name="LAST_COMMIT_MESSAGE" value=":sparkles: Make it more implementable" /> | ||||
|   </component> | ||||
|   <component name="VgoProject"> | ||||
|     <settings-migrated>true</settings-migrated> | ||||
|   | ||||
| @@ -28,11 +28,14 @@ type Attachment struct { | ||||
| 	Destination AttachmentDst `json:"destination"` | ||||
| 	RefCount    int           `json:"ref_count"` | ||||
|  | ||||
| 	FileChunks datatypes.JSONMap `json:"file_chunks"` | ||||
|  | ||||
| 	CleanedAt *time.Time `json:"cleaned_at"` | ||||
|  | ||||
| 	Metadata   datatypes.JSONMap `json:"metadata"` | ||||
| 	IsMature   bool              `json:"is_mature"` | ||||
| 	IsAnalyzed bool              `json:"is_analyzed"` | ||||
| 	IsUploaded bool              `json:"is_uploaded"` | ||||
| 	IsSelfRef  bool              `json:"is_self_ref"` | ||||
|  | ||||
| 	Ref   *Attachment `json:"ref"` | ||||
|   | ||||
| @@ -22,6 +22,8 @@ func openAttachment(c *fiber.Ctx) error { | ||||
| 	metadata, err := services.GetAttachmentByRID(id) | ||||
| 	if err != nil { | ||||
| 		return fiber.NewError(fiber.StatusNotFound) | ||||
| 	} else if !metadata.IsUploaded { | ||||
| 		return fiber.NewError(fiber.StatusNotFound, "file is in uploading progress, please wait until all chunk uploaded") | ||||
| 	} | ||||
|  | ||||
| 	var destMap map[string]any | ||||
| @@ -78,71 +80,6 @@ func getAttachmentMeta(c *fiber.Ctx) error { | ||||
| 	return c.JSON(metadata) | ||||
| } | ||||
|  | ||||
| func createAttachment(c *fiber.Ctx) error { | ||||
| 	if err := gap.H.EnsureAuthenticated(c); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	user := c.Locals("user").(models.Account) | ||||
|  | ||||
| 	poolAlias := c.FormValue("pool") | ||||
| 	if len(poolAlias) == 0 { | ||||
| 		poolAlias = c.FormValue("usage") | ||||
| 	} | ||||
| 	aliasingMap := viper.GetStringMapString("pools.aliases") | ||||
| 	if val, ok := aliasingMap[poolAlias]; ok { | ||||
| 		poolAlias = val | ||||
| 	} | ||||
|  | ||||
| 	pool, err := services.GetAttachmentPoolByAlias(poolAlias) | ||||
| 	if err != nil { | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("unable to get attachment pool info: %v", err)) | ||||
| 	} | ||||
|  | ||||
| 	file, err := c.FormFile("file") | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if err = gap.H.EnsureGrantedPerm(c, "CreateAttachments", file.Size); err != nil { | ||||
| 		return err | ||||
| 	} else if pool.Config.Data().MaxFileSize != nil && file.Size > *pool.Config.Data().MaxFileSize { | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("attachment pool %s doesn't allow file larger than %d", pool.Alias, *pool.Config.Data().MaxFileSize)) | ||||
| 	} | ||||
|  | ||||
| 	usermeta := make(map[string]any) | ||||
| 	_ = jsoniter.UnmarshalFromString(c.FormValue("metadata"), &usermeta) | ||||
|  | ||||
| 	tx := database.C.Begin() | ||||
|  | ||||
| 	metadata, err := services.NewAttachmentMetadata(tx, user, file, models.Attachment{ | ||||
| 		Alternative: c.FormValue("alt"), | ||||
| 		MimeType:    c.FormValue("mimetype"), | ||||
| 		Metadata:    usermeta, | ||||
| 		IsMature:    len(c.FormValue("mature")) > 0, | ||||
| 		IsAnalyzed:  false, | ||||
| 		Destination: models.AttachmentDstTemporary, | ||||
| 		Pool:        &pool, | ||||
| 		PoolID:      &pool.ID, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		tx.Rollback() | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	if err := services.UploadFileToTemporary(c, file, metadata); err != nil { | ||||
| 		tx.Rollback() | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	tx.Commit() | ||||
|  | ||||
| 	metadata.Account = user | ||||
| 	metadata.Pool = &pool | ||||
| 	services.PublishAnalyzeTask(metadata) | ||||
|  | ||||
| 	return c.JSON(metadata) | ||||
| } | ||||
|  | ||||
| func updateAttachmentMeta(c *fiber.Ctx) error { | ||||
| 	id, _ := c.ParamsInt("id", 0) | ||||
|  | ||||
|   | ||||
| @@ -16,10 +16,13 @@ func MapAPIs(app *fiber.App, baseURL string) { | ||||
| 		api.Get("/attachments", listAttachment) | ||||
| 		api.Get("/attachments/:id/meta", getAttachmentMeta) | ||||
| 		api.Get("/attachments/:id", openAttachment) | ||||
| 		api.Post("/attachments", createAttachment) | ||||
| 		api.Post("/attachments", createAttachmentDirectly) | ||||
| 		api.Put("/attachments/:id", updateAttachmentMeta) | ||||
| 		api.Delete("/attachments/:id", deleteAttachment) | ||||
|  | ||||
| 		api.Post("/attachments/multipart", createAttachmentMultipartPlaceholder) | ||||
| 		api.Post("/attachments/multipart/:file/:chunk", uploadAttachmentMultipart) | ||||
|  | ||||
| 		api.Get("/stickers/manifest", listStickerManifest) | ||||
| 		api.Get("/stickers/packs", listStickerPacks) | ||||
| 		api.Post("/stickers/packs", createStickerPack) | ||||
|   | ||||
							
								
								
									
										75
									
								
								pkg/internal/server/api/up_direct_api.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										75
									
								
								pkg/internal/server/api/up_direct_api.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,75 @@ | ||||
| package api | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/database" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/gap" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/models" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/services" | ||||
| 	"github.com/gofiber/fiber/v2" | ||||
| 	jsoniter "github.com/json-iterator/go" | ||||
| 	"github.com/spf13/viper" | ||||
| ) | ||||
|  | ||||
| func createAttachmentDirectly(c *fiber.Ctx) error { | ||||
| 	if err := gap.H.EnsureAuthenticated(c); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	user := c.Locals("user").(models.Account) | ||||
|  | ||||
| 	poolAlias := c.FormValue("pool") | ||||
|  | ||||
| 	aliasingMap := viper.GetStringMapString("pools.aliases") | ||||
| 	if val, ok := aliasingMap[poolAlias]; ok { | ||||
| 		poolAlias = val | ||||
| 	} | ||||
|  | ||||
| 	pool, err := services.GetAttachmentPoolByAlias(poolAlias) | ||||
| 	if err != nil { | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("unable to get attachment pool info: %v", err)) | ||||
| 	} | ||||
|  | ||||
| 	file, err := c.FormFile("file") | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	if err = gap.H.EnsureGrantedPerm(c, "CreateAttachments", file.Size); err != nil { | ||||
| 		return err | ||||
| 	} else if pool.Config.Data().MaxFileSize != nil && file.Size > *pool.Config.Data().MaxFileSize { | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("attachment pool %s doesn't allow file larger than %d", pool.Alias, *pool.Config.Data().MaxFileSize)) | ||||
| 	} | ||||
|  | ||||
| 	usermeta := make(map[string]any) | ||||
| 	_ = jsoniter.UnmarshalFromString(c.FormValue("metadata"), &usermeta) | ||||
|  | ||||
| 	tx := database.C.Begin() | ||||
|  | ||||
| 	metadata, err := services.NewAttachmentMetadata(tx, user, file, models.Attachment{ | ||||
| 		Alternative: c.FormValue("alt"), | ||||
| 		MimeType:    c.FormValue("mimetype"), | ||||
| 		Metadata:    usermeta, | ||||
| 		IsMature:    len(c.FormValue("mature")) > 0, | ||||
| 		IsAnalyzed:  false, | ||||
| 		Destination: models.AttachmentDstTemporary, | ||||
| 		Pool:        &pool, | ||||
| 		PoolID:      &pool.ID, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		tx.Rollback() | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	if err := services.UploadFileToTemporary(c, file, metadata); err != nil { | ||||
| 		tx.Rollback() | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	tx.Commit() | ||||
|  | ||||
| 	metadata.Account = user | ||||
| 	metadata.Pool = &pool | ||||
| 	services.PublishAnalyzeTask(metadata) | ||||
|  | ||||
| 	return c.JSON(metadata) | ||||
| } | ||||
							
								
								
									
										129
									
								
								pkg/internal/server/api/up_multipart_api.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										129
									
								
								pkg/internal/server/api/up_multipart_api.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,129 @@ | ||||
| package api | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/database" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/gap" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/models" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/server/exts" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/services" | ||||
| 	"github.com/gofiber/fiber/v2" | ||||
| 	"github.com/spf13/viper" | ||||
| ) | ||||
|  | ||||
| func createAttachmentMultipartPlaceholder(c *fiber.Ctx) error { | ||||
| 	if err := gap.H.EnsureAuthenticated(c); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	user := c.Locals("user").(models.Account) | ||||
|  | ||||
| 	var data struct { | ||||
| 		Pool        string         `json:"pool" validate:"required"` | ||||
| 		Size        int64          `json:"size" validate:"required"` | ||||
| 		FileName    string         `json:"name" validate:"required"` | ||||
| 		Alternative string         `json:"alt"` | ||||
| 		MimeType    string         `json:"mimetype"` | ||||
| 		Metadata    map[string]any `json:"metadata"` | ||||
| 		IsMature    bool           `json:"is_mature"` | ||||
| 	} | ||||
|  | ||||
| 	if err := exts.BindAndValidate(c, &data); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	aliasingMap := viper.GetStringMapString("pools.aliases") | ||||
| 	if val, ok := aliasingMap[data.Pool]; ok { | ||||
| 		data.Pool = val | ||||
| 	} | ||||
|  | ||||
| 	pool, err := services.GetAttachmentPoolByAlias(data.Pool) | ||||
| 	if err != nil { | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("unable to get attachment pool info: %v", err)) | ||||
| 	} | ||||
|  | ||||
| 	if err = gap.H.EnsureGrantedPerm(c, "CreateAttachments", data.Size); err != nil { | ||||
| 		return err | ||||
| 	} else if pool.Config.Data().MaxFileSize != nil && *pool.Config.Data().MaxFileSize > data.Size { | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, fmt.Sprintf("attachment pool %s doesn't allow file larger than %d", pool.Alias, *pool.Config.Data().MaxFileSize)) | ||||
| 	} | ||||
|  | ||||
| 	metadata, err := services.NewAttachmentPlaceholder(database.C, user, models.Attachment{ | ||||
| 		Name:        data.FileName, | ||||
| 		Size:        data.Size, | ||||
| 		Alternative: data.Alternative, | ||||
| 		MimeType:    data.MimeType, | ||||
| 		Metadata:    data.Metadata, | ||||
| 		IsMature:    data.IsMature, | ||||
| 		IsAnalyzed:  false, | ||||
| 		Destination: models.AttachmentDstTemporary, | ||||
| 		Pool:        &pool, | ||||
| 		PoolID:      &pool.ID, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	return c.JSON(fiber.Map{ | ||||
| 		"chunk_size":  viper.GetInt64("performance.file_chunk_size"), | ||||
| 		"chunk_count": len(metadata.FileChunks), | ||||
| 		"meta":        metadata, | ||||
| 	}) | ||||
| } | ||||
|  | ||||
| func uploadAttachmentMultipart(c *fiber.Ctx) error { | ||||
| 	if err := gap.H.EnsureAuthenticated(c); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	user := c.Locals("user").(models.Account) | ||||
|  | ||||
| 	rid := c.Params("file") | ||||
| 	cid := c.Params("chunk") | ||||
|  | ||||
| 	file, err := c.FormFile("file") | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} else if file.Size > viper.GetInt64("performance.file_chunk_size") { | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, "file is too large for one chunk") | ||||
| 	} | ||||
|  | ||||
| 	meta, err := services.GetAttachmentByRID(rid) | ||||
| 	if err != nil { | ||||
| 		return fiber.NewError(fiber.StatusNotFound, fmt.Sprintf("attachment was not found: %v", err)) | ||||
| 	} else if user.ID != meta.AccountID { | ||||
| 		return fiber.NewError(fiber.StatusForbidden, "you are not authorized to upload this attachment") | ||||
| 	} | ||||
|  | ||||
| 	if _, ok := meta.FileChunks[cid]; !ok { | ||||
| 		return fiber.NewError(fiber.StatusNotFound, fmt.Sprintf("chunk %s was not found", cid)) | ||||
| 	} else if services.CheckChunkExistsInTemporary(meta, cid) { | ||||
| 		return fiber.NewError(fiber.StatusNotFound, fmt.Sprintf("chunk %s was uploaded", cid)) | ||||
| 	} | ||||
|  | ||||
| 	if err := services.UploadChunkToTemporary(c, cid, file, meta); err != nil { | ||||
| 		return fiber.NewError(fiber.StatusBadRequest, err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	chunkArrange := make([]string, len(meta.FileChunks)) | ||||
| 	isAllUploaded := true | ||||
| 	for cid, idx := range meta.FileChunks { | ||||
| 		if !services.CheckChunkExistsInTemporary(meta, cid) { | ||||
| 			isAllUploaded = false | ||||
| 			break | ||||
| 		} else if val, ok := idx.(json.Number); ok { | ||||
| 			data, _ := val.Int64() | ||||
| 			chunkArrange[data] = cid | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if !isAllUploaded { | ||||
| 		database.C.Save(&meta) | ||||
| 		return c.JSON(meta) | ||||
| 	} | ||||
|  | ||||
| 	if meta, err = services.MergeFileChunks(meta, chunkArrange); err != nil { | ||||
| 		return fiber.NewError(fiber.StatusInternalServerError, err.Error()) | ||||
| 	} else { | ||||
| 		return c.JSON(meta) | ||||
| 	} | ||||
| } | ||||
| @@ -53,7 +53,10 @@ func ScanUnanalyzedFileFromDatabase() { | ||||
| 	} | ||||
|  | ||||
| 	var attachments []models.Attachment | ||||
| 	if err := database.C.Where("destination = ? OR is_analyzed = ?", models.AttachmentDstTemporary, false).Find(&attachments).Error; err != nil { | ||||
| 	if err := database.C. | ||||
| 		Where("is_uploaded = ?", true). | ||||
| 		Where("destination = ? OR is_analyzed = ?", models.AttachmentDstTemporary, false). | ||||
| 		Find(&attachments).Error; err != nil { | ||||
| 		log.Error().Err(err).Msg("Scan unanalyzed files from database failed...") | ||||
| 		return | ||||
| 	} | ||||
| @@ -93,12 +96,22 @@ func ScanUnanalyzedFileFromDatabase() { | ||||
| } | ||||
|  | ||||
| func AnalyzeAttachment(file models.Attachment) error { | ||||
| 	if file.Destination != models.AttachmentDstTemporary { | ||||
| 	if !file.IsUploaded { | ||||
| 		return fmt.Errorf("file isn't finish multipart upload") | ||||
| 	} else if file.Destination != models.AttachmentDstTemporary { | ||||
| 		return fmt.Errorf("attachment isn't in temporary storage, unable to analyze") | ||||
| 	} | ||||
|  | ||||
| 	var start time.Time | ||||
|  | ||||
| 	if len(file.HashCode) == 0 { | ||||
| 		if hash, err := HashAttachment(file); err != nil { | ||||
| 			return err | ||||
| 		} else { | ||||
| 			file.HashCode = hash | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Do analyze jobs | ||||
| 	if !file.IsAnalyzed || len(file.HashCode) == 0 { | ||||
| 		destMap := viper.GetStringMap("destinations.temporary") | ||||
| @@ -158,12 +171,6 @@ func AnalyzeAttachment(file models.Attachment) error { | ||||
| 				"color_space": stream.ColorSpace, | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if hash, err := HashAttachment(file); err != nil { | ||||
| 			return err | ||||
| 		} else { | ||||
| 			file.HashCode = hash | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	tx := database.C.Begin() | ||||
|   | ||||
| @@ -2,6 +2,9 @@ package services | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"github.com/spf13/viper" | ||||
| 	"gorm.io/datatypes" | ||||
| 	"math" | ||||
| 	"mime" | ||||
| 	"mime/multipart" | ||||
| 	"net/http" | ||||
| @@ -114,6 +117,38 @@ func NewAttachmentMetadata(tx *gorm.DB, user models.Account, file *multipart.Fil | ||||
| 	return attachment, nil | ||||
| } | ||||
|  | ||||
| func NewAttachmentPlaceholder(tx *gorm.DB, user models.Account, attachment models.Attachment) (models.Attachment, error) { | ||||
| 	attachment.Uuid = uuid.NewString() | ||||
| 	attachment.Rid = RandString(16) | ||||
| 	attachment.IsUploaded = false | ||||
| 	attachment.FileChunks = datatypes.JSONMap{} | ||||
| 	attachment.AccountID = user.ID | ||||
|  | ||||
| 	chunkSize := viper.GetInt64("performance.file_chunk_size") | ||||
| 	chunkCount := math.Ceil(float64(attachment.Size) / float64(chunkSize)) | ||||
| 	for idx := 0; idx < int(chunkCount); idx++ { | ||||
| 		cid := RandString(8) | ||||
| 		attachment.FileChunks[cid] = idx | ||||
| 	} | ||||
|  | ||||
| 	// If the user didn't provide file mimetype manually, we have to detect it | ||||
| 	if len(attachment.MimeType) == 0 { | ||||
| 		if ext := filepath.Ext(attachment.Name); len(ext) > 0 { | ||||
| 			// Detect mimetype by file extensions | ||||
| 			attachment.MimeType = mime.TypeByExtension(ext) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if err := tx.Save(&attachment).Error; err != nil { | ||||
| 		return attachment, fmt.Errorf("failed to save attachment record: %v", err) | ||||
| 	} else { | ||||
| 		MaintainAttachmentCache() | ||||
| 		CacheAttachment(attachment) | ||||
| 	} | ||||
|  | ||||
| 	return attachment, nil | ||||
| } | ||||
|  | ||||
| func TryLinkAttachment(tx *gorm.DB, og models.Attachment, hash string) (bool, error) { | ||||
| 	prev, err := GetAttachmentByHash(hash) | ||||
| 	if err != nil { | ||||
|   | ||||
							
								
								
									
										58
									
								
								pkg/internal/services/merger.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										58
									
								
								pkg/internal/services/merger.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,58 @@ | ||||
| package services | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/database" | ||||
| 	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/models" | ||||
| 	jsoniter "github.com/json-iterator/go" | ||||
| 	"github.com/spf13/viper" | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"path/filepath" | ||||
| ) | ||||
|  | ||||
| func MergeFileChunks(meta models.Attachment, arrange []string) (models.Attachment, error) { | ||||
| 	destMap := viper.GetStringMap("destinations.temporary") | ||||
|  | ||||
| 	var dest models.LocalDestination | ||||
| 	rawDest, _ := jsoniter.Marshal(destMap) | ||||
| 	_ = jsoniter.Unmarshal(rawDest, &dest) | ||||
|  | ||||
| 	destPath := filepath.Join(dest.Path, meta.Uuid) | ||||
| 	destFile, err := os.Create(destPath) | ||||
| 	if err != nil { | ||||
| 		return meta, err | ||||
| 	} | ||||
| 	defer destFile.Close() | ||||
|  | ||||
| 	// Merge files | ||||
| 	for _, chunk := range arrange { | ||||
| 		chunkPath := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, chunk)) | ||||
| 		chunkFile, err := os.Open(chunkPath) | ||||
| 		if err != nil { | ||||
| 			return meta, err | ||||
| 		} | ||||
|  | ||||
| 		_, err = io.Copy(destFile, chunkFile) | ||||
| 		if err != nil { | ||||
| 			_ = chunkFile.Close() | ||||
| 			return meta, err | ||||
| 		} | ||||
|  | ||||
| 		_ = chunkFile.Close() | ||||
| 	} | ||||
|  | ||||
| 	// Do post-upload tasks | ||||
| 	meta.IsUploaded = true | ||||
| 	database.C.Save(&meta) | ||||
|  | ||||
| 	PublishAnalyzeTask(meta) | ||||
|  | ||||
| 	// Clean up | ||||
| 	for _, chunk := range arrange { | ||||
| 		chunkPath := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, chunk)) | ||||
| 		_ = os.Remove(chunkPath) | ||||
| 	} | ||||
|  | ||||
| 	return meta, nil | ||||
| } | ||||
| @@ -4,7 +4,7 @@ import ( | ||||
| 	"math/rand" | ||||
| ) | ||||
|  | ||||
| var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") | ||||
| var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") | ||||
|  | ||||
| func RandString(length int) string { | ||||
| 	builder := make([]rune, length) | ||||
|   | ||||
| @@ -35,7 +35,7 @@ func StartConsumeDeletionTask() { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func RunMarkDeletionTask() { | ||||
| func RunMarkLifecycleDeletionTask() { | ||||
| 	var pools []models.AttachmentPool | ||||
| 	if err := database.C.Find(&pools).Error; err != nil { | ||||
| 		return | ||||
| @@ -53,6 +53,7 @@ func RunMarkDeletionTask() { | ||||
| 		tx := database.C. | ||||
| 			Where("pool_id = ?", pool.ID). | ||||
| 			Where("created_at < ?", lifecycle). | ||||
| 			Where("cleaned_at IS NULL"). | ||||
| 			Updates(&models.Attachment{CleanedAt: lo.ToPtr(time.Now())}) | ||||
| 		log.Info(). | ||||
| 			Str("pool", pool.Alias). | ||||
| @@ -62,13 +63,26 @@ func RunMarkDeletionTask() { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func RunMarkMultipartDeletionTask() { | ||||
| 	lifecycle := time.Now().Add(-60 * time.Minute) | ||||
| 	tx := database.C. | ||||
| 		Where("created_at < ?", lifecycle). | ||||
| 		Where("is_uploaded = ?", false). | ||||
| 		Where("cleaned_at IS NULL"). | ||||
| 		Updates(&models.Attachment{CleanedAt: lo.ToPtr(time.Now())}) | ||||
| 	log.Info(). | ||||
| 		Int64("count", tx.RowsAffected). | ||||
| 		Err(tx.Error). | ||||
| 		Msg("Marking attachments as clean needed due to multipart lifecycle...") | ||||
| } | ||||
|  | ||||
| func RunScheduleDeletionTask() { | ||||
| 	var attachments []models.Attachment | ||||
| 	if err := database.C.Where("cleaned_at IS NOT NULL").Find(&attachments).Error; err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	for idx, attachment := range attachments { | ||||
| 	for _, attachment := range attachments { | ||||
| 		if attachment.RefID != nil { | ||||
| 			continue | ||||
| 		} | ||||
| @@ -76,8 +90,6 @@ func RunScheduleDeletionTask() { | ||||
| 			log.Error(). | ||||
| 				Uint("id", attachment.ID). | ||||
| 				Msg("An error occurred when deleting marked clean up attachments...") | ||||
| 		} else { | ||||
| 			attachments[idx].CleanedAt = lo.ToPtr(time.Now()) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| @@ -85,6 +97,20 @@ func RunScheduleDeletionTask() { | ||||
| } | ||||
|  | ||||
| func DeleteFile(meta models.Attachment) error { | ||||
| 	if !meta.IsUploaded { | ||||
| 		destMap := viper.GetStringMap("destinations.temporary") | ||||
| 		var dest models.LocalDestination | ||||
| 		rawDest, _ := jsoniter.Marshal(destMap) | ||||
| 		_ = jsoniter.Unmarshal(rawDest, &dest) | ||||
|  | ||||
| 		for cid := range meta.FileChunks { | ||||
| 			path := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid)) | ||||
| 			_ = os.Remove(path) | ||||
| 		} | ||||
|  | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	var destMap map[string]any | ||||
| 	if meta.Destination == models.AttachmentDstTemporary { | ||||
| 		destMap = viper.GetStringMap("destinations.temporary") | ||||
|   | ||||
| @@ -1,8 +1,8 @@ | ||||
| package services | ||||
|  | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"mime/multipart" | ||||
| @@ -29,12 +29,44 @@ func UploadFileToTemporary(ctx *fiber.Ctx, file *multipart.FileHeader, meta mode | ||||
| 	case models.DestinationTypeLocal: | ||||
| 		var destConfigured models.LocalDestination | ||||
| 		_ = jsoniter.Unmarshal(rawDest, &destConfigured) | ||||
| 		return UploadFileToLocal(destConfigured, ctx, file, meta) | ||||
| 		return ctx.SaveFile(file, filepath.Join(destConfigured.Path, meta.Uuid)) | ||||
| 	default: | ||||
| 		return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func UploadChunkToTemporary(ctx *fiber.Ctx, cid string, file *multipart.FileHeader, meta models.Attachment) error { | ||||
| 	destMap := viper.GetStringMap("destinations.temporary") | ||||
|  | ||||
| 	var dest models.BaseDestination | ||||
| 	rawDest, _ := jsoniter.Marshal(destMap) | ||||
| 	_ = jsoniter.Unmarshal(rawDest, &dest) | ||||
|  | ||||
| 	switch dest.Type { | ||||
| 	case models.DestinationTypeLocal: | ||||
| 		var destConfigured models.LocalDestination | ||||
| 		_ = jsoniter.Unmarshal(rawDest, &destConfigured) | ||||
| 		return ctx.SaveFile(file, filepath.Join(destConfigured.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid))) | ||||
| 	default: | ||||
| 		return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func CheckChunkExistsInTemporary(meta models.Attachment, cid string) bool { | ||||
| 	destMap := viper.GetStringMap("destinations.temporary") | ||||
|  | ||||
| 	var dest models.LocalDestination | ||||
| 	rawDest, _ := jsoniter.Marshal(destMap) | ||||
| 	_ = jsoniter.Unmarshal(rawDest, &dest) | ||||
|  | ||||
| 	path := filepath.Join(dest.Path, fmt.Sprintf("%s.%s", meta.Uuid, cid)) | ||||
| 	if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { | ||||
| 		return false | ||||
| 	} else { | ||||
| 		return true | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func ReUploadFileToPermanent(meta models.Attachment) error { | ||||
| 	if meta.Destination != models.AttachmentDstTemporary { | ||||
| 		return fmt.Errorf("attachment isn't in temporary storage, unable to process") | ||||
| @@ -50,8 +82,8 @@ func ReUploadFileToPermanent(meta models.Attachment) error { | ||||
|  | ||||
| 	prevDestMap := viper.GetStringMap("destinations.temporary") | ||||
|  | ||||
| 	// Currently the temporary destination only support the local | ||||
| 	// So we can do this | ||||
| 	// Currently, the temporary destination only supports the local. | ||||
| 	// So we can do this. | ||||
| 	var prevDest models.LocalDestination | ||||
| 	prevRawDest, _ := jsoniter.Marshal(prevDestMap) | ||||
| 	_ = jsoniter.Unmarshal(prevRawDest, &prevDest) | ||||
| @@ -111,39 +143,3 @@ func ReUploadFileToPermanent(meta models.Attachment) error { | ||||
| 		return fmt.Errorf("invalid destination: unsupported protocol %s", dest.Type) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func UploadFileToLocal(config models.LocalDestination, ctx *fiber.Ctx, file *multipart.FileHeader, meta models.Attachment) error { | ||||
| 	return ctx.SaveFile(file, filepath.Join(config.Path, meta.Uuid)) | ||||
| } | ||||
|  | ||||
| func UploadFileToS3(config models.S3Destination, file *multipart.FileHeader, meta models.Attachment) error { | ||||
| 	header, err := file.Open() | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("read upload file: %v", err) | ||||
| 	} | ||||
| 	defer header.Close() | ||||
|  | ||||
| 	buffer := bytes.NewBuffer(nil) | ||||
| 	if _, err := io.Copy(buffer, header); err != nil { | ||||
| 		return fmt.Errorf("create io reader for upload file: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	client, err := minio.New(config.Endpoint, &minio.Options{ | ||||
| 		Creds:  credentials.NewStaticV4(config.SecretID, config.SecretKey, ""), | ||||
| 		Secure: config.EnableSSL, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("unable to configure s3 client: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	_, err = client.PutObject(context.Background(), config.Bucket, filepath.Join(config.Path, meta.Uuid), buffer, file.Size, minio.PutObjectOptions{ | ||||
| 		ContentType:          meta.MimeType, | ||||
| 		SendContentMd5:       false, | ||||
| 		DisableContentSha256: true, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("unable to upload file to s3: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -59,7 +59,8 @@ func main() { | ||||
| 	// Configure timed tasks | ||||
| 	quartz := cron.New(cron.WithLogger(cron.VerbosePrintfLogger(&log.Logger))) | ||||
| 	quartz.AddFunc("@every 60m", services.DoAutoDatabaseCleanup) | ||||
| 	quartz.AddFunc("@every 60m", services.RunMarkDeletionTask) | ||||
| 	quartz.AddFunc("@every 60m", services.RunMarkLifecycleDeletionTask) | ||||
| 	quartz.AddFunc("@every 60m", services.RunMarkMultipartDeletionTask) | ||||
| 	quartz.AddFunc("@midnight", services.RunScheduleDeletionTask) | ||||
| 	quartz.Start() | ||||
|  | ||||
| @@ -75,7 +76,7 @@ func main() { | ||||
| 	log.Info().Msgf("Paperclip v%s is started...", pkg.AppVersion) | ||||
|  | ||||
| 	services.ScanUnanalyzedFileFromDatabase() | ||||
| 	services.RunMarkDeletionTask() | ||||
| 	services.RunMarkLifecycleDeletionTask() | ||||
|  | ||||
| 	quit := make(chan os.Signal, 1) | ||||
| 	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) | ||||
|   | ||||
| @@ -29,6 +29,9 @@ cookie_samesite = "Lax" | ||||
| access_token_duration = 300 | ||||
| refresh_token_duration = 2592000 | ||||
|  | ||||
| [performance] | ||||
| file_chunk_size = 26214400 | ||||
|  | ||||
| [database] | ||||
| dsn = "host=localhost user=postgres password=password dbname=hy_paperclip port=5432 sslmode=disable" | ||||
| prefix = "paperclip_" | ||||
|   | ||||
		Reference in New Issue
	
	Block a user