Exit from register and discover gracefully is rendezvous was stopped (#1238)
This commit is contained in:
parent
c86f8bf6ca
commit
659d79277b
|
@ -24,8 +24,9 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
errNodeIsNil = errors.New("node cannot be nil")
|
||||
errIdentityIsNil = errors.New("identity cannot be nil")
|
||||
errNodeIsNil = errors.New("node cannot be nil")
|
||||
errIdentityIsNil = errors.New("identity cannot be nil")
|
||||
errDiscoveryIsStopped = errors.New("discovery is stopped")
|
||||
)
|
||||
|
||||
func NewRendezvous(servers []ma.Multiaddr, identity *ecdsa.PrivateKey, node *discover.Node) (*Rendezvous, error) {
|
||||
|
@ -126,7 +127,9 @@ func (r *Rendezvous) register(topic string, record enr.Record) error {
|
|||
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
if r.client == nil {
|
||||
return errDiscoveryIsStopped
|
||||
}
|
||||
err := r.client.Register(ctx, srv, topic, record, r.registrationPeriod)
|
||||
if err != nil {
|
||||
log.Error("error registering", "topic", topic, "rendezvous server", srv, "err", err)
|
||||
|
@ -156,11 +159,24 @@ func (r *Rendezvous) Register(topic string, stop chan struct{}) error {
|
|||
case <-ticker.C:
|
||||
if err := r.register(topic, record); err == context.Canceled {
|
||||
return err
|
||||
} else if err == errDiscoveryIsStopped {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Rendezvous) discoverRequest(srv ma.Multiaddr, topic string) ([]enr.Record, error) {
|
||||
ctx, cancel := context.WithTimeout(r.rootCtx, requestTimeout)
|
||||
defer cancel()
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
if r.client == nil {
|
||||
return nil, errDiscoveryIsStopped
|
||||
}
|
||||
return r.client.Discover(ctx, srv, topic, r.bucketSize)
|
||||
}
|
||||
|
||||
// Discover will search for new records every time period fetched from period channel.
|
||||
func (r *Rendezvous) Discover(
|
||||
topic string, period <-chan time.Duration, found chan<- *discv5.Node, lookup chan<- bool,
|
||||
|
@ -176,33 +192,32 @@ func (r *Rendezvous) Discover(
|
|||
ticker = time.NewTicker(newPeriod)
|
||||
case <-ticker.C:
|
||||
srv := r.servers[rand.Intn(len(r.servers))]
|
||||
ctx, cancel := context.WithTimeout(r.rootCtx, requestTimeout)
|
||||
r.mu.RLock()
|
||||
records, err := r.client.Discover(ctx, srv, topic, r.bucketSize)
|
||||
r.mu.RUnlock()
|
||||
cancel()
|
||||
records, err := r.discoverRequest(srv, topic)
|
||||
if err == context.Canceled {
|
||||
return err
|
||||
} else if err == errDiscoveryIsStopped {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
log.Debug("error fetching records", "topic", topic, "rendezvous server", srv, "err", err)
|
||||
continue
|
||||
}
|
||||
for i := range records {
|
||||
n, err := enrToNode(records[i])
|
||||
log.Debug("converted enr to", "ENODE", n.String())
|
||||
if err != nil {
|
||||
log.Warn("error converting enr record to node", "err", err)
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case found <- n:
|
||||
case newPeriod, ok := <-period:
|
||||
// closing a period channel is a signal to producer that consumer exited
|
||||
ticker.Stop()
|
||||
if !ok {
|
||||
return nil
|
||||
} else {
|
||||
for i := range records {
|
||||
n, err := enrToNode(records[i])
|
||||
log.Debug("converted enr to", "ENODE", n.String())
|
||||
if err != nil {
|
||||
log.Warn("error converting enr record to node", "err", err)
|
||||
|
||||
} else {
|
||||
select {
|
||||
case found <- n:
|
||||
case newPeriod, ok := <-period:
|
||||
// closing a period channel is a signal to producer that consumer exited
|
||||
ticker.Stop()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
ticker = time.NewTicker(newPeriod)
|
||||
}
|
||||
}
|
||||
ticker = time.NewTicker(newPeriod)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -73,6 +73,16 @@ func TestMakeRecordReturnsCachedRecord(t *testing.T) {
|
|||
require.Equal(t, record.NodeAddr(), rst.NodeAddr())
|
||||
}
|
||||
|
||||
func TestRendezvousRegisterAndDiscoverExitGracefully(t *testing.T) {
|
||||
r, err := NewRendezvous(make([]ma.Multiaddr, 1), nil, nil)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, r.Start())
|
||||
require.NoError(t, r.Stop())
|
||||
require.EqualError(t, errDiscoveryIsStopped, r.register("", enr.Record{}).Error())
|
||||
_, err = r.discoverRequest(nil, "")
|
||||
require.EqualError(t, errDiscoveryIsStopped, err.Error())
|
||||
}
|
||||
|
||||
func BenchmarkRendezvousStart(b *testing.B) {
|
||||
identity, err := crypto.GenerateKey()
|
||||
require.NoError(b, err)
|
||||
|
|
Loading…
Reference in New Issue