Merge fc1ad8abc25ca9b9c8b6ca9462918e5e60653c88 into dafdee9f5ffc0460f45307c61fbd8e9832fc3ecd

NagyZoltanPeter 2025-12-30 10:34:32 +02:00 committed by GitHub
commit dc31c9941e
18 changed files with 125 additions and 282 deletions

.gitmodules vendored

@@ -184,6 +184,12 @@
url = https://github.com/logos-messaging/waku-rlnv2-contract.git
ignore = untracked
branch = master
[submodule "vendor/nim-lsquic"]
path = vendor/nim-lsquic
url = https://github.com/vacp2p/nim-lsquic
[submodule "vendor/nim-jwt"]
path = vendor/nim-jwt
url = https://github.com/vacp2p/nim-jwt.git
[submodule "vendor/nim-ffi"]
path = vendor/nim-ffi
url = https://github.com/logos-messaging/nim-ffi/


@@ -6,7 +6,6 @@ import
./test_protobuf_validation,
./test_sqlite_migrations,
./test_parse_size,
./test_tokenbucket,
./test_requestratelimiter,
./test_ratelimit_setting,
./test_timed_map,


@@ -1,69 +0,0 @@
# Chronos Test Suite
# (c) Copyright 2022-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
{.used.}
import testutils/unittests
import chronos
import ../../waku/common/rate_limit/token_bucket
suite "Token Bucket":
test "TokenBucket Sync test - strict":
var bucket = TokenBucket.newStrict(1000, 1.milliseconds)
let
start = Moment.now()
fullTime = start + 1.milliseconds
check:
bucket.tryConsume(800, start) == true
bucket.tryConsume(200, start) == true
# Out of budget
bucket.tryConsume(100, start) == false
bucket.tryConsume(800, fullTime) == true
bucket.tryConsume(200, fullTime) == true
# Out of budget
bucket.tryConsume(100, fullTime) == false
test "TokenBucket Sync test - compensating":
var bucket = TokenBucket.new(1000, 1.milliseconds)
let
start = Moment.now()
fullTime = start + 1.milliseconds
check:
bucket.tryConsume(800, start) == true
bucket.tryConsume(200, start) == true
# Out of budget
bucket.tryConsume(100, start) == false
bucket.tryConsume(800, fullTime) == true
bucket.tryConsume(200, fullTime) == true
# Because the bucket was unused for a full period, compensation will satisfy this request
bucket.tryConsume(100, fullTime) == true
test "TokenBucket Max compensation":
var bucket = TokenBucket.new(1000, 1.minutes)
var reqTime = Moment.now()
check bucket.tryConsume(1000, reqTime)
check bucket.tryConsume(1, reqTime) == false
reqTime += 1.minutes
check bucket.tryConsume(500, reqTime) == true
reqTime += 1.minutes
check bucket.tryConsume(1000, reqTime) == true
reqTime += 10.seconds
# max compensation is 25% so try to consume 250 more
check bucket.tryConsume(250, reqTime) == true
reqTime += 49.seconds
# out of budget within the same period
check bucket.tryConsume(1, reqTime) == false
test "TokenBucket Short replenish":
var bucket = TokenBucket.new(15000, 1.milliseconds)
let start = Moment.now()
check bucket.tryConsume(15000, start)
check bucket.tryConsume(1, start) == false
check bucket.tryConsume(15000, start + 1.milliseconds) == true


@@ -997,6 +997,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 1,
storage = nil,
maxConnections = 20,
)
# Create 30 peers and add them to the peerstore
@@ -1063,6 +1064,7 @@ procSuite "Peer Manager":
backoffFactor = 2,
maxFailedAttempts = 10,
storage = nil,
maxConnections = 20,
)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
@@ -1116,6 +1118,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 150,
storage = nil,
maxConnections = 20,
)
# Should result in backoff > 1 week
@@ -1131,6 +1134,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 10,
storage = nil,
maxConnections = 20,
)
let pm = PeerManager.new(
@@ -1144,6 +1148,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 5,
storage = nil,
maxConnections = 20,
)
asyncTest "colocationLimit is enforced by pruneConnsByIp()":


@@ -147,29 +147,43 @@ suite "Waku Filter - DOS protection":
asyncTest "Ensure normal usage allowed":
# Given
# Rate limit setting is (3 requests / 1000ms) per peer.
# In a token-bucket model this means:
# - capacity = 3 tokens
# - refill rate = 3 tokens / second => ~1 token every ~333ms
# - each request consumes 1 token (including UNSUBSCRIBE)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
# Expected remaining tokens (approx): 2
await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
# After ~500ms, ~1 token refilled; PING consumes 1 => expected remaining: 2
await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
# After another ~500ms, ~1 token refilled; PING consumes 1 => expected remaining: 2
await sleepAsync(50.milliseconds)
check client1.unsubscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId) == false
# ~50ms is not enough to refill a token at 3/sec; UNSUBSCRIBE consumes 1 => expected remaining: 1
await sleepAsync(50.milliseconds)
check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND)
check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND)
await sleepAsync(50.milliseconds)
# PING consumes the last token => expected remaining: 0
check client1.ping(serverRemotePeerInfo) ==
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
# Immediate second PING has no token available => expected remaining: 0
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
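For reference, the budget arithmetic described in the comments above can be sketched directly against the token-bucket API this PR adopts (a minimal sketch, assuming the vendored chronos TokenBucket with Continuous mode and the named parameters used elsewhere in this diff; timings are illustrative):

import chronos
import chronos/ratelimit

let start = Moment.now()
var bucket = TokenBucket.new(
  capacity = 3, fillDuration = 1.seconds, startTime = start, mode = Continuous
)
doAssert bucket.tryConsume(1, start) # SUBSCRIBE: 3 -> 2 tokens
doAssert bucket.tryConsume(1, start + 500.milliseconds) # PING: ~1 refilled, ~2 left
doAssert bucket.tryConsume(1, start + 1000.milliseconds) # PING: ~2 left
doAssert bucket.tryConsume(1, start + 1050.milliseconds) # UNSUBSCRIBE: ~1 left
doAssert bucket.tryConsume(1, start + 1100.milliseconds) # PING: last token
doAssert not bucket.tryConsume(1, start + 1100.milliseconds) # out of budget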

vendor/nim-chronos vendored

@@ -1 +1 @@
Subproject commit 0646c444fce7c7ed08ef6f2c9a7abfd172ffe655
Subproject commit 85af4db764ecd3573c4704139560df3943216cf1

vendor/nim-jwt vendored Submodule

@@ -0,0 +1 @@
Subproject commit 18f8378de52b241f321c1f9ea905456e89b95c6f

vendor/nim-libp2p vendored

@@ -1 +1 @@
Subproject commit e82080f7b1aa61c6d35fa5311b873f41eff4bb52
Subproject commit eb7e6ff89889e41b57515f891ba82986c54809fb

vendor/nim-lsquic vendored Submodule

@@ -0,0 +1 @@
Subproject commit f3fe33462601ea34eb2e8e9c357c92e61f8d121b


@@ -31,6 +31,8 @@ requires "nim >= 2.2.4",
"results",
"db_connector",
"minilru",
"lsquic",
"jwt",
"ffi"
### Helper functions
@@ -148,7 +150,8 @@ task chat2, "Build example Waku chat usage":
let name = "chat2"
buildBinary name,
"apps/chat2/",
"-d:chronicles_sinks=textlines[file] -d:ssl -d:chronicles_log_level='TRACE' "
"-d:chronicles_sinks=textlines[file] -d:chronicles_log_level='TRACE' "
# -d:ssl removed: it causes an unlisted exception error in libp2p/utility...
task chat2mix, "Build example Waku chat mix usage":
# NOTE For debugging, set debug level. For chat usage we want minimal log
@@ -158,7 +161,8 @@ task chat2mix, "Build example Waku chat mix usage":
let name = "chat2mix"
buildBinary name,
"apps/chat2mix/",
"-d:chronicles_sinks=textlines[file] -d:ssl -d:chronicles_log_level='TRACE' "
"-d:chronicles_sinks=textlines[file] -d:chronicles_log_level='TRACE' "
# -d:ssl removed: it causes an unlisted exception error in libp2p/utility...
task chat2bridge, "Build chat2bridge":
let name = "chat2bridge"


@@ -20,7 +20,7 @@ proc mgetOrPut(
perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId
): var Option[TokenBucket] =
return perPeerRateLimiter.peerBucket.mgetOrPut(
peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Compensating)
peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Continuous)
)
template checkUsageLimit*(


@@ -39,38 +39,82 @@ const SECONDS_RATIO = 3
const MINUTES_RATIO = 2
type RequestRateLimiter* = ref object of RootObj
tokenBucket: Option[TokenBucket]
tokenBucket: TokenBucket
setting*: Option[RateLimitSetting]
mainBucketSetting: RateLimitSetting
ratio: int
peerBucketSetting*: RateLimitSetting
peerUsage: TimedMap[PeerId, TokenBucket]
checkUsageImpl: proc(
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
): bool {.gcsafe, raises: [].}
proc newMainTokenBucket(
setting: RateLimitSetting, ratio: int, startTime: Moment
): TokenBucket =
## RequestRateLimiter's global bucket should keep the *rate* of the configured
## setting while allowing a larger burst window. We achieve this by scaling
## both capacity and fillDuration by the same ratio.
##
## This matches previous behavior where unused tokens could effectively
## accumulate across multiple periods.
let burstCapacity = setting.volume * ratio
var bucket = TokenBucket.new(
capacity = burstCapacity,
fillDuration = setting.period * ratio,
startTime = startTime,
mode = Continuous,
)
# Start with the configured volume (not the burst capacity) so that the
# initial burst behavior matches the raw setting, while still allowing
# accumulation up to `burstCapacity` over time.
let excess = burstCapacity - setting.volume
if excess > 0:
discard bucket.tryConsume(excess, startTime)
return bucket
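As a worked instance of the scaling above (illustrative numbers only; newMainTokenBucket as defined here):

# setting = (10, 1.seconds), ratio = 3 =>
#   burstCapacity = 10 * 3 = 30, fillDuration = 3.seconds,
# i.e. the long-run rate stays 10 tokens/second while up to 30 tokens may accumulate.
let start = Moment.now()
let bucket = newMainTokenBucket((10, 1.seconds), 3, start)
# The constructor pre-consumed the excess (30 - 10 = 20), so only the raw
# volume is available immediately:
doAssert bucket.tryConsume(10, start)
doAssert not bucket.tryConsume(1, start)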
proc mgetOrPut(
requestRateLimiter: var RequestRateLimiter, peerId: PeerId
requestRateLimiter: var RequestRateLimiter, peerId: PeerId, now: Moment
): var TokenBucket =
let bucketForNew = newTokenBucket(some(requestRateLimiter.peerBucketSetting)).valueOr:
let bucketForNew = newTokenBucket(
some(requestRateLimiter.peerBucketSetting), Discrete, now
).valueOr:
raiseAssert "This branch is not allowed to be reached as it will not be called if the setting is None."
return requestRateLimiter.peerUsage.mgetOrPut(peerId, bucketForNew)
proc checkUsage*(
t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now()
): bool {.raises: [].} =
if t.tokenBucket.isNone():
return true
proc checkUsageUnlimited(
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
): bool {.gcsafe, raises: [].} =
true
let peerBucket = t.mgetOrPut(conn.peerId)
proc checkUsageLimited(
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
): bool {.gcsafe, raises: [].} =
# Lazy-init the main bucket using the first observed request time. This makes
# refill behavior deterministic under tests where `now` is controlled.
if isNil(t.tokenBucket):
t.tokenBucket = newMainTokenBucket(t.mainBucketSetting, t.ratio, now)
let peerBucket = t.mgetOrPut(conn.peerId, now)
## Check that the requesting peer's usage is not over the calculated ratio, letting through peers that have requested little recently (or not at all this time)
if not peerBucket.tryConsume(1, now):
trace "peer usage limit reached", peer = conn.peerId
return false
# The peer is within its own limit; now check the overall budget we have left
let tokenBucket = t.tokenBucket.get()
if not tokenBucket.tryConsume(1, now):
if not t.tokenBucket.tryConsume(1, now):
return false
return true
proc checkUsage*(
t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now()
): bool {.raises: [].} =
t.checkUsageImpl(t, proto, conn, now)
template checkUsageLimit*(
t: var RequestRateLimiter,
proto: string,
@@ -135,9 +179,19 @@ func calcPeerTokenSetting(
proc newRequestRateLimiter*(setting: Option[RateLimitSetting]): RequestRateLimiter =
let ratio = calcPeriodRatio(setting)
let isLimited = setting.isSome() and not setting.get().isUnlimited()
let mainBucketSetting =
if isLimited:
setting.get()
else:
(0, 0.minutes)
return RequestRateLimiter(
tokenBucket: newTokenBucket(setting),
tokenBucket: nil,
setting: setting,
mainBucketSetting: mainBucketSetting,
ratio: ratio,
peerBucketSetting: calcPeerTokenSetting(setting, ratio),
peerUsage: init(TimedMap[PeerId, TokenBucket], calcCacheTimeout(setting, ratio)),
checkUsageImpl: (if isLimited: checkUsageLimited else: checkUsageUnlimited),
)
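Taken together, construction and usage look roughly like this (a brief sketch; the import path and handler wiring are hypothetical, and note that the main bucket is only created on the first checkUsage call, as described above):

import std/options, chronos
import ./request_limiter # hypothetical path to this module

let setting: RateLimitSetting = (30, 1.minutes) # 30 requests per minute overall
var limiter = newRequestRateLimiter(some(setting))
# In a protocol handler, each request is gated per peer first, then globally:
# if not limiter.checkUsage("/vac/waku/store/3.0.0", conn):
#   respond with TOO_MANY_REQUESTS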


@@ -6,12 +6,15 @@ import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility
import std/times except TimeInterval, Duration
import ./[token_bucket, setting, service_metrics]
import chronos/ratelimit as token_bucket
import ./[setting, service_metrics]
export token_bucket, setting, service_metrics
proc newTokenBucket*(
setting: Option[RateLimitSetting],
replenishMode: ReplenishMode = ReplenishMode.Compensating,
replenishMode: static[ReplenishMode] = ReplenishMode.Continuous,
startTime: Moment = Moment.now(),
): Option[TokenBucket] =
if setting.isNone():
return none[TokenBucket]()
@@ -19,7 +22,14 @@ proc newTokenBucket(
if setting.get().isUnlimited():
return none[TokenBucket]()
return some(TokenBucket.new(setting.get().volume, setting.get().period))
return some(
TokenBucket.new(
capacity = setting.get().volume,
fillDuration = setting.get().period,
startTime = startTime,
mode = replenishMode,
)
)
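A small sketch of the Option semantics of this helper (assuming the RateLimitSetting tuple used above):

# Absent or unlimited settings yield no bucket; limited settings yield one.
let unlimited = newTokenBucket(none(RateLimitSetting))
let setting: RateLimitSetting = (5, 1.seconds)
let limited = newTokenBucket(some(setting))
doAssert unlimited.isNone()
doAssert limited.isSome()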
proc checkUsage(
t: var TokenBucket, proto: string, now = Moment.now()


@@ -1,182 +0,0 @@
{.push raises: [].}
import chronos, std/math, std/options
const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25
## This is an extract from chronos/rate_limit.nim, made because of a bug found in the original implementation.
## Unfortunately that bug cannot be fixed without harming the original features of the TokenBucket class,
## so this shortcut is used to let the nwaku rate limiter implementation move ahead.
## ref: https://github.com/status-im/nim-chronos/issues/500
##
## This version of TokenBucket differs from the original in chronos/rate_limit.nim in several ways:
## - It has a new mode called `Compensating`, which is the default mode.
## Compensation is calculated as the average unused bucket capacity over the last measured period(s),
## capped at the allowed compensation threshold (currently a const 25%).
## Compensation also takes care of proper time-period calculation, so that non-usage periods
## cannot lead to overcompensation.
## - Strict mode is also available: it replenishes only when the time period is over, but then fills
## the bucket to maximum capacity.
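A worked numeric trace of the compensation rule, matching the code below (illustrative numbers):

## Worked example (illustrative): budgetCap = 1000, maxCompensation = 250 (25%),
## one full period elapsed (distance = 1.0) with 400 tokens left unconsumed:
##   getUsageAverageSince = 400 / 1000 / 1.0 = 0.4
##   calcCompensation     = min((1.0 - 0.4) * 1000, 250) = 250
## so on refill the budget becomes budgetCap + compensation = 1250.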
type
ReplenishMode* = enum
Strict
Compensating
TokenBucket* = ref object
budget: int ## Current number of tokens in the bucket
budgetCap: int ## Bucket capacity
lastTimeFull: Moment
## This timer measures the proper periodization of the bucket refilling
fillDuration: Duration ## Refill period
case replenishMode*: ReplenishMode
of Strict:
## In strict mode, the bucket is refilled only till the budgetCap
discard
of Compensating:
## This is the default mode.
maxCompensation: float
func periodDistance(bucket: TokenBucket, currentTime: Moment): float =
## Note: fillDuration cannot be zero by design.
## Period distance is a float representing the number of fill periods elapsed
## since the bucket was last full.
return
nanoseconds(currentTime - bucket.lastTimeFull).float /
nanoseconds(bucket.fillDuration).float
func getUsageAverageSince(bucket: TokenBucket, distance: float): float =
if distance == 0.float:
## if there is zero time difference, the usage percentage is 100%
return 1.0
## budgetCap can never be zero
## usage average is calculated as a percentage of total capacity available over
## the measured period
return bucket.budget.float / bucket.budgetCap.float / distance
proc calcCompensation(bucket: TokenBucket, averageUsage: float): int =
# if we already fully used or even overused the tokens, there is no place for compensation
if averageUsage >= 1.0:
return 0
## Compensation is the average unused bucket capacity over the last measured period(s),
## capped at the allowed compensation threshold.
let compensationPercent =
min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation)
return trunc(compensationPercent).int
func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool =
return currentTime - bucket.lastTimeFull >= bucket.fillDuration
## Update takes place when the bucket is empty while trying to consume tokens.
## It checks whether the bucket can be replenished, i.e. whether the refill duration has passed.
## - strict mode:
proc updateStrict(bucket: TokenBucket, currentTime: Moment) =
if bucket.fillDuration == default(Duration):
bucket.budget = min(bucket.budgetCap, bucket.budget)
return
if not periodElapsed(bucket, currentTime):
return
bucket.budget = bucket.budgetCap
bucket.lastTimeFull = currentTime
## - compensating - balancing load:
## - between updates we calculate the average load (current bucket capacity / number of periods since the last update)
## - this gives the percentage of load used recently
## - with this we can replenish the bucket up to 100% + the calculated leftover from the previous period (capped at the max threshold)
proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) =
if bucket.fillDuration == default(Duration):
bucket.budget = min(bucket.budgetCap, bucket.budget)
return
# do not replenish within the same period
if not periodElapsed(bucket, currentTime):
return
let distance = bucket.periodDistance(currentTime)
let recentAvgUsage = bucket.getUsageAverageSince(distance)
let compensation = bucket.calcCompensation(recentAvgUsage)
bucket.budget = bucket.budgetCap + compensation
bucket.lastTimeFull = currentTime
proc update(bucket: TokenBucket, currentTime: Moment) =
if bucket.replenishMode == ReplenishMode.Compensating:
updateWithCompensation(bucket, currentTime)
else:
updateStrict(bucket, currentTime)
proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
## If `tokens` are available, consume them;
## otherwise, return false.
if bucket.budget >= bucket.budgetCap:
bucket.lastTimeFull = now
if bucket.budget >= tokens:
bucket.budget -= tokens
return true
bucket.update(now)
if bucket.budget >= tokens:
bucket.budget -= tokens
return true
else:
return false
proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
## Add `tokens` to the budget (capped to the bucket capacity)
bucket.budget += tokens
bucket.update(now)
proc new*(
T: type[TokenBucket],
budgetCap: int,
fillDuration: Duration = 1.seconds,
mode: ReplenishMode = ReplenishMode.Compensating,
): T =
assert not isZero(fillDuration)
assert budgetCap != 0
## Create different mode TokenBucket
case mode
of ReplenishMode.Strict:
return T(
budget: budgetCap,
budgetCap: budgetCap,
fillDuration: fillDuration,
lastTimeFull: Moment.now(),
replenishMode: mode,
)
of ReplenishMode.Compensating:
T(
budget: budgetCap,
budgetCap: budgetCap,
fillDuration: fillDuration,
lastTimeFull: Moment.now(),
replenishMode: mode,
maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
)
proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket =
T.new(capacity, period, ReplenishMode.Strict)
proc newCompensating*(
T: type[TokenBucket], capacity: int, period: Duration
): TokenBucket =
T.new(capacity, period, ReplenishMode.Compensating)
func `$`*(b: TokenBucket): string {.inline.} =
if isNil(b):
return "nil"
return $b.budgetCap & "/" & $b.fillDuration
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
if ob.isNone():
return "no-limit"
return $ob.get()


@@ -209,6 +209,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
maxServicePeers = some(builder.maxServicePeers),
colocationLimit = builder.colocationLimit,
shardedPeerManagement = builder.shardAware,
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),
)
var node: WakuNode


@@ -13,7 +13,6 @@ import
libp2p/services/autorelayservice,
libp2p/services/hpservice,
libp2p/peerid,
libp2p/discovery/rendezvousinterface,
eth/keys,
eth/p2p/discoveryv5/enr,
presto,


@@ -103,6 +103,7 @@ type PeerManager* = ref object of RootObj
onConnectionChange*: ConnectionChangeHandler
online: bool ## state managed by online_monitor module
getShards: GetShards
maxConnections: int
#~~~~~~~~~~~~~~~~~~~#
# Helper Functions #
@@ -748,7 +749,6 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
var peerStore = pm.switch.peerStore
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let notConnectedPeers =
peerStore.getDisconnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
@@ -758,7 +758,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
info "Relay peer connections",
inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget,
totalConnections = $totalConnections & "/" & $maxConnections,
totalConnections = $totalConnections & "/" & $pm.maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
@@ -1048,9 +1048,9 @@ proc new*(
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
maxConnections: int = MaxConnections,
): PeerManager {.gcsafe.} =
let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
if maxConnections > capacity:
error "Max number of connections can't be greater than PeerManager capacity",
capacity = capacity, maxConnections = maxConnections
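Given the check above, construction now mirrors the updated tests (a sketch; the Switch is assumed to be pre-built, e.g. via the test builder):

# maxConnections is now an explicit parameter instead of being derived from
# switch.connManager.inSema.size; it must not exceed the peer store capacity.
let pm = PeerManager.new(
  switch, # assumed pre-built Switch
  maxFailedAttempts = 5,
  storage = nil,
  maxConnections = 20,
)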
@@ -1099,6 +1099,7 @@
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,
online: true,
maxConnections: maxConnections,
)
proc peerHook(


@@ -8,7 +8,6 @@ import
stew/byteutils,
libp2p/protocols/rendezvous,
libp2p/protocols/rendezvous/protobuf,
libp2p/discovery/discoverymngr,
libp2p/utils/semaphore,
libp2p/utils/offsettedseq,
libp2p/crypto/curve25519,