diff --git a/waku/v2/node/keepalive_test.go b/waku/v2/node/keepalive_test.go index d90b12de..8d72e193 100644 --- a/waku/v2/node/keepalive_test.go +++ b/waku/v2/node/keepalive_test.go @@ -7,7 +7,6 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/peerstore" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/stretchr/testify/require" ) @@ -25,13 +24,11 @@ func TestKeepAlive(t *testing.T) { err = host1.Connect(ctx, host1.Peerstore().PeerInfo(host2.ID())) require.NoError(t, err) - ping := ping.NewPingService(host1) - require.Len(t, host1.Network().Peers(), 1) ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Second) defer cancel2() - pingPeer(ctx2, ping, host2.ID()) + pingPeer(ctx2, host1, host2.ID()) require.NoError(t, ctx.Err()) } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 8d64967c..32b3de51 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -40,7 +40,6 @@ type WakuNode struct { filter *filter.WakuFilter lightPush *lightpush.WakuLightPush rendezvous *rendezvous.RendezvousService - ping *ping.PingService store *store.WakuStore bcaster v2.Broadcaster @@ -402,7 +401,6 @@ func (w *WakuNode) Peers() PeerStats { func (w *WakuNode) startKeepAlive(t time.Duration) { log.Info("Setting up ping protocol with duration of ", t) - w.ping = ping.NewPingService(w.host) ticker := time.NewTicker(t) go func() { @@ -416,7 +414,9 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { // which is not possible when iterating // through Network's peer collection, as it will be empty for _, p := range w.host.Peerstore().Peers() { - go pingPeer(w.ctx, w.ping, p) + if p != w.host.ID() { + go pingPeer(w.ctx, w.host, p) + } } case <-w.quit: ticker.Stop() @@ -426,12 +426,12 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { }() } -func pingPeer(ctx context.Context, pingService *ping.PingService, peer peer.ID) { +func pingPeer(ctx context.Context, host host.Host, peer peer.ID) { ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() log.Debug("Pinging ", peer) - pr := pingService.Ping(ctx, peer) + pr := ping.Ping(ctx, host, peer) select { case res := <-pr: if res.Error != nil { diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 303a0aeb..161a9782 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -85,6 +85,17 @@ func WithAutomaticPeerSelection() FilterSubscribeOption { } } +func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + func DefaultOptions() []FilterSubscribeOption { return []FilterSubscribeOption{ WithAutomaticPeerSelection(), diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index df90caa6..053364b4 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -122,6 +122,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) { params := new(LightPushParameters) + params.host = wakuLP.h optList := DefaultOptions(wakuLP.h) optList = append(optList, opts...) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 13758d56..d880f597 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -1,6 +1,8 @@ package lightpush import ( + "context" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/status-im/go-waku/waku/v2/protocol" @@ -8,6 +10,7 @@ import ( ) type LightPushParameters struct { + host host.Host selectedPeer peer.ID requestId []byte } @@ -31,6 +34,17 @@ func WithAutomaticPeerSelection(host host.Host) LightPushOption { } } +func WithFastestPeerSelection(ctx context.Context) LightPushOption { + return func(params *LightPushParameters) { + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + func WithRequestId(requestId []byte) LightPushOption { return func(params *LightPushParameters) { params.requestId = requestId diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 5bccc0c9..90ec1a5a 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -442,6 +442,17 @@ func WithAutomaticPeerSelection() HistoryRequestOption { } } +func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption { + return func(params *HistoryRequestParameters) { + p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta3)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + func WithRequestId(requestId []byte) HistoryRequestOption { return func(params *HistoryRequestParameters) { params.requestId = requestId diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index 3cc65c8f..862363a4 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -1,16 +1,23 @@ package utils import ( + "context" "errors" "math/rand" + "sync" + "time" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) var log = logging.Logger("utils") +var ErrNoPeersAvailable = errors.New("no suitable peers found") +var PingServiceNotAvailable = errors.New("ping service not available") + // SelectPeer is used to return a random peer that supports a given protocol. func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. @@ -37,5 +44,71 @@ func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) { return &peers[rand.Intn(len(peers))], nil // nolint: gosec } - return nil, errors.New("no suitable peers found") + return nil, ErrNoPeersAvailable +} + +type pingResult struct { + p peer.ID + rtt time.Duration +} + +func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string) (*peer.ID, error) { + var peers peer.IDSlice + for _, peer := range host.Peerstore().Peers() { + protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) + if err != nil { + log.Error("error obtaining the protocols supported by peers", err) + return nil, err + } + + if len(protocols) > 0 { + peers = append(peers, peer) + } + } + + wg := sync.WaitGroup{} + waitCh := make(chan struct{}) + pingCh := make(chan pingResult, 1000) + + go func() { + for _, p := range peers { + wg.Add(1) + go func(p peer.ID) { + defer wg.Done() + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + result := <-ping.Ping(ctx, host, p) + if result.Error == nil { + pingCh <- pingResult{ + p: p, + rtt: result.RTT, + } + } + }(p) + } + wg.Wait() + close(waitCh) + close(pingCh) + }() + + select { + case <-waitCh: + var min *pingResult + for p := range pingCh { + if min == nil { + min = &p + } else { + if p.rtt < min.rtt { + min = &p + } + } + } + if min == nil { + return nil, ErrNoPeersAvailable + } else { + return &min.p, nil + } + case <-ctx.Done(): + return nil, ErrNoPeersAvailable + } } diff --git a/waku/v2/utils/peer_test.go b/waku/v2/utils/peer_test.go new file mode 100644 index 00000000..011bd11d --- /dev/null +++ b/waku/v2/utils/peer_test.go @@ -0,0 +1,83 @@ +package utils + +import ( + "context" + "crypto/rand" + "fmt" + "testing" + "time" + + "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/status-im/go-waku/tests" + "github.com/stretchr/testify/require" +) + +func TestSelectPeer(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + h1, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h1.Close() + + h2, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h2.Close() + + h3, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h3.Close() + + proto := "test/protocol" + + h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) + h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) + + // No peers with selected protocol + _, err = SelectPeer(h1, proto) + require.Error(t, ErrNoPeersAvailable, err) + + // Peers with selected protocol + _ = h1.Peerstore().AddProtocols(h2.ID(), proto) + _ = h1.Peerstore().AddProtocols(h3.ID(), proto) + + _, err = SelectPeerWithLowestRTT(ctx, h1, proto) + require.NoError(t, err) + +} + +func TestSelectPeerWithLowestRTT(t *testing.T) { + // help-wanted: how to slowdown the ping response to properly test the rtt + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + h1, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h1.Close() + + h2, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h2.Close() + + h3, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h3.Close() + + proto := "test/protocol" + + h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) + h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) + + // No peers with selected protocol + _, err = SelectPeerWithLowestRTT(ctx, h1, proto) + fmt.Println(err) + require.Error(t, ErrNoPeersAvailable, err) + + // Peers with selected protocol + _ = h1.Peerstore().AddProtocols(h2.ID(), proto) + _ = h1.Peerstore().AddProtocols(h3.ID(), proto) + + _, err = SelectPeerWithLowestRTT(ctx, h1, proto) + require.NoError(t, err) +}