diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index f9dedaebe..cfbb19ea1 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -438,13 +438,15 @@ proc setupProtocols(node: WakuNode, return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) # Add validation keys to protected topics + var subscribedProtectedTopics : seq[ProtectedTopic] for topicKey in conf.protectedTopics: if topicKey.topic notin pubsubTopics: warn "protected topic not in subscribed pubsub topics, skipping adding validator", protectedTopic=topicKey.topic, subscribedTopics=pubsubTopics continue + subscribedProtectedTopics.add(topicKey) notice "routing only signed traffic", protectedTopic=topicKey.topic, publicKey=topicKey.key - node.wakuRelay.addSignedTopicValidator(Pubsubtopic(topicKey.topic), topicKey.key) + node.wakuRelay.addSignedTopicsValidator(subscribedProtectedTopics) # Enable Rendezvous Discovery protocol when Relay is enabled try: diff --git a/apps/wakunode2/wakunode2_validator_signed.nim b/apps/wakunode2/wakunode2_validator_signed.nim index fc0d0c46d..91896eda2 100644 --- a/apps/wakunode2/wakunode2_validator_signed.nim +++ b/apps/wakunode2/wakunode2_validator_signed.nim @@ -20,7 +20,8 @@ const MessageWindowInSec = 5*60 # +- 5 minutes import ../../waku/waku_relay/protocol, - ../../waku/waku_core + ../../waku/waku_core, + ./external_config declarePublicCounter waku_msg_validator_signed_outcome, "number of messages for each validation outcome", ["result"] @@ -49,21 +50,28 @@ proc withinTimeWindow*(msg: WakuMessage): bool = return true return false -proc addSignedTopicValidator*(w: WakuRelay, topic: PubsubTopic, publicTopicKey: SkPublicKey) = - debug "adding validator to signed topic", topic=topic, publicTopicKey=publicTopicKey +proc addSignedTopicsValidator*(w: WakuRelay, protectedTopics: seq[ProtectedTopic]) = + debug "adding validator to signed topics" proc validator(topic: string, msg: WakuMessage): Future[errors.ValidationResult] {.async.} = var outcome = errors.ValidationResult.Reject + + for protectedTopic in protectedTopics: - if msg.timestamp != 0: - if msg.withinTimeWindow(): - let msgHash = SkMessage(topic.msgHash(msg)) - let recoveredSignature = SkSignature.fromRaw(msg.meta) - if recoveredSignature.isOk(): - if recoveredSignature.get.verify(msgHash, publicTopicKey): - outcome = errors.ValidationResult.Accept + if(protectedTopic.topic == topic): + if msg.timestamp != 0: + if msg.withinTimeWindow(): + let msgHash = SkMessage(topic.msgHash(msg)) + let recoveredSignature = SkSignature.fromRaw(msg.meta) + if recoveredSignature.isOk(): + if recoveredSignature.get.verify(msgHash, protectedTopic.key): + outcome = errors.ValidationResult.Accept - waku_msg_validator_signed_outcome.inc(labelValues = [$outcome]) - return outcome + if outcome != errors.ValidationResult.Accept: + debug "signed topic validation failed", topic=topic, publicTopicKey=protectedTopic.key + waku_msg_validator_signed_outcome.inc(labelValues = [$outcome]) + return outcome - w.addValidator(topic, validator) + return errors.ValidationResult.Accept + + w.addValidator(validator, "signed topic validation failed") \ No newline at end of file diff --git a/tests/waku_relay/test_protocol.nim b/tests/waku_relay/test_protocol.nim index 74e1ba2d5..d7e48ac4a 100644 --- a/tests/waku_relay/test_protocol.nim +++ b/tests/waku_relay/test_protocol.nim @@ -297,14 +297,14 @@ suite "Waku Relay": proc otherSimpleFutureHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = otherHandlerFuture.complete((topic, message)) - otherNode.addValidator(pubsubTopic, len4Validator) + otherNode.addValidator(len4Validator) discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) await sleepAsync(500.millis) check: otherNode.isSubscribed(pubsubTopic) # Given a subscribed node with a validator - node.addValidator(pubsubTopic, len4Validator) + node.addValidator(len4Validator) discard node.subscribe(pubsubTopic, simpleFutureHandler) await sleepAsync(500.millis) check: diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index 7d35cee7a..d98ef0b3d 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -163,7 +163,7 @@ suite "WakuNode - Relay": completionFutValidatorAcc.complete(true) return ValidationResult.Accept - node2.wakuRelay.addValidator(pubSubTopic, validator) + node2.wakuRelay.addValidator(validator) var completionFut = newFuture[bool]() proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = diff --git a/tests/wakunode2/test_validators.nim b/tests/wakunode2/test_validators.nim index f040da1e2..4f9e7d33d 100644 --- a/tests/wakunode2/test_validators.nim +++ b/tests/wakunode2/test_validators.nim @@ -13,6 +13,7 @@ import libp2p/multihash, secp256k1 import + ../../apps/wakunode2/external_config, ../../apps/wakunode2/wakunode2_validator_signed, ../../waku/waku_core, ../../waku/node/peer_manager, @@ -42,8 +43,10 @@ suite "WakuNode2 - Validators": # Add signed message validator to all nodes. They will only route signed messages for node in nodes: + var signedTopics : seq[ProtectedTopic] for topic, publicKey in topicsPublicKeys: - node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey) + signedTopics.add(ProtectedTopic(topic: topic, key: publicKey)) + node.wakuRelay.addSignedTopicsValidator(signedTopics) # Connect the nodes in a full mesh for i in 0..<5: @@ -114,8 +117,10 @@ suite "WakuNode2 - Validators": # Add signed message validator to all nodes. They will only route signed messages for node in nodes: + var signedTopics : seq[ProtectedTopic] for topic, publicKey in topicsPublicKeys: - node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey) + signedTopics.add(ProtectedTopic(topic: topic, key: publicKey)) + node.wakuRelay.addSignedTopicsValidator(signedTopics) # Connect the nodes in a full mesh for i in 0..<5: @@ -232,8 +237,10 @@ suite "WakuNode2 - Validators": # Add signed message validator to all nodes. They will only route signed messages for node in nodes: + var signedTopics : seq[ProtectedTopic] for topic, publicKey in topicsPublicKeys: - node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey) + signedTopics.add(ProtectedTopic(topic: topic, key: publicKey)) + node.wakuRelay.addSignedTopicsValidator(signedTopics) # nodes[0] is connected only to nodes[1] let connOk1 = await nodes[0].peerManager.connectRelay(nodes[1].switch.peerInfo.toRemotePeerInfo()) diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index c6a126780..adece80c6 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -1,7 +1,7 @@ {.used.} import - std/[sequtils,tempfiles], + std/[sequtils, strformat, tempfiles], stew/byteutils, stew/shims/net, testutils/unittests, @@ -21,7 +21,8 @@ import ../../waku/waku_relay, ../../../waku/waku_rln_relay, ../testlib/wakucore, - ../testlib/wakunode + ../testlib/wakunode, + ../resources/payloads proc testWakuNode(): WakuNode = let @@ -480,6 +481,50 @@ suite "Waku v2 Rest API - Relay": $response.contentType == $MIMETYPE_TEXT response.data == "Failed to publish. Autosharding error: invalid format: topic must start with slash" + await restServer.stop() + await restServer.closeWait() + await node.stop() + + asyncTest "Post a message larger than maximum size - POST /relay/v1/messages/{topic}": + # Given + let node = testWakuNode() + await node.start() + await node.mountRelay() + await node.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, + rlnRelayCredIndex: some(1.uint), + rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1"))) + + # RPC server setup + var restPort = Port(0) + let restAddress = parseIpAddress("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + restPort = restServer.server.address.port # update with bound port for client use + + let cache = MessageCache.init() + + installRelayApiHandlers(restServer.router, node, cache) + restServer.start() + + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + + node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + require: + toSeq(node.wakuRelay.subscribedTopics).len == 1 + + # When + let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage( + payload: base64.encode(getByteSequence(MaxWakuMessageSize)), # Message will be bigger than the max size + contentTopic: some(DefaultContentTopic), + timestamp: some(int64(2022)) + )) + + # Then + check: + response.status == 400 + $response.contentType == $MIMETYPE_TEXT + response.data == fmt"Failed to publish: Message size exceeded maximum of {DefaultMaxWakuMessageSizeStr}" + await restServer.stop() await restServer.closeWait() await node.stop() \ No newline at end of file diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index b7e8ed515..01d4613f9 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -987,7 +987,7 @@ proc mountRlnRelay*(node: WakuNode, # register rln validator as default validator debug "Registering RLN validator" - node.wakuRelay.addDefaultValidator(validator) + node.wakuRelay.addValidator(validator, "RLN validation failed") node.wakuRlnRelay = rlnRelay diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index d003da5d9..eea61fa2a 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ b/waku/waku_api/rest/relay/handlers.nim @@ -135,16 +135,8 @@ proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: Mes if not success: return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message") - # validate the message before sending it - let result = node.wakuRlnRelay.validateMessageAndUpdateLog(message) - if result == MessageValidationResult.Invalid: - return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof") - elif result == MessageValidationResult.Spam: - return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later") - elif result == MessageValidationResult.Valid: - debug "RLN proof validated successfully", pubSubTopic=pubSubTopic - else: - return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result") + (await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr: + return RestApiResponse.badRequest("Failed to publish: " & error) # if we reach here its either a non-RLN message or a RLN message with a valid proof debug "Publishing message", pubSubTopic=pubSubTopic, rln=not node.wakuRlnRelay.isNil() diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 11176d17a..3544ea55f 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -8,6 +8,7 @@ else: {.push raises: [].} import + std/strformat, stew/results, sequtils, chronos, @@ -124,14 +125,11 @@ type WakuRelayHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].} WakuValidatorHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[ValidationResult] {.gcsafe, raises: [Defect].} WakuRelay* = ref object of GossipSub - # a map of PubsubTopic's => seq[WakuValidatorHandler] that are - # called in order every time a message is received on a given pubsub topic - wakuValidators: Table[PubsubTopic, seq[WakuValidatorHandler]] - # a map that stores whether the ordered validator has been inserted - # for a given PubsubTopic + # seq of tuples: the first entry in the tuple contains the validators are called for every topic + # the second entry contains the error messages to be returned when the validator fails + wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]] + # a map of validators to error messages to return when validation fails validatorInserted: Table[PubsubTopic, bool] - # seq of validators that are called for every pubsub topic - wakuDefaultValidators: seq[WakuValidatorHandler] proc initProtocolHandler(w: WakuRelay) = proc handler(conn: Connection, proto: string) {.async.} = @@ -180,14 +178,9 @@ proc new*(T: type WakuRelay, return ok(w) proc addValidator*(w: WakuRelay, - topic: varargs[string], - handler: WakuValidatorHandler) {.gcsafe.} = - for t in topic: - w.wakuValidators.mgetOrPut(t, @[]).add(handler) - -proc addDefaultValidator*(w: WakuRelay, - handler: WakuValidatorHandler) {.gcsafe.} = - w.wakuDefaultValidators.add(handler) + handler: WakuValidatorHandler, + errorMessage: string = "") {.gcsafe.} = + w.wakuValidators.add((handler, errorMessage)) method start*(w: WakuRelay) {.async.} = debug "start" @@ -216,19 +209,38 @@ proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} = let msg = msgRes.get() # now sequentially validate the message - for validator in w.wakuDefaultValidators: + for (validator, _) in w.wakuValidators: let validatorRes = await validator(pubsubTopic, msg) if validatorRes != ValidationResult.Accept: return validatorRes - - if w.wakuValidators.hasKey(pubsubTopic): - for validator in w.wakuValidators[pubsubTopic]: - let validatorRes = await validator(pubsubTopic, msg) - if validatorRes != ValidationResult.Accept: - return validatorRes return ValidationResult.Accept return wrappedValidator +proc isValidSize(message: WakuMessage): Future[Result[void, string]] {.async.} = + let messageSizeBytes = uint64(message.encode().buffer.len) + + if(messageSizeBytes > MaxWakuMessageSize): + let message = fmt"Message size exceeded maximum of {DefaultMaxWakuMessageSizeStr}" + debug "Invalid Waku Message", error=message + return err(message) + + return ok() + +proc validateMessage*(w: WakuRelay, pubsubTopic: string, msg: WakuMessage): + Future[Result[void, string]] {.async.} = + + (await msg.isValidSize()).isOkOr: + return err(error) + + for (validator, message) in w.wakuValidators: + let validatorRes = await validator(pubsubTopic, msg) + if validatorRes != ValidationResult.Accept: + if message.len > 0: + return err(message) + else: + return err("Validator failed") + return ok() + proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler): TopicHandler = debug "subscribe", pubsubTopic=pubsubTopic