119 lines
3.3 KiB
Go
119 lines
3.3 KiB
Go
package directory
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"git.solsynth.dev/hypernet/nexus/pkg/proto"
|
|
"github.com/rs/zerolog/log"
|
|
"github.com/samber/lo"
|
|
)
|
|
|
|
type ServiceRpcServer struct {
|
|
proto.UnimplementedDirectoryServiceServer
|
|
}
|
|
|
|
func instantiationService(in *ServiceInstance) *proto.ServiceInfo {
|
|
if in == nil {
|
|
return nil
|
|
}
|
|
return &proto.ServiceInfo{
|
|
Id: in.ID,
|
|
Type: in.Type,
|
|
Label: in.Label,
|
|
GrpcAddr: in.GrpcAddr,
|
|
HttpAddr: in.HttpAddr,
|
|
}
|
|
}
|
|
|
|
func (v *ServiceRpcServer) GetService(ctx context.Context, request *proto.GetServiceRequest) (*proto.GetServiceResponse, error) {
|
|
if request.Id != nil {
|
|
out := GetServiceInstance(request.GetId())
|
|
return &proto.GetServiceResponse{
|
|
Data: instantiationService(out),
|
|
}, nil
|
|
}
|
|
if request.Type != nil {
|
|
out := GetServiceInstanceByType(request.GetType())
|
|
return &proto.GetServiceResponse{
|
|
Data: instantiationService(out),
|
|
}, nil
|
|
}
|
|
return nil, fmt.Errorf("no filter condition is provided")
|
|
}
|
|
|
|
func (v *ServiceRpcServer) ListService(ctx context.Context, request *proto.ListServiceRequest) (*proto.ListServiceResponse, error) {
|
|
var out []*ServiceInstance
|
|
if request.Type != nil {
|
|
out = ListServiceInstanceByType(request.GetType())
|
|
} else {
|
|
out = ListServiceInstance()
|
|
}
|
|
return &proto.ListServiceResponse{
|
|
Data: lo.Map(out, func(item *ServiceInstance, index int) *proto.ServiceInfo {
|
|
return instantiationService(item)
|
|
}),
|
|
}, nil
|
|
}
|
|
|
|
func (v *ServiceRpcServer) AddService(ctx context.Context, info *proto.ServiceInfo) (*proto.AddServiceResponse, error) {
|
|
clientId, err := GetClientId(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if info.GetId() != clientId {
|
|
return nil, status.Errorf(codes.InvalidArgument, "client_id mismatch in metadata")
|
|
} else if len(clientId) == 0 {
|
|
return nil, status.Errorf(codes.InvalidArgument, "client_id must not be blank")
|
|
}
|
|
|
|
in := &ServiceInstance{
|
|
ID: clientId,
|
|
Type: info.GetType(),
|
|
Label: info.GetLabel(),
|
|
GrpcAddr: info.GetGrpcAddr(),
|
|
HttpAddr: info.HttpAddr,
|
|
}
|
|
err = AddServiceInstance(in)
|
|
if err == nil {
|
|
log.Info().Str("id", clientId).Str("label", info.GetLabel()).Msg("New service registered")
|
|
} else {
|
|
log.Error().Str("id", clientId).Str("label", info.GetLabel()).Err(err).Msg("Unable to register a service")
|
|
}
|
|
return &proto.AddServiceResponse{
|
|
IsSuccess: err == nil,
|
|
}, nil
|
|
}
|
|
|
|
func (v *ServiceRpcServer) RemoveService(ctx context.Context, request *proto.RemoveServiceRequest) (*proto.RemoveServiceResponse, error) {
|
|
err := RemoveServiceInstance(request.GetId())
|
|
if err == nil {
|
|
log.Info().Str("id", request.GetId()).Msg("A service removed")
|
|
} else {
|
|
log.Error().Str("id", request.GetId()).Err(err).Msg("Unable to remove a service")
|
|
}
|
|
return &proto.RemoveServiceResponse{
|
|
IsSuccess: err == nil,
|
|
}, nil
|
|
}
|
|
|
|
func (v *ServiceRpcServer) BroadcastEvent(ctx context.Context, in *proto.EventInfo) (*proto.EventResponse, error) {
|
|
services := ListServiceInstance()
|
|
log.Debug().
|
|
Int("destinations", len(services)).
|
|
Str("event", in.GetEvent()).
|
|
Msg("Broadcasting event...")
|
|
for _, service := range services {
|
|
conn, err := service.GetGrpcConn()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
_, _ = proto.NewDirectoryServiceClient(conn).BroadcastEvent(ctx, in)
|
|
}
|
|
|
|
return &proto.EventResponse{}, nil
|
|
}
|