497 lines
21 KiB
Nim
497 lines
21 KiB
Nim
#
|
|
# Chronos Asynchronous Bound Stream
|
|
# (c) Copyright 2021-Present
|
|
# Status Research & Development GmbH
|
|
#
|
|
# Licensed under either of
|
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
|
# MIT license (LICENSE-MIT)
|
|
|
|
## This module implements bounded stream reading and writing.
|
|
##
|
|
## For stream reading it means that you should read exactly bounded size of
|
|
## bytes or you should read all bytes until specific boundary.
|
|
##
|
|
## For stream writing it means that you should write exactly bounded size
|
|
## of bytes.
|
|
import stew/results
|
|
import ../asyncloop, ../timer
|
|
import asyncstream, ../transports/stream, ../transports/common
|
|
export asyncloop, asyncstream, stream, timer, common
|
|
|
|
type
|
|
BoundCmp* {.pure.} = enum
|
|
Equal, LessOrEqual
|
|
|
|
BoundedStreamReader* = ref object of AsyncStreamReader
|
|
boundSize: Opt[uint64]
|
|
boundary: seq[byte]
|
|
offset: uint64
|
|
cmpop: BoundCmp
|
|
|
|
BoundedStreamWriter* = ref object of AsyncStreamWriter
|
|
boundSize: uint64
|
|
offset: uint64
|
|
cmpop: BoundCmp
|
|
|
|
BoundedStreamError* = object of AsyncStreamError
|
|
BoundedStreamIncompleteError* = object of BoundedStreamError
|
|
BoundedStreamOverflowError* = object of BoundedStreamError
|
|
|
|
BoundedStreamRW* = BoundedStreamReader | BoundedStreamWriter
|
|
|
|
const
|
|
BoundedBufferSize* = 4096
|
|
BoundarySizeDefectMessage = "Boundary must not be empty array"
|
|
|
|
template newBoundedStreamIncompleteError(): ref BoundedStreamError =
|
|
newException(BoundedStreamIncompleteError,
|
|
"Stream boundary is not reached yet")
|
|
|
|
template newBoundedStreamOverflowError(): ref BoundedStreamOverflowError =
|
|
newException(BoundedStreamOverflowError, "Stream boundary exceeded")
|
|
|
|
proc readUntilBoundary(rstream: AsyncStreamReader, pbytes: pointer,
|
|
nbytes: int, sep: seq[byte]): Future[int] {.async.} =
|
|
doAssert(not(isNil(pbytes)), "pbytes must not be nil")
|
|
doAssert(nbytes >= 0, "nbytes must be non-negative value")
|
|
checkStreamClosed(rstream)
|
|
|
|
if nbytes == 0:
|
|
return 0
|
|
|
|
var k = 0
|
|
var state = 0
|
|
var pbuffer = cast[ptr UncheckedArray[byte]](pbytes)
|
|
|
|
proc predicate(data: openArray[byte]): tuple[consumed: int, done: bool] =
|
|
if len(data) == 0:
|
|
(0, true)
|
|
else:
|
|
var index = 0
|
|
while index < len(data):
|
|
if k >= nbytes:
|
|
return (index, true)
|
|
let ch = data[index]
|
|
inc(index)
|
|
pbuffer[k] = ch
|
|
inc(k)
|
|
if len(sep) > 0:
|
|
if sep[state] == ch:
|
|
inc(state)
|
|
if state == len(sep):
|
|
break
|
|
else:
|
|
state = 0
|
|
(index, (state == len(sep)) or (k == nbytes))
|
|
|
|
await rstream.readMessage(predicate)
|
|
return k
|
|
|
|
func endsWith(s, suffix: openArray[byte]): bool =
|
|
var i = 0
|
|
var j = len(s) - len(suffix)
|
|
while i + j >= 0 and i + j < len(s):
|
|
if s[i + j] != suffix[i]: return false
|
|
inc(i)
|
|
if i >= len(suffix): return true
|
|
|
|
proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
|
|
var rstream = BoundedStreamReader(stream)
|
|
rstream.state = AsyncStreamState.Running
|
|
var buffer = newSeq[byte](rstream.buffer.bufferLen())
|
|
while true:
|
|
let toRead =
|
|
if rstream.boundSize.isNone():
|
|
len(buffer)
|
|
else:
|
|
int(min(rstream.boundSize.get() - rstream.offset, uint64(len(buffer))))
|
|
try:
|
|
if toRead == 0:
|
|
# When ``rstream.boundSize`` is set and we already readed
|
|
# ``rstream.boundSize`` bytes.
|
|
if rstream.state == AsyncStreamState.Running:
|
|
rstream.state = AsyncStreamState.Finished
|
|
else:
|
|
let res = await readUntilBoundary(rstream.rsource, addr buffer[0],
|
|
toRead, rstream.boundary)
|
|
if res > 0:
|
|
if len(rstream.boundary) > 0:
|
|
if endsWith(buffer.toOpenArray(0, res - 1), rstream.boundary):
|
|
let length = res - len(rstream.boundary)
|
|
rstream.offset = rstream.offset + uint64(length)
|
|
# There should be one step between transferring last bytes to the
|
|
# consumer and declaring stream EOF. Otherwise could not be
|
|
# consumed.
|
|
await upload(addr rstream.buffer, addr buffer[0], length)
|
|
if rstream.state == AsyncStreamState.Running:
|
|
rstream.state = AsyncStreamState.Finished
|
|
else:
|
|
rstream.offset = rstream.offset + uint64(res)
|
|
# There should be one step between transferring last bytes to the
|
|
# consumer and declaring stream EOF. Otherwise could not be
|
|
# consumed.
|
|
await upload(addr rstream.buffer, addr buffer[0], res)
|
|
|
|
if (res < toRead) and rstream.rsource.atEof():
|
|
case rstream.cmpop
|
|
of BoundCmp.Equal:
|
|
if rstream.state == AsyncStreamState.Running:
|
|
rstream.state = AsyncStreamState.Error
|
|
rstream.error = newBoundedStreamIncompleteError()
|
|
of BoundCmp.LessOrEqual:
|
|
if rstream.state == AsyncStreamState.Running:
|
|
rstream.state = AsyncStreamState.Finished
|
|
else:
|
|
rstream.offset = rstream.offset + uint64(res)
|
|
# There should be one step between transferring last bytes to the
|
|
# consumer and declaring stream EOF. Otherwise could not be
|
|
# consumed.
|
|
await upload(addr rstream.buffer, addr buffer[0], res)
|
|
|
|
if (res < toRead) and rstream.rsource.atEof():
|
|
case rstream.cmpop
|
|
of BoundCmp.Equal:
|
|
if rstream.state == AsyncStreamState.Running:
|
|
rstream.state = AsyncStreamState.Error
|
|
rstream.error = newBoundedStreamIncompleteError()
|
|
of BoundCmp.LessOrEqual:
|
|
if rstream.state == AsyncStreamState.Running:
|
|
rstream.state = AsyncStreamState.Finished
|
|
else:
|
|
case rstream.cmpop
|
|
of BoundCmp.Equal:
|
|
if rstream.state == AsyncStreamState.Running:
|
|
rstream.state = AsyncStreamState.Error
|
|
rstream.error = newBoundedStreamIncompleteError()
|
|
of BoundCmp.LessOrEqual:
|
|
if rstream.state == AsyncStreamState.Running:
|
|
rstream.state = AsyncStreamState.Finished
|
|
|
|
except AsyncStreamError as exc:
|
|
if rstream.state == AsyncStreamState.Running:
|
|
rstream.state = AsyncStreamState.Error
|
|
rstream.error = exc
|
|
except CancelledError:
|
|
if rstream.state == AsyncStreamState.Running:
|
|
rstream.state = AsyncStreamState.Error
|
|
rstream.error = newAsyncStreamUseClosedError()
|
|
|
|
case rstream.state
|
|
of AsyncStreamState.Running:
|
|
discard
|
|
of AsyncStreamState.Error, AsyncStreamState.Stopped:
|
|
# Send `Error` or `Stopped` state to the consumer without waiting.
|
|
rstream.buffer.forget()
|
|
break
|
|
of AsyncStreamState.Finished:
|
|
# Send `EOF` state to the consumer and wait until it will be received.
|
|
await rstream.buffer.transfer()
|
|
break
|
|
of AsyncStreamState.Closing, AsyncStreamState.Closed:
|
|
break
|
|
|
|
proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =
|
|
var error: ref AsyncStreamError
|
|
var wstream = BoundedStreamWriter(stream)
|
|
|
|
wstream.state = AsyncStreamState.Running
|
|
while true:
|
|
var item: WriteItem
|
|
try:
|
|
item = await wstream.queue.get()
|
|
if item.size > 0:
|
|
if uint64(item.size) <= (wstream.boundSize - wstream.offset):
|
|
# Writing chunk data.
|
|
case item.kind
|
|
of WriteType.Pointer:
|
|
await wstream.wsource.write(item.dataPtr, item.size)
|
|
of WriteType.Sequence:
|
|
await wstream.wsource.write(addr item.dataSeq[0], item.size)
|
|
of WriteType.String:
|
|
await wstream.wsource.write(addr item.dataStr[0], item.size)
|
|
wstream.offset = wstream.offset + uint64(item.size)
|
|
item.future.complete()
|
|
else:
|
|
if wstream.state == AsyncStreamState.Running:
|
|
wstream.state = AsyncStreamState.Error
|
|
error = newBoundedStreamOverflowError()
|
|
else:
|
|
if wstream.offset == wstream.boundSize:
|
|
if wstream.state == AsyncStreamState.Running:
|
|
wstream.state = AsyncStreamState.Finished
|
|
item.future.complete()
|
|
else:
|
|
case wstream.cmpop
|
|
of BoundCmp.Equal:
|
|
if wstream.state == AsyncStreamState.Running:
|
|
wstream.state = AsyncStreamState.Error
|
|
error = newBoundedStreamIncompleteError()
|
|
of BoundCmp.LessOrEqual:
|
|
if wstream.state == AsyncStreamState.Running:
|
|
wstream.state = AsyncStreamState.Finished
|
|
item.future.complete()
|
|
except CancelledError:
|
|
if wstream.state == AsyncStreamState.Running:
|
|
wstream.state = AsyncStreamState.Stopped
|
|
error = newAsyncStreamUseClosedError()
|
|
except AsyncStreamError as exc:
|
|
if wstream.state == AsyncStreamState.Running:
|
|
wstream.state = AsyncStreamState.Error
|
|
error = exc
|
|
|
|
case wstream.state
|
|
of AsyncStreamState.Running:
|
|
discard
|
|
of AsyncStreamState.Error, AsyncStreamState.Stopped:
|
|
if not(isNil(item.future)):
|
|
if not(item.future.finished()):
|
|
item.future.fail(error)
|
|
break
|
|
of AsyncStreamState.Finished, AsyncStreamState.Closing,
|
|
AsyncStreamState.Closed:
|
|
error = newAsyncStreamUseClosedError()
|
|
break
|
|
|
|
doAssert(not(isNil(error)))
|
|
while not(wstream.queue.empty()):
|
|
let item = wstream.queue.popFirstNoWait()
|
|
if not(item.future.finished()):
|
|
item.future.fail(error)
|
|
|
|
proc bytesLeft*(stream: BoundedStreamRW): uint64 =
|
|
## Returns number of bytes left in stream.
|
|
if stream.boundSize.isSome():
|
|
stream.boundSize.get() - stream.bytesCount
|
|
else:
|
|
0'u64
|
|
|
|
proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader,
|
|
boundSize: uint64, comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize, udata: ref T) =
|
|
child.boundSize = some(boundSize)
|
|
child.cmpop = comparison
|
|
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize,
|
|
udata)
|
|
|
|
proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader,
|
|
boundary: openArray[byte], comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize, udata: ref T) =
|
|
doAssert(len(boundary) > 0, BoundarySizeDefectMessage)
|
|
child.boundary = @boundary
|
|
child.cmpop = comparison
|
|
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize,
|
|
udata)
|
|
|
|
proc init*[T](child: BoundedStreamReader, rsource: AsyncStreamReader,
|
|
boundSize: uint64, boundary: openArray[byte],
|
|
comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize, udata: ref T) =
|
|
doAssert(len(boundary) > 0, BoundarySizeDefectMessage)
|
|
child.boundSize = Opt.some(boundSize)
|
|
child.boundary = @boundary
|
|
child.cmpop = comparison
|
|
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize,
|
|
udata)
|
|
|
|
proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader,
|
|
boundSize: uint64, comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize) =
|
|
child.boundSize = Opt.some(boundSize)
|
|
child.cmpop = comparison
|
|
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize)
|
|
|
|
proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader,
|
|
boundary: openArray[byte], comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize) =
|
|
doAssert(len(boundary) > 0, BoundarySizeDefectMessage)
|
|
child.boundary = @boundary
|
|
child.cmpop = comparison
|
|
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize)
|
|
|
|
proc init*(child: BoundedStreamReader, rsource: AsyncStreamReader,
|
|
boundSize: uint64, boundary: openArray[byte],
|
|
comparison = BoundCmp.Equal, bufferSize = BoundedBufferSize) =
|
|
doAssert(len(boundary) > 0, BoundarySizeDefectMessage)
|
|
child.boundSize = Opt.some(boundSize)
|
|
child.boundary = @boundary
|
|
child.cmpop = comparison
|
|
init(AsyncStreamReader(child), rsource, boundedReadLoop, bufferSize)
|
|
|
|
proc newBoundedStreamReader*[T](rsource: AsyncStreamReader,
|
|
boundSize: uint64,
|
|
comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize,
|
|
udata: ref T): BoundedStreamReader =
|
|
## Create new stream reader which will be limited by size ``boundSize``. When
|
|
## number of bytes readed by consumer reaches ``boundSize``,
|
|
## BoundedStreamReader will enter EOF state (no more bytes will be returned
|
|
## to the consumer).
|
|
##
|
|
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
|
|
## from source stream reader ``rsource`` is less than ``boundSize`` -
|
|
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
|
|
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
|
|
## ``BoundedStreamIncompleteError`` exception.
|
|
var res = BoundedStreamReader()
|
|
res.init(rsource, boundSize, comparison, bufferSize, udata)
|
|
res
|
|
|
|
proc newBoundedStreamReader*[T](rsource: AsyncStreamReader,
|
|
boundary: openArray[byte],
|
|
comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize,
|
|
udata: ref T): BoundedStreamReader =
|
|
## Create new stream reader which will be limited by binary boundary
|
|
## ``boundary``. As soon as reader reaches ``boundary`` BoundedStreamReader
|
|
## will enter EOF state (no more bytes will be returned to the consumer).
|
|
##
|
|
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
|
|
## from source stream reader ``rsource`` is less than ``boundSize`` -
|
|
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
|
|
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
|
|
## ``BoundedStreamIncompleteError`` exception.
|
|
var res = BoundedStreamReader()
|
|
res.init(rsource, boundary, comparison, bufferSize, udata)
|
|
res
|
|
|
|
proc newBoundedStreamReader*[T](rsource: AsyncStreamReader,
|
|
boundSize: uint64,
|
|
boundary: openArray[byte],
|
|
comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize,
|
|
udata: ref T): BoundedStreamReader =
|
|
## Create new stream reader which will be limited by size ``boundSize`` or
|
|
## boundary ``boundary``. As soon as reader reaches ``boundary`` ``OR`` number
|
|
## of bytes readed from source stream reader ``rsource`` reaches ``boundSize``
|
|
## BoundStreamReader will enter EOF state (no more bytes will be returned to
|
|
## the consumer).
|
|
##
|
|
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
|
|
## from source stream reader ``rsource`` is less than ``boundSize`` -
|
|
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
|
|
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
|
|
## ``BoundedStreamIncompleteError`` exception.
|
|
var res = BoundedStreamReader()
|
|
res.init(rsource, boundSize, boundary, comparison, bufferSize, udata)
|
|
res
|
|
|
|
proc newBoundedStreamReader*(rsource: AsyncStreamReader,
|
|
boundSize: uint64,
|
|
comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize,
|
|
): BoundedStreamReader =
|
|
## Create new stream reader which will be limited by size ``boundSize``. When
|
|
## number of bytes readed by consumer reaches ``boundSize``,
|
|
## BoundedStreamReader will enter EOF state (no more bytes will be returned
|
|
## to the consumer).
|
|
##
|
|
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
|
|
## from source stream reader ``rsource`` is less than ``boundSize`` -
|
|
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
|
|
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
|
|
## ``BoundedStreamIncompleteError`` exception.
|
|
var res = BoundedStreamReader()
|
|
res.init(rsource, boundSize, comparison, bufferSize)
|
|
res
|
|
|
|
proc newBoundedStreamReader*(rsource: AsyncStreamReader,
|
|
boundary: openArray[byte],
|
|
comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize,
|
|
): BoundedStreamReader =
|
|
## Create new stream reader which will be limited by binary boundary
|
|
## ``boundary``. As soon as reader reaches ``boundary`` BoundedStreamReader
|
|
## will enter EOF state (no more bytes will be returned to the consumer).
|
|
##
|
|
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
|
|
## from source stream reader ``rsource`` is less than ``boundSize`` -
|
|
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
|
|
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
|
|
## ``BoundedStreamIncompleteError`` exception.
|
|
var res = BoundedStreamReader()
|
|
res.init(rsource, boundary, comparison, bufferSize)
|
|
res
|
|
|
|
proc newBoundedStreamReader*(rsource: AsyncStreamReader,
|
|
boundSize: uint64,
|
|
boundary: openArray[byte],
|
|
comparison = BoundCmp.Equal,
|
|
bufferSize = BoundedBufferSize,
|
|
): BoundedStreamReader =
|
|
## Create new stream reader which will be limited by size ``boundSize`` or
|
|
## boundary ``boundary``. As soon as reader reaches ``boundary`` ``OR`` number
|
|
## of bytes readed from source stream reader ``rsource`` reaches ``boundSize``
|
|
## BoundStreamReader will enter EOF state (no more bytes will be returned to
|
|
## the consumer).
|
|
##
|
|
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes readed
|
|
## from source stream reader ``rsource`` is less than ``boundSize`` -
|
|
## ``BoundedStreamIncompleteError`` will be raised. But comparison operator
|
|
## ``BoundCmp.LessOrEqual`` allows to consume less bytes without
|
|
## ``BoundedStreamIncompleteError`` exception.
|
|
var res = BoundedStreamReader()
|
|
res.init(rsource, boundSize, boundary, comparison, bufferSize)
|
|
res
|
|
|
|
proc init*[T](child: BoundedStreamWriter, wsource: AsyncStreamWriter,
|
|
boundSize: uint64, comparison = BoundCmp.Equal,
|
|
queueSize = AsyncStreamDefaultQueueSize, udata: ref T) =
|
|
child.boundSize = boundSize
|
|
child.cmpop = comparison
|
|
init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize,
|
|
udata)
|
|
|
|
proc init*(child: BoundedStreamWriter, wsource: AsyncStreamWriter,
|
|
boundSize: uint64, comparison = BoundCmp.Equal,
|
|
queueSize = AsyncStreamDefaultQueueSize) =
|
|
child.boundSize = boundSize
|
|
child.cmpop = comparison
|
|
init(AsyncStreamWriter(child), wsource, boundedWriteLoop, queueSize)
|
|
|
|
proc newBoundedStreamWriter*[T](wsource: AsyncStreamWriter,
|
|
boundSize: uint64,
|
|
comparison = BoundCmp.Equal,
|
|
queueSize = AsyncStreamDefaultQueueSize,
|
|
udata: ref T): BoundedStreamWriter =
|
|
## Create new stream writer which will be limited by size ``boundSize``. As
|
|
## soon as number of bytes written to the destination stream ``wsource``
|
|
## reaches ``boundSize`` stream will enter EOF state (no more bytes will be
|
|
## sent to remote destination stream ``wsource``).
|
|
##
|
|
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes
|
|
## written to destination stream ``wsource`` is less than ``boundSize`` -
|
|
## ``BoundedStreamIncompleteError`` will be raised on stream finishing. But
|
|
## comparison operator ``BoundCmp.LessOrEqual`` allows to send less bytes
|
|
## without ``BoundedStreamIncompleteError`` exception.
|
|
##
|
|
## For both comparison operators any attempt to write more bytes than
|
|
## ``boundSize`` will be interrupted with ``BoundedStreamOverflowError``
|
|
## exception.
|
|
var res = BoundedStreamWriter()
|
|
res.init(wsource, boundSize, comparison, queueSize, udata)
|
|
res
|
|
|
|
proc newBoundedStreamWriter*(wsource: AsyncStreamWriter,
|
|
boundSize: uint64,
|
|
comparison = BoundCmp.Equal,
|
|
queueSize = AsyncStreamDefaultQueueSize,
|
|
): BoundedStreamWriter =
|
|
## Create new stream writer which will be limited by size ``boundSize``. As
|
|
## soon as number of bytes written to the destination stream ``wsource``
|
|
## reaches ``boundSize`` stream will enter EOF state (no more bytes will be
|
|
## sent to remote destination stream ``wsource``).
|
|
##
|
|
## If ``comparison`` operator is ``BoundCmp.Equal`` and number of bytes
|
|
## written to destination stream ``wsource`` is less than ``boundSize`` -
|
|
## ``BoundedStreamIncompleteError`` will be raised on stream finishing. But
|
|
## comparison operator ``BoundCmp.LessOrEqual`` allows to send less bytes
|
|
## without ``BoundedStreamIncompleteError`` exception.
|
|
##
|
|
## For both comparison operators any attempt to write more bytes than
|
|
## ``boundSize`` will be interrupted with ``BoundedStreamOverflowError``
|
|
## exception.
|
|
var res = BoundedStreamWriter()
|
|
res.init(wsource, boundSize, comparison, queueSize)
|
|
res
|