Batch push websocket to improve performance

This commit is contained in:
2024-07-17 11:58:51 +08:00
parent d97837dab6
commit 96b96912ed
11 changed files with 200 additions and 230 deletions

View File

@ -1,38 +0,0 @@
package grpc
import (
"context"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"google.golang.org/grpc"
)
func (v *Server) Authenticate(ctx context.Context, request *proto.AuthRequest) (*proto.AuthReply, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.AuthReply, error) {
out, err := proto.NewAuthClient(conn).Authenticate(ctx, request)
return out, err
},
)
}
func (v *Server) EnsurePermGranted(ctx context.Context, request *proto.CheckPermRequest) (*proto.CheckPermResponse, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.CheckPermResponse, error) {
out, err := proto.NewAuthClient(conn).EnsurePermGranted(ctx, request)
return out, err
},
)
}
func (v *Server) EnsureUserPermGranted(ctx context.Context, request *proto.CheckUserPermRequest) (*proto.CheckUserPermResponse, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.CheckUserPermResponse, error) {
out, err := proto.NewAuthClient(conn).EnsureUserPermGranted(ctx, request)
return out, err
},
)
}

View File

@ -1,28 +0,0 @@
package grpc
import (
"context"
"fmt"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/internal/directory"
"google.golang.org/grpc"
"time"
)
func forwardInvokeRequest[T any](serviceType string, executor func(context.Context, *grpc.ClientConn) (T, error)) (T, error) {
var emptyResult T
instance := directory.GetServiceInstance(serviceType)
if instance == nil {
return emptyResult, fmt.Errorf("no available service %s found", hyper.ServiceTypeAuthProvider)
}
conn, err := instance.GetGrpcConn()
if err != nil {
return emptyResult, fmt.Errorf("service is down: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
return executor(ctx, conn)
}

View File

@ -1,28 +0,0 @@
package grpc
import (
"context"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"google.golang.org/grpc"
)
func (v *Server) NotifyUser(ctx context.Context, request *proto.NotifyUserRequest) (*proto.NotifyResponse, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.NotifyResponse, error) {
out, err := proto.NewNotifierClient(conn).NotifyUser(ctx, request)
return out, err
},
)
}
func (v *Server) NotifyAllUser(ctx context.Context, request *proto.NotifyRequest) (*proto.NotifyResponse, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.NotifyResponse, error) {
out, err := proto.NewNotifierClient(conn).NotifyAllUser(ctx, request)
return out, err
},
)
}

View File

@ -1,78 +0,0 @@
package grpc
import (
"context"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"google.golang.org/grpc"
)
func (v *Server) ListCommunityRealm(ctx context.Context, request *proto.ListRealmRequest) (*proto.ListRealmResponse, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.ListRealmResponse, error) {
out, err := proto.NewRealmClient(conn).ListCommunityRealm(ctx, request)
return out, err
},
)
}
func (v *Server) ListAvailableRealm(ctx context.Context, request *proto.LookupUserRealmRequest) (*proto.ListRealmResponse, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.ListRealmResponse, error) {
out, err := proto.NewRealmClient(conn).ListAvailableRealm(ctx, request)
return out, err
},
)
}
func (v *Server) ListOwnedRealm(ctx context.Context, request *proto.LookupUserRealmRequest) (*proto.ListRealmResponse, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.ListRealmResponse, error) {
out, err := proto.NewRealmClient(conn).ListOwnedRealm(ctx, request)
return out, err
},
)
}
func (v *Server) GetRealm(ctx context.Context, request *proto.LookupRealmRequest) (*proto.RealmInfo, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.RealmInfo, error) {
out, err := proto.NewRealmClient(conn).GetRealm(ctx, request)
return out, err
},
)
}
func (v *Server) ListRealmMember(ctx context.Context, request *proto.RealmMemberLookupRequest) (*proto.ListRealmMemberResponse, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.ListRealmMemberResponse, error) {
out, err := proto.NewRealmClient(conn).ListRealmMember(ctx, request)
return out, err
},
)
}
func (v *Server) GetRealmMember(ctx context.Context, request *proto.RealmMemberLookupRequest) (*proto.RealmMemberInfo, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.RealmMemberInfo, error) {
out, err := proto.NewRealmClient(conn).GetRealmMember(ctx, request)
return out, err
},
)
}
func (v *Server) CheckRealmMemberPerm(ctx context.Context, request *proto.CheckRealmPermRequest) (*proto.CheckRealmPermResponse, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.CheckRealmPermResponse, error) {
out, err := proto.NewRealmClient(conn).CheckRealmMemberPerm(ctx, request)
return out, err
},
)
}

View File

@ -1,18 +0,0 @@
package grpc
import (
"context"
"git.solsynth.dev/hydrogen/dealer/pkg/hyper"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"google.golang.org/grpc"
)
func (v *Server) RecordEvent(ctx context.Context, request *proto.RecordEventRequest) (*proto.RecordEventResponse, error) {
return forwardInvokeRequest(
hyper.ServiceTypeAuthProvider,
func(ctx context.Context, conn *grpc.ClientConn) (*proto.RecordEventResponse, error) {
out, err := proto.NewEventRecorderClient(conn).RecordEvent(ctx, request)
return out, err
},
)
}

View File

@ -16,9 +16,6 @@ import (
type Server struct {
proto.UnimplementedServiceDirectoryServer
proto.UnimplementedStreamControllerServer
proto.UnimplementedEventRecorderServer
proto.UnimplementedNotifierServer
proto.UnimplementedRealmServer
proto.UnimplementedAuthServer
srv *grpc.Server
@ -31,9 +28,6 @@ func NewServer() *Server {
proto.RegisterServiceDirectoryServer(server.srv, server)
proto.RegisterStreamControllerServer(server.srv, server)
proto.RegisterEventRecorderServer(server.srv, server)
proto.RegisterNotifierServer(server.srv, server)
proto.RegisterRealmServer(server.srv, server)
proto.RegisterAuthServer(server.srv, server)
health.RegisterHealthServer(server.srv, server)

View File

@ -3,8 +3,10 @@ package grpc
import (
"context"
"fmt"
"git.solsynth.dev/hydrogen/dealer/pkg/internal/services"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"github.com/samber/lo"
)
func (v *Server) CountStreamConnection(ctx context.Context, request *proto.CountConnectionRequest) (*proto.CountConnectionResponse, error) {
@ -34,3 +36,29 @@ func (v *Server) PushStream(ctx context.Context, request *proto.PushStreamReques
FailedCount: int64(cnt - success),
}, nil
}
func (v *Server) PushStreamBatch(ctx context.Context, request *proto.PushStreamBatchRequest) (*proto.PushStreamResponse, error) {
cnt, success, errs := services.WebsocketPushBatch(
lo.Map(request.GetUserId(), func(item uint64, idx int) uint {
return uint(item)
},
), request.GetBody(),
)
if len(errs) > 0 {
// Partial fail
return &proto.PushStreamResponse{
IsAllSuccess: false,
AffectedCount: int64(success),
FailedCount: int64(cnt - success),
}, nil
} else if cnt > 0 && success == 0 {
// All fail
return nil, fmt.Errorf("all push request failed: %v", errs)
}
return &proto.PushStreamResponse{
IsAllSuccess: true,
AffectedCount: int64(success),
FailedCount: int64(cnt - success),
}, nil
}

View File

@ -1,9 +1,10 @@
package services
import (
"sync"
"git.solsynth.dev/hydrogen/dealer/pkg/internal/models"
"github.com/gofiber/contrib/websocket"
"sync"
)
var (
@ -44,3 +45,17 @@ func WebsocketPush(uid uint, body []byte) (count int, success int, errs []error)
}
return
}
func WebsocketPushBatch(uidList []uint, body []byte) (count int, success int, errs []error) {
for _, uid := range uidList {
for conn := range wsConn[uid] {
if err := conn.WriteMessage(1, body); err != nil {
errs = append(errs, err)
} else {
success++
}
count++
}
}
return
}