mirror of
https://github.com/status-im/go-waku.git
synced 2025-02-15 06:56:23 +00:00
test(store): make queries (#752)
* test(store): make queries * test: most recent timestamp and count of msg * nit: add comment for pageSize+1
This commit is contained in:
parent
9b05d48318
commit
3d8d435502
@ -410,6 +410,10 @@ func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{}
|
||||
paramCnt++
|
||||
|
||||
sqlQuery += fmt.Sprintf("LIMIT $%d", paramCnt)
|
||||
// Always search for _max page size_ + 1. If the extra row does not exist, do not return pagination info.
|
||||
pageSize := query.PagingInfo.PageSize + 1
|
||||
parameters = append(parameters, pageSize)
|
||||
|
||||
sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection)
|
||||
d.log.Info(fmt.Sprintf("sqlQuery: %s", sqlQuery))
|
||||
|
||||
@ -434,10 +438,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
||||
return nil, nil, err
|
||||
}
|
||||
defer stmt.Close()
|
||||
pageSize := query.PagingInfo.PageSize + 1
|
||||
|
||||
parameters = append(parameters, pageSize)
|
||||
|
||||
//
|
||||
measurementStart := time.Now()
|
||||
rows, err := stmt.Query(parameters...)
|
||||
if err != nil {
|
||||
@ -458,6 +459,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
|
||||
|
||||
var cursor *pb.Index
|
||||
if len(result) != 0 {
|
||||
// since there are more rows than pagingInfo.PageSize, we need to return a cursor, for pagination
|
||||
if len(result) > int(query.PagingInfo.PageSize) {
|
||||
result = result[0:query.PagingInfo.PageSize]
|
||||
lastMsgIdx := len(result) - 1
|
||||
|
@ -3,6 +3,7 @@ package persistence
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -14,6 +15,7 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/persistence/migrate"
|
||||
sqlitemigrations "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
@ -95,4 +97,92 @@ func TestStoreRetention(t *testing.T) {
|
||||
require.Equal(t, "test5", dbResults[0].Message.ContentTopic)
|
||||
require.Equal(t, "test6", dbResults[1].Message.ContentTopic)
|
||||
require.Equal(t, "test7", dbResults[2].Message.ContentTopic)
|
||||
// checking the number of all the message in the db
|
||||
msgCount, err := store.Count()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, msgCount, 3)
|
||||
}
|
||||
|
||||
func TestQuery(t *testing.T) {
|
||||
db := NewMock()
|
||||
store, err := NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), WithDB(db), WithMigrations(Migrate), WithRetentionPolicy(5, 20*time.Second))
|
||||
require.NoError(t, err)
|
||||
|
||||
insertTime := time.Now()
|
||||
//////////////////////////////////
|
||||
_ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test1", insertTime.Add(-40*time.Second).UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test"))
|
||||
_ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test2", insertTime.Add(-30*time.Second).UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test"))
|
||||
_ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test3", insertTime.Add(-20*time.Second).UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test"))
|
||||
_ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test3", insertTime.Add(-20*time.Second).UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test2"))
|
||||
_ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test4", insertTime.Add(-10*time.Second).UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test"))
|
||||
|
||||
// Range [startTime-endTime]
|
||||
// Check: matching ContentTopics and pubsubTopic, and ts of msg in range
|
||||
// this filters test1,test2 contentTopics. test3 matches list of contentTopic but its not within the time range
|
||||
cursor, msgs, err := store.Query(&pb.HistoryQuery{
|
||||
PubsubTopic: "test",
|
||||
ContentFilters: []*pb.ContentFilter{
|
||||
{ContentTopic: "test1"},
|
||||
{ContentTopic: "test2"},
|
||||
{ContentTopic: "test3"},
|
||||
},
|
||||
PagingInfo: &pb.PagingInfo{PageSize: 10},
|
||||
StartTime: insertTime.Add(-41 * time.Second).UnixNano(),
|
||||
EndTime: insertTime.Add(-21 * time.Second).UnixNano(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, msgs, 2)
|
||||
|
||||
_ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test5", insertTime.UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test"))
|
||||
|
||||
// Range [cursor-endTime]
|
||||
// Check: matching ContentTopic,pubsubTopic, pageSize
|
||||
// cursor has last message id of test2 which is now-30second
|
||||
// endTime is now+1sec
|
||||
// matched messages are both test4,test5, but the len of returned result depends on pageSize
|
||||
var cursor2 *pb.Index
|
||||
for _, pageSize := range []int{1, 2} {
|
||||
cursorLocal, msgs, err := store.Query(&pb.HistoryQuery{
|
||||
PubsubTopic: "test",
|
||||
ContentFilters: []*pb.ContentFilter{
|
||||
{ContentTopic: "test4"},
|
||||
{ContentTopic: "test5"},
|
||||
},
|
||||
PagingInfo: &pb.PagingInfo{Cursor: cursor, PageSize: uint64(pageSize), Direction: pb.PagingInfo_FORWARD},
|
||||
EndTime: insertTime.Add(1 * time.Second).UnixNano(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, msgs, pageSize) // due to pageSize
|
||||
require.Equal(t, msgs[0].Message.ContentTopic, "test4")
|
||||
if len(msgs) > 1 {
|
||||
require.Equal(t, msgs[1].Message.ContentTopic, "test5")
|
||||
}
|
||||
cursor2 = cursorLocal
|
||||
}
|
||||
|
||||
// range [startTime-cursor(test5_ContentTopic_Msg)],
|
||||
// Check: backend range with cursor excludes test1 ContentTopic, matching ContentTopic
|
||||
// check backward pagination
|
||||
_, msgs, err = store.Query(&pb.HistoryQuery{
|
||||
PubsubTopic: "test",
|
||||
ContentFilters: []*pb.ContentFilter{
|
||||
{ContentTopic: "test1"}, // contentTopic test1 is out of the range
|
||||
{ContentTopic: "test2"},
|
||||
{ContentTopic: "test3"},
|
||||
{ContentTopic: "test4"},
|
||||
},
|
||||
PagingInfo: &pb.PagingInfo{Cursor: cursor2, PageSize: 4, Direction: pb.PagingInfo_BACKWARD},
|
||||
StartTime: insertTime.Add(-39 * time.Second).UnixNano(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, msgs, 3) // due to pageSize
|
||||
// Check:this also makes returned messages are sorted ascending
|
||||
for ind, msg := range msgs {
|
||||
require.Equal(t, msg.Message.ContentTopic, fmt.Sprintf("test%d", ind+2)) // test2,test3,test4
|
||||
}
|
||||
|
||||
// checking most recent timestamp in db
|
||||
timestamp, err := store.MostRecentTimestamp()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, timestamp, insertTime.UnixNano())
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user