mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-02 14:03:06 +00:00
Adapt using chronos' TokenBucket. Removed TokenBucket and test. bump nim-chronos -> nim-libp2p/nim-lsquic/nim-jwt -> adapt to latest libp2p changes
This commit is contained in:
parent
2477c4980f
commit
c1d657a482
6
.gitmodules
vendored
6
.gitmodules
vendored
@ -184,3 +184,9 @@
|
||||
url = https://github.com/logos-messaging/waku-rlnv2-contract.git
|
||||
ignore = untracked
|
||||
branch = master
|
||||
[submodule "vendor/nim-lsquic"]
|
||||
path = vendor/nim-lsquic
|
||||
url = https://github.com/vacp2p/nim-lsquic
|
||||
[submodule "vendor/nim-jwt"]
|
||||
path = vendor/nim-jwt
|
||||
url = https://github.com/vacp2p/nim-jwt.git
|
||||
|
||||
@ -6,7 +6,6 @@ import
|
||||
./test_protobuf_validation,
|
||||
./test_sqlite_migrations,
|
||||
./test_parse_size,
|
||||
./test_tokenbucket,
|
||||
./test_requestratelimiter,
|
||||
./test_ratelimit_setting,
|
||||
./test_timed_map,
|
||||
|
||||
@ -1,69 +0,0 @@
|
||||
# Chronos Test Suite
|
||||
# (c) Copyright 2022-Present
|
||||
# Status Research & Development GmbH
|
||||
#
|
||||
# Licensed under either of
|
||||
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||
# MIT license (LICENSE-MIT)
|
||||
|
||||
{.used.}
|
||||
|
||||
import testutils/unittests
|
||||
import chronos
|
||||
import ../../waku/common/rate_limit/token_bucket
|
||||
|
||||
suite "Token Bucket":
|
||||
test "TokenBucket Sync test - strict":
|
||||
var bucket = TokenBucket.newStrict(1000, 1.milliseconds)
|
||||
let
|
||||
start = Moment.now()
|
||||
fullTime = start + 1.milliseconds
|
||||
check:
|
||||
bucket.tryConsume(800, start) == true
|
||||
bucket.tryConsume(200, start) == true
|
||||
# Out of budget
|
||||
bucket.tryConsume(100, start) == false
|
||||
bucket.tryConsume(800, fullTime) == true
|
||||
bucket.tryConsume(200, fullTime) == true
|
||||
# Out of budget
|
||||
bucket.tryConsume(100, fullTime) == false
|
||||
|
||||
test "TokenBucket Sync test - compensating":
|
||||
var bucket = TokenBucket.new(1000, 1.milliseconds)
|
||||
let
|
||||
start = Moment.now()
|
||||
fullTime = start + 1.milliseconds
|
||||
check:
|
||||
bucket.tryConsume(800, start) == true
|
||||
bucket.tryConsume(200, start) == true
|
||||
# Out of budget
|
||||
bucket.tryConsume(100, start) == false
|
||||
bucket.tryConsume(800, fullTime) == true
|
||||
bucket.tryConsume(200, fullTime) == true
|
||||
# Due not using the bucket for a full period the compensation will satisfy this request
|
||||
bucket.tryConsume(100, fullTime) == true
|
||||
|
||||
test "TokenBucket Max compensation":
|
||||
var bucket = TokenBucket.new(1000, 1.minutes)
|
||||
var reqTime = Moment.now()
|
||||
|
||||
check bucket.tryConsume(1000, reqTime)
|
||||
check bucket.tryConsume(1, reqTime) == false
|
||||
reqTime += 1.minutes
|
||||
check bucket.tryConsume(500, reqTime) == true
|
||||
reqTime += 1.minutes
|
||||
check bucket.tryConsume(1000, reqTime) == true
|
||||
reqTime += 10.seconds
|
||||
# max compensation is 25% so try to consume 250 more
|
||||
check bucket.tryConsume(250, reqTime) == true
|
||||
reqTime += 49.seconds
|
||||
# out of budget within the same period
|
||||
check bucket.tryConsume(1, reqTime) == false
|
||||
|
||||
test "TokenBucket Short replenish":
|
||||
var bucket = TokenBucket.new(15000, 1.milliseconds)
|
||||
let start = Moment.now()
|
||||
check bucket.tryConsume(15000, start)
|
||||
check bucket.tryConsume(1, start) == false
|
||||
|
||||
check bucket.tryConsume(15000, start + 1.milliseconds) == true
|
||||
2
vendor/nim-chronos
vendored
2
vendor/nim-chronos
vendored
@ -1 +1 @@
|
||||
Subproject commit 0646c444fce7c7ed08ef6f2c9a7abfd172ffe655
|
||||
Subproject commit 85af4db764ecd3573c4704139560df3943216cf1
|
||||
1
vendor/nim-jwt
vendored
Submodule
1
vendor/nim-jwt
vendored
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit 18f8378de52b241f321c1f9ea905456e89b95c6f
|
||||
2
vendor/nim-libp2p
vendored
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
|
||||
Subproject commit e82080f7b1aa61c6d35fa5311b873f41eff4bb52
|
||||
Subproject commit eb7e6ff89889e41b57515f891ba82986c54809fb
|
||||
1
vendor/nim-lsquic
vendored
Submodule
1
vendor/nim-lsquic
vendored
Submodule
@ -0,0 +1 @@
|
||||
Subproject commit f3fe33462601ea34eb2e8e9c357c92e61f8d121b
|
||||
@ -30,7 +30,9 @@ requires "nim >= 2.2.4",
|
||||
"regex",
|
||||
"results",
|
||||
"db_connector",
|
||||
"minilru"
|
||||
"minilru",
|
||||
"lsquic",
|
||||
"jwt"
|
||||
|
||||
### Helper functions
|
||||
proc buildModule(filePath, params = "", lang = "c"): bool =
|
||||
|
||||
@ -20,7 +20,7 @@ proc mgetOrPut(
|
||||
perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId
|
||||
): var Option[TokenBucket] =
|
||||
return perPeerRateLimiter.peerBucket.mgetOrPut(
|
||||
peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Compensating)
|
||||
peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Continuous)
|
||||
)
|
||||
|
||||
template checkUsageLimit*(
|
||||
|
||||
@ -6,12 +6,14 @@ import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility
|
||||
|
||||
import std/times except TimeInterval, Duration
|
||||
|
||||
import ./[token_bucket, setting, service_metrics]
|
||||
import chronos/ratelimit as token_bucket
|
||||
|
||||
import ./[setting, service_metrics]
|
||||
export token_bucket, setting, service_metrics
|
||||
|
||||
proc newTokenBucket*(
|
||||
setting: Option[RateLimitSetting],
|
||||
replenishMode: ReplenishMode = ReplenishMode.Compensating,
|
||||
replenishMode: ReplenishMode = ReplenishMode.Continuous,
|
||||
): Option[TokenBucket] =
|
||||
if setting.isNone():
|
||||
return none[TokenBucket]()
|
||||
@ -19,7 +21,13 @@ proc newTokenBucket*(
|
||||
if setting.get().isUnlimited():
|
||||
return none[TokenBucket]()
|
||||
|
||||
return some(TokenBucket.new(setting.get().volume, setting.get().period))
|
||||
return some(
|
||||
TokenBucket.new(
|
||||
capacity = setting.get().volume,
|
||||
fillDuration = setting.get().period,
|
||||
mode = Continuous,
|
||||
)
|
||||
)
|
||||
|
||||
proc checkUsage(
|
||||
t: var TokenBucket, proto: string, now = Moment.now()
|
||||
|
||||
@ -1,182 +0,0 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import chronos, std/math, std/options
|
||||
|
||||
const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25
|
||||
|
||||
## This is an extract from chronos/rate_limit.nim due to the found bug in the original implementation.
|
||||
## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class.
|
||||
## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation.
|
||||
## ref: https://github.com/status-im/nim-chronos/issues/500
|
||||
##
|
||||
## This version of TokenBucket is different from the original one in chronos/rate_limit.nim in many ways:
|
||||
## - It has a new mode called `Compensating` which is the default mode.
|
||||
## Compensation is calculated as the not used bucket capacity in the last measured period(s) in average.
|
||||
## or up until maximum the allowed compansation treshold (Currently it is const 25%).
|
||||
## Also compensation takes care of the proper time period calculation to avoid non-usage periods that can lead to
|
||||
## overcompensation.
|
||||
## - Strict mode is also available which will only replenish when time period is over but also will fill
|
||||
## the bucket to the max capacity.
|
||||
|
||||
type
|
||||
ReplenishMode* = enum
|
||||
Strict
|
||||
Compensating
|
||||
|
||||
TokenBucket* = ref object
|
||||
budget: int ## Current number of tokens in the bucket
|
||||
budgetCap: int ## Bucket capacity
|
||||
lastTimeFull: Moment
|
||||
## This timer measures the proper periodizaiton of the bucket refilling
|
||||
fillDuration: Duration ## Refill period
|
||||
case replenishMode*: ReplenishMode
|
||||
of Strict:
|
||||
## In strict mode, the bucket is refilled only till the budgetCap
|
||||
discard
|
||||
of Compensating:
|
||||
## This is the default mode.
|
||||
maxCompensation: float
|
||||
|
||||
func periodDistance(bucket: TokenBucket, currentTime: Moment): float =
|
||||
## notice fillDuration cannot be zero by design
|
||||
## period distance is a float number representing the calculated period time
|
||||
## since the last time bucket was refilled.
|
||||
return
|
||||
nanoseconds(currentTime - bucket.lastTimeFull).float /
|
||||
nanoseconds(bucket.fillDuration).float
|
||||
|
||||
func getUsageAverageSince(bucket: TokenBucket, distance: float): float =
|
||||
if distance == 0.float:
|
||||
## in case there is zero time difference than the usage percentage is 100%
|
||||
return 1.0
|
||||
|
||||
## budgetCap can never be zero
|
||||
## usage average is calculated as a percentage of total capacity available over
|
||||
## the measured period
|
||||
return bucket.budget.float / bucket.budgetCap.float / distance
|
||||
|
||||
proc calcCompensation(bucket: TokenBucket, averageUsage: float): int =
|
||||
# if we already fully used or even overused the tokens, there is no place for compensation
|
||||
if averageUsage >= 1.0:
|
||||
return 0
|
||||
|
||||
## compensation is the not used bucket capacity in the last measured period(s) in average.
|
||||
## or maximum the allowed compansation treshold
|
||||
let compensationPercent =
|
||||
min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation)
|
||||
return trunc(compensationPercent).int
|
||||
|
||||
func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool =
|
||||
return currentTime - bucket.lastTimeFull >= bucket.fillDuration
|
||||
|
||||
## Update will take place if bucket is empty and trying to consume tokens.
|
||||
## It checks if the bucket can be replenished as refill duration is passed or not.
|
||||
## - strict mode:
|
||||
proc updateStrict(bucket: TokenBucket, currentTime: Moment) =
|
||||
if bucket.fillDuration == default(Duration):
|
||||
bucket.budget = min(bucket.budgetCap, bucket.budget)
|
||||
return
|
||||
|
||||
if not periodElapsed(bucket, currentTime):
|
||||
return
|
||||
|
||||
bucket.budget = bucket.budgetCap
|
||||
bucket.lastTimeFull = currentTime
|
||||
|
||||
## - compensating - ballancing load:
|
||||
## - between updates we calculate average load (current bucket capacity / number of periods till last update)
|
||||
## - gives the percentage load used recently
|
||||
## - with this we can replenish bucket up to 100% + calculated leftover from previous period (caped with max treshold)
|
||||
proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) =
|
||||
if bucket.fillDuration == default(Duration):
|
||||
bucket.budget = min(bucket.budgetCap, bucket.budget)
|
||||
return
|
||||
|
||||
# do not replenish within the same period
|
||||
if not periodElapsed(bucket, currentTime):
|
||||
return
|
||||
|
||||
let distance = bucket.periodDistance(currentTime)
|
||||
let recentAvgUsage = bucket.getUsageAverageSince(distance)
|
||||
let compensation = bucket.calcCompensation(recentAvgUsage)
|
||||
|
||||
bucket.budget = bucket.budgetCap + compensation
|
||||
bucket.lastTimeFull = currentTime
|
||||
|
||||
proc update(bucket: TokenBucket, currentTime: Moment) =
|
||||
if bucket.replenishMode == ReplenishMode.Compensating:
|
||||
updateWithCompensation(bucket, currentTime)
|
||||
else:
|
||||
updateStrict(bucket, currentTime)
|
||||
|
||||
proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
|
||||
## If `tokens` are available, consume them,
|
||||
## Otherwhise, return false.
|
||||
|
||||
if bucket.budget >= bucket.budgetCap:
|
||||
bucket.lastTimeFull = now
|
||||
|
||||
if bucket.budget >= tokens:
|
||||
bucket.budget -= tokens
|
||||
return true
|
||||
|
||||
bucket.update(now)
|
||||
|
||||
if bucket.budget >= tokens:
|
||||
bucket.budget -= tokens
|
||||
return true
|
||||
else:
|
||||
return false
|
||||
|
||||
proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
|
||||
## Add `tokens` to the budget (capped to the bucket capacity)
|
||||
bucket.budget += tokens
|
||||
bucket.update(now)
|
||||
|
||||
proc new*(
|
||||
T: type[TokenBucket],
|
||||
budgetCap: int,
|
||||
fillDuration: Duration = 1.seconds,
|
||||
mode: ReplenishMode = ReplenishMode.Compensating,
|
||||
): T =
|
||||
assert not isZero(fillDuration)
|
||||
assert budgetCap != 0
|
||||
|
||||
## Create different mode TokenBucket
|
||||
case mode
|
||||
of ReplenishMode.Strict:
|
||||
return T(
|
||||
budget: budgetCap,
|
||||
budgetCap: budgetCap,
|
||||
fillDuration: fillDuration,
|
||||
lastTimeFull: Moment.now(),
|
||||
replenishMode: mode,
|
||||
)
|
||||
of ReplenishMode.Compensating:
|
||||
T(
|
||||
budget: budgetCap,
|
||||
budgetCap: budgetCap,
|
||||
fillDuration: fillDuration,
|
||||
lastTimeFull: Moment.now(),
|
||||
replenishMode: mode,
|
||||
maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
|
||||
)
|
||||
|
||||
proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket =
|
||||
T.new(capacity, period, ReplenishMode.Strict)
|
||||
|
||||
proc newCompensating*(
|
||||
T: type[TokenBucket], capacity: int, period: Duration
|
||||
): TokenBucket =
|
||||
T.new(capacity, period, ReplenishMode.Compensating)
|
||||
|
||||
func `$`*(b: TokenBucket): string {.inline.} =
|
||||
if isNil(b):
|
||||
return "nil"
|
||||
return $b.budgetCap & "/" & $b.fillDuration
|
||||
|
||||
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
|
||||
if ob.isNone():
|
||||
return "no-limit"
|
||||
|
||||
return $ob.get()
|
||||
@ -209,6 +209,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
|
||||
maxServicePeers = some(builder.maxServicePeers),
|
||||
colocationLimit = builder.colocationLimit,
|
||||
shardedPeerManagement = builder.shardAware,
|
||||
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),
|
||||
)
|
||||
|
||||
var node: WakuNode
|
||||
|
||||
@ -13,8 +13,6 @@ import
|
||||
libp2p/services/autorelayservice,
|
||||
libp2p/services/hpservice,
|
||||
libp2p/peerid,
|
||||
libp2p/discovery/discoverymngr,
|
||||
libp2p/discovery/rendezvousinterface,
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
presto,
|
||||
@ -63,7 +61,6 @@ type Waku* = ref object
|
||||
dynamicBootstrapNodes*: seq[RemotePeerInfo]
|
||||
dnsRetryLoopHandle: Future[void]
|
||||
networkConnLoopHandle: Future[void]
|
||||
discoveryMngr: DiscoveryManager
|
||||
|
||||
node*: WakuNode
|
||||
|
||||
|
||||
@ -103,6 +103,7 @@ type PeerManager* = ref object of RootObj
|
||||
onConnectionChange*: ConnectionChangeHandler
|
||||
online: bool ## state managed by online_monitor module
|
||||
getShards: GetShards
|
||||
maxConnections: int
|
||||
|
||||
#~~~~~~~~~~~~~~~~~~~#
|
||||
# Helper Functions #
|
||||
@ -748,7 +749,6 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
|
||||
var peerStore = pm.switch.peerStore
|
||||
# log metrics
|
||||
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
|
||||
let maxConnections = pm.switch.connManager.inSema.size
|
||||
let notConnectedPeers =
|
||||
peerStore.getDisconnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
|
||||
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
|
||||
@ -758,7 +758,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
|
||||
info "Relay peer connections",
|
||||
inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
|
||||
outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget,
|
||||
totalConnections = $totalConnections & "/" & $maxConnections,
|
||||
totalConnections = $totalConnections & "/" & $pm.maxConnections,
|
||||
notConnectedPeers = notConnectedPeers.len,
|
||||
outsideBackoffPeers = outsideBackoffPeers.len
|
||||
|
||||
@ -1048,9 +1048,9 @@ proc new*(
|
||||
maxFailedAttempts = MaxFailedAttempts,
|
||||
colocationLimit = DefaultColocationLimit,
|
||||
shardedPeerManagement = false,
|
||||
maxConnections: int = MaxConnections,
|
||||
): PeerManager {.gcsafe.} =
|
||||
let capacity = switch.peerStore.capacity
|
||||
let maxConnections = switch.connManager.inSema.size
|
||||
if maxConnections > capacity:
|
||||
error "Max number of connections can't be greater than PeerManager capacity",
|
||||
capacity = capacity, maxConnections = maxConnections
|
||||
@ -1099,6 +1099,7 @@ proc new*(
|
||||
colocationLimit: colocationLimit,
|
||||
shardedPeerManagement: shardedPeerManagement,
|
||||
online: true,
|
||||
maxConnections: maxConnections,
|
||||
)
|
||||
|
||||
proc peerHook(
|
||||
|
||||
@ -8,7 +8,6 @@ import
|
||||
stew/byteutils,
|
||||
libp2p/protocols/rendezvous,
|
||||
libp2p/protocols/rendezvous/protobuf,
|
||||
libp2p/discovery/discoverymngr,
|
||||
libp2p/utils/semaphore,
|
||||
libp2p/utils/offsettedseq,
|
||||
libp2p/crypto/curve25519,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user