From 414ec6f920f8f0df8c56581ba4c112a7f25dea73 Mon Sep 17 00:00:00 2001 From: pablo Date: Wed, 16 Jul 2025 15:51:26 +0300 Subject: [PATCH] feat: add rate limit store --- .gitignore | 2 + chat_sdk.nimble | 4 +- ratelimit/rate_limit_manager.nim | 103 ++++++++++------ ratelimit/store/memory.nim | 10 +- ratelimit/store/store.nim | 2 +- ratelimit/token_bucket.nim | 198 ------------------------------ tests/test_rate_limit_manager.nim | 27 ++-- 7 files changed, 96 insertions(+), 250 deletions(-) delete mode 100644 ratelimit/token_bucket.nim diff --git a/.gitignore b/.gitignore index 4ee6ee2..1843ca9 100644 --- a/.gitignore +++ b/.gitignore @@ -25,5 +25,7 @@ apps/* !*.nim tests/* !*.nim +ratelimit/* +!*.nim nimble.develop nimble.paths diff --git a/chat_sdk.nimble b/chat_sdk.nimble index b7d1da0..e5fbb06 100644 --- a/chat_sdk.nimble +++ b/chat_sdk.nimble @@ -7,7 +7,9 @@ license = "MIT" srcDir = "src" ### Dependencies -requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "waku" +requires "nim >= 2.2.4", + "chronicles", "chronos", "db_connector", + "https://github.com/waku-org/token_bucket.git" task buildSharedLib, "Build shared library for C bindings": exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim" diff --git a/ratelimit/rate_limit_manager.nim b/ratelimit/rate_limit_manager.nim index c8f51f9..5451216 100644 --- a/ratelimit/rate_limit_manager.nim +++ b/ratelimit/rate_limit_manager.nim @@ -1,5 +1,6 @@ import std/[times, deques, options] -import waku/common/rate_limit/token_bucket +import token_bucket +import ./store/store import chronos type @@ -27,7 +28,8 @@ type MessageSender*[T: Serializable] = proc(msgs: seq[MsgIdMsg[T]]) {.async.} - RateLimitManager*[T: Serializable] = ref object + RateLimitManager*[T: Serializable, S: RateLimitStore] = ref object + store: S bucket: TokenBucket sender: MessageSender[T] queueCritical: Deque[seq[MsgIdMsg[T]]] @@ -35,25 +37,41 @@ type sleepDuration: chronos.Duration pxQueueHandleLoop: Future[void] -proc new*[T: Serializable]( - M: type[RateLimitManager[T]], +proc new*[T: Serializable, S: RateLimitStore]( + M: type[RateLimitManager[T, S]], + store: S, sender: MessageSender[T], capacity: int = 100, duration: chronos.Duration = chronos.minutes(10), sleepDuration: chronos.Duration = chronos.milliseconds(1000), -): M = - M( - bucket: TokenBucket.newStrict(capacity, duration), +): Future[RateLimitManager[T, S]] {.async.} = + var current = await store.loadBucketState() + if current.isNone(): + # initialize bucket state with full capacity + current = some( + BucketState(budget: capacity, budgetCap: capacity, lastTimeFull: Moment.now()) + ) + discard await store.saveBucketState(current.get()) + + return RateLimitManager[T, S]( + store: store, + bucket: TokenBucket.new( + current.get().budgetCap, + duration, + ReplenishMode.Strict, + current.get().budget, + current.get().lastTimeFull, + ), sender: sender, queueCritical: Deque[seq[MsgIdMsg[T]]](), queueNormal: Deque[seq[MsgIdMsg[T]]](), sleepDuration: sleepDuration, ) -proc getCapacityState[T: Serializable]( - manager: RateLimitManager[T], now: Moment, count: int = 1 +proc getCapacityState[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], now: Moment, count: int = 1 ): CapacityState = - let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now) + let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now) let countAfter = budget - count let ratio = countAfter.float / budgetCap.float if ratio < 0.0: @@ -63,15 +81,15 @@ proc getCapacityState[T: Serializable]( else: return CapacityState.Normal -proc passToSender[T: Serializable]( - manager: RateLimitManager[T], - msgs: sink seq[MsgIdMsg[T]], +proc passToSender[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], + msgs: seq[tuple[msgId: string, msg: T]], now: Moment, priority: Priority, ): Future[SendResult] {.async.} = let count = msgs.len - let capacity = manager.bucket.tryConsume(count, now) - if not capacity: + let consumed = manager.bucket.tryConsume(count, now) + if not consumed: case priority of Priority.Critical: manager.queueCritical.addLast(msgs) @@ -81,12 +99,17 @@ proc passToSender[T: Serializable]( return SendResult.Enqueued of Priority.Optional: return SendResult.Dropped + + let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now) + discard await manager.store.saveBucketState( + BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull) + ) await manager.sender(msgs) return SendResult.PassedToSender -proc processCriticalQueue[T: Serializable]( - manager: RateLimitManager[T], now: Moment -) {.async.} = +proc processCriticalQueue[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], now: Moment +): Future[void] {.async.} = while manager.queueCritical.len > 0: let msgs = manager.queueCritical.popFirst() let capacityState = manager.getCapacityState(now, msgs.len) @@ -99,9 +122,9 @@ proc processCriticalQueue[T: Serializable]( manager.queueCritical.addFirst(msgs) break -proc processNormalQueue[T: Serializable]( - manager: RateLimitManager[T], now: Moment -) {.async.} = +proc processNormalQueue[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], now: Moment +): Future[void] {.async.} = while manager.queueNormal.len > 0: let msgs = manager.queueNormal.popFirst() let capacityState = manager.getCapacityState(now, msgs.len) @@ -112,13 +135,13 @@ proc processNormalQueue[T: Serializable]( manager.queueNormal.addFirst(msgs) break -proc sendOrEnqueue*[T: Serializable]( - manager: RateLimitManager[T], - msgs: seq[MsgIdMsg[T]], +proc sendOrEnqueue*[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], + msgs: seq[tuple[msgId: string, msg: T]], priority: Priority, now: Moment = Moment.now(), ): Future[SendResult] {.async.} = - let (_, budgetCap) = manager.bucket.getAvailableCapacity(now) + let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now) if msgs.len.float / budgetCap.float >= 0.3: # drop batch if it's too large to avoid starvation return SendResult.DroppedBatchTooLarge @@ -147,11 +170,13 @@ proc sendOrEnqueue*[T: Serializable]( of Priority.Optional: return SendResult.Dropped -proc getEnqueued*[T: Serializable]( - manager: RateLimitManager[T] -): tuple[critical: seq[MsgIdMsg[T]], normal: seq[MsgIdMsg[T]]] = - var criticalMsgs: seq[MsgIdMsg[T]] - var normalMsgs: seq[MsgIdMsg[T]] +proc getEnqueued*[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S] +): tuple[ + critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]] +] = + var criticalMsgs: seq[tuple[msgId: string, msg: T]] + var normalMsgs: seq[tuple[msgId: string, msg: T]] for batch in manager.queueCritical: criticalMsgs.add(batch) @@ -161,8 +186,8 @@ proc getEnqueued*[T: Serializable]( return (criticalMsgs, normalMsgs) -proc queueHandleLoop[T: Serializable]( - manager: RateLimitManager[T], +proc queueHandleLoop*[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), ) {.async.} = @@ -177,18 +202,22 @@ proc queueHandleLoop[T: Serializable]( # configurable sleep duration for processing queued messages await sleepAsync(manager.sleepDuration) -proc start*[T: Serializable]( - manager: RateLimitManager[T], +proc start*[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), ) {.async.} = - manager.pxQueueHandleLoop = manager.queueHandleLoop(nowProvider) + manager.pxQueueHandleLoop = queueHandleLoop(manager, nowProvider) -proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} = +proc stop*[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S] +) {.async.} = if not isNil(manager.pxQueueHandleLoop): await manager.pxQueueHandleLoop.cancelAndWait() -func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} = +func `$`*[T: Serializable, S: RateLimitStore]( + b: RateLimitManager[T, S] +): string {.inline.} = if isNil(b): return "nil" return diff --git a/ratelimit/store/memory.nim b/ratelimit/store/memory.nim index 557a17d..6391314 100644 --- a/ratelimit/store/memory.nim +++ b/ratelimit/store/memory.nim @@ -4,18 +4,18 @@ import chronos # Memory Implementation type MemoryRateLimitStore* = ref object - bucketState: BucketState + bucketState: Option[BucketState] -proc newMemoryRateLimitStore*(): MemoryRateLimitStore = - result = MemoryRateLimitStore() +proc new*(T: type[MemoryRateLimitStore]): T = + return T(bucketState: none(BucketState)) proc saveBucketState*( store: MemoryRateLimitStore, bucketState: BucketState ): Future[bool] {.async.} = - store.bucketState = bucketState + store.bucketState = some(bucketState) return true proc loadBucketState*( store: MemoryRateLimitStore ): Future[Option[BucketState]] {.async.} = - return some(store.bucketState) + return store.bucketState diff --git a/ratelimit/store/store.nim b/ratelimit/store/store.nim index c4f6da3..c916750 100644 --- a/ratelimit/store/store.nim +++ b/ratelimit/store/store.nim @@ -7,7 +7,7 @@ type budgetCap*: int lastTimeFull*: Moment - RateLimitStoreConcept* = + RateLimitStore* = concept s s.saveBucketState(BucketState) is Future[bool] s.loadBucketState() is Future[Option[BucketState]] diff --git a/ratelimit/token_bucket.nim b/ratelimit/token_bucket.nim deleted file mode 100644 index 447569a..0000000 --- a/ratelimit/token_bucket.nim +++ /dev/null @@ -1,198 +0,0 @@ -{.push raises: [].} - -import chronos, std/math, std/options - -const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25 - -## This is an extract from chronos/rate_limit.nim due to the found bug in the original implementation. -## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class. -## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation. -## ref: https://github.com/status-im/nim-chronos/issues/500 -## -## This version of TokenBucket is different from the original one in chronos/rate_limit.nim in many ways: -## - It has a new mode called `Compensating` which is the default mode. -## Compensation is calculated as the not used bucket capacity in the last measured period(s) in average. -## or up until maximum the allowed compansation treshold (Currently it is const 25%). -## Also compensation takes care of the proper time period calculation to avoid non-usage periods that can lead to -## overcompensation. -## - Strict mode is also available which will only replenish when time period is over but also will fill -## the bucket to the max capacity. - -type - ReplenishMode* = enum - Strict - Compensating - - TokenBucket* = ref object - budget: int ## Current number of tokens in the bucket - budgetCap: int ## Bucket capacity - lastTimeFull: Moment - ## This timer measures the proper periodizaiton of the bucket refilling - fillDuration: Duration ## Refill period - case replenishMode*: ReplenishMode - of Strict: - ## In strict mode, the bucket is refilled only till the budgetCap - discard - of Compensating: - ## This is the default mode. - maxCompensation: float - -func periodDistance(bucket: TokenBucket, currentTime: Moment): float = - ## notice fillDuration cannot be zero by design - ## period distance is a float number representing the calculated period time - ## since the last time bucket was refilled. - return - nanoseconds(currentTime - bucket.lastTimeFull).float / - nanoseconds(bucket.fillDuration).float - -func getUsageAverageSince(bucket: TokenBucket, distance: float): float = - if distance == 0.float: - ## in case there is zero time difference than the usage percentage is 100% - return 1.0 - - ## budgetCap can never be zero - ## usage average is calculated as a percentage of total capacity available over - ## the measured period - return bucket.budget.float / bucket.budgetCap.float / distance - -proc calcCompensation(bucket: TokenBucket, averageUsage: float): int = - # if we already fully used or even overused the tokens, there is no place for compensation - if averageUsage >= 1.0: - return 0 - - ## compensation is the not used bucket capacity in the last measured period(s) in average. - ## or maximum the allowed compansation treshold - let compensationPercent = - min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation) - return trunc(compensationPercent).int - -func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool = - return currentTime - bucket.lastTimeFull >= bucket.fillDuration - -## Update will take place if bucket is empty and trying to consume tokens. -## It checks if the bucket can be replenished as refill duration is passed or not. -## - strict mode: -proc updateStrict(bucket: TokenBucket, currentTime: Moment) = - if bucket.fillDuration == default(Duration): - bucket.budget = min(bucket.budgetCap, bucket.budget) - return - - if not periodElapsed(bucket, currentTime): - return - - bucket.budget = bucket.budgetCap - bucket.lastTimeFull = currentTime - -## - compensating - ballancing load: -## - between updates we calculate average load (current bucket capacity / number of periods till last update) -## - gives the percentage load used recently -## - with this we can replenish bucket up to 100% + calculated leftover from previous period (caped with max treshold) -proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) = - if bucket.fillDuration == default(Duration): - bucket.budget = min(bucket.budgetCap, bucket.budget) - return - - # do not replenish within the same period - if not periodElapsed(bucket, currentTime): - return - - let distance = bucket.periodDistance(currentTime) - let recentAvgUsage = bucket.getUsageAverageSince(distance) - let compensation = bucket.calcCompensation(recentAvgUsage) - - bucket.budget = bucket.budgetCap + compensation - bucket.lastTimeFull = currentTime - -proc update(bucket: TokenBucket, currentTime: Moment) = - if bucket.replenishMode == ReplenishMode.Compensating: - updateWithCompensation(bucket, currentTime) - else: - updateStrict(bucket, currentTime) - -## Returns the available capacity of the bucket: (budget, budgetCap) -proc getAvailableCapacity*( - bucket: TokenBucket, currentTime: Moment -): tuple[budget: int, budgetCap: int] = - if periodElapsed(bucket, currentTime): - case bucket.replenishMode - of ReplenishMode.Strict: - return (bucket.budgetCap, bucket.budgetCap) - of ReplenishMode.Compensating: - let distance = bucket.periodDistance(currentTime) - let recentAvgUsage = bucket.getUsageAverageSince(distance) - let compensation = bucket.calcCompensation(recentAvgUsage) - let availableBudget = bucket.budgetCap + compensation - return (availableBudget, bucket.budgetCap) - return (bucket.budget, bucket.budgetCap) - -proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool = - ## If `tokens` are available, consume them, - ## Otherwhise, return false. - - if bucket.budget >= bucket.budgetCap: - bucket.lastTimeFull = now - - if bucket.budget >= tokens: - bucket.budget -= tokens - return true - - bucket.update(now) - - if bucket.budget >= tokens: - bucket.budget -= tokens - return true - else: - return false - -proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) = - ## Add `tokens` to the budget (capped to the bucket capacity) - bucket.budget += tokens - bucket.update(now) - -proc new*( - T: type[TokenBucket], - budgetCap: int, - fillDuration: Duration = 1.seconds, - mode: ReplenishMode = ReplenishMode.Compensating, -): T = - assert not isZero(fillDuration) - assert budgetCap != 0 - - ## Create different mode TokenBucket - case mode - of ReplenishMode.Strict: - return T( - budget: budgetCap, - budgetCap: budgetCap, - fillDuration: fillDuration, - lastTimeFull: Moment.now(), - replenishMode: mode, - ) - of ReplenishMode.Compensating: - T( - budget: budgetCap, - budgetCap: budgetCap, - fillDuration: fillDuration, - lastTimeFull: Moment.now(), - replenishMode: mode, - maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT, - ) - -proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket = - T.new(capacity, period, ReplenishMode.Strict) - -proc newCompensating*( - T: type[TokenBucket], capacity: int, period: Duration -): TokenBucket = - T.new(capacity, period, ReplenishMode.Compensating) - -func `$`*(b: TokenBucket): string {.inline.} = - if isNil(b): - return "nil" - return $b.budgetCap & "/" & $b.fillDuration - -func `$`*(ob: Option[TokenBucket]): string {.inline.} = - if ob.isNone(): - return "no-limit" - - return $ob.get() diff --git a/tests/test_rate_limit_manager.nim b/tests/test_rate_limit_manager.nim index 6c34c54..b15606e 100644 --- a/tests/test_rate_limit_manager.nim +++ b/tests/test_rate_limit_manager.nim @@ -1,5 +1,6 @@ import testutils/unittests import ../ratelimit/rate_limit_manager +import ../ratelimit/store/memory import chronos # Implement the Serializable concept for string @@ -22,8 +23,9 @@ suite "Queue RateLimitManager": asyncTest "sendOrEnqueue - immediate send when capacity available": ## Given - let manager = RateLimitManager[string].new( - mockSender, capacity = 10, duration = chronos.milliseconds(100) + let store: MemoryRateLimitStore = MemoryRateLimitStore.new() + let manager = await RateLimitManager[string, MemoryRateLimitStore].new( + store, mockSender, capacity = 10, duration = chronos.milliseconds(100) ) let testMsg = "Hello World" @@ -40,8 +42,9 @@ suite "Queue RateLimitManager": asyncTest "sendOrEnqueue - multiple messages": ## Given - let manager = RateLimitManager[string].new( - mockSender, capacity = 10, duration = chronos.milliseconds(100) + let store = MemoryRateLimitStore.new() + let manager = await RateLimitManager[string, MemoryRateLimitStore].new( + store, mockSender, capacity = 10, duration = chronos.milliseconds(100) ) ## When @@ -60,7 +63,9 @@ suite "Queue RateLimitManager": asyncTest "start and stop - drop large batch": ## Given - let manager = RateLimitManager[string].new( + let store = MemoryRateLimitStore.new() + let manager = await RateLimitManager[string, MemoryRateLimitStore].new( + store, mockSender, capacity = 2, duration = chronos.milliseconds(100), @@ -75,7 +80,9 @@ suite "Queue RateLimitManager": asyncTest "enqueue - enqueue critical only when exceeded": ## Given - let manager = RateLimitManager[string].new( + let store = MemoryRateLimitStore.new() + let manager = await RateLimitManager[string, MemoryRateLimitStore].new( + store, mockSender, capacity = 10, duration = chronos.milliseconds(100), @@ -123,7 +130,9 @@ suite "Queue RateLimitManager": asyncTest "enqueue - enqueue normal on 70% capacity": ## Given - let manager = RateLimitManager[string].new( + let store = MemoryRateLimitStore.new() + let manager = await RateLimitManager[string, MemoryRateLimitStore].new( + store, mockSender, capacity = 10, duration = chronos.milliseconds(100), @@ -174,7 +183,9 @@ suite "Queue RateLimitManager": asyncTest "enqueue - process queued messages": ## Given - let manager = RateLimitManager[string].new( + let store = MemoryRateLimitStore.new() + let manager = await RateLimitManager[string, MemoryRateLimitStore].new( + store, mockSender, capacity = 10, duration = chronos.milliseconds(200),