From 6bc05efc02ab8984643032b90e44c032075d619f Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 5 May 2025 22:57:20 +0200 Subject: [PATCH] chore: Avoid double relay subscription (#3396) * make sure subscribe once to every topic in waku_node * start suggest use of removeValidator in waku_relay/protocol. Commented until libp2p updated. --- apps/chat2/chat2.nim | 8 +- apps/chat2bridge/chat2bridge.nim | 9 +- apps/networkmonitor/networkmonitor.nim | 9 +- examples/publisher.nim | 5 +- examples/subscriber.nim | 8 +- .../stealth_commitment_protocol.nim | 4 +- .../requests/protocols/relay_request.nim | 17 +- tests/node/peer_manager/test_peer_manager.nim | 18 ++- tests/node/test_wakunode_filter.nim | 9 +- tests/node/test_wakunode_legacy_lightpush.nim | 22 ++- tests/node/test_wakunode_lightpush.nim | 20 ++- tests/node/test_wakunode_peer_manager.nim | 36 +++-- tests/node/test_wakunode_relay_rln.nim | 8 +- tests/test_peer_manager.nim | 27 ++-- tests/test_relay_peer_exchange.nim | 15 +- tests/test_waku_dnsdisc.nim | 12 +- tests/test_waku_keepalive.nim | 6 +- tests/test_wakunode.nim | 39 +++-- tests/waku_relay/test_wakunode_relay.nim | 145 +++++++++++++----- tests/waku_relay/utils.nim | 15 +- .../test_wakunode_rln_relay.nim | 89 ++++++++--- tests/waku_rln_relay/utils_static.nim | 6 +- tests/wakunode_rest/test_rest_cors.nim | 12 +- tests/wakunode_rest/test_rest_debug.nim | 6 +- tests/wakunode_rest/test_rest_filter.nim | 13 +- tests/wakunode_rest/test_rest_health.nim | 3 +- tests/wakunode_rest/test_rest_lightpush.nim | 23 ++- .../test_rest_lightpush_legacy.nim | 23 ++- tests/wakunode_rest/test_rest_relay.nim | 57 +++++-- tests/wakunode_rest/test_rest_store.nim | 18 ++- waku/factory/node_factory.nim | 6 +- waku/node/waku_node.nim | 68 ++++---- waku/waku_api/rest/builder.nim | 22 ++- waku/waku_api/rest/relay/handlers.nim | 22 ++- waku/waku_relay/protocol.nim | 47 ++++-- 35 files changed, 596 insertions(+), 251 deletions(-) diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 3723291e3..b28724357 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -381,7 +381,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = if conf.relay: let shards = conf.shards.mapIt(RelayShard(clusterId: conf.clusterId, shardId: uint16(it))) - await node.mountRelay(shards) + (await node.mountRelay(shards)).isOkOr: + echo "failed to mount relay: " & error + return await node.mountLibp2pPing() @@ -535,7 +537,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = node.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic), some(WakuRelayHandler(handler)) - ) + ).isOkOr: + error "failed to subscribe to pubsub topic", + topic = DefaultPubsubTopic, error = error if conf.rlnRelay: info "WakuRLNRelay is enabled" diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 96782360b..7a7a5d08f 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -215,7 +215,10 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = # Always mount relay for bridge # `triggerSelf` is false on a `bridge` to avoid duplicates - await cmb.nodev2.mountRelay() + (await cmb.nodev2.mountRelay()).isOkOr: + error "failed to mount relay", error = error + return + cmb.nodev2.wakuRelay.triggerSelf = false # Bridging @@ -229,7 +232,9 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = except: error "exception in relayHandler: " & getCurrentExceptionMsg() - cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) + cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr: + error "failed to subscribe to relay", topic = DefaultPubsubTopic, error = error + return proc stop*(cmb: Chat2MatterBridge) {.async: (raises: [Exception]).} = info "Stopping Chat2MatterBridge" diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index 2861c85ae..d5d7e6bcf 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -554,7 +554,9 @@ proc subscribeAndHandleMessages( else: msgPerContentTopic[msg.contentTopic] = 1 - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler))) + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler))).isOkOr: + error "failed to subscribe to pubsub topic", pubsubTopic, error + quit(1) when isMainModule: # known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError @@ -619,7 +621,10 @@ when isMainModule: let (node, discv5) = nodeRes.get() - waitFor node.mountRelay() + (waitFor node.mountRelay()).isOkOr: + error "failed to mount waku relay protocol: ", err = error + quit 1 + waitFor node.mountLibp2pPing() var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} = diff --git a/examples/publisher.nim b/examples/publisher.nim index 5b1ca9f18..907ce2274 100644 --- a/examples/publisher.nim +++ b/examples/publisher.nim @@ -86,7 +86,10 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = ) await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + error "failed to mount relay", error = error + quit(1) + node.peerManager.start() (await wakuDiscv5.start()).isOkOr: diff --git a/examples/subscriber.nim b/examples/subscriber.nim index 90440aabc..633bfa4ca 100644 --- a/examples/subscriber.nim +++ b/examples/subscriber.nim @@ -84,7 +84,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = ) await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + error "failed to mount relay", error = error + quit(1) node.peerManager.start() (await wakuDiscv5.start()).isOkOr: @@ -118,7 +120,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = contentTopic = msg.contentTopic, timestamp = msg.timestamp - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler))) + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(WakuRelayHandler(handler))).isOkOr: + error "failed to subscribe to pubsub topic", pubsubTopic, error + quit(1) when isMainModule: let rng = crypto.newRng() diff --git a/examples/wakustealthcommitments/stealth_commitment_protocol.nim b/examples/wakustealthcommitments/stealth_commitment_protocol.nim index c6e6d6b9c..7da6bff56 100644 --- a/examples/wakustealthcommitments/stealth_commitment_protocol.nim +++ b/examples/wakustealthcommitments/stealth_commitment_protocol.nim @@ -187,5 +187,7 @@ proc new*( except CatchableError: error "could not handle SCP message: ", err = getCurrentExceptionMsg() - waku.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler)) + waku.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler)).isOkOr: + error "could not subscribe to pubsub topic: ", err = $error + return err("could not subscribe to pubsub topic: " & $error) return ok(SCP) diff --git a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim index 97f01488a..6a437122a 100644 --- a/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/protocols/relay_request.nim @@ -7,6 +7,7 @@ import ../../../../../waku/waku_core/message, ../../../../../waku/waku_core/time, # Timestamp ../../../../../waku/waku_core/topics/pubsub_topic, + ../../../../../waku/waku_core/topics, ../../../../../waku/waku_relay/protocol, ../../../../../waku/node/peer_manager, ../../../../alloc @@ -108,12 +109,18 @@ proc process*( case self.operation of SUBSCRIBE: - # TO DO: properly perform 'subscribe' - waku.node.registerRelayDefaultHandler($self.pubsubTopic) - discard waku.node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback) + waku.node.subscribe( + (kind: SubscriptionKind.PubsubSub, topic: $self.pubsubTopic), + handler = some(self.relayEventCallback), + ).isOkOr: + let errorMsg = "Subscribe failed:" & $error + error "SUBSCRIBE failed", error = errorMsg + return err(errorMsg) of UNSUBSCRIBE: - # TODO: properly perform 'unsubscribe' - waku.node.wakuRelay.unsubscribeAll($self.pubsubTopic) + waku.node.unsubscribe((kind: SubscriptionKind.PubsubSub, topic: $self.pubsubTopic)).isOkOr: + let errorMsg = "Unsubscribe failed:" & $error + error "UNSUBSCRIBE failed", error = errorMsg + return err(errorMsg) of PUBLISH: let msg = self.message.toWakuMessage() let pubsubTopic = $self.pubsubTopic diff --git a/tests/node/peer_manager/test_peer_manager.nim b/tests/node/peer_manager/test_peer_manager.nim index 57acf13df..6eddda0d6 100644 --- a/tests/node/peer_manager/test_peer_manager.nim +++ b/tests/node/peer_manager/test_peer_manager.nim @@ -76,8 +76,10 @@ suite "Peer Manager": # And both mount metadata and relay discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic - await client.mountRelay() - await server.mountRelay() + (await client.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" # And both nodes are started await allFutures(server.start(), client.start()) @@ -89,7 +91,8 @@ suite "Peer Manager": await sleepAsync(FUTURE_TIMEOUT) # When making an operation that triggers onPeerMetadata - client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic")) + client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic")).isOkOr: + assert false, "Failed to subscribe to relay" await sleepAsync(FUTURE_TIMEOUT) check: @@ -109,8 +112,10 @@ suite "Peer Manager": # And both mount metadata and relay discard client.mountMetadata(0) # clusterId irrelevant, overridden by topic discard server.mountMetadata(0) # clusterId irrelevant, overridden by topic - await client.mountRelay() - await server.mountRelay() + (await client.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" # And both nodes are started await allFutures(server.start(), client.start()) @@ -122,7 +127,8 @@ suite "Peer Manager": await sleepAsync(FUTURE_TIMEOUT) # When making an operation that triggers onPeerMetadata - client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic")) + client.subscribe((kind: SubscriptionKind.PubsubSub, topic: "newTopic")).isOkOr: + assert false, "Failed to subscribe to relay" await sleepAsync(FUTURE_TIMEOUT) check: diff --git a/tests/node/test_wakunode_filter.nim b/tests/node/test_wakunode_filter.nim index 83c486a7e..bf9f2495b 100644 --- a/tests/node/test_wakunode_filter.nim +++ b/tests/node/test_wakunode_filter.nim @@ -135,7 +135,8 @@ suite "Waku Filter - End to End": asyncTest "Client Node can't receive Push from Server Node, via Relay": # Given the server node has Relay enabled - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "error mounting relay: " & $error # And valid filter subscription let subscribeResponse = await client.filterSubscribe( @@ -159,7 +160,8 @@ suite "Waku Filter - End to End": server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(0)) await server.start() - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "error mounting relay: " & $error let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() @@ -222,7 +224,8 @@ suite "Waku Filter - End to End": pushedMsg == msg asyncTest "Filter Client Node can't receive messages after subscribing and restarting, via Relay": - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "error mounting relay: " & $error # Given a valid filter subscription let subscribeResponse = await client.filterSubscribe( diff --git a/tests/node/test_wakunode_legacy_lightpush.nim b/tests/node/test_wakunode_legacy_lightpush.nim index 4ff9c7f00..e19d29c64 100644 --- a/tests/node/test_wakunode_legacy_lightpush.nim +++ b/tests/node/test_wakunode_legacy_lightpush.nim @@ -52,7 +52,9 @@ suite "Waku Legacy Lightpush - End To End": await allFutures(server.start(), client.start()) await server.start() - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + await server.mountLegacyLightpush() # without rln-relay client.mountLegacyLightpushClient() @@ -142,7 +144,8 @@ suite "RLN Proofs as a Lightpush Service": await allFutures(server.start(), client.start()) await server.start() - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await server.mountRlnRelay(wakuRlnConfig) await server.mountLegacyLightPush() client.mountLegacyLightPushClient() @@ -187,8 +190,10 @@ suite "Waku Legacy Lightpush message delivery": await allFutures(destNode.start(), bridgeNode.start(), lightNode.start()) - await destNode.mountRelay(@[DefaultRelayShard]) - await bridgeNode.mountRelay(@[DefaultRelayShard]) + (await destNode.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" + (await bridgeNode.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" await bridgeNode.mountLegacyLightPush() lightNode.mountLegacyLightPushClient() @@ -199,24 +204,25 @@ suite "Waku Legacy Lightpush message delivery": await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()]) ## Given + const CustomPubsubTopic = "/waku/2/rs/0/1" let message = fakeWakuMessage() - var completionFutRelay = newFuture[bool]() proc relayHandler( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == DefaultPubsubTopic + topic == CustomPubsubTopic msg == message completionFutRelay.complete(true) - destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) + destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to topic:" & $error # Wait for subscription to take effect await sleepAsync(100.millis) ## When - let res = await lightNode.legacyLightpushPublish(some(DefaultPubsubTopic), message) + let res = await lightNode.legacyLightpushPublish(some(CustomPubsubTopic), message) assert res.isOk(), $res.error ## Then diff --git a/tests/node/test_wakunode_lightpush.nim b/tests/node/test_wakunode_lightpush.nim index 2e785e368..72e9b8bf3 100644 --- a/tests/node/test_wakunode_lightpush.nim +++ b/tests/node/test_wakunode_lightpush.nim @@ -46,7 +46,8 @@ suite "Waku Lightpush - End To End": await allFutures(server.start(), client.start()) await server.start() - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await server.mountLightpush() # without rln-relay client.mountLightpushClient() @@ -137,7 +138,8 @@ suite "RLN Proofs as a Lightpush Service": await allFutures(server.start(), client.start()) await server.start() - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await server.mountRlnRelay(wakuRlnConfig) await server.mountLightPush() client.mountLightPushClient() @@ -182,8 +184,10 @@ suite "Waku Lightpush message delivery": await allFutures(destNode.start(), bridgeNode.start(), lightNode.start()) - await destNode.mountRelay(@[DefaultRelayShard]) - await bridgeNode.mountRelay(@[DefaultRelayShard]) + (await destNode.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" + (await bridgeNode.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" await bridgeNode.mountLightPush() lightNode.mountLightPushClient() @@ -194,6 +198,7 @@ suite "Waku Lightpush message delivery": await destNode.connectToNodes(@[bridgeNode.peerInfo.toRemotePeerInfo()]) ## Given + const CustomPubsubTopic = "/waku/2/rs/0/1" let message = fakeWakuMessage() var completionFutRelay = newFuture[bool]() @@ -201,17 +206,18 @@ suite "Waku Lightpush message delivery": topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = check: - topic == DefaultPubsubTopic + topic == CustomPubsubTopic msg == message completionFutRelay.complete(true) - destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) + destNode.subscribe((kind: PubsubSub, topic: CustomPubsubTopic), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to relay" # Wait for subscription to take effect await sleepAsync(100.millis) ## When - let res = await lightNode.lightpushPublish(some(DefaultPubsubTopic), message) + let res = await lightNode.lightpushPublish(some(CustomPubsubTopic), message) assert res.isOk(), $res.error assert res.get() == 1, "Expected to relay the message to 1 node" diff --git a/tests/node/test_wakunode_peer_manager.nim b/tests/node/test_wakunode_peer_manager.nim index e37b3e108..88fcc827f 100644 --- a/tests/node/test_wakunode_peer_manager.nim +++ b/tests/node/test_wakunode_peer_manager.nim @@ -308,7 +308,8 @@ suite "Peer Manager": asyncTest "Peer Protocol Support Verification (Before Connection)": # Given the server has mounted some Waku protocols - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await server.mountFilter() # When connecting to the server @@ -335,7 +336,8 @@ suite "Peer Manager": server2RemotePeerInfo = server2.switch.peerInfo.toRemotePeerInfo() server2PeerId = server2RemotePeerInfo.peerId - await server2.mountRelay() + (await server2.mountRelay()).isOkOr: + assert false, "Failed to mount relay" # When connecting to both servers await client.connectToNodes(@[serverRemotePeerInfo, server2RemotePeerInfo]) @@ -533,8 +535,10 @@ suite "Peer Manager": suite "Peer Connectivity States": asyncTest "State Tracking & Transition": # Given two correctly initialised nodes, but not connected - await server.mountRelay() - await client.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + (await client.mountRelay()).isOkOr: + assert false, "Failed to mount relay" # Then their connectedness should be NotConnected check: @@ -587,8 +591,10 @@ suite "Peer Manager": suite "Automatic Reconnection": asyncTest "Automatic Reconnection Implementation": # Given two correctly initialised nodes, that are available for reconnection - await server.mountRelay() - await client.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + (await client.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await client.connectToNodes(@[serverRemotePeerInfo]) waitActive: @@ -810,7 +816,8 @@ suite "Mount Order": serverKey = generateSecp256k1Key() server = newTestWakuNode(serverKey, listenIp, listenPort) - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await server.start() let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() @@ -834,7 +841,8 @@ suite "Mount Order": serverKey = generateSecp256k1Key() server = newTestWakuNode(serverKey, listenIp, listenPort) - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() serverPeerId = serverRemotePeerInfo.peerId @@ -859,7 +867,8 @@ suite "Mount Order": server = newTestWakuNode(serverKey, listenIp, listenPort) await server.start() - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() serverPeerId = serverRemotePeerInfo.peerId @@ -886,7 +895,8 @@ suite "Mount Order": let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() serverPeerId = serverRemotePeerInfo.peerId - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" # When connecting to the server await client.connectToNodes(@[serverRemotePeerInfo]) @@ -910,7 +920,8 @@ suite "Mount Order": serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() serverPeerId = serverRemotePeerInfo.peerId await server.start() - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" # When connecting to the server await client.connectToNodes(@[serverRemotePeerInfo]) @@ -932,7 +943,8 @@ suite "Mount Order": let serverRemotePeerInfo = server.switch.peerInfo.toRemotePeerInfo() serverPeerId = serverRemotePeerInfo.peerId - await server.mountRelay() + (await server.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await server.start() # When connecting to the server diff --git a/tests/node/test_wakunode_relay_rln.nim b/tests/node/test_wakunode_relay_rln.nim index 0bf608d12..45dc6ce37 100644 --- a/tests/node/test_wakunode_relay_rln.nim +++ b/tests/node/test_wakunode_relay_rln.nim @@ -263,7 +263,9 @@ suite "Waku RlnRelay - End to End - Static": completionFut.complete((topic, msg)) let subscriptionEvent = (kind: PubsubSub, topic: pubsubTopic) - server.subscribe(subscriptionEvent, some(relayHandler)) + server.subscribe(subscriptionEvent, some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to pubsub topic" + await sleepAsync(FUTURE_TIMEOUT) # Generate Messages @@ -357,7 +359,9 @@ suite "Waku RlnRelay - End to End - Static": completionFut.complete((topic, msg)) let subscriptionEvent = (kind: PubsubSub, topic: pubsubTopic) - server.subscribe(subscriptionEvent, some(relayHandler)) + server.subscribe(subscriptionEvent, some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to pubsub topic" + await sleepAsync(FUTURE_TIMEOUT) # Generate Messages diff --git a/tests/test_peer_manager.nim b/tests/test_peer_manager.nim index d79c6b991..1c2805710 100644 --- a/tests/test_peer_manager.nim +++ b/tests/test_peer_manager.nim @@ -282,8 +282,10 @@ procSuite "Peer Manager": await node1.start() await node2.start() - await node1.mountRelay() - await node2.mountRelay() + (await node1.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + (await node2.mountRelay()).isOkOr: + assert false, "Failed to mount relay" let peerInfo2 = node2.switch.peerInfo var remotePeerInfo2 = peerInfo2.toRemotePeerInfo() @@ -323,7 +325,8 @@ procSuite "Peer Manager": node3.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) node3.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == NotConnected - await node3.mountRelay() + (await node3.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await node3.peerManager.connectToRelayPeers() @@ -352,8 +355,10 @@ procSuite "Peer Manager": await node1.start() await node2.start() - await node1.mountRelay() - await node2.mountRelay() + (await node1.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + (await node2.mountRelay()).isOkOr: + assert false, "Failed to mount relay" let peerInfo2 = node2.switch.peerInfo var remotePeerInfo2 = peerInfo2.toRemotePeerInfo() @@ -393,7 +398,8 @@ procSuite "Peer Manager": node3.peerManager.switch.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId) node3.peerManager.switch.peerStore.connectedness(peerInfo2.peerId) == NotConnected - await node3.mountRelay() + (await node3.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await node3.peerManager.manageRelayPeers() @@ -482,9 +488,11 @@ procSuite "Peer Manager": await node1.start() await node2.start() - await node1.mountRelay() + (await node1.mountRelay()).isOkOr: + assert false, "Failed to mount relay" node1.wakuRelay.codec = betaCodec - await node2.mountRelay() + (await node2.mountRelay()).isOkOr: + assert false, "Failed to mount relay" node2.wakuRelay.codec = betaCodec require: @@ -506,7 +514,8 @@ procSuite "Peer Manager": peerStorage = storage, ) - await node3.mountRelay() + (await node3.mountRelay()).isOkOr: + assert false, "Failed to mount relay" node3.wakuRelay.codec = stableCodec check: # Node 2 and 3 have differing codecs diff --git a/tests/test_relay_peer_exchange.nim b/tests/test_relay_peer_exchange.nim index e950cb015..a729ff1a7 100644 --- a/tests/test_relay_peer_exchange.nim +++ b/tests/test_relay_peer_exchange.nim @@ -23,8 +23,10 @@ procSuite "Relay (GossipSub) Peer Exchange": newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true) # When both client and server mount relay without a handler - await node1.mountRelay(@[DefaultRelayShard]) - await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler)) + (await node1.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" + (await node2.mountRelay(@[DefaultRelayShard], none(RoutingRecordsHandler))).isOkOr: + assert false, "Failed to mount relay" # Then the relays are mounted without a handler check: @@ -73,9 +75,12 @@ procSuite "Relay (GossipSub) Peer Exchange": peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler # Givem the nodes mount relay with a peer exchange handler - await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle)) - await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle)) - await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle)) + (await node1.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))).isOkOr: + assert false, "Failed to mount relay" + (await node2.mountRelay(@[DefaultRelayShard], some(emptyPeerExchangeHandle))).isOkOr: + assert false, "Failed to mount relay" + (await node3.mountRelay(@[DefaultRelayShard], some(peerExchangeHandle))).isOkOr: + assert false, "Failed to mount relay" # Ensure that node1 prunes all peers after the first connection node1.wakuRelay.parameters.dHigh = 1 diff --git a/tests/test_waku_dnsdisc.nim b/tests/test_waku_dnsdisc.nim index fe29627d4..7028b20eb 100644 --- a/tests/test_waku_dnsdisc.nim +++ b/tests/test_waku_dnsdisc.nim @@ -37,9 +37,12 @@ suite "Waku DNS Discovery": node3 = newTestWakuNode(nodeKey3, bindIp, Port(63503)) enr3 = node3.enr - await node1.mountRelay() - await node2.mountRelay() - await node3.mountRelay() + (await node1.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + (await node2.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + (await node3.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await allFutures([node1.start(), node2.start(), node3.start()]) # Build and sign tree @@ -75,7 +78,8 @@ suite "Waku DNS Discovery": nodeKey4 = generateSecp256k1Key() node4 = newTestWakuNode(nodeKey4, bindIp, Port(63504)) - await node4.mountRelay() + (await node4.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await node4.start() var wakuDnsDisc = WakuDnsDiscovery.init(location, resolver).get() diff --git a/tests/test_waku_keepalive.nim b/tests/test_waku_keepalive.nim index c961773e5..d4d05ad97 100644 --- a/tests/test_waku_keepalive.nim +++ b/tests/test_waku_keepalive.nim @@ -31,11 +31,13 @@ suite "Waku Keepalive": completionFut.complete(true) await node1.start() - await node1.mountRelay() + (await node1.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await node1.mountLibp2pPing() await node2.start() - await node2.mountRelay() + (await node2.mountRelay()).isOkOr: + assert false, "Failed to mount relay" let pingProto = Ping.new(handler = pingHandler) await pingProto.start() diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim index df4b442d6..51dd999b0 100644 --- a/tests/test_wakunode.nim +++ b/tests/test_wakunode.nim @@ -34,13 +34,15 @@ suite "WakuNode": # Setup node 1 with stable codec "/vac/waku/relay/2.0.0" await node1.start() - await node1.mountRelay(@[shard]) + (await node1.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" node1.wakuRelay.codec = "/vac/waku/relay/2.0.0" # Setup node 2 with beta codec "/vac/waku/relay/2.0.0-beta2" await node2.start() - await node2.mountRelay(@[shard]) + (await node2.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" node2.wakuRelay.codec = "/vac/waku/relay/2.0.0-beta2" check: @@ -61,7 +63,14 @@ suite "WakuNode": msg.payload == payload completionFut.complete(true) - node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node2.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr: + assert false, "Failed to unsubscribe from topic: " & $error + + ## Subscribe to the relay topic to add the custom relay handler defined above + node2.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to topic" await sleepAsync(2000.millis) var res = await node1.publish(some($shard), message) @@ -92,8 +101,10 @@ suite "WakuNode": node2PeerId = $(node2.switch.peerInfo.peerId) node2Dns4Addr = "/dns4/localhost/tcp/61022/p2p/" & node2PeerId - await node1.mountRelay() - await node2.mountRelay() + (await node1.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + (await node2.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await allFutures([node1.start(), node2.start()]) @@ -117,7 +128,8 @@ suite "WakuNode": # Initialize and start node1 await node1.start() - await node1.mountRelay() + (await node1.mountRelay()).isOkOr: + assert false, "Failed to mount relay" # Create an array to hold the other nodes var otherNodes: seq[WakuNode] = @[] @@ -129,7 +141,8 @@ suite "WakuNode": port = 60012 + i * 2 # Ensure unique ports for each node node = newTestWakuNode(nodeKey, parseIpAddress("127.0.0.1"), Port(port)) await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" otherNodes.add(node) # Connect all other nodes to node1 @@ -296,10 +309,12 @@ suite "WakuNode": node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(61016)) await node1.start() - await node1.mountRelay() + (await node1.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await node2.start() - await node2.mountRelay() + (await node2.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) await node2.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()]) @@ -337,10 +352,12 @@ suite "WakuNode": node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(61020)) await node1.start() - await node1.mountRelay() + (await node1.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await node2.start() - await node2.mountRelay() + (await node2.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) await node2.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()]) diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index 8e028acdc..5d5ce8458 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -30,7 +30,8 @@ suite "WakuNode - Relay": # Relay protocol starts if mounted after node start await node1.start() - await node1.mountRelay() + (await node1.mountRelay()).isOkOr: + assert false, "Failed to mount relay" check: GossipSub(node1.wakuRelay).heartbeatFut.isNil() == false @@ -41,7 +42,8 @@ suite "WakuNode - Relay": nodeKey2 = generateSecp256k1Key() node2 = newTestWakuNode(nodeKey2, parseIpAddress("0.0.0.0"), Port(0)) - await node2.mountRelay() + (await node2.mountRelay()).isOkOr: + assert false, "Failed to mount relay" check: # Relay has not yet started as node has not yet started @@ -69,13 +71,16 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[shard]) + (await node1.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node2.start() - await node2.mountRelay(@[shard]) + (await node2.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node3.start() - await node3.mountRelay(@[shard]) + (await node3.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await allFutures( node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]), @@ -93,7 +98,14 @@ suite "WakuNode - Relay": msg.timestamp > 0 completionFut.complete(true) - node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node3.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr: + assert false, "Failed to unsubscribe from topic: " & $error + + ## Subscribe to the relay topic to add the custom relay handler defined above + node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) var res = await node1.publish(some($shard), message) @@ -136,13 +148,16 @@ suite "WakuNode - Relay": # start all the nodes await node1.start() - await node1.mountRelay(@[shard]) + (await node1.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node2.start() - await node2.mountRelay(@[shard]) + (await node2.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node3.start() - await node3.mountRelay(@[shard]) + (await node3.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -179,7 +194,14 @@ suite "WakuNode - Relay": # relay handler is called completionFut.complete(true) - node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node3.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr: + assert false, "Failed to unsubscribe from topic: " & $error + + ## Subscribe to the relay topic to add the custom relay handler defined above + node3.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) var res = await node1.publish(some($shard), message1) @@ -221,7 +243,8 @@ suite "WakuNode - Relay": connOk == true # Node 1 subscribes to topic - nodes[1].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + nodes[1].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) # Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes) @@ -265,10 +288,12 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[shard]) + (await node1.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node2.start() - await node2.mountRelay(@[shard]) + (await node2.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -283,7 +308,14 @@ suite "WakuNode - Relay": msg.timestamp > 0 completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node1.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr: + assert false, "Failed to unsubscribe from topic: " & $error + + ## Subscribe to the relay topic to add the custom relay handler defined above + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) let res = await node2.publish(some($shard), message) @@ -314,10 +346,12 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[shard]) + (await node1.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node2.start() - await node2.mountRelay(@[shard]) + (await node2.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -332,7 +366,14 @@ suite "WakuNode - Relay": msg.timestamp > 0 completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node1.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr: + assert false, "Failed to unsubscribe from topic: " & $error + + ## Subscribe to the relay topic to add the custom relay handler defined above + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) let res = await node2.publish(some($shard), message) @@ -363,10 +404,12 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[shard]) + (await node1.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node2.start() - await node2.mountRelay(@[shard]) + (await node2.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" #delete websocket peer address # TODO: a better way to find the index - this is too brittle @@ -385,7 +428,14 @@ suite "WakuNode - Relay": msg.timestamp > 0 completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node1.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr: + assert false, "Failed to unsubscribe from topic: " & $error + + ## Subscribe to the relay topic to add the custom relay handler defined above + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) let res = await node2.publish(some($shard), message) @@ -418,10 +468,12 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[shard]) + (await node1.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node2.start() - await node2.mountRelay(@[shard]) + (await node2.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -436,7 +488,14 @@ suite "WakuNode - Relay": msg.timestamp > 0 completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node1.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr: + assert false, "Failed to unsubscribe from topic: " & $error + + ## Subscribe to the relay topic to add the custom relay handler defined above + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) let res = await node2.publish(some($shard), message) @@ -477,10 +536,12 @@ suite "WakuNode - Relay": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[shard]) + (await node1.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node2.start() - await node2.mountRelay(@[shard]) + (await node2.mountRelay(@[shard])).isOkOr: + assert false, "Failed to mount relay" await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) @@ -495,7 +556,14 @@ suite "WakuNode - Relay": msg.timestamp > 0 completionFut.complete(true) - node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node1.unsubscribe((kind: PubsubUnsub, topic: $shard)).isOkOr: + assert false, "Failed to unsubscribe from topic: " & $error + + ## Subscribe to the relay topic to add the custom relay handler defined above + node1.subscribe((kind: PubsubSub, topic: $shard), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error await sleepAsync(500.millis) let res = await node2.publish(some($shard), message) @@ -564,14 +632,15 @@ suite "WakuNode - Relay": # Stop all nodes await allFutures(nodes.mapIt(it.stop())) - asyncTest "Unsubscribe keep the subscription if other content topics also use the shard": + asyncTest "Only one subscription is allowed for contenttopics that generate the same shard": ## Setup let nodeKey = generateSecp256k1Key() node = newTestWakuNode(nodeKey, parseIpAddress("0.0.0.0"), Port(0)) await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" require node.mountSharding(1, 1).isOk ## Given @@ -593,19 +662,19 @@ suite "WakuNode - Relay": "topic must use the same shard" ## When - node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler)) - node.subscribe((kind: ContentSub, topic: contentTopicB), some(handler)) - node.subscribe((kind: ContentSub, topic: contentTopicC), some(handler)) + node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error + node.subscribe((kind: ContentSub, topic: contentTopicB), some(handler)).isErrOr: + assert false, + "The subscription should fail because is already subscribe to that shard" + node.subscribe((kind: ContentSub, topic: contentTopicC), some(handler)).isErrOr: + assert false, + "The subscription should fail because is already subscribe to that shard" ## Then - node.unsubscribe((kind: ContentUnsub, topic: contentTopicB)) + node.unsubscribe((kind: ContentUnsub, topic: contentTopicB)).isOkOr: + assert false, "Failed to unsubscribe to topic: " & $error check node.wakuRelay.isSubscribed(shard) - node.unsubscribe((kind: ContentUnsub, topic: contentTopicA)) - check node.wakuRelay.isSubscribed(shard) - - node.unsubscribe((kind: ContentUnsub, topic: contentTopicC)) - check not node.wakuRelay.isSubscribed(shard) - ## Cleanup await node.stop() diff --git a/tests/waku_relay/utils.nim b/tests/waku_relay/utils.nim index 3e39294a1..821881f4c 100644 --- a/tests/waku_relay/utils.nim +++ b/tests/waku_relay/utils.nim @@ -5,6 +5,7 @@ import stew/byteutils, stew/shims/net as stewNet, chronos, + chronicles, libp2p/switch, libp2p/protocols/pubsub/pubsub @@ -50,12 +51,6 @@ proc setupRln*(node: WakuNode, identifier: uint) {.async.} = ) ) -proc setupRelayWithRln*( - node: WakuNode, identifier: uint, shards: seq[RelayShard] -) {.async.} = - await node.mountRelay(shards) - await setupRln(node, identifier) - proc subscribeToContentTopicWithHandler*( node: WakuNode, contentTopic: string ): Future[bool] = @@ -66,7 +61,9 @@ proc subscribeToContentTopicWithHandler*( if topic == topic: completionFut.complete(true) - node.subscribe((kind: ContentSub, topic: contentTopic), some(relayHandler)) + (node.subscribe((kind: ContentSub, topic: contentTopic), some(relayHandler))).isOkOr: + error "Failed to subscribe to content topic", error + completionFut.complete(true) return completionFut proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bool] = @@ -77,7 +74,9 @@ proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bo if topic == pubsubTopic: completionFut.complete(true) - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + (node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))).isOkOr: + error "Failed to subscribe to pubsub topic", error + completionFut.complete(false) return completionFut proc sendRlnMessage*( diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index 2a0fd5779..bd8edfcd3 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -58,7 +58,8 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - await node1.mountRelay(@[DefaultRelayShard]) + (await node1.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode let wakuRlnConfig1 = WakuRlnConfig( @@ -74,7 +75,8 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultRelayShard]) + (await node2.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( rlnRelayDynamic: false, @@ -89,7 +91,8 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultRelayShard]) + (await node3.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig3 = WakuRlnConfig( rlnRelayDynamic: false, @@ -115,8 +118,14 @@ procSuite "WakuNode - RLN relay": if topic == DefaultPubsubTopic: completionFut.complete(true) - # mount the relay handler - node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr: + assert false, "Failed to unsubscribe from topic: " & $error + + ## Subscribe to the relay topic to add the custom relay handler defined above + node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to pubsub topic: " & $error await sleepAsync(2000.millis) # prepare the message payload @@ -126,6 +135,11 @@ procSuite "WakuNode - RLN relay": var message = WakuMessage(payload: @payload, contentTopic: contentTopic) doAssert(node1.wakuRlnRelay.unsafeAppendRLNProof(message, epochTime()).isOk()) + debug "Nodes participating in the test", + node1 = shortLog(node1.switch.peerInfo.peerId), + node2 = shortLog(node2.switch.peerInfo.peerId), + node3 = shortLog(node3.switch.peerInfo.peerId) + ## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn ## verifies the rate limit proof of the message and relays the message to node3 ## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc @@ -187,9 +201,18 @@ procSuite "WakuNode - RLN relay": elif topic == $shards[1]: rxMessagesTopic2 = rxMessagesTopic2 + 1 + ## This unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + nodes[2].unsubscribe((kind: PubsubUnsub, topic: $shards[0])).isOkOr: + assert false, "Failed to unsubscribe to pubsub topic: " & $error + nodes[2].unsubscribe((kind: PubsubUnsub, topic: $shards[1])).isOkOr: + assert false, "Failed to unsubscribe to pubsub topic: " & $error + # mount the relay handlers - nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), some(relayHandler)) - nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), some(relayHandler)) + nodes[2].subscribe((kind: PubsubSub, topic: $shards[0]), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to pubsub topic: " & $error + nodes[2].subscribe((kind: PubsubSub, topic: $shards[1]), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to pubsub topic: " & $error await sleepAsync(1000.millis) # generate some messages with rln proofs first. generating @@ -250,7 +273,8 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - await node1.mountRelay(@[DefaultRelayShard]) + (await node1.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode let wakuRlnConfig1 = WakuRlnConfig( @@ -266,7 +290,8 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultRelayShard]) + (await node2.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( rlnRelayDynamic: false, @@ -281,7 +306,8 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultRelayShard]) + (await node3.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig3 = WakuRlnConfig( rlnRelayDynamic: false, @@ -307,8 +333,14 @@ procSuite "WakuNode - RLN relay": if topic == DefaultPubsubTopic: completionFut.complete(true) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr: + assert false, "Failed to unsubscribe to pubsub topic: " & $error + # mount the relay handler - node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) + node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to pubsub topic: " & $error await sleepAsync(2000.millis) # prepare the message payload @@ -366,7 +398,8 @@ procSuite "WakuNode - RLN relay": # set up three nodes # node1 - await node1.mountRelay(@[DefaultRelayShard]) + (await node1.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode let wakuRlnConfig1 = WakuRlnConfig( @@ -382,7 +415,8 @@ procSuite "WakuNode - RLN relay": await node1.start() # node 2 - await node2.mountRelay(@[DefaultRelayShard]) + (await node2.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode let wakuRlnConfig2 = WakuRlnConfig( @@ -397,7 +431,8 @@ procSuite "WakuNode - RLN relay": await node2.start() # node 3 - await node3.mountRelay(@[DefaultRelayShard]) + (await node3.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" # mount rlnrelay in off-chain mode let wakuRlnConfig3 = WakuRlnConfig( @@ -456,8 +491,14 @@ procSuite "WakuNode - RLN relay": if msg.payload == wm4.payload: completionFut4.complete(true) + ## The following unsubscription is necessary to remove the default relay handler, which is + ## added when mountRelay is called. + node3.unsubscribe((kind: PubsubUnsub, topic: DefaultPubsubTopic)).isOkOr: + assert false, "Failed to unsubscribe to pubsub topic: " & $error + # mount the relay handler for node3 - node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) + node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to pubsub topic: " & $error await sleepAsync(2000.millis) ## node1 publishes and relays 4 messages to node2 @@ -500,12 +541,15 @@ procSuite "WakuNode - RLN relay": epochSizeSec: uint64 = 5 # This means rlnMaxEpochGap = 4 # Given both nodes mount relay and rlnrelay - await node1.mountRelay(shardSeq) + (await node1.mountRelay(shardSeq)).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10") - await node1.mountRlnRelay(wakuRlnConfig1) + (await node1.mountRlnRelay(wakuRlnConfig1)).isOkOr: + assert false, "Failed to mount rlnrelay" # Mount rlnrelay in node2 in off-chain mode - await node2.mountRelay(@[DefaultRelayShard]) + (await node2.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11") await node2.mountRlnRelay(wakuRlnConfig2) @@ -548,7 +592,8 @@ procSuite "WakuNode - RLN relay": if msg == wm6: completionFut6.complete(true) - node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) + node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)).isOkOr: + assert false, "Failed to subscribe to pubsub topic: " & $error # Given all messages have an rln proof and are published by the node 1 let publishSleepDuration: Duration = 5000.millis @@ -638,12 +683,14 @@ procSuite "WakuNode - RLN relay": # Given both nodes mount relay and rlnrelay # Mount rlnrelay in node1 in off-chain mode - await node1.mountRelay(shardSeq) + (await node1.mountRelay(shardSeq)).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig1 = buildWakuRlnConfig(1, epochSizeSec, "wakunode_10") await node1.mountRlnRelay(wakuRlnConfig1) # Mount rlnrelay in node2 in off-chain mode - await node2.mountRelay(@[DefaultRelayShard]) + (await node2.mountRelay(@[DefaultRelayShard])).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig2 = buildWakuRlnConfig(2, epochSizeSec, "wakunode_11") await node2.mountRlnRelay(wakuRlnConfig2) diff --git a/tests/waku_rln_relay/utils_static.nim b/tests/waku_rln_relay/utils_static.nim index d2a781fcd..719ce465c 100644 --- a/tests/waku_rln_relay/utils_static.nim +++ b/tests/waku_rln_relay/utils_static.nim @@ -5,6 +5,7 @@ import stew/byteutils, stew/shims/net as stewNet, chronos, + chronicles, libp2p/switch, libp2p/protocols/pubsub/pubsub @@ -45,7 +46,10 @@ proc subscribeCompletionHandler*(node: WakuNode, pubsubTopic: string): Future[bo if topic == pubsubTopic: completionFut.complete(true) - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)).isOkOr: + error "failed to subscribe to relay", topic = pubsubTopic, error = error + completionFut.complete(false) + return completionFut proc sendRlnMessage*( diff --git a/tests/wakunode_rest/test_rest_cors.nim b/tests/wakunode_rest/test_rest_cors.nim index fc32440d7..7d29711b1 100644 --- a/tests/wakunode_rest/test_rest_cors.nim +++ b/tests/wakunode_rest/test_rest_cors.nim @@ -102,7 +102,8 @@ suite "Waku v2 REST API CORS Handling": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -155,7 +156,8 @@ suite "Waku v2 REST API CORS Handling": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -211,7 +213,8 @@ suite "Waku v2 REST API CORS Handling": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -258,7 +261,8 @@ suite "Waku v2 REST API CORS Handling": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") diff --git a/tests/wakunode_rest/test_rest_debug.nim b/tests/wakunode_rest/test_rest_debug.nim index f4e66eb20..3129b3544 100644 --- a/tests/wakunode_rest/test_rest_debug.nim +++ b/tests/wakunode_rest/test_rest_debug.nim @@ -37,7 +37,8 @@ suite "Waku v2 REST API - Debug": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -66,7 +67,8 @@ suite "Waku v2 REST API - Debug": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index 358872769..556b6b52e 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -54,7 +54,9 @@ proc init(T: type RestFilterTest): Future[T] {.async.} = await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start()) - await testSetup.serviceNode.mountRelay() + (await testSetup.serviceNode.mountRelay()).isOkOr: + assert false, "Failed to mount relay: " & $error + await testSetup.serviceNode.mountFilter(messageCacheTTL = 1.seconds) await testSetup.subscriberNode.mountFilterClient() @@ -278,7 +280,8 @@ suite "Waku v2 Rest API - Filter V2": subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic) - restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error # When var requestBody = FilterSubscribeRequest( @@ -323,7 +326,8 @@ suite "Waku v2 Rest API - Filter V2": # setup filter service and client node let restFilterTest = await RestFilterTest.init() let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId - restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error let requestBody = FilterSubscribeRequest( requestId: "1001", @@ -394,7 +398,8 @@ suite "Waku v2 Rest API - Filter V2": # setup filter service and client node let restFilterTest = await RestFilterTest.init() let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId - restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + assert false, "Failed to subscribe to topic: " & $error let requestBody = FilterSubscribeRequest( requestId: "1001", diff --git a/tests/wakunode_rest/test_rest_health.nim b/tests/wakunode_rest/test_rest_health.nim index 7d842a3eb..ac2fd9eac 100644 --- a/tests/wakunode_rest/test_rest_health.nim +++ b/tests/wakunode_rest/test_rest_health.nim @@ -42,7 +42,8 @@ suite "Waku v2 REST API - health": let node = testWakuNode() let healthMonitor = WakuNodeHealthMonitor() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" healthMonitor.setOverallHealth(HealthStatus.INITIALIZING) diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim index 2c4ec0959..72e309a13 100644 --- a/tests/wakunode_rest/test_rest_lightpush.nim +++ b/tests/wakunode_rest/test_rest_lightpush.nim @@ -58,8 +58,10 @@ proc init( testSetup.consumerNode.start(), ) - await testSetup.consumerNode.mountRelay() - await testSetup.serviceNode.mountRelay() + (await testSetup.consumerNode.mountRelay()).isOkOr: + assert false, "Failed to mount relay: " & $error + (await testSetup.serviceNode.mountRelay()).isOkOr: + assert false, "Failed to mount relay: " & $error await testSetup.serviceNode.mountLightPush(rateLimit) testSetup.pushNode.mountLightPushClient() @@ -129,10 +131,13 @@ suite "Waku v2 Rest API - lightpush": restLightPushTest.consumerNode.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic) - ) + ).isOkOr: + assert false, "Failed to subscribe to relay: " & $error + restLightPushTest.serviceNode.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic) - ) + ).isOkOr: + assert false, "Failed to subscribe to relay: " & $error require: toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 @@ -161,7 +166,8 @@ suite "Waku v2 Rest API - lightpush": restLightPushTest.serviceNode.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic) - ) + ).isOkOr: + assert false, "Failed to subscribe to relay: " & $error require: toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 @@ -218,10 +224,13 @@ suite "Waku v2 Rest API - lightpush": restLightPushTest.consumerNode.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic) - ) + ).isOkOr: + assert false, "Failed to subscribe to relay: " & $error + restLightPushTest.serviceNode.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic) - ) + ).isOkOr: + assert false, "Failed to subscribe to relay: " & $error require: toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 diff --git a/tests/wakunode_rest/test_rest_lightpush_legacy.nim b/tests/wakunode_rest/test_rest_lightpush_legacy.nim index 8176aed7a..e1d6dca30 100644 --- a/tests/wakunode_rest/test_rest_lightpush_legacy.nim +++ b/tests/wakunode_rest/test_rest_lightpush_legacy.nim @@ -58,8 +58,10 @@ proc init( testSetup.consumerNode.start(), ) - await testSetup.consumerNode.mountRelay() - await testSetup.serviceNode.mountRelay() + (await testSetup.consumerNode.mountRelay()).isOkOr: + assert false, "Failed to mount relay" + (await testSetup.serviceNode.mountRelay()).isOkOr: + assert false, "Failed to mount relay" await testSetup.serviceNode.mountLegacyLightPush(rateLimit) testSetup.pushNode.mountLegacyLightPushClient() @@ -124,10 +126,13 @@ suite "Waku v2 Rest API - lightpush": restLightPushTest.consumerNode.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic) - ) + ).isOkOr: + assert false, "Failed to subscribe to topic" + restLightPushTest.serviceNode.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic) - ) + ).isOkOr: + assert false, "Failed to subscribe to topic" require: toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 @@ -156,7 +161,8 @@ suite "Waku v2 Rest API - lightpush": restLightPushTest.serviceNode.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic) - ) + ).isOkOr: + assert false, "Failed to subscribe to topic" require: toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 @@ -216,10 +222,13 @@ suite "Waku v2 Rest API - lightpush": restLightPushTest.consumerNode.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic) - ) + ).isOkOr: + assert false, "Failed to subscribe to topic" + restLightPushTest.serviceNode.subscribe( (kind: PubsubSub, topic: DefaultPubsubTopic) - ) + ).isOkOr: + assert false, "Failed to subscribe to topic" require: toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index 9732d114b..acfa05bab 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -41,7 +41,8 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -95,7 +96,8 @@ suite "Waku v2 Rest API - Relay": shard3 = RelayShard(clusterId: DefaultClusterId, shardId: 3) shard4 = RelayShard(clusterId: DefaultClusterId, shardId: 4) - await node.mountRelay(@[shard0, shard1, shard2, shard3]) + (await node.mountRelay(@[shard0, shard1, shard2, shard3, shard4])).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -144,7 +146,8 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -220,7 +223,8 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig = WakuRlnConfig( rlnRelayDynamic: false, rlnRelayCredIndex: some(1.uint), @@ -245,7 +249,8 @@ suite "Waku v2 Rest API - Relay": let client = newRestHttpClient(initTAddress(restAddress, restPort)) - node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + assert false, "Failed to subscribe to pubsub topic" require: toSeq(node.wakuRelay.subscribedTopics).len == 1 @@ -275,7 +280,8 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" require node.mountSharding(1, 8).isOk var restPort = Port(0) @@ -324,11 +330,13 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet() + restServer.start() restPort = restServer.httpServer.address.port # update with bound port for client use @@ -347,11 +355,18 @@ suite "Waku v2 Rest API - Relay": cache.contentSubscribe("/waku/2/default-contentY/proto") installRelayApiHandlers(restServer.router, node, cache) - restServer.start() # When let client = newRestHttpClient(initTAddress(restAddress, restPort)) - let response = await client.relayDeleteAutoSubscriptionsV1(contentTopics) + + var response = await client.relayPostAutoSubscriptionsV1(contentTopics) + + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + response = await client.relayDeleteAutoSubscriptionsV1(contentTopics) # Then check: @@ -373,7 +388,8 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -437,7 +453,8 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig = WakuRlnConfig( rlnRelayDynamic: false, rlnRelayCredIndex: some(1.uint), @@ -461,7 +478,8 @@ suite "Waku v2 Rest API - Relay": let client = newRestHttpClient(initTAddress(restAddress, restPort)) - node.subscribe((kind: ContentSub, topic: DefaultContentTopic)) + node.subscribe((kind: ContentSub, topic: DefaultContentTopic)).isOkOr: + assert false, "Failed to subscribe to content topic: " & $error require: toSeq(node.wakuRelay.subscribedTopics).len == 1 @@ -489,7 +507,8 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig = WakuRlnConfig( rlnRelayDynamic: false, rlnRelayCredIndex: some(1.uint), @@ -539,7 +558,8 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig = WakuRlnConfig( rlnRelayDynamic: false, rlnRelayCredIndex: some(1.uint), @@ -564,7 +584,8 @@ suite "Waku v2 Rest API - Relay": let client = newRestHttpClient(initTAddress(restAddress, restPort)) - node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + assert false, "Failed to subscribe to pubsub topic: " & $error require: toSeq(node.wakuRelay.subscribedTopics).len == 1 @@ -594,7 +615,8 @@ suite "Waku v2 Rest API - Relay": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + assert false, "Failed to mount relay" let wakuRlnConfig = WakuRlnConfig( rlnRelayDynamic: false, rlnRelayCredIndex: some(1.uint), @@ -619,7 +641,8 @@ suite "Waku v2 Rest API - Relay": let client = newRestHttpClient(initTAddress(restAddress, restPort)) - node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)).isOkOr: + assert false, "Failed to subscribe to pubsub topic: " & $error require: toSeq(node.wakuRelay.subscribedTopics).len == 1 diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index c31e3939c..d0631bfbf 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -86,7 +86,8 @@ procSuite "Waku Rest API - Store v3": asyncTest "invalid cursor": let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + error "failed to mount relay", error = error var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -165,7 +166,8 @@ procSuite "Waku Rest API - Store v3": asyncTest "Filter by start and end time": let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + error "failed to mount relay", error = error var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -330,7 +332,8 @@ procSuite "Waku Rest API - Store v3": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + error "failed to mount relay", error = error var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -403,7 +406,8 @@ procSuite "Waku Rest API - Store v3": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + error "failed to mount relay", error = error var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -492,7 +496,8 @@ procSuite "Waku Rest API - Store v3": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + error "failed to mount relay", error = error var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") @@ -548,7 +553,8 @@ procSuite "Waku Rest API - Store v3": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + (await node.mountRelay()).isOkOr: + error "failed to mount relay", error = error var restPort = Port(0) let restAddress = parseIpAddress("0.0.0.0") diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 3142ff766..78093c6cd 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -301,12 +301,12 @@ proc setupProtocols( debug "Setting max message size", num_bytes = parsedMaxMsgSize - try: + ( await mountRelay( node, shards, peerExchangeHandler = peerExchangeHandler, int(parsedMaxMsgSize) ) - except CatchableError: - return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg()) + ).isOkOr: + return err("failed to mount waku relay protocol: " & $error) # Add validation keys to protected topics var subscribedProtectedShards: seq[ProtectedShard] diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index ce86c3c57..bb8b6f9c3 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -256,7 +256,7 @@ proc mountStoreSync*( ## Waku relay -proc registerRelayDefaultHandler*(node: WakuNode, topic: PubsubTopic) = +proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = if node.wakuRelay.isSubscribed(topic): return @@ -301,30 +301,34 @@ proc registerRelayDefaultHandler*(node: WakuNode, topic: PubsubTopic) = proc subscribe*( node: WakuNode, subscription: SubscriptionEvent, handler = none(WakuRelayHandler) -) = +): Result[void, string] = ## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on ## this topic. WakuRelayHandler is a method that takes a topic and a Waku message. if node.wakuRelay.isNil(): error "Invalid API call to `subscribe`. WakuRelay not mounted." - return + return err("Invalid API call to `subscribe`. WakuRelay not mounted.") let (pubsubTopic, contentTopicOp) = case subscription.kind of ContentSub: let shard = node.wakuSharding.getShard((subscription.topic)).valueOr: error "Autosharding error", error = error - return + return err("Autosharding error: " & error) ($shard, some(subscription.topic)) of PubsubSub: (subscription.topic, none(ContentTopic)) else: - return + return err("Unsupported subscription type in relay subscribe") + + if node.wakuRelay.isSubscribed(pubsubTopic): + debug "already subscribed to topic", pubsubTopic + return err("Already subscribed to topic: " & $pubsubTopic) if contentTopicOp.isSome() and node.contentTopicHandlers.hasKey(contentTopicOp.get()): error "Invalid API call to `subscribe`. Was already subscribed" - return + return err("Invalid API call to `subscribe`. Was already subscribed") node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) node.registerRelayDefaultHandler(pubsubTopic) @@ -335,43 +339,49 @@ proc subscribe*( if contentTopicOp.isSome(): node.contentTopicHandlers[contentTopicOp.get()] = wrappedHandler -proc unsubscribe*(node: WakuNode, subscription: SubscriptionEvent) = + return ok() + +proc unsubscribe*( + node: WakuNode, subscription: SubscriptionEvent +): Result[void, string] = ## Unsubscribes from a specific PubSub or Content topic. if node.wakuRelay.isNil(): error "Invalid API call to `unsubscribe`. WakuRelay not mounted." - return + return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.") let (pubsubTopic, contentTopicOp) = case subscription.kind of ContentUnsub: let shard = node.wakuSharding.getShard((subscription.topic)).valueOr: error "Autosharding error", error = error - return + return err("Autosharding error: " & error) ($shard, some(subscription.topic)) of PubsubUnsub: (subscription.topic, none(ContentTopic)) else: - return + return err("Unsupported subscription type in relay unsubscribe") if not node.wakuRelay.isSubscribed(pubsubTopic): - error "Invalid API call to `unsubscribe`. Was not subscribed" + error "Invalid API call to `unsubscribe`. Was not subscribed", pubsubTopic return + err("Invalid API call to `unsubscribe`. Was not subscribed to: " & $pubsubTopic) if contentTopicOp.isSome(): # Remove this handler only var handler: TopicHandler + ## TODO: refactor this part. I think we can simplify it if node.contentTopicHandlers.pop(contentTopicOp.get(), handler): debug "unsubscribe", contentTopic = contentTopicOp.get() - node.wakuRelay.unsubscribe(pubsubTopic, handler) - - if contentTopicOp.isNone() or node.wakuRelay.topics.getOrDefault(pubsubTopic).len == 1: - # Remove all handlers + node.wakuRelay.unsubscribe(pubsubTopic) + else: debug "unsubscribe", pubsubTopic = pubsubTopic - node.wakuRelay.unsubscribeAll(pubsubTopic) + node.wakuRelay.unsubscribe(pubsubTopic) node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + return ok() + proc publish*( node: WakuNode, pubsubTopicOp: Option[PubsubTopic], message: WakuMessage ): Future[Result[void, string]] {.async, gcsafe.} = @@ -433,20 +443,17 @@ proc mountRelay*( shards: seq[RelayShard] = @[], peerExchangeHandler = none(RoutingRecordsHandler), maxMessageSize = int(DefaultMaxWakuMessageSize), -) {.async, gcsafe.} = +): Future[Result[void, string]] {.async.} = if not node.wakuRelay.isNil(): error "wakuRelay already mounted, skipping" - return + return err("wakuRelay already mounted, skipping") ## The default relay topics is the union of all configured topics plus default PubsubTopic(s) info "mounting relay protocol" - let initRes = WakuRelay.new(node.switch, maxMessageSize) - if initRes.isErr(): - error "failed mounting relay protocol", error = initRes.error - return - - node.wakuRelay = initRes.value + node.wakuRelay = WakuRelay.new(node.switch, maxMessageSize).valueOr: + error "failed mounting relay protocol", error = error + return err("failed mounting relay protocol: " & error) ## Add peer exchange handler if peerExchangeHandler.isSome(): @@ -459,11 +466,17 @@ proc mountRelay*( node.switch.mount(node.wakuRelay, protocolMatcher(WakuRelayCodec)) - info "relay mounted successfully", shards = shards + ## Make sure we don't have duplicates + let uniqueShards = deduplicate(shards) # Subscribe to shards - for shard in shards: - node.subscribe((kind: PubsubSub, topic: $shard)) + for shard in uniqueShards: + node.subscribe((kind: PubsubSub, topic: $shard)).isOkOr: + error "failed to subscribe to shard", error = error + return err("failed to subscribe to shard in mountRelay: " & error) + + info "relay mounted successfully", shards = uniqueShards + return ok() ## Waku filter @@ -1218,6 +1231,7 @@ proc mountRlnRelay*( raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error) let rlnRelay = rlnRelayRes.get() + if (rlnConf.rlnRelayUserMessageLimit > rlnRelay.groupManager.rlnRelayMaxMessageLimit): error "rln-relay-user-message-limit can't exceed the MAX_MESSAGE_LIMIT in the rln contract" let validator = generateRlnValidator(rlnRelay, spamHandler) diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim index cb324075a..747835fc8 100644 --- a/waku/waku_api/rest/builder.nim +++ b/waku/waku_api/rest/builder.nim @@ -18,7 +18,8 @@ import waku/waku_api/rest/legacy_store/handlers as rest_store_legacy_api, waku/waku_api/rest/health/handlers as rest_health_api, waku/waku_api/rest/admin/handlers as rest_admin_api, - waku/waku_core/topics + waku/waku_core/topics, + waku/waku_relay/protocol ## Monitoring and external interfaces @@ -129,18 +130,31 @@ proc startRestServerProtocolSupport*( ## Relay REST API if conf.relay: + ## This MessageCache is used, f.e., in js-waku<>nwaku interop tests. + ## js-waku tests asks nwaku-docker through REST whether a message is properly received. let cache = MessageCache.init(int(conf.restRelayCacheCapacity)) - let handler = messageCacheHandler(cache) + let handler: WakuRelayHandler = messageCacheHandler(cache) for shard in conf.shards: let pubsubTopic = $RelayShard(clusterId: conf.clusterId, shardId: shard) cache.pubsubSubscribe(pubsubTopic) - node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) + + ## TODO: remove this line. use observer-observable pattern + ## within waku_node::registerRelayDefaultHandler + discard node.wakuRelay.subscribe(pubsubTopic, handler) for contentTopic in conf.contentTopics: cache.contentSubscribe(contentTopic) - node.subscribe((kind: ContentSub, topic: contentTopic), some(handler)) + + let shard = node.wakuSharding.getShard(contentTopic).valueOr: + error "Autosharding error in REST", error = error + continue + let pubsubTopic = $shard + + ## TODO: remove this line. use observer-observable pattern + ## within waku_node::registerRelayDefaultHandler + discard node.wakuRelay.subscribe(pubsubTopic, handler) installRelayApiHandlers(router, node, cache) else: diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index 7ee0ee7e3..252375208 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ b/waku/waku_api/rest/relay/handlers.nim @@ -66,9 +66,13 @@ proc installRelayApiHandlers*( for pubsubTopic in newTopics: cache.pubsubSubscribe(pubsubTopic) + node.subscribe( (kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache)) - ) + ).isOkOr: + let errorMsg = "Subscribe failed:" & $error + error "SUBSCRIBE failed", error = errorMsg + return RestApiResponse.internalServerError(errorMsg) return RestApiResponse.ok() @@ -88,7 +92,10 @@ proc installRelayApiHandlers*( # Unsubscribe all handlers from requested topics for pubsubTopic in req: cache.pubsubUnsubscribe(pubsubTopic) - node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic)) + node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic)).isOkOr: + let errorMsg = "Unsubscribe failed:" & $error + error "UNSUBSCRIBE failed", error = errorMsg + return RestApiResponse.internalServerError(errorMsg) # Successfully unsubscribed from all requested topics return RestApiResponse.ok() @@ -193,9 +200,13 @@ proc installRelayApiHandlers*( for contentTopic in newTopics: cache.contentSubscribe(contentTopic) + node.subscribe( (kind: ContentSub, topic: contentTopic), some(messageCacheHandler(cache)) - ) + ).isOkOr: + let errorMsg = "Subscribe failed:" & $error + error "SUBSCRIBE failed", error = errorMsg + return RestApiResponse.internalServerError(errorMsg) return RestApiResponse.ok() @@ -211,7 +222,10 @@ proc installRelayApiHandlers*( for contentTopic in req: cache.contentUnsubscribe(contentTopic) - node.unsubscribe((kind: ContentUnsub, topic: contentTopic)) + node.unsubscribe((kind: ContentUnsub, topic: contentTopic)).isOkOr: + let errorMsg = "Unsubscribe failed:" & $error + error "UNSUBSCRIBE failed", error = errorMsg + return RestApiResponse.internalServerError(errorMsg) return RestApiResponse.ok() diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 4eeaf4607..daaf056b7 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -129,7 +129,8 @@ type # the second entry contains the error messages to be returned when the validator fails wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]] # a map of validators to error messages to return when validation fails - validatorInserted: Table[PubsubTopic, bool] + topicValidator: Table[PubsubTopic, ValidatorHandler] + # map topic with its assigned validator within pubsub publishObservers: seq[PublishObserver] topicsHealth*: Table[string, TopicHealth] onTopicHealthChange*: TopicHealthChangeHandler @@ -427,7 +428,7 @@ proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool = proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] = return toSeq(GossipSub(w).topics.keys()) -proc generateOrderedValidator(w: WakuRelay): auto {.gcsafe.} = +proc generateOrderedValidator(w: WakuRelay): ValidatorHandler {.gcsafe.} = # rejects messages that are not WakuMessage let wrappedValidator = proc( pubsubTopic: string, message: messages.Message @@ -516,9 +517,10 @@ proc subscribe*( # Add the ordered validator to the topic # This assumes that if `w.validatorInserted.hasKey(pubSubTopic) is true`, it contains the ordered validator. # Otherwise this might lead to unintended behaviour. - if not w.validatorInserted.hasKey(pubSubTopic): + if not w.topicValidator.hasKey(pubSubTopic): + let newValidator = w.generateOrderedValidator() procCall GossipSub(w).addValidator(pubSubTopic, w.generateOrderedValidator()) - w.validatorInserted[pubSubTopic] = true + w.topicValidator[pubSubTopic] = newValidator # set this topic parameters for scoring w.topicParams[pubsubTopic] = TopicParameters @@ -534,14 +536,36 @@ proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) = debug "unsubscribe all", pubsubTopic = pubsubTopic procCall GossipSub(w).unsubscribeAll(pubsubTopic) - w.validatorInserted.del(pubsubTopic) + w.topicValidator.del(pubsubTopic) -proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) = - ## Unsubscribe this handler on this pubsub topic +proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) = + if not w.topicValidator.hasKey(pubsubTopic): + error "unsubscribe no validator for this topic", pubsubTopic + return - debug "unsubscribe", pubsubTopic = pubsubTopic + if pubsubtopic notin Pubsub(w).topics: + error "not subscribed to the given topic", pubsubTopic + return - procCall GossipSub(w).unsubscribe(pubsubTopic, handler) + var topicHandlerSeq: seq[TopicHandler] + var topicValidator: ValidatorHandler + try: + topicHandlerSeq = Pubsub(w).topics[pubsubTopic] + if topicHandlerSeq.len == 0: + error "unsubscribe no handler for this topic", pubsubTopic + return + topicValidator = w.topicValidator[pubsubTopic] + except KeyError: + error "exception in unsubscribe", pubsubTopic, error = getCurrentExceptionMsg() + return + + let topicHandler = topicHandlerSeq[0] + + debug "unsubscribe", pubsubTopic + procCall GossipSub(w).unsubscribe($pubsubTopic, topicHandler) + ## TODO: uncomment the following line when https://github.com/vacp2p/nim-libp2p/pull/1356 + ## is available in a nim-libp2p release. + # procCall GossipSub(w).removeValidator(pubsubTopic, topicValidator) proc publish*( w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage @@ -624,7 +648,4 @@ proc getNumConnectedPeers*( proc getSubscribedTopics*(w: WakuRelay): seq[PubsubTopic] = ## Returns a seq containing the current list of subscribed topics - var topics: seq[PubsubTopic] - for t in w.validatorInserted.keys(): - topics.add(t) - return topics + return PubSub(w).topics.keys.toSeq().mapIt(cast[PubsubTopic](it))