diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 920cdf5b..97d16531 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -4,8 +4,10 @@ import ( "context" "crypto/ecdsa" "errors" + "fmt" "net" "sync" + "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -248,11 +250,10 @@ func (d *DiscoveryV5) Iterator() (enode.Iterator, error) { return enode.Filter(iterator, evaluateNode), nil } -func (d *DiscoveryV5) iterate(ctx context.Context) { +func (d *DiscoveryV5) iterate(ctx context.Context) error { iterator, err := d.Iterator() if err != nil { - d.log.Debug("obtaining iterator", zap.Error(err)) - return + return fmt.Errorf("obtaining iterator: %w", err) } defer iterator.Close() @@ -282,11 +283,13 @@ func (d *DiscoveryV5) iterate(ctx context.Context) { if len(peerAddrs) != 0 { select { case <-ctx.Done(): - return + return nil case d.peerConnector.PeerChannel() <- peerAddrs[0]: } } } + + return nil } func (d *DiscoveryV5) runDiscoveryV5Loop(ctx context.Context) { @@ -299,10 +302,11 @@ restartLoop: for { select { case <-ch: - if d.listener == nil { - break + err := d.iterate(ctx) + if err != nil { + d.log.Debug("iterating discv5", zap.Error(err)) + time.Sleep(2 * time.Second) } - d.iterate(ctx) ch <- struct{}{} case <-ctx.Done(): close(ch) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange.go index d6a81590..86df733f 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "errors" + "fmt" "math" "math/rand" "sync" @@ -311,11 +312,10 @@ func (wakuPX *WakuPeerExchange) cleanCache() { wakuPX.enrCache = r } -func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) { +func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error { iterator, err := wakuPX.disc.Iterator() if err != nil { - wakuPX.log.Debug("obtaining iterator", zap.Error(err)) - return + return fmt.Errorf("obtaining iterator: %w", err) } defer iterator.Close() @@ -346,6 +346,8 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) { } wakuPX.enrCacheMutex.Unlock() } + + return nil } func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) { @@ -367,7 +369,11 @@ restartLoop: for { select { case <-ch: - wakuPX.iterate(ctx) + err := wakuPX.iterate(ctx) + if err != nil { + wakuPX.log.Debug("iterating peer exchange", zap.Error(err)) + time.Sleep(2 * time.Second) + } ch <- struct{}{} case <-ticker.C: wakuPX.cleanCache()