From 5aa11311f83305847cabd13927251fbcb6add56f Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 6 Aug 2024 17:51:11 +0530 Subject: [PATCH] fix: use corrected connected peer count and add check to avoid crash (#1182) --- .github/workflows/ci.yml | 2 +- waku/v2/peermanager/peer_manager.go | 33 +++++++++++++---------------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 690b3da9..aec81a70 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -163,7 +163,7 @@ jobs: - name: "Run storev3 tests" run: | docker compose -f .github/docker-compose/nwaku.yml up -d - NWAKU_HOST=$(docker-compose -f .github/docker-compose/nwaku.yml port nwaku 60000) + NWAKU_HOST=$(docker compose -f .github/docker-compose/nwaku.yml port nwaku 60000) NWAKU_PORT=$(echo $NWAKU_HOST | cut -d ":" -f 2) sleep 5 make test-storev3 TEST_STOREV3_NODE="/ip4/127.0.0.1/tcp/${NWAKU_PORT}/p2p/16Uiu2HAmMGhfSTUzKbsjMWxc6T1X4wiTWSF1bEWSLjAukCm7KiHV" diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 1cfc5484..1441f7f4 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -309,19 +309,15 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { defer pm.topicMutex.RUnlock() for topicStr, topicInst := range pm.subRelayTopics { - // @cammellos reported that ListPeers returned an invalid number of - // peers. This will ensure that the peers returned by this function - // match those peers that are currently connected - meshPeerLen := pm.checkAndUpdateTopicHealth(topicInst) - topicPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(topicStr) - curPeerLen := topicPeers.Len() - if meshPeerLen < waku_proto.GossipSubDMin || curPeerLen < pm.OutPeersTarget { + curConnectedPeerLen := pm.getPeersBasedOnconnectionStatus(topicStr, network.Connected).Len() + + if meshPeerLen < waku_proto.GossipSubDMin || curConnectedPeerLen < pm.OutPeersTarget { pm.logger.Debug("subscribed topic has not reached target peers, initiating more connections to maintain healthy mesh", - zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curPeerLen), + zap.String("pubSubTopic", topicStr), zap.Int("connectedPeerCount", curConnectedPeerLen), zap.Int("targetPeers", pm.OutPeersTarget)) //Find not connected peers. - notConnectedPeers := pm.getNotConnectedPers(topicStr) + notConnectedPeers := pm.getPeersBasedOnconnectionStatus(topicStr, network.NotConnected) if notConnectedPeers.Len() == 0 { pm.logger.Debug("could not find any peers in peerstore to connect to, discovering more", zap.String("pubSubTopic", topicStr)) go pm.discoverPeersByPubsubTopics([]string{topicStr}, relay.WakuRelayID_v200, pm.ctx, 2) @@ -329,12 +325,13 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() { } pm.logger.Debug("connecting to eligible peers in peerstore", zap.String("pubSubTopic", topicStr)) //Connect to eligible peers. - numPeersToConnect := pm.OutPeersTarget - curPeerLen - - if numPeersToConnect > notConnectedPeers.Len() { - numPeersToConnect = notConnectedPeers.Len() + numPeersToConnect := pm.OutPeersTarget - curConnectedPeerLen + if numPeersToConnect > 0 { + if numPeersToConnect > notConnectedPeers.Len() { + numPeersToConnect = notConnectedPeers.Len() + } + pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect]) } - pm.connectToSpecifiedPeers(notConnectedPeers[0:numPeersToConnect]) } } } @@ -374,8 +371,8 @@ func (pm *PeerManager) connectToSpecifiedPeers(peers peer.IDSlice) { } } -// getNotConnectedPers returns peers for a pubSubTopic that are not connected. -func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeers peer.IDSlice) { +// getPeersBasedOnconnectionStatus returns peers for a pubSubTopic that are either connected/not-connected based on status passed. +func (pm *PeerManager) getPeersBasedOnconnectionStatus(pubsubTopic string, connected network.Connectedness) (filteredPeers peer.IDSlice) { var peerList peer.IDSlice if pubsubTopic == "" { peerList = pm.host.Peerstore().Peers() @@ -383,8 +380,8 @@ func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeer peerList = pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubsubTopic) } for _, peerID := range peerList { - if pm.host.Network().Connectedness(peerID) != network.Connected { - notConnectedPeers = append(notConnectedPeers, peerID) + if pm.host.Network().Connectedness(peerID) == connected { + filteredPeers = append(filteredPeers, peerID) } } return