diff --git a/apps/wakunode2/wakunode2_validator_signed.nim b/apps/wakunode2/wakunode2_validator_signed.nim index 99b799ea7..fc0d0c46d 100644 --- a/apps/wakunode2/wakunode2_validator_signed.nim +++ b/apps/wakunode2/wakunode2_validator_signed.nim @@ -52,18 +52,16 @@ proc withinTimeWindow*(msg: WakuMessage): bool = proc addSignedTopicValidator*(w: WakuRelay, topic: PubsubTopic, publicTopicKey: SkPublicKey) = debug "adding validator to signed topic", topic=topic, publicTopicKey=publicTopicKey - proc validator(topic: string, message: messages.Message): Future[errors.ValidationResult] {.async.} = - let msg = WakuMessage.decode(message.data) + proc validator(topic: string, msg: WakuMessage): Future[errors.ValidationResult] {.async.} = var outcome = errors.ValidationResult.Reject - if msg.isOk(): - if msg.get.timestamp != 0: - if msg.get.withinTimeWindow(): - let msgHash = SkMessage(topic.msgHash(msg.get)) - let recoveredSignature = SkSignature.fromRaw(msg.get.meta) - if recoveredSignature.isOk(): - if recoveredSignature.get.verify(msgHash, publicTopicKey): - outcome = errors.ValidationResult.Accept + if msg.timestamp != 0: + if msg.withinTimeWindow(): + let msgHash = SkMessage(topic.msgHash(msg)) + let recoveredSignature = SkSignature.fromRaw(msg.meta) + if recoveredSignature.isOk(): + if recoveredSignature.get.verify(msgHash, publicTopicKey): + outcome = errors.ValidationResult.Accept waku_msg_validator_signed_outcome.inc(labelValues = [$outcome]) return outcome diff --git a/tests/waku_relay/test_waku_relay.nim b/tests/waku_relay/test_waku_relay.nim index eac196147..188ddf607 100644 --- a/tests/waku_relay/test_waku_relay.nim +++ b/tests/waku_relay/test_waku_relay.nim @@ -201,13 +201,9 @@ suite "Waku Relay": await sleepAsync(500.millis) # Validator - proc validator(topic: PubsubTopic, msg: Message): Future[ValidationResult] {.async.} = - let msg = WakuMessage.decode(msg.data) - if msg.isErr(): - return ValidationResult.Ignore - + proc validator(topic: PubsubTopic, msg: WakuMessage): Future[ValidationResult] {.async.} = # only relay messages with contentTopic1 - if msg.value.contentTopic != contentTopic: + if msg.contentTopic != contentTopic: return ValidationResult.Reject return ValidationResult.Accept diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index 544fcf62a..5a8e101b5 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -158,18 +158,13 @@ suite "WakuNode - Relay": var completionFutValidatorRej = newFuture[bool]() # set a topic validator for pubSubTopic - proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} = + proc validator(topic: string, msg: WakuMessage): Future[ValidationResult] {.async.} = ## the validator that only allows messages with contentTopic1 to be relayed check: topic == pubSubTopic - let msg = WakuMessage.decode(message.data) - if msg.isErr(): - completionFutValidatorAcc.complete(false) - return ValidationResult.Reject - # only relay messages with contentTopic1 - if msg.value.contentTopic != contentTopic1: + if msg.contentTopic != contentTopic1: completionFutValidatorRej.complete(true) return ValidationResult.Reject diff --git a/tests/waku_rln_relay/test_waku_rln_relay.nim b/tests/waku_rln_relay/test_waku_rln_relay.nim index 066aaad33..d99bc1812 100644 --- a/tests/waku_rln_relay/test_waku_rln_relay.nim +++ b/tests/waku_rln_relay/test_waku_rln_relay.nim @@ -710,61 +710,6 @@ suite "Waku rln relay": msgValidate3 == MessageValidationResult.Valid msgValidate4 == MessageValidationResult.Invalid - asyncTest "should validate invalid proofs if bandwidth is available": - let index = MembershipIndex(5) - - let rlnConf = WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayCredIndex: some(index), - rlnRelayBandwidthThreshold: 4, - rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_3")) - let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf) - require: - wakuRlnRelayRes.isOk() - let wakuRlnRelay = wakuRlnRelayRes.get() - - # get the current epoch time - let time = epochTime() - - # create some messages from the same peer and append rln proof to them, except wm4 - var - # this one will pass through the bandwidth threshold - wm1 = WakuMessage(payload: "Spam".toBytes()) - # this message, will be over the bandwidth threshold, hence has to be verified - wm2 = WakuMessage(payload: "Valid message".toBytes()) - # this message will be over the bandwidth threshold, hence has to be verified, will be false (since no proof) - wm3 = WakuMessage(payload: "Invalid message".toBytes()) - wm4 = WakuMessage(payload: "Spam message".toBytes()) - - let - proofAdded1 = wakuRlnRelay.appendRLNProof(wm1, time) - proofAdded2 = wakuRlnRelay.appendRLNProof(wm2, time+EpochUnitSeconds) - proofAdded3 = wakuRlnRelay.appendRLNProof(wm4, time) - - # ensure proofs are added - require: - proofAdded1 - proofAdded2 - proofAdded3 - - # validate messages - # validateMessage proc checks the validity of the message fields and adds it to the log (if valid) - let - # this should be no verification, Valid - msgValidate1 = wakuRlnRelay.validateMessageAndUpdateLog(wm1, some(time)) - # this should be verification, Valid - msgValidate2 = wakuRlnRelay.validateMessageAndUpdateLog(wm2, some(time)) - # this should be verification, Invalid - msgValidate3 = wakuRlnRelay.validateMessageAndUpdateLog(wm3, some(time)) - # this should be verification, Spam - msgValidate4 = wakuRlnRelay.validateMessageAndUpdateLog(wm4, some(time)) - - check: - msgValidate1 == MessageValidationResult.Valid - msgValidate2 == MessageValidationResult.Valid - msgValidate3 == MessageValidationResult.Invalid - msgValidate4 == MessageValidationResult.Spam - - test "toIDCommitment and toUInt256": # create an instance of rln let rlnInstance = createRLNInstanceWrapper() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 5df6200a1..cd6603886 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -749,7 +749,7 @@ when defined(rln): # register rln validator for all subscribed relay pubsub topics for pubsubTopic in node.wakuRelay.subscribedTopics: debug "Registering RLN validator for topic", pubsubTopic=pubsubTopic - procCall GossipSub(node.wakuRelay).addValidator(pubsubTopic, validator) + node.wakuRelay.addValidator(pubsubTopic, validator) node.wakuRlnRelay = rlnRelay ## Waku peer-exchange diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index bfca5d5ad..7b54e6148 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -8,8 +8,8 @@ else: {.push raises: [].} import - std/sequtils, stew/results, + sequtils, chronos, chronicles, metrics, @@ -122,7 +122,14 @@ const GossipsubParameters = GossipSubParams( type WakuRelayResult*[T] = Result[T, string] WakuRelayHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.gcsafe, raises: [Defect].} + WakuValidatorHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[ValidationResult] {.gcsafe, raises: [Defect].} WakuRelay* = ref object of GossipSub + # a map of PubsubTopic's => seq[WakuValidatorHandler] that are + # called in order every time a message is received on a given pubsub topic + wakuValidators: Table[PubsubTopic, seq[WakuValidatorHandler]] + # a map that stores whether the ordered validator has been inserted + # for a given PubsubTopic + validatorInserted: Table[PubsubTopic, bool] proc initProtocolHandler(w: WakuRelay) = proc handler(conn: Connection, proto: string) {.async.} = @@ -167,9 +174,11 @@ proc new*(T: type WakuRelay, switch: Switch): WakuRelayResult[T] = return ok(w) -method addValidator*(w: WakuRelay, topic: varargs[string], handler: ValidatorHandler) {.gcsafe.} = - procCall GossipSub(w).addValidator(topic, handler) - +proc addValidator*(w: WakuRelay, + topic: varargs[string], + handler: WakuValidatorHandler) {.gcsafe.} = + for t in topic: + w.wakuValidators.mgetOrPut(t, @[]).add(handler) method start*(w: WakuRelay) {.async.} = debug "start" @@ -179,21 +188,32 @@ method stop*(w: WakuRelay) {.async.} = debug "stop" await procCall GossipSub(w).stop() -# rejects messages that are not WakuMessage -proc validator(pubsubTopic: string, message: messages.Message): Future[ValidationResult] {.async.} = - # can be optimized by checking if the message is a WakuMessage without allocating memory - # see nim-libp2p protobuf library - let msg = WakuMessage.decode(message.data) - if msg.isOk(): - return ValidationResult.Accept - return ValidationResult.Reject - proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool = GossipSub(w).topics.hasKey(topic) proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] = return toSeq(GossipSub(w).topics.keys()) +proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} = + # rejects messages that are not WakuMessage + let wrappedValidator = proc(pubsubTopic: string, + message: messages.Message): Future[ValidationResult] {.async.} = + # can be optimized by checking if the message is a WakuMessage without allocating memory + # see nim-libp2p protobuf library + let msgRes = WakuMessage.decode(message.data) + if msgRes.isErr(): + return ValidationResult.Reject + let msg = msgRes.get() + + # now sequentially validate the message + if w.wakuValidators.hasKey(pubsubTopic): + for validator in w.wakuValidators[pubsubTopic]: + let validatorRes = await validator(pubsubTopic, msg) + if validatorRes != ValidationResult.Accept: + return validatorRes + return ValidationResult.Accept + return wrappedValidator + proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) = debug "subscribe", pubsubTopic=pubsubTopic @@ -209,8 +229,10 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle else: return handler(pubsubTopic, decMsg.get()) - # add the default validator to the topic - procCall GossipSub(w).addValidator(pubSubTopic, validator) + # add the ordered validator to the topic + if not w.validatorInserted.hasKey(pubSubTopic): + procCall GossipSub(w).addValidator(pubSubTopic, w.generateOrderedValidator()) + w.validatorInserted[pubSubTopic] = true # set this topic parameters for scoring w.topicParams[pubsubTopic] = TopicParameters @@ -222,6 +244,7 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) = debug "unsubscribe", pubsubTopic=pubsubTopic procCall GossipSub(w).unsubscribeAll(pubsubTopic) + w.validatorInserted.del(pubsubTopic) proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage): Future[int] {.async.} = trace "publish", pubsubTopic=pubsubTopic diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index 887e28d00..494b0947f 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -22,6 +22,7 @@ import ./protocol_types, ./protocol_metrics import + ../waku_relay, # for WakuRelayHandler ../waku_core, ../waku_keystore, ../utils/collector @@ -79,7 +80,6 @@ type WakuRLNRelay* = ref object of RootObj nullifierLog*: Table[Epoch, seq[ProofMetadata]] lastEpoch*: Epoch # the epoch of the last published rln message groupManager*: GroupManager - messageBucket*: Option[TokenBucket] method stop*(rlnPeer: WakuRLNRelay) {.async.} = ## stops the rln-relay protocol @@ -301,57 +301,44 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay, return true proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay, - spamHandler: Option[SpamHandler] = none(SpamHandler)): pubsub.ValidatorHandler = + spamHandler: Option[SpamHandler] = none(SpamHandler)): WakuValidatorHandler = ## this procedure is a thin wrapper for the pubsub addValidator method ## it sets a validator for waku messages, acting in the registered pubsub topic ## the message validation logic is according to https://rfc.vac.dev/spec/17/ - proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} = + proc validator(topic: string, message: WakuMessage): Future[pubsub.ValidationResult] {.async.} = trace "rln-relay topic validator is called" - ## Check if enough tokens can be consumed from the message bucket - try: - if wakuRlnRelay.messageBucket.isSome() and - wakuRlnRelay.messageBucket.get().tryConsume(message.data.len): + let decodeRes = RateLimitProof.init(message.proof) + + if decodeRes.isErr(): + return pubsub.ValidationResult.Reject + + let msgProof = decodeRes.get() + + # validate the message and update log + let validationRes = wakuRlnRelay.validateMessageAndUpdateLog(message) + + let + proof = toHex(msgProof.proof) + epoch = fromEpoch(msgProof.epoch) + root = inHex(msgProof.merkleRoot) + shareX = inHex(msgProof.shareX) + shareY = inHex(msgProof.shareY) + nullifier = inHex(msgProof.nullifier) + payload = string.fromBytes(message.payload) + case validationRes: + of Valid: + trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier return pubsub.ValidationResult.Accept - else: - trace "message bandwidth limit exceeded, running rate limit proof validation" - except OverflowDefect: # not a problem - trace "not enough bandwidth, running rate limit proof validation" - - let decodeRes = WakuMessage.decode(message.data) - if decodeRes.isOk(): - let wakumessage = decodeRes.value - let decodeRes = RateLimitProof.init(wakumessage.proof) - - if decodeRes.isErr(): + of Invalid: + trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier + return pubsub.ValidationResult.Reject + of Spam: + trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier + if spamHandler.isSome(): + let handler = spamHandler.get() + handler(message) return pubsub.ValidationResult.Reject - - let msgProof = decodeRes.get() - - # validate the message and update log - let validationRes = wakuRlnRelay.validateMessageAndUpdateLog(wakumessage) - - let - proof = toHex(msgProof.proof) - epoch = fromEpoch(msgProof.epoch) - root = inHex(msgProof.merkleRoot) - shareX = inHex(msgProof.shareX) - shareY = inHex(msgProof.shareY) - nullifier = inHex(msgProof.nullifier) - payload = string.fromBytes(wakumessage.payload) - case validationRes: - of Valid: - trace "message validity is verified, relaying:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier - return pubsub.ValidationResult.Accept - of Invalid: - trace "message validity could not be verified, discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier - return pubsub.ValidationResult.Reject - of Spam: - trace "A spam message is found! yay! discarding:", proof=proof, root=root, shareX=shareX, shareY=shareY, nullifier=nullifier - if spamHandler.isSome(): - let handler = spamHandler.get() - handler(wakumessage) - return pubsub.ValidationResult.Reject return validator proc mount(conf: WakuRlnConfig, @@ -393,12 +380,7 @@ proc mount(conf: WakuRlnConfig, # Start the group sync await groupManager.startGroupSync() - let messageBucket = if conf.rlnRelayBandwidthThreshold > 0: - some(TokenBucket.new(conf.rlnRelayBandwidthThreshold)) - else: none(TokenBucket) - - return WakuRLNRelay(groupManager: groupManager, - messageBucket: messageBucket) + return WakuRLNRelay(groupManager: groupManager) proc new*(T: type WakuRlnRelay,