remove all()

This commit is contained in:
Dmitriy Ryajov 2020-05-27 14:46:25 -06:00
parent 293b7da295
commit 815282a5da
4 changed files with 74 additions and 48 deletions

View File

@ -197,17 +197,30 @@ method close*(m: Mplex) {.async, gcsafe.} =
if m.isClosed: if m.isClosed:
return return
trace "closing mplex muxer", oid = m.oid try:
await all( trace "closing mplex muxer", oid = m.oid
toSeq(m.remote.values).mapIt(it.reset()) & let channs = toSeq(m.remote.values) &
toSeq(m.local.values).mapIt(it.reset())) toSeq(m.local.values)
await all(m.conns.mapIt(it.close())) # dispose of channe's connections for chann in channs:
await all(m.handlerFuts) try:
await chann.reset()
except CatchableError as exc:
warn "error resetting channel", exc = exc.msg
await m.connection.close() for conn in m.conns:
m.remote.clear() try:
m.local.clear() await conn.close()
m.conns = @[] except CatchableError as exc:
m.handlerFuts = @[] warn "error closing channel's connection"
m.isClosed = true
checkFutures(
await allFinished(m.handlerFuts))
await m.connection.close()
finally:
m.remote.clear()
m.local.clear()
m.conns = @[]
m.handlerFuts = @[]
m.isClosed = true

View File

@ -60,8 +60,7 @@ method init(c: MuxerProvider) =
if not isNil(c.muxerHandler): if not isNil(c.muxerHandler):
futs &= c.muxerHandler(muxer) futs &= c.muxerHandler(muxer)
# log and re-raise on errors checkFutures(await allFinished(futs))
await all(futs)
except CatchableError as exc: except CatchableError as exc:
trace "exception in muxer handler", exc = exc.msg trace "exception in muxer handler", exc = exc.msg

View File

@ -328,22 +328,30 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
result = startFuts # listen for incoming connections result = startFuts # listen for incoming connections
proc stop*(s: Switch) {.async.} = proc stop*(s: Switch) {.async.} =
trace "stopping switch" try:
trace "stopping switch"
# we want to report erros but we do not want to fail # we want to report errors but we do not want to fail
# or crash here, cos we need to clean possibly MANY items # or crash here, cos we need to clean possibly MANY items
# and any following conn/transport won't be cleaned up # and any following conn/transport won't be cleaned up
if s.pubSub.isSome: if s.pubSub.isSome:
await s.pubSub.get().stop() await s.pubSub.get().stop()
await all( for conn in toSeq(s.connections.values):
toSeq(s.connections.values) try:
.mapIt(s.cleanupConn(it))) await s.cleanupConn(conn)
except CatchableError as exc:
warn "error cleaning up connections"
await all( for t in s.transports:
s.transports.mapIt(it.close())) try:
await t.close()
except CatchableError as exc:
warn "error cleaning up transports"
trace "switch stopped" trace "switch stopped"
except CatchableError as exc:
warn "error stopping switch", exc = exc.msg
proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
## Subscribe to pub sub peer ## Subscribe to pub sub peer

View File

@ -113,35 +113,41 @@ method initTransport*(t: TcpTransport) =
inc getTcpTransportTracker().opened inc getTcpTransportTracker().opened
method close*(t: TcpTransport) {.async, gcsafe.} = method close*(t: TcpTransport) {.async, gcsafe.} =
## start the transport try:
trace "stopping transport" ## start the transport
await procCall Transport(t).close() # call base trace "stopping transport"
await procCall Transport(t).close() # call base
await all( checkFutures(await allFinished(
t.clients.mapIt(it.closeWait())) t.clients.mapIt(it.closeWait())))
# server can be nil # server can be nil
if not isNil(t.server): if not isNil(t.server):
t.server.stop() t.server.stop()
await t.server.closeWait() await t.server.closeWait()
t.server = nil t.server = nil
for fut in t.handlers: for fut in t.handlers:
if not fut.finished: if not fut.finished:
fut.cancel() fut.cancel()
await all(t.handlers)
t.handlers = @[]
for fut in t.cleanups: checkFutures(
if not fut.finished: await allFinished(t.handlers))
fut.cancel() t.handlers = @[]
await all(t.cleanups)
t.cleanups = @[]
trace "transport stopped" for fut in t.cleanups:
if not fut.finished:
fut.cancel()
inc getTcpTransportTracker().closed checkFutures(
await allFinished(t.cleanups))
t.cleanups = @[]
trace "transport stopped"
inc getTcpTransportTracker().closed
except CatchableError as exc:
trace "error shutting down tcp transport", exc = exc.msg
method listen*(t: TcpTransport, method listen*(t: TcpTransport,
ma: MultiAddress, ma: MultiAddress,