mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-13 08:14:43 +00:00
ba418ab5ba
* DOS protection of non relay protocols - rate limit phase3: - Enhanced TokenBucket to be able to add compensation tokens based on previous usage percentage, - per peer rate limiter 'PeerRateLimier' applied on waku_filter_v2 with opinionated default of acceptable request rate - Add traffic metrics to filter message push - RequestRateLimiter added to combine simple token bucket limiting of request numbers but consider per peer usage over time and prevent some peers to over use the service (although currently rule violating peers will not be disconnected by this time only their requests will get not served) - TimedMap utility created (inspired and taken from libp2p TimedCache) which serves as forgiving feature for peers had been overusing the service. - Added more tests - Fix rebase issues - Applied new RequestRateLimiter for store and legacy_store and lightpush * Incorporate review comments, typos, file/class naming and placement changes. * Add issue link reference of the original issue with nim-chronos TokenBucket * Make TimedEntry of TimedMap private and not mixable with similar named in libp2p * Fix review comments, renamings, const instead of values and more comments.
152 lines
4.5 KiB
Nim
152 lines
4.5 KiB
Nim
{.used.}
|
|
|
|
import
|
|
std/[options, strscans],
|
|
testutils/unittests,
|
|
chronicles,
|
|
chronos,
|
|
libp2p/crypto/crypto
|
|
|
|
import
|
|
waku/[
|
|
node/peer_manager,
|
|
common/rate_limit/setting,
|
|
waku_core,
|
|
waku_lightpush,
|
|
waku_lightpush/client,
|
|
waku_lightpush/common,
|
|
waku_lightpush/protocol_metrics,
|
|
waku_lightpush/rpc,
|
|
waku_lightpush/rpc_codec,
|
|
],
|
|
../testlib/[assertions, wakucore, testasync, futures, testutils],
|
|
./lightpush_utils,
|
|
../resources/[pubsub_topics, content_topics, payloads]
|
|
|
|
suite "Rate limited push service":
|
|
asyncTest "push message with rate limit not violated":
|
|
## Setup
|
|
let
|
|
serverSwitch = newTestSwitch()
|
|
clientSwitch = newTestSwitch()
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
## Given
|
|
var handlerFuture = newFuture[(string, WakuMessage)]()
|
|
let handler: PushMessageHandler = proc(
|
|
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
|
|
): Future[WakuLightPushResult[void]] {.async.} =
|
|
handlerFuture.complete((pubsubTopic, message))
|
|
return ok()
|
|
|
|
let
|
|
tokenPeriod = 500.millis
|
|
server =
|
|
await newTestWakuLightpushNode(serverSwitch, handler, some((3, tokenPeriod)))
|
|
client = newTestWakuLightpushClient(clientSwitch)
|
|
|
|
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
|
|
|
|
let sendMsgProc = proc(): Future[void] {.async.} =
|
|
let message = fakeWakuMessage()
|
|
|
|
handlerFuture = newFuture[(string, WakuMessage)]()
|
|
let requestRes =
|
|
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
|
|
|
|
check await handlerFuture.withTimeout(50.millis)
|
|
|
|
assert requestRes.isOk(), requestRes.error
|
|
check handlerFuture.finished()
|
|
|
|
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
|
|
|
|
check:
|
|
handledMessagePubsubTopic == DefaultPubsubTopic
|
|
handledMessage == message
|
|
|
|
let waitInBetweenFor = 20.millis
|
|
|
|
# Test cannot be too explicit about the time when the TokenBucket resets
|
|
# the internal timer, although in normal use there is no use case to care about it.
|
|
var firstWaitExtend = 300.millis
|
|
|
|
for runCnt in 0 ..< 3:
|
|
let startTime = Moment.now()
|
|
for testCnt in 0 ..< 3:
|
|
await sendMsgProc()
|
|
await sleepAsync(20.millis)
|
|
|
|
var endTime = Moment.now()
|
|
var elapsed: Duration = (endTime - startTime)
|
|
await sleepAsync(tokenPeriod - elapsed + firstWaitExtend)
|
|
firstWaitEXtend = 100.millis
|
|
|
|
## Cleanup
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|
|
|
|
asyncTest "push message with rate limit reject":
|
|
## Setup
|
|
let
|
|
serverSwitch = newTestSwitch()
|
|
clientSwitch = newTestSwitch()
|
|
|
|
await allFutures(serverSwitch.start(), clientSwitch.start())
|
|
|
|
## Given
|
|
var handlerFuture = newFuture[(string, WakuMessage)]()
|
|
let handler = proc(
|
|
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
|
|
): Future[WakuLightPushResult[void]] {.async.} =
|
|
handlerFuture.complete((pubsubTopic, message))
|
|
return ok()
|
|
|
|
let
|
|
server =
|
|
await newTestWakuLightpushNode(serverSwitch, handler, some((3, 500.millis)))
|
|
client = newTestWakuLightpushClient(clientSwitch)
|
|
|
|
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
|
|
let topic = DefaultPubsubTopic
|
|
|
|
let successProc = proc(): Future[void] {.async.} =
|
|
let message = fakeWakuMessage()
|
|
handlerFuture = newFuture[(string, WakuMessage)]()
|
|
let requestRes =
|
|
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
|
|
discard await handlerFuture.withTimeout(10.millis)
|
|
|
|
check:
|
|
requestRes.isOk()
|
|
handlerFuture.finished()
|
|
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
|
|
check:
|
|
handledMessagePubsubTopic == DefaultPubsubTopic
|
|
handledMessage == message
|
|
|
|
let rejectProc = proc(): Future[void] {.async.} =
|
|
let message = fakeWakuMessage()
|
|
handlerFuture = newFuture[(string, WakuMessage)]()
|
|
let requestRes =
|
|
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
|
|
discard await handlerFuture.withTimeout(10.millis)
|
|
|
|
check:
|
|
requestRes.isErr()
|
|
requestRes.error == "TOO_MANY_REQUESTS"
|
|
|
|
for testCnt in 0 .. 2:
|
|
await successProc()
|
|
await sleepAsync(20.millis)
|
|
|
|
await rejectProc()
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
## next one shall succeed due to the rate limit time window has passed
|
|
await successProc()
|
|
|
|
## Cleanup
|
|
await allFutures(clientSwitch.stop(), serverSwitch.stop())
|