🧱 Introduce etcd for high availability (HA)

This commit is contained in:
LittleSheep 2024-10-21 22:47:31 +08:00
parent f6ff7178b9
commit 85783aa331
7 changed files with 193 additions and 91 deletions

View File

@ -18,5 +18,5 @@ type Command struct {
// The implementation of the command, the handler is the service that will be invoked // The implementation of the command, the handler is the service that will be invoked
Handler []*ServiceInstance `json:"handler"` Handler []*ServiceInstance `json:"handler"`
robinIndex uint RobinIndex uint `json:"robin_index"`
} }

View File

@ -1,67 +1,79 @@
package directory package directory
import ( import (
"context"
"git.solsynth.dev/hypernet/nexus/pkg/internal/kv"
"git.solsynth.dev/hypernet/nexus/pkg/nex" "git.solsynth.dev/hypernet/nexus/pkg/nex"
"github.com/goccy/go-json"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/samber/lo" "github.com/samber/lo"
"strings"
"sync"
) )
// In commands, we use the map and the mutex because it is usually read and only sometimes write const CommandInfoKvPrefix = "nexus.command/"
var commandDirectory = make(map[string]*Command)
var commandDirectoryMutex sync.Mutex
func AddCommand(id, method string, tags []string, handler *ServiceInstance) {
commandDirectoryMutex.Lock()
defer commandDirectoryMutex.Unlock()
func AddCommand(id, method string, tags []string, handler *ServiceInstance) error {
if tags == nil { if tags == nil {
tags = make([]string, 0) tags = make([]string, 0)
} }
ky := nex.GetCommandKey(id, method) ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method)
if _, ok := commandDirectory[ky]; !ok {
commandDirectory[ky] = &Command{ command := &Command{
ID: id, ID: id,
Method: method, Method: method,
Tags: tags, Tags: tags,
Handler: []*ServiceInstance{handler}, Handler: []*ServiceInstance{handler},
} }
} else {
commandDirectory[ky].Handler = append(commandDirectory[ky].Handler, handler)
commandDirectory[ky].Tags = lo.Uniq(append(commandDirectory[ky].Tags, tags...))
}
commandDirectory[ky].Handler = lo.UniqBy(commandDirectory[ky].Handler, func(item *ServiceInstance) string { command.Handler = lo.UniqBy(command.Handler, func(item *ServiceInstance) string {
return item.ID return item.ID
}) })
log.Info().Str("id", id).Str("method", method).Str("tags", strings.Join(tags, ",")).Msg("New command registered") commandJSON, err := json.Marshal(command)
if err != nil {
log.Printf("Error marshaling command: %v", err)
return nil
}
_, err = kv.Kv.Put(context.Background(), ky, string(commandJSON))
return err
} }
func GetCommandHandler(id, method string) *ServiceInstance { func GetCommandHandler(id, method string) *ServiceInstance {
commandDirectoryMutex.Lock() ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method)
defer commandDirectoryMutex.Unlock()
ky := nex.GetCommandKey(id, method) resp, err := kv.Kv.Get(context.Background(), ky)
if val, ok := commandDirectory[ky]; ok { if err != nil {
if len(val.Handler) == 0 {
return nil return nil
} }
idx := val.robinIndex % uint(len(val.Handler)) if len(resp.Kvs) == 0 {
val.robinIndex = idx + 1
return val.Handler[idx]
}
return nil return nil
} }
func RemoveCommand(id, method string) { var command Command
commandDirectoryMutex.Lock() if err := json.Unmarshal(resp.Kvs[0].Value, &command); err != nil {
defer commandDirectoryMutex.Unlock() return nil
}
ky := nex.GetCommandKey(id, method)
delete(commandDirectory, ky) if len(command.Handler) == 0 {
return nil
}
idx := command.RobinIndex % uint(len(command.Handler))
command.RobinIndex = idx + 1
raw, err := json.Marshal(&command)
if err == nil {
_, _ = kv.Kv.Put(context.Background(), ky, string(raw))
}
return command.Handler[idx]
}
func RemoveCommand(id, method string) error {
ky := CommandInfoKvPrefix + nex.GetCommandKey(id, method)
_, err := kv.Kv.Delete(context.Background(), ky)
return err
} }

View File

