nwaku/waku/waku_store/protocol.nim

163 lines
4.9 KiB
Nim
Raw Permalink Normal View History

## Waku Store protocol for historical messaging support.
## See spec for more details:
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
{.push raises: [].}
2021-06-09 14:37:08 +00:00
import
std/[options, times],
results,
chronicles,
chronos,
bearssl/rand,
libp2p/crypto/crypto,
libp2p/protocols/protocol,
libp2p/protobuf/minprotobuf,
libp2p/stream/connection,
metrics
import
../waku_core,
../node/peer_manager,
./common,
./rpc_codec,
./protocol_metrics,
../common/rate_limit/request_limiter
# Module-wide chronicles log scope: attaches the "waku store" topic to the
# log statements emitted from this file.
logScope:
  topics = "waku store"
2024-04-25 13:09:52 +00:00
type
  # Async callback supplied by the node that actually executes a decoded
  # store query and yields either a response or a store error.
  StoreQueryRequestHandler* =
    proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.}
type
  WakuStore* = ref object of LPProtocol
    ## Service side of the Waku Store protocol, mounted as a libp2p protocol.
    peerManager: PeerManager ## stored at construction time
    rng: ref rand.HmacDrbgContext ## stored at construction time
    requestHandler*: StoreQueryRequestHandler
      ## invoked for every successfully decoded query
    requestRateLimiter*: RequestRateLimiter
      ## consulted before a request is read from the stream
