Messaging/pkg/internal/services/status.go

102 lines
2.5 KiB
Go
Raw Normal View History

2024-08-23 11:32:24 +00:00
package services
import (
"context"
"fmt"
"sync"
2024-08-23 11:32:24 +00:00
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/database"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/gap"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/models"
)
// statusQueryCacheEntry is one cached result of the lookups performed by
// SetTypingStatus for a single (channel, user) pair.
type statusQueryCacheEntry struct {
	// Target lists the user IDs the typing event is pushed to.
	Target []uint64
	// Data is the payload sent along with the typing event.
	Data any
}
// Package-level cache for typing status queries.
var (
	// statusQueryCacheLock is held while statusQueryCache is modified.
	statusQueryCacheLock sync.Mutex

	// statusQueryCache is keyed by channel ID first, then by user ID.
	statusQueryCache = map[uint]map[uint]statusQueryCacheEntry{}
)
2024-08-23 11:32:24 +00:00
func SetTypingStatus(channelId uint, userId uint) error {
2024-08-23 14:26:54 +00:00
var broadcastTarget []uint64
var data any
2024-08-23 14:26:54 +00:00
hitCache := false
if channelLevel, ok := statusQueryCache[channelId]; ok {
if entry, ok := channelLevel[userId]; ok {
2024-08-23 14:26:54 +00:00
broadcastTarget = entry.Target
data = entry.Data
hitCache = true
}
2024-08-23 14:26:54 +00:00
}
if !hitCache {
var account models.Account
if err := database.C.Where("external_id = ?", userId).First(&account).Error; err != nil {
return fmt.Errorf("account not found: %v", err)
}
2024-08-23 11:32:24 +00:00
2024-08-23 14:26:54 +00:00
var member models.ChannelMember
if err := database.C.
Where("account_id = ? AND channel_id = ?", account.ID, channelId).
First(&member).Error; err != nil {
return fmt.Errorf("channel member not found: %v", err)
} else {
member.Account = account
}
2024-08-23 14:26:54 +00:00
var channel models.Channel
if err := database.C.
Preload("Members").
Where("id = ?", channelId).
First(&channel).Error; err != nil {
return fmt.Errorf("channel not found: %v", err)
}
2024-08-23 14:26:54 +00:00
for _, item := range channel.Members {
if item.AccountID == member.AccountID {
continue
}
broadcastTarget = append(broadcastTarget, uint64(item.AccountID))
}
data = map[string]any{
"user_id": userId,
"member_id": member.ID,
"channel_id": channelId,
"member": member,
"channel": channel,
}
// Cache queries
statusQueryCacheLock.Lock()
if _, ok := statusQueryCache[channelId]; !ok {
statusQueryCache[channelId] = make(map[uint]statusQueryCacheEntry)
}
2024-08-23 14:26:54 +00:00
statusQueryCache[channelId][userId] = statusQueryCacheEntry{broadcastTarget, data}
statusQueryCacheLock.Unlock()
2024-08-23 11:32:24 +00:00
}
sc := proto.NewStreamControllerClient(gap.H.GetDealerGrpcConn())
_, err := sc.PushStreamBatch(context.Background(), &proto.PushStreamBatchRequest{
2024-08-23 13:01:47 +00:00
UserId: broadcastTarget,
2024-08-23 11:32:24 +00:00
Body: hyper.NetworkPackage{
2024-08-23 14:26:54 +00:00
Action: "status.typing",
Payload: data,
2024-08-23 11:32:24 +00:00
}.Marshal(),
})
2024-08-23 14:26:54 +00:00
if len(statusQueryCache) > 512 {
statusQueryCacheLock.Lock()
clear(statusQueryCache)
statusQueryCacheLock.Unlock()
}
2024-08-23 11:32:24 +00:00
return err
}