nwaku/tests/v2/test_waku_peer_exchange.nim
Hanno Cornelius 21737c7c1b
feat(discv5): added find random nodes with predicate (#1762) (#1763)
* feat(discv5): added find random nodes with predicate (#1762)

Co-authored-by: Lorenzo Delgado <lnsdev@proton.me>

* chore: play around with sleep times on tests

* chore: simplify flaky tests

* chore: update waku/v2/node/waku_node.nim

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>

* chore: update tests/v2/test_waku_discv5.nim

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>

* chore: update examples/v2/subscriber.nim

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>

---------

Co-authored-by: Lorenzo Delgado <lnsdev@proton.me>
Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>
2023-06-06 16:36:20 +02:00

287 lines
10 KiB
Nim
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

{.used.}
import
std/[options, sequtils, tables],
testutils/unittests,
chronos,
chronicles,
stew/shims/net,
libp2p/switch,
libp2p/peerId,
libp2p/crypto/crypto,
libp2p/multistream,
libp2p/muxers/muxer,
eth/keys,
eth/p2p/discoveryv5/enr
import
../../waku/v2/waku_node,
../../waku/v2/node/peer_manager,
../../waku/v2/waku_discv5,
../../waku/v2/waku_peer_exchange,
../../waku/v2/waku_peer_exchange/rpc,
../../waku/v2/waku_peer_exchange/rpc_codec,
../../waku/v2/waku_peer_exchange/protocol,
./testlib/wakucore,
./testlib/wakunode
# TODO: Extend test coverage
procSuite "Waku Peer Exchange":
asyncTest "encode and decode peer exchange response":
  ## Round-trips a `PeerExchangeRpc` response holding two ENRs through
  ## `encode`/`decode`, then rebuilds the records from the decoded raw bytes
  ## and compares them with the originals.

  ## Setup
  var
    enr1 = enr.Record(seqNum: 0, raw: @[])
    enr2 = enr.Record(seqNum: 0, raw: @[])

  check enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw")
  check enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq")

  let peerInfos = @[
    PeerExchangePeerInfo(enr: enr1.raw),
    PeerExchangePeerInfo(enr: enr2.raw),
  ]

  # The RPC is never mutated after construction, so `let` is enough.
  let rpc = PeerExchangeRpc(
    response: PeerExchangeResponse(
      peerInfos: peerInfos
    )
  )

  ## When
  let
    rpcBuffer: seq[byte] = rpc.encode().buffer
    res = PeerExchangeRpc.decode(rpcBuffer)

  ## Then
  # Abort on a decode failure: everything below calls `res.get()`.
  require res.isOk

  let decodedInfos = res.get().response.peerInfos
  # Guard the indexed accesses further down.
  require decodedInfos.len == peerInfos.len
  check decodedInfos == peerInfos

  ## When
  var
    resEnr1 = enr.Record(seqNum: 0, raw: @[])
    resEnr2 = enr.Record(seqNum: 0, raw: @[])

  # Report a failed ENR decode directly instead of discarding the result.
  check resEnr1.fromBytes(decodedInfos[0].enr)
  check resEnr2.fromBytes(decodedInfos[1].enr)

  ## Then
  check:
    resEnr1 == enr1
    resEnr2 == enr2
asyncTest "retrieve and provide peer exchange peers from discv5":
  ## node1 and node2 run discv5 (node2 bootstraps from node1). node3 only
  ## mounts peer exchange and asks node1 for one peer; the test expects the
  ## answer to be node2's ENR.

  ## Setup (copied from test_waku_discv5.nim)
  let
    bindIp = ValidIpAddress.init("0.0.0.0")
    extIp = ValidIpAddress.init("127.0.0.1")

    nodeKey1 = generateSecp256k1Key()
    nodeTcpPort1 = Port(64010)
    nodeUdpPort1 = Port(9000)
    node1 = newTestWakuNode(nodeKey1, bindIp, nodeTcpPort1)

    nodeKey2 = generateSecp256k1Key()
    nodeTcpPort2 = Port(64012)
    nodeUdpPort2 = Port(9002)
    node2 = newTestWakuNode(nodeKey2, bindIp, nodeTcpPort2)

    nodeKey3 = generateSecp256k1Key()
    nodeTcpPort3 = Port(64014)
    nodeUdpPort3 = Port(9004)
    node3 = newTestWakuNode(nodeKey3, bindIp, nodeTcpPort3)

    # todo: px flag
    flags = CapabilitiesBitfield.init(
      lightpush = false,
      filter = false,
      store = false,
      relay = true
    )

  # Mount discv5
  node1.wakuDiscv5 = WakuDiscoveryV5.new(
    some(extIp), some(nodeTcpPort1), some(nodeUdpPort1),
    bindIp,
    nodeUdpPort1,
    newSeq[enr.Record](),
    false,
    keys.PrivateKey(nodeKey1.skkey),
    flags,
    newSeq[MultiAddress](), # Empty multiaddr fields, for now
    node1.rng
  )

  node2.wakuDiscv5 = WakuDiscoveryV5.new(
    some(extIp), some(nodeTcpPort2), some(nodeUdpPort2),
    bindIp,
    nodeUdpPort2,
    @[node1.wakuDiscv5.protocol.localNode.record], # Bootstrap with node1
    false,
    keys.PrivateKey(nodeKey2.skkey),
    flags,
    newSeq[MultiAddress](), # Empty multiaddr fields, for now
    node2.rng
  )

  ## Given
  await allFutures(node1.start(), node2.start(), node3.start())
  await allFutures(node1.startDiscv5(), node2.startDiscv5())

  # Poll for up to ~10 seconds until both discv5 nodes report a discovery.
  var attempts = 10
  while (node1.wakuDiscv5.protocol.nodesDiscovered < 1 or
        node2.wakuDiscv5.protocol.nodesDiscovered < 1) and
        attempts > 0:
    await sleepAsync(1.seconds)
    attempts -= 1

  # node2 can be connected, so will be returned by peer exchange
  require (await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo()))

  # Mount peer exchange
  await node1.mountPeerExchange()
  await node3.mountPeerExchange()

  var peerInfosLen = 0
  var response: WakuPeerExchangeResult[PeerExchangeResponse]
  attempts = 10
  # Retry the request on a fresh stream until node1 has a peer to hand out.
  while peerInfosLen == 0 and attempts > 0:
    let connOpt = await node3.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
    require connOpt.isSome

    response = await node3.wakuPeerExchange.request(1, connOpt.get())
    require response.isOk

    peerInfosLen = response.get().peerInfos.len
    attempts -= 1
    # Back off only when another attempt is actually going to happen.
    if peerInfosLen == 0 and attempts > 0:
      await sleepAsync(1.seconds)

  # `require` the length first so indexing `[0]` cannot raise on an empty
  # response; the test fails cleanly instead.
  require response.get().peerInfos.len == 1
  check response.get().peerInfos[0].enr == node2.wakuDiscv5.protocol.localNode.record.raw

  await allFutures([node1.stop(), node2.stop(), node3.stop()])
asyncTest "peer exchange request functions returns some discovered peers":
  ## Seeds node1's peer-exchange ENR cache with two records and exercises
  ## the three `request` variants from node2 (no target, remote peer info,
  ## existing connection).
  let
    node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
    node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))

  # Start and mount peer exchange
  await allFutures([node1.start(), node2.start()])
  await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])

  # Create connection
  let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
  require:
    connOpt.isSome

  # Create some enr and add to peer exchange (simulating discv5)
  var enr1, enr2 = enr.Record()
  check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
  check enr2.fromUri("enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB")

  # Mock that we have discovered these enrs
  node1.wakuPeerExchange.enrCache.add(enr1)
  node1.wakuPeerExchange.enrCache.add(enr2)

  # Request 2 peers from px. Test all request variants
  let response1 = await node2.wakuPeerExchange.request(2)
  let response2 = await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo())
  let response3 = await node2.wakuPeerExchange.request(2, connOpt.get())

  # Check the responses or don't even continue
  require:
    response1.isOk
    response2.isOk
    response3.isOk

  check:
    response1.get().peerInfos.len == 2
    response2.get().peerInfos.len == 2
    response3.get().peerInfos.len == 2

    # Since it can return duplicates test that at least one of the enrs is in the response
    response1.get().peerInfos.anyIt(it.enr == enr1.raw) or response1.get().peerInfos.anyIt(it.enr == enr2.raw)
    response2.get().peerInfos.anyIt(it.enr == enr1.raw) or response2.get().peerInfos.anyIt(it.enr == enr2.raw)
    response3.get().peerInfos.anyIt(it.enr == enr1.raw) or response3.get().peerInfos.anyIt(it.enr == enr2.raw)

  # Shut the nodes down so they do not outlive this test.
  await allFutures([node1.stop(), node2.stop()])
