test(peer-exchange): Implement peer exchange tests (#2464)

* Implement peer exchange tests.
* Refactor, and remove duplicated tests.
* feat(wakunode): Resultify fetch peer exchange peers (#2486)
This commit is contained in:
Álex Cabeza Romero 2024-03-14 17:48:09 +01:00 committed by GitHub
parent 2c1391d304
commit f436240d53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 954 additions and 542 deletions

View File

@ -38,7 +38,12 @@ when defined(waku_exp_store_resume):
# TODO: Review store resume test cases (#1282)
import ./waku_store/test_resume
import ./waku_relay/test_all, ./waku_filter_v2/test_all, ./waku_lightpush/test_all
import
./node/test_all,
./waku_filter_v2/test_all,
./waku_peer_exchange/test_all,
./waku_lightpush/test_all,
./waku_relay/test_all
import
# Waku v2 tests
@ -47,7 +52,6 @@ import
# Waku Filter
./test_waku_filter_legacy,
./test_wakunode_filter_legacy,
./test_waku_peer_exchange,
./test_peer_store_extended,
./test_message_cache,
./test_peer_manager,
@ -56,7 +60,7 @@ import
./test_waku_enr,
./test_waku_dnsdisc,
./test_waku_discv5,
./test_peer_exchange,
./test_relay_peer_exchange,
./test_waku_noise,
./test_waku_noise_sessions,
./test_waku_netconfig,

View File

@ -1,4 +1,5 @@
import
./test_wakunode_filter,
./test_wakunode_lightpush,
./test_wakunode_peer_exchange,
./test_wakunode_store

View File

@ -56,6 +56,7 @@ suite "Waku Filter - End to End":
await allFutures(server.start(), client.start())
await server.mountFilter()
await server.mountLegacyFilter()
await client.mountFilterClient()
client.wakuFilterClient.registerPushHandler(messagePushHandler)

View File

@ -0,0 +1,307 @@
{.used.}
import
std/[options, sequtils],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr
import
../../../waku/[
waku_node,
waku_discv5,
waku_peer_exchange,
node/peer_manager,
waku_relay/protocol,
waku_core
],
../waku_peer_exchange/utils,
../testlib/[wakucore, wakunode, testasync]
suite "Waku Peer Exchange":
let
bindIp: IPAddress = parseIpAddress("0.0.0.0")
bindPort: Port = Port(0)
var node {.threadvar.}: WakuNode
suite "mountPeerExchange":
asyncSetup:
node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
asyncTest "Started node mounts peer exchange":
# Given a started node without peer exchange mounted
await node.start()
check:
node.wakuPeerExchange == nil
# When mounting peer exchange
await node.mountPeerExchange()
# Then peer exchange is mounted
check:
node.wakuPeerExchange != nil
node.wakuPeerExchange.started == true
# Cleanup
await node.stop()
asyncTest "Stopped node mounts peer exchange":
# Given a stopped node without peer exchange mounted
check:
node.wakuPeerExchange == nil
# When mounting peer exchange
await node.mountPeerExchange()
# Then peer exchange is mounted
check:
node.wakuPeerExchange != nil
node.wakuPeerExchange.started == false
suite "fetchPeerExchangePeers":
var node2 {.threadvar.}: WakuNode
asyncSetup:
node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
node2 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
await allFutures(node.start(), node2.start())
asyncTeardown:
await allFutures(node.stop(), node2.stop())
asyncTest "Node fetches without mounting peer exchange":
# When a node, without peer exchange mounted, fetches peers
let res = await node.fetchPeerExchangePeers(1)
# Then no peers are fetched
check:
node.peerManager.peerStore.peers.len == 0
res.error == "PeerExchange is not mounted"
asyncTest "Node fetches with mounted peer exchange, but no peers":
# Given a node with peer exchange mounted
await node.mountPeerExchange()
# When a node fetches peers
let res = await node.fetchPeerExchangePeers(1)
check res.error == "Peer exchange failure: peer_not_found_failure"
# Then no peers are fetched
check node.peerManager.peerStore.peers.len == 0
asyncTest "Node succesfully exchanges px peers with faked discv5":
# Given both nodes mount peer exchange
await allFutures([node.mountPeerExchange(), node2.mountPeerExchange()])
check node.peerManager.peerStore.peers.len == 0
# Mock that we discovered a node (to avoid running discv5)
var enr = enr.Record()
assert enr.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
), "Failed to parse ENR"
node2.wakuPeerExchange.enrCache.add(enr)
# Set node2 as service peer (default one) for px protocol
node.peerManager.addServicePeer(
node2.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
)
# Request 1 peer from peer exchange protocol
let res = await node.fetchPeerExchangePeers(1)
check res.tryGet() == 1
# Check that the peer ended up in the peerstore
let rpInfo = enr.toRemotePeerInfo.get()
check:
node.peerManager.peerStore.peers.anyIt(it.peerId == rpInfo.peerId)
node.peerManager.peerStore.peers.anyIt(it.addrs == rpInfo.addrs)
suite "setPeerExchangePeer":
var node2 {.threadvar.}: WakuNode
asyncSetup:
node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
node2 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort)
await allFutures(node.start(), node2.start())
asyncTeardown:
await allFutures(node.stop(), node2.stop())
asyncTest "peer set successfully":
# Given a node with peer exchange mounted
await node.mountPeerExchange()
let initialPeers = node.peerManager.peerStore.peers.len
# And a valid peer info
let remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo()
# When making a request with a valid peer info
node.setPeerExchangePeer(remotePeerInfo2)
# Then the peer is added to the peer store
check:
node.peerManager.peerStore.peers.len == (initialPeers + 1)
asyncTest "peer exchange not mounted":
# Given a node without peer exchange mounted
check node.wakuPeerExchange == nil
let initialPeers = node.peerManager.peerStore.peers.len
# And a valid peer info
let invalidMultiAddress = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
# When making any request with an invalid peer info
node.setPeerExchangePeer(invalidMultiAddress)
# Then no peer is added to the peer store
check:
node.peerManager.peerStore.peers.len == initialPeers
asyncTest "peer info parse error":
# Given a node with peer exchange mounted
await node.mountPeerExchange()
let initialPeers = node.peerManager.peerStore.peers.len
# And given a peer info with an invalid peer id
var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo()
remotePeerInfo2.peerId.data.add(255.byte)
# When making any request with an invalid peer info
node.setPeerExchangePeer("invalidpeerinfo")
# Then no peer is added to the peer store
check:
node.peerManager.peerStore.peers.len == initialPeers
suite "Waku Peer Exchange with discv5":
asyncTest "Node successfully exchanges px peers with real discv5":
## Given (copied from test_waku_discv5.nim)
let
# todo: px flag
flags =
CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
bindIp = parseIpAddress("0.0.0.0")
extIp = parseIpAddress("127.0.0.1")
nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000)
node1 =
newTestWakuNode(
nodeKey1,
bindIp,
nodeTcpPort1,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1),
)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 =
newTestWakuNode(
nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2),
)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 =
newTestWakuNode(
nodeKey3,
bindIp,
nodeTcpPort3,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3),
)
# discv5
let
conf1 =
WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true,
)
let
disc1 =
WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager))
let
conf2 =
WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort2,
privateKey: keys.PrivateKey(nodeKey2.skkey),
bootstrapRecords: @[disc1.protocol.getRecord()],
autoupdateRecord: true,
)
let
disc2 =
WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager))
await allFutures(node1.start(), node2.start(), node3.start())
let resultDisc1StartRes = await disc1.start()
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
## When
var attempts = 10
while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
# node2 can be connected, so will be returned by peer exchange
require (
await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo())
)
# Mount peer exchange
await node1.mountPeerExchange()
await node3.mountPeerExchange()
let
dialResponse =
await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())
check dialResponse.isOk
let
requestPeers = 1
currentPeers = node3.peerManager.peerStore.peers.len
let res = await node3.fetchPeerExchangePeers(1)
check res.tryGet() == 1
# Then node3 has received 1 peer from node1
check:
node3.peerManager.peerStore.peers.len == currentPeers + requestPeers
await allFutures(
[node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]
)

