nwaku/waku/waku_store/protocol.nim
NagyZoltanPeter ba418ab5ba
feat: DOS protection of non relay protocols - rate limit phase3 (#2897)
* DOS protection of non relay protocols - rate limit phase3:
- Enhanced TokenBucket to be able to add compensation tokens based on previous usage percentage,
- per peer rate limiter 'PeerRateLimier' applied on waku_filter_v2 with opinionated default of acceptable request rate
- Add traffic metrics to filter message push
- RequestRateLimiter added to combine simple token bucket limiting of request numbers but consider per peer usage over time and prevent some peers to over use the service
  (although currently rule violating peers will not be disconnected by this time only their requests will get not served)
- TimedMap utility created (inspired and taken from libp2p TimedCache) which serves as forgiving feature for peers had been overusing the service.
- Added more tests
- Fix rebase issues
- Applied new RequestRateLimiter for store and legacy_store and lightpush
* Incorporate review comments, typos, file/class naming and placement changes.
* Add issue link reference of the original issue with nim-chronos TokenBucket
* Make TimedEntry of TimedMap private and not mixable with similar named in libp2p
* Fix review comments, renamings, const instead of values and more comments.
2024-07-16 15:46:21 +02:00

146 lines
4.1 KiB
Nim

## Waku Store protocol for historical messaging support.
## See spec for more details:
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
{.push raises: [].}
import
std/options,
results,
chronicles,
chronos,
bearssl/rand,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
metrics
import
../waku_core,
../node/peer_manager,
./common,
./rpc_codec,
./protocol_metrics,
../common/rate_limit/request_limiter
logScope:
topics = "waku store"
const MaxMessageTimestampVariance* = getNanoSecondTime(20)
# 20 seconds maximum allowable sender timestamp "drift"
type StoreQueryRequestHandler* =
proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.}
type WakuStore* = ref object of LPProtocol
peerManager: PeerManager
rng: ref rand.HmacDrbgContext
requestHandler*: StoreQueryRequestHandler
requestRateLimiter*: RequestRateLimiter
## Protocol
proc handleQueryRequest(
self: WakuStore, requestor: PeerId, raw_request: seq[byte]
): Future[seq[byte]] {.async.} =
var res = StoreQueryResponse()
let req = StoreQueryRequest.decode(raw_request).valueOr:
error "failed to decode rpc", peerId = requestor, error = $error
waku_store_errors.inc(labelValues = [decodeRpcFailure])
res.statusCode = uint32(ErrorCode.BAD_REQUEST)
res.statusDesc = "decoding rpc failed: " & $error
return res.encode().buffer
let requestId = req.requestId
info "received store query request",
peerId = requestor, requestId = requestId, request = req
waku_store_queries.inc()
let queryResult = await self.requestHandler(req)
res = queryResult.valueOr:
error "store query failed",
peerId = requestor, requestId = requestId, error = $error
res.statusCode = uint32(error.kind)
res.statusDesc = $error
return res.encode().buffer
res.requestId = requestId
res.statusCode = 200
res.statusDesc = "OK"
info "sending store query response",
peerId = requestor, requestId = requestId, messages = res.messages.len
return res.encode().buffer
proc initProtocolHandler(self: WakuStore) =
let rejectReposnseBuffer = StoreQueryResponse(
## We will not copy and decode RPC buffer from stream only for requestId
## in reject case as it is comparably too expensive and opens possible
## attack surface
requestId: "N/A",
statusCode: uint32(ErrorCode.TOO_MANY_REQUESTS),
statusDesc: $ErrorCode.TOO_MANY_REQUESTS,
).encode().buffer
proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var resBuf: seq[byte]
self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
let readRes = catch:
await conn.readLp(DefaultMaxRpcSize.int)
let reqBuf = readRes.valueOr:
error "Connection read error", error = error.msg
return
waku_service_network_bytes.inc(
amount = reqBuf.len().int64, labelValues = [WakuStoreCodec, "in"]
)
resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)
do:
debug "store query request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $self.requestRateLimiter.setting
resBuf = rejectReposnseBuffer
let writeRes = catch:
await conn.writeLp(resBuf)
if writeRes.isErr():
error "Connection write error", error = writeRes.error.msg
return
waku_service_network_bytes.inc(
amount = resBuf.len().int64, labelValues = [WakuStoreCodec, "out"]
)
self.handler = handler
self.codec = WakuStoreCodec
proc new*(
T: type WakuStore,
peerManager: PeerManager,
rng: ref rand.HmacDrbgContext,
requestHandler: StoreQueryRequestHandler,
rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
if requestHandler.isNil(): # TODO use an Option instead ???
raise newException(NilAccessDefect, "history query handler is nil")
let store = WakuStore(
rng: rng,
peerManager: peerManager,
requestHandler: requestHandler,
requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
)
store.initProtocolHandler()
return store