diff --git a/docs/api/store.md b/docs/api/store.md index a2ff27d1..effeda4f 100644 --- a/docs/api/store.md +++ b/docs/api/store.md @@ -28,20 +28,15 @@ if err != nil { // Handle error ... } -for { - hasNext, err := result.Next(ctx) - if err != nil { - // Handle error ... - break - } - - if !hasNext { // No more messages available - break - } - +for !result.IsComplete() { for _, msg := range result.GetMessages() { // Do something with the messages } + + err := result.Next(ctx) + if err != nil { + // Handle error ... + } } ``` diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 5ebc62e4..4faec5cf 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -215,8 +215,7 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { if r.IsComplete() { return &Result{ store: s, - started: true, - messages: []*pb.WakuMessageKeyValue{}, + messages: nil, cursor: nil, storeRequest: r.storeRequest, storeResponse: r.storeResponse, @@ -234,7 +233,6 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { } result := &Result{ - started: true, store: s, messages: response.Messages, storeRequest: storeRequest, diff --git a/waku/v2/protocol/store/client_test.go b/waku/v2/protocol/store/client_test.go index d6685a25..035b7a6f 100644 --- a/waku/v2/protocol/store/client_test.go +++ b/waku/v2/protocol/store/client_test.go @@ -78,7 +78,7 @@ func TestStoreClient(t *testing.T) { startTime := utils.GetUnixEpoch(timesource.NewDefaultClock()) for i := 0; i < 5; i++ { msg := &pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, + Payload: []byte{byte(i), 1, 2, 3, 4, 5}, ContentTopic: "test", Version: proto.Uint32(0), Timestamp: utils.GetUnixEpoch(timesource.NewDefaultClock()), @@ -108,71 +108,71 @@ func TestStoreClient(t *testing.T) { require.NoError(t, err) // -- First page: - hasNext, err := response.Next(ctx) - require.NoError(t, err) - require.True(t, hasNext) require.False(t, response.IsComplete()) require.Len(t, response.messages, 2) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp()) require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[1].GetTimestamp()) - // -- Second page: - hasNext, err = response.Next(ctx) + err = response.Next(ctx) require.NoError(t, err) - require.True(t, hasNext) + + // -- Second page: require.False(t, response.IsComplete()) require.Len(t, response.messages, 2) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[2].GetTimestamp()) require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[3].GetTimestamp()) - // -- Third page: - hasNext, err = response.Next(ctx) + err = response.Next(ctx) require.NoError(t, err) - require.False(t, hasNext) - require.True(t, response.IsComplete()) + + // -- Third page: + require.False(t, response.IsComplete()) require.Len(t, response.messages, 1) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[4].GetTimestamp()) - // -- Trying to continue a completed cursor - hasNext, err = response.Next(ctx) + err = response.Next(ctx) require.NoError(t, err) - require.False(t, hasNext) + + // -- Trying to continue a completed cursor require.True(t, response.IsComplete()) + require.Len(t, response.messages, 0) + + err = response.Next(ctx) + require.NoError(t, err) // Query messages with backward pagination response, err = wakuStore.Query(ctx, FilterCriteria{ContentFilter: protocol.NewContentFilter(relay.DefaultWakuTopic, "test"), TimeStart: startTime, TimeEnd: endTime}, WithPaging(false, 2)) require.NoError(t, err) // -- First page: - hasNext, err = response.Next(ctx) - require.NoError(t, err) - require.True(t, hasNext) require.False(t, response.IsComplete()) require.Len(t, response.messages, 2) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[3].GetTimestamp()) require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[4].GetTimestamp()) - // -- Second page: - hasNext, err = response.Next(ctx) + err = response.Next(ctx) require.NoError(t, err) - require.True(t, hasNext) + + // -- Second page: require.False(t, response.IsComplete()) require.Len(t, response.messages, 2) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[1].GetTimestamp()) require.Equal(t, response.messages[1].Message.GetTimestamp(), messages[2].GetTimestamp()) - // -- Third page: - hasNext, err = response.Next(ctx) + err = response.Next(ctx) require.NoError(t, err) - require.False(t, hasNext) - require.True(t, response.IsComplete()) + + // -- Third page: + require.False(t, response.IsComplete()) require.Len(t, response.messages, 1) require.Equal(t, response.messages[0].Message.GetTimestamp(), messages[0].GetTimestamp()) - // -- Trying to continue a completed cursor - hasNext, err = response.Next(ctx) + err = response.Next(ctx) + require.NoError(t, err) + + // -- Trying to continue a completed cursor + err = response.Next(ctx) require.NoError(t, err) - require.False(t, hasNext) require.True(t, response.IsComplete()) // No cursor should be returned if there are no messages that match the criteria diff --git a/waku/v2/protocol/store/result.go b/waku/v2/protocol/store/result.go index df6342db..0b3cb36f 100644 --- a/waku/v2/protocol/store/result.go +++ b/waku/v2/protocol/store/result.go @@ -9,7 +9,9 @@ import ( // Result represents a valid response from a store node type Result struct { - started bool + noCursor bool + done bool + messages []*pb.WakuMessageKeyValue store *WakuStore storeRequest *pb.StoreQueryRequest @@ -23,7 +25,7 @@ func (r *Result) Cursor() []byte { } func (r *Result) IsComplete() bool { - return r.cursor == nil + return r.noCursor && r.done } func (r *Result) PeerID() peer.ID { @@ -38,30 +40,28 @@ func (r *Result) Response() *pb.StoreQueryResponse { return r.storeResponse } -func (r *Result) Next(ctx context.Context) (bool, error) { - if !r.started { - r.started = true - return len(r.messages) != 0, nil - } - - if r.IsComplete() { - return false, nil +func (r *Result) Next(ctx context.Context) error { + if r.noCursor { + r.done = true + r.messages = nil + return nil } newResult, err := r.store.next(ctx, r) if err != nil { - return false, err + return err } r.cursor = newResult.cursor r.messages = newResult.messages - return !r.IsComplete(), nil + if r.cursor == nil { + r.noCursor = true + } + + return nil } func (r *Result) Messages() []*pb.WakuMessageKeyValue { - if !r.started { - return nil - } return r.messages }