View File

@ -508,7 +508,7 @@ suite "Waku Store - End to End - Sorted Archive":
# Cleanup
waitFor otherServer.stop()
suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
suite "Waku Store - End to End - Unsorted Archive":
var pubsubTopic {.threadvar.}: PubsubTopic
var contentTopic {.threadvar.}: ContentTopic
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
@ -652,111 +652,6 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp":
unsortedArchiveMessages[9]
]
suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
var pubsubTopic {.threadvar.}: PubsubTopic
var contentTopic {.threadvar.}: ContentTopic
var contentTopicSeq {.threadvar.}: seq[ContentTopic]
var historyQuery {.threadvar.}: HistoryQuery
var unsortedArchiveMessages {.threadvar.}: seq[WakuMessage]
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode
var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
asyncSetup:
pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
contentTopicSeq = @[contentTopic]
historyQuery =
HistoryQuery(
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
direction: PagingDirection.FORWARD,
pageSize: 5,
)
unsortedArchiveMessages =
@[ # Not providing explicit timestamp means it will be set in "arrive" order
fakeWakuMessage(@[byte 09]),
fakeWakuMessage(@[byte 07]),
fakeWakuMessage(@[byte 05]),
fakeWakuMessage(@[byte 03]),
fakeWakuMessage(@[byte 01]),
fakeWakuMessage(@[byte 00]),
fakeWakuMessage(@[byte 02]),
fakeWakuMessage(@[byte 04]),
fakeWakuMessage(@[byte 06]),
fakeWakuMessage(@[byte 08])
]
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))
let
unsortedArchiveDriverWithMessages =
newArchiveDriverWithMessages(pubsubTopic, unsortedArchiveMessages)
mountUnsortedArchiveResult =
server.mountArchive(unsortedArchiveDriverWithMessages)
assert mountUnsortedArchiveResult.isOk()
waitFor server.mountStore()
client.mountStoreClient()
waitFor allFutures(server.start(), client.start())
serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
asyncTeardown:
waitFor allFutures(client.stop(), server.stop())
asyncTest "Sorting using receiverTime":
# When making a history query
let queryResponse = await client.query(historyQuery, serverRemotePeerInfo)
# Then the response contains the messages
check:
queryResponse.get().messages ==
@[
unsortedArchiveMessages[0],
unsortedArchiveMessages[1],
unsortedArchiveMessages[2],
unsortedArchiveMessages[3],
unsortedArchiveMessages[4]
]
# Given the next query
var
historyQuery2 =
HistoryQuery(
cursor: queryResponse.get().cursor,
pubsubTopic: some(pubsubTopic),
contentTopics: contentTopicSeq,
direction: PagingDirection.FORWARD,
pageSize: 5,
)
# When making the next history query
let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo)
# Then the response contains the messages
check:
queryResponse2.get().messages ==
@[
unsortedArchiveMessages[5],
unsortedArchiveMessages[6],
unsortedArchiveMessages[7],
unsortedArchiveMessages[8],
unsortedArchiveMessages[9]
]
suite "Waku Store - End to End - Archive with Multiple Topics":
var pubsubTopic {.threadvar.}: PubsubTopic
var pubsubTopicB {.threadvar.}: PubsubTopic
@ -794,11 +689,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
)
let timeOrigin = now()
proc myOriginTs(offset = 0): Timestamp {.gcsafe, raises: [].} =
ts(offset, timeOrigin)
originTs = myOriginTs
originTs =
proc(offset = 0): Timestamp {.gcsafe, raises: [].} =
ts(offset, timeOrigin)
archiveMessages =
@[
@ -828,9 +721,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
newSqliteArchiveDriver().put(pubsubTopic, archiveMessages[0..<6]).put(
pubsubTopicB, archiveMessages[6..<10]
)
let mountUnsortedArchiveResult = server.mountArchive(archiveDriver)
let mountSortedArchiveResult = server.mountArchive(archiveDriver)
assert mountUnsortedArchiveResult.isOk()
assert mountSortedArchiveResult.isOk()
waitFor server.mountStore()
client.mountStoreClient()

View File

@ -1,73 +0,0 @@
{.used.}
import
std/[sequtils, options],
stew/shims/net,
testutils/unittests,
chronicles,
chronos,
libp2p/peerid,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub
import
../../waku/waku_core,
../../waku/waku_node,
./testlib/wakucore,
./testlib/wakunode
procSuite "Peer Exchange":
asyncTest "GossipSub (relay) peer exchange":
## Tests peer exchange
# Create nodes and ENR. These will be added to the discoverable list
let
bindIp = parseIpAddress("0.0.0.0")
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(nodeKey1, bindIp, Port(0))
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, bindIp, Port(0), sendSignedPeerRecord = true)
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, bindIp, Port(0), sendSignedPeerRecord = true)
var
peerExchangeHandler, emptyHandler: RoutingRecordsHandler
completionFut = newFuture[bool]()
proc ignorePeerExchange(peer: PeerId, topic: string,
peers: seq[RoutingRecordsPair]) {.gcsafe.} =
discard
proc handlePeerExchange(peer: PeerId, topic: string,
peers: seq[RoutingRecordsPair]) {.gcsafe.} =
## Handle peers received via gossipsub peer exchange
let peerRecords = peers.mapIt(it.record.get())
check:
# Node 3 is informed of node 2 via peer exchange
peer == node1.switch.peerInfo.peerId
topic == DefaultPubsubTopic
peerRecords.countIt(it.peerId == node2.switch.peerInfo.peerId) == 1
if (not completionFut.completed()):
completionFut.complete(true)
peerExchangeHandler = handlePeerExchange
emptyHandler = ignorePeerExchange
await node1.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node2.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandler))
# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1
await allFutures([node1.start(), node2.start(), node3.start()])
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()])
check:
(await completionFut.withTimeout(5.seconds)) == true
await allFutures([node1.stop(), node2.stop(), node3.stop()])

View File

@ -0,0 +1,100 @@
{.used.}
import
std/[sequtils, options],
stew/shims/net,
testutils/unittests,
chronicles,
chronos,
libp2p/peerid,
libp2p/crypto/crypto,
libp2p/protocols/pubsub/gossipsub
import
../../waku/waku_core,
../../waku/waku_node,
./testlib/wakucore,
./testlib/wakunode
procSuite "Relay (GossipSub) Peer Exchange":
asyncTest "Mount relay without peer exchange handler":
# Given two nodes
let
listenAddress = parseIpAddress("0.0.0.0")
port = Port(0)
node1Key = generateSecp256k1Key()
node1 = newTestWakuNode(node1Key, listenAddress, port)
node2Key = generateSecp256k1Key()
node2 =
newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true)
# When both client and server mount relay without a handler
await node1.mountRelay(@[DefaultPubsubTopic])
await node2.mountRelay(@[DefaultPubsubTopic], none(RoutingRecordsHandler))
# Then the relays are mounted without a handler
check:
node1.wakuRelay.parameters.enablePX == false
node1.wakuRelay.routingRecordsHandler.len == 0
node2.wakuRelay.parameters.enablePX == false
node2.wakuRelay.routingRecordsHandler.len == 0
asyncTest "Mount relay with peer exchange handler":
## Given three nodes
# Create nodes and ENR. These will be added to the discoverable list
let
bindIp = parseIpAddress("0.0.0.0")
port = Port(0)
nodeKey1 = generateSecp256k1Key()
node1 = newTestWakuNode(nodeKey1, bindIp, port)
nodeKey2 = generateSecp256k1Key()
node2 = newTestWakuNode(nodeKey2, bindIp, port, sendSignedPeerRecord = true)
nodeKey3 = generateSecp256k1Key()
node3 = newTestWakuNode(nodeKey3, bindIp, port, sendSignedPeerRecord = true)
# Given some peer exchange handlers
proc emptyPeerExchangeHandler(
peer: PeerId, topic: string, peers: seq[RoutingRecordsPair]
) {.gcsafe.} =
discard
var completionFut = newFuture[bool]()
proc peerExchangeHandler(
peer: PeerId, topic: string, peers: seq[RoutingRecordsPair]
) {.gcsafe.} =
## Handle peers received via gossipsub peer exchange
let peerRecords = peers.mapIt(it.record.get())
check:
# Node 3 is informed of node 2 via peer exchange
peer == node1.switch.peerInfo.peerId
topic == DefaultPubsubTopic
peerRecords.countIt(it.peerId == node2.switch.peerInfo.peerId) == 1
if (not completionFut.completed()):
completionFut.complete(true)
let
emptyPeerExchangeHandle: RoutingRecordsHandler = emptyPeerExchangeHandler
peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler
# Givem the nodes mount relay with a peer exchange handler
await node1.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle))
await node2.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle))
await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandle))
# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1
await allFutures([node1.start(), node2.start(), node3.start()])
# When nodes are connected
await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()])
await node3.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()])
# Verify that the handlePeerExchange was called (node3)
check:
(await completionFut.withTimeout(5.seconds)) == true
# Clean up
await allFutures([node1.stop(), node2.stop(), node3.stop()])

