diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index 1165cbb52..7831fb20f 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -28,94 +28,80 @@ type WakuLightPush* = ref object of LPProtocol requestRateLimiter*: RequestRateLimiter sharding: Sharding +proc handleRequest( + wl: WakuLightPush, peerId: PeerId, pushRequest: LightPushRequest +): Future[WakuLightPushResult] {.async.} = + let pubsubTopic = pushRequest.pubSubTopic.valueOr: + let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr: + let msg = "Invalid content-topic:" & $error + error "lightpush request handling error", error = msg + return WakuLightPushResult.err( + (code: LightPushStatusCode.INVALID_MESSAGE_ERROR, desc: some(msg)) + ) + + wl.sharding.getShard(parsedTopic).valueOr: + let msg = "Sharding error: " & error + error "lightpush request handling error", error = msg + return WakuLightPushResult.err( + (code: LightPushStatusCode.INTERNAL_SERVER_ERROR, desc: some(msg)) + ) + + # ensure checking topic will not cause error at gossipsub level + if pubsubTopic.isEmptyOrWhitespace(): + let msg = "topic must not be empty" + error "lightpush request handling error", error = msg + return + WakuLightPushResult.err((code: LightPushStatusCode.BAD_REQUEST, desc: some(msg))) + + waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"]) + + let msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex() + notice "handling lightpush request", + my_peer_id = wl.peerManager.switch.peerInfo.peerId, + peer_id = peerId, + requestId = pushRequest.requestId, + pubsubTopic = pushRequest.pubsubTopic, + msg_hash = msg_hash, + receivedTime = getNowInNanosecondTime() + + let res = (await wl.pushHandler(peerId, pubsubTopic, pushRequest.message)).valueOr: + return err((code: error.code, desc: error.desc)) + return ok(res) + proc handleRequest*( wl: WakuLightPush, peerId: PeerId, buffer: seq[byte] ): Future[LightPushResponse] {.async.} = - let reqDecodeRes = LightpushRequest.decode(buffer) - var isSuccess = false - var pushResponse: LightpushResponse + var pushResponse: LightPushResponse - if reqDecodeRes.isErr(): - pushResponse = LightpushResponse( + let pushRequest = LightPushRequest.decode(buffer).valueOr: + let desc = decodeRpcFailure & ": " & $error + error "failed to push message", error = desc + let errorCode = LightPushStatusCode.BAD_REQUEST.uint32 + waku_lightpush_v3_errors.inc(labelValues = [$errorCode]) + return LightPushResponse( requestId: "N/A", # due to decode failure we don't know requestId - statusCode: LightpushStatusCode.BAD_REQUEST.uint32, - statusDesc: some(decodeRpcFailure & ": " & $reqDecodeRes.error), - ) - else: - let pushRequest = reqDecodeRes.get() - - let pubsubTopic = pushRequest.pubSubTopic.valueOr: - let parsedTopic = NsContentTopic.parse(pushRequest.message.contentTopic).valueOr: - let msg = "Invalid content-topic:" & $error - error "lightpush request handling error", error = msg - return LightpushResponse( - requestId: pushRequest.requestId, - statusCode: LightpushStatusCode.INVALID_MESSAGE_ERROR.uint32, - statusDesc: some(msg), - ) - - wl.sharding.getShard(parsedTopic).valueOr: - let msg = "Autosharding error: " & error - error "lightpush request handling error", error = msg - return LightpushResponse( - requestId: pushRequest.requestId, - statusCode: LightpushStatusCode.INTERNAL_SERVER_ERROR.uint32, - statusDesc: some(msg), - ) - - # ensure checking topic will not cause error at gossipsub level - if pubsubTopic.isEmptyOrWhitespace(): - let msg = "topic must not be empty" - error "lightpush request handling error", error = msg - return LightPushResponse( - requestId: pushRequest.requestId, - statusCode: LightpushStatusCode.BAD_REQUEST.uint32, - statusDesc: some(msg), - ) - - waku_lightpush_v3_messages.inc(labelValues = ["PushRequest"]) - - let msg_hash = pubsubTopic.computeMessageHash(pushRequest.message).to0xHex() - notice "handling lightpush request", - my_peer_id = wl.peerManager.switch.peerInfo.peerId, - peer_id = peerId, - requestId = pushRequest.requestId, - pubsubTopic = pushRequest.pubsubTopic, - msg_hash = msg_hash, - receivedTime = getNowInNanosecondTime() - - let handleRes = await wl.pushHandler(peerId, pubsubTopic, pushRequest.message) - - isSuccess = handleRes.isOk() - pushResponse = LightpushResponse( - requestId: pushRequest.requestId, - statusCode: - if isSuccess: - LightpushStatusCode.SUCCESS.uint32 - else: - handleRes.error.code.uint32, - statusDesc: - if isSuccess: - none[string]() - else: - handleRes.error.desc, - relayPeerCount: - if isSuccess: - some(handleRes.get()) - else: - none[uint32](), + statusCode: errorCode.uint32, + statusDesc: some(desc), ) - if not isSuccess: - waku_lightpush_v3_errors.inc( - labelValues = [pushResponse.statusDesc.valueOr("unknown")] + let relayPeerCount = (await handleRequest(wl, peerId, pushRequest)).valueOr: + let desc = error.desc + waku_lightpush_v3_errors.inc(labelValues = [$error.code]) + error "failed to push message", error = desc + return LightPushResponse( + requestId: pushRequest.requestId, statusCode: error.code.uint32, statusDesc: desc ) - error "failed to push message", error = pushResponse.statusDesc - return pushResponse + + return LightPushResponse( + requestId: pushRequest.requestId, + statusCode: LightPushStatusCode.SUCCESS.uint32, + statusDesc: none[string](), + relayPeerCount: some(relayPeerCount), + ) proc initProtocolHandler(wl: WakuLightPush) = proc handler(conn: Connection, proto: string) {.async: (raises: [CancelledError]).} = - var rpc: LightpushResponse + var rpc: LightPushResponse wl.requestRateLimiter.checkUsageLimit(WakuLightPushCodec, conn): var buffer: seq[byte] try: @@ -137,7 +123,7 @@ proc initProtocolHandler(wl: WakuLightPush) = peerId = conn.peerId, limit = $wl.requestRateLimiter.setting rpc = static( - LightpushResponse( + LightPushResponse( ## We will not copy and decode RPC buffer from stream only for requestId ## in reject case as it is comparably too expensive and opens possible ## attack surface @@ -152,8 +138,8 @@ proc initProtocolHandler(wl: WakuLightPush) = except LPStreamError: error "lightpush write stream failed", error = getCurrentExceptionMsg() - ## For lightpush might not worth to measure outgoing trafic as it is only - ## small respones about success/failure + ## For lightpush might not worth to measure outgoing traffic as it is only + ## small response about success/failure wl.handler = handler wl.codec = WakuLightPushCodec