more connection closes to fix leaks
This commit is contained in:
parent
e4303110a6
commit
8dda6c289d
|
@ -126,8 +126,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
|
||||||
trace "Exception occurred", exception = exc.msg
|
trace "Exception occurred", exception = exc.msg
|
||||||
finally:
|
finally:
|
||||||
trace "stopping mplex main loop"
|
trace "stopping mplex main loop"
|
||||||
if not m.connection.closed():
|
await m.close()
|
||||||
await m.connection.close()
|
|
||||||
|
|
||||||
proc newMplex*(conn: Connection,
|
proc newMplex*(conn: Connection,
|
||||||
maxChanns: uint = MaxChannels): Mplex =
|
maxChanns: uint = MaxChannels): Mplex =
|
||||||
|
@ -154,5 +153,10 @@ method newStream*(m: Mplex,
|
||||||
|
|
||||||
method close*(m: Mplex) {.async, gcsafe.} =
|
method close*(m: Mplex) {.async, gcsafe.} =
|
||||||
trace "closing mplex muxer"
|
trace "closing mplex muxer"
|
||||||
|
if not m.connection.closed():
|
||||||
|
await m.connection.close()
|
||||||
|
|
||||||
await allFutures(@[allFutures(toSeq(m.remote.values).mapIt(it.reset())),
|
await allFutures(@[allFutures(toSeq(m.remote.values).mapIt(it.reset())),
|
||||||
allFutures(toSeq(m.local.values).mapIt(it.reset()))])
|
allFutures(toSeq(m.local.values).mapIt(it.reset()))])
|
||||||
|
m.remote.clear()
|
||||||
|
m.local.clear()
|
||||||
|
|
|
@ -198,6 +198,7 @@ proc upgradeIncoming(s: Switch, conn: Connection) {.async, gcsafe.} =
|
||||||
|
|
||||||
# handle subsequent requests
|
# handle subsequent requests
|
||||||
await ms.handle(sconn)
|
await ms.handle(sconn)
|
||||||
|
await sconn.close()
|
||||||
|
|
||||||
if (await ms.select(conn)): # just handshake
|
if (await ms.select(conn)): # just handshake
|
||||||
# add the secure handlers
|
# add the secure handlers
|
||||||
|
@ -285,14 +286,14 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
|
||||||
|
|
||||||
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
|
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
|
||||||
try:
|
try:
|
||||||
|
dumpNumberOfInstances()
|
||||||
await s.upgradeIncoming(conn) # perform upgrade on incoming connection
|
await s.upgradeIncoming(conn) # perform upgrade on incoming connection
|
||||||
except CatchableError as exc:
|
except CatchableError as exc:
|
||||||
trace "Exception occurred in Switch.start", exc = exc.msg
|
trace "Exception occurred in Switch.start", exc = exc.msg
|
||||||
finally:
|
finally:
|
||||||
if not isNil(conn) and not conn.closed:
|
await conn.close()
|
||||||
await conn.close()
|
|
||||||
|
|
||||||
await s.cleanupConn(conn)
|
await s.cleanupConn(conn)
|
||||||
|
dumpNumberOfInstances()
|
||||||
|
|
||||||
var startFuts: seq[Future[void]]
|
var startFuts: seq[Future[void]]
|
||||||
for t in s.transports: # for each transport
|
for t in s.transports: # for each transport
|
||||||
|
|
|
@ -33,7 +33,10 @@ proc connHandler*(t: Transport,
|
||||||
let handlerFut = if isNil(t.handler): nil else: t.handler(conn)
|
let handlerFut = if isNil(t.handler): nil else: t.handler(conn)
|
||||||
let connHolder: ConnHolder = ConnHolder(connection: conn,
|
let connHolder: ConnHolder = ConnHolder(connection: conn,
|
||||||
connFuture: handlerFut)
|
connFuture: handlerFut)
|
||||||
t.connections.add(connHolder)
|
# TODO: this needs rethinking,
|
||||||
|
# currently it leaks since there
|
||||||
|
# is no way to delete the conn on close
|
||||||
|
# t.connections.add(connHolder)
|
||||||
result = conn
|
result = conn
|
||||||
|
|
||||||
proc connCb(server: StreamServer,
|
proc connCb(server: StreamServer,
|
||||||
|
|
Loading…
Reference in New Issue