diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index e8cbdbf15..26ecef78d 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -652,13 +652,11 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, except CatchableError: return err("failed to start waku node: " & getCurrentExceptionMsg()) - # Start discv5 and connect to discovered nodes + # Start discv5 based discovery service (discovery loop) if conf.discv5Discovery: - try: - if not await node.startDiscv5(): - error "could not start Discovery v5" - except CatchableError: - return err("failed to start waku discovery v5: " & getCurrentExceptionMsg()) + let startDiscv5Res = await node.startDiscv5() + if startDiscv5Res.isErr(): + return err("failed to start waku discovery v5: " & startDiscv5Res.error) # Connect to configured static nodes if conf.staticnodes.len > 0: diff --git a/examples/v2/publisher.nim b/examples/v2/publisher.nim index 4aca434b5..99f9a8101 100644 --- a/examples/v2/publisher.nim +++ b/examples/v2/publisher.nim @@ -66,8 +66,10 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = await node.start() await node.mountRelay() node.peerManager.start() - if not await node.startDiscv5(): - error "failed to start discv5" + + let discv5Res = await node.startDiscv5() + if discv5Res.isErr(): + error "failed to start discv5", error= discv5Res.error quit(1) # wait for a minimum of peers to be connected, otherwise messages wont be gossiped diff --git a/examples/v2/subscriber.nim b/examples/v2/subscriber.nim index 4d87df375..b8e1cb913 100644 --- a/examples/v2/subscriber.nim +++ b/examples/v2/subscriber.nim @@ -61,8 +61,10 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = await node.start() await node.mountRelay() node.peerManager.start() - if not await node.startDiscv5(): - error "failed to start discv5" + + let discv5Res = await node.startDiscv5() + if discv5Res.isErr(): + error "failed to start discv5", error = discv5Res.error quit(1) # wait for a minimum of peers to be connected, otherwise messages wont be gossiped diff --git a/tests/v2/test_waku_discv5.nim b/tests/v2/test_waku_discv5.nim index 9d7c53d3a..3a56c4576 100644 --- a/tests/v2/test_waku_discv5.nim +++ b/tests/v2/test_waku_discv5.nim @@ -1,201 +1,282 @@ {.used.} import - stew/[results, byteutils], + std/[sequtils, sets], + stew/results, stew/shims/net, chronos, chronicles, testutils/unittests, - libp2p/crypto/crypto, - eth/keys, - eth/p2p/discoveryv5/enr + libp2p/crypto/crypto as libp2p_keys, + eth/keys as eth_keys import ../../waku/v2/waku_node, - ../../waku/v2/waku_core, + ../../waku/v2/waku_enr, ../../waku/v2/waku_discv5, ./testlib/common, ./testlib/wakucore, ./testlib/wakunode + +proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey, + extIp: string, tcpPort: uint16, udpPort: uint16, + flags = none(CapabilitiesBitfield)): waku_enr.Record = + var builder = EnrBuilder.init(privKey) + builder.withIpAddressAndPorts( + ipAddr = some(ValidIpAddress.init(extIp)), + tcpPort = some(Port(tcpPort)), + udpPort = some(Port(udpPort)), + ) + + if flags.isSome(): + builder.withWakuCapabilities(flags.get()) + + builder.build().tryGet() + + +proc newTestDiscv5Node(privKey: libp2p_keys.PrivateKey, + bindIp: string, tcpPort: uint16, udpPort: uint16, + record: waku_enr.Record, + bootstrapRecords = newSeq[waku_enr.Record]()): WakuNode = + let config = WakuDiscoveryV5Config( + privateKey: eth_keys.PrivateKey(privKey.skkey), + address: ValidIpAddress.init(bindIp), + port: Port(udpPort), + bootstrapRecords: bootstrapRecords, + ) + + let protocol = WakuDiscoveryV5.new(rng(), config, some(record)) + let node = newTestWakuNode( + nodeKey = privKey, + bindIp = ValidIpAddress.init(bindIp), + bindPort = Port(tcpPort), + wakuDiscv5 = some(protocol) + ) + + return node + + + procSuite "Waku Discovery v5": - asyncTest "Waku Discovery v5 end-to-end": - ## Tests integrated discovery v5 + asyncTest "find random peers": + ## Given + # Node 1 let - bindIp = ValidIpAddress.init("0.0.0.0") - extIp = ValidIpAddress.init("127.0.0.1") + privKey1 = generateSecp256k1Key() + bindIp1 = "0.0.0.0" + extIp1 = "127.0.0.1" + tcpPort1 = 61500u16 + udpPort1 = 9000u16 - nodeKey1 = generateSecp256k1Key() - nodeTcpPort1 = Port(61500) - nodeUdpPort1 = Port(9000) - node1 = newTestWakuNode(nodeKey1, bindIp, nodeTcpPort1) + let record1 = newTestEnrRecord( + privKey = privKey1, + extIp = extIp1, + tcpPort = tcpPort1, + udpPort = udpPort1, + ) + let node1 = newTestDiscv5Node( + privKey = privKey1, + bindIp = bindIp1, + tcpPort = tcpPort1, + udpPort = udpPort1, + record = record1 + ) - nodeKey2 = generateSecp256k1Key() - nodeTcpPort2 = Port(61502) - nodeUdpPort2 = Port(9002) - node2 = newTestWakuNode(nodeKey2, bindIp, nodeTcpPort2) - - nodeKey3 = generateSecp256k1Key() - nodeTcpPort3 = Port(61504) - nodeUdpPort3 = Port(9004) - node3 = newTestWakuNode(nodeKey3, bindIp, nodeTcpPort3) - - flags = CapabilitiesBitfield.init( - lightpush = false, - filter = false, - store = false, - relay = true - ) - - # E2E relay test paramaters - pubSubTopic = "/waku/2/default-waku/proto" - contentTopic = ContentTopic("/waku/2/default-content/proto") - payload = "Can you see me?".toBytes() - message = WakuMessage(payload: payload, contentTopic: contentTopic) - - # Mount discv5 - node1.wakuDiscv5 = WakuDiscoveryV5.new( - some(extIp), some(nodeTcpPort1), some(nodeUdpPort1), - bindIp, - nodeUdpPort1, - newSeq[enr.Record](), - false, - keys.PrivateKey(nodeKey1.skkey), - flags, - newSeq[MultiAddress](), # Empty multiaddr fields, for now - node1.rng - ) - - node2.wakuDiscv5 = WakuDiscoveryV5.new( - some(extIp), some(nodeTcpPort2), some(nodeUdpPort2), - bindIp, - nodeUdpPort2, - @[node1.wakuDiscv5.protocol.localNode.record], # Bootstrap with node1 - false, - keys.PrivateKey(nodeKey2.skkey), - flags, - newSeq[MultiAddress](), # Empty multiaddr fields, for now - node2.rng - ) - - node3.wakuDiscv5 = WakuDiscoveryV5.new( - some(extIp), some(nodeTcpPort3), some(nodeUdpPort3), - bindIp, - nodeUdpPort3, - @[node2.wakuDiscv5.protocol.localNode.record], # Bootstrap with node2 - false, - keys.PrivateKey(nodeKey3.skkey), - flags, - newSeq[MultiAddress](), # Empty multiaddr fields, for now - node3.rng - ) - - await node1.mountRelay() - await node2.mountRelay() - await node3.mountRelay() - - await allFutures([node1.start(), node2.start(), node3.start()]) - - await allFutures([node1.startDiscv5(), node2.startDiscv5(), node3.startDiscv5()]) - - await sleepAsync(3000.millis) # Give the algorithm some time to work its magic - check: - node1.wakuDiscv5.protocol.nodesDiscovered > 0 - node2.wakuDiscv5.protocol.nodesDiscovered > 0 - node3.wakuDiscv5.protocol.nodesDiscovered > 0 - - # Let's see if we can deliver a message end-to-end - # var completionFut = newFuture[bool]() - # proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - # let msg = WakuMessage.decode(data) - # if msg.isOk(): - # let val = msg.value() - # check: - # topic == pubSubTopic - # val.contentTopic == contentTopic - # val.payload == payload - # completionFut.complete(true) - - # node3.subscribe(pubSubTopic, relayHandler) - # await sleepAsync(2000.millis) - - # await node1.publish(pubSubTopic, message) - - # check: - # (await completionFut.withTimeout(6.seconds)) == true - - await allFutures([node1.stop(), node2.stop(), node3.stop()]) - - asyncTest "Custom multiaddresses are advertised correctly": + # Node 2 let - bindIp = ValidIpAddress.init("0.0.0.0") - extIp = ValidIpAddress.init("127.0.0.1") - expectedMultiAddr = MultiAddress.init("/ip4/200.200.200.200/tcp/9000/wss").tryGet() + privKey2 = generateSecp256k1Key() + bindIp2 = "0.0.0.0" + extIp2 = "127.0.0.1" + tcpPort2 = 61502u16 + udpPort2 = 9002u16 - flags = CapabilitiesBitfield.init( - lightpush = false, - filter = false, - store = false, - relay = true - ) + let record2 = newTestEnrRecord( + privKey = privKey2, + extIp = extIp2, + tcpPort = tcpPort2, + udpPort = udpPort2, + ) - nodeTcpPort1 = Port(9010) - nodeUdpPort1 = Port(9012) - node1Key = generateSecp256k1Key() - node1NetConfig = NetConfig.init(bindIp = bindIp, - extIp = some(extIp), - extPort = some(nodeTcpPort1), - bindPort = nodeTcpPort1, - extmultiAddrs = @[expectedMultiAddr], - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort1)).get() - node1discV5 = WakuDiscoveryV5.new(extIp = node1NetConfig.extIp, - extTcpPort = node1NetConfig.extPort, - extUdpPort = node1NetConfig.discv5UdpPort, - bindIp = node1NetConfig.bindIp, - discv5UdpPort = node1NetConfig.discv5UdpPort.get(), - privateKey = keys.PrivateKey(node1Key.skkey), - multiaddrs = node1NetConfig.enrMultiaddrs, - flags = node1NetConfig.wakuFlags.get(), - rng = rng) - node1 = WakuNode.new(nodekey = node1Key, - netConfig = node1NetConfig, - wakuDiscv5 = some(node1discV5), - rng = rng) + let node2 = newTestDiscv5Node( + privKey = privKey2, + bindIp = bindIp2, + tcpPort = tcpPort2, + udpPort = udpPort2, + record = record2, + ) + # Node 3 + let + privKey3 = generateSecp256k1Key() + bindIp3 = "0.0.0.0" + extIp3 = "127.0.0.1" + tcpPort3 = 61504u16 + udpPort3 = 9004u16 - nodeTcpPort2 = Port(9014) - nodeUdpPort2 = Port(9016) - node2Key = generateSecp256k1Key() - node2NetConfig = NetConfig.init(bindIp = bindIp, - extIp = some(extIp), - extPort = some(nodeTcpPort2), - bindPort = nodeTcpPort2, - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort2)).get() - node2discV5 = WakuDiscoveryV5.new(extIp = node2NetConfig.extIp, - extTcpPort = node2NetConfig.extPort, - extUdpPort = node2NetConfig.discv5UdpPort, - bindIp = node2NetConfig.bindIp, - discv5UdpPort = node2NetConfig.discv5UdpPort.get(), - bootstrapEnrs = @[node1.wakuDiscv5.protocol.localNode.record], - privateKey = keys.PrivateKey(node2Key.skkey), - flags = node2NetConfig.wakuFlags.get(), - rng = rng) - node2 = WakuNode.new(nodeKey = node2Key, - netConfig = node2NetConfig, - wakuDiscv5 = some(node2discV5)) + let record3 = newTestEnrRecord( + privKey = privKey3, + extIp = extIp3, + tcpPort = tcpPort3, + udpPort = udpPort3, + ) - await allFutures([node1.start(), node2.start()]) + let node3 = newTestDiscv5Node( + privKey = privKey3, + bindIp = bindIp3, + tcpPort = tcpPort3, + udpPort = udpPort3, + record = record3, + bootstrapRecords = @[record1, record2] + ) - await allFutures([node1.startDiscv5(), node2.startDiscv5()]) + await allFutures(node1.start(), node2.start(), node3.start()) - await sleepAsync(3000.millis) # Give the algorithm some time to work its magic + ## When + # Starting discv5 via `WakuNode.startDiscV5()` starts the discv5 background task. + await allFutures(node1.startDiscv5(), node2.startDiscv5(), node3.startDiscv5()) - let node1Enr = node2.wakuDiscv5.protocol.routingTable.buckets[0].nodes[0].record - let multiaddrs = node1Enr.toTyped().get().multiaddrs.get() + await sleepAsync(5.seconds) # Wait for discv5 discovery loop to run + let res = await node1.wakuDiscv5.findRandomPeers() + ## Then check: - node1.wakuDiscv5.protocol.nodesDiscovered > 0 - node2.wakuDiscv5.protocol.nodesDiscovered > 0 - multiaddrs.contains(expectedMultiAddr) + res.len >= 1 + ## Cleanup + await allFutures(node1.stop(), node2.stop(), node3.stop()) + + asyncTest "find random peers with predicate": + ## Setup + # Records + let + privKey1 = generateSecp256k1Key() + bindIp1 = "0.0.0.0" + extIp1 = "127.0.0.1" + tcpPort1 = 61500u16 + udpPort1 = 9000u16 + + let record1 = newTestEnrRecord( + privKey = privKey1, + extIp = extIp1, + tcpPort = tcpPort1, + udpPort = udpPort1, + flags = some(CapabilitiesBitfield.init(Capabilities.Relay)) + ) + + let + privKey2 = generateSecp256k1Key() + bindIp2 = "0.0.0.0" + extIp2 = "127.0.0.1" + tcpPort2 = 61502u16 + udpPort2 = 9002u16 + + let record2 = newTestEnrRecord( + privKey = privKey2, + extIp = extIp2, + tcpPort = tcpPort2, + udpPort = udpPort2, + flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)) + ) + + let + privKey3 = generateSecp256k1Key() + bindIp3 = "0.0.0.0" + extIp3 = "127.0.0.1" + tcpPort3 = 61504u16 + udpPort3 = 9004u16 + + let record3 = newTestEnrRecord( + privKey = privKey3, + extIp = extIp3, + tcpPort = tcpPort3, + udpPort = udpPort3, + flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Filter)) + ) + + let + privKey4 = generateSecp256k1Key() + bindIp4 = "0.0.0.0" + extIp4 = "127.0.0.1" + tcpPort4 = 61506u16 + udpPort4 = 9006u16 + + let record4 = newTestEnrRecord( + privKey = privKey4, + extIp = extIp4, + tcpPort = tcpPort4, + udpPort = udpPort4, + flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)) + ) + + + # Nodes + let node1 = newTestDiscv5Node( + privKey = privKey1, + bindIp = bindIp1, + tcpPort = tcpPort1, + udpPort = udpPort1, + record = record1, + bootstrapRecords = @[record2] + ) + let node2 = newTestDiscv5Node( + privKey = privKey2, + bindIp = bindIp2, + tcpPort = tcpPort2, + udpPort = udpPort2, + record = record2, + bootstrapRecords = @[record3, record4] + ) + + let node3 = newTestDiscv5Node( + privKey = privKey3, + bindIp = bindIp3, + tcpPort = tcpPort3, + udpPort = udpPort3, + record = record3 + ) + + let node4 = newTestDiscv5Node( + privKey = privKey4, + bindIp = bindIp4, + tcpPort = tcpPort4, + udpPort = udpPort4, + record = record4 + ) + + # Start nodes' discoveryV5 protocols + require node1.wakuDiscV5.start().isOk() + require node2.wakuDiscV5.start().isOk() + require node3.wakuDiscV5.start().isOk() + require node4.wakuDiscV5.start().isOk() + + await allFutures(node1.start(), node2.start(), node3.start(), node4.start()) + + ## Given + let recordPredicate = proc(record: waku_enr.Record): bool = + let typedRecord = record.toTyped() + if typedRecord.isErr(): + return false + + let capabilities = typedRecord.value.waku2 + if capabilities.isNone(): + return false + + return capabilities.get().supportsCapability(Capabilities.Store) + + + ## When + # # Do a random peer search with a predicate multiple times + # var peers = initHashSet[waku_enr.Record]() + # for i in 0..<10: + # for peer in await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate): + # peers.incl(peer) + await sleepAsync(5.seconds) # Wait for discv5 discvery loop to run + let peers = await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate) + + ## Then + check: + peers.len >= 1 + peers.allIt(it.supportsCapability(Capabilities.Store)) + + # Cleanup + await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop()) diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index ae23786a2..1e310fbf2 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -126,8 +126,8 @@ procSuite "Waku Peer Exchange": ) ## Given - await allFutures([node1.start(), node2.start(), node3.start()]) - await allFutures([node1.startDiscv5(), node2.startDiscv5()]) + await allFutures(node1.start(), node2.start(), node3.start()) + await allFutures(node1.startDiscv5(), node2.startDiscv5()) var attempts = 10 while (node1.wakuDiscv5.protocol.nodesDiscovered < 1 or diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 8d7b86a91..1b0413610 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -856,59 +856,48 @@ proc startKeepalive*(node: WakuNode) = asyncSpawn node.keepaliveLoop(defaultKeepalive) proc runDiscv5Loop(node: WakuNode) {.async.} = - ## Continuously add newly discovered nodes - ## using Node Discovery v5 - if (node.wakuDiscv5.isNil): + ## Continuously add newly discovered nodes using Node Discovery v5 + if node.wakuDiscv5.isNil(): warn "Trying to run discovery v5 while it's disabled" return - info "Starting discovery loop" + info "starting discv5 discovery loop" while node.wakuDiscv5.listening: - trace "Running discovery loop" - let discoveredPeersRes = await node.wakuDiscv5.findRandomPeers() + trace "running discv5 discovery loop" + let discoveredRecords = await node.wakuDiscv5.findRandomPeers() + let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value) - if discoveredPeersRes.isOk: - let discoveredPeers = discoveredPeersRes.get - let newSeen = discoveredPeers.countIt(not node.peerManager.peerStore[AddressBook].contains(it.peerId)) - info "Discovered peers", discovered=discoveredPeers.len, new=newSeen + for peer in discoveredPeers: + let isNew = not node.peerManager.peerStore[AddressBook].contains(peer.peerId) + if isNew: + debug "new peer discovered", peer= $peer, origin= "discv5" - # Add all peers, new ones and already seen (in case their addresses changed) - for peer in discoveredPeers: - node.peerManager.addPeer(peer, Discv5) + node.peerManager.addPeer(peer, PeerOrigin.Discv5) # Discovery `queryRandom` can have a synchronous fast path for example # when no peers are in the routing table. Don't run it in continuous loop. # - # Also, give some time to dial the discovered nodes and update stats etc + # Also, give some time to dial the discovered nodes and update stats, etc. await sleepAsync(5.seconds) -proc startDiscv5*(node: WakuNode): Future[bool] {.async.} = +proc startDiscv5*(node: WakuNode): Future[Result[void, string]] {.async.} = ## Start Discovery v5 service + if node.wakuDiscv5.isNil(): + return err("discovery v5 is disabled") info "Starting discovery v5 service" + let res = node.wakuDiscv5.start() + if res.isErr(): + return err("error in startDiscv5: " & res.error) - if not node.wakuDiscv5.isNil(): - ## First start listening on configured port - try: - trace "Start listening on discv5 port" - node.wakuDiscv5.open() - except CatchableError: - error "Failed to start discovery service. UDP port may be already in use" - return false + trace "Start discovering new peers using discv5" + asyncSpawn node.runDiscv5Loop() - ## Start Discovery v5 - trace "Start discv5 service" - node.wakuDiscv5.start() - trace "Start discovering new peers using discv5" + debug "Successfully started discovery v5 service" + info "Discv5: discoverable ENR ", enr = node.wakuDiscV5.protocol.localNode.record.toUri() + return ok() - asyncSpawn node.runDiscv5Loop() - - debug "Successfully started discovery v5 service" - info "Discv5: discoverable ENR ", enr = node.wakuDiscV5.protocol.localNode.record.toUri() - return true - - return false proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} = ## Stop Discovery v5 service diff --git a/waku/v2/waku_discv5.nim b/waku/v2/waku_discv5.nim index 259dd1633..7616d7af4 100644 --- a/waku/v2/waku_discv5.nim +++ b/waku/v2/waku_discv5.nim @@ -4,15 +4,14 @@ else: {.push raises: [].} import - std/[strutils, options], + std/[sequtils, strutils, options], stew/results, stew/shims/net, chronos, chronicles, metrics, libp2p/multiaddress, - eth/keys, - eth/p2p/discoveryv5/enr, + eth/keys as eth_keys, eth/p2p/discoveryv5/node, eth/p2p/discoveryv5/protocol import @@ -29,14 +28,122 @@ logScope: topics = "waku discv5" +## Config + +type WakuDiscoveryV5Config* = object + discv5Config*: Option[DiscoveryConfig] + address*: ValidIpAddress + port*: Port + privateKey*: eth_keys.PrivateKey + bootstrapRecords*: seq[waku_enr.Record] + autoupdateRecord*: bool + + +## Protocol + +type WakuDiscv5Predicate* = proc(record: waku_enr.Record): bool {.closure, gcsafe.} + type WakuDiscoveryV5* = ref object + conf: WakuDiscoveryV5Config protocol*: protocol.Protocol listening*: bool +proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, record: Option[waku_enr.Record]): T = + let protocol = newProtocol( + rng = rng, + config = conf.discv5Config.get(protocol.defaultDiscoveryConfig), + bindPort = conf.port, + bindIp = conf.address, + privKey = conf.privateKey, + bootstrapRecords = conf.bootstrapRecords, + enrAutoUpdate = conf.autoupdateRecord, + previousRecord = record, + enrIp = none(ValidIpAddress), + enrTcpPort = none(Port), + enrUdpPort = none(Port), + ) -#################### -# Helper functions # -#################### + WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false) + +proc new*(T: type WakuDiscoveryV5, + extIp: Option[ValidIpAddress], + extTcpPort: Option[Port], + extUdpPort: Option[Port], + bindIP: ValidIpAddress, + discv5UdpPort: Port, + bootstrapEnrs = newSeq[enr.Record](), + enrAutoUpdate = false, + privateKey: eth_keys.PrivateKey, + flags: CapabilitiesBitfield, + multiaddrs = newSeq[MultiAddress](), + rng: ref HmacDrbgContext, + discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T {. + deprecated: "use the config and record proc variant instead".}= + + let record = block: + var builder = EnrBuilder.init(privateKey) + builder.withIpAddressAndPorts( + ipAddr = extIp, + tcpPort = extTcpPort, + udpPort = extUdpPort, + ) + builder.withWakuCapabilities(flags) + builder.withMultiaddrs(multiaddrs) + builder.build().expect("Record within size limits") + + let conf = WakuDiscoveryV5Config( + discv5Config: some(discv5Config), + address: bindIP, + port: discv5UdpPort, + privateKey: privateKey, + bootstrapRecords: bootstrapEnrs, + autoupdateRecord: enrAutoUpdate, + ) + + WakuDiscoveryV5.new(rng, conf, some(record)) + + +proc start*(wd: WakuDiscoveryV5): Result[void, string] = + if wd.listening: + return err("already listening") + + # Start listening on configured port + debug "start listening on udp port", address = $wd.conf.address, port = $wd.conf.port + try: + wd.protocol.open() + except CatchableError: + return err("failed to open udp port: " & getCurrentExceptionMsg()) + + wd.listening = true + + # Start Discovery v5 + trace "start discv5 service" + wd.protocol.start() + + ok() + +proc closeWait*(wd: WakuDiscoveryV5) {.async.} = + debug "closing Waku discovery v5 node" + if not wd.listening: + return + + wd.listening = false + await wd.protocol.closeWait() + +proc findRandomPeers*(wd: WakuDiscoveryV5, pred: WakuDiscv5Predicate = nil): Future[seq[waku_enr.Record]] {.async.} = + ## Find random peers to connect to using Discovery v5 + let discoveredNodes = await wd.protocol.queryRandom() + + var discoveredRecords = discoveredNodes.mapIt(it.record) + + # Filter out nodes that do not match the predicate + if not pred.isNil(): + discoveredRecords = discoveredRecords.filter(pred) + + return discoveredRecords + + +## Helper functions proc parseBootstrapAddress(address: string): Result[enr.Record, cstring] = logScope: @@ -71,91 +178,3 @@ proc addBootstrapNode*(bootstrapAddr: string, return bootstrapEnrs.add(enrRes.value) - - -#################### -# Discovery v5 API # -#################### - -proc new*(T: type WakuDiscoveryV5, - extIp: Option[ValidIpAddress], - extTcpPort: Option[Port], - extUdpPort: Option[Port], - bindIP: ValidIpAddress, - discv5UdpPort: Port, - bootstrapEnrs = newSeq[enr.Record](), - enrAutoUpdate = false, - privateKey: keys.PrivateKey, - flags: CapabilitiesBitfield, - multiaddrs = newSeq[MultiAddress](), - rng: ref HmacDrbgContext, - discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T = - - # Add the waku capabilities field - var enrInitFields = @[(CapabilitiesEnrField, @[flags.byte])] - - # Add the waku multiaddrs field - if multiaddrs.len > 0: - let value = waku_enr.encodeMultiaddrs(multiaddrs) - enrInitFields.add((MultiaddrEnrField, value)) - - let protocol = newProtocol( - privateKey, - enrIp = extIp, - enrTcpPort = extTcpPort, - enrUdpPort = extUdpPort, - enrInitFields, - bootstrapEnrs, - bindPort = discv5UdpPort, - bindIp = bindIP, - enrAutoUpdate = enrAutoUpdate, - config = discv5Config, - rng = rng - ) - - WakuDiscoveryV5(protocol: protocol, listening: false) - -# TODO: Do not raise an exception, return a result -proc open*(wd: WakuDiscoveryV5) {.raises: [CatchableError].} = - debug "Opening Waku discovery v5 ports" - if wd.listening: - return - - wd.protocol.open() - wd.listening = true - -proc start*(wd: WakuDiscoveryV5) = - debug "starting Waku discovery v5 service" - wd.protocol.start() - -proc closeWait*(wd: WakuDiscoveryV5) {.async.} = - debug "closing Waku discovery v5 node" - if not wd.listening: - return - - wd.listening = false - await wd.protocol.closeWait() - -proc findRandomPeers*(wd: WakuDiscoveryV5): Future[Result[seq[RemotePeerInfo], cstring]] {.async.} = - ## Find random peers to connect to using Discovery v5 - - # Query for a random target and collect all discovered nodes - let discoveredNodes = await wd.protocol.queryRandom() - - ## Filter based on our needs - # let filteredNodes = discoveredNodes.filter(isWakuNode) # Currently only a single predicate - # TODO: consider node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage - - var discoveredPeers: seq[RemotePeerInfo] - - for node in discoveredNodes: - let res = node.record.toRemotePeerInfo() - if res.isErr(): - error "failed to convert ENR to peer info", enr= $node.record, err=res.error - waku_discv5_errors.inc(labelValues = ["peer_info_failure"]) - continue - - discoveredPeers.add(res.value) - - - return ok(discoveredPeers)