remove sleepAsync for synchronization

This commit is contained in:
Dmitriy Ryajov 2020-02-12 09:39:32 -05:00
parent 53e163abf2
commit f4e9bc8bfb
1 changed files with 22 additions and 20 deletions

View File

@ -103,30 +103,32 @@ proc pushTo*(s: BufferStream, data: seq[byte]) {.async.} =
## is preserved.
##
await s.lock.acquire()
var index = 0
while true:
try:
await s.lock.acquire()
var index = 0
while true:
# give readers a chance free up the buffer
# it it's full.
if s.readBuf.len >= s.maxSize:
await sleepAsync(1.millis)
# give readers a chance free up the buffer
# it it's full.
if s.readBuf.len >= s.maxSize:
await sleepAsync(10.millis)
while index < data.len and s.readBuf.len < s.maxSize:
s.readBuf.addLast(data[index])
inc(index)
while index < data.len and s.readBuf.len < s.maxSize:
s.readBuf.addLast(data[index])
inc(index)
# resolve the next queued read request
if s.readReqs.len > 0:
s.readReqs.popFirst().complete()
# resolve the next queued read request
if s.readReqs.len > 0:
s.readReqs.popFirst().complete()
if index >= data.len:
return
if index >= data.len:
break
# if we couldn't transfer all the data to the
# internal buf wait on a read event
await s.dataReadEvent.wait()
s.lock.release()
# if we couldn't transfer all the data to the
# internal buf wait on a read event
await s.dataReadEvent.wait()
s.dataReadEvent.clear()
finally:
s.lock.release()
method read*(s: BufferStream, n = -1): Future[seq[byte]] {.async.} =
## Read all bytes (n <= 0) or exactly `n` bytes from buffer