From 94fc4e6fd26ed2ca441ed805a5210f20ac64ef19 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 12 Feb 2020 09:37:22 -0500 Subject: [PATCH] don't use sleeps for synchronization --- libp2p/muxers/mplex/coder.nim | 18 ++++++++---------- libp2p/muxers/mplex/mplex.nim | 22 ++++++++++------------ 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 7e6c9a4bc..0451805d4 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -43,21 +43,19 @@ proc readMplexVarint(conn: Connection): Future[Option[uint]] {.async, gcsafe.} = except LPStreamIncompleteError as exc: trace "unable to read varint", exc = exc.msg -proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} = +proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} = let headerVarint = await conn.readMplexVarint() - if headerVarint.isNone: - return - - trace "read header varint", varint = $headerVarint + trace "read header varint", varint = headerVarint let dataLenVarint = await conn.readMplexVarint() + trace "read data len varing", varint = dataLenVarint var data: seq[byte] - if dataLenVarint.isSome and dataLenVarint.get() > 0.uint: - data = await conn.read(dataLenVarint.get().int) - trace "read size varint", varint = $dataLenVarint + if dataLenVarint.int > 0: + data = await conn.read(dataLenVarint.int) + trace "read data", data = data - let header = headerVarint.get() - result = some((header shr 3, MessageType(header and 0x7), data)) + let header = headerVarint + result = (header shr 3, MessageType(header and 0x7), data) proc writeMsg*(conn: Connection, id: uint, diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 4a8769daa..41c49eb5d 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -23,6 +23,8 @@ import ../muxer, logScope: topic = "Mplex" +const DefaultRWTimeout = InfiniteDuration + type Mplex* = ref object of Muxer remote*: Table[uint, LPChannel] @@ -64,14 +66,10 @@ method handle*(m: Mplex) {.async, gcsafe.} = try: while not m.connection.closed: trace "waiting for data" - let msg = await m.connection.readMsg() - if msg.isNone: - trace "connection EOF" - # TODO: allow poll with timeout to avoid using `sleepAsync` - await sleepAsync(1.millis) - continue - - let (id, msgType, data) = msg.get() + let (id, msgType, data) = await m.connection.readMsg() + trace "read message from connection", id = id, + msgType = msgType, + data = data let initiator = bool(ord(msgType) and 1) var channel: LPChannel if MessageType(msgType) != MessageType.New: @@ -80,7 +78,6 @@ method handle*(m: Mplex) {.async, gcsafe.} = trace "Channel not found, skipping", id = id, initiator = initiator, msg = msgType - await sleepAsync(1.millis) continue channel = channels[id] @@ -99,7 +96,6 @@ method handle*(m: Mplex) {.async, gcsafe.} = # asyncCheck cleanupChann(m, channel, initiator)) asyncCheck m.streamHandler(stream) - continue of MessageType.MsgIn, MessageType.MsgOut: trace "pushing data to channel", id = id, @@ -145,8 +141,10 @@ proc newMplex*(conn: Connection, .addCallback do (udata: pointer): trace "connection closed, cleaning up mplex" asyncCheck m.close() - -method newStream*(m: Mplex, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} = + +method newStream*(m: Mplex, + name: string = "", + lazy: bool = false): Future[Connection] {.async, gcsafe.} = let channel = await m.newStreamInternal(lazy = lazy) if not lazy: await channel.open()