Push stream able to get which request success

⬆️ Upgrade protobuf version
This commit is contained in:
2025-02-02 15:52:00 +08:00
parent 6de240179f
commit 0f350d00a8
17 changed files with 510 additions and 992 deletions

View File

@ -3,6 +3,7 @@ package grpc
import (
"context"
"fmt"
"git.solsynth.dev/hypernet/nexus/pkg/internal/http/ws"
"github.com/rs/zerolog/log"
@ -19,16 +20,17 @@ func (v *Server) CountStreamConnection(ctx context.Context, request *proto.Count
func (v *Server) PushStream(ctx context.Context, request *proto.PushStreamRequest) (*proto.PushStreamResponse, error) {
var cnt int
var success int
var successes []uint64
var errs []error
if request.UserId != nil {
cnt, success, errs = ws.WebsocketPush(uint(request.GetUserId()), request.GetBody())
cnt, successes, errs = ws.WebsocketPush(uint(request.GetUserId()), request.GetBody())
} else if request.ClientId != nil {
cnt, success, errs = ws.WebsocketPushDirect(request.GetClientId(), request.GetBody())
cnt, successes, errs = ws.WebsocketPushDirect(request.GetClientId(), request.GetBody())
} else {
return nil, fmt.Errorf("you must give one of the user id or client id")
}
success := len(successes)
log.Debug().
Uint64("client_id", request.GetClientId()).
Uint64("user_id", request.GetUserId()).
@ -43,6 +45,7 @@ func (v *Server) PushStream(ctx context.Context, request *proto.PushStreamReques
IsAllSuccess: false,
AffectedCount: int64(success),
FailedCount: int64(cnt - success),
SuccessList: successes,
}, nil
} else if cnt > 0 && success == 0 {
// All fail
@ -58,10 +61,10 @@ func (v *Server) PushStream(ctx context.Context, request *proto.PushStreamReques
func (v *Server) PushStreamBatch(ctx context.Context, request *proto.PushStreamBatchRequest) (*proto.PushStreamResponse, error) {
var cnt int
var success int
var successes []uint64
var errs []error
if len(request.UserId) != 0 {
cnt, success, errs = ws.WebsocketPushBatch(
cnt, successes, errs = ws.WebsocketPushBatch(
lo.Map(request.GetUserId(), func(item uint64, idx int) uint {
return uint(item)
},
@ -71,10 +74,11 @@ func (v *Server) PushStreamBatch(ctx context.Context, request *proto.PushStreamB
if len(request.ClientId) != 0 {
cCnt, cSuccess, cErrs := ws.WebsocketPushBatchDirect(request.GetClientId(), request.GetBody())
cnt += cCnt
success += cSuccess
successes = append(successes, cSuccess...)
errs = append(errs, cErrs...)
}
success := len(successes)
log.Debug().
Any("client_id", request.GetClientId()).
Any("user_id", request.GetUserId()).
@ -99,5 +103,6 @@ func (v *Server) PushStreamBatch(ctx context.Context, request *proto.PushStreamB
IsAllSuccess: true,
AffectedCount: int64(success),
FailedCount: int64(cnt - success),
SuccessList: successes,
}, nil
}

View File

@ -1,11 +1,12 @@
package ws
import (
"math/rand"
"sync"
"git.solsynth.dev/hypernet/nexus/pkg/internal/directory"
"git.solsynth.dev/hypernet/nexus/pkg/nex/sec"
"github.com/rs/zerolog/log"
"math/rand"
"sync"
"github.com/gofiber/contrib/websocket"
)
@ -60,25 +61,25 @@ func ClientCount(uid uint) int {
return len(wsConn[uid])
}
func WebsocketPush(uid uint, body []byte) (count int, success int, errs []error) {
func WebsocketPush(uid uint, body []byte) (count int, successes []uint64, errs []error) {
for _, conn := range wsConn[uid] {
if err := conn.WriteMessage(1, body); err != nil {
errs = append(errs, err)
} else {
success++
successes = append(successes, uint64(uid))
}
count++
}
return
}
func WebsocketPushDirect(clientId uint64, body []byte) (count int, success int, errs []error) {
func WebsocketPushDirect(clientId uint64, body []byte) (count int, successes []uint64, errs []error) {
for _, m := range wsConn {
if conn, ok := m[clientId]; ok {
if err := conn.WriteMessage(1, body); err != nil {
errs = append(errs, err)
} else {
success++
successes = append(successes, clientId)
}
count++
}
@ -86,13 +87,13 @@ func WebsocketPushDirect(clientId uint64, body []byte) (count int, success int,
return
}
func WebsocketPushBatch(uidList []uint, body []byte) (count int, success int, errs []error) {
func WebsocketPushBatch(uidList []uint, body []byte) (count int, successes []uint64, errs []error) {
for _, uid := range uidList {
for _, conn := range wsConn[uid] {
if err := conn.WriteMessage(1, body); err != nil {
errs = append(errs, err)
} else {
success++
successes = append(successes, uint64(uid))
}
count++
}
@ -100,14 +101,14 @@ func WebsocketPushBatch(uidList []uint, body []byte) (count int, success int, er
return
}
func WebsocketPushBatchDirect(clientIdList []uint64, body []byte) (count int, success int, errs []error) {
func WebsocketPushBatchDirect(clientIdList []uint64, body []byte) (count int, successes []uint64, errs []error) {
for _, clientId := range clientIdList {
for _, m := range wsConn {
if conn, ok := m[clientId]; ok {
if err := conn.WriteMessage(1, body); err != nil {
errs = append(errs, err)
} else {
success++
successes = append(successes, clientId)
}
count++
}