mirror of https://github.com/waku-org/nwaku.git
182 lines
5.6 KiB
Nim
182 lines
5.6 KiB
Nim
## Waku Store protocol for historical messaging support.
|
|
## See spec for more details:
|
|
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/[options, times],
|
|
results,
|
|
chronicles,
|
|
chronos,
|
|
bearssl/rand,
|
|
libp2p/crypto/crypto,
|
|
libp2p/protocols/protocol,
|
|
libp2p/protobuf/minprotobuf,
|
|
libp2p/stream/connection,
|
|
metrics
|
|
import
|
|
../waku_core,
|
|
../node/peer_manager,
|
|
./common,
|
|
./rpc,
|
|
./rpc_codec,
|
|
./protocol_metrics,
|
|
../common/rate_limit/request_limiter
|
|
|
|
logScope:
|
|
topics = "waku legacy store"
|
|
|
|
# Callback invoked with each decoded history query; the store protocol awaits
# the returned future and encodes its result as the reply.
type
  HistoryQueryHandler* =
    proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.}
|
|
|
|
type
  WakuStore* = ref object of LPProtocol
    ## Legacy store protocol instance: a libp2p protocol that answers history
    ## queries through a user-supplied handler, subject to a request rate
    ## limiter.
    peerManager: PeerManager
    rng: ref rand.HmacDrbgContext
    queryHandler*: HistoryQueryHandler ## Invoked for every decoded query.
    requestRateLimiter*: RequestRateLimiter
      ## Consulted before a request is read from the stream.
|
|
|
|
## Protocol
|
|
|
|
# Encoded reply bytes paired with a label used for logging: the request id
# when one is known, otherwise a short tag describing why no id is available.
type StoreResp = tuple
  resp: seq[byte]
  requestId: string
|
|
|
|
proc handleLegacyQueryRequest(
    self: WakuStore, requestor: PeerId, raw_request: seq[byte]
): Future[StoreResp] {.async.} =
  ## Decodes `raw_request` as a `HistoryRPC`, runs the contained query through
  ## `self.queryHandler` and returns the encoded reply.
  ##
  ## Returns a `(resp, requestId)` tuple:
  ## * when decoding fails or the RPC carries no query, `resp` is empty and
  ##   the second field holds a short failure tag instead of a request id;
  ## * when the handler raises, `resp` is an encoded `UNKNOWN` error reply;
  ## * otherwise `resp` is the encoded handler result (success or error).
  ##
  ## `requestor` is only used to annotate log lines.
  let decodeRes = HistoryRPC.decode(raw_request)
  if decodeRes.isErr():
    error "failed to decode rpc", peerId = requestor, error = $decodeRes.error
    waku_legacy_store_errors.inc(labelValues = [decodeRpcFailure])
    return (newSeq[byte](), "failed to decode rpc")

  let reqRpc = decodeRes.value

  if reqRpc.query.isNone():
    error "empty query rpc", peerId = requestor, requestId = reqRpc.requestId
    waku_legacy_store_errors.inc(labelValues = [emptyRpcQueryFailure])
    return (newSeq[byte](), "empty query rpc")

  let requestId = reqRpc.requestId
  var request = reqRpc.query.get().toAPI()
  request.requestId = requestId

  info "received history query",
    peerId = requestor, requestId = requestId, query = request
  waku_legacy_store_queries.inc()

  var responseRes: HistoryResult
  try:
    responseRes = await self.queryHandler(request)
  except CatchableError as exc:
    # Only recoverable failures are turned into an UNKNOWN error reply.
    # Defects signal programmer bugs and are deliberately left to propagate.
    error "history query failed",
      peerId = requestor, requestId = requestId, error = exc.msg

    let unknownErr = HistoryError(kind: HistoryErrorKind.UNKNOWN).toRPC()
    let failure = HistoryResponseRPC(error: unknownErr)
    return (
      HistoryRPC(requestId: requestId, response: some(failure)).encode().buffer,
      requestId,
    )

  if responseRes.isErr():
    error "history query failed",
      peerId = requestor, requestId = requestId, error = responseRes.error

  # Handler errors and successes are encoded through the same conversion;
  # only the log line differs.
  let response = responseRes.toRPC()

  if responseRes.isOk():
    info "sending history response",
      peerId = requestor, requestId = requestId, messages = response.messages.len

  return (
    HistoryRPC(requestId: requestId, response: some(response)).encode().buffer,
    requestId,
  )
