mirror of https://github.com/status-im/go-waku.git
feat: ratelimit store queries and add options to Next (#1221)
This commit is contained in:
parent
991e872de9
commit
f0acee4d1d
|
@ -292,7 +292,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
||||||
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
|
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
|
||||||
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
|
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
|
||||||
|
|
||||||
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
|
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit)
|
||||||
|
|
||||||
if params.storeFactory != nil {
|
if params.storeFactory != nil {
|
||||||
w.storeFactory = params.storeFactory
|
w.storeFactory = params.storeFactory
|
||||||
|
|
|
@ -38,6 +38,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Default UserAgent
|
// Default UserAgent
|
||||||
|
@ -94,6 +95,8 @@ type WakuNodeParameters struct {
|
||||||
enableStore bool
|
enableStore bool
|
||||||
messageProvider legacy_store.MessageProvider
|
messageProvider legacy_store.MessageProvider
|
||||||
|
|
||||||
|
storeRateLimit rate.Limit
|
||||||
|
|
||||||
enableRendezvousPoint bool
|
enableRendezvousPoint bool
|
||||||
rendezvousDB *rendezvous.DB
|
rendezvousDB *rendezvous.DB
|
||||||
|
|
||||||
|
@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{
|
||||||
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
|
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
|
||||||
WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
|
WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
|
||||||
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
|
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
|
||||||
|
WithWakuStoreRateLimit(8), // Value currently set in status.staging
|
||||||
}
|
}
|
||||||
|
|
||||||
// MultiAddresses return the list of multiaddresses configured in the node
|
// MultiAddresses return the list of multiaddresses configured in the node
|
||||||
|
@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will
|
||||||
|
// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming
|
||||||
|
// the store protocol from a storenode
|
||||||
|
func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption {
|
||||||
|
return func(params *WakuNodeParameters) error {
|
||||||
|
params.storeRateLimit = value
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
|
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
|
||||||
// be stored or not in a message provider.
|
// be stored or not in a message provider.
|
||||||
func WithWakuStore() WakuNodeOption {
|
func WithWakuStore() WakuNodeOption {
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
"google.golang.org/protobuf/proto"
|
"google.golang.org/protobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -69,14 +70,19 @@ type WakuStore struct {
|
||||||
timesource timesource.Timesource
|
timesource timesource.Timesource
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
pm *peermanager.PeerManager
|
pm *peermanager.PeerManager
|
||||||
|
|
||||||
|
defaultRatelimit rate.Limit
|
||||||
|
rateLimiters map[peer.ID]*rate.Limiter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWakuStore is used to instantiate a StoreV3 client
|
// NewWakuStore is used to instantiate a StoreV3 client
|
||||||
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
|
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore {
|
||||||
s := new(WakuStore)
|
s := new(WakuStore)
|
||||||
s.log = log.Named("store-client")
|
s.log = log.Named("store-client")
|
||||||
s.timesource = timesource
|
s.timesource = timesource
|
||||||
s.pm = pm
|
s.pm = pm
|
||||||
|
s.defaultRatelimit = defaultRatelimit
|
||||||
|
s.rateLimiters = make(map[peer.ID]*rate.Limiter)
|
||||||
|
|
||||||
if pm != nil {
|
if pm != nil {
|
||||||
pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField)
|
pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField)
|
||||||
|
@ -171,7 +177,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer)
|
response, err := s.queryFrom(ctx, storeRequest, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -211,7 +217,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt
|
||||||
return len(result.messages) != 0, nil
|
return len(result.messages) != 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
|
func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) {
|
||||||
if r.IsComplete() {
|
if r.IsComplete() {
|
||||||
return &Result{
|
return &Result{
|
||||||
store: s,
|
store: s,
|
||||||
|
@ -223,11 +229,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
params := new(Parameters)
|
||||||
|
params.selectedPeer = r.PeerID()
|
||||||
|
optList := DefaultOptions()
|
||||||
|
optList = append(optList, opts...)
|
||||||
|
for _, opt := range optList {
|
||||||
|
err := opt(params)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
|
storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
|
||||||
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
|
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
|
||||||
storeRequest.PaginationCursor = r.Cursor()
|
storeRequest.PaginationCursor = r.Cursor()
|
||||||
|
|
||||||
response, err := s.queryFrom(ctx, storeRequest, r.PeerID())
|
response, err := s.queryFrom(ctx, storeRequest, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -245,16 +262,28 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) {
|
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) {
|
||||||
logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
|
logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
|
||||||
|
|
||||||
logger.Debug("sending store request")
|
logger.Debug("sending store request")
|
||||||
|
|
||||||
stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300)
|
if !params.skipRatelimit {
|
||||||
|
rateLimiter, ok := s.rateLimiters[params.selectedPeer]
|
||||||
|
if !ok {
|
||||||
|
rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1)
|
||||||
|
s.rateLimiters[params.selectedPeer] = rateLimiter
|
||||||
|
}
|
||||||
|
err := rateLimiter.Wait(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("creating stream to peer", zap.Error(err))
|
logger.Error("creating stream to peer", zap.Error(err))
|
||||||
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
|
||||||
ps.AddConnFailure(selectedPeer)
|
ps.AddConnFailure(params.selectedPeer)
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,7 +69,7 @@ func TestStoreClient(t *testing.T) {
|
||||||
pm.Start(ctx)
|
pm.Start(ctx)
|
||||||
|
|
||||||
// Creating a storeV3 instance for all queries
|
// Creating a storeV3 instance for all queries
|
||||||
wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger())
|
wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger(), 8)
|
||||||
wakuStore.SetHost(host)
|
wakuStore.SetHost(host)
|
||||||
|
|
||||||
_, err = wakuRelay.Subscribe(context.Background(), protocol.NewContentFilter(pubsubTopic), relay.WithoutConsumer())
|
_, err = wakuRelay.Subscribe(context.Background(), protocol.NewContentFilter(pubsubTopic), relay.WithoutConsumer())
|
||||||
|
|
|
@ -19,6 +19,7 @@ type Parameters struct {
|
||||||
pageLimit uint64
|
pageLimit uint64
|
||||||
forward bool
|
forward bool
|
||||||
includeData bool
|
includeData bool
|
||||||
|
skipRatelimit bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type RequestOption func(*Parameters) error
|
type RequestOption func(*Parameters) error
|
||||||
|
@ -115,6 +116,14 @@ func IncludeData(v bool) RequestOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429))
|
||||||
|
func SkipRateLimit() RequestOption {
|
||||||
|
return func(params *Parameters) error {
|
||||||
|
params.skipRatelimit = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Default options to be used when querying a store node for results
|
// Default options to be used when querying a store node for results
|
||||||
func DefaultOptions() []RequestOption {
|
func DefaultOptions() []RequestOption {
|
||||||
return []RequestOption{
|
return []RequestOption{
|
||||||
|
|
|
@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse {
|
||||||
return r.storeResponse
|
return r.storeResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Result) Next(ctx context.Context) error {
|
func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
|
||||||
if r.cursor == nil {
|
if r.cursor == nil {
|
||||||
r.done = true
|
r.done = true
|
||||||
r.messages = nil
|
r.messages = nil
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
newResult, err := r.store.next(ctx, r)
|
newResult, err := r.store.next(ctx, r, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue