From 348d339ea8f7b8977172391a44f0aa4a82cfd207 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Mon, 17 Mar 2025 15:03:46 +0800 Subject: [PATCH] chore: refactor to use rate limit state --- waku/v2/api/publish/rln_rate_limiting.go | 18 +++++++++--------- waku/v2/api/publish/rln_rate_limiting_test.go | 10 +++++++--- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/waku/v2/api/publish/rln_rate_limiting.go b/waku/v2/api/publish/rln_rate_limiting.go index 59f00ab0..d0b52689 100644 --- a/waku/v2/api/publish/rln_rate_limiting.go +++ b/waku/v2/api/publish/rln_rate_limiting.go @@ -11,8 +11,8 @@ import ( var ErrRateLimited = errors.New("rate limit exceeded") -const RlnLimiterCapacity = 600 -const RlnLimiterRefillInterval = 10 * time.Minute +const DefaultRlnLimiterCapacity = 600 +const DefaultRlnLimiterRefillInterval = 10 * time.Minute // RlnRateLimiter is used to rate limit the outgoing messages, // The capacity and refillInterval comes from RLN contract configuration. @@ -22,22 +22,22 @@ type RlnRateLimiter struct { tokens int refillInterval time.Duration lastRefill time.Time - updateCh chan BucketUpdate + updateCh chan RlnRateLimitState } -// BucketUpdate includes the information that need to be persisted in database. -type BucketUpdate struct { +// RlnRateLimitState includes the information that need to be persisted in database. +type RlnRateLimitState struct { RemainingTokens int LastRefill time.Time } // NewRlnPublishRateLimiter creates a new rate limiter, starts with a full capacity bucket. -func NewRlnRateLimiter(capacity int, refillInterval time.Duration, availableTokens int, lastRefill time.Time, updateCh chan BucketUpdate) *RlnRateLimiter { +func NewRlnRateLimiter(capacity int, refillInterval time.Duration, state RlnRateLimitState, updateCh chan RlnRateLimitState) *RlnRateLimiter { return &RlnRateLimiter{ capacity: capacity, - tokens: availableTokens, // Start with a full bucket in the first run, then track the remaining tokens in storage + tokens: state.RemainingTokens, refillInterval: refillInterval, - lastRefill: lastRefill, + lastRefill: state.LastRefill, updateCh: updateCh, } } @@ -67,7 +67,7 @@ func (rl *RlnRateLimiter) Allow() bool { // sendUpdate sends the latest token state to the update channel. func (rl *RlnRateLimiter) sendUpdate() { - rl.updateCh <- BucketUpdate{RemainingTokens: rl.tokens, LastRefill: rl.lastRefill} + rl.updateCh <- RlnRateLimitState{RemainingTokens: rl.tokens, LastRefill: rl.lastRefill} } func (rl *RlnRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { diff --git a/waku/v2/api/publish/rln_rate_limiting_test.go b/waku/v2/api/publish/rln_rate_limiting_test.go index c10a899a..f62d52c1 100644 --- a/waku/v2/api/publish/rln_rate_limiting_test.go +++ b/waku/v2/api/publish/rln_rate_limiting_test.go @@ -11,10 +11,14 @@ import ( ) func TestRlnRateLimit(t *testing.T) { - updateCh := make(chan BucketUpdate, 10) + updateCh := make(chan RlnRateLimitState, 10) refillTime := time.Now() capacity := 3 - r := NewRlnRateLimiter(capacity, 5*time.Second, capacity, refillTime, updateCh) + state := RlnRateLimitState{ + RemainingTokens: capacity, + LastRefill: refillTime, + } + r := NewRlnRateLimiter(capacity, 5*time.Second, state, updateCh) l := utils.Logger() ctx, cancel := context.WithCancel(context.Background()) @@ -22,7 +26,7 @@ func TestRlnRateLimit(t *testing.T) { sleepDuration := 6 * time.Second var mu sync.Mutex - go func(ctx context.Context, ch chan BucketUpdate) { + go func(ctx context.Context, ch chan RlnRateLimitState) { usedToken := 0 for { select {