diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 94bb3c4e..8df3498b 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -52,7 +52,7 @@ type peerCache struct { type PeerRecord struct { expire int64 Peer peer.AddrInfo - Node enode.Node + Node *enode.Node } type discV5Parameters struct { @@ -126,10 +126,6 @@ func NewDiscoveryV5(host host.Host, priv *ecdsa.PrivateKey, localnode *enode.Loc config: discover.Config{ PrivateKey: priv, Bootnodes: params.bootnodes, - ValidNodeFn: func(n enode.Node) bool { - // TODO: track https://github.com/status-im/nim-waku/issues/770 for improvements over validation func - return evaluateNode(&n) - }, V5Config: discover.V5Config{ ProtocolID: &protocolID, }, @@ -291,7 +287,7 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi for { if len(d.peerCache.recs) >= limit { - break + time.Sleep(1 * time.Minute) } if ctx.Err() != nil { @@ -317,10 +313,15 @@ func (d *DiscoveryV5) iterate(ctx context.Context, iterator enode.Iterator, limi d.peerCache.Lock() for _, p := range peerAddrs { + _, ok := d.peerCache.recs[p.ID] + if ok { + continue + } + d.peerCache.recs[p.ID] = PeerRecord{ expire: time.Now().Unix() + 3600, // Expires in 1hr Peer: p, - Node: *iterator.Node(), + Node: iterator.Node(), } } d.peerCache.Unlock() diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 4633aa9b..b22ed781 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -190,7 +190,10 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.DiscV5(), w.opts.discV5Opts...)) } - w.peerExchange = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.log) + w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.host, w.DiscV5(), w.log) + if err != nil { + return nil, err + } w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange.go index a7c09982..f7c7dac7 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange.go @@ -18,6 +18,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/libp2p/go-msgio/protoio" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/discv5" @@ -32,7 +33,7 @@ import ( const PeerExchangeID_v20alpha1 = libp2pProtocol.ID("/vac/waku/peer-exchange/2.0.0-alpha1") const MaxCacheSize = 1000 const CacheCleanWindow = 200 -const dialTimeout = 7 * time.Second +const dialTimeout = 30 * time.Second var ( ErrNoPeersAvailable = errors.New("no suitable remote peers") @@ -40,7 +41,7 @@ var ( ) type peerRecord struct { - node enode.Node + node *enode.Node idx int } @@ -50,24 +51,36 @@ type WakuPeerExchange struct { log *zap.Logger - cancel context.CancelFunc - started bool - wg sync.WaitGroup - + cancel context.CancelFunc + started bool + wg sync.WaitGroup + connector *backoff.BackoffConnector enrCache map[enode.ID]peerRecord // todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ enrCacheMutex sync.RWMutex rng *rand.Rand } // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct -func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) *WakuPeerExchange { +func NewWakuPeerExchange(h host.Host, disc *discv5.DiscoveryV5, log *zap.Logger) (*WakuPeerExchange, error) { wakuPX := new(WakuPeerExchange) wakuPX.h = h wakuPX.disc = disc wakuPX.log = log.Named("wakupx") wakuPX.enrCache = make(map[enode.ID]peerRecord) wakuPX.rng = rand.New(rand.NewSource(rand.Int63())) - return wakuPX + + cacheSize := 600 + rngSrc := rand.NewSource(rand.Int63()) + minBackoff, maxBackoff := time.Second*30, time.Hour + bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) + connector, err := backoff.NewBackoffConnector(h, cacheSize, dialTimeout, bkf) + + if err != nil { + return nil, err + } + wakuPX.connector = connector + + return wakuPX, nil } // Start inits the peer exchange protocol @@ -122,6 +135,13 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb if len(peers) != 0 { log.Info("connecting to newly discovered peers", zap.Int("count", len(peers))) + + ch := make(chan peer.AddrInfo, len(peers)) + for _, p := range peers { + ch <- p + } + + wakuPX.connector.Connect(ctx, ch) for _, p := range peers { func(p peer.AddrInfo) { ctx, cancel := context.WithTimeout(ctx, dialTimeout) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index e473cc24..363fb528 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -136,8 +136,11 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { time.Sleep(3 * time.Second) // Wait some time for peers to be discovered // mount peer exchange - px1 := NewWakuPeerExchange(host1, d1, utils.Logger()) - px3 := NewWakuPeerExchange(host3, nil, utils.Logger()) + px1, err := NewWakuPeerExchange(host1, d1, utils.Logger()) + require.NoError(t, err) + + px3, err := NewWakuPeerExchange(host3, nil, utils.Logger()) + require.NoError(t, err) err = px1.Start(context.Background()) require.NoError(t, err)