Properly handle context.Canceled error in rendezvous (#1161)
This commit is contained in:
parent
874a3e8151
commit
c35120c0b3
|
@ -84,28 +84,38 @@ func (r *Rendezvous) Stop() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *Rendezvous) register(topic string) {
|
||||
func (r *Rendezvous) register(topic string) error {
|
||||
srv := r.servers[rand.Intn(len(r.servers))]
|
||||
ctx, cancel := context.WithTimeout(r.rootCtx, requestTimeout)
|
||||
defer cancel()
|
||||
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
defer cancel()
|
||||
if err := r.client.Register(ctx, srv, topic, r.record); err != nil {
|
||||
log.Debug("error registering", "topic", topic, "rendevous server", srv, "err", err)
|
||||
|
||||
err := r.client.Register(ctx, srv, topic, r.record)
|
||||
if err != nil {
|
||||
log.Error("error registering", "topic", topic, "rendezvous server", srv, "err", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Register renews registration in the specified server.
|
||||
func (r *Rendezvous) Register(topic string, stop chan struct{}) error {
|
||||
ticker := time.NewTicker(r.registrationPeriod)
|
||||
defer ticker.Stop()
|
||||
r.register(topic)
|
||||
|
||||
if err := r.register(topic); err == context.Canceled {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
r.register(topic)
|
||||
if err := r.register(topic); err == context.Canceled {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -130,7 +140,9 @@ func (r *Rendezvous) Discover(
|
|||
records, err := r.client.Discover(ctx, srv, topic, r.bucketSize)
|
||||
r.mu.RUnlock()
|
||||
cancel()
|
||||
if err != nil {
|
||||
if err == context.Canceled {
|
||||
return err
|
||||
} else if err != nil {
|
||||
log.Debug("error fetching records", "topic", topic, "rendezvous server", srv, "err", err)
|
||||
continue
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ func TestRendezvousDiscovery(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
srv := server.NewServer(laddr, priv, server.NewStorage(db))
|
||||
require.NoError(t, srv.Start())
|
||||
defer srv.Stop()
|
||||
|
||||
identity, err := crypto.GenerateKey()
|
||||
require.NoError(t, err)
|
||||
|
@ -54,5 +55,4 @@ func TestRendezvousDiscovery(t *testing.T) {
|
|||
}
|
||||
close(stop)
|
||||
close(period)
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue