From e42defcfefd51d4c50a95a618b7871ccc037d241 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Tue, 3 Dec 2024 15:19:44 +0800 Subject: [PATCH] feat: rate limit with rln configuration --- waku/v2/api/publish/message_sender.go | 9 +-- waku/v2/api/publish/rate_limiting.go | 18 ++++-- waku/v2/api/publish/rate_limiting_test.go | 2 +- waku/v2/api/publish/rln_rate_limiting.go | 64 +++++++++++++++++++ waku/v2/api/publish/rln_rate_limiting_test.go | 26 ++++++++ 5 files changed, 107 insertions(+), 12 deletions(-) create mode 100644 waku/v2/api/publish/rln_rate_limiting.go create mode 100644 waku/v2/api/publish/rln_rate_limiting_test.go diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index 4bf597c1..1cb60a60 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -12,12 +12,9 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "go.uber.org/zap" - "golang.org/x/time/rate" ) const DefaultPeersToPublishForLightpush = 2 -const DefaultPublishingLimiterRate = rate.Limit(5) -const DefaultPublishingLimitBurst = 10 type PublishMethod int @@ -53,7 +50,7 @@ type MessageSender struct { publishMethod PublishMethod publisher Publisher messageSentCheck ISentCheck - rateLimiter *PublishRateLimiter + rateLimiter PublishRateLimiter logger *zap.Logger evtMessageSent event.Emitter } @@ -89,7 +86,7 @@ func NewMessageSender(publishMethod PublishMethod, publisher Publisher, logger * return &MessageSender{ publishMethod: publishMethod, publisher: publisher, - rateLimiter: NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst), + rateLimiter: NewRlnRateLimiter(RlnLimiterCapacity, RlnLimiterRefillInterval), logger: logger, }, nil } @@ -99,7 +96,7 @@ func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *Mess return ms } -func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *MessageSender { +func (ms *MessageSender) WithRateLimiting(rateLimiter PublishRateLimiter) *MessageSender { ms.rateLimiter = rateLimiter return ms } diff --git a/waku/v2/api/publish/rate_limiting.go b/waku/v2/api/publish/rate_limiting.go index a0bddcbd..b4aa9a3e 100644 --- a/waku/v2/api/publish/rate_limiting.go +++ b/waku/v2/api/publish/rate_limiting.go @@ -9,22 +9,30 @@ import ( "golang.org/x/time/rate" ) +const DefaultPublishingLimiterRate = rate.Limit(5) +const DefaultPublishingLimitBurst = 10 + +// RateLimiter +type PublishRateLimiter interface { + Check(ctx context.Context, logger *zap.Logger) error +} + // PublishRateLimiter is used to decorate publish functions to limit the // number of messages per second that can be published -type PublishRateLimiter struct { +type DefaultRateLimiter struct { limiter *rate.Limiter } // NewPublishRateLimiter will create a new instance of PublishRateLimiter. // You can specify an rate.Inf value to in practice ignore the rate limiting -func NewPublishRateLimiter(r rate.Limit, b int) *PublishRateLimiter { - return &PublishRateLimiter{ +func NewDefaultRateLimiter(r rate.Limit, b int) *DefaultRateLimiter { + return &DefaultRateLimiter{ limiter: rate.NewLimiter(r, b), } } // ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied -func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn { +func (p *DefaultRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn { return func(envelope *protocol.Envelope, logger *zap.Logger) error { if err := p.Check(ctx, logger); err != nil { return err @@ -33,7 +41,7 @@ func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn Pu } } -func (p *PublishRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { +func (p *DefaultRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { if err := p.limiter.Wait(ctx); err != nil { if !errors.Is(err, context.Canceled) { logger.Error("could not send message (limiter)", zap.Error(err)) diff --git a/waku/v2/api/publish/rate_limiting_test.go b/waku/v2/api/publish/rate_limiting_test.go index bde68bf3..2053b7f0 100644 --- a/waku/v2/api/publish/rate_limiting_test.go +++ b/waku/v2/api/publish/rate_limiting_test.go @@ -14,7 +14,7 @@ import ( ) func TestRateLimit(t *testing.T) { - r := NewPublishRateLimiter(rate.Limit(1), 1) + r := NewDefaultRateLimiter(rate.Limit(1), 1) l := utils.Logger() var counter atomic.Int32 diff --git a/waku/v2/api/publish/rln_rate_limiting.go b/waku/v2/api/publish/rln_rate_limiting.go new file mode 100644 index 00000000..13d1553b --- /dev/null +++ b/waku/v2/api/publish/rln_rate_limiting.go @@ -0,0 +1,64 @@ +package publish + +import ( + "context" + "errors" + "sync" + "time" + + "go.uber.org/zap" +) + +var ErrRateLimited = errors.New("rate limit exceeded") + +const RlnLimiterCapacity = 100 +const RlnLimiterRefillInterval = 10 * time.Minute + +// RlnRateLimiter is used to rate limit the outgoing messages, +// The capacity and refillAfter comes from RLN contract configuration. +type RlnRateLimiter struct { + mu sync.Mutex + capacity int + tokens int + refillAfter time.Duration + lastRefill time.Time +} + +// NewRlnPublishRateLimiter creates a new rate limiter, starts with a full capacity bucket. +func NewRlnRateLimiter(capacity int, refillAfter time.Duration) *RlnRateLimiter { + return &RlnRateLimiter{ + capacity: capacity, + tokens: capacity, // Start with a full bucket + refillAfter: refillAfter, + lastRefill: time.Now(), + } +} + +// Allow checks if a token can be consumed, and refills the bucket if necessary +func (rl *RlnRateLimiter) Allow() bool { + rl.mu.Lock() + defer rl.mu.Unlock() + + // Refill tokens if the refill interval has passed + now := time.Now() + if now.Sub(rl.lastRefill) >= rl.refillAfter { + rl.tokens = rl.capacity // Refill the bucket + rl.lastRefill = now + } + + // Check if there are tokens available + if rl.tokens > 0 { + rl.tokens-- + return true + } + + return false +} + +func (rl *RlnRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { + if rl.Allow() { + return nil + } + logger.Error("could not send message rate limited", zap.Error(ErrRateLimited)) + return ErrRateLimited +} diff --git a/waku/v2/api/publish/rln_rate_limiting_test.go b/waku/v2/api/publish/rln_rate_limiting_test.go new file mode 100644 index 00000000..f91f6ca6 --- /dev/null +++ b/waku/v2/api/publish/rln_rate_limiting_test.go @@ -0,0 +1,26 @@ +package publish + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func TestRlnRateLimit(t *testing.T) { + r := NewRlnRateLimiter(3, 5*time.Second) + l := utils.Logger() + + for i := 0; i < 3; i++ { + require.NoError(t, r.Check(context.Background(), l)) + } + require.ErrorIs(t, r.Check(context.Background(), l), ErrRateLimited) + + time.Sleep(6 * time.Second) + for i := 0; i < 3; i++ { + require.NoError(t, r.Check(context.Background(), l)) + } + require.ErrorIs(t, r.Check(context.Background(), l), ErrRateLimited) +}