chore: refactor to use rate limit state

This commit is contained in:
kaichaosun 2025-03-17 15:03:46 +08:00
parent 7ff0d97456
commit 348d339ea8
No known key found for this signature in database
GPG Key ID: 223E0F992F4F03BF
2 changed files with 16 additions and 12 deletions

View File

@ -11,8 +11,8 @@ import (
var ErrRateLimited = errors.New("rate limit exceeded") var ErrRateLimited = errors.New("rate limit exceeded")
const RlnLimiterCapacity = 600 const DefaultRlnLimiterCapacity = 600
const RlnLimiterRefillInterval = 10 * time.Minute const DefaultRlnLimiterRefillInterval = 10 * time.Minute
// RlnRateLimiter is used to rate limit the outgoing messages, // RlnRateLimiter is used to rate limit the outgoing messages,
// The capacity and refillInterval comes from RLN contract configuration. // The capacity and refillInterval comes from RLN contract configuration.
@ -22,22 +22,22 @@ type RlnRateLimiter struct {
tokens int tokens int
refillInterval time.Duration refillInterval time.Duration
lastRefill time.Time lastRefill time.Time
updateCh chan BucketUpdate updateCh chan RlnRateLimitState
} }
// BucketUpdate includes the information that need to be persisted in database. // RlnRateLimitState includes the information that need to be persisted in database.
type BucketUpdate struct { type RlnRateLimitState struct {
RemainingTokens int RemainingTokens int
LastRefill time.Time LastRefill time.Time
} }
// NewRlnPublishRateLimiter creates a new rate limiter, starts with a full capacity bucket. // NewRlnPublishRateLimiter creates a new rate limiter, starts with a full capacity bucket.
func NewRlnRateLimiter(capacity int, refillInterval time.Duration, availableTokens int, lastRefill time.Time, updateCh chan BucketUpdate) *RlnRateLimiter { func NewRlnRateLimiter(capacity int, refillInterval time.Duration, state RlnRateLimitState, updateCh chan RlnRateLimitState) *RlnRateLimiter {
return &RlnRateLimiter{ return &RlnRateLimiter{
capacity: capacity, capacity: capacity,
tokens: availableTokens, // Start with a full bucket in the first run, then track the remaining tokens in storage tokens: state.RemainingTokens,
refillInterval: refillInterval, refillInterval: refillInterval,
lastRefill: lastRefill, lastRefill: state.LastRefill,
updateCh: updateCh, updateCh: updateCh,
} }
} }
@ -67,7 +67,7 @@ func (rl *RlnRateLimiter) Allow() bool {
// sendUpdate sends the latest token state to the update channel. // sendUpdate sends the latest token state to the update channel.
func (rl *RlnRateLimiter) sendUpdate() { func (rl *RlnRateLimiter) sendUpdate() {
rl.updateCh <- BucketUpdate{RemainingTokens: rl.tokens, LastRefill: rl.lastRefill} rl.updateCh <- RlnRateLimitState{RemainingTokens: rl.tokens, LastRefill: rl.lastRefill}
} }
func (rl *RlnRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { func (rl *RlnRateLimiter) Check(ctx context.Context, logger *zap.Logger) error {

View File

@ -11,10 +11,14 @@ import (
) )
func TestRlnRateLimit(t *testing.T) { func TestRlnRateLimit(t *testing.T) {
updateCh := make(chan BucketUpdate, 10) updateCh := make(chan RlnRateLimitState, 10)
refillTime := time.Now() refillTime := time.Now()
capacity := 3 capacity := 3
r := NewRlnRateLimiter(capacity, 5*time.Second, capacity, refillTime, updateCh) state := RlnRateLimitState{
RemainingTokens: capacity,
LastRefill: refillTime,
}
r := NewRlnRateLimiter(capacity, 5*time.Second, state, updateCh)
l := utils.Logger() l := utils.Logger()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -22,7 +26,7 @@ func TestRlnRateLimit(t *testing.T) {
sleepDuration := 6 * time.Second sleepDuration := 6 * time.Second
var mu sync.Mutex var mu sync.Mutex
go func(ctx context.Context, ch chan BucketUpdate) { go func(ctx context.Context, ch chan RlnRateLimitState) {
usedToken := 0 usedToken := 0
for { for {
select { select {