diff --git a/pkg/internal/directory/command.go b/pkg/internal/directory/command.go index cbe6732..85de093 100644 --- a/pkg/internal/directory/command.go +++ b/pkg/internal/directory/command.go @@ -18,5 +18,5 @@ type Command struct { // The implementation of the command, the handler is the service that will be invoked Handler []*ServiceInstance `json:"handler"` - robinIndex uint + RobinIndex uint `json:"robin_index"` } diff --git a/pkg/internal/directory/command_mapping.go b/pkg/internal/directory/command_mapping.go index 727ea97..8352f53 100644 --- a/pkg/internal/directory/command_mapping.go +++ b/pkg/internal/directory/command_mapping.go @@ -1,67 +1,79 @@ package directory import ( + "context" + "git.solsynth.dev/hypernet/nexus/pkg/internal/kv" "git.solsynth.dev/hypernet/nexus/pkg/nex" + "github.com/goccy/go-json" "github.com/rs/zerolog/log" "github.com/samber/lo" - "strings" - "sync" ) -// In commands, we use the map and the mutex because it is usually read and only sometimes write -var commandDirectory = make(map[string]*Command) -var commandDirectoryMutex sync.Mutex - -func AddCommand(id, method string, tags []string, handler *ServiceInstance) { - commandDirectoryMutex.Lock() - defer commandDirectoryMutex.Unlock() +const CommandInfoKvPrefix = "nexus.command/" +func AddCommand(id, method string, tags []string, handler *ServiceInstance) error { if tags == nil { tags = make([]string, 0) } - ky := nex.GetCommandKey(id, method) - if _, ok := commandDirectory[ky]; !ok { - commandDirectory[ky] = &Command{ - ID: id, - Method: method, - Tags: tags, - Handler: []*ServiceInstance{handler}, - } - } else { - commandDirectory[ky].Handler = append(commandDirectory[ky].Handler, handler) - commandDirectory[ky].Tags = lo.Uniq(append(commandDirectory[ky].Tags, tags...)) + ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method) + + command := &Command{ + ID: id, + Method: method, + Tags: tags, + Handler: []*ServiceInstance{handler}, } - commandDirectory[ky].Handler = lo.UniqBy(commandDirectory[ky].Handler, func(item *ServiceInstance) string { + command.Handler = lo.UniqBy(command.Handler, func(item *ServiceInstance) string { return item.ID }) - log.Info().Str("id", id).Str("method", method).Str("tags", strings.Join(tags, ",")).Msg("New command registered") + commandJSON, err := json.Marshal(command) + if err != nil { + log.Printf("Error marshaling command: %v", err) + return nil + } + + _, err = kv.Kv.Put(context.Background(), ky, string(commandJSON)) + return err } func GetCommandHandler(id, method string) *ServiceInstance { - commandDirectoryMutex.Lock() - defer commandDirectoryMutex.Unlock() + ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method) - ky := nex.GetCommandKey(id, method) - if val, ok := commandDirectory[ky]; ok { - if len(val.Handler) == 0 { - return nil - } - - idx := val.robinIndex % uint(len(val.Handler)) - val.robinIndex = idx + 1 - return val.Handler[idx] + resp, err := kv.Kv.Get(context.Background(), ky) + if err != nil { + return nil } - return nil + if len(resp.Kvs) == 0 { + return nil + } + + var command Command + if err := json.Unmarshal(resp.Kvs[0].Value, &command); err != nil { + return nil + } + + if len(command.Handler) == 0 { + return nil + } + + idx := command.RobinIndex % uint(len(command.Handler)) + command.RobinIndex = idx + 1 + + raw, err := json.Marshal(&command) + if err == nil { + _, _ = kv.Kv.Put(context.Background(), ky, string(raw)) + } + + return command.Handler[idx] } -func RemoveCommand(id, method string) { - commandDirectoryMutex.Lock() - defer commandDirectoryMutex.Unlock() +func RemoveCommand(id, method string) error { + ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method) - ky := nex.GetCommandKey(id, method) - delete(commandDirectory, ky) + _, err := kv.Kv.Delete(context.Background(), ky) + return err } diff --git a/pkg/internal/directory/service_mapping.go b/pkg/internal/directory/service_mapping.go index 66c79a5..7ed88bb 100644 --- a/pkg/internal/directory/service_mapping.go +++ b/pkg/internal/directory/service_mapping.go @@ -2,77 +2,137 @@ package directory import ( "context" + "git.solsynth.dev/hypernet/nexus/pkg/internal/kv" "git.solsynth.dev/hypernet/nexus/pkg/nex" "git.solsynth.dev/hypernet/nexus/pkg/proto" - "sync" + "github.com/goccy/go-json" + clientv3 "go.etcd.io/etcd/client/v3" + "math/rand" "time" ) -// In services, we use sync.Map because it will be both often read and write -var serviceDirectory sync.Map +const ServiceInfoKvPrefix = "nexus.service/" -func GetServiceInstance(id string) *ServiceInstance { - val, ok := serviceDirectory.Load(id) - if ok { - return val.(*ServiceInstance) - } else { - return nil +func AddServiceInstance(in *ServiceInstance) error { + key := ServiceInfoKvPrefix + in.ID + data, err := json.Marshal(in) + if err != nil { + return err } + + _, err = kv.Kv.Put(context.Background(), key, string(data)) + return err } -func GetServiceInstanceByType(t string) *ServiceInstance { - var result *ServiceInstance - serviceDirectory.Range(func(key, value any) bool { - if value.(*ServiceInstance).Type == t { - result = value.(*ServiceInstance) - return false - } - return true - }) - return result +func GetServiceInstance(id string) *ServiceInstance { + key := ServiceInfoKvPrefix + id + resp, err := kv.Kv.Get(context.Background(), key) + if err != nil || len(resp.Kvs) == 0 { + return nil + } + + var instance ServiceInstance + err = json.Unmarshal(resp.Kvs[0].Value, &instance) + if err != nil { + return nil + } + + return &instance } func ListServiceInstance() []*ServiceInstance { + resp, err := kv.Kv.Get(context.Background(), ServiceInfoKvPrefix, clientv3.WithPrefix()) + if err != nil { + return nil + } + var result []*ServiceInstance - serviceDirectory.Range(func(key, value interface{}) bool { - result = append(result, value.(*ServiceInstance)) - return true - }) + for _, val := range resp.Kvs { + var instance ServiceInstance + if err := json.Unmarshal(val.Value, &instance); err != nil { + continue + } + result = append(result, &instance) + } return result } func ListServiceInstanceByType(t string) []*ServiceInstance { + resp, err := kv.Kv.Get(context.Background(), ServiceInfoKvPrefix, clientv3.WithPrefix()) + if err != nil { + return nil + } + var result []*ServiceInstance - serviceDirectory.Range(func(key, value interface{}) bool { - if value.(*ServiceInstance).Type == t { - result = append(result, value.(*ServiceInstance)) + for _, val := range resp.Kvs { + var instance ServiceInstance + if err := json.Unmarshal(val.Value, &instance); err != nil { + continue } - return true - }) + if instance.Type == t { + result = append(result, &instance) + } + } return result } -func AddServiceInstance(in *ServiceInstance) { - serviceDirectory.Store(in.ID, in) +var srvRng = rand.New(rand.NewSource(time.Now().UnixNano())) + +func GetServiceInstanceByType(t string) *ServiceInstance { + resp, err := kv.Kv.Get(context.Background(), ServiceInfoKvPrefix, clientv3.WithPrefix()) + if err != nil { + return nil + } + + var instances []*ServiceInstance + for _, val := range resp.Kvs { + var instance ServiceInstance + if err := json.Unmarshal(val.Value, &instance); err != nil { + continue + } + if instance.Type == t { + instances = append(instances, &instance) + } + } + + if len(instances) == 0 { + return nil + } + + idx := srvRng.Intn(len(instances)) + return instances[idx] } -func RemoveServiceInstance(id string) { - serviceDirectory.Delete(id) +func RemoveServiceInstance(id string) error { + key := ServiceInfoKvPrefix + id + _, err := kv.Kv.Delete(context.Background(), key) + return err } -func BroadcastEvent(event string, data any) { - serviceDirectory.Range(func(key, value any) bool { - conn, err := value.(*ServiceInstance).GetGrpcConn() +func BroadcastEvent(event string, data any) error { + resp, err := kv.Kv.Get(context.Background(), ServiceInfoKvPrefix, clientv3.WithPrefix()) + if err != nil { + return err + } + + for _, val := range resp.Kvs { + var instance ServiceInstance + if err := json.Unmarshal(val.Value, &instance); err != nil { + continue + } + + conn, err := instance.GetGrpcConn() if err != nil { - return true + continue } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() _, _ = proto.NewDirectoryServiceClient(conn).BroadcastEvent(ctx, &proto.EventInfo{ Event: event, Data: nex.EncodeMap(data), }) - return true - }) + cancel() + } + + return nil } diff --git a/pkg/internal/directory/service_rpc.go b/pkg/internal/directory/service_rpc.go index a140aca..9d140b7 100644 --- a/pkg/internal/directory/service_rpc.go +++ b/pkg/internal/directory/service_rpc.go @@ -75,17 +75,25 @@ func (v *ServiceRpcServer) AddService(ctx context.Context, info *proto.ServiceIn GrpcAddr: info.GetGrpcAddr(), HttpAddr: info.HttpAddr, } - AddServiceInstance(in) - log.Info().Str("id", clientId).Str("label", info.GetLabel()).Msg("New service registered") + err = AddServiceInstance(in) + if err == nil { + log.Info().Str("id", clientId).Str("label", info.GetLabel()).Msg("New service registered") + } else { + log.Error().Str("id", clientId).Str("label", info.GetLabel()).Err(err).Msg("Unable to register a service") + } return &proto.AddServiceResponse{ - IsSuccess: true, + IsSuccess: err == nil, }, nil } func (v *ServiceRpcServer) RemoveService(ctx context.Context, request *proto.RemoveServiceRequest) (*proto.RemoveServiceResponse, error) { - RemoveServiceInstance(request.GetId()) - log.Info().Str("id", request.GetId()).Msg("A service removed.") + err := RemoveServiceInstance(request.GetId()) + if err == nil { + log.Info().Str("id", request.GetId()).Msg("A service removed") + } else { + log.Error().Str("id", request.GetId()).Err(err).Msg("Unable to remove a service") + } return &proto.RemoveServiceResponse{ - IsSuccess: true, + IsSuccess: err == nil, }, nil } diff --git a/pkg/internal/http/api/directory.go b/pkg/internal/http/api/directory.go index 3e79290..07bfe41 100644 --- a/pkg/internal/http/api/directory.go +++ b/pkg/internal/http/api/directory.go @@ -1,15 +1,15 @@ package api import ( - directory2 "git.solsynth.dev/hypernet/nexus/pkg/internal/directory" + "git.solsynth.dev/hypernet/nexus/pkg/internal/directory" "github.com/gofiber/fiber/v2" "github.com/samber/lo" ) func listExistsService(c *fiber.Ctx) error { - services := directory2.ListServiceInstance() + services := directory.ListServiceInstance() - return c.JSON(lo.Map(services, func(item *directory2.ServiceInstance, index int) map[string]any { + return c.JSON(lo.Map(services, func(item *directory.ServiceInstance, index int) map[string]any { return map[string]any{ "id": item.ID, "type": item.Type, diff --git a/pkg/internal/kv/etcd.go b/pkg/internal/kv/etcd.go index a1e6632..fdc58a7 100644 --- a/pkg/internal/kv/etcd.go +++ b/pkg/internal/kv/etcd.go @@ -1,6 +1,10 @@ package kv import ( + "context" + "fmt" + "github.com/rs/zerolog/log" + "github.com/samber/lo" clientv3 "go.etcd.io/etcd/client/v3" "time" ) @@ -12,8 +16,22 @@ func ConnectEtcd(endpoints []string) error { Endpoints: endpoints, DialTimeout: 10 * time.Second, }) - if err == nil { - Kv = conn + if err != nil { + return err } + var status []bool + for _, endpoint := range endpoints { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + _, err := conn.Status(ctx, endpoint) + if err != nil { + log.Warn().Str("endpoint", endpoint).Err(err).Msg("An KV endpoint is not available...") + } + status = append(status, err == nil) + cancel() + } + if len(lo.Filter(status, func(s bool, _ int) bool { return s })) == 0 { + return fmt.Errorf("unable to connect to all KV endpoints") + } + Kv = conn return err } diff --git a/pkg/main.go b/pkg/main.go index 0cd2840..67607db 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -47,12 +47,16 @@ func main() { } // Connect to kv (etcd) + log.Info().Msg("Connecting to kv (etcd)...") if err := kv.ConnectEtcd(viper.GetStringSlice("kv.endpoints")); err != nil { log.Error().Err(err).Msg("An error occurred when connecting to kv (etcd), please check your configuration in kv section.") log.Fatal().Msg("Kv is required for service discovery and directory feature, cannot be disabled.") + } else { + log.Info().Msg("Connected to kv (etcd)!") } // Connect to database + log.Info().Msg("Connecting to database...") if db, err := database.Connect(viper.GetString("database.dsn")); err != nil { log.Error().Err(err).Msg("An error occurred when connecting to database. Database related features will be disabled.") } else { @@ -62,7 +66,7 @@ func main() { log.Error().Err(err).Msg("An error occurred when querying database version. Database related features will be disabled.") database.Kdb = nil } else { - log.Info().Str("version", version).Msg("Connected to database") + log.Info().Str("version", version).Msg("Connected to database!") } }