feat(relay): add support for gossipsub peer exchange (#911)

This commit is contained in:
Hanno Cornelius 2022-03-29 10:09:48 +02:00 committed by GitHub
parent d0cf3ed1f9
commit 52de0e434f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 114 additions and 10 deletions

View File

@ -21,7 +21,8 @@ import
./v2/test_waku_discv5,
./v2/test_enr_utils,
./v2/test_waku_store_queue,
./v2/test_pagination_utils
./v2/test_pagination_utils,
./v2/test_peer_exchange
when defined(rln):
import ./v2/test_waku_rln_relay

View File

@ -0,0 +1,64 @@
{.used.}
import
std/[sequtils, options],
chronicles,
chronos,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub,
stew/shims/net,
testutils/unittests,
../../waku/v2/node/wakunode2,
../test_helpers
procSuite "Peer Exchange":
asyncTest "GossipSub (relay) peer exchange":
## Tests peer exchange
# Create nodes and ENR. These will be added to the discoverable list
let
bindIp = ValidIpAddress.init("0.0.0.0")
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node1 = WakuNode.new(nodeKey1, bindIp, Port(60000))
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node2 = WakuNode.new(nodeKey2, bindIp, Port(60002), sendSignedPeerRecord = true)
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), sendSignedPeerRecord = true)
var
peerExchangeHandler: RoutingRecordsHandler
completionFut = newFuture[bool]()
proc handlePeerExchange(peer: PeerId, topic: string,
peers: seq[RoutingRecordsPair]) {.gcsafe, raises: [Defect].} =
## Handle peers received via gossipsub peer exchange
let peerRecords = peers.mapIt(it.record.get())
check:
# Node 3 is informed of node 2 via peer exchange
peer == node1.switch.peerInfo.peerId
topic == defaultTopic
peerRecords.countIt(it.peerId == node2.switch.peerInfo.peerId) == 1
if (not completionFut.completed()):
completionFut.complete(true)
peerExchangeHandler = handlePeerExchange
node1.mountRelay()
node2.mountRelay()
node3.mountRelay(peerExchangeHandler = some(peerExchangeHandler))
# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1
await allFutures([node1.start(), node2.start(), node3.start()])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()])
check:
(await completionFut.withTimeout(5.seconds)) == true
await allFutures([node1.stop(), node2.stop(), node3.stop()])

View File

@ -89,6 +89,11 @@ type
defaultValue: true
name: "relay" }: bool
relayPeerExchange* {.
desc: "Enable gossipsub peer exchange in relay protocol: true|false",
defaultValue: true
name: "relay-peer-exchange" }: bool
rlnRelay* {.
desc: "Enable spam protection through rln-relay: true|false",
defaultValue: false

View File

@ -53,7 +53,7 @@ logScope:
const clientId* = "Nimbus Waku v2 node"
# Default topic
const defaultTopic = "/waku/2/default-waku/proto"
const defaultTopic* = "/waku/2/default-waku/proto"
# Default Waku Filter Timeout
const WakuFilterTimeout: Duration = 1.days
@ -170,6 +170,7 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
secureCert: string = "",
wakuFlags = none(WakuEnrBitfield),
nameResolver: NameResolver = nil,
sendSignedPeerRecord = false,
dns4DomainName = none(string)
): T
{.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
@ -242,7 +243,8 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
wssEnabled = wssEnabled,
secureKeyPath = secureKey,
secureCertPath = secureCert,
nameResolver = nameResolver)
nameResolver = nameResolver,
sendSignedPeerRecord = sendSignedPeerRecord)
let wakuNode = WakuNode(
peerManager: PeerManager.new(switch, peerStorage),
@ -673,7 +675,8 @@ proc startRelay*(node: WakuNode) {.async.} =
proc mountRelay*(node: WakuNode,
topics: seq[string] = newSeq[string](),
relayMessages = true,
triggerSelf = true)
triggerSelf = true,
peerExchangeHandler = none(RoutingRecordsHandler))
# @TODO: Better error handling: CatchableError is raised by `waitFor`
{.gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} =
@ -699,6 +702,10 @@ proc mountRelay*(node: WakuNode,
## all configured topics plus the hard-coded defaultTopic(s)
wakuRelay.defaultTopics = concat(@[defaultTopic], topics)
## Add peer exchange handler
if peerExchangeHandler.isSome():
wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())
node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec))
if relayMessages:
@ -1107,6 +1114,7 @@ when isMainModule:
conf.websocketSecureCertPath,
some(wakuFlags),
dnsResolver,
conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
dns4DomainName
)
@ -1147,7 +1155,7 @@ when isMainModule:
ok(node)
# 4/7 Mount and initialize configured protocols
proc setupProtocols(node: var WakuNode,
proc setupProtocols(node: WakuNode,
conf: WakuNodeConf,
mStorage: WakuMessageStore = nil): SetupResult[bool] =
@ -1156,10 +1164,26 @@ when isMainModule:
## No protocols are started yet.
# Mount relay on all nodes
var peerExchangeHandler = none(RoutingRecordsHandler)
if conf.relayPeerExchange:
proc handlePeerExchange(peer: PeerId, topic: string,
peers: seq[RoutingRecordsPair]) {.gcsafe, raises: [Defect].} =
## Handle peers received via gossipsub peer exchange
# TODO: Only consider peers on pubsub topics we subscribe to
let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records
.mapIt(toRemotePeerInfo(it.record.get()))
debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len
# asyncSpawn, as we don't want to block here
asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange")
peerExchangeHandler = some(handlePeerExchange)
mountRelay(node,
conf.topics.split(" "),
relayMessages = conf.relay, # Indicates if node is capable to relay messages
)
conf.topics.split(" "),
relayMessages = conf.relay, # Indicates if node is capable to relay messages
peerExchangeHandler = peerExchangeHandler)
# Keepalive mounted on all nodes
mountLibp2pPing(node)

View File

@ -2,7 +2,7 @@
# Collection of utilities related to Waku peers
import
std/[options, strutils],
std/[options, sequtils, strutils],
stew/results,
stew/shims/net,
eth/keys,
@ -11,7 +11,8 @@ import
libp2p/[errors,
multiaddress,
peerid,
peerinfo]
peerinfo,
routing_record]
type
RemotePeerInfo* = ref object of RootObj
@ -152,6 +153,12 @@ proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
return ok(RemotePeerInfo.init(peerId, addrs, some(enr)))
## Converts peer records to dialable RemotePeerInfo
## Useful if signed peer records have been received in an exchange
proc toRemotePeerInfo*(peerRecord: PeerRecord): RemotePeerInfo =
RemotePeerInfo.init(peerRecord.peerId,
peerRecord.addresses.mapIt(it.address))
## Converts the local peerInfo to dialable RemotePeerInfo
## Useful for testing or internal connections
proc toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =

View File

@ -61,6 +61,7 @@ proc newWakuSwitch*(
maxOut = -1,
maxConnsPerPeer = MaxConnectionsPerPeer,
nameResolver: NameResolver = nil,
sendSignedPeerRecord = false,
wssEnabled: bool = false,
secureKeyPath: string = "",
secureCertPath: string = ""): Switch
@ -77,6 +78,8 @@ proc newWakuSwitch*(
.withNoise()
.withTcpTransport(transportFlags)
.withNameResolver(nameResolver)
.withSignedPeerRecord(sendSignedPeerRecord)
if privKey.isSome():
b = b.withPrivateKey(privKey.get())
if wsAddress.isSome():