From cf25e3678f373a256f68a0c51b9de011d26d9d85 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 18 Mar 2021 17:43:39 -0600 Subject: [PATCH] more fixes around raises --- libp2p.nimble | 2 +- libp2p/connmanager.nim | 18 +++--------------- libp2p/daemon/daemonapi.nim | 15 +++++++++------ libp2p/dialer.nim | 17 ++++++++--------- libp2p/muxers/mplex/mplex.nim | 12 ++---------- libp2p/switch.nim | 4 ++-- 6 files changed, 25 insertions(+), 43 deletions(-) diff --git a/libp2p.nimble b/libp2p.nimble index a2ca432bf..fa9eb853b 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -11,7 +11,7 @@ requires "nim >= 1.2.0", "nimcrypto >= 0.4.1", "bearssl >= 0.1.4", "chronicles >= 0.7.2", - "chronos >= 2.5.2", + "chronos#eh-tracking", "metrics", "secp256k1", "stew#head" diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 227c3812a..bd269c07a 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -150,9 +150,7 @@ proc triggerConnEvent*(c: ConnManager, connEvents.add(h(peerId, event)) checkFutures(await allFinished(connEvents)) - except CancelledError as exc: - raise exc - except CatchableError as exc: + except LPError as exc: warn "Exception in triggerConnEvents", msg = exc.msg, peerId, event = $event @@ -217,9 +215,7 @@ proc triggerPeerEvents*(c: ConnManager, raiseAssert exc.msg checkFutures(await allFinished(peerEvents)) - except CancelledError as exc: - raise exc - except CatchableError as exc: # handlers should not raise! + except LPError as exc: # handlers should not raise! warn "Exception in triggerPeerEvents", exc = exc.msg, peerId proc contains*(c: ConnManager, conn: Connection): bool = @@ -316,8 +312,6 @@ proc onConnUpgraded(c: ConnManager, conn: Connection) {.async.} = await c.triggerConnEvent( peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: conn.dir == Direction.In)) except CatchableError as exc: - # This is top-level procedure which will work as separate task, so it - # do not need to propagate CancelledError and should handle other errors warn "Unexpected exception in switch peer connection cleanup", conn, msg = exc.msg @@ -329,8 +323,6 @@ proc peerCleanup(c: ConnManager, conn: Connection) {.async.} = peerId, ConnEvent(kind: ConnEventKind.Disconnected)) await c.triggerPeerEvents(peerId, PeerEvent(kind: PeerEventKind.Left)) except CatchableError as exc: - # This is top-level procedure which will work as separate task, so it - # do not need to propagate CancelledError and should handle other errors warn "Unexpected exception peer cleanup handler", conn, msg = exc.msg @@ -343,12 +335,8 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} = await conn.join() trace "Connection closed, cleaning up", conn await c.cleanupConn(conn) - except CancelledError: - # This is top-level procedure which will work as separate task, so it - # do not need to propagate CancelledError. - debug "Unexpected cancellation in connection manager's cleanup", conn except CatchableError as exc: - debug "Unexpected exception in connection manager's cleanup", + debug "Exception in connection manager's cleanup", errMsg = exc.msg, conn finally: trace "Triggering peerCleanup", conn diff --git a/libp2p/daemon/daemonapi.nim b/libp2p/daemon/daemonapi.nim index 040f7cdbc..db14600a8 100644 --- a/libp2p/daemon/daemonapi.nim +++ b/libp2p/daemon/daemonapi.nim @@ -496,6 +496,8 @@ proc recvMessage(conn: StreamTransport): Future[seq[byte]] {.async.} = await conn.readExactly(addr buffer[0], int(size)) except TransportIncompleteError: buffer.setLen(0) + except IOSelectorsException: + buffer.setLen(0) result = buffer @@ -912,9 +914,9 @@ proc openStream*(api: DaemonAPI, peer: PeerID, stream.flags.incl(Outbound) stream.transp = transp result = stream - except Exception as exc: + except CatchableError as exc: await api.closeConnection(transp) - raiseAssert exc.msg + raise exc proc streamHandler(server: StreamServer, transp: StreamTransport) {.async.} = var api = getUserData[DaemonAPI](server) @@ -951,13 +953,14 @@ proc addHandler*(api: DaemonAPI, protocols: seq[string], protocols)) pb.withMessage() do: api.servers.add(P2PServer(server: server, address: maddress)) - finally: + except CatchableError as exc: for item in protocols: api.handlers.del(item) server.stop() server.close() await server.join() - + raise exc + finally: await api.closeConnection(transp) proc listPeers*(api: DaemonAPI): Future[seq[PeerInfo]] {.async.} = @@ -1325,9 +1328,9 @@ proc pubsubSubscribe*( ticket.transp = transp asyncSpawn pubsubLoop(api, ticket) result = ticket - except Exception as exc: + except CatchableError as exc: await api.closeConnection(transp) - raiseAssert exc.msg + raise exc proc shortLog*(pinfo: PeerInfo): string = ## Get string representation of ``PeerInfo`` object. diff --git a/libp2p/dialer.nim b/libp2p/dialer.nim index 893f121a1..74ae4a42a 100644 --- a/libp2p/dialer.nim +++ b/libp2p/dialer.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -{.push raises: [Defect, DialFailedError].} +{.push raises: [Defect].} import std/[sugar, tables] @@ -151,7 +151,8 @@ proc internalConnect( method connect*( self: Dialer, peerId: PeerID, - addrs: seq[MultiAddress]) {.async.} = + addrs: seq[MultiAddress]) + {.async, raises: [Defect, DialFailedError].} = ## connect remote peer without negotiating ## a protocol ## @@ -176,7 +177,8 @@ proc negotiateStream( method dial*( self: Dialer, peerId: PeerID, - protos: seq[string]): Future[Connection] {.async.} = + protos: seq[string]): Future[Connection] + {.async, raises: [Defect, DialFailedError].} = ## create a protocol stream over an ## existing connection ## @@ -192,7 +194,8 @@ method dial*( self: Dialer, peerId: PeerID, addrs: seq[MultiAddress], - protos: seq[string]): Future[Connection] {.async.} = + protos: seq[string]): Future[Connection] + {.async, raises: [Defect, DialFailedError].} = ## create a protocol stream and establish ## a connection if one doesn't exist already ## @@ -219,12 +222,8 @@ method dial*( "Couldn't get muxed stream") return await self.negotiateStream(stream, protos) - except CancelledError as exc: - trace "Dial canceled", conn - await cleanup() - raise exc except CatchableError as exc: - debug "Error dialing", conn, msg = exc.msg + debug "Exception dialing", conn, msg = exc.msg await cleanup() raise exc diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 68daa8d6d..5ac6e0de6 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -70,9 +70,6 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = m.channels[chann.initiator].len.int64, labelValues = [$chann.initiator, $m.connection.peerInfo.peerId]) except CatchableError as exc: - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError, and no other exceptions should - # happen here warn "Error cleaning up mplex channel", m, chann, msg = exc.msg proc newStreamInternal*( @@ -123,8 +120,6 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} = trace "finished handling stream", m, chann doAssert(chann.closed, "connection not closed by handler!") except CatchableError as exc: - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. trace "Exception in mplex stream handler", m, chann, msg = exc.msg await chann.reset() @@ -187,14 +182,11 @@ method handle*(m: Mplex) {.async, gcsafe.} = await channel.pushEof() of MessageType.ResetIn, MessageType.ResetOut: await channel.reset() - except CancelledError: - # This procedure is spawned as task and it is not part of public API, so - # there no way for this procedure to be cancelled implicitly. - debug "Unexpected cancellation in mplex handler", m except LPStreamEOFError as exc: + # mostly here to avoid scary EOF messages in debug trace "Stream EOF", m, msg = exc.msg except CatchableError as exc: - debug "Unexpected exception in mplex read loop", m, msg = exc.msg + debug "Exception in mplex read loop", m, msg = exc.msg finally: await m.close() trace "Stopped mplex handler", m diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 8fc5892ae..7f9b04de0 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -234,8 +234,8 @@ proc stop*(s: Switch) {.async.} = warn "error cleaning up transports", msg = exc.msg try: - await allFutures(s.acceptFuts) - .wait(1.seconds) + await allFuturesThrowing( + allFinished(s.acceptFuts)).wait(1.seconds) except CatchableError as exc: trace "Exception while stopping accept loops", exc = exc.msg