@ -3,7 +3,6 @@
std/[options, tables, strutils, sequtils, os],
chronos, chronicles, metrics,
stew/shims/net as stewNet,
@ -77,8 +76,6 @@ type
peerInfo*: PeerInfo
libp2pPing*: Ping
libp2pTransportLoops*: seq[Future[void]]
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage
messages*: seq[(Topic, WakuMessage)]
filters*: Filters
rng*: ref BrHmacDrbgContext
started*: bool # Indicates that node has started listening
@ -163,33 +160,6 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
return wakuNode
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node.
## Status: Implemented.
node.libp2pTransportLoops = await node.switch.start()
# TODO Get this from WakuNode obj
let peerInfo = node.peerInfo
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
let listenStr = $peerInfo.addrs[^1] & "/p2p/" & $peerInfo.peerId
## XXX: this should be /ip4..., / stripped?
info "Listening on", full = listenStr
if not node.wakuRelay.isNil:
await node.wakuRelay.start()
node.started = true
proc stop*(node: WakuNode) {.async.} =
if not node.wakuRelay.isNil:
await node.wakuRelay.stop()
await node.switch.stop()
node.started = false
proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
if node.wakuRelay.isNil:
error "Invalid API call to `subscribe`. WakuRelay not mounted."
@ -379,7 +349,6 @@ proc resume*(node: WakuNode, peerList: Option[seq[PeerInfo]] = none(seq[PeerInfo
if retrievedMessages.isOk:
info "the number of retrieved messages since the last online time: ", number=retrievedMessages.value
# TODO Extend with more relevant info: topics, peers, memory usage, online time, etc
proc info*(node: WakuNode): WakuInfo =
## Returns information about the Node, such as what multiaddress it can be reached at.
@ -464,7 +433,6 @@ when defined(rln):
node.wakuRlnRelay = rlnPeer
when defined(rln):
proc addRLNRelayValidator*(node: WakuNode, pubsubTopic: string) =
## this procedure is a thin wrapper for the pubsub addValidator method
@ -480,6 +448,39 @@ when defined(rln):
let pb = PubSub(node.wakuRelay)
pb.addValidator(pubsubTopic, validator)
proc startRelay*(node: WakuNode) {.async.} =
if node.wakuRelay.isNil:
trace "Failed to start relay. Not mounted."
## Setup and start relay protocol
info "starting relay"
# Topic subscriptions
for topic in node.wakuRelay.defaultTopics:
node.subscribe(topic, none(TopicHandler))
# Resume previous relay connections
if node.peerManager.hasPeers(WakuRelayCodec):
info "Found previous WakuRelay peers. Reconnecting."
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
let backoffPeriod = node.wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime)
await node.peerManager.reconnectPeers(WakuRelayCodec,
when defined(rln):
if node.wakuRelay.rlnRelayEnabled:
# TODO currently the message validator is set for the defaultTopic, this can be configurable to accept other pubsub topics as well
addRLNRelayValidator(node, defaultTopic)
info "WakuRLNRelay is mounted successfully"
# Start the WakuRelay protocol
await node.wakuRelay.start()
info "relay started successfully"
proc mountRelay*(node: WakuNode,
topics: seq[string] = newSeq[string](),
rlnRelayEnabled = false,
@ -499,44 +500,34 @@ proc mountRelay*(node: WakuNode,
info "mounting relay", rlnRelayEnabled=rlnRelayEnabled, relayMessages=relayMessages
## The default relay topics is the union of
## all configured topics plus the hard-coded defaultTopic(s)
wakuRelay.defaultTopics = concat(@[defaultTopic], topics)
wakuRelay.rlnRelayEnabled = rlnRelayEnabled
node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec))
if not relayMessages:
if relayMessages:
## Some nodes may choose not to have the capability to relay messages (e.g. "light" nodes).
## All nodes, however, currently require WakuRelay, regardless of desired capabilities.
## This is to allow protocol stream negotation with relay-capable nodes to succeed.
## Here we mount relay on the switch only, but do not proceed to subscribe to any pubsub
## topics. We also never start the relay protocol. node.wakuRelay remains nil.
## @TODO: in future, this WakuRelay dependency will be removed completely
## @TODO: in future, this WakuRelay dependency will be removed completely
node.wakuRelay = wakuRelay
node.wakuRelay = wakuRelay
node.subscribe(defaultTopic, none(TopicHandler))
for topic in topics:
node.subscribe(topic, none(TopicHandler))
if node.peerManager.hasPeers(WakuRelayCodec):
trace "Found previous WakuRelay peers. Reconnecting."
# Reconnect to previous relay peers. This will respect a backoff period, if necessary
waitFor node.peerManager.reconnectPeers(WakuRelayCodec,
wakuRelay.parameters.pruneBackoff + chronos.seconds(BackoffSlackTime))
when defined(rln):
if rlnRelayEnabled:
# TODO pass rln relay inputs to this proc, right now it uses default values that are set in the mountRlnRelay proc
info "WakuRLNRelay is enabled"
waitFor mountRlnRelay(node)
# TODO currently the message validator is set for the defaultTopic, this can be configurable to accept other pubsub topics as well
addRLNRelayValidator(node, defaultTopic)
info "WakuRLNRelay is mounted successfully"
info "relay mounted successfully"
if node.started:
# Node has already started. Start the WakuRelay protocol
waitFor node.wakuRelay.start()
info "relay mounted and started successfully"
# Node has started already. Let's start relay too.
waitFor node.startRelay()
proc mountLightPush*(node: WakuNode) {.raises: [Defect, LPError].} =
info "mounting light push"
@ -558,7 +549,7 @@ proc mountLibp2pPing*(node: WakuNode) {.raises: [Defect, LPError].} =
except Exception as e:
# This is necessary as `Ping.new*` does not have explicit `raises` requirement
# @TODO: remove exception handling once explicit `raises` in ping module
raise newException(LPError, "Failed to initialise ping protocol")
raise newException(LPError, "Failed to initialize ping protocol")
@ -652,195 +643,283 @@ proc connectToNodes*(n: WakuNode, nodes: seq[PeerInfo]) {.async.} =
# later.
await sleepAsync(5.seconds)
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
## all its mounted protocols.
## Status: Implemented.
node.libp2pTransportLoops = await node.switch.start()
# TODO Get this from WakuNode obj
let peerInfo = node.peerInfo
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
let listenStr = $peerInfo.addrs[^1] & "/p2p/" & $peerInfo.peerId
## XXX: this should be /ip4..., / stripped?
info "Listening on", full = listenStr
if not node.wakuRelay.isNil:
await node.startRelay()
info "Node started successfully"
node.started = true
proc stop*(node: WakuNode) {.async.} =
if not node.wakuRelay.isNil:
await node.wakuRelay.stop()
await node.switch.stop()
node.started = false
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
when isMainModule:
## Node setup happens in 6 phases:
## 1. Set up storage
## 2. Initialize node
## 3. Mount and initialize configured protocols
## 4. Start node and mounted protocols
## 5. Start monitoring tools and external interfaces
## 6. Setup graceful shutdown hooks
confutils, json_rpc/rpcserver, metrics,
topics = "wakunode.setup"
# Setup functions #
proc startRpc(node: WakuNode, rpcIp: ValidIpAddress, rpcPort: Port, conf: WakuNodeConf) {.raises: [Defect, RpcBindError, CatchableError].} =
ta = initTAddress(rpcIp, rpcPort)
rpcServer = newRpcHttpServer([ta])
installDebugApiHandlers(node, rpcServer)
# 1/6 Setup storage
proc setupStorage(conf: WakuNodeConf):
SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore]] =
# Install enabled API handlers:
if conf.relay:
let topicCache = newTable[string, seq[WakuMessage]]()
installRelayApiHandlers(node, rpcServer, topicCache)
if conf.rpcPrivate:
# Private API access allows WakuRelay functionality that
# is backwards compatible with Waku v1.
installPrivateApiHandlers(node, rpcServer, node.rng, topicCache)
## Setup a SQLite Database for a wakunode based on a supplied
## configuration file and perform all necessary migration.
## If config allows, return peer storage and message store
## for use elsewhere.
if conf.filter:
let messageCache = newTable[ContentTopic, seq[WakuMessage]]()
installFilterApiHandlers(node, rpcServer, messageCache)
if conf.store:
installStoreApiHandlers(node, rpcServer)
if conf.rpcAdmin:
installAdminApiHandlers(node, rpcServer)
info "RPC Server started", ta
sqliteDatabase: SqliteDatabase
storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore]
proc startMetricsServer(serverIp: ValidIpAddress, serverPort: Port) {.raises: [Defect, Exception].} =
info "Starting metrics HTTP server", serverIp, serverPort
# Setup DB
if conf.dbPath != "":
let dbRes = SqliteDatabase.init(conf.dbPath)
if dbRes.isErr:
warn "failed to init database", err = dbRes.error
waku_node_errors.inc(labelValues = ["init_db_failure"])
return err("failed to init database")
sqliteDatabase = dbRes.value
if not sqliteDatabase.isNil:
# Database initialized. Let's set it up
sqliteDatabase.runMigrations(conf) # First migrate what we have
if conf.persistPeers:
# Peer persistence enable. Set up Peer table in storage
let res = WakuPeerStorage.new(sqliteDatabase)
if res.isErr:
warn "failed to init new WakuPeerStorage", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
storeTuple.pStorage = res.value
startMetricsHttpServer($serverIp, serverPort)
if conf.persistMessages:
# Historical message persistence enable. Set up Message table in storage
let res = WakuMessageStore.init(sqliteDatabase)
info "Metrics HTTP server started", serverIp, serverPort
if res.isErr:
warn "failed to init WakuMessageStore", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
storeTuple.mStorage = res.value
proc startMetricsLog() =
# https://github.com/nim-lang/Nim/issues/17369
var logMetrics: proc(udata: pointer) {.gcsafe, raises: [Defect].}
logMetrics = proc(udata: pointer) =
# TODO: libp2p_pubsub_peers is not public, so we need to make this either
# public in libp2p or do our own peer counting after all.
totalMessages = 0.float64
# 2/6 Initialize node
proc initNode(conf: WakuNodeConf,
pStorage: WakuPeerStorage = nil): SetupResult[WakuNode] =
## Setup a basic Waku v2 node based on a supplied configuration
## file. Optionally include persistent peer storage.
## No protocols are mounted yet.
for key in waku_node_messages.metrics.keys():
totalMessages = totalMessages + waku_node_messages.value(key)
except KeyError:
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat,
Port(uint16(conf.tcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift))
## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
## config, the external port is the same as the bind port.
extPort = if extIp.isSome() and extTcpPort.isNone():
some(Port(uint16(conf.tcpPort) + conf.portsShift))
node = WakuNode.new(conf.nodekey,
conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
extIp, extPort,
info "Node metrics", totalMessages
discard setTimer(Moment.fromNow(2.seconds), logMetrics)
discard setTimer(Moment.fromNow(2.seconds), logMetrics)
# 3/6 Mount and initialize configured protocols
proc setupProtocols(node: var WakuNode,
conf: WakuNodeConf,
mStorage: WakuMessageStore = nil): SetupResult[bool] =
## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage.
## No protocols are started yet.
# Mount relay on all nodes
conf.topics.split(" "),
rlnRelayEnabled = conf.rlnRelay,
relayMessages = conf.relay) # Indicates if node is capable to relay messages
# Keepalive mounted on all nodes
if conf.swap:
# TODO Set swap peer, for now should be same as store peer
# Store setup
if (conf.storenode != "") or (conf.store):
mountStore(node, mStorage, conf.persistMessages)
if conf.storenode != "":
setStorePeer(node, conf.storenode)
# NOTE Must be mounted after relay
if (conf.lightpushnode != "") or (conf.lightpush):
if conf.lightpushnode != "":
setLightPushPeer(node, conf.lightpushnode)
# Filter setup. NOTE Must be mounted after relay
if (conf.filternode != "") or (conf.filter):
if conf.filternode != "":
setFilterPeer(node, conf.filternode)
ok(true) # Success
# 4/6 Start node and mounted protocols
proc startNode(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] =
## Start a configured node and all mounted protocols.
## Resume history, connect to static nodes and start
## keep-alive, if configured.
# Start Waku v2 node
waitFor node.start()
# Resume historical messages, this has to be called after the node has been started
if conf.store and conf.persistMessages:
waitFor node.resume()
# Connect to configured static nodes
if conf.staticnodes.len > 0:
waitFor connectToNodes(node, conf.staticnodes)
# Start keepalive, if enabled
if conf.keepAlive:
ok(true) # Success
# 5/6 Start monitoring tools and external interfaces
proc startExternal(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] =
## Start configured external interfaces and monitoring tools
## on a Waku v2 node, including the RPC API and metrics
## monitoring ports.
if conf.rpc:
startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf)
if conf.metricsLogging:
if conf.metricsServer:
Port(conf.metricsServerPort + conf.portsShift))
ok(true) # Success
conf = WakuNodeConf.load()
# Storage setup
var sqliteDatabase: SqliteDatabase
if conf.dbPath != "":
let dbRes = SqliteDatabase.init(conf.dbPath)
if dbRes.isErr:
warn "failed to init database", err = dbRes.error
waku_node_errors.inc(labelValues = ["init_db_failure"])
sqliteDatabase = dbRes.value
if not sqliteDatabase.isNil:
var migrationPath = ""
if conf.persistPeers and conf.persistMessages: migrationPath = migration_types.ALL_STORE_MIGRATION_PATH
elif conf.persistPeers: migrationPath = migration_types.PEER_STORE_MIGRATION_PATH
elif conf.persistMessages: migrationPath = migration_types.MESSAGE_STORE_MIGRATION_PATH
# run migration
info "running migration ... "
let migrationResult = sqliteDatabase.migrate(migrationPath)
if migrationResult.isErr:
warn "migration failed"
info "migration is done"
var pStorage: WakuPeerStorage
if conf.persistPeers and not sqliteDatabase.isNil:
let res = WakuPeerStorage.new(sqliteDatabase)
if res.isErr:
warn "failed to init new WakuPeerStorage", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
pStorage = res.value
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId,
Port(uint16(conf.tcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift))
## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
## config, the external port is the same as the bind port.
extPort = if extIp.isSome() and extTcpPort.isNone(): some(Port(uint16(conf.tcpPort) + conf.portsShift))
else: extTcpPort
node = WakuNode.new(conf.nodekey,
conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
extIp, extPort,
waitFor node.start()
if conf.swap:
# TODO Set swap peer, for now should be same as store peer
# Store setup
if (conf.storenode != "") or (conf.store):
var store: WakuMessageStore
if (not sqliteDatabase.isNil) and conf.persistMessages:
let res = WakuMessageStore.init(sqliteDatabase)
if res.isErr:
warn "failed to init WakuMessageStore", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
store = res.value
mountStore(node, store, conf.persistMessages)
if conf.storenode != "":
setStorePeer(node, conf.storenode)
# Relay setup
conf.topics.split(" "),
rlnRelayEnabled = conf.rlnRelay,
relayMessages = conf.relay) # Indicates if node is capable to relay messages
# Keepalive mounted on all nodes
node: WakuNode # This is the node we're going to setup using the conf
# Node setup #
debug "1/6 Setting up storage"
pStorage: WakuPeerStorage
mStorage: WakuMessageStore
# Resume historical messages, this has to be called after the relay setup
if conf.store and conf.persistMessages:
waitFor node.resume()
let setupStorageRes = setupStorage(conf)
if conf.staticnodes.len > 0:
waitFor connectToNodes(node, conf.staticnodes)
if setupStorageRes.isErr:
error "1/6 Setting up storage failed. Continuing without storage."
(pStorage, mStorage) = setupStorageRes.get()
# NOTE Must be mounted after relay
if (conf.lightpushnode != "") or (conf.lightpush):
debug "2/6 Initializing node"
if conf.lightpushnode != "":
setLightPushPeer(node, conf.lightpushnode)
let initNodeRes = initNode(conf, pStorage)
if initNodeRes.isErr:
error "2/6 Initializing node failed. Quitting."
node = initNodeRes.get()
debug "3/6 Mounting protocols"
let setupProtocolsRes = setupProtocols(node, conf, mStorage)
if setupProtocolsRes.isErr:
error "3/6 Mounting protocols failed. Continuing in current state."
debug "4/6 Starting node and mounted protocols"
# Filter setup. NOTE Must be mounted after relay
if (conf.filternode != "") or (conf.filter):
let startNodeRes = startNode(node, conf)
if conf.filternode != "":
setFilterPeer(node, conf.filternode)
if startNodeRes.isErr:
error "4/6 Starting node and mounted protocols failed. Continuing in current state."
if conf.rpc:
startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf)
debug "5/6 Starting monitoring and external interfaces"
if conf.metricsLogging:
let startExternalRes = startExternal(node, conf)
if conf.metricsServer:
Port(conf.metricsServerPort + conf.portsShift))
# Setup graceful shutdown
if startExternalRes.isErr:
error "5/6 Starting monitoring and external interfaces failed. Continuing in current state."
debug "6/6 Setting up shutdown hooks"
# 6/6 Setup graceful shutdown hooks
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.
# Handle Ctrl-C SIGINT
proc handleCtrlC() {.noconv.} =
@ -862,8 +941,6 @@ when isMainModule:
c_signal(SIGTERM, handleSigterm)
# Start keepalive, if enabled
if conf.keepAlive:
debug "Node setup complete"