2022-09-12 12:51:52 +00:00
|
|
|
|
{.used.}
|
|
|
|
|
|
|
|
|
|
import
|
2023-04-04 08:58:45 +00:00
|
|
|
|
std/[os, sequtils, sysrand, math],
|
2022-09-12 12:51:52 +00:00
|
|
|
|
stew/byteutils,
|
|
|
|
|
stew/shims/net as stewNet,
|
|
|
|
|
testutils/unittests,
|
2023-02-07 16:42:17 +00:00
|
|
|
|
chronos,
|
2022-09-12 12:51:52 +00:00
|
|
|
|
libp2p/switch,
|
|
|
|
|
libp2p/protocols/pubsub/pubsub,
|
2023-09-07 11:41:56 +00:00
|
|
|
|
libp2p/protocols/pubsub/gossipsub
|
2022-09-12 12:51:52 +00:00
|
|
|
|
import
|
2024-07-05 22:03:38 +00:00
|
|
|
|
waku/[waku_core, node/peer_manager, waku_node, waku_relay],
|
2023-02-13 10:43:49 +00:00
|
|
|
|
../testlib/testutils,
|
2023-04-05 14:01:51 +00:00
|
|
|
|
../testlib/wakucore,
|
|
|
|
|
../testlib/wakunode
|
2022-09-12 12:51:52 +00:00
|
|
|
|
|
2024-03-15 23:08:47 +00:00
|
|
|
|
template sourceDir(): string =
  ## Expands to the parent directory of the path reported by
  ## `currentSourcePath`.
  parentDir(currentSourcePath())
|
|
|
|
|
|
2022-09-12 12:51:52 +00:00
|
|
|
|
# TLS key and certificate handed to the secure-websocket tests through the
# `secureKey` / `secureCert` arguments; both paths are resolved relative to
# `sourceDir`.
const
  KEY_PATH = joinPath(sourceDir(), "resources/test_key.pem")
  CERT_PATH = joinPath(sourceDir(), "resources/test_cert.pem")
