From b3208b343f35baebd2adea0da28bc9bf664d45ee Mon Sep 17 00:00:00 2001 From: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com> Date: Thu, 11 Mar 2021 15:42:26 -0800 Subject: [PATCH] Filtering messages in the relay protocol using libp2p topic validators (#414) * adds a test for topic validators * edits the import * edits the comments * further edits on the comments * mode comments on the test * checks the pubsub topic in the validator * adds a fix * changes the sleep time to 2 secs --- tests/v2/test_wakunode.nim | 102 +++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index afee342b4..d12871f71 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -8,6 +8,8 @@ import libp2p/peerid, libp2p/multiaddress, libp2p/switch, + libp2p/protocols/pubsub/rpc/messages, + libp2p/protocols/pubsub/pubsub, eth/keys, ../../waku/v2/protocol/[waku_relay, waku_message, message_notifier], ../../waku/v2/protocol/waku_store/waku_store, @@ -293,3 +295,103 @@ procSuite "WakuNode": expect ValueError: # unsupported transport discard parsePeerInfo("/ip4/127.0.0.1/udp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") + + asyncTest "filtering relayed messages using topic validators": + ## test scenario: + ## node1 and node3 set node2 as their relay node + ## node3 publishes two messages with two different contentTopics but on the same pubsub topic + ## node1 is also subscribed to the same pubsub topic + ## node2 sets a validator for the same pubsub topic + ## only one of the messages gets delivered to node1 because the validator only validates one of the content topics + + let + # publisher node + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) + # Relay node + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + # Subscriber + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003)) + + pubSubTopic = "test" + contentTopic1 = ContentTopic(1) + payload = "hello world".toBytes() + message1 = WakuMessage(payload: payload, contentTopic: contentTopic1) + + payload2 = "you should not see this message!".toBytes() + contentTopic2 = ContentTopic(2) + message2 = WakuMessage(payload: payload2, contentTopic: contentTopic2) + + # start all the nodes + await node1.start() + node1.mountRelay(@[pubSubTopic]) + + await node2.start() + node2.mountRelay(@[pubSubTopic]) + + await node3.start() + node3.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.peerInfo]) + await node3.connectToNodes(@[node2.peerInfo]) + + + var completionFutValidatorAcc = newFuture[bool]() + var completionFutValidatorRej = newFuture[bool]() + + proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} = + ## the validator that only allows messages with contentTopic1 to be relayed + check: + topic == pubSubTopic + let msg = WakuMessage.init(message.data) + if msg.isOk(): + # only relay messages with contentTopic1 + if msg.value().contentTopic == contentTopic1: + result = ValidationResult.Accept + completionFutValidatorAcc.complete(true) + else: + result = ValidationResult.Reject + completionFutValidatorRej.complete(true) + + # set a topic validator for pubSubTopic + let pb = PubSub(node2.wakuRelay) + pb.addValidator(pubSubTopic, validator) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + debug "relayed pubsub topic:", topic + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + # check that only messages with contentTopic1 is relayed (but not contentTopic2) + val.contentTopic == contentTopic1 + # relay handler is called + completionFut.complete(true) + + + node3.subscribe(pubSubTopic, relayHandler) + await sleepAsync(2000.millis) + + await node1.publish(pubSubTopic, message1) + await sleepAsync(2000.millis) + + # message2 never gets relayed because of the validator + await node1.publish(pubSubTopic, message2) + await sleepAsync(2000.millis) + + check: + (await completionFut.withTimeout(10.seconds)) == true + # check that validator is called for message1 + (await completionFutValidatorAcc.withTimeout(10.seconds)) == true + # check that validator is called for message2 + (await completionFutValidatorRej.withTimeout(10.seconds)) == true + + + await node1.stop() + await node2.stop() + await node3.stop() +