From 4abd7a56450db8eb6578a151af0a5b7ba61bd2bf Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Wed, 24 Mar 2021 10:08:33 +0100 Subject: [PATCH] exception tracking (#166) * exception tracking This PR adds minimal exception tracking to chronos, moving the goalpost one step further. In particular, it becomes invalid to raise exceptions from `callSoon` callbacks: this is critical for writing correct error handling because there's no reasonable way that a user of chronos can possibly _reason_ about exceptions coming out of there: the event loop will be in an indeterminite state when the loop is executing an _random_ callback. As expected, there are several issues in the error handling of chronos: in particular, it will end up in an inconsistent internal state whenever the selector loop operations fail, because the internal state update functions are not written in an exception-safe way. This PR turns this into a Defect, which probably is not the optimal way of handling things - expect more work to be done here. Some API have no way of reporting back errors to callers - for example, when something fails in the accept loop, there's not much it can do, and no way to report it back to the user of the API - this has been fixed with the new accept flow - the old one should be deprecated. Finally, there is information loss in the API: in composite operations like `poll` and `waitFor` there's no way to differentiate internal errors from user-level errors originating from callbacks. * store `CatchableError` in future * annotate proc's with correct raises information * `selectors2` to avoid non-CatchableError IOSelectorsException * `$` should never raise * remove unnecessary gcsafe annotations * fix exceptions leaking out of timer waits * fix some imports * functions must signal raising the union of all exceptions across all platforms to enable cross-platform code * switch to unittest2 * add `selectors2` which supercedes the std library version and fixes several exception handling issues in there * fixes * docs, platform-independent eh specifiers for some functions * add feature flag for strict exception mode also bump version to 3.0.0 - _most_ existing code should be compatible with this version of exception handling but some things might need fixing - callbacks, existing raises specifications etc. * fix AsyncCheck for non-void T --- README.md | 43 +- chronos.nimble | 7 +- chronos/apps.nim | 2 +- chronos/apps/http/httpserver.nim | 5 +- chronos/asyncfutures2.nim | 192 +++---- chronos/asyncloop.nim | 148 +++--- chronos/asyncmacro2.nim | 146 ++++-- chronos/asyncsync.nim | 23 +- chronos/debugutils.nim | 5 +- chronos/handles.nim | 17 +- chronos/ioselects/ioselectors_epoll.nim | 524 +++++++++++++++++++ chronos/ioselects/ioselectors_kqueue.nim | 625 +++++++++++++++++++++++ chronos/ioselects/ioselectors_poll.nim | 310 +++++++++++ chronos/ioselects/ioselectors_select.nim | 465 +++++++++++++++++ chronos/selectors2.nim | 360 +++++++++++++ chronos/sendfile.nim | 2 + chronos/srcloc.nim | 2 + chronos/streams/asyncstream.nim | 25 +- chronos/timer.nim | 2 + chronos/transport.nim | 4 +- chronos/transports/common.nim | 44 +- chronos/transports/datagram.nim | 71 ++- chronos/transports/ipnet.nim | 24 +- chronos/transports/osnet.nim | 20 +- chronos/transports/stream.nim | 318 +++++++++--- tests/testaddress.nim | 2 +- tests/testasyncstream.nim | 2 +- tests/testbugs.nim | 2 +- tests/testdatagram.nim | 3 +- tests/testfut.nim | 2 +- tests/testhttpserver.nim | 3 +- tests/testmacro.nim | 2 +- tests/testnet.nim | 2 +- tests/testserver.nim | 2 +- tests/testshttpserver.nim | 3 +- tests/testsignal.nim | 12 +- tests/testsoon.nim | 8 +- tests/teststream.nim | 5 +- tests/testsync.nim | 2 +- tests/testtime.nim | 3 +- tests/testutils.nim | 2 +- 41 files changed, 3053 insertions(+), 386 deletions(-) create mode 100644 chronos/ioselects/ioselectors_epoll.nim create mode 100644 chronos/ioselects/ioselectors_kqueue.nim create mode 100644 chronos/ioselects/ioselectors_poll.nim create mode 100644 chronos/ioselects/ioselectors_select.nim create mode 100644 chronos/selectors2.nim diff --git a/README.md b/README.md index 6c51a404..6f33d17b 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ Chronos is an efficient [async/await](https://en.wikipedia.org/wiki/Async/await) * Cancellation support * Synchronization primitivies like queues, events and locks * FIFO processing order of dispatch queue +* Minimal exception effect support (see [exception effects](#exception-effects)) ## Installation @@ -198,15 +199,49 @@ proc p3() {.async.} = fut2 = p2() try: await fut1 - except: + except CachableError: echo "p1() failed: ", fut1.error.name, ": ", fut1.error.msg echo "reachable code here" await fut2 ``` -Exceptions inheriting from `Defect` are treated differently, being raised -directly. Don't try to catch them coming out of `poll()`, because this would -leave behind some zombie futures. +Chronos does not allow that future continuations and other callbacks raise +`CatchableError` - as such, calls to `poll` will never raise exceptions caused +originating from tasks on the dispatcher queue. It is however possible that +`Defect` that happen in tasks bubble up through `poll` as these are not caught +by the transformation. + +### Platform independence + +Several functions in `chronos` are backed by the operating system, such as +waiting for network events, creating files and sockets etc. The specific +exceptions that are raised by the OS is platform-dependent, thus such functions +are declared as raising `CatchableError` but will in general raise something +more specific. In particular, it's possible that some functions that are +annotated as raising `CatchableError` only raise on _some_ platforms - in order +to work on all platforms, calling code must assume that they will raise even +when they don't seem to do so on one platform. + +### Exception effects + +`chronos` currently offers minimal support for exception effects and `raises` +annotations. In general, during the `async` transformation, a generic +`except CatchableError` handler is added around the entire function being +transformed, in order to catch any exceptions and transfer them to the `Future`. +Because of this, the effect system thinks no exceptions are "leaking" because in +fact, exception _handling_ is deferred to when the future is being read. + +Effectively, this means that while code can be compiled with +`{.push raises: [Defect]}`, the intended effect propagation and checking is +**disabled** for `async` functions. + +To enable checking exception effects in `async` code, enable strict mode with +`-d:chronosStrictException`. + +In the strict mode, `async` functions are checked such that they only raise +`CatchableError` and thus must make sure to explicitly specify exception +effects on forward declarations, callbacks and methods using +`{.raises: [CatchableError].}` (or more strict) annotations. ## TODO * Pipe/Subprocess Transports. diff --git a/chronos.nimble b/chronos.nimble index 89e93af1..8d376347 100644 --- a/chronos.nimble +++ b/chronos.nimble @@ -1,5 +1,5 @@ packageName = "chronos" -version = "2.6.1" +version = "3.0.0" author = "Status Research & Development GmbH" description = "Chronos" license = "Apache License 2.0 or MIT" @@ -10,12 +10,13 @@ skipDirs = @["tests"] requires "nim > 1.2.0", "stew", "bearssl", - "httputils" + "httputils", + "https://github.com/status-im/nim-unittest2.git#head" task test, "Run all tests": var commands = @[ "nim c -r -d:useSysAssert -d:useGcAssert tests/", - "nim c -r -d:chronosStackTrace tests/", + "nim c -r -d:chronosStackTrace -d:chronosStrictException tests/", "nim c -r -d:release tests/", "nim c -r -d:release -d:chronosFutureTracking tests/" ] diff --git a/chronos/apps.nim b/chronos/apps.nim index eef6160d..a044ca3c 100644 --- a/chronos/apps.nim +++ b/chronos/apps.nim @@ -6,5 +6,5 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import apps/http/httpserver, apps/http/shttpserver +import ./apps/http/[httpserver, shttpserver] export httpserver, shttpserver diff --git a/chronos/apps/http/httpserver.nim b/chronos/apps/http/httpserver.nim index 0ab1c11f..00f60319 100644 --- a/chronos/apps/http/httpserver.nim +++ b/chronos/apps/http/httpserver.nim @@ -44,11 +44,12 @@ type Empty, Prepared, Sending, Finished, Failed, Cancelled, Dumb HttpProcessCallback* = - proc(req: RequestFence): Future[HttpResponseRef] {.gcsafe.} + proc(req: RequestFence): Future[HttpResponseRef] {. + gcsafe, raises: [Defect, CatchableError].} HttpConnectionCallback* = proc(server: HttpServerRef, - transp: StreamTransport): Future[HttpConnectionRef] {.gcsafe.} + transp: StreamTransport): Future[HttpConnectionRef] {.gcsafe, raises: [Defect].} HttpServer* = object of RootObj instance*: StreamServer diff --git a/chronos/asyncfutures2.nim b/chronos/asyncfutures2.nim index 4d87bf28..15355b23 100644 --- a/chronos/asyncfutures2.nim +++ b/chronos/asyncfutures2.nim @@ -9,7 +9,7 @@ # MIT license (LICENSE-MIT) import std/[os, tables, strutils, heapqueue, options, deques, cstrutils, sequtils] -import srcloc +import ./srcloc export srcloc const @@ -29,7 +29,7 @@ type cancelcb*: CallbackFunc child*: FutureBase state*: FutureState - error*: ref Exception ## Stored exception + error*: ref CatchableError ## Stored exception mustCancel*: bool id*: int @@ -171,7 +171,7 @@ proc done*(future: FutureBase): bool {.inline.} = completed(future) when defined(chronosFutureTracking): - proc futureDestructor(udata: pointer) {.gcsafe.} = + proc futureDestructor(udata: pointer) = ## This procedure will be called when Future[T] got finished, cancelled or ## failed and all Future[T].callbacks are already scheduled and processed. let future = cast[FutureBase](udata) @@ -271,7 +271,7 @@ template complete*[T](futvar: FutureVar[T], val: T) = ## Any previously stored value will be overwritten. complete(futvar, val, getSrcLocation()) -proc fail[T](future: Future[T], error: ref Exception, loc: ptr SrcLoc) = +proc fail[T](future: Future[T], error: ref CatchableError, loc: ptr SrcLoc) = if not(future.cancelled()): checkFinished(FutureBase(future), loc) future.error = error @@ -282,7 +282,7 @@ proc fail[T](future: Future[T], error: ref Exception, loc: ptr SrcLoc) = getStackTrace(error) future.finish(FutureState.Failed) -template fail*[T](future: Future[T], error: ref Exception) = +template fail*[T](future: Future[T], error: ref CatchableError) = ## Completes ``future`` with ``error``. fail(future, error, getSrcLocation()) @@ -406,37 +406,39 @@ proc getHint(entry: StackTraceEntry): string = return "Resumes an async procedure" proc `$`*(entries: seq[StackTraceEntry]): string = - result = "" - # Find longest filename & line number combo for alignment purposes. - var longestLeft = 0 - for entry in entries: - if isNil(entry.procName): continue + try: + # Find longest filename & line number combo for alignment purposes. + var longestLeft = 0 + for entry in entries: + if isNil(entry.procName): continue - let left = $entry.filename & $entry.line - if left.len > longestLeft: - longestLeft = left.len + let left = $entry.filename & $entry.line + if left.len > longestLeft: + longestLeft = left.len - var indent = 2 - # Format the entries. - for entry in entries: - if isNil(entry.procName): - if entry.line == -10: - result.add(spaces(indent) & "#[\n") - indent.inc(2) - else: - indent.dec(2) - result.add(spaces(indent) & "]#\n") - continue + var indent = 2 + # Format the entries. + for entry in entries: + if isNil(entry.procName): + if entry.line == -10: + result.add(spaces(indent) & "#[\n") + indent.inc(2) + else: + indent.dec(2) + result.add(spaces(indent) & "]#\n") + continue - let left = "$#($#)" % [$entry.filename, $entry.line] - result.add((spaces(indent) & "$#$# $#\n") % [ - left, - spaces(longestLeft - left.len + 2), - $entry.procName - ]) - let hint = getHint(entry) - if hint.len > 0: - result.add(spaces(indent+2) & "## " & hint & "\n") + let left = "$#($#)" % [$entry.filename, $entry.line] + result.add((spaces(indent) & "$#$# $#\n") % [ + left, + spaces(longestLeft - left.len + 2), + $entry.procName + ]) + let hint = getHint(entry) + if hint.len > 0: + result.add(spaces(indent+2) & "## " & hint & "\n") + except ValueError as exc: + return exc.msg # Shouldn't actually happen since we set the formatting string when defined(chronosStackTrace): proc injectStacktrace(future: FutureBase) = @@ -462,7 +464,7 @@ when defined(chronosStackTrace): # newMsg.add "\n" & $entry future.error.msg = newMsg -proc internalCheckComplete*(fut: FutureBase) = +proc internalCheckComplete*(fut: FutureBase) {.raises: [Defect, CatchableError].} = # For internal use only. Used in asyncmacro if not(isNil(fut.error)): when defined(chronosStackTrace): @@ -474,22 +476,19 @@ proc internalRead*[T](fut: Future[T] | FutureVar[T]): T {.inline.} = when T isnot void: return fut.value -proc read*[T](future: Future[T] | FutureVar[T]): T = +proc read*[T](future: Future[T] | FutureVar[T]): T {.raises: [Defect, CatchableError].} = ## Retrieves the value of ``future``. Future must be finished otherwise ## this function will fail with a ``ValueError`` exception. ## ## If the result of the future is an error then that error will be raised. - {.push hint[ConvFromXtoItselfNotNeeded]: off.} - let fut = Future[T](future) - {.pop.} - if fut.finished(): + if future.finished(): internalCheckComplete(future) internalRead(future) else: # TODO: Make a custom exception type for this? raise newException(ValueError, "Future still in progress.") -proc readError*[T](future: Future[T]): ref Exception = +proc readError*[T](future: Future[T]): ref CatchableError {.raises: [Defect, ValueError].} = ## Retrieves the exception stored in ``future``. ## ## An ``ValueError`` exception will be thrown if no exception exists @@ -507,18 +506,18 @@ proc mget*[T](future: FutureVar[T]): var T = ## Future has not been finished. result = Future[T](future).value -proc asyncCheck*[T](future: Future[T]) = - ## Sets a callback on ``future`` which raises an exception if the future - ## finished with an error. - ## - ## This should be used instead of ``discard`` to discard void futures. - doAssert(not isNil(future), "Future is nil") - proc cb(data: pointer) = - if future.failed() or future.cancelled(): - when defined(chronosStackTrace): - injectStacktrace(future) - raise future.error - future.callback = cb +template taskFutureLocation(future: FutureBase): string = + let loc = future.location[0] + "[" & ( + if len(loc.procedure) == 0: "[unspecified]" else: $loc.procedure & "()" + ) & " at " & $loc.file & ":" & $(loc.line) & "]" + +template taskErrorMessage(future: FutureBase): string = + "Asynchronous task " & taskFutureLocation(future) & + " finished with an exception \"" & $future.error.name & "\"!\nStack trace: " & + future.error.getStackTrace() +template taskCancelMessage(future: FutureBase): string = + "Asynchronous task " & taskFutureLocation(future) & " was cancelled!" proc asyncSpawn*(future: Future[void]) = ## Spawns a new concurrent async task. @@ -534,35 +533,45 @@ proc asyncSpawn*(future: Future[void]) = ## and processed immediately. doAssert(not isNil(future), "Future is nil") - template getFutureLocation(): string = - let loc = future.location[0] - "[" & ( - if len(loc.procedure) == 0: "[unspecified]" else: $loc.procedure & "()" - ) & " at " & $loc.file & ":" & $(loc.line) & "]" - - template getErrorMessage(): string = - "Asynchronous task " & getFutureLocation() & - " finished with an exception \"" & $future.error.name & "\"!" - template getCancelMessage(): string = - "Asynchronous task " & getFutureLocation() & " was cancelled!" - proc cb(data: pointer) = if future.failed(): - raise newException(FutureDefect, getErrorMessage()) + raise newException(FutureDefect, taskErrorMessage(future)) elif future.cancelled(): - raise newException(FutureDefect, getCancelMessage()) + raise newException(FutureDefect, taskCancelMessage(future)) if not(future.finished()): # We adding completion callback only if ``future`` is not finished yet. future.addCallback(cb) else: - if future.failed(): - raise newException(FutureDefect, getErrorMessage()) - elif future.cancelled(): - raise newException(FutureDefect, getCancelMessage()) + cb(nil) -proc asyncDiscard*[T](future: Future[T]) {.deprecated.} = discard - ## This is async workaround for discard ``Future[T]``. +proc asyncCheck*[T](future: Future[T]) {. + deprecated: "Raises Defect on future failure, fix your code and use asyncSpawn!".} = + ## This function used to raise an exception through the `poll` call if + ## the given future failed - there's no way to handle such exceptions so this + ## function is now an alias for `asyncSpawn` + ## + when T is void: + asyncSpawn(future) + else: + proc cb(data: pointer) = + if future.failed(): + raise newException(FutureDefect, taskErrorMessage(future)) + elif future.cancelled(): + raise newException(FutureDefect, taskCancelMessage(future)) + + if not(future.finished()): + # We adding completion callback only if ``future`` is not finished yet. + future.addCallback(cb) + else: + cb(nil) + +proc asyncDiscard*[T](future: Future[T]) {. + deprecated: "Use asyncSpawn or `discard await`".} = discard + ## `asyncDiscard` will discard the outcome of the operation - unlike `discard` + ## it also throws away exceptions! Use `asyncSpawn` if you're sure your + ## code doesn't raise exceptions, or `discard await` to ignore successful + ## outcomes proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {. deprecated: "Use allFutures[T](varargs[Future[T]])".} = @@ -587,7 +596,7 @@ proc `and`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] {. fut1.callback = cb fut2.callback = cb - proc cancellation(udata: pointer) {.gcsafe.} = + proc cancellation(udata: pointer) = # On cancel we remove all our callbacks only. if not(fut1.finished()): fut1.removeCallback(cb) @@ -611,7 +620,8 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = ## ## If cancelled, ``fut1`` and ``fut2`` futures WILL NOT BE cancelled. var retFuture = newFuture[void]("chronos.or") - proc cb(udata: pointer) {.gcsafe.} = + var cb: proc(udata: pointer) {.gcsafe, raises: [Defect].} + cb = proc(udata: pointer) = if not(retFuture.finished()): var fut = cast[FutureBase](udata) if cast[pointer](fut1) == udata: @@ -623,7 +633,7 @@ proc `or`*[T, Y](fut1: Future[T], fut2: Future[Y]): Future[void] = else: retFuture.complete() - proc cancellation(udata: pointer) {.gcsafe.} = + proc cancellation(udata: pointer) = # On cancel we remove all our callbacks only. if not(fut1.finished()): fut1.removeCallback(cb) @@ -676,7 +686,7 @@ proc all*[T](futs: varargs[Future[T]]): auto {. when T is void: var retFuture = newFuture[void]("chronos.all(void)") - proc cb(udata: pointer) {.gcsafe.} = + proc cb(udata: pointer) = if not(retFuture.finished()): inc(completedFutures) if completedFutures == totalFutures: @@ -698,7 +708,7 @@ proc all*[T](futs: varargs[Future[T]]): auto {. var retFuture = newFuture[seq[T]]("chronos.all(T)") var retValues = newSeq[T](totalFutures) - proc cb(udata: pointer) {.gcsafe.} = + proc cb(udata: pointer) = if not(retFuture.finished()): inc(completedFutures) if completedFutures == totalFutures: @@ -707,7 +717,7 @@ proc all*[T](futs: varargs[Future[T]]): auto {. retFuture.fail(nfut.error) break else: - retValues[k] = nfut.read() + retValues[k] = nfut.value if not(retFuture.failed()): retFuture.complete(retValues) @@ -731,7 +741,7 @@ proc oneIndex*[T](futs: varargs[Future[T]]): Future[int] {. var nfuts = @futs var retFuture = newFuture[int]("chronos.oneIndex(T)") - proc cb(udata: pointer) {.gcsafe.} = + proc cb(udata: pointer) = var res = -1 if not(retFuture.finished()): var rfut = cast[FutureBase](udata) @@ -762,7 +772,7 @@ proc oneValue*[T](futs: varargs[Future[T]]): Future[T] {. var nfuts = @futs var retFuture = newFuture[T]("chronos.oneValue(T)") - proc cb(udata: pointer) {.gcsafe.} = + proc cb(udata: pointer) = var resFut: Future[T] if not(retFuture.finished()): var rfut = cast[FutureBase](udata) @@ -794,10 +804,10 @@ proc cancelAndWait*[T](fut: Future[T]): Future[void] = ## If ``fut`` is already finished (completed, failed or cancelled) result ## Future[void] object will be returned complete. var retFuture = newFuture[void]("chronos.cancelAndWait(T)") - proc continuation(udata: pointer) {.gcsafe.} = + proc continuation(udata: pointer) = if not(retFuture.finished()): retFuture.complete() - proc cancellation(udata: pointer) {.gcsafe.} = + proc cancellation(udata: pointer) = if not(fut.finished()): fut.removeCallback(continuation) if fut.finished(): @@ -823,13 +833,13 @@ proc allFutures*[T](futs: varargs[Future[T]]): Future[void] = # Because we can't capture varargs[T] in closures we need to create copy. var nfuts = @futs - proc cb(udata: pointer) {.gcsafe.} = + proc cb(udata: pointer) = if not(retFuture.finished()): inc(completedFutures) if completedFutures == totalFutures: retFuture.complete() - proc cancellation(udata: pointer) {.gcsafe.} = + proc cancellation(udata: pointer) = # On cancel we remove all our callbacks only. for i in 0.. createCb(retFuture) # NOTE: The "_continue" suffix is checked for in asyncfutures.nim to produce # friendlier stack traces: - var cbName = genSym(nskProc, prcName & "_continue") + var cbName = genSym(nskVar, prcName & "_continue") var procCb = getAst createCb(retFutureSym, iteratorNameSym, newStrLitNode(prcName), cbName, @@ -281,7 +342,7 @@ proc asyncSingleProc(prc: NimNode): NimNode {.compileTime.} = #if prcName == "recvLineInto": # echo(toStrLit(result)) -template await*[T](f: Future[T]): auto = +template await*[T](f: Future[T]): untyped = when declared(chronosInternalRetFuture): when not declaredInScope(chronosInternalTmpFuture): var chronosInternalTmpFuture {.inject.}: FutureBase @@ -304,7 +365,8 @@ template await*[T](f: Future[T]): auto = if chronosInternalRetFuture.mustCancel: raise newCancelledError() chronosInternalTmpFuture.internalCheckComplete() - cast[type(f)](chronosInternalTmpFuture).internalRead() + when T isnot void: + cast[type(f)](chronosInternalTmpFuture).internalRead() else: unsupported "await is only available within {.async.}" diff --git a/chronos/asyncsync.nim b/chronos/asyncsync.nim index b352a895..44f09ae9 100644 --- a/chronos/asyncsync.nim +++ b/chronos/asyncsync.nim @@ -9,6 +9,9 @@ # MIT license (LICENSE-MIT) ## This module implements some core synchronization primitives + +{.push raises: [Defect].} + import std/[sequtils, deques] import ./asyncloop @@ -115,7 +118,7 @@ proc locked*(lock: AsyncLock): bool = ## Return `true` if the lock ``lock`` is acquired, `false` otherwise. lock.locked -proc release*(lock: AsyncLock) = +proc release*(lock: AsyncLock) {.raises: [Defect, AsyncLockError].} = ## Release a lock ``lock``. ## ## When the ``lock`` is locked, reset it to unlocked, and return. If any @@ -220,7 +223,8 @@ proc empty*[T](aq: AsyncQueue[T]): bool {.inline.} = ## Return ``true`` if the queue is empty, ``false`` otherwise. (len(aq.queue) == 0) -proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) = +proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) {. + raises: [Defect, AsyncQueueFullError].}= ## Put an item ``item`` to the beginning of the queue ``aq`` immediately. ## ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. @@ -229,7 +233,8 @@ proc addFirstNoWait*[T](aq: AsyncQueue[T], item: T) = aq.queue.addFirst(item) aq.getters.wakeupNext() -proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) = +proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) {. + raises: [Defect, AsyncQueueFullError].}= ## Put an item ``item`` at the end of the queue ``aq`` immediately. ## ## If queue ``aq`` is full, then ``AsyncQueueFullError`` exception raised. @@ -238,7 +243,8 @@ proc addLastNoWait*[T](aq: AsyncQueue[T], item: T) = aq.queue.addLast(item) aq.getters.wakeupNext() -proc popFirstNoWait*[T](aq: AsyncQueue[T]): T = +proc popFirstNoWait*[T](aq: AsyncQueue[T]): T {. + raises: [Defect, AsyncQueueEmptyError].} = ## Get an item from the beginning of the queue ``aq`` immediately. ## ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. @@ -248,7 +254,8 @@ proc popFirstNoWait*[T](aq: AsyncQueue[T]): T = aq.putters.wakeupNext() res -proc popLastNoWait*[T](aq: AsyncQueue[T]): T = +proc popLastNoWait*[T](aq: AsyncQueue[T]): T {. + raises: [Defect, AsyncQueueEmptyError].} = ## Get an item from the end of the queue ``aq`` immediately. ## ## If queue ``aq`` is empty, then ``AsyncQueueEmptyError`` exception raised. @@ -314,11 +321,13 @@ proc popLast*[T](aq: AsyncQueue[T]): Future[T] {.async.} = raise exc return aq.popLastNoWait() -proc putNoWait*[T](aq: AsyncQueue[T], item: T) {.inline.} = +proc putNoWait*[T](aq: AsyncQueue[T], item: T) {. + raises: [Defect, AsyncQueueFullError].} = ## Alias of ``addLastNoWait()``. aq.addLastNoWait(item) -proc getNoWait*[T](aq: AsyncQueue[T]): T {.inline.} = +proc getNoWait*[T](aq: AsyncQueue[T]): T {. + raises: [Defect, AsyncQueueEmptyError].} = ## Alias of ``popFirstNoWait()``. aq.popFirstNoWait() diff --git a/chronos/debugutils.nim b/chronos/debugutils.nim index f00d8053..87a7225d 100644 --- a/chronos/debugutils.nim +++ b/chronos/debugutils.nim @@ -6,7 +6,10 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import asyncloop + +{.push raises: [Defect].} + +import ./asyncloop const AllFutureStates* = {FutureState.Pending, FutureState.Cancelled, diff --git a/chronos/handles.nim b/chronos/handles.nim index eef66cb3..03b7f00f 100644 --- a/chronos/handles.nim +++ b/chronos/handles.nim @@ -7,7 +7,12 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import net, nativesockets, asyncloop +{.push raises: [Defect].} + +import + std/[net, nativesockets], + ./selectors2, + ./asyncloop when defined(windows): import os, winlean @@ -88,7 +93,8 @@ proc getSocketError*(socket: AsyncFD, err: var int): bool = result = getSockOpt(socket, cint(SOL_SOCKET), cint(SO_ERROR), err) proc createAsyncSocket*(domain: Domain, sockType: SockType, - protocol: Protocol): AsyncFD = + protocol: Protocol): AsyncFD {. + raises: [Defect, CatchableError].} = ## Creates new asynchronous socket. ## Returns ``asyncInvalidSocket`` on error. let handle = createNativeSocket(domain, sockType, protocol) @@ -104,7 +110,8 @@ proc createAsyncSocket*(domain: Domain, sockType: SockType, result = AsyncFD(handle) register(result) -proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD = +proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD {. + raises: [Defect, CatchableError].} = ## Wraps socket to asynchronous socket handle. ## Return ``asyncInvalidSocket`` on error. if not setSocketBlocking(sock, false): @@ -117,7 +124,7 @@ proc wrapAsyncSocket*(sock: SocketHandle): AsyncFD = result = AsyncFD(sock) register(result) -proc getMaxOpenFiles*(): int = +proc getMaxOpenFiles*(): int {.raises: [Defect, OSError].} = ## Returns maximum file descriptor number that can be opened by this process. ## ## Note: On Windows its impossible to obtain such number, so getMaxOpenFiles() @@ -131,7 +138,7 @@ proc getMaxOpenFiles*(): int = raiseOSError(osLastError()) result = int(limits.rlim_cur) -proc setMaxOpenFiles*(count: int) = +proc setMaxOpenFiles*(count: int) {.raises: [Defect, OSError].} = ## Set maximum file descriptor number that can be opened by this process. ## ## Note: On Windows its impossible to set this value, so it just a nop call. diff --git a/chronos/ioselects/ioselectors_epoll.nim b/chronos/ioselects/ioselectors_epoll.nim new file mode 100644 index 00000000..16daa2dc --- /dev/null +++ b/chronos/ioselects/ioselectors_epoll.nim @@ -0,0 +1,524 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2016 Eugene Kabanov +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +# This module implements Linux epoll(). + +{.push raises: [Defect].} + +import posix, times, epoll + +# Maximum number of events that can be returned +const MAX_EPOLL_EVENTS = 64 + +when not defined(android): + type + SignalFdInfo* {.importc: "struct signalfd_siginfo", + header: "", pure, final.} = object + ssi_signo*: uint32 + ssi_errno*: int32 + ssi_code*: int32 + ssi_pid*: uint32 + ssi_uid*: uint32 + ssi_fd*: int32 + ssi_tid*: uint32 + ssi_band*: uint32 + ssi_overrun*: uint32 + ssi_trapno*: uint32 + ssi_status*: int32 + ssi_int*: int32 + ssi_ptr*: uint64 + ssi_utime*: uint64 + ssi_stime*: uint64 + ssi_addr*: uint64 + pad* {.importc: "__pad".}: array[0..47, uint8] + +proc timerfd_create(clock_id: ClockId, flags: cint): cint + {.cdecl, importc: "timerfd_create", header: "".} +proc timerfd_settime(ufd: cint, flags: cint, + utmr: var Itimerspec, otmr: var Itimerspec): cint + {.cdecl, importc: "timerfd_settime", header: "".} +proc eventfd(count: cuint, flags: cint): cint + {.cdecl, importc: "eventfd", header: "".} + +when not defined(android): + proc signalfd(fd: cint, mask: var Sigset, flags: cint): cint + {.cdecl, importc: "signalfd", header: "".} + +when hasThreadSupport: + type + SelectorImpl[T] = object + epollFD: cint + numFD: int + fds: ptr SharedArray[SelectorKey[T]] + count: int + Selector*[T] = ptr SelectorImpl[T] +else: + type + SelectorImpl[T] = object + epollFD: cint + numFD: int + fds: seq[SelectorKey[T]] + count: int + Selector*[T] = ref SelectorImpl[T] +type + SelectEventImpl = object + efd: cint + SelectEvent* = ptr SelectEventImpl + +proc newSelector*[T](): Selector[T] {.raises: [Defect, OSError].} = + # Retrieve the maximum fd count (for current OS) via getrlimit() + var a = RLimit() + # Start with a reasonable size, checkFd() will grow this on demand + const numFD = 1024 + + var epollFD = epoll_create(MAX_EPOLL_EVENTS) + if epollFD < 0: + raiseOSError(osLastError()) + + when hasThreadSupport: + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.epollFD = epollFD + result.numFD = numFD + result.fds = allocSharedArray[SelectorKey[T]](numFD) + else: + result = Selector[T]() + result.epollFD = epollFD + result.numFD = numFD + result.fds = newSeq[SelectorKey[T]](numFD) + + for i in 0 ..< numFD: + result.fds[i].ident = InvalidIdent + +proc close*[T](s: Selector[T]) = + let res = posix.close(s.epollFD) + when hasThreadSupport: + deallocSharedArray(s.fds) + deallocShared(cast[pointer](s)) + if res != 0: + raiseIOSelectorsError(osLastError()) + +proc newSelectEvent*(): SelectEvent {.raises: [Defect, OSError, IOSelectorsException].} = + let fdci = eventfd(0, 0) + if fdci == -1: + raiseIOSelectorsError(osLastError()) + setNonBlocking(fdci) + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.efd = fdci + +proc trigger*(ev: SelectEvent) {.raises: [Defect, IOSelectorsException].} = + var data: uint64 = 1 + if posix.write(ev.efd, addr data, sizeof(uint64)) == -1: + raiseIOSelectorsError(osLastError()) + +proc close*(ev: SelectEvent) {.raises: [Defect, IOSelectorsException].} = + let res = posix.close(ev.efd) + deallocShared(cast[pointer](ev)) + if res != 0: + raiseIOSelectorsError(osLastError()) + +template checkFd(s, f) = + if f >= s.numFD: + var numFD = s.numFD + while numFD <= f: numFD *= 2 + when hasThreadSupport: + s.fds = reallocSharedArray(s.fds, numFD) + else: + s.fds.setLen(numFD) + for i in s.numFD ..< numFD: + s.fds[i].ident = InvalidIdent + s.numFD = numFD + +proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle, + events: set[Event], data: T) {. + raises: [Defect, IOSelectorsException].} = + let fdi = int(fd) + s.checkFd(fdi) + doAssert(s.fds[fdi].ident == InvalidIdent, "Descriptor " & $fdi & " already registered") + s.setKey(fdi, events, 0, data) + if events != {}: + var epv = EpollEvent(events: EPOLLRDHUP) + epv.data.u64 = fdi.uint + if Event.Read in events: epv.events = epv.events or EPOLLIN + if Event.Write in events: epv.events = epv.events or EPOLLOUT + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + inc(s.count) + +proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, events: set[Event]) {. + raises: [Defect, IOSelectorsException].} = + let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, + Event.User, Event.Oneshot, Event.Error} + let fdi = int(fd) + s.checkFd(fdi) + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != InvalidIdent, + "Descriptor " & $fdi & " is not registered in the selector!") + doAssert(pkey.events * maskEvents == {}) + if pkey.events != events: + var epv = EpollEvent(events: EPOLLRDHUP) + epv.data.u64 = fdi.uint + + if Event.Read in events: epv.events = epv.events or EPOLLIN + if Event.Write in events: epv.events = epv.events or EPOLLOUT + + if pkey.events == {}: + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + inc(s.count) + else: + if events != {}: + if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + else: + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + dec(s.count) + pkey.events = events + +proc unregister*[T](s: Selector[T], fd: int|SocketHandle) {.raises: [Defect, IOSelectorsException].} = + let fdi = int(fd) + s.checkFd(fdi) + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != InvalidIdent, + "Descriptor " & $fdi & " is not registered in the selector!") + if pkey.events != {}: + when not defined(android): + if Event.Read in pkey.events or Event.Write in pkey.events or Event.User in pkey.events: + var epv = EpollEvent() + # TODO: Refactor all these EPOLL_CTL_DEL + dec(s.count) into a proc. + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + dec(s.count) + elif Event.Timer in pkey.events: + if Event.Finished notin pkey.events: + var epv = EpollEvent() + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + dec(s.count) + if posix.close(cint(fdi)) != 0: + raiseIOSelectorsError(osLastError()) + elif Event.Signal in pkey.events: + var epv = EpollEvent() + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + var nmask, omask: Sigset + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, cint(s.fds[fdi].param)) + unblockSignals(nmask, omask) + dec(s.count) + if posix.close(cint(fdi)) != 0: + raiseIOSelectorsError(osLastError()) + elif Event.Process in pkey.events: + if Event.Finished notin pkey.events: + var epv = EpollEvent() + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + var nmask, omask: Sigset + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, SIGCHLD) + unblockSignals(nmask, omask) + dec(s.count) + if posix.close(cint(fdi)) != 0: + raiseIOSelectorsError(osLastError()) + else: + if Event.Read in pkey.events or Event.Write in pkey.events or Event.User in pkey.events: + var epv = EpollEvent() + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + dec(s.count) + elif Event.Timer in pkey.events: + if Event.Finished notin pkey.events: + var epv = EpollEvent() + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + dec(s.count) + if posix.close(cint(fdi)) != 0: + raiseIOSelectorsError(osLastError()) + clearKey(pkey) + +proc unregister*[T](s: Selector[T], ev: SelectEvent) {. + raises: [Defect, IOSelectorsException].} = + let fdi = int(ev.efd) + s.checkFd(fdi) + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!") + doAssert(Event.User in pkey.events) + var epv = EpollEvent() + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + dec(s.count) + clearKey(pkey) + +proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, + data: T): int {. + discardable, raises: [Defect, IOSelectorsException].} = + var + newTs: Itimerspec + oldTs: Itimerspec + let fdi = timerfd_create(CLOCK_MONOTONIC, 0).int + if fdi == -1: + raiseIOSelectorsError(osLastError()) + setNonBlocking(fdi.cint) + + s.checkFd(fdi) + doAssert(s.fds[fdi].ident == InvalidIdent) + + var events = {Event.Timer} + var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) + epv.data.u64 = fdi.uint + + if oneshot: + newTs.it_interval.tv_sec = posix.Time(0) + newTs.it_interval.tv_nsec = 0 + newTs.it_value.tv_sec = posix.Time(timeout div 1_000) + newTs.it_value.tv_nsec = (timeout %% 1_000) * 1_000_000 + incl(events, Event.Oneshot) + epv.events = epv.events or EPOLLONESHOT + else: + newTs.it_interval.tv_sec = posix.Time(timeout div 1000) + newTs.it_interval.tv_nsec = (timeout %% 1_000) * 1_000_000 + newTs.it_value.tv_sec = newTs.it_interval.tv_sec + newTs.it_value.tv_nsec = newTs.it_interval.tv_nsec + + if timerfd_settime(fdi.cint, cint(0), newTs, oldTs) != 0: + raiseIOSelectorsError(osLastError()) + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + s.setKey(fdi, events, 0, data) + inc(s.count) + result = fdi + +when not defined(android): + proc registerSignal*[T](s: Selector[T], signal: int, + data: T): int {. + discardable, raises: [Defect, OSError, IOSelectorsException].} = + var + nmask: Sigset + omask: Sigset + + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, cint(signal)) + blockSignals(nmask, omask) + + let fdi = signalfd(-1, nmask, 0).int + if fdi == -1: + raiseIOSelectorsError(osLastError()) + setNonBlocking(fdi.cint) + + s.checkFd(fdi) + doAssert(s.fds[fdi].ident == InvalidIdent) + + var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) + epv.data.u64 = fdi.uint + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + s.setKey(fdi, {Event.Signal}, signal, data) + inc(s.count) + result = fdi + + proc registerProcess*[T](s: Selector, pid: int, + data: T): int {. + discardable, raises: [Defect, IOSelectorsException].} = + var + nmask: Sigset + omask: Sigset + + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, posix.SIGCHLD) + blockSignals(nmask, omask) + + let fdi = signalfd(-1, nmask, 0).int + if fdi == -1: + raiseIOSelectorsError(osLastError()) + setNonBlocking(fdi.cint) + + s.checkFd(fdi) + doAssert(s.fds[fdi].ident == InvalidIdent) + + var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) + epv.data.u64 = fdi.uint + epv.events = EPOLLIN or EPOLLRDHUP + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fdi.cint, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + s.setKey(fdi, {Event.Process, Event.Oneshot}, pid, data) + inc(s.count) + result = fdi + +proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = + let fdi = int(ev.efd) + doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!") + s.setKey(fdi, {Event.User}, 0, data) + var epv = EpollEvent(events: EPOLLIN or EPOLLRDHUP) + epv.data.u64 = ev.efd.uint + if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, ev.efd, addr epv) != 0: + raiseIOSelectorsError(osLastError()) + inc(s.count) + +proc selectInto*[T](s: Selector[T], timeout: int, + results: var openArray[ReadyKey]): int {.raises: [Defect, IOSelectorsException].} = + var + resTable: array[MAX_EPOLL_EVENTS, EpollEvent] + maxres = MAX_EPOLL_EVENTS + i, k: int + + if maxres > len(results): + maxres = len(results) + + verifySelectParams(timeout) + + let count = epoll_wait(s.epollFD, addr(resTable[0]), maxres.cint, + timeout.cint) + if count < 0: + result = 0 + let err = osLastError() + if cint(err) != EINTR: + raiseIOSelectorsError(err) + elif count == 0: + result = 0 + else: + i = 0 + k = 0 + while i < count: + let fdi = int(resTable[i].data.u64) + let pevents = resTable[i].events + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != InvalidIdent) + var rkey = ReadyKey(fd: fdi, events: {}) + + if (pevents and EPOLLERR) != 0 or (pevents and EPOLLHUP) != 0: + if (pevents and EPOLLHUP) != 0: + rkey.errorCode = OSErrorCode ECONNRESET + else: + # Try reading SO_ERROR from fd. + var error: cint + var size = SockLen sizeof(error) + if getsockopt(SocketHandle fdi, SOL_SOCKET, SO_ERROR, addr(error), + addr(size)) == 0'i32: + rkey.errorCode = OSErrorCode error + + rkey.events.incl(Event.Error) + if (pevents and EPOLLOUT) != 0: + rkey.events.incl(Event.Write) + when not defined(android): + if (pevents and EPOLLIN) != 0: + if Event.Read in pkey.events: + rkey.events.incl(Event.Read) + elif Event.Timer in pkey.events: + var data: uint64 = 0 + if posix.read(cint(fdi), addr data, + sizeof(uint64)) != sizeof(uint64): + raiseIOSelectorsError(osLastError()) + rkey.events.incl(Event.Timer) + elif Event.Signal in pkey.events: + var data = SignalFdInfo() + if posix.read(cint(fdi), addr data, + sizeof(SignalFdInfo)) != sizeof(SignalFdInfo): + raiseIOSelectorsError(osLastError()) + rkey.events.incl(Event.Signal) + elif Event.Process in pkey.events: + var data = SignalFdInfo() + if posix.read(cint(fdi), addr data, + sizeof(SignalFdInfo)) != sizeof(SignalFdInfo): + raiseIOSelectorsError(osLastError()) + if cast[int](data.ssi_pid) == pkey.param: + rkey.events.incl(Event.Process) + else: + inc(i) + continue + elif Event.User in pkey.events: + var data: uint64 = 0 + if posix.read(cint(fdi), addr data, + sizeof(uint64)) != sizeof(uint64): + let err = osLastError() + if err == OSErrorCode(EAGAIN): + inc(i) + continue + else: + raiseIOSelectorsError(err) + rkey.events.incl(Event.User) + else: + if (pevents and EPOLLIN) != 0: + if Event.Read in pkey.events: + rkey.events.incl(Event.Read) + elif Event.Timer in pkey.events: + var data: uint64 = 0 + if posix.read(cint(fdi), addr data, + sizeof(uint64)) != sizeof(uint64): + raiseIOSelectorsError(osLastError()) + rkey.events.incl(Event.Timer) + elif Event.User in pkey.events: + var data: uint64 = 0 + if posix.read(cint(fdi), addr data, + sizeof(uint64)) != sizeof(uint64): + let err = osLastError() + if err == OSErrorCode(EAGAIN): + inc(i) + continue + else: + raiseIOSelectorsError(err) + rkey.events.incl(Event.User) + + if Event.Oneshot in pkey.events: + var epv = EpollEvent() + if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, cint(fdi), addr epv) != 0: + raiseIOSelectorsError(osLastError()) + # we will not clear key until it will be unregistered, so + # application can obtain data, but we will decrease counter, + # because epoll is empty. + dec(s.count) + # we are marking key with `Finished` event, to avoid double decrease. + pkey.events.incl(Event.Finished) + + results[k] = rkey + inc(k) + inc(i) + result = k + +proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = + result = newSeq[ReadyKey](MAX_EPOLL_EVENTS) + let count = selectInto(s, timeout, result) + result.setLen(count) + +template isEmpty*[T](s: Selector[T]): bool = + (s.count == 0) + +proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = + let fdi = int(fd) + fdi < s.numFD and s.fds[fdi].ident != InvalidIdent + +proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = + let fdi = int(fd) + s.checkFd(fdi) + if fdi in s: + s.fds[fdi].data = data + result = true + +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, + body: untyped) = + mixin checkFd + let fdi = int(fd) + if fdi in s: + var value = addr(s.fds[fdi].data) + body + +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, + body2: untyped) = + let fdi = int(fd) + if fdi in s: + var value = addr(s.fds[fdi].data) + body1 + else: + body2 + +proc getFd*[T](s: Selector[T]): int = + return s.epollFd.int diff --git a/chronos/ioselects/ioselectors_kqueue.nim b/chronos/ioselects/ioselectors_kqueue.nim new file mode 100644 index 00000000..e346f822 --- /dev/null +++ b/chronos/ioselects/ioselectors_kqueue.nim @@ -0,0 +1,625 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2016 Eugene Kabanov +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +# This module implements BSD kqueue(). + +import posix, times, kqueue + +const + # Maximum number of events that can be returned. + MAX_KQUEUE_EVENTS = 64 + # SIG_IGN and SIG_DFL declared in posix.nim as variables, but we need them + # to be constants and GC-safe. + SIG_DFL = cast[proc(x: cint) {.raises: [],noconv,gcsafe.}](0) + SIG_IGN = cast[proc(x: cint) {.raises: [],noconv,gcsafe.}](1) + +when defined(kqcache): + const CACHE_EVENTS = true + +when defined(macosx) or defined(freebsd) or defined(dragonfly): + when defined(macosx): + const MAX_DESCRIPTORS_ID = 29 # KERN_MAXFILESPERPROC (MacOS) + else: + const MAX_DESCRIPTORS_ID = 27 # KERN_MAXFILESPERPROC (FreeBSD) + proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t, + newp: pointer, newplen: csize_t): cint + {.importc: "sysctl",header: """#include + #include """} +elif defined(netbsd) or defined(openbsd): + # OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using + # KERN_MAXFILES, because KERN_MAXFILES is always bigger, + # than KERN_MAXFILESPERPROC. + const MAX_DESCRIPTORS_ID = 7 # KERN_MAXFILES + proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t, + newp: pointer, newplen: csize_t): cint + {.importc: "sysctl",header: """#include + #include """} + +when hasThreadSupport: + type + SelectorImpl[T] = object + kqFD: cint + maxFD: int + changes: ptr SharedArray[KEvent] + fds: ptr SharedArray[SelectorKey[T]] + count: int + changesLock: Lock + changesSize: int + changesLength: int + sock: cint + Selector*[T] = ptr SelectorImpl[T] +else: + type + SelectorImpl[T] = object + kqFD: cint + maxFD: int + changes: seq[KEvent] + fds: seq[SelectorKey[T]] + count: int + sock: cint + Selector*[T] = ref SelectorImpl[T] + +type + SelectEventImpl = object + rfd: cint + wfd: cint + + SelectEvent* = ptr SelectEventImpl + # SelectEvent is declared as `ptr` to be placed in `shared memory`, + # so you can share one SelectEvent handle between threads. + +proc getUnique[T](s: Selector[T]): int {.inline.} = + # we create duplicated handles to get unique indexes for our `fds` array. + result = posix.fcntl(s.sock, F_DUPFD, s.sock) + if result == -1: + raiseIOSelectorsError(osLastError()) + +proc newSelector*[T](): owned(Selector[T]) = + var maxFD = 0.cint + var size = csize_t(sizeof(cint)) + var namearr = [1.cint, MAX_DESCRIPTORS_ID.cint] + # Obtain maximum number of opened file descriptors for process + if sysctl(addr(namearr[0]), 2, cast[pointer](addr maxFD), addr size, + nil, 0) != 0: + raiseIOSelectorsError(osLastError()) + + var kqFD = kqueue() + if kqFD < 0: + raiseIOSelectorsError(osLastError()) + + # we allocating empty socket to duplicate it handle in future, to get unique + # indexes for `fds` array. This is needed to properly identify + # {Event.Timer, Event.Signal, Event.Process} events. + let usock = posix.socket(posix.AF_INET, posix.SOCK_STREAM, + posix.IPPROTO_TCP).cint + if usock == -1: + let err = osLastError() + discard posix.close(kqFD) + raiseIOSelectorsError(err) + + when hasThreadSupport: + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.fds = allocSharedArray[SelectorKey[T]](maxFD) + result.changes = allocSharedArray[KEvent](MAX_KQUEUE_EVENTS) + result.changesSize = MAX_KQUEUE_EVENTS + initLock(result.changesLock) + else: + result = Selector[T]() + result.fds = newSeq[SelectorKey[T]](maxFD) + result.changes = newSeqOfCap[KEvent](MAX_KQUEUE_EVENTS) + + for i in 0 ..< maxFD: + result.fds[i].ident = InvalidIdent + + result.sock = usock + result.kqFD = kqFD + result.maxFD = maxFD.int + +proc close*[T](s: Selector[T]) = + let res1 = posix.close(s.kqFD) + let res2 = posix.close(s.sock) + when hasThreadSupport: + deinitLock(s.changesLock) + deallocSharedArray(s.fds) + deallocShared(cast[pointer](s)) + if res1 != 0 or res2 != 0: + raiseIOSelectorsError(osLastError()) + +proc newSelectEvent*(): SelectEvent = + var fds: array[2, cint] + if posix.pipe(fds) != 0: + raiseIOSelectorsError(osLastError()) + setNonBlocking(fds[0]) + setNonBlocking(fds[1]) + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.rfd = fds[0] + result.wfd = fds[1] + +proc trigger*(ev: SelectEvent) = + var data: uint64 = 1 + if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64): + raiseIOSelectorsError(osLastError()) + +proc close*(ev: SelectEvent) = + let res1 = posix.close(ev.rfd) + let res2 = posix.close(ev.wfd) + deallocShared(cast[pointer](ev)) + if res1 != 0 or res2 != 0: + raiseIOSelectorsError(osLastError()) + +template checkFd(s, f) = + if f >= s.maxFD: + raiseIOSelectorsError("Maximum number of descriptors is exhausted!") + +when hasThreadSupport: + template withChangeLock[T](s: Selector[T], body: untyped) = + acquire(s.changesLock) + {.locks: [s.changesLock].}: + try: + body + finally: + release(s.changesLock) +else: + template withChangeLock(s, body: untyped) = + body + +when hasThreadSupport: + template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, + nflags: cushort, nfflags: cuint, ndata: int, + nudata: pointer) = + mixin withChangeLock + s.withChangeLock(): + if s.changesLength == s.changesSize: + # if cache array is full, we allocating new with size * 2 + let newSize = s.changesSize shl 1 + let rdata = allocSharedArray[KEvent](newSize) + copyMem(rdata, s.changes, s.changesSize * sizeof(KEvent)) + s.changesSize = newSize + s.changes[s.changesLength] = KEvent(ident: nident, + filter: nfilter, flags: nflags, + fflags: nfflags, data: ndata, + udata: nudata) + inc(s.changesLength) + + when not declared(CACHE_EVENTS): + template flushKQueue[T](s: Selector[T]) = + mixin withChangeLock + s.withChangeLock(): + if s.changesLength > 0: + if kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength), + nil, 0, nil) == -1: + raiseIOSelectorsError(osLastError()) + s.changesLength = 0 +else: + template modifyKQueue[T](s: Selector[T], nident: uint, nfilter: cshort, + nflags: cushort, nfflags: cuint, ndata: int, + nudata: pointer) = + s.changes.add(KEvent(ident: nident, + filter: nfilter, flags: nflags, + fflags: nfflags, data: ndata, + udata: nudata)) + + when not declared(CACHE_EVENTS): + template flushKQueue[T](s: Selector[T]) = + let length = cint(len(s.changes)) + if length > 0: + if kevent(s.kqFD, addr(s.changes[0]), length, + nil, 0, nil) == -1: + raiseIOSelectorsError(osLastError()) + s.changes.setLen(0) + +proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle, + events: set[Event], data: T) = + let fdi = int(fd) + s.checkFd(fdi) + doAssert(s.fds[fdi].ident == InvalidIdent) + s.setKey(fdi, events, 0, data) + + if events != {}: + if Event.Read in events: + modifyKQueue(s, uint(fdi), EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + if Event.Write in events: + modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(s.count) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + +proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, + events: set[Event]) = + let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, + Event.User, Event.Oneshot, Event.Error} + let fdi = int(fd) + s.checkFd(fdi) + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != InvalidIdent, + "Descriptor $# is not registered in the queue!" % $fdi) + doAssert(pkey.events * maskEvents == {}) + + if pkey.events != events: + if (Event.Read in pkey.events) and (Event.Read notin events): + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_DELETE, 0, 0, nil) + dec(s.count) + if (Event.Write in pkey.events) and (Event.Write notin events): + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_DELETE, 0, 0, nil) + dec(s.count) + if (Event.Read notin pkey.events) and (Event.Read in events): + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + inc(s.count) + if (Event.Write notin pkey.events) and (Event.Write in events): + modifyKQueue(s, fdi.uint, EVFILT_WRITE, EV_ADD, 0, 0, nil) + inc(s.count) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + + pkey.events = events + +proc registerTimer*[T](s: Selector[T], timeout: int, oneshot: bool, + data: T): int {.discardable.} = + let fdi = getUnique(s) + s.checkFd(fdi) + doAssert(s.fds[fdi].ident == InvalidIdent) + + let events = if oneshot: {Event.Timer, Event.Oneshot} else: {Event.Timer} + let flags: cushort = if oneshot: EV_ONESHOT or EV_ADD else: EV_ADD + + s.setKey(fdi, events, 0, data) + + # EVFILT_TIMER on Open/Net(BSD) has granularity of only milliseconds, + # but MacOS and FreeBSD allow use `0` as `fflags` to use milliseconds + # too + modifyKQueue(s, fdi.uint, EVFILT_TIMER, flags, 0, cint(timeout), nil) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + + inc(s.count) + result = fdi + +proc registerSignal*[T](s: Selector[T], signal: int, + data: T): int {.discardable.} = + let fdi = getUnique(s) + s.checkFd(fdi) + doAssert(s.fds[fdi].ident == InvalidIdent) + + s.setKey(fdi, {Event.Signal}, signal, data) + var nmask, omask: Sigset + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, cint(signal)) + blockSignals(nmask, omask) + # to be compatible with linux semantic we need to "eat" signals + posix.signal(cint(signal), SIG_IGN) + + modifyKQueue(s, signal.uint, EVFILT_SIGNAL, EV_ADD, 0, 0, + cast[pointer](fdi)) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + + inc(s.count) + result = fdi + +proc registerProcess*[T](s: Selector[T], pid: int, + data: T): int {.discardable.} = + let fdi = getUnique(s) + s.checkFd(fdi) + doAssert(s.fds[fdi].ident == InvalidIdent) + + var kflags: cushort = EV_ONESHOT or EV_ADD + setKey(s, fdi, {Event.Process, Event.Oneshot}, pid, data) + + modifyKQueue(s, pid.uint, EVFILT_PROC, kflags, NOTE_EXIT, 0, + cast[pointer](fdi)) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + + inc(s.count) + result = fdi + +proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = + let fdi = ev.rfd.int + doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!") + setKey(s, fdi, {Event.User}, 0, data) + + modifyKQueue(s, fdi.uint, EVFILT_READ, EV_ADD, 0, 0, nil) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + + inc(s.count) + +template processVnodeEvents(events: set[Event]): cuint = + var rfflags = 0.cuint + if events == {Event.VnodeWrite, Event.VnodeDelete, Event.VnodeExtend, + Event.VnodeAttrib, Event.VnodeLink, Event.VnodeRename, + Event.VnodeRevoke}: + rfflags = NOTE_DELETE or NOTE_WRITE or NOTE_EXTEND or NOTE_ATTRIB or + NOTE_LINK or NOTE_RENAME or NOTE_REVOKE + else: + if Event.VnodeDelete in events: rfflags = rfflags or NOTE_DELETE + if Event.VnodeWrite in events: rfflags = rfflags or NOTE_WRITE + if Event.VnodeExtend in events: rfflags = rfflags or NOTE_EXTEND + if Event.VnodeAttrib in events: rfflags = rfflags or NOTE_ATTRIB + if Event.VnodeLink in events: rfflags = rfflags or NOTE_LINK + if Event.VnodeRename in events: rfflags = rfflags or NOTE_RENAME + if Event.VnodeRevoke in events: rfflags = rfflags or NOTE_REVOKE + rfflags + +proc registerVnode*[T](s: Selector[T], fd: cint, events: set[Event], data: T) = + let fdi = fd.int + setKey(s, fdi, {Event.Vnode} + events, 0, data) + var fflags = processVnodeEvents(events) + + modifyKQueue(s, fdi.uint, EVFILT_VNODE, EV_ADD or EV_CLEAR, fflags, 0, nil) + + when not declared(CACHE_EVENTS): + flushKQueue(s) + + inc(s.count) + +proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = + let fdi = int(fd) + s.checkFd(fdi) + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != InvalidIdent, + "Descriptor [" & $fdi & "] is not registered in the queue!") + + if pkey.events != {}: + if pkey.events * {Event.Read, Event.Write} != {}: + if Event.Read in pkey.events: + modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) + dec(s.count) + if Event.Write in pkey.events: + modifyKQueue(s, uint(fdi), EVFILT_WRITE, EV_DELETE, 0, 0, nil) + dec(s.count) + when not declared(CACHE_EVENTS): + flushKQueue(s) + elif Event.Timer in pkey.events: + if Event.Finished notin pkey.events: + modifyKQueue(s, uint(fdi), EVFILT_TIMER, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) + dec(s.count) + if posix.close(cint(pkey.ident)) != 0: + raiseIOSelectorsError(osLastError()) + elif Event.Signal in pkey.events: + var nmask, omask: Sigset + let signal = cint(pkey.param) + discard sigemptyset(nmask) + discard sigemptyset(omask) + discard sigaddset(nmask, signal) + unblockSignals(nmask, omask) + posix.signal(signal, SIG_DFL) + modifyKQueue(s, uint(pkey.param), EVFILT_SIGNAL, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) + dec(s.count) + if posix.close(cint(pkey.ident)) != 0: + raiseIOSelectorsError(osLastError()) + elif Event.Process in pkey.events: + if Event.Finished notin pkey.events: + modifyKQueue(s, uint(pkey.param), EVFILT_PROC, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) + dec(s.count) + if posix.close(cint(pkey.ident)) != 0: + raiseIOSelectorsError(osLastError()) + elif Event.Vnode in pkey.events: + modifyKQueue(s, uint(fdi), EVFILT_VNODE, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) + dec(s.count) + elif Event.User in pkey.events: + modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) + dec(s.count) + + clearKey(pkey) + +proc unregister*[T](s: Selector[T], ev: SelectEvent) = + let fdi = int(ev.rfd) + s.checkFd(fdi) + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!") + doAssert(Event.User in pkey.events) + modifyKQueue(s, uint(fdi), EVFILT_READ, EV_DELETE, 0, 0, nil) + when not declared(CACHE_EVENTS): + flushKQueue(s) + clearKey(pkey) + dec(s.count) + +proc selectInto*[T](s: Selector[T], timeout: int, + results: var openArray[ReadyKey]): int = + var + tv: Timespec + resTable: array[MAX_KQUEUE_EVENTS, KEvent] + ptv = addr tv + maxres = MAX_KQUEUE_EVENTS + + verifySelectParams(timeout) + + if timeout != -1: + if timeout >= 1000: + tv.tv_sec = posix.Time(timeout div 1_000) + tv.tv_nsec = (timeout %% 1_000) * 1_000_000 + else: + tv.tv_sec = posix.Time(0) + tv.tv_nsec = timeout * 1_000_000 + else: + ptv = nil + + if maxres > len(results): + maxres = len(results) + + var count = 0 + when not declared(CACHE_EVENTS): + count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), ptv) + else: + when hasThreadSupport: + s.withChangeLock(): + if s.changesLength > 0: + count = kevent(s.kqFD, addr(s.changes[0]), cint(s.changesLength), + addr(resTable[0]), cint(maxres), ptv) + s.changesLength = 0 + else: + count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), + ptv) + else: + let length = cint(len(s.changes)) + if length > 0: + count = kevent(s.kqFD, addr(s.changes[0]), length, + addr(resTable[0]), cint(maxres), ptv) + s.changes.setLen(0) + else: + count = kevent(s.kqFD, nil, cint(0), addr(resTable[0]), cint(maxres), + ptv) + + if count < 0: + result = 0 + let err = osLastError() + if cint(err) != EINTR: + raiseIOSelectorsError(err) + elif count == 0: + result = 0 + else: + var i = 0 + var k = 0 # do not delete this, because `continue` used in cycle. + var pkey: ptr SelectorKey[T] + while i < count: + let kevent = addr(resTable[i]) + var rkey = ReadyKey(fd: int(kevent.ident), events: {}) + + if (kevent.flags and EV_ERROR) != 0: + rkey.events = {Event.Error} + rkey.errorCode = OSErrorCode(kevent.data) + + case kevent.filter: + of EVFILT_READ: + pkey = addr(s.fds[int(kevent.ident)]) + rkey.events.incl(Event.Read) + if Event.User in pkey.events: + var data: uint64 = 0 + if posix.read(cint(kevent.ident), addr data, + sizeof(uint64)) != sizeof(uint64): + let err = osLastError() + if err == OSErrorCode(EAGAIN): + # someone already consumed event data + inc(i) + continue + else: + raiseIOSelectorsError(err) + rkey.events = {Event.User} + of EVFILT_WRITE: + pkey = addr(s.fds[int(kevent.ident)]) + rkey.events.incl(Event.Write) + rkey.events = {Event.Write} + of EVFILT_TIMER: + pkey = addr(s.fds[int(kevent.ident)]) + if Event.Oneshot in pkey.events: + # we will not clear key until it will be unregistered, so + # application can obtain data, but we will decrease counter, + # because kqueue is empty. + dec(s.count) + # we are marking key with `Finished` event, to avoid double decrease. + pkey.events.incl(Event.Finished) + rkey.events.incl(Event.Timer) + of EVFILT_VNODE: + pkey = addr(s.fds[int(kevent.ident)]) + rkey.events.incl(Event.Vnode) + if (kevent.fflags and NOTE_DELETE) != 0: + rkey.events.incl(Event.VnodeDelete) + if (kevent.fflags and NOTE_WRITE) != 0: + rkey.events.incl(Event.VnodeWrite) + if (kevent.fflags and NOTE_EXTEND) != 0: + rkey.events.incl(Event.VnodeExtend) + if (kevent.fflags and NOTE_ATTRIB) != 0: + rkey.events.incl(Event.VnodeAttrib) + if (kevent.fflags and NOTE_LINK) != 0: + rkey.events.incl(Event.VnodeLink) + if (kevent.fflags and NOTE_RENAME) != 0: + rkey.events.incl(Event.VnodeRename) + if (kevent.fflags and NOTE_REVOKE) != 0: + rkey.events.incl(Event.VnodeRevoke) + of EVFILT_SIGNAL: + pkey = addr(s.fds[cast[int](kevent.udata)]) + rkey.fd = cast[int](kevent.udata) + rkey.events.incl(Event.Signal) + of EVFILT_PROC: + rkey.fd = cast[int](kevent.udata) + pkey = addr(s.fds[cast[int](kevent.udata)]) + # we will not clear key, until it will be unregistered, so + # application can obtain data, but we will decrease counter, + # because kqueue is empty. + dec(s.count) + # we are marking key with `Finished` event, to avoid double decrease. + pkey.events.incl(Event.Finished) + rkey.events.incl(Event.Process) + else: + doAssert(true, "Unsupported kqueue filter in the queue!") + + if (kevent.flags and EV_EOF) != 0: + # TODO this error handling needs to be rethought. + # `fflags` can sometimes be `0x80000000` and thus we use 'cast' + # here: + if kevent.fflags != 0: + rkey.errorCode = cast[OSErrorCode](kevent.fflags) + else: + # This assumes we are dealing with sockets. + # TODO: For future-proofing it might be a good idea to give the + # user access to the raw `kevent`. + rkey.errorCode = OSErrorCode(ECONNRESET) + rkey.events.incl(Event.Error) + + results[k] = rkey + inc(k) + inc(i) + result = k + +proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = + result = newSeq[ReadyKey](MAX_KQUEUE_EVENTS) + let count = selectInto(s, timeout, result) + result.setLen(count) + +template isEmpty*[T](s: Selector[T]): bool = + (s.count == 0) + +proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = + let fdi = fd.int + fdi < s.maxFD and s.fds[fd.int].ident != InvalidIdent + +proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = + let fdi = int(fd) + if fdi in s: + s.fds[fdi].data = data + result = true + +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, + body: untyped) = + let fdi = int(fd) + if fdi in s: + var value = addr(s.fds[fdi].data) + body + +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, + body2: untyped) = + let fdi = int(fd) + if fdi in s: + var value = addr(s.fds[fdi].data) + body1 + else: + body2 + + +proc getFd*[T](s: Selector[T]): int = + return s.kqFD.int diff --git a/chronos/ioselects/ioselectors_poll.nim b/chronos/ioselects/ioselectors_poll.nim new file mode 100644 index 00000000..1af2a46d --- /dev/null +++ b/chronos/ioselects/ioselectors_poll.nim @@ -0,0 +1,310 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2016 Eugene Kabanov +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +# This module implements Posix poll(). + +import posix, times + +# Maximum number of events that can be returned +const MAX_POLL_EVENTS = 64 + +when hasThreadSupport: + type + SelectorImpl[T] = object + maxFD : int + pollcnt: int + fds: ptr SharedArray[SelectorKey[T]] + pollfds: ptr SharedArray[TPollFd] + count: int + lock: Lock + Selector*[T] = ptr SelectorImpl[T] +else: + type + SelectorImpl[T] = object + maxFD : int + pollcnt: int + fds: seq[SelectorKey[T]] + pollfds: seq[TPollFd] + count: int + Selector*[T] = ref SelectorImpl[T] + +type + SelectEventImpl = object + rfd: cint + wfd: cint + SelectEvent* = ptr SelectEventImpl + +when hasThreadSupport: + template withPollLock[T](s: Selector[T], body: untyped) = + acquire(s.lock) + {.locks: [s.lock].}: + try: + body + finally: + release(s.lock) +else: + template withPollLock(s, body: untyped) = + body + +proc newSelector*[T](): Selector[T] = + var a = RLimit() + if getrlimit(posix.RLIMIT_NOFILE, a) != 0: + raiseIOSelectorsError(osLastError()) + var maxFD = int(a.rlim_max) + + when hasThreadSupport: + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.maxFD = maxFD + result.fds = allocSharedArray[SelectorKey[T]](maxFD) + result.pollfds = allocSharedArray[TPollFd](maxFD) + initLock(result.lock) + else: + result = Selector[T]() + result.maxFD = maxFD + result.fds = newSeq[SelectorKey[T]](maxFD) + result.pollfds = newSeq[TPollFd](maxFD) + + for i in 0 ..< maxFD: + result.fds[i].ident = InvalidIdent + +proc close*[T](s: Selector[T]) = + when hasThreadSupport: + deinitLock(s.lock) + deallocSharedArray(s.fds) + deallocSharedArray(s.pollfds) + deallocShared(cast[pointer](s)) + +template pollAdd[T](s: Selector[T], sock: cint, events: set[Event]) = + withPollLock(s): + var pollev: cshort = 0 + if Event.Read in events: pollev = pollev or POLLIN + if Event.Write in events: pollev = pollev or POLLOUT + s.pollfds[s.pollcnt].fd = cint(sock) + s.pollfds[s.pollcnt].events = pollev + inc(s.count) + inc(s.pollcnt) + +template pollUpdate[T](s: Selector[T], sock: cint, events: set[Event]) = + withPollLock(s): + var i = 0 + var pollev: cshort = 0 + if Event.Read in events: pollev = pollev or POLLIN + if Event.Write in events: pollev = pollev or POLLOUT + + while i < s.pollcnt: + if s.pollfds[i].fd == sock: + s.pollfds[i].events = pollev + break + inc(i) + doAssert(i < s.pollcnt, + "Descriptor [" & $sock & "] is not registered in the queue!") + +template pollRemove[T](s: Selector[T], sock: cint) = + withPollLock(s): + var i = 0 + while i < s.pollcnt: + if s.pollfds[i].fd == sock: + if i == s.pollcnt - 1: + s.pollfds[i].fd = 0 + s.pollfds[i].events = 0 + s.pollfds[i].revents = 0 + else: + while i < (s.pollcnt - 1): + s.pollfds[i].fd = s.pollfds[i + 1].fd + s.pollfds[i].events = s.pollfds[i + 1].events + inc(i) + break + inc(i) + dec(s.pollcnt) + dec(s.count) + +template checkFd(s, f) = + if f >= s.maxFD: + raiseIOSelectorsError("Maximum number of descriptors is exhausted!") + +proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle, + events: set[Event], data: T) = + var fdi = int(fd) + s.checkFd(fdi) + doAssert(s.fds[fdi].ident == InvalidIdent) + setKey(s, fdi, events, 0, data) + if events != {}: s.pollAdd(fdi.cint, events) + +proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, + events: set[Event]) = + let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, + Event.User, Event.Oneshot, Event.Error} + let fdi = int(fd) + s.checkFd(fdi) + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != InvalidIdent, + "Descriptor [" & $fdi & "] is not registered in the queue!") + doAssert(pkey.events * maskEvents == {}) + + if pkey.events != events: + if pkey.events == {}: + s.pollAdd(fd.cint, events) + else: + if events != {}: + s.pollUpdate(fd.cint, events) + else: + s.pollRemove(fd.cint) + pkey.events = events + +proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = + var fdi = int(ev.rfd) + doAssert(s.fds[fdi].ident == InvalidIdent, "Event is already registered in the queue!") + var events = {Event.User} + setKey(s, fdi, events, 0, data) + events.incl(Event.Read) + s.pollAdd(fdi.cint, events) + +proc unregister*[T](s: Selector[T], fd: int|SocketHandle) = + let fdi = int(fd) + s.checkFd(fdi) + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != InvalidIdent, + "Descriptor [" & $fdi & "] is not registered in the queue!") + pkey.ident = InvalidIdent + if pkey.events != {}: + pkey.events = {} + s.pollRemove(fdi.cint) + +proc unregister*[T](s: Selector[T], ev: SelectEvent) = + let fdi = int(ev.rfd) + s.checkFd(fdi) + var pkey = addr(s.fds[fdi]) + doAssert(pkey.ident != InvalidIdent, "Event is not registered in the queue!") + doAssert(Event.User in pkey.events) + pkey.ident = InvalidIdent + pkey.events = {} + s.pollRemove(fdi.cint) + +proc newSelectEvent*(): SelectEvent = + var fds: array[2, cint] + if posix.pipe(fds) != 0: + raiseIOSelectorsError(osLastError()) + setNonBlocking(fds[0]) + setNonBlocking(fds[1]) + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.rfd = fds[0] + result.wfd = fds[1] + +proc trigger*(ev: SelectEvent) = + var data: uint64 = 1 + if posix.write(ev.wfd, addr data, sizeof(uint64)) != sizeof(uint64): + raiseIOSelectorsError(osLastError()) + +proc close*(ev: SelectEvent) = + let res1 = posix.close(ev.rfd) + let res2 = posix.close(ev.wfd) + deallocShared(cast[pointer](ev)) + if res1 != 0 or res2 != 0: + raiseIOSelectorsError(osLastError()) + +proc selectInto*[T](s: Selector[T], timeout: int, + results: var openarray[ReadyKey]): int = + var maxres = MAX_POLL_EVENTS + if maxres > len(results): + maxres = len(results) + + verifySelectParams(timeout) + + s.withPollLock(): + let count = posix.poll(addr(s.pollfds[0]), Tnfds(s.pollcnt), timeout) + if count < 0: + result = 0 + let err = osLastError() + if cint(err) != EINTR: + raiseIOSelectorsError(err) + elif count == 0: + result = 0 + else: + var i = 0 + var k = 0 + var rindex = 0 + while (i < s.pollcnt) and (k < count) and (rindex < maxres): + let revents = s.pollfds[i].revents + if revents != 0: + let fd = s.pollfds[i].fd + var pkey = addr(s.fds[fd]) + var rkey = ReadyKey(fd: int(fd), events: {}) + + if (revents and POLLIN) != 0: + rkey.events.incl(Event.Read) + if Event.User in pkey.events: + var data: uint64 = 0 + if posix.read(fd, addr data, sizeof(uint64)) != sizeof(uint64): + let err = osLastError() + if err != OSErrorCode(EAGAIN): + raiseIOSelectorsError(err) + else: + # someone already consumed event data + inc(i) + continue + rkey.events = {Event.User} + if (revents and POLLOUT) != 0: + rkey.events.incl(Event.Write) + if (revents and POLLERR) != 0 or (revents and POLLHUP) != 0 or + (revents and POLLNVAL) != 0: + rkey.events.incl(Event.Error) + results[rindex] = rkey + s.pollfds[i].revents = 0 + inc(rindex) + inc(k) + inc(i) + result = k + +proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = + result = newSeq[ReadyKey](MAX_POLL_EVENTS) + let count = selectInto(s, timeout, result) + result.setLen(count) + +template isEmpty*[T](s: Selector[T]): bool = + (s.count == 0) + +proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = + return s.fds[fd.int].ident != InvalidIdent + +proc getData*[T](s: Selector[T], fd: SocketHandle|int): var T = + let fdi = int(fd) + s.checkFd(fdi) + if fdi in s: + result = s.fds[fdi].data + +proc setData*[T](s: Selector[T], fd: SocketHandle|int, data: T): bool = + let fdi = int(fd) + s.checkFd(fdi) + if fdi in s: + s.fds[fdi].data = data + result = true + +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, + body: untyped) = + mixin checkFd + let fdi = int(fd) + s.checkFd(fdi) + if fdi in s: + var value = addr(s.getData(fdi)) + body + +template withData*[T](s: Selector[T], fd: SocketHandle|int, value, body1, + body2: untyped) = + mixin checkFd + let fdi = int(fd) + s.checkFd(fdi) + if fdi in s: + var value = addr(s.getData(fdi)) + body1 + else: + body2 + + +proc getFd*[T](s: Selector[T]): int = + return -1 diff --git a/chronos/ioselects/ioselectors_select.nim b/chronos/ioselects/ioselectors_select.nim new file mode 100644 index 00000000..02a853b4 --- /dev/null +++ b/chronos/ioselects/ioselectors_select.nim @@ -0,0 +1,465 @@ +# +# +# Nim's Runtime Library +# (c) Copyright 2016 Eugene Kabanov +# +# See the file "copying.txt", included in this +# distribution, for details about the copyright. +# + +# This module implements Posix and Windows select(). + +import times, nativesockets + +when defined(windows): + import winlean + when defined(gcc): + {.passl: "-lws2_32".} + elif defined(vcc): + {.passl: "ws2_32.lib".} + const platformHeaders = """#include + #include """ + const EAGAIN = WSAEWOULDBLOCK +else: + const platformHeaders = """#include + #include + #include + #include """ +type + Fdset {.importc: "fd_set", header: platformHeaders, pure, final.} = object +var + FD_SETSIZE {.importc: "FD_SETSIZE", header: platformHeaders.}: cint + +proc IOFD_SET(fd: SocketHandle, fdset: ptr Fdset) + {.cdecl, importc: "FD_SET", header: platformHeaders, inline.} +proc IOFD_CLR(fd: SocketHandle, fdset: ptr Fdset) + {.cdecl, importc: "FD_CLR", header: platformHeaders, inline.} +proc IOFD_ZERO(fdset: ptr Fdset) + {.cdecl, importc: "FD_ZERO", header: platformHeaders, inline.} + +when defined(windows): + proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint + {.stdcall, importc: "FD_ISSET", header: platformHeaders, inline.} + proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset, + timeout: ptr Timeval): cint + {.stdcall, importc: "select", header: platformHeaders.} +else: + proc IOFD_ISSET(fd: SocketHandle, fdset: ptr Fdset): cint + {.cdecl, importc: "FD_ISSET", header: platformHeaders, inline.} + proc ioselect(nfds: cint, readFds, writeFds, exceptFds: ptr Fdset, + timeout: ptr Timeval): cint + {.cdecl, importc: "select", header: platformHeaders.} + +when hasThreadSupport: + type + SelectorImpl[T] = object + rSet: FdSet + wSet: FdSet + eSet: FdSet + maxFD: int + fds: ptr SharedArray[SelectorKey[T]] + count: int + lock: Lock + Selector*[T] = ptr SelectorImpl[T] +else: + type + SelectorImpl[T] = object + rSet: FdSet + wSet: FdSet + eSet: FdSet + maxFD: int + fds: seq[SelectorKey[T]] + count: int + Selector*[T] = ref SelectorImpl[T] + +type + SelectEventImpl = object + rsock: SocketHandle + wsock: SocketHandle + SelectEvent* = ptr SelectEventImpl + +when hasThreadSupport: + template withSelectLock[T](s: Selector[T], body: untyped) = + acquire(s.lock) + {.locks: [s.lock].}: + try: + body + finally: + release(s.lock) +else: + template withSelectLock[T](s: Selector[T], body: untyped) = + body + +proc newSelector*[T](): Selector[T] = + when hasThreadSupport: + result = cast[Selector[T]](allocShared0(sizeof(SelectorImpl[T]))) + result.fds = allocSharedArray[SelectorKey[T]](FD_SETSIZE) + initLock result.lock + else: + result = Selector[T]() + result.fds = newSeq[SelectorKey[T]](FD_SETSIZE) + + for i in 0 ..< FD_SETSIZE: + result.fds[i].ident = InvalidIdent + + IOFD_ZERO(addr result.rSet) + IOFD_ZERO(addr result.wSet) + IOFD_ZERO(addr result.eSet) + +proc close*[T](s: Selector[T]) = + when hasThreadSupport: + deallocSharedArray(s.fds) + deallocShared(cast[pointer](s)) + +when defined(windows): + proc newSelectEvent*(): SelectEvent = + var ssock = createNativeSocket() + var wsock = createNativeSocket() + var rsock: SocketHandle = INVALID_SOCKET + var saddr = Sockaddr_in() + + saddr.sin_family = winlean.AF_INET + saddr.sin_port = 0 + saddr.sin_addr.s_addr = INADDR_ANY + if bindAddr(ssock, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) < 0'i32: + raiseIOSelectorsError(osLastError()) + + if winlean.listen(ssock, 1) != 0: + raiseIOSelectorsError(osLastError()) + + var namelen = sizeof(saddr).SockLen + if getsockname(ssock, cast[ptr SockAddr](addr(saddr)), + addr(namelen)) != 0'i32: + raiseIOSelectorsError(osLastError()) + + saddr.sin_addr.s_addr = 0x0100007F + if winlean.connect(wsock, cast[ptr SockAddr](addr(saddr)), + sizeof(saddr).SockLen) != 0: + raiseIOSelectorsError(osLastError()) + namelen = sizeof(saddr).SockLen + rsock = winlean.accept(ssock, cast[ptr SockAddr](addr(saddr)), + cast[ptr SockLen](addr(namelen))) + if rsock == SocketHandle(-1): + raiseIOSelectorsError(osLastError()) + + if winlean.closesocket(ssock) != 0: + raiseIOSelectorsError(osLastError()) + + var mode = clong(1) + if ioctlsocket(rsock, FIONBIO, addr(mode)) != 0: + raiseIOSelectorsError(osLastError()) + mode = clong(1) + if ioctlsocket(wsock, FIONBIO, addr(mode)) != 0: + raiseIOSelectorsError(osLastError()) + + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.rsock = rsock + result.wsock = wsock + + proc trigger*(ev: SelectEvent) = + var data: uint64 = 1 + if winlean.send(ev.wsock, cast[pointer](addr data), + cint(sizeof(uint64)), 0) != sizeof(uint64): + raiseIOSelectorsError(osLastError()) + + proc close*(ev: SelectEvent) = + let res1 = winlean.closesocket(ev.rsock) + let res2 = winlean.closesocket(ev.wsock) + deallocShared(cast[pointer](ev)) + if res1 != 0 or res2 != 0: + raiseIOSelectorsError(osLastError()) + +else: + proc newSelectEvent*(): SelectEvent = + var fds: array[2, cint] + if posix.pipe(fds) != 0: + raiseIOSelectorsError(osLastError()) + setNonBlocking(fds[0]) + setNonBlocking(fds[1]) + result = cast[SelectEvent](allocShared0(sizeof(SelectEventImpl))) + result.rsock = SocketHandle(fds[0]) + result.wsock = SocketHandle(fds[1]) + + proc trigger*(ev: SelectEvent) = + var data: uint64 = 1 + if posix.write(cint(ev.wsock), addr data, sizeof(uint64)) != sizeof(uint64): + raiseIOSelectorsError(osLastError()) + + proc close*(ev: SelectEvent) = + let res1 = posix.close(cint(ev.rsock)) + let res2 = posix.close(cint(ev.wsock)) + deallocShared(cast[pointer](ev)) + if res1 != 0 or res2 != 0: + raiseIOSelectorsError(osLastError()) + +proc setSelectKey[T](s: Selector[T], fd: SocketHandle, events: set[Event], + data: T) = + var i = 0 + let fdi = int(fd) + while i < FD_SETSIZE: + if s.fds[i].ident == InvalidIdent: + var pkey = addr(s.fds[i]) + pkey.ident = fdi + pkey.events = events + pkey.data = data + break + inc(i) + if i >= FD_SETSIZE: + raiseIOSelectorsError("Maximum number of descriptors is exhausted!") + +proc getKey[T](s: Selector[T], fd: SocketHandle): ptr SelectorKey[T] = + var i = 0 + let fdi = int(fd) + while i < FD_SETSIZE: + if s.fds[i].ident == fdi: + result = addr(s.fds[i]) + break + inc(i) + doAssert(i < FD_SETSIZE, + "Descriptor [" & $int(fd) & "] is not registered in the queue!") + +proc delKey[T](s: Selector[T], fd: SocketHandle) = + var empty: T + var i = 0 + while i < FD_SETSIZE: + if s.fds[i].ident == fd.int: + s.fds[i].ident = InvalidIdent + s.fds[i].events = {} + s.fds[i].data = empty + break + inc(i) + doAssert(i < FD_SETSIZE, + "Descriptor [" & $int(fd) & "] is not registered in the queue!") + +proc registerHandle*[T](s: Selector[T], fd: int | SocketHandle, + events: set[Event], data: T) = + when not defined(windows): + let fdi = int(fd) + s.withSelectLock(): + s.setSelectKey(fd, events, data) + when not defined(windows): + if fdi > s.maxFD: s.maxFD = fdi + if Event.Read in events: + IOFD_SET(fd, addr s.rSet) + inc(s.count) + if Event.Write in events: + IOFD_SET(fd, addr s.wSet) + IOFD_SET(fd, addr s.eSet) + inc(s.count) + +proc registerEvent*[T](s: Selector[T], ev: SelectEvent, data: T) = + when not defined(windows): + let fdi = int(ev.rsock) + s.withSelectLock(): + s.setSelectKey(ev.rsock, {Event.User}, data) + when not defined(windows): + if fdi > s.maxFD: s.maxFD = fdi + IOFD_SET(ev.rsock, addr s.rSet) + inc(s.count) + +proc updateHandle*[T](s: Selector[T], fd: int | SocketHandle, + events: set[Event]) = + let maskEvents = {Event.Timer, Event.Signal, Event.Process, Event.Vnode, + Event.User, Event.Oneshot, Event.Error} + s.withSelectLock(): + var pkey = s.getKey(fd) + doAssert(pkey.events * maskEvents == {}) + if pkey.events != events: + if (Event.Read in pkey.events) and (Event.Read notin events): + IOFD_CLR(fd, addr s.rSet) + dec(s.count) + if (Event.Write in pkey.events) and (Event.Write notin events): + IOFD_CLR(fd, addr s.wSet) + IOFD_CLR(fd, addr s.eSet) + dec(s.count) + if (Event.Read notin pkey.events) and (Event.Read in events): + IOFD_SET(fd, addr s.rSet) + inc(s.count) + if (Event.Write notin pkey.events) and (Event.Write in events): + IOFD_SET(fd, addr s.wSet) + IOFD_SET(fd, addr s.eSet) + inc(s.count) + pkey.events = events + +proc unregister*[T](s: Selector[T], fd: SocketHandle|int) = + s.withSelectLock(): + let fd = fd.SocketHandle + var pkey = s.getKey(fd) + if Event.Read in pkey.events or Event.User in pkey.events: + IOFD_CLR(fd, addr s.rSet) + dec(s.count) + if Event.Write in pkey.events: + IOFD_CLR(fd, addr s.wSet) + IOFD_CLR(fd, addr s.eSet) + dec(s.count) + s.delKey(fd) + +proc unregister*[T](s: Selector[T], ev: SelectEvent) = + let fd = ev.rsock + s.withSelectLock(): + var pkey = s.getKey(fd) + IOFD_CLR(fd, addr s.rSet) + dec(s.count) + s.delKey(fd) + +proc selectInto*[T](s: Selector[T], timeout: int, + results: var openarray[ReadyKey]): int = + var tv = Timeval() + var ptv = addr tv + var rset, wset, eset: FdSet + + verifySelectParams(timeout) + + if timeout != -1: + when defined(genode): + tv.tv_sec = Time(timeout div 1_000) + else: + tv.tv_sec = timeout.int32 div 1_000 + tv.tv_usec = (timeout.int32 %% 1_000) * 1_000 + else: + ptv = nil + + s.withSelectLock(): + rset = s.rSet + wset = s.wSet + eset = s.eSet + + var count = ioselect(cint(s.maxFD) + 1, addr(rset), addr(wset), + addr(eset), ptv) + if count < 0: + result = 0 + when defined(windows): + raiseIOSelectorsError(osLastError()) + else: + let err = osLastError() + if cint(err) != EINTR: + raiseIOSelectorsError(err) + elif count == 0: + result = 0 + else: + var rindex = 0 + var i = 0 + var k = 0 + + while (i < FD_SETSIZE) and (k < count): + if s.fds[i].ident != InvalidIdent: + var flag = false + var pkey = addr(s.fds[i]) + var rkey = ReadyKey(fd: int(pkey.ident), events: {}) + let fd = SocketHandle(pkey.ident) + if IOFD_ISSET(fd, addr rset) != 0: + if Event.User in pkey.events: + var data: uint64 = 0 + if recv(fd, cast[pointer](addr(data)), + sizeof(uint64).cint, 0) != sizeof(uint64): + let err = osLastError() + if cint(err) != EAGAIN: + raiseIOSelectorsError(err) + else: + inc(i) + inc(k) + continue + else: + flag = true + rkey.events = {Event.User} + else: + flag = true + rkey.events = {Event.Read} + if IOFD_ISSET(fd, addr wset) != 0: + rkey.events.incl(Event.Write) + if IOFD_ISSET(fd, addr eset) != 0: + rkey.events.incl(Event.Error) + flag = true + if flag: + results[rindex] = rkey + inc(rindex) + inc(k) + inc(i) + result = rindex + +proc select*[T](s: Selector[T], timeout: int): seq[ReadyKey] = + result = newSeq[ReadyKey](FD_SETSIZE) + var count = selectInto(s, timeout, result) + result.setLen(count) + +proc flush*[T](s: Selector[T]) = discard + +template isEmpty*[T](s: Selector[T]): bool = + (s.count == 0) + +proc contains*[T](s: Selector[T], fd: SocketHandle|int): bool {.inline.} = + s.withSelectLock(): + result = false + + let fdi = int(fd) + for i in 0..= -1, "Cannot select with a negative value, got " & $timeout) + + when defined(linux): + include ./ioselects/ioselectors_epoll + elif bsdPlatform: + include ./ioselects/ioselectors_kqueue + elif defined(windows): + include ./ioselects/ioselectors_select + elif defined(solaris): + include ./ioselects/ioselectors_poll # need to replace it with event ports + elif defined(genode): + include ./ioselects/ioselectors_select # TODO: use the native VFS layer + elif defined(nintendoswitch): + include ./ioselects/ioselectors_select + else: + include ./ioselects/ioselectors_poll + +proc register*[T](s: Selector[T], fd: int | SocketHandle, + events: set[Event], data: T) {.deprecated: "use registerHandle instead".} = + ## **Deprecated since v0.18.0:** Use ``registerHandle`` instead. + s.registerHandle(fd, events, data) + +proc setEvent*(ev: SelectEvent) {.deprecated: "use trigger instead", + raises: [Defect, IOSelectorsException].} = + ## Trigger event ``ev``. + ## + ## **Deprecated since v0.18.0:** Use ``trigger`` instead. + ev.trigger() + +proc update*[T](s: Selector[T], fd: int | SocketHandle, + events: set[Event]) {.deprecated: "use updateHandle instead".} = + ## Update file/socket descriptor ``fd``, registered in selector + ## ``s`` with new events set ``event``. + ## + ## **Deprecated since v0.18.0:** Use ``updateHandle`` instead. + s.updateHandle() diff --git a/chronos/sendfile.nim b/chronos/sendfile.nim index 358643f9..ca3829c9 100644 --- a/chronos/sendfile.nim +++ b/chronos/sendfile.nim @@ -9,6 +9,8 @@ ## This module provides cross-platform wrapper for ``sendfile()`` syscall. +{.push raises: [Defect].} + when defined(nimdoc): proc sendfile*(outfd, infd: int, offset: int, count: var int): int = ## Copies data between file descriptor ``infd`` and ``outfd``. Because this diff --git a/chronos/srcloc.nim b/chronos/srcloc.nim index 9d7f942b..576fc58a 100644 --- a/chronos/srcloc.nim +++ b/chronos/srcloc.nim @@ -1,3 +1,5 @@ +{.push raises: [].} + type SrcLoc* = object procedure*: cstring diff --git a/chronos/streams/asyncstream.nim b/chronos/streams/asyncstream.nim index ce7b1f7e..05b48cee 100644 --- a/chronos/streams/asyncstream.nim +++ b/chronos/streams/asyncstream.nim @@ -6,6 +6,9 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) + +{.push raises: [Defect].} + import ../asyncloop, ../asyncsync import ../transports/common, ../transports/stream export asyncsync, stream, common @@ -58,9 +61,9 @@ type Finished, ## Stream was properly finished Closed ## Stream was closed - StreamReaderLoop* = proc (stream: AsyncStreamReader): Future[void] {.gcsafe.} + StreamReaderLoop* = proc (stream: AsyncStreamReader): Future[void] {.gcsafe, raises: [Defect].} ## Main read loop for read streams. - StreamWriterLoop* = proc (stream: AsyncStreamWriter): Future[void] {.gcsafe.} + StreamWriterLoop* = proc (stream: AsyncStreamWriter): Future[void] {.gcsafe, raises: [Defect].} ## Main write loop for write streams. AsyncStreamReader* = ref object of RootRef @@ -202,16 +205,20 @@ proc newAsyncStreamUseClosedError*(): ref AsyncStreamUseClosedError {. noinline.} = newException(AsyncStreamUseClosedError, "Stream is already closed") -proc raiseAsyncStreamUseClosedError*() {.noinline, noreturn.} = +proc raiseAsyncStreamUseClosedError*() {. + noinline, noreturn, raises: [Defect, AsyncStreamUseClosedError].} = raise newAsyncStreamUseClosedError() -proc raiseAsyncStreamLimitError*() {.noinline, noreturn.} = +proc raiseAsyncStreamLimitError*() {. + noinline, noreturn, raises: [Defect, AsyncStreamLimitError].} = raise newAsyncStreamLimitError() -proc raiseAsyncStreamIncompleteError*() {.noinline, noreturn.} = +proc raiseAsyncStreamIncompleteError*() {. + noinline, noreturn, raises: [Defect, AsyncStreamIncompleteError].} = raise newAsyncStreamIncompleteError() -proc raiseAsyncStreamIncorrectDefect*(m: string) {.noinline, noreturn.} = +proc raiseAsyncStreamIncorrectDefect*(m: string) {. + noinline, noreturn, raises: [Defect].} = raise newException(AsyncStreamIncorrectDefect, m) proc raiseEmptyMessageDefect*() {.noinline, noreturn.} = @@ -248,8 +255,8 @@ proc running*(rw: AsyncStreamRW): bool {.inline.} = ## Returns ``true`` is reading/writing stream is still pending. (rw.state == AsyncStreamState.Running) -proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {.gcsafe.} -proc setupAsyncStreamWriterTracker(): AsyncStreamTracker {.gcsafe.} +proc setupAsyncStreamReaderTracker(): AsyncStreamTracker {.gcsafe, raises: [Defect].} +proc setupAsyncStreamWriterTracker(): AsyncStreamTracker {.gcsafe, raises: [Defect].} proc getAsyncStreamReaderTracker(): AsyncStreamTracker {.inline.} = var res = cast[AsyncStreamTracker](getTracker(AsyncStreamReaderTrackerName)) @@ -873,7 +880,7 @@ proc close*(rw: AsyncStreamRW) = rw.state = AsyncStreamState.Closed - proc continuation(udata: pointer) = + proc continuation(udata: pointer) {.raises: [Defect].} = if not isNil(rw.udata): GC_unref(cast[ref int](rw.udata)) if not(rw.future.finished()): diff --git a/chronos/timer.nim b/chronos/timer.nim index c61ff6d4..c4584233 100644 --- a/chronos/timer.nim +++ b/chronos/timer.nim @@ -24,6 +24,8 @@ ## You can specify which timer you want to use ``-d:asyncTimer=``. const asyncTimer* {.strdefine.} = "mono" +{.push raises: [Defect].} + when defined(windows): when asyncTimer == "system": from winlean import getSystemTimeAsFileTime, FILETIME diff --git a/chronos/transport.nim b/chronos/transport.nim index 48538582..61905d9b 100644 --- a/chronos/transport.nim +++ b/chronos/transport.nim @@ -6,8 +6,8 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import transports/[datagram, stream, common, ipnet, osnet] -import streams/[asyncstream, chunkstream] +import ./transports/[datagram, stream, common, ipnet, osnet] +import ./streams/[asyncstream, chunkstream] export datagram, common, stream, ipnet, osnet export asyncstream, chunkstream diff --git a/chronos/transports/common.nim b/chronos/transports/common.nim index 5de8a4d6..f79ca449 100644 --- a/chronos/transports/common.nim +++ b/chronos/transports/common.nim @@ -6,7 +6,10 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import os, strutils, nativesockets, net + +{.push raises: [Defect].} + +import std/[os, strutils, nativesockets, net] import ../asyncloop export net @@ -183,9 +186,10 @@ proc `$`*(address: TransportAddress): string = else: result = "" else: - raise newException(TransportAddressError, "Unknown address family!") + result = "Unknown address family: " & $address.family -proc initTAddress*(address: string): TransportAddress = +proc initTAddress*(address: string): TransportAddress {. + raises: [Defect, TransportAddressError].} = ## Parses string representation of ``address``. ``address`` can be IPv4, IPv6 ## or Unix domain address. ## @@ -230,7 +234,8 @@ proc initTAddress*(address: string): TransportAddress = else: result = TransportAddress(family: AddressFamily.Unix) -proc initTAddress*(address: string, port: Port): TransportAddress = +proc initTAddress*(address: string, port: Port): TransportAddress {. + raises: [Defect, TransportAddressError].} = ## Initialize ``TransportAddress`` with IP (IPv4 or IPv6) address ``address`` ## and port number ``port``. try: @@ -246,7 +251,8 @@ proc initTAddress*(address: string, port: Port): TransportAddress = except CatchableError as exc: raise newException(TransportAddressError, exc.msg) -proc initTAddress*(address: string, port: int): TransportAddress {.inline.} = +proc initTAddress*(address: string, port: int): TransportAddress {. + raises: [Defect, TransportAddressError].} = ## Initialize ``TransportAddress`` with IP (IPv4 or IPv6) address ``address`` ## and port number ``port``. if port < 0 or port >= 65536: @@ -267,7 +273,8 @@ proc initTAddress*(address: IpAddress, port: Port): TransportAddress = proc getAddrInfo(address: string, port: Port, domain: Domain, sockType: SockType = SockType.SOCK_STREAM, - protocol: Protocol = Protocol.IPPROTO_TCP): ptr AddrInfo = + protocol: Protocol = Protocol.IPPROTO_TCP): ptr AddrInfo {. + raises: [Defect, TransportAddressError].} = ## We have this one copy of ``getAddrInfo()`` because of AI_V4MAPPED in ## ``net.nim:getAddrInfo()``, which is not cross-platform. var hints: AddrInfo @@ -346,7 +353,7 @@ proc toSAddr*(address: TransportAddress, sa: var Sockaddr_storage, else: discard -proc address*(ta: TransportAddress): IpAddress = +proc address*(ta: TransportAddress): IpAddress {.raises: [Defect, ValueError].} = ## Converts ``TransportAddress`` to ``net.IpAddress`` object. ## ## Note its impossible to convert ``TransportAddress`` of ``Unix`` family, @@ -361,7 +368,8 @@ proc address*(ta: TransportAddress): IpAddress = raise newException(ValueError, "IpAddress supports only IPv4/IPv6!") proc resolveTAddress*(address: string, - family = AddressFamily.IPv4): seq[TransportAddress] = + family = AddressFamily.IPv4): seq[TransportAddress] {. + raises: [Defect, TransportAddressError].} = ## Resolve string representation of ``address``. ## ## Supported formats are: @@ -412,7 +420,8 @@ proc resolveTAddress*(address: string, freeAddrInfo(aiList) proc resolveTAddress*(address: string, port: Port, - family = AddressFamily.IPv4): seq[TransportAddress] = + family = AddressFamily.IPv4): seq[TransportAddress] {. + raises: [Defect, TransportAddressError].} = ## Resolve string representation of ``address``. ## ## ``address`` could be dot IPv4/IPv6 address or hostname. @@ -439,7 +448,7 @@ proc resolveTAddress*(address: string, port: Port, proc resolveTAddress*(address: string, family: IpAddressFamily): seq[TransportAddress] {. - deprecated.} = + deprecated, raises: [Defect, TransportAddressError].} = if family == IpAddressFamily.IPv4: result = resolveTAddress(address, AddressFamily.IPv4) elif family == IpAddressFamily.IPv6: @@ -447,22 +456,24 @@ proc resolveTAddress*(address: string, proc resolveTAddress*(address: string, port: Port, family: IpAddressFamily): seq[TransportAddress] {. - deprecated.} = + deprecated, raises: [Defect, TransportAddressError].} = if family == IpAddressFamily.IPv4: result = resolveTAddress(address, port, AddressFamily.IPv4) elif family == IpAddressFamily.IPv6: result = resolveTAddress(address, port, AddressFamily.IPv6) -proc windowsAnyAddressFix*(a: TransportAddress): TransportAddress {.inline.} = +proc windowsAnyAddressFix*(a: TransportAddress): TransportAddress = ## BSD Sockets on *nix systems are able to perform connections to ## `0.0.0.0` or `::0` which are equal to `127.0.0.1` or `::1`. when defined(windows): if (a.family == AddressFamily.IPv4 and a.address_v4 == AnyAddress.address_v4): - result = initTAddress("127.0.0.1", a.port) + result = try: initTAddress("127.0.0.1", a.port) + except TransportAddressError as exc: raiseAssert exc.msg elif (a.family == AddressFamily.IPv6 and a.address_v6 == AnyAddress6.address_v6): - result = initTAddress("::1", a.port) + result = try: initTAddress("::1", a.port) + except TransportAddressError as exc: raiseAssert exc.msg else: result = a else: @@ -484,7 +495,7 @@ template checkWriteEof*(t: untyped, future: untyped) = "Transport connection is already dropped!")) return future -template getError*(t: untyped): ref Exception = +template getError*(t: untyped): ref CatchableError = var err = (t).error (t).error = nil err @@ -507,7 +518,8 @@ template getTransportOsError*(err: OSErrorCode): ref TransportOsError = template getTransportOsError*(err: cint): ref TransportOsError = getTransportOsError(OSErrorCode(err)) -proc raiseTransportOsError*(err: OSErrorCode) = +proc raiseTransportOsError*(err: OSErrorCode) {. + raises: [Defect, TransportOsError].} = ## Raises transport specific OS error. raise getTransportOsError(err) diff --git a/chronos/transports/datagram.nim b/chronos/transports/datagram.nim index f3f2eb46..7624e91c 100644 --- a/chronos/transports/datagram.nim +++ b/chronos/transports/datagram.nim @@ -7,9 +7,11 @@ # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import net, nativesockets, os, deques -import ../asyncloop, ../handles -import common +{.push raises: [Defect].} + +import std/[net, nativesockets, os, deques] +import ".."/[selectors2, asyncloop, handles] +import ./common when defined(windows): import winlean @@ -33,7 +35,7 @@ type writer: Future[void] # Writer vector completion Future DatagramCallback* = proc(transp: DatagramTransport, - remote: TransportAddress): Future[void] {.gcsafe.} + remote: TransportAddress): Future[void] {.gcsafe, raises: [Defect].} DatagramTransport* = ref object of RootRef fd*: AsyncFD # File descriptor @@ -41,7 +43,7 @@ type flags: set[ServerFlags] # Flags buffer: seq[byte] # Reading buffer buflen: int # Reading buffer effective size - error: ref Exception # Current error + error: ref CatchableError # Current error queue: Deque[GramVector] # Writer queue local: TransportAddress # Local address remote: TransportAddress # Remote address @@ -66,7 +68,8 @@ type const DgramTransportTrackerName = "datagram.transport" -proc remoteAddress*(transp: DatagramTransport): TransportAddress = +proc remoteAddress*(transp: DatagramTransport): TransportAddress {. + raises: [Defect, TransportOsError].} = ## Returns ``transp`` remote socket address. if transp.remote.family == AddressFamily.None: var saddr: Sockaddr_storage @@ -77,7 +80,8 @@ proc remoteAddress*(transp: DatagramTransport): TransportAddress = fromSAddr(addr saddr, slen, transp.remote) result = transp.remote -proc localAddress*(transp: DatagramTransport): TransportAddress = +proc localAddress*(transp: DatagramTransport): TransportAddress {. + raises: [Defect, TransportOsError].} = ## Returns ``transp`` local socket address. if transp.local.family == AddressFamily.None: var saddr: Sockaddr_storage @@ -92,7 +96,7 @@ template setReadError(t, e: untyped) = (t).state.incl(ReadError) (t).error = getTransportOsError(e) -proc setupDgramTransportTracker(): DgramTransportTracker {.gcsafe.} +proc setupDgramTransportTracker(): DgramTransportTracker {.gcsafe, raises: [Defect].} proc getDgramTransportTracker(): DgramTransportTracker {.inline.} = result = cast[DgramTransportTracker](getTracker(DgramTransportTrackerName)) @@ -286,7 +290,8 @@ when defined(windows): udata: pointer, child: DatagramTransport, bufferSize: int, - ttl: int): DatagramTransport = + ttl: int): DatagramTransport {. + raises: [Defect, CatchableError].} = var localSock: AsyncFD doAssert(remote.family == local.family) doAssert(not isNil(cbproc)) @@ -300,6 +305,7 @@ when defined(windows): if sock == asyncInvalidSocket: localSock = createAsyncSocket(local.getDomain(), SockType.SOCK_DGRAM, Protocol.IPPROTO_UDP) + if localSock == asyncInvalidSocket: raiseTransportOsError(osLastError()) else: @@ -397,7 +403,7 @@ when defined(windows): else: # Linux/BSD/MacOS part - proc readDatagramLoop(udata: pointer) = + proc readDatagramLoop(udata: pointer) {.raises: Defect.}= var raddr: TransportAddress doAssert(not isNil(udata)) var cdata = cast[ptr CompletionData](udata) @@ -466,15 +472,30 @@ else: break else: transp.state.incl(WritePaused) - transp.fd.removeWriter() + try: + transp.fd.removeWriter() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeWriter" + except ValueError as exc: + raiseAsDefect exc, "removeWriter" proc resumeWrite(transp: DatagramTransport) {.inline.} = transp.state.excl(WritePaused) - addWriter(transp.fd, writeDatagramLoop, cast[pointer](transp)) + try: + addWriter(transp.fd, writeDatagramLoop, cast[pointer](transp)) + except IOSelectorsException as exc: + raiseAsDefect exc, "addWriter" + except ValueError as exc: + raiseAsDefect exc, "addWriter" proc resumeRead(transp: DatagramTransport) {.inline.} = transp.state.excl(ReadPaused) - addReader(transp.fd, readDatagramLoop, cast[pointer](transp)) + try: + addReader(transp.fd, readDatagramLoop, cast[pointer](transp)) + except IOSelectorsException as exc: + raiseAsDefect exc, "addReader" + except ValueError as exc: + raiseAsDefect exc, "addReader" proc newDatagramTransportCommon(cbproc: DatagramCallback, remote: TransportAddress, @@ -482,9 +503,10 @@ else: sock: AsyncFD, flags: set[ServerFlags], udata: pointer, - child: DatagramTransport = nil, + child: DatagramTransport, bufferSize: int, - ttl: int): DatagramTransport = + ttl: int): DatagramTransport {. + raises: [Defect, CatchableError].} = var localSock: AsyncFD doAssert(remote.family == local.family) doAssert(not isNil(cbproc)) @@ -580,7 +602,7 @@ else: proc close*(transp: DatagramTransport) = ## Closes and frees resources of transport ``transp``. - proc continuation(udata: pointer) = + proc continuation(udata: pointer) {.raises: Defect.} = if not(transp.future.finished()): # Stop tracking transport untrackDgram(transp) @@ -612,7 +634,8 @@ proc newDatagramTransport*(cbproc: DatagramCallback, child: DatagramTransport = nil, bufSize: int = DefaultDatagramBufferSize, ttl: int = 0 - ): DatagramTransport = + ): DatagramTransport {. + raises: [Defect, CatchableError].} = ## Create new UDP datagram transport (IPv4). ## ## ``cbproc`` - callback which will be called, when new datagram received. @@ -637,7 +660,8 @@ proc newDatagramTransport*[T](cbproc: DatagramCallback, child: DatagramTransport = nil, bufSize: int = DefaultDatagramBufferSize, ttl: int = 0 - ): DatagramTransport = + ): DatagramTransport {. + raises: [Defect, CatchableError].} = var fflags = flags + {GCUserData} GC_ref(udata) result = newDatagramTransportCommon(cbproc, remote, local, sock, @@ -653,7 +677,8 @@ proc newDatagramTransport6*(cbproc: DatagramCallback, child: DatagramTransport = nil, bufSize: int = DefaultDatagramBufferSize, ttl: int = 0 - ): DatagramTransport = + ): DatagramTransport {. + raises: [Defect, CatchableError].} = ## Create new UDP datagram transport (IPv6). ## ## ``cbproc`` - callback which will be called, when new datagram received. @@ -678,7 +703,8 @@ proc newDatagramTransport6*[T](cbproc: DatagramCallback, child: DatagramTransport = nil, bufSize: int = DefaultDatagramBufferSize, ttl: int = 0 - ): DatagramTransport = + ): DatagramTransport {. + raises: [Defect, CatchableError].} = var fflags = flags + {GCUserData} GC_ref(udata) result = newDatagramTransportCommon(cbproc, remote, local, sock, @@ -815,7 +841,7 @@ proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress, return retFuture proc peekMessage*(transp: DatagramTransport, msg: var seq[byte], - msglen: var int) = + msglen: var int) {.raises: [Defect, CatchableError].} = ## Get access to internal message buffer and length of incoming datagram. if ReadError in transp.state: transp.state.excl(ReadError) @@ -823,7 +849,8 @@ proc peekMessage*(transp: DatagramTransport, msg: var seq[byte], shallowCopy(msg, transp.buffer) msglen = transp.buflen -proc getMessage*(transp: DatagramTransport): seq[byte] = +proc getMessage*(transp: DatagramTransport): seq[byte] {. + raises: [Defect, CatchableError].} = ## Copy data from internal message buffer and return result. if ReadError in transp.state: transp.state.excl(ReadError) diff --git a/chronos/transports/ipnet.nim b/chronos/transports/ipnet.nim index 615e42b9..cd56123e 100644 --- a/chronos/transports/ipnet.nim +++ b/chronos/transports/ipnet.nim @@ -8,8 +8,11 @@ # MIT license (LICENSE-MIT) ## This module implements various IP network utility procedures. -import stew/endians2, strutils -import common + +{.push raises: [Defect].} + +import stew/endians2, std/strutils +import ./common export common type @@ -325,9 +328,9 @@ proc `$`*(mask: IpMask, include0x = false): string = else: result.add(chr(ord('A') + (c - 10))) else: - raise newException(ValueError, "Invalid mask") + return "Unknown mask family: " & $host.family -proc ip*(mask: IpMask): string = +proc ip*(mask: IpMask): string {.raises: [Defect, ValueError].} = ## Returns IP address text representation of IP mask ``mask``. if mask.family == AddressFamily.IPv4: var ip = IpAddress(family: IpAddressFamily.IPv4) @@ -363,7 +366,8 @@ proc init*(t: typedesc[IpNet], host: TransportAddress, result.mask = mask result.host = host -proc init*(t: typedesc[IpNet], network: string): IpNet = +proc init*(t: typedesc[IpNet], network: string): IpNet {. + raises: [Defect, TransportAddressError].} = ## Initialize IP Network from string representation in format ##
/ or
/. var parts = network.rsplit("/", maxsplit = 1) @@ -549,7 +553,10 @@ proc `$`*(net: IpNet): string = result.add("/") let prefix = net.mask.prefix() if prefix == -1: - result.add(net.mask.ip()) + try: + result.add(net.mask.ip()) + except ValueError as exc: + result.add(exc.msg) else: result.add($prefix) elif net.host.family == AddressFamily.IPv6: @@ -559,7 +566,10 @@ proc `$`*(net: IpNet): string = result.add("/") let prefix = net.mask.prefix() if prefix == -1: - result.add(net.mask.ip()) + try: + result.add(net.mask.ip()) + except ValueError as exc: + result.add(exc.msg) else: result.add($prefix) diff --git a/chronos/transports/osnet.nim b/chronos/transports/osnet.nim index 3a7cecc5..7f8bdfd4 100644 --- a/chronos/transports/osnet.nim +++ b/chronos/transports/osnet.nim @@ -9,9 +9,12 @@ ## This module implements cross-platform network interfaces list. ## Currently supported OSes are Windows, Linux, MacOS, BSD(not tested). -import algorithm + +{.push raises: [Defect].} + +import std/algorithm from strutils import toHex -import ipnet +import ./ipnet export ipnet const @@ -19,7 +22,7 @@ const type InterfaceType* = enum - IfError = 0, # This is workaround element for ProoveInit warnings. + IfError = 0, # This is workaround element for ProveInit warnings. IfOther = 1, IfRegular1822 = 2, IfHdh1822 = 3, @@ -316,21 +319,22 @@ proc `$`*(iface: NetworkInterface): string = res.add("inet6 ") res.add($item) res.add(" netmask ") - res.add($(item.netmask().address())) + res.add(try: $(item.netmask().address()) except ValueError as exc: exc.msg) res.add(" brd ") - res.add($(item.broadcast().address())) + res.add( + try: $(item.broadcast().address()) except ValueError as exc: exc.msg) res proc `$`*(route: Route): string = - var res = $route.dest.address() + var res = try: $route.dest.address() except ValueError as exc: exc.msg res.add(" via ") if route.gateway.family != AddressFamily.None: res.add("gateway ") - res.add($route.gateway.address()) + res.add(try: $route.gateway.address() except ValueError as exc: exc.msg) else: res.add("link") res.add(" src ") - res.add($route.source.address()) + res.add(try: $route.source.address() except ValueError as exc: exc.msg) res proc cmp*(a, b: NetworkInterface): int = diff --git a/chronos/transports/stream.nim b/chronos/transports/stream.nim index aae041d7..a97e6b0f 100644 --- a/chronos/transports/stream.nim +++ b/chronos/transports/stream.nim @@ -6,11 +6,12 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import net, nativesockets, os, deques -import ../asyncloop, ../handles -import common -{.deadCodeElim: on.} +{.push raises: [Defect].} + +import std/[net, nativesockets, os, deques] +import ".."/[asyncloop, handles, selectors2] +import common when defined(windows): import winlean @@ -62,7 +63,7 @@ type ReadMessagePredicate* = proc (data: openarray[byte]): tuple[consumed: int, done: bool] {. - gcsafe, raises: [].} + gcsafe, raises: [Defect].} const StreamTransportTrackerName = "stream.transport" @@ -78,7 +79,7 @@ when defined(windows): reader: Future[void] # Current reader Future buffer: seq[byte] # Reading buffer offset: int # Reading buffer offset - error: ref Exception # Current error + error: ref CatchableError # Current error queue: Deque[StreamVector] # Writer queue future: Future[void] # Stream life future # Windows specific part @@ -105,7 +106,7 @@ else: reader: Future[void] # Current reader Future buffer: seq[byte] # Reading buffer offset: int # Reading buffer offset - error: ref Exception # Current error + error: ref CatchableError # Current error queue: Deque[StreamVector] # Writer queue future: Future[void] # Stream life future case kind*: TransportKind @@ -120,13 +121,13 @@ else: type StreamCallback* = proc(server: StreamServer, - client: StreamTransport): Future[void] {.gcsafe.} + client: StreamTransport): Future[void] {.gcsafe, raises: [Defect].} ## New remote client connection callback ## ``server`` - StreamServer object. ## ``client`` - accepted client transport. TransportInitCallback* = proc(server: StreamServer, - fd: AsyncFD): StreamTransport {.gcsafe.} + fd: AsyncFD): StreamTransport {.gcsafe, raises: [Defect].} ## Custom transport initialization procedure, which can allocate inherited ## StreamTransport object. @@ -137,7 +138,8 @@ type init*: TransportInitCallback # callback which will be called before # transport for new client -proc remoteAddress*(transp: StreamTransport): TransportAddress = +proc remoteAddress*(transp: StreamTransport): TransportAddress {. + raises: [Defect, TransportError].} = ## Returns ``transp`` remote socket address. if transp.kind != TransportKind.Socket: raise newException(TransportError, "Socket required!") @@ -150,7 +152,8 @@ proc remoteAddress*(transp: StreamTransport): TransportAddress = fromSAddr(addr saddr, slen, transp.remote) result = transp.remote -proc localAddress*(transp: StreamTransport): TransportAddress = +proc localAddress*(transp: StreamTransport): TransportAddress {. + raises: [Defect, TransportError].} = ## Returns ``transp`` local socket address. if transp.kind != TransportKind.Socket: raise newException(TransportError, "Socket required!") @@ -196,8 +199,8 @@ template shiftVectorFile(v, o: untyped) = (v).buf = cast[pointer](cast[uint]((v).buf) - cast[uint](o)) (v).offset += cast[uint]((o)) -proc setupStreamTransportTracker(): StreamTransportTracker {.gcsafe.} -proc setupStreamServerTracker(): StreamServerTracker {.gcsafe.} +proc setupStreamTransportTracker(): StreamTransportTracker {.gcsafe, raises: [Defect].} +proc setupStreamServerTracker(): StreamServerTracker {.gcsafe, raises: [Defect].} proc getStreamTransportTracker(): StreamTransportTracker {.inline.} = result = cast[StreamTransportTracker](getTracker(StreamTransportTrackerName)) @@ -267,7 +270,7 @@ proc completePendingWriteQueue(queue: var Deque[StreamVector], vector.writer.complete(v) proc failPendingWriteQueue(queue: var Deque[StreamVector], - error: ref Exception) {.inline.} = + error: ref CatchableError) {.inline.} = while len(queue) > 0: var vector = queue.popFirst() if not(vector.writer.finished()): @@ -704,8 +707,11 @@ when defined(windows): toSAddr(raddress, saddr, slen) proto = Protocol.IPPROTO_TCP - sock = createAsyncSocket(raddress.getDomain(), SockType.SOCK_STREAM, + sock = try: createAsyncSocket(raddress.getDomain(), SockType.SOCK_STREAM, proto) + except CatchableError as exc: + retFuture.fail(exc) + return retFuture if sock == asyncInvalidSocket: retFuture.fail(getTransportOsError(osLastError())) return retFuture @@ -760,7 +766,8 @@ when defined(windows): elif address.family == AddressFamily.Unix: ## Unix domain socket emulation with Windows Named Pipes. var pipeHandle = INVALID_HANDLE_VALUE - proc pipeContinuation(udata: pointer) {.gcsafe.} = + var pipeContinuation: proc (udata: pointer) {.gcsafe, raises: [Defect].} + pipeContinuation = proc (udata: pointer) = # Continue only if `retFuture` is not cancelled. if not(retFuture.finished()): var pipeSuffix = $cast[cstring](unsafeAddr address.address_un[0]) @@ -777,9 +784,17 @@ when defined(windows): else: retFuture.fail(getTransportOsError(err)) else: - register(AsyncFD(pipeHandle)) - let transp = newStreamPipeTransport(AsyncFD(pipeHandle), + try: + register(AsyncFD(pipeHandle)) + except CatchableError as exc: + retFuture.fail(exc) + return + + let transp = try: newStreamPipeTransport(AsyncFD(pipeHandle), bufferSize, child) + except CatchableError as exc: + retFuture.fail(exc) + return # Start tracking transport trackStream(transp) retFuture.complete(transp) @@ -787,7 +802,8 @@ when defined(windows): return retFuture - proc createAcceptPipe(server: StreamServer) = + proc createAcceptPipe(server: StreamServer) {. + raises: [Defect, CatchableError].} = let pipeSuffix = $cast[cstring](addr server.local.address_un) let pipeName = newWideCString(r"\\.\pipe\" & pipeSuffix[1 .. ^1]) var openMode = PIPE_ACCESS_DUPLEX or FILE_FLAG_OVERLAPPED @@ -840,7 +856,7 @@ when defined(windows): # We should not raise defects in this loop. discard disconnectNamedPipe(Handle(server.sock)) discard closeHandle(HANDLE(server.sock)) - raiseTransportOsError(osLastError()) + raiseAssert osErrorMsg(osLastError()) else: # Server close happens in callback, and we are not started new # connectNamedPipe session. @@ -864,10 +880,12 @@ when defined(windows): DWORD(server.bufferSize), DWORD(0), nil) if pipeHandle == INVALID_HANDLE_VALUE: - raiseTransportOsError(osLastError()) + raiseAssert osErrorMsg(osLastError()) server.sock = AsyncFD(pipeHandle) server.aovl.data.fd = AsyncFD(pipeHandle) - register(server.sock) + try: register(server.sock) + except CatchableError as exc: + raiseAsDefect exc, "register" let res = connectNamedPipe(pipeHandle, cast[POVERLAPPED](addr server.aovl)) if res == 0: @@ -880,7 +898,7 @@ when defined(windows): elif int32(err) == ERROR_PIPE_CONNECTED: discard else: - raiseTransportOsError(err) + raiseAssert osErrorMsg(err) break else: # Server close happens in callback, and we are not started new @@ -905,7 +923,7 @@ when defined(windows): SockLen(sizeof(SocketHandle))) != 0'i32: let err = OSErrorCode(wsaGetLastError()) server.asock.closeSocket() - raiseTransportOsError(err) + raiseAssert osErrorMsg(err) else: var ntransp: StreamTransport if not isNil(server.init): @@ -930,7 +948,7 @@ when defined(windows): break else: server.asock.closeSocket() - raiseTransportOsError(ovl.data.errCode) + raiseAssert $(ovl.data.errCode) else: # Server close happens in callback, and we are not started new # AcceptEx session. @@ -941,10 +959,12 @@ when defined(windows): ## Initiation if server.status notin {ServerStatus.Stopped, ServerStatus.Closed}: server.apending = true - server.asock = createAsyncSocket(server.domain, SockType.SOCK_STREAM, + # TODO No way to report back errors! + server.asock = try: createAsyncSocket(server.domain, SockType.SOCK_STREAM, Protocol.IPPROTO_TCP) + except CatchableError as exc: raiseAsDefect exc, "createAsyncSocket" if server.asock == asyncInvalidSocket: - raiseTransportOsError(OSErrorCode(wsaGetLastError())) + raiseAssert osErrorMsg(OSErrorCode(wsaGetLastError())) var dwBytesReceived = DWORD(0) let dwReceiveDataLength = DWORD(0) @@ -965,7 +985,7 @@ when defined(windows): elif int32(err) == ERROR_IO_PENDING: discard else: - raiseTransportOsError(err) + raiseAssert osErrorMsg(err) break else: # Server close happens in callback, and we are not started new @@ -1071,8 +1091,13 @@ when defined(windows): ntransp = newStreamPipeTransport(server.sock, server.bufferSize, nil, flags) # Start tracking transport + try: + server.createAcceptPipe() + except CatchableError as exc: + closeHandle(server.sock) + retFuture.fail(exc) + return trackStream(ntransp) - server.createAcceptPipe() retFuture.complete(ntransp) elif int32(ovl.data.errCode) in {ERROR_OPERATION_ABORTED, @@ -1082,8 +1107,14 @@ when defined(windows): server.clean() else: let sock = server.sock - server.createAcceptPipe() + try: + server.createAcceptPipe() + except CatchableError as exc: + closeHandle(sock) + retFuture.fail(exc) + return closeHandle(sock) + retFuture.fail(getTransportOsError(ovl.data.errCode)) proc cancellationPipe(udata: pointer) {.gcsafe.} = @@ -1092,8 +1123,12 @@ when defined(windows): if server.local.family in {AddressFamily.IPv4, AddressFamily.IPv6}: # TCP Sockets part var loop = getThreadDispatcher() - server.asock = createAsyncSocket(server.domain, SockType.SOCK_STREAM, + server.asock = try: createAsyncSocket(server.domain, SockType.SOCK_STREAM, Protocol.IPPROTO_TCP) + except CatchableError as exc: + retFuture.fail(exc) + return retFuture + if server.asock == asyncInvalidSocket: let err = osLastError() if int32(err) == ERROR_TOO_MANY_OPEN_FILES: @@ -1173,7 +1208,8 @@ else: result = (err == OSErrorCode(ECONNRESET)) or (err == OSErrorCode(EPIPE)) - proc writeStreamLoop(udata: pointer) {.gcsafe.} = + proc writeStreamLoop(udata: pointer) = + # TODO fix Defect raises - they "shouldn't" happen var cdata = cast[ptr CompletionData](udata) var transp = cast[StreamTransport](cdata.udata) let fd = SocketHandle(cdata.fd) @@ -1206,7 +1242,13 @@ else: if int(err) == EINTR: continue else: - transp.fd.removeWriter() + try: + transp.fd.removeWriter() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeWriter" + except ValueError as exc: + raiseAsDefect exc, "removeWriter" + if isConnResetError(err): # Soft error happens which indicates that remote peer got # disconnected, complete all pending writes in queue with 0. @@ -1239,7 +1281,13 @@ else: if int(err) == EINTR: continue else: - transp.fd.removeWriter() + try: + transp.fd.removeWriter() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeWriter" + except ValueError as exc: + raiseAsDefect exc, "removeWriter" + if isConnResetError(err): # Soft error happens which indicates that remote peer got # disconnected, complete all pending writes in queue with 0. @@ -1270,7 +1318,13 @@ else: if int(err) == EINTR: continue else: - transp.fd.removeWriter() + try: + transp.fd.removeWriter() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeWriter" + except ValueError as exc: + raiseAsDefect exc, "removeWriter" + if isConnResetError(err): # Soft error happens which indicates that remote peer got # disconnected, complete all pending writes in queue with 0. @@ -1303,7 +1357,12 @@ else: if int(err) == EINTR: continue else: - transp.fd.removeWriter() + try: + transp.fd.removeWriter() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeWriter" + except ValueError as exc: + raiseAsDefect exc, "removeWriter" if isConnResetError(err): # Soft error happens which indicates that remote peer got # disconnected, complete all pending writes in queue with 0. @@ -1320,9 +1379,15 @@ else: break else: transp.state.incl(WritePaused) - transp.fd.removeWriter() + try: + transp.fd.removeWriter() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeWriter" + except ValueError as exc: + raiseAsDefect exc, "removeWriter" - proc readStreamLoop(udata: pointer) {.gcsafe.} = + proc readStreamLoop(udata: pointer) = + # TODO fix Defect raises - they "shouldn't" happen var cdata = cast[ptr CompletionData](udata) var transp = cast[StreamTransport](cdata.udata) let fd = SocketHandle(cdata.fd) @@ -1345,19 +1410,39 @@ else: continue elif int(err) in {ECONNRESET}: transp.state.incl({ReadEof, ReadPaused}) - cdata.fd.removeReader() + try: + cdata.fd.removeReader() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeReader" + except ValueError as exc: + raiseAsDefect exc, "removeReader" else: transp.state.incl(ReadPaused) transp.setReadError(err) - cdata.fd.removeReader() + try: + cdata.fd.removeReader() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeReader" + except ValueError as exc: + raiseAsDefect exc, "removeReader" elif res == 0: transp.state.incl({ReadEof, ReadPaused}) - cdata.fd.removeReader() + try: + cdata.fd.removeReader() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeReader" + except ValueError as exc: + raiseAsDefect exc, "removeReader" else: transp.offset += res if transp.offset == len(transp.buffer): transp.state.incl(ReadPaused) - cdata.fd.removeReader() + try: + cdata.fd.removeReader() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeReader" + except ValueError as exc: + raiseAsDefect exc, "removeReader" transp.completeReader() break elif transp.kind == TransportKind.Pipe: @@ -1371,15 +1456,30 @@ else: else: transp.state.incl(ReadPaused) transp.setReadError(err) - cdata.fd.removeReader() + try: + cdata.fd.removeReader() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeReader" + except ValueError as exc: + raiseAsDefect exc, "removeReader" elif res == 0: transp.state.incl({ReadEof, ReadPaused}) - cdata.fd.removeReader() + try: + cdata.fd.removeReader() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeReader" + except ValueError as exc: + raiseAsDefect exc, "removeReader" else: transp.offset += res if transp.offset == len(transp.buffer): transp.state.incl(ReadPaused) - cdata.fd.removeReader() + try: + cdata.fd.removeReader() + except IOSelectorsException as exc: + raiseAsDefect exc, "removeReader" + except ValueError as exc: + raiseAsDefect exc, "removeReader" transp.completeReader() break @@ -1424,7 +1524,6 @@ else: var saddr: Sockaddr_storage slen: SockLen - sock: AsyncFD proto: Protocol var retFuture = newFuture[StreamTransport]("stream.transport.connect") address.toSAddr(saddr, slen) @@ -1433,8 +1532,13 @@ else: # `Protocol` enum is missing `0` value, so we making here cast, until # `Protocol` enum will not support IPPROTO_IP == 0. proto = cast[Protocol](0) - sock = createAsyncSocket(address.getDomain(), SockType.SOCK_STREAM, - proto) + + let sock = try: createAsyncSocket(address.getDomain(), SockType.SOCK_STREAM, + proto) + except CatchableError as exc: + retFuture.fail(exc) + return retFuture + if sock == asyncInvalidSocket: let err = osLastError() if int(err) == EMFILE: @@ -1443,12 +1547,20 @@ else: retFuture.fail(getTransportOsError(err)) return retFuture - proc continuation(udata: pointer) {.gcsafe.} = + proc continuation(udata: pointer) = if not(retFuture.finished()): var data = cast[ptr CompletionData](udata) var err = 0 let fd = data.fd - fd.removeWriter() + try: + fd.removeWriter() + except IOSelectorsException as exc: + retFuture.fail(exc) + return + except ValueError as exc: + retFuture.fail(exc) + return + if not fd.getSocketError(err): closeSocket(fd) retFuture.fail(getTransportOsError(osLastError())) @@ -1462,7 +1574,7 @@ else: trackStream(transp) retFuture.complete(transp) - proc cancel(udata: pointer) {.gcsafe.} = + proc cancel(udata: pointer) = closeSocket(sock) while true: @@ -1483,11 +1595,18 @@ else: # # http://www.madore.org/~david/computers/connect-intr.html if int(err) == EINPROGRESS or int(err) == EINTR: - sock.addWriter(continuation) + try: + sock.addWriter(continuation) + except CatchableError as exc: + closeSocket(sock) + retFuture.fail(exc) + return retFuture + retFuture.cancelCallback = cancel break else: sock.closeSocket() + retFuture.fail(getTransportOsError(err)) break return retFuture @@ -1504,7 +1623,9 @@ else: let res = posix.accept(SocketHandle(server.sock), cast[ptr SockAddr](addr saddr), addr slen) if int(res) > 0: - let sock = wrapAsyncSocket(res) + let sock = try: wrapAsyncSocket(res) + except CatchableError as exc: + raiseAsDefect exc, "wrapAsyncSocket" if sock != asyncInvalidSocket: var ntransp: StreamTransport if not isNil(server.init): @@ -1526,23 +1647,37 @@ else: break else: ## Critical unrecoverable error - raiseTransportOsError(err) + raiseAssert $err - proc resumeAccept(server: StreamServer) = + proc resumeAccept(server: StreamServer) {. + raises: [Defect, IOSelectorsException, ValueError].} = addReader(server.sock, acceptLoop, cast[pointer](server)) - proc pauseAccept(server: StreamServer) = + proc pauseAccept(server: StreamServer) {. + raises: [Defect, IOSelectorsException, ValueError].} = removeReader(server.sock) proc resumeRead(transp: StreamTransport) {.inline.} = if ReadPaused in transp.state: transp.state.excl(ReadPaused) - addReader(transp.fd, readStreamLoop, cast[pointer](transp)) + # TODO reset flag on exception?? + try: + addReader(transp.fd, readStreamLoop, cast[pointer](transp)) + except IOSelectorsException as exc: + raiseAsDefect exc, "addReader" + except ValueError as exc: + raiseAsDefect exc, "addReader" proc resumeWrite(transp: StreamTransport) {.inline.} = if WritePaused in transp.state: transp.state.excl(WritePaused) - addWriter(transp.fd, writeStreamLoop, cast[pointer](transp)) + # TODO reset flag on exception?? + try: + addWriter(transp.fd, writeStreamLoop, cast[pointer](transp)) + except IOSelectorsException as exc: + raiseAsDefect exc, "addWriter" + except ValueError as exc: + raiseAsDefect exc, "addWriter" proc accept*(server: StreamServer): Future[StreamTransport] = var retFuture = newFuture[StreamTransport]("stream.server.accept") @@ -1565,7 +1700,12 @@ else: let res = posix.accept(SocketHandle(server.sock), cast[ptr SockAddr](addr saddr), addr slen) if int(res) > 0: - let sock = wrapAsyncSocket(res) + let sock = try: wrapAsyncSocket(res) + except CatchableError as exc: + close(res) + retFuture.fail(exc) + return + if sock != asyncInvalidSocket: var ntransp: StreamTransport if not isNil(server.init): @@ -1592,23 +1732,41 @@ else: else: retFuture.fail(getTransportOsError(err)) break - removeReader(server.sock) + try: + removeReader(server.sock) + except IOSelectorsException as exc: + raiseAsDefect exc, "removeReader" + except ValueError as exc: + raiseAsDefect exc, "removeReader" - proc cancellation(udata: pointer) {.gcsafe.} = - removeReader(server.sock) + proc cancellation(udata: pointer) = + try: + removeReader(server.sock) + except IOSelectorsException as exc: + raiseAsDefect exc, "removeReader" + except ValueError as exc: + raiseAsDefect exc, "removeReader" + + try: + addReader(server.sock, continuation, nil) + except IOSelectorsException as exc: + raiseAsDefect exc, "addReader" + except ValueError as exc: + raiseAsDefect exc, "addReader" - addReader(server.sock, continuation, nil) retFuture.cancelCallback = cancellation return retFuture -proc start*(server: StreamServer) = +proc start*(server: StreamServer) {. + raises: [Defect, IOSelectorsException, ValueError].} = ## Starts ``server``. doAssert(not(isNil(server.function))) if server.status == ServerStatus.Starting: server.resumeAccept() server.status = ServerStatus.Running -proc stop*(server: StreamServer) = +proc stop*(server: StreamServer) {. + raises: [Defect, IOSelectorsException, ValueError].} = ## Stops ``server``. if server.status == ServerStatus.Running: server.pauseAccept() @@ -1620,10 +1778,10 @@ proc join*(server: StreamServer): Future[void] = ## Waits until ``server`` is not closed. var retFuture = newFuture[void]("stream.transport.server.join") - proc continuation(udata: pointer) {.gcsafe.} = + proc continuation(udata: pointer) = retFuture.complete() - proc cancel(udata: pointer) {.gcsafe.} = + proc cancel(udata: pointer) = server.loopFuture.removeCallback(continuation, cast[pointer](retFuture)) if not(server.loopFuture.finished()): @@ -1638,7 +1796,7 @@ proc close*(server: StreamServer) = ## ## Please note that release of resources is not completed immediately, to be ## sure all resources got released please use ``await server.join()``. - proc continuation(udata: pointer) {.gcsafe.} = + proc continuation(udata: pointer) = # Stop tracking server if not(server.loopFuture.finished()): server.clean() @@ -1680,7 +1838,8 @@ proc createStreamServer*(host: TransportAddress, bufferSize: int = DefaultStreamBufferSize, child: StreamServer = nil, init: TransportInitCallback = nil, - udata: pointer = nil): StreamServer = + udata: pointer = nil): StreamServer {. + raises: [Defect, CatchableError].} = ## Create new TCP stream server. ## ## ``host`` - address to which server will be bound. @@ -1707,6 +1866,7 @@ proc createStreamServer*(host: TransportAddress, serverSocket = createAsyncSocket(host.getDomain(), SockType.SOCK_STREAM, Protocol.IPPROTO_TCP) + if serverSocket == asyncInvalidSocket: raiseTransportOsError(osLastError()) else: @@ -1770,6 +1930,7 @@ proc createStreamServer*(host: TransportAddress, if not setSocketBlocking(SocketHandle(sock), false): raiseTransportOsError(osLastError()) register(sock) + serverSocket = sock if host.family in {AddressFamily.IPv4, AddressFamily.IPv6}: @@ -1869,7 +2030,8 @@ proc createStreamServer*(host: TransportAddress, bufferSize: int = DefaultStreamBufferSize, child: StreamServer = nil, init: TransportInitCallback = nil, - udata: pointer = nil): StreamServer = + udata: pointer = nil): StreamServer {. + raises: [Defect, CatchableError].} = result = createStreamServer(host, nil, flags, sock, backlog, bufferSize, child, init, cast[pointer](udata)) @@ -1881,7 +2043,8 @@ proc createStreamServer*[T](host: TransportAddress, backlog: int = 100, bufferSize: int = DefaultStreamBufferSize, child: StreamServer = nil, - init: TransportInitCallback = nil): StreamServer = + init: TransportInitCallback = nil): StreamServer {. + raises: [Defect, CatchableError].} = var fflags = flags + {GCUserData} GC_ref(udata) result = createStreamServer(host, cbproc, fflags, sock, backlog, bufferSize, @@ -1894,7 +2057,8 @@ proc createStreamServer*[T](host: TransportAddress, backlog: int = 100, bufferSize: int = DefaultStreamBufferSize, child: StreamServer = nil, - init: TransportInitCallback = nil): StreamServer = + init: TransportInitCallback = nil): StreamServer {. + raises: [Defect, CatchableError].} = var fflags = flags + {GCUserData} GC_ref(udata) result = createStreamServer(host, nil, fflags, sock, backlog, bufferSize, @@ -1959,10 +2123,12 @@ proc writeFile*(transp: StreamTransport, handle: int, ## ## You can specify starting ``offset`` in opened file and number of bytes ## to transfer from file to transport via ``size``. + var retFuture = newFuture[int]("stream.transport.writeFile") when defined(windows): if transp.kind != TransportKind.Socket: - raise newException(TransportNoSupport, "writeFile() is not supported!") - var retFuture = newFuture[int]("stream.transport.writeFile") + retFuture.fail(newException( + TransportNoSupport, "writeFile() is not supported!")) + return retFuture transp.checkClosed(retFuture) transp.checkWriteEof(retFuture) var vector = StreamVector(kind: DataFile, writer: retFuture, @@ -2295,11 +2461,13 @@ proc closed*(transp: StreamTransport): bool {.inline.} = result = ({ReadClosed, WriteClosed} * transp.state != {}) proc fromPipe*(fd: AsyncFD, child: StreamTransport = nil, - bufferSize = DefaultStreamBufferSize): StreamTransport = + bufferSize = DefaultStreamBufferSize): StreamTransport {. + raises: [Defect, CatchableError].} = ## Create new transport object using pipe's file descriptor. ## ## ``bufferSize`` is size of internal buffer for transport. register(fd) + result = newStreamPipeTransport(fd, bufferSize, child) # Start tracking transport trackStream(result) diff --git a/tests/testaddress.nim b/tests/testaddress.nim index 5348e727..a1a588ec 100644 --- a/tests/testaddress.nim +++ b/tests/testaddress.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos when defined(nimHasUsed): {.used.} diff --git a/tests/testasyncstream.nim b/tests/testasyncstream.nim index 12b445f0..7a4f2112 100644 --- a/tests/testasyncstream.nim +++ b/tests/testasyncstream.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos import ../chronos/streams/[tlsstream, chunkstream, boundstream] diff --git a/tests/testbugs.nim b/tests/testbugs.nim index 7fc9a9c2..0d9f7474 100644 --- a/tests/testbugs.nim +++ b/tests/testbugs.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos when defined(nimHasUsed): {.used.} diff --git a/tests/testdatagram.nim b/tests/testdatagram.nim index d6557f2e..c353cce0 100644 --- a/tests/testdatagram.nim +++ b/tests/testdatagram.nim @@ -5,7 +5,8 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import strutils, net, unittest +import std/[strutils, net] +import unittest2 import ../chronos when defined(nimHasUsed): {.used.} diff --git a/tests/testfut.nim b/tests/testfut.nim index 2d903d79..9e48432b 100644 --- a/tests/testfut.nim +++ b/tests/testfut.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos when defined(nimHasUsed): {.used.} diff --git a/tests/testhttpserver.nim b/tests/testhttpserver.nim index 5cd20d8c..86fd4f12 100644 --- a/tests/testhttpserver.nim +++ b/tests/testhttpserver.nim @@ -5,7 +5,8 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import std/[strutils, unittest, algorithm, strutils] +import std/[strutils, algorithm, strutils] +import unittest2 import ../chronos, ../chronos/apps/http/httpserver import stew/base10 diff --git a/tests/testmacro.nim b/tests/testmacro.nim index fc9ca196..c18b37ee 100644 --- a/tests/testmacro.nim +++ b/tests/testmacro.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos when defined(nimHasUsed): {.used.} diff --git a/tests/testnet.nim b/tests/testnet.nim index b0bf5fa1..31c3d66c 100644 --- a/tests/testnet.nim +++ b/tests/testnet.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos/transports/[osnet, ipnet] when defined(nimHasUsed): {.used.} diff --git a/tests/testserver.nim b/tests/testserver.nim index d079648b..8828bb22 100644 --- a/tests/testserver.nim +++ b/tests/testserver.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos when defined(nimHasUsed): {.used.} diff --git a/tests/testshttpserver.nim b/tests/testshttpserver.nim index aedc19ac..9e563b5c 100644 --- a/tests/testshttpserver.nim +++ b/tests/testshttpserver.nim @@ -5,7 +5,8 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import std/[strutils, unittest, strutils] +import std/[strutils, strutils] +import unittest2 import ../chronos, ../chronos/apps/http/shttpserver import stew/base10 diff --git a/tests/testsignal.nim b/tests/testsignal.nim index 027f6193..ee32f8ac 100644 --- a/tests/testsignal.nim +++ b/tests/testsignal.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos when defined(nimHasUsed): {.used.} @@ -20,13 +20,19 @@ suite "Signal handling test suite": proc signalProc(udata: pointer) = var cdata = cast[ptr CompletionData](udata) signalCounter = cast[int](cdata.udata) - removeSignal(int(cdata.fd)) + try: + removeSignal(int(cdata.fd)) + except Exception as exc: + raiseAssert exc.msg proc asyncProc() {.async.} = await sleepAsync(500.milliseconds) proc test(signal, value: int): bool = - discard addSignal(signal, signalProc, cast[pointer](value)) + try: + discard addSignal(signal, signalProc, cast[pointer](value)) + except Exception as exc: + raiseAssert exc.msg var fut = asyncProc() discard posix.kill(posix.getpid(), cint(signal)) waitFor(fut) diff --git a/tests/testsoon.nim b/tests/testsoon.nim index dfa712b3..b2ea84a0 100644 --- a/tests/testsoon.nim +++ b/tests/testsoon.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos when defined(nimHasUsed): {.used.} @@ -46,9 +46,11 @@ suite "callSoon() tests suite": await sleepAsync(100.milliseconds) timeoutsTest1 += 1 - proc callbackProc(udata: pointer) {.gcsafe.} = + var callbackproc: proc(udata: pointer) {.gcsafe, raises: [Defect].} + callbackproc = proc (udata: pointer) {.gcsafe, raises: [Defect].} = timeoutsTest2 += 1 - callSoon(callbackProc) + {.gcsafe.}: + callSoon(callbackProc) proc test2(timers, callbacks: var int) = callSoon(callbackProc) diff --git a/tests/teststream.nim b/tests/teststream.nim index 7d20a53b..00ffb37f 100644 --- a/tests/teststream.nim +++ b/tests/teststream.nim @@ -5,7 +5,8 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import strutils, unittest, os +import std/[strutils, os] +import unittest2 import ../chronos when defined(nimHasUsed): {.used.} @@ -866,7 +867,7 @@ suite "Stream Transport test suite": var valueLen = 0'u32 res: seq[byte] - error: ref Exception + error: ref CatchableError proc predicate(data: openarray[byte]): tuple[consumed: int, done: bool] = if len(data) == 0: diff --git a/tests/testsync.nim b/tests/testsync.nim index 5bb7acc2..75ccd120 100644 --- a/tests/testsync.nim +++ b/tests/testsync.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos when defined(nimHasUsed): {.used.} diff --git a/tests/testtime.nim b/tests/testtime.nim index 3a659e52..8a1f84d7 100644 --- a/tests/testtime.nim +++ b/tests/testtime.nim @@ -5,7 +5,8 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import os, unittest +import std/os +import unittest2 import ../chronos, ../chronos/timer when defined(nimHasUsed): {.used.} diff --git a/tests/testutils.nim b/tests/testutils.nim index 6d8ff8af..79e55ec7 100644 --- a/tests/testutils.nim +++ b/tests/testutils.nim @@ -5,7 +5,7 @@ # Licensed under either of # Apache License, version 2.0, (LICENSE-APACHEv2) # MIT license (LICENSE-MIT) -import unittest +import unittest2 import ../chronos when defined(nimHasUsed): {.used.}