diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 77446e0c..8022d9d2 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -5,15 +5,27 @@ import ( "log" "github.com/status-im/go-waku/waku/v2/protocol/pb" - "github.com/status-im/go-waku/waku/v2/protocol/store" ) +type MessageProvider interface { + GetAll() ([]StoredMessage, error) + Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error + Stop() +} + // DBStore is a MessageProvider that has a *sql.DB connection type DBStore struct { - store.MessageProvider + MessageProvider db *sql.DB } +type StoredMessage struct { + ID []byte + PubsubTopic string + ReceiverTime float64 + Message *pb.WakuMessage +} + // DBOption is an optional setting that can be used to configure the DBStore type DBOption func(*DBStore) error @@ -92,13 +104,13 @@ func (d *DBStore) Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMess } // Returns all the stored WakuMessages -func (d *DBStore) GetAll() ([]store.StoredMessage, error) { +func (d *DBStore) GetAll() ([]StoredMessage, error) { rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC") if err != nil { return nil, err } - var result []store.StoredMessage + var result []StoredMessage defer rows.Close() @@ -122,7 +134,7 @@ func (d *DBStore) GetAll() ([]store.StoredMessage, error) { msg.Timestamp = senderTimestamp msg.Version = version - record := store.StoredMessage{ + record := StoredMessage{ ID: id, PubsubTopic: pubsubTopic, ReceiverTime: receiverTimestamp, diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 2f428ead..978babe0 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -20,6 +20,7 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/tag" + "github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" @@ -51,6 +52,10 @@ func minOf(vars ...int) int { } func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *pb.PagingInfo) { + if pinfo == nil { + pinfo = new(pb.PagingInfo) + } + // takes list, and performs paging based on pinfo // returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request cursor := pinfo.Cursor @@ -179,15 +184,8 @@ func (w *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse { return result } -type StoredMessage struct { - ID []byte - PubsubTopic string - ReceiverTime float64 - Message *pb.WakuMessage -} - type MessageProvider interface { - GetAll() ([]StoredMessage, error) + GetAll() ([]persistence.StoredMessage, error) Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error Stop() } @@ -243,6 +241,12 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host) { return } + store.fetchDBRecords(ctx) + + log.Info("Store protocol started") +} + +func (store *WakuStore) fetchDBRecords(ctx context.Context) { storedMessages, err := store.msgProvider.GetAll() if err != nil { log.Error("could not load DBProvider messages", err) @@ -265,8 +269,6 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host) { log.Error("failed to record with tags") } } - - log.Info("Store protocol started") } func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) { @@ -526,6 +528,10 @@ func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...H return nil, ErrInvalidId } + if q.PagingInfo == nil { + q.PagingInfo = &pb.PagingInfo{} + } + if params.cursor != nil { q.PagingInfo.Cursor = params.cursor } diff --git a/waku/v2/protocol/store/waku_store_pagination_test.go b/waku/v2/protocol/store/waku_store_pagination_test.go new file mode 100644 index 00000000..895042dc --- /dev/null +++ b/waku/v2/protocol/store/waku_store_pagination_test.go @@ -0,0 +1,278 @@ +package store + +import ( + "sort" + "testing" + + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" + "github.com/stretchr/testify/require" +) + +func TestIndexComputation(t *testing.T) { + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + Timestamp: utils.GetUnixEpoch(), + } + + idx, err := computeIndex(msg) + require.NoError(t, err) + require.NotZero(t, idx.ReceiverTime) + require.Equal(t, msg.Timestamp, idx.SenderTime) + require.NotZero(t, idx.Digest) + require.Len(t, idx.Digest, 32) + + msg1 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + Timestamp: 123, + ContentTopic: "/waku/2/default-content/proto", + } + idx1, err := computeIndex(msg1) + require.NoError(t, err) + + msg2 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + Timestamp: 123, + ContentTopic: "/waku/2/default-content/proto", + } + idx2, err := computeIndex(msg2) + require.NoError(t, err) + + require.Equal(t, idx1.Digest, idx2.Digest) +} + +func TestIndexComparison(t *testing.T) { + + index1 := &pb.Index{ + ReceiverTime: 2, + SenderTime: 1, + Digest: []byte{1}, + } + + index2 := &pb.Index{ + ReceiverTime: 2, + SenderTime: 1, + Digest: []byte{2}, + } + + index3 := &pb.Index{ + ReceiverTime: 1, + SenderTime: 2, + Digest: []byte{3}, + } + + iwm1 := IndexedWakuMessage{index: index1} + iwm2 := IndexedWakuMessage{index: index2} + iwm3 := IndexedWakuMessage{index: index3} + + require.Equal(t, 0, indexComparison(index1, index1)) + require.Equal(t, -1, indexComparison(index1, index2)) + require.Equal(t, 1, indexComparison(index2, index1)) + require.Equal(t, -1, indexComparison(index1, index3)) + require.Equal(t, 1, indexComparison(index3, index1)) + + require.Equal(t, 0, indexedWakuMessageComparison(iwm1, iwm1)) + require.Equal(t, -1, indexedWakuMessageComparison(iwm1, iwm2)) + require.Equal(t, 1, indexedWakuMessageComparison(iwm2, iwm1)) + require.Equal(t, -1, indexedWakuMessageComparison(iwm1, iwm3)) + require.Equal(t, 1, indexedWakuMessageComparison(iwm3, iwm1)) + + sortingList := []IndexedWakuMessage{iwm3, iwm1, iwm2} + sort.Slice(sortingList, func(i, j int) bool { + return indexedWakuMessageComparison(sortingList[i], sortingList[j]) == -1 + }) + + require.Equal(t, iwm1, sortingList[0]) + require.Equal(t, iwm2, sortingList[1]) + require.Equal(t, iwm3, sortingList[2]) +} + +func createSampleList(s int) []IndexedWakuMessage { + var result []IndexedWakuMessage + for i := 0; i < s; i++ { + result = append(result, IndexedWakuMessage{ + msg: &pb.WakuMessage{ + Payload: []byte{byte(i)}, + }, + index: &pb.Index{ + ReceiverTime: float64(i), + SenderTime: float64(i), + Digest: []byte{1}, + }, + }) + } + return result +} + +func TestFindIndex(t *testing.T) { + msgList := createSampleList(10) + require.Equal(t, 3, findIndex(msgList, msgList[3].index)) + require.Equal(t, -1, findIndex(msgList, &pb.Index{})) +} + +func TestForwardPagination(t *testing.T) { + msgList := createSampleList(10) + + // test for a normal pagination + pagingInfo := &pb.PagingInfo{PageSize: 2, Cursor: msgList[3].index, Direction: pb.PagingInfo_FORWARD} + messages, newPagingInfo := paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 2) + require.Equal(t, []*pb.WakuMessage{msgList[4].msg, msgList[5].msg}, messages) + require.Equal(t, msgList[5].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, pagingInfo.PageSize, newPagingInfo.PageSize) + + // test for an initial pagination request with an empty cursor + pagingInfo = &pb.PagingInfo{PageSize: 2, Direction: pb.PagingInfo_FORWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 2) + require.Equal(t, []*pb.WakuMessage{msgList[0].msg, msgList[1].msg}, messages) + require.Equal(t, msgList[1].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, pagingInfo.PageSize, newPagingInfo.PageSize) + + // test for an initial pagination request with an empty cursor to fetch the entire history + pagingInfo = &pb.PagingInfo{PageSize: 13, Direction: pb.PagingInfo_FORWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 10) + require.Equal(t, msgList[9].msg, messages[9]) + require.Equal(t, msgList[9].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(10), newPagingInfo.PageSize) + + // test for an empty msgList + pagingInfo = &pb.PagingInfo{PageSize: 2, Direction: pb.PagingInfo_FORWARD} + var msgList2 []IndexedWakuMessage + messages, newPagingInfo = paginateWithoutIndex(msgList2, pagingInfo) + require.Len(t, messages, 0) + require.Equal(t, pagingInfo.Cursor, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(0), newPagingInfo.PageSize) + + // test for a page size larger than the remaining messages + pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: msgList[3].index, Direction: pb.PagingInfo_FORWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 6) + require.Equal(t, []*pb.WakuMessage{msgList[4].msg, msgList[5].msg, msgList[6].msg, msgList[7].msg, msgList[8].msg, msgList[9].msg}, messages) + require.Equal(t, msgList[9].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(6), newPagingInfo.PageSize) + + // test for a page size larger than the maximum allowed page size + pagingInfo = &pb.PagingInfo{PageSize: MaxPageSize + 1, Cursor: msgList[3].index, Direction: pb.PagingInfo_FORWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.True(t, len(messages) <= MaxPageSize) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.True(t, newPagingInfo.PageSize <= MaxPageSize) + + // test for a cursor pointing to the end of the message list + pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: msgList[9].index, Direction: pb.PagingInfo_FORWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 0) + require.Equal(t, msgList[9].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(0), newPagingInfo.PageSize) + + // test for an invalid cursor + invalidIndex, err := computeIndex(&pb.WakuMessage{Payload: []byte{255, 255, 255}}) + require.NoError(t, err) + pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: invalidIndex, Direction: pb.PagingInfo_FORWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 0) + require.Equal(t, pagingInfo.Cursor, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(0), newPagingInfo.PageSize) + + // test initial paging query over a message list with one message + singleItemMsgList := msgList[0:1] + pagingInfo = &pb.PagingInfo{PageSize: 10, Direction: pb.PagingInfo_FORWARD} + messages, newPagingInfo = paginateWithoutIndex(singleItemMsgList, pagingInfo) + require.Len(t, messages, 1) + require.Equal(t, msgList[0].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(1), newPagingInfo.PageSize) +} + +func TestBackwardPagination(t *testing.T) { + msgList := createSampleList(10) + + // test for a normal pagination + pagingInfo := &pb.PagingInfo{PageSize: 2, Cursor: msgList[3].index, Direction: pb.PagingInfo_BACKWARD} + messages, newPagingInfo := paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 2) + require.Equal(t, []*pb.WakuMessage{msgList[1].msg, msgList[2].msg}, messages) + require.Equal(t, msgList[1].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, pagingInfo.PageSize, newPagingInfo.PageSize) + + // test for an initial pagination request with an empty cursor + pagingInfo = &pb.PagingInfo{PageSize: 2, Direction: pb.PagingInfo_BACKWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 2) + require.Equal(t, []*pb.WakuMessage{msgList[8].msg, msgList[9].msg}, messages) + require.Equal(t, msgList[8].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, pagingInfo.PageSize, newPagingInfo.PageSize) + + // test for an initial pagination request with an empty cursor to fetch the entire history + pagingInfo = &pb.PagingInfo{PageSize: 13, Direction: pb.PagingInfo_BACKWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 10) + require.Equal(t, msgList[0].msg, messages[0]) + require.Equal(t, msgList[9].msg, messages[9]) + require.Equal(t, msgList[0].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(10), newPagingInfo.PageSize) + + // test for an empty msgList + pagingInfo = &pb.PagingInfo{PageSize: 2, Direction: pb.PagingInfo_BACKWARD} + var msgList2 []IndexedWakuMessage + messages, newPagingInfo = paginateWithoutIndex(msgList2, pagingInfo) + require.Len(t, messages, 0) + require.Equal(t, pagingInfo.Cursor, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(0), newPagingInfo.PageSize) + + // test for a page size larger than the remaining messages + pagingInfo = &pb.PagingInfo{PageSize: 5, Cursor: msgList[3].index, Direction: pb.PagingInfo_BACKWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 3) + require.Equal(t, []*pb.WakuMessage{msgList[0].msg, msgList[1].msg, msgList[2].msg}, messages) + require.Equal(t, msgList[0].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(3), newPagingInfo.PageSize) + + // test for a page size larger than the maximum allowed page size + pagingInfo = &pb.PagingInfo{PageSize: MaxPageSize + 1, Cursor: msgList[3].index, Direction: pb.PagingInfo_BACKWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.True(t, len(messages) <= MaxPageSize) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.True(t, newPagingInfo.PageSize <= MaxPageSize) + + // test for a cursor pointing to the beginning of the message list + pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: msgList[0].index, Direction: pb.PagingInfo_BACKWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 0) + require.Equal(t, msgList[0].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(0), newPagingInfo.PageSize) + + // test for an invalid cursor + invalidIndex, err := computeIndex(&pb.WakuMessage{Payload: []byte{255, 255, 255}}) + require.NoError(t, err) + pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: invalidIndex, Direction: pb.PagingInfo_BACKWARD} + messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) + require.Len(t, messages, 0) + require.Equal(t, pagingInfo.Cursor, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(0), newPagingInfo.PageSize) + + // test initial paging query over a message list with one message + singleItemMsgList := msgList[0:1] + pagingInfo = &pb.PagingInfo{PageSize: 10, Direction: pb.PagingInfo_BACKWARD} + messages, newPagingInfo = paginateWithoutIndex(singleItemMsgList, pagingInfo) + require.Len(t, messages, 1) + require.Equal(t, msgList[0].index, newPagingInfo.Cursor) + require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) + require.Equal(t, uint64(1), newPagingInfo.PageSize) +} diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go new file mode 100644 index 00000000..791ea681 --- /dev/null +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -0,0 +1,45 @@ +package store + +import ( + "context" + "database/sql" + "testing" + + "github.com/status-im/go-waku/waku/persistence" + "github.com/status-im/go-waku/waku/persistence/sqlite" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" + "github.com/stretchr/testify/require" +) + +func TestStorePersistence(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var db *sql.DB + db, err := sqlite.NewDB(":memory:") + require.NoError(t, err) + + dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) + require.NoError(t, err) + + s1 := NewWakuStore(true, dbStore) + s1.fetchDBRecords(ctx) + require.Len(t, s1.messages, 0) + + defaultPubSubTopic := "test" + defaultContentTopic := "1" + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: defaultContentTopic, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + s1.storeMessage(defaultPubSubTopic, msg) + + s2 := NewWakuStore(true, dbStore) + s2.fetchDBRecords(ctx) + require.Len(t, s2.messages, 1) + require.Equal(t, msg, s2.messages[0].msg) +} diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go new file mode 100644 index 00000000..3b7218ed --- /dev/null +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -0,0 +1,65 @@ +package store + +import ( + "context" + "testing" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peerstore" + ma "github.com/multiformats/go-multiaddr" + "github.com/status-im/go-waku/waku/v2/protocol" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" + "github.com/stretchr/testify/require" +) + +func getHostAddress(ha host.Host) ma.Multiaddr { + return ha.Addrs()[0] +} + +func TestWakuStoreProtocolQuery(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s1 := NewWakuStore(true, nil) + s1.Start(ctx, host1) + defer s1.Stop() + + topic1 := "1" + pubsubTopic1 := "topic1" + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + // Simulate a message has been received via relay protocol + s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1) + + s2 := NewWakuStore(false, nil) + s2.Start(ctx, host2) + defer s2.Stop() + + host2.Peerstore().AddAddr(host1.ID(), getHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) + require.NoError(t, err) + + response, err := s2.Query(ctx, &pb.HistoryQuery{ + PubsubTopic: pubsubTopic1, + ContentFilters: []*pb.ContentFilter{{ + ContentTopic: topic1, + }}, + }, DefaultOptions()...) + + require.NoError(t, err) + require.Len(t, response.Messages, 1) + require.Equal(t, msg, response.Messages[0]) +} diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go new file mode 100644 index 00000000..56f3b0da --- /dev/null +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -0,0 +1,281 @@ +package store + +import ( + "testing" + + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/utils" + "github.com/stretchr/testify/require" +) + +func TestStoreQuery(t *testing.T) { + defaultPubSubTopic := "test" + defaultContentTopic := "1" + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: defaultContentTopic, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + msg2 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: "2", + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + s := NewWakuStore(true, nil) + s.storeMessage(defaultPubSubTopic, msg) + s.storeMessage(defaultPubSubTopic, msg2) + + response := s.FindMessages(&pb.HistoryQuery{ + ContentFilters: []*pb.ContentFilter{ + { + ContentTopic: defaultContentTopic, + }, + }, + }) + + require.Len(t, response.Messages, 1) + require.Equal(t, msg, response.Messages[0]) +} + +func TestStoreQueryMultipleContentFilters(t *testing.T) { + defaultPubSubTopic := "test" + topic1 := "1" + topic2 := "2" + topic3 := "3" + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + msg2 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic2, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + msg3 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic3, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + s := NewWakuStore(true, nil) + s.storeMessage(defaultPubSubTopic, msg) + s.storeMessage(defaultPubSubTopic, msg2) + s.storeMessage(defaultPubSubTopic, msg3) + + response := s.FindMessages(&pb.HistoryQuery{ + ContentFilters: []*pb.ContentFilter{ + { + ContentTopic: topic1, + }, + { + ContentTopic: topic3, + }, + }, + }) + + require.Len(t, response.Messages, 2) + require.Contains(t, response.Messages, msg) + require.Contains(t, response.Messages, msg3) + require.NotContains(t, response.Messages, msg2) +} + +func TestStoreQueryPubsubTopicFilter(t *testing.T) { + topic1 := "1" + topic2 := "2" + topic3 := "3" + pubsubTopic1 := "topic1" + pubsubTopic2 := "topic2" + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + msg2 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic2, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + msg3 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic3, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + s := NewWakuStore(true, nil) + s.storeMessage(pubsubTopic1, msg) + s.storeMessage(pubsubTopic2, msg2) + s.storeMessage(pubsubTopic2, msg3) + + response := s.FindMessages(&pb.HistoryQuery{ + PubsubTopic: pubsubTopic1, + ContentFilters: []*pb.ContentFilter{ + { + ContentTopic: topic1, + }, + { + ContentTopic: topic3, + }, + }, + }) + + require.Len(t, response.Messages, 1) + require.Equal(t, msg, response.Messages[0]) +} + +func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { + topic1 := "1" + topic2 := "2" + topic3 := "3" + pubsubTopic1 := "topic1" + pubsubTopic2 := "topic2" + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + msg2 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic2, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + msg3 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic3, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + s := NewWakuStore(true, nil) + s.storeMessage(pubsubTopic2, msg) + s.storeMessage(pubsubTopic2, msg2) + s.storeMessage(pubsubTopic2, msg3) + + response := s.FindMessages(&pb.HistoryQuery{ + PubsubTopic: pubsubTopic1, + }) + + require.Len(t, response.Messages, 0) +} + +func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { + topic1 := "1" + topic2 := "2" + topic3 := "3" + pubsubTopic1 := "topic1" + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + msg2 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic2, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + msg3 := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic3, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + s := NewWakuStore(true, nil) + s.storeMessage(pubsubTopic1, msg) + s.storeMessage(pubsubTopic1, msg2) + s.storeMessage(pubsubTopic1, msg3) + + response := s.FindMessages(&pb.HistoryQuery{ + PubsubTopic: pubsubTopic1, + }) + + require.Len(t, response.Messages, 3) + require.Contains(t, response.Messages, msg) + require.Contains(t, response.Messages, msg2) + require.Contains(t, response.Messages, msg3) +} + +func TestStoreQueryForwardPagination(t *testing.T) { + topic1 := "1" + pubsubTopic1 := "topic1" + + s := NewWakuStore(true, nil) + for i := 0; i < 10; i++ { + msg := &pb.WakuMessage{ + Payload: []byte{byte(i)}, + ContentTopic: topic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + s.storeMessage(pubsubTopic1, msg) + + } + + response := s.FindMessages(&pb.HistoryQuery{ + PubsubTopic: pubsubTopic1, + PagingInfo: &pb.PagingInfo{ + Direction: pb.PagingInfo_FORWARD, + }, + }) + + require.Len(t, response.Messages, 10) + for i := 0; i < 10; i++ { + require.Equal(t, byte(i), response.Messages[i].Payload[0]) + } +} + +func TestStoreQueryBackwardPagination(t *testing.T) { + topic1 := "1" + pubsubTopic1 := "topic1" + + s := NewWakuStore(true, nil) + for i := 0; i < 10; i++ { + msg := &pb.WakuMessage{ + Payload: []byte{byte(i)}, + ContentTopic: topic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + s.storeMessage(pubsubTopic1, msg) + + } + + response := s.FindMessages(&pb.HistoryQuery{ + PubsubTopic: pubsubTopic1, + PagingInfo: &pb.PagingInfo{ + Direction: pb.PagingInfo_FORWARD, + }, + }) + + require.Len(t, response.Messages, 10) + for i := 9; i >= 0; i-- { + require.Equal(t, byte(i), response.Messages[i].Payload[0]) + } +}