From e3a4988ccf9bb13ce8d70b55c5b3397c5ed6f07d Mon Sep 17 00:00:00 2001 From: LittleSheep Date: Sat, 1 Mar 2025 17:52:57 +0800 Subject: [PATCH] :zap: Optimize for passive users :sparkles: Subscribe to channel focused --- pkg/internal/grpc/stream.go | 31 ++++++++++++++++++ pkg/internal/models/channels.go | 2 +- pkg/internal/services/calls.go | 24 +++++++++----- pkg/internal/services/channels.go | 2 ++ pkg/internal/services/events.go | 12 ++++++- pkg/internal/services/status.go | 25 +++++++++------ pkg/internal/services/subscribe.go | 51 ++++++++++++++++++++++++++++++ settings.toml | 3 ++ 8 files changed, 130 insertions(+), 20 deletions(-) create mode 100644 pkg/internal/services/subscribe.go diff --git a/pkg/internal/grpc/stream.go b/pkg/internal/grpc/stream.go index 5fbaed6..ecf8db7 100644 --- a/pkg/internal/grpc/stream.go +++ b/pkg/internal/grpc/stream.go @@ -3,6 +3,8 @@ package grpc import ( "context" "fmt" + "strings" + "git.solsynth.dev/hypernet/messaging/pkg/internal/gap" "git.solsynth.dev/hypernet/messaging/pkg/internal/http/exts" "git.solsynth.dev/hypernet/messaging/pkg/internal/services" @@ -51,6 +53,35 @@ func (v *Server) PushStream(_ context.Context, request *proto.PushStreamRequest) }) break } + case "events.subscribe", "events.unsubscribe", "events.unsubscribeAll": + var data struct { + ChannelID uint `json:"channel_id" validate:"required"` + } + + err := jsoniter.Unmarshal(in.RawPayload(), &data) + if err == nil { + err = exts.ValidateStruct(data) + } + if err != nil { + _, _ = sc.PushStream(context.Background(), &proto.PushStreamRequest{ + ClientId: request.ClientId, + Body: nex.WebSocketPackage{ + Action: "error", + Message: fmt.Sprintf("unable parse payload: %v", err), + }.Marshal(), + }) + break + } + + action := strings.Split(in.Action, ".")[1] + switch action { + case "subscribe": + services.SubscribeChannel(uint(request.GetUserId()), data.ChannelID) + case "unsubscribe": + services.UnsubscribeChannel(uint(request.GetUserId()), data.ChannelID) + case "unsubscribeAll": + services.UnsubscribeAll(uint(request.GetUserId())) + } case "events.read": var data struct { ChannelMemberID uint `json:"channel_member_id" validate:"required"` diff --git a/pkg/internal/models/channels.go b/pkg/internal/models/channels.go index 878199b..d103d4e 100644 --- a/pkg/internal/models/channels.go +++ b/pkg/internal/models/channels.go @@ -39,7 +39,7 @@ func (v Channel) DisplayText() string { if v.Realm != nil { return fmt.Sprintf("%s, %s", v.Name, v.Realm.Name) } - return fmt.Sprintf("%s", v.Name) + return v.Name } type NotifyLevel = int8 diff --git a/pkg/internal/services/calls.go b/pkg/internal/services/calls.go index ccbe79c..1dacb50 100644 --- a/pkg/internal/services/calls.go +++ b/pkg/internal/services/calls.go @@ -117,10 +117,6 @@ func NewCall(channel models.Channel, founder models.ChannelMember) (models.Call, if member.ID != call.Founder.ID { pendingUsers = append(pendingUsers, uint64(member.AccountID)) } - PushCommand(member.AccountID, nex.WebSocketPackage{ - Action: "calls.new", - Payload: call, - }) } channel, _ = GetChannel(channel.ID) @@ -131,6 +127,13 @@ func NewCall(channel models.Channel, founder models.ChannelMember) (models.Call, } } + // The call notification is not happen very often + // So we don't need to optimize the performance for passive users + PushCommandBatch(pendingUsers, nex.WebSocketPackage{ + Action: "calls.new", + Payload: call, + }) + err = authkit.NotifyUserBatch( gap.Nx, pendingUsers, @@ -172,12 +175,17 @@ func EndCall(call models.Call) (models.Call, error) { ChannelID: call.ChannelID, }).Find(&members).Error; err == nil { call, _ = GetCall(call.Channel, call.ID) + var pendingUsers []uint64 for _, member := range members { - PushCommand(member.AccountID, nex.WebSocketPackage{ - Action: "calls.end", - Payload: call, - }) + if member.ID != call.Founder.ID { + pendingUsers = append(pendingUsers, uint64(member.AccountID)) + } } + + PushCommandBatch(pendingUsers, nex.WebSocketPackage{ + Action: "calls.end", + Payload: call, + }) } return call, nil diff --git a/pkg/internal/services/channels.go b/pkg/internal/services/channels.go index 745f58b..856fba4 100644 --- a/pkg/internal/services/channels.go +++ b/pkg/internal/services/channels.go @@ -297,6 +297,8 @@ func EditChannel(channel models.Channel) (models.Channel, error) { func DeleteChannel(channel models.Channel) error { if err := database.C.Delete(&channel).Error; err == nil { + UnsubscribeAllWithChannels(channel.ID) + database.C.Where("channel_id = ?", channel.ID).Delete(&models.Event{}) cacheManager := cache.New[any](localCache.S) diff --git a/pkg/internal/services/events.go b/pkg/internal/services/events.go index 773ea80..cc3ca58 100644 --- a/pkg/internal/services/events.go +++ b/pkg/internal/services/events.go @@ -15,6 +15,7 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/rs/zerolog/log" "github.com/samber/lo" + "github.com/spf13/viper" ) func CountEvent(channel models.Channel) int64 { @@ -89,7 +90,13 @@ func NewEvent(event models.Event) (models.Event, error) { log.Error().Err(err).Msg("Failed to fetch event, the notifying of new event was terminated...") return event, err } - idxList := lo.Map(members, func(item models.ChannelMember, index int) uint64 { + idxList := lo.Map(lo.Filter(members, func(item models.ChannelMember, index int) bool { + if !viper.GetBool("performance.passive_user_optimize") { + // Leave this for backward compatibility + return true + } + return CheckSubscribed(item.AccountID, event.ChannelID) + }), func(item models.ChannelMember, index int) uint64 { return uint64(item.AccountID) }) _ = PushCommandBatch(idxList, nex.WebSocketPackage{ @@ -120,6 +127,9 @@ func NotifyMessageEvent(members []models.ChannelMember, event models.Event) { var mentionedUsers []uint64 for _, member := range members { + if CheckSubscribed(member.AccountID, event.ChannelID) { + continue + } if member.ID != event.SenderID { switch member.Notify { case models.NotifyLevelNone: diff --git a/pkg/internal/services/status.go b/pkg/internal/services/status.go index 270dfa6..ae21c3a 100644 --- a/pkg/internal/services/status.go +++ b/pkg/internal/services/status.go @@ -3,15 +3,16 @@ package services import ( "context" "fmt" + localCache "git.solsynth.dev/hypernet/messaging/pkg/internal/cache" "git.solsynth.dev/hypernet/messaging/pkg/internal/database" - "git.solsynth.dev/hypernet/messaging/pkg/internal/gap" "git.solsynth.dev/hypernet/messaging/pkg/internal/models" "git.solsynth.dev/hypernet/nexus/pkg/nex" - "git.solsynth.dev/hypernet/nexus/pkg/proto" "github.com/eko/gocache/lib/v4/cache" "github.com/eko/gocache/lib/v4/marshaler" "github.com/eko/gocache/lib/v4/store" + "github.com/samber/lo" + "github.com/spf13/viper" ) type statusQueryCacheEntry struct { @@ -76,14 +77,18 @@ func SetTypingStatus(channelId uint, userId uint) error { ) } - sc := proto.NewStreamServiceClient(gap.Nx.GetNexusGrpcConn()) - _, err := sc.PushStreamBatch(context.Background(), &proto.PushStreamBatchRequest{ - UserId: broadcastTarget, - Body: nex.WebSocketPackage{ - Action: "status.typing", - Payload: data, - }.Marshal(), + broadcastTarget = lo.Filter(broadcastTarget, func(item uint64, index int) bool { + if !viper.GetBool("performance.passive_user_optimize") { + // Leave this for backward compatibility + return true + } + return CheckSubscribed(uint(item), channelId) }) - return err + PushCommandBatch(broadcastTarget, nex.WebSocketPackage{ + Action: "status.typing", + Payload: data, + }) + + return nil } diff --git a/pkg/internal/services/subscribe.go b/pkg/internal/services/subscribe.go new file mode 100644 index 0000000..089aa35 --- /dev/null +++ b/pkg/internal/services/subscribe.go @@ -0,0 +1,51 @@ +package services + +import "sync" + +// ChannelID -> UserID +var subscribeInfo = make(map[uint]map[uint]bool) +var subscribeLock sync.Mutex + +// If user subscribed to a channel +// Push the new message to them via websocket +// And skip the notification + +func CheckSubscribed(UserID uint, ChannelID uint) bool { + if _, ok := subscribeInfo[ChannelID]; ok { + if _, ok := subscribeInfo[ChannelID][UserID]; ok { + return true + } + } + return false +} + +func SubscribeChannel(userId uint, channelId uint) { + subscribeLock.Lock() + defer subscribeLock.Unlock() + if _, ok := subscribeInfo[channelId]; !ok { + subscribeInfo[channelId] = make(map[uint]bool) + } + subscribeInfo[channelId][userId] = true +} + +func UnsubscribeChannel(userId uint, channelId uint) { + subscribeLock.Lock() + defer subscribeLock.Unlock() + if _, ok := subscribeInfo[channelId]; ok { + delete(subscribeInfo[channelId], userId) + } +} + +func UnsubscribeAll(userId uint) { + subscribeLock.Lock() + defer subscribeLock.Unlock() + for _, v := range subscribeInfo { + delete(v, userId) + } +} + +func UnsubscribeAllWithChannels(channelId uint) { + subscribeLock.Lock() + defer subscribeLock.Unlock() + delete(subscribeInfo, channelId) +} diff --git a/settings.toml b/settings.toml index 3a6fed9..6e45239 100644 --- a/settings.toml +++ b/settings.toml @@ -5,6 +5,9 @@ grpc_bind = "0.0.0.0:7447" nexus_addr = "localhost:7001" +[performance] +passive_user_optimize = true + [debug] database = false print_routes = false