Asyncstreams asyncraises. (#472)

* Fix transports addresses functions should not return so many exceptions.

* Add raising `Defect` functions to AsyncQueue.

* Add raises/asyncraises into async streams.

* Remove `Safe` primitives.
Make AsyncStreamError to be ancestor of AsyncError.
Make AsyncStreamReader/Writer loops requirement to not raise any exceptions

* Remove `par` fields.

* Remove `par` fields from TLSStream.

* Attempt to lower memory usage.
This commit is contained in:
Eugene Kabanov 2023-11-18 00:18:09 +02:00 committed by GitHub
parent 1306170255
commit 0b136b33c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 217 additions and 198 deletions

View File

@ -165,7 +165,7 @@ proc newAsyncEvent*(): AsyncEvent =
AsyncEvent()
proc wait*(event: AsyncEvent): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
async: (raw: true, raises: [CancelledError]).} =
## Block until the internal flag of ``event`` is `true`.
## If the internal flag is `true` on entry, return immediately. Otherwise,
## block until another task calls `fire()` to set the flag to `true`,
@ -258,7 +258,7 @@ proc popLastImpl[T](aq: AsyncQueue[T]): T =
res
proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].}=
raises: [AsyncQueueFullError].} =
## Put an item ``item`` to the beginning of the queue ``aq`` immediately.
##
## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised.
@ -267,7 +267,7 @@ proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {.
aq.addFirstImpl(item)
proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].}=
raises: [AsyncQueueFullError].} =
## Put an item ``item`` at the end of the queue ``aq`` immediately.
##
## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised.
@ -276,7 +276,7 @@ proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {.
aq.addLastImpl(item)
proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
raises: [AsyncQueueEmptyError].} =
## Get an item from the beginning of the queue ``aq`` immediately.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
@ -285,7 +285,7 @@ proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {.
aq.popFirstImpl()
proc popLastNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
raises: [AsyncQueueEmptyError].} =
## Get an item from the end of the queue ``aq`` immediately.
##
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
@ -293,11 +293,13 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T {.
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
aq.popLastImpl()
proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]).} =
proc addFirst*[T](aq: AsyncQueue[T], item: T) {.
async: (raises: [CancelledError]).} =
## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full,
## wait until a free slot is available before adding item.
while aq.full():
let putter = Future[void].Raising([CancelledError]).init("AsyncQueue.addFirst")
let putter =
Future[void].Raising([CancelledError]).init("AsyncQueue.addFirst")
aq.putters.add(putter)
try:
await putter
@ -307,11 +309,13 @@ proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]
raise exc
aq.addFirstImpl(item)
proc addLast*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]).} =
proc addLast*[T](aq: AsyncQueue[T], item: T) {.
async: (raises: [CancelledError]).} =
## Put an ``item`` to the end of the queue ``aq``. If the queue is full,
## wait until a free slot is available before adding item.
while aq.full():
let putter = Future[void].Raising([CancelledError]).init("AsyncQueue.addLast")
let putter =
Future[void].Raising([CancelledError]).init("AsyncQueue.addLast")
aq.putters.add(putter)
try:
await putter
@ -321,11 +325,13 @@ proc addLast*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError])
raise exc
aq.addLastImpl(item)
proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError]).} =
proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Remove and return an ``item`` from the beginning of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
let getter = Future[void].Raising([CancelledError]).init("AsyncQueue.popFirst")
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.popFirst")
aq.getters.add(getter)
try:
await getter
@ -335,11 +341,13 @@ proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledErro
raise exc
aq.popFirstImpl()
proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError]).} =
proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raises: [CancelledError]).} =
## Remove and return an ``item`` from the end of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
let getter = Future[void].Raising([CancelledError]).init("AsyncQueue.popLast")
let getter =
Future[void].Raising([CancelledError]).init("AsyncQueue.popLast")
aq.getters.add(getter)
try:
await getter
@ -350,22 +358,22 @@ proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError
aq.popLastImpl()
proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].} =
raises: [AsyncQueueFullError].} =
## Alias of ``addLastNoWait()``.
aq.addLastNoWait(item)
proc getNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
raises: [AsyncQueueEmptyError].} =
## Alias of ``popFirstNoWait()``.
aq.popFirstNoWait()
proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``addLast()``.
aq.addLast(item)
proc get*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raw: true, raises: [CancelledError]).} =
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``popFirst()``.
aq.popFirst()
@ -509,7 +517,8 @@ proc close*(ab: AsyncEventQueue) {.raises: [].} =
ab.readers.reset()
ab.queue.clear()
proc closeWait*(ab: AsyncEventQueue): Future[void] {.async: (raw: true, raises: []).} =
proc closeWait*(ab: AsyncEventQueue): Future[void] {.
async: (raw: true, raises: []).} =
let retFuture = newFuture[void]("AsyncEventQueue.closeWait()",
{FutureFlag.OwnCancelSchedule})
proc continuation(udata: pointer) {.gcsafe.} =
@ -568,7 +577,7 @@ proc emit*[T](ab: AsyncEventQueue[T], data: T) =
proc waitEvents*[T](ab: AsyncEventQueue[T],
key: EventQueueKey,
eventsCount = -1): Future[seq[T]] {.
async: (raises: [AsyncEventQueueFullError, CancelledError]).} =
async: (raises: [AsyncEventQueueFullError, CancelledError]).} =
## Wait for events
var
events: seq[T]

