mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-10 09:53:08 +00:00
chore: use chronos' TokenBucket (#3670)
* Adopt chronos' TokenBucket; remove the local TokenBucket implementation and its test. Bump nim-chronos (plus nim-libp2p/nim-lsquic/nim-jwt) and adapt to the latest libp2p changes.
* Fix libp2p/utility reporting an unlisted exception that can occur when the socket is closed in waitForService; the -d:ssl compile flag caused it.
* Adapt request_limiter to chronos' new TokenBucket replenish algorithm while keeping the original intent of use.
* Fix the filter DOS protection test.
* Fix peer manager tests affected by changes in the new libp2p.
* Adjust the store test rate limit to eliminate timing-related CI flakiness.
* Adjust the lightpush/legacy lightpush/filter test rate limits to eliminate timing-related CI flakiness.
* Rework the filter DOS protection test to avoid CI timing variance causing flaky results compared to local runs.
* Rework the lightpush DOS protection test to avoid CI timing variance causing flaky results compared to local runs.
* Rework the lightpush and legacy lightpush rate limit tests to eliminate CI timing effects, where longer awaits minted new tokens unlike in local runs.
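For context, below is a minimal sketch (not part of the commit) of how the chronos TokenBucket is driven after this change. The constructor keywords (capacity, fillDuration, startTime, mode), the mode name Continuous, and the tryConsume(tokens, now) call mirror the changed request_limiter code in the diff below; the 3-tokens-per-second setting and the demo proc are assumed purely for illustration.

import chronos
import chronos/ratelimit # TokenBucket and ReplenishMode (the diff imports this module as token_bucket)

proc demoTokenBucket() =
  # Assumed setting for illustration: 3 requests per 1-second period, continuous minting.
  # Anchoring the bucket at an explicit start time keeps refill behaviour deterministic in tests.
  let start = Moment.now()
  var bucket = TokenBucket.new(
    capacity = 3, fillDuration = 1.seconds, startTime = start, mode = Continuous
  )

  # The bucket starts with a full budget: three consumes at the same instant succeed,
  # the fourth within that instant is rejected.
  doAssert bucket.tryConsume(1, start)
  doAssert bucket.tryConsume(1, start)
  doAssert bucket.tryConsume(1, start)
  doAssert not bucket.tryConsume(1, start)

  # After a full period has elapsed the budget has been replenished again.
  doAssert bucket.tryConsume(1, start + 1.seconds)

demoTokenBucket()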
This commit is contained in:
parent
a4e44dbe05
commit
284a0816cc
6
.gitmodules
vendored
@ -184,6 +184,12 @@
url = https://github.com/logos-messaging/waku-rlnv2-contract.git
ignore = untracked
branch = master
[submodule "vendor/nim-lsquic"]
path = vendor/nim-lsquic
url = https://github.com/vacp2p/nim-lsquic
[submodule "vendor/nim-jwt"]
path = vendor/nim-jwt
url = https://github.com/vacp2p/nim-jwt.git
[submodule "vendor/nim-ffi"]
path = vendor/nim-ffi
url = https://github.com/logos-messaging/nim-ffi/
@ -6,7 +6,6 @@ import
./test_protobuf_validation,
./test_sqlite_migrations,
./test_parse_size,
./test_tokenbucket,
./test_requestratelimiter,
./test_ratelimit_setting,
./test_timed_map,
@ -1,69 +0,0 @@
# Chronos Test Suite
# (c) Copyright 2022-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)

{.used.}

import testutils/unittests
import chronos
import ../../waku/common/rate_limit/token_bucket

suite "Token Bucket":
  test "TokenBucket Sync test - strict":
    var bucket = TokenBucket.newStrict(1000, 1.milliseconds)
    let
      start = Moment.now()
      fullTime = start + 1.milliseconds
    check:
      bucket.tryConsume(800, start) == true
      bucket.tryConsume(200, start) == true
      # Out of budget
      bucket.tryConsume(100, start) == false
      bucket.tryConsume(800, fullTime) == true
      bucket.tryConsume(200, fullTime) == true
      # Out of budget
      bucket.tryConsume(100, fullTime) == false

  test "TokenBucket Sync test - compensating":
    var bucket = TokenBucket.new(1000, 1.milliseconds)
    let
      start = Moment.now()
      fullTime = start + 1.milliseconds
    check:
      bucket.tryConsume(800, start) == true
      bucket.tryConsume(200, start) == true
      # Out of budget
      bucket.tryConsume(100, start) == false
      bucket.tryConsume(800, fullTime) == true
      bucket.tryConsume(200, fullTime) == true
      # Due to not using the bucket for a full period, the compensation will satisfy this request
      bucket.tryConsume(100, fullTime) == true

  test "TokenBucket Max compensation":
    var bucket = TokenBucket.new(1000, 1.minutes)
    var reqTime = Moment.now()

    check bucket.tryConsume(1000, reqTime)
    check bucket.tryConsume(1, reqTime) == false
    reqTime += 1.minutes
    check bucket.tryConsume(500, reqTime) == true
    reqTime += 1.minutes
    check bucket.tryConsume(1000, reqTime) == true
    reqTime += 10.seconds
    # max compensation is 25%, so try to consume 250 more
    check bucket.tryConsume(250, reqTime) == true
    reqTime += 49.seconds
    # out of budget within the same period
    check bucket.tryConsume(1, reqTime) == false

  test "TokenBucket Short replenish":
    var bucket = TokenBucket.new(15000, 1.milliseconds)
    let start = Moment.now()
    check bucket.tryConsume(15000, start)
    check bucket.tryConsume(1, start) == false

    check bucket.tryConsume(15000, start + 1.milliseconds) == true
@ -997,6 +997,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 1,
storage = nil,
maxConnections = 20,
)

# Create 30 peers and add them to the peerstore
@ -1063,6 +1064,7 @@ procSuite "Peer Manager":
backoffFactor = 2,
maxFailedAttempts = 10,
storage = nil,
maxConnections = 20,
)
var p1: PeerId
require p1.init("QmeuZJbXrszW2jdT7GdduSjQskPU3S7vvGWKtKgDfkDvW" & "1")
@ -1116,6 +1118,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 150,
storage = nil,
maxConnections = 20,
)

# Should result in backoff > 1 week
@ -1131,6 +1134,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 10,
storage = nil,
maxConnections = 20,
)

let pm = PeerManager.new(
@ -1144,6 +1148,7 @@ procSuite "Peer Manager":
.build(),
maxFailedAttempts = 5,
storage = nil,
maxConnections = 20,
)

asyncTest "colocationLimit is enforced by pruneConnsByIp()":
@ -122,24 +122,51 @@ suite "Waku Filter - DOS protection":
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)

await sleepAsync(20.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
# Avoid using tiny sleeps to control refill behavior: CI scheduling can
# oversleep and mint additional tokens. Instead, issue a small burst of
# subscribe requests and require at least one TOO_MANY_REQUESTS.
var c1SubscribeFutures = newSeq[Future[FilterSubscribeResult]]()
for i in 0 ..< 6:
c1SubscribeFutures.add(
client1.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
)

let c1Finished = await allFinished(c1SubscribeFutures)
var c1GotTooMany = false
for fut in c1Finished:
check not fut.failed()
let res = fut.read()
if res.isErr() and res.error().kind == FilterSubscribeErrorKind.TOO_MANY_REQUESTS:
c1GotTooMany = true
break
check c1GotTooMany

# Ensure the other client is not affected by client1's rate limit.
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
await sleepAsync(20.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
await sleepAsync(20.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)

var c2SubscribeFutures = newSeq[Future[FilterSubscribeResult]]()
for i in 0 ..< 6:
c2SubscribeFutures.add(
client2.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
)

let c2Finished = await allFinished(c2SubscribeFutures)
var c2GotTooMany = false
for fut in c2Finished:
check not fut.failed()
let res = fut.read()
if res.isErr() and res.error().kind == FilterSubscribeErrorKind.TOO_MANY_REQUESTS:
c2GotTooMany = true
break
check c2GotTooMany

# ensure period of time has passed and clients can again use the service
await sleepAsync(1000.milliseconds)
await sleepAsync(1100.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
@ -147,29 +174,54 @@ suite "Waku Filter - DOS protection":

asyncTest "Ensure normal usage allowed":
# Given
# Rate limit setting is (3 requests / 1000ms) per peer.
# In a token-bucket model this means:
# - capacity = 3 tokens
# - refill rate = 3 tokens / second => ~1 token every ~333ms
# - each request consumes 1 token (including UNSUBSCRIBE)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)

await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
# Expected remaining tokens (approx): 2

await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)

await sleepAsync(50.milliseconds)
# After ~500ms, ~1 token refilled; PING consumes 1 => expected remaining: 2

await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)

# After another ~500ms, ~1 token refilled; PING consumes 1 => expected remaining: 2

check client1.unsubscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId) == false

await sleepAsync(50.milliseconds)
check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND)
check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND)
await sleepAsync(50.milliseconds)
check client1.ping(serverRemotePeerInfo) ==
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
# After unsubscribing, PING is expected to return NOT_FOUND while still
# counting towards the rate limit.

# CI can oversleep / schedule slowly, which can mint extra tokens between
# requests. To make the test robust, issue a small burst of pings and
# require at least one TOO_MANY_REQUESTS response.
var pingFutures = newSeq[Future[FilterSubscribeResult]]()
for i in 0 ..< 9:
pingFutures.add(client1.wakuFilterClient.ping(serverRemotePeerInfo))

let finished = await allFinished(pingFutures)
var gotTooMany = false
for fut in finished:
check not fut.failed()
let pingRes = fut.read()
if pingRes.isErr() and pingRes.error().kind == FilterSubscribeErrorKind.TOO_MANY_REQUESTS:
gotTooMany = true
break

check gotTooMany

check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
@ -80,11 +80,12 @@ suite "Rate limited push service":
await allFutures(serverSwitch.start(), clientSwitch.start())

## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
# Don't rely on per-request timing assumptions or a single shared Future.
# CI can be slow enough that sequential requests accidentally refill tokens.
# Instead we issue a small burst and assert we observe at least one rejection.
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return lightpushSuccessResult(1)

let
@ -93,45 +94,38 @@ suite "Rate limited push service":
client = newTestWakuLightpushClient(clientSwitch)

let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let topic = DefaultPubsubTopic
let tokenPeriod = 500.millis

let successProc = proc(): Future[void] {.async.} =
# Fire a burst of requests; require at least one success and one rejection.
var publishFutures = newSeq[Future[WakuLightPushResult]]()
for i in 0 ..< 10:
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(some(DefaultPubsubTopic), message, serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
publishFutures.add(
client.publish(some(DefaultPubsubTopic), message, serverPeerId)
)

check:
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message
let finished = await allFinished(publishFutures)
var gotOk = false
var gotTooMany = false
for fut in finished:
check not fut.failed()
let res = fut.read()
if res.isOk():
gotOk = true
else:
check res.error.code == LightPushErrorCode.TOO_MANY_REQUESTS
check res.error.desc == some(TooManyRequestsMessage)
gotTooMany = true

let rejectProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(some(DefaultPubsubTopic), message, serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
check gotOk
check gotTooMany

check:
requestRes.isErr()
requestRes.error.code == LightPushErrorCode.TOO_MANY_REQUESTS
requestRes.error.desc == some(TooManyRequestsMessage)

for testCnt in 0 .. 2:
await successProc()
await sleepAsync(20.millis)

await rejectProc()

await sleepAsync(500.millis)

## next one shall succeed due to the rate limit time window has passed
await successProc()
# ensure period of time has passed and the client can again use the service
await sleepAsync(tokenPeriod + 100.millis)
let recoveryRes = await client.publish(
some(DefaultPubsubTopic), fakeWakuMessage(), serverPeerId
)
check recoveryRes.isOk()

## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
@ -86,58 +86,52 @@ suite "Rate limited push service":
await allFutures(serverSwitch.start(), clientSwitch.start())

## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()

let
tokenPeriod = 500.millis
server = await newTestWakuLegacyLightpushNode(
serverSwitch, handler, some((3, 500.millis))
serverSwitch, handler, some((3, tokenPeriod))
)
client = newTestWakuLegacyLightpushClient(clientSwitch)

let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let topic = DefaultPubsubTopic

let successProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
# Avoid assuming the exact Nth request will be rejected. With Chronos TokenBucket
# minting semantics and real network latency, CI timing can allow refills.
# Instead, send a short burst and require that we observe at least one rejection.
let burstSize = 10
var publishFutures: seq[Future[WakuLightPushResult[string]]] = @[]
for _ in 0 ..< burstSize:
publishFutures.add(
client.publish(DefaultPubsubTopic, fakeWakuMessage(), peer = serverPeerId)
)

check:
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message
let finished = await allFinished(publishFutures)
var gotOk = false
var gotTooMany = false
for fut in finished:
check not fut.failed()
let res = fut.read()
if res.isOk():
gotOk = true
elif res.error == "TOO_MANY_REQUESTS":
gotTooMany = true

let rejectProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
check:
gotOk
gotTooMany

check:
requestRes.isErr()
requestRes.error == "TOO_MANY_REQUESTS"

for testCnt in 0 .. 2:
await successProc()
await sleepAsync(20.millis)

await rejectProc()

await sleepAsync(500.millis)
await sleepAsync(tokenPeriod + 100.millis)

## next one shall succeed due to the rate limit time window has passed
await successProc()
let afterCooldownRes =
await client.publish(DefaultPubsubTopic, fakeWakuMessage(), peer = serverPeerId)
check:
afterCooldownRes.isOk()

## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())

@ -413,7 +413,7 @@ procSuite "WakuNode - Store":

for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(20.millis)
waitFor sleepAsync(5.millis)

waitFor failsProc()
2
vendor/nim-chronos
vendored
@ -1 +1 @@
Subproject commit 0646c444fce7c7ed08ef6f2c9a7abfd172ffe655
Subproject commit 85af4db764ecd3573c4704139560df3943216cf1
1
vendor/nim-jwt
vendored
Submodule
@ -0,0 +1 @@
Subproject commit 18f8378de52b241f321c1f9ea905456e89b95c6f
2
vendor/nim-libp2p
vendored
@ -1 +1 @@
Subproject commit e82080f7b1aa61c6d35fa5311b873f41eff4bb52
Subproject commit eb7e6ff89889e41b57515f891ba82986c54809fb
1
vendor/nim-lsquic
vendored
Submodule
@ -0,0 +1 @@
Subproject commit f3fe33462601ea34eb2e8e9c357c92e61f8d121b
@ -31,6 +31,8 @@ requires "nim >= 2.2.4",
"results",
"db_connector",
"minilru",
"lsquic",
"jwt",
"ffi"

### Helper functions
@ -148,7 +150,8 @@ task chat2, "Build example Waku chat usage":
let name = "chat2"
buildBinary name,
"apps/chat2/",
"-d:chronicles_sinks=textlines[file] -d:ssl -d:chronicles_log_level='TRACE' "
"-d:chronicles_sinks=textlines[file] -d:chronicles_log_level='TRACE' "
# -d:ssl - cause unlisted exception error in libp2p/utility...

task chat2mix, "Build example Waku chat mix usage":
# NOTE For debugging, set debug level. For chat usage we want minimal log
@ -158,7 +161,8 @@ task chat2mix, "Build example Waku chat mix usage":
let name = "chat2mix"
buildBinary name,
"apps/chat2mix/",
"-d:chronicles_sinks=textlines[file] -d:ssl -d:chronicles_log_level='TRACE' "
"-d:chronicles_sinks=textlines[file] -d:chronicles_log_level='TRACE' "
# -d:ssl - cause unlisted exception error in libp2p/utility...

task chat2bridge, "Build chat2bridge":
let name = "chat2bridge"
@ -20,7 +20,7 @@ proc mgetOrPut(
perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId
): var Option[TokenBucket] =
return perPeerRateLimiter.peerBucket.mgetOrPut(
peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Compensating)
peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Continuous)
)

template checkUsageLimit*(
@ -39,38 +39,82 @@ const SECONDS_RATIO = 3
const MINUTES_RATIO = 2

type RequestRateLimiter* = ref object of RootObj
tokenBucket: Option[TokenBucket]
tokenBucket: TokenBucket
setting*: Option[RateLimitSetting]
mainBucketSetting: RateLimitSetting
ratio: int
peerBucketSetting*: RateLimitSetting
peerUsage: TimedMap[PeerId, TokenBucket]
checkUsageImpl: proc(
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
): bool {.gcsafe, raises: [].}

proc newMainTokenBucket(
setting: RateLimitSetting, ratio: int, startTime: Moment
): TokenBucket =
## RequestRateLimiter's global bucket should keep the *rate* of the configured
## setting while allowing a larger burst window. We achieve this by scaling
## both capacity and fillDuration by the same ratio.
##
## This matches previous behavior where unused tokens could effectively
## accumulate across multiple periods.
let burstCapacity = setting.volume * ratio
var bucket = TokenBucket.new(
capacity = burstCapacity,
fillDuration = setting.period * ratio,
startTime = startTime,
mode = Continuous,
)

# Start with the configured volume (not the burst capacity) so that the
# initial burst behavior matches the raw setting, while still allowing
# accumulation up to `burstCapacity` over time.
let excess = burstCapacity - setting.volume
if excess > 0:
discard bucket.tryConsume(excess, startTime)

return bucket

proc mgetOrPut(
requestRateLimiter: var RequestRateLimiter, peerId: PeerId
requestRateLimiter: var RequestRateLimiter, peerId: PeerId, now: Moment
): var TokenBucket =
let bucketForNew = newTokenBucket(some(requestRateLimiter.peerBucketSetting)).valueOr:
let bucketForNew = newTokenBucket(
some(requestRateLimiter.peerBucketSetting), Discrete, now
).valueOr:
raiseAssert "This branch is not allowed to be reached as it will not be called if the setting is None."

return requestRateLimiter.peerUsage.mgetOrPut(peerId, bucketForNew)

proc checkUsage*(
t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now()
): bool {.raises: [].} =
if t.tokenBucket.isNone():
return true
proc checkUsageUnlimited(
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
): bool {.gcsafe, raises: [].} =
true

let peerBucket = t.mgetOrPut(conn.peerId)
proc checkUsageLimited(
t: var RequestRateLimiter, proto: string, conn: Connection, now: Moment
): bool {.gcsafe, raises: [].} =
# Lazy-init the main bucket using the first observed request time. This makes
# refill behavior deterministic under tests where `now` is controlled.
if isNil(t.tokenBucket):
t.tokenBucket = newMainTokenBucket(t.mainBucketSetting, t.ratio, now)

let peerBucket = t.mgetOrPut(conn.peerId, now)
## check requesting peer's usage is not over the calculated ratio and let that peer go which not requested much/or this time...
if not peerBucket.tryConsume(1, now):
trace "peer usage limit reached", peer = conn.peerId
return false

# Ok if the peer can consume, check the overall budget we have left
let tokenBucket = t.tokenBucket.get()
if not tokenBucket.tryConsume(1, now):
if not t.tokenBucket.tryConsume(1, now):
return false

return true

proc checkUsage*(
t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now()
): bool {.raises: [].} =
t.checkUsageImpl(t, proto, conn, now)

template checkUsageLimit*(
t: var RequestRateLimiter,
proto: string,
@ -135,9 +179,19 @@ func calcPeerTokenSetting(

proc newRequestRateLimiter*(setting: Option[RateLimitSetting]): RequestRateLimiter =
let ratio = calcPeriodRatio(setting)
let isLimited = setting.isSome() and not setting.get().isUnlimited()
let mainBucketSetting =
if isLimited:
setting.get()
else:
(0, 0.minutes)

return RequestRateLimiter(
tokenBucket: newTokenBucket(setting),
tokenBucket: nil,
setting: setting,
mainBucketSetting: mainBucketSetting,
ratio: ratio,
peerBucketSetting: calcPeerTokenSetting(setting, ratio),
peerUsage: init(TimedMap[PeerId, TokenBucket], calcCacheTimeout(setting, ratio)),
checkUsageImpl: (if isLimited: checkUsageLimited else: checkUsageUnlimited),
)
@ -6,12 +6,15 @@ import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility

import std/times except TimeInterval, Duration

import ./[token_bucket, setting, service_metrics]
import chronos/ratelimit as token_bucket

import ./[setting, service_metrics]
export token_bucket, setting, service_metrics

proc newTokenBucket*(
setting: Option[RateLimitSetting],
replenishMode: ReplenishMode = ReplenishMode.Compensating,
replenishMode: static[ReplenishMode] = ReplenishMode.Continuous,
startTime: Moment = Moment.now(),
): Option[TokenBucket] =
if setting.isNone():
return none[TokenBucket]()
@ -19,7 +22,14 @@ proc newTokenBucket*(
if setting.get().isUnlimited():
return none[TokenBucket]()

return some(TokenBucket.new(setting.get().volume, setting.get().period))
return some(
TokenBucket.new(
capacity = setting.get().volume,
fillDuration = setting.get().period,
startTime = startTime,
mode = replenishMode,
)
)

proc checkUsage(
t: var TokenBucket, proto: string, now = Moment.now()
@ -1,182 +0,0 @@
{.push raises: [].}

import chronos, std/math, std/options

const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25

## This is an extract from chronos/rate_limit.nim due to a bug found in the original implementation.
## Unfortunately that bug cannot be solved without harming the original features of the TokenBucket class.
## So, this shortcut is used to enable moving ahead with the nwaku rate limiter implementation.
## ref: https://github.com/status-im/nim-chronos/issues/500
##
## This version of TokenBucket is different from the original one in chronos/rate_limit.nim in many ways:
## - It has a new mode called `Compensating`, which is the default mode.
##   Compensation is calculated as the unused bucket capacity in the last measured period(s) on average,
##   or up to the maximum allowed compensation threshold (currently a const 25%).
##   Compensation also takes care of proper time period calculation to avoid non-usage periods that can lead to
##   overcompensation.
## - Strict mode is also available, which only replenishes when the time period is over, but also fills
##   the bucket to the max capacity.

type
  ReplenishMode* = enum
    Strict
    Compensating

  TokenBucket* = ref object
    budget: int ## Current number of tokens in the bucket
    budgetCap: int ## Bucket capacity
    lastTimeFull: Moment
      ## This timer measures the proper periodization of the bucket refilling
    fillDuration: Duration ## Refill period
    case replenishMode*: ReplenishMode
    of Strict:
      ## In strict mode, the bucket is refilled only up to the budgetCap
      discard
    of Compensating:
      ## This is the default mode.
      maxCompensation: float

func periodDistance(bucket: TokenBucket, currentTime: Moment): float =
  ## notice fillDuration cannot be zero by design
  ## period distance is a float number representing the calculated period time
  ## since the last time the bucket was refilled.
  return
    nanoseconds(currentTime - bucket.lastTimeFull).float /
    nanoseconds(bucket.fillDuration).float

func getUsageAverageSince(bucket: TokenBucket, distance: float): float =
  if distance == 0.float:
    ## in case there is zero time difference, the usage percentage is 100%
    return 1.0

  ## budgetCap can never be zero
  ## usage average is calculated as a percentage of total capacity available over
  ## the measured period
  return bucket.budget.float / bucket.budgetCap.float / distance

proc calcCompensation(bucket: TokenBucket, averageUsage: float): int =
  # if we already fully used or even overused the tokens, there is no place for compensation
  if averageUsage >= 1.0:
    return 0

  ## compensation is the unused bucket capacity in the last measured period(s) on average,
  ## or at most the allowed compensation threshold
  let compensationPercent =
    min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation)
  return trunc(compensationPercent).int

func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool =
  return currentTime - bucket.lastTimeFull >= bucket.fillDuration

## Update will take place if the bucket is empty and we are trying to consume tokens.
## It checks whether the bucket can be replenished, i.e. whether the refill duration has passed.
## - strict mode:
proc updateStrict(bucket: TokenBucket, currentTime: Moment) =
  if bucket.fillDuration == default(Duration):
    bucket.budget = min(bucket.budgetCap, bucket.budget)
    return

  if not periodElapsed(bucket, currentTime):
    return

  bucket.budget = bucket.budgetCap
  bucket.lastTimeFull = currentTime

## - compensating - balancing load:
##   - between updates we calculate average load (current bucket capacity / number of periods till last update)
##   - gives the percentage load used recently
##   - with this we can replenish the bucket up to 100% + calculated leftover from the previous period (capped with the max threshold)
proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) =
  if bucket.fillDuration == default(Duration):
    bucket.budget = min(bucket.budgetCap, bucket.budget)
    return

  # do not replenish within the same period
  if not periodElapsed(bucket, currentTime):
    return

  let distance = bucket.periodDistance(currentTime)
  let recentAvgUsage = bucket.getUsageAverageSince(distance)
  let compensation = bucket.calcCompensation(recentAvgUsage)

  bucket.budget = bucket.budgetCap + compensation
  bucket.lastTimeFull = currentTime

proc update(bucket: TokenBucket, currentTime: Moment) =
  if bucket.replenishMode == ReplenishMode.Compensating:
    updateWithCompensation(bucket, currentTime)
  else:
    updateStrict(bucket, currentTime)

proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
  ## If `tokens` are available, consume them,
  ## Otherwise, return false.

  if bucket.budget >= bucket.budgetCap:
    bucket.lastTimeFull = now

  if bucket.budget >= tokens:
    bucket.budget -= tokens
    return true

  bucket.update(now)

  if bucket.budget >= tokens:
    bucket.budget -= tokens
    return true
  else:
    return false

proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
  ## Add `tokens` to the budget (capped to the bucket capacity)
  bucket.budget += tokens
  bucket.update(now)

proc new*(
    T: type[TokenBucket],
    budgetCap: int,
    fillDuration: Duration = 1.seconds,
    mode: ReplenishMode = ReplenishMode.Compensating,
): T =
  assert not isZero(fillDuration)
  assert budgetCap != 0

  ## Create different mode TokenBucket
  case mode
  of ReplenishMode.Strict:
    return T(
      budget: budgetCap,
      budgetCap: budgetCap,
      fillDuration: fillDuration,
      lastTimeFull: Moment.now(),
      replenishMode: mode,
    )
  of ReplenishMode.Compensating:
    T(
      budget: budgetCap,
      budgetCap: budgetCap,
      fillDuration: fillDuration,
      lastTimeFull: Moment.now(),
      replenishMode: mode,
      maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
    )

proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket =
  T.new(capacity, period, ReplenishMode.Strict)

proc newCompensating*(
    T: type[TokenBucket], capacity: int, period: Duration
): TokenBucket =
  T.new(capacity, period, ReplenishMode.Compensating)

func `$`*(b: TokenBucket): string {.inline.} =
  if isNil(b):
    return "nil"
  return $b.budgetCap & "/" & $b.fillDuration

func `$`*(ob: Option[TokenBucket]): string {.inline.} =
  if ob.isNone():
    return "no-limit"

  return $ob.get()
@ -209,6 +209,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
maxServicePeers = some(builder.maxServicePeers),
colocationLimit = builder.colocationLimit,
shardedPeerManagement = builder.shardAware,
maxConnections = builder.switchMaxConnections.get(builders.MaxConnections),
)

var node: WakuNode
@ -13,7 +13,6 @@ import
libp2p/services/autorelayservice,
libp2p/services/hpservice,
libp2p/peerid,
libp2p/discovery/rendezvousinterface,
eth/keys,
eth/p2p/discoveryv5/enr,
presto,

@ -103,6 +103,7 @@ type PeerManager* = ref object of RootObj
onConnectionChange*: ConnectionChangeHandler
online: bool ## state managed by online_monitor module
getShards: GetShards
maxConnections: int

#~~~~~~~~~~~~~~~~~~~#
# Helper Functions #
@ -748,7 +749,6 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
var peerStore = pm.switch.peerStore
# log metrics
let (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let notConnectedPeers =
peerStore.getDisconnectedPeers().mapIt(RemotePeerInfo.init(it.peerId, it.addrs))
let outsideBackoffPeers = notConnectedPeers.filterIt(pm.canBeConnected(it.peerId))
@ -758,7 +758,7 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
info "Relay peer connections",
inRelayConns = $inRelayPeers.len & "/" & $pm.inRelayPeersTarget,
outRelayConns = $outRelayPeers.len & "/" & $pm.outRelayPeersTarget,
totalConnections = $totalConnections & "/" & $maxConnections,
totalConnections = $totalConnections & "/" & $pm.maxConnections,
notConnectedPeers = notConnectedPeers.len,
outsideBackoffPeers = outsideBackoffPeers.len

@ -1048,9 +1048,9 @@ proc new*(
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
maxConnections: int = MaxConnections,
): PeerManager {.gcsafe.} =
let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
if maxConnections > capacity:
error "Max number of connections can't be greater than PeerManager capacity",
capacity = capacity, maxConnections = maxConnections
@ -1099,6 +1099,7 @@ proc new*(
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,
online: true,
maxConnections: maxConnections,
)

proc peerHook(

@ -8,7 +8,6 @@ import
stew/byteutils,
libp2p/protocols/rendezvous,
libp2p/protocols/rendezvous/protobuf,
libp2p/discovery/discoverymngr,
libp2p/utils/semaphore,
libp2p/utils/offsettedseq,
libp2p/crypto/curve25519,