diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 36148a7f..c9ddcf42 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -521,7 +521,7 @@ func (w *WakuNode) startStore() { case <-w.quit: return case <-ticker.C: - _, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta4), w.log) + _, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta4), nil, w.log) if err == nil { break peerVerif } diff --git a/waku/v2/protocol/filter/waku_filter_option.go b/waku/v2/protocol/filter/waku_filter_option.go index 8c1acd68..b0b7c37f 100644 --- a/waku/v2/protocol/filter/waku_filter_option.go +++ b/waku/v2/protocol/filter/waku_filter_option.go @@ -38,9 +38,12 @@ func WithPeer(p peer.ID) FilterSubscribeOption { } } -func WithAutomaticPeerSelection() FilterSubscribeOption { +// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store. +// If a list of specific peers is passed, the peer will be chosen from that list assuming it +// supports the chosen protocol, otherwise it will chose a peer from the node peerstore +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log) + p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), fromThesePeers, params.log) if err == nil { params.selectedPeer = *p } else { @@ -49,9 +52,13 @@ func WithAutomaticPeerSelection() FilterSubscribeOption { } } -func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { +// WithFastestPeerSelection is an option used to select a peer from the peer store +// with the lowest ping If a list of specific peers is passed, the peer will be chosen +// from that list assuming it supports the chosen protocol, otherwise it will chose a +// peer from the node peerstore +func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), fromThesePeers, params.log) if err == nil { params.selectedPeer = *p } else { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 19ad4e47..093dac04 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -27,10 +27,12 @@ func WithPeer(p peer.ID) LightPushOption { } // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store -// to push a waku message to -func WithAutomaticPeerSelection() LightPushOption { +// to push a waku message to. If a list of specific peers is passed, the peer will be chosen +// from that list assuming it supports the chosen protocol, otherwise it will chose a peer +// from the node peerstore +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) LightPushOption { return func(params *LightPushParameters) { - p, err := utils.SelectPeer(params.host, string(LightPushID_v20beta1), params.log) + p, err := utils.SelectPeer(params.host, string(LightPushID_v20beta1), fromThesePeers, params.log) if err == nil { params.selectedPeer = *p } else { @@ -40,10 +42,12 @@ func WithAutomaticPeerSelection() LightPushOption { } // WithFastestPeerSelection is an option used to select a peer from the peer store -// with the lowest ping -func WithFastestPeerSelection(ctx context.Context) LightPushOption { +// with the lowest ping. If a list of specific peers is passed, the peer will be chosen +// from that list assuming it supports the chosen protocol, otherwise it will chose a peer +// from the node peerstore +func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) LightPushOption { return func(params *LightPushParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), fromThesePeers, params.log) if err == nil { params.selectedPeer = *p } else { diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index db1c05fd..0b1f738b 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -25,10 +25,12 @@ func WithPeer(p peer.ID) PeerExchangeOption { } // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store -// to push a waku message to -func WithAutomaticPeerSelection() PeerExchangeOption { +// to obtains peers from. If a list of specific peers is passed, the peer will be chosen +// from that list assuming it supports the chosen protocol, otherwise it will chose a peer +// from the node peerstore +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { return func(params *PeerExchangeParameters) { - p, err := utils.SelectPeer(params.host, string(PeerExchangeID_v20alpha1), params.log) + p, err := utils.SelectPeer(params.host, string(PeerExchangeID_v20alpha1), fromThesePeers, params.log) if err == nil { params.selectedPeer = *p } else { @@ -38,10 +40,12 @@ func WithAutomaticPeerSelection() PeerExchangeOption { } // WithFastestPeerSelection is an option used to select a peer from the peer store -// with the lowest ping -func WithFastestPeerSelection(ctx context.Context) PeerExchangeOption { +// with the lowest ping. If a list of specific peers is passed, the peer will be chosen +// from that list assuming it supports the chosen protocol, otherwise it will chose a peer +// from the node peerstore +func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) PeerExchangeOption { return func(params *PeerExchangeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(PeerExchangeID_v20alpha1), params.log) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(PeerExchangeID_v20alpha1), fromThesePeers, params.log) if err == nil { params.selectedPeer = *p } else { diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 4981381a..9c46ceae 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -324,10 +324,12 @@ func WithPeer(p peer.ID) HistoryRequestOption { } // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store -// to request the message history -func WithAutomaticPeerSelection() HistoryRequestOption { +// to request the message history. If a list of specific peers is passed, the peer will be chosen +// from that list assuming it supports the chosen protocol, otherwise it will chose a peer +// from the node peerstore +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log) + p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log) if err == nil { params.selectedPeer = *p } else { @@ -336,9 +338,13 @@ func WithAutomaticPeerSelection() HistoryRequestOption { } } -func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption { +// WithFastestPeerSelection is an option used to select a peer from the peer store +// with the lowest ping. If a list of specific peers is passed, the peer will be chosen +// from that list assuming it supports the chosen protocol, otherwise it will chose a peer +// from the node peerstore +func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log) if err == nil { params.selectedPeer = *p } else { @@ -680,7 +686,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList } if len(peerList) == 0 { - p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log) + p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), nil, store.log) if err != nil { store.log.Info("selecting peer", zap.Error(err)) return -1, ErrNoPeersAvailable diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 94de2f21..20f87fb3 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -19,15 +19,23 @@ import ( var ErrNoPeersAvailable = errors.New("no suitable peers found") // SelectPeer is used to return a random peer that supports a given protocol. -func SelectPeer(host host.Host, protocolId string, log *zap.Logger) (*peer.ID, error) { +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore +func SelectPeer(host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (*peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // Ideally depending on the query and our set of peers we take a subset of ideal peers. // This will require us to check for various factors such as: // - which topics they track // - latency? // - default store peer? + + peerSet := specificPeers + if len(peerSet) == 0 { + peerSet = host.Peerstore().Peers() + } + var peers peer.IDSlice - for _, peer := range host.Peerstore().Peers() { + for _, peer := range peerSet { protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) if err != nil { log.Error("obtaining protocols supported by peers", zap.Error(err), logging.HostID("peer", peer)) @@ -53,9 +61,17 @@ type pingResult struct { } // SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time -func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, log *zap.Logger) (*peer.ID, error) { +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore +func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (*peer.ID, error) { var peers peer.IDSlice - for _, peer := range host.Peerstore().Peers() { + + peerSet := specificPeers + if len(peerSet) == 0 { + peerSet = host.Peerstore().Peers() + } + + for _, peer := range peerSet { protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) if err != nil { log.Error("error obtaining the protocols supported by peers", zap.Error(err)) diff --git a/waku/v2/utils/peer_test.go b/waku/v2/utils/peer_test.go index 486075de..587fd483 100644 --- a/waku/v2/utils/peer_test.go +++ b/waku/v2/utils/peer_test.go @@ -33,14 +33,14 @@ func TestSelectPeer(t *testing.T) { h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) // No peers with selected protocol - _, err = SelectPeer(h1, proto, Logger()) + _, err = SelectPeer(h1, proto, nil, Logger()) require.Error(t, ErrNoPeersAvailable, err) // Peers with selected protocol _ = h1.Peerstore().AddProtocols(h2.ID(), proto) _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - _, err = SelectPeerWithLowestRTT(ctx, h1, proto, Logger()) + _, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger()) require.NoError(t, err) } @@ -69,13 +69,13 @@ func TestSelectPeerWithLowestRTT(t *testing.T) { h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) // No peers with selected protocol - _, err = SelectPeerWithLowestRTT(ctx, h1, proto, Logger()) + _, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger()) require.Error(t, ErrNoPeersAvailable, err) // Peers with selected protocol _ = h1.Peerstore().AddProtocols(h2.ID(), proto) _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - _, err = SelectPeerWithLowestRTT(ctx, h1, proto, Logger()) + _, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger()) require.NoError(t, err) }