2022-09-12 12:51:52 +00:00
|
|
|
|
{.used.}
|
|
|
|
|
|
|
|
|
|
import
|
2023-04-04 08:58:45 +00:00
|
|
|
|
std/[os, sequtils, sysrand, math],
|
2022-09-12 12:51:52 +00:00
|
|
|
|
stew/byteutils,
|
|
|
|
|
stew/shims/net as stewNet,
|
|
|
|
|
testutils/unittests,
|
2023-02-07 16:42:17 +00:00
|
|
|
|
chronicles,
|
|
|
|
|
chronos,
|
2022-09-12 12:51:52 +00:00
|
|
|
|
libp2p/crypto/crypto,
|
|
|
|
|
libp2p/crypto/secp,
|
|
|
|
|
libp2p/peerid,
|
|
|
|
|
libp2p/multiaddress,
|
|
|
|
|
libp2p/switch,
|
|
|
|
|
libp2p/protocols/pubsub/rpc/messages,
|
|
|
|
|
libp2p/protocols/pubsub/pubsub,
|
2023-04-04 08:58:45 +00:00
|
|
|
|
libp2p/protocols/pubsub/gossipsub,
|
|
|
|
|
libp2p/multihash,
|
|
|
|
|
secp256k1
|
2022-09-12 12:51:52 +00:00
|
|
|
|
import
|
2022-10-21 13:01:39 +00:00
|
|
|
|
../../waku/v2/protocol/waku_message,
|
2023-02-06 09:03:30 +00:00
|
|
|
|
../../waku/v2/node/peer_manager,
|
2022-09-12 12:51:52 +00:00
|
|
|
|
../../waku/v2/utils/peers,
|
2023-02-07 16:42:17 +00:00
|
|
|
|
../../waku/v2/node/waku_node,
|
|
|
|
|
../../waku/v2/protocol/waku_relay,
|
2023-04-04 08:58:45 +00:00
|
|
|
|
../../waku/v2/protocol/waku_relay/validators,
|
2023-02-13 10:43:49 +00:00
|
|
|
|
../testlib/testutils,
|
2023-04-04 08:58:45 +00:00
|
|
|
|
../testlib/common,
|
2023-02-13 10:43:49 +00:00
|
|
|
|
../testlib/waku2
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
|
|
|
|
template sourceDir: string = currentSourcePath.parentDir()
|
|
|
|
|
const KEY_PATH = sourceDir / "resources/test_key.pem"
|
|
|
|
|
const CERT_PATH = sourceDir / "resources/test_cert.pem"
|
|
|
|
|
|
2023-02-13 10:43:49 +00:00
|
|
|
|
suite "WakuNode - Relay":
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
2022-09-12 12:51:52 +00:00
|
|
|
|
asyncTest "Relay protocol is started correctly":
|
|
|
|
|
let
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey1 = generateSecp256k1Key()
|
2023-02-13 11:27:49 +00:00
|
|
|
|
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(0))
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
|
|
|
|
# Relay protocol starts if mounted after node start
|
|
|
|
|
|
|
|
|
|
await node1.start()
|
|
|
|
|
await node1.mountRelay()
|
|
|
|
|
|
|
|
|
|
check:
|
2023-02-10 14:17:50 +00:00
|
|
|
|
GossipSub(node1.wakuRelay).heartbeatFut.isNil() == false
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
|
|
|
|
# Relay protocol starts if mounted before node start
|
|
|
|
|
|
|
|
|
|
let
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey2 = generateSecp256k1Key()
|
2023-02-13 11:27:49 +00:00
|
|
|
|
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0))
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
|
|
|
|
await node2.mountRelay()
|
|
|
|
|
|
|
|
|
|
check:
|
|
|
|
|
# Relay has not yet started as node has not yet started
|
2023-02-10 14:17:50 +00:00
|
|
|
|
GossipSub(node2.wakuRelay).heartbeatFut.isNil()
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
|
|
|
|
await node2.start()
|
|
|
|
|
|
|
|
|
|
check:
|
|
|
|
|
# Relay started on node start
|
2023-02-10 14:17:50 +00:00
|
|
|
|
GossipSub(node2.wakuRelay).heartbeatFut.isNil() == false
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
|
|
|
|
await allFutures([node1.stop(), node2.stop()])
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
2022-09-12 12:51:52 +00:00
|
|
|
|
asyncTest "Messages are correctly relayed":
|
|
|
|
|
let
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey1 = generateSecp256k1Key()
|
2023-02-13 11:27:49 +00:00
|
|
|
|
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(0))
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey2 = generateSecp256k1Key()
|
2023-02-13 11:27:49 +00:00
|
|
|
|
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0))
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey3 = generateSecp256k1Key()
|
2023-02-13 11:27:49 +00:00
|
|
|
|
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0))
|
2022-09-12 12:51:52 +00:00
|
|
|
|
pubSubTopic = "test"
|
|
|
|
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
|
|
|
|
payload = "hello world".toBytes()
|
|
|
|
|
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
|
|
|
|
|
|
|
|
|
await node1.start()
|
|
|
|
|
await node1.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node2.start()
|
|
|
|
|
await node2.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node3.start()
|
|
|
|
|
await node3.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
2023-02-10 14:17:50 +00:00
|
|
|
|
await allFutures(
|
|
|
|
|
node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]),
|
|
|
|
|
node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
|
|
|
|
)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
|
|
|
|
var completionFut = newFuture[bool]()
|
|
|
|
|
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
2022-11-07 15:24:16 +00:00
|
|
|
|
let msg = WakuMessage.decode(data)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
if msg.isOk():
|
|
|
|
|
let val = msg.value()
|
|
|
|
|
check:
|
|
|
|
|
topic == pubSubTopic
|
|
|
|
|
val.contentTopic == contentTopic
|
|
|
|
|
val.payload == payload
|
|
|
|
|
completionFut.complete(true)
|
|
|
|
|
|
|
|
|
|
node3.subscribe(pubSubTopic, relayHandler)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
await node1.publish(pubSubTopic, message)
|
|
|
|
|
|
2023-02-10 14:17:50 +00:00
|
|
|
|
## Then
|
2022-09-12 12:51:52 +00:00
|
|
|
|
check:
|
|
|
|
|
(await completionFut.withTimeout(5.seconds)) == true
|
2023-02-10 14:17:50 +00:00
|
|
|
|
|
|
|
|
|
## Cleanup
|
|
|
|
|
await allFutures(node1.stop(), node2.stop(), node3.stop())
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
2022-09-12 12:51:52 +00:00
|
|
|
|
asyncTest "filtering relayed messages using topic validators":
|
|
|
|
|
## test scenario:
|
|
|
|
|
## node1 and node3 set node2 as their relay node
|
|
|
|
|
## node3 publishes two messages with two different contentTopics but on the same pubsub topic
|
|
|
|
|
## node1 is also subscribed to the same pubsub topic
|
|
|
|
|
## node2 sets a validator for the same pubsub topic
|
|
|
|
|
## only one of the messages gets delivered to node1 because the validator only validates one of the content topics
|
|
|
|
|
|
|
|
|
|
let
|
|
|
|
|
# publisher node
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey1 = generateSecp256k1Key()
|
2023-02-13 11:27:49 +00:00
|
|
|
|
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(0))
|
2022-09-12 12:51:52 +00:00
|
|
|
|
# Relay node
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey2 = generateSecp256k1Key()
|
2023-02-13 11:27:49 +00:00
|
|
|
|
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(0))
|
2022-09-12 12:51:52 +00:00
|
|
|
|
# Subscriber
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey3 = generateSecp256k1Key()
|
2023-02-13 11:27:49 +00:00
|
|
|
|
node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(0))
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
|
|
|
|
pubSubTopic = "test"
|
|
|
|
|
contentTopic1 = ContentTopic("/waku/2/default-content/proto")
|
|
|
|
|
payload = "hello world".toBytes()
|
|
|
|
|
message1 = WakuMessage(payload: payload, contentTopic: contentTopic1)
|
|
|
|
|
|
|
|
|
|
payload2 = "you should not see this message!".toBytes()
|
|
|
|
|
contentTopic2 = ContentTopic("2")
|
|
|
|
|
message2 = WakuMessage(payload: payload2, contentTopic: contentTopic2)
|
|
|
|
|
|
|
|
|
|
# start all the nodes
|
|
|
|
|
await node1.start()
|
|
|
|
|
await node1.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node2.start()
|
|
|
|
|
await node2.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node3.start()
|
|
|
|
|
await node3.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
|
|
|
|
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
|
|
|
|
|
|
|
|
|
var completionFutValidatorAcc = newFuture[bool]()
|
|
|
|
|
var completionFutValidatorRej = newFuture[bool]()
|
|
|
|
|
|
2023-02-10 14:17:50 +00:00
|
|
|
|
# set a topic validator for pubSubTopic
|
2022-09-12 12:51:52 +00:00
|
|
|
|
proc validator(topic: string, message: messages.Message): Future[ValidationResult] {.async.} =
|
|
|
|
|
## the validator that only allows messages with contentTopic1 to be relayed
|
|
|
|
|
check:
|
|
|
|
|
topic == pubSubTopic
|
2023-02-10 14:17:50 +00:00
|
|
|
|
|
2022-11-07 15:24:16 +00:00
|
|
|
|
let msg = WakuMessage.decode(message.data)
|
2023-02-10 14:17:50 +00:00
|
|
|
|
if msg.isErr():
|
|
|
|
|
completionFutValidatorAcc.complete(false)
|
|
|
|
|
return ValidationResult.Reject
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
2023-02-10 14:17:50 +00:00
|
|
|
|
# only relay messages with contentTopic1
|
|
|
|
|
if msg.value.contentTopic != contentTopic1:
|
|
|
|
|
completionFutValidatorRej.complete(true)
|
|
|
|
|
return ValidationResult.Reject
|
|
|
|
|
|
|
|
|
|
completionFutValidatorAcc.complete(true)
|
|
|
|
|
return ValidationResult.Accept
|
|
|
|
|
|
|
|
|
|
node2.wakuRelay.addValidator(pubSubTopic, validator)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
|
|
|
|
var completionFut = newFuture[bool]()
|
|
|
|
|
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
|
|
|
|
debug "relayed pubsub topic:", topic
|
2022-11-07 15:24:16 +00:00
|
|
|
|
let msg = WakuMessage.decode(data)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
if msg.isOk():
|
|
|
|
|
let val = msg.value()
|
|
|
|
|
check:
|
|
|
|
|
topic == pubSubTopic
|
|
|
|
|
# check that only messages with contentTopic1 is relayed (but not contentTopic2)
|
|
|
|
|
val.contentTopic == contentTopic1
|
|
|
|
|
# relay handler is called
|
|
|
|
|
completionFut.complete(true)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
node3.subscribe(pubSubTopic, relayHandler)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
await node1.publish(pubSubTopic, message1)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
# message2 never gets relayed because of the validator
|
|
|
|
|
await node1.publish(pubSubTopic, message2)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
check:
|
|
|
|
|
(await completionFut.withTimeout(10.seconds)) == true
|
|
|
|
|
# check that validator is called for message1
|
|
|
|
|
(await completionFutValidatorAcc.withTimeout(10.seconds)) == true
|
|
|
|
|
# check that validator is called for message2
|
|
|
|
|
(await completionFutValidatorRej.withTimeout(10.seconds)) == true
|
|
|
|
|
|
|
|
|
|
await allFutures(node1.stop(), node2.stop(), node3.stop())
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
2023-02-10 14:17:50 +00:00
|
|
|
|
# TODO: Add a function to validate the WakuMessage integrity
|
|
|
|
|
xasyncTest "Stats of peer sending wrong WakuMessages are updated":
|
2023-02-07 16:42:17 +00:00
|
|
|
|
# Create 2 nodes
|
2023-02-13 10:43:49 +00:00
|
|
|
|
let nodes = toSeq(0..1).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
|
|
|
|
# Start all the nodes and mount relay with
|
|
|
|
|
await allFutures(nodes.mapIt(it.start()))
|
|
|
|
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
|
|
|
|
|
|
|
|
|
# Connect nodes
|
2023-03-28 11:29:48 +00:00
|
|
|
|
let connOk = await nodes[0].peerManager.connectRelay(nodes[1].switch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
require:
|
|
|
|
|
connOk == true
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
|
|
|
|
# Node 1 subscribes to topic
|
2023-02-08 15:09:59 +00:00
|
|
|
|
nodes[1].subscribe(DefaultPubsubTopic)
|
2023-02-07 16:42:17 +00:00
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
# Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes)
|
|
|
|
|
for i in 0..4:
|
|
|
|
|
discard await nodes[0].wakuRelay.publish(DefaultPubsubTopic, urandom(1*(10^2)))
|
|
|
|
|
|
|
|
|
|
# Wait for gossip
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
# Verify that node 1 has received 5 invalid messages from node 0
|
|
|
|
|
# meaning that message validity is enforced to gossip messages
|
|
|
|
|
var peerStats = nodes[1].wakuRelay.peerStats
|
|
|
|
|
check:
|
|
|
|
|
peerStats[nodes[0].switch.peerInfo.peerId].topicInfos[DefaultPubsubTopic].invalidMessageDeliveries == 5.0
|
|
|
|
|
|
|
|
|
|
await allFutures(nodes.mapIt(it.stop()))
|
|
|
|
|
|
2023-04-04 08:58:45 +00:00
|
|
|
|
# TODO: Test multiple protected topics
|
|
|
|
|
|
|
|
|
|
asyncTest "Spam protected topic accepts signed messages":
|
|
|
|
|
# Create 5 nodes
|
|
|
|
|
let nodes = toSeq(0..<5).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
|
|
|
|
|
|
|
|
|
# Protected topic and key to sign
|
|
|
|
|
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
|
|
|
|
|
let secretKey = SkSecretKey.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6").expect("valid key")
|
|
|
|
|
let publicKey = secretKey.toPublicKey()
|
|
|
|
|
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
|
|
|
|
|
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable
|
|
|
|
|
|
|
|
|
|
# Start all the nodes and mount relay with protected topic
|
|
|
|
|
await allFutures(nodes.mapIt(it.start()))
|
|
|
|
|
|
|
|
|
|
# Mount relay for all nodes
|
|
|
|
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
|
|
|
|
|
|
|
|
|
# Add signed message validator to all nodes. They will only route signed messages
|
|
|
|
|
for node in nodes:
|
|
|
|
|
for topic, publicKey in topicsPublicKeys:
|
|
|
|
|
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)
|
|
|
|
|
|
|
|
|
|
# Connect the nodes in a full mesh
|
|
|
|
|
for i in 0..<5:
|
|
|
|
|
for j in 0..<5:
|
|
|
|
|
if i == j:
|
|
|
|
|
continue
|
|
|
|
|
let connOk = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
require connOk
|
|
|
|
|
|
|
|
|
|
var msgReceived = 0
|
|
|
|
|
proc handler(pubsubTopic: PubsubTopic, data: WakuMessage) {.async, gcsafe.} =
|
|
|
|
|
msgReceived += 1
|
|
|
|
|
|
|
|
|
|
# Subscribe all nodes to the same topic/handler
|
|
|
|
|
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
# Each node publishes 10 signed messages
|
|
|
|
|
for i in 0..<5:
|
|
|
|
|
for j in 0..<10:
|
|
|
|
|
var msg = WakuMessage(
|
|
|
|
|
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
|
|
|
|
|
version: 2, timestamp: now(), ephemeral: true)
|
|
|
|
|
|
|
|
|
|
# Include signature
|
|
|
|
|
msg.meta = secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
|
|
|
|
|
|
|
|
|
|
await nodes[i].publish(spamProtectedTopic, msg)
|
|
|
|
|
|
|
|
|
|
# Wait for gossip
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
# 50 messages were sent to 5 peers = 250 messages
|
|
|
|
|
check:
|
|
|
|
|
msgReceived == 250
|
|
|
|
|
|
|
|
|
|
# No invalid messages were received by any peer
|
|
|
|
|
for i in 0..<5:
|
|
|
|
|
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
|
|
|
|
|
check:
|
|
|
|
|
v.topicInfos[spamProtectedTopic].invalidMessageDeliveries == 0.0
|
|
|
|
|
|
|
|
|
|
# Stop all nodes
|
|
|
|
|
await allFutures(nodes.mapIt(it.stop()))
|
|
|
|
|
|
|
|
|
|
asyncTest "Spam protected topic rejects non-signed and wrongly-signed messages":
|
|
|
|
|
# Create 5 nodes
|
|
|
|
|
let nodes = toSeq(0..<5).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
|
|
|
|
|
|
|
|
|
# Protected topic and key to sign
|
|
|
|
|
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
|
|
|
|
|
let secretKey = SkSecretKey.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6").expect("valid key")
|
|
|
|
|
let publicKey = secretKey.toPublicKey()
|
|
|
|
|
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
|
|
|
|
|
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable
|
|
|
|
|
|
|
|
|
|
# Non whitelisted secret key
|
|
|
|
|
let wrongSecretKey = SkSecretKey.fromHex("32ad0cc8edeb9f8a3e8635c5fe5bd200b9247a33da5e7171bd012691805151f3").expect("valid key")
|
|
|
|
|
|
|
|
|
|
# Start all the nodes and mount relay with protected topic
|
|
|
|
|
await allFutures(nodes.mapIt(it.start()))
|
|
|
|
|
|
|
|
|
|
# Mount relay with spam protected topics
|
|
|
|
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
|
|
|
|
|
|
|
|
|
# Add signed message validator to all nodes. They will only route signed messages
|
|
|
|
|
for node in nodes:
|
|
|
|
|
for topic, publicKey in topicsPublicKeys:
|
|
|
|
|
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)
|
|
|
|
|
|
|
|
|
|
# Connect the nodes in a full mesh
|
|
|
|
|
for i in 0..<5:
|
|
|
|
|
for j in 0..<5:
|
|
|
|
|
if i == j:
|
|
|
|
|
continue
|
|
|
|
|
let connOk = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
require connOk
|
|
|
|
|
|
|
|
|
|
var msgReceived = 0
|
|
|
|
|
proc handler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
|
|
|
|
msgReceived += 1
|
|
|
|
|
|
|
|
|
|
# Subscribe all nodes to the same topic/handler
|
|
|
|
|
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
# Each node sends 10 messages, signed but with a non-whitelisted key (total = 50)
|
|
|
|
|
for i in 0..<5:
|
|
|
|
|
for j in 0..<10:
|
|
|
|
|
var msg = WakuMessage(
|
|
|
|
|
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
|
|
|
|
|
version: 2, timestamp: now(), ephemeral: true)
|
|
|
|
|
|
|
|
|
|
# Sign the message with a wrong key
|
|
|
|
|
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
|
|
|
|
|
|
|
|
|
|
await nodes[i].publish(spamProtectedTopic, msg)
|
|
|
|
|
|
|
|
|
|
# Each node sends 10 messages that are not signed (total = 50)
|
|
|
|
|
for i in 0..<5:
|
|
|
|
|
for j in 0..<10:
|
|
|
|
|
let unsignedMessage = WakuMessage(
|
|
|
|
|
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
|
|
|
|
|
version: 2, timestamp: now(), ephemeral: true)
|
|
|
|
|
await nodes[i].publish(spamProtectedTopic, unsignedMessage)
|
|
|
|
|
|
|
|
|
|
# Wait for gossip
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
# Since we have a full mesh with 5 nodes and each one publishes 50+50 msgs
|
|
|
|
|
# there are 500 messages being sent.
|
|
|
|
|
# 100 are received ok in the handler (first hop)
|
|
|
|
|
# 400 are are wrong so rejected (rejected not relayed)
|
|
|
|
|
check:
|
|
|
|
|
msgReceived == 100
|
|
|
|
|
|
|
|
|
|
var msgRejected = 0
|
|
|
|
|
for i in 0..<5:
|
|
|
|
|
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
|
|
|
|
|
msgRejected += v.topicInfos[spamProtectedTopic].invalidMessageDeliveries.int
|
|
|
|
|
|
|
|
|
|
check:
|
|
|
|
|
msgRejected == 400
|
|
|
|
|
|
|
|
|
|
await allFutures(nodes.mapIt(it.stop()))
|
|
|
|
|
|
|
|
|
|
asyncTest "Spam protected topic rejects a spammer node":
|
|
|
|
|
# Create 5 nodes
|
|
|
|
|
let nodes = toSeq(0..<5).mapIt(WakuNode.new(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
|
|
|
|
|
|
|
|
|
|
# Protected topic and key to sign
|
|
|
|
|
let spamProtectedTopic = PubSubTopic("some-spam-protected-topic")
|
|
|
|
|
let secretKey = SkSecretKey.fromHex("5526a8990317c9b7b58d07843d270f9cd1d9aaee129294c1c478abf7261dd9e6").expect("valid key")
|
|
|
|
|
let publicKey = secretKey.toPublicKey()
|
|
|
|
|
let topicsPrivateKeys = {spamProtectedTopic: secretKey}.toTable
|
|
|
|
|
let topicsPublicKeys = {spamProtectedTopic: publicKey}.toTable
|
|
|
|
|
|
|
|
|
|
# Non whitelisted secret key
|
|
|
|
|
let wrongSecretKey = SkSecretKey.fromHex("32ad0cc8edeb9f8a3e8635c5fe5bd200b9247a33da5e7171bd012691805151f3").expect("valid key")
|
|
|
|
|
|
|
|
|
|
# Start all the nodes and mount relay with protected topic
|
|
|
|
|
await allFutures(nodes.mapIt(it.start()))
|
|
|
|
|
|
|
|
|
|
# Mount relay for all nodes
|
|
|
|
|
await allFutures(nodes.mapIt(it.mountRelay()))
|
|
|
|
|
|
|
|
|
|
var msgReceived = 0
|
|
|
|
|
proc handler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
|
|
|
|
|
msgReceived += 1
|
|
|
|
|
|
|
|
|
|
# Subscribe all nodes to the same topic/handler
|
|
|
|
|
for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
# Add signed message validator to all nodes. They will only route signed messages
|
|
|
|
|
for node in nodes:
|
|
|
|
|
for topic, publicKey in topicsPublicKeys:
|
|
|
|
|
node.wakuRelay.addSignedTopicValidator(PubsubTopic(topic), publicKey)
|
|
|
|
|
|
|
|
|
|
# nodes[0] is connected only to nodes[1]
|
|
|
|
|
let connOk1 = await nodes[0].peerManager.connectRelay(nodes[1].switch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
require connOk1
|
|
|
|
|
|
|
|
|
|
# rest of nodes[1..4] are connected in a full mesh
|
|
|
|
|
for i in 1..<5:
|
|
|
|
|
for j in 1..<5:
|
|
|
|
|
if i == j:
|
|
|
|
|
continue
|
|
|
|
|
let connOk2 = await nodes[i].peerManager.connectRelay(nodes[j].switch.peerInfo.toRemotePeerInfo())
|
|
|
|
|
require connOk2
|
|
|
|
|
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
# nodes[0] spams 50 non signed messages (nodes[0] just knows of nodes[1])
|
|
|
|
|
for j in 0..<50:
|
|
|
|
|
let unsignedMessage = WakuMessage(
|
|
|
|
|
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
|
|
|
|
|
version: 2, timestamp: now(), ephemeral: true)
|
|
|
|
|
await nodes[0].publish(spamProtectedTopic, unsignedMessage)
|
|
|
|
|
|
|
|
|
|
# nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1])
|
|
|
|
|
for j in 0..<50:
|
|
|
|
|
var msg = WakuMessage(
|
|
|
|
|
payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic,
|
|
|
|
|
version: 2, timestamp: now(), ephemeral: true)
|
|
|
|
|
# Sign the message with a wrong key
|
|
|
|
|
msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63]
|
|
|
|
|
await nodes[0].publish(spamProtectedTopic, msg)
|
|
|
|
|
|
|
|
|
|
# Wait for gossip
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
# only 100 messages are received (50 + 50) which demonstrate
|
|
|
|
|
# nodes[1] doest gossip invalid messages.
|
|
|
|
|
check:
|
|
|
|
|
msgReceived == 100
|
|
|
|
|
|
|
|
|
|
# peer1 got invalid messages from peer0
|
|
|
|
|
let p0Id = nodes[0].peerInfo.peerId
|
|
|
|
|
check:
|
|
|
|
|
nodes[1].wakuRelay.peerStats[p0Id].topicInfos[spamProtectedTopic].invalidMessageDeliveries == 100.0
|
|
|
|
|
|
|
|
|
|
# peer1 did not gossip further, so no other node rx invalid messages
|
|
|
|
|
for i in 0..<5:
|
|
|
|
|
for k, v in nodes[i].wakuRelay.peerStats.mpairs:
|
|
|
|
|
if k == p0Id and i == 1:
|
|
|
|
|
continue
|
|
|
|
|
check:
|
|
|
|
|
v.topicInfos[spamProtectedTopic].invalidMessageDeliveries == 0.0
|
|
|
|
|
|
|
|
|
|
# Stop all nodes
|
|
|
|
|
await allFutures(nodes.mapIt(it.stop()))
|
|
|
|
|
|
2022-09-12 12:51:52 +00:00
|
|
|
|
asyncTest "Messages are relayed between two websocket nodes":
|
|
|
|
|
let
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey1 = generateSecp256k1Key()
|
2022-09-12 12:51:52 +00:00
|
|
|
|
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
2023-02-08 15:09:59 +00:00
|
|
|
|
bindPort = Port(60510), wsBindPort = Port(8001), wsEnabled = true)
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey2 = generateSecp256k1Key()
|
2022-09-12 12:51:52 +00:00
|
|
|
|
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
2023-02-08 15:09:59 +00:00
|
|
|
|
bindPort = Port(60512), wsBindPort = Port(8101), wsEnabled = true)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
pubSubTopic = "test"
|
|
|
|
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
|
|
|
|
payload = "hello world".toBytes()
|
|
|
|
|
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
|
|
|
|
|
|
|
|
|
await node1.start()
|
|
|
|
|
await node1.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node2.start()
|
|
|
|
|
await node2.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
|
|
|
|
|
|
|
|
|
var completionFut = newFuture[bool]()
|
|
|
|
|
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
2022-11-07 15:24:16 +00:00
|
|
|
|
let msg = WakuMessage.decode(data)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
if msg.isOk():
|
|
|
|
|
let val = msg.value()
|
|
|
|
|
check:
|
|
|
|
|
topic == pubSubTopic
|
|
|
|
|
val.contentTopic == contentTopic
|
|
|
|
|
val.payload == payload
|
|
|
|
|
completionFut.complete(true)
|
|
|
|
|
|
|
|
|
|
node1.subscribe(pubSubTopic, relayHandler)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
await node2.publish(pubSubTopic, message)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
check:
|
|
|
|
|
(await completionFut.withTimeout(5.seconds)) == true
|
|
|
|
|
await node1.stop()
|
|
|
|
|
await node2.stop()
|
|
|
|
|
|
|
|
|
|
asyncTest "Messages are relayed between nodes with multiple transports (TCP and Websockets)":
|
|
|
|
|
let
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey1 = generateSecp256k1Key()
|
2022-09-12 12:51:52 +00:00
|
|
|
|
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
2023-02-08 15:09:59 +00:00
|
|
|
|
bindPort = Port(60520), wsBindPort = Port(8002), wsEnabled = true)
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey2 = generateSecp256k1Key()
|
2022-09-12 12:51:52 +00:00
|
|
|
|
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
2022-11-03 13:47:56 +00:00
|
|
|
|
bindPort = Port(60522))
|
2022-09-12 12:51:52 +00:00
|
|
|
|
pubSubTopic = "test"
|
|
|
|
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
|
|
|
|
payload = "hello world".toBytes()
|
|
|
|
|
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
|
|
|
|
|
|
|
|
|
await node1.start()
|
|
|
|
|
await node1.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node2.start()
|
|
|
|
|
await node2.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
|
|
|
|
|
|
|
|
|
var completionFut = newFuture[bool]()
|
|
|
|
|
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
2022-11-07 15:24:16 +00:00
|
|
|
|
let msg = WakuMessage.decode(data)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
if msg.isOk():
|
|
|
|
|
let val = msg.value()
|
|
|
|
|
check:
|
|
|
|
|
topic == pubSubTopic
|
|
|
|
|
val.contentTopic == contentTopic
|
|
|
|
|
val.payload == payload
|
|
|
|
|
completionFut.complete(true)
|
|
|
|
|
|
|
|
|
|
node1.subscribe(pubSubTopic, relayHandler)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
await node2.publish(pubSubTopic, message)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
check:
|
|
|
|
|
(await completionFut.withTimeout(5.seconds)) == true
|
|
|
|
|
await node1.stop()
|
|
|
|
|
await node2.stop()
|
|
|
|
|
|
|
|
|
|
asyncTest "Messages relaying fails with non-overlapping transports (TCP or Websockets)":
|
|
|
|
|
let
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey1 = generateSecp256k1Key()
|
2022-09-12 12:51:52 +00:00
|
|
|
|
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
2022-11-03 13:47:56 +00:00
|
|
|
|
bindPort = Port(60530))
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey2 = generateSecp256k1Key()
|
2022-09-12 12:51:52 +00:00
|
|
|
|
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
2023-02-08 15:09:59 +00:00
|
|
|
|
bindPort = Port(60532), wsBindPort = Port(8103), wsEnabled = true)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
pubSubTopic = "test"
|
|
|
|
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
|
|
|
|
payload = "hello world".toBytes()
|
|
|
|
|
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
|
|
|
|
|
|
|
|
|
await node1.start()
|
|
|
|
|
await node1.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node2.start()
|
|
|
|
|
await node2.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
#delete websocket peer address
|
|
|
|
|
# TODO: a better way to find the index - this is too brittle
|
2022-10-28 09:51:46 +00:00
|
|
|
|
node2.switch.peerInfo.listenAddrs.delete(0)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
|
|
|
|
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
|
|
|
|
|
|
|
|
|
var completionFut = newFuture[bool]()
|
|
|
|
|
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
2022-11-07 15:24:16 +00:00
|
|
|
|
let msg = WakuMessage.decode(data)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
if msg.isOk():
|
|
|
|
|
let val = msg.value()
|
|
|
|
|
check:
|
|
|
|
|
topic == pubSubTopic
|
|
|
|
|
val.contentTopic == contentTopic
|
|
|
|
|
val.payload == payload
|
|
|
|
|
completionFut.complete(true)
|
|
|
|
|
|
|
|
|
|
node1.subscribe(pubSubTopic, relayHandler)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
await node2.publish(pubSubTopic, message)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
check:
|
|
|
|
|
(await completionFut.withTimeout(5.seconds)) == false
|
|
|
|
|
|
|
|
|
|
await allFutures(node1.stop(), node2.stop())
|
|
|
|
|
|
|
|
|
|
asyncTest "Messages are relayed between nodes with multiple transports (TCP and secure Websockets)":
|
|
|
|
|
let
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey1 = generateSecp256k1Key()
|
2022-09-12 12:51:52 +00:00
|
|
|
|
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"),
|
2023-02-08 15:09:59 +00:00
|
|
|
|
bindPort = Port(60540), wsBindPort = Port(8004), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH)
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey2 = generateSecp256k1Key()
|
2022-09-12 12:51:52 +00:00
|
|
|
|
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"),
|
2022-11-03 13:47:56 +00:00
|
|
|
|
bindPort = Port(60542))
|
2022-09-12 12:51:52 +00:00
|
|
|
|
pubSubTopic = "test"
|
|
|
|
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
|
|
|
|
payload = "hello world".toBytes()
|
|
|
|
|
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
|
|
|
|
|
|
|
|
|
await node1.start()
|
|
|
|
|
await node1.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node2.start()
|
|
|
|
|
await node2.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
|
|
|
|
|
|
|
|
|
var completionFut = newFuture[bool]()
|
|
|
|
|
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
2022-11-07 15:24:16 +00:00
|
|
|
|
let msg = WakuMessage.decode(data)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
if msg.isOk():
|
|
|
|
|
let val = msg.value()
|
|
|
|
|
check:
|
|
|
|
|
topic == pubSubTopic
|
|
|
|
|
val.contentTopic == contentTopic
|
|
|
|
|
val.payload == payload
|
|
|
|
|
completionFut.complete(true)
|
|
|
|
|
|
|
|
|
|
node1.subscribe(pubSubTopic, relayHandler)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
await node2.publish(pubSubTopic, message)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
check:
|
|
|
|
|
(await completionFut.withTimeout(5.seconds)) == true
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
2022-09-12 12:51:52 +00:00
|
|
|
|
await allFutures(node1.stop(), node2.stop())
|
|
|
|
|
|
|
|
|
|
asyncTest "Messages are relayed between nodes with multiple transports (websocket and secure Websockets)":
|
|
|
|
|
let
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey1 = generateSecp256k1Key()
|
2023-02-08 15:09:59 +00:00
|
|
|
|
node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60550), wsBindPort = Port(8005), wssEnabled = true, secureKey = KEY_PATH, secureCert = CERT_PATH)
|
2023-02-13 10:43:49 +00:00
|
|
|
|
nodeKey2 = generateSecp256k1Key()
|
2023-02-08 15:09:59 +00:00
|
|
|
|
node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), bindPort = Port(60552),wsBindPort = Port(8105), wsEnabled = true )
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
2022-09-12 12:51:52 +00:00
|
|
|
|
let
|
|
|
|
|
pubSubTopic = "test"
|
|
|
|
|
contentTopic = ContentTopic("/waku/2/default-content/proto")
|
|
|
|
|
payload = "hello world".toBytes()
|
|
|
|
|
message = WakuMessage(payload: payload, contentTopic: contentTopic)
|
|
|
|
|
|
|
|
|
|
await node1.start()
|
|
|
|
|
await node1.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node2.start()
|
|
|
|
|
await node2.mountRelay(@[pubSubTopic])
|
|
|
|
|
|
|
|
|
|
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
|
|
|
|
|
|
|
|
|
var completionFut = newFuture[bool]()
|
|
|
|
|
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
2022-11-07 15:24:16 +00:00
|
|
|
|
let msg = WakuMessage.decode(data)
|
2022-09-12 12:51:52 +00:00
|
|
|
|
if msg.isOk():
|
|
|
|
|
let val = msg.value()
|
|
|
|
|
check:
|
|
|
|
|
topic == pubSubTopic
|
|
|
|
|
val.contentTopic == contentTopic
|
|
|
|
|
val.payload == payload
|
|
|
|
|
completionFut.complete(true)
|
|
|
|
|
|
|
|
|
|
node1.subscribe(pubSubTopic, relayHandler)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
await node2.publish(pubSubTopic, message)
|
|
|
|
|
await sleepAsync(500.millis)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
check:
|
|
|
|
|
(await completionFut.withTimeout(5.seconds)) == true
|
|
|
|
|
await node1.stop()
|
|
|
|
|
await node2.stop()
|