test: store

This commit is contained in:
Richard Ramos 2021-10-25 15:41:08 -04:00 committed by GitHub
parent ce49f29c08
commit 8253e381df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 702 additions and 15 deletions

View File

@ -5,15 +5,27 @@ import (
"log" "log"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/store"
) )
type MessageProvider interface {
GetAll() ([]StoredMessage, error)
Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error
Stop()
}
// DBStore is a MessageProvider that has a *sql.DB connection // DBStore is a MessageProvider that has a *sql.DB connection
type DBStore struct { type DBStore struct {
store.MessageProvider MessageProvider
db *sql.DB db *sql.DB
} }
type StoredMessage struct {
ID []byte
PubsubTopic string
ReceiverTime float64
Message *pb.WakuMessage
}
// DBOption is an optional setting that can be used to configure the DBStore // DBOption is an optional setting that can be used to configure the DBStore
type DBOption func(*DBStore) error type DBOption func(*DBStore) error
@ -92,13 +104,13 @@ func (d *DBStore) Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMess
} }
// Returns all the stored WakuMessages // Returns all the stored WakuMessages
func (d *DBStore) GetAll() ([]store.StoredMessage, error) { func (d *DBStore) GetAll() ([]StoredMessage, error) {
rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC") rows, err := d.db.Query("SELECT id, receiverTimestamp, senderTimestamp, contentTopic, pubsubTopic, payload, version FROM message ORDER BY senderTimestamp ASC")
if err != nil { if err != nil {
return nil, err return nil, err
} }
var result []store.StoredMessage var result []StoredMessage
defer rows.Close() defer rows.Close()
@ -122,7 +134,7 @@ func (d *DBStore) GetAll() ([]store.StoredMessage, error) {
msg.Timestamp = senderTimestamp msg.Timestamp = senderTimestamp
msg.Version = version msg.Version = version
record := store.StoredMessage{ record := StoredMessage{
ID: id, ID: id,
PubsubTopic: pubsubTopic, PubsubTopic: pubsubTopic,
ReceiverTime: receiverTimestamp, ReceiverTime: receiverTimestamp,

View File

@ -20,6 +20,7 @@ import (
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/metrics"
"github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
@ -51,6 +52,10 @@ func minOf(vars ...int) int {
} }
func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *pb.PagingInfo) { func paginateWithIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resMessages []IndexedWakuMessage, resPagingInfo *pb.PagingInfo) {
if pinfo == nil {
pinfo = new(pb.PagingInfo)
}
// takes list, and performs paging based on pinfo // takes list, and performs paging based on pinfo
// returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request // returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request
cursor := pinfo.Cursor cursor := pinfo.Cursor
@ -179,15 +184,8 @@ func (w *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse {
return result return result
} }
type StoredMessage struct {
ID []byte
PubsubTopic string
ReceiverTime float64
Message *pb.WakuMessage
}
type MessageProvider interface { type MessageProvider interface {
GetAll() ([]StoredMessage, error) GetAll() ([]persistence.StoredMessage, error)
Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error Put(cursor *pb.Index, pubsubTopic string, message *pb.WakuMessage) error
Stop() Stop()
} }
@ -243,6 +241,12 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host) {
return return
} }
store.fetchDBRecords(ctx)
log.Info("Store protocol started")
}
func (store *WakuStore) fetchDBRecords(ctx context.Context) {
storedMessages, err := store.msgProvider.GetAll() storedMessages, err := store.msgProvider.GetAll()
if err != nil { if err != nil {
log.Error("could not load DBProvider messages", err) log.Error("could not load DBProvider messages", err)
@ -265,8 +269,6 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host) {
log.Error("failed to record with tags") log.Error("failed to record with tags")
} }
} }
log.Info("Store protocol started")
} }
func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) { func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index, msg *pb.WakuMessage) {
@ -526,6 +528,10 @@ func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...H
return nil, ErrInvalidId return nil, ErrInvalidId
} }
if q.PagingInfo == nil {
q.PagingInfo = &pb.PagingInfo{}
}
if params.cursor != nil { if params.cursor != nil {
q.PagingInfo.Cursor = params.cursor q.PagingInfo.Cursor = params.cursor
} }

