Add custom ring buffer into chronos streams and transports. (#485)

* Add custom ring buffer into chronos stream transport.

* Rename BipBuffer.decommit() to BipBuffer.consume()
Make asyncstream's using BipBuffer.

* Address review comments part 1.

* Address review comments part 2.

* Address review comments.

* Remove unused import results.

* Address review comments.
This commit is contained in:
Eugene Kabanov 2024-03-26 22:33:19 +02:00 committed by GitHub
parent ef1b077adf
commit 402914f4cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 378 additions and 218 deletions

140
chronos/bipbuffer.nim Normal file
View File

@ -0,0 +1,140 @@
#
# Chronos
#
# (c) Copyright 2018-Present Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
## This module implements Bip Buffer (bi-partite circular buffer) by Simone
## Cooke.
##
## The Bip-Buffer is like a circular buffer, but slightly different. Instead of
## keeping one head and tail pointer to the data in the buffer, it maintains two
## revolving regions, allowing for fast data access without having to worry
## about wrapping at the end of the buffer. Buffer allocations are always
## maintained as contiguous blocks, allowing the buffer to be used in a highly
## efficient manner with API calls, and also reducing the amount of copying
## which needs to be performed to put data into the buffer. Finally, a two-phase
## allocation system allows the user to pessimistically reserve an area of
## buffer space, and then trim back the buffer to commit to only the space which
## was used.
##
## https://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist
{.push raises: [].}
type
BipPos = object
start: Natural
finish: Natural
BipBuffer* = object
a, b, r: BipPos
data: seq[byte]
proc init*(t: typedesc[BipBuffer], size: int): BipBuffer =
## Creates new Bip Buffer with size `size`.
BipBuffer(data: newSeq[byte](size))
template len(pos: BipPos): Natural =
pos.finish - pos.start
template reset(pos: var BipPos) =
pos = BipPos()
func init(t: typedesc[BipPos], start, finish: Natural): BipPos =
BipPos(start: start, finish: finish)
func calcReserve(bp: BipBuffer): tuple[space: Natural, start: Natural] =
if len(bp.b) > 0:
(Natural(bp.a.start - bp.b.finish), bp.b.finish)
else:
let spaceAfterA = Natural(len(bp.data) - bp.a.finish)
if spaceAfterA >= bp.a.start:
(spaceAfterA, bp.a.finish)
else:
(bp.a.start, Natural(0))
func availSpace*(bp: BipBuffer): Natural =
## Returns amount of space available for reserve in buffer `bp`.
let (res, _) = bp.calcReserve()
res
func len*(bp: BipBuffer): Natural =
## Returns amount of used space in buffer `bp`.
len(bp.b) + len(bp.a)
proc reserve*(bp: var BipBuffer,
size: Natural = 0): tuple[data: ptr byte, size: Natural] =
## Reserve `size` bytes in buffer.
##
## If `size == 0` (default) reserve all available space from buffer.
##
## If there is not enough space in buffer for resevation - error will be
## returned.
##
## Returns current reserved range as pointer of type `pt` and size of
## type `st`.
const ErrorMessage = "Not enough space available"
doAssert(size <= len(bp.data))
let (availableSpace, reserveStart) = bp.calcReserve()
if availableSpace == 0:
raiseAssert ErrorMessage
let reserveLength =
if size == 0:
availableSpace
else:
if size < availableSpace:
raiseAssert ErrorMessage
size
bp.r = BipPos.init(reserveStart, Natural(reserveStart + reserveLength))
(addr bp.data[bp.r.start], len(bp.r))
proc commit*(bp: var BipBuffer, size: Natural) =
## Updates structure's pointers when new data inserted into buffer.
doAssert(len(bp.r) >= size,
"Committed size could not be larger than the previously reserved one")
if size == 0:
bp.r.reset()
return
let toCommit = min(size, len(bp.r))
if len(bp.a) == 0 and len(bp.b) == 0:
bp.a.start = bp.r.start
bp.a.finish = bp.r.start + toCommit
elif bp.r.start == bp.a.finish:
bp.a.finish += toCommit
else:
bp.b.finish += toCommit
bp.r.reset()
proc consume*(bp: var BipBuffer, size: Natural) =
## The procedure removes/frees `size` bytes from the buffer ``bp``.
var currentSize = size
if currentSize >= len(bp.a):
currentSize -= len(bp.a)
bp.a = bp.b
bp.b.reset()
if currentSize >= len(bp.a):
currentSize -= len(bp.a)
bp.a.reset()
else:
bp.a.start += currentSize
else:
bp.a.start += currentSize
iterator items*(bp: BipBuffer): byte =
## Iterates over all the bytes in the buffer.
for index in bp.a.start ..< bp.a.finish:
yield bp.data[index]
for index in bp.b.start ..< bp.b.finish:
yield bp.data[index]
iterator regions*(bp: var BipBuffer): tuple[data: ptr byte, size: Natural] =
## Iterates over all the regions (`a` and `b`) in the buffer.
if len(bp.a) > 0:
yield (addr bp.data[bp.a.start], len(bp.a))
if len(bp.b) > 0:
yield (addr bp.data[bp.b.start], len(bp.b))

View File

@ -9,7 +9,7 @@
{.push raises: [].}
import ../[config, asyncloop, asyncsync]
import ../[config, asyncloop, asyncsync, bipbuffer]
import ../transports/[common, stream]
export asyncloop, asyncsync, stream, common
@ -34,10 +34,11 @@ type
AsyncStreamWriteEOFError* = object of AsyncStreamWriteError
AsyncBuffer* = object
offset*: int
buffer*: seq[byte]
backend*: BipBuffer
events*: array[2, AsyncEvent]
AsyncBufferRef* = ref AsyncBuffer
WriteType* = enum
Pointer, Sequence, String
@ -73,7 +74,7 @@ type
tsource*: StreamTransport
readerLoop*: StreamReaderLoop
state*: AsyncStreamState
buffer*: AsyncBuffer
buffer*: AsyncBufferRef
udata: pointer
error*: ref AsyncStreamError
bytesCount*: uint64
@ -96,85 +97,51 @@ type
AsyncStreamRW* = AsyncStreamReader | AsyncStreamWriter
proc init*(t: typedesc[AsyncBuffer], size: int): AsyncBuffer =
AsyncBuffer(
buffer: newSeq[byte](size),
events: [newAsyncEvent(), newAsyncEvent()],
offset: 0
proc new*(t: typedesc[AsyncBufferRef], size: int): AsyncBufferRef =
AsyncBufferRef(
backend: BipBuffer.init(size),
events: [newAsyncEvent(), newAsyncEvent()]
)
proc getBuffer*(sb: AsyncBuffer): pointer {.inline.} =
unsafeAddr sb.buffer[sb.offset]
proc bufferLen*(sb: AsyncBuffer): int {.inline.} =
len(sb.buffer) - sb.offset
proc getData*(sb: AsyncBuffer): pointer {.inline.} =
unsafeAddr sb.buffer[0]
template dataLen*(sb: AsyncBuffer): int =
sb.offset
proc `[]`*(sb: AsyncBuffer, index: int): byte {.inline.} =
doAssert(index < sb.offset)
sb.buffer[index]
proc update*(sb: var AsyncBuffer, size: int) {.inline.} =
sb.offset += size
template wait*(sb: var AsyncBuffer): untyped =
template wait*(sb: AsyncBufferRef): untyped =
sb.events[0].clear()
sb.events[1].fire()
sb.events[0].wait()
template transfer*(sb: var AsyncBuffer): untyped =
template transfer*(sb: AsyncBufferRef): untyped =
sb.events[1].clear()
sb.events[0].fire()
sb.events[1].wait()
proc forget*(sb: var AsyncBuffer) {.inline.} =
proc forget*(sb: AsyncBufferRef) {.inline.} =
sb.events[1].clear()
sb.events[0].fire()
proc shift*(sb: var AsyncBuffer, size: int) {.inline.} =
if sb.offset > size:
moveMem(addr sb.buffer[0], addr sb.buffer[size], sb.offset - size)
sb.offset = sb.offset - size
else:
sb.offset = 0
proc copyData*(sb: AsyncBuffer, dest: pointer, offset, length: int) {.inline.} =
copyMem(cast[pointer](cast[uint](dest) + cast[uint](offset)),
unsafeAddr sb.buffer[0], length)
proc upload*(sb: ptr AsyncBuffer, pbytes: ptr byte,
proc upload*(sb: AsyncBufferRef, pbytes: ptr byte,
nbytes: int): Future[void] {.
async: (raises: [CancelledError]).} =
## You can upload any amount of bytes to the buffer. If size of internal
## buffer is not enough to fit all the data at once, data will be uploaded
## via chunks of size up to internal buffer size.
var length = nbytes
var srcBuffer = cast[ptr UncheckedArray[byte]](pbytes)
var srcOffset = 0
var
length = nbytes
srcBuffer = pbytes.toUnchecked()
offset = 0
while length > 0:
let size = min(length, sb[].bufferLen())
let size = min(length, sb.backend.availSpace())
if size == 0:
# Internal buffer is full, we need to transfer data to consumer.
await sb[].transfer()
# Internal buffer is full, we need to notify consumer.
await sb.transfer()
else:
let (data, _) = sb.backend.reserve()
# Copy data from `pbytes` to internal buffer.
copyMem(addr sb[].buffer[sb.offset], addr srcBuffer[srcOffset], size)
sb[].offset = sb[].offset + size
srcOffset = srcOffset + size
copyMem(data, addr srcBuffer[offset], size)
sb.backend.commit(size)
offset = offset + size
length = length - size
# We notify consumers that new data is available.
sb[].forget()
template toDataOpenArray*(sb: AsyncBuffer): auto =
toOpenArray(sb.buffer, 0, sb.offset - 1)
template toBufferOpenArray*(sb: AsyncBuffer): auto =
toOpenArray(sb.buffer, sb.offset, len(sb.buffer) - 1)
sb.forget()
template copyOut*(dest: pointer, item: WriteItem, length: int) =
if item.kind == Pointer:
@ -243,7 +210,7 @@ proc atEof*(rstream: AsyncStreamReader): bool =
rstream.rsource.atEof()
else:
(rstream.state != AsyncStreamState.Running) and
(rstream.buffer.dataLen() == 0)
(len(rstream.buffer.backend) == 0)
proc atEof*(wstream: AsyncStreamWriter): bool =
## Returns ``true`` is writing stream ``wstream`` closed or finished.
@ -331,12 +298,12 @@ template checkStreamFinished*(t: untyped) =
template readLoop(body: untyped): untyped =
while true:
if rstream.buffer.dataLen() == 0:
if len(rstream.buffer.backend) == 0:
if rstream.state == AsyncStreamState.Error:
raise rstream.error
let (consumed, done) = body
rstream.buffer.shift(consumed)
rstream.buffer.backend.consume(consumed)
rstream.bytesCount = rstream.bytesCount + uint64(consumed)
if done:
break
@ -373,17 +340,23 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer,
if isNil(rstream.readerLoop):
await readExactly(rstream.rsource, pbytes, nbytes)
else:
var index = 0
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
var
index = 0
pbuffer = pbytes.toUnchecked()
readLoop():
if rstream.buffer.dataLen() == 0:
if len(rstream.buffer.backend) == 0:
if rstream.atEof():
raise newAsyncStreamIncompleteError()
let count = min(nbytes - index, rstream.buffer.dataLen())
if count > 0:
rstream.buffer.copyData(addr pbuffer[index], 0, count)
index += count
(consumed: count, done: index == nbytes)
var readed = 0
for (region, rsize) in rstream.buffer.backend.regions():
let count = min(nbytes - index, rsize)
readed += count
if count > 0:
copyMem(addr pbuffer[index], region, count)
index += count
if index == nbytes:
break
(consumed: readed, done: index == nbytes)
proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int): Future[int] {.
@ -407,15 +380,21 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
if isNil(rstream.readerLoop):
return await readOnce(rstream.rsource, pbytes, nbytes)
else:
var count = 0
var
pbuffer = pbytes.toUnchecked()
index = 0
readLoop():
if rstream.buffer.dataLen() == 0:
if len(rstream.buffer.backend) == 0:
(0, rstream.atEof())
else:
count = min(rstream.buffer.dataLen(), nbytes)
rstream.buffer.copyData(pbytes, 0, count)
(count, true)
return count
for (region, rsize) in rstream.buffer.backend.regions():
let size = min(rsize, nbytes - index)
copyMem(addr pbuffer[index], region, size)
index += size
if index >= nbytes:
break
(index, true)
index
proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
sep: seq[byte]): Future[int] {.
@ -456,28 +435,32 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
if isNil(rstream.readerLoop):
return await readUntil(rstream.rsource, pbytes, nbytes, sep)
else:
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
var state = 0
var k = 0
var
pbuffer = pbytes.toUnchecked()
state = 0
k = 0
readLoop():
if rstream.atEof():
raise newAsyncStreamIncompleteError()
var index = 0
while index < rstream.buffer.dataLen():
for ch in rstream.buffer.backend:
if k >= nbytes:
raise newAsyncStreamLimitError()
let ch = rstream.buffer[index]
inc(index)
pbuffer[k] = ch
inc(k)
if sep[state] == ch:
inc(state)
if state == len(sep):
break
else:
state = 0
(index, state == len(sep))
return k
k
proc readLine*(rstream: AsyncStreamReader, limit = 0,
sep = "\r\n"): Future[string] {.
@ -507,18 +490,19 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0,
return await readLine(rstream.rsource, limit, sep)
else:
let lim = if limit <= 0: -1 else: limit
var state = 0
var res = ""
var
state = 0
res = ""
readLoop():
if rstream.atEof():
(0, true)
else:
var index = 0
while index < rstream.buffer.dataLen():
let ch = char(rstream.buffer[index])
for ch in rstream.buffer.backend:
inc(index)
if sep[state] == ch:
if sep[state] == char(ch):
inc(state)
if state == len(sep):
break
@ -529,11 +513,14 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0,
res.add(sep[0 ..< missing])
else:
res.add(sep[0 ..< state])
res.add(ch)
state = 0
res.add(char(ch))
if len(res) == lim:
break
(index, (state == len(sep)) or (lim == len(res)))
return res
res
proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
@ -555,15 +542,17 @@ proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.
if isNil(rstream.readerLoop):
return await read(rstream.rsource)
else:
var res = newSeq[byte]()
var res: seq[byte]
readLoop():
if rstream.atEof():
(0, true)
else:
let count = rstream.buffer.dataLen()
res.add(rstream.buffer.buffer.toOpenArray(0, count - 1))
(count, false)
return res
var readed = 0
for (region, rsize) in rstream.buffer.backend.regions():
readed += rsize
res.add(region.toUnchecked().toOpenArray(0, rsize - 1))
(readed, false)
res
proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
@ -592,10 +581,13 @@ proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.
if rstream.atEof():
(0, true)
else:
let count = min(rstream.buffer.dataLen(), n - len(res))
res.add(rstream.buffer.buffer.toOpenArray(0, count - 1))
(count, len(res) == n)
return res
var readed = 0
for (region, rsize) in rstream.buffer.backend.regions():
let count = min(rsize, n - len(res))
readed += count
res.add(region.toUnchecked().toOpenArray(0, count - 1))
(readed, len(res) == n)
res
proc consume*(rstream: AsyncStreamReader): Future[int] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
@ -622,9 +614,10 @@ proc consume*(rstream: AsyncStreamReader): Future[int] {.
if rstream.atEof():
(0, true)
else:
res += rstream.buffer.dataLen()
(rstream.buffer.dataLen(), false)
return res
let used = len(rstream.buffer.backend)
res += used
(used, false)
res
proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
@ -652,13 +645,12 @@ proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.
else:
var res = 0
readLoop():
if rstream.atEof():
(0, true)
else:
let count = min(rstream.buffer.dataLen(), n - res)
res += count
(count, res == n)
return res
let
used = len(rstream.buffer.backend)
count = min(used, n - res)
res += count
(count, res == n)
res
proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {.
async: (raises: [CancelledError, AsyncStreamError]).} =
@ -689,15 +681,18 @@ proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {.
await readMessage(rstream.rsource, pred)
else:
readLoop():
let count = rstream.buffer.dataLen()
if count == 0:
if len(rstream.buffer.backend) == 0:
if rstream.atEof():
pred([])
else:
# Case, when transport's buffer is not yet filled with data.
(0, false)
else:
pred(rstream.buffer.buffer.toOpenArray(0, count - 1))
var res: tuple[consumed: int, done: bool]
for (region, rsize) in rstream.buffer.backend.regions():
res = pred(region.toUnchecked().toOpenArray(0, rsize - 1))
break
res
proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
nbytes: int) {.
@ -951,7 +946,7 @@ proc init*(child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
child.readerLoop = loop
child.rsource = rsource
child.tsource = rsource.tsource
child.buffer = AsyncBuffer.init(bufferSize)
child.buffer = AsyncBufferRef.new(bufferSize)
trackCounter(AsyncStreamReaderTrackerName)
child.startReader()
@ -963,7 +958,7 @@ proc init*[T](child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
child.readerLoop = loop
child.rsource = rsource
child.tsource = rsource.tsource
child.buffer = AsyncBuffer.init(bufferSize)
child.buffer = AsyncBufferRef.new(bufferSize)
if not isNil(udata):
GC_ref(udata)
child.udata = cast[pointer](udata)

View File

@ -18,7 +18,7 @@
{.push raises: [].}
import results
import ../[asyncloop, timer, config]
import ../[asyncloop, timer, bipbuffer, config]
import asyncstream, ../transports/[stream, common]
export asyncloop, asyncstream, stream, timer, common
@ -103,7 +103,7 @@ func endsWith(s, suffix: openArray[byte]): bool =
proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
var rstream = BoundedStreamReader(stream)
rstream.state = AsyncStreamState.Running
var buffer = newSeq[byte](rstream.buffer.bufferLen())
var buffer = newSeq[byte](rstream.buffer.backend.availSpace())
while true:
let toRead =
if rstream.boundSize.isNone():
@ -127,7 +127,7 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], length)
await upload(rstream.buffer, addr buffer[0], length)
if rstream.state == AsyncStreamState.Running:
rstream.state = AsyncStreamState.Finished
else:
@ -135,7 +135,7 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], res)
await upload(rstream.buffer, addr buffer[0], res)
if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop
@ -151,7 +151,7 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
# There should be one step between transferring last bytes to the
# consumer and declaring stream EOF. Otherwise could not be
# consumed.
await upload(addr rstream.buffer, addr buffer[0], res)
await upload(rstream.buffer, addr buffer[0], res)
if (res < toRead) and rstream.rsource.atEof():
case rstream.cmpop

View File

@ -11,7 +11,7 @@
{.push raises: [].}
import ../[asyncloop, timer, config]
import ../[asyncloop, timer, bipbuffer, config]
import asyncstream, ../transports/[stream, common]
import results
export asyncloop, asyncstream, stream, timer, common, results
@ -118,11 +118,11 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
var chunksize = cres.get()
if chunksize > 0'u64:
while chunksize > 0'u64:
let toRead = int(min(chunksize,
uint64(rstream.buffer.bufferLen())))
await rstream.rsource.readExactly(rstream.buffer.getBuffer(),
toRead)
rstream.buffer.update(toRead)
let
(data, rsize) = rstream.buffer.backend.reserve()
toRead = int(min(chunksize, uint64(rsize)))
await rstream.rsource.readExactly(data, toRead)
rstream.buffer.backend.commit(toRead)
await rstream.buffer.transfer()
chunksize = chunksize - uint64(toRead)

View File

@ -242,7 +242,7 @@ proc tlsReadApp(engine: ptr SslEngineContext,
try:
var length = 0'u
var buf = sslEngineRecvappBuf(engine[], length)
await upload(addr reader.buffer, buf, int(length))
await upload(reader.buffer, buf, int(length))
sslEngineRecvappAck(engine[], length)
TLSResult.Success
except CancelledError:

View File

@ -11,7 +11,7 @@
import std/deques
when not(defined(windows)): import ".."/selectors2
import ".."/[asyncloop, config, osdefs, oserrno, osutils, handles]
import ".."/[asyncloop, osdefs, oserrno, osutils, handles]
import "."/common
import stew/ptrops

View File

@ -11,7 +11,7 @@
import std/deques
import stew/ptrops
import ".."/[asyncloop, config, handles, osdefs, osutils, oserrno]
import ".."/[asyncloop, config, handles, bipbuffer, osdefs, osutils, oserrno]
import ./common
type
@ -72,8 +72,7 @@ when defined(windows):
fd*: AsyncFD # File descriptor
state: set[TransportState] # Current Transport state
reader: ReaderFuture # Current reader Future
buffer: seq[byte] # Reading buffer
offset: int # Reading buffer offset
buffer: BipBuffer # Reading buffer
error: ref TransportError # Current error
queue: Deque[StreamVector] # Writer queue
future: Future[void].Raising([]) # Stream life future
@ -82,7 +81,6 @@ when defined(windows):
wwsabuf: WSABUF # Writer WSABUF
rovl: CustomOverlapped # Reader OVERLAPPED structure
wovl: CustomOverlapped # Writer OVERLAPPED structure
roffset: int # Pending reading offset
flags: set[TransportFlags] # Internal flags
case kind*: TransportKind
of TransportKind.Socket:
@ -99,8 +97,7 @@ else:
fd*: AsyncFD # File descriptor
state: set[TransportState] # Current Transport state
reader: ReaderFuture # Current reader Future
buffer: seq[byte] # Reading buffer
offset: int # Reading buffer offset
buffer: BipBuffer # Reading buffer
error: ref TransportError # Current error
queue: Deque[StreamVector] # Writer queue
future: Future[void].Raising([]) # Stream life future
@ -184,14 +181,6 @@ template checkPending(t: untyped) =
if not(isNil((t).reader)):
raise newException(TransportError, "Read operation already pending!")
template shiftBuffer(t, c: untyped) =
if (t).offset > c:
if c > 0:
moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c))
(t).offset = (t).offset - (c)
else:
(t).offset = 0
template shiftVectorBuffer(v: var StreamVector, o: untyped) =
(v).buf = cast[pointer](cast[uint]((v).buf) + uint(o))
(v).buflen -= int(o)
@ -228,6 +217,9 @@ proc clean(transp: StreamTransport) {.inline.} =
transp.future.complete()
GC_unref(transp)
template toUnchecked*(a: untyped): untyped =
cast[ptr UncheckedArray[byte]](a)
when defined(windows):
template zeroOvelappedOffset(t: untyped) =
@ -245,9 +237,9 @@ when defined(windows):
cast[HANDLE]((v).buflen)
template setReaderWSABuffer(t: untyped) =
(t).rwsabuf.buf = cast[cstring](
cast[uint](addr t.buffer[0]) + uint((t).roffset))
(t).rwsabuf.len = ULONG(len((t).buffer) - (t).roffset)
let res = (t).buffer.reserve()
(t).rwsabuf.buf = cast[cstring](res.data)
(t).rwsabuf.len = uint32(res.size)
template setWriterWSABuffer(t, v: untyped) =
(t).wwsabuf.buf = cast[cstring](v.buf)
@ -381,8 +373,9 @@ when defined(windows):
else:
transp.queue.addFirst(vector)
else:
let loop = getThreadDispatcher()
let size = min(uint32(getFileSize(vector)), 2_147_483_646'u32)
let
loop = getThreadDispatcher()
size = min(uint32(getFileSize(vector)), 2_147_483_646'u32)
transp.wovl.setOverlappedOffset(vector.offset)
var ret = loop.transmitFile(sock, getFileHandle(vector), size,
@ -481,29 +474,28 @@ when defined(windows):
if bytesCount == 0:
transp.state.incl({ReadEof, ReadPaused})
else:
if transp.offset != transp.roffset:
moveMem(addr transp.buffer[transp.offset],
addr transp.buffer[transp.roffset],
bytesCount)
transp.offset += int(bytesCount)
transp.roffset = transp.offset
if transp.offset == len(transp.buffer):
transp.buffer.commit(bytesCount)
if transp.buffer.availSpace() == 0:
transp.state.incl(ReadPaused)
of ERROR_OPERATION_ABORTED, ERROR_CONNECTION_ABORTED,
ERROR_BROKEN_PIPE:
# CancelIO() interrupt or closeSocket() call.
transp.buffer.commit(0)
transp.state.incl(ReadPaused)
of ERROR_NETNAME_DELETED, WSAECONNABORTED:
transp.buffer.commit(0)
if transp.kind == TransportKind.Socket:
transp.state.incl({ReadEof, ReadPaused})
else:
transp.setReadError(err)
of ERROR_PIPE_NOT_CONNECTED:
transp.buffer.commit(0)
if transp.kind == TransportKind.Pipe:
transp.state.incl({ReadEof, ReadPaused})
else:
transp.setReadError(err)
else:
transp.buffer.commit(0)
transp.setReadError(err)
transp.completeReader()
@ -524,7 +516,6 @@ when defined(windows):
transp.state.incl(ReadPending)
if transp.kind == TransportKind.Socket:
let sock = SocketHandle(transp.fd)
transp.roffset = transp.offset
transp.setReaderWSABuffer()
let ret = wsaRecv(sock, addr transp.rwsabuf, 1,
addr bytesCount, addr flags,
@ -549,7 +540,6 @@ when defined(windows):
transp.completeReader()
elif transp.kind == TransportKind.Pipe:
let pipe = HANDLE(transp.fd)
transp.roffset = transp.offset
transp.setReaderWSABuffer()
let ret = readFile(pipe, cast[pointer](transp.rwsabuf.buf),
DWORD(transp.rwsabuf.len), addr bytesCount,
@ -595,7 +585,7 @@ when defined(windows):
udata: cast[pointer](transp))
transp.wovl.data = CompletionData(cb: writeStreamLoop,
udata: cast[pointer](transp))
transp.buffer = newSeq[byte](bufsize)
transp.buffer = BipBuffer.init(bufsize)
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
transp.future = Future[void].Raising([]).init(
@ -616,7 +606,7 @@ when defined(windows):
udata: cast[pointer](transp))
transp.wovl.data = CompletionData(cb: writeStreamLoop,
udata: cast[pointer](transp))
transp.buffer = newSeq[byte](bufsize)
transp.buffer = BipBuffer.init(bufsize)
transp.flags = flags
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
@ -1390,11 +1380,12 @@ else:
else:
if transp.kind == TransportKind.Socket:
while true:
let res = handleEintr(
osdefs.recv(fd, addr transp.buffer[transp.offset],
len(transp.buffer) - transp.offset, cint(0)))
let
(data, size) = transp.buffer.reserve()
res = handleEintr(osdefs.recv(fd, data, size, cint(0)))
if res < 0:
let err = osLastError()
transp.buffer.commit(0)
case err
of oserrno.ECONNRESET:
transp.state.incl({ReadEof, ReadPaused})
@ -1408,13 +1399,14 @@ else:
discard removeReader2(transp.fd)
elif res == 0:
transp.state.incl({ReadEof, ReadPaused})
transp.buffer.commit(0)
let rres = removeReader2(transp.fd)
if rres.isErr():
transp.state.incl(ReadError)
transp.setReadError(rres.error())
else:
transp.offset += res
if transp.offset == len(transp.buffer):
transp.buffer.commit(res)
if transp.buffer.availSpace() == 0:
transp.state.incl(ReadPaused)
let rres = removeReader2(transp.fd)
if rres.isErr():
@ -1424,23 +1416,25 @@ else:
break
elif transp.kind == TransportKind.Pipe:
while true:
let res = handleEintr(
osdefs.read(cint(fd), addr transp.buffer[transp.offset],
len(transp.buffer) - transp.offset))
let
(data, size) = transp.buffer.reserve()
res = handleEintr(osdefs.read(cint(fd), data, size))
if res < 0:
let err = osLastError()
transp.buffer.commit(0)
transp.state.incl(ReadPaused)
transp.setReadError(err)
discard removeReader2(transp.fd)
elif res == 0:
transp.state.incl({ReadEof, ReadPaused})
transp.buffer.commit(0)
let rres = removeReader2(transp.fd)
if rres.isErr():
transp.state.incl(ReadError)
transp.setReadError(rres.error())
else:
transp.offset += res
if transp.offset == len(transp.buffer):
transp.buffer.commit(res)
if transp.buffer.availSpace() == 0:
transp.state.incl(ReadPaused)
let rres = removeReader2(transp.fd)
if rres.isErr():
@ -1458,7 +1452,7 @@ else:
transp = StreamTransport(kind: TransportKind.Socket)
transp.fd = sock
transp.buffer = newSeq[byte](bufsize)
transp.buffer = BipBuffer.init(bufsize)
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
transp.future = Future[void].Raising([]).init(
@ -1475,7 +1469,7 @@ else:
transp = StreamTransport(kind: TransportKind.Pipe)
transp.fd = fd
transp.buffer = newSeq[byte](bufsize)
transp.buffer = BipBuffer.init(bufsize)
transp.state = {ReadPaused, WritePaused}
transp.queue = initDeque[StreamVector]()
transp.future = Future[void].Raising([]).init(
@ -2339,7 +2333,7 @@ proc writeFile*(transp: StreamTransport, handle: int,
proc atEof*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if ``transp`` is at EOF.
(transp.offset == 0) and (ReadEof in transp.state) and
(len(transp.buffer) == 0) and (ReadEof in transp.state) and
(ReadPaused in transp.state)
template readLoop(name, body: untyped): untyped =
@ -2351,16 +2345,17 @@ template readLoop(name, body: untyped): untyped =
if ReadClosed in transp.state:
raise newException(TransportUseClosedError,
"Attempt to read data from closed stream")
if transp.offset == 0:
if len(transp.buffer) == 0:
# We going to raise an error, only if transport buffer is empty.
if ReadError in transp.state:
raise transp.getError()
let (consumed, done) = body
transp.shiftBuffer(consumed)
transp.buffer.consume(consumed)
if done:
break
else:
if len(transp.buffer) == 0:
checkPending(transp)
let fut = ReaderFuture.init(name)
transp.reader = fut
@ -2403,17 +2398,23 @@ proc readExactly*(transp: StreamTransport, pbytes: pointer,
if nbytes == 0:
return
var index = 0
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
var
index = 0
pbuffer = pbytes.toUnchecked()
readLoop("stream.transport.readExactly"):
if transp.offset == 0:
if len(transp.buffer) == 0:
if transp.atEof():
raise newException(TransportIncompleteError, "Data incomplete!")
let count = min(nbytes - index, transp.offset)
if count > 0:
copyMem(addr pbuffer[index], addr(transp.buffer[0]), count)
index += count
(consumed: count, done: index == nbytes)
var readed = 0
for (region, rsize) in transp.buffer.regions():
let count = min(nbytes - index, rsize)
readed += count
if count > 0:
copyMem(addr pbuffer[index], region, count)
index += count
if index == nbytes:
break
(consumed: readed, done: index == nbytes)
proc readOnce*(transp: StreamTransport, pbytes: pointer,
nbytes: int): Future[int] {.
@ -2425,15 +2426,21 @@ proc readOnce*(transp: StreamTransport, pbytes: pointer,
doAssert(not(isNil(pbytes)), "pbytes must not be nil")
doAssert(nbytes > 0, "nbytes must be positive integer")
var count = 0
var
pbuffer = pbytes.toUnchecked()
index = 0
readLoop("stream.transport.readOnce"):
if transp.offset == 0:
if len(transp.buffer) == 0:
(0, transp.atEof())
else:
count = min(transp.offset, nbytes)
copyMem(pbytes, addr(transp.buffer[0]), count)
(count, true)
return count
for (region, rsize) in transp.buffer.regions():
let size = min(rsize, nbytes - index)
copyMem(addr pbuffer[index], region, size)
index += size
if index >= nbytes:
break
(index, true)
index
proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
sep: seq[byte]): Future[int] {.
@ -2457,7 +2464,7 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
if nbytes == 0:
raise newException(TransportLimitError, "Limit reached!")
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
var pbuffer = pbytes.toUnchecked()
var state = 0
var k = 0
@ -2466,14 +2473,11 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
raise newException(TransportIncompleteError, "Data incomplete!")
var index = 0
while index < transp.offset:
for ch in transp.buffer:
if k >= nbytes:
raise newException(TransportLimitError, "Limit reached!")
let ch = transp.buffer[index]
inc(index)
pbuffer[k] = ch
inc(k)
@ -2485,8 +2489,7 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
state = 0
(index, state == len(sep))
return k
k
proc readLine*(transp: StreamTransport, limit = 0,
sep = "\r\n"): Future[string] {.
@ -2503,46 +2506,52 @@ proc readLine*(transp: StreamTransport, limit = 0,
## If ``limit`` more then 0, then read is limited to ``limit`` bytes.
let lim = if limit <= 0: -1 else: limit
var state = 0
var res: string
readLoop("stream.transport.readLine"):
if transp.atEof():
(0, true)
else:
var index = 0
while index < transp.offset:
let ch = char(transp.buffer[index])
index += 1
for ch in transp.buffer:
inc(index)
if sep[state] == ch:
if sep[state] == char(ch):
inc(state)
if state == len(sep):
break
else:
if state != 0:
if limit > 0:
let missing = min(state, lim - len(result) - 1)
result.add(sep[0 ..< missing])
let missing = min(state, lim - len(res) - 1)
res.add(sep[0 ..< missing])
else:
result.add(sep[0 ..< state])
res.add(sep[0 ..< state])
state = 0
result.add(ch)
if len(result) == lim:
res.add(char(ch))
if len(res) == lim:
break
(index, (state == len(sep)) or (lim == len(result)))
(index, (state == len(sep)) or (lim == len(res)))
res
proc read*(transp: StreamTransport): Future[seq[byte]] {.
async: (raises: [TransportError, CancelledError]).} =
## Read all bytes from transport ``transp``.
##
## This procedure allocates buffer seq[byte] and return it as result.
var res: seq[byte]
readLoop("stream.transport.read"):
if transp.atEof():
(0, true)
else:
result.add(transp.buffer.toOpenArray(0, transp.offset - 1))
(transp.offset, false)
var readed = 0
for (region, rsize) in transp.buffer.regions():
readed += rsize
res.add(region.toUnchecked().toOpenArray(0, rsize - 1))
(readed, false)
res
proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {.
async: (raises: [TransportError, CancelledError]).} =
@ -2550,27 +2559,35 @@ proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {.
##
## This procedure allocates buffer seq[byte] and return it as result.
if n <= 0:
return await transp.read()
await transp.read()
else:
var res: seq[byte]
readLoop("stream.transport.read"):
if transp.atEof():
(0, true)
else:
let count = min(transp.offset, n - len(result))
result.add(transp.buffer.toOpenArray(0, count - 1))
(count, len(result) == n)
var readed = 0
for (region, rsize) in transp.buffer.regions():
let count = min(rsize, n - len(res))
readed += count
res.add(region.toUnchecked().toOpenArray(0, count - 1))
(readed, len(res) == n)
res
proc consume*(transp: StreamTransport): Future[int] {.
async: (raises: [TransportError, CancelledError]).} =
## Consume all bytes from transport ``transp`` and discard it.
##
## Return number of bytes actually consumed and discarded.
var res = 0
readLoop("stream.transport.consume"):
if transp.atEof():
(0, true)
else:
result += transp.offset
(transp.offset, false)
let used = len(transp.buffer)
res += used
(used, false)
res
proc consume*(transp: StreamTransport, n: int): Future[int] {.
async: (raises: [TransportError, CancelledError]).} =
@ -2579,15 +2596,19 @@ proc consume*(transp: StreamTransport, n: int): Future[int] {.
##
## Return number of bytes actually consumed and discarded.
if n <= 0:
return await transp.consume()
await transp.consume()
else:
var res = 0
readLoop("stream.transport.consume"):
if transp.atEof():
(0, true)
else:
let count = min(transp.offset, n - result)
result += count
(count, result == n)
let
used = len(transp.buffer)
count = min(used, n - res)
res += count
(count, res == n)
res
proc readMessage*(transp: StreamTransport,
predicate: ReadMessagePredicate) {.
@ -2605,14 +2626,18 @@ proc readMessage*(transp: StreamTransport,
## ``predicate`` callback will receive (zero-length) openArray, if transport
## is at EOF.
readLoop("stream.transport.readMessage"):
if transp.offset == 0:
if len(transp.buffer) == 0:
if transp.atEof():
predicate([])
else:
# Case, when transport's buffer is not yet filled with data.
(0, false)
else:
predicate(transp.buffer.toOpenArray(0, transp.offset - 1))
var res: tuple[consumed: int, done: bool]
for (region, rsize) in transp.buffer.regions():
res = predicate(region.toUnchecked().toOpenArray(0, rsize - 1))
break
res
proc join*(transp: StreamTransport): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
@ -2630,7 +2655,7 @@ proc join*(transp: StreamTransport): Future[void] {.
retFuture.cancelCallback = cancel
else:
retFuture.complete()
return retFuture
retFuture
proc closed*(transp: StreamTransport): bool {.inline.} =
## Returns ``true`` if transport in closed state.