mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-05 15:33:08 +00:00
Fix some import issues, start and stop waku shall not throw exception but return with result properly
This commit is contained in:
parent
a9bd1f2f8c
commit
38ba74135d
153
tests/api/test_api_send.nim
Normal file
153
tests/api/test_api_send.nim
Normal file
@ -0,0 +1,153 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[options, sequtils, strutils],
|
||||
chronos,
|
||||
testutils/unittests,
|
||||
stew/byteutils,
|
||||
libp2p/[switch, peerinfo]
|
||||
import ../testlib/[common, wakucore, wakunode, testasync, futures, testutils]
|
||||
import
|
||||
waku,
|
||||
waku/
|
||||
[
|
||||
waku_node,
|
||||
waku_core,
|
||||
waku_relay/protocol,
|
||||
waku_filter_v2/common,
|
||||
waku_store/common,
|
||||
]
|
||||
import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config
|
||||
|
||||
suite "Waku API - Send":
|
||||
var
|
||||
relayNode1 {.threadvar.}: WakuNode
|
||||
relayNode1PeerInfo {.threadvar.}: RemotePeerInfo
|
||||
relayNode1PeerId {.threadvar.}: PeerId
|
||||
|
||||
relayNode2 {.threadvar.}: WakuNode
|
||||
relayNode2PeerInfo {.threadvar.}: RemotePeerInfo
|
||||
relayNode2PeerId {.threadvar.}: PeerId
|
||||
|
||||
lightpushNode {.threadvar.}: WakuNode
|
||||
lightpushNodePeerInfo {.threadvar.}: RemotePeerInfo
|
||||
lightpushNodePeerId {.threadvar.}: PeerId
|
||||
|
||||
storeNode {.threadvar.}: WakuNode
|
||||
storeNodePeerInfo {.threadvar.}: RemotePeerInfo
|
||||
storeNodePeerId {.threadvar.}: PeerId
|
||||
|
||||
asyncSetup:
|
||||
# handlerFuture = newPushHandlerFuture()
|
||||
# handler = proc(
|
||||
# peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
|
||||
# ): Future[WakuLightPushResult[void]] {.async.} =
|
||||
# handlerFuture.complete((pubsubTopic, message))
|
||||
# return ok()
|
||||
|
||||
relayNode1 =
|
||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||
relayNode2 =
|
||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||
|
||||
lightpushNode =
|
||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||
storeNode =
|
||||
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
||||
|
||||
await allFutures(
|
||||
relayNode1.start(), relayNode2.start(), lightpushNode.start(), storeNode.start()
|
||||
)
|
||||
|
||||
(await relayNode1.mountRelay()).isOkOr:
|
||||
raiseAssert "Failed to mount relay"
|
||||
|
||||
(await relayNode2.mountRelay()).isOkOr:
|
||||
raiseAssert "Failed to mount relay"
|
||||
|
||||
(await lightpushNode.mountRelay()).isOkOr:
|
||||
raiseAssert "Failed to mount relay"
|
||||
await lightpushNode.mountLightPush()
|
||||
|
||||
(await storeNode.mountRelay()).isOkOr:
|
||||
raiseAssert "Failed to mount relay"
|
||||
await storeNode.mountStore()
|
||||
|
||||
relayNode1PeerInfo = relayNode1.peerInfo.toRemotePeerInfo()
|
||||
relayNode1PeerId = relayNode1.peerInfo.peerId
|
||||
|
||||
relayNode2PeerInfo = relayNode2.peerInfo.toRemotePeerInfo()
|
||||
relayNode2PeerId = relayNode2.peerInfo.peerId
|
||||
|
||||
lightpushNodePeerInfo = lightpushNode.peerInfo.toRemotePeerInfo()
|
||||
lightpushNodePeerId = lightpushNode.peerInfo.peerId
|
||||
|
||||
storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo()
|
||||
storeNodePeerId = storeNode.peerInfo.peerId
|
||||
asyncTeardown:
|
||||
await allFutures(
|
||||
relayNode1.stop(), relayNode2.stop(), lightpushNode.stop(), storeNode.stop()
|
||||
)
|
||||
|
||||
asyncTest "Check API availability (unhealthy node)":
|
||||
# Create a node config that doesn't start or has no peers
|
||||
let nodeConfig = NodeConfig.init(
|
||||
mode = WakuMode.Core,
|
||||
protocolsConfig = ProtocolsConfig.init(
|
||||
entryNodes = @[],
|
||||
clusterId = 1,
|
||||
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
|
||||
),
|
||||
)
|
||||
|
||||
let wakuConfRes = toWakuConf(nodeConfig)
|
||||
|
||||
## Then
|
||||
require wakuConfRes.isOk()
|
||||
let wakuConf = wakuConfRes.get()
|
||||
require wakuConf.validate().isOk()
|
||||
check:
|
||||
wakuConf.clusterId == 1
|
||||
wakuConf.shardingConf.numShardsInCluster == 1
|
||||
|
||||
var node = (await createNode(nodeConfig)).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
let sentListener = MessageSentEvent.listen(
|
||||
proc(event: MessageSentEvent) {.async: (raises: []).} =
|
||||
raiseAssert "Should not be called"
|
||||
).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
let errorListener = MessageErrorEvent.listen(
|
||||
proc(event: MessageErrorEvent) {.async: (raises: []).} =
|
||||
check true
|
||||
).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
let propagatedListener = MessagePropagatedEvent.listen(
|
||||
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
|
||||
raiseAssert "Should not be called"
|
||||
).valueOr:
|
||||
raiseAssert error
|
||||
defer:
|
||||
MessageSentEvent.dropListener(sentListener)
|
||||
MessageErrorEvent.dropListener(errorListener)
|
||||
MessagePropagatedEvent.dropListener(propagatedListener)
|
||||
|
||||
let envelope = MessageEnvelope.init(
|
||||
ContentTopic("/waku/2/default-content/proto"), "test payload"
|
||||
)
|
||||
|
||||
let sendResult = await node.send(envelope)
|
||||
|
||||
if sendResult.isErr():
|
||||
echo "Send error: ", sendResult.error
|
||||
|
||||
check:
|
||||
sendResult.isErr()
|
||||
# Depending on implementation, it might say "not healthy"
|
||||
sendResult.error.contains("not healthy")
|
||||
|
||||
(await node.stop()).isOkOr:
|
||||
raiseAssert "Failed to stop node: " & error
|
||||
4
waku.nim
4
waku.nim
@ -3,8 +3,8 @@
|
||||
## This module re-exports the public API for creating and managing Waku nodes
|
||||
## when using nwaku as a library dependency.
|
||||
|
||||
import waku/api/[api, api_conf, types]
|
||||
export api, api_conf, types
|
||||
import waku/api
|
||||
export api
|
||||
|
||||
import waku/factory/waku
|
||||
export waku
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import ./api/[api, api_conf, entry_nodes]
|
||||
import ./events/message_events
|
||||
|
||||
export api, api_conf, entry_nodes
|
||||
export api, api_conf, entry_nodes, message_events
|
||||
|
||||
@ -200,9 +200,7 @@ proc new*(
|
||||
return err("Failed setting up app callbacks: " & $error)
|
||||
|
||||
## Delivery Monitor
|
||||
let deliveryService = DeliveryService.new(
|
||||
wakuConf.p2pReliability, node,
|
||||
).valueOr:
|
||||
let deliveryService = DeliveryService.new(wakuConf.p2pReliability, node).valueOr:
|
||||
return err("could not create delivery service: " & $error)
|
||||
|
||||
var waku = Waku(
|
||||
@ -350,7 +348,7 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
|
||||
error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
|
||||
return
|
||||
|
||||
proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
if waku[].node.started:
|
||||
warn "startWaku: waku node already started"
|
||||
return ok()
|
||||
@ -360,9 +358,15 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
|
||||
if conf.dnsDiscoveryConf.isSome():
|
||||
let dnsDiscoveryConf = waku.conf.dnsDiscoveryConf.get()
|
||||
let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes(
|
||||
dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers
|
||||
)
|
||||
let dynamicBootstrapNodesRes =
|
||||
try:
|
||||
await waku_dnsdisc.retrieveDynamicBootstrapNodes(
|
||||
dnsDiscoveryConf.enrTreeUrl, dnsDiscoveryConf.nameServers
|
||||
)
|
||||
except CatchableError:
|
||||
Result[seq[RemotePeerInfo], string].err(
|
||||
"Retrieving dynamic bootstrap nodes failed: " & getCurrentExceptionMsg()
|
||||
)
|
||||
|
||||
if dynamicBootstrapNodesRes.isErr():
|
||||
error "Retrieving dynamic bootstrap nodes failed",
|
||||
@ -376,8 +380,11 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
return err("error while calling startNode: " & $error)
|
||||
|
||||
## Update waku data that is set dynamically on node start
|
||||
(await updateWaku(waku)).isOkOr:
|
||||
return err("Error in updateApp: " & $error)
|
||||
try:
|
||||
(await updateWaku(waku)).isOkOr:
|
||||
return err("Error in updateApp: " & $error)
|
||||
except CatchableError:
|
||||
return err("Error in updateApp: " & getCurrentExceptionMsg())
|
||||
|
||||
## Discv5
|
||||
if conf.discv5Conf.isSome():
|
||||
@ -419,44 +426,54 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
|
||||
return err ("Starting protocols support REST server failed: " & $error)
|
||||
|
||||
if conf.metricsServerConf.isSome():
|
||||
waku[].metricsServer = (
|
||||
await (
|
||||
waku_metrics.startMetricsServerAndLogging(
|
||||
conf.metricsServerConf.get(), conf.portsShift
|
||||
try:
|
||||
waku[].metricsServer = (
|
||||
await (
|
||||
waku_metrics.startMetricsServerAndLogging(
|
||||
conf.metricsServerConf.get(), conf.portsShift
|
||||
)
|
||||
)
|
||||
).valueOr:
|
||||
return err("Starting monitoring and external interfaces failed: " & error)
|
||||
except CatchableError:
|
||||
return err(
|
||||
"Starting monitoring and external interfaces failed: " & getCurrentExceptionMsg()
|
||||
)
|
||||
).valueOr:
|
||||
return err("Starting monitoring and external interfaces failed: " & error)
|
||||
|
||||
waku[].healthMonitor.setOverallHealth(HealthStatus.READY)
|
||||
|
||||
return ok()
|
||||
|
||||
proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =
|
||||
proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
## Waku shutdown
|
||||
|
||||
if not waku.node.started:
|
||||
warn "stop: attempting to stop node that isn't running"
|
||||
|
||||
waku.healthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
|
||||
try:
|
||||
waku.healthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
|
||||
|
||||
if not waku.metricsServer.isNil():
|
||||
await waku.metricsServer.stop()
|
||||
if not waku.metricsServer.isNil():
|
||||
await waku.metricsServer.stop()
|
||||
|
||||
if not waku.wakuDiscv5.isNil():
|
||||
await waku.wakuDiscv5.stop()
|
||||
if not waku.wakuDiscv5.isNil():
|
||||
await waku.wakuDiscv5.stop()
|
||||
|
||||
if not waku.node.isNil():
|
||||
await waku.node.stop()
|
||||
if not waku.node.isNil():
|
||||
await waku.node.stop()
|
||||
|
||||
if not waku.dnsRetryLoopHandle.isNil():
|
||||
await waku.dnsRetryLoopHandle.cancelAndWait()
|
||||
if not waku.dnsRetryLoopHandle.isNil():
|
||||
await waku.dnsRetryLoopHandle.cancelAndWait()
|
||||
|
||||
if not waku.healthMonitor.isNil():
|
||||
await waku.healthMonitor.stopHealthMonitor()
|
||||
if not waku.healthMonitor.isNil():
|
||||
await waku.healthMonitor.stopHealthMonitor()
|
||||
|
||||
if not waku.restServer.isNil():
|
||||
await waku.restServer.stop()
|
||||
if not waku.restServer.isNil():
|
||||
await waku.restServer.stop()
|
||||
except Exception:
|
||||
error "waku stop failed: " & getCurrentExceptionMsg()
|
||||
return err("waku stop failed: " & getCurrentExceptionMsg())
|
||||
|
||||
return ok()
|
||||
|
||||
proc isModeCoreAvailable*(waku: Waku): bool =
|
||||
return not waku.node.wakuRelay.isNil()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user