fix: add sleep to iterator in case of err

This commit is contained in:
Richard Ramos 2023-01-27 13:36:52 -04:00 committed by RichΛrd
parent 2936a98923
commit d4473e9c46
2 changed files with 21 additions and 11 deletions

View File

@ -4,8 +4,10 @@ import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
@ -248,11 +250,10 @@ func (d *DiscoveryV5) Iterator() (enode.Iterator, error) {
return enode.Filter(iterator, evaluateNode), nil
}
func (d *DiscoveryV5) iterate(ctx context.Context) {
func (d *DiscoveryV5) iterate(ctx context.Context) error {
iterator, err := d.Iterator()
if err != nil {
d.log.Debug("obtaining iterator", zap.Error(err))
return
return fmt.Errorf("obtaining iterator: %w", err)
}
defer iterator.Close()
@ -282,11 +283,13 @@ func (d *DiscoveryV5) iterate(ctx context.Context) {
if len(peerAddrs) != 0 {
select {
case <-ctx.Done():
return
return nil
case d.peerConnector.PeerChannel() <- peerAddrs[0]:
}
}
}
return nil
}
func (d *DiscoveryV5) runDiscoveryV5Loop(ctx context.Context) {
@ -299,10 +302,11 @@ restartLoop:
for {
select {
case <-ch:
if d.listener == nil {
break
err := d.iterate(ctx)
if err != nil {
d.log.Debug("iterating discv5", zap.Error(err))
time.Sleep(2 * time.Second)
}
d.iterate(ctx)
ch <- struct{}{}
case <-ctx.Done():
close(ch)

View File

@ -5,6 +5,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"math"
"math/rand"
"sync"
@ -311,11 +312,10 @@ func (wakuPX *WakuPeerExchange) cleanCache() {
wakuPX.enrCache = r
}
func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) {
func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error {
iterator, err := wakuPX.disc.Iterator()
if err != nil {
wakuPX.log.Debug("obtaining iterator", zap.Error(err))
return
return fmt.Errorf("obtaining iterator: %w", err)
}
defer iterator.Close()
@ -346,6 +346,8 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) {
}
wakuPX.enrCacheMutex.Unlock()
}
return nil
}
func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) {
@ -367,7 +369,11 @@ restartLoop:
for {
select {
case <-ch:
wakuPX.iterate(ctx)
err := wakuPX.iterate(ctx)
if err != nil {
wakuPX.log.Debug("iterating peer exchange", zap.Error(err))
time.Sleep(2 * time.Second)
}
ch <- struct{}{}
case <-ticker.C:
wakuPX.cleanCache()