refactor: big refactor to add waku component in libwaku instead of onlu waku node (#2658)

This commit is contained in:
Ivan FB 2024-05-03 14:07:15 +02:00 committed by GitHub
parent 853ec1869e
commit 2463527b24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 152 additions and 206 deletions

View File

@ -18,7 +18,7 @@ import
../../waku/common/logging,
../../waku/factory/external_config,
../../waku/factory/networks_config,
../../waku/factory/app,
../../waku/factory/waku,
../../waku/node/health_monitor,
../../waku/node/waku_metrics,
../../waku/waku_api/rest/builder as rest_server_builder
@ -63,7 +63,7 @@ when isMainModule:
## 5. Start monitoring tools and external interfaces
## 6. Setup graceful shutdown hooks
const versionString = "version / git commit hash: " & app.git_version
const versionString = "version / git commit hash: " & waku.git_version
let confRes = WakuNodeConf.load(version = versionString)
if confRes.isErr():
@ -119,7 +119,7 @@ when isMainModule:
else:
discard
info "Running nwaku node", version = app.git_version
info "Running nwaku node", version = waku.git_version
logConfig(conf)
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
@ -135,25 +135,25 @@ when isMainModule:
error "Starting esential REST server failed.", error = $error
quit(QuitFailure)
var wakunode2 = App.init(conf).valueOr:
error "App initialization failed", error = error
var waku = Waku.init(conf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)
wakunode2.restServer = restServer
waku.restServer = restServer
nodeHealthMonitor.setNode(wakunode2.node)
nodeHealthMonitor.setNode(waku.node)
wakunode2.startApp().isOkOr:
error "Starting app failed", error = error
(waitFor startWaku(addr waku)).isOkOr:
error "Starting waku failed", error = error
quit(QuitFailure)
rest_server_builder.startRestServerProtocolSupport(
restServer, wakunode2.node, wakunode2.wakuDiscv5, conf
restServer, waku.node, waku.wakuDiscv5, conf
).isOkOr:
error "Starting protocols support REST server failed.", error = $error
quit(QuitFailure)
wakunode2.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr:
waku.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr:
error "Starting monitoring and external interfaces failed", error = error
quit(QuitFailure)
@ -163,7 +163,7 @@ when isMainModule:
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.
proc asyncStopper(node: App) {.async: (raises: [Exception]).} =
proc asyncStopper(node: Waku) {.async: (raises: [Exception]).} =
nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
await node.stop()
quit(QuitSuccess)
@ -174,7 +174,7 @@ when isMainModule:
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
notice "Shutting down after receiving SIGINT"
asyncSpawn asyncStopper(wakunode2)
asyncSpawn asyncStopper(waku)
setControlCHook(handleCtrlC)
@ -182,7 +182,7 @@ when isMainModule:
when defined(posix):
proc handleSigterm(signal: cint) {.noconv.} =
notice "Shutting down after receiving SIGTERM"
asyncSpawn asyncStopper(wakunode2)
asyncSpawn asyncStopper(waku)
c_signal(ansi_c.SIGTERM, handleSigterm)
@ -195,7 +195,7 @@ when isMainModule:
# Not available in -d:release mode
writeStackTrace()
waitFor wakunode2.stop()
waitFor waku.stop()
quit(QuitFailure)
c_signal(ansi_c.SIGSEGV, handleSigsegv)

View File

@ -3,8 +3,8 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}
import ../../apps/wakunode2/[networks_config, app, external_config]
import ../../waku/common/logging
import
../../waku/common/logging, ../../waku/factory/[waku, networks_config, external_config]
import
std/[options, strutils, os, sequtils],
stew/shims/net as stewNet,
@ -15,11 +15,11 @@ import
libp2p/crypto/crypto
export
networks_config, app, logging, options, strutils, os, sequtils, stewNet, chronicles,
networks_config, waku, logging, options, strutils, os, sequtils, stewNet, chronicles,
chronos, metrics, libbacktrace, crypto
proc setup*(): App =
const versionString = "version / git commit hash: " & app.git_version
proc setup*(): Waku =
const versionString = "version / git commit hash: " & waku.git_version
let rng = crypto.newRng()
let confRes = WakuNodeConf.load(version = versionString)
@ -48,48 +48,17 @@ proc setup*(): App =
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
var wakunode2 = App.init(rng, conf)
## Peer persistence
let res1 = wakunode2.setupPeerPersistence()
if res1.isErr():
error "1/5 Setting up storage failed", error = $res1.error
debug "Starting node"
var waku = Waku.init(conf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)
debug "2/5 Retrieve dynamic bootstrap nodes"
let res3 = wakunode2.setupDyamicBootstrapNodes()
if res3.isErr():
error "2/5 Retrieving dynamic bootstrap nodes failed", error = $res3.error
(waitFor startWaku(addr waku)).isOkOr:
error "Starting waku failed", error = error
quit(QuitFailure)
debug "3/5 Initializing node"
let res4 = wakunode2.setupWakuApp()
if res4.isErr():
error "3/5 Initializing node failed", error = $res4.error
quit(QuitFailure)
debug "4/5 Mounting protocols"
var res5: Result[void, string]
try:
res5 = waitFor wakunode2.setupAndMountProtocols()
if res5.isErr():
error "4/5 Mounting protocols failed", error = $res5.error
quit(QuitFailure)
except Exception:
error "4/5 Mounting protocols failed", error = getCurrentExceptionMsg()
quit(QuitFailure)
debug "5/5 Starting node and mounted protocols"
# set triggerSelf to false, we don't want to process our own stealthCommitments
wakunode2.node.wakuRelay.triggerSelf = false
let res6 = wakunode2.startApp()
if res6.isErr():
error "5/5 Starting node and protocols failed", error = $res6.error
quit(QuitFailure)
waku.node.wakuRelay.triggerSelf = false
info "Node setup complete"
return wakunode2
return waku

View File

@ -15,7 +15,7 @@ import
export wire_spec, logging
type StealthCommitmentProtocol* = object
wakuApp: App
waku: Waku
contentTopic: string
spendingKeyPair: StealthCommitmentFFI.KeyPair
viewingKeyPair: StealthCommitmentFFI.KeyPair
@ -51,10 +51,10 @@ proc sendThruWaku*(
timestamp: getNanosecondTime(time),
)
(self.wakuApp.node.wakuRlnRelay.appendRLNProof(message, float64(time))).isOkOr:
(self.waku.node.wakuRlnRelay.appendRLNProof(message, float64(time))).isOkOr:
return err("could not append rate limit proof to the message: " & $error)
(await self.wakuApp.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
(await self.waku.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
return err("failed to publish message: " & $error)
debug "rate limit proof is appended to the message"
@ -167,7 +167,7 @@ proc getSCPHandler(self: StealthCommitmentProtocol): SCPHandler =
return handler
proc new*(
wakuApp: App, contentTopic = ContentTopic("/wakustealthcommitments/1/app/proto")
waku: Waku, contentTopic = ContentTopic("/wakustealthcommitments/1/app/proto")
): Result[StealthCommitmentProtocol, string] =
let spendingKeyPair = StealthCommitmentFFI.generateKeyPair().valueOr:
return err("could not generate spending key pair: " & $error)
@ -178,7 +178,7 @@ proc new*(
info "viewing public key", publicKey = viewingKeyPair.publicKey
let SCP = StealthCommitmentProtocol(
wakuApp: wakuApp,
waku: waku,
contentTopic: contentTopic,
spendingKeyPair: spendingKeyPair,
viewingKeyPair: viewingKeyPair,
@ -192,5 +192,5 @@ proc new*(
except CatchableError:
error "could not handle SCP message: ", err = getCurrentExceptionMsg()
wakuApp.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))
waku.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))
return ok(SCP)

View File

@ -1,6 +1,6 @@
import std/[options, sequtils, strutils, json]
import chronicles, chronos, stew/results, stew/shims/net
import ../../../../waku/node/waku_node, ../../../alloc
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc
type DebugNodeMsgType* = enum
RETRIEVE_LISTENING_ADDRESSES
@ -20,13 +20,13 @@ proc getMultiaddresses(node: WakuNode): seq[string] =
return node.info().listenAddresses
proc process*(
self: ptr DebugNodeRequest, node: WakuNode
self: ptr DebugNodeRequest, waku: Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of RETRIEVE_LISTENING_ADDRESSES:
return ok($(%*node.getMultiaddresses()))
return ok($(%*waku.node.getMultiaddresses()))
return err("unsupported operation in DebugNodeRequest")

View File

@ -12,7 +12,7 @@ import
../../../../waku/node/peer_manager/peer_manager,
../../../../waku/waku_core,
../../../../waku/factory/external_config,
../../../../waku/node/waku_node,
../../../../waku/factory/waku,
../../../../waku/node/config,
../../../../waku/waku_archive/driver/builder,
../../../../waku/waku_archive/driver,
@ -48,16 +48,12 @@ proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)
proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.} =
proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
var conf: WakuNodeConf
var errorResp: string
try:
if not parseConfig(
$configJson,
conf,
errorResp,
):
if not parseConfig($configJson, conf, errorResp):
return err(errorResp)
except Exception:
return err("exception calling parseConfig: " & getCurrentExceptionMsg())
@ -69,6 +65,7 @@ proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.}
# The Waku Network config (cluster-id=1)
if conf.clusterId == 1:
## TODO: This section is duplicated in wakunode2.nim. We need to move this to a common module
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
if len(conf.shards) != 0:
conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16])
@ -88,31 +85,28 @@ proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.}
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
let wakuRes = Waku.init(conf).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)
let nodeRes = setupNode(conf).valueOr():
error "Failed setting up node", error = error
return err("Failed setting up node: " & $error)
return ok(nodeRes)
return ok(wakuRes)
proc process*(
self: ptr NodeLifecycleRequest, node: ptr WakuNode
self: ptr NodeLifecycleRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of CREATE_NODE:
let newNodeRes = await createNode(self.configJson)
if newNodeRes.isErr():
return err(newNodeRes.error)
node[] = newNodeRes.get()
waku[] = (await createWaku(self.configJson)).valueOr:
return err("error processing createWaku request: " & $error)
of START_NODE:
await node[].start()
(await waku.startWaku()).isOkOr:
return err("problem starting waku: " & $error)
of STOP_NODE:
try:
await node[].stop()
await waku[].stop()
except Exception:
return err("exception stopping node: " & getCurrentExceptionMsg())

View File

@ -1,6 +1,6 @@
import std/[options, sequtils, strutils]
import chronicles, chronos, stew/results, stew/shims/net
import ../../../../waku/node/waku_node, ../../../alloc
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc
type PeerManagementMsgType* = enum
CONNECT_TO
@ -43,14 +43,14 @@ proc connectTo(
return ok()
proc process*(
self: ptr PeerManagementRequest, node: WakuNode
self: ptr PeerManagementRequest, waku: Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
case self.operation
of CONNECT_TO:
let ret = node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
if ret.isErr():
return err(ret.error)

View File

@ -2,7 +2,7 @@ import std/[options, sequtils, strutils]
import chronicles, chronos, stew/byteutils, stew/results, stew/shims/net
import
../../../../../waku/waku_core/message/message,
../../../../../waku/node/waku_node,
../../../../../waku/factory/waku,
../../../../../waku/waku_core/message,
../../../../../waku/waku_core/time, # Timestamp
../../../../../waku/waku_core/topics/pubsub_topic,
@ -79,26 +79,26 @@ proc toWakuMessage(m: ThreadSafeWakuMessage): WakuMessage =
return wakuMessage
proc process*(
self: ptr RelayRequest, node: ptr WakuNode
self: ptr RelayRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
if node.wakuRelay.isNil():
if waku.node.wakuRelay.isNil():
return err("Operation not supported without Waku Relay enabled.")
case self.operation
of SUBSCRIBE:
# TO DO: properly perform 'subscribe'
discard node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback)
discard waku.node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback)
of UNSUBSCRIBE:
# TODO: properly perform 'unsubscribe'
node.wakuRelay.unsubscribeAll($self.pubsubTopic)
waku.node.wakuRelay.unsubscribeAll($self.pubsubTopic)
of PUBLISH:
let msg = self.message.toWakuMessage()
let pubsubTopic = $self.pubsubTopic
let numPeers = await node.wakuRelay.publish(pubsubTopic, msg)
let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg)
if numPeers == 0:
return err("Message not sent because no peers found.")
elif numPeers > 0:

View File

@ -1,7 +1,7 @@
import std/[options, sequtils, strutils]
import chronos, stew/results, stew/shims/net
import
../../../../../waku/node/waku_node,
../../../../../waku/factory/waku,
../../../../../waku/waku_archive/driver/builder,
../../../../../waku/waku_archive/driver,
../../../../../waku/waku_archive/retention_policy/builder,
@ -50,20 +50,20 @@ proc destroyShared(self: ptr StoreQueryRequest) =
deallocShared(self)
proc process(
self: ptr StoreQueryRequest, node: ptr WakuNode
self: ptr StoreQueryRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
proc process*(
self: ptr StoreRequest, node: ptr WakuNode
self: ptr StoreRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
deallocShared(self)
case self.operation
of REMOTE_QUERY:
return await cast[ptr StoreQueryRequest](self[].storeReq).process(node)
return await cast[ptr StoreQueryRequest](self[].storeReq).process(waku)
of LOCAL_QUERY:
discard
# cast[ptr StoreQueryRequest](request[].reqContent).process(node)

View File

@ -5,7 +5,7 @@
import std/json, stew/results
import chronos
import
../../../waku/node/waku_node,
../../../waku/factory/waku,
./requests/node_lifecycle_request,
./requests/peer_manager_request,
./requests/protocols/relay_request,
@ -32,7 +32,7 @@ proc createShared*(
return ret
proc process*(
T: type InterThreadRequest, request: ptr InterThreadRequest, node: ptr WakuNode
T: type InterThreadRequest, request: ptr InterThreadRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
## Processes the request and deallocates its memory
defer:
@ -43,15 +43,15 @@ proc process*(
let retFut =
case request[].reqType
of LIFECYCLE:
cast[ptr NodeLifecycleRequest](request[].reqContent).process(node)
cast[ptr NodeLifecycleRequest](request[].reqContent).process(waku)
of PEER_MANAGER:
cast[ptr PeerManagementRequest](request[].reqContent).process(node[])
cast[ptr PeerManagementRequest](request[].reqContent).process(waku[])
of RELAY:
cast[ptr RelayRequest](request[].reqContent).process(node)
cast[ptr RelayRequest](request[].reqContent).process(waku)
of STORE:
cast[ptr StoreRequest](request[].reqContent).process(node)
cast[ptr StoreRequest](request[].reqContent).process(waku)
of DEBUG:
cast[ptr DebugNodeRequest](request[].reqContent).process(node[])
cast[ptr DebugNodeRequest](request[].reqContent).process(waku[])
return await retFut

View File

@ -11,7 +11,7 @@ import
stew/results,
stew/shims/net
import
../../../waku/node/waku_node,
../../../waku/factory/waku,
../events/[json_message_event, json_base_event],
./inter_thread_communication/waku_thread_request,
./inter_thread_communication/waku_thread_response
@ -48,7 +48,7 @@ proc run(ctx: ptr Context) {.thread.} =
## This is the worker thread body. This thread runs the Waku node
## and attends library user requests (stop, connect_to, etc.)
var node: WakuNode
var waku: Waku
while running.load == true:
## Trying to get a request from the libwaku main thread
@ -57,7 +57,7 @@ proc run(ctx: ptr Context) {.thread.} =
waitFor ctx.reqSignal.wait()
let recvOk = ctx.reqChannel.tryRecv(request)
if recvOk == true:
let resultResponse = waitFor InterThreadRequest.process(request, addr node)
let resultResponse = waitFor InterThreadRequest.process(request, addr waku)
## Converting a `Result` into a thread-safe transferable response type
let threadSafeResp = InterThreadResponse.createShared(resultResponse)

View File

@ -11,51 +11,51 @@ import
libp2p/switch
import ../testlib/common, ../testlib/wakucore, ../testlib/wakunode
include ../../waku/factory/app
include ../../waku/factory/waku
suite "Wakunode2 - App":
suite "Wakunode2 - Waku":
test "compilation version should be reported":
## Given
let conf = defaultTestWakuNodeConf()
let wakunode2 = App.init(conf).valueOr:
let waku = Waku.init(conf).valueOr:
raiseAssert error
## When
let version = wakunode2.version
let version = waku.version
## Then
check:
version == git_version
suite "Wakunode2 - App initialization":
suite "Wakunode2 - Waku initialization":
test "peer persistence setup should be successfully mounted":
## Given
var conf = defaultTestWakuNodeConf()
conf.peerPersistence = true
let wakunode2 = App.init(conf).valueOr:
let waku = Waku.init(conf).valueOr:
raiseAssert error
check:
not wakunode2.node.peerManager.storage.isNil()
not waku.node.peerManager.storage.isNil()
test "node setup is successful with default configuration":
## Given
let conf = defaultTestWakuNodeConf()
## When
var wakunode2 = App.init(conf).valueOr:
var waku = Waku.init(conf).valueOr:
raiseAssert error
wakunode2.startApp().isOkOr:
(waitFor startWaku(addr waku)).isOkOr:
raiseAssert error
wakunode2.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr:
waku.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr:
raiseAssert error
## Then
let node = wakunode2.node
let node = waku.node
check:
not node.isNil()
node.wakuArchive.isNil()
@ -64,7 +64,7 @@ suite "Wakunode2 - App initialization":
not node.rendezvous.isNil()
## Cleanup
waitFor wakunode2.stop()
waitFor waku.stop()
test "app properly handles dynamic port configuration":
## Given
@ -72,21 +72,21 @@ suite "Wakunode2 - App initialization":
conf.tcpPort = Port(0)
## When
var wakunode2 = App.init(conf).valueOr:
var waku = Waku.init(conf).valueOr:
raiseAssert error
wakunode2.startApp().isOkOr:
(waitFor startWaku(addr waku)).isOkOr:
raiseAssert error
## Then
let
node = wakunode2.node
node = waku.node
typedNodeEnr = node.enr.toTypedRecord()
assert typedNodeEnr.isOk(), $typedNodeEnr.error
check:
# App started properly
# Waku started properly
not node.isNil()
node.wakuArchive.isNil()
node.wakuStore.isNil()
@ -97,4 +97,4 @@ suite "Wakunode2 - App initialization":
typedNodeEnr.get().tcp.get() != 0
## Cleanup
waitFor wakunode2.stop()
waitFor waku.stop()

View File

@ -337,7 +337,7 @@ proc setupProtocols(
proc startNode*(
node: WakuNode, conf: WakuNodeConf, dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]
): Future[Result[void, string]] {.async.} =
): Future[Result[void, string]] {.async: (raises: []).} =
## Start a configured node and all mounted protocols.
## Connect to static nodes and start
## keep-alive, if configured.

View File

@ -46,37 +46,31 @@ import
../../waku/factory/external_config
logScope:
topics = "wakunode app"
topics = "wakunode waku"
# Git version in git describe format (defined at compile time)
const git_version* {.strdefine.} = "n/a"
type
App* = object
version: string
conf: WakuNodeConf
rng: ref HmacDrbgContext
key: crypto.PrivateKey
type Waku* = object
version: string
conf: WakuNodeConf
rng: ref HmacDrbgContext
key: crypto.PrivateKey
wakuDiscv5*: WakuDiscoveryV5
dynamicBootstrapNodes: seq[RemotePeerInfo]
wakuDiscv5*: WakuDiscoveryV5
dynamicBootstrapNodes: seq[RemotePeerInfo]
node: WakuNode
node*: WakuNode
restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef
restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef
AppResult*[T] = Result[T, string]
func node*(app: App): WakuNode =
app.node
func version*(app: App): string =
app.version
func version*(waku: Waku): string =
waku.version
## Initialisation
proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =
proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] =
var confCopy = conf
let rng = crypto.newRng()
@ -103,7 +97,7 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =
error "Failed setting up node", error = nodeRes.error
return err("Failed setting up node: " & nodeRes.error)
var app = App(
var waku = Waku(
version: git_version,
conf: confCopy,
rng: rng,
@ -112,11 +106,11 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =
dynamicBootstrapNodes: dynamicBootstrapNodesRes.get(),
)
ok(app)
ok(waku)
proc getPorts(
listenAddrs: seq[MultiAddress]
): AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =
): Result[tuple[tcpPort, websocketPort: Option[Port]], string] =
var tcpPort, websocketPort = none(Port)
for a in listenAddrs:
@ -132,9 +126,9 @@ proc getPorts(
return ok((tcpPort: tcpPort, websocketPort: websocketPort))
proc getRunningNetConfig(app: App): AppResult[NetConfig] =
var conf = app.conf
let (tcpPort, websocketPort) = getPorts(app.node.switch.peerInfo.listenAddrs).valueOr:
proc getRunningNetConfig(waku: ptr Waku): Result[NetConfig, string] =
var conf = waku[].conf
let (tcpPort, websocketPort) = getPorts(waku[].node.switch.peerInfo.listenAddrs).valueOr:
return err("Could not retrieve ports " & error)
if tcpPort.isSome():
@ -149,67 +143,62 @@ proc getRunningNetConfig(app: App): AppResult[NetConfig] =
return ok(netConf)
proc updateEnr(app: var App, netConf: NetConfig): AppResult[void] =
let record = enrConfiguration(app.conf, netConf, app.key).valueOr:
proc updateEnr(waku: ptr Waku, netConf: NetConfig): Result[void, string] =
let record = enrConfiguration(waku[].conf, netConf, waku[].key).valueOr:
return err("ENR setup failed: " & error)
if isClusterMismatched(record, app.conf.clusterId):
if isClusterMismatched(record, waku[].conf.clusterId):
return err("cluster id mismatch configured shards")
app.node.enr = record
waku[].node.enr = record
return ok()
proc updateApp(app: var App): AppResult[void] =
if app.conf.tcpPort == Port(0) or app.conf.websocketPort == Port(0):
let netConf = getRunningNetConfig(app).valueOr:
proc updateWaku(waku: ptr Waku): Result[void, string] =
if waku[].conf.tcpPort == Port(0) or waku[].conf.websocketPort == Port(0):
let netConf = getRunningNetConfig(waku).valueOr:
return err("error calling updateNetConfig: " & $error)
updateEnr(app, netConf).isOkOr:
updateEnr(waku, netConf).isOkOr:
return err("error calling updateEnr: " & $error)
app.node.announcedAddresses = netConf.announcedAddresses
waku[].node.announcedAddresses = netConf.announcedAddresses
printNodeNetworkInfo(app.node)
printNodeNetworkInfo(waku[].node)
return ok()
proc startApp*(app: var App): AppResult[void] =
let nodeRes = catch:
(waitFor startNode(app.node, app.conf, app.dynamicBootstrapNodes))
if nodeRes.isErr():
return err("exception starting node: " & nodeRes.error.msg)
proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: []).} =
(await startNode(waku.node, waku.conf, waku.dynamicBootstrapNodes)).isOkOr:
return err("error while calling startNode: " & $error)
nodeRes.get().isOkOr:
return err("exception starting node: " & error)
# Update app data that is set dynamically on node start
app.updateApp().isOkOr:
# Update waku data that is set dynamically on node start
updateWaku(waku).isOkOr:
return err("Error in updateApp: " & $error)
## Discv5
if app.conf.discv5Discovery:
app.wakuDiscV5 = waku_discv5.setupDiscoveryV5(
app.node.enr, app.node.peerManager, app.node.topicSubscriptionQueue, app.conf,
app.dynamicBootstrapNodes, app.rng, app.key,
if waku[].conf.discv5Discovery:
waku[].wakuDiscV5 = waku_discv5.setupDiscoveryV5(
waku.node.enr, waku.node.peerManager, waku.node.topicSubscriptionQueue, waku.conf,
waku.dynamicBootstrapNodes, waku.rng, waku.key,
)
(waitFor app.wakuDiscV5.start()).isOkOr:
(await waku.wakuDiscV5.start()).isOkOr:
return err("failed to start waku discovery v5: " & $error)
return ok()
# App shutdown
# Waku shutdown
proc stop*(app: App): Future[void] {.async: (raises: [Exception]).} =
if not app.restServer.isNil():
await app.restServer.stop()
proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =
if not waku.restServer.isNil():
await waku.restServer.stop()
if not app.metricsServer.isNil():
await app.metricsServer.stop()
if not waku.metricsServer.isNil():
await waku.metricsServer.stop()
if not app.wakuDiscv5.isNil():
await app.wakuDiscv5.stop()
if not waku.wakuDiscv5.isNil():
await waku.wakuDiscv5.stop()
if not app.node.isNil():
await app.node.stop()
if not waku.node.isNil():
await waku.node.stop()

View File

@ -1063,7 +1063,7 @@ proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
proc fetchPeerExchangePeers*(
node: Wakunode, amount: uint64
): Future[Result[int, string]] {.async, raises: [Defect].} =
): Future[Result[int, string]] {.async: (raises: []).} =
if node.wakuPeerExchange.isNil():
error "could not get peers from px, waku peer-exchange is nil"
return err("PeerExchange is not mounted")

View File

@ -1,9 +0,0 @@
# Waku
#
# Licenses:
# - MIT ([LICENSE-MIT](../LICENSE-MIT) or http://opensource.org/licenses/MIT)
# - APACHEv2 ([LICENSE-APACHEv2](../LICENSE-APACHEv2) or https://www.apache.org/licenses/LICENSE-2.0)
## An implementation of [Waku v2](https://rfc.vac.dev/spec/10/) in nim.
import waku_node as wakunode2
export wakunode2

View File

@ -54,7 +54,7 @@ type
proc request*(
wpx: WakuPeerExchange, numPeers: uint64, conn: Connection
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} =
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: numPeers))
var buffer: seq[byte]
@ -79,15 +79,18 @@ proc request*(
proc request*(
wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} =
let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
if connOpt.isNone():
return err(dialFailure)
return await wpx.request(numPeers, connOpt.get())
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
try:
let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec)
if connOpt.isNone():
return err(dialFailure)
return await wpx.request(numPeers, connOpt.get())
except CatchableError:
return err("exception dialing peer: " & getCurrentExceptionMsg())
proc request*(
wpx: WakuPeerExchange, numPeers: uint64
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} =
): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} =
let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec)
if peerOpt.isNone():
waku_px_errors.inc(labelValues = [peerNotFoundFailure])