diff --git a/pkg/internal/grpc/services.go b/pkg/internal/grpc/services.go index 7ba9add..8b02ae9 100644 --- a/pkg/internal/grpc/services.go +++ b/pkg/internal/grpc/services.go @@ -6,12 +6,25 @@ import ( "git.solsynth.dev/hypernet/nexus/pkg/nex" "git.solsynth.dev/hypernet/nexus/pkg/proto" jsoniter "github.com/json-iterator/go" + "github.com/rs/zerolog/log" "git.solsynth.dev/hypernet/messaging/pkg/internal/database" + "git.solsynth.dev/hypernet/messaging/pkg/internal/services" ) func (v *Server) BroadcastEvent(ctx context.Context, in *proto.EventInfo) (*proto.EventResponse, error) { + log.Debug().Str("event", in.GetEvent()). + Msg("Got a broadcasting event...") + switch in.GetEvent() { + // Clear the subscribed channel + case "ws.client.unregister": + // Update user last seen at + data := nex.DecodeMap(in.GetData()) + id := data["id"].(string) + services.UnsubscribeAllWithClient(id) + log.Info().Str("client", id).Msg("Client unregistered, cleaning up subscribed channels...") + // Account recycle case "deletion": data := nex.DecodeMap(in.GetData()) resType, ok := data["type"].(string) diff --git a/pkg/internal/grpc/stream.go b/pkg/internal/grpc/stream.go index ecf8db7..bce3bc6 100644 --- a/pkg/internal/grpc/stream.go +++ b/pkg/internal/grpc/stream.go @@ -76,7 +76,7 @@ func (v *Server) PushStream(_ context.Context, request *proto.PushStreamRequest) action := strings.Split(in.Action, ".")[1] switch action { case "subscribe": - services.SubscribeChannel(uint(request.GetUserId()), data.ChannelID) + services.SubscribeChannel(uint(request.GetUserId()), data.ChannelID, request.GetClientId()) case "unsubscribe": services.UnsubscribeChannel(uint(request.GetUserId()), data.ChannelID) case "unsubscribeAll": diff --git a/pkg/internal/services/subscribe.go b/pkg/internal/services/subscribe.go index 089aa35..bb80a34 100644 --- a/pkg/internal/services/subscribe.go +++ b/pkg/internal/services/subscribe.go @@ -2,8 +2,8 @@ package services import "sync" -// ChannelID -> UserID -var subscribeInfo = make(map[uint]map[uint]bool) +// ChannelID -> UserID -> Client ID +var subscribeInfo = make(map[uint]map[uint]string) var subscribeLock sync.Mutex // If user subscribed to a channel @@ -19,13 +19,13 @@ func CheckSubscribed(UserID uint, ChannelID uint) bool { return false } -func SubscribeChannel(userId uint, channelId uint) { +func SubscribeChannel(userId uint, channelId uint, clientId string) { subscribeLock.Lock() defer subscribeLock.Unlock() if _, ok := subscribeInfo[channelId]; !ok { - subscribeInfo[channelId] = make(map[uint]bool) + subscribeInfo[channelId] = make(map[uint]string) } - subscribeInfo[channelId][userId] = true + subscribeInfo[channelId][userId] = clientId } func UnsubscribeChannel(userId uint, channelId uint) { @@ -49,3 +49,15 @@ func UnsubscribeAllWithChannels(channelId uint) { defer subscribeLock.Unlock() delete(subscribeInfo, channelId) } + +func UnsubscribeAllWithClient(clientId string) { + subscribeLock.Lock() + defer subscribeLock.Unlock() + for _, v := range subscribeInfo { + for k, item := range v { + if item == clientId { + delete(v, k) + } + } + } +}