diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 11d54b694..7c426f022 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -41,6 +41,7 @@ import ../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_capacity, ../../waku/v2/protocol/waku_archive/retention_policy/retention_policy_time, ../../waku/v2/protocol/waku_store, + ../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_filter, ../../waku/v2/protocol/waku_lightpush, ../../waku/v2/protocol/waku_peer_exchange, @@ -525,14 +526,10 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, if conf.peerExchange: asyncSpawn runPeerExchangeDiscv5Loop(node.wakuPeerExchange) - # retrieve and connect to peer exchange peers + # retrieve px peers and add the to the peer store if conf.peerExchangeNode != "": - info "Retrieving peer info via peer exchange protocol" let desiredOutDegree = node.wakuRelay.parameters.d.uint64() - try: - discard await node.wakuPeerExchange.request(desiredOutDegree) - except: - return err("failed to retrieve peer info via peer exchange protocol: " & getCurrentExceptionMsg()) + await node.fetchPeerExchangePeers(desiredOutDegree) # Start keepalive, if enabled if conf.keepAlive: diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index c331fb159..c2d5b22b6 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -1,12 +1,13 @@ {.used.} import - std/options, + std/[options, sequtils], testutils/unittests, chronos, chronicles, stew/shims/net, libp2p/switch, + libp2p/peerId, libp2p/crypto/crypto, eth/keys, eth/p2p/discoveryv5/enr @@ -18,7 +19,8 @@ import ../../waku/v2/protocol/waku_peer_exchange/rpc, ../../waku/v2/protocol/waku_peer_exchange/rpc_codec, ../test_helpers, - ./utils + ./utils, + ./testlib/testutils # TODO: Extend test coverage @@ -30,8 +32,8 @@ procSuite "Waku Peer Exchange": enr1 = enr.Record(seqNum: 0, raw: @[]) enr2 = enr.Record(seqNum: 0, raw: @[]) - discard enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw") - discard enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq") + check enr1.fromUri("enr:-JK4QPmO-sE2ELiWr8qVFs1kaY4jQZQpNaHvSPRmKiKcaDoqYRdki2c1BKSliImsxFeOD_UHnkddNL2l0XT9wlsP0WEBgmlkgnY0gmlwhH8AAAGJc2VjcDI1NmsxoQIMwKqlOl3zpwnrsKRKHuWPSuFzit1Cl6IZvL2uzBRe8oN0Y3CC6mKDdWRwgiMqhXdha3UyDw") + check enr2.fromUri("enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq") let peerInfos = @[ PeerExchangePeerInfo(enr: enr1.raw), @@ -127,20 +129,127 @@ procSuite "Waku Peer Exchange": await node1.mountPeerExchange() await node3.mountPeerExchange() - await sleepAsync(3000.millis) # Give the algorithm some time to work its magic + # Give the algorithm some time to work its magic + await sleepAsync(3000.millis) asyncSpawn node1.wakuPeerExchange.runPeerExchangeDiscv5Loop() - node3.setPeerExchangePeer(node1.peerInfo.toRemotePeerInfo()) + let connOpt = await node3.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) + check: + connOpt.isSome + + # Give the algorithm some time to work its magic + await sleepAsync(2000.millis) ## When - discard waitFor node3.wakuPeerExchange.request(1) - - await sleepAsync(2000.millis) # Give the algorithm some time to work its magic + let response = await node3.wakuPeerExchange.request(1, connOpt.get()) ## Then check: + response.isOk + response.get().peerInfos.len == 1 node1.wakuDiscv5.protocol.nodesDiscovered > 0 - node3.switch.peerStore[AddressBook].contains(node2.switch.peerInfo.peerId) await allFutures([node1.stop(), node2.stop(), node3.stop()]) + + asyncTest "peer exchange request functions returns some discovered peers": + let + node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Create connection + let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) + require: + connOpt.isSome + + # Create some enr and add to peer exchange (sumilating disv5) + var enr1, enr2 = enr.Record() + check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB") + check enr2.fromUri("enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB") + + # Mock that we have discovered these enrs + node1.wakuPeerExchange.enrCache.add(enr1) + node1.wakuPeerExchange.enrCache.add(enr2) + + # Request 2 peer from px. Test all request variants + let response1 = await node2.wakuPeerExchange.request(2) + let response2 = await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo()) + let response3 = await node2.wakuPeerExchange.request(2, connOpt.get()) + + # Check the response or dont even continue + require: + response1.isOk + response2.isOk + response3.isOk + + check: + response1.get().peerInfos.len == 2 + response2.get().peerInfos.len == 2 + response3.get().peerInfos.len == 2 + + # Since it can return duplicates test that at least one of the enrs is in the response + response1.get().peerInfos.anyIt(it.enr == enr1.raw) or response1.get().peerInfos.anyIt(it.enr == enr2.raw) + response2.get().peerInfos.anyIt(it.enr == enr1.raw) or response2.get().peerInfos.anyIt(it.enr == enr2.raw) + response3.get().peerInfos.anyIt(it.enr == enr1.raw) or response3.get().peerInfos.anyIt(it.enr == enr2.raw) + + asyncTest "peer exchange handler works as expected": + let + node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Mock that we have discovered these enrs + var enr1 = enr.Record() + check enr1.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB") + node1.wakuPeerExchange.enrCache.add(enr1) + + # Create connection + let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) + require connOpt.isSome + let conn = connOpt.get() + + # Send bytes so that they directly hit the handler + let rpc = PeerExchangeRpc( + request: PeerExchangeRequest(numPeers: 1)) + + var buffer: seq[byte] + await conn.writeLP(rpc.encode().buffer) + buffer = await conn.readLp(MaxRpcSize.int) + + # Decode the response + let decodedBuff = PeerExchangeRpc.decode(buffer) + require decodedBuff.isOk + + # Check we got back the enr we mocked + check: + decodedBuff.get().response.peerInfos.len == 1 + decodedBuff.get().response.peerInfos[0].enr == enr1.raw + + asyncTest "peer exchange request fails gracefully": + let + node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Create connection + let connOpt = await node2.peerManager.dialPeer(node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) + require connOpt.isSome + + # Force closing the connection to simulate a failed peer + await connOpt.get().close() + + # Request 2 peer from px + let response = await node1.wakuPeerExchange.request(2, connOpt.get()) + + # Check that it failed gracefully + check: response.isErr diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 2b59445a2..cc08fb858 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -1,6 +1,7 @@ {.used.} import + std/sequtils, stew/byteutils, stew/shims/net as stewNet, testutils/unittests, @@ -13,13 +14,17 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/gossipsub, - libp2p/nameresolving/mockresolver + libp2p/nameresolving/mockresolver, + eth/p2p/discoveryv5/enr import ../../waku/v2/node/waku_node, ../../waku/v2/node/peer_manager, ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_relay, - ../../waku/v2/utils/peers + ../../waku/v2/protocol/waku_peer_exchange, + ../../waku/v2/utils/peers, + ./testlib/testutils, + ../test_helpers procSuite "WakuNode": @@ -284,3 +289,29 @@ procSuite "WakuNode": node1MultiAddrs.contains(expectedMultiaddress1) await allFutures(node1.stop(), node2.stop()) + + asyncTest "Function fetchPeerExchangePeers succesfully exchanges px peers": + let + node1 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + node2 = WakuNode.new(generateKey(), ValidIpAddress.init("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + + # Mock that we discovered a node (to avoid running discv5) + var enr = enr.Record() + require enr.fromUri("enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB") + node2.wakuPeerExchange.enrCache.add(enr) + + # Set node2 as service peer (default one) for px protocol + node1.peerManager.addServicePeer(node2.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec) + + # Request 1 peer from peer exchange protocol + await node1.fetchPeerExchangePeers(1) + + # Check that the peer ended up in the peerstore + let rpInfo = enr.toRemotePeerInfo.get() + check: + node1.peerManager.peerStore.peers.anyIt(it.peerId == rpInfo.peerId) + node1.peerManager.peerStore.peers.anyIt(it.addrs == rpInfo.addrs) diff --git a/waku/v2/node/peer_manager/waku_peer_store.nim b/waku/v2/node/peer_manager/waku_peer_store.nim index d8d026ad6..460543c00 100644 --- a/waku/v2/node/peer_manager/waku_peer_store.nim +++ b/waku/v2/node/peer_manager/waku_peer_store.nim @@ -175,5 +175,8 @@ proc getPeersByDirection*(peerStore: PeerStore, direction: PeerDirection): seq[S proc getNotConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] = return peerStore.peers.filterIt(it.connectedness != Connected) +proc getConnectedPeers*(peerStore: PeerStore): seq[StoredInfo] = + return peerStore.peers.filterIt(it.connectedness == Connected) + proc getPeersByProtocol*(peerStore: PeerStore, proto: string): seq[StoredInfo] = return peerStore.peers.filterIt(it.protos.contains(proto)) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 0a370d192..ec4f9af84 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -960,6 +960,25 @@ proc mountPeerExchange*(node: WakuNode) {.async, raises: [Defect, LPError].} = node.switch.mount(node.wakuPeerExchange, protocolMatcher(WakuPeerExchangeCodec)) +proc fetchPeerExchangePeers*(node: Wakunode, amount: uint64) {.async, raises: [Defect].} = + if node.wakuPeerExchange.isNil(): + error "could not get peers from px, waku peer-exchange is nil" + return + + info "Retrieving peer info via peer exchange protocol" + let pxPeersRes = await node.wakuPeerExchange.request(amount) + if pxPeersRes.isOk: + var validPeers = 0 + for pi in pxPeersRes.get().peerInfos: + var record: enr.Record + if enr.fromBytes(record, pi.enr): + # TODO: Add source: PX + node.peerManager.addPeer(record.toRemotePeerInfo().get, WakuRelayCodec) + validPeers += 1 + info "Retrieved peer info via peer exchange protocol", validPeers = validPeers + else: + warn "Failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error + # TODO: Move to application module (e.g., wakunode2.nim) proc setPeerExchangePeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} = if node.wakuPeerExchange.isNil(): diff --git a/waku/v2/protocol/waku_peer_exchange/protocol.nim b/waku/v2/protocol/waku_peer_exchange/protocol.nim index 4c1f590c7..c78f81b03 100644 --- a/waku/v2/protocol/waku_peer_exchange/protocol.nim +++ b/waku/v2/protocol/waku_peer_exchange/protocol.nim @@ -29,7 +29,7 @@ logScope: const # We add a 64kB safety buffer for protocol overhead. # 10x-multiplier also for safety - MaxRpcSize = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs... + MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs... MaxCacheSize = 1000 CacheCleanWindow = 200 @@ -49,70 +49,53 @@ type WakuPeerExchange* = ref object of LPProtocol peerManager*: PeerManager wakuDiscv5: Option[WakuDiscoveryV5] - enrCache: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ + enrCache*: seq[enr.Record] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ -proc sendPeerExchangeRpcToPeer(wpx: WakuPeerExchange, rpc: PeerExchangeRpc, peer: RemotePeerInfo | PeerId): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = +proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = + let rpc = PeerExchangeRpc( + request: PeerExchangeRequest(numPeers: numPeers)) + + var buffer: seq[byte] + try: + await conn.writeLP(rpc.encode().buffer) + buffer = await conn.readLp(MaxRpcSize.int) + except CatchableError as exc: + waku_px_errors.inc(labelValues = [exc.msg]) + return err("write/read failed: " & $exc.msg) + + let decodedBuff = PeerExchangeRpc.decode(buffer) + if decodedBuff.isErr(): + return err("decode failed: " & $decodedBuff.error) + return ok(decodedBuff.get().response) + +proc request*(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) if connOpt.isNone(): return err(dialFailure) + return await wpx.request(numPeers, connOpt.get()) - let connection = connOpt.get() - - await connection.writeLP(rpc.encode().buffer) - - return ok() - -proc request(wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let rpc = PeerExchangeRpc( - request: PeerExchangeRequest( - numPeers: numPeers - ) - ) - - let res = await wpx.sendPeerExchangeRpcToPeer(rpc, peer) - if res.isErr(): - waku_px_errors.inc(labelValues = [res.error()]) - return err(res.error()) - - return ok() - -proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = +proc request*(wpx: WakuPeerExchange, numPeers: uint64): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async, gcsafe.} = let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure]) return err(peerNotFoundFailure) - return await wpx.request(numPeers, peerOpt.get()) -proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], peer: RemotePeerInfo | PeerId): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - var peerInfos: seq[PeerExchangePeerInfo] = @[] - for e in enrs: - let pi = PeerExchangePeerInfo( - enr: e.raw - ) - peerInfos.add(pi) - +proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = let rpc = PeerExchangeRpc( response: PeerExchangeResponse( - peerInfos: peerInfos + peerInfos: enrs.mapIt(PeerExchangePeerInfo(enr: it.raw)) ) ) - let res = await wpx.sendPeerExchangeRpcToPeer(rpc, peer) - if res.isErr(): - waku_px_errors.inc(labelValues = [res.error()]) - return err(res.error()) + try: + await conn.writeLP(rpc.encode().buffer) + except CatchableError as exc: + waku_px_errors.inc(labelValues = [exc.msg]) + return err(exc.msg) return ok() -proc respond(wpx: WakuPeerExchange, enrs: seq[enr.Record]): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) - if peerOpt.isNone(): - waku_px_errors.inc(labelValues = [peerNotFoundFailure]) - return err(peerNotFoundFailure) - - return await wpx.respond(enrs, peerOpt.get()) - proc cleanCache(wpx: WakuPeerExchange) {.gcsafe.} = wpx.enrCache.delete(0..CacheCleanWindow-1) @@ -147,45 +130,32 @@ proc getEnrsFromCache(wpx: WakuPeerExchange, numPeers: uint64): seq[enr.Record] return @[] for i in 0.. 0: - waku_px_peers_received_unknown.inc(newPeers.len().int64()) - debug "Connecting to newly discovered peers", count=newPeers.len() - await wpx.peerManager.connectToNodes(newPeers, WakuRelayCodec, source = "peer exchange") - wpx.handler = handler wpx.codec = WakuPeerExchangeCodec diff --git a/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim b/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim index cb4925db2..f8916185b 100644 --- a/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim +++ b/waku/v2/protocol/waku_peer_exchange/rpc_codec.nim @@ -83,7 +83,8 @@ proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] = var rpc = PeerExchangeRpc() var requestBuffer: seq[byte] - discard ?pb.getField(1, requestBuffer) + if not ?pb.getField(1, requestBuffer): + return err(ProtoError.RequiredFieldMissing) rpc.request = ?PeerExchangeRequest.decode(requestBuffer) var responseBuffer: seq[byte]