fix leaks

This commit is contained in:
Dmitriy Ryajov 2020-08-05 20:19:52 -06:00
parent fef6c43967
commit da03656164
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
4 changed files with 15 additions and 18 deletions

View File

@ -33,6 +33,7 @@ method initStream*(s: ChronosStream) =
proc init*(C: type ChronosStream, proc init*(C: type ChronosStream,
client: StreamTransport, client: StreamTransport,
dir: Direction,
timeout = DefaultChronosStreamTimeout): ChronosStream = timeout = DefaultChronosStreamTimeout): ChronosStream =
result = C(client: client, result = C(client: client,
timeout: timeout) timeout: timeout)

View File

@ -100,20 +100,8 @@ proc connHandler*(t: TcpTransport,
if not isNil(t.handler): if not isNil(t.handler):
t.handlers &= t.handler(conn) t.handlers &= t.handler(conn)
proc cleanup() {.async.} = t.conns.incl(stream)
try: asyncCheck t.cleanup(stream)
await client.join()
trace "cleaning up client", addrs = $client.remoteAddress, connoid = $conn.oid
if not(isNil(conn)):
await conn.close()
t.clients.keepItIf(it != client)
except CancelledError as exc:
raise exc
except CatchableError as exc:
trace "error cleaning up client", exc = exc.msg
t.clients.add(client)
asyncCheck cleanup()
result = conn result = conn
proc connCb(server: StreamServer, proc connCb(server: StreamServer,
@ -159,7 +147,7 @@ method close*(t: TcpTransport) {.async, gcsafe.} =
await procCall Transport(t).close() # call base await procCall Transport(t).close() # call base
checkFutures(await allFinished( checkFutures(await allFinished(
toSeq(t.conns).mapIt(it.client.closeWait()))) toSeq(t.conns).mapIt(it.close())))
# server can be nil # server can be nil
if not isNil(t.server): if not isNil(t.server):

View File

@ -10,7 +10,7 @@ const
StreamServerTrackerName = "stream.server" StreamServerTrackerName = "stream.server"
trackerNames = [ trackerNames = [
# ConnectionTrackerName, ConnectionTrackerName,
BufferStreamTrackerName, BufferStreamTrackerName,
TcpTransportTrackerName, TcpTransportTrackerName,
StreamTransportTrackerName, StreamTransportTrackerName,

View File

@ -205,14 +205,18 @@ suite "TCP transport":
transports.add(TcpTransport.init(maxIncoming = 2)) transports.add(TcpTransport.init(maxIncoming = 2))
asyncCheck transports[0].listen(ma, connHandler) asyncCheck transports[0].listen(ma, connHandler)
var conns: seq[Connection]
try: try:
for i in 0..10: for i in 0..10:
let transport = TcpTransport.init() let transport = TcpTransport.init()
transports.add(transport) transports.add(transport)
discard await transport.dial(transports[0].ma).wait(10.millis) conns.add(await transport.dial(transports[0].ma).wait(10.millis))
except AsyncTimeoutError: except AsyncTimeoutError:
check times == 2 check times == 2
await allFuturesThrowing(
conns.mapIt(it.close()))
await allFuturesThrowing( await allFuturesThrowing(
transports.mapIt(it.close())) transports.mapIt(it.close()))
@ -229,14 +233,18 @@ suite "TCP transport":
transports.add(TcpTransport.init()) transports.add(TcpTransport.init())
asyncCheck transports[0].listen(ma, connHandler) asyncCheck transports[0].listen(ma, connHandler)
var conns: seq[Connection]
try: try:
let transport = TcpTransport.init(maxOutgoing = 2) let transport = TcpTransport.init(maxOutgoing = 2)
transports.add(transport) transports.add(transport)
for i in 0..10: for i in 0..10:
discard await transport.dial(transports[0].ma) conns.add(await transport.dial(transports[0].ma))
except TooManyConnections: except TooManyConnections:
check times == 2 check times == 2
await allFuturesThrowing(
conns.mapIt(it.close()))
await allFuturesThrowing( await allFuturesThrowing(
transports.mapIt(it.close())) transports.mapIt(it.close()))