chore: move discv5 out of node. (#1818)

- Refactor discv5 start, stop & loop.
- Fix tests.
This commit is contained in:
Simon-Pierre Vivier 2023-06-27 09:50:11 -04:00 committed by GitHub
parent 52894a82d0
commit 62d3653022
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 208 additions and 237 deletions

View File

@ -71,6 +71,7 @@ type
key: crypto.PrivateKey
record: Record
wakuDiscv5: Option[WakuDiscoveryV5]
peerStore: Option[WakuPeerStorage]
dynamicBootstrapNodes: seq[RemotePeerInfo]
@ -196,6 +197,37 @@ proc setupDyamicBootstrapNodes*(app: var App): AppResult[void] =
ok()
## Setup DiscoveryV5
proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
let dynamicBootstrapEnrs = app.dynamicBootstrapNodes
.filterIt(it.hasUdpPort())
.mapIt(it.enr.get())
var discv5BootstrapEnrs: seq[enr.Record]
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in app.conf.discv5BootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)
discv5BootstrapEnrs.add(dynamicBootstrapEnrs)
let discv5Config = DiscoveryConfig.init(app.conf.discv5TableIpLimit,
app.conf.discv5BucketIpLimit,
app.conf.discv5BitsPerHop)
let discv5UdpPort = Port(uint16(app.conf.discv5UdpPort) + app.conf.portsShift)
let discv5Conf = WakuDiscoveryV5Config(
discv5Config: some(discv5Config),
address: app.netConf.bindIp,
port: discv5UdpPort,
privateKey: keys.PrivateKey(app.key.skkey),
bootstrapRecords: discv5BootstrapEnrs,
autoupdateRecord: app.conf.discv5EnrAutoUpdate,
)
WakuDiscoveryV5.new(app.rng, discv5Conf, some(app.record))
## Init waku node instance
@ -225,39 +257,6 @@ proc initNode(conf: WakuNodeConf,
let pStorage = if peerStore.isNone(): nil
else: peerStore.get()
var wakuDiscv5 = none(WakuDiscoveryV5)
if conf.discv5Discovery:
let dynamicBootstrapEnrs = dynamicBootstrapNodes
.filterIt(it.hasUdpPort())
.mapIt(it.enr.get())
var discv5BootstrapEnrs: seq[enr.Record]
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in conf.discv5BootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)
discv5BootstrapEnrs.add(dynamicBootstrapEnrs)
let discv5Config = DiscoveryConfig.init(conf.discv5TableIpLimit,
conf.discv5BucketIpLimit,
conf.discv5BitsPerHop)
try:
wakuDiscv5 = some(WakuDiscoveryV5.new(
extIp = netConfig.extIp,
extTcpPort = netConfig.extPort,
extUdpPort = netConfig.discv5UdpPort,
bindIp = netConfig.bindIp,
discv5UdpPort = netConfig.discv5UdpPort.get(),
bootstrapEnrs = discv5BootstrapEnrs,
enrAutoUpdate = conf.discv5EnrAutoUpdate,
privateKey = keys.PrivateKey(nodekey.skkey),
flags = netConfig.wakuFlags.get(),
multiaddrs = netConfig.enrMultiaddrs,
rng = rng,
conf.topics,
discv5Config = discv5Config
))
except CatchableError:
return err("failed to create waku discv5 instance: " & getCurrentExceptionMsg())
# Build waku node instance
var builder = WakuNodeBuilder.init()
builder.withRng(rng)
@ -273,20 +272,25 @@ proc initNode(conf: WakuNodeConf,
sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
agentString = some(conf.agentString)
)
builder.withWakuDiscv5(wakuDiscv5.get(nil))
builder.withPeerManagerConfig(maxRelayPeers = some(conf.maxRelayPeers.int))
node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err)
ok(node)
proc setupWakuNode*(app: var App): AppResult[void] =
proc setupWakuApp*(app: var App): AppResult[void] =
## Discv5
if app.conf.discv5Discovery:
app.wakuDiscV5 = some(app.setupDiscoveryV5())
## Waku node
let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes)
if initNodeRes.isErr():
return err("failed to init node: " & initNodeRes.error)
app.node = initNodeRes.get()
ok()
@ -466,12 +470,6 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
except CatchableError:
return err("failed to start waku node: " & getCurrentExceptionMsg())
# Start discv5 based discovery service (discovery loop)
if conf.discv5Discovery:
let startDiscv5Res = await node.startDiscv5()
if startDiscv5Res.isErr():
return err("failed to start waku discovery v5: " & startDiscv5Res.error)
# Connect to configured static nodes
if conf.staticnodes.len > 0:
try:
@ -501,7 +499,15 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,
return ok()
proc startNode*(app: App): Future[AppResult[void]] {.async.} =
proc startApp*(app: App): Future[AppResult[void]] {.async.} =
if app.wakuDiscv5.isSome():
let res = await app.wakuDiscv5.get().start()
if res.isErr():
return err("failed to start waku discovery v5: " & res.error)
asyncSpawn app.wakuDiscv5.get().searchLoop(app.node.peerManager, some(app.record))
return await startNode(
app.node,
app.conf,
@ -624,5 +630,8 @@ proc stop*(app: App): Future[void] {.async.} =
if app.metricsServer.isSome():
await app.metricsServer.get().stop()
if app.wakuDiscv5.isSome():
await app.wakuDiscv5.get().stop()
if not app.node.isNil():
await app.node.stop()

View File

@ -73,7 +73,7 @@ when isMainModule:
debug "3/7 Initializing node"
let res4 = wakunode2.setupWakuNode()
let res4 = wakunode2.setupWakuApp()
if res4.isErr():
error "3/7 Initializing node failed", error=res4.error
quit(QuitFailure)
@ -87,7 +87,7 @@ when isMainModule:
debug "5/7 Starting node and mounted protocols"
let res6 = waitFor wakunode2.startNode()
let res6 = waitFor wakunode2.startApp()
if res6.isErr():
error "5/7 Starting node and protocols failed", error=res6.error
quit(QuitFailure)

View File

@ -52,7 +52,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
discard bootstrapNodeEnr.fromURI(bootstrapNode)
# assumes behind a firewall, so not care about being discoverable
node.wakuDiscv5 = WakuDiscoveryV5.new(
let wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
@ -69,11 +69,13 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
await node.mountRelay()
node.peerManager.start()
let discv5Res = await node.startDiscv5()
let discv5Res = await wakuDiscv5.start()
if discv5Res.isErr():
error "failed to start discv5", error= discv5Res.error
quit(1)
asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr))
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)

