From 6dcf177414f4f9fc1795be614b63e3069f657168 Mon Sep 17 00:00:00 2001 From: kaichao Date: Tue, 24 Dec 2024 11:27:48 +0800 Subject: [PATCH] feat: rate limit with rln configuration (#1262) --- waku/v2/api/publish/message_sender.go | 16 ++--- waku/v2/api/publish/message_sender_test.go | 10 +-- waku/v2/api/publish/rate_limiting.go | 18 ++++-- waku/v2/api/publish/rate_limiting_test.go | 2 +- waku/v2/api/publish/rln_rate_limiting.go | 63 +++++++++++++++++++ waku/v2/api/publish/rln_rate_limiting_test.go | 26 ++++++++ 6 files changed, 117 insertions(+), 18 deletions(-) create mode 100644 waku/v2/api/publish/rln_rate_limiting.go create mode 100644 waku/v2/api/publish/rln_rate_limiting_test.go diff --git a/waku/v2/api/publish/message_sender.go b/waku/v2/api/publish/message_sender.go index 4bf597c1..7ccfe840 100644 --- a/waku/v2/api/publish/message_sender.go +++ b/waku/v2/api/publish/message_sender.go @@ -12,12 +12,9 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "go.uber.org/zap" - "golang.org/x/time/rate" ) const DefaultPeersToPublishForLightpush = 2 -const DefaultPublishingLimiterRate = rate.Limit(5) -const DefaultPublishingLimitBurst = 10 type PublishMethod int @@ -53,7 +50,7 @@ type MessageSender struct { publishMethod PublishMethod publisher Publisher messageSentCheck ISentCheck - rateLimiter *PublishRateLimiter + rateLimiter PublishRateLimiter logger *zap.Logger evtMessageSent event.Emitter } @@ -82,14 +79,19 @@ func (r *Request) WithPublishMethod(publishMethod PublishMethod) *Request { return r } -func NewMessageSender(publishMethod PublishMethod, publisher Publisher, logger *zap.Logger) (*MessageSender, error) { +func NewMessageSender(publishMethod PublishMethod, publisher Publisher, rateLimiter PublishRateLimiter, logger *zap.Logger) (*MessageSender, error) { if publishMethod == UnknownMethod { return nil, errors.New("publish method is required") } + + if rateLimiter == nil { + rateLimiter = NewDefaultRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst) + } + return &MessageSender{ publishMethod: publishMethod, publisher: publisher, - rateLimiter: NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst), + rateLimiter: rateLimiter, logger: logger, }, nil } @@ -99,7 +101,7 @@ func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *Mess return ms } -func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *MessageSender { +func (ms *MessageSender) WithRateLimiting(rateLimiter PublishRateLimiter) *MessageSender { ms.rateLimiter = rateLimiter return ms } diff --git a/waku/v2/api/publish/message_sender_test.go b/waku/v2/api/publish/message_sender_test.go index c770992c..7266b65f 100644 --- a/waku/v2/api/publish/message_sender_test.go +++ b/waku/v2/api/publish/message_sender_test.go @@ -41,7 +41,7 @@ func (m *MockMessageSentCheck) Start() { } func TestNewSenderWithUnknownMethod(t *testing.T) { - sender, err := NewMessageSender(UnknownMethod, nil, nil) + sender, err := NewMessageSender(UnknownMethod, nil, nil, nil) require.NotNil(t, err) require.Nil(t, sender) } @@ -55,7 +55,7 @@ func TestNewSenderWithRelay(t *testing.T) { _, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic")) require.Nil(t, err) publisher := NewDefaultPublisher(nil, relayNode) - sender, err := NewMessageSender(Relay, publisher, utils.Logger()) + sender, err := NewMessageSender(Relay, publisher, nil, utils.Logger()) require.Nil(t, err) require.NotNil(t, sender) require.Nil(t, sender.messageSentCheck) @@ -81,7 +81,7 @@ func TestNewSenderWithRelayAndMessageSentCheck(t *testing.T) { _, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic")) require.Nil(t, err) publisher := NewDefaultPublisher(nil, relayNode) - sender, err := NewMessageSender(Relay, publisher, utils.Logger()) + sender, err := NewMessageSender(Relay, publisher, nil, utils.Logger()) check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)} sender.WithMessageSentCheck(check) @@ -111,7 +111,7 @@ func TestNewSenderWithRelayAndMessageSentCheck(t *testing.T) { } func TestNewSenderWithLightPush(t *testing.T) { - sender, err := NewMessageSender(LightPush, nil, nil) + sender, err := NewMessageSender(LightPush, nil, nil, nil) require.Nil(t, err) require.NotNil(t, sender) require.Equal(t, LightPush, sender.publishMethod) @@ -140,7 +140,7 @@ func TestMessageSentEmitter(t *testing.T) { _, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic")) require.Nil(t, err) publisher := NewDefaultPublisher(nil, relayNode) - sender, err := NewMessageSender(Relay, publisher, utils.Logger()) + sender, err := NewMessageSender(Relay, publisher, nil, utils.Logger()) require.Nil(t, err) check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)} diff --git a/waku/v2/api/publish/rate_limiting.go b/waku/v2/api/publish/rate_limiting.go index a0bddcbd..b4aa9a3e 100644 --- a/waku/v2/api/publish/rate_limiting.go +++ b/waku/v2/api/publish/rate_limiting.go @@ -9,22 +9,30 @@ import ( "golang.org/x/time/rate" ) +const DefaultPublishingLimiterRate = rate.Limit(5) +const DefaultPublishingLimitBurst = 10 + +// RateLimiter +type PublishRateLimiter interface { + Check(ctx context.Context, logger *zap.Logger) error +} + // PublishRateLimiter is used to decorate publish functions to limit the // number of messages per second that can be published -type PublishRateLimiter struct { +type DefaultRateLimiter struct { limiter *rate.Limiter } // NewPublishRateLimiter will create a new instance of PublishRateLimiter. // You can specify an rate.Inf value to in practice ignore the rate limiting -func NewPublishRateLimiter(r rate.Limit, b int) *PublishRateLimiter { - return &PublishRateLimiter{ +func NewDefaultRateLimiter(r rate.Limit, b int) *DefaultRateLimiter { + return &DefaultRateLimiter{ limiter: rate.NewLimiter(r, b), } } // ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied -func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn { +func (p *DefaultRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn { return func(envelope *protocol.Envelope, logger *zap.Logger) error { if err := p.Check(ctx, logger); err != nil { return err @@ -33,7 +41,7 @@ func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn Pu } } -func (p *PublishRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { +func (p *DefaultRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { if err := p.limiter.Wait(ctx); err != nil { if !errors.Is(err, context.Canceled) { logger.Error("could not send message (limiter)", zap.Error(err)) diff --git a/waku/v2/api/publish/rate_limiting_test.go b/waku/v2/api/publish/rate_limiting_test.go index bde68bf3..2053b7f0 100644 --- a/waku/v2/api/publish/rate_limiting_test.go +++ b/waku/v2/api/publish/rate_limiting_test.go @@ -14,7 +14,7 @@ import ( ) func TestRateLimit(t *testing.T) { - r := NewPublishRateLimiter(rate.Limit(1), 1) + r := NewDefaultRateLimiter(rate.Limit(1), 1) l := utils.Logger() var counter atomic.Int32 diff --git a/waku/v2/api/publish/rln_rate_limiting.go b/waku/v2/api/publish/rln_rate_limiting.go new file mode 100644 index 00000000..11a469bf --- /dev/null +++ b/waku/v2/api/publish/rln_rate_limiting.go @@ -0,0 +1,63 @@ +package publish + +import ( + "context" + "errors" + "sync" + "time" + + "go.uber.org/zap" +) + +var ErrRateLimited = errors.New("rate limit exceeded") + +const RlnLimiterCapacity = 100 +const RlnLimiterRefillInterval = 10 * time.Minute + +// RlnRateLimiter is used to rate limit the outgoing messages, +// The capacity and refillInterval comes from RLN contract configuration. +type RlnRateLimiter struct { + mu sync.Mutex + capacity int + tokens int + refillInterval time.Duration + lastRefill time.Time +} + +// NewRlnPublishRateLimiter creates a new rate limiter, starts with a full capacity bucket. +func NewRlnRateLimiter(capacity int, refillInterval time.Duration) *RlnRateLimiter { + return &RlnRateLimiter{ + capacity: capacity, + tokens: capacity, // Start with a full bucket + refillInterval: refillInterval, + lastRefill: time.Now(), + } +} + +// Allow checks if a token can be consumed, and refills the bucket if necessary +func (rl *RlnRateLimiter) Allow() bool { + rl.mu.Lock() + defer rl.mu.Unlock() + + // Refill tokens if the refill interval has passed + now := time.Now() + if now.Sub(rl.lastRefill) >= rl.refillInterval { + rl.tokens = rl.capacity // Refill the bucket + rl.lastRefill = now + } + + // Check if there are tokens available + if rl.tokens > 0 { + rl.tokens-- + return true + } + + return false +} + +func (rl *RlnRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { + if rl.Allow() { + return nil + } + return ErrRateLimited +} diff --git a/waku/v2/api/publish/rln_rate_limiting_test.go b/waku/v2/api/publish/rln_rate_limiting_test.go new file mode 100644 index 00000000..f91f6ca6 --- /dev/null +++ b/waku/v2/api/publish/rln_rate_limiting_test.go @@ -0,0 +1,26 @@ +package publish + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func TestRlnRateLimit(t *testing.T) { + r := NewRlnRateLimiter(3, 5*time.Second) + l := utils.Logger() + + for i := 0; i < 3; i++ { + require.NoError(t, r.Check(context.Background(), l)) + } + require.ErrorIs(t, r.Check(context.Background(), l), ErrRateLimited) + + time.Sleep(6 * time.Second) + for i := 0; i < 3; i++ { + require.NoError(t, r.Check(context.Background(), l)) + } + require.ErrorIs(t, r.Check(context.Background(), l), ErrRateLimited) +}