diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index 0e87239a9..ae23786a2 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -1,7 +1,7 @@ {.used.} import - std/[options, sequtils], + std/[options, sequtils, tables], testutils/unittests, chronos, chronicles, @@ -9,6 +9,8 @@ import libp2p/switch, libp2p/peerId, libp2p/crypto/crypto, + libp2p/multistream, + libp2p/muxers/muxer, eth/keys, eth/p2p/discoveryv5/enr import @@ -18,6 +20,7 @@ import ../../waku/v2/waku_peer_exchange, ../../waku/v2/waku_peer_exchange/rpc, ../../waku/v2/waku_peer_exchange/rpc_codec, + ../../waku/v2/waku_peer_exchange/protocol, ./testlib/wakucore, ./testlib/wakunode @@ -259,3 +262,25 @@ procSuite "Waku Peer Exchange": # Check that it failed gracefully check: response.isErr + + + asyncTest "connections are closed after response is sent": + # Create 3 nodes + let nodes = toSeq(0..<3).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0))) + + await allFutures(nodes.mapIt(it.start())) + await allFutures(nodes.mapIt(it.mountPeerExchange())) + + # Multiple nodes request to node 0 + for i in 1..<3: + let resp = await nodes[i].wakuPeerExchange.request(2, nodes[0].switch.peerInfo.toRemotePeerInfo()) + require resp.isOk + + # Wait for streams to be closed + await sleepAsync(1.seconds) + + # Check that all streams are closed for px + check: + nodes[0].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0) + nodes[1].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0) + nodes[2].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0) diff --git a/waku/v2/waku_peer_exchange/protocol.nim b/waku/v2/waku_peer_exchange/protocol.nim index b07888c2a..73cc2b969 100644 --- a/waku/v2/waku_peer_exchange/protocol.nim +++ b/waku/v2/waku_peer_exchange/protocol.nim @@ -55,12 +55,19 @@ proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future request: PeerExchangeRequest(numPeers: numPeers)) var buffer: seq[byte] + var error: string try: await conn.writeLP(rpc.encode().buffer) buffer = await conn.readLp(MaxRpcSize.int) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) - return err("write/read failed: " & $exc.msg) + error = $exc.msg + finally: + # close, no more data is expected + await conn.closeWithEof() + + if error.len > 0: + return err("write/read failed: " & error) let decodedBuff = PeerExchangeRpc.decode(buffer) if decodedBuff.isErr(): @@ -155,6 +162,9 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = else: waku_px_peers_sent.inc(enrs.len().int64()) + # close, no data is expected + await conn.closeWithEof() + wpx.handler = handler wpx.codec = WakuPeerExchangeCodec