mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-14 08:57:14 +00:00
refactor(wakunode2): extract setup proc definitions from main section
This commit is contained in:
parent
bdb120d842
commit
c19081b6ce
@ -49,6 +49,410 @@ logScope:
|
|||||||
topics = "wakunode.setup"
|
topics = "wakunode.setup"
|
||||||
|
|
||||||
|
|
||||||
|
type SetupResult[T] = Result[T, string]
|
||||||
|
|
||||||
|
|
||||||
|
proc setupStorage(conf: WakuNodeConf):
|
||||||
|
SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]] =
|
||||||
|
|
||||||
|
## Setup a SQLite Database for a wakunode based on a supplied
|
||||||
|
## configuration file and perform all necessary migration.
|
||||||
|
##
|
||||||
|
## If config allows, return peer storage and message store
|
||||||
|
## for use elsewhere.
|
||||||
|
|
||||||
|
var
|
||||||
|
sqliteDatabase: SqliteDatabase
|
||||||
|
storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]
|
||||||
|
|
||||||
|
# Setup database connection
|
||||||
|
if conf.dbPath != "":
|
||||||
|
let dbRes = SqliteDatabase.init(conf.dbPath)
|
||||||
|
if dbRes.isErr():
|
||||||
|
warn "failed to init database connection", err = dbRes.error
|
||||||
|
waku_node_errors.inc(labelValues = ["init_db_failure"])
|
||||||
|
return err("failed to init database connection")
|
||||||
|
else:
|
||||||
|
sqliteDatabase = dbRes.value
|
||||||
|
|
||||||
|
|
||||||
|
if not sqliteDatabase.isNil():
|
||||||
|
## Database vacuuming
|
||||||
|
# TODO: Wrap and move this logic to the appropriate module
|
||||||
|
let
|
||||||
|
pageSize = ?sqliteDatabase.getPageSize()
|
||||||
|
pageCount = ?sqliteDatabase.getPageCount()
|
||||||
|
freelistCount = ?sqliteDatabase.getFreelistCount()
|
||||||
|
|
||||||
|
debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount
|
||||||
|
|
||||||
|
# TODO: Run vacuuming conditionally based on database page stats
|
||||||
|
if conf.dbVacuum and (pageCount > 0 and freelistCount > 0):
|
||||||
|
debug "starting sqlite database vacuuming"
|
||||||
|
|
||||||
|
let resVacuum = sqliteDatabase.vacuum()
|
||||||
|
if resVacuum.isErr():
|
||||||
|
return err("failed to execute vacuum: " & resVacuum.error())
|
||||||
|
|
||||||
|
debug "finished sqlite database vacuuming"
|
||||||
|
|
||||||
|
sqliteDatabase.runMigrations(conf)
|
||||||
|
|
||||||
|
|
||||||
|
if conf.persistPeers:
|
||||||
|
let res = WakuPeerStorage.new(sqliteDatabase)
|
||||||
|
if res.isErr():
|
||||||
|
warn "failed to init peer store", err = res.error
|
||||||
|
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
||||||
|
else:
|
||||||
|
storeTuple.pStorage = res.value
|
||||||
|
|
||||||
|
if conf.persistMessages:
|
||||||
|
if conf.sqliteStore:
|
||||||
|
debug "setting up sqlite-only store"
|
||||||
|
let res = SqliteStore.init(sqliteDatabase)
|
||||||
|
if res.isErr():
|
||||||
|
warn "failed to init message store", err = res.error
|
||||||
|
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
||||||
|
else:
|
||||||
|
storeTuple.mStorage = res.value
|
||||||
|
elif not sqliteDatabase.isNil():
|
||||||
|
debug "setting up dual message store"
|
||||||
|
let res = DualMessageStore.init(sqliteDatabase, conf.storeCapacity)
|
||||||
|
if res.isErr():
|
||||||
|
warn "failed to init message store", err = res.error
|
||||||
|
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
||||||
|
else:
|
||||||
|
storeTuple.mStorage = res.value
|
||||||
|
else:
|
||||||
|
debug "setting up in-memory store"
|
||||||
|
storeTuple.mStorage = StoreQueueRef.new(conf.storeCapacity)
|
||||||
|
|
||||||
|
ok(storeTuple)
|
||||||
|
|
||||||
|
proc retrieveDynamicBootstrapNodes(conf: WakuNodeConf): SetupResult[seq[RemotePeerInfo]] =
|
||||||
|
|
||||||
|
if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "":
|
||||||
|
# DNS discovery
|
||||||
|
debug "Discovering nodes using Waku DNS discovery", url=conf.dnsDiscoveryUrl
|
||||||
|
|
||||||
|
var nameServers: seq[TransportAddress]
|
||||||
|
for ip in conf.dnsDiscoveryNameServers:
|
||||||
|
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
||||||
|
|
||||||
|
let dnsResolver = DnsResolver.new(nameServers)
|
||||||
|
|
||||||
|
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
|
||||||
|
trace "resolving", domain=domain
|
||||||
|
let resolved = await dnsResolver.resolveTxt(domain)
|
||||||
|
return resolved[0] # Use only first answer
|
||||||
|
|
||||||
|
var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl,
|
||||||
|
resolver)
|
||||||
|
if wakuDnsDiscovery.isOk:
|
||||||
|
return wakuDnsDiscovery.get().findPeers()
|
||||||
|
.mapErr(proc (e: cstring): string = $e)
|
||||||
|
else:
|
||||||
|
warn "Failed to init Waku DNS discovery"
|
||||||
|
|
||||||
|
debug "No method for retrieving dynamic bootstrap nodes specified."
|
||||||
|
ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
|
||||||
|
|
||||||
|
proc initNode(conf: WakuNodeConf,
|
||||||
|
pStorage: WakuPeerStorage = nil,
|
||||||
|
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): SetupResult[WakuNode] =
|
||||||
|
|
||||||
|
## Setup a basic Waku v2 node based on a supplied configuration
|
||||||
|
## file. Optionally include persistent peer storage.
|
||||||
|
## No protocols are mounted yet.
|
||||||
|
|
||||||
|
var dnsResolver: DnsResolver
|
||||||
|
if conf.dnsAddrs:
|
||||||
|
# Support for DNS multiaddrs
|
||||||
|
var nameServers: seq[TransportAddress]
|
||||||
|
for ip in conf.dnsAddrsNameServers:
|
||||||
|
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
||||||
|
|
||||||
|
dnsResolver = DnsResolver.new(nameServers)
|
||||||
|
|
||||||
|
let
|
||||||
|
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
||||||
|
## actually a supported transport for libp2p traffic.
|
||||||
|
udpPort = conf.tcpPort
|
||||||
|
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat,
|
||||||
|
clientId,
|
||||||
|
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||||
|
Port(uint16(udpPort) + conf.portsShift))
|
||||||
|
|
||||||
|
dns4DomainName = if conf.dns4DomainName != "": some(conf.dns4DomainName)
|
||||||
|
else: none(string)
|
||||||
|
|
||||||
|
discv5UdpPort = if conf.discv5Discovery: some(Port(uint16(conf.discv5UdpPort) + conf.portsShift))
|
||||||
|
else: none(Port)
|
||||||
|
|
||||||
|
## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
|
||||||
|
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
|
||||||
|
## config, the external port is the same as the bind port.
|
||||||
|
extPort = if (extIp.isSome() or dns4DomainName.isSome()) and extTcpPort.isNone():
|
||||||
|
some(Port(uint16(conf.tcpPort) + conf.portsShift))
|
||||||
|
else:
|
||||||
|
extTcpPort
|
||||||
|
|
||||||
|
wakuFlags = initWakuFlags(conf.lightpush,
|
||||||
|
conf.filter,
|
||||||
|
conf.store,
|
||||||
|
conf.relay)
|
||||||
|
|
||||||
|
var node: WakuNode
|
||||||
|
try:
|
||||||
|
node = WakuNode.new(conf.nodekey,
|
||||||
|
conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||||
|
extIp, extPort,
|
||||||
|
pStorage,
|
||||||
|
conf.maxConnections.int,
|
||||||
|
Port(uint16(conf.websocketPort) + conf.portsShift),
|
||||||
|
conf.websocketSupport,
|
||||||
|
conf.websocketSecureSupport,
|
||||||
|
conf.websocketSecureKeyPath,
|
||||||
|
conf.websocketSecureCertPath,
|
||||||
|
some(wakuFlags),
|
||||||
|
dnsResolver,
|
||||||
|
conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
|
||||||
|
dns4DomainName,
|
||||||
|
discv5UdpPort
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
return err("failed to create waku node instance: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if conf.discv5Discovery:
|
||||||
|
let
|
||||||
|
discoveryConfig = DiscoveryConfig.init(
|
||||||
|
conf.discv5TableIpLimit, conf.discv5BucketIpLimit, conf.discv5BitsPerHop)
|
||||||
|
|
||||||
|
# select dynamic bootstrap nodes that have an ENR containing a udp port.
|
||||||
|
# Discv5 only supports UDP https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md)
|
||||||
|
var discv5BootstrapEnrs: seq[enr.Record]
|
||||||
|
for n in dynamicBootstrapNodes:
|
||||||
|
if n.enr.isSome():
|
||||||
|
let
|
||||||
|
enr = n.enr.get()
|
||||||
|
tenrRes = enr.toTypedRecord()
|
||||||
|
if tenrRes.isOk() and (tenrRes.get().udp.isSome() or tenrRes.get().udp6.isSome()):
|
||||||
|
discv5BootstrapEnrs.add(enr)
|
||||||
|
|
||||||
|
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
|
||||||
|
for enrUri in conf.discv5BootstrapNodes:
|
||||||
|
addBootstrapNode(enrUri, discv5BootstrapEnrs)
|
||||||
|
|
||||||
|
node.wakuDiscv5 = WakuDiscoveryV5.new(
|
||||||
|
extIP, extPort, discv5UdpPort,
|
||||||
|
conf.listenAddress,
|
||||||
|
discv5UdpPort.get(),
|
||||||
|
discv5BootstrapEnrs,
|
||||||
|
conf.discv5EnrAutoUpdate,
|
||||||
|
keys.PrivateKey(conf.nodekey.skkey),
|
||||||
|
wakuFlags,
|
||||||
|
[], # Empty enr fields, for now
|
||||||
|
node.rng,
|
||||||
|
discoveryConfig
|
||||||
|
)
|
||||||
|
|
||||||
|
ok(node)
|
||||||
|
|
||||||
|
proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore): Future[SetupResult[void]] {.async.} =
|
||||||
|
## Setup configured protocols on an existing Waku v2 node.
|
||||||
|
## Optionally include persistent message storage.
|
||||||
|
## No protocols are started yet.
|
||||||
|
|
||||||
|
# Mount relay on all nodes
|
||||||
|
var peerExchangeHandler = none(RoutingRecordsHandler)
|
||||||
|
if conf.relayPeerExchange:
|
||||||
|
proc handlePeerExchange(peer: PeerId, topic: string,
|
||||||
|
peers: seq[RoutingRecordsPair]) {.gcsafe, raises: [Defect].} =
|
||||||
|
## Handle peers received via gossipsub peer exchange
|
||||||
|
# TODO: Only consider peers on pubsub topics we subscribe to
|
||||||
|
let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records
|
||||||
|
.mapIt(toRemotePeerInfo(it.record.get()))
|
||||||
|
|
||||||
|
debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len
|
||||||
|
|
||||||
|
# asyncSpawn, as we don't want to block here
|
||||||
|
asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange")
|
||||||
|
|
||||||
|
peerExchangeHandler = some(handlePeerExchange)
|
||||||
|
|
||||||
|
if conf.relay:
|
||||||
|
try:
|
||||||
|
let pubsubTopics = conf.topics.split(" ")
|
||||||
|
await mountRelay(node, pubsubTopics, peerExchangeHandler = peerExchangeHandler)
|
||||||
|
except:
|
||||||
|
return err("failed to mount waku relay protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
|
||||||
|
# Keepalive mounted on all nodes
|
||||||
|
try:
|
||||||
|
await mountLibp2pPing(node)
|
||||||
|
except:
|
||||||
|
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
when defined(rln) or defined(rlnzerokit):
|
||||||
|
if conf.rlnRelay:
|
||||||
|
try:
|
||||||
|
let res = node.mountRlnRelay(conf)
|
||||||
|
if res.isErr():
|
||||||
|
return err("failed to mount waku RLN relay protocol: " & res.error)
|
||||||
|
except:
|
||||||
|
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if conf.swap:
|
||||||
|
try:
|
||||||
|
await mountSwap(node)
|
||||||
|
# TODO: Set swap peer, for now should be same as store peer
|
||||||
|
except:
|
||||||
|
return err("failed to mount waku swap protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# Store setup
|
||||||
|
if (conf.storenode != "") or (conf.store):
|
||||||
|
let retentionPolicy = if conf.sqliteStore: TimeRetentionPolicy.init(conf.sqliteRetentionTime)
|
||||||
|
else: CapacityRetentionPolicy.init(conf.storeCapacity)
|
||||||
|
|
||||||
|
try:
|
||||||
|
await mountStore(node, mStorage, retentionPolicy=some(retentionPolicy))
|
||||||
|
except:
|
||||||
|
return err("failed to mount waku store protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
executeMessageRetentionPolicy(node)
|
||||||
|
startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval)
|
||||||
|
|
||||||
|
if conf.storenode != "":
|
||||||
|
try:
|
||||||
|
setStorePeer(node, conf.storenode)
|
||||||
|
except:
|
||||||
|
return err("failed to set node waku store peer: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# NOTE Must be mounted after relay
|
||||||
|
if (conf.lightpushnode != "") or (conf.lightpush):
|
||||||
|
try:
|
||||||
|
await mountLightPush(node)
|
||||||
|
except:
|
||||||
|
return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if conf.lightpushnode != "":
|
||||||
|
try:
|
||||||
|
setLightPushPeer(node, conf.lightpushnode)
|
||||||
|
except:
|
||||||
|
return err("failed to set node waku lightpush peer: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# Filter setup. NOTE Must be mounted after relay
|
||||||
|
if (conf.filternode != "") or (conf.filter):
|
||||||
|
try:
|
||||||
|
await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
|
||||||
|
except:
|
||||||
|
return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if conf.filternode != "":
|
||||||
|
try:
|
||||||
|
setFilterPeer(node, conf.filternode)
|
||||||
|
except:
|
||||||
|
return err("failed to set node waku filter peer: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# waku peer exchange setup
|
||||||
|
if (conf.peerExchangeNode != "") or (conf.peerExchange):
|
||||||
|
try:
|
||||||
|
await mountWakuPeerExchange(node)
|
||||||
|
except:
|
||||||
|
return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if conf.peerExchangeNode != "":
|
||||||
|
try:
|
||||||
|
setPeerExchangePeer(node, conf.peerExchangeNode)
|
||||||
|
except:
|
||||||
|
return err("failed to set node waku lightpush peer: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
proc startNode(node: WakuNode, conf: WakuNodeConf,
|
||||||
|
dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): Future[SetupResult[void]] {.async.} =
|
||||||
|
## Start a configured node and all mounted protocols.
|
||||||
|
## Resume history, connect to static nodes and start
|
||||||
|
## keep-alive, if configured.
|
||||||
|
|
||||||
|
# Start Waku v2 node
|
||||||
|
try:
|
||||||
|
await node.start()
|
||||||
|
except:
|
||||||
|
return err("failed to start waku node: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# Start discv5 and connect to discovered nodes
|
||||||
|
if conf.discv5Discovery:
|
||||||
|
try:
|
||||||
|
if not await node.startDiscv5():
|
||||||
|
error "could not start Discovery v5"
|
||||||
|
except:
|
||||||
|
return err("failed to start waku discovery v5: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
|
||||||
|
# Resume historical messages, this has to be called after the node has been started
|
||||||
|
if conf.store and conf.persistMessages:
|
||||||
|
try:
|
||||||
|
await node.resume()
|
||||||
|
except:
|
||||||
|
return err("failed to resume messages history: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# Connect to configured static nodes
|
||||||
|
if conf.staticnodes.len > 0:
|
||||||
|
try:
|
||||||
|
await connectToNodes(node, conf.staticnodes, "static")
|
||||||
|
except:
|
||||||
|
return err("failed to connect to static nodes: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if dynamicBootstrapNodes.len > 0:
|
||||||
|
info "Connecting to dynamic bootstrap peers"
|
||||||
|
try:
|
||||||
|
await connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")
|
||||||
|
except:
|
||||||
|
return err("failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
|
||||||
|
# retrieve and connect to peer exchange peers
|
||||||
|
if conf.peerExchangeNode != "":
|
||||||
|
info "Retrieving peer info via peer exchange protocol"
|
||||||
|
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
|
||||||
|
try:
|
||||||
|
discard await node.wakuPeerExchange.request(desiredOutDegree)
|
||||||
|
except:
|
||||||
|
return err("failed to retrieve peer info via peer exchange protocol: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
# Start keepalive, if enabled
|
||||||
|
if conf.keepAlive:
|
||||||
|
node.startKeepalive()
|
||||||
|
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
proc startExternal(node: WakuNode, conf: WakuNodeConf): SetupResult[void] =
|
||||||
|
## Start configured external interfaces and monitoring tools
|
||||||
|
## on a Waku v2 node, including the RPC API, REST API and metrics
|
||||||
|
## monitoring ports.
|
||||||
|
|
||||||
|
if conf.rpc:
|
||||||
|
try:
|
||||||
|
startRpcServer(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf)
|
||||||
|
except:
|
||||||
|
return err("failed to start the json-rpc server: " & getCurrentExceptionMsg())
|
||||||
|
|
||||||
|
if conf.rest:
|
||||||
|
startRestServer(node, conf.restAddress, Port(conf.restPort + conf.portsShift), conf)
|
||||||
|
|
||||||
|
if conf.metricsLogging:
|
||||||
|
startMetricsLog()
|
||||||
|
|
||||||
|
if conf.metricsServer:
|
||||||
|
startMetricsServer(conf.metricsServerAddress,
|
||||||
|
Port(conf.metricsServerPort + conf.portsShift))
|
||||||
|
|
||||||
|
ok()
|
||||||
|
|
||||||
|
|
||||||
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
{.pop.} # @TODO confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
## Node setup happens in 6 phases:
|
## Node setup happens in 6 phases:
|
||||||
@ -59,354 +463,6 @@ when isMainModule:
|
|||||||
## 5. Start monitoring tools and external interfaces
|
## 5. Start monitoring tools and external interfaces
|
||||||
## 6. Setup graceful shutdown hooks
|
## 6. Setup graceful shutdown hooks
|
||||||
|
|
||||||
###################
|
|
||||||
# Setup functions #
|
|
||||||
###################
|
|
||||||
|
|
||||||
type SetupResult[T] = Result[T, string]
|
|
||||||
|
|
||||||
# 1/7 Setup storage
|
|
||||||
proc setupStorage(conf: WakuNodeConf):
|
|
||||||
SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]] =
|
|
||||||
|
|
||||||
## Setup a SQLite Database for a wakunode based on a supplied
|
|
||||||
## configuration file and perform all necessary migration.
|
|
||||||
##
|
|
||||||
## If config allows, return peer storage and message store
|
|
||||||
## for use elsewhere.
|
|
||||||
|
|
||||||
var
|
|
||||||
sqliteDatabase: SqliteDatabase
|
|
||||||
storeTuple: tuple[pStorage: WakuPeerStorage, mStorage: MessageStore]
|
|
||||||
|
|
||||||
# Setup database connection
|
|
||||||
if conf.dbPath != "":
|
|
||||||
let dbRes = SqliteDatabase.init(conf.dbPath)
|
|
||||||
if dbRes.isErr():
|
|
||||||
warn "failed to init database connection", err = dbRes.error
|
|
||||||
waku_node_errors.inc(labelValues = ["init_db_failure"])
|
|
||||||
return err("failed to init database connection")
|
|
||||||
else:
|
|
||||||
sqliteDatabase = dbRes.value
|
|
||||||
|
|
||||||
|
|
||||||
if not sqliteDatabase.isNil():
|
|
||||||
## Database vacuuming
|
|
||||||
# TODO: Wrap and move this logic to the appropriate module
|
|
||||||
let
|
|
||||||
pageSize = ?sqliteDatabase.getPageSize()
|
|
||||||
pageCount = ?sqliteDatabase.getPageCount()
|
|
||||||
freelistCount = ?sqliteDatabase.getFreelistCount()
|
|
||||||
|
|
||||||
debug "sqlite database page stats", pageSize=pageSize, pages=pageCount, freePages=freelistCount
|
|
||||||
|
|
||||||
# TODO: Run vacuuming conditionally based on database page stats
|
|
||||||
if conf.dbVacuum and (pageCount > 0 and freelistCount > 0):
|
|
||||||
debug "starting sqlite database vacuuming"
|
|
||||||
|
|
||||||
let resVacuum = sqliteDatabase.vacuum()
|
|
||||||
if resVacuum.isErr():
|
|
||||||
return err("failed to execute vacuum: " & resVacuum.error())
|
|
||||||
|
|
||||||
debug "finished sqlite database vacuuming"
|
|
||||||
|
|
||||||
sqliteDatabase.runMigrations(conf)
|
|
||||||
|
|
||||||
|
|
||||||
if conf.persistPeers:
|
|
||||||
let res = WakuPeerStorage.new(sqliteDatabase)
|
|
||||||
if res.isErr():
|
|
||||||
warn "failed to init peer store", err = res.error
|
|
||||||
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
|
||||||
else:
|
|
||||||
storeTuple.pStorage = res.value
|
|
||||||
|
|
||||||
if conf.persistMessages:
|
|
||||||
if conf.sqliteStore:
|
|
||||||
debug "setting up sqlite-only store"
|
|
||||||
let res = SqliteStore.init(sqliteDatabase)
|
|
||||||
if res.isErr():
|
|
||||||
warn "failed to init message store", err = res.error
|
|
||||||
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
|
||||||
else:
|
|
||||||
storeTuple.mStorage = res.value
|
|
||||||
elif not sqliteDatabase.isNil():
|
|
||||||
debug "setting up dual message store"
|
|
||||||
let res = DualMessageStore.init(sqliteDatabase, conf.storeCapacity)
|
|
||||||
if res.isErr():
|
|
||||||
warn "failed to init message store", err = res.error
|
|
||||||
waku_node_errors.inc(labelValues = ["init_store_failure"])
|
|
||||||
else:
|
|
||||||
storeTuple.mStorage = res.value
|
|
||||||
else:
|
|
||||||
debug "setting up in-memory store"
|
|
||||||
storeTuple.mStorage = StoreQueueRef.new(conf.storeCapacity)
|
|
||||||
|
|
||||||
ok(storeTuple)
|
|
||||||
|
|
||||||
# 2/7 Retrieve dynamic bootstrap nodes
|
|
||||||
proc retrieveDynamicBootstrapNodes(conf: WakuNodeConf): SetupResult[seq[RemotePeerInfo]] =
|
|
||||||
|
|
||||||
if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "":
|
|
||||||
# DNS discovery
|
|
||||||
debug "Discovering nodes using Waku DNS discovery", url=conf.dnsDiscoveryUrl
|
|
||||||
|
|
||||||
var nameServers: seq[TransportAddress]
|
|
||||||
for ip in conf.dnsDiscoveryNameServers:
|
|
||||||
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
|
||||||
|
|
||||||
let dnsResolver = DnsResolver.new(nameServers)
|
|
||||||
|
|
||||||
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
|
|
||||||
trace "resolving", domain=domain
|
|
||||||
let resolved = await dnsResolver.resolveTxt(domain)
|
|
||||||
return resolved[0] # Use only first answer
|
|
||||||
|
|
||||||
var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl,
|
|
||||||
resolver)
|
|
||||||
if wakuDnsDiscovery.isOk:
|
|
||||||
return wakuDnsDiscovery.get().findPeers()
|
|
||||||
.mapErr(proc (e: cstring): string = $e)
|
|
||||||
else:
|
|
||||||
warn "Failed to init Waku DNS discovery"
|
|
||||||
|
|
||||||
debug "No method for retrieving dynamic bootstrap nodes specified."
|
|
||||||
ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
|
|
||||||
|
|
||||||
# 3/7 Initialize node
|
|
||||||
proc initNode(conf: WakuNodeConf,
|
|
||||||
pStorage: WakuPeerStorage = nil,
|
|
||||||
dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): SetupResult[WakuNode] =
|
|
||||||
|
|
||||||
## Setup a basic Waku v2 node based on a supplied configuration
|
|
||||||
## file. Optionally include persistent peer storage.
|
|
||||||
## No protocols are mounted yet.
|
|
||||||
|
|
||||||
var dnsResolver: DnsResolver
|
|
||||||
if conf.dnsAddrs:
|
|
||||||
# Support for DNS multiaddrs
|
|
||||||
var nameServers: seq[TransportAddress]
|
|
||||||
for ip in conf.dnsAddrsNameServers:
|
|
||||||
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
|
||||||
|
|
||||||
dnsResolver = DnsResolver.new(nameServers)
|
|
||||||
|
|
||||||
let
|
|
||||||
## `udpPort` is only supplied to satisfy underlying APIs but is not
|
|
||||||
## actually a supported transport for libp2p traffic.
|
|
||||||
udpPort = conf.tcpPort
|
|
||||||
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat,
|
|
||||||
clientId,
|
|
||||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
|
||||||
Port(uint16(udpPort) + conf.portsShift))
|
|
||||||
|
|
||||||
dns4DomainName = if conf.dns4DomainName != "": some(conf.dns4DomainName)
|
|
||||||
else: none(string)
|
|
||||||
|
|
||||||
discv5UdpPort = if conf.discv5Discovery: some(Port(uint16(conf.discv5UdpPort) + conf.portsShift))
|
|
||||||
else: none(Port)
|
|
||||||
|
|
||||||
## @TODO: the NAT setup assumes a manual port mapping configuration if extIp config is set. This probably
|
|
||||||
## implies adding manual config item for extPort as well. The following heuristic assumes that, in absence of manual
|
|
||||||
## config, the external port is the same as the bind port.
|
|
||||||
extPort = if (extIp.isSome() or dns4DomainName.isSome()) and extTcpPort.isNone():
|
|
||||||
some(Port(uint16(conf.tcpPort) + conf.portsShift))
|
|
||||||
else:
|
|
||||||
extTcpPort
|
|
||||||
|
|
||||||
wakuFlags = initWakuFlags(conf.lightpush,
|
|
||||||
conf.filter,
|
|
||||||
conf.store,
|
|
||||||
conf.relay)
|
|
||||||
|
|
||||||
node = WakuNode.new(conf.nodekey,
|
|
||||||
conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
|
|
||||||
extIp, extPort,
|
|
||||||
pStorage,
|
|
||||||
conf.maxConnections.int,
|
|
||||||
Port(uint16(conf.websocketPort) + conf.portsShift),
|
|
||||||
conf.websocketSupport,
|
|
||||||
conf.websocketSecureSupport,
|
|
||||||
conf.websocketSecureKeyPath,
|
|
||||||
conf.websocketSecureCertPath,
|
|
||||||
some(wakuFlags),
|
|
||||||
dnsResolver,
|
|
||||||
conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
|
|
||||||
dns4DomainName,
|
|
||||||
discv5UdpPort
|
|
||||||
)
|
|
||||||
|
|
||||||
if conf.discv5Discovery:
|
|
||||||
let
|
|
||||||
discoveryConfig = DiscoveryConfig.init(
|
|
||||||
conf.discv5TableIpLimit, conf.discv5BucketIpLimit, conf.discv5BitsPerHop)
|
|
||||||
|
|
||||||
# select dynamic bootstrap nodes that have an ENR containing a udp port.
|
|
||||||
# Discv5 only supports UDP https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md)
|
|
||||||
var discv5BootstrapEnrs: seq[enr.Record]
|
|
||||||
for n in dynamicBootstrapNodes:
|
|
||||||
if n.enr.isSome():
|
|
||||||
let
|
|
||||||
enr = n.enr.get()
|
|
||||||
tenrRes = enr.toTypedRecord()
|
|
||||||
if tenrRes.isOk() and (tenrRes.get().udp.isSome() or tenrRes.get().udp6.isSome()):
|
|
||||||
discv5BootstrapEnrs.add(enr)
|
|
||||||
|
|
||||||
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
|
|
||||||
for enrUri in conf.discv5BootstrapNodes:
|
|
||||||
addBootstrapNode(enrUri, discv5BootstrapEnrs)
|
|
||||||
|
|
||||||
node.wakuDiscv5 = WakuDiscoveryV5.new(
|
|
||||||
extIP, extPort, discv5UdpPort,
|
|
||||||
conf.listenAddress,
|
|
||||||
discv5UdpPort.get(),
|
|
||||||
discv5BootstrapEnrs,
|
|
||||||
conf.discv5EnrAutoUpdate,
|
|
||||||
keys.PrivateKey(conf.nodekey.skkey),
|
|
||||||
wakuFlags,
|
|
||||||
[], # Empty enr fields, for now
|
|
||||||
node.rng,
|
|
||||||
discoveryConfig
|
|
||||||
)
|
|
||||||
|
|
||||||
ok(node)
|
|
||||||
|
|
||||||
# 4/7 Mount and initialize configured protocols
|
|
||||||
proc setupProtocols(node: WakuNode, conf: WakuNodeConf, mStorage: MessageStore): SetupResult[bool] =
|
|
||||||
## Setup configured protocols on an existing Waku v2 node.
|
|
||||||
## Optionally include persistent message storage.
|
|
||||||
## No protocols are started yet.
|
|
||||||
|
|
||||||
# Mount relay on all nodes
|
|
||||||
var peerExchangeHandler = none(RoutingRecordsHandler)
|
|
||||||
if conf.relayPeerExchange:
|
|
||||||
proc handlePeerExchange(peer: PeerId, topic: string,
|
|
||||||
peers: seq[RoutingRecordsPair]) {.gcsafe, raises: [Defect].} =
|
|
||||||
## Handle peers received via gossipsub peer exchange
|
|
||||||
# TODO: Only consider peers on pubsub topics we subscribe to
|
|
||||||
let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records
|
|
||||||
.mapIt(toRemotePeerInfo(it.record.get()))
|
|
||||||
|
|
||||||
debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len
|
|
||||||
|
|
||||||
# asyncSpawn, as we don't want to block here
|
|
||||||
asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange")
|
|
||||||
|
|
||||||
peerExchangeHandler = some(handlePeerExchange)
|
|
||||||
|
|
||||||
if conf.relay:
|
|
||||||
waitFor mountRelay(node,
|
|
||||||
conf.topics.split(" "),
|
|
||||||
peerExchangeHandler = peerExchangeHandler)
|
|
||||||
|
|
||||||
# Keepalive mounted on all nodes
|
|
||||||
waitFor mountLibp2pPing(node)
|
|
||||||
|
|
||||||
when defined(rln) or defined(rlnzerokit):
|
|
||||||
if conf.rlnRelay:
|
|
||||||
let res = node.mountRlnRelay(conf)
|
|
||||||
if res.isErr():
|
|
||||||
debug "could not mount WakuRlnRelay"
|
|
||||||
|
|
||||||
if conf.swap:
|
|
||||||
waitFor mountSwap(node)
|
|
||||||
# TODO Set swap peer, for now should be same as store peer
|
|
||||||
|
|
||||||
# Store setup
|
|
||||||
if (conf.storenode != "") or (conf.store):
|
|
||||||
let retentionPolicy = if conf.sqliteStore: TimeRetentionPolicy.init(conf.sqliteRetentionTime)
|
|
||||||
else: CapacityRetentionPolicy.init(conf.storeCapacity)
|
|
||||||
waitFor mountStore(node, mStorage, retentionPolicy=some(retentionPolicy))
|
|
||||||
|
|
||||||
executeMessageRetentionPolicy(node)
|
|
||||||
startMessageRetentionPolicyPeriodicTask(node, interval=MessageStoreDefaultRetentionPolicyInterval)
|
|
||||||
|
|
||||||
if conf.storenode != "":
|
|
||||||
setStorePeer(node, conf.storenode)
|
|
||||||
|
|
||||||
# NOTE Must be mounted after relay
|
|
||||||
if (conf.lightpushnode != "") or (conf.lightpush):
|
|
||||||
waitFor mountLightPush(node)
|
|
||||||
|
|
||||||
if conf.lightpushnode != "":
|
|
||||||
setLightPushPeer(node, conf.lightpushnode)
|
|
||||||
|
|
||||||
# Filter setup. NOTE Must be mounted after relay
|
|
||||||
if (conf.filternode != "") or (conf.filter):
|
|
||||||
waitFor mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout))
|
|
||||||
|
|
||||||
if conf.filternode != "":
|
|
||||||
setFilterPeer(node, conf.filternode)
|
|
||||||
|
|
||||||
# waku peer exchange setup
|
|
||||||
if (conf.peerExchangeNode != "") or (conf.peerExchange):
|
|
||||||
waitFor mountWakuPeerExchange(node)
|
|
||||||
|
|
||||||
if conf.peerExchangeNode != "":
|
|
||||||
setPeerExchangePeer(node, conf.peerExchangeNode)
|
|
||||||
|
|
||||||
ok(true) # Success
|
|
||||||
|
|
||||||
# 5/7 Start node and mounted protocols
|
|
||||||
proc startNode(node: WakuNode, conf: WakuNodeConf,
|
|
||||||
dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): SetupResult[bool] =
|
|
||||||
## Start a configured node and all mounted protocols.
|
|
||||||
## Resume history, connect to static nodes and start
|
|
||||||
## keep-alive, if configured.
|
|
||||||
|
|
||||||
# Start Waku v2 node
|
|
||||||
waitFor node.start()
|
|
||||||
|
|
||||||
# Start discv5 and connect to discovered nodes
|
|
||||||
if conf.discv5Discovery:
|
|
||||||
if not waitFor node.startDiscv5():
|
|
||||||
error "could not start Discovery v5"
|
|
||||||
|
|
||||||
# Resume historical messages, this has to be called after the node has been started
|
|
||||||
if conf.store and conf.persistMessages:
|
|
||||||
waitFor node.resume()
|
|
||||||
|
|
||||||
# Connect to configured static nodes
|
|
||||||
if conf.staticnodes.len > 0:
|
|
||||||
waitFor connectToNodes(node, conf.staticnodes, "static")
|
|
||||||
|
|
||||||
if dynamicBootstrapNodes.len > 0:
|
|
||||||
info "Connecting to dynamic bootstrap peers"
|
|
||||||
waitFor connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap")
|
|
||||||
|
|
||||||
# retrieve and connect to peer exchange peers
|
|
||||||
if conf.peerExchangeNode != "":
|
|
||||||
info "Retrieving peer info via peer exchange protocol"
|
|
||||||
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
|
|
||||||
discard waitFor node.wakuPeerExchange.request(desiredOutDegree)
|
|
||||||
|
|
||||||
# Start keepalive, if enabled
|
|
||||||
if conf.keepAlive:
|
|
||||||
node.startKeepalive()
|
|
||||||
|
|
||||||
ok(true) # Success
|
|
||||||
|
|
||||||
# 6/7 Start monitoring tools and external interfaces
|
|
||||||
proc startExternal(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] =
|
|
||||||
## Start configured external interfaces and monitoring tools
|
|
||||||
## on a Waku v2 node, including the RPC API, REST API and metrics
|
|
||||||
## monitoring ports.
|
|
||||||
|
|
||||||
if conf.rpc:
|
|
||||||
startRpcServer(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf)
|
|
||||||
|
|
||||||
if conf.rest:
|
|
||||||
startRestServer(node, conf.restAddress, Port(conf.restPort + conf.portsShift), conf)
|
|
||||||
|
|
||||||
if conf.metricsLogging:
|
|
||||||
startMetricsLog()
|
|
||||||
|
|
||||||
if conf.metricsServer:
|
|
||||||
startMetricsServer(conf.metricsServerAddress,
|
|
||||||
Port(conf.metricsServerPort + conf.portsShift))
|
|
||||||
|
|
||||||
ok(true) # Success
|
|
||||||
|
|
||||||
{.push warning[ProveInit]: off.}
|
{.push warning[ProveInit]: off.}
|
||||||
let conf = try:
|
let conf = try:
|
||||||
WakuNodeConf.load(
|
WakuNodeConf.load(
|
||||||
@ -414,8 +470,8 @@ when isMainModule:
|
|||||||
if conf.configFile.isSome:
|
if conf.configFile.isSome:
|
||||||
sources.addConfigFile(Toml, conf.configFile.get)
|
sources.addConfigFile(Toml, conf.configFile.get)
|
||||||
)
|
)
|
||||||
except CatchableError as err:
|
except CatchableError:
|
||||||
error "Failure while loading the configuration: \n", err_msg=err.msg
|
error "Failure while loading the configuration: \n", error=getCurrentExceptionMsg()
|
||||||
quit 1 # if we don't leave here, the initialization of conf does not work in the success case
|
quit 1 # if we don't leave here, the initialization of conf does not work in the success case
|
||||||
{.pop.}
|
{.pop.}
|
||||||
|
|
||||||
@ -442,9 +498,8 @@ when isMainModule:
|
|||||||
mStorage: MessageStore
|
mStorage: MessageStore
|
||||||
|
|
||||||
let setupStorageRes = setupStorage(conf)
|
let setupStorageRes = setupStorage(conf)
|
||||||
|
if setupStorageRes.isErr():
|
||||||
if setupStorageRes.isErr:
|
error "1/7 Setting up storage failed. Continuing without storage.", error=setupStorageRes.error
|
||||||
error "1/7 Setting up storage failed. Continuing without storage."
|
|
||||||
else:
|
else:
|
||||||
(pStorage, mStorage) = setupStorageRes.get()
|
(pStorage, mStorage) = setupStorageRes.get()
|
||||||
|
|
||||||
@ -452,41 +507,37 @@ when isMainModule:
|
|||||||
|
|
||||||
var dynamicBootstrapNodes: seq[RemotePeerInfo]
|
var dynamicBootstrapNodes: seq[RemotePeerInfo]
|
||||||
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(conf)
|
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(conf)
|
||||||
if dynamicBootstrapNodesRes.isErr:
|
if dynamicBootstrapNodesRes.isErr():
|
||||||
warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes."
|
warn "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes.", error=dynamicBootstrapNodesRes.error
|
||||||
else:
|
else:
|
||||||
dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
|
dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
|
||||||
|
|
||||||
debug "3/7 Initializing node"
|
debug "3/7 Initializing node"
|
||||||
|
|
||||||
let initNodeRes = initNode(conf, pStorage, dynamicBootstrapNodes)
|
let initNodeRes = initNode(conf, pStorage, dynamicBootstrapNodes)
|
||||||
|
if initNodeRes.isErr():
|
||||||
if initNodeRes.isErr:
|
error "3/7 Initializing node failed. Quitting.", error=initNodeRes.error
|
||||||
error "3/7 Initializing node failed. Quitting."
|
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
else:
|
else:
|
||||||
node = initNodeRes.get()
|
node = initNodeRes.get()
|
||||||
|
|
||||||
debug "4/7 Mounting protocols"
|
debug "4/7 Mounting protocols"
|
||||||
|
|
||||||
let setupProtocolsRes = setupProtocols(node, conf, mStorage)
|
let setupProtocolsRes = waitFor setupProtocols(node, conf, mStorage)
|
||||||
|
if setupProtocolsRes.isErr():
|
||||||
if setupProtocolsRes.isErr:
|
error "4/7 Mounting protocols failed. Continuing in current state.", error=setupProtocolsRes.error
|
||||||
error "4/7 Mounting protocols failed. Continuing in current state."
|
|
||||||
|
|
||||||
debug "5/7 Starting node and mounted protocols"
|
debug "5/7 Starting node and mounted protocols"
|
||||||
|
|
||||||
let startNodeRes = startNode(node, conf, dynamicBootstrapNodes)
|
let startNodeRes = waitFor startNode(node, conf, dynamicBootstrapNodes)
|
||||||
|
if startNodeRes.isErr():
|
||||||
if startNodeRes.isErr:
|
error "5/7 Starting node and mounted protocols failed. Continuing in current state.", error=startNodeRes.error
|
||||||
error "5/7 Starting node and mounted protocols failed. Continuing in current state."
|
|
||||||
|
|
||||||
debug "6/7 Starting monitoring and external interfaces"
|
debug "6/7 Starting monitoring and external interfaces"
|
||||||
|
|
||||||
let startExternalRes = startExternal(node, conf)
|
let startExternalRes = startExternal(node, conf)
|
||||||
|
if startExternalRes.isErr():
|
||||||
if startExternalRes.isErr:
|
error "6/7 Starting monitoring and external interfaces failed. Continuing in current state.", error=startExternalRes.error
|
||||||
error "6/7 Starting monitoring and external interfaces failed. Continuing in current state."
|
|
||||||
|
|
||||||
debug "7/7 Setting up shutdown hooks"
|
debug "7/7 Setting up shutdown hooks"
|
||||||
|
|
||||||
@ -514,6 +565,6 @@ when isMainModule:
|
|||||||
|
|
||||||
c_signal(ansi_c.SIGTERM, handleSigterm)
|
c_signal(ansi_c.SIGTERM, handleSigterm)
|
||||||
|
|
||||||
debug "Node setup complete"
|
info "Node setup complete"
|
||||||
|
|
||||||
runForever()
|
runForever()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user