introduce asyncraises in transports/asyncsync (#470)

With these fixes, `transports`/`asyncsync` correctly propagate and document their raises information - generally, most transport functions (send etc) raise `TransportError` and `CancelledError` - `closeWait` is special in that it generally doesn't fail.

This PR introduces the syntax `Future[void].Raises([types])` to create the `InternalRaisesFuture` type with the correct encoding for the types - this allows it to be used in user code while retaining the possibility to change the internal representation down the line.

* introduce raising constraints on stream callbacks - these constraints now give a warning when called with a callback that can raise exceptions (raising callbacks would crash 
* fix fail and its tests, which wasn't always given a good generic match
* work around nim bugs related to macro expansion of generic types
* make sure transports raise only `TransportError`-derived exceptions (and `CancelledError`)
This commit is contained in:
Jacek Sieka 2023-11-15 09:38:48 +01:00 committed by GitHub
parent 24be151cf3
commit f5ff9e32ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1574 additions and 1103 deletions

View File

@ -28,7 +28,7 @@ type
## is blocked in ``acquire()`` is being processed.
locked: bool
acquired: bool
waiters: seq[Future[void]]
waiters: seq[Future[void].Raising([CancelledError])]
AsyncEvent* = ref object of RootRef
## A primitive event object.
@ -41,7 +41,7 @@ type
## state to be signaled, when event get fired, then all coroutines
## continue proceeds in order, they have entered waiting state.
flag: bool
waiters: seq[Future[void]]
waiters: seq[Future[void].Raising([CancelledError])]
AsyncQueue*[T] = ref object of RootRef
## A queue, useful for coordinating producer and consumer coroutines.
@ -50,8 +50,8 @@ type
## infinite. If it is an integer greater than ``0``, then "await put()"
## will block when the queue reaches ``maxsize``, until an item is
## removed by "await get()".
getters: seq[Future[void]]
putters: seq[Future[void]]
getters: seq[Future[void].Raising([CancelledError])]
putters: seq[Future[void].Raising([CancelledError])]
queue: Deque[T]
maxsize: int
@ -69,7 +69,7 @@ type
EventQueueReader* = object
key: EventQueueKey
offset: int
waiter: Future[void]
waiter: Future[void].Raising([CancelledError])
overflow: bool
AsyncEventQueue*[T] = ref object of RootObj
@ -90,17 +90,14 @@ proc newAsyncLock*(): AsyncLock =
## The ``release()`` procedure changes the state to unlocked and returns
## immediately.
# Workaround for callSoon() not worked correctly before
# getThreadDispatcher() call.
discard getThreadDispatcher()
AsyncLock(waiters: newSeq[Future[void]](), locked: false, acquired: false)
AsyncLock()
proc wakeUpFirst(lock: AsyncLock): bool {.inline.} =
## Wake up the first waiter if it isn't done.
var i = 0
var res = false
while i < len(lock.waiters):
var waiter = lock.waiters[i]
let waiter = lock.waiters[i]
inc(i)
if not(waiter.finished()):
waiter.complete()
@ -120,7 +117,7 @@ proc checkAll(lock: AsyncLock): bool {.inline.} =
return false
return true
proc acquire*(lock: AsyncLock) {.async.} =
proc acquire*(lock: AsyncLock) {.async: (raises: [CancelledError]).} =
## Acquire a lock ``lock``.
##
## This procedure blocks until the lock ``lock`` is unlocked, then sets it
@ -129,7 +126,7 @@ proc acquire*(lock: AsyncLock) {.async.} =
lock.acquired = true
lock.locked = true
else:
var w = newFuture[void]("AsyncLock.acquire")
let w = Future[void].Raising([CancelledError]).init("AsyncLock.acquire")
lock.waiters.add(w)
await w
lock.acquired = true
@ -165,13 +162,10 @@ proc newAsyncEvent*(): AsyncEvent =
## procedure and reset to `false` with the `clear()` procedure.
## The `wait()` procedure blocks until the flag is `true`. The flag is
## initially `false`.
AsyncEvent()
# Workaround for callSoon() not worked correctly before
# getThreadDispatcher() call.
discard getThreadDispatcher()
AsyncEvent(waiters: newSeq[Future[void]](), flag: false)
proc wait*(event: AsyncEvent): Future[void] =
proc wait*(event: AsyncEvent): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Block until the internal flag of ``event`` is `true`.
## If the internal flag is `true` on entry, return immediately. Otherwise,
## block until another task calls `fire()` to set the flag to `true`,
@ -210,20 +204,15 @@ proc isSet*(event: AsyncEvent): bool =
proc newAsyncQueue*[T](maxsize: int = 0): AsyncQueue[T] =
## Creates a new asynchronous queue ``AsyncQueue``.
# Workaround for callSoon() not worked correctly before
# getThreadDispatcher() call.
discard getThreadDispatcher()
AsyncQueue[T](
getters: newSeq[Future[void]](),
putters: newSeq[Future[void]](),
queue: initDeque[T](),
maxsize: maxsize
)
proc wakeupNext(waiters: var seq[Future[void]]) {.inline.} =
proc wakeupNext(waiters: var seq) {.inline.} =
var i = 0
while i < len(waiters):
var waiter = waiters[i]
let waiter = waiters[i]
inc(i)
if not(waiter.finished()):
@ -250,6 +239,24 @@ proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} =
## Return ``true`` if the queue is empty, ``false`` otherwise.
(len(aq.queue) == 0)
proc addFirstImpl[T](aq: AsyncQueue[T], item: T) =
aq.queue.addFirst(item)
aq.getters.wakeupNext()
proc addLastImpl[T](aq: AsyncQueue[T], item: T) =
aq.queue.addLast(item)
aq.getters.wakeupNext()
proc popFirstImpl[T](aq: AsyncQueue[T]): T =
let res = aq.queue.popFirst()
aq.putters.wakeupNext()
res
proc popLastImpl[T](aq: AsyncQueue[T]): T =
let res = aq.queue.popLast()
aq.putters.wakeupNext()
res
proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].}=
## Put an item ``item`` to the beginning of the queue ``aq`` immediately.
@ -257,8 +264,7 @@ proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {.
## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised.
if aq.full():
raise newException(AsyncQueueFullError, "AsyncQueue is full!")
aq.queue.addFirst(item)
aq.getters.wakeupNext()
aq.addFirstImpl(item)
proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].}=
@ -267,8 +273,7 @@ proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {.
## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised.
if aq.full():
raise newException(AsyncQueueFullError, "AsyncQueue is full!")
aq.queue.addLast(item)
aq.getters.wakeupNext()
aq.addLastImpl(item)
proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
@ -277,9 +282,7 @@ proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {.
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
let res = aq.queue.popFirst()
aq.putters.wakeupNext()
res
aq.popFirstImpl()
proc popLastNoWait*[T](aq: AsyncQueue[T]): T {.
raises: [AsyncQueueEmptyError].} =
@ -288,65 +291,63 @@ proc popLastNoWait*[T](aq: AsyncQueue[T]): T {.
## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised.
if aq.empty():
raise newException(AsyncQueueEmptyError, "AsyncQueue is empty!")
let res = aq.queue.popLast()
aq.putters.wakeupNext()
res
aq.popLastImpl()
proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async.} =
proc addFirst*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]).} =
## Put an ``item`` to the beginning of the queue ``aq``. If the queue is full,
## wait until a free slot is available before adding item.
while aq.full():
var putter = newFuture[void]("AsyncQueue.addFirst")
let putter = Future[void].Raising([CancelledError]).init("AsyncQueue.addFirst")
aq.putters.add(putter)
try:
await putter
except CatchableError as exc:
except CancelledError as exc:
if not(aq.full()) and not(putter.cancelled()):
aq.putters.wakeupNext()
raise exc
aq.addFirstNoWait(item)
aq.addFirstImpl(item)
proc addLast*[T](aq: AsyncQueue[T], item: T) {.async.} =
proc addLast*[T](aq: AsyncQueue[T], item: T) {.async: (raises: [CancelledError]).} =
## Put an ``item`` to the end of the queue ``aq``. If the queue is full,
## wait until a free slot is available before adding item.
while aq.full():
var putter = newFuture[void]("AsyncQueue.addLast")
let putter = Future[void].Raising([CancelledError]).init("AsyncQueue.addLast")
aq.putters.add(putter)
try:
await putter
except CatchableError as exc:
except CancelledError as exc:
if not(aq.full()) and not(putter.cancelled()):
aq.putters.wakeupNext()
raise exc
aq.addLastNoWait(item)
aq.addLastImpl(item)
proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async.} =
proc popFirst*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError]).} =
## Remove and return an ``item`` from the beginning of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
var getter = newFuture[void]("AsyncQueue.popFirst")
let getter = Future[void].Raising([CancelledError]).init("AsyncQueue.popFirst")
aq.getters.add(getter)
try:
await getter
except CatchableError as exc:
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
return aq.popFirstNoWait()
aq.popFirstImpl()
proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async.} =
proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async: (raises: [CancelledError]).} =
## Remove and return an ``item`` from the end of the queue ``aq``.
## If the queue is empty, wait until an item is available.
while aq.empty():
var getter = newFuture[void]("AsyncQueue.popLast")
let getter = Future[void].Raising([CancelledError]).init("AsyncQueue.popLast")
aq.getters.add(getter)
try:
await getter
except CatchableError as exc:
except CancelledError as exc:
if not(aq.empty()) and not(getter.cancelled()):
aq.getters.wakeupNext()
raise exc
return aq.popLastNoWait()
aq.popLastImpl()
proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.
raises: [AsyncQueueFullError].} =
@ -358,11 +359,13 @@ proc getNoWait*[T](aq: AsyncQueue[T]): T {.
## Alias of ``popFirstNoWait()``.
aq.popFirstNoWait()
proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.inline.} =
proc put*[T](aq: AsyncQueue[T], item: T): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``addLast()``.
aq.addLast(item)
proc get*[T](aq: AsyncQueue[T]): Future[T] {.inline.} =
proc get*[T](aq: AsyncQueue[T]): Future[T] {.
async: (raw: true, raises: [CancelledError]).} =
## Alias of ``popFirst()``.
aq.popFirst()
@ -416,7 +419,7 @@ proc contains*[T](aq: AsyncQueue[T], item: T): bool {.inline.} =
## via the ``in`` operator.
for e in aq.queue.items():
if e == item: return true
return false
false
proc `$`*[T](aq: AsyncQueue[T]): string =
## Turn an async queue ``aq`` into its string representation.
@ -452,8 +455,7 @@ proc compact(ab: AsyncEventQueue) {.raises: [].} =
else:
ab.queue.clear()
proc getReaderIndex(ab: AsyncEventQueue, key: EventQueueKey): int {.
raises: [].} =
proc getReaderIndex(ab: AsyncEventQueue, key: EventQueueKey): int =
for index, value in ab.readers.pairs():
if value.key == key:
return index
@ -507,7 +509,7 @@ proc close*(ab: AsyncEventQueue) {.raises: [].} =
ab.readers.reset()
ab.queue.clear()
proc closeWait*(ab: AsyncEventQueue): Future[void] {.raises: [].} =
proc closeWait*(ab: AsyncEventQueue): Future[void] {.async: (raw: true, raises: []).} =
let retFuture = newFuture[void]("AsyncEventQueue.closeWait()",
{FutureFlag.OwnCancelSchedule})
proc continuation(udata: pointer) {.gcsafe.} =
@ -528,7 +530,7 @@ template readerOverflow*(ab: AsyncEventQueue,
reader: EventQueueReader): bool =
ab.limit + (reader.offset - ab.offset) <= len(ab.queue)
proc emit*[T](ab: AsyncEventQueue[T], data: T) {.raises: [].} =
proc emit*[T](ab: AsyncEventQueue[T], data: T) =
if len(ab.readers) > 0:
# We enqueue `data` only if there active reader present.
var changesPresent = false
@ -565,7 +567,8 @@ proc emit*[T](ab: AsyncEventQueue[T], data: T) {.raises: [].} =
proc waitEvents*[T](ab: AsyncEventQueue[T],
key: EventQueueKey,
eventsCount = -1): Future[seq[T]] {.async.} =
eventsCount = -1): Future[seq[T]] {.
async: (raises: [AsyncEventQueueFullError, CancelledError]).} =
## Wait for events
var
events: seq[T]
@ -595,7 +598,8 @@ proc waitEvents*[T](ab: AsyncEventQueue[T],
doAssert(length >= ab.readers[index].offset)
if length == ab.readers[index].offset:
# We are at the end of queue, it means that we should wait for new events.
let waitFuture = newFuture[void]("AsyncEventQueue.waitEvents")
let waitFuture = Future[void].Raising([CancelledError]).init(
"AsyncEventQueue.waitEvents")
ab.readers[index].waiter = waitFuture
resetFuture = true
await waitFuture
@ -626,4 +630,4 @@ proc waitEvents*[T](ab: AsyncEventQueue[T],
if (eventsCount <= 0) or (len(events) == eventsCount):
break
return events
events

View File

@ -16,7 +16,9 @@ import stew/base10
import ./[asyncengine, raisesfutures]
import ../[config, futures]
export raisesfutures.InternalRaisesFuture
export
raisesfutures.Raising, raisesfutures.InternalRaisesFuture,
raisesfutures.init, raisesfutures.error, raisesfutures.readError
when chronosStackTrace:
import std/strutils
@ -109,7 +111,7 @@ template newInternalRaisesFuture*[T, E](fromProc: static[string] = ""): auto =
## that this future belongs to, is a good habit as it helps with debugging.
newInternalRaisesFutureImpl[T, E](getSrcLocation(fromProc))
template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] =
template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] {.deprecated.} =
## Create a new future which can hold/preserve GC sequence until future will
## not be completed.
##
@ -117,7 +119,7 @@ template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] =
## that this future belongs to, is a good habit as it helps with debugging.
newFutureSeqImpl[A, B](getSrcLocation(fromProc))
template newFutureStr*[T](fromProc: static[string] = ""): FutureStr[T] =
template newFutureStr*[T](fromProc: static[string] = ""): FutureStr[T] {.deprecated.} =
## Create a new future which can hold/preserve GC string until future will
## not be completed.
##
@ -205,7 +207,8 @@ template complete*(future: Future[void]) =
## Completes a void ``future``.
complete(future, getSrcLocation())
proc fail(future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) =
proc failImpl(
future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) =
if not(future.cancelled()):
checkFinished(future, loc)
future.internalError = error
@ -216,10 +219,16 @@ proc fail(future: FutureBase, error: ref CatchableError, loc: ptr SrcLoc) =
getStackTrace(error)
future.finish(FutureState.Failed)
template fail*(
future: FutureBase, error: ref CatchableError, warn: static bool = false) =
template fail*[T](
future: Future[T], error: ref CatchableError, warn: static bool = false) =
## Completes ``future`` with ``error``.
fail(future, error, getSrcLocation())
failImpl(future, error, getSrcLocation())
template fail*[T, E](
future: InternalRaisesFuture[T, E], error: ref CatchableError,
warn: static bool = true) =
checkRaises(future, E, error, warn)
failImpl(future, error, getSrcLocation())
template newCancelledError(): ref CancelledError =
(ref CancelledError)(msg: "Future operation cancelled!")
@ -377,8 +386,6 @@ proc futureContinue*(fut: FutureBase) {.raises: [], gcsafe.} =
{.pop.}
when chronosStackTrace:
import std/strutils
template getFilenameProcname(entry: StackTraceEntry): (string, string) =
when compiles(entry.filenameStr) and compiles(entry.procnameStr):
# We can't rely on "entry.filename" and "entry.procname" still being valid
@ -462,31 +469,36 @@ proc internalCheckComplete*(fut: FutureBase) {.raises: [CatchableError].} =
injectStacktrace(fut.internalError)
raise fut.internalError
macro internalCheckComplete*(f: InternalRaisesFuture): untyped =
macro internalCheckComplete*(fut: InternalRaisesFuture, raises: typed) =
# For InternalRaisesFuture[void, (ValueError, OSError), will do:
# {.cast(raises: [ValueError, OSError]).}:
# if isNil(f.error): discard
# else: raise f.error
let e = getTypeInst(f)[2]
let types = getType(e)
# TODO https://github.com/nim-lang/Nim/issues/22937
# we cannot `getTypeInst` on the `fut` - when aliases are involved, the
# generics are lost - so instead, we pass the raises list explicitly
let types = getRaisesTypes(raises)
if isNoRaises(types):
return quote do:
if not(isNil(`f`.internalError)):
raiseAssert("Unhandled future exception: " & `f`.error.msg)
if not(isNil(`fut`.internalError)):
# This would indicate a bug in which `error` was set via the non-raising
# base type
raiseAssert("Error set on a non-raising future: " & `fut`.internalError.msg)
expectKind(types, nnkBracketExpr)
expectKind(types[0], nnkSym)
assert types[0].strVal == "tuple"
let ifRaise = nnkIfExpr.newTree(
nnkElifExpr.newTree(
quote do: isNil(`f`.internalError),
quote do: isNil(`fut`.internalError),
quote do: discard
),
nnkElseExpr.newTree(
nnkRaiseStmt.newNimNode(lineInfoFrom=f).add(
quote do: (`f`.internalError)
nnkRaiseStmt.newNimNode(lineInfoFrom=fut).add(
quote do: (`fut`.internalError)
)
)
)
@ -1118,7 +1130,7 @@ proc one*[F: SomeFuture](futs: varargs[F]): Future[F] {.
return retFuture
proc race*(futs: varargs[FutureBase]): Future[FutureBase] {.
async: (raw: true, raises: [CancelledError]).} =
async: (raw: true, raises: [ValueError, CancelledError]).} =
## Returns a future which will complete and return completed FutureBase,
## when one of the futures in ``futs`` will be completed, failed or canceled.
##
@ -1488,12 +1500,6 @@ when defined(windows):
{.pop.} # Automatically deduced raises from here onwards
template fail*[T, E](
future: InternalRaisesFuture[T, E], error: ref CatchableError,
warn: static bool = true) =
checkRaises(future, error, warn)
fail(future, error, getSrcLocation())
proc waitFor*[T, E](fut: InternalRaisesFuture[T, E]): T = # {.raises: [E]}
## **Blocks** the current thread until the specified future finishes and
## reads it, potentially raising an exception if the future failed or was
@ -1512,7 +1518,7 @@ proc read*[T: not void, E](future: InternalRaisesFuture[T, E]): lent T = # {.rai
# TODO: Make a custom exception type for this?
raise newException(ValueError, "Future still in progress.")
internalCheckComplete(future)
internalCheckComplete(future, E)
future.internalValue
proc read*[E](future: InternalRaisesFuture[void, E]) = # {.raises: [E, CancelledError].}

View File

@ -497,7 +497,7 @@ proc asyncSingleProc(prc, params: NimNode): NimNode {.compileTime.} =
prc
template await*[T](f: Future[T]): untyped =
template await*[T](f: Future[T]): T =
when declared(chronosInternalRetFuture):
chronosInternalRetFuture.internalChild = f
# `futureContinue` calls the iterator generated by the `async`
@ -512,6 +512,21 @@ template await*[T](f: Future[T]): untyped =
else:
unsupported "await is only available within {.async.}"
template await*[T, E](f: InternalRaisesFuture[T, E]): T =
when declared(chronosInternalRetFuture):
chronosInternalRetFuture.internalChild = f
# `futureContinue` calls the iterator generated by the `async`
# transformation - `yield` gives control back to `futureContinue` which is
# responsible for resuming execution once the yielded future is finished
yield chronosInternalRetFuture.internalChild
# `child` released by `futureContinue`
cast[type(f)](chronosInternalRetFuture.internalChild).internalCheckComplete(E)
when T isnot void:
cast[type(f)](chronosInternalRetFuture.internalChild).value()
else:
unsupported "await is only available within {.async.}"
template awaitne*[T](f: Future[T]): Future[T] =
when declared(chronosInternalRetFuture):
chronosInternalRetFuture.internalChild = f

View File

@ -1,5 +1,5 @@
import
std/macros,
std/[macros, sequtils],
../futures
type
@ -18,6 +18,45 @@ proc makeNoRaises*(): NimNode {.compileTime.} =
ident"void"
macro Raising*[T](F: typedesc[Future[T]], E: varargs[typedesc]): untyped =
## Given a Future type instance, return a type storing `{.raises.}`
## information
##
## Note; this type may change in the future
E.expectKind(nnkBracket)
let raises = if E.len == 0:
makeNoRaises()
else:
nnkTupleConstr.newTree(E.mapIt(it))
nnkBracketExpr.newTree(
ident "InternalRaisesFuture",
nnkDotExpr.newTree(F, ident"T"),
raises
)
template init*[T, E](
F: type InternalRaisesFuture[T, E], fromProc: static[string] = ""): F =
## Creates a new pending future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
let res = F()
internalInitFutureBase(res, getSrcLocation(fromProc), FutureState.Pending, {})
res
template init*[T, E](
F: type InternalRaisesFuture[T, E], fromProc: static[string] = "",
flags: static[FutureFlags]): F =
## Creates a new pending future.
##
## Specifying ``fromProc``, which is a string specifying the name of the proc
## that this future belongs to, is a good habit as it helps with debugging.
let res = F()
internalInitFutureBase(
res, getSrcLocation(fromProc), FutureState.Pending, flags)
res
proc isNoRaises*(n: NimNode): bool {.compileTime.} =
n.eqIdent("void")
@ -78,21 +117,15 @@ macro union*(tup0: typedesc[tuple], tup1: typedesc[tuple]): typedesc =
if result.len == 0:
result = makeNoRaises()
proc getRaises*(future: NimNode): NimNode {.compileTime.} =
# Given InternalRaisesFuture[T, (A, B, C)], returns (A, B, C)
let types = getType(getTypeInst(future)[2])
if isNoRaises(types):
nnkBracketExpr.newTree(newEmptyNode())
else:
expectKind(types, nnkBracketExpr)
expectKind(types[0], nnkSym)
assert types[0].strVal == "tuple"
assert types.len >= 1
types
proc getRaisesTypes*(raises: NimNode): NimNode =
let typ = getType(raises)
case typ.typeKind
of ntyTypeDesc: typ[1]
else: typ
macro checkRaises*[T: CatchableError](
future: InternalRaisesFuture, error: ref T, warn: static bool = true): untyped =
future: InternalRaisesFuture, raises: typed, error: ref T,
warn: static bool = true): untyped =
## Generate code that checks that the given error is compatible with the
## raises restrictions of `future`.
##
@ -100,11 +133,18 @@ macro checkRaises*[T: CatchableError](
## information available at compile time - in particular, if the raises
## inherit from `error`, we end up with the equivalent of a downcast which
## raises a Defect if it fails.
let raises = getRaises(future)
let
raises = getRaisesTypes(raises)
expectKind(getTypeInst(error), nnkRefTy)
let toMatch = getTypeInst(error)[0]
if isNoRaises(raises):
error(
"`fail`: `" & repr(toMatch) & "` incompatible with `raises: []`", future)
return
var
typeChecker = ident"false"
maybeChecker = ident"false"
@ -134,3 +174,15 @@ macro checkRaises*[T: CatchableError](
else:
`warning`
assert(`runtimeChecker`, `errorMsg`)
proc error*[T](future: InternalRaisesFuture[T, void]): ref CatchableError {.
raises: [].} =
static:
warning("No exceptions possible with this operation, `error` always returns nil")
nil
proc readError*[T](future: InternalRaisesFuture[T, void]): ref CatchableError {.
raises: [ValueError].} =
static:
warning("No exceptions possible with this operation, `readError` always raises")
raise newException(ValueError, "No error in future.")

View File

@ -113,6 +113,8 @@ type
## Transport's capability not supported exception
TransportUseClosedError* = object of TransportError
## Usage after transport close exception
TransportUseEofError* = object of TransportError
## Usage after transport half-close exception
TransportTooManyError* = object of TransportError
## Too many open file descriptors exception
TransportAbortedError* = object of TransportError
@ -567,11 +569,11 @@ template checkClosed*(t: untyped, future: untyped) =
template checkWriteEof*(t: untyped, future: untyped) =
if (WriteEof in (t).state):
future.fail(newException(TransportError,
future.fail(newException(TransportUseEofError,
"Transport connection is already dropped!"))
return future
template getError*(t: untyped): ref CatchableError =
template getError*(t: untyped): ref TransportError =
var err = (t).error
(t).error = nil
err

View File

@ -27,7 +27,10 @@ type
DatagramCallback* = proc(transp: DatagramTransport,
remote: TransportAddress): Future[void] {.
gcsafe, raises: [].}
async: (raises: []).}
UnsafeDatagramCallback* = proc(transp: DatagramTransport,
remote: TransportAddress): Future[void] {.async.}
DatagramTransport* = ref object of RootRef
fd*: AsyncFD # File descriptor
@ -35,7 +38,7 @@ type
flags: set[ServerFlags] # Flags
buffer: seq[byte] # Reading buffer
buflen: int # Reading buffer effective size
error: ref CatchableError # Current error
error: ref TransportError # Current error
queue: Deque[GramVector] # Writer queue
local: TransportAddress # Local address
remote: TransportAddress # Remote address
@ -599,6 +602,41 @@ proc close*(transp: DatagramTransport) =
transp.state.incl({WriteClosed, ReadClosed})
closeSocket(transp.fd, continuation)
proc newDatagramTransportCommon(cbproc: UnsafeDatagramCallback,
remote: TransportAddress,
local: TransportAddress,
sock: AsyncFD,
flags: set[ServerFlags],
udata: pointer,
child: DatagramTransport,
bufferSize: int,
ttl: int,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError].} =
## Create new UDP datagram transport (IPv4).
##
## ``cbproc`` - callback which will be called, when new datagram received.
## ``remote`` - bind transport to remote address (optional).
## ``local`` - bind transport to local address (to serving incoming
## datagrams, optional)
## ``sock`` - application-driven socket to use.
## ``flags`` - flags that will be applied to socket.
## ``udata`` - custom argument which will be passed to ``cbproc``.
## ``bufSize`` - size of internal buffer.
## ``ttl`` - TTL for UDP datagram packet (only usable when flags has
## ``Broadcast`` option).
proc wrap(transp: DatagramTransport,
remote: TransportAddress) {.async: (raises: []).} =
try:
cbproc(transp, remote)
except CatchableError as exc:
raiseAssert "Unexpected exception from stream server cbproc: " & exc.msg
newDatagramTransportCommon(wrap, remote, local, sock, flags, udata, child,
bufferSize, ttl, dualstack)
proc newDatagramTransport*(cbproc: DatagramCallback,
remote: TransportAddress = AnyAddress,
local: TransportAddress = AnyAddress,
@ -689,7 +727,102 @@ proc newDatagramTransport6*[T](cbproc: DatagramCallback,
cast[pointer](udata), child, bufSize, ttl,
dualstack)
proc join*(transp: DatagramTransport): Future[void] =
proc newDatagramTransport*(cbproc: UnsafeDatagramCallback,
remote: TransportAddress = AnyAddress,
local: TransportAddress = AnyAddress,
sock: AsyncFD = asyncInvalidSocket,
flags: set[ServerFlags] = {},
udata: pointer = nil,
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError],
deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} =
## Create new UDP datagram transport (IPv4).
##
## ``cbproc`` - callback which will be called, when new datagram received.
## ``remote`` - bind transport to remote address (optional).
## ``local`` - bind transport to local address (to serving incoming
## datagrams, optional)
## ``sock`` - application-driven socket to use.
## ``flags`` - flags that will be applied to socket.
## ``udata`` - custom argument which will be passed to ``cbproc``.
## ``bufSize`` - size of internal buffer.
## ``ttl`` - TTL for UDP datagram packet (only usable when flags has
## ``Broadcast`` option).
newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, child,
bufSize, ttl, dualstack)
proc newDatagramTransport*[T](cbproc: UnsafeDatagramCallback,
udata: ref T,
remote: TransportAddress = AnyAddress,
local: TransportAddress = AnyAddress,
sock: AsyncFD = asyncInvalidSocket,
flags: set[ServerFlags] = {},
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError],
deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} =
var fflags = flags + {GCUserData}
GC_ref(udata)
newDatagramTransportCommon(cbproc, remote, local, sock, fflags,
cast[pointer](udata), child, bufSize, ttl,
dualstack)
proc newDatagramTransport6*(cbproc: UnsafeDatagramCallback,
remote: TransportAddress = AnyAddress6,
local: TransportAddress = AnyAddress6,
sock: AsyncFD = asyncInvalidSocket,
flags: set[ServerFlags] = {},
udata: pointer = nil,
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError],
deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} =
## Create new UDP datagram transport (IPv6).
##
## ``cbproc`` - callback which will be called, when new datagram received.
## ``remote`` - bind transport to remote address (optional).
## ``local`` - bind transport to local address (to serving incoming
## datagrams, optional)
## ``sock`` - application-driven socket to use.
## ``flags`` - flags that will be applied to socket.
## ``udata`` - custom argument which will be passed to ``cbproc``.
## ``bufSize`` - size of internal buffer.
## ``ttl`` - TTL for UDP datagram packet (only usable when flags has
## ``Broadcast`` option).
newDatagramTransportCommon(cbproc, remote, local, sock, flags, udata, child,
bufSize, ttl, dualstack)
proc newDatagramTransport6*[T](cbproc: UnsafeDatagramCallback,
udata: ref T,
remote: TransportAddress = AnyAddress6,
local: TransportAddress = AnyAddress6,
sock: AsyncFD = asyncInvalidSocket,
flags: set[ServerFlags] = {},
child: DatagramTransport = nil,
bufSize: int = DefaultDatagramBufferSize,
ttl: int = 0,
dualstack = DualStackType.Auto
): DatagramTransport {.
raises: [TransportOsError],
deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} =
var fflags = flags + {GCUserData}
GC_ref(udata)
newDatagramTransportCommon(cbproc, remote, local, sock, fflags,
cast[pointer](udata), child, bufSize, ttl,
dualstack)
proc join*(transp: DatagramTransport): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Wait until the transport ``transp`` will be closed.
var retFuture = newFuture[void]("datagram.transport.join")
@ -707,14 +840,15 @@ proc join*(transp: DatagramTransport): Future[void] =
return retFuture
proc closeWait*(transp: DatagramTransport): Future[void] =
proc closeWait*(transp: DatagramTransport): Future[void] {.
async: (raw: true, raises: []).} =
## Close transport ``transp`` and release all resources.
const FutureName = "datagram.transport.closeWait"
let retFuture = newFuture[void](
"datagram.transport.closeWait", {FutureFlag.OwnCancelSchedule})
if {ReadClosed, WriteClosed} * transp.state != {}:
return Future.completed(FutureName)
let retFuture = newFuture[void](FutureName, {FutureFlag.OwnCancelSchedule})
retFuture.complete()
return retFuture
proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()
@ -733,7 +867,8 @@ proc closeWait*(transp: DatagramTransport): Future[void] =
retFuture
proc send*(transp: DatagramTransport, pbytes: pointer,
nbytes: int): Future[void] =
nbytes: int): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address which was bounded on transport.
var retFuture = newFuture[void]("datagram.transport.send(pointer)")
@ -751,22 +886,21 @@ proc send*(transp: DatagramTransport, pbytes: pointer,
return retFuture
proc send*(transp: DatagramTransport, msg: sink string,
msglen = -1): Future[void] =
msglen = -1): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport.
var retFuture = newFutureStr[void]("datagram.transport.send(string)")
var retFuture = newFuture[void]("datagram.transport.send(string)")
transp.checkClosed(retFuture)
when declared(shallowCopy):
if not(isLiteral(msg)):
shallowCopy(retFuture.gcholder, msg)
else:
retFuture.gcholder = msg
else:
retFuture.gcholder = msg
let length = if msglen <= 0: len(msg) else: msglen
let vector = GramVector(kind: WithoutAddress, buf: addr retFuture.gcholder[0],
var localCopy = msg
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithoutAddress, buf: addr localCopy[0],
buflen: length,
writer: cast[Future[void]](retFuture))
writer: retFuture)
transp.queue.addLast(vector)
if WritePaused in transp.state:
let wres = transp.resumeWrite()
@ -775,22 +909,20 @@ proc send*(transp: DatagramTransport, msg: sink string,
return retFuture
proc send*[T](transp: DatagramTransport, msg: sink seq[T],
msglen = -1): Future[void] =
msglen = -1): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send string ``msg`` using transport ``transp`` to remote destination
## address which was bounded on transport.
var retFuture = newFutureSeq[void, T]("datagram.transport.send(seq)")
var retFuture = newFuture[void]("datagram.transport.send(seq)")
transp.checkClosed(retFuture)
when declared(shallowCopy):
if not(isLiteral(msg)):
shallowCopy(retFuture.gcholder, msg)
else:
retFuture.gcholder = msg
else:
retFuture.gcholder = msg
let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
let vector = GramVector(kind: WithoutAddress, buf: addr retFuture.gcholder[0],
var localCopy = msg
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithoutAddress, buf: addr localCopy[0],
buflen: length,
writer: cast[Future[void]](retFuture))
writer: retFuture)
transp.queue.addLast(vector)
if WritePaused in transp.state:
let wres = transp.resumeWrite()
@ -799,7 +931,8 @@ proc send*[T](transp: DatagramTransport, msg: sink seq[T],
return retFuture
proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
pbytes: pointer, nbytes: int): Future[void] =
pbytes: pointer, nbytes: int): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send buffer with pointer ``pbytes`` and size ``nbytes`` using transport
## ``transp`` to remote destination address ``remote``.
var retFuture = newFuture[void]("datagram.transport.sendTo(pointer)")
@ -814,22 +947,20 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
return retFuture
proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
msg: sink string, msglen = -1): Future[void] =
msg: sink string, msglen = -1): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send string ``msg`` using transport ``transp`` to remote destination
## address ``remote``.
var retFuture = newFutureStr[void]("datagram.transport.sendTo(string)")
var retFuture = newFuture[void]("datagram.transport.sendTo(string)")
transp.checkClosed(retFuture)
when declared(shallowCopy):
if not(isLiteral(msg)):
shallowCopy(retFuture.gcholder, msg)
else:
retFuture.gcholder = msg
else:
retFuture.gcholder = msg
let length = if msglen <= 0: len(msg) else: msglen
let vector = GramVector(kind: WithAddress, buf: addr retFuture.gcholder[0],
var localCopy = msg
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithAddress, buf: addr localCopy[0],
buflen: length,
writer: cast[Future[void]](retFuture),
writer: retFuture,
address: remote)
transp.queue.addLast(vector)
if WritePaused in transp.state:
@ -839,20 +970,17 @@ proc sendTo*(transp: DatagramTransport, remote: TransportAddress,
return retFuture
proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress,
msg: sink seq[T], msglen = -1): Future[void] =
msg: sink seq[T], msglen = -1): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Send sequence ``msg`` using transport ``transp`` to remote destination
## address ``remote``.
var retFuture = newFutureSeq[void, T]("datagram.transport.sendTo(seq)")
var retFuture = newFuture[void]("datagram.transport.sendTo(seq)")
transp.checkClosed(retFuture)
when declared(shallowCopy):
if not(isLiteral(msg)):
shallowCopy(retFuture.gcholder, msg)
else:
retFuture.gcholder = msg
else:
retFuture.gcholder = msg
let length = if msglen <= 0: (len(msg) * sizeof(T)) else: (msglen * sizeof(T))
let vector = GramVector(kind: WithAddress, buf: addr retFuture.gcholder[0],
var localCopy = msg
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
let vector = GramVector(kind: WithAddress, buf: addr localCopy[0],
buflen: length,
writer: cast[Future[void]](retFuture),
address: remote)
@ -864,7 +992,7 @@ proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress,
return retFuture
proc peekMessage*(transp: DatagramTransport, msg: var seq[byte],
msglen: var int) {.raises: [CatchableError].} =
msglen: var int) {.raises: [TransportError].} =
## Get access to internal message buffer and length of incoming datagram.
if ReadError in transp.state:
transp.state.excl(ReadError)
@ -876,7 +1004,7 @@ proc peekMessage*(transp: DatagramTransport, msg: var seq[byte],
msglen = transp.buflen
proc getMessage*(transp: DatagramTransport): seq[byte] {.
raises: [CatchableError].} =
raises: [TransportError].} =
## Copy data from internal message buffer and return result.
var default: seq[byte]
if ReadError in transp.state:

