From f436240d53cc9082558a41df48f3130d2b3b617b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20Cabeza=20Romero?= Date: Thu, 14 Mar 2024 17:48:09 +0100 Subject: [PATCH] test(peer-exchange): Implement peer exchange tests (#2464) * Implement peer exchange tests. * Refactor, and remove duplicated tests. * feat(wakunode): Resultify fetch peer exchange peers (#2486) --- tests/all_tests_waku.nim | 10 +- tests/node/test_all.nim | 1 + tests/node/test_wakunode_filter.nim | 1 + tests/node/test_wakunode_peer_exchange.nim | 307 +++++++++++++++ tests/node/test_wakunode_store.nim | 119 +----- tests/test_peer_exchange.nim | 73 ---- tests/test_relay_peer_exchange.nim | 100 +++++ tests/test_waku_peer_exchange.nim | 316 ---------------- tests/test_wakunode.nim | 26 -- tests/testlib/futures.nim | 5 +- tests/waku_peer_exchange/test_all.nim | 1 + tests/waku_peer_exchange/test_protocol.nim | 394 ++++++++++++++++++++ tests/waku_peer_exchange/test_rpc_codec.nim | 64 ++++ tests/waku_peer_exchange/utils.nim | 51 +++ waku/factory/node_factory.nim | 4 +- waku/node/waku_node.nim | 22 +- waku/waku_core/peers.nim | 2 +- 17 files changed, 954 insertions(+), 542 deletions(-) create mode 100644 tests/node/test_wakunode_peer_exchange.nim delete mode 100644 tests/test_peer_exchange.nim create mode 100644 tests/test_relay_peer_exchange.nim delete mode 100644 tests/test_waku_peer_exchange.nim create mode 100644 tests/waku_peer_exchange/test_all.nim create mode 100644 tests/waku_peer_exchange/test_protocol.nim create mode 100644 tests/waku_peer_exchange/test_rpc_codec.nim create mode 100644 tests/waku_peer_exchange/utils.nim diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 70261b2f1..5989eb666 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -38,7 +38,12 @@ when defined(waku_exp_store_resume): # TODO: Review store resume test cases (#1282) import ./waku_store/test_resume -import ./waku_relay/test_all, ./waku_filter_v2/test_all, ./waku_lightpush/test_all +import + ./node/test_all, + ./waku_filter_v2/test_all, + ./waku_peer_exchange/test_all, + ./waku_lightpush/test_all, + ./waku_relay/test_all import # Waku v2 tests @@ -47,7 +52,6 @@ import # Waku Filter ./test_waku_filter_legacy, ./test_wakunode_filter_legacy, - ./test_waku_peer_exchange, ./test_peer_store_extended, ./test_message_cache, ./test_peer_manager, @@ -56,7 +60,7 @@ import ./test_waku_enr, ./test_waku_dnsdisc, ./test_waku_discv5, - ./test_peer_exchange, + ./test_relay_peer_exchange, ./test_waku_noise, ./test_waku_noise_sessions, ./test_waku_netconfig, diff --git a/tests/node/test_all.nim b/tests/node/test_all.nim index d8a9685de..6ef93f57e 100644 --- a/tests/node/test_all.nim +++ b/tests/node/test_all.nim @@ -1,4 +1,5 @@ import ./test_wakunode_filter, ./test_wakunode_lightpush, + ./test_wakunode_peer_exchange, ./test_wakunode_store diff --git a/tests/node/test_wakunode_filter.nim b/tests/node/test_wakunode_filter.nim index 383b3881e..771f2985b 100644 --- a/tests/node/test_wakunode_filter.nim +++ b/tests/node/test_wakunode_filter.nim @@ -56,6 +56,7 @@ suite "Waku Filter - End to End": await allFutures(server.start(), client.start()) await server.mountFilter() + await server.mountLegacyFilter() await client.mountFilterClient() client.wakuFilterClient.registerPushHandler(messagePushHandler) diff --git a/tests/node/test_wakunode_peer_exchange.nim b/tests/node/test_wakunode_peer_exchange.nim new file mode 100644 index 000000000..1cb95e097 --- /dev/null +++ b/tests/node/test_wakunode_peer_exchange.nim @@ -0,0 +1,307 @@ +{.used.} + +import + std/[options, sequtils], + testutils/unittests, + chronos, + chronicles, + stew/shims/net, + libp2p/switch, + libp2p/peerId, + libp2p/crypto/crypto, + eth/keys, + eth/p2p/discoveryv5/enr + +import + ../../../waku/[ + waku_node, + waku_discv5, + waku_peer_exchange, + node/peer_manager, + waku_relay/protocol, + waku_core + ], + ../waku_peer_exchange/utils, + ../testlib/[wakucore, wakunode, testasync] + +suite "Waku Peer Exchange": + let + bindIp: IPAddress = parseIpAddress("0.0.0.0") + bindPort: Port = Port(0) + + var node {.threadvar.}: WakuNode + + suite "mountPeerExchange": + asyncSetup: + node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort) + + asyncTest "Started node mounts peer exchange": + # Given a started node without peer exchange mounted + await node.start() + check: + node.wakuPeerExchange == nil + + # When mounting peer exchange + await node.mountPeerExchange() + + # Then peer exchange is mounted + check: + node.wakuPeerExchange != nil + node.wakuPeerExchange.started == true + + # Cleanup + await node.stop() + + asyncTest "Stopped node mounts peer exchange": + # Given a stopped node without peer exchange mounted + check: + node.wakuPeerExchange == nil + + # When mounting peer exchange + await node.mountPeerExchange() + + # Then peer exchange is mounted + check: + node.wakuPeerExchange != nil + node.wakuPeerExchange.started == false + + suite "fetchPeerExchangePeers": + var node2 {.threadvar.}: WakuNode + + asyncSetup: + node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort) + node2 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort) + + await allFutures(node.start(), node2.start()) + + asyncTeardown: + await allFutures(node.stop(), node2.stop()) + + asyncTest "Node fetches without mounting peer exchange": + # When a node, without peer exchange mounted, fetches peers + let res = await node.fetchPeerExchangePeers(1) + + # Then no peers are fetched + check: + node.peerManager.peerStore.peers.len == 0 + res.error == "PeerExchange is not mounted" + + asyncTest "Node fetches with mounted peer exchange, but no peers": + # Given a node with peer exchange mounted + await node.mountPeerExchange() + + # When a node fetches peers + let res = await node.fetchPeerExchangePeers(1) + check res.error == "Peer exchange failure: peer_not_found_failure" + + # Then no peers are fetched + check node.peerManager.peerStore.peers.len == 0 + + asyncTest "Node succesfully exchanges px peers with faked discv5": + # Given both nodes mount peer exchange + await allFutures([node.mountPeerExchange(), node2.mountPeerExchange()]) + check node.peerManager.peerStore.peers.len == 0 + + # Mock that we discovered a node (to avoid running discv5) + var enr = enr.Record() + assert enr.fromUri( + "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" + ), "Failed to parse ENR" + node2.wakuPeerExchange.enrCache.add(enr) + + # Set node2 as service peer (default one) for px protocol + node.peerManager.addServicePeer( + node2.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec + ) + + # Request 1 peer from peer exchange protocol + let res = await node.fetchPeerExchangePeers(1) + check res.tryGet() == 1 + + # Check that the peer ended up in the peerstore + let rpInfo = enr.toRemotePeerInfo.get() + check: + node.peerManager.peerStore.peers.anyIt(it.peerId == rpInfo.peerId) + node.peerManager.peerStore.peers.anyIt(it.addrs == rpInfo.addrs) + + suite "setPeerExchangePeer": + var node2 {.threadvar.}: WakuNode + + asyncSetup: + node = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort) + node2 = newTestWakuNode(generateSecp256k1Key(), bindIp, bindPort) + + await allFutures(node.start(), node2.start()) + + asyncTeardown: + await allFutures(node.stop(), node2.stop()) + + asyncTest "peer set successfully": + # Given a node with peer exchange mounted + await node.mountPeerExchange() + let initialPeers = node.peerManager.peerStore.peers.len + + # And a valid peer info + let remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo() + + # When making a request with a valid peer info + node.setPeerExchangePeer(remotePeerInfo2) + + # Then the peer is added to the peer store + check: + node.peerManager.peerStore.peers.len == (initialPeers + 1) + + asyncTest "peer exchange not mounted": + # Given a node without peer exchange mounted + check node.wakuPeerExchange == nil + let initialPeers = node.peerManager.peerStore.peers.len + + # And a valid peer info + let invalidMultiAddress = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + + # When making any request with an invalid peer info + node.setPeerExchangePeer(invalidMultiAddress) + + # Then no peer is added to the peer store + check: + node.peerManager.peerStore.peers.len == initialPeers + + asyncTest "peer info parse error": + # Given a node with peer exchange mounted + await node.mountPeerExchange() + let initialPeers = node.peerManager.peerStore.peers.len + + # And given a peer info with an invalid peer id + var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo() + remotePeerInfo2.peerId.data.add(255.byte) + + # When making any request with an invalid peer info + node.setPeerExchangePeer("invalidpeerinfo") + + # Then no peer is added to the peer store + check: + node.peerManager.peerStore.peers.len == initialPeers + +suite "Waku Peer Exchange with discv5": + asyncTest "Node successfully exchanges px peers with real discv5": + ## Given (copied from test_waku_discv5.nim) + let + # todo: px flag + flags = + CapabilitiesBitfield.init( + lightpush = false, filter = false, store = false, relay = true + ) + bindIp = parseIpAddress("0.0.0.0") + extIp = parseIpAddress("127.0.0.1") + + nodeKey1 = generateSecp256k1Key() + nodeTcpPort1 = Port(64010) + nodeUdpPort1 = Port(9000) + node1 = + newTestWakuNode( + nodeKey1, + bindIp, + nodeTcpPort1, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort1), + ) + + nodeKey2 = generateSecp256k1Key() + nodeTcpPort2 = Port(64012) + nodeUdpPort2 = Port(9002) + node2 = + newTestWakuNode( + nodeKey2, + bindIp, + nodeTcpPort2, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort2), + ) + + nodeKey3 = generateSecp256k1Key() + nodeTcpPort3 = Port(64014) + nodeUdpPort3 = Port(9004) + node3 = + newTestWakuNode( + nodeKey3, + bindIp, + nodeTcpPort3, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort3), + ) + + # discv5 + let + conf1 = + WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort1, + privateKey: keys.PrivateKey(nodeKey1.skkey), + bootstrapRecords: @[], + autoupdateRecord: true, + ) + + let + disc1 = + WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager)) + + let + conf2 = + WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort2, + privateKey: keys.PrivateKey(nodeKey2.skkey), + bootstrapRecords: @[disc1.protocol.getRecord()], + autoupdateRecord: true, + ) + + let + disc2 = + WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager)) + + await allFutures(node1.start(), node2.start(), node3.start()) + let resultDisc1StartRes = await disc1.start() + assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error + let resultDisc2StartRes = await disc2.start() + assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error + + ## When + var attempts = 10 + while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and + attempts > 0: + await sleepAsync(1.seconds) + attempts -= 1 + + # node2 can be connected, so will be returned by peer exchange + require ( + await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo()) + ) + + # Mount peer exchange + await node1.mountPeerExchange() + await node3.mountPeerExchange() + + let + dialResponse = + await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo()) + + check dialResponse.isOk + + let + requestPeers = 1 + currentPeers = node3.peerManager.peerStore.peers.len + let res = await node3.fetchPeerExchangePeers(1) + check res.tryGet() == 1 + + # Then node3 has received 1 peer from node1 + check: + node3.peerManager.peerStore.peers.len == currentPeers + requestPeers + + await allFutures( + [node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()] + ) diff --git a/tests/node/test_wakunode_store.nim b/tests/node/test_wakunode_store.nim index 22153de29..a98407b49 100644 --- a/tests/node/test_wakunode_store.nim +++ b/tests/node/test_wakunode_store.nim @@ -508,7 +508,7 @@ suite "Waku Store - End to End - Sorted Archive": # Cleanup waitFor otherServer.stop() -suite "Waku Store - End to End - Unsorted Archive with provided Timestamp": +suite "Waku Store - End to End - Unsorted Archive": var pubsubTopic {.threadvar.}: PubsubTopic var contentTopic {.threadvar.}: ContentTopic var contentTopicSeq {.threadvar.}: seq[ContentTopic] @@ -652,111 +652,6 @@ suite "Waku Store - End to End - Unsorted Archive with provided Timestamp": unsortedArchiveMessages[9] ] -suite "Waku Store - End to End - Unsorted Archive without provided Timestamp": - var pubsubTopic {.threadvar.}: PubsubTopic - var contentTopic {.threadvar.}: ContentTopic - var contentTopicSeq {.threadvar.}: seq[ContentTopic] - - var historyQuery {.threadvar.}: HistoryQuery - var unsortedArchiveMessages {.threadvar.}: seq[WakuMessage] - - var server {.threadvar.}: WakuNode - var client {.threadvar.}: WakuNode - - var serverRemotePeerInfo {.threadvar.}: RemotePeerInfo - - asyncSetup: - pubsubTopic = DefaultPubsubTopic - contentTopic = DefaultContentTopic - contentTopicSeq = @[contentTopic] - - historyQuery = - HistoryQuery( - pubsubTopic: some(pubsubTopic), - contentTopics: contentTopicSeq, - direction: PagingDirection.FORWARD, - pageSize: 5, - ) - - unsortedArchiveMessages = - @[ # Not providing explicit timestamp means it will be set in "arrive" order - fakeWakuMessage(@[byte 09]), - fakeWakuMessage(@[byte 07]), - fakeWakuMessage(@[byte 05]), - fakeWakuMessage(@[byte 03]), - fakeWakuMessage(@[byte 01]), - fakeWakuMessage(@[byte 00]), - fakeWakuMessage(@[byte 02]), - fakeWakuMessage(@[byte 04]), - fakeWakuMessage(@[byte 06]), - fakeWakuMessage(@[byte 08]) - ] - - let - serverKey = generateSecp256k1Key() - clientKey = generateSecp256k1Key() - - server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - - let - unsortedArchiveDriverWithMessages = - newArchiveDriverWithMessages(pubsubTopic, unsortedArchiveMessages) - mountUnsortedArchiveResult = - server.mountArchive(unsortedArchiveDriverWithMessages) - - assert mountUnsortedArchiveResult.isOk() - - waitFor server.mountStore() - client.mountStoreClient() - - waitFor allFutures(server.start(), client.start()) - - serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo() - - asyncTeardown: - waitFor allFutures(client.stop(), server.stop()) - - asyncTest "Sorting using receiverTime": - # When making a history query - let queryResponse = await client.query(historyQuery, serverRemotePeerInfo) - - # Then the response contains the messages - check: - queryResponse.get().messages == - @[ - unsortedArchiveMessages[0], - unsortedArchiveMessages[1], - unsortedArchiveMessages[2], - unsortedArchiveMessages[3], - unsortedArchiveMessages[4] - ] - - # Given the next query - var - historyQuery2 = - HistoryQuery( - cursor: queryResponse.get().cursor, - pubsubTopic: some(pubsubTopic), - contentTopics: contentTopicSeq, - direction: PagingDirection.FORWARD, - pageSize: 5, - ) - - # When making the next history query - let queryResponse2 = await client.query(historyQuery2, serverRemotePeerInfo) - - # Then the response contains the messages - check: - queryResponse2.get().messages == - @[ - unsortedArchiveMessages[5], - unsortedArchiveMessages[6], - unsortedArchiveMessages[7], - unsortedArchiveMessages[8], - unsortedArchiveMessages[9] - ] - suite "Waku Store - End to End - Archive with Multiple Topics": var pubsubTopic {.threadvar.}: PubsubTopic var pubsubTopicB {.threadvar.}: PubsubTopic @@ -794,11 +689,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics": ) let timeOrigin = now() - - proc myOriginTs(offset = 0): Timestamp {.gcsafe, raises: [].} = - ts(offset, timeOrigin) - - originTs = myOriginTs + originTs = + proc(offset = 0): Timestamp {.gcsafe, raises: [].} = + ts(offset, timeOrigin) archiveMessages = @[ @@ -828,9 +721,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics": newSqliteArchiveDriver().put(pubsubTopic, archiveMessages[0..<6]).put( pubsubTopicB, archiveMessages[6..<10] ) - let mountUnsortedArchiveResult = server.mountArchive(archiveDriver) + let mountSortedArchiveResult = server.mountArchive(archiveDriver) - assert mountUnsortedArchiveResult.isOk() + assert mountSortedArchiveResult.isOk() waitFor server.mountStore() client.mountStoreClient() diff --git a/tests/test_peer_exchange.nim b/tests/test_peer_exchange.nim deleted file mode 100644 index b6ac30f39..000000000 --- a/tests/test_peer_exchange.nim +++ /dev/null @@ -1,73 +0,0 @@ -{.used.} - -import - std/[sequtils, options], - stew/shims/net, - testutils/unittests, - chronicles, - chronos, - libp2p/peerid, - libp2p/crypto/crypto, - libp2p/protocols/pubsub/gossipsub -import - ../../waku/waku_core, - ../../waku/waku_node, - ./testlib/wakucore, - ./testlib/wakunode - -procSuite "Peer Exchange": - asyncTest "GossipSub (relay) peer exchange": - ## Tests peer exchange - - # Create nodes and ENR. These will be added to the discoverable list - let - bindIp = parseIpAddress("0.0.0.0") - nodeKey1 = generateSecp256k1Key() - node1 = newTestWakuNode(nodeKey1, bindIp, Port(0)) - nodeKey2 = generateSecp256k1Key() - node2 = newTestWakuNode(nodeKey2, bindIp, Port(0), sendSignedPeerRecord = true) - nodeKey3 = generateSecp256k1Key() - node3 = newTestWakuNode(nodeKey3, bindIp, Port(0), sendSignedPeerRecord = true) - - var - peerExchangeHandler, emptyHandler: RoutingRecordsHandler - completionFut = newFuture[bool]() - - proc ignorePeerExchange(peer: PeerId, topic: string, - peers: seq[RoutingRecordsPair]) {.gcsafe.} = - discard - - proc handlePeerExchange(peer: PeerId, topic: string, - peers: seq[RoutingRecordsPair]) {.gcsafe.} = - ## Handle peers received via gossipsub peer exchange - let peerRecords = peers.mapIt(it.record.get()) - - check: - # Node 3 is informed of node 2 via peer exchange - peer == node1.switch.peerInfo.peerId - topic == DefaultPubsubTopic - peerRecords.countIt(it.peerId == node2.switch.peerInfo.peerId) == 1 - - if (not completionFut.completed()): - completionFut.complete(true) - - peerExchangeHandler = handlePeerExchange - emptyHandler = ignorePeerExchange - - await node1.mountRelay(@[DefaultPubsubTopic], some(emptyHandler)) - await node2.mountRelay(@[DefaultPubsubTopic], some(emptyHandler)) - await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandler)) - - # Ensure that node1 prunes all peers after the first connection - node1.wakuRelay.parameters.dHigh = 1 - - await allFutures([node1.start(), node2.start(), node3.start()]) - - await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - - await node3.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()]) - - check: - (await completionFut.withTimeout(5.seconds)) == true - - await allFutures([node1.stop(), node2.stop(), node3.stop()]) diff --git a/tests/test_relay_peer_exchange.nim b/tests/test_relay_peer_exchange.nim new file mode 100644 index 000000000..6f842245f --- /dev/null +++ b/tests/test_relay_peer_exchange.nim @@ -0,0 +1,100 @@ +{.used.} + +import + std/[sequtils, options], + stew/shims/net, + testutils/unittests, + chronicles, + chronos, + libp2p/peerid, + libp2p/crypto/crypto, + libp2p/protocols/pubsub/gossipsub + +import + ../../waku/waku_core, + ../../waku/waku_node, + ./testlib/wakucore, + ./testlib/wakunode + +procSuite "Relay (GossipSub) Peer Exchange": + asyncTest "Mount relay without peer exchange handler": + # Given two nodes + let + listenAddress = parseIpAddress("0.0.0.0") + port = Port(0) + node1Key = generateSecp256k1Key() + node1 = newTestWakuNode(node1Key, listenAddress, port) + node2Key = generateSecp256k1Key() + node2 = + newTestWakuNode(node2Key, listenAddress, port, sendSignedPeerRecord = true) + + # When both client and server mount relay without a handler + await node1.mountRelay(@[DefaultPubsubTopic]) + await node2.mountRelay(@[DefaultPubsubTopic], none(RoutingRecordsHandler)) + + # Then the relays are mounted without a handler + check: + node1.wakuRelay.parameters.enablePX == false + node1.wakuRelay.routingRecordsHandler.len == 0 + node2.wakuRelay.parameters.enablePX == false + node2.wakuRelay.routingRecordsHandler.len == 0 + + asyncTest "Mount relay with peer exchange handler": + ## Given three nodes + # Create nodes and ENR. These will be added to the discoverable list + let + bindIp = parseIpAddress("0.0.0.0") + port = Port(0) + nodeKey1 = generateSecp256k1Key() + node1 = newTestWakuNode(nodeKey1, bindIp, port) + nodeKey2 = generateSecp256k1Key() + node2 = newTestWakuNode(nodeKey2, bindIp, port, sendSignedPeerRecord = true) + nodeKey3 = generateSecp256k1Key() + node3 = newTestWakuNode(nodeKey3, bindIp, port, sendSignedPeerRecord = true) + + # Given some peer exchange handlers + proc emptyPeerExchangeHandler( + peer: PeerId, topic: string, peers: seq[RoutingRecordsPair] + ) {.gcsafe.} = + discard + + var completionFut = newFuture[bool]() + proc peerExchangeHandler( + peer: PeerId, topic: string, peers: seq[RoutingRecordsPair] + ) {.gcsafe.} = + ## Handle peers received via gossipsub peer exchange + let peerRecords = peers.mapIt(it.record.get()) + + check: + # Node 3 is informed of node 2 via peer exchange + peer == node1.switch.peerInfo.peerId + topic == DefaultPubsubTopic + peerRecords.countIt(it.peerId == node2.switch.peerInfo.peerId) == 1 + + if (not completionFut.completed()): + completionFut.complete(true) + + let + emptyPeerExchangeHandle: RoutingRecordsHandler = emptyPeerExchangeHandler + peerExchangeHandle: RoutingRecordsHandler = peerExchangeHandler + + # Givem the nodes mount relay with a peer exchange handler + await node1.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle)) + await node2.mountRelay(@[DefaultPubsubTopic], some(emptyPeerExchangeHandle)) + await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandle)) + + # Ensure that node1 prunes all peers after the first connection + node1.wakuRelay.parameters.dHigh = 1 + + await allFutures([node1.start(), node2.start(), node3.start()]) + + # When nodes are connected + await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) + await node3.connectToNodes(@[node1.switch.peerInfo.toRemotePeerInfo()]) + + # Verify that the handlePeerExchange was called (node3) + check: + (await completionFut.withTimeout(5.seconds)) == true + + # Clean up + await allFutures([node1.stop(), node2.stop(), node3.stop()]) diff --git a/tests/test_waku_peer_exchange.nim b/tests/test_waku_peer_exchange.nim deleted file mode 100644 index 1cc958091..000000000 --- a/tests/test_waku_peer_exchange.nim +++ /dev/null @@ -1,316 +0,0 @@ -{.used.} - -import - std/[options, sequtils, tables], - testutils/unittests, - chronos, - chronicles, - stew/shims/net, - libp2p/switch, - libp2p/peerId, - libp2p/crypto/crypto, - libp2p/multistream, - libp2p/muxers/muxer, - eth/keys, - eth/p2p/discoveryv5/enr -import - ../../waku/waku_node, - ../../waku/node/peer_manager, - ../../waku/waku_discv5, - ../../waku/waku_peer_exchange, - ../../waku/waku_peer_exchange/rpc, - ../../waku/waku_peer_exchange/rpc_codec, - ../../waku/waku_peer_exchange/protocol, - ./testlib/wakucore, - ./testlib/wakunode - - -# TODO: Extend test coverage -procSuite "Waku Peer Exchange": - - asyncTest "encode and decode peer exchange response": - ## Setup - var - enr1 = enr.Record(seqNum: 0, raw: @[]) - enr2 = enr.Record(seqNum: 0, raw: @[]) - - check enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw") - check enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq") - - let peerInfos = @[ - PeerExchangePeerInfo(enr: enr1.raw), - PeerExchangePeerInfo(enr: enr2.raw), - ] - - var rpc = PeerExchangeRpc( - response: PeerExchangeResponse( - peerInfos: peerInfos - ) - ) - - ## When - let - rpcBuffer: seq[byte] = rpc.encode().buffer - res = PeerExchangeRpc.decode(rpcBuffer) - - ## Then - check: - res.isOk - res.get().response.peerInfos == peerInfos - - ## When - var - resEnr1 = enr.Record(seqNum: 0, raw: @[]) - resEnr2 = enr.Record(seqNum: 0, raw: @[]) - - discard resEnr1.fromBytes(res.get().response.peerInfos[0].enr) - discard resEnr2.fromBytes(res.get().response.peerInfos[1].enr) - - ## Then - check: - resEnr1 == enr1 - resEnr2 == enr2 - - asyncTest "retrieve and provide peer exchange peers from discv5": - ## Given (copied from test_waku_discv5.nim) - let - # todo: px flag - flags = CapabilitiesBitfield.init( - lightpush = false, - filter = false, - store = false, - relay = true - ) - bindIp = parseIpAddress("0.0.0.0") - extIp = parseIpAddress("127.0.0.1") - - nodeKey1 = generateSecp256k1Key() - nodeTcpPort1 = Port(64010) - nodeUdpPort1 = Port(9000) - node1 = newTestWakuNode( - nodeKey1, - bindIp, - nodeTcpPort1, - some(extIp), - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort1) - ) - - nodeKey2 = generateSecp256k1Key() - nodeTcpPort2 = Port(64012) - nodeUdpPort2 = Port(9002) - node2 = newTestWakuNode(nodeKey2, - bindIp, - nodeTcpPort2, - some(extIp), - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort2) - ) - - nodeKey3 = generateSecp256k1Key() - nodeTcpPort3 = Port(64014) - nodeUdpPort3 = Port(9004) - node3 = newTestWakuNode(nodeKey3, - bindIp, - nodeTcpPort3, - some(extIp), - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort3) - ) - - # discv5 - let conf1 = WakuDiscoveryV5Config( - discv5Config: none(DiscoveryConfig), - address: bindIp, - port: nodeUdpPort1, - privateKey: keys.PrivateKey(nodeKey1.skkey), - bootstrapRecords: @[], - autoupdateRecord: true - ) - - let disc1 = WakuDiscoveryV5.new( - node1.rng, - conf1, - some(node1.enr), - some(node1.peerManager), - ) - - let conf2 = WakuDiscoveryV5Config( - discv5Config: none(DiscoveryConfig), - address: bindIp, - port: nodeUdpPort2, - privateKey: keys.PrivateKey(nodeKey2.skkey), - bootstrapRecords: @[disc1.protocol.getRecord()], - autoupdateRecord: true - ) - - let disc2 = WakuDiscoveryV5.new( - node2.rng, - conf2, - some(node2.enr), - some(node2.peerManager), - ) - - await allFutures(node1.start(), node2.start(), node3.start()) - let resultDisc1StartRes = await disc1.start() - assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error - let resultDisc2StartRes = await disc2.start() - assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error - - ## When - var attempts = 10 - while (disc1.protocol.nodesDiscovered < 1 or - disc2.protocol.nodesDiscovered < 1) and - attempts > 0: - await sleepAsync(1.seconds) - attempts -= 1 - - # node2 can be connected, so will be returned by peer exchange - require (await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo())) - - # Mount peer exchange - await node1.mountPeerExchange() - await node3.mountPeerExchange() - - var peerInfosLen = 0 - var response: WakuPeerExchangeResult[PeerExchangeResponse] - attempts = 10 - while peerInfosLen == 0 and attempts > 0: - var connOpt = await node3.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) - require connOpt.isSome - response = await node3.wakuPeerExchange.request(1, connOpt.get()) - require response.isOk - peerInfosLen = response.get().peerInfos.len - await sleepAsync(1.seconds) - attempts -= 1 - - ## Then - check: - response.get().peerInfos.len == 1 - response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw - - await allFutures([node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]) - - asyncTest "peer exchange request functions returns some discovered peers": - let - node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - - # Start and mount peer exchange - await allFutures([node1.start(), node2.start()]) - await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) - - # Create connection - let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) - require: - connOpt.isSome - - # Create some enr and add to peer exchange (sumilating disv5) - var enr1, enr2 = enr.Record() - check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB") - check enr2.fromUri("enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB") - - # Mock that we have discovered these enrs - node1.wakuPeerExchange.enrCache.add(enr1) - node1.wakuPeerExchange.enrCache.add(enr2) - - # Request 2 peer from px. Test all request variants - let response1 = await node2.wakuPeerExchange.request(2) - let response2 = await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo()) - let response3 = await node2.wakuPeerExchange.request(2, connOpt.get()) - - # Check the response or dont even continue - require: - response1.isOk - response2.isOk - response3.isOk - - check: - response1.get().peerInfos.len == 2 - response2.get().peerInfos.len == 2 - response3.get().peerInfos.len == 2 - - # Since it can return duplicates test that at least one of the enrs is in the response - response1.get().peerInfos.anyIt(it.enr == enr1.raw) or response1.get().peerInfos.anyIt(it.enr == enr2.raw) - response2.get().peerInfos.anyIt(it.enr == enr1.raw) or response2.get().peerInfos.anyIt(it.enr == enr2.raw) - response3.get().peerInfos.anyIt(it.enr == enr1.raw) or response3.get().peerInfos.anyIt(it.enr == enr2.raw) - - asyncTest "peer exchange handler works as expected": - let - node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - - # Start and mount peer exchange - await allFutures([node1.start(), node2.start()]) - await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) - - # Mock that we have discovered these enrs - var enr1 = enr.Record() - check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB") - node1.wakuPeerExchange.enrCache.add(enr1) - - # Create connection - let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) - require connOpt.isSome - let conn = connOpt.get() - - # Send bytes so that they directly hit the handler - let rpc = PeerExchangeRpc( - request: PeerExchangeRequest(numPeers: 1)) - - var buffer: seq[byte] - await conn.writeLP(rpc.encode().buffer) - buffer = await conn.readLp(MaxRpcSize.int) - - # Decode the response - let decodedBuff = PeerExchangeRpc.decode(buffer) - require decodedBuff.isOk - - # Check we got back the enr we mocked - check: - decodedBuff.get().response.peerInfos.len == 1 - decodedBuff.get().response.peerInfos[0].enr == enr1.raw - - asyncTest "peer exchange request fails gracefully": - let - node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - - # Start and mount peer exchange - await allFutures([node1.start(), node2.start()]) - await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) - - # Create connection - let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) - require connOpt.isSome - - # Force closing the connection to simulate a failed peer - await connOpt.get().close() - - # Request 2 peer from px - let response = await node1.wakuPeerExchange.request(2, connOpt.get()) - - # Check that it failed gracefully - check: response.isErr - - - asyncTest "connections are closed after response is sent": - # Create 3 nodes - let nodes = toSeq(0..<3).mapIt(newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))) - - await allFutures(nodes.mapIt(it.start())) - await allFutures(nodes.mapIt(it.mountPeerExchange())) - - # Multiple nodes request to node 0 - for i in 1..<3: - let resp = await nodes[i].wakuPeerExchange.request(2, nodes[0].switch.peerInfo.toRemotePeerInfo()) - require resp.isOk - - # Wait for streams to be closed - await sleepAsync(1.seconds) - - # Check that all streams are closed for px - check: - nodes[0].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0) - nodes[1].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0) - nodes[2].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0) diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim index e7a1831f3..279f4a842 100644 --- a/tests/test_wakunode.nim +++ b/tests/test_wakunode.nim @@ -344,29 +344,3 @@ suite "WakuNode": node1MultiAddrs.contains(expectedMultiaddress1) await allFutures(node1.stop(), node2.stop()) - - asyncTest "Function fetchPeerExchangePeers succesfully exchanges px peers": - let - node1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - node2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - - # Start and mount peer exchange - await allFutures([node1.start(), node2.start()]) - await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) - - # Mock that we discovered a node (to avoid running discv5) - var enr = enr.Record() - require enr.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB") - node2.wakuPeerExchange.enrCache.add(enr) - - # Set node2 as service peer (default one) for px protocol - node1.peerManager.addServicePeer(node2.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) - - # Request 1 peer from peer exchange protocol - await node1.fetchPeerExchangePeers(1) - - # Check that the peer ended up in the peerstore - let rpInfo = enr.toRemotePeerInfo.get() - check: - node1.peerManager.peerStore.peers.anyIt(it.peerId == rpInfo.peerId) - node1.peerManager.peerStore.peers.anyIt(it.addrs == rpInfo.addrs) diff --git a/tests/testlib/futures.nim b/tests/testlib/futures.nim index d027a9c4c..fcc37d909 100644 --- a/tests/testlib/futures.nim +++ b/tests/testlib/futures.nim @@ -5,6 +5,7 @@ import ../../../waku/[waku_core/message, waku_store] const FUTURE_TIMEOUT* = 1.seconds FUTURE_TIMEOUT_LONG* = 10.seconds + FUTURE_TIMEOUT_SHORT* = 100.milliseconds proc newPushHandlerFuture*(): Future[(string, WakuMessage)] = newFuture[(string, WakuMessage)]() @@ -31,6 +32,8 @@ proc toResult*(future: Future[void]): Result[void, string] = else: return chronos.err("Future finished but failed.") -proc waitForResult*[T](future: Future[T], timeout = FUTURE_TIMEOUT): Future[Result[T, string]] {.async.} = +proc waitForResult*[T]( + future: Future[T], timeout = FUTURE_TIMEOUT +): Future[Result[T, string]] {.async.} = discard await future.withTimeout(timeout) return future.toResult() diff --git a/tests/waku_peer_exchange/test_all.nim b/tests/waku_peer_exchange/test_all.nim new file mode 100644 index 000000000..a72d752ed --- /dev/null +++ b/tests/waku_peer_exchange/test_all.nim @@ -0,0 +1 @@ +import ./test_protocol, ./test_rpc_codec \ No newline at end of file diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim new file mode 100644 index 000000000..757ae312b --- /dev/null +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -0,0 +1,394 @@ +{.used.} + +import + std/[options, sequtils, tables], + testutils/unittests, + chronos, + chronicles, + stew/shims/net, + libp2p/[switch, peerId, crypto/crypto, multistream, muxers/muxer], + eth/[keys, p2p/discoveryv5/enr] + +import + ../../../waku/[ + waku_node, + node/peer_manager, + waku_discv5, + waku_peer_exchange, + waku_peer_exchange/rpc, + waku_peer_exchange/rpc_codec, + waku_peer_exchange/protocol, + node/peer_manager, + waku_relay/protocol, + waku_relay, + waku_core, + waku_core/message/codec + ], + ../testlib/[wakucore, wakunode, simple_mock, assertions], + ./utils.nim + +suite "Waku Peer Exchange": + # Some of this tests use node.wakuPeerExchange instead of just a standalone PeerExchange. + # This is because attempts to connect the switches for two standalones PeerExchanges failed. + # TODO: Try to make the tests work with standalone PeerExchanges + + suite "request": + asyncTest "Retrieve and provide peer exchange peers from discv5": + ## Given (copied from test_waku_discv5.nim) + let + # todo: px flag + flags = + CapabilitiesBitfield.init( + lightpush = false, filter = false, store = false, relay = true + ) + bindIp = parseIpAddress("0.0.0.0") + extIp = parseIpAddress("127.0.0.1") + + nodeKey1 = generateSecp256k1Key() + nodeTcpPort1 = Port(64010) + nodeUdpPort1 = Port(9000) + node1 = + newTestWakuNode( + nodeKey1, + bindIp, + nodeTcpPort1, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort1), + ) + + nodeKey2 = generateSecp256k1Key() + nodeTcpPort2 = Port(64012) + nodeUdpPort2 = Port(9002) + node2 = + newTestWakuNode( + nodeKey2, + bindIp, + nodeTcpPort2, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort2), + ) + + nodeKey3 = generateSecp256k1Key() + nodeTcpPort3 = Port(64014) + nodeUdpPort3 = Port(9004) + node3 = + newTestWakuNode( + nodeKey3, + bindIp, + nodeTcpPort3, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort3), + ) + + # discv5 + let + conf1 = + WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort1, + privateKey: keys.PrivateKey(nodeKey1.skkey), + bootstrapRecords: @[], + autoupdateRecord: true, + ) + + let + disc1 = + WakuDiscoveryV5.new( + node1.rng, conf1, some(node1.enr), some(node1.peerManager) + ) + + let + conf2 = + WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort2, + privateKey: keys.PrivateKey(nodeKey2.skkey), + bootstrapRecords: @[disc1.protocol.getRecord()], + autoupdateRecord: true, + ) + + let + disc2 = + WakuDiscoveryV5.new( + node2.rng, conf2, some(node2.enr), some(node2.peerManager) + ) + + await allFutures(node1.start(), node2.start(), node3.start()) + let resultDisc1StartRes = await disc1.start() + assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error + let resultDisc2StartRes = await disc2.start() + assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error + + ## When + var attempts = 10 + while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and + attempts > 0: + await sleepAsync(1.seconds) + attempts -= 1 + + # node2 can be connected, so will be returned by peer exchange + require ( + await node1.peerManager.connectRelay(node2.switch.peerInfo.toRemotePeerInfo()) + ) + + # Mount peer exchange + await node1.mountPeerExchange() + await node3.mountPeerExchange() + + let + dialResponse = + await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo()) + let response = dialResponse.get() + + ## Then + check: + response.get().peerInfos.len == 1 + response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw + + await allFutures( + [node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()] + ) + + asyncTest "Request returns some discovered peers": + let + node1 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node2 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Create connection + let + connOpt = + await node2.peerManager.dialPeer( + node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec + ) + require: + connOpt.isSome + + # Create some enr and add to peer exchange (simulating disv5) + var enr1, enr2 = enr.Record() + check enr1.fromUri( + "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" + ) + check enr2.fromUri( + "enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB" + ) + + # Mock that we have discovered these enrs + node1.wakuPeerExchange.enrCache.add(enr1) + node1.wakuPeerExchange.enrCache.add(enr2) + + # Request 2 peer from px. Test all request variants + let response1 = await node2.wakuPeerExchange.request(2) + let + response2 = + await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo()) + let response3 = await node2.wakuPeerExchange.request(2, connOpt.get()) + + # Check the response or dont even continue + require: + response1.isOk + response2.isOk + response3.isOk + + check: + response1.get().peerInfos.len == 2 + response2.get().peerInfos.len == 2 + response3.get().peerInfos.len == 2 + + # Since it can return duplicates test that at least one of the enrs is in the response + response1.get().peerInfos.anyIt(it.enr == enr1.raw) or + response1.get().peerInfos.anyIt(it.enr == enr2.raw) + response2.get().peerInfos.anyIt(it.enr == enr1.raw) or + response2.get().peerInfos.anyIt(it.enr == enr2.raw) + response3.get().peerInfos.anyIt(it.enr == enr1.raw) or + response3.get().peerInfos.anyIt(it.enr == enr2.raw) + + asyncTest "Request fails gracefully": + let + node1 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node2 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Create connection + let + connOpt = + await node2.peerManager.dialPeer( + node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec + ) + require connOpt.isSome + + # Force closing the connection to simulate a failed peer + await connOpt.get().close() + + # Request 2 peer from px + let response = await node1.wakuPeerExchange.request(2, connOpt.get()) + + # Check that it failed gracefully + check: + response.isErr + + asyncTest "Request 0 peers, with 0 peers in PeerExchange": + # Given a disconnected PeerExchange + let + switch = newTestSwitch() + peerManager = PeerManager.new(switch) + peerExchange = WakuPeerExchange.new(peerManager) + + # When requesting 0 peers + let response = await peerExchange.request(0) + + # Then the response should be an error + check: + response.isErr + response.error == "peer_not_found_failure" + + asyncTest "Request 0 peers, with 1 peer in PeerExchange": + # Given two valid nodes with PeerExchange + let + node1 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node2 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Connect the nodes + let + dialResponse = + await node2.peerManager.dialPeer( + node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec + ) + assert dialResponse.isSome + + # Mock that we have discovered one enr + var record = enr.Record() + check record.fromUri( + "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" + ) + node1.wakuPeerExchange.enrCache.add(record) + + # When requesting 0 peers + let response = await node1.wakuPeerExchange.request(0) + + # Then the response should be empty + assertResultOk(response) + check response.get().peerInfos.len == 0 + + asyncTest "Request with invalid peer info": + # Given two valid nodes with PeerExchange + let + node1 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node2 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Mock that we have discovered one enr + var record = enr.Record() + check record.fromUri( + "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" + ) + node1.wakuPeerExchange.enrCache.add(record) + + # When making any request with an invalid peer info + var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo() + remotePeerInfo2.peerId.data.add(255.byte) + let response = await node1.wakuPeerExchange.request(1, remotePeerInfo2) + + # Then the response should be an error + check: + response.isErr + response.error == "dial_failure" + + asyncTest "Connections are closed after response is sent": + # Create 3 nodes + let + nodes = + toSeq(0..<3).mapIt( + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + ) + + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountPeerExchange())) + + # Multiple nodes request to node 0 + for i in 1..<3: + let + resp = + await nodes[i].wakuPeerExchange.request( + 2, nodes[0].switch.peerInfo.toRemotePeerInfo() + ) + require resp.isOk + + # Wait for streams to be closed + await sleepAsync(1.seconds) + + # Check that all streams are closed for px + check: + nodes[0].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0) + nodes[1].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0) + nodes[2].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0) + + suite "Protocol Handler": + asyncTest "Works as expected": + let + node1 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node2 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Mock that we have discovered these enrs + var enr1 = enr.Record() + check enr1.fromUri( + "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" + ) + node1.wakuPeerExchange.enrCache.add(enr1) + + # Create connection + let + connOpt = + await node2.peerManager.dialPeer( + node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec + ) + require connOpt.isSome + let conn = connOpt.get() + + # Send bytes so that they directly hit the handler + let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: 1)) + + var buffer: seq[byte] + await conn.writeLP(rpc.encode().buffer) + buffer = await conn.readLp(MaxRpcSize.int) + + # Decode the response + let decodedBuff = PeerExchangeRpc.decode(buffer) + require decodedBuff.isOk + + # Check we got back the enr we mocked + check: + decodedBuff.get().response.peerInfos.len == 1 + decodedBuff.get().response.peerInfos[0].enr == enr1.raw diff --git a/tests/waku_peer_exchange/test_rpc_codec.nim b/tests/waku_peer_exchange/test_rpc_codec.nim new file mode 100644 index 000000000..6d57e49e5 --- /dev/null +++ b/tests/waku_peer_exchange/test_rpc_codec.nim @@ -0,0 +1,64 @@ +{.used.} + +import + std/[options], + testutils/unittests, + chronos, + stew/shims/net, + libp2p/switch, + libp2p/peerId, + libp2p/crypto/crypto, + eth/keys, + eth/p2p/discoveryv5/enr + +import + ../../../waku/[ + node/peer_manager, + waku_discv5, + waku_peer_exchange/rpc, + waku_peer_exchange/rpc_codec + ], + ../testlib/[wakucore] + +suite "Peer Exchange RPC": + asyncTest "Encode - Decode": + # Setup + var + enr1 = enr.Record(seqNum: 0, raw: @[]) + enr2 = enr.Record(seqNum: 0, raw: @[]) + + check: + enr1.fromUri( + "enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw" + ) + enr2.fromUri( + "enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq" + ) + + let + peerInfos = + @[PeerExchangePeerInfo(enr: enr1.raw), PeerExchangePeerInfo(enr: enr2.raw)] + rpc = PeerExchangeRpc(response: PeerExchangeResponse(peerInfos: peerInfos)) + + # When encoding and decoding + let + rpcBuffer: seq[byte] = rpc.encode().buffer + res = PeerExchangeRpc.decode(rpcBuffer) + + # Then the peerInfos match the originals + check: + res.isOk + res.get().response.peerInfos == peerInfos + + # When using the decoded responses to create new enrs + var + resEnr1 = enr.Record(seqNum: 0, raw: @[]) + resEnr2 = enr.Record(seqNum: 0, raw: @[]) + + discard resEnr1.fromBytes(res.get().response.peerInfos[0].enr) + discard resEnr2.fromBytes(res.get().response.peerInfos[1].enr) + + # Then they match the original enrs + check: + resEnr1 == enr1 + resEnr2 == enr2 diff --git a/tests/waku_peer_exchange/utils.nim b/tests/waku_peer_exchange/utils.nim new file mode 100644 index 000000000..84ad586c5 --- /dev/null +++ b/tests/waku_peer_exchange/utils.nim @@ -0,0 +1,51 @@ +{.used.} + +import + std/options, + testutils/unittests, + chronos, + libp2p/switch, + libp2p/peerId, + libp2p/crypto/crypto, + eth/keys, + eth/p2p/discoveryv5/enr + +import + ../../../waku/[ + waku_node, + waku_discv5, + waku_peer_exchange, + waku_peer_exchange/rpc, + waku_peer_exchange/protocol, + node/peer_manager, + waku_core + ], + ../testlib/[futures, wakucore, assertions] + +proc dialForPeerExchange*( + client: WakuNode, + peerInfo: RemotePeerInfo, + requestedPeers: uint64 = 1, + minimumPeers: uint64 = 0, + attempts: uint64 = 100, +): Future[Result[WakuPeerExchangeResult[PeerExchangeResponse], string]] {.async.} = + # Dials a peer and awaits until it's able to receive a peer exchange response + # For the test, the relevant part is the dialPeer call. + # But because the test needs peers, and due to the asynchronous nature of the dialing, + # we await until we receive peers from the peer exchange protocol. + var attempts = attempts + + while attempts > 0: + let connOpt = await client.peerManager.dialPeer(peerInfo, WakuPeerExchangeCodec) + require connOpt.isSome() + await sleepAsync(FUTURE_TIMEOUT_SHORT) + + let response = await client.wakuPeerExchange.request(requestedPeers, connOpt.get()) + assertResultOk(response) + + if uint64(response.get().peerInfos.len) > minimumPeers: + return ok(response) + + attempts -= 1 + + return err("Attempts exhausted.") diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 6b59982f1..cc5360bdf 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -333,7 +333,9 @@ proc startNode*(node: WakuNode, conf: WakuNodeConf, # retrieve px peers and add the to the peer store if conf.peerExchangeNode != "": let desiredOutDegree = node.wakuRelay.parameters.d.uint64() - await node.fetchPeerExchangePeers(desiredOutDegree) + (await node.fetchPeerExchangePeers(desiredOutDegree)).isOkOr: + error "error while fetching peers from peer exchange", error = error + quit(QuitFailure) # Start keepalive, if enabled if conf.keepAlive: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index c11600bb3..47a0c8735 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -450,7 +450,7 @@ proc filterHandleMessage*(node: WakuNode, {.async.}= if node.wakuFilter.isNil() or node.wakuFilterLegacy.isNil(): - error "cannot handle filter message", error="waku filter is nil" + error "cannot handle filter message", error = "waku filter and waku filter legacy are both required" return await allFutures(node.wakuFilter.handleMessage(pubsubTopic, message), @@ -1022,10 +1022,12 @@ proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec)) -proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [Defect].} = +proc fetchPeerExchangePeers*( + node: Wakunode, amount: uint64 +): Future[Result[int, string]] {.async, raises: [Defect].} = if node.wakuPeerExchange.isNil(): error "could not get peers from px, waku peer-exchange is nil" - return + return err("PeerExchange is not mounted") info "Retrieving peer info via peer exchange protocol" let pxPeersRes = await node.wakuPeerExchange.request(amount) @@ -1035,14 +1037,18 @@ proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [D for pi in peers: var record: enr.Record if enr.fromBytes(record, pi.enr): - node.peerManager.addPeer(record.toRemotePeerInfo().get, PeerExcahnge) + node.peerManager.addPeer(record.toRemotePeerInfo().get, PeerExchange) validPeers += 1 - info "Retrieved peer info via peer exchange protocol", validPeers = validPeers, totalPeers = peers.len + info "Retrieved peer info via peer exchange protocol", + validPeers = validPeers, totalPeers = peers.len + return ok(validPeers) else: - warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error + warn "failed to retrieve peer info via peer exchange protocol", + error = pxPeersRes.error + return err("Peer exchange failure: " & $pxPeersRes.error) # TODO: Move to application module (e.g., wakunode2.nim) -proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) = +proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo | MultiAddress | string) = if node.wakuPeerExchange.isNil(): error "could not set peer, waku peer-exchange is nil" return @@ -1054,7 +1060,7 @@ proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) = error "could not parse peer info", error = remotePeerRes.error return - node.peerManager.addPeer(remotePeerRes.value, WakuPeerExchangeCodec) + node.peerManager.addPeer(remotePeerRes.value, PeerExchange) waku_px_peers.inc() diff --git a/waku/waku_core/peers.nim b/waku/waku_core/peers.nim index f7d4fae59..2f1cbb889 100644 --- a/waku/waku_core/peers.nim +++ b/waku/waku_core/peers.nim @@ -35,7 +35,7 @@ type UnknownOrigin, Discv5, Static, - PeerExcahnge, + PeerExchange, Dns PeerDirection* = enum