View File

@ -1,316 +0,0 @@
{.used.}
import
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
libp2p/multistream,
libp2p/muxers/muxer,
eth/keys,
eth/p2p/discoveryv5/enr
import
../../waku/waku_node,
../../waku/node/peer_manager,
../../waku/waku_discv5,
../../waku/waku_peer_exchange,
../../waku/waku_peer_exchange/rpc,
../../waku/waku_peer_exchange/rpc_codec,
../../waku/waku_peer_exchange/protocol,
./testlib/wakucore,
./testlib/wakunode
# TODO: Extend test coverage
procSuite "Waku Peer Exchange":
asyncTest "encode and decode peer exchange response":
## Setup
var
enr1 = enr.Record(seqNum: 0, raw: @[])
enr2 = enr.Record(seqNum: 0, raw: @[])
check enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw")
check enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq")
let peerInfos = @[
PeerExchangePeerInfo(enr: enr1.raw),
PeerExchangePeerInfo(enr: enr2.raw),
]
var rpc = PeerExchangeRpc(
response: PeerExchangeResponse(
peerInfos: peerInfos
)
)
## When
let
rpcBuffer: seq[byte] = rpc.encode().buffer
res = PeerExchangeRpc.decode(rpcBuffer)
## Then
check:
res.isOk
res.get().response.peerInfos == peerInfos
## When
var
resEnr1 = enr.Record(seqNum: 0, raw: @[])
resEnr2 = enr.Record(seqNum: 0, raw: @[])
discard resEnr1.fromBytes(res.get().response.peerInfos[0].enr)
discard resEnr2.fromBytes(res.get().response.peerInfos[1].enr)
## Then
check:
resEnr1 == enr1
resEnr2 == enr2
asyncTest "retrieve and provide peer exchange peers from discv5":
## Given (copied from test_waku_discv5.nim)
let
# todo: px flag
flags = CapabilitiesBitfield.init(
lightpush = false,
filter = false,
store = false,
relay = true
)
bindIp = parseIpAddress("0.0.0.0")
extIp = parseIpAddress("127.0.0.1")
nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000)
node1 = newTestWakuNode(
nodeKey1,
bindIp,
nodeTcpPort1,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1)
)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode(nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2)
)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 = newTestWakuNode(nodeKey3,
bindIp,
nodeTcpPort3,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3)
)
# discv5
let conf1 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true
)
let disc1 = WakuDiscoveryV5.new(
node1.rng,
conf1,
some(node1.enr),
some(node1.peerManager),
)
let conf2 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort2,
privateKey: keys.PrivateKey(nodeKey2.skkey),
bootstrapRecords: @[disc1.protocol.getRecord()],
autoupdateRecord: true
)
let disc2 = WakuDiscoveryV5.new(
node2.rng,
conf2,
some(node2.enr),
some(node2.peerManager),
)
await allFutures(node1.start(), node2.start(), node3.start())
let resultDisc1StartRes = await disc1.start()
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
## When
var attempts = 10
while (disc1.protocol.nodesDiscovered < 1 or
disc2.protocol.nodesDiscovered < 1) and
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
# node2 can be connected, so will be returned by peer exchange
require (await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo()))
# Mount peer exchange
await node1.mountPeerExchange()
await node3.mountPeerExchange()
var peerInfosLen = 0
var response: WakuPeerExchangeResult[PeerExchangeResponse]
attempts = 10
while peerInfosLen == 0 and attempts > 0:
var connOpt = await node3.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
require connOpt.isSome
response = await node3.wakuPeerExchange.request(1, connOpt.get())
require response.isOk
peerInfosLen = response.get().peerInfos.len
await sleepAsync(1.seconds)
attempts -= 1
## Then
check:
response.get().peerInfos.len == 1
response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw
await allFutures([node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()])
asyncTest "peer exchange request functions returns some discovered peers":
let
node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
require:
connOpt.isSome
# Create some enr and add to peer exchange (sumilating disv5)
var enr1, enr2 = enr.Record()
check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
check enr2.fromUri("enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB")
# Mock that we have discovered these enrs
node1.wakuPeerExchange.enrCache.add(enr1)
node1.wakuPeerExchange.enrCache.add(enr2)
# Request 2 peer from px. Test all request variants
let response1 = await node2.wakuPeerExchange.request(2)
let response2 = await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo())
let response3 = await node2.wakuPeerExchange.request(2, connOpt.get())
# Check the response or dont even continue
require:
response1.isOk
response2.isOk
response3.isOk
check:
response1.get().peerInfos.len == 2
response2.get().peerInfos.len == 2
response3.get().peerInfos.len == 2
# Since it can return duplicates test that at least one of the enrs is in the response
response1.get().peerInfos.anyIt(it.enr == enr1.raw) or response1.get().peerInfos.anyIt(it.enr == enr2.raw)
response2.get().peerInfos.anyIt(it.enr == enr1.raw) or response2.get().peerInfos.anyIt(it.enr == enr2.raw)
response3.get().peerInfos.anyIt(it.enr == enr1.raw) or response3.get().peerInfos.anyIt(it.enr == enr2.raw)
asyncTest "peer exchange handler works as expected":
let
node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Mock that we have discovered these enrs
var enr1 = enr.Record()
check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
node1.wakuPeerExchange.enrCache.add(enr1)
# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
require connOpt.isSome
let conn = connOpt.get()
# Send bytes so that they directly hit the handler
let rpc = PeerExchangeRpc(
request: PeerExchangeRequest(numPeers: 1))
var buffer: seq[byte]
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)
# Decode the response
let decodedBuff = PeerExchangeRpc.decode(buffer)
require decodedBuff.isOk
# Check we got back the enr we mocked
check:
decodedBuff.get().response.peerInfos.len == 1
decodedBuff.get().response.peerInfos[0].enr == enr1.raw
asyncTest "peer exchange request fails gracefully":
let
node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Create connection
let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
require connOpt.isSome
# Force closing the connection to simulate a failed peer
await connOpt.get().close()
# Request 2 peer from px
let response = await node1.wakuPeerExchange.request(2, connOpt.get())
# Check that it failed gracefully
check: response.isErr
asyncTest "connections are closed after response is sent":
# Create 3 nodes
let nodes = toSeq(0..<3).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountPeerExchange()))
# Multiple nodes request to node 0
for i in 1..<3:
let resp = await nodes[i].wakuPeerExchange.request(2, nodes[0].switch.peerInfo.toRemotePeerInfo())
require resp.isOk
# Wait for streams to be closed
await sleepAsync(1.seconds)
# Check that all streams are closed for px
check:
nodes[0].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
nodes[1].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
nodes[2].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)

