chore(px): close px streams after resp is sent (#1746)

This commit is contained in:
Alvaro Revuelta 2023-05-25 16:42:48 +02:00 committed by GitHub
parent 35520bd0a5
commit 3c2d2891e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 2 deletions

View File

@ -1,7 +1,7 @@
{.used.} {.used.}
import import
std/[options, sequtils], std/[options, sequtils, tables],
testutils/unittests, testutils/unittests,
chronos, chronos,
chronicles, chronicles,
@ -9,6 +9,8 @@ import
libp2p/switch, libp2p/switch,
libp2p/peerId, libp2p/peerId,
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/multistream,
libp2p/muxers/muxer,
eth/keys, eth/keys,
eth/p2p/discoveryv5/enr eth/p2p/discoveryv5/enr
import import
@ -18,6 +20,7 @@ import
../../waku/v2/waku_peer_exchange, ../../waku/v2/waku_peer_exchange,
../../waku/v2/waku_peer_exchange/rpc, ../../waku/v2/waku_peer_exchange/rpc,
../../waku/v2/waku_peer_exchange/rpc_codec, ../../waku/v2/waku_peer_exchange/rpc_codec,
../../waku/v2/waku_peer_exchange/protocol,
./testlib/wakucore, ./testlib/wakucore,
./testlib/wakunode ./testlib/wakunode
@ -259,3 +262,25 @@ procSuite "Waku Peer Exchange":
# Check that it failed gracefully # Check that it failed gracefully
check: response.isErr check: response.isErr
asyncTest "connections are closed after response is sent":
# Create 3 nodes
let nodes = toSeq(0..<3).mapIt(newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)))
await allFutures(nodes.mapIt(it.start()))
await allFutures(nodes.mapIt(it.mountPeerExchange()))
# Multiple nodes request to node 0
for i in 1..<3:
let resp = await nodes[i].wakuPeerExchange.request(2, nodes[0].switch.peerInfo.toRemotePeerInfo())
require resp.isOk
# Wait for streams to be closed
await sleepAsync(1.seconds)
# Check that all streams are closed for px
check:
nodes[0].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
nodes[1].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)
nodes[2].peerManager.getNumStreams(WakuPeerExchangeCodec) == (0, 0)

View File

@ -55,12 +55,19 @@ proc request*(wpx: WakuPeerExchange, numPeers: uint64, conn: Connection): Future
request: PeerExchangeRequest(numPeers: numPeers)) request: PeerExchangeRequest(numPeers: numPeers))
var buffer: seq[byte] var buffer: seq[byte]
var error: string
try: try:
await conn.writeLP(rpc.encode().buffer) await conn.writeLP(rpc.encode().buffer)
buffer = await conn.readLp(MaxRpcSize.int) buffer = await conn.readLp(MaxRpcSize.int)
except CatchableError as exc: except CatchableError as exc:
waku_px_errors.inc(labelValues = [exc.msg]) waku_px_errors.inc(labelValues = [exc.msg])
return err("write/read failed: " & $exc.msg) error = $exc.msg
finally:
# close, no more data is expected
await conn.closeWithEof()
if error.len > 0:
return err("write/read failed: " & error)
let decodedBuff = PeerExchangeRpc.decode(buffer) let decodedBuff = PeerExchangeRpc.decode(buffer)
if decodedBuff.isErr(): if decodedBuff.isErr():
@ -155,6 +162,9 @@ proc initProtocolHandler(wpx: WakuPeerExchange) =
else: else:
waku_px_peers_sent.inc(enrs.len().int64()) waku_px_peers_sent.inc(enrs.len().int64())
# close, no data is expected
await conn.closeWithEof()
wpx.handler = handler wpx.handler = handler
wpx.codec = WakuPeerExchangeCodec wpx.codec = WakuPeerExchangeCodec