diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim new file mode 100644 index 000000000..f14d57d44 --- /dev/null +++ b/tests/api/test_api_send.nim @@ -0,0 +1,153 @@ +{.used.} + +import + std/[options, sequtils, strutils], + chronos, + testutils/unittests, + stew/byteutils, + libp2p/[switch, peerinfo] +import ../testlib/[common, wakucore, wakunode, testasync, futures, testutils] +import + waku, + waku/ + [ + waku_node, + waku_core, + waku_relay/protocol, + waku_filter_v2/common, + waku_store/common, + ] +import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config + +suite "Waku API - Send": + var + relayNode1 {.threadvar.}: WakuNode + relayNode1PeerInfo {.threadvar.}: RemotePeerInfo + relayNode1PeerId {.threadvar.}: PeerId + + relayNode2 {.threadvar.}: WakuNode + relayNode2PeerInfo {.threadvar.}: RemotePeerInfo + relayNode2PeerId {.threadvar.}: PeerId + + lightpushNode {.threadvar.}: WakuNode + lightpushNodePeerInfo {.threadvar.}: RemotePeerInfo + lightpushNodePeerId {.threadvar.}: PeerId + + storeNode {.threadvar.}: WakuNode + storeNodePeerInfo {.threadvar.}: RemotePeerInfo + storeNodePeerId {.threadvar.}: PeerId + + asyncSetup: + # handlerFuture = newPushHandlerFuture() + # handler = proc( + # peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage + # ): Future[WakuLightPushResult[void]] {.async.} = + # handlerFuture.complete((pubsubTopic, message)) + # return ok() + + relayNode1 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + relayNode2 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + lightpushNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + storeNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + await allFutures( + relayNode1.start(), relayNode2.start(), lightpushNode.start(), storeNode.start() + ) + + (await relayNode1.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + + (await relayNode2.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + + (await lightpushNode.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + await lightpushNode.mountLightPush() + + (await storeNode.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + await storeNode.mountStore() + + relayNode1PeerInfo = relayNode1.peerInfo.toRemotePeerInfo() + relayNode1PeerId = relayNode1.peerInfo.peerId + + relayNode2PeerInfo = relayNode2.peerInfo.toRemotePeerInfo() + relayNode2PeerId = relayNode2.peerInfo.peerId + + lightpushNodePeerInfo = lightpushNode.peerInfo.toRemotePeerInfo() + lightpushNodePeerId = lightpushNode.peerInfo.peerId + + storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo() + storeNodePeerId = storeNode.peerInfo.peerId + asyncTeardown: + await allFutures( + relayNode1.stop(), relayNode2.stop(), lightpushNode.stop(), storeNode.stop() + ) + + asyncTest "Check API availability (unhealthy node)": + # Create a node config that doesn't start or has no peers + let nodeConfig = NodeConfig.init( + mode = WakuMode.Core, + protocolsConfig = ProtocolsConfig.init( + entryNodes = @[], + clusterId = 1, + autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), + ), + ) + + let wakuConfRes = toWakuConf(nodeConfig) + + ## Then + require wakuConfRes.isOk() + let wakuConf = wakuConfRes.get() + require wakuConf.validate().isOk() + check: + wakuConf.clusterId == 1 + wakuConf.shardingConf.numShardsInCluster == 1 + + var node = (await createNode(nodeConfig)).valueOr: + raiseAssert error + + let sentListener = MessageSentEvent.listen( + proc(event: MessageSentEvent) {.async: (raises: []).} = + raiseAssert "Should not be called" + ).valueOr: + raiseAssert error + + let errorListener = MessageErrorEvent.listen( + proc(event: MessageErrorEvent) {.async: (raises: []).} = + check true + ).valueOr: + raiseAssert error + + let propagatedListener = MessagePropagatedEvent.listen( + proc(event: MessagePropagatedEvent) {.async: (raises: []).} = + raiseAssert "Should not be called" + ).valueOr: + raiseAssert error + defer: + MessageSentEvent.dropListener(sentListener) + MessageErrorEvent.dropListener(errorListener) + MessagePropagatedEvent.dropListener(propagatedListener) + + let envelope = MessageEnvelope.init( + ContentTopic("/waku/2/default-content/proto"), "test payload" + ) + + let sendResult = await node.send(envelope) + + if sendResult.isErr(): + echo "Send error: ", sendResult.error + + check: + sendResult.isErr() + # Depending on implementation, it might say "not healthy" + sendResult.error.contains("not healthy") + + (await node.stop()).isOkOr: + raiseAssert "Failed to stop node: " & error diff --git a/waku.nim b/waku.nim index 12e69fdaf..65a017c5a 100644 --- a/waku.nim +++ b/waku.nim @@ -3,8 +3,8 @@ ## This module re-exports the public API for creating and managing Waku nodes ## when using nwaku as a library dependency. -import waku/api/[api, api_conf, types] -export api, api_conf, types +import waku/api +export api import waku/factory/waku export waku diff --git a/waku/api.nim b/waku/api.nim index c3211867d..110a8f431 100644 --- a/waku/api.nim +++ b/waku/api.nim @@ -1,3 +1,4 @@ import ./api/[api, api_conf, entry_nodes] +import ./events/message_events -export api, api_conf, entry_nodes +export api, api_conf, entry_nodes, message_events diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 057c78810..40b9110dd 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -200,9 +200,7 @@ proc new*( return err("Failed setting up app callbacks: " & $error) ## Delivery Monitor - let deliveryService = DeliveryService.new( - wakuConf.p2pReliability, node, - ).valueOr: + let deliveryService = DeliveryService.new(wakuConf.p2pReliability, node).valueOr: return err("could not create delivery service: " & $error) var waku = Waku( @@ -350,7 +348,7 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} = error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() return -proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = +proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: []).} = if waku[].node.started: warn "startWaku: waku node already started" return ok() @@ -360,9 +358,15 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = if conf.dnsDiscoveryConf.isSome(): let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get() - let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes( - dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers - ) + let dynamicBootstrapNodesRes = + try: + await waku_dnsdisc.retrieveDynamicBootstrapNodes( + dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers + ) + except CatchableError: + Result[seq[RemotePeerInfo], string].err( + "Retrieving dynamic bootstrap nodes failed: " & getCurrentExceptionMsg() + ) if dynamicBootstrapNodesRes.isErr(): error "Retrieving dynamic bootstrap nodes failed", @@ -376,8 +380,11 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = return err("error while calling startNode: " & $error) ## Update waku data that is set dynamically on node start - (await updateWaku(waku)).isOkOr: - return err("Error in updateApp: " & $error) + try: + (await updateWaku(waku)).isOkOr: + return err("Error in updateApp: " & $error) + except CatchableError: + return err("Error in updateApp: " & getCurrentExceptionMsg()) ## Discv5 if conf.discv5Conf.isSome(): @@ -419,44 +426,54 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = return err ("Starting protocols support REST server failed: " & $error) if conf.metricsServerConf.isSome(): - waku[].metricsServer = ( - await ( - waku_metrics.startMetricsServerAndLogging( - conf.metricsServerConf.get(), conf.portsShift + try: + waku[].metricsServer = ( + await ( + waku_metrics.startMetricsServerAndLogging( + conf.metricsServerConf.get(), conf.portsShift + ) ) + ).valueOr: + return err("Starting monitoring and external interfaces failed: " & error) + except CatchableError: + return err( + "Starting monitoring and external interfaces failed: " & getCurrentExceptionMsg() ) - ).valueOr: - return err("Starting monitoring and external interfaces failed: " & error) - waku[].healthMonitor.setOverallHealth(HealthStatus.READY) return ok() -proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} = +proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = ## Waku shutdown if not waku.node.started: warn "stop: attempting to stop node that isn't running" - waku.healthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN) + try: + waku.healthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN) - if not waku.metricsServer.isNil(): - await waku.metricsServer.stop() + if not waku.metricsServer.isNil(): + await waku.metricsServer.stop() - if not waku.wakuDiscv5.isNil(): - await waku.wakuDiscv5.stop() + if not waku.wakuDiscv5.isNil(): + await waku.wakuDiscv5.stop() - if not waku.node.isNil(): - await waku.node.stop() + if not waku.node.isNil(): + await waku.node.stop() - if not waku.dnsRetryLoopHandle.isNil(): - await waku.dnsRetryLoopHandle.cancelAndWait() + if not waku.dnsRetryLoopHandle.isNil(): + await waku.dnsRetryLoopHandle.cancelAndWait() - if not waku.healthMonitor.isNil(): - await waku.healthMonitor.stopHealthMonitor() + if not waku.healthMonitor.isNil(): + await waku.healthMonitor.stopHealthMonitor() - if not waku.restServer.isNil(): - await waku.restServer.stop() + if not waku.restServer.isNil(): + await waku.restServer.stop() + except Exception: + error "waku stop failed: " & getCurrentExceptionMsg() + return err("waku stop failed: " & getCurrentExceptionMsg()) + + return ok() proc isModeCoreAvailable*(waku: Waku): bool = return not waku.node.wakuRelay.isNil()