refactor: add Next() and create Query and Result structs (#117)

This commit is contained in:
Richard Ramos 2021-10-31 15:00:38 -04:00 committed by GitHub
parent d150123f21
commit 98255060f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 180 additions and 38 deletions

View File

@ -12,7 +12,7 @@ require (
github.com/libp2p/go-libp2p-core v0.9.0
github.com/multiformats/go-multiaddr v0.4.0
github.com/rivo/tview v0.0.0-20210312174852-ae9464cc3598
github.com/status-im/go-waku v0.0.0-20211012131444-baf57d82a30a
github.com/status-im/go-waku v0.0.0-20211030231903-d150123f21a0
golang.org/x/crypto v0.0.0-20210813211128-0a44fdfbc16e
google.golang.org/protobuf v1.27.1
)

View File

@ -36,7 +36,7 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE=
contrib.go.opencensus.io/exporter/prometheus v0.3.0/go.mod h1:rpCPVQKhiyH8oomWgm34ZmgIdZa8OVYO5WAIygPbBBE=
contrib.go.opencensus.io/exporter/prometheus v0.4.0/go.mod h1:o7cosnyfuPVK0tB8q0QmaQNhGnptITnPQB+z1+qeFB0=
dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3/go.mod h1:Yl+fi1br7+Rr3LqpNJf1/uxUdtRUV+Tnj0o93V2B9MU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBrvjyP0v+ecvNYvCpyZgu5/xkfAUhi6wJj28eUfSU=
@ -474,7 +474,6 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
@ -760,6 +759,7 @@ github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m
github.com/mattn/go-runewidth v0.0.10 h1:CoZ3S2P7pvtP45xOtBw+/mDL2z0RKI576gSkzRRpdGg=
github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
@ -936,7 +936,6 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
github.com/prometheus/client_golang v1.6.0/go.mod h1:ZLOG9ck3JLRdB5MgO8f+lLTe83AXG6ro35rLTxvnIl4=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.9.0/go.mod h1:FqZLKOZnGdFAhOK4nqGHa7D66IdsO+O441Eve7ptJDU=
github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU=
@ -955,7 +954,6 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc=
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
@ -968,13 +966,11 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/statsd_exporter v0.20.0/go.mod h1:YL3FWCG8JBBtaUSxAg4Gz2ZYu22bS84XM89ZQXXTWmQ=
github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3MLCHmSHelCh9hSGYNLTQ=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
@ -1045,10 +1041,10 @@ github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc=
github.com/status-im/go-waku v0.0.0-20211012131444-baf57d82a30a h1:7P3E7XF9kJ05pPI6gzAk5wzh8i5OZ/BDYjY7srroVQ4=
github.com/status-im/go-waku v0.0.0-20211012131444-baf57d82a30a/go.mod h1:CZV26mf0bzp94VE9QxxhpM4uFByn4UX698VUPalZMgQ=
github.com/status-im/go-waku-rendezvous v0.0.0-20211005020656-b53661c58574 h1:v2gpjWKyI+vZZugjjhPDqIhg6uNrGLusHh3ilvbv8/Y=
github.com/status-im/go-waku-rendezvous v0.0.0-20211005020656-b53661c58574/go.mod h1:Fa1uJjMz9MpfZc2tC5xdN9q90xg1VphSnevxWiBbFO0=
github.com/status-im/go-waku v0.0.0-20211030231903-d150123f21a0 h1:s359WxDL1G0Qar/S7lYa1Ip/+vR5b9Lcp+fcuv4aNvY=
github.com/status-im/go-waku v0.0.0-20211030231903-d150123f21a0/go.mod h1:A0lI3uZYLKrXiviVkwGgBdT8b9HLcW3U/xUcE/4665k=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432 h1:cbNFU38iimo9fY4B7CdF/fvIF6tNPJIZjBbpfmW2EY4=
github.com/status-im/go-waku-rendezvous v0.0.0-20211018070416-a93f3b70c432/go.mod h1:A8t3i0CUGtXCA0aiLsP7iyikmk/KaD/2XVvNJqGCU20=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
@ -1330,7 +1326,6 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@ -78,7 +78,7 @@ func main() {
opts := []node.WakuNodeOption{
node.WithPrivateKey(prvKey),
node.WithHostAddress([]*net.TCPAddr{hostAddr}),
node.WithWakuStore(false, true),
node.WithWakuStore(false, false),
node.WithKeepAlive(time.Duration(*keepAliveFlag) * time.Second),
}
@ -201,7 +201,11 @@ func main() {
ui.displayMessage("Querying historic messages")
tCtx, _ := context.WithTimeout(ctx, 5*time.Second)
response, err := wakuNode.Query(tCtx, []string{*contentTopicFlag}, 0, 0,
q := store.Query{
ContentTopics: []string{*contentTopicFlag},
}
response, err := wakuNode.Query(tCtx, q,
store.WithAutomaticRequestId(),
store.WithPeer(*storeNodeId),
store.WithPaging(true, 0))

View File

@ -344,25 +344,20 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.
return &info.ID, w.addPeer(info, protocolID)
}
func (w *WakuNode) Query(ctx context.Context, contentTopics []string, startTime float64, endTime float64, opts ...store.HistoryRequestOption) (*pb.HistoryResponse, error) {
func (w *WakuNode) Query(ctx context.Context, query store.Query, opts ...store.HistoryRequestOption) (*store.Result, error) {
if w.store == nil {
return nil, errors.New("WakuStore is not set")
}
query := new(pb.HistoryQuery)
for _, ct := range contentTopics {
query.ContentFilters = append(query.ContentFilters, &pb.ContentFilter{ContentTopic: ct})
return w.store.Query(ctx, query, opts...)
}
query.StartTime = startTime
query.EndTime = endTime
query.PagingInfo = new(pb.PagingInfo)
result, err := w.store.Query(ctx, query, opts...)
if err != nil {
return nil, err
func (w *WakuNode) Next(ctx context.Context, result *store.Result) (*store.Result, error) {
if w.store == nil {
return nil, errors.New("WakuStore is not set")
}
return result, nil
return w.store.Next(ctx, result)
}
func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error {

View File

@ -188,6 +188,33 @@ type MessageProvider interface {
Stop()
}
type Query struct {
Topic string
ContentTopics []string
StartTime float64
EndTime float64
}
type Result struct {
Messages []*pb.WakuMessage
query *pb.HistoryQuery
cursor *pb.Index
peerId peer.ID
}
func (r *Result) Cursor() *pb.Index {
return r.cursor
}
func (r *Result) PeerID() peer.ID {
return r.peerId
}
func (r *Result) Query() *pb.HistoryQuery {
return r.query
}
type IndexedWakuMessage struct {
msg *pb.WakuMessage
index *pb.Index
@ -493,7 +520,19 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
return historyResponseRPC.Response, nil
}
func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...HistoryRequestOption) (*pb.HistoryResponse, error) {
func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) {
q := &pb.HistoryQuery{
PubsubTopic: query.Topic,
ContentFilters: []*pb.ContentFilter{},
StartTime: query.StartTime,
EndTime: query.EndTime,
PagingInfo: &pb.PagingInfo{},
}
for _, cf := range query.ContentTopics {
q.ContentFilters = append(q.ContentFilters, &pb.ContentFilter{ContentTopic: cf})
}
params := new(HistoryRequestParameters)
params.s = store
@ -511,10 +550,6 @@ func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...H
return nil, ErrInvalidId
}
if q.PagingInfo == nil {
q.PagingInfo = &pb.PagingInfo{}
}
if params.cursor != nil {
q.PagingInfo.Cursor = params.cursor
}
@ -527,7 +562,55 @@ func (store *WakuStore) Query(ctx context.Context, q *pb.HistoryQuery, opts ...H
q.PagingInfo.PageSize = params.pageSize
return store.queryFrom(ctx, q, params.selectedPeer, params.requestId)
response, err := store.queryFrom(ctx, q, params.selectedPeer, params.requestId)
if err != nil {
return nil, err
}
if response.Error == pb.HistoryResponse_INVALID_CURSOR {
return nil, errors.New("invalid cursor")
}
return &Result{
Messages: response.Messages,
cursor: response.PagingInfo.Cursor,
query: q,
peerId: params.selectedPeer,
}, nil
}
func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
q := &pb.HistoryQuery{
PubsubTopic: r.query.PubsubTopic,
ContentFilters: r.query.ContentFilters,
StartTime: r.query.StartTime,
EndTime: r.query.EndTime,
PagingInfo: &pb.PagingInfo{
PageSize: r.query.PagingInfo.PageSize,
Direction: r.query.PagingInfo.Direction,
Cursor: &pb.Index{
Digest: r.cursor.Digest,
ReceiverTime: r.cursor.ReceiverTime,
SenderTime: r.cursor.SenderTime,
},
},
}
response, err := store.queryFrom(ctx, q, r.peerId, protocol.GenerateRequestId())
if err != nil {
return nil, err
}
if response.Error == pb.HistoryResponse_INVALID_CURSOR {
return nil, errors.New("invalid cursor")
}
return &Result{
Messages: response.Messages,
cursor: response.PagingInfo.Cursor,
query: q,
peerId: r.peerId,
}, nil
}
func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, candidateList []peer.ID) (*pb.HistoryResponse, error) {

View File

@ -47,14 +47,79 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
require.NoError(t, err)
response, err := s2.Query(ctx, &pb.HistoryQuery{
PubsubTopic: pubsubTopic1,
ContentFilters: []*pb.ContentFilter{{
ContentTopic: topic1,
}},
}, DefaultOptions()...)
q := Query{
Topic: pubsubTopic1,
ContentTopics: []string{topic1},
}
response, err := s2.Query(ctx, q, DefaultOptions()...)
require.NoError(t, err)
require.Len(t, response.Messages, 1)
require.Equal(t, msg, response.Messages[0])
}
func TestWakuStoreProtocolNext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(true, nil)
s1.Start(ctx, host1)
defer s1.Stop()
topic1 := "1"
pubsubTopic1 := "topic1"
msg1 := tests.CreateWakuMessage(topic1, float64(1))
msg2 := tests.CreateWakuMessage(topic1, float64(2))
msg3 := tests.CreateWakuMessage(topic1, float64(3))
msg4 := tests.CreateWakuMessage(topic1, float64(4))
msg5 := tests.CreateWakuMessage(topic1, float64(5))
s1.MsgC <- protocol.NewEnvelope(msg1, pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg2, pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg3, pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg4, pubsubTopic1)
s1.MsgC <- protocol.NewEnvelope(msg5, pubsubTopic1)
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
require.NoError(t, err)
s2 := NewWakuStore(false, nil)
s2.Start(ctx, host2)
defer s2.Stop()
q := Query{
Topic: pubsubTopic1,
ContentTopics: []string{topic1},
}
response, err := s2.Query(ctx, q, WithAutomaticPeerSelection(), WithAutomaticRequestId(), WithPaging(true, 2))
require.NoError(t, err)
require.Len(t, response.Messages, 2)
require.Equal(t, response.Messages[0].Timestamp, msg1.Timestamp)
require.Equal(t, response.Messages[1].Timestamp, msg2.Timestamp)
response, err = s2.Next(ctx, response)
require.NoError(t, err)
require.Len(t, response.Messages, 2)
require.Equal(t, response.Messages[0].Timestamp, msg3.Timestamp)
require.Equal(t, response.Messages[1].Timestamp, msg4.Timestamp)
response, err = s2.Next(ctx, response)
require.NoError(t, err)
require.Len(t, response.Messages, 1)
require.Equal(t, response.Messages[0].Timestamp, msg5.Timestamp)
// No more records available
response, err = s2.Next(ctx, response)
require.NoError(t, err)
require.Len(t, response.Messages, 0)
}