From c1d657a482f26029376b411cbe44be0e21138957 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 16 Dec 2025 02:34:10 +0100 Subject: [PATCH 1/5] Adapt using chronos' TokenBucket. Removed TokenBucket and test. bump nim-chronos -> nim-libp2p/nim-lsquic/nim-jwt -> adapt to latest libp2p changes --- .gitmodules | 6 + tests/common/test_all.nim | 1 - tests/common/test_tokenbucket.nim | 69 ------- vendor/nim-chronos | 2 +- vendor/nim-jwt | 1 + vendor/nim-libp2p | 2 +- vendor/nim-lsquic | 1 + waku.nimble | 4 +- waku/common/rate_limit/per_peer_limiter.nim | 2 +- .../rate_limit/single_token_limiter.nim | 14 +- waku/common/rate_limit/token_bucket.nim | 182 ------------------ waku/factory/builder.nim | 1 + waku/factory/waku.nim | 3 - waku/node/peer_manager/peer_manager.nim | 7 +- waku/waku_rendezvous/protocol.nim | 1 - 15 files changed, 30 insertions(+), 266 deletions(-) delete mode 100644 tests/common/test_tokenbucket.nim create mode 160000 vendor/nim-jwt create mode 160000 vendor/nim-lsquic delete mode 100644 waku/common/rate_limit/token_bucket.nim diff --git a/.gitmodules b/.gitmodules index 93a3a006f..f5249f089 100644 --- a/.gitmodules +++ b/.gitmodules @@ -184,3 +184,9 @@ url = https://github.com/logos-messaging/waku-rlnv2-contract.git ignore = untracked branch = master +[submodule "vendor/nim-lsquic"] + path = vendor/nim-lsquic + url = https://github.com/vacp2p/nim-lsquic +[submodule "vendor/nim-jwt"] + path = vendor/nim-jwt + url = https://github.com/vacp2p/nim-jwt.git diff --git a/tests/common/test_all.nim b/tests/common/test_all.nim index 7495c7c9e..d597a7424 100644 --- a/tests/common/test_all.nim +++ b/tests/common/test_all.nim @@ -6,7 +6,6 @@ import ./test_protobuf_validation, ./test_sqlite_migrations, ./test_parse_size, - ./test_tokenbucket, ./test_requestratelimiter, ./test_ratelimit_setting, ./test_timed_map, diff --git a/tests/common/test_tokenbucket.nim b/tests/common/test_tokenbucket.nim deleted file mode 100644 index 5bc1a0583..000000000 --- a/tests/common/test_tokenbucket.nim +++ /dev/null @@ -1,69 +0,0 @@ -# Chronos Test Suite -# (c) Copyright 2022-Present -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) - -{.used.} - -import testutils/unittests -import chronos -import ../../waku/common/rate_limit/token_bucket - -suite "Token Bucket": - test "TokenBucket Sync test - strict": - var bucket = TokenBucket.newStrict(1000, 1.milliseconds) - let - start = Moment.now() - fullTime = start + 1.milliseconds - check: - bucket.tryConsume(800, start) == true - bucket.tryConsume(200, start) == true - # Out of budget - bucket.tryConsume(100, start) == false - bucket.tryConsume(800, fullTime) == true - bucket.tryConsume(200, fullTime) == true - # Out of budget - bucket.tryConsume(100, fullTime) == false - - test "TokenBucket Sync test - compensating": - var bucket = TokenBucket.new(1000, 1.milliseconds) - let - start = Moment.now() - fullTime = start + 1.milliseconds - check: - bucket.tryConsume(800, start) == true - bucket.tryConsume(200, start) == true - # Out of budget - bucket.tryConsume(100, start) == false - bucket.tryConsume(800, fullTime) == true - bucket.tryConsume(200, fullTime) == true - # Due not using the bucket for a full period the compensation will satisfy this request - bucket.tryConsume(100, fullTime) == true - - test "TokenBucket Max compensation": - var bucket = TokenBucket.new(1000, 1.minutes) - var reqTime = Moment.now() - - check bucket.tryConsume(1000, reqTime) - check bucket.tryConsume(1, reqTime) == false - reqTime += 1.minutes - check bucket.tryConsume(500, reqTime) == true - reqTime += 1.minutes - check bucket.tryConsume(1000, reqTime) == true - reqTime += 10.seconds - # max compensation is 25% so try to consume 250 more - check bucket.tryConsume(250, reqTime) == true - reqTime += 49.seconds - # out of budget within the same period - check bucket.tryConsume(1, reqTime) == false - - test "TokenBucket Short replenish": - var bucket = TokenBucket.new(15000, 1.milliseconds) - let start = Moment.now() - check bucket.tryConsume(15000, start) - check bucket.tryConsume(1, start) == false - - check bucket.tryConsume(15000, start + 1.milliseconds) == true diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 0646c444f..85af4db76 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 0646c444fce7c7ed08ef6f2c9a7abfd172ffe655 +Subproject commit 85af4db764ecd3573c4704139560df3943216cf1 diff --git a/vendor/nim-jwt b/vendor/nim-jwt new file mode 160000 index 000000000..18f8378de --- /dev/null +++ b/vendor/nim-jwt @@ -0,0 +1 @@ +Subproject commit 18f8378de52b241f321c1f9ea905456e89b95c6f diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index e82080f7b..eb7e6ff89 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit e82080f7b1aa61c6d35fa5311b873f41eff4bb52 +Subproject commit eb7e6ff89889e41b57515f891ba82986c54809fb diff --git a/vendor/nim-lsquic b/vendor/nim-lsquic new file mode 160000 index 000000000..f3fe33462 --- /dev/null +++ b/vendor/nim-lsquic @@ -0,0 +1 @@ +Subproject commit f3fe33462601ea34eb2e8e9c357c92e61f8d121b diff --git a/waku.nimble b/waku.nimble index 09ff48969..bebd3cc5e 100644 --- a/waku.nimble +++ b/waku.nimble @@ -30,7 +30,9 @@ requires "nim >= 2.2.4", "regex", "results", "db_connector", - "minilru" + "minilru", + "lsquic", + "jwt" ### Helper functions proc buildModule(filePath, params = "", lang = "c"): bool = diff --git a/waku/common/rate_limit/per_peer_limiter.nim b/waku/common/rate_limit/per_peer_limiter.nim index 5cb96a2d1..16b6bf065 100644 --- a/waku/common/rate_limit/per_peer_limiter.nim +++ b/waku/common/rate_limit/per_peer_limiter.nim @@ -20,7 +20,7 @@ proc mgetOrPut( perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId ): var Option[TokenBucket] = return perPeerRateLimiter.peerBucket.mgetOrPut( - peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Compensating) + peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Continuous) ) template checkUsageLimit*( diff --git a/waku/common/rate_limit/single_token_limiter.nim b/waku/common/rate_limit/single_token_limiter.nim index 50fb2d64c..f976c5367 100644 --- a/waku/common/rate_limit/single_token_limiter.nim +++ b/waku/common/rate_limit/single_token_limiter.nim @@ -6,12 +6,14 @@ import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility import std/times except TimeInterval, Duration -import ./[token_bucket, setting, service_metrics] +import chronos/ratelimit as token_bucket + +import ./[setting, service_metrics] export token_bucket, setting, service_metrics proc newTokenBucket*( setting: Option[RateLimitSetting], - replenishMode: ReplenishMode = ReplenishMode.Compensating, + replenishMode: ReplenishMode = ReplenishMode.Continuous, ): Option[TokenBucket] = if setting.isNone(): return none[TokenBucket]() @@ -19,7 +21,13 @@ proc newTokenBucket*( if setting.get().isUnlimited(): return none[TokenBucket]() - return some(TokenBucket.new(setting.get().volume, setting.get().period)) + return some( + TokenBucket.new( + capacity = setting.get().volume, + fillDuration = setting.get().period, + mode = Continuous, + ) + ) proc checkUsage( t: var TokenBucket, proto: string, now = Moment.now() diff --git a/waku/common/rate_limit/token_bucket.nim b/waku/common/rate_limit/token_bucket.nim deleted file mode 100644 index 799817ebd..000000000 --- a/waku/common/rate_limit/token_bucket.nim +++ /dev/null @@ -1,182 +0,0 @@ -{.push raises: [].} - -import chronos, std/math, std/options - -const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25 - -## This is an extract from chronos/rate_limit.nim due to the found bug in the original implementation. -## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class. -## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation. -## ref: https://github.com/status-im/nim-chronos/issues/500 -## -## This version of TokenBucket is different from the original one in chronos/rate_limit.nim in many ways: -## - It has a new mode called `Compensating` which is the default mode. -## Compensation is calculated as the not used bucket capacity in the last measured period(s) in average. -## or up until maximum the allowed compansation treshold (Currently it is const 25%). -## Also compensation takes care of the proper time period calculation to avoid non-usage periods that can lead to -## overcompensation. -## - Strict mode is also available which will only replenish when time period is over but also will fill -## the bucket to the max capacity. - -type - ReplenishMode* = enum - Strict - Compensating - - TokenBucket* = ref object - budget: int ## Current number of tokens in the bucket - budgetCap: int ## Bucket capacity - lastTimeFull: Moment - ## This timer measures the proper periodizaiton of the bucket refilling - fillDuration: Duration ## Refill period - case replenishMode*: ReplenishMode - of Strict: - ## In strict mode, the bucket is refilled only till the budgetCap - discard - of Compensating: - ## This is the default mode. - maxCompensation: float - -func periodDistance(bucket: TokenBucket, currentTime: Moment): float = - ## notice fillDuration cannot be zero by design - ## period distance is a float number representing the calculated period time - ## since the last time bucket was refilled. - return - nanoseconds(currentTime - bucket.lastTimeFull).float / - nanoseconds(bucket.fillDuration).float - -func getUsageAverageSince(bucket: TokenBucket, distance: float): float = - if distance == 0.float: - ## in case there is zero time difference than the usage percentage is 100% - return 1.0 - - ## budgetCap can never be zero - ## usage average is calculated as a percentage of total capacity available over - ## the measured period - return bucket.budget.float / bucket.budgetCap.float / distance - -proc calcCompensation(bucket: TokenBucket, averageUsage: float): int = - # if we already fully used or even overused the tokens, there is no place for compensation - if averageUsage >= 1.0: - return 0 - - ## compensation is the not used bucket capacity in the last measured period(s) in average. - ## or maximum the allowed compansation treshold - let compensationPercent = - min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation) - return trunc(compensationPercent).int - -func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool = - return currentTime - bucket.lastTimeFull >= bucket.fillDuration - -## Update will take place if bucket is empty and trying to consume tokens. -## It checks if the bucket can be replenished as refill duration is passed or not. -## - strict mode: -proc updateStrict(bucket: TokenBucket, currentTime: Moment) = - if bucket.fillDuration == default(Duration): - bucket.budget = min(bucket.budgetCap, bucket.budget) - return - - if not periodElapsed(bucket, currentTime): - return - - bucket.budget = bucket.budgetCap - bucket.lastTimeFull = currentTime - -## - compensating - ballancing load: -## - between updates we calculate average load (current bucket capacity / number of periods till last update) -## - gives the percentage load used recently -## - with this we can replenish bucket up to 100% + calculated leftover from previous period (caped with max treshold) -proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) = - if bucket.fillDuration == default(Duration): - bucket.budget = min(bucket.budgetCap, bucket.budget) - return - - # do not replenish within the same period - if not periodElapsed(bucket, currentTime): - return - - let distance = bucket.periodDistance(currentTime) - let recentAvgUsage = bucket.getUsageAverageSince(distance) - let compensation = bucket.calcCompensation(recentAvgUsage) - - bucket.budget = bucket.budgetCap + compensation - bucket.lastTimeFull = currentTime - -proc update(bucket: TokenBucket, currentTime: Moment) = - if bucket.replenishMode == ReplenishMode.Compensating: - updateWithCompensation(bucket, currentTime) - else: - updateStrict(bucket, currentTime) - -proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool = - ## If `tokens` are available, consume them, - ## Otherwhise, return false. - - if bucket.budget >= bucket.budgetCap: - bucket.lastTimeFull = now - - if bucket.budget >= tokens: - bucket.budget -= tokens - return true - - bucket.update(now) - - if bucket.budget >= tokens: - bucket.budget -= tokens - return true - else: - return false - -proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) = - ## Add `tokens` to the budget (capped to the bucket capacity) - bucket.budget += tokens - bucket.update(now) - -proc new*( - T: type[TokenBucket], - budgetCap: int, - fillDuration: Duration = 1.seconds, - mode: ReplenishMode = ReplenishMode.Compensating, -): T = - assert not isZero(fillDuration) - assert budgetCap != 0 - - ## Create different mode TokenBucket - case mode - of ReplenishMode.Strict: - return T( - budget: budgetCap, - budgetCap: budgetCap, - fillDuration: fillDuration, - lastTimeFull: Moment.now(), - replenishMode: mode, - ) - of ReplenishMode.Compensating: - T( - budget: budgetCap, - budgetCap: budgetCap, - fillDuration: fillDuration, - lastTimeFull: Moment.now(), - replenishMode: mode, - maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT, - ) - -proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket = - T.new(capacity, period, ReplenishMode.Strict) - -proc newCompensating*( - T: type[TokenBucket], capacity: int, period: Duration -): TokenBucket = - T.new(capacity, period, ReplenishMode.Compensating) - -func `$`*(b: TokenBucket): string {.inline.} = - if isNil(b): - return "nil" - return $b.budgetCap & "/" & $b.fillDuration - -func `$`*(ob: Option[TokenBucket]): string {.inline.} = - if ob.isNone(): - return "no-limit" - - return $ob.get() diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim index 772cfbffd..f379f92bb 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -209,6 +209,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = maxServicePeers = some(builder.maxServicePeers), colocationLimit = builder.colocationLimit, shardedPeerManagement = builder.shardAware, + maxConnections = builder.switchMaxConnections.get(builders.MaxConnections), ) var node: WakuNode diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index bed8a9137..d55206f97 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -13,8 +13,6 @@ import libp2p/services/autorelayservice, libp2p/services/hpservice, libp2p/peerid, - libp2p/discovery/discoverymngr, - libp2p/discovery/rendezvousinterface, eth/keys, eth/p2p/discoveryv5/enr, presto, @@ -63,7 +61,6 @@ type Waku* = ref object dynamicBootstrapNodes*: seq[RemotePeerInfo] dnsRetryLoopHandle: Future[void] networkConnLoopHandle: Future[void] - discoveryMngr: DiscoveryManager node*: WakuNode diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 1abcc1ac0..487d3894d 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -103,6 +103,7 @@ type PeerManager* = ref object of RootObj onConnectionChange*: ConnectionChangeHandler online: bool ## state managed by online_monitor module getShards: GetShards + maxConnections: int #~~~~~~~~~~~~~~~~~~~# # Helper Functions # @@ -748,7 +749,6 @@ proc logAndMetrics(pm: PeerManager) {.async.} = var peerStore = pm.switch.peerStore # log metrics let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) - let maxConnections = pm.switch.connManager.inSema.size let notConnectedPeers = peerStore.getDisconnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) @@ -758,7 +758,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} = info "Relay peer connections", inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget, outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget, - totalConnections = $totalConnections & "/" & $maxConnections, + totalConnections = $totalConnections & "/" & $pm.maxConnections, notConnectedPeers = notConnectedPeers.len, outsideBackoffPeers = outsideBackoffPeers.len @@ -1048,9 +1048,9 @@ proc new*( maxFailedAttempts = MaxFailedAttempts, colocationLimit = DefaultColocationLimit, shardedPeerManagement = false, + maxConnections: int = MaxConnections, ): PeerManager {.gcsafe.} = let capacity = switch.peerStore.capacity - let maxConnections = switch.connManager.inSema.size if maxConnections > capacity: error "Max number of connections can't be greater than PeerManager capacity", capacity = capacity, maxConnections = maxConnections @@ -1099,6 +1099,7 @@ proc new*( colocationLimit: colocationLimit, shardedPeerManagement: shardedPeerManagement, online: true, + maxConnections: maxConnections, ) proc peerHook( diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index 7b97375ff..00b5f1a5c 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -8,7 +8,6 @@ import stew/byteutils, libp2p/protocols/rendezvous, libp2p/protocols/rendezvous/protobuf, - libp2p/discovery/discoverymngr, libp2p/utils/semaphore, libp2p/utils/offsettedseq, libp2p/crypto/curve25519, From 8eae7140720b4220b7800348cc6b913d5e07f6ce Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 19 Dec 2025 23:22:21 +0100 Subject: [PATCH 2/5] Fix libp2p/utility reports unlisted exception can occure from close of socket in waitForService - -d:ssl compile flag caused it --- waku.nimble | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/waku.nimble b/waku.nimble index bebd3cc5e..789a89c8a 100644 --- a/waku.nimble +++ b/waku.nimble @@ -149,7 +149,8 @@ task chat2, "Build example Waku chat usage": let name = "chat2" buildBinary name, "apps/chat2/", - "-d:chronicles_sinks=textlines[file] -d:ssl -d:chronicles_log_level='TRACE' " + "-d:chronicles_sinks=textlines[file] -d:chronicles_log_level='TRACE' " + # -d:ssl - cause unlisted exception error in libp2p/utility... task chat2mix, "Build example Waku chat mix usage": # NOTE For debugging, set debug level. For chat usage we want minimal log @@ -159,7 +160,8 @@ task chat2mix, "Build example Waku chat mix usage": let name = "chat2mix" buildBinary name, "apps/chat2mix/", - "-d:chronicles_sinks=textlines[file] -d:ssl -d:chronicles_log_level='TRACE' " + "-d:chronicles_sinks=textlines[file] -d:chronicles_log_level='TRACE' " + # -d:ssl - cause unlisted exception error in libp2p/utility... task chat2bridge, "Build chat2bridge": let name = "chat2bridge" From f2deb490c50fa0f5ae33eec50e36ed547a61edf3 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Sat, 20 Dec 2025 00:20:40 +0100 Subject: [PATCH 3/5] Adapt request_limiter to new chronos' TokenBucket replenish algorithm to keep original intent of use --- waku/common/rate_limit/request_limiter.nim | 78 ++++++++++++++++--- .../rate_limit/single_token_limiter.nim | 6 +- 2 files changed, 70 insertions(+), 14 deletions(-) diff --git a/waku/common/rate_limit/request_limiter.nim b/waku/common/rate_limit/request_limiter.nim index 0ede20be4..bc318e151 100644 --- a/waku/common/rate_limit/request_limiter.nim +++ b/waku/common/rate_limit/request_limiter.nim @@ -39,38 +39,82 @@ const SECONDS_RATIO = 3 const MINUTES_RATIO = 2 type RequestRateLimiter* = ref object of RootObj - tokenBucket: Option[TokenBucket] + tokenBucket: TokenBucket setting*: Option[RateLimitSetting] + mainBucketSetting: RateLimitSetting + ratio: int peerBucketSetting*: RateLimitSetting peerUsage: TimedMap[PeerId, TokenBucket] + checkUsageImpl: proc( + t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment + ): bool {.gcsafe, raises: [].} + +proc newMainTokenBucket( + setting: RateLimitSetting, ratio: int, startTime: Moment +): TokenBucket = + ## RequestRateLimiter's global bucket should keep the *rate* of the configured + ## setting while allowing a larger burst window. We achieve this by scaling + ## both capacity and fillDuration by the same ratio. + ## + ## This matches previous behavior where unused tokens could effectively + ## accumulate across multiple periods. + let burstCapacity = setting.volume * ratio + var bucket = TokenBucket.new( + capacity = burstCapacity, + fillDuration = setting.period * ratio, + startTime = startTime, + mode = Continuous, + ) + + # Start with the configured volume (not the burst capacity) so that the + # initial burst behavior matches the raw setting, while still allowing + # accumulation up to `burstCapacity` over time. + let excess = burstCapacity - setting.volume + if excess > 0: + discard bucket.tryConsume(excess, startTime) + + return bucket proc mgetOrPut( - requestRateLimiter: var RequestRateLimiter, peerId: PeerId + requestRateLimiter: var RequestRateLimiter, peerId: PeerId, now: Moment ): var TokenBucket = - let bucketForNew = newTokenBucket(some(requestRateLimiter.peerBucketSetting)).valueOr: + let bucketForNew = newTokenBucket( + some(requestRateLimiter.peerBucketSetting), Discrete, now + ).valueOr: raiseAssert "This branch is not allowed to be reached as it will not be called if the setting is None." return requestRateLimiter.peerUsage.mgetOrPut(peerId, bucketForNew) -proc checkUsage*( - t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now() -): bool {.raises: [].} = - if t.tokenBucket.isNone(): - return true +proc checkUsageUnlimited( + t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment +): bool {.gcsafe, raises: [].} = + true - let peerBucket = t.mgetOrPut(conn.peerId) +proc checkUsageLimited( + t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment +): bool {.gcsafe, raises: [].} = + # Lazy-init the main bucket using the first observed request time. This makes + # refill behavior deterministic under tests where `now` is controlled. + if isNil(t.tokenBucket): + t.tokenBucket = newMainTokenBucket(t.mainBucketSetting, t.ratio, now) + + let peerBucket = t.mgetOrPut(conn.peerId, now) ## check requesting peer's usage is not over the calculated ratio and let that peer go which not requested much/or this time... if not peerBucket.tryConsume(1, now): trace "peer usage limit reached", peer = conn.peerId return false # Ok if the peer can consume, check the overall budget we have left - let tokenBucket = t.tokenBucket.get() - if not tokenBucket.tryConsume(1, now): + if not t.tokenBucket.tryConsume(1, now): return false return true +proc checkUsage*( + t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now() +): bool {.raises: [].} = + t.checkUsageImpl(t, proto, conn, now) + template checkUsageLimit*( t: var RequestRateLimiter, proto: string, @@ -135,9 +179,19 @@ func calcPeerTokenSetting( proc newRequestRateLimiter*(setting: Option[RateLimitSetting]): RequestRateLimiter = let ratio = calcPeriodRatio(setting) + let isLimited = setting.isSome() and not setting.get().isUnlimited() + let mainBucketSetting = + if isLimited: + setting.get() + else: + (0, 0.minutes) + return RequestRateLimiter( - tokenBucket: newTokenBucket(setting), + tokenBucket: nil, setting: setting, + mainBucketSetting: mainBucketSetting, + ratio: ratio, peerBucketSetting: calcPeerTokenSetting(setting, ratio), peerUsage: init(TimedMap[PeerId, TokenBucket], calcCacheTimeout(setting, ratio)), + checkUsageImpl: (if isLimited: checkUsageLimited else: checkUsageUnlimited), ) diff --git a/waku/common/rate_limit/single_token_limiter.nim b/waku/common/rate_limit/single_token_limiter.nim index f976c5367..fc4b0acd5 100644 --- a/waku/common/rate_limit/single_token_limiter.nim +++ b/waku/common/rate_limit/single_token_limiter.nim @@ -13,7 +13,8 @@ export token_bucket, setting, service_metrics proc newTokenBucket*( setting: Option[RateLimitSetting], - replenishMode: ReplenishMode = ReplenishMode.Continuous, + replenishMode: static[ReplenishMode] = ReplenishMode.Continuous, + startTime: Moment = Moment.now(), ): Option[TokenBucket] = if setting.isNone(): return none[TokenBucket]() @@ -25,7 +26,8 @@ proc newTokenBucket*( TokenBucket.new( capacity = setting.get().volume, fillDuration = setting.get().period, - mode = Continuous, + startTime = startTime, + mode = replenishMode, ) ) From bfcd177fce74fd5f7cf649d7a57ddb11d61821c8 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Sat, 20 Dec 2025 21:11:40 +0100 Subject: [PATCH 4/5] Fix filter dos protection test --- .../test_waku_filter_dos_protection.nim | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/tests/waku_filter_v2/test_waku_filter_dos_protection.nim b/tests/waku_filter_v2/test_waku_filter_dos_protection.nim index 7c8c640ba..70055471b 100644 --- a/tests/waku_filter_v2/test_waku_filter_dos_protection.nim +++ b/tests/waku_filter_v2/test_waku_filter_dos_protection.nim @@ -147,29 +147,43 @@ suite "Waku Filter - DOS protection": asyncTest "Ensure normal usage allowed": # Given + # Rate limit setting is (3 requests / 1000ms) per peer. + # In a token-bucket model this means: + # - capacity = 3 tokens + # - refill rate = 3 tokens / second => ~1 token every ~333ms + # - each request consumes 1 token (including UNSUBSCRIBE) check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) == none(FilterSubscribeErrorKind) check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId) - await sleepAsync(500.milliseconds) - check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind) - check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId) + # Expected remaining tokens (approx): 2 await sleepAsync(500.milliseconds) check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind) check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId) + # After ~500ms, ~1 token refilled; PING consumes 1 => expected remaining: 2 + + await sleepAsync(500.milliseconds) + check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind) + check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId) + + # After another ~500ms, ~1 token refilled; PING consumes 1 => expected remaining: 2 + await sleepAsync(50.milliseconds) check client1.unsubscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) == none(FilterSubscribeErrorKind) check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId) == false + # ~50ms is not enough to refill a token at 3/sec; UNSUBSCRIBE consumes 1 => expected remaining: 1 + await sleepAsync(50.milliseconds) check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND) - check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND) - await sleepAsync(50.milliseconds) + # PING consumes the last token => expected remaining: 0 + check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS) + # Immediate second PING has no token available => expected remaining: 0 check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) == none(FilterSubscribeErrorKind) From fc1ad8abc25ca9b9c8b6ca9462918e5e60653c88 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Mon, 22 Dec 2025 02:16:42 +0100 Subject: [PATCH 5/5] Fix peer manager tests due change caused by new libp2p --- tests/test_peer_manager.nim | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index 1369f3f88..97df39582 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -997,6 +997,7 @@ procSuite "Peer Manager": .build(), maxFailedAttempts = 1, storage = nil, + maxConnections = 20, ) # Create 30 peers and add them to the peerstore @@ -1063,6 +1064,7 @@ procSuite "Peer Manager": backoffFactor = 2, maxFailedAttempts = 10, storage = nil, + maxConnections = 20, ) var p1: PeerId require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1") @@ -1116,6 +1118,7 @@ procSuite "Peer Manager": .build(), maxFailedAttempts = 150, storage = nil, + maxConnections = 20, ) # Should result in backoff > 1 week @@ -1131,6 +1134,7 @@ procSuite "Peer Manager": .build(), maxFailedAttempts = 10, storage = nil, + maxConnections = 20, ) let pm = PeerManager.new( @@ -1144,6 +1148,7 @@ procSuite "Peer Manager": .build(), maxFailedAttempts = 5, storage = nil, + maxConnections = 20, ) asyncTest "colocationLimit is enforced by pruneConnsByIp()":