From d7d00bfd79589dfd677a970d84f573c1419c0def Mon Sep 17 00:00:00 2001 From: gabrielmer <101006718+gabrielmer@users.noreply.github.com> Date: Tue, 3 Dec 2024 14:39:37 +0100 Subject: [PATCH] feat: making dns discovery async (#3175) --- apps/chat2/chat2.nim | 2 +- apps/networkmonitor/networkmonitor.nim | 25 ++++--- .../requests/discovery_request.nim | 10 +-- tests/test_waku_dnsdisc.nim | 2 +- vendor/nim-dnsdisc | 2 +- waku/discovery/waku_discv5.nim | 10 +++ waku/discovery/waku_dnsdisc.nim | 24 ++++--- waku/factory/waku.nim | 68 +++++++++++++++---- 8 files changed, 101 insertions(+), 42 deletions(-) diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index f844deada..ae2dc141e 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -439,7 +439,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver) if wakuDnsDiscovery.isOk: - let discoveredPeers = wakuDnsDiscovery.get().findPeers() + let discoveredPeers = await wakuDnsDiscovery.get().findPeers() if discoveredPeers.isOk: info "Connecting to discovered peers" discoveredNodes = discoveredPeers.get() diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index cb2bec6d5..ed415cdaa 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -355,7 +355,9 @@ proc crawlNetwork( proc retrieveDynamicBootstrapNodes( dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress] -): Result[seq[RemotePeerInfo], string] = +): Future[Result[seq[RemotePeerInfo], string]] {.async.} = + ## Retrieve dynamic bootstrap nodes (DNS discovery) + if dnsDiscovery and dnsDiscoveryUrl != "": # DNS discovery debug "Discovering nodes using Waku DNS discovery", url = dnsDiscoveryUrl @@ -369,14 +371,15 @@ proc retrieveDynamicBootstrapNodes( proc resolver(domain: string): Future[string] {.async, gcsafe.} = trace "resolving", domain = domain let resolved = await dnsResolver.resolveTxt(domain) - return resolved[0] # Use only first answer + if resolved.len > 0: + return resolved[0] # Use only first answer var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) if wakuDnsDiscovery.isOk(): - return wakuDnsDiscovery.get().findPeers().mapErr( - proc(e: cstring): string = - $e - ) + return (await wakuDnsDiscovery.get().findPeers()).mapErr( + proc(e: cstring): string = + $e + ) else: warn "Failed to init Waku DNS discovery" @@ -385,11 +388,11 @@ proc retrieveDynamicBootstrapNodes( proc getBootstrapFromDiscDns( conf: NetworkMonitorConf -): Result[seq[enr.Record], string] = +): Future[Result[seq[enr.Record], string]] {.async.} = try: let dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")] let dynamicBootstrapNodesRes = - retrieveDynamicBootstrapNodes(true, conf.dnsDiscoveryUrl, dnsNameServers) + await retrieveDynamicBootstrapNodes(true, conf.dnsDiscoveryUrl, dnsNameServers) if not dynamicBootstrapNodesRes.isOk(): error("failed discovering peers from DNS") let dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() @@ -412,7 +415,7 @@ proc getBootstrapFromDiscDns( proc initAndStartApp( conf: NetworkMonitorConf -): Result[(WakuNode, WakuDiscoveryV5), string] = +): Future[Result[(WakuNode, WakuDiscoveryV5), string]] {.async.} = let bindIp = try: parseIpAddress("0.0.0.0") @@ -472,7 +475,7 @@ proc initAndStartApp( else: nodeRes.get() - var discv5BootstrapEnrsRes = getBootstrapFromDiscDns(conf) + var discv5BootstrapEnrsRes = await getBootstrapFromDiscDns(conf) if discv5BootstrapEnrsRes.isErr(): error("failed discovering peers from DNS") var discv5BootstrapEnrs = discv5BootstrapEnrsRes.get() @@ -604,7 +607,7 @@ when isMainModule: let restClient = clientRest.get() # start waku node - let nodeRes = initAndStartApp(conf) + let nodeRes = waitFor initAndStartApp(conf) if nodeRes.isErr(): error "could not start node" quit 1 diff --git a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim index 746380755..e1da7a68d 100644 --- a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim @@ -80,10 +80,10 @@ proc destroyShared(self: ptr DiscoveryRequest) = proc retrieveBootstrapNodes( enrTreeUrl: string, ipDnsServer: string -): Result[seq[string], string] = +): Future[Result[seq[string], string]] {.async.} = let dnsNameServers = @[parseIpAddress(ipDnsServer)] - let discoveredPeers: seq[RemotePeerInfo] = retrieveDynamicBootstrapNodes( - true, enrTreeUrl, dnsNameServers + let discoveredPeers: seq[RemotePeerInfo] = ( + await retrieveDynamicBootstrapNodes(true, enrTreeUrl, dnsNameServers) ).valueOr: return err("failed discovering peers from DNS: " & $error) @@ -126,7 +126,9 @@ proc process*( return ok("discv5 stopped correctly") of GET_BOOTSTRAP_NODES: - let nodes = retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer).valueOr: + let nodes = ( + await retrieveBootstrapNodes($self[].enrTreeUrl, $self[].nameDnsServer) + ).valueOr: error "GET_BOOTSTRAP_NODES failed", error = error return err($error) diff --git a/tests/test_waku_dnsdisc.nim b/tests/test_waku_dnsdisc.nim index 4040bea8f..228fa5542 100644 --- a/tests/test_waku_dnsdisc.nim +++ b/tests/test_waku_dnsdisc.nim @@ -79,7 +79,7 @@ suite "Waku DNS Discovery": var wakuDnsDisc = WakuDnsDiscovery.init(location, resolver).get() - let res = wakuDnsDisc.findPeers() + let res = await wakuDnsDisc.findPeers() check: # We have discovered all three nodes diff --git a/vendor/nim-dnsdisc b/vendor/nim-dnsdisc index 38f853df3..c3d37c286 160000 --- a/vendor/nim-dnsdisc +++ b/vendor/nim-dnsdisc @@ -1 +1 @@ -Subproject commit 38f853df30bcfdb73055b7fd7de284a47eebecc2 +Subproject commit c3d37c2860bcef9e3e2616ee4c53100fe7f0e845 diff --git a/waku/discovery/waku_discv5.nim b/waku/discovery/waku_discv5.nim index f01c6c50f..28072c83c 100644 --- a/waku/discovery/waku_discv5.nim +++ b/waku/discovery/waku_discv5.nim @@ -448,5 +448,15 @@ proc updateBootstrapRecords*( return err("wrong enr given: " & enrWithoutQuotes) self.protocol.bootstrapRecords = newRecords + self.protocol.seedTable() return ok() + +proc updateBootstrapRecords*( + self: var WakuDiscoveryV5, updatedRecords: seq[enr.Record] +): void = + self.protocol.bootstrapRecords = updatedRecords + + # If we're updating the table with nodes that already existed, it will log an error when trying + # to add a bootstrap node that was already there. That's ok. + self.protocol.seedTable() diff --git a/waku/discovery/waku_dnsdisc.nim b/waku/discovery/waku_dnsdisc.nim index b31de2cf5..b8078feed 100644 --- a/waku/discovery/waku_dnsdisc.nim +++ b/waku/discovery/waku_dnsdisc.nim @@ -6,7 +6,7 @@ ## EIP-1459 is defined in https://eips.ethereum.org/EIPS/eip-1459 import - std/[options, net], + std/[options, net, sequtils], chronicles, chronos, metrics, @@ -40,7 +40,9 @@ proc emptyResolver*(domain: string): Future[string] {.async, gcsafe.} = debug "Empty resolver called", domain = domain return "" -proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[RemotePeerInfo], cstring] = +proc findPeers*( + wdd: WakuDnsDiscovery +): Future[Result[seq[RemotePeerInfo], cstring]] {.async.} = ## Find peers to connect to using DNS based discovery info "Finding peers using Waku DNS discovery" @@ -48,14 +50,13 @@ proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[RemotePeerInfo], cstring] # Synchronise client tree using configured resolver var tree: Tree try: - tree = wdd.client.getTree(wdd.resolver) - # @TODO: this is currently a blocking operation to not violate memory safety + tree = (await syncTree(wdd.resolver, wdd.client.loc)).tryGet() except Exception: error "Failed to synchronise client tree" waku_dnsdisc_errors.inc(labelValues = ["tree_sync_failure"]) return err("Node discovery failed") - let discoveredEnr = wdd.client.getNodeRecords() + let discoveredEnr = tree.getNodes().mapIt(it.record) if discoveredEnr.len > 0: info "Successfully discovered ENR", count = discoveredEnr.len @@ -97,7 +98,7 @@ proc init*( proc retrieveDynamicBootstrapNodes*( dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress] -): Result[seq[RemotePeerInfo], string] = +): Future[Result[seq[RemotePeerInfo], string]] {.async.} = ## Retrieve dynamic bootstrap nodes (DNS discovery) if dnsDiscovery and dnsDiscoveryUrl != "": @@ -113,14 +114,15 @@ proc retrieveDynamicBootstrapNodes*( proc resolver(domain: string): Future[string] {.async, gcsafe.} = trace "resolving", domain = domain let resolved = await dnsResolver.resolveTxt(domain) - return resolved[0] # Use only first answer + if resolved.len > 0: + return resolved[0] # Use only first answer var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) if wakuDnsDiscovery.isOk(): - return wakuDnsDiscovery.get().findPeers().mapErr( - proc(e: cstring): string = - $e - ) + return (await wakuDnsDiscovery.get().findPeers()).mapErr( + proc(e: cstring): string = + $e + ) else: warn "Failed to init Waku DNS discovery" diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 48d75cfd1..8d23cbaa5 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -58,6 +58,7 @@ type Waku* = ref object wakuDiscv5*: WakuDiscoveryV5 dynamicBootstrapNodes: seq[RemotePeerInfo] + dnsRetryLoopHandle: Future[void] discoveryMngr: DiscoveryManager node*: WakuNode @@ -215,17 +216,6 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = return err("Failed to generate key: " & $keyRes.error) confCopy.nodekey = some(keyRes.get()) - debug "Retrieve dynamic bootstrap nodes" - let dynamicBootstrapNodesRes = waku_dnsdisc.retrieveDynamicBootstrapNodes( - confCopy.dnsDiscovery, confCopy.dnsDiscoveryUrl, confCopy.dnsDiscoveryNameServers - ) - if dynamicBootstrapNodesRes.isErr(): - error "Retrieving dynamic bootstrap nodes failed", - error = dynamicBootstrapNodesRes.error - return err( - "Retrieving dynamic bootstrap nodes failed: " & dynamicBootstrapNodesRes.error - ) - var relay = newCircuitRelay(confCopy.isRelayClient) let nodeRes = setupNode(confCopy, rng, relay) @@ -255,7 +245,6 @@ proc new*(T: type Waku, confCopy: var WakuNodeConf): Result[Waku, string] = rng: rng, key: confCopy.nodekey.get(), node: node, - dynamicBootstrapNodes: dynamicBootstrapNodesRes.get(), deliveryMonitor: deliveryMonitor, ) @@ -351,7 +340,57 @@ proc updateWaku(waku: ptr Waku): Result[void, string] = return ok() -proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: []).} = +proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} = + while true: + await sleepAsync(30.seconds) + let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes( + waku.conf.dnsDiscovery, waku.conf.dnsDiscoveryUrl, + waku.conf.dnsDiscoveryNameServers, + ) + if dynamicBootstrapNodesRes.isErr(): + error "Retrieving dynamic bootstrap nodes failed", + error = dynamicBootstrapNodesRes.error + continue + + waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() + + if not waku[].wakuDiscv5.isNil(): + let dynamicBootstrapEnrs = waku[].dynamicBootstrapNodes + .filterIt(it.hasUdpPort()) + .mapIt(it.enr.get().toUri()) + var discv5BootstrapEnrs: seq[enr.Record] + # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq + for enrUri in dynamicBootstrapEnrs: + addBootstrapNode(enrUri, discv5BootstrapEnrs) + + waku[].wakuDiscv5.updateBootstrapRecords( + waku[].wakuDiscv5.protocol.bootstrapRecords & discv5BootstrapEnrs + ) + + info "Connecting to dynamic bootstrap peers" + try: + await connectToNodes( + waku[].node, waku[].dynamicBootstrapNodes, "dynamic bootstrap" + ) + except CatchableError: + error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() + return + +proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = + debug "Retrieve dynamic bootstrap nodes" + + let dynamicBootstrapNodesRes = await waku_dnsdisc.retrieveDynamicBootstrapNodes( + waku.conf.dnsDiscovery, waku.conf.dnsDiscoveryUrl, waku.conf.dnsDiscoveryNameServers + ) + + if dynamicBootstrapNodesRes.isErr(): + error "Retrieving dynamic bootstrap nodes failed", + error = dynamicBootstrapNodesRes.error + # Start Dns Discovery retry loop + waku[].dnsRetryLoopHandle = waku.startDnsDiscoveryRetryLoop() + else: + waku[].dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() + if not waku[].conf.discv5Only: (await startNode(waku.node, waku.conf, waku.dynamicBootstrapNodes)).isOkOr: return err("error while calling startNode: " & $error) @@ -400,3 +439,6 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} = if not waku.node.isNil(): await waku.node.stop() + + if not waku.dnsRetryLoopHandle.isNil(): + await waku.dnsRetryLoopHandle.cancelAndWait()