fix(store): simplify cursor

This commit is contained in:
Richard Ramos 2024-05-16 18:53:32 -04:00
parent 879bc08426
commit 05d247d272
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
4 changed files with 49 additions and 56 deletions

View File

@ -28,20 +28,15 @@ if err != nil {
// Handle error ... // Handle error ...
} }
for { for !result.IsComplete() {
hasNext, err := result.Next(ctx)
if err != nil {
// Handle error ...
break
}
if !hasNext { // No more messages available
break
}
for _, msg := range result.GetMessages() { for _, msg := range result.GetMessages() {
// Do something with the messages // Do something with the messages
} }
err := result.Next(ctx)
if err != nil {
// Handle error ...
}
} }
``` ```

View File

@ -215,8 +215,7 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
if r.IsComplete() { if r.IsComplete() {
return &Result{ return &Result{
store: s, store: s,
started: true, messages: nil,
messages: []*pb.WakuMessageKeyValue{},
cursor: nil, cursor: nil,
storeRequest: r.storeRequest, storeRequest: r.storeRequest,
storeResponse: r.storeResponse, storeResponse: r.storeResponse,
@ -234,7 +233,6 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
} }
result := &Result{ result := &Result{
started: true,
store: s, store: s,
messages: response.Messages, messages: response.Messages,
storeRequest: storeRequest, storeRequest: storeRequest,

View File

@ -78,7 +78,7 @@ func TestStoreClient(t *testing.T) {
startTime := utils.GetUnixEpoch(timesource.NewDefaultClock()) startTime := utils.GetUnixEpoch(timesource.NewDefaultClock())
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
msg := &pb.WakuMessage{ msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{byte(i), 1, 2, 3, 4, 5},
ContentTopic: "test", ContentTopic: "test",
Version: proto.Uint32(0), Version: proto.Uint32(0),
Timestamp: utils.GetUnixEpoch(timesource.NewDefaultClock()), Timestamp: utils.GetUnixEpoch(timesource.NewDefaultClock()),
@ -108,71 +108,71 @@ func TestStoreClient(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// -- First page: // -- First page:
hasNext, err := response.Next(ctx)
require.NoError(t, err)
require.True(t, hasNext)
require.False(t, response.IsComplete()) require.False(t, response.IsComplete())
require.Len(t, response.messages, 2) require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp()) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[1].GetTimestamp()) require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[1].GetTimestamp())
// -- Second page: err = response.Next(ctx)
hasNext, err = response.Next(ctx)
require.NoError(t, err) require.NoError(t, err)
require.True(t, hasNext)
// -- Second page:
require.False(t, response.IsComplete()) require.False(t, response.IsComplete())
require.Len(t, response.messages, 2) require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[2].GetTimestamp()) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[2].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[3].GetTimestamp()) require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[3].GetTimestamp())
// -- Third page: err = response.Next(ctx)
hasNext, err = response.Next(ctx)
require.NoError(t, err) require.NoError(t, err)
require.False(t, hasNext)
require.True(t, response.IsComplete()) // -- Third page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 1) require.Len(t, response.messages, 1)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[4].GetTimestamp()) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[4].GetTimestamp())
// -- Trying to continue a completed cursor err = response.Next(ctx)
hasNext, err = response.Next(ctx)
require.NoError(t, err) require.NoError(t, err)
require.False(t, hasNext)
// -- Trying to continue a completed cursor
require.True(t, response.IsComplete()) require.True(t, response.IsComplete())
require.Len(t, response.messages, 0)
err = response.Next(ctx)
require.NoError(t, err)
// Query messages with backward pagination // Query messages with backward pagination
response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(false, 2)) response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(false, 2))
require.NoError(t, err) require.NoError(t, err)
// -- First page: // -- First page:
hasNext, err = response.Next(ctx)
require.NoError(t, err)
require.True(t, hasNext)
require.False(t, response.IsComplete()) require.False(t, response.IsComplete())
require.Len(t, response.messages, 2) require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[3].GetTimestamp()) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[3].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[4].GetTimestamp()) require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[4].GetTimestamp())
// -- Second page: err = response.Next(ctx)
hasNext, err = response.Next(ctx)
require.NoError(t, err) require.NoError(t, err)
require.True(t, hasNext)
// -- Second page:
require.False(t, response.IsComplete()) require.False(t, response.IsComplete())
require.Len(t, response.messages, 2) require.Len(t, response.messages, 2)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[1].GetTimestamp()) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[1].GetTimestamp())
require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[2].GetTimestamp()) require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[2].GetTimestamp())
// -- Third page: err = response.Next(ctx)
hasNext, err = response.Next(ctx)
require.NoError(t, err) require.NoError(t, err)
require.False(t, hasNext)
require.True(t, response.IsComplete()) // -- Third page:
require.False(t, response.IsComplete())
require.Len(t, response.messages, 1) require.Len(t, response.messages, 1)
require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp()) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp())
// -- Trying to continue a completed cursor err = response.Next(ctx)
hasNext, err = response.Next(ctx) require.NoError(t, err)
// -- Trying to continue a completed cursor
err = response.Next(ctx)
require.NoError(t, err) require.NoError(t, err)
require.False(t, hasNext)
require.True(t, response.IsComplete()) require.True(t, response.IsComplete())
// No cursor should be returned if there are no messages that match the criteria // No cursor should be returned if there are no messages that match the criteria

View File

@ -9,7 +9,9 @@ import (
// Result represents a valid response from a store node // Result represents a valid response from a store node
type Result struct { type Result struct {
started bool noCursor bool
done bool
messages []*pb.WakuMessageKeyValue messages []*pb.WakuMessageKeyValue
store *WakuStore store *WakuStore
storeRequest *pb.StoreQueryRequest storeRequest *pb.StoreQueryRequest
@ -23,7 +25,7 @@ func (r *Result) Cursor() []byte {
} }
func (r *Result) IsComplete() bool { func (r *Result) IsComplete() bool {
return r.cursor == nil return r.noCursor && r.done
} }
func (r *Result) PeerID() peer.ID { func (r *Result) PeerID() peer.ID {
@ -38,30 +40,28 @@ func (r *Result) Response() *pb.StoreQueryResponse {
return r.storeResponse return r.storeResponse
} }
func (r *Result) Next(ctx context.Context) (bool, error) { func (r *Result) Next(ctx context.Context) error {
if !r.started { if r.noCursor {
r.started = true r.done = true
return len(r.messages) != 0, nil r.messages = nil
} return nil
if r.IsComplete() {
return false, nil
} }
newResult, err := r.store.next(ctx, r) newResult, err := r.store.next(ctx, r)
if err != nil { if err != nil {
return false, err return err
} }
r.cursor = newResult.cursor r.cursor = newResult.cursor
r.messages = newResult.messages r.messages = newResult.messages
return !r.IsComplete(), nil if r.cursor == nil {
r.noCursor = true
}
return nil
} }
func (r *Result) Messages() []*pb.WakuMessageKeyValue { func (r *Result) Messages() []*pb.WakuMessageKeyValue {
if !r.started {
return nil
}
return r.messages return r.messages
} }