diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 0c6f4733..b31425b1 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -15,6 +15,7 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/discv5" "github.com/waku-org/go-waku/waku/v2/onlinechecker" @@ -125,36 +126,38 @@ func TestPeerSelection(t *testing.T) { defer h3.Close() protocol := libp2pProtocol.ID("test/protocol") - _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, protocol) require.NoError(t, err) - _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, protocol) require.NoError(t, err) - _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) - require.NoError(t, err) + var peerIDs peer.IDSlice - peerIDs, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}}) + peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) - require.Equal(t, h2.ID(), peerIDs[0]) + require.Len(t, peerIDs, 1) // Only 1 peer is returned randomly, because MaxPeers defaults to 1 when not set - _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}}) - require.Error(t, utils.ErrNoPeersAvailable, err) - - _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) + peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}}) require.NoError(t, err) + require.Len(t, peerIDs, 1) + require.Equal(t, h2.ID(), peerIDs[0]) // Only h2 has this pubsub topic - //Test for selectWithLowestRTT - _, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) + peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}}) + require.Error(t, utils.ErrNoPeersAvailable, err) // No peer has this pubsub topic + require.Empty(t, peerIDs) + + peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}}) require.NoError(t, err) + require.Len(t, peerIDs, 1) // Both h2 and h3 have this pubsub topic, but only 1 peer is returned randomly because MaxPeers defaults to 1 when not set peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2}) - require.Equal(t, 2, peerIDs.Len()) require.NoError(t, err) + require.Len(t, peerIDs, 2) peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3}) - require.Equal(t, 2, peerIDs.Len()) require.NoError(t, err) + require.Len(t, peerIDs, 2) h4, err := tests.MakeHost(ctx, 0, rand.Reader) require.NoError(t, err) @@ -163,9 +166,14 @@ func TestPeerSelection(t *testing.T) { require.NoError(t, err) peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3}) - require.Equal(t, 3, peerIDs.Len()) + require.Len(t, peerIDs, 3) require.NoError(t, err) + //Test for selectWithLowestRTT + // NOTE: This test must go the last because it involves pinging peers, which modifies the list of supported protocols + peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2}) + require.NoError(t, err) + require.Len(t, peerIDs, 1) // With LowestRTT, only 1 peer is returned, even if MaxPeers is set } func TestDefaultProtocol(t *testing.T) { diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go index 7cadbe22..007d1939 100644 --- a/waku/v2/peermanager/peer_selection.go +++ b/waku/v2/peermanager/peer_selection.go @@ -7,11 +7,13 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" - wps "github.com/waku-org/go-waku/waku/v2/peerstore" - waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "golang.org/x/exp/maps" + + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/utils" ) type PeerSet map[peer.ID]struct{} @@ -46,7 +48,7 @@ func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopic return peers[0], nil } -// SelectRandomPeer is used to return a random peer that supports a given protocol. +// SelectRandom is used to return a random peer that supports a given protocol. // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol, otherwise it will chose a peer from the service slot. // If a peer cannot be found in the service slot, a peer will be selected from node peerstore @@ -57,26 +59,40 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic // This will require us to check for various factors such as: // - which topics they track // - latency? + peerIDs, err := pm.selectServicePeer(criteria) if err == nil && len(peerIDs) == criteria.MaxPeers { return maps.Keys(peerIDs), nil - } else if !errors.Is(err, utils.ErrNoPeersAvailable) { - pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)), - zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err)) + } + if !errors.Is(err, utils.ErrNoPeersAvailable) { + pm.logger.Debug("could not retrieve random peer from slot", + zap.String("protocol", string(criteria.Proto)), + zap.Strings("pubsubTopics", criteria.PubsubTopics), + zap.Error(err)) return nil, err - } else if len(peerIDs) == 0 { + } + if len(peerIDs) == 0 { peerIDs = make(PeerSet) } - // if not found in serviceSlots or proto == WakuRelayIDv200 - filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto) + + // if not found in serviceSlots or proto == WakuRelayIDv200 (service slots don't support WakuRelayIDv200) + peerSet := criteria.SpecificPeers + if len(peerSet) == 0 { + peerSet = pm.host.Peerstore().Peers() + } + filteredPeers, err := pm.FilterPeersByProto(peerSet, criteria.ExcludePeers, criteria.Proto) if err != nil { return nil, err } + if len(filteredPeers) == 0 && criteria.Proto != relay.WakuRelayID_v200 { + return nil, utils.ErrNoPeersAvailable + } if len(criteria.PubsubTopics) > 0 { filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...) } + //Not passing excludePeers as filterPeers are already considering excluded ones. - randomPeers, err := selectRandomPeers(filteredPeers, nil, criteria.MaxPeers-len(peerIDs)) + randomPeers, err := selectRandomPeers(filteredPeers, nil, min(criteria.MaxPeers, len(peerIDs))) if err != nil && len(peerIDs) == 0 { return nil, err } @@ -88,7 +104,6 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic } func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) { - i := 0 selectedPeers := make(PeerSet) for pID := range filter { if PeerInSet(excludePeers, pID) { @@ -96,8 +111,7 @@ func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) } //Map's iterator in golang works using randomness and hence not random function is being used. selectedPeers[pID] = struct{}{} - i++ - if i == count { + if len(selectedPeers) == count { break } } @@ -121,41 +135,57 @@ func PeerSliceToMap(peers peer.IDSlice) PeerSet { return peerSet } +/* +selectServicePeer tries to select peer from serviceSlot first based on criteria. +serviceSlots is a map of peerMap by protocol.ID. +- Slots are created automatically in getPeers. +- Slots are not created for relay.WakuRelayID_v200. +Therefore for Relay protocol, selectServicePeer will always return ErrNoPeersAvailable. + +If there is no pubsubTopics criteria, a random peer from the selected slot is returned. +Otherwise, peers from the slot are filtered by pubsubTopics and random peers are selected from the filtered list. + +If no peer is found in the slot, on-demand discovery is triggered for the given pubsubTopics and protocol. +The function retries once to fetch peers from the slot after discovery. +*/ func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSet, error) { peers := make(PeerSet) var err error for retryCnt := 0; retryCnt < 1; retryCnt++ { //Try to fetch from serviceSlot - if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil { - if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { - return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers) - } else { //PubsubTopic based selection - slot.mu.RLock() - keys := make([]peer.ID, 0, len(slot.m)) - for i := range slot.m { - if PeerInSet(criteria.ExcludePeers, i) { - continue - } - keys = append(keys, i) - } - slot.mu.RUnlock() - selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...) - tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers) - for tmpPeer := range tmpPeers { - peers[tmpPeer] = struct{}{} - } - if err == nil && len(peers) == criteria.MaxPeers { - return peers, nil - } else { - pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics)) - //Trigger on-demand discovery for this topic and connect to peer immediately. - //For now discover atleast 1 peer for the criteria - pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1) - //Try to fetch peers again. - continue - } - } + slot := pm.serviceSlots.getPeers(criteria.Proto) + if slot == nil { + continue } + if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") { + return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers) + } + + //PubsubTopic based selection + slot.mu.RLock() + keys := make([]peer.ID, 0, len(slot.m)) + for peerID := range slot.m { + if PeerInSet(criteria.ExcludePeers, peerID) { + continue + } + keys = append(keys, peerID) + } + slot.mu.RUnlock() + selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...) + tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers) + for tmpPeer := range tmpPeers { + peers[tmpPeer] = struct{}{} + } + if err == nil && len(peers) == criteria.MaxPeers { + return peers, nil + } + + //Trigger on-demand discovery for this topic and connect to peer immediately. + //For now discover at least 1 peer for the criteria + pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics)) + pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1) + + //Try to fetch peers again. } if len(peers) == 0 { pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err)) diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index af42a28a..dd3ea82d 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -8,8 +8,9 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/waku-org/go-waku/waku/v2/protocol" "golang.org/x/exp/maps" + + "github.com/waku-org/go-waku/waku/v2/protocol" ) // Origin is used to determine how the peer is identified, @@ -233,17 +234,20 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific for _, p := range specificPeers { peerMatch := true peerTopics, err := ps.PubSubTopics(p) - if err == nil { - for _, t := range pubSubTopics { - if _, ok := peerTopics[t]; !ok { - peerMatch = false - break - } + if err != nil { + //Note: skipping a peer in case of an error as there would be others available. + continue + } + + for _, t := range pubSubTopics { + if _, ok := peerTopics[t]; !ok { + peerMatch = false + break } - if peerMatch { - result = append(result, p) - } - } //Note: skipping a peer in case of an error as there would be others available. + } + if peerMatch { + result = append(result, p) + } } return result }