View File

@ -344,29 +344,3 @@ suite "WakuNode":
node1MultiAddrs.contains(expectedMultiaddress1)
await allFutures(node1.stop(), node2.stop())
asyncTest "Function fetchPeerExchangePeers succesfully exchanges px peers":
let
node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Mock that we discovered a node (to avoid running discv5)
var enr = enr.Record()
require enr.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
node2.wakuPeerExchange.enrCache.add(enr)
# Set node2 as service peer (default one) for px protocol
node1.peerManager.addServicePeer(node2.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
# Request 1 peer from peer exchange protocol
await node1.fetchPeerExchangePeers(1)
# Check that the peer ended up in the peerstore
let rpInfo = enr.toRemotePeerInfo.get()
check:
node1.peerManager.peerStore.peers.anyIt(it.peerId == rpInfo.peerId)
node1.peerManager.peerStore.peers.anyIt(it.addrs == rpInfo.addrs)

View File

@ -5,6 +5,7 @@ import ../../../waku/[waku_core/message, waku_store]
const
FUTURE_TIMEOUT* = 1.seconds
FUTURE_TIMEOUT_LONG* = 10.seconds
FUTURE_TIMEOUT_SHORT* = 100.milliseconds
proc newPushHandlerFuture*(): Future[(string, WakuMessage)] =
newFuture[(string, WakuMessage)]()
@ -31,6 +32,8 @@ proc toResult*(future: Future[void]): Result[void, string] =
else:
return chronos.err("Future finished but failed.")
proc waitForResult*[T](future: Future[T], timeout = FUTURE_TIMEOUT): Future[Result[T, string]] {.async.} =
proc waitForResult*[T](
future: Future[T], timeout = FUTURE_TIMEOUT
): Future[Result[T, string]] {.async.} =
discard await future.withTimeout(timeout)
return future.toResult()

View File

@ -0,0 +1 @@
import ./test_protocol, ./test_rpc_codec

View File

@ -0,0 +1,394 @@
{.used.}
import
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/[switch, peerId, crypto/crypto, multistream, muxers/muxer],
eth/[keys, p2p/discoveryv5/enr]
import
../../../waku/[
waku_node,
node/peer_manager,
waku_discv5,
waku_peer_exchange,
waku_peer_exchange/rpc,
waku_peer_exchange/rpc_codec,
waku_peer_exchange/protocol,
node/peer_manager,
waku_relay/protocol,
waku_relay,
waku_core,
waku_core/message/codec
],
../testlib/[wakucore, wakunode, simple_mock, assertions],
./utils.nim
suite "Waku Peer Exchange":
# Some of this tests use node.wakuPeerExchange instead of just a standalone PeerExchange.
# This is because attempts to connect the switches for two standalones PeerExchanges failed.
# TODO: Try to make the tests work with standalone PeerExchanges
suite "request":
asyncTest "Retrieve and provide peer exchange peers from discv5":
## Given (copied from test_waku_discv5.nim)
let
# todo: px flag
flags =
CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
bindIp = parseIpAddress("0.0.0.0")
extIp = parseIpAddress("127.0.0.1")
nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000)
node1 =
newTestWakuNode(
nodeKey1,
bindIp,
nodeTcpPort1,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1),
)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 =
newTestWakuNode(
nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2),
)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 =
newTestWakuNode(
nodeKey3,
bindIp,
nodeTcpPort3,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3),
)
# discv5
let
conf1 =
WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true,
)
let
disc1 =
WakuDiscoveryV5.new(
node1.rng, conf1, some(node1.enr), some(node1.peerManager)
)
let
conf2 =
WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort2,
privateKey: keys.PrivateKey(nodeKey2.skkey),
bootstrapRecords: @[disc1.protocol.getRecord()],
autoupdateRecord: true,
)
let
disc2 =
WakuDiscoveryV5.new(
node2.rng, conf2, some(node2.enr), some(node2.peerManager)
)
await allFutures(node1.start(), node2.start(), node3.start())
let resultDisc1StartRes = await disc1.start()
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
## When
var attempts = 10
while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
# node2 can be connected, so will be returned by peer exchange
require (
await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo())
)
# Mount peer exchange
await node1.mountPeerExchange()
await node3.mountPeerExchange()
let
dialResponse =
await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())
let response = dialResponse.get()
## Then
check:
response.get().peerInfos.len == 1
response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw
await allFutures(
[node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]
)
asyncTest "Request returns some discovered peers":
let
node1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Create connection
let
connOpt =
await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
)
require:
connOpt.isSome
# Create some enr and add to peer exchange (simulating disv5)
var enr1, enr2 = enr.Record()
check enr1.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
check enr2.fromUri(
"enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB"
)
# Mock that we have discovered these enrs
node1.wakuPeerExchange.enrCache.add(enr1)
node1.wakuPeerExchange.enrCache.add(enr2)
# Request 2 peer from px. Test all request variants
let response1 = await node2.wakuPeerExchange.request(2)
let
response2 =
await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo())
let response3 = await node2.wakuPeerExchange.request(2, connOpt.get())
# Check the response or dont even continue
require:
response1.isOk
response2.isOk
response3.isOk
check:
response1.get().peerInfos.len == 2
response2.get().peerInfos.len == 2
response3.get().peerInfos.len == 2
# Since it can return duplicates test that at least one of the enrs is in the response
response1.get().peerInfos.anyIt(it.enr == enr1.raw) or
response1.get().peerInfos.anyIt(it.enr == enr2.raw)
response2.get().peerInfos.anyIt(it.enr == enr1.raw) or
response2.get().peerInfos.anyIt(it.enr == enr2.raw)
response3.get().peerInfos.anyIt(it.enr == enr1.raw) or
response3.get().peerInfos.anyIt(it.enr == enr2.raw)
asyncTest "Request fails gracefully":
let
node1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Create connection
let
connOpt =
await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
)
require connOpt.isSome
# Force closing the connection to simulate a failed peer
await connOpt.get().close()
# Request 2 peer from px
let response = await node1.wakuPeerExchange.request(2, connOpt.get())
# Check that it failed gracefully
check:
response.isErr
asyncTest "Request 0 peers, with 0 peers in PeerExchange":
# Given a disconnected PeerExchange
let
switch = newTestSwitch()
peerManager = PeerManager.new(switch)
peerExchange = WakuPeerExchange.new(peerManager)
# When requesting 0 peers
let response = await peerExchange.request(0)
# Then the response should be an error
check:
response.isErr
response.error == "peer_not_found_failure"
asyncTest "Request 0 peers, with 1 peer in PeerExchange":
# Given two valid nodes with PeerExchange
let
node1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Connect the nodes
let
dialResponse =
await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
)
assert dialResponse.isSome
# Mock that we have discovered one enr
var record = enr.Record()
check record.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
node1.wakuPeerExchange.enrCache.add(record)
# When requesting 0 peers
let response = await node1.wakuPeerExchange.request(0)
# Then the response should be empty
assertResultOk(response)
check response.get().peerInfos.len == 0
asyncTest "Request with invalid peer info":
# Given two valid nodes with PeerExchange
let
node1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Mock that we have discovered one enr
var record = enr.Record()
check record.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
node1.wakuPeerExchange.enrCache.add(record)
# When making any request with an invalid peer info
var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo()
remotePeerInfo2.peerId.data.add(255.byte)
let response = await node1.wakuPeerExchange.request(1, remotePeerInfo2)
# Then the response should be an error
check:
response.isErr
response.error == "dial_failure"
asyncTest "Connections are closed after response is sent":
# Create 3 nodes
let
nodes =
toSeq(0..<3).mapIt(
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
)
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountPeerExchange()))
# Multiple nodes request to node 0
for i in 1..<3:
let
resp =
await nodes[i].wakuPeerExchange.request(
2, nodes[0].switch.peerInfo.toRemotePeerInfo()
)
require resp.isOk
# Wait for streams to be closed
await sleepAsync(1.seconds)
# Check that all streams are closed for px
check:
nodes[0].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
nodes[1].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
nodes[2].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
suite "Protocol Handler":
asyncTest "Works as expected":
let
node1 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
node2 =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
# Start and mount peer exchange
await allFutures([node1.start(), node2.start()])
await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])
# Mock that we have discovered these enrs
var enr1 = enr.Record()
check enr1.fromUri(
"enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB"
)
node1.wakuPeerExchange.enrCache.add(enr1)
# Create connection
let
connOpt =
await node2.peerManager.dialPeer(
node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec
)
require connOpt.isSome
let conn = connOpt.get()
# Send bytes so that they directly hit the handler
let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: 1))
var buffer: seq[byte]
await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int)
# Decode the response
let decodedBuff = PeerExchangeRpc.decode(buffer)
require decodedBuff.isOk
# Check we got back the enr we mocked
check:
decodedBuff.get().response.peerInfos.len == 1
decodedBuff.get().response.peerInfos[0].enr == enr1.raw