|
|
|
|
|
|
2023-02-13 10:43:49 +00:00
|
|
|
|
suite "WakuNode - Relay":
|
2022-09-12 12:51:52 +00:00
|
|
|
|
asyncTest "Relay protocol is started correctly":
  ## Uses the GossipSub heartbeat future as the "relay is running" signal:
  ## it must be set once a node is both started and has relay mounted,
  ## whichever of the two happens first.

  # Case 1: relay is mounted after the node has been started.
  let lateMountNode =
    newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))

  await lateMountNode.start()
  await lateMountNode.mountRelay()

  check not GossipSub(lateMountNode.wakuRelay).heartbeatFut.isNil()

  # Case 2: relay is mounted before the node is started.
  let earlyMountNode =
    newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))

  await earlyMountNode.mountRelay()

  # Relay has not yet started as the node has not yet started.
  check GossipSub(earlyMountNode.wakuRelay).heartbeatFut.isNil()

  await earlyMountNode.start()

  # Relay started on node start.
  check not GossipSub(earlyMountNode.wakuRelay).heartbeatFut.isNil()

  await allFutures(lateMountNode.stop(), earlyMountNode.stop())
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
2022-09-12 12:51:52 +00:00
|
|
|
|
asyncTest "Messages are correctly relayed":
  ## node1 and node3 each connect to node2 only. A message published by
  ## node1 on `shard` must reach the relay handler subscribed on node3.
  let
    nodeKey1 = generateSecp256k1Key()
    node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(0))
    nodeKey2 = generateSecp256k1Key()
    node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(0))
    nodeKey3 = generateSecp256k1Key()
    node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(0))
    shard = DefaultRelayShard
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    payload = "hello world".toBytes()
    message = WakuMessage(payload: payload, contentTopic: contentTopic)

  await node1.start()
  await node1.mountRelay(@[shard])

  await node2.start()
  await node2.mountRelay(@[shard])

  await node3.start()
  await node3.mountRelay(@[shard])

  await allFutures(
    node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]),
    node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]),
  )

  # Completed by the handler below when a message is delivered on node3.
  let completionFut = newFuture[bool]()
  proc relayHandler(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    check:
      topic == $shard
      msg.contentTopic == contentTopic
      msg.payload == payload
    completionFut.complete(true)

  node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
  # Pause before publishing (presumably so the subscription can propagate).
  await sleepAsync(500.millis)

  let res = await node1.publish(some($shard), message)
  # doAssert, unlike assert, is kept even when assertions are disabled, so a
  # failed publish can never go unnoticed.
  doAssert res.isOk(), $res.error

  ## Then
  check:
    (await completionFut.withTimeout(5.seconds)) == true

  ## Cleanup
  await allFutures(node1.stop(), node2.stop(), node3.stop())
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
2022-09-12 12:51:52 +00:00
|
|
|
|
asyncTest "filtering relayed messages using topic validators":
  ## test scenario:
  ## node1 and node3 set node2 as their relay node
  ## node1 publishes two messages with two different contentTopics but on the same pubsub topic
  ## node3 is subscribed to the same pubsub topic
  ## node2 sets a validator for the same pubsub topic
  ## only one of the messages gets delivered to node3 because the validator only validates one of the content topics

  let
    # publisher node
    nodeKey1 = generateSecp256k1Key()
    node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), Port(0))
    # Relay node
    nodeKey2 = generateSecp256k1Key()
    node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(0))
    # Subscriber
    nodeKey3 = generateSecp256k1Key()
    node3 = newTestWakuNode(nodeKey3, parseIpAddress("0.0.0.0"), Port(0))

    shard = DefaultRelayShard
    contentTopic1 = ContentTopic("/waku/2/default-content/proto")
    payload = "hello world".toBytes()
    message1 = WakuMessage(payload: payload, contentTopic: contentTopic1)

    payload2 = "you should not see this message!".toBytes()
    contentTopic2 = ContentTopic("2")
    message2 = WakuMessage(payload: payload2, contentTopic: contentTopic2)

  # start all the nodes
  await node1.start()
  await node1.mountRelay(@[shard])

  await node2.start()
  await node2.mountRelay(@[shard])

  await node3.start()
  await node3.mountRelay(@[shard])

  await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
  await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

  # Completed by the validator: one future per outcome (accept / reject).
  let
    completionFutValidatorAcc = newFuture[bool]()
    completionFutValidatorRej = newFuture[bool]()

  # set a topic validator on the relay node (node2)
  proc validator(
      topic: string, msg: WakuMessage
  ): Future[ValidationResult] {.async.} =
    ## the validator that only allows messages with contentTopic1 to be relayed
    check:
      topic == $shard

    # only relay messages with contentTopic1
    if msg.contentTopic != contentTopic1:
      completionFutValidatorRej.complete(true)
      return ValidationResult.Reject

    completionFutValidatorAcc.complete(true)
    return ValidationResult.Accept

  node2.wakuRelay.addValidator(validator)

  # Completed by the subscriber's handler when a message is delivered.
  let completionFut = newFuture[bool]()
  proc relayHandler(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    check:
      topic == $shard
      # check that only messages with contentTopic1 are relayed (but not contentTopic2)
      msg.contentTopic == contentTopic1
    # relay handler is called
    completionFut.complete(true)

  node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
  await sleepAsync(500.millis)

  var res = await node1.publish(some($shard), message1)
  # doAssert, unlike assert, is kept even when assertions are disabled.
  doAssert res.isOk(), $res.error

  await sleepAsync(500.millis)

  # message2 never gets relayed because of the validator
  res = await node1.publish(some($shard), message2)
  doAssert res.isOk(), $res.error

  await sleepAsync(500.millis)

  check:
    (await completionFut.withTimeout(10.seconds)) == true
    # check that validator is called for message1
    (await completionFutValidatorAcc.withTimeout(10.seconds)) == true
    # check that validator is called for message2
    (await completionFutValidatorRej.withTimeout(10.seconds)) == true

  await allFutures(node1.stop(), node2.stop(), node3.stop())
|
2023-02-07 16:42:17 +00:00
|
|
|
|
|
2023-02-10 14:17:50 +00:00
|
|
|
|
# TODO: Add a function to validate the WakuMessage integrity
|
|
|
|
|
xasyncTest "Stats of peer sending wrong WakuMessages are updated":
  ## Currently disabled (`xasyncTest`). nodes[0] publishes random bytes on
  ## the default pubsub topic and nodes[1] is expected to count each one as
  ## an invalid message delivery from nodes[0].

  # Create 2 nodes
  let nodes = toSeq(0 ..< 2).mapIt(
    newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
  )

  # Start all the nodes and mount relay with the default arguments
  await allFutures(nodes.mapIt(it.start()))
  await allFutures(nodes.mapIt(it.mountRelay()))

  # Connect nodes
  let
    remotePeer = nodes[1].switch.peerInfo.toRemotePeerInfo()
    connOk = await nodes[0].peerManager.connectPeer(remotePeer)
  require connOk == true

  # Node 1 subscribes to topic
  nodes[1].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))
  await sleepAsync(500.millis)

  # Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes)
  for _ in 1 .. 5:
    discard await nodes[0].wakuRelay.publish(DefaultPubsubTopic, urandom(100))

  # Wait for gossip
  await sleepAsync(500.millis)

  # Verify that node 1 has received 5 invalid messages from node 0
  # meaning that message validity is enforced to gossip messages
  let
    senderId = nodes[0].switch.peerInfo.peerId
    peerStats = nodes[1].wakuRelay.peerStats
  check peerStats[senderId].topicInfos[DefaultPubsubTopic].invalidMessageDeliveries ==
    5.0

  await allFutures(nodes.mapIt(it.stop()))
