From f4e9bc8bfbbe412f6b9852179b432c2262adb1cb Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 12 Feb 2020 09:39:32 -0500 Subject: [PATCH] remove sleepAsync for synchronization --- libp2p/stream/bufferstream.nim | 42 ++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index b4f87f75f..f3f29e049 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -103,30 +103,32 @@ proc pushTo*(s: BufferStream, data: seq[byte]) {.async.} = ## is preserved. ## - await s.lock.acquire() - var index = 0 - while true: + try: + await s.lock.acquire() + var index = 0 + while true: + # give readers a chance free up the buffer + # it it's full. + if s.readBuf.len >= s.maxSize: + await sleepAsync(1.millis) - # give readers a chance free up the buffer - # it it's full. - if s.readBuf.len >= s.maxSize: - await sleepAsync(10.millis) + while index < data.len and s.readBuf.len < s.maxSize: + s.readBuf.addLast(data[index]) + inc(index) - while index < data.len and s.readBuf.len < s.maxSize: - s.readBuf.addLast(data[index]) - inc(index) + # resolve the next queued read request + if s.readReqs.len > 0: + s.readReqs.popFirst().complete() - # resolve the next queued read request - if s.readReqs.len > 0: - s.readReqs.popFirst().complete() + if index >= data.len: + return - if index >= data.len: - break - - # if we couldn't transfer all the data to the - # internal buf wait on a read event - await s.dataReadEvent.wait() - s.lock.release() + # if we couldn't transfer all the data to the + # internal buf wait on a read event + await s.dataReadEvent.wait() + s.dataReadEvent.clear() + finally: + s.lock.release() method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async.} = ## Read all bytes (n <= 0) or exactly `n` bytes from buffer