asyncTest "peer exchange handler works as expected":
  ## Writes a length-prefixed `PeerExchangeRequest` straight onto a stream
  ## dialed from node2 to node1 and checks that the decoded reply carries
  ## the single ENR placed in node1's cache.
  let
    node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
    node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))

  # Start and mount peer exchange
  await allFutures([node1.start(), node2.start()])
  await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])

  # Mock that we have discovered these enrs
  var enr1 = enr.Record()
  check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB")
  node1.wakuPeerExchange.enrCache.add(enr1)

  # Create connection
  let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
  require connOpt.isSome
  let conn = connOpt.get()

  # Send bytes so that they directly hit the handler
  let rpc = PeerExchangeRpc(
    request: PeerExchangeRequest(numPeers: 1))

  await conn.writeLP(rpc.encode().buffer)
  # Bound to the read result directly; no mutable placeholder is needed.
  let buffer = await conn.readLp(MaxRpcSize.int)

  # Decode the response
  let decodedBuff = PeerExchangeRpc.decode(buffer)
  require decodedBuff.isOk

  # Check we got back the enr we mocked. The length is `require`d first so
  # that indexing `[0]` cannot raise on an empty response.
  let receivedInfos = decodedBuff.get().response.peerInfos
  require receivedInfos.len == 1
  check receivedInfos[0].enr == enr1.raw

  # Shut the nodes down so they do not outlive this test.
  await allFutures([node1.stop(), node2.stop()])
