diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index 9bab1fd6..f77d5fe5 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -165,7 +165,7 @@ proc newAsyncEvent*(): AsyncEvent = AsyncEvent() proc wait*(event: AsyncEvent): Future[void] {. - async: (raw: true, raises: [CancelledError]).} = + async: (raw: true, raises: [CancelledError]).} = ## Block until the internal flag of ``event`` is `true`. ## If the internal flag is `true` on entry, return immediately. Otherwise, ## block until another task calls `fire()` to set the flag to `true`, @@ -258,7 +258,7 @@ proc popLastImpl[T](aq: AsyncQueue[T]): T = res proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {. - raises: [AsyncQueueFullError].}= + raises: [AsyncQueueFullError].} = ## Put an item ``item`` to the beginning of the queue ``aq`` immediately. ## ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. @@ -267,7 +267,7 @@ proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {. aq.addFirstImpl(item) proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {. - raises: [AsyncQueueFullError].}= + raises: [AsyncQueueFullError].} = ## Put an item ``item`` at the end of the queue ``aq`` immediately. ## ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. @@ -276,7 +276,7 @@ proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {. aq.addLastImpl(item) proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {. - raises: [AsyncQueueEmptyError].} = + raises: [AsyncQueueEmptyError].} = ## Get an item from the beginning of the queue ``aq`` immediately. ## ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. @@ -285,7 +285,7 @@ proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {. aq.popFirstImpl() proc popLastNoWait*[T](aq: AsyncQueue[T]): T {. - raises: [AsyncQueueEmptyError].} = + raises: [AsyncQueueEmptyError].} = ## Get an item from the end of the queue ``aq`` immediately. ## ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. @@ -293,11 +293,13 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T {. raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") aq.popLastImpl() -proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]).} = +proc addFirst*[T](aq: AsyncQueue[T], item: T) {. + async: (raises: [CancelledError]).} = ## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full, ## wait until a free slot is available before adding item. while aq.full(): - let putter = Future[void].Raising([CancelledError]).init("AsyncQueue.addFirst") + let putter = + Future[void].Raising([CancelledError]).init("AsyncQueue.addFirst") aq.putters.add(putter) try: await putter @@ -307,11 +309,13 @@ proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError] raise exc aq.addFirstImpl(item) -proc addLast*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]).} = +proc addLast*[T](aq: AsyncQueue[T], item: T) {. + async: (raises: [CancelledError]).} = ## Put an ``item`` to the end of the queue ``aq``. If the queue is full, ## wait until a free slot is available before adding item. while aq.full(): - let putter = Future[void].Raising([CancelledError]).init("AsyncQueue.addLast") + let putter = + Future[void].Raising([CancelledError]).init("AsyncQueue.addLast") aq.putters.add(putter) try: await putter @@ -321,11 +325,13 @@ proc addLast*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]) raise exc aq.addLastImpl(item) -proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError]).} = +proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {. + async: (raises: [CancelledError]).} = ## Remove and return an ``item`` from the beginning of the queue ``aq``. ## If the queue is empty, wait until an item is available. while aq.empty(): - let getter = Future[void].Raising([CancelledError]).init("AsyncQueue.popFirst") + let getter = + Future[void].Raising([CancelledError]).init("AsyncQueue.popFirst") aq.getters.add(getter) try: await getter @@ -335,11 +341,13 @@ proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledErro raise exc aq.popFirstImpl() -proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError]).} = +proc popLast*[T](aq: AsyncQueue[T]): Future[T] {. + async: (raises: [CancelledError]).} = ## Remove and return an ``item`` from the end of the queue ``aq``. ## If the queue is empty, wait until an item is available. while aq.empty(): - let getter = Future[void].Raising([CancelledError]).init("AsyncQueue.popLast") + let getter = + Future[void].Raising([CancelledError]).init("AsyncQueue.popLast") aq.getters.add(getter) try: await getter @@ -350,22 +358,22 @@ proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError aq.popLastImpl() proc putNoWait*[T](aq: AsyncQueue[T], item: T) {. - raises: [AsyncQueueFullError].} = + raises: [AsyncQueueFullError].} = ## Alias of ``addLastNoWait()``. aq.addLastNoWait(item) proc getNoWait*[T](aq: AsyncQueue[T]): T {. - raises: [AsyncQueueEmptyError].} = + raises: [AsyncQueueEmptyError].} = ## Alias of ``popFirstNoWait()``. aq.popFirstNoWait() proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {. - async: (raw: true, raises: [CancelledError]).} = + async: (raw: true, raises: [CancelledError]).} = ## Alias of ``addLast()``. aq.addLast(item) proc get*[T](aq: AsyncQueue[T]): Future[T] {. - async: (raw: true, raises: [CancelledError]).} = + async: (raw: true, raises: [CancelledError]).} = ## Alias of ``popFirst()``. aq.popFirst() @@ -509,7 +517,8 @@ proc close*(ab: AsyncEventQueue) {.raises: [].} = ab.readers.reset() ab.queue.clear() -proc closeWait*(ab: AsyncEventQueue): Future[void] {.async: (raw: true, raises: []).} = +proc closeWait*(ab: AsyncEventQueue): Future[void] {. + async: (raw: true, raises: []).} = let retFuture = newFuture[void]("AsyncEventQueue.closeWait()", {FutureFlag.OwnCancelSchedule}) proc continuation(udata: pointer) {.gcsafe.} = @@ -568,7 +577,7 @@ proc emit*[T](ab: AsyncEventQueue[T], data: T) = proc waitEvents*[T](ab: AsyncEventQueue[T], key: EventQueueKey, eventsCount = -1): Future[seq[T]] {. - async: (raises: [AsyncEventQueueFullError, CancelledError]).} = + async: (raises: [AsyncEventQueueFullError, CancelledError]).} = ## Wait for events var events: seq[T] diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index 4698e835..a5210847 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -24,15 +24,13 @@ const ## AsyncStreamWriter leaks tracker name type - AsyncStreamError* = object of CatchableError + AsyncStreamError* = object of AsyncError AsyncStreamIncorrectDefect* = object of Defect AsyncStreamIncompleteError* = object of AsyncStreamError AsyncStreamLimitError* = object of AsyncStreamError AsyncStreamUseClosedError* = object of AsyncStreamError AsyncStreamReadError* = object of AsyncStreamError - par*: ref CatchableError AsyncStreamWriteError* = object of AsyncStreamError - par*: ref CatchableError AsyncStreamWriteEOFError* = object of AsyncStreamWriteError AsyncBuffer* = object @@ -53,7 +51,7 @@ type dataStr*: string size*: int offset*: int - future*: Future[void] + future*: Future[void].Raising([CancelledError, AsyncStreamError]) AsyncStreamState* = enum Running, ## Stream is online and working @@ -64,10 +62,10 @@ type Closed ## Stream was closed StreamReaderLoop* = proc (stream: AsyncStreamReader): Future[void] {. - gcsafe, raises: [].} + async: (raises: []).} ## Main read loop for read streams. StreamWriterLoop* = proc (stream: AsyncStreamWriter): Future[void] {. - gcsafe, raises: [].} + async: (raises: []).} ## Main write loop for write streams. AsyncStreamReader* = ref object of RootRef @@ -124,12 +122,12 @@ proc `[]`*(sb: AsyncBuffer, index: int): byte {.inline.} = proc update*(sb: var AsyncBuffer, size: int) {.inline.} = sb.offset += size -proc wait*(sb: var AsyncBuffer): Future[void] = +template wait*(sb: var AsyncBuffer): untyped = sb.events[0].clear() sb.events[1].fire() sb.events[0].wait() -proc transfer*(sb: var AsyncBuffer): Future[void] = +template transfer*(sb: var AsyncBuffer): untyped = sb.events[1].clear() sb.events[0].fire() sb.events[1].wait() @@ -150,7 +148,8 @@ proc copyData*(sb: AsyncBuffer, dest: pointer, offset, length: int) {.inline.} = unsafeAddr sb.buffer[0], length) proc upload*(sb: ptr AsyncBuffer, pbytes: ptr byte, - nbytes: int): Future[void] {.async.} = + nbytes: int): Future[void] {. + async: (raises: [CancelledError]).} = ## You can upload any amount of bytes to the buffer. If size of internal ## buffer is not enough to fit all the data at once, data will be uploaded ## via chunks of size up to internal buffer size. @@ -186,18 +185,20 @@ template copyOut*(dest: pointer, item: WriteItem, length: int) = elif item.kind == String: copyMem(dest, unsafeAddr item.dataStr[item.offset], length) -proc newAsyncStreamReadError(p: ref CatchableError): ref AsyncStreamReadError {. - noinline.} = +proc newAsyncStreamReadError( + p: ref TransportError + ): ref AsyncStreamReadError {.noinline.} = var w = newException(AsyncStreamReadError, "Read stream failed") w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg - w.par = p + w.parent = p w -proc newAsyncStreamWriteError(p: ref CatchableError): ref AsyncStreamWriteError {. - noinline.} = +proc newAsyncStreamWriteError( + p: ref TransportError + ): ref AsyncStreamWriteError {.noinline.} = var w = newException(AsyncStreamWriteError, "Write stream failed") w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg - w.par = p + w.parent = p w proc newAsyncStreamIncompleteError*(): ref AsyncStreamIncompleteError {. @@ -344,7 +345,8 @@ template readLoop(body: untyped): untyped = await rstream.buffer.wait() proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, - nbytes: int) {.async.} = + nbytes: int) {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Read exactly ``nbytes`` bytes from read-only stream ``rstream`` and store ## it to ``pbytes``. ## @@ -365,7 +367,7 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, raise exc except TransportIncompleteError: raise newAsyncStreamIncompleteError() - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamReadError(exc) else: if isNil(rstream.readerLoop): @@ -384,7 +386,8 @@ proc readExactly*(rstream: AsyncStreamReader, pbytes: pointer, (consumed: count, done: index == nbytes) proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, - nbytes: int): Future[int] {.async.} = + nbytes: int): Future[int] {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Perform one read operation on read-only stream ``rstream``. ## ## If internal buffer is not empty, ``nbytes`` bytes will be transferred from @@ -398,7 +401,7 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, return await readOnce(rstream.tsource, pbytes, nbytes) except CancelledError as exc: raise exc - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamReadError(exc) else: if isNil(rstream.readerLoop): @@ -415,7 +418,8 @@ proc readOnce*(rstream: AsyncStreamReader, pbytes: pointer, return count proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, - sep: seq[byte]): Future[int] {.async.} = + sep: seq[byte]): Future[int] {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Read data from the read-only stream ``rstream`` until separator ``sep`` is ## found. ## @@ -446,7 +450,7 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, raise newAsyncStreamIncompleteError() except TransportLimitError: raise newAsyncStreamLimitError() - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamReadError(exc) else: if isNil(rstream.readerLoop): @@ -476,7 +480,8 @@ proc readUntil*(rstream: AsyncStreamReader, pbytes: pointer, nbytes: int, return k proc readLine*(rstream: AsyncStreamReader, limit = 0, - sep = "\r\n"): Future[string] {.async.} = + sep = "\r\n"): Future[string] {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Read one line from read-only stream ``rstream``, where ``"line"`` is a ## sequence of bytes ending with ``sep`` (default is ``"\r\n"``). ## @@ -495,7 +500,7 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0, return await readLine(rstream.tsource, limit, sep) except CancelledError as exc: raise exc - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamReadError(exc) else: if isNil(rstream.readerLoop): @@ -530,7 +535,8 @@ proc readLine*(rstream: AsyncStreamReader, limit = 0, (index, (state == len(sep)) or (lim == len(res))) return res -proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.async.} = +proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Read all bytes from read-only stream ``rstream``. ## ## This procedure allocates buffer seq[byte] and return it as result. @@ -543,7 +549,7 @@ proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.async.} = raise exc except TransportLimitError: raise newAsyncStreamLimitError() - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamReadError(exc) else: if isNil(rstream.readerLoop): @@ -559,7 +565,8 @@ proc read*(rstream: AsyncStreamReader): Future[seq[byte]] {.async.} = (count, false) return res -proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.async.} = +proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Read all bytes (n <= 0) or exactly `n` bytes from read-only stream ## ``rstream``. ## @@ -571,7 +578,7 @@ proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.async.} = return await read(rstream.tsource, n) except CancelledError as exc: raise exc - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamReadError(exc) else: if isNil(rstream.readerLoop): @@ -590,7 +597,8 @@ proc read*(rstream: AsyncStreamReader, n: int): Future[seq[byte]] {.async.} = (count, len(res) == n) return res -proc consume*(rstream: AsyncStreamReader): Future[int] {.async.} = +proc consume*(rstream: AsyncStreamReader): Future[int] {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Consume (discard) all bytes from read-only stream ``rstream``. ## ## Return number of bytes actually consumed (discarded). @@ -603,7 +611,7 @@ proc consume*(rstream: AsyncStreamReader): Future[int] {.async.} = raise exc except TransportLimitError: raise newAsyncStreamLimitError() - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamReadError(exc) else: if isNil(rstream.readerLoop): @@ -618,7 +626,8 @@ proc consume*(rstream: AsyncStreamReader): Future[int] {.async.} = (rstream.buffer.dataLen(), false) return res -proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.async.} = +proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Consume (discard) all bytes (n <= 0) or ``n`` bytes from read-only stream ## ``rstream``. ## @@ -632,7 +641,7 @@ proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.async.} = raise exc except TransportLimitError: raise newAsyncStreamLimitError() - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamReadError(exc) else: if isNil(rstream.readerLoop): @@ -652,7 +661,7 @@ proc consume*(rstream: AsyncStreamReader, n: int): Future[int] {.async.} = return res proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {. - async.} = + async: (raises: [CancelledError, AsyncStreamError]).} = ## Read all bytes from stream ``rstream`` until ``predicate`` callback ## will not be satisfied. ## @@ -673,7 +682,7 @@ proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {. await readMessage(rstream.tsource, pred) except CancelledError as exc: raise exc - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamReadError(exc) else: if isNil(rstream.readerLoop): @@ -691,7 +700,8 @@ proc readMessage*(rstream: AsyncStreamReader, pred: ReadMessagePredicate) {. pred(rstream.buffer.buffer.toOpenArray(0, count - 1)) proc write*(wstream: AsyncStreamWriter, pbytes: pointer, - nbytes: int) {.async.} = + nbytes: int) {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Write sequence of bytes pointed by ``pbytes`` of length ``nbytes`` to ## writer stream ``wstream``. ## @@ -708,9 +718,7 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, res = await write(wstream.tsource, pbytes, nbytes) except CancelledError as exc: raise exc - except AsyncStreamError as exc: - raise exc - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamWriteError(exc) if res != nbytes: raise newAsyncStreamIncompleteError() @@ -720,23 +728,17 @@ proc write*(wstream: AsyncStreamWriter, pbytes: pointer, await write(wstream.wsource, pbytes, nbytes) wstream.bytesCount = wstream.bytesCount + uint64(nbytes) else: - var item = WriteItem(kind: Pointer) - item.dataPtr = pbytes - item.size = nbytes - item.future = newFuture[void]("async.stream.write(pointer)") - try: - await wstream.queue.put(item) - await item.future - wstream.bytesCount = wstream.bytesCount + uint64(item.size) - except CancelledError as exc: - raise exc - except AsyncStreamError as exc: - raise exc - except CatchableError as exc: - raise newAsyncStreamWriteError(exc) + let item = WriteItem( + kind: Pointer, dataPtr: pbytes, size: nbytes, + future: Future[void].Raising([CancelledError, AsyncStreamError]) + .init("async.stream.write(pointer)")) + await wstream.queue.put(item) + await item.future + wstream.bytesCount = wstream.bytesCount + uint64(item.size) proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte], - msglen = -1) {.async.} = + msglen = -1) {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer ## stream ``wstream``. ## @@ -758,7 +760,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte], res = await write(wstream.tsource, sbytes, length) except CancelledError as exc: raise exc - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamWriteError(exc) if res != length: raise newAsyncStreamIncompleteError() @@ -768,29 +770,17 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink seq[byte], await write(wstream.wsource, sbytes, length) wstream.bytesCount = wstream.bytesCount + uint64(length) else: - var item = WriteItem(kind: Sequence) - when declared(shallowCopy): - if not(isLiteral(sbytes)): - shallowCopy(item.dataSeq, sbytes) - else: - item.dataSeq = sbytes - else: - item.dataSeq = sbytes - item.size = length - item.future = newFuture[void]("async.stream.write(seq)") - try: - await wstream.queue.put(item) - await item.future - wstream.bytesCount = wstream.bytesCount + uint64(item.size) - except CancelledError as exc: - raise exc - except AsyncStreamError as exc: - raise exc - except CatchableError as exc: - raise newAsyncStreamWriteError(exc) + let item = WriteItem( + kind: Sequence, dataSeq: move(sbytes), size: length, + future: Future[void].Raising([CancelledError, AsyncStreamError]) + .init("async.stream.write(seq)")) + await wstream.queue.put(item) + await item.future + wstream.bytesCount = wstream.bytesCount + uint64(item.size) proc write*(wstream: AsyncStreamWriter, sbytes: sink string, - msglen = -1) {.async.} = + msglen = -1) {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``. ## ## String ``sbytes`` must not be zero-length. @@ -811,7 +801,7 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink string, res = await write(wstream.tsource, sbytes, length) except CancelledError as exc: raise exc - except CatchableError as exc: + except TransportError as exc: raise newAsyncStreamWriteError(exc) if res != length: raise newAsyncStreamIncompleteError() @@ -821,28 +811,16 @@ proc write*(wstream: AsyncStreamWriter, sbytes: sink string, await write(wstream.wsource, sbytes, length) wstream.bytesCount = wstream.bytesCount + uint64(length) else: - var item = WriteItem(kind: String) - when declared(shallowCopy): - if not(isLiteral(sbytes)): - shallowCopy(item.dataStr, sbytes) - else: - item.dataStr = sbytes - else: - item.dataStr = sbytes - item.size = length - item.future = newFuture[void]("async.stream.write(string)") - try: - await wstream.queue.put(item) - await item.future - wstream.bytesCount = wstream.bytesCount + uint64(item.size) - except CancelledError as exc: - raise exc - except AsyncStreamError as exc: - raise exc - except CatchableError as exc: - raise newAsyncStreamWriteError(exc) + let item = WriteItem( + kind: String, dataStr: move(sbytes), size: length, + future: Future[void].Raising([CancelledError, AsyncStreamError]) + .init("async.stream.write(string)")) + await wstream.queue.put(item) + await item.future + wstream.bytesCount = wstream.bytesCount + uint64(item.size) -proc finish*(wstream: AsyncStreamWriter) {.async.} = +proc finish*(wstream: AsyncStreamWriter) {. + async: (raises: [CancelledError, AsyncStreamError]).} = ## Finish write stream ``wstream``. checkStreamClosed(wstream) # For AsyncStreamWriter Finished state could be set manually or by stream's @@ -852,20 +830,15 @@ proc finish*(wstream: AsyncStreamWriter) {.async.} = if isNil(wstream.writerLoop): await wstream.wsource.finish() else: - var item = WriteItem(kind: Pointer) - item.size = 0 - item.future = newFuture[void]("async.stream.finish") - try: - await wstream.queue.put(item) - await item.future - except CancelledError as exc: - raise exc - except AsyncStreamError as exc: - raise exc - except CatchableError as exc: - raise newAsyncStreamWriteError(exc) + let item = WriteItem( + kind: Pointer, size: 0, + future: Future[void].Raising([CancelledError, AsyncStreamError]) + .init("async.stream.finish")) + await wstream.queue.put(item) + await item.future -proc join*(rw: AsyncStreamRW): Future[void] = +proc join*(rw: AsyncStreamRW): Future[void] {. + async: (raw: true, raises: [CancelledError]).} = ## Get Future[void] which will be completed when stream become finished or ## closed. when rw is AsyncStreamReader: @@ -924,7 +897,8 @@ proc close*(rw: AsyncStreamRW) = rw.future.addCallback(continuation) rw.future.cancelSoon() -proc closeWait*(rw: AsyncStreamRW): Future[void] = +proc closeWait*(rw: AsyncStreamRW): Future[void] {. + async: (raw: true, raises: []).} = ## Close and frees resources of stream ``rw``. const FutureName = when rw is AsyncStreamReader: @@ -932,25 +906,20 @@ proc closeWait*(rw: AsyncStreamRW): Future[void] = else: "async.stream.writer.closeWait" - if rw.closed(): - return Future.completed(FutureName) + let retFuture = Future[void].Raising([]).init(FutureName) - let retFuture = newFuture[void](FutureName, {FutureFlag.OwnCancelSchedule}) + if rw.closed(): + retFuture.complete() + return retFuture proc continuation(udata: pointer) {.gcsafe, raises:[].} = retFuture.complete() - proc cancellation(udata: pointer) {.gcsafe, raises:[].} = - # We are not going to change the state of `retFuture` to cancelled, so we - # will prevent the entire sequence of Futures from being cancelled. - discard - rw.close() if rw.future.finished(): retFuture.complete() else: rw.future.addCallback(continuation, cast[pointer](retFuture)) - retFuture.cancelCallback = cancellation retFuture proc startReader(rstream: AsyncStreamReader) = diff --git a/chronos/streams/boundstream.nim b/chronos/streams/boundstream.nim index dbb36ef0..ce695719 100644 --- a/chronos/streams/boundstream.nim +++ b/chronos/streams/boundstream.nim @@ -14,6 +14,9 @@ ## ## For stream writing it means that you should write exactly bounded size ## of bytes. + +{.push raises: [].} + import results import ../asyncloop, ../timer import asyncstream, ../transports/stream, ../transports/common @@ -52,7 +55,8 @@ template newBoundedStreamOverflowError(): ref BoundedStreamOverflowError = newException(BoundedStreamOverflowError, "Stream boundary exceeded") proc readUntilBoundary(rstream: AsyncStreamReader, pbytes: pointer, - nbytes: int, sep: seq[byte]): Future[int] {.async.} = + nbytes: int, sep: seq[byte]): Future[int] {. + async: (raises: [CancelledError, AsyncStreamError]).} = doAssert(not(isNil(pbytes)), "pbytes must not be nil") doAssert(nbytes >= 0, "nbytes must be non-negative value") checkStreamClosed(rstream) @@ -96,7 +100,7 @@ func endsWith(s, suffix: openArray[byte]): bool = inc(i) if i >= len(suffix): return true -proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = +proc boundedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} = var rstream = BoundedStreamReader(stream) rstream.state = AsyncStreamState.Running var buffer = newSeq[byte](rstream.buffer.bufferLen()) @@ -186,12 +190,16 @@ proc boundedReadLoop(stream: AsyncStreamReader) {.async.} = break of AsyncStreamState.Finished: # Send `EOF` state to the consumer and wait until it will be received. - await rstream.buffer.transfer() + try: + await rstream.buffer.transfer() + except CancelledError: + rstream.state = AsyncStreamState.Error + rstream.error = newBoundedStreamIncompleteError() break of AsyncStreamState.Closing, AsyncStreamState.Closed: break -proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} = +proc boundedWriteLoop(stream: AsyncStreamWriter) {.async: (raises: []).} = var error: ref AsyncStreamError var wstream = BoundedStreamWriter(stream) @@ -255,7 +263,11 @@ proc boundedWriteLoop(stream: AsyncStreamWriter) {.async.} = doAssert(not(isNil(error))) while not(wstream.queue.empty()): - let item = wstream.queue.popFirstNoWait() + let item = + try: + wstream.queue.popFirstNoWait() + except AsyncQueueEmptyError: + raiseAssert "AsyncQueue should not be empty at this moment" if not(item.future.finished()): item.future.fail(error) diff --git a/chronos/streams/chunkstream.nim b/chronos/streams/chunkstream.nim index c0269a2a..77392076 100644 --- a/chronos/streams/chunkstream.nim +++ b/chronos/streams/chunkstream.nim @@ -8,6 +8,9 @@ # MIT license (LICENSE-MIT) ## This module implements HTTP/1.1 chunked-encoded stream reading and writing. + +{.push raises: [].} + import ../asyncloop, ../timer import asyncstream, ../transports/stream, ../transports/common import results @@ -95,7 +98,7 @@ proc setChunkSize(buffer: var openArray[byte], length: int64): int = buffer[c + 1] = byte(0x0A) (c + 2) -proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} = +proc chunkedReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} = var rstream = ChunkedStreamReader(stream) var buffer = newSeq[byte](MaxChunkHeaderSize) rstream.state = AsyncStreamState.Running @@ -156,6 +159,10 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} = if rstream.state == AsyncStreamState.Running: rstream.state = AsyncStreamState.Error rstream.error = exc + except AsyncStreamError as exc: + if rstream.state == AsyncStreamState.Running: + rstream.state = AsyncStreamState.Error + rstream.error = exc if rstream.state != AsyncStreamState.Running: # We need to notify consumer about error/close, but we do not care about @@ -163,7 +170,7 @@ proc chunkedReadLoop(stream: AsyncStreamReader) {.async.} = rstream.buffer.forget() break -proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} = +proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async: (raises: []).} = var wstream = ChunkedStreamWriter(stream) var buffer: array[16, byte] var error: ref AsyncStreamError @@ -220,7 +227,11 @@ proc chunkedWriteLoop(stream: AsyncStreamWriter) {.async.} = if not(item.future.finished()): item.future.fail(error) while not(wstream.queue.empty()): - let pitem = wstream.queue.popFirstNoWait() + let pitem = + try: + wstream.queue.popFirstNoWait() + except AsyncQueueEmptyError: + raiseAssert "AsyncQueue should not be empty at this moment" if not(pitem.future.finished()): pitem.future.fail(error) break diff --git a/chronos/streams/tlsstream.nim b/chronos/streams/tlsstream.nim index 0c8efb94..26f2babe 100644 --- a/chronos/streams/tlsstream.nim +++ b/chronos/streams/tlsstream.nim @@ -9,6 +9,9 @@ ## This module implements Transport Layer Security (TLS) stream. This module ## uses sources of BearSSL by Thomas Pornin. + +{.push raises: [].} + import bearssl/[brssl, ec, errors, pem, rsa, ssl, x509], bearssl/certs/cacert @@ -71,7 +74,7 @@ type scontext: ptr SslServerContext stream*: TLSAsyncStream handshaked*: bool - handshakeFut*: Future[void] + handshakeFut*: Future[void].Raising([CancelledError, AsyncStreamError]) TLSStreamReader* = ref object of AsyncStreamReader case kind: TLSStreamKind @@ -81,7 +84,7 @@ type scontext: ptr SslServerContext stream*: TLSAsyncStream handshaked*: bool - handshakeFut*: Future[void] + handshakeFut*: Future[void].Raising([CancelledError, AsyncStreamError]) TLSAsyncStream* = ref object of RootRef xwc*: X509NoanchorContext @@ -91,7 +94,7 @@ type x509*: X509MinimalContext reader*: TLSStreamReader writer*: TLSStreamWriter - mainLoop*: Future[void] + mainLoop*: Future[void].Raising([]) trustAnchors: TrustAnchorStore SomeTLSStreamType* = TLSStreamReader|TLSStreamWriter|TLSAsyncStream @@ -101,9 +104,7 @@ type TLSStreamHandshakeError* = object of TLSStreamError TLSStreamInitError* = object of TLSStreamError TLSStreamReadError* = object of TLSStreamError - par*: ref AsyncStreamError TLSStreamWriteError* = object of TLSStreamError - par*: ref AsyncStreamError TLSStreamProtocolError* = object of TLSStreamError errCode*: int @@ -111,7 +112,7 @@ proc newTLSStreamWriteError(p: ref AsyncStreamError): ref TLSStreamWriteError {. noinline.} = var w = newException(TLSStreamWriteError, "Write stream failed") w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg - w.par = p + w.parent = p w template newTLSStreamProtocolImpl[T](message: T): ref TLSStreamProtocolError = @@ -137,7 +138,8 @@ template newTLSUnexpectedProtocolError(): ref TLSStreamProtocolError = proc newTLSStreamProtocolError[T](message: T): ref TLSStreamProtocolError = newTLSStreamProtocolImpl(message) -proc raiseTLSStreamProtocolError[T](message: T) {.noreturn, noinline.} = +proc raiseTLSStreamProtocolError[T](message: T) {. + noreturn, noinline, raises: [TLSStreamProtocolError].} = raise newTLSStreamProtocolImpl(message) proc new*(T: typedesc[TrustAnchorStore], @@ -150,7 +152,8 @@ proc new*(T: typedesc[TrustAnchorStore], TrustAnchorStore(anchors: res) proc tlsWriteRec(engine: ptr SslEngineContext, - writer: TLSStreamWriter): Future[TLSResult] {.async.} = + writer: TLSStreamWriter): Future[TLSResult] {. + async: (raises: []).} = try: var length = 0'u var buf = sslEngineSendrecBuf(engine[], length) @@ -168,7 +171,8 @@ proc tlsWriteRec(engine: ptr SslEngineContext, TLSResult.Stopped proc tlsWriteApp(engine: ptr SslEngineContext, - writer: TLSStreamWriter): Future[TLSResult] {.async.} = + writer: TLSStreamWriter): Future[TLSResult] {. + async: (raises: []).} = try: var item = await writer.queue.get() if item.size > 0: @@ -192,7 +196,10 @@ proc tlsWriteApp(engine: ptr SslEngineContext, # only part of item and adjust offset. item.offset = item.offset + int(length) item.size = item.size - int(length) - writer.queue.addFirstNoWait(item) + try: + writer.queue.addFirstNoWait(item) + except AsyncQueueFullError: + raiseAssert "AsyncQueue should not be full at this moment" sslEngineSendappAck(engine[], length) TLSResult.Success else: @@ -205,7 +212,8 @@ proc tlsWriteApp(engine: ptr SslEngineContext, TLSResult.Stopped proc tlsReadRec(engine: ptr SslEngineContext, - reader: TLSStreamReader): Future[TLSResult] {.async.} = + reader: TLSStreamReader): Future[TLSResult] {. + async: (raises: []).} = try: var length = 0'u var buf = sslEngineRecvrecBuf(engine[], length) @@ -226,7 +234,8 @@ proc tlsReadRec(engine: ptr SslEngineContext, TLSResult.Stopped proc tlsReadApp(engine: ptr SslEngineContext, - reader: TLSStreamReader): Future[TLSResult] {.async.} = + reader: TLSStreamReader): Future[TLSResult] {. + async: (raises: []).} = try: var length = 0'u var buf = sslEngineRecvappBuf(engine[], length) @@ -240,7 +249,7 @@ proc tlsReadApp(engine: ptr SslEngineContext, template readAndReset(fut: untyped) = if fut.finished(): - let res = fut.read() + let res = fut.value() case res of TLSResult.Success, TLSResult.WriteEof, TLSResult.Stopped: fut = nil @@ -256,18 +265,6 @@ template readAndReset(fut: untyped) = loopState = AsyncStreamState.Finished break -proc cancelAndWait*(a, b, c, d: Future[TLSResult]): Future[void] = - var waiting: seq[FutureBase] - if not(isNil(a)) and not(a.finished()): - waiting.add(a.cancelAndWait()) - if not(isNil(b)) and not(b.finished()): - waiting.add(b.cancelAndWait()) - if not(isNil(c)) and not(c.finished()): - waiting.add(c.cancelAndWait()) - if not(isNil(d)) and not(d.finished()): - waiting.add(d.cancelAndWait()) - allFutures(waiting) - proc dumpState*(state: cuint): string = var res = "" if (state and SSL_CLOSED) == SSL_CLOSED: @@ -287,10 +284,10 @@ proc dumpState*(state: cuint): string = res.add("SSL_RECVAPP") "{" & res & "}" -proc tlsLoop*(stream: TLSAsyncStream) {.async.} = +proc tlsLoop*(stream: TLSAsyncStream) {.async: (raises: []).} = var - sendRecFut, sendAppFut: Future[TLSResult] - recvRecFut, recvAppFut: Future[TLSResult] + sendRecFut, sendAppFut: Future[TLSResult].Raising([]) + recvRecFut, recvAppFut: Future[TLSResult].Raising([]) let engine = case stream.reader.kind @@ -302,7 +299,7 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} = var loopState = AsyncStreamState.Running while true: - var waiting: seq[Future[TLSResult]] + var waiting: seq[Future[TLSResult].Raising([])] var state = sslEngineCurrentState(engine[]) if (state and SSL_CLOSED) == SSL_CLOSED: @@ -353,6 +350,8 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} = if len(waiting) > 0: try: discard await one(waiting) + except ValueError: + raiseAssert "array should not be empty at this moment" except CancelledError: if loopState == AsyncStreamState.Running: loopState = AsyncStreamState.Stopped @@ -360,8 +359,18 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} = if loopState != AsyncStreamState.Running: break - # Cancelling and waiting all the pending operations - await cancelAndWait(sendRecFut, sendAppFut, recvRecFut, recvAppFut) + # Cancelling and waiting and all the pending operations + var pending: seq[FutureBase] + if not(isNil(sendRecFut)) and not(sendRecFut.finished()): + pending.add(sendRecFut.cancelAndWait()) + if not(isNil(sendAppFut)) and not(sendAppFut.finished()): + pending.add(sendAppFut.cancelAndWait()) + if not(isNil(recvRecFut)) and not(recvRecFut.finished()): + pending.add(recvRecFut.cancelAndWait()) + if not(isNil(recvAppFut)) and not(recvAppFut.finished()): + pending.add(recvAppFut.cancelAndWait()) + await noCancel(allFutures(pending)) + # Calculating error let error = case loopState @@ -395,7 +404,11 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} = if not(isNil(error)): # Completing all pending writes while(not(stream.writer.queue.empty())): - let item = stream.writer.queue.popFirstNoWait() + let item = + try: + stream.writer.queue.popFirstNoWait() + except AsyncQueueEmptyError: + raiseAssert "AsyncQueue should not be empty at this moment" if not(item.future.finished()): item.future.fail(error) # Completing handshake @@ -415,18 +428,18 @@ proc tlsLoop*(stream: TLSAsyncStream) {.async.} = # Completing readers stream.reader.buffer.forget() -proc tlsWriteLoop(stream: AsyncStreamWriter) {.async.} = +proc tlsWriteLoop(stream: AsyncStreamWriter) {.async: (raises: []).} = var wstream = TLSStreamWriter(stream) wstream.state = AsyncStreamState.Running - await sleepAsync(0.milliseconds) + await noCancel(sleepAsync(0.milliseconds)) if isNil(wstream.stream.mainLoop): wstream.stream.mainLoop = tlsLoop(wstream.stream) await wstream.stream.mainLoop -proc tlsReadLoop(stream: AsyncStreamReader) {.async.} = +proc tlsReadLoop(stream: AsyncStreamReader) {.async: (raises: []).} = var rstream = TLSStreamReader(stream) rstream.state = AsyncStreamState.Running - await sleepAsync(0.milliseconds) + await noCancel(sleepAsync(0.milliseconds)) if isNil(rstream.stream.mainLoop): rstream.stream.mainLoop = tlsLoop(rstream.stream) await rstream.stream.mainLoop @@ -451,7 +464,7 @@ proc newTLSClientAsyncStream*( maxVersion = TLSVersion.TLS12, flags: set[TLSFlags] = {}, trustAnchors: SomeTrustAnchorType = MozillaTrustAnchors - ): TLSAsyncStream = + ): TLSAsyncStream {.raises: [TLSStreamInitError].} = ## Create new TLS asynchronous stream for outbound (client) connections ## using reading stream ``rsource`` and writing stream ``wsource``. ## @@ -541,7 +554,8 @@ proc newTLSServerAsyncStream*(rsource: AsyncStreamReader, minVersion = TLSVersion.TLS11, maxVersion = TLSVersion.TLS12, cache: TLSSessionCache = nil, - flags: set[TLSFlags] = {}): TLSAsyncStream = + flags: set[TLSFlags] = {}): TLSAsyncStream {. + raises: [TLSStreamInitError, TLSStreamProtocolError].} = ## Create new TLS asynchronous stream for inbound (server) connections ## using reading stream ``rsource`` and writing stream ``wsource``. ## @@ -609,10 +623,8 @@ proc newTLSServerAsyncStream*(rsource: AsyncStreamReader, if err == 0: raise newException(TLSStreamInitError, "Could not initialize TLS layer") - init(AsyncStreamWriter(res.writer), wsource, tlsWriteLoop, - bufferSize) - init(AsyncStreamReader(res.reader), rsource, tlsReadLoop, - bufferSize) + init(AsyncStreamWriter(res.writer), wsource, tlsWriteLoop, bufferSize) + init(AsyncStreamReader(res.reader), rsource, tlsReadLoop, bufferSize) res proc copyKey(src: RsaPrivateKey): TLSPrivateKey = @@ -653,7 +665,8 @@ proc copyKey(src: EcPrivateKey): TLSPrivateKey = res.eckey.curve = src.curve res -proc init*(tt: typedesc[TLSPrivateKey], data: openArray[byte]): TLSPrivateKey = +proc init*(tt: typedesc[TLSPrivateKey], data: openArray[byte]): TLSPrivateKey {. + raises: [TLSStreamProtocolError].} = ## Initialize TLS private key from array of bytes ``data``. ## ## This procedure initializes private key using raw, DER-encoded format, @@ -676,7 +689,8 @@ proc init*(tt: typedesc[TLSPrivateKey], data: openArray[byte]): TLSPrivateKey = raiseTLSStreamProtocolError("Unknown key type (" & $keyType & ")") res -proc pemDecode*(data: openArray[char]): seq[PEMElement] = +proc pemDecode*(data: openArray[char]): seq[PEMElement] {. + raises: [TLSStreamProtocolError].} = ## Decode PEM encoded string and get array of binary blobs. if len(data) == 0: raiseTLSStreamProtocolError("Empty PEM message") @@ -717,7 +731,8 @@ proc pemDecode*(data: openArray[char]): seq[PEMElement] = raiseTLSStreamProtocolError("Invalid PEM encoding") res -proc init*(tt: typedesc[TLSPrivateKey], data: openArray[char]): TLSPrivateKey = +proc init*(tt: typedesc[TLSPrivateKey], data: openArray[char]): TLSPrivateKey {. + raises: [TLSStreamProtocolError].} = ## Initialize TLS private key from string ``data``. ## ## This procedure initializes private key using unencrypted PKCS#8 PEM @@ -735,7 +750,8 @@ proc init*(tt: typedesc[TLSPrivateKey], data: openArray[char]): TLSPrivateKey = res proc init*(tt: typedesc[TLSCertificate], - data: openArray[char]): TLSCertificate = + data: openArray[char]): TLSCertificate {. + raises: [TLSStreamProtocolError].} = ## Initialize TLS certificates from string ``data``. ## ## This procedure initializes array of certificates from PEM encoded string. @@ -770,9 +786,11 @@ proc init*(tt: typedesc[TLSSessionCache], size: int = 4096): TLSSessionCache = sslSessionCacheLruInit(addr res.context, addr res.storage[0], rsize) res -proc handshake*(rws: SomeTLSStreamType): Future[void] = +proc handshake*(rws: SomeTLSStreamType): Future[void] {. + async: (raw: true, raises: [CancelledError, AsyncStreamError]).} = ## Wait until initial TLS handshake will be successfully performed. - var retFuture = newFuture[void]("tlsstream.handshake") + let retFuture = Future[void].Raising([CancelledError, AsyncStreamError]) + .init("tlsstream.handshake") when rws is TLSStreamReader: if rws.handshaked: retFuture.complete() diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index bdcb8d7b..f2e7a586 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -140,7 +140,7 @@ type # transport for new client proc remoteAddress*(transp: StreamTransport): TransportAddress {. - raises: [TransportAbortedError, TransportTooManyError, TransportOsError].} = + raises: [TransportOsError].} = ## Returns ``transp`` remote socket address. doAssert(transp.kind == TransportKind.Socket, "Socket transport required!") if transp.remote.family == AddressFamily.None: @@ -148,12 +148,12 @@ proc remoteAddress*(transp: StreamTransport): TransportAddress {. var slen = SockLen(sizeof(saddr)) if getpeername(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr), addr slen) != 0: - raiseTransportError(osLastError()) + raiseTransportOsError(osLastError()) fromSAddr(addr saddr, slen, transp.remote) transp.remote proc localAddress*(transp: StreamTransport): TransportAddress {. - raises: [TransportAbortedError, TransportTooManyError, TransportOsError].} = + raises: [TransportOsError].} = ## Returns ``transp`` local socket address. doAssert(transp.kind == TransportKind.Socket, "Socket transport required!") if transp.local.family == AddressFamily.None: @@ -161,7 +161,7 @@ proc localAddress*(transp: StreamTransport): TransportAddress {. var slen = SockLen(sizeof(saddr)) if getsockname(SocketHandle(transp.fd), cast[ptr SockAddr](addr saddr), addr slen) != 0: - raiseTransportError(osLastError()) + raiseTransportOsError(osLastError()) fromSAddr(addr saddr, slen, transp.local) transp.local