View File

@ -0,0 +1,278 @@
package store
import (
"sort"
"testing"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require"
)
func TestIndexComputation(t *testing.T) {
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
Timestamp: utils.GetUnixEpoch(),
}
idx, err := computeIndex(msg)
require.NoError(t, err)
require.NotZero(t, idx.ReceiverTime)
require.Equal(t, msg.Timestamp, idx.SenderTime)
require.NotZero(t, idx.Digest)
require.Len(t, idx.Digest, 32)
msg1 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
Timestamp: 123,
ContentTopic: "/waku/2/default-content/proto",
}
idx1, err := computeIndex(msg1)
require.NoError(t, err)
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
Timestamp: 123,
ContentTopic: "/waku/2/default-content/proto",
}
idx2, err := computeIndex(msg2)
require.NoError(t, err)
require.Equal(t, idx1.Digest, idx2.Digest)
}
func TestIndexComparison(t *testing.T) {
index1 := &pb.Index{
ReceiverTime: 2,
SenderTime: 1,
Digest: []byte{1},
}
index2 := &pb.Index{
ReceiverTime: 2,
SenderTime: 1,
Digest: []byte{2},
}
index3 := &pb.Index{
ReceiverTime: 1,
SenderTime: 2,
Digest: []byte{3},
}
iwm1 := IndexedWakuMessage{index: index1}
iwm2 := IndexedWakuMessage{index: index2}
iwm3 := IndexedWakuMessage{index: index3}
require.Equal(t, 0, indexComparison(index1, index1))
require.Equal(t, -1, indexComparison(index1, index2))
require.Equal(t, 1, indexComparison(index2, index1))
require.Equal(t, -1, indexComparison(index1, index3))
require.Equal(t, 1, indexComparison(index3, index1))
require.Equal(t, 0, indexedWakuMessageComparison(iwm1, iwm1))
require.Equal(t, -1, indexedWakuMessageComparison(iwm1, iwm2))
require.Equal(t, 1, indexedWakuMessageComparison(iwm2, iwm1))
require.Equal(t, -1, indexedWakuMessageComparison(iwm1, iwm3))
require.Equal(t, 1, indexedWakuMessageComparison(iwm3, iwm1))
sortingList := []IndexedWakuMessage{iwm3, iwm1, iwm2}
sort.Slice(sortingList, func(i, j int) bool {
return indexedWakuMessageComparison(sortingList[i], sortingList[j]) == -1
})
require.Equal(t, iwm1, sortingList[0])
require.Equal(t, iwm2, sortingList[1])
require.Equal(t, iwm3, sortingList[2])
}
func createSampleList(s int) []IndexedWakuMessage {
var result []IndexedWakuMessage
for i := 0; i < s; i++ {
result = append(result, IndexedWakuMessage{
msg: &pb.WakuMessage{
Payload: []byte{byte(i)},
},
index: &pb.Index{
ReceiverTime: float64(i),
SenderTime: float64(i),
Digest: []byte{1},
},
})
}
return result
}
func TestFindIndex(t *testing.T) {
msgList := createSampleList(10)
require.Equal(t, 3, findIndex(msgList, msgList[3].index))
require.Equal(t, -1, findIndex(msgList, &pb.Index{}))
}
func TestForwardPagination(t *testing.T) {
msgList := createSampleList(10)
// test for a normal pagination
pagingInfo := &pb.PagingInfo{PageSize: 2, Cursor: msgList[3].index, Direction: pb.PagingInfo_FORWARD}
messages, newPagingInfo := paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 2)
require.Equal(t, []*pb.WakuMessage{msgList[4].msg, msgList[5].msg}, messages)
require.Equal(t, msgList[5].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, pagingInfo.PageSize, newPagingInfo.PageSize)
// test for an initial pagination request with an empty cursor
pagingInfo = &pb.PagingInfo{PageSize: 2, Direction: pb.PagingInfo_FORWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 2)
require.Equal(t, []*pb.WakuMessage{msgList[0].msg, msgList[1].msg}, messages)
require.Equal(t, msgList[1].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, pagingInfo.PageSize, newPagingInfo.PageSize)
// test for an initial pagination request with an empty cursor to fetch the entire history
pagingInfo = &pb.PagingInfo{PageSize: 13, Direction: pb.PagingInfo_FORWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 10)
require.Equal(t, msgList[9].msg, messages[9])
require.Equal(t, msgList[9].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(10), newPagingInfo.PageSize)
// test for an empty msgList
pagingInfo = &pb.PagingInfo{PageSize: 2, Direction: pb.PagingInfo_FORWARD}
var msgList2 []IndexedWakuMessage
messages, newPagingInfo = paginateWithoutIndex(msgList2, pagingInfo)
require.Len(t, messages, 0)
require.Equal(t, pagingInfo.Cursor, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(0), newPagingInfo.PageSize)
// test for a page size larger than the remaining messages
pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: msgList[3].index, Direction: pb.PagingInfo_FORWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 6)
require.Equal(t, []*pb.WakuMessage{msgList[4].msg, msgList[5].msg, msgList[6].msg, msgList[7].msg, msgList[8].msg, msgList[9].msg}, messages)
require.Equal(t, msgList[9].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(6), newPagingInfo.PageSize)
// test for a page size larger than the maximum allowed page size
pagingInfo = &pb.PagingInfo{PageSize: MaxPageSize + 1, Cursor: msgList[3].index, Direction: pb.PagingInfo_FORWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.True(t, len(messages) <= MaxPageSize)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.True(t, newPagingInfo.PageSize <= MaxPageSize)
// test for a cursor pointing to the end of the message list
pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: msgList[9].index, Direction: pb.PagingInfo_FORWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 0)
require.Equal(t, msgList[9].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(0), newPagingInfo.PageSize)
// test for an invalid cursor
invalidIndex, err := computeIndex(&pb.WakuMessage{Payload: []byte{255, 255, 255}})
require.NoError(t, err)
pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: invalidIndex, Direction: pb.PagingInfo_FORWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 0)
require.Equal(t, pagingInfo.Cursor, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(0), newPagingInfo.PageSize)
// test initial paging query over a message list with one message
singleItemMsgList := msgList[0:1]
pagingInfo = &pb.PagingInfo{PageSize: 10, Direction: pb.PagingInfo_FORWARD}
messages, newPagingInfo = paginateWithoutIndex(singleItemMsgList, pagingInfo)
require.Len(t, messages, 1)
require.Equal(t, msgList[0].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(1), newPagingInfo.PageSize)
}
func TestBackwardPagination(t *testing.T) {
msgList := createSampleList(10)
// test for a normal pagination
pagingInfo := &pb.PagingInfo{PageSize: 2, Cursor: msgList[3].index, Direction: pb.PagingInfo_BACKWARD}
messages, newPagingInfo := paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 2)
require.Equal(t, []*pb.WakuMessage{msgList[1].msg, msgList[2].msg}, messages)
require.Equal(t, msgList[1].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, pagingInfo.PageSize, newPagingInfo.PageSize)
// test for an initial pagination request with an empty cursor
pagingInfo = &pb.PagingInfo{PageSize: 2, Direction: pb.PagingInfo_BACKWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 2)
require.Equal(t, []*pb.WakuMessage{msgList[8].msg, msgList[9].msg}, messages)
require.Equal(t, msgList[8].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, pagingInfo.PageSize, newPagingInfo.PageSize)
// test for an initial pagination request with an empty cursor to fetch the entire history
pagingInfo = &pb.PagingInfo{PageSize: 13, Direction: pb.PagingInfo_BACKWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 10)
require.Equal(t, msgList[0].msg, messages[0])
require.Equal(t, msgList[9].msg, messages[9])
require.Equal(t, msgList[0].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(10), newPagingInfo.PageSize)
// test for an empty msgList
pagingInfo = &pb.PagingInfo{PageSize: 2, Direction: pb.PagingInfo_BACKWARD}
var msgList2 []IndexedWakuMessage
messages, newPagingInfo = paginateWithoutIndex(msgList2, pagingInfo)
require.Len(t, messages, 0)
require.Equal(t, pagingInfo.Cursor, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(0), newPagingInfo.PageSize)
// test for a page size larger than the remaining messages
pagingInfo = &pb.PagingInfo{PageSize: 5, Cursor: msgList[3].index, Direction: pb.PagingInfo_BACKWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 3)
require.Equal(t, []*pb.WakuMessage{msgList[0].msg, msgList[1].msg, msgList[2].msg}, messages)
require.Equal(t, msgList[0].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(3), newPagingInfo.PageSize)
// test for a page size larger than the maximum allowed page size
pagingInfo = &pb.PagingInfo{PageSize: MaxPageSize + 1, Cursor: msgList[3].index, Direction: pb.PagingInfo_BACKWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.True(t, len(messages) <= MaxPageSize)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.True(t, newPagingInfo.PageSize <= MaxPageSize)
// test for a cursor pointing to the beginning of the message list
pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: msgList[0].index, Direction: pb.PagingInfo_BACKWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 0)
require.Equal(t, msgList[0].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(0), newPagingInfo.PageSize)
// test for an invalid cursor
invalidIndex, err := computeIndex(&pb.WakuMessage{Payload: []byte{255, 255, 255}})
require.NoError(t, err)
pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: invalidIndex, Direction: pb.PagingInfo_BACKWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
require.Len(t, messages, 0)
require.Equal(t, pagingInfo.Cursor, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(0), newPagingInfo.PageSize)
// test initial paging query over a message list with one message
singleItemMsgList := msgList[0:1]
pagingInfo = &pb.PagingInfo{PageSize: 10, Direction: pb.PagingInfo_BACKWARD}
messages, newPagingInfo = paginateWithoutIndex(singleItemMsgList, pagingInfo)
require.Len(t, messages, 1)
require.Equal(t, msgList[0].index, newPagingInfo.Cursor)
require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
require.Equal(t, uint64(1), newPagingInfo.PageSize)
}

