From 9b6c9f359d8383fee5b863fc9a2ffe1865456186 Mon Sep 17 00:00:00 2001
From: pablo
Date: Wed, 16 Jul 2025 15:51:26 +0300
Subject: [PATCH] feat: add rate limit store
---
.gitignore | 2 +
ratelimit/ratelimit.nim | 79 +++++++++++++++++++++++++-------------
ratelimit/store/memory.nim | 10 ++---
ratelimit/store/store.nim | 2 +-
ratelimit/token_bucket.nim | 20 ++++++----
tests/test_ratelimit.nim | 31 ++++++++++-----
6 files changed, 94 insertions(+), 50 deletions(-)
diff --git a/.gitignore b/.gitignore
index 4ee6ee2..1843ca9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,5 +25,7 @@ apps/*
!*.nim
tests/*
!*.nim
+ratelimit/*
+!*.nim
nimble.develop
nimble.paths
diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim
index 9bd1bf6..ec10f2d 100644
--- a/ratelimit/ratelimit.nim
+++ b/ratelimit/ratelimit.nim
@@ -1,6 +1,7 @@
import std/[times, deques, options]
-# import ./token_bucket
-import waku/common/rate_limit/token_bucket
+import ./token_bucket
+# import waku/common/rate_limit/token_bucket
+import ./store/store
import chronos
type
@@ -27,7 +28,8 @@ type
MessageSender*[T: Serializable] =
proc(msgs: seq[tuple[msgId: string, msg: T]]): Future[void] {.async.}
- RateLimitManager*[T: Serializable] = ref object
+ RateLimitManager*[T: Serializable, S: RateLimitStore] = ref object
+ store: S
bucket: TokenBucket
sender: MessageSender[T]
running: bool
@@ -35,14 +37,30 @@ type
queueNormal: Deque[seq[tuple[msgId: string, msg: T]]]
sleepDuration: chronos.Duration
-proc newRateLimitManager*[T: Serializable](
+proc newRateLimitManager*[T: Serializable, S: RateLimitStore](
+ store: S,
sender: MessageSender[T],
capacity: int = 100,
duration: chronos.Duration = chronos.minutes(10),
sleepDuration: chronos.Duration = chronos.milliseconds(1000),
-): RateLimitManager[T] =
- RateLimitManager[T](
- bucket: TokenBucket.newStrict(capacity, duration),
+): Future[RateLimitManager[T, S]] {.async.} =
+ var current = await store.loadBucketState()
+ if current.isNone():
+ # initialize bucket state with full capacity
+ current = some(
+ BucketState(budget: capacity, budgetCap: capacity, lastTimeFull: Moment.now())
+ )
+ discard await store.saveBucketState(current.get())
+
+ return RateLimitManager[T, S](
+ store: store,
+ bucket: TokenBucket.new(
+ current.get().budgetCap,
+ duration,
+ ReplenishMode.Strict,
+ current.get().budget,
+ current.get().lastTimeFull,
+ ),
sender: sender,
running: false,
queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](),
@@ -50,10 +68,10 @@ proc newRateLimitManager*[T: Serializable](
sleepDuration: sleepDuration,
)
-proc getCapacityState[T: Serializable](
- manager: RateLimitManager[T], now: Moment, count: int = 1
+proc getCapacityState[T: Serializable, S: RateLimitStore](
+ manager: RateLimitManager[T, S], now: Moment, count: int = 1
): CapacityState =
- let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now)
+ let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
let countAfter = budget - count
let ratio = countAfter.float / budgetCap.float
if ratio < 0.0:
@@ -63,15 +81,15 @@ proc getCapacityState[T: Serializable](
else:
return CapacityState.Normal
-proc passToSender[T: Serializable](
- manager: RateLimitManager[T],
+proc passToSender[T: Serializable, S: RateLimitStore](
+ manager: RateLimitManager[T, S],
msgs: seq[tuple[msgId: string, msg: T]],
now: Moment,
priority: Priority,
): Future[SendResult] {.async.} =
let count = msgs.len
- let capacity = manager.bucket.tryConsume(count, now)
- if not capacity:
+ let consumed = manager.bucket.tryConsume(count, now)
+ if not consumed:
case priority
of Priority.Critical:
manager.queueCritical.addLast(msgs)
@@ -81,11 +99,16 @@ proc passToSender[T: Serializable](
return SendResult.Enqueued
of Priority.Optional:
return SendResult.Dropped
+
+ let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now)
+ discard await manager.store.saveBucketState(
+ BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull)
+ )
await manager.sender(msgs)
return SendResult.PassedToSender
-proc processCriticalQueue[T: Serializable](
- manager: RateLimitManager[T], now: Moment
+proc processCriticalQueue[T: Serializable, S: RateLimitStore](
+ manager: RateLimitManager[T, S], now: Moment
): Future[void] {.async.} =
while manager.queueCritical.len > 0:
let msgs = manager.queueCritical.popFirst()
@@ -99,8 +122,8 @@ proc processCriticalQueue[T: Serializable](
manager.queueCritical.addFirst(msgs)
break
-proc processNormalQueue[T: Serializable](
- manager: RateLimitManager[T], now: Moment
+proc processNormalQueue[T: Serializable, S: RateLimitStore](
+ manager: RateLimitManager[T, S], now: Moment
): Future[void] {.async.} =
while manager.queueNormal.len > 0:
let msgs = manager.queueNormal.popFirst()
@@ -112,13 +135,13 @@ proc processNormalQueue[T: Serializable](
manager.queueNormal.addFirst(msgs)
break
-proc sendOrEnqueue*[T: Serializable](
- manager: RateLimitManager[T],
+proc sendOrEnqueue*[T: Serializable, S: RateLimitStore](
+ manager: RateLimitManager[T, S],
msgs: seq[tuple[msgId: string, msg: T]],
priority: Priority,
now: Moment = Moment.now(),
): Future[SendResult] {.async.} =
- let (_, budgetCap) = manager.bucket.getAvailableCapacity(now)
+ let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
if msgs.len.float / budgetCap.float >= 0.3:
# drop batch if it's too large to avoid starvation
return SendResult.DroppedBatchTooLarge
@@ -147,8 +170,8 @@ proc sendOrEnqueue*[T: Serializable](
of Priority.Optional:
return SendResult.Dropped
-proc getEnqueued*[T: Serializable](
- manager: RateLimitManager[T]
+proc getEnqueued*[T: Serializable, S: RateLimitStore](
+ manager: RateLimitManager[T, S]
): tuple[
critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]]
] =
@@ -163,8 +186,8 @@ proc getEnqueued*[T: Serializable](
return (criticalMsgs, normalMsgs)
-proc start*[T: Serializable](
- manager: RateLimitManager[T],
+proc start*[T: Serializable, S: RateLimitStore](
+ manager: RateLimitManager[T, S],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
): Future[void] {.async.} =
@@ -181,10 +204,12 @@ proc start*[T: Serializable](
echo "Error in queue processing: ", e.msg
await sleepAsync(manager.sleepDuration)
-proc stop*[T: Serializable](manager: RateLimitManager[T]) =
+proc stop*[T: Serializable, S: RateLimitStore](manager: RateLimitManager[T, S]) =
manager.running = false
-func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} =
+func `$`*[T: Serializable, S: RateLimitStore](
+ b: RateLimitManager[T, S]
+): string {.inline.} =
if isNil(b):
return "nil"
return
diff --git a/ratelimit/store/memory.nim b/ratelimit/store/memory.nim
index 557a17d..6391314 100644
--- a/ratelimit/store/memory.nim
+++ b/ratelimit/store/memory.nim
@@ -4,18 +4,18 @@ import chronos
# Memory Implementation
type MemoryRateLimitStore* = ref object
- bucketState: BucketState
+ bucketState: Option[BucketState]
-proc newMemoryRateLimitStore*(): MemoryRateLimitStore =
- result = MemoryRateLimitStore()
+proc new*(T: type[MemoryRateLimitStore]): T =
+ return T(bucketState: none(BucketState))
proc saveBucketState*(
store: MemoryRateLimitStore, bucketState: BucketState
): Future[bool] {.async.} =
- store.bucketState = bucketState
+ store.bucketState = some(bucketState)
return true
proc loadBucketState*(
store: MemoryRateLimitStore
): Future[Option[BucketState]] {.async.} =
- return some(store.bucketState)
+ return store.bucketState
diff --git a/ratelimit/store/store.nim b/ratelimit/store/store.nim
index c4f6da3..c916750 100644
--- a/ratelimit/store/store.nim
+++ b/ratelimit/store/store.nim
@@ -7,7 +7,7 @@ type
budgetCap*: int
lastTimeFull*: Moment
- RateLimitStoreConcept* =
+ RateLimitStore* =
concept s
s.saveBucketState(BucketState) is Future[bool]
s.loadBucketState() is Future[Option[BucketState]]
diff --git a/ratelimit/token_bucket.nim b/ratelimit/token_bucket.nim
index 447569a..e4a4487 100644
--- a/ratelimit/token_bucket.nim
+++ b/ratelimit/token_bucket.nim
@@ -112,18 +112,18 @@ proc update(bucket: TokenBucket, currentTime: Moment) =
## Returns the available capacity of the bucket: (budget, budgetCap)
proc getAvailableCapacity*(
bucket: TokenBucket, currentTime: Moment
-): tuple[budget: int, budgetCap: int] =
+): tuple[budget: int, budgetCap: int, lastTimeFull: Moment] =
if periodElapsed(bucket, currentTime):
case bucket.replenishMode
of ReplenishMode.Strict:
- return (bucket.budgetCap, bucket.budgetCap)
+ return (bucket.budgetCap, bucket.budgetCap, bucket.lastTimeFull)
of ReplenishMode.Compensating:
let distance = bucket.periodDistance(currentTime)
let recentAvgUsage = bucket.getUsageAverageSince(distance)
let compensation = bucket.calcCompensation(recentAvgUsage)
let availableBudget = bucket.budgetCap + compensation
- return (availableBudget, bucket.budgetCap)
- return (bucket.budget, bucket.budgetCap)
+ return (availableBudget, bucket.budgetCap, bucket.lastTimeFull)
+ return (bucket.budget, bucket.budgetCap, bucket.lastTimeFull)
proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
## If `tokens` are available, consume them,
@@ -154,26 +154,30 @@ proc new*(
budgetCap: int,
fillDuration: Duration = 1.seconds,
mode: ReplenishMode = ReplenishMode.Compensating,
+ budget: int = -1, # -1 means "use budgetCap"
+ lastTimeFull: Moment = Moment.now(),
): T =
assert not isZero(fillDuration)
assert budgetCap != 0
+ let actualBudget = if budget == -1: budgetCap else: budget
+ assert actualBudget >= 0 and actualBudget <= budgetCap
## Create different mode TokenBucket
case mode
of ReplenishMode.Strict:
return T(
- budget: budgetCap,
+ budget: actualBudget,
budgetCap: budgetCap,
fillDuration: fillDuration,
- lastTimeFull: Moment.now(),
+ lastTimeFull: lastTimeFull,
replenishMode: mode,
)
of ReplenishMode.Compensating:
T(
- budget: budgetCap,
+ budget: actualBudget,
budgetCap: budgetCap,
fillDuration: fillDuration,
- lastTimeFull: Moment.now(),
+ lastTimeFull: lastTimeFull,
replenishMode: mode,
maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
)
diff --git a/tests/test_ratelimit.nim b/tests/test_ratelimit.nim
index f38ddbf..a10dd9e 100644
--- a/tests/test_ratelimit.nim
+++ b/tests/test_ratelimit.nim
@@ -2,6 +2,7 @@
import testutils/unittests
import ../ratelimit/ratelimit
+import ../ratelimit/store/memory
import chronos
import strutils
@@ -25,8 +26,9 @@ suite "Queue RateLimitManager":
asyncTest "sendOrEnqueue - immediate send when capacity available":
## Given
- let manager = newRateLimitManager[string](
- mockSender, capacity = 10, duration = chronos.milliseconds(100)
+ let store: MemoryRateLimitStore = MemoryRateLimitStore.new()
+ let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+ store, mockSender, capacity = 10, duration = chronos.milliseconds(100)
)
let testMsg = "Hello World"
@@ -43,8 +45,9 @@ suite "Queue RateLimitManager":
asyncTest "sendOrEnqueue - multiple messages":
## Given
- let manager = newRateLimitManager[string](
- mockSender, capacity = 10, duration = chronos.milliseconds(100)
+ let store = MemoryRateLimitStore.new()
+ let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+ store, mockSender, capacity = 10, duration = chronos.milliseconds(100)
)
## When
@@ -63,7 +66,9 @@ suite "Queue RateLimitManager":
asyncTest "start and stop - basic functionality":
## Given
- let manager = newRateLimitManager[string](
+ let store = MemoryRateLimitStore.new()
+ let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+ store,
mockSender,
capacity = 10,
duration = chronos.milliseconds(100),
@@ -94,7 +99,9 @@ suite "Queue RateLimitManager":
asyncTest "start and stop - drop large batch":
## Given
- let manager = newRateLimitManager[string](
+ let store = MemoryRateLimitStore.new()
+ let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+ store,
mockSender,
capacity = 2,
duration = chronos.milliseconds(100),
@@ -109,7 +116,9 @@ suite "Queue RateLimitManager":
asyncTest "enqueue - enqueue critical only when exceeded":
## Given
- let manager = newRateLimitManager[string](
+ let store = MemoryRateLimitStore.new()
+ let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+ store,
mockSender,
capacity = 10,
duration = chronos.milliseconds(100),
@@ -163,7 +172,9 @@ suite "Queue RateLimitManager":
asyncTest "enqueue - enqueue normal on 70% capacity":
## Given
- let manager = newRateLimitManager[string](
+ let store = MemoryRateLimitStore.new()
+ let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+ store,
mockSender,
capacity = 10,
duration = chronos.milliseconds(100),
@@ -220,7 +231,9 @@ suite "Queue RateLimitManager":
asyncTest "enqueue - process queued messages":
## Given
- let manager = newRateLimitManager[string](
+ let store = MemoryRateLimitStore.new()
+ let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+ store,
mockSender,
capacity = 10,
duration = chronos.milliseconds(200),