From 8dda6c289d2b1f582c1cbe68b1d1915d2902142a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sat, 4 Apr 2020 20:42:08 -0600 Subject: [PATCH] more connection closes to fix leaks --- libp2p/muxers/mplex/mplex.nim | 8 ++++++-- libp2p/switch.nim | 7 ++++--- libp2p/transports/tcptransport.nim | 5 ++++- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 54ba181d7..50acef392 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -126,8 +126,7 @@ method handle*(m: Mplex) {.async, gcsafe.} = trace "Exception occurred", exception = exc.msg finally: trace "stopping mplex main loop" - if not m.connection.closed(): - await m.connection.close() + await m.close() proc newMplex*(conn: Connection, maxChanns: uint = MaxChannels): Mplex = @@ -154,5 +153,10 @@ method newStream*(m: Mplex, method close*(m: Mplex) {.async, gcsafe.} = trace "closing mplex muxer" + if not m.connection.closed(): + await m.connection.close() + await allFutures(@[allFutures(toSeq(m.remote.values).mapIt(it.reset())), allFutures(toSeq(m.local.values).mapIt(it.reset()))]) + m.remote.clear() + m.local.clear() diff --git a/libp2p/switch.nim b/libp2p/switch.nim index f4a7bbdce..c566aae2d 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -198,6 +198,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} = # handle subsequent requests await ms.handle(sconn) + await sconn.close() if (await ms.select(conn)): # just handshake # add the secure handlers @@ -285,14 +286,14 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = try: + dumpNumberOfInstances() await s.upgradeIncoming(conn) # perform upgrade on incoming connection except CatchableError as exc: trace "Exception occurred in Switch.start", exc = exc.msg finally: - if not isNil(conn) and not conn.closed: - await conn.close() - + await conn.close() await s.cleanupConn(conn) + dumpNumberOfInstances() var startFuts: seq[Future[void]] for t in s.transports: # for each transport diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 70d1d76c8..6d6efb302 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -33,7 +33,10 @@ proc connHandler*(t: Transport, let handlerFut = if isNil(t.handler): nil else: t.handler(conn) let connHolder: ConnHolder = ConnHolder(connection: conn, connFuture: handlerFut) - t.connections.add(connHolder) + # TODO: this needs rethinking, + # currently it leaks since there + # is no way to delete the conn on close + # t.connections.add(connHolder) result = conn proc connCb(server: StreamServer,