make writes sequential

This commit is contained in:
Dmitriy Ryajov 2019-09-07 17:32:32 -06:00
parent e53c87e197
commit 8c76799d9e
1 changed files with 9 additions and 1 deletions

View File

@ -29,6 +29,7 @@ type
msgCode*: MessageType msgCode*: MessageType
closeCode*: MessageType closeCode*: MessageType
resetCode*: MessageType resetCode*: MessageType
asyncLock: AsyncLock
proc newChannel*(id: uint, proc newChannel*(id: uint,
conn: Connection, conn: Connection,
@ -43,10 +44,14 @@ proc newChannel*(id: uint,
result.msgCode = if initiator: MessageType.MsgOut else: MessageType.MsgIn result.msgCode = if initiator: MessageType.MsgOut else: MessageType.MsgIn
result.closeCode = if initiator: MessageType.CloseOut else: MessageType.CloseIn result.closeCode = if initiator: MessageType.CloseOut else: MessageType.CloseIn
result.resetCode = if initiator: MessageType.ResetOut else: MessageType.ResetIn result.resetCode = if initiator: MessageType.ResetOut else: MessageType.ResetIn
result.asyncLock = newAsyncLock()
let chan = result let chan = result
proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} = proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} =
# writes should happen in sequence
await chan.asyncLock.acquire()
await conn.writeMsg(chan.id, chan.msgCode, data) # write header await conn.writeMsg(chan.id, chan.msgCode, data) # write header
chan.asyncLock.release()
result.initBufferStream(writeHandler, size) result.initBufferStream(writeHandler, size)
@ -54,11 +59,14 @@ proc closeMessage(s: Channel) {.async, gcsafe.} =
await s.conn.writeMsg(s.id, s.closeCode) # write header await s.conn.writeMsg(s.id, s.closeCode) # write header
proc closed*(s: Channel): bool = proc closed*(s: Channel): bool =
s.closedLocal s.closedLocal and s.closedLocal
proc closedByRemote*(s: Channel) {.async.} = proc closedByRemote*(s: Channel) {.async.} =
s.closedRemote = true s.closedRemote = true
proc cleanUp*(s: Channel): Future[void] =
result = procCall close(BufferStream(s))
method close*(s: Channel) {.async, gcsafe.} = method close*(s: Channel) {.async, gcsafe.} =
s.closedLocal = true s.closedLocal = true
await s.closeMessage() await s.closeMessage()