diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 40e9bf12a..ab8889d3f 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -81,21 +81,29 @@ method handle*(m: Mplex) {.async, gcsafe.} = let channels = m.getChannelList(initiator) if id notin channels: trace "Channel not found, skipping", id = id, - initiator = initiator, - msg = msgType, - oid = m.oid + initiator = initiator, + msg = msgType, + oid = m.oid continue channel = channels[id] + logScope: + id = id + initiator = initiator + msgType = msgType + size = data.len + oid = m.oid + case msgType: of MessageType.New: let name = string.fromBytes(data) channel = await m.newStreamInternal(false, id, name) - trace "created channel", id = id, - name = name, - inititator = channel.initiator, - channoid = channel.oid, - oid = m.oid + logScope: + name = channel.name + chann_iod = channel.oid + + trace "created channel" + if not isNil(m.streamHandler): let stream = newConnection(channel) m.conns.add(stream) @@ -106,6 +114,8 @@ method handle*(m: Mplex) {.async, gcsafe.} = proc handler() {.async.} = try: await m.streamHandler(stream) + trace "finished handling stream" + # doAssert(stream.closed, "connection not closed by handler!") except CatchableError as exc: trace "exception in stream handler", exc = exc.msg finally: @@ -115,53 +125,39 @@ method handle*(m: Mplex) {.async, gcsafe.} = fut = handler() of MessageType.MsgIn, MessageType.MsgOut: - trace "pushing data to channel", id = id, - initiator = initiator, - msgType = msgType, - size = data.len, - name = channel.name, - channoid = channel.oid, - oid = m.oid + logScope: + name = channel.name + chann_iod = channel.oid + + trace "pushing data to channel" if data.len > MaxMsgSize: raise newLPStreamLimitError() await channel.pushTo(data) of MessageType.CloseIn, MessageType.CloseOut: - trace "closing channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid + logScope: + name = channel.name + chann_iod = channel.oid + + trace "closing channel" await channel.closeRemote() m.getChannelList(initiator).del(id) - trace "deleted channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid + trace "deleted channel" of MessageType.ResetIn, MessageType.ResetOut: - trace "resetting channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid + logScope: + name = channel.name + chann_iod = channel.oid + + trace "resetting channel" await channel.reset() m.getChannelList(initiator).del(id) - trace "deleted channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid + trace "deleted channel" break finally: - trace "stopping mplex main loop", oid = m.oid - await m.close() + trace "stopping mplex main loop", oid = m.oid + await m.close() except CatchableError as exc: trace "Exception occurred", exception = exc.msg, oid = m.oid