mirror of https://github.com/status-im/go-waku.git
Update peer selection options for light protocols (#787)
* Update peer selection options for lightPush
* Update peer selection options for filter
* Migrate peer selection functionality from utils to the peer manager

Co-authored-by: richΛrd <info@richardramos.me>

This commit is contained in:
parent 2ef7e732dd
commit 6955d01498

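This commit reworks peer selection for the light protocols (lightpush, filter, legacy filter, peer exchange, store). Options such as WithAutomaticPeerSelection and WithFastestPeerSelection no longer resolve a peer eagerly when the option runs; they now only record a selection strategy plus optional preferred peers, and each protocol asks the PeerManager for a peer at request time. A minimal sketch of the API change, assuming pm is an initialized *peermanager.PeerManager and proto, ctx, and fromThesePeers are in scope:

```go
// Before this commit: positional arguments, resolved inside the option.
//   peerID, err := pm.SelectPeer(proto, "", fromThesePeers...)

// After this commit: a criteria struct, resolved at request time.
peerID, err := pm.SelectPeer(peermanager.PeerSelectionCriteria{
	SelectionType: peermanager.Automatic, // or peermanager.LowestRTT
	Proto:         proto,
	PubsubTopic:   "/waku/rs/2/1", // optional; restricts candidates to this topic
	SpecificPeers: fromThesePeers, // optional preferred peers
	Ctx:           ctx,            // used by the LowestRTT path
})
```
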
@@ -283,6 +283,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}
}

w.opts.legacyFilterOpts = append(w.opts.legacyFilterOpts, legacy_filter.WithPeerManager(w.peermanager))

w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.opts.prometheusReg, w.log, w.opts.legacyFilterOpts...)
w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...)
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log)
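The node now hands its PeerManager to the filter light node directly and to the legacy filter through the new WithPeerManager option. A hedged sketch of equivalent standalone wiring (the constructor arguments mirror the calls above; the variable names are illustrative, and the NewPeerManager call matches the one in peer_manager_test.go below — the semantics of its integer arguments are outside this diff):

```go
pm := peermanager.NewPeerManager(10, 20, utils.Logger())

lightNode := filter.NewWakuFilterLightNode(broadcaster, pm, ts, prometheusReg, logger)
legacyFilter := legacy_filter.NewWakuFilter(broadcaster, false, ts, prometheusReg, logger,
	legacy_filter.WithPeerManager(pm))
```
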
@@ -17,6 +17,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"

@@ -263,7 +264,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
_, filter, err := wakuNode2.LegacyFilter().Subscribe(ctx, legacy_filter.ContentFilter{
Topic: string(relay.DefaultWakuTopic),
})
}, legacy_filter.WithPeer(wakuNode1.host.ID()))
require.NoError(t, err)

// Sleep to make sure the filter is subscribed

@@ -303,9 +304,9 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
defer wakuNode3.Stop()

err = wakuNode3.DialPeerWithMultiAddress(ctx, wakuNode2.ListenAddresses()[0])
_, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, store.StoreID_v20beta4)
require.NoError(t, err)

time.Sleep(2 * time.Second)
// NODE2 should have returned the message received via filter
result, err := wakuNode3.Store().Query(ctx, store.Query{})
require.NoError(t, err)

@@ -3,6 +3,7 @@ package peermanager
import (
"context"
"errors"
"math/rand"
"sync"
"time"

@@ -13,13 +14,13 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/logging"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"

"go.uber.org/zap"
)

@@ -45,6 +46,18 @@ type PeerManager struct {
subRelayTopics map[string]*NodeTopicDetails
}

// PeerSelection provides various options based on which a peer is selected from a list of peers.
type PeerSelection int

const (
Automatic PeerSelection = iota
LowestRTT
)

// ErrNoPeersAvailable is emitted when no suitable peers are found for
// some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found")

const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 5

@@ -161,10 +174,10 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee
// Need to filter peers to check if they support relay
if inPeers.Len() != 0 {
inRelayPeers, _ = utils.FilterPeersByProto(pm.host, inPeers, relay.WakuRelayID_v200)
inRelayPeers, _ = pm.FilterPeersByProto(inPeers, relay.WakuRelayID_v200)
}
if outPeers.Len() != 0 {
outRelayPeers, _ = utils.FilterPeersByProto(pm.host, outPeers, relay.WakuRelayID_v200)
outRelayPeers, _ = pm.FilterPeersByProto(outPeers, relay.WakuRelayID_v200)
}
return
}

@@ -426,34 +439,34 @@ func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic
if err != nil {
return "", err
}
return pm.SelectPeer(proto, pubsubTopic, specificPeers...)
return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers})
}

// SelectPeer is used to return a random peer that supports a given protocol.
// SelectRandomPeer is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will choose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from the node peerstore.
// If pubSubTopic is specified, the peer is selected from the list of peers that support the pubSubTopic.
func (pm *PeerManager) SelectPeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) {
func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
// - which topics they track
// - latency?

if peerID := pm.selectServicePeer(proto, pubSubTopic, specificPeers...); peerID != nil {
if peerID := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.SpecificPeers...); peerID != nil {
return *peerID, nil
}

// if not found in serviceSlots or proto == WakuRelayIDv200
filteredPeers, err := utils.FilterPeersByProto(pm.host, specificPeers, proto)
filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto)
if err != nil {
return "", err
}
if pubSubTopic != "" {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, filteredPeers...)
if criteria.PubsubTopic != "" {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...)
}
return utils.SelectRandomPeer(filteredPeers, pm.logger)
return selectRandomPeer(filteredPeers, pm.logger)
}

func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peerIDPtr *peer.ID) {

@@ -473,7 +486,7 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string,
keys = append(keys, i)
}
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...)
peerID, err := utils.SelectRandomPeer(selectedPeers, pm.logger)
peerID, err := selectRandomPeer(selectedPeers, pm.logger)
if err == nil {
peerIDPtr = &peerID
} else {

@@ -483,3 +496,138 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string,
}
return
}

// PeerSelectionCriteria is the selection criteria that is used by PeerManager to select peers.
type PeerSelectionCriteria struct {
SelectionType PeerSelection
Proto protocol.ID
PubsubTopic string
SpecificPeers peer.IDSlice
Ctx context.Context
}

// SelectPeer selects a peer based on the selectionType specified.
// Context is required only in case of selectionType set to LowestRTT.
func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) {
switch criteria.SelectionType {
case Automatic:
return pm.SelectRandomPeer(criteria)
case LowestRTT:
if criteria.Ctx == nil {
criteria.Ctx = context.Background()
pm.logger.Warn("context is not passed for peerSelectionwithRTT, using background context")
}
return pm.SelectPeerWithLowestRTT(criteria)
default:
return "", errors.New("unknown peer selection type specified")
}
}
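SelectPeer thus dispatches on the strategy recorded by the protocol options: Automatic falls through to SelectRandomPeer (service slots first, then the peerstore), while LowestRTT pings the candidates. A short sketch of both paths, using criteria values that appear in the tests below (proto is any protocol.ID in scope):

```go
// Random selection among peers that support proto on a pubsub topic.
p, err := pm.SelectPeer(PeerSelectionCriteria{
	SelectionType: Automatic,
	Proto:         proto,
	PubsubTopic:   "/waku/rs/2/1",
})

// Latency-based selection; pass a context, otherwise a background
// context is substituted and the warning above is logged.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
p, err = pm.SelectPeer(PeerSelectionCriteria{
	SelectionType: LowestRTT,
	Proto:         proto,
	Ctx:           ctx,
})
```
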
type pingResult struct {
p peer.ID
rtt time.Duration
}

// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will choose a peer from the node peerstore.
// TO OPTIMIZE: as of now the peer with the lowest RTT is identified when select is called; this should be optimized
// to maintain the RTT as part of peer-scoring and just select based on that.
func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (peer.ID, error) {
var peers peer.IDSlice
var err error
if criteria.Ctx == nil {
criteria.Ctx = context.Background()
}

if criteria.PubsubTopic != "" {
peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...)
}

peers, err = pm.FilterPeersByProto(peers, criteria.Proto)
if err != nil {
return "", err
}
wg := sync.WaitGroup{}
waitCh := make(chan struct{})
pingCh := make(chan pingResult, 1000)

wg.Add(len(peers))

go func() {
for _, p := range peers {
go func(p peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(criteria.Ctx, 3*time.Second)
defer cancel()
result := <-ping.Ping(ctx, pm.host, p)
if result.Error == nil {
pingCh <- pingResult{
p: p,
rtt: result.RTT,
}
} else {
pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error))
}
}(p)
}
wg.Wait()
close(waitCh)
close(pingCh)
}()

select {
case <-waitCh:
var min *pingResult
for p := range pingCh {
if min == nil {
min = &p
} else {
if p.rtt < min.rtt {
min = &p
}
}
}
if min == nil {
return "", ErrNoPeersAvailable
}

return min.p, nil
case <-criteria.Ctx.Done():
return "", ErrNoPeersAvailable
}
}

// selectRandomPeer randomly selects a peer from the list of peers passed.
func selectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) {
if len(peers) >= 1 {
peerID := peers[rand.Intn(len(peers))]
// TODO: proper heuristic here that compares peer scores and selects the "best" one. For now a random peer for the given protocol is returned.
return peerID, nil // nolint: gosec
}

return "", ErrNoPeersAvailable
}

// FilterPeersByProto filters the list of peers that support the specified protocols.
// If specificPeers is nil, all peers in the host's peerstore are considered for filtering.
func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) {
peerSet := specificPeers
if len(peerSet) == 0 {
peerSet = pm.host.Peerstore().Peers()
}

var peers peer.IDSlice
for _, peer := range peerSet {
protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...)
if err != nil {
return nil, err
}

if len(protocols) > 0 {
peers = append(peers, peer)
}
}
return peers, nil
}
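SelectPeerWithLowestRTT fans out one goroutine per candidate, funnels successful pings into a buffered channel, and scans for the minimum once the wait group drains. A self-contained sketch of the same fan-out/minimum pattern with simulated RTTs (not go-waku code; note the explicit copy of the loop variable before taking its address, which this pattern needs on Go versions before 1.22):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type pingResult struct {
	name string
	rtt  time.Duration
}

func main() {
	rtts := map[string]time.Duration{"a": 80 * time.Millisecond, "b": 35 * time.Millisecond, "c": 120 * time.Millisecond}

	var wg sync.WaitGroup
	resCh := make(chan pingResult, len(rtts)) // buffered so senders never block
	for name, rtt := range rtts {
		wg.Add(1)
		go func(name string, rtt time.Duration) { // stands in for ping.Ping
			defer wg.Done()
			time.Sleep(rtt)
			resCh <- pingResult{name, rtt}
		}(name, rtt)
	}
	wg.Wait()
	close(resCh)

	var min *pingResult
	for r := range resCh {
		r := r // copy before taking the address
		if min == nil || r.rtt < min.rtt {
			min = &r
		}
	}
	fmt.Println("lowest RTT:", min.name, min.rtt) // prints: lowest RTT: b 35ms
}
```
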
@@ -4,6 +4,7 @@ import (
"context"
"crypto/rand"
"fmt"
"strings"
"testing"
"time"

@@ -20,16 +21,22 @@ import (
func getAddr(h host.Host) multiaddr.Multiaddr {
id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().Pretty()))
return h.Network().ListenAddresses()[0].Encapsulate(id)
var selectedAddr multiaddr.Multiaddr
// For now skipping circuit relay addresses as libp2p seems to be returning empty p2p-circuit addresses.
for _, addr := range h.Network().ListenAddresses() {
if strings.Contains(addr.String(), "p2p-circuit") {
continue
}
selectedAddr = addr
}
return selectedAddr.Encapsulate(id)
}

func initTest(t *testing.T) (context.Context, *PeerManager, func()) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// hosts
h1, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h1.Close()

// host 1 is used by peer manager
pm := NewPeerManager(10, 20, utils.Logger())

@@ -66,7 +73,7 @@ func TestServiceSlots(t *testing.T) {
///////////////

// select peer from pm, currently only h2 is set in pm
peerID, err := pm.SelectPeer(protocol, "")
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
require.Equal(t, peerID, h2.ID())

@@ -75,7 +82,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)

// check that returned peer is h2 or h3 peer
peerID, err = pm.SelectPeer(protocol, "")
peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
if peerID == h2.ID() || peerID == h3.ID() {
// Test success

@@ -91,15 +98,15 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)
defer h4.Close()

_, err = pm.SelectPeer(protocol1, "")
require.Error(t, err, utils.ErrNoPeersAvailable)
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1})
require.Error(t, err, ErrNoPeersAvailable)

// add h4 peer for protocol1
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
require.NoError(t, err)

// Test peer selection for protocol1
peerID, err = pm.SelectPeer(protocol1, "")
peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1})
require.NoError(t, err)
require.Equal(t, peerID, h4.ID())

@@ -127,19 +134,22 @@ func TestPeerSelection(t *testing.T) {
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

_, err = pm.SelectPeer(protocol, "")
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)

peerID, err := pm.SelectPeer(protocol, "/waku/rs/2/2")
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/2"})
require.NoError(t, err)
require.Equal(t, h2.ID(), peerID)

_, err = pm.SelectPeer(protocol, "/waku/rs/2/3")
require.Error(t, utils.ErrNoPeersAvailable, err)
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/3"})
require.Error(t, ErrNoPeersAvailable, err)

_, err = pm.SelectPeer(protocol, "/waku/rs/2/1")
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/1"})
require.NoError(t, err)

// Test for selectWithLowestRTT
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/rs/2/1"})
require.NoError(t, err)
}

func TestDefaultProtocol(t *testing.T) {

@@ -149,8 +159,8 @@ func TestDefaultProtocol(t *testing.T) {
// check peer for default protocol
///////////////
// Test empty peer selection for relay protocol
_, err := pm.SelectPeer(relay.WakuRelayID_v200, "")
require.Error(t, err, utils.ErrNoPeersAvailable)
_, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200})
require.Error(t, err, ErrNoPeersAvailable)

///////////////
// getting peer for default protocol

@@ -164,7 +174,7 @@ func TestDefaultProtocol(t *testing.T) {
require.NoError(t, err)

// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with the same protocol.
peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, "")
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200})
require.NoError(t, err)
require.Equal(t, peerID, h5.ID())
}

@@ -184,13 +194,13 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
require.NoError(t, err)

peerID, err := pm.SelectPeer(protocol2, "")
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
require.NoError(t, err)
require.Equal(t, peerID, h6.ID())

pm.RemovePeer(peerID)
_, err = pm.SelectPeer(protocol2, "")
require.Error(t, err, utils.ErrNoPeersAvailable)
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
require.Error(t, err, ErrNoPeersAvailable)
}

func TestConnectToRelayPeers(t *testing.T) {

@@ -6,7 +6,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)

type peerMap struct {

@@ -26,7 +25,7 @@ func (pm *peerMap) getRandom() (peer.ID, error) {
for pID := range pm.m {
return pID, nil
}
return "", utils.ErrNoPeersAvailable
return "", ErrNoPeersAvailable
}

@@ -6,7 +6,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/utils"
)

func TestServiceSlot(t *testing.T) {

@@ -27,7 +26,7 @@ func TestServiceSlot(t *testing.T) {
slots.getPeers(protocol).remove(peerID)
//
_, err = slots.getPeers(protocol).getRandom()
require.Equal(t, err, utils.ErrNoPeersAvailable)
require.Equal(t, err, ErrNoPeersAvailable)
}

func TestServiceSlotRemovePeerFromAll(t *testing.T) {

@@ -50,7 +49,7 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) {
slots.removePeer(peerID)
//
_, err = slots.getPeers(protocol).getRandom()
require.Equal(t, err, utils.ErrNoPeersAvailable)
require.Equal(t, err, ErrNoPeersAvailable)
_, err = slots.getPeers(protocol1).getRandom()
require.Equal(t, err, utils.ErrNoPeersAvailable)
require.Equal(t, err, ErrNoPeersAvailable)
}

@@ -269,11 +269,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
}
}

if params.selectedPeer == "" {
wf.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable
}

pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter)
if err != nil {
return nil, err

@@ -281,16 +276,42 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
failedContentTopics := []string{}
subscriptions := make([]*subscription.SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {
var selectedPeer peer.ID
// TO OPTIMIZE: find a peer with all pubSubTopics in the list if possible; if not, only then look for a single pubSubTopic
if params.pm != nil && params.selectedPeer == "" {
selectedPeer, err = wf.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
PubsubTopic: pubSubTopic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
} else {
selectedPeer = params.selectedPeer
}

if selectedPeer == "" {
wf.metrics.RecordError(peerNotFoundFailure)
wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
continue
}

var cFilter protocol.ContentFilter
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...)

err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
continue
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter))
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(selectedPeer, cFilter))
}

if len(failedContentTopics) > 0 {
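Subscribe therefore resolves a peer per pubsub topic inside the loop whenever no explicit peer was pinned, using the strategy and preferred peers recorded by the options. A hedged usage sketch, assuming an initialized light node wf and hypothetical topic names:

```go
contentFilter := protocol.ContentFilter{
	PubsubTopic:   "/waku/rs/2/1",
	ContentTopics: protocol.NewContentTopicSet("/app/1/chat/proto"),
}

// Defer selection to the peer manager (random among suitable peers)...
subs, err := wf.Subscribe(ctx, contentFilter, WithAutomaticPeerSelection())

// ...or rank candidates by ping time. Note this option no longer takes
// a context; the request context is used instead.
subs, err = wf.Subscribe(ctx, contentFilter, WithFastestPeerSelection())
```
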
@@ -1,7 +1,6 @@
package filter

import (
"context"
"sync"
"time"

@@ -9,13 +8,14 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

type (
FilterSubscribeParameters struct {
selectedPeer peer.ID
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
requestID []byte
log *zap.Logger

@@ -56,19 +56,8 @@ func WithPeer(p peer.ID) FilterSubscribeOption {
// supports the chosen protocol, otherwise it will choose a peer from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
var p peer.ID
var err error
if params.pm == nil {
p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log)
} else {
p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, "", fromThesePeers...)
}

if err != nil {
return err
}

params.selectedPeer = p
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
return nil
}
}

@@ -77,14 +66,9 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will choose a
// peer from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log)
if err != nil {
return err
}

params.selectedPeer = p
params.peerSelectionType = peermanager.LowestRTT
return nil
}
}

@@ -21,7 +21,7 @@ func TestFilterOption(t *testing.T) {
options := []FilterSubscribeOption{
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
WithAutomaticPeerSelection(),
WithFastestPeerSelection(context.Background()),
WithFastestPeerSelection(),
}

params := new(FilterSubscribeParameters)

@@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"

@@ -48,6 +49,7 @@ type (
WakuFilter struct {
*protocol.CommonService
h host.Host
pm *peermanager.PeerManager
isFullNode bool
msgSub relay.Subscription
metrics Metrics

@@ -237,7 +239,17 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
for _, opt := range optList {
opt(params)
}

if wf.pm != nil && params.selectedPeer == "" {
params.selectedPeer, _ = wf.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterID_v20beta1,
PubsubTopic: filter.Topic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
}
if params.selectedPeer == "" {
wf.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable

@@ -1,12 +1,11 @@
package legacy_filter

import (
"context"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/utils"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"go.uber.org/zap"
)

@@ -14,6 +13,8 @@ type (
FilterSubscribeParameters struct {
host host.Host
selectedPeer peer.ID
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
log *zap.Logger
}

@@ -21,6 +22,7 @@ type (
FilterParameters struct {
Timeout time.Duration
pm *peermanager.PeerManager
}

Option func(*FilterParameters)

@@ -32,6 +34,12 @@ func WithTimeout(timeout time.Duration) Option {
}
}

func WithPeerManager(pm *peermanager.PeerManager) Option {
return func(params *FilterParameters) {
params.pm = pm
}
}

func WithPeer(p peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.selectedPeer = p

@@ -43,12 +51,8 @@ func WithPeer(p peer.ID) FilterSubscribeOption {
// supports the chosen protocol, otherwise it will choose a peer from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeer(params.host, FilterID_v20beta1, fromThesePeers, params.log)
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
}
}

@@ -56,14 +60,9 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will choose a
// peer from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, FilterID_v20beta1, fromThesePeers, params.log)
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
params.peerSelectionType = peermanager.LowestRTT
}
}
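The legacy filter options now follow the same pattern: they only record the strategy, and requestSubscription (above) performs the lookup through the PeerManager injected via WithPeerManager. A hedged wiring sketch with illustrative variable names:

```go
wf := legacy_filter.NewWakuFilter(broadcaster, false, ts, reg, logger,
	legacy_filter.WithPeerManager(pm))

// Pin a peer explicitly, or defer the choice to the manager:
_, f, err := wf.Subscribe(ctx, legacy_filter.ContentFilter{Topic: topic},
	legacy_filter.WithAutomaticPeerSelection())
```
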
@@ -20,7 +20,7 @@ func TestFilterOption(t *testing.T) {
options := []FilterSubscribeOption{
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
WithAutomaticPeerSelection(),
WithFastestPeerSelection(context.Background()),
WithFastestPeerSelection(),
}

params := new(FilterSubscribeParameters)

@@ -150,11 +150,6 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
return nil, errors.New("lightpush params are mandatory")
}

if params.selectedPeer == "" {
wakuLP.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable
}

if len(params.requestID) == 0 {
return nil, ErrInvalidID
}

@@ -210,16 +205,12 @@ func (wakuLP *WakuLightPush) Stop() {
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
}

// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via the lightpush protocol.
// If pubSubTopic is not provided, then contentTopic is used to derive the relevant pubSubTopic via autosharding.
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}
func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMessage, opts ...Option) (*lightPushParameters, error) {
params := new(lightPushParameters)
params.host = wakuLP.h
params.log = wakuLP.log
params.pm = wakuLP.pm
var err error

optList := append(DefaultOptions(wakuLP.h), opts...)
for _, opt := range optList {

@@ -227,12 +218,44 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa
}

if params.pubsubTopic == "" {
var err error
params.pubsubTopic, err = protocol.GetPubSubTopicFromContentTopic(message.ContentTopic)
if err != nil {
return nil, err
}
}

if params.pm != nil && params.selectedPeer == "" {
params.selectedPeer, err = wakuLP.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: LightPushID_v20beta1,
PubsubTopic: params.pubsubTopic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
}
if params.selectedPeer == "" {
if err != nil {
params.log.Error("selecting peer", zap.Error(err))
wakuLP.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable
}
}
return params, nil
}

// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via the lightpush protocol.
// If pubSubTopic is not provided, then contentTopic is used to derive the relevant pubSubTopic via autosharding.
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}

params, err := wakuLP.handleOpts(ctx, message, opts...)
if err != nil {
return nil, err
}
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = params.pubsubTopic
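handleOpts now centralizes option processing for lightpush: it derives the pubsub topic from the message's content topic when none was set (autosharding) and only then asks the peer manager for a peer on that topic. A hedged usage sketch, assuming a started client lp and a prepared *wpb.WakuMessage msg:

```go
// Explicit topic plus deferred, random peer selection:
hash, err := lp.PublishToTopic(ctx, msg,
	WithPubSubTopic("/waku/rs/2/1"),
	WithAutomaticPeerSelection())

// Or let autosharding derive the topic from msg.ContentTopic and
// pick the lowest-latency peer (no context argument anymore):
hash, err = lp.PublishToTopic(ctx, msg, WithFastestPeerSelection())
```
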
@@ -1,19 +1,18 @@
package lightpush

import (
"context"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

type lightPushParameters struct {
host host.Host
selectedPeer peer.ID
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
requestID []byte
pm *peermanager.PeerManager
log *zap.Logger

@@ -36,18 +35,8 @@ func WithPeer(p peer.ID) Option {
// from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
return func(params *lightPushParameters) {
var p peer.ID
var err error
if params.pm == nil {
p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log)
} else {
p, err = params.pm.SelectPeer(LightPushID_v20beta1, "", fromThesePeers...)
}
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
}
}

@@ -61,14 +50,9 @@ func WithPubSubTopic(pubsubTopic string) Option {
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will choose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Option {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option {
return func(params *lightPushParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, LightPushID_v20beta1, fromThesePeers, params.log)
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
params.peerSelectionType = peermanager.LowestRTT
}
}

@@ -20,7 +20,7 @@ func TestLightPushOption(t *testing.T) {
options := []Option{
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
WithAutomaticPeerSelection(),
WithFastestPeerSelection(context.Background()),
WithFastestPeerSelection(),
WithRequestID([]byte("requestID")),
WithAutomaticRequestID(),
}

@@ -106,6 +106,7 @@ func TestWakuLightPush(t *testing.T) {
var lpOptions []Option
lpOptions = append(lpOptions, WithPubSubTopic(testTopic))
lpOptions = append(lpOptions, WithPeer(host2.ID()))

// Checking that msg hash is correct
hash, err := client.PublishToTopic(ctx, msg2, lpOptions...)

@@ -215,9 +216,10 @@ func TestWakuLightPushAutoSharding(t *testing.T) {
<-sub2.Ch

}()

var lpOptions []Option
lpOptions = append(lpOptions, WithPeer(host2.ID()))
// Verifying successful request
hash1, err := client.Publish(ctx, msg1)
hash1, err := client.Publish(ctx, msg1, lpOptions...)
require.NoError(t, err)
require.Equal(t, protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), string(pubSubTopic)).Hash(), hash1)

@@ -28,7 +28,20 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
for _, opt := range optList {
opt(params)
}

if params.pm != nil && params.selectedPeer == "" {
var err error
params.selectedPeer, err = wakuPX.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: PeerExchangeID_v20alpha1,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
if err != nil {
return err
}
}
if params.selectedPeer == "" {
wakuPX.metrics.RecordError(dialFailure)
return ErrNoPeersAvailable

@@ -1,18 +1,17 @@
package peer_exchange

import (
"context"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

type PeerExchangeParameters struct {
host host.Host
selectedPeer peer.ID
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
pm *peermanager.PeerManager
log *zap.Logger
}

@@ -26,24 +25,15 @@ func WithPeer(p peer.ID) PeerExchangeOption {
}
}

// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// WithAutomaticPeerSelection is an option used to randomly select a peer from the Waku peer store
// to obtain peers from. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will choose a peer
// from the node peerstore
// Note: this option can only be used if WakuNode is initialized, which internally initializes the peerManager
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) {
var p peer.ID
var err error
if params.pm == nil {
p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log)
} else {
p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, "", fromThesePeers...)
}
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
}
}

@@ -51,14 +41,10 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will choose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) PeerExchangeOption {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log)
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
params.peerSelectionType = peermanager.LowestRTT
params.preferredPeers = fromThesePeers
}
}
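Peer exchange gets the same treatment; per the note above, automatic selection only works when a WakuNode (and hence a PeerManager) has been initialized. A hedged sketch, assuming a running *WakuPeerExchange px and a known server peer ID:

```go
// Let the peer manager pick an exchange server supporting the protocol:
err := px.Request(ctx, 1, WithAutomaticPeerSelection())

// Or pin the server explicitly, as the updated test below now does:
err = px.Request(ctx, 1, WithPeer(serverID))
```
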
@@ -163,7 +163,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
err = host3.Peerstore().AddProtocols(host1.ID(), PeerExchangeID_v20alpha1)
require.NoError(t, err)

err = px3.Request(context.Background(), 1)
err = px3.Request(context.Background(), 1, WithPeer(host1.ID()))
require.NoError(t, err)

time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic

@@ -11,10 +11,10 @@ import (
"go.uber.org/zap"

"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)

type Query struct {

@@ -82,6 +82,8 @@ type criteriaFN = func(msg *wpb.WakuMessage) (bool, error)

type HistoryRequestParameters struct {
selectedPeer peer.ID
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
localQuery bool
requestID []byte
cursor *pb.Index

@@ -104,20 +106,11 @@ func WithPeer(p peer.ID) HistoryRequestOption {
// to request the message history. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will choose a peer
// from the node peerstore
// Note: This option is available only with peerManager
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
var p peer.ID
var err error
if params.s.pm == nil {
p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log)
} else {
p, err = params.s.pm.SelectPeer(StoreID_v20beta4, "", fromThesePeers...)
}
if err == nil {
params.selectedPeer = p
} else {
params.s.log.Info("selecting peer", zap.Error(err))
}
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
}
}

@@ -125,14 +118,10 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will choose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) HistoryRequestOption {
// Note: This option is available only with peerManager
func WithFastestPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log)
if err == nil {
params.selectedPeer = p
} else {
params.s.log.Info("selecting peer", zap.Error(err))
}
params.peerSelectionType = peermanager.LowestRTT
}
}

@@ -275,6 +264,21 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
for _, opt := range optList {
opt(params)
}
if store.pm != nil && params.selectedPeer == "" {
var err error
params.selectedPeer, err = store.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: StoreID_v20beta4,
PubsubTopic: query.Topic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
if err != nil {
return nil, err
}
}

if !params.localQuery && params.selectedPeer == "" {
store.metrics.RecordError(peerNotFoundFailure)
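Store queries now thread query.Topic into the selection criteria, so deferred selection prefers peers known to serve the queried pubsub topic; an explicit WithPeer or a local query bypasses the lookup. A hedged sketch with illustrative topic names, assuming an initialized *WakuStore s:

```go
q := Query{
	Topic:         "/waku/rs/2/1",
	ContentTopics: []string{"/app/1/chat/proto"},
}
// The peer is resolved inside Query using StoreID_v20beta4 and q.Topic:
res, err := s.Query(ctx, q, WithAutomaticPeerSelection())
```
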
@@ -60,7 +60,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
ContentTopics: []string{topic1},
}

response, err := s2.Query(ctx, q, DefaultOptions()...)
response, err := s2.Query(ctx, q, WithPeer(host1.ID()))

require.NoError(t, err)
require.Len(t, response.Messages, 1)

@@ -155,7 +155,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
ContentTopics: []string{topic1},
}

response, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2))
response, err := s2.Query(ctx, q, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2))
require.NoError(t, err)
require.Len(t, response.Messages, 2)
require.Equal(t, response.Messages[0].Timestamp, msg1.Timestamp)

@@ -230,7 +230,7 @@ func TestWakuStoreResult(t *testing.T) {
ContentTopics: []string{topic1},
}

result, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2))
result, err := s2.Query(ctx, q, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2))
require.NoError(t, err)
require.False(t, result.started)
require.Len(t, result.GetMessages(), 0)

@@ -332,7 +332,7 @@ func TestWakuStoreProtocolFind(t *testing.T) {
return msg.ContentTopic == "hello", nil
}

foundMsg, err := s2.Find(ctx, q, fn, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2))
foundMsg, err := s2.Find(ctx, q, fn, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2))
require.NoError(t, err)
require.NotNil(t, foundMsg)
require.Equal(t, "hello", foundMsg.ContentTopic)

@@ -341,7 +341,7 @@ func TestWakuStoreProtocolFind(t *testing.T) {
return msg.ContentTopic == "bye", nil
}

foundMsg, err = s2.Find(ctx, q, fn2, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2))
foundMsg, err = s2.Find(ctx, q, fn2, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2))
require.NoError(t, err)
require.Nil(t, foundMsg)
}

@@ -1,24 +1,10 @@
package utils

import (
"context"
"errors"
"math/rand"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
)

// ErrNoPeersAvailable is emitted when no suitable peers are found for
// some protocol
var ErrNoPeersAvailable = errors.New("no suitable peers found")

// GetPeerID is used to extract the peerID from a multiaddress
func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P)

@@ -33,131 +19,3 @@ func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) {
return peerID, nil
}

// FilterPeersByProto filters list of peers that support specified protocols.
// If specificPeers is nil, all peers in the host's peerStore are considered for filtering.
func FilterPeersByProto(host host.Host, specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) {
peerSet := specificPeers
if len(peerSet) == 0 {
peerSet = host.Peerstore().Peers()
}

var peers peer.IDSlice
for _, peer := range peerSet {
protocols, err := host.Peerstore().SupportsProtocols(peer, proto...)
if err != nil {
return nil, err
}

if len(protocols) > 0 {
peers = append(peers, peer)
}
}
return peers, nil
}

// SelectRandomPeer selects randomly a peer from the list of peers passed.
func SelectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) {
if len(peers) >= 1 {
peerID := peers[rand.Intn(len(peers))]
// TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned
return peerID, nil // nolint: gosec
}

return "", ErrNoPeersAvailable
}

// SelectPeer is used to return a random peer that supports a given protocol.
// Note: Use this method only if WakuNode is not being initialized, otherwise use peermanager.SelectPeer.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
func SelectPeer(host host.Host, protocolID protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
// - which topics they track
// - latency?
// - default store peer?

peers, err := FilterPeersByProto(host, specificPeers, protocolID)
if err != nil {
return "", err
}

return SelectRandomPeer(peers, log)
}

type pingResult struct {
p peer.ID
rtt time.Duration
}

// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolID protocol.ID, specificPeers []peer.ID, _ *zap.Logger) (peer.ID, error) {
var peers peer.IDSlice

peerSet := specificPeers
if len(peerSet) == 0 {
peerSet = host.Peerstore().Peers()
}

for _, peer := range peerSet {
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolID)
if err != nil {
return "", err
}

if len(protocols) > 0 {
peers = append(peers, peer)
}
}

wg := sync.WaitGroup{}
waitCh := make(chan struct{})
pingCh := make(chan pingResult, 1000)

wg.Add(len(peers))

go func() {
for _, p := range peers {
go func(p peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
result := <-ping.Ping(ctx, host, p)
if result.Error == nil {
pingCh <- pingResult{
p: p,
rtt: result.RTT,
}
}
}(p)
}
wg.Wait()
close(waitCh)
close(pingCh)
}()

select {
case <-waitCh:
var min *pingResult
for p := range pingCh {
if min == nil {
min = &p
} else {
if p.rtt < min.rtt {
min = &p
}
}
}
if min == nil {
return "", ErrNoPeersAvailable
}

return min.p, nil
case <-ctx.Done():
return "", ErrNoPeersAvailable
}
}
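With this commit the utils package keeps only generic helpers such as GetPeerID here; FilterPeersByProto, SelectRandomPeer, SelectPeer, and SelectPeerWithLowestRTT move behind the PeerManager, along with its package-level ErrNoPeersAvailable. A hedged migration sketch for external callers, with illustrative variable names:

```go
// Before (removed from utils in this commit):
//   p, err := utils.SelectPeer(host, protoID, specificPeers, log)
//   p, err := utils.SelectPeerWithLowestRTT(ctx, host, protoID, specificPeers, log)

// After:
p, err := pm.SelectPeer(peermanager.PeerSelectionCriteria{
	SelectionType: peermanager.Automatic, // peermanager.LowestRTT for the RTT variant
	Proto:         protoID,
	SpecificPeers: specificPeers,
	Ctx:           ctx,
})
if err != nil && errors.Is(err, peermanager.ErrNoPeersAvailable) {
	// no connected or stored peer supports protoID
}
```
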
@@ -1,83 +0,0 @@
package tests

import (
"context"
"crypto/rand"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/utils"
)

func TestSelectPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

h1, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h1.Close()

h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()

h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()

proto := protocol.ID("test/protocol")

h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL)

// No peers with selected protocol
_, err = utils.SelectPeer(h1, proto, nil, utils.Logger())
require.Error(t, utils.ErrNoPeersAvailable, err)

// Peers with selected protocol
_ = h1.Peerstore().AddProtocols(h2.ID(), proto)
_ = h1.Peerstore().AddProtocols(h3.ID(), proto)

_, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger())
require.NoError(t, err)

}

func TestSelectPeerWithLowestRTT(t *testing.T) {
// help-wanted: how to slow down the ping response to properly test the rtt

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

h1, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h1.Close()

h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()

h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()

proto := protocol.ID("test/protocol")

h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)

// No peers with selected protocol
_, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger())
require.Error(t, utils.ErrNoPeersAvailable, err)

// Peers with selected protocol
_ = h1.Peerstore().AddProtocols(h2.ID(), proto)
_ = h1.Peerstore().AddProtocols(h3.ID(), proto)

_, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger())
require.NoError(t, err)
}