mirror of https://github.com/status-im/go-waku.git
chore: store tests coverage improvement (#993)
This commit is contained in:
parent
ff68934354
commit
68c0ee2598
|
@ -0,0 +1,121 @@
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/waku-org/go-waku/tests"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestQueryOptions(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
pubSubTopic := "/waku/2/go/store/test"
|
||||||
|
contentTopic := "/test/2/my-app/proto"
|
||||||
|
|
||||||
|
// Init hosts with unique ports
|
||||||
|
port, err := tests.FindFreePort(t, "", 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
host, err := tests.MakeHost(ctx, port, rand.Reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
port2, err := tests.FindFreePort(t, "", 5)
|
||||||
|
require.NoError(t, err)
|
||||||
|
host2, err := tests.MakeHost(ctx, port2, rand.Reader)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Let peer manager reside at host
|
||||||
|
pm := peermanager.NewPeerManager(5, 5, utils.Logger())
|
||||||
|
pm.SetHost(host)
|
||||||
|
|
||||||
|
// Add host2 to peerstore
|
||||||
|
host.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
|
||||||
|
err = host.Peerstore().AddProtocols(host2.ID(), StoreID_v20beta4)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Create message and subscription
|
||||||
|
msg := tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch())
|
||||||
|
sub := SimulateSubscription([]*protocol.Envelope{
|
||||||
|
protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), pubSubTopic),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create store and save our msg into it
|
||||||
|
s := NewWakuStore(MemoryDB(t), pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||||
|
s.SetHost(host)
|
||||||
|
_ = s.storeMessage(protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), pubSubTopic))
|
||||||
|
|
||||||
|
err = s.Start(ctx, sub)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
// Create store2 and save our msg into it
|
||||||
|
s2 := NewWakuStore(MemoryDB(t), pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||||
|
s2.SetHost(host2)
|
||||||
|
_ = s2.storeMessage(protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), pubSubTopic))
|
||||||
|
|
||||||
|
sub2 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
|
||||||
|
|
||||||
|
err = s2.Start(ctx, sub2)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer s2.Stop()
|
||||||
|
|
||||||
|
q := Query{
|
||||||
|
PubsubTopic: pubSubTopic,
|
||||||
|
ContentTopics: []string{contentTopic},
|
||||||
|
StartTime: nil,
|
||||||
|
EndTime: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test no peers available
|
||||||
|
_, err = s2.Query(ctx, q)
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
// Test peerId and peerAddr options are mutually exclusive
|
||||||
|
_, err = s.Query(ctx, q, WithPeer(host2.ID()), WithPeerAddr(tests.GetHostAddress(host2)))
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
// Test peerAddr and peerId options are mutually exclusive
|
||||||
|
_, err = s.Query(ctx, q, WithPeerAddr(tests.GetHostAddress(host2)), WithPeer(host2.ID()))
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
// Test WithRequestID
|
||||||
|
result, err := s.Query(ctx, q, WithPeer(host2.ID()), WithRequestID([]byte("requestID")))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, proto.Equal(msg, result.Messages[0]))
|
||||||
|
|
||||||
|
// Save cursor to use it in query with cursor option
|
||||||
|
c := result.Cursor()
|
||||||
|
|
||||||
|
// Test WithCursor
|
||||||
|
result, err = s.Query(ctx, q, WithPeer(host2.ID()), WithCursor(c))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, proto.Equal(msg, result.Messages[0]))
|
||||||
|
|
||||||
|
// Test WithFastestPeerSelection
|
||||||
|
_, err = s.Query(ctx, q, WithFastestPeerSelection())
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, proto.Equal(msg, result.Messages[0]))
|
||||||
|
|
||||||
|
emptyPubSubTopicQuery := Query{
|
||||||
|
PubsubTopic: "",
|
||||||
|
ContentTopics: []string{contentTopic},
|
||||||
|
StartTime: nil,
|
||||||
|
EndTime: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test empty PubSubTopic provided in Query
|
||||||
|
result, err = s.Query(ctx, emptyPubSubTopicQuery, WithPeer(host2.ID()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, proto.Equal(msg, result.Messages[0]))
|
||||||
|
|
||||||
|
}
|
|
@ -2,6 +2,9 @@ package store
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"github.com/waku-org/go-waku/waku/persistence"
|
||||||
|
"github.com/waku-org/go-waku/waku/persistence/sqlite"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -366,3 +369,59 @@ func TestWakuStoreProtocolFind(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Nil(t, foundMsg)
|
require.Nil(t, foundMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWakuStoreStart(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
host, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
messageProvider := MemoryDB(t)
|
||||||
|
|
||||||
|
s := NewWakuStore(messageProvider, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||||
|
s.SetHost(host)
|
||||||
|
|
||||||
|
pubSubTopic := "/waku/2/go/store/test"
|
||||||
|
contentTopic := "/test/2/my-app"
|
||||||
|
|
||||||
|
msg := &pb.WakuMessage{
|
||||||
|
Payload: []byte{1, 2, 3},
|
||||||
|
ContentTopic: contentTopic,
|
||||||
|
Timestamp: utils.GetUnixEpoch(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Simulate a message has been received via relay protocol
|
||||||
|
sub := SimulateSubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), pubSubTopic)})
|
||||||
|
|
||||||
|
// Store has nil message provider -> Start should return nil/no error
|
||||||
|
s.msgProvider = nil
|
||||||
|
err = s.Start(ctx, sub)
|
||||||
|
require.NoError(t, err)
|
||||||
|
s.Stop()
|
||||||
|
|
||||||
|
// Start again already started store -> Start should return nil/no error
|
||||||
|
s.msgProvider = messageProvider
|
||||||
|
err = s.Start(ctx, sub)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = s.Start(ctx, sub)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer s.Stop()
|
||||||
|
|
||||||
|
// Store Start cannot start its message provider -> return error
|
||||||
|
var brokenDB *sql.DB
|
||||||
|
brokenDB, err = sqlite.NewDB("sqlite:///no.db", utils.Logger())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(brokenDB),
|
||||||
|
persistence.WithRetentionPolicy(10, 0))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
s2 := NewWakuStore(dbStore, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||||
|
s2.SetHost(host)
|
||||||
|
|
||||||
|
err = s2.Start(ctx, sub)
|
||||||
|
require.Error(t, err)
|
||||||
|
defer s2.Stop()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
@ -248,3 +248,48 @@ func TestTemporalHistoryQueries(t *testing.T) {
|
||||||
|
|
||||||
require.Len(t, response.Messages, 0)
|
require.Len(t, response.Messages, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSetMessageProvider(t *testing.T) {
|
||||||
|
pubSubTopic := "/waku/2/go/store/test"
|
||||||
|
contentTopic := "/test/2/my-app"
|
||||||
|
contentTopic2 := "/test/2/my-app2"
|
||||||
|
|
||||||
|
msg := tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch())
|
||||||
|
msg2 := tests.CreateWakuMessage(contentTopic2, utils.GetUnixEpoch())
|
||||||
|
|
||||||
|
msgProvider := MemoryDB(t)
|
||||||
|
msgProvider2 := MemoryDB(t)
|
||||||
|
|
||||||
|
s := NewWakuStore(msgProvider, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||||
|
_ = s.storeMessage(protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), pubSubTopic))
|
||||||
|
|
||||||
|
s2 := NewWakuStore(msgProvider2, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||||
|
_ = s2.storeMessage(protocol.NewEnvelope(msg2, *utils.GetUnixEpoch(), pubSubTopic))
|
||||||
|
|
||||||
|
// Swap providers -> messages should follow regardless of the store object values
|
||||||
|
s.SetMessageProvider(msgProvider2)
|
||||||
|
s2.SetMessageProvider(msgProvider)
|
||||||
|
|
||||||
|
response := s.FindMessages(&pb.HistoryQuery{
|
||||||
|
ContentFilters: []*pb.ContentFilter{
|
||||||
|
{
|
||||||
|
ContentTopic: contentTopic2,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Len(t, response.Messages, 1)
|
||||||
|
require.True(t, proto.Equal(msg2, response.Messages[0]))
|
||||||
|
|
||||||
|
response2 := s2.FindMessages(&pb.HistoryQuery{
|
||||||
|
ContentFilters: []*pb.ContentFilter{
|
||||||
|
{
|
||||||
|
ContentTopic: contentTopic,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
require.Len(t, response2.Messages, 1)
|
||||||
|
require.True(t, proto.Equal(msg, response2.Messages[0]))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue