diff --git a/go.mod b/go.mod index d12be2b..54a04d9 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module git.solsynth.dev/hypernet/messaging go 1.23.2 require ( - git.solsynth.dev/hypernet/nexus v0.0.0-20241123050605-25ab1371739b + git.solsynth.dev/hypernet/nexus v0.0.0-20250202075200-0f350d00a876 git.solsynth.dev/hypernet/passport v0.0.0-20250201110034-ec0048042a84 git.solsynth.dev/hypernet/pusher v0.0.0-20241228030233-50ff8304e465 github.com/dgraph-io/ristretto v0.2.0 diff --git a/go.sum b/go.sum index 2e13e89..ef2406f 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= git.solsynth.dev/hypernet/nexus v0.0.0-20241123050605-25ab1371739b h1:8yB9kMwEMY/nIbmDDxrhH5sTypgmK5PIIiIfP5QXx4s= git.solsynth.dev/hypernet/nexus v0.0.0-20241123050605-25ab1371739b/go.mod h1:PhLCv2lsNoscPVJbkWnxwQnJ141lc4RIEkVffrHwl4s= +git.solsynth.dev/hypernet/nexus v0.0.0-20250202075200-0f350d00a876 h1:+uhz0RSIqMoYipVg/M1Lch7i0ZB/+vf34XBPe4ugDyY= +git.solsynth.dev/hypernet/nexus v0.0.0-20250202075200-0f350d00a876/go.mod h1:v+rpf1ZDRi8moaThTAkj5DMQU+rw96YTHcN8/7n/p2Y= git.solsynth.dev/hypernet/passport v0.0.0-20250201110034-ec0048042a84 h1:5nJGgRjePhqHW42Ym5vXJj1jxfImR/fzo7RfOSJ2kpw= git.solsynth.dev/hypernet/passport v0.0.0-20250201110034-ec0048042a84/go.mod h1:WIx0+N0BvbsbY/lYxuC2rq6v4EIasoTwg5cnC+ExMeI= git.solsynth.dev/hypernet/pusher v0.0.0-20241228030233-50ff8304e465 h1:KFtv9lF0JMUGsq1uHwQvop8PTyqdiLuUQuRrd5WmzPk= diff --git a/pkg/internal/services/events.go b/pkg/internal/services/events.go index 441f80b..881c0b8 100644 --- a/pkg/internal/services/events.go +++ b/pkg/internal/services/events.go @@ -92,10 +92,13 @@ func NewEvent(event models.Event) (models.Event, error) { idxList := lo.Map(members, func(item models.ChannelMember, index int) uint64 { return uint64(item.AccountID) }) - PushCommandBatch(idxList, nex.WebSocketPackage{ + successList64 := PushCommandBatch(idxList, nex.WebSocketPackage{ Action: "events.new", Payload: event, }) + successList := lo.Map(successList64, func(item uint64, index int) uint { + return uint(item) + }) if strings.HasPrefix(event.Type, "messages") { event.Channel, _ = GetChannel(event.ChannelID) @@ -105,7 +108,9 @@ func NewEvent(event models.Event) (models.Event, error) { event.Channel.Realm = &realm } } - go NotifyMessageEvent(members, event) + go NotifyMessageEvent(lo.Filter(members, func(item models.ChannelMember, _ int) bool { + return !lo.Contains(successList, item.AccountID) + }), event) } return event, nil diff --git a/pkg/internal/services/websocket.go b/pkg/internal/services/websocket.go index 2f6fd01..f2a0370 100644 --- a/pkg/internal/services/websocket.go +++ b/pkg/internal/services/websocket.go @@ -27,12 +27,12 @@ func PushCommand(userId uint, task nex.WebSocketPackage) { } } -func PushCommandBatch(userId []uint64, task nex.WebSocketPackage) { +func PushCommandBatch(userId []uint64, task nex.WebSocketPackage) []uint64 { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() pc := gap.Nx.GetNexusGrpcConn() - _, err := proto.NewStreamServiceClient(pc).PushStreamBatch(ctx, &proto.PushStreamBatchRequest{ + resp, err := proto.NewStreamServiceClient(pc).PushStreamBatch(ctx, &proto.PushStreamBatchRequest{ UserId: userId, Body: task.Marshal(), }) @@ -40,4 +40,6 @@ func PushCommandBatch(userId []uint64, task nex.WebSocketPackage) { if err != nil { log.Warn().Err(err).Msg("Failed to push websocket command to nexus in batches...") } + + return resp.GetSuccessList() }