package services

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"image"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/database"
	"git.solsynth.dev/hydrogen/paperclip/pkg/internal/models"
	jsoniter "github.com/json-iterator/go"
	"github.com/k0kubun/go-ansi"
	"github.com/rs/zerolog/log"
	"github.com/schollz/progressbar/v3"
	"github.com/spf13/viper"
	"gopkg.in/vansante/go-ffprobe.v2"

	_ "image/gif"
	_ "image/jpeg"
	_ "image/png"
)
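
// fileAnalyzeQueue buffers attachments that are waiting for analysis; tasks are
// enqueued by PublishAnalyzeTask and drained by StartConsumeAnalyzeTask.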
var fileAnalyzeQueue = make(chan models.Attachment, 256)

func PublishAnalyzeTask(file models.Attachment) {
	fileAnalyzeQueue <- file
}
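
// StartConsumeAnalyzeTask blocks forever, taking tasks from the analyze queue,
// running AnalyzeAttachment on each one and logging the outcome. It is meant to
// be run in its own goroutine.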
func StartConsumeAnalyzeTask() {
	for {
		task := <-fileAnalyzeQueue

		start := time.Now()
		if err := AnalyzeAttachment(task); err != nil {
			log.Error().Err(err).Any("task", task).Msg("A file analyze task failed...")
		} else {
			log.Info().Dur("elapsed", time.Since(start)).Uint("id", task.ID).Msg("A file analyze task was completed.")
		}
	}
}
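
// ScanUnanalyzedFileFromDatabase picks up attachments that are still marked as
// temporary or unanalyzed, analyzes them in a background goroutine with a
// progress bar, and deletes the records that fail to analyze.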
func ScanUnanalyzedFileFromDatabase() {
	workers := viper.GetInt("workers.files_analyze")
	if workers < 2 {
		log.Warn().Int("val", workers).Int("min", 2).Msg("The file analyzer does not have enough computing power, and the scan of unanalyzed files will not start...")
		return
	}

	var attachments []models.Attachment
	if err := database.C.Where("destination = ? OR is_analyzed = ?", models.AttachmentDstTemporary, false).Find(&attachments).Error; err != nil {
		log.Error().Err(err).Msg("Scan unanalyzed files from database failed...")
		return
	}

	if len(attachments) == 0 {
		return
	}

	go func() {
		var deletionIdSet []uint
		bar := progressbar.NewOptions(len(attachments),
			progressbar.OptionSetWriter(ansi.NewAnsiStdout()),
			progressbar.OptionEnableColorCodes(true),
			progressbar.OptionShowBytes(true),
			progressbar.OptionSetWidth(15),
			progressbar.OptionSetDescription("Analyzing the unanalyzed files..."),
			progressbar.OptionSetTheme(progressbar.Theme{
				Saucer:        "[green]=[reset]",
				SaucerHead:    "[green]>[reset]",
				SaucerPadding: " ",
				BarStart:      "[",
				BarEnd:        "]",
			}))
		for _, task := range attachments {
			if err := AnalyzeAttachment(task); err != nil {
				log.Error().Err(err).Any("task", task).Msg("A background file analyze task failed...")
				deletionIdSet = append(deletionIdSet, task.ID)
			}
			bar.Add(1)
		}
		log.Info().Int("count", len(attachments)).Int("fails", len(deletionIdSet)).Msg("All unanalyzed files have been analyzed!")
		if len(deletionIdSet) > 0 {
			database.C.Delete(&models.Attachment{}, deletionIdSet)
		}
	}()
}
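
// AnalyzeAttachment extracts metadata from an attachment in temporary storage
// (image dimensions or video stream details), computes its hash, links it to an
// existing attachment with the same hash when possible, and otherwise uploads it
// to permanent storage before scheduling the temporary copy for deletion.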
func AnalyzeAttachment(file models.Attachment) error {
	if file.Destination != models.AttachmentDstTemporary {
		return fmt.Errorf("attachment isn't in temporary storage, unable to analyze")
	}

	start := time.Now()

	// Do analyze job
	if !file.IsAnalyzed || len(file.HashCode) == 0 {
		destMap := viper.GetStringMap("destinations.temporary")

		var dest models.LocalDestination
		rawDest, _ := jsoniter.Marshal(destMap)
		_ = jsoniter.Unmarshal(rawDest, &dest)

		start = time.Now()

		dst := filepath.Join(dest.Path, file.Uuid)
		if _, err := os.Stat(dst); os.IsNotExist(err) {
			return fmt.Errorf("attachment doesn't exist in temporary storage: %v", err)
		}

		switch strings.SplitN(file.MimeType, "/", 2)[0] {
		case "image":
			// Dealing with image
			reader, err := os.Open(dst)
			if err != nil {
				return fmt.Errorf("unable to open file: %v", err)
			}
			defer reader.Close()
			im, _, err := image.Decode(reader)
			if err != nil {
				return fmt.Errorf("unable to decode file as an image: %v", err)
			}
			width := im.Bounds().Dx()
			height := im.Bounds().Dy()
			ratio := float64(width) / float64(height)
			file.Metadata = map[string]any{
				"width":  width,
				"height": height,
				"ratio":  ratio,
			}
case "video" :
// Dealing with video
ctx , cancel := context . WithTimeout ( context . Background ( ) , 10 * time . Second )
defer cancel ( )
data , err := ffprobe . ProbeURL ( ctx , dst )
if err != nil {
return fmt . Errorf ( "unable to analyze video information: %v" , err )
}
stream := data . FirstVideoStream ( )
ratio := float64 ( stream . Width ) / float64 ( stream . Height )
duration , _ := strconv . ParseFloat ( stream . Duration , 64 )
file . Metadata = map [ string ] any {
"width" : stream . Width ,
"height" : stream . Height ,
"ratio" : ratio ,
"duration" : duration ,
"bit_rate" : stream . BitRate ,
"codec_name" : stream . CodecName ,
"color_range" : stream . ColorRange ,
"color_space" : stream . ColorSpace ,
}
2024-07-28 13:03:56 +00:00
}
2024-07-29 06:41:28 +00:00

		if hash, err := HashAttachment(file); err != nil {
			return err
		} else {
			file.HashCode = hash
		}
	}

	tx := database.C.Begin()

	file.IsAnalyzed = true

	linked, err := TryLinkAttachment(tx, file, file.HashCode)
	if linked && err != nil {
		return fmt.Errorf("unable to link file record: %v", err)
	} else if !linked {
		metadataCache.Store(file.ID, file)
		if err := tx.Save(&file).Error; err != nil {
			tx.Rollback()
			return fmt.Errorf("unable to save file record: %v", err)
		}
	}

	tx.Commit()

	log.Info().Dur("elapsed", time.Since(start)).Uint("id", file.ID).Msg("A file analyze task was finished, starting uploading...")
	start = time.Now()

	// Move temporary to permanent
	if !linked {
		if err := ReUploadFileToPermanent(file); err != nil {
			return fmt.Errorf("unable to move file to permanent storage: %v", err)
		}
	}

	// Recycle the temporary file
	file.Destination = models.AttachmentDstTemporary
	PublishDeleteFileTask(file)

	// Finish
	log.Info().Dur("elapsed", time.Since(start)).Uint("id", file.ID).Bool("linked", linked).Msg("A file post-analyze upload task was finished.")

	return nil
}
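
// HashAttachment returns the hex-encoded SHA-256 digest of an attachment that
// still lives in the temporary destination.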
func HashAttachment(file models.Attachment) (hash string, err error) {
	if file.Destination != models.AttachmentDstTemporary {
		err = fmt.Errorf("attachment isn't in temporary storage, unable to hash")
		return
	}

	destMap := viper.GetStringMap("destinations.temporary")

	var dest models.LocalDestination
	rawDest, _ := jsoniter.Marshal(destMap)
	_ = jsoniter.Unmarshal(rawDest, &dest)

	dst := filepath.Join(dest.Path, file.Uuid)
	if _, err = os.Stat(dst); os.IsNotExist(err) {
		err = fmt.Errorf("attachment doesn't exist in temporary storage: %v", err)
		return
	}

	var in *os.File
	in, err = os.Open(dst)
	if err != nil {
		err = fmt.Errorf("unable to open file: %v", err)
		return
	}
	defer in.Close()

	hasher := sha256.New()
	if _, err = io.Copy(hasher, in); err != nil {
		err = fmt.Errorf("unable to hash: %v", err)
		return
	}

	hash = hex.EncodeToString(hasher.Sum(nil))
	return
}