From 8ba64affba7d77310b4fe6d23fec5efa00469ef2 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 28 Oct 2021 09:03:23 -0400 Subject: [PATCH] test: resume and time based queries (#108) --- tests/utils.go | 5 + .../protocol/lightpush/waku_lightpush_test.go | 2 +- waku/v2/protocol/store/waku_resume_test.go | 146 ++++++++++++++ waku/v2/protocol/store/waku_store.go | 3 +- .../store/waku_store_persistence_test.go | 3 + .../protocol/store/waku_store_query_test.go | 182 +++++++----------- 6 files changed, 230 insertions(+), 111 deletions(-) create mode 100644 waku/v2/protocol/store/waku_resume_test.go diff --git a/tests/utils.go b/tests/utils.go index 481a3beb..2bdaf2ad 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p-core/host" "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/waku/v2/protocol/pb" ) func GetHostAddress(ha host.Host) ma.Multiaddr { @@ -67,3 +68,7 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e libp2p.Identity(prvKey), ) } + +func CreateWakuMessage(contentTopic string, timestamp float64) *pb.WakuMessage { + return &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp} +} diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index afda245c..d7da5894 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -85,7 +85,7 @@ func TestWakuLightPush(t *testing.T) { req.PubsubTopic = string(testTopic) // Wait for the mesh connection to happen between node1 and node2 - time.Sleep(5 * time.Second) + time.Sleep(2 * time.Second) var wg sync.WaitGroup wg.Add(1) diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go new file mode 100644 index 00000000..045383b3 --- /dev/null +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -0,0 +1,146 @@ +package store + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/status-im/go-waku/tests" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/stretchr/testify/require" +) + +func TestFindLastSeenMessage(t *testing.T) { + msg1 := tests.CreateWakuMessage("1", 1) + msg2 := tests.CreateWakuMessage("2", 2) + msg3 := tests.CreateWakuMessage("3", 3) + msg4 := tests.CreateWakuMessage("4", 4) + msg5 := tests.CreateWakuMessage("5", 5) + + s := NewWakuStore(true, nil) + s.storeMessage("test", msg1) + s.storeMessage("test", msg3) + s.storeMessage("test", msg5) + s.storeMessage("test", msg2) + s.storeMessage("test", msg4) + + require.Equal(t, msg5.Timestamp, s.findLastSeen()) +} + +func TestResume(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s1 := NewWakuStore(true, nil) + s1.Start(ctx, host1) + defer s1.Stop() + + for i := 0; i < 10; i++ { + var contentTopic = "1" + if i%2 == 0 { + contentTopic = "2" + } + + msg := tests.CreateWakuMessage(contentTopic, float64(time.Duration(i)*time.Second)) + s1.storeMessage("test", msg) + } + + host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s2 := NewWakuStore(false, nil) + s2.Start(ctx, host2) + defer s2.Stop() + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) + require.NoError(t, err) + + msgCount, err := s2.Resume(ctx, "test", []peer.ID{host1.ID()}) + + require.NoError(t, err) + require.Equal(t, 10, msgCount) + require.Len(t, s2.messages, 10) + + // Test duplication + msgCount, err = s2.Resume(ctx, "test", []peer.ID{host1.ID()}) + + require.NoError(t, err) + require.Equal(t, 0, msgCount) +} + +func TestResumeWithListOfPeers(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Host that does not support store protocol + invalidHost, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s1 := NewWakuStore(true, nil) + s1.Start(ctx, host1) + defer s1.Stop() + + msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)} + + s1.storeMessage("test", msg0) + + host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s2 := NewWakuStore(false, nil) + s2.Start(ctx, host2) + defer s2.Stop() + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) + require.NoError(t, err) + + msgCount, err := s2.Resume(ctx, "test", []peer.ID{invalidHost.ID(), host1.ID()}) + + require.NoError(t, err) + require.Equal(t, 1, msgCount) + require.Len(t, s2.messages, 1) +} + +func TestResumeWithoutSpecifyingPeer(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s1 := NewWakuStore(true, nil) + s1.Start(ctx, host1) + defer s1.Stop() + + msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)} + + s1.storeMessage("test", msg0) + + host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s2 := NewWakuStore(false, nil) + s2.Start(ctx, host2) + defer s2.Stop() + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) + require.NoError(t, err) + + msgCount, err := s2.Resume(ctx, "test", []peer.ID{}) + + require.NoError(t, err) + require.Equal(t, 1, msgCount) + require.Len(t, s2.messages, 1) +} diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 978babe0..62c77d9b 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -552,9 +552,10 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c // returns the number of retrieved messages, or error if all the requests fail for _, peer := range candidateList { result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId()) - if err != nil { + if err == nil { return result, nil } + log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err)) } return nil, ErrFailedQuery diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index 791ea681..b7267ae9 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -42,4 +42,7 @@ func TestStorePersistence(t *testing.T) { s2.fetchDBRecords(ctx) require.Len(t, s2.messages, 1) require.Equal(t, msg, s2.messages[0].msg) + + // Storing a duplicated message should not crash. It's okay to generate an error log in this case + s1.storeMessage(defaultPubSubTopic, msg) } diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index 56f3b0da..f5b00568 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -3,6 +3,7 @@ package store import ( "testing" + "github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" @@ -12,22 +13,11 @@ func TestStoreQuery(t *testing.T) { defaultPubSubTopic := "test" defaultContentTopic := "1" - msg := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: defaultContentTopic, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } - - msg2 := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: "2", - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } + msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) s := NewWakuStore(true, nil) - s.storeMessage(defaultPubSubTopic, msg) + s.storeMessage(defaultPubSubTopic, msg1) s.storeMessage(defaultPubSubTopic, msg2) response := s.FindMessages(&pb.HistoryQuery{ @@ -39,7 +29,7 @@ func TestStoreQuery(t *testing.T) { }) require.Len(t, response.Messages, 1) - require.Equal(t, msg, response.Messages[0]) + require.Equal(t, msg1, response.Messages[0]) } func TestStoreQueryMultipleContentFilters(t *testing.T) { @@ -48,29 +38,13 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { topic2 := "2" topic3 := "3" - msg := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic1, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } - - msg2 := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic2, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } - - msg3 := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic3, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } + msg1 := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) + msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) s := NewWakuStore(true, nil) - s.storeMessage(defaultPubSubTopic, msg) + + s.storeMessage(defaultPubSubTopic, msg1) s.storeMessage(defaultPubSubTopic, msg2) s.storeMessage(defaultPubSubTopic, msg3) @@ -86,7 +60,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { }) require.Len(t, response.Messages, 2) - require.Contains(t, response.Messages, msg) + require.Contains(t, response.Messages, msg1) require.Contains(t, response.Messages, msg3) require.NotContains(t, response.Messages, msg2) } @@ -98,29 +72,12 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { pubsubTopic1 := "topic1" pubsubTopic2 := "topic2" - msg := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic1, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } - - msg2 := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic2, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } - - msg3 := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic3, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } + msg1 := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) + msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) s := NewWakuStore(true, nil) - s.storeMessage(pubsubTopic1, msg) + s.storeMessage(pubsubTopic1, msg1) s.storeMessage(pubsubTopic2, msg2) s.storeMessage(pubsubTopic2, msg3) @@ -137,7 +94,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { }) require.Len(t, response.Messages, 1) - require.Equal(t, msg, response.Messages[0]) + require.Equal(t, msg1, response.Messages[0]) } func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { @@ -147,29 +104,12 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { pubsubTopic1 := "topic1" pubsubTopic2 := "topic2" - msg := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic1, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } - - msg2 := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic2, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } - - msg3 := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic3, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } + msg1 := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) + msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) s := NewWakuStore(true, nil) - s.storeMessage(pubsubTopic2, msg) + s.storeMessage(pubsubTopic2, msg1) s.storeMessage(pubsubTopic2, msg2) s.storeMessage(pubsubTopic2, msg3) @@ -186,29 +126,12 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { topic3 := "3" pubsubTopic1 := "topic1" - msg := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic1, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } - - msg2 := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic2, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } - - msg3 := &pb.WakuMessage{ - Payload: []byte{1, 2, 3}, - ContentTopic: topic3, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } + msg1 := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) + msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) + msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) s := NewWakuStore(true, nil) - s.storeMessage(pubsubTopic1, msg) + s.storeMessage(pubsubTopic1, msg1) s.storeMessage(pubsubTopic1, msg2) s.storeMessage(pubsubTopic1, msg3) @@ -217,7 +140,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { }) require.Len(t, response.Messages, 3) - require.Contains(t, response.Messages, msg) + require.Contains(t, response.Messages, msg1) require.Contains(t, response.Messages, msg2) require.Contains(t, response.Messages, msg3) } @@ -228,14 +151,9 @@ func TestStoreQueryForwardPagination(t *testing.T) { s := NewWakuStore(true, nil) for i := 0; i < 10; i++ { - msg := &pb.WakuMessage{ - Payload: []byte{byte(i)}, - ContentTopic: topic1, - Version: 0, - Timestamp: utils.GetUnixEpoch(), - } + msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) + msg.Payload = []byte{byte(i)} s.storeMessage(pubsubTopic1, msg) - } response := s.FindMessages(&pb.HistoryQuery{ @@ -279,3 +197,49 @@ func TestStoreQueryBackwardPagination(t *testing.T) { require.Equal(t, byte(i), response.Messages[i].Payload[0]) } } + +func TestTemporalHistoryQueries(t *testing.T) { + s := NewWakuStore(true, nil) + + var messages []*pb.WakuMessage + for i := 0; i < 10; i++ { + contentTopic := "1" + if i%2 == 0 { + contentTopic = "2" + } + msg := tests.CreateWakuMessage(contentTopic, float64(i)) + s.storeMessage("test", msg) + messages = append(messages, msg) + } + + // handle temporal history query with a valid time window + response := s.FindMessages(&pb.HistoryQuery{ + ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}}, + StartTime: float64(2), + EndTime: float64(5), + }) + + require.Len(t, response.Messages, 2) + require.Equal(t, messages[3].Timestamp, response.Messages[0].Timestamp) + require.Equal(t, messages[5].Timestamp, response.Messages[1].Timestamp) + + // handle temporal history query with a zero-size time window + response = s.FindMessages(&pb.HistoryQuery{ + ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}}, + StartTime: float64(2), + EndTime: float64(2), + }) + + require.Len(t, response.Messages, 0) + + // handle temporal history query with an invalid time window + response = s.FindMessages(&pb.HistoryQuery{ + ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}}, + StartTime: float64(5), + EndTime: float64(2), + }) + // time window is invalid since start time > end time + // perhaps it should return an error? + + require.Len(t, response.Messages, 0) +}