From f0acee4d1dd3c36e0e0d1eba344bb85f2c5264ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Wed, 18 Sep 2024 17:09:37 -0400 Subject: [PATCH] feat: ratelimit store queries and add options to Next (#1221) --- waku/v2/node/wakunode2.go | 2 +- waku/v2/node/wakuoptions.go | 14 +++++++++ waku/v2/protocol/store/client.go | 45 ++++++++++++++++++++++----- waku/v2/protocol/store/client_test.go | 2 +- waku/v2/protocol/store/options.go | 9 ++++++ waku/v2/protocol/store/result.go | 4 +-- 6 files changed, 64 insertions(+), 12 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index f9dc443f..10153fd6 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -292,7 +292,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...) - w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log) + w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit) if params.storeFactory != nil { w.storeFactory = params.storeFactory diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 445065de..112cafe6 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -38,6 +38,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/time/rate" ) // Default UserAgent @@ -94,6 +95,8 @@ type WakuNodeParameters struct { enableStore bool messageProvider legacy_store.MessageProvider + storeRateLimit rate.Limit + enableRendezvousPoint bool rendezvousDB *rendezvous.DB @@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{ WithCircuitRelayParams(2*time.Second, 3*time.Minute), WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity), WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)), + WithWakuStoreRateLimit(8), // Value currently set in status.staging } // MultiAddresses return the list of multiaddresses configured in the node @@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption { } } +// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will +// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming +// the store protocol from a storenode +func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.storeRateLimit = value + return nil + } +} + // WithWakuStore enables the Waku V2 Store protocol and if the messages should // be stored or not in a message provider. func WithWakuStore() WakuNodeOption { diff --git a/waku/v2/protocol/store/client.go b/waku/v2/protocol/store/client.go index 92c47ff4..3398c4bf 100644 --- a/waku/v2/protocol/store/client.go +++ b/waku/v2/protocol/store/client.go @@ -19,6 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" + "golang.org/x/time/rate" "google.golang.org/protobuf/proto" ) @@ -69,14 +70,19 @@ type WakuStore struct { timesource timesource.Timesource log *zap.Logger pm *peermanager.PeerManager + + defaultRatelimit rate.Limit + rateLimiters map[peer.ID]*rate.Limiter } // NewWakuStore is used to instantiate a StoreV3 client -func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore { +func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore { s := new(WakuStore) s.log = log.Named("store-client") s.timesource = timesource s.pm = pm + s.defaultRatelimit = defaultRatelimit + s.rateLimiters = make(map[peer.ID]*rate.Limiter) if pm != nil { pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField) @@ -171,7 +177,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ return nil, err } - response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer) + response, err := s.queryFrom(ctx, storeRequest, params) if err != nil { return nil, err } @@ -211,7 +217,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt return len(result.messages) != 0, nil } -func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { +func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) { if r.IsComplete() { return &Result{ store: s, @@ -223,11 +229,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { }, nil } + params := new(Parameters) + params.selectedPeer = r.PeerID() + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + err := opt(params) + if err != nil { + return nil, err + } + } + storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest) storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID()) storeRequest.PaginationCursor = r.Cursor() - response, err := s.queryFrom(ctx, storeRequest, r.PeerID()) + response, err := s.queryFrom(ctx, storeRequest, params) if err != nil { return nil, err } @@ -245,16 +262,28 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) { } -func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) { - logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId)))) +func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) { + logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId)))) logger.Debug("sending store request") - stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300) + if !params.skipRatelimit { + rateLimiter, ok := s.rateLimiters[params.selectedPeer] + if !ok { + rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1) + s.rateLimiters[params.selectedPeer] = rateLimiter + } + err := rateLimiter.Wait(ctx) + if err != nil { + return nil, err + } + } + + stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok { - ps.AddConnFailure(selectedPeer) + ps.AddConnFailure(params.selectedPeer) } return nil, err } diff --git a/waku/v2/protocol/store/client_test.go b/waku/v2/protocol/store/client_test.go index 357dc270..733a27e9 100644 --- a/waku/v2/protocol/store/client_test.go +++ b/waku/v2/protocol/store/client_test.go @@ -69,7 +69,7 @@ func TestStoreClient(t *testing.T) { pm.Start(ctx) // Creating a storeV3 instance for all queries - wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger()) + wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger(), 8) wakuStore.SetHost(host) _, err = wakuRelay.Subscribe(context.Background(), protocol.NewContentFilter(pubsubTopic), relay.WithoutConsumer()) diff --git a/waku/v2/protocol/store/options.go b/waku/v2/protocol/store/options.go index b38afd53..b8deba47 100644 --- a/waku/v2/protocol/store/options.go +++ b/waku/v2/protocol/store/options.go @@ -19,6 +19,7 @@ type Parameters struct { pageLimit uint64 forward bool includeData bool + skipRatelimit bool } type RequestOption func(*Parameters) error @@ -115,6 +116,14 @@ func IncludeData(v bool) RequestOption { } } +// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429)) +func SkipRateLimit() RequestOption { + return func(params *Parameters) error { + params.skipRatelimit = true + return nil + } +} + // Default options to be used when querying a store node for results func DefaultOptions() []RequestOption { return []RequestOption{ diff --git a/waku/v2/protocol/store/result.go b/waku/v2/protocol/store/result.go index 5ea4765e..604d6453 100644 --- a/waku/v2/protocol/store/result.go +++ b/waku/v2/protocol/store/result.go @@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse { return r.storeResponse } -func (r *Result) Next(ctx context.Context) error { +func (r *Result) Next(ctx context.Context, opts ...RequestOption) error { if r.cursor == nil { r.done = true r.messages = nil return nil } - newResult, err := r.store.next(ctx, r) + newResult, err := r.store.next(ctx, r, opts...) if err != nil { return err }