fix(PeerManager): SelectRandom filter by protocol (#1296)

Igor Sirotin 2025-12-08 22:26:41 +01:00 committed by GitHub
parent 84a4b1be7a
commit 5635735da6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 111 additions and 69 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
@ -125,36 +126,38 @@ func TestPeerSelection(t *testing.T) {
defer h3.Close()
protocol := libp2pProtocol.ID("test/protocol")
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h2)}, wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, protocol)
require.NoError(t, err)
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol))
_, err = pm.AddPeer([]multiaddr.Multiaddr{tests.GetAddr(h3)}, wps.Static, []string{"/waku/2/rs/2/1"}, protocol)
require.NoError(t, err)
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
var peerIDs peer.IDSlice
peerIDs, err := pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
require.Equal(t, h2.ID(), peerIDs[0])
require.Len(t, peerIDs, 1) // Only 1 peer is returned randomly, because MaxPeers defaults to 1 when not set
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, utils.ErrNoPeersAvailable, err)
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/2"}})
require.NoError(t, err)
require.Len(t, peerIDs, 1)
require.Equal(t, h2.ID(), peerIDs[0]) // Only h2 has this pubsub topic
//Test for selectWithLowestRTT
_, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/3"}})
require.Error(t, utils.ErrNoPeersAvailable, err) // No peer has this pubsub topic
require.Empty(t, peerIDs)
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}})
require.NoError(t, err)
require.Len(t, peerIDs, 1) // Both h2 and h3 have this pubsub topic, but only 1 peer is returned randomly because MaxPeers defaults to 1 when not set
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2})
require.Equal(t, 2, peerIDs.Len())
require.NoError(t, err)
require.Len(t, peerIDs, 2)
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
require.Equal(t, 2, peerIDs.Len())
require.NoError(t, err)
require.Len(t, peerIDs, 2)
h4, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
@ -163,9 +166,14 @@ func TestPeerSelection(t *testing.T) {
require.NoError(t, err)
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 3})
require.Equal(t, 3, peerIDs.Len())
require.Len(t, peerIDs, 3)
require.NoError(t, err)
//Test for selectWithLowestRTT
// NOTE: This test must go last because it involves pinging peers, which modifies the list of supported protocols
peerIDs, err = pm.SelectPeers(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopics: []string{"/waku/2/rs/2/1"}, MaxPeers: 2})
require.NoError(t, err)
require.Len(t, peerIDs, 1) // With LowestRTT, only 1 peer is returned, even if MaxPeers is set
}
func TestDefaultProtocol(t *testing.T) {

View File

@ -7,11 +7,13 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type PeerSet map[peer.ID]struct{}
@ -46,7 +48,7 @@ func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopic
return peers[0], nil
}
// SelectRandomPeer is used to return a random peer that supports a given protocol.
// SelectRandom is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
it supports the chosen protocol, otherwise it will choose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
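For orientation, here is a minimal caller-side sketch of the selection API this doc comment describes, mirroring the test earlier in the diff; pm, protoID and the topic string are assumptions standing in for a configured PeerManager and real inputs:

// Sketch: ask for up to 2 random peers that support protoID and track the given pubsub topic.
peerIDs, err := pm.SelectPeers(PeerSelectionCriteria{
	SelectionType: Automatic, // routed through SelectRandom
	Proto:         protoID,
	PubsubTopics:  []string{"/waku/2/rs/2/1"},
	MaxPeers:      2, // defaults to 1 when left unset
})
if err != nil {
	// utils.ErrNoPeersAvailable means nothing matched the criteria
}
_ = peerIDs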
@ -57,26 +59,40 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
// This will require us to check for various factors such as:
// - which topics they track
// - latency?
peerIDs, err := pm.selectServicePeer(criteria)
if err == nil && len(peerIDs) == criteria.MaxPeers {
return maps.Keys(peerIDs), nil
} else if !errors.Is(err, utils.ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)),
zap.Strings("pubsubTopics", criteria.PubsubTopics), zap.Error(err))
}
if !errors.Is(err, utils.ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot",
zap.String("protocol", string(criteria.Proto)),
zap.Strings("pubsubTopics", criteria.PubsubTopics),
zap.Error(err))
return nil, err
} else if len(peerIDs) == 0 {
}
if len(peerIDs) == 0 {
peerIDs = make(PeerSet)
}
// if not found in serviceSlots or proto == WakuRelayIDv200
filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.ExcludePeers, criteria.Proto)
// if not found in serviceSlots or proto == WakuRelayIDv200 (service slots don't support WakuRelayIDv200)
peerSet := criteria.SpecificPeers
if len(peerSet) == 0 {
peerSet = pm.host.Peerstore().Peers()
}
filteredPeers, err := pm.FilterPeersByProto(peerSet, criteria.ExcludePeers, criteria.Proto)
if err != nil {
return nil, err
}
if len(filteredPeers) == 0 && criteria.Proto != relay.WakuRelayID_v200 {
return nil, utils.ErrNoPeersAvailable
}
if len(criteria.PubsubTopics) > 0 {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, filteredPeers...)
}
//Not passing excludePeers, as filteredPeers already accounts for excluded peers.
randomPeers, err := selectRandomPeers(filteredPeers, nil, criteria.MaxPeers-len(peerIDs))
randomPeers, err := selectRandomPeers(filteredPeers, nil, min(criteria.MaxPeers, len(peerIDs)))
if err != nil && len(peerIDs) == 0 {
return nil, err
}
@ -88,7 +104,6 @@ func (pm *PeerManager) SelectRandom(criteria PeerSelectionCriteria) (peer.IDSlic
}
func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error) {
i := 0
selectedPeers := make(PeerSet)
for pID := range filter {
if PeerInSet(excludePeers, pID) {
@ -96,8 +111,7 @@ func getRandom(filter PeerSet, count int, excludePeers PeerSet) (PeerSet, error)
}
//Go randomizes map iteration order, so no separate random selection function is needed here.
selectedPeers[pID] = struct{}{}
i++
if i == count {
if len(selectedPeers) == count {
break
}
}
@ -121,41 +135,57 @@ func PeerSliceToMap(peers peer.IDSlice) PeerSet {
return peerSet
}
/*
selectServicePeer first tries to select a peer from the serviceSlot, based on the criteria.
serviceSlots is a map of peerMap by protocol.ID.
- Slots are created automatically in getPeers.
- Slots are not created for relay.WakuRelayID_v200.
Therefore, for the Relay protocol, selectServicePeer will always return ErrNoPeersAvailable.
If no pubsubTopics criteria are given, a random peer from the selected slot is returned.
Otherwise, peers from the slot are filtered by pubsubTopics and random peers are selected from the filtered list.
If no peer is found in the slot, on-demand discovery is triggered for the given pubsubTopics and protocol.
The function retries once to fetch peers from the slot after discovery.
*/
func (pm *PeerManager) selectServicePeer(criteria PeerSelectionCriteria) (PeerSet, error) {
peers := make(PeerSet)
var err error
for retryCnt := 0; retryCnt < 1; retryCnt++ {
//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(criteria.Proto); slot != nil {
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
} else { //PubsubTopic based selection
slot.mu.RLock()
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
if PeerInSet(criteria.ExcludePeers, i) {
continue
}
keys = append(keys, i)
}
slot.mu.RUnlock()
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
for tmpPeer := range tmpPeers {
peers[tmpPeer] = struct{}{}
}
if err == nil && len(peers) == criteria.MaxPeers {
return peers, nil
} else {
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics))
//Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover atleast 1 peer for the criteria
pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1)
//Try to fetch peers again.
continue
}
}
slot := pm.serviceSlots.getPeers(criteria.Proto)
if slot == nil {
continue
}
if len(criteria.PubsubTopics) == 0 || (len(criteria.PubsubTopics) == 1 && criteria.PubsubTopics[0] == "") {
return slot.getRandom(criteria.MaxPeers, criteria.ExcludePeers)
}
//PubsubTopic based selection
slot.mu.RLock()
keys := make([]peer.ID, 0, len(slot.m))
for peerID := range slot.m {
if PeerInSet(criteria.ExcludePeers, peerID) {
continue
}
keys = append(keys, peerID)
}
slot.mu.RUnlock()
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopics(criteria.PubsubTopics, keys...)
tmpPeers, err := selectRandomPeers(selectedPeers, criteria.ExcludePeers, criteria.MaxPeers)
for tmpPeer := range tmpPeers {
peers[tmpPeer] = struct{}{}
}
if err == nil && len(peers) == criteria.MaxPeers {
return peers, nil
}
//Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover at least 1 peer for the criteria
pm.logger.Debug("discovering peers by pubsubTopic", zap.Strings("pubsubTopics", criteria.PubsubTopics))
pm.discoverPeersByPubsubTopics(criteria.PubsubTopics, criteria.Proto, criteria.Ctx, 1)
//Try to fetch peers again.
}
if len(peers) == 0 {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))

View File

@ -8,8 +8,9 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"golang.org/x/exp/maps"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// Origin is used to determine how the peer is identified,
@ -233,17 +234,20 @@ func (ps *WakuPeerstoreImpl) PeersByPubSubTopics(pubSubTopics []string, specific
for _, p := range specificPeers {
peerMatch := true
peerTopics, err := ps.PubSubTopics(p)
if err == nil {
for _, t := range pubSubTopics {
if _, ok := peerTopics[t]; !ok {
peerMatch = false
break
}
if err != nil {
//Note: skipping a peer in case of an error as there would be others available.
continue
}
for _, t := range pubSubTopics {
if _, ok := peerTopics[t]; !ok {
peerMatch = false
break
}
if peerMatch {
result = append(result, p)
}
} //Note: skipping a peer in case of an error as there would be others available.
}
if peerMatch {
result = append(result, p)
}
}
return result
}
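
The reworked loop above keeps a peer only when it advertises every requested pubsub topic and skips peers whose topic list cannot be read. Below is a small, self-contained sketch of that matching rule; matchesAllTopics is a hypothetical helper, and plain maps stand in for the peerstore:

package main

import "fmt"

// matchesAllTopics mirrors the filter above: a peer qualifies only when
// every requested pubsub topic is present in its advertised topic set.
func matchesAllTopics(peerTopics map[string]struct{}, wanted []string) bool {
	for _, t := range wanted {
		if _, ok := peerTopics[t]; !ok {
			return false
		}
	}
	return true
}

func main() {
	topics := map[string]struct{}{"/waku/2/rs/2/1": {}, "/waku/2/rs/2/2": {}}
	fmt.Println(matchesAllTopics(topics, []string{"/waku/2/rs/2/1"}))                   // true
	fmt.Println(matchesAllTopics(topics, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/3"})) // false
}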