diff --git a/pkg/internal/directory/service.go b/pkg/internal/directory/service.go index 0a8fba6..bfe4ca9 100644 --- a/pkg/internal/directory/service.go +++ b/pkg/internal/directory/service.go @@ -1,6 +1,9 @@ package directory -import "google.golang.org/grpc" +import ( + "github.com/rs/zerolog/log" + "google.golang.org/grpc" +) type ServiceInstance struct { ID string `json:"id"` @@ -9,21 +12,24 @@ type ServiceInstance struct { GrpcAddr string `json:"grpc_addr"` HttpAddr *string `json:"http_addr"` - grpcConn *grpc.ClientConn retryCount int } +var connectionCache = make(map[string]*grpc.ClientConn) + func (v *ServiceInstance) GetGrpcConn() (*grpc.ClientConn, error) { - if v.grpcConn != nil { - return v.grpcConn, nil + if conn, ok := connectionCache[v.ID]; ok { + return conn, nil } - var err error - v.grpcConn, err = ConnectService(v) + conn, err := ConnectService(v) if err != nil { _ = RemoveServiceInstance(v.ID) + log.Error().Str("id", v.ID).Err(err).Msg("Failed to connect to service, dropped...") return nil, err + } else { + connectionCache[v.ID] = conn } - return v.grpcConn, nil + return conn, nil } diff --git a/pkg/internal/directory/validator.go b/pkg/internal/directory/validator.go new file mode 100644 index 0000000..663a9ea --- /dev/null +++ b/pkg/internal/directory/validator.go @@ -0,0 +1,35 @@ +package directory + +import "github.com/rs/zerolog/log" + +func ValidateServices() { + services := ListServiceInstance() + if len(services) == 0 { + return + } + + checklist := make(map[string]bool) + successCount := 0 + log.Info().Int("count", len(services)).Msg("Validating services...") + for _, service := range services { + if _, ok := checklist[service.GrpcAddr]; ok { + _ = RemoveServiceInstance(service.ID) + log.Warn().Str("id", service.ID).Str("addr", service.GrpcAddr).Msg("Duplicated service address, dropped...") + continue + } + // Directly use the connect method to skip cache + if _, err := ConnectService(service); err != nil { + _ = RemoveServiceInstance(service.ID) + log.Warn().Err(err).Str("id", service.ID).Str("addr", service.GrpcAddr).Msg("Unable connect to service, dropped...") + continue + } + + successCount++ + } + + log.Info(). + Int("success", successCount). + Int("failed", len(services)-successCount). + Int("total", len(services)). + Msg("Service validation completed.") +} diff --git a/pkg/main.go b/pkg/main.go index af8eff3..7d0651e 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -4,6 +4,7 @@ import ( "fmt" "git.solsynth.dev/hypernet/nexus/pkg/internal/auth" "git.solsynth.dev/hypernet/nexus/pkg/internal/database" + "git.solsynth.dev/hypernet/nexus/pkg/internal/directory" "git.solsynth.dev/hypernet/nexus/pkg/internal/http" "git.solsynth.dev/hypernet/nexus/pkg/internal/kv" "git.solsynth.dev/hypernet/nexus/pkg/nex/sec" @@ -93,6 +94,9 @@ func main() { log.Info().Msg("Internal jwt private key loaded.") } + // Post-boot actions + directory.ValidateServices() + // Server go server.NewServer().Listen()