Files
Turbine/pkg/shared/registrar/mod.go
2025-12-13 13:47:10 +08:00

85 lines
1.8 KiB
Go

// Package registrar is for the service discovery system
package registrar
import (
"context"
"fmt"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
type ServiceRegistrar struct {
client *clientv3.Client
leaseID clientv3.LeaseID
ttl int64
key string
cancel context.CancelFunc
}
// NewServiceRegistrar creates a registrar with the given etcd client
func NewServiceRegistrar(endpoints []string) (*ServiceRegistrar, error) {
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
}
cli, err := clientv3.New(cfg)
if err != nil {
return nil, err
}
return &ServiceRegistrar{client: cli}, nil
}
// Register service with etcd using TTL lease
func (r *ServiceRegistrar) Register(serviceName string, servicePart string, instanceID string, host string, port int, ttlSeconds int64) error {
r.ttl = ttlSeconds
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
r.key = fmt.Sprintf("/services/%s/%s/%s", serviceName, servicePart, instanceID)
// Create lease
leaseResp, err := r.client.Grant(ctx, r.ttl)
if err != nil {
return err
}
r.leaseID = leaseResp.ID
// Put key with lease
_, err = r.client.Put(ctx, r.key, fmt.Sprintf("%s:%d", host, port), clientv3.WithLease(r.leaseID))
if err != nil {
return err
}
// Keep alive context
kaCtx, kaCancel := context.WithCancel(context.Background())
r.cancel = kaCancel
// Keep lease alive
go func() {
ch, err := r.client.KeepAlive(kaCtx, r.leaseID)
if err != nil {
fmt.Printf("KeepAlive error: %v\n", err)
return
}
for range ch {
// optionally inspect TTL updates here
}
}()
return nil
}
// Deregister removes the registered key and stops keep-alive
func (r *ServiceRegistrar) Deregister() error {
if r.cancel != nil {
r.cancel()
}
_, err := r.client.Delete(context.Background(), r.key)
return err
}