170 lines
4.5 KiB
Go
170 lines
4.5 KiB
Go
// Package registrar provides service discovery backed by etcd, including round-robin selection of HTTP service instances.
|
|
package registrar
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
)
|
|
|
|
// ServiceDiscovery handles discovering services from etcd.
|
|
type ServiceDiscovery struct {
|
|
client *clientv3.Client
|
|
serviceRoutes map[string]*ServiceRoute
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
// ServiceRoute contains routing information for a service: its instance
// addresses and a round-robin cursor into them.
type ServiceRoute struct {
	// Instances holds the instance addresses for the service. It may be
	// replaced wholesale (possibly with a shorter slice) when the routing
	// map is rebuilt.
	Instances []string

	// next is the position GetNextInstance will use on its next call. It is
	// reduced modulo len(Instances) on use, so it is harmless if Instances
	// shrank since the previous call.
	next int

	// mutex guards next and the read of Instances in GetNextInstance.
	mutex sync.Mutex
}

// GetNextInstance returns the next available instance address using
// round-robin, or "" when the route has no instances.
//
// The cursor is wrapped into range before indexing: if Instances was replaced
// by a shorter slice since the last call, a stale cursor would otherwise index
// past the end and panic.
func (sr *ServiceRoute) GetNextInstance() string {
	sr.mutex.Lock()
	defer sr.mutex.Unlock()

	n := len(sr.Instances)
	if n == 0 {
		return ""
	}

	i := sr.next % n
	sr.next = (i + 1) % n
	return sr.Instances[i]
}
|
|
|
|
// NewServiceDiscovery creates a new ServiceDiscovery client.
|
|
func NewServiceDiscovery(endpoints []string) (*ServiceDiscovery, error) {
|
|
cfg := clientv3.Config{
|
|
Endpoints: endpoints,
|
|
DialTimeout: 5 * time.Second,
|
|
}
|
|
|
|
cli, err := clientv3.New(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &ServiceDiscovery{
|
|
client: cli,
|
|
serviceRoutes: make(map[string]*ServiceRoute),
|
|
}, nil
|
|
}
|
|
|
|
// Start initializes the service discovery by fetching all services and starting a watch for updates.
|
|
func (sd *ServiceDiscovery) Start() error {
|
|
if err := sd.fetchAllServices(); err != nil {
|
|
return fmt.Errorf("failed to fetch initial services: %w", err)
|
|
}
|
|
|
|
go sd.watchServices()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (sd *ServiceDiscovery) fetchAllServices() error {
|
|
sd.mutex.Lock()
|
|
defer sd.mutex.Unlock()
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
keyPrefix := "/services/"
|
|
resp, err := sd.client.Get(ctx, keyPrefix, clientv3.WithPrefix())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
newRoutes := make(map[string][]string)
|
|
for _, kv := range resp.Kvs {
|
|
parts := strings.Split(strings.TrimPrefix(string(kv.Key), "/services/"), "/")
|
|
if len(parts) < 3 {
|
|
continue // Invalid key format
|
|
}
|
|
serviceName := parts[0]
|
|
servicePart := parts[1]
|
|
|
|
// The gateway is only concerned with http services.
|
|
if servicePart != "http" {
|
|
continue
|
|
}
|
|
|
|
if _, ok := newRoutes[serviceName]; !ok {
|
|
newRoutes[serviceName] = []string{}
|
|
}
|
|
newRoutes[serviceName] = append(newRoutes[serviceName], string(kv.Value))
|
|
}
|
|
|
|
// Update the main serviceRoutes map
|
|
// Remove services that no longer exist
|
|
for serviceName := range sd.serviceRoutes {
|
|
if _, ok := newRoutes[serviceName]; !ok {
|
|
delete(sd.serviceRoutes, serviceName)
|
|
log.Info().Str("service", serviceName).Msg("Service removed from routing map")
|
|
}
|
|
}
|
|
|
|
// Add new or update existing services
|
|
for serviceName, instances := range newRoutes {
|
|
if _, ok := sd.serviceRoutes[serviceName]; !ok {
|
|
sd.serviceRoutes[serviceName] = &ServiceRoute{
|
|
next: 0,
|
|
}
|
|
log.Info().Str("service", serviceName).Msg("New service added to routing map")
|
|
}
|
|
sd.serviceRoutes[serviceName].Instances = instances
|
|
log.Info().Str("service", serviceName).Strs("instances", instances).Msg("Service instances updated")
|
|
}
|
|
|
|
log.Info().Msgf("Service routes reloaded. Total services: %d", len(sd.serviceRoutes))
|
|
return nil
|
|
}
|
|
|
|
func (sd *ServiceDiscovery) watchServices() {
|
|
keyPrefix := "/services/"
|
|
rch := sd.client.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
|
|
|
|
log.Info().Msg("Watching for service changes in etcd...")
|
|
|
|
for wresp := range rch {
|
|
for _, ev := range wresp.Events {
|
|
log.Info().
|
|
Str("type", ev.Type.String()).
|
|
Str("key", string(ev.Kv.Key)).
|
|
Msg("Service change detected, rebuilding routing map.")
|
|
if err := sd.fetchAllServices(); err != nil {
|
|
log.Error().Err(err).Msg("Failed to rebuild service map on watch event")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetNextInstance finds the next instance for a given service.
|
|
func (sd *ServiceDiscovery) GetNextInstance(serviceName string) (string, error) {
|
|
sd.mutex.RLock()
|
|
defer sd.mutex.RUnlock()
|
|
|
|
route, ok := sd.serviceRoutes[serviceName]
|
|
if !ok || len(route.Instances) == 0 {
|
|
return "", fmt.Errorf("service not found or has no instances: %s", serviceName)
|
|
}
|
|
|
|
return route.GetNextInstance(), nil
|
|
}
|
|
|
|
// GetServiceRoutes returns a copy of the current service routes for inspection.
|
|
func (sd *ServiceDiscovery) GetServiceRoutes() map[string][]string {
|
|
sd.mutex.RLock()
|
|
defer sd.mutex.RUnlock()
|
|
|
|
routes := make(map[string][]string)
|
|
for name, route := range sd.serviceRoutes {
|
|
routes[name] = route.Instances
|
|
}
|
|
return routes
|
|
} |