From e2b87eee7b3ab1fd3cc4d07ea8a48de69380f23a Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 24 Jun 2024 13:54:06 +0530 Subject: [PATCH] feat: modify peer-manager to consider relay target peers for connecting to peers --- waku/v2/node/wakunode2.go | 2 +- waku/v2/node/wakuoptions.go | 5 +- waku/v2/peermanager/peer_connector.go | 5 +- waku/v2/peermanager/peer_manager.go | 88 +++++++++++-------- waku/v2/peermanager/peer_manager_test.go | 6 +- waku/v2/peermanager/topic_event_handler.go | 6 +- .../peermanager/topic_event_handler_test.go | 9 +- waku/v2/protocol/filter/test_utils.go | 2 +- .../legacy_store/waku_store_client_test.go | 2 +- .../protocol/lightpush/waku_lightpush_test.go | 2 +- waku/v2/protocol/metadata/waku_metadata.go | 19 ++-- .../peer_exchange/waku_peer_exchange_test.go | 4 +- waku/v2/protocol/store/client_test.go | 2 +- 13 files changed, 89 insertions(+), 63 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 7c0e70c8..3d2aa1a5 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -256,7 +256,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.metadata = metadata //Initialize peer manager. - w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, w.log) + w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, metadata, params.enableRelay, w.log) w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log) if err != nil { diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 0c175992..a34376c1 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -44,6 +44,8 @@ const UserAgent string = "go-waku" const defaultMinRelayPeersToPublish = 0 const DefaultMaxConnectionsPerIP = 5 +const DefaultMaxConnections = 300 +const DefaultMaxPeerStoreCapacity = 300 type WakuNodeParameters struct { hostAddr *net.TCPAddr @@ -124,9 +126,10 @@ type WakuNodeOption func(*WakuNodeParameters) error // Default options used in the libp2p node var DefaultWakuNodeOptions = []WakuNodeOption{ WithPrometheusRegisterer(prometheus.NewRegistry()), - WithMaxPeerConnections(50), + WithMaxPeerConnections(DefaultMaxConnections), WithMaxConnectionsPerIP(DefaultMaxConnectionsPerIP), WithCircuitRelayParams(2*time.Second, 3*time.Minute), + WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity), } // MultiAddresses return the list of multiaddresses configured in the node diff --git a/waku/v2/peermanager/peer_connector.go b/waku/v2/peermanager/peer_connector.go index 6f40a171..f081c564 100644 --- a/waku/v2/peermanager/peer_connector.go +++ b/waku/v2/peermanager/peer_connector.go @@ -17,7 +17,6 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" - waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" @@ -127,7 +126,7 @@ func (c *PeerConnectionStrategy) consumeSubscription(s subscription) { triggerImmediateConnection := false //Not connecting to peer as soon as it is discovered, // rather expecting this to be pushed from PeerManager based on the need. - if len(c.host.Network().Peers()) < waku_proto.GossipSubDMin { + if len(c.host.Network().Peers()) < c.pm.OutPeersTarget { triggerImmediateConnection = true } c.logger.Debug("adding discovered peer", logging.HostID("peerID", p.AddrInfo.ID)) @@ -227,7 +226,7 @@ func (c *PeerConnectionStrategy) addConnectionBackoff(peerID peer.ID) { func (c *PeerConnectionStrategy) dialPeers() { defer c.WaitGroup().Done() - maxGoRoutines := c.pm.OutRelayPeersTarget + maxGoRoutines := c.pm.OutPeersTarget if maxGoRoutines > maxActiveDials { maxGoRoutines = maxActiveDials } diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 554688da..c3b1cf74 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -73,8 +73,8 @@ type PeerManager struct { maxPeers int maxRelayPeers int logger *zap.Logger - InRelayPeersTarget int - OutRelayPeersTarget int + InPeersTarget int + OutPeersTarget int host host.Host serviceSlots *ServiceSlots ctx context.Context @@ -85,6 +85,7 @@ type PeerManager struct { wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo TopicHealthNotifCh chan<- TopicHealthStatus rttCache *FastestPeerSelector + RelayEnabled bool } // PeerSelection provides various options based on which Peer is selected from a list of peers. @@ -143,6 +144,7 @@ func (pm *PeerManager) checkAndUpdateTopicHealth(topic *NodeTopicDetails) int { } } //Update topic's health + //TODO: This should be done based on number of full-mesh peers. oldHealth := topic.healthStatus if healthyPeerCount < 1 { //Ideally this check should be done with minPeersForRelay, but leaving it as is for now. topic.healthStatus = UnHealthy @@ -174,31 +176,38 @@ func (pm *PeerManager) TopicHealth(pubsubTopic string) (TopicHealth, error) { } // NewPeerManager creates a new peerManager instance. -func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, logger *zap.Logger) *PeerManager { +func NewPeerManager(maxConnections int, maxPeers int, metadata *metadata.WakuMetadata, relayEnabled bool, logger *zap.Logger) *PeerManager { + var inPeersTarget, outPeersTarget, maxRelayPeers int + if relayEnabled { + maxRelayPeers, _ := relayAndServicePeers(maxConnections) + inPeersTarget, outPeersTarget = inAndOutRelayPeers(maxRelayPeers) - maxRelayPeers, _ := relayAndServicePeers(maxConnections) - inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers) - - if maxPeers == 0 || maxConnections > maxPeers { - maxPeers = maxConnsToPeerRatio * maxConnections + if maxPeers == 0 || maxConnections > maxPeers { + maxPeers = maxConnsToPeerRatio * maxConnections + } + } else { + maxRelayPeers = 0 + inPeersTarget = 0 + //TODO: ideally this should be 2 filter peers per topic, 2 lightpush peers per topic and 2-4 store nodes per topic + outPeersTarget = 10 } - pm := &PeerManager{ logger: logger.Named("peer-manager"), metadata: metadata, maxRelayPeers: maxRelayPeers, - InRelayPeersTarget: inRelayPeersTarget, - OutRelayPeersTarget: outRelayPeersTarget, + InPeersTarget: inPeersTarget, + OutPeersTarget: outPeersTarget, serviceSlots: NewServiceSlot(), subRelayTopics: make(map[string]*NodeTopicDetails), maxPeers: maxPeers, wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{}, rttCache: NewFastestPeerSelector(logger), + RelayEnabled: relayEnabled, } logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections), zap.Int("maxRelayPeers", maxRelayPeers), - zap.Int("outRelayPeersTarget", outRelayPeersTarget), - zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget), + zap.Int("outPeersTarget", outPeersTarget), + zap.Int("inPeersTarget", pm.InPeersTarget), zap.Int("maxPeers", maxPeers)) return pm @@ -225,7 +234,7 @@ func (pm *PeerManager) Start(ctx context.Context) { pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) pm.ctx = ctx - if pm.sub != nil { + if pm.sub != nil && pm.RelayEnabled { go pm.peerEventLoop(ctx) } go pm.connectivityLoop(ctx) @@ -233,7 +242,7 @@ func (pm *PeerManager) Start(ctx context.Context) { // This is a connectivity loop, which currently checks and prunes inbound connections. func (pm *PeerManager) connectivityLoop(ctx context.Context) { - pm.connectToRelayPeers() + pm.connectToPeers() t := time.NewTicker(peerConnectivityLoopSecs * time.Second) defer t.Stop() for { @@ -241,7 +250,7 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) { case <-ctx.Done(): return case <-t.C: - pm.connectToRelayPeers() + pm.connectToPeers() } } } @@ -302,10 +311,10 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { // match those peers that are currently connected curPeerLen := pm.checkAndUpdateTopicHealth(topicInst) - if curPeerLen < waku_proto.GossipSubDMin { - pm.logger.Debug("subscribed topic is not sufficiently healthy, initiating more connections to maintain health", + if curPeerLen < pm.OutPeersTarget { + pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh", zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen), - zap.Int("optimumPeers", waku_proto.GossipSubDMin)) + zap.Int("targetPeers", pm.OutPeersTarget)) //Find not connected peers. notConnectedPeers := pm.getNotConnectedPers(topicStr) if notConnectedPeers.Len() == 0 { @@ -315,35 +324,42 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { } pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr)) //Connect to eligible peers. - numPeersToConnect := waku_proto.GossipSubDMin - curPeerLen + numPeersToConnect := pm.OutPeersTarget - curPeerLen if numPeersToConnect > notConnectedPeers.Len() { numPeersToConnect = notConnectedPeers.Len() } - pm.connectToPeers(notConnectedPeers[0:numPeersToConnect]) + pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect]) } } } -// connectToRelayPeers ensures minimum D connections are there for each pubSubTopic. +// connectToPeers ensures minimum D connections are there for each pubSubTopic. // If not, initiates connections to additional peers. // It also checks for incoming relay connections and prunes once they cross inRelayTarget -func (pm *PeerManager) connectToRelayPeers() { - //Check for out peer connections and connect to more peers. - pm.ensureMinRelayConnsPerTopic() +func (pm *PeerManager) connectToPeers() { + if pm.RelayEnabled { + //Check for out peer connections and connect to more peers. + pm.ensureMinRelayConnsPerTopic() - inRelayPeers, outRelayPeers := pm.getRelayPeers() - pm.logger.Debug("number of relay peers connected", - zap.Int("in", inRelayPeers.Len()), - zap.Int("out", outRelayPeers.Len())) - if inRelayPeers.Len() > 0 && - inRelayPeers.Len() > pm.InRelayPeersTarget { - pm.pruneInRelayConns(inRelayPeers) + inRelayPeers, outRelayPeers := pm.getRelayPeers() + pm.logger.Debug("number of relay peers connected", + zap.Int("in", inRelayPeers.Len()), + zap.Int("out", outRelayPeers.Len())) + if inRelayPeers.Len() > 0 && + inRelayPeers.Len() > pm.InPeersTarget { + pm.pruneInRelayConns(inRelayPeers) + } + } else { + //TODO: Connect to filter peers per topic as of now. + //Fetch filter peers from peerStore, TODO: topics for lightNode not available here? + //Filter subscribe to notify peerManager whenever a new topic/shard is subscribed to. + pm.logger.Debug("light mode..not doing anything") } } -// connectToPeers connects to peers provided in the list if the addresses have not expired. -func (pm *PeerManager) connectToPeers(peers peer.IDSlice) { +// connectToSpecifiedPeers connects to peers provided in the list if the addresses have not expired. +func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) { for _, peerID := range peers { peerData := AddrInfoToPeerData(wps.PeerManager, peerID, pm.host) if peerData == nil { @@ -377,8 +393,8 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) { //TODO: Need to have more intelligent way of doing this, maybe peer scores. //TODO: Keep optimalPeersRequired for a pubSubTopic in mind while pruning connections to peers. pm.logger.Info("peer connections exceed target relay peers, hence pruning", - zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InRelayPeersTarget)) - for pruningStartIndex := pm.InRelayPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ { + zap.Int("cnt", inRelayPeers.Len()), zap.Int("target", pm.InPeersTarget)) + for pruningStartIndex := pm.InPeersTarget; pruningStartIndex < inRelayPeers.Len(); pruningStartIndex++ { p := inRelayPeers[pruningStartIndex] err := pm.host.Network().ClosePeer(p) if err != nil { diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index c63efc0d..9493225a 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -30,7 +30,7 @@ func initTest(t *testing.T) (context.Context, *PeerManager, func()) { require.NoError(t, err) // host 1 is used by peer manager - pm := NewPeerManager(10, 20, nil, utils.Logger()) + pm := NewPeerManager(10, 20, nil, true, utils.Logger()) pm.SetHost(h1) return ctx, pm, func() { @@ -228,7 +228,7 @@ func TestConnectToRelayPeers(t *testing.T) { defer deferFn() - pm.connectToRelayPeers() + pm.connectToPeers() } @@ -252,7 +252,7 @@ func createHostWithDiscv5AndPM(t *testing.T, hostName string, topic string, enrF err = wenr.Update(utils.Logger(), localNode, wenr.WithWakuRelaySharding(rs[0])) require.NoError(t, err) - pm := NewPeerManager(10, 20, nil, logger) + pm := NewPeerManager(10, 20, nil, true, logger) pm.SetHost(host) peerconn, err := NewPeerConnectionStrategy(pm, 30*time.Second, logger) require.NoError(t, err) diff --git a/waku/v2/peermanager/topic_event_handler.go b/waku/v2/peermanager/topic_event_handler.go index 3060bf7b..1b965ef0 100644 --- a/waku/v2/peermanager/topic_event_handler.go +++ b/waku/v2/peermanager/topic_event_handler.go @@ -48,7 +48,9 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubsubTopic]) - if connectedPeers >= waku_proto.GossipSubDMin { //TODO: Use a config rather than hard-coding. + //Leaving this logic based on gossipSubDMin as this is a good start for a subscribed topic. + // subsequent connectivity loop iteration would initiate more connections which should take it towards a healthy mesh. + if connectedPeers >= waku_proto.GossipSubDMin { // Should we use optimal number or define some sort of a config for the node to choose from? // A desktop node may choose this to be 4-6, whereas a service node may choose this to be 8-12 based on resources it has // or bandwidth it can support. @@ -70,7 +72,7 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic } //For now all peers are being given same priority, // Later we may want to choose peers that have more shards in common over others. - pm.connectToPeers(notConnectedPeers[0:numPeersToConnect]) + pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect]) } else { triggerDiscovery = true } diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 8fc8da94..f072019b 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -3,6 +3,9 @@ package peermanager import ( "context" "crypto/rand" + "testing" + "time" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -17,8 +20,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "testing" - "time" ) func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) { @@ -44,7 +45,7 @@ func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) (*PeerManager, event.Bus) { // Host 1 used by peer manager - pm := NewPeerManager(10, 20, nil, utils.Logger()) + pm := NewPeerManager(10, 20, nil, true, utils.Logger()) pm.SetHost(*h) // Create a new relay event bus @@ -77,7 +78,7 @@ func TestSubscribeToRelayEvtBus(t *testing.T) { r, h1, _ := makeWakuRelay(t, log) // Host 1 used by peer manager - pm := NewPeerManager(10, 20, nil, utils.Logger()) + pm := NewPeerManager(10, 20, nil, true, utils.Logger()) pm.SetHost(h1) // Create a new relay event bus diff --git a/waku/v2/protocol/filter/test_utils.go b/waku/v2/protocol/filter/test_utils.go index 2beabc2d..8be1df35 100644 --- a/waku/v2/protocol/filter/test_utils.go +++ b/waku/v2/protocol/filter/test_utils.go @@ -164,7 +164,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { s.Require().NoError(err) b := relay.NewBroadcaster(10) s.Require().NoError(b.Start(context.Background())) - pm := peermanager.NewPeerManager(5, 5, nil, s.Log) + pm := peermanager.NewPeerManager(5, 5, nil, true, s.Log) filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) filterPush.SetHost(host) pm.SetHost(host) diff --git a/waku/v2/protocol/legacy_store/waku_store_client_test.go b/waku/v2/protocol/legacy_store/waku_store_client_test.go index c947a64d..7d9ebcba 100644 --- a/waku/v2/protocol/legacy_store/waku_store_client_test.go +++ b/waku/v2/protocol/legacy_store/waku_store_client_test.go @@ -36,7 +36,7 @@ func TestQueryOptions(t *testing.T) { require.NoError(t, err) // Let peer manager reside at host - pm := peermanager.NewPeerManager(5, 5, nil, utils.Logger()) + pm := peermanager.NewPeerManager(5, 5, nil, true, utils.Logger()) pm.SetHost(host) // Add host2 to peerstore diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 8b3d93bb..99525b1e 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -237,7 +237,7 @@ func TestWakuLightPushCornerCases(t *testing.T) { testContentTopic := "/test/10/my-lp-app/proto" // Prepare peer manager instance to include in test - pm := peermanager.NewPeerManager(10, 10, nil, utils.Logger()) + pm := peermanager.NewPeerManager(10, 10, nil, true, utils.Logger()) node1, sub1, host1 := makeWakuRelay(t, testTopic) defer node1.Stop() diff --git a/waku/v2/protocol/metadata/waku_metadata.go b/waku/v2/protocol/metadata/waku_metadata.go index 228f4487..87590203 100644 --- a/waku/v2/protocol/metadata/waku_metadata.go +++ b/waku/v2/protocol/metadata/waku_metadata.go @@ -125,27 +125,30 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc writer := pbio.NewDelimitedWriter(stream) reader := pbio.NewDelimitedReader(stream, math.MaxInt32) + logger.Debug("sending metadata request") err = writer.WriteMsg(request) if err != nil { logger.Error("writing request", zap.Error(err)) if err := stream.Reset(); err != nil { - wakuM.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return nil, err } + logger.Debug("sent metadata request") response := &pb.WakuMetadataResponse{} err = reader.ReadMsg(response) if err != nil { logger.Error("reading response", zap.Error(err)) if err := stream.Reset(); err != nil { - wakuM.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return nil, err } stream.Close() + logger.Debug("received metadata response") if response.ClusterId == nil { return nil, errors.New("node did not provide a waku clusterid") @@ -163,6 +166,7 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*protoc rShardIDs = append(rShardIDs, uint16(i)) } } + logger.Debug("getting remote cluster and shards") rs, err := protocol.NewRelayShards(rClusterID, rShardIDs...) if err != nil { @@ -176,7 +180,7 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) { return func(stream network.Stream) { logger := wakuM.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) request := &pb.WakuMetadataRequest{} - + logger.Debug("received metadata request from peer") writer := pbio.NewDelimitedWriter(stream) reader := pbio.NewDelimitedReader(stream, math.MaxInt32) @@ -184,11 +188,10 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) { if err != nil { logger.Error("reading request", zap.Error(err)) if err := stream.Reset(); err != nil { - wakuM.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return } - response := new(pb.WakuMetadataResponse) clusterID, shards, err := wakuM.ClusterAndShards() @@ -205,10 +208,11 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) { if err != nil { logger.Error("writing response", zap.Error(err)) if err := stream.Reset(); err != nil { - wakuM.log.Error("resetting connection", zap.Error(err)) + logger.Error("resetting connection", zap.Error(err)) } return } + logger.Debug("sent metadata response to peer") stream.Close() } @@ -248,14 +252,15 @@ func (wakuM *WakuMetadata) disconnectPeer(peerID peer.ID, reason error) { // Connected is called when a connection is opened func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) { go func() { + wakuM.log.Debug("peer connected", zap.Stringer("peer", cc.RemotePeer())) // Metadata verification is done only if a clusterID is specified if wakuM.clusterID == 0 { return } peerID := cc.RemotePeer() - shard, err := wakuM.Request(wakuM.ctx, peerID) + if err != nil { wakuM.disconnectPeer(peerID, err) return diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 37139c36..56ab59e0 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -291,7 +291,7 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { require.NoError(t, err) // Prepare peer manager for host3 - pm3 := peermanager.NewPeerManager(10, 20, nil, log) + pm3 := peermanager.NewPeerManager(10, 20, nil, true, log) pm3.SetHost(host3) pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger()) require.NoError(t, err) @@ -366,7 +366,7 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) { require.NoError(t, err) // Prepare peer manager for host3 - pm3 := peermanager.NewPeerManager(10, 20, nil, log) + pm3 := peermanager.NewPeerManager(10, 20, nil, true, log) pm3.SetHost(host3) pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger()) require.NoError(t, err) diff --git a/waku/v2/protocol/store/client_test.go b/waku/v2/protocol/store/client_test.go index 035b7a6f..d851cfc4 100644 --- a/waku/v2/protocol/store/client_test.go +++ b/waku/v2/protocol/store/client_test.go @@ -43,7 +43,7 @@ func TestStoreClient(t *testing.T) { err = wakuRelay.Start(context.Background()) require.NoError(t, err) - pm := peermanager.NewPeerManager(5, 5, nil, utils.Logger()) + pm := peermanager.NewPeerManager(5, 5, nil, true, utils.Logger()) pm.SetHost(host) err = pm.SubscribeToRelayEvtBus(wakuRelay.Events()) require.NoError(t, err)