mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-10 18:03:07 +00:00
feat: rate limit with rln configuration
This commit is contained in:
parent
78b522db50
commit
e42defcfef
@ -12,12 +12,9 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
const DefaultPeersToPublishForLightpush = 2
|
||||
const DefaultPublishingLimiterRate = rate.Limit(5)
|
||||
const DefaultPublishingLimitBurst = 10
|
||||
|
||||
type PublishMethod int
|
||||
|
||||
@ -53,7 +50,7 @@ type MessageSender struct {
|
||||
publishMethod PublishMethod
|
||||
publisher Publisher
|
||||
messageSentCheck ISentCheck
|
||||
rateLimiter *PublishRateLimiter
|
||||
rateLimiter PublishRateLimiter
|
||||
logger *zap.Logger
|
||||
evtMessageSent event.Emitter
|
||||
}
|
||||
@ -89,7 +86,7 @@ func NewMessageSender(publishMethod PublishMethod, publisher Publisher, logger *
|
||||
return &MessageSender{
|
||||
publishMethod: publishMethod,
|
||||
publisher: publisher,
|
||||
rateLimiter: NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst),
|
||||
rateLimiter: NewRlnRateLimiter(RlnLimiterCapacity, RlnLimiterRefillInterval),
|
||||
logger: logger,
|
||||
}, nil
|
||||
}
|
||||
@ -99,7 +96,7 @@ func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *Mess
|
||||
return ms
|
||||
}
|
||||
|
||||
func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *MessageSender {
|
||||
func (ms *MessageSender) WithRateLimiting(rateLimiter PublishRateLimiter) *MessageSender {
|
||||
ms.rateLimiter = rateLimiter
|
||||
return ms
|
||||
}
|
||||
|
||||
@ -9,22 +9,30 @@ import (
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
const DefaultPublishingLimiterRate = rate.Limit(5)
|
||||
const DefaultPublishingLimitBurst = 10
|
||||
|
||||
// RateLimiter
|
||||
type PublishRateLimiter interface {
|
||||
Check(ctx context.Context, logger *zap.Logger) error
|
||||
}
|
||||
|
||||
// PublishRateLimiter is used to decorate publish functions to limit the
|
||||
// number of messages per second that can be published
|
||||
type PublishRateLimiter struct {
|
||||
type DefaultRateLimiter struct {
|
||||
limiter *rate.Limiter
|
||||
}
|
||||
|
||||
// NewPublishRateLimiter will create a new instance of PublishRateLimiter.
|
||||
// You can specify an rate.Inf value to in practice ignore the rate limiting
|
||||
func NewPublishRateLimiter(r rate.Limit, b int) *PublishRateLimiter {
|
||||
return &PublishRateLimiter{
|
||||
func NewDefaultRateLimiter(r rate.Limit, b int) *DefaultRateLimiter {
|
||||
return &DefaultRateLimiter{
|
||||
limiter: rate.NewLimiter(r, b),
|
||||
}
|
||||
}
|
||||
|
||||
// ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied
|
||||
func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn {
|
||||
func (p *DefaultRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn {
|
||||
return func(envelope *protocol.Envelope, logger *zap.Logger) error {
|
||||
if err := p.Check(ctx, logger); err != nil {
|
||||
return err
|
||||
@ -33,7 +41,7 @@ func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn Pu
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PublishRateLimiter) Check(ctx context.Context, logger *zap.Logger) error {
|
||||
func (p *DefaultRateLimiter) Check(ctx context.Context, logger *zap.Logger) error {
|
||||
if err := p.limiter.Wait(ctx); err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
logger.Error("could not send message (limiter)", zap.Error(err))
|
||||
|
||||
@ -14,7 +14,7 @@ import (
|
||||
)
|
||||
|
||||
func TestRateLimit(t *testing.T) {
|
||||
r := NewPublishRateLimiter(rate.Limit(1), 1)
|
||||
r := NewDefaultRateLimiter(rate.Limit(1), 1)
|
||||
l := utils.Logger()
|
||||
|
||||
var counter atomic.Int32
|
||||
|
||||
64
waku/v2/api/publish/rln_rate_limiting.go
Normal file
64
waku/v2/api/publish/rln_rate_limiting.go
Normal file
@ -0,0 +1,64 @@
|
||||
package publish
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var ErrRateLimited = errors.New("rate limit exceeded")
|
||||
|
||||
const RlnLimiterCapacity = 100
|
||||
const RlnLimiterRefillInterval = 10 * time.Minute
|
||||
|
||||
// RlnRateLimiter is used to rate limit the outgoing messages,
|
||||
// The capacity and refillAfter comes from RLN contract configuration.
|
||||
type RlnRateLimiter struct {
|
||||
mu sync.Mutex
|
||||
capacity int
|
||||
tokens int
|
||||
refillAfter time.Duration
|
||||
lastRefill time.Time
|
||||
}
|
||||
|
||||
// NewRlnPublishRateLimiter creates a new rate limiter, starts with a full capacity bucket.
|
||||
func NewRlnRateLimiter(capacity int, refillAfter time.Duration) *RlnRateLimiter {
|
||||
return &RlnRateLimiter{
|
||||
capacity: capacity,
|
||||
tokens: capacity, // Start with a full bucket
|
||||
refillAfter: refillAfter,
|
||||
lastRefill: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// Allow checks if a token can be consumed, and refills the bucket if necessary
|
||||
func (rl *RlnRateLimiter) Allow() bool {
|
||||
rl.mu.Lock()
|
||||
defer rl.mu.Unlock()
|
||||
|
||||
// Refill tokens if the refill interval has passed
|
||||
now := time.Now()
|
||||
if now.Sub(rl.lastRefill) >= rl.refillAfter {
|
||||
rl.tokens = rl.capacity // Refill the bucket
|
||||
rl.lastRefill = now
|
||||
}
|
||||
|
||||
// Check if there are tokens available
|
||||
if rl.tokens > 0 {
|
||||
rl.tokens--
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (rl *RlnRateLimiter) Check(ctx context.Context, logger *zap.Logger) error {
|
||||
if rl.Allow() {
|
||||
return nil
|
||||
}
|
||||
logger.Error("could not send message rate limited", zap.Error(ErrRateLimited))
|
||||
return ErrRateLimited
|
||||
}
|
||||
26
waku/v2/api/publish/rln_rate_limiting_test.go
Normal file
26
waku/v2/api/publish/rln_rate_limiting_test.go
Normal file
@ -0,0 +1,26 @@
|
||||
package publish
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
func TestRlnRateLimit(t *testing.T) {
|
||||
r := NewRlnRateLimiter(3, 5*time.Second)
|
||||
l := utils.Logger()
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
require.NoError(t, r.Check(context.Background(), l))
|
||||
}
|
||||
require.ErrorIs(t, r.Check(context.Background(), l), ErrRateLimited)
|
||||
|
||||
time.Sleep(6 * time.Second)
|
||||
for i := 0; i < 3; i++ {
|
||||
require.NoError(t, r.Check(context.Background(), l))
|
||||
}
|
||||
require.ErrorIs(t, r.Check(context.Background(), l), ErrRateLimited)
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user