From 976164ba3c1bb261ac8e91504c743271ec6e2699 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 6 Apr 2020 15:33:44 -0600 Subject: [PATCH] proper connection cleanup --- libp2p/connection.nim | 8 +++++--- libp2p/transports/tcptransport.nim | 19 +++++++++++-------- libp2p/transports/transport.nim | 8 ++------ 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/libp2p/connection.nim b/libp2p/connection.nim index be49a1dc4..8073a87b3 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -113,9 +113,10 @@ method closed*(s: Connection): bool = result = s.stream.closed method close*(s: Connection) {.async, gcsafe.} = - trace "closing connection", closed = s.closed, - peer = if not isNil(s.peerInfo): - s.peerInfo.id else: "" + trace "about to close connection", closed = s.closed, + peer = if not isNil(s.peerInfo): + s.peerInfo.id else: "" + if not s.closed: if not isNil(s.stream) and not s.stream.closed: trace "closing connection", closed = s.closed, @@ -125,6 +126,7 @@ method close*(s: Connection) {.async, gcsafe.} = s.closeEvent.fire() s.isClosed = true + trace "connection closed", closed = s.closed, peer = if not isNil(s.peerInfo): s.peerInfo.id else: "" diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 6c47996b4..f2af4da2c 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -21,6 +21,10 @@ logScope: type TcpTransport* = ref object of Transport server*: StreamServer +proc cleanup(t: Transport, conn: Connection) {.async.} = + await conn.closeEvent.wait() + t.connections.keepItIf(it != conn) + proc connHandler*(t: Transport, server: StreamServer, client: StreamTransport, @@ -32,12 +36,10 @@ proc connHandler*(t: Transport, if not initiator: if not isNil(t.handler): asyncCheck t.handler(conn) - # TODO: this needs rethinking, - # currently it leaks since there - # is no way to delete the conn on close - # let connHolder: ConnHolder = ConnHolder(connection: conn, - # connFuture: handlerFut) - # t.connections.add(connHolder) + + t.connections.add(conn) + asyncCheck t.cleanup(conn) + result = conn proc connCb(server: StreamServer, @@ -55,10 +57,11 @@ method close*(t: TcpTransport): Future[void] {.async, gcsafe.} = await procCall Transport(t).close() # call base # server can be nil - if t.server != nil: + if isNil(t.server): t.server.stop() t.server.close() - trace "transport stopped" + await t.server.join() + trace "transport stopped" method listen*(t: TcpTransport, ma: MultiAddress, diff --git a/libp2p/transports/transport.nim b/libp2p/transports/transport.nim index 8d5a2c4cc..564afd702 100644 --- a/libp2p/transports/transport.nim +++ b/libp2p/transports/transport.nim @@ -16,13 +16,9 @@ import ../connection, type ConnHandler* = proc (conn: Connection): Future[void] {.gcsafe.} - ConnHolder* = object - connection*: Connection - connFuture*: Future[void] - Transport* = ref object of RootObj ma*: Multiaddress - connections*: seq[ConnHolder] + connections*: seq[Connection] handler*: ConnHandler multicodec*: MultiCodec @@ -37,7 +33,7 @@ proc newTransport*(t: typedesc[Transport]): t {.gcsafe.} = method close*(t: Transport) {.base, async, gcsafe.} = ## stop and cleanup the transport ## including all outstanding connections - await allFutures(t.connections.mapIt(it.connection.close())) + await allFutures(t.connections.mapIt(it.close())) method listen*(t: Transport, ma: MultiAddress,