From 005e0884052cc930a239f8c45e57fd222345786d Mon Sep 17 00:00:00 2001 From: Giovanni Petrantoni Date: Tue, 12 May 2020 21:45:32 +0900 Subject: [PATCH] Properly track and close mplex handlers (#166) * Properly track and close mplex handlers * Avoid verbose warnings * Fix tryAndWarn trace issue * Handle LPEOF in lpchannel close --- libp2p/errors.nim | 10 +++++---- libp2p/muxers/mplex/lpchannel.nim | 9 +++++++- libp2p/muxers/mplex/mplex.nim | 35 ++++++++++++++++++++++--------- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/libp2p/errors.nim b/libp2p/errors.nim index ba3909a8e..63873bd19 100644 --- a/libp2p/errors.nim +++ b/libp2p/errors.nim @@ -19,7 +19,8 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped = if res.failed: let exc = res.readError() # We still don't abort but warn - warn "Something went wrong in a future", error=exc.name, msg = exc.msg + warn "Something went wrong in a future", error=exc.name + trace "Exception message", msg=exc.msg else: quote do: for res in `futs`: @@ -28,10 +29,11 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped = let exc = res.readError() for i in 0..<`nexclude`: if exc of `exclude`[i]: - trace "Ignoring an error (no warning)", error=exc.name, msg = exc.msg + trace "Ignoring an error (no warning)", error=exc.name, msg=exc.msg break check # We still don't abort but warn - warn "Something went wrong in a future", error=exc.name, msg = exc.msg + warn "Something went wrong in a future", error=exc.name + trace "Exception message", msg=exc.msg proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] = var futs: seq[Future[T]] @@ -59,4 +61,4 @@ template tryAndWarn*(msg: static[string]; body: untyped): untyped = except CancelledError as ex: raise ex except CatchableError as ex: - warn "ignored an error", name=ex.name, msg=msg + warn "Ignored an error", name=ex.name, msg=msg diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index e22ed4b9f..2bbb900be 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -92,7 +92,14 @@ proc open*(s: LPChannel): Future[void] = method close*(s: LPChannel) {.async, gcsafe.} = s.closedLocal = true - await s.closeMessage() + # If remote is closed + # EOF will happepn here + # We can safely ignore in that case + # s.closed won't be true sadly + try: + await s.closeMessage() + except LPStreamEOFError: + discard proc resetMessage(s: LPChannel) {.async.} = await s.conn.writeMsg(s.id, s.resetCode) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index b3fc0ffdc..fe2cb5558 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -29,6 +29,7 @@ type Mplex* = ref object of Muxer remote*: Table[uint64, LPChannel] local*: Table[uint64, LPChannel] + handlers*: array[2, Table[uint64, Future[void]]] currentId*: uint64 maxChannels*: uint64 @@ -52,15 +53,23 @@ proc newStreamInternal*(m: Mplex, result = newChannel(id, m.connection, initiator, name, lazy = lazy) m.getChannelList(initiator)[id] = result -proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.} = +proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async.} = ## call the channel's `close` to signal the ## remote that the channel is closing if not isNil(chann) and not chann.closed: + trace "cleaning up channel", id = chann.id await chann.close() await chann.cleanUp() m.getChannelList(initiator).del(chann.id) trace "cleaned up channel", id = chann.id +proc cleanupChann(chann: LPChannel) {.async.} = + trace "cleaning up channel", id = chann.id + await chann.reset() + await chann.close() + await chann.cleanUp() + trace "cleaned up channel", id = chann.id + method handle*(m: Mplex) {.async, gcsafe.} = trace "starting mplex main loop" try: @@ -85,7 +94,7 @@ method handle*(m: Mplex) {.async, gcsafe.} = of MessageType.New: let name = cast[string](data) channel = await m.newStreamInternal(false, id, name) - trace "created channel", id = id, name = name, inititator = true + trace "created channel", id = id, name = name, inititator = initiator if not isNil(m.streamHandler): let stream = newConnection(channel) stream.peerInfo = m.connection.peerInfo @@ -93,13 +102,13 @@ method handle*(m: Mplex) {.async, gcsafe.} = proc handler() {.async.} = tryAndWarn "mplex channel handler": await m.streamHandler(stream) - # TODO closing stream - # or doing cleanupChann - # will make go interop tests fail - # need to investigate why + if not initiator: + await m.cleanupChann(channel, false) - asynccheck handler() - continue + if not initiator: + m.handlers[0][id] = handler() + else: + m.handlers[1][id] = handler() of MessageType.MsgIn, MessageType.MsgOut: trace "pushing data to channel", id = id, initiator = initiator, @@ -162,10 +171,16 @@ method close*(m: Mplex) {.async, gcsafe.} = let futs = await allFinished( - toSeq(m.remote.values).mapIt(it.reset()) & - toSeq(m.local.values).mapIt(it.reset())) + toSeq(m.remote.values).mapIt(it.cleanupChann()) & + toSeq(m.local.values).mapIt(it.cleanupChann()) & + toSeq(m.handlers[0].values).mapIt(it) & + toSeq(m.handlers[1].values).mapIt(it)) checkFutures(futs) + m.handlers[0].clear() + m.handlers[1].clear() m.remote.clear() m.local.clear() + + trace "mplex muxer closed"