✨ Broadcast event
This commit is contained in:
22
pkg/internal/directory/command.go
Normal file
22
pkg/internal/directory/command.go
Normal file
@ -0,0 +1,22 @@
|
||||
package directory
|
||||
|
||||
const (
|
||||
CommandMethodGet = "get"
|
||||
CommandMethodPut = "put"
|
||||
CommandMethodPatch = "patch"
|
||||
CommandMethodPost = "post"
|
||||
CommandMethodDelete = "delete"
|
||||
)
|
||||
|
||||
type Command struct {
|
||||
// The unique identifier of the command, different method command can hold the same command id
|
||||
ID string `json:"id"`
|
||||
// The method of the command, such as get, post, others; inspired by RESTful design
|
||||
Method string `json:"method"`
|
||||
// The tags of the command will be used to invoke the pre-command middlewares and post-command middlewares
|
||||
Tags []string `json:"tags"`
|
||||
// The implementation of the command, the handler is the service that will be invoked
|
||||
Handler []*ServiceInstance `json:"handler"`
|
||||
|
||||
robinIndex uint
|
||||
}
|
67
pkg/internal/directory/command_mapping.go
Normal file
67
pkg/internal/directory/command_mapping.go
Normal file
@ -0,0 +1,67 @@
|
||||
package directory
|
||||
|
||||
import (
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/nex"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/samber/lo"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// In commands, we use the map and the mutex because it is usually read and only sometimes write
|
||||
var commandDirectory = make(map[string]*Command)
|
||||
var commandDirectoryMutex sync.Mutex
|
||||
|
||||
func AddCommand(id, method string, tags []string, handler *ServiceInstance) {
|
||||
commandDirectoryMutex.Lock()
|
||||
defer commandDirectoryMutex.Unlock()
|
||||
|
||||
if tags == nil {
|
||||
tags = make([]string, 0)
|
||||
}
|
||||
|
||||
ky := nex.GetCommandKey(id, method)
|
||||
if _, ok := commandDirectory[ky]; !ok {
|
||||
commandDirectory[ky] = &Command{
|
||||
ID: id,
|
||||
Method: method,
|
||||
Tags: tags,
|
||||
Handler: []*ServiceInstance{handler},
|
||||
}
|
||||
} else {
|
||||
commandDirectory[ky].Handler = append(commandDirectory[ky].Handler, handler)
|
||||
commandDirectory[ky].Tags = lo.Uniq(append(commandDirectory[ky].Tags, tags...))
|
||||
}
|
||||
|
||||
commandDirectory[ky].Handler = lo.UniqBy(commandDirectory[ky].Handler, func(item *ServiceInstance) string {
|
||||
return item.ID
|
||||
})
|
||||
|
||||
log.Info().Str("id", id).Str("method", method).Str("tags", strings.Join(tags, ",")).Msg("New command registered")
|
||||
}
|
||||
|
||||
func GetCommandHandler(id, method string) *ServiceInstance {
|
||||
commandDirectoryMutex.Lock()
|
||||
defer commandDirectoryMutex.Unlock()
|
||||
|
||||
ky := nex.GetCommandKey(id, method)
|
||||
if val, ok := commandDirectory[ky]; ok {
|
||||
if len(val.Handler) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
idx := val.robinIndex % uint(len(val.Handler))
|
||||
val.robinIndex = idx + 1
|
||||
return val.Handler[idx]
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func RemoveCommand(id, method string) {
|
||||
commandDirectoryMutex.Lock()
|
||||
defer commandDirectoryMutex.Unlock()
|
||||
|
||||
ky := nex.GetCommandKey(id, method)
|
||||
delete(commandDirectory, ky)
|
||||
}
|
116
pkg/internal/directory/command_rpc.go
Normal file
116
pkg/internal/directory/command_rpc.go
Normal file
@ -0,0 +1,116 @@
|
||||
package directory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type CommandRpcServer struct {
|
||||
proto.UnimplementedCommandControllerServer
|
||||
}
|
||||
|
||||
func (c CommandRpcServer) AddCommand(ctx context.Context, info *proto.CommandInfo) (*proto.AddCommandResponse, error) {
|
||||
clientId, err := GetClientId(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
service := GetServiceInstance(clientId)
|
||||
if service == nil {
|
||||
return nil, status.Errorf(codes.NotFound, "service not found")
|
||||
}
|
||||
|
||||
AddCommand(info.GetId(), info.GetMethod(), info.GetTags(), service)
|
||||
return &proto.AddCommandResponse{
|
||||
IsSuccess: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c CommandRpcServer) RemoveCommand(ctx context.Context, request *proto.CommandLookupRequest) (*proto.RemoveCommandResponse, error) {
|
||||
RemoveCommand(request.GetId(), request.GetMethod())
|
||||
return &proto.RemoveCommandResponse{
|
||||
IsSuccess: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c CommandRpcServer) SendCommand(ctx context.Context, argument *proto.CommandArgument) (*proto.CommandReturn, error) {
|
||||
id := argument.GetCommand()
|
||||
method := argument.GetMethod()
|
||||
|
||||
handler := GetCommandHandler(id, method)
|
||||
if handler == nil {
|
||||
return &proto.CommandReturn{
|
||||
IsDelivered: false,
|
||||
Status: http.StatusNotFound,
|
||||
ContentType: "text/plain+error",
|
||||
Payload: []byte("command not found"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
conn, err := handler.GetGrpcConn()
|
||||
if err != nil {
|
||||
return &proto.CommandReturn{
|
||||
IsDelivered: false,
|
||||
Status: http.StatusServiceUnavailable,
|
||||
ContentType: "text/plain+error",
|
||||
Payload: []byte("service unavailable"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
out, err := proto.NewCommandControllerClient(conn).SendCommand(ctx, argument)
|
||||
if err != nil {
|
||||
return &proto.CommandReturn{
|
||||
IsDelivered: true,
|
||||
Status: http.StatusInternalServerError,
|
||||
ContentType: "text/plain+error",
|
||||
Payload: []byte(err.Error()),
|
||||
}, nil
|
||||
}
|
||||
out.IsDelivered = true
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c CommandRpcServer) SendStreamCommand(g grpc.BidiStreamingServer[proto.CommandArgument, proto.CommandReturn]) error {
|
||||
for {
|
||||
pck, err := g.Recv()
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id := pck.GetCommand()
|
||||
method := pck.GetMethod()
|
||||
|
||||
handler := GetCommandHandler(id, method)
|
||||
if handler == nil {
|
||||
return status.Errorf(codes.NotFound, "command not found")
|
||||
}
|
||||
|
||||
conn, err := handler.GetGrpcConn()
|
||||
|
||||
ctx, cancel := context.WithTimeout(g.Context(), time.Second*10)
|
||||
out, err := proto.NewCommandControllerClient(conn).SendCommand(ctx, pck)
|
||||
cancel()
|
||||
|
||||
if err != nil {
|
||||
_ = g.Send(&proto.CommandReturn{
|
||||
IsDelivered: false,
|
||||
Status: http.StatusInternalServerError,
|
||||
ContentType: "text/plain+error",
|
||||
Payload: []byte(err.Error()),
|
||||
})
|
||||
} else {
|
||||
_ = g.Send(out)
|
||||
}
|
||||
}
|
||||
}
|
29
pkg/internal/directory/connect.go
Normal file
29
pkg/internal/directory/connect.go
Normal file
@ -0,0 +1,29 @@
|
||||
package directory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
health "google.golang.org/grpc/health/grpc_health_v1"
|
||||
"time"
|
||||
)
|
||||
|
||||
func ConnectService(in *ServiceInstance) (*grpc.ClientConn, error) {
|
||||
conn, err := grpc.NewClient(
|
||||
in.GrpcAddr,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create grpc connection: %v", err)
|
||||
}
|
||||
|
||||
client := health.NewHealthClient(conn)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||
defer cancel()
|
||||
if _, err = client.Check(ctx, &health.HealthCheckRequest{}); err != nil {
|
||||
return conn, fmt.Errorf("grpc service is down: %v", err)
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
20
pkg/internal/directory/exts.go
Normal file
20
pkg/internal/directory/exts.go
Normal file
@ -0,0 +1,20 @@
|
||||
package directory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
func GetClientId(ctx context.Context) (string, error) {
|
||||
var clientId string
|
||||
if md, ok := metadata.FromIncomingContext(ctx); !ok {
|
||||
return clientId, status.Errorf(codes.InvalidArgument, "missing metadata")
|
||||
} else if val, ok := md["client_id"]; !ok || len(val) == 0 {
|
||||
return clientId, status.Errorf(codes.Unauthenticated, "missing client_id in metadata")
|
||||
} else {
|
||||
clientId = val[0]
|
||||
}
|
||||
return clientId, nil
|
||||
}
|
29
pkg/internal/directory/service.go
Normal file
29
pkg/internal/directory/service.go
Normal file
@ -0,0 +1,29 @@
|
||||
package directory
|
||||
|
||||
import "google.golang.org/grpc"
|
||||
|
||||
type ServiceInstance struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
Label string `json:"label"`
|
||||
GrpcAddr string `json:"grpc_addr"`
|
||||
HttpAddr *string `json:"http_addr"`
|
||||
|
||||
grpcConn *grpc.ClientConn
|
||||
retryCount int
|
||||
}
|
||||
|
||||
func (v *ServiceInstance) GetGrpcConn() (*grpc.ClientConn, error) {
|
||||
if v.grpcConn != nil {
|
||||
return v.grpcConn, nil
|
||||
}
|
||||
|
||||
var err error
|
||||
v.grpcConn, err = ConnectService(v)
|
||||
if err != nil {
|
||||
RemoveServiceInstance(v.ID)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return v.grpcConn, nil
|
||||
}
|
78
pkg/internal/directory/service_mapping.go
Normal file
78
pkg/internal/directory/service_mapping.go
Normal file
@ -0,0 +1,78 @@
|
||||
package directory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/nex"
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// In services, we use sync.Map because it will be both often read and write
|
||||
var serviceDirectory sync.Map
|
||||
|
||||
func GetServiceInstance(id string) *ServiceInstance {
|
||||
val, ok := serviceDirectory.Load(id)
|
||||
if ok {
|
||||
return val.(*ServiceInstance)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func GetServiceInstanceByType(t string) *ServiceInstance {
|
||||
var result *ServiceInstance
|
||||
serviceDirectory.Range(func(key, value any) bool {
|
||||
if value.(*ServiceInstance).Type == t {
|
||||
result = value.(*ServiceInstance)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
return result
|
||||
}
|
||||
|
||||
func ListServiceInstance() []*ServiceInstance {
|
||||
var result []*ServiceInstance
|
||||
serviceDirectory.Range(func(key, value interface{}) bool {
|
||||
result = append(result, value.(*ServiceInstance))
|
||||
return true
|
||||
})
|
||||
return result
|
||||
}
|
||||
|
||||
func ListServiceInstanceByType(t string) []*ServiceInstance {
|
||||
var result []*ServiceInstance
|
||||
serviceDirectory.Range(func(key, value interface{}) bool {
|
||||
if value.(*ServiceInstance).Type == t {
|
||||
result = append(result, value.(*ServiceInstance))
|
||||
}
|
||||
return true
|
||||
})
|
||||
return result
|
||||
}
|
||||
|
||||
func AddServiceInstance(in *ServiceInstance) {
|
||||
serviceDirectory.Store(in.ID, in)
|
||||
}
|
||||
|
||||
func RemoveServiceInstance(id string) {
|
||||
serviceDirectory.Delete(id)
|
||||
}
|
||||
|
||||
func BroadcastEvent(event string, data any) {
|
||||
serviceDirectory.Range(func(key, value any) bool {
|
||||
conn, err := value.(*ServiceInstance).GetGrpcConn()
|
||||
if err != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
_, _ = proto.NewServiceDirectoryClient(conn).BroadcastEvent(ctx, &proto.EventInfo{
|
||||
Event: event,
|
||||
Data: nex.EncodeMap(data),
|
||||
})
|
||||
return true
|
||||
})
|
||||
}
|
91
pkg/internal/directory/service_rpc.go
Normal file
91
pkg/internal/directory/service_rpc.go
Normal file
@ -0,0 +1,91 @@
|
||||
package directory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
type ServiceRpcServer struct {
|
||||
proto.UnimplementedServiceDirectoryServer
|
||||
}
|
||||
|
||||
func instantiationService(in *ServiceInstance) *proto.ServiceInfo {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
return &proto.ServiceInfo{
|
||||
Id: in.ID,
|
||||
Type: in.Type,
|
||||
Label: in.Label,
|
||||
GrpcAddr: in.GrpcAddr,
|
||||
HttpAddr: in.HttpAddr,
|
||||
}
|
||||
}
|
||||
|
||||
func (v *ServiceRpcServer) GetService(ctx context.Context, request *proto.GetServiceRequest) (*proto.GetServiceResponse, error) {
|
||||
if request.Id != nil {
|
||||
out := GetServiceInstance(request.GetId())
|
||||
return &proto.GetServiceResponse{
|
||||
Data: instantiationService(out),
|
||||
}, nil
|
||||
}
|
||||
if request.Type != nil {
|
||||
out := GetServiceInstanceByType(request.GetType())
|
||||
return &proto.GetServiceResponse{
|
||||
Data: instantiationService(out),
|
||||
}, nil
|
||||
}
|
||||
return nil, fmt.Errorf("no filter condition is provided")
|
||||
}
|
||||
|
||||
func (v *ServiceRpcServer) ListService(ctx context.Context, request *proto.ListServiceRequest) (*proto.ListServiceResponse, error) {
|
||||
var out []*ServiceInstance
|
||||
if request.Type != nil {
|
||||
out = ListServiceInstanceByType(request.GetType())
|
||||
} else {
|
||||
out = ListServiceInstance()
|
||||
}
|
||||
return &proto.ListServiceResponse{
|
||||
Data: lo.Map(out, func(item *ServiceInstance, index int) *proto.ServiceInfo {
|
||||
return instantiationService(item)
|
||||
}),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (v *ServiceRpcServer) AddService(ctx context.Context, info *proto.ServiceInfo) (*proto.AddServiceResponse, error) {
|
||||
clientId, err := GetClientId(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if info.GetId() != clientId {
|
||||
return nil, status.Errorf(codes.InvalidArgument, "client_id mismatch in metadata")
|
||||
}
|
||||
|
||||
in := &ServiceInstance{
|
||||
ID: clientId,
|
||||
Type: info.GetType(),
|
||||
Label: info.GetLabel(),
|
||||
GrpcAddr: info.GetGrpcAddr(),
|
||||
HttpAddr: info.HttpAddr,
|
||||
}
|
||||
AddServiceInstance(in)
|
||||
log.Info().Str("id", clientId).Str("label", info.GetLabel()).Msg("New service registered")
|
||||
return &proto.AddServiceResponse{
|
||||
IsSuccess: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (v *ServiceRpcServer) RemoveService(ctx context.Context, request *proto.RemoveServiceRequest) (*proto.RemoveServiceResponse, error) {
|
||||
RemoveServiceInstance(request.GetId())
|
||||
log.Info().Str("id", request.GetId()).Msg("A service removed.")
|
||||
return &proto.RemoveServiceResponse{
|
||||
IsSuccess: true,
|
||||
}, nil
|
||||
}
|
@ -1,9 +1,9 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
directory2 "git.solsynth.dev/hypernet/nexus/pkg/internal/directory"
|
||||
"net"
|
||||
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/directory"
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
||||
|
||||
"google.golang.org/grpc/reflection"
|
||||
@ -27,8 +27,8 @@ func NewServer() *Server {
|
||||
srv: grpc.NewServer(),
|
||||
}
|
||||
|
||||
proto.RegisterServiceDirectoryServer(server.srv, &directory.ServiceRpcServer{})
|
||||
proto.RegisterCommandControllerServer(server.srv, &directory.CommandRpcServer{})
|
||||
proto.RegisterServiceDirectoryServer(server.srv, &directory2.ServiceRpcServer{})
|
||||
proto.RegisterCommandControllerServer(server.srv, &directory2.CommandRpcServer{})
|
||||
proto.RegisterDatabaseControllerServer(server.srv, server)
|
||||
proto.RegisterStreamControllerServer(server.srv, server)
|
||||
health.RegisterHealthServer(server.srv, server)
|
||||
|
@ -3,7 +3,7 @@ package grpc
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/http/ws"
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/internal/http/ws"
|
||||
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
||||
"github.com/samber/lo"
|
||||
|
7
pkg/internal/http/api/check_ip.go
Normal file
7
pkg/internal/http/api/check_ip.go
Normal file
@ -0,0 +1,7 @@
|
||||
package api
|
||||
|
||||
import "github.com/gofiber/fiber/v2"
|
||||
|
||||
func getClientIP(c *fiber.Ctx) error {
|
||||
return c.SendString(c.IP())
|
||||
}
|
67
pkg/internal/http/api/command.go
Normal file
67
pkg/internal/http/api/command.go
Normal file
@ -0,0 +1,67 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/internal/directory"
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/rs/zerolog/log"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func invokeCommand(c *fiber.Ctx) error {
|
||||
command := c.Params("command")
|
||||
method := strings.ToLower(c.Method())
|
||||
|
||||
handler := directory.GetCommandHandler(command, method)
|
||||
if handler == nil {
|
||||
return fiber.NewError(fiber.StatusNotFound, "command not found")
|
||||
}
|
||||
|
||||
conn, err := handler.GetGrpcConn()
|
||||
if err != nil {
|
||||
return fiber.NewError(fiber.StatusServiceUnavailable, "service unavailable")
|
||||
}
|
||||
|
||||
log.Debug().Str("id", command).Str("method", method).Msg("Invoking command from HTTP Gateway...")
|
||||
|
||||
var meta []string
|
||||
meta = append(meta, "client_id", "http-gateway")
|
||||
meta = append(meta, "net.ip", c.IP())
|
||||
meta = append(meta, "http.user_agent", c.Get(fiber.HeaderUserAgent))
|
||||
for k, v := range c.GetReqHeaders() {
|
||||
meta = append(
|
||||
meta,
|
||||
strings.ToLower(fmt.Sprintf("header.%s", strings.ReplaceAll(k, "-", "_"))),
|
||||
strings.Join(v, "\n"),
|
||||
)
|
||||
}
|
||||
|
||||
for k, v := range c.Queries() {
|
||||
meta = append(
|
||||
meta,
|
||||
strings.ToLower(fmt.Sprintf("query.%s", strings.ReplaceAll(k, "-", "_"))),
|
||||
v,
|
||||
)
|
||||
}
|
||||
|
||||
ctx := metadata.AppendToOutgoingContext(c.Context(), meta...)
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
|
||||
defer cancel()
|
||||
|
||||
out, err := proto.NewCommandControllerClient(conn).SendCommand(ctx, &proto.CommandArgument{
|
||||
Command: command,
|
||||
Method: method,
|
||||
Payload: c.Body(),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fiber.NewError(fiber.StatusInternalServerError, err.Error())
|
||||
} else {
|
||||
c.Set(fiber.HeaderContentType, out.ContentType)
|
||||
return c.Status(int(out.Status)).Send(out.Payload)
|
||||
}
|
||||
}
|
19
pkg/internal/http/api/directory.go
Normal file
19
pkg/internal/http/api/directory.go
Normal file
@ -0,0 +1,19 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
directory2 "git.solsynth.dev/hypernet/nexus/pkg/internal/directory"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
func listExistsService(c *fiber.Ctx) error {
|
||||
services := directory2.ListServiceInstance()
|
||||
|
||||
return c.JSON(lo.Map(services, func(item *directory2.ServiceInstance, index int) map[string]any {
|
||||
return map[string]any{
|
||||
"id": item.ID,
|
||||
"type": item.Type,
|
||||
"label": item.Label,
|
||||
}
|
||||
}))
|
||||
}
|
41
pkg/internal/http/api/forward.go
Normal file
41
pkg/internal/http/api/forward.go
Normal file
@ -0,0 +1,41 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/internal/directory"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/fiber/v2/middleware/proxy"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/spf13/viper"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func forwardService(c *fiber.Ctx) error {
|
||||
serviceType := c.Params("service")
|
||||
ogKeyword := serviceType
|
||||
|
||||
aliasingMap := viper.GetStringMapString("services.aliases")
|
||||
if val, ok := aliasingMap[serviceType]; ok {
|
||||
serviceType = val
|
||||
}
|
||||
|
||||
service := directory.GetServiceInstanceByType(serviceType)
|
||||
|
||||
if service == nil || service.HttpAddr == nil {
|
||||
return fiber.NewError(fiber.StatusNotFound, "service not found")
|
||||
}
|
||||
|
||||
ogUrl := c.Request().URI().String()
|
||||
url := c.OriginalURL()
|
||||
url = strings.Replace(url, "/cgi/"+ogKeyword, "", 1)
|
||||
url = *service.HttpAddr + url
|
||||
|
||||
log.Debug().
|
||||
Str("from", ogUrl).
|
||||
Str("to", url).
|
||||
Str("service", serviceType).
|
||||
Str("id", service.ID).
|
||||
Msg("Forwarding request for service...")
|
||||
|
||||
return proxy.Do(c, url)
|
||||
|
||||
}
|
30
pkg/internal/http/api/index.go
Normal file
30
pkg/internal/http/api/index.go
Normal file
@ -0,0 +1,30 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/internal/http/ws"
|
||||
"github.com/gofiber/contrib/websocket"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
)
|
||||
|
||||
func MapAPIs(app *fiber.App) {
|
||||
// Some built-in public-accessible APIs
|
||||
wellKnown := app.Group("/.well-known").Name("Well Known")
|
||||
{
|
||||
wellKnown.Get("/", func(c *fiber.Ctx) error {
|
||||
return c.SendStatus(fiber.StatusOK)
|
||||
})
|
||||
wellKnown.Get("/check-ip", getClientIP)
|
||||
wellKnown.Get("/directory/services", listExistsService)
|
||||
}
|
||||
|
||||
// Common websocket gateway
|
||||
app.Use(func(c *fiber.Ctx) error {
|
||||
/*if err := exts.EnsureAuthenticated(c); err != nil {
|
||||
return err
|
||||
}*/
|
||||
return c.Next()
|
||||
}).Get("/ws", websocket.New(ws.Listen))
|
||||
|
||||
app.All("/inv/:command", invokeCommand)
|
||||
app.All("/cgi/:service/*", forwardService)
|
||||
}
|
18
pkg/internal/http/exts/request.go
Normal file
18
pkg/internal/http/exts/request.go
Normal file
@ -0,0 +1,18 @@
|
||||
package exts
|
||||
|
||||
import (
|
||||
"github.com/go-playground/validator/v10"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
)
|
||||
|
||||
var validation = validator.New(validator.WithRequiredStructEnabled())
|
||||
|
||||
func BindAndValidate(c *fiber.Ctx, out any) error {
|
||||
if err := c.BodyParser(out); err != nil {
|
||||
return fiber.NewError(fiber.StatusBadRequest, err.Error())
|
||||
} else if err := validation.Struct(out); err != nil {
|
||||
return fiber.NewError(fiber.StatusBadRequest, err.Error())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
54
pkg/internal/http/server.go
Normal file
54
pkg/internal/http/server.go
Normal file
@ -0,0 +1,54 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/internal/http/api"
|
||||
"github.com/goccy/go-json"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/fiber/v2/middleware/cors"
|
||||
"github.com/gofiber/fiber/v2/middleware/idempotency"
|
||||
"github.com/gofiber/fiber/v2/middleware/logger"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
type HTTPApp struct {
|
||||
app *fiber.App
|
||||
}
|
||||
|
||||
func NewServer() *HTTPApp {
|
||||
app := fiber.New(fiber.Config{
|
||||
DisableStartupMessage: true,
|
||||
EnableIPValidation: true,
|
||||
ServerHeader: "Hypernet.Nexus",
|
||||
AppName: "Hypernet.Nexus",
|
||||
ProxyHeader: fiber.HeaderXForwardedFor,
|
||||
JSONEncoder: json.Marshal,
|
||||
JSONDecoder: json.Unmarshal,
|
||||
BodyLimit: 512 * 1024 * 1024 * 1024, // 512 TiB
|
||||
EnablePrintRoutes: viper.GetBool("debug.print_routes"),
|
||||
})
|
||||
|
||||
app.Use(idempotency.New())
|
||||
app.Use(cors.New(cors.Config{
|
||||
AllowCredentials: true,
|
||||
AllowMethods: "GET,POST,HEAD,PUT,DELETE,PATCH",
|
||||
AllowOriginsFunc: func(origin string) bool {
|
||||
return true
|
||||
},
|
||||
}))
|
||||
|
||||
app.Use(logger.New(logger.Config{
|
||||
Format: "${status} | ${latency} | ${method} ${path}\n",
|
||||
Output: log.Logger,
|
||||
}))
|
||||
|
||||
api.MapAPIs(app)
|
||||
|
||||
return &HTTPApp{app}
|
||||
}
|
||||
|
||||
func (v *HTTPApp) Listen() {
|
||||
if err := v.app.Listen(viper.GetString("bind")); err != nil {
|
||||
log.Fatal().Err(err).Msg("An error occurred when starting server...")
|
||||
}
|
||||
}
|
106
pkg/internal/http/ws/connections.go
Normal file
106
pkg/internal/http/ws/connections.go
Normal file
@ -0,0 +1,106 @@
|
||||
package ws
|
||||
|
||||
import (
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/internal/directory"
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/internal/models"
|
||||
"github.com/gofiber/contrib/websocket"
|
||||
)
|
||||
|
||||
var (
|
||||
wsMutex sync.Mutex
|
||||
wsConn = make(map[uint]map[uint64]*websocket.Conn)
|
||||
)
|
||||
|
||||
func ClientRegister(user models.Account, conn *websocket.Conn) uint64 {
|
||||
wsMutex.Lock()
|
||||
if wsConn[user.ID] == nil {
|
||||
wsConn[user.ID] = make(map[uint64]*websocket.Conn)
|
||||
}
|
||||
clientId := rand.Uint64()
|
||||
wsConn[user.ID][clientId] = conn
|
||||
wsMutex.Unlock()
|
||||
|
||||
directory.BroadcastEvent("ws.client.register", map[string]any{
|
||||
"user": user.ID,
|
||||
"id": clientId,
|
||||
})
|
||||
|
||||
return clientId
|
||||
}
|
||||
|
||||
func ClientUnregister(user models.Account, id uint64) {
|
||||
wsMutex.Lock()
|
||||
if wsConn[user.ID] == nil {
|
||||
wsConn[user.ID] = make(map[uint64]*websocket.Conn)
|
||||
}
|
||||
delete(wsConn[user.ID], id)
|
||||
wsMutex.Unlock()
|
||||
|
||||
directory.BroadcastEvent("ws.client.unregister", map[string]any{
|
||||
"user": user.ID,
|
||||
"id": id,
|
||||
})
|
||||
}
|
||||
|
||||
func ClientCount(uid uint) int {
|
||||
return len(wsConn[uid])
|
||||
}
|
||||
|
||||
func WebsocketPush(uid uint, body []byte) (count int, success int, errs []error) {
|
||||
for _, conn := range wsConn[uid] {
|
||||
if err := conn.WriteMessage(1, body); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
success++
|
||||
}
|
||||
count++
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func WebsocketPushDirect(clientId uint64, body []byte) (count int, success int, errs []error) {
|
||||
for _, m := range wsConn {
|
||||
if conn, ok := m[clientId]; ok {
|
||||
if err := conn.WriteMessage(1, body); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
success++
|
||||
}
|
||||
count++
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func WebsocketPushBatch(uidList []uint, body []byte) (count int, success int, errs []error) {
|
||||
for _, uid := range uidList {
|
||||
for _, conn := range wsConn[uid] {
|
||||
if err := conn.WriteMessage(1, body); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
success++
|
||||
}
|
||||
count++
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func WebsocketPushBatchDirect(clientIdList []uint64, body []byte) (count int, success int, errs []error) {
|
||||
for _, clientId := range clientIdList {
|
||||
for _, m := range wsConn {
|
||||
if conn, ok := m[clientId]; ok {
|
||||
if err := conn.WriteMessage(1, body); err != nil {
|
||||
errs = append(errs, err)
|
||||
} else {
|
||||
success++
|
||||
}
|
||||
count++
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
85
pkg/internal/http/ws/ws.go
Normal file
85
pkg/internal/http/ws/ws.go
Normal file
@ -0,0 +1,85 @@
|
||||
package ws
|
||||
|
||||
import (
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/internal/models"
|
||||
"git.solsynth.dev/hypernet/nexus/pkg/nex"
|
||||
"github.com/gofiber/contrib/websocket"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
func Listen(c *websocket.Conn) {
|
||||
user := c.Locals("user").(models.Account)
|
||||
|
||||
// Push connection
|
||||
clientId := ClientRegister(user, c)
|
||||
log.Debug().
|
||||
Uint("user", user.ID).
|
||||
Uint64("clientId", clientId).
|
||||
Msg("New websocket connection established...")
|
||||
|
||||
// Event loop
|
||||
var mt int
|
||||
var data []byte
|
||||
var err error
|
||||
|
||||
var packet nex.WebSocketPackage
|
||||
|
||||
for {
|
||||
if mt, data, err = c.ReadMessage(); err != nil {
|
||||
break
|
||||
} else if err := jsoniter.Unmarshal(data, &packet); err != nil {
|
||||
_ = c.WriteMessage(mt, nex.WebSocketPackage{
|
||||
Action: "error",
|
||||
Message: "unable to unmarshal your command, requires json request",
|
||||
}.Marshal())
|
||||
continue
|
||||
}
|
||||
|
||||
aliasingMap := viper.GetStringMapString("services.aliases")
|
||||
if val, ok := aliasingMap[packet.Endpoint]; ok {
|
||||
packet.Endpoint = val
|
||||
}
|
||||
|
||||
/*
|
||||
service := directory.GetServiceInstanceByType(packet.Endpoint)
|
||||
if service == nil {
|
||||
_ = c.WriteMessage(mt, nex.NetworkPackage{
|
||||
Action: "error",
|
||||
Message: "service not found",
|
||||
}.Marshal())
|
||||
continue
|
||||
}
|
||||
pc, err := service.GetGrpcConn()
|
||||
if err != nil {
|
||||
_ = c.WriteMessage(mt, nex.NetworkPackage{
|
||||
Action: "error",
|
||||
Message: fmt.Sprintf("unable to connect to service: %v", err.Error()),
|
||||
}.Marshal())
|
||||
continue
|
||||
}
|
||||
|
||||
sc := proto.NewStreamControllerClient(pc)
|
||||
_, err = sc.EmitStreamEvent(context.Background(), &proto.StreamEventRequest{
|
||||
Event: packet.Action,
|
||||
UserId: uint64(user.ID),
|
||||
ClientId: uint64(clientId),
|
||||
Payload: packet.RawPayload(),
|
||||
})
|
||||
if err != nil {
|
||||
_ = c.WriteMessage(mt, nex.NetworkPackage{
|
||||
Action: "error",
|
||||
Message: fmt.Sprintf("unable send message to service: %v", err.Error()),
|
||||
}.Marshal())
|
||||
continue
|
||||
}*/
|
||||
}
|
||||
|
||||
// Pop connection
|
||||
ClientUnregister(user, clientId)
|
||||
log.Debug().
|
||||
Uint("user", user.ID).
|
||||
Uint64("clientId", clientId).
|
||||
Msg("A websocket connection disconnected...")
|
||||
}
|
Reference in New Issue
Block a user