Command registration

This commit is contained in:
2024-10-20 17:23:53 +08:00
parent 18ce501089
commit ff24a43580
13 changed files with 335 additions and 92 deletions

View File

@ -1,7 +1,10 @@
package directory
import (
"git.solsynth.dev/hypernet/nexus/pkg/nex"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
"strings"
"sync"
)
@ -9,17 +12,17 @@ import (
var commandDirectory = make(map[string]*Command)
var commandDirectoryMutex sync.Mutex
func GetCommandKey(id, method string) string {
return id + ":" + method
}
func AddCommand(id, method string, tags []string, handler *ServiceInstance) {
commandDirectoryMutex.Lock()
defer commandDirectoryMutex.Unlock()
ky := GetCommandKey(id, method)
if _, ok := commandDirectory[id]; !ok {
commandDirectory[id] = &Command{
if tags == nil {
tags = make([]string, 0)
}
ky := nex.GetCommandKey(id, method)
if _, ok := commandDirectory[ky]; !ok {
commandDirectory[ky] = &Command{
ID: id,
Method: method,
Tags: tags,
@ -33,13 +36,15 @@ func AddCommand(id, method string, tags []string, handler *ServiceInstance) {
commandDirectory[ky].Handler = lo.UniqBy(commandDirectory[ky].Handler, func(item *ServiceInstance) string {
return item.ID
})
log.Info().Str("id", id).Str("method", method).Str("tags", strings.Join(tags, ",")).Msg("New command registered")
}
func GetCommandHandler(id, method string) *ServiceInstance {
commandDirectoryMutex.Lock()
defer commandDirectoryMutex.Unlock()
ky := GetCommandKey(id, method)
ky := nex.GetCommandKey(id, method)
if val, ok := commandDirectory[ky]; ok {
if len(val.Handler) == 0 {
return nil
@ -57,6 +62,6 @@ func RemoveCommand(id, method string) {
commandDirectoryMutex.Lock()
defer commandDirectoryMutex.Unlock()
ky := GetCommandKey(id, method)
ky := nex.GetCommandKey(id, method)
delete(commandDirectory, ky)
}

View File

@ -7,6 +7,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"net/http"
"time"
)
@ -20,7 +21,7 @@ func (c CommandRpcServer) AddCommand(ctx context.Context, info *proto.CommandInf
return nil, err
}
service := GetServiceInstanceByType(clientId)
service := GetServiceInstance(clientId)
if service == nil {
return nil, status.Errorf(codes.NotFound, "service not found")
}
@ -44,18 +45,35 @@ func (c CommandRpcServer) SendCommand(ctx context.Context, argument *proto.Comma
handler := GetCommandHandler(id, method)
if handler == nil {
return nil, status.Errorf(codes.NotFound, "command not found")
return &proto.CommandReturn{
IsDelivered: false,
Status: http.StatusNotFound,
Payload: []byte("command not found"),
}, nil
}
conn, err := handler.GetGrpcConn()
if err != nil {
return nil, status.Errorf(codes.Unavailable, "service unavailable")
return &proto.CommandReturn{
IsDelivered: false,
Status: http.StatusServiceUnavailable,
Payload: []byte("service unavailable"),
}, nil
}
contx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
return proto.NewCommandControllerClient(conn).SendCommand(contx, argument)
out, err := proto.NewCommandControllerClient(conn).SendCommand(ctx, argument)
if err != nil {
return &proto.CommandReturn{
IsDelivered: true,
Status: http.StatusInternalServerError,
Payload: []byte(err.Error()),
}, nil
}
out.IsDelivered = true
return out, nil
}
func (c CommandRpcServer) SendStreamCommand(g grpc.BidiStreamingServer[proto.CommandArgument, proto.CommandReturn]) error {
@ -77,13 +95,21 @@ func (c CommandRpcServer) SendStreamCommand(g grpc.BidiStreamingServer[proto.Com
conn, err := handler.GetGrpcConn()
contx, cancel := context.WithTimeout(context.Background(), time.Second*10)
result, _ := proto.NewCommandControllerClient(conn).SendCommand(contx, pck)
ctx, cancel := context.WithTimeout(g.Context(), time.Second*10)
result, err := proto.NewCommandControllerClient(conn).SendCommand(ctx, pck)
cancel()
_ = g.Send(&proto.CommandReturn{
Status: result.Status,
Payload: result.Payload,
})
if err != nil {
_ = g.Send(&proto.CommandReturn{
IsDelivered: false,
Status: http.StatusInternalServerError,
Payload: []byte(err.Error()),
})
} else {
_ = g.Send(&proto.CommandReturn{
Status: result.Status,
Payload: result.Payload,
})
}
}
}

View File

@ -76,7 +76,7 @@ func (v *ServiceRpcServer) AddService(ctx context.Context, info *proto.ServiceIn
HttpAddr: info.HttpAddr,
}
AddServiceInstance(in)
log.Info().Str("id", clientId).Str("label", info.GetLabel()).Msg("New service added.")
log.Info().Str("id", clientId).Str("label", info.GetLabel()).Msg("New service registered")
return &proto.AddServiceResponse{
IsSuccess: true,
}, nil