mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-19 02:16:33 +00:00
deploy: 52de0e434ffa25def7657a8babf0a456e52508a1
This commit is contained in:
parent
61a8e0f234
commit
aa7ee42181
@ -21,7 +21,8 @@ import
|
||||
./v2/test_waku_discv5,
|
||||
./v2/test_enr_utils,
|
||||
./v2/test_waku_store_queue,
|
||||
./v2/test_pagination_utils
|
||||
./v2/test_pagination_utils,
|
||||
./v2/test_peer_exchange
|
||||
|
||||
when defined(rln):
|
||||
import ./v2/test_waku_rln_relay
|
||||
|
64
tests/v2/test_peer_exchange.nim
Normal file
64
tests/v2/test_peer_exchange.nim
Normal file
@ -0,0 +1,64 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
std/[sequtils, options],
|
||||
chronicles,
|
||||
chronos,
|
||||
libp2p/crypto/crypto,
|
||||
libp2p/protocols/pubsub/gossipsub,
|
||||
stew/shims/net,
|
||||
testutils/unittests,
|
||||
../../waku/v2/node/wakunode2,
|
||||
../test_helpers
|
||||
|
||||
procSuite "Peer Exchange":
|
||||
asyncTest "GossipSub (relay) peer exchange":
|
||||
## Tests peer exchange
|
||||
|
||||
# Create nodes and ENR. These will be added to the discoverable list
|
||||
let
|
||||
bindIp = ValidIpAddress.init("0.0.0.0")
|
||||
nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node1 = WakuNode.new(nodeKey1, bindIp, Port(60000))
|
||||
nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node2 = WakuNode.new(nodeKey2, bindIp, Port(60002), sendSignedPeerRecord = true)
|
||||
nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||
node3 = WakuNode.new(nodeKey3, bindIp, Port(60003), sendSignedPeerRecord = true)
|
||||
|
||||
var
|
||||
peerExchangeHandler: RoutingRecordsHandler
|
||||
completionFut = newFuture[bool]()
|
||||
|
||||
proc handlePeerExchange(peer: PeerId, topic: string,
|
||||
peers: seq[RoutingRecordsPair]) {.gcsafe, raises: [Defect].} =
|
||||
## Handle peers received via gossipsub peer exchange
|
||||
let peerRecords = peers.mapIt(it.record.get())
|
||||
|
||||
check:
|
||||
# Node 3 is informed of node 2 via peer exchange
|
||||
peer == node1.switch.peerInfo.peerId
|
||||
topic == defaultTopic
|
||||
peerRecords.countIt(it.peerId == node2.switch.peerInfo.peerId) == 1
|
||||
|
||||
if (not completionFut.completed()):
|
||||
completionFut.complete(true)
|
||||
|
||||
peerExchangeHandler = handlePeerExchange
|
||||
|
||||
node1.mountRelay()
|
||||
node2.mountRelay()
|
||||
node3.mountRelay(peerExchangeHandler = some(peerExchangeHandler))
|
||||
|
||||
# Ensure that node1 prunes all peers after the first connection
|
||||
node1.wakuRelay.parameters.dHigh = 1
|
||||
|
||||
await allFutures([node1.start(), node2.start(), node3.start()])
|
||||
|
||||
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
await node3.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()])
|
||||
|
||||
check:
|
||||
(await completionFut.withTimeout(5.seconds)) == true
|
||||
|
||||
await allFutures([node1.stop(), node2.stop(), node3.stop()])
|
@ -2,7 +2,7 @@
|
||||
|
||||
# libtool - Provide generalized library-building support services.
|
||||
# Generated automatically by config.status (libbacktrace) version-unused
|
||||
# Libtool was configured on host fv-az449-957:
|
||||
# Libtool was configured on host fv-az447-409:
|
||||
# NOTE: Changes made to this file will be lost: look at ltmain.sh.
|
||||
#
|
||||
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,
|
||||
|
@ -89,6 +89,11 @@ type
|
||||
defaultValue: true
|
||||
name: "relay" }: bool
|
||||
|
||||
relayPeerExchange* {.
|
||||
desc: "Enable gossipsub peer exchange in relay protocol: true|false",
|
||||
defaultValue: true
|
||||
name: "relay-peer-exchange" }: bool
|
||||
|
||||
rlnRelay* {.
|
||||
desc: "Enable spam protection through rln-relay: true|false",
|
||||
defaultValue: false
|
||||
|
@ -53,7 +53,7 @@ logScope:
|
||||
const clientId* = "Nimbus Waku v2 node"
|
||||
|
||||
# Default topic
|
||||
const defaultTopic = "/waku/2/default-waku/proto"
|
||||
const defaultTopic* = "/waku/2/default-waku/proto"
|
||||
|
||||
# Default Waku Filter Timeout
|
||||
const WakuFilterTimeout: Duration = 1.days
|
||||
@ -170,6 +170,7 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
||||
secureCert: string = "",
|
||||
wakuFlags = none(WakuEnrBitfield),
|
||||
nameResolver: NameResolver = nil,
|
||||
sendSignedPeerRecord = false,
|
||||
dns4DomainName = none(string)
|
||||
): T
|
||||
{.raises: [Defect, LPError, IOError, TLSStreamProtocolError].} =
|
||||
@ -242,7 +243,8 @@ proc new*(T: type WakuNode, nodeKey: crypto.PrivateKey,
|
||||
wssEnabled = wssEnabled,
|
||||
secureKeyPath = secureKey,
|
||||
secureCertPath = secureCert,
|
||||
nameResolver = nameResolver)
|
||||
nameResolver = nameResolver,
|
||||
sendSignedPeerRecord = sendSignedPeerRecord)
|
||||
|
||||
let wakuNode = WakuNode(
|
||||
peerManager: PeerManager.new(switch, peerStorage),
|
||||
@ -673,7 +675,8 @@ proc startRelay*(node: WakuNode) {.async.} =
|
||||
proc mountRelay*(node: WakuNode,
|
||||
topics: seq[string] = newSeq[string](),
|
||||
relayMessages = true,
|
||||
triggerSelf = true)
|
||||
triggerSelf = true,
|
||||
peerExchangeHandler = none(RoutingRecordsHandler))
|
||||
# @TODO: Better error handling: CatchableError is raised by `waitFor`
|
||||
{.gcsafe, raises: [Defect, InitializationError, LPError, CatchableError].} =
|
||||
|
||||
@ -699,6 +702,10 @@ proc mountRelay*(node: WakuNode,
|
||||
## all configured topics plus the hard-coded defaultTopic(s)
|
||||
wakuRelay.defaultTopics = concat(@[defaultTopic], topics)
|
||||
|
||||
## Add peer exchange handler
|
||||
if peerExchangeHandler.isSome():
|
||||
wakuRelay.routingRecordsHandler.add(peerExchangeHandler.get())
|
||||
|
||||
node.switch.mount(wakuRelay, protocolMatcher(WakuRelayCodec))
|
||||
|
||||
if relayMessages:
|
||||
@ -1107,6 +1114,7 @@ when isMainModule:
|
||||
conf.websocketSecureCertPath,
|
||||
some(wakuFlags),
|
||||
dnsResolver,
|
||||
conf.relayPeerExchange, # We send our own signed peer record when peer exchange enabled
|
||||
dns4DomainName
|
||||
)
|
||||
|
||||
@ -1147,7 +1155,7 @@ when isMainModule:
|
||||
ok(node)
|
||||
|
||||
# 4/7 Mount and initialize configured protocols
|
||||
proc setupProtocols(node: var WakuNode,
|
||||
proc setupProtocols(node: WakuNode,
|
||||
conf: WakuNodeConf,
|
||||
mStorage: WakuMessageStore = nil): SetupResult[bool] =
|
||||
|
||||
@ -1156,10 +1164,26 @@ when isMainModule:
|
||||
## No protocols are started yet.
|
||||
|
||||
# Mount relay on all nodes
|
||||
var peerExchangeHandler = none(RoutingRecordsHandler)
|
||||
if conf.relayPeerExchange:
|
||||
proc handlePeerExchange(peer: PeerId, topic: string,
|
||||
peers: seq[RoutingRecordsPair]) {.gcsafe, raises: [Defect].} =
|
||||
## Handle peers received via gossipsub peer exchange
|
||||
# TODO: Only consider peers on pubsub topics we subscribe to
|
||||
let exchangedPeers = peers.filterIt(it.record.isSome()) # only peers with populated records
|
||||
.mapIt(toRemotePeerInfo(it.record.get()))
|
||||
|
||||
debug "connecting to exchanged peers", src=peer, topic=topic, numPeers=exchangedPeers.len
|
||||
|
||||
# asyncSpawn, as we don't want to block here
|
||||
asyncSpawn node.connectToNodes(exchangedPeers, "peer exchange")
|
||||
|
||||
peerExchangeHandler = some(handlePeerExchange)
|
||||
|
||||
mountRelay(node,
|
||||
conf.topics.split(" "),
|
||||
relayMessages = conf.relay, # Indicates if node is capable to relay messages
|
||||
)
|
||||
conf.topics.split(" "),
|
||||
relayMessages = conf.relay, # Indicates if node is capable to relay messages
|
||||
peerExchangeHandler = peerExchangeHandler)
|
||||
|
||||
# Keepalive mounted on all nodes
|
||||
mountLibp2pPing(node)
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
# Collection of utilities related to Waku peers
|
||||
import
|
||||
std/[options, strutils],
|
||||
std/[options, sequtils, strutils],
|
||||
stew/results,
|
||||
stew/shims/net,
|
||||
eth/keys,
|
||||
@ -11,7 +11,8 @@ import
|
||||
libp2p/[errors,
|
||||
multiaddress,
|
||||
peerid,
|
||||
peerinfo]
|
||||
peerinfo,
|
||||
routing_record]
|
||||
|
||||
type
|
||||
RemotePeerInfo* = ref object of RootObj
|
||||
@ -152,6 +153,12 @@ proc toRemotePeerInfo*(enr: enr.Record): Result[RemotePeerInfo, cstring] =
|
||||
|
||||
return ok(RemotePeerInfo.init(peerId, addrs, some(enr)))
|
||||
|
||||
## Converts peer records to dialable RemotePeerInfo
|
||||
## Useful if signed peer records have been received in an exchange
|
||||
proc toRemotePeerInfo*(peerRecord: PeerRecord): RemotePeerInfo =
|
||||
RemotePeerInfo.init(peerRecord.peerId,
|
||||
peerRecord.addresses.mapIt(it.address))
|
||||
|
||||
## Converts the local peerInfo to dialable RemotePeerInfo
|
||||
## Useful for testing or internal connections
|
||||
proc toRemotePeerInfo*(peerInfo: PeerInfo): RemotePeerInfo =
|
||||
|
@ -61,6 +61,7 @@ proc newWakuSwitch*(
|
||||
maxOut = -1,
|
||||
maxConnsPerPeer = MaxConnectionsPerPeer,
|
||||
nameResolver: NameResolver = nil,
|
||||
sendSignedPeerRecord = false,
|
||||
wssEnabled: bool = false,
|
||||
secureKeyPath: string = "",
|
||||
secureCertPath: string = ""): Switch
|
||||
@ -77,6 +78,8 @@ proc newWakuSwitch*(
|
||||
.withNoise()
|
||||
.withTcpTransport(transportFlags)
|
||||
.withNameResolver(nameResolver)
|
||||
.withSignedPeerRecord(sendSignedPeerRecord)
|
||||
|
||||
if privKey.isSome():
|
||||
b = b.withPrivateKey(privKey.get())
|
||||
if wsAddress.isSome():
|
||||
|
Loading…
x
Reference in New Issue
Block a user