diff --git a/.gitignore b/.gitignore index 4ee6ee2..1843ca9 100644 --- a/.gitignore +++ b/.gitignore @@ -25,5 +25,7 @@ apps/* !*.nim tests/* !*.nim +ratelimit/* +!*.nim nimble.develop nimble.paths diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim index 9bd1bf6..ec10f2d 100644 --- a/ratelimit/ratelimit.nim +++ b/ratelimit/ratelimit.nim @@ -1,6 +1,7 @@ import std/[times, deques, options] -# import ./token_bucket -import waku/common/rate_limit/token_bucket +import ./token_bucket +# import waku/common/rate_limit/token_bucket +import ./store/store import chronos type @@ -27,7 +28,8 @@ type MessageSender*[T: Serializable] = proc(msgs: seq[tuple[msgId: string, msg: T]]): Future[void] {.async.} - RateLimitManager*[T: Serializable] = ref object + RateLimitManager*[T: Serializable, S: RateLimitStore] = ref object + store: S bucket: TokenBucket sender: MessageSender[T] running: bool @@ -35,14 +37,30 @@ type queueNormal: Deque[seq[tuple[msgId: string, msg: T]]] sleepDuration: chronos.Duration -proc newRateLimitManager*[T: Serializable]( +proc newRateLimitManager*[T: Serializable, S: RateLimitStore]( + store: S, sender: MessageSender[T], capacity: int = 100, duration: chronos.Duration = chronos.minutes(10), sleepDuration: chronos.Duration = chronos.milliseconds(1000), -): RateLimitManager[T] = - RateLimitManager[T]( - bucket: TokenBucket.newStrict(capacity, duration), +): Future[RateLimitManager[T, S]] {.async.} = + var current = await store.loadBucketState() + if current.isNone(): + # initialize bucket state with full capacity + current = some( + BucketState(budget: capacity, budgetCap: capacity, lastTimeFull: Moment.now()) + ) + discard await store.saveBucketState(current.get()) + + return RateLimitManager[T, S]( + store: store, + bucket: TokenBucket.new( + current.get().budgetCap, + duration, + ReplenishMode.Strict, + current.get().budget, + current.get().lastTimeFull, + ), sender: sender, running: false, queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](), @@ -50,10 +68,10 @@ proc newRateLimitManager*[T: Serializable]( sleepDuration: sleepDuration, ) -proc getCapacityState[T: Serializable]( - manager: RateLimitManager[T], now: Moment, count: int = 1 +proc getCapacityState[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], now: Moment, count: int = 1 ): CapacityState = - let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now) + let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now) let countAfter = budget - count let ratio = countAfter.float / budgetCap.float if ratio < 0.0: @@ -63,15 +81,15 @@ proc getCapacityState[T: Serializable]( else: return CapacityState.Normal -proc passToSender[T: Serializable]( - manager: RateLimitManager[T], +proc passToSender[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], msgs: seq[tuple[msgId: string, msg: T]], now: Moment, priority: Priority, ): Future[SendResult] {.async.} = let count = msgs.len - let capacity = manager.bucket.tryConsume(count, now) - if not capacity: + let consumed = manager.bucket.tryConsume(count, now) + if not consumed: case priority of Priority.Critical: manager.queueCritical.addLast(msgs) @@ -81,11 +99,16 @@ proc passToSender[T: Serializable]( return SendResult.Enqueued of Priority.Optional: return SendResult.Dropped + + let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now) + discard await manager.store.saveBucketState( + BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull) + ) await manager.sender(msgs) return SendResult.PassedToSender -proc processCriticalQueue[T: Serializable]( - manager: RateLimitManager[T], now: Moment +proc processCriticalQueue[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], now: Moment ): Future[void] {.async.} = while manager.queueCritical.len > 0: let msgs = manager.queueCritical.popFirst() @@ -99,8 +122,8 @@ proc processCriticalQueue[T: Serializable]( manager.queueCritical.addFirst(msgs) break -proc processNormalQueue[T: Serializable]( - manager: RateLimitManager[T], now: Moment +proc processNormalQueue[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], now: Moment ): Future[void] {.async.} = while manager.queueNormal.len > 0: let msgs = manager.queueNormal.popFirst() @@ -112,13 +135,13 @@ proc processNormalQueue[T: Serializable]( manager.queueNormal.addFirst(msgs) break -proc sendOrEnqueue*[T: Serializable]( - manager: RateLimitManager[T], +proc sendOrEnqueue*[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], msgs: seq[tuple[msgId: string, msg: T]], priority: Priority, now: Moment = Moment.now(), ): Future[SendResult] {.async.} = - let (_, budgetCap) = manager.bucket.getAvailableCapacity(now) + let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now) if msgs.len.float / budgetCap.float >= 0.3: # drop batch if it's too large to avoid starvation return SendResult.DroppedBatchTooLarge @@ -147,8 +170,8 @@ proc sendOrEnqueue*[T: Serializable]( of Priority.Optional: return SendResult.Dropped -proc getEnqueued*[T: Serializable]( - manager: RateLimitManager[T] +proc getEnqueued*[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S] ): tuple[ critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]] ] = @@ -163,8 +186,8 @@ proc getEnqueued*[T: Serializable]( return (criticalMsgs, normalMsgs) -proc start*[T: Serializable]( - manager: RateLimitManager[T], +proc start*[T: Serializable, S: RateLimitStore]( + manager: RateLimitManager[T, S], nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), ): Future[void] {.async.} = @@ -181,10 +204,12 @@ proc start*[T: Serializable]( echo "Error in queue processing: ", e.msg await sleepAsync(manager.sleepDuration) -proc stop*[T: Serializable](manager: RateLimitManager[T]) = +proc stop*[T: Serializable, S: RateLimitStore](manager: RateLimitManager[T, S]) = manager.running = false -func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} = +func `$`*[T: Serializable, S: RateLimitStore]( + b: RateLimitManager[T, S] +): string {.inline.} = if isNil(b): return "nil" return diff --git a/ratelimit/store/memory.nim b/ratelimit/store/memory.nim index 557a17d..6391314 100644 --- a/ratelimit/store/memory.nim +++ b/ratelimit/store/memory.nim @@ -4,18 +4,18 @@ import chronos # Memory Implementation type MemoryRateLimitStore* = ref object - bucketState: BucketState + bucketState: Option[BucketState] -proc newMemoryRateLimitStore*(): MemoryRateLimitStore = - result = MemoryRateLimitStore() +proc new*(T: type[MemoryRateLimitStore]): T = + return T(bucketState: none(BucketState)) proc saveBucketState*( store: MemoryRateLimitStore, bucketState: BucketState ): Future[bool] {.async.} = - store.bucketState = bucketState + store.bucketState = some(bucketState) return true proc loadBucketState*( store: MemoryRateLimitStore ): Future[Option[BucketState]] {.async.} = - return some(store.bucketState) + return store.bucketState diff --git a/ratelimit/store/store.nim b/ratelimit/store/store.nim index c4f6da3..c916750 100644 --- a/ratelimit/store/store.nim +++ b/ratelimit/store/store.nim @@ -7,7 +7,7 @@ type budgetCap*: int lastTimeFull*: Moment - RateLimitStoreConcept* = + RateLimitStore* = concept s s.saveBucketState(BucketState) is Future[bool] s.loadBucketState() is Future[Option[BucketState]] diff --git a/ratelimit/token_bucket.nim b/ratelimit/token_bucket.nim index 447569a..e4a4487 100644 --- a/ratelimit/token_bucket.nim +++ b/ratelimit/token_bucket.nim @@ -112,18 +112,18 @@ proc update(bucket: TokenBucket, currentTime: Moment) = ## Returns the available capacity of the bucket: (budget, budgetCap) proc getAvailableCapacity*( bucket: TokenBucket, currentTime: Moment -): tuple[budget: int, budgetCap: int] = +): tuple[budget: int, budgetCap: int, lastTimeFull: Moment] = if periodElapsed(bucket, currentTime): case bucket.replenishMode of ReplenishMode.Strict: - return (bucket.budgetCap, bucket.budgetCap) + return (bucket.budgetCap, bucket.budgetCap, bucket.lastTimeFull) of ReplenishMode.Compensating: let distance = bucket.periodDistance(currentTime) let recentAvgUsage = bucket.getUsageAverageSince(distance) let compensation = bucket.calcCompensation(recentAvgUsage) let availableBudget = bucket.budgetCap + compensation - return (availableBudget, bucket.budgetCap) - return (bucket.budget, bucket.budgetCap) + return (availableBudget, bucket.budgetCap, bucket.lastTimeFull) + return (bucket.budget, bucket.budgetCap, bucket.lastTimeFull) proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool = ## If `tokens` are available, consume them, @@ -154,26 +154,30 @@ proc new*( budgetCap: int, fillDuration: Duration = 1.seconds, mode: ReplenishMode = ReplenishMode.Compensating, + budget: int = -1, # -1 means "use budgetCap" + lastTimeFull: Moment = Moment.now(), ): T = assert not isZero(fillDuration) assert budgetCap != 0 + let actualBudget = if budget == -1: budgetCap else: budget + assert actualBudget >= 0 and actualBudget <= budgetCap ## Create different mode TokenBucket case mode of ReplenishMode.Strict: return T( - budget: budgetCap, + budget: actualBudget, budgetCap: budgetCap, fillDuration: fillDuration, - lastTimeFull: Moment.now(), + lastTimeFull: lastTimeFull, replenishMode: mode, ) of ReplenishMode.Compensating: T( - budget: budgetCap, + budget: actualBudget, budgetCap: budgetCap, fillDuration: fillDuration, - lastTimeFull: Moment.now(), + lastTimeFull: lastTimeFull, replenishMode: mode, maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT, ) diff --git a/tests/test_ratelimit.nim b/tests/test_ratelimit.nim index f38ddbf..a10dd9e 100644 --- a/tests/test_ratelimit.nim +++ b/tests/test_ratelimit.nim @@ -2,6 +2,7 @@ import testutils/unittests import ../ratelimit/ratelimit +import ../ratelimit/store/memory import chronos import strutils @@ -25,8 +26,9 @@ suite "Queue RateLimitManager": asyncTest "sendOrEnqueue - immediate send when capacity available": ## Given - let manager = newRateLimitManager[string]( - mockSender, capacity = 10, duration = chronos.milliseconds(100) + let store: MemoryRateLimitStore = MemoryRateLimitStore.new() + let manager = await newRateLimitManager[string, MemoryRateLimitStore]( + store, mockSender, capacity = 10, duration = chronos.milliseconds(100) ) let testMsg = "Hello World" @@ -43,8 +45,9 @@ suite "Queue RateLimitManager": asyncTest "sendOrEnqueue - multiple messages": ## Given - let manager = newRateLimitManager[string]( - mockSender, capacity = 10, duration = chronos.milliseconds(100) + let store = MemoryRateLimitStore.new() + let manager = await newRateLimitManager[string, MemoryRateLimitStore]( + store, mockSender, capacity = 10, duration = chronos.milliseconds(100) ) ## When @@ -63,7 +66,9 @@ suite "Queue RateLimitManager": asyncTest "start and stop - basic functionality": ## Given - let manager = newRateLimitManager[string]( + let store = MemoryRateLimitStore.new() + let manager = await newRateLimitManager[string, MemoryRateLimitStore]( + store, mockSender, capacity = 10, duration = chronos.milliseconds(100), @@ -94,7 +99,9 @@ suite "Queue RateLimitManager": asyncTest "start and stop - drop large batch": ## Given - let manager = newRateLimitManager[string]( + let store = MemoryRateLimitStore.new() + let manager = await newRateLimitManager[string, MemoryRateLimitStore]( + store, mockSender, capacity = 2, duration = chronos.milliseconds(100), @@ -109,7 +116,9 @@ suite "Queue RateLimitManager": asyncTest "enqueue - enqueue critical only when exceeded": ## Given - let manager = newRateLimitManager[string]( + let store = MemoryRateLimitStore.new() + let manager = await newRateLimitManager[string, MemoryRateLimitStore]( + store, mockSender, capacity = 10, duration = chronos.milliseconds(100), @@ -163,7 +172,9 @@ suite "Queue RateLimitManager": asyncTest "enqueue - enqueue normal on 70% capacity": ## Given - let manager = newRateLimitManager[string]( + let store = MemoryRateLimitStore.new() + let manager = await newRateLimitManager[string, MemoryRateLimitStore]( + store, mockSender, capacity = 10, duration = chronos.milliseconds(100), @@ -220,7 +231,9 @@ suite "Queue RateLimitManager": asyncTest "enqueue - process queued messages": ## Given - let manager = newRateLimitManager[string]( + let store = MemoryRateLimitStore.new() + let manager = await newRateLimitManager[string, MemoryRateLimitStore]( + store, mockSender, capacity = 10, duration = chronos.milliseconds(200),