2020-11-24 04:34:32 +00:00
|
|
|
## Waku Store protocol for historical messaging support.
|
|
|
|
## See spec for more details:
|
|
|
|
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
|
2024-06-28 10:34:57 +00:00
|
|
|
{.push raises: [].}
|
2021-06-09 14:37:08 +00:00
|
|
|
|
2020-08-31 03:32:41 +00:00
|
|
|
import
|
2024-09-06 09:33:15 +00:00
|
|
|
std/[options, times],
|
2024-07-09 11:14:28 +00:00
|
|
|
results,
|
2021-07-16 22:28:35 +00:00
|
|
|
chronicles,
|
2022-11-21 08:36:41 +00:00
|
|
|
chronos,
|
2022-09-07 15:31:27 +00:00
|
|
|
bearssl/rand,
|
2020-09-28 21:44:14 +00:00
|
|
|
libp2p/crypto/crypto,
|
2020-08-31 03:32:41 +00:00
|
|
|
libp2p/protocols/protocol,
|
|
|
|
libp2p/protobuf/minprotobuf,
|
|
|
|
libp2p/stream/connection,
|
2022-07-25 11:01:37 +00:00
|
|
|
metrics
|
|
|
|
import
|
2024-04-15 13:28:35 +00:00
|
|
|
../waku_core,
|
|
|
|
../node/peer_manager,
|
|
|
|
./common,
|
|
|
|
./rpc_codec,
|
|
|
|
./protocol_metrics,
|
2024-07-16 13:46:21 +00:00
|
|
|
../common/rate_limit/request_limiter
|
2020-08-27 02:44:09 +00:00
|
|
|
|
2020-09-16 04:23:10 +00:00
|
|
|
logScope:
|
2022-11-03 15:36:24 +00:00
|
|
|
topics = "waku store"
|
2020-09-16 04:23:10 +00:00
|
|
|
|
2024-04-25 13:09:52 +00:00
|
|
|
## Callback that executes a decoded store query and yields its result.
## The protocol invokes it once for every successfully decoded request.
type
  StoreQueryRequestHandler* = proc(
    req: StoreQueryRequest
  ): Future[StoreQueryResult] {.async, gcsafe.}
|
2020-08-27 02:44:09 +00:00
|
|
|
|
2024-03-15 23:08:47 +00:00
|
|
|
## Store protocol instance: a libp2p protocol that answers store queries by
## delegating to a user supplied request handler, guarded by a rate limiter.
type
  WakuStore* = ref object of LPProtocol
    peerManager: PeerManager # supplied at construction time
    rng: ref rand.HmacDrbgContext # supplied at construction time
    requestHandler*: StoreQueryRequestHandler # runs each decoded query
    requestRateLimiter*: RequestRateLimiter # checked before reading a request
|
2022-11-09 17:50:18 +00:00
|
|
|
|
|
|
|
## Protocol
|
2020-11-09 04:48:09 +00:00
|
|
|
|
2024-09-06 09:33:15 +00:00
|
|
|
## Encoded response bytes paired with the request id used for logging.
type
  StoreResp = tuple
    resp: seq[byte]
    requestId: string
|
|
|
|
|
2024-05-09 18:07:49 +00:00
|
|
|
proc handleQueryRequest(
    self: WakuStore, requestor: PeerId, raw_request: seq[byte]
): Future[StoreResp] {.async.} =
  ## Decodes a raw store query, runs it through `self.requestHandler` and
  ## returns the encoded `StoreQueryResponse` together with the request id
  ## the caller uses for logging.
  ##
  ## - `requestor`: peer that sent the query; only used for logging here.
  ## - `raw_request`: undecoded `StoreQueryRequest` bytes.
  ##
  ## Failures are reported to the peer through the response status fields
  ## rather than by raising: a decode failure yields `ErrorCode.BAD_REQUEST`,
  ## a handler failure yields the handler's error kind as status code.
  var res = StoreQueryResponse()

  let req = StoreQueryRequest.decode(raw_request).valueOr:
    error "failed to decode rpc", peerId = requestor, error = $error
    waku_store_errors.inc(labelValues = [decodeRpcFailure])

    res.statusCode = uint32(ErrorCode.BAD_REQUEST)
    res.statusDesc = "decoding rpc failed: " & $error

    # The request could not be decoded, so no request id is available yet;
    # a placeholder is handed back for the caller's log line.
    return (res.encode().buffer, "not_parsed_requestId")

  let requestId = req.requestId

  info "received store query request",
    peerId = requestor, requestId = requestId, request = req
  waku_store_queries.inc()

  let queryResult = await self.requestHandler(req)

  res = queryResult.valueOr:
    error "store query failed",
      peerId = requestor, requestId = requestId, error = $error

    # The request id is known here: echo it in the error response and in the
    # returned tuple so the failure can be correlated with the request.
    res.requestId = requestId
    res.statusCode = uint32(error.kind)
    res.statusDesc = $error

    return (res.encode().buffer, requestId)

  res.requestId = requestId
  res.statusCode = 200
  res.statusDesc = "OK"

  info "sending store query response",
    peerId = requestor, requestId = requestId, messages = res.messages.len

  return (res.encode().buffer, requestId)
