feat: Added simple, configurable rate limit for lightpush and store-query (#2390)

* feat: Added simple, configurable rate limit for lightpush and store-query
Adjust lightpush rest response to rate limit, added tests ann some fixes
Add rest store query test for rate limit checks and proper error response
Update apps/wakunode2/external_config.nim
Move chronos/tokenbucket to nwaku codebasee with limited and fixed feature set
Add meterics counter to lightpush rate limits

Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
This commit is contained in:
NagyZoltanPeter 2024-04-15 15:28:35 +02:00 committed by GitHub
parent 4117fe65b7
commit a00f350cd1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 686 additions and 29 deletions

View File

@ -10,11 +10,13 @@ import
../testlib/[common, wakucore]
proc newTestWakuLightpushNode*(
switch: Switch, handler: PushMessageHandler
switch: Switch,
handler: PushMessageHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): Future[WakuLightPush] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuLightPush.new(peerManager, rng, handler)
proto = WakuLightPush.new(peerManager, rng, handler, rateLimitSetting)
await proto.start()
switch.mount(proto)

View File

@ -1 +1 @@
import ./test_client
import ./test_client, ./test_ratelimit

View File

@ -203,7 +203,7 @@ suite "Waku Lightpush Client":
# 1KiB
message2 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(10 * 1024)
) # 10KiB
) # 10KiB
message3 = fakeWakuMessage(
contentTopic = contentTopic, payload = getByteSequence(100 * 1024)
) # 100KiB

View File

@ -0,0 +1,151 @@
{.used.}
import
std/[options, strscans],
testutils/unittests,
chronicles,
chronos,
libp2p/crypto/crypto
import
../../waku/[
node/peer_manager,
common/ratelimit,
waku_core,
waku_lightpush,
waku_lightpush/client,
waku_lightpush/common,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc,
waku_lightpush/rpc_codec,
],
../testlib/[assertions, wakucore, testasync, futures, testutils],
./lightpush_utils,
../resources/[pubsub_topics, content_topics, payloads]
suite "Rate limited push service":
asyncTest "push message with rate limit not violated":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler: PushMessageHandler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
let
tokenPeriod = 500.millis
server =
await newTestWakuLightpushNode(serverSwitch, handler, some((3, tokenPeriod)))
client = newTestWakuLightpushClient(clientSwitch)
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let sendMsgProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
check await handlerFuture.withTimeout(50.millis)
assert requestRes.isOk(), requestRes.error
check handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message
let waitInBetweenFor = 20.millis
# Test cannot be too explicit about the time when the TokenBucket resets
# the internal timer, although in normal use there is no use case to care about it.
var firstWaitExtend = 300.millis
for runCnt in 0 ..< 3:
let startTime = Moment.now()
for testCnt in 0 ..< 3:
await sendMsgProc()
await sleepAsync(20.millis)
var endTime = Moment.now()
var elapsed: Duration = (endTime - startTime)
await sleepAsync(tokenPeriod - elapsed + firstWaitExtend)
firstWaitEXtend = 100.millis
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())
asyncTest "push message with rate limit reject":
## Setup
let
serverSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitch.start(), clientSwitch.start())
## Given
var handlerFuture = newFuture[(string, WakuMessage)]()
let handler = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()
let
server =
await newTestWakuLightpushNode(serverSwitch, handler, some((3, 500.millis)))
client = newTestWakuLightpushClient(clientSwitch)
let serverPeerId = serverSwitch.peerInfo.toRemotePeerInfo()
let topic = DefaultPubsubTopic
let successProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
check:
requestRes.isOk()
handlerFuture.finished()
let (handledMessagePubsubTopic, handledMessage) = handlerFuture.read()
check:
handledMessagePubsubTopic == DefaultPubsubTopic
handledMessage == message
let rejectProc = proc(): Future[void] {.async.} =
let message = fakeWakuMessage()
handlerFuture = newFuture[(string, WakuMessage)]()
let requestRes =
await client.publish(DefaultPubsubTopic, message, peer = serverPeerId)
discard await handlerFuture.withTimeout(10.millis)
check:
requestRes.isErr()
requestRes.error == "TOO_MANY_REQUESTS"
for testCnt in 0 .. 2:
await successProc()
await sleepAsync(20.millis)
await rejectProc()
await sleepAsync(500.millis)
## next one shall succeed due to the rate limit time window has passed
await successProc()
## Cleanup
await allFutures(clientSwitch.stop(), serverSwitch.stop())

View File

@ -244,9 +244,7 @@ procSuite "WakuNode - Store":
server.wakuFilterClient.registerPushHandler(filterHandler)
let resp = waitFor server.filterSubscribe(
some(DefaultPubsubTopic),
DefaultContentTopic,
peer = filterSourcePeer,
some(DefaultPubsubTopic), DefaultContentTopic, peer = filterSourcePeer
)
waitFor sleepAsync(100.millis)
@ -319,3 +317,97 @@ procSuite "WakuNode - Store":
# Cleanup
waitFor allFutures(client.stop(), server.stop())
test "Store protocol queries does not violate request rate limitation":
## Setup
let
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
waitFor allFutures(client.start(), server.start())
let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error
waitFor server.mountStore((4, 500.millis))
client.mountStoreClient()
## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()
let requestProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)
assert queryRes.isOk(), queryRes.error
let response = queryRes.get()
check:
response.messages == msgListA
for count in 0 ..< 4:
waitFor requestProc()
waitFor sleepAsync(20.millis)
waitFor sleepAsync(500.millis)
for count in 0 ..< 4:
waitFor requestProc()
waitFor sleepAsync(20.millis)
# Cleanup
waitFor allFutures(client.stop(), server.stop())
test "Store protocol queries overrun request rate limitation":
## Setup
let
serverKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0))
clientKey = generateSecp256k1Key()
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(0))
waitFor allFutures(client.start(), server.start())
let mountArchiveRes = server.mountArchive(archiveA)
assert mountArchiveRes.isOk(), mountArchiveRes.error
waitFor server.mountStore((3, 500.millis))
client.mountStoreClient()
## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()
let successProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)
check queryRes.isOk()
let response = queryRes.get()
check:
response.messages == msgListA
let failsProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)
check queryRes.isErr()
check queryRes.error == "TOO_MANY_REQUESTS"
for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(20.millis)
waitFor failsProc()
waitFor sleepAsync(500.millis)
for count in 0 ..< 3:
waitFor successProc()
waitFor sleepAsync(20.millis)
# Cleanup
waitFor allFutures(client.stop(), server.stop())

View File

@ -22,8 +22,10 @@ import
../../waku/waku_api/rest/lightpush/handlers as lightpush_api,
../../waku/waku_api/rest/lightpush/client as lightpush_api_client,
../../waku/waku_relay,
../../waku/common/ratelimit,
../testlib/wakucore,
../testlib/wakunode
../testlib/wakunode,
../testlib/testutils
proc testWakuNode(): WakuNode =
let
@ -41,7 +43,9 @@ type RestLightPushTest = object
restServer: WakuRestServerRef
client: RestClientRef
proc init(T: type RestLightPushTest): Future[T] {.async.} =
proc init(
T: type RestLightPushTest, rateLimit: RateLimitSetting = (0, 0.millis)
): Future[T] {.async.} =
var testSetup = RestLightPushTest()
testSetup.serviceNode = testWakuNode()
testSetup.pushNode = testWakuNode()
@ -55,7 +59,7 @@ proc init(T: type RestLightPushTest): Future[T] {.async.} =
await testSetup.consumerNode.mountRelay()
await testSetup.serviceNode.mountRelay()
await testSetup.serviceNode.mountLightPush()
await testSetup.serviceNode.mountLightPush(rateLimit)
testSetup.pushNode.mountLightPushClient()
testSetup.serviceNode.peerManager.addServicePeer(
@ -178,6 +182,74 @@ suite "Waku v2 Rest API - lightpush":
await restLightPushTest.shutdown()
# disabled due to this bug in nim-chronos https://github.com/status-im/nim-chronos/issues/500
xasyncTest "Request rate limit push message":
# Given
let budgetCap = 3
let tokenPeriod = 500.millis
let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod))
restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
)
require:
toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1
# When
let pushProc = proc() {.async.} =
let message: RelayWakuMessage = fakeWakuMessage(
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
)
.toRelayWakuMessage()
let requestBody =
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
let response = await restLightPushTest.client.sendPushRequest(requestBody)
echo "response", $response
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_TEXT
let pushRejectedProc = proc() {.async.} =
let message: RelayWakuMessage = fakeWakuMessage(
contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")
)
.toRelayWakuMessage()
let requestBody =
PushRequest(pubsubTopic: some(DefaultPubsubTopic), message: message)
let response = await restLightPushTest.client.sendPushRequest(requestBody)
echo "response", $response
# Then
check:
response.status == 429
await pushProc()
await pushProc()
await pushProc()
await pushRejectedProc()
await sleepAsync(tokenPeriod)
for runCnt in 0 ..< 3:
let startTime = Moment.now()
for sendCnt in 0 ..< budgetCap:
await pushProc()
let endTime = Moment.now()
let elapsed: Duration = (endTime - startTime)
await sleepAsync(tokenPeriod - elapsed)
await restLightPushTest.shutdown()
## TODO: Re-work this test when lightpush protocol change is done: https://github.com/waku-org/pm/issues/93
## This test is similar when no available peer exists for publish. Currently it is returning success,
## that makes this test not useful.

