don't use sleeps for synchronization

This commit is contained in:
Dmitriy Ryajov 2020-02-12 09:37:22 -05:00
parent 39dc9ad8a3
commit 94fc4e6fd2
2 changed files with 18 additions and 22 deletions

View File

@ -43,21 +43,19 @@ proc readMplexVarint(conn: Connection): Future[Option[uint]] {.async, gcsafe.} =
except LPStreamIncompleteError as exc: except LPStreamIncompleteError as exc:
trace "unable to read varint", exc = exc.msg trace "unable to read varint", exc = exc.msg
proc readMsg*(conn: Connection): Future[Option[Msg]] {.async, gcsafe.} = proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} =
let headerVarint = await conn.readMplexVarint() let headerVarint = await conn.readMplexVarint()
if headerVarint.isNone: trace "read header varint", varint = headerVarint
return
trace "read header varint", varint = $headerVarint
let dataLenVarint = await conn.readMplexVarint() let dataLenVarint = await conn.readMplexVarint()
trace "read data len varing", varint = dataLenVarint
var data: seq[byte] var data: seq[byte]
if dataLenVarint.isSome and dataLenVarint.get() > 0.uint: if dataLenVarint.int > 0:
data = await conn.read(dataLenVarint.get().int) data = await conn.read(dataLenVarint.int)
trace "read size varint", varint = $dataLenVarint trace "read data", data = data
let header = headerVarint.get() let header = headerVarint
result = some((header shr 3, MessageType(header and 0x7), data)) result = (header shr 3, MessageType(header and 0x7), data)
proc writeMsg*(conn: Connection, proc writeMsg*(conn: Connection,
id: uint, id: uint,

View File

@ -23,6 +23,8 @@ import ../muxer,
logScope: logScope:
topic = "Mplex" topic = "Mplex"
const DefaultRWTimeout = InfiniteDuration
type type
Mplex* = ref object of Muxer Mplex* = ref object of Muxer
remote*: Table[uint, LPChannel] remote*: Table[uint, LPChannel]
@ -64,14 +66,10 @@ method handle*(m: Mplex) {.async, gcsafe.} =
try: try:
while not m.connection.closed: while not m.connection.closed:
trace "waiting for data" trace "waiting for data"
let msg = await m.connection.readMsg() let (id, msgType, data) = await m.connection.readMsg()
if msg.isNone: trace "read message from connection", id = id,
trace "connection EOF" msgType = msgType,
# TODO: allow poll with timeout to avoid using `sleepAsync` data = data
await sleepAsync(1.millis)
continue
let (id, msgType, data) = msg.get()
let initiator = bool(ord(msgType) and 1) let initiator = bool(ord(msgType) and 1)
var channel: LPChannel var channel: LPChannel
if MessageType(msgType) != MessageType.New: if MessageType(msgType) != MessageType.New:
@ -80,7 +78,6 @@ method handle*(m: Mplex) {.async, gcsafe.} =
trace "Channel not found, skipping", id = id, trace "Channel not found, skipping", id = id,
initiator = initiator, initiator = initiator,
msg = msgType msg = msgType
await sleepAsync(1.millis)
continue continue
channel = channels[id] channel = channels[id]
@ -99,7 +96,6 @@ method handle*(m: Mplex) {.async, gcsafe.} =
# asyncCheck cleanupChann(m, channel, initiator)) # asyncCheck cleanupChann(m, channel, initiator))
asyncCheck m.streamHandler(stream) asyncCheck m.streamHandler(stream)
continue continue
of MessageType.MsgIn, MessageType.MsgOut: of MessageType.MsgIn, MessageType.MsgOut:
trace "pushing data to channel", id = id, trace "pushing data to channel", id = id,
@ -146,7 +142,9 @@ proc newMplex*(conn: Connection,
trace "connection closed, cleaning up mplex" trace "connection closed, cleaning up mplex"
asyncCheck m.close() asyncCheck m.close()
method newStream*(m: Mplex, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} = method newStream*(m: Mplex,
name: string = "",
lazy: bool = false): Future[Connection] {.async, gcsafe.} =
let channel = await m.newStreamInternal(lazy = lazy) let channel = await m.newStreamInternal(lazy = lazy)
if not lazy: if not lazy:
await channel.open() await channel.open()