View File

@ -0,0 +1,64 @@
{.used.}
import
std/[options],
testutils/unittests,
chronos,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr
import
../../../waku/[
node/peer_manager,
waku_discv5,
waku_peer_exchange/rpc,
waku_peer_exchange/rpc_codec
],
../testlib/[wakucore]
suite "Peer Exchange RPC":
asyncTest "Encode - Decode":
# Setup
var
enr1 = enr.Record(seqNum: 0, raw: @[])
enr2 = enr.Record(seqNum: 0, raw: @[])
check:
enr1.fromUri(
"enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw"
)
enr2.fromUri(
"enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq"
)
let
peerInfos =
@[PeerExchangePeerInfo(enr: enr1.raw), PeerExchangePeerInfo(enr: enr2.raw)]
rpc = PeerExchangeRpc(response: PeerExchangeResponse(peerInfos: peerInfos))
# When encoding and decoding
let
rpcBuffer: seq[byte] = rpc.encode().buffer
res = PeerExchangeRpc.decode(rpcBuffer)
# Then the peerInfos match the originals
check:
res.isOk
res.get().response.peerInfos == peerInfos
# When using the decoded responses to create new enrs
var
resEnr1 = enr.Record(seqNum: 0, raw: @[])
resEnr2 = enr.Record(seqNum: 0, raw: @[])
discard resEnr1.fromBytes(res.get().response.peerInfos[0].enr)
discard resEnr2.fromBytes(res.get().response.peerInfos[1].enr)
# Then they match the original enrs
check:
resEnr1 == enr1
resEnr2 == enr2

