Messaging/pkg/internal/services/status.go

90 lines
2.4 KiB
Go
Raw Permalink Normal View History

2024-08-23 11:32:24 +00:00
package services
import (
"context"
"fmt"
2024-11-02 05:24:37 +00:00
localCache "git.solsynth.dev/hypernet/messaging/pkg/internal/cache"
"git.solsynth.dev/hypernet/messaging/pkg/internal/database"
"git.solsynth.dev/hypernet/messaging/pkg/internal/gap"
"git.solsynth.dev/hypernet/messaging/pkg/internal/models"
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"git.solsynth.dev/hypernet/nexus/pkg/proto"
"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/marshaler"
"github.com/eko/gocache/lib/v4/store"
2024-08-23 11:32:24 +00:00
)
type statusQueryCacheEntry struct {
2024-08-23 14:26:54 +00:00
Target []uint64
Data any
}
func GetTypingStatusQueryCacheKey(channelId uint, userId uint) string {
return fmt.Sprintf("typing-status-query#%d;%d", channelId, userId)
}
2024-08-23 11:32:24 +00:00
func SetTypingStatus(channelId uint, userId uint) error {
2024-08-23 14:26:54 +00:00
var broadcastTarget []uint64
var data any
cacheManager := cache.New[any](localCache.S)
marshal := marshaler.New(cacheManager)
contx := context.Background()
2024-08-23 14:26:54 +00:00
hitCache := false
if val, err := marshal.Get(contx, GetTypingStatusQueryCacheKey(channelId, userId), new(statusQueryCacheEntry)); err == nil {
2024-10-09 15:56:24 +00:00
entry := val.(*statusQueryCacheEntry)
broadcastTarget = entry.Target
data = entry.Data
hitCache = true
2024-08-23 14:26:54 +00:00
}
if !hitCache {
var member models.ChannelMember
if err := database.C.
2024-11-02 05:23:27 +00:00
Where("account_id = ? AND channel_id = ?", userId, channelId).
First(&member).Error; err != nil {
return fmt.Errorf("channel member not found: %v", err)
}
2024-08-23 14:26:54 +00:00
var channel models.Channel
if err := database.C.
Preload("Members").
Where("id = ?", channelId).
First(&channel).Error; err != nil {
return fmt.Errorf("channel not found: %v", err)
}
2024-08-23 14:26:54 +00:00
for _, item := range channel.Members {
2024-11-02 05:23:27 +00:00
broadcastTarget = append(broadcastTarget, uint64(item.AccountID))
2024-08-23 14:26:54 +00:00
}
data = map[string]any{
"user_id": userId,
"member_id": member.ID,
"channel_id": channelId,
"member": member,
"channel": channel,
}
// Cache queries
2024-11-02 05:23:27 +00:00
_ = marshal.Set(
contx,
GetTypingStatusQueryCacheKey(channelId, userId),
statusQueryCacheEntry{broadcastTarget, data},
store.WithTags([]string{"typing-status-query", fmt.Sprintf("channel#%d", channelId), fmt.Sprintf("user#%d", userId)}),
)
2024-08-23 11:32:24 +00:00
}
sc := proto.NewStreamServiceClient(gap.Nx.GetNexusGrpcConn())
2024-08-23 11:32:24 +00:00
_, err := sc.PushStreamBatch(context.Background(), &proto.PushStreamBatchRequest{
2024-08-23 13:01:47 +00:00
UserId: broadcastTarget,
Body: nex.WebSocketPackage{
2024-08-23 14:26:54 +00:00
Action: "status.typing",
Payload: data,
2024-08-23 11:32:24 +00:00
}.Marshal(),
})
return err
}