mirror of https://github.com/status-im/go-waku.git
chore: improve pagination logic (#325)
This commit is contained in:
parent
2881d0cd5e
commit
5af5e89c08
@@ -324,7 +324,9 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
 	}
 	defer stmt.Close()
 
-	parameters = append(parameters, query.PagingInfo.PageSize)
+	pageSize := query.PagingInfo.PageSize + 1
+
+	parameters = append(parameters, pageSize)
 	rows, err := stmt.Query(parameters...)
 	if err != nil {
 		return nil, nil, err
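The hunk above makes the store ask the database for one row more than the requested page size; the extra row is never handed back to the caller, it only reveals whether another page exists. A minimal sketch of the pattern with plain slices standing in for SQL rows (all names here are illustrative, not part of the repository):

package main

import "fmt"

// paginate mimics the "LIMIT pageSize+1" trick: items plays the role of the rows
// fetched from the database, and the extra element only signals a next page.
func paginate(items []int, pageSize int) (page []int, hasMore bool) {
	if len(items) > pageSize {
		return items[:pageSize], true
	}
	return items, false
}

func main() {
	page, more := paginate([]int{1, 2, 3, 4, 5}, 4)
	fmt.Println(page, more) // [1 2 3 4] true
}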
@@ -338,14 +340,16 @@ func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, err
 		}
 		result = append(result, record)
 	}
 
 	defer rows.Close()
 
 	cursor := &pb.Index{}
 	if len(result) != 0 {
+		if len(result) > int(query.PagingInfo.PageSize) {
+			result = result[0:query.PagingInfo.PageSize]
 			lastMsgIdx := len(result) - 1
 			cursor = protocol.NewEnvelope(result[lastMsgIdx].Message, result[lastMsgIdx].ReceiverTime, result[lastMsgIdx].PubsubTopic).Index()
 		}
+	}
 
 	// The retrieved messages list should always be in chronological order
 	if query.PagingInfo.Direction == pb.PagingInfo_BACKWARD {
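With that extra row available, the hunk above only builds a cursor when the query returned more rows than the page size: the slice is trimmed back to the page size and the cursor points at the last message that is actually returned; otherwise the cursor stays as the zero-valued &pb.Index{}. A hedged sketch of the rule, with StoredMessage and the envelope index reduced to local stubs:

package main

import "fmt"

type storedMessage struct{ id byte } // stub standing in for persistence.StoredMessage
type index struct{ Digest []byte }   // stub standing in for pb.Index

// pageWithCursor mirrors the new rule: an index with an empty digest means "last page".
func pageWithCursor(result []storedMessage, pageSize int) ([]storedMessage, *index) {
	cursor := &index{}
	if len(result) > pageSize {
		result = result[:pageSize]
		last := result[len(result)-1]
		cursor = &index{Digest: []byte{last.id}} // stand-in for protocol.NewEnvelope(...).Index()
	}
	return result, cursor
}

func main() {
	page, cursor := pageWithCursor([]storedMessage{{1}, {2}, {3}}, 2)
	fmt.Println(len(page), len(cursor.Digest) != 0) // 2 true: a third row exists, so there is a next page
}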
@@ -131,6 +131,10 @@ func (r *Result) Cursor() *pb.Index {
 	return r.cursor
 }
 
+func (r *Result) IsComplete() bool {
+	return len(r.cursor.Digest) == 0
+}
+
 func (r *Result) PeerID() peer.ID {
 	return r.peerId
 }
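IsComplete treats an empty cursor digest as the end of the result set, which lines up with the zero-valued &pb.Index{} the database layer now returns for a final page. A short illustrative sketch, written as if it sat inside the store package (the cursor field is unexported there), assuming Result values can be built this way for the sake of the example:

	complete := &Result{cursor: &pb.Index{}}
	fmt.Println(complete.IsComplete()) // true: no digest, nothing left to fetch

	partial := &Result{cursor: &pb.Index{Digest: []byte{0x01}}}
	fmt.Println(partial.IsComplete()) // false: the digest identifies where the next page starts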
@@ -480,6 +484,15 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
 // This function is useful for iterating over results without having to manually
 // specify the cursor and pagination order and max number of results
 func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
+	if r.IsComplete() {
+		return &Result{
+			Messages: []*pb.WakuMessage{},
+			cursor:   &pb.Index{},
+			query:    r.query,
+			peerId:   r.PeerID(),
+		}, nil
+	}
+
 	q := &pb.HistoryQuery{
 		PubsubTopic:    r.Query().PubsubTopic,
 		ContentFilters: r.Query().ContentFilters,
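The guard added to Next means calling it on a finished result returns an empty Result instead of re-running the final query. Combined with IsComplete this allows a simple drain loop; drainStore below is a hypothetical helper, not part of this commit, written as if it lived alongside the store code, with option handling omitted and handle standing in for whatever the caller does with each message:

// drainStore pages through a store query until IsComplete reports the end.
func drainStore(ctx context.Context, s *WakuStore, q Query, handle func(*pb.WakuMessage)) error {
	result, err := s.Query(ctx, q)
	if err != nil {
		return err
	}
	for {
		for _, msg := range result.Messages {
			handle(msg) // process the current page
		}
		if result.IsComplete() {
			return nil // empty cursor: nothing left to fetch
		}
		if result, err = s.Next(ctx, result); err != nil {
			return err
		}
	}
}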
@@ -86,7 +86,7 @@ func TestForwardPagination(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, messages, 10)
 	require.Equal(t, msgList[9].Message(), messages[9])
-	require.Equal(t, msgList[9].Index(), newPagingInfo.Cursor)
+	require.Equal(t, &pb.Index{}, newPagingInfo.Cursor)
 	require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
 	require.Equal(t, uint64(10), newPagingInfo.PageSize)
 
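From this point on, the tests expect the zero-valued &pb.Index{} as the cursor whenever the returned page is the final one. A caller working with the raw PagingInfo rather than the Result wrapper could apply the same convention; hasNextPage below is a hypothetical helper, not part of the repository:

// hasNextPage mirrors what these tests assert: an index with an empty digest
// marks the final page, so only a non-empty digest points at more data.
func hasNextPage(cursor *pb.Index) bool {
	return cursor != nil && len(cursor.Digest) != 0
}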
@@ -105,7 +105,7 @@ func TestForwardPagination(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, messages, 6)
 	require.Equal(t, []*pb.WakuMessage{msgList[4].Message(), msgList[5].Message(), msgList[6].Message(), msgList[7].Message(), msgList[8].Message(), msgList[9].Message()}, messages)
-	require.Equal(t, msgList[9].Index(), newPagingInfo.Cursor)
+	require.Equal(t, &pb.Index{}, newPagingInfo.Cursor)
 	require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
 	require.Equal(t, uint64(6), newPagingInfo.PageSize)
 
@@ -141,7 +141,7 @@ func TestForwardPagination(t *testing.T) {
 	messages, newPagingInfo, err = findMessages(&pb.HistoryQuery{PagingInfo: pagingInfo}, singleItemDB)
 	require.NoError(t, err)
 	require.Len(t, messages, 1)
-	require.Equal(t, msgList[0].Index(), newPagingInfo.Cursor)
+	require.Equal(t, &pb.Index{}, newPagingInfo.Cursor)
 	require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
 	require.Equal(t, uint64(1), newPagingInfo.PageSize)
 }
@@ -182,7 +182,7 @@ func TestBackwardPagination(t *testing.T) {
 	require.Len(t, messages, 10)
 	require.Equal(t, msgList[0].Message(), messages[0])
 	require.Equal(t, msgList[9].Message(), messages[9])
-	require.Equal(t, msgList[0].Index(), newPagingInfo.Cursor)
+	require.Equal(t, &pb.Index{}, newPagingInfo.Cursor)
 	require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
 	require.Equal(t, uint64(10), newPagingInfo.PageSize)
 
@@ -201,7 +201,7 @@ func TestBackwardPagination(t *testing.T) {
 	require.NoError(t, err)
 	require.Len(t, messages, 3)
 	require.Equal(t, []*pb.WakuMessage{msgList[0].Message(), msgList[1].Message(), msgList[2].Message()}, messages)
-	require.Equal(t, msgList[0].Index(), newPagingInfo.Cursor)
+	require.Equal(t, &pb.Index{}, newPagingInfo.Cursor)
 	require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
 	require.Equal(t, uint64(3), newPagingInfo.PageSize)
 
@@ -237,7 +237,7 @@ func TestBackwardPagination(t *testing.T) {
 	messages, newPagingInfo, err = findMessages(&pb.HistoryQuery{PagingInfo: pagingInfo}, singleItemDB)
 	require.NoError(t, err)
 	require.Len(t, messages, 1)
-	require.Equal(t, msgList[0].Index(), newPagingInfo.Cursor)
+	require.Equal(t, &pb.Index{}, newPagingInfo.Cursor)
 	require.Equal(t, pagingInfo.Direction, newPagingInfo.Direction)
 	require.Equal(t, uint64(1), newPagingInfo.PageSize)
 }