From 5c5e0b005b3a764f69a22f0738998b81ff3755f7 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 9 Nov 2021 19:18:14 -0400 Subject: [PATCH] fix: code review --- waku/v2/node/keepalive_test.go | 5 +---- waku/v2/node/wakunode2.go | 14 ++++++-------- waku/v2/protocol/filter/waku_filter.go | 9 ++------- waku/v2/protocol/filter/waku_filter_test.go | 4 ++-- waku/v2/protocol/lightpush/waku_lightpush.go | 7 ++----- .../protocol/lightpush/waku_lightpush_option.go | 5 ++--- .../v2/protocol/lightpush/waku_lightpush_test.go | 8 ++++---- waku/v2/protocol/store/waku_resume_test.go | 14 +++++++------- waku/v2/protocol/store/waku_store.go | 9 ++------- .../store/waku_store_persistence_test.go | 4 ++-- .../protocol/store/waku_store_protocol_test.go | 8 ++++---- waku/v2/protocol/store/waku_store_query_test.go | 16 ++++++++-------- waku/v2/utils/peer.go | 12 ++++-------- waku/v2/utils/peer_test.go | 10 +++------- 14 files changed, 49 insertions(+), 76 deletions(-) diff --git a/waku/v2/node/keepalive_test.go b/waku/v2/node/keepalive_test.go index d90b12de..8d72e193 100644 --- a/waku/v2/node/keepalive_test.go +++ b/waku/v2/node/keepalive_test.go @@ -7,7 +7,6 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/peerstore" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/stretchr/testify/require" ) @@ -25,13 +24,11 @@ func TestKeepAlive(t *testing.T) { err = host1.Connect(ctx, host1.Peerstore().PeerInfo(host2.ID())) require.NoError(t, err) - ping := ping.NewPingService(host1) - require.Len(t, host1.Network().Peers(), 1) ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Second) defer cancel2() - pingPeer(ctx2, ping, host2.ID()) + pingPeer(ctx2, host1, host2.ID()) require.NoError(t, ctx.Err()) } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 77eccc1e..32b3de51 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -40,7 +40,6 @@ type WakuNode struct { filter *filter.WakuFilter lightPush *lightpush.WakuLightPush rendezvous *rendezvous.RendezvousService - ping *ping.PingService store *store.WakuStore bcaster v2.Broadcaster @@ -128,13 +127,13 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { } func (w *WakuNode) Start() error { - w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.ping, w.opts.maxMessages, w.opts.maxDuration) + w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) if w.opts.enableStore { w.startStore() } if w.opts.enableFilter { - w.filter = filter.NewWakuFilter(w.ctx, w.host, w.ping, w.opts.isFilterFullNode) + w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode) } if w.opts.enableRendezvous { @@ -147,7 +146,7 @@ func (w *WakuNode) Start() error { return err } - w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.ping, w.relay) + w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay) if w.opts.enableLightPush { if err := w.lightPush.Start(); err != nil { return err @@ -402,7 +401,6 @@ func (w *WakuNode) Peers() PeerStats { func (w *WakuNode) startKeepAlive(t time.Duration) { log.Info("Setting up ping protocol with duration of ", t) - w.ping = ping.NewPingService(w.host) ticker := time.NewTicker(t) go func() { @@ -417,7 +415,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { // through Network's peer collection, as it will be empty for _, p := range w.host.Peerstore().Peers() { if p != w.host.ID() { - go pingPeer(w.ctx, w.ping, p) + go pingPeer(w.ctx, w.host, p) } } case <-w.quit: @@ -428,12 +426,12 @@ func (w *WakuNode) startKeepAlive(t time.Duration) { }() } -func pingPeer(ctx context.Context, pingService *ping.PingService, peer peer.ID) { +func pingPeer(ctx context.Context, host host.Host, peer peer.ID) { ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() log.Debug("Pinging ", peer) - pr := pingService.Ping(ctx, peer) + pr := ping.Ping(ctx, host, peer) select { case res := <-pr: if res.Error != nil { diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index cf53952e..161a9782 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -11,7 +11,6 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/libp2p/go-msgio/protoio" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" @@ -30,7 +29,6 @@ var ( type ( FilterSubscribeParameters struct { host host.Host - ping *ping.PingService selectedPeer peer.ID } @@ -56,7 +54,6 @@ type ( WakuFilter struct { ctx context.Context h host.Host - ping *ping.PingService isFullNode bool MsgC chan *protocol.Envelope @@ -90,7 +87,7 @@ func WithAutomaticPeerSelection() FilterSubscribeOption { func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.ping, string(FilterID_v20beta1)) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1)) if err == nil { params.selectedPeer = *p } else { @@ -151,7 +148,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) { } } -func NewWakuFilter(ctx context.Context, host host.Host, ping *ping.PingService, isFullNode bool) *WakuFilter { +func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter { ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) if err != nil { log.Error(err) @@ -161,7 +158,6 @@ func NewWakuFilter(ctx context.Context, host host.Host, ping *ping.PingService, wf.ctx = ctx wf.MsgC = make(chan *protocol.Envelope) wf.h = host - wf.ping = ping wf.isFullNode = isFullNode wf.filters = NewFilterMap() wf.subscribers = NewSubscribers() @@ -245,7 +241,6 @@ func (wf *WakuFilter) FilterListener() { func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) { params := new(FilterSubscribeParameters) params.host = wf.h - params.ping = wf.ping optList := DefaultOptions() optList = append(optList, opts...) diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index f1728d09..6e28b012 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -38,7 +38,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - filter := NewWakuFilter(context.Background(), host, nil, false) + filter := NewWakuFilter(context.Background(), host, false) return filter, host } @@ -68,7 +68,7 @@ func TestWakuFilter(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter := NewWakuFilter(ctx, host2, nil, true) + node2Filter := NewWakuFilter(ctx, host2, true) broadcaster.Register(node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 561f284a..053364b4 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -10,7 +10,6 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/libp2p/go-msgio/protoio" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" @@ -29,16 +28,14 @@ var ( type WakuLightPush struct { h host.Host - ping *ping.PingService relay *relay.WakuRelay ctx context.Context } -func NewWakuLightPush(ctx context.Context, h host.Host, ping *ping.PingService, relay *relay.WakuRelay) *WakuLightPush { +func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay) *WakuLightPush { wakuLP := new(WakuLightPush) wakuLP.relay = relay wakuLP.ctx = ctx - wakuLP.ping = ping wakuLP.h = h return wakuLP @@ -125,7 +122,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) { params := new(LightPushParameters) - params.ping = wakuLP.ping + params.host = wakuLP.h optList := DefaultOptions(wakuLP.h) optList = append(optList, opts...) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index a6dc6271..d880f597 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -5,14 +5,13 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/utils" ) type LightPushParameters struct { + host host.Host selectedPeer peer.ID - ping *ping.PingService requestId []byte } @@ -37,7 +36,7 @@ func WithAutomaticPeerSelection(host host.Host) LightPushOption { func WithFastestPeerSelection(ctx context.Context) LightPushOption { return func(params *LightPushParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.ping, string(LightPushID_v20beta1)) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1)) if err == nil { params.selectedPeer = *p } else { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 1bfc95a3..a30e3415 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -55,7 +55,7 @@ func TestWakuLightPush(t *testing.T) { defer sub2.Unsubscribe() ctx := context.Background() - lightPushNode2 := NewWakuLightPush(ctx, host2, nil, node2) + lightPushNode2 := NewWakuLightPush(ctx, host2, node2) err := lightPushNode2.Start() require.NoError(t, err) defer lightPushNode2.Stop() @@ -65,7 +65,7 @@ func TestWakuLightPush(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil, nil) + client := NewWakuLightPush(ctx, clientHost, nil) host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200)) @@ -121,7 +121,7 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil, nil) + client := NewWakuLightPush(ctx, clientHost, nil) err = client.Start() require.Errorf(t, err, "relay is required") @@ -135,7 +135,7 @@ func TestWakuLightPushNoPeers(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(ctx, clientHost, nil, nil) + client := NewWakuLightPush(ctx, clientHost, nil) _, err = client.Publish(ctx, tests.CreateWakuMessage("test", float64(0)), &testTopic) require.Errorf(t, err, "no suitable remote peers") diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index 65b3e733..cf13ef3d 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test") - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(msg1) s.storeMessage(msg3) s.storeMessage(msg5) @@ -38,7 +38,7 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0) + s1 := NewWakuStore(host1, nil, 0, 0) s1.Start(ctx) defer s1.Stop() @@ -55,7 +55,7 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0) + s2 := NewWakuStore(host2, nil, 0, 0) s2.Start(ctx) defer s2.Stop() @@ -87,7 +87,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0) + s1 := NewWakuStore(host1, nil, 0, 0) s1.Start(ctx) defer s1.Stop() @@ -98,7 +98,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0) + s2 := NewWakuStore(host2, nil, 0, 0) s2.Start(ctx) defer s2.Stop() @@ -120,7 +120,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0) + s1 := NewWakuStore(host1, nil, 0, 0) s1.Start(ctx) defer s1.Stop() @@ -131,7 +131,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0) + s2 := NewWakuStore(host2, nil, 0, 0) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 98150780..90ec1a5a 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -15,7 +15,6 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/libp2p/go-msgio/protoio" "github.com/status-im/go-waku/waku/persistence" @@ -234,15 +233,13 @@ type WakuStore struct { messageQueue *MessageQueue msgProvider MessageProvider h host.Host - ping *ping.PingService } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(host host.Host, p MessageProvider, ping *ping.PingService, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { +func NewWakuStore(host host.Host, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p wakuStore.h = host - wakuStore.ping = ping wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration) return wakuStore } @@ -414,7 +411,6 @@ func findIndex(msgList []IndexedWakuMessage, index *pb.Index) int { } type HistoryRequestParameters struct { - ping *ping.PingService selectedPeer peer.ID requestId []byte cursor *pb.Index @@ -448,7 +444,7 @@ func WithAutomaticPeerSelection() HistoryRequestOption { func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption { return func(params *HistoryRequestParameters) { - p, err := utils.SelectPeerWithLowestRTT(ctx, params.ping, string(StoreID_v20beta3)) + p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta3)) if err == nil { params.selectedPeer = *p } else { @@ -545,7 +541,6 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR params := new(HistoryRequestParameters) params.s = store - params.ping = store.ping optList := DefaultOptions() optList = append(optList, opts...) diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index 6dc509b8..f239c118 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -24,7 +24,7 @@ func TestStorePersistence(t *testing.T) { dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) require.NoError(t, err) - s1 := NewWakuStore(nil, dbStore, nil, 0, 0) + s1 := NewWakuStore(nil, dbStore, 0, 0) s1.fetchDBRecords(ctx) require.Len(t, s1.messageQueue.messages, 0) @@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) { s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) - s2 := NewWakuStore(nil, dbStore, nil, 0, 0) + s2 := NewWakuStore(nil, dbStore, 0, 0) s2.fetchDBRecords(ctx) require.Len(t, s2.messageQueue.messages, 1) require.Equal(t, msg, s2.messageQueue.messages[0].msg) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 65dbf085..50bc943a 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0) + s1 := NewWakuStore(host1, nil, 0, 0) s1.Start(ctx) defer s1.Stop() @@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1) - s2 := NewWakuStore(host2, nil, nil, 0, 0) + s2 := NewWakuStore(host2, nil, 0, 0) s2.Start(ctx) defer s2.Stop() @@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, nil, 0, 0) + s1 := NewWakuStore(host1, nil, 0, 0) s1.Start(ctx) defer s1.Stop() @@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, nil, 0, 0) + s2 := NewWakuStore(host2, nil, 0, 0) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index f4411c74..0f39e25b 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1)) @@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) for i := 0; i < 10; i++ { msg := &pb.WakuMessage{ Payload: []byte{byte(i)}, @@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(nil, nil, nil, 0, 0) + s := NewWakuStore(nil, nil, 0, 0) var messages []*pb.WakuMessage for i := 0; i < 10; i++ { diff --git a/waku/v2/utils/peer.go b/waku/v2/utils/peer.go index c686ef0b..862363a4 100644 --- a/waku/v2/utils/peer.go +++ b/waku/v2/utils/peer.go @@ -52,14 +52,10 @@ type pingResult struct { rtt time.Duration } -func SelectPeerWithLowestRTT(ctx context.Context, pingService *ping.PingService, protocolId string) (*peer.ID, error) { - if pingService == nil { - return nil, PingServiceNotAvailable - } - +func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string) (*peer.ID, error) { var peers peer.IDSlice - for _, peer := range pingService.Host.Peerstore().Peers() { - protocols, err := pingService.Host.Peerstore().SupportsProtocols(peer, protocolId) + for _, peer := range host.Peerstore().Peers() { + protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId) if err != nil { log.Error("error obtaining the protocols supported by peers", err) return nil, err @@ -81,7 +77,7 @@ func SelectPeerWithLowestRTT(ctx context.Context, pingService *ping.PingService, defer wg.Done() ctx, cancel := context.WithTimeout(ctx, 3*time.Second) defer cancel() - result := <-pingService.Ping(ctx, p) + result := <-ping.Ping(ctx, host, p) if result.Error == nil { pingCh <- pingResult{ p: p, diff --git a/waku/v2/utils/peer_test.go b/waku/v2/utils/peer_test.go index e0596edd..011bd11d 100644 --- a/waku/v2/utils/peer_test.go +++ b/waku/v2/utils/peer_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/libp2p/go-libp2p-core/peerstore" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/status-im/go-waku/tests" "github.com/stretchr/testify/require" ) @@ -30,21 +29,19 @@ func TestSelectPeer(t *testing.T) { defer h3.Close() proto := "test/protocol" - pingService := ping.NewPingService(h1) h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) // No peers with selected protocol _, err = SelectPeer(h1, proto) - fmt.Println(err) require.Error(t, ErrNoPeersAvailable, err) // Peers with selected protocol _ = h1.Peerstore().AddProtocols(h2.ID(), proto) _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - _, err = SelectPeerWithLowestRTT(ctx, pingService, proto) + _, err = SelectPeerWithLowestRTT(ctx, h1, proto) require.NoError(t, err) } @@ -68,13 +65,12 @@ func TestSelectPeerWithLowestRTT(t *testing.T) { defer h3.Close() proto := "test/protocol" - pingService := ping.NewPingService(h1) h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL) // No peers with selected protocol - _, err = SelectPeerWithLowestRTT(ctx, pingService, proto) + _, err = SelectPeerWithLowestRTT(ctx, h1, proto) fmt.Println(err) require.Error(t, ErrNoPeersAvailable, err) @@ -82,6 +78,6 @@ func TestSelectPeerWithLowestRTT(t *testing.T) { _ = h1.Peerstore().AddProtocols(h2.ID(), proto) _ = h1.Peerstore().AddProtocols(h3.ID(), proto) - _, err = SelectPeerWithLowestRTT(ctx, pingService, proto) + _, err = SelectPeerWithLowestRTT(ctx, h1, proto) require.NoError(t, err) }