View File

@ -24,15 +24,13 @@ const
## AsyncStreamWriter leaks tracker name
type
AsyncStreamError* = object of CatchableError
AsyncStreamError* = object of AsyncError
AsyncStreamIncorrectDefect* = object of Defect
AsyncStreamIncompleteError* = object of AsyncStreamError
AsyncStreamLimitError* = object of AsyncStreamError
AsyncStreamUseClosedError* = object of AsyncStreamError
AsyncStreamReadError* = object of AsyncStreamError
par*: ref CatchableError
AsyncStreamWriteError* = object of AsyncStreamError
par*: ref CatchableError
AsyncStreamWriteEOFError* = object of AsyncStreamWriteError
AsyncBuffer* = object
@ -53,7 +51,7 @@ type
dataStr*: string
size*: int
offset*: int
future*: Future[void]
future*: Future[void].Raising([CancelledError, AsyncStreamError])
AsyncStreamState* = enum
Running, ## Stream is online and working
@ -64,10 +62,10 @@ type
Closed ## Stream was closed
StreamReaderLoop* = proc (stream: AsyncStreamReader): Future[void] {.
gcsafe, raises: [].}
async: (raises: []).}
## Main read loop for read streams.
StreamWriterLoop* = proc (stream: AsyncStreamWriter): Future[void] {.
gcsafe, raises: [].}
async: (raises: []).}
## Main write loop for write streams.
AsyncStreamReader* = ref object of RootRef
@ -124,12 +122,12 @@ proc `[]`*(sb: AsyncBuffer, index: int): byte {.inline.} =
proc update*(sb: var AsyncBuffer, size: int) {.inline.} =
sb.offset += size
proc wait*(sb: var AsyncBuffer): Future[void] =
template wait*(sb: var AsyncBuffer): untyped =
sb.events[0].clear()
sb.events[1].fire()
sb.events[0].wait()
proc transfer*(sb: var AsyncBuffer): Future[void] =
template transfer*(sb: var AsyncBuffer): untyped =
sb.events[1].clear()
sb.events[0].fire()
sb.events[1].wait()
@ -150,7 +148,8 @@ proc copyData*(sb: AsyncBuffer, dest: pointer, offset, length: int) {.inline.} =
unsafeAddr sb.buffer[0], length)
proc upload*(sb: ptr AsyncBuffer, pbytes: ptr byte,
nbytes: int): Future[void] {.async.} =
nbytes: int): Future[void] {.
async: (raises: [CancelledError]).} =
## You can upload any amount of bytes to the buffer. If size of internal
## buffer is not enough to fit all the data at once, data will be uploaded
## via chunks of size up to internal buffer size.
@ -186,18 +185,20 @@ template copyOut*(dest: pointer, item: WriteItem, length: int) =
elif item.kind == String:
copyMem(dest, unsafeAddr item.dataStr[item.offset], length)
proc newAsyncStreamReadError(p: ref CatchableError): ref AsyncStreamReadError {.
noinline.} =
proc newAsyncStreamReadError(
p: ref TransportError
): ref AsyncStreamReadError {.noinline.} =
var w = newException(AsyncStreamReadError, "Read stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p
w.parent = p
w
proc newAsyncStreamWriteError(p: ref CatchableError): ref AsyncStreamWriteError {.
noinline.} =
proc newAsyncStreamWriteError(
p: ref TransportError
): ref AsyncStreamWriteError {.noinline.} =
var w = newException(AsyncStreamWriteError, "Write stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p
w.parent = p
w
proc newAsyncStreamIncompleteError*(): ref AsyncStreamIncompleteError {.
@ -344,7 +345,8 @@ template readLoop(body: untyped): untyped =
await rstream.buffer.wait()
proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int) {.async.} =
nbytes: int) {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store
## it to ``pbytes``.
##
@ -365,7 +367,7 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer,
raise exc
except TransportIncompleteError:
raise newAsyncStreamIncompleteError()
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamReadError(exc)
else:
if isNil(rstream.readerLoop):
@ -384,7 +386,8 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer,
(consumed: count, done: index == nbytes)
proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int): Future[int] {.async.} =
nbytes: int): Future[int] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Perform one read operation on read-only stream ``rstream``.
##
## If internal buffer is not empty, ``nbytes`` bytes will be transferred from
@ -398,7 +401,7 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
return await readOnce(rstream.tsource, pbytes, nbytes)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamReadError(exc)
else:
if isNil(rstream.readerLoop):
@ -415,7 +418,8 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer,
return count
proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
sep: seq[byte]): Future[int] {.async.} =
sep: seq[byte]): Future[int] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Read data from the read-only stream ``rstream`` until separator ``sep`` is
## found.
##
@ -446,7 +450,7 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
raise newAsyncStreamIncompleteError()
except TransportLimitError:
raise newAsyncStreamLimitError()
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamReadError(exc)
else:
if isNil(rstream.readerLoop):
@ -476,7 +480,8 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int,
return k
proc readLine*(rstream: AsyncStreamReader, limit = 0,
sep = "\r\n"): Future[string] {.async.} =
sep = "\r\n"): Future[string] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Read one line from read-only stream ``rstream``, where ``"line"`` is a
## sequence of bytes ending with ``sep`` (default is ``"\r\n"``).
##
@ -495,7 +500,7 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0,
return await readLine(rstream.tsource, limit, sep)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamReadError(exc)
else:
if isNil(rstream.readerLoop):
@ -530,7 +535,8 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0,
(index, (state == len(sep)) or (lim == len(res)))
return res
proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.async.} =
proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Read all bytes from read-only stream ``rstream``.
##
## This procedure allocates buffer seq[byte] and return it as result.
@ -543,7 +549,7 @@ proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.async.} =
raise exc
except TransportLimitError:
raise newAsyncStreamLimitError()
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamReadError(exc)
else:
if isNil(rstream.readerLoop):
@ -559,7 +565,8 @@ proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.async.} =
(count, false)
return res
proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.async.} =
proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Read all bytes (n <= 0) or exactly `n` bytes from read-only stream
## ``rstream``.
##
@ -571,7 +578,7 @@ proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.async.} =
return await read(rstream.tsource, n)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamReadError(exc)
else:
if isNil(rstream.readerLoop):
@ -590,7 +597,8 @@ proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.async.} =
(count, len(res) == n)
return res
proc consume*(rstream: AsyncStreamReader): Future[int] {.async.} =
proc consume*(rstream: AsyncStreamReader): Future[int] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Consume (discard) all bytes from read-only stream ``rstream``.
##
## Return number of bytes actually consumed (discarded).
@ -603,7 +611,7 @@ proc consume*(rstream: AsyncStreamReader): Future[int] {.async.} =
raise exc
except TransportLimitError:
raise newAsyncStreamLimitError()
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamReadError(exc)
else:
if isNil(rstream.readerLoop):
@ -618,7 +626,8 @@ proc consume*(rstream: AsyncStreamReader): Future[int] {.async.} =
(rstream.buffer.dataLen(), false)
return res
proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.async.} =
proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream
## ``rstream``.
##
@ -632,7 +641,7 @@ proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.async.} =
raise exc
except TransportLimitError:
raise newAsyncStreamLimitError()
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamReadError(exc)
else:
if isNil(rstream.readerLoop):
@ -652,7 +661,7 @@ proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.async.} =
return res
proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {.
async.} =
async: (raises: [CancelledError, AsyncStreamError]).} =
## Read all bytes from stream ``rstream`` until ``predicate`` callback
## will not be satisfied.
##
@ -673,7 +682,7 @@ proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {.
await readMessage(rstream.tsource, pred)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamReadError(exc)
else:
if isNil(rstream.readerLoop):
@ -691,7 +700,8 @@ proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {.
pred(rstream.buffer.buffer.toOpenArray(0, count - 1))
proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
nbytes: int) {.async.} =
nbytes: int) {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Write sequence of bytes pointed by ``pbytes`` of length ``nbytes`` to
## writer stream ``wstream``.
##
@ -708,9 +718,7 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
res = await write(wstream.tsource, pbytes, nbytes)
except CancelledError as exc:
raise exc
except AsyncStreamError as exc:
raise exc
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamWriteError(exc)
if res != nbytes:
raise newAsyncStreamIncompleteError()
@ -720,23 +728,17 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer,
await write(wstream.wsource, pbytes, nbytes)
wstream.bytesCount = wstream.bytesCount + uint64(nbytes)
else:
var item = WriteItem(kind: Pointer)
item.dataPtr = pbytes
item.size = nbytes
item.future = newFuture[void]("async.stream.write(pointer)")
try:
await wstream.queue.put(item)
await item.future
wstream.bytesCount = wstream.bytesCount + uint64(item.size)
except CancelledError as exc:
raise exc
except AsyncStreamError as exc:
raise exc
except CatchableError as exc:
raise newAsyncStreamWriteError(exc)
let item = WriteItem(
kind: Pointer, dataPtr: pbytes, size: nbytes,
future: Future[void].Raising([CancelledError, AsyncStreamError])
.init("async.stream.write(pointer)"))
await wstream.queue.put(item)
await item.future
wstream.bytesCount = wstream.bytesCount + uint64(item.size)
proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte],
msglen = -1) {.async.} =
msglen = -1) {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer
## stream ``wstream``.
##
@ -758,7 +760,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte],
res = await write(wstream.tsource, sbytes, length)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamWriteError(exc)
if res != length:
raise newAsyncStreamIncompleteError()
@ -768,29 +770,17 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte],
await write(wstream.wsource, sbytes, length)
wstream.bytesCount = wstream.bytesCount + uint64(length)
else:
var item = WriteItem(kind: Sequence)
when declared(shallowCopy):
if not(isLiteral(sbytes)):
shallowCopy(item.dataSeq, sbytes)
else:
item.dataSeq = sbytes
else:
item.dataSeq = sbytes
item.size = length
item.future = newFuture[void]("async.stream.write(seq)")
try:
await wstream.queue.put(item)
await item.future
wstream.bytesCount = wstream.bytesCount + uint64(item.size)
except CancelledError as exc:
raise exc
except AsyncStreamError as exc:
raise exc
except CatchableError as exc:
raise newAsyncStreamWriteError(exc)
let item = WriteItem(
kind: Sequence, dataSeq: move(sbytes), size: length,
future: Future[void].Raising([CancelledError, AsyncStreamError])
.init("async.stream.write(seq)"))
await wstream.queue.put(item)
await item.future
wstream.bytesCount = wstream.bytesCount + uint64(item.size)
proc write*(wstream: AsyncStreamWriter, sbytes: sink string,
msglen = -1) {.async.} =
msglen = -1) {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``.
##
## String ``sbytes`` must not be zero-length.
@ -811,7 +801,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink string,
res = await write(wstream.tsource, sbytes, length)
except CancelledError as exc:
raise exc
except CatchableError as exc:
except TransportError as exc:
raise newAsyncStreamWriteError(exc)
if res != length:
raise newAsyncStreamIncompleteError()
@ -821,28 +811,16 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink string,
await write(wstream.wsource, sbytes, length)
wstream.bytesCount = wstream.bytesCount + uint64(length)
else:
var item = WriteItem(kind: String)
when declared(shallowCopy):
if not(isLiteral(sbytes)):
shallowCopy(item.dataStr, sbytes)
else:
item.dataStr = sbytes
else:
item.dataStr = sbytes
item.size = length
item.future = newFuture[void]("async.stream.write(string)")
try:
await wstream.queue.put(item)
await item.future
wstream.bytesCount = wstream.bytesCount + uint64(item.size)
except CancelledError as exc:
raise exc
except AsyncStreamError as exc:
raise exc
except CatchableError as exc:
raise newAsyncStreamWriteError(exc)
let item = WriteItem(
kind: String, dataStr: move(sbytes), size: length,
future: Future[void].Raising([CancelledError, AsyncStreamError])
.init("async.stream.write(string)"))
await wstream.queue.put(item)
await item.future
wstream.bytesCount = wstream.bytesCount + uint64(item.size)
proc finish*(wstream: AsyncStreamWriter) {.async.} =
proc finish*(wstream: AsyncStreamWriter) {.
async: (raises: [CancelledError, AsyncStreamError]).} =
## Finish write stream ``wstream``.
checkStreamClosed(wstream)
# For AsyncStreamWriter Finished state could be set manually or by stream's
@ -852,20 +830,15 @@ proc finish*(wstream: AsyncStreamWriter) {.async.} =
if isNil(wstream.writerLoop):
await wstream.wsource.finish()
else:
var item = WriteItem(kind: Pointer)
item.size = 0
item.future = newFuture[void]("async.stream.finish")
try:
await wstream.queue.put(item)
await item.future
except CancelledError as exc:
raise exc
except AsyncStreamError as exc:
raise exc
except CatchableError as exc:
raise newAsyncStreamWriteError(exc)
let item = WriteItem(
kind: Pointer, size: 0,
future: Future[void].Raising([CancelledError, AsyncStreamError])
.init("async.stream.finish"))
await wstream.queue.put(item)
await item.future
proc join*(rw: AsyncStreamRW): Future[void] =
proc join*(rw: AsyncStreamRW): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Get Future[void] which will be completed when stream become finished or
## closed.
when rw is AsyncStreamReader:
@ -924,7 +897,8 @@ proc close*(rw: AsyncStreamRW) =
rw.future.addCallback(continuation)
rw.future.cancelSoon()
proc closeWait*(rw: AsyncStreamRW): Future[void] =
proc closeWait*(rw: AsyncStreamRW): Future[void] {.
async: (raw: true, raises: []).} =
## Close and frees resources of stream ``rw``.
const FutureName =
when rw is AsyncStreamReader:
@ -932,25 +906,20 @@ proc closeWait*(rw: AsyncStreamRW): Future[void] =
else:
"async.stream.writer.closeWait"
if rw.closed():
return Future.completed(FutureName)
let retFuture = Future[void].Raising([]).init(FutureName)
let retFuture = newFuture[void](FutureName, {FutureFlag.OwnCancelSchedule})
if rw.closed():
retFuture.complete()
return retFuture
proc continuation(udata: pointer) {.gcsafe, raises:[].} =
retFuture.complete()
proc cancellation(udata: pointer) {.gcsafe, raises:[].} =
# We are not going to change the state of `retFuture` to cancelled, so we
# will prevent the entire sequence of Futures from being cancelled.
discard
rw.close()
if rw.future.finished():
retFuture.complete()
else:
rw.future.addCallback(continuation, cast[pointer](retFuture))
retFuture.cancelCallback = cancellation
retFuture
proc startReader(rstream: AsyncStreamReader) =

View File

@ -14,6 +14,9 @@
##
## For stream writing it means that you should write exactly bounded size
## of bytes.
{.push raises: [].}
import results
import ../asyncloop, ../timer
import asyncstream, ../transports/stream, ../transports/common
@ -52,7 +55,8 @@ template newBoundedStreamOverflowError(): ref BoundedStreamOverflowError =
newException(BoundedStreamOverflowError, "Stream boundary exceeded")
proc readUntilBoundary(rstream: AsyncStreamReader, pbytes: pointer,
nbytes: int, sep: seq[byte]): Future[int] {.async.} =
nbytes: int, sep: seq[byte]): Future[int] {.
async: (raises: [CancelledError, AsyncStreamError]).} =
doAssert(not(isNil(pbytes)), "pbytes must not be nil")
doAssert(nbytes >= 0, "nbytes must be non-negative value")
checkStreamClosed(rstream)
@ -96,7 +100,7 @@ func endsWith(s, suffix: openArray[byte]): bool =
inc(i)
if i >= len(suffix): return true
proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
var rstream = BoundedStreamReader(stream)
rstream.state = AsyncStreamState.Running
var buffer = newSeq[byte](rstream.buffer.bufferLen())
@ -186,12 +190,16 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} =
break
of AsyncStreamState.Finished:
# Send `EOF` state to the consumer and wait until it will be received.
await rstream.buffer.transfer()
try:
await rstream.buffer.transfer()
except CancelledError:
rstream.state = AsyncStreamState.Error
rstream.error = newBoundedStreamIncompleteError()
break
of AsyncStreamState.Closing, AsyncStreamState.Closed:
break
proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =
proc boundedWriteLoop(stream: AsyncStreamWriter) {.async: (raises: []).} =
var error: ref AsyncStreamError
var wstream = BoundedStreamWriter(stream)
@ -255,7 +263,11 @@ proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} =
doAssert(not(isNil(error)))
while not(wstream.queue.empty()):
let item = wstream.queue.popFirstNoWait()
let item =
try:
wstream.queue.popFirstNoWait()
except AsyncQueueEmptyError:
raiseAssert "AsyncQueue should not be empty at this moment"
if not(item.future.finished()):
item.future.fail(error)

