feat: add rate limit store

This commit is contained in:
pablo 2025-07-16 15:51:26 +03:00
parent 9f52377d44
commit 414ec6f920
No known key found for this signature in database
GPG Key ID: 78F35FCC60FDC63A
7 changed files with 96 additions and 250 deletions

2
.gitignore vendored
View File

@ -25,5 +25,7 @@ apps/*
!*.nim
tests/*
!*.nim
ratelimit/*
!*.nim
nimble.develop
nimble.paths

View File

@ -7,7 +7,9 @@ license = "MIT"
srcDir = "src"
### Dependencies
requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "waku"
requires "nim >= 2.2.4",
"chronicles", "chronos", "db_connector",
"https://github.com/waku-org/token_bucket.git"
task buildSharedLib, "Build shared library for C bindings":
exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"

View File

@ -1,5 +1,6 @@
import std/[times, deques, options]
import waku/common/rate_limit/token_bucket
import token_bucket
import ./store/store
import chronos
type
@ -27,7 +28,8 @@ type
MessageSender*[T: Serializable] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}
RateLimitManager*[T: Serializable] = ref object
RateLimitManager*[T: Serializable, S: RateLimitStore] = ref object
store: S
bucket: TokenBucket
sender: MessageSender[T]
queueCritical: Deque[seq[MsgIdMsg[T]]]
@ -35,25 +37,41 @@ type
sleepDuration: chronos.Duration
pxQueueHandleLoop: Future[void]
proc new*[T: Serializable](
M: type[RateLimitManager[T]],
proc new*[T: Serializable, S: RateLimitStore](
M: type[RateLimitManager[T, S]],
store: S,
sender: MessageSender[T],
capacity: int = 100,
duration: chronos.Duration = chronos.minutes(10),
sleepDuration: chronos.Duration = chronos.milliseconds(1000),
): M =
M(
bucket: TokenBucket.newStrict(capacity, duration),
): Future[RateLimitManager[T, S]] {.async.} =
var current = await store.loadBucketState()
if current.isNone():
# initialize bucket state with full capacity
current = some(
BucketState(budget: capacity, budgetCap: capacity, lastTimeFull: Moment.now())
)
discard await store.saveBucketState(current.get())
return RateLimitManager[T, S](
store: store,
bucket: TokenBucket.new(
current.get().budgetCap,
duration,
ReplenishMode.Strict,
current.get().budget,
current.get().lastTimeFull,
),
sender: sender,
queueCritical: Deque[seq[MsgIdMsg[T]]](),
queueNormal: Deque[seq[MsgIdMsg[T]]](),
sleepDuration: sleepDuration,
)
proc getCapacityState[T: Serializable](
manager: RateLimitManager[T], now: Moment, count: int = 1
proc getCapacityState[T: Serializable, S: RateLimitStore](
manager: RateLimitManager[T, S], now: Moment, count: int = 1
): CapacityState =
let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now)
let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
let countAfter = budget - count
let ratio = countAfter.float / budgetCap.float
if ratio < 0.0:
@ -63,15 +81,15 @@ proc getCapacityState[T: Serializable](
else:
return CapacityState.Normal
proc passToSender[T: Serializable](
manager: RateLimitManager[T],
msgs: sink seq[MsgIdMsg[T]],
proc passToSender[T: Serializable, S: RateLimitStore](
manager: RateLimitManager[T, S],
msgs: seq[tuple[msgId: string, msg: T]],
now: Moment,
priority: Priority,
): Future[SendResult] {.async.} =
let count = msgs.len
let capacity = manager.bucket.tryConsume(count, now)
if not capacity:
let consumed = manager.bucket.tryConsume(count, now)
if not consumed:
case priority
of Priority.Critical:
manager.queueCritical.addLast(msgs)
@ -81,12 +99,17 @@ proc passToSender[T: Serializable](
return SendResult.Enqueued
of Priority.Optional:
return SendResult.Dropped
let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now)
discard await manager.store.saveBucketState(
BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull)
)
await manager.sender(msgs)
return SendResult.PassedToSender
proc processCriticalQueue[T: Serializable](
manager: RateLimitManager[T], now: Moment
) {.async.} =
proc processCriticalQueue[T: Serializable, S: RateLimitStore](
manager: RateLimitManager[T, S], now: Moment
): Future[void] {.async.} =
while manager.queueCritical.len > 0:
let msgs = manager.queueCritical.popFirst()
let capacityState = manager.getCapacityState(now, msgs.len)
@ -99,9 +122,9 @@ proc processCriticalQueue[T: Serializable](
manager.queueCritical.addFirst(msgs)
break
proc processNormalQueue[T: Serializable](
manager: RateLimitManager[T], now: Moment
) {.async.} =
proc processNormalQueue[T: Serializable, S: RateLimitStore](
manager: RateLimitManager[T, S], now: Moment
): Future[void] {.async.} =
while manager.queueNormal.len > 0:
let msgs = manager.queueNormal.popFirst()
let capacityState = manager.getCapacityState(now, msgs.len)
@ -112,13 +135,13 @@ proc processNormalQueue[T: Serializable](
manager.queueNormal.addFirst(msgs)
break
proc sendOrEnqueue*[T: Serializable](
manager: RateLimitManager[T],
msgs: seq[MsgIdMsg[T]],
proc sendOrEnqueue*[T: Serializable, S: RateLimitStore](
manager: RateLimitManager[T, S],
msgs: seq[tuple[msgId: string, msg: T]],
priority: Priority,
now: Moment = Moment.now(),
): Future[SendResult] {.async.} =
let (_, budgetCap) = manager.bucket.getAvailableCapacity(now)
let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
if msgs.len.float / budgetCap.float >= 0.3:
# drop batch if it's too large to avoid starvation
return SendResult.DroppedBatchTooLarge
@ -147,11 +170,13 @@ proc sendOrEnqueue*[T: Serializable](
of Priority.Optional:
return SendResult.Dropped
proc getEnqueued*[T: Serializable](
manager: RateLimitManager[T]
): tuple[critical: seq[MsgIdMsg[T]], normal: seq[MsgIdMsg[T]]] =
var criticalMsgs: seq[MsgIdMsg[T]]
var normalMsgs: seq[MsgIdMsg[T]]
proc getEnqueued*[T: Serializable, S: RateLimitStore](
manager: RateLimitManager[T, S]
): tuple[
critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]]
] =
var criticalMsgs: seq[tuple[msgId: string, msg: T]]
var normalMsgs: seq[tuple[msgId: string, msg: T]]
for batch in manager.queueCritical:
criticalMsgs.add(batch)
@ -161,8 +186,8 @@ proc getEnqueued*[T: Serializable](
return (criticalMsgs, normalMsgs)
proc queueHandleLoop[T: Serializable](
manager: RateLimitManager[T],
proc queueHandleLoop*[T: Serializable, S: RateLimitStore](
manager: RateLimitManager[T, S],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
) {.async.} =
@ -177,18 +202,22 @@ proc queueHandleLoop[T: Serializable](
# configurable sleep duration for processing queued messages
await sleepAsync(manager.sleepDuration)
proc start*[T: Serializable](
manager: RateLimitManager[T],
proc start*[T: Serializable, S: RateLimitStore](
manager: RateLimitManager[T, S],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
) {.async.} =
manager.pxQueueHandleLoop = manager.queueHandleLoop(nowProvider)
manager.pxQueueHandleLoop = queueHandleLoop(manager, nowProvider)
proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} =
proc stop*[T: Serializable, S: RateLimitStore](
manager: RateLimitManager[T, S]
) {.async.} =
if not isNil(manager.pxQueueHandleLoop):
await manager.pxQueueHandleLoop.cancelAndWait()
func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} =
func `$`*[T: Serializable, S: RateLimitStore](
b: RateLimitManager[T, S]
): string {.inline.} =
if isNil(b):
return "nil"
return

View File

@ -4,18 +4,18 @@ import chronos
# Memory Implementation
type MemoryRateLimitStore* = ref object
bucketState: BucketState
bucketState: Option[BucketState]
proc newMemoryRateLimitStore*(): MemoryRateLimitStore =
result = MemoryRateLimitStore()
proc new*(T: type[MemoryRateLimitStore]): T =
return T(bucketState: none(BucketState))
proc saveBucketState*(
store: MemoryRateLimitStore, bucketState: BucketState
): Future[bool] {.async.} =
store.bucketState = bucketState
store.bucketState = some(bucketState)
return true
proc loadBucketState*(
store: MemoryRateLimitStore
): Future[Option[BucketState]] {.async.} =
return some(store.bucketState)
return store.bucketState

View File

@ -7,7 +7,7 @@ type
budgetCap*: int
lastTimeFull*: Moment
RateLimitStoreConcept* =
RateLimitStore* =
concept s
s.saveBucketState(BucketState) is Future[bool]
s.loadBucketState() is Future[Option[BucketState]]

View File

@ -1,198 +0,0 @@
{.push raises: [].}
import chronos, std/math, std/options
const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25
## This is an extract from chronos/rate_limit.nim, made because of a bug found in the original implementation.
## Unfortunately that bug cannot be fixed without harming the original features of the TokenBucket class.
## So this shortcut is used to enable moving ahead with the nwaku rate limiter implementation.
## ref: https://github.com/status-im/nim-chronos/issues/500
##
## This version of TokenBucket differs from the original one in chronos/rate_limit.nim in several ways:
## - It has a new mode called `Compensating`, which is the default mode.
##   Compensation is calculated as the average unused bucket capacity over the last measured period(s),
##   capped at the allowed compensation threshold (currently a constant 25%).
##   Compensation also takes care of proper time-period calculation, so that idle periods cannot lead to
##   overcompensation.
## - Strict mode is also available: it only replenishes once the time period is over, and then fills
##   the bucket to the maximum capacity.
type
  ReplenishMode* = enum
    ## Replenishment strategy used by `TokenBucket`.
    Strict
    Compensating

  TokenBucket* = ref object
    budget: int ## Current number of tokens in the bucket
    budgetCap: int ## Bucket capacity
    lastTimeFull: Moment
      ## This timer measures the proper periodization of the bucket refilling
    fillDuration: Duration ## Refill period
    case replenishMode*: ReplenishMode
    of Strict:
      ## In strict mode, the bucket is refilled only till the budgetCap
      discard
    of Compensating:
      ## This is the default mode (see `new`). Unused capacity from previous
      ## periods may be carried over, up to `maxCompensation` tokens.
      maxCompensation: float
func periodDistance(bucket: TokenBucket, currentTime: Moment): float =
  ## Number of fill periods (as a fraction) elapsed since the bucket was
  ## last full. `fillDuration` cannot be zero by design (asserted in `new`).
  let elapsedNs = nanoseconds(currentTime - bucket.lastTimeFull).float
  let periodNs = nanoseconds(bucket.fillDuration).float
  elapsedNs / periodNs
func getUsageAverageSince(bucket: TokenBucket, distance: float): float =
  ## Usage average expressed as a fraction of total capacity available over
  ## the measured period. `budgetCap` can never be zero (asserted in `new`).
  if distance == 0.float:
    # zero elapsed time counts as 100% usage
    return 1.0
  result = bucket.budget.float / bucket.budgetCap.float / distance
proc calcCompensation(bucket: TokenBucket, averageUsage: float): int =
  ## Tokens to carry over into the next period: the unused bucket capacity of
  ## the last measured period(s) on average, capped at `maxCompensation`.
  if averageUsage < 1.0:
    let unusedCapacity = (1.0 - averageUsage) * bucket.budgetCap.float
    let cappedCompensation = min(unusedCapacity, bucket.maxCompensation)
    result = trunc(cappedCompensation).int
  else:
    # tokens were fully used (or overused): nothing to compensate
    result = 0
func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool =
  ## True once at least one full refill period has passed since the bucket
  ## was last full.
  let sinceFull = currentTime - bucket.lastTimeFull
  sinceFull >= bucket.fillDuration
## Replenish helpers: invoked when an empty bucket attempts to consume
## tokens, to check whether the refill duration has passed and the bucket
## can be replenished.
## - strict mode:
proc updateStrict(bucket: TokenBucket, currentTime: Moment) =
  ## Refill to full capacity, but only once a whole period has elapsed.
  if bucket.fillDuration == default(Duration):
    # a zero fill duration disables periodic refills; just clamp the budget
    bucket.budget = min(bucket.budgetCap, bucket.budget)
  elif periodElapsed(bucket, currentTime):
    bucket.budget = bucket.budgetCap
    bucket.lastTimeFull = currentTime
## - compensating - balancing load:
##   - between updates we calculate the average load (current bucket capacity /
##     number of periods since the last update)
##   - this gives the percentage of capacity used recently
##   - with it we can replenish the bucket up to 100% plus the calculated
##     leftover from the previous period (capped at the max threshold)
proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) =
  if bucket.fillDuration == default(Duration):
    # a zero fill duration disables periodic refills; just clamp the budget
    bucket.budget = min(bucket.budgetCap, bucket.budget)
    return
  # do not replenish within the same period
  if not periodElapsed(bucket, currentTime):
    return
  let distance = bucket.periodDistance(currentTime)
  let recentAvgUsage = bucket.getUsageAverageSince(distance)
  let compensation = bucket.calcCompensation(recentAvgUsage)
  bucket.budget = bucket.budgetCap + compensation
  bucket.lastTimeFull = currentTime
proc update(bucket: TokenBucket, currentTime: Moment) =
  ## Dispatch to the mode-specific replenish routine.
  case bucket.replenishMode
  of ReplenishMode.Compensating:
    bucket.updateWithCompensation(currentTime)
  of ReplenishMode.Strict:
    bucket.updateStrict(currentTime)
proc getAvailableCapacity*(
    bucket: TokenBucket, currentTime: Moment
): tuple[budget: int, budgetCap: int] =
  ## Returns the capacity the bucket would offer at `currentTime`, as
  ## `(budget, budgetCap)`, without mutating the bucket's state.
  let cap = bucket.budgetCap
  if not periodElapsed(bucket, currentTime):
    # still inside the current period: report whatever is left right now
    return (bucket.budget, cap)
  case bucket.replenishMode
  of ReplenishMode.Strict:
    # a full period has passed: the bucket would refill completely
    (cap, cap)
  of ReplenishMode.Compensating:
    # full refill plus any carried-over unused capacity
    let avgUsage = bucket.getUsageAverageSince(bucket.periodDistance(currentTime))
    (cap + bucket.calcCompensation(avgUsage), cap)
proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
  ## If `tokens` are available, consume them and return true.
  ## Otherwise, return false.
  # While the bucket is (at least) full, keep pushing the period start
  # forward, so idle time before the first consumption is not counted.
  if bucket.budget >= bucket.budgetCap:
    bucket.lastTimeFull = now
  # Fast path: enough tokens available without replenishing.
  if bucket.budget >= tokens:
    bucket.budget -= tokens
    return true
  # Slow path: replenish for the elapsed period(s), then retry once.
  bucket.update(now)
  if bucket.budget >= tokens:
    bucket.budget -= tokens
    return true
  else:
    return false
proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
  ## Add `tokens` to the budget (capped to the bucket capacity)
  # NOTE(review): `update` only clamps the budget to `budgetCap` when
  # `fillDuration` is the default (zero) value, so with a non-zero fill
  # duration the budget can exceed `budgetCap` here — confirm intended.
  bucket.budget += tokens
  bucket.update(now)
proc new*(
    T: type[TokenBucket],
    budgetCap: int,
    fillDuration: Duration = 1.seconds,
    mode: ReplenishMode = ReplenishMode.Compensating,
): T =
  ## Create a TokenBucket in the requested replenish mode, starting full.
  ## A zero `fillDuration` or `budgetCap` is a programming error.
  assert not isZero(fillDuration)
  assert budgetCap != 0
  case mode
  of ReplenishMode.Strict:
    result = T(
      budget: budgetCap,
      budgetCap: budgetCap,
      fillDuration: fillDuration,
      lastTimeFull: Moment.now(),
      replenishMode: mode,
    )
  of ReplenishMode.Compensating:
    result = T(
      budget: budgetCap,
      budgetCap: budgetCap,
      fillDuration: fillDuration,
      lastTimeFull: Moment.now(),
      replenishMode: mode,
      # carry-over is limited to a fixed fraction of the capacity
      maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
    )
proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket =
  ## Convenience constructor for a strict-mode bucket.
  result = new(T, capacity, period, ReplenishMode.Strict)
proc newCompensating*(
    T: type[TokenBucket], capacity: int, period: Duration
): TokenBucket =
  ## Convenience constructor for a compensating-mode bucket.
  result = new(T, capacity, period, ReplenishMode.Compensating)
func `$`*(b: TokenBucket): string {.inline.} =
  ## Render as "capacity/period"; a nil bucket renders as "nil".
  if b.isNil:
    "nil"
  else:
    $b.budgetCap & "/" & $b.fillDuration
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
  ## An absent bucket renders as "no-limit".
  if ob.isSome():
    $ob.get()
  else:
    "no-limit"

View File

@ -1,5 +1,6 @@
import testutils/unittests
import ../ratelimit/rate_limit_manager
import ../ratelimit/store/memory
import chronos
# Implement the Serializable concept for string
@ -22,8 +23,9 @@ suite "Queue RateLimitManager":
asyncTest "sendOrEnqueue - immediate send when capacity available":
## Given
let manager = RateLimitManager[string].new(
mockSender, capacity = 10, duration = chronos.milliseconds(100)
let store: MemoryRateLimitStore = MemoryRateLimitStore.new()
let manager = await RateLimitManager[string, MemoryRateLimitStore].new(
store, mockSender, capacity = 10, duration = chronos.milliseconds(100)
)
let testMsg = "Hello World"
@ -40,8 +42,9 @@ suite "Queue RateLimitManager":
asyncTest "sendOrEnqueue - multiple messages":
## Given
let manager = RateLimitManager[string].new(
mockSender, capacity = 10, duration = chronos.milliseconds(100)
let store = MemoryRateLimitStore.new()
let manager = await RateLimitManager[string, MemoryRateLimitStore].new(
store, mockSender, capacity = 10, duration = chronos.milliseconds(100)
)
## When
@ -60,7 +63,9 @@ suite "Queue RateLimitManager":
asyncTest "start and stop - drop large batch":
## Given
let manager = RateLimitManager[string].new(
let store = MemoryRateLimitStore.new()
let manager = await RateLimitManager[string, MemoryRateLimitStore].new(
store,
mockSender,
capacity = 2,
duration = chronos.milliseconds(100),
@ -75,7 +80,9 @@ suite "Queue RateLimitManager":
asyncTest "enqueue - enqueue critical only when exceeded":
## Given
let manager = RateLimitManager[string].new(
let store = MemoryRateLimitStore.new()
let manager = await RateLimitManager[string, MemoryRateLimitStore].new(
store,
mockSender,
capacity = 10,
duration = chronos.milliseconds(100),
@ -123,7 +130,9 @@ suite "Queue RateLimitManager":
asyncTest "enqueue - enqueue normal on 70% capacity":
## Given
let manager = RateLimitManager[string].new(
let store = MemoryRateLimitStore.new()
let manager = await RateLimitManager[string, MemoryRateLimitStore].new(
store,
mockSender,
capacity = 10,
duration = chronos.milliseconds(100),
@ -174,7 +183,9 @@ suite "Queue RateLimitManager":
asyncTest "enqueue - process queued messages":
## Given
let manager = RateLimitManager[string].new(
let store = MemoryRateLimitStore.new()
let manager = await RateLimitManager[string, MemoryRateLimitStore].new(
store,
mockSender,
capacity = 10,
duration = chronos.milliseconds(200),