|
|
|
|
|
|
2022-09-12 12:51:52 +00:00
|
|
|
|
asyncTest "Messages are relayed between two websocket nodes":
  ## Both nodes are created with `wsEnabled = true`. node2 publishes one
  ## message and the handler subscribed on node1 must receive it.
  let
    nodeKey1 = generateSecp256k1Key()
    node1 = newTestWakuNode(
      nodeKey1,
      parseIpAddress("0.0.0.0"),
      bindPort = Port(0),
      wsBindPort = Port(0),
      wsEnabled = true,
    )
    nodeKey2 = generateSecp256k1Key()
    node2 = newTestWakuNode(
      nodeKey2,
      parseIpAddress("0.0.0.0"),
      bindPort = Port(0),
      wsBindPort = Port(0),
      wsEnabled = true,
    )
    shard = DefaultRelayShard
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    payload = "hello world".toBytes()
    message = WakuMessage(payload: payload, contentTopic: contentTopic)

  await node1.start()
  await node1.mountRelay(@[shard])

  await node2.start()
  await node2.mountRelay(@[shard])

  await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

  # Completed by the handler below when a message is delivered on node1.
  let completionFut = newFuture[bool]()
  proc relayHandler(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    check:
      topic == $shard
      msg.contentTopic == contentTopic
      msg.payload == payload
    completionFut.complete(true)

  node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
  await sleepAsync(500.millis)

  let res = await node2.publish(some($shard), message)
  # doAssert, unlike assert, is kept even when assertions are disabled.
  doAssert res.isOk(), $res.error

  await sleepAsync(500.millis)

  check:
    (await completionFut.withTimeout(5.seconds)) == true
  await node1.stop()
  await node2.stop()
|
|
|
|
|
|
|
|
|
|
asyncTest "Messages are relayed between nodes with multiple transports (TCP and Websockets)":
  ## node1 is created with websocket enabled, node2 with the default
  ## transport arguments only. node2 publishes and the handler subscribed on
  ## node1 must receive the message.
  let
    nodeKey1 = generateSecp256k1Key()
    node1 = newTestWakuNode(
      nodeKey1,
      parseIpAddress("0.0.0.0"),
      bindPort = Port(0),
      wsBindPort = Port(0),
      wsEnabled = true,
    )
    nodeKey2 = generateSecp256k1Key()
    node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), bindPort = Port(0))
    shard = DefaultRelayShard
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    payload = "hello world".toBytes()
    message = WakuMessage(payload: payload, contentTopic: contentTopic)

  await node1.start()
  await node1.mountRelay(@[shard])

  await node2.start()
  await node2.mountRelay(@[shard])

  await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

  # Completed by the handler below when a message is delivered on node1.
  let completionFut = newFuture[bool]()
  proc relayHandler(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    check:
      topic == $shard
      msg.contentTopic == contentTopic
      msg.payload == payload
    completionFut.complete(true)

  node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
  await sleepAsync(500.millis)

  let res = await node2.publish(some($shard), message)
  # doAssert, unlike assert, is kept even when assertions are disabled.
  doAssert res.isOk(), $res.error

  await sleepAsync(500.millis)

  check:
    (await completionFut.withTimeout(5.seconds)) == true
  await node1.stop()
  await node2.stop()
|
|
|
|
|
|
|
|
|
|
asyncTest "Messages relaying fails with non-overlapping transports (TCP or Websockets)":
  ## Negative case: after one of node2's advertised listen addresses is
  ## removed, a message published by node2 must NOT reach the handler
  ## subscribed on node1 within the timeout.
  let
    nodeKey1 = generateSecp256k1Key()
    node1 = newTestWakuNode(nodeKey1, parseIpAddress("0.0.0.0"), bindPort = Port(0))
    nodeKey2 = generateSecp256k1Key()
    node2 = newTestWakuNode(
      nodeKey2,
      parseIpAddress("0.0.0.0"),
      bindPort = Port(0),
      wsBindPort = Port(0),
      wsEnabled = true,
    )
    shard = DefaultRelayShard
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    payload = "hello world".toBytes()
    message = WakuMessage(payload: payload, contentTopic: contentTopic)

  await node1.start()
  await node1.mountRelay(@[shard])

  await node2.start()
  await node2.mountRelay(@[shard])

  #delete websocket peer address
  # TODO: a better way to find the index - this is too brittle
  # NOTE(review): the test only fails to relay if index 0 is the one address
  # node1 could have dialled - confirm which address sits at index 0.
  node2.switch.peerInfo.listenAddrs.delete(0)

  await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

  # Would be completed by the handler; the test expects it to stay pending.
  let completionFut = newFuture[bool]()
  proc relayHandler(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    check:
      topic == $shard
      msg.contentTopic == contentTopic
      msg.payload == payload
    completionFut.complete(true)

  node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
  await sleepAsync(500.millis)

  let res = await node2.publish(some($shard), message)
  # doAssert, unlike assert, is kept even when assertions are disabled.
  doAssert res.isOk(), $res.error

  await sleepAsync(500.millis)

  check:
    (await completionFut.withTimeout(5.seconds)) == false

  await allFutures(node1.stop(), node2.stop())
|
|
|
|
|
|
|
|
|
|
asyncTest "Messages are relayed between nodes with multiple transports (TCP and secure Websockets)":
  ## node1 is created with `wssEnabled = true` using the test key and
  ## certificate, node2 with the default transport arguments only. node2
  ## publishes and the handler subscribed on node1 must receive the message.
  let
    nodeKey1 = generateSecp256k1Key()
    node1 = newTestWakuNode(
      nodeKey1,
      parseIpAddress("0.0.0.0"),
      bindPort = Port(0),
      wsBindPort = Port(0),
      wssEnabled = true,
      secureKey = KEY_PATH,
      secureCert = CERT_PATH,
    )
    nodeKey2 = generateSecp256k1Key()
    node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), bindPort = Port(0))
    shard = DefaultRelayShard
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    payload = "hello world".toBytes()
    message = WakuMessage(payload: payload, contentTopic: contentTopic)

  await node1.start()
  await node1.mountRelay(@[shard])

  await node2.start()
  await node2.mountRelay(@[shard])

  await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

  # Completed by the handler below when a message is delivered on node1.
  let completionFut = newFuture[bool]()
  proc relayHandler(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    check:
      topic == $shard
      msg.contentTopic == contentTopic
      msg.payload == payload
    completionFut.complete(true)

  node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
  await sleepAsync(500.millis)

  let res = await node2.publish(some($shard), message)
  # doAssert, unlike assert, is kept even when assertions are disabled.
  doAssert res.isOk(), $res.error

  await sleepAsync(500.millis)

  check:
    (await completionFut.withTimeout(5.seconds)) == true

  await allFutures(node1.stop(), node2.stop())
|
|
|
|
|
|
|
|
|
|
asyncTest "Messages are relayed between nodes with multiple transports (websocket and secure Websockets)":
  ## node1 is created with `wssEnabled = true` using the test key and
  ## certificate, node2 with `wsEnabled = true`. node2 publishes and the
  ## handler subscribed on node1 must receive the message.
  let
    nodeKey1 = generateSecp256k1Key()
    node1 = newTestWakuNode(
      nodeKey1,
      parseIpAddress("0.0.0.0"),
      bindPort = Port(0),
      wsBindPort = Port(0),
      wssEnabled = true,
      secureKey = KEY_PATH,
      secureCert = CERT_PATH,
    )
    nodeKey2 = generateSecp256k1Key()
    node2 = newTestWakuNode(
      nodeKey2,
      parseIpAddress("0.0.0.0"),
      bindPort = Port(0),
      wsBindPort = Port(0),
      wsEnabled = true,
    )
    shard = DefaultRelayShard
    contentTopic = ContentTopic("/waku/2/default-content/proto")
    payload = "hello world".toBytes()
    message = WakuMessage(payload: payload, contentTopic: contentTopic)

  await node1.start()
  await node1.mountRelay(@[shard])

  await node2.start()
  await node2.mountRelay(@[shard])

  await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

  # Completed by the handler below when a message is delivered on node1.
  let completionFut = newFuture[bool]()
  proc relayHandler(
      topic: PubsubTopic, msg: WakuMessage
  ): Future[void] {.async, gcsafe.} =
    check:
      topic == $shard
      msg.contentTopic == contentTopic
      msg.payload == payload
    completionFut.complete(true)

  node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler))
  await sleepAsync(500.millis)

  let res = await node2.publish(some($shard), message)
  # doAssert, unlike assert, is kept even when assertions are disabled.
  doAssert res.isOk(), $res.error

  await sleepAsync(500.millis)

  check:
    (await completionFut.withTimeout(5.seconds)) == true
  await node1.stop()
  await node2.stop()
|
2023-06-06 17:28:47 +00:00
|
|
|
|
|
|
|
|
|
asyncTest "Bad peers with low reputation are disconnected":
  ## Five fully connected relay nodes. nodes[0] publishes random bytes on the
  ## shared topic; the test expects every other node to lower its score and
  ## end up with no connection to it.
  const
    nodeCount = 5
    invalidMsgCount = 50

  # Create 5 nodes
  let nodes = toSeq(0 ..< nodeCount).mapIt(
    newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
  )
  await allFutures(nodes.mapIt(it.start()))
  await allFutures(nodes.mapIt(it.mountRelay()))

  # subscribe all nodes to a topic
  let topic = "topic"
  for relayNode in nodes:
    discard relayNode.wakuRelay.subscribe(topic, nil)
  await sleepAsync(500.millis)

  # connect nodes in full mesh: every node dials every other node
  for dialer in 0 ..< nodeCount:
    for target in 0 ..< nodeCount:
      if dialer != target:
        let connOk = await nodes[dialer].peerManager.connectPeer(
          nodes[target].switch.peerInfo.toRemotePeerInfo()
        )
        require connOk

  # connection triggers different actions, wait for them
  await sleepAsync(1.seconds)

  # all peers are connected in a mesh, 4 conns each
  for relayNode in nodes:
    check relayNode.peerManager.switch.connManager.getConnections().len ==
      nodeCount - 1

  # node[0] publishes wrong messages (random bytes not decoding into WakuMessage)
  for _ in 0 ..< invalidMsgCount:
    discard await nodes[0].wakuRelay.publish(topic, urandom(1000))

  # long wait, must be higher than the configured decayInterval (how often score is updated)
  await sleepAsync(20.seconds)

  # all nodes lower the score of nodes[0] (will change if gossipsub params or amount of msg changes)
  let badPeerId = nodes[0].switch.peerInfo.peerId
  for honest in nodes[1 .. ^1]:
    check honest.wakuRelay.peerStats[badPeerId].score == -249999.9

  # nodes[0] was blacklisted from all other peers, no connections
  check nodes[0].peerManager.switch.connManager.getConnections().len == 0

  # the rest of the nodes now have 1 conn less (kicked nodes[0] out)
  for honest in nodes[1 .. ^1]:
    check honest.peerManager.switch.connManager.getConnections().len ==
      nodeCount - 2

  # Stop all nodes
  await allFutures(nodes.mapIt(it.stop()))