View File

@ -8,6 +8,9 @@
# MIT license (LICENSE-MIT)
## This module implements HTTP/1.1 chunked-encoded stream reading and writing.
{.push raises: [].}
import ../asyncloop, ../timer
import asyncstream, ../transports/stream, ../transports/common
import results
@ -95,7 +98,7 @@ proc setChunkSize(buffer: var openArray[byte], length: int64): int =
buffer[c + 1] = byte(0x0A)
(c + 2)
proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} =
proc chunkedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
var rstream = ChunkedStreamReader(stream)
var buffer = newSeq[byte](MaxChunkHeaderSize)
rstream.state = AsyncStreamState.Running
@ -156,6 +159,10 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} =
if rstream.state == AsyncStreamState.Running:
rstream.state = AsyncStreamState.Error
rstream.error = exc
except AsyncStreamError as exc:
if rstream.state == AsyncStreamState.Running:
rstream.state = AsyncStreamState.Error
rstream.error = exc
if rstream.state != AsyncStreamState.Running:
# We need to notify consumer about error/close, but we do not care about
@ -163,7 +170,7 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} =
rstream.buffer.forget()
break
proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} =
proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async: (raises: []).} =
var wstream = ChunkedStreamWriter(stream)
var buffer: array[16, byte]
var error: ref AsyncStreamError
@ -220,7 +227,11 @@ proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} =
if not(item.future.finished()):
item.future.fail(error)
while not(wstream.queue.empty()):
let pitem = wstream.queue.popFirstNoWait()
let pitem =
try:
wstream.queue.popFirstNoWait()
except AsyncQueueEmptyError:
raiseAssert "AsyncQueue should not be empty at this moment"
if not(pitem.future.finished()):
pitem.future.fail(error)
break