View File

@ -532,7 +532,7 @@ procSuite "Waku v2 Rest API - Store":
let node = testWakuNode()
await node.start()
let restPort = Port(58014)
let restPort = Port(58017)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
@ -638,3 +638,145 @@ procSuite "Waku v2 Rest API - Store":
storeMessage.timestamp.get() == msg.timestamp
storeMessage.ephemeral.get() == msg.ephemeral
storeMessage.meta.get() == base64.encode(msg.meta)
asyncTest "Rate limit store node history query":
# Test adapted from the analogous present at waku_store/test_wakunode_store.nim
let node = testWakuNode()
await node.start()
let restPort = Port(58018)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
installStoreApiHandlers(restServer.router, node)
restServer.start()
# WakuStore setup
let driver: ArchiveDriver = QueueDriver.new()
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error
await node.mountStore((2, 500.millis))
node.mountStoreClient()
let key = generateEcdsaKey()
var peerSwitch = newStandardSwitch(some(key))
await peerSwitch.start()
peerSwitch.mount(node.wakuStore)
# Now prime it with some history before tests
let timeOrigin = wakucore.now()
let msgList =
@[
fakeWakuMessage(@[byte 00], ts = ts(00, timeOrigin)),
fakeWakuMessage(@[byte 01], ts = ts(10, timeOrigin)),
fakeWakuMessage(@[byte 02], ts = ts(20, timeOrigin)),
fakeWakuMessage(@[byte 03], ts = ts(30, timeOrigin)),
fakeWakuMessage(@[byte 04], ts = ts(40, timeOrigin)),
fakeWakuMessage(@[byte 05], ts = ts(50, timeOrigin)),
fakeWakuMessage(@[byte 06], ts = ts(60, timeOrigin)),
fakeWakuMessage(@[byte 07], ts = ts(70, timeOrigin)),
fakeWakuMessage(@[byte 08], ts = ts(80, timeOrigin)),
fakeWakuMessage(@[byte 09], ts = ts(90, timeOrigin)),
]
for msg in msgList:
require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk()
let client = newRestHttpClient(initTAddress(restAddress, restPort))
let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
let fullAddr = $remotePeerInfo.addrs[0] & "/p2p/" & $remotePeerInfo.peerId
var pages = newSeq[seq[WakuMessage]](2)
# Fields that compose a HistoryCursor object
var reqPubsubTopic = DefaultPubsubTopic
var reqSenderTime = Timestamp(0)
var reqStoreTime = Timestamp(0)
var reqDigest = waku_store.MessageDigest()
for i in 0 ..< 2:
let response = await client.getStoreMessagesV1(
encodeUrl(fullAddr),
encodeUrl(reqPubsubTopic),
"", # content topics. Empty ignores the field.
"", # start time. Empty ignores the field.
"", # end time. Empty ignores the field.
encodeUrl($reqSenderTime), # sender time
encodeUrl($reqStoreTime), # store time
reqDigest.toRestStringMessageDigest(),
# base64-encoded digest. Empty ignores the field.
"3", # page size. Empty implies default page size.
"true", # ascending
)
var wakuMessages = newSeq[WakuMessage](0)
for j in 0 ..< response.data.messages.len:
wakuMessages.add(response.data.messages[j].toWakuMessage())
pages[i] = wakuMessages
# populate the cursor for next page
if response.data.cursor.isSome():
reqPubsubTopic = response.data.cursor.get().pubsubTopic
reqDigest = response.data.cursor.get().digest
reqSenderTime = response.data.cursor.get().senderTime
reqStoreTime = response.data.cursor.get().storeTime
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
check:
pages[0] == msgList[0 .. 2]
pages[1] == msgList[3 .. 5]
# request last third will lead to rate limit rejection
var response = await client.getStoreMessagesV1(
encodeUrl(fullAddr),
encodeUrl(reqPubsubTopic),
"", # content topics. Empty ignores the field.
"", # start time. Empty ignores the field.
"", # end time. Empty ignores the field.
encodeUrl($reqSenderTime), # sender time
encodeUrl($reqStoreTime), # store time
reqDigest.toRestStringMessageDigest(),
# base64-encoded digest. Empty ignores the field.
)
check:
response.status == 429
$response.contentType == $MIMETYPE_TEXT
response.data.error_message.get == "Request rate limmit reached"
await sleepAsync(500.millis)
# retry after respective amount of time shall succeed
response = await client.getStoreMessagesV1(
encodeUrl(fullAddr),
encodeUrl(reqPubsubTopic),
"", # content topics. Empty ignores the field.
"", # start time. Empty ignores the field.
"", # end time. Empty ignores the field.
encodeUrl($reqSenderTime), # sender time
encodeUrl($reqStoreTime), # store time
reqDigest.toRestStringMessageDigest(),
# base64-encoded digest. Empty ignores the field.
"5", # page size. Empty implies default page size.
"true", # ascending
)
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
var wakuMessages = newSeq[WakuMessage](0)
for j in 0 ..< response.data.messages.len:
wakuMessages.add(response.data.messages[j].toWakuMessage())
check wakuMessages == msgList[6 .. 9]
await restServer.stop()
await restServer.closeWait()
await node.stop()

24
waku/common/ratelimit.nim Normal file
View File

@ -0,0 +1,24 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import std/options
import chronos/timer
import ./tokenbucket
export tokenbucket
type RateLimitSetting* = tuple[volume: int, period: Duration]
let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (60, 1.minutes)
proc newTokenBucket*(setting: Option[RateLimitSetting]): Option[TokenBucket] =
if setting.isNone:
return none[TokenBucket]()
let (volume, period) = setting.get()
if volume <= 0 or period <= 0.seconds:
return none[TokenBucket]()
return some(TokenBucket.new(volume, period))

View File

@ -0,0 +1,64 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import chronos
## This is an extract from chronos/ratelimit.nim due to the found bug in the original implementation.
## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class.
## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation.
type TokenBucket* = ref object
budget*: int ## Current number of tokens in the bucket
budgetCap: int ## Bucket capacity
lastTimeFull: Moment
## This timer measures the proper periodizaiton of the bucket refilling
fillDuration: Duration ## Refill period
## Update will take place if bucket is empty and trying to consume tokens.
## It checks if the bucket can be replenished as refill duration is passed or not.
proc update(bucket: TokenBucket, currentTime: Moment) =
if bucket.fillDuration == default(Duration):
bucket.budget = min(bucket.budgetCap, bucket.budget)
return
let timeDeltaFromLastFull = currentTime - bucket.lastTimeFull
if timeDeltaFromLastFull.milliseconds < bucket.fillDuration.milliseconds:
return
bucket.budget = bucket.budgetCap
bucket.lastTimeFull = currentTime
proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
## If `tokens` are available, consume them,
## Otherwhise, return false.
if bucket.budget == bucket.budgetCap:
bucket.lastTimeFull = now
if bucket.budget >= tokens:
bucket.budget -= tokens
return true
bucket.update(now)
if bucket.budget >= tokens:
bucket.budget -= tokens
return true
else:
return false
proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
## Add `tokens` to the budget (capped to the bucket capacity)
bucket.budget += tokens
bucket.update(now)
proc new*(T: type[TokenBucket], budgetCap: int, fillDuration: Duration = 1.seconds): T =
## Create a TokenBucket
T(
budget: budgetCap,
budgetCap: budgetCap,
fillDuration: fillDuration,
lastTimeFull: Moment.now(),
)

View File

@ -0,0 +1,12 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import metrics
declarePublicCounter waku_service_requests,
"number of non-relay service requests received", ["service"]
declarePublicCounter waku_service_requests_rejected,
"number of non-relay service requests received being rejected due to limit overdue",
["service"]

View File

@ -589,6 +589,22 @@ type WakuNodeConf* = object
name: "websocket-secure-cert-path"
.}: string
## Rate limitation config
## Currently default to switch of rate limit until become official
requestRateLimit* {.
desc:
"Number of requests to serve by each service in the specified period. Set it to 0 for unlimited",
defaultValue: 0,
name: "request-rate-limit"
.}: int
## Currently default to switch of rate limit until become official
requestRatePeriod* {.
desc: "Period of request rate limitation in seconds. Set it to 0 for unlimited",
defaultValue: 0,
name: "request-rate-period"
.}: int64
## Parsing
# NOTE: Keys are different in nim-libp2p

View File

@ -27,7 +27,8 @@ import
../waku_lightpush/common,
../waku_archive/driver/builder,
../waku_archive/retention_policy/builder,
../common/utils/parse_size_units
../common/utils/parse_size_units,
../common/ratelimit
## Peer persistence
@ -241,7 +242,9 @@ proc setupProtocols(
# Store setup
try:
await mountStore(node)
let rateLimitSetting: RateLimitSetting =
(conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod))
await mountStore(node, rateLimitSetting)
except CatchableError:
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
@ -256,7 +259,9 @@ proc setupProtocols(
# NOTE Must be mounted after relay
if conf.lightpush:
try:
await mountLightPush(node)
let rateLimitSetting: RateLimitSetting =
(conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod))
await mountLightPush(node, rateLimitSetting)
except CatchableError:
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())

View File

@ -45,7 +45,8 @@ import
../waku_peer_exchange,
../waku_rln_relay,
./config,
./peer_manager
./peer_manager,
../common/ratelimit
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicHistogram waku_histogram_message_size,
@ -699,7 +700,9 @@ proc toHistoryResult*(res: ArchiveResult): HistoryResult =
)
)
proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} =
proc mountStore*(
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async, raises: [Defect, LPError].} =
info "mounting waku store protocol"
if node.wakuArchive.isNil():
@ -718,7 +721,8 @@ proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} =
let response = await node.wakuArchive.findMessages(request)
return response.toHistoryResult()
node.wakuStore = WakuStore.new(node.peerManager, node.rng, queryHandler)
node.wakuStore =
WakuStore.new(node.peerManager, node.rng, queryHandler, some(rateLimit))
if node.started:
# Node has started already. Let's start store too.
@ -789,7 +793,9 @@ when defined(waku_exp_store_resume):
## Waku lightpush
proc mountLightPush*(node: WakuNode) {.async.} =
proc mountLightPush*(
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
info "mounting light push"
var pushHandler: PushMessageHandler
@ -813,7 +819,8 @@ proc mountLightPush*(node: WakuNode) {.async.} =
return ok()
debug "mounting lightpush with relay"
node.wakuLightPush = WakuLightPush.new(node.peerManager, node.rng, pushHandler)
node.wakuLightPush =
WakuLightPush.new(node.peerManager, node.rng, pushHandler, some(rateLimit))
if node.started:
# Node has started already. Let's start lightpush too.

View File

@ -77,6 +77,9 @@ proc installLightPushRequestHandler*(
return RestApiResponse.serviceUnavailable("Push request timed out")
if subFut.value().isErr():
if subFut.value().error == TooManyRequestsMessage:
return RestApiResponse.tooManyRequests("Request rate limmit reached")
return RestApiResponse.serviceUnavailable(
fmt("Failed to request a message push: {subFut.value().error}")
)

View File

@ -33,6 +33,9 @@ proc preconditionFailed*(
): RestApiResponse =
RestApiResponse.error(Http412, msg, $MIMETYPE_TEXT)
proc tooManyRequests*(t: typedesc[RestApiResponse], msg: string = ""): RestApiResponse =
RestApiResponse.error(Http429, msg, $MIMETYPE_TEXT)
proc jsonResponse*(
t: typedesc[RestApiResponse], data: auto, status: HttpCode = Http200
): SerdesResult[RestApiResponse] =

View File

@ -39,9 +39,15 @@ proc performHistoryQuery(
let res = queryFut.read()
if res.isErr():
const msg = "Error occurred in queryFut.read()"
error msg, error = res.error
return RestApiResponse.internalServerError(fmt("{msg} [{res.error}]"))
const TooManyRequestErrorStr =
$HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS)
if res.error == TooManyRequestErrorStr:
debug "Request rate limmit reached on peer ", storePeer
return RestApiResponse.tooManyRequests("Request rate limmit reached")
else:
const msg = "Error occurred in queryFut.read()"
error msg, error = res.error
return RestApiResponse.internalServerError(fmt("{msg} [{res.error}]"))
let storeResp = res.value.toStoreResponseRest()
let resp = RestApiResponse.jsonResponse(storeResp, status = Http200)

View File

@ -13,3 +13,5 @@ type WakuLightPushResult*[T] = Result[T, string]
type PushMessageHandler* = proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.}
const TooManyRequestsMessage* = "TOO_MANY_REQUESTS"

View File

@ -11,7 +11,11 @@ import
./common,
./rpc,
./rpc_codec,
./protocol_metrics
./protocol_metrics,
../common/ratelimit,
../common/waku_service_metrics
export ratelimit
logScope:
topics = "waku lightpush"
@ -20,6 +24,7 @@ type WakuLightPush* = ref object of LPProtocol
rng*: ref rand.HmacDrbgContext
peerManager*: PeerManager
pushHandler*: PushMessageHandler
requestRateLimiter*: Option[TokenBucket]
proc handleRequest*(
wl: WakuLightPush, peerId: PeerId, buffer: seq[byte]
@ -27,6 +32,7 @@ proc handleRequest*(
let reqDecodeRes = PushRPC.decode(buffer)
var
isSuccess = false
isRejectedDueRateLimit = false
pushResponseInfo = ""
requestId = ""
@ -34,7 +40,16 @@ proc handleRequest*(
pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error
elif reqDecodeRes.get().request.isNone():
pushResponseInfo = emptyRequestBodyFailure
elif wl.requestRateLimiter.isSome() and not wl.requestRateLimiter.get().tryConsume(1):
isRejectedDueRateLimit = true
let pushRpcRequest = reqDecodeRes.get()
debug "lightpush request rejected due rate limit exceeded",
peerId = peerId, requestId = pushRpcRequest.requestId
pushResponseInfo = TooManyRequestsMessage
waku_service_requests_rejected.inc(labelValues = ["Lightpush"])
else:
waku_service_requests.inc(labelValues = ["Lightpush"])
let pushRpcRequest = reqDecodeRes.get()
requestId = pushRpcRequest.requestId
@ -55,7 +70,7 @@ proc handleRequest*(
isSuccess = handleRes.isOk()
pushResponseInfo = (if isSuccess: "OK" else: handleRes.error)
if not isSuccess:
if not isSuccess and not isRejectedDueRateLimit:
waku_lightpush_errors.inc(labelValues = [pushResponseInfo])
error "failed to push message", error = pushResponseInfo
let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo))
@ -76,7 +91,13 @@ proc new*(
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
pushHandler: PushMessageHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
let wl = WakuLightPush(rng: rng, peerManager: peerManager, pushHandler: pushHandler)
let wl = WakuLightPush(
rng: rng,
peerManager: peerManager,
pushHandler: pushHandler,
requestRateLimiter: newTokenBucket(rateLimitSetting),
)
wl.initProtocolHandler()
return wl

View File

@ -18,3 +18,4 @@ const
emptyRequestBodyFailure* = "empty_request_body_failure"
emptyResponseBodyFailure* = "empty_response_body_failure"
messagePushFailure* = "message_push_failure"
requestLimitReachedFailure* = "request_limit_reached_failure"

View File

@ -57,6 +57,7 @@ type
UNKNOWN = uint32(000)
BAD_RESPONSE = uint32(300)
BAD_REQUEST = uint32(400)
TOO_MANY_REQUESTS = uint32(429)
SERVICE_UNAVAILABLE = uint32(503)
PEER_DIAL_FAILURE = uint32(504)
@ -73,7 +74,7 @@ type
proc parse*(T: type HistoryErrorKind, kind: uint32): T =
case kind
of 000, 200, 300, 400, 503:
of 000, 200, 300, 400, 429, 503:
HistoryErrorKind(kind)
else:
HistoryErrorKind.UNKNOWN
@ -86,6 +87,8 @@ proc `$`*(err: HistoryError): string =
"BAD_RESPONSE: " & err.cause
of HistoryErrorKind.BAD_REQUEST:
"BAD_REQUEST: " & err.cause
of HistoryErrorKind.TOO_MANY_REQUESTS:
"TOO_MANY_REQUESTS"
of HistoryErrorKind.SERVICE_UNAVAILABLE:
"SERVICE_UNAVAILABLE"
of HistoryErrorKind.UNKNOWN:

View File

@ -18,7 +18,14 @@ import
libp2p/stream/connection,
metrics
import
../waku_core, ../node/peer_manager, ./common, ./rpc, ./rpc_codec, ./protocol_metrics
../waku_core,
../node/peer_manager,
./common,
./rpc,
./rpc_codec,
./protocol_metrics,
../common/ratelimit,
../common/waku_service_metrics
logScope:
topics = "waku store"
@ -33,6 +40,7 @@ type WakuStore* = ref object of LPProtocol
peerManager: PeerManager
rng: ref rand.HmacDrbgContext
queryHandler*: HistoryQueryHandler
requestRateLimiter*: Option[TokenBucket]
## Protocol
@ -55,6 +63,18 @@ proc initProtocolHandler(ws: WakuStore) =
# TODO: Return (BAD_REQUEST, cause: "empty query")
return
if ws.requestRateLimiter.isSome() and not ws.requestRateLimiter.get().tryConsume(1):
trace "store query request rejected due rate limit exceeded",
peerId = $conn.peerId, requestId = reqRpc.requestId
let error = HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS).toRPC()
let response = HistoryResponseRPC(error: error)
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)
waku_service_requests_rejected.inc(labelValues = ["Store"])
return
waku_service_requests.inc(labelValues = ["Store"])
let
requestId = reqRpc.requestId
request = reqRpc.query.get().toAPI()
@ -101,11 +121,17 @@ proc new*(
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
queryHandler: HistoryQueryHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
# Raise a defect if history query handler is nil
if queryHandler.isNil():
raise newException(NilAccessDefect, "history query handler is nil")
let ws = WakuStore(rng: rng, peerManager: peerManager, queryHandler: queryHandler)
let ws = WakuStore(
rng: rng,
peerManager: peerManager,
queryHandler: queryHandler,
requestRateLimiter: newTokenBucket(rateLimitSetting),
)
ws.initProtocolHandler()
ws

View File

@ -62,6 +62,7 @@ type
## the state of its request
NONE = uint32(0)
INVALID_CURSOR = uint32(1)
TOO_MANY_REQUESTS = uint32(429)
SERVICE_UNAVAILABLE = uint32(503)
HistoryResponseRPC* = object
@ -76,7 +77,7 @@ type
proc parse*(T: type HistoryResponseErrorRPC, kind: uint32): T =
case kind
of 0, 1, 503:
of 0, 1, 429, 503:
HistoryResponseErrorRPC(kind)
else:
# TODO: Improve error variants/move to satus codes
@ -169,6 +170,8 @@ proc toRPC*(err: HistoryError): HistoryResponseErrorRPC =
of HistoryErrorKind.BAD_REQUEST:
# TODO: Respond aksi with the reason
HistoryResponseErrorRPC.INVALID_CURSOR
of HistoryErrorKind.TOO_MANY_REQUESTS:
HistoryResponseErrorRPC.TOO_MANY_REQUESTS
of HistoryErrorKind.SERVICE_UNAVAILABLE:
HistoryResponseErrorRPC.SERVICE_UNAVAILABLE
else:
@ -179,6 +182,8 @@ proc toAPI*(err: HistoryResponseErrorRPC): HistoryError =
case err
of HistoryResponseErrorRPC.INVALID_CURSOR:
HistoryError(kind: HistoryErrorKind.BAD_REQUEST, cause: "invalid cursor")
of HistoryResponseErrorRPC.TOO_MANY_REQUESTS:
HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS)
of HistoryResponseErrorRPC.SERVICE_UNAVAILABLE:
HistoryError(kind: HistoryErrorKind.SERVICE_UNAVAILABLE)
else: