🗑️ Clean up command related stuff
🚚 Move http package to web
			
			
This commit is contained in:
		| @@ -1,22 +0,0 @@ | ||||
| package directory | ||||
|  | ||||
| const ( | ||||
| 	CommandMethodGet    = "get" | ||||
| 	CommandMethodPut    = "put" | ||||
| 	CommandMethodPatch  = "patch" | ||||
| 	CommandMethodPost   = "post" | ||||
| 	CommandMethodDelete = "delete" | ||||
| ) | ||||
|  | ||||
| type Command struct { | ||||
| 	// The unique identifier of the command, different method command can hold the same command id | ||||
| 	ID string `json:"id"` | ||||
| 	// The method of the command, such as get, post, others; inspired by RESTful design | ||||
| 	Method string `json:"method"` | ||||
| 	// The tags of the command will be used to invoke the pre-command middlewares and post-command middlewares | ||||
| 	Tags []string `json:"tags"` | ||||
| 	// The implementation of the command, the handler is the service that will be invoked | ||||
| 	Handler []*ServiceInstance `json:"handler"` | ||||
|  | ||||
| 	RobinIndex uint `json:"robin_index"` | ||||
| } | ||||
| @@ -1,79 +0,0 @@ | ||||
| package directory | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/kv" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/nex" | ||||
| 	"github.com/goccy/go-json" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| 	"github.com/samber/lo" | ||||
| ) | ||||
|  | ||||
| const CommandInfoKvPrefix = "nexus.command/" | ||||
|  | ||||
| func AddCommand(id, method string, tags []string, handler *ServiceInstance) error { | ||||
| 	if tags == nil { | ||||
| 		tags = make([]string, 0) | ||||
| 	} | ||||
|  | ||||
| 	ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method) | ||||
|  | ||||
| 	command := &Command{ | ||||
| 		ID:      id, | ||||
| 		Method:  method, | ||||
| 		Tags:    tags, | ||||
| 		Handler: []*ServiceInstance{handler}, | ||||
| 	} | ||||
|  | ||||
| 	command.Handler = lo.UniqBy(command.Handler, func(item *ServiceInstance) string { | ||||
| 		return item.ID | ||||
| 	}) | ||||
|  | ||||
| 	commandJSON, err := json.Marshal(command) | ||||
| 	if err != nil { | ||||
| 		log.Printf("Error marshaling command: %v", err) | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	_, err = kv.Kv.Put(context.Background(), ky, string(commandJSON)) | ||||
| 	return err | ||||
| } | ||||
|  | ||||
| func GetCommandHandler(id, method string) *ServiceInstance { | ||||
| 	ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method) | ||||
|  | ||||
| 	resp, err := kv.Kv.Get(context.Background(), ky) | ||||
| 	if err != nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	if len(resp.Kvs) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	var command Command | ||||
| 	if err := json.Unmarshal(resp.Kvs[0].Value, &command); err != nil { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	if len(command.Handler) == 0 { | ||||
| 		return nil | ||||
| 	} | ||||
|  | ||||
| 	idx := command.RobinIndex % uint(len(command.Handler)) | ||||
| 	command.RobinIndex = idx + 1 | ||||
|  | ||||
| 	raw, err := json.Marshal(&command) | ||||
| 	if err == nil { | ||||
| 		_, _ = kv.Kv.Put(context.Background(), ky, string(raw)) | ||||
| 	} | ||||
|  | ||||
| 	return command.Handler[idx] | ||||
| } | ||||
|  | ||||
| func RemoveCommand(id, method string) error { | ||||
| 	ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method) | ||||
|  | ||||
| 	_, err := kv.Kv.Delete(context.Background(), ky) | ||||
| 	return err | ||||
| } | ||||
| @@ -1,115 +0,0 @@ | ||||
| package directory | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/proto" | ||||
| 	"google.golang.org/grpc/codes" | ||||
| 	"google.golang.org/grpc/status" | ||||
| 	"io" | ||||
| 	"net/http" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| type CommandRpcServer struct { | ||||
| 	proto.UnimplementedCommandProviderServer | ||||
| } | ||||
|  | ||||
| func (c *CommandRpcServer) AddCommand(ctx context.Context, info *proto.CommandInfo) (*proto.AddCommandResponse, error) { | ||||
| 	clientId, err := GetClientId(ctx) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | ||||
| 	service := GetServiceInstance(clientId) | ||||
| 	if service == nil { | ||||
| 		return nil, status.Errorf(codes.NotFound, "service not found") | ||||
| 	} | ||||
|  | ||||
| 	AddCommand(info.GetId(), info.GetMethod(), info.GetTags(), service) | ||||
| 	return &proto.AddCommandResponse{ | ||||
| 		IsSuccess: true, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (c *CommandRpcServer) RemoveCommand(ctx context.Context, request *proto.CommandLookupRequest) (*proto.RemoveCommandResponse, error) { | ||||
| 	RemoveCommand(request.GetId(), request.GetMethod()) | ||||
| 	return &proto.RemoveCommandResponse{ | ||||
| 		IsSuccess: true, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (c *CommandRpcServer) SendCommand(ctx context.Context, argument *proto.CommandArgument) (*proto.CommandReturn, error) { | ||||
| 	id := argument.GetCommand() | ||||
| 	method := argument.GetMethod() | ||||
|  | ||||
| 	handler := GetCommandHandler(id, method) | ||||
| 	if handler == nil { | ||||
| 		return &proto.CommandReturn{ | ||||
| 			IsDelivered: false, | ||||
| 			Status:      http.StatusNotFound, | ||||
| 			ContentType: "text/plain+error", | ||||
| 			Payload:     []byte("command not found"), | ||||
| 		}, nil | ||||
| 	} | ||||
|  | ||||
| 	conn, err := handler.GetGrpcConn() | ||||
| 	if err != nil { | ||||
| 		return &proto.CommandReturn{ | ||||
| 			IsDelivered: false, | ||||
| 			Status:      http.StatusServiceUnavailable, | ||||
| 			ContentType: "text/plain+error", | ||||
| 			Payload:     []byte("service unavailable"), | ||||
| 		}, nil | ||||
| 	} | ||||
|  | ||||
| 	ctx, cancel := context.WithTimeout(ctx, time.Second*10) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	out, err := proto.NewCommandProviderClient(conn).SendCommand(ctx, argument) | ||||
| 	if err != nil { | ||||
| 		return &proto.CommandReturn{ | ||||
| 			IsDelivered: true, | ||||
| 			Status:      http.StatusInternalServerError, | ||||
| 			ContentType: "text/plain+error", | ||||
| 			Payload:     []byte(err.Error()), | ||||
| 		}, nil | ||||
| 	} | ||||
| 	out.IsDelivered = true | ||||
| 	return out, nil | ||||
| } | ||||
|  | ||||
| func (c *CommandRpcServer) SendStreamCommand(g proto.CommandProvider_SendStreamCommandServer) error { | ||||
| 	for { | ||||
| 		pck, err := g.Recv() | ||||
| 		if err == io.EOF { | ||||
| 			return nil | ||||
| 		} else if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		id := pck.GetCommand() | ||||
| 		method := pck.GetMethod() | ||||
|  | ||||
| 		handler := GetCommandHandler(id, method) | ||||
| 		if handler == nil { | ||||
| 			return status.Errorf(codes.NotFound, "command not found") | ||||
| 		} | ||||
|  | ||||
| 		conn, err := handler.GetGrpcConn() | ||||
|  | ||||
| 		ctx, cancel := context.WithTimeout(g.Context(), time.Second*10) | ||||
| 		out, err := proto.NewCommandProviderClient(conn).SendCommand(ctx, pck) | ||||
| 		cancel() | ||||
|  | ||||
| 		if err != nil { | ||||
| 			_ = g.Send(&proto.CommandReturn{ | ||||
| 				IsDelivered: false, | ||||
| 				Status:      http.StatusInternalServerError, | ||||
| 				ContentType: "text/plain+error", | ||||
| 				Payload:     []byte(err.Error()), | ||||
| 			}) | ||||
| 		} else { | ||||
| 			_ = g.Send(out) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @@ -1,9 +1,10 @@ | ||||
| package grpc | ||||
|  | ||||
| import ( | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/directory" | ||||
| 	"net" | ||||
|  | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/directory" | ||||
|  | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/proto" | ||||
|  | ||||
| 	"google.golang.org/grpc/reflection" | ||||
| @@ -29,7 +30,6 @@ func NewServer() *Server { | ||||
| 	} | ||||
|  | ||||
| 	proto.RegisterDirectoryServiceServer(server.srv, &directory.ServiceRpcServer{}) | ||||
| 	proto.RegisterCommandProviderServer(server.srv, &directory.CommandRpcServer{}) | ||||
| 	proto.RegisterDatabaseServiceServer(server.srv, server) | ||||
| 	proto.RegisterStreamServiceServer(server.srv, server) | ||||
| 	proto.RegisterAllocatorServiceServer(server.srv, server) | ||||
|   | ||||
| @@ -4,7 +4,7 @@ import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
|  | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/http/ws" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/web/ws" | ||||
| 	"github.com/rs/zerolog/log" | ||||
|  | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/proto" | ||||
|   | ||||
| @@ -1,67 +0,0 @@ | ||||
| package api | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/directory" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/proto" | ||||
| 	"github.com/gofiber/fiber/v2" | ||||
| 	"github.com/rs/zerolog/log" | ||||
| 	"google.golang.org/grpc/metadata" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| func invokeCommand(c *fiber.Ctx) error { | ||||
| 	command := c.Params("command") | ||||
| 	method := strings.ToLower(c.Method()) | ||||
|  | ||||
| 	handler := directory.GetCommandHandler(command, method) | ||||
| 	if handler == nil { | ||||
| 		return fiber.NewError(fiber.StatusNotFound, "command not found") | ||||
| 	} | ||||
|  | ||||
| 	conn, err := handler.GetGrpcConn() | ||||
| 	if err != nil { | ||||
| 		return fiber.NewError(fiber.StatusServiceUnavailable, "service unavailable") | ||||
| 	} | ||||
|  | ||||
| 	log.Debug().Str("id", command).Str("method", method).Msg("Invoking command from HTTP Gateway...") | ||||
|  | ||||
| 	var meta []string | ||||
| 	meta = append(meta, "client_id", "http-gateway") | ||||
| 	meta = append(meta, "net.ip", c.IP()) | ||||
| 	meta = append(meta, "http.user_agent", c.Get(fiber.HeaderUserAgent)) | ||||
| 	for k, v := range c.GetReqHeaders() { | ||||
| 		meta = append( | ||||
| 			meta, | ||||
| 			strings.ToLower(fmt.Sprintf("header.%s", strings.ReplaceAll(k, "-", "_"))), | ||||
| 			strings.Join(v, "\n"), | ||||
| 		) | ||||
| 	} | ||||
|  | ||||
| 	for k, v := range c.Queries() { | ||||
| 		meta = append( | ||||
| 			meta, | ||||
| 			strings.ToLower(fmt.Sprintf("query.%s", strings.ReplaceAll(k, "-", "_"))), | ||||
| 			v, | ||||
| 		) | ||||
| 	} | ||||
|  | ||||
| 	ctx := metadata.AppendToOutgoingContext(c.Context(), meta...) | ||||
| 	ctx, cancel := context.WithTimeout(ctx, time.Second*10) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	out, err := proto.NewCommandProviderClient(conn).SendCommand(ctx, &proto.CommandArgument{ | ||||
| 		Command: command, | ||||
| 		Method:  method, | ||||
| 		Payload: c.Body(), | ||||
| 	}) | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return fiber.NewError(fiber.StatusInternalServerError, err.Error()) | ||||
| 	} else { | ||||
| 		c.Set(fiber.HeaderContentType, out.ContentType) | ||||
| 		return c.Status(int(out.Status)).Send(out.Payload) | ||||
| 	} | ||||
| } | ||||
| @@ -4,7 +4,7 @@ import ( | ||||
| 	pkg "git.solsynth.dev/hypernet/nexus/pkg/internal" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/auth" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/directory" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/http/ws" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/web/ws" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/nex" | ||||
| 	"github.com/gofiber/contrib/websocket" | ||||
| 	"github.com/gofiber/fiber/v2" | ||||
| @@ -12,6 +12,8 @@ import ( | ||||
| ) | ||||
| 
 | ||||
| func MapAPIs(app *fiber.App) { | ||||
| 	app.Get("/check-ip", getClientIP) | ||||
| 
 | ||||
| 	// Some built-in public-accessible APIs | ||||
| 	wellKnown := app.Group("/.well-known").Name("Well Known") | ||||
| 	{ | ||||
| @@ -22,7 +24,6 @@ func MapAPIs(app *fiber.App) { | ||||
| 				"status":    true, | ||||
| 			}) | ||||
| 		}) | ||||
| 		wellKnown.Get("/check-ip", getClientIP) | ||||
| 		wellKnown.Get("/directory/services", listExistsService) | ||||
| 
 | ||||
| 		wellKnown.Get("/openid-configuration", func(c *fiber.Ctx) error { | ||||
| @@ -50,6 +51,5 @@ func MapAPIs(app *fiber.App) { | ||||
| 	// Common websocket gateway | ||||
| 	app.Get("/ws", auth.ValidatorMiddleware, websocket.New(ws.Listen)) | ||||
| 
 | ||||
| 	app.All("/inv/:command", invokeCommand) | ||||
| 	app.All("/cgi/:service/*", forwardService) | ||||
| } | ||||
| @@ -1,10 +1,10 @@ | ||||
| package server | ||||
| package web | ||||
| 
 | ||||
| import ( | ||||
| 	"time" | ||||
| 
 | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/auth" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/http/api" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/web/api" | ||||
| 	"github.com/goccy/go-json" | ||||
| 	"github.com/gofiber/fiber/v2" | ||||
| 	"github.com/gofiber/fiber/v2/middleware/cors" | ||||
| @@ -15,11 +15,11 @@ import ( | ||||
| 	"github.com/spf13/viper" | ||||
| ) | ||||
| 
 | ||||
| type HTTPApp struct { | ||||
| type WebApp struct { | ||||
| 	app *fiber.App | ||||
| } | ||||
| 
 | ||||
| func NewServer() *HTTPApp { | ||||
| func NewServer() *WebApp { | ||||
| 	app := fiber.New(fiber.Config{ | ||||
| 		DisableStartupMessage: true, | ||||
| 		EnableIPValidation:    true, | ||||
| @@ -55,10 +55,10 @@ func NewServer() *HTTPApp { | ||||
| 
 | ||||
| 	api.MapAPIs(app) | ||||
| 
 | ||||
| 	return &HTTPApp{app} | ||||
| 	return &WebApp{app} | ||||
| } | ||||
| 
 | ||||
| func (v *HTTPApp) Listen() { | ||||
| func (v *WebApp) Listen() { | ||||
| 	if err := v.app.Listen(viper.GetString("bind")); err != nil { | ||||
| 		log.Fatal().Err(err).Msg("An error occurred when starting server...") | ||||
| 	} | ||||
| @@ -9,10 +9,10 @@ import ( | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/auth" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/database" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/directory" | ||||
| 	server "git.solsynth.dev/hypernet/nexus/pkg/internal/http" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/kv" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/mq" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/watchtower" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/internal/web" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/nex/sec" | ||||
| 	"github.com/fatih/color" | ||||
|  | ||||
| @@ -109,7 +109,7 @@ func main() { | ||||
| 	go directory.ValidateServices() | ||||
|  | ||||
| 	// Server | ||||
| 	go server.NewServer().Listen() | ||||
| 	go web.NewServer().Listen() | ||||
|  | ||||
| 	// Grpc Server | ||||
| 	go grpc.NewServer().Listen() | ||||
|   | ||||
| @@ -1,150 +0,0 @@ | ||||
| package nex | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/proto" | ||||
| 	"google.golang.org/grpc" | ||||
| 	health "google.golang.org/grpc/health/grpc_health_v1" | ||||
| 	"google.golang.org/grpc/metadata" | ||||
| 	"google.golang.org/grpc/reflection" | ||||
| 	"net" | ||||
| 	"net/http" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| type CommandHandler func(ctx *CommandCtx) error | ||||
|  | ||||
| func GetCommandKey(id, method string) string { | ||||
| 	return id + ":" + method | ||||
| } | ||||
|  | ||||
| func (v *Conn) AddCommand(id, method string, tags []string, fn CommandHandler) error { | ||||
| 	method = strings.ToLower(method) | ||||
| 	dir := proto.NewCommandProviderClient(v.nexusConn) | ||||
| 	ctx := context.Background() | ||||
| 	ctx = metadata.AppendToOutgoingContext(ctx, "client_id", v.Info.Id) | ||||
|  | ||||
| 	var addingMethodQueue []string | ||||
| 	if method == "all" { | ||||
| 		addingMethodQueue = []string{"get", "post", "put", "patch", "delete"} | ||||
| 	} else { | ||||
| 		addingMethodQueue = append(addingMethodQueue, method) | ||||
| 	} | ||||
|  | ||||
| 	for _, method := range addingMethodQueue { | ||||
| 		ky := GetCommandKey(id, method) | ||||
| 		_, err := dir.AddCommand(ctx, &proto.CommandInfo{ | ||||
| 			Id:     id, | ||||
| 			Method: method, | ||||
| 			Tags:   tags, | ||||
| 		}) | ||||
| 		if err == nil { | ||||
| 			v.commandHandlers[ky] = fn | ||||
| 		} else { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| type localCommandRpcServer struct { | ||||
| 	conn *Conn | ||||
|  | ||||
| 	proto.UnimplementedCommandProviderServer | ||||
| 	health.UnimplementedHealthServer | ||||
| } | ||||
|  | ||||
| func (v localCommandRpcServer) SendCommand(ctx context.Context, argument *proto.CommandArgument) (*proto.CommandReturn, error) { | ||||
| 	ky := GetCommandKey(argument.GetCommand(), argument.GetMethod()) | ||||
| 	if handler, ok := v.conn.commandHandlers[ky]; !ok { | ||||
| 		return &proto.CommandReturn{ | ||||
| 			Status:  http.StatusNotFound, | ||||
| 			Payload: []byte(argument.GetCommand() + " not found"), | ||||
| 		}, nil | ||||
| 	} else { | ||||
| 		cc := &CommandCtx{ | ||||
| 			requestBody: argument.GetPayload(), | ||||
| 			statusCode:  http.StatusOK, | ||||
| 		} | ||||
| 		if md, ok := metadata.FromIncomingContext(ctx); ok { | ||||
| 			for k, v := range md { | ||||
| 				var val any = nil | ||||
| 				if len(v) == 1 { | ||||
| 					if len(v[0]) != 0 { | ||||
| 						if i, err := strconv.ParseInt(v[0], 10, 64); err == nil { | ||||
| 							val = i | ||||
| 						} else if b, err := strconv.ParseBool(v[0]); err == nil { | ||||
| 							val = b | ||||
| 						} else if f, err := strconv.ParseFloat(v[0], 64); err == nil { | ||||
| 							val = f | ||||
| 						} | ||||
| 						layouts := []string{ | ||||
| 							time.RFC3339, | ||||
| 							"2006-01-02 15:04:05", // Example: 2024-10-20 14:55:05 | ||||
| 							"2006-01-02",          // Example: 2024-10-20 | ||||
| 						} | ||||
| 						for _, layout := range layouts { | ||||
| 							if t, err := time.Parse(layout, v[0]); err == nil { | ||||
| 								val = t | ||||
| 							} | ||||
| 						} | ||||
| 						if val == nil { | ||||
| 							val = v[0] | ||||
| 						} | ||||
| 					} else { | ||||
| 						val = v[0] | ||||
| 					} | ||||
| 				} else if len(v) > 1 { | ||||
| 					val = v | ||||
| 				} | ||||
| 				cc.values.Store(k, val) | ||||
| 			} | ||||
| 		} | ||||
| 		if err := handler(cc); err != nil { | ||||
| 			return nil, err | ||||
| 		} else { | ||||
| 			return &proto.CommandReturn{ | ||||
| 				Status:      int32(cc.statusCode), | ||||
| 				ContentType: cc.contentType, | ||||
| 				Payload:     cc.responseBody, | ||||
| 			}, nil | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (v localCommandRpcServer) Check(ctx context.Context, request *health.HealthCheckRequest) (*health.HealthCheckResponse, error) { | ||||
| 	return &health.HealthCheckResponse{ | ||||
| 		Status: health.HealthCheckResponse_SERVING, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| func (v localCommandRpcServer) Watch(request *health.HealthCheckRequest, server health.Health_WatchServer) error { | ||||
| 	for { | ||||
| 		if server.Send(&health.HealthCheckResponse{ | ||||
| 			Status: health.HealthCheckResponse_SERVING, | ||||
| 		}) != nil { | ||||
| 			break | ||||
| 		} | ||||
| 		time.Sleep(1000 * time.Millisecond) | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (v *Conn) RunCommands(addr string) error { | ||||
| 	v.commandServer = grpc.NewServer() | ||||
| 	service := &localCommandRpcServer{conn: v} | ||||
| 	proto.RegisterCommandProviderServer(v.commandServer, service) | ||||
| 	health.RegisterHealthServer(v.commandServer, service) | ||||
| 	reflection.Register(v.commandServer) | ||||
|  | ||||
| 	listener, err := net.Listen("tcp", addr) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	return v.commandServer.Serve(listener) | ||||
| } | ||||
| @@ -1,90 +0,0 @@ | ||||
| package nex | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"github.com/goccy/go-json" | ||||
| 	"net/http" | ||||
| 	"sync" | ||||
| ) | ||||
|  | ||||
| type CommandCtx struct { | ||||
| 	requestBody  []byte | ||||
| 	responseBody []byte | ||||
|  | ||||
| 	contentType string | ||||
| 	statusCode  int | ||||
|  | ||||
| 	values sync.Map | ||||
| } | ||||
|  | ||||
| func CtxValueMustBe[T any](c *CommandCtx, key string) (T, error) { | ||||
| 	if val, ok := c.values.Load(key); ok { | ||||
| 		if v, ok := val.(T); ok { | ||||
| 			return v, nil | ||||
| 		} | ||||
| 	} | ||||
| 	var out T | ||||
| 	if err := c.Write([]byte(fmt.Sprintf("value %s not found in type %T", key, out)), "text/plain+error", http.StatusBadRequest); err != nil { | ||||
| 		return out, err | ||||
| 	} | ||||
| 	return out, fmt.Errorf("value %s not found", key) | ||||
| } | ||||
|  | ||||
| func CtxValueShouldBe[T any](c *CommandCtx, key string, defaultValue T) T { | ||||
| 	if val, ok := c.values.Load(key); ok { | ||||
| 		if v, ok := val.(T); ok { | ||||
| 			return v | ||||
| 		} | ||||
| 	} | ||||
| 	return defaultValue | ||||
| } | ||||
|  | ||||
| func (c *CommandCtx) Values() map[string]any { | ||||
| 	duplicate := make(map[string]any) | ||||
| 	c.values.Range(func(key, value any) bool { | ||||
| 		duplicate[key.(string)] = value | ||||
| 		return true | ||||
| 	}) | ||||
| 	return duplicate | ||||
| } | ||||
|  | ||||
| func (c *CommandCtx) ValueOrElse(key string, defaultValue any) any { | ||||
| 	val, _ := c.values.Load(key) | ||||
| 	if val == nil { | ||||
| 		return defaultValue | ||||
| 	} | ||||
| 	return val | ||||
| } | ||||
|  | ||||
| func (c *CommandCtx) Value(key string, newValue ...any) any { | ||||
| 	if len(newValue) > 0 { | ||||
| 		c.values.Store(key, newValue[0]) | ||||
| 	} | ||||
| 	val, _ := c.values.Load(key) | ||||
| 	return val | ||||
| } | ||||
|  | ||||
| func (c *CommandCtx) Read() []byte { | ||||
| 	return c.requestBody | ||||
| } | ||||
|  | ||||
| func (c *CommandCtx) ReadJSON(out any) error { | ||||
| 	return json.Unmarshal(c.requestBody, out) | ||||
| } | ||||
|  | ||||
| func (c *CommandCtx) Write(data []byte, contentType string, statusCode ...int) error { | ||||
| 	c.responseBody = data | ||||
| 	c.contentType = contentType | ||||
| 	if len(statusCode) > 0 { | ||||
| 		c.statusCode = statusCode[0] | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (c *CommandCtx) JSON(data any, statusCode ...int) error { | ||||
| 	raw, err := json.Marshal(data) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return c.Write(raw, "application/json", statusCode...) | ||||
| } | ||||
| @@ -1,60 +0,0 @@ | ||||
| package nex_test | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/nex" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/proto" | ||||
| 	"net/http" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| func TestHandleCommand(t *testing.T) { | ||||
| 	conn, err := nex.NewNexusConn("127.0.0.1:7001", &proto.ServiceInfo{ | ||||
| 		Id:       "echo01", | ||||
| 		Type:     "echo", | ||||
| 		Label:    "Echo", | ||||
| 		GrpcAddr: "127.0.0.1:6001", | ||||
| 		HttpAddr: nil, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(fmt.Errorf("unable to connect nexus: %v", err)) | ||||
| 	} | ||||
|  | ||||
| 	if err := conn.RegisterService(); err != nil { | ||||
| 		t.Fatal(fmt.Errorf("unable to register service: %v", err)) | ||||
| 	} | ||||
|  | ||||
| 	err = conn.AddCommand("say.hi", "all", nil, func(ctx *nex.CommandCtx) error { | ||||
| 		return ctx.Write([]byte("Hello, World!"), "text/plain", http.StatusOK) | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(fmt.Errorf("unable to add command: %v", err)) | ||||
| 		return | ||||
| 	} | ||||
| 	err = conn.AddCommand("echo", "all", nil, func(ctx *nex.CommandCtx) error { | ||||
| 		t.Log("Received command: ", string(ctx.Read())) | ||||
| 		return ctx.Write(ctx.Read(), "text/plain", http.StatusOK) | ||||
| 	}) | ||||
| 	err = conn.AddCommand("echo.details", "all", nil, func(ctx *nex.CommandCtx) error { | ||||
| 		return ctx.JSON(map[string]any{ | ||||
| 			"values": ctx.Values(), | ||||
| 			"body":   ctx.Read(), | ||||
| 		}, http.StatusOK) | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(fmt.Errorf("unable to add command: %v", err)) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	go func() { | ||||
| 		err := conn.RunCommands("0.0.0.0:6001") | ||||
| 		if err != nil { | ||||
| 			t.Error(fmt.Errorf("unable to run commands: %v", err)) | ||||
| 			return | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	t.Log("Waiting 60 seconds for calling command...") | ||||
| 	time.Sleep(time.Second * 60) | ||||
| } | ||||
| @@ -2,9 +2,10 @@ package nex | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"google.golang.org/grpc/metadata" | ||||
| 	"time" | ||||
|  | ||||
| 	"google.golang.org/grpc/metadata" | ||||
|  | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/proto" | ||||
| 	"google.golang.org/grpc" | ||||
| 	"google.golang.org/grpc/credentials/insecure" | ||||
| @@ -15,9 +16,6 @@ type Conn struct { | ||||
| 	Addr string | ||||
| 	Info *proto.ServiceInfo | ||||
|  | ||||
| 	commandServer   *grpc.Server | ||||
| 	commandHandlers map[string]CommandHandler | ||||
|  | ||||
| 	nexusConn  *grpc.ClientConn | ||||
| 	clientConn map[string]*grpc.ClientConn | ||||
| } | ||||
| @@ -35,8 +33,6 @@ func NewNexusConn(addr string, info *proto.ServiceInfo) (*Conn, error) { | ||||
| 		Addr: addr, | ||||
| 		Info: info, | ||||
|  | ||||
| 		commandHandlers: make(map[string]CommandHandler), | ||||
|  | ||||
| 		nexusConn:  conn, | ||||
| 		clientConn: make(map[string]*grpc.ClientConn), | ||||
| 	}, nil | ||||
|   | ||||
| @@ -1,130 +0,0 @@ | ||||
| package cruda | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/nex" | ||||
| 	"github.com/go-playground/validator/v10" | ||||
| 	"gorm.io/gorm" | ||||
| 	"net/http" | ||||
| ) | ||||
|  | ||||
| type CrudAction func(v *CrudConn) nex.CommandHandler | ||||
|  | ||||
| func AddModel[T any](v *CrudConn, model T, id, prefix string, tags []string) error { | ||||
| 	funcList := []CrudAction{cmdList[T], cmdGet[T], cmdCreate[T], cmdUpdate[T], cmdDelete[T]} | ||||
| 	funcCmds := []string{".list", "", "", "", ""} | ||||
| 	funcMethods := []string{"get", "get", "put", "patch", "delete"} | ||||
| 	for idx, fn := range funcList { | ||||
| 		if err := v.n.AddCommand(prefix+id+funcCmds[idx], funcMethods[idx], tags, fn(v)); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| var validate = validator.New(validator.WithRequiredStructEnabled()) | ||||
|  | ||||
| func cmdList[T any](c *CrudConn) nex.CommandHandler { | ||||
| 	return func(ctx *nex.CommandCtx) error { | ||||
| 		take := int(nex.CtxValueShouldBe[int64](ctx, "query.take", 10)) | ||||
| 		skip := int(nex.CtxValueShouldBe[int64](ctx, "query.skip", 0)) | ||||
|  | ||||
| 		var str T | ||||
| 		var count int64 | ||||
| 		if err := c.Db.Model(str).Count(&count).Error; err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		var out []T | ||||
| 		if err := c.Db.Offset(skip).Limit(take).Find(&out).Error; err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		return ctx.JSON(map[string]any{ | ||||
| 			"count": count, | ||||
| 			"data":  out, | ||||
| 		}, http.StatusOK) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func cmdGet[T any](c *CrudConn) nex.CommandHandler { | ||||
| 	return func(ctx *nex.CommandCtx) error { | ||||
| 		id, err := nex.CtxValueMustBe[int64](ctx, "query.id") | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		var out T | ||||
| 		if err := c.Db.First(&out, "id = ?", id).Error; err != nil { | ||||
| 			if errors.Is(err, gorm.ErrRecordNotFound) { | ||||
| 				return ctx.Write([]byte(err.Error()), "text/plain", http.StatusNotFound) | ||||
| 			} | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		return ctx.JSON(out, http.StatusOK) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func cmdCreate[T any](c *CrudConn) nex.CommandHandler { | ||||
| 	return func(ctx *nex.CommandCtx) error { | ||||
| 		var payload T | ||||
| 		if err := ctx.ReadJSON(&payload); err != nil { | ||||
| 			return err | ||||
| 		} else if err := validate.Struct(payload); err != nil { | ||||
| 			return ctx.Write([]byte(err.Error()), "text/plain+error", http.StatusBadRequest) | ||||
| 		} | ||||
|  | ||||
| 		if err := c.Db.Create(&payload).Error; err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		return ctx.JSON(payload, http.StatusOK) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func cmdUpdate[T any](c *CrudConn) nex.CommandHandler { | ||||
| 	return func(ctx *nex.CommandCtx) error { | ||||
| 		id, err := nex.CtxValueMustBe[int64](ctx, "query.id") | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		var payload T | ||||
| 		if err := ctx.ReadJSON(&payload); err != nil { | ||||
| 			return err | ||||
| 		} else if err := validate.Struct(payload); err != nil { | ||||
| 			return ctx.Write([]byte(err.Error()), "text/plain+error", http.StatusBadRequest) | ||||
| 		} | ||||
|  | ||||
| 		var out T | ||||
| 		if err := c.Db.Model(out).Where("id = ?", id).Updates(&payload).Error; err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		if err := c.Db.First(&out, "id = ?", id).Error; err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		return ctx.JSON(out, http.StatusOK) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func cmdDelete[T any](c *CrudConn) nex.CommandHandler { | ||||
| 	return func(ctx *nex.CommandCtx) error { | ||||
| 		id, err := nex.CtxValueMustBe[int64](ctx, "query.id") | ||||
| 		if err != nil { | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		var out T | ||||
| 		if err := c.Db.Delete(&out, "id = ?", id).Error; err != nil { | ||||
| 			if errors.Is(err, gorm.ErrRecordNotFound) { | ||||
| 				return ctx.Write([]byte(err.Error()), "text/plain", http.StatusNotFound) | ||||
| 			} | ||||
| 			return err | ||||
| 		} | ||||
|  | ||||
| 		return ctx.Write(nil, "text/plain", http.StatusOK) | ||||
| 	} | ||||
| } | ||||
| @@ -1,58 +0,0 @@ | ||||
| package cruda_test | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/nex" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/nex/cruda" | ||||
| 	"git.solsynth.dev/hypernet/nexus/pkg/proto" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| type Test struct { | ||||
| 	cruda.BaseModel | ||||
| 	Content string `json:"content" validate:"required"` | ||||
| } | ||||
|  | ||||
| func TestCrudaCommand(t *testing.T) { | ||||
| 	conn, err := nex.NewNexusConn("127.0.0.1:7001", &proto.ServiceInfo{ | ||||
| 		Id:       "cruda01", | ||||
| 		Type:     "cruda", | ||||
| 		Label:    "CRUD Accelerator", | ||||
| 		GrpcAddr: "127.0.0.1:6001", | ||||
| 		HttpAddr: nil, | ||||
| 	}) | ||||
| 	if err != nil { | ||||
| 		t.Fatal(fmt.Errorf("unable to connect nexus: %v", err)) | ||||
| 	} | ||||
|  | ||||
| 	if err := conn.RegisterService(); err != nil { | ||||
| 		t.Fatal(fmt.Errorf("unable to register service: %v", err)) | ||||
| 	} | ||||
|  | ||||
| 	cc := cruda.NewCrudaConn(conn) | ||||
| 	dsn, err := cc.AllocDatabase("test") | ||||
| 	if err != nil { | ||||
| 		t.Fatal(fmt.Errorf("unable to allocate database: %v", err)) | ||||
| 	} | ||||
| 	t.Log(fmt.Sprintf("Allocated database: %s", dsn)) | ||||
|  | ||||
| 	if err := cruda.MigrateModel(cc, Test{}); err != nil { | ||||
| 		t.Fatal(fmt.Errorf("unable to migrate database: %v", err)) | ||||
| 	} | ||||
|  | ||||
| 	if err := cruda.AddModel(cc, Test{}, "tm", "test.", nil); err != nil { | ||||
| 		t.Fatal(fmt.Errorf("unable to add commands: %v", err)) | ||||
| 	} | ||||
|  | ||||
| 	go func() { | ||||
| 		err := conn.RunCommands("0.0.0.0:6001") | ||||
| 		if err != nil { | ||||
| 			t.Error(fmt.Errorf("unable to run commands: %v", err)) | ||||
| 			return | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	t.Log("Waiting 180 seconds for calling command...") | ||||
| 	time.Sleep(time.Second * 180) | ||||
| } | ||||
		Reference in New Issue
	
	Block a user