View File

@ -9,6 +9,9 @@
## This module implements Transport Layer Security (TLS) stream. This module
## uses sources of BearSSL <https://www.bearssl.org> by Thomas Pornin.
{.push raises: [].}
import
bearssl/[brssl, ec, errors, pem, rsa, ssl, x509],
bearssl/certs/cacert
@ -71,7 +74,7 @@ type
scontext: ptr SslServerContext
stream*: TLSAsyncStream
handshaked*: bool
handshakeFut*: Future[void]
handshakeFut*: Future[void].Raising([CancelledError, AsyncStreamError])
TLSStreamReader* = ref object of AsyncStreamReader
case kind: TLSStreamKind
@ -81,7 +84,7 @@ type
scontext: ptr SslServerContext
stream*: TLSAsyncStream
handshaked*: bool
handshakeFut*: Future[void]
handshakeFut*: Future[void].Raising([CancelledError, AsyncStreamError])
TLSAsyncStream* = ref object of RootRef
xwc*: X509NoanchorContext
@ -91,7 +94,7 @@ type
x509*: X509MinimalContext
reader*: TLSStreamReader
writer*: TLSStreamWriter
mainLoop*: Future[void]
mainLoop*: Future[void].Raising([])
trustAnchors: TrustAnchorStore
SomeTLSStreamType* = TLSStreamReader|TLSStreamWriter|TLSAsyncStream
@ -101,9 +104,7 @@ type
TLSStreamHandshakeError* = object of TLSStreamError
TLSStreamInitError* = object of TLSStreamError
TLSStreamReadError* = object of TLSStreamError
par*: ref AsyncStreamError
TLSStreamWriteError* = object of TLSStreamError
par*: ref AsyncStreamError
TLSStreamProtocolError* = object of TLSStreamError
errCode*: int
@ -111,7 +112,7 @@ proc newTLSStreamWriteError(p: ref AsyncStreamError): ref TLSStreamWriteError {.
noinline.} =
var w = newException(TLSStreamWriteError, "Write stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
w.par = p
w.parent = p
w
template newTLSStreamProtocolImpl[T](message: T): ref TLSStreamProtocolError =
@ -137,7 +138,8 @@ template newTLSUnexpectedProtocolError(): ref TLSStreamProtocolError =
proc newTLSStreamProtocolError[T](message: T): ref TLSStreamProtocolError =
newTLSStreamProtocolImpl(message)
proc raiseTLSStreamProtocolError[T](message: T) {.noreturn, noinline.} =
proc raiseTLSStreamProtocolError[T](message: T) {.
noreturn, noinline, raises: [TLSStreamProtocolError].} =
raise newTLSStreamProtocolImpl(message)
proc new*(T: typedesc[TrustAnchorStore],
@ -150,7 +152,8 @@ proc new*(T: typedesc[TrustAnchorStore],
TrustAnchorStore(anchors: res)
proc tlsWriteRec(engine: ptr SslEngineContext,
writer: TLSStreamWriter): Future[TLSResult] {.async.} =
writer: TLSStreamWriter): Future[TLSResult] {.
async: (raises: []).} =
try:
var length = 0'u
var buf = sslEngineSendrecBuf(engine[], length)
@ -168,7 +171,8 @@ proc tlsWriteRec(engine: ptr SslEngineContext,
TLSResult.Stopped
proc tlsWriteApp(engine: ptr SslEngineContext,
writer: TLSStreamWriter): Future[TLSResult] {.async.} =
writer: TLSStreamWriter): Future[TLSResult] {.
async: (raises: []).} =
try:
var item = await writer.queue.get()
if item.size > 0:
@ -192,7 +196,10 @@ proc tlsWriteApp(engine: ptr SslEngineContext,
# only part of item and adjust offset.
item.offset = item.offset + int(length)
item.size = item.size - int(length)
writer.queue.addFirstNoWait(item)
try:
writer.queue.addFirstNoWait(item)
except AsyncQueueFullError:
raiseAssert "AsyncQueue should not be full at this moment"
sslEngineSendappAck(engine[], length)
TLSResult.Success
else:
@ -205,7 +212,8 @@ proc tlsWriteApp(engine: ptr SslEngineContext,
TLSResult.Stopped
proc tlsReadRec(engine: ptr SslEngineContext,
reader: TLSStreamReader): Future[TLSResult] {.async.} =
reader: TLSStreamReader): Future[TLSResult] {.
async: (raises: []).} =
try:
var length = 0'u
var buf = sslEngineRecvrecBuf(engine[], length)
@ -226,7 +234,8 @@ proc tlsReadRec(engine: ptr SslEngineContext,
TLSResult.Stopped
proc tlsReadApp(engine: ptr SslEngineContext,
reader: TLSStreamReader): Future[TLSResult] {.async.} =
reader: TLSStreamReader): Future[TLSResult] {.
async: (raises: []).} =
try:
var length = 0'u
var buf = sslEngineRecvappBuf(engine[], length)
@ -240,7 +249,7 @@ proc tlsReadApp(engine: ptr SslEngineContext,
template readAndReset(fut: untyped) =
if fut.finished():
let res = fut.read()
let res = fut.value()
case res
of TLSResult.Success, TLSResult.WriteEof, TLSResult.Stopped:
fut = nil
@ -256,18 +265,6 @@ template readAndReset(fut: untyped) =
loopState = AsyncStreamState.Finished
break
proc cancelAndWait*(a, b, c, d: Future[TLSResult]): Future[void] =
var waiting: seq[FutureBase]
if not(isNil(a)) and not(a.finished()):
waiting.add(a.cancelAndWait())
if not(isNil(b)) and not(b.finished()):
waiting.add(b.cancelAndWait())
if not(isNil(c)) and not(c.finished()):
waiting.add(c.cancelAndWait())
if not(isNil(d)) and not(d.finished()):
waiting.add(d.cancelAndWait())
allFutures(waiting)
proc dumpState*(state: cuint): string =
var res = ""
if (state and SSL_CLOSED) == SSL_CLOSED:
@ -287,10 +284,10 @@ proc dumpState*(state: cuint): string =
res.add("SSL_RECVAPP")
"{" & res & "}"
proc tlsLoop*(stream: TLSAsyncStream) {.async.} =
proc tlsLoop*(stream: TLSAsyncStream) {.async: (raises: []).} =
var
sendRecFut, sendAppFut: Future[TLSResult]
recvRecFut, recvAppFut: Future[TLSResult]
sendRecFut, sendAppFut: Future[TLSResult].Raising([])
recvRecFut, recvAppFut: Future[TLSResult].Raising([])
let engine =
case stream.reader.kind
@ -302,7 +299,7 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} =
var loopState = AsyncStreamState.Running
while true:
var waiting: seq[Future[TLSResult]]
var waiting: seq[Future[TLSResult].Raising([])]
var state = sslEngineCurrentState(engine[])
if (state and SSL_CLOSED) == SSL_CLOSED:
@ -353,6 +350,8 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} =
if len(waiting) > 0:
try:
discard await one(waiting)
except ValueError:
raiseAssert "array should not be empty at this moment"
except CancelledError:
if loopState == AsyncStreamState.Running:
loopState = AsyncStreamState.Stopped
@ -360,8 +359,18 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} =
if loopState != AsyncStreamState.Running:
break
# Cancelling and waiting all the pending operations
await cancelAndWait(sendRecFut, sendAppFut, recvRecFut, recvAppFut)
# Cancelling and waiting and all the pending operations
var pending: seq[FutureBase]
if not(isNil(sendRecFut)) and not(sendRecFut.finished()):
pending.add(sendRecFut.cancelAndWait())
if not(isNil(sendAppFut)) and not(sendAppFut.finished()):
pending.add(sendAppFut.cancelAndWait())
if not(isNil(recvRecFut)) and not(recvRecFut.finished()):
pending.add(recvRecFut.cancelAndWait())
if not(isNil(recvAppFut)) and not(recvAppFut.finished()):
pending.add(recvAppFut.cancelAndWait())
await noCancel(allFutures(pending))
# Calculating error
let error =
case loopState
@ -395,7 +404,11 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} =
if not(isNil(error)):
# Completing all pending writes
while(not(stream.writer.queue.empty())):
let item = stream.writer.queue.popFirstNoWait()
let item =
try:
stream.writer.queue.popFirstNoWait()
except AsyncQueueEmptyError:
raiseAssert "AsyncQueue should not be empty at this moment"
if not(item.future.finished()):
item.future.fail(error)
# Completing handshake
@ -415,18 +428,18 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} =
# Completing readers
stream.reader.buffer.forget()
proc tlsWriteLoop(stream: AsyncStreamWriter) {.async.} =
proc tlsWriteLoop(stream: AsyncStreamWriter) {.async: (raises: []).} =
var wstream = TLSStreamWriter(stream)
wstream.state = AsyncStreamState.Running
await sleepAsync(0.milliseconds)
await noCancel(sleepAsync(0.milliseconds))
if isNil(wstream.stream.mainLoop):
wstream.stream.mainLoop = tlsLoop(wstream.stream)
await wstream.stream.mainLoop
proc tlsReadLoop(stream: AsyncStreamReader) {.async.} =
proc tlsReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} =
var rstream = TLSStreamReader(stream)
rstream.state = AsyncStreamState.Running
await sleepAsync(0.milliseconds)
await noCancel(sleepAsync(0.milliseconds))
if isNil(rstream.stream.mainLoop):
rstream.stream.mainLoop = tlsLoop(rstream.stream)
await rstream.stream.mainLoop
@ -451,7 +464,7 @@ proc newTLSClientAsyncStream*(
maxVersion = TLSVersion.TLS12,
flags: set[TLSFlags] = {},
trustAnchors: SomeTrustAnchorType = MozillaTrustAnchors
): TLSAsyncStream =
): TLSAsyncStream {.raises: [TLSStreamInitError].} =
## Create new TLS asynchronous stream for outbound (client) connections
## using reading stream ``rsource`` and writing stream ``wsource``.
##
@ -541,7 +554,8 @@ proc newTLSServerAsyncStream*(rsource: AsyncStreamReader,
minVersion = TLSVersion.TLS11,
maxVersion = TLSVersion.TLS12,
cache: TLSSessionCache = nil,
flags: set[TLSFlags] = {}): TLSAsyncStream =
flags: set[TLSFlags] = {}): TLSAsyncStream {.
raises: [TLSStreamInitError, TLSStreamProtocolError].} =
## Create new TLS asynchronous stream for inbound (server) connections
## using reading stream ``rsource`` and writing stream ``wsource``.
##
@ -609,10 +623,8 @@ proc newTLSServerAsyncStream*(rsource: AsyncStreamReader,
if err == 0:
raise newException(TLSStreamInitError, "Could not initialize TLS layer")
init(AsyncStreamWriter(res.writer), wsource, tlsWriteLoop,
bufferSize)
init(AsyncStreamReader(res.reader), rsource, tlsReadLoop,
bufferSize)
init(AsyncStreamWriter(res.writer), wsource, tlsWriteLoop, bufferSize)
init(AsyncStreamReader(res.reader), rsource, tlsReadLoop, bufferSize)
res
proc copyKey(src: RsaPrivateKey): TLSPrivateKey =
@ -653,7 +665,8 @@ proc copyKey(src: EcPrivateKey): TLSPrivateKey =
res.eckey.curve = src.curve
res
proc init*(tt: typedesc[TLSPrivateKey], data: openArray[byte]): TLSPrivateKey =
proc init*(tt: typedesc[TLSPrivateKey], data: openArray[byte]): TLSPrivateKey {.
raises: [TLSStreamProtocolError].} =
## Initialize TLS private key from array of bytes ``data``.
##
## This procedure initializes private key using raw, DER-encoded format,
@ -676,7 +689,8 @@ proc init*(tt: typedesc[TLSPrivateKey], data: openArray[byte]): TLSPrivateKey =
raiseTLSStreamProtocolError("Unknown key type (" & $keyType & ")")
res
proc pemDecode*(data: openArray[char]): seq[PEMElement] =
proc pemDecode*(data: openArray[char]): seq[PEMElement] {.
raises: [TLSStreamProtocolError].} =
## Decode PEM encoded string and get array of binary blobs.
if len(data) == 0:
raiseTLSStreamProtocolError("Empty PEM message")
@ -717,7 +731,8 @@ proc pemDecode*(data: openArray[char]): seq[PEMElement] =
raiseTLSStreamProtocolError("Invalid PEM encoding")
res
proc init*(tt: typedesc[TLSPrivateKey], data: openArray[char]): TLSPrivateKey =
proc init*(tt: typedesc[TLSPrivateKey], data: openArray[char]): TLSPrivateKey {.
raises: [TLSStreamProtocolError].} =
## Initialize TLS private key from string ``data``.
##
## This procedure initializes private key using unencrypted PKCS#8 PEM
@ -735,7 +750,8 @@ proc init*(tt: typedesc[TLSPrivateKey], data: openArray[char]): TLSPrivateKey =
res
proc init*(tt: typedesc[TLSCertificate],
data: openArray[char]): TLSCertificate =
data: openArray[char]): TLSCertificate {.
raises: [TLSStreamProtocolError].} =
## Initialize TLS certificates from string ``data``.
##
## This procedure initializes array of certificates from PEM encoded string.
@ -770,9 +786,11 @@ proc init*(tt: typedesc[TLSSessionCache], size: int = 4096): TLSSessionCache =
sslSessionCacheLruInit(addr res.context, addr res.storage[0], rsize)
res
proc handshake*(rws: SomeTLSStreamType): Future[void] =
proc handshake*(rws: SomeTLSStreamType): Future[void] {.
async: (raw: true, raises: [CancelledError, AsyncStreamError]).} =
## Wait until initial TLS handshake will be successfully performed.
var retFuture = newFuture[void]("tlsstream.handshake")
let retFuture = Future[void].Raising([CancelledError, AsyncStreamError])
.init("tlsstream.handshake")
when rws is TLSStreamReader:
if rws.handshaked:
retFuture.complete()