|
2023-09-26 11:33:52 +00:00
|
|
|
|
|
|
|
|
|
asyncTest "Unsubscribe keep the subscription if other content topics also use the shard":
  ## Three content topics that resolve to the same shard are subscribed and
  ## then unsubscribed one at a time; the relay must remain subscribed to
  ## the shard until the last content topic is removed.

  ## Setup
  let
    nodeKey = generateSecp256k1Key()
    node = newTestWakuNode(nodeKey, parseIpAddress("0.0.0.0"), Port(0))

  await node.start()
  await node.mountRelay()
  require node.mountSharding(1, 1).isOk

  ## Given
  let
    shard = "/waku/2/rs/1/0"
    contentTopicA = DefaultContentTopic
    contentTopicB = ContentTopic("/waku/2/default-content1/proto")
    contentTopicC = ContentTopic("/waku/2/default-content2/proto")
    # No-op handler: nothing is published in this test.
    # NOTE(review): the body never assigns a result, so the returned Future
    # is presumably nil - harmless only while the handler is never invoked;
    # confirm.
    handler: WakuRelayHandler = proc(
        pubsubTopic: PubsubTopic, message: WakuMessage
    ): Future[void] {.gcsafe, raises: [Defect].} =
      discard pubsubTopic
      discard message

  # Preconditions. doAssert, unlike assert, is kept even when assertions are
  # disabled, so the test cannot silently run against mismatched shards.
  doAssert shard == node.wakuSharding.getShard(contentTopicA).expect("Valid Topic"),
    "topic must use the same shard"
  doAssert shard == node.wakuSharding.getShard(contentTopicB).expect("Valid Topic"),
    "topic must use the same shard"
  doAssert shard == node.wakuSharding.getShard(contentTopicC).expect("Valid Topic"),
    "topic must use the same shard"

  ## When
  node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler))
  node.subscribe((kind: ContentSub, topic: contentTopicB), some(handler))
  node.subscribe((kind: ContentSub, topic: contentTopicC), some(handler))

  ## Then
  node.unsubscribe((kind: ContentUnsub, topic: contentTopicB))
  check node.wakuRelay.isSubscribed(shard)

  node.unsubscribe((kind: ContentUnsub, topic: contentTopicA))
  check node.wakuRelay.isSubscribed(shard)

  # Last content topic on the shard removed: the shard subscription goes too.
  node.unsubscribe((kind: ContentUnsub, topic: contentTopicC))
  check not node.wakuRelay.isSubscribed(shard)

  ## Cleanup
  await node.stop()
|