diff --git a/tests/common/test_all.nim b/tests/common/test_all.nim
index fbc8ada09..1afef23a9 100644
--- a/tests/common/test_all.nim
+++ b/tests/common/test_all.nim
@@ -5,4 +5,7 @@ import
   ./test_envvar_serialization,
   ./test_protobuf_validation,
   ./test_sqlite_migrations,
-  ./test_parse_size
+  ./test_parse_size,
+  ./test_tokenbucket,
+  ./test_requestratelimiter,
+  ./test_timed_map
diff --git a/tests/common/test_requestratelimiter.nim b/tests/common/test_requestratelimiter.nim
new file mode 100644
index 000000000..256e48118
--- /dev/null
+++ b/tests/common/test_requestratelimiter.nim
@@ -0,0 +1,84 @@
+# Chronos Test Suite
+# (c) Copyright 2022-Present
+# Status Research & Development GmbH
+#
+# Licensed under either of
+#  Apache License, version 2.0, (LICENSE-APACHEv2)
+#  MIT license (LICENSE-MIT)
+
+{.used.}
+
+import testutils/unittests
+import chronos, libp2p/stream/connection
+import std/[sequtils, options]
+
+import ../../waku/common/rate_limit/request_limiter
+import ../../waku/common/rate_limit/timed_map
+
+let proto = "ProtocolDescriptor"
+
+let conn1 = Connection(peerId: PeerId.random().tryGet())
+let conn2 = Connection(peerId: PeerId.random().tryGet())
+let conn3 = Connection(peerId: PeerId.random().tryGet())
+
+suite "RequestRateLimiter":
+  test "RequestRateLimiter Allow up to main bucket":
+    # keep limits low for easier calculation of ratios
+    let rateLimit: RateLimitSetting = (4, 2.minutes)
+    var limiter = newRequestRateLimiter(some(rateLimit))
+    # per-peer tokens will be 6 / 4min,
+    # as the ratio is 2 in this case and peer tokens are capped at main tokens * ratio * 0.75,
+    # while in sum we have 8 global tokens over 2 periods (4 mins)
+    # See: waku/common/rate_limit/request_limiter.nim #func calcPeriodRatio
+
+    let now = Moment.now()
+    # with the first use we also register the peer and start its timer
+    check limiter.checkUsage(proto, conn2, now) == true
+    for i in 0 ..< 3:
+      check limiter.checkUsage(proto, conn1, now) == true
+
+    check limiter.checkUsage(proto, conn2, now + 3.minutes) == true
+    for i in 0 ..< 3:
+      check limiter.checkUsage(proto, conn1, now + 3.minutes) == true
+
+    # conn1 reached 75% of the main bucket over 2 periods of time
+    check limiter.checkUsage(proto, conn1, now + 3.minutes) == false
+
+    # conn2 has not used its tokens while we have 1 more token left in the main bucket
+    check limiter.checkUsage(proto, conn2, now + 3.minutes) == true
+
+  test "RequestRateLimiter Restrict overusing peer":
+    # keep limits low for easier calculation of ratios
+    let rateLimit: RateLimitSetting = (10, 2.minutes)
+    var limiter = newRequestRateLimiter(some(rateLimit))
+    # per-peer tokens will be 15 / 4min,
+    # as the ratio is 2 in this case and peer tokens are capped at main tokens * ratio * 0.75,
+    # while in sum we have 20 tokens over 2 periods (4 mins)
+    # See: waku/common/rate_limit/request_limiter.nim #func calcPeriodRatio
+
+    let now = Moment.now()
+    # with the first use we also register the peer and start its timer
+    for i in 0 ..< 10:
+      check limiter.checkUsage(proto, conn1, now) == true
+
+    # ran out of main tokens, but the rejected call still used one more token from the peer's bucket
+    check limiter.checkUsage(proto, conn1, now) == false
+
+    for i in 0 ..< 4:
+      check limiter.checkUsage(proto, conn1, now + 3.minutes) == true
+
+    # conn1 reached 75% of the main bucket over 2 periods of time
+    check limiter.checkUsage(proto, conn1, now + 3.minutes) == false
+
+    check limiter.checkUsage(proto, conn2, now + 3.minutes) == true
+    check limiter.checkUsage(proto, conn2, now + 3.minutes) == true
+    check limiter.checkUsage(proto, conn3, now + 3.minutes) == true
+    check limiter.checkUsage(proto, conn2, now + 3.minutes) == true
+    check limiter.checkUsage(proto, conn3, now + 3.minutes) == true
+
+    # conn1 gets replenished: since the ratio is 2, its replenish period is twice as long as the main bucket's
+    # see waku/common/rate_limit/request_limiter.nim #func calcPeriodRatio and calcPeerTokenSetting
+    check limiter.checkUsage(proto, conn1, now + 4.minutes) == true
+    # requests of other peers can also go
+    check limiter.checkUsage(proto, conn2, now + 4100.milliseconds) == true
+    check limiter.checkUsage(proto, conn3, now + 5.minutes) == true
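The per-peer figures asserted in the comments above fall directly out of the ratio helpers this patch introduces in waku/common/rate_limit/request_limiter.nim. A minimal sketch of that arithmetic for the (10, 2.minutes) setting, assuming this PR's constants (MINUTES_RATIO = 2, PER_PEER_ALLOWED_PERCENT_OF_VOLUME = 0.75):

```nim
import std/math

# A 2-minute period is above 1 minute, so calcPeriodRatio yields MINUTES_RATIO = 2.
let ratio = 2
# calcPeerTokenSetting caps each peer at 75% of (global volume * ratio)...
let peerVolume = trunc((10 * ratio).float * 0.75).int # = 15 tokens
# ...spread over (global period * ratio).
let peerPeriodMinutes = 2 * ratio # = 4 minutes

echo peerVolume, " tokens / ", peerPeriodMinutes, " min" # 15 tokens / 4 min
```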
diff --git a/tests/common/test_timed_map.nim b/tests/common/test_timed_map.nim
new file mode 100644
index 000000000..3b063d9dd
--- /dev/null
+++ b/tests/common/test_timed_map.nim
@@ -0,0 +1,60 @@
+{.used.}
+
+import unittest2
+import chronos/timer
+import ../../waku/common/rate_limit/timed_map
+
+suite "TimedMap":
+  test "put/get":
+    var cache = TimedMap[int, string].init(5.seconds)
+
+    let now = Moment.now()
+    check:
+      cache.mgetOrPut(1, "1", now) == "1"
+      cache.mgetOrPut(1, "1", now + 1.seconds) == "1"
+      cache.mgetOrPut(2, "2", now + 4.seconds) == "2"
+
+    check:
+      1 in cache
+      2 in cache
+
+    check:
+      cache.mgetOrPut(3, "3", now + 6.seconds) == "3"
+      # expires 1
+
+    check:
+      1 notin cache
+      2 in cache
+      3 in cache
+
+      cache.addedAt(2) == now + 4.seconds
+
+    check:
+      cache.mgetOrPut(2, "modified2", now + 8.seconds) == "2" # refreshes 2
+      cache.mgetOrPut(4, "4", now + 12.seconds) == "4" # expires 3
+
+    check:
+      2 in cache
+      3 notin cache
+      4 in cache
+
+    check:
+      cache.remove(4).isSome()
+      4 notin cache
+
+    check:
+      cache.mgetOrPut(100, "100", now + 100.seconds) == "100" # expires everything
+      100 in cache
+      2 notin cache
+
+  test "enough items to force cache heap storage growth":
+    var cache = TimedMap[int, string].init(5.seconds)
+
+    let now = Moment.now()
+    for i in 101 .. 100000:
+      check:
+        cache.mgetOrPut(i, $i, now) == $i
+
+    for i in 101 .. 100000:
+      check:
+        i in cache
diff --git a/tests/common/test_tokenbucket.nim b/tests/common/test_tokenbucket.nim
new file mode 100644
index 000000000..5bc1a0583
--- /dev/null
+++ b/tests/common/test_tokenbucket.nim
@@ -0,0 +1,69 @@
+# Chronos Test Suite
+# (c) Copyright 2022-Present
+# Status Research & Development GmbH
+#
+# Licensed under either of
+#  Apache License, version 2.0, (LICENSE-APACHEv2)
+#  MIT license (LICENSE-MIT)
+
+{.used.}
+
+import testutils/unittests
+import chronos
+import ../../waku/common/rate_limit/token_bucket
+
+suite "Token Bucket":
+  test "TokenBucket Sync test - strict":
+    var bucket = TokenBucket.newStrict(1000, 1.milliseconds)
+    let
+      start = Moment.now()
+      fullTime = start + 1.milliseconds
+    check:
+      bucket.tryConsume(800, start) == true
+      bucket.tryConsume(200, start) == true
+      # Out of budget
+      bucket.tryConsume(100, start) == false
+      bucket.tryConsume(800, fullTime) == true
+      bucket.tryConsume(200, fullTime) == true
+      # Out of budget
+      bucket.tryConsume(100, fullTime) == false
+
+  test "TokenBucket Sync test - compensating":
+    var bucket = TokenBucket.new(1000, 1.milliseconds)
+    let
+      start = Moment.now()
+      fullTime = start + 1.milliseconds
+    check:
+      bucket.tryConsume(800, start) == true
+      bucket.tryConsume(200, start) == true
+      # Out of budget
+      bucket.tryConsume(100, start) == false
+      bucket.tryConsume(800, fullTime) == true
+      bucket.tryConsume(200, fullTime) == true
+      # No budget was left over the previous period, so the full compensation applies and satisfies this request
+      bucket.tryConsume(100, fullTime) == true
+
+  test "TokenBucket Max compensation":
+    var bucket = TokenBucket.new(1000, 1.minutes)
+    var reqTime = Moment.now()
+
+    check bucket.tryConsume(1000, reqTime)
+    check bucket.tryConsume(1, reqTime) == false
+    reqTime += 1.minutes
+    check bucket.tryConsume(500, reqTime) == true
+    reqTime += 1.minutes
+    check bucket.tryConsume(1000, reqTime) == true
+    reqTime += 10.seconds
+    # max compensation is 25%, so try to consume 250 more
+    check bucket.tryConsume(250, reqTime) == true
+    reqTime += 49.seconds
+    # out of budget within the same period
+    check bucket.tryConsume(1, reqTime) == false
+
+  test "TokenBucket Short replenish":
+    var bucket = TokenBucket.new(15000, 1.milliseconds)
+    let start = Moment.now()
+    check bucket.tryConsume(15000, start)
+    check bucket.tryConsume(1, start) == false
+
+    check bucket.tryConsume(15000, start + 1.milliseconds) == true
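The "Max compensation" expectations above can be traced by hand through the compensating replenish logic added in waku/common/rate_limit/token_bucket.nim further down this diff. A minimal standalone sketch, assuming the repo-root import path used elsewhere in this PR:

```nim
import chronos/timer
import waku/common/rate_limit/token_bucket # assumed repo-root import path

var bucket = TokenBucket.new(1000, 1.minutes) # compensating mode is the default
var t = Moment.now()
doAssert bucket.tryConsume(1000, t) # drain the whole budget

t += 1.minutes
# A full period elapsed with nothing left over, so the refill adds the full
# compensation: min((1.0 - 0/1000/1.0) * 1000, 25% of 1000) = 250 extra tokens.
doAssert bucket.tryConsume(1250, t)
doAssert not bucket.tryConsume(1, t) # and not a token more within this period
```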
diff --git a/tests/waku_filter_v2/test_all.nim b/tests/waku_filter_v2/test_all.nim
index 4f09f28c2..a1a43c140 100644
--- a/tests/waku_filter_v2/test_all.nim
+++ b/tests/waku_filter_v2/test_all.nim
@@ -1,3 +1,4 @@
 {.used.}
 
-import ./test_waku_client, ./test_waku_filter_protocol
+import
+  ./test_waku_client, ./test_waku_filter_protocol, ./test_waku_filter_dos_protection
diff --git a/tests/waku_filter_v2/test_waku_filter_dos_protection.nim b/tests/waku_filter_v2/test_waku_filter_dos_protection.nim
new file mode 100644
index 000000000..ae84d9fa0
--- /dev/null
+++ b/tests/waku_filter_v2/test_waku_filter_dos_protection.nim
@@ -0,0 +1,183 @@
+{.used.}
+
+import
+  std/[options, tables, sequtils, strutils, json],
+  testutils/unittests,
+  stew/[results, byteutils],
+  chronos,
+  chronicles,
+  os,
+  libp2p/peerstore
+
+import
+  waku/[
+    node/peer_manager,
+    waku_core,
+    common/rate_limit/setting,
+    common/rate_limit/token_bucket,
+  ],
+  waku/waku_filter_v2/[common, client, subscriptions, protocol, rpc_codec],
+  ../testlib/[wakucore, testasync, testutils, futures, sequtils],
+  ./waku_filter_utils,
+  ../resources/payloads
+
+type AFilterClient = ref object of RootObj
+  clientSwitch*: Switch
+  wakuFilterClient*: WakuFilterClient
+  clientPeerId*: PeerId
+  messagePushHandler*: FilterPushHandler
+  msgSeq*: seq[(PubsubTopic, WakuMessage)]
+  pushHandlerFuture*: Future[(PubsubTopic, WakuMessage)]
+
+proc init(T: type[AFilterClient]): T =
+  var r = T(
+    clientSwitch: newStandardSwitch(),
+    msgSeq: @[],
+    pushHandlerFuture: newPushHandlerFuture(),
+  )
+  r.wakuFilterClient = waitFor newTestWakuFilterClient(r.clientSwitch)
+  r.messagePushHandler = proc(
+      pubsubTopic: PubsubTopic, message: WakuMessage
+  ): Future[void] {.async, closure, gcsafe.} =
+    r.msgSeq.add((pubsubTopic, message))
+    r.pushHandlerFuture.complete((pubsubTopic, message))
+
+  r.clientPeerId = r.clientSwitch.peerInfo.toRemotePeerInfo().peerId
+  r.wakuFilterClient.registerPushHandler(r.messagePushHandler)
+  return r
+
+proc subscribe(
+    client: AFilterClient,
+    serverRemotePeerInfo: RemotePeerInfo,
+    pubsubTopic: PubsubTopic,
+    contentTopicSeq: seq[ContentTopic],
+): Option[FilterSubscribeErrorKind] =
+  let subscribeResponse = waitFor client.wakuFilterClient.subscribe(
+    serverRemotePeerInfo, pubsubTopic, contentTopicSeq
+  )
+  if subscribeResponse.isOk():
+    return none[FilterSubscribeErrorKind]()
+
+  return some(subscribeResponse.error().kind)
+
+proc unsubscribe(
+    client: AFilterClient,
+    serverRemotePeerInfo: RemotePeerInfo,
+    pubsubTopic: PubsubTopic,
+    contentTopicSeq: seq[ContentTopic],
+): Option[FilterSubscribeErrorKind] =
+  let unsubscribeResponse = waitFor client.wakuFilterClient.unsubscribe(
+    serverRemotePeerInfo, pubsubTopic, contentTopicSeq
+  )
+  if unsubscribeResponse.isOk():
+    return none[FilterSubscribeErrorKind]()
+
+  return some(unsubscribeResponse.error().kind)
+
+proc ping(
+    client: AFilterClient, serverRemotePeerInfo: RemotePeerInfo
+): Option[FilterSubscribeErrorKind] =
+  let pingResponse = waitFor client.wakuFilterClient.ping(serverRemotePeerInfo)
+  if pingResponse.isOk():
+    return none[FilterSubscribeErrorKind]()
+
+  return some(pingResponse.error().kind)
+
+suite "Waku Filter - DOS protection":
+  var serverSwitch {.threadvar.}: Switch
+  var client1 {.threadvar.}: AFilterClient
+  var client2 {.threadvar.}: AFilterClient
+  var wakuFilter {.threadvar.}: WakuFilter
+  var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
+  var pubsubTopic {.threadvar.}: PubsubTopic
+  var contentTopic {.threadvar.}: ContentTopic
+  var contentTopicSeq {.threadvar.}: seq[ContentTopic]
+
+  asyncSetup:
+    client1 = AFilterClient.init()
+    client2 = AFilterClient.init()
+
+    pubsubTopic = DefaultPubsubTopic
+    contentTopic = DefaultContentTopic
+    contentTopicSeq = @[contentTopic]
+    serverSwitch = newStandardSwitch()
+    wakuFilter = await newTestWakuFilter(
+      serverSwitch, rateLimitSetting = some((3, 1000.milliseconds))
+    )
+
+    await allFutures(
+      serverSwitch.start(), client1.clientSwitch.start(), client2.clientSwitch.start()
+    )
+    serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
+    client1.clientPeerId = client1.clientSwitch.peerInfo.toRemotePeerInfo().peerId
+    client2.clientPeerId = client2.clientSwitch.peerInfo.toRemotePeerInfo().peerId
+
+  asyncTeardown:
+    await allFutures(
+      wakuFilter.stop(),
+      client1.wakuFilterClient.stop(),
+      client2.wakuFilterClient.stop(),
+      serverSwitch.stop(),
+      client1.clientSwitch.stop(),
+      client2.clientSwitch.stop(),
+    )
+
+  asyncTest "Limit number of subscriptions requests":
+    # Given
+    check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+    check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+
+    await sleepAsync(20.milliseconds)
+    check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+    check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+    await sleepAsync(20.milliseconds)
+    check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+    await sleepAsync(20.milliseconds)
+    check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
+    check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+    check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
+
+    # ensure a full period has passed so the clients can use the service again
+    await sleepAsync(600.milliseconds)
+    check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+    check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+
+  asyncTest "Ensure normal usage allowed":
+    # Given
+    check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+    check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
+
+    await sleepAsync(500.milliseconds)
+    check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
+    check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
+
+    await sleepAsync(500.milliseconds)
+    check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
+    check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
+
+    await sleepAsync(50.milliseconds)
+    check client1.unsubscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+    check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId) == false
+
+    await sleepAsync(50.milliseconds)
+    check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND)
+    check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND)
+    await sleepAsync(50.milliseconds)
+    check client1.ping(serverRemotePeerInfo) ==
+      some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
+
+    check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
+      none(FilterSubscribeErrorKind)
+    check wakuFilter.subscriptions.isSubscribed(client2.clientPeerId) == true
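The pass/reject pattern in the DOS-protection test follows from the (3, 1000.milliseconds) per-peer setting handed to newTestWakuFilter: every client gets its own compensating bucket of 3 requests per second. A self-contained sketch of that bucket's behaviour, with illustrative timings and the repo-root import path assumed:

```nim
import chronos/timer
import waku/common/rate_limit/token_bucket # assumed repo-root import path

var peerBucket = TokenBucket.new(3, 1000.milliseconds)
let t0 = Moment.now()
for i in 0 ..< 3:
  doAssert peerBucket.tryConsume(1, t0 + (i * 20).milliseconds) # burst of 3 passes
# a fourth request within the same second maps to TOO_MANY_REQUESTS
doAssert not peerBucket.tryConsume(1, t0 + 60.milliseconds)
# once a full period has passed since the bucket was last full, requests pass again
doAssert peerBucket.tryConsume(1, t0 + 1100.milliseconds)
```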
diff --git a/tests/waku_filter_v2/waku_filter_utils.nim b/tests/waku_filter_v2/waku_filter_utils.nim
index 446dee79d..5698949c5 100644
--- a/tests/waku_filter_v2/waku_filter_utils.nim
+++ b/tests/waku_filter_v2/waku_filter_utils.nim
@@ -7,6 +7,7 @@ import
     waku_filter_v2/client,
     waku_filter_v2/subscriptions,
     waku_core,
+    common/rate_limit/setting,
   ],
   ../testlib/[common, wakucore]
 
@@ -15,11 +16,16 @@ proc newTestWakuFilter*(
     subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
     maxFilterPeers: uint32 = MaxFilterPeers,
     maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
+    rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
 ): Future[WakuFilter] {.async.} =
   let
     peerManager = PeerManager.new(switch)
     proto = WakuFilter.new(
-      peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
+      peerManager,
+      subscriptionTimeout,
+      maxFilterPeers,
+      maxFilterCriteriaPerPeer,
+      rateLimitSetting = rateLimitSetting,
     )
 
   await proto.start()
diff --git a/tests/waku_lightpush/lightpush_utils.nim b/tests/waku_lightpush/lightpush_utils.nim
index f5c5ca5e6..45bbe125c 100644
--- a/tests/waku_lightpush/lightpush_utils.nim
+++ b/tests/waku_lightpush/lightpush_utils.nim
@@ -7,6 +7,7 @@ import
   waku/waku_core,
   waku/waku_lightpush,
   waku/waku_lightpush/[client, common],
+  waku/common/rate_limit/setting,
   ../testlib/[common, wakucore]
 
 proc newTestWakuLightpushNode*(
diff --git a/tests/waku_lightpush/test_ratelimit.nim b/tests/waku_lightpush/test_ratelimit.nim
index 280b49d42..148cca3c9 100644
--- a/tests/waku_lightpush/test_ratelimit.nim
+++ b/tests/waku_lightpush/test_ratelimit.nim
@@ -10,7 +10,7 @@ import
 import
   waku/[
     node/peer_manager,
-    common/ratelimit,
+    common/rate_limit/setting,
     waku_core,
     waku_lightpush,
     waku_lightpush/client,
diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim
index 0922ba322..1e6ef9e83 100644
--- a/tests/wakunode_rest/test_rest_lightpush.nim
+++ b/tests/wakunode_rest/test_rest_lightpush.nim
@@ -23,7 +23,7 @@ import
     waku_api/rest/lightpush/handlers as lightpush_api,
     waku_api/rest/lightpush/client as lightpush_api_client,
     waku_relay,
-    common/ratelimit,
+    common/rate_limit/setting,
   ],
   ../testlib/wakucore,
   ../testlib/wakunode,
diff --git a/waku/common/rate_limit/per_peer_limiter.nim b/waku/common/rate_limit/per_peer_limiter.nim
new file mode 100644
index 000000000..0d02fe0a8
--- /dev/null
+++ b/waku/common/rate_limit/per_peer_limiter.nim
@@ -0,0 +1,40 @@
+## PerPeerRateLimiter
+##
+## This class makes it easy to track usage of a service per PeerId.
+## The rate limit is applied separately to each peer upon first use, and the time period is counted per peer as well.
+## It uses the compensating replenish mode for peers to balance the load and allow fair usage of a service.
+
+{.push raises: [].}
+
+import std/[options, tables], chronos/timer, libp2p/stream/connection, libp2p/utility
+
+import ./[single_token_limiter, service_metrics], ../../utils/tableutils
+
+export token_bucket, setting, service_metrics
+
+type PerPeerRateLimiter* = ref object of RootObj
+  setting*: Option[RateLimitSetting]
+  peerBucket: Table[PeerId, Option[TokenBucket]]
+
+proc mgetOrPut(
+    perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId
+): var Option[TokenBucket] =
+  return perPeerRateLimiter.peerBucket.mgetOrPut(
+    peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Compensating)
+  )
+
+template checkUsageLimit*(
+    t: var PerPeerRateLimiter,
+    proto: string,
+    conn: Connection,
+    bodyWithinLimit, bodyRejected: untyped,
+) =
+  checkUsageLimit(t.mgetOrPut(conn.peerId), proto, conn, bodyWithinLimit, bodyRejected)
+
+proc unregister*(perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId) =
+  perPeerRateLimiter.peerBucket.del(peerId)
+
+proc unregister*(perPeerRateLimiter: var PerPeerRateLimiter, peerIds: seq[PeerId]) =
+  perPeerRateLimiter.peerBucket.keepItIf(key notin peerIds)
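For protocols that want this per-peer behaviour, the checkUsageLimit template wraps the served/rejected branches plus the metrics bookkeeping. A minimal sketch of the call shape, with a hypothetical protocol id and handler (it mirrors how waku_filter_v2/protocol.nim uses the template later in this diff):

```nim
import std/options
import chronos/timer
import libp2p/stream/connection
import waku/common/rate_limit/per_peer_limiter # assumed repo-root import path

let setting: RateLimitSetting = (30, 1.minutes) # hypothetical per-peer limit
var limiter = PerPeerRateLimiter(setting: some(setting))

proc handleRequest(conn: Connection) =
  limiter.checkUsageLimit("/vac/waku/example/1.0.0", conn):
    discard # within limit: serve the request ("served" counter incremented)
  do:
    discard # over limit: answer with a rate-limit error ("rejected" counter incremented)
```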
diff --git a/waku/common/rate_limit/request_limiter.nim b/waku/common/rate_limit/request_limiter.nim
new file mode 100644
index 000000000..70fcc4905
--- /dev/null
+++ b/waku/common/rate_limit/request_limiter.nim
@@ -0,0 +1,134 @@
+## RequestRateLimiter
+##
+## RequestRateLimiter is a general service protection mechanism.
+## While it applies an overall rate limit, it also ensures fair usage among peers.
+##
+## This is achieved by rejecting peers that constantly overuse the service while allowing others to use it
+## within the configured global limit.
+## Punished peers recover after a certain time period, provided they stop violating the limit.
+##
+## It works by calculating a ratio from the global limit and applying it to each peer.
+## Each peer is allowed tokens within ratio * the global time period.
+## The allowed tokens per peer are limited to 75% of ratio * the global token volume.
+##
+## This needs to be taken into account when setting the global limit for the specific service type and use cases.
+
+{.push raises: [].}
+
+import
+  std/[options, math],
+  chronicles,
+  chronos/timer,
+  libp2p/stream/connection,
+  libp2p/utility
+
+import ./[single_token_limiter, service_metrics, timed_map]
+
+export token_bucket, setting, service_metrics
+
+logScope:
+  topics = "waku ratelimit"
+
+const PER_PEER_ALLOWED_PERCENT_OF_VOLUME = 0.75
+const UNLIMITED_RATIO = 0
+const UNLIMITED_TIMEOUT = 0.seconds
+const MILISECONDS_RATIO = 10
+const SECONDS_RATIO = 3
+const MINUTES_RATIO = 2
+
+type RequestRateLimiter* = ref object of RootObj
+  tokenBucket: Option[TokenBucket]
+  setting*: Option[RateLimitSetting]
+  peerBucketSetting*: RateLimitSetting
+  peerUsage: TimedMap[PeerId, TokenBucket]
+
+proc mgetOrPut(
+    requestRateLimiter: var RequestRateLimiter, peerId: PeerId
+): var TokenBucket =
+  let bucketForNew = newTokenBucket(some(requestRateLimiter.peerBucketSetting)).valueOr:
+    raiseAssert "This branch is not allowed to be reached as it will not be called if the setting is None."
+
+  return requestRateLimiter.peerUsage.mgetOrPut(peerId, bucketForNew)
+
+proc checkUsage*(
+    t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now()
+): bool {.raises: [].} =
+  if t.tokenBucket.isNone():
+    return true
+
+  let peerBucket = t.mgetOrPut(conn.peerId)
+  ## Check that the requesting peer's usage is within the calculated per-peer ratio;
+  ## peers that have not requested much recently (or not this time) are let through.
+  if not peerBucket.tryConsume(1, now):
+    trace "peer usage limit reached", peer = conn.peerId
+    return false
+
+  # The peer may consume; now check the overall budget we have left
+  let tokenBucket = t.tokenBucket.get()
+  if not tokenBucket.tryConsume(1, now):
+    return false
+
+  return true
+
+template checkUsageLimit*(
+    t: var RequestRateLimiter,
+    proto: string,
+    conn: Connection,
+    bodyWithinLimit, bodyRejected: untyped,
+) =
+  if t.checkUsage(proto, conn):
+    waku_service_requests.inc(labelValues = [proto, "served"])
+    bodyWithinLimit
+  else:
+    waku_service_requests.inc(labelValues = [proto, "rejected"])
+    bodyRejected
+
+# TODO: review these ratio assumptions! Debatable!
+func calcPeriodRatio(settingOpt: Option[RateLimitSetting]): int =
+  settingOpt.withValue(setting):
+    if setting.isUnlimited():
+      return UNLIMITED_RATIO
+
+    if setting.period <= 1.seconds:
+      return MILISECONDS_RATIO
+
+    if setting.period <= 1.minutes:
+      return SECONDS_RATIO
+
+    return MINUTES_RATIO
+  do:
+    # when setting is none
+    return UNLIMITED_RATIO
+
+# Calculates the peer cache item timeout.
+# Effectively, a peer that does not issue any requests for this amount of time will be forgotten.
+func calcCacheTimeout(settingOpt: Option[RateLimitSetting], ratio: int): Duration =
+  settingOpt.withValue(setting):
+    if setting.isUnlimited():
+      return UNLIMITED_TIMEOUT
+
+    # The cache timeout for peers is double the peers' replenish period
+    return setting.period * ratio * 2
+  do:
+    # when setting is none
+    return UNLIMITED_TIMEOUT
+
+func calcPeerTokenSetting(
+    setting: Option[RateLimitSetting], ratio: int
+): RateLimitSetting =
+  let s = setting.valueOr:
+    return (0, 0.minutes)
+
+  let peerVolume =
+    trunc((s.volume * ratio).float * PER_PEER_ALLOWED_PERCENT_OF_VOLUME).int
+  let peerPeriod = s.period * ratio
+
+  return (peerVolume, peerPeriod)
+
+proc newRequestRateLimiter*(setting: Option[RateLimitSetting]): RequestRateLimiter =
+  let ratio = calcPeriodRatio(setting)
+  return RequestRateLimiter(
+    tokenBucket: newTokenBucket(setting),
+    setting: setting,
+    peerBucketSetting: calcPeerTokenSetting(setting, ratio),
+    peerUsage: init(TimedMap[PeerId, TokenBucket], calcCacheTimeout(setting, ratio)),
+  )
diff --git a/waku/common/waku_service_metrics.nim b/waku/common/rate_limit/service_metrics.nim
similarity index 100%
rename from waku/common/waku_service_metrics.nim
rename to waku/common/rate_limit/service_metrics.nim
diff --git a/waku/common/rate_limit/setting.nim b/waku/common/rate_limit/setting.nim
new file mode 100644
index 000000000..420be9f71
--- /dev/null
+++ b/waku/common/rate_limit/setting.nim
@@ -0,0 +1,19 @@
+{.push raises: [].}
+
+import chronos/timer
+
+# Setting for TokenBucket, defined as a volume over a period of time
+type RateLimitSetting* = tuple[volume: int, period: Duration]
+
+# Set the default to switch off rate limiting for now
+let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (0, 0.minutes)
+
+proc isUnlimited*(t: RateLimitSetting): bool {.inline.} =
+  return t.volume <= 0 or t.period <= 0.seconds
+
+func `$`*(t: RateLimitSetting): string {.inline.} =
+  return
+    if t.isUnlimited():
+      "no-limit"
+    else:
+      $t.volume & "/" & $t.period
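To make the setting semantics concrete: any non-positive volume or period is interpreted as "unlimited". A small illustration with hypothetical values:

```nim
import chronos/timer
import waku/common/rate_limit/setting # assumed repo-root import path

let limited: RateLimitSetting = (30, 1.minutes)
let unlimited: RateLimitSetting = (0, 0.minutes)

doAssert not limited.isUnlimited()
doAssert unlimited.isUnlimited()
echo $limited # e.g. "30/1m" (exact Duration rendering comes from chronos)
echo $unlimited # "no-limit"
```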
diff --git a/waku/common/rate_limit/single_token_limiter.nim b/waku/common/rate_limit/single_token_limiter.nim
new file mode 100644
index 000000000..1b62114bf
--- /dev/null
+++ b/waku/common/rate_limit/single_token_limiter.nim
@@ -0,0 +1,50 @@
+## This module adds usage-check helpers for simple rate limiting with the use of TokenBucket.
+
+{.push raises: [].}
+
+import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility
+
+import ./[token_bucket, setting, service_metrics]
+export token_bucket, setting, service_metrics
+
+proc newTokenBucket*(
+    setting: Option[RateLimitSetting],
+    replenishMode: ReplenishMode = ReplenishMode.Compensating,
+): Option[TokenBucket] =
+  if setting.isNone():
+    return none[TokenBucket]()
+
+  if setting.get().isUnlimited():
+    return none[TokenBucket]()
+
+  # honor the requested replenish mode
+  return some(TokenBucket.new(setting.get().volume, setting.get().period, replenishMode))
+
+proc checkUsage(
+    t: var TokenBucket, proto: string, now = Moment.now()
+): bool {.raises: [].} =
+  if not t.tryConsume(1, now):
+    return false
+
+  return true
+
+proc checkUsage(
+    t: var Option[TokenBucket], proto: string, now = Moment.now()
+): bool {.raises: [].} =
+  if t.isNone():
+    return true
+
+  var tokenBucket = t.get()
+  return checkUsage(tokenBucket, proto, now)
+
+template checkUsageLimit*(
+    t: var Option[TokenBucket] | var TokenBucket,
+    proto: string,
+    conn: Connection,
+    bodyWithinLimit, bodyRejected: untyped,
+) =
+  if t.checkUsage(proto):
+    waku_service_requests.inc(labelValues = [proto, "served"])
+    bodyWithinLimit
+  else:
+    waku_service_requests.inc(labelValues = [proto, "rejected"])
+    bodyRejected
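The single-bucket helper above is the simplest wiring: one shared bucket guards the whole service, and none() disables limiting entirely. A minimal sketch with a hypothetical protocol id:

```nim
import std/options
import chronos/timer
import libp2p/stream/connection
import waku/common/rate_limit/single_token_limiter # assumed repo-root import path

let setting: RateLimitSetting = (100, 1.seconds) # hypothetical service-wide limit
var bucket = newTokenBucket(some(setting)) # none() here would mean "no limit"

proc handleRequest(conn: Connection) =
  bucket.checkUsageLimit("/vac/waku/example/1.0.0", conn):
    discard # serve the request
  do:
    discard # reject the request
```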
diff --git a/waku/common/rate_limit/timed_map.nim b/waku/common/rate_limit/timed_map.nim
new file mode 100644
index 000000000..b05dfb0fb
--- /dev/null
+++ b/waku/common/rate_limit/timed_map.nim
@@ -0,0 +1,162 @@
+## TimedMap
+## ===========
+## Inspired by nim-libp2p's TimedCache class. It uses the same approach to prune
+## untouched items from the map once the configured timeout duration is reached.
+## Unlike TimedCache, however, TimedMap is capable of holding and returning any type of value for a key.
+##
+## - `mgetOrPut` proc is similar to std/tables, but will renew the timeout for the key.
+## - For a non-renewing check, use the `contains` proc.
+## - `expire` proc will remove all items that have expired.
+##
+## Choose the timeout to suit your needs; it controls the size of the map.
+
+{.push raises: [].}
+
+import std/[hashes, sets]
+import chronos/timer, results
+import libp2p/utility
+
+export results
+
+type
+  TimedEntry[K, V] = ref object of RootObj
+    key: K
+    value: V
+    addedAt: Moment
+    expiresAt: Moment
+    next, prev: TimedEntry[K, V]
+
+  TimedMap*[K, V] = object of RootObj
+    head, tail: TimedEntry[K, V] # nim linked list doesn't allow inserting at pos
+    entries: HashSet[TimedEntry[K, V]]
+    timeout: Duration
+
+func `==`*[K, V](a, b: TimedEntry[K, V]): bool =
+  if isNil(a) == isNil(b):
+    isNil(a) or a.key == b.key
+  else:
+    false
+
+func hash*(a: TimedEntry): Hash =
+  if isNil(a):
+    default(Hash)
+  else:
+    hash(a[].key)
+
+func `$`*[T](a: T): string =
+  if isNil(a):
+    return "nil"
+
+  return $a
+
+func `$`*[K, V](a: TimedEntry[K, V]): string =
+  if isNil(a):
+    return "nil"
+
+  return
+    "TimedEntry: key:" & $a.key & ", val:" & $a.value & ", addedAt:" & $a.addedAt &
+    ", expiresAt:" & $a.expiresAt
+
+func expire*(t: var TimedMap, now: Moment = Moment.now()) =
+  while t.head != nil and t.head.expiresAt <= now:
+    t.entries.excl(t.head)
+    t.head.prev = nil
+    t.head = t.head.next
+    if t.head == nil:
+      t.tail = nil
+
+func del[K, V](t: var TimedMap[K, V], key: K): Opt[TimedEntry[K, V]] =
+  # Removes an existing key from the map, returning the previous item if present
+  let tmp = TimedEntry[K, V](key: key)
+  if tmp in t.entries:
+    let item =
+      try:
+        t.entries[tmp] # use the shared instance in the set
+      except KeyError:
+        raiseAssert "just checked"
+    t.entries.excl(item)
+
+    if t.head == item:
+      t.head = item.next
+    if t.tail == item:
+      t.tail = item.prev
+
+    if item.next != nil:
+      item.next.prev = item.prev
+    if item.prev != nil:
+      item.prev.next = item.next
+    Opt.some(item)
+  else:
+    Opt.none(TimedEntry[K, V])
+
+func remove*[K, V](t: var TimedMap[K, V], key: K): Opt[V] =
+  # Removes an existing key from the map, returning the previous value if present.
+  # Public version of `del` that avoids exporting TimedEntry.
+  let deleted = t.del(key)
+  if deleted.isSome():
+    Opt.some(deleted[].value)
+  else:
+    Opt.none(V)
+
+proc mgetOrPut*[K, V](t: var TimedMap[K, V], k: K, v: V, now = Moment.now()): var V =
+  # Puts (k, v) in the map and returns the value stored for k. If the item was
+  # already present, its original value and addedAt are kept, and its expiry
+  # timer is refreshed.
+  t.expire(now)
+
+  let
+    previous = t.del(k) # Refresh existing item
+    addedAt =
+      if previous.isSome():
+        previous[].addedAt
+      else:
+        now
+    value =
+      if previous.isSome():
+        previous[].value
+      else:
+        v
+
+  let node =
+    TimedEntry[K, V](key: k, value: value, addedAt: addedAt, expiresAt: now + t.timeout)
+  if t.head == nil:
+    t.tail = node
+    t.head = t.tail
+  else:
+    # search from tail because typically that's where we add when now grows
+    var cur = t.tail
+    while cur != nil and node.expiresAt < cur.expiresAt:
+      cur = cur.prev
+
+    if cur == nil:
+      node.next = t.head
+      t.head.prev = node
+      t.head = node
+    else:
+      node.prev = cur
+      node.next = cur.next
+      cur.next = node
+      if cur == t.tail:
+        t.tail = node
+
+  t.entries.incl(node)
+
+  return node.value
+
+func contains*[K, V](t: TimedMap[K, V], k: K): bool =
+  let tmp = TimedEntry[K, V](key: k)
+  tmp in t.entries
+
+func addedAt*[K, V](t: var TimedMap[K, V], k: K): Moment =
+  let tmp = TimedEntry[K, V](key: k)
+  try:
+    if tmp in t.entries: # raising is slow
+      # Use the shared instance from entries
+      return t.entries[tmp][].addedAt
+  except KeyError:
+    raiseAssert "just checked"
+
+  default(Moment)
+
+func init*[K, V](T: type TimedMap[K, V], timeout: Duration): T =
+  T(timeout: timeout)
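One subtlety of mgetOrPut above is worth spelling out: re-putting a live key keeps the originally stored value and its addedAt, and only pushes expiresAt forward. A minimal sketch, assuming the repo-root import path:

```nim
import chronos/timer
import waku/common/rate_limit/timed_map # assumed repo-root import path

var m = TimedMap[string, int].init(5.seconds)
let t0 = Moment.now()

discard m.mgetOrPut("peer", 1, t0)
# A second put for a live key refreshes the expiry but keeps the stored value:
doAssert m.mgetOrPut("peer", 42, t0 + 4.seconds) == 1
doAssert m.addedAt("peer") == t0
# Without that refresh the entry would have expired at t0 + 5 seconds:
doAssert m.mgetOrPut("other", 2, t0 + 8.seconds) == 2
doAssert "peer" in m # still alive until t0 + 9 seconds
```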
diff --git a/waku/common/rate_limit/token_bucket.nim b/waku/common/rate_limit/token_bucket.nim
new file mode 100644
index 000000000..799817ebd
--- /dev/null
+++ b/waku/common/rate_limit/token_bucket.nim
@@ -0,0 +1,182 @@
+{.push raises: [].}
+
+import chronos, std/math, std/options
+
+const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25
+
+## This is an extract from chronos/ratelimit.nim due to a bug found in the original implementation.
+## Unfortunately that bug cannot be fixed without harming the original features of the TokenBucket class,
+## so this shortcut is used to move ahead with the nwaku rate limiter implementation.
+## ref: https://github.com/status-im/nim-chronos/issues/500
+##
+## This version of TokenBucket differs from the original one in chronos/ratelimit.nim in many ways:
+## - It has a new mode called `Compensating`, which is the default mode.
+##   Compensation is calculated as the average unused bucket capacity over the last measured period(s),
+##   capped at the allowed compensation threshold (currently a const 25%).
+##   Compensation also takes care of proper time period calculation, so that non-usage periods
+##   cannot lead to overcompensation.
+## - Strict mode is also available; it only replenishes when the time period is over, refilling
+##   the bucket to max capacity.
+
+type
+  ReplenishMode* = enum
+    Strict
+    Compensating
+
+  TokenBucket* = ref object
+    budget: int ## Current number of tokens in the bucket
+    budgetCap: int ## Bucket capacity
+    lastTimeFull: Moment
+      ## This timestamp anchors the periodization of the bucket refilling
+    fillDuration: Duration ## Refill period
+    case replenishMode*: ReplenishMode
+    of Strict:
+      ## In strict mode, the bucket is refilled only up to the budgetCap
+      discard
+    of Compensating:
+      ## This is the default mode.
+      maxCompensation: float
+
+func periodDistance(bucket: TokenBucket, currentTime: Moment): float =
+  ## Notice fillDuration cannot be zero by design.
+  ## The period distance is a float representing the number of periods elapsed
+  ## since the last time the bucket was refilled.
+  return
+    nanoseconds(currentTime - bucket.lastTimeFull).float /
+    nanoseconds(bucket.fillDuration).float
+
+func getUsageAverageSince(bucket: TokenBucket, distance: float): float =
+  if distance == 0.float:
+    ## with zero time difference the usage percentage is 100%
+    return 1.0
+
+  ## budgetCap can never be zero.
+  ## The average is the fraction of total capacity still available over
+  ## the measured period.
+  return bucket.budget.float / bucket.budgetCap.float / distance
+
+proc calcCompensation(bucket: TokenBucket, averageUsage: float): int =
+  # if we already fully used or even overused the tokens, there is no room for compensation
+  if averageUsage >= 1.0:
+    return 0
+
+  ## Compensation is the average unused bucket capacity over the last measured period(s),
+  ## capped at the allowed compensation threshold.
+  let compensationPercent =
+    min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation)
+  return trunc(compensationPercent).int
+
+func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool =
+  return currentTime - bucket.lastTimeFull >= bucket.fillDuration
+
+## Update takes place when the bucket is empty while trying to consume tokens.
+## It checks whether the bucket can be replenished, i.e. whether the refill duration has passed.
+## - strict mode:
+proc updateStrict(bucket: TokenBucket, currentTime: Moment) =
+  if bucket.fillDuration == default(Duration):
+    bucket.budget = min(bucket.budgetCap, bucket.budget)
+    return
+
+  if not periodElapsed(bucket, currentTime):
+    return
+
+  bucket.budget = bucket.budgetCap
+  bucket.lastTimeFull = currentTime
+
+## - compensating - balancing load:
+##   - between updates we calculate the average load (current bucket capacity / number of periods since the last update)
+##   - this gives the percentage of load used recently
+##   - with it we can replenish the bucket up to 100% + the calculated leftover from the previous period (capped with the max threshold)
+proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) =
+  if bucket.fillDuration == default(Duration):
+    bucket.budget = min(bucket.budgetCap, bucket.budget)
+    return
+
+  # do not replenish within the same period
+  if not periodElapsed(bucket, currentTime):
+    return
+
+  let distance = bucket.periodDistance(currentTime)
+  let recentAvgUsage = bucket.getUsageAverageSince(distance)
+  let compensation = bucket.calcCompensation(recentAvgUsage)
+
+  bucket.budget = bucket.budgetCap + compensation
+  bucket.lastTimeFull = currentTime
+
+proc update(bucket: TokenBucket, currentTime: Moment) =
+  if bucket.replenishMode == ReplenishMode.Compensating:
+    updateWithCompensation(bucket, currentTime)
+  else:
+    updateStrict(bucket, currentTime)
+
+proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
+  ## If `tokens` are available, consume them.
+  ## Otherwise, return false.
+
+  if bucket.budget >= bucket.budgetCap:
+    bucket.lastTimeFull = now
+
+  if bucket.budget >= tokens:
+    bucket.budget -= tokens
+    return true
+
+  bucket.update(now)
+
+  if bucket.budget >= tokens:
+    bucket.budget -= tokens
+    return true
+  else:
+    return false
+
+proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
+  ## Add `tokens` to the budget (capped to the bucket capacity)
+  bucket.budget += tokens
+  bucket.update(now)
+
+proc new*(
+    T: type[TokenBucket],
+    budgetCap: int,
+    fillDuration: Duration = 1.seconds,
+    mode: ReplenishMode = ReplenishMode.Compensating,
+): T =
+  assert not isZero(fillDuration)
+  assert budgetCap != 0
+
+  ## Create a TokenBucket with the requested replenish mode
+  case mode
+  of ReplenishMode.Strict:
+    return T(
+      budget: budgetCap,
+      budgetCap: budgetCap,
+      fillDuration: fillDuration,
+      lastTimeFull: Moment.now(),
+      replenishMode: mode,
+    )
+  of ReplenishMode.Compensating:
+    T(
+      budget: budgetCap,
+      budgetCap: budgetCap,
+      fillDuration: fillDuration,
+      lastTimeFull: Moment.now(),
+      replenishMode: mode,
+      maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
+    )
+
+proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket =
+  T.new(capacity, period, ReplenishMode.Strict)
+
+proc newCompensating*(
+    T: type[TokenBucket], capacity: int, period: Duration
+): TokenBucket =
+  T.new(capacity, period, ReplenishMode.Compensating)
+
+func `$`*(b: TokenBucket): string {.inline.} =
+  if isNil(b):
+    return "nil"
+  return $b.budgetCap & "/" & $b.fillDuration
+
+func `$`*(ob: Option[TokenBucket]): string {.inline.} =
+  if ob.isNone():
+    return "no-limit"
+
+  return $ob.get()
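To contrast the two replenish modes defined above: after an identical fully-drained period, strict refills exactly to capacity, while compensating additionally grants the unused-capacity credit, capped at 25%. A minimal sketch, assuming the repo-root import path:

```nim
import chronos/timer
import waku/common/rate_limit/token_bucket # assumed repo-root import path

let t0 = Moment.now()
var strict = TokenBucket.newStrict(1000, 1.minutes)
var comp = TokenBucket.new(1000, 1.minutes) # compensating is the default

doAssert strict.tryConsume(1000, t0)
doAssert comp.tryConsume(1000, t0)

# One period later, strict refills to exactly 1000 tokens...
doAssert strict.tryConsume(1000, t0 + 1.minutes)
doAssert not strict.tryConsume(1, t0 + 1.minutes)

# ...while compensating refills to 1000 plus the capped credit of 250.
doAssert comp.tryConsume(1000, t0 + 1.minutes)
doAssert comp.tryConsume(250, t0 + 1.minutes)
doAssert not comp.tryConsume(1, t0 + 1.minutes)
```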
diff --git a/waku/common/ratelimit.nim b/waku/common/ratelimit.nim
deleted file mode 100644
index 1c40cd42a..000000000
--- a/waku/common/ratelimit.nim
+++ /dev/null
@@ -1,53 +0,0 @@
-{.push raises: [].}
-
-import std/options, chronos/timer, libp2p/stream/connection
-
-import ./tokenbucket
-
-export tokenbucket
-
-type RateLimitSetting* = tuple[volume: int, period: Duration]
-
-# Set the default to switch off rate limiting for now
-let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (0, 0.minutes)
-
-proc newTokenBucket*(setting: Option[RateLimitSetting]): Option[TokenBucket] =
-  if setting.isNone:
-    return none[TokenBucket]()
-
-  let (volume, period) = setting.get()
-  if volume <= 0 or period <= 0.seconds:
-    return none[TokenBucket]()
-
-  return some(TokenBucket.new(volume, period))
-
-proc checkUsage(
-    t: var Option[TokenBucket], proto: string, conn: Connection
-): bool {.raises: [].} =
-  if t.isNone():
-    return true
-
-  let tokenBucket = t.get()
-  if not tokenBucket.tryConsume(1):
-    return false
-
-  return true
-
-template checkUsageLimit*(
-    t: var Option[TokenBucket],
-    proto: string,
-    conn: Connection,
-    bodyWithinLimit, bodyRejected: untyped,
-) =
-  if t.checkUsage(proto, conn):
-    waku_service_requests.inc(labelValues = [proto, "served"])
-    bodyWithinLimit
-  else:
-    waku_service_requests.inc(labelValues = [proto, "rejected"])
-    bodyRejected
-
-func `$`*(ob: Option[TokenBucket]): string {.inline.} =
-  if ob.isNone():
-    return "no-limit"
-
-  return $ob.get()
diff --git a/waku/common/tokenbucket.nim b/waku/common/tokenbucket.nim
deleted file mode 100644
index a35939f7a..000000000
--- a/waku/common/tokenbucket.nim
+++ /dev/null
@@ -1,64 +0,0 @@
-{.push raises: [].}
-
-import chronos
-
-## This is an extract from chronos/ratelimit.nim due to the found bug in the original implementation.
-## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class.
-## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation.
-type TokenBucket* = ref object
-  budget*: int ## Current number of tokens in the bucket
-  budgetCap: int ## Bucket capacity
-  lastTimeFull: Moment
-    ## This timer measures the proper periodizaiton of the bucket refilling
-  fillDuration: Duration ## Refill period
-
-## Update will take place if bucket is empty and trying to consume tokens.
-## It checks if the bucket can be replenished as refill duration is passed or not.
-proc update(bucket: TokenBucket, currentTime: Moment) =
-  if bucket.fillDuration == default(Duration):
-    bucket.budget = min(bucket.budgetCap, bucket.budget)
-    return
-
-  let timeDeltaFromLastFull = currentTime - bucket.lastTimeFull
-
-  if timeDeltaFromLastFull.milliseconds < bucket.fillDuration.milliseconds:
-    return
-
-  bucket.budget = bucket.budgetCap
-  bucket.lastTimeFull = currentTime
-
-proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
-  ## If `tokens` are available, consume them,
-  ## Otherwhise, return false.
-
-  if bucket.budget == bucket.budgetCap:
-    bucket.lastTimeFull = now
-
-  if bucket.budget >= tokens:
-    bucket.budget -= tokens
-    return true
-
-  bucket.update(now)
-
-  if bucket.budget >= tokens:
-    bucket.budget -= tokens
-    return true
-  else:
-    return false
-
-proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
-  ## Add `tokens` to the budget (capped to the bucket capacity)
-  bucket.budget += tokens
-  bucket.update(now)
-
-proc new*(T: type[TokenBucket], budgetCap: int, fillDuration: Duration = 1.seconds): T =
-  ## Create a TokenBucket
-  T(
-    budget: budgetCap,
-    budgetCap: budgetCap,
-    fillDuration: fillDuration,
-    lastTimeFull: Moment.now(),
-  )
-
-func `$`*(b: TokenBucket): string {.inline.} =
-  return $b.budgetCap & "/" & $b.fillDuration
diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim
index 53995a0ff..80e59cd98 100644
--- a/waku/factory/node_factory.nim
+++ b/waku/factory/node_factory.nim
@@ -36,7 +36,7 @@ import
   ../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
   ../waku_lightpush/common,
   ../common/utils/parse_size_units,
-  ../common/ratelimit
+  ../common/rate_limit/setting
 
 ## Peer persistence
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index 23d9799c3..3a1e0ba45 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -48,7 +48,7 @@ import
   ../waku_rln_relay,
   ./config,
   ./peer_manager,
-  ../common/ratelimit
+  ../common/rate_limit/setting
 
 declarePublicCounter waku_node_messages, "number of messages received", ["type"]
 declarePublicHistogram waku_histogram_message_size,
@@ -442,13 +442,18 @@ proc mountFilter*(
     maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
     maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer,
     messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL,
+    rateLimitSetting: RateLimitSetting = FilterPerPeerRateLimit,
 ) {.async: (raises: []).} =
   ## Mounting filter v2 protocol
 
   info "mounting filter protocol"
   node.wakuFilter = WakuFilter.new(
-    node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer,
+    node.peerManager,
+    subscriptionTimeout,
+    maxFilterPeers,
+    maxFilterCriteriaPerPeer,
     messageCacheTTL,
+    some(rateLimitSetting),
   )
 
   if node.started:
diff --git a/waku/waku_api/rest/filter/handlers.nim b/waku/waku_api/rest/filter/handlers.nim
index 29c5d7a26..d684335f7 100644
--- a/waku/waku_api/rest/filter/handlers.nim
+++ b/waku/waku_api/rest/filter/handlers.nim
@@ -71,7 +71,8 @@ proc getStatusDesc(
     of FilterSubscribeErrorKind.PEER_DIAL_FAILURE:
       err.address
     of FilterSubscribeErrorKind.BAD_RESPONSE, FilterSubscribeErrorKind.BAD_REQUEST,
-        FilterSubscribeErrorKind.NOT_FOUND, FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
+        FilterSubscribeErrorKind.NOT_FOUND, FilterSubscribeErrorKind.TOO_MANY_REQUESTS,
+        FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
       err.cause
     of FilterSubscribeErrorKind.UNKNOWN:
       "UNKNOWN"
@@ -111,6 +112,8 @@ proc convertErrorKindToHttpStatus(
     return Http400
   of filter_protocol_type.FilterSubscribeErrorKind.NOT_FOUND:
     return Http404
+  of filter_protocol_type.FilterSubscribeErrorKind.TOO_MANY_REQUESTS:
+    return Http429
   of filter_protocol_type.FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
     return Http503
diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim
index 6823972b2..07b67a8b2 100644
--- a/waku/waku_filter_v2/client.nim
+++ b/waku/waku_filter_v2/client.nim
@@ -69,7 +69,9 @@ proc sendSubscribeRequest(
 
   let response = respDecodeRes.get()
 
-  if response.requestId != filterSubscribeRequest.requestId:
+  # the DOS-protection rate limit check does not know about the request id
+  if response.statusCode != FilterSubscribeErrorKind.TOO_MANY_REQUESTS.uint32 and
+      response.requestId != filterSubscribeRequest.requestId:
     trace "Filter subscribe response requestId mismatch", servicePeer, response
     waku_filter_errors.inc(labelValues = [requestIdMismatch])
     return err(FilterSubscribeError.badResponse(requestIdMismatch))
diff --git a/waku/waku_filter_v2/common.nim b/waku/waku_filter_v2/common.nim
index 31adb56eb..ad0f23fdb 100644
--- a/waku/waku_filter_v2/common.nim
+++ b/waku/waku_filter_v2/common.nim
@@ -12,6 +12,7 @@ type
     BAD_RESPONSE = uint32(300)
     BAD_REQUEST = uint32(400)
     NOT_FOUND = uint32(404)
+    TOO_MANY_REQUESTS = uint32(429)
     SERVICE_UNAVAILABLE = uint32(503)
     PEER_DIAL_FAILURE = uint32(504)
 
@@ -19,7 +20,7 @@ type
     case kind*: FilterSubscribeErrorKind
     of PEER_DIAL_FAILURE:
       address*: string
-    of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, SERVICE_UNAVAILABLE:
+    of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, TOO_MANY_REQUESTS, SERVICE_UNAVAILABLE:
       cause*: string
     else:
       discard
@@ -49,6 +50,11 @@ proc notFound*(
 ): FilterSubscribeError =
   FilterSubscribeError(kind: FilterSubscribeErrorKind.NOT_FOUND, cause: cause)
 
+proc tooManyRequests*(
+    T: type FilterSubscribeError, cause = "too many requests"
+): FilterSubscribeError =
+  FilterSubscribeError(kind: FilterSubscribeErrorKind.TOO_MANY_REQUESTS, cause: cause)
+
 proc serviceUnavailable*(
     T: type FilterSubscribeError, cause = "service unavailable"
 ): FilterSubscribeError =
@@ -56,7 +62,7 @@ proc serviceUnavailable*(
 
 proc parse*(T: type FilterSubscribeErrorKind, kind: uint32): T =
   case kind
-  of 000, 200, 300, 400, 404, 503:
+  of 000, 200, 300, 400, 404, 429, 503:
     FilterSubscribeErrorKind(kind)
   else:
     FilterSubscribeErrorKind.UNKNOWN
@@ -66,7 +72,7 @@ proc parse*(T: type FilterSubscribeError, kind: uint32, cause = "", address = ""
   case kind
   of PEER_DIAL_FAILURE:
     FilterSubscribeError(kind: kind, address: address)
-  of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, SERVICE_UNAVAILABLE:
+  of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, TOO_MANY_REQUESTS, SERVICE_UNAVAILABLE:
     FilterSubscribeError(kind: kind, cause: cause)
   else:
     FilterSubscribeError(kind: kind)
@@ -81,6 +87,8 @@ proc `$`*(err: FilterSubscribeError): string =
     "BAD_REQUEST: " & err.cause
   of FilterSubscribeErrorKind.NOT_FOUND:
     "NOT_FOUND: " & err.cause
+  of FilterSubscribeErrorKind.TOO_MANY_REQUESTS:
+    "TOO_MANY_REQUESTS: " & err.cause
   of FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
     "SERVICE_UNAVAILABLE: " & err.cause
   of FilterSubscribeErrorKind.UNKNOWN:
diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim
index 5b87d6148..588ae7809 100644
--- a/waku/waku_filter_v2/protocol.nim
+++ b/waku/waku_filter_v2/protocol.nim
@@ -13,11 +13,8 @@ import
 import
   ../node/peer_manager,
   ../waku_core,
-  ./common,
-  ./protocol_metrics,
-  ./rpc_codec,
-  ./rpc,
-  ./subscriptions
+  ../common/rate_limit/per_peer_limiter,
+  ./[common, protocol_metrics, rpc_codec, rpc, subscriptions]
 
 logScope:
   topics = "waku filter"
@@ -30,6 +27,7 @@ type WakuFilter* = ref object of LPProtocol
   peerManager: PeerManager
   maintenanceTask: TimerCallback
   messageCache: TimedCache[string]
+  peerRequestRateLimiter*: PerPeerRateLimiter
 
 proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
   trace "pinging subscriber", peerId = peerId
@@ -97,6 +95,9 @@ proc unsubscribe(
   wf.subscriptions.removeSubscription(peerId, filterCriteria).isOkOr:
     return err(FilterSubscribeError.notFound())
 
+  ## Note: we do not remove the peer from peerRequestRateLimiter here, to prevent gaming it with a subscribe/unsubscribe loop.
+  ## A peer is removed only when peerManager removes it.
+
   ok()
 
 proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
@@ -167,6 +168,9 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
     return
 
   await conn.get().writeLp(buffer)
+  waku_service_network_bytes.inc(
+    amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
+  )
 
 proc pushToPeers(
     wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush
@@ -210,6 +214,7 @@ proc maintainSubscriptions*(wf: WakuFilter) =
   if peersToRemove.len > 0:
     wf.subscriptions.removePeers(peersToRemove)
+    wf.peerRequestRateLimiter.unregister(peersToRemove)
 
   wf.subscriptions.cleanUp()
 
@@ -267,21 +272,36 @@ proc initProtocolHandler(wf: WakuFilter) =
   proc handler(conn: Connection, proto: string) {.async.} =
     trace "filter subscribe request handler triggered", peer_id = shortLog(conn.peerId)
 
-    let buf = await conn.readLp(int(DefaultMaxSubscribeSize))
+    var response: FilterSubscribeResponse
 
-    let decodeRes = FilterSubscribeRequest.decode(buf)
-    if decodeRes.isErr():
-      error "Failed to decode filter subscribe request",
-        peer_id = conn.peerId, err = decodeRes.error
-      waku_filter_errors.inc(labelValues = [decodeRpcFailure])
-      return
+    wf.peerRequestRateLimiter.checkUsageLimit(WakuFilterSubscribeCodec, conn):
+      let buf = await conn.readLp(int(DefaultMaxSubscribeSize))
 
-    let request = decodeRes.value #TODO: toAPI() split here
+      waku_service_network_bytes.inc(
+        amount = buf.len().int64, labelValues = [WakuFilterSubscribeCodec, "in"]
+      )
 
-    let response = wf.handleSubscribeRequest(conn.peerId, request)
+      let decodeRes = FilterSubscribeRequest.decode(buf)
+      if decodeRes.isErr():
+        error "Failed to decode filter subscribe request",
+          peer_id = conn.peerId, err = decodeRes.error
+        waku_filter_errors.inc(labelValues = [decodeRpcFailure])
+        return
 
-    debug "sending filter subscribe response",
-      peer_id = shortLog(conn.peerId), response = response
+      let request = decodeRes.value #TODO: toAPI() split here
+
+      response = wf.handleSubscribeRequest(conn.peerId, request)
+
+      debug "sending filter subscribe response",
+        peer_id = shortLog(conn.peerId), response = response
+    do:
+      debug "filter request rejected due to rate limit exceeded",
+        peerId = conn.peerId, limit = $wf.peerRequestRateLimiter.setting
+      response = FilterSubscribeResponse(
+        requestId: "N/A",
+        statusCode: FilterSubscribeErrorKind.TOO_MANY_REQUESTS.uint32,
+        statusDesc: some("filter request rejected due to rate limit exceeded"),
+      )
 
     await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here
     return
@@ -296,6 +316,7 @@ proc new*(
     maxFilterPeers: uint32 = MaxFilterPeers,
     maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
     messageCacheTTL: Duration = MessageCacheTTL,
+    rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
 ): T =
   let wf = WakuFilter(
     subscriptions: FilterSubscriptions.init(
@@ -303,6 +324,7 @@ proc new*(
     ),
     peerManager: peerManager,
     messageCache: init(TimedCache[string], messageCacheTTL),
+    peerRequestRateLimiter: PerPeerRateLimiter(setting: rateLimitSetting),
   )
 
   wf.initProtocolHandler()
diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim
index 8a5c5bc91..f8f298708 100644
--- a/waku/waku_filter_v2/subscriptions.nim
+++ b/waku/waku_filter_v2/subscriptions.nim
@@ -1,7 +1,7 @@
 {.push raises: [].}
 
 import std/[sets, tables], chronicles, chronos, libp2p/peerid, stew/shims/sets
-import ../waku_core, ../utils/tableutils
+import ../waku_core, ../utils/tableutils, ../common/rate_limit/setting
 
 logScope:
   topics = "waku filter subscriptions"
@@ -12,6 +12,12 @@ const
   DefaultSubscriptionTimeToLiveSec* = 5.minutes
   MessageCacheTTL* = 2.minutes
 
+  # Acceptable call frequency from one peer using the filter service.
+  # The assumption is that setting up a subscription takes at most 30 calls, after which a
+  # ping every minute suffices. While subscribe/unsubscribe events are distributed in time
+  # among clients, pings will arrive regularly from all subscribed peers.
+  FilterPerPeerRateLimit*: RateLimitSetting = (30, 1.minutes)
+
 type
   # a single filter criterion is fully defined by a pubsub topic and content topic
   FilterCriterion* = tuple[pubsubTopic: PubsubTopic, contentTopic: ContentTopic]
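The FilterPerPeerRateLimit = (30, 1.minutes) default above translates to: a fresh subscriber may spend up to 30 calls on subscription setup within a minute, after which one keep-alive ping per minute is easily sustainable. A rough sketch of that per-peer budget, assuming the repo-root import path:

```nim
import chronos/timer
import waku/common/rate_limit/token_bucket # assumed repo-root import path

var peerBucket = TokenBucket.new(30, 1.minutes)
let t0 = Moment.now()
for i in 0 ..< 30:
  doAssert peerBucket.tryConsume(1, t0) # subscription setup burst
doAssert not peerBucket.tryConsume(1, t0) # call 31 within the minute is rejected
doAssert peerBucket.tryConsume(1, t0 + 1.minutes) # the ping a minute later passes
```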
diff --git a/waku/waku_lightpush/callbacks.nim b/waku/waku_lightpush/callbacks.nim
index 5cc14eb0f..c71efad31 100644
--- a/waku/waku_lightpush/callbacks.nim
+++ b/waku/waku_lightpush/callbacks.nim
@@ -1,23 +1,22 @@
 {.push raises: [].}
 
 import
-  ../waku_core,
-  ../waku_relay,
-  ./common,
-  ./protocol,
-  ../waku_rln_relay,
-  ../waku_rln_relay/protocol_types,
-  ../common/ratelimit
-import
-  std/times,
-  libp2p/peerid,
-  stew/byteutils
+  ../waku_core,
+  ../waku_relay,
+  ./common,
+  ./protocol,
+  ../waku_rln_relay,
+  ../waku_rln_relay/protocol_types
 
-proc checkAndGenerateRLNProof*(rlnPeer: Option[WakuRLNRelay], message: WakuMessage): Result[WakuMessage, string] =
+import std/times, libp2p/peerid, stew/byteutils
+
+proc checkAndGenerateRLNProof*(
+    rlnPeer: Option[WakuRLNRelay], message: WakuMessage
+): Result[WakuMessage, string] =
   # check if the message already has RLN proof
   if message.proof.len > 0:
     return ok(message)
-  
+
   if rlnPeer.isNone():
     notice "Publishing message without RLN proof"
     return ok(message)
@@ -32,16 +31,15 @@ proc checkAndGenerateRLNProof*(rlnPeer: Option[WakuRLNRelay], message: WakuMessa
 
 proc getNilPushHandler*(): PushMessageHandler =
   return proc(
-    peer: PeerId, pubsubTopic: string, message: WakuMessage
+      peer: PeerId, pubsubTopic: string, message: WakuMessage
   ): Future[WakuLightPushResult[void]] {.async.} =
     return err("no waku relay found")
 
 proc getRelayPushHandler*(
-  wakuRelay: WakuRelay,
-  rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
+    wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
 ): PushMessageHandler =
   return proc(
-    peer: PeerId, pubsubTopic: string, message: WakuMessage
+      peer: PeerId, pubsubTopic: string, message: WakuMessage
   ): Future[WakuLightPushResult[void]] {.async.} =
     # append RLN proof
     let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message)
@@ -57,4 +55,4 @@ proc getRelayPushHandler*(
       let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
       notice "Lightpush request has not been published to any peers", msg_hash = msgHash
 
-    return ok()
\ No newline at end of file
+    return ok()
diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim
index 565d8c237..4af2272b2 100644
--- a/waku/waku_lightpush/protocol.nim
+++ b/waku/waku_lightpush/protocol.nim
@@ -8,10 +8,7 @@ import
   ./rpc,
   ./rpc_codec,
   ./protocol_metrics,
-  ../common/ratelimit,
-  ../common/waku_service_metrics
-
-export ratelimit
+  ../common/rate_limit/request_limiter
 
 logScope:
   topics = "waku lightpush"
@@ -20,7 +17,7 @@ type WakuLightPush* = ref object of LPProtocol
   rng*: ref rand.HmacDrbgContext
   peerManager*: PeerManager
   pushHandler*: PushMessageHandler
-  requestRateLimiter*: Option[TokenBucket]
+  requestRateLimiter*: RequestRateLimiter
 
 proc handleRequest*(
     wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
@@ -76,7 +73,7 @@ proc initProtocolHandler(wl: WakuLightPush) =
       rpc = await handleRequest(wl, conn.peerId, buffer)
     do:
       debug "lightpush request rejected due rate limit exceeded",
-        peerId = conn.peerId, limit = $wl.requestRateLimiter
+        peerId = conn.peerId, limit = $wl.requestRateLimiter.setting
 
       rpc = static(
         PushRPC(
@@ -108,7 +105,7 @@ proc new*(
     rng: rng,
     peerManager: peerManager,
     pushHandler: pushHandler,
-    requestRateLimiter: newTokenBucket(rateLimitSetting),
+    requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
   )
   wl.initProtocolHandler()
   return wl
diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim
index eda5168cf..a4e5467a2 100644
--- a/waku/waku_store/protocol.nim
+++ b/waku/waku_store/protocol.nim
@@ -20,8 +20,7 @@ import
   ./common,
   ./rpc_codec,
   ./protocol_metrics,
-  ../common/ratelimit,
-  ../common/waku_service_metrics
+  ../common/rate_limit/request_limiter
 
 logScope:
   topics = "waku store"
@@ -36,7 +35,7 @@ type WakuStore* = ref object of LPProtocol
   peerManager: PeerManager
   rng: ref rand.HmacDrbgContext
   requestHandler*: StoreQueryRequestHandler
-  requestRateLimiter*: Option[TokenBucket]
+  requestRateLimiter*: RequestRateLimiter
 
 ## Protocol
 
@@ -107,7 +106,7 @@ proc initProtocolHandler(self: WakuStore) =
       resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)
     do:
       debug "store query request rejected due rate limit exceeded",
-        peerId = conn.peerId, limit = $self.requestRateLimiter
+        peerId = conn.peerId, limit = $self.requestRateLimiter.setting
       resBuf = rejectReposnseBuffer
 
     let writeRes = catch:
@@ -138,7 +137,7 @@ proc new*(
     rng: rng,
     peerManager: peerManager,
     requestHandler: requestHandler,
-    requestRateLimiter: newTokenBucket(rateLimitSetting),
+    requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
   )
 
   store.initProtocolHandler()
diff --git a/waku/waku_store_legacy/protocol.nim b/waku/waku_store_legacy/protocol.nim
index 5fe388228..6f158394e 100644
--- a/waku/waku_store_legacy/protocol.nim
+++ b/waku/waku_store_legacy/protocol.nim
@@ -21,8 +21,7 @@ import
   ./rpc,
   ./rpc_codec,
   ./protocol_metrics,
-  ../common/ratelimit,
-  ../common/waku_service_metrics
+  ../common/rate_limit/request_limiter
 
 logScope:
   topics = "waku legacy store"
@@ -37,7 +36,7 @@ type WakuStore* = ref object of LPProtocol
   peerManager: PeerManager
   rng: ref rand.HmacDrbgContext
   queryHandler*: HistoryQueryHandler
-  requestRateLimiter*: Option[TokenBucket]
+  requestRateLimiter*: RequestRateLimiter
 
 ## Protocol
 
@@ -122,7 +121,7 @@ proc initProtocolHandler(ws: WakuStore) =
       resBuf = await ws.handleLegacyQueryRequest(conn.peerId, reqBuf)
     do:
       debug "Legacy store query request rejected due rate limit exceeded",
-        peerId = conn.peerId, limit = $ws.requestRateLimiter
+        peerId = conn.peerId, limit = $ws.requestRateLimiter.setting
       resBuf = rejectResponseBuf
 
     let writeRes = catch:
@@ -154,7 +153,7 @@ proc new*(
     rng: rng,
     peerManager: peerManager,
     queryHandler: queryHandler,
-    requestRateLimiter: newTokenBucket(rateLimitSetting),
+    requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
   )
   ws.initProtocolHandler()
   ws