diff --git a/pkg/http/api/command.go b/pkg/http/api/command.go new file mode 100644 index 0000000..f3c53ff --- /dev/null +++ b/pkg/http/api/command.go @@ -0,0 +1,48 @@ +package api + +import ( + "context" + "git.solsynth.dev/hypernet/nexus/pkg/directory" + "git.solsynth.dev/hypernet/nexus/pkg/proto" + "github.com/gofiber/fiber/v2" + "github.com/rs/zerolog/log" + "google.golang.org/grpc/metadata" + "strings" + "time" +) + +func invokeCommand(c *fiber.Ctx) error { + command := c.Params("command") + method := strings.ToLower(c.Method()) + + handler := directory.GetCommandHandler(command, method) + if handler == nil { + return fiber.NewError(fiber.StatusNotFound, "command not found") + } + + conn, err := handler.GetGrpcConn() + if err != nil { + return fiber.NewError(fiber.StatusServiceUnavailable, "service unavailable") + } + + log.Debug().Str("id", command).Str("method", method).Msg("Invoking command from HTTP Gateway...") + + ctx := metadata.AppendToOutgoingContext(c.Context(), "client_id", "http-gateway", "ip", c.IP(), "user_agent", c.Get(fiber.HeaderUserAgent)) + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + + out, err := proto.NewCommandControllerClient(conn).SendCommand(ctx, &proto.CommandArgument{ + Command: command, + Method: method, + Payload: c.Body(), + }) + + if err != nil { + return fiber.NewError(fiber.StatusInternalServerError, err.Error()) + } else { + if !out.IsDelivered { + log.Debug().Str("id", command).Str("method", method).Msg("Invoking command from HTTP Gateway... failed, delivery not confirmed") + } + return c.Status(int(out.Status)).Send(out.Payload) + } +} diff --git a/pkg/http/api/directory.go b/pkg/http/api/directory.go index a90fcc0..5e6de1b 100644 --- a/pkg/http/api/directory.go +++ b/pkg/http/api/directory.go @@ -1,13 +1,8 @@ package api import ( - "github.com/spf13/viper" - "strings" - "git.solsynth.dev/hypernet/nexus/pkg/directory" "github.com/gofiber/fiber/v2" - "github.com/gofiber/fiber/v2/middleware/proxy" - "github.com/rs/zerolog/log" "github.com/samber/lo" ) @@ -22,33 +17,3 @@ func listExistsService(c *fiber.Ctx) error { } })) } - -func forwardServiceRequest(c *fiber.Ctx) error { - serviceType := c.Params("service") - ogKeyword := serviceType - - aliasingMap := viper.GetStringMapString("services.aliases") - if val, ok := aliasingMap[serviceType]; ok { - serviceType = val - } - - service := directory.GetServiceInstanceByType(serviceType) - - if service == nil || service.HttpAddr == nil { - return fiber.NewError(fiber.StatusNotFound, "service not found") - } - - ogUrl := c.Request().URI().String() - url := c.OriginalURL() - url = strings.Replace(url, "/cgi/"+ogKeyword, "/api", 1) - url = "http://" + *service.HttpAddr + url - - log.Debug(). - Str("from", ogUrl). - Str("to", url). - Str("service", serviceType). - Str("id", service.ID). - Msg("Forwarding request for service...") - - return proxy.Do(c, url) -} diff --git a/pkg/http/api/index.go b/pkg/http/api/index.go index 6e66f16..2b11f17 100644 --- a/pkg/http/api/index.go +++ b/pkg/http/api/index.go @@ -24,5 +24,5 @@ func MapAPIs(app *fiber.App) { return c.Next() }).Get("/ws", websocket.New(listenWebsocket)) - app.All("/cgi/:service/*", forwardServiceRequest) + app.All("/cgi/:command", invokeCommand) } diff --git a/pkg/nex/command.go b/pkg/nex/command.go index 4ba13d8..a7d28b5 100644 --- a/pkg/nex/command.go +++ b/pkg/nex/command.go @@ -9,6 +9,7 @@ import ( "google.golang.org/grpc/reflection" "net" "net/http" + "strings" "time" ) @@ -19,20 +20,33 @@ func GetCommandKey(id, method string) string { } func (v *Conn) AddCommand(id, method string, tags []string, fn CommandHandler) error { + method = strings.ToLower(method) dir := proto.NewCommandControllerClient(v.nexusConn) ctx := context.Background() ctx = metadata.AppendToOutgoingContext(ctx, "client_id", v.Info.Id) - _, err := dir.AddCommand(ctx, &proto.CommandInfo{ - Id: id, - Method: method, - Tags: tags, - }) - if err == nil { - v.commandHandlers[GetCommandKey(id, method)] = fn + var addingMethodQueue []string + if method == "all" { + addingMethodQueue = []string{"get", "post", "put", "patch", "delete"} + } else { + addingMethodQueue = append(addingMethodQueue, method) } - return err + for _, method := range addingMethodQueue { + ky := GetCommandKey(id, method) + _, err := dir.AddCommand(ctx, &proto.CommandInfo{ + Id: id, + Method: method, + Tags: tags, + }) + if err == nil { + v.commandHandlers[ky] = fn + } else { + return err + } + } + + return nil } type localCommandRpcServer struct { @@ -43,7 +57,8 @@ type localCommandRpcServer struct { } func (v localCommandRpcServer) SendCommand(ctx context.Context, argument *proto.CommandArgument) (*proto.CommandReturn, error) { - if handler, ok := v.conn.commandHandlers[argument.GetCommand()]; !ok { + ky := GetCommandKey(argument.GetCommand(), argument.GetMethod()) + if handler, ok := v.conn.commandHandlers[ky]; !ok { return &proto.CommandReturn{ Status: http.StatusNotFound, Payload: []byte(argument.GetCommand() + " not found"), diff --git a/pkg/nex/command_test.go b/pkg/nex/command_test.go index 5a5385c..72d0b2b 100644 --- a/pkg/nex/command_test.go +++ b/pkg/nex/command_test.go @@ -25,7 +25,15 @@ func TestHandleCommand(t *testing.T) { t.Fatal(fmt.Errorf("unable to register service: %v", err)) } - err = conn.AddCommand("echo", "get", nil, func(ctx *nex.CommandCtx) error { + err = conn.AddCommand("say.hi", "all", nil, func(ctx *nex.CommandCtx) error { + return ctx.Write([]byte("Hello, World!"), http.StatusOK) + }) + if err != nil { + t.Fatal(fmt.Errorf("unable to add command: %v", err)) + return + } + err = conn.AddCommand("echo", "all", nil, func(ctx *nex.CommandCtx) error { + t.Log("Received command: ", string(ctx.Read())) return ctx.Write(ctx.Read(), http.StatusOK) }) if err != nil { @@ -34,13 +42,13 @@ func TestHandleCommand(t *testing.T) { } go func() { - err := conn.RunCommands("127.0.0.1:6001") + err := conn.RunCommands("0.0.0.0:6001") if err != nil { t.Error(fmt.Errorf("unable to run commands: %v", err)) return } }() - t.Log("Waiting 10 seconds for calling command...") - time.Sleep(time.Second * 10) + t.Log("Waiting 60 seconds for calling command...") + time.Sleep(time.Second * 60) } diff --git a/settings.toml b/settings.toml index 0f72f7c..40794bb 100644 --- a/settings.toml +++ b/settings.toml @@ -24,8 +24,5 @@ refresh_token_duration = 2592000 dsn = "host=localhost user=postgres password=password dbname=hy_dealer port=5432 sslmode=disable" prefix = "dealer_" -[services] -aliases = { id = "auth", uc = "files", co = "interactive", im = "messaging" } - [scraper] user-agent = "SolarBot/1.0" \ No newline at end of file