⚡ Optimize for passive users
✨ Subscribe to channel focused
This commit is contained in:
parent
d22a435224
commit
e3a4988ccf
@ -3,6 +3,8 @@ package grpc
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"git.solsynth.dev/hypernet/messaging/pkg/internal/gap"
|
"git.solsynth.dev/hypernet/messaging/pkg/internal/gap"
|
||||||
"git.solsynth.dev/hypernet/messaging/pkg/internal/http/exts"
|
"git.solsynth.dev/hypernet/messaging/pkg/internal/http/exts"
|
||||||
"git.solsynth.dev/hypernet/messaging/pkg/internal/services"
|
"git.solsynth.dev/hypernet/messaging/pkg/internal/services"
|
||||||
@ -51,6 +53,35 @@ func (v *Server) PushStream(_ context.Context, request *proto.PushStreamRequest)
|
|||||||
})
|
})
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
case "events.subscribe", "events.unsubscribe", "events.unsubscribeAll":
|
||||||
|
var data struct {
|
||||||
|
ChannelID uint `json:"channel_id" validate:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
|
err := jsoniter.Unmarshal(in.RawPayload(), &data)
|
||||||
|
if err == nil {
|
||||||
|
err = exts.ValidateStruct(data)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
_, _ = sc.PushStream(context.Background(), &proto.PushStreamRequest{
|
||||||
|
ClientId: request.ClientId,
|
||||||
|
Body: nex.WebSocketPackage{
|
||||||
|
Action: "error",
|
||||||
|
Message: fmt.Sprintf("unable parse payload: %v", err),
|
||||||
|
}.Marshal(),
|
||||||
|
})
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
action := strings.Split(in.Action, ".")[1]
|
||||||
|
switch action {
|
||||||
|
case "subscribe":
|
||||||
|
services.SubscribeChannel(uint(request.GetUserId()), data.ChannelID)
|
||||||
|
case "unsubscribe":
|
||||||
|
services.UnsubscribeChannel(uint(request.GetUserId()), data.ChannelID)
|
||||||
|
case "unsubscribeAll":
|
||||||
|
services.UnsubscribeAll(uint(request.GetUserId()))
|
||||||
|
}
|
||||||
case "events.read":
|
case "events.read":
|
||||||
var data struct {
|
var data struct {
|
||||||
ChannelMemberID uint `json:"channel_member_id" validate:"required"`
|
ChannelMemberID uint `json:"channel_member_id" validate:"required"`
|
||||||
|
@ -39,7 +39,7 @@ func (v Channel) DisplayText() string {
|
|||||||
if v.Realm != nil {
|
if v.Realm != nil {
|
||||||
return fmt.Sprintf("%s, %s", v.Name, v.Realm.Name)
|
return fmt.Sprintf("%s, %s", v.Name, v.Realm.Name)
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("%s", v.Name)
|
return v.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
type NotifyLevel = int8
|
type NotifyLevel = int8
|
||||||
|
@ -117,10 +117,6 @@ func NewCall(channel models.Channel, founder models.ChannelMember) (models.Call,
|
|||||||
if member.ID != call.Founder.ID {
|
if member.ID != call.Founder.ID {
|
||||||
pendingUsers = append(pendingUsers, uint64(member.AccountID))
|
pendingUsers = append(pendingUsers, uint64(member.AccountID))
|
||||||
}
|
}
|
||||||
PushCommand(member.AccountID, nex.WebSocketPackage{
|
|
||||||
Action: "calls.new",
|
|
||||||
Payload: call,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
channel, _ = GetChannel(channel.ID)
|
channel, _ = GetChannel(channel.ID)
|
||||||
@ -131,6 +127,13 @@ func NewCall(channel models.Channel, founder models.ChannelMember) (models.Call,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The call notification is not happen very often
|
||||||
|
// So we don't need to optimize the performance for passive users
|
||||||
|
PushCommandBatch(pendingUsers, nex.WebSocketPackage{
|
||||||
|
Action: "calls.new",
|
||||||
|
Payload: call,
|
||||||
|
})
|
||||||
|
|
||||||
err = authkit.NotifyUserBatch(
|
err = authkit.NotifyUserBatch(
|
||||||
gap.Nx,
|
gap.Nx,
|
||||||
pendingUsers,
|
pendingUsers,
|
||||||
@ -172,12 +175,17 @@ func EndCall(call models.Call) (models.Call, error) {
|
|||||||
ChannelID: call.ChannelID,
|
ChannelID: call.ChannelID,
|
||||||
}).Find(&members).Error; err == nil {
|
}).Find(&members).Error; err == nil {
|
||||||
call, _ = GetCall(call.Channel, call.ID)
|
call, _ = GetCall(call.Channel, call.ID)
|
||||||
|
var pendingUsers []uint64
|
||||||
for _, member := range members {
|
for _, member := range members {
|
||||||
PushCommand(member.AccountID, nex.WebSocketPackage{
|
if member.ID != call.Founder.ID {
|
||||||
Action: "calls.end",
|
pendingUsers = append(pendingUsers, uint64(member.AccountID))
|
||||||
Payload: call,
|
}
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PushCommandBatch(pendingUsers, nex.WebSocketPackage{
|
||||||
|
Action: "calls.end",
|
||||||
|
Payload: call,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return call, nil
|
return call, nil
|
||||||
|
@ -297,6 +297,8 @@ func EditChannel(channel models.Channel) (models.Channel, error) {
|
|||||||
|
|
||||||
func DeleteChannel(channel models.Channel) error {
|
func DeleteChannel(channel models.Channel) error {
|
||||||
if err := database.C.Delete(&channel).Error; err == nil {
|
if err := database.C.Delete(&channel).Error; err == nil {
|
||||||
|
UnsubscribeAllWithChannels(channel.ID)
|
||||||
|
|
||||||
database.C.Where("channel_id = ?", channel.ID).Delete(&models.Event{})
|
database.C.Where("channel_id = ?", channel.ID).Delete(&models.Event{})
|
||||||
|
|
||||||
cacheManager := cache.New[any](localCache.S)
|
cacheManager := cache.New[any](localCache.S)
|
||||||
|
@ -15,6 +15,7 @@ import (
|
|||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
|
"github.com/spf13/viper"
|
||||||
)
|
)
|
||||||
|
|
||||||
func CountEvent(channel models.Channel) int64 {
|
func CountEvent(channel models.Channel) int64 {
|
||||||
@ -89,7 +90,13 @@ func NewEvent(event models.Event) (models.Event, error) {
|
|||||||
log.Error().Err(err).Msg("Failed to fetch event, the notifying of new event was terminated...")
|
log.Error().Err(err).Msg("Failed to fetch event, the notifying of new event was terminated...")
|
||||||
return event, err
|
return event, err
|
||||||
}
|
}
|
||||||
idxList := lo.Map(members, func(item models.ChannelMember, index int) uint64 {
|
idxList := lo.Map(lo.Filter(members, func(item models.ChannelMember, index int) bool {
|
||||||
|
if !viper.GetBool("performance.passive_user_optimize") {
|
||||||
|
// Leave this for backward compatibility
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return CheckSubscribed(item.AccountID, event.ChannelID)
|
||||||
|
}), func(item models.ChannelMember, index int) uint64 {
|
||||||
return uint64(item.AccountID)
|
return uint64(item.AccountID)
|
||||||
})
|
})
|
||||||
_ = PushCommandBatch(idxList, nex.WebSocketPackage{
|
_ = PushCommandBatch(idxList, nex.WebSocketPackage{
|
||||||
@ -120,6 +127,9 @@ func NotifyMessageEvent(members []models.ChannelMember, event models.Event) {
|
|||||||
var mentionedUsers []uint64
|
var mentionedUsers []uint64
|
||||||
|
|
||||||
for _, member := range members {
|
for _, member := range members {
|
||||||
|
if CheckSubscribed(member.AccountID, event.ChannelID) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if member.ID != event.SenderID {
|
if member.ID != event.SenderID {
|
||||||
switch member.Notify {
|
switch member.Notify {
|
||||||
case models.NotifyLevelNone:
|
case models.NotifyLevelNone:
|
||||||
|
@ -3,15 +3,16 @@ package services
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
localCache "git.solsynth.dev/hypernet/messaging/pkg/internal/cache"
|
localCache "git.solsynth.dev/hypernet/messaging/pkg/internal/cache"
|
||||||
"git.solsynth.dev/hypernet/messaging/pkg/internal/database"
|
"git.solsynth.dev/hypernet/messaging/pkg/internal/database"
|
||||||
"git.solsynth.dev/hypernet/messaging/pkg/internal/gap"
|
|
||||||
"git.solsynth.dev/hypernet/messaging/pkg/internal/models"
|
"git.solsynth.dev/hypernet/messaging/pkg/internal/models"
|
||||||
"git.solsynth.dev/hypernet/nexus/pkg/nex"
|
"git.solsynth.dev/hypernet/nexus/pkg/nex"
|
||||||
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
|
||||||
"github.com/eko/gocache/lib/v4/cache"
|
"github.com/eko/gocache/lib/v4/cache"
|
||||||
"github.com/eko/gocache/lib/v4/marshaler"
|
"github.com/eko/gocache/lib/v4/marshaler"
|
||||||
"github.com/eko/gocache/lib/v4/store"
|
"github.com/eko/gocache/lib/v4/store"
|
||||||
|
"github.com/samber/lo"
|
||||||
|
"github.com/spf13/viper"
|
||||||
)
|
)
|
||||||
|
|
||||||
type statusQueryCacheEntry struct {
|
type statusQueryCacheEntry struct {
|
||||||
@ -76,14 +77,18 @@ func SetTypingStatus(channelId uint, userId uint) error {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
sc := proto.NewStreamServiceClient(gap.Nx.GetNexusGrpcConn())
|
broadcastTarget = lo.Filter(broadcastTarget, func(item uint64, index int) bool {
|
||||||
_, err := sc.PushStreamBatch(context.Background(), &proto.PushStreamBatchRequest{
|
if !viper.GetBool("performance.passive_user_optimize") {
|
||||||
UserId: broadcastTarget,
|
// Leave this for backward compatibility
|
||||||
Body: nex.WebSocketPackage{
|
return true
|
||||||
Action: "status.typing",
|
}
|
||||||
Payload: data,
|
return CheckSubscribed(uint(item), channelId)
|
||||||
}.Marshal(),
|
|
||||||
})
|
})
|
||||||
|
|
||||||
return err
|
PushCommandBatch(broadcastTarget, nex.WebSocketPackage{
|
||||||
|
Action: "status.typing",
|
||||||
|
Payload: data,
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
51
pkg/internal/services/subscribe.go
Normal file
51
pkg/internal/services/subscribe.go
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package services
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
// ChannelID -> UserID
|
||||||
|
var subscribeInfo = make(map[uint]map[uint]bool)
|
||||||
|
var subscribeLock sync.Mutex
|
||||||
|
|
||||||
|
// If user subscribed to a channel
|
||||||
|
// Push the new message to them via websocket
|
||||||
|
// And skip the notification
|
||||||
|
|
||||||
|
func CheckSubscribed(UserID uint, ChannelID uint) bool {
|
||||||
|
if _, ok := subscribeInfo[ChannelID]; ok {
|
||||||
|
if _, ok := subscribeInfo[ChannelID][UserID]; ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func SubscribeChannel(userId uint, channelId uint) {
|
||||||
|
subscribeLock.Lock()
|
||||||
|
defer subscribeLock.Unlock()
|
||||||
|
if _, ok := subscribeInfo[channelId]; !ok {
|
||||||
|
subscribeInfo[channelId] = make(map[uint]bool)
|
||||||
|
}
|
||||||
|
subscribeInfo[channelId][userId] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func UnsubscribeChannel(userId uint, channelId uint) {
|
||||||
|
subscribeLock.Lock()
|
||||||
|
defer subscribeLock.Unlock()
|
||||||
|
if _, ok := subscribeInfo[channelId]; ok {
|
||||||
|
delete(subscribeInfo[channelId], userId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func UnsubscribeAll(userId uint) {
|
||||||
|
subscribeLock.Lock()
|
||||||
|
defer subscribeLock.Unlock()
|
||||||
|
for _, v := range subscribeInfo {
|
||||||
|
delete(v, userId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func UnsubscribeAllWithChannels(channelId uint) {
|
||||||
|
subscribeLock.Lock()
|
||||||
|
defer subscribeLock.Unlock()
|
||||||
|
delete(subscribeInfo, channelId)
|
||||||
|
}
|
@ -5,6 +5,9 @@ grpc_bind = "0.0.0.0:7447"
|
|||||||
|
|
||||||
nexus_addr = "localhost:7001"
|
nexus_addr = "localhost:7001"
|
||||||
|
|
||||||
|
[performance]
|
||||||
|
passive_user_optimize = true
|
||||||
|
|
||||||
[debug]
|
[debug]
|
||||||
database = false
|
database = false
|
||||||
print_routes = false
|
print_routes = false
|
||||||
|
Loading…
x
Reference in New Issue
Block a user