diff --git a/pkg/internal/grpc/stream.go b/pkg/internal/grpc/stream.go index b2402bf..5fbaed6 100644 --- a/pkg/internal/grpc/stream.go +++ b/pkg/internal/grpc/stream.go @@ -3,11 +3,10 @@ package grpc import ( "context" "fmt" - "git.solsynth.dev/hypernet/nexus/pkg/nex" - "git.solsynth.dev/hypernet/messaging/pkg/internal/gap" "git.solsynth.dev/hypernet/messaging/pkg/internal/http/exts" "git.solsynth.dev/hypernet/messaging/pkg/internal/services" + "git.solsynth.dev/hypernet/nexus/pkg/nex" "git.solsynth.dev/hypernet/nexus/pkg/proto" jsoniter "github.com/json-iterator/go" ) @@ -38,6 +37,7 @@ func (v *Server) PushStream(_ context.Context, request *proto.PushStreamRequest) Message: fmt.Sprintf("unable parse payload: %v", err), }.Marshal(), }) + break } err = services.SetTypingStatus(data.ChannelID, uint(request.GetUserId())) @@ -49,7 +49,31 @@ func (v *Server) PushStream(_ context.Context, request *proto.PushStreamRequest) Message: fmt.Sprintf("unable boardcast status: %v", err), }.Marshal(), }) + break } + case "events.read": + var data struct { + ChannelMemberID uint `json:"channel_member_id" validate:"required"` + EventID uint `json:"event_id" validate:"required"` + } + + err := jsoniter.Unmarshal(in.RawPayload(), &data) + if err == nil { + err = exts.ValidateStruct(data) + } + if err != nil { + _, _ = sc.PushStream(context.Background(), &proto.PushStreamRequest{ + ClientId: request.ClientId, + Body: nex.WebSocketPackage{ + Action: "error", + Message: fmt.Sprintf("unable parse payload: %v", err), + }.Marshal(), + }) + break + } + + // WARN We trust the user here, so we don't need to check if the channel member is valid for performance + services.SetReadingAnchor(data.ChannelMemberID, data.EventID) } return &proto.PushStreamResponse{}, nil diff --git a/pkg/internal/models/channels.go b/pkg/internal/models/channels.go index 7577937..f8e854a 100644 --- a/pkg/internal/models/channels.go +++ b/pkg/internal/models/channels.go @@ -56,11 +56,12 @@ type ChannelMember struct { Nick string `json:"nick"` Avatar *string `json:"avatar"` - ChannelID uint `json:"channel_id"` - AccountID uint `json:"account_id"` - Channel Channel `json:"channel"` - Notify NotifyLevel `json:"notify"` - PowerLevel int `json:"power_level"` + ChannelID uint `json:"channel_id"` + AccountID uint `json:"account_id"` + Channel Channel `json:"channel"` + Notify NotifyLevel `json:"notify"` + PowerLevel int `json:"power_level"` + ReadingAnchor *int `json:"reading_anchor"` Calls []Call `json:"calls" gorm:"foreignKey:FounderID"` Events []Event `json:"events" gorm:"foreignKey:SenderID"` diff --git a/pkg/internal/services/reading_anchor.go b/pkg/internal/services/reading_anchor.go new file mode 100644 index 0000000..ceb35e4 --- /dev/null +++ b/pkg/internal/services/reading_anchor.go @@ -0,0 +1,40 @@ +package services + +import ( + "git.solsynth.dev/hypernet/messaging/pkg/internal/database" + "git.solsynth.dev/hypernet/messaging/pkg/internal/models" + "github.com/rs/zerolog/log" + "github.com/samber/lo" + "gorm.io/gorm" +) + +var readingAnchorQueue = make(map[uint]uint) + +func SetReadingAnchor(memberId uint, eventId uint) { + if val, ok := readingAnchorQueue[memberId]; ok { + readingAnchorQueue[memberId] = max(eventId, val) + } else { + readingAnchorQueue[memberId] = eventId + } +} + +func FlushReadingAnchor() { + if len(readingAnchorQueue) == 0 { + return + } + idSet := lo.Uniq(lo.Map(lo.Keys(readingAnchorQueue), func(item uint, _ int) uint { + return item + })) + var pairs []map[string]any + for k, v := range readingAnchorQueue { + pairs = append(pairs, map[string]any{ + "id": k, + "reading_anchor": gorm.Expr("GREATEST(reading_anchor, ?)", v), + }) + } + if err := database.C.Model(&models.ChannelMember{}). + Where("id IN ?", idSet). + Updates(pairs).Error; err != nil { + log.Error().Err(err).Msg("An error occurred when flushing reading anchor...") + } +} diff --git a/pkg/main.go b/pkg/main.go index f860cfc..8d272b5 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -83,6 +83,7 @@ func main() { // Configure timed tasks quartz := cron.New(cron.WithLogger(cron.VerbosePrintfLogger(&log.Logger))) quartz.AddFunc("@every 60m", services.DoAutoDatabaseCleanup) + quartz.AddFunc("@every 1m", services.FlushReadingAnchor) quartz.Start() // Messages