diff --git a/docs/api/store.md b/docs/api/store.md index 3a0781c6..a2ff27d1 100644 --- a/docs/api/store.md +++ b/docs/api/store.md @@ -24,23 +24,24 @@ query := store.Query{ } result, err := wakuNode.Store().Query(context.Background(), query, WithPaging(true, 20)); +if err != nil { + // Handle error ... +} + for { + hasNext, err := result.Next(ctx) if err != nil { - fmt.Println(err) + // Handle error ... break } - if len(result.messages) == 0 { - // No more messages available + if !hasNext { // No more messages available break } - for _, msg := range result.messages { - fmt.Println(string(msg.Payload)) + for _, msg := range result.GetMessages() { + // Do something with the messages } - - // Fetch more messages - result, err := wakuNode.Store().Next(context.Background(), result) } ``` diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index c360a659..00f70849 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -26,11 +26,12 @@ type Query struct { // Result represents a valid response from a store node type Result struct { + started bool Messages []*pb.WakuMessage - - query *pb.HistoryQuery - cursor *pb.Index - peerId peer.ID + store Store + query *pb.HistoryQuery + cursor *pb.Index + peerId peer.ID } func (r *Result) Cursor() *pb.Index { @@ -49,6 +50,34 @@ func (r *Result) Query() *pb.HistoryQuery { return r.query } +func (r *Result) Next(ctx context.Context) (bool, error) { + if !r.started { + r.started = true + return len(r.Messages) != 0, nil + } + + if r.IsComplete() { + return false, nil + } + + newResult, err := r.store.Next(ctx, r) + if err != nil { + return false, err + } + + r.cursor = newResult.cursor + r.Messages = newResult.Messages + + return true, nil +} + +func (r *Result) GetMessages() []*pb.WakuMessage { + if !r.started { + return nil + } + return r.Messages +} + type criteriaFN = func(msg *pb.WakuMessage) (bool, error) type HistoryRequestParameters struct { @@ -254,6 +283,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR store.log.Info("waku.store retrieved", logging.HexArray("hashes", messageIDs)) result := &Result{ + store: store, Messages: response.Messages, query: q, peerId: params.selectedPeer, @@ -309,6 +339,8 @@ func (store *WakuStore) Find(ctx context.Context, query Query, cb criteriaFN, op func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) { if r.IsComplete() { return &Result{ + store: store, + started: true, Messages: []*pb.WakuMessage{}, cursor: nil, query: r.query, @@ -343,6 +375,8 @@ func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) { } result := &Result{ + started: true, + store: store, Messages: response.Messages, query: q, peerId: r.PeerID(), diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index b31e44b2..e7d3363b 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -116,20 +116,109 @@ func TestWakuStoreProtocolNext(t *testing.T) { response, err = s2.Next(ctx, response) require.NoError(t, err) require.Len(t, response.Messages, 2) + require.True(t, response.started) require.Equal(t, response.Messages[0].Timestamp, msg3.Timestamp) require.Equal(t, response.Messages[1].Timestamp, msg4.Timestamp) response, err = s2.Next(ctx, response) require.NoError(t, err) + require.True(t, response.started) require.Len(t, response.Messages, 1) require.Equal(t, response.Messages[0].Timestamp, msg5.Timestamp) // No more records available response, err = s2.Next(ctx, response) require.NoError(t, err) + require.True(t, response.started) require.Len(t, response.Messages, 0) } +func TestWakuStoreResult(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + db := MemoryDB(t) + s1 := NewWakuStore(host1, nil, db, timesource.NewDefaultClock(), utils.Logger()) + err = s1.Start(ctx) + require.NoError(t, err) + + topic1 := "1" + pubsubTopic1 := "topic1" + + msg1 := tests.CreateWakuMessage(topic1, 1) + msg2 := tests.CreateWakuMessage(topic1, 2) + msg3 := tests.CreateWakuMessage(topic1, 3) + msg4 := tests.CreateWakuMessage(topic1, 4) + msg5 := tests.CreateWakuMessage(topic1, 5) + + s1.MsgC <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg4, utils.GetUnixEpoch(), pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg5, utils.GetUnixEpoch(), pubsubTopic1) + + host2, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta4)) + require.NoError(t, err) + + s2 := NewWakuStore(host2, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s2.Start(ctx) + require.NoError(t, err) + defer s2.Stop() + + q := Query{ + Topic: pubsubTopic1, + ContentTopics: []string{topic1}, + } + + result, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2)) + require.NoError(t, err) + require.False(t, result.started) + require.Len(t, result.GetMessages(), 0) + require.False(t, result.IsComplete()) + + // First iteration + hasNext, err := result.Next(ctx) + require.NoError(t, err) + require.True(t, result.started) + require.True(t, hasNext) + require.Len(t, result.GetMessages(), 2) + require.Equal(t, result.GetMessages()[0].Timestamp, msg1.Timestamp) + require.Equal(t, result.GetMessages()[1].Timestamp, msg2.Timestamp) + require.False(t, result.IsComplete()) + + // Second iteration + hasNext, err = result.Next(ctx) + require.NoError(t, err) + require.True(t, result.started) + require.True(t, hasNext) + require.Len(t, result.GetMessages(), 2) + require.Equal(t, result.GetMessages()[0].Timestamp, msg3.Timestamp) + require.Equal(t, result.GetMessages()[1].Timestamp, msg4.Timestamp) + require.False(t, result.IsComplete()) + + // Third iteration + hasNext, err = result.Next(ctx) + require.NoError(t, err) + require.True(t, result.started) + require.True(t, hasNext) + require.Len(t, result.GetMessages(), 1) + require.Equal(t, result.GetMessages()[0].Timestamp, msg5.Timestamp) + require.True(t, result.IsComplete()) + + // No more records available + hasNext, err = result.Next(ctx) + require.NoError(t, err) + require.True(t, result.started) + require.False(t, hasNext) +} + func TestWakuStoreProtocolFind(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()