Remove asyncCheck from codebase. (#345)

* Remove asyncCheck from codebase.

* Replace all `discard` statements with new `asyncSpawn`.

* Bump `nim-chronos` requirement.
This commit is contained in:
Eugene Kabanov 2020-09-04 19:30:45 +03:00 committed by GitHub
parent 5819c6a9a7
commit 0b85192119
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 155 additions and 69 deletions

View File

@ -11,7 +11,7 @@ requires "nim >= 1.2.0",
"nimcrypto >= 0.4.1", "nimcrypto >= 0.4.1",
"bearssl >= 0.1.4", "bearssl >= 0.1.4",
"chronicles >= 0.7.2", "chronicles >= 0.7.2",
"chronos >= 2.3.8", "chronos >= 2.5.2",
"metrics", "metrics",
"secp256k1", "secp256k1",
"stew >= 0.1.0" "stew >= 0.1.0"

View File

@ -120,10 +120,17 @@ proc onClose(c: ConnManager, conn: Connection) {.async.} =
## ##
## triggers the connections resource cleanup ## triggers the connections resource cleanup
## ##
try:
await conn.join() await conn.join()
trace "triggering connection cleanup", peer = $conn.peerInfo trace "triggering connection cleanup", peer = $conn.peerInfo
await c.cleanupConn(conn) await c.cleanupConn(conn)
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in connection manager's cleanup"
except CatchableError as exc:
trace "Unexpected exception in connection manager's cleanup",
errMsg = exc.msg
proc selectConn*(c: ConnManager, proc selectConn*(c: ConnManager,
peerId: PeerID, peerId: PeerID,
@ -184,8 +191,9 @@ proc storeConn*(c: ConnManager, conn: Connection) =
c.conns[peerId].incl(conn) c.conns[peerId].incl(conn)
# launch on close listener # Launch on close listener
asyncCheck c.onClose(conn) # All the errors are handled inside `onClose()` procedure.
asyncSpawn c.onClose(conn)
libp2p_peers.set(c.conns.len.int64) libp2p_peers.set(c.conns.len.int64)
trace "stored connection", connections = c.conns.len, peer = peerId trace "stored connection", connections = c.conns.len, peer = peerId

View File

@ -66,18 +66,6 @@ template withWriteLock(lock: AsyncLock, body: untyped): untyped =
if not(isNil(lock)) and lock.locked: if not(isNil(lock)) and lock.locked:
lock.release() lock.release()
template withEOFExceptions(body: untyped): untyped =
try:
body
except CancelledError as exc:
raise exc
except LPStreamEOFError as exc:
trace "muxed connection EOF", exc = exc.msg
except LPStreamClosedError as exc:
trace "muxed connection closed", exc = exc.msg
except LPStreamIncompleteError as exc:
trace "incomplete message", exc = exc.msg
proc closeMessage(s: LPChannel) {.async.} = proc closeMessage(s: LPChannel) {.async.} =
logScope: logScope:
id = s.id id = s.id
@ -104,11 +92,22 @@ proc resetMessage(s: LPChannel) {.async.} =
# stack = getStackTrace() # stack = getStackTrace()
## send reset message - this will not raise ## send reset message - this will not raise
withEOFExceptions: try:
withWriteLock(s.writeLock): withWriteLock(s.writeLock):
trace "sending reset message" trace "sending reset message"
await s.conn.writeMsg(s.id, s.resetCode) # write reset await s.conn.writeMsg(s.id, s.resetCode) # write reset
except CancelledError:
# This procedure is called from one place and never awaited, so there no
# need to re-raise CancelledError.
trace "Unexpected cancellation while resetting channel"
except LPStreamEOFError as exc:
trace "muxed connection EOF", exc = exc.msg
except LPStreamClosedError as exc:
trace "muxed connection closed", exc = exc.msg
except LPStreamIncompleteError as exc:
trace "incomplete message", exc = exc.msg
except CatchableError as exc:
trace "Unhandled exception leak", exc = exc.msg
proc open*(s: LPChannel) {.async, gcsafe.} = proc open*(s: LPChannel) {.async, gcsafe.} =
logScope: logScope:
@ -170,10 +169,7 @@ method reset*(s: LPChannel) {.base, async, gcsafe.} =
trace "resetting channel" trace "resetting channel"
# we asyncCheck here because the other end asyncSpawn s.resetMessage()
# might be dead already - reset is always
# optimistic
asyncCheck s.resetMessage()
try: try:
# drain the buffer before closing # drain the buffer before closing
@ -210,9 +206,11 @@ method close*(s: LPChannel) {.async, gcsafe.} =
await s.closeMessage().wait(2.minutes) await s.closeMessage().wait(2.minutes)
if s.atEof: # already closed by remote close parent buffer immediately if s.atEof: # already closed by remote close parent buffer immediately
await procCall BufferStream(s).close() await procCall BufferStream(s).close()
except CancelledError as exc: except CancelledError:
trace "Unexpected cancellation while closing channel"
await s.reset() await s.reset()
raise exc # This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
except CatchableError as exc: except CatchableError as exc:
trace "exception closing channel", exc = exc.msg trace "exception closing channel", exc = exc.msg
await s.reset() await s.reset()
@ -220,7 +218,8 @@ method close*(s: LPChannel) {.async, gcsafe.} =
trace "lpchannel closed local" trace "lpchannel closed local"
s.closedLocal = true s.closedLocal = true
asyncCheck closeInternal() # All the errors are handled inside `closeInternal()` procedure.
asyncSpawn closeInternal()
method initStream*(s: LPChannel) = method initStream*(s: LPChannel) =
if s.objName.len == 0: if s.objName.len == 0:

View File

@ -48,6 +48,7 @@ proc newTooManyChannels(): ref TooManyChannels =
proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} = proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
## remove the local channel from the internal tables ## remove the local channel from the internal tables
## ##
try:
await chann.join() await chann.join()
m.channels[chann.initiator].del(chann.id) m.channels[chann.initiator].del(chann.id)
trace "cleaned up channel", id = chann.id, oid = $chann.oid trace "cleaned up channel", id = chann.id, oid = $chann.oid
@ -56,6 +57,12 @@ proc cleanupChann(m: Mplex, chann: LPChannel) {.async, inline.} =
libp2p_mplex_channels.set( libp2p_mplex_channels.set(
m.channels[chann.initiator].len.int64, m.channels[chann.initiator].len.int64,
labelValues = [$chann.initiator, $m.connection.peerInfo]) labelValues = [$chann.initiator, $m.connection.peerInfo])
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in mplex channel cleanup"
except CatchableError as exc:
trace "error cleaning up mplex channel", exc = exc.msg
proc newStreamInternal*(m: Mplex, proc newStreamInternal*(m: Mplex,
initiator: bool = true, initiator: bool = true,
@ -90,7 +97,8 @@ proc newStreamInternal*(m: Mplex,
m.channels[initiator][id] = result m.channels[initiator][id] = result
asyncCheck m.cleanupChann(result) # All the errors are handled inside `cleanupChann()` procedure.
asyncSpawn m.cleanupChann(result)
when defined(libp2p_expensive_metrics): when defined(libp2p_expensive_metrics):
libp2p_mplex_channels.set( libp2p_mplex_channels.set(
@ -104,12 +112,13 @@ proc handleStream(m: Mplex, chann: LPChannel) {.async.} =
await m.streamHandler(chann) await m.streamHandler(chann)
trace "finished handling stream" trace "finished handling stream"
doAssert(chann.closed, "connection not closed by handler!") doAssert(chann.closed, "connection not closed by handler!")
except CancelledError as exc: except CancelledError:
trace "cancelling stream handler", exc = exc.msg trace "Unexpected cancellation in stream handler"
await chann.reset() await chann.reset()
raise exc # This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
except CatchableError as exc: except CatchableError as exc:
trace "exception in stream handler", exc = exc.msg trace "Exception in mplex stream handler", exc = exc.msg
await chann.reset() await chann.reset()
method handle*(m: Mplex) {.async, gcsafe.} = method handle*(m: Mplex) {.async, gcsafe.} =
@ -145,7 +154,8 @@ method handle*(m: Mplex) {.async, gcsafe.} =
tmp tmp
else: else:
if m.channels[false].len > m.maxChannCount - 1: if m.channels[false].len > m.maxChannCount - 1:
warn "too many channels created by remote peer", allowedMax = MaxChannelCount warn "too many channels created by remote peer",
allowedMax = MaxChannelCount
raise newTooManyChannels() raise newTooManyChannels()
let name = string.fromBytes(data) let name = string.fromBytes(data)
@ -160,12 +170,14 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "created channel" trace "created channel"
if not isNil(m.streamHandler): if not isNil(m.streamHandler):
# launch handler task # Launch handler task
asyncCheck m.handleStream(channel) # All the errors are handled inside `handleStream()` procedure.
asyncSpawn m.handleStream(channel)
of MessageType.MsgIn, MessageType.MsgOut: of MessageType.MsgIn, MessageType.MsgOut:
if data.len > MaxMsgSize: if data.len > MaxMsgSize:
warn "attempting to send a packet larger than allowed", allowed = MaxMsgSize warn "attempting to send a packet larger than allowed",
allowed = MaxMsgSize
raise newLPStreamLimitError() raise newLPStreamLimitError()
trace "pushing data to channel" trace "pushing data to channel"
@ -180,8 +192,10 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "resetting channel" trace "resetting channel"
await channel.reset() await channel.reset()
trace "reset channel" trace "reset channel"
except CancelledError as exc: except CancelledError:
raise exc # This procedure is spawned as task and it is not part of public API, so
# there no way for this procedure to be cancelled implicitely.
trace "Unexpected cancellation in mplex handler"
except CatchableError as exc: except CatchableError as exc:
trace "Exception occurred", exception = exc.msg, oid = $m.oid trace "Exception occurred", exception = exc.msg, oid = $m.oid

View File

@ -100,8 +100,14 @@ method init*(f: FloodSub) =
## connection for a protocol string ## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc... ## e.g. ``/floodsub/1.0.0``, etc...
## ##
try:
await f.handleConn(conn, proto) await f.handleConn(conn, proto)
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in floodsub handler"
except CatchableError as exc:
trace "FloodSub handler leaks an error", exc = exc.msg
f.handler = handler f.handler = handler
f.codec = FloodSubCodec f.codec = FloodSubCodec

View File

@ -75,8 +75,14 @@ method init*(g: GossipSub) =
## connection for a protocol string ## connection for a protocol string
## e.g. ``/floodsub/1.0.0``, etc... ## e.g. ``/floodsub/1.0.0``, etc...
## ##
try:
await g.handleConn(conn, proto) await g.handleConn(conn, proto)
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in gossipsub handler"
except CatchableError as exc:
trace "GossipSub handler leaks an error", exc = exc.msg
g.handler = handler g.handler = handler
g.codec = GossipSubCodec g.codec = GossipSubCodec

View File

@ -107,8 +107,10 @@ proc handle*(p: PubSubPeer, conn: Connection) {.async.} =
if p.sendConn == conn: if p.sendConn == conn:
p.sendConn = nil p.sendConn = nil
except CancelledError as exc: except CancelledError:
raise exc # This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in PubSubPeer.handle"
except CatchableError as exc: except CatchableError as exc:
trace "Exception occurred in PubSubPeer.handle", exc = exc.msg trace "Exception occurred in PubSubPeer.handle", exc = exc.msg
finally: finally:
@ -174,7 +176,9 @@ proc getSendConn(p: PubSubPeer): Future[Connection] {.async.} =
trace "Caching new send connection", oid = $newConn.oid trace "Caching new send connection", oid = $newConn.oid
p.sendConn = newConn p.sendConn = newConn
asyncCheck p.handle(newConn) # start a read loop on the new connection # Start a read loop on the new connection.
# All the errors are handled inside `handle()` procedure.
asyncSpawn p.handle(newConn)
return newConn return newConn
except CancelledError as exc: except CancelledError as exc:
raise exc raise exc

View File

@ -61,10 +61,21 @@ proc handleConn*(s: Secure,
conn: Connection, conn: Connection,
initiator: bool): Future[Connection] {.async, gcsafe.} = initiator: bool): Future[Connection] {.async, gcsafe.} =
var sconn = await s.handshake(conn, initiator) var sconn = await s.handshake(conn, initiator)
proc cleanup() {.async.} =
try:
await conn.join()
await sconn.close()
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
discard
except CatchableError as exc:
trace "error cleaning up secure connection", errMsg = exc.msg
if not isNil(sconn): if not isNil(sconn):
conn.join() # All the errors are handled inside `cleanup()` procedure.
.addCallback do(udata: pointer = nil): asyncSpawn cleanup()
asyncCheck sconn.close()
return sconn return sconn

View File

@ -354,10 +354,21 @@ proc internalConnect(s: Switch,
if isNil(conn): # None of the addresses connected if isNil(conn): # None of the addresses connected
raise newException(CatchableError, "Unable to establish outgoing link") raise newException(CatchableError, "Unable to establish outgoing link")
conn.closeEvent.wait() proc peerCleanup() {.async.} =
.addCallback do(udata: pointer): try:
asyncCheck s.triggerConnEvent( await conn.closeEvent.wait()
peerId, ConnEvent(kind: ConnEventKind.Disconnected)) await s.triggerConnEvent(peerId,
ConnEvent(kind: ConnEventKind.Disconnected))
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in switch peer connect cleanup"
except CatchableError as exc:
trace "Unexpected exception in switch peer connect cleanup",
errMsg = exc.msg
# All the errors are handled inside `cleanup()` procedure.
asyncSpawn peerCleanup()
await s.triggerConnEvent( await s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false)) peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: false))
@ -489,13 +500,37 @@ proc muxerHandler(s: Switch, muxer: Muxer) {.async, gcsafe.} =
await s.identify(muxer) await s.identify(muxer)
let peerId = muxer.connection.peerInfo.peerId let peerId = muxer.connection.peerInfo.peerId
muxer.connection.closeEvent.wait()
.addCallback do(udata: pointer):
asyncCheck s.triggerConnEvent(
peerId, ConnEvent(kind: ConnEventKind.Disconnected))
asyncCheck s.triggerConnEvent( proc peerCleanup() {.async.} =
peerId, ConnEvent(kind: ConnEventKind.Connected, incoming: true)) try:
await muxer.connection.closeEvent.wait()
await s.triggerConnEvent(peerId,
ConnEvent(kind: ConnEventKind.Disconnected))
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in switch muxer cleanup"
except CatchableError as exc:
trace "Unexpected exception in switch muxer cleanup",
errMsg = exc.msg
proc peerStartup() {.async.} =
try:
await s.triggerConnEvent(peerId,
ConnEvent(kind: ConnEventKind.Connected,
incoming: true))
except CancelledError:
# This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in switch muxer startup"
except CatchableError as exc:
trace "Unexpected exception in switch muxer startup",
errMsg = exc.msg
# All the errors are handled inside `peerCleanup()` procedure.
asyncSpawn peerCleanup()
# All the errors are handled inside `peerStartup()` procedure.
asyncSpawn peerStartup()
except CancelledError as exc: except CancelledError as exc:
await muxer.close() await muxer.close()

View File

@ -76,13 +76,16 @@ proc connHandler*(t: TcpTransport,
if not(isNil(conn)): if not(isNil(conn)):
await conn.close() await conn.close()
t.clients.keepItIf(it != client) t.clients.keepItIf(it != client)
except CancelledError as exc: except CancelledError:
raise exc # This is top-level procedure which will work as separate task, so it
# do not need to propogate CancelledError.
trace "Unexpected cancellation in transport's cleanup"
except CatchableError as exc: except CatchableError as exc:
trace "error cleaning up client", exc = exc.msg trace "error cleaning up client", exc = exc.msg
t.clients.add(client) t.clients.add(client)
asyncCheck cleanup() # All the errors are handled inside `cleanup()` procedure.
asyncSpawn cleanup()
result = conn result = conn
proc connCb(server: StreamServer, proc connCb(server: StreamServer,