mirror of https://github.com/waku-org/nwaku.git
Waku v2 Node Discovery via DNS: Integration (#690)
This commit is contained in:
parent
1ebcb266cb
commit
36b9176569
|
@ -130,3 +130,8 @@
|
|||
url = https://github.com/status-im/nim-zlib.git
|
||||
ignore = untracked
|
||||
branch = master
|
||||
[submodule "vendor/nim-dnsdisc"]
|
||||
path = vendor/nim-dnsdisc
|
||||
url = https://github.com/status-im/nim-dnsdisc.git
|
||||
ignore = untracked
|
||||
branch = main
|
||||
|
|
|
@ -6,6 +6,8 @@ This release contains the following:
|
|||
|
||||
### Features
|
||||
|
||||
- Start of Waku node discovery via DNS following [EIP-1459](https://eips.ethereum.org/EIPS/eip-1459)
|
||||
|
||||
### Changes
|
||||
|
||||
- GossipSub [prune backoff period](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md#prune-backoff-and-peer-exchange) is now the recommended 1 minute
|
||||
|
|
|
@ -16,7 +16,8 @@ import
|
|||
./v2/test_peer_storage,
|
||||
./v2/test_waku_keepalive,
|
||||
./v2/test_migration_utils,
|
||||
./v2/test_namespacing_utils
|
||||
./v2/test_namespacing_utils,
|
||||
./v2/test_waku_dnsdisc
|
||||
|
||||
when defined(rln):
|
||||
import ./v2/test_waku_rln_relay
|
||||
|
|
|
@ -0,0 +1,97 @@
|
|||
{.used.}
|
||||
|
||||
import
|
||||
std/[sequtils, tables],
|
||||
chronicles,
|
||||
chronos,
|
||||
testutils/unittests,
|
||||
stew/shims/net,
|
||||
stew/[base32, results],
|
||||
libp2p/crypto/crypto,
|
||||
eth/keys,
|
||||
discovery/dnsdisc/builder,
|
||||
../../waku/v2/node/dnsdisc/waku_dnsdisc,
|
||||
../../waku/v2/node/wakunode2,
|
||||
../test_helpers
|
||||
|
||||
procSuite "Waku DNS Discovery":
|
||||
asyncTest "Waku DNS Discovery end-to-end":
|
||||
## Tests integrated DNS discovery, from building
|
||||
## the tree to connecting to discovered nodes
|
||||
|
||||
# Create nodes and ENR. These will be added to the discoverable list
|
||||
let
|
||||
bindIp = ValidIpAddress.init("0.0.0.0")
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, bindIp, Port(60000))
|
||||
enr1 = node1.enr
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, bindIp, Port(60002))
|
||||
enr2 = node2.enr
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.new(nodeKey3, bindIp, Port(60003))
|
||||
enr3 = node3.enr
|
||||
|
||||
node1.mountRelay()
|
||||
node2.mountRelay()
|
||||
node3.mountRelay()
|
||||
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||
|
||||
# Build and sign tree
|
||||
var tree = buildTree(1, # Seq no
|
||||
@[enr1, enr2, enr3], # ENR entries
|
||||
@[]).get() # No link entries
|
||||
|
||||
let treeKeys = keys.KeyPair.random(rng[])
|
||||
|
||||
# Sign tree
|
||||
check:
|
||||
tree.signTree(treeKeys.seckey()).isOk()
|
||||
|
||||
# Create TXT records at domain
|
||||
let
|
||||
domain = "testnodes.aq"
|
||||
zoneTxts = tree.buildTXT(domain).get()
|
||||
username = Base32.encode(treeKeys.pubkey().toRawCompressed())
|
||||
location = LinkPrefix & username & "@" & domain # See EIP-1459: https://eips.ethereum.org/EIPS/eip-1459
|
||||
|
||||
# Create a resolver for the domain
|
||||
|
||||
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
|
||||
return zoneTxts[domain]
|
||||
|
||||
# Create Waku DNS discovery client on a new Waku v2 node using the resolver
|
||||
|
||||
let
|
||||
nodeKey4 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node4 = WakuNode.new(nodeKey4, bindIp, Port(60004))
|
||||
enr4 = node4.enr
|
||||
|
||||
node4.mountRelay()
|
||||
await node4.start()
|
||||
|
||||
var wakuDnsDisc = WakuDnsDiscovery.init(enr4, location, resolver).get()
|
||||
|
||||
let res = wakuDnsDisc.findPeers()
|
||||
|
||||
check:
|
||||
# We have discovered all three nodes
|
||||
res.isOk()
|
||||
res[].len == 3
|
||||
res[].mapIt(it.peerId).contains(node1.peerInfo.peerId)
|
||||
res[].mapIt(it.peerId).contains(node2.peerInfo.peerId)
|
||||
res[].mapIt(it.peerId).contains(node3.peerInfo.peerId)
|
||||
|
||||
# Connect to discovered nodes
|
||||
await node4.connectToNodes(res[])
|
||||
|
||||
check:
|
||||
# We have successfully connected to all discovered nodes
|
||||
node4.peerManager.peers().anyIt(it.peerId == node1.peerInfo.peerId)
|
||||
node4.peerManager.connectedness(node1.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peers().anyIt(it.peerId == node2.peerInfo.peerId)
|
||||
node4.peerManager.connectedness(node2.peerInfo.peerId) == Connected
|
||||
node4.peerManager.peers().anyIt(it.peerId == node3.peerInfo.peerId)
|
||||
node4.peerManager.connectedness(node3.peerInfo.peerId) == Connected
|
||||
|
||||
await allFutures([node1.stop(), node2.stop(), node3.stop(), node4.stop()])
|
|
@ -0,0 +1 @@
|
|||
Subproject commit dcb9290d004476fb0a5389baa88121b072abf135
|
|
@ -181,6 +181,18 @@ type
|
|||
defaultValue: false
|
||||
name: "metrics-logging" }: bool
|
||||
|
||||
## DNS discovery config
|
||||
|
||||
dnsDiscovery* {.
|
||||
desc: "Enable discovering nodes via DNS"
|
||||
defaultValue: false
|
||||
name: "dns-discovery" }: bool
|
||||
|
||||
dnsDiscoveryUrl* {.
|
||||
desc: "URL for DNS node list in format 'enrtree://<key>@<fqdn>'",
|
||||
defaultValue: ""
|
||||
name: "dns-discovery-url" }: string
|
||||
|
||||
# NOTE: Keys are different in nim-libp2p
|
||||
proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T =
|
||||
try:
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
{.push raises: [Defect].}
|
||||
|
||||
## A set of utilities to integrate EIP-1459 DNS-based discovery
|
||||
## for Waku v2 nodes.
|
||||
##
|
||||
## EIP-1459 is defined in https://eips.ethereum.org/EIPS/eip-1459
|
||||
|
||||
import
|
||||
std/options,
|
||||
stew/shims/net,
|
||||
chronicles,
|
||||
chronos,
|
||||
metrics,
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/crypto/secp,
|
||||
libp2p/peerinfo,
|
||||
libp2p/multiaddress,
|
||||
discovery/dnsdisc/client
|
||||
|
||||
export client
|
||||
|
||||
declarePublicGauge waku_dnsdisc_discovered, "number of nodes discovered"
|
||||
declarePublicGauge waku_dnsdisc_errors, "number of waku dnsdisc errors", ["type"]
|
||||
|
||||
logScope:
|
||||
topics = "wakudnsdisc"
|
||||
|
||||
type
|
||||
WakuDnsDiscovery* = object
|
||||
enr*: enr.Record
|
||||
client*: Client
|
||||
resolver*: Resolver
|
||||
|
||||
##################
|
||||
# Util functions #
|
||||
##################
|
||||
|
||||
func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] =
|
||||
if typedR.tcp6.isSome or typedR.tcp.isSome:
|
||||
return some(IpTransportProtocol.tcpProtocol)
|
||||
|
||||
if typedR.udp6.isSome or typedR.udp.isSome:
|
||||
return some(IpTransportProtocol.udpProtocol)
|
||||
|
||||
return none(IpTransportProtocol)
|
||||
|
||||
func toPeerInfo*(enr: enr.Record): Result[PeerInfo, cstring] =
|
||||
let typedR = ? enr.toTypedRecord
|
||||
|
||||
if not typedR.secp256k1.isSome:
|
||||
return err("enr: no secp256k1 key in record")
|
||||
|
||||
let
|
||||
pubKey = ? keys.PublicKey.fromRaw(typedR.secp256k1.get)
|
||||
peerId = ? PeerID.init(crypto.PublicKey(scheme: Secp256k1,
|
||||
skkey: secp.SkPublicKey(pubKey)))
|
||||
|
||||
var addrs = newSeq[MultiAddress]()
|
||||
|
||||
let transportProto = getTransportProtocol(typedR)
|
||||
if transportProto.isNone:
|
||||
return err("enr: could not determine transport protocol")
|
||||
|
||||
case transportProto.get()
|
||||
of tcpProtocol:
|
||||
if typedR.ip.isSome and typedR.tcp.isSome:
|
||||
let ip = ipv4(typedR.ip.get)
|
||||
addrs.add MultiAddress.init(ip, tcpProtocol, Port typedR.tcp.get)
|
||||
|
||||
if typedR.ip6.isSome:
|
||||
let ip = ipv6(typedR.ip6.get)
|
||||
if typedR.tcp6.isSome:
|
||||
addrs.add MultiAddress.init(ip, tcpProtocol, Port typedR.tcp6.get)
|
||||
elif typedR.tcp.isSome:
|
||||
addrs.add MultiAddress.init(ip, tcpProtocol, Port typedR.tcp.get)
|
||||
else:
|
||||
discard
|
||||
|
||||
of udpProtocol:
|
||||
if typedR.ip.isSome and typedR.udp.isSome:
|
||||
let ip = ipv4(typedR.ip.get)
|
||||
addrs.add MultiAddress.init(ip, udpProtocol, Port typedR.udp.get)
|
||||
|
||||
if typedR.ip6.isSome:
|
||||
let ip = ipv6(typedR.ip6.get)
|
||||
if typedR.udp6.isSome:
|
||||
addrs.add MultiAddress.init(ip, udpProtocol, Port typedR.udp6.get)
|
||||
elif typedR.udp.isSome:
|
||||
addrs.add MultiAddress.init(ip, udpProtocol, Port typedR.udp.get)
|
||||
else:
|
||||
discard
|
||||
|
||||
if addrs.len == 0:
|
||||
return err("enr: no addresses in record")
|
||||
|
||||
return ok(PeerInfo.init(peerId, addrs))
|
||||
|
||||
func createEnr*(privateKey: crypto.PrivateKey,
|
||||
enrIp: Option[ValidIpAddress],
|
||||
enrTcpPort, enrUdpPort: Option[Port]): enr.Record =
|
||||
|
||||
assert privateKey.scheme == PKScheme.Secp256k1
|
||||
|
||||
let
|
||||
rawPk = privateKey.getRawBytes().expect("Private key is valid")
|
||||
pk = keys.PrivateKey.fromRaw(rawPk).expect("Raw private key is of valid length")
|
||||
enr = enr.Record.init(1, pk, enrIp, enrTcpPort, enrUdpPort).expect("Record within size limits")
|
||||
|
||||
return enr
|
||||
|
||||
#####################
|
||||
# DNS Discovery API #
|
||||
#####################
|
||||
|
||||
proc emptyResolver*(domain: string): Future[string] {.async, gcsafe.} =
|
||||
debug "Empty resolver called", domain=domain
|
||||
return ""
|
||||
|
||||
proc findPeers*(wdd: var WakuDnsDiscovery): Result[seq[PeerInfo], cstring] =
|
||||
## Find peers to connect to using DNS based discovery
|
||||
|
||||
info "Finding peers using Waku DNS discovery"
|
||||
|
||||
# Synchronise client tree using configured resolver
|
||||
var tree: Tree
|
||||
try:
|
||||
tree = wdd.client.getTree(wdd.resolver) # @TODO: this is currently a blocking operation to not violate memory safety
|
||||
except Exception:
|
||||
error "Failed to synchronise client tree"
|
||||
waku_dnsdisc_errors.inc(labelValues = ["tree_sync_failure"])
|
||||
return err("Node discovery failed")
|
||||
|
||||
let discoveredEnr = wdd.client.getNodeRecords()
|
||||
|
||||
if discoveredEnr.len > 0:
|
||||
info "Successfully discovered ENR", count=discoveredEnr.len
|
||||
else:
|
||||
trace "No ENR retrieved from client tree"
|
||||
|
||||
var discoveredNodes: seq[PeerInfo]
|
||||
|
||||
for enr in discoveredEnr:
|
||||
# Convert discovered ENR to PeerInfo and add to discovered nodes
|
||||
let res = enr.toPeerInfo()
|
||||
|
||||
if res.isOk():
|
||||
discoveredNodes.add(res.get())
|
||||
else:
|
||||
error "Failed to convert ENR to peer info", enr=enr, err=res.error()
|
||||
waku_dnsdisc_errors.inc(labelValues = ["peer_info_failure"])
|
||||
|
||||
if discoveredNodes.len > 0:
|
||||
info "Successfully discovered nodes", count=discoveredNodes.len
|
||||
waku_dnsdisc_discovered.inc(discoveredNodes.len.int64)
|
||||
|
||||
return ok(discoveredNodes)
|
||||
|
||||
proc init*(T: type WakuDnsDiscovery,
|
||||
enr: enr.Record,
|
||||
locationUrl: string,
|
||||
resolver: Resolver): Result[T, cstring] =
|
||||
## Initialise Waku peer discovery via DNS
|
||||
|
||||
debug "init WakuDnsDiscovery", enr=enr, locationUrl=locationUrl
|
||||
|
||||
let
|
||||
client = ? Client.init(locationUrl)
|
||||
wakuDnsDisc = WakuDnsDiscovery(enr: enr, client: client, resolver: resolver)
|
||||
|
||||
debug "init success"
|
||||
|
||||
return ok(wakuDnsDisc)
|
|
@ -5,6 +5,7 @@ import
|
|||
chronos, chronicles, metrics,
|
||||
stew/shims/net as stewNet,
|
||||
eth/keys,
|
||||
eth/p2p/discoveryv5/enr,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/ping,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
|
@ -18,7 +19,8 @@ import
|
|||
../utils/peers,
|
||||
../utils/requests,
|
||||
./storage/migration/migration_types,
|
||||
./peer_manager/peer_manager
|
||||
./peer_manager/peer_manager,
|
||||
./dnsdisc/waku_dnsdisc
|
||||
|
||||
export
|
||||
builders,
|
||||
|
@ -74,6 +76,7 @@ type
|
|||
wakuRlnRelay*: WakuRLNRelay
|
||||
wakuLightPush*: WakuLightPush
|
||||
peerInfo*: PeerInfo
|
||||
enr*: enr.Record
|
||||
libp2pPing*: Ping
|
||||
libp2pTransportLoops*: seq[Future[void]]
|
||||
filters*: Filters
|
||||
|
@ -135,6 +138,12 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
|||
announcedAddresses = if extIp.isNone() or extPort.isNone(): @[]
|
||||
else: @[tcpEndPoint(extIp.get(), extPort.get())]
|
||||
peerInfo = PeerInfo.init(nodekey)
|
||||
enrIp = if extIp.isSome(): extIp
|
||||
else: some(bindIp)
|
||||
enrTcpPort = if extPort.isSome(): extPort
|
||||
else: some(bindPort)
|
||||
enr = createEnr(nodeKey, enrIp, enrTcpPort, none(Port))
|
||||
|
||||
info "Initializing networking", hostAddress,
|
||||
announcedAddresses
|
||||
# XXX: Add this when we create node or start it?
|
||||
|
@ -156,6 +165,7 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
|||
switch: switch,
|
||||
rng: rng,
|
||||
peerInfo: peerInfo,
|
||||
enr: enr,
|
||||
filters: initTable[string, Filter]()
|
||||
)
|
||||
|
||||
|
@ -659,6 +669,7 @@ proc start*(node: WakuNode) {.async.} =
|
|||
let listenStr = $peerInfo.addrs[^1] & "/p2p/" & $peerInfo.peerId
|
||||
## XXX: this should be /ip4..., / stripped?
|
||||
info "Listening on", full = listenStr
|
||||
info "Discoverable ENR ", enr = node.enr.toURI()
|
||||
|
||||
if not node.wakuRelay.isNil:
|
||||
await node.startRelay()
|
||||
|
@ -839,6 +850,22 @@ when isMainModule:
|
|||
if conf.staticnodes.len > 0:
|
||||
waitFor connectToNodes(node, conf.staticnodes)
|
||||
|
||||
# Connect to discovered nodes
|
||||
if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "":
|
||||
# @ TODO: this is merely POC integration with an empty resolver
|
||||
debug "Waku DNS Discovery enabled. Using empty resolver."
|
||||
|
||||
var wakuDnsDiscovery = WakuDnsDiscovery.init(node.enr,
|
||||
conf.dnsDiscoveryUrl,
|
||||
emptyResolver) # TODO: Add DNS resolver
|
||||
if wakuDnsDiscovery.isOk:
|
||||
let discoveredPeers = wakuDnsDiscovery.get().findPeers()
|
||||
if discoveredPeers.isOk:
|
||||
info "Connecting to discovered peers"
|
||||
waitFor connectToNodes(node, discoveredPeers.get())
|
||||
else:
|
||||
warn "Failed to init Waku DNS discovery"
|
||||
|
||||
# Start keepalive, if enabled
|
||||
if conf.keepAlive:
|
||||
node.startKeepalive()
|
||||
|
|
Loading…
Reference in New Issue