diff --git a/examples/v2/publisher.nim b/examples/v2/publisher.nim index ed7557c4b..58e39b5d2 100644 --- a/examples/v2/publisher.nim +++ b/examples/v2/publisher.nim @@ -52,6 +52,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = await node.start() await node.mountRelay() + node.peerManager.start() if not await node.startDiscv5(): error "failed to start discv5" quit(1) diff --git a/examples/v2/subscriber.nim b/examples/v2/subscriber.nim index 2f9f2a59e..1e51743b7 100644 --- a/examples/v2/subscriber.nim +++ b/examples/v2/subscriber.nim @@ -48,6 +48,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = await node.start() await node.mountRelay() + node.peerManager.start() if not await node.startDiscv5(): error "failed to start discv5" quit(1) diff --git a/tests/v2/test_wakunode_relay.nim b/tests/v2/test_wakunode_relay.nim index 1db98e389..c105508c2 100644 --- a/tests/v2/test_wakunode_relay.nim +++ b/tests/v2/test_wakunode_relay.nim @@ -1,12 +1,12 @@ {.used.} import - std/os, + std/[os,sysrand,sequtils,tables,math], stew/byteutils, stew/shims/net as stewNet, testutils/unittests, - chronicles, - chronos, + chronicles, + chronos, libp2p/crypto/crypto, libp2p/crypto/secp, libp2p/peerid, @@ -19,8 +19,11 @@ import ../../waku/v2/protocol/waku_message, ../../waku/v2/node/peer_manager, ../../waku/v2/utils/peers, - ../../waku/v2/node/waku_node - + ../../waku/v2/node/waku_node, + ../../waku/v2/protocol/waku_relay, + ../test_helpers, + ./testlib/common + #./testlib/testutils template sourceDir: string = currentSourcePath.parentDir() const KEY_PATH = sourceDir / "resources/test_key.pem" @@ -28,7 +31,7 @@ const CERT_PATH = sourceDir / "resources/test_cert.pem" procSuite "WakuNode - Relay": let rng = crypto.newRng() - + asyncTest "Relay protocol is started correctly": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -61,7 +64,7 @@ procSuite "WakuNode - Relay": GossipSub(node2.wakuRelay).heartbeatFut.isNil == false await allFutures([node1.stop(), node2.stop()]) - + asyncTest "Messages are correctly relayed": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -109,7 +112,7 @@ procSuite "WakuNode - Relay": await node1.stop() await node2.stop() await node3.stop() - + asyncTest "filtering relayed messages using topic validators": ## test scenario: ## node1 and node3 set node2 as their relay node @@ -204,7 +207,39 @@ procSuite "WakuNode - Relay": (await completionFutValidatorRej.withTimeout(10.seconds)) == true await allFutures(node1.stop(), node2.stop(), node3.stop()) - + + asyncTest "Stats of peer sending wrong WakuMessages are updated": + # Create 2 nodes + let nodes = toSeq(0..1).mapIt(WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0))) + + # Start all the nodes and mount relay with + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountRelay())) + + # Connect nodes + let conn = await nodes[0].peerManager.dialPeer(nodes[1].switch.peerInfo.toRemotePeerInfo(), WakuRelayCodec) + require conn.isSome + + # Node 1 subscribes to topic + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = discard + nodes[1].subscribe(DefaultPubsubTopic, handler) + await sleepAsync(500.millis) + + # Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes) + for i in 0..4: + discard await nodes[0].wakuRelay.publish(DefaultPubsubTopic, urandom(1*(10^2))) + + # Wait for gossip + await sleepAsync(500.millis) + + # Verify that node 1 has received 5 invalid messages from node 0 + # meaning that message validity is enforced to gossip messages + var peerStats = nodes[1].wakuRelay.peerStats + check: + peerStats[nodes[0].switch.peerInfo.peerId].topicInfos[DefaultPubsubTopic].invalidMessageDeliveries == 5.0 + + await allFutures(nodes.mapIt(it.stop())) + asyncTest "Messages are relayed between two websocket nodes": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -380,7 +415,7 @@ procSuite "WakuNode - Relay": check: (await completionFut.withTimeout(5.seconds)) == true - + await allFutures(node1.stop(), node2.stop()) asyncTest "Messages are relayed between nodes with multiple transports (websocket and secure Websockets)": @@ -389,7 +424,7 @@ procSuite "WakuNode - Relay": node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60550), wsBindPort = Port(8000), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60552),wsBindPort = Port(8100), wsEnabled = true ) - + let pubSubTopic = "test" contentTopic = ContentTopic("/waku/2/default-content/proto") diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim index 2bd0c22bd..142849aca 100644 --- a/waku/v2/protocol/waku_relay.nim +++ b/waku/v2/protocol/waku_relay.nim @@ -97,6 +97,17 @@ proc new*(T: type WakuRelay, verifySignature = false, maxMessageSize = MaxWakuMessageSize ) + + # Rejects messages that are not WakuMessage + proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} = + let msg = WakuMessage.decode(message.data) + if msg.isOk(): + return ValidationResult.Accept + return ValidationResult.Reject + + # Add validator to all default pubsub topics + for pubSubTopic in defaultPubsubTopics: + wr.addValidator(pubSubTopic, validator) except InitializationError: return err("initialization error: " & getCurrentExceptionMsg())