diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index fa23471a..9bab1fd6 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -28,7 +28,7 @@ type ## is blocked in ``acquire()`` is being processed. locked: bool acquired: bool - waiters: seq[Future[void]] + waiters: seq[Future[void].Raising([CancelledError])] AsyncEvent* = ref object of RootRef ## A primitive event object. @@ -41,7 +41,7 @@ type ## state to be signaled, when event get fired, then all coroutines ## continue proceeds in order, they have entered waiting state. flag: bool - waiters: seq[Future[void]] + waiters: seq[Future[void].Raising([CancelledError])] AsyncQueue*[T] = ref object of RootRef ## A queue, useful for coordinating producer and consumer coroutines. @@ -50,8 +50,8 @@ type ## infinite. If it is an integer greater than ``0``, then "await put()" ## will block when the queue reaches ``maxsize``, until an item is ## removed by "await get()". - getters: seq[Future[void]] - putters: seq[Future[void]] + getters: seq[Future[void].Raising([CancelledError])] + putters: seq[Future[void].Raising([CancelledError])] queue: Deque[T] maxsize: int @@ -69,7 +69,7 @@ type EventQueueReader* = object key: EventQueueKey offset: int - waiter: Future[void] + waiter: Future[void].Raising([CancelledError]) overflow: bool AsyncEventQueue*[T] = ref object of RootObj @@ -90,17 +90,14 @@ proc newAsyncLock*(): AsyncLock = ## The ``release()`` procedure changes the state to unlocked and returns ## immediately. - # Workaround for callSoon() not worked correctly before - # getThreadDispatcher() call. - discard getThreadDispatcher() - AsyncLock(waiters: newSeq[Future[void]](), locked: false, acquired: false) + AsyncLock() proc wakeUpFirst(lock: AsyncLock): bool {.inline.} = ## Wake up the first waiter if it isn't done. var i = 0 var res = false while i < len(lock.waiters): - var waiter = lock.waiters[i] + let waiter = lock.waiters[i] inc(i) if not(waiter.finished()): waiter.complete() @@ -120,7 +117,7 @@ proc checkAll(lock: AsyncLock): bool {.inline.} = return false return true -proc acquire*(lock: AsyncLock) {.async.} = +proc acquire*(lock: AsyncLock) {.async: (raises: [CancelledError]).} = ## Acquire a lock ``lock``. ## ## This procedure blocks until the lock ``lock`` is unlocked, then sets it @@ -129,7 +126,7 @@ proc acquire*(lock: AsyncLock) {.async.} = lock.acquired = true lock.locked = true else: - var w = newFuture[void]("AsyncLock.acquire") + let w = Future[void].Raising([CancelledError]).init("AsyncLock.acquire") lock.waiters.add(w) await w lock.acquired = true @@ -165,13 +162,10 @@ proc newAsyncEvent*(): AsyncEvent = ## procedure and reset to `false` with the `clear()` procedure. ## The `wait()` procedure blocks until the flag is `true`. The flag is ## initially `false`. + AsyncEvent() - # Workaround for callSoon() not worked correctly before - # getThreadDispatcher() call. - discard getThreadDispatcher() - AsyncEvent(waiters: newSeq[Future[void]](), flag: false) - -proc wait*(event: AsyncEvent): Future[void] = +proc wait*(event: AsyncEvent): Future[void] {. + async: (raw: true, raises: [CancelledError]).} = ## Block until the internal flag of ``event`` is `true`. ## If the internal flag is `true` on entry, return immediately. Otherwise, ## block until another task calls `fire()` to set the flag to `true`, @@ -210,20 +204,15 @@ proc isSet*(event: AsyncEvent): bool = proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] = ## Creates a new asynchronous queue ``AsyncQueue``. - # Workaround for callSoon() not worked correctly before - # getThreadDispatcher() call. - discard getThreadDispatcher() AsyncQueue[T]( - getters: newSeq[Future[void]](), - putters: newSeq[Future[void]](), queue: initDeque[T](), maxsize: maxsize ) -proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} = +proc wakeupNext(waiters: var seq) {.inline.} = var i = 0 while i < len(waiters): - var waiter = waiters[i] + let waiter = waiters[i] inc(i) if not(waiter.finished()): @@ -250,6 +239,24 @@ proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} = ## Return ``true`` if the queue is empty, ``false`` otherwise. (len(aq.queue) == 0) +proc addFirstImpl[T](aq: AsyncQueue[T], item: T) = + aq.queue.addFirst(item) + aq.getters.wakeupNext() + +proc addLastImpl[T](aq: AsyncQueue[T], item: T) = + aq.queue.addLast(item) + aq.getters.wakeupNext() + +proc popFirstImpl[T](aq: AsyncQueue[T]): T = + let res = aq.queue.popFirst() + aq.putters.wakeupNext() + res + +proc popLastImpl[T](aq: AsyncQueue[T]): T = + let res = aq.queue.popLast() + aq.putters.wakeupNext() + res + proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {. raises: [AsyncQueueFullError].}= ## Put an item ``item`` to the beginning of the queue ``aq`` immediately. @@ -257,8 +264,7 @@ proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {. ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. if aq.full(): raise newException(AsyncQueueFullError, "AsyncQueue is full!") - aq.queue.addFirst(item) - aq.getters.wakeupNext() + aq.addFirstImpl(item) proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {. raises: [AsyncQueueFullError].}= @@ -267,8 +273,7 @@ proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {. ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. if aq.full(): raise newException(AsyncQueueFullError, "AsyncQueue is full!") - aq.queue.addLast(item) - aq.getters.wakeupNext() + aq.addLastImpl(item) proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {. raises: [AsyncQueueEmptyError].} = @@ -277,9 +282,7 @@ proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {. ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. if aq.empty(): raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") - let res = aq.queue.popFirst() - aq.putters.wakeupNext() - res + aq.popFirstImpl() proc popLastNoWait*[T](aq: AsyncQueue[T]): T {. raises: [AsyncQueueEmptyError].} = @@ -288,65 +291,63 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T {. ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. if aq.empty(): raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!") - let res = aq.queue.popLast() - aq.putters.wakeupNext() - res + aq.popLastImpl() -proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async.} = +proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]).} = ## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full, ## wait until a free slot is available before adding item. while aq.full(): - var putter = newFuture[void]("AsyncQueue.addFirst") + let putter = Future[void].Raising([CancelledError]).init("AsyncQueue.addFirst") aq.putters.add(putter) try: await putter - except CatchableError as exc: + except CancelledError as exc: if not(aq.full()) and not(putter.cancelled()): aq.putters.wakeupNext() raise exc - aq.addFirstNoWait(item) + aq.addFirstImpl(item) -proc addLast*[T](aq: AsyncQueue[T], item: T) {.async.} = +proc addLast*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]).} = ## Put an ``item`` to the end of the queue ``aq``. If the queue is full, ## wait until a free slot is available before adding item. while aq.full(): - var putter = newFuture[void]("AsyncQueue.addLast") + let putter = Future[void].Raising([CancelledError]).init("AsyncQueue.addLast") aq.putters.add(putter) try: await putter - except CatchableError as exc: + except CancelledError as exc: if not(aq.full()) and not(putter.cancelled()): aq.putters.wakeupNext() raise exc - aq.addLastNoWait(item) + aq.addLastImpl(item) -proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async.} = +proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError]).} = ## Remove and return an ``item`` from the beginning of the queue ``aq``. ## If the queue is empty, wait until an item is available. while aq.empty(): - var getter = newFuture[void]("AsyncQueue.popFirst") + let getter = Future[void].Raising([CancelledError]).init("AsyncQueue.popFirst") aq.getters.add(getter) try: await getter - except CatchableError as exc: + except CancelledError as exc: if not(aq.empty()) and not(getter.cancelled()): aq.getters.wakeupNext() raise exc - return aq.popFirstNoWait() + aq.popFirstImpl() -proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async.} = +proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError]).} = ## Remove and return an ``item`` from the end of the queue ``aq``. ## If the queue is empty, wait until an item is available. while aq.empty(): - var getter = newFuture[void]("AsyncQueue.popLast") + let getter = Future[void].Raising([CancelledError]).init("AsyncQueue.popLast") aq.getters.add(getter) try: await getter - except CatchableError as exc: + except CancelledError as exc: if not(aq.empty()) and not(getter.cancelled()): aq.getters.wakeupNext() raise exc - return aq.popLastNoWait() + aq.popLastImpl() proc putNoWait*[T](aq: AsyncQueue[T], item: T) {. raises: [AsyncQueueFullError].} = @@ -358,11 +359,13 @@ proc getNoWait*[T](aq: AsyncQueue[T]): T {. ## Alias of ``popFirstNoWait()``. aq.popFirstNoWait() -proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.inline.} = +proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {. + async: (raw: true, raises: [CancelledError]).} = ## Alias of ``addLast()``. aq.addLast(item) -proc get*[T](aq: AsyncQueue[T]): Future[T] {.inline.} = +proc get*[T](aq: AsyncQueue[T]): Future[T] {. + async: (raw: true, raises: [CancelledError]).} = ## Alias of ``popFirst()``. aq.popFirst() @@ -416,7 +419,7 @@ proc contains*[T](aq: AsyncQueue[T], item: T): bool {.inline.} = ## via the ``in`` operator. for e in aq.queue.items(): if e == item: return true - return false + false proc `$`*[T](aq: AsyncQueue[T]): string = ## Turn an async queue ``aq`` into its string representation. @@ -452,8 +455,7 @@ proc compact(ab: AsyncEventQueue) {.raises: [].} = else: ab.queue.clear() -proc getReaderIndex(ab: AsyncEventQueue, key: EventQueueKey): int {. - raises: [].} = +proc getReaderIndex(ab: AsyncEventQueue, key: EventQueueKey): int = for index, value in ab.readers.pairs(): if value.key == key: return index @@ -507,7 +509,7 @@ proc close*(ab: AsyncEventQueue) {.raises: [].} = ab.readers.reset() ab.queue.clear() -proc closeWait*(ab: AsyncEventQueue): Future[void] {.raises: [].} = +proc closeWait*(ab: AsyncEventQueue): Future[void] {.async: (raw: true, raises: []).} = let retFuture = newFuture[void]("AsyncEventQueue.closeWait()", {FutureFlag.OwnCancelSchedule}) proc continuation(udata: pointer) {.gcsafe.} = @@ -528,7 +530,7 @@ template readerOverflow*(ab: AsyncEventQueue, reader: EventQueueReader): bool = ab.limit + (reader.offset - ab.offset) <= len(ab.queue) -proc emit*[T](ab: AsyncEventQueue[T], data: T) {.raises: [].} = +proc emit*[T](ab: AsyncEventQueue[T], data: T) = if len(ab.readers) > 0: # We enqueue `data` only if there active reader present. var changesPresent = false @@ -565,7 +567,8 @@ proc emit*[T](ab: AsyncEventQueue[T], data: T) {.raises: [].} = proc waitEvents*[T](ab: AsyncEventQueue[T], key: EventQueueKey, - eventsCount = -1): Future[seq[T]] {.async.} = + eventsCount = -1): Future[seq[T]] {. + async: (raises: [AsyncEventQueueFullError, CancelledError]).} = ## Wait for events var events: seq[T] @@ -595,7 +598,8 @@ proc waitEvents*[T](ab: AsyncEventQueue[T], doAssert(length >= ab.readers[index].offset) if length == ab.readers[index].offset: # We are at the end of queue, it means that we should wait for new events. - let waitFuture = newFuture[void]("AsyncEventQueue.waitEvents") + let waitFuture = Future[void].Raising([CancelledError]).init( + "AsyncEventQueue.waitEvents") ab.readers[index].waiter = waitFuture resetFuture = true await waitFuture @@ -626,4 +630,4 @@ proc waitEvents*[T](ab: AsyncEventQueue[T], if (eventsCount <= 0) or (len(events) == eventsCount): break - return events + events diff --git a/chronos/internal/asyncfutures.nim b/chronos/internal/asyncfutures.nim index c4a73747..a36ff4a9 100644 --- a/chronos/internal/asyncfutures.nim +++ b/chronos/internal/asyncfutures.nim @@ -16,7 +16,9 @@ import stew/base10 import ./[asyncengine, raisesfutures] import ../[config, futures] -export raisesfutures.InternalRaisesFuture +export + raisesfutures.Raising, raisesfutures.InternalRaisesFuture, + raisesfutures.init, raisesfutures.error, raisesfutures.readError when chronosStackTrace: import std/strutils @@ -109,7 +111,7 @@ template newInternalRaisesFuture*[T, E](fromProc: static[string] = ""): auto = ## that this future belongs to, is a good habit as it helps with debugging. newInternalRaisesFutureImpl[T, E](getSrcLocation(fromProc)) -template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] = +template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] {.deprecated.} = ## Create a new future which can hold/preserve GC sequence until future will ## not be completed. ## @@ -117,7 +119,7 @@ template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] = ## that this future belongs to, is a good habit as it helps with debugging. newFutureSeqImpl[A, B](getSrcLocation(fromProc)) -template newFutureStr*[T](fromProc: static[string] = ""): FutureStr[T] = +template newFutureStr*[T](fromProc: static[string] = ""): FutureStr[T] {.deprecated.} = ## Create a new future which can hold/preserve GC string until future will ## not be completed. ## @@ -205,7 +207,8 @@ template complete*(future: Future[void]) = ## Completes a void ``future``. complete(future, getSrcLocation()) -proc fail(future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) = +proc failImpl( + future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) = if not(future.cancelled()): checkFinished(future, loc) future.internalError = error @@ -216,10 +219,16 @@ proc fail(future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) = getStackTrace(error) future.finish(FutureState.Failed) -template fail*( - future: FutureBase, error: ref CatchableError, warn: static bool = false) = +template fail*[T]( + future: Future[T], error: ref CatchableError, warn: static bool = false) = ## Completes ``future`` with ``error``. - fail(future, error, getSrcLocation()) + failImpl(future, error, getSrcLocation()) + +template fail*[T, E]( + future: InternalRaisesFuture[T, E], error: ref CatchableError, + warn: static bool = true) = + checkRaises(future, E, error, warn) + failImpl(future, error, getSrcLocation()) template newCancelledError(): ref CancelledError = (ref CancelledError)(msg: "Future operation cancelled!") @@ -377,8 +386,6 @@ proc futureContinue*(fut: FutureBase) {.raises: [], gcsafe.} = {.pop.} when chronosStackTrace: - import std/strutils - template getFilenameProcname(entry: StackTraceEntry): (string, string) = when compiles(entry.filenameStr) and compiles(entry.procnameStr): # We can't rely on "entry.filename" and "entry.procname" still being valid @@ -462,31 +469,36 @@ proc internalCheckComplete*(fut: FutureBase) {.raises: [CatchableError].} = injectStacktrace(fut.internalError) raise fut.internalError -macro internalCheckComplete*(f: InternalRaisesFuture): untyped = +macro internalCheckComplete*(fut: InternalRaisesFuture, raises: typed) = # For InternalRaisesFuture[void, (ValueError, OSError), will do: # {.cast(raises: [ValueError, OSError]).}: # if isNil(f.error): discard # else: raise f.error - let e = getTypeInst(f)[2] - let types = getType(e) + # TODO https://github.com/nim-lang/Nim/issues/22937 + # we cannot `getTypeInst` on the `fut` - when aliases are involved, the + # generics are lost - so instead, we pass the raises list explicitly + let types = getRaisesTypes(raises) if isNoRaises(types): return quote do: - if not(isNil(`f`.internalError)): - raiseAssert("Unhandled future exception: " & `f`.error.msg) + if not(isNil(`fut`.internalError)): + # This would indicate a bug in which `error` was set via the non-raising + # base type + raiseAssert("Error set on a non-raising future: " & `fut`.internalError.msg) expectKind(types, nnkBracketExpr) expectKind(types[0], nnkSym) + assert types[0].strVal == "tuple" let ifRaise = nnkIfExpr.newTree( nnkElifExpr.newTree( - quote do: isNil(`f`.internalError), + quote do: isNil(`fut`.internalError), quote do: discard ), nnkElseExpr.newTree( - nnkRaiseStmt.newNimNode(lineInfoFrom=f).add( - quote do: (`f`.internalError) + nnkRaiseStmt.newNimNode(lineInfoFrom=fut).add( + quote do: (`fut`.internalError) ) ) ) @@ -1118,7 +1130,7 @@ proc one*[F: SomeFuture](futs: varargs[F]): Future[F] {. return retFuture proc race*(futs: varargs[FutureBase]): Future[FutureBase] {. - async: (raw: true, raises: [CancelledError]).} = + async: (raw: true, raises: [ValueError, CancelledError]).} = ## Returns a future which will complete and return completed FutureBase, ## when one of the futures in ``futs`` will be completed, failed or canceled. ## @@ -1488,12 +1500,6 @@ when defined(windows): {.pop.} # Automatically deduced raises from here onwards -template fail*[T, E]( - future: InternalRaisesFuture[T, E], error: ref CatchableError, - warn: static bool = true) = - checkRaises(future, error, warn) - fail(future, error, getSrcLocation()) - proc waitFor*[T, E](fut: InternalRaisesFuture[T, E]): T = # {.raises: [E]} ## **Blocks** the current thread until the specified future finishes and ## reads it, potentially raising an exception if the future failed or was @@ -1512,7 +1518,7 @@ proc read*[T: not void, E](future: InternalRaisesFuture[T, E]): lent T = # {.rai # TODO: Make a custom exception type for this? raise newException(ValueError, "Future still in progress.") - internalCheckComplete(future) + internalCheckComplete(future, E) future.internalValue proc read*[E](future: InternalRaisesFuture[void, E]) = # {.raises: [E, CancelledError].} diff --git a/chronos/internal/asyncmacro.nim b/chronos/internal/asyncmacro.nim index 11daf336..88e11e39 100644 --- a/chronos/internal/asyncmacro.nim +++ b/chronos/internal/asyncmacro.nim @@ -497,7 +497,7 @@ proc asyncSingleProc(prc, params: NimNode): NimNode {.compileTime.} = prc -template await*[T](f: Future[T]): untyped = +template await*[T](f: Future[T]): T = when declared(chronosInternalRetFuture): chronosInternalRetFuture.internalChild = f # `futureContinue` calls the iterator generated by the `async` @@ -512,6 +512,21 @@ template await*[T](f: Future[T]): untyped = else: unsupported "await is only available within {.async.}" +template await*[T, E](f: InternalRaisesFuture[T, E]): T = + when declared(chronosInternalRetFuture): + chronosInternalRetFuture.internalChild = f + # `futureContinue` calls the iterator generated by the `async` + # transformation - `yield` gives control back to `futureContinue` which is + # responsible for resuming execution once the yielded future is finished + yield chronosInternalRetFuture.internalChild + # `child` released by `futureContinue` + cast[type(f)](chronosInternalRetFuture.internalChild).internalCheckComplete(E) + + when T isnot void: + cast[type(f)](chronosInternalRetFuture.internalChild).value() + else: + unsupported "await is only available within {.async.}" + template awaitne*[T](f: Future[T]): Future[T] = when declared(chronosInternalRetFuture): chronosInternalRetFuture.internalChild = f diff --git a/chronos/internal/raisesfutures.nim b/chronos/internal/raisesfutures.nim index ad811f72..79384d2e 100644 --- a/chronos/internal/raisesfutures.nim +++ b/chronos/internal/raisesfutures.nim @@ -1,5 +1,5 @@ import - std/macros, + std/[macros, sequtils], ../futures type @@ -18,6 +18,45 @@ proc makeNoRaises*(): NimNode {.compileTime.} = ident"void" +macro Raising*[T](F: typedesc[Future[T]], E: varargs[typedesc]): untyped = + ## Given a Future type instance, return a type storing `{.raises.}` + ## information + ## + ## Note; this type may change in the future + E.expectKind(nnkBracket) + + let raises = if E.len == 0: + makeNoRaises() + else: + nnkTupleConstr.newTree(E.mapIt(it)) + nnkBracketExpr.newTree( + ident "InternalRaisesFuture", + nnkDotExpr.newTree(F, ident"T"), + raises + ) + +template init*[T, E]( + F: type InternalRaisesFuture[T, E], fromProc: static[string] = ""): F = + ## Creates a new pending future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + let res = F() + internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Pending, {}) + res + +template init*[T, E]( + F: type InternalRaisesFuture[T, E], fromProc: static[string] = "", + flags: static[FutureFlags]): F = + ## Creates a new pending future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + let res = F() + internalInitFutureBase( + res, getSrcLocation(fromProc), FutureState.Pending, flags) + res + proc isNoRaises*(n: NimNode): bool {.compileTime.} = n.eqIdent("void") @@ -78,21 +117,15 @@ macro union*(tup0: typedesc[tuple], tup1: typedesc[tuple]): typedesc = if result.len == 0: result = makeNoRaises() -proc getRaises*(future: NimNode): NimNode {.compileTime.} = - # Given InternalRaisesFuture[T, (A, B, C)], returns (A, B, C) - let types = getType(getTypeInst(future)[2]) - if isNoRaises(types): - nnkBracketExpr.newTree(newEmptyNode()) - else: - expectKind(types, nnkBracketExpr) - expectKind(types[0], nnkSym) - assert types[0].strVal == "tuple" - assert types.len >= 1 - - types +proc getRaisesTypes*(raises: NimNode): NimNode = + let typ = getType(raises) + case typ.typeKind + of ntyTypeDesc: typ[1] + else: typ macro checkRaises*[T: CatchableError]( - future: InternalRaisesFuture, error: ref T, warn: static bool = true): untyped = + future: InternalRaisesFuture, raises: typed, error: ref T, + warn: static bool = true): untyped = ## Generate code that checks that the given error is compatible with the ## raises restrictions of `future`. ## @@ -100,11 +133,18 @@ macro checkRaises*[T: CatchableError]( ## information available at compile time - in particular, if the raises ## inherit from `error`, we end up with the equivalent of a downcast which ## raises a Defect if it fails. - let raises = getRaises(future) + let + raises = getRaisesTypes(raises) expectKind(getTypeInst(error), nnkRefTy) let toMatch = getTypeInst(error)[0] + + if isNoRaises(raises): + error( + "`fail`: `" & repr(toMatch) & "` incompatible with `raises: []`", future) + return + var typeChecker = ident"false" maybeChecker = ident"false" @@ -134,3 +174,15 @@ macro checkRaises*[T: CatchableError]( else: `warning` assert(`runtimeChecker`, `errorMsg`) + +proc error*[T](future: InternalRaisesFuture[T, void]): ref CatchableError {. + raises: [].} = + static: + warning("No exceptions possible with this operation, `error` always returns nil") + nil + +proc readError*[T](future: InternalRaisesFuture[T, void]): ref CatchableError {. + raises: [ValueError].} = + static: + warning("No exceptions possible with this operation, `readError` always raises") + raise newException(ValueError, "No error in future.") diff --git a/chronos/transports/common.nim b/chronos/transports/common.nim index d8263af2..24f9852b 100644 --- a/chronos/transports/common.nim +++ b/chronos/transports/common.nim @@ -113,6 +113,8 @@ type ## Transport's capability not supported exception TransportUseClosedError* = object of TransportError ## Usage after transport close exception + TransportUseEofError* = object of TransportError + ## Usage after transport half-close exception TransportTooManyError* = object of TransportError ## Too many open file descriptors exception TransportAbortedError* = object of TransportError @@ -567,11 +569,11 @@ template checkClosed*(t: untyped, future: untyped) = template checkWriteEof*(t: untyped, future: untyped) = if (WriteEof in (t).state): - future.fail(newException(TransportError, + future.fail(newException(TransportUseEofError, "Transport connection is already dropped!")) return future -template getError*(t: untyped): ref CatchableError = +template getError*(t: untyped): ref TransportError = var err = (t).error (t).error = nil err diff --git a/chronos/transports/datagram.nim b/chronos/transports/datagram.nim index aec18ae3..30f872d5 100644 --- a/chronos/transports/datagram.nim +++ b/chronos/transports/datagram.nim @@ -27,7 +27,10 @@ type DatagramCallback* = proc(transp: DatagramTransport, remote: TransportAddress): Future[void] {. - gcsafe, raises: [].} + async: (raises: []).} + + UnsafeDatagramCallback* = proc(transp: DatagramTransport, + remote: TransportAddress): Future[void] {.async.} DatagramTransport* = ref object of RootRef fd*: AsyncFD # File descriptor @@ -35,7 +38,7 @@ type flags: set[ServerFlags] # Flags buffer: seq[byte] # Reading buffer buflen: int # Reading buffer effective size - error: ref CatchableError # Current error + error: ref TransportError # Current error queue: Deque[GramVector] # Writer queue local: TransportAddress # Local address remote: TransportAddress # Remote address @@ -599,6 +602,41 @@ proc close*(transp: DatagramTransport) = transp.state.incl({WriteClosed, ReadClosed}) closeSocket(transp.fd, continuation) +proc newDatagramTransportCommon(cbproc: UnsafeDatagramCallback, + remote: TransportAddress, + local: TransportAddress, + sock: AsyncFD, + flags: set[ServerFlags], + udata: pointer, + child: DatagramTransport, + bufferSize: int, + ttl: int, + dualstack = DualStackType.Auto + ): DatagramTransport {. + raises: [TransportOsError].} = + ## Create new UDP datagram transport (IPv4). + ## + ## ``cbproc`` - callback which will be called, when new datagram received. + ## ``remote`` - bind transport to remote address (optional). + ## ``local`` - bind transport to local address (to serving incoming + ## datagrams, optional) + ## ``sock`` - application-driven socket to use. + ## ``flags`` - flags that will be applied to socket. + ## ``udata`` - custom argument which will be passed to ``cbproc``. + ## ``bufSize`` - size of internal buffer. + ## ``ttl`` - TTL for UDP datagram packet (only usable when flags has + ## ``Broadcast`` option). + + proc wrap(transp: DatagramTransport, + remote: TransportAddress) {.async: (raises: []).} = + try: + cbproc(transp, remote) + except CatchableError as exc: + raiseAssert "Unexpected exception from stream server cbproc: " & exc.msg + + newDatagramTransportCommon(wrap, remote, local, sock, flags, udata, child, + bufferSize, ttl, dualstack) + proc newDatagramTransport*(cbproc: DatagramCallback, remote: TransportAddress = AnyAddress, local: TransportAddress = AnyAddress, @@ -689,7 +727,102 @@ proc newDatagramTransport6*[T](cbproc: DatagramCallback, cast[pointer](udata), child, bufSize, ttl, dualstack) -proc join*(transp: DatagramTransport): Future[void] = +proc newDatagramTransport*(cbproc: UnsafeDatagramCallback, + remote: TransportAddress = AnyAddress, + local: TransportAddress = AnyAddress, + sock: AsyncFD = asyncInvalidSocket, + flags: set[ServerFlags] = {}, + udata: pointer = nil, + child: DatagramTransport = nil, + bufSize: int = DefaultDatagramBufferSize, + ttl: int = 0, + dualstack = DualStackType.Auto + ): DatagramTransport {. + raises: [TransportOsError], + deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} = + ## Create new UDP datagram transport (IPv4). + ## + ## ``cbproc`` - callback which will be called, when new datagram received. + ## ``remote`` - bind transport to remote address (optional). + ## ``local`` - bind transport to local address (to serving incoming + ## datagrams, optional) + ## ``sock`` - application-driven socket to use. + ## ``flags`` - flags that will be applied to socket. + ## ``udata`` - custom argument which will be passed to ``cbproc``. + ## ``bufSize`` - size of internal buffer. + ## ``ttl`` - TTL for UDP datagram packet (only usable when flags has + ## ``Broadcast`` option). + newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, child, + bufSize, ttl, dualstack) + +proc newDatagramTransport*[T](cbproc: UnsafeDatagramCallback, + udata: ref T, + remote: TransportAddress = AnyAddress, + local: TransportAddress = AnyAddress, + sock: AsyncFD = asyncInvalidSocket, + flags: set[ServerFlags] = {}, + child: DatagramTransport = nil, + bufSize: int = DefaultDatagramBufferSize, + ttl: int = 0, + dualstack = DualStackType.Auto + ): DatagramTransport {. + raises: [TransportOsError], + deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} = + var fflags = flags + {GCUserData} + GC_ref(udata) + newDatagramTransportCommon(cbproc, remote, local, sock, fflags, + cast[pointer](udata), child, bufSize, ttl, + dualstack) + +proc newDatagramTransport6*(cbproc: UnsafeDatagramCallback, + remote: TransportAddress = AnyAddress6, + local: TransportAddress = AnyAddress6, + sock: AsyncFD = asyncInvalidSocket, + flags: set[ServerFlags] = {}, + udata: pointer = nil, + child: DatagramTransport = nil, + bufSize: int = DefaultDatagramBufferSize, + ttl: int = 0, + dualstack = DualStackType.Auto + ): DatagramTransport {. + raises: [TransportOsError], + deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} = + ## Create new UDP datagram transport (IPv6). + ## + ## ``cbproc`` - callback which will be called, when new datagram received. + ## ``remote`` - bind transport to remote address (optional). + ## ``local`` - bind transport to local address (to serving incoming + ## datagrams, optional) + ## ``sock`` - application-driven socket to use. + ## ``flags`` - flags that will be applied to socket. + ## ``udata`` - custom argument which will be passed to ``cbproc``. + ## ``bufSize`` - size of internal buffer. + ## ``ttl`` - TTL for UDP datagram packet (only usable when flags has + ## ``Broadcast`` option). + newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, child, + bufSize, ttl, dualstack) + +proc newDatagramTransport6*[T](cbproc: UnsafeDatagramCallback, + udata: ref T, + remote: TransportAddress = AnyAddress6, + local: TransportAddress = AnyAddress6, + sock: AsyncFD = asyncInvalidSocket, + flags: set[ServerFlags] = {}, + child: DatagramTransport = nil, + bufSize: int = DefaultDatagramBufferSize, + ttl: int = 0, + dualstack = DualStackType.Auto + ): DatagramTransport {. + raises: [TransportOsError], + deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} = + var fflags = flags + {GCUserData} + GC_ref(udata) + newDatagramTransportCommon(cbproc, remote, local, sock, fflags, + cast[pointer](udata), child, bufSize, ttl, + dualstack) + +proc join*(transp: DatagramTransport): Future[void] {. + async: (raw: true, raises: [CancelledError]).} = ## Wait until the transport ``transp`` will be closed. var retFuture = newFuture[void]("datagram.transport.join") @@ -707,14 +840,15 @@ proc join*(transp: DatagramTransport): Future[void] = return retFuture -proc closeWait*(transp: DatagramTransport): Future[void] = +proc closeWait*(transp: DatagramTransport): Future[void] {. + async: (raw: true, raises: []).} = ## Close transport ``transp`` and release all resources. - const FutureName = "datagram.transport.closeWait" + let retFuture = newFuture[void]( + "datagram.transport.closeWait", {FutureFlag.OwnCancelSchedule}) if {ReadClosed, WriteClosed} * transp.state != {}: - return Future.completed(FutureName) - - let retFuture = newFuture[void](FutureName, {FutureFlag.OwnCancelSchedule}) + retFuture.complete() + return retFuture proc continuation(udata: pointer) {.gcsafe.} = retFuture.complete() @@ -733,7 +867,8 @@ proc closeWait*(transp: DatagramTransport): Future[void] = retFuture proc send*(transp: DatagramTransport, pbytes: pointer, - nbytes: int): Future[void] = + nbytes: int): Future[void] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## ``transp`` to remote destination address which was bounded on transport. var retFuture = newFuture[void]("datagram.transport.send(pointer)") @@ -751,22 +886,21 @@ proc send*(transp: DatagramTransport, pbytes: pointer, return retFuture proc send*(transp: DatagramTransport, msg: sink string, - msglen = -1): Future[void] = + msglen = -1): Future[void] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send string ``msg`` using transport ``transp`` to remote destination ## address which was bounded on transport. - var retFuture = newFutureStr[void]("datagram.transport.send(string)") + var retFuture = newFuture[void]("datagram.transport.send(string)") transp.checkClosed(retFuture) - when declared(shallowCopy): - if not(isLiteral(msg)): - shallowCopy(retFuture.gcholder, msg) - else: - retFuture.gcholder = msg - else: - retFuture.gcholder = msg + let length = if msglen <= 0: len(msg) else: msglen - let vector = GramVector(kind: WithoutAddress, buf: addr retFuture.gcholder[0], + var localCopy = msg + retFuture.addCallback(proc(_: pointer) = reset(localCopy)) + + let vector = GramVector(kind: WithoutAddress, buf: addr localCopy[0], buflen: length, - writer: cast[Future[void]](retFuture)) + writer: retFuture) + transp.queue.addLast(vector) if WritePaused in transp.state: let wres = transp.resumeWrite() @@ -775,22 +909,20 @@ proc send*(transp: DatagramTransport, msg: sink string, return retFuture proc send*[T](transp: DatagramTransport, msg: sink seq[T], - msglen = -1): Future[void] = + msglen = -1): Future[void] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send string ``msg`` using transport ``transp`` to remote destination ## address which was bounded on transport. - var retFuture = newFutureSeq[void, T]("datagram.transport.send(seq)") + var retFuture = newFuture[void]("datagram.transport.send(seq)") transp.checkClosed(retFuture) - when declared(shallowCopy): - if not(isLiteral(msg)): - shallowCopy(retFuture.gcholder, msg) - else: - retFuture.gcholder = msg - else: - retFuture.gcholder = msg + let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T)) - let vector = GramVector(kind: WithoutAddress, buf: addr retFuture.gcholder[0], + var localCopy = msg + retFuture.addCallback(proc(_: pointer) = reset(localCopy)) + + let vector = GramVector(kind: WithoutAddress, buf: addr localCopy[0], buflen: length, - writer: cast[Future[void]](retFuture)) + writer: retFuture) transp.queue.addLast(vector) if WritePaused in transp.state: let wres = transp.resumeWrite() @@ -799,7 +931,8 @@ proc send*[T](transp: DatagramTransport, msg: sink seq[T], return retFuture proc sendTo*(transp: DatagramTransport, remote: TransportAddress, - pbytes: pointer, nbytes: int): Future[void] = + pbytes: pointer, nbytes: int): Future[void] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport ## ``transp`` to remote destination address ``remote``. var retFuture = newFuture[void]("datagram.transport.sendTo(pointer)") @@ -814,22 +947,20 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress, return retFuture proc sendTo*(transp: DatagramTransport, remote: TransportAddress, - msg: sink string, msglen = -1): Future[void] = + msg: sink string, msglen = -1): Future[void] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send string ``msg`` using transport ``transp`` to remote destination ## address ``remote``. - var retFuture = newFutureStr[void]("datagram.transport.sendTo(string)") + var retFuture = newFuture[void]("datagram.transport.sendTo(string)") transp.checkClosed(retFuture) - when declared(shallowCopy): - if not(isLiteral(msg)): - shallowCopy(retFuture.gcholder, msg) - else: - retFuture.gcholder = msg - else: - retFuture.gcholder = msg + let length = if msglen <= 0: len(msg) else: msglen - let vector = GramVector(kind: WithAddress, buf: addr retFuture.gcholder[0], + var localCopy = msg + retFuture.addCallback(proc(_: pointer) = reset(localCopy)) + + let vector = GramVector(kind: WithAddress, buf: addr localCopy[0], buflen: length, - writer: cast[Future[void]](retFuture), + writer: retFuture, address: remote) transp.queue.addLast(vector) if WritePaused in transp.state: @@ -839,20 +970,17 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress, return retFuture proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress, - msg: sink seq[T], msglen = -1): Future[void] = + msg: sink seq[T], msglen = -1): Future[void] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Send sequence ``msg`` using transport ``transp`` to remote destination ## address ``remote``. - var retFuture = newFutureSeq[void, T]("datagram.transport.sendTo(seq)") + var retFuture = newFuture[void]("datagram.transport.sendTo(seq)") transp.checkClosed(retFuture) - when declared(shallowCopy): - if not(isLiteral(msg)): - shallowCopy(retFuture.gcholder, msg) - else: - retFuture.gcholder = msg - else: - retFuture.gcholder = msg let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T)) - let vector = GramVector(kind: WithAddress, buf: addr retFuture.gcholder[0], + var localCopy = msg + retFuture.addCallback(proc(_: pointer) = reset(localCopy)) + + let vector = GramVector(kind: WithAddress, buf: addr localCopy[0], buflen: length, writer: cast[Future[void]](retFuture), address: remote) @@ -864,7 +992,7 @@ proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress, return retFuture proc peekMessage*(transp: DatagramTransport, msg: var seq[byte], - msglen: var int) {.raises: [CatchableError].} = + msglen: var int) {.raises: [TransportError].} = ## Get access to internal message buffer and length of incoming datagram. if ReadError in transp.state: transp.state.excl(ReadError) @@ -876,7 +1004,7 @@ proc peekMessage*(transp: DatagramTransport, msg: var seq[byte], msglen = transp.buflen proc getMessage*(transp: DatagramTransport): seq[byte] {. - raises: [CatchableError].} = + raises: [TransportError].} = ## Copy data from internal message buffer and return result. var default: seq[byte] if ReadError in transp.state: diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index 7471a446..bdcb8d7b 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -58,6 +58,8 @@ type done: bool] {. gcsafe, raises: [].} + ReaderFuture = Future[void].Raising([TransportError, CancelledError]) + const StreamTransportTrackerName* = "stream.transport" StreamServerTrackerName* = "stream.server" @@ -68,10 +70,10 @@ when defined(windows): StreamTransport* = ref object of RootRef fd*: AsyncFD # File descriptor state: set[TransportState] # Current Transport state - reader: Future[void] # Current reader Future + reader: ReaderFuture # Current reader Future buffer: seq[byte] # Reading buffer offset: int # Reading buffer offset - error: ref CatchableError # Current error + error: ref TransportError # Current error queue: Deque[StreamVector] # Writer queue future: Future[void] # Stream life future # Windows specific part @@ -87,18 +89,18 @@ when defined(windows): local: TransportAddress # Local address remote: TransportAddress # Remote address of TransportKind.Pipe: - todo1: int + discard of TransportKind.File: - todo2: int + discard else: type StreamTransport* = ref object of RootRef fd*: AsyncFD # File descriptor state: set[TransportState] # Current Transport state - reader: Future[void] # Current reader Future + reader: ReaderFuture # Current reader Future buffer: seq[byte] # Reading buffer offset: int # Reading buffer offset - error: ref CatchableError # Current error + error: ref TransportError # Current error queue: Deque[StreamVector] # Writer queue future: Future[void] # Stream life future case kind*: TransportKind @@ -107,18 +109,23 @@ else: local: TransportAddress # Local address remote: TransportAddress # Remote address of TransportKind.Pipe: - todo1: int + discard of TransportKind.File: - todo2: int + discard type StreamCallback* = proc(server: StreamServer, - client: StreamTransport): Future[void] {. - gcsafe, raises: [].} + client: StreamTransport) {.async: (raises: []).} ## New remote client connection callback ## ``server`` - StreamServer object. ## ``client`` - accepted client transport. + UnsafeStreamCallback* = proc(server: StreamServer, + client: StreamTransport) {.async.} + ## Connection callback that doesn't check for exceptions at compile time + ## ``server`` - StreamServer object. + ## ``client`` - accepted client transport. + TransportInitCallback* = proc(server: StreamServer, fd: AsyncFD): StreamTransport {. gcsafe, raises: [].} @@ -199,7 +206,7 @@ proc completePendingWriteQueue(queue: var Deque[StreamVector], vector.writer.complete(v) proc failPendingWriteQueue(queue: var Deque[StreamVector], - error: ref CatchableError) {.inline.} = + error: ref TransportError) {.inline.} = while len(queue) > 0: var vector = queue.popFirst() if not(vector.writer.finished()): @@ -640,7 +647,8 @@ when defined(windows): localAddress = TransportAddress(), flags: set[SocketFlags] = {}, dualstack = DualStackType.Auto - ): Future[StreamTransport] = + ): Future[StreamTransport] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Open new connection to remote peer with address ``address`` and create ## new transport object ``StreamTransport`` for established connection. ## ``bufferSize`` is size of internal buffer for transport. @@ -1031,7 +1039,8 @@ when defined(windows): server.aovl.data.cb(addr server.aovl) ok() - proc accept*(server: StreamServer): Future[StreamTransport] = + proc accept*(server: StreamServer): Future[StreamTransport] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = var retFuture = newFuture[StreamTransport]("stream.server.accept") doAssert(server.status != ServerStatus.Running, @@ -1472,7 +1481,8 @@ else: localAddress = TransportAddress(), flags: set[SocketFlags] = {}, dualstack = DualStackType.Auto, - ): Future[StreamTransport] = + ): Future[StreamTransport] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Open new connection to remote peer with address ``address`` and create ## new transport object ``StreamTransport`` for established connection. ## ``bufferSize`` - size of internal buffer for transport. @@ -1658,7 +1668,8 @@ else: transp.state.excl(WritePaused) ok() - proc accept*(server: StreamServer): Future[StreamTransport] = + proc accept*(server: StreamServer): Future[StreamTransport] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = var retFuture = newFuture[StreamTransport]("stream.server.accept") doAssert(server.status != ServerStatus.Running, @@ -1762,7 +1773,8 @@ proc stop*(server: StreamServer) {.raises: [TransportOsError].} = let res = stop2(server) if res.isErr(): raiseTransportOsError(res.error()) -proc join*(server: StreamServer): Future[void] = +proc join*(server: StreamServer): Future[void] {. + async: (raw: true, raises: [CancelledError]).} = ## Waits until ``server`` is not closed. var retFuture = newFuture[void]("stream.transport.server.join") @@ -1785,7 +1797,8 @@ proc connect*(address: TransportAddress, flags: set[TransportFlags], localAddress = TransportAddress(), dualstack = DualStackType.Auto - ): Future[StreamTransport] = + ): Future[StreamTransport] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = # Retro compatibility with TransportFlags var mappedFlags: set[SocketFlags] if TcpNoDelay in flags: mappedFlags.incl(SocketFlags.TcpNoDelay) @@ -1817,7 +1830,8 @@ proc close*(server: StreamServer) = else: server.sock.closeSocket(continuation) -proc closeWait*(server: StreamServer): Future[void] = +proc closeWait*(server: StreamServer): Future[void] {. + async: (raw: true, raises: [CancelledError]).} = ## Close server ``server`` and release all resources. server.close() server.join() @@ -2066,6 +2080,7 @@ proc createStreamServer*(host: TransportAddress, sres proc createStreamServer*(host: TransportAddress, + cbproc: UnsafeStreamCallback, flags: set[ServerFlags] = {}, sock: AsyncFD = asyncInvalidSocket, backlog: int = DefaultBacklogSize, @@ -2074,8 +2089,30 @@ proc createStreamServer*(host: TransportAddress, init: TransportInitCallback = nil, udata: pointer = nil, dualstack = DualStackType.Auto): StreamServer {. - raises: [CatchableError].} = - createStreamServer(host, nil, flags, sock, backlog, bufferSize, + raises: [TransportOsError], + deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} = + proc wrap(server: StreamServer, + client: StreamTransport) {.async: (raises: []).} = + try: + cbproc(server, client) + except CatchableError as exc: + raiseAssert "Unexpected exception from stream server cbproc: " & exc.msg + + createStreamServer( + host, wrap, flags, sock, backlog, bufferSize, child, init, udata, + dualstack) + +proc createStreamServer*(host: TransportAddress, + flags: set[ServerFlags] = {}, + sock: AsyncFD = asyncInvalidSocket, + backlog: int = DefaultBacklogSize, + bufferSize: int = DefaultStreamBufferSize, + child: StreamServer = nil, + init: TransportInitCallback = nil, + udata: pointer = nil, + dualstack = DualStackType.Auto): StreamServer {. + raises: [TransportOsError].} = + createStreamServer(host, StreamCallback(nil), flags, sock, backlog, bufferSize, child, init, cast[pointer](udata), dualstack) proc createStreamServer*[T](host: TransportAddress, @@ -2088,7 +2125,24 @@ proc createStreamServer*[T](host: TransportAddress, child: StreamServer = nil, init: TransportInitCallback = nil, dualstack = DualStackType.Auto): StreamServer {. - raises: [CatchableError].} = + raises: [TransportOsError].} = + var fflags = flags + {GCUserData} + GC_ref(udata) + createStreamServer(host, cbproc, fflags, sock, backlog, bufferSize, + child, init, cast[pointer](udata), dualstack) + +proc createStreamServer*[T](host: TransportAddress, + cbproc: UnsafeStreamCallback, + flags: set[ServerFlags] = {}, + udata: ref T, + sock: AsyncFD = asyncInvalidSocket, + backlog: int = DefaultBacklogSize, + bufferSize: int = DefaultStreamBufferSize, + child: StreamServer = nil, + init: TransportInitCallback = nil, + dualstack = DualStackType.Auto): StreamServer {. + raises: [TransportOsError], + deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} = var fflags = flags + {GCUserData} GC_ref(udata) createStreamServer(host, cbproc, fflags, sock, backlog, bufferSize, @@ -2103,10 +2157,10 @@ proc createStreamServer*[T](host: TransportAddress, child: StreamServer = nil, init: TransportInitCallback = nil, dualstack = DualStackType.Auto): StreamServer {. - raises: [CatchableError].} = + raises: [TransportOsError].} = var fflags = flags + {GCUserData} GC_ref(udata) - createStreamServer(host, nil, fflags, sock, backlog, bufferSize, + createStreamServer(host, StreamCallback(nil), fflags, sock, backlog, bufferSize, child, init, cast[pointer](udata), dualstack) proc getUserData*[T](server: StreamServer): T {.inline.} = @@ -2157,7 +2211,8 @@ template fastWrite(transp: auto, pbytes: var ptr byte, rbytes: var int, return retFuture proc write*(transp: StreamTransport, pbytes: pointer, - nbytes: int): Future[int] = + nbytes: int): Future[int] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Write data from buffer ``pbytes`` with size ``nbytes`` using transport ## ``transp``. var retFuture = newFuture[int]("stream.transport.write(pointer)") @@ -2179,9 +2234,10 @@ proc write*(transp: StreamTransport, pbytes: pointer, return retFuture proc write*(transp: StreamTransport, msg: sink string, - msglen = -1): Future[int] = + msglen = -1): Future[int] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Write data from string ``msg`` using transport ``transp``. - var retFuture = newFutureStr[int]("stream.transport.write(string)") + var retFuture = newFuture[int]("stream.transport.write(string)") transp.checkClosed(retFuture) transp.checkWriteEof(retFuture) @@ -2197,17 +2253,10 @@ proc write*(transp: StreamTransport, msg: sink string, let written = nbytes - rbytes # In case fastWrite wrote some - pbytes = - when declared(shallowCopy): - if not(isLiteral(msg)): - shallowCopy(retFuture.gcholder, msg) - cast[ptr byte](addr retFuture.gcholder[written]) - else: - retFuture.gcholder = msg[written ..< nbytes] - cast[ptr byte](addr retFuture.gcholder[0]) - else: - retFuture.gcholder = msg[written ..< nbytes] - cast[ptr byte](addr retFuture.gcholder[0]) + var localCopy = msg + retFuture.addCallback(proc(_: pointer) = reset(localCopy)) + + pbytes = cast[ptr byte](addr localCopy[written]) var vector = StreamVector(kind: DataBuffer, writer: retFuture, buf: pbytes, buflen: rbytes, size: nbytes) @@ -2218,9 +2267,10 @@ proc write*(transp: StreamTransport, msg: sink string, return retFuture proc write*[T](transp: StreamTransport, msg: sink seq[T], - msglen = -1): Future[int] = + msglen = -1): Future[int] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Write sequence ``msg`` using transport ``transp``. - var retFuture = newFutureSeq[int, T]("stream.transport.write(seq)") + var retFuture = newFuture[int]("stream.transport.write(seq)") transp.checkClosed(retFuture) transp.checkWriteEof(retFuture) @@ -2236,17 +2286,10 @@ proc write*[T](transp: StreamTransport, msg: sink seq[T], let written = nbytes - rbytes # In case fastWrite wrote some - pbytes = - when declared(shallowCopy): - if not(isLiteral(msg)): - shallowCopy(retFuture.gcholder, msg) - cast[ptr byte](addr retFuture.gcholder[written]) - else: - retFuture.gcholder = msg[written ..< nbytes] - cast[ptr byte](addr retFuture.gcholder[0]) - else: - retFuture.gcholder = msg[written ..< nbytes] - cast[ptr byte](addr retFuture.gcholder[0]) + var localCopy = msg + retFuture.addCallback(proc(_: pointer) = reset(localCopy)) + + pbytes = cast[ptr byte](addr localCopy[written]) var vector = StreamVector(kind: DataBuffer, writer: retFuture, buf: pbytes, buflen: rbytes, size: nbytes) @@ -2257,7 +2300,8 @@ proc write*[T](transp: StreamTransport, msg: sink seq[T], return retFuture proc writeFile*(transp: StreamTransport, handle: int, - offset: uint = 0, size: int = 0): Future[int] = + offset: uint = 0, size: int = 0): Future[int] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Write data from file descriptor ``handle`` to transport ``transp``. ## ## You can specify starting ``offset`` in opened file and number of bytes @@ -2304,7 +2348,7 @@ template readLoop(name, body: untyped): untyped = break else: checkPending(transp) - var fut = newFuture[void](name) + let fut = ReaderFuture.init(name) transp.reader = fut let res = resumeRead(transp) if res.isErr(): @@ -2328,7 +2372,8 @@ template readLoop(name, body: untyped): untyped = await fut proc readExactly*(transp: StreamTransport, pbytes: pointer, - nbytes: int) {.async.} = + nbytes: int) {. + async: (raises: [TransportError, CancelledError]).} = ## Read exactly ``nbytes`` bytes from transport ``transp`` and store it to ## ``pbytes``. ``pbytes`` must not be ``nil`` pointer and ``nbytes`` should ## be Natural. @@ -2357,7 +2402,8 @@ proc readExactly*(transp: StreamTransport, pbytes: pointer, (consumed: count, done: index == nbytes) proc readOnce*(transp: StreamTransport, pbytes: pointer, - nbytes: int): Future[int] {.async.} = + nbytes: int): Future[int] {. + async: (raises: [TransportError, CancelledError]).} = ## Perform one read operation on transport ``transp``. ## ## If internal buffer is not empty, ``nbytes`` bytes will be transferred from @@ -2376,7 +2422,8 @@ proc readOnce*(transp: StreamTransport, pbytes: pointer, return count proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, - sep: seq[byte]): Future[int] {.async.} = + sep: seq[byte]): Future[int] {. + async: (raises: [TransportError, CancelledError]).} = ## Read data from the transport ``transp`` until separator ``sep`` is found. ## ## On success, the data and separator will be removed from the internal @@ -2428,7 +2475,8 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int, return k proc readLine*(transp: StreamTransport, limit = 0, - sep = "\r\n"): Future[string] {.async.} = + sep = "\r\n"): Future[string] {. + async: (raises: [TransportError, CancelledError]).} = ## Read one line from transport ``transp``, where "line" is a sequence of ## bytes ending with ``sep`` (default is "\r\n"). ## @@ -2470,7 +2518,8 @@ proc readLine*(transp: StreamTransport, limit = 0, (index, (state == len(sep)) or (lim == len(result))) -proc read*(transp: StreamTransport): Future[seq[byte]] {.async.} = +proc read*(transp: StreamTransport): Future[seq[byte]] {. + async: (raises: [TransportError, CancelledError]).} = ## Read all bytes from transport ``transp``. ## ## This procedure allocates buffer seq[byte] and return it as result. @@ -2481,7 +2530,8 @@ proc read*(transp: StreamTransport): Future[seq[byte]] {.async.} = result.add(transp.buffer.toOpenArray(0, transp.offset - 1)) (transp.offset, false) -proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {.async.} = +proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {. + async: (raises: [TransportError, CancelledError]).} = ## Read all bytes (n <= 0) or exactly `n` bytes from transport ``transp``. ## ## This procedure allocates buffer seq[byte] and return it as result. @@ -2496,7 +2546,8 @@ proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {.async.} = result.add(transp.buffer.toOpenArray(0, count - 1)) (count, len(result) == n) -proc consume*(transp: StreamTransport): Future[int] {.async.} = +proc consume*(transp: StreamTransport): Future[int] {. + async: (raises: [TransportError, CancelledError]).} = ## Consume all bytes from transport ``transp`` and discard it. ## ## Return number of bytes actually consumed and discarded. @@ -2507,7 +2558,8 @@ proc consume*(transp: StreamTransport): Future[int] {.async.} = result += transp.offset (transp.offset, false) -proc consume*(transp: StreamTransport, n: int): Future[int] {.async.} = +proc consume*(transp: StreamTransport, n: int): Future[int] {. + async: (raises: [TransportError, CancelledError]).} = ## Consume all bytes (n <= 0) or ``n`` bytes from transport ``transp`` and ## discard it. ## @@ -2524,7 +2576,8 @@ proc consume*(transp: StreamTransport, n: int): Future[int] {.async.} = (count, result == n) proc readMessage*(transp: StreamTransport, - predicate: ReadMessagePredicate) {.async.} = + predicate: ReadMessagePredicate) {. + async: (raises: [TransportError, CancelledError]).} = ## Read all bytes from transport ``transp`` until ``predicate`` callback ## will not be satisfied. ## @@ -2547,7 +2600,8 @@ proc readMessage*(transp: StreamTransport, else: predicate(transp.buffer.toOpenArray(0, transp.offset - 1)) -proc join*(transp: StreamTransport): Future[void] = +proc join*(transp: StreamTransport): Future[void] {. + async: (raw: true, raises: [CancelledError]).} = ## Wait until ``transp`` will not be closed. var retFuture = newFuture[void]("stream.transport.join") @@ -2606,14 +2660,15 @@ proc close*(transp: StreamTransport) = elif transp.kind == TransportKind.Socket: closeSocket(transp.fd, continuation) -proc closeWait*(transp: StreamTransport): Future[void] = +proc closeWait*(transp: StreamTransport): Future[void] {. + async: (raw: true, raises: []).} = ## Close and frees resources of transport ``transp``. - const FutureName = "stream.transport.closeWait" + let retFuture = newFuture[void]( + "stream.transport.closeWait", {FutureFlag.OwnCancelSchedule}) if {ReadClosed, WriteClosed} * transp.state != {}: - return Future.completed(FutureName) - - let retFuture = newFuture[void](FutureName, {FutureFlag.OwnCancelSchedule}) + retFuture.complete() + return retFuture proc continuation(udata: pointer) {.gcsafe.} = retFuture.complete() @@ -2631,7 +2686,8 @@ proc closeWait*(transp: StreamTransport): Future[void] = retFuture.cancelCallback = cancellation retFuture -proc shutdownWait*(transp: StreamTransport): Future[void] = +proc shutdownWait*(transp: StreamTransport): Future[void] {. + async: (raw: true, raises: [TransportError, CancelledError]).} = ## Perform graceful shutdown of TCP connection backed by transport ``transp``. doAssert(transp.kind == TransportKind.Socket) let retFuture = newFuture[void]("stream.transport.shutdown") diff --git a/tests/testasyncstream.nim b/tests/testasyncstream.nim index 86b73575..bd0207f8 100644 --- a/tests/testasyncstream.nim +++ b/tests/testasyncstream.nim @@ -87,14 +87,17 @@ suite "AsyncStream test suite": test "AsyncStream(StreamTransport) readExactly() test": proc testReadExactly(): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - await wstream.write("000000000011111111112222222222") - await wstream.finish() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + await wstream.write("000000000011111111112222222222") + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var buffer = newSeq[byte](10) var server = createStreamServer(initTAddress("127.0.0.1:0"), @@ -117,14 +120,17 @@ suite "AsyncStream test suite": test "AsyncStream(StreamTransport) readUntil() test": proc testReadUntil(): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - await wstream.write("0000000000NNz1111111111NNz2222222222NNz") - await wstream.finish() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + await wstream.write("0000000000NNz1111111111NNz2222222222NNz") + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var buffer = newSeq[byte](13) var sep = @[byte('N'), byte('N'), byte('z')] @@ -155,14 +161,17 @@ suite "AsyncStream test suite": test "AsyncStream(StreamTransport) readLine() test": proc testReadLine(): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - await wstream.write("0000000000\r\n1111111111\r\n2222222222\r\n") - await wstream.finish() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + await wstream.write("0000000000\r\n1111111111\r\n2222222222\r\n") + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -184,14 +193,17 @@ suite "AsyncStream test suite": test "AsyncStream(StreamTransport) read() test": proc testRead(): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - await wstream.write("000000000011111111112222222222") - await wstream.finish() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + await wstream.write("000000000011111111112222222222") + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -211,14 +223,17 @@ suite "AsyncStream test suite": test "AsyncStream(StreamTransport) consume() test": proc testConsume(): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - await wstream.write("0000000000111111111122222222223333333333") - await wstream.finish() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + await wstream.write("0000000000111111111122222222223333333333") + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -247,26 +262,29 @@ suite "AsyncStream test suite": test "AsyncStream(AsyncStream) readExactly() test": proc testReadExactly2(): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var wstream2 = newChunkedStreamWriter(wstream) - var s1 = "00000" - var s2 = "11111" - var s3 = "22222" - await wstream2.write("00000") - await wstream2.write(addr s1[0], len(s1)) - await wstream2.write("11111") - await wstream2.write(s2.toBytes()) - await wstream2.write("22222") - await wstream2.write(addr s3[0], len(s3)) + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + var s1 = "00000" + var s2 = "11111" + var s3 = "22222" + await wstream2.write("00000") + await wstream2.write(addr s1[0], len(s1)) + await wstream2.write("11111") + await wstream2.write(s2.toBytes()) + await wstream2.write("22222") + await wstream2.write(addr s3[0], len(s3)) - await wstream2.finish() - await wstream.finish() - await wstream2.closeWait() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + await wstream2.finish() + await wstream.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var buffer = newSeq[byte](10) var server = createStreamServer(initTAddress("127.0.0.1:0"), @@ -299,25 +317,28 @@ suite "AsyncStream test suite": test "AsyncStream(AsyncStream) readUntil() test": proc testReadUntil2(): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var wstream2 = newChunkedStreamWriter(wstream) - var s1 = "00000NNz" - var s2 = "11111NNz" - var s3 = "22222NNz" - await wstream2.write("00000") - await wstream2.write(addr s1[0], len(s1)) - await wstream2.write("11111") - await wstream2.write(s2) - await wstream2.write("22222") - await wstream2.write(s3.toBytes()) - await wstream2.finish() - await wstream.finish() - await wstream2.closeWait() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + var s1 = "00000NNz" + var s2 = "11111NNz" + var s3 = "22222NNz" + await wstream2.write("00000") + await wstream2.write(addr s1[0], len(s1)) + await wstream2.write("11111") + await wstream2.write(s2) + await wstream2.write("22222") + await wstream2.write(s3.toBytes()) + await wstream2.finish() + await wstream.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var buffer = newSeq[byte](13) var sep = @[byte('N'), byte('N'), byte('z')] @@ -358,22 +379,25 @@ suite "AsyncStream test suite": test "AsyncStream(AsyncStream) readLine() test": proc testReadLine2(): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var wstream2 = newChunkedStreamWriter(wstream) - await wstream2.write("00000") - await wstream2.write("00000\r\n") - await wstream2.write("11111") - await wstream2.write("11111\r\n") - await wstream2.write("22222") - await wstream2.write("22222\r\n") - await wstream2.finish() - await wstream.finish() - await wstream2.closeWait() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + await wstream2.write("00000") + await wstream2.write("00000\r\n") + await wstream2.write("11111") + await wstream2.write("11111\r\n") + await wstream2.write("22222") + await wstream2.write("22222\r\n") + await wstream2.finish() + await wstream.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -405,21 +429,24 @@ suite "AsyncStream test suite": test "AsyncStream(AsyncStream) read() test": proc testRead2(): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var wstream2 = newChunkedStreamWriter(wstream) - var s2 = "1111111111" - var s3 = "2222222222" - await wstream2.write("0000000000") - await wstream2.write(s2) - await wstream2.write(s3.toBytes()) - await wstream2.finish() - await wstream.finish() - await wstream2.closeWait() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + var s2 = "1111111111" + var s3 = "2222222222" + await wstream2.write("0000000000") + await wstream2.write(s2) + await wstream2.write(s3.toBytes()) + await wstream2.finish() + await wstream.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -446,31 +473,34 @@ suite "AsyncStream test suite": test "AsyncStream(AsyncStream) consume() test": proc testConsume2(): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - const - S4 = @[byte('3'), byte('3'), byte('3'), byte('3'), byte('3')] - var wstream = newAsyncStreamWriter(transp) - var wstream2 = newChunkedStreamWriter(wstream) + transp: StreamTransport) {.async: (raises: []).} = + try: + const + S4 = @[byte('3'), byte('3'), byte('3'), byte('3'), byte('3')] + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) - var s1 = "00000" - var s2 = "11111".toBytes() - var s3 = "22222" + var s1 = "00000" + var s2 = "11111".toBytes() + var s3 = "22222" - await wstream2.write("00000") - await wstream2.write(s1) - await wstream2.write("11111") - await wstream2.write(s2) - await wstream2.write("22222") - await wstream2.write(addr s3[0], len(s3)) - await wstream2.write("33333") - await wstream2.write(S4) - await wstream2.finish() - await wstream.finish() - await wstream2.closeWait() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + await wstream2.write("00000") + await wstream2.write(s1) + await wstream2.write("11111") + await wstream2.write(s2) + await wstream2.write("22222") + await wstream2.write(addr s3[0], len(s3)) + await wstream2.write("33333") + await wstream2.write(S4) + await wstream2.finish() + await wstream.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -511,27 +541,30 @@ suite "AsyncStream test suite": message = createBigMessage("ABCDEFGHIJKLMNOP", size) proc processClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var wbstream = newBoundedStreamWriter(wstream, uint64(size)) + transp: StreamTransport) {.async: (raises: []).} = try: - check wbstream.atEof() == false - await wbstream.write(message) - check wbstream.atEof() == false - await wbstream.finish() - check wbstream.atEof() == true - expect AsyncStreamWriteEOFError: + var wstream = newAsyncStreamWriter(transp) + var wbstream = newBoundedStreamWriter(wstream, uint64(size)) + try: + check wbstream.atEof() == false await wbstream.write(message) - expect AsyncStreamWriteEOFError: - await wbstream.write(message) - expect AsyncStreamWriteEOFError: - await wbstream.write(message) - check wbstream.atEof() == true - await wbstream.closeWait() - check wbstream.atEof() == true - finally: - await wstream.closeWait() - await transp.closeWait() + check wbstream.atEof() == false + await wbstream.finish() + check wbstream.atEof() == true + expect AsyncStreamWriteEOFError: + await wbstream.write(message) + expect AsyncStreamWriteEOFError: + await wbstream.write(message) + expect AsyncStreamWriteEOFError: + await wbstream.write(message) + check wbstream.atEof() == true + await wbstream.closeWait() + check wbstream.atEof() == true + finally: + await wstream.closeWait() + await transp.closeWait() + except CatchableError as exc: + raiseAssert exc.msg let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay} var server = createStreamServer(initTAddress("127.0.0.1:0"), @@ -580,15 +613,18 @@ suite "ChunkedStream test suite": ] proc checkVector(inputstr: string): Future[string] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var data = inputstr - await wstream.write(data) - await wstream.finish() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var data = inputstr + await wstream.write(data) + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -630,15 +666,18 @@ suite "ChunkedStream test suite": ] proc checkVector(inputstr: string): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var data = inputstr - await wstream.write(data) - await wstream.finish() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var data = inputstr + await wstream.write(data) + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var res = false var server = createStreamServer(initTAddress("127.0.0.1:0"), @@ -713,14 +752,17 @@ suite "ChunkedStream test suite": test "ChunkedStream too big chunk header test": proc checkTooBigChunkHeader(inputstr: seq[byte]): Future[bool] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - await wstream.write(inputstr) - await wstream.finish() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + await wstream.write(inputstr) + await wstream.finish() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -751,23 +793,26 @@ suite "ChunkedStream test suite": proc checkVector(inputstr: seq[byte], chunkSize: int): Future[seq[byte]] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var wstream2 = newChunkedStreamWriter(wstream) - var data = inputstr - var offset = 0 - while true: - if len(data) == offset: - break - let toWrite = min(chunkSize, len(data) - offset) - await wstream2.write(addr data[offset], toWrite) - offset = offset + toWrite - await wstream2.finish() - await wstream2.closeWait() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + var data = inputstr + var offset = 0 + while true: + if len(data) == offset: + break + let toWrite = min(chunkSize, len(data) - offset) + await wstream2.write(addr data[offset], toWrite) + offset = offset + toWrite + await wstream2.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -796,23 +841,26 @@ suite "ChunkedStream test suite": writeChunkSize: int, readChunkSize: int): Future[seq[byte]] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var wstream2 = newChunkedStreamWriter(wstream) - var data = inputstr - var offset = 0 - while true: - if len(data) == offset: - break - let toWrite = min(writeChunkSize, len(data) - offset) - await wstream2.write(addr data[offset], toWrite) - offset = offset + toWrite - await wstream2.finish() - await wstream2.closeWait() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newChunkedStreamWriter(wstream) + var data = inputstr + var offset = 0 + while true: + if len(data) == offset: + break + let toWrite = min(writeChunkSize, len(data) - offset) + await wstream2.write(addr data[offset], toWrite) + offset = offset + toWrite + await wstream2.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -849,30 +897,33 @@ suite "TLSStream test suite": const HttpHeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)] test "Simple HTTPS connection": proc headerClient(address: TransportAddress, - name: string): Future[bool] {.async.} = - var mark = "HTTP/1.1 " - var buffer = newSeq[byte](8192) - var transp = await connect(address) - var reader = newAsyncStreamReader(transp) - var writer = newAsyncStreamWriter(transp) - var tlsstream = newTLSClientAsyncStream(reader, writer, name) - await tlsstream.writer.write("GET / HTTP/1.1\r\nHost: " & name & - "\r\nConnection: close\r\n\r\n") - var readFut = tlsstream.reader.readUntil(addr buffer[0], len(buffer), - HttpHeadersMark) - let res = await withTimeout(readFut, 5.seconds) - if res: - var length = readFut.read() - buffer.setLen(length) - if len(buffer) > len(mark): - if equalMem(addr buffer[0], addr mark[0], len(mark)): - result = true + name: string): Future[bool] {.async: (raises: []).} = + try: + var mark = "HTTP/1.1 " + var buffer = newSeq[byte](8192) + var transp = await connect(address) + var reader = newAsyncStreamReader(transp) + var writer = newAsyncStreamWriter(transp) + var tlsstream = newTLSClientAsyncStream(reader, writer, name) + await tlsstream.writer.write("GET / HTTP/1.1\r\nHost: " & name & + "\r\nConnection: close\r\n\r\n") + var readFut = tlsstream.reader.readUntil(addr buffer[0], len(buffer), + HttpHeadersMark) + let res = await withTimeout(readFut, 5.seconds) + if res: + var length = readFut.read() + buffer.setLen(length) + if len(buffer) > len(mark): + if equalMem(addr buffer[0], addr mark[0], len(mark)): + result = true - await tlsstream.reader.closeWait() - await tlsstream.writer.closeWait() - await reader.closeWait() - await writer.closeWait() - await transp.closeWait() + await tlsstream.reader.closeWait() + await tlsstream.writer.closeWait() + await reader.closeWait() + await writer.closeWait() + await transp.closeWait() + except CatchableError as exc: + raiseAssert exc.msg let res = waitFor(headerClient(resolveTAddress("www.google.com:443")[0], "www.google.com")) @@ -884,20 +935,23 @@ suite "TLSStream test suite": let testMessage = "TEST MESSAGE" proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var reader = newAsyncStreamReader(transp) - var writer = newAsyncStreamWriter(transp) - var sstream = newTLSServerAsyncStream(reader, writer, key, cert) - await handshake(sstream) - await sstream.writer.write(testMessage & "\r\n") - await sstream.writer.finish() - await sstream.writer.closeWait() - await sstream.reader.closeWait() - await reader.closeWait() - await writer.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var reader = newAsyncStreamReader(transp) + var writer = newAsyncStreamWriter(transp) + var sstream = newTLSServerAsyncStream(reader, writer, key, cert) + await handshake(sstream) + await sstream.writer.write(testMessage & "\r\n") + await sstream.writer.finish() + await sstream.writer.closeWait() + await sstream.reader.closeWait() + await reader.closeWait() + await writer.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg key = TLSPrivateKey.init(pemkey) cert = TLSCertificate.init(pemcert) @@ -931,20 +985,23 @@ suite "TLSStream test suite": let trustAnchors = TrustAnchorStore.new(SelfSignedTrustAnchors) proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var reader = newAsyncStreamReader(transp) - var writer = newAsyncStreamWriter(transp) - var sstream = newTLSServerAsyncStream(reader, writer, key, cert) - await handshake(sstream) - await sstream.writer.write(testMessage & "\r\n") - await sstream.writer.finish() - await sstream.writer.closeWait() - await sstream.reader.closeWait() - await reader.closeWait() - await writer.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var reader = newAsyncStreamReader(transp) + var writer = newAsyncStreamWriter(transp) + var sstream = newTLSServerAsyncStream(reader, writer, key, cert) + await handshake(sstream) + await sstream.writer.write(testMessage & "\r\n") + await sstream.writer.finish() + await sstream.writer.closeWait() + await sstream.reader.closeWait() + await reader.closeWait() + await writer.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -988,46 +1045,49 @@ suite "BoundedStream test suite": var clientRes = false proc processClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - case btest - of BoundaryRead: - await wstream.write(message) - await wstream.write(boundary) - await wstream.finish() - await wstream.closeWait() - clientRes = true - of BoundaryDouble: - await wstream.write(message) - await wstream.write(boundary) - await wstream.write(message) - await wstream.finish() - await wstream.closeWait() - clientRes = true - of BoundarySize: - var ncmessage = message - ncmessage.setLen(len(message) - 2) - await wstream.write(ncmessage) - await wstream.write(@[0x2D'u8, 0x2D'u8]) - await wstream.finish() - await wstream.closeWait() - clientRes = true - of BoundaryIncomplete: - var ncmessage = message - ncmessage.setLen(len(message) - 2) - await wstream.write(ncmessage) - await wstream.finish() - await wstream.closeWait() - clientRes = true - of BoundaryEmpty: - await wstream.write(boundary) - await wstream.finish() - await wstream.closeWait() - clientRes = true + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + case btest + of BoundaryRead: + await wstream.write(message) + await wstream.write(boundary) + await wstream.finish() + await wstream.closeWait() + clientRes = true + of BoundaryDouble: + await wstream.write(message) + await wstream.write(boundary) + await wstream.write(message) + await wstream.finish() + await wstream.closeWait() + clientRes = true + of BoundarySize: + var ncmessage = message + ncmessage.setLen(len(message) - 2) + await wstream.write(ncmessage) + await wstream.write(@[0x2D'u8, 0x2D'u8]) + await wstream.finish() + await wstream.closeWait() + clientRes = true + of BoundaryIncomplete: + var ncmessage = message + ncmessage.setLen(len(message) - 2) + await wstream.write(ncmessage) + await wstream.finish() + await wstream.closeWait() + clientRes = true + of BoundaryEmpty: + await wstream.write(boundary) + await wstream.finish() + await wstream.closeWait() + clientRes = true - await transp.closeWait() - server.stop() - server.close() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var res = false let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay} @@ -1090,60 +1150,63 @@ suite "BoundedStream test suite": message.add(messagePart) proc processClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var wbstream = newBoundedStreamWriter(wstream, uint64(size), - comparison = cmp) - case stest - of SizeReadWrite: - for i in 0 ..< 10: - await wbstream.write(messagePart) - await wbstream.finish() - await wbstream.closeWait() - clientRes = true - of SizeOverflow: - for i in 0 ..< 10: - await wbstream.write(messagePart) - try: - await wbstream.write(messagePart) - except BoundedStreamOverflowError: + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var wbstream = newBoundedStreamWriter(wstream, uint64(size), + comparison = cmp) + case stest + of SizeReadWrite: + for i in 0 ..< 10: + await wbstream.write(messagePart) + await wbstream.finish() + await wbstream.closeWait() clientRes = true - await wbstream.closeWait() - of SizeIncomplete: - for i in 0 ..< 9: - await wbstream.write(messagePart) - case cmp - of BoundCmp.Equal: + of SizeOverflow: + for i in 0 ..< 10: + await wbstream.write(messagePart) try: - await wbstream.finish() - except BoundedStreamIncompleteError: + await wbstream.write(messagePart) + except BoundedStreamOverflowError: clientRes = true - of BoundCmp.LessOrEqual: - try: - await wbstream.finish() - clientRes = true - except BoundedStreamIncompleteError: - discard - await wbstream.closeWait() - of SizeEmpty: - case cmp - of BoundCmp.Equal: - try: - await wbstream.finish() - except BoundedStreamIncompleteError: - clientRes = true - of BoundCmp.LessOrEqual: - try: - await wbstream.finish() - clientRes = true - except BoundedStreamIncompleteError: - discard - await wbstream.closeWait() + await wbstream.closeWait() + of SizeIncomplete: + for i in 0 ..< 9: + await wbstream.write(messagePart) + case cmp + of BoundCmp.Equal: + try: + await wbstream.finish() + except BoundedStreamIncompleteError: + clientRes = true + of BoundCmp.LessOrEqual: + try: + await wbstream.finish() + clientRes = true + except BoundedStreamIncompleteError: + discard + await wbstream.closeWait() + of SizeEmpty: + case cmp + of BoundCmp.Equal: + try: + await wbstream.finish() + except BoundedStreamIncompleteError: + clientRes = true + of BoundCmp.LessOrEqual: + try: + await wbstream.finish() + clientRes = true + except BoundedStreamIncompleteError: + discard + await wbstream.closeWait() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay} var server = createStreamServer(initTAddress("127.0.0.1:0"), @@ -1243,23 +1306,26 @@ suite "BoundedStream test suite": writeChunkSize: int, readChunkSize: int): Future[seq[byte]] {.async.} = proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var wstream2 = newBoundedStreamWriter(wstream, uint64(len(inputstr))) - var data = inputstr - var offset = 0 - while true: - if len(data) == offset: - break - let toWrite = min(writeChunkSize, len(data) - offset) - await wstream2.write(addr data[offset], toWrite) - offset = offset + toWrite - await wstream2.finish() - await wstream2.closeWait() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newBoundedStreamWriter(wstream, uint64(len(inputstr))) + var data = inputstr + var offset = 0 + while true: + if len(data) == offset: + break + let toWrite = min(writeChunkSize, len(data) - offset) + await wstream2.write(addr data[offset], toWrite) + offset = offset + toWrite + await wstream2.finish() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) @@ -1293,17 +1359,20 @@ suite "BoundedStream test suite": proc checkEmptyStreams(): Future[bool] {.async.} = var writer1Res = false proc serveClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var wstream = newAsyncStreamWriter(transp) - var wstream2 = newBoundedStreamWriter(wstream, 0'u64) - await wstream2.finish() - let res = wstream2.atEof() - await wstream2.closeWait() - await wstream.closeWait() - await transp.closeWait() - server.stop() - server.close() - writer1Res = res + transp: StreamTransport) {.async: (raises: []).} = + try: + var wstream = newAsyncStreamWriter(transp) + var wstream2 = newBoundedStreamWriter(wstream, 0'u64) + await wstream2.finish() + let res = wstream2.atEof() + await wstream2.closeWait() + await wstream.closeWait() + await transp.closeWait() + server.stop() + server.close() + writer1Res = res + except CatchableError as exc: + raiseAssert exc.msg var server = createStreamServer(initTAddress("127.0.0.1:0"), serveClient, {ReuseAddr}) diff --git a/tests/testbugs.nim b/tests/testbugs.nim index 1f2a932d..fc4af3a4 100644 --- a/tests/testbugs.nim +++ b/tests/testbugs.nim @@ -21,16 +21,19 @@ suite "Asynchronous issues test suite": test: string proc udp4DataAvailable(transp: DatagramTransport, - remote: TransportAddress) {.async, gcsafe.} = - var udata = getUserData[CustomData](transp) - var expect = TEST_MSG - var data: seq[byte] - var datalen: int - transp.peekMessage(data, datalen) - if udata.test == "CHECK" and datalen == MSG_LEN and - equalMem(addr data[0], addr expect[0], datalen): - udata.test = "OK" - transp.close() + remote: TransportAddress) {.async: (raises: []).} = + try: + var udata = getUserData[CustomData](transp) + var expect = TEST_MSG + var data: seq[byte] + var datalen: int + transp.peekMessage(data, datalen) + if udata.test == "CHECK" and datalen == MSG_LEN and + equalMem(addr data[0], addr expect[0], datalen): + udata.test = "OK" + transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc issue6(): Future[bool] {.async.} = var myself = initTAddress("127.0.0.1:" & $HELLO_PORT) diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index c941761a..bd33ef36 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -30,286 +30,319 @@ suite "Datagram Transport test suite": " clients x " & $MessagesCount & " messages)" proc client1(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("REQUEST"): - var numstr = data[7..^1] - var num = parseInt(numstr) - var ans = "ANSWER" & $num - await transp.sendTo(raddr, addr ans[0], len(ans)) + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("REQUEST"): + var numstr = data[7..^1] + var num = parseInt(numstr) + var ans = "ANSWER" & $num + await transp.sendTo(raddr, addr ans[0], len(ans)) + else: + var err = "ERROR" + await transp.sendTo(raddr, addr err[0], len(err)) else: - var err = "ERROR" - await transp.sendTo(raddr, addr err[0], len(err)) - else: - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc client2(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var ta = initTAddress("127.0.0.1:33336") + var req = "REQUEST" & $counterPtr[] + await transp.sendTo(ta, addr req[0], len(req)) else: - var ta = initTAddress("127.0.0.1:33336") - var req = "REQUEST" & $counterPtr[] - await transp.sendTo(ta, addr req[0], len(req)) + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() else: + ## Read operation failed with error var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc client3(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.send(addr req[0], len(req)) else: - var req = "REQUEST" & $counterPtr[] - await transp.send(addr req[0], len(req)) + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() else: + ## Read operation failed with error var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc client4(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == MessagesCount: - transp.close() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == MessagesCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.send(addr req[0], len(req)) else: - var req = "REQUEST" & $counterPtr[] - await transp.send(addr req[0], len(req)) + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() else: + ## Read operation failed with error var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc client5(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == MessagesCount: - transp.close() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == MessagesCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.sendTo(raddr, addr req[0], len(req)) else: - var req = "REQUEST" & $counterPtr[] - await transp.sendTo(raddr, addr req[0], len(req)) + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() else: + ## Read operation failed with error var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc client6(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("REQUEST"): - var numstr = data[7..^1] - var num = parseInt(numstr) - var ans = "ANSWER" & $num - await transp.sendTo(raddr, ans) + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("REQUEST"): + var numstr = data[7..^1] + var num = parseInt(numstr) + var ans = "ANSWER" & $num + await transp.sendTo(raddr, ans) + else: + var err = "ERROR" + await transp.sendTo(raddr, err) else: - var err = "ERROR" - await transp.sendTo(raddr, err) - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc client7(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.sendTo(raddr, req) else: - var req = "REQUEST" & $counterPtr[] - await transp.sendTo(raddr, req) + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() else: + ## Read operation failed with error var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc client8(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + await transp.send(req) else: - var req = "REQUEST" & $counterPtr[] - await transp.send(req) + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() else: + ## Read operation failed with error var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc client9(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("REQUEST"): - var numstr = data[7..^1] - var num = parseInt(numstr) - var ans = "ANSWER" & $num - var ansseq = newSeq[byte](len(ans)) - copyMem(addr ansseq[0], addr ans[0], len(ans)) - await transp.sendTo(raddr, ansseq) + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("REQUEST"): + var numstr = data[7..^1] + var num = parseInt(numstr) + var ans = "ANSWER" & $num + var ansseq = newSeq[byte](len(ans)) + copyMem(addr ansseq[0], addr ans[0], len(ans)) + await transp.sendTo(raddr, ansseq) + else: + var err = "ERROR" + var errseq = newSeq[byte](len(err)) + copyMem(addr errseq[0], addr err[0], len(err)) + await transp.sendTo(raddr, errseq) else: - var err = "ERROR" - var errseq = newSeq[byte](len(err)) - copyMem(addr errseq[0], addr err[0], len(err)) - await transp.sendTo(raddr, errseq) - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + ## Read operation failed with error + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc client10(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + var reqseq = newSeq[byte](len(req)) + copyMem(addr reqseq[0], addr req[0], len(req)) + await transp.sendTo(raddr, reqseq) else: - var req = "REQUEST" & $counterPtr[] - var reqseq = newSeq[byte](len(req)) - copyMem(addr reqseq[0], addr req[0], len(req)) - await transp.sendTo(raddr, reqseq) + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() else: + ## Read operation failed with error var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc client11(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var pbytes = transp.getMessage() - var nbytes = len(pbytes) - if nbytes > 0: - var data = newString(nbytes + 1) - copyMem(addr data[0], addr pbytes[0], nbytes) - data.setLen(nbytes) - if data.startsWith("ANSWER"): - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = counterPtr[] + 1 - if counterPtr[] == TestsCount: - transp.close() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var pbytes = transp.getMessage() + var nbytes = len(pbytes) + if nbytes > 0: + var data = newString(nbytes + 1) + copyMem(addr data[0], addr pbytes[0], nbytes) + data.setLen(nbytes) + if data.startsWith("ANSWER"): + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = counterPtr[] + 1 + if counterPtr[] == TestsCount: + transp.close() + else: + var req = "REQUEST" & $counterPtr[] + var reqseq = newSeq[byte](len(req)) + copyMem(addr reqseq[0], addr req[0], len(req)) + await transp.send(reqseq) else: - var req = "REQUEST" & $counterPtr[] - var reqseq = newSeq[byte](len(req)) - copyMem(addr reqseq[0], addr req[0], len(req)) - await transp.send(reqseq) + var counterPtr = cast[ptr int](transp.udata) + counterPtr[] = -1 + transp.close() else: + ## Read operation failed with error var counterPtr = cast[ptr int](transp.udata) counterPtr[] = -1 transp.close() - else: - ## Read operation failed with error - var counterPtr = cast[ptr int](transp.udata) - counterPtr[] = -1 - transp.close() + except CatchableError as exc: + raiseAssert exc.msg proc testPointerSendTo(): Future[int] {.async.} = ## sendTo(pointer) test @@ -439,7 +472,7 @@ suite "Datagram Transport test suite": var ta = initTAddress("127.0.0.1:0") var counter = 0 proc clientMark(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = + raddr: TransportAddress): Future[void] {.async: (raises: []).} = counter = 1 transp.close() var dgram1 = newDatagramTransport(client1, local = ta) @@ -457,7 +490,7 @@ suite "Datagram Transport test suite": proc testTransportClose(): Future[bool] {.async.} = var ta = initTAddress("127.0.0.1:45000") proc clientMark(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = + raddr: TransportAddress): Future[void] {.async: (raises: []).} = discard var dgram = newDatagramTransport(clientMark, local = ta) dgram.close() @@ -473,12 +506,15 @@ suite "Datagram Transport test suite": var bta = initTAddress("255.255.255.255:45010") var res = 0 proc clientMark(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var bmsg = transp.getMessage() - var smsg = string.fromBytes(bmsg) - if smsg == expectMessage: - inc(res) - transp.close() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var bmsg = transp.getMessage() + var smsg = string.fromBytes(bmsg) + if smsg == expectMessage: + inc(res) + transp.close() + except CatchableError as exc: + raiseAssert exc.msg var dgram1 = newDatagramTransport(clientMark, local = ta1, flags = {Broadcast}, ttl = 2) await dgram1.sendTo(bta, expectMessage) @@ -493,15 +529,19 @@ suite "Datagram Transport test suite": var event = newAsyncEvent() proc clientMark1(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var bmsg = transp.getMessage() - var smsg = string.fromBytes(bmsg) - if smsg == expectStr: - inc(res) - event.fire() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var bmsg = transp.getMessage() + var smsg = string.fromBytes(bmsg) + if smsg == expectStr: + inc(res) + event.fire() + except CatchableError as exc: + raiseAssert exc.msg + proc clientMark2(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = + raddr: TransportAddress): Future[void] {.async: (raises: []).} = discard var dgram1 = newDatagramTransport(clientMark1, local = ta) @@ -544,15 +584,18 @@ suite "Datagram Transport test suite": res = 0 proc process1(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = - var bmsg = transp.getMessage() - var smsg = string.fromBytes(bmsg) - if smsg == expectStr: - inc(res) - event.fire() + raddr: TransportAddress): Future[void] {.async: (raises: []).} = + try: + var bmsg = transp.getMessage() + var smsg = string.fromBytes(bmsg) + if smsg == expectStr: + inc(res) + event.fire() + except CatchableError as exc: + raiseAssert exc.msg proc process2(transp: DatagramTransport, - raddr: TransportAddress): Future[void] {.async.} = + raddr: TransportAddress): Future[void] {.async: (raises: []).} = discard let diff --git a/tests/testmacro.nim b/tests/testmacro.nim index 13611934..01337937 100644 --- a/tests/testmacro.nim +++ b/tests/testmacro.nim @@ -459,20 +459,31 @@ suite "Exceptions tracking": check waitFor(test1()) == 12 proc test2: Future[int] {.async: (raw: true, raises: [IOError, OSError]).} = + checkNotCompiles: + result.fail(newException(ValueError, "fail")) + result = newFuture[int]() result.fail(newException(IOError, "fail")) - result.fail(newException(OSError, "fail")) - checkNotCompiles: - result.fail(newException(ValueError, "fail")) proc test3: Future[void] {.async: (raw: true, raises: []).} = + result = newFuture[void]() checkNotCompiles: result.fail(newException(ValueError, "fail")) - + result.complete() # Inheritance proc test4: Future[void] {.async: (raw: true, raises: [CatchableError]).} = + result = newFuture[void]() result.fail(newException(IOError, "fail")) + check: + waitFor(test1()) == 12 + expect(IOError): + discard waitFor(test2()) + + waitFor(test3()) + expect(IOError): + waitFor(test4()) + test "or errors": proc testit {.async: (raises: [ValueError]).} = raise (ref ValueError)() diff --git a/tests/testserver.nim b/tests/testserver.nim index a63c9df7..280148cc 100644 --- a/tests/testserver.nim +++ b/tests/testserver.nim @@ -27,29 +27,36 @@ suite "Server's test suite": checkLeaks() proc serveStreamClient(server: StreamServer, - transp: StreamTransport) {.async.} = + transp: StreamTransport) {.async: (raises: []).} = discard proc serveCustomStreamClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var cserver = cast[CustomServer](server) - var ctransp = cast[CustomTransport](transp) - cserver.test1 = "CONNECTION" - cserver.test2 = ctransp.test - cserver.test3 = await transp.readLine() - var answer = "ANSWER\r\n" - discard await transp.write(answer) - transp.close() - await transp.join() + transp: StreamTransport) {.async: (raises: []).} = + try: + var cserver = cast[CustomServer](server) + var ctransp = cast[CustomTransport](transp) + cserver.test1 = "CONNECTION" + cserver.test2 = ctransp.test + cserver.test3 = await transp.readLine() + var answer = "ANSWER\r\n" + discard await transp.write(answer) + transp.close() + await transp.join() + except CatchableError as exc: + raiseAssert exc.msg + proc serveUdataStreamClient(server: StreamServer, - transp: StreamTransport) {.async.} = - var udata = getUserData[CustomData](server) - var line = await transp.readLine() - var msg = line & udata.test & "\r\n" - discard await transp.write(msg) - transp.close() - await transp.join() + transp: StreamTransport) {.async: (raises: []).} = + try: + var udata = getUserData[CustomData](server) + var line = await transp.readLine() + var msg = line & udata.test & "\r\n" + discard await transp.write(msg) + transp.close() + await transp.join() + except CatchableError as exc: + raiseAssert exc.msg proc customServerTransport(server: StreamServer, fd: AsyncFD): StreamTransport = diff --git a/tests/teststream.nim b/tests/teststream.nim index b0427928..fb5534b5 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -55,124 +55,148 @@ suite "Stream Transport test suite": for i in 0 ..< len(result): result[i] = byte(message[i mod len(message)]) - proc serveClient1(server: StreamServer, transp: StreamTransport) {.async.} = - while not transp.atEof(): - var data = await transp.readLine() - if len(data) == 0: - doAssert(transp.atEof()) - break - doAssert(data.startsWith("REQUEST")) - var numstr = data[7..^1] - var num = parseInt(numstr) - var ans = "ANSWER" & $num & "\r\n" - var res = await transp.write(cast[pointer](addr ans[0]), len(ans)) - doAssert(res == len(ans)) - transp.close() - await transp.join() + proc serveClient1(server: StreamServer, transp: StreamTransport) {. + async: (raises: []).} = + try: + while not transp.atEof(): + var data = await transp.readLine() + if len(data) == 0: + doAssert(transp.atEof()) + break + doAssert(data.startsWith("REQUEST")) + var numstr = data[7..^1] + var num = parseInt(numstr) + var ans = "ANSWER" & $num & "\r\n" + var res = await transp.write(cast[pointer](addr ans[0]), len(ans)) + doAssert(res == len(ans)) + transp.close() + await transp.join() + except CatchableError as exc: + raiseAssert exc.msg - proc serveClient2(server: StreamServer, transp: StreamTransport) {.async.} = - var buffer: array[20, char] - var check = "REQUEST" - while not transp.atEof(): - zeroMem(addr buffer[0], MessageSize) - try: - await transp.readExactly(addr buffer[0], MessageSize) - except TransportIncompleteError: - break - doAssert(equalMem(addr buffer[0], addr check[0], len(check))) - var numstr = "" - var i = 7 - while i < MessageSize and (buffer[i] in {'0'..'9'}): - numstr.add(buffer[i]) - inc(i) - var num = parseInt(numstr) - var ans = "ANSWER" & $num - zeroMem(addr buffer[0], MessageSize) - copyMem(addr buffer[0], addr ans[0], len(ans)) - var res = await transp.write(cast[pointer](addr buffer[0]), MessageSize) - doAssert(res == MessageSize) - transp.close() - await transp.join() + proc serveClient2(server: StreamServer, transp: StreamTransport) {. + async: (raises: []).} = + try: + var buffer: array[20, char] + var check = "REQUEST" + while not transp.atEof(): + zeroMem(addr buffer[0], MessageSize) + try: + await transp.readExactly(addr buffer[0], MessageSize) + except TransportIncompleteError: + break + doAssert(equalMem(addr buffer[0], addr check[0], len(check))) + var numstr = "" + var i = 7 + while i < MessageSize and (buffer[i] in {'0'..'9'}): + numstr.add(buffer[i]) + inc(i) + var num = parseInt(numstr) + var ans = "ANSWER" & $num + zeroMem(addr buffer[0], MessageSize) + copyMem(addr buffer[0], addr ans[0], len(ans)) + var res = await transp.write(cast[pointer](addr buffer[0]), MessageSize) + doAssert(res == MessageSize) + transp.close() + await transp.join() + except CatchableError as exc: + raiseAssert exc.msg - proc serveClient3(server: StreamServer, transp: StreamTransport) {.async.} = - var buffer: array[20, char] - var check = "REQUEST" - var suffixStr = "SUFFIX" - var suffix = newSeq[byte](6) - copyMem(addr suffix[0], addr suffixStr[0], len(suffixStr)) - var counter = MessagesCount - while counter > 0: - zeroMem(addr buffer[0], MessageSize) - var res = await transp.readUntil(addr buffer[0], MessageSize, suffix) - doAssert(equalMem(addr buffer[0], addr check[0], len(check))) - var numstr = "" - var i = 7 - while i < MessageSize and (buffer[i] in {'0'..'9'}): - numstr.add(buffer[i]) - inc(i) - var num = parseInt(numstr) - doAssert(len(numstr) < 8) - var ans = "ANSWER" & $num & "SUFFIX" - zeroMem(addr buffer[0], MessageSize) - copyMem(addr buffer[0], addr ans[0], len(ans)) - res = await transp.write(cast[pointer](addr buffer[0]), len(ans)) - doAssert(res == len(ans)) - dec(counter) - transp.close() - await transp.join() + proc serveClient3(server: StreamServer, transp: StreamTransport) {. + async: (raises: []).} = + try: + var buffer: array[20, char] + var check = "REQUEST" + var suffixStr = "SUFFIX" + var suffix = newSeq[byte](6) + copyMem(addr suffix[0], addr suffixStr[0], len(suffixStr)) + var counter = MessagesCount + while counter > 0: + zeroMem(addr buffer[0], MessageSize) + var res = await transp.readUntil(addr buffer[0], MessageSize, suffix) + doAssert(equalMem(addr buffer[0], addr check[0], len(check))) + var numstr = "" + var i = 7 + while i < MessageSize and (buffer[i] in {'0'..'9'}): + numstr.add(buffer[i]) + inc(i) + var num = parseInt(numstr) + doAssert(len(numstr) < 8) + var ans = "ANSWER" & $num & "SUFFIX" + zeroMem(addr buffer[0], MessageSize) + copyMem(addr buffer[0], addr ans[0], len(ans)) + res = await transp.write(cast[pointer](addr buffer[0]), len(ans)) + doAssert(res == len(ans)) + dec(counter) + transp.close() + await transp.join() + except CatchableError as exc: + raiseAssert exc.msg - proc serveClient4(server: StreamServer, transp: StreamTransport) {.async.} = - var pathname = await transp.readLine() - var size = await transp.readLine() - var sizeNum = parseInt(size) - doAssert(sizeNum >= 0) - var rbuffer = newSeq[byte](sizeNum) - await transp.readExactly(addr rbuffer[0], sizeNum) - var lbuffer = readFile(pathname) - doAssert(len(lbuffer) == sizeNum) - doAssert(equalMem(addr rbuffer[0], addr lbuffer[0], sizeNum)) - var answer = "OK\r\n" - var res = await transp.write(cast[pointer](addr answer[0]), len(answer)) - doAssert(res == len(answer)) - transp.close() - await transp.join() + proc serveClient4(server: StreamServer, transp: StreamTransport) {. + async: (raises: []).} = + try: + var pathname = await transp.readLine() + var size = await transp.readLine() + var sizeNum = parseInt(size) + doAssert(sizeNum >= 0) + var rbuffer = newSeq[byte](sizeNum) + await transp.readExactly(addr rbuffer[0], sizeNum) + var lbuffer = readFile(pathname) + doAssert(len(lbuffer) == sizeNum) + doAssert(equalMem(addr rbuffer[0], addr lbuffer[0], sizeNum)) + var answer = "OK\r\n" + var res = await transp.write(cast[pointer](addr answer[0]), len(answer)) + doAssert(res == len(answer)) + transp.close() + await transp.join() + except CatchableError as exc: + raiseAssert exc.msg - proc serveClient7(server: StreamServer, transp: StreamTransport) {.async.} = - var answer = "DONE\r\n" - var expect = "" - var line = await transp.readLine() - doAssert(len(line) == BigMessageCount * len(BigMessagePattern)) - for i in 0..