Discv5 POC integration (#748)

This commit is contained in:
Hanno Cornelius 2021-11-01 19:02:39 +01:00 committed by GitHub
parent dbbc0f750b
commit f85434e072
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 442 additions and 75 deletions

View File

@ -7,6 +7,7 @@ This release contains the following:
### Features
- Waku v2 node discovery via DNS following [EIP-1459](https://eips.ethereum.org/EIPS/eip-1459)
- Waku v2 node discovery via [Node Discovery v5](https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md)
### Changes

View File

@ -17,7 +17,8 @@ import
./v2/test_waku_keepalive,
./v2/test_migration_utils,
./v2/test_namespacing_utils,
./v2/test_waku_dnsdisc
./v2/test_waku_dnsdisc,
./v2/test_waku_discv5
when defined(rln):
import ./v2/test_waku_rln_relay

View File

@ -0,0 +1,113 @@
{.used.}
import
std/[tables, sequtils],
chronicles,
chronos,
testutils/unittests,
stew/byteutils,
stew/shims/net,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr,
../../waku/v2/node/discv5/waku_discv5,
../../waku/v2/node/wakunode2,
../test_helpers
procSuite "Waku Discovery v5":
asyncTest "Waku Discovery v5 end-to-end":
## Tests integrated discovery v5
let
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
nodeTcpPort1 = Port(60000)
nodeUdpPort1 = Port(9000)
node1 = WakuNode.new(nodeKey1, bindIp, nodeTcpPort1)
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
nodeTcpPort2 = Port(60002)
nodeUdpPort2 = Port(9002)
node2 = WakuNode.new(nodeKey2, bindIp, nodeTcpPort2)
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
nodeTcpPort3 = Port(60004)
nodeUdpPort3 = Port(9004)
node3 = WakuNode.new(nodeKey3, bindIp, nodeTcpPort3)
# E2E relay test paramaters
pubSubTopic = "/waku/2/default-waku/proto"
contentTopic = ContentTopic("/waku/2/default-content/proto")
payload = "Can you see me?".toBytes()
message = WakuMessage(payload: payload, contentTopic: contentTopic)
# Mount discv5
node1.wakuDiscv5 = WakuDiscoveryV5.new(
some(extIp), some(nodeTcpPort1), some(nodeUdpPort1),
bindIp,
nodeUdpPort1,
@[],
false,
keys.PrivateKey(nodeKey1.skkey),
[], # Empty enr fields, for now
node1.rng
)
node2.wakuDiscv5 = WakuDiscoveryV5.new(
some(extIp), some(nodeTcpPort2), some(nodeUdpPort2),
bindIp,
nodeUdpPort2,
@[node1.wakuDiscv5.protocol.localNode.record.toURI()], # Bootstrap with node1
false,
keys.PrivateKey(nodeKey2.skkey),
[], # Empty enr fields, for now
node2.rng
)
node3.wakuDiscv5 = WakuDiscoveryV5.new(
some(extIp), some(nodeTcpPort3), some(nodeUdpPort3),
bindIp,
nodeUdpPort3,
@[node2.wakuDiscv5.protocol.localNode.record.toURI()], # Bootstrap with node2
false,
keys.PrivateKey(nodeKey3.skkey),
[], # Empty enr fields, for now
node3.rng
)
node1.mountRelay()
node2.mountRelay()
node3.mountRelay()
await allFutures([node1.start(), node2.start(), node3.start()])
await allFutures([node1.startDiscv5(), node2.startDiscv5(), node3.startDiscv5()])
await sleepAsync(3000.millis) # Give the algorithm some time to work its magic
check:
node1.wakuDiscv5.protocol.nodesDiscovered > 0
node2.wakuDiscv5.protocol.nodesDiscovered > 0
node3.wakuDiscv5.protocol.nodesDiscovered > 0
# Let's see if we can deliver a message end-to-end
var completionFut = newFuture[bool]()
proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
let msg = WakuMessage.init(data)
if msg.isOk():
let val = msg.value()
check:
topic == pubSubTopic
val.contentTopic == contentTopic
val.payload == payload
completionFut.complete(true)
node3.subscribe(pubSubTopic, relayHandler)
await sleepAsync(2000.millis)
await node1.publish(pubSubTopic, message)
check:
(await completionFut.withTimeout(6.seconds)) == true
await allFutures([node1.stop(), node2.stop(), node3.stop()])

View File

@ -24,7 +24,7 @@ type
listenAddress* {.
defaultValue: defaultListenAddress(config)
desc: "Listening address for the LibP2P traffic."
desc: "Listening address for LibP2P (and Discovery v5, if enabled) traffic."
name: "listen-address"}: ValidIpAddress
tcpPort* {.
@ -85,6 +85,7 @@ type
desc: "the pubsub topic for which rln-relay gets enabled",
defaultValue: "waku/2/rlnrelay/proto"
name: "rln-relay-pubsub-topic" }: string
staticnodes* {.
desc: "Peer multiaddr to directly connect with. Argument may be repeated."
name: "staticnode" }: seq[string]
@ -207,6 +208,29 @@ type
desc: "DNS name server IPs to query. Argument may be repeated."
defaultValue: @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")]
name: "dns-discovery-name-server" }: seq[ValidIpAddress]
## Discovery v5 config
discv5Discovery* {.
desc: "Enable discovering nodes via Node Discovery v5"
defaultValue: false
name: "discv5-discovery" }: bool
discv5UdpPort* {.
desc: "Listening UDP port for Node Discovery v5."
defaultValue: 9000
name: "discv5-udp-port" }: Port
discv5BootstrapNodes* {.
desc: "Text-encoded ENR for bootstrap node. Used when connecting to the network. Argument may be repeated."
name: "discv5-bootstrap-node" }: seq[string]
discv5EnrAutoUpdate* {.
desc: "Discovery can automatically update its ENR with the IP address " &
"and UDP port as seen by other nodes it communicates with. " &
"This option allows to enable/disable this functionality"
defaultValue: false
name: "discv5-enr-auto-update" .}: bool
# NOTE: Keys are different in nim-libp2p
proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T =

View File

@ -0,0 +1,131 @@
{.push raises: [Defect].}
import
std/[strutils, options],
chronos, chronicles, metrics,
eth/keys,
eth/p2p/discoveryv5/[enr, protocol],
stew/shims/net,
stew/results,
../config,
../../utils/peers
export protocol
declarePublicGauge waku_discv5_discovered, "number of nodes discovered"
declarePublicGauge waku_discv5_errors, "number of waku discv5 errors", ["type"]
logScope:
topics = "wakudiscv5"
type
WakuDiscoveryV5* = ref object
protocol*: protocol.Protocol
listening*: bool
proc parseBootstrapAddress(address: TaintedString):
Result[enr.Record, cstring] =
logScope:
address = string(address)
if address[0] == '/':
return err "MultiAddress bootstrap addresses are not supported"
else:
let lowerCaseAddress = toLowerAscii(string address)
if lowerCaseAddress.startsWith("enr:"):
var enrRec: enr.Record
if enrRec.fromURI(string address):
return ok enrRec
return err "Invalid ENR bootstrap record"
elif lowerCaseAddress.startsWith("enode:"):
return err "ENode bootstrap addresses are not supported"
else:
return err "Ignoring unrecognized bootstrap address type"
proc addBootstrapNode(bootstrapAddr: string,
bootstrapEnrs: var seq[enr.Record]) =
# Ignore empty lines or lines starting with #
if bootstrapAddr.len == 0 or bootstrapAddr[0] == '#':
return
let enrRes = parseBootstrapAddress(bootstrapAddr)
if enrRes.isOk:
bootstrapEnrs.add enrRes.value
else:
warn "Ignoring invalid bootstrap address",
bootstrapAddr, reason = enrRes.error
####################
# Discovery v5 API #
####################
proc findRandomPeers*(wakuDiscv5: WakuDiscoveryV5): Future[Result[seq[RemotePeerInfo], cstring]] {.async.} =
## Find random peers to connect to using Discovery v5
## Query for a random target and collect all discovered nodes
## @TODO: we could filter nodes here
let discoveredNodes = await wakuDiscv5.protocol.queryRandom()
var discoveredPeers: seq[RemotePeerInfo]
for node in discoveredNodes:
# Convert discovered ENR to RemotePeerInfo and add to discovered nodes
let res = node.record.toRemotePeerInfo()
if res.isOk():
discoveredPeers.add(res.get())
else:
error "Failed to convert ENR to peer info", enr=node.record, err=res.error()
waku_discv5_errors.inc(labelValues = ["peer_info_failure"])
if discoveredPeers.len > 0:
info "Successfully discovered nodes", count=discoveredPeers.len
waku_discv5_discovered.inc(discoveredPeers.len.int64)
return ok(discoveredPeers)
proc new*(T: type WakuDiscoveryV5,
extIp: Option[ValidIpAddress],
extTcpPort, extUdpPort: Option[Port],
bindIP: ValidIpAddress,
discv5UdpPort: Port,
bootstrapNodes: seq[string],
enrAutoUpdate = false,
privateKey: PrivateKey,
enrFields: openArray[(string, seq[byte])],
rng: ref BrHmacDrbgContext): T =
var bootstrapEnrs: seq[enr.Record]
for node in bootstrapNodes:
addBootstrapNode(node, bootstrapEnrs)
## TODO: consider loading from a configurable bootstrap file
let protocol = newProtocol(
privateKey,
enrIp = extIp, enrTcpPort = extTcpPort, enrUdpPort = extUdpPort, # We use the external IP & ports for ENR
enrFields,
bootstrapEnrs,
bindPort = discv5UdpPort,
bindIp = bindIP,
enrAutoUpdate = enrAutoUpdate,
rng = rng)
return WakuDiscoveryV5(protocol: protocol, listening: false)
proc open*(wakuDiscv5: WakuDiscoveryV5) {.raises: [Defect, CatchableError].} =
debug "Opening Waku discovery v5 ports"
wakuDiscv5.protocol.open()
wakuDiscv5.listening = true
proc start*(wakuDiscv5: WakuDiscoveryV5) =
debug "Starting Waku discovery v5 service"
wakuDiscv5.protocol.start()
proc closeWait*(wakuDiscv5: WakuDiscoveryV5) {.async.} =
debug "Closing Waku discovery v5 node"
wakuDiscv5.listening = false
await wakuDiscv5.protocol.closeWait()

View File

@ -37,66 +37,6 @@ type
# Util functions #
##################
func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] =
if typedR.tcp6.isSome or typedR.tcp.isSome:
return some(IpTransportProtocol.tcpProtocol)
if typedR.udp6.isSome or typedR.udp.isSome:
return some(IpTransportProtocol.udpProtocol)
return none(IpTransportProtocol)
func toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
let typedR = ? enr.toTypedRecord
if not typedR.secp256k1.isSome:
return err("enr: no secp256k1 key in record")
let
pubKey = ? keys.PublicKey.fromRaw(typedR.secp256k1.get)
peerId = ? PeerID.init(crypto.PublicKey(scheme: Secp256k1,
skkey: secp.SkPublicKey(pubKey)))
var addrs = newSeq[MultiAddress]()
let transportProto = getTransportProtocol(typedR)
if transportProto.isNone:
return err("enr: could not determine transport protocol")
case transportProto.get()
of tcpProtocol:
if typedR.ip.isSome and typedR.tcp.isSome:
let ip = ipv4(typedR.ip.get)
addrs.add MultiAddress.init(ip, tcpProtocol, Port typedR.tcp.get)
if typedR.ip6.isSome:
let ip = ipv6(typedR.ip6.get)
if typedR.tcp6.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port typedR.tcp6.get)
elif typedR.tcp.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port typedR.tcp.get)
else:
discard
of udpProtocol:
if typedR.ip.isSome and typedR.udp.isSome:
let ip = ipv4(typedR.ip.get)
addrs.add MultiAddress.init(ip, udpProtocol, Port typedR.udp.get)
if typedR.ip6.isSome:
let ip = ipv6(typedR.ip6.get)
if typedR.udp6.isSome:
addrs.add MultiAddress.init(ip, udpProtocol, Port typedR.udp6.get)
elif typedR.udp.isSome:
addrs.add MultiAddress.init(ip, udpProtocol, Port typedR.udp.get)
else:
discard
if addrs.len == 0:
return err("enr: no addresses in record")
return ok(RemotePeerInfo.init(peerId, addrs))
func createEnr*(privateKey: crypto.PrivateKey,
enrIp: Option[ValidIpAddress],
enrTcpPort, enrUdpPort: Option[Port]): enr.Record =

