Add AsyncStreams.

Add Chunked-Encoding AsyncStream reader/writer.
Add tests.
This commit is contained in:
cheatfate 2019-05-07 23:11:40 +03:00
parent 04b60a0e40
commit 1763c9dcff
No known key found for this signature in database
GPG Key ID: 46ADD633A7201F95
9 changed files with 1802 additions and 16 deletions

View File

@ -1,5 +1,5 @@
packageName = "chronos"
version = "2.2.5"
version = "2.2.6"
author = "Status Research & Development GmbH"
description = "Chronos"
license = "Apache License 2.0 or MIT"

View File

@ -770,7 +770,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
if not retFuture.finished:
if isNil(udata):
fut.removeCallback(continuation)
retFuture.fail(newException(AsyncTimeoutError, ""))
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
else:
if not retFuture.finished:
if fut.failed:
@ -792,7 +792,7 @@ proc wait*[T](fut: Future[T], timeout = InfiniteDuration): Future[T] =
else:
retFuture.complete(fut.read())
else:
retFuture.fail(newException(AsyncTimeoutError, ""))
retFuture.fail(newException(AsyncTimeoutError, "Timeout exceeded!"))
else:
addTimer(Moment.fromNow(timeout), continuation, nil)
fut.addCallback(continuation)

View File

@ -48,8 +48,18 @@ proc setSockOpt*(socket: AsyncFD, level, optname, optval: int): bool =
sizeof(value).SockLen) < 0'i32:
result = false
proc setSockOpt*(socket: AsyncFD, level, optname: int, value: pointer,
valuelen: int): bool =
## `setsockopt()` for custom options (pointer and length).
## Returns ``true`` on success, ``false`` on error.
result = true
if setsockopt(SocketHandle(socket), cint(level), cint(optname), value,
Socklen(valuelen)) < 0'i32:
result = false
proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool =
## `getsockopt()` for integer options.
## Returns ``true`` on success, ``false`` on error.
var res: cint
var size = sizeof(res).SockLen
result = true
@ -58,6 +68,16 @@ proc getSockOpt*(socket: AsyncFD, level, optname: int, value: var int): bool =
return false
value = int(res)
proc getSockOpt*(socket: AsyncFD, level, optname: int, value: pointer,
valuelen: var int): bool =
## `getsockopt()` for custom options (pointer and length).
## Returns ``true`` on success, ``false`` on error.
var res: cint
result = true
if getsockopt(SocketHandle(socket), cint(level), cint(optname),
value, cast[ptr Socklen](addr valuelen)) < 0'i32:
result = false
proc getSocketError*(socket: AsyncFD, err: var int): bool =
## Recover error code associated with socket handle ``socket``.
if not getSockOpt(socket, cint(SOL_SOCKET), cint(SO_ERROR), err):

View File

