diff --git a/README.md b/README.md index 19ad48b..91b8330 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,114 @@ # Turbine -Turbine is a set of infrastructure services such as the Gateway that built with Golang -but designed for the Solar Network. +A modular service framework. + +## Registrar + +The Registrar is the service discovery system of the DysonNetwork. +Here are a port to the Golang in order to support other Golang services. + +To use the system, try build with these API: + +```go +package main + +import ( + "log" + "os" + "os/signal" + "syscall" + + "yourmodule/registry" +) + +func main() { + endpoints := []string{"localhost:2379"} + + registrar, err := registry.NewServiceRegistrar(endpoints) + if err != nil { + log.Fatalf("Error creating registrar: %v", err) + } + + serviceName := "orders" + host := "10.0.0.5" + port := 5000 + ttl := int64(30) + + err = registrar.Register(serviceName, "http", "instance-1", host, port, ttl) + if err != nil { + log.Fatalf("Register error: %v", err) + } + log.Println("Service registered") + + // Wait for termination + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + <-stop + + err = registrar.Deregister() + if err != nil { + log.Printf("Deregister error: %v", err) + } else { + log.Println("Service deregistered") + } +} +``` + +## Gateway + +The gateway is the entry point for all requests. It uses the `registrar` to discover services in real-time from etcd and forwards requests to the appropriate service instances. + +### Features +- **Service Discovery**: Automatically discovers `http` services from etcd. +- **Dynamic Routing**: Maintains an in-memory routing table that is automatically updated when services are added or removed. +- **Request Proxying**: Forwards incoming requests to the correct service instance based on the URL path (`//...`). +- **Load Balancing**: Implements round-robin load balancing across service instances. +- **Route Overrides**: Allows for custom routing rules to be defined in the configuration file. + +### Configuration + +The gateway is configured via a `settings.toml` file located in the same directory. + +```toml +# The address the gateway will listen on +listen = ":8080" + +# ETCD configuration for service discovery +[etcd] +endpoints = ["127.0.0.1:2379"] +# Set to true if your etcd server does not use TLS +insecure = true + +# Custom route overrides +# The key is the incoming path prefix. +# The value is the destination in the format "//" +[routes] +"/websocket" = "/chatter/ws" +``` + +## Config Service + +The config service provides a centralized location for other services to fetch their configuration. This is useful for managing connection strings, feature flags, and other shared parameters without hardcoding them into each service. + +### Usage +The config service reads a `shared_config.toml` file from its own directory (`pkg/config`) and serves it as a JSON object over a simple HTTP endpoint. + +To retrieve the configuration, other services can make a GET request to the gateway at `/config`. The gateway will route the request to an available instance of the config service. + +**Example with curl:** +```bash +curl http://localhost:8080/config +``` + +**Expected Response (JSON):** +```json +{ + "database": { + "connection_string": "postgres://user:password@db-host:5432/mydatabase?sslmode=require" + }, + "redis": { + "address": "redis-host:6379" + } +} +``` + diff --git a/pkg/gateway/main.go b/pkg/gateway/main.go index 6756c68..1485193 100644 --- a/pkg/gateway/main.go +++ b/pkg/gateway/main.go @@ -1,40 +1,165 @@ package main import ( + "fmt" "os" + "strings" + + "git.solsynth.dev/goatworks/turbine/pkg/shared/registrar" "github.com/gofiber/fiber/v3" + "github.com/gofiber/fiber/v3/middleware/proxy" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/viper" ) +var serviceDiscovery *registrar.ServiceDiscovery + func init() { log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) } func main() { + log.Info().Msg("Starting Turbine Gateway...") viper.SetConfigName("settings") viper.AddConfigPath(".") - viper.AddConfigPath("..") + viper.AddConfigPath("/etc/turbine/") viper.SetConfigType("toml") + + log.Info().Msg("Reading configuration...") if err := viper.ReadInConfig(); err != nil { log.Fatal().Err(err).Msg("Failed to read config file...") } + log.Info().Msg("Configuration loaded.") + + // Setup service discovery + etcdEndpoints := viper.GetStringSlice("etcd.endpoints") + if len(etcdEndpoints) == 0 { + log.Fatal().Msg("etcd.endpoints not configured in settings.toml") + } + + if viper.GetBool("etcd.insecure") { + log.Info().Msg("Using insecure transport for etcd") + for i, ep := range etcdEndpoints { + if !strings.HasPrefix(ep, "http://") && !strings.HasPrefix(ep, "https://") { + etcdEndpoints[i] = "http://" + ep + } + } + } + + log.Info().Strs("endpoints", etcdEndpoints).Msg("Connecting to etcd...") + + var err error + serviceDiscovery, err = registrar.NewServiceDiscovery(etcdEndpoints) + if err != nil { + log.Fatal().Err(err).Msg("Failed to create service discovery client") + } + log.Info().Msg("Service discovery client created.") + + log.Info().Msg("Fetching initial service list...") + if err := serviceDiscovery.Start(); err != nil { + log.Fatal().Err(err).Msg("Failed to start service discovery") + } + log.Info().Msg("Service discovery started.") app := fiber.New(fiber.Config{ - ServerHeader: "Turbine", - BodyLimit: 2147483647, + ServerHeader: "Turbine Gateway", + BodyLimit: 2147483647, // 2GB }) + // Health check and status app.Get("/", func(c fiber.Ctx) error { - return c.SendString("Hello, World!") + return c.JSON(fiber.Map{ + "status": "running", + "services": serviceDiscovery.GetServiceRoutes(), + }) + }) + + // Handle route overrides first + routeOverrides := viper.GetStringMapString("routes") + for from, to := range routeOverrides { + toParts := strings.SplitN(strings.TrimPrefix(to, "/"), "/", 2) + if len(toParts) < 1 { + log.Warn().Str("from", from).Str("to", to).Msg("Invalid route override config") + continue + } + serviceName := toParts[0] + var remainingPath string + if len(toParts) > 1 { + remainingPath = toParts[1] + } + + log.Info().Str("from", from).Str("to", to).Msg("Applying route override") + + app.Use(from, func(c fiber.Ctx) error { + instance, err := serviceDiscovery.GetNextInstance(serviceName) + if err != nil { + log.Error().Err(err).Str("service", serviceName).Msg("Failed to get service instance for override") + return c.Status(fiber.StatusServiceUnavailable).SendString(err.Error()) + } + + targetURL := fmt.Sprintf("http://%s/%s", instance, remainingPath) + + originalPath := strings.TrimPrefix(c.Path(), from) + if originalPath != "" { + targetURL = fmt.Sprintf("%s%s", targetURL, originalPath) + } + + if len(c.Request().URI().QueryString()) > 0 { + targetURL = fmt.Sprintf("%s?%s", targetURL, string(c.Request().URI().QueryString())) + } + + log.Info().Str("from", c.Path()).Str("to", targetURL).Msg("Forwarding with override") + + if err := proxy.Do(c, targetURL); err != nil { + return err + } + c.Response().SetStatusCode(fiber.StatusOK) + return nil + }) + } + + // Generic proxy handler + app.Use("/*", func(c fiber.Ctx) error { + path := c.Path() + parts := strings.Split(strings.TrimPrefix(path, "/"), "/") + if len(parts) < 1 || parts[0] == "" { + // Let the health check handle this + return c.Next() + } + + serviceName := parts[0] + remainingPath := strings.Join(parts[1:], "/") + + instance, err := serviceDiscovery.GetNextInstance(serviceName) + if err != nil { + log.Warn().Err(err).Str("service", serviceName).Msg("Failed to get service instance") + return c.Status(fiber.StatusServiceUnavailable).SendString(err.Error()) + } + + targetURL := fmt.Sprintf("http://%s/%s", instance, remainingPath) + if len(c.Request().URI().QueryString()) > 0 { + targetURL = fmt.Sprintf("%s?%s", targetURL, string(c.Request().URI().QueryString())) + } + + log.Info().Str("from", path).Str("to", targetURL).Msg("Forwarding request") + + if err := proxy.Do(c, targetURL); err != nil { + return err + } + + c.Response().SetStatusCode(fiber.StatusOK) + return nil }) listenAddr := viper.GetString("listen") - log.Info().Msg("Listening on " + listenAddr) + if listenAddr == "" { + listenAddr = ":8080" // default + } + log.Info().Msg("Gateway is listening on " + listenAddr) - err := app.Listen(listenAddr, fiber.ListenConfig{ + err = app.Listen(listenAddr, fiber.ListenConfig{ DisableStartupMessage: true, }) if err != nil { diff --git a/pkg/gateway/settings.toml b/pkg/gateway/settings.toml index a918485..bbfeeb0 100644 --- a/pkg/gateway/settings.toml +++ b/pkg/gateway/settings.toml @@ -1 +1,11 @@ listen = ":2999" + +[etcd] +endpoints = ["etcd.orb.local:2379"] +insecure = true + +# Route overrides. The key is the incoming path prefix. +# The value is the destination in the format "//" +[routes] +"/websocket" = "/chatter/ws" + diff --git a/pkg/shared/registrar/retirever.go b/pkg/shared/registrar/retirever.go deleted file mode 100644 index 1553f32..0000000 --- a/pkg/shared/registrar/retirever.go +++ /dev/null @@ -1,21 +0,0 @@ -package registrar - -import ( - "context" - "fmt" - - clientv3 "go.etcd.io/etcd/client/v3" -) - -func (r *ServiceRegistrar) ListInstance(serviceName string, servicePart string) ([]string, error) { - keyPrefix := fmt.Sprintf("/services/%s/%s", serviceName, servicePart) - resp, err := r.client.Get(context.Background(), keyPrefix, clientv3.WithPrefix()) - if err != nil { - return nil, err - } - var result []string - for _, kv := range resp.Kvs { - result = append(result, string(kv.Value)) - } - return result, nil -} diff --git a/pkg/shared/registrar/retriever.go b/pkg/shared/registrar/retriever.go new file mode 100644 index 0000000..25ec127 --- /dev/null +++ b/pkg/shared/registrar/retriever.go @@ -0,0 +1,170 @@ +// Package registrar is for the service discovery system +package registrar + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/rs/zerolog/log" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// ServiceDiscovery handles discovering services from etcd. +type ServiceDiscovery struct { + client *clientv3.Client + serviceRoutes map[string]*ServiceRoute + mutex sync.RWMutex +} + +// ServiceRoute contains routing information for a service. +type ServiceRoute struct { + Instances []string + next int + mutex sync.Mutex +} + +// GetNextInstance returns the next available instance address using round-robin. +func (sr *ServiceRoute) GetNextInstance() string { + sr.mutex.Lock() + defer sr.mutex.Unlock() + if len(sr.Instances) == 0 { + return "" + } + instance := sr.Instances[sr.next] + sr.next = (sr.next + 1) % len(sr.Instances) + return instance +} + +// NewServiceDiscovery creates a new ServiceDiscovery client. +func NewServiceDiscovery(endpoints []string) (*ServiceDiscovery, error) { + cfg := clientv3.Config{ + Endpoints: endpoints, + DialTimeout: 5 * time.Second, + } + + cli, err := clientv3.New(cfg) + if err != nil { + return nil, err + } + + return &ServiceDiscovery{ + client: cli, + serviceRoutes: make(map[string]*ServiceRoute), + }, nil +} + +// Start initializes the service discovery by fetching all services and starting a watch for updates. +func (sd *ServiceDiscovery) Start() error { + if err := sd.fetchAllServices(); err != nil { + return fmt.Errorf("failed to fetch initial services: %w", err) + } + + go sd.watchServices() + + return nil +} + +func (sd *ServiceDiscovery) fetchAllServices() error { + sd.mutex.Lock() + defer sd.mutex.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + keyPrefix := "/services/" + resp, err := sd.client.Get(ctx, keyPrefix, clientv3.WithPrefix()) + if err != nil { + return err + } + + newRoutes := make(map[string][]string) + for _, kv := range resp.Kvs { + parts := strings.Split(strings.TrimPrefix(string(kv.Key), "/services/"), "/") + if len(parts) < 3 { + continue // Invalid key format + } + serviceName := parts[0] + servicePart := parts[1] + + // The gateway is only concerned with http services. + if servicePart != "http" { + continue + } + + if _, ok := newRoutes[serviceName]; !ok { + newRoutes[serviceName] = []string{} + } + newRoutes[serviceName] = append(newRoutes[serviceName], string(kv.Value)) + } + + // Update the main serviceRoutes map + // Remove services that no longer exist + for serviceName := range sd.serviceRoutes { + if _, ok := newRoutes[serviceName]; !ok { + delete(sd.serviceRoutes, serviceName) + log.Info().Str("service", serviceName).Msg("Service removed from routing map") + } + } + + // Add new or update existing services + for serviceName, instances := range newRoutes { + if _, ok := sd.serviceRoutes[serviceName]; !ok { + sd.serviceRoutes[serviceName] = &ServiceRoute{ + next: 0, + } + log.Info().Str("service", serviceName).Msg("New service added to routing map") + } + sd.serviceRoutes[serviceName].Instances = instances + log.Info().Str("service", serviceName).Strs("instances", instances).Msg("Service instances updated") + } + + log.Info().Msgf("Service routes reloaded. Total services: %d", len(sd.serviceRoutes)) + return nil +} + +func (sd *ServiceDiscovery) watchServices() { + keyPrefix := "/services/" + rch := sd.client.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) + + log.Info().Msg("Watching for service changes in etcd...") + + for wresp := range rch { + for _, ev := range wresp.Events { + log.Info(). + Str("type", ev.Type.String()). + Str("key", string(ev.Kv.Key)). + Msg("Service change detected, rebuilding routing map.") + if err := sd.fetchAllServices(); err != nil { + log.Error().Err(err).Msg("Failed to rebuild service map on watch event") + } + } + } +} + +// GetNextInstance finds the next instance for a given service. +func (sd *ServiceDiscovery) GetNextInstance(serviceName string) (string, error) { + sd.mutex.RLock() + defer sd.mutex.RUnlock() + + route, ok := sd.serviceRoutes[serviceName] + if !ok || len(route.Instances) == 0 { + return "", fmt.Errorf("service not found or has no instances: %s", serviceName) + } + + return route.GetNextInstance(), nil +} + +// GetServiceRoutes returns a copy of the current service routes for inspection. +func (sd *ServiceDiscovery) GetServiceRoutes() map[string][]string { + sd.mutex.RLock() + defer sd.mutex.RUnlock() + + routes := make(map[string][]string) + for name, route := range sd.serviceRoutes { + routes[name] = route.Instances + } + return routes +} \ No newline at end of file