nwaku/waku/waku_store/protocol.nim
NagyZoltanPeter 026d804a0d
feat: Added flexible rate limit checks for store, legacy store and lightpush (#2668)
* Added flexible rate limit checks for store, legacy store and lightpush. Also added rate and traffic metrics.

* Fix chat2 after WakuLegacyStoreCodec rename

* Update waku/common/ratelimit.nim

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* Update waku/common/ratelimit.nim

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* Update waku/waku_store_legacy/protocol.nim

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>

* Fix review findings, added limit to debug logs

---------

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
2024-05-09 20:07:49 +02:00

150 lines
4.2 KiB
Nim

## Waku Store protocol for historical messaging support.
## See spec for more details:
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
# Enable strict exception tracking for the whole module. Compilers older than
# Nim 1.4 need `Defect` listed explicitly in the pushed `raises` list.
when (NimMajor, NimMinor) >= (1, 4):
  {.push raises: [].}
else:
  {.push raises: [Defect].}
import
std/options,
stew/results,
chronicles,
chronos,
bearssl/rand,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
metrics
import
../waku_core,
../node/peer_manager,
./common,
./rpc_codec,
./protocol_metrics,
../common/ratelimit,
../common/waku_service_metrics
# Every log statement in this module is tagged with the "waku store" topic.
logScope:
  topics = "waku store"

const
  # Maximum allowable sender timestamp "drift": 20 seconds, expressed through
  # `getNanoSecondTime(20)`.
  MaxMessageTimestampVariance* = getNanoSecondTime(20)
type
  # Callback that executes a decoded store query and asynchronously yields
  # a `StoreQueryResult`.
  StoreQueryRequestHandler* =
    proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.}

  # Store protocol instance, mounted as a libp2p protocol (`LPProtocol`).
  WakuStore* = ref object of LPProtocol
    # Peer manager handed in at construction time.
    peerManager: PeerManager
    # Random number generator context handed in at construction time.
    rng: ref rand.HmacDrbgContext
    # Invoked once for every successfully decoded query request.
    requestHandler*: StoreQueryRequestHandler
    # Optional token bucket consulted before an incoming request is served.
    requestRateLimiter*: Option[TokenBucket]
## Protocol
proc handleQueryRequest(
    self: WakuStore, requestor: PeerId, raw_request: seq[byte]
): Future[seq[byte]] {.async.} =
  ## Decodes `raw_request` as a `StoreQueryRequest`, runs it through
  ## `self.requestHandler` and returns the encoded `StoreQueryResponse`.
  ##
  ## * `requestor` - peer that sent the request; only used for logging here.
  ## * `raw_request` - protobuf-encoded request bytes read from the stream.
  ##
  ## Decode failures and handler failures are reported through the response's
  ## `statusCode` / `statusDesc` fields; a successful query is answered with
  ## status 200 / "OK".
  var res = StoreQueryResponse()

  let req = StoreQueryRequest.decode(raw_request).valueOr:
    error "failed to decode rpc", peerId = requestor, error = $error
    waku_store_errors.inc(labelValues = [decodeRpcFailure])

    # No request id can be echoed back: the request itself is unreadable.
    res.statusCode = uint32(ErrorCode.BAD_REQUEST)
    res.statusDesc = "decoding rpc failed: " & $error

    return res.encode().buffer

  let requestId = req.requestId

  info "received store query request",
    peerId = requestor, requestId = requestId, request = req
  waku_store_queries.inc()

  let queryResult = await self.requestHandler(req)

  res = queryResult.valueOr:
    error "store query failed",
      peerId = requestor, requestId = requestId, error = $error

    # Echo the request id on failures as well, so the requester can match the
    # error response to the query it sent (the success path does the same).
    res.requestId = requestId
    res.statusCode = uint32(error.kind)
    res.statusDesc = $error

    return res.encode().buffer

  res.requestId = requestId
  res.statusCode = 200
  res.statusDesc = "OK"

  info "sending store query response",
    peerId = requestor, requestId = requestId, messages = res.messages.len

  return res.encode().buffer
proc initProtocolHandler(self: WakuStore) =
  ## Installs the stream handler and the codec of the store protocol on `self`.
  ##
  ## Every incoming stream goes through `checkUsageLimit` on the request rate
  ## limiter. In its main branch the request is read, counted in the inbound
  ## traffic metric and answered via `handleQueryRequest`; in its `do` branch
  ## (logged as a rate-limit rejection) a pre-encoded TOO_MANY_REQUESTS
  ## response is sent instead.

  # We will not copy and decode the RPC buffer from the stream only to obtain
  # the requestId in the reject case, as that is comparatively too expensive
  # and opens a possible attack surface.
  let rejectResponseBuf = StoreQueryResponse(
    requestId: "N/A",
    statusCode: uint32(ErrorCode.TOO_MANY_REQUESTS),
    statusDesc: $ErrorCode.TOO_MANY_REQUESTS,
  ).encode().buffer

  proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
    var responseBuf: seq[byte]

    self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
      let incoming = catch:
        await conn.readLp(int(DefaultMaxRpcSize))

      let requestBuf = incoming.valueOr:
        # Nothing was read, so there is nothing to answer.
        error "Connection read error", error = error.msg
        return

      waku_service_inbound_network_bytes.inc(
        amount = int64(requestBuf.len), labelValues = [WakuStoreCodec]
      )

      responseBuf = await self.handleQueryRequest(conn.peerId, requestBuf)
    do:
      debug "store query request rejected due rate limit exceeded",
        peerId = conn.peerId, limit = $self.requestRateLimiter

      responseBuf = rejectResponseBuf

    let outgoing = catch:
      await conn.writeLp(responseBuf)

    if outgoing.isErr():
      # Outbound bytes are only counted for responses that were written.
      error "Connection write error", error = outgoing.error.msg
      return

    waku_service_outbound_network_bytes.inc(
      amount = int64(responseBuf.len), labelValues = [WakuStoreCodec]
    )

  self.handler = handler
  self.codec = WakuStoreCodec
proc new*(
    T: type WakuStore,
    peerManager: PeerManager,
    rng: ref rand.HmacDrbgContext,
    requestHandler: StoreQueryRequestHandler,
    rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
  ## Creates a `WakuStore` and installs its protocol handler.
  ##
  ## * `peerManager`, `rng` - stored on the new instance.
  ## * `requestHandler` - callback that serves queries; must not be nil.
  ## * `rateLimitSetting` - passed to `newTokenBucket` to build the request
  ##   rate limiter.
  ##
  ## Raises `NilAccessDefect` when `requestHandler` is nil.

  # TODO use an Option instead ???
  if isNil(requestHandler):
    raise newException(NilAccessDefect, "history query handler is nil")

  result = WakuStore(
    peerManager: peerManager,
    rng: rng,
    requestHandler: requestHandler,
    requestRateLimiter: newTokenBucket(rateLimitSetting),
  )
  result.initProtocolHandler()