
665 lines
22 KiB
Raw Normal View History

std/[sequtils, tempfiles],
2022-11-04 10:52:08 +01:00
stew/shims/net as stewNet,
2022-11-04 10:52:08 +01:00
from std/times import epochTime
procSuite "WakuNode - RLN relay":
# NOTE: we set the rlnRelayUserMessageLimit to 1 to make the tests easier to reason about
asyncTest "testing rln-relay with valid proof":
# publisher node
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(nodeKey1, parseIpAddress(""), Port(0))
# Relay node
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress(""), Port(0))
# Subscriber
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, parseIpAddress(""), Port(0))
contentTopic = ContentTopic("/waku/2/default-content/proto")
# set up three nodes
# node1
await node1.mountRelay(@[DefaultPubsubTopic])
# mount rlnrelay in off-chain mode
when defined(rln_v2):
let wakuRlnConfig1 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
let wakuRlnConfig1 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
await node1.mountRlnRelay(wakuRlnConfig1)
await node1.start()
# node 2
await node2.mountRelay(@[DefaultPubsubTopic])
# mount rlnrelay in off-chain mode
when defined(rln_v2):
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_2"),
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_2"),
await node2.mountRlnRelay(wakuRlnConfig2)
await node2.start()
# node 3
await node3.mountRelay(@[DefaultPubsubTopic])
when defined(rln_v2):
let wakuRlnConfig3 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(3.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_3"),
let wakuRlnConfig3 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(3.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_3"),
await node3.mountRlnRelay(wakuRlnConfig3)
await node3.start()
# connect them together
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
var completionFut = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
debug "The received topic:", topic
if topic == DefaultPubsubTopic:
# mount the relay handler
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
# prepare the message payload
let payload = "Hello".toBytes()
# prepare the epoch
var message = WakuMessage(payload: @payload, contentTopic: contentTopic)
doAssert(node1.wakuRlnRelay.unsafeAppendRLNProof(message, epochTime()).isOk())
## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn
## verifies the rate limit proof of the message and relays the message to node3
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
discard await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis)
(await completionFut.withTimeout(10.seconds)) == true
await node1.stop()
await node2.stop()
await node3.stop()
asyncTest "testing rln-relay is applied in all rln pubsub/content topics":
# create 3 nodes
let nodes = toSeq(0 ..< 3).mapIt(
newTestWakuNode(generateSecp256k1Key(), parseIpAddress(""), Port(0))
await allFutures(nodes.mapIt(it.start()))
let pubsubTopics =
let contentTopics =
# set up three nodes
await allFutures(nodes.mapIt(it.mountRelay(pubsubTopics)))
# mount rlnrelay in off-chain mode
for index, node in nodes:
when defined(rln_v2):
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(index.uint + 1),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_" & $(index + 1)),
let wakuRlnConfig = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(index.uint + 1),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_" & $(index + 1)),
await node.mountRlnRelay(wakuRlnConfig)
# start them
await allFutures(nodes.mapIt(it.start()))
# connect them together
await nodes[0].connectToNodes(@[nodes[1].switch.peerInfo.toRemotePeerInfo()])
await nodes[2].connectToNodes(@[nodes[1].switch.peerInfo.toRemotePeerInfo()])
var rxMessagesTopic1 = 0
var rxMessagesTopic2 = 0
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
info "relayHandler. The received topic:", topic
if topic == pubsubTopics[0]:
rxMessagesTopic1 = rxMessagesTopic1 + 1
elif topic == pubsubTopics[1]:
rxMessagesTopic2 = rxMessagesTopic2 + 1
# mount the relay handlers
nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[0]), some(relayHandler))
nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[1]), some(relayHandler))
await sleepAsync(1000.millis)
2023-08-31 12:24:41 +02:00
# generate some messages with rln proofs first. generating
# the proof takes some time, so this is done before publishing
# to avoid blocking the test
var messages1: seq[WakuMessage] = @[]
var messages2: seq[WakuMessage] = @[]
let epochTime = epochTime()
for i in 0 ..< 3:
var message = WakuMessage(
payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[0]
nodes[0].wakuRlnRelay.unsafeAppendRLNProof(message, epochTime).isOkOr:
raiseAssert $error
2023-08-31 12:24:41 +02:00
for i in 0 ..< 3:
var message = WakuMessage(
payload: ("Payload_" & $i).toBytes(), contentTopic: contentTopics[1]
nodes[1].wakuRlnRelay.unsafeAppendRLNProof(message, epochTime).isOkOr:
raiseAssert $error
2023-08-31 12:24:41 +02:00
# publish 3 messages from node[0] (last 2 are spam, window is 10 secs)
# publish 3 messages from node[1] (last 2 are spam, window is 10 secs)
for msg in messages1:
discard await nodes[0].publish(some(pubsubTopics[0]), msg)
for msg in messages2:
discard await nodes[1].publish(some(pubsubTopics[1]), msg)
# wait for gossip to propagate
2023-08-22 10:10:54 +02:00
await sleepAsync(5000.millis)
# check that node[2] got messages from both topics
2023-08-22 10:10:54 +02:00
# and that rln was applied (just 1 msg is rx, rest are spam)
rxMessagesTopic1 == 1
rxMessagesTopic2 == 1
await allFutures(nodes.mapIt(it.stop()))
asyncTest "testing rln-relay with invalid proof":
# publisher node
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(nodeKey1, parseIpAddress(""), Port(0))
# Relay node
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress(""), Port(0))
# Subscriber
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, parseIpAddress(""), Port(0))
contentTopic = ContentTopic("/waku/2/default-content/proto")
# set up three nodes
# node1
await node1.mountRelay(@[DefaultPubsubTopic])
# mount rlnrelay in off-chain mode
when defined(rln_v2):
let wakuRlnConfig1 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_4"),
let wakuRlnConfig1 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_4"),
await node1.mountRlnRelay(wakuRlnConfig1)
await node1.start()
# node 2
await node2.mountRelay(@[DefaultPubsubTopic])
# mount rlnrelay in off-chain mode
when defined(rln_v2):
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_5"),
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_5"),
await node2.mountRlnRelay(wakuRlnConfig2)
await node2.start()
# node 3
await node3.mountRelay(@[DefaultPubsubTopic])
when defined(rln_v2):
let wakuRlnConfig3 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(3.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_6"),
let wakuRlnConfig3 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(3.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_6"),
await node3.mountRlnRelay(wakuRlnConfig3)
await node3.start()
# connect them together
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
# define a custom relay handler
var completionFut = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
debug "The received topic:", topic
if topic == DefaultPubsubTopic:
# mount the relay handler
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
# prepare the message payload
let payload = "Hello".toBytes()
# prepare the epoch
let epoch = node1.wakuRlnRelay.getCurrentEpoch()
# prepare the proof
contentTopicBytes = contentTopic.toBytes
input = concat(payload, contentTopicBytes)
extraBytes: seq[byte] = @[byte(1), 2, 3]
when defined(rln_v2):
let nonceManager = node1.wakuRlnRelay.nonceManager
let rateLimitProofRes = node1.wakuRlnRelay.groupManager.generateProof(
concat(input, extraBytes), epoch, MessageId(0)
let rateLimitProofRes = node1.wakuRlnRelay.groupManager.generateProof(
concat(input, extraBytes),
# we add extra bytes to invalidate proof verification against original payload
assert rateLimitProofRes.isOk(), $rateLimitProofRes.error
# check the proof is generated correctly outside when block to avoid duplication
let rateLimitProof = rateLimitProofRes.get().encode().buffer
let message =
WakuMessage(payload: @payload, contentTopic: contentTopic, proof: rateLimitProof)
## node1 publishes a message with an invalid rln proof, the message is then relayed to node2 which in turn
## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3
## never gets called
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
discard await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis)
# the relayHandler of node3 never gets called
(await completionFut.withTimeout(10.seconds)) == false
await node1.stop()
await node2.stop()
await node3.stop()
asyncTest "testing rln-relay double-signaling detection":
# publisher node
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(nodeKey1, parseIpAddress(""), Port(0))
# Relay node
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress(""), Port(0))
# Subscriber
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, parseIpAddress(""), Port(0))
contentTopic = ContentTopic("/waku/2/default-content/proto")
# set up three nodes
# node1
await node1.mountRelay(@[DefaultPubsubTopic])
# mount rlnrelay in off-chain mode
when defined(rln_v2):
let wakuRlnConfig1 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_7"),
let wakuRlnConfig1 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_7"),
await node1.mountRlnRelay(wakuRlnConfig1)
await node1.start()
# node 2
await node2.mountRelay(@[DefaultPubsubTopic])
# mount rlnrelay in off-chain mode
when defined(rln_v2):
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_8"),
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_8"),
await node2.mountRlnRelay(wakuRlnConfig2)
await node2.start()
# node 3
await node3.mountRelay(@[DefaultPubsubTopic])
# mount rlnrelay in off-chain mode
when defined(rln_v2):
let wakuRlnConfig3 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(3.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_9"),
let wakuRlnConfig3 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(3.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_9"),
await node3.mountRlnRelay(wakuRlnConfig3)
await node3.start()
# connect the nodes together node1 <-> node2 <-> node3
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
# get the current epoch time
let time = epochTime()
# create some messages with rate limit proofs
wm1 = WakuMessage(payload: "message 1".toBytes(), contentTopic: contentTopic)
# another message in the same epoch as wm1, it will break the messaging rate limit
wm2 = WakuMessage(payload: "message 2".toBytes(), contentTopic: contentTopic)
# wm3 points to the next epoch
wm3 = WakuMessage(payload: "message 3".toBytes(), contentTopic: contentTopic)
wm4 = WakuMessage(payload: "message 4".toBytes(), contentTopic: contentTopic)
node3.wakuRlnRelay.unsafeAppendRLNProof(wm1, time).isOkOr:
raiseAssert $error
node3.wakuRlnRelay.unsafeAppendRLNProof(wm2, time).isOkOr:
raiseAssert $error
wm3, time + float64(node3.wakuRlnRelay.rlnEpochSizeSec)
raiseAssert $error
# relay handler for node3
var completionFut1 = newFuture[bool]()
var completionFut2 = newFuture[bool]()
var completionFut3 = newFuture[bool]()
var completionFut4 = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
debug "The received topic:", topic
if topic == DefaultPubsubTopic:
if msg == wm1:
if msg == wm2:
if msg == wm3:
if msg == wm4:
# mount the relay handler for node3
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
## node1 publishes and relays 4 messages to node2
## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc
## node2 relays either of wm1 or wm2 to node3, depending on which message arrives at node2 first
## node2 should detect either of wm1 or wm2 as spam and not relay it
## node2 should relay wm3 to node3
## node2 should not relay wm4 because it has no valid rln proof
discard await node1.publish(some(DefaultPubsubTopic), wm1)
discard await node1.publish(some(DefaultPubsubTopic), wm2)
discard await node1.publish(some(DefaultPubsubTopic), wm3)
discard await node1.publish(some(DefaultPubsubTopic), wm4)
await sleepAsync(2000.millis)
res1 = await completionFut1.withTimeout(10.seconds)
res2 = await completionFut2.withTimeout(10.seconds)
(res1 and res2) == false
# either of the wm1 and wm2 is found as spam hence not relayed
(await completionFut3.withTimeout(10.seconds)) == true
(await completionFut4.withTimeout(10.seconds)) == false
await node1.stop()
await node2.stop()
await node3.stop()
asyncTest "clearNullifierLog: should clear epochs > MaxEpochGap":
# publisher node
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(nodeKey1, parseIpAddress(""), Port(0))
# Relay node
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, parseIpAddress(""), Port(0))
# Subscriber
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, parseIpAddress(""), Port(0))
contentTopic = ContentTopic("/waku/2/default-content/proto")
# set up 2 nodes
# node1
await node1.mountRelay(@[DefaultPubsubTopic])
# mount rlnrelay in off-chain mode
when defined(rln_v2):
let wakuRlnConfig1 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_10"),
let wakuRlnConfig1 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(1.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_10"),
await node1.mountRlnRelay(wakuRlnConfig1)
await node1.start()
# node 2
await node2.mountRelay(@[DefaultPubsubTopic])
# mount rlnrelay in off-chain mode
when defined(rln_v2):
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnRelayUserMessageLimit: 1,
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_11"),
let wakuRlnConfig2 = WakuRlnConfig(
rlnRelayDynamic: false,
rlnRelayCredIndex: some(2.uint),
rlnEpochSizeSec: 1,
rlnRelayTreePath: genTempPath("rln_tree", "wakunode_11"),
await node2.mountRlnRelay(wakuRlnConfig2)
await node2.start()
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
# get the current epoch time
let time = epochTime()
# create some messages with rate limit proofs
wm1 = WakuMessage(payload: "message 1".toBytes(), contentTopic: contentTopic)
# another message in the same epoch as wm1, it will break the messaging rate limit
wm2 = WakuMessage(payload: "message 2".toBytes(), contentTopic: contentTopic)
# wm3 points to the next epoch
wm3 = WakuMessage(payload: "message 3".toBytes(), contentTopic: contentTopic)
node1.wakuRlnRelay.unsafeAppendRLNProof(wm1, time).isOkOr:
raiseAssert $error
node1.wakuRlnRelay.unsafeAppendRLNProof(wm2, time).isOkOr:
raiseAssert $error
wm3, time + float64(node1.wakuRlnRelay.rlnEpochSizeSec * 2)
raiseAssert $error
# relay handler for node2
var completionFut1 = newFuture[bool]()
var completionFut2 = newFuture[bool]()
var completionFut3 = newFuture[bool]()
proc relayHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
debug "The received topic:", topic
if topic == DefaultPubsubTopic:
if msg == wm1:
if msg == wm2:
if msg == wm3:
# mount the relay handler for node2
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)
discard await node1.publish(some(DefaultPubsubTopic), wm1)
discard await node1.publish(some(DefaultPubsubTopic), wm2)
discard await node1.publish(some(DefaultPubsubTopic), wm3)
res1 = await completionFut1.withTimeout(10.seconds)
res2 = await completionFut2.withTimeout(10.seconds)
res3 = await completionFut3.withTimeout(10.seconds)
res1 == true
res2 == false
res3 == true
node2.wakuRlnRelay.nullifierLog.len() == 2
await node1.stop()
await node2.stop()