feat: select peer with lowest ping time and test peer functions

This commit is contained in:
Richard Ramos 2021-11-09 15:01:53 -04:00
parent 9426cd133a
commit 27c339614f
No known key found for this signature in database
GPG Key ID: 80D4B01265FDFE8F
13 changed files with 249 additions and 34 deletions

View File

@ -128,13 +128,13 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
}
func (w *WakuNode) Start() error {
w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration)
w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.ping, w.opts.maxMessages, w.opts.maxDuration)
if w.opts.enableStore {
w.startStore()
}
if w.opts.enableFilter {
w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode)
w.filter = filter.NewWakuFilter(w.ctx, w.host, w.ping, w.opts.isFilterFullNode)
}
if w.opts.enableRendezvous {
@ -147,7 +147,7 @@ func (w *WakuNode) Start() error {
return err
}
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay)
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.ping, w.relay)
if w.opts.enableLightPush {
if err := w.lightPush.Start(); err != nil {
return err

View File

@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
@ -29,6 +30,7 @@ var (
type (
FilterSubscribeParameters struct {
host host.Host
ping *ping.PingService
selectedPeer peer.ID
}
@ -54,6 +56,7 @@ type (
WakuFilter struct {
ctx context.Context
h host.Host
ping *ping.PingService
isFullNode bool
MsgC chan *protocol.Envelope
@ -85,6 +88,17 @@ func WithAutomaticPeerSelection() FilterSubscribeOption {
}
}
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.ping, string(FilterID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
func DefaultOptions() []FilterSubscribeOption {
return []FilterSubscribeOption{
WithAutomaticPeerSelection(),
@ -137,7 +151,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
}
}
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter {
func NewWakuFilter(ctx context.Context, host host.Host, ping *ping.PingService, isFullNode bool) *WakuFilter {
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil {
log.Error(err)
@ -147,6 +161,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFi
wf.ctx = ctx
wf.MsgC = make(chan *protocol.Envelope)
wf.h = host
wf.ping = ping
wf.isFullNode = isFullNode
wf.filters = NewFilterMap()
wf.subscribers = NewSubscribers()
@ -230,6 +245,7 @@ func (wf *WakuFilter) FilterListener() {
func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) {
params := new(FilterSubscribeParameters)
params.host = wf.h
params.ping = wf.ping
optList := DefaultOptions()
optList = append(optList, opts...)

View File

@ -38,7 +38,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
filter := NewWakuFilter(context.Background(), host, false)
filter := NewWakuFilter(context.Background(), host, nil, false)
return filter, host
}
@ -68,7 +68,7 @@ func TestWakuFilter(t *testing.T) {
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter := NewWakuFilter(ctx, host2, true)
node2Filter := NewWakuFilter(ctx, host2, nil, true)
broadcaster.Register(node2Filter.MsgC)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)

View File

@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol"
@ -28,14 +29,16 @@ var (
type WakuLightPush struct {
h host.Host
ping *ping.PingService
relay *relay.WakuRelay
ctx context.Context
}
func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay) *WakuLightPush {
func NewWakuLightPush(ctx context.Context, h host.Host, ping *ping.PingService, relay *relay.WakuRelay) *WakuLightPush {
wakuLP := new(WakuLightPush)
wakuLP.relay = relay
wakuLP.ctx = ctx
wakuLP.ping = ping
wakuLP.h = h
return wakuLP
@ -122,6 +125,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
params := new(LightPushParameters)
params.ping = wakuLP.ping
optList := DefaultOptions(wakuLP.h)
optList = append(optList, opts...)

View File

@ -1,14 +1,18 @@
package lightpush
import (
"context"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/utils"
)
type LightPushParameters struct {
selectedPeer peer.ID
ping *ping.PingService
requestId []byte
}
@ -31,6 +35,17 @@ func WithAutomaticPeerSelection(host host.Host) LightPushOption {
}
}
func WithFastestPeerSelection(ctx context.Context) LightPushOption {
return func(params *LightPushParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.ping, string(LightPushID_v20beta1))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
func WithRequestId(requestId []byte) LightPushOption {
return func(params *LightPushParameters) {
params.requestId = requestId

View File

@ -55,7 +55,7 @@ func TestWakuLightPush(t *testing.T) {
defer sub2.Unsubscribe()
ctx := context.Background()
lightPushNode2 := NewWakuLightPush(ctx, host2, node2)
lightPushNode2 := NewWakuLightPush(ctx, host2, nil, node2)
err := lightPushNode2.Start()
require.NoError(t, err)
defer lightPushNode2.Stop()
@ -65,7 +65,7 @@ func TestWakuLightPush(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil)
client := NewWakuLightPush(ctx, clientHost, nil, nil)
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200))
@ -121,7 +121,7 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil)
client := NewWakuLightPush(ctx, clientHost, nil, nil)
err = client.Start()
require.Errorf(t, err, "relay is required")
@ -135,7 +135,7 @@ func TestWakuLightPushNoPeers(t *testing.T) {
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil)
client := NewWakuLightPush(ctx, clientHost, nil, nil)
_, err = client.Publish(ctx, tests.CreateWakuMessage("test", float64(0)), &testTopic)
require.Errorf(t, err, "no suitable remote peers")

View File

@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) {
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
s := NewWakuStore(nil, nil, 0, 0)
s := NewWakuStore(nil, nil, nil, 0, 0)
s.storeMessage(msg1)
s.storeMessage(msg3)
s.storeMessage(msg5)
@ -38,7 +38,7 @@ func TestResume(t *testing.T) {
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, 0, 0)
s1 := NewWakuStore(host1, nil, nil, 0, 0)
s1.Start(ctx)
defer s1.Stop()
@ -55,7 +55,7 @@ func TestResume(t *testing.T) {
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, 0, 0)
s2 := NewWakuStore(host2, nil, nil, 0, 0)
s2.Start(ctx)
defer s2.Stop()
@ -87,7 +87,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, 0, 0)
s1 := NewWakuStore(host1, nil, nil, 0, 0)
s1.Start(ctx)
defer s1.Stop()
@ -98,7 +98,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, 0, 0)
s2 := NewWakuStore(host2, nil, nil, 0, 0)
s2.Start(ctx)
defer s2.Stop()
@ -120,7 +120,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, 0, 0)
s1 := NewWakuStore(host1, nil, nil, 0, 0)
s1.Start(ctx)
defer s1.Stop()
@ -131,7 +131,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, 0, 0)
s2 := NewWakuStore(host2, nil, nil, 0, 0)
s2.Start(ctx)
defer s2.Stop()

View File

@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-msgio/protoio"
"github.com/status-im/go-waku/waku/persistence"
@ -233,13 +234,15 @@ type WakuStore struct {
messageQueue *MessageQueue
msgProvider MessageProvider
h host.Host
ping *ping.PingService
}
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(host host.Host, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
func NewWakuStore(host host.Host, p MessageProvider, ping *ping.PingService, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.msgProvider = p
wakuStore.h = host
wakuStore.ping = ping
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration)
return wakuStore
}
@ -411,6 +414,7 @@ func findIndex(msgList []IndexedWakuMessage, index *pb.Index) int {
}
type HistoryRequestParameters struct {
ping *ping.PingService
selectedPeer peer.ID
requestId []byte
cursor *pb.Index
@ -442,6 +446,17 @@ func WithAutomaticPeerSelection() HistoryRequestOption {
}
}
func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.ping, string(StoreID_v20beta3))
if err == nil {
params.selectedPeer = *p
} else {
log.Info("Error selecting peer: ", err)
}
}
}
func WithRequestId(requestId []byte) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.requestId = requestId
@ -530,6 +545,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
params := new(HistoryRequestParameters)
params.s = store
params.ping = store.ping
optList := DefaultOptions()
optList = append(optList, opts...)

View File

@ -24,7 +24,7 @@ func TestStorePersistence(t *testing.T) {
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
require.NoError(t, err)
s1 := NewWakuStore(nil, dbStore, 0, 0)
s1 := NewWakuStore(nil, dbStore, nil, 0, 0)
s1.fetchDBRecords(ctx)
require.Len(t, s1.messageQueue.messages, 0)
@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) {
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
s2 := NewWakuStore(nil, dbStore, 0, 0)
s2 := NewWakuStore(nil, dbStore, nil, 0, 0)
s2.fetchDBRecords(ctx)
require.Len(t, s2.messageQueue.messages, 1)
require.Equal(t, msg, s2.messageQueue.messages[0].msg)

View File

@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, 0, 0)
s1 := NewWakuStore(host1, nil, nil, 0, 0)
s1.Start(ctx)
defer s1.Stop()
@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
// Simulate a message has been received via relay protocol
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
s2 := NewWakuStore(host2, nil, 0, 0)
s2 := NewWakuStore(host2, nil, nil, 0, 0)
s2.Start(ctx)
defer s2.Stop()
@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, 0, 0)
s1 := NewWakuStore(host1, nil, nil, 0, 0)
s1.Start(ctx)
defer s1.Stop()
@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
require.NoError(t, err)
s2 := NewWakuStore(host2, nil, 0, 0)
s2 := NewWakuStore(host2, nil, nil, 0, 0)
s2.Start(ctx)
defer s2.Stop()

View File

@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) {
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, 0, 0)
s := NewWakuStore(nil, nil, nil, 0, 0)
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, 0, 0)
s := NewWakuStore(nil, nil, nil, 0, 0)
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, 0, 0)
s := NewWakuStore(nil, nil, nil, 0, 0)
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, 0, 0)
s := NewWakuStore(nil, nil, nil, 0, 0)
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil, nil, 0, 0)
s := NewWakuStore(nil, nil, nil, 0, 0)
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(nil, nil, 0, 0)
s := NewWakuStore(nil, nil, nil, 0, 0)
for i := 0; i < 10; i++ {
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
msg.Payload = []byte{byte(i)}
@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(nil, nil, 0, 0)
s := NewWakuStore(nil, nil, nil, 0, 0)
for i := 0; i < 10; i++ {
msg := &pb.WakuMessage{
Payload: []byte{byte(i)},
@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
}
func TestTemporalHistoryQueries(t *testing.T) {
s := NewWakuStore(nil, nil, 0, 0)
s := NewWakuStore(nil, nil, nil, 0, 0)
var messages []*pb.WakuMessage
for i := 0; i < 10; i++ {

View File

@ -1,16 +1,23 @@
package utils
import (
"context"
"errors"
"math/rand"
"sync"
"time"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)
var log = logging.Logger("utils")
var ErrNoPeersAvailable = errors.New("no suitable peers found")
var PingServiceNotAvailable = errors.New("ping service not available")
// SelectPeer is used to return a random peer that supports a given protocol.
func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
@ -37,5 +44,75 @@ func SelectPeer(host host.Host, protocolId string) (*peer.ID, error) {
return &peers[rand.Intn(len(peers))], nil // nolint: gosec
}
return nil, errors.New("no suitable peers found")
return nil, ErrNoPeersAvailable
}
type pingResult struct {
p peer.ID
rtt time.Duration
}
func SelectPeerWithLowestRTT(ctx context.Context, pingService *ping.PingService, protocolId string) (*peer.ID, error) {
if pingService == nil {
return nil, PingServiceNotAvailable
}
var peers peer.IDSlice
for _, peer := range pingService.Host.Peerstore().Peers() {
protocols, err := pingService.Host.Peerstore().SupportsProtocols(peer, protocolId)
if err != nil {
log.Error("error obtaining the protocols supported by peers", err)
return nil, err
}
if len(protocols) > 0 {
peers = append(peers, peer)
}
}
wg := sync.WaitGroup{}
waitCh := make(chan struct{})
pingCh := make(chan pingResult, 1000)
go func() {
for _, p := range peers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
result := <-pingService.Ping(ctx, p)
if result.Error == nil {
pingCh <- pingResult{
p: p,
rtt: result.RTT,
}
}
}(p)
}
wg.Wait()
close(waitCh)
close(pingCh)
}()
select {
case <-waitCh:
var min *pingResult
for p := range pingCh {
if min == nil {
min = &p
} else {
if p.rtt < min.rtt {
min = &p
}
}
}
if min == nil {
return nil, ErrNoPeersAvailable
} else {
return &min.p, nil
}
case <-ctx.Done():
return nil, ErrNoPeersAvailable
}
}

View File

@ -0,0 +1,87 @@
package utils
import (
"context"
"crypto/rand"
"fmt"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/status-im/go-waku/tests"
"github.com/stretchr/testify/require"
)
func TestSelectPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
h1, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h1.Close()
h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()
h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()
proto := "test/protocol"
pingService := ping.NewPingService(h1)
h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
// No peers with selected protocol
_, err = SelectPeer(h1, proto)
fmt.Println(err)
require.Error(t, ErrNoPeersAvailable, err)
// Peers with selected protocol
_ = h1.Peerstore().AddProtocols(h2.ID(), proto)
_ = h1.Peerstore().AddProtocols(h3.ID(), proto)
_, err = SelectPeerWithLowestRTT(ctx, pingService, proto)
require.NoError(t, err)
}
func TestSelectPeerWithLowestRTT(t *testing.T) {
// help-wanted: how to slowdown the ping response to properly test the rtt
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
h1, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h1.Close()
h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()
h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()
proto := "test/protocol"
pingService := ping.NewPingService(h1)
h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
// No peers with selected protocol
_, err = SelectPeerWithLowestRTT(ctx, pingService, proto)
fmt.Println(err)
require.Error(t, ErrNoPeersAvailable, err)
// Peers with selected protocol
_ = h1.Peerstore().AddProtocols(h2.ID(), proto)
_ = h1.Peerstore().AddProtocols(h3.ID(), proto)
_, err = SelectPeerWithLowestRTT(ctx, pingService, proto)
require.NoError(t, err)
}