asyncTest "peer exchange request fails gracefully":
  ## Closes a dialed peer-exchange stream before using it and expects
  ## `request` to return an error result on that stream.
  let
    node1 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))
    node2 = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))

  # Start and mount peer exchange
  await allFutures([node1.start(), node2.start()])
  await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()])

  # Create connection
  let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec)
  require connOpt.isSome

  # Force closing the connection to simulate a failed peer
  await connOpt.get().close()

  # Request 2 peers from px
  # NOTE(review): the stream was dialed by node2 but the request is issued
  # through node1's protocol instance; the other tests use node2 here.
  # Confirm whether this is intentional.
  let response = await node1.wakuPeerExchange.request(2, connOpt.get())

  # Check that it failed gracefully
  check response.isErr

  # Shut the nodes down so they do not outlive this test.
  await allFutures([node1.stop(), node2.stop()])
asyncTest "connections are closed after response is sent":
  ## Has every node except the first request peers from the first one, then
  ## checks that no node reports open peer-exchange streams.

  # Create 3 nodes
  let nodes = toSeq(0..<3).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))

  await allFutures(nodes.mapIt(it.start()))
  await allFutures(nodes.mapIt(it.mountPeerExchange()))

  # Multiple nodes request to node 0. The bound follows `nodes.len` so the
  # node count is stated in one place only.
  for i in 1..<nodes.len:
    let resp = await nodes[i].wakuPeerExchange.request(2, nodes[0].switch.peerInfo.toRemotePeerInfo())
    require resp.isOk

  # Wait for streams to be closed
  await sleepAsync(1.seconds)

  # Check that all streams are closed for px
  for node in nodes:
    check node.peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)

  # Shut the nodes down so they do not outlive this test.
  await allFutures(nodes.mapIt(it.stop()))