feat: DOS protection of non relay protocols - rate limit phase3 (#2897)

* DOS protection of non relay protocols - rate limit phase3:
- Enhanced TokenBucket so it can add compensation tokens based on the previous period's usage percentage,
- Per-peer rate limiter 'PerPeerRateLimiter' applied to waku_filter_v2 with an opinionated default for the acceptable request rate
- Added traffic metrics to filter message push
- RequestRateLimiter added: it combines simple token-bucket limiting of request counts with per-peer usage tracking over time, preventing individual peers from overusing the service (see the usage sketch below)
  (rule-violating peers are currently not disconnected; their requests are simply not served)
- TimedMap utility created (inspired by and adapted from libp2p's TimedCache), which serves as a forgiveness mechanism for peers that had been overusing the service.
- Added more tests
- Fixed rebase issues
- Applied the new RequestRateLimiter to store, legacy_store and lightpush
* Incorporated review comments: typo fixes, file/class renaming and placement changes.
* Added a reference link to the original nim-chronos TokenBucket issue
* Made TimedMap's TimedEntry private so it cannot be confused with the similarly named type in libp2p
* Addressed review comments: renamings, constants instead of literal values, and more comments.
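
A minimal usage sketch of the new RequestRateLimiter, based on the API introduced in this change (the module path, protocol label and limit values are illustrative assumptions; the real protocols use the checkUsageLimit template shown in the diffs below):

import std/options
import chronos
import results
import libp2p/stream/connection, libp2p/peerid
import waku/common/rate_limit/request_limiter

let setting: RateLimitSetting = (10, 2.minutes) # global budget: 10 requests per 2 minutes
var limiter = newRequestRateLimiter(some(setting))
let conn = Connection(peerId: PeerId.random().tryGet())

if limiter.checkUsage("example-proto", conn):
  discard # within both the per-peer and the global budget: serve the request
else:
  discard # over budget: reply with TOO_MANY_REQUESTS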
NagyZoltanPeter 2024-07-16 15:46:21 +02:00 committed by GitHub
parent b196bc097b
commit ca634ef3ba
30 changed files with 1098 additions and 182 deletions

View File

@ -5,4 +5,7 @@ import
./test_envvar_serialization,
./test_protobuf_validation,
./test_sqlite_migrations,
./test_parse_size
./test_parse_size,
./test_tokenbucket,
./test_requestratelimiter,
./test_timed_map

View File

@ -0,0 +1,84 @@
# Chronos Test Suite
# (c) Copyright 2022-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
{.used.}
import testutils/unittests
import chronos, libp2p/stream/connection
import std/[sequtils, options]
import ../../waku/common/rate_limit/request_limiter
import ../../waku/common/rate_limit/timed_map
let proto = "ProtocolDescriptor"
let conn1 = Connection(peerId: PeerId.random().tryGet())
let conn2 = Connection(peerId: PeerId.random().tryGet())
let conn3 = Connection(peerId: PeerId.random().tryGet())
suite "RequestRateLimiter":
test "RequestRateLimiter Allow up to main bucket":
# keep limits low for easier calculation of ratios
let rateLimit: RateLimitSetting = (4, 2.minutes)
var limiter = newRequestRateLimiter(some(rateLimit))
# per peer tokens will be 6 / 4min
# as the ratio is 2 in this case and the peer's max tokens are main tokens * ratio * 0.75 (4 * 2 * 0.75 = 6)
# notice that meanwhile we have 8 global tokens over 2 periods (4 mins) in total
# See: waku/common/rate_limit/request_limiter.nim #func calcPeriodRatio
let now = Moment.now()
# the first use also registers the peer and starts its timer
check limiter.checkUsage(proto, conn2, now) == true
for i in 0 ..< 3:
check limiter.checkUsage(proto, conn1, now) == true
check limiter.checkUsage(proto, conn2, now + 3.minutes) == true
for i in 0 ..< 3:
check limiter.checkUsage(proto, conn1, now + 3.minutes) == true
# conn1 reached 75% of the main bucket over 2 periods of time
check limiter.checkUsage(proto, conn1, now + 3.minutes) == false
# conn2 has not used its tokens and we still have 1 more token left in the main bucket
check limiter.checkUsage(proto, conn2, now + 3.minutes) == true
test "RequestRateLimiter Restrict overusing peer":
# keep limits low for easier calculation of ratios
let rateLimit: RateLimitSetting = (10, 2.minutes)
var limiter = newRequestRateLimiter(some(rateLimit))
# per peer tokens will be 15 / 4min
# as the ratio is 2 in this case and the peer's max tokens are main tokens * ratio * 0.75 (10 * 2 * 0.75 = 15)
# notice that meanwhile we have 20 global tokens over 2 periods (4 mins) in total
# See: waku/common/rate_limit/request_limiter.nim #func calcPeriodRatio
let now = Moment.now()
# the first use also registers the peer and starts its timer
for i in 0 ..< 10:
check limiter.checkUsage(proto, conn1, now) == true
# the main tokens ran out, but one more token was still consumed from the peer's bucket
check limiter.checkUsage(proto, conn1, now) == false
for i in 0 ..< 4:
check limiter.checkUsage(proto, conn1, now + 3.minutes) == true
# conn1 reached 75% of the main bucket over 2 periods of time
check limiter.checkUsage(proto, conn1, now + 3.minutes) == false
check limiter.checkUsage(proto, conn2, now + 3.minutes) == true
check limiter.checkUsage(proto, conn2, now + 3.minutes) == true
check limiter.checkUsage(proto, conn3, now + 3.minutes) == true
check limiter.checkUsage(proto, conn2, now + 3.minutes) == true
check limiter.checkUsage(proto, conn3, now + 3.minutes) == true
# conn1 gets replenished, as the ratio of 2 gives a replenish period twice as long as the main bucket's
# see waku/common/rate_limit/request_limiter.nim #func calcPeriodRatio and calcPeerTokenSetting
check limiter.checkUsage(proto, conn1, now + 4.minutes) == true
# requests from other peers can also go through
check limiter.checkUsage(proto, conn2, now + 4100.milliseconds) == true
check limiter.checkUsage(proto, conn3, now + 5.minutes) == true

View File

@ -0,0 +1,60 @@
{.used.}
import unittest2
import chronos/timer
import ../../waku/common/rate_limit/timed_map
suite "TimedMap":
test "put/get":
var cache = TimedMap[int, string].init(5.seconds)
let now = Moment.now()
check:
cache.mgetOrPut(1, "1", now) == "1"
cache.mgetOrPut(1, "1", now + 1.seconds) == "1"
cache.mgetOrPut(2, "2", now + 4.seconds) == "2"
check:
1 in cache
2 in cache
check:
cache.mgetOrPut(3, "3", now + 6.seconds) == "3"
# expires 1
check:
1 notin cache
2 in cache
3 in cache
cache.addedAt(2) == now + 4.seconds
check:
cache.mgetOrPut(2, "modified2", now + 8.seconds) == "2" # refreshes 2
cache.mgetOrPut(4, "4", now + 12.seconds) == "4" # expires 3
check:
2 in cache
3 notin cache
4 in cache
check:
cache.remove(4).isSome()
4 notin cache
check:
cache.mgetOrPut(100, "100", now + 100.seconds) == "100" # expires everything
100 in cache
2 notin cache
test "enough items to force cache heap storage growth":
var cache = TimedMap[int, string].init(5.seconds)
let now = Moment.now()
for i in 101 .. 100000:
check:
cache.mgetOrPut(i, $i, now) == $i
for i in 101 .. 100000:
check:
i in cache

View File

@ -0,0 +1,69 @@
# Chronos Test Suite
# (c) Copyright 2022-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
{.used.}
import testutils/unittests
import chronos
import ../../waku/common/rate_limit/token_bucket
suite "Token Bucket":
test "TokenBucket Sync test - strict":
var bucket = TokenBucket.newStrict(1000, 1.milliseconds)
let
start = Moment.now()
fullTime = start + 1.milliseconds
check:
bucket.tryConsume(800, start) == true
bucket.tryConsume(200, start) == true
# Out of budget
bucket.tryConsume(100, start) == false
bucket.tryConsume(800, fullTime) == true
bucket.tryConsume(200, fullTime) == true
# Out of budget
bucket.tryConsume(100, fullTime) == false
test "TokenBucket Sync test - compensating":
var bucket = TokenBucket.new(1000, 1.milliseconds)
let
start = Moment.now()
fullTime = start + 1.milliseconds
check:
bucket.tryConsume(800, start) == true
bucket.tryConsume(200, start) == true
# Out of budget
bucket.tryConsume(100, start) == false
bucket.tryConsume(800, fullTime) == true
bucket.tryConsume(200, fullTime) == true
# Because the bucket was not used for the rest of the period, compensation will satisfy this request
bucket.tryConsume(100, fullTime) == true
test "TokenBucket Max compensation":
var bucket = TokenBucket.new(1000, 1.minutes)
var reqTime = Moment.now()
check bucket.tryConsume(1000, reqTime)
check bucket.tryConsume(1, reqTime) == false
reqTime += 1.minutes
check bucket.tryConsume(500, reqTime) == true
reqTime += 1.minutes
check bucket.tryConsume(1000, reqTime) == true
reqTime += 10.seconds
# max compensation is 25% so try to consume 250 more
check bucket.tryConsume(250, reqTime) == true
reqTime += 49.seconds
# out of budget within the same period
check bucket.tryConsume(1, reqTime) == false
test "TokenBucket Short replenish":
var bucket = TokenBucket.new(15000, 1.milliseconds)
let start = Moment.now()
check bucket.tryConsume(15000, start)
check bucket.tryConsume(1, start) == false
check bucket.tryConsume(15000, start + 1.milliseconds) == true

View File

@ -1,3 +1,4 @@
{.used.}
import ./test_waku_client, ./test_waku_filter_protocol
import
./test_waku_client, ./test_waku_filter_protocol, ./test_waku_filter_dos_protection

View File

@ -0,0 +1,183 @@
{.used.}
import
std/[options, tables, sequtils, strutils, json],
testutils/unittests,
stew/[results, byteutils],
chronos,
chronicles,
os,
libp2p/peerstore
import
waku/[
node/peer_manager,
waku_core,
common/rate_limit/setting,
common/rate_limit/token_bucket,
],
waku/waku_filter_v2/[common, client, subscriptions, protocol, rpc_codec],
../testlib/[wakucore, testasync, testutils, futures, sequtils],
./waku_filter_utils,
../resources/payloads
type AFilterClient = ref object of RootObj
clientSwitch*: Switch
wakuFilterClient*: WakuFilterClient
clientPeerId*: PeerId
messagePushHandler*: FilterPushHandler
msgSeq*: seq[(PubsubTopic, WakuMessage)]
pushHandlerFuture*: Future[(PubsubTopic, WakuMessage)]
proc init(T: type[AFilterClient]): T =
var r = T(
clientSwitch: newStandardSwitch(),
msgSeq: @[],
pushHandlerFuture: newPushHandlerFuture(),
)
r.wakuFilterClient = waitFor newTestWakuFilterClient(r.clientSwitch)
r.messagePushHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
): Future[void] {.async, closure, gcsafe.} =
r.msgSeq.add((pubsubTopic, message))
r.pushHandlerFuture.complete((pubsubTopic, message))
r.clientPeerId = r.clientSwitch.peerInfo.toRemotePeerInfo().peerId
r.wakuFilterClient.registerPushHandler(r.messagePushHandler)
return r
proc subscribe(
client: AFilterClient,
serverRemotePeerInfo: RemotePeerInfo,
pubsubTopic: PubsubTopic,
contentTopicSeq: seq[ContentTopic],
): Option[FilterSubscribeErrorKind] =
let subscribeResponse = waitFor client.wakuFilterClient.subscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
if subscribeResponse.isOk():
return none[FilterSubscribeErrorKind]()
return some(subscribeResponse.error().kind)
proc unsubscribe(
client: AFilterClient,
serverRemotePeerInfo: RemotePeerInfo,
pubsubTopic: PubsubTopic,
contentTopicSeq: seq[ContentTopic],
): Option[FilterSubscribeErrorKind] =
let unsubscribeResponse = waitFor client.wakuFilterClient.unsubscribe(
serverRemotePeerInfo, pubsubTopic, contentTopicSeq
)
if unsubscribeResponse.isOk():
return none[FilterSubscribeErrorKind]()
return some(unsubscribeResponse.error().kind)
proc ping(
client: AFilterClient, serverRemotePeerInfo: RemotePeerInfo
): Option[FilterSubscribeErrorKind] =
let pingResponse = waitFor client.wakuFilterClient.ping(serverRemotePeerInfo)
if pingResponse.isOk():
return none[FilterSubscribeErrorKind]()
return some(pingResponse.error().kind)
suite "Waku Filter - DOS protection":
var serverSwitch {.threadvar.}: Switch
var client1 {.threadvar.}: AFilterClient
var client2 {.threadvar.}: AFilterClient
var wakuFilter {.threadvar.}: WakuFilter
var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
var pubsubTopic {.threadvar.}: PubsubTopic
var contentTopic {.threadvar.}: ContentTopic
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
asyncSetup:
client1 = AFilterClient.init()
client2 = AFilterClient.init()
pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
contentTopicSeq = @[contentTopic]
serverSwitch = newStandardSwitch()
wakuFilter = await newTestWakuFilter(
serverSwitch, rateLimitSetting = some((3, 1000.milliseconds))
)
await allFutures(
serverSwitch.start(), client1.clientSwitch.start(), client2.clientSwitch.start()
)
serverRemotePeerInfo = serverSwitch.peerInfo.toRemotePeerInfo()
client1.clientPeerId = client1.clientSwitch.peerInfo.toRemotePeerInfo().peerId
client2.clientPeerId = client2.clientSwitch.peerInfo.toRemotePeerInfo().peerId
asyncTeardown:
await allFutures(
wakuFilter.stop(),
client1.wakuFilterClient.stop(),
client2.wakuFilterClient.stop(),
serverSwitch.stop(),
client1.clientSwitch.stop(),
client2.clientSwitch.stop(),
)
asyncTest "Limit number of subscriptions requests":
# Given
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
await sleepAsync(20.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
await sleepAsync(20.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
await sleepAsync(20.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
# ensure the rate-limit period has passed so clients can use the service again
await sleepAsync(600.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
asyncTest "Ensure normal usage allowed":
# Given
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
await sleepAsync(500.milliseconds)
check client1.ping(serverRemotePeerInfo) == none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId)
await sleepAsync(50.milliseconds)
check client1.unsubscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client1.clientPeerId) == false
await sleepAsync(50.milliseconds)
check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND)
check client1.ping(serverRemotePeerInfo) == some(FilterSubscribeErrorKind.NOT_FOUND)
await sleepAsync(50.milliseconds)
check client1.ping(serverRemotePeerInfo) ==
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check wakuFilter.subscriptions.isSubscribed(client2.clientPeerId) == true

View File

@ -7,6 +7,7 @@ import
waku_filter_v2/client,
waku_filter_v2/subscriptions,
waku_core,
common/rate_limit/setting,
],
../testlib/[common, wakucore]
@ -15,11 +16,16 @@ proc newTestWakuFilter*(
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
maxFilterPeers: uint32 = MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): Future[WakuFilter] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuFilter.new(
peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
peerManager,
subscriptionTimeout,
maxFilterPeers,
maxFilterCriteriaPerPeer,
rateLimitSetting = rateLimitSetting,
)
await proto.start()

View File

@ -7,6 +7,7 @@ import
waku/waku_core,
waku/waku_lightpush,
waku/waku_lightpush/[client, common],
waku/common/rate_limit/setting,
../testlib/[common, wakucore]
proc newTestWakuLightpushNode*(

View File

@ -10,7 +10,7 @@ import
import
waku/[
node/peer_manager,
common/ratelimit,
common/rate_limit/setting,
waku_core,
waku_lightpush,
waku_lightpush/client,

View File

@ -23,7 +23,7 @@ import
waku_api/rest/lightpush/handlers as lightpush_api,
waku_api/rest/lightpush/client as lightpush_api_client,
waku_relay,
common/ratelimit,
common/rate_limit/setting,
],
../testlib/wakucore,
../testlib/wakunode,

View File

@ -0,0 +1,40 @@
## PerPeerRateLimiter
##
## This class makes it easy to track usage of a service per PeerId.
## The rate limit is applied separately to each peer upon its first use, and the time period is likewise counted per peer.
## It uses compensating replenish mode for peers to balance the load and allow fair usage of a service.
{.push raises: [].}
import std/[options, tables], chronos/timer, libp2p/stream/connection, libp2p/utility
import ./[single_token_limiter, service_metrics], ../../utils/tableutils
export token_bucket, setting, service_metrics
type PerPeerRateLimiter* = ref object of RootObj
setting*: Option[RateLimitSetting]
peerBucket: Table[PeerId, Option[TokenBucket]]
proc mgetOrPut(
perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId
): var Option[TokenBucket] =
return perPeerRateLimiter.peerBucket.mgetOrPut(
peerId, newTokenBucket(perPeerRateLimiter.setting, ReplenishMode.Compensating)
)
template checkUsageLimit*(
t: var PerPeerRateLimiter,
proto: string,
conn: Connection,
bodyWithinLimit, bodyRejected: untyped,
) =
checkUsageLimit(t.mgetOrPut(conn.peerId), proto, conn, bodyWithinLimit, bodyRejected)
proc unregister*(perPeerRateLimiter: var PerPeerRateLimiter, peerId: PeerId) =
perPeerRateLimiter.peerBucket.del(peerId)
proc unregister*(perPeerRateLimiter: var PerPeerRateLimiter, peerIds: seq[PeerId]) =
perPeerRateLimiter.peerBucket.keepItIf(key notin peerIds)
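# Illustrative usage sketch (assumed limit values and protocol label; mirrors how the filter protocol below uses it):
when isMainModule:
  import libp2p/peerid, results

  let setting: RateLimitSetting = (30, 1.minutes)
  var limiter = PerPeerRateLimiter(setting: some(setting))
  let conn = Connection(peerId: PeerId.random().tryGet())

  var served, rejected = 0
  for _ in 0 ..< 40:
    limiter.checkUsageLimit("example-proto", conn):
      inc served
    do:
      inc rejected
  # only this peer's own budget (30 per minute) applies here, the remainder is rejected
  doAssert served == 30 and rejected == 10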

View File

@ -0,0 +1,134 @@
## RequestRateLimiter
##
## RequestRateLimiter is a general service protection mechanism.
## While it applies an overall rate limit, it also ensures fair usage among peers.
##
## This is achieved by rejecting peers that constantly overuse the service, while allowing others to use it
## within the configured global limit.
## Punished peers recover after a certain time period, provided they stop violating the limit.
##
## To do this, a ratio is derived from the global limit and applied to each peer:
## a peer may spend its allowed tokens within ratio * the global time period,
## and the allowed tokens per peer are limited to 75% of ratio * the global token volume.
##
## This needs to be taken into account when setting the global limit for the specific service type and use case.
{.push raises: [].}
import
std/[options, math],
chronicles,
chronos/timer,
libp2p/stream/connection,
libp2p/utility
import ./[single_token_limiter, service_metrics, timed_map]
export token_bucket, setting, service_metrics
logScope:
topics = "waku ratelimit"
const PER_PEER_ALLOWED_PERCENT_OF_VOLUME = 0.75
const UNLIMITED_RATIO = 0
const UNLIMITED_TIMEOUT = 0.seconds
const MILISECONDS_RATIO = 10
const SECONDS_RATIO = 3
const MINUTES_RATIO = 2
type RequestRateLimiter* = ref object of RootObj
tokenBucket: Option[TokenBucket]
setting*: Option[RateLimitSetting]
peerBucketSetting*: RateLimitSetting
peerUsage: TimedMap[PeerId, TokenBucket]
proc mgetOrPut(
requestRateLimiter: var RequestRateLimiter, peerId: PeerId
): var TokenBucket =
let bucketForNew = newTokenBucket(some(requestRateLimiter.peerBucketSetting)).valueOr:
raiseAssert "This branch is not allowed to be reached as it will not be called if the setting is None."
return requestRateLimiter.peerUsage.mgetOrPut(peerId, bucketForNew)
proc checkUsage*(
t: var RequestRateLimiter, proto: string, conn: Connection, now = Moment.now()
): bool {.raises: [].} =
if t.tokenBucket.isNone():
return true
let peerBucket = t.mgetOrPut(conn.peerId)
## check that the requesting peer's usage is not over its calculated share, letting through peers that have not requested much recently
if not peerBucket.tryConsume(1, now):
trace "peer usage limit reached", peer = conn.peerId
return false
# Ok if the peer can consume, check the overall budget we have left
let tokenBucket = t.tokenBucket.get()
if not tokenBucket.tryConsume(1, now):
return false
return true
template checkUsageLimit*(
t: var RequestRateLimiter,
proto: string,
conn: Connection,
bodyWithinLimit, bodyRejected: untyped,
) =
if t.checkUsage(proto, conn):
waku_service_requests.inc(labelValues = [proto, "served"])
bodyWithinLimit
else:
waku_service_requests.inc(labelValues = [proto, "rejected"])
bodyRejected
# TODO: review these ratio assumptions! Debatable!
func calcPeriodRatio(settingOpt: Option[RateLimitSetting]): int =
settingOpt.withValue(setting):
if setting.isUnlimited():
return UNLIMITED_RATIO
if setting.period <= 1.seconds:
return MILISECONDS_RATIO
if setting.period <= 1.minutes:
return SECONDS_RATIO
return MINUTES_RATIO
do:
# when setting is none
return UNLIMITED_RATIO
# calculates the timeout for peer cache items
# effectively, a peer that does not issue any requests for this amount of time will be forgotten
func calcCacheTimeout(settingOpt: Option[RateLimitSetting], ratio: int): Duration =
settingOpt.withValue(setting):
if setting.isUnlimited():
return UNLIMITED_TIMEOUT
# the cache timeout for peers is double the per-peer replenish period
return setting.period * ratio * 2
do:
# when setting is none
return UNLIMITED_TIMEOUT
func calcPeerTokenSetting(
setting: Option[RateLimitSetting], ratio: int
): RateLimitSetting =
let s = setting.valueOr:
return (0, 0.minutes)
let peerVolume =
trunc((s.volume * ratio).float * PER_PEER_ALLOWED_PERCENT_OF_VOLUME).int
let peerPeriod = s.period * ratio
return (peerVolume, peerPeriod)
proc newRequestRateLimiter*(setting: Option[RateLimitSetting]): RequestRateLimiter =
let ratio = calcPeriodRatio(setting)
return RequestRateLimiter(
tokenBucket: newTokenBucket(setting),
setting: setting,
peerBucketSetting: calcPeerTokenSetting(setting, ratio),
peerUsage: init(TimedMap[PeerId, TokenBucket], calcCacheTimeout(setting, ratio)),
)
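# Illustrative sketch of the derived per-peer setting and the combined check (assumed values; mirrors the unit tests in this change):
when isMainModule:
  import libp2p/peerid, results

  # For a global setting of 10 requests / 2 minutes the ratio is MINUTES_RATIO (2), so each peer
  # gets trunc(10 * 2 * 0.75) = 15 tokens per 2 * 2 = 4 minutes and is forgotten after
  # 2 * 4 = 8 minutes of inactivity.
  let setting: RateLimitSetting = (10, 2.minutes)
  var limiter = newRequestRateLimiter(some(setting))
  doAssert limiter.peerBucketSetting.volume == 15
  doAssert limiter.peerBucketSetting.period == 4.minutes

  let conn = Connection(peerId: PeerId.random().tryGet())
  let now = Moment.now()
  for _ in 0 ..< 10:
    doAssert limiter.checkUsage("example-proto", conn, now)
  # the global bucket is exhausted for the rest of this period
  doAssert not limiter.checkUsage("example-proto", conn, now)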

View File

@ -0,0 +1,19 @@
{.push raises: [].}
import chronos/timer
# Setting for TokenBucket defined as volume over period of time
type RateLimitSetting* = tuple[volume: int, period: Duration]
# Set the default to switch off rate limiting for now
let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (0, 0.minutes)
proc isUnlimited*(t: RateLimitSetting): bool {.inline.} =
return t.volume <= 0 or t.period <= 0.seconds
func `$`*(t: RateLimitSetting): string {.inline.} =
return
if t.isUnlimited():
"no-limit"
else:
$t.volume & "/" & $t.period
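# A small illustrative self-check of the helpers above:
when isMainModule:
  let limit: RateLimitSetting = (30, 1.minutes)
  doAssert not limit.isUnlimited()
  # volume <= 0 or period <= 0 means rate limiting is switched off
  doAssert DefaultGlobalNonRelayRateLimit.isUnlimited()
  doAssert $DefaultGlobalNonRelayRateLimit == "no-limit"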

View File

@ -0,0 +1,50 @@
## This module adds usage-check helpers for simple rate limiting with the use of TokenBucket.
{.push raises: [].}
import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility
import ./[token_bucket, setting, service_metrics]
export token_bucket, setting, service_metrics
proc newTokenBucket*(
setting: Option[RateLimitSetting],
replenishMode: ReplenishMode = ReplenishMode.Compensating,
): Option[TokenBucket] =
if setting.isNone():
return none[TokenBucket]()
if setting.get().isUnlimited():
return none[TokenBucket]()
return some(TokenBucket.new(setting.get().volume, setting.get().period))
proc checkUsage(
t: var TokenBucket, proto: string, now = Moment.now()
): bool {.raises: [].} =
if not t.tryConsume(1, now):
return false
return true
proc checkUsage(
t: var Option[TokenBucket], proto: string, now = Moment.now()
): bool {.raises: [].} =
if t.isNone():
return true
var tokenBucket = t.get()
return checkUsage(tokenBucket, proto, now)
template checkUsageLimit*(
t: var Option[TokenBucket] | var TokenBucket,
proto: string,
conn: Connection,
bodyWithinLimit, bodyRejected: untyped,
) =
if t.checkUsage(proto):
waku_service_requests.inc(labelValues = [proto, "served"])
bodyWithinLimit
else:
waku_service_requests.inc(labelValues = [proto, "rejected"])
bodyRejected
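# Illustrative usage sketch of the Option[TokenBucket] helpers above (assumed limit and protocol label):
when isMainModule:
  import libp2p/peerid, results

  let setting: RateLimitSetting = (2, 1.minutes)
  var bucket = newTokenBucket(some(setting))
  let conn = Connection(peerId: PeerId.random().tryGet())

  var outcomes: seq[bool]
  for _ in 0 ..< 3:
    bucket.checkUsageLimit("example-proto", conn):
      outcomes.add(true)
    do:
      outcomes.add(false)
  # two requests fit into the budget, the third is rejected within the same period
  doAssert outcomes == @[true, true, false]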

View File

@ -0,0 +1,162 @@
## TimedMap
## ===========
## Inspired by nim-libp2p's TimedCache class. It uses the same approach of pruning
## untouched items from the map once the configured timeout duration is reached.
## Unlike TimedCache, this TimedMap can hold and return any type of value for a key.
##
## - The `mgetOrPut` proc is similar to std/tables, but it also renews the timeout for the key.
## - For a non-renewing check use the `contains` proc.
## - The `expire` proc removes all items that have expired.
##
## Choose the initial timeout according to your needs to control the size of the map.
{.push raises: [].}
import std/[hashes, sets]
import chronos/timer, results
import libp2p/utility
export results
type
TimedEntry[K, V] = ref object of RootObj
key: K
value: V
addedAt: Moment
expiresAt: Moment
next, prev: TimedEntry[K, V]
TimedMap*[K, V] = object of RootObj
head, tail: TimedEntry[K, V] # nim linked list doesn't allow inserting at pos
entries: HashSet[TimedEntry[K, V]]
timeout: Duration
func `==`*[K, V](a, b: TimedEntry[K, V]): bool =
if isNil(a) == isNil(b):
isNil(a) or a.key == b.key
else:
false
func hash*(a: TimedEntry): Hash =
if isNil(a):
default(Hash)
else:
hash(a[].key)
func `$`*[T](a: T): string =
if isNil(a):
return "nil"
return $a
func `$`*[K, V](a: TimedEntry[K, V]): string =
if isNil(a):
return "nil"
return
"TimedEntry: key:" & $a.key & ", val:" & $a.value & ", addedAt:" & $a.addedAt &
", expiresAt:" & $a.expiresAt
func expire*(t: var TimedMap, now: Moment = Moment.now()) =
while t.head != nil and t.head.expiresAt <= now:
t.entries.excl(t.head)
t.head.prev = nil
t.head = t.head.next
if t.head == nil:
t.tail = nil
func del[K, V](t: var TimedMap[K, V], key: K): Opt[TimedEntry[K, V]] =
# Removes existing key from cache, returning the previous item if present
let tmp = TimedEntry[K, V](key: key)
if tmp in t.entries:
let item =
try:
t.entries[tmp] # use the shared instance in the set
except KeyError:
raiseAssert "just checked"
t.entries.excl(item)
if t.head == item:
t.head = item.next
if t.tail == item:
t.tail = item.prev
if item.next != nil:
item.next.prev = item.prev
if item.prev != nil:
item.prev.next = item.next
Opt.some(item)
else:
Opt.none(TimedEntry[K, V])
func remove*[K, V](t: var TimedMap[K, V], key: K): Opt[V] =
# Removes existing key from cache, returning the previous value if present
# public version of del without exporting TimedEntry
let deleted = t.del(key)
if deleted.isSome():
Opt.some(deleted[].value)
else:
Opt.none(V)
proc mgetOrPut*[K, V](t: var TimedMap[K, V], k: K, v: V, now = Moment.now()): var V =
# Puts (k, v) in the map and returns the stored value: the existing value if k was
# already present, otherwise v. If the item was already present, its expiry timer
# is refreshed.
t.expire(now)
let
previous = t.del(k) # Refresh existing item
addedAt =
if previous.isSome():
previous[].addedAt
else:
now
value =
if previous.isSome():
previous[].value
else:
v
let node =
TimedEntry[K, V](key: k, value: value, addedAt: addedAt, expiresAt: now + t.timeout)
if t.head == nil:
t.tail = node
t.head = t.tail
else:
# search from tail because typically that's where we add when now grows
var cur = t.tail
while cur != nil and node.expiresAt < cur.expiresAt:
cur = cur.prev
if cur == nil:
node.next = t.head
t.head.prev = node
t.head = node
else:
node.prev = cur
node.next = cur.next
cur.next = node
if cur == t.tail:
t.tail = node
t.entries.incl(node)
return node.value
func contains*[K, V](t: TimedMap[K, V], k: K): bool =
let tmp = TimedEntry[K, V](key: k)
tmp in t.entries
func addedAt*[K, V](t: var TimedMap[K, V], k: K): Moment =
let tmp = TimedEntry[K, V](key: k)
try:
if tmp in t.entries: # raising is slow
# Use shared instance from entries
return t.entries[tmp][].addedAt
except KeyError:
raiseAssert "just checked"
default(Moment)
func init*[K, V](T: type TimedMap[K, V], timeout: Duration): T =
T(timeout: timeout)
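# Illustrative sketch of the TimedMap API above (timeout and keys are arbitrary examples):
when isMainModule:
  var seen = TimedMap[string, int].init(10.seconds)
  let now = Moment.now()
  doAssert seen.mgetOrPut("peer-a", 1, now) == 1
  # a repeated put for the same key keeps the stored value but refreshes its timer
  doAssert seen.mgetOrPut("peer-a", 42, now + 5.seconds) == 1
  doAssert "peer-a" in seen
  # expired entries are pruned on the next mutating access
  doAssert seen.mgetOrPut("peer-b", 2, now + 30.seconds) == 2
  doAssert "peer-a" notin seen
  doAssert seen.remove("peer-b") == Opt.some(2)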

View File

@ -0,0 +1,182 @@
{.push raises: [].}
import chronos, std/math, std/options
const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25
## This is an extract from chronos/rate_limit.nim, made because of a bug found in the original implementation.
## Unfortunately that bug cannot be fixed without harming the original features of the TokenBucket class,
## so this shortcut is used to move ahead with the nwaku rate limiter implementation.
## ref: https://github.com/status-im/nim-chronos/issues/500
##
## This version of TokenBucket differs from the original one in chronos/rate_limit.nim in several ways:
## - It has a new mode called `Compensating`, which is the default mode.
## Compensation is calculated as the average unused bucket capacity over the last measured period(s),
## capped at the allowed compensation threshold (currently a constant 25%).
## Compensation also takes care of proper time-period calculation to avoid non-usage periods that could lead to
## overcompensation.
## - Strict mode is also available: it only replenishes when the time period is over, and it fills
## the bucket back to the maximum capacity.
type
ReplenishMode* = enum
Strict
Compensating
TokenBucket* = ref object
budget: int ## Current number of tokens in the bucket
budgetCap: int ## Bucket capacity
lastTimeFull: Moment
## This timer tracks the proper periodization of the bucket refilling
fillDuration: Duration ## Refill period
case replenishMode*: ReplenishMode
of Strict:
## In strict mode, the bucket is refilled only till the budgetCap
discard
of Compensating:
## This is the default mode.
maxCompensation: float
func periodDistance(bucket: TokenBucket, currentTime: Moment): float =
## notice that fillDuration cannot be zero by design
## the period distance is a float representing the number of fill periods elapsed
## since the last time the bucket was refilled
return
nanoseconds(currentTime - bucket.lastTimeFull).float /
nanoseconds(bucket.fillDuration).float
func getUsageAverageSince(bucket: TokenBucket, distance: float): float =
if distance == 0.float:
## in case there is zero time difference, the usage percentage is 100%
return 1.0
## budgetCap can never be zero
## usage average is calculated as a percentage of total capacity available over
## the measured period
return bucket.budget.float / bucket.budgetCap.float / distance
proc calcCompensation(bucket: TokenBucket, averageUsage: float): int =
# if we already fully used or even overused the tokens, there is no place for compensation
if averageUsage >= 1.0:
return 0
## compensation is the average unused bucket capacity over the last measured period(s),
## capped at the allowed compensation threshold
let compensationPercent =
min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation)
return trunc(compensationPercent).int
func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool =
return currentTime - bucket.lastTimeFull >= bucket.fillDuration
## An update takes place when the bucket is empty and tokens are being consumed.
## It checks whether the bucket can be replenished, i.e. whether the refill duration has passed.
## - strict mode:
proc updateStrict(bucket: TokenBucket, currentTime: Moment) =
if bucket.fillDuration == default(Duration):
bucket.budget = min(bucket.budgetCap, bucket.budget)
return
if not periodElapsed(bucket, currentTime):
return
bucket.budget = bucket.budgetCap
bucket.lastTimeFull = currentTime
## - compensating - balancing load:
## - between updates we calculate the average load (current bucket capacity / number of periods since the last update)
## - this gives the percentage of the load used recently
## - with this we can replenish the bucket up to 100% + the calculated leftover from the previous period (capped at the max threshold)
proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) =
if bucket.fillDuration == default(Duration):
bucket.budget = min(bucket.budgetCap, bucket.budget)
return
# do not replenish within the same period
if not periodElapsed(bucket, currentTime):
return
let distance = bucket.periodDistance(currentTime)
let recentAvgUsage = bucket.getUsageAverageSince(distance)
let compensation = bucket.calcCompensation(recentAvgUsage)
bucket.budget = bucket.budgetCap + compensation
bucket.lastTimeFull = currentTime
proc update(bucket: TokenBucket, currentTime: Moment) =
if bucket.replenishMode == ReplenishMode.Compensating:
updateWithCompensation(bucket, currentTime)
else:
updateStrict(bucket, currentTime)
proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
## If `tokens` are available, consume them and return true.
## Otherwise, return false.
if bucket.budget >= bucket.budgetCap:
bucket.lastTimeFull = now
if bucket.budget >= tokens:
bucket.budget -= tokens
return true
bucket.update(now)
if bucket.budget >= tokens:
bucket.budget -= tokens
return true
else:
return false
proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
## Add `tokens` to the budget (capped to the bucket capacity)
bucket.budget += tokens
bucket.update(now)
proc new*(
T: type[TokenBucket],
budgetCap: int,
fillDuration: Duration = 1.seconds,
mode: ReplenishMode = ReplenishMode.Compensating,
): T =
assert not isZero(fillDuration)
assert budgetCap != 0
## Create a TokenBucket with the given replenish mode
case mode
of ReplenishMode.Strict:
return T(
budget: budgetCap,
budgetCap: budgetCap,
fillDuration: fillDuration,
lastTimeFull: Moment.now(),
replenishMode: mode,
)
of ReplenishMode.Compensating:
T(
budget: budgetCap,
budgetCap: budgetCap,
fillDuration: fillDuration,
lastTimeFull: Moment.now(),
replenishMode: mode,
maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
)
proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket =
T.new(capacity, period, ReplenishMode.Strict)
proc newCompensating*(
T: type[TokenBucket], capacity: int, period: Duration
): TokenBucket =
T.new(capacity, period, ReplenishMode.Compensating)
func `$`*(b: TokenBucket): string {.inline.} =
if isNil(b):
return "nil"
return $b.budgetCap & "/" & $b.fillDuration
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
if ob.isNone():
return "no-limit"
return $ob.get()
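# Illustrative sketch contrasting the two replenish modes (values mirror the unit tests in this change):
when isMainModule:
  let start = Moment.now()

  var strict = TokenBucket.newStrict(1000, 1.seconds)
  doAssert strict.tryConsume(800, start)
  # strict mode refills exactly to capacity once the period has elapsed
  doAssert strict.tryConsume(1000, start + 1.seconds)
  doAssert not strict.tryConsume(1, start + 1.seconds)

  var comp = TokenBucket.new(1000, 1.seconds) # Compensating is the default mode
  doAssert comp.tryConsume(1000, start)
  doAssert not comp.tryConsume(1, start)
  # after a full period the bucket refills to capacity plus at most 25% compensation
  doAssert comp.tryConsume(1250, start + 1.seconds)
  doAssert not comp.tryConsume(1, start + 1.seconds)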

View File

@ -1,53 +0,0 @@
{.push raises: [].}
import std/options, chronos/timer, libp2p/stream/connection
import ./tokenbucket
export tokenbucket
type RateLimitSetting* = tuple[volume: int, period: Duration]
# Set the default to switch off rate limiting for now
let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (0, 0.minutes)
proc newTokenBucket*(setting: Option[RateLimitSetting]): Option[TokenBucket] =
if setting.isNone:
return none[TokenBucket]()
let (volume, period) = setting.get()
if volume <= 0 or period <= 0.seconds:
return none[TokenBucket]()
return some(TokenBucket.new(volume, period))
proc checkUsage(
t: var Option[TokenBucket], proto: string, conn: Connection
): bool {.raises: [].} =
if t.isNone():
return true
let tokenBucket = t.get()
if not tokenBucket.tryConsume(1):
return false
return true
template checkUsageLimit*(
t: var Option[TokenBucket],
proto: string,
conn: Connection,
bodyWithinLimit, bodyRejected: untyped,
) =
if t.checkUsage(proto, conn):
waku_service_requests.inc(labelValues = [proto, "served"])
bodyWithinLimit
else:
waku_service_requests.inc(labelValues = [proto, "rejected"])
bodyRejected
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
if ob.isNone():
return "no-limit"
return $ob.get()

View File

@ -1,64 +0,0 @@
{.push raises: [].}
import chronos
## This is an extract from chronos/ratelimit.nim due to the found bug in the original implementation.
## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class.
## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation.
type TokenBucket* = ref object
budget*: int ## Current number of tokens in the bucket
budgetCap: int ## Bucket capacity
lastTimeFull: Moment
## This timer measures the proper periodizaiton of the bucket refilling
fillDuration: Duration ## Refill period
## Update will take place if bucket is empty and trying to consume tokens.
## It checks if the bucket can be replenished as refill duration is passed or not.
proc update(bucket: TokenBucket, currentTime: Moment) =
if bucket.fillDuration == default(Duration):
bucket.budget = min(bucket.budgetCap, bucket.budget)
return
let timeDeltaFromLastFull = currentTime - bucket.lastTimeFull
if timeDeltaFromLastFull.milliseconds < bucket.fillDuration.milliseconds:
return
bucket.budget = bucket.budgetCap
bucket.lastTimeFull = currentTime
proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
## If `tokens` are available, consume them,
## Otherwhise, return false.
if bucket.budget == bucket.budgetCap:
bucket.lastTimeFull = now
if bucket.budget >= tokens:
bucket.budget -= tokens
return true
bucket.update(now)
if bucket.budget >= tokens:
bucket.budget -= tokens
return true
else:
return false
proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
## Add `tokens` to the budget (capped to the bucket capacity)
bucket.budget += tokens
bucket.update(now)
proc new*(T: type[TokenBucket], budgetCap: int, fillDuration: Duration = 1.seconds): T =
## Create a TokenBucket
T(
budget: budgetCap,
budgetCap: budgetCap,
fillDuration: fillDuration,
lastTimeFull: Moment.now(),
)
func `$`*(b: TokenBucket): string {.inline.} =
return $b.budgetCap & "/" & $b.fillDuration

View File

@ -36,7 +36,7 @@ import
../node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../waku_lightpush/common,
../common/utils/parse_size_units,
../common/ratelimit
../common/rate_limit/setting
## Peer persistence

View File

@ -48,7 +48,7 @@ import
../waku_rln_relay,
./config,
./peer_manager,
../common/ratelimit
../common/rate_limit/setting
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicHistogram waku_histogram_message_size,
@ -442,13 +442,18 @@ proc mountFilter*(
maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer,
messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL,
rateLimitSetting: RateLimitSetting = FilterPerPeerRateLimit,
) {.async: (raises: []).} =
## Mounting filter v2 protocol
info "mounting filter protocol"
node.wakuFilter = WakuFilter.new(
node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer,
node.peerManager,
subscriptionTimeout,
maxFilterPeers,
maxFilterCriteriaPerPeer,
messageCacheTTL,
some(rateLimitSetting),
)
if node.started:

View File

@ -71,7 +71,8 @@ proc getStatusDesc(
of FilterSubscribeErrorKind.PEER_DIAL_FAILURE:
err.address
of FilterSubscribeErrorKind.BAD_RESPONSE, FilterSubscribeErrorKind.BAD_REQUEST,
FilterSubscribeErrorKind.NOT_FOUND, FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
FilterSubscribeErrorKind.NOT_FOUND, FilterSubscribeErrorKind.TOO_MANY_REQUESTS,
FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
err.cause
of FilterSubscribeErrorKind.UNKNOWN:
"UNKNOWN"
@ -111,6 +112,8 @@ proc convertErrorKindToHttpStatus(
return Http400
of filter_protocol_type.FilterSubscribeErrorKind.NOT_FOUND:
return Http404
of filter_protocol_type.FilterSubscribeErrorKind.TOO_MANY_REQUESTS:
return Http429
of filter_protocol_type.FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
return Http503

View File

@ -69,7 +69,9 @@ proc sendSubscribeRequest(
let response = respDecodeRes.get()
if response.requestId != filterSubscribeRequest.requestId:
# DOS protection rate limit checks do not know about the request id
if response.statusCode != FilterSubscribeErrorKind.TOO_MANY_REQUESTS.uint32 and
response.requestId != filterSubscribeRequest.requestId:
trace "Filter subscribe response requestId mismatch", servicePeer, response
waku_filter_errors.inc(labelValues = [requestIdMismatch])
return err(FilterSubscribeError.badResponse(requestIdMismatch))

View File

@ -12,6 +12,7 @@ type
BAD_RESPONSE = uint32(300)
BAD_REQUEST = uint32(400)
NOT_FOUND = uint32(404)
TOO_MANY_REQUESTS = uint32(429)
SERVICE_UNAVAILABLE = uint32(503)
PEER_DIAL_FAILURE = uint32(504)
@ -19,7 +20,7 @@ type
case kind*: FilterSubscribeErrorKind
of PEER_DIAL_FAILURE:
address*: string
of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, SERVICE_UNAVAILABLE:
of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, TOO_MANY_REQUESTS, SERVICE_UNAVAILABLE:
cause*: string
else:
discard
@ -49,6 +50,11 @@ proc notFound*(
): FilterSubscribeError =
FilterSubscribeError(kind: FilterSubscribeErrorKind.NOT_FOUND, cause: cause)
proc tooManyRequests*(
T: type FilterSubscribeError, cause = "too many requests"
): FilterSubscribeError =
FilterSubscribeError(kind: FilterSubscribeErrorKind.TOO_MANY_REQUESTS, cause: cause)
proc serviceUnavailable*(
T: type FilterSubscribeError, cause = "service unavailable"
): FilterSubscribeError =
@ -56,7 +62,7 @@ proc serviceUnavailable*(
proc parse*(T: type FilterSubscribeErrorKind, kind: uint32): T =
case kind
of 000, 200, 300, 400, 404, 503:
of 000, 200, 300, 400, 404, 429, 503:
FilterSubscribeErrorKind(kind)
else:
FilterSubscribeErrorKind.UNKNOWN
@ -66,7 +72,7 @@ proc parse*(T: type FilterSubscribeError, kind: uint32, cause = "", address = ""
case kind
of PEER_DIAL_FAILURE:
FilterSubscribeError(kind: kind, address: address)
of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, SERVICE_UNAVAILABLE:
of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, TOO_MANY_REQUESTS, SERVICE_UNAVAILABLE:
FilterSubscribeError(kind: kind, cause: cause)
else:
FilterSubscribeError(kind: kind)
@ -81,6 +87,8 @@ proc `$`*(err: FilterSubscribeError): string =
"BAD_REQUEST: " & err.cause
of FilterSubscribeErrorKind.NOT_FOUND:
"NOT_FOUND: " & err.cause
of FilterSubscribeErrorKind.TOO_MANY_REQUESTS:
"TOO_MANY_REQUESTS: " & err.cause
of FilterSubscribeErrorKind.SERVICE_UNAVAILABLE:
"SERVICE_UNAVAILABLE: " & err.cause
of FilterSubscribeErrorKind.UNKNOWN:

View File

@ -13,11 +13,8 @@ import
import
../node/peer_manager,
../waku_core,
./common,
./protocol_metrics,
./rpc_codec,
./rpc,
./subscriptions
../common/rate_limit/per_peer_limiter,
./[common, protocol_metrics, rpc_codec, rpc, subscriptions]
logScope:
topics = "waku filter"
@ -30,6 +27,7 @@ type WakuFilter* = ref object of LPProtocol
peerManager: PeerManager
maintenanceTask: TimerCallback
messageCache: TimedCache[string]
peerRequestRateLimiter*: PerPeerRateLimiter
proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
trace "pinging subscriber", peerId = peerId
@ -97,6 +95,9 @@ proc unsubscribe(
wf.subscriptions.removeSubscription(peerId, filterCriteria).isOkOr:
return err(FilterSubscribeError.notFound())
## Note: do not remove the peer from peerRequestRateLimiter here, to prevent gaming the limit with a subscribe/unsubscribe loop.
## We remove it only when peerManager removes the peer.
ok()
proc unsubscribeAll(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult =
@ -167,6 +168,9 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} =
return
await conn.get().writeLp(buffer)
waku_service_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuFilterPushCodec, "out"]
)
proc pushToPeers(
wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush
@ -210,6 +214,7 @@ proc maintainSubscriptions*(wf: WakuFilter) =
if peersToRemove.len > 0:
wf.subscriptions.removePeers(peersToRemove)
wf.peerRequestRateLimiter.unregister(peersToRemove)
wf.subscriptions.cleanUp()
@ -267,21 +272,36 @@ proc initProtocolHandler(wf: WakuFilter) =
proc handler(conn: Connection, proto: string) {.async.} =
trace "filter subscribe request handler triggered", peer_id = shortLog(conn.peerId)
let buf = await conn.readLp(int(DefaultMaxSubscribeSize))
var response: FilterSubscribeResponse
let decodeRes = FilterSubscribeRequest.decode(buf)
if decodeRes.isErr():
error "Failed to decode filter subscribe request",
peer_id = conn.peerId, err = decodeRes.error
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
wf.peerRequestRateLimiter.checkUsageLimit(WakuFilterSubscribeCodec, conn):
let buf = await conn.readLp(int(DefaultMaxSubscribeSize))
let request = decodeRes.value #TODO: toAPI() split here
waku_service_network_bytes.inc(
amount = buf.len().int64, labelValues = [WakuFilterSubscribeCodec, "in"]
)
let response = wf.handleSubscribeRequest(conn.peerId, request)
let decodeRes = FilterSubscribeRequest.decode(buf)
if decodeRes.isErr():
error "Failed to decode filter subscribe request",
peer_id = conn.peerId, err = decodeRes.error
waku_filter_errors.inc(labelValues = [decodeRpcFailure])
return
debug "sending filter subscribe response",
peer_id = shortLog(conn.peerId), response = response
let request = decodeRes.value #TODO: toAPI() split here
response = wf.handleSubscribeRequest(conn.peerId, request)
debug "sending filter subscribe response",
peer_id = shortLog(conn.peerId), response = response
do:
debug "filter request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $wf.peerRequestRateLimiter.setting
response = FilterSubscribeResponse(
requestId: "N/A",
statusCode: FilterSubscribeErrorKind.TOO_MANY_REQUESTS.uint32,
statusDesc: some("filter request rejected due to rate limit exceeded"),
)
await conn.writeLp(response.encode().buffer) #TODO: toRPC() separation here
return
@ -296,6 +316,7 @@ proc new*(
maxFilterPeers: uint32 = MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
messageCacheTTL: Duration = MessageCacheTTL,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wf = WakuFilter(
subscriptions: FilterSubscriptions.init(
@ -303,6 +324,7 @@ proc new*(
),
peerManager: peerManager,
messageCache: init(TimedCache[string], messageCacheTTL),
peerRequestRateLimiter: PerPeerRateLimiter(setting: rateLimitSetting),
)
wf.initProtocolHandler()

View File

@ -1,7 +1,7 @@
{.push raises: [].}
import std/[sets, tables], chronicles, chronos, libp2p/peerid, stew/shims/sets
import ../waku_core, ../utils/tableutils
import ../waku_core, ../utils/tableutils, ../common/rate_limit/setting
logScope:
topics = "waku filter subscriptions"
@ -12,6 +12,12 @@ const
DefaultSubscriptionTimeToLiveSec* = 5.minutes
MessageCacheTTL* = 2.minutes
# Acceptable call frequency from one peer using the filter service.
# The assumption is that setting up a subscription takes at most 30 calls, after which a ping is sent every minute.
# While subscribe/unsubscribe events are distributed in time among clients, pings will happen regularly from
# all subscribed peers.
FilterPerPeerRateLimit*: RateLimitSetting = (30, 1.minutes)
type
# a single filter criterion is fully defined by a pubsub topic and content topic
FilterCriterion* = tuple[pubsubTopic: PubsubTopic, contentTopic: ContentTopic]

View File

@ -1,23 +1,22 @@
{.push raises: [].}
import
../waku_core,
../waku_relay,
./common,
./protocol,
../waku_rln_relay,
../waku_rln_relay/protocol_types,
../common/ratelimit
import
std/times,
libp2p/peerid,
stew/byteutils
../waku_core,
../waku_relay,
./common,
./protocol,
../waku_rln_relay,
../waku_rln_relay/protocol_types
proc checkAndGenerateRLNProof*(rlnPeer: Option[WakuRLNRelay], message: WakuMessage): Result[WakuMessage, string] =
import std/times, libp2p/peerid, stew/byteutils
proc checkAndGenerateRLNProof*(
rlnPeer: Option[WakuRLNRelay], message: WakuMessage
): Result[WakuMessage, string] =
# check if the message already has RLN proof
if message.proof.len > 0:
return ok(message)
if rlnPeer.isNone():
notice "Publishing message without RLN proof"
return ok(message)
@ -32,16 +31,15 @@ proc checkAndGenerateRLNProof*(rlnPeer: Option[WakuRLNRelay], message: WakuMessa
proc getNilPushHandler*(): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
return err("no waku relay found")
proc getRelayPushHandler*(
wakuRelay: WakuRelay,
rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
wakuRelay: WakuRelay, rlnPeer: Option[WakuRLNRelay] = none[WakuRLNRelay]()
): PushMessageHandler =
return proc(
peer: PeerId, pubsubTopic: string, message: WakuMessage
peer: PeerId, pubsubTopic: string, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
# append RLN proof
let msgWithProof = checkAndGenerateRLNProof(rlnPeer, message)
@ -57,4 +55,4 @@ proc getRelayPushHandler*(
let msgHash = computeMessageHash(pubsubTopic, message).to0xHex()
notice "Lightpush request has not been published to any peers", msg_hash = msgHash
return ok()
return ok()

View File

@ -8,10 +8,7 @@ import
./rpc,
./rpc_codec,
./protocol_metrics,
../common/ratelimit,
../common/waku_service_metrics
export ratelimit
../common/rate_limit/request_limiter
logScope:
topics = "waku lightpush"
@ -20,7 +17,7 @@ type WakuLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: PushMessageHandler
requestRateLimiter*: Option[TokenBucket]
requestRateLimiter*: RequestRateLimiter
proc handleRequest*(
wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
@ -76,7 +73,7 @@ proc initProtocolHandler(wl: WakuLightPush) =
rpc = await handleRequest(wl, conn.peerId, buffer)
do:
debug "lightpush request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $wl.requestRateLimiter
peerId = conn.peerId, limit = $wl.requestRateLimiter.setting
rpc = static(
PushRPC(
@ -108,7 +105,7 @@ proc new*(
rng: rng,
peerManager: peerManager,
pushHandler: pushHandler,
requestRateLimiter: newTokenBucket(rateLimitSetting),
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
)
wl.initProtocolHandler()
return wl

View File

@ -20,8 +20,7 @@ import
./common,
./rpc_codec,
./protocol_metrics,
../common/ratelimit,
../common/waku_service_metrics
../common/rate_limit/request_limiter
logScope:
topics = "waku store"
@ -36,7 +35,7 @@ type WakuStore* = ref object of LPProtocol
peerManager: PeerManager
rng: ref rand.HmacDrbgContext
requestHandler*: StoreQueryRequestHandler
requestRateLimiter*: Option[TokenBucket]
requestRateLimiter*: RequestRateLimiter
## Protocol
@ -107,7 +106,7 @@ proc initProtocolHandler(self: WakuStore) =
resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)
do:
debug "store query request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $self.requestRateLimiter
peerId = conn.peerId, limit = $self.requestRateLimiter.setting
resBuf = rejectReposnseBuffer
let writeRes = catch:
@ -138,7 +137,7 @@ proc new*(
rng: rng,
peerManager: peerManager,
requestHandler: requestHandler,
requestRateLimiter: newTokenBucket(rateLimitSetting),
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
)
store.initProtocolHandler()

View File

@ -21,8 +21,7 @@ import
./rpc,
./rpc_codec,
./protocol_metrics,
../common/ratelimit,
../common/waku_service_metrics
../common/rate_limit/request_limiter
logScope:
topics = "waku legacy store"
@ -37,7 +36,7 @@ type WakuStore* = ref object of LPProtocol
peerManager: PeerManager
rng: ref rand.HmacDrbgContext
queryHandler*: HistoryQueryHandler
requestRateLimiter*: Option[TokenBucket]
requestRateLimiter*: RequestRateLimiter
## Protocol
@ -122,7 +121,7 @@ proc initProtocolHandler(ws: WakuStore) =
resBuf = await ws.handleLegacyQueryRequest(conn.peerId, reqBuf)
do:
debug "Legacy store query request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $ws.requestRateLimiter
peerId = conn.peerId, limit = $ws.requestRateLimiter.setting
resBuf = rejectResponseBuf
let writeRes = catch:
@ -154,7 +153,7 @@ proc new*(
rng: rng,
peerManager: peerManager,
queryHandler: queryHandler,
requestRateLimiter: newTokenBucket(rateLimitSetting),
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
)
ws.initProtocolHandler()
ws