View File

@ -22,7 +22,8 @@ import
../utils/requests,
./storage/migration/migration_types,
./peer_manager/peer_manager,
./dnsdisc/waku_dnsdisc
./dnsdisc/waku_dnsdisc,
./discv5/waku_discv5
export
builders,
@ -83,6 +84,7 @@ type
libp2pTransportLoops*: seq[Future[void]]
filters*: Filters
rng*: ref BrHmacDrbgContext
wakuDiscv5*: WakuDiscoveryV5
started*: bool # Indicates that node has started listening
proc protocolMatcher(codec: string): Matcher =
@ -701,6 +703,78 @@ proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo]) {.async.} =
# later.
await sleepAsync(5.seconds)
proc runDiscv5Loop(node: WakuNode) {.async.} =
## Continuously add newly discovered nodes
## using Node Discovery v5
if (node.wakuDiscv5.isNil):
warn "Trying to run discovery v5 while it's disabled"
return
info "Starting discovery loop"
while node.wakuDiscv5.listening:
trace "Running discovery loop"
## Query for a random target and collect all discovered nodes
## @TODO: we could filter nodes here
let discoveredPeers = await node.wakuDiscv5.findRandomPeers()
if discoveredPeers.isOk:
## Let's attempt to connect to peers we
## have not encountered before
trace "Discovered peers", count=discoveredPeers.get().len()
let newPeers = discoveredPeers.get().filterIt(
not node.switch.peerStore.addressBook.contains(it.peerId))
if newPeers.len > 0:
debug "Connecting to newly discovered peers", count=newPeers.len()
await connectToNodes(node, newPeers)
# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.
#
# Also, give some time to dial the discovered nodes and update stats etc
await sleepAsync(5.seconds)
proc startDiscv5*(node: WakuNode): Future[bool] {.async.} =
## Start Discovery v5 service
info "Starting discovery v5 service"
if not node.wakuDiscv5.isNil:
## First start listening on configured port
try:
trace "Start listening on discv5 port"
node.wakuDiscv5.open()
except CatchableError:
error "Failed to start discovery service. UDP port may be already in use"
return false
## Start Discovery v5
trace "Start discv5 service"
node.wakuDiscv5.start()
trace "Start discovering new peers using discv5"
asyncSpawn node.runDiscv5Loop()
debug "Successfully started discovery v5 service"
return true
return false
proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} =
## Stop Discovery v5 service
if not node.wakuDiscv5.isNil:
info "Stopping discovery v5 service"
## Stop Discovery v5 process and close listening port
if node.wakuDiscv5.listening:
trace "Stop listening on discv5 port"
await node.wakuDiscv5.closeWait()
debug "Successfully stopped discovery v5 service"
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
## all its mounted protocols.
@ -726,6 +800,9 @@ proc start*(node: WakuNode) {.async.} =
proc stop*(node: WakuNode) {.async.} =
if not node.wakuRelay.isNil:
await node.wakuRelay.stop()
if not node.wakuDiscv5.isNil:
discard await node.stopDiscv5()
await node.switch.stop()
@ -815,10 +892,10 @@ when isMainModule:
## file. Optionally include persistent peer storage.
## No protocols are mounted yet.
## `udpPort` is only supplied to satisfy underlying APIs but is not
## actually a supported transport.
let udpPort = conf.tcpPort
let
let
## `udpPort` is only supplied to satisfy underlying APIs but is not
## actually a supported transport for libp2p traffic.
udpPort = conf.tcpPort
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat,
clientId,
Port(uint16(conf.tcpPort) + conf.portsShift),
@ -830,18 +907,33 @@ when isMainModule:
some(Port(uint16(conf.tcpPort) + conf.portsShift))
else:
extTcpPort
node = WakuNode.new(conf.nodekey,
conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
extIp, extPort,
pStorage,
conf.maxConnections.int)
let node = WakuNode.new(conf.nodekey,
conf.listenAddress, Port(uint16(conf.tcpPort) + conf.portsShift),
extIp, extPort,
pStorage,
conf.maxConnections.int)
if conf.discv5Discovery:
let discv5UdpPort = Port(uint16(conf.discv5UdpPort) + conf.portsShift)
node.wakuDiscv5 = WakuDiscoveryV5.new(
extIP, extTcpPort, some(discv5UdpPort),
conf.listenAddress,
discv5UdpPort,
conf.discv5BootstrapNodes,
conf.discv5EnrAutoUpdate,
keys.PrivateKey(conf.nodekey.skkey),
[], # Empty enr fields, for now
node.rng
)
ok(node)
# 3/6 Mount and initialize configured protocols
proc setupProtocols(node: var WakuNode,
conf: WakuNodeConf,
mStorage: WakuMessageStore = nil): SetupResult[bool] =
conf: WakuNodeConf,
mStorage: WakuMessageStore = nil): SetupResult[bool] =
## Setup configured protocols on an existing Waku v2 node.
## Optionally include persistent message storage.

View File

@ -2,8 +2,12 @@
# Collection of utilities related to Waku peers
import
std/strutils,
std/[options, strutils],
stew/results,
stew/shims/net,
eth/keys,
eth/p2p/discoveryv5/enr,
libp2p/crypto/[crypto, secp],
libp2p/[errors,
multiaddress,
peerid,
@ -53,6 +57,15 @@ proc initAddress(T: type MultiAddress, str: string): T {.raises: [Defect, ValueE
raise newException(ValueError,
"Invalid bootstrap node multi-address")
func getTransportProtocol(typedR: TypedRecord): Option[IpTransportProtocol] =
if typedR.tcp6.isSome or typedR.tcp.isSome:
return some(IpTransportProtocol.tcpProtocol)
if typedR.udp6.isSome or typedR.udp.isSome:
return some(IpTransportProtocol.udpProtocol)
return none(IpTransportProtocol)
## Parses a fully qualified peer multiaddr, in the
## format `(ip4|ip6)/tcp/p2p`, into dialable PeerInfo
proc parseRemotePeerInfo*(address: string): RemotePeerInfo {.raises: [Defect, ValueError, LPError].}=
@ -80,6 +93,58 @@ proc parseRemotePeerInfo*(address: string): RemotePeerInfo {.raises: [Defect, Va
return RemotePeerInfo.init(peerIdStr, @[wireAddr])
## Converts an ENR to dialable RemotePeerInfo
proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
let typedR = ? enr.toTypedRecord
if not typedR.secp256k1.isSome:
return err("enr: no secp256k1 key in record")
let
pubKey = ? keys.PublicKey.fromRaw(typedR.secp256k1.get)
peerId = ? PeerID.init(crypto.PublicKey(scheme: Secp256k1,
skkey: secp.SkPublicKey(pubKey)))
var addrs = newSeq[MultiAddress]()
let transportProto = getTransportProtocol(typedR)
if transportProto.isNone:
return err("enr: could not determine transport protocol")
case transportProto.get()
of tcpProtocol:
if typedR.ip.isSome and typedR.tcp.isSome:
let ip = ipv4(typedR.ip.get)
addrs.add MultiAddress.init(ip, tcpProtocol, Port typedR.tcp.get)
if typedR.ip6.isSome:
let ip = ipv6(typedR.ip6.get)
if typedR.tcp6.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port typedR.tcp6.get)
elif typedR.tcp.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port typedR.tcp.get)
else:
discard
of udpProtocol:
if typedR.ip.isSome and typedR.udp.isSome:
let ip = ipv4(typedR.ip.get)
addrs.add MultiAddress.init(ip, udpProtocol, Port typedR.udp.get)
if typedR.ip6.isSome:
let ip = ipv6(typedR.ip6.get)
if typedR.udp6.isSome:
addrs.add MultiAddress.init(ip, udpProtocol, Port typedR.udp6.get)
elif typedR.udp.isSome:
addrs.add MultiAddress.init(ip, udpProtocol, Port typedR.udp.get)
else:
discard
if addrs.len == 0:
return err("enr: no addresses in record")
return ok(RemotePeerInfo.init(peerId, addrs))
## Converts the local peerInfo to dialable RemotePeerInfo
## Useful for testing or internal connections
proc toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =