feat: ratelimit store queries

This commit is contained in:
Richard Ramos 2024-09-17 17:34:46 -04:00
parent 991e872de9
commit 2dfbdba4a9
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
3 changed files with 49 additions and 13 deletions

View File

@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/protobuf/proto"
)
@ -32,6 +33,8 @@ const MaxPageSize = 100
// DefaultPageSize is the default number of waku messages per page
const DefaultPageSize = 20
const maxQueriesPerSecond = 8
const ok = uint32(200)
var (
@ -65,10 +68,11 @@ func (e *StoreError) Error() string {
// WakuStore represents an instance of a store client
type WakuStore struct {
h host.Host
timesource timesource.Timesource
log *zap.Logger
pm *peermanager.PeerManager
h host.Host
timesource timesource.Timesource
log *zap.Logger
pm *peermanager.PeerManager
rateLimiters map[peer.ID]*rate.Limiter
}
// NewWakuStore is used to instantiate a StoreV3 client
@ -77,6 +81,7 @@ func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource,
s.log = log.Named("store-client")
s.timesource = timesource
s.pm = pm
s.rateLimiters = make(map[peer.ID]*rate.Limiter)
if pm != nil {
pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField)
@ -171,7 +176,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
return nil, err
}
response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer)
response, err := s.queryFrom(ctx, storeRequest, params)
if err != nil {
return nil, err
}
@ -211,7 +216,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt
return len(result.messages) != 0, nil
}
func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) {
if r.IsComplete() {
return &Result{
store: s,
@ -223,11 +228,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
}, nil
}
params := new(Parameters)
params.selectedPeer = r.PeerID()
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
err := opt(params)
if err != nil {
return nil, err
}
}
storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
storeRequest.PaginationCursor = r.Cursor()
response, err := s.queryFrom(ctx, storeRequest, r.PeerID())
response, err := s.queryFrom(ctx, storeRequest, params)
if err != nil {
return nil, err
}
@ -245,16 +261,28 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
}
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) {
logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) {
logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
logger.Debug("sending store request")
stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300)
if !params.skipRatelimit {
rateLimiter, ok := s.rateLimiters[params.selectedPeer]
if !ok {
rateLimiter = rate.NewLimiter(maxQueriesPerSecond, 1)
s.rateLimiters[params.selectedPeer] = rateLimiter
}
err := rateLimiter.Wait(ctx)
if err != nil {
return nil, err
}
}
stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(selectedPeer)
ps.AddConnFailure(params.selectedPeer)
}
return nil, err
}

View File

@ -19,6 +19,7 @@ type Parameters struct {
pageLimit uint64
forward bool
includeData bool
skipRatelimit bool
}
type RequestOption func(*Parameters) error
@ -115,6 +116,13 @@ func IncludeData(v bool) RequestOption {
}
}
func SkipRateLimit() RequestOption {
return func(params *Parameters) error {
params.skipRatelimit = true
return nil
}
}
// Default options to be used when querying a store node for results
func DefaultOptions() []RequestOption {
return []RequestOption{

View File

@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse {
return r.storeResponse
}
func (r *Result) Next(ctx context.Context) error {
func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
if r.cursor == nil {
r.done = true
r.messages = nil
return nil
}
newResult, err := r.store.next(ctx, r)
newResult, err := r.store.next(ctx, r, opts...)
if err != nil {
return err
}