From a73bc40ae4fe3712cab191d02cc2be0b6c54d1fe Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 2 Jun 2020 10:27:39 -0600 Subject: [PATCH] emulate defered --- libp2p/muxers/mplex/mplex.nim | 177 +++++++++++++++++----------------- tests/testinterop.nim | 4 - 2 files changed, 89 insertions(+), 92 deletions(-) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 2353eab18..6dd53b5b7 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -67,101 +67,102 @@ proc newStreamInternal*(m: Mplex, method handle*(m: Mplex) {.async, gcsafe.} = trace "starting mplex main loop", oid = m.oid try: - while not m.connection.closed: - trace "waiting for data", oid = m.oid - let (id, msgType, data) = await m.connection.readMsg() - trace "read message from connection", id = id, + try: + while not m.connection.closed: + trace "waiting for data", oid = m.oid + let (id, msgType, data) = await m.connection.readMsg() + trace "read message from connection", id = id, + msgType = msgType, + data = data.shortLog, + oid = m.oid + let initiator = bool(ord(msgType) and 1) + var channel: LPChannel + if MessageType(msgType) != MessageType.New: + let channels = m.getChannelList(initiator) + if id notin channels: + trace "Channel not found, skipping", id = id, + initiator = initiator, + msg = msgType, + oid = m.oid + continue + channel = channels[id] + + case msgType: + of MessageType.New: + let name = cast[string](data) + channel = await m.newStreamInternal(false, id, name) + trace "created channel", id = id, + name = name, + inititator = channel.initiator, + channoid = channel.oid, + oid = m.oid + if not isNil(m.streamHandler): + let stream = newConnection(channel) + m.conns.add(stream) + stream.peerInfo = m.connection.peerInfo + + var fut = newFuture[void]() + proc handler() {.async.} = + try: + await m.streamHandler(stream) + except CatchableError as exc: + trace "exception in stream handler", exc = exc.msg + finally: + m.conns.keepItIf(it != stream) + m.handlerFuts.keepItIf(it != fut) + + fut = handler() + + of MessageType.MsgIn, MessageType.MsgOut: + trace "pushing data to channel", id = id, + initiator = initiator, msgType = msgType, - data = data.shortLog, + size = data.len, + name = channel.name, + channoid = channel.oid, oid = m.oid - let initiator = bool(ord(msgType) and 1) - var channel: LPChannel - if MessageType(msgType) != MessageType.New: - let channels = m.getChannelList(initiator) - if id notin channels: - trace "Channel not found, skipping", id = id, - initiator = initiator, - msg = msgType, - oid = m.oid - continue - channel = channels[id] - case msgType: - of MessageType.New: - let name = cast[string](data) - channel = await m.newStreamInternal(false, id, name) - trace "created channel", id = id, - name = name, - inititator = channel.initiator, - channoid = channel.oid, - oid = m.oid - if not isNil(m.streamHandler): - let stream = newConnection(channel) - m.conns.add(stream) - stream.peerInfo = m.connection.peerInfo + if data.len > MaxMsgSize: + raise newLPStreamLimitError() + await channel.pushTo(data) + of MessageType.CloseIn, MessageType.CloseOut: + trace "closing channel", id = id, + initiator = initiator, + msgType = msgType, + name = channel.name, + channoid = channel.oid, + oid = m.oid - var fut = newFuture[void]() - proc handler() {.async.} = - try: - await m.streamHandler(stream) - except CatchableError as exc: - trace "exception in stream handler", exc = exc.msg - finally: - m.conns.keepItIf(it != stream) - m.handlerFuts.keepItIf(it != fut) + await channel.closeRemote() + m.getChannelList(initiator).del(id) + trace "deleted channel", id = id, + initiator = initiator, + msgType = msgType, + name = channel.name, + channoid = channel.oid, + oid = m.oid + of MessageType.ResetIn, MessageType.ResetOut: + trace "resetting channel", id = id, + initiator = initiator, + msgType = msgType, + name = channel.name, + channoid = channel.oid, + oid = m.oid - fut = handler() - - of MessageType.MsgIn, MessageType.MsgOut: - trace "pushing data to channel", id = id, - initiator = initiator, - msgType = msgType, - size = data.len, - name = channel.name, - channoid = channel.oid, - oid = m.oid - - if data.len > MaxMsgSize: - raise newLPStreamLimitError() - await channel.pushTo(data) - of MessageType.CloseIn, MessageType.CloseOut: - trace "closing channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid - - await channel.closeRemote() - m.getChannelList(initiator).del(id) - trace "deleted channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid - of MessageType.ResetIn, MessageType.ResetOut: - trace "resetting channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid - - await channel.reset() - m.getChannelList(initiator).del(id) - trace "deleted channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid - break + await channel.reset() + m.getChannelList(initiator).del(id) + trace "deleted channel", id = id, + initiator = initiator, + msgType = msgType, + name = channel.name, + channoid = channel.oid, + oid = m.oid + break + finally: + trace "stopping mplex main loop", oid = m.oid + await m.close() except CatchableError as exc: trace "Exception occurred", exception = exc.msg, oid = m.oid - finally: - trace "stopping mplex main loop", oid = m.oid - await m.close() proc newMplex*(conn: Connection, maxChanns: uint = MaxChannels): Mplex = diff --git a/tests/testinterop.nim b/tests/testinterop.nim index 61ffefea7..c33d26826 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -83,7 +83,6 @@ proc testPubSubDaemonPublish(gossip: bool = false, let smsg = cast[string](data) check smsg == pubsubData times.inc() - echo "TIMES ", times if times >= count and not finished: finished = true @@ -108,7 +107,6 @@ proc testPubSubDaemonPublish(gossip: bool = false, await wait(publisher(), 5.minutes) # should be plenty of time - echo "HEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE" result = true await nativeNode.stop() await allFutures(awaiters) @@ -144,7 +142,6 @@ proc testPubSubNodePublish(gossip: bool = false, let smsg = cast[string](message.data) check smsg == pubsubData times.inc() - echo "TIMES ", times if times >= count and not finished: finished = true result = true # don't cancel subscription @@ -356,7 +353,6 @@ suite "Interop": check line == test await conn.writeLp(cast[seq[byte]](test)) count.inc() - echo "COUNT ", count testFuture.complete(count) await conn.close()