View File

@ -140,7 +140,7 @@ type
# transport for new client
proc remoteAddress*(transp: StreamTransport): TransportAddress {.
raises: [TransportAbortedError, TransportTooManyError, TransportOsError].} =
raises: [TransportOsError].} =
## Returns ``transp`` remote socket address.
doAssert(transp.kind == TransportKind.Socket, "Socket transport required!")
if transp.remote.family == AddressFamily.None:
@ -148,12 +148,12 @@ proc remoteAddress*(transp: StreamTransport): TransportAddress {.
var slen = SockLen(sizeof(saddr))
if getpeername(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr),
addr slen) != 0:
raiseTransportError(osLastError())
raiseTransportOsError(osLastError())
fromSAddr(addr saddr, slen, transp.remote)
transp.remote
proc localAddress*(transp: StreamTransport): TransportAddress {.
raises: [TransportAbortedError, TransportTooManyError, TransportOsError].} =
raises: [TransportOsError].} =
## Returns ``transp`` local socket address.
doAssert(transp.kind == TransportKind.Socket, "Socket transport required!")
if transp.local.family == AddressFamily.None:
@ -161,7 +161,7 @@ proc localAddress*(transp: StreamTransport): TransportAddress {.
var slen = SockLen(sizeof(saddr))
if getsockname(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr),
addr slen) != 0:
raiseTransportError(osLastError())
raiseTransportOsError(osLastError())
fromSAddr(addr saddr, slen, transp.local)
transp.local