View File

@ -0,0 +1,45 @@
package store
import (
"context"
"database/sql"
"testing"
"github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/persistence/sqlite"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require"
)
func TestStorePersistence(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var db *sql.DB
db, err := sqlite.NewDB(":memory:")
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
require.NoError(t, err)
s1 := NewWakuStore(true, dbStore)
s1.fetchDBRecords(ctx)
require.Len(t, s1.messages, 0)
defaultPubSubTopic := "test"
defaultContentTopic := "1"
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: defaultContentTopic,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s1.storeMessage(defaultPubSubTopic, msg)
s2 := NewWakuStore(true, dbStore)
s2.fetchDBRecords(ctx)
require.Len(t, s2.messages, 1)
require.Equal(t, msg, s2.messages[0].msg)
}

View File

@ -0,0 +1,65 @@
package store
import (
"context"
"testing"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peerstore"
ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require"
)
func getHostAddress(ha host.Host) ma.Multiaddr {
return ha.Addrs()[0]
}
func TestWakuStoreProtocolQuery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(true, nil)
s1.Start(ctx, host1)
defer s1.Stop()
topic1 := "1"
pubsubTopic1 := "topic1"
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
// Simulate a message has been received via relay protocol
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
s2 := NewWakuStore(false, nil)
s2.Start(ctx, host2)
defer s2.Stop()
host2.Peerstore().AddAddr(host1.ID(), getHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
require.NoError(t, err)
response, err := s2.Query(ctx, &pb.HistoryQuery{
PubsubTopic: pubsubTopic1,
ContentFilters: []*pb.ContentFilter{{
ContentTopic: topic1,
}},
}, DefaultOptions()...)
require.NoError(t, err)
require.Len(t, response.Messages, 1)
require.Equal(t, msg, response.Messages[0])
}

View File

@ -0,0 +1,281 @@
package store
import (
"testing"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require"
)
func TestStoreQuery(t *testing.T) {
defaultPubSubTopic := "test"
defaultContentTopic := "1"
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: defaultContentTopic,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "2",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s := NewWakuStore(true, nil)
s.storeMessage(defaultPubSubTopic, msg)
s.storeMessage(defaultPubSubTopic, msg2)
response := s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{
{
ContentTopic: defaultContentTopic,
},
},
})
require.Len(t, response.Messages, 1)
require.Equal(t, msg, response.Messages[0])
}
func TestStoreQueryMultipleContentFilters(t *testing.T) {
defaultPubSubTopic := "test"
topic1 := "1"
topic2 := "2"
topic3 := "3"
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic2,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg3 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic3,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s := NewWakuStore(true, nil)
s.storeMessage(defaultPubSubTopic, msg)
s.storeMessage(defaultPubSubTopic, msg2)
s.storeMessage(defaultPubSubTopic, msg3)
response := s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{
{
ContentTopic: topic1,
},
{
ContentTopic: topic3,
},
},
})
require.Len(t, response.Messages, 2)
require.Contains(t, response.Messages, msg)
require.Contains(t, response.Messages, msg3)
require.NotContains(t, response.Messages, msg2)
}
func TestStoreQueryPubsubTopicFilter(t *testing.T) {
topic1 := "1"
topic2 := "2"
topic3 := "3"
pubsubTopic1 := "topic1"
pubsubTopic2 := "topic2"
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic2,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg3 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic3,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s := NewWakuStore(true, nil)
s.storeMessage(pubsubTopic1, msg)
s.storeMessage(pubsubTopic2, msg2)
s.storeMessage(pubsubTopic2, msg3)
response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1,
ContentFilters: []*pb.ContentFilter{
{
ContentTopic: topic1,
},
{
ContentTopic: topic3,
},
},
})
require.Len(t, response.Messages, 1)
require.Equal(t, msg, response.Messages[0])
}
func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
topic1 := "1"
topic2 := "2"
topic3 := "3"
pubsubTopic1 := "topic1"
pubsubTopic2 := "topic2"
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic2,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg3 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic3,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s := NewWakuStore(true, nil)
s.storeMessage(pubsubTopic2, msg)
s.storeMessage(pubsubTopic2, msg2)
s.storeMessage(pubsubTopic2, msg3)
response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1,
})
require.Len(t, response.Messages, 0)
}
func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
topic1 := "1"
topic2 := "2"
topic3 := "3"
pubsubTopic1 := "topic1"
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic2,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg3 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic3,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s := NewWakuStore(true, nil)
s.storeMessage(pubsubTopic1, msg)
s.storeMessage(pubsubTopic1, msg2)
s.storeMessage(pubsubTopic1, msg3)
response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1,
})
require.Len(t, response.Messages, 3)
require.Contains(t, response.Messages, msg)
require.Contains(t, response.Messages, msg2)
require.Contains(t, response.Messages, msg3)
}
func TestStoreQueryForwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(true, nil)
for i := 0; i < 10; i++ {
msg := &pb.WakuMessage{
Payload: []byte{byte(i)},
ContentTopic: topic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s.storeMessage(pubsubTopic1, msg)
}
response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1,
PagingInfo: &pb.PagingInfo{
Direction: pb.PagingInfo_FORWARD,
},
})
require.Len(t, response.Messages, 10)
for i := 0; i < 10; i++ {
require.Equal(t, byte(i), response.Messages[i].Payload[0])
}
}
func TestStoreQueryBackwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(true, nil)
for i := 0; i < 10; i++ {
msg := &pb.WakuMessage{
Payload: []byte{byte(i)},
ContentTopic: topic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s.storeMessage(pubsubTopic1, msg)
}
response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1,
PagingInfo: &pb.PagingInfo{
Direction: pb.PagingInfo_FORWARD,
},
})
require.Len(t, response.Messages, 10)
for i := 9; i >= 0; i-- {
require.Equal(t, byte(i), response.Messages[i].Payload[0])
}
}