View File

@ -58,6 +58,8 @@ type
done: bool] {.
gcsafe, raises: [].}
ReaderFuture = Future[void].Raising([TransportError, CancelledError])
const
StreamTransportTrackerName* = "stream.transport"
StreamServerTrackerName* = "stream.server"
@ -68,10 +70,10 @@ when defined(windows):
StreamTransport* = ref object of RootRef
fd*: AsyncFD # File descriptor
state: set[TransportState] # Current Transport state
reader: Future[void] # Current reader Future
reader: ReaderFuture # Current reader Future
buffer: seq[byte] # Reading buffer
offset: int # Reading buffer offset
error: ref CatchableError # Current error
error: ref TransportError # Current error
queue: Deque[StreamVector] # Writer queue
future: Future[void] # Stream life future
# Windows specific part
@ -87,18 +89,18 @@ when defined(windows):
local: TransportAddress # Local address
remote: TransportAddress # Remote address
of TransportKind.Pipe:
todo1: int
discard
of TransportKind.File:
todo2: int
discard
else:
type
StreamTransport* = ref object of RootRef
fd*: AsyncFD # File descriptor
state: set[TransportState] # Current Transport state
reader: Future[void] # Current reader Future
reader: ReaderFuture # Current reader Future
buffer: seq[byte] # Reading buffer
offset: int # Reading buffer offset
error: ref CatchableError # Current error
error: ref TransportError # Current error
queue: Deque[StreamVector] # Writer queue
future: Future[void] # Stream life future
case kind*: TransportKind
@ -107,18 +109,23 @@ else:
local: TransportAddress # Local address
remote: TransportAddress # Remote address
of TransportKind.Pipe:
todo1: int
discard
of TransportKind.File:
todo2: int
discard
type
StreamCallback* = proc(server: StreamServer,
client: StreamTransport): Future[void] {.
gcsafe, raises: [].}
client: StreamTransport) {.async: (raises: []).}
## New remote client connection callback
## ``server`` - StreamServer object.
## ``client`` - accepted client transport.
UnsafeStreamCallback* = proc(server: StreamServer,
client: StreamTransport) {.async.}
## Connection callback that doesn't check for exceptions at compile time
## ``server`` - StreamServer object.
## ``client`` - accepted client transport.
TransportInitCallback* = proc(server: StreamServer,
fd: AsyncFD): StreamTransport {.
gcsafe, raises: [].}
@ -199,7 +206,7 @@ proc completePendingWriteQueue(queue: var Deque[StreamVector],
vector.writer.complete(v)
proc failPendingWriteQueue(queue: var Deque[StreamVector],
error: ref CatchableError) {.inline.} =
error: ref TransportError) {.inline.} =
while len(queue) > 0:
var vector = queue.popFirst()
if not(vector.writer.finished()):
@ -640,7 +647,8 @@ when defined(windows):
localAddress = TransportAddress(),
flags: set[SocketFlags] = {},
dualstack = DualStackType.Auto
): Future[StreamTransport] =
): Future[StreamTransport] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Open new connection to remote peer with address ``address`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` is size of internal buffer for transport.
@ -1031,7 +1039,8 @@ when defined(windows):
server.aovl.data.cb(addr server.aovl)
ok()
proc accept*(server: StreamServer): Future[StreamTransport] =
proc accept*(server: StreamServer): Future[StreamTransport] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
var retFuture = newFuture[StreamTransport]("stream.server.accept")
doAssert(server.status != ServerStatus.Running,
@ -1472,7 +1481,8 @@ else:
localAddress = TransportAddress(),
flags: set[SocketFlags] = {},
dualstack = DualStackType.Auto,
): Future[StreamTransport] =
): Future[StreamTransport] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Open new connection to remote peer with address ``address`` and create
## new transport object ``StreamTransport`` for established connection.
## ``bufferSize`` - size of internal buffer for transport.
@ -1658,7 +1668,8 @@ else:
transp.state.excl(WritePaused)
ok()
proc accept*(server: StreamServer): Future[StreamTransport] =
proc accept*(server: StreamServer): Future[StreamTransport] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
var retFuture = newFuture[StreamTransport]("stream.server.accept")
doAssert(server.status != ServerStatus.Running,
@ -1762,7 +1773,8 @@ proc stop*(server: StreamServer) {.raises: [TransportOsError].} =
let res = stop2(server)
if res.isErr(): raiseTransportOsError(res.error())
proc join*(server: StreamServer): Future[void] =
proc join*(server: StreamServer): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Waits until ``server`` is not closed.
var retFuture = newFuture[void]("stream.transport.server.join")
@ -1785,7 +1797,8 @@ proc connect*(address: TransportAddress,
flags: set[TransportFlags],
localAddress = TransportAddress(),
dualstack = DualStackType.Auto
): Future[StreamTransport] =
): Future[StreamTransport] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
# Retro compatibility with TransportFlags
var mappedFlags: set[SocketFlags]
if TcpNoDelay in flags: mappedFlags.incl(SocketFlags.TcpNoDelay)
@ -1817,7 +1830,8 @@ proc close*(server: StreamServer) =
else:
server.sock.closeSocket(continuation)
proc closeWait*(server: StreamServer): Future[void] =
proc closeWait*(server: StreamServer): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Close server ``server`` and release all resources.
server.close()
server.join()
@ -2066,6 +2080,7 @@ proc createStreamServer*(host: TransportAddress,
sres
proc createStreamServer*(host: TransportAddress,
cbproc: UnsafeStreamCallback,
flags: set[ServerFlags] = {},
sock: AsyncFD = asyncInvalidSocket,
backlog: int = DefaultBacklogSize,
@ -2074,8 +2089,30 @@ proc createStreamServer*(host: TransportAddress,
init: TransportInitCallback = nil,
udata: pointer = nil,
dualstack = DualStackType.Auto): StreamServer {.
raises: [CatchableError].} =
createStreamServer(host, nil, flags, sock, backlog, bufferSize,
raises: [TransportOsError],
deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} =
proc wrap(server: StreamServer,
client: StreamTransport) {.async: (raises: []).} =
try:
cbproc(server, client)
except CatchableError as exc:
raiseAssert "Unexpected exception from stream server cbproc: " & exc.msg
createStreamServer(
host, wrap, flags, sock, backlog, bufferSize, child, init, udata,
dualstack)
proc createStreamServer*(host: TransportAddress,
flags: set[ServerFlags] = {},
sock: AsyncFD = asyncInvalidSocket,
backlog: int = DefaultBacklogSize,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil,
udata: pointer = nil,
dualstack = DualStackType.Auto): StreamServer {.
raises: [TransportOsError].} =
createStreamServer(host, StreamCallback(nil), flags, sock, backlog, bufferSize,
child, init, cast[pointer](udata), dualstack)
proc createStreamServer*[T](host: TransportAddress,
@ -2088,7 +2125,24 @@ proc createStreamServer*[T](host: TransportAddress,
child: StreamServer = nil,
init: TransportInitCallback = nil,
dualstack = DualStackType.Auto): StreamServer {.
raises: [CatchableError].} =
raises: [TransportOsError].} =
var fflags = flags + {GCUserData}
GC_ref(udata)
createStreamServer(host, cbproc, fflags, sock, backlog, bufferSize,
child, init, cast[pointer](udata), dualstack)
proc createStreamServer*[T](host: TransportAddress,
cbproc: UnsafeStreamCallback,
flags: set[ServerFlags] = {},
udata: ref T,
sock: AsyncFD = asyncInvalidSocket,
backlog: int = DefaultBacklogSize,
bufferSize: int = DefaultStreamBufferSize,
child: StreamServer = nil,
init: TransportInitCallback = nil,
dualstack = DualStackType.Auto): StreamServer {.
raises: [TransportOsError],
deprecated: "Callback must not raise exceptions, annotate with {.async: (raises: []).}".} =
var fflags = flags + {GCUserData}
GC_ref(udata)
createStreamServer(host, cbproc, fflags, sock, backlog, bufferSize,
@ -2103,10 +2157,10 @@ proc createStreamServer*[T](host: TransportAddress,
child: StreamServer = nil,
init: TransportInitCallback = nil,
dualstack = DualStackType.Auto): StreamServer {.
raises: [CatchableError].} =
raises: [TransportOsError].} =
var fflags = flags + {GCUserData}
GC_ref(udata)
createStreamServer(host, nil, fflags, sock, backlog, bufferSize,
createStreamServer(host, StreamCallback(nil), fflags, sock, backlog, bufferSize,
child, init, cast[pointer](udata), dualstack)
proc getUserData*[T](server: StreamServer): T {.inline.} =
@ -2157,7 +2211,8 @@ template fastWrite(transp: auto, pbytes: var ptr byte, rbytes: var int,
return retFuture
proc write*(transp: StreamTransport, pbytes: pointer,
nbytes: int): Future[int] =
nbytes: int): Future[int] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Write data from buffer ``pbytes`` with size ``nbytes`` using transport
## ``transp``.
var retFuture = newFuture[int]("stream.transport.write(pointer)")
@ -2179,9 +2234,10 @@ proc write*(transp: StreamTransport, pbytes: pointer,
return retFuture
proc write*(transp: StreamTransport, msg: sink string,
msglen = -1): Future[int] =
msglen = -1): Future[int] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Write data from string ``msg`` using transport ``transp``.
var retFuture = newFutureStr[int]("stream.transport.write(string)")
var retFuture = newFuture[int]("stream.transport.write(string)")
transp.checkClosed(retFuture)
transp.checkWriteEof(retFuture)
@ -2197,17 +2253,10 @@ proc write*(transp: StreamTransport, msg: sink string,
let
written = nbytes - rbytes # In case fastWrite wrote some
pbytes =
when declared(shallowCopy):
if not(isLiteral(msg)):
shallowCopy(retFuture.gcholder, msg)
cast[ptr byte](addr retFuture.gcholder[written])
else:
retFuture.gcholder = msg[written ..< nbytes]
cast[ptr byte](addr retFuture.gcholder[0])
else:
retFuture.gcholder = msg[written ..< nbytes]
cast[ptr byte](addr retFuture.gcholder[0])
var localCopy = msg
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
pbytes = cast[ptr byte](addr localCopy[written])
var vector = StreamVector(kind: DataBuffer, writer: retFuture,
buf: pbytes, buflen: rbytes, size: nbytes)
@ -2218,9 +2267,10 @@ proc write*(transp: StreamTransport, msg: sink string,
return retFuture
proc write*[T](transp: StreamTransport, msg: sink seq[T],
msglen = -1): Future[int] =
msglen = -1): Future[int] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Write sequence ``msg`` using transport ``transp``.
var retFuture = newFutureSeq[int, T]("stream.transport.write(seq)")
var retFuture = newFuture[int]("stream.transport.write(seq)")
transp.checkClosed(retFuture)
transp.checkWriteEof(retFuture)
@ -2236,17 +2286,10 @@ proc write*[T](transp: StreamTransport, msg: sink seq[T],
let
written = nbytes - rbytes # In case fastWrite wrote some
pbytes =
when declared(shallowCopy):
if not(isLiteral(msg)):
shallowCopy(retFuture.gcholder, msg)
cast[ptr byte](addr retFuture.gcholder[written])
else:
retFuture.gcholder = msg[written ..< nbytes]
cast[ptr byte](addr retFuture.gcholder[0])
else:
retFuture.gcholder = msg[written ..< nbytes]
cast[ptr byte](addr retFuture.gcholder[0])
var localCopy = msg
retFuture.addCallback(proc(_: pointer) = reset(localCopy))
pbytes = cast[ptr byte](addr localCopy[written])
var vector = StreamVector(kind: DataBuffer, writer: retFuture,
buf: pbytes, buflen: rbytes, size: nbytes)
@ -2257,7 +2300,8 @@ proc write*[T](transp: StreamTransport, msg: sink seq[T],
return retFuture
proc writeFile*(transp: StreamTransport, handle: int,
offset: uint = 0, size: int = 0): Future[int] =
offset: uint = 0, size: int = 0): Future[int] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Write data from file descriptor ``handle`` to transport ``transp``.
##
## You can specify starting ``offset`` in opened file and number of bytes
@ -2304,7 +2348,7 @@ template readLoop(name, body: untyped): untyped =
break
else:
checkPending(transp)
var fut = newFuture[void](name)
let fut = ReaderFuture.init(name)
transp.reader = fut
let res = resumeRead(transp)
if res.isErr():
@ -2328,7 +2372,8 @@ template readLoop(name, body: untyped): untyped =
await fut
proc readExactly*(transp: StreamTransport, pbytes: pointer,
nbytes: int) {.async.} =
nbytes: int) {.
async: (raises: [TransportError, CancelledError]).} =
## Read exactly ``nbytes`` bytes from transport ``transp`` and store it to
## ``pbytes``. ``pbytes`` must not be ``nil`` pointer and ``nbytes`` should
## be Natural.
@ -2357,7 +2402,8 @@ proc readExactly*(transp: StreamTransport, pbytes: pointer,
(consumed: count, done: index == nbytes)
proc readOnce*(transp: StreamTransport, pbytes: pointer,
nbytes: int): Future[int] {.async.} =
nbytes: int): Future[int] {.
async: (raises: [TransportError, CancelledError]).} =
## Perform one read operation on transport ``transp``.
##
## If internal buffer is not empty, ``nbytes`` bytes will be transferred from
@ -2376,7 +2422,8 @@ proc readOnce*(transp: StreamTransport, pbytes: pointer,
return count
proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
sep: seq[byte]): Future[int] {.async.} =
sep: seq[byte]): Future[int] {.
async: (raises: [TransportError, CancelledError]).} =
## Read data from the transport ``transp`` until separator ``sep`` is found.
##
## On success, the data and separator will be removed from the internal
@ -2428,7 +2475,8 @@ proc readUntil*(transp: StreamTransport, pbytes: pointer, nbytes: int,
return k
proc readLine*(transp: StreamTransport, limit = 0,
sep = "\r\n"): Future[string] {.async.} =
sep = "\r\n"): Future[string] {.
async: (raises: [TransportError, CancelledError]).} =
## Read one line from transport ``transp``, where "line" is a sequence of
## bytes ending with ``sep`` (default is "\r\n").
##
@ -2470,7 +2518,8 @@ proc readLine*(transp: StreamTransport, limit = 0,
(index, (state == len(sep)) or (lim == len(result)))
proc read*(transp: StreamTransport): Future[seq[byte]] {.async.} =
proc read*(transp: StreamTransport): Future[seq[byte]] {.
async: (raises: [TransportError, CancelledError]).} =
## Read all bytes from transport ``transp``.
##
## This procedure allocates buffer seq[byte] and return it as result.
@ -2481,7 +2530,8 @@ proc read*(transp: StreamTransport): Future[seq[byte]] {.async.} =
result.add(transp.buffer.toOpenArray(0, transp.offset - 1))
(transp.offset, false)
proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {.async.} =
proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {.
async: (raises: [TransportError, CancelledError]).} =
## Read all bytes (n <= 0) or exactly `n` bytes from transport ``transp``.
##
## This procedure allocates buffer seq[byte] and return it as result.
@ -2496,7 +2546,8 @@ proc read*(transp: StreamTransport, n: int): Future[seq[byte]] {.async.} =
result.add(transp.buffer.toOpenArray(0, count - 1))
(count, len(result) == n)
proc consume*(transp: StreamTransport): Future[int] {.async.} =
proc consume*(transp: StreamTransport): Future[int] {.
async: (raises: [TransportError, CancelledError]).} =
## Consume all bytes from transport ``transp`` and discard it.
##
## Return number of bytes actually consumed and discarded.
@ -2507,7 +2558,8 @@ proc consume*(transp: StreamTransport): Future[int] {.async.} =
result += transp.offset
(transp.offset, false)
proc consume*(transp: StreamTransport, n: int): Future[int] {.async.} =
proc consume*(transp: StreamTransport, n: int): Future[int] {.
async: (raises: [TransportError, CancelledError]).} =
## Consume all bytes (n <= 0) or ``n`` bytes from transport ``transp`` and
## discard it.
##
@ -2524,7 +2576,8 @@ proc consume*(transp: StreamTransport, n: int): Future[int] {.async.} =
(count, result == n)
proc readMessage*(transp: StreamTransport,
predicate: ReadMessagePredicate) {.async.} =
predicate: ReadMessagePredicate) {.
async: (raises: [TransportError, CancelledError]).} =
## Read all bytes from transport ``transp`` until ``predicate`` callback
## will not be satisfied.
##
@ -2547,7 +2600,8 @@ proc readMessage*(transp: StreamTransport,
else:
predicate(transp.buffer.toOpenArray(0, transp.offset - 1))
proc join*(transp: StreamTransport): Future[void] =
proc join*(transp: StreamTransport): Future[void] {.
async: (raw: true, raises: [CancelledError]).} =
## Wait until ``transp`` will not be closed.
var retFuture = newFuture[void]("stream.transport.join")
@ -2606,14 +2660,15 @@ proc close*(transp: StreamTransport) =
elif transp.kind == TransportKind.Socket:
closeSocket(transp.fd, continuation)
proc closeWait*(transp: StreamTransport): Future[void] =
proc closeWait*(transp: StreamTransport): Future[void] {.
async: (raw: true, raises: []).} =
## Close and frees resources of transport ``transp``.
const FutureName = "stream.transport.closeWait"
let retFuture = newFuture[void](
"stream.transport.closeWait", {FutureFlag.OwnCancelSchedule})
if {ReadClosed, WriteClosed} * transp.state != {}:
return Future.completed(FutureName)
let retFuture = newFuture[void](FutureName, {FutureFlag.OwnCancelSchedule})
retFuture.complete()
return retFuture
proc continuation(udata: pointer) {.gcsafe.} =
retFuture.complete()
@ -2631,7 +2686,8 @@ proc closeWait*(transp: StreamTransport): Future[void] =
retFuture.cancelCallback = cancellation
retFuture
proc shutdownWait*(transp: StreamTransport): Future[void] =
proc shutdownWait*(transp: StreamTransport): Future[void] {.
async: (raw: true, raises: [TransportError, CancelledError]).} =
## Perform graceful shutdown of TCP connection backed by transport ``transp``.
doAssert(transp.kind == TransportKind.Socket)
let retFuture = newFuture[void]("stream.transport.shutdown")

View File

@ -87,14 +87,17 @@ suite "AsyncStream test suite":
test "AsyncStream(StreamTransport) readExactly() test":
proc testReadExactly(): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write("000000000011111111112222222222")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
await wstream.write("000000000011111111112222222222")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var buffer = newSeq[byte](10)
var server = createStreamServer(initTAddress("127.0.0.1:0"),
@ -117,14 +120,17 @@ suite "AsyncStream test suite":
test "AsyncStream(StreamTransport) readUntil() test":
proc testReadUntil(): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write("0000000000NNz1111111111NNz2222222222NNz")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
await wstream.write("0000000000NNz1111111111NNz2222222222NNz")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var buffer = newSeq[byte](13)
var sep = @[byte('N'), byte('N'), byte('z')]
@ -155,14 +161,17 @@ suite "AsyncStream test suite":
test "AsyncStream(StreamTransport) readLine() test":
proc testReadLine(): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write("0000000000\r\n1111111111\r\n2222222222\r\n")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
await wstream.write("0000000000\r\n1111111111\r\n2222222222\r\n")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -184,14 +193,17 @@ suite "AsyncStream test suite":
test "AsyncStream(StreamTransport) read() test":
proc testRead(): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write("000000000011111111112222222222")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
await wstream.write("000000000011111111112222222222")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -211,14 +223,17 @@ suite "AsyncStream test suite":
test "AsyncStream(StreamTransport) consume() test":
proc testConsume(): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write("0000000000111111111122222222223333333333")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
await wstream.write("0000000000111111111122222222223333333333")
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -247,26 +262,29 @@ suite "AsyncStream test suite":
test "AsyncStream(AsyncStream) readExactly() test":
proc testReadExactly2(): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s1 = "00000"
var s2 = "11111"
var s3 = "22222"
await wstream2.write("00000")
await wstream2.write(addr s1[0], len(s1))
await wstream2.write("11111")
await wstream2.write(s2.toBytes())
await wstream2.write("22222")
await wstream2.write(addr s3[0], len(s3))
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s1 = "00000"
var s2 = "11111"
var s3 = "22222"
await wstream2.write("00000")
await wstream2.write(addr s1[0], len(s1))
await wstream2.write("11111")
await wstream2.write(s2.toBytes())
await wstream2.write("22222")
await wstream2.write(addr s3[0], len(s3))
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var buffer = newSeq[byte](10)
var server = createStreamServer(initTAddress("127.0.0.1:0"),
@ -299,25 +317,28 @@ suite "AsyncStream test suite":
test "AsyncStream(AsyncStream) readUntil() test":
proc testReadUntil2(): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s1 = "00000NNz"
var s2 = "11111NNz"
var s3 = "22222NNz"
await wstream2.write("00000")
await wstream2.write(addr s1[0], len(s1))
await wstream2.write("11111")
await wstream2.write(s2)
await wstream2.write("22222")
await wstream2.write(s3.toBytes())
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s1 = "00000NNz"
var s2 = "11111NNz"
var s3 = "22222NNz"
await wstream2.write("00000")
await wstream2.write(addr s1[0], len(s1))
await wstream2.write("11111")
await wstream2.write(s2)
await wstream2.write("22222")
await wstream2.write(s3.toBytes())
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var buffer = newSeq[byte](13)
var sep = @[byte('N'), byte('N'), byte('z')]
@ -358,22 +379,25 @@ suite "AsyncStream test suite":
test "AsyncStream(AsyncStream) readLine() test":
proc testReadLine2(): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
await wstream2.write("00000")
await wstream2.write("00000\r\n")
await wstream2.write("11111")
await wstream2.write("11111\r\n")
await wstream2.write("22222")
await wstream2.write("22222\r\n")
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
await wstream2.write("00000")
await wstream2.write("00000\r\n")
await wstream2.write("11111")
await wstream2.write("11111\r\n")
await wstream2.write("22222")
await wstream2.write("22222\r\n")
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -405,21 +429,24 @@ suite "AsyncStream test suite":
test "AsyncStream(AsyncStream) read() test":
proc testRead2(): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s2 = "1111111111"
var s3 = "2222222222"
await wstream2.write("0000000000")
await wstream2.write(s2)
await wstream2.write(s3.toBytes())
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s2 = "1111111111"
var s3 = "2222222222"
await wstream2.write("0000000000")
await wstream2.write(s2)
await wstream2.write(s3.toBytes())
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -446,31 +473,34 @@ suite "AsyncStream test suite":
test "AsyncStream(AsyncStream) consume() test":
proc testConsume2(): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
const
S4 = @[byte('3'), byte('3'), byte('3'), byte('3'), byte('3')]
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
transp: StreamTransport) {.async: (raises: []).} =
try:
const
S4 = @[byte('3'), byte('3'), byte('3'), byte('3'), byte('3')]
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var s1 = "00000"
var s2 = "11111".toBytes()
var s3 = "22222"
var s1 = "00000"
var s2 = "11111".toBytes()
var s3 = "22222"
await wstream2.write("00000")
await wstream2.write(s1)
await wstream2.write("11111")
await wstream2.write(s2)
await wstream2.write("22222")
await wstream2.write(addr s3[0], len(s3))
await wstream2.write("33333")
await wstream2.write(S4)
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
await wstream2.write("00000")
await wstream2.write(s1)
await wstream2.write("11111")
await wstream2.write(s2)
await wstream2.write("22222")
await wstream2.write(addr s3[0], len(s3))
await wstream2.write("33333")
await wstream2.write(S4)
await wstream2.finish()
await wstream.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -511,27 +541,30 @@ suite "AsyncStream test suite":
message = createBigMessage("ABCDEFGHIJKLMNOP", size)
proc processClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wbstream = newBoundedStreamWriter(wstream, uint64(size))
transp: StreamTransport) {.async: (raises: []).} =
try:
check wbstream.atEof() == false
await wbstream.write(message)
check wbstream.atEof() == false
await wbstream.finish()
check wbstream.atEof() == true
expect AsyncStreamWriteEOFError:
var wstream = newAsyncStreamWriter(transp)
var wbstream = newBoundedStreamWriter(wstream, uint64(size))
try:
check wbstream.atEof() == false
await wbstream.write(message)
expect AsyncStreamWriteEOFError:
await wbstream.write(message)
expect AsyncStreamWriteEOFError:
await wbstream.write(message)
check wbstream.atEof() == true
await wbstream.closeWait()
check wbstream.atEof() == true
finally:
await wstream.closeWait()
await transp.closeWait()
check wbstream.atEof() == false
await wbstream.finish()
check wbstream.atEof() == true
expect AsyncStreamWriteEOFError:
await wbstream.write(message)
expect AsyncStreamWriteEOFError:
await wbstream.write(message)
expect AsyncStreamWriteEOFError:
await wbstream.write(message)
check wbstream.atEof() == true
await wbstream.closeWait()
check wbstream.atEof() == true
finally:
await wstream.closeWait()
await transp.closeWait()
except CatchableError as exc:
raiseAssert exc.msg
let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}
var server = createStreamServer(initTAddress("127.0.0.1:0"),
@ -580,15 +613,18 @@ suite "ChunkedStream test suite":
]
proc checkVector(inputstr: string): Future[string] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var data = inputstr
await wstream.write(data)
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var data = inputstr
await wstream.write(data)
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -630,15 +666,18 @@ suite "ChunkedStream test suite":
]
proc checkVector(inputstr: string): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var data = inputstr
await wstream.write(data)
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var data = inputstr
await wstream.write(data)
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var res = false
var server = createStreamServer(initTAddress("127.0.0.1:0"),
@ -713,14 +752,17 @@ suite "ChunkedStream test suite":
test "ChunkedStream too big chunk header test":
proc checkTooBigChunkHeader(inputstr: seq[byte]): Future[bool] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
await wstream.write(inputstr)
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
await wstream.write(inputstr)
await wstream.finish()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -751,23 +793,26 @@ suite "ChunkedStream test suite":
proc checkVector(inputstr: seq[byte],
chunkSize: int): Future[seq[byte]] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var data = inputstr
var offset = 0
while true:
if len(data) == offset:
break
let toWrite = min(chunkSize, len(data) - offset)
await wstream2.write(addr data[offset], toWrite)
offset = offset + toWrite
await wstream2.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var data = inputstr
var offset = 0
while true:
if len(data) == offset:
break
let toWrite = min(chunkSize, len(data) - offset)
await wstream2.write(addr data[offset], toWrite)
offset = offset + toWrite
await wstream2.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -796,23 +841,26 @@ suite "ChunkedStream test suite":
writeChunkSize: int,
readChunkSize: int): Future[seq[byte]] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var data = inputstr
var offset = 0
while true:
if len(data) == offset:
break
let toWrite = min(writeChunkSize, len(data) - offset)
await wstream2.write(addr data[offset], toWrite)
offset = offset + toWrite
await wstream2.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newChunkedStreamWriter(wstream)
var data = inputstr
var offset = 0
while true:
if len(data) == offset:
break
let toWrite = min(writeChunkSize, len(data) - offset)
await wstream2.write(addr data[offset], toWrite)
offset = offset + toWrite
await wstream2.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -849,30 +897,33 @@ suite "TLSStream test suite":
const HttpHeadersMark = @[byte(0x0D), byte(0x0A), byte(0x0D), byte(0x0A)]
test "Simple HTTPS connection":
proc headerClient(address: TransportAddress,
name: string): Future[bool] {.async.} =
var mark = "HTTP/1.1 "
var buffer = newSeq[byte](8192)
var transp = await connect(address)
var reader = newAsyncStreamReader(transp)
var writer = newAsyncStreamWriter(transp)
var tlsstream = newTLSClientAsyncStream(reader, writer, name)
await tlsstream.writer.write("GET / HTTP/1.1\r\nHost: " & name &
"\r\nConnection: close\r\n\r\n")
var readFut = tlsstream.reader.readUntil(addr buffer[0], len(buffer),
HttpHeadersMark)
let res = await withTimeout(readFut, 5.seconds)
if res:
var length = readFut.read()
buffer.setLen(length)
if len(buffer) > len(mark):
if equalMem(addr buffer[0], addr mark[0], len(mark)):
result = true
name: string): Future[bool] {.async: (raises: []).} =
try:
var mark = "HTTP/1.1 "
var buffer = newSeq[byte](8192)
var transp = await connect(address)
var reader = newAsyncStreamReader(transp)
var writer = newAsyncStreamWriter(transp)
var tlsstream = newTLSClientAsyncStream(reader, writer, name)
await tlsstream.writer.write("GET / HTTP/1.1\r\nHost: " & name &
"\r\nConnection: close\r\n\r\n")
var readFut = tlsstream.reader.readUntil(addr buffer[0], len(buffer),
HttpHeadersMark)
let res = await withTimeout(readFut, 5.seconds)
if res:
var length = readFut.read()
buffer.setLen(length)
if len(buffer) > len(mark):
if equalMem(addr buffer[0], addr mark[0], len(mark)):
result = true
await tlsstream.reader.closeWait()
await tlsstream.writer.closeWait()
await reader.closeWait()
await writer.closeWait()
await transp.closeWait()
await tlsstream.reader.closeWait()
await tlsstream.writer.closeWait()
await reader.closeWait()
await writer.closeWait()
await transp.closeWait()
except CatchableError as exc:
raiseAssert exc.msg
let res = waitFor(headerClient(resolveTAddress("www.google.com:443")[0],
"www.google.com"))
@ -884,20 +935,23 @@ suite "TLSStream test suite":
let testMessage = "TEST MESSAGE"
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var reader = newAsyncStreamReader(transp)
var writer = newAsyncStreamWriter(transp)
var sstream = newTLSServerAsyncStream(reader, writer, key, cert)
await handshake(sstream)
await sstream.writer.write(testMessage & "\r\n")
await sstream.writer.finish()
await sstream.writer.closeWait()
await sstream.reader.closeWait()
await reader.closeWait()
await writer.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var reader = newAsyncStreamReader(transp)
var writer = newAsyncStreamWriter(transp)
var sstream = newTLSServerAsyncStream(reader, writer, key, cert)
await handshake(sstream)
await sstream.writer.write(testMessage & "\r\n")
await sstream.writer.finish()
await sstream.writer.closeWait()
await sstream.reader.closeWait()
await reader.closeWait()
await writer.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
key = TLSPrivateKey.init(pemkey)
cert = TLSCertificate.init(pemcert)
@ -931,20 +985,23 @@ suite "TLSStream test suite":
let trustAnchors = TrustAnchorStore.new(SelfSignedTrustAnchors)
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var reader = newAsyncStreamReader(transp)
var writer = newAsyncStreamWriter(transp)
var sstream = newTLSServerAsyncStream(reader, writer, key, cert)
await handshake(sstream)
await sstream.writer.write(testMessage & "\r\n")
await sstream.writer.finish()
await sstream.writer.closeWait()
await sstream.reader.closeWait()
await reader.closeWait()
await writer.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var reader = newAsyncStreamReader(transp)
var writer = newAsyncStreamWriter(transp)
var sstream = newTLSServerAsyncStream(reader, writer, key, cert)
await handshake(sstream)
await sstream.writer.write(testMessage & "\r\n")
await sstream.writer.finish()
await sstream.writer.closeWait()
await sstream.reader.closeWait()
await reader.closeWait()
await writer.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -988,46 +1045,49 @@ suite "BoundedStream test suite":
var clientRes = false
proc processClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
case btest
of BoundaryRead:
await wstream.write(message)
await wstream.write(boundary)
await wstream.finish()
await wstream.closeWait()
clientRes = true
of BoundaryDouble:
await wstream.write(message)
await wstream.write(boundary)
await wstream.write(message)
await wstream.finish()
await wstream.closeWait()
clientRes = true
of BoundarySize:
var ncmessage = message
ncmessage.setLen(len(message) - 2)
await wstream.write(ncmessage)
await wstream.write(@[0x2D'u8, 0x2D'u8])
await wstream.finish()
await wstream.closeWait()
clientRes = true
of BoundaryIncomplete:
var ncmessage = message
ncmessage.setLen(len(message) - 2)
await wstream.write(ncmessage)
await wstream.finish()
await wstream.closeWait()
clientRes = true
of BoundaryEmpty:
await wstream.write(boundary)
await wstream.finish()
await wstream.closeWait()
clientRes = true
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
case btest
of BoundaryRead:
await wstream.write(message)
await wstream.write(boundary)
await wstream.finish()
await wstream.closeWait()
clientRes = true
of BoundaryDouble:
await wstream.write(message)
await wstream.write(boundary)
await wstream.write(message)
await wstream.finish()
await wstream.closeWait()
clientRes = true
of BoundarySize:
var ncmessage = message
ncmessage.setLen(len(message) - 2)
await wstream.write(ncmessage)
await wstream.write(@[0x2D'u8, 0x2D'u8])
await wstream.finish()
await wstream.closeWait()
clientRes = true
of BoundaryIncomplete:
var ncmessage = message
ncmessage.setLen(len(message) - 2)
await wstream.write(ncmessage)
await wstream.finish()
await wstream.closeWait()
clientRes = true
of BoundaryEmpty:
await wstream.write(boundary)
await wstream.finish()
await wstream.closeWait()
clientRes = true
await transp.closeWait()
server.stop()
server.close()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var res = false
let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}
@ -1090,60 +1150,63 @@ suite "BoundedStream test suite":
message.add(messagePart)
proc processClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wbstream = newBoundedStreamWriter(wstream, uint64(size),
comparison = cmp)
case stest
of SizeReadWrite:
for i in 0 ..< 10:
await wbstream.write(messagePart)
await wbstream.finish()
await wbstream.closeWait()
clientRes = true
of SizeOverflow:
for i in 0 ..< 10:
await wbstream.write(messagePart)
try:
await wbstream.write(messagePart)
except BoundedStreamOverflowError:
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var wbstream = newBoundedStreamWriter(wstream, uint64(size),
comparison = cmp)
case stest
of SizeReadWrite:
for i in 0 ..< 10:
await wbstream.write(messagePart)
await wbstream.finish()
await wbstream.closeWait()
clientRes = true
await wbstream.closeWait()
of SizeIncomplete:
for i in 0 ..< 9:
await wbstream.write(messagePart)
case cmp
of BoundCmp.Equal:
of SizeOverflow:
for i in 0 ..< 10:
await wbstream.write(messagePart)
try:
await wbstream.finish()
except BoundedStreamIncompleteError:
await wbstream.write(messagePart)
except BoundedStreamOverflowError:
clientRes = true
of BoundCmp.LessOrEqual:
try:
await wbstream.finish()
clientRes = true
except BoundedStreamIncompleteError:
discard
await wbstream.closeWait()
of SizeEmpty:
case cmp
of BoundCmp.Equal:
try:
await wbstream.finish()
except BoundedStreamIncompleteError:
clientRes = true
of BoundCmp.LessOrEqual:
try:
await wbstream.finish()
clientRes = true
except BoundedStreamIncompleteError:
discard
await wbstream.closeWait()
await wbstream.closeWait()
of SizeIncomplete:
for i in 0 ..< 9:
await wbstream.write(messagePart)
case cmp
of BoundCmp.Equal:
try:
await wbstream.finish()
except BoundedStreamIncompleteError:
clientRes = true
of BoundCmp.LessOrEqual:
try:
await wbstream.finish()
clientRes = true
except BoundedStreamIncompleteError:
discard
await wbstream.closeWait()
of SizeEmpty:
case cmp
of BoundCmp.Equal:
try:
await wbstream.finish()
except BoundedStreamIncompleteError:
clientRes = true
of BoundCmp.LessOrEqual:
try:
await wbstream.finish()
clientRes = true
except BoundedStreamIncompleteError:
discard
await wbstream.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}
var server = createStreamServer(initTAddress("127.0.0.1:0"),
@ -1243,23 +1306,26 @@ suite "BoundedStream test suite":
writeChunkSize: int,
readChunkSize: int): Future[seq[byte]] {.async.} =
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newBoundedStreamWriter(wstream, uint64(len(inputstr)))
var data = inputstr
var offset = 0
while true:
if len(data) == offset:
break
let toWrite = min(writeChunkSize, len(data) - offset)
await wstream2.write(addr data[offset], toWrite)
offset = offset + toWrite
await wstream2.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newBoundedStreamWriter(wstream, uint64(len(inputstr)))
var data = inputstr
var offset = 0
while true:
if len(data) == offset:
break
let toWrite = min(writeChunkSize, len(data) - offset)
await wstream2.write(addr data[offset], toWrite)
offset = offset + toWrite
await wstream2.finish()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})
@ -1293,17 +1359,20 @@ suite "BoundedStream test suite":
proc checkEmptyStreams(): Future[bool] {.async.} =
var writer1Res = false
proc serveClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newBoundedStreamWriter(wstream, 0'u64)
await wstream2.finish()
let res = wstream2.atEof()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
writer1Res = res
transp: StreamTransport) {.async: (raises: []).} =
try:
var wstream = newAsyncStreamWriter(transp)
var wstream2 = newBoundedStreamWriter(wstream, 0'u64)
await wstream2.finish()
let res = wstream2.atEof()
await wstream2.closeWait()
await wstream.closeWait()
await transp.closeWait()
server.stop()
server.close()
writer1Res = res
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(initTAddress("127.0.0.1:0"),
serveClient, {ReuseAddr})

View File

@ -21,16 +21,19 @@ suite "Asynchronous issues test suite":
test: string
proc udp4DataAvailable(transp: DatagramTransport,
remote: TransportAddress) {.async, gcsafe.} =
var udata = getUserData[CustomData](transp)
var expect = TEST_MSG
var data: seq[byte]
var datalen: int
transp.peekMessage(data, datalen)
if udata.test == "CHECK" and datalen == MSG_LEN and
equalMem(addr data[0], addr expect[0], datalen):
udata.test = "OK"
transp.close()
remote: TransportAddress) {.async: (raises: []).} =
try:
var udata = getUserData[CustomData](transp)
var expect = TEST_MSG
var data: seq[byte]
var datalen: int
transp.peekMessage(data, datalen)
if udata.test == "CHECK" and datalen == MSG_LEN and
equalMem(addr data[0], addr expect[0], datalen):
udata.test = "OK"
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc issue6(): Future[bool] {.async.} =
var myself = initTAddress("127.0.0.1:" & $HELLO_PORT)

