mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-07 16:33:08 +00:00
Adapt request_limiter to new chronos' TokenBucket replenish algorithm to keep original intent of use
This commit is contained in:
parent
8eae714072
commit
f2deb490c5
@ -39,38 +39,82 @@ const SECONDS_RATIO = 3
|
|||||||
const MINUTES_RATIO = 2
|
const MINUTES_RATIO = 2
|
||||||
|
|
||||||
type RequestRateLimiter* = ref object of RootObj
|
type RequestRateLimiter* = ref object of RootObj
|
||||||
tokenBucket: Option[TokenBucket]
|
tokenBucket: TokenBucket
|
||||||
setting*: Option[RateLimitSetting]
|
setting*: Option[RateLimitSetting]
|
||||||
|
mainBucketSetting: RateLimitSetting
|
||||||
|
ratio: int
|
||||||
peerBucketSetting*: RateLimitSetting
|
peerBucketSetting*: RateLimitSetting
|
||||||
peerUsage: TimedMap[PeerId, TokenBucket]
|
peerUsage: TimedMap[PeerId, TokenBucket]
|
||||||
|
checkUsageImpl: proc(
|
||||||
|
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
|
||||||
|
): bool {.gcsafe, raises: [].}
|
||||||
|
|
||||||
|
proc newMainTokenBucket(
|
||||||
|
setting: RateLimitSetting, ratio: int, startTime: Moment
|
||||||
|
): TokenBucket =
|
||||||
|
## RequestRateLimiter's global bucket should keep the *rate* of the configured
|
||||||
|
## setting while allowing a larger burst window. We achieve this by scaling
|
||||||
|
## both capacity and fillDuration by the same ratio.
|
||||||
|
##
|
||||||
|
## This matches previous behavior where unused tokens could effectively
|
||||||
|
## accumulate across multiple periods.
|
||||||
|
let burstCapacity = setting.volume * ratio
|
||||||
|
var bucket = TokenBucket.new(
|
||||||
|
capacity = burstCapacity,
|
||||||
|
fillDuration = setting.period * ratio,
|
||||||
|
startTime = startTime,
|
||||||
|
mode = Continuous,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Start with the configured volume (not the burst capacity) so that the
|
||||||
|
# initial burst behavior matches the raw setting, while still allowing
|
||||||
|
# accumulation up to `burstCapacity` over time.
|
||||||
|
let excess = burstCapacity - setting.volume
|
||||||
|
if excess > 0:
|
||||||
|
discard bucket.tryConsume(excess, startTime)
|
||||||
|
|
||||||
|
return bucket
|
||||||
|
|
||||||
proc mgetOrPut(
|
proc mgetOrPut(
|
||||||
requestRateLimiter: var RequestRateLimiter, peerId: PeerId
|
requestRateLimiter: var RequestRateLimiter, peerId: PeerId, now: Moment
|
||||||
): var TokenBucket =
|
): var TokenBucket =
|
||||||
let bucketForNew = newTokenBucket(some(requestRateLimiter.peerBucketSetting)).valueOr:
|
let bucketForNew = newTokenBucket(
|
||||||
|
some(requestRateLimiter.peerBucketSetting), Discrete, now
|
||||||
|
).valueOr:
|
||||||
raiseAssert "This branch is not allowed to be reached as it will not be called if the setting is None."
|
raiseAssert "This branch is not allowed to be reached as it will not be called if the setting is None."
|
||||||
|
|
||||||
return requestRateLimiter.peerUsage.mgetOrPut(peerId, bucketForNew)
|
return requestRateLimiter.peerUsage.mgetOrPut(peerId, bucketForNew)
|
||||||
|
|
||||||
proc checkUsage*(
|
proc checkUsageUnlimited(
|
||||||
t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now()
|
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
|
||||||
): bool {.raises: [].} =
|
): bool {.gcsafe, raises: [].} =
|
||||||
if t.tokenBucket.isNone():
|
true
|
||||||
return true
|
|
||||||
|
|
||||||
let peerBucket = t.mgetOrPut(conn.peerId)
|
proc checkUsageLimited(
|
||||||
|
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
|
||||||
|
): bool {.gcsafe, raises: [].} =
|
||||||
|
# Lazy-init the main bucket using the first observed request time. This makes
|
||||||
|
# refill behavior deterministic under tests where `now` is controlled.
|
||||||
|
if isNil(t.tokenBucket):
|
||||||
|
t.tokenBucket = newMainTokenBucket(t.mainBucketSetting, t.ratio, now)
|
||||||
|
|
||||||
|
let peerBucket = t.mgetOrPut(conn.peerId, now)
|
||||||
## check requesting peer's usage is not over the calculated ratio and let that peer go which not requested much/or this time...
|
## check requesting peer's usage is not over the calculated ratio and let that peer go which not requested much/or this time...
|
||||||
if not peerBucket.tryConsume(1, now):
|
if not peerBucket.tryConsume(1, now):
|
||||||
trace "peer usage limit reached", peer = conn.peerId
|
trace "peer usage limit reached", peer = conn.peerId
|
||||||
return false
|
return false
|
||||||
|
|
||||||
# Ok if the peer can consume, check the overall budget we have left
|
# Ok if the peer can consume, check the overall budget we have left
|
||||||
let tokenBucket = t.tokenBucket.get()
|
if not t.tokenBucket.tryConsume(1, now):
|
||||||
if not tokenBucket.tryConsume(1, now):
|
|
||||||
return false
|
return false
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
|
||||||
|
proc checkUsage*(
|
||||||
|
t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now()
|
||||||
|
): bool {.raises: [].} =
|
||||||
|
t.checkUsageImpl(t, proto, conn, now)
|
||||||
|
|
||||||
template checkUsageLimit*(
|
template checkUsageLimit*(
|
||||||
t: var RequestRateLimiter,
|
t: var RequestRateLimiter,
|
||||||
proto: string,
|
proto: string,
|
||||||
@ -135,9 +179,19 @@ func calcPeerTokenSetting(
|
|||||||
|
|
||||||
proc newRequestRateLimiter*(setting: Option[RateLimitSetting]): RequestRateLimiter =
|
proc newRequestRateLimiter*(setting: Option[RateLimitSetting]): RequestRateLimiter =
|
||||||
let ratio = calcPeriodRatio(setting)
|
let ratio = calcPeriodRatio(setting)
|
||||||
|
let isLimited = setting.isSome() and not setting.get().isUnlimited()
|
||||||
|
let mainBucketSetting =
|
||||||
|
if isLimited:
|
||||||
|
setting.get()
|
||||||
|
else:
|
||||||
|
(0, 0.minutes)
|
||||||
|
|
||||||
return RequestRateLimiter(
|
return RequestRateLimiter(
|
||||||
tokenBucket: newTokenBucket(setting),
|
tokenBucket: nil,
|
||||||
setting: setting,
|
setting: setting,
|
||||||
|
mainBucketSetting: mainBucketSetting,
|
||||||
|
ratio: ratio,
|
||||||
peerBucketSetting: calcPeerTokenSetting(setting, ratio),
|
peerBucketSetting: calcPeerTokenSetting(setting, ratio),
|
||||||
peerUsage: init(TimedMap[PeerId, TokenBucket], calcCacheTimeout(setting, ratio)),
|
peerUsage: init(TimedMap[PeerId, TokenBucket], calcCacheTimeout(setting, ratio)),
|
||||||
|
checkUsageImpl: (if isLimited: checkUsageLimited else: checkUsageUnlimited),
|
||||||
)
|
)
|
||||||
|
|||||||
@ -13,7 +13,8 @@ export token_bucket, setting, service_metrics
|
|||||||
|
|
||||||
proc newTokenBucket*(
|
proc newTokenBucket*(
|
||||||
setting: Option[RateLimitSetting],
|
setting: Option[RateLimitSetting],
|
||||||
replenishMode: ReplenishMode = ReplenishMode.Continuous,
|
replenishMode: static[ReplenishMode] = ReplenishMode.Continuous,
|
||||||
|
startTime: Moment = Moment.now(),
|
||||||
): Option[TokenBucket] =
|
): Option[TokenBucket] =
|
||||||
if setting.isNone():
|
if setting.isNone():
|
||||||
return none[TokenBucket]()
|
return none[TokenBucket]()
|
||||||
@ -25,7 +26,8 @@ proc newTokenBucket*(
|
|||||||
TokenBucket.new(
|
TokenBucket.new(
|
||||||
capacity = setting.get().volume,
|
capacity = setting.get().volume,
|
||||||
fillDuration = setting.get().period,
|
fillDuration = setting.get().period,
|
||||||
mode = Continuous,
|
startTime = startTime,
|
||||||
|
mode = replenishMode,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user