feat: allow specifying list of peerIDs to chose when doing a request

This commit is contained in:
Richard Ramos 2022-11-24 17:50:43 -04:00 committed by RichΛrd
parent 04a2a3fbae
commit 3620a6b222
7 changed files with 68 additions and 31 deletions

View File

@ -521,7 +521,7 @@ func (w *WakuNode) startStore() {
case <-w.quit: case <-w.quit:
return return
case <-ticker.C: case <-ticker.C:
_, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta4), w.log) _, err := utils.SelectPeer(w.host, string(store.StoreID_v20beta4), nil, w.log)
if err == nil { if err == nil {
break peerVerif break peerVerif
} }

View File

@ -38,9 +38,12 @@ func WithPeer(p peer.ID) FilterSubscribeOption {
} }
} }
func WithAutomaticPeerSelection() FilterSubscribeOption { // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store.
// If a list of specific peers is passed, the peer will be chosen from that list assuming it
// supports the chosen protocol, otherwise it will chose a peer from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) { return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), params.log) p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1), fromThesePeers, params.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
@ -49,9 +52,13 @@ func WithAutomaticPeerSelection() FilterSubscribeOption {
} }
} }
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { // WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a
// peer from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) { return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), params.log) p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1), fromThesePeers, params.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {

View File

@ -27,10 +27,12 @@ func WithPeer(p peer.ID) LightPushOption {
} }
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// to push a waku message to // to push a waku message to. If a list of specific peers is passed, the peer will be chosen
func WithAutomaticPeerSelection() LightPushOption { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) LightPushOption {
return func(params *LightPushParameters) { return func(params *LightPushParameters) {
p, err := utils.SelectPeer(params.host, string(LightPushID_v20beta1), params.log) p, err := utils.SelectPeer(params.host, string(LightPushID_v20beta1), fromThesePeers, params.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
@ -40,10 +42,12 @@ func WithAutomaticPeerSelection() LightPushOption {
} }
// WithFastestPeerSelection is an option used to select a peer from the peer store // WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping // with the lowest ping. If a list of specific peers is passed, the peer will be chosen
func WithFastestPeerSelection(ctx context.Context) LightPushOption { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) LightPushOption {
return func(params *LightPushParameters) { return func(params *LightPushParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), params.log) p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1), fromThesePeers, params.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {

View File

@ -25,10 +25,12 @@ func WithPeer(p peer.ID) PeerExchangeOption {
} }
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// to push a waku message to // to obtains peers from. If a list of specific peers is passed, the peer will be chosen
func WithAutomaticPeerSelection() PeerExchangeOption { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) { return func(params *PeerExchangeParameters) {
p, err := utils.SelectPeer(params.host, string(PeerExchangeID_v20alpha1), params.log) p, err := utils.SelectPeer(params.host, string(PeerExchangeID_v20alpha1), fromThesePeers, params.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
@ -38,10 +40,12 @@ func WithAutomaticPeerSelection() PeerExchangeOption {
} }
// WithFastestPeerSelection is an option used to select a peer from the peer store // WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping // with the lowest ping. If a list of specific peers is passed, the peer will be chosen
func WithFastestPeerSelection(ctx context.Context) PeerExchangeOption { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) { return func(params *PeerExchangeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(PeerExchangeID_v20alpha1), params.log) p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(PeerExchangeID_v20alpha1), fromThesePeers, params.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {

View File

@ -324,10 +324,12 @@ func WithPeer(p peer.ID) HistoryRequestOption {
} }
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store
// to request the message history // to request the message history. If a list of specific peers is passed, the peer will be chosen
func WithAutomaticPeerSelection() HistoryRequestOption { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), params.s.log) p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
@ -336,9 +338,13 @@ func WithAutomaticPeerSelection() HistoryRequestOption {
} }
} }
func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption { // WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), params.s.log) p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta4), fromThesePeers, params.s.log)
if err == nil { if err == nil {
params.selectedPeer = *p params.selectedPeer = *p
} else { } else {
@ -680,7 +686,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
} }
if len(peerList) == 0 { if len(peerList) == 0 {
p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), store.log) p, err := utils.SelectPeer(store.h, string(StoreID_v20beta4), nil, store.log)
if err != nil { if err != nil {
store.log.Info("selecting peer", zap.Error(err)) store.log.Info("selecting peer", zap.Error(err))
return -1, ErrNoPeersAvailable return -1, ErrNoPeersAvailable

View File

@ -19,15 +19,23 @@ import (
var ErrNoPeersAvailable = errors.New("no suitable peers found") var ErrNoPeersAvailable = errors.New("no suitable peers found")
// SelectPeer is used to return a random peer that supports a given protocol. // SelectPeer is used to return a random peer that supports a given protocol.
func SelectPeer(host host.Host, protocolId string, log *zap.Logger) (*peer.ID, error) { // If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
func SelectPeer(host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (*peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers. // Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as: // This will require us to check for various factors such as:
// - which topics they track // - which topics they track
// - latency? // - latency?
// - default store peer? // - default store peer?
peerSet := specificPeers
if len(peerSet) == 0 {
peerSet = host.Peerstore().Peers()
}
var peers peer.IDSlice var peers peer.IDSlice
for _, peer := range host.Peerstore().Peers() { for _, peer := range peerSet {
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
if err != nil { if err != nil {
log.Error("obtaining protocols supported by peers", zap.Error(err), logging.HostID("peer", peer)) log.Error("obtaining protocols supported by peers", zap.Error(err), logging.HostID("peer", peer))
@ -53,9 +61,17 @@ type pingResult struct {
} }
// SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time // SelectPeerWithLowestRTT will select a peer that supports a specific protocol with the lowest reply time
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, log *zap.Logger) (*peer.ID, error) { // If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the node peerstore
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string, specificPeers []peer.ID, log *zap.Logger) (*peer.ID, error) {
var peers peer.IDSlice var peers peer.IDSlice
for _, peer := range host.Peerstore().Peers() {
peerSet := specificPeers
if len(peerSet) == 0 {
peerSet = host.Peerstore().Peers()
}
for _, peer := range peerSet {
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
if err != nil { if err != nil {
log.Error("error obtaining the protocols supported by peers", zap.Error(err)) log.Error("error obtaining the protocols supported by peers", zap.Error(err))

View File

@ -33,14 +33,14 @@ func TestSelectPeer(t *testing.T) {
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
// No peers with selected protocol // No peers with selected protocol
_, err = SelectPeer(h1, proto, Logger()) _, err = SelectPeer(h1, proto, nil, Logger())
require.Error(t, ErrNoPeersAvailable, err) require.Error(t, ErrNoPeersAvailable, err)
// Peers with selected protocol // Peers with selected protocol
_ = h1.Peerstore().AddProtocols(h2.ID(), proto) _ = h1.Peerstore().AddProtocols(h2.ID(), proto)
_ = h1.Peerstore().AddProtocols(h3.ID(), proto) _ = h1.Peerstore().AddProtocols(h3.ID(), proto)
_, err = SelectPeerWithLowestRTT(ctx, h1, proto, Logger()) _, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger())
require.NoError(t, err) require.NoError(t, err)
} }
@ -69,13 +69,13 @@ func TestSelectPeerWithLowestRTT(t *testing.T) {
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
// No peers with selected protocol // No peers with selected protocol
_, err = SelectPeerWithLowestRTT(ctx, h1, proto, Logger()) _, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger())
require.Error(t, ErrNoPeersAvailable, err) require.Error(t, ErrNoPeersAvailable, err)
// Peers with selected protocol // Peers with selected protocol
_ = h1.Peerstore().AddProtocols(h2.ID(), proto) _ = h1.Peerstore().AddProtocols(h2.ID(), proto)
_ = h1.Peerstore().AddProtocols(h3.ID(), proto) _ = h1.Peerstore().AddProtocols(h3.ID(), proto)
_, err = SelectPeerWithLowestRTT(ctx, h1, proto, Logger()) _, err = SelectPeerWithLowestRTT(ctx, h1, proto, nil, Logger())
require.NoError(t, err) require.NoError(t, err)
} }