Paperclip/pkg/internal/services/analyzer.go
2024-09-22 15:49:32 +08:00

269 lines
7.3 KiB
Go

package services
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"image"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/database"
"git.solsynth.dev/hydrogen/paperclip/pkg/internal/models"
jsoniter "github.com/json-iterator/go"
"github.com/k0kubun/go-ansi"
"github.com/rs/zerolog/log"
"github.com/schollz/progressbar/v3"
"github.com/spf13/viper"
"gopkg.in/vansante/go-ffprobe.v2"
_ "image/gif"
_ "image/jpeg"
_ "image/png"
)
var fileAnalyzeQueue = make(chan models.Attachment, 256)
func PublishAnalyzeTask(file models.Attachment) {
fileAnalyzeQueue <- file
}
func StartConsumeAnalyzeTask() {
for {
task := <-fileAnalyzeQueue
start := time.Now()
if err := AnalyzeAttachment(task); err != nil {
log.Error().Err(err).Any("task", task).Msg("A file analyze task failed...")
} else {
log.Info().Dur("elapsed", time.Since(start)).Uint("id", task.ID).Msg("A file analyze task was completed.")
}
}
}
func ScanUnanalyzedFileFromDatabase() {
workers := viper.GetInt("workers.files_analyze")
if workers < 2 {
log.Warn().Int("val", workers).Int("min", 2).Msg("The file analyzer does not have enough computing power, and the scan of unanalyzed files will not start...")
}
var attachments []models.Attachment
if err := database.C.
Where("is_uploaded = ?", true).
Where("destination = ? OR is_analyzed = ?", models.AttachmentDstTemporary, false).
Find(&attachments).Error; err != nil {
log.Error().Err(err).Msg("Scan unanalyzed files from database failed...")
return
}
if len(attachments) == 0 {
return
}
go func() {
var deletionIdSet []uint
bar := progressbar.NewOptions(len(attachments),
progressbar.OptionSetWriter(ansi.NewAnsiStdout()),
progressbar.OptionEnableColorCodes(true),
progressbar.OptionShowBytes(true),
progressbar.OptionSetWidth(15),
progressbar.OptionSetDescription("Analyzing the unanalyzed files..."),
progressbar.OptionSetTheme(progressbar.Theme{
Saucer: "[green]=[reset]",
SaucerHead: "[green]>[reset]",
SaucerPadding: " ",
BarStart: "[",
BarEnd: "]",
}))
for _, task := range attachments {
if err := AnalyzeAttachment(task); err != nil {
log.Error().Err(err).Any("task", task).Msg("A background file analyze task failed...")
deletionIdSet = append(deletionIdSet, task.ID)
}
bar.Add(1)
}
log.Info().Int("count", len(attachments)).Int("fails", len(deletionIdSet)).Msg("All unanalyzed files has been analyzed!")
if len(deletionIdSet) > 0 {
database.C.Delete(&models.Attachment{}, deletionIdSet)
}
}()
}
func AnalyzeAttachment(file models.Attachment) error {
if !file.IsUploaded {
return fmt.Errorf("file isn't finish multipart upload")
} else if file.Destination != models.AttachmentDstTemporary {
return fmt.Errorf("attachment isn't in temporary storage, unable to analyze")
}
var start time.Time
if len(file.HashCode) == 0 {
if hash, err := HashAttachment(file); err != nil {
return err
} else {
file.HashCode = hash
}
}
// Do analyze jobs
if !file.IsAnalyzed || len(file.HashCode) == 0 {
destMap := viper.GetStringMap("destinations.temporary")
var dest models.LocalDestination
rawDest, _ := jsoniter.Marshal(destMap)
_ = jsoniter.Unmarshal(rawDest, &dest)
start = time.Now()
dst := filepath.Join(dest.Path, file.Uuid)
if _, err := os.Stat(dst); os.IsNotExist(err) {
return fmt.Errorf("attachment doesn't exists in temporary storage: %v", err)
}
switch strings.SplitN(file.MimeType, "/", 2)[0] {
case "image":
// Dealing with image
reader, err := os.Open(dst)
if err != nil {
return fmt.Errorf("unable to open file: %v", err)
}
defer reader.Close()
im, _, err := image.Decode(reader)
if err != nil {
return fmt.Errorf("unable to decode file as an image: %v", err)
}
width := im.Bounds().Dx()
height := im.Bounds().Dy()
ratio := float64(width) / float64(height)
file.Metadata = map[string]any{
"width": width,
"height": height,
"ratio": ratio,
}
case "video":
// Dealing with video
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
data, err := ffprobe.ProbeURL(ctx, dst)
if err != nil {
return fmt.Errorf("unable to analyze video information: %v", err)
}
stream := data.FirstVideoStream()
ratio := float64(stream.Width) / float64(stream.Height)
duration, _ := strconv.ParseFloat(stream.Duration, 64)
file.Metadata = map[string]any{
"width": stream.Width,
"height": stream.Height,
"ratio": ratio,
"duration": duration,
"bit_rate": stream.BitRate,
"codec_name": stream.CodecName,
"color_range": stream.ColorRange,
"color_space": stream.ColorSpace,
}
}
}
tx := database.C.Begin()
file.IsAnalyzed = true
linked, err := TryLinkAttachment(tx, file, file.HashCode)
if linked && err != nil {
return fmt.Errorf("unable to link file record: %v", err)
} else if !linked {
CacheAttachment(file)
if err := tx.Save(&file).Error; err != nil {
tx.Rollback()
return fmt.Errorf("unable to save file record: %v", err)
}
}
tx.Commit()
log.Info().Dur("elapsed", time.Since(start)).Uint("id", file.ID).Msg("A file analyze task was finished, starting uploading...")
start = time.Now()
// Move temporary to permanent
if !linked {
if err := ReUploadFileToPermanent(file); err != nil {
return fmt.Errorf("unable to move file to permanet storage: %v", err)
}
}
// Recycle the temporary file
file.Destination = models.AttachmentDstTemporary
PublishDeleteFileTask(file)
// Finish
log.Info().Dur("elapsed", time.Since(start)).Uint("id", file.ID).Bool("linked", linked).Msg("A file post-analyze upload task was finished.")
return nil
}
func HashAttachment(file models.Attachment) (hash string, err error) {
const chunkSize = 32 * 1024
destMap := viper.GetStringMapString("destinations.temporary")
destPath := filepath.Join(destMap["path"], file.Uuid)
// Check if the file exists
fileInfo, err := os.Stat(destPath)
if os.IsNotExist(err) {
return "", fmt.Errorf("file does not exist: %v", err)
}
// Open the file
inFile, err := os.Open(destPath)
if err != nil {
return "", fmt.Errorf("unable to open file: %v", err)
}
defer inFile.Close()
hasher := sha256.New()
// Hash the first 32KB
buf := make([]byte, chunkSize)
if _, err := inFile.Read(buf); err != nil && err != io.EOF {
return "", fmt.Errorf("error reading file: %v", err)
}
hasher.Write(buf)
// Hash the middle 32KB
middleOffset := fileInfo.Size() / 2
if _, err := inFile.Seek(middleOffset, io.SeekStart); err != nil {
return "", fmt.Errorf("error seeking to middle: %v", err)
}
if _, err := inFile.Read(buf); err != nil && err != io.EOF {
return "", fmt.Errorf("error reading middle: %v", err)
}
hasher.Write(buf)
// Hash the last 32KB
endOffset := fileInfo.Size() - chunkSize
if _, err := inFile.Seek(endOffset, io.SeekStart); err != nil {
return "", fmt.Errorf("error seeking to end: %v", err)
}
if _, err := inFile.Read(buf); err != nil && err != io.EOF {
return "", fmt.Errorf("error reading end: %v", err)
}
hasher.Write(buf)
// Hash with the file metadata
hasher.Write([]byte(fmt.Sprintf("%d", file.Size)))
// Return the combined hash
hash = hex.EncodeToString(hasher.Sum(nil))
return
}