85 lines
1.8 KiB
Go
85 lines
1.8 KiB
Go
// Package registrar registers service instances in etcd for service discovery.
|
|
package registrar
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
clientv3 "go.etcd.io/etcd/client/v3"
|
|
)
|
|
|
|
type ServiceRegistrar struct {
|
|
client *clientv3.Client
|
|
leaseID clientv3.LeaseID
|
|
ttl int64
|
|
key string
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// NewServiceRegistrar creates a registrar with the given etcd client
|
|
func NewServiceRegistrar(endpoints []string) (*ServiceRegistrar, error) {
|
|
cfg := clientv3.Config{
|
|
Endpoints: endpoints,
|
|
DialTimeout: 5 * time.Second,
|
|
}
|
|
|
|
cli, err := clientv3.New(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &ServiceRegistrar{client: cli}, nil
|
|
}
|
|
|
|
// Register service with etcd using TTL lease
|
|
func (r *ServiceRegistrar) Register(serviceName string, servicePart string, instanceID string, host string, port int, ttlSeconds int64) error {
|
|
r.ttl = ttlSeconds
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
|
defer cancel()
|
|
|
|
r.key = fmt.Sprintf("/services/%s/%s/%s", serviceName, servicePart, instanceID)
|
|
|
|
// Create lease
|
|
leaseResp, err := r.client.Grant(ctx, r.ttl)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r.leaseID = leaseResp.ID
|
|
|
|
// Put key with lease
|
|
_, err = r.client.Put(ctx, r.key, fmt.Sprintf("%s:%d", host, port), clientv3.WithLease(r.leaseID))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Keep alive context
|
|
kaCtx, kaCancel := context.WithCancel(context.Background())
|
|
r.cancel = kaCancel
|
|
|
|
// Keep lease alive
|
|
go func() {
|
|
ch, err := r.client.KeepAlive(kaCtx, r.leaseID)
|
|
if err != nil {
|
|
fmt.Printf("KeepAlive error: %v\n", err)
|
|
return
|
|
}
|
|
for range ch {
|
|
// optionally inspect TTL updates here
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Deregister removes the registered key and stops keep-alive
|
|
func (r *ServiceRegistrar) Deregister() error {
|
|
if r.cancel != nil {
|
|
r.cancel()
|
|
}
|
|
|
|
_, err := r.client.Delete(context.Background(), r.key)
|
|
return err
|
|
}
|