Properly track and close mplex handlers (#166)
* Properly track and close mplex handlers * Avoid verbose warnings * Fix tryAndWarn trace issue * Handle LPEOF in lpchannel close
This commit is contained in:
parent
618e01eba3
commit
005e088405
|
@ -19,7 +19,8 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped =
|
|||
if res.failed:
|
||||
let exc = res.readError()
|
||||
# We still don't abort but warn
|
||||
warn "Something went wrong in a future", error=exc.name, msg = exc.msg
|
||||
warn "Something went wrong in a future", error=exc.name
|
||||
trace "Exception message", msg=exc.msg
|
||||
else:
|
||||
quote do:
|
||||
for res in `futs`:
|
||||
|
@ -31,7 +32,8 @@ macro checkFutures*[T](futs: seq[Future[T]], exclude: untyped = []): untyped =
|
|||
trace "Ignoring an error (no warning)", error=exc.name, msg=exc.msg
|
||||
break check
|
||||
# We still don't abort but warn
|
||||
warn "Something went wrong in a future", error=exc.name, msg = exc.msg
|
||||
warn "Something went wrong in a future", error=exc.name
|
||||
trace "Exception message", msg=exc.msg
|
||||
|
||||
proc allFuturesThrowing*[T](args: varargs[Future[T]]): Future[void] =
|
||||
var futs: seq[Future[T]]
|
||||
|
@ -59,4 +61,4 @@ template tryAndWarn*(msg: static[string]; body: untyped): untyped =
|
|||
except CancelledError as ex:
|
||||
raise ex
|
||||
except CatchableError as ex:
|
||||
warn "ignored an error", name=ex.name, msg=msg
|
||||
warn "Ignored an error", name=ex.name, msg=msg
|
||||
|
|
|
@ -92,7 +92,14 @@ proc open*(s: LPChannel): Future[void] =
|
|||
|
||||
method close*(s: LPChannel) {.async, gcsafe.} =
|
||||
s.closedLocal = true
|
||||
# If remote is closed
|
||||
# EOF will happepn here
|
||||
# We can safely ignore in that case
|
||||
# s.closed won't be true sadly
|
||||
try:
|
||||
await s.closeMessage()
|
||||
except LPStreamEOFError:
|
||||
discard
|
||||
|
||||
proc resetMessage(s: LPChannel) {.async.} =
|
||||
await s.conn.writeMsg(s.id, s.resetCode)
|
||||
|
|
|
@ -29,6 +29,7 @@ type
|
|||
Mplex* = ref object of Muxer
|
||||
remote*: Table[uint64, LPChannel]
|
||||
local*: Table[uint64, LPChannel]
|
||||
handlers*: array[2, Table[uint64, Future[void]]]
|
||||
currentId*: uint64
|
||||
maxChannels*: uint64
|
||||
|
||||
|
@ -52,15 +53,23 @@ proc newStreamInternal*(m: Mplex,
|
|||
result = newChannel(id, m.connection, initiator, name, lazy = lazy)
|
||||
m.getChannelList(initiator)[id] = result
|
||||
|
||||
proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.} =
|
||||
proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async.} =
|
||||
## call the channel's `close` to signal the
|
||||
## remote that the channel is closing
|
||||
if not isNil(chann) and not chann.closed:
|
||||
trace "cleaning up channel", id = chann.id
|
||||
await chann.close()
|
||||
await chann.cleanUp()
|
||||
m.getChannelList(initiator).del(chann.id)
|
||||
trace "cleaned up channel", id = chann.id
|
||||
|
||||
proc cleanupChann(chann: LPChannel) {.async.} =
|
||||
trace "cleaning up channel", id = chann.id
|
||||
await chann.reset()
|
||||
await chann.close()
|
||||
await chann.cleanUp()
|
||||
trace "cleaned up channel", id = chann.id
|
||||
|
||||
method handle*(m: Mplex) {.async, gcsafe.} =
|
||||
trace "starting mplex main loop"
|
||||
try:
|
||||
|
@ -85,7 +94,7 @@ method handle*(m: Mplex) {.async, gcsafe.} =
|
|||
of MessageType.New:
|
||||
let name = cast[string](data)
|
||||
channel = await m.newStreamInternal(false, id, name)
|
||||
trace "created channel", id = id, name = name, inititator = true
|
||||
trace "created channel", id = id, name = name, inititator = initiator
|
||||
if not isNil(m.streamHandler):
|
||||
let stream = newConnection(channel)
|
||||
stream.peerInfo = m.connection.peerInfo
|
||||
|
@ -93,13 +102,13 @@ method handle*(m: Mplex) {.async, gcsafe.} =
|
|||
proc handler() {.async.} =
|
||||
tryAndWarn "mplex channel handler":
|
||||
await m.streamHandler(stream)
|
||||
# TODO closing stream
|
||||
# or doing cleanupChann
|
||||
# will make go interop tests fail
|
||||
# need to investigate why
|
||||
if not initiator:
|
||||
await m.cleanupChann(channel, false)
|
||||
|
||||
asynccheck handler()
|
||||
continue
|
||||
if not initiator:
|
||||
m.handlers[0][id] = handler()
|
||||
else:
|
||||
m.handlers[1][id] = handler()
|
||||
of MessageType.MsgIn, MessageType.MsgOut:
|
||||
trace "pushing data to channel", id = id,
|
||||
initiator = initiator,
|
||||
|
@ -162,10 +171,16 @@ method close*(m: Mplex) {.async, gcsafe.} =
|
|||
|
||||
let
|
||||
futs = await allFinished(
|
||||
toSeq(m.remote.values).mapIt(it.reset()) &
|
||||
toSeq(m.local.values).mapIt(it.reset()))
|
||||
toSeq(m.remote.values).mapIt(it.cleanupChann()) &
|
||||
toSeq(m.local.values).mapIt(it.cleanupChann()) &
|
||||
toSeq(m.handlers[0].values).mapIt(it) &
|
||||
toSeq(m.handlers[1].values).mapIt(it))
|
||||
|
||||
checkFutures(futs)
|
||||
|
||||
m.handlers[0].clear()
|
||||
m.handlers[1].clear()
|
||||
m.remote.clear()
|
||||
m.local.clear()
|
||||
|
||||
trace "mplex muxer closed"
|
||||
|
|
Loading…
Reference in New Issue