diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index bbd012899..e721f10a7 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -71,6 +71,7 @@ type key: crypto.PrivateKey record: Record + wakuDiscv5: Option[WakuDiscoveryV5] peerStore: Option[WakuPeerStorage] dynamicBootstrapNodes: seq[RemotePeerInfo] @@ -196,6 +197,37 @@ proc setupDyamicBootstrapNodes*(app: var App): AppResult[void] = ok() +## Setup DiscoveryV5 + +proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 = + let dynamicBootstrapEnrs = app.dynamicBootstrapNodes + .filterIt(it.hasUdpPort()) + .mapIt(it.enr.get()) + + var discv5BootstrapEnrs: seq[enr.Record] + + # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq + for enrUri in app.conf.discv5BootstrapNodes: + addBootstrapNode(enrUri, discv5BootstrapEnrs) + + discv5BootstrapEnrs.add(dynamicBootstrapEnrs) + + let discv5Config = DiscoveryConfig.init(app.conf.discv5TableIpLimit, + app.conf.discv5BucketIpLimit, + app.conf.discv5BitsPerHop) + + let discv5UdpPort = Port(uint16(app.conf.discv5UdpPort) + app.conf.portsShift) + + let discv5Conf = WakuDiscoveryV5Config( + discv5Config: some(discv5Config), + address: app.netConf.bindIp, + port: discv5UdpPort, + privateKey: keys.PrivateKey(app.key.skkey), + bootstrapRecords: discv5BootstrapEnrs, + autoupdateRecord: app.conf.discv5EnrAutoUpdate, + ) + + WakuDiscoveryV5.new(app.rng, discv5Conf, some(app.record)) ## Init waku node instance @@ -225,39 +257,6 @@ proc initNode(conf: WakuNodeConf, let pStorage = if peerStore.isNone(): nil else: peerStore.get() - var wakuDiscv5 = none(WakuDiscoveryV5) - - if conf.discv5Discovery: - let dynamicBootstrapEnrs = dynamicBootstrapNodes - .filterIt(it.hasUdpPort()) - .mapIt(it.enr.get()) - var discv5BootstrapEnrs: seq[enr.Record] - # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq - for enrUri in conf.discv5BootstrapNodes: - addBootstrapNode(enrUri, discv5BootstrapEnrs) - discv5BootstrapEnrs.add(dynamicBootstrapEnrs) - let discv5Config = DiscoveryConfig.init(conf.discv5TableIpLimit, - conf.discv5BucketIpLimit, - conf.discv5BitsPerHop) - try: - wakuDiscv5 = some(WakuDiscoveryV5.new( - extIp = netConfig.extIp, - extTcpPort = netConfig.extPort, - extUdpPort = netConfig.discv5UdpPort, - bindIp = netConfig.bindIp, - discv5UdpPort = netConfig.discv5UdpPort.get(), - bootstrapEnrs = discv5BootstrapEnrs, - enrAutoUpdate = conf.discv5EnrAutoUpdate, - privateKey = keys.PrivateKey(nodekey.skkey), - flags = netConfig.wakuFlags.get(), - multiaddrs = netConfig.enrMultiaddrs, - rng = rng, - conf.topics, - discv5Config = discv5Config - )) - except CatchableError: - return err("failed to create waku discv5 instance: " & getCurrentExceptionMsg()) - # Build waku node instance var builder = WakuNodeBuilder.init() builder.withRng(rng) @@ -273,20 +272,25 @@ proc initNode(conf: WakuNodeConf, sendSignedPeerRecord = conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled agentString = some(conf.agentString) ) - builder.withWakuDiscv5(wakuDiscv5.get(nil)) builder.withPeerManagerConfig(maxRelayPeers = some(conf.maxRelayPeers.int)) node = ? builder.build().mapErr(proc (err: string): string = "failed to create waku node instance: " & err) ok(node) -proc setupWakuNode*(app: var App): AppResult[void] = +proc setupWakuApp*(app: var App): AppResult[void] = + + ## Discv5 + if app.conf.discv5Discovery: + app.wakuDiscV5 = some(app.setupDiscoveryV5()) + ## Waku node let initNodeRes = initNode(app.conf, app.netConf, app.rng, app.key, app.record, app.peerStore, app.dynamicBootstrapNodes) if initNodeRes.isErr(): return err("failed to init node: " & initNodeRes.error) app.node = initNodeRes.get() + ok() @@ -466,12 +470,6 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, except CatchableError: return err("failed to start waku node: " & getCurrentExceptionMsg()) - # Start discv5 based discovery service (discovery loop) - if conf.discv5Discovery: - let startDiscv5Res = await node.startDiscv5() - if startDiscv5Res.isErr(): - return err("failed to start waku discovery v5: " & startDiscv5Res.error) - # Connect to configured static nodes if conf.staticnodes.len > 0: try: @@ -501,7 +499,15 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, return ok() -proc startNode*(app: App): Future[AppResult[void]] {.async.} = +proc startApp*(app: App): Future[AppResult[void]] {.async.} = + if app.wakuDiscv5.isSome(): + let res = await app.wakuDiscv5.get().start() + + if res.isErr(): + return err("failed to start waku discovery v5: " & res.error) + + asyncSpawn app.wakuDiscv5.get().searchLoop(app.node.peerManager, some(app.record)) + return await startNode( app.node, app.conf, @@ -624,5 +630,8 @@ proc stop*(app: App): Future[void] {.async.} = if app.metricsServer.isSome(): await app.metricsServer.get().stop() + if app.wakuDiscv5.isSome(): + await app.wakuDiscv5.get().stop() + if not app.node.isNil(): await app.node.stop() diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index b328b7d44..30a281f33 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -73,7 +73,7 @@ when isMainModule: debug "3/7 Initializing node" - let res4 = wakunode2.setupWakuNode() + let res4 = wakunode2.setupWakuApp() if res4.isErr(): error "3/7 Initializing node failed", error=res4.error quit(QuitFailure) @@ -87,7 +87,7 @@ when isMainModule: debug "5/7 Starting node and mounted protocols" - let res6 = waitFor wakunode2.startNode() + let res6 = waitFor wakunode2.startApp() if res6.isErr(): error "5/7 Starting node and protocols failed", error=res6.error quit(QuitFailure) diff --git a/examples/v2/publisher.nim b/examples/v2/publisher.nim index 01ed4470d..9dc265fb7 100644 --- a/examples/v2/publisher.nim +++ b/examples/v2/publisher.nim @@ -52,7 +52,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = discard bootstrapNodeEnr.fromURI(bootstrapNode) # assumes behind a firewall, so not care about being discoverable - node.wakuDiscv5 = WakuDiscoveryV5.new( + let wakuDiscv5 = WakuDiscoveryV5.new( extIp= none(ValidIpAddress), extTcpPort = none(Port), extUdpPort = none(Port), @@ -69,11 +69,13 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = await node.mountRelay() node.peerManager.start() - let discv5Res = await node.startDiscv5() + let discv5Res = await wakuDiscv5.start() if discv5Res.isErr(): error "failed to start discv5", error= discv5Res.error quit(1) + asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr)) + # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected) diff --git a/examples/v2/subscriber.nim b/examples/v2/subscriber.nim index 542d0bc4d..0fad77cca 100644 --- a/examples/v2/subscriber.nim +++ b/examples/v2/subscriber.nim @@ -47,7 +47,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = discard bootstrapNodeEnr.fromURI(bootstrapNode) # assumes behind a firewall, so not care about being discoverable - node.wakuDiscv5 = WakuDiscoveryV5.new( + let wakuDiscv5 = WakuDiscoveryV5.new( extIp= none(ValidIpAddress), extTcpPort = none(Port), extUdpPort = none(Port), @@ -64,11 +64,13 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = await node.mountRelay() node.peerManager.start() - let discv5Res = await node.startDiscv5() + let discv5Res = await wakuDiscv5.start() if discv5Res.isErr(): error "failed to start discv5", error = discv5Res.error quit(1) + asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr)) + # wait for a minimum of peers to be connected, otherwise messages wont be gossiped while true: let numConnectedPeers = node.peerManager.peerStore[ConnectionBook].book.values().countIt(it == Connected) diff --git a/tests/v2/test_waku_discv5.nim b/tests/v2/test_waku_discv5.nim index d868949e7..b0a070545 100644 --- a/tests/v2/test_waku_discv5.nim +++ b/tests/v2/test_waku_discv5.nim @@ -9,7 +9,6 @@ import libp2p/crypto/crypto as libp2p_keys, eth/keys as eth_keys import - ../../waku/v2/waku_node, ../../waku/v2/waku_enr, ../../waku/v2/waku_discv5, ./testlib/common, @@ -33,10 +32,10 @@ proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey, builder.build().tryGet() -proc newTestDiscv5Node(privKey: libp2p_keys.PrivateKey, +proc newTestDiscv5(privKey: libp2p_keys.PrivateKey, bindIp: string, tcpPort: uint16, udpPort: uint16, record: waku_enr.Record, - bootstrapRecords = newSeq[waku_enr.Record]()): WakuNode = + bootstrapRecords = newSeq[waku_enr.Record]()): WakuDiscoveryV5 = let config = WakuDiscoveryV5Config( privateKey: eth_keys.PrivateKey(privKey.skkey), address: ValidIpAddress.init(bindIp), @@ -44,16 +43,9 @@ proc newTestDiscv5Node(privKey: libp2p_keys.PrivateKey, bootstrapRecords: bootstrapRecords, ) - let protocol = WakuDiscoveryV5.new(rng(), config, some(record)) - let node = newTestWakuNode( - nodeKey = privKey, - bindIp = ValidIpAddress.init(bindIp), - bindPort = Port(tcpPort), - wakuDiscv5 = some(protocol) - ) - - return node + let discv5 = WakuDiscoveryV5.new(rng(), config, some(record)) + return discv5 procSuite "Waku Discovery v5": @@ -73,7 +65,7 @@ procSuite "Waku Discovery v5": tcpPort = tcpPort1, udpPort = udpPort1, ) - let node1 = newTestDiscv5Node( + let node1 = newTestDiscv5( privKey = privKey1, bindIp = bindIp1, tcpPort = tcpPort1, @@ -96,7 +88,7 @@ procSuite "Waku Discovery v5": udpPort = udpPort2, ) - let node2 = newTestDiscv5Node( + let node2 = newTestDiscv5( privKey = privKey2, bindIp = bindIp2, tcpPort = tcpPort2, @@ -119,7 +111,7 @@ procSuite "Waku Discovery v5": udpPort = udpPort3, ) - let node3 = newTestDiscv5Node( + let node3 = newTestDiscv5( privKey = privKey3, bindIp = bindIp3, tcpPort = tcpPort3, @@ -131,11 +123,7 @@ procSuite "Waku Discovery v5": await allFutures(node1.start(), node2.start(), node3.start()) ## When - # Starting discv5 via `WakuNode.startDiscV5()` starts the discv5 background task. - await allFutures(node1.startDiscv5(), node2.startDiscv5(), node3.startDiscv5()) - - await sleepAsync(5.seconds) # Wait for discv5 discovery loop to run - let res = await node1.wakuDiscv5.findRandomPeers() + let res = await node3.findRandomPeers() ## Then check: @@ -209,7 +197,7 @@ procSuite "Waku Discovery v5": # Nodes - let node1 = newTestDiscv5Node( + let node1 = newTestDiscv5( privKey = privKey1, bindIp = bindIp1, tcpPort = tcpPort1, @@ -217,7 +205,7 @@ procSuite "Waku Discovery v5": record = record1, bootstrapRecords = @[record2] ) - let node2 = newTestDiscv5Node( + let node2 = newTestDiscv5( privKey = privKey2, bindIp = bindIp2, tcpPort = tcpPort2, @@ -226,7 +214,7 @@ procSuite "Waku Discovery v5": bootstrapRecords = @[record3, record4] ) - let node3 = newTestDiscv5Node( + let node3 = newTestDiscv5( privKey = privKey3, bindIp = bindIp3, tcpPort = tcpPort3, @@ -234,7 +222,7 @@ procSuite "Waku Discovery v5": record = record3 ) - let node4 = newTestDiscv5Node( + let node4 = newTestDiscv5( privKey = privKey4, bindIp = bindIp4, tcpPort = tcpPort4, @@ -243,11 +231,6 @@ procSuite "Waku Discovery v5": ) # Start nodes' discoveryV5 protocols - require node1.wakuDiscV5.start().isOk() - require node2.wakuDiscV5.start().isOk() - require node3.wakuDiscV5.start().isOk() - require node4.wakuDiscV5.start().isOk() - await allFutures(node1.start(), node2.start(), node3.start(), node4.start()) ## Given @@ -264,13 +247,7 @@ procSuite "Waku Discovery v5": ## When - # # Do a random peer search with a predicate multiple times - # var peers = initHashSet[waku_enr.Record]() - # for i in 0..<10: - # for peer in await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate): - # peers.incl(peer) - await sleepAsync(5.seconds) # Wait for discv5 discvery loop to run - let peers = await node1.wakuDiscv5.findRandomPeers(some(recordPredicate)) + let peers = await node1.findRandomPeers(some(recordPredicate)) ## Then check: diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index a03752bc9..7bd14c5b2 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -72,68 +72,93 @@ procSuite "Waku Peer Exchange": resEnr2 == enr2 asyncTest "retrieve and provide peer exchange peers from discv5": - ## Setup (copied from test_waku_discv5.nim) + ## Given (copied from test_waku_discv5.nim) let - bindIp = ValidIpAddress.init("0.0.0.0") - extIp = ValidIpAddress.init("127.0.0.1") - - nodeKey1 = generateSecp256k1Key() - nodeTcpPort1 = Port(64010) - nodeUdpPort1 = Port(9000) - node1 = newTestWakuNode(nodeKey1, bindIp, nodeTcpPort1) - - nodeKey2 = generateSecp256k1Key() - nodeTcpPort2 = Port(64012) - nodeUdpPort2 = Port(9002) - node2 = newTestWakuNode(nodeKey2, bindIp, nodeTcpPort2) - - nodeKey3 = generateSecp256k1Key() - nodeTcpPort3 = Port(64014) - nodeUdpPort3 = Port(9004) - node3 = newTestWakuNode(nodeKey3, bindIp, nodeTcpPort3) - # todo: px flag flags = CapabilitiesBitfield.init( lightpush = false, filter = false, store = false, relay = true - ) + ) + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") - # Mount discv5 - node1.wakuDiscv5 = WakuDiscoveryV5.new( - some(extIp), some(nodeTcpPort1), some(nodeUdpPort1), + nodeKey1 = generateSecp256k1Key() + nodeTcpPort1 = Port(64010) + nodeUdpPort1 = Port(9000) + node1 = newTestWakuNode( + nodeKey1, bindIp, - nodeUdpPort1, - newSeq[enr.Record](), - false, - keys.PrivateKey(nodeKey1.skkey), - flags, - newSeq[MultiAddress](), # Empty multiaddr fields, for now + nodeTcpPort1, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort1) + ) + + nodeKey2 = generateSecp256k1Key() + nodeTcpPort2 = Port(64012) + nodeUdpPort2 = Port(9002) + node2 = newTestWakuNode(nodeKey2, + bindIp, + nodeTcpPort2, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort2) + ) + + nodeKey3 = generateSecp256k1Key() + nodeTcpPort3 = Port(64014) + nodeUdpPort3 = Port(9004) + node3 = newTestWakuNode(nodeKey3, + bindIp, + nodeTcpPort3, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort3) + ) + + # discv5 + let conf1 = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort1, + privateKey: keys.PrivateKey(nodeKey1.skkey), + bootstrapRecords: @[], + autoupdateRecord: true + ) + + let disc1 = WakuDiscoveryV5.new( node1.rng, - newSeq[string]() + conf1, + some(node1.enr) ) - node2.wakuDiscv5 = WakuDiscoveryV5.new( - some(extIp), some(nodeTcpPort2), some(nodeUdpPort2), - bindIp, - nodeUdpPort2, - @[node1.wakuDiscv5.protocol.localNode.record], # Bootstrap with node1 - false, - keys.PrivateKey(nodeKey2.skkey), - flags, - newSeq[MultiAddress](), # Empty multiaddr fields, for now + let conf2 = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort2, + privateKey: keys.PrivateKey(nodeKey2.skkey), + bootstrapRecords: @[disc1.protocol.getRecord()], + autoupdateRecord: true + ) + + let disc2 = WakuDiscoveryV5.new( node2.rng, - newSeq[string]() + conf2, + some(node2.enr) ) - ## Given - await allFutures(node1.start(), node2.start(), node3.start()) - await allFutures(node1.startDiscv5(), node2.startDiscv5()) + await allFutures(node1.start(), node2.start(), node3.start()) + await allFutures(disc1.start(), disc2.start()) + asyncSpawn disc1.searchLoop(node1.peerManager, none(enr.Record)) + asyncSpawn disc2.searchLoop(node2.peerManager, none(enr.Record)) + + ## When var attempts = 10 - while (node1.wakuDiscv5.protocol.nodesDiscovered < 1 or - node2.wakuDiscv5.protocol.nodesDiscovered < 1) and + while (disc1.protocol.nodesDiscovered < 1 or + disc2.protocol.nodesDiscovered < 1) and attempts > 0: await sleepAsync(1.seconds) attempts -= 1 @@ -157,11 +182,12 @@ procSuite "Waku Peer Exchange": await sleepAsync(1.seconds) attempts -= 1 + ## Then check: response.get().peerInfos.len == 1 - response.get().peerInfos[0].enr == node2.wakuDiscV5.protocol.localNode.record.raw + response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw - await allFutures([node1.stop(), node2.stop(), node3.stop()]) + await allFutures([node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]) asyncTest "peer exchange request functions returns some discovered peers": let diff --git a/tests/v2/testlib/wakunode.nim b/tests/v2/testlib/wakunode.nim index 5621d9c69..47e836ffc 100644 --- a/tests/v2/testlib/wakunode.nim +++ b/tests/v2/testlib/wakunode.nim @@ -36,7 +36,6 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey, sendSignedPeerRecord = false, dns4DomainName = none(string), discv5UdpPort = none(Port), - wakuDiscv5 = none(WakuDiscoveryV5), agentString = none(string), peerStoreCapacity = none(int)): WakuNode = let netConfigRes = NetConfig.init( @@ -69,6 +68,5 @@ proc newTestWakuNode*(nodeKey: crypto.PrivateKey, agentString = agentString, ) - builder.withWakuDiscv5(wakuDiscv5.get(nil)) return builder.build().get() diff --git a/tests/wakunode2/test_app.nim b/tests/wakunode2/test_app.nim index 4673de71b..040582db0 100644 --- a/tests/wakunode2/test_app.nim +++ b/tests/wakunode2/test_app.nim @@ -63,9 +63,9 @@ suite "Wakunode2 - App initialization": var wakunode2 = App.init(rng(), conf) require wakunode2.setupPeerPersistence().isOk() require wakunode2.setupDyamicBootstrapNodes().isOk() - require wakunode2.setupWakuNode().isOk() + require wakunode2.setupWakuApp().isOk() require isOk(waitFor wakunode2.setupAndMountProtocols()) - require isOk(waitFor wakunode2.startNode()) + require isOk(waitFor wakunode2.startApp()) require wakunode2.setupMonitoringAndExternalInterfaces().isOk() ## Then diff --git a/waku/v2/node/builder.nim b/waku/v2/node/builder.nim index 95c801644..dcd2c3688 100644 --- a/waku/v2/node/builder.nim +++ b/waku/v2/node/builder.nim @@ -44,9 +44,6 @@ type switchSslSecureCert: Option[string] switchSendSignedPeerRecord: Option[bool] - # Waku discv5 - wakuDiscv5: Option[WakuDiscoveryV5] - WakuNodeBuilderResult* = Result[void, string] @@ -132,14 +129,6 @@ proc withSwitchConfiguration*(builder: var WakuNodeBuilder, if not nameResolver.isNil(): builder.switchNameResolver = some(nameResolver) - -## Waku discv5 - -proc withWakuDiscv5*(builder: var WakuNodeBuilder, instance: WakuDiscoveryV5) = - if not instance.isNil(): - builder.wakuDiscv5 = some(instance) - - ## Build proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = @@ -196,7 +185,6 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = netConfig = builder.netConfig.get(), enr = builder.record, switch = switch, - wakuDiscv5 = builder.wakuDiscv5, peerManager = peerManager, rng = rng, ) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index b910a520a..b888715f7 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -813,65 +813,6 @@ proc startKeepalive*(node: WakuNode) = asyncSpawn node.keepaliveLoop(defaultKeepalive) -proc runDiscv5Loop(node: WakuNode) {.async.} = - ## Continuously add newly discovered nodes using Node Discovery v5 - if node.wakuDiscv5.isNil(): - warn "Trying to run discovery v5 while it's disabled" - return - - info "starting discv5 discovery loop" - - let shardPredOp = shardingPredicate(node.enr) - - while node.wakuDiscv5.listening: - trace "running discv5 discovery loop" - let discoveredRecords = await node.wakuDiscv5.findRandomPeers(shardPredOp) - let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value) - - for peer in discoveredPeers: - let isNew = not node.peerManager.peerStore[AddressBook].contains(peer.peerId) - if isNew: - debug "new peer discovered", peer= $peer, origin= "discv5" - - node.peerManager.addPeer(peer, PeerOrigin.Discv5) - - # Discovery `queryRandom` can have a synchronous fast path for example - # when no peers are in the routing table. Don't run it in continuous loop. - # - # Also, give some time to dial the discovered nodes and update stats, etc. - await sleepAsync(5.seconds) - -proc startDiscv5*(node: WakuNode): Future[Result[void, string]] {.async.} = - ## Start Discovery v5 service - if node.wakuDiscv5.isNil(): - return err("discovery v5 is disabled") - - info "Starting discovery v5 service" - let res = node.wakuDiscv5.start() - if res.isErr(): - return err("error in startDiscv5: " & res.error) - - trace "Start discovering new peers using discv5" - asyncSpawn node.runDiscv5Loop() - - debug "Successfully started discovery v5 service" - info "Discv5: discoverable ENR ", enr = node.wakuDiscV5.protocol.localNode.record.toUri() - return ok() - - -proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} = - ## Stop Discovery v5 service - - if not node.wakuDiscv5.isNil(): - info "Stopping discovery v5 service" - - ## Stop Discovery v5 process and close listening port - if node.wakuDiscv5.listening: - trace "Stop listening on discv5 port" - await node.wakuDiscv5.closeWait() - - debug "Successfully stopped discovery v5 service" - proc mountRendezvous*(node: WakuNode) {.async, raises: [Defect, LPError].} = info "mounting rendezvous discovery protocol" @@ -923,9 +864,6 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuRelay.isNil(): await node.wakuRelay.stop() - if not node.wakuDiscv5.isNil(): - discard await node.stopDiscv5() - await node.switch.stop() node.peerManager.stop() diff --git a/waku/v2/waku_discv5.nim b/waku/v2/waku_discv5.nim index 8e8b94137..2d19b0ffd 100644 --- a/waku/v2/waku_discv5.nim +++ b/waku/v2/waku_discv5.nim @@ -15,6 +15,7 @@ import eth/p2p/discoveryv5/node, eth/p2p/discoveryv5/protocol import + ../../waku/v2/node/peer_manager/peer_manager, ./waku_core, ./waku_enr @@ -121,34 +122,6 @@ proc new*(T: type WakuDiscoveryV5, WakuDiscoveryV5.new(rng, conf, some(record)) - -proc start*(wd: WakuDiscoveryV5): Result[void, string] = - if wd.listening: - return err("already listening") - - # Start listening on configured port - debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port - try: - wd.protocol.open() - except CatchableError: - return err("failed to open udp port: " & getCurrentExceptionMsg()) - - wd.listening = true - - # Start Discovery v5 - trace "start discv5 service" - wd.protocol.start() - - ok() - -proc closeWait*(wd: WakuDiscoveryV5) {.async.} = - debug "closing Waku discovery v5 node" - if not wd.listening: - return - - wd.listening = false - await wd.protocol.closeWait() - proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] = ## Filter peers based on relay sharding information @@ -185,6 +158,64 @@ proc findRandomPeers*(wd: WakuDiscoveryV5, pred = none(WakuDiscv5Predicate)): Fu return discoveredRecords +#TODO abstract away PeerManager +proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager, record: Option[enr.Record]) {.async.} = + ## Continuously add newly discovered nodes + + info "Starting discovery v5 search" + + let shardPredOp = + if record.isSome(): + shardingPredicate(record.get()) + else: + none(WakuDiscv5Predicate) + + while wd.listening: + trace "running discv5 discovery loop" + let discoveredRecords = await wd.findRandomPeers(shardPredOp) + let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value) + + for peer in discoveredPeers: + # Peers added are filtered by the peer manager + peerManager.addPeer(peer, PeerOrigin.Discv5) + + # Discovery `queryRandom` can have a synchronous fast path for example + # when no peers are in the routing table. Don't run it in continuous loop. + # + # Also, give some time to dial the discovered nodes and update stats, etc. + await sleepAsync(5.seconds) + +proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async.} = + if wd.listening: + return err("already listening") + + info "Starting discovery v5 service" + + debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port + try: + wd.protocol.open() + except CatchableError: + return err("failed to open udp port: " & getCurrentExceptionMsg()) + + wd.listening = true + + trace "start discv5 service" + wd.protocol.start() + + debug "Successfully started discovery v5 service" + info "Discv5: discoverable ENR ", enr = wd.protocol.localNode.record.toUri() + +proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} = + if not wd.listening: + return + + info "Stopping discovery v5 service" + + wd.listening = false + trace "Stop listening on discv5 port" + await wd.protocol.closeWait() + + debug "Successfully stopped discovery v5 service" ## Helper functions