diff --git a/.codeclimate.yml b/.codeclimate.yml new file mode 100644 index 00000000..2a6681f1 --- /dev/null +++ b/.codeclimate.yml @@ -0,0 +1,11 @@ +plugins: + golint: + enabled: true + gofmt: + enabled: true + govet: + enabled: true + # golangci-lint: + #enabled: true +exclude_patterns: + - "." diff --git a/tests/utils.go b/tests/utils.go index dff08bec..ae54a594 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -7,17 +7,14 @@ import ( "fmt" "io" "net" - "sync" "testing" "github.com/ethereum/go-ethereum/log" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/multiformats/go-multiaddr" - "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/pb" ) @@ -140,47 +137,3 @@ func RandomHex(n int) (string, error) { } return hex.EncodeToString(bytes), nil } - -type TestPeerDiscoverer struct { - sync.RWMutex - peerMap map[peer.ID]struct{} - peerCh chan peermanager.PeerData -} - -func NewTestPeerDiscoverer() *TestPeerDiscoverer { - result := &TestPeerDiscoverer{ - peerMap: make(map[peer.ID]struct{}), - peerCh: make(chan peermanager.PeerData, 10), - } - - return result -} - -func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan peermanager.PeerData) { - go func() { - for p := range ch { - t.Lock() - t.peerMap[p.AddrInfo.ID] = struct{}{} - t.Unlock() - } - }() -} - -func (t *TestPeerDiscoverer) HasPeer(p peer.ID) bool { - t.RLock() - defer t.RUnlock() - _, ok := t.peerMap[p] - return ok -} - -func (t *TestPeerDiscoverer) PeerCount() int { - t.RLock() - defer t.RUnlock() - return len(t.peerMap) -} - -func (t *TestPeerDiscoverer) Clear() { - t.Lock() - defer t.Unlock() - t.peerMap = make(map[peer.ID]struct{}) -} diff --git a/waku/v2/discv5/discover_test.go b/waku/v2/discv5/discover_test.go index e24bc990..0759b699 100644 --- a/waku/v2/discv5/discover_test.go +++ b/waku/v2/discv5/discover_test.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/prometheus/client_golang/prometheus" + "github.com/waku-org/go-waku/waku/v2/peermanager" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/multiformats/go-multiaddr" @@ -108,7 +109,7 @@ func TestDiscV5(t *testing.T) { ip1, _ := extractIP(host1.Addrs()[0]) l1, err := newLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - peerconn1 := tests.NewTestPeerDiscoverer() + peerconn1 := peermanager.NewTestPeerDiscoverer() d1, err := NewDiscoveryV5(prvKey1, l1, peerconn1, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort1))) require.NoError(t, err) d1.SetHost(host1) @@ -120,7 +121,7 @@ func TestDiscV5(t *testing.T) { require.NoError(t, err) l2, err := newLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - peerconn2 := tests.NewTestPeerDiscoverer() + peerconn2 := peermanager.NewTestPeerDiscoverer() d2, err := NewDiscoveryV5(prvKey2, l2, peerconn2, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort2)), WithBootnodes([]*enode.Node{d1.localnode.Node()})) require.NoError(t, err) d2.SetHost(host2) @@ -132,7 +133,7 @@ func TestDiscV5(t *testing.T) { require.NoError(t, err) l3, err := newLocalnode(prvKey3, ip3, udpPort3, wenr.NewWakuEnrBitfield(true, true, true, true), nil, utils.Logger()) require.NoError(t, err) - peerconn3 := tests.NewTestPeerDiscoverer() + peerconn3 := peermanager.NewTestPeerDiscoverer() d3, err := NewDiscoveryV5(prvKey3, l3, peerconn3, prometheus.DefaultRegisterer, utils.Logger(), WithUDPPort(uint(udpPort3)), WithBootnodes([]*enode.Node{d2.localnode.Node()})) require.NoError(t, err) d3.SetHost(host3) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index b21c3193..1a468810 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -22,7 +22,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/libp2p/go-libp2p/p2p/host/autorelay" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto" @@ -254,13 +253,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { //Initialize peer manager. w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log) - // Setup peer connection strategy - cacheSize := 600 - rngSrc := rand.NewSource(rand.Int63()) - minBackoff, maxBackoff := time.Minute, time.Hour - bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) - - w.peerConnector, err = peermanager.NewPeerConnectionStrategy(cacheSize, w.peermanager, discoveryConnectTimeout, bkf, w.log) + w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log) if err != nil { w.log.Error("creating peer connection strategy", zap.Error(err)) } diff --git a/waku/v2/peermanager/connection_gater.go b/waku/v2/peermanager/connection_gater.go index fe10e4cc..99abdcfa 100644 --- a/waku/v2/peermanager/connection_gater.go +++ b/waku/v2/peermanager/connection_gater.go @@ -82,6 +82,7 @@ func (c *ConnectionGater) InterceptUpgraded(_ network.Conn) (allow bool, reason return true, 0 } +// NotifyDisconnect is called when a connection disconnects. func (c *ConnectionGater) NotifyDisconnect(addr multiaddr.Multiaddr) { ip, err := manet.ToIP(addr) if err != nil { @@ -111,16 +112,10 @@ func (c *ConnectionGater) validateInboundConn(addr multiaddr.Multiaddr) bool { c.Lock() defer c.Unlock() - currConnections, ok := c.limiter[ip.String()] - if !ok { - c.limiter[ip.String()] = 1 - return true - } else { - if currConnections+1 > maxConnsPerIP { - return false - } - - c.limiter[ip.String()]++ + if currConnections := c.limiter[ip.String()]; currConnections+1 > maxConnsPerIP { + return false } + + c.limiter[ip.String()]++ return true } diff --git a/waku/v2/peermanager/mock_peer_discoverer.go b/waku/v2/peermanager/mock_peer_discoverer.go new file mode 100644 index 00000000..5e3b9c52 --- /dev/null +++ b/waku/v2/peermanager/mock_peer_discoverer.go @@ -0,0 +1,56 @@ +package peermanager + +import ( + "context" + "sync" + + "github.com/libp2p/go-libp2p/core/peer" +) + +// TestPeerDiscoverer is mock peer discoverer for testing +type TestPeerDiscoverer struct { + sync.RWMutex + peerMap map[peer.ID]struct{} +} + +// NewTestPeerDiscoverer is a constructor for TestPeerDiscoverer +func NewTestPeerDiscoverer() *TestPeerDiscoverer { + result := &TestPeerDiscoverer{ + peerMap: make(map[peer.ID]struct{}), + } + + return result +} + +// Subscribe is for subscribing to peer discoverer +func (t *TestPeerDiscoverer) Subscribe(ctx context.Context, ch <-chan PeerData) { + go func() { + for p := range ch { + t.Lock() + t.peerMap[p.AddrInfo.ID] = struct{}{} + t.Unlock() + } + }() +} + +// HasPeer is for checking if a peer is present in peer discoverer +func (t *TestPeerDiscoverer) HasPeer(p peer.ID) bool { + t.RLock() + defer t.RUnlock() + _, ok := t.peerMap[p] + return ok +} + +// PeerCount is for getting the number of peers in peer discoverer +func (t *TestPeerDiscoverer) PeerCount() int { + t.RLock() + defer t.RUnlock() + return len(t.peerMap) +} + +// Clear is for clearing the peer discoverer +func (t *TestPeerDiscoverer) Clear() { + t.Lock() + defer t.Unlock() + t.peerMap = make(map[peer.ID]struct{}) +} diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index fa1e2a87..7468418e 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -5,6 +5,7 @@ package peermanager import ( "context" "errors" + "math/rand" "sync" "time" @@ -53,27 +54,34 @@ type PeerConnectionStrategy struct { logger *zap.Logger } +// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer +func getBackOff() backoff.BackoffFactory { + rngSrc := rand.NewSource(rand.Int63()) + minBackoff, maxBackoff := time.Minute, time.Hour + bkf := backoff.NewExponentialBackoff(minBackoff, maxBackoff, backoff.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc)) + return bkf +} + // NewPeerConnectionStrategy creates a utility to connect to peers, // but only if we have not recently tried connecting to them already. // -// cacheSize is the size of a TwoQueueCache // dialTimeout is how long we attempt to connect to a peer before giving up // minPeers is the minimum number of peers that the node should have -// backoff describes the strategy used to decide how long to backoff after previously attempting to connect to a peer -func NewPeerConnectionStrategy(cacheSize int, pm *PeerManager, - dialTimeout time.Duration, backoff backoff.BackoffFactory, - logger *zap.Logger) (*PeerConnectionStrategy, error) { - +func NewPeerConnectionStrategy(pm *PeerManager, + dialTimeout time.Duration, logger *zap.Logger) (*PeerConnectionStrategy, error) { + // cacheSize is the size of a TwoQueueCache + cacheSize := 600 cache, err := lru.New2Q(cacheSize) if err != nil { return nil, err } + // pc := &PeerConnectionStrategy{ cache: cache, wg: sync.WaitGroup{}, dialTimeout: dialTimeout, pm: pm, - backoff: backoff, + backoff: getBackOff(), logger: logger.Named("discovery-connector"), } pm.SetPeerConnector(pc) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index f3d315b4..e888c12a 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -17,8 +17,8 @@ import ( "go.uber.org/zap" ) -// TODO: Move all the protocol IDs to a common location. // WakuRelayIDv200 is protocol ID for Waku v2 relay protocol +// TODO: Move all the protocol IDs to a common location. const WakuRelayIDv200 = protocol.ID("/vac/waku/relay/2.0.0") // PeerManager applies various controls and manage connections towards peers. @@ -258,7 +258,7 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, protocol //Add Service peers to serviceSlots. for _, proto := range protocols { - pm.AddPeerToServiceSlot(proto, info.ID) + pm.addPeerToServiceSlot(proto, info.ID) } //Add to the peer-store @@ -279,10 +279,10 @@ func (pm *PeerManager) RemovePeer(peerID peer.ID) { pm.serviceSlots.removePeer(peerID) } -// AddPeerToServiceSlot adds a peerID to serviceSlot. +// addPeerToServiceSlot adds a peerID to serviceSlot. // Adding to peerStore is expected to be already done by caller. // If relay proto is passed, it is not added to serviceSlot. -func (pm *PeerManager) AddPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { +func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { if proto == WakuRelayIDv200 { pm.logger.Warn("Cannot add Relay peer to service peer slots") return diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go new file mode 100644 index 00000000..15c2aedf --- /dev/null +++ b/waku/v2/peermanager/peer_manager_test.go @@ -0,0 +1,153 @@ +package peermanager + +import ( + "context" + "crypto/rand" + "fmt" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/host" + libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func getAddr(h host.Host) multiaddr.Multiaddr { + id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().Pretty())) + return h.Network().ListenAddresses()[0].Encapsulate(id) +} + +func initTest(t *testing.T) (context.Context, *PeerManager, func()) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + // hosts + h1, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h1.Close() + // host 1 is used by peer manager + pm := NewPeerManager(10, utils.Logger()) + pm.SetHost(h1) + return ctx, pm, func() { + cancel() + h1.Close() + } +} + +func TestServiceSlots(t *testing.T) { + ctx, pm, deferFn := initTest(t) + defer deferFn() + + h2, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h2.Close() + + h3, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h3.Close() + // protocols + protocol := libp2pProtocol.ID("test/protocol") + protocol1 := libp2pProtocol.ID("test/protocol1") + + // add h2 peer to peer manager + t.Log(h2.ID()) + _, err = pm.AddPeer(getAddr(h2), wps.Static, libp2pProtocol.ID(protocol)) + require.NoError(t, err) + + /////////////// + // getting peer for protocol + /////////////// + + // select peer from pm, currently only h2 is set in pm + peerID, err := pm.SelectPeer(protocol, nil, utils.Logger()) + require.NoError(t, err) + require.Equal(t, peerID, h2.ID()) + + // add h3 peer to peer manager + _, err = pm.AddPeer(getAddr(h3), wps.Static, libp2pProtocol.ID(protocol)) + require.NoError(t, err) + + // check that returned peer is h2 or h3 peer + peerID, err = pm.SelectPeer(protocol, nil, utils.Logger()) + require.NoError(t, err) + if peerID == h2.ID() || peerID == h3.ID() { + //Test success + t.Log("Random peer selection per protocol successful") + } else { + t.FailNow() + } + + /////////////// + // getting peer for protocol1 + /////////////// + h4, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h4.Close() + + _, err = pm.SelectPeer(protocol1, nil, utils.Logger()) + require.Error(t, err, utils.ErrNoPeersAvailable) + + // add h4 peer for protocol1 + _, err = pm.AddPeer(getAddr(h4), wps.Static, libp2pProtocol.ID(protocol1)) + require.NoError(t, err) + + //Test peer selection for protocol1 + peerID, err = pm.SelectPeer(protocol1, nil, utils.Logger()) + require.NoError(t, err) + require.Equal(t, peerID, h4.ID()) + +} + +func TestDefaultProtocol(t *testing.T) { + ctx, pm, deferFn := initTest(t) + defer deferFn() + /////////////// + // check peer for default protocol + /////////////// + //Test empty peer selection for relay protocol + _, err := pm.SelectPeer(WakuRelayIDv200, nil, utils.Logger()) + require.Error(t, err, utils.ErrNoPeersAvailable) + + /////////////// + // getting peer for default protocol + /////////////// + h5, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h5.Close() + + //Test peer selection for relay protocol from peer store + _, err = pm.AddPeer(getAddr(h5), wps.Static, WakuRelayIDv200) + require.NoError(t, err) + + // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. + peerID, err := pm.SelectPeer(WakuRelayIDv200, nil, utils.Logger()) + require.NoError(t, err) + require.Equal(t, peerID, h5.ID()) +} + +func TestAdditionAndRemovalOfPeer(t *testing.T) { + ctx, pm, deferFn := initTest(t) + defer deferFn() + /////////////// + // set h6 peer for protocol2 and remove that peer and check again + /////////////// + //Test random peer selection + protocol2 := libp2pProtocol.ID("test/protocol2") + h6, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h6.Close() + + _, err = pm.AddPeer(getAddr(h6), wps.Static, protocol2) + require.NoError(t, err) + + peerID, err := pm.SelectPeer(protocol2, nil, utils.Logger()) + require.NoError(t, err) + require.Equal(t, peerID, h6.ID()) + + pm.RemovePeer(peerID) + _, err = pm.SelectPeer(protocol2, nil, utils.Logger()) + require.Error(t, err, utils.ErrNoPeersAvailable) +} diff --git a/waku/v2/peermanager/test/peer_manager_test.go b/waku/v2/peermanager/test/peer_manager_test.go deleted file mode 100644 index cd40e476..00000000 --- a/waku/v2/peermanager/test/peer_manager_test.go +++ /dev/null @@ -1,122 +0,0 @@ -package test - -import ( - "context" - "crypto/rand" - "testing" - "time" - - "github.com/libp2p/go-libp2p/core/peerstore" - libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/peermanager" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func TestServiceSlots(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - h1, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h1.Close() - - h2, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h2.Close() - - h3, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h3.Close() - protocol := libp2pProtocol.ID("test/protocol") - protocol1 := libp2pProtocol.ID("test/protocol1") - - pm := peermanager.NewPeerManager(10, utils.Logger()) - pm.SetHost(h1) - - h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - err = h1.Peerstore().AddProtocols(h2.ID(), libp2pProtocol.ID(protocol)) - require.NoError(t, err) - - //Test selection from peerStore. - peerId, err := pm.SelectPeer(protocol, nil, utils.Logger()) - require.NoError(t, err) - require.Equal(t, peerId, h2.ID()) - - //Test addition and selection from service-slot - pm.AddPeerToServiceSlot(protocol, h2.ID()) - - peerId, err = pm.SelectPeer(protocol, nil, utils.Logger()) - require.NoError(t, err) - if peerId == h2.ID() || peerId == h1.ID() { - //Test success - t.Log("Random peer selection per protocol successful") - } else { - t.FailNow() - } - require.Equal(t, peerId, h2.ID()) - - h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - pm.AddPeerToServiceSlot(protocol, h3.ID()) - - h4, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h4.Close() - - h1.Peerstore().AddAddrs(h4.ID(), h4.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - pm.AddPeerToServiceSlot(protocol1, h4.ID()) - - //Test peer selection from first added peer to serviceSlot - peerId, err = pm.SelectPeer(protocol, nil, utils.Logger()) - require.NoError(t, err) - if peerId == h2.ID() || peerId == h3.ID() { - //Test success - t.Log("Random peer selection per protocol successful") - } else { - t.FailNow() - } - - //Test peer selection for specific protocol - peerId, err = pm.SelectPeer(protocol1, nil, utils.Logger()) - require.NoError(t, err) - require.Equal(t, peerId, h4.ID()) - - h5, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h5.Close() - - //Test empty peer selection for relay protocol - _, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger()) - require.Error(t, err, utils.ErrNoPeersAvailable) - //Test peer selection for relay protocol from peer store - h1.Peerstore().AddAddrs(h5.ID(), h5.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - pm.AddPeerToServiceSlot(peermanager.WakuRelayIDv200, h5.ID()) - - _, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger()) - require.Error(t, err, utils.ErrNoPeersAvailable) - - err = h1.Peerstore().AddProtocols(h5.ID(), peermanager.WakuRelayIDv200) - require.NoError(t, err) - - peerId, err = pm.SelectPeer(peermanager.WakuRelayIDv200, nil, utils.Logger()) - require.NoError(t, err) - require.Equal(t, peerId, h5.ID()) - - //Test random peer selection - protocol2 := libp2pProtocol.ID("test/protocol2") - h6, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h6.Close() - - h1.Peerstore().AddAddrs(h6.ID(), h6.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - err = h1.Peerstore().AddProtocols(h6.ID(), libp2pProtocol.ID(protocol2)) - require.NoError(t, err) - - peerId, err = pm.SelectPeer(protocol2, nil, utils.Logger()) - require.NoError(t, err) - require.Equal(t, peerId, h6.ID()) - - pm.RemovePeer(peerId) - _, err = pm.SelectPeer(protocol2, nil, utils.Logger()) - require.Error(t, err, utils.ErrNoPeersAvailable) -} diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 695dfc13..93e4f0a1 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/discv5" + "github.com/waku-org/go-waku/waku/v2/peermanager" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -107,7 +108,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { ip1, _ := extractIP(host1.Addrs()[0]) l1, err := newLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) - discv5PeerConn1 := tests.NewTestPeerDiscoverer() + discv5PeerConn1 := peermanager.NewTestPeerDiscoverer() d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort1))) require.NoError(t, err) d1.SetHost(host1) @@ -119,7 +120,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { require.NoError(t, err) l2, err := newLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) - discv5PeerConn2 := tests.NewTestPeerDiscoverer() + discv5PeerConn2 := peermanager.NewTestPeerDiscoverer() d2, err := discv5.NewDiscoveryV5(prvKey2, l2, discv5PeerConn2, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()})) require.NoError(t, err) d2.SetHost(host2) @@ -142,12 +143,12 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { time.Sleep(3 * time.Second) // Wait some time for peers to be discovered // mount peer exchange - pxPeerConn1 := tests.NewTestPeerDiscoverer() + pxPeerConn1 := peermanager.NewTestPeerDiscoverer() px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px1.SetHost(host1) - pxPeerConn3 := tests.NewTestPeerDiscoverer() + pxPeerConn3 := peermanager.NewTestPeerDiscoverer() px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px3.SetHost(host3)