diff --git a/tests/node/test_wakunode_peer_exchange.nim b/tests/node/test_wakunode_peer_exchange.nim index 3996be0dc..3075fa83f 100644 --- a/tests/node/test_wakunode_peer_exchange.nim +++ b/tests/node/test_wakunode_peer_exchange.nim @@ -78,11 +78,11 @@ suite "Waku Peer Exchange": check: node.peerManager.switch.peerStore.peers.len == 0 res.error.status_code == SERVICE_UNAVAILABLE - res.error.status_desc == some("PeerExchange is not mounted") + res.error.status_desc == some("PeerExchangeClient is not mounted") asyncTest "Node fetches with mounted peer exchange, but no peers": # Given a node with peer exchange mounted - await node.mountPeerExchange() + await node.mountPeerExchangeClient() # When a node fetches peers let res = await node.fetchPeerExchangePeers(1) @@ -95,7 +95,7 @@ suite "Waku Peer Exchange": asyncTest "Node succesfully exchanges px peers with faked discv5": # Given both nodes mount peer exchange - await allFutures([node.mountPeerExchange(), node2.mountPeerExchange()]) + await allFutures([node.mountPeerExchangeClient(), node2.mountPeerExchange()]) check node.peerManager.switch.peerStore.peers.len == 0 # Mock that we discovered a node (to avoid running discv5) @@ -271,6 +271,7 @@ suite "Waku Peer Exchange with discv5": # Mount peer exchange await node1.mountPeerExchange() await node3.mountPeerExchange() + await node3.mountPeerExchangeClient() let dialResponse = await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo()) diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index 8f7f20574..1d10cf270 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -16,6 +16,7 @@ import waku_peer_exchange/rpc, waku_peer_exchange/rpc_codec, waku_peer_exchange/protocol, + waku_peer_exchange/client, node/peer_manager, waku_core, common/enr/builder, @@ -145,7 +146,7 @@ suite "Waku Peer Exchange": # Start and mount peer exchange await allFutures([node1.start(), node2.start()]) - await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()]) # Create connection let connOpt = await node2.peerManager.dialPeer( @@ -168,10 +169,10 @@ suite "Waku Peer Exchange": node1.wakuPeerExchange.enrCache.add(enr2) # Request 2 peer from px. Test all request variants - let response1 = await node2.wakuPeerExchange.request(2) + let response1 = await node2.wakuPeerExchangeClient.request(2) let response2 = - await node2.wakuPeerExchange.request(2, node1.peerInfo.toRemotePeerInfo()) - let response3 = await node2.wakuPeerExchange.request(2, connOpt.get()) + await node2.wakuPeerExchangeClient.request(2, node1.peerInfo.toRemotePeerInfo()) + let response3 = await node2.wakuPeerExchangeClient.request(2, connOpt.get()) # Check the response or dont even continue require: @@ -213,7 +214,7 @@ suite "Waku Peer Exchange": await connOpt.get().close() # Request 2 peer from px - let response = await node1.wakuPeerExchange.request(2, connOpt.get()) + let response = await node1.wakuPeerExchangeClient.request(2, connOpt.get()) # Check that it failed gracefully check: @@ -225,10 +226,10 @@ suite "Waku Peer Exchange": let switch = newTestSwitch() peerManager = PeerManager.new(switch) - peerExchange = WakuPeerExchange.new(peerManager) + peerExchangeClient = WakuPeerExchangeClient.new(peerManager) # When requesting 0 peers - let response = await peerExchange.request(0) + let response = await peerExchangeClient.request(0) # Then the response should be an error check: @@ -278,7 +279,7 @@ suite "Waku Peer Exchange": # Start and mount peer exchange await allFutures([node1.start(), node2.start()]) - await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + await allFutures([node1.mountPeerExchange(), node2.mountPeerExchangeClient()]) # Connect the nodes let dialResponse = await node2.peerManager.dialPeer( @@ -294,7 +295,7 @@ suite "Waku Peer Exchange": node1.wakuPeerExchange.enrCache.add(record) # When requesting 0 peers - let response = await node1.wakuPeerExchange.request(0) + let response = await node2.wakuPeerExchangeClient.request(0) # Then the response should be empty assertResultOk(response) @@ -310,19 +311,19 @@ suite "Waku Peer Exchange": # Start and mount peer exchange await allFutures([node1.start(), node2.start()]) - await allFutures([node1.mountPeerExchange(), node2.mountPeerExchange()]) + await allFutures([node1.mountPeerExchangeClient(), node2.mountPeerExchange()]) # Mock that we have discovered one enr var record = enr.Record() check record.fromUri( "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" ) - node1.wakuPeerExchange.enrCache.add(record) + node2.wakuPeerExchange.enrCache.add(record) # When making any request with an invalid peer info var remotePeerInfo2 = node2.peerInfo.toRemotePeerInfo() remotePeerInfo2.peerId.data.add(255.byte) - let response = await node1.wakuPeerExchange.request(1, remotePeerInfo2) + let response = await node1.wakuPeerExchangeClient.request(1, remotePeerInfo2) # Then the response should be an error check: @@ -337,10 +338,11 @@ suite "Waku Peer Exchange": await allFutures(nodes.mapIt(it.start())) await allFutures(nodes.mapIt(it.mountPeerExchange())) + await allFutures(nodes.mapIt(it.mountPeerExchangeClient())) # Multiple nodes request to node 0 for i in 1 ..< 3: - let resp = await nodes[i].wakuPeerExchange.request( + let resp = await nodes[i].wakuPeerExchangeClient.request( 2, nodes[0].switch.peerInfo.toRemotePeerInfo() ) require resp.isOk @@ -409,7 +411,7 @@ suite "Waku Peer Exchange": await allFutures( [ node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)), - node2.mountPeerExchange(), + node2.mountPeerExchangeClient(), ] ) @@ -436,19 +438,19 @@ suite "Waku Peer Exchange": await sleepAsync(150.milliseconds) # Request 2 peer from px. Test all request variants - let response1 = await node2.wakuPeerExchange.request(1) + let response1 = await node2.wakuPeerExchangeClient.request(1) check: response1.isOk response1.get().peerInfos.len == 1 let response2 = - await node2.wakuPeerExchange.request(1, node1.peerInfo.toRemotePeerInfo()) + await node2.wakuPeerExchangeClient.request(1, node1.peerInfo.toRemotePeerInfo()) check: response2.isErr response2.error().status_code == PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS await sleepAsync(150.milliseconds) - let response3 = await node2.wakuPeerExchange.request(1, connOpt.get()) + let response3 = await node2.wakuPeerExchangeClient.request(1, connOpt.get()) check: response3.isOk response3.get().peerInfos.len == 1 diff --git a/tests/waku_peer_exchange/utils.nim b/tests/waku_peer_exchange/utils.nim index 88d28c5ee..ce7660bf0 100644 --- a/tests/waku_peer_exchange/utils.nim +++ b/tests/waku_peer_exchange/utils.nim @@ -17,6 +17,7 @@ import waku_peer_exchange, waku_peer_exchange/rpc, waku_peer_exchange/protocol, + waku_peer_exchange/client, node/peer_manager, waku_core, ], @@ -40,7 +41,8 @@ proc dialForPeerExchange*( require connOpt.isSome() await sleepAsync(FUTURE_TIMEOUT_SHORT) - let response = await client.wakuPeerExchange.request(requestedPeers, connOpt.get()) + let response = + await client.wakuPeerExchangeClient.request(requestedPeers, connOpt.get()) assertResultOk(response) if uint64(response.get().peerInfos.len) > minimumPeers: diff --git a/waku/factory/conf_builder/waku_conf_builder.nim b/waku/factory/conf_builder/waku_conf_builder.nim index 32631e1d7..e351d6839 100644 --- a/waku/factory/conf_builder/waku_conf_builder.nim +++ b/waku/factory/conf_builder/waku_conf_builder.nim @@ -615,8 +615,10 @@ proc build*( protectedShards: protectedShards, relay: relay, lightPush: lightPush, - peerExchange: peerExchange, + peerExchangeService: peerExchange, rendezvous: rendezvous, + peerExchangeDiscovery: true, + # enabling peer exchange client by default for quicker bootstrapping remoteStoreNode: builder.remoteStoreNode, remoteLightPushNode: builder.remoteLightPushNode, remoteFilterNode: builder.remoteFilterNode, diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index a944379e9..1a18d8c72 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -412,7 +412,7 @@ proc setupProtocols( return err("failed to set node waku filter peer: " & filterNode.error) # waku peer exchange setup - if conf.peerExchange: + if conf.peerExchangeService: try: await mountPeerExchange( node, some(conf.clusterId), node.rateLimitSettings.getSetting(PEEREXCHG) @@ -429,6 +429,8 @@ proc setupProtocols( return err("failed to set node waku peer-exchange peer: " & peerExchangeNode.error) + if conf.peerExchangeDiscovery: + await node.mountPeerExchangeClient() return ok() ## Start node @@ -473,7 +475,7 @@ proc startNode*( # # Use px to periodically get peers if discv5 is disabled, as discv5 nodes have their own # periodic loop to find peers and px returned peers actually come from discv5 - if conf.peerExchange and not conf.discv5Conf.isSome(): + if conf.peerExchangeDiscovery and not conf.discv5Conf.isSome(): node.startPeerExchangeLoop() # Maintain relay connections diff --git a/waku/factory/waku_conf.nim b/waku/factory/waku_conf.nim index 4a0504906..c492b8450 100644 --- a/waku/factory/waku_conf.nim +++ b/waku/factory/waku_conf.nim @@ -85,7 +85,8 @@ type WakuConf* {.requiresInit.} = ref object relay*: bool lightPush*: bool - peerExchange*: bool + peerExchangeService*: bool + peerExchangeDiscovery*: bool # TODO: remove relay peer exchange relayPeerExchange*: bool @@ -145,7 +146,7 @@ proc logConf*(conf: WakuConf) = store = conf.storeServiceConf.isSome(), filter = conf.filterServiceConf.isSome(), lightPush = conf.lightPush, - peerExchange = conf.peerExchange + peerExchange = conf.peerExchangeService info "Configuration. Network", cluster = conf.clusterId diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 7fddd7d63..a59fff522 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -111,6 +111,7 @@ type wakuLightPush*: WakuLightPush wakuLightpushClient*: WakuLightPushClient wakuPeerExchange*: WakuPeerExchange + wakuPeerExchangeClient*: WakuPeerExchangeClient wakuMetadata*: WakuMetadata wakuAutoSharding*: Option[Sharding] enr*: enr.Record @@ -1281,21 +1282,26 @@ proc mountPeerExchange*( except LPError: error "failed to mount wakuPeerExchange", error = getCurrentExceptionMsg() +proc mountPeerExchangeClient*(node: WakuNode) {.async: (raises: []).} = + info "mounting waku peer exchange client" + if node.wakuPeerExchangeClient.isNil(): + node.wakuPeerExchangeClient = WakuPeerExchangeClient.new(node.peerManager) + proc fetchPeerExchangePeers*( node: Wakunode, amount = DefaultPXNumPeersReq ): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} = - if node.wakuPeerExchange.isNil(): - error "could not get peers from px, waku peer-exchange is nil" + if node.wakuPeerExchangeClient.isNil(): + error "could not get peers from px, waku peer-exchange-client is nil" return err( ( status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, - status_desc: some("PeerExchange is not mounted"), + status_desc: some("PeerExchangeClient is not mounted"), ) ) info "Retrieving peer info via peer exchange protocol", amount - let pxPeersRes = await node.wakuPeerExchange.request(amount) - if pxPeersRes.isOk: + let pxPeersRes = await node.wakuPeerExchangeClient.request(amount) + if pxPeersRes.isOk(): var validPeers = 0 let peers = pxPeersRes.get().peerInfos for pi in peers: @@ -1313,17 +1319,19 @@ proc fetchPeerExchangePeers*( proc peerExchangeLoop(node: WakuNode) {.async.} = while true: - await sleepAsync(1.minutes) if not node.started: + await sleepAsync(5.seconds) continue (await node.fetchPeerExchangePeers()).isOkOr: warn "Cannot fetch peers from peer exchange", cause = error + await sleepAsync(1.minutes) proc startPeerExchangeLoop*(node: WakuNode) = - if node.wakuPeerExchange.isNil(): + if node.wakuPeerExchangeClient.isNil(): error "startPeerExchangeLoop: Peer Exchange is not mounted" return - node.wakuPeerExchange.pxLoopHandle = node.peerExchangeLoop() + info "Starting peer exchange loop" + node.wakuPeerExchangeClient.pxLoopHandle = node.peerExchangeLoop() # TODO: Move to application module (e.g., wakunode2.nim) proc setPeerExchangePeer*( @@ -1561,6 +1569,10 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil(): await node.wakuPeerExchange.pxLoopHandle.cancelAndWait() + if not node.wakuPeerExchangeClient.isNil() and + not node.wakuPeerExchangeClient.pxLoopHandle.isNil(): + await node.wakuPeerExchangeClient.pxLoopHandle.cancelAndWait() + if not node.wakuRendezvous.isNil(): await node.wakuRendezvous.stopWait() diff --git a/waku/waku_peer_exchange.nim b/waku/waku_peer_exchange.nim index 994989df4..15298423b 100644 --- a/waku/waku_peer_exchange.nim +++ b/waku/waku_peer_exchange.nim @@ -1,5 +1,5 @@ {.push raises: [].} -import ./waku_peer_exchange/[protocol, rpc] +import ./waku_peer_exchange/[protocol, rpc, common, client] -export protocol, rpc +export protocol, rpc, common, client diff --git a/waku/waku_peer_exchange/client.nim b/waku/waku_peer_exchange/client.nim new file mode 100644 index 000000000..b11ce9e13 --- /dev/null +++ b/waku/waku_peer_exchange/client.nim @@ -0,0 +1,102 @@ +import std/options, results, chronicles, chronos, metrics + +import ./common, ./rpc, ./rpc_codec, ../node/peer_manager + +from ../waku_core/codecs import WakuPeerExchangeCodec + +declarePublicGauge waku_px_peers_received_total, + "number of ENRs received via peer exchange" +declarePublicCounter waku_px_client_errors, "number of peer exchange errors", ["type"] + +logScope: + topics = "waku peer_exchange client" + +type WakuPeerExchangeClient* = ref object + peerManager*: PeerManager + pxLoopHandle*: Future[void] + +proc new*(T: type WakuPeerExchangeClient, peerManager: PeerManager): T = + WakuPeerExchangeClient(peerManager: peerManager) + +proc request*( + wpx: WakuPeerExchangeClient, numPeers = DefaultPXNumPeersReq, conn: Connection +): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = + let rpc = PeerExchangeRpc.makeRequest(numPeers) + + var buffer: seq[byte] + var callResult = + (status_code: PeerExchangeResponseStatusCode.SUCCESS, status_desc: none(string)) + try: + await conn.writeLP(rpc.encode().buffer) + buffer = await conn.readLp(DefaultMaxRpcSize.int) + except CatchableError as exc: + error "exception when handling peer exchange request", error = exc.msg + waku_px_client_errors.inc(labelValues = ["error_sending_or_receiving_px_req"]) + callResult = ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some($exc.msg), + ) + finally: + # close, no more data is expected + await conn.closeWithEof() + + if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS: + error "peer exchange request failed", status_code = callResult.status_code + return err(callResult) + + let decoded = PeerExchangeRpc.decode(buffer).valueOr: + error "peer exchange request error decoding buffer", error = $error + return err( + ( + status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE, + status_desc: some($error), + ) + ) + if decoded.response.status_code != PeerExchangeResponseStatusCode.SUCCESS: + error "peer exchange request error", status_code = decoded.response.status_code + return err( + ( + status_code: decoded.response.status_code, + status_desc: decoded.response.status_desc, + ) + ) + + return ok(decoded.response) + +proc request*( + wpx: WakuPeerExchangeClient, numPeers = DefaultPXNumPeersReq, peer: RemotePeerInfo +): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = + try: + let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) + if connOpt.isNone(): + error "error in request connOpt is none" + return err( + ( + status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE, + status_desc: some(dialFailure), + ) + ) + return await wpx.request(numPeers, connOpt.get()) + except CatchableError: + error "peer exchange request exception", error = getCurrentExceptionMsg() + return err( + ( + status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE, + status_desc: some("exception dialing peer: " & getCurrentExceptionMsg()), + ) + ) + +proc request*( + wpx: WakuPeerExchangeClient, numPeers = DefaultPXNumPeersReq +): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = + let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) + if peerOpt.isNone(): + waku_px_client_errors.inc(labelValues = [peerNotFoundFailure]) + error "peer exchange error peerOpt is none" + return err( + ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some(peerNotFoundFailure), + ) + ) + return await wpx.request(numPeers, peerOpt.get()) diff --git a/waku/waku_peer_exchange/common.nim b/waku/waku_peer_exchange/common.nim new file mode 100644 index 000000000..85d7398b1 --- /dev/null +++ b/waku/waku_peer_exchange/common.nim @@ -0,0 +1,21 @@ +import results, metrics, chronos +import ./rpc, ../waku_core + +const + # We add a 64kB safety buffer for protocol overhead. + # 10x-multiplier also for safety + DefaultMaxRpcSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024 + # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitrary number of ENRs... + MaxPeersCacheSize* = 60 + CacheRefreshInterval* = 10.minutes + DefaultPXNumPeersReq* = 5.uint64() + +# Error types (metric label values) +const + dialFailure* = "dial_failure" + peerNotFoundFailure* = "peer_not_found_failure" + decodeRpcFailure* = "decode_rpc_failure" + retrievePeersDiscv5Error* = "retrieve_peers_discv5_failure" + pxFailure* = "px_failure" + +type WakuPeerExchangeResult*[T] = Result[T, PeerExchangeResponseStatus] diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index 14de77c67..296ab69cc 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -14,136 +14,29 @@ import ../discovery/waku_discv5, ./rpc, ./rpc_codec, - ../common/rate_limit/request_limiter + ../common/rate_limit/request_limiter, + ./common from ../waku_core/codecs import WakuPeerExchangeCodec export WakuPeerExchangeCodec -declarePublicGauge waku_px_peers_received_total, - "number of ENRs received via peer exchange" declarePublicGauge waku_px_peers_received_unknown, "number of previously unknown ENRs received via peer exchange" -declarePublicCounter waku_px_peers_sent, - "number of ENRs sent to peer exchange requesters" declarePublicGauge waku_px_peers_cached, "number of peer exchange peer ENRs cached" declarePublicCounter waku_px_errors, "number of peer exchange errors", ["type"] +declarePublicCounter waku_px_peers_sent, + "number of ENRs sent to peer exchange requesters" logScope: topics = "waku peer_exchange" -const - # We add a 64kB safety buffer for protocol overhead. - # 10x-multiplier also for safety - DefaultMaxRpcSize* = 10 * DefaultMaxWakuMessageSize + 64 * 1024 - # TODO what is the expected size of a PX message? As currently specified, it can contain an arbitary number of ENRs... - MaxPeersCacheSize = 60 - CacheRefreshInterval = 10.minutes - DefaultPXNumPeersReq* = 5.uint64() - -# Error types (metric label values) -const - dialFailure = "dial_failure" - peerNotFoundFailure = "peer_not_found_failure" - decodeRpcFailure = "decode_rpc_failure" - retrievePeersDiscv5Error = "retrieve_peers_discv5_failure" - pxFailure = "px_failure" - -type - WakuPeerExchangeResult*[T] = Result[T, PeerExchangeResponseStatus] - - WakuPeerExchange* = ref object of LPProtocol - peerManager*: PeerManager - enrCache*: seq[enr.Record] - cluster*: Option[uint16] - # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ - requestRateLimiter*: RequestRateLimiter - pxLoopHandle*: Future[void] - -proc request*( - wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq, conn: Connection -): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = - let rpc = PeerExchangeRpc.makeRequest(numPeers) - - var buffer: seq[byte] - var callResult = - (status_code: PeerExchangeResponseStatusCode.SUCCESS, status_desc: none(string)) - try: - await conn.writeLP(rpc.encode().buffer) - buffer = await conn.readLp(DefaultMaxRpcSize.int) - except CatchableError as exc: - error "exception when handling peer exchange request", - error = getCurrentExceptionMsg() - waku_px_errors.inc(labelValues = [exc.msg]) - callResult = ( - status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, - status_desc: some($exc.msg), - ) - finally: - # close, no more data is expected - await conn.closeWithEof() - - if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS: - error "peer exchange request failed", status_code = callResult.status_code - return err(callResult) - - let decodedBuff = PeerExchangeRpc.decode(buffer) - if decodedBuff.isErr(): - error "peer exchange request error decoding buffer", error = $decodedBuff.error - return err( - ( - status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE, - status_desc: some($decodedBuff.error), - ) - ) - if decodedBuff.get().response.status_code != PeerExchangeResponseStatusCode.SUCCESS: - error "peer exchange request error", - status_code = decodedBuff.get().response.status_code - return err( - ( - status_code: decodedBuff.get().response.status_code, - status_desc: decodedBuff.get().response.status_desc, - ) - ) - - return ok(decodedBuff.get().response) - -proc request*( - wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq, peer: RemotePeerInfo -): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = - try: - let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) - if connOpt.isNone(): - error "error in request connOpt is none" - return err( - ( - status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE, - status_desc: some(dialFailure), - ) - ) - return await wpx.request(numPeers, connOpt.get()) - except CatchableError: - error "peer exchange request exception", error = getCurrentExceptionMsg() - return err( - ( - status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE, - status_desc: some("exception dialing peer: " & getCurrentExceptionMsg()), - ) - ) - -proc request*( - wpx: WakuPeerExchange, numPeers = DefaultPXNumPeersReq -): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = - let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) - if peerOpt.isNone(): - waku_px_errors.inc(labelValues = [peerNotFoundFailure]) - error "peer exchange error peerOpt is none" - return err( - ( - status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, - status_desc: some(peerNotFoundFailure), - ) - ) - return await wpx.request(numPeers, peerOpt.get()) +type WakuPeerExchange* = ref object of LPProtocol + peerManager*: PeerManager + enrCache*: seq[enr.Record] + cluster*: Option[uint16] + # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ + requestRateLimiter*: RequestRateLimiter + pxLoopHandle*: Future[void] proc respond( wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection