Use batch notify to speed up the creation of event

This commit is contained in:
2024-07-17 14:10:04 +08:00
parent 2486b317e3
commit 096565f4e0
5 changed files with 102 additions and 76 deletions

View File

@ -3,11 +3,12 @@ package services
import (
"context"
"fmt"
"time"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/database"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/gap"
jsoniter "github.com/json-iterator/go"
"time"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/models"
@ -57,7 +58,30 @@ func NotifyAccountMessager(user models.Account, title, body string, subtitle *st
return err
}
_, err = proto.NewNotifierClient(pc).NotifyUser(ctx, &proto.NotifyUserRequest{
UserId: uint64(user.ID),
UserId: uint64(user.ExternalID),
Notify: &proto.NotifyRequest{
Topic: "messaging.message",
Title: title,
Subtitle: subtitle,
Body: body,
IsRealtime: realtime,
IsForcePush: forcePush,
},
})
return err
}
func NotifyAccountMessagerBatch(users []uint64, title, body string, subtitle *string, realtime bool, forcePush bool) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
pc, err := gap.H.GetServiceGrpcConn(hyper.ServiceTypeAuthProvider)
if err != nil {
return err
}
_, err = proto.NewNotifierClient(pc).NotifyUserBatch(ctx, &proto.NotifyUserBatchRequest{
UserId: users,
Notify: &proto.NotifyRequest{
Topic: "messaging.message",
Title: title,

View File

@ -84,12 +84,13 @@ func NewEvent(event models.Event) (models.Event, error) {
channel := event.Channel
event, _ = GetEvent(event.Channel, event.ID)
for _, member := range members {
PushCommand(member.AccountID, models.UnifiedCommand{
Action: "events.new",
Payload: event,
})
}
idxList := lo.Map(members, func(item models.ChannelMember, index int) uint64 {
return uint64(item.AccountID)
})
PushCommandBatch(idxList, models.UnifiedCommand{
Action: "events.new",
Payload: event,
})
if strings.HasPrefix(event.Type, "messages") {
event.Channel = channel
@ -104,6 +105,8 @@ func NotifyMessageEvent(members []models.ChannelMember, event models.Event) {
raw, _ := jsoniter.Marshal(event.Body)
_ = jsoniter.Unmarshal(raw, &body)
var pendingIdx []uint64
for _, member := range members {
if member.ID != event.SenderID {
switch member.Notify {
@ -117,36 +120,39 @@ func NotifyMessageEvent(members []models.ChannelMember, event models.Event) {
break
}
var displayText string
if body.Algorithm == "plain" {
displayText = body.Text
}
if len(displayText) == 0 {
displayText = fmt.Sprintf("%d attachment(s)", len(body.Attachments))
}
var channelDisplay string
if event.Channel.Type == models.ChannelTypeDirect {
channelDisplay = "DM"
}
if len(channelDisplay) == 0 {
channelDisplay = fmt.Sprintf("#%s", event.Channel.Alias)
}
err := NotifyAccountMessager(member.Account,
fmt.Sprintf("%s in %s", event.Sender.Account.Nick, channelDisplay),
fmt.Sprintf("%s", displayText),
nil,
true,
false,
)
if err != nil {
log.Warn().Err(err).Msg("An error occurred when trying notify user.")
}
pendingIdx = append(pendingIdx, uint64(member.AccountID))
}
}
var displayText string
if body.Algorithm == "plain" {
displayText = body.Text
}
if len(displayText) == 0 {
displayText = fmt.Sprintf("%d attachment(s)", len(body.Attachments))
}
var channelDisplay string
if event.Channel.Type == models.ChannelTypeDirect {
channelDisplay = "DM"
}
if len(channelDisplay) == 0 {
channelDisplay = fmt.Sprintf("#%s", event.Channel.Alias)
}
err := NotifyAccountMessagerBatch(
pendingIdx,
fmt.Sprintf("%s in %s", event.Sender.Account.Nick, channelDisplay),
displayText,
nil,
true,
false,
)
if err != nil {
log.Warn().Err(err).Msg("An error occurred when trying notify user.")
}
}
func EditEvent(event models.Event) (models.Event, error) {

View File

@ -2,10 +2,11 @@ package services
import (
"context"
"time"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/gap"
"git.solsynth.dev/hydrogen/messaging/pkg/internal/models"
"time"
)
func PushCommand(userId uint, task models.UnifiedCommand) {
@ -18,3 +19,14 @@ func PushCommand(userId uint, task models.UnifiedCommand) {
Body: task.Marshal(),
})
}
func PushCommandBatch(userId []uint64, task models.UnifiedCommand) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
pc := gap.H.GetDealerGrpcConn()
_, _ = proto.NewStreamControllerClient(pc).PushStreamBatch(ctx, &proto.PushStreamBatchRequest{
UserId: userId,
Body: task.Marshal(),
})
}