mirror of https://github.com/waku-org/nwaku.git
chore: migrating logic from wakunode2.nim to node_factory.nim (#2504)
This commit is contained in:
parent
b890f9c663
commit
dcc88ee0b2
|
@ -65,13 +65,10 @@ type
|
|||
App* = object
|
||||
version: string
|
||||
conf: WakuNodeConf
|
||||
netConf: NetConfig
|
||||
rng: ref HmacDrbgContext
|
||||
key: crypto.PrivateKey
|
||||
record: Record
|
||||
|
||||
wakuDiscv5: Option[WakuDiscoveryV5]
|
||||
peerStore: Option[WakuPeerStorage]
|
||||
dynamicBootstrapNodes: seq[RemotePeerInfo]
|
||||
|
||||
node: WakuNode
|
||||
|
@ -88,74 +85,43 @@ func node*(app: App): WakuNode =
|
|||
func version*(app: App): string =
|
||||
app.version
|
||||
|
||||
|
||||
## Initialisation
|
||||
|
||||
proc init*(T: type App, rng: ref HmacDrbgContext, conf: WakuNodeConf): T =
|
||||
let key =
|
||||
if conf.nodeKey.isSome():
|
||||
conf.nodeKey.get()
|
||||
else:
|
||||
let keyRes = crypto.PrivateKey.random(Secp256k1, rng[])
|
||||
proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =
|
||||
|
||||
if keyRes.isErr():
|
||||
error "failed to generate key", error=keyRes.error
|
||||
quit(QuitFailure)
|
||||
var confCopy = conf
|
||||
let rng = crypto.newRng()
|
||||
|
||||
keyRes.get()
|
||||
if not confCopy.nodekey.isSome():
|
||||
let keyRes = crypto.PrivateKey.random(Secp256k1, rng[])
|
||||
if keyRes.isErr():
|
||||
error "Failed to generate key", error = $keyRes.error
|
||||
return err("Failed to generate key: " & $keyRes.error)
|
||||
confCopy.nodekey = some(keyRes.get())
|
||||
|
||||
let netConfigRes = networkConfiguration(conf, clientId)
|
||||
debug "Retrieve dynamic bootstrap nodes"
|
||||
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(confCopy.dnsDiscovery,
|
||||
confCopy.dnsDiscoveryUrl,
|
||||
confCopy.dnsDiscoveryNameServers)
|
||||
if dynamicBootstrapNodesRes.isErr():
|
||||
error "Retrieving dynamic bootstrap nodes failed", error = dynamicBootstrapNodesRes.error
|
||||
return err("Retrieving dynamic bootstrap nodes failed: " & dynamicBootstrapNodesRes.error)
|
||||
|
||||
let netConfig =
|
||||
if netConfigRes.isErr():
|
||||
error "failed to create internal config", error=netConfigRes.error
|
||||
quit(QuitFailure)
|
||||
else: netConfigRes.get()
|
||||
let nodeRes = setupNode(confCopy, some(rng))
|
||||
if nodeRes.isErr():
|
||||
error "Failed setting up node", error=nodeRes.error
|
||||
return err("Failed setting up node: " & nodeRes.error)
|
||||
|
||||
let recordRes = enrConfiguration(conf, netConfig, key)
|
||||
var app = App(
|
||||
version: git_version,
|
||||
conf: confCopy,
|
||||
rng: rng,
|
||||
key: confCopy.nodekey.get(),
|
||||
node: nodeRes.get(),
|
||||
dynamicBootstrapNodes: dynamicBootstrapNodesRes.get()
|
||||
)
|
||||
|
||||
let record =
|
||||
if recordRes.isErr():
|
||||
error "failed to create record", error=recordRes.error
|
||||
quit(QuitFailure)
|
||||
else: recordRes.get()
|
||||
|
||||
if isClusterMismatched(record, conf.clusterId):
|
||||
error "cluster id mismatch configured shards"
|
||||
quit(QuitFailure)
|
||||
|
||||
App(
|
||||
version: git_version,
|
||||
conf: conf,
|
||||
netConf: netConfig,
|
||||
rng: rng,
|
||||
key: key,
|
||||
record: record,
|
||||
node: nil
|
||||
)
|
||||
|
||||
proc setupPeerPersistence*(app: var App): AppResult[void] =
|
||||
if not app.conf.peerPersistence:
|
||||
return ok()
|
||||
|
||||
let peerStoreRes = setupPeerStorage()
|
||||
if peerStoreRes.isErr():
|
||||
return err("failed to setup peer store" & peerStoreRes.error)
|
||||
|
||||
app.peerStore = peerStoreRes.get()
|
||||
|
||||
ok()
|
||||
|
||||
proc setupDyamicBootstrapNodes*(app: var App): AppResult[void] =
|
||||
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(app.conf.dnsDiscovery,
|
||||
app.conf.dnsDiscoveryUrl,
|
||||
app.conf.dnsDiscoveryNameServers)
|
||||
if dynamicBootstrapNodesRes.isOk():
|
||||
app.dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
|
||||
else:
|
||||
warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error
|
||||
|
||||
ok()
|
||||
ok(app)
|
||||
|
||||
## Setup DiscoveryV5
|
||||
|
||||
|
@ -190,25 +156,11 @@ proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
|
|||
WakuDiscoveryV5.new(
|
||||
app.rng,
|
||||
discv5Conf,
|
||||
some(app.record),
|
||||
some(app.node.enr),
|
||||
some(app.node.peerManager),
|
||||
app.node.topicSubscriptionQueue,
|
||||
)
|
||||
|
||||
proc setupWakuApp*(app: var App): AppResult[void] =
|
||||
## Waku node
|
||||
let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes)
|
||||
if initNodeRes.isErr():
|
||||
return err("failed to init node: " & initNodeRes.error)
|
||||
|
||||
app.node = initNodeRes.get()
|
||||
|
||||
## Discv5
|
||||
if app.conf.discv5Discovery:
|
||||
app.wakuDiscV5 = some(app.setupDiscoveryV5())
|
||||
|
||||
ok()
|
||||
|
||||
proc getPorts(listenAddrs: seq[MultiAddress]):
|
||||
AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =
|
||||
|
||||
|
@ -227,7 +179,7 @@ proc getPorts(listenAddrs: seq[MultiAddress]):
|
|||
|
||||
return ok((tcpPort: tcpPort, websocketPort: websocketPort))
|
||||
|
||||
proc updateNetConfig(app: var App): AppResult[void] =
|
||||
proc getRunningNetConfig(app: App): AppResult[NetConfig] =
|
||||
|
||||
var conf = app.conf
|
||||
let (tcpPort, websocketPort) = getPorts(app.node.switch.peerInfo.listenAddrs).valueOr:
|
||||
|
@ -243,52 +195,39 @@ proc updateNetConfig(app: var App): AppResult[void] =
|
|||
let netConf = networkConfiguration(conf, clientId).valueOr:
|
||||
return err("Could not update NetConfig: " & error)
|
||||
|
||||
app.netConf = netConf
|
||||
return ok(netConf)
|
||||
|
||||
return ok()
|
||||
proc updateEnr(app: var App, netConf: NetConfig): AppResult[void] =
|
||||
|
||||
proc updateEnr(app: var App): AppResult[void] =
|
||||
|
||||
let record = enrConfiguration(app.conf, app.netConf, app.key).valueOr:
|
||||
let record = enrConfiguration(app.conf, netConf, app.key).valueOr:
|
||||
return err("ENR setup failed: " & error)
|
||||
|
||||
if isClusterMismatched(record, app.conf.clusterId):
|
||||
return err("cluster id mismatch configured shards")
|
||||
|
||||
app.record = record
|
||||
app.node.enr = record
|
||||
|
||||
if app.conf.discv5Discovery:
|
||||
app.wakuDiscV5 = some(app.setupDiscoveryV5())
|
||||
|
||||
return ok()
|
||||
|
||||
proc updateApp(app: var App): AppResult[void] =
|
||||
|
||||
if app.conf.tcpPort == Port(0) or app.conf.websocketPort == Port(0):
|
||||
|
||||
updateNetConfig(app).isOkOr:
|
||||
let netConf = getRunningNetConfig(app).valueOr:
|
||||
return err("error calling updateNetConfig: " & $error)
|
||||
|
||||
updateEnr(app).isOkOr:
|
||||
updateEnr(app, netConf).isOkOr:
|
||||
return err("error calling updateEnr: " & $error)
|
||||
|
||||
app.node.announcedAddresses = app.netConf.announcedAddresses
|
||||
app.node.announcedAddresses = netConf.announcedAddresses
|
||||
|
||||
printNodeNetworkInfo(app.node)
|
||||
|
||||
return ok()
|
||||
|
||||
proc setupAndMountProtocols*(app: App): Future[AppResult[void]] {.async.} =
|
||||
return await setupProtocols(
|
||||
app.node,
|
||||
app.conf,
|
||||
app.key
|
||||
)
|
||||
|
||||
proc startApp*(app: var App): AppResult[void] =
|
||||
|
||||
let nodeRes = catch: (waitFor startNode(app.node,app.conf,app.dynamicBootstrapNodes))
|
||||
let nodeRes = catch: (waitFor startNode(app.node, app.conf, app.dynamicBootstrapNodes))
|
||||
if nodeRes.isErr():
|
||||
return err("exception starting node: " & nodeRes.error.msg)
|
||||
|
||||
|
@ -299,6 +238,10 @@ proc startApp*(app: var App): AppResult[void] =
|
|||
app.updateApp().isOkOr:
|
||||
return err("Error in updateApp: " & $error)
|
||||
|
||||
## Discv5
|
||||
if app.conf.discv5Discovery:
|
||||
app.wakuDiscV5 = some(app.setupDiscoveryV5())
|
||||
|
||||
if app.wakuDiscv5.isSome():
|
||||
let wakuDiscv5 = app.wakuDiscv5.get()
|
||||
let catchRes = catch: (waitFor wakuDiscv5.start())
|
||||
|
@ -464,7 +407,7 @@ proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
|||
if app.conf.rest:
|
||||
let startRestServerRes = startRestServer(app, app.conf.restAddress, Port(app.conf.restPort + app.conf.portsShift), app.conf)
|
||||
if startRestServerRes.isErr():
|
||||
error "6/7 Starting REST server failed. Continuing in current state.", error=startRestServerRes.error
|
||||
error "Starting REST server failed. Continuing in current state.", error=startRestServerRes.error
|
||||
else:
|
||||
app.restServer = some(startRestServerRes.value)
|
||||
|
||||
|
@ -472,14 +415,14 @@ proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
|||
if app.conf.metricsServer:
|
||||
let startMetricsServerRes = startMetricsServer(app.conf.metricsServerAddress, Port(app.conf.metricsServerPort + app.conf.portsShift))
|
||||
if startMetricsServerRes.isErr():
|
||||
error "6/7 Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error
|
||||
error "Starting metrics server failed. Continuing in current state.", error=startMetricsServerRes.error
|
||||
else:
|
||||
app.metricsServer = some(startMetricsServerRes.value)
|
||||
|
||||
if app.conf.metricsLogging:
|
||||
let startMetricsLoggingRes = startMetricsLogging()
|
||||
if startMetricsLoggingRes.isErr():
|
||||
error "6/7 Starting metrics console logging failed. Continuing in current state.", error=startMetricsLoggingRes.error
|
||||
error "Starting metrics console logging failed. Continuing in current state.", error=startMetricsLoggingRes.error
|
||||
|
||||
ok()
|
||||
|
||||
|
|
|
@ -63,7 +63,6 @@ when isMainModule:
|
|||
## 6. Setup graceful shutdown hooks
|
||||
|
||||
const versionString = "version / git commit hash: " & app.git_version
|
||||
let rng = crypto.newRng()
|
||||
|
||||
let confRes = WakuNodeConf.load(version = versionString)
|
||||
if confRes.isErr():
|
||||
|
@ -111,60 +110,22 @@ when isMainModule:
|
|||
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
|
||||
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
|
||||
|
||||
var wakunode2 = App.init(rng, conf)
|
||||
|
||||
info "Running nwaku node", version = app.git_version
|
||||
logConfig(conf)
|
||||
|
||||
|
||||
##############
|
||||
# Node setup #
|
||||
##############
|
||||
|
||||
debug "1/7 Setting up storage"
|
||||
|
||||
## Peer persistence
|
||||
let res1 = wakunode2.setupPeerPersistence()
|
||||
if res1.isErr():
|
||||
error "1/7 Setting up storage failed", error = res1.error
|
||||
var wakunode2 = App.init(conf).valueOr:
|
||||
error "App initialization failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
debug "2/7 Retrieve dynamic bootstrap nodes"
|
||||
|
||||
let res3 = wakunode2.setupDyamicBootstrapNodes()
|
||||
if res3.isErr():
|
||||
error "2/7 Retrieving dynamic bootstrap nodes failed", error = res3.error
|
||||
wakunode2.startApp().isOkOr:
|
||||
error "Starting app failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
debug "3/7 Initializing node"
|
||||
wakunode2.setupMonitoringAndExternalInterfaces().isOkOr:
|
||||
error "Starting monitoring and external interfaces failed", error = error
|
||||
quit(QuitFailure)
|
||||
|
||||
let res4 = wakunode2.setupWakuApp()
|
||||
if res4.isErr():
|
||||
error "3/7 Initializing node failed", error = res4.error
|
||||
quit(QuitFailure)
|
||||
|
||||
debug "4/7 Mounting protocols"
|
||||
|
||||
let res5 = waitFor wakunode2.setupAndMountProtocols()
|
||||
if res5.isErr():
|
||||
error "4/7 Mounting protocols failed", error = res5.error
|
||||
quit(QuitFailure)
|
||||
|
||||
debug "5/7 Starting node and mounted protocols"
|
||||
|
||||
let res6 = wakunode2.startApp()
|
||||
if res6.isErr():
|
||||
error "5/7 Starting node and protocols failed", error = res6.error
|
||||
quit(QuitFailure)
|
||||
|
||||
debug "6/7 Starting monitoring and external interfaces"
|
||||
|
||||
let res7 = wakunode2.setupMonitoringAndExternalInterfaces()
|
||||
if res7.isErr():
|
||||
error "6/7 Starting monitoring and external interfaces failed", error = res7.error
|
||||
quit(QuitFailure)
|
||||
|
||||
debug "7/7 Setting up shutdown hooks"
|
||||
debug "Setting up shutdown hooks"
|
||||
## Setup shutdown hooks for this process.
|
||||
## Stop node gracefully on shutdown.
|
||||
|
||||
|
|
|
@ -22,7 +22,8 @@ suite "Wakunode2 - App":
|
|||
## Given
|
||||
let conf = defaultTestWakuNodeConf()
|
||||
|
||||
var wakunode2 = App.init(rng(), conf)
|
||||
let wakunode2 = App.init(conf).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
## When
|
||||
let version = wakunode2.version
|
||||
|
@ -37,39 +38,26 @@ suite "Wakunode2 - App initialization":
|
|||
var conf = defaultTestWakuNodeConf()
|
||||
conf.peerPersistence = true
|
||||
|
||||
var wakunode2 = App.init(rng(), conf)
|
||||
let wakunode2 = App.init(conf).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
## When
|
||||
let res = wakunode2.setupPeerPersistence()
|
||||
|
||||
## Then
|
||||
assert res.isOk(), $res.error
|
||||
check:
|
||||
not wakunode2.node.peerManager.storage.isNil()
|
||||
|
||||
test "node setup is successful with default configuration":
|
||||
## Given
|
||||
let conf = defaultTestWakuNodeConf()
|
||||
|
||||
## When
|
||||
var wakunode2 = App.init(rng(), conf)
|
||||
var wakunode2 = App.init(conf).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
let persRes = wakunode2.setupPeerPersistence()
|
||||
assert persRes.isOk(), persRes.error
|
||||
wakunode2.startApp().isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
let bootRes = wakunode2.setupDyamicBootstrapNodes()
|
||||
assert bootRes.isOk(), bootRes.error
|
||||
|
||||
let setupRes = wakunode2.setupWakuApp()
|
||||
assert setupRes.isOk(), setupRes.error
|
||||
|
||||
let mountRes = waitFor wakunode2.setupAndMountProtocols()
|
||||
let mountRes = wakunode2.setupMonitoringAndExternalInterfaces()
|
||||
assert mountRes.isOk(), mountRes.error
|
||||
|
||||
let startRes = wakunode2.startApp()
|
||||
assert startRes.isOk(), startRes.error
|
||||
|
||||
let monitorRes = wakunode2.setupMonitoringAndExternalInterfaces()
|
||||
assert monitorRes.isOk(), monitorRes.error
|
||||
|
||||
## Then
|
||||
let node = wakunode2.node
|
||||
check:
|
||||
|
@ -88,22 +76,18 @@ suite "Wakunode2 - App initialization":
|
|||
conf.tcpPort = Port(0)
|
||||
|
||||
## When
|
||||
var wakunode2 = App.init(rng(), conf)
|
||||
require wakunode2.setupPeerPersistence().isOk()
|
||||
require wakunode2.setupDyamicBootstrapNodes().isOk()
|
||||
require wakunode2.setupWakuApp().isOk()
|
||||
require isOk(waitFor wakunode2.setupAndMountProtocols())
|
||||
require isOk(wakunode2.startApp())
|
||||
require wakunode2.setupMonitoringAndExternalInterfaces().isOk()
|
||||
var wakunode2 = App.init(conf).valueOr:
|
||||
raiseAssert error
|
||||
|
||||
wakunode2.startApp().isOkOr:
|
||||
raiseAssert error
|
||||
|
||||
## Then
|
||||
let
|
||||
node = wakunode2.node
|
||||
typedNodeEnr = node.enr.toTypedRecord()
|
||||
typedAppEnr = wakunode2.record.toTypedRecord()
|
||||
|
||||
assert typedNodeEnr.isOk(), $typedNodeEnr.error
|
||||
assert typedAppEnr.isOk(), $typedAppEnr.error
|
||||
|
||||
check:
|
||||
# App started properly
|
||||
|
@ -114,10 +98,7 @@ suite "Wakunode2 - App initialization":
|
|||
not node.rendezvous.isNil()
|
||||
|
||||
# DS structures are updated with dynamic ports
|
||||
wakunode2.netConf.bindPort != Port(0)
|
||||
wakunode2.netConf.enrPort.get() != Port(0)
|
||||
typedNodeEnr.get().tcp.get() != 0
|
||||
typedAppEnr.get().tcp.get() != 0
|
||||
|
||||
## Cleanup
|
||||
waitFor wakunode2.stop()
|
||||
|
|
|
@ -372,4 +372,56 @@ proc startNode*(node: WakuNode, conf: WakuNodeConf,
|
|||
if conf.relay:
|
||||
node.peerManager.start()
|
||||
|
||||
return ok()
|
||||
return ok()
|
||||
|
||||
proc setupNode*(conf: WakuNodeConf, rng: Option[ref HmacDrbgContext] = none(ref HmacDrbgContext)):
|
||||
Result[WakuNode, string] =
|
||||
var nodeRng = if rng.isSome(): rng.get() else: crypto.newRng()
|
||||
|
||||
# Use provided key only if corresponding rng is also provided
|
||||
let key =
|
||||
if conf.nodeKey.isSome() and rng.isSome():
|
||||
conf.nodeKey.get()
|
||||
else:
|
||||
warn "missing key or rng, generating new set"
|
||||
crypto.PrivateKey.random(Secp256k1, nodeRng[]).valueOr:
|
||||
error "Failed to generate key", error=error
|
||||
return err("Failed to generate key: " & $error)
|
||||
|
||||
let netConfig = networkConfiguration(conf, clientId).valueOr:
|
||||
error "failed to create internal config", error=error
|
||||
return err("failed to create internal config: " & error)
|
||||
|
||||
let record = enrConfiguration(conf, netConfig, key).valueOr:
|
||||
error "failed to create record", error=error
|
||||
return err("failed to create record: " & error)
|
||||
|
||||
if isClusterMismatched(record, conf.clusterId):
|
||||
error "cluster id mismatch configured shards"
|
||||
return err("cluster id mismatch configured shards")
|
||||
|
||||
debug "Setting up storage"
|
||||
|
||||
## Peer persistence
|
||||
var peerStore: Option[WakuPeerStorage]
|
||||
if conf.peerPersistence:
|
||||
peerStore = setupPeerStorage().valueOr:
|
||||
error "Setting up storage failed", error = "failed to setup peer store " & error
|
||||
return err("Setting up storage failed: " & error)
|
||||
|
||||
debug "Initializing node"
|
||||
|
||||
let node = initNode(conf, netConfig, nodeRng, key, record, peerStore).valueOr:
|
||||
error "Initializing node failed", error = error
|
||||
return err("Initializing node failed: " & error)
|
||||
|
||||
debug "Mounting protocols"
|
||||
|
||||
try:
|
||||
(waitFor node.setupProtocols(conf, key)).isOkOr:
|
||||
error "Mounting protocols failed", error = error
|
||||
return err("Mounting protocols failed: " & error)
|
||||
except CatchableError:
|
||||
return err("Exception setting up protocols: " & getCurrentExceptionMsg())
|
||||
|
||||
return ok(node)
|
|
@ -75,7 +75,7 @@ type
|
|||
initialBackoffInSec*: int
|
||||
backoffFactor*: int
|
||||
maxFailedAttempts*: int
|
||||
storage: PeerStorage
|
||||
storage*: PeerStorage
|
||||
serviceSlots*: Table[string, RemotePeerInfo]
|
||||
maxRelayPeers*: int
|
||||
outRelayPeersTarget: int
|
||||
|
|
Loading…
Reference in New Issue