nwaku/waku/factory/waku.nim

316 lines
9.9 KiB
Nim
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{.push raises: [].}
import
std/[options, sequtils],
results,
chronicles,
chronos,
libp2p/wire,
libp2p/multicodec,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub,
libp2p/peerid,
eth/keys,
presto,
metrics,
metrics/chronos_httpserver
import
../common/logging,
../waku_core,
../waku_node,
../node/peer_manager,
../node/health_monitor,
../node/delivery_monitor/delivery_monitor,
../waku_api/message_cache,
../waku_api/rest/server,
../waku_archive,
../discovery/waku_dnsdisc,
../discovery/waku_discv5,
../waku_enr/sharding,
../waku_rln_relay,
../waku_store,
../waku_filter_v2,
../factory/networks_config,
../factory/node_factory,
../factory/internal_config,
../factory/external_config
# Chronicles log scope for this module: log statements below carry these topics.
logScope:
  topics = "wakunode waku"
const
  # Git version in `git describe` format. The value is a string define, so it
  # is fixed at compile time; "n/a" is used when the build does not supply one.
  git_version* {.strdefine.} = "n/a"
type
  Waku* = object
    ## Bundles a `WakuNode` together with the configuration, key material and
    ## auxiliary services that this module creates, starts and stops.
    version: string ## Filled from `git_version` by `init`; read via `version`.
    conf: WakuNodeConf ## Configuration the instance was initialised with.
    rng: ref HmacDrbgContext ## RNG created in `init`, later handed to discv5.
    key: crypto.PrivateKey ## Node key taken from `conf.nodekey` in `init`.

    wakuDiscv5*: WakuDiscoveryV5 ## Assigned by `startWaku` when discv5 is on.
    dynamicBootstrapNodes: seq[RemotePeerInfo]
      ## Peers returned by `retrieveDynamicBootstrapNodes` during `init`.

    node*: WakuNode ## Node produced by `setupNode` in `init`.

    deliveryMonitor: DeliveryMonitor
      ## Only created when `conf.reliabilityEnabled` is set; nil otherwise.

    # The two servers below are not assigned anywhere in the code visible
    # here; `stop` shuts them down when they are non-nil.
    restServer*: WakuRestServerRef
    metricsServer*: MetricsHttpServerRef
proc logConfig(conf: WakuNodeConf) =
  ## Logs a summary of `conf` at info level: enabled protocols, cluster and
  ## relay peer limit, one line per shard, one line per discv5 bootstrap node
  ## and, when on-chain RLN is active, the RLN validation parameters.
  info "Configuration: Enabled protocols",
    relay = conf.relay,
    rlnRelay = conf.rlnRelay,
    store = conf.store,
    filter = conf.filter,
    lightpush = conf.lightpush,
    peerExchange = conf.peerExchange

  info "Configuration. Network",
    cluster = conf.clusterId, maxPeers = conf.maxRelayPeers

  for shardId in conf.shards:
    info "Configuration. Shards", shard = shardId

  for bootstrapNode in conf.discv5BootstrapNodes:
    info "Configuration. Bootstrap nodes", node = bootstrapNode

  # The validation section is only reported for dynamic (on-chain) RLN.
  if not (conf.rlnRelay and conf.rlnRelayDynamic):
    return

  info "Configuration. Validation",
    mechanism = "onchain rln",
    contract = conf.rlnRelayEthContractAddress,
    maxMessageSize = conf.maxMessageSize,
    rlnEpochSizeSec = conf.rlnEpochSizeSec,
    rlnRelayUserMessageLimit = conf.rlnRelayUserMessageLimit,
    rlnRelayEthClientAddress = string(conf.rlnRelayEthClientAddress)
func version*(waku: Waku): string =
  ## Read-only accessor for the private `version` field of `waku`.
  result = waku.version
proc validateShards(conf: WakuNodeConf): Result[void, string] =
  ## Checks every entry of `conf.shards` against the value returned by
  ## `getNumShardsInNetwork(conf)`.
  ##
  ## Returns `ok()` when all shards are strictly below that value. Otherwise
  ## logs and returns an error describing the first offending shard.
  let shardCount = getNumShardsInNetwork(conf)

  for shardId in conf.shards:
    if shardId < shardCount:
      continue

    # Plain concatenation on purpose: fmt doesn't work here.
    let failure =
      "validateShards invalid shard: " & $shardId & " when numShardsInNetwork: " &
      $shardCount
    error "validateShards failed", error = failure
    return err(failure)

  return ok()
## Initialisation
proc init*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] =
  ## Builds a `Waku` instance from `confCopy`, returning an error string for
  ## the first step that fails.
  ##
  ## `confCopy` is mutated in place:
  ## - shards parsed from the legacy `pubsubTopics` option overwrite `shards`;
  ## - for cluster id 1 the `TheWakuNetworkConf` preset overrides the network
  ##   related settings and its bootstrap nodes are appended;
  ## - a Secp256k1 node key is generated when `nodekey` is not set.
  ##
  ## The returned object holds the configured node but nothing is started
  ## here; see `startWaku`.
  let rng = crypto.newRng()

  logging.setupLog(confCopy.logLevel, confCopy.logFormat)

  # `newRng` yields nil when the system random source cannot be used. `rng` is
  # dereferenced below for key generation, so fail cleanly instead.
  if rng.isNil():
    error "Failed to initialise the random number generator"
    return err("Failed to initialise the random number generator")

  # TODO: remove after pubsubtopic config gets removed
  var shards = newSeq[uint16]()
  if confCopy.pubsubTopics.len > 0:
    let shardsRes = topicsToRelayShards(confCopy.pubsubTopics)
    if shardsRes.isErr():
      error "failed to parse pubsub topic, please format according to static shard specification",
        error = shardsRes.error
      return err("failed to parse pubsub topic: " & $shardsRes.error)

    let shardsOpt = shardsRes.get()

    if shardsOpt.isSome():
      let relayShards = shardsOpt.get()
      # Shards taken from pubsub topics must belong to the node's own cluster.
      if relayShards.clusterId != confCopy.clusterId:
        error "clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22",
          nodeCluster = confCopy.clusterId, pubsubCluster = relayShards.clusterId
        return err(
          "clusterId of the pubsub topic should match the node's cluster. e.g. --pubsub-topic=/waku/2/rs/22/1 and --cluster-id=22"
        )

      for shard in relayShards.shardIds:
        shards.add(shard)
      confCopy.shards = shards

  case confCopy.clusterId
  # cluster-id=1 (aka The Waku Network)
  of 1:
    let twnClusterConf = ClusterConf.TheWakuNetworkConf()

    # Override configuration
    confCopy.maxMessageSize = twnClusterConf.maxMessageSize
    confCopy.clusterId = twnClusterConf.clusterId
    confCopy.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
    confCopy.rlnRelayChainId = twnClusterConf.rlnRelayChainId
    confCopy.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
    confCopy.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold
    confCopy.discv5Discovery = twnClusterConf.discv5Discovery
    # User supplied bootstrap nodes are kept; the preset ones are appended.
    confCopy.discv5BootstrapNodes =
      confCopy.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
    confCopy.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
    confCopy.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
    confCopy.numShardsInNetwork = twnClusterConf.numShardsInNetwork

    # Only set rlnRelay to true if relay is configured
    if confCopy.relay:
      confCopy.rlnRelay = twnClusterConf.rlnRelay
  else:
    discard

  info "Running nwaku node", version = git_version
  logConfig(confCopy)

  let validateShardsRes = validateShards(confCopy)
  if validateShardsRes.isErr():
    error "Failed validating shards", error = $validateShardsRes.error
    return err("Failed validating shards: " & $validateShardsRes.error)

  if not confCopy.nodekey.isSome():
    let keyRes = crypto.PrivateKey.random(Secp256k1, rng[])
    if keyRes.isErr():
      error "Failed to generate key", error = $keyRes.error
      return err("Failed to generate key: " & $keyRes.error)
    confCopy.nodekey = some(keyRes.get())

  debug "Retrieve dynamic bootstrap nodes"
  let dynamicBootstrapNodesRes = waku_dnsdisc.retrieveDynamicBootstrapNodes(
    confCopy.dnsDiscovery, confCopy.dnsDiscoveryUrl, confCopy.dnsDiscoveryNameServers
  )
  if dynamicBootstrapNodesRes.isErr():
    error "Retrieving dynamic bootstrap nodes failed",
      error = dynamicBootstrapNodesRes.error
    return err(
      "Retrieving dynamic bootstrap nodes failed: " & dynamicBootstrapNodesRes.error
    )

  let nodeRes = setupNode(confCopy, some(rng))
  if nodeRes.isErr():
    error "Failed setting up node", error = nodeRes.error
    return err("Failed setting up node: " & nodeRes.error)

  let node = nodeRes.get()

  # Stays nil unless reliability mode is enabled; `startWaku` checks for nil.
  var deliveryMonitor: DeliveryMonitor
  if confCopy.reliabilityEnabled:
    if confCopy.storenode == "":
      return err("A storenode should be set when reliability mode is on")

    let deliveryMonitorRes = DeliveryMonitor.new(
      node.wakuStoreClient, node.wakuRelay, node.wakuLightpushClient,
      node.wakuFilterClient,
    )
    if deliveryMonitorRes.isErr():
      return err("could not create delivery monitor: " & $deliveryMonitorRes.error)
    deliveryMonitor = deliveryMonitorRes.get()

  var waku = Waku(
    version: git_version,
    conf: confCopy,
    rng: rng,
    key: confCopy.nodekey.get(),
    node: node,
    dynamicBootstrapNodes: dynamicBootstrapNodesRes.get(),
    deliveryMonitor: deliveryMonitor,
  )

  ok(waku)
proc getPorts(
    listenAddrs: seq[MultiAddress]
): Result[tuple[tcpPort, websocketPort: Option[Port]], string] =
  ## Scans `listenAddrs` and returns the port of the first websocket address
  ## and the port of the first non-websocket address. Either entry is
  ## `none(Port)` when no address of that kind is present.
  ##
  ## Fails when an address that has to be inspected cannot be converted with
  ## `initTAddress`.
  var found = (tcpPort: none(Port), websocketPort: none(Port))

  for listenAddr in listenAddrs:
    let isWs = listenAddr.isWsAddress()

    if isWs and found.websocketPort.isNone():
      let bound = initTAddress(listenAddr).valueOr:
        return err("getPorts wsAddr error:" & $error)
      found.websocketPort = some(bound.port)
    elif not isWs and found.tcpPort.isNone():
      # Anything that is not a websocket address is treated as the TCP one.
      let bound = initTAddress(listenAddr).valueOr:
        return err("getPorts tcpAddr error:" & $error)
      found.tcpPort = some(bound.port)

  return ok(found)
proc getRunningNetConfig(waku: ptr Waku): Result[NetConfig, string] =
  ## Builds a `NetConfig` from a copy of the stored configuration in which the
  ## TCP and websocket ports are replaced by the ones found in the switch's
  ## current listen addresses. The configuration held by `waku` is not changed.
  let listenAddrs = waku[].node.switch.peerInfo.listenAddrs
  let boundPorts = getPorts(listenAddrs).valueOr:
    return err("Could not retrieve ports " & error)

  # Work on a local copy; a port missing from the listen addresses keeps its
  # configured value.
  var runningConf = waku[].conf
  runningConf.tcpPort = boundPorts.tcpPort.get(runningConf.tcpPort)
  runningConf.websocketPort = boundPorts.websocketPort.get(runningConf.websocketPort)

  # Rebuild NetConfig with bound port values
  let rebuilt = networkConfiguration(runningConf, clientId).valueOr:
    return err("Could not update NetConfig: " & error)

  return ok(rebuilt)
proc updateEnr(waku: ptr Waku, netConf: NetConfig): Result[void, string] =
  ## Regenerates the ENR from the stored configuration, `netConf` and the node
  ## key, then installs it on the node. The node's ENR is left untouched when
  ## the record cannot be built or fails the cluster check.
  let freshRecord = enrConfiguration(waku[].conf, netConf, waku[].key).valueOr:
    return err("ENR setup failed: " & error)

  let clusterMismatch = isClusterMismatched(freshRecord, waku[].conf.clusterId)
  if not clusterMismatch:
    waku[].node.enr = freshRecord
    return ok()

  return err("cluster id mismatch configured shards")
proc updateWaku(waku: ptr Waku): Result[void, string] =
  ## Refreshes the node data that depends on the ports actually in use.
  ##
  ## Nothing is done unless the configured TCP or websocket port is 0. In that
  ## case the net config is rebuilt from the node's listen addresses, the ENR
  ## is regenerated, the announced addresses are replaced and the resulting
  ## network info is printed.
  if waku[].conf.tcpPort == Port(0) or waku[].conf.websocketPort == Port(0):
    # The message names the proc that is really called, so it can be found
    # when searching the sources.
    let netConf = getRunningNetConfig(waku).valueOr:
      return err("error calling getRunningNetConfig: " & $error)

    updateEnr(waku, netConf).isOkOr:
      return err("error calling updateEnr: " & $error)

    waku[].node.announcedAddresses = netConf.announcedAddresses

    printNodeNetworkInfo(waku[].node)

  return ok()
proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: []).} =
  ## Starts the components held by `waku`:
  ##
  ## 1. unless `conf.discv5Only` is set, the node itself, followed by
  ##    `updateWaku` to pick up values only known once the node runs;
  ## 2. discv5, when `conf.discv5Discovery` or `conf.discv5Only` is set;
  ## 3. the delivery monitor, when one was created by `init`.
  ##
  ## Returns an error string for the first step that fails; later steps are
  ## then not attempted.
  if not waku[].conf.discv5Only:
    (
      await startNode(waku[].node, waku[].conf, waku[].dynamicBootstrapNodes)
    ).isOkOr:
      return err("error while calling startNode: " & $error)

    # Update waku data that is set dynamically on node start
    updateWaku(waku).isOkOr:
      return err("Error in updateWaku: " & $error)

  ## Discv5
  if waku[].conf.discv5Discovery or waku[].conf.discv5Only:
    waku[].wakuDiscv5 = waku_discv5.setupDiscoveryV5(
      waku[].node.enr,
      waku[].node.peerManager,
      waku[].node.topicSubscriptionQueue,
      waku[].conf,
      waku[].dynamicBootstrapNodes,
      waku[].rng,
      waku[].key,
    )

    (await waku[].wakuDiscv5.start()).isOkOr:
      return err("failed to start waku discovery v5: " & $error)

  ## Reliability
  if not waku[].deliveryMonitor.isNil():
    waku[].deliveryMonitor.startDeliveryMonitor()

  return ok()
# Waku shutdown
proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =
  ## Stops the REST server, the metrics server, discv5 and finally the node,
  ## in that order. A component that is nil is skipped.
  if not isNil(waku.restServer):
    await waku.restServer.stop()

  if not isNil(waku.metricsServer):
    await waku.metricsServer.stop()

  if not isNil(waku.wakuDiscv5):
    await waku.wakuDiscv5.stop()

  if not isNil(waku.node):
    await waku.node.stop()