@ -2,77 +2,137 @@ package directory
import ( import (
"context" "context"
"git.solsynth.dev/hypernet/nexus/pkg/internal/kv"
"git.solsynth.dev/hypernet/nexus/pkg/nex" "git.solsynth.dev/hypernet/nexus/pkg/nex"
"git.solsynth.dev/hypernet/nexus/pkg/proto" "git.solsynth.dev/hypernet/nexus/pkg/proto"
"sync" "github.com/goccy/go-json"
clientv3 "go.etcd.io/etcd/client/v3"
"math/rand"
"time" "time"
) )
// In services, we use sync.Map because it will be both often read and write const ServiceInfoKvPrefix = "nexus.service/"
var serviceDirectory sync.Map
func AddServiceInstance(in *ServiceInstance) error {
key := ServiceInfoKvPrefix + in.ID
data, err := json.Marshal(in)
if err != nil {
return err
}
_, err = kv.Kv.Put(context.Background(), key, string(data))
return err
}
func GetServiceInstance(id string) *ServiceInstance { func GetServiceInstance(id string) *ServiceInstance {
val, ok := serviceDirectory.Load(id) key := ServiceInfoKvPrefix + id
if ok { resp, err := kv.Kv.Get(context.Background(), key)
return val.(*ServiceInstance) if err != nil || len(resp.Kvs) == 0 {
} else {
return nil return nil
} }
var instance ServiceInstance
err = json.Unmarshal(resp.Kvs[0].Value, &instance)
if err != nil {
return nil
} }
func GetServiceInstanceByType(t string) *ServiceInstance { return &instance
var result *ServiceInstance
serviceDirectory.Range(func(key, value any) bool {
if value.(*ServiceInstance).Type == t {
result = value.(*ServiceInstance)
return false
}
return true
})
return result
} }
func ListServiceInstance() []*ServiceInstance { func ListServiceInstance() []*ServiceInstance {
resp, err := kv.Kv.Get(context.Background(), ServiceInfoKvPrefix, clientv3.WithPrefix())
if err != nil {
return nil
}
var result []*ServiceInstance var result []*ServiceInstance
serviceDirectory.Range(func(key, value interface{}) bool { for _, val := range resp.Kvs {
result = append(result, value.(*ServiceInstance)) var instance ServiceInstance
return true if err := json.Unmarshal(val.Value, &instance); err != nil {
}) continue
}
result = append(result, &instance)
}
return result return result
} }
func ListServiceInstanceByType(t string) []*ServiceInstance { func ListServiceInstanceByType(t string) []*ServiceInstance {
var result []*ServiceInstance resp, err := kv.Kv.Get(context.Background(), ServiceInfoKvPrefix, clientv3.WithPrefix())
serviceDirectory.Range(func(key, value interface{}) bool { if err != nil {
if value.(*ServiceInstance).Type == t { return nil
result = append(result, value.(*ServiceInstance)) }
var result []*ServiceInstance
for _, val := range resp.Kvs {
var instance ServiceInstance
if err := json.Unmarshal(val.Value, &instance); err != nil {
continue
}
if instance.Type == t {
result = append(result, &instance)
}
} }
return true
})
return result return result
} }
func AddServiceInstance(in *ServiceInstance) { var srvRng = rand.New(rand.NewSource(time.Now().UnixNano()))
serviceDirectory.Store(in.ID, in)
}
func RemoveServiceInstance(id string) { func GetServiceInstanceByType(t string) *ServiceInstance {
serviceDirectory.Delete(id) resp, err := kv.Kv.Get(context.Background(), ServiceInfoKvPrefix, clientv3.WithPrefix())
}
func BroadcastEvent(event string, data any) {
serviceDirectory.Range(func(key, value any) bool {
conn, err := value.(*ServiceInstance).GetGrpcConn()
if err != nil { if err != nil {
return true return nil
}
var instances []*ServiceInstance
for _, val := range resp.Kvs {
var instance ServiceInstance
if err := json.Unmarshal(val.Value, &instance); err != nil {
continue
}
if instance.Type == t {
instances = append(instances, &instance)
}
}
if len(instances) == 0 {
return nil
}
idx := srvRng.Intn(len(instances))
return instances[idx]
}
func RemoveServiceInstance(id string) error {
key := ServiceInfoKvPrefix + id
_, err := kv.Kv.Delete(context.Background(), key)
return err
}
func BroadcastEvent(event string, data any) error {
resp, err := kv.Kv.Get(context.Background(), ServiceInfoKvPrefix, clientv3.WithPrefix())
if err != nil {
return err
}
for _, val := range resp.Kvs {
var instance ServiceInstance
if err := json.Unmarshal(val.Value, &instance); err != nil {
continue
}
conn, err := instance.GetGrpcConn()
if err != nil {
continue
} }
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
_, _ = proto.NewDirectoryServiceClient(conn).BroadcastEvent(ctx, &proto.EventInfo{ _, _ = proto.NewDirectoryServiceClient(conn).BroadcastEvent(ctx, &proto.EventInfo{
Event: event, Event: event,
Data: nex.EncodeMap(data), Data: nex.EncodeMap(data),
}) })
return true cancel()
}) }
return nil
} }

View File

@ -75,17 +75,25 @@ func (v *ServiceRpcServer) AddService(ctx context.Context, info *proto.ServiceIn
GrpcAddr: info.GetGrpcAddr(), GrpcAddr: info.GetGrpcAddr(),
HttpAddr: info.HttpAddr, HttpAddr: info.HttpAddr,
} }
AddServiceInstance(in) err = AddServiceInstance(in)
if err == nil {
log.Info().Str("id", clientId).Str("label", info.GetLabel()).Msg("New service registered") log.Info().Str("id", clientId).Str("label", info.GetLabel()).Msg("New service registered")
} else {
log.Error().Str("id", clientId).Str("label", info.GetLabel()).Err(err).Msg("Unable to register a service")
}
return &proto.AddServiceResponse{ return &proto.AddServiceResponse{
IsSuccess: true, IsSuccess: err == nil,
}, nil }, nil
} }
func (v *ServiceRpcServer) RemoveService(ctx context.Context, request *proto.RemoveServiceRequest) (*proto.RemoveServiceResponse, error) { func (v *ServiceRpcServer) RemoveService(ctx context.Context, request *proto.RemoveServiceRequest) (*proto.RemoveServiceResponse, error) {
RemoveServiceInstance(request.GetId()) err := RemoveServiceInstance(request.GetId())
log.Info().Str("id", request.GetId()).Msg("A service removed.") if err == nil {
log.Info().Str("id", request.GetId()).Msg("A service removed")
} else {
log.Error().Str("id", request.GetId()).Err(err).Msg("Unable to remove a service")
}
return &proto.RemoveServiceResponse{ return &proto.RemoveServiceResponse{
IsSuccess: true, IsSuccess: err == nil,
}, nil }, nil
} }

View File

@ -1,15 +1,15 @@
package api package api
import ( import (
directory2 "git.solsynth.dev/hypernet/nexus/pkg/internal/directory" "git.solsynth.dev/hypernet/nexus/pkg/internal/directory"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/samber/lo" "github.com/samber/lo"
) )
func listExistsService(c *fiber.Ctx) error { func listExistsService(c *fiber.Ctx) error {
services := directory2.ListServiceInstance() services := directory.ListServiceInstance()
return c.JSON(lo.Map(services, func(item *directory2.ServiceInstance, index int) map[string]any { return c.JSON(lo.Map(services, func(item *directory.ServiceInstance, index int) map[string]any {
return map[string]any{ return map[string]any{
"id": item.ID, "id": item.ID,
"type": item.Type, "type": item.Type,

View File

@ -1,6 +1,10 @@
package kv package kv
import ( import (
"context"
"fmt"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
"time" "time"
) )
@ -12,8 +16,22 @@ func ConnectEtcd(endpoints []string) error {
Endpoints: endpoints, Endpoints: endpoints,
DialTimeout: 10 * time.Second, DialTimeout: 10 * time.Second,
}) })
if err == nil { if err != nil {
Kv = conn return err
} }
var status []bool
for _, endpoint := range endpoints {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
_, err := conn.Status(ctx, endpoint)
if err != nil {
log.Warn().Str("endpoint", endpoint).Err(err).Msg("An KV endpoint is not available...")
}
status = append(status, err == nil)
cancel()
}
if len(lo.Filter(status, func(s bool, _ int) bool { return s })) == 0 {
return fmt.Errorf("unable to connect to all KV endpoints")
}
Kv = conn
return err return err
} }

View File

@ -47,12 +47,16 @@ func main() {
} }
// Connect to kv (etcd) // Connect to kv (etcd)
log.Info().Msg("Connecting to kv (etcd)...")
if err := kv.ConnectEtcd(viper.GetStringSlice("kv.endpoints")); err != nil { if err := kv.ConnectEtcd(viper.GetStringSlice("kv.endpoints")); err != nil {
log.Error().Err(err).Msg("An error occurred when connecting to kv (etcd), please check your configuration in kv section.") log.Error().Err(err).Msg("An error occurred when connecting to kv (etcd), please check your configuration in kv section.")
log.Fatal().Msg("Kv is required for service discovery and directory feature, cannot be disabled.") log.Fatal().Msg("Kv is required for service discovery and directory feature, cannot be disabled.")
} else {
log.Info().Msg("Connected to kv (etcd)!")
} }
// Connect to database // Connect to database
log.Info().Msg("Connecting to database...")
if db, err := database.Connect(viper.GetString("database.dsn")); err != nil { if db, err := database.Connect(viper.GetString("database.dsn")); err != nil {
log.Error().Err(err).Msg("An error occurred when connecting to database. Database related features will be disabled.") log.Error().Err(err).Msg("An error occurred when connecting to database. Database related features will be disabled.")
} else { } else {
@ -62,7 +66,7 @@ func main() {
log.Error().Err(err).Msg("An error occurred when querying database version. Database related features will be disabled.") log.Error().Err(err).Msg("An error occurred when querying database version. Database related features will be disabled.")
database.Kdb = nil database.Kdb = nil
} else { } else {
log.Info().Str("version", version).Msg("Connected to database") log.Info().Str("version", version).Msg("Connected to database!")
} }
} }