mirror of https://github.com/waku-org/nwaku.git
refactor: simplify app.nim and move discovery items to appropriate modules (#2657)
This commit is contained in:
parent
e03d1165e6
commit
404810aa8d
|
@ -1,33 +0,0 @@
|
|||
when (NimMajor, NimMinor) < (1, 4):
|
||||
{.push raises: [Defect].}
|
||||
else:
|
||||
{.push raises: [].}
|
||||
|
||||
import waku_discv5, ../../waku_core
|
||||
|
||||
## This module contains the logic needed to discover other peers and
|
||||
## also to make the "self" node discoverable by other peers.
|
||||
|
||||
type DiscoveryManager* = object
|
||||
wakuDiscv5*: Option[WakuDiscoveryV5]
|
||||
dynamicBootstrapNodes*: seq[RemotePeerInfo]
|
||||
|
||||
#[
|
||||
TODO: in future PRs we will have:
|
||||
|
||||
App* = object
|
||||
version: string
|
||||
conf: WakuNodeConf
|
||||
rng: ref HmacDrbgContext
|
||||
key: crypto.PrivateKey
|
||||
|
||||
## in future PRs, the following two items will be encapsulated by 'DiscoveryManager'
|
||||
wakuDiscv5: Option[WakuDiscoveryV5] <-- this will get removed
|
||||
dynamicBootstrapNodes: seq[RemotePeerInfo] <-- this will get removed
|
||||
|
||||
node: WakuNode <-- this will contain a discManager instance
|
||||
|
||||
restServer: Option[WakuRestServerRef]
|
||||
metricsServer: Option[MetricsHttpServerRef]
|
||||
|
||||
]#
|
|
@ -14,7 +14,11 @@ import
|
|||
eth/keys as eth_keys,
|
||||
eth/p2p/discoveryv5/node,
|
||||
eth/p2p/discoveryv5/protocol
|
||||
import ../node/peer_manager/peer_manager, ../waku_core, ../waku_enr
|
||||
import
|
||||
../node/peer_manager/peer_manager,
|
||||
../waku_core,
|
||||
../waku_enr,
|
||||
../factory/external_config
|
||||
|
||||
export protocol, waku_enr
|
||||
|
||||
|
@ -244,7 +248,7 @@ proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} =
|
|||
|
||||
wd.topicSubscriptionQueue.unregister(key)
|
||||
|
||||
proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async.} =
|
||||
proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async: (raises: []).} =
|
||||
if wd.listening:
|
||||
return err("already listening")
|
||||
|
||||
|
@ -313,3 +317,42 @@ proc addBootstrapNode*(bootstrapAddr: string, bootstrapEnrs: var seq[enr.Record]
|
|||
return
|
||||
|
||||
bootstrapEnrs.add(enrRes.value)
|
||||
|
||||
proc setupDiscoveryV5*(
|
||||
myENR: enr.Record,
|
||||
nodePeerManager: PeerManager,
|
||||
nodeTopicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent],
|
||||
conf: WakuNodeConf,
|
||||
dynamicBootstrapNodes: seq[RemotePeerInfo],
|
||||
rng: ref HmacDrbgContext,
|
||||
key: crypto.PrivateKey,
|
||||
): WakuDiscoveryV5 =
|
||||
let dynamicBootstrapEnrs =
|
||||
dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get())
|
||||
|
||||
var discv5BootstrapEnrs: seq[enr.Record]
|
||||
|
||||
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
|
||||
for enrUri in conf.discv5BootstrapNodes:
|
||||
addBootstrapNode(enrUri, discv5BootstrapEnrs)
|
||||
|
||||
discv5BootstrapEnrs.add(dynamicBootstrapEnrs)
|
||||
|
||||
let discv5Config = DiscoveryConfig.init(
|
||||
conf.discv5TableIpLimit, conf.discv5BucketIpLimit, conf.discv5BitsPerHop
|
||||
)
|
||||
|
||||
let discv5UdpPort = Port(uint16(conf.discv5UdpPort) + conf.portsShift)
|
||||
|
||||
let discv5Conf = WakuDiscoveryV5Config(
|
||||
discv5Config: some(discv5Config),
|
||||
address: conf.listenAddress,
|
||||
port: discv5UdpPort,
|
||||
privateKey: eth_keys.PrivateKey(key.skkey),
|
||||
bootstrapRecords: discv5BootstrapEnrs,
|
||||
autoupdateRecord: conf.discv5EnrAutoUpdate,
|
||||
)
|
||||
|
||||
WakuDiscoveryV5.new(
|
||||
rng, discv5Conf, some(myENR), some(nodePeerManager), nodeTopicSubscriptionQueue
|
||||
)
|
||||
|
|
|
@ -21,6 +21,7 @@ import
|
|||
libp2p/multiaddress,
|
||||
libp2p/peerid,
|
||||
dnsdisc/client
|
||||
import libp2p/nameresolving/dnsresolver
|
||||
import ../waku_core
|
||||
|
||||
export client
|
||||
|
@ -97,3 +98,35 @@ proc init*(
|
|||
debug "init success"
|
||||
|
||||
return ok(wakuDnsDisc)
|
||||
|
||||
proc retrieveDynamicBootstrapNodes*(
|
||||
dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress]
|
||||
): Result[seq[RemotePeerInfo], string] =
|
||||
## Retrieve dynamic bootstrap nodes (DNS discovery)
|
||||
|
||||
if dnsDiscovery and dnsDiscoveryUrl != "":
|
||||
# DNS discovery
|
||||
debug "Discovering nodes using Waku DNS discovery", url = dnsDiscoveryUrl
|
||||
|
||||
var nameServers: seq[TransportAddress]
|
||||
for ip in dnsDiscoveryNameServers:
|
||||
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
||||
|
||||
let dnsResolver = DnsResolver.new(nameServers)
|
||||
|
||||
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
|
||||
trace "resolving", domain = domain
|
||||
let resolved = await dnsResolver.resolveTxt(domain)
|
||||
return resolved[0] # Use only first answer
|
||||
|
||||
var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
|
||||
if wakuDnsDiscovery.isOk():
|
||||
return wakuDnsDiscovery.get().findPeers().mapErr(
|
||||
proc(e: cstring): string =
|
||||
$e
|
||||
)
|
||||
else:
|
||||
warn "Failed to init Waku DNS discovery"
|
||||
|
||||
debug "No method for retrieving dynamic bootstrap nodes specified."
|
||||
ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
|
||||
|
|
|
@ -11,7 +11,6 @@ import
|
|||
libp2p/wire,
|
||||
libp2p/multicodec,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/nameresolving/dnsresolver,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
libp2p/peerid,
|
||||
eth/keys,
|
||||
|
@ -59,7 +58,7 @@ type
|
|||
rng: ref HmacDrbgContext
|
||||
key: crypto.PrivateKey
|
||||
|
||||
wakuDiscv5*: Option[WakuDiscoveryV5]
|
||||
wakuDiscv5*: WakuDiscoveryV5
|
||||
dynamicBootstrapNodes: seq[RemotePeerInfo]
|
||||
|
||||
node: WakuNode
|
||||
|
@ -75,38 +74,6 @@ func node*(app: App): WakuNode =
|
|||
func version*(app: App): string =
|
||||
app.version
|
||||
|
||||
## Retrieve dynamic bootstrap nodes (DNS discovery)
|
||||
|
||||
proc retrieveDynamicBootstrapNodes*(
|
||||
dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress]
|
||||
): Result[seq[RemotePeerInfo], string] =
|
||||
if dnsDiscovery and dnsDiscoveryUrl != "":
|
||||
# DNS discovery
|
||||
debug "Discovering nodes using Waku DNS discovery", url = dnsDiscoveryUrl
|
||||
|
||||
var nameServers: seq[TransportAddress]
|
||||
for ip in dnsDiscoveryNameServers:
|
||||
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
|
||||
|
||||
let dnsResolver = DnsResolver.new(nameServers)
|
||||
|
||||
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
|
||||
trace "resolving", domain = domain
|
||||
let resolved = await dnsResolver.resolveTxt(domain)
|
||||
return resolved[0] # Use only first answer
|
||||
|
||||
var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
|
||||
if wakuDnsDiscovery.isOk():
|
||||
return wakuDnsDiscovery.get().findPeers().mapErr(
|
||||
proc(e: cstring): string =
|
||||
$e
|
||||
)
|
||||
else:
|
||||
warn "Failed to init Waku DNS discovery"
|
||||
|
||||
debug "No method for retrieving dynamic bootstrap nodes specified."
|
||||
ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
|
||||
|
||||
## Initialisation
|
||||
|
||||
proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =
|
||||
|
@ -121,7 +88,7 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =
|
|||
confCopy.nodekey = some(keyRes.get())
|
||||
|
||||
debug "Retrieve dynamic bootstrap nodes"
|
||||
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(
|
||||
let dynamicBootstrapNodesRes = waku_dnsdisc.retrieveDynamicBootstrapNodes(
|
||||
confCopy.dnsDiscovery, confCopy.dnsDiscoveryUrl, confCopy.dnsDiscoveryNameServers
|
||||
)
|
||||
if dynamicBootstrapNodesRes.isErr():
|
||||
|
@ -147,43 +114,6 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =
|
|||
|
||||
ok(app)
|
||||
|
||||
## Setup DiscoveryV5
|
||||
|
||||
proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
|
||||
let dynamicBootstrapEnrs =
|
||||
app.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get())
|
||||
|
||||
var discv5BootstrapEnrs: seq[enr.Record]
|
||||
|
||||
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
|
||||
for enrUri in app.conf.discv5BootstrapNodes:
|
||||
addBootstrapNode(enrUri, discv5BootstrapEnrs)
|
||||
|
||||
discv5BootstrapEnrs.add(dynamicBootstrapEnrs)
|
||||
|
||||
let discv5Config = DiscoveryConfig.init(
|
||||
app.conf.discv5TableIpLimit, app.conf.discv5BucketIpLimit, app.conf.discv5BitsPerHop
|
||||
)
|
||||
|
||||
let discv5UdpPort = Port(uint16(app.conf.discv5UdpPort) + app.conf.portsShift)
|
||||
|
||||
let discv5Conf = WakuDiscoveryV5Config(
|
||||
discv5Config: some(discv5Config),
|
||||
address: app.conf.listenAddress,
|
||||
port: discv5UdpPort,
|
||||
privateKey: keys.PrivateKey(app.key.skkey),
|
||||
bootstrapRecords: discv5BootstrapEnrs,
|
||||
autoupdateRecord: app.conf.discv5EnrAutoUpdate,
|
||||
)
|
||||
|
||||
WakuDiscoveryV5.new(
|
||||
app.rng,
|
||||
discv5Conf,
|
||||
some(app.node.enr),
|
||||
some(app.node.peerManager),
|
||||
app.node.topicSubscriptionQueue,
|
||||
)
|
||||
|
||||
proc getPorts(
|
||||
listenAddrs: seq[MultiAddress]
|
||||
): AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =
|
||||
|
@ -259,17 +189,13 @@ proc startApp*(app: var App): AppResult[void] =
|
|||
|
||||
## Discv5
|
||||
if app.conf.discv5Discovery:
|
||||
app.wakuDiscV5 = some(app.setupDiscoveryV5())
|
||||
app.wakuDiscV5 = waku_discv5.setupDiscoveryV5(
|
||||
app.node.enr, app.node.peerManager, app.node.topicSubscriptionQueue, app.conf,
|
||||
app.dynamicBootstrapNodes, app.rng, app.key,
|
||||
)
|
||||
|
||||
if app.wakuDiscv5.isSome():
|
||||
let wakuDiscv5 = app.wakuDiscv5.get()
|
||||
let catchRes = catch:
|
||||
(waitFor wakuDiscv5.start())
|
||||
let startRes = catchRes.valueOr:
|
||||
return err("failed to start waku discovery v5: " & catchRes.error.msg)
|
||||
|
||||
startRes.isOkOr:
|
||||
return err("failed to start waku discovery v5: " & error)
|
||||
(waitFor app.wakuDiscV5.start()).isOkOr:
|
||||
return err("failed to start waku discovery v5: " & $error)
|
||||
|
||||
return ok()
|
||||
|
||||
|
@ -282,8 +208,8 @@ proc stop*(app: App): Future[void] {.async: (raises: [Exception]).} =
|
|||
if not app.metricsServer.isNil():
|
||||
await app.metricsServer.stop()
|
||||
|
||||
if app.wakuDiscv5.isSome():
|
||||
await app.wakuDiscv5.get().stop()
|
||||
if not app.wakuDiscv5.isNil():
|
||||
await app.wakuDiscv5.stop()
|
||||
|
||||
if not app.node.isNil():
|
||||
await app.node.stop()
|
||||
|
|
|
@ -50,7 +50,6 @@ import
|
|||
../waku_rln_relay,
|
||||
./config,
|
||||
./peer_manager,
|
||||
../discovery/waku_dnsdisc,
|
||||
../common/ratelimit
|
||||
|
||||
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
|
||||
|
|
|
@ -113,7 +113,7 @@ proc startRestServerEsentials*(
|
|||
proc startRestServerProtocolSupport*(
|
||||
restServer: WakuRestServerRef,
|
||||
node: WakuNode,
|
||||
wakuDiscv5: Option[WakuDiscoveryV5],
|
||||
wakuDiscv5: WakuDiscoveryV5,
|
||||
conf: WakuNodeConf,
|
||||
): Result[void, string] =
|
||||
if not conf.rest:
|
||||
|
@ -154,8 +154,8 @@ proc startRestServerProtocolSupport*(
|
|||
let filterCache = MessageCache.init()
|
||||
|
||||
let filterDiscoHandler =
|
||||
if wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(wakuDiscv5.get(), Filter))
|
||||
if not wakuDiscv5.isNil():
|
||||
some(defaultDiscoveryHandler(wakuDiscv5, Filter))
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
|
@ -168,8 +168,8 @@ proc startRestServerProtocolSupport*(
|
|||
|
||||
## Store REST API
|
||||
let storeDiscoHandler =
|
||||
if wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(wakuDiscv5.get(), Store))
|
||||
if not wakuDiscv5.isNil():
|
||||
some(defaultDiscoveryHandler(wakuDiscv5, Store))
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
|
@ -182,8 +182,8 @@ proc startRestServerProtocolSupport*(
|
|||
if (conf.lightpushnode != "" and node.wakuLightpushClient != nil) or
|
||||
(conf.lightpush and node.wakuLightPush != nil and node.wakuRelay != nil):
|
||||
let lightDiscoHandler =
|
||||
if wakuDiscv5.isSome():
|
||||
some(defaultDiscoveryHandler(wakuDiscv5.get(), Lightpush))
|
||||
if not wakuDiscv5.isNil():
|
||||
some(defaultDiscoveryHandler(wakuDiscv5, Lightpush))
|
||||
else:
|
||||
none(DiscoveryHandler)
|
||||
|
||||
|
|
Loading…
Reference in New Issue