diff --git a/pkg/directory/command_rpc.go b/pkg/directory/command_rpc.go index 9e974fd..1d2bb28 100644 --- a/pkg/directory/command_rpc.go +++ b/pkg/directory/command_rpc.go @@ -48,6 +48,7 @@ func (c CommandRpcServer) SendCommand(ctx context.Context, argument *proto.Comma return &proto.CommandReturn{ IsDelivered: false, Status: http.StatusNotFound, + ContentType: "text/plain+error", Payload: []byte("command not found"), }, nil } @@ -57,6 +58,7 @@ func (c CommandRpcServer) SendCommand(ctx context.Context, argument *proto.Comma return &proto.CommandReturn{ IsDelivered: false, Status: http.StatusServiceUnavailable, + ContentType: "text/plain+error", Payload: []byte("service unavailable"), }, nil } @@ -69,6 +71,7 @@ func (c CommandRpcServer) SendCommand(ctx context.Context, argument *proto.Comma return &proto.CommandReturn{ IsDelivered: true, Status: http.StatusInternalServerError, + ContentType: "text/plain+error", Payload: []byte(err.Error()), }, nil } @@ -96,20 +99,18 @@ func (c CommandRpcServer) SendStreamCommand(g grpc.BidiStreamingServer[proto.Com conn, err := handler.GetGrpcConn() ctx, cancel := context.WithTimeout(g.Context(), time.Second*10) - result, err := proto.NewCommandControllerClient(conn).SendCommand(ctx, pck) + out, err := proto.NewCommandControllerClient(conn).SendCommand(ctx, pck) cancel() if err != nil { _ = g.Send(&proto.CommandReturn{ IsDelivered: false, Status: http.StatusInternalServerError, + ContentType: "text/plain+error", Payload: []byte(err.Error()), }) } else { - _ = g.Send(&proto.CommandReturn{ - Status: result.Status, - Payload: result.Payload, - }) + _ = g.Send(out) } } } diff --git a/pkg/http/api/command.go b/pkg/http/api/command.go index f3c53ff..f23088f 100644 --- a/pkg/http/api/command.go +++ b/pkg/http/api/command.go @@ -2,6 +2,7 @@ package api import ( "context" + "fmt" "git.solsynth.dev/hypernet/nexus/pkg/directory" "git.solsynth.dev/hypernet/nexus/pkg/proto" "github.com/gofiber/fiber/v2" @@ -27,7 +28,18 @@ func invokeCommand(c *fiber.Ctx) error { log.Debug().Str("id", command).Str("method", method).Msg("Invoking command from HTTP Gateway...") - ctx := metadata.AppendToOutgoingContext(c.Context(), "client_id", "http-gateway", "ip", c.IP(), "user_agent", c.Get(fiber.HeaderUserAgent)) + var meta []string + meta = append(meta, "client_id", "http-gateway") + meta = append(meta, "net.ip", c.IP()) + meta = append(meta, "http.user_agent", c.Get(fiber.HeaderUserAgent)) + for k, v := range c.GetReqHeaders() { + meta = append( + meta, + strings.ToLower(fmt.Sprintf("header.%s", strings.ReplaceAll(k, "-", "_"))), + strings.Join(v, "\n"), + ) + } + ctx := metadata.AppendToOutgoingContext(c.Context(), meta...) ctx, cancel := context.WithTimeout(ctx, time.Second*10) defer cancel() @@ -43,6 +55,7 @@ func invokeCommand(c *fiber.Ctx) error { if !out.IsDelivered { log.Debug().Str("id", command).Str("method", method).Msg("Invoking command from HTTP Gateway... failed, delivery not confirmed") } + c.Set(fiber.HeaderContentType, out.ContentType) return c.Status(int(out.Status)).Send(out.Payload) } } diff --git a/pkg/nex/command.go b/pkg/nex/command.go index a7d28b5..a87969c 100644 --- a/pkg/nex/command.go +++ b/pkg/nex/command.go @@ -77,8 +77,9 @@ func (v localCommandRpcServer) SendCommand(ctx context.Context, argument *proto. return nil, err } else { return &proto.CommandReturn{ - Status: int32(cc.statusCode), - Payload: cc.responseBody, + Status: int32(cc.statusCode), + ContentType: cc.contentType, + Payload: cc.responseBody, }, nil } } diff --git a/pkg/nex/command_context.go b/pkg/nex/command_context.go index bb1a344..8f4803c 100644 --- a/pkg/nex/command_context.go +++ b/pkg/nex/command_context.go @@ -9,11 +9,29 @@ type CommandCtx struct { requestBody []byte responseBody []byte - statusCode int + contentType string + statusCode int values sync.Map } +func (c *CommandCtx) Values() map[string]any { + duplicate := make(map[string]any) + c.values.Range(func(key, value any) bool { + duplicate[key.(string)] = value + return true + }) + return duplicate +} + +func (c *CommandCtx) ValueOrElse(key string, defaultValue any) any { + val, _ := c.values.Load(key) + if val == nil { + return defaultValue + } + return val +} + func (c *CommandCtx) Value(key string, newValue ...any) any { if len(newValue) > 0 { c.values.Store(key, newValue[0]) @@ -30,8 +48,9 @@ func (c *CommandCtx) ReadJSON(out any) error { return json.Unmarshal(c.requestBody, out) } -func (c *CommandCtx) Write(data []byte, statusCode ...int) error { +func (c *CommandCtx) Write(data []byte, contentType string, statusCode ...int) error { c.responseBody = data + c.contentType = contentType if len(statusCode) > 0 { c.statusCode = statusCode[0] } @@ -43,5 +62,5 @@ func (c *CommandCtx) JSON(data any, statusCode ...int) error { if err != nil { return err } - return c.Write(raw, statusCode...) + return c.Write(raw, "application/json", statusCode...) } diff --git a/pkg/nex/command_test.go b/pkg/nex/command_test.go index 72d0b2b..1cbdad5 100644 --- a/pkg/nex/command_test.go +++ b/pkg/nex/command_test.go @@ -26,7 +26,7 @@ func TestHandleCommand(t *testing.T) { } err = conn.AddCommand("say.hi", "all", nil, func(ctx *nex.CommandCtx) error { - return ctx.Write([]byte("Hello, World!"), http.StatusOK) + return ctx.Write([]byte("Hello, World!"), "text/plain", http.StatusOK) }) if err != nil { t.Fatal(fmt.Errorf("unable to add command: %v", err)) @@ -34,7 +34,13 @@ func TestHandleCommand(t *testing.T) { } err = conn.AddCommand("echo", "all", nil, func(ctx *nex.CommandCtx) error { t.Log("Received command: ", string(ctx.Read())) - return ctx.Write(ctx.Read(), http.StatusOK) + return ctx.Write(ctx.Read(), "text/plain", http.StatusOK) + }) + err = conn.AddCommand("echo.details", "all", nil, func(ctx *nex.CommandCtx) error { + return ctx.JSON(map[string]any{ + "values": ctx.Values(), + "body": ctx.Read(), + }, http.StatusOK) }) if err != nil { t.Fatal(fmt.Errorf("unable to add command: %v", err)) diff --git a/pkg/proto/command.pb.go b/pkg/proto/command.pb.go index 14e23d5..82c2d89 100644 --- a/pkg/proto/command.pb.go +++ b/pkg/proto/command.pb.go @@ -292,7 +292,8 @@ type CommandReturn struct { IsDelivered bool `protobuf:"varint,1,opt,name=is_delivered,json=isDelivered,proto3" json:"is_delivered,omitempty"` Status int32 `protobuf:"varint,2,opt,name=status,proto3" json:"status,omitempty"` - Payload []byte `protobuf:"bytes,3,opt,name=payload,proto3,oneof" json:"payload,omitempty"` + ContentType string `protobuf:"bytes,3,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"` + Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3,oneof" json:"payload,omitempty"` } func (x *CommandReturn) Reset() { @@ -339,6 +340,13 @@ func (x *CommandReturn) GetStatus() int32 { return 0 } +func (x *CommandReturn) GetContentType() string { + if x != nil { + return x.ContentType + } + return "" +} + func (x *CommandReturn) GetPayload() []byte { if x != nil { return x.Payload @@ -372,35 +380,37 @@ var file_command_proto_rawDesc = []byte{ 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1d, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x88, - 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x75, - 0x0a, 0x0d, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x12, - 0x21, 0x0a, 0x0c, 0x69, 0x73, 0x5f, 0x64, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x65, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, - 0x65, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x1d, 0x0a, 0x07, 0x70, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x07, 0x70, - 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x88, 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, - 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x32, 0xa8, 0x02, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x0a, 0x41, - 0x64, 0x64, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x1a, 0x19, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x64, 0x64, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x0d, 0x52, 0x65, - 0x6d, 0x6f, 0x76, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x1b, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, - 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x0b, 0x53, 0x65, 0x6e, 0x64, - 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, - 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x41, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x1a, - 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, - 0x65, 0x74, 0x75, 0x72, 0x6e, 0x22, 0x00, 0x12, 0x47, 0x0a, 0x11, 0x53, 0x65, 0x6e, 0x64, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x41, 0x72, 0x67, 0x75, - 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6d, - 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, - 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x98, + 0x01, 0x0a, 0x0d, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, + 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x73, 0x5f, 0x64, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x65, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, + 0x72, 0x65, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x63, + 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1d, + 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x48, + 0x00, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x88, 0x01, 0x01, 0x42, 0x0a, 0x0a, + 0x08, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x32, 0xa8, 0x02, 0x0a, 0x11, 0x43, 0x6f, + 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x6c, 0x65, 0x72, 0x12, + 0x3d, 0x0a, 0x0a, 0x41, 0x64, 0x64, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x12, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x49, 0x6e, 0x66, + 0x6f, 0x1a, 0x19, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x41, 0x64, 0x64, 0x43, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, + 0x0a, 0x0d, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, + 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4c, + 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x61, + 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x0b, + 0x53, 0x65, 0x6e, 0x64, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x16, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x41, 0x72, 0x67, 0x75, 0x6d, + 0x65, 0x6e, 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, + 0x61, 0x6e, 0x64, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x22, 0x00, 0x12, 0x47, 0x0a, 0x11, 0x53, + 0x65, 0x6e, 0x64, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, + 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, + 0x41, 0x72, 0x67, 0x75, 0x6d, 0x65, 0x6e, 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x22, 0x00, + 0x28, 0x01, 0x30, 0x01, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/proto/command.proto b/pkg/proto/command.proto index a7ee03e..24212f0 100644 --- a/pkg/proto/command.proto +++ b/pkg/proto/command.proto @@ -39,5 +39,6 @@ message CommandArgument { message CommandReturn { bool is_delivered = 1; int32 status = 2; - optional bytes payload = 3; + string content_type = 3; + optional bytes payload = 4; } \ No newline at end of file