From 83c4b5f04e84d31876fa1cedfa353665326f4baa Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 6 Jun 2024 19:43:27 -0400 Subject: [PATCH] chore: choose store version --- go/README.md | 6 +++- go/main.go | 91 +++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 73 insertions(+), 24 deletions(-) diff --git a/go/README.md b/go/README.md index d4e6224..3265dbf 100644 --- a/go/README.md +++ b/go/README.md @@ -15,6 +15,8 @@ make --content-topic=/waku/1/0xa0a6b41b/rfc26 \ --start-time=1717507412000000000 \ --end-time=1717593812000000000 + --pagesize=20 \ + --use-legacy=true ``` For the previous execution, you should see among the logs the following: @@ -38,5 +40,7 @@ docker run querytool:latest \ --content-topic=/waku/1/0xc95d2429/rfc26 \ --content-topic=/waku/1/0xa0a6b41b/rfc26 \ --start-time=1717507412000000000 \ - --end-time=1717593812000000000 + --end-time=1717593812000000000 \ + --pagesize=20 \ + --use-legacy=true ``` diff --git a/go/main.go b/go/main.go index c4fc8d3..debf52e 100644 --- a/go/main.go +++ b/go/main.go @@ -16,7 +16,9 @@ import ( "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/cliutils" "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" + "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -30,7 +32,9 @@ type Options struct { ContentTopics cli.StringSlice StartTime int64 EndTime int64 + PageSize uint64 StoreNode *multiaddr.Multiaddr + UseLegacy bool QueryTimeout time.Duration LogLevel string LogEncoding string @@ -75,6 +79,12 @@ var flags []cli.Flag = []cli.Flag{ Usage: "Query end time in nanoseconds", Destination: &options.EndTime, }), + altsrc.NewUint64Flag(&cli.Uint64Flag{ + Name: "pagesize", + Value: 20, + Usage: "Pagesize", + Destination: &options.PageSize, + }), cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{ Name: "storenode", Usage: "Multiaddr of a peer that supports store protocol", @@ -83,6 +93,11 @@ var flags []cli.Flag = []cli.Flag{ }, Required: true, }), + altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "use-legacy", + Usage: "Use legacy store", + Destination: &options.UseLegacy, + }), altsrc.NewDurationFlag(&cli.DurationFlag{ Name: "timeout", Usage: "timeout for each individual store query request", @@ -186,37 +201,67 @@ func Execute(ctx context.Context, opts Options) error { cnt := 0 - query := legacy_store.Query{ - PubsubTopic: options.PubSubTopic, - ContentTopics: options.ContentTopics.Value(), - StartTime: proto.Int64(options.StartTime), - EndTime: proto.Int64(options.EndTime), - } + if !options.UseLegacy { + criteria := store.FilterCriteria{ + ContentFilter: protocol.NewContentFilter(options.PubSubTopic, options.ContentTopics.Value()...), + TimeStart: proto.Int64(options.StartTime), + TimeEnd: proto.Int64(options.EndTime), + } - ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout) - result, err := wakuNode.LegacyStore().Query(ctx, query, - legacy_store.WithPeerAddr(*options.StoreNode), - legacy_store.WithPaging(false, 20), - legacy_store.WithRequestID([]byte{1, 2, 3, 4, 5, 6, 7, 8}), - ) - cancel() - if err != nil { - return err - } - - for { ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout) - hasNext, err := result.Next(ctx) - defer cancel() + result, err := wakuNode.Store().Query(ctx, criteria, + store.WithPeerAddr(*options.StoreNode), + store.WithPaging(false, options.PageSize), + store.IncludeData(false), + ) + cancel() if err != nil { return err } - if !hasNext { // No more messages available - break + for !result.IsComplete() { + cnt += len(result.Messages()) + + ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout) + err := result.Next(ctx) + cancel() + if err != nil { + return err + } } - cnt += len(result.GetMessages()) + } else { + query := legacy_store.Query{ + PubsubTopic: options.PubSubTopic, + ContentTopics: options.ContentTopics.Value(), + StartTime: proto.Int64(options.StartTime), + EndTime: proto.Int64(options.EndTime), + } + + ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout) + result, err := wakuNode.LegacyStore().Query(ctx, query, + legacy_store.WithPeerAddr(*options.StoreNode), + legacy_store.WithPaging(false, 20), + ) + cancel() + if err != nil { + return err + } + + for { + ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout) + hasNext, err := result.Next(ctx) + cancel() + if err != nil { + return err + } + + if !hasNext { // No more messages available + break + } + + cnt += len(result.GetMessages()) + } } logger.Info("TOTAL MESSAGES RETRIEVED", zap.Int("num", cnt))