View File

@ -30,286 +30,319 @@ suite "Datagram Transport test suite":
" clients x " & $MessagesCount & " messages)"
proc client1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(raddr, addr ans[0], len(ans))
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(raddr, addr ans[0], len(ans))
else:
var err = "ERROR"
await transp.sendTo(raddr, addr err[0], len(err))
else:
var err = "ERROR"
await transp.sendTo(raddr, addr err[0], len(err))
else:
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc client2(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var ta = initTAddress("127.0.0.1:33336")
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(ta, addr req[0], len(req))
else:
var ta = initTAddress("127.0.0.1:33336")
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(ta, addr req[0], len(req))
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc client3(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req))
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req))
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc client4(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == MessagesCount:
transp.close()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == MessagesCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req))
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(addr req[0], len(req))
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc client5(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == MessagesCount:
transp.close()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == MessagesCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(raddr, addr req[0], len(req))
else:
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(raddr, addr req[0], len(req))
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc client6(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(raddr, ans)
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
await transp.sendTo(raddr, ans)
else:
var err = "ERROR"
await transp.sendTo(raddr, err)
else:
var err = "ERROR"
await transp.sendTo(raddr, err)
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc client7(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(raddr, req)
else:
var req = "REQUEST" & $counterPtr[]
await transp.sendTo(raddr, req)
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc client8(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(req)
else:
var req = "REQUEST" & $counterPtr[]
await transp.send(req)
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc client9(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
var ansseq = newSeq[byte](len(ans))
copyMem(addr ansseq[0], addr ans[0], len(ans))
await transp.sendTo(raddr, ansseq)
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("REQUEST"):
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num
var ansseq = newSeq[byte](len(ans))
copyMem(addr ansseq[0], addr ans[0], len(ans))
await transp.sendTo(raddr, ansseq)
else:
var err = "ERROR"
var errseq = newSeq[byte](len(err))
copyMem(addr errseq[0], addr err[0], len(err))
await transp.sendTo(raddr, errseq)
else:
var err = "ERROR"
var errseq = newSeq[byte](len(err))
copyMem(addr errseq[0], addr err[0], len(err))
await transp.sendTo(raddr, errseq)
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc client10(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.sendTo(raddr, reqseq)
else:
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.sendTo(raddr, reqseq)
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc client11(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var pbytes = transp.getMessage()
var nbytes = len(pbytes)
if nbytes > 0:
var data = newString(nbytes + 1)
copyMem(addr data[0], addr pbytes[0], nbytes)
data.setLen(nbytes)
if data.startsWith("ANSWER"):
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = counterPtr[] + 1
if counterPtr[] == TestsCount:
transp.close()
else:
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.send(reqseq)
else:
var req = "REQUEST" & $counterPtr[]
var reqseq = newSeq[byte](len(req))
copyMem(addr reqseq[0], addr req[0], len(req))
await transp.send(reqseq)
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
else:
## Read operation failed with error
var counterPtr = cast[ptr int](transp.udata)
counterPtr[] = -1
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
proc testPointerSendTo(): Future[int] {.async.} =
## sendTo(pointer) test
@ -439,7 +472,7 @@ suite "Datagram Transport test suite":
var ta = initTAddress("127.0.0.1:0")
var counter = 0
proc clientMark(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
counter = 1
transp.close()
var dgram1 = newDatagramTransport(client1, local = ta)
@ -457,7 +490,7 @@ suite "Datagram Transport test suite":
proc testTransportClose(): Future[bool] {.async.} =
var ta = initTAddress("127.0.0.1:45000")
proc clientMark(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
discard
var dgram = newDatagramTransport(clientMark, local = ta)
dgram.close()
@ -473,12 +506,15 @@ suite "Datagram Transport test suite":
var bta = initTAddress("255.255.255.255:45010")
var res = 0
proc clientMark(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var bmsg = transp.getMessage()
var smsg = string.fromBytes(bmsg)
if smsg == expectMessage:
inc(res)
transp.close()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var bmsg = transp.getMessage()
var smsg = string.fromBytes(bmsg)
if smsg == expectMessage:
inc(res)
transp.close()
except CatchableError as exc:
raiseAssert exc.msg
var dgram1 = newDatagramTransport(clientMark, local = ta1,
flags = {Broadcast}, ttl = 2)
await dgram1.sendTo(bta, expectMessage)
@ -493,15 +529,19 @@ suite "Datagram Transport test suite":
var event = newAsyncEvent()
proc clientMark1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var bmsg = transp.getMessage()
var smsg = string.fromBytes(bmsg)
if smsg == expectStr:
inc(res)
event.fire()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var bmsg = transp.getMessage()
var smsg = string.fromBytes(bmsg)
if smsg == expectStr:
inc(res)
event.fire()
except CatchableError as exc:
raiseAssert exc.msg
proc clientMark2(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
discard
var dgram1 = newDatagramTransport(clientMark1, local = ta)
@ -544,15 +584,18 @@ suite "Datagram Transport test suite":
res = 0
proc process1(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
var bmsg = transp.getMessage()
var smsg = string.fromBytes(bmsg)
if smsg == expectStr:
inc(res)
event.fire()
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
try:
var bmsg = transp.getMessage()
var smsg = string.fromBytes(bmsg)
if smsg == expectStr:
inc(res)
event.fire()
except CatchableError as exc:
raiseAssert exc.msg
proc process2(transp: DatagramTransport,
raddr: TransportAddress): Future[void] {.async.} =
raddr: TransportAddress): Future[void] {.async: (raises: []).} =
discard
let

View File

@ -459,20 +459,31 @@ suite "Exceptions tracking":
check waitFor(test1()) == 12
proc test2: Future[int] {.async: (raw: true, raises: [IOError, OSError]).} =
checkNotCompiles:
result.fail(newException(ValueError, "fail"))
result = newFuture[int]()
result.fail(newException(IOError, "fail"))
result.fail(newException(OSError, "fail"))
checkNotCompiles:
result.fail(newException(ValueError, "fail"))
proc test3: Future[void] {.async: (raw: true, raises: []).} =
result = newFuture[void]()
checkNotCompiles:
result.fail(newException(ValueError, "fail"))
result.complete()
# Inheritance
proc test4: Future[void] {.async: (raw: true, raises: [CatchableError]).} =
result = newFuture[void]()
result.fail(newException(IOError, "fail"))
check:
waitFor(test1()) == 12
expect(IOError):
discard waitFor(test2())
waitFor(test3())
expect(IOError):
waitFor(test4())
test "or errors":
proc testit {.async: (raises: [ValueError]).} =
raise (ref ValueError)()

View File

@ -27,29 +27,36 @@ suite "Server's test suite":
checkLeaks()
proc serveStreamClient(server: StreamServer,
transp: StreamTransport) {.async.} =
transp: StreamTransport) {.async: (raises: []).} =
discard
proc serveCustomStreamClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var cserver = cast[CustomServer](server)
var ctransp = cast[CustomTransport](transp)
cserver.test1 = "CONNECTION"
cserver.test2 = ctransp.test
cserver.test3 = await transp.readLine()
var answer = "ANSWER\r\n"
discard await transp.write(answer)
transp.close()
await transp.join()
transp: StreamTransport) {.async: (raises: []).} =
try:
var cserver = cast[CustomServer](server)
var ctransp = cast[CustomTransport](transp)
cserver.test1 = "CONNECTION"
cserver.test2 = ctransp.test
cserver.test3 = await transp.readLine()
var answer = "ANSWER\r\n"
discard await transp.write(answer)
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
proc serveUdataStreamClient(server: StreamServer,
transp: StreamTransport) {.async.} =
var udata = getUserData[CustomData](server)
var line = await transp.readLine()
var msg = line & udata.test & "\r\n"
discard await transp.write(msg)
transp.close()
await transp.join()
transp: StreamTransport) {.async: (raises: []).} =
try:
var udata = getUserData[CustomData](server)
var line = await transp.readLine()
var msg = line & udata.test & "\r\n"
discard await transp.write(msg)
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
proc customServerTransport(server: StreamServer,
fd: AsyncFD): StreamTransport =

View File

@ -55,124 +55,148 @@ suite "Stream Transport test suite":
for i in 0 ..< len(result):
result[i] = byte(message[i mod len(message)])
proc serveClient1(server: StreamServer, transp: StreamTransport) {.async.} =
while not transp.atEof():
var data = await transp.readLine()
if len(data) == 0:
doAssert(transp.atEof())
break
doAssert(data.startsWith("REQUEST"))
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num & "\r\n"
var res = await transp.write(cast[pointer](addr ans[0]), len(ans))
doAssert(res == len(ans))
transp.close()
await transp.join()
proc serveClient1(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
while not transp.atEof():
var data = await transp.readLine()
if len(data) == 0:
doAssert(transp.atEof())
break
doAssert(data.startsWith("REQUEST"))
var numstr = data[7..^1]
var num = parseInt(numstr)
var ans = "ANSWER" & $num & "\r\n"
var res = await transp.write(cast[pointer](addr ans[0]), len(ans))
doAssert(res == len(ans))
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
proc serveClient2(server: StreamServer, transp: StreamTransport) {.async.} =
var buffer: array[20, char]
var check = "REQUEST"
while not transp.atEof():
zeroMem(addr buffer[0], MessageSize)
try:
await transp.readExactly(addr buffer[0], MessageSize)
except TransportIncompleteError:
break
doAssert(equalMem(addr buffer[0], addr check[0], len(check)))
var numstr = ""
var i = 7
while i < MessageSize and (buffer[i] in {'0'..'9'}):
numstr.add(buffer[i])
inc(i)
var num = parseInt(numstr)
var ans = "ANSWER" & $num
zeroMem(addr buffer[0], MessageSize)
copyMem(addr buffer[0], addr ans[0], len(ans))
var res = await transp.write(cast[pointer](addr buffer[0]), MessageSize)
doAssert(res == MessageSize)
transp.close()
await transp.join()
proc serveClient2(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
var buffer: array[20, char]
var check = "REQUEST"
while not transp.atEof():
zeroMem(addr buffer[0], MessageSize)
try:
await transp.readExactly(addr buffer[0], MessageSize)
except TransportIncompleteError:
break
doAssert(equalMem(addr buffer[0], addr check[0], len(check)))
var numstr = ""
var i = 7
while i < MessageSize and (buffer[i] in {'0'..'9'}):
numstr.add(buffer[i])
inc(i)
var num = parseInt(numstr)
var ans = "ANSWER" & $num
zeroMem(addr buffer[0], MessageSize)
copyMem(addr buffer[0], addr ans[0], len(ans))
var res = await transp.write(cast[pointer](addr buffer[0]), MessageSize)
doAssert(res == MessageSize)
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
proc serveClient3(server: StreamServer, transp: StreamTransport) {.async.} =
var buffer: array[20, char]
var check = "REQUEST"
var suffixStr = "SUFFIX"
var suffix = newSeq[byte](6)
copyMem(addr suffix[0], addr suffixStr[0], len(suffixStr))
var counter = MessagesCount
while counter > 0:
zeroMem(addr buffer[0], MessageSize)
var res = await transp.readUntil(addr buffer[0], MessageSize, suffix)
doAssert(equalMem(addr buffer[0], addr check[0], len(check)))
var numstr = ""
var i = 7
while i < MessageSize and (buffer[i] in {'0'..'9'}):
numstr.add(buffer[i])
inc(i)
var num = parseInt(numstr)
doAssert(len(numstr) < 8)
var ans = "ANSWER" & $num & "SUFFIX"
zeroMem(addr buffer[0], MessageSize)
copyMem(addr buffer[0], addr ans[0], len(ans))
res = await transp.write(cast[pointer](addr buffer[0]), len(ans))
doAssert(res == len(ans))
dec(counter)
transp.close()
await transp.join()
proc serveClient3(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
var buffer: array[20, char]
var check = "REQUEST"
var suffixStr = "SUFFIX"
var suffix = newSeq[byte](6)
copyMem(addr suffix[0], addr suffixStr[0], len(suffixStr))
var counter = MessagesCount
while counter > 0:
zeroMem(addr buffer[0], MessageSize)
var res = await transp.readUntil(addr buffer[0], MessageSize, suffix)
doAssert(equalMem(addr buffer[0], addr check[0], len(check)))
var numstr = ""
var i = 7
while i < MessageSize and (buffer[i] in {'0'..'9'}):
numstr.add(buffer[i])
inc(i)
var num = parseInt(numstr)
doAssert(len(numstr) < 8)
var ans = "ANSWER" & $num & "SUFFIX"
zeroMem(addr buffer[0], MessageSize)
copyMem(addr buffer[0], addr ans[0], len(ans))
res = await transp.write(cast[pointer](addr buffer[0]), len(ans))
doAssert(res == len(ans))
dec(counter)
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
proc serveClient4(server: StreamServer, transp: StreamTransport) {.async.} =
var pathname = await transp.readLine()
var size = await transp.readLine()
var sizeNum = parseInt(size)
doAssert(sizeNum >= 0)
var rbuffer = newSeq[byte](sizeNum)
await transp.readExactly(addr rbuffer[0], sizeNum)
var lbuffer = readFile(pathname)
doAssert(len(lbuffer) == sizeNum)
doAssert(equalMem(addr rbuffer[0], addr lbuffer[0], sizeNum))
var answer = "OK\r\n"
var res = await transp.write(cast[pointer](addr answer[0]), len(answer))
doAssert(res == len(answer))
transp.close()
await transp.join()
proc serveClient4(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
var pathname = await transp.readLine()
var size = await transp.readLine()
var sizeNum = parseInt(size)
doAssert(sizeNum >= 0)
var rbuffer = newSeq[byte](sizeNum)
await transp.readExactly(addr rbuffer[0], sizeNum)
var lbuffer = readFile(pathname)
doAssert(len(lbuffer) == sizeNum)
doAssert(equalMem(addr rbuffer[0], addr lbuffer[0], sizeNum))
var answer = "OK\r\n"
var res = await transp.write(cast[pointer](addr answer[0]), len(answer))
doAssert(res == len(answer))
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
proc serveClient7(server: StreamServer, transp: StreamTransport) {.async.} =
var answer = "DONE\r\n"
var expect = ""
var line = await transp.readLine()
doAssert(len(line) == BigMessageCount * len(BigMessagePattern))
for i in 0..<BigMessageCount:
expect.add(BigMessagePattern)
doAssert(line == expect)
var res = await transp.write(answer)
doAssert(res == len(answer))
transp.close()
await transp.join()
server.stop()
server.close()
proc serveClient7(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
var answer = "DONE\r\n"
var expect = ""
var line = await transp.readLine()
doAssert(len(line) == BigMessageCount * len(BigMessagePattern))
for i in 0..<BigMessageCount:
expect.add(BigMessagePattern)
doAssert(line == expect)
var res = await transp.write(answer)
doAssert(res == len(answer))
transp.close()
await transp.join()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
proc serveClient8(server: StreamServer, transp: StreamTransport) {.async.} =
var answer = "DONE\r\n"
var strpattern = BigMessagePattern
var pattern = newSeq[byte](len(BigMessagePattern))
var expect = newSeq[byte]()
var data = newSeq[byte]((BigMessageCount + 1) * len(BigMessagePattern))
var sep = @[0x0D'u8, 0x0A'u8]
copyMem(addr pattern[0], addr strpattern[0], len(BigMessagePattern))
var count = await transp.readUntil(addr data[0], len(data), sep = sep)
doAssert(count == BigMessageCount * len(BigMessagePattern) + 2)
for i in 0..<BigMessageCount:
expect.add(pattern)
expect.add(sep)
data.setLen(count)
doAssert(expect == data)
var res = await transp.write(answer)
doAssert(res == len(answer))
transp.close()
await transp.join()
server.stop()
server.close()
proc serveClient8(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
var answer = "DONE\r\n"
var strpattern = BigMessagePattern
var pattern = newSeq[byte](len(BigMessagePattern))
var expect = newSeq[byte]()
var data = newSeq[byte]((BigMessageCount + 1) * len(BigMessagePattern))
var sep = @[0x0D'u8, 0x0A'u8]
copyMem(addr pattern[0], addr strpattern[0], len(BigMessagePattern))
var count = await transp.readUntil(addr data[0], len(data), sep = sep)
doAssert(count == BigMessageCount * len(BigMessagePattern) + 2)
for i in 0..<BigMessageCount:
expect.add(pattern)
expect.add(sep)
data.setLen(count)
doAssert(expect == data)
var res = await transp.write(answer)
doAssert(res == len(answer))
transp.close()
await transp.join()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
proc swarmWorker1(address: TransportAddress): Future[int] {.async.} =
var transp = await connect(address)
@ -399,18 +423,22 @@ suite "Stream Transport test suite":
var res = workers[i].read()
result += res
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
var data = await transp.read()
doAssert(len(data) == len(ConstantMessage) * MessagesCount)
transp.close()
var expect = ""
for i in 0..<MessagesCount:
expect.add(ConstantMessage)
doAssert(equalMem(addr expect[0], addr data[0], len(data)))
dec(counter)
if counter == 0:
server.stop()
server.close()
proc serveClient(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
var data = await transp.read()
doAssert(len(data) == len(ConstantMessage) * MessagesCount)
transp.close()
var expect = ""
for i in 0..<MessagesCount:
expect.add(ConstantMessage)
doAssert(equalMem(addr expect[0], addr data[0], len(data)))
dec(counter)
if counter == 0:
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
@ -420,18 +448,22 @@ suite "Stream Transport test suite":
proc testWCR(address: TransportAddress): Future[int] {.async.} =
var counter = ClientsCount
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
var expect = ConstantMessage
var skip = await transp.consume(len(ConstantMessage) * (MessagesCount - 1))
doAssert(skip == len(ConstantMessage) * (MessagesCount - 1))
var data = await transp.read()
doAssert(len(data) == len(ConstantMessage))
transp.close()
doAssert(equalMem(addr data[0], addr expect[0], len(expect)))
dec(counter)
if counter == 0:
server.stop()
server.close()
proc serveClient(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
var expect = ConstantMessage
var skip = await transp.consume(len(ConstantMessage) * (MessagesCount - 1))
doAssert(skip == len(ConstantMessage) * (MessagesCount - 1))
var data = await transp.read()
doAssert(len(data) == len(ConstantMessage))
transp.close()
doAssert(equalMem(addr data[0], addr expect[0], len(expect)))
dec(counter)
if counter == 0:
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
proc swarmWorker(address: TransportAddress): Future[int] {.async.} =
var transp = await connect(address)
@ -534,11 +566,15 @@ suite "Stream Transport test suite":
# server.close()
# await server.join()
proc serveClient11(server: StreamServer, transp: StreamTransport) {.async.} =
var res = await transp.write(BigMessagePattern)
doAssert(res == len(BigMessagePattern))
transp.close()
await transp.join()
proc serveClient11(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
var res = await transp.write(BigMessagePattern)
doAssert(res == len(BigMessagePattern))
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
proc swarmWorker11(address: TransportAddress): Future[int] {.async.} =
var buffer: array[len(BigMessagePattern) + 1, byte]
@ -558,11 +594,15 @@ suite "Stream Transport test suite":
server.close()
await server.join()
proc serveClient12(server: StreamServer, transp: StreamTransport) {.async.} =
var res = await transp.write(BigMessagePattern)
doAssert(res == len(BigMessagePattern))
transp.close()
await transp.join()
proc serveClient12(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
var res = await transp.write(BigMessagePattern)
doAssert(res == len(BigMessagePattern))
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
proc swarmWorker12(address: TransportAddress): Future[int] {.async.} =
var buffer: array[len(BigMessagePattern), byte]
@ -584,9 +624,13 @@ suite "Stream Transport test suite":
server.close()
await server.join()
proc serveClient13(server: StreamServer, transp: StreamTransport) {.async.} =
transp.close()
await transp.join()
proc serveClient13(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
proc swarmWorker13(address: TransportAddress): Future[int] {.async.} =
var transp = await connect(address)
@ -645,11 +689,15 @@ suite "Stream Transport test suite":
else:
return (e.code == oserrno.ECONNREFUSED) or (e.code == oserrno.ENOENT)
proc serveClient16(server: StreamServer, transp: StreamTransport) {.async.} =
var res = await transp.write(BigMessagePattern)
doAssert(res == len(BigMessagePattern))
transp.close()
await transp.join()
proc serveClient16(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
var res = await transp.write(BigMessagePattern)
doAssert(res == len(BigMessagePattern))
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
proc swarmWorker16(address: TransportAddress): Future[int] {.async.} =
var buffer = newString(5)
@ -680,7 +728,8 @@ suite "Stream Transport test suite":
await server.join()
proc testCloseTransport(address: TransportAddress): Future[int] {.async.} =
proc client(server: StreamServer, transp: StreamTransport) {.async.} =
proc client(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
discard
var server = createStreamServer(address, client, {ReuseAddr})
server.start()
@ -694,9 +743,12 @@ suite "Stream Transport test suite":
proc testWriteConnReset(address: TransportAddress): Future[int] {.async.} =
var syncFut = newFuture[void]()
proc client(server: StreamServer, transp: StreamTransport) {.async.} =
await transp.closeWait()
syncFut.complete()
proc client(server: StreamServer, transp: StreamTransport) {.async: (raises: []).} =
try:
await transp.closeWait()
syncFut.complete()
except CatchableError as exc:
raiseAssert exc.msg
var n = 10
var server = createStreamServer(address, client, {ReuseAddr})
server.start()
@ -721,12 +773,16 @@ suite "Stream Transport test suite":
var serverRemote, serverLocal: TransportAddress
var connRemote, connLocal: TransportAddress
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
serverRemote = transp.remoteAddress()
serverLocal = transp.localAddress()
await transp.closeWait()
server.stop()
server.close()
proc serveClient(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
serverRemote = transp.remoteAddress()
serverLocal = transp.localAddress()
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var ta = initTAddress("0.0.0.0:0")
var server = createStreamServer(ta, serveClient, {ReuseAddr})
@ -748,13 +804,17 @@ suite "Stream Transport test suite":
var bigMessageSize = 10 * 1024 * 1024 - 1
var finishMessage = "DONE"
var cdata = newSeqOfCap[byte](bigMessageSize)
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
cdata = await transp.read(bigMessageSize)
var size = await transp.write(finishMessage)
doAssert(size == len(finishMessage))
await transp.closeWait()
server.stop()
server.close()
proc serveClient(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
cdata = await transp.read(bigMessageSize)
var size = await transp.write(finishMessage)
doAssert(size == len(finishMessage))
await transp.closeWait()
server.stop()
server.close()
except CatchableError as exc:
raiseAssert exc.msg
var flag = false
var server = createStreamServer(address, serveClient, {ReuseAddr})
@ -787,10 +847,15 @@ suite "Stream Transport test suite":
result = flag
proc testReadLine(address: TransportAddress): Future[bool] {.async.} =
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
discard await transp.write("DATA\r\r\r\r\r\n")
transp.close()
await transp.join()
proc serveClient(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
discard await transp.write("DATA\r\r\r\r\r\n")
transp.close()
await transp.join()
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
@ -895,52 +960,56 @@ suite "Stream Transport test suite":
var state = 0
var c1, c2, c3, c4, c5, c6, c7: bool
proc serveClient(server: StreamServer, transp: StreamTransport) {.async.} =
if state == 0:
# EOF from the beginning.
state = 1
await transp.closeWait()
elif state == 1:
# Message has only zero-size header.
var message = createLVMessage(0'u32)
discard await transp.write(message)
state = 2
await transp.closeWait()
elif state == 2:
# Message has header, but do not have any data at all.
var message = createLVMessage(4'u32)
message.setLen(4)
discard await transp.write(message)
state = 3
await transp.closeWait()
elif state == 3:
# Message do not have enough data for specified size in header.
var message = createLVMessage(1024'u32)
message.setLen(1024)
discard await transp.write(message)
state = 4
await transp.closeWait()
elif state == 4:
# Good encoded message with oversize.
var message = createLVMessage(1024'u32)
discard await transp.write(message)
state = 5
await transp.closeWait()
elif state == 5:
# Good encoded message.
var message = createLVMessage(1024'u32)
discard await transp.write(message)
state = 6
await transp.closeWait()
elif state == 6:
# Good encoded message with additional data.
var message = createLVMessage(1024'u32)
discard await transp.write(message)
discard await transp.write("DONE")
state = 7
await transp.closeWait()
else:
doAssert(false)
proc serveClient(server: StreamServer, transp: StreamTransport) {.
async: (raises: []).} =
try:
if state == 0:
# EOF from the beginning.
state = 1
await transp.closeWait()
elif state == 1:
# Message has only zero-size header.
var message = createLVMessage(0'u32)
discard await transp.write(message)
state = 2
await transp.closeWait()
elif state == 2:
# Message has header, but do not have any data at all.
var message = createLVMessage(4'u32)
message.setLen(4)
discard await transp.write(message)
state = 3
await transp.closeWait()
elif state == 3:
# Message do not have enough data for specified size in header.
var message = createLVMessage(1024'u32)
message.setLen(1024)
discard await transp.write(message)
state = 4
await transp.closeWait()
elif state == 4:
# Good encoded message with oversize.
var message = createLVMessage(1024'u32)
discard await transp.write(message)
state = 5
await transp.closeWait()
elif state == 5:
# Good encoded message.
var message = createLVMessage(1024'u32)
discard await transp.write(message)
state = 6
await transp.closeWait()
elif state == 6:
# Good encoded message with additional data.
var message = createLVMessage(1024'u32)
discard await transp.write(message)
discard await transp.write("DONE")
state = 7
await transp.closeWait()
else:
doAssert(false)
except CatchableError as exc:
raiseAssert exc.msg
var server = createStreamServer(address, serveClient, {ReuseAddr})
server.start()
@ -1260,8 +1329,11 @@ suite "Stream Transport test suite":
proc testConnectBindLocalAddress() {.async.} =
proc client(server: StreamServer, transp: StreamTransport) {.async.} =
await transp.closeWait()
proc client(server: StreamServer, transp: StreamTransport) {.async: (raises: []).} =
try:
await transp.closeWait()
except CatchableError as exc:
raiseAssert exc.msg
let server1 = createStreamServer(initTAddress("127.0.0.1:0"), client)
let server2 = createStreamServer(initTAddress("127.0.0.1:0"), client)
@ -1302,8 +1374,11 @@ suite "Stream Transport test suite":
await server3.closeWait()
proc testConnectCancelLeaksTest() {.async.} =
proc client(server: StreamServer, transp: StreamTransport) {.async.} =
await transp.closeWait()
proc client(server: StreamServer, transp: StreamTransport) {.async: (raises: []).} =
try:
await transp.closeWait()
except CatchableError as exc:
raiseAssert exc.msg
let
server = createStreamServer(initTAddress("127.0.0.1:0"), client)