From c1d657a482f26029376b411cbe44be0e21138957 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 16 Dec 2025 02:34:10 +0100 Subject: [PATCH] Adapt using chronos' TokenBucket. Removed TokenBucket and test. bump nim-chronos -> nim-libp2p/nim-lsquic/nim-jwt -> adapt to latest libp2p changes --- .gitmodules | 6 + tests/common/test_all.nim | 1 - tests/common/test_tokenbucket.nim | 69 ------- vendor/nim-chronos | 2 +- vendor/nim-jwt | 1 + vendor/nim-libp2p | 2 +- vendor/nim-lsquic | 1 + waku.nimble | 4 +- waku/common/rate_limit/per_peer_limiter.nim | 2 +- .../rate_limit/single_token_limiter.nim | 14 +- waku/common/rate_limit/token_bucket.nim | 182 ------------------ waku/factory/builder.nim | 1 + waku/factory/waku.nim | 3 - waku/node/peer_manager/peer_manager.nim | 7 +- waku/waku_rendezvous/protocol.nim | 1 - 15 files changed, 30 insertions(+), 266 deletions(-) delete mode 100644 tests/common/test_tokenbucket.nim create mode 160000 vendor/nim-jwt create mode 160000 vendor/nim-lsquic delete mode 100644 waku/common/rate_limit/token_bucket.nim diff --git a/.gitmodules b/.gitmodules index 93a3a006f..f5249f089 100644 --- a/.gitmodules +++ b/.gitmodules @@ -184,3 +184,9 @@ url = https://github.com/logos-messaging/waku-rlnv2-contract.git ignore = untracked branch = master +[submodule "vendor/nim-lsquic"] + path = vendor/nim-lsquic + url = https://github.com/vacp2p/nim-lsquic +[submodule "vendor/nim-jwt"] + path = vendor/nim-jwt + url = https://github.com/vacp2p/nim-jwt.git diff --git a/tests/common/test_all.nim b/tests/common/test_all.nim index 7495c7c9e..d597a7424 100644 --- a/tests/common/test_all.nim +++ b/tests/common/test_all.nim @@ -6,7 +6,6 @@ import ./test_protobuf_validation, ./test_sqlite_migrations, ./test_parse_size, - ./test_tokenbucket, ./test_requestratelimiter, ./test_ratelimit_setting, ./test_timed_map, diff --git a/tests/common/test_tokenbucket.nim b/tests/common/test_tokenbucket.nim deleted file mode 100644 index 5bc1a0583..000000000 --- a/tests/common/test_tokenbucket.nim +++ /dev/null @@ -1,69 +0,0 @@ -# Chronos Test Suite -# (c) Copyright 2022-Present -# Status Research & Development GmbH -# -# Licensed under either of -# Apache License, version 2.0, (LICENSE-APACHEv2) -# MIT license (LICENSE-MIT) - -{.used.} - -import testutils/unittests -import chronos -import ../../waku/common/rate_limit/token_bucket - -suite "Token Bucket": - test "TokenBucket Sync test - strict": - var bucket = TokenBucket.newStrict(1000, 1.milliseconds) - let - start = Moment.now() - fullTime = start + 1.milliseconds - check: - bucket.tryConsume(800, start) == true - bucket.tryConsume(200, start) == true - # Out of budget - bucket.tryConsume(100, start) == false - bucket.tryConsume(800, fullTime) == true - bucket.tryConsume(200, fullTime) == true - # Out of budget - bucket.tryConsume(100, fullTime) == false - - test "TokenBucket Sync test - compensating": - var bucket = TokenBucket.new(1000, 1.milliseconds) - let - start = Moment.now() - fullTime = start + 1.milliseconds - check: - bucket.tryConsume(800, start) == true - bucket.tryConsume(200, start) == true - # Out of budget - bucket.tryConsume(100, start) == false - bucket.tryConsume(800, fullTime) == true - bucket.tryConsume(200, fullTime) == true - # Due not using the bucket for a full period the compensation will satisfy this request - bucket.tryConsume(100, fullTime) == true - - test "TokenBucket Max compensation": - var bucket = TokenBucket.new(1000, 1.minutes) - var reqTime = Moment.now() - - check bucket.tryConsume(1000, reqTime) - check bucket.tryConsume(1, reqTime) == false - reqTime += 1.minutes - check bucket.tryConsume(500, reqTime) == true - reqTime += 1.minutes - check bucket.tryConsume(1000, reqTime) == true - reqTime += 10.seconds - # max compensation is 25% so try to consume 250 more - check bucket.tryConsume(250, reqTime) == true - reqTime += 49.seconds - # out of budget within the same period - check bucket.tryConsume(1, reqTime) == false - - test "TokenBucket Short replenish": - var bucket = TokenBucket.new(15000, 1.milliseconds) - let start = Moment.now() - check bucket.tryConsume(15000, start) - check bucket.tryConsume(1, start) == false - - check bucket.tryConsume(15000, start + 1.milliseconds) == true diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 0646c444f..85af4db76 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 0646c444fce7c7ed08ef6f2c9a7abfd172ffe655 +Subproject commit 85af4db764ecd3573c4704139560df3943216cf1 diff --git a/vendor/nim-jwt b/vendor/nim-jwt new file mode 160000 index 000000000..18f8378de --- /dev/null +++ b/vendor/nim-jwt @@ -0,0 +1 @@ +Subproject commit 18f8378de52b241f321c1f9ea905456e89b95c6f diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index e82080f7b..eb7e6ff89 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit e82080f7b1aa61c6d35fa5311b873f41eff4bb52 +Subproject commit eb7e6ff89889e41b57515f891ba82986c54809fb diff --git a/vendor/nim-lsquic b/vendor/nim-lsquic new file mode 160000 index 000000000..f3fe33462 --- /dev/null +++ b/vendor/nim-lsquic @@ -0,0 +1 @@ +Subproject commit f3fe33462601ea34eb2e8e9c357c92e61f8d121b diff --git a/waku.nimble b/waku.nimble index 09ff48969..bebd3cc5e 100644 --- a/waku.nimble +++ b/waku.nimble @@ -30,7 +30,9 @@ requires "nim >= 2.2.4", "regex", "results", "db_connector", - "minilru" + "minilru", + "lsquic", + "jwt" ### Helper functions proc buildModule(filePath, params = "", lang = "c"): bool = diff --git a/waku/common/rate_limit/per_peer_limiter.nim b/waku/common/rate_limit/per_peer_limiter.nim index 5cb96a2d1..16b6bf065 100644 --- a/waku/common/rate_limit/per_peer_limiter.nim +++ b/waku/common/rate_limit/per_peer_limiter.nim @@ -20,7 +20,7 @@ proc mgetOrPut( perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId ): var Option[TokenBucket] = return perPeerRateLimiter.peerBucket.mgetOrPut( - peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Compensating) + peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Continuous) ) template checkUsageLimit*( diff --git a/waku/common/rate_limit/single_token_limiter.nim b/waku/common/rate_limit/single_token_limiter.nim index 50fb2d64c..f976c5367 100644 --- a/waku/common/rate_limit/single_token_limiter.nim +++ b/waku/common/rate_limit/single_token_limiter.nim @@ -6,12 +6,14 @@ import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility import std/times except TimeInterval, Duration -import ./[token_bucket, setting, service_metrics] +import chronos/ratelimit as token_bucket + +import ./[setting, service_metrics] export token_bucket, setting, service_metrics proc newTokenBucket*( setting: Option[RateLimitSetting], - replenishMode: ReplenishMode = ReplenishMode.Compensating, + replenishMode: ReplenishMode = ReplenishMode.Continuous, ): Option[TokenBucket] = if setting.isNone(): return none[TokenBucket]() @@ -19,7 +21,13 @@ proc newTokenBucket*( if setting.get().isUnlimited(): return none[TokenBucket]() - return some(TokenBucket.new(setting.get().volume, setting.get().period)) + return some( + TokenBucket.new( + capacity = setting.get().volume, + fillDuration = setting.get().period, + mode = Continuous, + ) + ) proc checkUsage( t: var TokenBucket, proto: string, now = Moment.now() diff --git a/waku/common/rate_limit/token_bucket.nim b/waku/common/rate_limit/token_bucket.nim deleted file mode 100644 index 799817ebd..000000000 --- a/waku/common/rate_limit/token_bucket.nim +++ /dev/null @@ -1,182 +0,0 @@ -{.push raises: [].} - -import chronos, std/math, std/options - -const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25 - -## This is an extract from chronos/rate_limit.nim due to the found bug in the original implementation. -## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class. -## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation. -## ref: https://github.com/status-im/nim-chronos/issues/500 -## -## This version of TokenBucket is different from the original one in chronos/rate_limit.nim in many ways: -## - It has a new mode called `Compensating` which is the default mode. -## Compensation is calculated as the not used bucket capacity in the last measured period(s) in average. -## or up until maximum the allowed compansation treshold (Currently it is const 25%). -## Also compensation takes care of the proper time period calculation to avoid non-usage periods that can lead to -## overcompensation. -## - Strict mode is also available which will only replenish when time period is over but also will fill -## the bucket to the max capacity. - -type - ReplenishMode* = enum - Strict - Compensating - - TokenBucket* = ref object - budget: int ## Current number of tokens in the bucket - budgetCap: int ## Bucket capacity - lastTimeFull: Moment - ## This timer measures the proper periodizaiton of the bucket refilling - fillDuration: Duration ## Refill period - case replenishMode*: ReplenishMode - of Strict: - ## In strict mode, the bucket is refilled only till the budgetCap - discard - of Compensating: - ## This is the default mode. - maxCompensation: float - -func periodDistance(bucket: TokenBucket, currentTime: Moment): float = - ## notice fillDuration cannot be zero by design - ## period distance is a float number representing the calculated period time - ## since the last time bucket was refilled. - return - nanoseconds(currentTime - bucket.lastTimeFull).float / - nanoseconds(bucket.fillDuration).float - -func getUsageAverageSince(bucket: TokenBucket, distance: float): float = - if distance == 0.float: - ## in case there is zero time difference than the usage percentage is 100% - return 1.0 - - ## budgetCap can never be zero - ## usage average is calculated as a percentage of total capacity available over - ## the measured period - return bucket.budget.float / bucket.budgetCap.float / distance - -proc calcCompensation(bucket: TokenBucket, averageUsage: float): int = - # if we already fully used or even overused the tokens, there is no place for compensation - if averageUsage >= 1.0: - return 0 - - ## compensation is the not used bucket capacity in the last measured period(s) in average. - ## or maximum the allowed compansation treshold - let compensationPercent = - min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation) - return trunc(compensationPercent).int - -func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool = - return currentTime - bucket.lastTimeFull >= bucket.fillDuration - -## Update will take place if bucket is empty and trying to consume tokens. -## It checks if the bucket can be replenished as refill duration is passed or not. -## - strict mode: -proc updateStrict(bucket: TokenBucket, currentTime: Moment) = - if bucket.fillDuration == default(Duration): - bucket.budget = min(bucket.budgetCap, bucket.budget) - return - - if not periodElapsed(bucket, currentTime): - return - - bucket.budget = bucket.budgetCap - bucket.lastTimeFull = currentTime - -## - compensating - ballancing load: -## - between updates we calculate average load (current bucket capacity / number of periods till last update) -## - gives the percentage load used recently -## - with this we can replenish bucket up to 100% + calculated leftover from previous period (caped with max treshold) -proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) = - if bucket.fillDuration == default(Duration): - bucket.budget = min(bucket.budgetCap, bucket.budget) - return - - # do not replenish within the same period - if not periodElapsed(bucket, currentTime): - return - - let distance = bucket.periodDistance(currentTime) - let recentAvgUsage = bucket.getUsageAverageSince(distance) - let compensation = bucket.calcCompensation(recentAvgUsage) - - bucket.budget = bucket.budgetCap + compensation - bucket.lastTimeFull = currentTime - -proc update(bucket: TokenBucket, currentTime: Moment) = - if bucket.replenishMode == ReplenishMode.Compensating: - updateWithCompensation(bucket, currentTime) - else: - updateStrict(bucket, currentTime) - -proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool = - ## If `tokens` are available, consume them, - ## Otherwhise, return false. - - if bucket.budget >= bucket.budgetCap: - bucket.lastTimeFull = now - - if bucket.budget >= tokens: - bucket.budget -= tokens - return true - - bucket.update(now) - - if bucket.budget >= tokens: - bucket.budget -= tokens - return true - else: - return false - -proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) = - ## Add `tokens` to the budget (capped to the bucket capacity) - bucket.budget += tokens - bucket.update(now) - -proc new*( - T: type[TokenBucket], - budgetCap: int, - fillDuration: Duration = 1.seconds, - mode: ReplenishMode = ReplenishMode.Compensating, -): T = - assert not isZero(fillDuration) - assert budgetCap != 0 - - ## Create different mode TokenBucket - case mode - of ReplenishMode.Strict: - return T( - budget: budgetCap, - budgetCap: budgetCap, - fillDuration: fillDuration, - lastTimeFull: Moment.now(), - replenishMode: mode, - ) - of ReplenishMode.Compensating: - T( - budget: budgetCap, - budgetCap: budgetCap, - fillDuration: fillDuration, - lastTimeFull: Moment.now(), - replenishMode: mode, - maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT, - ) - -proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket = - T.new(capacity, period, ReplenishMode.Strict) - -proc newCompensating*( - T: type[TokenBucket], capacity: int, period: Duration -): TokenBucket = - T.new(capacity, period, ReplenishMode.Compensating) - -func `$`*(b: TokenBucket): string {.inline.} = - if isNil(b): - return "nil" - return $b.budgetCap & "/" & $b.fillDuration - -func `$`*(ob: Option[TokenBucket]): string {.inline.} = - if ob.isNone(): - return "no-limit" - - return $ob.get() diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim index 772cfbffd..f379f92bb 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -209,6 +209,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = maxServicePeers = some(builder.maxServicePeers), colocationLimit = builder.colocationLimit, shardedPeerManagement = builder.shardAware, + maxConnections = builder.switchMaxConnections.get(builders.MaxConnections), ) var node: WakuNode diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index bed8a9137..d55206f97 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -13,8 +13,6 @@ import libp2p/services/autorelayservice, libp2p/services/hpservice, libp2p/peerid, - libp2p/discovery/discoverymngr, - libp2p/discovery/rendezvousinterface, eth/keys, eth/p2p/discoveryv5/enr, presto, @@ -63,7 +61,6 @@ type Waku* = ref object dynamicBootstrapNodes*: seq[RemotePeerInfo] dnsRetryLoopHandle: Future[void] networkConnLoopHandle: Future[void] - discoveryMngr: DiscoveryManager node*: WakuNode diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 1abcc1ac0..487d3894d 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -103,6 +103,7 @@ type PeerManager* = ref object of RootObj onConnectionChange*: ConnectionChangeHandler online: bool ## state managed by online_monitor module getShards: GetShards + maxConnections: int #~~~~~~~~~~~~~~~~~~~# # Helper Functions # @@ -748,7 +749,6 @@ proc logAndMetrics(pm: PeerManager) {.async.} = var peerStore = pm.switch.peerStore # log metrics let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) - let maxConnections = pm.switch.connManager.inSema.size let notConnectedPeers = peerStore.getDisconnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs)) let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId)) @@ -758,7 +758,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} = info "Relay peer connections", inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget, outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget, - totalConnections = $totalConnections & "/" & $maxConnections, + totalConnections = $totalConnections & "/" & $pm.maxConnections, notConnectedPeers = notConnectedPeers.len, outsideBackoffPeers = outsideBackoffPeers.len @@ -1048,9 +1048,9 @@ proc new*( maxFailedAttempts = MaxFailedAttempts, colocationLimit = DefaultColocationLimit, shardedPeerManagement = false, + maxConnections: int = MaxConnections, ): PeerManager {.gcsafe.} = let capacity = switch.peerStore.capacity - let maxConnections = switch.connManager.inSema.size if maxConnections > capacity: error "Max number of connections can't be greater than PeerManager capacity", capacity = capacity, maxConnections = maxConnections @@ -1099,6 +1099,7 @@ proc new*( colocationLimit: colocationLimit, shardedPeerManagement: shardedPeerManagement, online: true, + maxConnections: maxConnections, ) proc peerHook( diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index 7b97375ff..00b5f1a5c 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -8,7 +8,6 @@ import stew/byteutils, libp2p/protocols/rendezvous, libp2p/protocols/rendezvous/protobuf, - libp2p/discovery/discoverymngr, libp2p/utils/semaphore, libp2p/utils/offsettedseq, libp2p/crypto/curve25519,