chore: rate limit peer exchange protocol, enhanced response status in RPC (#3035)

* Enhanced peer-ex protocol - added rate limiting, added response status and desc to the rpc

* Better error result handling for PeerEx request, adjusted tests

* Refactored RateLimit configuration option for better CLI UX - now possible to set separate limits per protocol. Adjusted mountings. Added and adjusted tests

* Fix libwaku due to changes of error return type of fetchPeerExchangePeers

* Fix rate limit setting tests due to changed defaults

* Introduce a new gauge to help dashboards effectively show the current rate limit applied per protocol

* Adjust timing in the filter rate limit test so the macOS CI run passes.

* Address review findings, namings, error logs, removed left-overs

* Changes to reflect the latest spec agreement: the PeerExchange RPC response structure now contains status_code and status_desc.
This commit is contained in:
NagyZoltanPeter 2024-09-18 15:58:07 +02:00 committed by GitHub
parent 6dfefc5e42
commit e7ae1a0382
23 changed files with 653 additions and 97 deletions

View File

@ -104,7 +104,8 @@ proc updateDiscv5BootstrapNodes(nodes: string, waku: ptr Waku): Result[void, str
proc performPeerExchangeRequestTo(
numPeers: uint64, waku: ptr Waku
): Future[Result[int, string]] {.async.} =
return await waku.node.fetchPeerExchangePeers(numPeers)
return (await waku.node.fetchPeerExchangePeers(numPeers)).isOkOr:
return err($error)
proc process*(
self: ptr DiscoveryRequest, waku: ptr Waku

View File

@ -8,4 +8,5 @@ import
./test_parse_size,
./test_tokenbucket,
./test_requestratelimiter,
./test_ratelimit_setting,
./test_timed_map

View File

@ -0,0 +1,165 @@
# Chronos Test Suite
# (c) Copyright 2022-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
{.used.}
import testutils/unittests
import chronos, libp2p/stream/connection
import std/[sequtils, options, tables]
import ../../waku/common/rate_limit/request_limiter
import ../../waku/common/rate_limit/timed_map
# Shared fixtures: a protocol descriptor string and three connections with
# random peer ids, used when exercising per-peer rate limiting.
# NOTE(review): these appear unused by the RateLimitSetting tests below —
# possibly carried over from test_requestratelimiter; verify before removing.
let proto = "ProtocolDescriptor"
let conn1 = Connection(peerId: PeerId.random().tryGet())
let conn2 = Connection(peerId: PeerId.random().tryGet())
let conn3 = Connection(peerId: PeerId.random().tryGet())
suite "RateLimitSetting":
  test "Parse rate limit setting - ok":
    # Every accepted syntax variant (extra whitespace, mixed case, all four
    # period units) must parse into the expected (volume, period) setting,
    # keyed by the protocol it names; an unnamed setting becomes GLOBAL.
    let test1 = "10/2m"
    let test2 = " store : 10 /1h"
    let test2a = "storev2 : 10 /1h"
    let test2b = "storeV3: 12 /1s"
    let test3 = "LIGHTPUSH: 10/ 1m"
    let test4 = "px:10/2 s "
    let test5 = "filter:42/66ms"

    let expU = UnlimitedRateLimit
    let exp1: RateLimitSetting = (10, 2.minutes)
    let exp2: RateLimitSetting = (10, 1.hours)
    let exp2a: RateLimitSetting = (10, 1.hours)
    let exp2b: RateLimitSetting = (12, 1.seconds)
    let exp3: RateLimitSetting = (10, 1.minutes)
    let exp4: RateLimitSetting = (10, 2.seconds)
    let exp5: RateLimitSetting = (42, 66.milliseconds)

    let res1 = ProtocolRateLimitSettings.parse(@[test1])
    let res2 = ProtocolRateLimitSettings.parse(@[test2])
    let res2a = ProtocolRateLimitSettings.parse(@[test2a])
    let res2b = ProtocolRateLimitSettings.parse(@[test2b])
    let res3 = ProtocolRateLimitSettings.parse(@[test3])
    let res4 = ProtocolRateLimitSettings.parse(@[test4])
    let res5 = ProtocolRateLimitSettings.parse(@[test5])

    check:
      res1.isOk()
      res1.get() == {GLOBAL: exp1, FILTER: FilterDefaultPerPeerRateLimit}.toTable()
      res2.isOk()
      res2.get() ==
        {
          GLOBAL: expU,
          FILTER: FilterDefaultPerPeerRateLimit,
          STOREV2: exp2,
          STOREV3: exp2,
        }.toTable()
      res2a.isOk()
      res2a.get() ==
        {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, STOREV2: exp2a}.toTable()
      res2b.isOk()
      res2b.get() ==
        {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, STOREV3: exp2b}.toTable()
      res3.isOk()
      res3.get() ==
        {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, LIGHTPUSH: exp3}.toTable()
      res4.isOk()
      res4.get() ==
        {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, PEEREXCHG: exp4}.toTable()
      res5.isOk()
      res5.get() == {GLOBAL: expU, FILTER: exp5}.toTable()

  test "Parse rate limit setting - err":
    # Malformed inputs must be rejected: unknown period unit, misspelled or
    # unknown protocol, missing colon or slash, leading colon, non-numeric
    # volume.
    let test1 = "10/2d"
    let test2 = " stre : 10 /1h"
    let test2a = "storev2 10 /1h"
    let test2b = "storev3: 12 1s"
    let test3 = "somethingelse: 10/ 1m"
    let test4 = ":px:10/2 s "
    let test5 = "filter:p42/66ms"

    let res1 = ProtocolRateLimitSettings.parse(@[test1])
    let res2 = ProtocolRateLimitSettings.parse(@[test2])
    let res2a = ProtocolRateLimitSettings.parse(@[test2a])
    let res2b = ProtocolRateLimitSettings.parse(@[test2b])
    let res3 = ProtocolRateLimitSettings.parse(@[test3])
    let res4 = ProtocolRateLimitSettings.parse(@[test4])
    let res5 = ProtocolRateLimitSettings.parse(@[test5])

    check:
      res1.isErr()
      res2.isErr()
      res2a.isErr()
      res2b.isErr()
      res3.isErr()
      res4.isErr()
      res5.isErr()

  test "Parse rate limit setting - complex":
    let expU = UnlimitedRateLimit

    # Generic "store" only fills store versions not set explicitly; an
    # explicit "storev2" wins over the generic "store" default.
    let test1 = @["lightpush:2/2ms", "10/2m", " store: 3/3s", " storev2:12/12s"]
    let exp1 = {
      GLOBAL: (10, 2.minutes),
      FILTER: FilterDefaultPerPeerRateLimit,
      LIGHTPUSH: (2, 2.milliseconds),
      STOREV3: (3, 3.seconds),
      STOREV2: (12, 12.seconds),
    }.toTable()

    let res1 = ProtocolRateLimitSettings.parse(test1)

    check:
      res1.isOk()
      res1.get() == exp1
      # Protocols without an explicit entry fall back to the GLOBAL setting.
      res1.get().getSetting(PEEREXCHG) == (10, 2.minutes)
      res1.get().getSetting(STOREV2) == (12, 12.seconds)
      res1.get().getSetting(STOREV3) == (3, 3.seconds)
      res1.get().getSetting(LIGHTPUSH) == (2, 2.milliseconds)

    let test2 = @["lightpush:2/2ms", " store: 3/3s", "px:10/10h", "filter:4/42ms"]
    let exp2 = {
      GLOBAL: expU,
      LIGHTPUSH: (2, 2.milliseconds),
      STOREV3: (3, 3.seconds),
      STOREV2: (3, 3.seconds),
      FILTER: (4, 42.milliseconds),
      PEEREXCHG: (10, 10.hours),
    }.toTable()

    let res2 = ProtocolRateLimitSettings.parse(test2)

    check:
      res2.isOk()
      res2.get() == exp2

    # Duplicated settings for the same protocol: the last occurrence wins.
    let test3 =
      @["storev2:1/1s", "store:3/3s", "storev3:4/42ms", "storev3:5/5s", "storev3:6/6s"]
    let exp3 = {
      GLOBAL: expU,
      FILTER: FilterDefaultPerPeerRateLimit,
      STOREV3: (6, 6.seconds),
      STOREV2: (1, 1.seconds),
    }.toTable()

    let res3 = ProtocolRateLimitSettings.parse(test3)

    check:
      res3.isOk()
      res3.get() == exp3
      res3.get().getSetting(LIGHTPUSH) == expU

    # An empty argument list yields only the defaults.
    let test4 = newSeq[string](0)
    let exp4 = {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit}.toTable()

    let res4 = ProtocolRateLimitSettings.parse(test4)

    check:
      res4.isOk()
      res4.get() == exp4
      # Fixed: this line previously re-checked res3 (copy-paste leftover),
      # leaving res4's GLOBAL fallback unverified.
      res4.get().getSetting(LIGHTPUSH) == expU

View File

@ -82,3 +82,17 @@ suite "RequestRateLimiter":
# requests of other peers can also go
check limiter.checkUsage(proto, conn2, now + 4100.milliseconds) == true
check limiter.checkUsage(proto, conn3, now + 5.minutes) == true
test "RequestRateLimiter lowest possible volume":
  # keep limits low for easier calculation of ratios
  # A limit of 1 token per 1 second: the main bucket is exhausted by the
  # first request, and subsequent calls are rejected until the period rolls.
  let rateLimit: RateLimitSetting = (1, 1.seconds)
  var limiter = newRequestRateLimiter(some(rateLimit))

  let now = Moment.now()
  # with first use we register the peer also and start its timer
  check limiter.checkUsage(proto, conn1, now + 500.milliseconds) == true
  # run out of main tokens but still used one more token from the peer's bucket
  check limiter.checkUsage(proto, conn1, now + 800.milliseconds) == false
  # still rejected just before the 1 s period has elapsed...
  check limiter.checkUsage(proto, conn1, now + 1499.milliseconds) == false
  # ...and admitted again right after it has.
  check limiter.checkUsage(proto, conn1, now + 1501.milliseconds) == true

View File

@ -84,7 +84,8 @@ suite "Waku Peer Exchange":
# Then no peers are fetched
check:
node.peerManager.peerStore.peers.len == 0
res.error == "PeerExchange is not mounted"
res.error.status_code == SERVICE_UNAVAILABLE
res.error.status_desc == some("PeerExchange is not mounted")
asyncTest "Node fetches with mounted peer exchange, but no peers":
# Given a node with peer exchange mounted
@ -92,7 +93,9 @@ suite "Waku Peer Exchange":
# When a node fetches peers
let res = await node.fetchPeerExchangePeers(1)
check res.error == "Peer exchange failure: peer_not_found_failure"
check:
res.error.status_code == SERVICE_UNAVAILABLE
res.error.status_desc == some("peer_not_found_failure")
# Then no peers are fetched
check node.peerManager.peerStore.peers.len == 0

View File

@ -146,7 +146,7 @@ suite "Waku Filter - DOS protection":
some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS)
# ensure period of time has passed and clients can again use the service
await sleepAsync(700.milliseconds)
await sleepAsync(1000.milliseconds)
check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==
none(FilterSubscribeErrorKind)
check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) ==

View File

@ -1,11 +1,10 @@
{.used.}
import
std/[options, sequtils, tables],
std/[options, sequtils, tables, net],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/[switch, peerId, crypto/crypto, multistream, muxers/muxer],
eth/[keys, p2p/discoveryv5/enr]
@ -223,6 +222,7 @@ suite "Waku Peer Exchange":
# Check that it failed gracefully
check:
response.isErr
response.error.status_code == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE
asyncTest "Request 0 peers, with 0 peers in PeerExchange":
# Given a disconnected PeerExchange
@ -237,7 +237,7 @@ suite "Waku Peer Exchange":
# Then the response should be an error
check:
response.isErr
response.error == "peer_not_found_failure"
response.error.status_code == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE
asyncTest "Pool filtering":
let
@ -331,7 +331,7 @@ suite "Waku Peer Exchange":
# Then the response should be an error
check:
response.isErr
response.error == "dial_failure"
response.error.status_code == PeerExchangeResponseStatusCode.DIAL_FAILURE
asyncTest "Connections are closed after response is sent":
# Create 3 nodes
@ -385,7 +385,7 @@ suite "Waku Peer Exchange":
let conn = connOpt.get()
# Send bytes so that they directly hit the handler
let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: 1))
let rpc = PeerExchangeRpc.makeRequest(1)
var buffer: seq[byte]
await conn.writeLP(rpc.encode().buffer)
@ -397,5 +397,62 @@ suite "Waku Peer Exchange":
# Check we got back the enr we mocked
check:
decodedBuff.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS
decodedBuff.get().response.peerInfos.len == 1
decodedBuff.get().response.peerInfos[0].enr == enr1.raw
asyncTest "RateLimit as expected":
  # Server (node1) mounts peer exchange with a 1-request / 150 ms limit;
  # client (node2) mounts it without a limit.
  let
    node1 =
      newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
    node2 =
      newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))

  # Start and mount peer exchange
  await allFutures([node1.start(), node2.start()])
  await allFutures(
    [
      node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)),
      node2.mountPeerExchange(),
    ]
  )

  # Create connection
  let connOpt = await node2.peerManager.dialPeer(
    node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
  )
  require:
    connOpt.isSome

  # Create some enr and add to peer exchange (simulating disv5)
  var enr1, enr2 = enr.Record()
  check enr1.fromUri(
    "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
  )
  check enr2.fromUri(
    "enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
  )

  # Mock that we have discovered these enrs
  node1.wakuPeerExchange.enrCache.add(enr1)
  node1.wakuPeerExchange.enrCache.add(enr2)

  # Let one full limit period pass so the first request is admitted.
  await sleepAsync(150.milliseconds)

  # Request 2 peer from px. Test all request variants
  let response1 = await node2.wakuPeerExchange.request(1)
  check:
    response1.isOk
    response1.get().peerInfos.len == 1

  # An immediate second request exceeds 1/150 ms and must be rejected.
  let response2 =
    await node2.wakuPeerExchange.request(1, node1.peerInfo.toRemotePeerInfo())
  check:
    response2.isErr
    response2.error().status_code == PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS

  # After another full period the limiter admits requests again.
  await sleepAsync(150.milliseconds)
  let response3 = await node2.wakuPeerExchange.request(1, connOpt.get())
  check:
    response3.isOk
    response3.get().peerInfos.len == 1

