Dealer/pkg/internal/grpc/services.go

93 lines
2.7 KiB
Go
Raw Normal View History

2024-07-14 12:25:30 +00:00
package grpc
import (
"context"
"fmt"
2024-09-19 13:19:45 +00:00
"time"
2024-07-23 09:33:38 +00:00
2024-07-14 12:25:30 +00:00
"git.solsynth.dev/hydrogen/dealer/pkg/internal/directory"
"git.solsynth.dev/hydrogen/dealer/pkg/proto"
"github.com/rs/zerolog/log"
2024-07-14 12:25:30 +00:00
"github.com/samber/lo"
)
2024-09-19 13:19:45 +00:00
func convertServiceToInfo(in *directory.ServiceInstance) *proto.ServiceInfo {
2024-07-23 09:33:38 +00:00
if in == nil {
return nil
}
2024-07-14 12:25:30 +00:00
return &proto.ServiceInfo{
Id: in.ID,
Type: in.Type,
Label: in.Label,
GrpcAddr: in.GrpcAddr,
HttpAddr: in.HttpAddr,
}
}
func (v *Server) GetService(ctx context.Context, request *proto.GetServiceRequest) (*proto.GetServiceResponse, error) {
if request.Id != nil {
out := directory.GetServiceInstance(request.GetId())
return &proto.GetServiceResponse{
2024-09-19 13:19:45 +00:00
Data: convertServiceToInfo(out),
2024-07-14 12:25:30 +00:00
}, nil
}
if request.Type != nil {
out := directory.GetServiceInstanceByType(request.GetType())
return &proto.GetServiceResponse{
2024-09-19 13:19:45 +00:00
Data: convertServiceToInfo(out),
2024-07-14 12:25:30 +00:00
}, nil
}
return nil, fmt.Errorf("no filter condition is provided")
}
func (v *Server) ListService(ctx context.Context, request *proto.ListServiceRequest) (*proto.ListServiceResponse, error) {
var out []*directory.ServiceInstance
if request.Type != nil {
out = directory.ListServiceInstanceByType(request.GetType())
} else {
out = directory.ListServiceInstance()
}
return &proto.ListServiceResponse{
Data: lo.Map(out, func(item *directory.ServiceInstance, index int) *proto.ServiceInfo {
2024-09-19 13:19:45 +00:00
return convertServiceToInfo(item)
2024-07-14 12:25:30 +00:00
}),
}, nil
}
func (v *Server) AddService(ctx context.Context, info *proto.ServiceInfo) (*proto.AddServiceResponse, error) {
in := &directory.ServiceInstance{
ID: info.GetId(),
Type: info.GetType(),
Label: info.GetLabel(),
GrpcAddr: info.GetGrpcAddr(),
HttpAddr: info.HttpAddr,
}
directory.AddServiceInstance(in)
log.Info().Str("id", info.GetId()).Str("label", info.GetLabel()).Msg("New service added.")
2024-07-14 12:25:30 +00:00
return &proto.AddServiceResponse{
IsSuccess: true,
}, nil
}
func (v *Server) RemoveService(ctx context.Context, request *proto.RemoveServiceRequest) (*proto.RemoveServiceResponse, error) {
directory.RemoveServiceInstance(request.GetId())
log.Info().Str("id", request.GetId()).Msg("A service removed.")
2024-07-14 12:25:30 +00:00
return &proto.RemoveServiceResponse{
IsSuccess: true,
}, nil
}
2024-09-19 13:19:45 +00:00
// BroadcastDeletion forwards the deletion request to every service instance
// returned by directory.ListServiceInstance.
//
// Delivery is best-effort: a service whose gRPC connection cannot be obtained
// is skipped, and a failed downstream call does not stop the loop. Both kinds
// of failure are logged as warnings instead of being silently dropped. The
// handler itself always returns an empty response and a nil error.
func (v *Server) BroadcastDeletion(ctx context.Context, request *proto.DeletionRequest) (*proto.DeletionResponse, error) {
	for _, service := range directory.ListServiceInstance() {
		conn, err := service.GetGrpcConn()
		if err != nil {
			log.Warn().Err(err).Str("id", service.ID).Msg("Unable to connect to service, deletion broadcast skipped.")
			continue
		}

		// Each downstream call gets its own 5-second budget. The context is
		// built from context.Background(), as in the original code, so the
		// fan-out is not tied to the caller's cancellation.
		// NOTE(review): presumably intentional; confirm whether it should derive from ctx.
		callCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		_, err = proto.NewServiceDirectoryClient(conn).BroadcastDeletion(callCtx, request)
		// Release the timer right away; a defer would pile up until the loop ends.
		cancel()
		if err != nil {
			log.Warn().Err(err).Str("id", service.ID).Msg("Unable to broadcast deletion to service.")
		}
	}

	return &proto.DeletionResponse{}, nil
}