|
|
|
|
proc initProtocolHandler(ws: WakuStore) =
  ## Installs the stream handler and codec of the legacy store protocol on
  ## `ws`.

  # The rejection reply is encoded once, up front, with a placeholder request
  # id. We will not copy and decode the RPC buffer from the stream only for
  # the request id in the reject case, as that is comparably too expensive and
  # opens a possible attack surface.
  let tooManyRequests =
    HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS).toRPC()
  let rejectionPayload = HistoryRPC(
    requestId: "N/A", response: some(HistoryResponseRPC(error: tooManyRequests))
  ).encode().buffer

  proc handler(conn: Connection, proto: string) {.async, closure.} =
    var
      served = false # set once a request passed the rate limiter and was handled
      reply: StoreResp
      querySecs: float

    ws.requestRateLimiter.checkUsageLimit(WakuLegacyStoreCodec, conn):
      let incoming = catch:
        await conn.readLp(DefaultMaxRpcSize.int)

      let payload = incoming.valueOr:
        error "Connection read error", error = error.msg
        return

      waku_service_network_bytes.inc(
        amount = payload.len().int64, labelValues = [WakuLegacyStoreCodec, "in"]
      )

      let queryBegin = getTime().toUnixFloat()
      reply = await ws.handleLegacyQueryRequest(conn.peerId, payload)
      querySecs = getTime().toUnixFloat() - queryBegin
      waku_legacy_store_time_seconds.set(querySecs, ["query-db-time"])
      served = true
    do:
      debug "Legacy store query request rejected due rate limit exceeded",
        peerId = conn.peerId, limit = $ws.requestRateLimiter.setting
      reply = (rejectionPayload, "rejected")

    # Both the served and the rejected path answer on the stream.
    let sendBegin = getTime().toUnixFloat()
    let sent = catch:
      await conn.writeLp(reply.resp)

    if sent.isErr():
      error "Connection write error", error = sent.error.msg
      return

    # Timing metrics are only recorded for requests that were not rejected.
    if served:
      let sendSecs = getTime().toUnixFloat() - sendBegin
      waku_legacy_store_time_seconds.set(sendSecs, ["send-store-resp-time"])
      debug "after sending response",
        requestId = reply.requestId,
        queryDurationSecs = querySecs,
        writeStreamDurationSecs = sendSecs

    waku_service_network_bytes.inc(
      amount = reply.resp.len().int64, labelValues = [WakuLegacyStoreCodec, "out"]
    )

  ws.handler = handler
  ws.codec = WakuLegacyStoreCodec
|
|
|
|
proc new*(
    T: type WakuStore,
    peerManager: PeerManager,
    rng: ref rand.HmacDrbgContext,
    queryHandler: HistoryQueryHandler,
    rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](),
): T =
  ## Builds a `WakuStore` around `queryHandler`, installs its protocol handler
  ## and publishes the service limit metric for the legacy store codec.
  ##
  ## Raises `NilAccessDefect` when `queryHandler` is nil.
  if queryHandler.isNil():
    raise newException(NilAccessDefect, "history query handler is nil")

  result = WakuStore(
    rng: rng,
    peerManager: peerManager,
    queryHandler: queryHandler,
    requestRateLimiter: newRequestRateLimiter(rateLimitSetting),
  )
  result.initProtocolHandler()
  setServiceLimitMetric(WakuLegacyStoreCodec, rateLimitSetting)
|