From 68c0ee2598b260a002d3b2fab6628e7814dd719b Mon Sep 17 00:00:00 2001 From: Roman Zajic Date: Fri, 5 Jan 2024 20:14:17 +0800 Subject: [PATCH] chore: store tests coverage improvement (#993) --- .../protocol/store/waku_store_client_test.go | 121 ++++++++++++++++++ .../store/waku_store_protocol_test.go | 59 +++++++++ .../protocol/store/waku_store_query_test.go | 45 +++++++ 3 files changed, 225 insertions(+) create mode 100644 waku/v2/protocol/store/waku_store_client_test.go diff --git a/waku/v2/protocol/store/waku_store_client_test.go b/waku/v2/protocol/store/waku_store_client_test.go new file mode 100644 index 00000000..e92056b5 --- /dev/null +++ b/waku/v2/protocol/store/waku_store_client_test.go @@ -0,0 +1,121 @@ +package store + +import ( + "context" + "crypto/rand" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" + "github.com/waku-org/go-waku/waku/v2/utils" + "google.golang.org/protobuf/proto" + "testing" +) + +func TestQueryOptions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + pubSubTopic := "/waku/2/go/store/test" + contentTopic := "/test/2/my-app/proto" + + // Init hosts with unique ports + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + host, err := tests.MakeHost(ctx, port, rand.Reader) + require.NoError(t, err) + + port2, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + host2, err := tests.MakeHost(ctx, port2, rand.Reader) + require.NoError(t, err) + + // Let peer manager reside at host + pm := peermanager.NewPeerManager(5, 5, utils.Logger()) + pm.SetHost(host) + + // Add host2 to peerstore + host.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err = host.Peerstore().AddProtocols(host2.ID(), StoreID_v20beta4) + require.NoError(t, err) + + // Create message and subscription + msg := tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch()) + sub := SimulateSubscription([]*protocol.Envelope{ + protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), pubSubTopic), + }) + + // Create store and save our msg into it + s := NewWakuStore(MemoryDB(t), pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + s.SetHost(host) + _ = s.storeMessage(protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), pubSubTopic)) + + err = s.Start(ctx, sub) + require.NoError(t, err) + defer s.Stop() + + // Create store2 and save our msg into it + s2 := NewWakuStore(MemoryDB(t), pm, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + s2.SetHost(host2) + _ = s2.storeMessage(protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), pubSubTopic)) + + sub2 := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic)) + + err = s2.Start(ctx, sub2) + require.NoError(t, err) + defer s2.Stop() + + q := Query{ + PubsubTopic: pubSubTopic, + ContentTopics: []string{contentTopic}, + StartTime: nil, + EndTime: nil, + } + + // Test no peers available + _, err = s2.Query(ctx, q) + require.Error(t, err) + + // Test peerId and peerAddr options are mutually exclusive + _, err = s.Query(ctx, q, WithPeer(host2.ID()), WithPeerAddr(tests.GetHostAddress(host2))) + require.Error(t, err) + + // Test peerAddr and peerId options are mutually exclusive + _, err = s.Query(ctx, q, WithPeerAddr(tests.GetHostAddress(host2)), WithPeer(host2.ID())) + require.Error(t, err) + + // Test WithRequestID + result, err := s.Query(ctx, q, WithPeer(host2.ID()), WithRequestID([]byte("requestID"))) + require.NoError(t, err) + require.True(t, proto.Equal(msg, result.Messages[0])) + + // Save cursor to use it in query with cursor option + c := result.Cursor() + + // Test WithCursor + result, err = s.Query(ctx, q, WithPeer(host2.ID()), WithCursor(c)) + require.NoError(t, err) + require.True(t, proto.Equal(msg, result.Messages[0])) + + // Test WithFastestPeerSelection + _, err = s.Query(ctx, q, WithFastestPeerSelection()) + require.NoError(t, err) + require.True(t, proto.Equal(msg, result.Messages[0])) + + emptyPubSubTopicQuery := Query{ + PubsubTopic: "", + ContentTopics: []string{contentTopic}, + StartTime: nil, + EndTime: nil, + } + + // Test empty PubSubTopic provided in Query + result, err = s.Query(ctx, emptyPubSubTopicQuery, WithPeer(host2.ID())) + require.NoError(t, err) + require.True(t, proto.Equal(msg, result.Messages[0])) + +} diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 5b67ed72..10ada3de 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -2,6 +2,9 @@ package store import ( "context" + "database/sql" + "github.com/waku-org/go-waku/waku/persistence" + "github.com/waku-org/go-waku/waku/persistence/sqlite" "testing" "time" @@ -366,3 +369,59 @@ func TestWakuStoreProtocolFind(t *testing.T) { require.NoError(t, err) require.Nil(t, foundMsg) } + +func TestWakuStoreStart(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + messageProvider := MemoryDB(t) + + s := NewWakuStore(messageProvider, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + s.SetHost(host) + + pubSubTopic := "/waku/2/go/store/test" + contentTopic := "/test/2/my-app" + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: contentTopic, + Timestamp: utils.GetUnixEpoch(), + } + + // Simulate a message has been received via relay protocol + sub := SimulateSubscription([]*protocol.Envelope{protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), pubSubTopic)}) + + // Store has nil message provider -> Start should return nil/no error + s.msgProvider = nil + err = s.Start(ctx, sub) + require.NoError(t, err) + s.Stop() + + // Start again already started store -> Start should return nil/no error + s.msgProvider = messageProvider + err = s.Start(ctx, sub) + require.NoError(t, err) + err = s.Start(ctx, sub) + require.NoError(t, err) + defer s.Stop() + + // Store Start cannot start its message provider -> return error + var brokenDB *sql.DB + brokenDB, err = sqlite.NewDB("sqlite:///no.db", utils.Logger()) + require.NoError(t, err) + + dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(brokenDB), + persistence.WithRetentionPolicy(10, 0)) + require.NoError(t, err) + + s2 := NewWakuStore(dbStore, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + s2.SetHost(host) + + err = s2.Start(ctx, sub) + require.Error(t, err) + defer s2.Stop() + +} diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index dce36dd1..2d775953 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -248,3 +248,48 @@ func TestTemporalHistoryQueries(t *testing.T) { require.Len(t, response.Messages, 0) } + +func TestSetMessageProvider(t *testing.T) { + pubSubTopic := "/waku/2/go/store/test" + contentTopic := "/test/2/my-app" + contentTopic2 := "/test/2/my-app2" + + msg := tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage(contentTopic2, utils.GetUnixEpoch()) + + msgProvider := MemoryDB(t) + msgProvider2 := MemoryDB(t) + + s := NewWakuStore(msgProvider, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + _ = s.storeMessage(protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), pubSubTopic)) + + s2 := NewWakuStore(msgProvider2, nil, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) + _ = s2.storeMessage(protocol.NewEnvelope(msg2, *utils.GetUnixEpoch(), pubSubTopic)) + + // Swap providers -> messages should follow regardless of the store object values + s.SetMessageProvider(msgProvider2) + s2.SetMessageProvider(msgProvider) + + response := s.FindMessages(&pb.HistoryQuery{ + ContentFilters: []*pb.ContentFilter{ + { + ContentTopic: contentTopic2, + }, + }, + }) + + require.Len(t, response.Messages, 1) + require.True(t, proto.Equal(msg2, response.Messages[0])) + + response2 := s2.FindMessages(&pb.HistoryQuery{ + ContentFilters: []*pb.ContentFilter{ + { + ContentTopic: contentTopic, + }, + }, + }) + + require.Len(t, response2.Messages, 1) + require.True(t, proto.Equal(msg, response2.Messages[0])) + +}