feat: Added flexible rate limit checks for store, legacy store and lightpush (#2668)

* Added flexible rate limit checks for store, legacy store and lightpush. Also added rate and traffic metrics.

* Fix chat2 after WakuLegacyStoreCodec rename

* Update waku/common/ratelimit.nim

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* Update waku/common/ratelimit.nim

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* Update waku/waku_store_legacy/protocol.nim

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* Fix review findings, added limit to debug logs

---------

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
NagyZoltanPeter 2024-05-09 20:07:49 +02:00 committed by GitHub
parent fa26d05f8e
commit 026d804a0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 538 additions and 429 deletions

View File

@ -470,7 +470,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
echo "Connecting to storenode: " & $(storenode.get())
node.mountLegacyStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)
node.peerManager.addServicePeer(storenode.get(), WakuLegacyStoreCodec)
proc storeHandler(response: HistoryResponse) {.gcsafe.} =
for msg in response.messages:

View File

@ -1,12 +1,16 @@
{.used.}
import
when defined(waku_exp_store_resume):
# TODO: Review store resume test cases (#1282)
# Ongoing changes to test code base had ruin this test meanwhile, need to investigate and fix
import
std/[options, tables, sets],
testutils/unittests,
chronos,
chronicles,
libp2p/crypto/crypto
import
import
../../waku/common/databases/db_sqlite,
../../waku/waku_archive/driver,
../../waku/waku_archive/driver/sqlite_driver/sqlite_driver,
@ -19,7 +23,7 @@ import
./testlib/common,
./testlib/switch
procSuite "Waku Store - resume store":
procSuite "Waku Store - resume store":
## Fixtures
let storeA = block:
let store = newTestMessageStore()
@ -120,7 +124,9 @@ procSuite "Waku Store - resume store":
serverSwitchB = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start())
await allFutures(
serverSwitchA.start(), serverSwitchB.start(), clientSwitch.start()
)
let
serverA = await newTestWakuStoreNode(serverSwitchA, store = testStore)
@ -208,7 +214,9 @@ procSuite "Waku Store - resume store":
serverBSwitch = newTestSwitch()
clientSwitch = newTestSwitch()
await allFutures(serverASwitch.start(), serverBSwitch.start(), clientSwitch.start())
await allFutures(
serverASwitch.start(), serverBSwitch.start(), clientSwitch.start()
)
let
serverA = await newTestWakuStore(serverASwitch, store = storeA)
@ -242,7 +250,7 @@ procSuite "Waku Store - resume store":
## Cleanup
await allFutures(serverASwitch.stop(), serverBSwitch.stop(), clientSwitch.stop())
suite "WakuNode - waku store":
suite "WakuNode - waku store":
asyncTest "Resume proc fetches the history":
## Setup
let

View File

@ -3,8 +3,8 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import std/options
import chronos/timer
import std/options, chronos/timer, libp2p/stream/connection
import ./tokenbucket
export tokenbucket
@ -23,3 +23,34 @@ proc newTokenBucket*(setting: Option[RateLimitSetting]): Option[TokenBucket] =
return none[TokenBucket]()
return some(TokenBucket.new(volume, period))
proc checkUsage(
t: var Option[TokenBucket], proto: string, conn: Connection
): bool {.raises: [].} =
if t.isNone():
return true
let tokenBucket = t.get()
if not tokenBucket.tryConsume(1):
return false
return true
template checkUsageLimit*(
t: var Option[TokenBucket],
proto: string,
conn: Connection,
bodyWithinLimit, bodyRejected: untyped,
) =
if t.checkUsage(proto, conn):
waku_service_requests.inc(labelValues = [proto])
bodyWithinLimit
else:
waku_service_requests_rejected.inc(labelValues = [proto])
bodyRejected
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
if ob.isNone():
return "no-limit"
return $ob.get()

View File

@ -62,3 +62,6 @@ proc new*(T: type[TokenBucket], budgetCap: int, fillDuration: Duration = 1.secon
fillDuration: fillDuration,
lastTimeFull: Moment.now(),
)
func `$`*(b: TokenBucket): string {.inline.} =
return $b.budgetCap & "/" & $b.fillDuration

View File

@ -10,3 +10,9 @@ declarePublicCounter waku_service_requests,
declarePublicCounter waku_service_requests_rejected,
"number of non-relay service requests received being rejected due to limit overdue",
["service"]
declarePublicCounter waku_service_inbound_network_bytes,
"total incoming traffic of specific waku services", labels = ["service"]
declarePublicCounter waku_service_outbound_network_bytes,
"total outgoing traffic of specific waku services", labels = ["service"]

View File

@ -269,7 +269,9 @@ proc setupProtocols(
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(storeNode.value, legacy_common.WakuStoreCodec)
node.peerManager.addServicePeer(
storeNode.value, legacy_common.WakuLegacyStoreCodec
)
else:
return err("failed to set node waku legacy store peer: " & storeNode.error)

View File

@ -735,7 +735,7 @@ proc mountLegacyStore*(
await node.wakuLegacyStore.start()
node.switch.mount(
node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuStoreCodec)
node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuLegacyStoreCodec)
)
proc mountLegacyStoreClient*(node: WakuNode) =
@ -771,7 +771,7 @@ proc query*(
if node.wakuLegacyStoreClient.isNil():
return err("waku legacy store client is nil")
let peerOpt = node.peerManager.selectPeer(legacy_store_common.WakuStoreCodec)
let peerOpt = node.peerManager.selectPeer(legacy_store_common.WakuLegacyStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
return err("peer_not_found_failure")

View File

@ -14,6 +14,7 @@ import
import
../../../waku_core,
../../../waku_store_legacy/common,
../../../waku_store/common,
../../../waku_filter_v2,
../../../waku_lightpush/common,
../../../waku_relay,
@ -43,7 +44,6 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
var peers: WakuPeers = @[]
if not node.wakuRelay.isNil():
# Map managed peers to WakuPeers and add to return list
let relayPeers = node.peerManager.peerStore.peers(WakuRelayCodec).mapIt(
(
multiaddr: constructMultiaddrStr(it),
@ -54,7 +54,6 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
tuplesToWakuPeers(peers, relayPeers)
if not node.wakuFilter.isNil():
# Map WakuFilter peers to WakuPeers and add to return list
let filterV2Peers = node.peerManager.peerStore
.peers(WakuFilterSubscribeCodec)
.mapIt(
@ -66,8 +65,7 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
)
tuplesToWakuPeers(peers, filterV2Peers)
if not node.wakuLegacyStore.isNil():
# Map WakuStore peers to WakuPeers and add to return list
if not node.wakuStore.isNil():
let storePeers = node.peerManager.peerStore.peers(WakuStoreCodec).mapIt(
(
multiaddr: constructMultiaddrStr(it),
@ -77,6 +75,18 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =
)
tuplesToWakuPeers(peers, storePeers)
if not node.wakuLegacyStore.isNil():
let legacyStorePeers = node.peerManager.peerStore
.peers(WakuLegacyStoreCodec)
.mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuLegacyStoreCodec,
connected: it.connectedness == Connectedness.Connected,
)
)
tuplesToWakuPeers(peers, legacyStorePeers)
if not node.wakuLightPush.isNil():
# Map WakuStore peers to WakuPeers and add to return list
let lightpushPeers = node.peerManager.peerStore.peers(WakuLightPushCodec).mapIt(

View File

@ -243,7 +243,7 @@ proc installStoreApiHandlers*(
return RestApiResponse.badRequest(error)
let peerAddr = parsedPeerAddr.valueOr:
node.peerManager.selectPeer(WakuStoreCodec).valueOr:
node.peerManager.selectPeer(WakuLegacyStoreCodec).valueOr:
let handler = discHandler.valueOr:
return NoPeerNoDiscError

View File

@ -32,7 +32,6 @@ proc handleRequest*(
let reqDecodeRes = PushRPC.decode(buffer)
var
isSuccess = false
isRejectedDueRateLimit = false
pushResponseInfo = ""
requestId = ""
@ -40,16 +39,7 @@ proc handleRequest*(
pushResponseInfo = decodeRpcFailure & ": " & $reqDecodeRes.error
elif reqDecodeRes.get().request.isNone():
pushResponseInfo = emptyRequestBodyFailure
elif wl.requestRateLimiter.isSome() and not wl.requestRateLimiter.get().tryConsume(1):
isRejectedDueRateLimit = true
let pushRpcRequest = reqDecodeRes.get()
debug "lightpush request rejected due rate limit exceeded",
peerId = peerId, requestId = pushRpcRequest.requestId
pushResponseInfo = TooManyRequestsMessage
waku_service_requests_rejected.inc(labelValues = ["Lightpush"])
else:
waku_service_requests.inc(labelValues = ["Lightpush"])
let pushRpcRequest = reqDecodeRes.get()
requestId = pushRpcRequest.requestId
@ -70,7 +60,7 @@ proc handleRequest*(
isSuccess = handleRes.isOk()
pushResponseInfo = (if isSuccess: "OK" else: handleRes.error)
if not isSuccess and not isRejectedDueRateLimit:
if not isSuccess:
waku_lightpush_errors.inc(labelValues = [pushResponseInfo])
error "failed to push message", error = pushResponseInfo
let response = PushResponse(isSuccess: isSuccess, info: some(pushResponseInfo))
@ -79,10 +69,35 @@ proc handleRequest*(
proc initProtocolHandler(wl: WakuLightPush) =
proc handle(conn: Connection, proto: string) {.async.} =
var rpc: PushRPC
wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn):
let buffer = await conn.readLp(DefaultMaxRpcSize)
let rpc = await handleRequest(wl, conn.peerId, buffer)
waku_service_inbound_network_bytes.inc(
amount = buffer.len().int64, labelValues = [WakuLightPushCodec]
)
rpc = await handleRequest(wl, conn.peerId, buffer)
do:
debug "lightpush request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $wl.requestRateLimiter
rpc = static(
PushRPC(
## We will not copy and decode RPC buffer from stream only for requestId
## in reject case as it is comparably too expensive and opens possible
## attack surface
requestId: "N/A",
response:
some(PushResponse(isSuccess: false, info: some(TooManyRequestsMessage))),
)
)
await conn.writeLp(rpc.encode().buffer)
## For lightpush might not worth to measure outgoing trafic as it is only
## small respones about success/failure
wl.handler = handle
wl.codec = WakuLightPushCodec

View File

@ -43,7 +43,7 @@ type WakuStore* = ref object of LPProtocol
## Protocol
proc handleQueryRequest*(
proc handleQueryRequest(
self: WakuStore, requestor: PeerId, raw_request: seq[byte]
): Future[seq[byte]] {.async.} =
var res = StoreQueryResponse()
@ -59,21 +59,6 @@ proc handleQueryRequest*(
let requestId = req.requestId
if self.requestRateLimiter.isSome() and not self.requestRateLimiter.get().tryConsume(
1
):
debug "store query request rejected due rate limit exceeded",
peerId = $requestor, requestId = requestId
res.statusCode = uint32(ErrorCode.TOO_MANY_REQUESTS)
res.statusDesc = $ErrorCode.TOO_MANY_REQUESTS
waku_service_requests_rejected.inc(labelValues = ["Store"])
return res.encode().buffer
waku_service_requests.inc(labelValues = ["Store"])
info "received store query request",
peerId = requestor, requestId = requestId, request = req
waku_store_queries.inc()
@ -99,7 +84,18 @@ proc handleQueryRequest*(
return res.encode().buffer
proc initProtocolHandler(self: WakuStore) =
let rejectReposnseBuffer = StoreQueryResponse(
## We will not copy and decode RPC buffer from stream only for requestId
## in reject case as it is comparably too expensive and opens possible
## attack surface
requestId: "N/A",
statusCode: uint32(ErrorCode.TOO_MANY_REQUESTS),
statusDesc: $ErrorCode.TOO_MANY_REQUESTS,
).encode().buffer
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var resBuf: seq[byte]
self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
let readRes = catch:
await conn.readLp(DefaultMaxRpcSize.int)
@ -107,7 +103,15 @@ proc initProtocolHandler(self: WakuStore) =
error "Connection read error", error = error.msg
return
let resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)
waku_service_inbound_network_bytes.inc(
amount = reqBuf.len().int64, labelValues = [WakuStoreCodec]
)
resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)
do:
debug "store query request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $self.requestRateLimiter
resBuf = rejectReposnseBuffer
let writeRes = catch:
await conn.writeLp(resBuf)
@ -116,6 +120,10 @@ proc initProtocolHandler(self: WakuStore) =
error "Connection write error", error = writeRes.error.msg
return
waku_service_outbound_network_bytes.inc(
amount = resBuf.len().int64, labelValues = [WakuStoreCodec]
)
self.handler = handler
self.codec = WakuStoreCodec

View File

@ -39,7 +39,7 @@ proc new*(
proc sendHistoryQueryRPC(
w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo
): Future[HistoryResult] {.async, gcsafe.} =
let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
let connOpt = await w.peerManager.dialPeer(peer, WakuLegacyStoreCodec)
if connOpt.isNone():
waku_legacy_store_errors.inc(labelValues = [dialFailure])
return err(HistoryError(kind: HistoryErrorKind.PEER_DIAL_FAILURE, address: $peer))
@ -217,7 +217,7 @@ when defined(waku_exp_store_resume):
else:
debug "no candidate list is provided, selecting a random peer"
# if no peerList is set then query from one of the peers stored in the peer manager
let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
let peerOpt = w.peerManager.selectPeer(WakuLegacyStoreCodec)
if peerOpt.isNone():
warn "no suitable remote peers"
waku_legacy_store_errors.inc(labelValues = [peerNotFoundFailure])

View File

@ -7,7 +7,7 @@ import std/[options, sequtils], stew/results, stew/byteutils, nimcrypto/sha2
import ../waku_core, ../common/paging
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4"
WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4"
DefaultPageSize*: uint64 = 20

View File

@ -44,13 +44,12 @@ type WakuStore* = ref object of LPProtocol
## Protocol
proc initProtocolHandler(ws: WakuStore) =
proc handler(conn: Connection, proto: string) {.async.} =
let buf = await conn.readLp(DefaultMaxRpcSize.int)
let decodeRes = HistoryRPC.decode(buf)
proc handleLegacyQueryRequest(
self: WakuStore, requestor: PeerId, raw_request: seq[byte]
): Future[seq[byte]] {.async.} =
let decodeRes = HistoryRPC.decode(raw_request)
if decodeRes.isErr():
error "failed to decode rpc", peerId = $conn.peerId
error "failed to decode rpc", peerId = requestor
waku_legacy_store_errors.inc(labelValues = [decodeRpcFailure])
# TODO: Return (BAD_REQUEST, cause: "decode rpc failed")
return
@ -58,63 +57,90 @@ proc initProtocolHandler(ws: WakuStore) =
let reqRpc = decodeRes.value
if reqRpc.query.isNone():
error "empty query rpc", peerId = $conn.peerId, requestId = reqRpc.requestId
error "empty query rpc", peerId = requestor, requestId = reqRpc.requestId
waku_legacy_store_errors.inc(labelValues = [emptyRpcQueryFailure])
# TODO: Return (BAD_REQUEST, cause: "empty query")
return
if ws.requestRateLimiter.isSome() and not ws.requestRateLimiter.get().tryConsume(1):
trace "store query request rejected due rate limit exceeded",
peerId = $conn.peerId, requestId = reqRpc.requestId
let error = HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS).toRPC()
let response = HistoryResponseRPC(error: error)
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)
waku_service_requests_rejected.inc(labelValues = ["Store"])
return
waku_service_requests.inc(labelValues = ["Store"])
let
requestId = reqRpc.requestId
request = reqRpc.query.get().toAPI()
info "received history query",
peerId = conn.peerId, requestId = requestId, query = request
peerId = requestor, requestId = requestId, query = request
waku_legacy_store_queries.inc()
var responseRes: HistoryResult
try:
responseRes = await ws.queryHandler(request)
responseRes = await self.queryHandler(request)
except Exception:
error "history query failed",
peerId = $conn.peerId, requestId = requestId, error = getCurrentExceptionMsg()
peerId = requestor, requestId = requestId, error = getCurrentExceptionMsg()
let error = HistoryError(kind: HistoryErrorKind.UNKNOWN).toRPC()
let response = HistoryResponseRPC(error: error)
let rpc = HistoryRPC(requestId: requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)
return
return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer
if responseRes.isErr():
error "history query failed",
peerId = $conn.peerId, requestId = requestId, error = responseRes.error
peerId = requestor, requestId = requestId, error = responseRes.error
let response = responseRes.toRPC()
let rpc = HistoryRPC(requestId: requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)
return
return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer
let response = responseRes.toRPC()
info "sending history response",
peerId = conn.peerId, requestId = requestId, messages = response.messages.len
peerId = requestor, requestId = requestId, messages = response.messages.len
let rpc = HistoryRPC(requestId: requestId, response: some(response))
await conn.writeLp(rpc.encode().buffer)
return HistoryRPC(requestId: requestId, response: some(response)).encode().buffer
proc initProtocolHandler(ws: WakuStore) =
let rejectResponseBuf = HistoryRPC(
## We will not copy and decode RPC buffer from stream only for requestId
## in reject case as it is comparably too expensive and opens possible
## attack surface
requestId: "N/A",
response: some(
HistoryResponseRPC(
error: HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS).toRPC()
)
),
).encode().buffer
proc handler(conn: Connection, proto: string) {.async, closure.} =
var resBuf: seq[byte]
ws.requestRateLimiter.checkUsageLimit(WakuLegacyStoreCodec, conn):
let readRes = catch:
await conn.readLp(DefaultMaxRpcSize.int)
let reqBuf = readRes.valueOr:
error "Connection read error", error = error.msg
return
waku_service_inbound_network_bytes.inc(
amount = reqBuf.len().int64, labelValues = [WakuLegacyStoreCodec]
)
resBuf = await ws.handleLegacyQueryRequest(conn.peerId, reqBuf)
do:
debug "Legacy store query request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $ws.requestRateLimiter
resBuf = rejectResponseBuf
let writeRes = catch:
await conn.writeLp(resBuf)
if writeRes.isErr():
error "Connection write error", error = writeRes.error.msg
return
waku_service_outbound_network_bytes.inc(
amount = resBuf.len().int64, labelValues = [WakuLegacyStoreCodec]
)
ws.handler = handler
ws.codec = WakuStoreCodec
ws.codec = WakuLegacyStoreCodec
proc new*(
T: type WakuStore,