View File

@ -47,7 +47,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
discard bootstrapNodeEnr.fromURI(bootstrapNode)
# assumes behind a firewall, so not care about being discoverable
node.wakuDiscv5 = WakuDiscoveryV5.new(
let wakuDiscv5 = WakuDiscoveryV5.new(
extIp= none(ValidIpAddress),
extTcpPort = none(Port),
extUdpPort = none(Port),
@ -64,11 +64,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
await node.mountRelay()
node.peerManager.start()
let discv5Res = await node.startDiscv5()
let discv5Res = await wakuDiscv5.start()
if discv5Res.isErr():
error "failed to start discv5", error = discv5Res.error
quit(1)
asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr))
# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected)

View File

@ -9,7 +9,6 @@ import
libp2p/crypto/crypto as libp2p_keys,
eth/keys as eth_keys
import
../../waku/v2/waku_node,
../../waku/v2/waku_enr,
../../waku/v2/waku_discv5,
./testlib/common,
@ -33,10 +32,10 @@ proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey,
builder.build().tryGet()
proc newTestDiscv5Node(privKey: libp2p_keys.PrivateKey,
proc newTestDiscv5(privKey: libp2p_keys.PrivateKey,
bindIp: string, tcpPort: uint16, udpPort: uint16,
record: waku_enr.Record,
bootstrapRecords = newSeq[waku_enr.Record]()): WakuNode =
bootstrapRecords = newSeq[waku_enr.Record]()): WakuDiscoveryV5 =
let config = WakuDiscoveryV5Config(
privateKey: eth_keys.PrivateKey(privKey.skkey),
address: ValidIpAddress.init(bindIp),
@ -44,16 +43,9 @@ proc newTestDiscv5Node(privKey: libp2p_keys.PrivateKey,
bootstrapRecords: bootstrapRecords,
)
let protocol = WakuDiscoveryV5.new(rng(), config, some(record))
let node = newTestWakuNode(
nodeKey = privKey,
bindIp = ValidIpAddress.init(bindIp),
bindPort = Port(tcpPort),
wakuDiscv5 = some(protocol)
)
return node
let discv5 = WakuDiscoveryV5.new(rng(), config, some(record))
return discv5
procSuite "Waku Discovery v5":
@ -73,7 +65,7 @@ procSuite "Waku Discovery v5":
tcpPort = tcpPort1,
udpPort = udpPort1,
)
let node1 = newTestDiscv5Node(
let node1 = newTestDiscv5(
privKey = privKey1,
bindIp = bindIp1,
tcpPort = tcpPort1,
@ -96,7 +88,7 @@ procSuite "Waku Discovery v5":
udpPort = udpPort2,
)
let node2 = newTestDiscv5Node(
let node2 = newTestDiscv5(
privKey = privKey2,
bindIp = bindIp2,
tcpPort = tcpPort2,
@ -119,7 +111,7 @@ procSuite "Waku Discovery v5":
udpPort = udpPort3,
)
let node3 = newTestDiscv5Node(
let node3 = newTestDiscv5(
privKey = privKey3,
bindIp = bindIp3,
tcpPort = tcpPort3,
@ -131,11 +123,7 @@ procSuite "Waku Discovery v5":
await allFutures(node1.start(), node2.start(), node3.start())
## When
# Starting discv5 via `WakuNode.startDiscV5()` starts the discv5 background task.
await allFutures(node1.startDiscv5(), node2.startDiscv5(), node3.startDiscv5())
await sleepAsync(5.seconds) # Wait for discv5 discovery loop to run
let res = await node1.wakuDiscv5.findRandomPeers()
let res = await node3.findRandomPeers()
## Then
check:
@ -209,7 +197,7 @@ procSuite "Waku Discovery v5":
# Nodes
let node1 = newTestDiscv5Node(
let node1 = newTestDiscv5(
privKey = privKey1,
bindIp = bindIp1,
tcpPort = tcpPort1,
@ -217,7 +205,7 @@ procSuite "Waku Discovery v5":
record = record1,
bootstrapRecords = @[record2]
)
let node2 = newTestDiscv5Node(
let node2 = newTestDiscv5(
privKey = privKey2,
bindIp = bindIp2,
tcpPort = tcpPort2,
@ -226,7 +214,7 @@ procSuite "Waku Discovery v5":
bootstrapRecords = @[record3, record4]
)
let node3 = newTestDiscv5Node(
let node3 = newTestDiscv5(
privKey = privKey3,
bindIp = bindIp3,
tcpPort = tcpPort3,
@ -234,7 +222,7 @@ procSuite "Waku Discovery v5":
record = record3
)
let node4 = newTestDiscv5Node(
let node4 = newTestDiscv5(
privKey = privKey4,
bindIp = bindIp4,
tcpPort = tcpPort4,
@ -243,11 +231,6 @@ procSuite "Waku Discovery v5":
)
# Start nodes' discoveryV5 protocols
require node1.wakuDiscV5.start().isOk()
require node2.wakuDiscV5.start().isOk()
require node3.wakuDiscV5.start().isOk()
require node4.wakuDiscV5.start().isOk()
await allFutures(node1.start(), node2.start(), node3.start(), node4.start())
## Given
@ -264,13 +247,7 @@ procSuite "Waku Discovery v5":
## When
# # Do a random peer search with a predicate multiple times
# var peers = initHashSet[waku_enr.Record]()
# for i in 0..<10:
# for peer in await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate):
# peers.incl(peer)
await sleepAsync(5.seconds) # Wait for discv5 discvery loop to run
let peers = await node1.wakuDiscv5.findRandomPeers(some(recordPredicate))
let peers = await node1.findRandomPeers(some(recordPredicate))
## Then
check:

View File

@ -72,68 +72,93 @@ procSuite "Waku Peer Exchange":
resEnr2 == enr2
asyncTest "retrieve and provide peer exchange peers from discv5":
## Setup (copied from test_waku_discv5.nim)
## Given (copied from test_waku_discv5.nim)
let
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000)
node1 = newTestWakuNode(nodeKey1, bindIp, nodeTcpPort1)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode(nodeKey2, bindIp, nodeTcpPort2)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 = newTestWakuNode(nodeKey3, bindIp, nodeTcpPort3)
# todo: px flag
flags = CapabilitiesBitfield.init(
lightpush = false,
filter = false,
store = false,
relay = true
)
)
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
# Mount discv5
node1.wakuDiscv5 = WakuDiscoveryV5.new(
some(extIp), some(nodeTcpPort1), some(nodeUdpPort1),
nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000)
node1 = newTestWakuNode(
nodeKey1,
bindIp,
nodeUdpPort1,
newSeq[enr.Record](),
false,
keys.PrivateKey(nodeKey1.skkey),
flags,
newSeq[MultiAddress](), # Empty multiaddr fields, for now
nodeTcpPort1,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1)
)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode(nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2)
)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 = newTestWakuNode(nodeKey3,
bindIp,
nodeTcpPort3,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3)
)
# discv5
let conf1 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true
)
let disc1 = WakuDiscoveryV5.new(
node1.rng,
newSeq[string]()
conf1,
some(node1.enr)
)
node2.wakuDiscv5 = WakuDiscoveryV5.new(
some(extIp), some(nodeTcpPort2), some(nodeUdpPort2),
bindIp,
nodeUdpPort2,
@[node1.wakuDiscv5.protocol.localNode.record], # Bootstrap with node1
false,
keys.PrivateKey(nodeKey2.skkey),
flags,
newSeq[MultiAddress](), # Empty multiaddr fields, for now
let conf2 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort2,
privateKey: keys.PrivateKey(nodeKey2.skkey),
bootstrapRecords: @[disc1.protocol.getRecord()],
autoupdateRecord: true
)
let disc2 = WakuDiscoveryV5.new(
node2.rng,
newSeq[string]()
conf2,
some(node2.enr)
)
## Given
await allFutures(node1.start(), node2.start(), node3.start())
await allFutures(node1.startDiscv5(), node2.startDiscv5())
await allFutures(node1.start(), node2.start(), node3.start())
await allFutures(disc1.start(), disc2.start())
asyncSpawn disc1.searchLoop(node1.peerManager, none(enr.Record))
asyncSpawn disc2.searchLoop(node2.peerManager, none(enr.Record))
## When
var attempts = 10
while (node1.wakuDiscv5.protocol.nodesDiscovered < 1 or
node2.wakuDiscv5.protocol.nodesDiscovered < 1) and
while (disc1.protocol.nodesDiscovered < 1 or
disc2.protocol.nodesDiscovered < 1) and
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
@ -157,11 +182,12 @@ procSuite "Waku Peer Exchange":
await sleepAsync(1.seconds)
attempts -= 1
## Then
check:
response.get().peerInfos.len == 1
response.get().peerInfos[0].enr == node2.wakuDiscV5.protocol.localNode.record.raw
response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw
await allFutures([node1.stop(), node2.stop(), node3.stop()])
await allFutures([node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()])
asyncTest "peer exchange request functions returns some discovered peers":
let

View File

@ -36,7 +36,6 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
sendSignedPeerRecord = false,
dns4DomainName = none(string),
discv5UdpPort = none(Port),
wakuDiscv5 = none(WakuDiscoveryV5),
agentString = none(string),
peerStoreCapacity = none(int)): WakuNode =
let netConfigRes = NetConfig.init(
@ -69,6 +68,5 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey,
agentString = agentString,
)
builder.withWakuDiscv5(wakuDiscv5.get(nil))
return builder.build().get()

View File

@ -63,9 +63,9 @@ suite "Wakunode2 - App initialization":
var wakunode2 = App.init(rng(), conf)
require wakunode2.setupPeerPersistence().isOk()
require wakunode2.setupDyamicBootstrapNodes().isOk()
require wakunode2.setupWakuNode().isOk()
require wakunode2.setupWakuApp().isOk()
require isOk(waitFor wakunode2.setupAndMountProtocols())
require isOk(waitFor wakunode2.startNode())
require isOk(waitFor wakunode2.startApp())
require wakunode2.setupMonitoringAndExternalInterfaces().isOk()
## Then

View File

@ -44,9 +44,6 @@ type
switchSslSecureCert: Option[string]
switchSendSignedPeerRecord: Option[bool]
# Waku discv5
wakuDiscv5: Option[WakuDiscoveryV5]
WakuNodeBuilderResult* = Result[void, string]
@ -132,14 +129,6 @@ proc withSwitchConfiguration*(builder: var WakuNodeBuilder,
if not nameResolver.isNil():
builder.switchNameResolver = some(nameResolver)
## Waku discv5
proc withWakuDiscv5*(builder: var WakuNodeBuilder, instance: WakuDiscoveryV5) =
if not instance.isNil():
builder.wakuDiscv5 = some(instance)
## Build
proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
@ -196,7 +185,6 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
netConfig = builder.netConfig.get(),
enr = builder.record,
switch = switch,
wakuDiscv5 = builder.wakuDiscv5,
peerManager = peerManager,
rng = rng,
)

View File

@ -813,65 +813,6 @@ proc startKeepalive*(node: WakuNode) =
asyncSpawn node.keepaliveLoop(defaultKeepalive)
proc runDiscv5Loop(node: WakuNode) {.async.} =
## Continuously add newly discovered nodes using Node Discovery v5
if node.wakuDiscv5.isNil():
warn "Trying to run discovery v5 while it's disabled"
return
info "starting discv5 discovery loop"
let shardPredOp = shardingPredicate(node.enr)
while node.wakuDiscv5.listening:
trace "running discv5 discovery loop"
let discoveredRecords = await node.wakuDiscv5.findRandomPeers(shardPredOp)
let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value)
for peer in discoveredPeers:
let isNew = not node.peerManager.peerStore[AddressBook].contains(peer.peerId)
if isNew:
debug "new peer discovered", peer= $peer, origin= "discv5"
node.peerManager.addPeer(peer, PeerOrigin.Discv5)
# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.
#
# Also, give some time to dial the discovered nodes and update stats, etc.
await sleepAsync(5.seconds)
proc startDiscv5*(node: WakuNode): Future[Result[void, string]] {.async.} =
## Start Discovery v5 service
if node.wakuDiscv5.isNil():
return err("discovery v5 is disabled")
info "Starting discovery v5 service"
let res = node.wakuDiscv5.start()
if res.isErr():
return err("error in startDiscv5: " & res.error)
trace "Start discovering new peers using discv5"
asyncSpawn node.runDiscv5Loop()
debug "Successfully started discovery v5 service"
info "Discv5: discoverable ENR ", enr = node.wakuDiscV5.protocol.localNode.record.toUri()
return ok()
proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
## Stop Discovery v5 service
if not node.wakuDiscv5.isNil():
info "Stopping discovery v5 service"
## Stop Discovery v5 process and close listening port
if node.wakuDiscv5.listening:
trace "Stop listening on discv5 port"
await node.wakuDiscv5.closeWait()
debug "Successfully stopped discovery v5 service"
proc mountRendezvous*(node: WakuNode) {.async, raises: [Defect, LPError].} =
info "mounting rendezvous discovery protocol"
@ -923,9 +864,6 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuRelay.isNil():
await node.wakuRelay.stop()
if not node.wakuDiscv5.isNil():
discard await node.stopDiscv5()
await node.switch.stop()
node.peerManager.stop()