|
2024-04-25 13:09:52 +00:00
|
|
|
|
|
|
|
proc initProtocolHandler(self: WakuStore) =
  ## Builds the stream handler for the store codec and registers it, together
  ## with `WakuStoreCodec`, on `self`.
  ##
  ## The handler reads one length-prefixed request, answers it through
  ## `handleQueryRequest` and writes the response back. Requests rejected by
  ## the rate limiter receive a pre-encoded TOO_MANY_REQUESTS response.

  # We will not copy and decode RPC buffer from stream only for requestId
  # in reject case as it is comparably too expensive and opens possible
  # attack surface
  let rejectResponseBuf = StoreQueryResponse(
    requestId: "N/A",
    statusCode: uint32(ErrorCode.TOO_MANY_REQUESTS),
    statusDesc: $ErrorCode.TOO_MANY_REQUESTS,
  ).encode().buffer

  proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
    # Timing metrics are only recorded for requests that passed the limiter.
    var queryServed = false
    var reply: StoreResp

    self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
      let incoming = catch:
        await conn.readLp(int(DefaultMaxRpcSize))

      let requestBytes = incoming.valueOr:
        error "Connection read error", error = error.msg
        return

      waku_service_network_bytes.inc(
        amount = int64(requestBytes.len), labelValues = [WakuStoreCodec, "in"]
      )

      let queryStartTime = getTime().toUnixFloat()

      reply = await self.handleQueryRequest(conn.peerId, requestBytes)

      let queryDuration = getTime().toUnixFloat() - queryStartTime
      waku_store_time_seconds.inc(amount = queryDuration, labelValues = ["query-db"])
      queryServed = true
    do:
      debug "store query request rejected due rate limit exceeded",
        peerId = conn.peerId, limit = $self.requestRateLimiter.setting
      reply = (rejectResponseBuf, "rejected")

    let writeRespStartTime = getTime().toUnixFloat()

    let sendOutcome = catch:
      await conn.writeLp(reply.resp)

    # Nothing was sent: skip the send-side metrics below.
    sendOutcome.isOkOr:
      error "Connection write error", error = error.msg
      return

    debug "after sending response", requestId = reply.requestId
    if queryServed:
      let writeDuration = getTime().toUnixFloat() - writeRespStartTime
      waku_store_time_seconds.inc(amount = writeDuration, labelValues = ["send-resp"])

    # Outgoing bytes are counted for rejected requests as well.
    waku_service_network_bytes.inc(
      amount = int64(reply.resp.len), labelValues = [WakuStoreCodec, "out"]
    )

  self.handler = handler
  self.codec = WakuStoreCodec
|
2020-11-16 08:38:52 +00:00
|
|
|
|
2024-03-15 23:08:47 +00:00
|
|
|
proc new*(
    T: type WakuStore,
    peerManager: PeerManager,
    rng: ref rand.HmacDrbgContext,
    requestHandler: StoreQueryRequestHandler,
    rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
  ## Creates a `WakuStore` with its protocol handler already installed.
  ##
  ## - `requestHandler`: executes incoming queries; must not be nil.
  ## - `rateLimitSetting`: optional setting handed to `newRequestRateLimiter`.
  ##
  ## Raises `NilAccessDefect` when `requestHandler` is nil.
  if requestHandler.isNil(): # TODO use an Option instead ???
    raise newException(NilAccessDefect, "history query handler is nil")

  result = WakuStore(
    rng: rng,
    peerManager: peerManager,
    requestHandler: requestHandler,
    requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
  )
  result.initProtocolHandler()
|