Token bucket (#279)
- A single sleepAsync per bucket - Manual replenish with async - Cancellation of consume Co-authored-by: Jacek Sieka <jacek@status.im>
This commit is contained in:
parent
266e2c0ed2
commit
24146463a3
|
@ -0,0 +1,138 @@
|
|||
# Chronos Rate Limiter
|
||||
# (c) Copyright 2022-Present
|
||||
# Status Research & Development GmbH
|
||||
#
|
||||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
{.push raises: [Defect].}
|
||||
|
||||
import ../chronos
|
||||
import timer
|
||||
|
||||
export timer
|
||||
|
||||
type
|
||||
BucketWaiter = object
|
||||
future: Future[void]
|
||||
value: int
|
||||
alreadyConsumed: int
|
||||
|
||||
TokenBucket* = ref object
|
||||
budget: int
|
||||
budgetCap: int
|
||||
lastUpdate: Moment
|
||||
fillDuration: Duration
|
||||
workFuture: Future[void]
|
||||
pendingRequests: seq[BucketWaiter]
|
||||
manuallyReplenished: AsyncEvent
|
||||
|
||||
proc update(bucket: TokenBucket) =
|
||||
if bucket.fillDuration == default(Duration):
|
||||
bucket.budget = min(bucket.budgetCap, bucket.budget)
|
||||
return
|
||||
|
||||
let
|
||||
currentTime = Moment.now()
|
||||
timeDelta = currentTime - bucket.lastUpdate
|
||||
fillPercent = timeDelta.milliseconds.float / bucket.fillDuration.milliseconds.float
|
||||
replenished =
|
||||
int(bucket.budgetCap.float * fillPercent)
|
||||
deltaFromReplenished =
|
||||
int(bucket.fillDuration.milliseconds.float *
|
||||
replenished.float / bucket.budgetCap.float)
|
||||
|
||||
bucket.lastUpdate += milliseconds(deltaFromReplenished)
|
||||
bucket.budget = min(bucket.budgetCap, bucket.budget + replenished)
|
||||
|
||||
proc tryConsume*(bucket: TokenBucket, tokens: int): bool =
|
||||
## If `tokens` are available, consume them,
|
||||
## Otherwhise, return false.
|
||||
|
||||
if bucket.budget >= tokens:
|
||||
bucket.budget -= tokens
|
||||
return true
|
||||
|
||||
bucket.update()
|
||||
|
||||
if bucket.budget >= tokens:
|
||||
bucket.budget -= tokens
|
||||
true
|
||||
else:
|
||||
false
|
||||
|
||||
proc worker(bucket: TokenBucket) {.async.} =
|
||||
while bucket.pendingRequests.len > 0:
|
||||
bucket.manuallyReplenished.clear()
|
||||
template waiter: untyped = bucket.pendingRequests[0]
|
||||
|
||||
if bucket.tryConsume(waiter.value):
|
||||
waiter.future.complete()
|
||||
bucket.pendingRequests.delete(0)
|
||||
else:
|
||||
waiter.value -= bucket.budget
|
||||
waiter.alreadyConsumed += bucket.budget
|
||||
bucket.budget = 0
|
||||
|
||||
let eventWaiter = bucket.manuallyReplenished.wait()
|
||||
if bucket.fillDuration.milliseconds > 0:
|
||||
let
|
||||
nextCycleValue = float(min(waiter.value, bucket.budgetCap))
|
||||
budgetRatio = nextCycleValue.float / bucket.budgetCap.float
|
||||
timeToTarget = int(budgetRatio * bucket.fillDuration.milliseconds.float) + 1
|
||||
#TODO this will create a timer for each blocked bucket,
|
||||
#which may cause performance issue when creating many
|
||||
#buckets
|
||||
sleeper = sleepAsync(milliseconds(timeToTarget))
|
||||
await sleeper or eventWaiter
|
||||
sleeper.cancel()
|
||||
eventWaiter.cancel()
|
||||
else:
|
||||
await eventWaiter
|
||||
|
||||
bucket.workFuture = nil
|
||||
|
||||
proc consume*(bucket: TokenBucket, tokens: int): Future[void] =
|
||||
## Wait for `tokens` to be available, and consume them.
|
||||
|
||||
let retFuture = newFuture[void]("TokenBucket.consume")
|
||||
if isNil(bucket.workFuture) or bucket.workFuture.finished():
|
||||
if bucket.tryConsume(tokens):
|
||||
retFuture.complete()
|
||||
return retFuture
|
||||
|
||||
bucket.pendingRequests.add(BucketWaiter(future: retFuture, value: tokens))
|
||||
if isNil(bucket.workFuture) or bucket.workFuture.finished():
|
||||
bucket.workFuture = worker(bucket)
|
||||
|
||||
proc cancellation(udata: pointer) =
|
||||
for index in 0..<bucket.pendingRequests.len:
|
||||
if bucket.pendingRequests[index].future == retFuture:
|
||||
bucket.budget += bucket.pendingRequests[index].alreadyConsumed
|
||||
bucket.pendingRequests.delete(index)
|
||||
if index == 0:
|
||||
bucket.manuallyReplenished.fire()
|
||||
break
|
||||
retFuture.cancelCallback = cancellation
|
||||
return retFuture
|
||||
|
||||
proc replenish*(bucket: TokenBucket, tokens: int) =
|
||||
## Add `tokens` to the budget (capped to the bucket capacity)
|
||||
bucket.budget += tokens
|
||||
bucket.update()
|
||||
bucket.manuallyReplenished.fire()
|
||||
|
||||
proc new*(
|
||||
T: type[TokenBucket],
|
||||
budgetCap: int,
|
||||
fillDuration: Duration = 1.seconds): T =
|
||||
|
||||
## Create a TokenBucket
|
||||
T(
|
||||
budget: budgetCap,
|
||||
budgetCap: budgetCap,
|
||||
fillDuration: fillDuration,
|
||||
lastUpdate: Moment.now(),
|
||||
manuallyReplenished: newAsyncEvent()
|
||||
)
|
|
@ -7,5 +7,8 @@
|
|||
# MIT license (LICENSE-MIT)
|
||||
import testmacro, testsync, testsoon, testtime, testfut, testsignal,
|
||||
testaddress, testdatagram, teststream, testserver, testbugs, testnet,
|
||||
testasyncstream, testhttpserver, testshttpserver, testhttpclient
|
||||
testasyncstream, testhttpserver, testshttpserver, testhttpclient,
|
||||
testratelimit
|
||||
|
||||
# Must be imported last to check for Pending futures
|
||||
import testutils
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
# Chronos Test Suite
|
||||
# (c) Copyright 2022-Present
|
||||
# Status Research & Development GmbH
|
||||
#
|
||||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
import unittest
|
||||
import ../chronos
|
||||
import ../chronos/ratelimit
|
||||
|
||||
suite "Token Bucket":
|
||||
test "Sync test":
|
||||
var bucket = TokenBucket.new(1000, 1.milliseconds)
|
||||
check:
|
||||
bucket.tryConsume(800) == true
|
||||
bucket.tryConsume(200) == true
|
||||
|
||||
# Out of budget
|
||||
bucket.tryConsume(100) == false
|
||||
waitFor(sleepAsync(10.milliseconds))
|
||||
check:
|
||||
bucket.tryConsume(800) == true
|
||||
bucket.tryConsume(200) == true
|
||||
|
||||
# Out of budget
|
||||
bucket.tryConsume(100) == false
|
||||
|
||||
test "Async test":
|
||||
var bucket = TokenBucket.new(1000, 1.seconds)
|
||||
check: bucket.tryConsume(1000) == true
|
||||
|
||||
var toWait = newSeq[Future[void]]()
|
||||
for _ in 0..<150:
|
||||
toWait.add(bucket.consume(10))
|
||||
|
||||
let start = Moment.now()
|
||||
waitFor(allFutures(toWait))
|
||||
let duration = Moment.now() - start
|
||||
|
||||
check: duration in 1400.milliseconds .. 1600.milliseconds
|
||||
|
||||
test "Over budget async":
|
||||
var bucket = TokenBucket.new(100, 10.milliseconds)
|
||||
# Consume 10* the budget cap
|
||||
let beforeStart = Moment.now()
|
||||
waitFor(bucket.consume(1000).wait(1.seconds))
|
||||
check Moment.now() - beforeStart in 80.milliseconds .. 120.milliseconds
|
||||
|
||||
test "Sync manual replenish":
|
||||
var bucket = TokenBucket.new(1000, 0.seconds)
|
||||
check:
|
||||
bucket.tryConsume(1000) == true
|
||||
bucket.tryConsume(1000) == false
|
||||
bucket.replenish(2000)
|
||||
check:
|
||||
bucket.tryConsume(1000) == true
|
||||
# replenish is capped to the bucket max
|
||||
bucket.tryConsume(1000) == false
|
||||
|
||||
test "Async manual replenish":
|
||||
var bucket = TokenBucket.new(10 * 150, 0.seconds)
|
||||
check:
|
||||
bucket.tryConsume(10 * 150) == true
|
||||
bucket.tryConsume(1000) == false
|
||||
var toWait = newSeq[Future[void]]()
|
||||
for _ in 0..<150:
|
||||
toWait.add(bucket.consume(10))
|
||||
|
||||
let lastOne = bucket.consume(10)
|
||||
|
||||
# Test cap as well
|
||||
bucket.replenish(1000000)
|
||||
waitFor(allFutures(toWait).wait(10.milliseconds))
|
||||
|
||||
check: not lastOne.finished()
|
||||
|
||||
bucket.replenish(10)
|
||||
waitFor(lastOne.wait(10.milliseconds))
|
||||
|
||||
test "Async cancellation":
|
||||
var bucket = TokenBucket.new(100, 0.seconds)
|
||||
let
|
||||
fut1 = bucket.consume(20)
|
||||
futBlocker = bucket.consume(1000)
|
||||
fut2 = bucket.consume(50)
|
||||
|
||||
waitFor(fut1.wait(10.milliseconds))
|
||||
waitFor(sleepAsync(10.milliseconds))
|
||||
check:
|
||||
futBlocker.finished == false
|
||||
fut2.finished == false
|
||||
|
||||
futBlocker.cancel()
|
||||
waitFor(fut2.wait(10.milliseconds))
|
||||
|
||||
test "Very long replenish":
|
||||
var bucket = TokenBucket.new(7000, 1.hours)
|
||||
check bucket.tryConsume(7000)
|
||||
check bucket.tryConsume(1) == false
|
||||
|
||||
# With this setting, it takes 514 milliseconds
|
||||
# to tick one. Check that we can eventually
|
||||
# consume, even if we update multiple time
|
||||
# before that
|
||||
waitFor(sleepAsync(200.milliseconds))
|
||||
check bucket.tryConsume(1) == false
|
||||
waitFor(sleepAsync(200.milliseconds))
|
||||
check bucket.tryConsume(1) == false
|
||||
waitFor(sleepAsync(200.milliseconds))
|
||||
check bucket.tryConsume(1) == true
|
||||
check bucket.tryConsume(1) == false
|
||||
|
||||
test "Short replenish":
|
||||
var bucket = TokenBucket.new(15000, 1.milliseconds)
|
||||
check bucket.tryConsume(15000)
|
||||
check bucket.tryConsume(1) == false
|
||||
|
||||
waitFor(sleepAsync(1.milliseconds))
|
||||
check bucket.tryConsume(15000) == true
|
Loading…
Reference in New Issue