From 8cf22775e9036028e259a309e76b86dcc51c8fc1 Mon Sep 17 00:00:00 2001 From: benbierens Date: Fri, 7 Jul 2023 07:49:30 +0200 Subject: [PATCH] very experimental attempt to find threshold breach using chronos modification --- codex/rest/api.nim | 24 + docker/asyncfutures2.nim | 1041 +++++++++++++++++++++++++++++++ docker/asyncloop.nim | 1273 ++++++++++++++++++++++++++++++++++++++ docker/codex.Dockerfile | 4 +- 4 files changed, 2341 insertions(+), 1 deletion(-) create mode 100644 docker/asyncfutures2.nim create mode 100644 docker/asyncloop.nim diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 84edfd3f..938c59c4 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -337,6 +337,30 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf, loopMeasure: LoopMeasure) trace "debug/peer returning peer record" return RestApiResponse.response($json) + when defined(chronosDurationThreshold): + var breaches = newSeq[string]() + proc onBreach(stackTrace: string, durationUs: int64) = + error "Duration threshold breached", durationUs, stackTrace + breaches.add($durationUs & " usecs at " & stackTrace) + + setChronosDurationThresholdBreachedHandler(onBreach) + + router.api( + MethodGet, + "/api/codex/v1/debug/loop") do () -> RestApiResponse: + + let jarray = newJArray() + for entry in breaches: + jarray.add(%*entry) + # "entry": entry + # }) + + let jobj = %*{ + "breaches": jarray + } + + return RestApiResponse.response($jobj) + router.api( MethodGet, "/api/codex/v1/sales/slots") do () -> RestApiResponse: diff --git a/docker/asyncfutures2.nim b/docker/asyncfutures2.nim new file mode 100644 index 00000000..4fcbb7ee --- /dev/null +++ b/docker/asyncfutures2.nim @@ -0,0 +1,1041 @@ +# +# Chronos +# +# (c) Copyright 2015 Dominik Picheta +# (c) Copyright 2018-2021 Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +import std/[os, tables, strutils, heapqueue, deques, sequtils] +import stew/base10 +import ./srcloc +export srcloc + +when defined(nimHasStacktracesModule): + import system/stacktraces +else: + const + reraisedFromBegin = -10 + reraisedFromEnd = -100 + +const + LocCreateIndex* = 0 + LocCompleteIndex* = 1 + +when defined(chronosStackTrace): + type StackTrace = string + +type + FutureState* {.pure.} = enum + Pending, Finished, Cancelled, Failed + + FutureBase* = ref object of RootObj ## Untyped future. + location*: array[2, ptr SrcLoc] + callbacks: seq[AsyncCallback] + cancelcb*: CallbackFunc + child*: FutureBase + state*: FutureState + error*: ref CatchableError ## Stored exception + mustCancel*: bool + id*: uint + + when defined(chronosStackTrace): + errorStackTrace*: StackTrace + stackTrace: StackTrace ## For debugging purposes only. + + when defined(chronosFutureTracking): + next*: FutureBase + prev*: FutureBase + + # ZAH: we have discussed some possible optimizations where + # the future can be stored within the caller's stack frame. + # How much refactoring is needed to make this a regular non-ref type? + # Obviously, it will still be allocated on the heap when necessary. + Future*[T] = ref object of FutureBase ## Typed future. + when defined(chronosStrictException): + closure*: iterator(f: Future[T]): FutureBase {.raises: [Defect, CatchableError], gcsafe.} + else: + closure*: iterator(f: Future[T]): FutureBase {.raises: [Defect, CatchableError, Exception], gcsafe.} + value: T ## Stored value + + FutureStr*[T] = ref object of Future[T] + ## Future to hold GC strings + gcholder*: string + + FutureSeq*[A, B] = ref object of Future[A] + ## Future to hold GC seqs + gcholder*: seq[B] + + FutureDefect* = object of Defect + cause*: FutureBase + + FutureError* = object of CatchableError + + CancelledError* = object of FutureError + + FutureList* = object + head*: FutureBase + tail*: FutureBase + count*: uint + +var currentID* {.threadvar.}: uint +currentID = 0'u + +when defined(chronosFutureTracking): + var futureList* {.threadvar.}: FutureList + futureList = FutureList() + +template setupFutureBase(loc: ptr SrcLoc) = + new(result) + currentID.inc() + result.state = FutureState.Pending + when defined(chronosStackTrace): + result.stackTrace = getStackTrace() + result.id = currentID + result.location[LocCreateIndex] = loc + + when defined(chronosFutureTracking): + result.next = nil + result.prev = futureList.tail + if not(isNil(futureList.tail)): + futureList.tail.next = result + futureList.tail = result + if isNil(futureList.head): + futureList.head = result + futureList.count.inc() + +proc newFutureImpl[T](loc: ptr SrcLoc): Future[T] = + setupFutureBase(loc) + +proc newFutureSeqImpl[A, B](loc: ptr SrcLoc): FutureSeq[A, B] = + setupFutureBase(loc) + +proc newFutureStrImpl[T](loc: ptr SrcLoc): FutureStr[T] = + setupFutureBase(loc) + +template newFuture*[T](fromProc: static[string] = ""): Future[T] = + ## Creates a new future. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + newFutureImpl[T](getSrcLocation(fromProc)) + +template newFutureSeq*[A, B](fromProc: static[string] = ""): FutureSeq[A, B] = + ## Create a new future which can hold/preserve GC sequence until future will + ## not be completed. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + newFutureSeqImpl[A, B](getSrcLocation(fromProc)) + +template newFutureStr*[T](fromProc: static[string] = ""): FutureStr[T] = + ## Create a new future which can hold/preserve GC string until future will + ## not be completed. + ## + ## Specifying ``fromProc``, which is a string specifying the name of the proc + ## that this future belongs to, is a good habit as it helps with debugging. + newFutureStrImpl[T](getSrcLocation(fromProc)) + +proc finished*(future: FutureBase): bool {.inline.} = + ## Determines whether ``future`` has completed, i.e. ``future`` state changed + ## from state ``Pending`` to one of the states (``Finished``, ``Cancelled``, + ## ``Failed``). + result = (future.state != FutureState.Pending) + +proc cancelled*(future: FutureBase): bool {.inline.} = + ## Determines whether ``future`` has cancelled. + (future.state == FutureState.Cancelled) + +proc failed*(future: FutureBase): bool {.inline.} = + ## Determines whether ``future`` completed with an error. + (future.state == FutureState.Failed) + +proc completed*(future: FutureBase): bool {.inline.} = + ## Determines whether ``future`` completed without an error. + (future.state == FutureState.Finished) + +proc done*(future: FutureBase): bool {.inline.} = + ## This is an alias for ``completed(future)`` procedure. + completed(future) + +when defined(chronosFutureTracking): + proc futureDestructor(udata: pointer) = + ## This procedure will be called when Future[T] got finished, cancelled or + ## failed and all Future[T].callbacks are already scheduled and processed. + let future = cast[FutureBase](udata) + if future == futureList.tail: futureList.tail = future.prev + if future == futureList.head: futureList.head = future.next + if not(isNil(future.next)): future.next.prev = future.prev + if not(isNil(future.prev)): future.prev.next = future.next + futureList.count.dec() + + proc scheduleDestructor(future: FutureBase) {.inline.} = + callSoon(futureDestructor, cast[pointer](future)) + +proc checkFinished(future: FutureBase, loc: ptr SrcLoc) = + ## Checks whether `future` is finished. If it is then raises a + ## ``FutureDefect``. + if future.finished(): + var msg = "" + msg.add("An attempt was made to complete a Future more than once. ") + msg.add("Details:") + msg.add("\n Future ID: " & Base10.toString(future.id)) + msg.add("\n Creation location:") + msg.add("\n " & $future.location[LocCreateIndex]) + msg.add("\n First completion location:") + msg.add("\n " & $future.location[LocCompleteIndex]) + msg.add("\n Second completion location:") + msg.add("\n " & $loc) + when defined(chronosStackTrace): + msg.add("\n Stack trace to moment of creation:") + msg.add("\n" & indent(future.stackTrace.strip(), 4)) + msg.add("\n Stack trace to moment of secondary completion:") + msg.add("\n" & indent(getStackTrace().strip(), 4)) + msg.add("\n\n") + var err = newException(FutureDefect, msg) + err.cause = future + raise err + else: + future.location[LocCompleteIndex] = loc + +proc finish(fut: FutureBase, state: FutureState) = + # We do not perform any checks here, because: + # 1. `finish()` is a private procedure and `state` is under our control. + # 2. `fut.state` is checked by `checkFinished()`. + fut.state = state + fut.cancelcb = nil # release cancellation callback memory + for item in fut.callbacks.mitems(): + if not(isNil(item.function)): + callSoon(item) + item = default(AsyncCallback) # release memory as early as possible + fut.callbacks = default(seq[AsyncCallback]) # release seq as well + + when defined(chronosFutureTracking): + scheduleDestructor(fut) + +proc complete[T](future: Future[T], val: T, loc: ptr SrcLoc) = + if not(future.cancelled()): + checkFinished(FutureBase(future), loc) + doAssert(isNil(future.error)) + future.value = val + future.finish(FutureState.Finished) + +template complete*[T](future: Future[T], val: T) = + ## Completes ``future`` with value ``val``. + complete(future, val, getSrcLocation()) + +proc complete(future: Future[void], loc: ptr SrcLoc) = + if not(future.cancelled()): + checkFinished(FutureBase(future), loc) + doAssert(isNil(future.error)) + future.finish(FutureState.Finished) + +template complete*(future: Future[void]) = + ## Completes a void ``future``. + complete(future, getSrcLocation()) + +proc fail[T](future: Future[T], error: ref CatchableError, loc: ptr SrcLoc) = + if not(future.cancelled()): + checkFinished(FutureBase(future), loc) + future.error = error + when defined(chronosStackTrace): + future.errorStackTrace = if getStackTrace(error) == "": + getStackTrace() + else: + getStackTrace(error) + future.finish(FutureState.Failed) + +template fail*[T](future: Future[T], error: ref CatchableError) = + ## Completes ``future`` with ``error``. + fail(future, error, getSrcLocation()) + +template newCancelledError(): ref CancelledError = + (ref CancelledError)(msg: "Future operation cancelled!") + +proc cancelAndSchedule(future: FutureBase, loc: ptr SrcLoc) = + if not(future.finished()): + checkFinished(future, loc) + future.error = newCancelledError() + when defined(chronosStackTrace): + future.errorStackTrace = getStackTrace() + future.finish(FutureState.Cancelled) + +template cancelAndSchedule*[T](future: Future[T]) = + cancelAndSchedule(FutureBase(future), getSrcLocation()) + +proc cancel(future: FutureBase, loc: ptr SrcLoc): bool = + ## Request that Future ``future`` cancel itself. + ## + ## This arranges for a `CancelledError` to be thrown into procedure which + ## waits for ``future`` on the next cycle through the event loop. + ## The procedure then has a chance to clean up or even deny the request + ## using `try/except/finally`. + ## + ## This call do not guarantee that the ``future`` will be cancelled: the + ## exception might be caught and acted upon, delaying cancellation of the + ## ``future`` or preventing cancellation completely. The ``future`` may also + ## return value or raise different exception. + ## + ## Immediately after this procedure is called, ``future.cancelled()`` will + ## not return ``true`` (unless the Future was already cancelled). + if future.finished(): + return false + + if not(isNil(future.child)): + if cancel(future.child, getSrcLocation()): + return true + else: + if not(isNil(future.cancelcb)): + future.cancelcb(cast[pointer](future)) + future.cancelcb = nil + cancelAndSchedule(future, getSrcLocation()) + + future.mustCancel = true + return true + +template cancel*(future: FutureBase) = + ## Cancel ``future``. + discard cancel(future, getSrcLocation()) + +template cancel*[T](future: Future[T]) = + ## Cancel ``future``. + discard cancel(FutureBase(future), getSrcLocation()) + +proc clearCallbacks(future: FutureBase) = + future.callbacks = default(seq[AsyncCallback]) + +proc addCallback*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = + ## Adds the callbacks proc to be called when the future completes. + ## + ## If future has already completed then ``cb`` will be called immediately. + doAssert(not isNil(cb)) + if future.finished(): + callSoon(cb, udata) + else: + var acb = AsyncCallback(function: cb, udata: udata) + when defined(chronosDurationThreshold): + acb.stackTrace = getStackTrace() + + future.callbacks.add acb + +proc addCallback*[T](future: Future[T], cb: CallbackFunc) = + ## Adds the callbacks proc to be called when the future completes. + ## + ## If future has already completed then ``cb`` will be called immediately. + future.addCallback(cb, cast[pointer](future)) + +proc removeCallback*(future: FutureBase, cb: CallbackFunc, + udata: pointer = nil) = + ## Remove future from list of callbacks - this operation may be slow if there + ## are many registered callbacks! + doAssert(not isNil(cb)) + # Make sure to release memory associated with callback, or reference chains + # may be created! + future.callbacks.keepItIf: + it.function != cb or it.udata != udata + +proc removeCallback*[T](future: Future[T], cb: CallbackFunc) = + future.removeCallback(cb, cast[pointer](future)) + +proc `callback=`*(future: FutureBase, cb: CallbackFunc, udata: pointer = nil) = + ## Clears the list of callbacks and sets the callback proc to be called when + ## the future completes. + ## + ## If future has already completed then ``cb`` will be called immediately. + ## + ## It's recommended to use ``addCallback`` or ``then`` instead. + # ZAH: how about `setLen(1); callbacks[0] = cb` + future.clearCallbacks + future.addCallback(cb, udata) + +proc `callback=`*[T](future: Future[T], cb: CallbackFunc) = + ## Sets the callback proc to be called when the future completes. + ## + ## If future has already completed then ``cb`` will be called immediately. + `callback=`(future, cb, cast[pointer](future)) + +proc `cancelCallback=`*[T](future: Future[T], cb: CallbackFunc) = + ## Sets the callback procedure to be called when the future is cancelled. + ## + ## This callback will be called immediately as ``future.cancel()`` invoked. + future.cancelcb = cb + +{.push stackTrace: off.} +proc internalContinue[T](fut: pointer) {.gcsafe, raises: [Defect].} + +proc futureContinue*[T](fut: Future[T]) {.gcsafe, raises: [Defect].} = + # Used internally by async transformation + try: + if not(fut.closure.finished()): + var next = fut.closure(fut) + # Continue while the yielded future is already finished. + while (not next.isNil()) and next.finished(): + next = fut.closure(fut) + if fut.closure.finished(): + break + + if fut.closure.finished(): + fut.closure = nil + if next == nil: + if not(fut.finished()): + raiseAssert "Async procedure (" & ($fut.location[LocCreateIndex]) & ") yielded `nil`, " & + "are you await'ing a `nil` Future?" + else: + GC_ref(fut) + next.addCallback(internalContinue[T], cast[pointer](fut)) + except CancelledError: + fut.cancelAndSchedule() + except CatchableError as exc: + fut.fail(exc) + except Exception as exc: + if exc of Defect: + raise (ref Defect)(exc) + + fut.fail((ref ValueError)(msg: exc.msg, parent: exc)) + +proc internalContinue[T](fut: pointer) {.gcsafe, raises: [Defect].} = + let asFut = cast[Future[T]](fut) + GC_unref(asFut) + futureContinue(asFut) + +{.pop.} + +template getFilenameProcname(entry: StackTraceEntry): (string, string) = + when compiles(entry.filenameStr) and compiles(entry.procnameStr): + # We can't rely on "entry.filename" and "entry.procname" still being valid + # cstring pointers, because the "string.data" buffers they pointed to might + # be already garbage collected (this entry being a non-shallow copy, + # "entry.filename" no longer points to "entry.filenameStr.data", but to the + # buffer of the original object). + (entry.filenameStr, entry.procnameStr) + else: + ($entry.filename, $entry.procname) + +proc getHint(entry: StackTraceEntry): string = + ## We try to provide some hints about stack trace entries that the user + ## may not be familiar with, in particular calls inside the stdlib. + + let (filename, procname) = getFilenameProcname(entry) + + if procname == "processPendingCallbacks": + if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0: + return "Executes pending callbacks" + elif procname == "poll": + if cmpIgnoreStyle(filename, "asyncdispatch.nim") == 0: + return "Processes asynchronous completion events" + + if procname == "internalContinue": + if cmpIgnoreStyle(filename, "asyncfutures.nim") == 0: + return "Resumes an async procedure" + +proc `$`(stackTraceEntries: seq[StackTraceEntry]): string = + try: + when defined(nimStackTraceOverride) and declared(addDebuggingInfo): + let entries = addDebuggingInfo(stackTraceEntries) + else: + let entries = stackTraceEntries + + # Find longest filename & line number combo for alignment purposes. + var longestLeft = 0 + for entry in entries: + let (filename, procname) = getFilenameProcname(entry) + + if procname == "": continue + + let leftLen = filename.len + len($entry.line) + if leftLen > longestLeft: + longestLeft = leftLen + + var indent = 2 + # Format the entries. + for entry in entries: + let (filename, procname) = getFilenameProcname(entry) + + if procname == "": + if entry.line == reraisedFromBegin: + result.add(spaces(indent) & "#[\n") + indent.inc(2) + elif entry.line == reraisedFromEnd: + indent.dec(2) + result.add(spaces(indent) & "]#\n") + continue + + let left = "$#($#)" % [filename, $entry.line] + result.add((spaces(indent) & "$#$# $#\n") % [ + left, + spaces(longestLeft - left.len + 2), + procname + ]) + let hint = getHint(entry) + if hint.len > 0: + result.add(spaces(indent+2) & "## " & hint & "\n") + except ValueError as exc: + return exc.msg # Shouldn't actually happen since we set the formatting + # string + +when defined(chronosStackTrace): + proc injectStacktrace(future: FutureBase) = + const header = "\nAsync traceback:\n" + + var exceptionMsg = future.error.msg + if header in exceptionMsg: + # This is messy: extract the original exception message from the msg + # containing the async traceback. + let start = exceptionMsg.find(header) + exceptionMsg = exceptionMsg[0..`_. +## +## Limitations/Bugs +## ---------------- +## +## * The effect system (``raises: []``) does not work with async procedures. +## * Can't await in a ``except`` body +## * Forward declarations for async procs are broken, +## link includes workaround: https://github.com/nim-lang/Nim/issues/3182. + +# TODO: Check if yielded future is nil and throw a more meaningful exception + +const + unixPlatform = defined(macosx) or defined(freebsd) or + defined(netbsd) or defined(openbsd) or + defined(dragonfly) or defined(macos) or + defined(linux) or defined(android) or + defined(solaris) + MaxEventsCount* = 64 + +when defined(windows): + import winlean, sets, hashes +elif unixPlatform: + import ./selectors2 + from posix import EINTR, EAGAIN, EINPROGRESS, EWOULDBLOCK, MSG_PEEK, + MSG_NOSIGNAL + from posix import SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, + SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, + SIGPIPE, SIGALRM, SIGTERM, SIGPIPE + export SIGHUP, SIGINT, SIGQUIT, SIGILL, SIGTRAP, SIGABRT, + SIGBUS, SIGFPE, SIGKILL, SIGUSR1, SIGSEGV, SIGUSR2, + SIGPIPE, SIGALRM, SIGTERM, SIGPIPE + +type + CallbackFunc* = proc (arg: pointer) {.gcsafe, raises: [Defect].} + + AsyncCallback* = object + function*: CallbackFunc + udata*: pointer + when defined(chronosDurationThreshold): + stackTrace*: string + + AsyncError* = object of CatchableError + ## Generic async exception + AsyncTimeoutError* = object of AsyncError + ## Timeout exception + + TimerCallback* = ref object + finishAt*: Moment + function*: AsyncCallback + + TrackerBase* = ref object of RootRef + id*: string + dump*: proc(): string {.gcsafe, raises: [Defect].} + isLeaked*: proc(): bool {.gcsafe, raises: [Defect].} + + PDispatcherBase = ref object of RootRef + timers*: HeapQueue[TimerCallback] + callbacks*: Deque[AsyncCallback] + idlers*: Deque[AsyncCallback] + trackers*: Table[string, TrackerBase] + +const chronosDurationThreshold {.intdefine.} = 0 + +when defined(chronosDurationThreshold): + type + ChronosDurationThreadholdBreachedCallback* = proc(stackTrace: string, durationUs: int64) {.gcsafe, raises: [].} + + proc defaultThresholdBreachedHandler(stackTrace: string, durationUs: int64) {.gcsafe, raises: [].} = + raise newException(Defect, "ChronosDurationThreshold breached. Time taken: " & $durationUs & " usecs - stackTrace: " & stackTrace) + + var chronosDurationThresholdBreachedHandler {.threadvar.} : ChronosDurationThreadholdBreachedCallback + + proc setChronosDurationThresholdBreachedHandler*(handler: ChronosDurationThreadholdBreachedCallback) = + chronosDurationThresholdBreachedHandler = handler + + proc invokeChronosDurationThresholdBreachedHandler(asyncCallback: AsyncCallback, durationUs: int64) = + if chronosDurationThresholdBreachedHandler == nil: + defaultThresholdBreachedHandler(asyncCallback.stackTrace, durationUs) + return + + chronosDurationThresholdBreachedHandler(asyncCallback.stackTrace, durationUs) + +proc sentinelCallbackImpl(arg: pointer) {.gcsafe, raises: [Defect].} = + raiseAssert "Sentinel callback MUST not be scheduled" + +const + SentinelCallback = AsyncCallback(function: sentinelCallbackImpl, + udata: nil) + +proc isSentinel(acb: AsyncCallback): bool {.raises: [Defect].} = + acb == SentinelCallback + +proc `<`(a, b: TimerCallback): bool = + result = a.finishAt < b.finishAt + +func getAsyncTimestamp*(a: Duration): auto {.inline.} = + ## Return rounded up value of duration with milliseconds resolution. + ## + ## This function also take care on int32 overflow, because Linux and Windows + ## accepts signed 32bit integer as timeout. + let milsec = Millisecond.nanoseconds() + let nansec = a.nanoseconds() + var res = nansec div milsec + let mid = nansec mod milsec + when defined(windows): + res = min(int64(high(int32) - 1), res) + result = cast[DWORD](res) + result += DWORD(min(1'i32, cast[int32](mid))) + else: + res = min(int64(high(int32) - 1), res) + result = cast[int32](res) + result += min(1, cast[int32](mid)) + +template processTimersGetTimeout(loop, timeout: untyped) = + var lastFinish = curTime + while loop.timers.len > 0: + if loop.timers[0].function.function.isNil: + discard loop.timers.pop() + continue + + lastFinish = loop.timers[0].finishAt + if curTime < lastFinish: + break + + loop.callbacks.addLast(loop.timers.pop().function) + + if loop.timers.len > 0: + timeout = (lastFinish - curTime).getAsyncTimestamp() + + if timeout == 0: + if (len(loop.callbacks) == 0) and (len(loop.idlers) == 0): + when defined(windows): + timeout = INFINITE + else: + timeout = -1 + else: + if (len(loop.callbacks) != 0) or (len(loop.idlers) != 0): + timeout = 0 + +template processTimers(loop: untyped) = + var curTime = Moment.now() + while loop.timers.len > 0: + if loop.timers[0].function.function.isNil: + discard loop.timers.pop() + continue + + if curTime < loop.timers[0].finishAt: + break + loop.callbacks.addLast(loop.timers.pop().function) + +template processIdlers(loop: untyped) = + if len(loop.idlers) > 0: + loop.callbacks.addLast(loop.idlers.popFirst()) + +template processCallbacks(loop: untyped) = + while true: + let callable = loop.callbacks.popFirst() # len must be > 0 due to sentinel + if isSentinel(callable): + break + if not(isNil(callable.function)): + when defined(chronosDurationThreshold): + let startTime = getMonoTime().ticks + + callable.function(callable.udata) + + when defined(chronosDurationThreshold): + let + durationNs = getMonoTime().ticks - startTime + durationUs = durationNs div 1000 + + if durationUs > chronosDurationThreshold: + invokeChronosDurationThresholdBreachedHandler(callable, durationUs) + +proc raiseAsDefect*(exc: ref Exception, msg: string) {. + raises: [Defect], noreturn, noinline.} = + # Reraise an exception as a Defect, where it's unexpected and can't be handled + # We include the stack trace in the message because otherwise, it's easily + # lost - Nim doesn't print it for `parent` exceptions for example (!) + raise (ref Defect)( + msg: msg & "\n" & exc.msg & "\n" & exc.getStackTrace(), parent: exc) + +when defined(windows): + type + WSAPROC_TRANSMITFILE = proc(hSocket: SocketHandle, hFile: Handle, + nNumberOfBytesToWrite: DWORD, + nNumberOfBytesPerSend: DWORD, + lpOverlapped: POVERLAPPED, + lpTransmitBuffers: pointer, + dwReserved: DWORD): cint {. + gcsafe, stdcall, raises: [].} + + LPFN_GETQUEUEDCOMPLETIONSTATUSEX = proc(completionPort: Handle, + lpPortEntries: ptr OVERLAPPED_ENTRY, + ulCount: DWORD, + ulEntriesRemoved: var ULONG, + dwMilliseconds: DWORD, + fAlertable: WINBOOL): WINBOOL {. + gcsafe, stdcall, raises: [].} + + CompletionKey = ULONG_PTR + + CompletionData* = object + cb*: CallbackFunc + errCode*: OSErrorCode + bytesCount*: int32 + udata*: pointer + + CustomOverlapped* = object of OVERLAPPED + data*: CompletionData + + OVERLAPPED_ENTRY* = object + lpCompletionKey*: ULONG_PTR + lpOverlapped*: ptr CustomOverlapped + internal: ULONG_PTR + dwNumberOfBytesTransferred: DWORD + + PDispatcher* = ref object of PDispatcherBase + ioPort: Handle + handles: HashSet[AsyncFD] + connectEx*: WSAPROC_CONNECTEX + acceptEx*: WSAPROC_ACCEPTEX + getAcceptExSockAddrs*: WSAPROC_GETACCEPTEXSOCKADDRS + transmitFile*: WSAPROC_TRANSMITFILE + getQueuedCompletionStatusEx*: LPFN_GETQUEUEDCOMPLETIONSTATUSEX + + PtrCustomOverlapped* = ptr CustomOverlapped + + RefCustomOverlapped* = ref CustomOverlapped + + AsyncFD* = distinct int + + proc getModuleHandle(lpModuleName: WideCString): Handle {. + stdcall, dynlib: "kernel32", importc: "GetModuleHandleW", sideEffect.} + proc getProcAddress(hModule: Handle, lpProcName: cstring): pointer {. + stdcall, dynlib: "kernel32", importc: "GetProcAddress", sideEffect.} + proc rtlNtStatusToDosError(code: uint64): ULONG {. + stdcall, dynlib: "ntdll", importc: "RtlNtStatusToDosError", sideEffect.} + + proc hash(x: AsyncFD): Hash {.borrow.} + proc `==`*(x: AsyncFD, y: AsyncFD): bool {.borrow, gcsafe.} + + proc getFunc(s: SocketHandle, fun: var pointer, guid: var GUID): bool = + var bytesRet: DWORD + fun = nil + result = WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, addr guid, + sizeof(GUID).DWORD, addr fun, sizeof(pointer).DWORD, + addr bytesRet, nil, nil) == 0 + + proc globalInit() {.raises: [Defect, OSError].} = + var wsa: WSAData + if wsaStartup(0x0202'i16, addr wsa) != 0: + raiseOSError(osLastError()) + + proc initAPI(loop: PDispatcher) {.raises: [Defect, CatchableError].} = + var + WSAID_TRANSMITFILE = GUID( + D1: 0xb5367df0'i32, D2: 0xcbac'i16, D3: 0x11cf'i16, + D4: [0x95'i8, 0xca'i8, 0x00'i8, 0x80'i8, + 0x5f'i8, 0x48'i8, 0xa1'i8, 0x92'i8]) + + let kernel32 = getModuleHandle(newWideCString("kernel32.dll")) + loop.getQueuedCompletionStatusEx = cast[LPFN_GETQUEUEDCOMPLETIONSTATUSEX]( + getProcAddress(kernel32, "GetQueuedCompletionStatusEx")) + + let sock = winlean.socket(winlean.AF_INET, 1, 6) + if sock == INVALID_SOCKET: + raiseOSError(osLastError()) + + var funcPointer: pointer = nil + if not getFunc(sock, funcPointer, WSAID_CONNECTEX): + let err = osLastError() + close(sock) + raiseOSError(err) + loop.connectEx = cast[WSAPROC_CONNECTEX](funcPointer) + if not getFunc(sock, funcPointer, WSAID_ACCEPTEX): + let err = osLastError() + close(sock) + raiseOSError(err) + loop.acceptEx = cast[WSAPROC_ACCEPTEX](funcPointer) + if not getFunc(sock, funcPointer, WSAID_GETACCEPTEXSOCKADDRS): + let err = osLastError() + close(sock) + raiseOSError(err) + loop.getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](funcPointer) + if not getFunc(sock, funcPointer, WSAID_TRANSMITFILE): + let err = osLastError() + close(sock) + raiseOSError(err) + loop.transmitFile = cast[WSAPROC_TRANSMITFILE](funcPointer) + close(sock) + + proc newDispatcher*(): PDispatcher {.raises: [Defect, CatchableError].} = + ## Creates a new Dispatcher instance. + var res = PDispatcher() + res.ioPort = createIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 1) + when declared(initHashSet): + # After 0.20.0 Nim's stdlib version + res.handles = initHashSet[AsyncFD]() + else: + # Pre 0.20.0 Nim's stdlib version + res.handles = initSet[AsyncFD]() + when declared(initHeapQueue): + # After 0.20.0 Nim's stdlib version + res.timers = initHeapQueue[TimerCallback]() + else: + # Pre 0.20.0 Nim's stdlib version + res.timers = newHeapQueue[TimerCallback]() + res.callbacks = initDeque[AsyncCallback](64) + res.callbacks.addLast(SentinelCallback) + res.idlers = initDeque[AsyncCallback]() + res.trackers = initTable[string, TrackerBase]() + initAPI(res) + res + + var gDisp{.threadvar.}: PDispatcher ## Global dispatcher + + proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].} + proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].} + + proc getIoHandler*(disp: PDispatcher): Handle = + ## Returns the underlying IO Completion Port handle (Windows) or selector + ## (Unix) for the specified dispatcher. + return disp.ioPort + + proc register*(fd: AsyncFD) {.raises: [Defect, CatchableError].} = + ## Register file descriptor ``fd`` in thread's dispatcher. + let loop = getThreadDispatcher() + if createIoCompletionPort(fd.Handle, loop.ioPort, + cast[CompletionKey](fd), 1) == 0: + raiseOSError(osLastError()) + loop.handles.incl(fd) + + proc unregister*(fd: AsyncFD) {.raises: [Defect, CatchableError].} = + ## Unregisters ``fd``. + getThreadDispatcher().handles.excl(fd) + + proc poll*() {.raises: [Defect, CatchableError].} = + ## Perform single asynchronous step, processing timers and completing + ## tasks. Blocks until at least one event has completed. + ## + ## Exceptions raised here indicate that waiting for tasks to be unblocked + ## failed - exceptions from within tasks are instead propagated through + ## their respective futures and not allowed to interrrupt the poll call. + let loop = getThreadDispatcher() + var + curTime = Moment.now() + curTimeout = DWORD(0) + events: array[MaxEventsCount, OVERLAPPED_ENTRY] + + # On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`, + # complete pending work of the outer `processCallbacks` call. + # On non-reentrant `poll` calls, this only removes sentinel element. + processCallbacks(loop) + + # Moving expired timers to `loop.callbacks` and calculate timeout + loop.processTimersGetTimeout(curTimeout) + + let networkEventsCount = + if isNil(loop.getQueuedCompletionStatusEx): + let res = getQueuedCompletionStatus( + loop.ioPort, + addr events[0].dwNumberOfBytesTransferred, + addr events[0].lpCompletionKey, + cast[ptr POVERLAPPED](addr events[0].lpOverlapped), + curTimeout + ) + if res == WINBOOL(0): + let errCode = osLastError() + if not(isNil(events[0].lpOverlapped)): + 1 + else: + if int32(errCode) != WAIT_TIMEOUT: + raiseOSError(errCode) + 0 + else: + 1 + else: + var eventsReceived = ULONG(0) + let res = loop.getQueuedCompletionStatusEx( + loop.ioPort, + addr events[0], + ULONG(len(events)), + eventsReceived, + curTimeout, + WINBOOL(0) + ) + if res == WINBOOL(0): + let errCode = osLastError() + if int32(errCode) != WAIT_TIMEOUT: + raiseOSError(errCode) + 0 + else: + eventsReceived + + for i in 0 ..< networkEventsCount: + var customOverlapped = events[i].lpOverlapped + customOverlapped.data.errCode = + block: + let res = cast[uint64](customOverlapped.internal) + if res == 0'u64: + OSErrorCode(-1) + else: + OSErrorCode(rtlNtStatusToDosError(res)) + customOverlapped.data.bytesCount = events[i].dwNumberOfBytesTransferred + let acb = AsyncCallback(function: customOverlapped.data.cb, + udata: cast[pointer](customOverlapped)) + loop.callbacks.addLast(acb) + + # Moving expired timers to `loop.callbacks`. + loop.processTimers() + + # We move idle callbacks to `loop.callbacks` only if there no pending + # network events. + if networkEventsCount == 0: + loop.processIdlers() + + # All callbacks which will be added during `processCallbacks` will be + # scheduled after the sentinel and are processed on next `poll()` call. + loop.callbacks.addLast(SentinelCallback) + processCallbacks(loop) + + # All callbacks done, skip `processCallbacks` at start. + loop.callbacks.addFirst(SentinelCallback) + + proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = + ## Closes a socket and ensures that it is unregistered. + let loop = getThreadDispatcher() + loop.handles.excl(fd) + close(SocketHandle(fd)) + if not isNil(aftercb): + var acb = AsyncCallback(function: aftercb) + loop.callbacks.addLast(acb) + + proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) = + ## Closes a (pipe/file) handle and ensures that it is unregistered. + let loop = getThreadDispatcher() + loop.handles.excl(fd) + discard closeHandle(Handle(fd)) + if not isNil(aftercb): + var acb = AsyncCallback(function: aftercb) + loop.callbacks.addLast(acb) + + proc contains*(disp: PDispatcher, fd: AsyncFD): bool = + ## Returns ``true`` if ``fd`` is registered in thread's dispatcher. + return fd in disp.handles + +elif unixPlatform: + const + SIG_IGN = cast[proc(x: cint) {.raises: [], noconv, gcsafe.}](1) + + type + AsyncFD* = distinct cint + + SelectorData* = object + reader*: AsyncCallback + writer*: AsyncCallback + + PDispatcher* = ref object of PDispatcherBase + selector: Selector[SelectorData] + keys: seq[ReadyKey] + + proc `==`*(x, y: AsyncFD): bool {.borrow, gcsafe.} + + proc globalInit() = + # We are ignoring SIGPIPE signal, because we are working with EPIPE. + posix.signal(cint(SIGPIPE), SIG_IGN) + + proc initAPI(disp: PDispatcher) {.raises: [Defect, CatchableError].} = + discard + + proc newDispatcher*(): PDispatcher {.raises: [Defect, CatchableError].} = + ## Create new dispatcher. + var res = PDispatcher() + res.selector = newSelector[SelectorData]() + when declared(initHeapQueue): + # After 0.20.0 Nim's stdlib version + res.timers = initHeapQueue[TimerCallback]() + else: + # Before 0.20.0 Nim's stdlib version + res.timers.newHeapQueue() + res.callbacks = initDeque[AsyncCallback](64) + res.callbacks.addLast(SentinelCallback) + res.idlers = initDeque[AsyncCallback]() + res.keys = newSeq[ReadyKey](64) + res.trackers = initTable[string, TrackerBase]() + initAPI(res) + res + + var gDisp{.threadvar.}: PDispatcher ## Global dispatcher + + proc setThreadDispatcher*(disp: PDispatcher) {.gcsafe, raises: [Defect].} + proc getThreadDispatcher*(): PDispatcher {.gcsafe, raises: [Defect].} + + proc getIoHandler*(disp: PDispatcher): Selector[SelectorData] = + ## Returns system specific OS queue. + return disp.selector + + proc register*(fd: AsyncFD) {.raises: [Defect, CatchableError].} = + ## Register file descriptor ``fd`` in thread's dispatcher. + let loop = getThreadDispatcher() + var data: SelectorData + loop.selector.registerHandle(int(fd), {}, data) + + proc unregister*(fd: AsyncFD) {.raises: [Defect, CatchableError].} = + ## Unregister file descriptor ``fd`` from thread's dispatcher. + getThreadDispatcher().selector.unregister(int(fd)) + + proc contains*(disp: PDispatcher, fd: AsyncFD): bool {.inline.} = + ## Returns ``true`` if ``fd`` is registered in thread's dispatcher. + result = int(fd) in disp.selector + + proc addReader*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {. + raises: [Defect, IOSelectorsException, ValueError].} = + ## Start watching the file descriptor ``fd`` for read availability and then + ## call the callback ``cb`` with specified argument ``udata``. + let loop = getThreadDispatcher() + var newEvents = {Event.Read} + withData(loop.selector, int(fd), adata) do: + let acb = AsyncCallback(function: cb, udata: udata) + adata.reader = acb + newEvents.incl(Event.Read) + if not(isNil(adata.writer.function)): + newEvents.incl(Event.Write) + do: + raise newException(ValueError, "File descriptor not registered.") + loop.selector.updateHandle(int(fd), newEvents) + + proc removeReader*(fd: AsyncFD) {. + raises: [Defect, IOSelectorsException, ValueError].} = + ## Stop watching the file descriptor ``fd`` for read availability. + let loop = getThreadDispatcher() + var newEvents: set[Event] + withData(loop.selector, int(fd), adata) do: + # We need to clear `reader` data, because `selectors` don't do it + adata.reader = default(AsyncCallback) + if not(isNil(adata.writer.function)): + newEvents.incl(Event.Write) + do: + raise newException(ValueError, "File descriptor not registered.") + loop.selector.updateHandle(int(fd), newEvents) + + proc addWriter*(fd: AsyncFD, cb: CallbackFunc, udata: pointer = nil) {. + raises: [Defect, IOSelectorsException, ValueError].} = + ## Start watching the file descriptor ``fd`` for write availability and then + ## call the callback ``cb`` with specified argument ``udata``. + let loop = getThreadDispatcher() + var newEvents = {Event.Write} + withData(loop.selector, int(fd), adata) do: + let acb = AsyncCallback(function: cb, udata: udata) + adata.writer = acb + newEvents.incl(Event.Write) + if not(isNil(adata.reader.function)): + newEvents.incl(Event.Read) + do: + raise newException(ValueError, "File descriptor not registered.") + loop.selector.updateHandle(int(fd), newEvents) + + proc removeWriter*(fd: AsyncFD) {. + raises: [Defect, IOSelectorsException, ValueError].} = + ## Stop watching the file descriptor ``fd`` for write availability. + let loop = getThreadDispatcher() + var newEvents: set[Event] + withData(loop.selector, int(fd), adata) do: + # We need to clear `writer` data, because `selectors` don't do it + adata.writer = default(AsyncCallback) + if not(isNil(adata.reader.function)): + newEvents.incl(Event.Read) + do: + raise newException(ValueError, "File descriptor not registered.") + loop.selector.updateHandle(int(fd), newEvents) + + proc closeSocket*(fd: AsyncFD, aftercb: CallbackFunc = nil) = + ## Close asynchronous socket. + ## + ## Please note, that socket is not closed immediately. To avoid bugs with + ## closing socket, while operation pending, socket will be closed as + ## soon as all pending operations will be notified. + ## You can execute ``aftercb`` before actual socket close operation. + let loop = getThreadDispatcher() + + proc continuation(udata: pointer) = + if SocketHandle(fd) in loop.selector: + try: + unregister(fd) + except CatchableError as exc: + raiseAsDefect(exc, "unregister failed") + + close(SocketHandle(fd)) + if not isNil(aftercb): + aftercb(nil) + + withData(loop.selector, int(fd), adata) do: + # We are scheduling reader and writer callbacks to be called + # explicitly, so they can get an error and continue work. + # Callbacks marked as deleted so we don't need to get REAL notifications + # from system queue for this reader and writer. + + if not(isNil(adata.reader.function)): + loop.callbacks.addLast(adata.reader) + adata.reader = default(AsyncCallback) + + if not(isNil(adata.writer.function)): + loop.callbacks.addLast(adata.writer) + adata.writer = default(AsyncCallback) + + # We can't unregister file descriptor from system queue here, because + # in such case processing queue will stuck on poll() call, because there + # can be no file descriptors registered in system queue. + var acb = AsyncCallback(function: continuation) + loop.callbacks.addLast(acb) + + proc closeHandle*(fd: AsyncFD, aftercb: CallbackFunc = nil) = + ## Close asynchronous file/pipe handle. + ## + ## Please note, that socket is not closed immediately. To avoid bugs with + ## closing socket, while operation pending, socket will be closed as + ## soon as all pending operations will be notified. + ## You can execute ``aftercb`` before actual socket close operation. + closeSocket(fd, aftercb) + + when ioselSupportedPlatform: + proc addSignal*(signal: int, cb: CallbackFunc, + udata: pointer = nil): int {. + raises: [Defect, IOSelectorsException, ValueError, OSError].} = + ## Start watching signal ``signal``, and when signal appears, call the + ## callback ``cb`` with specified argument ``udata``. Returns signal + ## identifier code, which can be used to remove signal callback + ## via ``removeSignal``. + let loop = getThreadDispatcher() + var data: SelectorData + result = loop.selector.registerSignal(signal, data) + withData(loop.selector, result, adata) do: + adata.reader = AsyncCallback(function: cb, udata: udata) + do: + raise newException(ValueError, "File descriptor not registered.") + + proc removeSignal*(sigfd: int) {. + raises: [Defect, IOSelectorsException].} = + ## Remove watching signal ``signal``. + let loop = getThreadDispatcher() + loop.selector.unregister(sigfd) + + proc poll*() {.raises: [Defect, CatchableError].} = + ## Perform single asynchronous step. + let loop = getThreadDispatcher() + var curTime = Moment.now() + var curTimeout = 0 + + when ioselSupportedPlatform: + let customSet = {Event.Timer, Event.Signal, Event.Process, + Event.Vnode} + + # On reentrant `poll` calls from `processCallbacks`, e.g., `waitFor`, + # complete pending work of the outer `processCallbacks` call. + # On non-reentrant `poll` calls, this only removes sentinel element. + processCallbacks(loop) + + # Moving expired timers to `loop.callbacks` and calculate timeout. + loop.processTimersGetTimeout(curTimeout) + + # Processing IO descriptors and all hardware events. + let count = loop.selector.selectInto(curTimeout, loop.keys) + for i in 0..