✨ Member reading anchor
This commit is contained in:
		| @@ -3,11 +3,10 @@ package grpc | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/nex" | ||||
|  | ||||
| 	"git.solsynth.dev/hypernet/messaging/pkg/internal/gap" | ||||
| 	"git.solsynth.dev/hypernet/messaging/pkg/internal/http/exts" | ||||
| 	"git.solsynth.dev/hypernet/messaging/pkg/internal/services" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/nex" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/proto" | ||||
| 	jsoniter "github.com/json-iterator/go" | ||||
| ) | ||||
| @@ -38,6 +37,7 @@ func (v *Server) PushStream(_ context.Context, request *proto.PushStreamRequest) | ||||
| 					Message: fmt.Sprintf("unable parse payload: %v", err), | ||||
| 				}.Marshal(), | ||||
| 			}) | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		err = services.SetTypingStatus(data.ChannelID, uint(request.GetUserId())) | ||||
| @@ -49,7 +49,31 @@ func (v *Server) PushStream(_ context.Context, request *proto.PushStreamRequest) | ||||
| 					Message: fmt.Sprintf("unable boardcast status: %v", err), | ||||
| 				}.Marshal(), | ||||
| 			}) | ||||
| 			break | ||||
| 		} | ||||
| 	case "events.read": | ||||
| 		var data struct { | ||||
| 			ChannelMemberID uint `json:"channel_member_id" validate:"required"` | ||||
| 			EventID         uint `json:"event_id" validate:"required"` | ||||
| 		} | ||||
|  | ||||
| 		err := jsoniter.Unmarshal(in.RawPayload(), &data) | ||||
| 		if err == nil { | ||||
| 			err = exts.ValidateStruct(data) | ||||
| 		} | ||||
| 		if err != nil { | ||||
| 			_, _ = sc.PushStream(context.Background(), &proto.PushStreamRequest{ | ||||
| 				ClientId: request.ClientId, | ||||
| 				Body: nex.WebSocketPackage{ | ||||
| 					Action:  "error", | ||||
| 					Message: fmt.Sprintf("unable parse payload: %v", err), | ||||
| 				}.Marshal(), | ||||
| 			}) | ||||
| 			break | ||||
| 		} | ||||
|  | ||||
| 		// WARN We trust the user here, so we don't need to check if the channel member is valid for performance | ||||
| 		services.SetReadingAnchor(data.ChannelMemberID, data.EventID) | ||||
| 	} | ||||
|  | ||||
| 	return &proto.PushStreamResponse{}, nil | ||||
|   | ||||
| @@ -56,11 +56,12 @@ type ChannelMember struct { | ||||
| 	Nick   string  `json:"nick"` | ||||
| 	Avatar *string `json:"avatar"` | ||||
|  | ||||
| 	ChannelID  uint        `json:"channel_id"` | ||||
| 	AccountID  uint        `json:"account_id"` | ||||
| 	Channel    Channel     `json:"channel"` | ||||
| 	Notify     NotifyLevel `json:"notify"` | ||||
| 	PowerLevel int         `json:"power_level"` | ||||
| 	ChannelID     uint        `json:"channel_id"` | ||||
| 	AccountID     uint        `json:"account_id"` | ||||
| 	Channel       Channel     `json:"channel"` | ||||
| 	Notify        NotifyLevel `json:"notify"` | ||||
| 	PowerLevel    int         `json:"power_level"` | ||||
| 	ReadingAnchor *int        `json:"reading_anchor"` | ||||
|  | ||||
| 	Calls  []Call  `json:"calls" gorm:"foreignKey:FounderID"` | ||||
| 	Events []Event `json:"events" gorm:"foreignKey:SenderID"` | ||||
|   | ||||
							
								
								
									
										40
									
								
								pkg/internal/services/reading_anchor.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										40
									
								
								pkg/internal/services/reading_anchor.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,40 @@ | ||||
| package services | ||||
|  | ||||
| import ( | ||||
| 	"git.solsynth.dev/hypernet/messaging/pkg/internal/database" | ||||
| 	"git.solsynth.dev/hypernet/messaging/pkg/internal/models" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| 	"github.com/samber/lo" | ||||
| 	"gorm.io/gorm" | ||||
| ) | ||||
|  | ||||
| var readingAnchorQueue = make(map[uint]uint) | ||||
|  | ||||
| func SetReadingAnchor(memberId uint, eventId uint) { | ||||
| 	if val, ok := readingAnchorQueue[memberId]; ok { | ||||
| 		readingAnchorQueue[memberId] = max(eventId, val) | ||||
| 	} else { | ||||
| 		readingAnchorQueue[memberId] = eventId | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func FlushReadingAnchor() { | ||||
| 	if len(readingAnchorQueue) == 0 { | ||||
| 		return | ||||
| 	} | ||||
| 	idSet := lo.Uniq(lo.Map(lo.Keys(readingAnchorQueue), func(item uint, _ int) uint { | ||||
| 		return item | ||||
| 	})) | ||||
| 	var pairs []map[string]any | ||||
| 	for k, v := range readingAnchorQueue { | ||||
| 		pairs = append(pairs, map[string]any{ | ||||
| 			"id":             k, | ||||
| 			"reading_anchor": gorm.Expr("GREATEST(reading_anchor, ?)", v), | ||||
| 		}) | ||||
| 	} | ||||
| 	if err := database.C.Model(&models.ChannelMember{}). | ||||
| 		Where("id IN ?", idSet). | ||||
| 		Updates(pairs).Error; err != nil { | ||||
| 		log.Error().Err(err).Msg("An error occurred when flushing reading anchor...") | ||||
| 	} | ||||
| } | ||||
| @@ -83,6 +83,7 @@ func main() { | ||||
| 	// Configure timed tasks | ||||
| 	quartz := cron.New(cron.WithLogger(cron.VerbosePrintfLogger(&log.Logger))) | ||||
| 	quartz.AddFunc("@every 60m", services.DoAutoDatabaseCleanup) | ||||
| 	quartz.AddFunc("@every 1m", services.FlushReadingAnchor) | ||||
| 	quartz.Start() | ||||
|  | ||||
| 	// Messages | ||||
|   | ||||
		Reference in New Issue
	
	Block a user