View File

@ -1,10 +1,9 @@
{.used.}
import
std/[options],
std/[options, net],
testutils/unittests,
chronos,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
@ -23,6 +22,14 @@ import
suite "Peer Exchange RPC":
asyncTest "Encode - Decode":
# Setup
let rpcReq = PeerExchangeRpc.makeRequest(2)
let rpcReqBuffer: seq[byte] = rpcReq.encode().buffer
let resReq = PeerExchangeRpc.decode(rpcReqBuffer)
check:
resReq.isOk
resReq.get().request.numPeers == 2
var
enr1 = enr.Record(seqNum: 0, raw: @[])
enr2 = enr.Record(seqNum: 0, raw: @[])
@ -35,19 +42,18 @@ suite "Peer Exchange RPC":
"enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq"
)
let
peerInfos =
@[PeerExchangePeerInfo(enr: enr1.raw), PeerExchangePeerInfo(enr: enr2.raw)]
rpc = PeerExchangeRpc(response: PeerExchangeResponse(peerInfos: peerInfos))
let peerInfos =
@[PeerExchangePeerInfo(enr: enr1.raw), PeerExchangePeerInfo(enr: enr2.raw)]
let rpc = PeerExchangeRpc.makeResponse(peerInfos)
# When encoding and decoding
let
rpcBuffer: seq[byte] = rpc.encode().buffer
res = PeerExchangeRpc.decode(rpcBuffer)
let rpcBuffer: seq[byte] = rpc.encode().buffer
let res = PeerExchangeRpc.decode(rpcBuffer)
# Then the peerInfos match the originals
check:
res.isOk
res.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS
res.get().response.peerInfos == peerInfos
# When using the decoded responses to create new enrs
@ -55,6 +61,9 @@ suite "Peer Exchange RPC":
resEnr1 = enr.Record(seqNum: 0, raw: @[])
resEnr2 = enr.Record(seqNum: 0, raw: @[])
check:
res.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS
discard resEnr1.fromBytes(res.get().response.peerInfos[0].enr)
discard resEnr2.fromBytes(res.get().response.peerInfos[1].enr)

View File

@ -1,9 +1,19 @@
{.push raises: [].}
import metrics
import std/options
import metrics, setting
declarePublicGauge waku_service_requests_limit,
"Applied rate limit of non-relay service", ["service"]
declarePublicCounter waku_service_requests,
"number of non-relay service requests received", ["service", "state"]
declarePublicCounter waku_service_network_bytes,
"total incoming traffic of specific waku services", labels = ["service", "direction"]
proc setServiceLimitMetric*(service: string, limit: Option[RateLimitSetting]) =
  ## Publishes the applied rate limit of a non-relay service as a
  ## requests-per-second gauge (waku_service_requests_limit) so dashboards
  ## can show the current limit per protocol.
  ## NOTE(review): an unlimited or absent limit leaves the gauge untouched
  ## rather than resetting it to 0 — confirm this is intended.
  if limit.isSome() and not limit.get().isUnlimited():
    waku_service_requests_limit.set(
      limit.get().calculateLimitPerSecond(), labelValues = [service]
    )

View File

@ -1,12 +1,34 @@
{.push raises: [].}
import chronos/timer
import chronos/timer, std/[tables, strutils, options], regex, results
# Setting for TokenBucket defined as volume over period of time
type RateLimitSetting* = tuple[volume: int, period: Duration]
type RateLimitedProtocol* = enum
GLOBAL
STOREV2
STOREV3
LIGHTPUSH
PEEREXCHG
FILTER
type ProtocolRateLimitSettings* = Table[RateLimitedProtocol, RateLimitSetting]
# Set the default to switch off rate limiting for now
let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (0, 0.minutes)
let UnlimitedRateLimit*: RateLimitSetting = (0, 0.seconds)
# Acceptable call frequency from one peer using the filter service.
# Assumption: a client sets up a subscription with at most 30 calls, then uses ping once every minute.
# While subscribe/unsubscribe events are distributed in time among clients, pings will happen regularly from
# all subscribed peers
let FilterDefaultPerPeerRateLimit*: RateLimitSetting = (30, 1.minutes)
# For being used under GC-safe condition must use threadvar
var DefaultProtocolRateLimit* {.threadvar.}: ProtocolRateLimitSettings
DefaultProtocolRateLimit =
{GLOBAL: UnlimitedRateLimit, FILTER: FilterDefaultPerPeerRateLimit}.toTable()
proc isUnlimited*(t: RateLimitSetting): bool {.inline.} =
return t.volume <= 0 or t.period <= 0.seconds
@ -17,3 +39,97 @@ func `$`*(t: RateLimitSetting): string {.inline.} =
"no-limit"
else:
$t.volume & "/" & $t.period
proc translate(sProtocol: string): RateLimitedProtocol {.raises: [ValueError].} =
  ## Maps a protocol name from a CLI rate-limit setting to its enum value.
  ## An empty name selects the GLOBAL (fallback) setting.
  ## Raises ValueError for any unrecognized protocol name.
  case sProtocol
  of "", "global":
    GLOBAL
  of "storev2":
    STOREV2
  of "storev3":
    STOREV3
  of "lightpush":
    LIGHTPUSH
  of "px":
    PEEREXCHG
  of "filter":
    FILTER
  else:
    raise newException(ValueError, "Unknown protocol definition: " & sProtocol)
proc fillSettingTable(
    t: var ProtocolRateLimitSettings, sProtocol: var string, setting: RateLimitSetting
) {.raises: [ValueError].} =
  ## Records `setting` in the table for the protocol named by `sProtocol`.
  ## A concrete protocol name always overwrites an existing entry (last one
  ## wins on duplicates), while the pseudo-protocol "store" acts only as a
  ## default: it fills STOREV2/STOREV3 slots that are not already set.
  if sProtocol != "store":
    t[translate(sProtocol)] = setting
  else:
    discard t.hasKeyOrPut(STOREV2, setting)
    discard t.hasKeyOrPut(STOREV3, setting)
proc parse*(
    T: type ProtocolRateLimitSettings, settings: seq[string]
): Result[ProtocolRateLimitSettings, string] =
  ## Parses CLI rate-limit arguments of the form "[protocol:]volume/period<unit>"
  ## into a per-protocol settings table. Later arguments override earlier ones
  ## for the same protocol; missing GLOBAL/FILTER entries get their defaults.
  ## Returns an error naming the first setting string that fails to parse.
  var settingsTable: ProtocolRateLimitSettings =
    initTable[RateLimitedProtocol, RateLimitSetting]()

  ## The following regex matches the exact syntax in which a rate limit can be
  ## set for a specific protocol or globally. It uses capture groups
  ## (nim-regex indexes capture groups from 0):
  ## group0: The whole optional "<protocol> :" prefix (name plus colon).
  ## group1: Protocol name; if empty we take it as the "global" setting.
  ## group2: Volume of tokens - integer only.
  ## group3: Duration of the period - integer only.
  ## group4: Unit of the period - only h:hour, m:minute, s:second, ms:millisecond allowed.
  ## Whitespace is allowed lazily around every part.
  const parseRegex =
    """^\s*((store|storev2|storev3|lightpush|px|filter)\s*:)?\s*(\d+)\s*\/\s*(\d+)\s*(s|h|m|ms)\s*$"""
  const regexParseSize = re2(parseRegex)
  for settingStr in settings:
    # Matching is case-insensitive via lowercasing the input first.
    let aSetting = settingStr.toLower()
    try:
      var m: RegexMatch2
      if aSetting.match(regexParseSize, m) == false:
        return err("Invalid rate-limit setting: " & settingStr)

      var sProtocol = aSetting[m.captures[1]]
      let volume = aSetting[m.captures[2]].parseInt()
      let duration = aSetting[m.captures[3]].parseInt()
      let periodUnit = aSetting[m.captures[4]]

      var period = 0.seconds
      case periodUnit
      of "ms":
        period = duration.milliseconds
      of "s":
        period = duration.seconds
      of "m":
        period = duration.minutes
      of "h":
        period = duration.hours
      # parseInt/translate may raise ValueError, handled below.
      fillSettingTable(settingsTable, sProtocol, (volume, period))
    except ValueError:
      return err("Invalid rate-limit setting: " & settingStr)

  # If no global setting was predefined, default it to unlimited: the GLOBAL
  # entry is the fallback for protocols not listed, so explicitly-set limits
  # are not accidentally applied to other protocols.
  discard settingsTable.hasKeyOrPut(GLOBAL, UnlimitedRateLimit)
  discard settingsTable.hasKeyOrPut(FILTER, FilterDefaultPerPeerRateLimit)

  return ok(settingsTable)
proc getSetting*(
    t: ProtocolRateLimitSettings, protocol: RateLimitedProtocol
): RateLimitSetting =
  ## Returns the limit configured for `protocol`, falling back to the GLOBAL
  ## setting, and finally to unlimited when neither is present.
  t.getOrDefault(protocol, t.getOrDefault(GLOBAL, UnlimitedRateLimit))
proc calculateLimitPerSecond*(setting: RateLimitSetting): float64 =
  ## Expresses a volume-per-period limit as requests per second for metrics.
  ## An unlimited setting is reported as 0.0.
  if not setting.isUnlimited():
    return (setting.volume.float64 / setting.period.milliseconds.float64) * 1000.float64
  return 0.float64

View File

@ -8,7 +8,12 @@ import
libp2p/builders,
libp2p/nameresolving/nameresolver,
libp2p/transports/wstransport
import ../waku_enr, ../discovery/waku_discv5, ../waku_node, ../node/peer_manager
import
../waku_enr,
../discovery/waku_discv5,
../waku_node,
../node/peer_manager,
../common/rate_limit/setting
type
WakuNodeBuilder* = object # General
@ -34,6 +39,9 @@ type
switchSslSecureCert: Option[string]
switchSendSignedPeerRecord: Option[bool]
#Rate limit configs for non-relay req-resp protocols
rateLimitSettings: Option[seq[string]]
WakuNodeBuilderResult* = Result[void, string]
## Init
@ -105,6 +113,9 @@ proc withPeerManagerConfig*(
proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) =
builder.colocationLimit = colocationLimit
proc withRateLimit*(builder: var WakuNodeBuilder, limits: seq[string]) =
builder.rateLimitSettings = some(limits)
## Waku switch
proc withSwitchConfiguration*(
@ -184,4 +195,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
except Exception:
return err("failed to build WakuNode instance: " & getCurrentExceptionMsg())
if builder.rateLimitSettings.isSome():
?node.setRateLimits(builder.rateLimitSettings.get())
ok(node)

View File

@ -670,21 +670,18 @@ with the drawback of consuming some more bandwitdh.""",
name: "websocket-secure-cert-path"
.}: string
## Rate limitation config
## Currently default to switch of rate limit until become official
requestRateLimit* {.
## Rate limitation config, if not set, rate limit checks will not be performed
rateLimits* {.
desc:
"Number of requests to serve by each service in the specified period. Set it to 0 for unlimited",
defaultValue: 0,
name: "request-rate-limit"
.}: int
## Currently default to switch of rate limit until become official
requestRatePeriod* {.
desc: "Period of request rate limitation in seconds. Set it to 0 for unlimited",
defaultValue: 0,
name: "request-rate-period"
.}: int64
"Rate limit settings for different protocols." &
"Format: protocol:volume/period<unit>" &
" Where 'protocol' can be one of: <store|storev2|storev3|lightpush|px|filter> if not defined it means a global setting" &
" 'volume' and period must be an integer value. " &
" 'unit' must be one of <h|m|s|ms> - hours, minutes, seconds, milliseconds respectively. " &
"Argument may be repeated.",
defaultValue: newSeq[string](0),
name: "rate-limit"
.}: seq[string]
## Parsing
@ -850,6 +847,7 @@ proc load*(T: type WakuNodeConf, version = ""): ConfResult[T] =
sources.addConfigFile(Toml, conf.configFile.get())
,
)
ok(conf)
except CatchableError:
err(getCurrentExceptionMsg())

View File

@ -102,6 +102,7 @@ proc initNode(
builder.withPeerManagerConfig(
maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement
)
builder.withRateLimit(conf.rateLimits)
node =
?builder.build().mapErr(
@ -274,20 +275,17 @@ proc setupProtocols(
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)
let rateLimitSetting: RateLimitSetting =
(conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod))
if conf.legacyStore:
# Store legacy setup
try:
await mountLegacyStore(node, rateLimitSetting)
await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2))
except CatchableError:
return
err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())
# Store setup
try:
await mountStore(node, rateLimitSetting)
await mountStore(node, node.rateLimitSettings.getSetting(STOREV3))
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
@ -326,9 +324,7 @@ proc setupProtocols(
# NOTE Must be mounted after relay
if conf.lightpush:
try:
let rateLimitSetting: RateLimitSetting =
(conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod))
await mountLightPush(node, rateLimitSetting)
await mountLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH))
except CatchableError:
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
@ -348,6 +344,7 @@ proc setupProtocols(
subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout),
maxFilterPeers = conf.filterMaxPeersToServe,
maxFilterCriteriaPerPeer = conf.filterMaxCriteria,
rateLimitSetting = node.rateLimitSettings.getSetting(FILTER),
)
except CatchableError:
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
@ -368,7 +365,9 @@ proc setupProtocols(
# waku peer exchange setup
if conf.peerExchange:
try:
await mountPeerExchange(node, some(conf.clusterId))
await mountPeerExchange(
node, some(conf.clusterId), node.rateLimitSettings.getSetting(PEEREXCHG)
)
except CatchableError:
return
err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())

View File

@ -114,6 +114,7 @@ type
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
contentTopicHandlers: Table[ContentTopic, TopicHandler]
rateLimitSettings*: ProtocolRateLimitSettings
proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back
@ -164,6 +165,7 @@ proc new*(
enr: enr,
announcedAddresses: netConfig.announcedAddresses,
topicSubscriptionQueue: queue,
rateLimitSettings: DefaultProtocolRateLimit,
)
return node
@ -481,7 +483,7 @@ proc mountFilter*(
maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer,
messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL,
rateLimitSetting: RateLimitSetting = FilterPerPeerRateLimit,
rateLimitSetting: RateLimitSetting = FilterDefaultPerPeerRateLimit,
) {.async: (raises: []).} =
## Mounting filter v2 protocol
@ -1144,11 +1146,14 @@ proc mountRlnRelay*(
## Waku peer-exchange
proc mountPeerExchange*(
node: WakuNode, cluster: Option[uint16] = none(uint16)
node: WakuNode,
cluster: Option[uint16] = none(uint16),
rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit,
) {.async: (raises: []).} =
info "mounting waku peer exchange"
node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, cluster)
node.wakuPeerExchange =
WakuPeerExchange.new(node.peerManager, cluster, some(rateLimit))
if node.started:
try:
@ -1163,10 +1168,15 @@ proc mountPeerExchange*(
proc fetchPeerExchangePeers*(
node: Wakunode, amount: uint64
): Future[Result[int, string]] {.async: (raises: []).} =
): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} =
if node.wakuPeerExchange.isNil():
error "could not get peers from px, waku peer-exchange is nil"
return err("PeerExchange is not mounted")
return err(
(
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
status_desc: some("PeerExchange is not mounted"),
)
)
info "Retrieving peer info via peer exchange protocol"
let pxPeersRes = await node.wakuPeerExchange.request(amount)
@ -1184,7 +1194,7 @@ proc fetchPeerExchangePeers*(
else:
warn "failed to retrieve peer info via peer exchange protocol",
error = pxPeersRes.error
return err("Peer exchange failure: " & $pxPeersRes.error)
return err(pxPeersRes.error)
# TODO: Move to application module (e.g., wakunode2.nim)
proc setPeerExchangePeer*(
@ -1373,3 +1383,10 @@ proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
return true
return await node.wakuRlnRelay.isReady()
## TODO: add other protocol `isReady` checks
proc setRateLimits*(node: WakuNode, limits: seq[string]): Result[void, string] =
  ## Parses the CLI rate-limit strings and installs the resulting
  ## per-protocol settings on the node. On parse failure the node's
  ## existing settings are left unchanged and an error is returned.
  let rateLimitConfig = ProtocolRateLimitSettings.parse(limits)
  if rateLimitConfig.isErr():
    # Fixed: the error message previously lacked a space after the colon,
    # producing e.g. "invalid rate limit settings:10/2d".
    return err("invalid rate limit settings: " & rateLimitConfig.error)
  node.rateLimitSettings = rateLimitConfig.get()
  return ok()

View File

@ -328,6 +328,7 @@ proc new*(
)
wf.initProtocolHandler()
setServiceLimitMetric(WakuFilterSubscribeCodec, rateLimitSetting)
return wf
const MaintainSubscriptionsInterval* = 1.minutes

View File

@ -12,12 +12,6 @@ const
DefaultSubscriptionTimeToLiveSec* = 5.minutes
MessageCacheTTL* = 2.minutes
# Acceptable call frequence from one peer using filter service
# Assumption is having to set up a subscription with max 30 calls than using ping in every min
# While subscribe/unsubscribe events are distributed in time among clients, pings will happen regularly from
# all subscribed peers
FilterPerPeerRateLimit*: RateLimitSetting = (30, 1.minutes)
type
# a single filter criterion is fully defined by a pubsub topic and content topic
FilterCriterion* = tuple[pubsubTopic: PubsubTopic, contentTopic: ContentTopic]

View File

@ -110,4 +110,5 @@ proc new*(
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
)
wl.initProtocolHandler()
setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting)
return wl

View File

@ -1,5 +1,5 @@
{.push raises: [].}
import ./waku_peer_exchange/protocol
import ./waku_peer_exchange/[protocol, rpc]
export protocol
export protocol, rpc

View File

@ -13,7 +13,8 @@ import
../waku_core,
../discovery/waku_discv5,
./rpc,
./rpc_codec
./rpc_codec,
../common/rate_limit/request_limiter
declarePublicGauge waku_px_peers_received_total,
"number of ENRs received via peer exchange"
@ -45,37 +46,55 @@ const
pxFailure = "px_failure"
type
WakuPeerExchangeResult*[T] = Result[T, string]
WakuPeerExchangeResult*[T] = Result[T, PeerExchangeResponseStatus]
WakuPeerExchange* = ref object of LPProtocol
peerManager*: PeerManager
enrCache*: seq[enr.Record]
cluster*: Option[uint16]
# todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/
requestRateLimiter*: RequestRateLimiter
proc request*(
wpx: WakuPeerExchange, numPeers: uint64, conn: Connection
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: numPeers))
let rpc = PeerExchangeRpc.makeRequest(numPeers)
var buffer: seq[byte]
var error: string
var callResult =
(status_code: PeerExchangeResponseStatusCode.SUCCESS, status_desc: none(string))
try:
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(DefaultMaxRpcSize.int)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
error = $exc.msg
callResult = (
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
status_desc: some($exc.msg),
)
finally:
# close, no more data is expected
await conn.closeWithEof()
if error.len > 0:
return err("write/read failed: " & error)
if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS:
return err(callResult)
let decodedBuff = PeerExchangeRpc.decode(buffer)
if decodedBuff.isErr():
return err("decode failed: " & $decodedBuff.error)
return err(
(
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
status_desc: some($decodedBuff.error),
)
)
if decodedBuff.get().response.status_code != PeerExchangeResponseStatusCode.SUCCESS:
return err(
(
status_code: decodedBuff.get().response.status_code,
status_desc: decodedBuff.get().response.status_desc,
)
)
return ok(decodedBuff.get().response)
proc request*(
@ -84,10 +103,20 @@ proc request*(
try:
let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
if connOpt.isNone():
return err(dialFailure)
return err(
(
status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE,
status_desc: some(dialFailure),
)
)
return await wpx.request(numPeers, connOpt.get())
except CatchableError:
return err("exception dialing peer: " & getCurrentExceptionMsg())
return err(
(
status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE,
status_desc: some("exception dialing peer: " & getCurrentExceptionMsg()),
)
)
proc request*(
wpx: WakuPeerExchange, numPeers: uint64
@ -95,22 +124,50 @@ proc request*(
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])
return err(peerNotFoundFailure)
return err(
(
status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
status_desc: some(peerNotFoundFailure),
)
)
return await wpx.request(numPeers, peerOpt.get())
proc respond(
wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection
): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
let rpc = PeerExchangeRpc(
response:
PeerExchangeResponse(peerInfos: enrs.mapIt(PeerExchangePeerInfo(enr: it.raw)))
)
let rpc = PeerExchangeRpc.makeResponse(enrs.mapIt(PeerExchangePeerInfo(enr: it.raw)))
try:
await conn.writeLP(rpc.encode().buffer)
except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg])
return err(exc.msg)
return err(
(
status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE,
status_desc: some("exception dialing peer: " & exc.msg),
)
)
return ok()
proc respondError(
    wpx: WakuPeerExchange,
    status_code: PeerExchangeResponseStatusCode,
    status_desc: Option[string],
    conn: Connection,
): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} =
  ## Sends an error RPC response carrying `status_code`/`status_desc` to the
  ## requesting peer over `conn`. A failure while writing the response is
  ## counted in waku_px_errors and reported as SERVICE_UNAVAILABLE.
  let rpc = PeerExchangeRpc.makeErrorResponse(status_code, status_desc)
  try:
    await conn.writeLP(rpc.encode().buffer)
  except CatchableError as exc:
    waku_px_errors.inc(labelValues = [exc.msg])
    return err(
      (
        status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE,
        # Fixed: message previously said "exception dialing peer" (copied from
        # the dial path); this failure happens while writing the response.
        status_desc: some("exception writing response: " & exc.msg),
      )
    )
  return ok()
@ -169,26 +226,44 @@ proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} =
proc initProtocolHandler(wpx: WakuPeerExchange) =
  ## Installs the stream handler for the peer-exchange protocol.
  ## Incoming requests are metered by `requestRateLimiter`: over-limit
  ## callers receive TOO_MANY_REQUESTS, unreadable or undecodable requests
  ## receive BAD_REQUEST, and well-formed requests are answered with ENRs
  ## taken from the local cache.
  # NOTE(review): the block interleaved the pre-rate-limit and rate-limited
  # handler bodies (diff residue); only the rate-limited version is kept.
  proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
    var buffer: seq[byte]
    wpx.requestRateLimiter.checkUsageLimit(WakuPeerExchangeCodec, conn):
      try:
        buffer = await conn.readLp(DefaultMaxRpcSize.int)
      except CatchableError as exc:
        waku_px_errors.inc(labelValues = [exc.msg])
        (
          await wpx.respondError(
            PeerExchangeResponseStatusCode.BAD_REQUEST, some(exc.msg), conn
          )
        ).isOkOr:
          error "Failed to respond with BAD_REQUEST:", error = $error
        return

      let decBuf = PeerExchangeRpc.decode(buffer)
      if decBuf.isErr():
        waku_px_errors.inc(labelValues = [decodeRpcFailure])
        error "Failed to decode PeerExchange request", error = $decBuf.error
        (
          await wpx.respondError(
            PeerExchangeResponseStatusCode.BAD_REQUEST, some($decBuf.error), conn
          )
        ).isOkOr:
          error "Failed to respond with BAD_REQUEST:", error = $error
        return

      trace "peer exchange request received"

      let enrs = wpx.getEnrsFromCache(decBuf.get().request.numPeers)
      # only count peers as sent when the response was actually written
      (await wpx.respond(enrs, conn)).isErrOr:
        waku_px_peers_sent.inc(enrs.len().int64())
    do:
      # rate-limit overflow branch of checkUsageLimit
      (
        await wpx.respondError(
          PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS, none(string), conn
        )
      ).isOkOr:
        error "Failed to respond with TOO_MANY_REQUESTS:", error = $error

    # close, no data is expected
    await conn.closeWithEof()
@ -199,8 +274,14 @@ proc new*(
T: type WakuPeerExchange,
peerManager: PeerManager,
cluster: Option[uint16] = none(uint16),
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wpx = WakuPeerExchange(peerManager: peerManager, cluster: cluster)
let wpx = WakuPeerExchange(
peerManager: peerManager,
cluster: cluster,
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
)
wpx.initProtocolHandler()
setServiceLimitMetric(WakuPeerExchangeCodec, rateLimitSetting)
asyncSpawn wpx.updatePxEnrCache()
return wpx

View File

@ -1,4 +1,15 @@
import std/options
type
  # Result status carried in a PeerExchangeResponse. Numeric values mirror
  # common HTTP status numbers; UNKNOWN (0) is the fallback for wire values
  # that are not a declared member (see `parse`).
  PeerExchangeResponseStatusCode* {.pure.} = enum
    UNKNOWN = uint32(000) # unrecognized / not-yet-set status
    SUCCESS = uint32(200) # request served successfully
    BAD_REQUEST = uint32(400) # request could not be read or decoded
    BAD_RESPONSE = uint32(401) # exchange failed on the requesting side
    TOO_MANY_REQUESTS = uint32(429) # request rate limit exceeded
    SERVICE_UNAVAILABLE = uint32(503) # no peer available / unable to serve
    DIAL_FAILURE = uint32(599) # failed to dial or write to the peer

  PeerExchangePeerInfo* = object
    enr*: seq[byte] # RLP encoded ENR: https://eips.ethereum.org/EIPS/eip-778
@ -7,7 +18,44 @@ type
  # Response payload: the ENRs handed out plus a status code and an
  # optional human-readable description of that status.
  PeerExchangeResponse* = object
    peerInfos*: seq[PeerExchangePeerInfo]
    status_code*: PeerExchangeResponseStatusCode
    status_desc*: Option[string]

  # (code, description) pair used as the error payload of
  # peer-exchange operation results.
  PeerExchangeResponseStatus* =
    tuple[status_code: PeerExchangeResponseStatusCode, status_desc: Option[string]]

  # Wire envelope for the protocol, wrapping the request and response
  # sub-messages.
  PeerExchangeRpc* = object
    request*: PeerExchangeRequest
    response*: PeerExchangeResponse
proc makeRequest*(T: type PeerExchangeRpc, numPeers: uint64): T =
  ## Builds an RPC envelope asking a peer for up to `numPeers` ENRs.
  T(request: PeerExchangeRequest(numPeers: numPeers))
proc makeResponse*(T: type PeerExchangeRpc, peerInfos: seq[PeerExchangePeerInfo]): T =
  ## Builds a successful RPC response carrying `peerInfos`.
  let response = PeerExchangeResponse(
    peerInfos: peerInfos, status_code: PeerExchangeResponseStatusCode.SUCCESS
  )
  T(response: response)
proc makeErrorResponse*(
    T: type PeerExchangeRpc,
    status_code: PeerExchangeResponseStatusCode,
    status_desc: Option[string] = none(string),
): T =
  ## Builds an RPC response that carries only an error status — no peers.
  let response =
    PeerExchangeResponse(status_code: status_code, status_desc: status_desc)
  T(response: response)
proc `$`*(statusCode: PeerExchangeResponseStatusCode): string =
  ## Symbolic name of the status code, e.g. `"SUCCESS"`.
  ## The case is exhaustive over the enum, so adding a member without a
  ## branch here is a compile error.
  case statusCode
  of PeerExchangeResponseStatusCode.UNKNOWN:
    result = "UNKNOWN"
  of PeerExchangeResponseStatusCode.SUCCESS:
    result = "SUCCESS"
  of PeerExchangeResponseStatusCode.BAD_REQUEST:
    result = "BAD_REQUEST"
  of PeerExchangeResponseStatusCode.BAD_RESPONSE:
    result = "BAD_RESPONSE"
  of PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS:
    result = "TOO_MANY_REQUESTS"
  of PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE:
    result = "SERVICE_UNAVAILABLE"
  of PeerExchangeResponseStatusCode.DIAL_FAILURE:
    result = "DIAL_FAILURE"
# proc `$`*(pxResponseStatus: PeerExchangeResponseStatus): string =
# return $pxResponseStatus.status & " - " & pxResponseStatus.desc.get("")

View File

@ -1,5 +1,6 @@
{.push raises: [].}
import std/options
import ../common/protobuf, ./rpc
proc encode*(rpc: PeerExchangeRequest): ProtoBuffer =
@ -38,17 +39,26 @@ proc decode*(T: type PeerExchangePeerInfo, buffer: seq[byte]): ProtoResult[T] =
ok(rpc)
proc parse*(T: type PeerExchangeResponseStatusCode, status: uint32): T =
  ## Converts a raw wire value into a status code. Any value that is not a
  ## declared enum member degrades to UNKNOWN instead of raising a
  ## RangeDefect on conversion.
  case status
  of 200, 400, 401, 429, 503, 599:
    # every declared code, including BAD_RESPONSE (401) and DIAL_FAILURE
    # (599), which the previous list omitted so those two declared codes
    # could never round-trip through decoding
    PeerExchangeResponseStatusCode(status)
  else:
    PeerExchangeResponseStatusCode.UNKNOWN
proc encode*(rpc: PeerExchangeResponse): ProtoBuffer =
  ## Serializes the response: repeated peer infos on field 1, the status
  ## code on field 10 and the optional description on field 11.
  var buf = initProtoBuffer()
  for peerInfo in rpc.peerInfos:
    buf.write3(1, peerInfo.encode())
  buf.write3(10, rpc.status_code.uint32)
  buf.write3(11, rpc.status_desc)
  buf.finish3()
  buf
proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] =
proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtobufResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = PeerExchangeResponse(peerInfos: @[])
@ -57,6 +67,18 @@ proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] =
for pib in peerInfoBuffers:
rpc.peerInfos.add(?PeerExchangePeerInfo.decode(pib))
var status_code: uint32
if ?pb.getField(10, status_code):
rpc.status_code = PeerExchangeResponseStatusCode.parse(status_code)
else:
return err(ProtobufError.missingRequiredField("status_code"))
var status_desc: string
if ?pb.getField(11, status_desc):
rpc.status_desc = some(status_desc)
else:
rpc.status_desc = none(string)
ok(rpc)
proc encode*(rpc: PeerExchangeRpc): ProtoBuffer =
@ -64,21 +86,25 @@ proc encode*(rpc: PeerExchangeRpc): ProtoBuffer =
pb.write3(1, rpc.request.encode())
pb.write3(2, rpc.response.encode())
pb.finish3()
pb
proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtobufResult[T] =
  ## Deserializes a full RPC envelope. Both the request (field 1) and the
  ## response (field 2) sub-messages are required; a missing one yields a
  ## `missingRequiredField` error.
  # NOTE(review): the block interleaved the old ProtoResult-based body with
  # the new ProtobufResult-based one (diff residue); only the new version,
  # which also makes field 2 mandatory instead of discarded, is kept.
  let pb = initProtoBuffer(buffer)
  var rpc = PeerExchangeRpc()

  var requestBuffer: seq[byte]
  if not ?pb.getField(1, requestBuffer):
    return err(ProtobufError.missingRequiredField("request"))
  rpc.request = ?PeerExchangeRequest.decode(requestBuffer)

  var responseBuffer: seq[byte]
  if not ?pb.getField(2, responseBuffer):
    return err(ProtobufError.missingRequiredField("response"))
  rpc.response = ?PeerExchangeResponse.decode(responseBuffer)

  ok(rpc)

View File

@ -154,5 +154,5 @@ proc new*(
)
store.initProtocolHandler()
setServiceLimitMetric(WakuStoreCodec, rateLimitSetting)
return store

View File

@ -153,4 +153,5 @@ proc new*(
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
)
ws.initProtocolHandler()
setServiceLimitMetric(WakuLegacyStoreCodec, rateLimitSetting)
ws