diff --git a/examples/chat2/go.mod b/examples/chat2/go.mod index 12491ec4..a71263e2 100644 --- a/examples/chat2/go.mod +++ b/examples/chat2/go.mod @@ -12,7 +12,7 @@ require ( github.com/libp2p/go-libp2p-core v0.9.0 github.com/multiformats/go-multiaddr v0.4.0 github.com/rivo/tview v0.0.0-20210312174852-ae9464cc3598 - github.com/status-im/go-waku v0.0.0-20211012131444-baf57d82a30a + github.com/status-im/go-waku v0.0.0-20211030231903-d150123f21a0 golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e google.golang.org/protobuf v1.27.1 ) diff --git a/examples/chat2/go.sum b/examples/chat2/go.sum index ec210438..e6eb2ae5 100644 --- a/examples/chat2/go.sum +++ b/examples/chat2/go.sum @@ -36,7 +36,7 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= -contrib.go.opencensus.io/exporter/prometheus v0.3.0/go.mod h1:rpCPVQKhiyH8oomWgm34ZmgIdZa8OVYO5WAIygPbBBE= +contrib.go.opencensus.io/exporter/prometheus v0.4.0/go.mod h1:o7cosnyfuPVK0tB8q0QmaQNhGnptITnPQB+z1+qeFB0= dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3/go.mod h1:Yl+fi1br7+Rr3LqpNJf1/uxUdtRUV+Tnj0o93V2B9MU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBrvjyP0v+ecvNYvCpyZgu5/xkfAUhi6wJj28eUfSU= @@ -474,7 +474,6 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= @@ -760,6 +759,7 @@ github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m github.com/mattn/go-runewidth v0.0.10 h1:CoZ3S2P7pvtP45xOtBw+/mDL2z0RKI576gSkzRRpdGg= github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg= github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -936,7 +936,6 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= -github.com/prometheus/client_golang v1.6.0/go.mod h1:ZLOG9ck3JLRdB5MgO8f+lLTe83AXG6ro35rLTxvnIl4= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.9.0/go.mod h1:FqZLKOZnGdFAhOK4nqGHa7D66IdsO+O441Eve7ptJDU= github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU= @@ -955,7 +954,6 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= -github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= @@ -968,13 +966,11 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= -github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/prometheus/statsd_exporter v0.20.0/go.mod h1:YL3FWCG8JBBtaUSxAg4Gz2ZYu22bS84XM89ZQXXTWmQ= github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3MLCHmSHelCh9hSGYNLTQ= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -1045,10 +1041,10 @@ github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= -github.com/status-im/go-waku v0.0.0-20211012131444-baf57d82a30a h1:7P3E7XF9kJ05pPI6gzAk5wzh8i5OZ/BDYjY7srroVQ4= -github.com/status-im/go-waku v0.0.0-20211012131444-baf57d82a30a/go.mod h1:CZV26mf0bzp94VE9QxxhpM4uFByn4UX698VUPalZMgQ= -github.com/status-im/go-waku-rendezvous v0.0.0-20211005020656-b53661c58574 h1:v2gpjWKyI+vZZugjjhPDqIhg6uNrGLusHh3ilvbv8/Y= -github.com/status-im/go-waku-rendezvous v0.0.0-20211005020656-b53661c58574/go.mod h1:Fa1uJjMz9MpfZc2tC5xdN9q90xg1VphSnevxWiBbFO0= +github.com/status-im/go-waku v0.0.0-20211030231903-d150123f21a0 h1:s359WxDL1G0Qar/S7lYa1Ip/+vR5b9Lcp+fcuv4aNvY= +github.com/status-im/go-waku v0.0.0-20211030231903-d150123f21a0/go.mod h1:A0lI3uZYLKrXiviVkwGgBdT8b9HLcW3U/xUcE/4665k= +github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4= +github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= @@ -1330,7 +1326,6 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/examples/chat2/main.go b/examples/chat2/main.go index bb0d21c3..53f63c35 100644 --- a/examples/chat2/main.go +++ b/examples/chat2/main.go @@ -78,7 +78,7 @@ func main() { opts := []node.WakuNodeOption{ node.WithPrivateKey(prvKey), node.WithHostAddress([]*net.TCPAddr{hostAddr}), - node.WithWakuStore(false, true), + node.WithWakuStore(false, false), node.WithKeepAlive(time.Duration(*keepAliveFlag) * time.Second), } @@ -201,7 +201,11 @@ func main() { ui.displayMessage("Querying historic messages") tCtx, _ := context.WithTimeout(ctx, 5*time.Second) - response, err := wakuNode.Query(tCtx, []string{*contentTopicFlag}, 0, 0, + + q := store.Query{ + ContentTopics: []string{*contentTopicFlag}, + } + response, err := wakuNode.Query(tCtx, q, store.WithAutomaticRequestId(), store.WithPeer(*storeNodeId), store.WithPaging(true, 0)) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 6c3ad738..5843eb33 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -344,25 +344,20 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer. return &info.ID, w.addPeer(info, protocolID) } -func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) { +func (w *WakuNode) Query(ctx context.Context, query store.Query, opts ...store.HistoryRequestOption) (*store.Result, error) { if w.store == nil { return nil, errors.New("WakuStore is not set") } - query := new(pb.HistoryQuery) + return w.store.Query(ctx, query, opts...) +} - for _, ct := range contentTopics { - query.ContentFilters = append(query.ContentFilters, &pb.ContentFilter{ContentTopic: ct}) +func (w *WakuNode) Next(ctx context.Context, result *store.Result) (*store.Result, error) { + if w.store == nil { + return nil, errors.New("WakuStore is not set") } - query.StartTime = startTime - query.EndTime = endTime - query.PagingInfo = new(pb.PagingInfo) - result, err := w.store.Query(ctx, query, opts...) - if err != nil { - return nil, err - } - return result, nil + return w.store.Next(ctx, result) } func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error { diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index dfa7e4f6..599341d8 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -188,6 +188,33 @@ type MessageProvider interface { Stop() } +type Query struct { + Topic string + ContentTopics []string + StartTime float64 + EndTime float64 +} + +type Result struct { + Messages []*pb.WakuMessage + + query *pb.HistoryQuery + cursor *pb.Index + peerId peer.ID +} + +func (r *Result) Cursor() *pb.Index { + return r.cursor +} + +func (r *Result) PeerID() peer.ID { + return r.peerId +} + +func (r *Result) Query() *pb.HistoryQuery { + return r.query +} + type IndexedWakuMessage struct { msg *pb.WakuMessage index *pb.Index @@ -493,7 +520,19 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec return historyResponseRPC.Response, nil } -func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...HistoryRequestOption) (*pb.HistoryResponse, error) { +func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) { + q := &pb.HistoryQuery{ + PubsubTopic: query.Topic, + ContentFilters: []*pb.ContentFilter{}, + StartTime: query.StartTime, + EndTime: query.EndTime, + PagingInfo: &pb.PagingInfo{}, + } + + for _, cf := range query.ContentTopics { + q.ContentFilters = append(q.ContentFilters, &pb.ContentFilter{ContentTopic: cf}) + } + params := new(HistoryRequestParameters) params.s = store @@ -511,10 +550,6 @@ func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...H return nil, ErrInvalidId } - if q.PagingInfo == nil { - q.PagingInfo = &pb.PagingInfo{} - } - if params.cursor != nil { q.PagingInfo.Cursor = params.cursor } @@ -527,7 +562,55 @@ func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...H q.PagingInfo.PageSize = params.pageSize - return store.queryFrom(ctx, q, params.selectedPeer, params.requestId) + response, err := store.queryFrom(ctx, q, params.selectedPeer, params.requestId) + if err != nil { + return nil, err + } + + if response.Error == pb.HistoryResponse_INVALID_CURSOR { + return nil, errors.New("invalid cursor") + } + + return &Result{ + Messages: response.Messages, + cursor: response.PagingInfo.Cursor, + query: q, + peerId: params.selectedPeer, + }, nil +} + +func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) { + q := &pb.HistoryQuery{ + PubsubTopic: r.query.PubsubTopic, + ContentFilters: r.query.ContentFilters, + StartTime: r.query.StartTime, + EndTime: r.query.EndTime, + PagingInfo: &pb.PagingInfo{ + PageSize: r.query.PagingInfo.PageSize, + Direction: r.query.PagingInfo.Direction, + Cursor: &pb.Index{ + Digest: r.cursor.Digest, + ReceiverTime: r.cursor.ReceiverTime, + SenderTime: r.cursor.SenderTime, + }, + }, + } + + response, err := store.queryFrom(ctx, q, r.peerId, protocol.GenerateRequestId()) + if err != nil { + return nil, err + } + + if response.Error == pb.HistoryResponse_INVALID_CURSOR { + return nil, errors.New("invalid cursor") + } + + return &Result{ + Messages: response.Messages, + cursor: response.PagingInfo.Cursor, + query: q, + peerId: r.peerId, + }, nil } func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, candidateList []peer.ID) (*pb.HistoryResponse, error) { diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 02d9a71c..edf062c4 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -47,14 +47,79 @@ func TestWakuStoreProtocolQuery(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) require.NoError(t, err) - response, err := s2.Query(ctx, &pb.HistoryQuery{ - PubsubTopic: pubsubTopic1, - ContentFilters: []*pb.ContentFilter{{ - ContentTopic: topic1, - }}, - }, DefaultOptions()...) + q := Query{ + Topic: pubsubTopic1, + ContentTopics: []string{topic1}, + } + + response, err := s2.Query(ctx, q, DefaultOptions()...) require.NoError(t, err) require.Len(t, response.Messages, 1) require.Equal(t, msg, response.Messages[0]) } + +func TestWakuStoreProtocolNext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s1 := NewWakuStore(true, nil) + s1.Start(ctx, host1) + defer s1.Stop() + + topic1 := "1" + pubsubTopic1 := "topic1" + + msg1 := tests.CreateWakuMessage(topic1, float64(1)) + msg2 := tests.CreateWakuMessage(topic1, float64(2)) + msg3 := tests.CreateWakuMessage(topic1, float64(3)) + msg4 := tests.CreateWakuMessage(topic1, float64(4)) + msg5 := tests.CreateWakuMessage(topic1, float64(5)) + + s1.MsgC <- protocol.NewEnvelope(msg1, pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg2, pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg3, pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg4, pubsubTopic1) + s1.MsgC <- protocol.NewEnvelope(msg5, pubsubTopic1) + + host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) + require.NoError(t, err) + + s2 := NewWakuStore(false, nil) + s2.Start(ctx, host2) + defer s2.Stop() + + q := Query{ + Topic: pubsubTopic1, + ContentTopics: []string{topic1}, + } + + response, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2)) + require.NoError(t, err) + require.Len(t, response.Messages, 2) + require.Equal(t, response.Messages[0].Timestamp, msg1.Timestamp) + require.Equal(t, response.Messages[1].Timestamp, msg2.Timestamp) + + response, err = s2.Next(ctx, response) + require.NoError(t, err) + require.Len(t, response.Messages, 2) + require.Equal(t, response.Messages[0].Timestamp, msg3.Timestamp) + require.Equal(t, response.Messages[1].Timestamp, msg4.Timestamp) + + response, err = s2.Next(ctx, response) + require.NoError(t, err) + require.Len(t, response.Messages, 1) + require.Equal(t, response.Messages[0].Timestamp, msg5.Timestamp) + + // No more records available + response, err = s2.Next(ctx, response) + require.NoError(t, err) + require.Len(t, response.Messages, 0) +}