From 0b851921195883172301ad113b18ac04333a00ca Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 4 Sep 2020 19:30:45 +0300 Subject: [PATCH] Remove asyncCheck from codebase. (#345) * Remove asyncCheck from codebase. * Replace all `discard` statements with new `asyncSpawn`. * Bump `nim-chronos` requirement. --- libp2p.nimble | 2 +- libp2p/connmanager.nim | 20 +++++++--- libp2p/muxers/mplex/lpchannel.nim | 41 ++++++++++--------- libp2p/muxers/mplex/mplex.nim | 50 ++++++++++++++--------- libp2p/protocols/pubsub/floodsub.nim | 10 ++++- libp2p/protocols/pubsub/gossipsub.nim | 10 ++++- libp2p/protocols/pubsub/pubsubpeer.nim | 10 +++-- libp2p/protocols/secure/secure.nim | 17 ++++++-- libp2p/switch.nim | 55 +++++++++++++++++++++----- libp2p/transports/tcptransport.nim | 9 +++-- 10 files changed, 155 insertions(+), 69 deletions(-) diff --git a/libp2p.nimble b/libp2p.nimble index ef37b2dca..e1d4a08e3 100644 --- a/libp2p.nimble +++ b/libp2p.nimble @@ -11,7 +11,7 @@ requires "nim >= 1.2.0", "nimcrypto >= 0.4.1", "bearssl >= 0.1.4", "chronicles >= 0.7.2", - "chronos >= 2.3.8", + "chronos >= 2.5.2", "metrics", "secp256k1", "stew >= 0.1.0" diff --git a/libp2p/connmanager.nim b/libp2p/connmanager.nim index 8d101146a..4cbafda98 100644 --- a/libp2p/connmanager.nim +++ b/libp2p/connmanager.nim @@ -120,10 +120,17 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} = ## ## triggers the connections resource cleanup ## - - await conn.join() - trace "triggering connection cleanup", peer = $conn.peerInfo - await c.cleanupConn(conn) + try: + await conn.join() + trace "triggering connection cleanup", peer = $conn.peerInfo + await c.cleanupConn(conn) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in connection manager's cleanup" + except CatchableError as exc: + trace "Unexpected exception in connection manager's cleanup", + errMsg = exc.msg proc selectConn*(c: ConnManager, peerId: PeerID, @@ -184,8 +191,9 @@ proc storeConn*(c: ConnManager, conn: Connection) = c.conns[peerId].incl(conn) - # launch on close listener - asyncCheck c.onClose(conn) + # Launch on close listener + # All the errors are handled inside `onClose()` procedure. + asyncSpawn c.onClose(conn) libp2p_peers.set(c.conns.len.int64) trace "stored connection", connections = c.conns.len, peer = peerId diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 48eadb97d..e62dfebf2 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -66,18 +66,6 @@ template withWriteLock(lock: AsyncLock, body: untyped): untyped = if not(isNil(lock)) and lock.locked: lock.release() -template withEOFExceptions(body: untyped): untyped = - try: - body - except CancelledError as exc: - raise exc - except LPStreamEOFError as exc: - trace "muxed connection EOF", exc = exc.msg - except LPStreamClosedError as exc: - trace "muxed connection closed", exc = exc.msg - except LPStreamIncompleteError as exc: - trace "incomplete message", exc = exc.msg - proc closeMessage(s: LPChannel) {.async.} = logScope: id = s.id @@ -104,11 +92,22 @@ proc resetMessage(s: LPChannel) {.async.} = # stack = getStackTrace() ## send reset message - this will not raise - withEOFExceptions: + try: withWriteLock(s.writeLock): trace "sending reset message" - await s.conn.writeMsg(s.id, s.resetCode) # write reset + except CancelledError: + # This procedure is called from one place and never awaited, so there no + # need to re-raise CancelledError. + trace "Unexpected cancellation while resetting channel" + except LPStreamEOFError as exc: + trace "muxed connection EOF", exc = exc.msg + except LPStreamClosedError as exc: + trace "muxed connection closed", exc = exc.msg + except LPStreamIncompleteError as exc: + trace "incomplete message", exc = exc.msg + except CatchableError as exc: + trace "Unhandled exception leak", exc = exc.msg proc open*(s: LPChannel) {.async, gcsafe.} = logScope: @@ -170,10 +169,7 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} = trace "resetting channel" - # we asyncCheck here because the other end - # might be dead already - reset is always - # optimistic - asyncCheck s.resetMessage() + asyncSpawn s.resetMessage() try: # drain the buffer before closing @@ -210,9 +206,11 @@ method close*(s: LPChannel) {.async, gcsafe.} = await s.closeMessage().wait(2.minutes) if s.atEof: # already closed by remote close parent buffer immediately await procCall BufferStream(s).close() - except CancelledError as exc: + except CancelledError: + trace "Unexpected cancellation while closing channel" await s.reset() - raise exc + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. except CatchableError as exc: trace "exception closing channel", exc = exc.msg await s.reset() @@ -220,7 +218,8 @@ method close*(s: LPChannel) {.async, gcsafe.} = trace "lpchannel closed local" s.closedLocal = true - asyncCheck closeInternal() + # All the errors are handled inside `closeInternal()` procedure. + asyncSpawn closeInternal() method initStream*(s: LPChannel) = if s.objName.len == 0: diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 5915f0042..d2611bc2e 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -48,14 +48,21 @@ proc newTooManyChannels(): ref TooManyChannels = proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = ## remove the local channel from the internal tables ## - await chann.join() - m.channels[chann.initiator].del(chann.id) - trace "cleaned up channel", id = chann.id, oid = $chann.oid + try: + await chann.join() + m.channels[chann.initiator].del(chann.id) + trace "cleaned up channel", id = chann.id, oid = $chann.oid - when defined(libp2p_expensive_metrics): - libp2p_mplex_channels.set( - m.channels[chann.initiator].len.int64, - labelValues = [$chann.initiator, $m.connection.peerInfo]) + when defined(libp2p_expensive_metrics): + libp2p_mplex_channels.set( + m.channels[chann.initiator].len.int64, + labelValues = [$chann.initiator, $m.connection.peerInfo]) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in mplex channel cleanup" + except CatchableError as exc: + trace "error cleaning up mplex channel", exc = exc.msg proc newStreamInternal*(m: Mplex, initiator: bool = true, @@ -90,7 +97,8 @@ proc newStreamInternal*(m: Mplex, m.channels[initiator][id] = result - asyncCheck m.cleanupChann(result) + # All the errors are handled inside `cleanupChann()` procedure. + asyncSpawn m.cleanupChann(result) when defined(libp2p_expensive_metrics): libp2p_mplex_channels.set( @@ -104,12 +112,13 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} = await m.streamHandler(chann) trace "finished handling stream" doAssert(chann.closed, "connection not closed by handler!") - except CancelledError as exc: - trace "cancelling stream handler", exc = exc.msg + except CancelledError: + trace "Unexpected cancellation in stream handler" await chann.reset() - raise exc + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. except CatchableError as exc: - trace "exception in stream handler", exc = exc.msg + trace "Exception in mplex stream handler", exc = exc.msg await chann.reset() method handle*(m: Mplex) {.async, gcsafe.} = @@ -145,7 +154,8 @@ method handle*(m: Mplex) {.async, gcsafe.} = tmp else: if m.channels[false].len > m.maxChannCount - 1: - warn "too many channels created by remote peer", allowedMax = MaxChannelCount + warn "too many channels created by remote peer", + allowedMax = MaxChannelCount raise newTooManyChannels() let name = string.fromBytes(data) @@ -160,12 +170,14 @@ method handle*(m: Mplex) {.async, gcsafe.} = trace "created channel" if not isNil(m.streamHandler): - # launch handler task - asyncCheck m.handleStream(channel) + # Launch handler task + # All the errors are handled inside `handleStream()` procedure. + asyncSpawn m.handleStream(channel) of MessageType.MsgIn, MessageType.MsgOut: if data.len > MaxMsgSize: - warn "attempting to send a packet larger than allowed", allowed = MaxMsgSize + warn "attempting to send a packet larger than allowed", + allowed = MaxMsgSize raise newLPStreamLimitError() trace "pushing data to channel" @@ -180,8 +192,10 @@ method handle*(m: Mplex) {.async, gcsafe.} = trace "resetting channel" await channel.reset() trace "reset channel" - except CancelledError as exc: - raise exc + except CancelledError: + # This procedure is spawned as task and it is not part of public API, so + # there no way for this procedure to be cancelled implicitely. + trace "Unexpected cancellation in mplex handler" except CatchableError as exc: trace "Exception occurred", exception = exc.msg, oid = $m.oid diff --git a/libp2p/protocols/pubsub/floodsub.nim b/libp2p/protocols/pubsub/floodsub.nim index b581b6a66..578ecc89d 100644 --- a/libp2p/protocols/pubsub/floodsub.nim +++ b/libp2p/protocols/pubsub/floodsub.nim @@ -100,8 +100,14 @@ method init*(f: FloodSub) = ## connection for a protocol string ## e.g. ``/floodsub/1.0.0``, etc... ## - - await f.handleConn(conn, proto) + try: + await f.handleConn(conn, proto) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in floodsub handler" + except CatchableError as exc: + trace "FloodSub handler leaks an error", exc = exc.msg f.handler = handler f.codec = FloodSubCodec diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 173e91dd5..8a32d8bf3 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -75,8 +75,14 @@ method init*(g: GossipSub) = ## connection for a protocol string ## e.g. ``/floodsub/1.0.0``, etc... ## - - await g.handleConn(conn, proto) + try: + await g.handleConn(conn, proto) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in gossipsub handler" + except CatchableError as exc: + trace "GossipSub handler leaks an error", exc = exc.msg g.handler = handler g.codec = GossipSubCodec diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 8cf50aeba..3c7894ac2 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -107,8 +107,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} = if p.sendConn == conn: p.sendConn = nil - except CancelledError as exc: - raise exc + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in PubSubPeer.handle" except CatchableError as exc: trace "Exception occurred in PubSubPeer.handle", exc = exc.msg finally: @@ -174,7 +176,9 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} = trace "Caching new send connection", oid = $newConn.oid p.sendConn = newConn - asyncCheck p.handle(newConn) # start a read loop on the new connection + # Start a read loop on the new connection. + # All the errors are handled inside `handle()` procedure. + asyncSpawn p.handle(newConn) return newConn except CancelledError as exc: raise exc diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index f73621d4f..3f5338e1b 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -61,10 +61,21 @@ proc handleConn*(s: Secure, conn: Connection, initiator: bool): Future[Connection] {.async, gcsafe.} = var sconn = await s.handshake(conn, initiator) + + proc cleanup() {.async.} = + try: + await conn.join() + await sconn.close() + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + discard + except CatchableError as exc: + trace "error cleaning up secure connection", errMsg = exc.msg + if not isNil(sconn): - conn.join() - .addCallback do(udata: pointer = nil): - asyncCheck sconn.close() + # All the errors are handled inside `cleanup()` procedure. + asyncSpawn cleanup() return sconn diff --git a/libp2p/switch.nim b/libp2p/switch.nim index 23e120476..f7b9aab1b 100644 --- a/libp2p/switch.nim +++ b/libp2p/switch.nim @@ -354,10 +354,21 @@ proc internalConnect(s: Switch, if isNil(conn): # None of the addresses connected raise newException(CatchableError, "Unable to establish outgoing link") - conn.closeEvent.wait() - .addCallback do(udata: pointer): - asyncCheck s.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Disconnected)) + proc peerCleanup() {.async.} = + try: + await conn.closeEvent.wait() + await s.triggerConnEvent(peerId, + ConnEvent(kind: ConnEventKind.Disconnected)) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in switch peer connect cleanup" + except CatchableError as exc: + trace "Unexpected exception in switch peer connect cleanup", + errMsg = exc.msg + + # All the errors are handled inside `cleanup()` procedure. + asyncSpawn peerCleanup() await s.triggerConnEvent( peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) @@ -489,13 +500,37 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} = await s.identify(muxer) let peerId = muxer.connection.peerInfo.peerId - muxer.connection.closeEvent.wait() - .addCallback do(udata: pointer): - asyncCheck s.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Disconnected)) - asyncCheck s.triggerConnEvent( - peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: true)) + proc peerCleanup() {.async.} = + try: + await muxer.connection.closeEvent.wait() + await s.triggerConnEvent(peerId, + ConnEvent(kind: ConnEventKind.Disconnected)) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in switch muxer cleanup" + except CatchableError as exc: + trace "Unexpected exception in switch muxer cleanup", + errMsg = exc.msg + + proc peerStartup() {.async.} = + try: + await s.triggerConnEvent(peerId, + ConnEvent(kind: ConnEventKind.Connected, + incoming: true)) + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in switch muxer startup" + except CatchableError as exc: + trace "Unexpected exception in switch muxer startup", + errMsg = exc.msg + + # All the errors are handled inside `peerCleanup()` procedure. + asyncSpawn peerCleanup() + # All the errors are handled inside `peerStartup()` procedure. + asyncSpawn peerStartup() except CancelledError as exc: await muxer.close() diff --git a/libp2p/transports/tcptransport.nim b/libp2p/transports/tcptransport.nim index 4d0963b82..3e62e5b2a 100644 --- a/libp2p/transports/tcptransport.nim +++ b/libp2p/transports/tcptransport.nim @@ -76,13 +76,16 @@ proc connHandler*(t: TcpTransport, if not(isNil(conn)): await conn.close() t.clients.keepItIf(it != client) - except CancelledError as exc: - raise exc + except CancelledError: + # This is top-level procedure which will work as separate task, so it + # do not need to propogate CancelledError. + trace "Unexpected cancellation in transport's cleanup" except CatchableError as exc: trace "error cleaning up client", exc = exc.msg t.clients.add(client) - asyncCheck cleanup() + # All the errors are handled inside `cleanup()` procedure. + asyncSpawn cleanup() result = conn proc connCb(server: StreamServer,