mirror of https://github.com/waku-org/nwaku.git
Filtering messages in the relay protocol using libp2p topic validators (#414)
* adds a test for topic validators * edits the import * edits the comments * further edits on the comments * mode comments on the test * checks the pubsub topic in the validator * adds a fix * changes the sleep time to 2 secs
This commit is contained in:
parent
a575c44934
commit
b3208b343f
|
@ -8,6 +8,8 @@ import
|
|||
libp2p/peerid,
|
||||
libp2p/multiaddress,
|
||||
libp2p/switch,
|
||||
libp2p/protocols/pubsub/rpc/messages,
|
||||
libp2p/protocols/pubsub/pubsub,
|
||||
eth/keys,
|
||||
../../waku/v2/protocol/[waku_relay, waku_message, message_notifier],
|
||||
../../waku/v2/protocol/waku_store/waku_store,
|
||||
|
@ -293,3 +295,103 @@ procSuite "WakuNode":
|
|||
expect ValueError:
|
||||
# unsupported transport
|
||||
discard parsePeerInfo("/ip4/127.0.0.1/udp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc")
|
||||
|
||||
asyncTest "filtering relayed messages using topic validators":
|
||||
## test scenario:
|
||||
## node1 and node3 set node2 as their relay node
|
||||
## node3 publishes two messages with two different contentTopics but on the same pubsub topic
|
||||
## node1 is also subscribed to the same pubsub topic
|
||||
## node2 sets a validator for the same pubsub topic
|
||||
## only one of the messages gets delivered to node1 because the validator only validates one of the content topics
|
||||
|
||||
let
|
||||
# publisher node
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000))
|
||||
# Relay node
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002))
|
||||
# Subscriber
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.init(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003))
|
||||
|
||||
pubSubTopic = "test"
|
||||
contentTopic1 = ContentTopic(1)
|
||||
payload = "hello world".toBytes()
|
||||
message1 = WakuMessage(payload: payload, contentTopic: contentTopic1)
|
||||
|
||||
payload2 = "you should not see this message!".toBytes()
|
||||
contentTopic2 = ContentTopic(2)
|
||||
message2 = WakuMessage(payload: payload2, contentTopic: contentTopic2)
|
||||
|
||||
# start all the nodes
|
||||
await node1.start()
|
||||
node1.mountRelay(@[pubSubTopic])
|
||||
|
||||
await node2.start()
|
||||
node2.mountRelay(@[pubSubTopic])
|
||||
|
||||
await node3.start()
|
||||
node3.mountRelay(@[pubSubTopic])
|
||||
|
||||
await node1.connectToNodes(@[node2.peerInfo])
|
||||
await node3.connectToNodes(@[node2.peerInfo])
|
||||
|
||||
|
||||
var completionFutValidatorAcc = newFuture[bool]()
|
||||
var completionFutValidatorRej = newFuture[bool]()
|
||||
|
||||
proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
|
||||
## the validator that only allows messages with contentTopic1 to be relayed
|
||||
check:
|
||||
topic == pubSubTopic
|
||||
let msg = WakuMessage.init(message.data)
|
||||
if msg.isOk():
|
||||
# only relay messages with contentTopic1
|
||||
if msg.value().contentTopic == contentTopic1:
|
||||
result = ValidationResult.Accept
|
||||
completionFutValidatorAcc.complete(true)
|
||||
else:
|
||||
result = ValidationResult.Reject
|
||||
completionFutValidatorRej.complete(true)
|
||||
|
||||
# set a topic validator for pubSubTopic
|
||||
let pb = PubSub(node2.wakuRelay)
|
||||
pb.addValidator(pubSubTopic, validator)
|
||||
|
||||
var completionFut = newFuture[bool]()
|
||||
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
debug "relayed pubsub topic:", topic
|
||||
let msg = WakuMessage.init(data)
|
||||
if msg.isOk():
|
||||
let val = msg.value()
|
||||
check:
|
||||
topic == pubSubTopic
|
||||
# check that only messages with contentTopic1 is relayed (but not contentTopic2)
|
||||
val.contentTopic == contentTopic1
|
||||
# relay handler is called
|
||||
completionFut.complete(true)
|
||||
|
||||
|
||||
node3.subscribe(pubSubTopic, relayHandler)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
await node1.publish(pubSubTopic, message1)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
# message2 never gets relayed because of the validator
|
||||
await node1.publish(pubSubTopic, message2)
|
||||
await sleepAsync(2000.millis)
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(10.seconds)) == true
|
||||
# check that validator is called for message1
|
||||
(await completionFutValidatorAcc.withTimeout(10.seconds)) == true
|
||||
# check that validator is called for message2
|
||||
(await completionFutValidatorRej.withTimeout(10.seconds)) == true
|
||||
|
||||
|
||||
await node1.stop()
|
||||
await node2.stop()
|
||||
await node3.stop()
|
||||
|
||||
|
|
Loading…
Reference in New Issue