diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 1035ed9f..6e2fb1b4 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -283,6 +283,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } } + w.opts.legacyFilterOpts = append(w.opts.legacyFilterOpts, legacy_filter.WithPeerManager(w.peermanager)) + w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.opts.prometheusReg, w.log, w.opts.legacyFilterOpts...) w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index cd5c17f8..71a3d101 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -17,6 +17,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/persistence/sqlite" "github.com/waku-org/go-waku/waku/v2/dnsdisc" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -263,7 +264,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { _, filter, err := wakuNode2.LegacyFilter().Subscribe(ctx, legacy_filter.ContentFilter{ Topic: string(relay.DefaultWakuTopic), - }) + }, legacy_filter.WithPeer(wakuNode1.host.ID())) require.NoError(t, err) // Sleep to make sure the filter is subscribed @@ -303,9 +304,9 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) defer wakuNode3.Stop() - err = wakuNode3.DialPeerWithMultiAddress(ctx, wakuNode2.ListenAddresses()[0]) + _, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, store.StoreID_v20beta4) require.NoError(t, err) - + time.Sleep(2 * time.Second) // NODE2 should have returned the message received via filter result, err := wakuNode3.Store().Query(ctx, store.Query{}) require.NoError(t, err) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 6ded3fa2..a99b2cfc 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -3,6 +3,7 @@ package peermanager import ( "context" "errors" + "math/rand" "sync" "time" @@ -13,13 +14,13 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/logging" wps "github.com/waku-org/go-waku/waku/v2/peerstore" waku_proto "github.com/waku-org/go-waku/waku/v2/protocol" wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -45,6 +46,18 @@ type PeerManager struct { subRelayTopics map[string]*NodeTopicDetails } +// PeerSelection provides various options based on which Peer is selected from a list of peers. +type PeerSelection int + +const ( + Automatic PeerSelection = iota + LowestRTT +) + +// ErrNoPeersAvailable is emitted when no suitable peers are found for +// some protocol +var ErrNoPeersAvailable = errors.New("no suitable peers found") + const peerConnectivityLoopSecs = 15 const maxConnsToPeerRatio = 5 @@ -161,10 +174,10 @@ func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers pee //Need to filter peers to check if they support relay if inPeers.Len() != 0 { - inRelayPeers, _ = utils.FilterPeersByProto(pm.host, inPeers, relay.WakuRelayID_v200) + inRelayPeers, _ = pm.FilterPeersByProto(inPeers, relay.WakuRelayID_v200) } if outPeers.Len() != 0 { - outRelayPeers, _ = utils.FilterPeersByProto(pm.host, outPeers, relay.WakuRelayID_v200) + outRelayPeers, _ = pm.FilterPeersByProto(outPeers, relay.WakuRelayID_v200) } return } @@ -426,34 +439,34 @@ func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic if err != nil { return "", err } - return pm.SelectPeer(proto, pubsubTopic, specificPeers...) + return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers}) } -// SelectPeer is used to return a random peer that supports a given protocol. +// SelectRandomPeer is used to return a random peer that supports a given protocol. // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol, otherwise it will chose a peer from the service slot. // If a peer cannot be found in the service slot, a peer will be selected from node peerstore // if pubSubTopic is specified, peer is selected from list that support the pubSubTopic -func (pm *PeerManager) SelectPeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) { +func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // Ideally depending on the query and our set of peers we take a subset of ideal peers. // This will require us to check for various factors such as: // - which topics they track // - latency? - if peerID := pm.selectServicePeer(proto, pubSubTopic, specificPeers...); peerID != nil { + if peerID := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.SpecificPeers...); peerID != nil { return *peerID, nil } // if not found in serviceSlots or proto == WakuRelayIDv200 - filteredPeers, err := utils.FilterPeersByProto(pm.host, specificPeers, proto) + filteredPeers, err := pm.FilterPeersByProto(criteria.SpecificPeers, criteria.Proto) if err != nil { return "", err } - if pubSubTopic != "" { - filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, filteredPeers...) + if criteria.PubsubTopic != "" { + filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, filteredPeers...) } - return utils.SelectRandomPeer(filteredPeers, pm.logger) + return selectRandomPeer(filteredPeers, pm.logger) } func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peerIDPtr *peer.ID) { @@ -473,7 +486,7 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, keys = append(keys, i) } selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...) - peerID, err := utils.SelectRandomPeer(selectedPeers, pm.logger) + peerID, err := selectRandomPeer(selectedPeers, pm.logger) if err == nil { peerIDPtr = &peerID } else { @@ -483,3 +496,138 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, } return } + +// PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers. +type PeerSelectionCriteria struct { + SelectionType PeerSelection + Proto protocol.ID + PubsubTopic string + SpecificPeers peer.IDSlice + Ctx context.Context +} + +// SelectPeer selects a peer based on selectionType specified. +// Context is required only in case of selectionType set to LowestRTT +func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) { + + switch criteria.SelectionType { + case Automatic: + return pm.SelectRandomPeer(criteria) + case LowestRTT: + if criteria.Ctx == nil { + criteria.Ctx = context.Background() + pm.logger.Warn("context is not passed for peerSelectionwithRTT, using background context") + } + return pm.SelectPeerWithLowestRTT(criteria) + default: + return "", errors.New("unknown peer selection type specified") + } +} + +type pingResult struct { + p peer.ID + rtt time.Duration +} + +// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore +// TO OPTIMIZE: As of now the peer with lowest RTT is identified when select is called, this should be optimized +// to maintain the RTT as part of peer-scoring and just select based on that. +func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (peer.ID, error) { + var peers peer.IDSlice + var err error + if criteria.Ctx == nil { + criteria.Ctx = context.Background() + } + + if criteria.PubsubTopic != "" { + peers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(criteria.PubsubTopic, criteria.SpecificPeers...) + } + + peers, err = pm.FilterPeersByProto(peers, criteria.Proto) + if err != nil { + return "", err + } + wg := sync.WaitGroup{} + waitCh := make(chan struct{}) + pingCh := make(chan pingResult, 1000) + + wg.Add(len(peers)) + + go func() { + for _, p := range peers { + go func(p peer.ID) { + defer wg.Done() + ctx, cancel := context.WithTimeout(criteria.Ctx, 3*time.Second) + defer cancel() + result := <-ping.Ping(ctx, pm.host, p) + if result.Error == nil { + pingCh <- pingResult{ + p: p, + rtt: result.RTT, + } + } else { + pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error)) + } + }(p) + } + wg.Wait() + close(waitCh) + close(pingCh) + }() + + select { + case <-waitCh: + var min *pingResult + for p := range pingCh { + if min == nil { + min = &p + } else { + if p.rtt < min.rtt { + min = &p + } + } + } + if min == nil { + return "", ErrNoPeersAvailable + } + + return min.p, nil + case <-criteria.Ctx.Done(): + return "", ErrNoPeersAvailable + } +} + +// selectRandomPeer selects randomly a peer from the list of peers passed. +func selectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { + if len(peers) >= 1 { + peerID := peers[rand.Intn(len(peers))] + // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned + return peerID, nil // nolint: gosec + } + + return "", ErrNoPeersAvailable +} + +// FilterPeersByProto filters list of peers that support specified protocols. +// If specificPeers is nil, all peers in the host's peerStore are considered for filtering. +func (pm *PeerManager) FilterPeersByProto(specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { + peerSet := specificPeers + if len(peerSet) == 0 { + peerSet = pm.host.Peerstore().Peers() + } + + var peers peer.IDSlice + for _, peer := range peerSet { + protocols, err := pm.host.Peerstore().SupportsProtocols(peer, proto...) + if err != nil { + return nil, err + } + + if len(protocols) > 0 { + peers = append(peers, peer) + } + } + return peers, nil +} diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index ce42ab43..385c6f0a 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "fmt" + "strings" "testing" "time" @@ -20,16 +21,22 @@ import ( func getAddr(h host.Host) multiaddr.Multiaddr { id, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", h.ID().Pretty())) - return h.Network().ListenAddresses()[0].Encapsulate(id) + var selectedAddr multiaddr.Multiaddr + //For now skipping circuit relay addresses as libp2p seems to be returning empty p2p-circuit addresses. + for _, addr := range h.Network().ListenAddresses() { + if strings.Contains(addr.String(), "p2p-circuit") { + continue + } + selectedAddr = addr + } + return selectedAddr.Encapsulate(id) } func initTest(t *testing.T) (context.Context, *PeerManager, func()) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() // hosts h1, err := tests.MakeHost(ctx, 0, rand.Reader) require.NoError(t, err) - defer h1.Close() // host 1 is used by peer manager pm := NewPeerManager(10, 20, utils.Logger()) @@ -66,7 +73,7 @@ func TestServiceSlots(t *testing.T) { /////////////// // select peer from pm, currently only h2 is set in pm - peerID, err := pm.SelectPeer(protocol, "") + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) require.Equal(t, peerID, h2.ID()) @@ -75,7 +82,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) // check that returned peer is h2 or h3 peer - peerID, err = pm.SelectPeer(protocol, "") + peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) if peerID == h2.ID() || peerID == h3.ID() { //Test success @@ -91,15 +98,15 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) defer h4.Close() - _, err = pm.SelectPeer(protocol1, "") - require.Error(t, err, utils.ErrNoPeersAvailable) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1}) + require.Error(t, err, ErrNoPeersAvailable) // add h4 peer for protocol1 _, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) require.NoError(t, err) //Test peer selection for protocol1 - peerID, err = pm.SelectPeer(protocol1, "") + peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1}) require.NoError(t, err) require.Equal(t, peerID, h4.ID()) @@ -127,19 +134,22 @@ func TestPeerSelection(t *testing.T) { _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol)) require.NoError(t, err) - _, err = pm.SelectPeer(protocol, "") + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) require.NoError(t, err) - peerID, err := pm.SelectPeer(protocol, "/waku/rs/2/2") + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/2"}) require.NoError(t, err) require.Equal(t, h2.ID(), peerID) - _, err = pm.SelectPeer(protocol, "/waku/rs/2/3") - require.Error(t, utils.ErrNoPeersAvailable, err) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/3"}) + require.Error(t, ErrNoPeersAvailable, err) - _, err = pm.SelectPeer(protocol, "/waku/rs/2/1") + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) require.NoError(t, err) + //Test for selectWithLowestRTT + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/rs/2/1"}) + require.NoError(t, err) } func TestDefaultProtocol(t *testing.T) { @@ -149,8 +159,8 @@ func TestDefaultProtocol(t *testing.T) { // check peer for default protocol /////////////// //Test empty peer selection for relay protocol - _, err := pm.SelectPeer(relay.WakuRelayID_v200, "") - require.Error(t, err, utils.ErrNoPeersAvailable) + _, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200}) + require.Error(t, err, ErrNoPeersAvailable) /////////////// // getting peer for default protocol @@ -164,7 +174,7 @@ func TestDefaultProtocol(t *testing.T) { require.NoError(t, err) // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. - peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, "") + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200}) require.NoError(t, err) require.Equal(t, peerID, h5.ID()) } @@ -184,13 +194,13 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2) require.NoError(t, err) - peerID, err := pm.SelectPeer(protocol2, "") + peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) require.NoError(t, err) require.Equal(t, peerID, h6.ID()) pm.RemovePeer(peerID) - _, err = pm.SelectPeer(protocol2, "") - require.Error(t, err, utils.ErrNoPeersAvailable) + _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) + require.Error(t, err, ErrNoPeersAvailable) } func TestConnectToRelayPeers(t *testing.T) { diff --git a/waku/v2/peermanager/service_slot.go b/waku/v2/peermanager/service_slot.go index c88711e7..df63b7ac 100644 --- a/waku/v2/peermanager/service_slot.go +++ b/waku/v2/peermanager/service_slot.go @@ -6,7 +6,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/utils" ) type peerMap struct { @@ -26,7 +25,7 @@ func (pm *peerMap) getRandom() (peer.ID, error) { for pID := range pm.m { return pID, nil } - return "", utils.ErrNoPeersAvailable + return "", ErrNoPeersAvailable } diff --git a/waku/v2/peermanager/service_slot_test.go b/waku/v2/peermanager/service_slot_test.go index 381a1425..493685a7 100644 --- a/waku/v2/peermanager/service_slot_test.go +++ b/waku/v2/peermanager/service_slot_test.go @@ -6,7 +6,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/v2/utils" ) func TestServiceSlot(t *testing.T) { @@ -27,7 +26,7 @@ func TestServiceSlot(t *testing.T) { slots.getPeers(protocol).remove(peerID) // _, err = slots.getPeers(protocol).getRandom() - require.Equal(t, err, utils.ErrNoPeersAvailable) + require.Equal(t, err, ErrNoPeersAvailable) } func TestServiceSlotRemovePeerFromAll(t *testing.T) { @@ -50,7 +49,7 @@ func TestServiceSlotRemovePeerFromAll(t *testing.T) { slots.removePeer(peerID) // _, err = slots.getPeers(protocol).getRandom() - require.Equal(t, err, utils.ErrNoPeersAvailable) + require.Equal(t, err, ErrNoPeersAvailable) _, err = slots.getPeers(protocol1).getRandom() - require.Equal(t, err, utils.ErrNoPeersAvailable) + require.Equal(t, err, ErrNoPeersAvailable) } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index f3930111..3b141836 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -269,11 +269,6 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot } } - if params.selectedPeer == "" { - wf.metrics.RecordError(peerNotFoundFailure) - return nil, ErrNoPeersAvailable - } - pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter) if err != nil { return nil, err @@ -281,16 +276,42 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot failedContentTopics := []string{} subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { + var selectedPeer peer.ID + //TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic + if params.pm != nil && params.selectedPeer == "" { + selectedPeer, err = wf.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: FilterSubscribeID_v20beta1, + PubsubTopic: pubSubTopic, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + } else { + selectedPeer = params.selectedPeer + } + + if selectedPeer == "" { + wf.metrics.RecordError(peerNotFoundFailure) + wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) + continue + } + var cFilter protocol.ContentFilter cFilter.PubsubTopic = pubSubTopic cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...) + err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) if err != nil { wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Error(err)) failedContentTopics = append(failedContentTopics, cTopics...) + continue } - subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter)) + subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(selectedPeer, cFilter)) } if len(failedContentTopics) > 0 { diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 991dde86..d73bee91 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -1,7 +1,6 @@ package filter import ( - "context" "sync" "time" @@ -9,15 +8,16 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) type ( FilterSubscribeParameters struct { - selectedPeer peer.ID - requestID []byte - log *zap.Logger + selectedPeer peer.ID + peerSelectionType peermanager.PeerSelection + preferredPeers peer.IDSlice + requestID []byte + log *zap.Logger // Subscribe-specific host host.Host @@ -56,19 +56,8 @@ func WithPeer(p peer.ID) FilterSubscribeOption { // supports the chosen protocol, otherwise it will chose a peer from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) error { - var p peer.ID - var err error - if params.pm == nil { - p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log) - } else { - p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, "", fromThesePeers...) - } - - if err != nil { - return err - } - - params.selectedPeer = p + params.peerSelectionType = peermanager.Automatic + params.preferredPeers = fromThesePeers return nil } } @@ -77,14 +66,9 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption // with the lowest ping If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a // peer from the node peerstore -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) error { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log) - if err != nil { - return err - } - - params.selectedPeer = p + params.peerSelectionType = peermanager.LowestRTT return nil } } diff --git a/waku/v2/protocol/filter/options_test.go b/waku/v2/protocol/filter/options_test.go index eeca00ee..d02bb2da 100644 --- a/waku/v2/protocol/filter/options_test.go +++ b/waku/v2/protocol/filter/options_test.go @@ -21,7 +21,7 @@ func TestFilterOption(t *testing.T) { options := []FilterSubscribeOption{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), - WithFastestPeerSelection(context.Background()), + WithFastestPeerSelection(), } params := new(FilterSubscribeParameters) diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go index e28a0729..f2079d40 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ b/waku/v2/protocol/legacy_filter/waku_filter.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -48,6 +49,7 @@ type ( WakuFilter struct { *protocol.CommonService h host.Host + pm *peermanager.PeerManager isFullNode bool msgSub relay.Subscription metrics Metrics @@ -237,7 +239,17 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil for _, opt := range optList { opt(params) } - + if wf.pm != nil && params.selectedPeer == "" { + params.selectedPeer, _ = wf.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: FilterID_v20beta1, + PubsubTopic: filter.Topic, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + } if params.selectedPeer == "" { wf.metrics.RecordError(peerNotFoundFailure) return nil, ErrNoPeersAvailable diff --git a/waku/v2/protocol/legacy_filter/waku_filter_option.go b/waku/v2/protocol/legacy_filter/waku_filter_option.go index 8b548b64..4b103a36 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_option.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_option.go @@ -1,26 +1,28 @@ package legacy_filter import ( - "context" "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/waku-org/go-waku/waku/v2/utils" + "github.com/waku-org/go-waku/waku/v2/peermanager" "go.uber.org/zap" ) type ( FilterSubscribeParameters struct { - host host.Host - selectedPeer peer.ID - log *zap.Logger + host host.Host + selectedPeer peer.ID + peerSelectionType peermanager.PeerSelection + preferredPeers peer.IDSlice + log *zap.Logger } FilterSubscribeOption func(*FilterSubscribeParameters) FilterParameters struct { Timeout time.Duration + pm *peermanager.PeerManager } Option func(*FilterParameters) @@ -32,6 +34,12 @@ func WithTimeout(timeout time.Duration) Option { } } +func WithPeerManager(pm *peermanager.PeerManager) Option { + return func(params *FilterParameters) { + params.pm = pm + } +} + func WithPeer(p peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { params.selectedPeer = p @@ -43,12 +51,8 @@ func WithPeer(p peer.ID) FilterSubscribeOption { // supports the chosen protocol, otherwise it will chose a peer from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeer(params.host, FilterID_v20beta1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.Automatic + params.preferredPeers = fromThesePeers } } @@ -56,14 +60,9 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption // with the lowest ping If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a // peer from the node peerstore -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, FilterID_v20beta1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.LowestRTT } } diff --git a/waku/v2/protocol/legacy_filter/waku_filter_option_test.go b/waku/v2/protocol/legacy_filter/waku_filter_option_test.go index ab732bbb..bf0e0ffc 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_option_test.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_option_test.go @@ -20,7 +20,7 @@ func TestFilterOption(t *testing.T) { options := []FilterSubscribeOption{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), - WithFastestPeerSelection(context.Background()), + WithFastestPeerSelection(), } params := new(FilterSubscribeParameters) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 8570ce81..2de94bc2 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -150,11 +150,6 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p return nil, errors.New("lightpush params are mandatory") } - if params.selectedPeer == "" { - wakuLP.metrics.RecordError(peerNotFoundFailure) - return nil, ErrNoPeersAvailable - } - if len(params.requestID) == 0 { return nil, ErrInvalidID } @@ -210,16 +205,12 @@ func (wakuLP *WakuLightPush) Stop() { wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) } -// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol -// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding. -func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { - if message == nil { - return nil, errors.New("message can't be null") - } +func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMessage, opts ...Option) (*lightPushParameters, error) { params := new(lightPushParameters) params.host = wakuLP.h params.log = wakuLP.log params.pm = wakuLP.pm + var err error optList := append(DefaultOptions(wakuLP.h), opts...) for _, opt := range optList { @@ -227,12 +218,44 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa } if params.pubsubTopic == "" { - var err error params.pubsubTopic, err = protocol.GetPubSubTopicFromContentTopic(message.ContentTopic) if err != nil { return nil, err } } + + if params.pm != nil && params.selectedPeer == "" { + params.selectedPeer, err = wakuLP.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: LightPushID_v20beta1, + PubsubTopic: params.pubsubTopic, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + } + if params.selectedPeer == "" { + if err != nil { + params.log.Error("selecting peer", zap.Error(err)) + wakuLP.metrics.RecordError(peerNotFoundFailure) + return nil, ErrNoPeersAvailable + } + } + return params, nil +} + +// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol +// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding. +func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { + if message == nil { + return nil, errors.New("message can't be null") + } + + params, err := wakuLP.handleOpts(ctx, message, opts...) + if err != nil { + return nil, err + } req := new(pb.PushRequest) req.Message = message req.PubsubTopic = params.pubsubTopic diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index f0d496d6..55dba60a 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -1,23 +1,22 @@ package lightpush import ( - "context" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) type lightPushParameters struct { - host host.Host - selectedPeer peer.ID - requestID []byte - pm *peermanager.PeerManager - log *zap.Logger - pubsubTopic string + host host.Host + selectedPeer peer.ID + peerSelectionType peermanager.PeerSelection + preferredPeers peer.IDSlice + requestID []byte + pm *peermanager.PeerManager + log *zap.Logger + pubsubTopic string } // Option is the type of options accepted when performing LightPush protocol requests @@ -36,18 +35,8 @@ func WithPeer(p peer.ID) Option { // from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { return func(params *lightPushParameters) { - var p peer.ID - var err error - if params.pm == nil { - p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log) - } else { - p, err = params.pm.SelectPeer(LightPushID_v20beta1, "", fromThesePeers...) - } - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.Automatic + params.preferredPeers = fromThesePeers } } @@ -61,14 +50,9 @@ func WithPubSubTopic(pubsubTopic string) Option { // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Option { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option { return func(params *lightPushParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, LightPushID_v20beta1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.LowestRTT } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go index 6fa21677..3bb72654 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go @@ -20,7 +20,7 @@ func TestLightPushOption(t *testing.T) { options := []Option{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), - WithFastestPeerSelection(context.Background()), + WithFastestPeerSelection(), WithRequestID([]byte("requestID")), WithAutomaticRequestID(), } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 5a919194..e0bda40f 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -106,6 +106,7 @@ func TestWakuLightPush(t *testing.T) { var lpOptions []Option lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) + lpOptions = append(lpOptions, WithPeer(host2.ID())) // Checking that msg hash is correct hash, err := client.PublishToTopic(ctx, msg2, lpOptions...) @@ -215,9 +216,10 @@ func TestWakuLightPushAutoSharding(t *testing.T) { <-sub2.Ch }() - + var lpOptions []Option + lpOptions = append(lpOptions, WithPeer(host2.ID())) // Verifying successful request - hash1, err := client.Publish(ctx, msg1) + hash1, err := client.Publish(ctx, msg1, lpOptions...) require.NoError(t, err) require.Equal(t, protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), string(pubSubTopic)).Hash(), hash1) diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 1466b845..65a7305f 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -28,7 +28,20 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts for _, opt := range optList { opt(params) } - + if params.pm != nil && params.selectedPeer == "" { + var err error + params.selectedPeer, err = wakuPX.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: PeerExchangeID_v20alpha1, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + if err != nil { + return err + } + } if params.selectedPeer == "" { wakuPX.metrics.RecordError(dialFailure) return ErrNoPeersAvailable diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index 704e0d21..57cf2455 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -1,20 +1,19 @@ package peer_exchange import ( - "context" - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/peermanager" - "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) type PeerExchangeParameters struct { - host host.Host - selectedPeer peer.ID - pm *peermanager.PeerManager - log *zap.Logger + host host.Host + selectedPeer peer.ID + peerSelectionType peermanager.PeerSelection + preferredPeers peer.IDSlice + pm *peermanager.PeerManager + log *zap.Logger } type PeerExchangeOption func(*PeerExchangeParameters) @@ -26,24 +25,15 @@ func WithPeer(p peer.ID) PeerExchangeOption { } } -// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store +// WithAutomaticPeerSelection is an option used to randomly select a peer from the Waku peer store // to obtains peers from. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore +// Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { return func(params *PeerExchangeParameters) { - var p peer.ID - var err error - if params.pm == nil { - p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log) - } else { - p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, "", fromThesePeers...) - } - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.Automatic + params.preferredPeers = fromThesePeers } } @@ -51,14 +41,10 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) PeerExchangeOption { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { return func(params *PeerExchangeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.LowestRTT + params.preferredPeers = fromThesePeers } } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 93e4f0a1..ca64b55f 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -163,7 +163,7 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { err = host3.Peerstore().AddProtocols(host1.ID(), PeerExchangeID_v20alpha1) require.NoError(t, err) - err = px3.Request(context.Background(), 1) + err = px3.Request(context.Background(), 1, WithPeer(host1.ID())) require.NoError(t, err) time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 3855ff6a..68a0b390 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -11,10 +11,10 @@ import ( "go.uber.org/zap" "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" - "github.com/waku-org/go-waku/waku/v2/utils" ) type Query struct { @@ -81,12 +81,14 @@ func (r *Result) GetMessages() []*wpb.WakuMessage { type criteriaFN = func(msg *wpb.WakuMessage) (bool, error) type HistoryRequestParameters struct { - selectedPeer peer.ID - localQuery bool - requestID []byte - cursor *pb.Index - pageSize uint64 - asc bool + selectedPeer peer.ID + peerSelectionType peermanager.PeerSelection + preferredPeers peer.IDSlice + localQuery bool + requestID []byte + cursor *pb.Index + pageSize uint64 + asc bool s *WakuStore } @@ -104,20 +106,11 @@ func WithPeer(p peer.ID) HistoryRequestOption { // to request the message history. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore +// Note: This option is avaiable only with peerManager func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { return func(params *HistoryRequestParameters) { - var p peer.ID - var err error - if params.s.pm == nil { - p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log) - } else { - p, err = params.s.pm.SelectPeer(StoreID_v20beta4, "", fromThesePeers...) - } - if err == nil { - params.selectedPeer = p - } else { - params.s.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.Automatic + params.preferredPeers = fromThesePeers } } @@ -125,14 +118,10 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore -func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) HistoryRequestOption { +// Note: This option is avaiable only with peerManager +func WithFastestPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log) - if err == nil { - params.selectedPeer = p - } else { - params.s.log.Info("selecting peer", zap.Error(err)) - } + params.peerSelectionType = peermanager.LowestRTT } } @@ -275,6 +264,21 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR for _, opt := range optList { opt(params) } + if store.pm != nil && params.selectedPeer == "" { + var err error + params.selectedPeer, err = store.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: StoreID_v20beta4, + PubsubTopic: query.Topic, + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + if err != nil { + return nil, err + } + } if !params.localQuery && params.selectedPeer == "" { store.metrics.RecordError(peerNotFoundFailure) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index f3490cfb..690861c9 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -60,7 +60,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { ContentTopics: []string{topic1}, } - response, err := s2.Query(ctx, q, DefaultOptions()...) + response, err := s2.Query(ctx, q, WithPeer(host1.ID())) require.NoError(t, err) require.Len(t, response.Messages, 1) @@ -155,7 +155,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { ContentTopics: []string{topic1}, } - response, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) + response, err := s2.Query(ctx, q, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.Len(t, response.Messages, 2) require.Equal(t, response.Messages[0].Timestamp, msg1.Timestamp) @@ -230,7 +230,7 @@ func TestWakuStoreResult(t *testing.T) { ContentTopics: []string{topic1}, } - result, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) + result, err := s2.Query(ctx, q, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.False(t, result.started) require.Len(t, result.GetMessages(), 0) @@ -332,7 +332,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { return msg.ContentTopic == "hello", nil } - foundMsg, err := s2.Find(ctx, q, fn, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) + foundMsg, err := s2.Find(ctx, q, fn, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.NotNil(t, foundMsg) require.Equal(t, "hello", foundMsg.ContentTopic) @@ -341,7 +341,7 @@ func TestWakuStoreProtocolFind(t *testing.T) { return msg.ContentTopic == "bye", nil } - foundMsg, err = s2.Find(ctx, q, fn2, WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithPaging(true, 2)) + foundMsg, err = s2.Find(ctx, q, fn2, WithPeer(host1.ID()), WithAutomaticRequestID(), WithPaging(true, 2)) require.NoError(t, err) require.Nil(t, foundMsg) } diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index e2173b30..8321dc3e 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -1,24 +1,10 @@ package utils import ( - "context" - "errors" - "math/rand" - "sync" - "time" - - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/multiformats/go-multiaddr" - "go.uber.org/zap" ) -// ErrNoPeersAvailable is emitted when no suitable peers are found for -// some protocol -var ErrNoPeersAvailable = errors.New("no suitable peers found") - // GetPeerID is used to extract the peerID from a multiaddress func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { peerIDStr, err := m.ValueForProtocol(multiaddr.P_P2P) @@ -33,131 +19,3 @@ func GetPeerID(m multiaddr.Multiaddr) (peer.ID, error) { return peerID, nil } - -// FilterPeersByProto filters list of peers that support specified protocols. -// If specificPeers is nil, all peers in the host's peerStore are considered for filtering. -func FilterPeersByProto(host host.Host, specificPeers peer.IDSlice, proto ...protocol.ID) (peer.IDSlice, error) { - peerSet := specificPeers - if len(peerSet) == 0 { - peerSet = host.Peerstore().Peers() - } - - var peers peer.IDSlice - for _, peer := range peerSet { - protocols, err := host.Peerstore().SupportsProtocols(peer, proto...) - if err != nil { - return nil, err - } - - if len(protocols) > 0 { - peers = append(peers, peer) - } - } - return peers, nil -} - -// SelectRandomPeer selects randomly a peer from the list of peers passed. -func SelectRandomPeer(peers peer.IDSlice, log *zap.Logger) (peer.ID, error) { - if len(peers) >= 1 { - peerID := peers[rand.Intn(len(peers))] - // TODO: proper heuristic here that compares peer scores and selects "best" one. For now a random peer for the given protocol is returned - return peerID, nil // nolint: gosec - } - - return "", ErrNoPeersAvailable -} - -// SelectPeer is used to return a random peer that supports a given protocol. -// Note: Use this method only if WakuNode is not being initialized, otherwise use peermanager.SelectPeer. -// If a list of specific peers is passed, the peer will be chosen from that list assuming -// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore -func SelectPeer(host host.Host, protocolID protocol.ID, specificPeers []peer.ID, log *zap.Logger) (peer.ID, error) { - // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. - // Ideally depending on the query and our set of peers we take a subset of ideal peers. - // This will require us to check for various factors such as: - // - which topics they track - // - latency? - // - default store peer? - - peers, err := FilterPeersByProto(host, specificPeers, protocolID) - if err != nil { - return "", err - } - - return SelectRandomPeer(peers, log) -} - -type pingResult struct { - p peer.ID - rtt time.Duration -} - -// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time -// If a list of specific peers is passed, the peer will be chosen from that list assuming -// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore -func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolID protocol.ID, specificPeers []peer.ID, _ *zap.Logger) (peer.ID, error) { - var peers peer.IDSlice - - peerSet := specificPeers - if len(peerSet) == 0 { - peerSet = host.Peerstore().Peers() - } - - for _, peer := range peerSet { - protocols, err := host.Peerstore().SupportsProtocols(peer, protocolID) - if err != nil { - return "", err - } - - if len(protocols) > 0 { - peers = append(peers, peer) - } - } - - wg := sync.WaitGroup{} - waitCh := make(chan struct{}) - pingCh := make(chan pingResult, 1000) - - wg.Add(len(peers)) - - go func() { - for _, p := range peers { - go func(p peer.ID) { - defer wg.Done() - ctx, cancel := context.WithTimeout(ctx, 3*time.Second) - defer cancel() - result := <-ping.Ping(ctx, host, p) - if result.Error == nil { - pingCh <- pingResult{ - p: p, - rtt: result.RTT, - } - } - }(p) - } - wg.Wait() - close(waitCh) - close(pingCh) - }() - - select { - case <-waitCh: - var min *pingResult - for p := range pingCh { - if min == nil { - min = &p - } else { - if p.rtt < min.rtt { - min = &p - } - } - } - if min == nil { - return "", ErrNoPeersAvailable - } - - return min.p, nil - case <-ctx.Done(): - return "", ErrNoPeersAvailable - } -} diff --git a/waku/v2/utils/test/peer_test.go b/waku/v2/utils/test/peer_test.go deleted file mode 100644 index 44463c7d..00000000 --- a/waku/v2/utils/test/peer_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package tests - -import ( - "context" - "crypto/rand" - "testing" - "time" - - "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/libp2p/go-libp2p/core/protocol" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func TestSelectPeer(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - h1, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h1.Close() - - h2, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h2.Close() - - h3, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h3.Close() - - proto := protocol.ID("test/protocol") - - h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - h1.Peerstore().AddAddrs(h3.ID(), h3.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - - // No peers with selected protocol - _, err = utils.SelectPeer(h1, proto, nil, utils.Logger()) - require.Error(t, utils.ErrNoPeersAvailable, err) - - // Peers with selected protocol - _ = h1.Peerstore().AddProtocols(h2.ID(), proto) - _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - - _, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger()) - require.NoError(t, err) - -} - -func TestSelectPeerWithLowestRTT(t *testing.T) { - // help-wanted: how to slowdown the ping response to properly test the rtt - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - h1, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h1.Close() - - h2, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h2.Close() - - h3, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - defer h3.Close() - - proto := protocol.ID("test/protocol") - - h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) - - // No peers with selected protocol - _, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger()) - require.Error(t, utils.ErrNoPeersAvailable, err) - - // Peers with selected protocol - _ = h1.Peerstore().AddProtocols(h2.ID(), proto) - _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - - _, err = utils.SelectPeerWithLowestRTT(ctx, h1, proto, nil, utils.Logger()) - require.NoError(t, err) -}