diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index c89317c7d..981ec02d2 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -3,6 +3,7 @@ import # TODO: enable this when it is altered into a proper waku relay test # ./v2/test_waku, ./v2/test_wakunode, + ./v2/test_wakunode_relay, ./v2/test_wakunode_lightpush, ./v2/test_waku_store_rpc_codec, ./v2/test_waku_store, diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 70471bd2c..569daeaca 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -11,73 +11,15 @@ import libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/gossipsub, libp2p/nameresolving/mockresolver, - eth/keys, ../../waku/v2/protocol/[waku_relay, waku_message], - ../../waku/v2/protocol/waku_filter, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, ../../waku/v2/node/wakunode2 - -template sourceDir: string = currentSourcePath.parentDir() -const KEY_PATH = sourceDir / "resources/test_key.pem" -const CERT_PATH = sourceDir / "resources/test_cert.pem" - procSuite "WakuNode": let rng = crypto.newRng() - asyncTest "Messages are correctly relayed": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) - nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), - Port(60003)) - pubSubTopic = "test" - contentTopic = ContentTopic("/waku/2/default-content/proto") - payload = "hello world".toBytes() - message = WakuMessage(payload: payload, contentTopic: contentTopic) - - await node1.start() - await node1.mountRelay(@[pubSubTopic]) - - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - - await node3.start() - await node3.mountRelay(@[pubSubTopic]) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload - completionFut.complete(true) - - node3.subscribe(pubSubTopic, relayHandler) - await sleepAsync(2000.millis) - - await node1.publish(pubSubTopic, message) - await sleepAsync(2000.millis) - - check: - (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() - await node3.stop() - asyncTest "Protocol matcher works as expected": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -109,7 +51,6 @@ procSuite "WakuNode": node2.wakuRelay.codec == "/vac/waku/relay/2.0.0-beta2" # Now verify that protocol matcher returns `true` and relay works - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) var completionFut = newFuture[bool]() @@ -131,8 +72,8 @@ procSuite "WakuNode": check: (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() + + await allFutures(node1.stop(), node2.stop()) asyncTest "resolve and connect to dns multiaddrs": let resolver = MockResolver.new() @@ -162,140 +103,6 @@ procSuite "WakuNode": await allFutures([node1.stop(), node2.stop()]) - asyncTest "filtering relayed messages using topic validators": - ## test scenario: - ## node1 and node3 set node2 as their relay node - ## node3 publishes two messages with two different contentTopics but on the same pubsub topic - ## node1 is also subscribed to the same pubsub topic - ## node2 sets a validator for the same pubsub topic - ## only one of the messages gets delivered to node1 because the validator only validates one of the content topics - - let - # publisher node - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - # Relay node - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - # Subscriber - nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003)) - - pubSubTopic = "test" - contentTopic1 = ContentTopic("/waku/2/default-content/proto") - payload = "hello world".toBytes() - message1 = WakuMessage(payload: payload, contentTopic: contentTopic1) - - payload2 = "you should not see this message!".toBytes() - contentTopic2 = ContentTopic("2") - message2 = WakuMessage(payload: payload2, contentTopic: contentTopic2) - - # start all the nodes - await node1.start() - await node1.mountRelay(@[pubSubTopic]) - - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - - await node3.start() - await node3.mountRelay(@[pubSubTopic]) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - var completionFutValidatorAcc = newFuture[bool]() - var completionFutValidatorRej = newFuture[bool]() - - proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} = - ## the validator that only allows messages with contentTopic1 to be relayed - check: - topic == pubSubTopic - let msg = WakuMessage.init(message.data) - if msg.isOk(): - # only relay messages with contentTopic1 - if msg.value().contentTopic == contentTopic1: - result = ValidationResult.Accept - completionFutValidatorAcc.complete(true) - else: - result = ValidationResult.Reject - completionFutValidatorRej.complete(true) - - # set a topic validator for pubSubTopic - let pb = PubSub(node2.wakuRelay) - pb.addValidator(pubSubTopic, validator) - - var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - debug "relayed pubsub topic:", topic - let msg = WakuMessage.init(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - # check that only messages with contentTopic1 is relayed (but not contentTopic2) - val.contentTopic == contentTopic1 - # relay handler is called - completionFut.complete(true) - - - node3.subscribe(pubSubTopic, relayHandler) - await sleepAsync(2000.millis) - - await node1.publish(pubSubTopic, message1) - await sleepAsync(2000.millis) - - # message2 never gets relayed because of the validator - await node1.publish(pubSubTopic, message2) - await sleepAsync(2000.millis) - - check: - (await completionFut.withTimeout(10.seconds)) == true - # check that validator is called for message1 - (await completionFutValidatorAcc.withTimeout(10.seconds)) == true - # check that validator is called for message2 - (await completionFutValidatorRej.withTimeout(10.seconds)) == true - - - await node1.stop() - await node2.stop() - await node3.stop() - - asyncTest "Relay protocol is started correctly": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) - - # Relay protocol starts if mounted after node start - - await node1.start() - - await node1.mountRelay() - - check: - GossipSub(node1.wakuRelay).heartbeatFut.isNil == false - - # Relay protocol starts if mounted before node start - - let - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) - - await node2.mountRelay() - - check: - # Relay has not yet started as node has not yet started - GossipSub(node2.wakuRelay).heartbeatFut.isNil - - await node2.start() - - check: - # Relay started on node start - GossipSub(node2.wakuRelay).heartbeatFut.isNil == false - - await allFutures([node1.stop(), node2.stop()]) - asyncTest "Maximum connections can be configured": let maxConnections = 2 @@ -336,188 +143,6 @@ procSuite "WakuNode": await allFutures([node1.stop(), node2.stop(), node3.stop()]) - - asyncTest "Messages are relayed between two websocket nodes": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60000), wsBindPort = Port(8000), wsEnabled = true) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60002), wsBindPort = Port(8100), wsEnabled = true) - pubSubTopic = "test" - contentTopic = ContentTopic("/waku/2/default-content/proto") - payload = "hello world".toBytes() - message = WakuMessage(payload: payload, contentTopic: contentTopic) - - await node1.start() - await node1.mountRelay(@[pubSubTopic]) - - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload - completionFut.complete(true) - - node1.subscribe(pubSubTopic, relayHandler) - await sleepAsync(2000.millis) - - await node2.publish(pubSubTopic, message) - await sleepAsync(2000.millis) - - - check: - (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() - - - asyncTest "Messages are relayed between nodes with multiple transports (TCP and Websockets)": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60000), wsBindPort = Port(8000), wsEnabled = true) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60002)) - pubSubTopic = "test" - contentTopic = ContentTopic("/waku/2/default-content/proto") - payload = "hello world".toBytes() - message = WakuMessage(payload: payload, contentTopic: contentTopic) - - await node1.start() - await node1.mountRelay(@[pubSubTopic]) - - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload - completionFut.complete(true) - - node1.subscribe(pubSubTopic, relayHandler) - await sleepAsync(2000.millis) - - await node2.publish(pubSubTopic, message) - await sleepAsync(2000.millis) - - - check: - (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() - - asyncTest "Messages relaying fails with non-overlapping transports (TCP or Websockets)": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60002), wsBindPort = Port(8100), wsEnabled = true) - pubSubTopic = "test" - contentTopic = ContentTopic("/waku/2/default-content/proto") - payload = "hello world".toBytes() - message = WakuMessage(payload: payload, contentTopic: contentTopic) - - await node1.start() - await node1.mountRelay(@[pubSubTopic]) - - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - - #delete websocket peer address - # TODO: a better way to find the index - this is too brittle - node2.switch.peerInfo.addrs.delete(0) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload - completionFut.complete(true) - - node1.subscribe(pubSubTopic, relayHandler) - await sleepAsync(2000.millis) - - await node2.publish(pubSubTopic, message) - await sleepAsync(2000.millis) - - - check: - (await completionFut.withTimeout(5.seconds)) == false - await node1.stop() - await node2.stop() - - asyncTest "Messages are relayed between nodes with multiple transports (TCP and secure Websockets)": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60000), wsBindPort = Port(8000), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60002)) - pubSubTopic = "test" - contentTopic = ContentTopic("/waku/2/default-content/proto") - payload = "hello world".toBytes() - message = WakuMessage(payload: payload, contentTopic: contentTopic) - - await node1.start() - await node1.mountRelay(@[pubSubTopic]) - - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload - completionFut.complete(true) - - node1.subscribe(pubSubTopic, relayHandler) - await sleepAsync(2000.millis) - - await node2.publish(pubSubTopic, message) - await sleepAsync(2000.millis) - - - check: - (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() - asyncTest "Messages fails with wrong key path": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -527,50 +152,6 @@ procSuite "WakuNode": discard WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60000), wsBindPort = Port(8000), wssEnabled = true, secureKey = "../../waku/v2/node/key_dummy.txt") - asyncTest "Messages are relayed between nodes with multiple transports (websocket and secure Websockets)": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60000), wsBindPort = Port(8000), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - bindPort = Port(60002),wsBindPort = Port(8100), wsEnabled = true ) - pubSubTopic = "test" - contentTopic = ContentTopic("/waku/2/default-content/proto") - payload = "hello world".toBytes() - message = WakuMessage(payload: payload, contentTopic: contentTopic) - - await node1.start() - await node1.mountRelay(@[pubSubTopic]) - - await node2.start() - await node2.mountRelay(@[pubSubTopic]) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - var completionFut = newFuture[bool]() - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - let val = msg.value() - check: - topic == pubSubTopic - val.contentTopic == contentTopic - val.payload == payload - completionFut.complete(true) - - node1.subscribe(pubSubTopic, relayHandler) - await sleepAsync(2000.millis) - - await node2.publish(pubSubTopic, message) - await sleepAsync(2000.millis) - - - check: - (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() - asyncTest "Peer info updates with correct announced addresses": let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] diff --git a/tests/v2/test_wakunode_relay.nim b/tests/v2/test_wakunode_relay.nim new file mode 100644 index 000000000..693b92ac7 --- /dev/null +++ b/tests/v2/test_wakunode_relay.nim @@ -0,0 +1,432 @@ +{.used.} + +import + std/os, + stew/byteutils, + stew/shims/net as stewNet, + testutils/unittests, + chronicles, + chronos, + libp2p/crypto/crypto, + libp2p/crypto/secp, + libp2p/peerid, + libp2p/multiaddress, + libp2p/switch, + libp2p/protocols/pubsub/rpc/messages, + libp2p/protocols/pubsub/pubsub, + libp2p/protocols/pubsub/gossipsub +import + ../../waku/v2/protocol/[waku_relay, waku_message], + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/utils/peers, + ../../waku/v2/node/wakunode2 + + +template sourceDir: string = currentSourcePath.parentDir() +const KEY_PATH = sourceDir / "resources/test_key.pem" +const CERT_PATH = sourceDir / "resources/test_cert.pem" + +procSuite "WakuNode - Relay": + let rng = crypto.newRng() + + asyncTest "Relay protocol is started correctly": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + + # Relay protocol starts if mounted after node start + + await node1.start() + await node1.mountRelay() + + check: + GossipSub(node1.wakuRelay).heartbeatFut.isNil == false + + # Relay protocol starts if mounted before node start + + let + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + + await node2.mountRelay() + + check: + # Relay has not yet started as node has not yet started + GossipSub(node2.wakuRelay).heartbeatFut.isNil + + await node2.start() + + check: + # Relay started on node start + GossipSub(node2.wakuRelay).heartbeatFut.isNil == false + + await allFutures([node1.stop(), node2.stop()]) + + asyncTest "Messages are correctly relayed": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), + Port(60003)) + pubSubTopic = "test" + contentTopic = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node3.start() + await node3.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFut.complete(true) + + node3.subscribe(pubSubTopic, relayHandler) + await sleepAsync(500.millis) + + await node1.publish(pubSubTopic, message) + await sleepAsync(500.millis) + + check: + (await completionFut.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop() + await node3.stop() + + asyncTest "filtering relayed messages using topic validators": + ## test scenario: + ## node1 and node3 set node2 as their relay node + ## node3 publishes two messages with two different contentTopics but on the same pubsub topic + ## node1 is also subscribed to the same pubsub topic + ## node2 sets a validator for the same pubsub topic + ## only one of the messages gets delivered to node1 because the validator only validates one of the content topics + + let + # publisher node + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) + # Relay node + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + # Subscriber + nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60003)) + + pubSubTopic = "test" + contentTopic1 = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message1 = WakuMessage(payload: payload, contentTopic: contentTopic1) + + payload2 = "you should not see this message!".toBytes() + contentTopic2 = ContentTopic("2") + message2 = WakuMessage(payload: payload2, contentTopic: contentTopic2) + + # start all the nodes + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node3.start() + await node3.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + var completionFutValidatorAcc = newFuture[bool]() + var completionFutValidatorRej = newFuture[bool]() + + proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} = + ## the validator that only allows messages with contentTopic1 to be relayed + check: + topic == pubSubTopic + let msg = WakuMessage.init(message.data) + if msg.isOk(): + # only relay messages with contentTopic1 + if msg.value().contentTopic == contentTopic1: + result = ValidationResult.Accept + completionFutValidatorAcc.complete(true) + else: + result = ValidationResult.Reject + completionFutValidatorRej.complete(true) + + # set a topic validator for pubSubTopic + let pb = PubSub(node2.wakuRelay) + pb.addValidator(pubSubTopic, validator) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + debug "relayed pubsub topic:", topic + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + # check that only messages with contentTopic1 is relayed (but not contentTopic2) + val.contentTopic == contentTopic1 + # relay handler is called + completionFut.complete(true) + + + node3.subscribe(pubSubTopic, relayHandler) + await sleepAsync(500.millis) + + await node1.publish(pubSubTopic, message1) + await sleepAsync(500.millis) + + # message2 never gets relayed because of the validator + await node1.publish(pubSubTopic, message2) + await sleepAsync(500.millis) + + check: + (await completionFut.withTimeout(10.seconds)) == true + # check that validator is called for message1 + (await completionFutValidatorAcc.withTimeout(10.seconds)) == true + # check that validator is called for message2 + (await completionFutValidatorRej.withTimeout(10.seconds)) == true + + await allFutures(node1.stop(), node2.stop(), node3.stop()) + + asyncTest "Messages are relayed between two websocket nodes": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60000), wsBindPort = Port(8000), wsEnabled = true) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60002), wsBindPort = Port(8100), wsEnabled = true) + pubSubTopic = "test" + contentTopic = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFut.complete(true) + + node1.subscribe(pubSubTopic, relayHandler) + await sleepAsync(500.millis) + + await node2.publish(pubSubTopic, message) + await sleepAsync(500.millis) + + + check: + (await completionFut.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop() + + asyncTest "Messages are relayed between nodes with multiple transports (TCP and Websockets)": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60000), wsBindPort = Port(8000), wsEnabled = true) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60002)) + pubSubTopic = "test" + contentTopic = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFut.complete(true) + + node1.subscribe(pubSubTopic, relayHandler) + await sleepAsync(500.millis) + + await node2.publish(pubSubTopic, message) + await sleepAsync(500.millis) + + + check: + (await completionFut.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop() + + asyncTest "Messages relaying fails with non-overlapping transports (TCP or Websockets)": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60002), wsBindPort = Port(8100), wsEnabled = true) + pubSubTopic = "test" + contentTopic = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + #delete websocket peer address + # TODO: a better way to find the index - this is too brittle + node2.switch.peerInfo.addrs.delete(0) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFut.complete(true) + + node1.subscribe(pubSubTopic, relayHandler) + await sleepAsync(500.millis) + + await node2.publish(pubSubTopic, message) + await sleepAsync(500.millis) + + check: + (await completionFut.withTimeout(5.seconds)) == false + + await allFutures(node1.stop(), node2.stop()) + + asyncTest "Messages are relayed between nodes with multiple transports (TCP and secure Websockets)": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60000), wsBindPort = Port(8000), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), + bindPort = Port(60002)) + pubSubTopic = "test" + contentTopic = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFut.complete(true) + + node1.subscribe(pubSubTopic, relayHandler) + await sleepAsync(500.millis) + + await node2.publish(pubSubTopic, message) + await sleepAsync(500.millis) + + check: + (await completionFut.withTimeout(5.seconds)) == true + + await allFutures(node1.stop(), node2.stop()) + + asyncTest "Messages are relayed between nodes with multiple transports (websocket and secure Websockets)": + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60000), wsBindPort = Port(8000), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60002),wsBindPort = Port(8100), wsEnabled = true ) + + let + pubSubTopic = "test" + contentTopic = ContentTopic("/waku/2/default-content/proto") + payload = "hello world".toBytes() + message = WakuMessage(payload: payload, contentTopic: contentTopic) + + await node1.start() + await node1.mountRelay(@[pubSubTopic]) + + await node2.start() + await node2.mountRelay(@[pubSubTopic]) + + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + + var completionFut = newFuture[bool]() + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + let val = msg.value() + check: + topic == pubSubTopic + val.contentTopic == contentTopic + val.payload == payload + completionFut.complete(true) + + node1.subscribe(pubSubTopic, relayHandler) + await sleepAsync(500.millis) + + await node2.publish(pubSubTopic, message) + await sleepAsync(500.millis) + + + check: + (await completionFut.withTimeout(5.seconds)) == true + await node1.stop() + await node2.stop()