From af95b5713fcdb79a4ccad79215f33fc5fb4ac4ea Mon Sep 17 00:00:00 2001 From: Alvaro Revuelta Date: Mon, 21 Aug 2023 08:55:34 +0200 Subject: [PATCH] chore(rln): run rln in all relay pubsubtopics + remove cli flags (#1917) --- apps/chat2/chat2.nim | 4 +- apps/chat2/config_chat2.nim | 10 -- apps/wakunode2/app.nim | 2 - apps/wakunode2/external_config.nim | 10 -- tests/waku_rln_relay/test_waku_rln_relay.nim | 17 +-- .../test_wakunode_rln_relay.nim | 131 ++++++++++++------ waku/node/waku_node.nim | 13 +- waku/waku_rln_relay/rln_relay.nim | 32 +---- 8 files changed, 106 insertions(+), 113 deletions(-) diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 1203361b6..3e96ef173 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -456,7 +456,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = let peerInfo = parsePeerInfo(conf.lightpushnode) if peerInfo.isOk(): await mountLightPush(node) - node.mountLightPushClient() + node.mountLightPushClient() node.peerManager.addServicePeer(peerInfo.value, WakuLightpushCodec) else: error "LightPush not mounted. Couldn't parse conf.lightpushnode", @@ -510,8 +510,6 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = let rlnConf = WakuRlnConfig( rlnRelayDynamic: conf.rlnRelayDynamic, - rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic, - rlnRelayContentTopic: conf.rlnRelayContentTopic, rlnRelayCredIndex: conf.rlnRelayCredIndex, rlnRelayMembershipGroupIndex: conf.rlnRelayMembershipGroupIndex, rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, diff --git a/apps/chat2/config_chat2.nim b/apps/chat2/config_chat2.nim index ade32418e..66351f27f 100644 --- a/apps/chat2/config_chat2.nim +++ b/apps/chat2/config_chat2.nim @@ -242,16 +242,6 @@ type defaultValue: 0 name: "rln-relay-membership-group-index" }: uint - rlnRelayContentTopic* {. - desc: "the content topic for which rln-relay gets enabled", - defaultValue: "/toy-chat/3/mingde/proto" - name: "rln-relay-content-topic" }: ContentTopic - - rlnRelayPubsubTopic* {. - desc: "the pubsub topic for which rln-relay gets enabled", - defaultValue: "/waku/2/default-waku/proto" - name: "rln-relay-pubsub-topic" }: string - rlnRelayDynamic* {. desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false", defaultValue: false diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 3a877dfaa..f22cc4404 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -397,8 +397,6 @@ proc setupProtocols(node: WakuNode, let rlnConf = WakuRlnConfig( rlnRelayDynamic: conf.rlnRelayDynamic, - rlnRelayPubsubTopic: conf.rlnRelayPubsubTopic, - rlnRelayContentTopic: conf.rlnRelayContentTopic, rlnRelayCredIndex: conf.rlnRelayCredIndex, rlnRelayMembershipGroupIndex: conf.rlnRelayMembershipGroupIndex, rlnRelayEthContractAddress: conf.rlnRelayEthContractAddress, diff --git a/apps/wakunode2/external_config.nim b/apps/wakunode2/external_config.nim index 338dfe308..916d9ff56 100644 --- a/apps/wakunode2/external_config.nim +++ b/apps/wakunode2/external_config.nim @@ -155,16 +155,6 @@ type defaultValue: 0 name: "rln-relay-membership-group-index" }: uint - rlnRelayPubsubTopic* {. - desc: "the pubsub topic for which rln-relay gets enabled", - defaultValue: "/waku/2/default-waku/proto" - name: "rln-relay-pubsub-topic" }: string - - rlnRelayContentTopic* {. - desc: "the content topic for which rln-relay gets enabled", - defaultValue: "/toy-chat/3/mingde/proto" - name: "rln-relay-content-topic" }: string - rlnRelayDynamic* {. desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false", defaultValue: false diff --git a/tests/waku_rln_relay/test_waku_rln_relay.nim b/tests/waku_rln_relay/test_waku_rln_relay.nim index 94aebeb6f..cf67de02a 100644 --- a/tests/waku_rln_relay/test_waku_rln_relay.nim +++ b/tests/waku_rln_relay/test_waku_rln_relay.nim @@ -18,9 +18,6 @@ import ../../../waku/waku_keystore, ../testlib/common -const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto" -const RlnRelayContentTopic = "waku/2/rlnrelay/proto" - proc createRLNInstanceWrapper(): RLNResult = return createRlnInstance(tree_path = genTempPath("rln_tree", "waku_rln_relay")) @@ -256,7 +253,7 @@ suite "Waku rln relay": require: rlnInstance.isOk() let rln = rlnInstance.get() - + require: rln.setMetadata(RlnMetadata(lastProcessedBlock: 128)).isOk() @@ -269,7 +266,7 @@ suite "Waku rln relay": check: metadata.lastProcessedBlock == 128 - + test "Merkle tree consistency check between deletion and insertion": # create an RLN instance @@ -660,8 +657,6 @@ suite "Waku rln relay": let index = MembershipIndex(5) let rlnConf = WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: RlnRelayPubsubTopic, - rlnRelayContentTopic: RlnRelayContentTopic, rlnRelayCredIndex: index.uint, rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_2")) let wakuRlnRelayRes = await WakuRlnRelay.new(rlnConf) @@ -709,13 +704,11 @@ suite "Waku rln relay": msgValidate2 == MessageValidationResult.Spam msgValidate3 == MessageValidationResult.Valid msgValidate4 == MessageValidationResult.Invalid - + asyncTest "should validate invalid proofs if bandwidth is available": let index = MembershipIndex(5) let rlnConf = WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: RlnRelayPubsubTopic, - rlnRelayContentTopic: RlnRelayContentTopic, rlnRelayCredIndex: index.uint, rlnRelayBandwidthThreshold: 4, rlnRelayTreePath: genTempPath("rln_tree", "waku_rln_relay_3")) @@ -736,7 +729,7 @@ suite "Waku rln relay": # this message will be over the bandwidth threshold, hence has to be verified, will be false (since no proof) wm3 = WakuMessage(payload: "Invalid message".toBytes()) wm4 = WakuMessage(payload: "Spam message".toBytes()) - + let proofAdded1 = wakuRlnRelay.appendRLNProof(wm1, time) proofAdded2 = wakuRlnRelay.appendRLNProof(wm2, time+EpochUnitSeconds) @@ -765,7 +758,7 @@ suite "Waku rln relay": msgValidate2 == MessageValidationResult.Valid msgValidate3 == MessageValidationResult.Invalid msgValidate4 == MessageValidationResult.Spam - + test "toIDCommitment and toUInt256": # create an instance of rln diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index 4c4acb572..020824c42 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -23,9 +23,6 @@ import from std/times import epochTime - -const RlnRelayPubsubTopic = "waku/2/rlnrelay/proto" - procSuite "WakuNode - RLN relay": asyncTest "testing rln-relay with valid proof": @@ -40,17 +37,14 @@ procSuite "WakuNode - RLN relay": nodeKey3 = generateSecp256k1Key() node3 = newTestWakuNode(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0)) - rlnRelayPubSubTopic = RlnRelayPubsubTopic contentTopic = ContentTopic("/waku/2/default-content/proto") # set up three nodes # node1 - await node1.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic]) + await node1.mountRelay(@[DefaultPubsubTopic]) # mount rlnrelay in off-chain mode await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: rlnRelayPubSubTopic, - rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 1.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode"), )) @@ -58,11 +52,9 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic]) + await node2.mountRelay(@[DefaultPubsubTopic]) # mount rlnrelay in off-chain mode await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: rlnRelayPubSubTopic, - rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 2.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_2"), )) @@ -70,11 +62,9 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic]) + await node3.mountRelay(@[DefaultPubsubTopic]) await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: rlnRelayPubSubTopic, - rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 3.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_3"), )) @@ -88,11 +78,11 @@ procSuite "WakuNode - RLN relay": var completionFut = newFuture[bool]() proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = debug "The received topic:", topic - if topic == rlnRelayPubSubTopic: + if topic == DefaultPubsubTopic: completionFut.complete(true) # mount the relay handler - node3.subscribe(rlnRelayPubSubTopic, relayHandler) + node3.subscribe(DefaultPubsubTopic, relayHandler) await sleepAsync(2000.millis) # prepare the message payload @@ -106,7 +96,7 @@ procSuite "WakuNode - RLN relay": ## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn ## verifies the rate limit proof of the message and relays the message to node3 ## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc - await node1.publish(rlnRelayPubSubTopic, message) + await node1.publish(DefaultPubsubTopic, message) await sleepAsync(2000.millis) @@ -117,6 +107,71 @@ procSuite "WakuNode - RLN relay": await node2.stop() await node3.stop() + asyncTest "testing rln-relay is applied in all rln pubsub/content topics": + + # create 3 nodes + let nodes = toSeq(0..<3).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) + await allFutures(nodes.mapIt(it.start())) + + let pubsubTopics = @[ + PubsubTopic("/waku/2/pubsubtopic-a/proto"), + PubsubTopic("/waku/2/pubsubtopic-b/proto")] + let contentTopics = @[ + ContentTopic("/waku/2/content-topic-a/proto"), + ContentTopic("/waku/2/content-topic-b/proto")] + + # set up three nodes + await allFutures(nodes.mapIt(it.mountRelay(pubsubTopics))) + + # mount rlnrelay in off-chain mode + for index, node in nodes: + await node.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, + rlnRelayCredIndex: index.uint + 1, + rlnRelayTreePath: genTempPath("rln_tree", "wakunode_" & $(index+1)))) + + # start them + await allFutures(nodes.mapIt(it.start())) + + # connect them together + await nodes[0].connectToNodes(@[nodes[1].switch.peerInfo.toRemotePeerInfo()]) + await nodes[2].connectToNodes(@[nodes[1].switch.peerInfo.toRemotePeerInfo()]) + + var rxMessagesTopic1 = 0 + var rxMessagesTopic2 = 0 + proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + info "relayHandler. The received topic:", topic + if topic == pubsubTopics[0]: + rxMessagesTopic1 = rxMessagesTopic1 + 1 + elif topic == pubsubTopics[1]: + rxMessagesTopic2 = rxMessagesTopic2 + 1 + + # mount the relay handlers + nodes[2].subscribe(pubsubTopics[0], relayHandler) + nodes[2].subscribe(pubsubTopics[1], relayHandler) + await sleepAsync(1000.millis) + + # publish 5+5 messages to both pubsub topics and content topics + for i in 0..<5: + var message1 = WakuMessage(payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[0]) + doAssert(nodes[0].wakuRlnRelay.appendRLNProof(message1, epochTime())) + + var message2 = WakuMessage(payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[1]) + doAssert(nodes[1].wakuRlnRelay.appendRLNProof(message2, epochTime())) + + await nodes[0].publish(pubsubTopics[0], message1) + await nodes[1].publish(pubsubTopics[1], message2) + + # wait for gossip to propagate + await sleepAsync(2000.millis) + + # check that node[2] got messages from both topics + # and that rln was applied (4+4 messages were spam) + check: + rxMessagesTopic1 == 1 + rxMessagesTopic2 == 1 + + await allFutures(nodes.mapIt(it.stop())) + asyncTest "testing rln-relay with invalid proof": let # publisher node @@ -129,17 +184,14 @@ procSuite "WakuNode - RLN relay": nodeKey3 = generateSecp256k1Key() node3 = newTestWakuNode(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0)) - rlnRelayPubSubTopic = RlnRelayPubsubTopic contentTopic = ContentTopic("/waku/2/default-content/proto") # set up three nodes # node1 - await node1.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic]) + await node1.mountRelay(@[DefaultPubsubTopic]) # mount rlnrelay in off-chain mode await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: rlnRelayPubSubTopic, - rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 1.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_4"), rlnRelayBandwidthThreshold: 0, @@ -148,11 +200,9 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic]) + await node2.mountRelay(@[DefaultPubsubTopic]) # mount rlnrelay in off-chain mode await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: rlnRelayPubSubTopic, - rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 2.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_5"), rlnRelayBandwidthThreshold: 0, @@ -161,11 +211,9 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic]) + await node3.mountRelay(@[DefaultPubsubTopic]) await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: rlnRelayPubSubTopic, - rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 3.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_6"), rlnRelayBandwidthThreshold: 0, @@ -181,11 +229,11 @@ procSuite "WakuNode - RLN relay": var completionFut = newFuture[bool]() proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = debug "The received topic:", topic - if topic == rlnRelayPubSubTopic: + if topic == DefaultPubsubTopic: completionFut.complete(true) # mount the relay handler - node3.subscribe(rlnRelayPubSubTopic, relayHandler) + node3.subscribe(DefaultPubsubTopic, relayHandler) await sleepAsync(2000.millis) # prepare the message payload @@ -214,7 +262,7 @@ procSuite "WakuNode - RLN relay": ## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3 ## never gets called ## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc - await node1.publish(rlnRelayPubSubTopic, message) + await node1.publish(DefaultPubsubTopic, message) await sleepAsync(2000.millis) check: @@ -238,17 +286,14 @@ procSuite "WakuNode - RLN relay": nodeKey3 = generateSecp256k1Key() node3 = newTestWakuNode(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0)) - rlnRelayPubSubTopic = RlnRelayPubsubTopic contentTopic = ContentTopic("/waku/2/default-content/proto") # set up three nodes # node1 - await node1.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic]) + await node1.mountRelay(@[DefaultPubsubTopic]) # mount rlnrelay in off-chain mode await node1.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: rlnRelayPubSubTopic, - rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 1.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_7"), rlnRelayBandwidthThreshold: 0, @@ -257,12 +302,10 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic]) + await node2.mountRelay(@[DefaultPubsubTopic]) # mount rlnrelay in off-chain mode await node2.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: rlnRelayPubSubTopic, - rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 2.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_8"), rlnRelayBandwidthThreshold: 0, @@ -271,12 +314,10 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultPubsubTopic, rlnRelayPubSubTopic]) + await node3.mountRelay(@[DefaultPubsubTopic]) # mount rlnrelay in off-chain mode await node3.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, - rlnRelayPubsubTopic: rlnRelayPubSubTopic, - rlnRelayContentTopic: contentTopic, rlnRelayCredIndex: 3.uint, rlnRelayTreePath: genTempPath("rln_tree", "wakunode_9"), rlnRelayBandwidthThreshold: 0, @@ -315,7 +356,7 @@ procSuite "WakuNode - RLN relay": var completionFut4 = newFuture[bool]() proc relayHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = debug "The received topic:", topic - if topic == rlnRelayPubSubTopic: + if topic == DefaultPubsubTopic: if msg == wm1: completionFut1.complete(true) if msg == wm2: @@ -327,7 +368,7 @@ procSuite "WakuNode - RLN relay": # mount the relay handler for node3 - node3.subscribe(rlnRelayPubSubTopic, relayHandler) + node3.subscribe(DefaultPubsubTopic, relayHandler) await sleepAsync(2000.millis) ## node1 publishes and relays 4 messages to node2 @@ -336,10 +377,10 @@ procSuite "WakuNode - RLN relay": ## node2 should detect either of wm1 or wm2 as spam and not relay it ## node2 should relay wm3 to node3 ## node2 should not relay wm4 because it has no valid rln proof - await node1.publish(rlnRelayPubSubTopic, wm1) - await node1.publish(rlnRelayPubSubTopic, wm2) - await node1.publish(rlnRelayPubSubTopic, wm3) - await node1.publish(rlnRelayPubSubTopic, wm4) + await node1.publish(DefaultPubsubTopic, wm1) + await node1.publish(DefaultPubsubTopic, wm2) + await node1.publish(DefaultPubsubTopic, wm3) + await node1.publish(DefaultPubsubTopic, wm4) await sleepAsync(2000.millis) let diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 48714a296..170a2dcf2 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -405,7 +405,7 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], contentT error "can't get shard", error=topicMapRes.error return else: topicMapRes.get() - + var futures = collect(newSeq): for pubsub, topics in topicMap.pairs: info "registering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId @@ -456,7 +456,7 @@ proc filterUnsubscribe*(node: WakuNode, pubsubTopic: Option[PubsubTopic], conten error "can't get shard", error = topicMapRes.error return else: topicMapRes.get() - + var futures = collect(newSeq): for pubsub, topics in topicMap.pairs: info "deregistering filter subscription to content", pubsubTopic=pubsub, contentTopics=topics, peer=remotePeer.peerId @@ -741,12 +741,13 @@ when defined(rln): raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error) let rlnRelay = rlnRelayRes.get() let validator = generateRlnValidator(rlnRelay, spamHandler) - let pb = PubSub(node.wakuRelay) - pb.addValidator(rlnRelay.pubsubTopic, validator) + + # register rln validator for all subscribed relay pubsub topics + for pubsubTopic in node.wakuRelay.subscribedTopics: + debug "Registering RLN validator for topic", pubsubTopic=pubsubTopic + procCall GossipSub(node.wakuRelay).addValidator(pubsubTopic, validator) node.wakuRlnRelay = rlnRelay - - ## Waku peer-exchange proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = diff --git a/waku/waku_rln_relay/rln_relay.nim b/waku/waku_rln_relay/rln_relay.nim index b6e5cd299..0b4da052c 100644 --- a/waku/waku_rln_relay/rln_relay.nim +++ b/waku/waku_rln_relay/rln_relay.nim @@ -31,8 +31,6 @@ logScope: type WakuRlnConfig* = object rlnRelayDynamic*: bool - rlnRelayPubsubTopic*: PubsubTopic - rlnRelayContentTopic*: ContentTopic rlnRelayCredIndex*: uint rlnRelayMembershipGroupIndex*: uint rlnRelayEthContractAddress*: string @@ -80,10 +78,6 @@ proc calcEpoch*(t: float64): Epoch = return toEpoch(e) type WakuRLNRelay* = ref object of RootObj - pubsubTopic*: string # the pubsub topic for which rln relay is mounted - # contentTopic should be of type waku_core.ContentTopic, however, due to recursive module dependency, the underlying type of ContentTopic is used instead - # TODO a long-term solution is to place types with recursive dependency inside one file - contentTopic*: string # the log of nullifiers and Shamir shares of the past messages grouped per epoch nullifierLog*: Table[Epoch, seq[ProofMetadata]] lastEpoch*: Epoch # the epoch of the last published rln message @@ -171,7 +165,7 @@ proc absDiff*(e1, e2: Epoch): uint64 = else: return epoch2 - epoch1 -proc validateMessage*(rlnPeer: WakuRLNRelay, +proc validateMessage*(rlnPeer: WakuRLNRelay, msg: WakuMessage, timeOption = none(float64)): MessageValidationResult = ## validate the supplied `msg` based on the waku-rln-relay routing protocol i.e., @@ -293,10 +287,8 @@ proc appendRLNProof*(rlnPeer: WakuRLNRelay, proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay, spamHandler: Option[SpamHandler] = none(SpamHandler)): pubsub.ValidatorHandler = ## this procedure is a thin wrapper for the pubsub addValidator method - ## it sets a validator for the waku messages published on the supplied pubsubTopic and contentTopic - ## if contentTopic is empty, then validation takes place for All the messages published on the given pubsubTopic + ## it sets a validator for waku messages, acting in the registered pubsub topic ## the message validation logic is according to https://rfc.vac.dev/spec/17/ - let contentTopic = wakuRlnRelay.contentTopic proc validator(topic: string, message: messages.Message): Future[pubsub.ValidationResult] {.async.} = trace "rln-relay topic validator is called" @@ -309,21 +301,12 @@ proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay, info "message bandwidth limit exceeded, running rate limit proof validation" except OverflowDefect: # not a problem debug "not enough bandwidth, running rate limit proof validation" - let decodeRes = WakuMessage.decode(message.data) if decodeRes.isOk(): - let - wakumessage = decodeRes.value - payload = string.fromBytes(wakumessage.payload) - - # check the contentTopic - if (wakumessage.contentTopic != "") and (contentTopic != "") and (wakumessage.contentTopic != contentTopic): - trace "content topic did not match:", contentTopic=wakumessage.contentTopic, payload=payload - return pubsub.ValidationResult.Accept - - + let wakumessage = decodeRes.value let decodeRes = RateLimitProof.init(wakumessage.proof) + if decodeRes.isErr(): return pubsub.ValidationResult.Reject @@ -338,6 +321,7 @@ proc generateRlnValidator*(wakuRlnRelay: WakuRLNRelay, shareX = inHex(msgProof.shareX) shareY = inHex(msgProof.shareY) nullifier = inHex(msgProof.nullifier) + payload = string.fromBytes(wakumessage.payload) case validationRes: of Valid: debug "message validity is verified, relaying:", contentTopic=wakumessage.contentTopic, epoch=epoch, timestamp=wakumessage.timestamp, payload=payload @@ -403,12 +387,10 @@ proc mount(conf: WakuRlnConfig, await groupManager.startGroupSync() let messageBucket = if conf.rlnRelayBandwidthThreshold > 0: - some(TokenBucket.new(conf.rlnRelayBandwidthThreshold)) + some(TokenBucket.new(conf.rlnRelayBandwidthThreshold)) else: none(TokenBucket) - return WakuRLNRelay(pubsubTopic: conf.rlnRelayPubsubTopic, - contentTopic: conf.rlnRelayContentTopic, - groupManager: groupManager, + return WakuRLNRelay(groupManager: groupManager, messageBucket: messageBucket)