feat: add rate limit store

pablo 2025-07-16 15:51:26 +03:00
parent ce18bb0f50
commit 9b6c9f359d
No known key found for this signature in database
GPG Key ID: 78F35FCC60FDC63A
6 changed files with 94 additions and 50 deletions

.gitignore vendored
View File

@@ -25,5 +25,7 @@ apps/*
 !*.nim
 tests/*
 !*.nim
+ratelimit/*
+!*.nim
 nimble.develop
 nimble.paths

View File

@@ -1,6 +1,7 @@
 import std/[times, deques, options]
-# import ./token_bucket
-import waku/common/rate_limit/token_bucket
+import ./token_bucket
+# import waku/common/rate_limit/token_bucket
+import ./store/store
 import chronos

 type
@@ -27,7 +28,8 @@ type
   MessageSender*[T: Serializable] =
     proc(msgs: seq[tuple[msgId: string, msg: T]]): Future[void] {.async.}

-  RateLimitManager*[T: Serializable] = ref object
+  RateLimitManager*[T: Serializable, S: RateLimitStore] = ref object
+    store: S
     bucket: TokenBucket
     sender: MessageSender[T]
     running: bool
@@ -35,14 +37,30 @@ type
     queueNormal: Deque[seq[tuple[msgId: string, msg: T]]]
     sleepDuration: chronos.Duration

-proc newRateLimitManager*[T: Serializable](
+proc newRateLimitManager*[T: Serializable, S: RateLimitStore](
+    store: S,
     sender: MessageSender[T],
     capacity: int = 100,
     duration: chronos.Duration = chronos.minutes(10),
     sleepDuration: chronos.Duration = chronos.milliseconds(1000),
-): RateLimitManager[T] =
-  RateLimitManager[T](
-    bucket: TokenBucket.newStrict(capacity, duration),
+): Future[RateLimitManager[T, S]] {.async.} =
+  var current = await store.loadBucketState()
+  if current.isNone():
+    # initialize bucket state with full capacity
+    current = some(
+      BucketState(budget: capacity, budgetCap: capacity, lastTimeFull: Moment.now())
+    )
+    discard await store.saveBucketState(current.get())
+
+  return RateLimitManager[T, S](
+    store: store,
+    bucket: TokenBucket.new(
+      current.get().budgetCap,
+      duration,
+      ReplenishMode.Strict,
+      current.get().budget,
+      current.get().lastTimeFull,
+    ),
     sender: sender,
     running: false,
     queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](),
@@ -50,10 +68,10 @@ proc newRateLimitManager*[T: Serializable](
     sleepDuration: sleepDuration,
   )

-proc getCapacityState[T: Serializable](
-    manager: RateLimitManager[T], now: Moment, count: int = 1
+proc getCapacityState[T: Serializable, S: RateLimitStore](
+    manager: RateLimitManager[T, S], now: Moment, count: int = 1
 ): CapacityState =
-  let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now)
+  let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
   let countAfter = budget - count
   let ratio = countAfter.float / budgetCap.float
   if ratio < 0.0:
@@ -63,15 +81,15 @@ proc getCapacityState[T: Serializable](
   else:
     return CapacityState.Normal

-proc passToSender[T: Serializable](
-    manager: RateLimitManager[T],
+proc passToSender[T: Serializable, S: RateLimitStore](
+    manager: RateLimitManager[T, S],
     msgs: seq[tuple[msgId: string, msg: T]],
     now: Moment,
     priority: Priority,
 ): Future[SendResult] {.async.} =
   let count = msgs.len
-  let capacity = manager.bucket.tryConsume(count, now)
-  if not capacity:
+  let consumed = manager.bucket.tryConsume(count, now)
+  if not consumed:
     case priority
     of Priority.Critical:
       manager.queueCritical.addLast(msgs)
@@ -81,11 +99,16 @@ proc passToSender[T: Serializable](
       return SendResult.Enqueued
     of Priority.Optional:
       return SendResult.Dropped
+
+  let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now)
+  discard await manager.store.saveBucketState(
+    BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull)
+  )
   await manager.sender(msgs)
   return SendResult.PassedToSender

-proc processCriticalQueue[T: Serializable](
-    manager: RateLimitManager[T], now: Moment
+proc processCriticalQueue[T: Serializable, S: RateLimitStore](
+    manager: RateLimitManager[T, S], now: Moment
 ): Future[void] {.async.} =
   while manager.queueCritical.len > 0:
     let msgs = manager.queueCritical.popFirst()
@@ -99,8 +122,8 @@ proc processCriticalQueue[T: Serializable](
       manager.queueCritical.addFirst(msgs)
       break

-proc processNormalQueue[T: Serializable](
-    manager: RateLimitManager[T], now: Moment
+proc processNormalQueue[T: Serializable, S: RateLimitStore](
+    manager: RateLimitManager[T, S], now: Moment
 ): Future[void] {.async.} =
   while manager.queueNormal.len > 0:
     let msgs = manager.queueNormal.popFirst()
@@ -112,13 +135,13 @@ proc processNormalQueue[T: Serializable](
       manager.queueNormal.addFirst(msgs)
       break

-proc sendOrEnqueue*[T: Serializable](
-    manager: RateLimitManager[T],
+proc sendOrEnqueue*[T: Serializable, S: RateLimitStore](
+    manager: RateLimitManager[T, S],
     msgs: seq[tuple[msgId: string, msg: T]],
     priority: Priority,
     now: Moment = Moment.now(),
 ): Future[SendResult] {.async.} =
-  let (_, budgetCap) = manager.bucket.getAvailableCapacity(now)
+  let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
   if msgs.len.float / budgetCap.float >= 0.3:
     # drop batch if it's too large to avoid starvation
     return SendResult.DroppedBatchTooLarge
@@ -147,8 +170,8 @@ proc sendOrEnqueue*[T: Serializable](
     of Priority.Optional:
       return SendResult.Dropped

-proc getEnqueued*[T: Serializable](
-    manager: RateLimitManager[T]
+proc getEnqueued*[T: Serializable, S: RateLimitStore](
+    manager: RateLimitManager[T, S]
 ): tuple[
   critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]]
 ] =
@@ -163,8 +186,8 @@ proc getEnqueued*[T: Serializable](
   return (criticalMsgs, normalMsgs)

-proc start*[T: Serializable](
-    manager: RateLimitManager[T],
+proc start*[T: Serializable, S: RateLimitStore](
+    manager: RateLimitManager[T, S],
     nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
       Moment.now(),
 ): Future[void] {.async.} =
@@ -181,10 +204,12 @@ proc start*[T: Serializable](
       echo "Error in queue processing: ", e.msg
     await sleepAsync(manager.sleepDuration)

-proc stop*[T: Serializable](manager: RateLimitManager[T]) =
+proc stop*[T: Serializable, S: RateLimitStore](manager: RateLimitManager[T, S]) =
   manager.running = false

-func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} =
+func `$`*[T: Serializable, S: RateLimitStore](
+    b: RateLimitManager[T, S]
+): string {.inline.} =
   if isNil(b):
     return "nil"
   return
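For reference, a minimal usage sketch of the store-backed constructor and sendOrEnqueue, pieced together from the signatures in this diff. The import paths mirror the test file, printSender is an illustrative stand-in for a real MessageSender, and Priority/SendResult values are the ones named above; this is a sketch, not part of the change itself.

import chronos
import ../ratelimit/ratelimit
import ../ratelimit/store/memory

# illustrative sender: a real one would publish the batch somewhere
proc printSender(
    msgs: seq[tuple[msgId: string, msg: string]]
): Future[void] {.async.} =
  echo "sending ", msgs.len, " message(s)"

proc example() {.async.} =
  # the manager now needs a store; bucket state is loaded from it or initialized
  let store = MemoryRateLimitStore.new()
  let manager = await newRateLimitManager[string, MemoryRateLimitStore](
    store, printSender, capacity = 10, duration = chronos.minutes(10)
  )

  # one message consumes one token and the bucket state is persisted again
  let res = await manager.sendOrEnqueue(
    @[(msgId: "msg-1", msg: "hello")], Priority.Critical
  )
  doAssert res == SendResult.PassedToSender

waitFor example()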

View File

@@ -4,18 +4,18 @@ import chronos
 # Memory Implementation

 type MemoryRateLimitStore* = ref object
-  bucketState: BucketState
+  bucketState: Option[BucketState]

-proc newMemoryRateLimitStore*(): MemoryRateLimitStore =
-  result = MemoryRateLimitStore()
+proc new*(T: type[MemoryRateLimitStore]): T =
+  return T(bucketState: none(BucketState))

 proc saveBucketState*(
     store: MemoryRateLimitStore, bucketState: BucketState
 ): Future[bool] {.async.} =
-  store.bucketState = bucketState
+  store.bucketState = some(bucketState)
   return true

 proc loadBucketState*(
     store: MemoryRateLimitStore
 ): Future[Option[BucketState]] {.async.} =
-  return some(store.bucketState)
+  return store.bucketState
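A quick round-trip with the in-memory store, as a sketch. It assumes BucketState is exported by the store module and that the module paths match the test file; the numbers are illustrative.

import std/options
import chronos
import ../ratelimit/store/store
import ../ratelimit/store/memory

proc roundTrip() {.async.} =
  let store = MemoryRateLimitStore.new()
  # nothing persisted yet, so the manager would start with a full bucket
  doAssert (await store.loadBucketState()).isNone()

  let state = BucketState(budget: 7, budgetCap: 10, lastTimeFull: Moment.now())
  discard await store.saveBucketState(state)

  let loaded = await store.loadBucketState()
  doAssert loaded.isSome() and loaded.get().budget == 7

waitFor roundTrip()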

View File

@@ -7,7 +7,7 @@ type
     budgetCap*: int
     lastTimeFull*: Moment

-  RateLimitStoreConcept* =
+  RateLimitStore* =
     concept s
       s.saveBucketState(BucketState) is Future[bool]
       s.loadBucketState() is Future[Option[BucketState]]
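Since RateLimitStore is a concept rather than an interface type, any object providing these two async procs can be plugged in as the S parameter of RateLimitManager. As an illustration only, a hypothetical no-op NullRateLimitStore with the same shape (the name and behavior are not part of this change):

import std/options
import chronos
import ../ratelimit/store/store

# hypothetical store that never persists anything
type NullRateLimitStore* = ref object

proc saveBucketState*(
    store: NullRateLimitStore, bucketState: BucketState
): Future[bool] {.async.} =
  # a real store would write the state somewhere durable here
  return true

proc loadBucketState*(
    store: NullRateLimitStore
): Future[Option[BucketState]] {.async.} =
  # nothing stored, so newRateLimitManager falls back to a full bucket
  return none(BucketState)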

View File

@@ -112,18 +112,18 @@ proc update(bucket: TokenBucket, currentTime: Moment) =
 ## Returns the available capacity of the bucket: (budget, budgetCap)
 proc getAvailableCapacity*(
     bucket: TokenBucket, currentTime: Moment
-): tuple[budget: int, budgetCap: int] =
+): tuple[budget: int, budgetCap: int, lastTimeFull: Moment] =
   if periodElapsed(bucket, currentTime):
     case bucket.replenishMode
     of ReplenishMode.Strict:
-      return (bucket.budgetCap, bucket.budgetCap)
+      return (bucket.budgetCap, bucket.budgetCap, bucket.lastTimeFull)
     of ReplenishMode.Compensating:
       let distance = bucket.periodDistance(currentTime)
       let recentAvgUsage = bucket.getUsageAverageSince(distance)
       let compensation = bucket.calcCompensation(recentAvgUsage)
       let availableBudget = bucket.budgetCap + compensation
-      return (availableBudget, bucket.budgetCap)
-  return (bucket.budget, bucket.budgetCap)
+      return (availableBudget, bucket.budgetCap, bucket.lastTimeFull)
+  return (bucket.budget, bucket.budgetCap, bucket.lastTimeFull)

 proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
   ## If `tokens` are available, consume them,
@@ -154,26 +154,30 @@ proc new*(
     budgetCap: int,
     fillDuration: Duration = 1.seconds,
     mode: ReplenishMode = ReplenishMode.Compensating,
+    budget: int = -1, # -1 means "use budgetCap"
+    lastTimeFull: Moment = Moment.now(),
 ): T =
   assert not isZero(fillDuration)
   assert budgetCap != 0
+  let actualBudget = if budget == -1: budgetCap else: budget
+  assert actualBudget >= 0 and actualBudget <= budgetCap

   ## Create different mode TokenBucket
   case mode
   of ReplenishMode.Strict:
     return T(
-      budget: budgetCap,
+      budget: actualBudget,
       budgetCap: budgetCap,
       fillDuration: fillDuration,
-      lastTimeFull: Moment.now(),
+      lastTimeFull: lastTimeFull,
       replenishMode: mode,
     )
   of ReplenishMode.Compensating:
     T(
-      budget: budgetCap,
+      budget: actualBudget,
       budgetCap: budgetCap,
       fillDuration: fillDuration,
-      lastTimeFull: Moment.now(),
+      lastTimeFull: lastTimeFull,
       replenishMode: mode,
       maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
     )
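A sketch of rebuilding a bucket from persisted state through the extended new signature. The module path and the values are illustrative, and the call mirrors the one made in newRateLimitManager; the assertion assumes the 10-minute period has not yet elapsed.

import chronos
import ../ratelimit/token_bucket

let restored = TokenBucket.new(
  100,                               # budgetCap
  chronos.minutes(10),               # fillDuration
  ReplenishMode.Strict,
  42,                                # budget carried over from persisted state
  Moment.now() - chronos.minutes(3), # lastTimeFull carried over as well
)

# the period has not elapsed yet, so the restored budget is reported unchanged
let (budget, budgetCap, _) = restored.getAvailableCapacity(Moment.now())
doAssert budget == 42 and budgetCap == 100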

View File

@@ -2,6 +2,7 @@
 import testutils/unittests
 import ../ratelimit/ratelimit
+import ../ratelimit/store/memory
 import chronos
 import strutils
@@ -25,8 +26,9 @@ suite "Queue RateLimitManager":

   asyncTest "sendOrEnqueue - immediate send when capacity available":
     ## Given
-    let manager = newRateLimitManager[string](
-      mockSender, capacity = 10, duration = chronos.milliseconds(100)
+    let store: MemoryRateLimitStore = MemoryRateLimitStore.new()
+    let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+      store, mockSender, capacity = 10, duration = chronos.milliseconds(100)
     )

     let testMsg = "Hello World"
@@ -43,8 +45,9 @@ suite "Queue RateLimitManager":

   asyncTest "sendOrEnqueue - multiple messages":
     ## Given
-    let manager = newRateLimitManager[string](
-      mockSender, capacity = 10, duration = chronos.milliseconds(100)
+    let store = MemoryRateLimitStore.new()
+    let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+      store, mockSender, capacity = 10, duration = chronos.milliseconds(100)
     )

     ## When
@@ -63,7 +66,9 @@ suite "Queue RateLimitManager":

   asyncTest "start and stop - basic functionality":
     ## Given
-    let manager = newRateLimitManager[string](
+    let store = MemoryRateLimitStore.new()
+    let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+      store,
       mockSender,
       capacity = 10,
       duration = chronos.milliseconds(100),
@@ -94,7 +99,9 @@ suite "Queue RateLimitManager":

   asyncTest "start and stop - drop large batch":
     ## Given
-    let manager = newRateLimitManager[string](
+    let store = MemoryRateLimitStore.new()
+    let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+      store,
       mockSender,
       capacity = 2,
       duration = chronos.milliseconds(100),
@@ -109,7 +116,9 @@ suite "Queue RateLimitManager":

   asyncTest "enqueue - enqueue critical only when exceeded":
     ## Given
-    let manager = newRateLimitManager[string](
+    let store = MemoryRateLimitStore.new()
+    let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+      store,
       mockSender,
       capacity = 10,
       duration = chronos.milliseconds(100),
@@ -163,7 +172,9 @@ suite "Queue RateLimitManager":

   asyncTest "enqueue - enqueue normal on 70% capacity":
     ## Given
-    let manager = newRateLimitManager[string](
+    let store = MemoryRateLimitStore.new()
+    let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+      store,
       mockSender,
       capacity = 10,
       duration = chronos.milliseconds(100),
@@ -220,7 +231,9 @@ suite "Queue RateLimitManager":

   asyncTest "enqueue - process queued messages":
     ## Given
-    let manager = newRateLimitManager[string](
+    let store = MemoryRateLimitStore.new()
+    let manager = await newRateLimitManager[string, MemoryRateLimitStore](
+      store,
       mockSender,
       capacity = 10,
       duration = chronos.milliseconds(200),