Merge master and resolve conflicts - use master's vendor dependencies

darshankabariya 2026-01-08 03:22:54 +05:30
commit 526cd37535
No known key found for this signature in database
GPG Key ID: 9A92CCD9899F0D22
19 changed files with 265 additions and 410 deletions

.gitmodules vendored

@@ -12,4 +12,14 @@
url = https://github.com/logos-messaging/waku-rlnv2-contract.git
ignore = untracked
branch = master
[submodule "vendor/nim-lsquic"]
path = vendor/nim-lsquic
url = https://github.com/vacp2p/nim-lsquic
[submodule "vendor/nim-jwt"]
path = vendor/nim-jwt
url = https://github.com/vacp2p/nim-jwt.git
[submodule "vendor/nim-ffi"]
path = vendor/nim-ffi
url = https://github.com/logos-messaging/nim-ffi/
ignore = untracked
branch = master


@@ -6,7 +6,6 @@ import
./test_protobuf_validation,
./test_sqlite_migrations,
./test_parse_size,
./test_tokenbucket,
./test_requestratelimiter,
./test_ratelimit_setting,
./test_timed_map,


@@ -1,69 +0,0 @@
# Chronos Test Suite
# (c) Copyright 2022-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
{.used.}
import testutils/unittests
import chronos
import ../../waku/common/rate_limit/token_bucket
suite "Token Bucket":
test "TokenBucket Sync test - strict":
var bucket = TokenBucket.newStrict(1000, 1.milliseconds)
let
start = Moment.now()
fullTime = start + 1.milliseconds
check:
bucket.tryConsume(800, start) == true
bucket.tryConsume(200, start) == true
# Out of budget
bucket.tryConsume(100, start) == false
bucket.tryConsume(800, fullTime) == true
bucket.tryConsume(200, fullTime) == true
# Out of budget
bucket.tryConsume(100, fullTime) == false
test "TokenBucket Sync test - compensating":
var bucket = TokenBucket.new(1000, 1.milliseconds)
let
start = Moment.now()
fullTime = start + 1.milliseconds
check:
bucket.tryConsume(800, start) == true
bucket.tryConsume(200, start) == true
# Out of budget
bucket.tryConsume(100, start) == false
bucket.tryConsume(800, fullTime) == true
bucket.tryConsume(200, fullTime) == true
# Due not using the bucket for a full period the compensation will satisfy this request
bucket.tryConsume(100, fullTime) == true
test "TokenBucket Max compensation":
var bucket = TokenBucket.new(1000, 1.minutes)
var reqTime = Moment.now()
check bucket.tryConsume(1000, reqTime)
check bucket.tryConsume(1, reqTime) == false
reqTime += 1.minutes
check bucket.tryConsume(500, reqTime) == true
reqTime += 1.minutes
check bucket.tryConsume(1000, reqTime) == true
reqTime += 10.seconds
# max compensation is 25% so try to consume 250 more
check bucket.tryConsume(250, reqTime) == true
reqTime += 49.seconds
# out of budget within the same period
check bucket.tryConsume(1, reqTime) == false
test "TokenBucket Short replenish":
var bucket = TokenBucket.new(15000, 1.milliseconds)
let start = Moment.now()
check bucket.tryConsume(15000, start)
check bucket.tryConsume(1, start) == false
check bucket.tryConsume(15000, start + 1.milliseconds) == true


@@ -997,6 +997,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 1,
storage = nil,
maxConnections = 20,
)
# Create 30 peers and add them to the peerstore
@@ -1063,6 +1064,7 @@ procSuite "Peer Manager":
backoffFactor = 2,
maxFailedAttempts = 10,
storage = nil,
maxConnections = 20,
)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
@@ -1116,6 +1118,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 150,
storage = nil,
maxConnections = 20,
)
# Should result in backoff > 1 week
@@ -1131,6 +1134,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 10,
storage = nil,
maxConnections = 20,
)
let pm = PeerManager.new(
@@ -1144,6 +1148,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 5,
storage = nil,
maxConnections = 20,
)
asyncTest "colocationLimit is enforced by pruneConnsByIp()":


@@ -122,24 +122,51 @@ suite "Waku Filter - DOS protection":
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
await sleepAsync(20.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
# Avoid using tiny sleeps to control refill behavior: CI scheduling can
# oversleep and mint additional tokens. Instead, issue a small burst of
# subscribe requests and require at least one TOO_MANY_REQUESTS.
var c1SubscribeFutures = newSeq[Future[FilterSubscribeResult]]()
for i in 0 ..< 6:
c1SubscribeFutures.add(
client1.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
)
let c1Finished = await allFinished(c1SubscribeFutures)
var c1GotTooMany = false
for fut in c1Finished:
check not fut.failed()
let res = fut.read()
if res.isErr() and res.error().kind == FilterSubscribeErrorKind.TOO_MANY_REQUESTS:
c1GotTooMany = true
break
check c1GotTooMany
# Ensure the other client is not affected by client1's rate limit.
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
await sleepAsync(20.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
await sleepAsync(20.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
var c2SubscribeFutures = newSeq[Future[FilterSubscribeResult]]()
for i in 0 ..< 6:
c2SubscribeFutures.add(
client2.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
)
let c2Finished = await allFinished(c2SubscribeFutures)
var c2GotTooMany = false
for fut in c2Finished:
check not fut.failed()
let res = fut.read()
if res.isErr() and res.error().kind == FilterSubscribeErrorKind.TOO_MANY_REQUESTS:
c2GotTooMany = true
break
check c2GotTooMany
# ensure enough time has passed so that the clients can use the service again
await sleepAsync(1000.milliseconds)
await sleepAsync(1100.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
@@ -147,29 +174,54 @@ suite "Waku Filter - DOS protection":
asyncTest "Ensure normal usage allowed":
# Given
# Rate limit setting is (3 requests / 1000ms) per peer.
# In a token-bucket model this means:
# - capacity = 3 tokens
# - refill rate = 3 tokens / second => ~1 token every ~333ms
# - each request consumes 1 token (including UNSUBSCRIBE)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
# Expected remaining tokens (approx): 2
await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
await sleepAsync(50.milliseconds)
# After ~500ms, ~1 token refilled; PING consumes 1 => expected remaining: 2
await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
# After another ~500ms, ~1 token refilled; PING consumes 1 => expected remaining: 2
check client1.unsubscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId) == false
await sleepAsync(50.milliseconds)
check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND)
check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND)
await sleepAsync(50.milliseconds)
check client1.ping(serverRemotePeerInfo) ==
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
# After unsubscribing, PING is expected to return NOT_FOUND while still
# counting towards the rate limit.
# CI can oversleep / schedule slowly, which can mint extra tokens between
# requests. To make the test robust, issue a small burst of pings and
# require at least one TOO_MANY_REQUESTS response.
var pingFutures = newSeq[Future[FilterSubscribeResult]]()
for i in 0 ..< 9:
pingFutures.add(client1.wakuFilterClient.ping(serverRemotePeerInfo))
let finished = await allFinished(pingFutures)
var gotTooMany = false
for fut in finished:
check not fut.failed()
let pingRes = fut.read()
if pingRes.isErr() and pingRes.error().kind == FilterSubscribeErrorKind.TOO_MANY_REQUESTS:
gotTooMany = true
break
check gotTooMany
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
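The comments above assume a Continuous token bucket: capacity 3, refilled over 1000 ms, so roughly one token is minted every ~333 ms. Below is a minimal illustrative sketch of that refill behaviour; it is not part of this commit and only assumes the TokenBucket API already used elsewhere in this diff.

import chronos, chronos/ratelimit

let start = Moment.now()
# assumed setting from the test comments: 3 requests per 1000 ms, Continuous mode
var bucket = TokenBucket.new(
  capacity = 3, fillDuration = 1000.milliseconds, startTime = start,
  mode = ReplenishMode.Continuous,
)
discard bucket.tryConsume(3, start) # bucket drained at t = 0
# at t = 100 ms only ~0.3 tokens have been minted, so this is expected to fail
echo bucket.tryConsume(1, start + 100.milliseconds)
# at t = 500 ms roughly one whole token should be available again
echo bucket.tryConsume(1, start + 500.milliseconds)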


@@ -80,11 +80,12 @@ suite "Rate limited push service":
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
# Don't rely on per-request timing assumptions or a single shared Future.
# CI can be slow enough that sequential requests accidentally refill tokens.
# Instead we issue a small burst and assert we observe at least one rejection.
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return lightpushSuccessResult(1)
let
@@ -93,45 +94,38 @@ suite "Rate limited push service":
client = newTestWakuLightpushClient(clientSwitch)
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let topic = DefaultPubsubTopic
let tokenPeriod = 500.millis
let successProc = proc(): Future[void] {.async.} =
# Fire a burst of requests; require at least one success and one rejection.
var publishFutures = newSeq[Future[WakuLightPushResult]]()
for i in 0 ..< 10:
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(some(DefaultPubsubTopic), message, serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
publishFutures.add(
client.publish(some(DefaultPubsubTopic), message, serverPeerId)
)
check:
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message
let finished = await allFinished(publishFutures)
var gotOk = false
var gotTooMany = false
for fut in finished:
check not fut.failed()
let res = fut.read()
if res.isOk():
gotOk = true
else:
check res.error.code == LightPushErrorCode.TOO_MANY_REQUESTS
check res.error.desc == some(TooManyRequestsMessage)
gotTooMany = true
let rejectProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(some(DefaultPubsubTopic), message, serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
check gotOk
check gotTooMany
check:
requestRes.isErr()
requestRes.error.code == LightPushErrorCode.TOO_MANY_REQUESTS
requestRes.error.desc == some(TooManyRequestsMessage)
for testCnt in 0 .. 2:
await successProc()
await sleepAsync(20.millis)
await rejectProc()
await sleepAsync(500.millis)
## next one shall succeed due to the rate limit time window has passed
await successProc()
# ensure enough time has passed so that the client can use the service again
await sleepAsync(tokenPeriod + 100.millis)
let recoveryRes = await client.publish(
some(DefaultPubsubTopic), fakeWakuMessage(), serverPeerId
)
check recoveryRes.isOk()
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())


@@ -86,58 +86,52 @@ suite "Rate limited push service":
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
let
tokenPeriod = 500.millis
server = await newTestWakuLegacyLightpushNode(
serverSwitch, handler, some((3, 500.millis))
serverSwitch, handler, some((3, tokenPeriod))
)
client = newTestWakuLegacyLightpushClient(clientSwitch)
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let topic = DefaultPubsubTopic
let successProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
# Avoid assuming the exact Nth request will be rejected. With Chronos TokenBucket
# minting semantics and real network latency, CI timing can allow refills.
# Instead, send a short burst and require that we observe at least one rejection.
let burstSize = 10
var publishFutures: seq[Future[WakuLightPushResult[string]]] = @[]
for _ in 0 ..< burstSize:
publishFutures.add(
client.publish(DefaultPubsubTopic, fakeWakuMessage(), peer = serverPeerId)
)
check:
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message
let finished = await allFinished(publishFutures)
var gotOk = false
var gotTooMany = false
for fut in finished:
check not fut.failed()
let res = fut.read()
if res.isOk():
gotOk = true
elif res.error == "TOO_MANY_REQUESTS":
gotTooMany = true
let rejectProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
check:
gotOk
gotTooMany
check:
requestRes.isErr()
requestRes.error == "TOO_MANY_REQUESTS"
for testCnt in 0 .. 2:
await successProc()
await sleepAsync(20.millis)
await rejectProc()
await sleepAsync(500.millis)
await sleepAsync(tokenPeriod + 100.millis)
## next one shall succeed due to the rate limit time window has passed
await successProc()
let afterCooldownRes =
await client.publish(DefaultPubsubTopic, fakeWakuMessage(), peer = serverPeerId)
check:
afterCooldownRes.isOk()
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())


@@ -413,7 +413,7 @@ procSuite "WakuNode - Store":
for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(20.millis)
waitFor sleepAsync(5.millis)
waitFor failsProc()

vendor/nim-jwt vendored Submodule

@@ -0,0 +1 @@
Subproject commit 18f8378de52b241f321c1f9ea905456e89b95c6f

vendor/nim-lsquic vendored Submodule

@@ -0,0 +1 @@
Subproject commit f3fe33462601ea34eb2e8e9c357c92e61f8d121b


@@ -11,45 +11,29 @@ license = "MIT or Apache License 2.0"
#bin = @["build/waku"]
### Dependencies
requires "nim >= 2.2.4"
# Pure Nim packages - use flexible version requirements
# Exact versions are locked via nimble.lock for reproducible builds
requires "chronicles"
requires "chronos"
requires "confutils"
requires "dnsdisc"
requires "faststreams"
requires "httputils"
requires "json_rpc"
requires "json_serialization"
requires "serialization"
requires "stew"
requires "stint"
requires "metrics"
requires "presto"
requires "taskpools"
requires "testutils"
requires "unittest2"
requires "websock"
requires "zlib"
requires "toml_serialization"
requires "minilru"
requires "regex"
requires "unicodedb"
requires "results"
requires "nimcrypto"
requires "db_connector"
requires "dnsclient"
requires "nph"
requires "ffi"
requires "eth"
requires "libp2p"
requires "web3"
requires "bearssl"
requires "secp256k1"
requires "nat_traversal"
requires "sqlite3_abi"
requires "nim >= 2.2.4",
"chronicles",
"confutils",
"chronos",
"dnsdisc",
"eth",
"json_rpc",
"libbacktrace",
"nimcrypto",
"serialization",
"stew",
"stint",
"metrics",
"libp2p >= 1.14.3",
"web3",
"presto",
"regex",
"results",
"db_connector",
"minilru",
"lsquic",
"jwt",
"ffi"
### Helper functions
proc buildModule(filePath, params = "", lang = "c"): bool =
@@ -166,7 +150,8 @@ task chat2, "Build example Waku chat usage":
let name = "chat2"
buildBinary name,
"apps/chat2/",
"-d:chronicles_sinks=textlines[file] -d:ssl -d:chronicles_log_level='TRACE' "
"-d:chronicles_sinks=textlines[file] -d:chronicles_log_level='TRACE' "
# -d:ssl - causes an unlisted exception error in libp2p/utility...
task chat2mix, "Build example Waku chat mix usage":
# NOTE For debugging, set debug level. For chat usage we want minimal log
@@ -176,7 +161,8 @@ task chat2mix, "Build example Waku chat mix usage":
let name = "chat2mix"
buildBinary name,
"apps/chat2mix/",
"-d:chronicles_sinks=textlines[file] -d:ssl -d:chronicles_log_level='TRACE' "
"-d:chronicles_sinks=textlines[file] -d:chronicles_log_level='TRACE' "
# -d:ssl - causes an unlisted exception error in libp2p/utility...
task chat2bridge, "Build chat2bridge":
let name = "chat2bridge"


@@ -20,7 +20,7 @@ proc mgetOrPut(
perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId
): var Option[TokenBucket] =
return perPeerRateLimiter.peerBucket.mgetOrPut(
peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Compensating)
peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Continuous)
)
template checkUsageLimit*(


@@ -39,38 +39,82 @@ const SECONDS_RATIO = 3
const MINUTES_RATIO = 2
type RequestRateLimiter* = ref object of RootObj
tokenBucket: Option[TokenBucket]
tokenBucket: TokenBucket
setting*: Option[RateLimitSetting]
mainBucketSetting: RateLimitSetting
ratio: int
peerBucketSetting*: RateLimitSetting
peerUsage: TimedMap[PeerId, TokenBucket]
checkUsageImpl: proc(
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
): bool {.gcsafe, raises: [].}
proc newMainTokenBucket(
setting: RateLimitSetting, ratio: int, startTime: Moment
): TokenBucket =
## RequestRateLimiter's global bucket should keep the *rate* of the configured
## setting while allowing a larger burst window. We achieve this by scaling
## both capacity and fillDuration by the same ratio.
##
## This matches previous behavior where unused tokens could effectively
## accumulate across multiple periods.
let burstCapacity = setting.volume * ratio
var bucket = TokenBucket.new(
capacity = burstCapacity,
fillDuration = setting.period * ratio,
startTime = startTime,
mode = Continuous,
)
# Start with the configured volume (not the burst capacity) so that the
# initial burst behavior matches the raw setting, while still allowing
# accumulation up to `burstCapacity` over time.
let excess = burstCapacity - setting.volume
if excess > 0:
discard bucket.tryConsume(excess, startTime)
return bucket
proc mgetOrPut(
requestRateLimiter: var RequestRateLimiter, peerId: PeerId
requestRateLimiter: var RequestRateLimiter, peerId: PeerId, now: Moment
): var TokenBucket =
let bucketForNew = newTokenBucket(some(requestRateLimiter.peerBucketSetting)).valueOr:
let bucketForNew = newTokenBucket(
some(requestRateLimiter.peerBucketSetting), Discrete, now
).valueOr:
raiseAssert "This branch is not allowed to be reached as it will not be called if the setting is None."
return requestRateLimiter.peerUsage.mgetOrPut(peerId, bucketForNew)
proc checkUsage*(
t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now()
): bool {.raises: [].} =
if t.tokenBucket.isNone():
return true
proc checkUsageUnlimited(
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
): bool {.gcsafe, raises: [].} =
true
let peerBucket = t.mgetOrPut(conn.peerId)
proc checkUsageLimited(
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
): bool {.gcsafe, raises: [].} =
# Lazy-init the main bucket using the first observed request time. This makes
# refill behavior deterministic under tests where `now` is controlled.
if isNil(t.tokenBucket):
t.tokenBucket = newMainTokenBucket(t.mainBucketSetting, t.ratio, now)
let peerBucket = t.mgetOrPut(conn.peerId, now)
## check that the requesting peer's usage is within the calculated per-peer ratio, letting through peers that have not requested much (or at all) this time
if not peerBucket.tryConsume(1, now):
trace "peer usage limit reached", peer = conn.peerId
return false
# If the peer can consume, also check the overall budget we have left
let tokenBucket = t.tokenBucket.get()
if not tokenBucket.tryConsume(1, now):
if not t.tokenBucket.tryConsume(1, now):
return false
return true
proc checkUsage*(
t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now()
): bool {.raises: [].} =
t.checkUsageImpl(t, proto, conn, now)
template checkUsageLimit*(
t: var RequestRateLimiter,
proto: string,
@@ -135,9 +179,19 @@ func calcPeerTokenSetting(
proc newRequestRateLimiter*(setting: Option[RateLimitSetting]): RequestRateLimiter =
let ratio = calcPeriodRatio(setting)
let isLimited = setting.isSome() and not setting.get().isUnlimited()
let mainBucketSetting =
if isLimited:
setting.get()
else:
(0, 0.minutes)
return RequestRateLimiter(
tokenBucket: newTokenBucket(setting),
tokenBucket: nil,
setting: setting,
mainBucketSetting: mainBucketSetting,
ratio: ratio,
peerBucketSetting: calcPeerTokenSetting(setting, ratio),
peerUsage: init(TimedMap[PeerId, TokenBucket], calcCacheTimeout(setting, ratio)),
checkUsageImpl: (if isLimited: checkUsageLimited else: checkUsageUnlimited),
)
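To make the scaling in newMainTokenBucket concrete, here is a small illustrative sketch; it is not part of this commit and the numbers are assumed examples. For a setting of (volume = 10, period = 1.minutes) with ratio = 3, the main bucket gets capacity 30 and fillDuration 3.minutes, and the 20-token excess is consumed up front so the first burst is still limited to 10 while unused tokens can accumulate up to 30 over later periods.

import chronos, chronos/ratelimit

let start = Moment.now()
let volume = 10
let ratio = 3
var mainBucket = TokenBucket.new(
  capacity = volume * ratio,        # 30: burst window spanning `ratio` periods
  fillDuration = 1.minutes * ratio, # 3 minutes, preserving the configured rate
  startTime = start,
  mode = ReplenishMode.Continuous,
)
# drop the excess so the initial budget equals the raw volume (10, not 30)
discard mainBucket.tryConsume(volume * ratio - volume, start)
echo mainBucket.tryConsume(volume, start) # expected: true, the initial burst
echo mainBucket.tryConsume(1, start)      # expected: false at the same instant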


@@ -6,12 +6,15 @@ import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility
import std/times except TimeInterval, Duration
import ./[token_bucket, setting, service_metrics]
import chronos/ratelimit as token_bucket
import ./[setting, service_metrics]
export token_bucket, setting, service_metrics
proc newTokenBucket*(
setting: Option[RateLimitSetting],
replenishMode: ReplenishMode = ReplenishMode.Compensating,
replenishMode: static[ReplenishMode] = ReplenishMode.Continuous,
startTime: Moment = Moment.now(),
): Option[TokenBucket] =
if setting.isNone():
return none[TokenBucket]()
@@ -19,7 +22,14 @@ proc newTokenBucket*(
if setting.get().isUnlimited():
return none[TokenBucket]()
return some(TokenBucket.new(setting.get().volume, setting.get().period))
return some(
TokenBucket.new(
capacity = setting.get().volume,
fillDuration = setting.get().period,
startTime = startTime,
mode = replenishMode,
)
)
proc checkUsage(
t: var TokenBucket, proto: string, now = Moment.now()
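For reference, a minimal usage sketch of the newTokenBucket wrapper above; it is illustrative only and assumes RateLimitSetting is the (volume, period) tuple used elsewhere in this diff, with the surrounding rate-limit module imports in place. An absent or unlimited setting yields none(), otherwise a Continuous bucket seeded at startTime.

# assumes this runs in a module that imports the rate-limit helpers above
let maybeBucket = newTokenBucket(some((10, 1.seconds)))
if maybeBucket.isSome():
  var bucket = maybeBucket.get()
  echo bucket.tryConsume(1, Moment.now()) # expected: true while budget remains
echo newTokenBucket(none(RateLimitSetting)).isNone() # expected: true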


@@ -1,182 +0,0 @@
{.push raises: [].}
import chronos, std/math, std/options
const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25
## This is an extract from chronos/rate_limit.nim due to the found bug in the original implementation.
## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class.
## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation.
## ref: https://github.com/status-im/nim-chronos/issues/500
##
## This version of TokenBucket is different from the original one in chronos/rate_limit.nim in many ways:
## - It has a new mode called `Compensating` which is the default mode.
## Compensation is calculated as the not used bucket capacity in the last measured period(s) in average.
## or up until maximum the allowed compansation treshold (Currently it is const 25%).
## Also compensation takes care of the proper time period calculation to avoid non-usage periods that can lead to
## overcompensation.
## - Strict mode is also available which will only replenish when time period is over but also will fill
## the bucket to the max capacity.
type
ReplenishMode* = enum
Strict
Compensating
TokenBucket* = ref object
budget: int ## Current number of tokens in the bucket
budgetCap: int ## Bucket capacity
lastTimeFull: Moment
## This timer measures the proper periodizaiton of the bucket refilling
fillDuration: Duration ## Refill period
case replenishMode*: ReplenishMode
of Strict:
## In strict mode, the bucket is refilled only till the budgetCap
discard
of Compensating:
## This is the default mode.
maxCompensation: float
func periodDistance(bucket: TokenBucket, currentTime: Moment): float =
## notice fillDuration cannot be zero by design
## period distance is a float number representing the calculated period time
## since the last time bucket was refilled.
return
nanoseconds(currentTime - bucket.lastTimeFull).float /
nanoseconds(bucket.fillDuration).float
func getUsageAverageSince(bucket: TokenBucket, distance: float): float =
if distance == 0.float:
## in case there is zero time difference than the usage percentage is 100%
return 1.0
## budgetCap can never be zero
## usage average is calculated as a percentage of total capacity available over
## the measured period
return bucket.budget.float / bucket.budgetCap.float / distance
proc calcCompensation(bucket: TokenBucket, averageUsage: float): int =
# if we already fully used or even overused the tokens, there is no place for compensation
if averageUsage >= 1.0:
return 0
## compensation is the not used bucket capacity in the last measured period(s) in average.
## or maximum the allowed compansation treshold
let compensationPercent =
min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation)
return trunc(compensationPercent).int
func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool =
return currentTime - bucket.lastTimeFull >= bucket.fillDuration
## Update will take place if bucket is empty and trying to consume tokens.
## It checks if the bucket can be replenished as refill duration is passed or not.
## - strict mode:
proc updateStrict(bucket: TokenBucket, currentTime: Moment) =
if bucket.fillDuration == default(Duration):
bucket.budget = min(bucket.budgetCap, bucket.budget)
return
if not periodElapsed(bucket, currentTime):
return
bucket.budget = bucket.budgetCap
bucket.lastTimeFull = currentTime
## - compensating - ballancing load:
## - between updates we calculate average load (current bucket capacity / number of periods till last update)
## - gives the percentage load used recently
## - with this we can replenish bucket up to 100% + calculated leftover from previous period (caped with max treshold)
proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) =
if bucket.fillDuration == default(Duration):
bucket.budget = min(bucket.budgetCap, bucket.budget)
return
# do not replenish within the same period
if not periodElapsed(bucket, currentTime):
return
let distance = bucket.periodDistance(currentTime)
let recentAvgUsage = bucket.getUsageAverageSince(distance)
let compensation = bucket.calcCompensation(recentAvgUsage)
bucket.budget = bucket.budgetCap + compensation
bucket.lastTimeFull = currentTime
proc update(bucket: TokenBucket, currentTime: Moment) =
if bucket.replenishMode == ReplenishMode.Compensating:
updateWithCompensation(bucket, currentTime)
else:
updateStrict(bucket, currentTime)
proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
## If `tokens` are available, consume them,
## Otherwhise, return false.
if bucket.budget >= bucket.budgetCap:
bucket.lastTimeFull = now
if bucket.budget >= tokens:
bucket.budget -= tokens
return true
bucket.update(now)
if bucket.budget >= tokens:
bucket.budget -= tokens
return true
else:
return false
proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
## Add `tokens` to the budget (capped to the bucket capacity)
bucket.budget += tokens
bucket.update(now)
proc new*(
T: type[TokenBucket],
budgetCap: int,
fillDuration: Duration = 1.seconds,
mode: ReplenishMode = ReplenishMode.Compensating,
): T =
assert not isZero(fillDuration)
assert budgetCap != 0
## Create different mode TokenBucket
case mode
of ReplenishMode.Strict:
return T(
budget: budgetCap,
budgetCap: budgetCap,
fillDuration: fillDuration,
lastTimeFull: Moment.now(),
replenishMode: mode,
)
of ReplenishMode.Compensating:
T(
budget: budgetCap,
budgetCap: budgetCap,
fillDuration: fillDuration,
lastTimeFull: Moment.now(),
replenishMode: mode,
maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
)
proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket =
T.new(capacity, period, ReplenishMode.Strict)
proc newCompensating*(
T: type[TokenBucket], capacity: int, period: Duration
): TokenBucket =
T.new(capacity, period, ReplenishMode.Compensating)
func `$`*(b: TokenBucket): string {.inline.} =
if isNil(b):
return "nil"
return $b.budgetCap & "/" & $b.fillDuration
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
if ob.isNone():
return "no-limit"
return $ob.get()


@@ -209,6 +209,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
maxServicePeers = some(builder.maxServicePeers),
colocationLimit = builder.colocationLimit,
shardedPeerManagement = builder.shardAware,
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),
)
var node: WakuNode


@@ -13,7 +13,6 @@ import
libp2p/services/autorelayservice,
libp2p/services/hpservice,
libp2p/peerid,
libp2p/discovery/rendezvousinterface,
eth/keys,
eth/p2p/discoveryv5/enr,
presto,


@@ -103,6 +103,7 @@ type PeerManager* = ref object of RootObj
onConnectionChange*: ConnectionChangeHandler
online: bool ## state managed by online_monitor module
getShards: GetShards
maxConnections: int
#~~~~~~~~~~~~~~~~~~~#
# Helper Functions #
@@ -748,7 +749,6 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
var peerStore = pm.switch.peerStore
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let notConnectedPeers =
peerStore.getDisconnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
@@ -758,7 +758,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
info "Relay peer connections",
inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget,
totalConnections = $totalConnections & "/" & $maxConnections,
totalConnections = $totalConnections & "/" & $pm.maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len
@@ -1048,9 +1048,9 @@ proc new*(
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
maxConnections: int = MaxConnections,
): PeerManager {.gcsafe.} =
let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
if maxConnections > capacity:
error "Max number of connections can't be greater than PeerManager capacity",
capacity = capacity, maxConnections = maxConnections
@@ -1099,6 +1099,7 @@ proc new*(
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,
online: true,
maxConnections: maxConnections,
)
proc peerHook(


@@ -8,7 +8,6 @@ import
stew/byteutils,
libp2p/protocols/rendezvous,
libp2p/protocols/rendezvous/protobuf,
libp2p/discovery/discoverymngr,
libp2p/utils/semaphore,
libp2p/utils/offsettedseq,
libp2p/crypto/curve25519,