// Package registrar is for the service discovery system package registrar import ( "context" "fmt" "strings" "sync" "time" "github.com/rs/zerolog/log" clientv3 "go.etcd.io/etcd/client/v3" ) // ServiceDiscovery handles discovering services from etcd. type ServiceDiscovery struct { client *clientv3.Client serviceRoutes map[string]*ServiceRoute mutex sync.RWMutex } // ServiceRoute contains routing information for a service. type ServiceRoute struct { Instances []string next int mutex sync.Mutex } // GetNextInstance returns the next available instance address using round-robin. func (sr *ServiceRoute) GetNextInstance() string { sr.mutex.Lock() defer sr.mutex.Unlock() if len(sr.Instances) == 0 { return "" } instance := sr.Instances[sr.next] sr.next = (sr.next + 1) % len(sr.Instances) return instance } // NewServiceDiscovery creates a new ServiceDiscovery client. func NewServiceDiscovery(endpoints []string) (*ServiceDiscovery, error) { cfg := clientv3.Config{ Endpoints: endpoints, DialTimeout: 5 * time.Second, } cli, err := clientv3.New(cfg) if err != nil { return nil, err } return &ServiceDiscovery{ client: cli, serviceRoutes: make(map[string]*ServiceRoute), }, nil } // Start initializes the service discovery by fetching all services and starting a watch for updates. func (sd *ServiceDiscovery) Start() error { if err := sd.fetchAllServices(); err != nil { return fmt.Errorf("failed to fetch initial services: %w", err) } go sd.watchServices() return nil } func (sd *ServiceDiscovery) fetchAllServices() error { sd.mutex.Lock() defer sd.mutex.Unlock() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() keyPrefix := "/services/" resp, err := sd.client.Get(ctx, keyPrefix, clientv3.WithPrefix()) if err != nil { return err } newRoutes := make(map[string][]string) for _, kv := range resp.Kvs { parts := strings.Split(strings.TrimPrefix(string(kv.Key), "/services/"), "/") if len(parts) < 3 { continue // Invalid key format } serviceName := parts[0] servicePart := parts[1] // The gateway is only concerned with http services. if servicePart != "http" { continue } if _, ok := newRoutes[serviceName]; !ok { newRoutes[serviceName] = []string{} } newRoutes[serviceName] = append(newRoutes[serviceName], string(kv.Value)) } // Update the main serviceRoutes map // Remove services that no longer exist for serviceName := range sd.serviceRoutes { if _, ok := newRoutes[serviceName]; !ok { delete(sd.serviceRoutes, serviceName) log.Info().Str("service", serviceName).Msg("Service removed from routing map") } } // Add new or update existing services for serviceName, instances := range newRoutes { if _, ok := sd.serviceRoutes[serviceName]; !ok { sd.serviceRoutes[serviceName] = &ServiceRoute{ next: 0, } log.Info().Str("service", serviceName).Msg("New service added to routing map") } sd.serviceRoutes[serviceName].Instances = instances log.Info().Str("service", serviceName).Strs("instances", instances).Msg("Service instances updated") } log.Info().Msgf("Service routes reloaded. Total services: %d", len(sd.serviceRoutes)) return nil } func (sd *ServiceDiscovery) watchServices() { keyPrefix := "/services/" rch := sd.client.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) log.Info().Msg("Watching for service changes in etcd...") for wresp := range rch { for _, ev := range wresp.Events { log.Info(). Str("type", ev.Type.String()). Str("key", string(ev.Kv.Key)). Msg("Service change detected, rebuilding routing map.") if err := sd.fetchAllServices(); err != nil { log.Error().Err(err).Msg("Failed to rebuild service map on watch event") } } } } // GetNextInstance finds the next instance for a given service. func (sd *ServiceDiscovery) GetNextInstance(serviceName string) (string, error) { sd.mutex.RLock() defer sd.mutex.RUnlock() route, ok := sd.serviceRoutes[serviceName] if !ok || len(route.Instances) == 0 { return "", fmt.Errorf("service not found or has no instances: %s", serviceName) } return route.GetNextInstance(), nil } // GetServiceRoutes returns a copy of the current service routes for inspection. func (sd *ServiceDiscovery) GetServiceRoutes() map[string][]string { sd.mutex.RLock() defer sd.mutex.RUnlock() routes := make(map[string][]string) for name, route := range sd.serviceRoutes { routes[name] = route.Instances } return routes }