From da036561640535ba6782d2899f5fa12cce9946c1 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 5 Aug 2020 20:19:52 -0600 Subject: [PATCH] fix leaks --- libp2p/stream/chronosstream.nim | 1 + libp2p/transports/tcptransport.nim | 18 +++--------------- tests/helpers.nim | 2 +- tests/testtransport.nim | 12 ++++++++++-- 4 files changed, 15 insertions(+), 18 deletions(-) diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index 9ec25f30e..d8d6e5ece 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -33,6 +33,7 @@ method initStream*(s: ChronosStream) = proc init*(C: type ChronosStream, client: StreamTransport, + dir: Direction, timeout = DefaultChronosStreamTimeout): ChronosStream = result = C(client: client, timeout: timeout) diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 0bb2df4d3..29c47bc08 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -100,20 +100,8 @@ proc connHandler*(t: TcpTransport, if not isNil(t.handler): t.handlers &= t.handler(conn) - proc cleanup() {.async.} = - try: - await client.join() - trace "cleaning up client", addrs = $client.remoteAddress, connoid = $conn.oid - if not(isNil(conn)): - await conn.close() - t.clients.keepItIf(it != client) - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "error cleaning up client", exc = exc.msg - - t.clients.add(client) - asyncCheck cleanup() + t.conns.incl(stream) + asyncCheck t.cleanup(stream) result = conn proc connCb(server: StreamServer, @@ -159,7 +147,7 @@ method close*(t: TcpTransport) {.async, gcsafe.} = await procCall Transport(t).close() # call base checkFutures(await allFinished( - toSeq(t.conns).mapIt(it.client.closeWait()))) + toSeq(t.conns).mapIt(it.close()))) # server can be nil if not isNil(t.server): diff --git a/tests/helpers.nim b/tests/helpers.nim index 2e10c0153..4d8368700 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -10,7 +10,7 @@ const StreamServerTrackerName = "stream.server" trackerNames = [ - # ConnectionTrackerName, + ConnectionTrackerName, BufferStreamTrackerName, TcpTransportTrackerName, StreamTransportTrackerName, diff --git a/tests/testtransport.nim b/tests/testtransport.nim index 1a41268f3..144d699a3 100644 --- a/tests/testtransport.nim +++ b/tests/testtransport.nim @@ -205,14 +205,18 @@ suite "TCP transport": transports.add(TcpTransport.init(maxIncoming = 2)) asyncCheck transports[0].listen(ma, connHandler) + var conns: seq[Connection] try: for i in 0..10: let transport = TcpTransport.init() transports.add(transport) - discard await transport.dial(transports[0].ma).wait(10.millis) + conns.add(await transport.dial(transports[0].ma).wait(10.millis)) except AsyncTimeoutError: check times == 2 + await allFuturesThrowing( + conns.mapIt(it.close())) + await allFuturesThrowing( transports.mapIt(it.close())) @@ -229,14 +233,18 @@ suite "TCP transport": transports.add(TcpTransport.init()) asyncCheck transports[0].listen(ma, connHandler) + var conns: seq[Connection] try: let transport = TcpTransport.init(maxOutgoing = 2) transports.add(transport) for i in 0..10: - discard await transport.dial(transports[0].ma) + conns.add(await transport.dial(transports[0].ma)) except TooManyConnections: check times == 2 + await allFuturesThrowing( + conns.mapIt(it.close())) + await allFuturesThrowing( transports.mapIt(it.close()))