diff --git a/tests/waku_rln_relay/test_waku_rln_relay.nim b/tests/waku_rln_relay/test_waku_rln_relay.nim index 7c3f82244..f66f8cd04 100644 --- a/tests/waku_rln_relay/test_waku_rln_relay.nim +++ b/tests/waku_rln_relay/test_waku_rln_relay.nim @@ -658,7 +658,7 @@ suite "Waku rln relay": # it is a duplicate result3.value == true - asyncTest "validateMessage test": + asyncTest "validateMessageAndUpdateLog test": let index = MembershipIndex(5) let rlnConf = WakuRlnConfig(rlnRelayDynamic: false, @@ -695,13 +695,13 @@ suite "Waku rln relay": # validate messages # validateMessage proc checks the validity of the message fields and adds it to the log (if valid) let - msgValidate1 = wakuRlnRelay.validateMessage(wm1, some(time)) + msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1, some(time)) # wm2 is published within the same Epoch as wm1 and should be found as spam - msgValidate2 = wakuRlnRelay.validateMessage(wm2, some(time)) + msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2, some(time)) # a valid message should be validated successfully - msgValidate3 = wakuRlnRelay.validateMessage(wm3, some(time)) + msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3, some(time)) # wm4 has no rln proof and should not be validated - msgValidate4 = wakuRlnRelay.validateMessage(wm4, some(time)) + msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4, some(time)) check: @@ -750,13 +750,13 @@ suite "Waku rln relay": # validateMessage proc checks the validity of the message fields and adds it to the log (if valid) let # this should be no verification, Valid - msgValidate1 = wakuRlnRelay.validateMessage(wm1, some(time)) + msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1, some(time)) # this should be verification, Valid - msgValidate2 = wakuRlnRelay.validateMessage(wm2, some(time)) + msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2, some(time)) # this should be verification, Invalid - msgValidate3 = wakuRlnRelay.validateMessage(wm3, some(time)) + msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3, some(time)) # this should be verification, Spam - msgValidate4 = wakuRlnRelay.validateMessage(wm4, some(time)) + msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4, some(time)) check: msgValidate1 == MessageValidationResult.Valid @@ -848,7 +848,7 @@ suite "Waku rln relay": # getMembershipCredentials returns the credential in the keystore which matches # the query, in this case the query is = - # chainId = "5" and + # chainId = "5" and # address = "0x0123456789012345678901234567890123456789" and # treeIndex = 1 let readKeystoreMembership = readKeystoreRes.get() diff --git a/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim b/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim index b4d12feef..39f1b1a7e 100644 --- a/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim +++ b/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, sequtils], + std/[options, sequtils, tempfiles], stew/shims/net as stewNet, testutils/unittests, chronicles, @@ -21,6 +21,10 @@ import ../testlib/wakucore, ../testlib/wakunode +when defined(rln): + import + ../../../waku/waku_rln_relay + proc newTestMessageCache(): relay_api.MessageCache = relay_api.MessageCache.init(capacity=30) @@ -100,6 +104,15 @@ suite "Waku v2 JSON-RPC API - Relay": await srcNode.mountRelay(@[pubSubTopic]) await dstNode.mountRelay(@[pubSubTopic]) + when defined(rln): + await srcNode.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, + rlnRelayCredIndex: 1, + rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1"))) + + await dstNode.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, + rlnRelayCredIndex: 2, + rlnRelayTreePath: genTempPath("rln_tree", "wakunode_2"))) + await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()]) @@ -139,7 +152,12 @@ suite "Waku v2 JSON-RPC API - Relay": response == true await dstHandlerFut.withTimeout(chronos.seconds(5)) - let (topic, msg) = dstHandlerFut.read() + var (topic, msg) = dstHandlerFut.read() + + # proof is injected under the hood, we compare just the message + when defined(rln): + msg.proof = @[] + check: topic == pubSubTopic msg == message @@ -171,7 +189,7 @@ suite "Waku v2 JSON-RPC API - Relay": # RPC server (destination node) let - rpcPort = Port(8548) + rpcPort = Port(8549) ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) server = newRpcHttpServer([ta]) diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index 2159c052d..1550004d0 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -1,7 +1,7 @@ {.used.} import - std/sequtils, + std/[sequtils,tempfiles], stew/byteutils, stew/shims/net, testutils/unittests, @@ -22,6 +22,9 @@ import ../testlib/wakucore, ../testlib/wakunode +when defined(rln): + import + ../../../waku/waku_rln_relay proc testWakuNode(): WakuNode = let @@ -183,6 +186,10 @@ suite "Waku v2 Rest API - Relay": let node = testWakuNode() await node.start() await node.mountRelay() + when defined(rln): + await node.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, + rlnRelayCredIndex: 1, + rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1"))) # RPC server setup let restPort = Port(58014) diff --git a/waku/node/jsonrpc/relay/handlers.nim b/waku/node/jsonrpc/relay/handlers.nim index 7a8d952e0..38e08c1d0 100644 --- a/waku/node/jsonrpc/relay/handlers.nim +++ b/waku/node/jsonrpc/relay/handlers.nim @@ -22,7 +22,8 @@ from std/times import toUnix when defined(rln): import - ../../../waku_rln_relay + ../../../waku_rln_relay, + ../../../waku_rln_relay/rln/wrappers logScope: topics = "waku node jsonrpc relay_api" @@ -77,9 +78,9 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC return true - server.rpc("post_waku_v2_relay_v1_message") do (topic: PubsubTopic, msg: WakuMessageRPC) -> bool: + server.rpc("post_waku_v2_relay_v1_message") do (pubsubTopic: PubsubTopic, msg: WakuMessageRPC) -> bool: ## Publishes a WakuMessage to a PubSub topic - debug "post_waku_v2_relay_v1_message" + debug "post_waku_v2_relay_v1_message", pubsubTopic=pubsubTopic let payloadRes = base64.decode(msg.payload) if payloadRes.isErr(): @@ -93,18 +94,38 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC timestamp: msg.timestamp.get(Timestamp(0)), ephemeral: msg.ephemeral.get(false) ) - + + # ensure the node is subscribed to the pubsubTopic. otherwise it risks publishing + # to a topic with no connected peers + if pubsubTopic notin node.wakuRelay.subscribedTopics(): + raise newException(ValueError, "Failed to publish: Node not subscribed to pubsubTopic: " & pubsubTopic) + + # if RLN is mounted, append the proof to the message when defined(rln): if not node.wakuRlnRelay.isNil(): - let success = node.wakuRlnRelay.appendRLNProof(message, + # append the proof to the message + let success = node.wakuRlnRelay.appendRLNProof(message, float64(getTime().toUnix())) if not success: - raise newException(ValueError, "Failed to append RLN proof to message") - - let publishFut = node.publish(topic, message) + raise newException(ValueError, "Failed to publish: error appending RLN proof to message") + # validate the message before sending it + let result = node.wakuRlnRelay.validateMessage(message) + if result == MessageValidationResult.Invalid: + raise newException(ValueError, "Failed to publish: invalid RLN proof") + elif result == MessageValidationResult.Spam: + raise newException(ValueError, "Failed to publish: limit exceeded, try again later") + elif result == MessageValidationResult.Valid: + debug "RLN proof validated successfully", pubSubTopic=pubSubTopic + else: + raise newException(ValueError, "Failed to publish: unknown RLN proof validation result") + else: + raise newException(ValueError, "Failed to publish: RLN enabled but not mounted") + # if we reach here its either a non-RLN message or a RLN message with a valid proof + debug "Publishing message", pubSubTopic=pubSubTopic, rln=defined(rln) + let publishFut = node.publish(pubsubTopic, message) if not await publishFut.withTimeout(futTimeout): - raise newException(ValueError, "Failed to publish to topic " & topic) + raise newException(ValueError, "Failed to publish: timed out") return true diff --git a/waku/node/rest/relay/handlers.nim b/waku/node/rest/relay/handlers.nim index 757972eec..586040100 100644 --- a/waku/node/rest/relay/handlers.nim +++ b/waku/node/rest/relay/handlers.nim @@ -13,11 +13,20 @@ import presto/common import ../../waku_node, + ../../../waku_relay/protocol, ../serdes, ../responses, ./types, ./topic_cache +from std/times import getTime +from std/times import toUnix + +when defined(rln): + import + ../../../waku_rln_relay, + ../../../waku_rln_relay/rln/wrappers + export types @@ -127,6 +136,11 @@ proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode) return RestApiResponse.badRequest() let pubSubTopic = topic.get() + # ensure the node is subscribed to the topic. otherwise it risks publishing + # to a topic with no connected peers + if pubSubTopic notin node.wakuRelay.subscribedTopics(): + return RestApiResponse.badRequest("Failed to publish: Node not subscribed to topic: " & pubsubTopic) + # Check the request body if contentBody.isNone(): return RestApiResponse.badRequest() @@ -144,9 +158,35 @@ proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode) if resMessage.isErr(): return RestApiResponse.badRequest() + var message = resMessage.get() + + # if RLN is mounted, append the proof to the message + when defined(rln): + if not node.wakuRlnRelay.isNil(): + # append the proof to the message + let success = node.wakuRlnRelay.appendRLNProof(message, + float64(getTime().toUnix())) + if not success: + return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message") + + # validate the message before sending it + let result = node.wakuRlnRelay.validateMessage(message) + if result == MessageValidationResult.Invalid: + return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof") + elif result == MessageValidationResult.Spam: + return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later") + elif result == MessageValidationResult.Valid: + debug "RLN proof validated successfully", pubSubTopic=pubSubTopic + else: + return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result") + else: + return RestApiResponse.internalServerError("Failed to publish: RLN enabled but not mounted") + + # if we reach here its either a non-RLN message or a RLN message with a valid proof + debug "Publishing message", pubSubTopic=pubSubTopic, rln=defined(rln) if not (waitFor node.publish(pubSubTopic, resMessage.value).withTimeout(futTimeout)): - error "Failed to publish message to topic", topic=pubSubTopic - return RestApiResponse.internalServerError() + error "Failed to publish message to topic", pubSubTopic=pubSubTopic + return RestApiResponse.internalServerError("Failed to publish: timedout") return RestApiResponse.ok() diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index a2a17a9b7..bfca5d5ad 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -191,9 +191,8 @@ proc validator(pubsubTopic: string, message: messages.Message): Future[Validatio proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool = GossipSub(w).topics.hasKey(topic) -iterator subscribedTopics*(w: WakuRelay): lent PubsubTopic = - for topic in GossipSub(w).topics.keys(): - yield topic +proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] = + return toSeq(GossipSub(w).topics.keys()) proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) = debug "subscribe", pubsubTopic=pubsubTopic @@ -202,7 +201,7 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle let wrappedHandler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} = let decMsg = WakuMessage.decode(data) if decMsg.isErr(): - # fine if triggerSelf enabled, since validators are bypassed + # fine if triggerSelf enabled, since validators are bypassed error "failed to decode WakuMessage, validator passed a wrong message", error = decMsg.error let fut = newFuture[void]() fut.complete() diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 2c9046302..c9ed26786 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -181,7 +181,7 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, # track message count for metrics waku_rln_messages_total.inc() - # checks if the `msg`'s epoch is far from the current epoch + # checks if the `msg`'s epoch is far from the current epoch # it corresponds to the validation of rln external nullifier var epoch: Epoch if timeOption.isSome(): @@ -190,20 +190,19 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, # get current rln epoch epoch = getCurrentEpoch() - debug "current epoch", currentEpoch = fromEpoch(epoch) let msgEpoch = proof.epoch # calculate the gaps gap = absDiff(epoch, msgEpoch) - debug "message epoch", msgEpoch = fromEpoch(msgEpoch) + debug "epoch info", currentEpoch = fromEpoch(epoch), msgEpoch = fromEpoch(msgEpoch) # validate the epoch if gap > MaxEpochGap: # message's epoch is too old or too ahead # accept messages whose epoch is within +-MaxEpochGap from the current epoch warn "invalid message: epoch gap exceeds a threshold", gap = gap, - payload = string.fromBytes(msg.payload), msgEpoch = fromEpoch(proof.epoch) + payloadLen = msg.payload.len, msgEpoch = fromEpoch(proof.epoch) waku_rln_invalid_messages_total.inc(labelValues=["invalid_epoch"]) return MessageValidationResult.Invalid @@ -224,11 +223,11 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, if proofVerificationRes.isErr(): waku_rln_errors_total.inc(labelValues=["proof_verification"]) - warn "invalid message: proof verification failed", payload = string.fromBytes(msg.payload) + warn "invalid message: proof verification failed", payloadLen = msg.payload.len return MessageValidationResult.Invalid if not proofVerificationRes.value(): # invalid proof - debug "invalid message: invalid proof", payload = string.fromBytes(msg.payload) + debug "invalid message: invalid proof", payloadLen = msg.payload.len waku_rln_invalid_messages_total.inc(labelValues=["invalid_proof"]) return MessageValidationResult.Invalid @@ -241,19 +240,39 @@ proc validateMessage*(rlnPeer: WakuRLNRelay, if hasDup.isErr(): waku_rln_errors_total.inc(labelValues=["duplicate_check"]) elif hasDup.value == true: - debug "invalid message: message is spam", payload = string.fromBytes(msg.payload) + debug "invalid message: message is spam", payloadLen = msg.payload.len waku_rln_spam_messages_total.inc() return MessageValidationResult.Spam - # insert the message to the log - # the result of `updateLog` is discarded because message insertion is guaranteed by the implementation i.e., - # it will never error out - discard rlnPeer.updateLog(proofMetadataRes.get()) - debug "message is valid", payload = string.fromBytes(msg.payload) + debug "message is valid", payloadLen = msg.payload.len let rootIndex = rlnPeer.groupManager.indexOfRoot(proof.merkleRoot) waku_rln_valid_messages_total.observe(rootIndex.toFloat()) return MessageValidationResult.Valid +proc validateMessageAndUpdateLog*( + rlnPeer: WakuRLNRelay, + msg: WakuMessage, + timeOption = none(float64)): MessageValidationResult = + ## validates the message and updates the log to prevent double messaging + ## in future messages + + let result = rlnPeer.validateMessage(msg, timeOption) + + let decodeRes = RateLimitProof.init(msg.proof) + if decodeRes.isErr(): + return MessageValidationResult.Invalid + + let msgProof = decodeRes.get() + let proofMetadataRes = msgProof.extractMetadata() + + if proofMetadataRes.isErr(): + return MessageValidationResult.Invalid + + # insert the message to the log (never errors) + discard rlnPeer.updateLog(proofMetadataRes.get()) + + return result + proc toRLNSignal*(wakumessage: WakuMessage): seq[byte] = ## it is a utility proc that prepares the `data` parameter of the proof generation procedure i.e., `proofGen` that resides in the current module ## it extracts the `contentTopic` and the `payload` of the supplied `wakumessage` and serializes them into a byte sequence @@ -309,9 +328,10 @@ proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay, let msgProof = decodeRes.get() - # validate the message + # validate the message and update log + let validationRes = wakuRlnRelay.validateMessageAndUpdateLog(wakumessage) + let - validationRes = wakuRlnRelay.validateMessage(wakumessage) proof = toHex(msgProof.proof) epoch = fromEpoch(msgProof.epoch) root = inHex(msgProof.merkleRoot)