proper connection cleanup

This commit is contained in:
Dmitriy Ryajov 2020-04-06 15:33:44 -06:00
parent 0c8dde15e7
commit 976164ba3c
3 changed files with 18 additions and 17 deletions

View File

@ -113,9 +113,10 @@ method closed*(s: Connection): bool =
result = s.stream.closed result = s.stream.closed
method close*(s: Connection) {.async, gcsafe.} = method close*(s: Connection) {.async, gcsafe.} =
trace "closing connection", closed = s.closed, trace "about to close connection", closed = s.closed,
peer = if not isNil(s.peerInfo): peer = if not isNil(s.peerInfo):
s.peerInfo.id else: "" s.peerInfo.id else: ""
if not s.closed: if not s.closed:
if not isNil(s.stream) and not s.stream.closed: if not isNil(s.stream) and not s.stream.closed:
trace "closing connection", closed = s.closed, trace "closing connection", closed = s.closed,
@ -125,6 +126,7 @@ method close*(s: Connection) {.async, gcsafe.} =
s.closeEvent.fire() s.closeEvent.fire()
s.isClosed = true s.isClosed = true
trace "connection closed", closed = s.closed, trace "connection closed", closed = s.closed,
peer = if not isNil(s.peerInfo): peer = if not isNil(s.peerInfo):
s.peerInfo.id else: "" s.peerInfo.id else: ""

View File

@ -21,6 +21,10 @@ logScope:
type TcpTransport* = ref object of Transport type TcpTransport* = ref object of Transport
server*: StreamServer server*: StreamServer
proc cleanup(t: Transport, conn: Connection) {.async.} =
await conn.closeEvent.wait()
t.connections.keepItIf(it != conn)
proc connHandler*(t: Transport, proc connHandler*(t: Transport,
server: StreamServer, server: StreamServer,
client: StreamTransport, client: StreamTransport,
@ -32,12 +36,10 @@ proc connHandler*(t: Transport,
if not initiator: if not initiator:
if not isNil(t.handler): if not isNil(t.handler):
asyncCheck t.handler(conn) asyncCheck t.handler(conn)
# TODO: this needs rethinking,
# currently it leaks since there t.connections.add(conn)
# is no way to delete the conn on close asyncCheck t.cleanup(conn)
# let connHolder: ConnHolder = ConnHolder(connection: conn,
# connFuture: handlerFut)
# t.connections.add(connHolder)
result = conn result = conn
proc connCb(server: StreamServer, proc connCb(server: StreamServer,
@ -55,9 +57,10 @@ method close*(t: TcpTransport): Future[void] {.async, gcsafe.} =
await procCall Transport(t).close() # call base await procCall Transport(t).close() # call base
# server can be nil # server can be nil
if t.server != nil: if isNil(t.server):
t.server.stop() t.server.stop()
t.server.close() t.server.close()
await t.server.join()
trace "transport stopped" trace "transport stopped"
method listen*(t: TcpTransport, method listen*(t: TcpTransport,

View File

@ -16,13 +16,9 @@ import ../connection,
type type
ConnHandler* = proc (conn: Connection): Future[void] {.gcsafe.} ConnHandler* = proc (conn: Connection): Future[void] {.gcsafe.}
ConnHolder* = object
connection*: Connection
connFuture*: Future[void]
Transport* = ref object of RootObj Transport* = ref object of RootObj
ma*: Multiaddress ma*: Multiaddress
connections*: seq[ConnHolder] connections*: seq[Connection]
handler*: ConnHandler handler*: ConnHandler
multicodec*: MultiCodec multicodec*: MultiCodec
@ -37,7 +33,7 @@ proc newTransport*(t: typedesc[Transport]): t {.gcsafe.} =
method close*(t: Transport) {.base, async, gcsafe.} = method close*(t: Transport) {.base, async, gcsafe.} =
## stop and cleanup the transport ## stop and cleanup the transport
## including all outstanding connections ## including all outstanding connections
await allFutures(t.connections.mapIt(it.connection.close())) await allFutures(t.connections.mapIt(it.close()))
method listen*(t: Transport, method listen*(t: Transport,
ma: MultiAddress, ma: MultiAddress,