From 3d8d435502f786366ca3e065b13143d61ebddb21 Mon Sep 17 00:00:00 2001 From: harsh jain Date: Tue, 19 Sep 2023 13:28:11 +0700 Subject: [PATCH] test(store): make queries (#752) * test(store): make queries * test: most recent timestamp and count of msg * nit: add comment for pageSize+1 --- waku/persistence/store.go | 10 ++-- waku/persistence/store_test.go | 90 ++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 4 deletions(-) diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 81c40275..5f386ba2 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -410,6 +410,10 @@ func (d *DBStore) prepareQuerySQL(query *pb.HistoryQuery) (string, []interface{} paramCnt++ sqlQuery += fmt.Sprintf("LIMIT $%d", paramCnt) + // Always search for _max page size_ + 1. If the extra row does not exist, do not return pagination info. + pageSize := query.PagingInfo.PageSize + 1 + parameters = append(parameters, pageSize) + sqlQuery = fmt.Sprintf(sqlQuery, conditionStr, orderDirection, orderDirection, orderDirection, orderDirection) d.log.Info(fmt.Sprintf("sqlQuery: %s", sqlQuery)) @@ -434,10 +438,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err return nil, nil, err } defer stmt.Close() - pageSize := query.PagingInfo.PageSize + 1 - - parameters = append(parameters, pageSize) - + // measurementStart := time.Now() rows, err := stmt.Query(parameters...) if err != nil { @@ -458,6 +459,7 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err var cursor *pb.Index if len(result) != 0 { + // since there are more rows than pagingInfo.PageSize, we need to return a cursor, for pagination if len(result) > int(query.PagingInfo.PageSize) { result = result[0:query.PagingInfo.PageSize] lastMsgIdx := len(result) - 1 diff --git a/waku/persistence/store_test.go b/waku/persistence/store_test.go index 5d430ac7..e1470112 100644 --- a/waku/persistence/store_test.go +++ b/waku/persistence/store_test.go @@ -3,6 +3,7 @@ package persistence import ( "context" "database/sql" + "fmt" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence/migrate" sqlitemigrations "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -95,4 +97,92 @@ func TestStoreRetention(t *testing.T) { require.Equal(t, "test5", dbResults[0].Message.ContentTopic) require.Equal(t, "test6", dbResults[1].Message.ContentTopic) require.Equal(t, "test7", dbResults[2].Message.ContentTopic) + // checking the number of all the message in the db + msgCount, err := store.Count() + require.NoError(t, err) + require.Equal(t, msgCount, 3) +} + +func TestQuery(t *testing.T) { + db := NewMock() + store, err := NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), WithDB(db), WithMigrations(Migrate), WithRetentionPolicy(5, 20*time.Second)) + require.NoError(t, err) + + insertTime := time.Now() + ////////////////////////////////// + _ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test1", insertTime.Add(-40*time.Second).UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test")) + _ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test2", insertTime.Add(-30*time.Second).UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test")) + _ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test3", insertTime.Add(-20*time.Second).UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test")) + _ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test3", insertTime.Add(-20*time.Second).UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test2")) + _ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test4", insertTime.Add(-10*time.Second).UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test")) + + // Range [startTime-endTime] + // Check: matching ContentTopics and pubsubTopic, and ts of msg in range + // this filters test1,test2 contentTopics. test3 matches list of contentTopic but its not within the time range + cursor, msgs, err := store.Query(&pb.HistoryQuery{ + PubsubTopic: "test", + ContentFilters: []*pb.ContentFilter{ + {ContentTopic: "test1"}, + {ContentTopic: "test2"}, + {ContentTopic: "test3"}, + }, + PagingInfo: &pb.PagingInfo{PageSize: 10}, + StartTime: insertTime.Add(-41 * time.Second).UnixNano(), + EndTime: insertTime.Add(-21 * time.Second).UnixNano(), + }) + require.NoError(t, err) + require.Len(t, msgs, 2) + + _ = store.Put(protocol.NewEnvelope(tests.CreateWakuMessage("test5", insertTime.UnixNano()), insertTime.Add(-10*time.Second).UnixNano(), "test")) + + // Range [cursor-endTime] + // Check: matching ContentTopic,pubsubTopic, pageSize + // cursor has last message id of test2 which is now-30second + // endTime is now+1sec + // matched messages are both test4,test5, but the len of returned result depends on pageSize + var cursor2 *pb.Index + for _, pageSize := range []int{1, 2} { + cursorLocal, msgs, err := store.Query(&pb.HistoryQuery{ + PubsubTopic: "test", + ContentFilters: []*pb.ContentFilter{ + {ContentTopic: "test4"}, + {ContentTopic: "test5"}, + }, + PagingInfo: &pb.PagingInfo{Cursor: cursor, PageSize: uint64(pageSize), Direction: pb.PagingInfo_FORWARD}, + EndTime: insertTime.Add(1 * time.Second).UnixNano(), + }) + require.NoError(t, err) + require.Len(t, msgs, pageSize) // due to pageSize + require.Equal(t, msgs[0].Message.ContentTopic, "test4") + if len(msgs) > 1 { + require.Equal(t, msgs[1].Message.ContentTopic, "test5") + } + cursor2 = cursorLocal + } + + // range [startTime-cursor(test5_ContentTopic_Msg)], + // Check: backend range with cursor excludes test1 ContentTopic, matching ContentTopic + // check backward pagination + _, msgs, err = store.Query(&pb.HistoryQuery{ + PubsubTopic: "test", + ContentFilters: []*pb.ContentFilter{ + {ContentTopic: "test1"}, // contentTopic test1 is out of the range + {ContentTopic: "test2"}, + {ContentTopic: "test3"}, + {ContentTopic: "test4"}, + }, + PagingInfo: &pb.PagingInfo{Cursor: cursor2, PageSize: 4, Direction: pb.PagingInfo_BACKWARD}, + StartTime: insertTime.Add(-39 * time.Second).UnixNano(), + }) + require.NoError(t, err) + require.Len(t, msgs, 3) // due to pageSize + // Check:this also makes returned messages are sorted ascending + for ind, msg := range msgs { + require.Equal(t, msg.Message.ContentTopic, fmt.Sprintf("test%d", ind+2)) // test2,test3,test4 + } + + // checking most recent timestamp in db + timestamp, err := store.MostRecentTimestamp() + require.NoError(t, err) + require.Equal(t, timestamp, insertTime.UnixNano()) }