View File

@ -0,0 +1,51 @@
{.used.}
import
std/options,
testutils/unittests,
chronos,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr
import
../../../waku/[
waku_node,
waku_discv5,
waku_peer_exchange,
waku_peer_exchange/rpc,
waku_peer_exchange/protocol,
node/peer_manager,
waku_core
],
../testlib/[futures, wakucore, assertions]
proc dialForPeerExchange*(
client: WakuNode,
peerInfo: RemotePeerInfo,
requestedPeers: uint64 = 1,
minimumPeers: uint64 = 0,
attempts: uint64 = 100,
): Future[Result[WakuPeerExchangeResult[PeerExchangeResponse], string]] {.async.} =
# Dials a peer and awaits until it's able to receive a peer exchange response
# For the test, the relevant part is the dialPeer call.
# But because the test needs peers, and due to the asynchronous nature of the dialing,
# we await until we receive peers from the peer exchange protocol.
var attempts = attempts
while attempts > 0:
let connOpt = await client.peerManager.dialPeer(peerInfo, WakuPeerExchangeCodec)
require connOpt.isSome()
await sleepAsync(FUTURE_TIMEOUT_SHORT)
let response = await client.wakuPeerExchange.request(requestedPeers, connOpt.get())
assertResultOk(response)
if uint64(response.get().peerInfos.len) > minimumPeers:
return ok(response)
attempts -= 1
return err("Attempts exhausted.")

