Nexus/pkg/internal/directory/command_rpc.go

116 lines
2.8 KiB
Go
Raw Permalink Normal View History

package directory
import (
"context"
"git.solsynth.dev/hypernet/nexus/pkg/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
2024-10-20 09:23:53 +00:00
"net/http"
"time"
)
type CommandRpcServer struct {
2024-10-21 14:07:36 +00:00
proto.UnimplementedCommandProviderServer
}
2024-11-03 06:24:03 +00:00
func (c *CommandRpcServer) AddCommand(ctx context.Context, info *proto.CommandInfo) (*proto.AddCommandResponse, error) {
clientId, err := GetClientId(ctx)
if err != nil {
return nil, err
}
2024-10-20 09:23:53 +00:00
service := GetServiceInstance(clientId)
if service == nil {
return nil, status.Errorf(codes.NotFound, "service not found")
}
AddCommand(info.GetId(), info.GetMethod(), info.GetTags(), service)
return &proto.AddCommandResponse{
IsSuccess: true,
}, nil
}
2024-11-03 06:24:03 +00:00
// RemoveCommand unregisters the command identified by the request's id and
// method by delegating to the package-level RemoveCommand.
//
// No caller identification is performed in this handler, and the response
// always has IsSuccess set; the error result is always nil.
func (c *CommandRpcServer) RemoveCommand(ctx context.Context, request *proto.CommandLookupRequest) (*proto.RemoveCommandResponse, error) {
	id, method := request.GetId(), request.GetMethod()
	RemoveCommand(id, method)

	resp := new(proto.RemoveCommandResponse)
	resp.IsSuccess = true
	return resp, nil
}
2024-11-03 06:24:03 +00:00
func (c *CommandRpcServer) SendCommand(ctx context.Context, argument *proto.CommandArgument) (*proto.CommandReturn, error) {
id := argument.GetCommand()
method := argument.GetMethod()
handler := GetCommandHandler(id, method)
if handler == nil {
2024-10-20 09:23:53 +00:00
return &proto.CommandReturn{
IsDelivered: false,
Status: http.StatusNotFound,
2024-10-20 10:13:07 +00:00
ContentType: "text/plain+error",
2024-10-20 09:23:53 +00:00
Payload: []byte("command not found"),
}, nil
}
conn, err := handler.GetGrpcConn()
if err != nil {
2024-10-20 09:23:53 +00:00
return &proto.CommandReturn{
IsDelivered: false,
Status: http.StatusServiceUnavailable,
2024-10-20 10:13:07 +00:00
ContentType: "text/plain+error",
2024-10-20 09:23:53 +00:00
Payload: []byte("service unavailable"),
}, nil
}
2024-10-20 09:23:53 +00:00
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
2024-10-21 14:07:36 +00:00
out, err := proto.NewCommandProviderClient(conn).SendCommand(ctx, argument)
2024-10-20 09:23:53 +00:00
if err != nil {
return &proto.CommandReturn{
IsDelivered: true,
Status: http.StatusInternalServerError,
2024-10-20 10:13:07 +00:00
ContentType: "text/plain+error",
2024-10-20 09:23:53 +00:00
Payload: []byte(err.Error()),
}, nil
}
out.IsDelivered = true
return out, nil
}
2024-11-03 06:24:03 +00:00
func (c *CommandRpcServer) SendStreamCommand(g proto.CommandProvider_SendStreamCommandServer) error {
for {
pck, err := g.Recv()
if err == io.EOF {
return nil
} else if err != nil {
return err
}
id := pck.GetCommand()
method := pck.GetMethod()
handler := GetCommandHandler(id, method)
if handler == nil {
return status.Errorf(codes.NotFound, "command not found")
}
conn, err := handler.GetGrpcConn()
2024-10-20 09:23:53 +00:00
ctx, cancel := context.WithTimeout(g.Context(), time.Second*10)
2024-10-21 14:07:36 +00:00
out, err := proto.NewCommandProviderClient(conn).SendCommand(ctx, pck)
cancel()
2024-10-20 09:23:53 +00:00
if err != nil {
_ = g.Send(&proto.CommandReturn{
IsDelivered: false,
Status: http.StatusInternalServerError,
2024-10-20 10:13:07 +00:00
ContentType: "text/plain+error",
2024-10-20 09:23:53 +00:00
Payload: []byte(err.Error()),
})
} else {
2024-10-20 10:13:07 +00:00
_ = g.Send(out)
2024-10-20 09:23:53 +00:00
}
}
}