diff --git a/pkg/internal/directory/command.go b/pkg/internal/directory/command.go deleted file mode 100644 index 85de093..0000000 --- a/pkg/internal/directory/command.go +++ /dev/null @@ -1,22 +0,0 @@ -package directory - -const ( - CommandMethodGet = "get" - CommandMethodPut = "put" - CommandMethodPatch = "patch" - CommandMethodPost = "post" - CommandMethodDelete = "delete" -) - -type Command struct { - // The unique identifier of the command, different method command can hold the same command id - ID string `json:"id"` - // The method of the command, such as get, post, others; inspired by RESTful design - Method string `json:"method"` - // The tags of the command will be used to invoke the pre-command middlewares and post-command middlewares - Tags []string `json:"tags"` - // The implementation of the command, the handler is the service that will be invoked - Handler []*ServiceInstance `json:"handler"` - - RobinIndex uint `json:"robin_index"` -} diff --git a/pkg/internal/directory/command_mapping.go b/pkg/internal/directory/command_mapping.go deleted file mode 100644 index 8352f53..0000000 --- a/pkg/internal/directory/command_mapping.go +++ /dev/null @@ -1,79 +0,0 @@ -package directory - -import ( - "context" - "git.solsynth.dev/hypernet/nexus/pkg/internal/kv" - "git.solsynth.dev/hypernet/nexus/pkg/nex" - "github.com/goccy/go-json" - "github.com/rs/zerolog/log" - "github.com/samber/lo" -) - -const CommandInfoKvPrefix = "nexus.command/" - -func AddCommand(id, method string, tags []string, handler *ServiceInstance) error { - if tags == nil { - tags = make([]string, 0) - } - - ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method) - - command := &Command{ - ID: id, - Method: method, - Tags: tags, - Handler: []*ServiceInstance{handler}, - } - - command.Handler = lo.UniqBy(command.Handler, func(item *ServiceInstance) string { - return item.ID - }) - - commandJSON, err := json.Marshal(command) - if err != nil { - log.Printf("Error marshaling command: %v", err) - return nil - } - - _, err = kv.Kv.Put(context.Background(), ky, string(commandJSON)) - return err -} - -func GetCommandHandler(id, method string) *ServiceInstance { - ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method) - - resp, err := kv.Kv.Get(context.Background(), ky) - if err != nil { - return nil - } - - if len(resp.Kvs) == 0 { - return nil - } - - var command Command - if err := json.Unmarshal(resp.Kvs[0].Value, &command); err != nil { - return nil - } - - if len(command.Handler) == 0 { - return nil - } - - idx := command.RobinIndex % uint(len(command.Handler)) - command.RobinIndex = idx + 1 - - raw, err := json.Marshal(&command) - if err == nil { - _, _ = kv.Kv.Put(context.Background(), ky, string(raw)) - } - - return command.Handler[idx] -} - -func RemoveCommand(id, method string) error { - ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method) - - _, err := kv.Kv.Delete(context.Background(), ky) - return err -} diff --git a/pkg/internal/directory/command_rpc.go b/pkg/internal/directory/command_rpc.go deleted file mode 100644 index 5c0747a..0000000 --- a/pkg/internal/directory/command_rpc.go +++ /dev/null @@ -1,115 +0,0 @@ -package directory - -import ( - "context" - "git.solsynth.dev/hypernet/nexus/pkg/proto" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - "io" - "net/http" - "time" -) - -type CommandRpcServer struct { - proto.UnimplementedCommandProviderServer -} - -func (c *CommandRpcServer) AddCommand(ctx context.Context, info *proto.CommandInfo) (*proto.AddCommandResponse, error) { - clientId, err := GetClientId(ctx) - if err != nil { - return nil, err - } - - service := GetServiceInstance(clientId) - if service == nil { - return nil, status.Errorf(codes.NotFound, "service not found") - } - - AddCommand(info.GetId(), info.GetMethod(), info.GetTags(), service) - return &proto.AddCommandResponse{ - IsSuccess: true, - }, nil -} - -func (c *CommandRpcServer) RemoveCommand(ctx context.Context, request *proto.CommandLookupRequest) (*proto.RemoveCommandResponse, error) { - RemoveCommand(request.GetId(), request.GetMethod()) - return &proto.RemoveCommandResponse{ - IsSuccess: true, - }, nil -} - -func (c *CommandRpcServer) SendCommand(ctx context.Context, argument *proto.CommandArgument) (*proto.CommandReturn, error) { - id := argument.GetCommand() - method := argument.GetMethod() - - handler := GetCommandHandler(id, method) - if handler == nil { - return &proto.CommandReturn{ - IsDelivered: false, - Status: http.StatusNotFound, - ContentType: "text/plain+error", - Payload: []byte("command not found"), - }, nil - } - - conn, err := handler.GetGrpcConn() - if err != nil { - return &proto.CommandReturn{ - IsDelivered: false, - Status: http.StatusServiceUnavailable, - ContentType: "text/plain+error", - Payload: []byte("service unavailable"), - }, nil - } - - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - - out, err := proto.NewCommandProviderClient(conn).SendCommand(ctx, argument) - if err != nil { - return &proto.CommandReturn{ - IsDelivered: true, - Status: http.StatusInternalServerError, - ContentType: "text/plain+error", - Payload: []byte(err.Error()), - }, nil - } - out.IsDelivered = true - return out, nil -} - -func (c *CommandRpcServer) SendStreamCommand(g proto.CommandProvider_SendStreamCommandServer) error { - for { - pck, err := g.Recv() - if err == io.EOF { - return nil - } else if err != nil { - return err - } - - id := pck.GetCommand() - method := pck.GetMethod() - - handler := GetCommandHandler(id, method) - if handler == nil { - return status.Errorf(codes.NotFound, "command not found") - } - - conn, err := handler.GetGrpcConn() - - ctx, cancel := context.WithTimeout(g.Context(), time.Second*10) - out, err := proto.NewCommandProviderClient(conn).SendCommand(ctx, pck) - cancel() - - if err != nil { - _ = g.Send(&proto.CommandReturn{ - IsDelivered: false, - Status: http.StatusInternalServerError, - ContentType: "text/plain+error", - Payload: []byte(err.Error()), - }) - } else { - _ = g.Send(out) - } - } -} diff --git a/pkg/internal/grpc/server.go b/pkg/internal/grpc/server.go index 1db8a6e..58448a1 100644 --- a/pkg/internal/grpc/server.go +++ b/pkg/internal/grpc/server.go @@ -1,9 +1,10 @@ package grpc import ( - "git.solsynth.dev/hypernet/nexus/pkg/internal/directory" "net" + "git.solsynth.dev/hypernet/nexus/pkg/internal/directory" + "git.solsynth.dev/hypernet/nexus/pkg/proto" "google.golang.org/grpc/reflection" @@ -29,7 +30,6 @@ func NewServer() *Server { } proto.RegisterDirectoryServiceServer(server.srv, &directory.ServiceRpcServer{}) - proto.RegisterCommandProviderServer(server.srv, &directory.CommandRpcServer{}) proto.RegisterDatabaseServiceServer(server.srv, server) proto.RegisterStreamServiceServer(server.srv, server) proto.RegisterAllocatorServiceServer(server.srv, server) diff --git a/pkg/internal/grpc/stream.go b/pkg/internal/grpc/stream.go index fefa137..b6cf8b0 100644 --- a/pkg/internal/grpc/stream.go +++ b/pkg/internal/grpc/stream.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "git.solsynth.dev/hypernet/nexus/pkg/internal/http/ws" + "git.solsynth.dev/hypernet/nexus/pkg/internal/web/ws" "github.com/rs/zerolog/log" "git.solsynth.dev/hypernet/nexus/pkg/proto" diff --git a/pkg/internal/http/api/command.go b/pkg/internal/http/api/command.go deleted file mode 100644 index 7a94eaf..0000000 --- a/pkg/internal/http/api/command.go +++ /dev/null @@ -1,67 +0,0 @@ -package api - -import ( - "context" - "fmt" - "git.solsynth.dev/hypernet/nexus/pkg/internal/directory" - "git.solsynth.dev/hypernet/nexus/pkg/proto" - "github.com/gofiber/fiber/v2" - "github.com/rs/zerolog/log" - "google.golang.org/grpc/metadata" - "strings" - "time" -) - -func invokeCommand(c *fiber.Ctx) error { - command := c.Params("command") - method := strings.ToLower(c.Method()) - - handler := directory.GetCommandHandler(command, method) - if handler == nil { - return fiber.NewError(fiber.StatusNotFound, "command not found") - } - - conn, err := handler.GetGrpcConn() - if err != nil { - return fiber.NewError(fiber.StatusServiceUnavailable, "service unavailable") - } - - log.Debug().Str("id", command).Str("method", method).Msg("Invoking command from HTTP Gateway...") - - var meta []string - meta = append(meta, "client_id", "http-gateway") - meta = append(meta, "net.ip", c.IP()) - meta = append(meta, "http.user_agent", c.Get(fiber.HeaderUserAgent)) - for k, v := range c.GetReqHeaders() { - meta = append( - meta, - strings.ToLower(fmt.Sprintf("header.%s", strings.ReplaceAll(k, "-", "_"))), - strings.Join(v, "\n"), - ) - } - - for k, v := range c.Queries() { - meta = append( - meta, - strings.ToLower(fmt.Sprintf("query.%s", strings.ReplaceAll(k, "-", "_"))), - v, - ) - } - - ctx := metadata.AppendToOutgoingContext(c.Context(), meta...) - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - - out, err := proto.NewCommandProviderClient(conn).SendCommand(ctx, &proto.CommandArgument{ - Command: command, - Method: method, - Payload: c.Body(), - }) - - if err != nil { - return fiber.NewError(fiber.StatusInternalServerError, err.Error()) - } else { - c.Set(fiber.HeaderContentType, out.ContentType) - return c.Status(int(out.Status)).Send(out.Payload) - } -} diff --git a/pkg/internal/http/api/check_ip.go b/pkg/internal/web/api/check_ip.go similarity index 100% rename from pkg/internal/http/api/check_ip.go rename to pkg/internal/web/api/check_ip.go diff --git a/pkg/internal/http/api/directory.go b/pkg/internal/web/api/directory.go similarity index 100% rename from pkg/internal/http/api/directory.go rename to pkg/internal/web/api/directory.go diff --git a/pkg/internal/http/api/forward.go b/pkg/internal/web/api/forward.go similarity index 100% rename from pkg/internal/http/api/forward.go rename to pkg/internal/web/api/forward.go diff --git a/pkg/internal/http/api/index.go b/pkg/internal/web/api/index.go similarity index 92% rename from pkg/internal/http/api/index.go rename to pkg/internal/web/api/index.go index 444be12..edfa4bb 100644 --- a/pkg/internal/http/api/index.go +++ b/pkg/internal/web/api/index.go @@ -4,7 +4,7 @@ import ( pkg "git.solsynth.dev/hypernet/nexus/pkg/internal" "git.solsynth.dev/hypernet/nexus/pkg/internal/auth" "git.solsynth.dev/hypernet/nexus/pkg/internal/directory" - "git.solsynth.dev/hypernet/nexus/pkg/internal/http/ws" + "git.solsynth.dev/hypernet/nexus/pkg/internal/web/ws" "git.solsynth.dev/hypernet/nexus/pkg/nex" "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" @@ -12,6 +12,8 @@ import ( ) func MapAPIs(app *fiber.App) { + app.Get("/check-ip", getClientIP) + // Some built-in public-accessible APIs wellKnown := app.Group("/.well-known").Name("Well Known") { @@ -22,7 +24,6 @@ func MapAPIs(app *fiber.App) { "status": true, }) }) - wellKnown.Get("/check-ip", getClientIP) wellKnown.Get("/directory/services", listExistsService) wellKnown.Get("/openid-configuration", func(c *fiber.Ctx) error { @@ -50,6 +51,5 @@ func MapAPIs(app *fiber.App) { // Common websocket gateway app.Get("/ws", auth.ValidatorMiddleware, websocket.New(ws.Listen)) - app.All("/inv/:command", invokeCommand) app.All("/cgi/:service/*", forwardService) } diff --git a/pkg/internal/http/api/watchtower.go b/pkg/internal/web/api/watchtower.go similarity index 100% rename from pkg/internal/http/api/watchtower.go rename to pkg/internal/web/api/watchtower.go diff --git a/pkg/internal/http/exts/request.go b/pkg/internal/web/exts/request.go similarity index 100% rename from pkg/internal/http/exts/request.go rename to pkg/internal/web/exts/request.go diff --git a/pkg/internal/http/server.go b/pkg/internal/web/server.go similarity index 89% rename from pkg/internal/http/server.go rename to pkg/internal/web/server.go index 9376823..de48e62 100644 --- a/pkg/internal/http/server.go +++ b/pkg/internal/web/server.go @@ -1,10 +1,10 @@ -package server +package web import ( "time" "git.solsynth.dev/hypernet/nexus/pkg/internal/auth" - "git.solsynth.dev/hypernet/nexus/pkg/internal/http/api" + "git.solsynth.dev/hypernet/nexus/pkg/internal/web/api" "github.com/goccy/go-json" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" @@ -15,11 +15,11 @@ import ( "github.com/spf13/viper" ) -type HTTPApp struct { +type WebApp struct { app *fiber.App } -func NewServer() *HTTPApp { +func NewServer() *WebApp { app := fiber.New(fiber.Config{ DisableStartupMessage: true, EnableIPValidation: true, @@ -55,10 +55,10 @@ func NewServer() *HTTPApp { api.MapAPIs(app) - return &HTTPApp{app} + return &WebApp{app} } -func (v *HTTPApp) Listen() { +func (v *WebApp) Listen() { if err := v.app.Listen(viper.GetString("bind")); err != nil { log.Fatal().Err(err).Msg("An error occurred when starting server...") } diff --git a/pkg/internal/http/ws/connections.go b/pkg/internal/web/ws/connections.go similarity index 100% rename from pkg/internal/http/ws/connections.go rename to pkg/internal/web/ws/connections.go diff --git a/pkg/internal/http/ws/ws.go b/pkg/internal/web/ws/ws.go similarity index 100% rename from pkg/internal/http/ws/ws.go rename to pkg/internal/web/ws/ws.go diff --git a/pkg/main.go b/pkg/main.go index d2597d5..857a9dc 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -9,10 +9,10 @@ import ( "git.solsynth.dev/hypernet/nexus/pkg/internal/auth" "git.solsynth.dev/hypernet/nexus/pkg/internal/database" "git.solsynth.dev/hypernet/nexus/pkg/internal/directory" - server "git.solsynth.dev/hypernet/nexus/pkg/internal/http" "git.solsynth.dev/hypernet/nexus/pkg/internal/kv" "git.solsynth.dev/hypernet/nexus/pkg/internal/mq" "git.solsynth.dev/hypernet/nexus/pkg/internal/watchtower" + "git.solsynth.dev/hypernet/nexus/pkg/internal/web" "git.solsynth.dev/hypernet/nexus/pkg/nex/sec" "github.com/fatih/color" @@ -109,7 +109,7 @@ func main() { go directory.ValidateServices() // Server - go server.NewServer().Listen() + go web.NewServer().Listen() // Grpc Server go grpc.NewServer().Listen() diff --git a/pkg/nex/command.go b/pkg/nex/command.go deleted file mode 100644 index c37e0d7..0000000 --- a/pkg/nex/command.go +++ /dev/null @@ -1,150 +0,0 @@ -package nex - -import ( - "context" - "git.solsynth.dev/hypernet/nexus/pkg/proto" - "google.golang.org/grpc" - health "google.golang.org/grpc/health/grpc_health_v1" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/reflection" - "net" - "net/http" - "strconv" - "strings" - "time" -) - -type CommandHandler func(ctx *CommandCtx) error - -func GetCommandKey(id, method string) string { - return id + ":" + method -} - -func (v *Conn) AddCommand(id, method string, tags []string, fn CommandHandler) error { - method = strings.ToLower(method) - dir := proto.NewCommandProviderClient(v.nexusConn) - ctx := context.Background() - ctx = metadata.AppendToOutgoingContext(ctx, "client_id", v.Info.Id) - - var addingMethodQueue []string - if method == "all" { - addingMethodQueue = []string{"get", "post", "put", "patch", "delete"} - } else { - addingMethodQueue = append(addingMethodQueue, method) - } - - for _, method := range addingMethodQueue { - ky := GetCommandKey(id, method) - _, err := dir.AddCommand(ctx, &proto.CommandInfo{ - Id: id, - Method: method, - Tags: tags, - }) - if err == nil { - v.commandHandlers[ky] = fn - } else { - return err - } - } - - return nil -} - -type localCommandRpcServer struct { - conn *Conn - - proto.UnimplementedCommandProviderServer - health.UnimplementedHealthServer -} - -func (v localCommandRpcServer) SendCommand(ctx context.Context, argument *proto.CommandArgument) (*proto.CommandReturn, error) { - ky := GetCommandKey(argument.GetCommand(), argument.GetMethod()) - if handler, ok := v.conn.commandHandlers[ky]; !ok { - return &proto.CommandReturn{ - Status: http.StatusNotFound, - Payload: []byte(argument.GetCommand() + " not found"), - }, nil - } else { - cc := &CommandCtx{ - requestBody: argument.GetPayload(), - statusCode: http.StatusOK, - } - if md, ok := metadata.FromIncomingContext(ctx); ok { - for k, v := range md { - var val any = nil - if len(v) == 1 { - if len(v[0]) != 0 { - if i, err := strconv.ParseInt(v[0], 10, 64); err == nil { - val = i - } else if b, err := strconv.ParseBool(v[0]); err == nil { - val = b - } else if f, err := strconv.ParseFloat(v[0], 64); err == nil { - val = f - } - layouts := []string{ - time.RFC3339, - "2006-01-02 15:04:05", // Example: 2024-10-20 14:55:05 - "2006-01-02", // Example: 2024-10-20 - } - for _, layout := range layouts { - if t, err := time.Parse(layout, v[0]); err == nil { - val = t - } - } - if val == nil { - val = v[0] - } - } else { - val = v[0] - } - } else if len(v) > 1 { - val = v - } - cc.values.Store(k, val) - } - } - if err := handler(cc); err != nil { - return nil, err - } else { - return &proto.CommandReturn{ - Status: int32(cc.statusCode), - ContentType: cc.contentType, - Payload: cc.responseBody, - }, nil - } - } -} - -func (v localCommandRpcServer) Check(ctx context.Context, request *health.HealthCheckRequest) (*health.HealthCheckResponse, error) { - return &health.HealthCheckResponse{ - Status: health.HealthCheckResponse_SERVING, - }, nil -} - -func (v localCommandRpcServer) Watch(request *health.HealthCheckRequest, server health.Health_WatchServer) error { - for { - if server.Send(&health.HealthCheckResponse{ - Status: health.HealthCheckResponse_SERVING, - }) != nil { - break - } - time.Sleep(1000 * time.Millisecond) - } - - return nil -} - -func (v *Conn) RunCommands(addr string) error { - v.commandServer = grpc.NewServer() - service := &localCommandRpcServer{conn: v} - proto.RegisterCommandProviderServer(v.commandServer, service) - health.RegisterHealthServer(v.commandServer, service) - reflection.Register(v.commandServer) - - listener, err := net.Listen("tcp", addr) - if err != nil { - return err - } - - return v.commandServer.Serve(listener) -} diff --git a/pkg/nex/command_context.go b/pkg/nex/command_context.go deleted file mode 100644 index 903f21a..0000000 --- a/pkg/nex/command_context.go +++ /dev/null @@ -1,90 +0,0 @@ -package nex - -import ( - "fmt" - "github.com/goccy/go-json" - "net/http" - "sync" -) - -type CommandCtx struct { - requestBody []byte - responseBody []byte - - contentType string - statusCode int - - values sync.Map -} - -func CtxValueMustBe[T any](c *CommandCtx, key string) (T, error) { - if val, ok := c.values.Load(key); ok { - if v, ok := val.(T); ok { - return v, nil - } - } - var out T - if err := c.Write([]byte(fmt.Sprintf("value %s not found in type %T", key, out)), "text/plain+error", http.StatusBadRequest); err != nil { - return out, err - } - return out, fmt.Errorf("value %s not found", key) -} - -func CtxValueShouldBe[T any](c *CommandCtx, key string, defaultValue T) T { - if val, ok := c.values.Load(key); ok { - if v, ok := val.(T); ok { - return v - } - } - return defaultValue -} - -func (c *CommandCtx) Values() map[string]any { - duplicate := make(map[string]any) - c.values.Range(func(key, value any) bool { - duplicate[key.(string)] = value - return true - }) - return duplicate -} - -func (c *CommandCtx) ValueOrElse(key string, defaultValue any) any { - val, _ := c.values.Load(key) - if val == nil { - return defaultValue - } - return val -} - -func (c *CommandCtx) Value(key string, newValue ...any) any { - if len(newValue) > 0 { - c.values.Store(key, newValue[0]) - } - val, _ := c.values.Load(key) - return val -} - -func (c *CommandCtx) Read() []byte { - return c.requestBody -} - -func (c *CommandCtx) ReadJSON(out any) error { - return json.Unmarshal(c.requestBody, out) -} - -func (c *CommandCtx) Write(data []byte, contentType string, statusCode ...int) error { - c.responseBody = data - c.contentType = contentType - if len(statusCode) > 0 { - c.statusCode = statusCode[0] - } - return nil -} - -func (c *CommandCtx) JSON(data any, statusCode ...int) error { - raw, err := json.Marshal(data) - if err != nil { - return err - } - return c.Write(raw, "application/json", statusCode...) -} diff --git a/pkg/nex/command_test.go b/pkg/nex/command_test.go deleted file mode 100644 index 1cbdad5..0000000 --- a/pkg/nex/command_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package nex_test - -import ( - "fmt" - "git.solsynth.dev/hypernet/nexus/pkg/nex" - "git.solsynth.dev/hypernet/nexus/pkg/proto" - "net/http" - "testing" - "time" -) - -func TestHandleCommand(t *testing.T) { - conn, err := nex.NewNexusConn("127.0.0.1:7001", &proto.ServiceInfo{ - Id: "echo01", - Type: "echo", - Label: "Echo", - GrpcAddr: "127.0.0.1:6001", - HttpAddr: nil, - }) - if err != nil { - t.Fatal(fmt.Errorf("unable to connect nexus: %v", err)) - } - - if err := conn.RegisterService(); err != nil { - t.Fatal(fmt.Errorf("unable to register service: %v", err)) - } - - err = conn.AddCommand("say.hi", "all", nil, func(ctx *nex.CommandCtx) error { - return ctx.Write([]byte("Hello, World!"), "text/plain", http.StatusOK) - }) - if err != nil { - t.Fatal(fmt.Errorf("unable to add command: %v", err)) - return - } - err = conn.AddCommand("echo", "all", nil, func(ctx *nex.CommandCtx) error { - t.Log("Received command: ", string(ctx.Read())) - return ctx.Write(ctx.Read(), "text/plain", http.StatusOK) - }) - err = conn.AddCommand("echo.details", "all", nil, func(ctx *nex.CommandCtx) error { - return ctx.JSON(map[string]any{ - "values": ctx.Values(), - "body": ctx.Read(), - }, http.StatusOK) - }) - if err != nil { - t.Fatal(fmt.Errorf("unable to add command: %v", err)) - return - } - - go func() { - err := conn.RunCommands("0.0.0.0:6001") - if err != nil { - t.Error(fmt.Errorf("unable to run commands: %v", err)) - return - } - }() - - t.Log("Waiting 60 seconds for calling command...") - time.Sleep(time.Second * 60) -} diff --git a/pkg/nex/conn.go b/pkg/nex/conn.go index bef92e5..a421e22 100644 --- a/pkg/nex/conn.go +++ b/pkg/nex/conn.go @@ -2,9 +2,10 @@ package nex import ( "context" - "google.golang.org/grpc/metadata" "time" + "google.golang.org/grpc/metadata" + "git.solsynth.dev/hypernet/nexus/pkg/proto" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -15,9 +16,6 @@ type Conn struct { Addr string Info *proto.ServiceInfo - commandServer *grpc.Server - commandHandlers map[string]CommandHandler - nexusConn *grpc.ClientConn clientConn map[string]*grpc.ClientConn } @@ -35,8 +33,6 @@ func NewNexusConn(addr string, info *proto.ServiceInfo) (*Conn, error) { Addr: addr, Info: info, - commandHandlers: make(map[string]CommandHandler), - nexusConn: conn, clientConn: make(map[string]*grpc.ClientConn), }, nil diff --git a/pkg/nex/cruda/command.go b/pkg/nex/cruda/command.go deleted file mode 100644 index e300973..0000000 --- a/pkg/nex/cruda/command.go +++ /dev/null @@ -1,130 +0,0 @@ -package cruda - -import ( - "errors" - "git.solsynth.dev/hypernet/nexus/pkg/nex" - "github.com/go-playground/validator/v10" - "gorm.io/gorm" - "net/http" -) - -type CrudAction func(v *CrudConn) nex.CommandHandler - -func AddModel[T any](v *CrudConn, model T, id, prefix string, tags []string) error { - funcList := []CrudAction{cmdList[T], cmdGet[T], cmdCreate[T], cmdUpdate[T], cmdDelete[T]} - funcCmds := []string{".list", "", "", "", ""} - funcMethods := []string{"get", "get", "put", "patch", "delete"} - for idx, fn := range funcList { - if err := v.n.AddCommand(prefix+id+funcCmds[idx], funcMethods[idx], tags, fn(v)); err != nil { - return err - } - } - return nil -} - -var validate = validator.New(validator.WithRequiredStructEnabled()) - -func cmdList[T any](c *CrudConn) nex.CommandHandler { - return func(ctx *nex.CommandCtx) error { - take := int(nex.CtxValueShouldBe[int64](ctx, "query.take", 10)) - skip := int(nex.CtxValueShouldBe[int64](ctx, "query.skip", 0)) - - var str T - var count int64 - if err := c.Db.Model(str).Count(&count).Error; err != nil { - return err - } - - var out []T - if err := c.Db.Offset(skip).Limit(take).Find(&out).Error; err != nil { - return err - } - - return ctx.JSON(map[string]any{ - "count": count, - "data": out, - }, http.StatusOK) - } -} - -func cmdGet[T any](c *CrudConn) nex.CommandHandler { - return func(ctx *nex.CommandCtx) error { - id, err := nex.CtxValueMustBe[int64](ctx, "query.id") - if err != nil { - return err - } - - var out T - if err := c.Db.First(&out, "id = ?", id).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return ctx.Write([]byte(err.Error()), "text/plain", http.StatusNotFound) - } - return err - } - - return ctx.JSON(out, http.StatusOK) - } -} - -func cmdCreate[T any](c *CrudConn) nex.CommandHandler { - return func(ctx *nex.CommandCtx) error { - var payload T - if err := ctx.ReadJSON(&payload); err != nil { - return err - } else if err := validate.Struct(payload); err != nil { - return ctx.Write([]byte(err.Error()), "text/plain+error", http.StatusBadRequest) - } - - if err := c.Db.Create(&payload).Error; err != nil { - return err - } - - return ctx.JSON(payload, http.StatusOK) - } -} - -func cmdUpdate[T any](c *CrudConn) nex.CommandHandler { - return func(ctx *nex.CommandCtx) error { - id, err := nex.CtxValueMustBe[int64](ctx, "query.id") - if err != nil { - return err - } - - var payload T - if err := ctx.ReadJSON(&payload); err != nil { - return err - } else if err := validate.Struct(payload); err != nil { - return ctx.Write([]byte(err.Error()), "text/plain+error", http.StatusBadRequest) - } - - var out T - if err := c.Db.Model(out).Where("id = ?", id).Updates(&payload).Error; err != nil { - return err - } - - if err := c.Db.First(&out, "id = ?", id).Error; err != nil { - return err - } - - return ctx.JSON(out, http.StatusOK) - } -} - -func cmdDelete[T any](c *CrudConn) nex.CommandHandler { - return func(ctx *nex.CommandCtx) error { - id, err := nex.CtxValueMustBe[int64](ctx, "query.id") - if err != nil { - return err - } - - var out T - if err := c.Db.Delete(&out, "id = ?", id).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return ctx.Write([]byte(err.Error()), "text/plain", http.StatusNotFound) - } - return err - } - - return ctx.Write(nil, "text/plain", http.StatusOK) - } -} diff --git a/pkg/nex/cruda/command_test.go b/pkg/nex/cruda/command_test.go deleted file mode 100644 index 74b7050..0000000 --- a/pkg/nex/cruda/command_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package cruda_test - -import ( - "fmt" - "git.solsynth.dev/hypernet/nexus/pkg/nex" - "git.solsynth.dev/hypernet/nexus/pkg/nex/cruda" - "git.solsynth.dev/hypernet/nexus/pkg/proto" - "testing" - "time" -) - -type Test struct { - cruda.BaseModel - Content string `json:"content" validate:"required"` -} - -func TestCrudaCommand(t *testing.T) { - conn, err := nex.NewNexusConn("127.0.0.1:7001", &proto.ServiceInfo{ - Id: "cruda01", - Type: "cruda", - Label: "CRUD Accelerator", - GrpcAddr: "127.0.0.1:6001", - HttpAddr: nil, - }) - if err != nil { - t.Fatal(fmt.Errorf("unable to connect nexus: %v", err)) - } - - if err := conn.RegisterService(); err != nil { - t.Fatal(fmt.Errorf("unable to register service: %v", err)) - } - - cc := cruda.NewCrudaConn(conn) - dsn, err := cc.AllocDatabase("test") - if err != nil { - t.Fatal(fmt.Errorf("unable to allocate database: %v", err)) - } - t.Log(fmt.Sprintf("Allocated database: %s", dsn)) - - if err := cruda.MigrateModel(cc, Test{}); err != nil { - t.Fatal(fmt.Errorf("unable to migrate database: %v", err)) - } - - if err := cruda.AddModel(cc, Test{}, "tm", "test.", nil); err != nil { - t.Fatal(fmt.Errorf("unable to add commands: %v", err)) - } - - go func() { - err := conn.RunCommands("0.0.0.0:6001") - if err != nil { - t.Error(fmt.Errorf("unable to run commands: %v", err)) - return - } - }() - - t.Log("Waiting 180 seconds for calling command...") - time.Sleep(time.Second * 180) -}