## Protocol
Pagination feature/indexing waku messages (#233) * changes the digest type to MDigest[256] and modifies the computeIndex * fixes formatting issue * adds the pagination with its tests stores and retrieves IndexedWakuMessage adds the paginate proc adds the paginate function fixes some formatting issues minor edits indentation and fixes a bug removes unused imports minor fixes indentations and adds a new testcase adds indexedWakuMessageComparison adds `==` proc for IndexedWakuMessage separates the comparison of index and indexed waku messages adds testcases for the Index comparison and IndexedWakuMessage comparison WIP WIP: adds an decoder for Index removes an unnecessary imports WIP adds findIndex() proc removes the equality check '==' for IndexedWakuMessages edits the code format and adds the pagination test edits paginate() proc to work on a copy of the input list deletes unnecessary echo adds the boundary calculations for forward and backward pagination adds test cases for the page boundaries tests corner cases for the queried cursor and pagesize minor adds some comments adds a proc to extract WakuMessages from a list of IndexedWakuMessages integrates pagination into the findMessages proc adds some comments changes paginate to paginateWithIndex removes some echos modifies paginateWithIndex to handle invalid cursors adds test case for an invalid cursor WIP: adds a `$` proc for IndexedWakuMessages adds some debugging message prints adds an integration test for handling query with pagination * fixes a type mismatch issue in the min proc * replaces boolean direction with their enums and updates contentTopics * adds the unit test for the sorting of the indexed waku messages * fixes a flaky test * fixes a flaky test * removes index equality check proc * handles an initial query with an empty cursor * adds test for the initial query * adds integration test for pagination * adds a test for empty message list * adds comments and fixes an issue * adds comments * code 
cleanup * removes the content topic validation check * resolves the errors related to the windows CI tests * Update waku/protocol/v2/waku_store.nim Co-authored-by: Oskar Thorén <ot@oskarthoren.com> * Update waku/protocol/v2/waku_store.nim Co-authored-by: Oskar Thorén <ot@oskarthoren.com> * Update tests/v2/test_waku_pagination.nim Co-authored-by: Oskar Thorén <ot@oskarthoren.com> * Update waku/protocol/v2/waku_store.nim Co-authored-by: Oskar Thorén <ot@oskarthoren.com> * Update waku/protocol/v2/waku_store.nim Co-authored-by: Oskar Thorén <ot@oskarthoren.com> * Update waku/protocol/v2/waku_store.nim Co-authored-by: Oskar Thorén <ot@oskarthoren.com> * changes the output type of findIndex to Option * Update tests/v2/test_waku_pagination.nim Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * Update tests/v2/test_waku_pagination.nim Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * Update tests/v2/test_waku_pagination.nim Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * adds some comments * fixes an indentation issue * some code modification for array initialization * Apply suggestions from code review Co-authored-by: Oskar Thorén <ot@oskarthoren.com> Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * does some code reorganizations and clean up * CreateSampleList to createSampleList * replaces a byte array literal initialization with a for loop * relocates indexedWakuMessageComparison and indexComparison * minor Co-authored-by: Oskar Thorén <ot@oskarthoren.com> Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com>
2020-11-09 04:48:09 +00:00
# Encoded response bytes paired with the request id used for logging.
type StoreResp = tuple
  resp: seq[byte]
  requestId: string
proc handleQueryRequest(
    self: WakuStore, requestor: PeerId, raw_request: seq[byte]
): Future[StoreResp] {.async.} =
  ## Decodes `raw_request`, forwards the query to `self.requestHandler` and
  ## returns the encoded `StoreQueryResponse` together with the request id
  ## that the caller uses for logging.
  ##
  ## Decode failures and query failures are reported in-band through the
  ## response's `statusCode` / `statusDesc`:
  ## * undecodable request -> `BAD_REQUEST`; the id is unknown, so the
  ##   placeholder `"not_parsed_requestId"` is returned;
  ## * handler error -> the error's `kind` as status code; the id decoded
  ##   from the request is echoed back in the response and in the result.
  var res = StoreQueryResponse()

  let req = StoreQueryRequest.decode(raw_request).valueOr:
    error "failed to decode rpc", peerId = requestor, error = $error
    waku_store_errors.inc(labelValues = [decodeRpcFailure])

    res.statusCode = uint32(ErrorCode.BAD_REQUEST)
    res.statusDesc = "decoding rpc failed: " & $error

    # Nothing could be parsed, so there is no request id to echo back.
    return (res.encode().buffer, "not_parsed_requestId")

  let requestId = req.requestId

  info "received store query request",
    peerId = requestor, requestId = requestId, request = req
  waku_store_queries.inc()

  let queryResult = await self.requestHandler(req)

  res = queryResult.valueOr:
    error "store query failed",
      peerId = requestor, requestId = requestId, error = $error

    # The request id is known at this point: echo it so the requester and the
    # logs can correlate the failure with the originating query.
    res.requestId = requestId
    res.statusCode = uint32(error.kind)
    res.statusDesc = $error

    return (res.encode().buffer, requestId)

  res.requestId = requestId
  res.statusCode = 200
  res.statusDesc = "OK"

  info "sending store query response",
    peerId = requestor, requestId = requestId, messages = res.messages.len

  return (res.encode().buffer, requestId)
2024-04-25 13:09:52 +00:00
proc initProtocolHandler(self: WakuStore) =
  ## Builds the stream handler for the store codec and registers it, together
  ## with the codec string, on `self`.

  # The rejection reply is encoded once up front. We will not copy and decode
  # the RPC buffer from the stream only for the requestId in the reject case,
  # as it is comparably too expensive and opens a possible attack surface.
  let rateLimitedReply = StoreQueryResponse(
    requestId: "N/A",
    statusCode: uint32(ErrorCode.TOO_MANY_REQUESTS),
    statusDesc: $ErrorCode.TOO_MANY_REQUESTS,
  ).encode().buffer

  proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
    var
      wasServed = false # only requests that passed the limiter feed timings
      reply: StoreResp
      querySecs: float

    self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
      # Request accepted by the rate limiter: read, account, and serve it.
      let readRes = catch:
        await conn.readLp(DefaultMaxRpcSize.int)

      let payload = readRes.valueOr:
        error "Connection read error", error = error.msg
        return

      waku_service_network_bytes.inc(
        amount = payload.len().int64, labelValues = [WakuStoreCodec, "in"]
      )

      let queryStartedAt = getTime().toUnixFloat()
      reply = await self.handleQueryRequest(conn.peerId, payload)
      querySecs = getTime().toUnixFloat() - queryStartedAt

      waku_store_time_seconds.set(querySecs, ["query-db-time"])
      wasServed = true
    do:
      # Request refused by the rate limiter: answer with the canned reply.
      debug "store query request rejected due rate limit exceeded",
        peerId = conn.peerId, limit = $self.requestRateLimiter.setting
      reply = (rateLimitedReply, "rejected")

    let writeStartedAt = getTime().toUnixFloat()

    let writeRes = catch:
      await conn.writeLp(reply.resp)

    if writeRes.isErr():
      error "Connection write error", error = writeRes.error.msg
      return

    if wasServed:
      let writeSecs = getTime().toUnixFloat() - writeStartedAt
      waku_store_time_seconds.set(writeSecs, ["send-store-resp-time"])
      debug "after sending response",
        requestId = reply.requestId,
        queryDurationSecs = querySecs,
        writeStreamDurationSecs = writeSecs

    # Outbound bytes are counted only once the write has succeeded.
    waku_service_network_bytes.inc(
      amount = reply.resp.len().int64, labelValues = [WakuStoreCodec, "out"]
    )

  self.handler = handler
  self.codec = WakuStoreCodec
proc new*(
    T: type WakuStore,
    peerManager: PeerManager,
    rng: ref rand.HmacDrbgContext,
    requestHandler: StoreQueryRequestHandler,
    rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
  ## Creates a `WakuStore` with its protocol handler installed.
  ##
  ## `requestHandler` must not be nil; a `NilAccessDefect` is raised
  ## otherwise. `rateLimitSetting` is passed to the request rate limiter and
  ## to the service limit metric for the store codec.
  if requestHandler.isNil(): # TODO use an Option instead ???
    raise newException(NilAccessDefect, "history query handler is nil")

  let wakuStore = WakuStore(
    peerManager: peerManager,
    rng: rng,
    requestHandler: requestHandler,
    requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
  )

  wakuStore.initProtocolHandler()
  setServiceLimitMetric(WakuStoreCodec, rateLimitSetting)

  wakuStore