mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-26 09:33:06 +00:00
fix: code review
This commit is contained in:
parent
2dfbdba4a9
commit
95676e2ec8
@ -292,7 +292,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
||||
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
|
||||
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
|
||||
|
||||
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
|
||||
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit)
|
||||
|
||||
if params.storeFactory != nil {
|
||||
w.storeFactory = params.storeFactory
|
||||
|
||||
@ -38,6 +38,7 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// Default UserAgent
|
||||
@ -94,6 +95,8 @@ type WakuNodeParameters struct {
|
||||
enableStore bool
|
||||
messageProvider legacy_store.MessageProvider
|
||||
|
||||
storeRateLimit rate.Limit
|
||||
|
||||
enableRendezvousPoint bool
|
||||
rendezvousDB *rendezvous.DB
|
||||
|
||||
@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{
|
||||
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
|
||||
WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
|
||||
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
|
||||
WithWakuStoreRateLimit(8), // Value currently set in status.staging
|
||||
}
|
||||
|
||||
// MultiAddresses return the list of multiaddresses configured in the node
|
||||
@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will
|
||||
// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming
|
||||
// the store protocol from a storenode
|
||||
func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
params.storeRateLimit = value
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
|
||||
// be stored or not in a message provider.
|
||||
func WithWakuStore() WakuNodeOption {
|
||||
|
||||
@ -33,8 +33,6 @@ const MaxPageSize = 100
|
||||
// DefaultPageSize is the default number of waku messages per page
|
||||
const DefaultPageSize = 20
|
||||
|
||||
const maxQueriesPerSecond = 8
|
||||
|
||||
const ok = uint32(200)
|
||||
|
||||
var (
|
||||
@ -68,19 +66,22 @@ func (e *StoreError) Error() string {
|
||||
|
||||
// WakuStore represents an instance of a store client
|
||||
type WakuStore struct {
|
||||
h host.Host
|
||||
timesource timesource.Timesource
|
||||
log *zap.Logger
|
||||
pm *peermanager.PeerManager
|
||||
rateLimiters map[peer.ID]*rate.Limiter
|
||||
h host.Host
|
||||
timesource timesource.Timesource
|
||||
log *zap.Logger
|
||||
pm *peermanager.PeerManager
|
||||
|
||||
defaultRatelimit rate.Limit
|
||||
rateLimiters map[peer.ID]*rate.Limiter
|
||||
}
|
||||
|
||||
// NewWakuStore is used to instantiate a StoreV3 client
|
||||
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
|
||||
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore {
|
||||
s := new(WakuStore)
|
||||
s.log = log.Named("store-client")
|
||||
s.timesource = timesource
|
||||
s.pm = pm
|
||||
s.defaultRatelimit = defaultRatelimit
|
||||
s.rateLimiters = make(map[peer.ID]*rate.Limiter)
|
||||
|
||||
if pm != nil {
|
||||
@ -269,7 +270,7 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe
|
||||
if !params.skipRatelimit {
|
||||
rateLimiter, ok := s.rateLimiters[params.selectedPeer]
|
||||
if !ok {
|
||||
rateLimiter = rate.NewLimiter(maxQueriesPerSecond, 1)
|
||||
rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1)
|
||||
s.rateLimiters[params.selectedPeer] = rateLimiter
|
||||
}
|
||||
err := rateLimiter.Wait(ctx)
|
||||
|
||||
@ -69,7 +69,7 @@ func TestStoreClient(t *testing.T) {
|
||||
pm.Start(ctx)
|
||||
|
||||
// Creating a storeV3 instance for all queries
|
||||
wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger())
|
||||
wakuStore := NewWakuStore(pm, timesource.NewDefaultClock(), utils.Logger(), 8)
|
||||
wakuStore.SetHost(host)
|
||||
|
||||
_, err = wakuRelay.Subscribe(context.Background(), protocol.NewContentFilter(pubsubTopic), relay.WithoutConsumer())
|
||||
|
||||
@ -116,6 +116,7 @@ func IncludeData(v bool) RequestOption {
|
||||
}
|
||||
}
|
||||
|
||||
// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429))
|
||||
func SkipRateLimit() RequestOption {
|
||||
return func(params *Parameters) error {
|
||||
params.skipRatelimit = true
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user