View File

@ -15,6 +15,7 @@ import
eth/p2p/discoveryv5/node,
eth/p2p/discoveryv5/protocol
import
../../waku/v2/node/peer_manager/peer_manager,
./waku_core,
./waku_enr
@ -121,34 +122,6 @@ proc new*(T: type WakuDiscoveryV5,
WakuDiscoveryV5.new(rng, conf, some(record))
proc start*(wd: WakuDiscoveryV5): Result[void, string] =
if wd.listening:
return err("already listening")
# Start listening on configured port
debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port
try:
wd.protocol.open()
except CatchableError:
return err("failed to open udp port: " & getCurrentExceptionMsg())
wd.listening = true
# Start Discovery v5
trace "start discv5 service"
wd.protocol.start()
ok()
proc closeWait*(wd: WakuDiscoveryV5) {.async.} =
debug "closing Waku discovery v5 node"
if not wd.listening:
return
wd.listening = false
await wd.protocol.closeWait()
proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
## Filter peers based on relay sharding information
@ -185,6 +158,64 @@ proc findRandomPeers*(wd: WakuDiscoveryV5, pred = none(WakuDiscv5Predicate)): Fu
return discoveredRecords
#TODO abstract away PeerManager
proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager, record: Option[enr.Record]) {.async.} =
## Continuously add newly discovered nodes
info "Starting discovery v5 search"
let shardPredOp =
if record.isSome():
shardingPredicate(record.get())
else:
none(WakuDiscv5Predicate)
while wd.listening:
trace "running discv5 discovery loop"
let discoveredRecords = await wd.findRandomPeers(shardPredOp)
let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value)
for peer in discoveredPeers:
# Peers added are filtered by the peer manager
peerManager.addPeer(peer, PeerOrigin.Discv5)
# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.
#
# Also, give some time to dial the discovered nodes and update stats, etc.
await sleepAsync(5.seconds)
proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async.} =
if wd.listening:
return err("already listening")
info "Starting discovery v5 service"
debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port
try:
wd.protocol.open()
except CatchableError:
return err("failed to open udp port: " & getCurrentExceptionMsg())
wd.listening = true
trace "start discv5 service"
wd.protocol.start()
debug "Successfully started discovery v5 service"
info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri()
proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} =
if not wd.listening:
return
info "Stopping discovery v5 service"
wd.listening = false
trace "Stop listening on discv5 port"
await wd.protocol.closeWait()
debug "Successfully stopped discovery v5 service"
## Helper functions