diff --git a/chronos/bipbuffer.nim b/chronos/bipbuffer.nim new file mode 100644 index 0000000..5aa34c4 --- /dev/null +++ b/chronos/bipbuffer.nim @@ -0,0 +1,140 @@ +# +# Chronos +# +# (c) Copyright 2018-Present Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +## This module implements Bip Buffer (bi-partite circular buffer) by Simone +## Cooke. +## +## The Bip-Buffer is like a circular buffer, but slightly different. Instead of +## keeping one head and tail pointer to the data in the buffer, it maintains two +## revolving regions, allowing for fast data access without having to worry +## about wrapping at the end of the buffer. Buffer allocations are always +## maintained as contiguous blocks, allowing the buffer to be used in a highly +## efficient manner with API calls, and also reducing the amount of copying +## which needs to be performed to put data into the buffer. Finally, a two-phase +## allocation system allows the user to pessimistically reserve an area of +## buffer space, and then trim back the buffer to commit to only the space which +## was used. +## +## https://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist + +{.push raises: [].} + +type + BipPos = object + start: Natural + finish: Natural + + BipBuffer* = object + a, b, r: BipPos + data: seq[byte] + +proc init*(t: typedesc[BipBuffer], size: int): BipBuffer = + ## Creates new Bip Buffer with size `size`. + BipBuffer(data: newSeq[byte](size)) + +template len(pos: BipPos): Natural = + pos.finish - pos.start + +template reset(pos: var BipPos) = + pos = BipPos() + +func init(t: typedesc[BipPos], start, finish: Natural): BipPos = + BipPos(start: start, finish: finish) + +func calcReserve(bp: BipBuffer): tuple[space: Natural, start: Natural] = + if len(bp.b) > 0: + (Natural(bp.a.start - bp.b.finish), bp.b.finish) + else: + let spaceAfterA = Natural(len(bp.data) - bp.a.finish) + if spaceAfterA >= bp.a.start: + (spaceAfterA, bp.a.finish) + else: + (bp.a.start, Natural(0)) + +func availSpace*(bp: BipBuffer): Natural = + ## Returns amount of space available for reserve in buffer `bp`. + let (res, _) = bp.calcReserve() + res + +func len*(bp: BipBuffer): Natural = + ## Returns amount of used space in buffer `bp`. + len(bp.b) + len(bp.a) + +proc reserve*(bp: var BipBuffer, + size: Natural = 0): tuple[data: ptr byte, size: Natural] = + ## Reserve `size` bytes in buffer. + ## + ## If `size == 0` (default) reserve all available space from buffer. + ## + ## If there is not enough space in buffer for resevation - error will be + ## returned. + ## + ## Returns current reserved range as pointer of type `pt` and size of + ## type `st`. + const ErrorMessage = "Not enough space available" + doAssert(size <= len(bp.data)) + let (availableSpace, reserveStart) = bp.calcReserve() + if availableSpace == 0: + raiseAssert ErrorMessage + let reserveLength = + if size == 0: + availableSpace + else: + if size < availableSpace: + raiseAssert ErrorMessage + size + bp.r = BipPos.init(reserveStart, Natural(reserveStart + reserveLength)) + (addr bp.data[bp.r.start], len(bp.r)) + +proc commit*(bp: var BipBuffer, size: Natural) = + ## Updates structure's pointers when new data inserted into buffer. + doAssert(len(bp.r) >= size, + "Committed size could not be larger than the previously reserved one") + if size == 0: + bp.r.reset() + return + + let toCommit = min(size, len(bp.r)) + if len(bp.a) == 0 and len(bp.b) == 0: + bp.a.start = bp.r.start + bp.a.finish = bp.r.start + toCommit + elif bp.r.start == bp.a.finish: + bp.a.finish += toCommit + else: + bp.b.finish += toCommit + bp.r.reset() + +proc consume*(bp: var BipBuffer, size: Natural) = + ## The procedure removes/frees `size` bytes from the buffer ``bp``. + var currentSize = size + if currentSize >= len(bp.a): + currentSize -= len(bp.a) + bp.a = bp.b + bp.b.reset() + if currentSize >= len(bp.a): + currentSize -= len(bp.a) + bp.a.reset() + else: + bp.a.start += currentSize + else: + bp.a.start += currentSize + +iterator items*(bp: BipBuffer): byte = + ## Iterates over all the bytes in the buffer. + for index in bp.a.start ..< bp.a.finish: + yield bp.data[index] + for index in bp.b.start ..< bp.b.finish: + yield bp.data[index] + +iterator regions*(bp: var BipBuffer): tuple[data: ptr byte, size: Natural] = + ## Iterates over all the regions (`a` and `b`) in the buffer. + if len(bp.a) > 0: + yield (addr bp.data[bp.a.start], len(bp.a)) + if len(bp.b) > 0: + yield (addr bp.data[bp.b.start], len(bp.b)) diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 3d2f858..473cc38 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -9,7 +9,7 @@ {.push raises: [].} -import ../[config, asyncloop, asyncsync] +import ../[config, asyncloop, asyncsync, bipbuffer] import ../transports/[common, stream] export asyncloop, asyncsync, stream, common @@ -34,10 +34,11 @@ type AsyncStreamWriteEOFError* = object of AsyncStreamWriteError AsyncBuffer* = object - offset*: int - buffer*: seq[byte] + backend*: BipBuffer events*: array[2, AsyncEvent] + AsyncBufferRef* = ref AsyncBuffer + WriteType* = enum Pointer, Sequence, String @@ -73,7 +74,7 @@ type tsource*: StreamTransport readerLoop*: StreamReaderLoop state*: AsyncStreamState - buffer*: AsyncBuffer + buffer*: AsyncBufferRef udata: pointer error*: ref AsyncStreamError bytesCount*: uint64 @@ -96,85 +97,51 @@ type AsyncStreamRW* = AsyncStreamReader | AsyncStreamWriter -proc init*(t: typedesc[AsyncBuffer], size: int): AsyncBuffer = - AsyncBuffer( - buffer: newSeq[byte](size), - events: [newAsyncEvent(), newAsyncEvent()], - offset: 0 +proc new*(t: typedesc[AsyncBufferRef], size: int): AsyncBufferRef = + AsyncBufferRef( + backend: BipBuffer.init(size), + events: [newAsyncEvent(), newAsyncEvent()] ) -proc getBuffer*(sb: AsyncBuffer): pointer {.inline.} = - unsafeAddr sb.buffer[sb.offset] - -proc bufferLen*(sb: AsyncBuffer): int {.inline.} = - len(sb.buffer) - sb.offset - -proc getData*(sb: AsyncBuffer): pointer {.inline.} = - unsafeAddr sb.buffer[0] - -template dataLen*(sb: AsyncBuffer): int = - sb.offset - -proc `[]`*(sb: AsyncBuffer, index: int): byte {.inline.} = - doAssert(index < sb.offset) - sb.buffer[index] - -proc update*(sb: var AsyncBuffer, size: int) {.inline.} = - sb.offset += size - -template wait*(sb: var AsyncBuffer): untyped = +template wait*(sb: AsyncBufferRef): untyped = sb.events[0].clear() sb.events[1].fire() sb.events[0].wait() -template transfer*(sb: var AsyncBuffer): untyped = +template transfer*(sb: AsyncBufferRef): untyped = sb.events[1].clear() sb.events[0].fire() sb.events[1].wait() -proc forget*(sb: var AsyncBuffer) {.inline.} = +proc forget*(sb: AsyncBufferRef) {.inline.} = sb.events[1].clear() sb.events[0].fire() -proc shift*(sb: var AsyncBuffer, size: int) {.inline.} = - if sb.offset > size: - moveMem(addr sb.buffer[0], addr sb.buffer[size], sb.offset - size) - sb.offset = sb.offset - size - else: - sb.offset = 0 - -proc copyData*(sb: AsyncBuffer, dest: pointer, offset, length: int) {.inline.} = - copyMem(cast[pointer](cast[uint](dest) + cast[uint](offset)), - unsafeAddr sb.buffer[0], length) - -proc upload*(sb: ptr AsyncBuffer, pbytes: ptr byte, +proc upload*(sb: AsyncBufferRef, pbytes: ptr byte, nbytes: int): Future[void] {. async: (raises: [CancelledError]).} = ## You can upload any amount of bytes to the buffer. If size of internal ## buffer is not enough to fit all the data at once, data will be uploaded ## via chunks of size up to internal buffer size. - var length = nbytes - var srcBuffer = cast[ptr UncheckedArray[byte]](pbytes) - var srcOffset = 0 + var + length = nbytes + srcBuffer = pbytes.toUnchecked() + offset = 0 + while length > 0: - let size = min(length, sb[].bufferLen()) + let size = min(length, sb.backend.availSpace()) if size == 0: - # Internal buffer is full, we need to transfer data to consumer. - await sb[].transfer() + # Internal buffer is full, we need to notify consumer. + await sb.transfer() else: + let (data, _) = sb.backend.reserve() # Copy data from `pbytes` to internal buffer. - copyMem(addr sb[].buffer[sb.offset], addr srcBuffer[srcOffset], size) - sb[].offset = sb[].offset + size - srcOffset = srcOffset + size + copyMem(data, addr srcBuffer[offset], size) + sb.backend.commit(size) + offset = offset + size length = length - size # We notify consumers that new data is available. - sb[].forget() - -template toDataOpenArray*(sb: AsyncBuffer): auto = - toOpenArray(sb.buffer, 0, sb.offset - 1) - -template toBufferOpenArray*(sb: AsyncBuffer): auto = - toOpenArray(sb.buffer, sb.offset, len(sb.buffer) - 1) + sb.forget() template copyOut*(dest: pointer, item: WriteItem, length: int) = if item.kind == Pointer: @@ -243,7 +210,7 @@ proc atEof*(rstream: AsyncStreamReader): bool = rstream.rsource.atEof() else: (rstream.state != AsyncStreamState.Running) and - (rstream.buffer.dataLen() == 0) + (len(rstream.buffer.backend) == 0) proc atEof*(wstream: AsyncStreamWriter): bool = ## Returns ``true`` is writing stream ``wstream`` closed or finished. @@ -331,12 +298,12 @@ template checkStreamFinished*(t: untyped) = template readLoop(body: untyped): untyped = while true: - if rstream.buffer.dataLen() == 0: + if len(rstream.buffer.backend) == 0: if rstream.state == AsyncStreamState.Error: raise rstream.error let (consumed, done) = body - rstream.buffer.shift(consumed) + rstream.buffer.backend.consume(consumed) rstream.bytesCount = rstream.bytesCount + uint64(consumed) if done: break @@ -373,17 +340,23 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, if isNil(rstream.readerLoop): await readExactly(rstream.rsource, pbytes, nbytes) else: - var index = 0 - var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) + var + index = 0 + pbuffer = pbytes.toUnchecked() readLoop(): - if rstream.buffer.dataLen() == 0: + if len(rstream.buffer.backend) == 0: if rstream.atEof(): raise newAsyncStreamIncompleteError() - let count = min(nbytes - index, rstream.buffer.dataLen()) - if count > 0: - rstream.buffer.copyData(addr pbuffer[index], 0, count) - index += count - (consumed: count, done: index == nbytes) + var readed = 0 + for (region, rsize) in rstream.buffer.backend.regions(): + let count = min(nbytes - index, rsize) + readed += count + if count > 0: + copyMem(addr pbuffer[index], region, count) + index += count + if index == nbytes: + break + (consumed: readed, done: index == nbytes) proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int): Future[int] {. @@ -407,15 +380,21 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, if isNil(rstream.readerLoop): return await readOnce(rstream.rsource, pbytes, nbytes) else: - var count = 0 + var + pbuffer = pbytes.toUnchecked() + index = 0 readLoop(): - if rstream.buffer.dataLen() == 0: + if len(rstream.buffer.backend) == 0: (0, rstream.atEof()) else: - count = min(rstream.buffer.dataLen(), nbytes) - rstream.buffer.copyData(pbytes, 0, count) - (count, true) - return count + for (region, rsize) in rstream.buffer.backend.regions(): + let size = min(rsize, nbytes - index) + copyMem(addr pbuffer[index], region, size) + index += size + if index >= nbytes: + break + (index, true) + index proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] {. @@ -456,28 +435,32 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, if isNil(rstream.readerLoop): return await readUntil(rstream.rsource, pbytes, nbytes, sep) else: - var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) - var state = 0 - var k = 0 + var + pbuffer = pbytes.toUnchecked() + state = 0 + k = 0 readLoop(): if rstream.atEof(): raise newAsyncStreamIncompleteError() + var index = 0 - while index < rstream.buffer.dataLen(): + for ch in rstream.buffer.backend: if k >= nbytes: raise newAsyncStreamLimitError() - let ch = rstream.buffer[index] + inc(index) pbuffer[k] = ch inc(k) + if sep[state] == ch: inc(state) if state == len(sep): break else: state = 0 + (index, state == len(sep)) - return k + k proc readLine*(rstream: AsyncStreamReader, limit = 0, sep = "\r\n"): Future[string] {. @@ -507,18 +490,19 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0, return await readLine(rstream.rsource, limit, sep) else: let lim = if limit <= 0: -1 else: limit - var state = 0 - var res = "" + var + state = 0 + res = "" + readLoop(): if rstream.atEof(): (0, true) else: var index = 0 - while index < rstream.buffer.dataLen(): - let ch = char(rstream.buffer[index]) + for ch in rstream.buffer.backend: inc(index) - if sep[state] == ch: + if sep[state] == char(ch): inc(state) if state == len(sep): break @@ -529,11 +513,14 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0, res.add(sep[0 ..< missing]) else: res.add(sep[0 ..< state]) - res.add(ch) + state = 0 + + res.add(char(ch)) if len(res) == lim: break + (index, (state == len(sep)) or (lim == len(res))) - return res + res proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {. async: (raises: [CancelledError, AsyncStreamError]).} = @@ -555,15 +542,17 @@ proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {. if isNil(rstream.readerLoop): return await read(rstream.rsource) else: - var res = newSeq[byte]() + var res: seq[byte] readLoop(): if rstream.atEof(): (0, true) else: - let count = rstream.buffer.dataLen() - res.add(rstream.buffer.buffer.toOpenArray(0, count - 1)) - (count, false) - return res + var readed = 0 + for (region, rsize) in rstream.buffer.backend.regions(): + readed += rsize + res.add(region.toUnchecked().toOpenArray(0, rsize - 1)) + (readed, false) + res proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {. async: (raises: [CancelledError, AsyncStreamError]).} = @@ -592,10 +581,13 @@ proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {. if rstream.atEof(): (0, true) else: - let count = min(rstream.buffer.dataLen(), n - len(res)) - res.add(rstream.buffer.buffer.toOpenArray(0, count - 1)) - (count, len(res) == n) - return res + var readed = 0 + for (region, rsize) in rstream.buffer.backend.regions(): + let count = min(rsize, n - len(res)) + readed += count + res.add(region.toUnchecked().toOpenArray(0, count - 1)) + (readed, len(res) == n) + res proc consume*(rstream: AsyncStreamReader): Future[int] {. async: (raises: [CancelledError, AsyncStreamError]).} = @@ -622,9 +614,10 @@ proc consume*(rstream: AsyncStreamReader): Future[int] {. if rstream.atEof(): (0, true) else: - res += rstream.buffer.dataLen() - (rstream.buffer.dataLen(), false) - return res + let used = len(rstream.buffer.backend) + res += used + (used, false) + res proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {. async: (raises: [CancelledError, AsyncStreamError]).} = @@ -652,13 +645,12 @@ proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {. else: var res = 0 readLoop(): - if rstream.atEof(): - (0, true) - else: - let count = min(rstream.buffer.dataLen(), n - res) - res += count - (count, res == n) - return res + let + used = len(rstream.buffer.backend) + count = min(used, n - res) + res += count + (count, res == n) + res proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {. async: (raises: [CancelledError, AsyncStreamError]).} = @@ -689,15 +681,18 @@ proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {. await readMessage(rstream.rsource, pred) else: readLoop(): - let count = rstream.buffer.dataLen() - if count == 0: + if len(rstream.buffer.backend) == 0: if rstream.atEof(): pred([]) else: # Case, when transport's buffer is not yet filled with data. (0, false) else: - pred(rstream.buffer.buffer.toOpenArray(0, count - 1)) + var res: tuple[consumed: int, done: bool] + for (region, rsize) in rstream.buffer.backend.regions(): + res = pred(region.toUnchecked().toOpenArray(0, rsize - 1)) + break + res proc write*(wstream: AsyncStreamWriter, pbytes: pointer, nbytes: int) {. @@ -951,7 +946,7 @@ proc init*(child, rsource: AsyncStreamReader, loop: StreamReaderLoop, child.readerLoop = loop child.rsource = rsource child.tsource = rsource.tsource - child.buffer = AsyncBuffer.init(bufferSize) + child.buffer = AsyncBufferRef.new(bufferSize) trackCounter(AsyncStreamReaderTrackerName) child.startReader() @@ -963,7 +958,7 @@ proc init*[T](child, rsource: AsyncStreamReader, loop: StreamReaderLoop, child.readerLoop = loop child.rsource = rsource child.tsource = rsource.tsource - child.buffer = AsyncBuffer.init(bufferSize) + child.buffer = AsyncBufferRef.new(bufferSize) if not isNil(udata): GC_ref(udata) child.udata = cast[pointer](udata) diff --git a/chronos/streams/boundstream.nim b/chronos/streams/boundstream.nim index 8d2e52c..0f7eba1 100644 --- a/chronos/streams/boundstream.nim +++ b/chronos/streams/boundstream.nim @@ -18,7 +18,7 @@ {.push raises: [].} import results -import ../[asyncloop, timer, config] +import ../[asyncloop, timer, bipbuffer, config] import asyncstream, ../transports/[stream, common] export asyncloop, asyncstream, stream, timer, common @@ -103,7 +103,7 @@ func endsWith(s, suffix: openArray[byte]): bool = proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} = var rstream = BoundedStreamReader(stream) rstream.state = AsyncStreamState.Running - var buffer = newSeq[byte](rstream.buffer.bufferLen()) + var buffer = newSeq[byte](rstream.buffer.backend.availSpace()) while true: let toRead = if rstream.boundSize.isNone(): @@ -127,7 +127,7 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} = # There should be one step between transferring last bytes to the # consumer and declaring stream EOF. Otherwise could not be # consumed. - await upload(addr rstream.buffer, addr buffer[0], length) + await upload(rstream.buffer, addr buffer[0], length) if rstream.state == AsyncStreamState.Running: rstream.state = AsyncStreamState.Finished else: @@ -135,7 +135,7 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} = # There should be one step between transferring last bytes to the # consumer and declaring stream EOF. Otherwise could not be # consumed. - await upload(addr rstream.buffer, addr buffer[0], res) + await upload(rstream.buffer, addr buffer[0], res) if (res < toRead) and rstream.rsource.atEof(): case rstream.cmpop @@ -151,7 +151,7 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} = # There should be one step between transferring last bytes to the # consumer and declaring stream EOF. Otherwise could not be # consumed. - await upload(addr rstream.buffer, addr buffer[0], res) + await upload(rstream.buffer, addr buffer[0], res) if (res < toRead) and rstream.rsource.atEof(): case rstream.cmpop diff --git a/chronos/streams/chunkstream.nim b/chronos/streams/chunkstream.nim index f3e73e0..b9475d5 100644 --- a/chronos/streams/chunkstream.nim +++ b/chronos/streams/chunkstream.nim @@ -11,7 +11,7 @@ {.push raises: [].} -import ../[asyncloop, timer, config] +import ../[asyncloop, timer, bipbuffer, config] import asyncstream, ../transports/[stream, common] import results export asyncloop, asyncstream, stream, timer, common, results @@ -118,11 +118,11 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} = var chunksize = cres.get() if chunksize > 0'u64: while chunksize > 0'u64: - let toRead = int(min(chunksize, - uint64(rstream.buffer.bufferLen()))) - await rstream.rsource.readExactly(rstream.buffer.getBuffer(), - toRead) - rstream.buffer.update(toRead) + let + (data, rsize) = rstream.buffer.backend.reserve() + toRead = int(min(chunksize, uint64(rsize))) + await rstream.rsource.readExactly(data, toRead) + rstream.buffer.backend.commit(toRead) await rstream.buffer.transfer() chunksize = chunksize - uint64(toRead) diff --git a/chronos/streams/tlsstream.nim b/chronos/streams/tlsstream.nim index 86f6d4c..9d90ab7 100644 --- a/chronos/streams/tlsstream.nim +++ b/chronos/streams/tlsstream.nim @@ -242,7 +242,7 @@ proc tlsReadApp(engine: ptr SslEngineContext, try: var length = 0'u var buf = sslEngineRecvappBuf(engine[], length) - await upload(addr reader.buffer, buf, int(length)) + await upload(reader.buffer, buf, int(length)) sslEngineRecvappAck(engine[], length) TLSResult.Success except CancelledError: diff --git a/chronos/transports/datagram.nim b/chronos/transports/datagram.nim index f89a6b4..d639121 100644 --- a/chronos/transports/datagram.nim +++ b/chronos/transports/datagram.nim @@ -11,7 +11,7 @@ import std/deques when not(defined(windows)): import ".."/selectors2 -import ".."/[asyncloop, config, osdefs, oserrno, osutils, handles] +import ".."/[asyncloop, osdefs, oserrno, osutils, handles] import "."/common import stew/ptrops diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index c80d992..b81a512 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -11,7 +11,7 @@ import std/deques import stew/ptrops -import ".."/[asyncloop, config, handles, osdefs, osutils, oserrno] +import ".."/[asyncloop, config, handles, bipbuffer, osdefs, osutils, oserrno] import ./common type @@ -72,8 +72,7 @@ when defined(windows): fd*: AsyncFD # File descriptor state: set[TransportState] # Current Transport state reader: ReaderFuture # Current reader Future - buffer: seq[byte] # Reading buffer - offset: int # Reading buffer offset + buffer: BipBuffer # Reading buffer error: ref TransportError # Current error queue: Deque[StreamVector] # Writer queue future: Future[void].Raising([]) # Stream life future @@ -82,7 +81,6 @@ when defined(windows): wwsabuf: WSABUF # Writer WSABUF rovl: CustomOverlapped # Reader OVERLAPPED structure wovl: CustomOverlapped # Writer OVERLAPPED structure - roffset: int # Pending reading offset flags: set[TransportFlags] # Internal flags case kind*: TransportKind of TransportKind.Socket: @@ -99,8 +97,7 @@ else: fd*: AsyncFD # File descriptor state: set[TransportState] # Current Transport state reader: ReaderFuture # Current reader Future - buffer: seq[byte] # Reading buffer - offset: int # Reading buffer offset + buffer: BipBuffer # Reading buffer error: ref TransportError # Current error queue: Deque[StreamVector] # Writer queue future: Future[void].Raising([]) # Stream life future @@ -184,14 +181,6 @@ template checkPending(t: untyped) = if not(isNil((t).reader)): raise newException(TransportError, "Read operation already pending!") -template shiftBuffer(t, c: untyped) = - if (t).offset > c: - if c > 0: - moveMem(addr((t).buffer[0]), addr((t).buffer[(c)]), (t).offset - (c)) - (t).offset = (t).offset - (c) - else: - (t).offset = 0 - template shiftVectorBuffer(v: var StreamVector, o: untyped) = (v).buf = cast[pointer](cast[uint]((v).buf) + uint(o)) (v).buflen -= int(o) @@ -228,6 +217,9 @@ proc clean(transp: StreamTransport) {.inline.} = transp.future.complete() GC_unref(transp) +template toUnchecked*(a: untyped): untyped = + cast[ptr UncheckedArray[byte]](a) + when defined(windows): template zeroOvelappedOffset(t: untyped) = @@ -245,9 +237,9 @@ when defined(windows): cast[HANDLE]((v).buflen) template setReaderWSABuffer(t: untyped) = - (t).rwsabuf.buf = cast[cstring]( - cast[uint](addr t.buffer[0]) + uint((t).roffset)) - (t).rwsabuf.len = ULONG(len((t).buffer) - (t).roffset) + let res = (t).buffer.reserve() + (t).rwsabuf.buf = cast[cstring](res.data) + (t).rwsabuf.len = uint32(res.size) template setWriterWSABuffer(t, v: untyped) = (t).wwsabuf.buf = cast[cstring](v.buf) @@ -381,8 +373,9 @@ when defined(windows): else: transp.queue.addFirst(vector) else: - let loop = getThreadDispatcher() - let size = min(uint32(getFileSize(vector)), 2_147_483_646'u32) + let + loop = getThreadDispatcher() + size = min(uint32(getFileSize(vector)), 2_147_483_646'u32) transp.wovl.setOverlappedOffset(vector.offset) var ret = loop.transmitFile(sock, getFileHandle(vector), size, @@ -481,29 +474,28 @@ when defined(windows): if bytesCount == 0: transp.state.incl({ReadEof, ReadPaused}) else: - if transp.offset != transp.roffset: - moveMem(addr transp.buffer[transp.offset], - addr transp.buffer[transp.roffset], - bytesCount) - transp.offset += int(bytesCount) - transp.roffset = transp.offset - if transp.offset == len(transp.buffer): + transp.buffer.commit(bytesCount) + if transp.buffer.availSpace() == 0: transp.state.incl(ReadPaused) of ERROR_OPERATION_ABORTED, ERROR_CONNECTION_ABORTED, ERROR_BROKEN_PIPE: # CancelIO() interrupt or closeSocket() call. + transp.buffer.commit(0) transp.state.incl(ReadPaused) of ERROR_NETNAME_DELETED, WSAECONNABORTED: + transp.buffer.commit(0) if transp.kind == TransportKind.Socket: transp.state.incl({ReadEof, ReadPaused}) else: transp.setReadError(err) of ERROR_PIPE_NOT_CONNECTED: + transp.buffer.commit(0) if transp.kind == TransportKind.Pipe: transp.state.incl({ReadEof, ReadPaused}) else: transp.setReadError(err) else: + transp.buffer.commit(0) transp.setReadError(err) transp.completeReader() @@ -524,7 +516,6 @@ when defined(windows): transp.state.incl(ReadPending) if transp.kind == TransportKind.Socket: let sock = SocketHandle(transp.fd) - transp.roffset = transp.offset transp.setReaderWSABuffer() let ret = wsaRecv(sock, addr transp.rwsabuf, 1, addr bytesCount, addr flags, @@ -549,7 +540,6 @@ when defined(windows): transp.completeReader() elif transp.kind == TransportKind.Pipe: let pipe = HANDLE(transp.fd) - transp.roffset = transp.offset transp.setReaderWSABuffer() let ret = readFile(pipe, cast[pointer](transp.rwsabuf.buf), DWORD(transp.rwsabuf.len), addr bytesCount, @@ -595,7 +585,7 @@ when defined(windows): udata: cast[pointer](transp)) transp.wovl.data = CompletionData(cb: writeStreamLoop, udata: cast[pointer](transp)) - transp.buffer = newSeq[byte](bufsize) + transp.buffer = BipBuffer.init(bufsize) transp.state = {ReadPaused, WritePaused} transp.queue = initDeque[StreamVector]() transp.future = Future[void].Raising([]).init( @@ -616,7 +606,7 @@ when defined(windows): udata: cast[pointer](transp)) transp.wovl.data = CompletionData(cb: writeStreamLoop, udata: cast[pointer](transp)) - transp.buffer = newSeq[byte](bufsize) + transp.buffer = BipBuffer.init(bufsize) transp.flags = flags transp.state = {ReadPaused, WritePaused} transp.queue = initDeque[StreamVector]() @@ -1390,11 +1380,12 @@ else: else: if transp.kind == TransportKind.Socket: while true: - let res = handleEintr( - osdefs.recv(fd, addr transp.buffer[transp.offset], - len(transp.buffer) - transp.offset, cint(0))) + let + (data, size) = transp.buffer.reserve() + res = handleEintr(osdefs.recv(fd, data, size, cint(0))) if res < 0: let err = osLastError() + transp.buffer.commit(0) case err of oserrno.ECONNRESET: transp.state.incl({ReadEof, ReadPaused}) @@ -1408,13 +1399,14 @@ else: discard removeReader2(transp.fd) elif res == 0: transp.state.incl({ReadEof, ReadPaused}) + transp.buffer.commit(0) let rres = removeReader2(transp.fd) if rres.isErr(): transp.state.incl(ReadError) transp.setReadError(rres.error()) else: - transp.offset += res - if transp.offset == len(transp.buffer): + transp.buffer.commit(res) + if transp.buffer.availSpace() == 0: transp.state.incl(ReadPaused) let rres = removeReader2(transp.fd) if rres.isErr(): @@ -1424,23 +1416,25 @@ else: break elif transp.kind == TransportKind.Pipe: while true: - let res = handleEintr( - osdefs.read(cint(fd), addr transp.buffer[transp.offset], - len(transp.buffer) - transp.offset)) + let + (data, size) = transp.buffer.reserve() + res = handleEintr(osdefs.read(cint(fd), data, size)) if res < 0: let err = osLastError() + transp.buffer.commit(0) transp.state.incl(ReadPaused) transp.setReadError(err) discard removeReader2(transp.fd) elif res == 0: transp.state.incl({ReadEof, ReadPaused}) + transp.buffer.commit(0) let rres = removeReader2(transp.fd) if rres.isErr(): transp.state.incl(ReadError) transp.setReadError(rres.error()) else: - transp.offset += res - if transp.offset == len(transp.buffer): + transp.buffer.commit(res) + if transp.buffer.availSpace() == 0: transp.state.incl(ReadPaused) let rres = removeReader2(transp.fd) if rres.isErr(): @@ -1458,7 +1452,7 @@ else: transp = StreamTransport(kind: TransportKind.Socket) transp.fd = sock - transp.buffer = newSeq[byte](bufsize) + transp.buffer = BipBuffer.init(bufsize) transp.state = {ReadPaused, WritePaused} transp.queue = initDeque[StreamVector]() transp.future = Future[void].Raising([]).init( @@ -1475,7 +1469,7 @@ else: transp = StreamTransport(kind: TransportKind.Pipe) transp.fd = fd - transp.buffer = newSeq[byte](bufsize) + transp.buffer = BipBuffer.init(bufsize) transp.state = {ReadPaused, WritePaused} transp.queue = initDeque[StreamVector]() transp.future = Future[void].Raising([]).init( @@ -2339,7 +2333,7 @@ proc writeFile*(transp: StreamTransport, handle: int, proc atEof*(transp: StreamTransport): bool {.inline.} = ## Returns ``true`` if ``transp`` is at EOF. - (transp.offset == 0) and (ReadEof in transp.state) and + (len(transp.buffer) == 0) and (ReadEof in transp.state) and (ReadPaused in transp.state) template readLoop(name, body: untyped): untyped = @@ -2351,16 +2345,17 @@ template readLoop(name, body: untyped): untyped = if ReadClosed in transp.state: raise newException(TransportUseClosedError, "Attempt to read data from closed stream") - if transp.offset == 0: + if len(transp.buffer) == 0: # We going to raise an error, only if transport buffer is empty. if ReadError in transp.state: raise transp.getError() let (consumed, done) = body - transp.shiftBuffer(consumed) + transp.buffer.consume(consumed) if done: break - else: + + if len(transp.buffer) == 0: checkPending(transp) let fut = ReaderFuture.init(name) transp.reader = fut @@ -2403,17 +2398,23 @@ proc readExactly*(transp: StreamTransport, pbytes: pointer, if nbytes == 0: return - var index = 0 - var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) + var + index = 0 + pbuffer = pbytes.toUnchecked() readLoop("stream.transport.readExactly"): - if transp.offset == 0: + if len(transp.buffer) == 0: if transp.atEof(): raise newException(TransportIncompleteError, "Data incomplete!") - let count = min(nbytes - index, transp.offset) - if count > 0: - copyMem(addr pbuffer[index], addr(transp.buffer[0]), count) - index += count - (consumed: count, done: index == nbytes) + var readed = 0 + for (region, rsize) in transp.buffer.regions(): + let count = min(nbytes - index, rsize) + readed += count + if count > 0: + copyMem(addr pbuffer[index], region, count) + index += count + if index == nbytes: + break + (consumed: readed, done: index == nbytes) proc readOnce*(transp: StreamTransport, pbytes: pointer, nbytes: int): Future[int] {. @@ -2425,15 +2426,21 @@ proc readOnce*(transp: StreamTransport, pbytes: pointer, doAssert(not(isNil(pbytes)), "pbytes must not be nil") doAssert(nbytes > 0, "nbytes must be positive integer") - var count = 0 + var + pbuffer = pbytes.toUnchecked() + index = 0 readLoop("stream.transport.readOnce"): - if transp.offset == 0: + if len(transp.buffer) == 0: (0, transp.atEof()) else: - count = min(transp.offset, nbytes) - copyMem(pbytes, addr(transp.buffer[0]), count) - (count, true) - return count + for (region, rsize) in transp.buffer.regions(): + let size = min(rsize, nbytes - index) + copyMem(addr pbuffer[index], region, size) + index += size + if index >= nbytes: + break + (index, true) + index proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, sep: seq[byte]): Future[int] {. @@ -2457,7 +2464,7 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, if nbytes == 0: raise newException(TransportLimitError, "Limit reached!") - var pbuffer = cast[ptr UncheckedArray[byte]](pbytes) + var pbuffer = pbytes.toUnchecked() var state = 0 var k = 0 @@ -2466,14 +2473,11 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, raise newException(TransportIncompleteError, "Data incomplete!") var index = 0 - - while index < transp.offset: + for ch in transp.buffer: if k >= nbytes: raise newException(TransportLimitError, "Limit reached!") - let ch = transp.buffer[index] inc(index) - pbuffer[k] = ch inc(k) @@ -2485,8 +2489,7 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, state = 0 (index, state == len(sep)) - - return k + k proc readLine*(transp: StreamTransport, limit = 0, sep = "\r\n"): Future[string] {. @@ -2503,46 +2506,52 @@ proc readLine*(transp: StreamTransport, limit = 0, ## If ``limit`` more then 0, then read is limited to ``limit`` bytes. let lim = if limit <= 0: -1 else: limit var state = 0 + var res: string readLoop("stream.transport.readLine"): if transp.atEof(): (0, true) else: var index = 0 - while index < transp.offset: - let ch = char(transp.buffer[index]) - index += 1 + for ch in transp.buffer: + inc(index) - if sep[state] == ch: + if sep[state] == char(ch): inc(state) if state == len(sep): break else: if state != 0: if limit > 0: - let missing = min(state, lim - len(result) - 1) - result.add(sep[0 ..< missing]) + let missing = min(state, lim - len(res) - 1) + res.add(sep[0 ..< missing]) else: - result.add(sep[0 ..< state]) + res.add(sep[0 ..< state]) state = 0 - result.add(ch) - if len(result) == lim: + res.add(char(ch)) + if len(res) == lim: break - (index, (state == len(sep)) or (lim == len(result))) + (index, (state == len(sep)) or (lim == len(res))) + res proc read*(transp: StreamTransport): Future[seq[byte]] {. async: (raises: [TransportError, CancelledError]).} = ## Read all bytes from transport ``transp``. ## ## This procedure allocates buffer seq[byte] and return it as result. + var res: seq[byte] readLoop("stream.transport.read"): if transp.atEof(): (0, true) else: - result.add(transp.buffer.toOpenArray(0, transp.offset - 1)) - (transp.offset, false) + var readed = 0 + for (region, rsize) in transp.buffer.regions(): + readed += rsize + res.add(region.toUnchecked().toOpenArray(0, rsize - 1)) + (readed, false) + res proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {. async: (raises: [TransportError, CancelledError]).} = @@ -2550,27 +2559,35 @@ proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {. ## ## This procedure allocates buffer seq[byte] and return it as result. if n <= 0: - return await transp.read() + await transp.read() else: + var res: seq[byte] readLoop("stream.transport.read"): if transp.atEof(): (0, true) else: - let count = min(transp.offset, n - len(result)) - result.add(transp.buffer.toOpenArray(0, count - 1)) - (count, len(result) == n) + var readed = 0 + for (region, rsize) in transp.buffer.regions(): + let count = min(rsize, n - len(res)) + readed += count + res.add(region.toUnchecked().toOpenArray(0, count - 1)) + (readed, len(res) == n) + res proc consume*(transp: StreamTransport): Future[int] {. async: (raises: [TransportError, CancelledError]).} = ## Consume all bytes from transport ``transp`` and discard it. ## ## Return number of bytes actually consumed and discarded. + var res = 0 readLoop("stream.transport.consume"): if transp.atEof(): (0, true) else: - result += transp.offset - (transp.offset, false) + let used = len(transp.buffer) + res += used + (used, false) + res proc consume*(transp: StreamTransport, n: int): Future[int] {. async: (raises: [TransportError, CancelledError]).} = @@ -2579,15 +2596,19 @@ proc consume*(transp: StreamTransport, n: int): Future[int] {. ## ## Return number of bytes actually consumed and discarded. if n <= 0: - return await transp.consume() + await transp.consume() else: + var res = 0 readLoop("stream.transport.consume"): if transp.atEof(): (0, true) else: - let count = min(transp.offset, n - result) - result += count - (count, result == n) + let + used = len(transp.buffer) + count = min(used, n - res) + res += count + (count, res == n) + res proc readMessage*(transp: StreamTransport, predicate: ReadMessagePredicate) {. @@ -2605,14 +2626,18 @@ proc readMessage*(transp: StreamTransport, ## ``predicate`` callback will receive (zero-length) openArray, if transport ## is at EOF. readLoop("stream.transport.readMessage"): - if transp.offset == 0: + if len(transp.buffer) == 0: if transp.atEof(): predicate([]) else: # Case, when transport's buffer is not yet filled with data. (0, false) else: - predicate(transp.buffer.toOpenArray(0, transp.offset - 1)) + var res: tuple[consumed: int, done: bool] + for (region, rsize) in transp.buffer.regions(): + res = predicate(region.toUnchecked().toOpenArray(0, rsize - 1)) + break + res proc join*(transp: StreamTransport): Future[void] {. async: (raw: true, raises: [CancelledError]).} = @@ -2630,7 +2655,7 @@ proc join*(transp: StreamTransport): Future[void] {. retFuture.cancelCallback = cancel else: retFuture.complete() - return retFuture + retFuture proc closed*(transp: StreamTransport): bool {.inline.} = ## Returns ``true`` if transport in closed state.