diff --git a/waku/persistence/store.go b/waku/persistence/store.go index 464762bf..85bff55a 100644 --- a/waku/persistence/store.go +++ b/waku/persistence/store.go @@ -324,7 +324,9 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err } defer stmt.Close() - parameters = append(parameters, query.PagingInfo.PageSize) + pageSize := query.PagingInfo.PageSize + 1 + + parameters = append(parameters, pageSize) rows, err := stmt.Query(parameters...) if err != nil { return nil, nil, err @@ -338,13 +340,15 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err } result = append(result, record) } - defer rows.Close() cursor := &pb.Index{} if len(result) != 0 { - lastMsgIdx := len(result) - 1 - cursor = protocol.NewEnvelope(result[lastMsgIdx].Message, result[lastMsgIdx].ReceiverTime, result[lastMsgIdx].PubsubTopic).Index() + if len(result) > int(query.PagingInfo.PageSize) { + result = result[0:query.PagingInfo.PageSize] + lastMsgIdx := len(result) - 1 + cursor = protocol.NewEnvelope(result[lastMsgIdx].Message, result[lastMsgIdx].ReceiverTime, result[lastMsgIdx].PubsubTopic).Index() + } } // The retrieved messages list should always be in chronological order diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 8f7f34fe..11743e3e 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -131,6 +131,10 @@ func (r *Result) Cursor() *pb.Index { return r.cursor } +func (r *Result) IsComplete() bool { + return len(r.cursor.Digest) == 0 +} + func (r *Result) PeerID() peer.ID { return r.peerId } @@ -480,6 +484,15 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR // This function is useful for iterating over results without having to manually // specify the cursor and pagination order and max number of results func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) { + if r.IsComplete() { + return &Result{ + Messages: []*pb.WakuMessage{}, + cursor: &pb.Index{}, + query: r.query, + peerId: r.PeerID(), + }, nil + } + q := &pb.HistoryQuery{ PubsubTopic: r.Query().PubsubTopic, ContentFilters: r.Query().ContentFilters, diff --git a/waku/v2/protocol/store/waku_store_pagination_test.go b/waku/v2/protocol/store/waku_store_pagination_test.go index c1bee493..2caa4d94 100644 --- a/waku/v2/protocol/store/waku_store_pagination_test.go +++ b/waku/v2/protocol/store/waku_store_pagination_test.go @@ -86,7 +86,7 @@ func TestForwardPagination(t *testing.T) { require.NoError(t, err) require.Len(t, messages, 10) require.Equal(t, msgList[9].Message(), messages[9]) - require.Equal(t, msgList[9].Index(), newPagingInfo.Cursor) + require.Equal(t, &pb.Index{}, newPagingInfo.Cursor) require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) require.Equal(t, uint64(10), newPagingInfo.PageSize) @@ -105,7 +105,7 @@ func TestForwardPagination(t *testing.T) { require.NoError(t, err) require.Len(t, messages, 6) require.Equal(t, []*pb.WakuMessage{msgList[4].Message(), msgList[5].Message(), msgList[6].Message(), msgList[7].Message(), msgList[8].Message(), msgList[9].Message()}, messages) - require.Equal(t, msgList[9].Index(), newPagingInfo.Cursor) + require.Equal(t, &pb.Index{}, newPagingInfo.Cursor) require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) require.Equal(t, uint64(6), newPagingInfo.PageSize) @@ -141,7 +141,7 @@ func TestForwardPagination(t *testing.T) { messages, newPagingInfo, err = findMessages(&pb.HistoryQuery{PagingInfo: pagingInfo}, singleItemDB) require.NoError(t, err) require.Len(t, messages, 1) - require.Equal(t, msgList[0].Index(), newPagingInfo.Cursor) + require.Equal(t, &pb.Index{}, newPagingInfo.Cursor) require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) require.Equal(t, uint64(1), newPagingInfo.PageSize) } @@ -182,7 +182,7 @@ func TestBackwardPagination(t *testing.T) { require.Len(t, messages, 10) require.Equal(t, msgList[0].Message(), messages[0]) require.Equal(t, msgList[9].Message(), messages[9]) - require.Equal(t, msgList[0].Index(), newPagingInfo.Cursor) + require.Equal(t, &pb.Index{}, newPagingInfo.Cursor) require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) require.Equal(t, uint64(10), newPagingInfo.PageSize) @@ -201,7 +201,7 @@ func TestBackwardPagination(t *testing.T) { require.NoError(t, err) require.Len(t, messages, 3) require.Equal(t, []*pb.WakuMessage{msgList[0].Message(), msgList[1].Message(), msgList[2].Message()}, messages) - require.Equal(t, msgList[0].Index(), newPagingInfo.Cursor) + require.Equal(t, &pb.Index{}, newPagingInfo.Cursor) require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) require.Equal(t, uint64(3), newPagingInfo.PageSize) @@ -237,7 +237,7 @@ func TestBackwardPagination(t *testing.T) { messages, newPagingInfo, err = findMessages(&pb.HistoryQuery{PagingInfo: pagingInfo}, singleItemDB) require.NoError(t, err) require.Len(t, messages, 1) - require.Equal(t, msgList[0].Index(), newPagingInfo.Cursor) + require.Equal(t, &pb.Index{}, newPagingInfo.Cursor) require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction) require.Equal(t, uint64(1), newPagingInfo.PageSize) }