@ -0,0 +1,906 @@
#
# Chronos Asynchronous Streams
# (c) Copyright 2019-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import ../asyncloop, ../asyncsync
import ../transports/common, ../transports/stream
export asyncsync, stream, common
const
AsyncStreamDefaultBufferSize* = 4096
## Default reading stream internal buffer size.
AsyncStreamDefaultQueueSize* = 0
## Default writing stream internal queue size.
AsyncStreamReaderTrackerName* = "async.stream.reader"
## AsyncStreamReader leaks tracker name
AsyncStreamWriterTrackerName* = "async.stream.writer"
## AsyncStreamWriter leaks tracker name
type
AsyncBuffer* = object
offset*: int
buffer*: seq[byte]
events*: array[2, AsyncEvent]
WriteType* = enum
Pointer, Sequence, String
WriteItem* = object
case kind*: WriteType
of Pointer:
data1*: pointer
of Sequence:
data2*: seq[byte]
of String:
data3*: string
size*: int
future*: Future[void]
AsyncStreamState* = enum
Running, ## Stream is online and working
Error, ## Stream has stored error
Stopped, ## Stream was closed while working
Finished, ## Stream was properly finished
Closed ## Stream was closed
StreamReaderLoop* = proc (stream: AsyncStreamReader): Future[void] {.gcsafe.}
## Main read loop for read streams.
StreamWriterLoop* = proc (stream: AsyncStreamWriter): Future[void] {.gcsafe.}
## Main write loop for write streams.
AsyncStreamReader* = ref object of RootRef
rsource*: AsyncStreamReader
tsource*: StreamTransport
readerLoop*: StreamReaderLoop
state*: AsyncStreamState
exevent*: AsyncEvent
buffer*: AsyncBuffer
udata: pointer
error*: ref Exception
future: Future[void]
AsyncStreamWriter* = ref object of RootRef
wsource*: AsyncStreamWriter
tsource*: StreamTransport
writerLoop*: StreamWriterLoop
state*: AsyncStreamState
exevent*: AsyncEvent
queue*: AsyncQueue[WriteItem]
udata: pointer
future: Future[void]
AsyncStream* = ref object of RootRef
reader*: AsyncStreamReader
writer*: AsyncStreamWriter
AsyncStreamTracker* = ref object of TrackerBase
opened*: int64
closed*: int64
AsyncStreamRW* = AsyncStreamReader | AsyncStreamWriter
AsyncStreamError* = object of Exception
AsyncStreamIncompleteError* = object of AsyncStreamError
AsyncStreamIncorrectError* = object of AsyncStreamError
AsyncStreamLimitError* = object of AsyncStreamError
AsyncStreamReadError* = object of AsyncStreamError
par*: ref Exception
AsyncStreamWriteError* = object of AsyncStreamError
par*: ref Exception
AsyncStreamTimeoutError* = object of AsyncStreamError
proc init*(t: typedesc[AsyncBuffer], size: int): AsyncBuffer =
result.buffer = newSeq[byte](size)
result.events[0] = newAsyncEvent()
result.events[1] = newAsyncEvent()
result.offset = 0
proc getBuffer*(sb: AsyncBuffer): pointer {.inline.} =
result = unsafeAddr sb.buffer[sb.offset]
proc bufferLen*(sb: AsyncBuffer): int {.inline.} =
result = len(sb.buffer) - sb.offset
proc getData*(sb: AsyncBuffer): pointer {.inline.} =
result = unsafeAddr sb.buffer[0]
proc dataLen*(sb: AsyncBuffer): int {.inline.} =
result = sb.offset
proc `[]`*(sb: AsyncBuffer, index: int): byte {.inline.} =
doAssert(index < sb.offset)
result = sb.buffer[index]
proc update*(sb: var AsyncBuffer, size: int) {.inline.} =
doAssert(sb.offset + size < len(sb.buffer))
sb.offset += size
proc wait*(sb: var AsyncBuffer): Future[void] =
sb.events[0].clear()
sb.events[1].fire()
result = sb.events[0].wait()
proc transfer*(sb: var AsyncBuffer): Future[void] =
sb.events[1].clear()
sb.events[0].fire()
result = sb.events[1].wait()
proc forget*(sb: var AsyncBuffer) {.inline.} =
sb.events[1].clear()
sb.events[0].fire()
proc shift*(sb: var AsyncBuffer, size: int) {.inline.} =
if sb.offset > size:
moveMem(addr sb.buffer[0], addr sb.buffer[size], sb.offset - size)
sb.offset = sb.offset - size
else:
sb.offset = 0
proc copyData*(sb: AsyncBuffer, dest: pointer, offset, length: int) {.inline.} =
doAssert(length <= sb.dataLen())
copyMem(cast[pointer](cast[uint](dest) + cast[uint](offset)),
unsafeAddr sb.buffer[0], length)
template toDataOpenArray*(sb: AsyncBuffer): auto =
toOpenArray(sb.buffer, 0, sb.offset - 1)
template toBufferOpenArray*(sb: AsyncBuffer): auto =
toOpenArray(sb.buffer, sb.offset, len(sb.buffer) - 1)
proc newAsyncStreamReadError(p: ref Exception): ref Exception {.inline.} =
var w = newException(AsyncStreamReadError, "Read stream failed")
w.par = p
result = w
proc newAsyncStreamWriteError(p: ref Exception): ref Exception {.inline.} =
var w = newException(AsyncStreamWriteError, "Write stream failed")
w.par = p
result = w
proc newAsyncStreamIncompleteError(): ref Exception {.inline.} =
result = newException(AsyncStreamIncompleteError, "Incomplete data received")
proc newAsyncStreamLimitError(): ref Exception {.inline.} =
result = newException(AsyncStreamLimitError, "Buffer limit reached")
proc newAsyncStreamIncorrectError(m: string): ref Exception {.inline.} =
result = newException(AsyncStreamIncorrectError, m)
proc atEof*(rstream: AsyncStreamReader): bool =
## Returns ``true`` is reading stream is closed or finished and internal
## buffer do not have any bytes left.
result = rstream.state in {AsyncStreamState.Stopped, Finished, Closed} and
(rstream.buffer.dataLen() == 0)
proc atEof*(wstream: AsyncStreamWriter): bool =
## Returns ``true`` is writing stream ``wstream`` closed or finished.
result = wstream.state in {AsyncStreamState.Stopped, Finished, Closed}
proc closed*(rw: AsyncStreamRW): bool {.inline.} =
## Returns ``true`` is reading/writing stream is closed.
(rw.state == AsyncStreamState.Closed)
proc finished*(rw: AsyncStreamRW): bool {.inline.} =
## Returns ``true`` is reading/writing stream is finished (completed).
(rw.state == AsyncStreamState.Finished)
proc stopped*(rw: AsyncStreamRW): bool {.inline.} =
## Returns ``true`` is reading/writing stream is stopped (interrupted).
(rw.state == AsyncStreamState.Stopped)
proc running*(rw: AsyncStreamRW): bool {.inline.} =
## Returns ``true`` is reading/writing stream is still pending.
(rw.state == AsyncStreamState.Running)
proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {.gcsafe.}
proc setupAsyncStreamWriterTracker(): AsyncStreamTracker {.gcsafe.}
proc getAsyncStreamReaderTracker(): AsyncStreamTracker {.inline.} =
result = cast[AsyncStreamTracker](getTracker(AsyncStreamReaderTrackerName))
if isNil(result):
result = setupAsyncStreamReaderTracker()
proc getAsyncStreamWriterTracker(): AsyncStreamTracker {.inline.} =
result = cast[AsyncStreamTracker](getTracker(AsyncStreamWriterTrackerName))
if isNil(result):
result = setupAsyncStreamWriterTracker()
proc dumpAsyncStreamReaderTracking(): string {.gcsafe.} =
var tracker = getAsyncStreamReaderTracker()
result = "Opened async stream readers: " & $tracker.opened & "\n" &
"Closed async stream readers: " & $tracker.closed
proc dumpAsyncStreamWriterTracking(): string {.gcsafe.} =
var tracker = getAsyncStreamWriterTracker()
result = "Opened async stream writers: " & $tracker.opened & "\n" &
"Closed async stream writers: " & $tracker.closed
proc leakAsyncStreamReader(): bool {.gcsafe.} =
var tracker = getAsyncStreamReaderTracker()
result = tracker.opened != tracker.closed
proc leakAsyncStreamWriter(): bool {.gcsafe.} =
var tracker = getAsyncStreamWriterTracker()
result = tracker.opened != tracker.closed
proc trackAsyncStreamReader(t: AsyncStreamReader) {.inline.} =
var tracker = getAsyncStreamReaderTracker()
inc(tracker.opened)
proc untrackAsyncStreamReader*(t: AsyncStreamReader) {.inline.} =
var tracker = getAsyncStreamReaderTracker()
inc(tracker.closed)
proc trackAsyncStreamWriter(t: AsyncStreamWriter) {.inline.} =
var tracker = getAsyncStreamWriterTracker()
inc(tracker.opened)
proc untrackAsyncStreamWriter*(t: AsyncStreamWriter) {.inline.} =
var tracker = getAsyncStreamWriterTracker()
inc(tracker.closed)
proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {.gcsafe.} =
result = new AsyncStreamTracker
result.opened = 0
result.closed = 0
result.dump = dumpAsyncStreamReaderTracking
result.isLeaked = leakAsyncStreamReader
addTracker(AsyncStreamReaderTrackerName, result)
proc setupAsyncStreamWriterTracker(): AsyncStreamTracker {.gcsafe.} =
result = new AsyncStreamTracker
result.opened = 0
result.closed = 0
result.dump = dumpAsyncStreamWriterTracking
result.isLeaked = leakAsyncStreamWriter
addTracker(AsyncStreamWriterTrackerName, result)
proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int) {.async.} =
## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store
## it to ``pbytes``.
##
## If EOF is received and ``nbytes`` is not yet readed, the procedure
## will raise ``AsyncStreamIncompleteError``.
if not rstream.running():
raise newAsyncStreamIncorrectError("Incorrect stream state")
if isNil(rstream.rsource):
try:
await readExactly(rstream.tsource, pbytes, nbytes)
except TransportIncompleteError:
raise newAsyncStreamIncompleteError()
except:
raise newAsyncStreamReadError(getCurrentException())
else:
var index = 0
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
raise newAsyncStreamIncompleteError()
if datalen >= (nbytes - index):
rstream.buffer.copyData(pbytes, index, nbytes - index)
rstream.buffer.shift(nbytes - index)
break
else:
rstream.buffer.copyData(pbytes, index, datalen)
index += datalen
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int): Future[int] {.async.} =
## Perform one read operation on read-only stream ``rstream``.
##
## If internal buffer is not empty, ``nbytes`` bytes will be transferred from
## internal buffer, otherwise it will wait until some bytes will be received.
if not rstream.running():
raise newAsyncStreamIncorrectError("Incorrect stream state")
if isNil(rstream.rsource):
try:
result = await readOnce(rstream.tsource, pbytes, nbytes)
except:
raise newAsyncStreamReadError(getCurrentException())
else:
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0:
if rstream.atEof():
result = 0
break
await rstream.buffer.wait()
else:
let size = min(datalen, nbytes)
rstream.buffer.copyData(pbytes, 0, size)
rstream.buffer.shift(size)
result = size
break
proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
sep: seq[byte]): Future[int] {.async.} =
## Read data from the read-only stream ``rstream`` until separator ``sep`` is
## found.
##
## On success, the data and separator will be removed from the internal
## buffer (consumed). Returned data will include the separator at the end.
##
## If EOF is received, and `sep` was not found, procedure will raise
## ``AsyncStreamIncompleteError``.
##
## If ``nbytes`` bytes has been received and `sep` was not found, procedure
## will raise ``AsyncStreamLimitError``.
##
## Procedure returns actual number of bytes read.
if not rstream.running():
raise newAsyncStreamIncorrectError("Incorrect stream state")
if isNil(rstream.rsource):
try:
result = await readUntil(rstream.tsource, pbytes, nbytes, sep)
except TransportIncompleteError:
raise newAsyncStreamIncompleteError()
except TransportLimitError:
raise newAsyncStreamLimitError()
except:
raise newAsyncStreamReadError(getCurrentException())
else:
var
dest = cast[ptr UncheckedArray[byte]](pbytes)
state = 0
k = 0
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
raise newAsyncStreamIncompleteError()
var index = 0
while index < datalen:
let ch = rstream.buffer[index]
if sep[state] == ch:
inc(state)
else:
state = 0
if k < nbytes:
dest[k] = ch
inc(k)
else:
raise newAsyncStreamLimitError()
if state == len(sep):
break
inc(index)
if state == len(sep):
rstream.buffer.shift(index + 1)
result = k
break
else:
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
proc readLine*(rstream: AsyncStreamReader, limit = 0,
sep = "\r\n"): Future[string] {.async.} =
## Read one line from read-only stream ``rstream``, where ``"line"`` is a
## sequence of bytes ending with ``sep`` (default is ``"\r\n"``).
##
## If EOF is received, and ``sep`` was not found, the method will return the
## partial read bytes.
##
## If the EOF was received and the internal buffer is empty, return an
## empty string.
##
## If ``limit`` more then 0, then result string will be limited to ``limit``
## bytes.
if not rstream.running():
raise newAsyncStreamIncorrectError("Incorrect stream state")
if isNil(rstream.rsource):
try:
result = await readLine(rstream.tsource, limit, sep)
except:
raise newAsyncStreamReadError(getCurrentException())
else:
var res = ""
var
lim = if limit <= 0: -1 else: limit
state = 0
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
result = res
break
var index = 0
while index < datalen:
let ch = char(rstream.buffer[index])
if sep[state] == ch:
inc(state)
if state == len(sep) or len(res) == lim:
rstream.buffer.shift(index + 1)
break
else:
state = 0
res.add(ch)
if len(res) == lim:
rstream.buffer.shift(index + 1)
break
inc(index)
if state == len(sep) or (lim == len(res)):
result = res
break
else:
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
proc read*(rstream: AsyncStreamReader, n = 0): Future[seq[byte]] {.async.} =
## Read all bytes (n <= 0) or exactly `n` bytes from read-only stream
## ``rstream``.
##
## This procedure allocates buffer seq[byte] and return it as result.
if not rstream.running():
raise newAsyncStreamIncorrectError("Incorrect stream state")
if isNil(rstream.rsource):
try:
result = await read(rstream.tsource, n)
except:
raise newAsyncStreamReadError(getCurrentException())
else:
var res = newSeq[byte]()
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0 and rstream.atEof():
result = res
break
if datalen > 0:
let s = len(res)
let o = s + datalen
if n <= 0:
res.setLen(o)
rstream.buffer.copyData(addr res[s], 0, datalen)
rstream.buffer.shift(datalen)
else:
let left = n - s
if datalen >= left:
res.setLen(n)
rstream.buffer.copyData(addr res[s], 0, left)
rstream.buffer.shift(left)
result = res
break
else:
res.setLen(o)
rstream.buffer.copyData(addr res[s], 0, datalen)
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
proc consume*(rstream: AsyncStreamReader, n = -1): Future[int] {.async.} =
## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream
## ``rstream``.
##
## Return number of bytes actually consumed (discarded).
if not rstream.running():
raise newAsyncStreamIncorrectError("Incorrect stream state")
if isNil(rstream.rsource):
try:
result = await consume(rstream.tsource, n)
except TransportLimitError:
raise newAsyncStreamLimitError()
except:
raise newAsyncStreamReadError(getCurrentException())
else:
var res = 0
while true:
let datalen = rstream.buffer.dataLen()
if rstream.state == Error:
raise newAsyncStreamReadError(rstream.error)
if datalen == 0:
if rstream.atEof():
if n <= 0:
result = res
break
else:
raise newAsyncStreamLimitError()
else:
if n <= 0:
res += datalen
rstream.buffer.shift(datalen)
else:
let left = n - res
if datalen >= left:
res += left
rstream.buffer.shift(left)
result = res
break
else:
res += datalen
rstream.buffer.shift(datalen)
await rstream.buffer.wait()
proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
nbytes: int) {.async.} =
## Write sequence of bytes pointed by ``pbytes`` of length ``nbytes`` to
## writer stream ``wstream``.
##
## ``nbytes` must be more then zero.
if not wstream.running():
raise newAsyncStreamIncorrectError("Incorrect stream state")
if nbytes <= 0:
raise newAsyncStreamIncorrectError("Zero length message")
if isNil(wstream.wsource):
var resFut = write(wstream.tsource, pbytes, nbytes)
yield resFut
if resFut.failed:
raise newAsyncStreamWriteError(resFut.error)
if resFut.read() != nbytes:
raise newAsyncStreamIncompleteError()
else:
var item = WriteItem(kind: Pointer)
item.data1 = pbytes
item.size = nbytes
item.future = newFuture[void]("async.stream.write(pointer)")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
proc write*(wstream: AsyncStreamWriter, sbytes: seq[byte],
msglen = -1) {.async.} =
## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer
## stream ``wstream``.
##
## Sequence of bytes ``sbytes`` must not be zero-length.
##
## If ``msglen < 0`` whole sequence ``sbytes`` will be writen to stream.
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
## stream.
let length = if msglen <= 0: len(sbytes) else: min(msglen, len(sbytes))
if not wstream.running():
raise newAsyncStreamIncorrectError("Incorrect stream state")
if length <= 0:
raise newAsyncStreamIncorrectError("Zero length message")
if isNil(wstream.wsource):
var resFut = write(wstream.tsource, sbytes)
yield resFut
if resFut.failed:
raise newAsyncStreamWriteError(resFut.error)
if resFut.read() != length:
raise newAsyncStreamIncompleteError()
else:
var item = WriteItem(kind: Sequence)
if not isLiteral(sbytes):
shallowCopy(item.data2, sbytes)
else:
item.data2 = sbytes
item.size = length
item.future = newFuture[void]("async.stream.write(seq)")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
proc write*(wstream: AsyncStreamWriter, sbytes: string,
msglen = -1) {.async.} =
## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``.
##
## String ``sbytes`` must not be zero-length.
##
## If ``msglen < 0`` whole string ``sbytes`` will be writen to stream.
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
## stream.
let length = if msglen <= 0: len(sbytes) else: min(msglen, len(sbytes))
if not wstream.running():
raise newAsyncStreamIncorrectError("Incorrect stream state")
if length <= 0:
raise newAsyncStreamIncorrectError("Zero length message")
if isNil(wstream.wsource):
var resFut = write(wstream.tsource, sbytes, msglen)
yield resFut
if resFut.failed:
raise newAsyncStreamWriteError(resFut.error)
if resFut.read() != length:
raise newAsyncStreamIncompleteError()
else:
var item = WriteItem(kind: String)
if not isLiteral(sbytes):
shallowCopy(item.data3, sbytes)
else:
item.data3 = sbytes
item.size = length
item.future = newFuture[void]("async.stream.write(string)")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
proc finish*(wstream: AsyncStreamWriter) {.async.} =
## Finish write stream ``wstream``.
if not wstream.running():
raise newAsyncStreamIncorrectError("Incorrect stream state")
if not isNil(wstream.wsource):
var item = WriteItem(kind: Pointer)
item.size = 0
item.future = newFuture[void]("async.stream.finish")
await wstream.queue.put(item)
yield item.future
if item.future.failed:
raise newAsyncStreamWriteError(item.future.error)
proc join*(rw: AsyncStreamRW): Future[void] =
## Get Future[void] which will be completed when stream become finished or
## closed.
when rw is AsyncStreamReader:
var retFuture = newFuture[void]("async.stream.reader.join")
else:
var retFuture = newFuture[void]("async.stream.writer.join")
proc continuation(udata: pointer) = retFuture.complete()
if not rw.future.finished:
rw.future.addCallback(continuation)
else:
retFuture.complete()
return retFuture
proc close*(rw: AsyncStreamRW) =
## Close and frees resources of stream ``rw``.
##
## Note close() procedure is not completed immediately!
if rw.closed():
raise newAsyncStreamIncorrectError("Stream is already closed!")
proc continuation(udata: pointer) =
if not isNil(rw.udata):
GC_unref(cast[ref int](rw.udata))
rw.state = AsyncStreamState.Closed
rw.future.complete()
when rw is AsyncStreamReader:
untrackAsyncStreamReader(rw)
elif rw is AsyncStreamWriter:
untrackAsyncStreamWriter(rw)
when rw is AsyncStreamReader:
if isNil(rw.rsource):
callSoon(continuation)
else:
rw.exevent.fire()
elif rw is AsyncStreamWriter:
if isNil(rw.wsource):
callSoon(continuation)
else:
rw.exevent.fire()
proc closeWait*(rw: AsyncStreamRW): Future[void] =
## Close and frees resources of stream ``rw``.
rw.close()
result = rw.join()
proc startReader(rstream: AsyncStreamReader) =
rstream.state = Running
if not isNil(rstream.readerLoop):
rstream.future = rstream.readerLoop(rstream)
else:
rstream.future = newFuture[void]("async.stream.empty.reader")
proc startWriter(wstream: AsyncStreamWriter) =
wstream.state = Running
if not isNil(wstream.writerLoop):
wstream.future = wstream.writerLoop(wstream)
else:
wstream.future = newFuture[void]("async.stream.empty.writer")
proc init*(child, wsource: AsyncStreamWriter, loop: StreamWriterLoop,
queueSize = AsyncStreamDefaultQueueSize) =
## Initialize newly allocated object ``child`` with AsyncStreamWriter
## parameters.
child.writerLoop = loop
child.wsource = wsource
child.tsource = wsource.tsource
child.exevent = newAsyncEvent()
child.queue = newAsyncQueue[WriteItem](queueSize)
trackAsyncStreamWriter(child)
child.startWriter()
proc init*[T](child, wsource: AsyncStreamWriter, loop: StreamWriterLoop,
queueSize = AsyncStreamDefaultQueueSize, udata: ref T) =
## Initialize newly allocated object ``child`` with AsyncStreamWriter
## parameters.
child.writerLoop = loop
child.wsource = wsource
child.tsource = wsource.tsource
child.exevent = newAsyncEvent()
child.queue = newAsyncQueue[WriteItem](queueSize)
if not isNil(udata):
GC_ref(udata)
child.udata = cast[pointer](udata)
trackAsyncStreamWriter(child)
child.startWriter()
proc init*(child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
bufferSize = AsyncStreamDefaultBufferSize) =
## Initialize newly allocated object ``child`` with AsyncStreamReader
## parameters.
child.readerLoop = loop
child.rsource = rsource
child.tsource = rsource.tsource
child.exevent = newAsyncEvent()
child.buffer = AsyncBuffer.init(bufferSize)
trackAsyncStreamReader(child)
child.startReader()
proc init*[T](child, rsource: AsyncStreamReader, loop: StreamReaderLoop,
bufferSize = AsyncStreamDefaultBufferSize,
udata: ref T) =
## Initialize newly allocated object ``child`` with AsyncStreamReader
## parameters.
child.readerLoop = loop
child.rsource = rsource
child.tsource = rsource.tsource
child.exevent = newAsyncEvent()
child.buffer = AsyncBuffer.init(bufferSize)
if not isNil(udata):
GC_ref(udata)
child.udata = cast[pointer](udata)
trackAsyncStreamReader(child)
child.startReader()
proc init*(child: AsyncStreamWriter, tsource: StreamTransport) =
## Initialize newly allocated object ``child`` with AsyncStreamWriter
## parameters.
child.writerLoop = nil
child.wsource = nil
child.tsource = tsource
trackAsyncStreamWriter(child)
child.startWriter()
proc init*[T](child: AsyncStreamWriter, tsource: StreamTransport,
udata: ref T) =
## Initialize newly allocated object ``child`` with AsyncStreamWriter
## parameters.
child.writerLoop = nil
child.wsource = nil
child.tsource = tsource
trackAsyncStreamWriter(child)
child.startWriter()
proc init*(child: AsyncStreamReader, tsource: StreamTransport) =
## Initialize newly allocated object ``child`` with AsyncStreamReader
## parameters.
child.readerLoop = nil
child.rsource = nil
child.tsource = tsource
trackAsyncStreamReader(child)
child.startReader()
proc init*[T](child: AsyncStreamReader, tsource: StreamTransport,
udata: ref T) =
## Initialize newly allocated object ``child`` with AsyncStreamReader
## parameters.
child.readerLoop = nil
child.rsource = nil
child.tsource = tsource
if not isNil(udata):
GC_ref(udata)
child.udata = cast[pointer](udata)
trackAsyncStreamReader(child)
child.startReader()
proc newAsyncStreamReader*[T](rsource: AsyncStreamReader,
loop: StreamReaderLoop,
bufferSize = AsyncStreamDefaultBufferSize,
udata: ref T): AsyncStreamReader =
## Create new AsyncStreamReader object, which will use other async stream
## reader ``rsource`` as source data channel.
##
## ``loop`` is main reading loop procedure.
##
## ``bufferSize`` is internal buffer size.
##
## ``udata`` - user object which will be associated with new AsyncStreamReader
## object.
result = new AsyncStreamReader
result.init(rsource, loop, bufferSize, udata)
proc newAsyncStreamReader*(rsource: AsyncStreamReader,
loop: StreamReaderLoop,
bufferSize = AsyncStreamDefaultBufferSize
): AsyncStreamReader =
## Create new AsyncStreamReader object, which will use other async stream
## reader ``rsource`` as source data channel.
##
## ``loop`` is main reading loop procedure.
##
## ``bufferSize`` is internal buffer size.
result = new AsyncStreamReader
result.init(rsource, loop, bufferSize)
proc newAsyncStreamReader*[T](tsource: StreamTransport,
udata: ref T): AsyncStreamReader =
## Create new AsyncStreamReader object, which will use stream transport
## ``tsource`` as source data channel.
##
## ``udata`` - user object which will be associated with new AsyncStreamWriter
## object.
result = new AsyncStreamReader
result.init(tsource, udata)
proc newAsyncStreamReader*(tsource: StreamTransport): AsyncStreamReader =
## Create new AsyncStreamReader object, which will use stream transport
## ``tsource`` as source data channel.
result = new AsyncStreamReader
result.init(tsource)
proc newAsyncStreamWriter*[T](wsource: AsyncStreamWriter,
loop: StreamWriterLoop,
queueSize = AsyncStreamDefaultQueueSize,
udata: ref T): AsyncStreamWriter =
## Create new AsyncStreamWriter object which will use other AsyncStreamWriter
## object ``wsource`` as data channel.
##
## ``loop`` is main writing loop procedure.
##
## ``queueSize`` is writing queue size (default size is unlimited).
##
## ``udata`` - user object which will be associated with new AsyncStreamWriter
## object.
result = new AsyncStreamWriter
result.init(wsource, loop, queueSize, udata)
proc newAsyncStreamWriter*(wsource: AsyncStreamWriter,
loop: StreamWriterLoop,
queueSize = AsyncStreamDefaultQueueSize
): AsyncStreamWriter =
## Create new AsyncStreamWriter object which will use other AsyncStreamWriter
## object ``wsource`` as data channel.
##
## ``loop`` is main writing loop procedure.
##
## ``queueSize`` is writing queue size (default size is unlimited).
result = new AsyncStreamWriter
result.init(wsource, loop, queueSize)
proc newAsyncStreamWriter*[T](tsource: StreamTransport,
udata: ref T): AsyncStreamWriter =
## Create new AsyncStreamWriter object which will use stream transport
## ``tsource`` as data channel.
##
## ``udata`` - user object which will be associated with new AsyncStreamWriter
## object.
result = new AsyncStreamWriter
result.init(tsource, udata)
proc newAsyncStreamWriter*(tsource: StreamTransport): AsyncStreamWriter =
## Create new AsyncStreamWriter object which will use stream transport
## ``tsource`` as data channel.
result = new AsyncStreamWriter
result.init(tsource)
proc getUserData*[T](rw: AsyncStreamRW): T {.inline.} =
## Obtain user data associated with AsyncStreamReader or AsyncStreamWriter
## object ``rw``.
result = cast[T](rw.udata)

View File

@ -0,0 +1,383 @@
#
# Chronos Asynchronous Chunked-Encoding Stream
# (c) Copyright 2019-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import ../asyncloop, ../timer
import asyncstream, ../transports/stream, ../transports/common
export asyncstream, stream, timer, common
const
ChunkBufferSize = 4096
ChunkHeaderSize = 8
ChunkDefaultTimeout = 10.seconds
CRLF = @[byte(0x0D), byte(0x0A)]
type
ChunkedStreamReader* = ref object of AsyncStreamReader
timeout: Duration
ChunkedStreamWriter* = ref object of AsyncStreamWriter
ChunkedStreamError* = object of Exception
ChunkedStreamTimeoutError* = object of ChunkedStreamError
ChunkedStreamProtocolError* = object of ChunkedStreamError
proc newTimeoutError(): ref Exception {.inline.} =
newException(ChunkedStreamTimeoutError, "Timeout exceeded!")
proc newProtocolError(): ref Exception {.inline.} =
newException(ChunkedStreamProtocolError, "Protocol error!")
proc oneOf*[A, B, C](fut1: Future[A], fut2: Future[B],
fut3: Future[C]): Future[void] =
## Wait for completion of one future from list [`fut1`, `fut2`, `fut3`],
## resulting future do not care about futures error, so you need to check
## `fut1`, `fut2`, `fut3` for error manually.
var retFuture = newFuture[void]("chunked.oneOf()")
proc cb(data: pointer) {.gcsafe.} =
var index: int
if not retFuture.finished:
var fut = cast[FutureBase](data)
if cast[pointer](fut1) == data:
fut2.removeCallback(cb)
fut3.removeCallback(cb)
elif cast[pointer](fut2) == data:
fut1.removeCallback(cb)
fut3.removeCallback(cb)
elif cast[pointer](fut3) == data:
fut2.removeCallback(cb)
fut3.removeCallback(cb)
retFuture.complete()
fut1.callback = cb
fut2.callback = cb
fut3.callback = cb
return retFuture
proc oneOf*[A, B](fut1: Future[A], fut2: Future[B]): Future[void] =
## Wait for completion of `fut1` or `fut2`, resulting future do not care about
## error, so you need to check `fut1` and `fut2` for error.
var retFuture = newFuture[void]("chunked.oneOf()")
proc cb(data: pointer) {.gcsafe.} =
var index: int
if not retFuture.finished:
var fut = cast[FutureBase](data)
if cast[pointer](fut1) == data:
fut2.removeCallback(cb)
elif cast[pointer](fut2) == data:
fut1.removeCallback(cb)
retFuture.complete()
fut1.callback = cb
fut2.callback = cb
return retFuture
proc getChunkSize(buffer: openarray[byte]): uint64 =
# We using `uint64` representation, but allow only 2^32 chunk size,
# ChunkHeaderSize.
for i in 0..<min(len(buffer), ChunkHeaderSize):
let ch = buffer[i]
if char(ch) in {'0'..'9', 'a'..'f', 'A'..'F'}:
if ch >= byte('0') and ch <= byte('9'):
result = (result shl 4) or uint64(ch - byte('0'))
else:
result = (result shl 4) or uint64((ch and 0x0F) + 9)
else:
result = 0xFFFF_FFFF_FFFF_FFFF'u64
break
proc setChunkSize(buffer: var openarray[byte], length: int64): int =
# Store length as chunk header size (hexadecimal value) with CRLF.
# Maximum stored value is ``0xFFFF_FFFF``.
# Buffer ``buffer`` length must be at least 10 octets.
doAssert(length <= int64(uint32.high))
var n = 0xF000_0000'i64
var i = 32
var c = 0
if length == 0:
buffer[0] = byte('0')
buffer[1] = byte(0x0D)
buffer[2] = byte(0x0A)
result = 3
else:
while n != 0:
var v = length and n
if v != 0 or c != 0:
let digit = byte((length and n) shr (i - 4))
var ch = digit + byte('0')
if ch > byte('9'):
ch = ch + 0x07'u8
buffer[c] = ch
inc(c)
n = n shr 4
i = i - 4
buffer[c] = byte(0x0D)
buffer[c + 1] = byte(0x0A)
result = c + 2
proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} =
var rstream = cast[ChunkedStreamReader](stream)
var exitFut = rstream.exevent.wait()
var buffer = newSeq[byte](1024)
var timerFut: Future[void]
if rstream.timeout == InfiniteDuration:
timerFut = newFuture[void]()
while true:
if rstream.timeout != InfiniteDuration:
timerFut = sleepAsync(rstream.timeout)
# Reading chunk size
var ruFut1 = rstream.rsource.readUntil(addr buffer[0], 1024, CRLF)
await oneOf(ruFut1, timerFut, exitFut)
if exitFut.finished or timerFut.finished:
if timerFut.finished:
rstream.error = newTimeoutError()
rstream.state = AsyncStreamState.Error
else:
rstream.state = AsyncStreamState.Stopped
break
if ruFut1.failed:
rstream.error = ruFut1.error
rstream.state = AsyncStreamState.Error
break
let length = ruFut1.read()
var chunksize = getChunkSize(buffer.toOpenArray(0, length - len(CRLF) - 1))
if chunksize == 0xFFFF_FFFF_FFFF_FFFF'u64:
rstream.error = newProtocolError()
rstream.state = AsyncStreamState.Error
break
elif chunksize > 0'u64:
while chunksize > 0'u64:
let toRead = min(int(chunksize), rstream.buffer.bufferLen())
var reFut2 = rstream.rsource.readExactly(rstream.buffer.getBuffer(),
toRead)
await oneOf(reFut2, timerFut, exitFut)
if exitFut.finished or timerFut.finished:
if timerFut.finished:
rstream.error = newTimeoutError()
rstream.state = AsyncStreamState.Error
else:
rstream.state = AsyncStreamState.Stopped
break
if reFut2.failed:
rstream.error = reFut2.error
rstream.state = AsyncStreamState.Error
break
rstream.buffer.update(toRead)
await rstream.buffer.transfer() or exitFut
if exitFut.finished:
rstream.state = AsyncStreamState.Stopped
break
chunksize = chunksize - uint64(toRead)
if rstream.state != AsyncStreamState.Running:
break
# Reading chunk trailing CRLF
var reFut3 = rstream.rsource.readExactly(addr buffer[0], 2)
await oneOf(reFut3, timerFut, exitFut)
if exitFut.finished or timerFut.finished:
if timerFut.finished:
rstream.error = newTimeoutError()
rstream.state = AsyncStreamState.Error
else:
rstream.state = AsyncStreamState.Stopped
break
if reFut3.failed:
rstream.error = reFut3.error
rstream.state = AsyncStreamState.Error
break
if buffer[0] != CRLF[0] or buffer[1] != CRLF[1]:
rstream.error = newProtocolError()
rstream.state = AsyncStreamState.Error
break
else:
# Reading trailing line for last chunk
var ruFut4 = rstream.rsource.readUntil(addr buffer[0], len(buffer), CRLF)
await oneOf(ruFut4, timerFut, exitFut)
if exitFut.finished or timerFut.finished:
if timerFut.finished:
rstream.error = newTimeoutError()
rstream.state = AsyncStreamState.Error
else:
rstream.state = AsyncStreamState.Stopped
break
if ruFut4.failed:
rstream.error = ruFut4.error
rstream.state = AsyncStreamState.Error
break
rstream.state = AsyncStreamState.Finished
await rstream.buffer.transfer() or exitFut
if exitFut.finished:
rstream.state = AsyncStreamState.Stopped
break
if rstream.state in {AsyncStreamState.Stopped, AsyncStreamState.Error}:
# We need to notify consumer about error/close, but we do not care about
# incoming data anymore.
rstream.buffer.forget()
untrackAsyncStreamReader(rstream)
proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} =
var wstream = cast[ChunkedStreamWriter](stream)
var exitFut = wstream.exevent.wait()
var buffer: array[16, byte]
var error: ref Exception
var wFut1, wFut2, wFut3: Future[void]
wstream.state = AsyncStreamState.Running
while true:
# Getting new item from stream's queue.
var getFut = wstream.queue.get()
await oneOf(getFut, exitFut)
if exitFut.finished:
wstream.state = AsyncStreamState.Stopped
break
var item = getFut.read()
# `item.size == 0` is marker of stream finish, while `item.size != 0` is
# data's marker.
if item.size > 0:
let length = setChunkSize(buffer, int64(item.size))
# Writing chunk header <length>CRLF.
wFut1 = wstream.wsource.write(addr buffer[0], length)
await oneOf(wFut1, exitFut)
if exitFut.finished:
wstream.state = AsyncStreamState.Stopped
break
if wFut1.failed:
item.future.fail(wFut1.error)
continue
# Writing chunk data.
if item.kind == Pointer:
wFut2 = wstream.wsource.write(item.data1, item.size)
elif item.kind == Sequence:
wFut2 = wstream.wsource.write(addr item.data2[0], item.size)
elif item.kind == String:
wFut2 = wstream.wsource.write(addr item.data3[0], item.size)
await oneOf(wFut2, exitFut)
if exitFut.finished:
wstream.state = AsyncStreamState.Stopped
break
if wFut2.failed:
item.future.fail(wFut2.error)
continue
# Writing chunk footer CRLF.
var wFut3 = wstream.wsource.write(CRLF)
await oneOf(wFut3, exitFut)
if exitFut.finished:
wstream.state = AsyncStreamState.Stopped
break
if wFut3.failed:
item.future.fail(wFut3.error)
continue
# Everything is fine, completing queue item's future.
item.future.complete()
else:
let length = setChunkSize(buffer, 0'i64)
# Write finish chunk `0`.
wFut1 = wstream.wsource.write(addr buffer[0], length)
await oneOf(wFut1, exitFut)
if exitFut.finished:
wstream.state = AsyncStreamState.Stopped
break
if wFut1.failed:
item.future.fail(wFut1.error)
# We break here, because this is last chunk
break
# Write trailing CRLF.
wFut2 = wstream.wsource.write(CRLF)
await oneOf(wFut2, exitFut)
if exitFut.finished:
wstream.state = AsyncStreamState.Stopped
break
if wFut2.failed:
item.future.fail(wFut2.error)
# We break here, because this is last chunk
break
# Everything is fine, completing queue item's future.
item.future.complete()
# Set stream state to Finished.
wstream.state = AsyncStreamState.Finished
break
untrackAsyncStreamWriter(wstream)
proc init*[T](child: ChunkedStreamReader, rsource: AsyncStreamReader,
bufferSize = ChunkBufferSize, timeout = ChunkDefaultTimeout,
udata: ref T) =
child.timeout = timeout
init(cast[AsyncStreamReader](child), rsource, chunkedReadLoop, bufferSize,
udata)
proc init*(child: ChunkedStreamReader, rsource: AsyncStreamReader,
bufferSize = ChunkBufferSize, timeout = ChunkDefaultTimeout) =
child.timeout = timeout
init(cast[AsyncStreamReader](child), rsource, chunkedReadLoop, bufferSize)
proc newChunkedStreamReader*[T](rsource: AsyncStreamReader,
bufferSize = AsyncStreamDefaultBufferSize,
timeout = ChunkDefaultTimeout,
udata: ref T): ChunkedStreamReader =
result = new ChunkedStreamReader
result.init(rsource, bufferSize, timeout, udata)
proc newChunkedStreamReader*(rsource: AsyncStreamReader,
bufferSize = AsyncStreamDefaultBufferSize,
timeout = ChunkDefaultTimeout,
): ChunkedStreamReader =
result = new ChunkedStreamReader
result.init(rsource, bufferSize, timeout)
proc init*[T](child: ChunkedStreamWriter, wsource: AsyncStreamWriter,
queueSize = AsyncStreamDefaultQueueSize, udata: ref T) =
init(cast[AsyncStreamWriter](child), wsource, chunkedWriteLoop, queueSize,
udata)
proc init*(child: ChunkedStreamWriter, wsource: AsyncStreamWriter,
queueSize = AsyncStreamDefaultQueueSize) =
init(cast[AsyncStreamWriter](child), wsource, chunkedWriteLoop, queueSize)
proc newChunkedStreamWriter*[T](wsource: AsyncStreamWriter,
queueSize = AsyncStreamDefaultQueueSize,
udata: ref T): ChunkedStreamWriter =
result = new ChunkedStreamWriter
result.init(wsource, queueSize, udata)
proc newChunkedStreamWriter*(wsource: AsyncStreamWriter,
queueSize = AsyncStreamDefaultQueueSize,
): ChunkedStreamWriter =
result = new ChunkedStreamWriter
result.init(wsource, queueSize)

View File

@ -6,6 +6,8 @@
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import transports/[datagram, stream, common, ipnet, osnet]
import streams/[asyncstream, chunkstream]
export datagram, common, stream, ipnet, osnet
export asyncstream, chunkstream

View File

@ -1469,7 +1469,7 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
## Read data from the transport ``transp`` until separator ``sep`` is found.
##
## On success, the data and separator will be removed from the internal
## buffer (consumed). Returned data will NOT include the separator at the end.
## buffer (consumed). Returned data will include the separator at the end.
##
## If EOF is received, and `sep` was not found, procedure will raise
## ``TransportIncompleteError``.
@ -1578,35 +1578,37 @@ proc read*(transp: StreamTransport, n = -1): Future[seq[byte]] {.async.} =
## This procedure allocates buffer seq[byte] and return it as result.
checkClosed(transp)
checkPending(transp)
result = newSeq[byte]()
var res = newSeq[byte]()
while true:
if (ReadError in transp.state):
raise transp.getError()
if (ReadClosed in transp.state) or transp.atEof():
result = res
break
if transp.offset > 0:
let s = len(result)
let s = len(res)
let o = s + transp.offset
if n < 0:
if n <= 0:
# grabbing all incoming data, until EOF
result.setLen(o)
copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]),
res.setLen(o)
copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]),
transp.offset)
transp.offset = 0
else:
let left = n - s
if transp.offset >= left:
# size of buffer data is more then we need, grabbing only part
result.setLen(n)
copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]),
res.setLen(n)
copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]),
left)
transp.shiftBuffer(left)
result = res
break
else:
# there not enough data in buffer, grabbing all
result.setLen(o)
copyMem(cast[pointer](addr result[s]), addr(transp.buffer[0]),
res.setLen(o)
copyMem(cast[pointer](addr res[s]), addr(transp.buffer[0]),
transp.offset)
transp.offset = 0
@ -1631,7 +1633,7 @@ proc consume*(transp: StreamTransport, n = -1): Future[int] {.async.} =
break
if transp.offset > 0:
if n == -1:
if n <= 0:
# consume all incoming data, until EOF
result += transp.offset
transp.offset = 0

View File

@ -6,4 +6,5 @@
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import testsync, testsoon, testtime, testfut, testsignal, testaddress,
testdatagram, teststream, testserver, testbugs, testnet
testdatagram, teststream, testserver, testbugs, testnet,
testasyncstream

472
tests/testasyncstream.nim Normal file
View File

@ -0,0 +1,472 @@
# Chronos Test Suite
# (c) Copyright 2019-Present
# Status Research & Development GmbH
#
# Licensed under either of
# Apache License, version 2.0, (LICENSE-APACHEv2)
# MIT license (LICENSE-MIT)
import strutils, unittest, os
import hexdump
import ../chronos
suite "AsyncStream test suite":
test "AsyncStream(StreamTransport) readExactly() test":
proc testReadExactly(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write("000000000011111111112222222222")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var buffer = newSeq[byte](10)
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
await rstream.readExactly(addr buffer[0], 10)
check cast[string](buffer) == "0000000000"
await rstream.readExactly(addr buffer[0], 10)
check cast[string](buffer) == "1111111111"
await rstream.readExactly(addr buffer[0], 10)
check cast[string](buffer) == "2222222222"
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = true
check waitFor(testReadExactly(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(StreamTransport) readUntil() test":
proc testReadUntil(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write("0000000000NNz1111111111NNz2222222222NNz")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var buffer = newSeq[byte](13)
var sep = @[byte('N'), byte('N'), byte('z')]
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var r1 = await rstream.readUntil(addr buffer[0], len(buffer), sep)
check:
r1 == 13
cast[string](buffer) == "0000000000NNz"
var r2 = await rstream.readUntil(addr buffer[0], len(buffer), sep)
check:
r2 == 13
cast[string](buffer) == "1111111111NNz"
var r3 = await rstream.readUntil(addr buffer[0], len(buffer), sep)
check:
r3 == 13
cast[string](buffer) == "2222222222NNz"
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = true
check waitFor(testReadUntil(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(StreamTransport) readLine() test":
proc testReadLine(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write("0000000000\r\n1111111111\r\n2222222222\r\n")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var r1 = await rstream.readLine()
check r1 == "0000000000"
var r2 = await rstream.readLine()
check r2 == "1111111111"
var r3 = await rstream.readLine()
check r3 == "2222222222"
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = true
check waitFor(testReadLine(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(StreamTransport) read() test":
proc testRead(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write("000000000011111111112222222222")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var buf1 = await rstream.read(10)
check cast[string](buf1) == "0000000000"
var buf2 = await rstream.read()
check cast[string](buf2) == "11111111112222222222"
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = true
check waitFor(testRead(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(StreamTransport) consume() test":
proc testConsume(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write("0000000000111111111122222222223333333333")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var res1 = await rstream.consume(10)
check:
res1 == 10
var buf1 = await rstream.read(10)
check cast[string](buf1) == "1111111111"
var res2 = await rstream.consume(10)
check:
res2 == 10
var buf2 = await rstream.read(10)
check cast[string](buf2) == "3333333333"
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = true
check waitFor(testConsume(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(StreamTransport) leaks test":
check:
getTracker("async.stream.reader").isLeaked() == false
getTracker("async.stream.writer").isLeaked() == false
getTracker("stream.server").isLeaked() == false
getTracker("stream.transport").isLeaked() == false
test "AsyncStream(AsyncStream) readExactly() test":
proc testReadExactly2(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s1 = "00000"
var s2 = "11111"
var s3 = "22222"
await wstream2.write("00000")
await wstream2.write(addr s1[0], len(s1))
await wstream2.write("11111")
await wstream2.write(cast[seq[byte]](s2))
await wstream2.write("22222")
await wstream2.write(addr s3[0], len(s3))
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var buffer = newSeq[byte](10)
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var rstream2 = newChunkedStreamReader(rstream)
await rstream2.readExactly(addr buffer[0], 10)
check cast[string](buffer) == "0000000000"
await rstream2.readExactly(addr buffer[0], 10)
check cast[string](buffer) == "1111111111"
await rstream2.readExactly(addr buffer[0], 10)
check cast[string](buffer) == "2222222222"
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = true
check waitFor(testReadExactly2(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(AsyncStream) readUntil() test":
proc testReadUntil2(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s1 = "00000NNz"
var s2 = "11111NNz"
var s3 = "22222NNz"
await wstream2.write("00000")
await wstream2.write(addr s1[0], len(s1))
await wstream2.write("11111")
await wstream2.write(s2)
await wstream2.write("22222")
await wstream2.write(cast[seq[byte]](s3))
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var buffer = newSeq[byte](13)
var sep = @[byte('N'), byte('N'), byte('z')]
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var rstream2 = newChunkedStreamReader(rstream)
var r1 = await rstream2.readUntil(addr buffer[0], len(buffer), sep)
check:
r1 == 13
cast[string](buffer) == "0000000000NNz"
var r2 = await rstream2.readUntil(addr buffer[0], len(buffer), sep)
check:
r2 == 13
cast[string](buffer) == "1111111111NNz"
var r3 = await rstream2.readUntil(addr buffer[0], len(buffer), sep)
check:
r3 == 13
cast[string](buffer) == "2222222222NNz"
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = true
check waitFor(testReadUntil2(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(AsyncStream) readLine() test":
proc testReadLine2(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
await wstream2.write("00000")
await wstream2.write("00000\r\n")
await wstream2.write("11111")
await wstream2.write("11111\r\n")
await wstream2.write("22222")
await wstream2.write("22222\r\n")
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var rstream2 = newChunkedStreamReader(rstream)
var r1 = await rstream2.readLine()
check r1 == "0000000000"
var r2 = await rstream2.readLine()
check r2 == "1111111111"
var r3 = await rstream2.readLine()
check r3 == "2222222222"
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = true
check waitFor(testReadLine2(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(AsyncStream) read() test":
proc testRead2(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s2 = "1111111111"
var s3 = "2222222222"
await wstream2.write("0000000000")
await wstream2.write(s2)
await wstream2.write(cast[seq[byte]](s3))
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var rstream2 = newChunkedStreamReader(rstream)
var buf1 = await rstream2.read(10)
check cast[string](buf1) == "0000000000"
var buf2 = await rstream2.read()
check cast[string](buf2) == "11111111112222222222"
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = true
check waitFor(testRead2(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(AsyncStream) consume() test":
proc testConsume2(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
const
S4 = @[byte('3'), byte('3'), byte('3'), byte('3'), byte('3')]
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s1 = "00000"
var s2 = cast[seq[byte]]("11111")
var s3 = "22222"
await wstream2.write("00000")
await wstream2.write(s1)
await wstream2.write("11111")
await wstream2.write(s2)
await wstream2.write("22222")
await wstream2.write(addr s3[0], len(s3))
await wstream2.write("33333")
await wstream2.write(S4)
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var rstream2 = newChunkedStreamReader(rstream)
var res1 = await rstream2.consume(10)
check:
res1 == 10
var buf1 = await rstream2.read(10)
check cast[string](buf1) == "1111111111"
var res2 = await rstream2.consume(10)
check:
res2 == 10
var buf2 = await rstream2.read(10)
check cast[string](buf2) == "3333333333"
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = true
check waitFor(testConsume2(initTAddress("127.0.0.1:46001"))) == true
test "AsyncStream(AsyncStream) leaks test":
check:
getTracker("async.stream.reader").isLeaked() == false
getTracker("async.stream.writer").isLeaked() == false
getTracker("stream.server").isLeaked() == false
getTracker("stream.transport").isLeaked() == false
suite "ChunkedStream test suite":
test "ChunkedStream test vectors":
const ChunkedVectors = [
["4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n",
"Wikipedia in\r\n\r\nchunks."],
["4\r\nWiki\r\n5\r\npedia\r\nE\r\n in\r\n\r\nchunks.\r\n0\r\n\r\n0\r\n\r\n",
"Wikipedia in\r\n\r\nchunks."],
]
proc checkVector(address: TransportAddress,
inputstr: string): Future[string] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var data = inputstr
await wstream.write(data)
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var rstream2 = newChunkedStreamReader(rstream)
var res = await rstream2.read()
var ress = cast[string](res)
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = ress
proc testVectors(address: TransportAddress): Future[bool] {.async.} =
var res = true
for i in 0..<len(ChunkedVectors):
var r = await checkVector(address, ChunkedVectors[i][0])
if r != ChunkedVectors[i][1]:
res = false
break
result = res
check waitFor(testVectors(initTAddress("127.0.0.1:46001"))) == true
test "ChunkedStream incorrect chunk test":
const BadVectors = [
["100000000 \r\n1"],
["z\r\n1"]
]
proc checkVector(address: TransportAddress,
inputstr: string): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var data = inputstr
await wstream.write(data)
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
var res = false
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
var transp = await connect(address)
var rstream = newAsyncStreamReader(transp)
var rstream2 = newChunkedStreamReader(rstream)
try:
var r = await rstream2.read()
except AsyncStreamReadError as e:
res = true
await rstream2.closeWait()
await rstream.closeWait()
await transp.closeWait()
await server.join()
result = res
proc testVectors2(address: TransportAddress): Future[bool] {.async.} =
var res = true
for i in 0..<len(BadVectors):
var r = await checkVector(address, BadVectors[i][0])
if not(r):
res = false
break
result = res
check waitFor(testVectors2(initTAddress("127.0.0.1:46001"))) == true