From 815282a5dae665d9e4da2b67d2e61125a12e2d95 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 27 May 2020 14:46:25 -0600 Subject: [PATCH] remove all() --- libp2p/muxers/mplex/mplex.nim | 37 +++++++++++++++------- libp2p/muxers/muxer.nim | 3 +- libp2p/switch.nim | 32 ++++++++++++------- libp2p/transports/tcptransport.nim | 50 +++++++++++++++++------------- 4 files changed, 74 insertions(+), 48 deletions(-) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 489f41b8a..2353eab18 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -197,17 +197,30 @@ method close*(m: Mplex) {.async, gcsafe.} = if m.isClosed: return - trace "closing mplex muxer", oid = m.oid - await all( - toSeq(m.remote.values).mapIt(it.reset()) & - toSeq(m.local.values).mapIt(it.reset())) + try: + trace "closing mplex muxer", oid = m.oid + let channs = toSeq(m.remote.values) & + toSeq(m.local.values) - await all(m.conns.mapIt(it.close())) # dispose of channe's connections - await all(m.handlerFuts) + for chann in channs: + try: + await chann.reset() + except CatchableError as exc: + warn "error resetting channel", exc = exc.msg - await m.connection.close() - m.remote.clear() - m.local.clear() - m.conns = @[] - m.handlerFuts = @[] - m.isClosed = true + for conn in m.conns: + try: + await conn.close() + except CatchableError as exc: + warn "error closing channel's connection" + + checkFutures( + await allFinished(m.handlerFuts)) + + await m.connection.close() + finally: + m.remote.clear() + m.local.clear() + m.conns = @[] + m.handlerFuts = @[] + m.isClosed = true diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 41a2eaac1..7a95b1c04 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -60,8 +60,7 @@ method init(c: MuxerProvider) = if not isNil(c.muxerHandler): futs &= c.muxerHandler(muxer) - # log and re-raise on errors - await all(futs) + checkFutures(await allFinished(futs)) except CatchableError as exc: trace "exception in muxer handler", exc = exc.msg diff --git a/libp2p/switch.nim b/libp2p/switch.nim index f11fb3315..685825907 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -328,22 +328,30 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} = result = startFuts # listen for incoming connections proc stop*(s: Switch) {.async.} = - trace "stopping switch" + try: + trace "stopping switch" - # we want to report erros but we do not want to fail - # or crash here, cos we need to clean possibly MANY items - # and any following conn/transport won't be cleaned up - if s.pubSub.isSome: - await s.pubSub.get().stop() + # we want to report errors but we do not want to fail + # or crash here, cos we need to clean possibly MANY items + # and any following conn/transport won't be cleaned up + if s.pubSub.isSome: + await s.pubSub.get().stop() - await all( - toSeq(s.connections.values) - .mapIt(s.cleanupConn(it))) + for conn in toSeq(s.connections.values): + try: + await s.cleanupConn(conn) + except CatchableError as exc: + warn "error cleaning up connections" - await all( - s.transports.mapIt(it.close())) + for t in s.transports: + try: + await t.close() + except CatchableError as exc: + warn "error cleaning up transports" - trace "switch stopped" + trace "switch stopped" + except CatchableError as exc: + warn "error stopping switch", exc = exc.msg proc subscribeToPeer*(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} = ## Subscribe to pub sub peer diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index fd51adaf6..9799471dd 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -113,35 +113,41 @@ method initTransport*(t: TcpTransport) = inc getTcpTransportTracker().opened method close*(t: TcpTransport) {.async, gcsafe.} = - ## start the transport - trace "stopping transport" - await procCall Transport(t).close() # call base + try: + ## start the transport + trace "stopping transport" + await procCall Transport(t).close() # call base - await all( - t.clients.mapIt(it.closeWait())) + checkFutures(await allFinished( + t.clients.mapIt(it.closeWait()))) - # server can be nil - if not isNil(t.server): - t.server.stop() - await t.server.closeWait() + # server can be nil + if not isNil(t.server): + t.server.stop() + await t.server.closeWait() - t.server = nil + t.server = nil - for fut in t.handlers: - if not fut.finished: - fut.cancel() - await all(t.handlers) - t.handlers = @[] + for fut in t.handlers: + if not fut.finished: + fut.cancel() - for fut in t.cleanups: - if not fut.finished: - fut.cancel() - await all(t.cleanups) - t.cleanups = @[] + checkFutures( + await allFinished(t.handlers)) + t.handlers = @[] - trace "transport stopped" + for fut in t.cleanups: + if not fut.finished: + fut.cancel() - inc getTcpTransportTracker().closed + checkFutures( + await allFinished(t.cleanups)) + t.cleanups = @[] + + trace "transport stopped" + inc getTcpTransportTracker().closed + except CatchableError as exc: + trace "error shutting down tcp transport", exc = exc.msg method listen*(t: TcpTransport, ma: MultiAddress,