From 1b4876d26d5f6aeb11cd18ccdaf81694d5221385 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 25 May 2020 08:33:24 -0600 Subject: [PATCH] emulate `defered` --- libp2p/muxers/mplex/mplex.nim | 203 ++++++++++++------------- libp2p/muxers/muxer.nim | 2 +- libp2p/protocols/pubsub/pubsubpeer.nim | 37 ++--- 3 files changed, 117 insertions(+), 125 deletions(-) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 2353eab..eca5389 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -67,101 +67,102 @@ proc newStreamInternal*(m: Mplex, method handle*(m: Mplex) {.async, gcsafe.} = trace "starting mplex main loop", oid = m.oid try: - while not m.connection.closed: - trace "waiting for data", oid = m.oid - let (id, msgType, data) = await m.connection.readMsg() - trace "read message from connection", id = id, + try: + while not m.connection.closed: + trace "waiting for data", oid = m.oid + let (id, msgType, data) = await m.connection.readMsg() + trace "read message from connection", id = id, + msgType = msgType, + data = data.shortLog, + oid = m.oid + let initiator = bool(ord(msgType) and 1) + var channel: LPChannel + if MessageType(msgType) != MessageType.New: + let channels = m.getChannelList(initiator) + if id notin channels: + trace "Channel not found, skipping", id = id, + initiator = initiator, + msg = msgType, + oid = m.oid + continue + channel = channels[id] + + case msgType: + of MessageType.New: + let name = cast[string](data) + channel = await m.newStreamInternal(false, id, name) + trace "created channel", id = id, + name = name, + inititator = channel.initiator, + channoid = channel.oid, + oid = m.oid + if not isNil(m.streamHandler): + let stream = newConnection(channel) + m.conns.add(stream) + stream.peerInfo = m.connection.peerInfo + + var fut = newFuture[void]() + proc handler() {.async.} = + try: + await m.streamHandler(stream) + except CatchableError as exc: + trace "exception in stream handler", exc = exc.msg + finally: + m.conns.keepItIf(it != stream) + m.handlerFuts.keepItIf(it != fut) + + fut = handler() + + of MessageType.MsgIn, MessageType.MsgOut: + trace "pushing data to channel", id = id, + initiator = initiator, msgType = msgType, - data = data.shortLog, + size = data.len, + name = channel.name, + channoid = channel.oid, oid = m.oid - let initiator = bool(ord(msgType) and 1) - var channel: LPChannel - if MessageType(msgType) != MessageType.New: - let channels = m.getChannelList(initiator) - if id notin channels: - trace "Channel not found, skipping", id = id, - initiator = initiator, - msg = msgType, - oid = m.oid - continue - channel = channels[id] - case msgType: - of MessageType.New: - let name = cast[string](data) - channel = await m.newStreamInternal(false, id, name) - trace "created channel", id = id, - name = name, - inititator = channel.initiator, - channoid = channel.oid, - oid = m.oid - if not isNil(m.streamHandler): - let stream = newConnection(channel) - m.conns.add(stream) - stream.peerInfo = m.connection.peerInfo + if data.len > MaxMsgSize: + raise newLPStreamLimitError() + await channel.pushTo(data) + of MessageType.CloseIn, MessageType.CloseOut: + trace "closing channel", id = id, + initiator = initiator, + msgType = msgType, + name = channel.name, + channoid = channel.oid, + oid = m.oid - var fut = newFuture[void]() - proc handler() {.async.} = - try: - await m.streamHandler(stream) - except CatchableError as exc: - trace "exception in stream handler", exc = exc.msg - finally: - m.conns.keepItIf(it != stream) - m.handlerFuts.keepItIf(it != fut) + await channel.closeRemote() + m.getChannelList(initiator).del(id) + trace "deleted channel", id = id, + initiator = initiator, + msgType = msgType, + name = channel.name, + channoid = channel.oid, + oid = m.oid + of MessageType.ResetIn, MessageType.ResetOut: + trace "resetting channel", id = id, + initiator = initiator, + msgType = msgType, + name = channel.name, + channoid = channel.oid, + oid = m.oid - fut = handler() - - of MessageType.MsgIn, MessageType.MsgOut: - trace "pushing data to channel", id = id, - initiator = initiator, - msgType = msgType, - size = data.len, - name = channel.name, - channoid = channel.oid, - oid = m.oid - - if data.len > MaxMsgSize: - raise newLPStreamLimitError() - await channel.pushTo(data) - of MessageType.CloseIn, MessageType.CloseOut: - trace "closing channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid - - await channel.closeRemote() - m.getChannelList(initiator).del(id) - trace "deleted channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid - of MessageType.ResetIn, MessageType.ResetOut: - trace "resetting channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid - - await channel.reset() - m.getChannelList(initiator).del(id) - trace "deleted channel", id = id, - initiator = initiator, - msgType = msgType, - name = channel.name, - channoid = channel.oid, - oid = m.oid - break + await channel.reset() + m.getChannelList(initiator).del(id) + trace "deleted channel", id = id, + initiator = initiator, + msgType = msgType, + name = channel.name, + channoid = channel.oid, + oid = m.oid + break + finally: + trace "stopping mplex main loop", oid = m.oid + await m.close() except CatchableError as exc: trace "Exception occurred", exception = exc.msg, oid = m.oid - finally: - trace "stopping mplex main loop", oid = m.oid - await m.close() proc newMplex*(conn: Connection, maxChanns: uint = MaxChannels): Mplex = @@ -199,26 +200,16 @@ method close*(m: Mplex) {.async, gcsafe.} = try: trace "closing mplex muxer", oid = m.oid - let channs = toSeq(m.remote.values) & - toSeq(m.local.values) + await all( + toSeq(m.remote.values).mapIt(it.reset()) & + toSeq(m.local.values).mapIt(it.reset())) - for chann in channs: - try: - await chann.reset() - except CatchableError as exc: - warn "error resetting channel", exc = exc.msg - - for conn in m.conns: - try: - await conn.close() - except CatchableError as exc: - warn "error closing channel's connection" - - checkFutures( - await allFinished(m.handlerFuts)) - - await m.connection.close() + await all(m.conns.mapIt(it.close())) # dispose of channel's connections + await all(m.handlerFuts) + except CatchableError as exc: + trace "exception in mplex close", exc = exc.msg finally: + await m.connection.close() m.remote.clear() m.local.clear() m.conns = @[] diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 7a95b1c..800ecc4 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -60,7 +60,7 @@ method init(c: MuxerProvider) = if not isNil(c.muxerHandler): futs &= c.muxerHandler(muxer) - checkFutures(await allFinished(futs)) + await all(futs) except CatchableError as exc: trace "exception in muxer handler", exc = exc.msg diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 0b7b412..50b5e0b 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -66,28 +66,29 @@ proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = proc handle*(p: PubSubPeer, conn: Connection) {.async.} = trace "handling pubsub rpc", peer = p.id, closed = conn.closed try: - while not conn.closed: - trace "waiting for data", peer = p.id, closed = conn.closed - let data = await conn.readLp(64 * 1024) - let digest = $(sha256.digest(data)) - trace "read data from peer", peer = p.id, data = data.shortLog - if digest in p.recvdRpcCache: - trace "message already received, skipping", peer = p.id - continue + try: + while not conn.closed: + trace "waiting for data", peer = p.id, closed = conn.closed + let data = await conn.readLp(64 * 1024) + let digest = $(sha256.digest(data)) + trace "read data from peer", peer = p.id, data = data.shortLog + if digest in p.recvdRpcCache: + trace "message already received, skipping", peer = p.id + continue - var msg = decodeRpcMsg(data) - trace "decoded msg from peer", peer = p.id, msg = msg.shortLog + var msg = decodeRpcMsg(data) + trace "decoded msg from peer", peer = p.id, msg = msg.shortLog + # trigger hooks + for obs in p.observers[]: + obs.onRecv(p, msg) + await p.handler(p, @[msg]) + p.recvdRpcCache.put(digest) + finally: + trace "exiting pubsub peer read loop", peer = p.id + await conn.close() - p.recvObservers(msg) # hooks can modify the message - - await p.handler(p, @[msg]) - p.recvdRpcCache.put(digest) except CatchableError as exc: trace "Exception occurred in PubSubPeer.handle", exc = exc.msg - finally: - trace "exiting pubsub peer read loop", peer = p.id - if not conn.closed(): - await conn.close() proc send*(p: PubSubPeer, msgs: seq[RPCMsg]) {.async.} = for m in msgs.items: