♻️ Better grpc map encoder / decoder
This commit is contained in:
22
pkg/directory/command.go
Normal file
22
pkg/directory/command.go
Normal file
@ -0,0 +1,22 @@
|
||||
package directory
|
||||
|
||||
const (
|
||||
CommandMethodGet = "get"
|
||||
CommandMethodPut = "put"
|
||||
CommandMethodPatch = "patch"
|
||||
CommandMethodPost = "post"
|
||||
CommandMethodDelete = "delete"
|
||||
)
|
||||
|
||||
type Command struct {
|
||||
// The unique identifier of the command, different method command can hold the same command id
|
||||
ID string `json:"id"`
|
||||
// The method of the command, such as get, post, others; inspired by RESTful design
|
||||
Method string `json:"method"`
|
||||
// The tags of the command will be used to invoke the pre-command middlewares and post-command middlewares
|
||||
Tags []string `json:"tags"`
|
||||
// The implementation of the command, the handler is the service that will be invoked
|
||||
Handler []*ServiceInstance `json:"handler"`
|
||||
|
||||
robinIndex uint
|
||||
}
|
62
pkg/directory/command_mapping.go
Normal file
62
pkg/directory/command_mapping.go
Normal file
@ -0,0 +1,62 @@
|
||||
package directory
|
||||
|
||||
import (
|
||||
"github.com/samber/lo"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// In commands, we use the map and the mutex because it is usually read and only sometimes write
|
||||
var commandDirectory = make(map[string]*Command)
|
||||
var commandDirectoryMutex sync.Mutex
|
||||
|
||||
func GetCommandKey(id, method string) string {
|
||||
return id + ":" + method
|
||||
}
|
||||
|
||||
func AddCommand(id, method string, tags []string, handler *ServiceInstance) {
|
||||
commandDirectoryMutex.Lock()
|
||||
defer commandDirectoryMutex.Unlock()
|
||||
|
||||
ky := GetCommandKey(id, method)
|
||||
if _, ok := commandDirectory[id]; !ok {
|
||||
commandDirectory[id] = &Command{
|
||||
ID: id,
|
||||
Method: method,
|
||||
Tags: tags,
|
||||
Handler: []*ServiceInstance{handler},
|
||||
}
|
||||
} else {
|
||||
commandDirectory[ky].Handler = append(commandDirectory[ky].Handler, handler)
|
||||
commandDirectory[ky].Tags = lo.Uniq(append(commandDirectory[ky].Tags, tags...))
|
||||
}
|
||||
|
||||
commandDirectory[ky].Handler = lo.UniqBy(commandDirectory[ky].Handler, func(item *ServiceInstance) string {
|
||||
return item.ID
|
||||
})
|
||||
}
|
||||
|
||||
func GetCommandHandler(id, method string) *ServiceInstance {
|
||||
commandDirectoryMutex.Lock()
|
||||
defer commandDirectoryMutex.Unlock()
|
||||
|
||||
ky := GetCommandKey(id, method)
|
||||
if val, ok := commandDirectory[ky]; ok {
|
||||
if len(val.Handler) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
idx := val.robinIndex % uint(len(val.Handler))
|
||||
val.robinIndex = idx + 1
|
||||
return val.Handler[idx]
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func RemoveCommand(id, method string) {
|
||||
commandDirectoryMutex.Lock()
|
||||
defer commandDirectoryMutex.Unlock()
|
||||
|
||||
ky := GetCommandKey(id, method)
|
||||
delete(commandDirectory, ky)
|
||||
}
|
89
pkg/directory/command_rpc.go
Normal file
89
pkg/directory/command_rpc.go
Normal file
@ -0,0 +1,89 @@
|
||||
package directory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
type CommandRpcServer struct {
|
||||
proto.UnimplementedCommandControllerServer
|
||||
}
|
||||
|
||||
func (c CommandRpcServer) AddCommand(ctx context.Context, info *proto.CommandInfo) (*proto.AddCommandResponse, error) {
|
||||
clientId, err := GetClientId(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
service := GetServiceInstanceByType(clientId)
|
||||
if service == nil {
|
||||
return nil, status.Errorf(codes.NotFound, "service not found")
|
||||
}
|
||||
|
||||
AddCommand(info.GetId(), info.GetMethod(), info.GetTags(), service)
|
||||
return &proto.AddCommandResponse{
|
||||
IsSuccess: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c CommandRpcServer) RemoveCommand(ctx context.Context, request *proto.CommandLookupRequest) (*proto.RemoveCommandResponse, error) {
|
||||
RemoveCommand(request.GetId(), request.GetMethod())
|
||||
return &proto.RemoveCommandResponse{
|
||||
IsSuccess: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c CommandRpcServer) SendCommand(ctx context.Context, argument *proto.CommandArgument) (*proto.CommandReturn, error) {
|
||||
id := argument.GetCommand()
|
||||
method := argument.GetMethod()
|
||||
|
||||
handler := GetCommandHandler(id, method)
|
||||
if handler == nil {
|
||||
return nil, status.Errorf(codes.NotFound, "command not found")
|
||||
}
|
||||
|
||||
conn, err := handler.GetGrpcConn()
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Unavailable, "service unavailable")
|
||||
}
|
||||
|
||||
contx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
return proto.NewCommandControllerClient(conn).SendCommand(contx, argument)
|
||||
}
|
||||
|
||||
func (c CommandRpcServer) SendStreamCommand(g grpc.BidiStreamingServer[proto.CommandArgument, proto.CommandReturn]) error {
|
||||
for {
|
||||
pck, err := g.Recv()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id := pck.GetCommand()
|
||||
method := pck.GetMethod()
|
||||
|
||||
handler := GetCommandHandler(id, method)
|
||||
if handler == nil {
|
||||
return status.Errorf(codes.NotFound, "command not found")
|
||||
}
|
||||
|
||||
conn, err := handler.GetGrpcConn()
|
||||
|
||||
contx, cancel := context.WithTimeout(context.Background(), time.Second*10)
|
||||
result, _ := proto.NewCommandControllerClient(conn).SendCommand(contx, pck)
|
||||
cancel()
|
||||
|
||||
_ = g.Send(&proto.CommandReturn{
|
||||
Status: result.Status,
|
||||
Payload: result.Payload,
|
||||
})
|
||||
}
|
||||
}
|
20
pkg/directory/exts.go
Normal file
20
pkg/directory/exts.go
Normal file
@ -0,0 +1,20 @@
|
||||
package directory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func GetClientId(ctx context.Context) (string, error) {
|
||||
var clientId string
|
||||
if md, ok := metadata.FromIncomingContext(ctx); !ok {
|
||||
return clientId, status.Errorf(codes.InvalidArgument, "missing metadata")
|
||||
} else if val, ok := md["client_id"]; !ok || len(val) == 0 {
|
||||
return clientId, status.Errorf(codes.Unauthenticated, "missing client_id in metadata")
|
||||
} else {
|
||||
clientId = val[0]
|
||||
}
|
||||
return clientId, nil
|
||||
}
|
@ -4,6 +4,7 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// In services, we use sync.Map because it will be both often read and write
|
||||
var serviceDirectory sync.Map
|
||||
|
||||
func GetServiceInstance(id string) *ServiceInstance {
|
@ -3,17 +3,19 @@ package directory
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
type DirectoryRpcServer struct {
|
||||
type ServiceRpcServer struct {
|
||||
proto.UnimplementedServiceDirectoryServer
|
||||
}
|
||||
|
||||
func convertServiceToInfo(in *ServiceInstance) *proto.ServiceInfo {
|
||||
func instantiationService(in *ServiceInstance) *proto.ServiceInfo {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
@ -26,23 +28,23 @@ func convertServiceToInfo(in *ServiceInstance) *proto.ServiceInfo {
|
||||
}
|
||||
}
|
||||
|
||||
func (v *DirectoryRpcServer) GetService(ctx context.Context, request *proto.GetServiceRequest) (*proto.GetServiceResponse, error) {
|
||||
func (v *ServiceRpcServer) GetService(ctx context.Context, request *proto.GetServiceRequest) (*proto.GetServiceResponse, error) {
|
||||
if request.Id != nil {
|
||||
out := GetServiceInstance(request.GetId())
|
||||
return &proto.GetServiceResponse{
|
||||
Data: convertServiceToInfo(out),
|
||||
Data: instantiationService(out),
|
||||
}, nil
|
||||
}
|
||||
if request.Type != nil {
|
||||
out := GetServiceInstanceByType(request.GetType())
|
||||
return &proto.GetServiceResponse{
|
||||
Data: convertServiceToInfo(out),
|
||||
Data: instantiationService(out),
|
||||
}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("no filter condition is provided")
|
||||
}
|
||||
|
||||
func (v *DirectoryRpcServer) ListService(ctx context.Context, request *proto.ListServiceRequest) (*proto.ListServiceResponse, error) {
|
||||
func (v *ServiceRpcServer) ListService(ctx context.Context, request *proto.ListServiceRequest) (*proto.ListServiceResponse, error) {
|
||||
var out []*ServiceInstance
|
||||
if request.Type != nil {
|
||||
out = ListServiceInstanceByType(request.GetType())
|
||||
@ -51,27 +53,36 @@ func (v *DirectoryRpcServer) ListService(ctx context.Context, request *proto.Lis
|
||||
}
|
||||
return &proto.ListServiceResponse{
|
||||
Data: lo.Map(out, func(item *ServiceInstance, index int) *proto.ServiceInfo {
|
||||
return convertServiceToInfo(item)
|
||||
return instantiationService(item)
|
||||
}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (v *DirectoryRpcServer) AddService(ctx context.Context, info *proto.ServiceInfo) (*proto.AddServiceResponse, error) {
|
||||
func (v *ServiceRpcServer) AddService(ctx context.Context, info *proto.ServiceInfo) (*proto.AddServiceResponse, error) {
|
||||
clientId, err := GetClientId(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if info.GetId() != clientId {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "client_id mismatch in metadata")
|
||||
}
|
||||
|
||||
in := &ServiceInstance{
|
||||
ID: info.GetId(),
|
||||
ID: clientId,
|
||||
Type: info.GetType(),
|
||||
Label: info.GetLabel(),
|
||||
GrpcAddr: info.GetGrpcAddr(),
|
||||
HttpAddr: info.HttpAddr,
|
||||
}
|
||||
AddServiceInstance(in)
|
||||
log.Info().Str("id", info.GetId()).Str("label", info.GetLabel()).Msg("New service added.")
|
||||
log.Info().Str("id", clientId).Str("label", info.GetLabel()).Msg("New service added.")
|
||||
return &proto.AddServiceResponse{
|
||||
IsSuccess: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (v *DirectoryRpcServer) RemoveService(ctx context.Context, request *proto.RemoveServiceRequest) (*proto.RemoveServiceResponse, error) {
|
||||
func (v *ServiceRpcServer) RemoveService(ctx context.Context, request *proto.RemoveServiceRequest) (*proto.RemoveServiceResponse, error) {
|
||||
RemoveServiceInstance(request.GetId())
|
||||
log.Info().Str("id", request.GetId()).Msg("A service removed.")
|
||||
return &proto.RemoveServiceResponse{
|
Reference in New Issue
Block a user