mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-10 01:43:08 +00:00
fix: code review
This commit is contained in:
parent
2f55784db7
commit
5c5e0b005b
@ -7,7 +7,6 @@ import (
|
||||
|
||||
"github.com/libp2p/go-libp2p"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@ -25,13 +24,11 @@ func TestKeepAlive(t *testing.T) {
|
||||
err = host1.Connect(ctx, host1.Peerstore().PeerInfo(host2.ID()))
|
||||
require.NoError(t, err)
|
||||
|
||||
ping := ping.NewPingService(host1)
|
||||
|
||||
require.Len(t, host1.Network().Peers(), 1)
|
||||
|
||||
ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel2()
|
||||
pingPeer(ctx2, ping, host2.ID())
|
||||
pingPeer(ctx2, host1, host2.ID())
|
||||
|
||||
require.NoError(t, ctx.Err())
|
||||
}
|
||||
|
||||
@ -40,7 +40,6 @@ type WakuNode struct {
|
||||
filter *filter.WakuFilter
|
||||
lightPush *lightpush.WakuLightPush
|
||||
rendezvous *rendezvous.RendezvousService
|
||||
ping *ping.PingService
|
||||
store *store.WakuStore
|
||||
|
||||
bcaster v2.Broadcaster
|
||||
@ -128,13 +127,13 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
|
||||
}
|
||||
|
||||
func (w *WakuNode) Start() error {
|
||||
w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.ping, w.opts.maxMessages, w.opts.maxDuration)
|
||||
w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration)
|
||||
if w.opts.enableStore {
|
||||
w.startStore()
|
||||
}
|
||||
|
||||
if w.opts.enableFilter {
|
||||
w.filter = filter.NewWakuFilter(w.ctx, w.host, w.ping, w.opts.isFilterFullNode)
|
||||
w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode)
|
||||
}
|
||||
|
||||
if w.opts.enableRendezvous {
|
||||
@ -147,7 +146,7 @@ func (w *WakuNode) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.ping, w.relay)
|
||||
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay)
|
||||
if w.opts.enableLightPush {
|
||||
if err := w.lightPush.Start(); err != nil {
|
||||
return err
|
||||
@ -402,7 +401,6 @@ func (w *WakuNode) Peers() PeerStats {
|
||||
func (w *WakuNode) startKeepAlive(t time.Duration) {
|
||||
log.Info("Setting up ping protocol with duration of ", t)
|
||||
|
||||
w.ping = ping.NewPingService(w.host)
|
||||
ticker := time.NewTicker(t)
|
||||
|
||||
go func() {
|
||||
@ -417,7 +415,7 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
|
||||
// through Network's peer collection, as it will be empty
|
||||
for _, p := range w.host.Peerstore().Peers() {
|
||||
if p != w.host.ID() {
|
||||
go pingPeer(w.ctx, w.ping, p)
|
||||
go pingPeer(w.ctx, w.host, p)
|
||||
}
|
||||
}
|
||||
case <-w.quit:
|
||||
@ -428,12 +426,12 @@ func (w *WakuNode) startKeepAlive(t time.Duration) {
|
||||
}()
|
||||
}
|
||||
|
||||
func pingPeer(ctx context.Context, pingService *ping.PingService, peer peer.ID) {
|
||||
func pingPeer(ctx context.Context, host host.Host, peer peer.ID) {
|
||||
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
log.Debug("Pinging ", peer)
|
||||
pr := pingService.Ping(ctx, peer)
|
||||
pr := ping.Ping(ctx, host, peer)
|
||||
select {
|
||||
case res := <-pr:
|
||||
if res.Error != nil {
|
||||
|
||||
@ -11,7 +11,6 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/libp2p/go-msgio/protoio"
|
||||
"github.com/status-im/go-waku/waku/v2/metrics"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
@ -30,7 +29,6 @@ var (
|
||||
type (
|
||||
FilterSubscribeParameters struct {
|
||||
host host.Host
|
||||
ping *ping.PingService
|
||||
selectedPeer peer.ID
|
||||
}
|
||||
|
||||
@ -56,7 +54,6 @@ type (
|
||||
WakuFilter struct {
|
||||
ctx context.Context
|
||||
h host.Host
|
||||
ping *ping.PingService
|
||||
isFullNode bool
|
||||
MsgC chan *protocol.Envelope
|
||||
|
||||
@ -90,7 +87,7 @@ func WithAutomaticPeerSelection() FilterSubscribeOption {
|
||||
|
||||
func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.ping, string(FilterID_v20beta1))
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterID_v20beta1))
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
@ -151,7 +148,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
|
||||
}
|
||||
}
|
||||
|
||||
func NewWakuFilter(ctx context.Context, host host.Host, ping *ping.PingService, isFullNode bool) *WakuFilter {
|
||||
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter {
|
||||
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
@ -161,7 +158,6 @@ func NewWakuFilter(ctx context.Context, host host.Host, ping *ping.PingService,
|
||||
wf.ctx = ctx
|
||||
wf.MsgC = make(chan *protocol.Envelope)
|
||||
wf.h = host
|
||||
wf.ping = ping
|
||||
wf.isFullNode = isFullNode
|
||||
wf.filters = NewFilterMap()
|
||||
wf.subscribers = NewSubscribers()
|
||||
@ -245,7 +241,6 @@ func (wf *WakuFilter) FilterListener() {
|
||||
func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) {
|
||||
params := new(FilterSubscribeParameters)
|
||||
params.host = wf.h
|
||||
params.ping = wf.ping
|
||||
|
||||
optList := DefaultOptions()
|
||||
optList = append(optList, opts...)
|
||||
|
||||
@ -38,7 +38,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
|
||||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
filter := NewWakuFilter(context.Background(), host, nil, false)
|
||||
filter := NewWakuFilter(context.Background(), host, false)
|
||||
|
||||
return filter, host
|
||||
}
|
||||
@ -68,7 +68,7 @@ func TestWakuFilter(t *testing.T) {
|
||||
defer node2.Stop()
|
||||
defer sub2.Unsubscribe()
|
||||
|
||||
node2Filter := NewWakuFilter(ctx, host2, nil, true)
|
||||
node2Filter := NewWakuFilter(ctx, host2, true)
|
||||
broadcaster.Register(node2Filter.MsgC)
|
||||
|
||||
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
|
||||
|
||||
@ -10,7 +10,6 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/libp2p/go-msgio/protoio"
|
||||
"github.com/status-im/go-waku/waku/v2/metrics"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
@ -29,16 +28,14 @@ var (
|
||||
|
||||
type WakuLightPush struct {
|
||||
h host.Host
|
||||
ping *ping.PingService
|
||||
relay *relay.WakuRelay
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewWakuLightPush(ctx context.Context, h host.Host, ping *ping.PingService, relay *relay.WakuRelay) *WakuLightPush {
|
||||
func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay) *WakuLightPush {
|
||||
wakuLP := new(WakuLightPush)
|
||||
wakuLP.relay = relay
|
||||
wakuLP.ctx = ctx
|
||||
wakuLP.ping = ping
|
||||
wakuLP.h = h
|
||||
|
||||
return wakuLP
|
||||
@ -125,7 +122,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
|
||||
|
||||
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
|
||||
params := new(LightPushParameters)
|
||||
params.ping = wakuLP.ping
|
||||
params.host = wakuLP.h
|
||||
|
||||
optList := DefaultOptions(wakuLP.h)
|
||||
optList = append(optList, opts...)
|
||||
|
||||
@ -5,14 +5,13 @@ import (
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/host"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||
"github.com/status-im/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
type LightPushParameters struct {
|
||||
host host.Host
|
||||
selectedPeer peer.ID
|
||||
ping *ping.PingService
|
||||
requestId []byte
|
||||
}
|
||||
|
||||
@ -37,7 +36,7 @@ func WithAutomaticPeerSelection(host host.Host) LightPushOption {
|
||||
|
||||
func WithFastestPeerSelection(ctx context.Context) LightPushOption {
|
||||
return func(params *LightPushParameters) {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.ping, string(LightPushID_v20beta1))
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(LightPushID_v20beta1))
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
|
||||
@ -55,7 +55,7 @@ func TestWakuLightPush(t *testing.T) {
|
||||
defer sub2.Unsubscribe()
|
||||
|
||||
ctx := context.Background()
|
||||
lightPushNode2 := NewWakuLightPush(ctx, host2, nil, node2)
|
||||
lightPushNode2 := NewWakuLightPush(ctx, host2, node2)
|
||||
err := lightPushNode2.Start()
|
||||
require.NoError(t, err)
|
||||
defer lightPushNode2.Stop()
|
||||
@ -65,7 +65,7 @@ func TestWakuLightPush(t *testing.T) {
|
||||
|
||||
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
client := NewWakuLightPush(ctx, clientHost, nil, nil)
|
||||
client := NewWakuLightPush(ctx, clientHost, nil)
|
||||
|
||||
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
|
||||
err = host2.Peerstore().AddProtocols(host1.ID(), string(relay.WakuRelayID_v200))
|
||||
@ -121,7 +121,7 @@ func TestWakuLightPushStartWithoutRelay(t *testing.T) {
|
||||
|
||||
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
client := NewWakuLightPush(ctx, clientHost, nil, nil)
|
||||
client := NewWakuLightPush(ctx, clientHost, nil)
|
||||
err = client.Start()
|
||||
|
||||
require.Errorf(t, err, "relay is required")
|
||||
@ -135,7 +135,7 @@ func TestWakuLightPushNoPeers(t *testing.T) {
|
||||
|
||||
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
client := NewWakuLightPush(ctx, clientHost, nil, nil)
|
||||
client := NewWakuLightPush(ctx, clientHost, nil)
|
||||
|
||||
_, err = client.Publish(ctx, tests.CreateWakuMessage("test", float64(0)), &testTopic)
|
||||
require.Errorf(t, err, "no suitable remote peers")
|
||||
|
||||
@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) {
|
||||
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
|
||||
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||
s := NewWakuStore(nil, nil, 0, 0)
|
||||
s.storeMessage(msg1)
|
||||
s.storeMessage(msg3)
|
||||
s.storeMessage(msg5)
|
||||
@ -38,7 +38,7 @@ func TestResume(t *testing.T) {
|
||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0)
|
||||
s1 := NewWakuStore(host1, nil, 0, 0)
|
||||
s1.Start(ctx)
|
||||
defer s1.Stop()
|
||||
|
||||
@ -55,7 +55,7 @@ func TestResume(t *testing.T) {
|
||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0)
|
||||
s2 := NewWakuStore(host2, nil, 0, 0)
|
||||
s2.Start(ctx)
|
||||
defer s2.Stop()
|
||||
|
||||
@ -87,7 +87,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0)
|
||||
s1 := NewWakuStore(host1, nil, 0, 0)
|
||||
s1.Start(ctx)
|
||||
defer s1.Stop()
|
||||
|
||||
@ -98,7 +98,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0)
|
||||
s2 := NewWakuStore(host2, nil, 0, 0)
|
||||
s2.Start(ctx)
|
||||
defer s2.Stop()
|
||||
|
||||
@ -120,7 +120,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0)
|
||||
s1 := NewWakuStore(host1, nil, 0, 0)
|
||||
s1.Start(ctx)
|
||||
defer s1.Stop()
|
||||
|
||||
@ -131,7 +131,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0)
|
||||
s2 := NewWakuStore(host2, nil, 0, 0)
|
||||
s2.Start(ctx)
|
||||
defer s2.Stop()
|
||||
|
||||
|
||||
@ -15,7 +15,6 @@ import (
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
libp2pProtocol "github.com/libp2p/go-libp2p-core/protocol"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/libp2p/go-msgio/protoio"
|
||||
|
||||
"github.com/status-im/go-waku/waku/persistence"
|
||||
@ -234,15 +233,13 @@ type WakuStore struct {
|
||||
messageQueue *MessageQueue
|
||||
msgProvider MessageProvider
|
||||
h host.Host
|
||||
ping *ping.PingService
|
||||
}
|
||||
|
||||
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
|
||||
func NewWakuStore(host host.Host, p MessageProvider, ping *ping.PingService, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
|
||||
func NewWakuStore(host host.Host, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
|
||||
wakuStore := new(WakuStore)
|
||||
wakuStore.msgProvider = p
|
||||
wakuStore.h = host
|
||||
wakuStore.ping = ping
|
||||
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration)
|
||||
return wakuStore
|
||||
}
|
||||
@ -414,7 +411,6 @@ func findIndex(msgList []IndexedWakuMessage, index *pb.Index) int {
|
||||
}
|
||||
|
||||
type HistoryRequestParameters struct {
|
||||
ping *ping.PingService
|
||||
selectedPeer peer.ID
|
||||
requestId []byte
|
||||
cursor *pb.Index
|
||||
@ -448,7 +444,7 @@ func WithAutomaticPeerSelection() HistoryRequestOption {
|
||||
|
||||
func WithFastestPeerSelection(ctx context.Context) HistoryRequestOption {
|
||||
return func(params *HistoryRequestParameters) {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.ping, string(StoreID_v20beta3))
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.s.h, string(StoreID_v20beta3))
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
@ -545,7 +541,6 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
|
||||
|
||||
params := new(HistoryRequestParameters)
|
||||
params.s = store
|
||||
params.ping = store.ping
|
||||
|
||||
optList := DefaultOptions()
|
||||
optList = append(optList, opts...)
|
||||
|
||||
@ -24,7 +24,7 @@ func TestStorePersistence(t *testing.T) {
|
||||
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(nil, dbStore, nil, 0, 0)
|
||||
s1 := NewWakuStore(nil, dbStore, 0, 0)
|
||||
s1.fetchDBRecords(ctx)
|
||||
require.Len(t, s1.messageQueue.messages, 0)
|
||||
|
||||
@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) {
|
||||
|
||||
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
|
||||
|
||||
s2 := NewWakuStore(nil, dbStore, nil, 0, 0)
|
||||
s2 := NewWakuStore(nil, dbStore, 0, 0)
|
||||
s2.fetchDBRecords(ctx)
|
||||
require.Len(t, s2.messageQueue.messages, 1)
|
||||
require.Equal(t, msg, s2.messageQueue.messages[0].msg)
|
||||
|
||||
@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
|
||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0)
|
||||
s1 := NewWakuStore(host1, nil, 0, 0)
|
||||
s1.Start(ctx)
|
||||
defer s1.Stop()
|
||||
|
||||
@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
|
||||
// Simulate a message has been received via relay protocol
|
||||
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
|
||||
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0)
|
||||
s2 := NewWakuStore(host2, nil, 0, 0)
|
||||
s2.Start(ctx)
|
||||
defer s2.Stop()
|
||||
|
||||
@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
|
||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||
require.NoError(t, err)
|
||||
|
||||
s1 := NewWakuStore(host1, nil, nil, 0, 0)
|
||||
s1 := NewWakuStore(host1, nil, 0, 0)
|
||||
s1.Start(ctx)
|
||||
defer s1.Stop()
|
||||
|
||||
@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
|
||||
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
|
||||
require.NoError(t, err)
|
||||
|
||||
s2 := NewWakuStore(host2, nil, nil, 0, 0)
|
||||
s2 := NewWakuStore(host2, nil, 0, 0)
|
||||
s2.Start(ctx)
|
||||
defer s2.Stop()
|
||||
|
||||
|
||||
@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) {
|
||||
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
|
||||
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||
s := NewWakuStore(nil, nil, 0, 0)
|
||||
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
||||
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
||||
|
||||
@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
|
||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||
s := NewWakuStore(nil, nil, 0, 0)
|
||||
|
||||
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
||||
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
||||
@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
|
||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||
s := NewWakuStore(nil, nil, 0, 0)
|
||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
||||
@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
|
||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||
s := NewWakuStore(nil, nil, 0, 0)
|
||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
|
||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
||||
@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
|
||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||
s := NewWakuStore(nil, nil, 0, 0)
|
||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
|
||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
|
||||
@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
|
||||
topic1 := "1"
|
||||
pubsubTopic1 := "topic1"
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||
s := NewWakuStore(nil, nil, 0, 0)
|
||||
for i := 0; i < 10; i++ {
|
||||
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
|
||||
msg.Payload = []byte{byte(i)}
|
||||
@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
|
||||
topic1 := "1"
|
||||
pubsubTopic1 := "topic1"
|
||||
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||
s := NewWakuStore(nil, nil, 0, 0)
|
||||
for i := 0; i < 10; i++ {
|
||||
msg := &pb.WakuMessage{
|
||||
Payload: []byte{byte(i)},
|
||||
@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTemporalHistoryQueries(t *testing.T) {
|
||||
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||
s := NewWakuStore(nil, nil, 0, 0)
|
||||
|
||||
var messages []*pb.WakuMessage
|
||||
for i := 0; i < 10; i++ {
|
||||
|
||||
@ -52,14 +52,10 @@ type pingResult struct {
|
||||
rtt time.Duration
|
||||
}
|
||||
|
||||
func SelectPeerWithLowestRTT(ctx context.Context, pingService *ping.PingService, protocolId string) (*peer.ID, error) {
|
||||
if pingService == nil {
|
||||
return nil, PingServiceNotAvailable
|
||||
}
|
||||
|
||||
func SelectPeerWithLowestRTT(ctx context.Context, host host.Host, protocolId string) (*peer.ID, error) {
|
||||
var peers peer.IDSlice
|
||||
for _, peer := range pingService.Host.Peerstore().Peers() {
|
||||
protocols, err := pingService.Host.Peerstore().SupportsProtocols(peer, protocolId)
|
||||
for _, peer := range host.Peerstore().Peers() {
|
||||
protocols, err := host.Peerstore().SupportsProtocols(peer, protocolId)
|
||||
if err != nil {
|
||||
log.Error("error obtaining the protocols supported by peers", err)
|
||||
return nil, err
|
||||
@ -81,7 +77,7 @@ func SelectPeerWithLowestRTT(ctx context.Context, pingService *ping.PingService,
|
||||
defer wg.Done()
|
||||
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
||||
defer cancel()
|
||||
result := <-pingService.Ping(ctx, p)
|
||||
result := <-ping.Ping(ctx, host, p)
|
||||
if result.Error == nil {
|
||||
pingCh <- pingResult{
|
||||
p: p,
|
||||
|
||||
@ -8,7 +8,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
|
||||
"github.com/status-im/go-waku/tests"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -30,21 +29,19 @@ func TestSelectPeer(t *testing.T) {
|
||||
defer h3.Close()
|
||||
|
||||
proto := "test/protocol"
|
||||
pingService := ping.NewPingService(h1)
|
||||
|
||||
h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
|
||||
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
|
||||
|
||||
// No peers with selected protocol
|
||||
_, err = SelectPeer(h1, proto)
|
||||
fmt.Println(err)
|
||||
require.Error(t, ErrNoPeersAvailable, err)
|
||||
|
||||
// Peers with selected protocol
|
||||
_ = h1.Peerstore().AddProtocols(h2.ID(), proto)
|
||||
_ = h1.Peerstore().AddProtocols(h3.ID(), proto)
|
||||
|
||||
_, err = SelectPeerWithLowestRTT(ctx, pingService, proto)
|
||||
_, err = SelectPeerWithLowestRTT(ctx, h1, proto)
|
||||
require.NoError(t, err)
|
||||
|
||||
}
|
||||
@ -68,13 +65,12 @@ func TestSelectPeerWithLowestRTT(t *testing.T) {
|
||||
defer h3.Close()
|
||||
|
||||
proto := "test/protocol"
|
||||
pingService := ping.NewPingService(h1)
|
||||
|
||||
h1.Peerstore().AddAddrs(h2.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
|
||||
h1.Peerstore().AddAddrs(h3.ID(), h2.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
|
||||
|
||||
// No peers with selected protocol
|
||||
_, err = SelectPeerWithLowestRTT(ctx, pingService, proto)
|
||||
_, err = SelectPeerWithLowestRTT(ctx, h1, proto)
|
||||
fmt.Println(err)
|
||||
require.Error(t, ErrNoPeersAvailable, err)
|
||||
|
||||
@ -82,6 +78,6 @@ func TestSelectPeerWithLowestRTT(t *testing.T) {
|
||||
_ = h1.Peerstore().AddProtocols(h2.ID(), proto)
|
||||
_ = h1.Peerstore().AddProtocols(h3.ID(), proto)
|
||||
|
||||
_, err = SelectPeerWithLowestRTT(ctx, pingService, proto)
|
||||
_, err = SelectPeerWithLowestRTT(ctx, h1, proto)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user