From c9065e020e25a51b911f58d2a6d68632c822c3ed Mon Sep 17 00:00:00 2001 From: kaiserd Date: Thu, 17 Mar 2022 17:05:11 +0000 Subject: [PATCH] deploy: 2053a7a51d915bc69453c485a9e1bdb0f3d02f0a --- tests/v2/test_waku_discv5.nim | 2 +- waku/v2/node/discv5/waku_discv5.nim | 41 +++++- waku/v2/node/wakunode2.nim | 125 +++++++++++------- .../protocol/waku_store/waku_store_types.nim | 2 + waku/v2/utils/peers.nim | 8 +- 5 files changed, 121 insertions(+), 57 deletions(-) diff --git a/tests/v2/test_waku_discv5.nim b/tests/v2/test_waku_discv5.nim index 39aef91d8..0201c831f 100644 --- a/tests/v2/test_waku_discv5.nim +++ b/tests/v2/test_waku_discv5.nim @@ -52,7 +52,7 @@ procSuite "Waku Discovery v5": some(extIp), some(nodeTcpPort1), some(nodeUdpPort1), bindIp, nodeUdpPort1, - @[], + newSeq[string](), false, keys.PrivateKey(nodeKey1.skkey), flags, diff --git a/waku/v2/node/discv5/waku_discv5.nim b/waku/v2/node/discv5/waku_discv5.nim index 65bdfdc85..7e549f293 100644 --- a/waku/v2/node/discv5/waku_discv5.nim +++ b/waku/v2/node/discv5/waku_discv5.nim @@ -45,7 +45,7 @@ proc parseBootstrapAddress(address: TaintedString): else: return err "Ignoring unrecognized bootstrap address type" -proc addBootstrapNode(bootstrapAddr: string, +proc addBootstrapNode*(bootstrapAddr: string, bootstrapEnrs: var seq[enr.Record]) = # Ignore empty lines or lines starting with # if bootstrapAddr.len == 0 or bootstrapAddr[0] == '#': @@ -103,18 +103,13 @@ proc new*(T: type WakuDiscoveryV5, extTcpPort, extUdpPort: Option[Port], bindIP: ValidIpAddress, discv5UdpPort: Port, - bootstrapNodes: seq[string], + bootstrapEnrs: seq[enr.Record], enrAutoUpdate = false, privateKey: keys.PrivateKey, flags: WakuEnrBitfield, enrFields: openArray[(string, seq[byte])], rng: ref BrHmacDrbgContext, discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T = - - var bootstrapEnrs: seq[enr.Record] - for node in bootstrapNodes: - addBootstrapNode(node, bootstrapEnrs) - ## TODO: consider loading from a configurable bootstrap file ## We always add the waku field as specified @@ -134,6 +129,38 @@ proc new*(T: type WakuDiscoveryV5, return WakuDiscoveryV5(protocol: protocol, listening: false) +# constructor that takes bootstrap Enrs in Enr Uri form +proc new*(T: type WakuDiscoveryV5, + extIp: Option[ValidIpAddress], + extTcpPort, extUdpPort: Option[Port], + bindIP: ValidIpAddress, + discv5UdpPort: Port, + bootstrapNodes: seq[string], + enrAutoUpdate = false, + privateKey: keys.PrivateKey, + flags: WakuEnrBitfield, + enrFields: openArray[(string, seq[byte])], + rng: ref BrHmacDrbgContext, + discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T = + + var bootstrapEnrs: seq[enr.Record] + for node in bootstrapNodes: + addBootstrapNode(node, bootstrapEnrs) + + return WakuDiscoveryV5.new( + extIP, extTcpPort, extUdpPort, + bindIP, + discv5UdpPort, + bootstrapEnrs, + enrAutoUpdate, + privateKey, + flags, + enrFields, + rng, + discv5Config + ) + + proc open*(wakuDiscv5: WakuDiscoveryV5) {.raises: [Defect, CatchableError].} = debug "Opening Waku discovery v5 ports" diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 6dd3f0dec..9140ae8a3 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -963,7 +963,7 @@ when isMainModule: # Setup functions # ################### - # 1/6 Setup storage + # 1/7 Setup storage proc setupStorage(conf: WakuNodeConf): SetupResult[tuple[pStorage: WakuPeerStorage, mStorage: WakuMessageStore]] = @@ -1013,9 +1013,35 @@ when isMainModule: ok(storeTuple) - # 2/6 Initialize node + # 2/7 Retrieve dynamic bootstrap nodes + proc retrieveDynamicBootstrapNodes(conf: WakuNodeConf): SetupResult[seq[RemotePeerInfo]] = + # DNS discovery + if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "": + debug "Discovering nodes using Waku DNS discovery", url=conf.dnsDiscoveryUrl + + var nameServers: seq[TransportAddress] + for ip in conf.dnsDiscoveryNameServers: + nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 + + let dnsResolver = DnsResolver.new(nameServers) + + proc resolver(domain: string): Future[string] {.async, gcsafe.} = + trace "resolving", domain=domain + let resolved = await dnsResolver.resolveTxt(domain) + return resolved[0] # Use only first answer + + var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl, + resolver) + if wakuDnsDiscovery.isOk: + return wakuDnsDiscovery.get().findPeers() + .mapErr(proc (e: cstring): string = $e) + else: + warn "Failed to init Waku DNS discovery" + + # 3/7 Initialize node proc initNode(conf: WakuNodeConf, - pStorage: WakuPeerStorage = nil): SetupResult[WakuNode] = + pStorage: WakuPeerStorage = nil, + dynamicBootstrapNodes: openArray[RemotePeerInfo] = @[]): SetupResult[WakuNode] = ## Setup a basic Waku v2 node based on a supplied configuration ## file. Optionally include persistent peer storage. @@ -1075,11 +1101,26 @@ when isMainModule: discoveryConfig = DiscoveryConfig.init( conf.discv5TableIpLimit, conf.discv5BucketIpLimit, conf.discv5BitsPerHop) + # select dynamic bootstrap nodes that have an ENR containing a udp port. + # Discv5 only supports UDP https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md) + var discv5BootstrapEnrs: seq[enr.Record] + for n in dynamicBootstrapNodes: + if n.enr.isSome(): + let + enr = n.enr.get() + tenrRes = enr.toTypedRecord() + if tenrRes.isOk() and (tenrRes.get().udp.isSome() or tenrRes.get().udp6.isSome()): + discv5BootstrapEnrs.add(enr) + + # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq + for enrUri in conf.discv5BootstrapNodes: + addBootstrapNode(enrUri, discv5BootstrapEnrs) + node.wakuDiscv5 = WakuDiscoveryV5.new( extIP, extPort, some(discv5UdpPort), conf.listenAddress, discv5UdpPort, - conf.discv5BootstrapNodes, + discv5BootstrapEnrs, conf.discv5EnrAutoUpdate, keys.PrivateKey(conf.nodekey.skkey), wakuFlags, @@ -1090,7 +1131,7 @@ when isMainModule: ok(node) - # 3/6 Mount and initialize configured protocols + # 4/7 Mount and initialize configured protocols proc setupProtocols(node: var WakuNode, conf: WakuNodeConf, mStorage: WakuMessageStore = nil): SetupResult[bool] = @@ -1161,8 +1202,9 @@ when isMainModule: ok(true) # Success - # 4/6 Start node and mounted protocols - proc startNode(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] = + # 5/7 Start node and mounted protocols + proc startNode(node: WakuNode, conf: WakuNodeConf, + dynamicBootstrapNodes: seq[RemotePeerInfo] = @[]): SetupResult[bool] = ## Start a configured node and all mounted protocols. ## Resume history, connect to static nodes and start ## keep-alive, if configured. @@ -1170,7 +1212,7 @@ when isMainModule: # Start Waku v2 node waitFor node.start() - # start discv5 and connect to discovered nodes + # Start discv5 and connect to discovered nodes if conf.discv5Discovery: if not waitFor node.startDiscv5(): error "could not start Discovery v5" @@ -1183,38 +1225,16 @@ when isMainModule: if conf.staticnodes.len > 0: waitFor connectToNodes(node, conf.staticnodes, "static") - # Connect to discovered nodes - if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "": - debug "Discovering nodes using Waku DNS discovery", url=conf.dnsDiscoveryUrl - - var nameServers: seq[TransportAddress] - for ip in conf.dnsDiscoveryNameServers: - nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 - - let dnsResolver = DnsResolver.new(nameServers) - - proc resolver(domain: string): Future[string] {.async, gcsafe.} = - trace "resolving", domain=domain - let resolved = await dnsResolver.resolveTxt(domain) - return resolved[0] # Use only first answer - - var wakuDnsDiscovery = WakuDnsDiscovery.init(conf.dnsDiscoveryUrl, - resolver) - if wakuDnsDiscovery.isOk: - let discoveredPeers = wakuDnsDiscovery.get().findPeers() - if discoveredPeers.isOk: - info "Connecting to discovered peers" - waitFor connectToNodes(node, discoveredPeers.get(), "dnsdisc") - else: - warn "Failed to init Waku DNS discovery" - + info "Connecting to dynamic bootstrap peers" + waitFor connectToNodes(node, dynamicBootstrapNodes, "dynamic bootstrap") + # Start keepalive, if enabled if conf.keepAlive: node.startKeepalive() ok(true) # Success - # 5/6 Start monitoring tools and external interfaces + # 6/7 Start monitoring tools and external interfaces proc startExternal(node: WakuNode, conf: WakuNodeConf): SetupResult[bool] = ## Start configured external interfaces and monitoring tools ## on a Waku v2 node, including the RPC API and metrics @@ -1242,7 +1262,7 @@ when isMainModule: # Node setup # ############## - debug "1/6 Setting up storage" + debug "1/7 Setting up storage" var pStorage: WakuPeerStorage @@ -1251,44 +1271,53 @@ when isMainModule: let setupStorageRes = setupStorage(conf) if setupStorageRes.isErr: - error "1/6 Setting up storage failed. Continuing without storage." + error "1/7 Setting up storage failed. Continuing without storage." else: (pStorage, mStorage) = setupStorageRes.get() - debug "2/6 Initializing node" + debug "2/7 Retrieve dynamic bootstrap nodes" + + var dynamicBootstrapNodes: seq[RemotePeerInfo] + let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(conf) + if dynamicBootstrapNodesRes.isErr: + error "2/7 Retrieving dynamic bootstrap nodes failed. Continuing without dynamic bootstrap nodes." + else: + dynamicBootstrapNodes = dynamicBootstrapNodesRes.get() - let initNodeRes = initNode(conf, pStorage) + debug "3/7 Initializing node" + + let initNodeRes = initNode(conf, pStorage, dynamicBootstrapNodes) if initNodeRes.isErr: - error "2/6 Initializing node failed. Quitting." + error "3/7 Initializing node failed. Quitting." quit(QuitFailure) else: node = initNodeRes.get() - debug "3/6 Mounting protocols" + debug "4/7 Mounting protocols" let setupProtocolsRes = setupProtocols(node, conf, mStorage) if setupProtocolsRes.isErr: - error "3/6 Mounting protocols failed. Continuing in current state." + error "4/7 Mounting protocols failed. Continuing in current state." - debug "4/6 Starting node and mounted protocols" + debug "5/7 Starting node and mounted protocols" - let startNodeRes = startNode(node, conf) + let startNodeRes = startNode(node, conf, dynamicBootstrapNodes) if startNodeRes.isErr: - error "4/6 Starting node and mounted protocols failed. Continuing in current state." + error "5/7 Starting node and mounted protocols failed. Continuing in current state." - debug "5/6 Starting monitoring and external interfaces" + debug "6/7 Starting monitoring and external interfaces" let startExternalRes = startExternal(node, conf) if startExternalRes.isErr: - error "5/6 Starting monitoring and external interfaces failed. Continuing in current state." + error "6/7 Starting monitoring and external interfaces failed. Continuing in current state." - debug "6/6 Setting up shutdown hooks" + debug "7/7 Setting up shutdown hooks" - # 6/6 Setup graceful shutdown hooks + # 7/7 Setup graceful shutdown hooks ## Setup shutdown hooks for this process. ## Stop node gracefully on shutdown. diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index d07f52c55..d051d95fa 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -200,6 +200,7 @@ proc fwdPage(storeQueue: StoreQueueRef, outSeq = @[] outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.FORWARD) outError = HistoryResponseError.INVALID_CURSOR + w.destroy return (outSeq, outPagingInfo, outError) # Advance walker once more @@ -271,6 +272,7 @@ proc bwdPage(storeQueue: StoreQueueRef, outSeq = @[] outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.BACKWARD) outError = HistoryResponseError.INVALID_CURSOR + w.destroy return (outSeq, outPagingInfo, outError) # Step walker one more step back diff --git a/waku/v2/utils/peers.nim b/waku/v2/utils/peers.nim index b49554497..855cd30c5 100644 --- a/waku/v2/utils/peers.nim +++ b/waku/v2/utils/peers.nim @@ -17,6 +17,7 @@ type RemotePeerInfo* = ref object of RootObj peerId*: PeerID addrs*: seq[MultiAddress] + enr*: Option[enr.Record] protocols*: seq[string] func `$`*(remotePeerInfo: RemotePeerInfo): string = @@ -26,11 +27,13 @@ proc init*( p: typedesc[RemotePeerInfo], peerId: PeerID, addrs: seq[MultiAddress] = @[], + enr: Option[enr.Record] = none(enr.Record), protocols: seq[string] = @[]): RemotePeerInfo = let remotePeerInfo = RemotePeerInfo( peerId: peerId, addrs: addrs, + enr: enr, protocols: protocols) return remotePeerInfo @@ -38,12 +41,14 @@ proc init*( proc init*(p: typedesc[RemotePeerInfo], peerId: string, addrs: seq[MultiAddress] = @[], + enr: Option[enr.Record] = none(enr.Record), protocols: seq[string] = @[]): RemotePeerInfo {.raises: [Defect, ResultError[cstring], LPError].} = let remotePeerInfo = RemotePeerInfo( peerId: PeerID.init(peerId).tryGet(), addrs: addrs, + enr: enr, protocols: protocols) return remotePeerInfo @@ -145,11 +150,12 @@ proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] = if addrs.len == 0: return err("enr: no addresses in record") - return ok(RemotePeerInfo.init(peerId, addrs)) + return ok(RemotePeerInfo.init(peerId, addrs, some(enr))) ## Converts the local peerInfo to dialable RemotePeerInfo ## Useful for testing or internal connections proc toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo = RemotePeerInfo.init(peerInfo.peerId, peerInfo.addrs, + none(enr.Record), # we could generate an ENR from PeerInfo peerInfo.protocols)