View File

@ -333,7 +333,9 @@ proc startNode*(node: WakuNode, conf: WakuNodeConf,
# retrieve px peers and add the to the peer store
if conf.peerExchangeNode != "":
let desiredOutDegree = node.wakuRelay.parameters.d.uint64()
await node.fetchPeerExchangePeers(desiredOutDegree)
(await node.fetchPeerExchangePeers(desiredOutDegree)).isOkOr:
error "error while fetching peers from peer exchange", error = error
quit(QuitFailure)
# Start keepalive, if enabled
if conf.keepAlive:

View File

@ -450,7 +450,7 @@ proc filterHandleMessage*(node: WakuNode,
{.async.}=
if node.wakuFilter.isNil() or node.wakuFilterLegacy.isNil():
error "cannot handle filter message", error="waku filter is nil"
error "cannot handle filter message", error = "waku filter and waku filter legacy are both required"
return
await allFutures(node.wakuFilter.handleMessage(pubsubTopic, message),
@ -1022,10 +1022,12 @@ proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} =
node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec))
proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [Defect].} =
proc fetchPeerExchangePeers*(
node: Wakunode, amount: uint64
): Future[Result[int, string]] {.async, raises: [Defect].} =
if node.wakuPeerExchange.isNil():
error "could not get peers from px, waku peer-exchange is nil"
return
return err("PeerExchange is not mounted")
info "Retrieving peer info via peer exchange protocol"
let pxPeersRes = await node.wakuPeerExchange.request(amount)
@ -1035,14 +1037,18 @@ proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [D
for pi in peers:
var record: enr.Record
if enr.fromBytes(record, pi.enr):
node.peerManager.addPeer(record.toRemotePeerInfo().get, PeerExcahnge)
node.peerManager.addPeer(record.toRemotePeerInfo().get, PeerExchange)
validPeers += 1
info "Retrieved peer info via peer exchange protocol", validPeers = validPeers, totalPeers = peers.len
info "Retrieved peer info via peer exchange protocol",
validPeers = validPeers, totalPeers = peers.len
return ok(validPeers)
else:
warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error
warn "failed to retrieve peer info via peer exchange protocol",
error = pxPeersRes.error
return err("Peer exchange failure: " & $pxPeersRes.error)
# TODO: Move to application module (e.g., wakunode2.nim)
proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) =
proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo | MultiAddress | string) =
if node.wakuPeerExchange.isNil():
error "could not set peer, waku peer-exchange is nil"
return
@ -1054,7 +1060,7 @@ proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) =
error "could not parse peer info", error = remotePeerRes.error
return
node.peerManager.addPeer(remotePeerRes.value, WakuPeerExchangeCodec)
node.peerManager.addPeer(remotePeerRes.value, PeerExchange)
waku_px_peers.inc()

View File

@ -35,7 +35,7 @@ type
UnknownOrigin,
Discv5,
Static,
PeerExcahnge,
PeerExchange,
Dns
PeerDirection* = enum