diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 9c0f47dcd..1ba599d78 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -380,7 +380,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = if conf.relay: let shards = conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) - (await node.mountRelay(shards)).isOkOr: + (await node.mountRelay()).isOkOr: echo "failed to mount relay: " & error return @@ -535,7 +535,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = chat.printReceivedMessage(msg) node.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic), some(WakuRelayHandler(handler)) + (kind: PubsubSub, topic: DefaultPubsubTopic), WakuRelayHandler(handler) ).isOkOr: error "failed to subscribe to pubsub topic", topic = DefaultPubsubTopic, error = error diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 7a7a5d08f..a62d98261 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -232,7 +232,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = except: error "exception in relayHandler: " & getCurrentExceptionMsg() - cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr: + cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), relayHandler).isOkOr: error "failed to subscribe to relay", topic = DefaultPubsubTopic, error = error return diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 7b71a630e..f8cde5281 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -554,7 +554,7 @@ proc subscribeAndHandleMessages( else: msgPerContentTopic[msg.contentTopic] = 1 - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler))).isOkOr: + node.subscribe((kind: PubsubSub, topic: pubsubTopic), WakuRelayHandler(handler)).isOkOr: error "failed to subscribe to pubsub topic", pubsubTopic, error quit(1) diff --git a/examples/subscriber.nim b/examples/subscriber.nim index 7eb900792..fb040b05a 100644 --- a/examples/subscriber.nim +++ b/examples/subscriber.nim @@ -119,7 +119,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = contentTopic = msg.contentTopic, timestamp = msg.timestamp - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler))).isOkOr: + node.subscribe((kind: PubsubSub, topic: pubsubTopic), WakuRelayHandler(handler)).isOkOr: error "failed to subscribe to pubsub topic", pubsubTopic, error quit(1) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim index c2f002c44..cfff1442c 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim @@ -111,7 +111,7 @@ proc process*( of SUBSCRIBE: waku.node.subscribe( (kind: SubscriptionKind.PubsubSub, topic: $self.pubsubTopic), - handler = some(self.relayEventCallback), + handler = self.relayEventCallback, ).isOkOr: error "SUBSCRIBE failed", error return err($error) diff --git a/tests/node/test_wakunode_legacy_lightpush.nim b/tests/node/test_wakunode_legacy_lightpush.nim index 806bfe032..5d01e9f58 100644 --- a/tests/node/test_wakunode_legacy_lightpush.nim +++ b/tests/node/test_wakunode_legacy_lightpush.nim @@ -189,9 +189,9 @@ suite "Waku Legacy Lightpush message delivery": await allFutures(destNode.start(), bridgeNode.start(), lightNode.start()) - (await destNode.mountRelay(@[DefaultRelayShard])).isOkOr: + (await destNode.mountRelay()).isOkOr: assert false, "Failed to mount relay" - (await bridgeNode.mountRelay(@[DefaultRelayShard])).isOkOr: + (await bridgeNode.mountRelay()).isOkOr: assert false, "Failed to mount relay" await bridgeNode.mountLegacyLightPush() lightNode.mountLegacyLightPushClient() @@ -214,7 +214,7 @@ suite "Waku Legacy Lightpush message delivery": msg == message completionFutRelay.complete(true) - destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), some(relayHandler)).isOkOr: + destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), relayHandler).isOkOr: assert false, "Failed to subscribe to topic:" & $error # Wait for subscription to take effect diff --git a/tests/node/test_wakunode_lightpush.nim b/tests/node/test_wakunode_lightpush.nim index dccb899af..6a42c899b 100644 --- a/tests/node/test_wakunode_lightpush.nim +++ b/tests/node/test_wakunode_lightpush.nim @@ -183,9 +183,9 @@ suite "Waku Lightpush message delivery": await allFutures(destNode.start(), bridgeNode.start(), lightNode.start()) - (await destNode.mountRelay(@[DefaultRelayShard])).isOkOr: + (await destNode.mountRelay()).isOkOr: assert false, "Failed to mount relay" - (await bridgeNode.mountRelay(@[DefaultRelayShard])).isOkOr: + (await bridgeNode.mountRelay()).isOkOr: assert false, "Failed to mount relay" await bridgeNode.mountLightPush() lightNode.mountLightPushClient() @@ -209,7 +209,7 @@ suite "Waku Lightpush message delivery": msg == message completionFutRelay.complete(true) - destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), some(relayHandler)).isOkOr: + destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), relayHandler).isOkOr: assert false, "Failed to subscribe to relay" # Wait for subscription to take effect diff --git a/tests/test_relay_peer_exchange.nim b/tests/test_relay_peer_exchange.nim index a5e3b63ee..84976bd9a 100644 --- a/tests/test_relay_peer_exchange.nim +++ b/tests/test_relay_peer_exchange.nim @@ -22,9 +22,9 @@ procSuite "Relay (GossipSub) Peer Exchange": newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true) # When both client and server mount relay without a handler - (await node1.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" - (await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler))).isOkOr: + (await node2.mountRelay(none(RoutingRecordsHandler))).isOkOr: assert false, "Failed to mount relay" # Then the relays are mounted without a handler @@ -74,11 +74,11 @@ procSuite "Relay (GossipSub) Peer Exchange": peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler # Givem the nodes mount relay with a peer exchange handler - (await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))).isOkOr: + (await node1.mountRelay(some(emptyPeerExchangeHandle))).isOkOr: assert false, "Failed to mount relay" - (await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))).isOkOr: + (await node2.mountRelay(some(emptyPeerExchangeHandle))).isOkOr: assert false, "Failed to mount relay" - (await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle))).isOkOr: + (await node3.mountRelay(some(peerExchangeHandle))).isOkOr: assert false, "Failed to mount relay" # Ensure that node1 prunes all peers after the first connection @@ -86,6 +86,19 @@ procSuite "Relay (GossipSub) Peer Exchange": await allFutures([node1.start(), node2.start(), node3.start()]) + # The three nodes should be subscribed to the same shard + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + node1.subscribe((kind: PubsubSub, topic: $DefaultRelayShard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + node2.subscribe((kind: PubsubSub, topic: $DefaultRelayShard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + node3.subscribe((kind: PubsubSub, topic: $DefaultRelayShard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + # When nodes are connected await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) await node3.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()]) diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim index e50f3fe98..a7f1084fb 100644 --- a/tests/test_wakunode.nim +++ b/tests/test_wakunode.nim @@ -34,14 +34,14 @@ suite "WakuNode": # Setup node 1 with stable codec "/vac/waku/relay/2.0.0" await node1.start() - (await node1.mountRelay(@[shard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" node1.wakuRelay.codec = "/vac/waku/relay/2.0.0" # Setup node 2 with beta codec "/vac/waku/relay/2.0.0-beta2" await node2.start() - (await node2.mountRelay(@[shard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" node2.wakuRelay.codec = "/vac/waku/relay/2.0.0-beta2" @@ -69,7 +69,7 @@ suite "WakuNode": assert false, "Failed to unsubscribe from topic: " & $error ## Subscribe to the relay topic to add the custom relay handler defined above - node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + node2.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr: assert false, "Failed to subscribe to topic" await sleepAsync(2000.millis) diff --git a/tests/waku_relay/test_protocol.nim b/tests/waku_relay/test_protocol.nim index bc2097caa..46032b693 100644 --- a/tests/waku_relay/test_protocol.nim +++ b/tests/waku_relay/test_protocol.nim @@ -77,7 +77,8 @@ suite "Waku Relay": asyncTest "Publish with Subscription (Network Size: 1)": # When subscribing to a Pubsub Topic - discard node.subscribe(pubsubTopic, simpleFutureHandler) + + node.subscribe(pubsubTopic, simpleFutureHandler) # Then the node is subscribed check: @@ -111,7 +112,7 @@ suite "Waku Relay": otherHandlerFuture.complete((topic, message)) # When subscribing the second node to the Pubsub Topic - discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) # Then the second node is subscribed, but not the first one check: @@ -172,8 +173,8 @@ suite "Waku Relay": otherHandlerFuture.complete((topic, message)) # When subscribing both nodes to the same Pubsub Topic - discard node.subscribe(pubsubTopic, simpleFutureHandler) - discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) # Then both nodes are subscribed check: @@ -228,7 +229,7 @@ suite "Waku Relay": asyncTest "Refreshing subscription": # Given a subscribed node - discard node.subscribe(pubsubTopic, simpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) check: node.isSubscribed(pubsubTopic) node.subscribedTopics == pubsubTopicSeq @@ -244,7 +245,7 @@ suite "Waku Relay": ) {.async, gcsafe.} = otherHandlerFuture.complete((topic, message)) - discard node.subscribe(pubsubTopic, otherSimpleFutureHandler) + node.subscribe(pubsubTopic, otherSimpleFutureHandler) check: node.isSubscribed(pubsubTopic) node.subscribedTopics == pubsubTopicSeq @@ -291,14 +292,14 @@ suite "Waku Relay": otherHandlerFuture.complete((topic, message)) otherNode.addValidator(len4Validator) - discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) await sleepAsync(500.millis) check: otherNode.isSubscribed(pubsubTopic) # Given a subscribed node with a validator node.addValidator(len4Validator) - discard node.subscribe(pubsubTopic, simpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) await sleepAsync(500.millis) check: node.isSubscribed(pubsubTopic) @@ -380,8 +381,8 @@ suite "Waku Relay": ) {.async, gcsafe.} = otherHandlerFuture.complete((topic, message)) - discard node.subscribe(pubsubTopic, simpleFutureHandler) - discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) check: node.isSubscribed(pubsubTopic) node.subscribedTopics == pubsubTopicSeq @@ -464,8 +465,8 @@ suite "Waku Relay": ) {.async, gcsafe.} = handlerFuture2.complete((topic, message)) - discard node.subscribe(pubsubTopic, simpleFutureHandler) - discard node.subscribe(pubsubTopicB, simpleFutureHandler2) + node.subscribe(pubsubTopic, simpleFutureHandler) + node.subscribe(pubsubTopicB, simpleFutureHandler2) # Given the other nodes are subscribed to two pubsub topics var otherHandlerFuture1 = newPushHandlerFuture() @@ -492,10 +493,10 @@ suite "Waku Relay": ) {.async, gcsafe.} = anotherHandlerFuture2.complete((topic, message)) - discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler1) - discard otherNode.subscribe(pubsubTopicC, otherSimpleFutureHandler2) - discard anotherNode.subscribe(pubsubTopicB, anotherSimpleFutureHandler1) - discard anotherNode.subscribe(pubsubTopicC, anotherSimpleFutureHandler2) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler1) + otherNode.subscribe(pubsubTopicC, otherSimpleFutureHandler2) + anotherNode.subscribe(pubsubTopicB, anotherSimpleFutureHandler1) + anotherNode.subscribe(pubsubTopicC, anotherSimpleFutureHandler2) await sleepAsync(500.millis) # When publishing a message in node for each of the pubsub topics @@ -735,15 +736,13 @@ suite "Waku Relay": otherSwitch = newTestSwitch() otherNode = await newTestWakuRelay(otherSwitch) await allFutures(otherSwitch.start(), otherNode.start()) - let otherTopicHandler: TopicHandler = - otherNode.subscribe(pubsubTopic, simpleFutureHandler) + otherNode.subscribe(pubsubTopic, simpleFutureHandler) # Given a node without a subscription check: node.subscribedTopics == [] - # When unsubscribing from a pubsub topic from an unsubscribed topic handler - node.unsubscribe(pubsubTopic, otherTopicHandler) + node.unsubscribe(pubsubTopic) # Then the node is still not subscribed check: @@ -754,11 +753,11 @@ suite "Waku Relay": asyncTest "Single Node with Single Pubsub Topic": # Given a node subscribed to a pubsub topic - let topicHandler = node.subscribe(pubsubTopic, simpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) check node.subscribedTopics == pubsubTopicSeq # When unsubscribing from the pubsub topic - node.unsubscribe(pubsubTopic, topicHandler) + node.unsubscribe(pubsubTopic) # Then the node is not subscribed anymore check node.subscribedTopics == [] @@ -768,9 +767,8 @@ suite "Waku Relay": let pubsubTopicB = "/waku/2/rs/0/1" # Given a node subscribed to multiple pubsub topics - let - topicHandler = node.subscribe(pubsubTopic, simpleFutureHandler) - topicHandlerB = node.subscribe(pubsubTopicB, simpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) + node.subscribe(pubsubTopicB, simpleFutureHandler) assert pubsubTopic in node.subscribedTopics, fmt"Node is not subscribed to {pubsubTopic}" @@ -778,13 +776,13 @@ suite "Waku Relay": fmt"Node is not subscribed to {pubsubTopicB}" # When unsubscribing from one of the pubsub topics - node.unsubscribe(pubsubTopic, topicHandler) + node.unsubscribe(pubsubTopic) # Then the node is still subscribed to the other pubsub topic check node.subscribedTopics == @[pubsubTopicB] # When unsubscribing from the other pubsub topic - node.unsubscribe(pubsubTopicB, topicHandlerB) + node.unsubscribe(pubsubTopicB) # Then the node is not subscribed anymore check node.subscribedTopics == [] @@ -802,7 +800,7 @@ suite "Waku Relay": asyncTest "Single Node with Single Pubsub Topic": # Given a node subscribed to a pubsub topic - discard node.subscribe(pubsubTopic, simpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) check node.subscribedTopics == pubsubTopicSeq # When unsubscribing from all pubsub topics @@ -816,9 +814,9 @@ suite "Waku Relay": let pubsubTopicB = "/waku/2/rs/0/1" # Given a node subscribed to multiple pubsub topics - discard node.subscribe(pubsubTopic, simpleFutureHandler) - discard node.subscribe(pubsubTopic, simpleFutureHandler) - discard node.subscribe(pubsubTopicB, simpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) + node.subscribe(pubsubTopicB, simpleFutureHandler) assert pubsubTopic in node.subscribedTopics, fmt"Node is not subscribed to {pubsubTopic}" @@ -855,8 +853,8 @@ suite "Waku Relay": ) {.async, gcsafe.} = otherHandlerFuture.complete((topic, message)) - discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) - discard node.subscribe(pubsubTopic, simpleFutureHandler) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) check: node.subscribedTopics == pubsubTopicSeq otherNode.subscribedTopics == pubsubTopicSeq @@ -1021,8 +1019,8 @@ suite "Waku Relay": ) {.async, gcsafe.} = otherHandlerFuture.complete((topic, message)) - discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) - discard node.subscribe(pubsubTopic, simpleFutureHandler) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) check: node.subscribedTopics == pubsubTopicSeq otherNode.subscribedTopics == pubsubTopicSeq @@ -1163,8 +1161,8 @@ suite "Waku Relay": otherMessageSeq.add((topic, message)) otherHandlerFuture.complete((topic, message)) - discard node.subscribe(pubsubTopic, thisSimpleFutureHandler) - discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + node.subscribe(pubsubTopic, thisSimpleFutureHandler) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) check: node.subscribedTopics == pubsubTopicSeq otherNode.subscribedTopics == pubsubTopicSeq @@ -1237,8 +1235,8 @@ suite "Waku Relay": ) {.async, gcsafe.} = otherHandlerFuture.complete((topic, message)) - discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) - discard node.subscribe(pubsubTopic, simpleFutureHandler) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) check: node.subscribedTopics == pubsubTopicSeq otherNode.subscribedTopics == pubsubTopicSeq @@ -1332,8 +1330,8 @@ suite "Waku Relay": ) {.async, gcsafe.} = otherHandlerFuture.complete((topic, message)) - discard otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) - discard node.subscribe(pubsubTopic, simpleFutureHandler) + otherNode.subscribe(pubsubTopic, otherSimpleFutureHandler) + node.subscribe(pubsubTopic, simpleFutureHandler) check: node.subscribedTopics == pubsubTopicSeq otherNode.subscribedTopics == pubsubTopicSeq diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index 3f6bfd3e7..ad8d83361 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -70,15 +70,15 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - (await node1.mountRelay(@[shard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node2.start() - (await node2.mountRelay(@[shard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node3.start() - (await node3.mountRelay(@[shard])).isOkOr: + (await node3.mountRelay()).isOkOr: assert false, "Failed to mount relay" await allFutures( @@ -97,13 +97,19 @@ suite "WakuNode - Relay": msg.timestamp > 0 completionFut.complete(true) - ## The following unsubscription is necessary to remove the default relay handler, which is - ## added when mountRelay is called. - node3.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr: - assert false, "Failed to unsubscribe from topic: " & $error + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + ## node1 and node2 explicitly subscribe to the same shard as node3 + node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error ## Subscribe to the relay topic to add the custom relay handler defined above - node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + node3.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr: assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) @@ -147,15 +153,15 @@ suite "WakuNode - Relay": # start all the nodes await node1.start() - (await node1.mountRelay(@[shard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node2.start() - (await node2.mountRelay(@[shard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node3.start() - (await node3.mountRelay(@[shard])).isOkOr: + (await node3.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -193,13 +199,19 @@ suite "WakuNode - Relay": # relay handler is called completionFut.complete(true) - ## The following unsubscription is necessary to remove the default relay handler, which is - ## added when mountRelay is called. - node3.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr: - assert false, "Failed to unsubscribe from topic: " & $error + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + ## node1 and node2 explicitly subscribe to the same shard as node3 + node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error ## Subscribe to the relay topic to add the custom relay handler defined above - node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + node3.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr: assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) @@ -287,11 +299,11 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - (await node1.mountRelay(@[shard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node2.start() - (await node2.mountRelay(@[shard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -313,7 +325,7 @@ suite "WakuNode - Relay": assert false, "Failed to unsubscribe from topic: " & $error ## Subscribe to the relay topic to add the custom relay handler defined above - node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + node1.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr: assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) @@ -345,11 +357,11 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - (await node1.mountRelay(@[shard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node2.start() - (await node2.mountRelay(@[shard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -371,7 +383,7 @@ suite "WakuNode - Relay": assert false, "Failed to unsubscribe from topic: " & $error ## Subscribe to the relay topic to add the custom relay handler defined above - node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + node1.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr: assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) @@ -403,11 +415,11 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - (await node1.mountRelay(@[shard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node2.start() - (await node2.mountRelay(@[shard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" #delete websocket peer address @@ -433,7 +445,7 @@ suite "WakuNode - Relay": assert false, "Failed to unsubscribe from topic: " & $error ## Subscribe to the relay topic to add the custom relay handler defined above - node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + node1.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr: assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) @@ -467,11 +479,11 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - (await node1.mountRelay(@[shard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node2.start() - (await node2.mountRelay(@[shard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -493,7 +505,7 @@ suite "WakuNode - Relay": assert false, "Failed to unsubscribe from topic: " & $error ## Subscribe to the relay topic to add the custom relay handler defined above - node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + node1.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr: assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) @@ -535,11 +547,11 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - (await node1.mountRelay(@[shard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node2.start() - (await node2.mountRelay(@[shard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -561,7 +573,7 @@ suite "WakuNode - Relay": assert false, "Failed to unsubscribe from topic: " & $error ## Subscribe to the relay topic to add the custom relay handler defined above - node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + node1.subscribe((kind: PubsubSub, topic: $shard), relayHandler).isOkOr: assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) @@ -583,10 +595,15 @@ suite "WakuNode - Relay": await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountRelay())) + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + # subscribe all nodes to a topic let topic = "topic" for node in nodes: - discard node.wakuRelay.subscribe(topic, nil) + node.wakuRelay.subscribe(topic, simpleHandler) await sleepAsync(500.millis) # connect nodes in full mesh @@ -661,19 +678,24 @@ suite "WakuNode - Relay": "topic must use the same shard" ## When - node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler)).isOkOr: + node.subscribe((kind: ContentSub, topic: contentTopicA), handler).isOkOr: assert false, "Failed to subscribe to topic: " & $error - node.subscribe((kind: ContentSub, topic: contentTopicB), some(handler)).isOkOr: + node.subscribe((kind: ContentSub, topic: contentTopicB), handler).isOkOr: assert false, "The subscription call shouldn't error even though it's already subscribed to that shard" - node.subscribe((kind: ContentSub, topic: contentTopicC), some(handler)).isOkOr: + node.subscribe((kind: ContentSub, topic: contentTopicC), handler).isOkOr: assert false, "The subscription call shouldn't error even though it's already subscribed to that shard" + ## The node should be subscribed to the shard + check node.wakuRelay.isSubscribed(shard) + ## Then node.unsubscribe((kind: ContentUnsub, topic: contentTopicB)).isOkOr: assert false, "Failed to unsubscribe to topic: " & $error - check node.wakuRelay.isSubscribed(shard) + + ## After unsubcription, the node should not be subscribed to the shard anymore + check not node.wakuRelay.isSubscribed(shard) ## Cleanup await node.stop() diff --git a/tests/waku_relay/utils.nim b/tests/waku_relay/utils.nim index 309f800dc..81e366298 100644 --- a/tests/waku_relay/utils.nim +++ b/tests/waku_relay/utils.nim @@ -60,7 +60,7 @@ proc subscribeToContentTopicWithHandler*( if topic == topic: completionFut.complete(true) - (node.subscribe((kind: ContentSub, topic: contentTopic), some(relayHandler))).isOkOr: + (node.subscribe((kind: ContentSub, topic: contentTopic), relayHandler)).isOkOr: error "Failed to subscribe to content topic", error completionFut.complete(true) return completionFut @@ -73,7 +73,7 @@ proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bo if topic == pubsubTopic: completionFut.complete(true) - (node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))).isOkOr: + (node.subscribe((kind: PubsubSub, topic: pubsubTopic), relayHandler)).isOkOr: error "Failed to subscribe to pubsub topic", error completionFut.complete(false) return completionFut diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index 312fe5cfc..8b5a47174 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -57,7 +57,7 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - (await node1.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode @@ -74,7 +74,7 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - (await node2.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( @@ -90,7 +90,7 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - (await node3.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node3.mountRelay()).isOkOr: assert false, "Failed to mount relay" let wakuRlnConfig3 = WakuRlnConfig( @@ -117,13 +117,18 @@ procSuite "WakuNode - RLN relay": if topic == DefaultPubsubTopic: completionFut.complete(true) - ## The following unsubscription is necessary to remove the default relay handler, which is - ## added when mountRelay is called. - node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr: - assert false, "Failed to unsubscribe from topic: " & $error + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + node1.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic in node1: " & $error + node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic in node2: " & $error ## Subscribe to the relay topic to add the custom relay handler defined above - node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr: + node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), relayHandler).isOkOr: assert false, "Failed to subscribe to pubsub topic: " & $error await sleepAsync(2000.millis) @@ -146,8 +151,7 @@ procSuite "WakuNode - RLN relay": discard await node1.publish(some(DefaultPubsubTopic), message) await sleepAsync(2000.millis) - check: - (await completionFut.withTimeout(10.seconds)) == true + assert (await completionFut.withTimeout(10.seconds)), "completionFut timed out" await node1.stop() await node2.stop() @@ -169,7 +173,7 @@ procSuite "WakuNode - RLN relay": ] # set up three nodes - await allFutures(nodes.mapIt(it.mountRelay(shards))) + await allFutures(nodes.mapIt(it.mountRelay())) # mount rlnrelay in off-chain mode for index, node in nodes: @@ -201,17 +205,20 @@ procSuite "WakuNode - RLN relay": elif topic == $shards[1]: rxMessagesTopic2 = rxMessagesTopic2 + 1 - ## This unsubscription is necessary to remove the default relay handler, which is - ## added when mountRelay is called. - nodes[2].unsubscribe((kind: PubsubUnsub, topic: $shards[0])).isOkOr: - assert false, "Failed to unsubscribe to pubsub topic: " & $error - nodes[2].unsubscribe((kind: PubsubUnsub, topic: $shards[1])).isOkOr: - assert false, "Failed to unsubscribe to pubsub topic: " & $error + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + nodes[0].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic in nodes[0]: " & $error + nodes[1].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic in nodes[1]: " & $error # mount the relay handlers - nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), some(relayHandler)).isOkOr: + nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), relayHandler).isOkOr: assert false, "Failed to subscribe to pubsub topic: " & $error - nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), some(relayHandler)).isOkOr: + nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), relayHandler).isOkOr: assert false, "Failed to subscribe to pubsub topic: " & $error await sleepAsync(1000.millis) @@ -279,7 +286,7 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - (await node1.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode @@ -296,7 +303,7 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - (await node2.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( @@ -312,7 +319,7 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - (await node3.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node3.mountRelay()).isOkOr: assert false, "Failed to mount relay" let wakuRlnConfig3 = WakuRlnConfig( @@ -339,13 +346,18 @@ procSuite "WakuNode - RLN relay": if topic == DefaultPubsubTopic: completionFut.complete(true) - ## The following unsubscription is necessary to remove the default relay handler, which is - ## added when mountRelay is called. - node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr: - assert false, "Failed to unsubscribe to pubsub topic: " & $error + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + node1.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic in node1: " & $error + node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic in node2: " & $error # mount the relay handler - node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr: + node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), relayHandler).isOkOr: assert false, "Failed to subscribe to pubsub topic: " & $error await sleepAsync(2000.millis) @@ -408,7 +420,7 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - (await node1.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode @@ -425,7 +437,7 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - (await node2.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode @@ -441,7 +453,7 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - (await node3.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node3.mountRelay()).isOkOr: assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode @@ -513,13 +525,18 @@ procSuite "WakuNode - RLN relay": if msg.payload == wm4.payload: completionFut4.complete(true) - ## The following unsubscription is necessary to remove the default relay handler, which is - ## added when mountRelay is called. - node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr: - assert false, "Failed to unsubscribe to pubsub topic: " & $error + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + node1.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic in node1: " & $error + node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic in node2: " & $error # mount the relay handler for node3 - node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr: + node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), relayHandler).isOkOr: assert false, "Failed to subscribe to pubsub topic: " & $error await sleepAsync(2000.millis) @@ -562,14 +579,14 @@ procSuite "WakuNode - RLN relay": epochSizeSec: uint64 = 5 # This means rlnMaxEpochGap = 4 # Given both nodes mount relay and rlnrelay - (await node1.mountRelay(shardSeq)).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10") (await node1.mountRlnRelay(wakuRlnConfig1)).isOkOr: assert false, "Failed to mount rlnrelay" # Mount rlnrelay in node2 in off-chain mode - (await node2.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11") await node2.mountRlnRelay(wakuRlnConfig2) @@ -613,7 +630,7 @@ procSuite "WakuNode - RLN relay": if msg == wm6: completionFut6.complete(true) - node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr: + node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), relayHandler).isOkOr: assert false, "Failed to subscribe to pubsub topic: " & $error # Given all messages have an rln proof and are published by the node 1 @@ -704,17 +721,27 @@ procSuite "WakuNode - RLN relay": # Given both nodes mount relay and rlnrelay # Mount rlnrelay in node1 in off-chain mode - (await node1.mountRelay(shardSeq)).isOkOr: + (await node1.mountRelay()).isOkOr: assert false, "Failed to mount relay" let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10") await node1.mountRlnRelay(wakuRlnConfig1) # Mount rlnrelay in node2 in off-chain mode - (await node2.mountRelay(@[DefaultRelayShard])).isOkOr: + (await node2.mountRelay()).isOkOr: assert false, "Failed to mount relay" let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11") await node2.mountRlnRelay(wakuRlnConfig2) + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + node1.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic in node2: " & $error + node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic in node1: " & $error + # Given the two nodes are started and connected waitFor allFutures(node1.start(), node2.start()) await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) diff --git a/tests/wakunode2/test_validators.nim b/tests/wakunode2/test_validators.nim index 44b6ae118..b0a8dd8fb 100644 --- a/tests/wakunode2/test_validators.nim +++ b/tests/wakunode2/test_validators.nim @@ -73,7 +73,9 @@ suite "WakuNode2 - Validators": # Subscribe all nodes to the same topic/handler for node in nodes: - discard node.wakuRelay.subscribe($spamProtectedShard, handler) + node.subscribe((kind: PubsubSub, topic: $spamProtectedShard), handler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + await sleepAsync(500.millis) # Each node publishes 10 signed messages @@ -163,7 +165,9 @@ suite "WakuNode2 - Validators": # Subscribe all nodes to the same topic/handler for node in nodes: - discard node.wakuRelay.subscribe($spamProtectedShard, handler) + node.subscribe((kind: PubsubSub, topic: $spamProtectedShard), handler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + await sleepAsync(500.millis) # Each node sends 5 messages, signed but with a non-whitelisted key (total = 25) @@ -291,7 +295,8 @@ suite "WakuNode2 - Validators": # Subscribe all nodes to the same topic/handler for node in nodes: - discard node.wakuRelay.subscribe($spamProtectedShard, handler) + node.subscribe((kind: PubsubSub, topic: $spamProtectedShard), handler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) # Add signed message validator to all nodes. They will only route signed messages diff --git a/tests/wakunode_rest/test_rest_admin.nim b/tests/wakunode_rest/test_rest_admin.nim index a3546f1f8..4e59b0725 100644 --- a/tests/wakunode_rest/test_rest_admin.nim +++ b/tests/wakunode_rest/test_rest_admin.nim @@ -43,14 +43,27 @@ suite "Waku v2 Rest API - Admin": node3 = newTestWakuNode(generateSecp256k1Key(), getPrimaryIPAddr(), Port(60604)) await allFutures(node1.start(), node2.start(), node3.start()) - let shards = @[RelayShard(clusterId: 1, shardId: 0)] await allFutures( - node1.mountRelay(shards = shards), - node2.mountRelay(shards = shards), - node3.mountRelay(shards = shards), + node1.mountRelay(), + node2.mountRelay(), + node3.mountRelay(), node3.mountPeerExchange(), ) + # The three nodes should be subscribed to the same shard + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + let shard = RelayShard(clusterId: 1, shardId: 0) + node1.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + node2.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + node3.subscribe((kind: PubsubSub, topic: $shard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + peerInfo1 = node1.switch.peerInfo peerInfo2 = node2.switch.peerInfo peerInfo3 = node3.switch.peerInfo diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index dcd430a0e..f8dbf429a 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -278,8 +278,16 @@ suite "Waku v2 Rest API - Filter V2": restFilterTest = await RestFilterTest.init() subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic) - restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + + restFilterTest.serviceNode.subscribe( + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler + ).isOkOr: assert false, "Failed to subscribe to topic: " & $error # When @@ -325,7 +333,14 @@ suite "Waku v2 Rest API - Filter V2": # setup filter service and client node let restFilterTest = await RestFilterTest.init() let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId - restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + restFilterTest.serviceNode.subscribe( + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler + ).isOkOr: assert false, "Failed to subscribe to topic: " & $error let requestBody = FilterSubscribeRequest( @@ -397,7 +412,14 @@ suite "Waku v2 Rest API - Filter V2": # setup filter service and client node let restFilterTest = await RestFilterTest.init() let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId - restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + restFilterTest.serviceNode.subscribe( + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler + ).isOkOr: assert false, "Failed to subscribe to topic: " & $error let requestBody = FilterSubscribeRequest( diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim index 035b2a884..b09c72ee3 100644 --- a/tests/wakunode_rest/test_rest_lightpush.nim +++ b/tests/wakunode_rest/test_rest_lightpush.nim @@ -128,13 +128,18 @@ suite "Waku v2 Rest API - lightpush": # Given let restLightPushTest = await RestLightPushTest.init() + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + restLightPushTest.consumerNode.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic) + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler ).isOkOr: assert false, "Failed to subscribe to relay: " & $error restLightPushTest.serviceNode.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic) + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler ).isOkOr: assert false, "Failed to subscribe to relay: " & $error require: @@ -162,9 +167,13 @@ suite "Waku v2 Rest API - lightpush": asyncTest "Push message bad-request": # Given let restLightPushTest = await RestLightPushTest.init() + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) restLightPushTest.serviceNode.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic) + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler ).isOkOr: assert false, "Failed to subscribe to relay: " & $error require: @@ -220,14 +229,18 @@ suite "Waku v2 Rest API - lightpush": let budgetCap = 3 let tokenPeriod = 500.millis let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod)) + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) restLightPushTest.consumerNode.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic) + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler ).isOkOr: assert false, "Failed to subscribe to relay: " & $error restLightPushTest.serviceNode.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic) + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler ).isOkOr: assert false, "Failed to subscribe to relay: " & $error require: diff --git a/tests/wakunode_rest/test_rest_lightpush_legacy.nim b/tests/wakunode_rest/test_rest_lightpush_legacy.nim index f50703bae..fea51554b 100644 --- a/tests/wakunode_rest/test_rest_lightpush_legacy.nim +++ b/tests/wakunode_rest/test_rest_lightpush_legacy.nim @@ -122,14 +122,18 @@ suite "Waku v2 Rest API - lightpush": asyncTest "Push message request": # Given let restLightPushTest = await RestLightPushTest.init() + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) restLightPushTest.consumerNode.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic) + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler ).isOkOr: assert false, "Failed to subscribe to topic" restLightPushTest.serviceNode.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic) + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler ).isOkOr: assert false, "Failed to subscribe to topic" require: @@ -157,9 +161,13 @@ suite "Waku v2 Rest API - lightpush": asyncTest "Push message bad-request": # Given let restLightPushTest = await RestLightPushTest.init() + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) restLightPushTest.serviceNode.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic) + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler ).isOkOr: assert false, "Failed to subscribe to topic" require: @@ -218,14 +226,18 @@ suite "Waku v2 Rest API - lightpush": let budgetCap = 3 let tokenPeriod = 500.millis let restLightPushTest = await RestLightPushTest.init((budgetCap, tokenPeriod)) + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) restLightPushTest.consumerNode.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic) + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler ).isOkOr: assert false, "Failed to subscribe to topic" restLightPushTest.serviceNode.subscribe( - (kind: PubsubSub, topic: DefaultPubsubTopic) + (kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler ).isOkOr: assert false, "Failed to subscribe to topic" require: diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index 208b86190..8ea7f2abe 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -95,9 +95,18 @@ suite "Waku v2 Rest API - Relay": shard3 = RelayShard(clusterId: DefaultClusterId, shardId: 3) shard4 = RelayShard(clusterId: DefaultClusterId, shardId: 4) - (await node.mountRelay(@[shard0, shard1, shard2, shard3, shard4])).isOkOr: + (await node.mountRelay()).isOkOr: assert false, "Failed to mount relay" + proc simpleHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + for shard in @[$shard0, $shard1, $shard2, $shard3, $shard4]: + node.subscribe((kind: PubsubSub, topic: shard), simpleHandler).isOkOr: + assert false, "Failed to subscribe to pubsub topic: " & $error + var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() @@ -248,8 +257,14 @@ suite "Waku v2 Rest API - Relay": let client = newRestHttpClient(initTAddress(restAddress, restPort)) - node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: assert false, "Failed to subscribe to pubsub topic" + require: toSeq(node.wakuRelay.subscribedTopics).len == 1 @@ -477,7 +492,12 @@ suite "Waku v2 Rest API - Relay": let client = newRestHttpClient(initTAddress(restAddress, restPort)) - node.subscribe((kind: ContentSub, topic: DefaultContentTopic)).isOkOr: + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + node.subscribe((kind: ContentSub, topic: DefaultContentTopic), simpleHandler).isOkOr: assert false, "Failed to subscribe to content topic: " & $error require: toSeq(node.wakuRelay.subscribedTopics).len == 1 @@ -583,7 +603,12 @@ suite "Waku v2 Rest API - Relay": let client = newRestHttpClient(initTAddress(restAddress, restPort)) - node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: assert false, "Failed to subscribe to pubsub topic: " & $error require: toSeq(node.wakuRelay.subscribedTopics).len == 1 @@ -640,7 +665,12 @@ suite "Waku v2 Rest API - Relay": let client = newRestHttpClient(initTAddress(restAddress, restPort)) - node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + let simpleHandler = proc( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + await sleepAsync(0.milliseconds) + + node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), simpleHandler).isOkOr: assert false, "Failed to subscribe to pubsub topic: " & $error require: toSeq(node.wakuRelay.subscribedTopics).len == 1 diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 7df5c2567..a03e2a1e1 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -317,10 +317,7 @@ proc setupProtocols( ( await mountRelay( - node, - shards, - peerExchangeHandler = peerExchangeHandler, - int(conf.maxMessageSizeBytes), + node, peerExchangeHandler = peerExchangeHandler, int(conf.maxMessageSizeBytes) ) ).isOkOr: return err("failed to mount waku relay protocol: " & $error) diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 01dc7a36f..fe797b0a3 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[options, sequtils], + std/[options, sequtils, strformat], results, chronicles, chronos, @@ -130,8 +130,12 @@ proc setupAppCallbacks( conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) let shards = confShards & autoShards - for shard in shards: - discard node.wakuRelay.subscribe($shard, appCallbacks.relayHandler) + let uniqueShards = deduplicate(shards) + + for shard in uniqueShards: + let topic = $shard + node.subscribe((kind: PubsubSub, topic: topic), appCallbacks.relayHandler).isOkOr: + return err(fmt"Could not subscribe {topic}: " & $error) if not appCallbacks.topicHealthChangeHandler.isNil(): if node.wakuRelay.isNil(): diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 1538d9096..ac72f3e37 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -116,7 +116,6 @@ type announcedAddresses*: seq[MultiAddress] started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] - contentTopicHandlers: Table[ContentTopic, TopicHandler] rateLimitSettings*: ProtocolRateLimitSettings proc new*( @@ -256,7 +255,13 @@ proc mountStoreSync*( ## Waku relay -proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = +proc registerRelayHandler( + node: WakuNode, topic: PubsubTopic, appHandler: WakuRelayHandler +) = + ## Registers the only handler for the given topic. + ## Notice that this handler internally calls other handlers, such as filter, + ## archive, etc, plus the handler provided by the application. + if node.wakuRelay.isSubscribed(topic): return @@ -289,18 +294,19 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = node.wakuStoreReconciliation.messageIngress(topic, msg) - let defaultHandler = proc( + let uniqueTopicHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) await syncHandler(topic, msg) + await appHandler(topic, msg) - discard node.wakuRelay.subscribe(topic, defaultHandler) + node.wakuRelay.subscribe(topic, uniqueTopicHandler) proc subscribe*( - node: WakuNode, subscription: SubscriptionEvent, handler = none(WakuRelayHandler) + node: WakuNode, subscription: SubscriptionEvent, handler: WakuRelayHandler ): Result[void, string] = ## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on ## this topic. WakuRelayHandler is a method that takes a topic and a Waku message. @@ -326,18 +332,8 @@ proc subscribe*( warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic return ok() - if contentTopicOp.isSome() and node.contentTopicHandlers.hasKey(contentTopicOp.get()): - warn "No-effect API call to `subscribe`. Was already subscribed" - return ok() - + node.registerRelayHandler(pubsubTopic, handler) node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) - node.registerRelayDefaultHandler(pubsubTopic) - - if handler.isSome(): - let wrappedHandler = node.wakuRelay.subscribe(pubsubTopic, handler.get()) - - if contentTopicOp.isSome(): - node.contentTopicHandlers[contentTopicOp.get()] = wrappedHandler return ok() @@ -367,17 +363,9 @@ proc unsubscribe*( warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic return ok() - if contentTopicOp.isSome(): - # Remove this handler only - var handler: TopicHandler - ## TODO: refactor this part. I think we can simplify it - if node.contentTopicHandlers.pop(contentTopicOp.get(), handler): - debug "unsubscribe", contentTopic = contentTopicOp.get() - node.wakuRelay.unsubscribe(pubsubTopic) - else: - debug "unsubscribe", pubsubTopic = pubsubTopic - node.wakuRelay.unsubscribe(pubsubTopic) - node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + debug "unsubscribe", pubsubTopic, contentTopicOp + node.wakuRelay.unsubscribe(pubsubTopic) + node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) return ok() @@ -439,7 +427,6 @@ proc startRelay*(node: WakuNode) {.async.} = proc mountRelay*( node: WakuNode, - shards: seq[RelayShard] = @[], peerExchangeHandler = none(RoutingRecordsHandler), maxMessageSize = int(DefaultMaxWakuMessageSize), ): Future[Result[void, string]] {.async.} = @@ -465,16 +452,7 @@ proc mountRelay*( node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec)) - ## Make sure we don't have duplicates - let uniqueShards = deduplicate(shards) - - # Subscribe to shards - for shard in uniqueShards: - node.subscribe((kind: PubsubSub, topic: $shard)).isOkOr: - error "failed to subscribe to shard", error = error - return err("failed to subscribe to shard in mountRelay: " & error) - - info "relay mounted successfully", shards = uniqueShards + info "relay mounted successfully" return ok() ## Waku filter diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim index 6e880f5a3..f11a11fbc 100644 --- a/waku/waku_api/rest/builder.nim +++ b/waku/waku_api/rest/builder.nim @@ -148,9 +148,9 @@ proc startRestServerProtocolSupport*( let pubsubTopic = $RelayShard(clusterId: clusterId, shardId: shard) cache.pubsubSubscribe(pubsubTopic) - ## TODO: remove this line. use observer-observable pattern - ## within waku_node::registerRelayDefaultHandler - discard node.wakuRelay.subscribe(pubsubTopic, handler) + node.subscribe((kind: PubsubSub, topic: pubsubTopic), handler).isOkOr: + error "Could not subscribe", pubsubTopic, error + continue for contentTopic in contentTopics: cache.contentSubscribe(contentTopic) @@ -160,9 +160,9 @@ proc startRestServerProtocolSupport*( continue let pubsubTopic = $shard - ## TODO: remove this line. use observer-observable pattern - ## within waku_node::registerRelayDefaultHandler - discard node.wakuRelay.subscribe(pubsubTopic, handler) + node.subscribe((kind: PubsubSub, topic: pubsubTopic), handler).isOkOr: + error "Could not subscribe", pubsubTopic, error + continue installRelayApiHandlers(router, node, cache) else: diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index 252375208..06bbc0c06 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ b/waku/waku_api/rest/relay/handlers.nim @@ -67,9 +67,7 @@ proc installRelayApiHandlers*( for pubsubTopic in newTopics: cache.pubsubSubscribe(pubsubTopic) - node.subscribe( - (kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache)) - ).isOkOr: + node.subscribe((kind: PubsubSub, topic: pubsubTopic), messageCacheHandler(cache)).isOkOr: let errorMsg = "Subscribe failed:" & $error error "SUBSCRIBE failed", error = errorMsg return RestApiResponse.internalServerError(errorMsg) @@ -202,7 +200,7 @@ proc installRelayApiHandlers*( cache.contentSubscribe(contentTopic) node.subscribe( - (kind: ContentSub, topic: contentTopic), some(messageCacheHandler(cache)) + (kind: ContentSub, topic: contentTopic), messageCacheHandler(cache) ).isOkOr: let errorMsg = "Subscribe failed:" & $error error "SUBSCRIBE failed", error = errorMsg diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 8da3f89b5..c87519b06 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -131,6 +131,8 @@ type # a map of validators to error messages to return when validation fails topicValidator: Table[PubsubTopic, ValidatorHandler] # map topic with its assigned validator within pubsub + topicHandlers: Table[PubsubTopic, TopicHandler] + # map topic with the TopicHandler proc in charge of attending topic's incoming message events publishObservers: seq[PublishObserver] topicsHealth*: Table[string, TopicHealth] onTopicHealthChange*: TopicHealthChangeHandler @@ -488,13 +490,11 @@ proc validateMessage*( return ok() -proc subscribe*( - w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler -): TopicHandler = +proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) = debug "subscribe", pubsubTopic = pubsubTopic # We need to wrap the handler since gossipsub doesnt understand WakuMessage - let wrappedHandler = proc( + let topicHandler = proc( pubsubTopic: string, data: seq[byte] ): Future[void] {.gcsafe, raises: [].} = let decMsg = WakuMessage.decode(data) @@ -526,9 +526,9 @@ proc subscribe*( w.topicParams[pubsubTopic] = TopicParameters # subscribe to the topic with our wrapped handler - procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler) + procCall GossipSub(w).subscribe(pubsubTopic, topicHandler) - return wrappedHandler + w.topicHandlers[pubsubTopic] = topicHandler proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) = ## Unsubscribe all handlers on this pubsub topic @@ -537,35 +537,32 @@ proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) = procCall GossipSub(w).unsubscribeAll(pubsubTopic) w.topicValidator.del(pubsubTopic) + w.topicHandlers.del(pubsubTopic) proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) = if not w.topicValidator.hasKey(pubsubTopic): error "unsubscribe no validator for this topic", pubsubTopic return - if pubsubtopic notin Pubsub(w).topics: + if not w.topicHandlers.hasKey(pubsubTopic): error "not subscribed to the given topic", pubsubTopic return - var topicHandlerSeq: seq[TopicHandler] + var topicHandler: TopicHandler var topicValidator: ValidatorHandler try: - topicHandlerSeq = Pubsub(w).topics[pubsubTopic] - if topicHandlerSeq.len == 0: - error "unsubscribe no handler for this topic", pubsubTopic - return + topicHandler = w.topicHandlers[pubsubTopic] topicValidator = w.topicValidator[pubsubTopic] except KeyError: error "exception in unsubscribe", pubsubTopic, error = getCurrentExceptionMsg() return - let topicHandler = topicHandlerSeq[0] - debug "unsubscribe", pubsubTopic - procCall GossipSub(w).unsubscribe($pubsubTopic, topicHandler) - ## TODO: uncomment the following line when https://github.com/vacp2p/nim-libp2p/pull/1356 - ## is available in a nim-libp2p release. - # procCall GossipSub(w).removeValidator(pubsubTopic, topicValidator) + procCall GossipSub(w).unsubscribe(pubsubTopic, topicHandler) + procCall GossipSub(w).removeValidator(pubsubTopic, topicValidator) + + w.topicValidator.del(pubsubTopic) + w.topicHandlers.del(pubsubTopic) proc publish*( w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage