fix: relay unsubscribe (#3422)

* waku_relay protocol fix unsubscribe and remove topic validator
* simplify subscription and avoid unneeded code
* tests adaptations
* call wakuRelay.subscribe only in one place within waku_node
This commit is contained in:
Ivan FB 2025-06-02 22:02:49 +02:00 committed by GitHub
parent 1632496a25
commit 9fc631e103
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 366 additions and 237 deletions

View File

@ -380,7 +380,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if conf.relay:
let shards =
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
(await node.mountRelay(shards)).isOkOr:
(await node.mountRelay()).isOkOr:
echo "failed to mount relay: " & error
return
@ -535,7 +535,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
chat.printReceivedMessage(msg)
node.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic), some(WakuRelayHandler(handler))
(kind: PubsubSub, topic: DefaultPubsubTopic), WakuRelayHandler(handler)
).isOkOr:
error "failed to subscribe to pubsub topic",
topic = DefaultPubsubTopic, error = error

View File

@ -232,7 +232,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
except:
error "exception in relayHandler: " & getCurrentExceptionMsg()
cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr:
cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), relayHandler).isOkOr:
error "failed to subscribe to relay", topic = DefaultPubsubTopic, error = error
return

View File

@ -554,7 +554,7 @@ proc subscribeAndHandleMessages(
else:
msgPerContentTopic[msg.contentTopic] = 1
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler))).isOkOr:
node.subscribe((kind: PubsubSub, topic: pubsubTopic), WakuRelayHandler(handler)).isOkOr:
error "failed to subscribe to pubsub topic", pubsubTopic, error
quit(1)

View File

@ -119,7 +119,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
contentTopic = msg.contentTopic,
timestamp = msg.timestamp
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler))).isOkOr:
node.subscribe((kind: PubsubSub, topic: pubsubTopic), WakuRelayHandler(handler)).isOkOr:
error "failed to subscribe to pubsub topic", pubsubTopic, error
quit(1)

View File

@ -111,7 +111,7 @@ proc process*(
of SUBSCRIBE:
waku.node.subscribe(
(kind: SubscriptionKind.PubsubSub, topic: $self.pubsubTopic),
handler = some(self.relayEventCallback),
handler = self.relayEventCallback,
).isOkOr:
error "SUBSCRIBE failed", error
return err($error)

View File

@ -189,9 +189,9 @@ suite "Waku Legacy Lightpush message delivery":
await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())
(await destNode.mountRelay(@[DefaultRelayShard])).isOkOr:
(await destNode.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await bridgeNode.mountRelay(@[DefaultRelayShard])).isOkOr:
(await bridgeNode.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await bridgeNode.mountLegacyLightPush()
lightNode.mountLegacyLightPushClient()
@ -214,7 +214,7 @@ suite "Waku Legacy Lightpush message delivery":
msg == message
completionFutRelay.complete(true)
destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), some(relayHandler)).isOkOr:
destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), relayHandler).isOkOr:
assert false, "Failed to subscribe to topic:" & $error
# Wait for subscription to take effect

View File

@ -183,9 +183,9 @@ suite "Waku Lightpush message delivery":
await allFutures(destNode.start(), bridgeNode.start(), lightNode.start())
(await destNode.mountRelay(@[DefaultRelayShard])).isOkOr:
(await destNode.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await bridgeNode.mountRelay(@[DefaultRelayShard])).isOkOr:
(await bridgeNode.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await bridgeNode.mountLightPush()
lightNode.mountLightPushClient()
@ -209,7 +209,7 @@ suite "Waku Lightpush message delivery":
msg == message
completionFutRelay.complete(true)
destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), some(relayHandler)).isOkOr:
destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), relayHandler).isOkOr:
assert false, "Failed to subscribe to relay"
# Wait for subscription to take effect

View File

@ -22,9 +22,9 @@ procSuite "Relay (GossipSub) Peer Exchange":
newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true)
# When both client and server mount relay without a handler
(await node1.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
(await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler))).isOkOr:
(await node2.mountRelay(none(RoutingRecordsHandler))).isOkOr:
assert false, "Failed to mount relay"
# Then the relays are mounted without a handler
@ -74,11 +74,11 @@ procSuite "Relay (GossipSub) Peer Exchange":
peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler
# Givem the nodes mount relay with a peer exchange handler
(await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))).isOkOr:
(await node1.mountRelay(some(emptyPeerExchangeHandle))).isOkOr:
assert false, "Failed to mount relay"
(await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))).isOkOr:
(await node2.mountRelay(some(emptyPeerExchangeHandle))).isOkOr:
assert false, "Failed to mount relay"
(await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle))).isOkOr:
(await node3.mountRelay(some(peerExchangeHandle))).isOkOr:
assert false, "Failed to mount relay"
# Ensure that node1 prunes all peers after the first connection
@ -86,6 +86,19 @@ procSuite "Relay (GossipSub) Peer Exchange":
await allFutures([node1.start(), node2.start(), node3.start()])
# The three nodes should be subscribed to the same shard
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
node1.subscribe((kind: PubsubSub, topic: $DefaultRelayShard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
node2.subscribe((kind: PubsubSub, topic: $DefaultRelayShard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
node3.subscribe((kind: PubsubSub, topic: $DefaultRelayShard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
# When nodes are connected
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()])

View File

@ -34,14 +34,14 @@ suite "WakuNode":
# Setup node 1 with stable codec "/vac/waku/relay/2.0.0"
await node1.start()
(await node1.mountRelay(@[shard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
node1.wakuRelay.codec = "/vac/waku/relay/2.0.0"
# Setup node 2 with beta codec "/vac/waku/relay/2.0.0-beta2"
await node2.start()
(await node2.mountRelay(@[shard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
node2.wakuRelay.codec = "/vac/waku/relay/2.0.0-beta2"
@ -69,7 +69,7 @@ suite "WakuNode":
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
node2.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr:
assert false, "Failed to subscribe to topic"
await sleepAsync(2000.millis)

View File

@ -77,7 +77,8 @@ suite "Waku Relay":
asyncTest "Publish with Subscription (Network Size: 1)":
# When subscribing to a Pubsub Topic
discard node.subscribe(pubsubTopic, simpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
# Then the node is subscribed
check:
@ -111,7 +112,7 @@ suite "Waku Relay":
otherHandlerFuture.complete((topic, message))
# When subscribing the second node to the Pubsub Topic
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
# Then the second node is subscribed, but not the first one
check:
@ -172,8 +173,8 @@ suite "Waku Relay":
otherHandlerFuture.complete((topic, message))
# When subscribing both nodes to the same Pubsub Topic
discard node.subscribe(pubsubTopic, simpleFutureHandler)
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
# Then both nodes are subscribed
check:
@ -228,7 +229,7 @@ suite "Waku Relay":
asyncTest "Refreshing subscription":
# Given a subscribed node
discard node.subscribe(pubsubTopic, simpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
check:
node.isSubscribed(pubsubTopic)
node.subscribedTopics == pubsubTopicSeq
@ -244,7 +245,7 @@ suite "Waku Relay":
) {.async, gcsafe.} =
otherHandlerFuture.complete((topic, message))
discard node.subscribe(pubsubTopic, otherSimpleFutureHandler)
node.subscribe(pubsubTopic, otherSimpleFutureHandler)
check:
node.isSubscribed(pubsubTopic)
node.subscribedTopics == pubsubTopicSeq
@ -291,14 +292,14 @@ suite "Waku Relay":
otherHandlerFuture.complete((topic, message))
otherNode.addValidator(len4Validator)
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
await sleepAsync(500.millis)
check:
otherNode.isSubscribed(pubsubTopic)
# Given a subscribed node with a validator
node.addValidator(len4Validator)
discard node.subscribe(pubsubTopic, simpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
await sleepAsync(500.millis)
check:
node.isSubscribed(pubsubTopic)
@ -380,8 +381,8 @@ suite "Waku Relay":
) {.async, gcsafe.} =
otherHandlerFuture.complete((topic, message))
discard node.subscribe(pubsubTopic, simpleFutureHandler)
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
check:
node.isSubscribed(pubsubTopic)
node.subscribedTopics == pubsubTopicSeq
@ -464,8 +465,8 @@ suite "Waku Relay":
) {.async, gcsafe.} =
handlerFuture2.complete((topic, message))
discard node.subscribe(pubsubTopic, simpleFutureHandler)
discard node.subscribe(pubsubTopicB, simpleFutureHandler2)
node.subscribe(pubsubTopic, simpleFutureHandler)
node.subscribe(pubsubTopicB, simpleFutureHandler2)
# Given the other nodes are subscribed to two pubsub topics
var otherHandlerFuture1 = newPushHandlerFuture()
@ -492,10 +493,10 @@ suite "Waku Relay":
) {.async, gcsafe.} =
anotherHandlerFuture2.complete((topic, message))
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler1)
discard otherNode.subscribe(pubsubTopicC, otherSimpleFutureHandler2)
discard anotherNode.subscribe(pubsubTopicB, anotherSimpleFutureHandler1)
discard anotherNode.subscribe(pubsubTopicC, anotherSimpleFutureHandler2)
otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler1)
otherNode.subscribe(pubsubTopicC, otherSimpleFutureHandler2)
anotherNode.subscribe(pubsubTopicB, anotherSimpleFutureHandler1)
anotherNode.subscribe(pubsubTopicC, anotherSimpleFutureHandler2)
await sleepAsync(500.millis)
# When publishing a message in node for each of the pubsub topics
@ -735,15 +736,13 @@ suite "Waku Relay":
otherSwitch = newTestSwitch()
otherNode = await newTestWakuRelay(otherSwitch)
await allFutures(otherSwitch.start(), otherNode.start())
let otherTopicHandler: TopicHandler =
otherNode.subscribe(pubsubTopic, simpleFutureHandler)
otherNode.subscribe(pubsubTopic, simpleFutureHandler)
# Given a node without a subscription
check:
node.subscribedTopics == []
# When unsubscribing from a pubsub topic from an unsubscribed topic handler
node.unsubscribe(pubsubTopic, otherTopicHandler)
node.unsubscribe(pubsubTopic)
# Then the node is still not subscribed
check:
@ -754,11 +753,11 @@ suite "Waku Relay":
asyncTest "Single Node with Single Pubsub Topic":
# Given a node subscribed to a pubsub topic
let topicHandler = node.subscribe(pubsubTopic, simpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
check node.subscribedTopics == pubsubTopicSeq
# When unsubscribing from the pubsub topic
node.unsubscribe(pubsubTopic, topicHandler)
node.unsubscribe(pubsubTopic)
# Then the node is not subscribed anymore
check node.subscribedTopics == []
@ -768,9 +767,8 @@ suite "Waku Relay":
let pubsubTopicB = "/waku/2/rs/0/1"
# Given a node subscribed to multiple pubsub topics
let
topicHandler = node.subscribe(pubsubTopic, simpleFutureHandler)
topicHandlerB = node.subscribe(pubsubTopicB, simpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
node.subscribe(pubsubTopicB, simpleFutureHandler)
assert pubsubTopic in node.subscribedTopics,
fmt"Node is not subscribed to {pubsubTopic}"
@ -778,13 +776,13 @@ suite "Waku Relay":
fmt"Node is not subscribed to {pubsubTopicB}"
# When unsubscribing from one of the pubsub topics
node.unsubscribe(pubsubTopic, topicHandler)
node.unsubscribe(pubsubTopic)
# Then the node is still subscribed to the other pubsub topic
check node.subscribedTopics == @[pubsubTopicB]
# When unsubscribing from the other pubsub topic
node.unsubscribe(pubsubTopicB, topicHandlerB)
node.unsubscribe(pubsubTopicB)
# Then the node is not subscribed anymore
check node.subscribedTopics == []
@ -802,7 +800,7 @@ suite "Waku Relay":
asyncTest "Single Node with Single Pubsub Topic":
# Given a node subscribed to a pubsub topic
discard node.subscribe(pubsubTopic, simpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
check node.subscribedTopics == pubsubTopicSeq
# When unsubscribing from all pubsub topics
@ -816,9 +814,9 @@ suite "Waku Relay":
let pubsubTopicB = "/waku/2/rs/0/1"
# Given a node subscribed to multiple pubsub topics
discard node.subscribe(pubsubTopic, simpleFutureHandler)
discard node.subscribe(pubsubTopic, simpleFutureHandler)
discard node.subscribe(pubsubTopicB, simpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
node.subscribe(pubsubTopicB, simpleFutureHandler)
assert pubsubTopic in node.subscribedTopics,
fmt"Node is not subscribed to {pubsubTopic}"
@ -855,8 +853,8 @@ suite "Waku Relay":
) {.async, gcsafe.} =
otherHandlerFuture.complete((topic, message))
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
discard node.subscribe(pubsubTopic, simpleFutureHandler)
otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
check:
node.subscribedTopics == pubsubTopicSeq
otherNode.subscribedTopics == pubsubTopicSeq
@ -1021,8 +1019,8 @@ suite "Waku Relay":
) {.async, gcsafe.} =
otherHandlerFuture.complete((topic, message))
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
discard node.subscribe(pubsubTopic, simpleFutureHandler)
otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
check:
node.subscribedTopics == pubsubTopicSeq
otherNode.subscribedTopics == pubsubTopicSeq
@ -1163,8 +1161,8 @@ suite "Waku Relay":
otherMessageSeq.add((topic, message))
otherHandlerFuture.complete((topic, message))
discard node.subscribe(pubsubTopic, thisSimpleFutureHandler)
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
node.subscribe(pubsubTopic, thisSimpleFutureHandler)
otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
check:
node.subscribedTopics == pubsubTopicSeq
otherNode.subscribedTopics == pubsubTopicSeq
@ -1237,8 +1235,8 @@ suite "Waku Relay":
) {.async, gcsafe.} =
otherHandlerFuture.complete((topic, message))
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
discard node.subscribe(pubsubTopic, simpleFutureHandler)
otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
check:
node.subscribedTopics == pubsubTopicSeq
otherNode.subscribedTopics == pubsubTopicSeq
@ -1332,8 +1330,8 @@ suite "Waku Relay":
) {.async, gcsafe.} =
otherHandlerFuture.complete((topic, message))
discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
discard node.subscribe(pubsubTopic, simpleFutureHandler)
otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler)
node.subscribe(pubsubTopic, simpleFutureHandler)
check:
node.subscribedTopics == pubsubTopicSeq
otherNode.subscribedTopics == pubsubTopicSeq

View File

@ -70,15 +70,15 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
(await node1.mountRelay(@[shard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
(await node2.mountRelay(@[shard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node3.start()
(await node3.mountRelay(@[shard])).isOkOr:
(await node3.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await allFutures(
@ -97,13 +97,19 @@ suite "WakuNode - Relay":
msg.timestamp > 0
completionFut.complete(true)
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node3.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
## node1 and node2 explicitly subscribe to the same shard as node3
node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
node3.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
@ -147,15 +153,15 @@ suite "WakuNode - Relay":
# start all the nodes
await node1.start()
(await node1.mountRelay(@[shard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
(await node2.mountRelay(@[shard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node3.start()
(await node3.mountRelay(@[shard])).isOkOr:
(await node3.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -193,13 +199,19 @@ suite "WakuNode - Relay":
# relay handler is called
completionFut.complete(true)
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node3.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
## node1 and node2 explicitly subscribe to the same shard as node3
node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
node3.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
@ -287,11 +299,11 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
(await node1.mountRelay(@[shard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
(await node2.mountRelay(@[shard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -313,7 +325,7 @@ suite "WakuNode - Relay":
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
node1.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
@ -345,11 +357,11 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
(await node1.mountRelay(@[shard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
(await node2.mountRelay(@[shard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -371,7 +383,7 @@ suite "WakuNode - Relay":
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
node1.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
@ -403,11 +415,11 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
(await node1.mountRelay(@[shard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
(await node2.mountRelay(@[shard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
#delete websocket peer address
@ -433,7 +445,7 @@ suite "WakuNode - Relay":
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
node1.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
@ -467,11 +479,11 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
(await node1.mountRelay(@[shard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
(await node2.mountRelay(@[shard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -493,7 +505,7 @@ suite "WakuNode - Relay":
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
node1.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
@ -535,11 +547,11 @@ suite "WakuNode - Relay":
message = WakuMessage(payload: payload, contentTopic: contentTopic)
await node1.start()
(await node1.mountRelay(@[shard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node2.start()
(await node2.mountRelay(@[shard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
@ -561,7 +573,7 @@ suite "WakuNode - Relay":
assert false, "Failed to unsubscribe from topic: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr:
node1.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
@ -583,10 +595,15 @@ suite "WakuNode - Relay":
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountRelay()))
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
# subscribe all nodes to a topic
let topic = "topic"
for node in nodes:
discard node.wakuRelay.subscribe(topic, nil)
node.wakuRelay.subscribe(topic, simpleHandler)
await sleepAsync(500.millis)
# connect nodes in full mesh
@ -661,19 +678,24 @@ suite "WakuNode - Relay":
"topic must use the same shard"
## When
node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler)).isOkOr:
node.subscribe((kind: ContentSub, topic: contentTopicA), handler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
node.subscribe((kind: ContentSub, topic: contentTopicB), some(handler)).isOkOr:
node.subscribe((kind: ContentSub, topic: contentTopicB), handler).isOkOr:
assert false,
"The subscription call shouldn't error even though it's already subscribed to that shard"
node.subscribe((kind: ContentSub, topic: contentTopicC), some(handler)).isOkOr:
node.subscribe((kind: ContentSub, topic: contentTopicC), handler).isOkOr:
assert false,
"The subscription call shouldn't error even though it's already subscribed to that shard"
## The node should be subscribed to the shard
check node.wakuRelay.isSubscribed(shard)
## Then
node.unsubscribe((kind: ContentUnsub, topic: contentTopicB)).isOkOr:
assert false, "Failed to unsubscribe to topic: " & $error
check node.wakuRelay.isSubscribed(shard)
## After unsubcription, the node should not be subscribed to the shard anymore
check not node.wakuRelay.isSubscribed(shard)
## Cleanup
await node.stop()

View File

@ -60,7 +60,7 @@ proc subscribeToContentTopicWithHandler*(
if topic == topic:
completionFut.complete(true)
(node.subscribe((kind: ContentSub, topic: contentTopic), some(relayHandler))).isOkOr:
(node.subscribe((kind: ContentSub, topic: contentTopic), relayHandler)).isOkOr:
error "Failed to subscribe to content topic", error
completionFut.complete(true)
return completionFut
@ -73,7 +73,7 @@ proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bo
if topic == pubsubTopic:
completionFut.complete(true)
(node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))).isOkOr:
(node.subscribe((kind: PubsubSub, topic: pubsubTopic), relayHandler)).isOkOr:
error "Failed to subscribe to pubsub topic", error
completionFut.complete(false)
return completionFut

View File

@ -57,7 +57,7 @@ procSuite "WakuNode - RLN relay":
# set up three nodes
# node1
(await node1.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
@ -74,7 +74,7 @@ procSuite "WakuNode - RLN relay":
await node1.start()
# node 2
(await node2.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
let wakuRlnConfig2 = WakuRlnConfig(
@ -90,7 +90,7 @@ procSuite "WakuNode - RLN relay":
await node2.start()
# node 3
(await node3.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node3.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig3 = WakuRlnConfig(
@ -117,13 +117,18 @@ procSuite "WakuNode - RLN relay":
if topic == DefaultPubsubTopic:
completionFut.complete(true)
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to unsubscribe from topic: " & $error
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
node1.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic in node1: " & $error
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic in node2: " & $error
## Subscribe to the relay topic to add the custom relay handler defined above
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr:
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), relayHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
await sleepAsync(2000.millis)
@ -146,8 +151,7 @@ procSuite "WakuNode - RLN relay":
discard await node1.publish(some(DefaultPubsubTopic), message)
await sleepAsync(2000.millis)
check:
(await completionFut.withTimeout(10.seconds)) == true
assert (await completionFut.withTimeout(10.seconds)), "completionFut timed out"
await node1.stop()
await node2.stop()
@ -169,7 +173,7 @@ procSuite "WakuNode - RLN relay":
]
# set up three nodes
await allFutures(nodes.mapIt(it.mountRelay(shards)))
await allFutures(nodes.mapIt(it.mountRelay()))
# mount rlnrelay in off-chain mode
for index, node in nodes:
@ -201,17 +205,20 @@ procSuite "WakuNode - RLN relay":
elif topic == $shards[1]:
rxMessagesTopic2 = rxMessagesTopic2 + 1
## This unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
nodes[2].unsubscribe((kind: PubsubUnsub, topic: $shards[0])).isOkOr:
assert false, "Failed to unsubscribe to pubsub topic: " & $error
nodes[2].unsubscribe((kind: PubsubUnsub, topic: $shards[1])).isOkOr:
assert false, "Failed to unsubscribe to pubsub topic: " & $error
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
nodes[0].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic in nodes[0]: " & $error
nodes[1].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic in nodes[1]: " & $error
# mount the relay handlers
nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), some(relayHandler)).isOkOr:
nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), relayHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), some(relayHandler)).isOkOr:
nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), relayHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
await sleepAsync(1000.millis)
@ -279,7 +286,7 @@ procSuite "WakuNode - RLN relay":
# set up three nodes
# node1
(await node1.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
@ -296,7 +303,7 @@ procSuite "WakuNode - RLN relay":
await node1.start()
# node 2
(await node2.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
let wakuRlnConfig2 = WakuRlnConfig(
@ -312,7 +319,7 @@ procSuite "WakuNode - RLN relay":
await node2.start()
# node 3
(await node3.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node3.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig3 = WakuRlnConfig(
@ -339,13 +346,18 @@ procSuite "WakuNode - RLN relay":
if topic == DefaultPubsubTopic:
completionFut.complete(true)
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to unsubscribe to pubsub topic: " & $error
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
node1.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic in node1: " & $error
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic in node2: " & $error
# mount the relay handler
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr:
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), relayHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
await sleepAsync(2000.millis)
@ -408,7 +420,7 @@ procSuite "WakuNode - RLN relay":
# set up three nodes
# node1
(await node1.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
@ -425,7 +437,7 @@ procSuite "WakuNode - RLN relay":
await node1.start()
# node 2
(await node2.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
@ -441,7 +453,7 @@ procSuite "WakuNode - RLN relay":
await node2.start()
# node 3
(await node3.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node3.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
# mount rlnrelay in off-chain mode
@ -513,13 +525,18 @@ procSuite "WakuNode - RLN relay":
if msg.payload == wm4.payload:
completionFut4.complete(true)
## The following unsubscription is necessary to remove the default relay handler, which is
## added when mountRelay is called.
node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr:
assert false, "Failed to unsubscribe to pubsub topic: " & $error
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
node1.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic in node1: " & $error
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic in node2: " & $error
# mount the relay handler for node3
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr:
node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), relayHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
await sleepAsync(2000.millis)
@ -562,14 +579,14 @@ procSuite "WakuNode - RLN relay":
epochSizeSec: uint64 = 5 # This means rlnMaxEpochGap = 4
# Given both nodes mount relay and rlnrelay
(await node1.mountRelay(shardSeq)).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10")
(await node1.mountRlnRelay(wakuRlnConfig1)).isOkOr:
assert false, "Failed to mount rlnrelay"
# Mount rlnrelay in node2 in off-chain mode
(await node2.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11")
await node2.mountRlnRelay(wakuRlnConfig2)
@ -613,7 +630,7 @@ procSuite "WakuNode - RLN relay":
if msg == wm6:
completionFut6.complete(true)
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr:
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), relayHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
# Given all messages have an rln proof and are published by the node 1
@ -704,17 +721,27 @@ procSuite "WakuNode - RLN relay":
# Given both nodes mount relay and rlnrelay
# Mount rlnrelay in node1 in off-chain mode
(await node1.mountRelay(shardSeq)).isOkOr:
(await node1.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10")
await node1.mountRlnRelay(wakuRlnConfig1)
# Mount rlnrelay in node2 in off-chain mode
(await node2.mountRelay(@[DefaultRelayShard])).isOkOr:
(await node2.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11")
await node2.mountRlnRelay(wakuRlnConfig2)
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
node1.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic in node2: " & $error
node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic in node1: " & $error
# Given the two nodes are started and connected
waitFor allFutures(node1.start(), node2.start())
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])

View File

@ -73,7 +73,9 @@ suite "WakuNode2 - Validators":
# Subscribe all nodes to the same topic/handler
for node in nodes:
discard node.wakuRelay.subscribe($spamProtectedShard, handler)
node.subscribe((kind: PubsubSub, topic: $spamProtectedShard), handler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
# Each node publishes 10 signed messages
@ -163,7 +165,9 @@ suite "WakuNode2 - Validators":
# Subscribe all nodes to the same topic/handler
for node in nodes:
discard node.wakuRelay.subscribe($spamProtectedShard, handler)
node.subscribe((kind: PubsubSub, topic: $spamProtectedShard), handler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
# Each node sends 5 messages, signed but with a non-whitelisted key (total = 25)
@ -291,7 +295,8 @@ suite "WakuNode2 - Validators":
# Subscribe all nodes to the same topic/handler
for node in nodes:
discard node.wakuRelay.subscribe($spamProtectedShard, handler)
node.subscribe((kind: PubsubSub, topic: $spamProtectedShard), handler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
await sleepAsync(500.millis)
# Add signed message validator to all nodes. They will only route signed messages

View File

@ -43,14 +43,27 @@ suite "Waku v2 Rest API - Admin":
node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604))
await allFutures(node1.start(), node2.start(), node3.start())
let shards = @[RelayShard(clusterId: 1, shardId: 0)]
await allFutures(
node1.mountRelay(shards = shards),
node2.mountRelay(shards = shards),
node3.mountRelay(shards = shards),
node1.mountRelay(),
node2.mountRelay(),
node3.mountRelay(),
node3.mountPeerExchange(),
)
# The three nodes should be subscribed to the same shard
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
let shard = RelayShard(clusterId: 1, shardId: 0)
node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
node3.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
peerInfo1 = node1.switch.peerInfo
peerInfo2 = node2.switch.peerInfo
peerInfo3 = node3.switch.peerInfo

View File

@ -278,8 +278,16 @@ suite "Waku v2 Rest API - Filter V2":
restFilterTest = await RestFilterTest.init()
subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic)
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
restFilterTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
# When
@ -325,7 +333,14 @@ suite "Waku v2 Rest API - Filter V2":
# setup filter service and client node
let restFilterTest = await RestFilterTest.init()
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
restFilterTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
let requestBody = FilterSubscribeRequest(
@ -397,7 +412,14 @@ suite "Waku v2 Rest API - Filter V2":
# setup filter service and client node
let restFilterTest = await RestFilterTest.init()
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
restFilterTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to topic: " & $error
let requestBody = FilterSubscribeRequest(

View File

@ -128,13 +128,18 @@ suite "Waku v2 Rest API - lightpush":
# Given
let restLightPushTest = await RestLightPushTest.init()
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to relay: " & $error
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to relay: " & $error
require:
@ -162,9 +167,13 @@ suite "Waku v2 Rest API - lightpush":
asyncTest "Push message bad-request":
# Given
let restLightPushTest = await RestLightPushTest.init()
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to relay: " & $error
require:
@ -220,14 +229,18 @@ suite "Waku v2 Rest API - lightpush":
let budgetCap = 3
let tokenPeriod = 500.millis
let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod))
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to relay: " & $error
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to relay: " & $error
require:

View File

@ -122,14 +122,18 @@ suite "Waku v2 Rest API - lightpush":
asyncTest "Push message request":
# Given
let restLightPushTest = await RestLightPushTest.init()
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to topic"
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to topic"
require:
@ -157,9 +161,13 @@ suite "Waku v2 Rest API - lightpush":
asyncTest "Push message bad-request":
# Given
let restLightPushTest = await RestLightPushTest.init()
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to topic"
require:
@ -218,14 +226,18 @@ suite "Waku v2 Rest API - lightpush":
let budgetCap = 3
let tokenPeriod = 500.millis
let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod))
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
restLightPushTest.consumerNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to topic"
restLightPushTest.serviceNode.subscribe(
(kind: PubsubSub, topic: DefaultPubsubTopic)
(kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler
).isOkOr:
assert false, "Failed to subscribe to topic"
require:

View File

@ -95,9 +95,18 @@ suite "Waku v2 Rest API - Relay":
shard3 = RelayShard(clusterId: DefaultClusterId, shardId: 3)
shard4 = RelayShard(clusterId: DefaultClusterId, shardId: 4)
(await node.mountRelay(@[shard0, shard1, shard2, shard3, shard4])).isOkOr:
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
proc simpleHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
for shard in @[$shard0, $shard1, $shard2, $shard3, $shard4]:
node.subscribe((kind: PubsubSub, topic: shard), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
var restPort = Port(0)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
@ -248,8 +257,14 @@ suite "Waku v2 Rest API - Relay":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic"
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
@ -477,7 +492,12 @@ suite "Waku v2 Rest API - Relay":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe((kind: ContentSub, topic: DefaultContentTopic)).isOkOr:
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
node.subscribe((kind: ContentSub, topic: DefaultContentTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to content topic: " & $error
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
@ -583,7 +603,12 @@ suite "Waku v2 Rest API - Relay":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1
@ -640,7 +665,12 @@ suite "Waku v2 Rest API - Relay":
let client = newRestHttpClient(initTAddress(restAddress, restPort))
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr:
let simpleHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await sleepAsync(0.milliseconds)
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr:
assert false, "Failed to subscribe to pubsub topic: " & $error
require:
toSeq(node.wakuRelay.subscribedTopics).len == 1

View File

@ -317,10 +317,7 @@ proc setupProtocols(
(
await mountRelay(
node,
shards,
peerExchangeHandler = peerExchangeHandler,
int(conf.maxMessageSizeBytes),
node, peerExchangeHandler = peerExchangeHandler, int(conf.maxMessageSizeBytes)
)
).isOkOr:
return err("failed to mount waku relay protocol: " & $error)

View File

@ -1,7 +1,7 @@
{.push raises: [].}
import
std/[options, sequtils],
std/[options, sequtils, strformat],
results,
chronicles,
chronos,
@ -130,8 +130,12 @@ proc setupAppCallbacks(
conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it)))
let shards = confShards & autoShards
for shard in shards:
discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler)
let uniqueShards = deduplicate(shards)
for shard in uniqueShards:
let topic = $shard
node.subscribe((kind: PubsubSub, topic: topic), appCallbacks.relayHandler).isOkOr:
return err(fmt"Could not subscribe {topic}: " & $error)
if not appCallbacks.topicHealthChangeHandler.isNil():
if node.wakuRelay.isNil():

View File

@ -116,7 +116,6 @@ type
announcedAddresses*: seq[MultiAddress]
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
contentTopicHandlers: Table[ContentTopic, TopicHandler]
rateLimitSettings*: ProtocolRateLimitSettings
proc new*(
@ -256,7 +255,13 @@ proc mountStoreSync*(
## Waku relay
proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
proc registerRelayHandler(
node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler
) =
## Registers the only handler for the given topic.
## Notice that this handler internally calls other handlers, such as filter,
## archive, etc, plus the handler provided by the application.
if node.wakuRelay.isSubscribed(topic):
return
@ -289,18 +294,19 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
node.wakuStoreReconciliation.messageIngress(topic, msg)
let defaultHandler = proc(
let uniqueTopicHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)
await syncHandler(topic, msg)
await appHandler(topic, msg)
discard node.wakuRelay.subscribe(topic, defaultHandler)
node.wakuRelay.subscribe(topic, uniqueTopicHandler)
proc subscribe*(
node: WakuNode, subscription: SubscriptionEvent, handler = none(WakuRelayHandler)
node: WakuNode, subscription: SubscriptionEvent, handler: WakuRelayHandler
): Result[void, string] =
## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on
## this topic. WakuRelayHandler is a method that takes a topic and a Waku message.
@ -326,18 +332,8 @@ proc subscribe*(
warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic
return ok()
if contentTopicOp.isSome() and node.contentTopicHandlers.hasKey(contentTopicOp.get()):
warn "No-effect API call to `subscribe`. Was already subscribed"
return ok()
node.registerRelayHandler(pubsubTopic, handler)
node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic))
node.registerRelayDefaultHandler(pubsubTopic)
if handler.isSome():
let wrappedHandler = node.wakuRelay.subscribe(pubsubTopic, handler.get())
if contentTopicOp.isSome():
node.contentTopicHandlers[contentTopicOp.get()] = wrappedHandler
return ok()
@ -367,17 +363,9 @@ proc unsubscribe*(
warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic
return ok()
if contentTopicOp.isSome():
# Remove this handler only
var handler: TopicHandler
## TODO: refactor this part. I think we can simplify it
if node.contentTopicHandlers.pop(contentTopicOp.get(), handler):
debug "unsubscribe", contentTopic = contentTopicOp.get()
node.wakuRelay.unsubscribe(pubsubTopic)
else:
debug "unsubscribe", pubsubTopic = pubsubTopic
node.wakuRelay.unsubscribe(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
debug "unsubscribe", pubsubTopic, contentTopicOp
node.wakuRelay.unsubscribe(pubsubTopic)
node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic))
return ok()
@ -439,7 +427,6 @@ proc startRelay*(node: WakuNode) {.async.} =
proc mountRelay*(
node: WakuNode,
shards: seq[RelayShard] = @[],
peerExchangeHandler = none(RoutingRecordsHandler),
maxMessageSize = int(DefaultMaxWakuMessageSize),
): Future[Result[void, string]] {.async.} =
@ -465,16 +452,7 @@ proc mountRelay*(
node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec))
## Make sure we don't have duplicates
let uniqueShards = deduplicate(shards)
# Subscribe to shards
for shard in uniqueShards:
node.subscribe((kind: PubsubSub, topic: $shard)).isOkOr:
error "failed to subscribe to shard", error = error
return err("failed to subscribe to shard in mountRelay: " & error)
info "relay mounted successfully", shards = uniqueShards
info "relay mounted successfully"
return ok()
## Waku filter

View File

@ -148,9 +148,9 @@ proc startRestServerProtocolSupport*(
let pubsubTopic = $RelayShard(clusterId: clusterId, shardId: shard)
cache.pubsubSubscribe(pubsubTopic)
## TODO: remove this line. use observer-observable pattern
## within waku_node::registerRelayDefaultHandler
discard node.wakuRelay.subscribe(pubsubTopic, handler)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), handler).isOkOr:
error "Could not subscribe", pubsubTopic, error
continue
for contentTopic in contentTopics:
cache.contentSubscribe(contentTopic)
@ -160,9 +160,9 @@ proc startRestServerProtocolSupport*(
continue
let pubsubTopic = $shard
## TODO: remove this line. use observer-observable pattern
## within waku_node::registerRelayDefaultHandler
discard node.wakuRelay.subscribe(pubsubTopic, handler)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), handler).isOkOr:
error "Could not subscribe", pubsubTopic, error
continue
installRelayApiHandlers(router, node, cache)
else:

View File

@ -67,9 +67,7 @@ proc installRelayApiHandlers*(
for pubsubTopic in newTopics:
cache.pubsubSubscribe(pubsubTopic)
node.subscribe(
(kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache))
).isOkOr:
node.subscribe((kind: PubsubSub, topic: pubsubTopic), messageCacheHandler(cache)).isOkOr:
let errorMsg = "Subscribe failed:" & $error
error "SUBSCRIBE failed", error = errorMsg
return RestApiResponse.internalServerError(errorMsg)
@ -202,7 +200,7 @@ proc installRelayApiHandlers*(
cache.contentSubscribe(contentTopic)
node.subscribe(
(kind: ContentSub, topic: contentTopic), some(messageCacheHandler(cache))
(kind: ContentSub, topic: contentTopic), messageCacheHandler(cache)
).isOkOr:
let errorMsg = "Subscribe failed:" & $error
error "SUBSCRIBE failed", error = errorMsg

View File

@ -131,6 +131,8 @@ type
# a map of validators to error messages to return when validation fails
topicValidator: Table[PubsubTopic, ValidatorHandler]
# map topic with its assigned validator within pubsub
topicHandlers: Table[PubsubTopic, TopicHandler]
# map topic with the TopicHandler proc in charge of attending topic's incoming message events
publishObservers: seq[PublishObserver]
topicsHealth*: Table[string, TopicHealth]
onTopicHealthChange*: TopicHealthChangeHandler
@ -488,13 +490,11 @@ proc validateMessage*(
return ok()
proc subscribe*(
w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler
): TopicHandler =
proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) =
debug "subscribe", pubsubTopic = pubsubTopic
# We need to wrap the handler since gossipsub doesnt understand WakuMessage
let wrappedHandler = proc(
let topicHandler = proc(
pubsubTopic: string, data: seq[byte]
): Future[void] {.gcsafe, raises: [].} =
let decMsg = WakuMessage.decode(data)
@ -526,9 +526,9 @@ proc subscribe*(
w.topicParams[pubsubTopic] = TopicParameters
# subscribe to the topic with our wrapped handler
procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler)
procCall GossipSub(w).subscribe(pubsubTopic, topicHandler)
return wrappedHandler
w.topicHandlers[pubsubTopic] = topicHandler
proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
## Unsubscribe all handlers on this pubsub topic
@ -537,35 +537,32 @@ proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
procCall GossipSub(w).unsubscribeAll(pubsubTopic)
w.topicValidator.del(pubsubTopic)
w.topicHandlers.del(pubsubTopic)
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
if not w.topicValidator.hasKey(pubsubTopic):
error "unsubscribe no validator for this topic", pubsubTopic
return
if pubsubtopic notin Pubsub(w).topics:
if not w.topicHandlers.hasKey(pubsubTopic):
error "not subscribed to the given topic", pubsubTopic
return
var topicHandlerSeq: seq[TopicHandler]
var topicHandler: TopicHandler
var topicValidator: ValidatorHandler
try:
topicHandlerSeq = Pubsub(w).topics[pubsubTopic]
if topicHandlerSeq.len == 0:
error "unsubscribe no handler for this topic", pubsubTopic
return
topicHandler = w.topicHandlers[pubsubTopic]
topicValidator = w.topicValidator[pubsubTopic]
except KeyError:
error "exception in unsubscribe", pubsubTopic, error = getCurrentExceptionMsg()
return
let topicHandler = topicHandlerSeq[0]
debug "unsubscribe", pubsubTopic
procCall GossipSub(w).unsubscribe($pubsubTopic, topicHandler)
## TODO: uncomment the following line when https://github.com/vacp2p/nim-libp2p/pull/1356
## is available in a nim-libp2p release.
# procCall GossipSub(w).removeValidator(pubsubTopic, topicValidator)
procCall GossipSub(w).unsubscribe(pubsubTopic, topicHandler)
procCall GossipSub(w).removeValidator(pubsubTopic, topicValidator)
w.topicValidator.del(pubsubTopic)
w.topicHandlers.del(pubsubTopic)
proc publish*(
w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage