package dev import ( "bufio" "context" "fmt" "io" "net" "os" "os/exec" "os/signal" "strings" "sync" "syscall" "time" "git.solsynth.dev/goatworks/turbine/pkg/launchpad/config" "git.solsynth.dev/goatworks/turbine/pkg/launchpad/logview" "github.com/rs/zerolog/log" ) var colors = []string{ "\033[32m", "\033[33m", "\033[34m", "\033[35m", "\033[36m", "\033[31m", } // RunDev starts selected services defined in the config in development mode. func RunDev(cfg config.LaunchpadConfig, servicesToStart []config.Service) { log.Info().Msg("Starting services in development mode with dependency checks...") ctx, cancel := context.WithCancel(context.Background()) var wg sync.WaitGroup defer func() { log.Info().Msg("Shutting down all services...") cancel() wg.Wait() log.Info().Msg("All services have been shut down.") }() sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigChan log.Info().Msg("Shutdown signal received.") cancel() }() devNetwork := "turbine-dev-net" if len(cfg.Networks) > 0 { for name := range cfg.Networks { devNetwork = name break } } createDockerNetwork(devNetwork) serviceMap := make(map[string]config.Service) for _, s := range cfg.Services { serviceMap[s.Name] = s } started := make(map[string]chan bool) serviceNamesToStart := []string{} for _, s := range servicesToStart { serviceNamesToStart = append(serviceNamesToStart, s.Name) } log.Info().Msgf("Attempting to start: %s", strings.Join(serviceNamesToStart, ", ")) logChan := make(chan logview.LogMessage, 100) for i, s := range servicesToStart { if _, exists := started[s.Name]; !exists { color := colors[i%len(colors)] startServiceWithDeps(ctx, &wg, s, serviceMap, started, devNetwork, color, logChan) } } // Don't start the log viewer if no services were selected to run if len(servicesToStart) > 0 { if err := logview.Start(logChan, serviceNamesToStart); err != nil { log.Fatal().Err(err).Msg("Log viewer failed") } } } func startServiceWithDeps(ctx context.Context, wg *sync.WaitGroup, s config.Service, serviceMap map[string]config.Service, started map[string]chan bool, network, color string, logChan chan<- logview.LogMessage) { if _, exists := started[s.Name]; exists { return } healthyChan := make(chan bool) started[s.Name] = healthyChan var depNames []string for depName := range s.Prod.DependsOn { depNames = append(depNames, depName) } for _, depName := range depNames { if dep, ok := serviceMap[depName]; ok { // Dependencies get a default color for now startServiceWithDeps(ctx, wg, dep, serviceMap, started, network, "\033[37m", logChan) } } log.Info().Str("service", s.Name).Msgf("Waiting for dependencies to be healthy: %v", depNames) for _, depName := range depNames { if depChan, ok := started[depName]; ok { log.Info().Str("service", s.Name).Msgf("Waiting for %s...", depName) select { case <-depChan: log.Info().Str("service", s.Name).Msgf("Dependency %s is healthy.", depName) case <-ctx.Done(): log.Warn().Str("service", s.Name).Msg("Shutdown signal received, aborting startup.") return } } } wg.Add(1) go func() { defer wg.Done() var healthCheckPorts []int if s.Type == "docker" { healthCheckPorts = s.Dev.Healthcheck.TcpPorts startDockerService(ctx, s, color, network, healthCheckPorts, logChan) } else if s.Dev.Command != "" { healthCheckPorts = s.Dev.Healthcheck.TcpPorts startSourceService(ctx, s, color, logChan) } else { log.Warn().Str("service", s.Name).Msg("No dev.command or docker type, skipping.") close(healthyChan) return } waitForHealth(ctx, s.Name, healthCheckPorts) close(healthyChan) <-ctx.Done() }( ) } func waitForHealth(ctx context.Context, serviceName string, ports []int) { if len(ports) == 0 { log.Info().Str("service", serviceName).Msg("No healthcheck ports defined, assuming healthy immediately.") return } for _, port := range ports { address := fmt.Sprintf("127.0.0.1:%d", port) log.Info().Str("service", serviceName).Msgf("Waiting for %s to be available...", address) for { select { case <-ctx.Done(): log.Warn().Str("service", serviceName).Msg("Shutdown signal received, aborting health check.") return default: conn, err := net.DialTimeout("tcp", address, 1*time.Second) if err == nil { conn.Close() log.Info().Str("service", serviceName).Msgf("%s is healthy!", address) goto nextPort } time.Sleep(2 * time.Second) } } nextPort: } } func startSourceService(ctx context.Context, s config.Service, color string, logChan chan<- logview.LogMessage) { log.Info().Str("service", s.Name).Str("command", s.Dev.Command).Msg("Starting from source") parts := strings.Fields(s.Dev.Command) cmd := exec.CommandContext(ctx, parts[0], parts[1:]...) cmd.Dir = s.Path env := os.Environ() for _, e := range s.Prod.Environment { env = append(env, os.ExpandEnv(e)) } cmd.Env = env runAndMonitorCommand(ctx, cmd, s.Name, color, logChan) } func startDockerService(ctx context.Context, s config.Service, color string, network string, portsToMap []int, logChan chan<- logview.LogMessage) { log.Info().Str("service", s.Name).Str("image", s.Prod.Image).Msg("Starting from Docker image") containerName := fmt.Sprintf("%s-dev", s.Name) exec.Command("docker", "rm", "-f", containerName).Run() args := []string{"run", "--rm", "-i", "--name", containerName} if network != "" { args = append(args, "--network", network) } for _, p := range portsToMap { args = append(args, "-p", fmt.Sprintf("%d:%d", p, p)) } for _, e := range s.Prod.Environment { args = append(args, "-e", os.ExpandEnv(e)) } args = append(args, os.ExpandEnv(s.Prod.Image)) if s.Prod.Command != nil { switch v := s.Prod.Command.(type) { case string: args = append(args, strings.Fields(v)...) case []interface{}: for _, item := range v { args = append(args, fmt.Sprintf("%v", item)) } } } go func() { <-ctx.Done() log.Info().Str("service", s.Name).Msg("Stopping docker container...") stopCmd := exec.Command("docker", "stop", containerName) if err := stopCmd.Run(); err != nil { log.Warn().Err(err).Str("service", s.Name).Msg("Failed to stop container.") } }() cmd := exec.Command("docker", args...) runAndMonitorCommand(ctx, cmd, s.Name, color, logChan) } func runAndMonitorCommand(ctx context.Context, cmd *exec.Cmd, serviceName, color string, logChan chan<- logview.LogMessage) { stdout, _ := cmd.StdoutPipe() stderr, _ := cmd.StderrPipe() if err := cmd.Start(); err != nil { log.Error().Err(err).Str("service", serviceName).Msg("Failed to start") return } go streamOutput(stdout, serviceName, color, logChan) go streamOutput(stderr, serviceName, color, logChan) go func() { err := cmd.Wait() if ctx.Err() != nil { // This is expected on clean shutdown } else if err != nil { log.Error().Err(err).Str("service", serviceName).Msg("Exited with error") } else { log.Info().Str("service", serviceName).Msg("Exited") } }() } func streamOutput(pipe io.ReadCloser, serviceName, color string, logChan chan<- logview.LogMessage) { scanner := bufio.NewScanner(pipe) for scanner.Scan() { logChan <- logview.LogMessage{ServiceName: serviceName, Line: scanner.Text(), Color: color} } } func createDockerNetwork(networkName string) { log.Info().Str("network", networkName).Msg("Ensuring docker network exists") cmd := exec.Command("docker", "network", "inspect", networkName) if err := cmd.Run(); err == nil { log.Info().Str("network", networkName).Msg("Network already exists.") return } cmd = exec.Command("docker", "network", "create", networkName) if err := cmd.Run(); err != nil { log.Warn().Err(err).Msg("Could not create docker network.") } }