From 9bb2c8e3968086aaf09cd5cdce97d8b27d9d9f49 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 20 Feb 2024 14:07:06 -0400 Subject: [PATCH] chore: add artificial delay to peer-exchange --- waku/v2/discv5/discover.go | 33 ++++++++----------- waku/v2/protocol/peer_exchange/protocol.go | 10 ++++-- .../peer_exchange/waku_peer_exchange_test.go | 4 +-- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index e88721eb..dd9a462a 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -317,24 +317,7 @@ func (d *DiscoveryV5) Iterate(ctx context.Context, iterator enode.Iterator, onNo defer iterator.Close() peerCnt := 0 - for { - - if !delayedHasNext(ctx, iterator) { - return - } - - peerCnt++ - if peerCnt == bucketSize { // Delay every bucketSize peers discovered - peerCnt = 0 - t := time.NewTimer(delayBetweenDiscoveredPeerCnt) - select { - case <-ctx.Done(): - return - case <-t.C: - t.Stop() - } - } - + for DelayedHasNext(ctx, iterator, &peerCnt) { _, addresses, err := wenr.Multiaddress(iterator.Node()) if err != nil { d.metrics.RecordError(peerInfoFailure) @@ -364,7 +347,7 @@ func (d *DiscoveryV5) Iterate(ctx context.Context, iterator enode.Iterator, onNo } } -func delayedHasNext(ctx context.Context, iterator enode.Iterator) bool { +func DelayedHasNext(ctx context.Context, iterator enode.Iterator, peerCnt *int) bool { // Delay if .Next() is too fast start := time.Now() hasNext := iterator.Next() @@ -383,6 +366,18 @@ func delayedHasNext(ctx context.Context, iterator enode.Iterator) bool { } } + *peerCnt++ + if *peerCnt == bucketSize { // Delay every bucketSize peers discovered + *peerCnt = 0 + t := time.NewTimer(delayBetweenDiscoveredPeerCnt) + select { + case <-ctx.Done(): + return false + case <-t.C: + t.Stop() + } + } + return true } diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 374cde3a..3f4a1573 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -144,7 +144,8 @@ func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error { // Closing iterator defer iterator.Close() - for iterator.Next() { + peerCnt := 0 + for discv5.DelayedHasNext(ctx, iterator, &peerCnt) { _, addresses, err := enr.Multiaddress(iterator.Node()) if err != nil { wakuPX.log.Error("extracting multiaddrs from enr", zap.Error(err)) @@ -183,12 +184,15 @@ func (wakuPX *WakuPeerExchange) runPeerExchangeDiscv5Loop(ctx context.Context) { err := wakuPX.iterate(ctx) if err != nil { wakuPX.log.Debug("iterating peer exchange", zap.Error(err)) - time.Sleep(2 * time.Second) } + + t := time.NewTimer(5 * time.Second) select { + case <-t.C: + t.Stop() case <-ctx.Done(): + t.Stop() return - default: } } } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 5d2fbaa1..f9f6c526 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -55,8 +55,6 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { err = d2.Start(context.Background()) require.NoError(t, err) - time.Sleep(3 * time.Second) // Wait some time for peers to be discovered - // mount peer exchange pxPeerConn1 := discv5.NewTestPeerDiscoverer() px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) @@ -78,6 +76,8 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { err = host3.Peerstore().AddProtocols(host1.ID(), PeerExchangeID_v20alpha1) require.NoError(t, err) + time.Sleep(3 * time.Second) // Wait some time for peers to be discovered + err = px3.Request(context.Background(), 1, WithPeer(host1.ID())) require.NoError(t, err)