feat: select peer with lowest ping time and test peer functions (#143)

* feat: select peer with lowest ping time and test peer functions
* fix: do not self-ping
This commit is contained in:
Richard Ramos 2021-11-09 19:34:04 -04:00 committed by GitHub
parent 9426cd133a
commit 0c873e3c2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 200 additions and 10 deletions

View File

@ -7,7 +7,6 @@ import (
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/stretchr/testify/require"
)
@ -25,13 +24,11 @@ func TestKeepAlive(t *testing.T) {
err = host1.Connect(ctx, host1.Peerstore().PeerInfo(host2.ID()))
require.NoError(t, err)
ping := ping.NewPingService(host1)
require.Len(t, host1.Network().Peers(), 1)
ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Second)
defer cancel2()
pingPeer(ctx2, ping, host2.ID())
pingPeer(ctx2, host1, host2.ID())
require.NoError(t, ctx.Err())
}

View File

@ -40,7 +40,6 @@ type WakuNode struct {
filter *filter.WakuFilter
lightPush *lightpush.WakuLightPush
rendezvous *rendezvous.RendezvousService
ping *ping.PingService
store *store.WakuStore
bcaster v2.Broadcaster
@ -402,7 +401,6 @@ func (w *WakuNode) Peers() PeerStats {
func (w *WakuNode) startKeepAlive(t time.Duration) {
log.Info("Setting up ping protocol with duration of ", t)
w.ping = ping.NewPingService(w.host)
ticker := time.NewTicker(t)
go func() {
@ -416,7 +414,9 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
// which is not possible when iterating
// through Network's peer collection, as it will be empty
for _, p := range w.host.Peerstore().Peers() {
go pingPeer(w.ctx, w.ping, p)
if p != w.host.ID() {
go pingPeer(w.ctx, w.host, p)
}
}
case <-w.quit:
ticker.Stop()
@ -426,12 +426,12 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
}()
}
func pingPeer(ctx context.Context, pingService *ping.PingService, peer peer.ID) {
func pingPeer(ctx context.Context, host host.Host, peer peer.ID) {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
log.Debug("Pinging ", peer)
pr := pingService.Ping(ctx, peer)
pr := ping.Ping(ctx, host, peer)
select {
case res := <-pr:
if res.Error != nil {

View File

@ -85,6 +85,17 @@ func WithAutomaticPeerSelection() FilterSubscribeOption {
}
}
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
func DefaultOptions() []FilterSubscribeOption {
return []FilterSubscribeOption{
WithAutomaticPeerSelection(),

View File

@ -122,6 +122,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
params := new(LightPushParameters)
params.host = wakuLP.h
optList := DefaultOptions(wakuLP.h)
optList = append(optList, opts...)

View File

@ -1,6 +1,8 @@
package lightpush
import (
"context"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/status-im/go-waku/waku/v2/protocol"
@ -8,6 +10,7 @@ import (
)
type LightPushParameters struct {
host host.Host
selectedPeer peer.ID
requestId []byte
}
@ -31,6 +34,17 @@ func WithAutomaticPeerSelection(host host.Host) LightPushOption {
}
}
func WithFastestPeerSelection(ctx context.Context) LightPushOption {
return func(params *LightPushParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
func WithRequestId(requestId []byte) LightPushOption {
return func(params *LightPushParameters) {
params.requestId = requestId

View File

@ -442,6 +442,17 @@ func WithAutomaticPeerSelection() HistoryRequestOption {
}
}
func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta3))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
func WithRequestId(requestId []byte) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.requestId = requestId

View File

@ -1,16 +1,23 @@
package utils
import (
"context"
"errors"
"math/rand"
"sync"
"time"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)
var log = logging.Logger("utils")
var ErrNoPeersAvailable = errors.New("no suitable peers found")
var PingServiceNotAvailable = errors.New("ping service not available")
// SelectPeer is used to return a random peer that supports a given protocol.
func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
@ -37,5 +44,71 @@ func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
return &peers[rand.Intn(len(peers))], nil // nolint: gosec
}
return nil, errors.New("no suitable peers found")
return nil, ErrNoPeersAvailable
}
type pingResult struct {
p peer.ID
rtt time.Duration
}
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string) (*peer.ID, error) {
var peers peer.IDSlice
for _, peer := range host.Peerstore().Peers() {
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
if err != nil {
log.Error("error obtaining the protocols supported by peers", err)
return nil, err
}
if len(protocols) > 0 {
peers = append(peers, peer)
}
}
wg := sync.WaitGroup{}
waitCh := make(chan struct{})
pingCh := make(chan pingResult, 1000)
go func() {
for _, p := range peers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
result := <-ping.Ping(ctx, host, p)
if result.Error == nil {
pingCh <- pingResult{
p: p,
rtt: result.RTT,
}
}
}(p)
}
wg.Wait()
close(waitCh)
close(pingCh)
}()
select {
case <-waitCh:
var min *pingResult
for p := range pingCh {
if min == nil {
min = &p
} else {
if p.rtt < min.rtt {
min = &p
}
}
}
if min == nil {
return nil, ErrNoPeersAvailable
} else {
return &min.p, nil
}
case <-ctx.Done():
return nil, ErrNoPeersAvailable
}
}

View File

@ -0,0 +1,83 @@
package utils
import (
"context"
"crypto/rand"
"fmt"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/status-im/go-waku/tests"
"github.com/stretchr/testify/require"
)
func TestSelectPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
h1, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h1.Close()
h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()
h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()
proto := "test/protocol"
h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
// No peers with selected protocol
_, err = SelectPeer(h1, proto)
require.Error(t, ErrNoPeersAvailable, err)
// Peers with selected protocol
_ = h1.Peerstore().AddProtocols(h2.ID(), proto)
_ = h1.Peerstore().AddProtocols(h3.ID(), proto)
_, err = SelectPeerWithLowestRTT(ctx, h1, proto)
require.NoError(t, err)
}
func TestSelectPeerWithLowestRTT(t *testing.T) {
// help-wanted: how to slowdown the ping response to properly test the rtt
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
h1, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h1.Close()
h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()
h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()
proto := "test/protocol"
h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
// No peers with selected protocol
_, err = SelectPeerWithLowestRTT(ctx, h1, proto)
fmt.Println(err)
require.Error(t, ErrNoPeersAvailable, err)
// Peers with selected protocol
_ = h1.Peerstore().AddProtocols(h2.ID(), proto)
_ = h1.Peerstore().AddProtocols(h3.ID(), proto)
_, err = SelectPeerWithLowestRTT(ctx, h1, proto)
require.NoError(t, err)
}