From 8c76799d9e75da45ab53a782b88f965de31b381e Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sat, 7 Sep 2019 17:32:32 -0600 Subject: [PATCH] make writes sequential --- libp2p/muxers/mplex/channel.nim | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/libp2p/muxers/mplex/channel.nim b/libp2p/muxers/mplex/channel.nim index 638076bb9..13a8737ec 100644 --- a/libp2p/muxers/mplex/channel.nim +++ b/libp2p/muxers/mplex/channel.nim @@ -29,6 +29,7 @@ type msgCode*: MessageType closeCode*: MessageType resetCode*: MessageType + asyncLock: AsyncLock proc newChannel*(id: uint, conn: Connection, @@ -43,10 +44,14 @@ proc newChannel*(id: uint, result.msgCode = if initiator: MessageType.MsgOut else: MessageType.MsgIn result.closeCode = if initiator: MessageType.CloseOut else: MessageType.CloseIn result.resetCode = if initiator: MessageType.ResetOut else: MessageType.ResetIn + result.asyncLock = newAsyncLock() let chan = result proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} = + # writes should happen in sequence + await chan.asyncLock.acquire() await conn.writeMsg(chan.id, chan.msgCode, data) # write header + chan.asyncLock.release() result.initBufferStream(writeHandler, size) @@ -54,11 +59,14 @@ proc closeMessage(s: Channel) {.async, gcsafe.} = await s.conn.writeMsg(s.id, s.closeCode) # write header proc closed*(s: Channel): bool = - s.closedLocal + s.closedLocal and s.closedLocal proc closedByRemote*(s: Channel) {.async.} = s.closedRemote = true +proc cleanUp*(s: Channel): Future[void] = + result = procCall close(BufferStream(s)) + method close*(s: Channel) {.async, gcsafe.} = s.closedLocal = true await s.closeMessage()